"""Hybrid historical + live 1-minute candle fetcher for Hyperliquid.

Back-fills missing candles over REST, then streams live candles over
WebSocket; a single writer thread serializes all SQLite writes.
"""
import argparse
import json
import logging
import os
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime, timezone
from queue import Queue
from threading import Thread
from typing import Optional

from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging
class LiveCandleFetcher:
    """
    Connects to Hyperliquid to maintain a complete and up-to-date database of
    1-minute candles using a robust producer-consumer architecture to prevent
    data corruption and duplication.

    Producers (the WebSocket callback and the historical catch-up fetcher)
    put raw candle dicts onto a thread-safe queue; a single consumer thread
    performs every SQLite write serially, so connections are never shared
    across threads and upserts cannot race each other.
    """

    def __init__(self, log_level: str, coins: list):
        """
        Configure logging, validate the watch list, create the WebSocket
        client, and ensure the per-coin candle tables exist.

        Args:
            log_level: Verbosity forwarded to setup_logging ('off'/'normal'/'debug').
            coins: Coin symbols to track (e.g. ['BTC', 'ETH']); duplicates are
                collapsed into a set. Exits with status 1 when empty.
        """
        setup_logging(log_level, 'LiveCandleFetcher')
        self.db_path = os.path.join("_data", "market_data.db")
        self.coins_to_watch = set(coins)
        if not self.coins_to_watch:
            logging.error("No coins provided to watch. Exiting.")
            sys.exit(1)

        self.info = Info(constants.MAINNET_API_URL, skip_ws=False)
        self.candle_queue = Queue()  # Thread-safe hand-off from producers to the DB writer
        self._ensure_tables_exist()

    def _ensure_tables_exist(self):
        """
        Ensures that all necessary tables are created with the correct schema
        and PRIMARY KEY. If a table exists with an incorrect schema (no PRIMARY
        KEY on 'timestamp_ms'), it attempts to migrate the data; on migration
        failure the process exits with status 1.
        """
        # closing() guarantees the connection is actually closed — the sqlite3
        # connection context manager alone only manages the transaction.  The
        # inner `conn` context commits on success / rolls back on error.
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            for coin in self.coins_to_watch:
                table_name = f"{coin}_1m"
                cursor = conn.cursor()
                cursor.execute(f"PRAGMA table_info('{table_name}')")
                columns = cursor.fetchall()

                if columns:
                    # PRAGMA table_info row layout: (cid, name, type, notnull, dflt_value, pk)
                    pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                    if not pk_found:
                        logging.warning(f"Schema migration needed for table '{table_name}': 'timestamp_ms' is not the PRIMARY KEY.")
                        logging.warning("Attempting to automatically rebuild the table...")
                        try:
                            # 1. Rename old table
                            conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
                            logging.info(f" -> Renamed existing table to '{table_name}_old'.")

                            # 2. Create new table with correct schema
                            self._create_candle_table(conn, table_name)
                            logging.info(f" -> Created new '{table_name}' table with correct schema.")

                            # 3. Copy data; duplicate timestamps are dropped by the new PK
                            conn.execute(f'''
                                INSERT OR IGNORE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                                SELECT datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades
                                FROM "{table_name}_old"
                            ''')
                            conn.commit()
                            logging.info(" -> Copied data to new table.")

                            # 4. Drop the old table
                            conn.execute(f'DROP TABLE "{table_name}_old"')
                            logging.info(f" -> Removed old table. Migration for '{table_name}' complete.")
                        except Exception as e:
                            logging.error(f"FATAL: Automatic schema migration for '{table_name}' failed: {e}")
                            logging.error("Please delete the database file '_data/market_data.db' manually and restart.")
                            sys.exit(1)
                else:
                    # If table does not exist, create it
                    self._create_candle_table(conn, table_name)
        logging.info("Database tables verified.")

    def _create_candle_table(self, conn, table_name: str):
        """Creates a new candle table with the correct schema.

        Args:
            conn: Open sqlite3 connection; the DDL runs in its current
                transaction (no commit is issued here).
            table_name: Exact table name, e.g. 'BTC_1m'.
        """
        conn.execute(f'''
            CREATE TABLE "{table_name}" (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume REAL,
                number_of_trades INTEGER
            )
        ''')

    def on_message(self, message):
        """
        Callback to process incoming WebSocket messages. This is the "Producer":
        candle payloads are queued for the DB writer thread; every other channel
        is ignored. Never raises (errors are logged) so the WS loop survives.
        """
        try:
            if message.get("channel") == "candle":
                candle_data = message.get("data", {})
                if candle_data:
                    self.candle_queue.put(candle_data)
        except Exception as e:
            logging.error(f"Error in on_message: {e}")

    def _database_writer_thread(self):
        """
        This is the "Consumer" thread. It runs forever, pulling candles from
        the queue and writing them to the database, ensuring all writes are
        serial. A None item on the queue is the shutdown signal.

        A single connection is opened for the thread's lifetime (the previous
        per-candle connect was wasteful and, because the sqlite3 context
        manager does not close connections, leaked them).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            while True:
                try:
                    candle = self.candle_queue.get()
                    if candle is None:  # A signal to stop the thread
                        break

                    coin = candle.get('coin')
                    if not coin:
                        continue

                    table_name = f"{coin}_1m"
                    # Candle payload keys: t = open time (ms since epoch),
                    # o/h/l/c = prices, v = volume, n = trade count.
                    record = (
                        datetime.fromtimestamp(candle['t'] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
                        candle['t'],
                        candle.get('o'), candle.get('h'), candle.get('l'), candle.get('c'),
                        candle.get('v'), candle.get('n')
                    )

                    # INSERT OR REPLACE: later snapshots of the same in-progress
                    # minute overwrite earlier ones, keyed by timestamp_ms.
                    conn.execute(f'''
                        INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', record)
                    conn.commit()
                    logging.debug(f"Upserted candle for {coin} at {record[0]}")

                except Exception as e:
                    logging.error(f"Error in database writer thread: {e}")
                    try:
                        conn.rollback()  # discard any half-written transaction
                    except sqlite3.Error:
                        pass  # best effort: connection may already be unusable
        finally:
            conn.close()

    def _get_last_timestamp_from_db(self, coin: str) -> Optional[int]:
        """Gets the most recent millisecond timestamp from a coin's 1m table.

        Returns:
            The maximum timestamp_ms, or None when the table is empty or the
            query fails (e.g. the table does not exist yet).
        """
        table_name = f"{coin}_1m"
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                result = conn.execute(f'SELECT MAX(timestamp_ms) FROM "{table_name}"').fetchone()
                return int(result[0]) if result and result[0] is not None else None
        except Exception as e:
            logging.error(f"Could not read last timestamp from table '{table_name}': {e}")
            return None

    def _fetch_historical_candles(self, coin: str, start_ms: int, end_ms: int):
        """Fetches historical candles and puts them on the queue for the writer.

        Pages through the REST snapshot endpoint in chunks until end_ms is
        reached, the API returns no data, or paging stops making progress.
        """
        logging.info(f"Fetching historical data for {coin} from {datetime.fromtimestamp(start_ms/1000)}...")
        current_start = start_ms
        # One HTTP-only client for the whole back-fill; previously a new Info
        # client was constructed on every loop iteration.
        http_info = Info(constants.MAINNET_API_URL, skip_ws=True)

        while current_start < end_ms:
            try:
                batch = http_info.candles_snapshot(coin, "1m", current_start, end_ms)
                if not batch:
                    break

                for candle in batch:
                    # Tag each candle so the writer knows which table it belongs to.
                    candle['coin'] = coin
                    self.candle_queue.put(candle)

                last_ts = batch[-1]['t']
                if last_ts < current_start:
                    break  # no forward progress: avoid an infinite loop
                current_start = last_ts + 1
                time.sleep(0.5)  # be gentle with the REST API
            except Exception as e:
                logging.error(f"Error fetching historical chunk for {coin}: {e}")
                break

        logging.info(f"Historical data fetching for {coin} is complete.")

    def run(self):
        """
        Starts the database writer, catches up on historical data, then
        subscribes to the WebSocket for live updates. Blocks until Ctrl+C,
        after which the writer thread is shut down cleanly.
        """
        db_writer = Thread(target=self._database_writer_thread, daemon=True)
        db_writer.start()

        logging.info("--- Starting Historical Data Catch-Up Phase ---")
        now_ms = int(time.time() * 1000)
        for coin in self.coins_to_watch:
            last_ts = self._get_last_timestamp_from_db(coin)
            # Resume one minute after the last stored candle, or back-fill the
            # trailing 7 days when the table is empty.  `is not None` (rather
            # than truthiness) keeps a legitimate timestamp of 0 from being
            # treated as missing.
            start_ts = last_ts + 60000 if last_ts is not None else now_ms - (7 * 24 * 60 * 60 * 1000)
            if start_ts < now_ms:
                self._fetch_historical_candles(coin, start_ts, now_ms)

        logging.info("--- Historical Catch-Up Complete. Starting Live WebSocket Feed ---")
        for coin in self.coins_to_watch:
            # Bind 'coin' as a lambda default so each subscription gets its own
            # callback; the symbol is injected into the message data because the
            # writer thread routes candles by their 'coin' key.
            callback = lambda msg, c=coin: self.on_message({**msg, 'data': {**msg.get('data',{}), 'coin': c}})
            subscription = {"type": "candle", "coin": coin, "interval": "1m"}
            self.info.subscribe(subscription, callback)
            logging.info(f"Subscribed to 1m candles for {coin}")
            time.sleep(0.2)  # stagger subscriptions slightly

        print("\nListening for live candle data... Press Ctrl+C to stop.")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping WebSocket listener...")
            self.info.ws_manager.stop()
            self.candle_queue.put(None)  # sentinel: tell the writer thread to exit
            db_writer.join()
            print("Listener stopped.")
|
if __name__ == "__main__":
    # Command-line entry point: parse the watch list and log level, then run.
    arg_parser = argparse.ArgumentParser(description="A hybrid historical and live candle data fetcher for Hyperliquid.")
    arg_parser.add_argument(
        "--coins",
        nargs='+',
        required=True,
        help="List of coin symbols to fetch (e.g., BTC ETH).",
    )
    arg_parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script.",
    )
    cli_args = arg_parser.parse_args()

    LiveCandleFetcher(log_level=cli_args.log_level, coins=cli_args.coins).run()