# hyper/live_candle_fetcher.py
import argparse
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
from queue import Queue
from threading import Thread

from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging

class LiveCandleFetcher:
    """
    Connects to Hyperliquid to maintain a complete and up-to-date database of
    1-minute candles, using a robust producer-consumer architecture to prevent
    data corruption and duplication.
    """

    def __init__(self, log_level: str, coins: list):
        setup_logging(log_level, 'LiveCandleFetcher')
        self.db_path = os.path.join("_data", "market_data.db")
        # sqlite3.connect() cannot create missing directories, so ensure _data exists.
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        self.coins_to_watch = set(coins)
        if not self.coins_to_watch:
            logging.error("No coins provided to watch. Exiting.")
            sys.exit(1)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=False)
        self.candle_queue = Queue()  # Thread-safe queue shared by producers and the DB writer
        self._ensure_tables_exist()
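
    # Data flow (producer-consumer):
    #   on_message / _fetch_historical_candles (producers) -> candle_queue -> _database_writer_thread (consumer)
    # A single writer thread serializes all SQLite writes, so concurrent WebSocket
    # callbacks can never interleave partial writes or contend for a locked database.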

    def _ensure_tables_exist(self):
        """
        Ensures that all necessary tables are created with the correct schema and PRIMARY KEY.
        If a table exists with an incorrect schema, it attempts to migrate the data.
        """
        with sqlite3.connect(self.db_path) as conn:
            for coin in self.coins_to_watch:
                table_name = f"{coin}_1m"
                cursor = conn.cursor()
                cursor.execute(f"PRAGMA table_info('{table_name}')")
                columns = cursor.fetchall()
                if columns:
                    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
                    pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                    if not pk_found:
                        logging.warning(f"Schema migration needed for table '{table_name}': 'timestamp_ms' is not the PRIMARY KEY.")
                        logging.warning("Attempting to automatically rebuild the table...")
                        try:
                            # 1. Rename old table
                            conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
                            logging.info(f" -> Renamed existing table to '{table_name}_old'.")
                            # 2. Create new table with correct schema
                            self._create_candle_table(conn, table_name)
                            logging.info(f" -> Created new '{table_name}' table with correct schema.")
                            # 3. Copy unique data from old table to new table
                            conn.execute(f'''
                                INSERT OR IGNORE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                                SELECT datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades
                                FROM "{table_name}_old"
                            ''')
                            conn.commit()
                            logging.info(" -> Copied data to new table.")
                            # 4. Drop the old table
                            conn.execute(f'DROP TABLE "{table_name}_old"')
                            logging.info(f" -> Removed old table. Migration for '{table_name}' complete.")
                        except Exception as e:
                            logging.error(f"FATAL: Automatic schema migration for '{table_name}' failed: {e}")
                            logging.error("Please delete the database file '_data/market_data.db' manually and restart.")
                            sys.exit(1)
                else:
                    # If the table does not exist, create it
                    self._create_candle_table(conn, table_name)
        logging.info("Database tables verified.")

    def _create_candle_table(self, conn, table_name: str):
        """Creates a new candle table with the correct schema."""
        conn.execute(f'''
            CREATE TABLE "{table_name}" (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume REAL,
                number_of_trades INTEGER
            )
        ''')
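
    # Making timestamp_ms the PRIMARY KEY is what lets the writer use
    # INSERT OR REPLACE: re-delivered candles (e.g. an in-progress 1m candle
    # updating several times before it closes) overwrite the existing row
    # instead of piling up duplicates.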

    def on_message(self, message):
        """
        Callback function to process incoming candle messages. This is the "Producer".
        It puts the raw message onto the queue for the DB writer.
        """
        try:
            if message.get("channel") == "candle":
                candle_data = message.get("data", {})
                if candle_data:
                    self.candle_queue.put(candle_data)
        except Exception as e:
            logging.error(f"Error in on_message: {e}")

    def _database_writer_thread(self):
        """
        This is the "Consumer" thread. It runs forever, pulling candles from the
        queue and writing them to the database, ensuring all writes are serial.
        """
        while True:
            try:
                candle = self.candle_queue.get()
                if candle is None:  # A signal to stop the thread
                    break
                coin = candle.get('coin')
                if not coin:
                    continue
                table_name = f"{coin}_1m"
                record = (
                    datetime.fromtimestamp(candle['t'] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
                    candle['t'],
                    candle.get('o'), candle.get('h'), candle.get('l'), candle.get('c'),
                    candle.get('v'), candle.get('n')
                )
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute(f'''
                        INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', record)
                    conn.commit()
                logging.debug(f"Upserted candle for {coin} at {record[0]}")
            except Exception as e:
                logging.error(f"Error in database writer thread: {e}")

    def _get_last_timestamp_from_db(self, coin: str) -> int | None:
        """Gets the most recent millisecond timestamp from a coin's 1m table, or None if the table is empty."""
        table_name = f"{coin}_1m"
        try:
            with sqlite3.connect(self.db_path) as conn:
                result = conn.execute(f'SELECT MAX(timestamp_ms) FROM "{table_name}"').fetchone()
                return int(result[0]) if result and result[0] is not None else None
        except Exception as e:
            logging.error(f"Could not read last timestamp from table '{table_name}': {e}")
            return None

    def _fetch_historical_candles(self, coin: str, start_ms: int, end_ms: int):
        """Fetches historical candles and puts them on the queue for the writer."""
        logging.info(f"Fetching historical data for {coin} from {datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)}...")
        # One HTTP-only client for the whole backfill, so the shared WebSocket client is untouched.
        http_info = Info(constants.MAINNET_API_URL, skip_ws=True)
        current_start = start_ms
        while current_start < end_ms:
            try:
                batch = http_info.candles_snapshot(coin, "1m", current_start, end_ms)
                if not batch:
                    break
                for candle in batch:
                    candle['coin'] = coin  # Tag each candle so the writer knows its table
                    self.candle_queue.put(candle)
                last_ts = batch[-1]['t']
                if last_ts < current_start:  # Guard against a non-advancing response
                    break
                current_start = last_ts + 1
                time.sleep(0.5)  # Be gentle with the API between chunks
            except Exception as e:
                logging.error(f"Error fetching historical chunk for {coin}: {e}")
                break
        logging.info(f"Historical data fetching for {coin} is complete.")

    def run(self):
        """
        Starts the database writer, catches up on historical data, then
        subscribes to the WebSocket for live updates.
        """
        db_writer = Thread(target=self._database_writer_thread, daemon=True)
        db_writer.start()

        logging.info("--- Starting Historical Data Catch-Up Phase ---")
        now_ms = int(time.time() * 1000)
        for coin in self.coins_to_watch:
            last_ts = self._get_last_timestamp_from_db(coin)
            # Resume one minute after the last stored candle, or backfill 7 days on a fresh table.
            start_ts = last_ts + 60000 if last_ts else now_ms - (7 * 24 * 60 * 60 * 1000)
            if start_ts < now_ms:
                self._fetch_historical_candles(coin, start_ts, now_ms)

        logging.info("--- Historical Catch-Up Complete. Starting Live WebSocket Feed ---")
        for coin in self.coins_to_watch:
            # Bind 'coin' as a default argument so each subscription gets its own
            # callback; the lambda injects the coin into the message data for the writer.
            callback = lambda msg, c=coin: self.on_message({**msg, 'data': {**msg.get('data', {}), 'coin': c}})
            subscription = {"type": "candle", "coin": coin, "interval": "1m"}
            self.info.subscribe(subscription, callback)
            logging.info(f"Subscribed to 1m candles for {coin}")
            time.sleep(0.2)

        print("\nListening for live candle data... Press Ctrl+C to stop.")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping WebSocket listener...")
            self.info.ws_manager.stop()
            # Enqueue the sentinel after stopping the feed so candles already
            # queued are flushed before the writer exits.
            self.candle_queue.put(None)
            db_writer.join()
            print("Listener stopped.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A hybrid historical and live candle data fetcher for Hyperliquid.")
    parser.add_argument(
        "--coins",
        nargs='+',
        required=True,
        help="List of coin symbols to fetch (e.g., BTC ETH)."
    )
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    fetcher = LiveCandleFetcher(log_level=args.log_level, coins=args.coins)
    fetcher.run()
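
# Example invocation (assuming the hyperliquid-python-sdk package and the local
# logging_utils module are available on PYTHONPATH):
#
#   python hyper/live_candle_fetcher.py --coins BTC ETH --log-level debug
#
# Candles land in _data/market_data.db, one table per coin (e.g. "BTC_1m").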