import logging import json import time import os import traceback import sys from hyperliquid.info import Info from hyperliquid.utils import constants from logging_utils import setup_logging # --- Configuration for standalone error logging --- LOGS_DIR = "_logs" ERROR_LOG_FILE = os.path.join(LOGS_DIR, "live_market_errors.log") def log_error(error_message: str, include_traceback: bool = True): """A simple, robust file logger for any errors.""" try: if not os.path.exists(LOGS_DIR): os.makedirs(LOGS_DIR) with open(ERROR_LOG_FILE, 'a') as f: timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) f.write(f"--- ERROR at {timestamp} UTC ---\n") f.write(error_message + "\n") if include_traceback: f.write(traceback.format_exc() + "\n") f.write("="*50 + "\n") except Exception: print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr) def on_message(message, shared_prices_dict): """ Callback function to process incoming WebSocket messages for 'bbo' and 'trades' and update the shared memory dictionary. """ try: logging.debug(f"Received WebSocket message: {message}") channel = message.get("channel") # --- Parser 1: Handle Best Bid/Offer messages --- if channel == "bbo": data = message.get("data") if not data: logging.warning("BBO message received with no data.") return coin = data.get("coin") if not coin: logging.warning("BBO data received with no coin identifier.") return bid_ask_data = data.get("bbo") if not bid_ask_data or not isinstance(bid_ask_data, list) or len(bid_ask_data) < 2: logging.warning(f"[{coin}] Received BBO message with invalid 'bbo' array: {bid_ask_data}") return try: bid_price_str = bid_ask_data[0].get('px') ask_price_str = bid_ask_data[1].get('px') if not bid_price_str or not ask_price_str: logging.warning(f"[{coin}] BBO data missing 'px' field.") return bid_price = float(bid_price_str) ask_price = float(ask_price_str) # Update the shared dictionary for Bid and Ask shared_prices_dict[f"{coin}_bid"] = bid_price shared_prices_dict[f"{coin}_ask"] = ask_price logging.info(f"Updated {coin} (BBO): Bid={bid_price:.4f}, Ask={ask_price:.4f}") except (ValueError, TypeError, IndexError) as e: logging.error(f"[{coin}] Error parsing BBO data: {e}. Data: {bid_ask_data}") # --- Parser 2: Handle Live Trade messages --- elif channel == "trades": trade_list = message.get("data") if not trade_list or not isinstance(trade_list, list) or len(trade_list) == 0: logging.warning(f"Received 'trades' message with invalid data: {trade_list}") return # Process all trades in the batch for trade in trade_list: try: coin = trade.get("coin") price_str = trade.get("px") if not coin or not price_str: logging.warning(f"Trade data missing 'coin' or 'px': {trade}") continue price = float(price_str) # Update the shared dictionary for the "Live Price" column shared_prices_dict[coin] = price logging.info(f"Updated {coin} (Live Price) to last trade: {price:.4f}") except (ValueError, TypeError) as e: logging.error(f"Error parsing trade data: {e}. Data: {trade}") except Exception as e: log_error(f"Error in WebSocket on_message: {e}") def start_live_feed(shared_prices_dict, coins_to_watch: list, log_level='off'): """ Main function for the WebSocket process. Subscribes to BOTH 'bbo' and 'trades' for all watched coins. """ setup_logging(log_level, 'LiveMarketFeed_Combined') info = None callback = lambda msg: on_message(msg, shared_prices_dict) def connect_and_subscribe(): """Establishes a new WebSocket connection and subscribes to both streams.""" try: logging.info("Connecting to Hyperliquid WebSocket...") new_info = Info(constants.MAINNET_API_URL, skip_ws=False) # --- MODIFIED: Subscribe to 'bbo' AND 'trades' for each coin --- for coin in coins_to_watch: # Subscribe to Best Bid/Offer bbo_sub = {"type": "bbo", "coin": coin} new_info.subscribe(bbo_sub, callback) logging.info(f"Subscribed to 'bbo' for {coin}.") # Subscribe to Live Trades trades_sub = {"type": "trades", "coin": coin} new_info.subscribe(trades_sub, callback) logging.info(f"Subscribed to 'trades' for {coin}.") logging.info("WebSocket connected and all subscriptions sent.") return new_info except Exception as e: log_error(f"Failed to connect to WebSocket: {e}") return None info = connect_and_subscribe() if info is None: logging.critical("Initial WebSocket connection failed. Exiting process.") log_error("Initial WebSocket connection failed. Exiting process.", include_traceback=False) time.sleep(10) # Wait before letting the process manager restart it return logging.info("Starting Combined (BBO + Trades) live price feed process.") try: while True: # --- Watchdog Logic --- time.sleep(15) # Check the connection every 15 seconds if not info.ws_manager.is_alive(): error_msg = "WebSocket connection lost. Attempting to reconnect..." logging.warning(error_msg) log_error(error_msg, include_traceback=False) # Log it to the file try: info.ws_manager.stop() # Clean up old manager except Exception as e: log_error(f"Error stopping old ws_manager: {e}") info = connect_and_subscribe() if info is None: logging.error("Reconnect failed, will retry in 15s.") else: logging.info("Successfully reconnected to WebSocket.") else: logging.debug("Watchdog check: WebSocket connection is active.") except KeyboardInterrupt: logging.info("Stopping WebSocket listener...") except Exception as e: log_error(f"Live Market Feed process crashed: {e}") finally: if info and info.ws_manager: info.ws_manager.stop() logging.info("Combined Listener stopped.")