From 5cae5dde8f8feb2454b5bbf613d72c8aca6f72b8 Mon Sep 17 00:00:00 2001 From: ditus Date: Mon, 14 Jul 2025 10:31:19 +0000 Subject: [PATCH] Upload files to "/" initial --- app.py | 221 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 app.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..ef3aa90 --- /dev/null +++ b/app.py @@ -0,0 +1,221 @@ +import time +import logging +import asyncio +import os +import json +import csv +from flask import Flask, render_template, request +from flask_socketio import SocketIO +from binance import Client +import websockets +from threading import Lock +from datetime import datetime, timedelta, timezone + +# --- Configuration --- +SYMBOL = 'ETHUSDT' +# The CSV file is now the primary source of historical data. +HISTORY_CSV_FILE = 'ETHUSDT_1m_Binance.csv' +RESTART_TIMEOUT_S = 15 +BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade" + +# --- Logging Setup --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# --- Flask App Initialization --- +app = Flask(__name__) +app.config['SECRET_KEY'] = 'secret!' +socketio = SocketIO(app, async_mode='threading') + +# --- Global State --- +app_initialized = False +app_init_lock = Lock() +# This cache will hold the filtered historical data to be sent to the frontend. +historical_data_cache = [] + +# --- Helper Function for Optimized Reading --- +def get_last_timestamp_from_csv(filepath): + """ + Efficiently reads the end of a CSV to get the timestamp from the last valid row. + This avoids reading the entire file into memory. + Returns a datetime object or None. + """ + try: + with open(filepath, 'rb') as f: + # Seek to a position near the end of the file to read a chunk. + # 4096 bytes should be enough to contain several lines. + f.seek(0, os.SEEK_END) + filesize = f.tell() + if filesize == 0: + return None + + f.seek(max(0, filesize - 4096), os.SEEK_SET) + + # Read the last part of the file + lines = f.readlines() + if not lines: + return None + + # Get the last non-empty line + last_line_str = '' + for line in reversed(lines): + decoded_line = line.decode('utf-8').strip() + if decoded_line: + last_line_str = decoded_line + break + + if not last_line_str or 'Open time' in last_line_str: + return None + + last_row = last_line_str.split(',') + dt_obj = datetime.strptime(last_row[0], '%Y-%m-%d %H:%M:%S') + return dt_obj.replace(tzinfo=timezone.utc) + + except (IOError, IndexError, ValueError) as e: + logging.error(f"Could not get last timestamp from CSV: {e}") + return None + +# --- Data Management --- +def load_and_update_data(): + """ + Loads historical data from the CSV, updates it with the latest data from Binance, + and then filters it for the frontend. + """ + global historical_data_cache + client = Client() + + # 1. Check if the primary CSV data source exists. + if not os.path.exists(HISTORY_CSV_FILE): + logging.critical(f"CRITICAL: History file '{HISTORY_CSV_FILE}' not found. Please provide the CSV file. Halting data load.") + historical_data_cache = [] + return + + # 2. OPTIMIZED: Efficiently get the last timestamp to determine where to start fetching. + last_dt_in_csv = get_last_timestamp_from_csv(HISTORY_CSV_FILE) + + start_fetch_date = None + if last_dt_in_csv: + start_fetch_date = last_dt_in_csv + timedelta(minutes=1) + logging.info(f"Last record in CSV is from {last_dt_in_csv}. Checking for new data since {start_fetch_date}.") + else: + logging.warning("Could not determine last timestamp from CSV. Assuming file is new or empty. No new data will be fetched.") + + # 3. Fetch new data from Binance. + new_klines = [] + if start_fetch_date and start_fetch_date < datetime.now(timezone.utc): + while True: + logging.info(f"Fetching new klines from {start_fetch_date}...") + fetched = client.get_historical_klines(SYMBOL, Client.KLINE_INTERVAL_1MINUTE, start_fetch_date.strftime("%Y-%m-%d %H:%M:%S")) + if not fetched: + logging.info("No new klines to fetch.") + break + + new_klines.extend(fetched) + last_fetched_dt = datetime.fromtimestamp(fetched[-1][0] / 1000, tz=timezone.utc) + start_fetch_date = last_fetched_dt + timedelta(minutes=1) + logging.info(f"Fetched {len(fetched)} new klines, up to {last_fetched_dt}.") + if len(fetched) < 1000: + break + time.sleep(0.1) + + # 4. If new data was found, append it to the CSV file. + if new_klines: + logging.info(f"Appending {len(new_klines)} new candles to {HISTORY_CSV_FILE}.") + try: + with open(HISTORY_CSV_FILE, 'a', newline='') as f: + writer = csv.writer(f) + for kline in new_klines: + open_time_dt = datetime.fromtimestamp(kline[0] / 1000, tz=timezone.utc) + open_time_str = open_time_dt.strftime('%Y-%m-%d %H:%M:%S') + close_time_dt = datetime.fromtimestamp(kline[6] / 1000, tz=timezone.utc) + close_time_str = close_time_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + writer.writerow([open_time_str] + kline[1:6] + [close_time_str] + kline[7:]) + except Exception as e: + logging.error(f"Failed to append new data to {HISTORY_CSV_FILE}: {e}") + + # 5. OPTIMIZED: Read the CSV and load only the necessary data (2025 onwards) for the frontend. + logging.info("Reading CSV to populate cache with data from 01.01.2025 onwards...") + frontend_klines = [] + frontend_start_dt = datetime(2025, 1, 1, tzinfo=timezone.utc) + + try: + with open(HISTORY_CSV_FILE, 'r', newline='') as f: + reader = csv.reader(f) + next(reader) # Skip header + for row in reader: + try: + dt_obj = datetime.strptime(row[0], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) + if dt_obj >= frontend_start_dt: + timestamp_ms = int(dt_obj.timestamp() * 1000) + frontend_klines.append([ + timestamp_ms, row[1], row[2], row[3], row[4], + "0", "0", "0", "0", "0", "0" + ]) + except (ValueError, IndexError): + continue + + historical_data_cache = frontend_klines + logging.info(f"--- Data initialization complete. {len(historical_data_cache)} candles cached for frontend. ---") + + except Exception as e: + logging.error(f"Failed to read CSV for frontend cache: {e}") + historical_data_cache = [] + + +# --- Real-time Data Listener --- +def binance_listener_thread(): + async def listener(): + while True: + try: + logging.info(f"Connecting to Binance WebSocket at {BINANCE_WS_URL}...") + async with websockets.connect(BINANCE_WS_URL) as websocket: + logging.info("Binance WebSocket connected successfully.") + while True: + message = await websocket.recv() + socketio.emit('trade', json.loads(message)) + except Exception as e: + logging.error(f"Binance listener error: {e}. Reconnecting...") + await asyncio.sleep(RESTART_TIMEOUT_S) + + asyncio.run(listener()) + +# --- SocketIO Event Handlers --- +@socketio.on('connect') +def handle_connect(): + global app_initialized + logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}") + + with app_init_lock: + if not app_initialized: + logging.info("--- First client connected, initializing application data ---") + socketio.start_background_task(load_and_update_data) + socketio.start_background_task(binance_listener_thread) + app_initialized = True + + # Wait until the cache is populated. + while not historical_data_cache: + logging.info(f"SID={request.sid} is waiting for historical data cache...") + socketio.sleep(1) + + logging.info(f"Sending {len(historical_data_cache)} cached klines to SID={request.sid}") + socketio.emit('history_finished', {'klines_1m': historical_data_cache}, to=request.sid) + + +@socketio.on('analyze_chart') +def handle_analyze_chart(data): + sid = request.sid + logging.info(f"Received 'analyze_chart' request from frontend (SID={sid})") + recent_data = data[-100:] + prompt_data = "\n".join([f"Time: {c['time']}, Open: {c['open']}, High: {c['high']}, Low: {c['low']}, Close: {c['close']}" for c in recent_data]) + prompt = (f"You are a financial analyst. Based on the following recent candlestick data for {SYMBOL}, provide a brief technical analysis (3-4 sentences). Mention the current trend and any potential short-term support or resistance levels.\n\nData:\n{prompt_data}") + socketio.emit('analysis_result', {'analysis': "AI analysis is currently unavailable."}, to=sid) + + +# --- Flask Routes --- +@app.route('/') +def index(): + return render_template('index.html', symbol=SYMBOL) + +# --- Main Application Execution --- +if __name__ == '__main__': + logging.info("Starting Flask-SocketIO server...") + socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True, debug=False)