import logging
import asyncio
import os
import json
import csv
import re

from flask import Flask, render_template, request
from flask_socketio import SocketIO
from binance import Client
import websockets
from threading import Lock
from datetime import datetime, timedelta

# --- Configuration ---
SYMBOL = 'ETHUSDT'
HISTORY_FILE = 'historical_data_1m.json'  # (not referenced below)
DATA_FOLDER = 'data'
USER_PREFERENCES_FILE = 'user_preferences.json'
RESTART_TIMEOUT_S = 15
BINANCE_WS_URL = f"wss://stream.binance.com:9443/ws/{SYMBOL.lower()}@trade"
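
# The @trade stream delivers one JSON message per trade. Only 'p' (price, a
# string) and 'T' (trade time, ms since epoch) are used below; a message looks
# roughly like this (illustrative values):
#   {"e": "trade", "s": "ETHUSDT", "p": "3050.12", "q": "0.5",
#    "T": 1700000075000, "m": true}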

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'  # NOTE: use a real secret (e.g. from an env var) outside local testing
socketio = SocketIO(app, async_mode='threading')

# --- Global State ---
app_initialized = False
app_init_lock = Lock()
current_bar = {}  # The currently forming 1-minute candle
selected_csv_file = None  # Currently selected CSV file
csv_file_lock = Lock()  # Guards CSV reads/writes across threads

# --- Utility Functions ---
def get_available_csv_files():
    """Get the list of available CSV files with their start dates."""
    csv_files = []
    if not os.path.exists(DATA_FOLDER):
        os.makedirs(DATA_FOLDER)
        return csv_files

    for filename in os.listdir(DATA_FOLDER):
        if filename.endswith('.csv') and SYMBOL in filename:
            # Extract the date from a filename like ETHUSDT_20250101.csv
            match = re.search(r'(\d{8})', filename)
            if match:
                date_str = match.group(1)
                try:
                    start_date = datetime.strptime(date_str, '%Y%m%d')
                    file_path = os.path.join(DATA_FOLDER, filename)
                    file_size = os.path.getsize(file_path)
                    csv_files.append({
                        'filename': filename,
                        'start_date_str': start_date.strftime('%Y-%m-%d'),
                        'date_str': date_str,
                        'size': file_size,
                        'display_name': f"{start_date.strftime('%Y-%m-%d')} ({filename})"
                    })
                    logging.info(f"Found CSV file: {filename}, size: {file_size}, date: {date_str}")
                except ValueError:
                    logging.warning(f"Could not parse date from filename: {filename}")
                    continue

    # Sort by start date (newest first)
    csv_files.sort(key=lambda x: x['date_str'], reverse=True)
    logging.info(f"Available CSV files: {[f['filename'] for f in csv_files]}")
    return csv_files
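
# Illustrative shape of one entry returned by get_available_csv_files()
# (values made up):
#   {'filename': 'ETHUSDT_20250101.csv', 'start_date_str': '2025-01-01',
#    'date_str': '20250101', 'size': 83214,
#    'display_name': '2025-01-01 (ETHUSDT_20250101.csv)'}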

def get_default_csv_file():
    """Get the default CSV file (the last one used, else the smallest one)."""
    # Try the last used file first
    if os.path.exists(USER_PREFERENCES_FILE):
        try:
            with open(USER_PREFERENCES_FILE, 'r') as f:
                prefs = json.load(f)
            last_file = prefs.get('last_csv_file')
            if last_file and os.path.exists(os.path.join(DATA_FOLDER, last_file)):
                logging.info(f"Using last selected file: {last_file}")
                return last_file
        except (OSError, json.JSONDecodeError):
            logging.warning(f"Could not read {USER_PREFERENCES_FILE}; falling back to smallest file")

    # Fall back to the smallest file
    csv_files = get_available_csv_files()
    if csv_files:
        # Exclude the large Binance export for better performance
        filtered_files = [f for f in csv_files if not f['filename'].endswith('_Binance.csv')]
        if filtered_files:
            smallest_file = min(filtered_files, key=lambda x: x['size'])
            logging.info(f"Using smallest filtered file: {smallest_file['filename']} ({smallest_file['size']} bytes)")
        else:
            smallest_file = min(csv_files, key=lambda x: x['size'])
            logging.info(f"Using smallest file: {smallest_file['filename']} ({smallest_file['size']} bytes)")
        return smallest_file['filename']

    logging.warning("No CSV files found")
    return None

def save_user_preference(csv_filename):
    """Save the user's CSV file preference."""
    prefs = {}
    if os.path.exists(USER_PREFERENCES_FILE):
        try:
            with open(USER_PREFERENCES_FILE, 'r') as f:
                prefs = json.load(f)
        except (OSError, json.JSONDecodeError):
            pass  # Start from an empty preferences dict

    prefs['last_csv_file'] = csv_filename
    with open(USER_PREFERENCES_FILE, 'w') as f:
        json.dump(prefs, f)
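
# After a save, user_preferences.json contains e.g. (filename illustrative):
#   {"last_csv_file": "ETHUSDT_20250101.csv"}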

def read_csv_data(csv_filename):
    """Read historical data from a CSV file."""
    csv_path = os.path.join(DATA_FOLDER, csv_filename)
    if not os.path.exists(csv_path):
        return []

    klines = []
    try:
        with open(csv_path, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                # Convert the CSV row to kline format
                open_time = datetime.strptime(row['Open time'], '%Y-%m-%d %H:%M:%S')
                close_time = datetime.strptime(row['Close time'].split('.')[0], '%Y-%m-%d %H:%M:%S')

                # Convert the string values from the CSV to numeric types.
                # Passing the raw strings through caused the historical data
                # to be misinterpreted by the chart; converting to float/int
                # here keeps the data consistent.
                kline = [
                    int(open_time.timestamp() * 1000),           # Open time (ms)
                    float(row['Open']),                          # Open
                    float(row['High']),                          # High
                    float(row['Low']),                           # Low
                    float(row['Close']),                         # Close
                    float(row['Volume']),                        # Volume
                    int(close_time.timestamp() * 1000),          # Close time (ms)
                    float(row['Quote asset volume']),            # Quote asset volume
                    int(row['Number of trades']),                # Number of trades
                    float(row['Taker buy base asset volume']),   # Taker buy base asset volume
                    float(row['Taker buy quote asset volume']),  # Taker buy quote asset volume
                    float(row['Ignore'])                         # Ignore
                ]
                klines.append(kline)
    except Exception as e:
        logging.error(f"Error reading CSV file {csv_filename}: {e}")
        return []

    return klines
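
# This 12-field layout matches the positional layout of python-binance's
# get_historical_klines() output (which returns prices as strings), so rows
# read from CSV and klines fetched from the API can be merged into one list,
# as fill_missing_data() below does.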

def append_to_csv(csv_filename, candle_data):
    """Append a newly closed candle to the CSV file."""
    csv_path = os.path.join(DATA_FOLDER, csv_filename)

    try:
        with csv_file_lock:
            # candle_data['time'] is an epoch timestamp in seconds
            open_time = datetime.fromtimestamp(candle_data['time'])
            close_time = open_time.replace(second=59, microsecond=999000)

            row = [
                open_time.strftime('%Y-%m-%d %H:%M:%S'),
                candle_data['open'],
                candle_data['high'],
                candle_data['low'],
                candle_data['close'],
                0.0,  # Volume (placeholder)
                close_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
                0.0,  # Quote asset volume (placeholder)
                1,    # Number of trades (placeholder)
                0.0,  # Taker buy base asset volume (placeholder)
                0.0,  # Taker buy quote asset volume (placeholder)
                0.0   # Ignore
            ]

            # Write a header first if the file is new
            file_exists = os.path.exists(csv_path)
            with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                if not file_exists:
                    headers = [
                        'Open time', 'Open', 'High', 'Low', 'Close', 'Volume',
                        'Close time', 'Quote asset volume', 'Number of trades',
                        'Taker buy base asset volume', 'Taker buy quote asset volume', 'Ignore'
                    ]
                    writer.writerow(headers)
                writer.writerow(row)

    except Exception as e:
        logging.error(f"Error appending to CSV file {csv_filename}: {e}")
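
# A written row looks like this (prices illustrative; the volume and trade
# fields are placeholders because the trade handler does not accumulate them):
#   2025-01-01 00:01:00,3050.1,3052.4,3049.8,3051.9,0.0,2025-01-01 00:01:59.999,0.0,1,0.0,0.0,0.0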

def fill_missing_data(csv_filename):
    """Fill gaps in the CSV by downloading missing klines from Binance."""
    existing_data = []
    try:
        logging.info(f"Checking for missing data in {csv_filename}")

        # Get the start date from the filename
        match = re.search(r'(\d{8})', csv_filename)
        if not match:
            # No date in the filename; return whatever the file already holds
            return read_csv_data(csv_filename)

        date_str = match.group(1)
        start_date = datetime.strptime(date_str, '%Y%m%d')

        # Read existing data
        existing_data = read_csv_data(csv_filename)

        # Determine what data we need to fetch
        if existing_data:
            # Resume one minute after the last stored candle
            last_timestamp = existing_data[-1][0] // 1000  # ms -> seconds
            fetch_start = datetime.fromtimestamp(last_timestamp) + timedelta(minutes=1)
        else:
            fetch_start = start_date

        # Fetch missing data up to the current time
        now = datetime.now()
        if fetch_start >= now:
            logging.info(f"No missing data for {csv_filename}")
            return existing_data

        logging.info(f"Fetching missing data from {fetch_start} to {now}")

        client = Client()
        missing_klines = client.get_historical_klines(
            SYMBOL,
            Client.KLINE_INTERVAL_1MINUTE,
            start_str=fetch_start.strftime('%Y-%m-%d %H:%M:%S'),
            end_str=now.strftime('%Y-%m-%d %H:%M:%S')
        )

        if missing_klines:
            # Append the missing klines to the CSV
            csv_path = os.path.join(DATA_FOLDER, csv_filename)
            with csv_file_lock:
                with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
                    writer = csv.writer(csvfile)
                    for kline in missing_klines:
                        open_time = datetime.fromtimestamp(kline[0] / 1000)
                        close_time = datetime.fromtimestamp(kline[6] / 1000)
                        row = [
                            open_time.strftime('%Y-%m-%d %H:%M:%S'),
                            kline[1], kline[2], kline[3], kline[4], kline[5],
                            close_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
                            kline[7], kline[8], kline[9], kline[10], kline[11]
                        ]
                        writer.writerow(row)

            logging.info(f"Added {len(missing_klines)} missing candles to {csv_filename}")
            existing_data.extend(missing_klines)

        return existing_data

    except Exception as e:
        logging.error(f"Error filling missing data for {csv_filename}: {e}")
        return existing_data
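
# Note: Client() is constructed without API keys; python-binance's public
# market-data endpoints, including get_historical_klines(), work
# unauthenticated.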

# --- Historical Data Streaming ---
def stream_historical_data(sid):
    """Load historical data from the selected CSV file and send it to the client."""
    global selected_csv_file

    try:
        logging.info(f"Starting historical data stream for SID={sid}")

        # Use the selected CSV file, else the default
        if not selected_csv_file:
            selected_csv_file = get_default_csv_file()

        if not selected_csv_file:
            # No CSV files available; start a new one for today
            logging.warning("No CSV files available, creating default file")
            selected_csv_file = f"{SYMBOL}_{datetime.now().strftime('%Y%m%d')}.csv"

        logging.info(f"Using CSV file: {selected_csv_file}")

        # Fill missing data and get all klines
        all_klines = fill_missing_data(selected_csv_file)

        # Send progress update
        socketio.emit('history_progress', {'progress': 100}, to=sid)

        logging.info(f"Finished data stream for SID={sid}. Sending final payload of {len(all_klines)} klines.")
        socketio.emit('history_finished', {'klines_1m': all_klines}, to=sid)

    except Exception as e:
        logging.error(f"Error in stream_historical_data for SID={sid}: {e}", exc_info=True)
        socketio.emit('history_error', {'message': str(e)}, to=sid)
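
# History-stream Socket.IO events emitted above (each targeted at one SID):
#   history_progress  {'progress': 100}     (only a final 100 is sent here)
#   history_finished  {'klines_1m': [...]}  full kline payload
#   history_error     {'message': str}      error description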

# --- Real-time Data Listener ---
def binance_listener_thread():
    """Connect to Binance, manage the forming 1-minute candle, and emit updates."""

    async def listener():
        global current_bar
        while True:
            try:
                logging.info(f"Connecting to Binance WebSocket at {BINANCE_WS_URL}...")
                async with websockets.connect(BINANCE_WS_URL) as websocket:
                    logging.info("Binance WebSocket connected successfully.")
                    while True:
                        message = await websocket.recv()
                        trade = json.loads(message)

                        price = float(trade['p'])
                        trade_time_s = trade['T'] // 1000
                        # Floor the trade time to the start of its minute
                        candle_timestamp = trade_time_s - (trade_time_s % 60)

                        if not current_bar or candle_timestamp > current_bar.get("time", 0):
                            if current_bar:
                                # The previous candle is now closed: emit it and save it to CSV
                                logging.info(f"Candle closed at {current_bar['close']}. Emitting 'candle_closed' event.")
                                socketio.emit('candle_closed', current_bar)

                                # Append to the selected CSV file
                                if selected_csv_file:
                                    append_to_csv(selected_csv_file, current_bar)

                            current_bar = {"time": candle_timestamp, "open": price, "high": price, "low": price, "close": price}
                        else:
                            current_bar['high'] = max(current_bar.get('high', price), price)
                            current_bar['low'] = min(current_bar.get('low', price), price)
                            current_bar['close'] = price

                        # Emit the live, updating candle for visual feedback
                        socketio.emit('candle_update', current_bar)

            except Exception as e:
                logging.error(f"Binance listener error: {e}. Reconnecting in {RESTART_TIMEOUT_S}s...")
                await asyncio.sleep(RESTART_TIMEOUT_S)

    asyncio.run(listener())
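
# Worked example of the minute bucketing above (illustrative timestamp):
#   trade_time_s      = 1700000075                      # 2023-11-14 22:14:35 UTC
#   trade_time_s % 60 = 35
#   candle_timestamp  = 1700000075 - 35 = 1700000040    # 22:14:00, the bar's open time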

# --- SocketIO Event Handlers ---
@socketio.on('connect')
def handle_connect():
    global app_initialized
    logging.info(f"Client connected: IP={request.remote_addr}, SID={request.sid}")
    with app_init_lock:
        # Start the Binance listener exactly once, on the first connection
        if not app_initialized:
            logging.info("--- Initializing Application ---")
            socketio.start_background_task(binance_listener_thread)
            app_initialized = True
    socketio.start_background_task(target=stream_historical_data, sid=request.sid)

@socketio.on('get_csv_files')
def handle_get_csv_files():
    """Send the list of available CSV files to the requesting client."""
    logging.info(f"Received get_csv_files request from SID={request.sid}")
    csv_files = get_available_csv_files()
    default_file = get_default_csv_file()
    logging.info(f"Sending CSV files list: {len(csv_files)} files, default: {default_file}")
    socketio.emit('csv_files_list', {
        'files': csv_files,
        'selected': default_file
    }, to=request.sid)
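
# The 'csv_files_list' payload mirrors get_available_csv_files() plus the
# current default, e.g. {'files': [...], 'selected': 'ETHUSDT_20250101.csv'}.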

@socketio.on('select_csv_file')
def handle_select_csv_file(data):
    """Handle CSV file selection by the user."""
    global selected_csv_file

    logging.info(f"Received select_csv_file request from SID={request.sid} with data: {data}")
    filename = data.get('filename')
    if filename:
        csv_files = get_available_csv_files()
        valid_files = [f['filename'] for f in csv_files]

        if filename in valid_files:
            selected_csv_file = filename
            save_user_preference(filename)
            logging.info(f"User selected CSV file: {filename}")

            # Re-stream historical data from the newly selected file
            socketio.start_background_task(target=stream_historical_data, sid=request.sid)
        else:
            logging.error(f"Invalid CSV file selected: {filename}")
            socketio.emit('error', {'message': f'Invalid CSV file: {filename}'}, to=request.sid)

# --- Flask Routes ---
@app.route('/')
def index():
    return render_template('index.html', symbol=SYMBOL)

# --- Main Application Execution ---
if __name__ == '__main__':
    logging.info("Starting Flask-SocketIO server...")
    socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True, debug=False)
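
# To run locally (assuming this file is saved as app.py and a
# templates/index.html exists for the chart front end):
#   pip install flask flask-socketio python-binance websockets
#   python app.py
# then open http://localhost:5000 in a browser.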