"""Process supervisor and terminal dashboard.

Running this file as a script starts a set of background worker processes
(live price feed, candle fetcher, resampler scheduler, dashboard data fetcher,
position manager, trade executor and one process per enabled strategy) and
then renders a text dashboard in a loop until interrupted with Ctrl+C.
"""

import importlib
import json
import logging
import multiprocessing
import os
import sqlite3
import subprocess
import sys
import time
from datetime import datetime, timezone

import pandas as pd
import schedule

from logging_utils import setup_logging
# WebSocket-based utility that provides the live prices.
from live_market_utils import start_live_feed
# Imported for type-hinting purposes only; not referenced at runtime here.
from strategies.base_strategy import BaseStrategy

# --- Configuration ---
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py"
DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
DB_PATH = os.path.join("_data", "market_data.db")
LOGS_DIR = "_logs"
TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")


def format_market_cap(mc_value):
    """Format a number as a human-readable dollar amount.

    Args:
        mc_value: The value to format.

    Returns:
        "N/A" when ``mc_value`` is not an int/float or equals zero; otherwise
        a ``$``-prefixed string with a T/B/M suffix for trillions, billions
        and millions, or a comma-grouped two-decimal amount below one million.
    """
    if not isinstance(mc_value, (int, float)) or mc_value == 0:
        return "N/A"
    if mc_value >= 1_000_000_000_000:
        return f"${mc_value / 1_000_000_000_000:.2f}T"
    if mc_value >= 1_000_000_000:
        return f"${mc_value / 1_000_000_000:.2f}B"
    if mc_value >= 1_000_000:
        return f"${mc_value / 1_000_000:.2f}M"
    return f"${mc_value:,.2f}"


def _raise_keyboard_interrupt_on_sigterm():
    """Install a SIGTERM handler that logs and raises KeyboardInterrupt."""
    import signal

    def handle_shutdown_signal(signum, frame):
        logging.info(f"Shutdown signal ({signum}) received. Initiating graceful exit...")
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, handle_shutdown_signal)


def _redirect_output_to_log(log_file_path, label):
    """Point sys.stdout and sys.stderr at a line-buffered, append-mode file."""
    try:
        # The file object is intentionally never closed: it replaces the
        # process-wide standard streams for the rest of the process lifetime.
        sys.stdout = open(log_file_path, 'a', buffering=1)  # 1 = line buffering
        sys.stderr = sys.stdout
    except OSError as e:
        print(f"Failed to open log file for {label}: {e}")


def run_live_candle_fetcher():
    """Run live_candle_fetcher.py as a subprocess and restart it when it exits.

    SIGTERM and SIGINT set a flag instead of raising; the loop then terminates
    the child (if still running), waits for it and returns. A failure to start
    the child, or a child that exits with a non-zero status, is written to the
    fetcher log file and followed by a 5 second pause before the next attempt.
    """
    import signal

    shutdown_requested = False

    def handle_shutdown_signal(signum, frame):
        nonlocal shutdown_requested
        # print() rather than logging: logging is not configured in this process.
        print(f"[CandleFetcher] Shutdown signal ({signum}) received. Will stop after current run.")
        shutdown_requested = True

    signal.signal(signal.SIGTERM, handle_shutdown_signal)
    signal.signal(signal.SIGINT, handle_shutdown_signal)

    log_file = os.path.join(LOGS_DIR, "live_candle_fetcher.log")
    command = [sys.executable, LIVE_CANDLE_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]

    while not shutdown_requested:
        try:
            with open(log_file, 'a') as f:
                f.write(f"\n--- Starting {LIVE_CANDLE_FETCHER_SCRIPT} at {datetime.now()} ---\n")
                # Popen (not run) so the shutdown flag can be polled while the child runs.
                process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT)

                while process.poll() is None and not shutdown_requested:
                    time.sleep(0.5)

                if shutdown_requested and process.poll() is None:
                    print(f"[CandleFetcher] Terminating subprocess {LIVE_CANDLE_FETCHER_SCRIPT}...")
                    process.terminate()
                    process.wait()
                    print("[CandleFetcher] Subprocess terminated.")

            # Popen never raises on a failing child, so surface a non-zero exit
            # explicitly; otherwise a crashing script would be respawned in a
            # tight loop with nothing written to the log.
            if not shutdown_requested and process.returncode != 0:
                raise subprocess.CalledProcessError(process.returncode, command)
        except Exception as e:
            if shutdown_requested:
                break  # Don't restart if we're shutting down.
            with open(log_file, 'a') as f:
                f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
                f.write(f"Live candle fetcher failed: {e}. Restarting...\n")
            time.sleep(5)

    print("[CandleFetcher] Live candle fetcher shutting down.")


def run_resampler_job(timeframes_to_generate: list):
    """Run resampler.py once, appending its output to the resampler log file.

    Args:
        timeframes_to_generate: Timeframe strings passed after ``--timeframes``.

    Any exception (including a non-zero exit status of the script) is written
    to the log file instead of being propagated.
    """
    log_file = os.path.join(LOGS_DIR, "resampler.log")
    try:
        command = (
            [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS
            + ["--timeframes"] + timeframes_to_generate
            + ["--log-level", "normal"]
        )
        with open(log_file, 'a') as f:
            f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n")
            subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
    except Exception as e:
        with open(log_file, 'a') as f:
            f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
            f.write(f"Failed to run resampler.py job: {e}\n")


def resampler_scheduler(timeframes_to_generate: list):
    """Run the resampler job immediately, then every minute at second :01.

    Args:
        timeframes_to_generate: Timeframe strings forwarded to each job run.

    SIGTERM and SIGINT set a flag that ends the scheduling loop.
    """
    import signal

    shutdown_requested = False

    def handle_shutdown_signal(signum, frame):
        nonlocal shutdown_requested
        logging.info(f"Shutdown signal ({signum}) received. Exiting loop...")
        shutdown_requested = True

    signal.signal(signal.SIGTERM, handle_shutdown_signal)
    signal.signal(signal.SIGINT, handle_shutdown_signal)

    setup_logging('off', 'ResamplerScheduler')
    run_resampler_job(timeframes_to_generate)

    schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=timeframes_to_generate)
    logging.info("Resampler scheduled to run every minute at :01.")

    while not shutdown_requested:
        schedule.run_pending()
        # 500 ms polling keeps the job close to the :01 mark and shutdown responsive.
        time.sleep(0.5)

    logging.info("ResamplerScheduler shutting down.")


def run_trade_executor(order_execution_queue: multiprocessing.Queue):
    """Run the TradeExecutor class, restarting it 10 seconds after any failure.

    Args:
        order_execution_queue: Queue handed to ``TradeExecutor``.

    Standard output/error are redirected to the trade executor log file.
    SIGTERM is converted to KeyboardInterrupt, which ends the function.
    """
    _raise_keyboard_interrupt_on_sigterm()
    _redirect_output_to_log(os.path.join(LOGS_DIR, "trade_executor.log"), "TradeExecutor")

    setup_logging('normal', "TradeExecutor")
    logging.info("\n--- Starting Trade Executor process ---")

    # The outer try also covers the back-off sleep, so an interrupt that
    # arrives between restarts still exits cleanly.
    try:
        while True:
            try:
                from trade_executor import TradeExecutor
                executor = TradeExecutor(log_level="normal", order_execution_queue=order_execution_queue)
                executor.run()
            except Exception as e:
                logging.error(f"Trade Executor failed: {e}. Restarting...\n", exc_info=True)
                time.sleep(10)
    except KeyboardInterrupt:
        logging.info("Trade Executor interrupted. Exiting.")


def run_position_manager(trade_signal_queue: multiprocessing.Queue, order_execution_queue: multiprocessing.Queue):
    """Run the PositionManager class, restarting it 10 seconds after any failure.

    Args:
        trade_signal_queue: Queue handed to ``PositionManager`` as its signal input.
        order_execution_queue: Queue handed to ``PositionManager`` as its order output.

    Standard output/error are redirected to the position manager log file.
    SIGTERM is converted to KeyboardInterrupt, which ends the function.
    """
    _raise_keyboard_interrupt_on_sigterm()
    _redirect_output_to_log(os.path.join(LOGS_DIR, "position_manager.log"), "PositionManager")

    setup_logging('normal', "PositionManager")
    logging.info("\n--- Starting Position Manager process ---")

    try:
        while True:
            try:
                from position_manager import PositionManager
                manager = PositionManager(
                    log_level="normal",
                    trade_signal_queue=trade_signal_queue,
                    order_execution_queue=order_execution_queue,
                )
                manager.run()
            except Exception as e:
                logging.error(f"Position Manager failed: {e}. Restarting...\n", exc_info=True)
                time.sleep(10)
    except KeyboardInterrupt:
        logging.info("Position Manager interrupted. Exiting.")


def run_strategy(strategy_name: str, config: dict, trade_signal_queue: multiprocessing.Queue):
    """Load and run one strategy class, restarting it 10 seconds after any failure.

    Args:
        strategy_name: Name used for the log file, logger label and constructor.
        config: Strategy configuration; must contain ``'class'`` (dotted
            ``module.ClassName`` path) and ``'parameters'``. The optional
            ``'is_event_driven'`` flag selects ``run_event_loop()`` over
            ``run_polling_loop()``.
        trade_signal_queue: Queue handed to the strategy constructor.

    Returns immediately (after logging an error) when ``'class'`` is missing.
    SIGTERM is converted to KeyboardInterrupt, which ends the function.
    """
    _raise_keyboard_interrupt_on_sigterm()
    _redirect_output_to_log(os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log"), strategy_name)

    setup_logging('normal', f"Strategy-{strategy_name}")

    try:
        while True:
            try:
                logging.info(f"--- Starting strategy '{strategy_name}' ---")

                if 'class' not in config:
                    logging.error(f"Strategy config for '{strategy_name}' is missing the 'class' key. Exiting.")
                    return

                module_path, class_name = config['class'].rsplit('.', 1)
                module = importlib.import_module(module_path)
                StrategyClass = getattr(module, class_name)

                strategy = StrategyClass(strategy_name, config['parameters'], trade_signal_queue)

                if config.get("is_event_driven", False):
                    logging.info("Starting EVENT-DRIVEN logic loop...")
                    strategy.run_event_loop()  # Blocking call.
                else:
                    logging.info("Starting POLLING logic loop...")
                    strategy.run_polling_loop()  # Blocking call.
            except Exception as e:
                logging.error(f"Strategy '{strategy_name}' failed: {e}", exc_info=True)
                logging.info("Restarting strategy in 10 seconds...")
                time.sleep(10)
    except KeyboardInterrupt:
        logging.info(f"Strategy {strategy_name} process stopping.")


def run_dashboard_data_fetcher():
    """Run dashboard_data_fetcher.py in a loop, appending output to its log file.

    A failure (including a non-zero exit status) is written to the log file
    and followed by a 10 second pause. SIGTERM is converted to
    KeyboardInterrupt, which ends the function.
    """
    _raise_keyboard_interrupt_on_sigterm()

    log_file = os.path.join(LOGS_DIR, "dashboard_data_fetcher.log")
    command = [sys.executable, DASHBOARD_DATA_FETCHER_SCRIPT, "--log-level", "normal"]

    try:
        while True:
            try:
                with open(log_file, 'a') as f:
                    f.write(f"\n--- Starting Dashboard Data Fetcher at {datetime.now()} ---\n")
                    subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
            except Exception as e:
                with open(log_file, 'a') as f:
                    f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
                    f.write(f"Dashboard Data Fetcher failed: {e}. Restarting...\n")
                time.sleep(10)
    except KeyboardInterrupt:
        logging.info("Dashboard Data Fetcher stopping.")


class MainApp:
    """Collects prices, strategy statuses and open positions and prints a dashboard."""

    def __init__(self, coins_to_watch: list, processes: dict, strategy_configs: dict, shared_prices: dict):
        """Store the inputs and initialise empty display state.

        Args:
            coins_to_watch: Coin symbols listed in the market table.
            processes: Mapping of process name to process object.
            strategy_configs: Mapping of strategy name to its configuration dict.
            shared_prices: Shared dictionary the latest prices are copied from.
        """
        self.watched_coins = coins_to_watch
        self.shared_prices = shared_prices
        self.prices = {}
        self.open_positions = {}
        self.background_processes = processes
        self.process_status = {}
        self.strategy_configs = strategy_configs
        self.strategy_statuses = {}

    def read_prices(self):
        """Copy the latest prices out of the shared dictionary."""
        try:
            # Work on a snapshot so rendering never iterates the live shared dict.
            self.prices = self.shared_prices.copy()
        except Exception as e:
            logging.debug(f"Could not read from shared prices dict: {e}")

    def read_strategy_statuses(self):
        """Load ``_data/strategy_status_<name>.json`` for every enabled strategy.

        A missing file yields an "Initializing..." placeholder; an unreadable,
        undecodable or non-object file yields an error placeholder.
        """
        enabled_statuses = {}
        for name, config in self.strategy_configs.items():
            if not config.get("enabled", False):
                continue
            status_file = os.path.join("_data", f"strategy_status_{name}.json")
            try:
                with open(status_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except FileNotFoundError:
                enabled_statuses[name] = {"current_signal": "Initializing..."}
                continue
            except (OSError, ValueError):
                # ValueError covers json.JSONDecodeError and bad UTF-8.
                loaded = None
            if isinstance(loaded, dict):
                enabled_statuses[name] = loaded
            else:
                enabled_statuses[name] = {"error": "Could not read status file."}
        self.strategy_statuses = enabled_statuses

    def read_executor_status(self):
        """Load ``open_positions`` from the trade executor status file.

        A missing file clears the positions. An unreadable or undecodable file
        keeps the previously loaded positions (the file may be mid-write).
        """
        try:
            with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f:
                status_data = json.load(f)
        except FileNotFoundError:
            self.open_positions = {}
            return
        except (OSError, ValueError):
            logging.debug("Could not read trade executor status file.")
            return

        if not isinstance(status_data, dict):
            logging.debug("Could not read trade executor status file.")
            return
        positions = status_data.get('open_positions', {})
        # Anything other than a mapping cannot be rendered; treat as no positions.
        self.open_positions = positions if isinstance(positions, dict) else {}

    def check_process_status(self):
        """Record "Running" or "STOPPED" for every background process."""
        for name, process in self.background_processes.items():
            self.process_status[name] = "Running" if process.is_alive() else "STOPPED"

    def _format_price(self, price_val, width=10):
        """Right-align a price in ``width`` columns.

        Uses 6 decimals below 1, 4 decimals below 100 and 2 decimals otherwise;
        returns a right-aligned 'Loading...' when the value is not numeric.
        """
        try:
            price_float = float(price_val)
        except (ValueError, TypeError):
            return f"{'Loading...':>{width}}"
        if price_float < 1:
            return f"{price_float:>{width}.6f}"
        if price_float < 100:
            return f"{price_float:>{width}.4f}"
        return f"{price_float:>{width}.2f}"

    @staticmethod
    def _format_last_change(last_change):
        """Render an ISO-8601 UTC timestamp as local 'YYYY-MM-DD HH:MM'."""
        if not last_change:
            return 'Never'
        try:
            parsed = datetime.fromisoformat(str(last_change).replace('Z', '+00:00'))
            # The value is treated as UTC regardless of any offset it carries.
            dt_local = parsed.replace(tzinfo=timezone.utc).astimezone(None)
            return dt_local.strftime('%Y-%m-%d %H:%M')
        except (ValueError, OverflowError, OSError):
            # A malformed status file must not take the whole dashboard down.
            return 'Invalid'

    def _build_market_table(self, width):
        """Return the lines of the bid / live / ask / gap table."""
        lines = ["--- Market Dashboard ---", "-" * width]
        lines.append(f"{'#':<2} | {'Coin':^6} | {'Best Bid':>10} | {'Live Price':>10} | {'Best Ask':>10} | {'Gap':>10} |")
        lines.append("-" * width)

        for i, coin in enumerate(self.watched_coins, 1):
            mid_price = self.prices.get(coin, "Loading...")
            bid_price = self.prices.get(f"{coin}_bid", "Loading...")
            ask_price = self.prices.get(f"{coin}_ask", "Loading...")

            formatted_mid = self._format_price(mid_price)
            formatted_bid = self._format_price(bid_price)
            formatted_ask = self._format_price(ask_price)

            gap_str = f"{'Loading...':>10}"
            try:
                gap_val = float(ask_price) - float(bid_price)  # Bid/ask spread.
                gap_str = f"{gap_val:>10.6f}" if gap_val < 1 else f"{gap_val:>10.4f}"
            except (ValueError, TypeError):
                pass  # Keep 'Loading...' until both sides are numeric.

            lines.append(f"{i:<2} | {coin:^6} | {formatted_bid} | {formatted_mid} | {formatted_ask} | {gap_str} |")

        lines.append("-" * width)
        return lines

    def _build_strategy_table(self, width):
        """Return the lines of the per-strategy signal table."""
        lines = ["--- Strategy Status ---", "-" * width]
        lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} |")
        lines.append("-" * width)

        for i, (name, status) in enumerate(self.strategy_statuses.items(), 1):
            signal = status.get('current_signal', 'N/A')
            price = status.get('signal_price')
            price_display = f"{price:.4f}" if isinstance(price, (int, float)) else "-"
            last_change_display = self._format_last_change(status.get('last_signal_change_utc'))

            config_params = self.strategy_configs.get(name, {}).get('parameters', {})

            # Coin and size come from the status file first, then the config.
            coin = status.get('coin', config_params.get('coin', 'N/A'))
            size = status.get('size')
            if not size:
                if 'coins_to_copy' in config_params:
                    size = 'Multi'
                else:
                    size = config_params.get('size', 'N/A')
            timeframe = config_params.get('timeframe', 'N/A')

            if isinstance(size, (int, float)):
                if size < 0.0001:
                    size_display = f"{size:>8.6f}"
                elif size < 1:
                    size_display = f"{size:>8.4f}"
                else:
                    size_display = f"{size:>8.2f}"
            else:
                size_display = f"{str(size):>8}"

            # str() guards the alignment specs against None / non-string JSON values.
            lines.append(
                f"{i:^2} | {name:<25} | {str(coin):^6} | {str(signal):^8} | {price_display:>12} | "
                f"{last_change_display:>17} | {str(timeframe):^5} | {size_display} |"
            )

        lines.append("-" * width)
        return lines

    def _build_positions_table(self, width):
        """Return the lines of the open positions table."""
        lines = ["\n--- Open Positions ---", "-" * width]
        lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |")
        lines.append("-" * width)

        if not self.open_positions:
            lines.append(f"{'No open positions.':^{width}}")
        else:
            for account, positions in self.open_positions.items():
                if not positions or not isinstance(positions, dict):
                    continue
                for coin, pos in positions.items():
                    try:
                        size_f = float(pos.get('size', 0))
                        entry_f = float(pos.get('entry_price', 0))
                        mark_f = float(self.prices.get(coin, 0))
                        if size_f > 0:
                            pnl_f = (mark_f - entry_f) * size_f
                        else:
                            pnl_f = (entry_f - mark_f) * abs(size_f)
                        lev = pos.get('leverage', 1)

                        size_str = f"{size_f:>15.5f}"
                        entry_str = f"{entry_f:>12.2f}"
                        mark_str = f"{mark_f:>12.2f}"
                        pnl_str = f"{pnl_f:>15.2f}"
                        lev_str = f"{lev}x"

                        lines.append(f"{account:<10} | {coin:<6} | {size_str} | {entry_str} | {mark_str} | {pnl_str} | {lev_str:>10} |")
                    except (ValueError, TypeError, AttributeError):
                        lines.append(f"{account:<10} | {coin:<6} | {'Error parsing data...':^{width - 20}} |")

        lines.append("-" * width)
        return lines

    def display_dashboard(self):
        """Clear the terminal and print the market, strategy and position tables.

        The market and strategy tables are laid out side by side; the open
        positions table follows underneath.
        """
        print("\x1b[H\x1b[J", end="")  # ANSI: cursor home + clear screen.

        left_table_width = 65
        right_table_width = 105
        pos_table_width = 100

        left_table_lines = self._build_market_table(left_table_width)
        right_table_lines = self._build_strategy_table(right_table_width)

        output_lines = []
        separator = " "
        indent = " " * 10
        for i in range(max(len(left_table_lines), len(right_table_lines))):
            left_part = left_table_lines[i] if i < len(left_table_lines) else " " * left_table_width
            right_part = indent + right_table_lines[i] if i < len(right_table_lines) else ""
            output_lines.append(f"{left_part}{separator}{right_part}")

        output_lines.extend(self._build_positions_table(pos_table_width))

        print("\n".join(output_lines))
        sys.stdout.flush()

    def run(self):
        """Refresh all data sources and redraw the dashboard every 0.5 seconds."""
        while True:
            self.read_prices()
            self.read_strategy_statuses()
            self.read_executor_status()
            self.display_dashboard()
            time.sleep(0.5)


def _shutdown_processes(processes):
    """Terminate strategies first, then the remaining processes, then join all."""
    logging.info("Shutting down...")

    strategy_procs = {}
    other_procs = {}
    for name, proc in processes.items():
        if name.startswith("Strategy:"):
            strategy_procs[name] = proc
        else:
            other_procs[name] = proc

    # 1. Strategies go first so they stop producing signals.
    logging.info("Shutting down strategy processes first...")
    for name, proc in strategy_procs.items():
        if proc.is_alive():
            logging.info(f"Terminating process: '{name}'...")
            proc.terminate()

    # 2. Give the strategies time to finish.
    logging.info("Waiting 5 seconds for strategies to close...")
    time.sleep(5)

    # 3. Everything else.
    logging.info("Shutting down remaining core processes...")
    for name, proc in other_procs.items():
        if proc.is_alive():
            logging.info(f"Terminating process: '{name}'...")
            proc.terminate()

    # 4. Join every process, force-killing any that ignore the request.
    logging.info("Waiting for all processes to join...")
    for name, proc in processes.items():
        if proc.is_alive():
            logging.info(f"Waiting for process '{name}' to join...")
            proc.join(timeout=5)
            if proc.is_alive():
                logging.warning(f"Process '{name}' did not terminate, forcing kill.")
                proc.kill()


def main():
    """Start all background processes and run the dashboard until Ctrl+C."""
    # Create the log directory before anything may try to write into it.
    os.makedirs(LOGS_DIR, exist_ok=True)
    setup_logging('normal', 'MainApp')

    processes = {}

    try:
        with open(STRATEGY_CONFIG_FILE, 'r') as f:
            strategy_configs = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")
        sys.exit(1)

    # Fixed list of timeframes handed to the resampler.
    required_timeframes = [
        "3m", "5m", "15m", "30m", "1h", "2h", "4h", "8h",
        "12h", "1d", "3d", "1w", "1M", "148m", "37m",
    ]
    logging.info(f"Using fixed timeframes for resampler: {required_timeframes}")

    with multiprocessing.Manager() as manager:
        shared_prices = manager.dict()
        # Two queues: strategies -> position manager -> trade executor.
        trade_signal_queue = manager.Queue()
        order_execution_queue = manager.Queue()

        processes["Live Market Feed"] = multiprocessing.Process(
            target=start_live_feed,
            args=(shared_prices, WATCHED_COINS, 'off'),
            daemon=True,
        )
        processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
        processes["Resampler"] = multiprocessing.Process(
            target=resampler_scheduler,
            args=(list(required_timeframes),),
            daemon=True,
        )
        processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True)
        processes["Position Manager"] = multiprocessing.Process(
            target=run_position_manager,
            args=(trade_signal_queue, order_execution_queue),
            daemon=True,
        )
        processes["Trade Executor"] = multiprocessing.Process(
            target=run_trade_executor,
            args=(order_execution_queue,),
            daemon=True,
        )

        for name, config in strategy_configs.items():
            if not config.get("enabled", False):
                continue
            if 'class' not in config:
                logging.error(f"Strategy '{name}' is missing 'class' key. Skipping.")
                continue
            # The "Strategy: " prefix is what the staged shutdown keys on.
            processes[f"Strategy: {name}"] = multiprocessing.Process(
                target=run_strategy,
                args=(name, config, trade_signal_queue),
                daemon=True,
            )

        for name, proc in processes.items():
            logging.info(f"Starting process '{name}'...")
            proc.start()

        time.sleep(3)

        app = MainApp(
            coins_to_watch=WATCHED_COINS,
            processes=processes,
            strategy_configs=strategy_configs,
            shared_prices=shared_prices,
        )
        try:
            app.run()
        except KeyboardInterrupt:
            _shutdown_processes(processes)
            logging.info("Shutdown complete.")
            sys.exit(0)


if __name__ == "__main__":
    main()