live market web socket
238
live_candle_fetcher.py
Normal file
@@ -0,0 +1,238 @@
import argparse
import logging
import os
import sys
import json
import time
import sqlite3
from datetime import datetime, timezone
from queue import Queue
from threading import Thread

from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging


class LiveCandleFetcher:
    """
    Connects to Hyperliquid to maintain a complete and up-to-date database of
    1-minute candles, using a producer-consumer architecture to prevent data
    corruption and duplication.
    """

    def __init__(self, log_level: str, coins: list):
        setup_logging(log_level, 'LiveCandleFetcher')
        self.db_path = os.path.join("_data", "market_data.db")
        # Make sure the data directory exists before sqlite3 tries to open the file;
        # connecting to a path in a missing directory raises OperationalError.
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        self.coins_to_watch = set(coins)
        if not self.coins_to_watch:
            logging.error("No coins provided to watch. Exiting.")
            sys.exit(1)

        self.info = Info(constants.MAINNET_API_URL, skip_ws=False)
        self.candle_queue = Queue()  # Thread-safe queue shared by producers and the DB writer
        self._ensure_tables_exist()

    def _ensure_tables_exist(self):
        """
        Ensures that all necessary tables exist with the correct schema and
        PRIMARY KEY. If a table exists with an incorrect schema, it attempts
        to migrate the data.
        """
        with sqlite3.connect(self.db_path) as conn:
            for coin in self.coins_to_watch:
                table_name = f"{coin}_1m"
                cursor = conn.cursor()
                cursor.execute(f"PRAGMA table_info('{table_name}')")
                columns = cursor.fetchall()

                if columns:
                    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk),
                    # so col[1] is the column name and col[5] is the primary-key flag.
                    pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                    if not pk_found:
                        logging.warning(f"Schema migration needed for table '{table_name}': 'timestamp_ms' is not the PRIMARY KEY.")
                        logging.warning("Attempting to automatically rebuild the table...")
                        try:
                            # 1. Rename the old table out of the way.
                            conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
                            logging.info(f" -> Renamed existing table to '{table_name}_old'.")

                            # 2. Create a new table with the correct schema.
                            self._create_candle_table(conn, table_name)
                            logging.info(f" -> Created new '{table_name}' table with correct schema.")

                            # 3. Copy data across; INSERT OR IGNORE drops rows that would
                            #    violate the new PRIMARY KEY, de-duplicating on timestamp_ms.
                            conn.execute(f'''
                                INSERT OR IGNORE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                                SELECT datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades
                                FROM "{table_name}_old"
                            ''')
                            conn.commit()
                            logging.info(" -> Copied data to new table.")

                            # 4. Drop the old table.
                            conn.execute(f'DROP TABLE "{table_name}_old"')
                            logging.info(f" -> Removed old table. Migration for '{table_name}' complete.")
                        except Exception as e:
                            logging.error(f"FATAL: Automatic schema migration for '{table_name}' failed: {e}")
                            logging.error("Please delete the database file '_data/market_data.db' manually and restart.")
                            sys.exit(1)
                else:
                    # Table does not exist yet; create it.
                    self._create_candle_table(conn, table_name)
        logging.info("Database tables verified.")

    def _create_candle_table(self, conn, table_name: str):
        """Creates a new candle table with the correct schema."""
        conn.execute(f'''
            CREATE TABLE "{table_name}" (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume REAL,
                number_of_trades INTEGER
            )
        ''')
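
    # With this schema a stored row looks roughly like the following
    # (values are illustrative, not real market data):
    #   ('2024-01-01 00:00:00', 1704067200000, 42000.0, 42100.0, 41950.0, 42050.0, 12.34, 567)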

    def on_message(self, message):
        """
        Callback that processes incoming candle messages. This is the "producer":
        it puts the raw message onto the queue for the DB writer.
        """
        try:
            if message.get("channel") == "candle":
                candle_data = message.get("data", {})
                if candle_data:
                    self.candle_queue.put(candle_data)
        except Exception as e:
            logging.error(f"Error in on_message: {e}")

    def _database_writer_thread(self):
        """
        The "consumer" thread. It runs forever, pulling candles from the queue
        and writing them to the database, so that all writes are serial.
        """
        while True:
            try:
                candle = self.candle_queue.get()
                if candle is None:  # Sentinel: signal to stop the thread
                    break

                coin = candle.get('coin')
                if not coin:
                    continue

                table_name = f"{coin}_1m"
                record = (
                    datetime.fromtimestamp(candle['t'] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
                    candle['t'],
                    candle.get('o'), candle.get('h'), candle.get('l'), candle.get('c'),
                    candle.get('v'), candle.get('n')
                )

                # INSERT OR REPLACE keyed on timestamp_ms makes writes idempotent:
                # re-fetched or still-forming candles simply overwrite the old row.
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute(f'''
                        INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', record)
                    conn.commit()
                logging.debug(f"Upserted candle for {coin} at {record[0]}")

            except Exception as e:
                logging.error(f"Error in database writer thread: {e}")

    def _get_last_timestamp_from_db(self, coin: str) -> int | None:
        """Gets the most recent millisecond timestamp from a coin's 1m table, or None."""
        table_name = f"{coin}_1m"
        try:
            with sqlite3.connect(self.db_path) as conn:
                result = conn.execute(f'SELECT MAX(timestamp_ms) FROM "{table_name}"').fetchone()
                return int(result[0]) if result and result[0] is not None else None
        except Exception as e:
            logging.error(f"Could not read last timestamp from table '{table_name}': {e}")
            return None

    def _fetch_historical_candles(self, coin: str, start_ms: int, end_ms: int):
        """Fetches historical candles and puts them on the queue for the writer."""
        start_dt = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)
        logging.info(f"Fetching historical data for {coin} from {start_dt}...")
        # One REST-only client is enough; no need to rebuild it for every chunk.
        http_info = Info(constants.MAINNET_API_URL, skip_ws=True)
        current_start = start_ms

        while current_start < end_ms:
            try:
                batch = http_info.candles_snapshot(coin, "1m", current_start, end_ms)
                if not batch:
                    break

                for candle in batch:
                    candle['coin'] = coin
                    self.candle_queue.put(candle)

                last_ts = batch[-1]['t']
                if last_ts < current_start:  # No forward progress; avoid an infinite loop
                    break
                current_start = last_ts + 1
                time.sleep(0.5)  # Be polite to the API between chunks
            except Exception as e:
                logging.error(f"Error fetching historical chunk for {coin}: {e}")
                break

        logging.info(f"Historical data fetching for {coin} is complete.")

    def run(self):
        """
        Starts the database writer, catches up on historical data, then
        subscribes to the WebSocket for live updates.
        """
        db_writer = Thread(target=self._database_writer_thread, daemon=True)
        db_writer.start()

        logging.info("--- Starting Historical Data Catch-Up Phase ---")
        now_ms = int(time.time() * 1000)
        for coin in self.coins_to_watch:
            last_ts = self._get_last_timestamp_from_db(coin)
            # Resume one minute after the last stored candle, or backfill 7 days.
            start_ts = last_ts + 60_000 if last_ts is not None else now_ms - (7 * 24 * 60 * 60 * 1000)
            if start_ts < now_ms:
                self._fetch_historical_candles(coin, start_ts, now_ms)

        logging.info("--- Historical Catch-Up Complete. Starting Live WebSocket Feed ---")
        for coin in self.coins_to_watch:
            # Bind 'coin' as a default argument so each subscription gets its own
            # callback; a bare closure would capture the loop variable by reference
            # and every callback would see the last coin. The callback also stamps
            # the coin onto the message data for the DB writer.
            callback = lambda msg, c=coin: self.on_message({**msg, 'data': {**msg.get('data', {}), 'coin': c}})
            subscription = {"type": "candle", "coin": coin, "interval": "1m"}
            self.info.subscribe(subscription, callback)
            logging.info(f"Subscribed to 1m candles for {coin}")
            time.sleep(0.2)  # Brief pause between subscriptions

        print("\nListening for live candle data... Press Ctrl+C to stop.")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping WebSocket listener...")
            self.info.ws_manager.stop()
            self.candle_queue.put(None)  # Sentinel unblocks and stops the writer
            db_writer.join()
            print("Listener stopped.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A hybrid historical and live candle data fetcher for Hyperliquid.")
    parser.add_argument(
        "--coins",
        nargs='+',
        required=True,
        help="List of coin symbols to fetch (e.g., BTC ETH)."
    )
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    fetcher = LiveCandleFetcher(log_level=args.log_level, coins=args.coins)
    fetcher.run()
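
# Example invocation (assumes the hyperliquid SDK and logging_utils are importable):
#   python live_candle_fetcher.py --coins BTC ETH --log-level debug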