188 lines
7.2 KiB
Python
188 lines
7.2 KiB
Python
import logging
|
|
import json
|
|
import time
|
|
import os
|
|
import traceback
|
|
import sys
|
|
from hyperliquid.info import Info
|
|
from hyperliquid.utils import constants
|
|
|
|
from logging_utils import setup_logging
|
|
|
|
# --- Configuration for standalone error logging ---
|
|
LOGS_DIR = "_logs"
|
|
ERROR_LOG_FILE = os.path.join(LOGS_DIR, "live_market_errors.log")
|
|
|
|
def log_error(error_message: str, include_traceback: bool = True):
|
|
"""A simple, robust file logger for any errors."""
|
|
try:
|
|
if not os.path.exists(LOGS_DIR):
|
|
os.makedirs(LOGS_DIR)
|
|
|
|
with open(ERROR_LOG_FILE, 'a') as f:
|
|
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
|
|
f.write(f"--- ERROR at {timestamp} UTC ---\n")
|
|
f.write(error_message + "\n")
|
|
if include_traceback:
|
|
f.write(traceback.format_exc() + "\n")
|
|
f.write("="*50 + "\n")
|
|
except Exception:
|
|
print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr)
|
|
|
|
|
|
def on_message(message, shared_prices_dict):
|
|
"""
|
|
Callback function to process incoming WebSocket messages for 'bbo' and 'trades'
|
|
and update the shared memory dictionary.
|
|
"""
|
|
try:
|
|
logging.debug(f"Received WebSocket message: {message}")
|
|
channel = message.get("channel")
|
|
|
|
# --- Parser 1: Handle Best Bid/Offer messages ---
|
|
if channel == "bbo":
|
|
data = message.get("data")
|
|
if not data:
|
|
logging.warning("BBO message received with no data.")
|
|
return
|
|
|
|
coin = data.get("coin")
|
|
if not coin:
|
|
logging.warning("BBO data received with no coin identifier.")
|
|
return
|
|
|
|
bid_ask_data = data.get("bbo")
|
|
|
|
if not bid_ask_data or not isinstance(bid_ask_data, list) or len(bid_ask_data) < 2:
|
|
logging.warning(f"[{coin}] Received BBO message with invalid 'bbo' array: {bid_ask_data}")
|
|
return
|
|
|
|
try:
|
|
bid_price_str = bid_ask_data[0].get('px')
|
|
ask_price_str = bid_ask_data[1].get('px')
|
|
|
|
if not bid_price_str or not ask_price_str:
|
|
logging.warning(f"[{coin}] BBO data missing 'px' field.")
|
|
return
|
|
|
|
bid_price = float(bid_price_str)
|
|
ask_price = float(ask_price_str)
|
|
|
|
# Update the shared dictionary for Bid and Ask
|
|
shared_prices_dict[f"{coin}_bid"] = bid_price
|
|
shared_prices_dict[f"{coin}_ask"] = ask_price
|
|
|
|
logging.info(f"Updated {coin} (BBO): Bid={bid_price:.4f}, Ask={ask_price:.4f}")
|
|
|
|
except (ValueError, TypeError, IndexError) as e:
|
|
logging.error(f"[{coin}] Error parsing BBO data: {e}. Data: {bid_ask_data}")
|
|
|
|
# --- Parser 2: Handle Live Trade messages ---
|
|
elif channel == "trades":
|
|
trade_list = message.get("data")
|
|
|
|
if not trade_list or not isinstance(trade_list, list) or len(trade_list) == 0:
|
|
logging.warning(f"Received 'trades' message with invalid data: {trade_list}")
|
|
return
|
|
|
|
# Process all trades in the batch
|
|
for trade in trade_list:
|
|
try:
|
|
coin = trade.get("coin")
|
|
price_str = trade.get("px")
|
|
|
|
if not coin or not price_str:
|
|
logging.warning(f"Trade data missing 'coin' or 'px': {trade}")
|
|
continue
|
|
|
|
price = float(price_str)
|
|
|
|
# Update the shared dictionary for the "Live Price" column
|
|
shared_prices_dict[coin] = price
|
|
|
|
logging.info(f"Updated {coin} (Live Price) to last trade: {price:.4f}")
|
|
|
|
except (ValueError, TypeError) as e:
|
|
logging.error(f"Error parsing trade data: {e}. Data: {trade}")
|
|
|
|
except Exception as e:
|
|
log_error(f"Error in WebSocket on_message: {e}")
|
|
|
|
def start_live_feed(shared_prices_dict, coins_to_watch: list, log_level='off'):
|
|
"""
|
|
Main function for the WebSocket process.
|
|
Subscribes to BOTH 'bbo' and 'trades' for all watched coins.
|
|
"""
|
|
setup_logging(log_level, 'LiveMarketFeed_Combined')
|
|
|
|
info = None
|
|
callback = lambda msg: on_message(msg, shared_prices_dict)
|
|
|
|
def connect_and_subscribe():
|
|
"""Establishes a new WebSocket connection and subscribes to both streams."""
|
|
try:
|
|
logging.info("Connecting to Hyperliquid WebSocket...")
|
|
new_info = Info(constants.MAINNET_API_URL, skip_ws=False)
|
|
|
|
# --- MODIFIED: Subscribe to 'bbo' AND 'trades' for each coin ---
|
|
for coin in coins_to_watch:
|
|
# Subscribe to Best Bid/Offer
|
|
bbo_sub = {"type": "bbo", "coin": coin}
|
|
new_info.subscribe(bbo_sub, callback)
|
|
logging.info(f"Subscribed to 'bbo' for {coin}.")
|
|
|
|
# Subscribe to Live Trades
|
|
trades_sub = {"type": "trades", "coin": coin}
|
|
new_info.subscribe(trades_sub, callback)
|
|
logging.info(f"Subscribed to 'trades' for {coin}.")
|
|
|
|
logging.info("WebSocket connected and all subscriptions sent.")
|
|
return new_info
|
|
except Exception as e:
|
|
log_error(f"Failed to connect to WebSocket: {e}")
|
|
return None
|
|
|
|
info = connect_and_subscribe()
|
|
|
|
if info is None:
|
|
logging.critical("Initial WebSocket connection failed. Exiting process.")
|
|
log_error("Initial WebSocket connection failed. Exiting process.", include_traceback=False)
|
|
time.sleep(10) # Wait before letting the process manager restart it
|
|
return
|
|
|
|
logging.info("Starting Combined (BBO + Trades) live price feed process.")
|
|
|
|
try:
|
|
while True:
|
|
# --- Watchdog Logic ---
|
|
time.sleep(15) # Check the connection every 15 seconds
|
|
|
|
if not info.ws_manager.is_alive():
|
|
error_msg = "WebSocket connection lost. Attempting to reconnect..."
|
|
logging.warning(error_msg)
|
|
log_error(error_msg, include_traceback=False) # Log it to the file
|
|
|
|
try:
|
|
info.ws_manager.stop() # Clean up old manager
|
|
except Exception as e:
|
|
log_error(f"Error stopping old ws_manager: {e}")
|
|
|
|
info = connect_and_subscribe()
|
|
|
|
if info is None:
|
|
logging.error("Reconnect failed, will retry in 15s.")
|
|
else:
|
|
logging.info("Successfully reconnected to WebSocket.")
|
|
else:
|
|
logging.debug("Watchdog check: WebSocket connection is active.")
|
|
|
|
except KeyboardInterrupt:
|
|
logging.info("Stopping WebSocket listener...")
|
|
except Exception as e:
|
|
log_error(f"Live Market Feed process crashed: {e}")
|
|
finally:
|
|
if info and info.ws_manager:
|
|
info.ws_manager.stop()
|
|
logging.info("Combined Listener stopped.")
|
|
|