Files
hyper/main_app.py
2025-11-04 13:34:49 +01:00

686 lines
29 KiB
Python

import json
import logging
import os
import sys
import time
import subprocess
import multiprocessing
import schedule
import sqlite3
import pandas as pd
from datetime import datetime, timezone
import importlib
# --- REMOVED: import signal ---
# --- REMOVED: from queue import Empty ---
from logging_utils import setup_logging
# --- Using the new high-performance WebSocket utility for live prices ---
from live_market_utils import start_live_feed
# --- Import the base class for type hinting (optional but good practice) ---
from strategies.base_strategy import BaseStrategy
# --- Configuration ---
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py"
# --- REMOVED: Market Cap Fetcher ---
# --- REMOVED: trade_executor.py is no longer a script ---
DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
DB_PATH = os.path.join("_data", "market_data.db")
# --- REMOVED: Market Cap File ---
LOGS_DIR = "_logs"
TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")
def format_market_cap(mc_value):
"""Formats a large number into a human-readable market cap string."""
if not isinstance(mc_value, (int, float)) or mc_value == 0:
return "N/A"
if mc_value >= 1_000_000_000_000:
return f"${mc_value / 1_000_000_000_000:.2f}T"
if mc_value >= 1_000_000_000:
return f"${mc_value / 1_000_000_000:.2f}B"
if mc_value >= 1_000_000:
return f"${mc_value / 1_000_000:.2f}M"
return f"${mc_value:,.2f}"
def run_live_candle_fetcher():
"""Target function to run the live_candle_fetcher.py script in a resilient loop."""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
shutdown_requested = False
def handle_shutdown_signal(signum, frame):
nonlocal shutdown_requested
# Use print here as logging may not be set up
print(f"[CandleFetcher] Shutdown signal ({signum}) received. Will stop after current run.")
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_shutdown_signal)
signal.signal(signal.SIGINT, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
log_file = os.path.join(LOGS_DIR, "live_candle_fetcher.log")
while not shutdown_requested: # <-- MODIFIED
process = None
try:
with open(log_file, 'a') as f:
command = [sys.executable, LIVE_CANDLE_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
f.write(f"\n--- Starting {LIVE_CANDLE_FETCHER_SCRIPT} at {datetime.now()} ---\n")
# Use Popen instead of run to be non-blocking
process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT)
# Poll the process and check for shutdown request
while process.poll() is None and not shutdown_requested:
time.sleep(0.5) # Poll every 500ms
if shutdown_requested and process.poll() is None:
print(f"[CandleFetcher] Terminating subprocess {LIVE_CANDLE_FETCHER_SCRIPT}...")
process.terminate() # Terminate the child script
process.wait() # Wait for it to exit
print(f"[CandleFetcher] Subprocess terminated.")
except (subprocess.CalledProcessError, Exception) as e:
if shutdown_requested:
break # Don't restart if we're shutting down
with open(log_file, 'a') as f:
f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
f.write(f"Live candle fetcher failed: {e}. Restarting...\n")
time.sleep(5)
if shutdown_requested:
break # Exit outer loop
print("[CandleFetcher] Live candle fetcher shutting down.")
def run_resampler_job(timeframes_to_generate: list):
"""Defines the job for the resampler, redirecting output to a log file."""
log_file = os.path.join(LOGS_DIR, "resampler.log")
try:
command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--timeframes"] + timeframes_to_generate + ["--log-level", "normal"]
with open(log_file, 'a') as f:
f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except Exception as e:
with open(log_file, 'a') as f:
f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
f.write(f"Failed to run resampler.py job: {e}\n")
def resampler_scheduler(timeframes_to_generate: list):
"""Schedules the resampler.py script."""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
shutdown_requested = False
def handle_shutdown_signal(signum, frame):
nonlocal shutdown_requested
try:
logging.info(f"Shutdown signal ({signum}) received. Exiting loop...")
except NameError:
print(f"[ResamplerScheduler] Shutdown signal ({signum}) received. Exiting loop...")
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_shutdown_signal)
signal.signal(signal.SIGINT, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
setup_logging('off', 'ResamplerScheduler')
run_resampler_job(timeframes_to_generate)
# Schedule to run every minute at the :01 second mark
schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=timeframes_to_generate)
logging.info("Resampler scheduled to run every minute at :01.")
while not shutdown_requested: # <-- MODIFIED
schedule.run_pending()
time.sleep(0.5) # Check every 500ms to not miss the scheduled time and be responsive
logging.info("ResamplerScheduler shutting down.")
# --- REMOVED: run_market_cap_fetcher_job function ---
# --- REMOVED: market_cap_fetcher_scheduler function ---
def run_trade_executor(order_execution_queue: multiprocessing.Queue):
"""
Target function to run the TradeExecutor class in a resilient loop.
It now consumes from the order_execution_queue.
"""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
def handle_shutdown_signal(signum, frame):
# We can just raise KeyboardInterrupt, as it's handled below
logging.info(f"Shutdown signal ({signum}) received. Initiating graceful exit...")
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
log_file_path = os.path.join(LOGS_DIR, "trade_executor.log")
try:
sys.stdout = open(log_file_path, 'a', buffering=1)
sys.stderr = sys.stdout
except Exception as e:
print(f"Failed to open log file for TradeExecutor: {e}")
setup_logging('normal', f"TradeExecutor")
logging.info("\n--- Starting Trade Executor process ---")
while True:
try:
from trade_executor import TradeExecutor
executor = TradeExecutor(log_level="normal", order_execution_queue=order_execution_queue)
# --- REVERTED: Call executor.run() directly ---
executor.run()
except KeyboardInterrupt:
logging.info("Trade Executor interrupted. Exiting.")
return
except Exception as e:
logging.error(f"Trade Executor failed: {e}. Restarting...\n", exc_info=True)
time.sleep(10)
def run_position_manager(trade_signal_queue: multiprocessing.Queue, order_execution_queue: multiprocessing.Queue):
"""
Target function to run the PositionManager class in a resilient loop.
Consumes from trade_signal_queue, produces for order_execution_queue.
"""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
def handle_shutdown_signal(signum, frame):
# Raise KeyboardInterrupt, as it's handled by the loop
logging.info(f"Shutdown signal ({signum}) received. Initiating graceful exit...")
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
log_file_path = os.path.join(LOGS_DIR, "position_manager.log")
try:
sys.stdout = open(log_file_path, 'a', buffering=1)
sys.stderr = sys.stdout
except Exception as e:
print(f"Failed to open log file for PositionManager: {e}")
setup_logging('normal', f"PositionManager")
logging.info("\n--- Starting Position Manager process ---")
while True:
try:
from position_manager import PositionManager
manager = PositionManager(
log_level="normal",
trade_signal_queue=trade_signal_queue,
order_execution_queue=order_execution_queue
)
# --- REVERTED: Call manager.run() directly ---
manager.run()
except KeyboardInterrupt:
logging.info("Position Manager interrupted. Exiting.")
return
except Exception as e:
logging.error(f"Position Manager failed: {e}. Restarting...\n", exc_info=True)
time.sleep(10)
def run_strategy(strategy_name: str, config: dict, trade_signal_queue: multiprocessing.Queue):
"""
This function BECOMES the strategy runner. It is executed as a separate
process and pushes signals to the shared queue.
"""
# These imports only happen in the new, lightweight process
import importlib
import os
import sys
import time
import logging
import signal # <-- ADDED
from logging_utils import setup_logging
from strategies.base_strategy import BaseStrategy
# --- GRACEFUL SHUTDOWN HANDLER ---
def handle_shutdown_signal(signum, frame):
# Raise KeyboardInterrupt, as it's handled by the loop
try:
logging.info(f"Shutdown signal ({signum}) received. Initiating graceful exit...")
except NameError:
print(f"[Strategy-{strategy_name}] Shutdown signal ({signum}) received. Initiating graceful exit...")
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
# --- Setup logging to file for this specific process ---
log_file_path = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log")
try:
sys.stdout = open(log_file_path, 'a', buffering=1) # 1 = line buffering
sys.stderr = sys.stdout
except Exception as e:
print(f"Failed to open log file for {strategy_name}: {e}")
setup_logging('normal', f"Strategy-{strategy_name}")
while True:
try:
logging.info(f"--- Starting strategy '{strategy_name}' ---")
if 'class' not in config:
logging.error(f"Strategy config for '{strategy_name}' is missing the 'class' key. Exiting.")
return
module_path, class_name = config['class'].rsplit('.', 1)
module = importlib.import_module(module_path)
StrategyClass = getattr(module, class_name)
strategy = StrategyClass(strategy_name, config['parameters'], trade_signal_queue)
if config.get("is_event_driven", False):
logging.info(f"Starting EVENT-DRIVEN logic loop...")
strategy.run_event_loop() # This is a blocking call
else:
logging.info(f"Starting POLLING logic loop...")
strategy.run_polling_loop() # This is the original blocking call
# --- REVERTED: Added back simple KeyboardInterrupt handler ---
except KeyboardInterrupt:
logging.info(f"Strategy {strategy_name} process stopping.")
return
except Exception as e:
# --- REVERTED: Removed specific check for KeyboardInterrupt ---
logging.error(f"Strategy '{strategy_name}' failed: {e}", exc_info=True)
logging.info("Restarting strategy in 10 seconds...")
time.sleep(10)
def run_dashboard_data_fetcher():
"""Target function to run the dashboard_data_fetcher.py script."""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
def handle_shutdown_signal(signum, frame):
# Raise KeyboardInterrupt, as it's handled by the loop
try:
logging.info(f"Shutdown signal ({signum}) received. Initiating graceful exit...")
except NameError:
print(f"[DashboardDataFetcher] Shutdown signal ({signum}) received. Initiating graceful exit...")
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
log_file = os.path.join(LOGS_DIR, "dashboard_data_fetcher.log")
while True:
try:
with open(log_file, 'a') as f:
f.write(f"\n--- Starting Dashboard Data Fetcher at {datetime.now()} ---\n")
subprocess.run([sys.executable, DASHBOARD_DATA_FETCHER_SCRIPT, "--log-level", "normal"], check=True, stdout=f, stderr=subprocess.STDOUT)
except KeyboardInterrupt: # --- MODIFIED: Added to catch interrupt ---
logging.info("Dashboard Data Fetcher stopping.")
break
except (subprocess.CalledProcessError, Exception) as e:
with open(log_file, 'a') as f:
f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
f.write(f"Dashboard Data Fetcher failed: {e}. Restarting...\n")
time.sleep(10)
class MainApp:
def __init__(self, coins_to_watch: list, processes: dict, strategy_configs: dict, shared_prices: dict):
self.watched_coins = coins_to_watch
self.shared_prices = shared_prices
self.prices = {}
# --- REMOVED: self.market_caps ---
self.open_positions = {}
self.background_processes = processes
self.process_status = {}
self.strategy_configs = strategy_configs
self.strategy_statuses = {}
def read_prices(self):
"""Reads the latest prices directly from the shared memory dictionary."""
try:
# --- FIX: Use .copy() for thread-safe iteration ---
self.prices = self.shared_prices.copy()
except Exception as e:
logging.debug(f"Could not read from shared prices dict: {e}")
# --- REMOVED: read_market_caps method ---
def read_strategy_statuses(self):
"""Reads the status JSON file for each enabled strategy."""
enabled_statuses = {}
for name, config in self.strategy_configs.items():
if config.get("enabled", False):
status_file = os.path.join("_data", f"strategy_status_{name}.json")
if os.path.exists(status_file):
try:
with open(status_file, 'r', encoding='utf-8') as f:
enabled_statuses[name] = json.load(f)
except (IOError, json.JSONDecodeError):
enabled_statuses[name] = {"error": "Could not read status file."}
else:
enabled_statuses[name] = {"current_signal": "Initializing..."}
self.strategy_statuses = enabled_statuses
def read_executor_status(self):
"""Reads the live status file from the trade executor."""
if os.path.exists(TRADE_EXECUTOR_STATUS_FILE):
try:
with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f:
# --- FIX: Read the 'open_positions' key from the file ---
status_data = json.load(f)
self.open_positions = status_data.get('open_positions', {})
except (IOError, json.JSONDecodeError):
logging.debug("Could not read trade executor status file.")
else:
self.open_positions = {}
def check_process_status(self):
"""Checks if the background processes are still running."""
for name, process in self.background_processes.items():
self.process_status[name] = "Running" if process.is_alive() else "STOPPED"
def _format_price(self, price_val, width=10):
"""Helper function to format prices for the dashboard."""
try:
price_float = float(price_val)
if price_float < 1:
price_str = f"{price_float:>{width}.6f}"
elif price_float < 100:
price_str = f"{price_float:>{width}.4f}"
else:
price_str = f"{price_float:>{width}.2f}"
except (ValueError, TypeError):
price_str = f"{'Loading...':>{width}}"
return price_str
def display_dashboard(self):
"""Displays a formatted dashboard with side-by-side tables."""
print("\x1b[H\x1b[J", end="") # Clear screen
left_table_lines = ["--- Market Dashboard ---"]
# --- MODIFIED: Adjusted width for new columns ---
left_table_width = 65
left_table_lines.append("-" * left_table_width)
# --- MODIFIED: Replaced Market Cap with Gap ---
left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Best Bid':>10} | {'Live Price':>10} | {'Best Ask':>10} | {'Gap':>10} |")
left_table_lines.append("-" * left_table_width)
for i, coin in enumerate(self.watched_coins, 1):
# --- MODIFIED: Fetch all three price types ---
mid_price = self.prices.get(coin, "Loading...")
bid_price = self.prices.get(f"{coin}_bid", "Loading...")
ask_price = self.prices.get(f"{coin}_ask", "Loading...")
# --- MODIFIED: Use the new formatting helper ---
formatted_mid = self._format_price(mid_price)
formatted_bid = self._format_price(bid_price)
formatted_ask = self._format_price(ask_price)
# --- MODIFIED: Calculate gap ---
gap_str = f"{'Loading...':>10}"
try:
# Calculate the spread
gap_val = float(ask_price) - float(bid_price)
# Format gap with high precision, similar to price
if gap_val < 1:
gap_str = f"{gap_val:>{10}.6f}"
else:
gap_str = f"{gap_val:>{10}.4f}"
except (ValueError, TypeError):
pass # Keep 'Loading...'
# --- REMOVED: Market Cap logic ---
# --- MODIFIED: Print all price columns including gap ---
left_table_lines.append(f"{i:<2} | {coin:^6} | {formatted_bid} | {formatted_mid} | {formatted_ask} | {gap_str} |")
left_table_lines.append("-" * left_table_width)
right_table_lines = ["--- Strategy Status ---"]
# --- FIX: Adjusted table width after removing parameters ---
right_table_width = 105
right_table_lines.append("-" * right_table_width)
# --- FIX: Removed 'Parameters' from header ---
right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} |")
right_table_lines.append("-" * right_table_width)
for i, (name, status) in enumerate(self.strategy_statuses.items(), 1):
signal = status.get('current_signal', 'N/A')
price = status.get('signal_price')
price_display = f"{price:.4f}" if isinstance(price, (int, float)) else "-"
last_change = status.get('last_signal_change_utc')
last_change_display = 'Never'
if last_change:
dt_utc = datetime.fromisoformat(last_change.replace('Z', '+00:00')).replace(tzinfo=timezone.utc)
dt_local = dt_utc.astimezone(None)
last_change_display = dt_local.strftime('%Y-%m-%d %H:%M')
config_params = self.strategy_configs.get(name, {}).get('parameters', {})
# --- FIX: Read coin/size from status file first, fallback to config ---
coin = status.get('coin', config_params.get('coin', 'N/A'))
# --- FIX: Handle nested 'coins_to_copy' logic for size ---
# --- MODIFIED: Read 'size' from status first, then config, then 'Multi' ---
size = status.get('size')
if not size:
if 'coins_to_copy' in config_params:
size = 'Multi'
else:
size = config_params.get('size', 'N/A')
timeframe = config_params.get('timeframe', 'N/A')
# --- FIX: Removed parameter string logic ---
# --- FIX: Removed 'params_str' from the formatted line ---
size_display = f"{size:>8}"
if isinstance(size, (int, float)):
# --- MODIFIED: More flexible size formatting ---
if size < 0.0001:
size_display = f"{size:>8.6f}"
elif size < 1:
size_display = f"{size:>8.4f}"
else:
size_display = f"{size:>8.2f}"
# --- END NEW LOGIC ---
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size_display} |")
right_table_lines.append("-" * right_table_width)
output_lines = []
max_rows = max(len(left_table_lines), len(right_table_lines))
separator = " "
indent = " " * 10
for i in range(max_rows):
left_part = left_table_lines[i] if i < len(left_table_lines) else " " * left_table_width
right_part = indent + right_table_lines[i] if i < len(right_table_lines) else ""
output_lines.append(f"{left_part}{separator}{right_part}")
output_lines.append("\n--- Open Positions ---")
pos_table_width = 100
output_lines.append("-" * pos_table_width)
output_lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |")
output_lines.append("-" * pos_table_width)
# --- FIX: Correctly read and display open positions ---
if not self.open_positions:
output_lines.append(f"{'No open positions.':^{pos_table_width}}")
else:
for account, positions in self.open_positions.items():
if not positions:
continue
for coin, pos in positions.items():
try:
size_f = float(pos.get('size', 0))
entry_f = float(pos.get('entry_price', 0))
mark_f = float(self.prices.get(coin, 0))
pnl_f = (mark_f - entry_f) * size_f if size_f > 0 else (entry_f - mark_f) * abs(size_f)
lev = pos.get('leverage', 1)
size_str = f"{size_f:>{15}.5f}"
entry_str = f"{entry_f:>{12}.2f}"
mark_str = f"{mark_f:>{12}.2f}"
pnl_str = f"{pnl_f:>{15}.2f}"
lev_str = f"{lev}x"
output_lines.append(f"{account:<10} | {coin:<6} | {size_str} | {entry_str} | {mark_str} | {pnl_str} | {lev_str:>10} |")
except (ValueError, TypeError):
output_lines.append(f"{account:<10} | {coin:<6} | {'Error parsing data...':^{pos_table_width-20}} |")
output_lines.append("-" * pos_table_width)
final_output = "\n".join(output_lines)
print(final_output)
sys.stdout.flush()
def run(self):
"""Main loop to read data, display dashboard, and check processes."""
while True:
self.read_prices()
# --- REMOVED: self.read_market_caps() ---
self.read_strategy_statuses()
self.read_executor_status()
# --- REMOVED: self.check_process_status() ---
self.display_dashboard()
time.sleep(0.5)
if __name__ == "__main__":
setup_logging('normal', 'MainApp')
if not os.path.exists(LOGS_DIR):
os.makedirs(LOGS_DIR)
processes = {}
# --- REVERTED: Removed process groups ---
try:
with open(STRATEGY_CONFIG_FILE, 'r') as f:
strategy_configs = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")
sys.exit(1)
# --- FIX: Hardcoded timeframes ---
required_timeframes = [
"3m", "5m", "15m", "30m", "1h", "2h", "4h", "8h",
"12h", "1d", "3d", "1w", "1M", "148m", "37m"
]
logging.info(f"Using fixed timeframes for resampler: {required_timeframes}")
with multiprocessing.Manager() as manager:
shared_prices = manager.dict()
# --- FIX: Create TWO queues ---
trade_signal_queue = manager.Queue()
order_execution_queue = manager.Queue()
# --- REVERTED: All processes are daemon=True and in one dict ---
# --- FIX: Pass WATCHED_COINS to the start_live_feed process ---
# --- MODIFICATION: Set log level back to 'off' ---
processes["Live Market Feed"] = multiprocessing.Process(
target=start_live_feed,
args=(shared_prices, WATCHED_COINS, 'off'),
daemon=True
)
processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
# --- REMOVED: Market Cap Fetcher Process ---
processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True)
processes["Position Manager"] = multiprocessing.Process(
target=run_position_manager,
args=(trade_signal_queue, order_execution_queue),
daemon=True
)
processes["Trade Executor"] = multiprocessing.Process(
target=run_trade_executor,
args=(order_execution_queue,),
daemon=True
)
for name, config in strategy_configs.items():
if config.get("enabled", False):
if 'class' not in config:
logging.error(f"Strategy '{name}' is missing 'class' key. Skipping.")
continue
proc = multiprocessing.Process(target=run_strategy, args=(name, config, trade_signal_queue), daemon=True)
processes[f"Strategy: {name}"] = proc # Add to strategy group
# --- REVERTED: Removed combined dict ---
for name, proc in processes.items():
logging.info(f"Starting process '{name}'...")
proc.start()
time.sleep(3)
app = MainApp(coins_to_watch=WATCHED_COINS, processes=processes, strategy_configs=strategy_configs, shared_prices=shared_prices)
try:
app.run()
except KeyboardInterrupt:
# --- MODIFIED: Staged shutdown ---
logging.info("Shutting down...")
strategy_procs = {}
other_procs = {}
for name, proc in processes.items():
if name.startswith("Strategy:"):
strategy_procs[name] = proc
else:
other_procs[name] = proc
# --- 1. Terminate strategy processes ---
logging.info("Shutting down strategy processes first...")
for name, proc in strategy_procs.items():
if proc.is_alive():
logging.info(f"Terminating process: '{name}'...")
proc.terminate()
# --- 2. Wait for 5 seconds ---
logging.info("Waiting 5 seconds for strategies to close...")
time.sleep(5)
# --- 3. Terminate all other processes ---
logging.info("Shutting down remaining core processes...")
for name, proc in other_procs.items():
if proc.is_alive():
logging.info(f"Terminating process: '{name}'...")
proc.terminate()
# --- 4. Join all processes (strategies and others) ---
logging.info("Waiting for all processes to join...")
for name, proc in processes.items(): # Iterate over the original dict to get all
if proc.is_alive():
logging.info(f"Waiting for process '{name}' to join...")
proc.join(timeout=5) # Wait up to 5 seconds
if proc.is_alive():
# If it's still alive, force kill
logging.warning(f"Process '{name}' did not terminate, forcing kill.")
proc.kill()
# --- END MODIFIED ---
logging.info("Shutdown complete.")
sys.exit(0)