first hurst working, only for current TF

2025-07-14 21:45:48 +02:00
parent f8064f2f44
commit 0c3c9ecd81
4 changed files with 103 additions and 225 deletions

app.py (186 changed lines)

@@ -3,18 +3,16 @@ import logging
import asyncio
import os
import json
import csv
from flask import Flask, render_template, request
from flask_socketio import SocketIO
from binance import Client
import websockets
from threading import Lock
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
# --- Configuration ---
SYMBOL = 'ETHUSDT'
# The CSV file is now the primary source of historical data.
HISTORY_CSV_FILE = 'ETHUSDT_1m_Binance.csv'
HISTORY_FILE = 'historical_data_1m.json' # Used as a cache to prevent re-downloading
RESTART_TIMEOUT_S = 15
BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade"
@@ -29,136 +27,57 @@ socketio = SocketIO(app, async_mode='threading')
# --- Global State ---
app_initialized = False
app_init_lock = Lock()
# This cache will hold the filtered historical data to be sent to the frontend.
historical_data_cache = []
# --- Helper Function for Optimized Reading ---
def get_last_timestamp_from_csv(filepath):
# --- Historical Data Streaming ---
def stream_historical_data(sid):
"""
Efficiently reads the end of a CSV to get the timestamp from the last valid row.
This avoids reading the entire file into memory.
Returns a datetime object or None.
Fetches historical data in chunks and streams it to the client with progress updates.
"""
try:
with open(filepath, 'rb') as f:
# Seek to a position near the end of the file to read a chunk.
# 4096 bytes should be enough to contain several lines.
f.seek(0, os.SEEK_END)
filesize = f.tell()
if filesize == 0:
return None
f.seek(max(0, filesize - 4096), os.SEEK_SET)
# Read the last part of the file
lines = f.readlines()
if not lines:
return None
# Get the last non-empty line
last_line_str = ''
for line in reversed(lines):
decoded_line = line.decode('utf-8').strip()
if decoded_line:
last_line_str = decoded_line
break
if not last_line_str or 'Open time' in last_line_str:
return None
last_row = last_line_str.split(',')
dt_obj = datetime.strptime(last_row[0], '%Y-%m-%d %H:%M:%S')
return dt_obj.replace(tzinfo=timezone.utc)
except (IOError, IndexError, ValueError) as e:
logging.error(f"Could not get last timestamp from CSV: {e}")
return None
# --- Data Management ---
def load_and_update_data():
"""
Loads historical data from the CSV, updates it with the latest data from Binance,
and then filters it for the frontend.
"""
global historical_data_cache
client = Client()
# 1. Check if the primary CSV data source exists.
if not os.path.exists(HISTORY_CSV_FILE):
logging.critical(f"CRITICAL: History file '{HISTORY_CSV_FILE}' not found. Please provide the CSV file. Halting data load.")
historical_data_cache = []
return
# 2. OPTIMIZED: Efficiently get the last timestamp to determine where to start fetching.
last_dt_in_csv = get_last_timestamp_from_csv(HISTORY_CSV_FILE)
start_fetch_date = None
if last_dt_in_csv:
start_fetch_date = last_dt_in_csv + timedelta(minutes=1)
logging.info(f"Last record in CSV is from {last_dt_in_csv}. Checking for new data since {start_fetch_date}.")
else:
logging.warning("Could not determine last timestamp from CSV. Assuming file is new or empty. No new data will be fetched.")
# 3. Fetch new data from Binance.
new_klines = []
if start_fetch_date and start_fetch_date < datetime.now(timezone.utc):
while True:
logging.info(f"Fetching new klines from {start_fetch_date}...")
fetched = client.get_historical_klines(SYMBOL, Client.KLINE_INTERVAL_1MINUTE, start_fetch_date.strftime("%Y-%m-%d %H:%M:%S"))
if not fetched:
logging.info("No new klines to fetch.")
break
new_klines.extend(fetched)
last_fetched_dt = datetime.fromtimestamp(fetched[-1][0] / 1000, tz=timezone.utc)
start_fetch_date = last_fetched_dt + timedelta(minutes=1)
logging.info(f"Fetched {len(fetched)} new klines, up to {last_fetched_dt}.")
if len(fetched) < 1000:
break
time.sleep(0.1)
    # 4. If new data was found, append it to the CSV file.
    if new_klines:
        logging.info(f"Appending {len(new_klines)} new candles to {HISTORY_CSV_FILE}.")
        try:
            with open(HISTORY_CSV_FILE, 'a', newline='') as f:
                writer = csv.writer(f)
                for kline in new_klines:
                    open_time_dt = datetime.fromtimestamp(kline[0] / 1000, tz=timezone.utc)
                    open_time_str = open_time_dt.strftime('%Y-%m-%d %H:%M:%S')
                    close_time_dt = datetime.fromtimestamp(kline[6] / 1000, tz=timezone.utc)
                    close_time_str = close_time_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
                    writer.writerow([open_time_str] + kline[1:6] + [close_time_str] + kline[7:])
        except Exception as e:
            logging.error(f"Failed to append new data to {HISTORY_CSV_FILE}: {e}")
    # 5. OPTIMIZED: Read the CSV and load only the necessary data (2025 onwards) for the frontend.
    logging.info("Reading CSV to populate cache with data from 01.01.2025 onwards...")
    frontend_klines = []
    frontend_start_dt = datetime(2025, 1, 1, tzinfo=timezone.utc)
    try:
        with open(HISTORY_CSV_FILE, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader) # Skip header
            for row in reader:
                try:
                    dt_obj = datetime.strptime(row[0], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                    if dt_obj >= frontend_start_dt:
                        timestamp_ms = int(dt_obj.timestamp() * 1000)
                        frontend_klines.append([
                            timestamp_ms, row[1], row[2], row[3], row[4],
                            "0", "0", "0", "0", "0", "0"
                        ])
                except (ValueError, IndexError):
                    continue
logging.info(f"Starting historical data stream for SID={sid}")
client = Client()
historical_data_cache = frontend_klines
logging.info(f"--- Data initialization complete. {len(historical_data_cache)} candles cached for frontend. ---")
# Fetch the last 90 days of data in 6 chunks of 15 days each.
num_chunks = 6
chunk_size_days = 15
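        # 6 chunks x 15 days = 90 days of 1-minute klines (roughly 6 * 15 * 1440 = 129,600 candles).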
        end_date = datetime.utcnow()
        all_klines = []
        for i in range(num_chunks):
            start_date = end_date - timedelta(days=chunk_size_days)
            logging.info(f"Fetching chunk {i + 1}/{num_chunks} ({start_date} to {end_date}) for SID={sid}")
            new_klines = client.get_historical_klines(SYMBOL, Client.KLINE_INTERVAL_1MINUTE, str(start_date), str(end_date))
            if new_klines:
                all_klines.extend(new_klines)
            progress_payload = {
                'progress': ((i + 1) / num_chunks) * 100
            }
            socketio.emit('history_progress', progress_payload, to=sid)
            end_date = start_date
            socketio.sleep(0.05)
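        # Chunks are fetched newest-first and can overlap at their shared boundary,
        # so sort by open time and drop exact duplicates before sending.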
        seen = set()
        unique_klines = []
        for kline in sorted(all_klines, key=lambda x: x[0]):
            kline_tuple = tuple(kline)
            if kline_tuple not in seen:
                unique_klines.append(kline)
                seen.add(kline_tuple)
        with open(HISTORY_FILE, 'w') as f:
            json.dump(unique_klines, f)
        logging.info(f"Finished data stream for SID={sid}. Sending final payload of {len(unique_klines)} klines.")
        socketio.emit('history_finished', {'klines_1m': unique_klines}, to=sid)
    except Exception as e:
        logging.error(f"Failed to read CSV for frontend cache: {e}")
        historical_data_cache = []
        logging.error(f"Error in stream_historical_data for SID={sid}: {e}", exc_info=True)
        socketio.emit('history_error', {'message': str(e)}, to=sid)
# --- Real-time Data Listener ---
@@ -183,21 +102,12 @@ def binance_listener_thread():
def handle_connect():
    global app_initialized
    logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}")
    with app_init_lock:
        if not app_initialized:
            logging.info("--- First client connected, initializing application data ---")
            socketio.start_background_task(load_and_update_data)
            logging.info("--- Initializing Application ---")
            socketio.start_background_task(binance_listener_thread)
            app_initialized = True
    # Wait until the cache is populated.
    while not historical_data_cache:
        logging.info(f"SID={request.sid} is waiting for historical data cache...")
        socketio.sleep(1)
    logging.info(f"Sending {len(historical_data_cache)} cached klines to SID={request.sid}")
    socketio.emit('history_finished', {'klines_1m': historical_data_cache}, to=request.sid)
    socketio.start_background_task(target=stream_historical_data, sid=request.sid)
@socketio.on('analyze_chart')
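
For reference, the new stream_historical_data path defines a small Socket.IO protocol: 'history_progress' carries a completion percentage, 'history_finished' carries the final 'klines_1m' list, and 'history_error' carries an error message. The sketch below shows one way a client could consume these events; it is illustrative only and not part of this commit, and it assumes the python-socketio package and a server reachable at http://localhost:5000 (the actual frontend lives in the project's browser templates, which are not shown here).

# Minimal sketch of a consumer for the events emitted above (assumed setup, see note).
import socketio

sio = socketio.Client()

@sio.on('history_progress')
def on_progress(data):
    # Server reports the percentage of fetch chunks completed.
    print(f"history: {data['progress']:.0f}% loaded")

@sio.on('history_finished')
def on_finished(data):
    # Final payload: the deduplicated 1-minute klines for roughly the last 90 days.
    print(f"received {len(data['klines_1m'])} klines")

@sio.on('history_error')
def on_error(data):
    print(f"history stream failed: {data['message']}")

sio.connect('http://localhost:5000')  # connecting triggers stream_historical_data on the server
sio.wait()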