Files
hyper/live_market_utils.py
2025-11-04 13:34:49 +01:00

188 lines
7.2 KiB
Python

import logging
import json
import time
import os
import traceback
import sys
from hyperliquid.info import Info
from hyperliquid.utils import constants
from logging_utils import setup_logging
# --- Configuration for standalone error logging ---
# Directory that holds the error log; relative, so it resolves against the
# process's current working directory.
LOGS_DIR = "_logs"
# File that log_error() appends to.
ERROR_LOG_FILE = os.path.join(LOGS_DIR, "live_market_errors.log")


def log_error(error_message: str, include_traceback: bool = True):
    """Append an error entry to ERROR_LOG_FILE without ever raising.

    Each entry consists of a UTC-timestamped header line, the message, an
    optional traceback and a separator line of 50 '=' characters.

    Args:
        error_message: Text describing the error.
        include_traceback: When True, the traceback of the exception that is
            currently being handled is appended. If no exception is being
            handled there is nothing to append, so the traceback section is
            omitted.

    Any failure while writing (directory creation, file I/O, encoding) is
    reported on stderr instead of being propagated to the caller.
    """
    try:
        # exist_ok avoids the check-then-create race between processes that
        # start at the same time.
        os.makedirs(LOGS_DIR, exist_ok=True)

        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
        entry = [
            f"--- ERROR at {timestamp} UTC ---\n",
            f"{error_message}\n",
        ]
        # Outside an `except` block format_exc() returns "NoneType: None",
        # which is noise in the log, so only add it for a live exception.
        if include_traceback and sys.exc_info()[0] is not None:
            entry.append(traceback.format_exc() + "\n")
        entry.append("=" * 50 + "\n")

        # Explicit UTF-8 so messages with non-ASCII characters are written
        # regardless of the platform's default encoding; a single write keeps
        # the entry together in the file.
        with open(ERROR_LOG_FILE, 'a', encoding='utf-8') as f:
            f.write("".join(entry))
    except Exception:
        print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr)
def _process_bbo(data, shared_prices_dict):
    """Store the best bid and ask from one 'bbo' payload under '<coin>_bid' / '<coin>_ask'."""
    if not data or not isinstance(data, dict):
        logging.warning("BBO message received with no data.")
        return

    coin = data.get("coin")
    if not coin:
        logging.warning("BBO data received with no coin identifier.")
        return

    bid_ask_data = data.get("bbo")
    if not isinstance(bid_ask_data, list) or len(bid_ask_data) < 2:
        logging.warning("[%s] Received BBO message with invalid 'bbo' array: %s", coin, bid_ask_data)
        return

    bid_level, ask_level = bid_ask_data[0], bid_ask_data[1]
    # A level may be null (e.g. one side of the book is empty). Calling .get()
    # on it would raise AttributeError, so skip the update instead.
    if not isinstance(bid_level, dict) or not isinstance(ask_level, dict):
        logging.warning("[%s] BBO data has a missing bid or ask level: %s", coin, bid_ask_data)
        return

    bid_price_str = bid_level.get('px')
    ask_price_str = ask_level.get('px')
    if not bid_price_str or not ask_price_str:
        logging.warning("[%s] BBO data missing 'px' field.", coin)
        return

    try:
        bid_price = float(bid_price_str)
        ask_price = float(ask_price_str)
    except (ValueError, TypeError) as e:
        logging.error("[%s] Error parsing BBO data: %s. Data: %s", coin, e, bid_ask_data)
        return

    # Both sides parsed successfully: publish them together.
    shared_prices_dict[f"{coin}_bid"] = bid_price
    shared_prices_dict[f"{coin}_ask"] = ask_price
    logging.info("Updated %s (BBO): Bid=%.4f, Ask=%.4f", coin, bid_price, ask_price)


def _process_trades(trade_list, shared_prices_dict):
    """Store the price of every trade in a 'trades' payload under its coin name."""
    if not isinstance(trade_list, list) or not trade_list:
        logging.warning("Received 'trades' message with invalid data: %s", trade_list)
        return

    # Trades are applied in order, so the last trade for a coin wins.
    for trade in trade_list:
        # A malformed entry must not abort the rest of the batch.
        if not isinstance(trade, dict):
            logging.warning("Trade data missing 'coin' or 'px': %s", trade)
            continue

        coin = trade.get("coin")
        price_str = trade.get("px")
        if not coin or not price_str:
            logging.warning("Trade data missing 'coin' or 'px': %s", trade)
            continue

        try:
            price = float(price_str)
        except (ValueError, TypeError) as e:
            logging.error("Error parsing trade data: %s. Data: %s", e, trade)
            continue

        # This key feeds the "Live Price" column.
        shared_prices_dict[coin] = price
        logging.info("Updated %s (Live Price) to last trade: %.4f", coin, price)


def on_message(message, shared_prices_dict):
    """
    Callback that processes incoming WebSocket messages for 'bbo' and 'trades'
    and updates the shared price dictionary.

    Args:
        message: Decoded WebSocket message; expected to be a dict with a
            "channel" key and a "data" payload.
        shared_prices_dict: Mapping that receives the prices. 'bbo' messages
            write "<coin>_bid" and "<coin>_ask"; 'trades' messages write
            "<coin>". Messages on any other channel are ignored.

    Malformed payloads are logged and skipped. Any unexpected exception is
    written to the error log via log_error() and never propagates to the
    caller.
    """
    try:
        # Lazy %-formatting: the message is only rendered if DEBUG is enabled.
        logging.debug("Received WebSocket message: %s", message)
        channel = message.get("channel")

        if channel == "bbo":
            _process_bbo(message.get("data"), shared_prices_dict)
        elif channel == "trades":
            _process_trades(message.get("data"), shared_prices_dict)
    except Exception as e:
        log_error(f"Error in WebSocket on_message: {e}")
def start_live_feed(shared_prices_dict, coins_to_watch: list, log_level='off'):
    """
    Main function for the WebSocket process.
    Subscribes to BOTH 'bbo' and 'trades' for all watched coins, then runs a
    watchdog loop that reconnects whenever the connection is found dead.

    Args:
        shared_prices_dict: Mapping handed to on_message() for every incoming
            message; it receives the price updates.
        coins_to_watch: Coin identifiers to subscribe to.
        log_level: Passed through to setup_logging().

    Returns when the initial connection fails (after a 10 s pause), on
    KeyboardInterrupt, or after an unexpected exception in the watchdog loop.
    """
    setup_logging(log_level, 'LiveMarketFeed_Combined')

    def callback(msg):
        on_message(msg, shared_prices_dict)

    def stop_manager(feed, label: str):
        """Best-effort stop of feed's ws_manager; failures are logged, not raised."""
        if feed is None or not feed.ws_manager:
            return
        try:
            feed.ws_manager.stop()
        except Exception as e:
            log_error(f"Error stopping {label} ws_manager: {e}")

    def connect_and_subscribe():
        """Establishes a new WebSocket connection and subscribes to both streams.

        Returns the connected Info object, or None if connecting or
        subscribing failed.
        """
        new_info = None
        try:
            logging.info("Connecting to Hyperliquid WebSocket...")
            new_info = Info(constants.MAINNET_API_URL, skip_ws=False)

            for coin in coins_to_watch:
                # Subscribe to Best Bid/Offer
                new_info.subscribe({"type": "bbo", "coin": coin}, callback)
                logging.info(f"Subscribed to 'bbo' for {coin}.")

                # Subscribe to Live Trades
                new_info.subscribe({"type": "trades", "coin": coin}, callback)
                logging.info(f"Subscribed to 'trades' for {coin}.")

            logging.info("WebSocket connected and all subscriptions sent.")
            return new_info
        except Exception as e:
            log_error(f"Failed to connect to WebSocket: {e}")
            # If Info() succeeded but a subscribe call failed, the connection
            # would otherwise be left running with nobody holding a reference.
            stop_manager(new_info, "partially connected")
            return None

    info = connect_and_subscribe()

    if info is None:
        logging.critical("Initial WebSocket connection failed. Exiting process.")
        log_error("Initial WebSocket connection failed. Exiting process.", include_traceback=False)
        time.sleep(10)  # Wait before letting the process manager restart it
        return

    logging.info("Starting Combined (BBO + Trades) live price feed process.")

    try:
        while True:
            # --- Watchdog Logic ---
            time.sleep(15)  # Check the connection every 15 seconds

            if info is not None and info.ws_manager.is_alive():
                logging.debug("Watchdog check: WebSocket connection is active.")
                continue

            if info is not None:
                error_msg = "WebSocket connection lost. Attempting to reconnect..."
                logging.warning(error_msg)
                log_error(error_msg, include_traceback=False)  # Log it to the file
                stop_manager(info, "old")  # Clean up old manager
            else:
                # The previous reconnect attempt failed, so there is no old
                # connection to inspect or stop; just try again.
                logging.info("Retrying WebSocket connection...")

            info = connect_and_subscribe()
            if info is None:
                logging.error("Reconnect failed, will retry in 15s.")
            else:
                logging.info("Successfully reconnected to WebSocket.")
    except KeyboardInterrupt:
        logging.info("Stopping WebSocket listener...")
    except Exception as e:
        log_error(f"Live Market Feed process crashed: {e}")
    finally:
        # A failure while stopping must not mask the reason the loop ended.
        stop_manager(info, "current")
        logging.info("Combined Listener stopped.")