import logging
import asyncio
import os
import json
import csv
import re
from datetime import datetime, timedelta
from threading import Lock

from flask import Flask, render_template, request
from flask_socketio import SocketIO
from binance import Client
import websockets

# --- Configuration ---
SYMBOL = 'ETHUSDT'
HISTORY_FILE = 'historical_data_1m.json'
DATA_FOLDER = 'data'
USER_PREFERENCES_FILE = 'user_preferences.json'
RESTART_TIMEOUT_S = 15
BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade"

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode='threading')

# --- Global State ---
app_initialized = False
app_init_lock = Lock()
current_bar = {}          # The currently forming 1-minute candle
selected_csv_file = None  # Currently selected CSV file
csv_file_lock = Lock()    # Lock for CSV file operations


# --- Utility Functions ---
def get_available_csv_files():
    """Get a list of available CSV files with their start dates."""
    csv_files = []
    if not os.path.exists(DATA_FOLDER):
        os.makedirs(DATA_FOLDER)
        return csv_files

    for filename in os.listdir(DATA_FOLDER):
        if filename.endswith('.csv') and SYMBOL in filename:
            # Extract the date from a filename like ETHUSDT_20250101.csv
            match = re.search(r'(\d{8})', filename)
            if match:
                date_str = match.group(1)
                try:
                    start_date = datetime.strptime(date_str, '%Y%m%d')
                    file_path = os.path.join(DATA_FOLDER, filename)
                    file_size = os.path.getsize(file_path)
                    csv_files.append({
                        'filename': filename,
                        'start_date_str': start_date.strftime('%Y-%m-%d'),
                        'date_str': date_str,
                        'size': file_size,
                        'display_name': f"{start_date.strftime('%Y-%m-%d')} ({filename})"
                    })
                    logging.info(f"Found CSV file: {filename}, size: {file_size}, date: {date_str}")
                except ValueError:
                    logging.warning(f"Could not parse date from filename: {filename}")
                    continue

    # Sort by start date (newest first)
    csv_files.sort(key=lambda x: x['date_str'], reverse=True)
    logging.info(f"Available CSV files: {[f['filename'] for f in csv_files]}")
    return csv_files


def get_default_csv_file():
    """Get the default CSV file: the last one used, falling back to the smallest."""
    # Try to load the last used file
    if os.path.exists(USER_PREFERENCES_FILE):
        try:
            with open(USER_PREFERENCES_FILE, 'r') as f:
                prefs = json.load(f)
            last_file = prefs.get('last_csv_file')
            if last_file and os.path.exists(os.path.join(DATA_FOLDER, last_file)):
                logging.info(f"Using last selected file: {last_file}")
                return last_file
        except (OSError, json.JSONDecodeError) as e:
            logging.warning(f"Could not read {USER_PREFERENCES_FILE}: {e}")

    # Fall back to the smallest file
    csv_files = get_available_csv_files()
    if csv_files:
        # Exclude the large Binance export for better performance
        filtered_files = [f for f in csv_files if not f['filename'].endswith('_Binance.csv')]
        if filtered_files:
            smallest_file = min(filtered_files, key=lambda x: x['size'])
            logging.info(f"Using smallest filtered file: {smallest_file['filename']} ({smallest_file['size']} bytes)")
        else:
            smallest_file = min(csv_files, key=lambda x: x['size'])
            logging.info(f"Using smallest file: {smallest_file['filename']} ({smallest_file['size']} bytes)")
        return smallest_file['filename']

    logging.warning("No CSV files found")
    return None


def save_user_preference(csv_filename):
    """Save the user's CSV file preference."""
    prefs = {}
    if os.path.exists(USER_PREFERENCES_FILE):
        try:
            with open(USER_PREFERENCES_FILE, 'r') as f:
                prefs = json.load(f)
        except (OSError, json.JSONDecodeError):
            pass
    prefs['last_csv_file'] = csv_filename
    with open(USER_PREFERENCES_FILE, 'w') as f:
        json.dump(prefs, f)
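
# Illustrative shape of `user_preferences.json` as written by
# save_user_preference() above; the filename shown is hypothetical:
#
#   {"last_csv_file": "ETHUSDT_20250101.csv"}
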
def read_csv_data(csv_filename):
    """Read historical data from a CSV file."""
    csv_path = os.path.join(DATA_FOLDER, csv_filename)
    if not os.path.exists(csv_path):
        return []

    klines = []
    try:
        with open(csv_path, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                # Convert the CSV row to the Binance kline list format
                open_time = datetime.strptime(row['Open time'], '%Y-%m-%d %H:%M:%S')
                close_time = datetime.strptime(row['Close time'].split('.')[0], '%Y-%m-%d %H:%M:%S')
                # =================================================================
                # --- FIX START: Convert string values to numeric types ---
                # The original code passed the string values from the CSV directly,
                # which caused the historical data to be misinterpreted by the
                # chart. Converting to float/int here keeps the data consistent.
                # =================================================================
                kline = [
                    int(open_time.timestamp() * 1000),           # Open time (ms)
                    float(row['Open']),                          # Open
                    float(row['High']),                          # High
                    float(row['Low']),                           # Low
                    float(row['Close']),                         # Close
                    float(row['Volume']),                        # Volume
                    int(close_time.timestamp() * 1000),          # Close time (ms)
                    float(row['Quote asset volume']),            # Quote asset volume
                    int(row['Number of trades']),                # Number of trades
                    float(row['Taker buy base asset volume']),   # Taker buy base asset volume
                    float(row['Taker buy quote asset volume']),  # Taker buy quote asset volume
                    float(row['Ignore'])                         # Ignore
                ]
                # --- FIX END ---
                # =================================================================
                klines.append(kline)
    except Exception as e:
        logging.error(f"Error reading CSV file {csv_filename}: {e}")
        return []
    return klines


def append_to_csv(csv_filename, candle_data):
    """Append a closed candle to the CSV file."""
    csv_path = os.path.join(DATA_FOLDER, csv_filename)
    try:
        with csv_file_lock:
            # Convert the candle dict to a CSV row
            open_time = datetime.fromtimestamp(candle_data['time'])
            close_time = open_time.replace(second=59, microsecond=999000)
            row = [
                open_time.strftime('%Y-%m-%d %H:%M:%S'),
                candle_data['open'],
                candle_data['high'],
                candle_data['low'],
                candle_data['close'],
                0.0,  # Volume (placeholder)
                close_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
                0.0,  # Quote asset volume (placeholder)
                1,    # Number of trades (placeholder)
                0.0,  # Taker buy base asset volume (placeholder)
                0.0,  # Taker buy quote asset volume (placeholder)
                0.0   # Ignore
            ]

            # Write the header first if the file is new
            file_exists = os.path.exists(csv_path)
            with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                if not file_exists:
                    headers = [
                        'Open time', 'Open', 'High', 'Low', 'Close', 'Volume',
                        'Close time', 'Quote asset volume', 'Number of trades',
                        'Taker buy base asset volume', 'Taker buy quote asset volume',
                        'Ignore'
                    ]
                    writer.writerow(headers)
                writer.writerow(row)
    except Exception as e:
        logging.error(f"Error appending to CSV file {csv_filename}: {e}")
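
# Illustrative layout of the CSV files handled by read_csv_data() and
# append_to_csv(). The column order follows Binance's kline export; the
# data row below is made up for illustration:
#
#   Open time,Open,High,Low,Close,Volume,Close time,Quote asset volume,Number of trades,Taker buy base asset volume,Taker buy quote asset volume,Ignore
#   2025-01-01 00:00:00,3350.00,3352.50,3349.10,3351.20,12.34,2025-01-01 00:00:59.999,41350.50,123,6.17,20670.10,0.0
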
def fill_missing_data(csv_filename):
    """Fill missing candles by downloading them from Binance."""
    global selected_csv_file
    existing_data = []
    try:
        logging.info(f"Checking for missing data in {csv_filename}")

        # Get the start date from the filename
        match = re.search(r'(\d{8})', csv_filename)
        if not match:
            return existing_data
        date_str = match.group(1)
        start_date = datetime.strptime(date_str, '%Y%m%d')

        # Read the existing data
        existing_data = read_csv_data(csv_filename)

        # Determine what data we need to fetch
        if existing_data:
            # Continue from the last recorded candle (timestamp is in ms)
            last_timestamp = existing_data[-1][0] // 1000  # Convert to seconds
            fetch_start = datetime.fromtimestamp(last_timestamp) + timedelta(minutes=1)
        else:
            fetch_start = start_date

        # Fetch missing data up to the current time
        now = datetime.now()
        if fetch_start >= now:
            logging.info(f"No missing data for {csv_filename}")
            return existing_data

        logging.info(f"Fetching missing data from {fetch_start} to {now}")
        client = Client()
        missing_klines = client.get_historical_klines(
            SYMBOL,
            Client.KLINE_INTERVAL_1MINUTE,
            start_str=fetch_start.strftime('%Y-%m-%d %H:%M:%S'),
            end_str=now.strftime('%Y-%m-%d %H:%M:%S')
        )

        if missing_klines:
            # Append the missing data to the CSV
            csv_path = os.path.join(DATA_FOLDER, csv_filename)
            with csv_file_lock:
                with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
                    writer = csv.writer(csvfile)
                    for kline in missing_klines:
                        open_time = datetime.fromtimestamp(kline[0] / 1000)
                        close_time = datetime.fromtimestamp(kline[6] / 1000)
                        row = [
                            open_time.strftime('%Y-%m-%d %H:%M:%S'),
                            kline[1], kline[2], kline[3], kline[4], kline[5],
                            close_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
                            kline[7], kline[8], kline[9], kline[10], kline[11]
                        ]
                        writer.writerow(row)
            logging.info(f"Added {len(missing_klines)} missing candles to {csv_filename}")
            existing_data.extend(missing_klines)

        return existing_data
    except Exception as e:
        logging.error(f"Error filling missing data for {csv_filename}: {e}")
        return existing_data


# --- Historical Data Streaming ---
def stream_historical_data(sid):
    """Load historical data from the selected CSV file and send it to the client."""
    global selected_csv_file
    try:
        logging.info(f"Starting historical data stream for SID={sid}")

        # Use the selected CSV file, or fall back to the default
        if not selected_csv_file:
            selected_csv_file = get_default_csv_file()
            if not selected_csv_file:
                # No CSV files available; start a new one for today
                logging.warning("No CSV files available, creating default file")
                selected_csv_file = f"{SYMBOL}_{datetime.now().strftime('%Y%m%d')}.csv"

        logging.info(f"Using CSV file: {selected_csv_file}")

        # Fill missing data and get all klines
        all_klines = fill_missing_data(selected_csv_file)

        # Send a progress update
        socketio.emit('history_progress', {'progress': 100}, to=sid)

        logging.info(f"Finished data stream for SID={sid}. Sending final payload of {len(all_klines)} klines.")
        socketio.emit('history_finished', {'klines_1m': all_klines}, to=sid)
    except Exception as e:
        logging.error(f"Error in stream_historical_data for SID={sid}: {e}", exc_info=True)
        socketio.emit('history_error', {'message': str(e)}, to=sid)
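
# Illustrative @trade payload consumed by the listener below. Field names
# follow Binance's public trade-stream format; only 'p' (price) and
# 'T' (trade time, ms) are read here, and the values are made up:
#
#   {"e": "trade", "E": 1735689600125, "s": "ETHUSDT",
#    "t": 12345, "p": "3351.20", "q": "0.05", "T": 1735689600123, "m": true}
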
# --- Real-time Data Listener ---
def binance_listener_thread():
    """Connect to Binance, manage the forming 1-minute candle, and emit updates."""
    global current_bar

    async def listener():
        global current_bar
        while True:
            try:
                logging.info(f"Connecting to Binance WebSocket at {BINANCE_WS_URL}...")
                async with websockets.connect(BINANCE_WS_URL) as websocket:
                    logging.info("Binance WebSocket connected successfully.")
                    while True:
                        message = await websocket.recv()
                        trade = json.loads(message)
                        price = float(trade['p'])
                        trade_time_s = trade['T'] // 1000
                        # Bucket the trade into its 1-minute candle
                        candle_timestamp = trade_time_s - (trade_time_s % 60)

                        if not current_bar or candle_timestamp > current_bar.get("time", 0):
                            if current_bar:
                                # The previous candle is now closed: emit it and save it
                                logging.info(f"Candle closed at {current_bar['close']}. Emitting 'candle_closed' event.")
                                socketio.emit('candle_closed', current_bar)
                                # Append to the selected CSV file
                                if selected_csv_file:
                                    append_to_csv(selected_csv_file, current_bar)
                            current_bar = {"time": candle_timestamp, "open": price,
                                           "high": price, "low": price, "close": price}
                        else:
                            current_bar['high'] = max(current_bar.get('high', price), price)
                            current_bar['low'] = min(current_bar.get('low', price), price)
                            current_bar['close'] = price

                        # Emit the live, updating candle for visual feedback
                        socketio.emit('candle_update', current_bar)
            except Exception as e:
                logging.error(f"Binance listener error: {e}. Reconnecting...")
                await asyncio.sleep(RESTART_TIMEOUT_S)

    asyncio.run(listener())


# --- SocketIO Event Handlers ---
@socketio.on('connect')
def handle_connect():
    global app_initialized
    logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}")
    with app_init_lock:
        if not app_initialized:
            logging.info("--- Initializing Application ---")
            socketio.start_background_task(binance_listener_thread)
            app_initialized = True
    # Every client gets its own history stream, not just the first one
    socketio.start_background_task(target=stream_historical_data, sid=request.sid)


@socketio.on('get_csv_files')
def handle_get_csv_files():
    """Send the list of available CSV files to the requesting client."""
    logging.info(f"Received get_csv_files request from SID={request.sid}")
    csv_files = get_available_csv_files()
    default_file = get_default_csv_file()
    logging.info(f"Sending CSV files list: {len(csv_files)} files, default: {default_file}")
    # Reply only to the requesting client; without `to=` this would broadcast
    socketio.emit('csv_files_list', {
        'files': csv_files,
        'selected': default_file
    }, to=request.sid)


@socketio.on('select_csv_file')
def handle_select_csv_file(data):
    """Handle CSV file selection by the user."""
    global selected_csv_file
    logging.info(f"Received select_csv_file request from SID={request.sid} with data: {data}")
    filename = data.get('filename')
    if filename:
        csv_files = get_available_csv_files()
        valid_files = [f['filename'] for f in csv_files]
        if filename in valid_files:
            selected_csv_file = filename
            save_user_preference(filename)
            logging.info(f"User selected CSV file: {filename}")
            # Stream the new historical data to the requesting client
            socketio.start_background_task(target=stream_historical_data, sid=request.sid)
        else:
            logging.error(f"Invalid CSV file selected: {filename}")
            socketio.emit('error', {'message': f'Invalid CSV file: {filename}'}, to=request.sid)


# --- Flask Routes ---
@app.route('/')
def index():
    return render_template('index.html', symbol=SYMBOL)


# --- Main Application Execution ---
if __name__ == '__main__':
    logging.info("Starting Flask-SocketIO server...")
    socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True, debug=False)
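
# --- Usage sketch (not executed by the server) ---
# A minimal python-socketio client exercising the events defined above.
# This is an illustrative sketch, assuming `python-socketio[client]` is
# installed and the server is running on localhost:5000:
#
#   import socketio
#
#   sio = socketio.Client()
#
#   @sio.on('candle_update')
#   def on_candle(bar):
#       # bar: {'time': ..., 'open': ..., 'high': ..., 'low': ..., 'close': ...}
#       print(bar)
#
#   @sio.on('history_finished')
#   def on_history(payload):
#       print(f"received {len(payload['klines_1m'])} klines")
#
#   sio.connect('http://localhost:5000')
#   sio.emit('get_csv_files')
#   sio.wait()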