import argparse
import logging
import os
import sys
import time
from datetime import datetime, timezone

from hyperliquid.info import Info
from hyperliquid.utils import constants
import sqlite3
from queue import Queue
from threading import Thread

from logging_utils import setup_logging


class LiveCandleFetcher:
    """
    Connects to Hyperliquid to maintain a complete and up-to-date database of
    1-minute candles, using a producer-consumer architecture to prevent data
    corruption and duplication.
    """

    def __init__(self, log_level: str, coins: list):
        setup_logging(log_level, 'LiveCandleFetcher')
        self.db_path = os.path.join("_data", "market_data.db")
        self.coins_to_watch = set(coins)
        if not self.coins_to_watch:
            logging.error("No coins provided to watch. Exiting.")
            sys.exit(1)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=False)
        self.candle_queue = Queue()  # Thread-safe queue shared by producers and the writer
        self._ensure_tables_exist()

    def _ensure_tables_exist(self):
        """
        Ensures that all necessary tables exist with the correct schema and
        PRIMARY KEY. If a table exists with an incorrect schema, attempts to
        migrate its data automatically.
        """
        with sqlite3.connect(self.db_path) as conn:
            for coin in self.coins_to_watch:
                table_name = f"{coin}_1m"
                cursor = conn.cursor()
                cursor.execute(f"PRAGMA table_info('{table_name}')")
                columns = cursor.fetchall()

                if columns:
                    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
                    pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                    if not pk_found:
                        logging.warning(f"Schema migration needed for table '{table_name}': "
                                        "'timestamp_ms' is not the PRIMARY KEY.")
                        logging.warning("Attempting to automatically rebuild the table...")
                        try:
                            # 1. Rename the old table out of the way.
                            conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
                            logging.info(f" -> Renamed existing table to '{table_name}_old'.")

                            # 2. Create a new table with the correct schema.
                            self._create_candle_table(conn, table_name)
                            logging.info(f" -> Created new '{table_name}' table with correct schema.")

                            # 3. Copy unique rows from the old table to the new one.
                            conn.execute(f'''
                                INSERT OR IGNORE INTO "{table_name}"
                                    (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                                SELECT datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades
                                FROM "{table_name}_old"
                            ''')
                            conn.commit()
                            logging.info(" -> Copied data to new table.")

                            # 4. Drop the old table.
                            conn.execute(f'DROP TABLE "{table_name}_old"')
                            logging.info(f" -> Removed old table. Migration for '{table_name}' complete.")
                        except Exception as e:
                            logging.error(f"FATAL: Automatic schema migration for '{table_name}' failed: {e}")
                            logging.error("Please delete the database file '_data/market_data.db' manually and restart.")
                            sys.exit(1)
                else:
                    # Table does not exist yet; create it.
                    self._create_candle_table(conn, table_name)
        logging.info("Database tables verified.")

    def _create_candle_table(self, conn, table_name: str):
        """Creates a new candle table with the correct schema."""
        conn.execute(f'''
            CREATE TABLE "{table_name}" (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume REAL,
                number_of_trades INTEGER
            )
        ''')

    def on_message(self, message):
        """
        Callback for incoming candle messages. This is the "producer": it puts
        the raw candle payload onto the queue for the database writer.
        """
        try:
            if message.get("channel") == "candle":
                candle_data = message.get("data", {})
                if candle_data:
                    self.candle_queue.put(candle_data)
        except Exception as e:
            logging.error(f"Error in on_message: {e}")
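    # Shape of the queue items the writer consumes below, inferred from the
    # keys this script reads ('coin' is injected by the WebSocket callback in
    # run() and by _fetch_historical_candles; the rest come from Hyperliquid):
    #   {'coin': 'BTC', 't': <open time, ms>, 'o': ..., 'h': ..., 'l': ...,
    #    'c': ..., 'v': ..., 'n': <number of trades>}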
    def _database_writer_thread(self):
        """
        The "consumer" thread. Runs until it receives a None sentinel, pulling
        candles from the queue and writing them to the database so that all
        writes are serialized.
        """
        while True:
            try:
                candle = self.candle_queue.get()
                if candle is None:  # Sentinel: stop the thread.
                    break

                coin = candle.get('coin')
                if not coin:
                    continue

                table_name = f"{coin}_1m"
                record = (
                    datetime.fromtimestamp(candle['t'] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
                    candle['t'],
                    candle.get('o'), candle.get('h'), candle.get('l'), candle.get('c'),
                    candle.get('v'), candle.get('n')
                )
                # A fresh connection per write keeps the writer simple; all
                # writes still go through this single thread.
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute(f'''
                        INSERT OR REPLACE INTO "{table_name}"
                            (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', record)
                    conn.commit()
                logging.debug(f"Upserted candle for {coin} at {record[0]}")
            except Exception as e:
                logging.error(f"Error in database writer thread: {e}")

    def _get_last_timestamp_from_db(self, coin: str):
        """Returns the most recent millisecond timestamp from a coin's 1m table, or None."""
        table_name = f"{coin}_1m"
        try:
            with sqlite3.connect(self.db_path) as conn:
                result = conn.execute(f'SELECT MAX(timestamp_ms) FROM "{table_name}"').fetchone()
                return int(result[0]) if result and result[0] is not None else None
        except Exception as e:
            logging.error(f"Could not read last timestamp from table '{table_name}': {e}")
            return None

    def _fetch_historical_candles(self, coin: str, start_ms: int, end_ms: int):
        """Fetches historical candles and puts them on the queue for the writer."""
        logging.info(f"Fetching historical data for {coin} from "
                     f"{datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)}...")
        # One REST-only client for the whole backfill, rather than one per chunk.
        http_info = Info(constants.MAINNET_API_URL, skip_ws=True)
        current_start = start_ms
        while current_start < end_ms:
            try:
                batch = http_info.candles_snapshot(coin, "1m", current_start, end_ms)
                if not batch:
                    break
                for candle in batch:
                    candle['coin'] = coin  # Tag each candle so the writer knows its table.
                    self.candle_queue.put(candle)
                last_ts = batch[-1]['t']
                if last_ts < current_start:  # Guard against a non-advancing cursor.
                    break
                current_start = last_ts + 1
                time.sleep(0.5)  # Be gentle with the API between chunks.
            except Exception as e:
                logging.error(f"Error fetching historical chunk for {coin}: {e}")
                break
        logging.info(f"Historical data fetching for {coin} is complete.")

    def run(self):
        """
        Starts the database writer, catches up on historical data, then
        subscribes to the WebSocket for live updates.
        """
        db_writer = Thread(target=self._database_writer_thread, daemon=True)
        db_writer.start()

        logging.info("--- Starting Historical Data Catch-Up Phase ---")
        now_ms = int(time.time() * 1000)
        for coin in self.coins_to_watch:
            last_ts = self._get_last_timestamp_from_db(coin)
            # Resume one minute after the last stored candle, or backfill 7 days.
            start_ts = last_ts + 60000 if last_ts is not None else now_ms - (7 * 24 * 60 * 60 * 1000)
            if start_ts < now_ms:
                self._fetch_historical_candles(coin, start_ts, now_ms)

        logging.info("--- Historical Catch-Up Complete. Starting Live WebSocket Feed ---")
        for coin in self.coins_to_watch:
            # FIX: Use a lambda with a default argument to bind the current
            # 'coin' to each subscription's callback, and inject it into the
            # message data so the writer can route the candle to its table.
            callback = lambda msg, c=coin: self.on_message(
                {**msg, 'data': {**msg.get('data', {}), 'coin': c}}
            )
            subscription = {"type": "candle", "coin": coin, "interval": "1m"}
            self.info.subscribe(subscription, callback)
            logging.info(f"Subscribed to 1m candles for {coin}")
            time.sleep(0.2)

        print("\nListening for live candle data... Press Ctrl+C to stop.")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping WebSocket listener...")
            self.info.ws_manager.stop()
            self.candle_queue.put(None)  # Wake the writer with the stop sentinel.
            db_writer.join()
            print("Listener stopped.")
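# Note: the writer thread and _ensure_tables_exist each open their own SQLite
# connections, and other processes may read market_data.db while this script
# writes. If "database is locked" errors appear, enabling SQLite's write-ahead
# log is a common mitigation -- a minimal, optional sketch (the PRAGMA is a
# persistent, per-database setting, so running it once is enough):
#
#     with sqlite3.connect(os.path.join("_data", "market_data.db")) as conn:
#         conn.execute("PRAGMA journal_mode=WAL")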
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="A hybrid historical and live candle data fetcher for Hyperliquid."
    )
    parser.add_argument(
        "--coins",
        nargs='+',
        required=True,
        help="List of coin symbols to fetch (e.g., BTC ETH)."
    )
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    fetcher = LiveCandleFetcher(log_level=args.log_level, coins=args.coins)
    fetcher.run()
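# Example invocation (the filename is an assumption; adjust to this script's name):
#   python live_candle_fetcher.py --coins BTC ETH --log-level debug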