import asyncio
import csv
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from threading import Lock

import websockets
from binance import Client
from flask import Flask, render_template, request
from flask_socketio import SocketIO

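# Third-party dependencies (assumed PyPI package names):
#   pip install flask flask-socketio python-binance websockets
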
# --- Configuration ---
SYMBOL = 'ETHUSDT'
# The CSV file is now the primary source of historical data.
HISTORY_CSV_FILE = 'ETHUSDT_1m_Binance.csv'
RESTART_TIMEOUT_S = 15
BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade"

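# Expected CSV layout (inferred from the read/write code below; the header row
# is assumed to start with 'Open time', timestamps in UTC):
#   Open time,Open,High,Low,Close,Volume,Close time,Quote asset volume,...
#   2025-01-01 00:00:00,3332.10,3333.00,3330.50,3331.20,812.40,...
# The sample row above is illustrative, not real market data.
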
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'  # NOTE: replace with a real secret in production.
socketio = SocketIO(app, async_mode='threading')

# --- Global State ---
app_initialized = False
app_init_lock = Lock()
# This cache will hold the filtered historical data to be sent to the frontend.
historical_data_cache = []

# --- Helper Function for Optimized Reading ---
def get_last_timestamp_from_csv(filepath):
    """
    Efficiently reads the end of a CSV to get the timestamp from the last valid row.
    This avoids reading the entire file into memory.
    Returns an aware (UTC) datetime object, or None.
    """
    try:
        with open(filepath, 'rb') as f:
            # Seek to a position near the end of the file to read a chunk.
            # 4096 bytes should be enough to contain several lines.
            f.seek(0, os.SEEK_END)
            filesize = f.tell()
            if filesize == 0:
                return None

            f.seek(max(0, filesize - 4096), os.SEEK_SET)

            # Read the last part of the file.
            lines = f.readlines()
            if not lines:
                return None

            # Get the last non-empty line.
            last_line_str = ''
            for line in reversed(lines):
                decoded_line = line.decode('utf-8').strip()
                if decoded_line:
                    last_line_str = decoded_line
                    break

            # A header line (or nothing at all) means the file has no data rows yet.
            if not last_line_str or 'Open time' in last_line_str:
                return None

            last_row = last_line_str.split(',')
            dt_obj = datetime.strptime(last_row[0], '%Y-%m-%d %H:%M:%S')
            return dt_obj.replace(tzinfo=timezone.utc)

    except (IOError, IndexError, ValueError) as e:
        logging.error(f"Could not get last timestamp from CSV: {e}")
        return None

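# Example (illustrative timestamp): if the last data row of the CSV begins with
# '2025-06-01 12:34:00', the helper returns
# datetime(2025, 6, 1, 12, 34, tzinfo=timezone.utc).
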
# --- Data Management ---
def load_and_update_data():
    """
    Loads historical data from the CSV, updates it with the latest data from Binance,
    and then filters it for the frontend.
    """
    global historical_data_cache
    # No API keys needed: only public market-data endpoints are used.
    client = Client()

    # 1. Check if the primary CSV data source exists.
    if not os.path.exists(HISTORY_CSV_FILE):
        logging.critical(f"CRITICAL: History file '{HISTORY_CSV_FILE}' not found. Please provide the CSV file. Halting data load.")
        historical_data_cache = []
        return

    # 2. OPTIMIZED: Efficiently get the last timestamp to determine where to start fetching.
    last_dt_in_csv = get_last_timestamp_from_csv(HISTORY_CSV_FILE)

    start_fetch_date = None
    if last_dt_in_csv:
        start_fetch_date = last_dt_in_csv + timedelta(minutes=1)
        logging.info(f"Last record in CSV is from {last_dt_in_csv}. Checking for new data since {start_fetch_date}.")
    else:
        logging.warning("Could not determine last timestamp from CSV. Assuming file is new or empty. No new data will be fetched.")

    # 3. Fetch new data from Binance.
    new_klines = []
    if start_fetch_date and start_fetch_date < datetime.now(timezone.utc):
        while True:
            logging.info(f"Fetching new klines from {start_fetch_date}...")
            fetched = client.get_historical_klines(
                SYMBOL, Client.KLINE_INTERVAL_1MINUTE,
                start_fetch_date.strftime("%Y-%m-%d %H:%M:%S"))
            if not fetched:
                logging.info("No new klines to fetch.")
                break

            new_klines.extend(fetched)
            last_fetched_dt = datetime.fromtimestamp(fetched[-1][0] / 1000, tz=timezone.utc)
            start_fetch_date = last_fetched_dt + timedelta(minutes=1)
            logging.info(f"Fetched {len(fetched)} new klines, up to {last_fetched_dt}.")
            # python-binance's get_historical_klines already paginates internally,
            # so this outer loop is mainly a safety net; a batch shorter than
            # 1000 klines means we have caught up to the present.
            if len(fetched) < 1000:
                break
            time.sleep(0.1)  # Be polite to the API between batches.

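    # Binance kline arrays are positional (per the Binance REST API docs):
    #   [0] open time (ms), [1] open, [2] high, [3] low, [4] close, [5] volume,
    #   [6] close time (ms), [7] quote asset volume, [8] number of trades,
    #   [9] taker buy base volume, [10] taker buy quote volume, [11] ignore.
    # Step 4 below rewrites indices 0 and 6 as human-readable timestamps and
    # keeps the remaining fields as-is.
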
    # 4. If new data was found, append it to the CSV file.
    if new_klines:
        logging.info(f"Appending {len(new_klines)} new candles to {HISTORY_CSV_FILE}.")
        try:
            with open(HISTORY_CSV_FILE, 'a', newline='') as f:
                writer = csv.writer(f)
                for kline in new_klines:
                    open_time_dt = datetime.fromtimestamp(kline[0] / 1000, tz=timezone.utc)
                    open_time_str = open_time_dt.strftime('%Y-%m-%d %H:%M:%S')
                    close_time_dt = datetime.fromtimestamp(kline[6] / 1000, tz=timezone.utc)
                    # Keep millisecond precision for the close time (trim %f to 3 digits).
                    close_time_str = close_time_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
                    writer.writerow([open_time_str] + kline[1:6] + [close_time_str] + kline[7:])
        except Exception as e:
            logging.error(f"Failed to append new data to {HISTORY_CSV_FILE}: {e}")

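    # NOTE: appending assumes the existing file ends with a trailing newline;
    # if it does not, the first appended row would fuse with the last existing
    # row and corrupt that line.
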
    # 5. OPTIMIZED: Read the CSV and load only the necessary data (2025 onwards) for the frontend.
    logging.info("Reading CSV to populate cache with data from 01.01.2025 onwards...")
    frontend_klines = []
    frontend_start_dt = datetime(2025, 1, 1, tzinfo=timezone.utc)

    try:
        with open(HISTORY_CSV_FILE, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader)  # Skip header.
            for row in reader:
                try:
                    dt_obj = datetime.strptime(row[0], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                    if dt_obj >= frontend_start_dt:
                        timestamp_ms = int(dt_obj.timestamp() * 1000)
                        # Only OHLC values are used by the chart; the remaining
                        # kline fields are zero-filled placeholders.
                        frontend_klines.append([
                            timestamp_ms, row[1], row[2], row[3], row[4],
                            "0", "0", "0", "0", "0", "0"
                        ])
                except (ValueError, IndexError):
                    # Skip malformed rows rather than aborting the whole load.
                    continue

        historical_data_cache = frontend_klines
        logging.info(f"--- Data initialization complete. {len(historical_data_cache)} candles cached for frontend. ---")

    except Exception as e:
        logging.error(f"Failed to read CSV for frontend cache: {e}")
        historical_data_cache = []

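# Each cached entry mirrors the Binance kline shape. Illustrative example
# (prices are made up; 1735689600000 ms = 2025-01-01 00:00:00 UTC):
#   [1735689600000, '3332.10', '3333.00', '3330.50', '3331.20',
#    '0', '0', '0', '0', '0', '0']
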
# --- Real-time Data Listener ---
def binance_listener_thread():
    async def listener():
        while True:
            try:
                logging.info(f"Connecting to Binance WebSocket at {BINANCE_WS_URL}...")
                async with websockets.connect(BINANCE_WS_URL) as websocket:
                    logging.info("Binance WebSocket connected successfully.")
                    while True:
                        message = await websocket.recv()
                        # Forward each raw trade event to all connected clients.
                        socketio.emit('trade', json.loads(message))
            except Exception as e:
                logging.error(f"Binance listener error: {e}. Reconnecting...")
                await asyncio.sleep(RESTART_TIMEOUT_S)

    asyncio.run(listener())

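# The messages forwarded above are Binance trade events; per the Binance
# WebSocket docs the JSON includes fields such as 'p' (price), 'q' (quantity)
# and 'T' (trade time in ms), which a charting frontend can use to update the
# live candle.
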
# --- SocketIO Event Handlers ---
@socketio.on('connect')
def handle_connect():
    global app_initialized
    logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}")

    # Lazy initialization: the first client to connect kicks off the data load
    # and the Binance listener; the lock guards against concurrent first connects.
    with app_init_lock:
        if not app_initialized:
            logging.info("--- First client connected, initializing application data ---")
            socketio.start_background_task(load_and_update_data)
            socketio.start_background_task(binance_listener_thread)
            app_initialized = True

    # Wait until the cache is populated. Note: if the history CSV is missing,
    # the cache stays empty and this loop will wait indefinitely.
    while not historical_data_cache:
        logging.info(f"SID={request.sid} is waiting for historical data cache...")
        socketio.sleep(1)

    logging.info(f"Sending {len(historical_data_cache)} cached klines to SID={request.sid}")
    socketio.emit('history_finished', {'klines_1m': historical_data_cache}, to=request.sid)

@socketio.on('analyze_chart')
def handle_analyze_chart(data):
    sid = request.sid
    logging.info(f"Received 'analyze_chart' request from frontend (SID={sid})")
    # Build a prompt from the 100 most recent candles. The prompt is prepared
    # for a future AI integration but is not sent anywhere yet; the handler
    # currently returns a stub response.
    recent_data = data[-100:]
    prompt_data = "\n".join(
        f"Time: {c['time']}, Open: {c['open']}, High: {c['high']}, Low: {c['low']}, Close: {c['close']}"
        for c in recent_data
    )
    prompt = (
        f"You are a financial analyst. Based on the following recent candlestick data for {SYMBOL}, "
        f"provide a brief technical analysis (3-4 sentences). Mention the current trend and any "
        f"potential short-term support or resistance levels.\n\nData:\n{prompt_data}"
    )
    socketio.emit('analysis_result', {'analysis': "AI analysis is currently unavailable."}, to=sid)

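# To wire up a real model in handle_analyze_chart, the stub emit could be
# replaced with something like the sketch below (call_llm is a hypothetical
# helper, not part of this project):
#   analysis = call_llm(prompt)
#   socketio.emit('analysis_result', {'analysis': analysis}, to=sid)
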
# --- Flask Routes ---
@app.route('/')
def index():
    return render_template('index.html', symbol=SYMBOL)

# --- Main Application Execution ---
if __name__ == '__main__':
    logging.info("Starting Flask-SocketIO server...")
    # allow_unsafe_werkzeug lets Flask-SocketIO run on the Werkzeug dev server;
    # it is intended for development, not production.
    socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True, debug=False)