import time
import logging
import asyncio
import os
import json
import csv
from datetime import datetime, timedelta, timezone
from threading import Lock

from flask import Flask, render_template, request
from flask_socketio import SocketIO
from binance import Client
import websockets

# --- Configuration ---
SYMBOL = 'ETHUSDT'
# The CSV file is the primary source of historical data.
HISTORY_CSV_FILE = 'ETHUSDT_1m_Binance.csv'
RESTART_TIMEOUT_S = 15
BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade"

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode='threading')

# --- Global State ---
app_initialized = False
app_init_lock = Lock()
# This cache holds the filtered historical data to be sent to the frontend.
historical_data_cache = []


# --- Helper Function for Optimized Reading ---
def get_last_timestamp_from_csv(filepath):
    """
    Efficiently read the tail of the CSV to get the timestamp from the last valid row.

    This avoids loading the entire file into memory. Returns an aware (UTC)
    datetime object, or None if no timestamp can be recovered.
    """
    try:
        with open(filepath, 'rb') as f:
            # Seek to a position near the end of the file to read a chunk;
            # 4096 bytes is enough to contain several 1-minute kline rows.
            f.seek(0, os.SEEK_END)
            filesize = f.tell()
            if filesize == 0:
                return None
            f.seek(max(0, filesize - 4096), os.SEEK_SET)
            # Read the last part of the file.
            lines = f.readlines()
            if not lines:
                return None
            # Find the last non-empty line.
            last_line_str = ''
            for line in reversed(lines):
                decoded_line = line.decode('utf-8').strip()
                if decoded_line:
                    last_line_str = decoded_line
                    break
            if not last_line_str or 'Open time' in last_line_str:
                return None
            last_row = last_line_str.split(',')
            dt_obj = datetime.strptime(last_row[0], '%Y-%m-%d %H:%M:%S')
            return dt_obj.replace(tzinfo=timezone.utc)
    except (IOError, IndexError, ValueError) as e:
        logging.error(f"Could not get last timestamp from CSV: {e}")
        return None


# --- Data Management ---
def load_and_update_data():
    """
    Load historical data from the CSV, update it with the latest candles from
    Binance, then filter it into the cache served to the frontend.
    """
    global historical_data_cache
    client = Client()

    # 1. Check that the primary CSV data source exists.
    if not os.path.exists(HISTORY_CSV_FILE):
        logging.critical(f"History file '{HISTORY_CSV_FILE}' not found. Please provide the CSV file. Halting data load.")
        historical_data_cache = []
        return

    # 2. Efficiently get the last timestamp to determine where to start fetching.
    last_dt_in_csv = get_last_timestamp_from_csv(HISTORY_CSV_FILE)
    start_fetch_date = None
    if last_dt_in_csv:
        start_fetch_date = last_dt_in_csv + timedelta(minutes=1)
        logging.info(f"Last record in CSV is from {last_dt_in_csv}. Checking for new data since {start_fetch_date}.")
    else:
        logging.warning("Could not determine last timestamp from CSV. Assuming the file is new or empty; no new data will be fetched.")

    # 3. Fetch new data from Binance.
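    # NOTE: python-binance's get_historical_klines() already paginates
    # internally, but the loop below additionally re-requests from one minute
    # past the last returned candle as a guard against partial responses. The
    # `len(fetched) < 1000` check assumes the library's default page size of
    # 1000 klines per request.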
    new_klines = []
    if start_fetch_date and start_fetch_date < datetime.now(timezone.utc):
        while True:
            logging.info(f"Fetching new klines from {start_fetch_date}...")
            fetched = client.get_historical_klines(
                SYMBOL,
                Client.KLINE_INTERVAL_1MINUTE,
                start_fetch_date.strftime("%Y-%m-%d %H:%M:%S"),
            )
            if not fetched:
                logging.info("No new klines to fetch.")
                break
            new_klines.extend(fetched)
            last_fetched_dt = datetime.fromtimestamp(fetched[-1][0] / 1000, tz=timezone.utc)
            start_fetch_date = last_fetched_dt + timedelta(minutes=1)
            logging.info(f"Fetched {len(fetched)} new klines, up to {last_fetched_dt}.")
            if len(fetched) < 1000:
                break
            time.sleep(0.1)

    # 4. If new data was found, append it to the CSV file.
    if new_klines:
        logging.info(f"Appending {len(new_klines)} new candles to {HISTORY_CSV_FILE}.")
        try:
            with open(HISTORY_CSV_FILE, 'a', newline='') as f:
                writer = csv.writer(f)
                for kline in new_klines:
                    open_time_dt = datetime.fromtimestamp(kline[0] / 1000, tz=timezone.utc)
                    open_time_str = open_time_dt.strftime('%Y-%m-%d %H:%M:%S')
                    close_time_dt = datetime.fromtimestamp(kline[6] / 1000, tz=timezone.utc)
                    close_time_str = close_time_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
                    # Kline layout: [open time, open, high, low, close, volume,
                    # close time, quote volume, trades, taker buy base,
                    # taker buy quote, ignore].
                    writer.writerow([open_time_str] + kline[1:6] + [close_time_str] + kline[7:])
        except Exception as e:
            logging.error(f"Failed to append new data to {HISTORY_CSV_FILE}: {e}")

    # 5. Read the CSV and load only the necessary data (2025 onwards) for the frontend.
    logging.info("Reading CSV to populate cache with data from 2025-01-01 onwards...")
    frontend_klines = []
    frontend_start_dt = datetime(2025, 1, 1, tzinfo=timezone.utc)
    try:
        with open(HISTORY_CSV_FILE, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader)  # Skip header.
            for row in reader:
                try:
                    dt_obj = datetime.strptime(row[0], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                    if dt_obj >= frontend_start_dt:
                        timestamp_ms = int(dt_obj.timestamp() * 1000)
                        frontend_klines.append([
                            timestamp_ms,
                            row[1], row[2], row[3], row[4],
                            "0", "0", "0", "0", "0", "0",
                        ])
                except (ValueError, IndexError):
                    continue
        historical_data_cache = frontend_klines
        logging.info(f"--- Data initialization complete. {len(historical_data_cache)} candles cached for frontend. ---")
    except Exception as e:
        logging.error(f"Failed to read CSV for frontend cache: {e}")
        historical_data_cache = []


# --- Real-time Data Listener ---
def binance_listener_thread():
    async def listener():
        while True:
            try:
                logging.info(f"Connecting to Binance WebSocket at {BINANCE_WS_URL}...")
                async with websockets.connect(BINANCE_WS_URL) as websocket:
                    logging.info("Binance WebSocket connected successfully.")
                    while True:
                        message = await websocket.recv()
                        socketio.emit('trade', json.loads(message))
            except Exception as e:
                logging.error(f"Binance listener error: {e}. Reconnecting...")
                await asyncio.sleep(RESTART_TIMEOUT_S)

    asyncio.run(listener())


# --- SocketIO Event Handlers ---
@socketio.on('connect')
def handle_connect():
    global app_initialized
    logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}")
    with app_init_lock:
        if not app_initialized:
            logging.info("--- First client connected, initializing application data ---")
            socketio.start_background_task(load_and_update_data)
            socketio.start_background_task(binance_listener_thread)
            app_initialized = True

    # Wait until the cache is populated.
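    # Each connect handler runs in its own thread under async_mode='threading',
    # so this poll blocks only the current client. If the CSV file is missing,
    # the cache stays empty and clients will wait here indefinitely.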
    while not historical_data_cache:
        logging.info(f"SID={request.sid} is waiting for historical data cache...")
        socketio.sleep(1)

    logging.info(f"Sending {len(historical_data_cache)} cached klines to SID={request.sid}")
    socketio.emit('history_finished', {'klines_1m': historical_data_cache}, to=request.sid)


@socketio.on('analyze_chart')
def handle_analyze_chart(data):
    sid = request.sid
    logging.info(f"Received 'analyze_chart' request from frontend (SID={sid})")
    # Use only the most recent 100 candles to keep the prompt compact.
    recent_data = data[-100:]
    prompt_data = "\n".join(
        f"Time: {c['time']}, Open: {c['open']}, High: {c['high']}, Low: {c['low']}, Close: {c['close']}"
        for c in recent_data
    )
    prompt = (
        f"You are a financial analyst. Based on the following recent candlestick data for {SYMBOL}, "
        "provide a brief technical analysis (3-4 sentences). Mention the current trend and any "
        f"potential short-term support or resistance levels.\n\nData:\n{prompt_data}"
    )
    # The prompt is assembled for a future LLM integration; no model is wired
    # up yet, so a static placeholder response is returned instead.
    socketio.emit('analysis_result', {'analysis': "AI analysis is currently unavailable."}, to=sid)


# --- Flask Routes ---
@app.route('/')
def index():
    return render_template('index.html', symbol=SYMBOL)


# --- Main Application Execution ---
if __name__ == '__main__':
    logging.info("Starting Flask-SocketIO server...")
    socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True, debug=False)
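# --- Example client (separate script) ---
# A minimal sketch of how the server's events could be exercised from a test
# client, assuming the `python-socketio` client package is installed
# (`pip install "python-socketio[client]"`) and its protocol version is
# compatible with the installed flask-socketio. The event names and payload
# shapes match the handlers above:
#
#     import socketio
#
#     sio = socketio.Client()
#
#     @sio.on('history_finished')
#     def on_history(payload):
#         print(f"Received {len(payload['klines_1m'])} cached klines")
#
#     @sio.on('trade')
#     def on_trade(trade):
#         # Binance @trade payloads carry price 'p' and quantity 'q'.
#         print(f"Live trade: {trade.get('p')} @ {trade.get('q')}")
#
#     sio.connect('http://localhost:5000')
#     sio.wait()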