diff --git a/live_market_utils.py b/live_market_utils.py index ae0eab3..bab9129 100644 --- a/live_market_utils.py +++ b/live_market_utils.py @@ -3,6 +3,7 @@ import json import time import os import traceback +import sys from hyperliquid.info import Info from hyperliquid.utils import constants @@ -28,38 +29,114 @@ def log_error(error_message: str, include_traceback: bool = True): except Exception: print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr) + def on_message(message, shared_prices_dict): """ - Callback function to process incoming 'allMids' messages and update the - shared memory dictionary directly. + Callback function to process incoming WebSocket messages for 'bbo' and 'trades' + and update the shared memory dictionary. """ try: - if message.get("channel") == "allMids": - new_prices = message.get("data", {}).get("mids", {}) - shared_prices_dict.update(new_prices) + logging.debug(f"Received WebSocket message: {message}") + channel = message.get("channel") + + # --- Parser 1: Handle Best Bid/Offer messages --- + if channel == "bbo": + data = message.get("data") + if not data: + logging.warning("BBO message received with no data.") + return + + coin = data.get("coin") + if not coin: + logging.warning("BBO data received with no coin identifier.") + return + + bid_ask_data = data.get("bbo") + + if not bid_ask_data or not isinstance(bid_ask_data, list) or len(bid_ask_data) < 2: + logging.warning(f"[{coin}] Received BBO message with invalid 'bbo' array: {bid_ask_data}") + return + + try: + bid_price_str = bid_ask_data[0].get('px') + ask_price_str = bid_ask_data[1].get('px') + + if not bid_price_str or not ask_price_str: + logging.warning(f"[{coin}] BBO data missing 'px' field.") + return + + bid_price = float(bid_price_str) + ask_price = float(ask_price_str) + + # Update the shared dictionary for Bid and Ask + shared_prices_dict[f"{coin}_bid"] = bid_price + shared_prices_dict[f"{coin}_ask"] = ask_price + + logging.info(f"Updated {coin} (BBO): Bid={bid_price:.4f}, Ask={ask_price:.4f}") + + except (ValueError, TypeError, IndexError) as e: + logging.error(f"[{coin}] Error parsing BBO data: {e}. Data: {bid_ask_data}") + + # --- Parser 2: Handle Live Trade messages --- + elif channel == "trades": + trade_list = message.get("data") + + if not trade_list or not isinstance(trade_list, list) or len(trade_list) == 0: + logging.warning(f"Received 'trades' message with invalid data: {trade_list}") + return + + # Process all trades in the batch + for trade in trade_list: + try: + coin = trade.get("coin") + price_str = trade.get("px") + + if not coin or not price_str: + logging.warning(f"Trade data missing 'coin' or 'px': {trade}") + continue + + price = float(price_str) + + # Update the shared dictionary for the "Live Price" column + shared_prices_dict[coin] = price + + logging.info(f"Updated {coin} (Live Price) to last trade: {price:.4f}") + + except (ValueError, TypeError) as e: + logging.error(f"Error parsing trade data: {e}. Data: {trade}") + except Exception as e: log_error(f"Error in WebSocket on_message: {e}") -def start_live_feed(shared_prices_dict, log_level='off'): +def start_live_feed(shared_prices_dict, coins_to_watch: list, log_level='off'): """ - Main function for the WebSocket process. It takes a shared dictionary - and continuously feeds it with live market data. - Includes a watchdog to auto-reconnect on failure. + Main function for the WebSocket process. + Subscribes to BOTH 'bbo' and 'trades' for all watched coins. """ - setup_logging(log_level, 'LiveMarketFeed') + setup_logging(log_level, 'LiveMarketFeed_Combined') info = None callback = lambda msg: on_message(msg, shared_prices_dict) def connect_and_subscribe(): - """Establishes a new WebSocket connection and subscribes to allMids.""" + """Establishes a new WebSocket connection and subscribes to both streams.""" try: logging.info("Connecting to Hyperliquid WebSocket...") - # Ensure skip_ws=False to create the ws_manager new_info = Info(constants.MAINNET_API_URL, skip_ws=False) - subscription = {"type": "allMids"} - new_info.subscribe(subscription, callback) - logging.info("WebSocket connected and subscribed to 'allMids'.") + + # --- MODIFIED: Subscribe to 'bbo' AND 'trades' for each coin --- + for coin in coins_to_watch: + # Subscribe to Best Bid/Offer + bbo_sub = {"type": "bbo", "coin": coin} + new_info.subscribe(bbo_sub, callback) + logging.info(f"Subscribed to 'bbo' for {coin}.") + + # Subscribe to Live Trades + trades_sub = {"type": "trades", "coin": coin} + new_info.subscribe(trades_sub, callback) + logging.info(f"Subscribed to 'trades' for {coin}.") + + logging.info("WebSocket connected and all subscriptions sent.") return new_info except Exception as e: log_error(f"Failed to connect to WebSocket: {e}") @@ -67,24 +144,28 @@ def start_live_feed(shared_prices_dict, log_level='off'): info = connect_and_subscribe() - logging.info("Starting live price feed process. Press Ctrl+C in main app to stop.") + if info is None: + logging.critical("Initial WebSocket connection failed. Exiting process.") + log_error("Initial WebSocket connection failed. Exiting process.", include_traceback=False) + time.sleep(10) # Wait before letting the process manager restart it + return + + logging.info("Starting Combined (BBO + Trades) live price feed process.") try: while True: # --- Watchdog Logic --- time.sleep(15) # Check the connection every 15 seconds - # --- FIX: Changed 'is_running()' to the correct method 'is_alive()' --- - if info is None or not info.ws_manager.is_alive(): - error_msg = "WebSocket connection lost or not running. Attempting to reconnect..." + if not info.ws_manager.is_alive(): + error_msg = "WebSocket connection lost. Attempting to reconnect..." logging.warning(error_msg) log_error(error_msg, include_traceback=False) # Log it to the file - if info and info.ws_manager: # Check if ws_manager exists before stopping - try: - info.ws_manager.stop() # Clean up old manager - except Exception as e: - log_error(f"Error stopping old ws_manager: {e}") + try: + info.ws_manager.stop() # Clean up old manager + except Exception as e: + log_error(f"Error stopping old ws_manager: {e}") info = connect_and_subscribe() @@ -102,5 +183,5 @@ def start_live_feed(shared_prices_dict, log_level='off'): finally: if info and info.ws_manager: info.ws_manager.stop() - logging.info("Listener stopped.") + logging.info("Combined Listener stopped.") diff --git a/main_app.py b/main_app.py index f64f7ce..345013d 100644 --- a/main_app.py +++ b/main_app.py @@ -23,12 +23,12 @@ from strategies.base_strategy import BaseStrategy WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"] LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py" RESAMPLER_SCRIPT = "resampler.py" -MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py" +# --- REMOVED: Market Cap Fetcher --- # --- REMOVED: trade_executor.py is no longer a script --- DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py" STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json") DB_PATH = os.path.join("_data", "market_data.db") -MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json") +# --- REMOVED: Market Cap File --- LOGS_DIR = "_logs" TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json") @@ -145,51 +145,9 @@ def resampler_scheduler(timeframes_to_generate: list): logging.info("ResamplerScheduler shutting down.") -def run_market_cap_fetcher_job(): - """Defines the job for the market cap fetcher, redirecting output.""" - log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log") - try: - command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--log-level", "off"] - with open(log_file, 'a') as f: - f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n") - subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) - except Exception as e: - with open(log_file, 'a') as f: - f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n") - f.write(f"Failed to run {MARKET_CAP_FETCHER_SCRIPT} job: {e}\n") +# --- REMOVED: run_market_cap_fetcher_job function --- - -def market_cap_fetcher_scheduler(): - """Schedules the market_cap_fetcher.py script to run daily at a specific UTC time.""" - - # --- GRACEFUL SHUTDOWN HANDLER --- - import signal - shutdown_requested = False - - def handle_shutdown_signal(signum, frame): - nonlocal shutdown_requested - try: - logging.info(f"Shutdown signal ({signum}) received. Exiting loop...") - except NameError: - print(f"[MarketCapScheduler] Shutdown signal ({signum}) received. Exiting loop...") - shutdown_requested = True - - signal.signal(signal.SIGTERM, handle_shutdown_signal) - signal.signal(signal.SIGINT, handle_shutdown_signal) - # --- END GRACEFUL SHUTDOWN HANDLER --- - - setup_logging('off', 'MarketCapScheduler') - schedule.every().day.at("00:15", "UTC").do(run_market_cap_fetcher_job) - - while not shutdown_requested: # <-- MODIFIED - schedule.run_pending() - # Sleep for 60 seconds, but check for shutdown flag every second - for _ in range(60): - if shutdown_requested: - break - time.sleep(1) - - logging.info("MarketCapScheduler shutting down.") +# --- REMOVED: market_cap_fetcher_scheduler function --- def run_trade_executor(order_execution_queue: multiprocessing.Queue): @@ -390,7 +348,7 @@ class MainApp: self.watched_coins = coins_to_watch self.shared_prices = shared_prices self.prices = {} - self.market_caps = {} + # --- REMOVED: self.market_caps --- self.open_positions = {} self.background_processes = processes self.process_status = {} @@ -400,23 +358,12 @@ class MainApp: def read_prices(self): """Reads the latest prices directly from the shared memory dictionary.""" try: - self.prices = dict(self.shared_prices) + # --- FIX: Use .copy() for thread-safe iteration --- + self.prices = self.shared_prices.copy() except Exception as e: - logging.debug("Could not read from shared prices dict: {e}") + logging.debug(f"Could not read from shared prices dict: {e}") - def read_market_caps(self): - """Reads the latest market cap summary from its JSON file.""" - if os.path.exists(MARKET_CAP_SUMMARY_FILE): - try: - with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f: - summary_data = json.load(f) - - for coin in self.watched_coins: - table_key = f"{coin}_market_cap" - if table_key in summary_data: - self.market_caps[coin] = summary_data[table_key].get('market_cap') - except (json.JSONDecodeError, IOError): - logging.debug("Could not read market cap summary file.") + # --- REMOVED: read_market_caps method --- def read_strategy_statuses(self): """Reads the status JSON file for each enabled strategy.""" @@ -439,7 +386,9 @@ class MainApp: if os.path.exists(TRADE_EXECUTOR_STATUS_FILE): try: with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f: - self.open_positions = json.load(f) + # --- FIX: Read the 'open_positions' key from the file --- + status_data = json.load(f) + self.open_positions = status_data.get('open_positions', {}) except (IOError, json.JSONDecodeError): logging.debug("Could not read trade executor status file.") else: @@ -450,32 +399,59 @@ class MainApp: for name, process in self.background_processes.items(): self.process_status[name] = "Running" if process.is_alive() else "STOPPED" + def _format_price(self, price_val, width=10): + """Helper function to format prices for the dashboard.""" + try: + price_float = float(price_val) + if price_float < 1: + price_str = f"{price_float:>{width}.6f}" + elif price_float < 100: + price_str = f"{price_float:>{width}.4f}" + else: + price_str = f"{price_float:>{width}.2f}" + except (ValueError, TypeError): + price_str = f"{'Loading...':>{width}}" + return price_str + def display_dashboard(self): """Displays a formatted dashboard with side-by-side tables.""" print("\x1b[H\x1b[J", end="") # Clear screen left_table_lines = ["--- Market Dashboard ---"] - left_table_width = 44 + # --- MODIFIED: Adjusted width for new columns --- + left_table_width = 65 left_table_lines.append("-" * left_table_width) - left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |") + # --- MODIFIED: Replaced Market Cap with Gap --- + left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Best Bid':>10} | {'Live Price':>10} | {'Best Ask':>10} | {'Gap':>10} |") left_table_lines.append("-" * left_table_width) for i, coin in enumerate(self.watched_coins, 1): - price_str = self.prices.get(coin, "Loading...") - # Format the price string - try: - price_float = float(price_str) - if price_float < 1: - price_str = f"{price_float:>10.6f}" - elif price_float < 100: - price_str = f"{price_float:>10.4f}" - else: - price_str = f"{price_float:>10.2f}" - except (ValueError, TypeError): - price_str = f"{'Loading...':>10}" + # --- MODIFIED: Fetch all three price types --- + mid_price = self.prices.get(coin, "Loading...") + bid_price = self.prices.get(f"{coin}_bid", "Loading...") + ask_price = self.prices.get(f"{coin}_ask", "Loading...") - market_cap = self.market_caps.get(coin) - formatted_mc = format_market_cap(market_cap) - left_table_lines.append(f"{i:<2} | {coin:^6} | {price_str} | {formatted_mc:>15} |") + # --- MODIFIED: Use the new formatting helper --- + formatted_mid = self._format_price(mid_price) + formatted_bid = self._format_price(bid_price) + formatted_ask = self._format_price(ask_price) + + # --- MODIFIED: Calculate gap --- + gap_str = f"{'Loading...':>10}" + try: + # Calculate the spread + gap_val = float(ask_price) - float(bid_price) + # Format gap with high precision, similar to price + if gap_val < 1: + gap_str = f"{gap_val:>{10}.6f}" + else: + gap_str = f"{gap_val:>{10}.4f}" + except (ValueError, TypeError): + pass # Keep 'Loading...' + + # --- REMOVED: Market Cap logic --- + + # --- MODIFIED: Print all price columns including gap --- + left_table_lines.append(f"{i:<2} | {coin:^6} | {formatted_bid} | {formatted_mid} | {formatted_ask} | {gap_str} |") left_table_lines.append("-" * left_table_width) right_table_lines = ["--- Strategy Status ---"] @@ -502,10 +478,13 @@ class MainApp: coin = status.get('coin', config_params.get('coin', 'N/A')) # --- FIX: Handle nested 'coins_to_copy' logic for size --- - if 'coins_to_copy' in config_params: - size = status.get('size', 'Multi') - else: - size = config_params.get('size', 'N/A') + # --- MODIFIED: Read 'size' from status first, then config, then 'Multi' --- + size = status.get('size') + if not size: + if 'coins_to_copy' in config_params: + size = 'Multi' + else: + size = config_params.get('size', 'N/A') timeframe = config_params.get('timeframe', 'N/A') @@ -515,10 +494,16 @@ class MainApp: size_display = f"{size:>8}" if isinstance(size, (int, float)): - size_display = f"{size:>8.4f}" # Format size to 4 decimal places + # --- MODIFIED: More flexible size formatting --- + if size < 0.0001: + size_display = f"{size:>8.6f}" + elif size < 1: + size_display = f"{size:>8.4f}" + else: + size_display = f"{size:>8.2f}" # --- END NEW LOGIC --- - right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} |") + right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size_display} |") right_table_lines.append("-" * right_table_width) output_lines = [] @@ -536,7 +521,32 @@ class MainApp: output_lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |") output_lines.append("-" * pos_table_width) - # --- REMOVED: Background Processes section --- + # --- FIX: Correctly read and display open positions --- + if not self.open_positions: + output_lines.append(f"{'No open positions.':^{pos_table_width}}") + else: + for account, positions in self.open_positions.items(): + if not positions: + continue + for coin, pos in positions.items(): + try: + size_f = float(pos.get('size', 0)) + entry_f = float(pos.get('entry_price', 0)) + mark_f = float(self.prices.get(coin, 0)) + pnl_f = (mark_f - entry_f) * size_f if size_f > 0 else (entry_f - mark_f) * abs(size_f) + lev = pos.get('leverage', 1) + + size_str = f"{size_f:>{15}.5f}" + entry_str = f"{entry_f:>{12}.2f}" + mark_str = f"{mark_f:>{12}.2f}" + pnl_str = f"{pnl_f:>{15}.2f}" + lev_str = f"{lev}x" + + output_lines.append(f"{account:<10} | {coin:<6} | {size_str} | {entry_str} | {mark_str} | {pnl_str} | {lev_str:>10} |") + except (ValueError, TypeError): + output_lines.append(f"{account:<10} | {coin:<6} | {'Error parsing data...':^{pos_table_width-20}} |") + + output_lines.append("-" * pos_table_width) final_output = "\n".join(output_lines) print(final_output) @@ -546,7 +556,7 @@ class MainApp: """Main loop to read data, display dashboard, and check processes.""" while True: self.read_prices() - self.read_market_caps() + # --- REMOVED: self.read_market_caps() --- self.read_strategy_statuses() self.read_executor_status() # --- REMOVED: self.check_process_status() --- @@ -584,10 +594,16 @@ if __name__ == "__main__": # --- REVERTED: All processes are daemon=True and in one dict --- - processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True) + # --- FIX: Pass WATCHED_COINS to the start_live_feed process --- + # --- MODIFICATION: Set log level back to 'off' --- + processes["Live Market Feed"] = multiprocessing.Process( + target=start_live_feed, + args=(shared_prices, WATCHED_COINS, 'off'), + daemon=True + ) processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True) processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True) - processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) + # --- REMOVED: Market Cap Fetcher Process --- processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True) processes["Position Manager"] = multiprocessing.Process( @@ -665,3 +681,5 @@ if __name__ == "__main__": logging.info("Shutdown complete.") sys.exit(0) + +