146 lines
6.2 KiB
Python
146 lines
6.2 KiB
Python
import time
|
|
import logging
|
|
import asyncio
|
|
import os
|
|
import json
|
|
from flask import Flask, render_template, request
|
|
from flask_socketio import SocketIO
|
|
from binance import Client
|
|
import websockets
|
|
from threading import Lock
|
|
from datetime import datetime, timedelta
|
|
|
|
# --- Configuration ---
# Trading pair used for the kline request, the trade stream URL and the page template.
SYMBOL = "ETHUSDT"
# JSON file that stream_historical_data() writes the fetched 1-minute klines to.
HISTORY_FILE = "historical_data_1m.json"
# Seconds the trade listener sleeps after an error before it reconnects.
RESTART_TIMEOUT_S = 15
# Trade stream endpoint; the symbol is lower-cased when building the URL.
BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade"
|
|
|
|
# --- Logging Setup ---
# Root logger at INFO with a "timestamp - level - message" layout.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
app = Flask(__name__)
# Take the secret key from the SECRET_KEY environment variable when it is set,
# so a deployment does not have to run with the placeholder value committed in
# the source. The previous literal is kept as the fallback, so existing local
# setups behave exactly as before.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'secret!')
# async_mode='threading' is passed explicitly; the background tasks in this
# file are started through socketio.start_background_task().
socketio = SocketIO(app, async_mode='threading')
|
|
|
|
# --- Global State ---
# Lock held by handle_connect() while it checks and sets app_initialized.
app_init_lock = Lock()
# Flipped to True once the Binance listener background task has been started.
app_initialized = False
# The currently forming 1-minute candle; empty until the first trade arrives.
current_bar = dict()
|
|
|
|
# --- Historical Data Streaming ---
|
|
def stream_historical_data(sid):
    """
    Fetch the last week of 1-minute klines from Binance and send them to one client.

    The klines are requested with a single ``get_historical_klines`` call,
    reduced to one entry per open time, ordered chronologically, written to
    ``HISTORY_FILE`` as JSON and then emitted to the client as a
    ``history_finished`` event with the payload ``{'klines_1m': [...]}``.

    Failures are not raised to the caller: the exception is logged with its
    traceback and its message is emitted to the same client as a
    ``history_error`` event.

    Args:
        sid: Socket.IO session id of the client that receives the events.
    """
    try:
        logging.info(f"Starting historical data stream for SID={sid}")
        client = Client()

        logging.info(f"Fetching historical data for the last 7 days for SID={sid}")
        # python-binance accepts relative date strings, so one call covers the
        # whole window from 7 days ago until now.
        all_klines = client.get_historical_klines(
            SYMBOL,
            Client.KLINE_INTERVAL_1MINUTE,
            start_str="1 week ago UTC",
        )

        # Keep exactly one kline per open time (element 0 of each row). When the
        # same minute appears more than once, the entry that comes later in the
        # response wins. Comparing whole rows, as before, let two differing
        # rows for the same minute both through.
        latest_by_open_time = {kline[0]: kline for kline in all_klines}
        unique_klines = [latest_by_open_time[open_time] for open_time in sorted(latest_by_open_time)]

        with open(HISTORY_FILE, 'w', encoding='utf-8') as f:
            json.dump(unique_klines, f)

        logging.info(f"Finished data stream for SID={sid}. Sending final payload of {len(unique_klines)} klines.")
        socketio.emit('history_finished', {'klines_1m': unique_klines}, to=sid)

    except Exception as e:
        # Boundary of a background task: log with traceback and tell the client
        # instead of letting the exception escape the task.
        logging.exception(f"Error in stream_historical_data for SID={sid}: {e}")
        socketio.emit('history_error', {'message': str(e)}, to=sid)
|
|
|
|
# --- Real-time Data Listener ---
|
|
def binance_listener_thread():
    """
    Listen to the Binance trade stream and maintain the live 1-minute candle.

    Blocks in ``asyncio.run`` while the inner coroutine runs; that coroutine
    loops forever. Each received trade updates the module-level
    ``current_bar`` and is followed by a ``candle_update`` event. When a trade
    belongs to a later minute than the current bar, the finished bar is first
    emitted as ``candle_closed`` and a new bar is started from that trade.

    Any exception raised while connecting, receiving or parsing is logged,
    after which the coroutine sleeps ``RESTART_TIMEOUT_S`` seconds and
    reconnects.
    """

    async def listener():
        global current_bar

        while True:
            try:
                logging.info(f"Connecting to Binance WebSocket at {BINANCE_WS_URL}...")
                async with websockets.connect(BINANCE_WS_URL) as websocket:
                    logging.info("Binance WebSocket connected successfully.")

                    while True:
                        raw_message = await websocket.recv()
                        trade = json.loads(raw_message)

                        price = float(trade['p'])
                        # Convert the 'T' field to whole seconds, then floor it
                        # to the start of its minute.
                        trade_second = trade['T'] // 1000
                        minute_start = trade_second - (trade_second % 60)

                        belongs_to_current_bar = bool(current_bar) and minute_start <= current_bar.get("time", 0)

                        if belongs_to_current_bar:
                            # Same minute: stretch the range and move the close.
                            current_bar['high'] = max(current_bar.get('high', price), price)
                            current_bar['low'] = min(current_bar.get('low', price), price)
                            current_bar['close'] = price
                        else:
                            if current_bar:
                                # A later minute has started, so the previous candle is final.
                                logging.info(f"Candle closed at {current_bar['close']}. Emitting 'candle_closed' event.")
                                socketio.emit('candle_closed', current_bar)

                            current_bar = {
                                "time": minute_start,
                                "open": price,
                                "high": price,
                                "low": price,
                                "close": price,
                            }

                        # Push the forming candle after every trade for live feedback.
                        socketio.emit('candle_update', current_bar)

            except Exception as e:
                logging.error(f"Binance listener error: {e}. Reconnecting...")
                await asyncio.sleep(RESTART_TIMEOUT_S)

    asyncio.run(listener())
|
|
|
|
# --- SocketIO Event Handlers ---
|
|
@socketio.on('connect')
def handle_connect():
    """
    Handle a new Socket.IO client connection.

    Logs the client's address and session id. Under ``app_init_lock``, the
    first connection starts the Binance listener as a background task and
    sets ``app_initialized``. Every connection then starts a background task
    that streams historical data to the connecting client's session id.
    """
    global app_initialized

    logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}")

    with app_init_lock:
        listener_needed = not app_initialized
        if listener_needed:
            logging.info("--- Initializing Application ---")
            socketio.start_background_task(binance_listener_thread)
            app_initialized = True

    # Each client gets its own history download, addressed to its sid.
    socketio.start_background_task(stream_historical_data, sid=request.sid)
|
|
|
|
# --- Flask Routes ---
|
|
@app.route('/')
def index():
    """Render ``index.html`` for the root URL, passing ``SYMBOL`` as ``symbol``."""
    page = render_template('index.html', symbol=SYMBOL)
    return page
|
|
|
|
# --- Main Application Execution ---
|
|
if __name__ == '__main__':
    # Script entry point: start the Socket.IO server on all interfaces, port 5000.
    logging.info("Starting Flask-SocketIO server...")
    socketio.run(
        app,
        host='0.0.0.0',
        port=5000,
        debug=False,
        allow_unsafe_werkzeug=True,
    )