import json
import logging
import os
import sys
import time
import subprocess
import multiprocessing
import schedule
import sqlite3
import pandas as pd
from datetime import datetime, timezone
from logging_utils import setup_logging

# --- Configuration ---
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
COIN_LISTER_SCRIPT = "list_coins.py"
MARKET_FEEDER_SCRIPT = "market.py"
DATA_FETCHER_SCRIPT = "data_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py"
MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
PRICE_DATA_FILE = os.path.join("_data", "current_prices.json")
DB_PATH = os.path.join("_data", "market_data.db")
STATUS_FILE = os.path.join("_data", "fetcher_status.json")
MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
LOGS_DIR = "_logs"


def format_market_cap(mc_value):
    """Formats a large number into a human-readable market cap string."""
    if not isinstance(mc_value, (int, float)) or mc_value == 0:
        return "N/A"
    if mc_value >= 1_000_000_000_000:
        return f"${mc_value / 1_000_000_000_000:.2f}T"
    if mc_value >= 1_000_000_000:
        return f"${mc_value / 1_000_000_000:.2f}B"
    if mc_value >= 1_000_000:
        return f"${mc_value / 1_000_000:.2f}M"
    return f"${mc_value:,.2f}"
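
# Illustrative values (not taken from the original source) showing the thresholds above:
#   format_market_cap(1_234_000_000_000) -> "$1.23T"
#   format_market_cap(987_654_321)       -> "$987.65M"
#   format_market_cap(54_321)            -> "$54,321.00"
#   format_market_cap(None)              -> "N/A"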


def run_market_feeder():
    """Target function to run market.py and redirect its output to a log file."""
    log_file = os.path.join(LOGS_DIR, "market_feeder.log")
    while True:
        try:
            with open(log_file, 'a') as f:
                subprocess.run(
                    [sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"],
                    check=True, stdout=f, stderr=subprocess.STDOUT
                )
        except Exception as e:
            with open(log_file, 'a') as f:
                f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
                f.write(f"Market feeder script failed: {e}. Restarting...\n")
            time.sleep(5)


def run_data_fetcher_job():
    """Defines the job for the data fetcher, redirecting output to a log file."""
    log_file = os.path.join(LOGS_DIR, "data_fetcher.log")
    try:
        command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "off"]
        with open(log_file, 'a') as f:
            f.write(f"\n--- Starting data_fetcher.py job at {datetime.now()} ---\n")
            subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
    except Exception as e:
        with open(log_file, 'a') as f:
            f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
            f.write(f"Failed to run data_fetcher.py job: {e}\n")


def data_fetcher_scheduler():
    """Schedules the data_fetcher.py script."""
    setup_logging('off', 'DataFetcherScheduler')
    run_data_fetcher_job()
    schedule.every(1).minutes.do(run_data_fetcher_job)
    while True:
        schedule.run_pending()
        time.sleep(1)


def run_resampler_job():
    """Defines the job for the resampler, redirecting output to a log file."""
    log_file = os.path.join(LOGS_DIR, "resampler.log")
    try:
        command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
        with open(log_file, 'a') as f:
            f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n")
            subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
    except Exception as e:
        with open(log_file, 'a') as f:
            f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
            f.write(f"Failed to run resampler.py job: {e}\n")


def resampler_scheduler():
    """Schedules the resampler.py script."""
    setup_logging('off', 'ResamplerScheduler')
    run_resampler_job()
    schedule.every(4).minutes.do(run_resampler_job)
    while True:
        schedule.run_pending()
        time.sleep(1)


def run_market_cap_fetcher_job():
    """Defines the job for the market cap fetcher, redirecting output."""
    log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log")
    try:
        command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
        with open(log_file, 'a') as f:
            f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n")
            subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
    except Exception as e:
        with open(log_file, 'a') as f:
            f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
            f.write(f"Failed to run {MARKET_CAP_FETCHER_SCRIPT} job: {e}\n")


def market_cap_fetcher_scheduler():
    """Schedules the market_cap_fetcher.py script to run daily at a specific UTC time."""
    setup_logging('off', 'MarketCapScheduler')
    schedule.every().day.at("00:15", "UTC").do(run_market_cap_fetcher_job)
    while True:
        schedule.run_pending()
        time.sleep(60)


def run_strategy(strategy_name: str, config: dict):
    """Target function to run a strategy, redirecting its output to a log file."""
    log_file = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log")
    script_name = config['script']
    params_str = json.dumps(config['parameters'])
    command = [sys.executable, script_name, "--name", strategy_name, "--params", params_str, "--log-level", "off"]
    while True:
        try:
            with open(log_file, 'a') as f:
                f.write(f"\n--- Starting strategy '{strategy_name}' at {datetime.now()} ---\n")
                subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
        except Exception as e:
            with open(log_file, 'a') as f:
                f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
                f.write(f"Strategy '{strategy_name}' failed: {e}. Restarting...\n")
            time.sleep(10)
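
# Expected shape of STRATEGY_CONFIG_FILE (_data/strategies.json), inferred from the keys
# read in run_strategy() ('script', 'parameters') and the "enabled" flag checked at
# startup. The strategy name, script filename, and parameter values are illustrative only:
#
# {
#     "example_sma_cross": {
#         "enabled": true,
#         "script": "strategy_sma_cross.py",
#         "parameters": {"coin": "BTC", "fast": 20, "slow": 50}
#     }
# }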


class MainApp:
    def __init__(self, coins_to_watch: list, processes: dict):
        self.watched_coins = coins_to_watch
        self.prices = {}
        self.market_caps = {}
        self.last_db_update_info = "Initializing..."
        self._lines_printed = 0
        self.background_processes = processes
        self.process_status = {}

    def read_prices(self):
        """Reads the latest prices from the JSON file."""
        if os.path.exists(PRICE_DATA_FILE):
            try:
                with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f:
                    self.prices = json.load(f)
            except (json.JSONDecodeError, IOError):
                logging.debug("Could not read price file.")

    def read_market_caps(self):
        """Reads the latest market cap summary from its JSON file."""
        if os.path.exists(MARKET_CAP_SUMMARY_FILE):
            try:
                with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f:
                    summary_data = json.load(f)
                # Extract just the market cap value for each coin
                for coin in self.watched_coins:
                    table_key = f"{coin}_market_cap"
                    if table_key in summary_data:
                        self.market_caps[coin] = summary_data[table_key].get('market_cap')
            except (json.JSONDecodeError, IOError):
                logging.debug("Could not read market cap summary file.")

    def get_overall_db_status(self):
        """Reads the fetcher status from the status file."""
        if os.path.exists(STATUS_FILE):
            try:
                with open(STATUS_FILE, 'r', encoding='utf-8') as f:
                    status = json.load(f)
                coin = status.get("last_updated_coin")
                timestamp_utc_str = status.get("last_run_timestamp_utc")
                num_candles = status.get("num_updated_candles", 0)
                if timestamp_utc_str:
                    dt_utc = datetime.fromisoformat(timestamp_utc_str.replace('Z', '+00:00')).replace(tzinfo=timezone.utc)
                    dt_local = dt_utc.astimezone(None)
                    offset = dt_local.utcoffset()
                    offset_hours = int(offset.total_seconds() / 3600)
                    sign = '+' if offset_hours >= 0 else ''
                    offset_str = f"UTC{sign}{offset_hours}"
                    timestamp_display = f"{dt_local.strftime('%Y-%m-%d %H:%M:%S')} {offset_str}"
                else:
                    timestamp_display = "N/A"
                self.last_db_update_info = f"{coin} at {timestamp_display} | {num_candles} candles"
            except (IOError, json.JSONDecodeError):
                self.last_db_update_info = "Error reading status file."

    def check_process_status(self):
        """Checks if the background processes are still running."""
        for name, process in self.background_processes.items():
            self.process_status[name] = "Running" if process.is_alive() else "STOPPED"
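
    # Expected shapes of the JSON files read above, inferred from the reader methods;
    # all concrete values are illustrative only:
    #   current_prices.json:   {"BTC": "67250.5", "ETH": "3120.0", ...}
    #   market_cap_data.json:  {"BTC_market_cap": {"market_cap": 1320000000000, ...}, ...}
    #   fetcher_status.json:   {"last_updated_coin": "BTC",
    #                           "last_run_timestamp_utc": "2025-01-01T00:15:00Z",
    #                           "num_updated_candles": 42}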

    def display_dashboard(self):
        """Displays a formatted table without blinking by overwriting previous lines."""
        if self._lines_printed > 0:
            # ANSI "cursor up": move back over the block printed on the previous tick.
            print(f"\x1b[{self._lines_printed}A", end="")
        output_lines = ["--- Market Dashboard ---"]
        table_width = 44
        output_lines.append("-" * table_width)
        output_lines.append(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} | {'Market Cap':>15} |")
        output_lines.append("-" * table_width)
        for i, coin in enumerate(self.watched_coins, 1):
            price = self.prices.get(coin, "Loading...")
            market_cap = self.market_caps.get(coin)
            formatted_mc = format_market_cap(market_cap)
            output_lines.append(f"{i:<2} | {coin:<6} | {price:>10} | {formatted_mc:>15} |")
        output_lines.append("-" * table_width)
        status_prefix = "DB Status: Last update -> "
        max_len = 80
        status_message = f"{status_prefix}{self.last_db_update_info}"
        if len(status_message) > max_len:
            status_message = status_message[:max_len - 3] + "..."
        output_lines.append(status_message)
        output_lines.append("--- Background Processes ---")
        for name, status in self.process_status.items():
            output_lines.append(f"{name:<25}: {status}")
        # "\x1b[J" clears from the cursor to the end of the screen, erasing any
        # leftovers from a previous, longer frame.
        final_output = "\n".join(output_lines) + "\n\x1b[J"
        print(final_output, end="")
        self._lines_printed = len(output_lines)
        sys.stdout.flush()

    def run(self):
        """Main loop to read data, display dashboard, and check processes."""
        while True:
            self.read_prices()
            self.read_market_caps()
            self.get_overall_db_status()
            self.check_process_status()
            self.display_dashboard()
            time.sleep(2)


if __name__ == "__main__":
    setup_logging('normal', 'MainApp')

    if not os.path.exists(LOGS_DIR):
        os.makedirs(LOGS_DIR)

    logging.info(f"Running coin lister: '{COIN_LISTER_SCRIPT}'...")
    try:
        subprocess.run([sys.executable, COIN_LISTER_SCRIPT], check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to run '{COIN_LISTER_SCRIPT}'. Error: {e.stderr}")
        sys.exit(1)

    processes = {}
    processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True)
    processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True)
    processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True)
    processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)

    try:
        with open(STRATEGY_CONFIG_FILE, 'r') as f:
            strategy_configs = json.load(f)
        for name, config in strategy_configs.items():
            if config.get("enabled", False):
                proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True)
                processes[f"Strategy: {name}"] = proc
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")

    for name, proc in processes.items():
        logging.info(f"Starting process '{name}'...")
        proc.start()

    time.sleep(3)

    app = MainApp(coins_to_watch=WATCHED_COINS, processes=processes)
    try:
        app.run()
    except KeyboardInterrupt:
        logging.info("Shutting down...")
        for proc in processes.values():
            if proc.is_alive():
                proc.terminate()
        for proc in processes.values():
            if proc.is_alive():
                proc.join()
        logging.info("Shutdown complete.")
        sys.exit(0)