diff --git a/live_candle_fetcher.py b/live_candle_fetcher.py new file mode 100644 index 0000000..b8bd7b5 --- /dev/null +++ b/live_candle_fetcher.py @@ -0,0 +1,238 @@ +import argparse +import logging +import os +import sys +import json +import time +from datetime import datetime, timezone +from hyperliquid.info import Info +from hyperliquid.utils import constants +import sqlite3 +from queue import Queue +from threading import Thread + +from logging_utils import setup_logging + +class LiveCandleFetcher: + """ + Connects to Hyperliquid to maintain a complete and up-to-date database of + 1-minute candles using a robust producer-consumer architecture to prevent + data corruption and duplication. + """ + + def __init__(self, log_level: str, coins: list): + setup_logging(log_level, 'LiveCandleFetcher') + self.db_path = os.path.join("_data", "market_data.db") + self.coins_to_watch = set(coins) + if not self.coins_to_watch: + logging.error("No coins provided to watch. Exiting.") + sys.exit(1) + + self.info = Info(constants.MAINNET_API_URL, skip_ws=False) + self.candle_queue = Queue() # Thread-safe queue for candles + self._ensure_tables_exist() + + def _ensure_tables_exist(self): + """ + Ensures that all necessary tables are created with the correct schema and PRIMARY KEY. + If a table exists with an incorrect schema, it attempts to migrate the data. + """ + with sqlite3.connect(self.db_path) as conn: + for coin in self.coins_to_watch: + table_name = f"{coin}_1m" + cursor = conn.cursor() + cursor.execute(f"PRAGMA table_info('{table_name}')") + columns = cursor.fetchall() + + if columns: + pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns) + if not pk_found: + logging.warning(f"Schema migration needed for table '{table_name}': 'timestamp_ms' is not the PRIMARY KEY.") + logging.warning("Attempting to automatically rebuild the table...") + try: + # 1. Rename old table + conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"') + logging.info(f" -> Renamed existing table to '{table_name}_old'.") + + # 2. Create new table with correct schema + self._create_candle_table(conn, table_name) + logging.info(f" -> Created new '{table_name}' table with correct schema.") + + # 3. Copy unique data from old table to new table + conn.execute(f''' + INSERT OR IGNORE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades) + SELECT datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades + FROM "{table_name}_old" + ''') + conn.commit() + logging.info(" -> Copied data to new table.") + + # 4. Drop the old table + conn.execute(f'DROP TABLE "{table_name}_old"') + logging.info(f" -> Removed old table. Migration for '{table_name}' complete.") + except Exception as e: + logging.error(f"FATAL: Automatic schema migration for '{table_name}' failed: {e}") + logging.error("Please delete the database file '_data/market_data.db' manually and restart.") + sys.exit(1) + else: + # If table does not exist, create it + self._create_candle_table(conn, table_name) + logging.info("Database tables verified.") + + def _create_candle_table(self, conn, table_name: str): + """Creates a new candle table with the correct schema.""" + conn.execute(f''' + CREATE TABLE "{table_name}" ( + datetime_utc TEXT, + timestamp_ms INTEGER PRIMARY KEY, + open REAL, + high REAL, + low REAL, + close REAL, + volume REAL, + number_of_trades INTEGER + ) + ''') + + def on_message(self, message): + """ + Callback function to process incoming candle messages. This is the "Producer". + It puts the raw message onto the queue for the DB writer. + """ + try: + if message.get("channel") == "candle": + candle_data = message.get("data", {}) + if candle_data: + self.candle_queue.put(candle_data) + except Exception as e: + logging.error(f"Error in on_message: {e}") + + def _database_writer_thread(self): + """ + This is the "Consumer" thread. It runs forever, pulling candles from the + queue and writing them to the database, ensuring all writes are serial. + """ + while True: + try: + candle = self.candle_queue.get() + if candle is None: # A signal to stop the thread + break + + coin = candle.get('coin') + if not coin: + continue + + table_name = f"{coin}_1m" + record = ( + datetime.fromtimestamp(candle['t'] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), + candle['t'], + candle.get('o'), candle.get('h'), candle.get('l'), candle.get('c'), + candle.get('v'), candle.get('n') + ) + + with sqlite3.connect(self.db_path) as conn: + conn.execute(f''' + INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', record) + conn.commit() + logging.debug(f"Upserted candle for {coin} at {record[0]}") + + except Exception as e: + logging.error(f"Error in database writer thread: {e}") + + def _get_last_timestamp_from_db(self, coin: str) -> int: + """Gets the most recent millisecond timestamp from a coin's 1m table.""" + table_name = f"{coin}_1m" + try: + with sqlite3.connect(self.db_path) as conn: + result = conn.execute(f'SELECT MAX(timestamp_ms) FROM "{table_name}"').fetchone() + return int(result[0]) if result and result[0] is not None else None + except Exception as e: + logging.error(f"Could not read last timestamp from table '{table_name}': {e}") + return None + + def _fetch_historical_candles(self, coin: str, start_ms: int, end_ms: int): + """Fetches historical candles and puts them on the queue for the writer.""" + logging.info(f"Fetching historical data for {coin} from {datetime.fromtimestamp(start_ms/1000)}...") + current_start = start_ms + + while current_start < end_ms: + try: + http_info = Info(constants.MAINNET_API_URL, skip_ws=True) + batch = http_info.candles_snapshot(coin, "1m", current_start, end_ms) + if not batch: + break + + for candle in batch: + candle['coin'] = coin + self.candle_queue.put(candle) + + last_ts = batch[-1]['t'] + if last_ts < current_start: + break + current_start = last_ts + 1 + time.sleep(0.5) + except Exception as e: + logging.error(f"Error fetching historical chunk for {coin}: {e}") + break + + logging.info(f"Historical data fetching for {coin} is complete.") + + def run(self): + """ + Starts the database writer, catches up on historical data, then + subscribes to the WebSocket for live updates. + """ + db_writer = Thread(target=self._database_writer_thread, daemon=True) + db_writer.start() + + logging.info("--- Starting Historical Data Catch-Up Phase ---") + now_ms = int(time.time() * 1000) + for coin in self.coins_to_watch: + last_ts = self._get_last_timestamp_from_db(coin) + start_ts = last_ts + 60000 if last_ts else now_ms - (7 * 24 * 60 * 60 * 1000) + if start_ts < now_ms: + self._fetch_historical_candles(coin, start_ts, now_ms) + + logging.info("--- Historical Catch-Up Complete. Starting Live WebSocket Feed ---") + for coin in self.coins_to_watch: + # --- FIX: Use a lambda to create a unique callback for each subscription --- + # This captures the 'coin' variable and adds it to the message data. + callback = lambda msg, c=coin: self.on_message({**msg, 'data': {**msg.get('data',{}), 'coin': c}}) + subscription = {"type": "candle", "coin": coin, "interval": "1m"} + self.info.subscribe(subscription, callback) + logging.info(f"Subscribed to 1m candles for {coin}") + time.sleep(0.2) + + print("\nListening for live candle data... Press Ctrl+C to stop.") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nStopping WebSocket listener...") + self.info.ws_manager.stop() + self.candle_queue.put(None) + db_writer.join() + print("Listener stopped.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A hybrid historical and live candle data fetcher for Hyperliquid.") + parser.add_argument( + "--coins", + nargs='+', + required=True, + help="List of coin symbols to fetch (e.g., BTC ETH)." + ) + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + fetcher = LiveCandleFetcher(log_level=args.log_level, coins=args.coins) + fetcher.run() + diff --git a/main_app.py b/main_app.py index 47670b4..a53256b 100644 --- a/main_app.py +++ b/main_app.py @@ -11,18 +11,18 @@ import pandas as pd from datetime import datetime, timezone from logging_utils import setup_logging +# --- Using the high-performance WebSocket utility for live prices --- +from live_market_utils import start_live_feed # --- Configuration --- WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"] -COIN_LISTER_SCRIPT = "list_coins.py" -MARKET_FEEDER_SCRIPT = "market.py" -DATA_FETCHER_SCRIPT = "data_fetcher.py" +# --- FIX: Replaced old data_fetcher with the new live_candle_fetcher --- +LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py" RESAMPLER_SCRIPT = "resampler.py" MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py" +TRADE_EXECUTOR_SCRIPT = "trade_executor.py" STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json") -PRICE_DATA_FILE = os.path.join("_data", "current_prices.json") DB_PATH = os.path.join("_data", "market_data.db") -STATUS_FILE = os.path.join("_data", "fetcher_status.json") MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json") LOGS_DIR = "_logs" TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json") @@ -41,47 +41,22 @@ def format_market_cap(mc_value): return f"${mc_value:,.2f}" -def run_market_feeder(): - """Target function to run market.py and redirect its output to a log file.""" - log_file = os.path.join(LOGS_DIR, "market_feeder.log") +def run_live_candle_fetcher(): + """Target function to run the live_candle_fetcher.py script in a resilient loop.""" + log_file = os.path.join(LOGS_DIR, "live_candle_fetcher.log") while True: try: with open(log_file, 'a') as f: - subprocess.run( - [sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"], - check=True, stdout=f, stderr=subprocess.STDOUT - ) + command = [sys.executable, LIVE_CANDLE_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] + f.write(f"\n--- Starting {LIVE_CANDLE_FETCHER_SCRIPT} at {datetime.now()} ---\n") + subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) except (subprocess.CalledProcessError, Exception) as e: with open(log_file, 'a') as f: f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") - f.write(f"Market feeder script failed: {e}. Restarting...\n") + f.write(f"Live candle fetcher failed: {e}. Restarting...\n") time.sleep(5) -def run_data_fetcher_job(): - """Defines the job for the data fetcher, redirecting output to a log file.""" - log_file = os.path.join(LOGS_DIR, "data_fetcher.log") - try: - command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "off"] - with open(log_file, 'a') as f: - f.write(f"\n--- Starting data_fetcher.py job at {datetime.now()} ---\n") - subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) - except Exception as e: - with open(log_file, 'a') as f: - f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n") - f.write(f"Failed to run data_fetcher.py job: {e}\n") - - -def data_fetcher_scheduler(): - """Schedules the data_fetcher.py script.""" - setup_logging('off', 'DataFetcherScheduler') - run_data_fetcher_job() - schedule.every(1).minutes.do(run_data_fetcher_job) - while True: - schedule.run_pending() - time.sleep(1) - - def run_resampler_job(timeframes_to_generate: list): """Defines the job for the resampler, redirecting output to a log file.""" log_file = os.path.join(LOGS_DIR, "resampler.log") @@ -133,8 +108,7 @@ def run_strategy(strategy_name: str, config: dict): """Target function to run a strategy, redirecting its output to a log file.""" log_file = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log") script_name = config['script'] - params_str = json.dumps(config['parameters']) - command = [sys.executable, script_name, "--name", strategy_name, "--params", params_str, "--log-level", "normal"] + command = [sys.executable, script_name, "--name", strategy_name, "--log-level", "normal"] while True: try: with open(log_file, 'a') as f: @@ -146,13 +120,27 @@ def run_strategy(strategy_name: str, config: dict): f.write(f"Strategy '{strategy_name}' failed: {e}. Restarting...\n") time.sleep(10) +def run_trade_executor(): + """Target function to run the trade_executor.py script in a resilient loop.""" + log_file = os.path.join(LOGS_DIR, "trade_executor.log") + while True: + try: + with open(log_file, 'a') as f: + f.write(f"\n--- Starting Trade Executor at {datetime.now()} ---\n") + subprocess.run([sys.executable, TRADE_EXECUTOR_SCRIPT, "--log-level", "normal"], check=True, stdout=f, stderr=subprocess.STDOUT) + except (subprocess.CalledProcessError, Exception) as e: + with open(log_file, 'a') as f: + f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") + f.write(f"Trade Executor failed: {e}. Restarting...\n") + time.sleep(10) + class MainApp: - def __init__(self, coins_to_watch: list, processes: dict, strategy_configs: dict): + def __init__(self, coins_to_watch: list, processes: dict, strategy_configs: dict, shared_prices: dict): self.watched_coins = coins_to_watch + self.shared_prices = shared_prices self.prices = {} self.market_caps = {} - self.last_db_update_info = "Initializing..." self.open_positions = {} self.background_processes = processes self.process_status = {} @@ -160,21 +148,17 @@ class MainApp: self.strategy_statuses = {} def read_prices(self): - """Reads the latest prices from the JSON file.""" - if os.path.exists(PRICE_DATA_FILE): - try: - with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f: - self.prices = json.load(f) - except (json.JSONDecodeError, IOError): - logging.debug("Could not read price file.") + """Reads the latest prices directly from the shared memory dictionary.""" + try: + self.prices = dict(self.shared_prices) + except Exception as e: + logging.debug(f"Could not read from shared prices dict: {e}") def read_market_caps(self): - """Reads the latest market cap summary from its JSON file.""" if os.path.exists(MARKET_CAP_SUMMARY_FILE): try: with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f: summary_data = json.load(f) - for coin in self.watched_coins: table_key = f"{coin}_market_cap" if table_key in summary_data: @@ -183,7 +167,6 @@ class MainApp: logging.debug("Could not read market cap summary file.") def read_strategy_statuses(self): - """Reads the status JSON file for each enabled strategy.""" enabled_statuses = {} for name, config in self.strategy_configs.items(): if config.get("enabled", False): @@ -199,7 +182,6 @@ class MainApp: self.strategy_statuses = enabled_statuses def read_executor_status(self): - """Reads the live status file from the trade executor.""" if os.path.exists(TRADE_EXECUTOR_STATUS_FILE): try: with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f: @@ -209,43 +191,15 @@ class MainApp: else: self.open_positions = {} - - def get_overall_db_status(self): - """Reads the fetcher status from the status file.""" - if os.path.exists(STATUS_FILE): - try: - with open(STATUS_FILE, 'r', encoding='utf-8') as f: - status = json.load(f) - coin = status.get("last_updated_coin") - timestamp_utc_str = status.get("last_run_timestamp_utc") - num_candles = status.get("num_updated_candles", 0) - if timestamp_utc_str: - dt_utc = datetime.fromisoformat(timestamp_utc_str.replace('Z', '+00:00')).replace(tzinfo=timezone.utc) - dt_local = dt_utc.astimezone(None) - - offset = dt_local.utcoffset() - offset_hours = int(offset.total_seconds() / 3600) - sign = '+' if offset_hours >= 0 else '' - offset_str = f"UTC{sign}{offset_hours}" - timestamp_display = f"{dt_local.strftime('%Y-%m-%d %H:%M:%S')} {offset_str}" - else: - timestamp_display = "N/A" - self.last_db_update_info = f"{coin} at {timestamp_display} | {num_candles} candles" - except (IOError, json.JSONDecodeError): - self.last_db_update_info = "Error reading status file." - def check_process_status(self): - """Checks if the background processes are still running.""" for name, process in self.background_processes.items(): self.process_status[name] = "Running" if process.is_alive() else "STOPPED" def display_dashboard(self): - """Displays a formatted dashboard with side-by-side tables.""" - print("\x1b[H\x1b[J", end="") # Clear screen + print("\x1b[H\x1b[J", end="") - left_table_lines = [] + left_table_lines = ["--- Market Dashboard ---"] left_table_width = 44 - left_table_lines.append("--- Market Dashboard ---") left_table_lines.append("-" * left_table_width) left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |") left_table_lines.append("-" * left_table_width) @@ -256,9 +210,8 @@ class MainApp: left_table_lines.append(f"{i:<2} | {coin:^6} | {price:>10} | {formatted_mc:>15} |") left_table_lines.append("-" * left_table_width) - right_table_lines = [] + right_table_lines = ["--- Strategy Status ---"] right_table_width = 154 - right_table_lines.append("--- Strategy Status ---") right_table_lines.append("-" * right_table_width) right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} | {'Parameters':<45} |") right_table_lines.append("-" * right_table_width) @@ -280,7 +233,6 @@ class MainApp: other_params = {k: v for k, v in config_params.items() if k not in ['coin', 'timeframe', 'size']} params_str = ", ".join([f"{k}={v}" for k, v in other_params.items()]) - right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} | {params_str:<45} |") right_table_lines.append("-" * right_table_width) @@ -292,8 +244,6 @@ class MainApp: left_part = left_table_lines[i] if i < len(left_table_lines) else " " * left_table_width right_part = indent + right_table_lines[i] if i < len(right_table_lines) else "" output_lines.append(f"{left_part}{separator}{right_part}") - - output_lines.append(f"\nDB Status: Last update -> {self.last_db_update_info}") output_lines.append("\n--- Open Positions ---") pos_table_width = 100 @@ -308,7 +258,6 @@ class MainApp: output_lines.append("No open positions found.") else: for pos in perps_positions: - # --- FIX: Safely handle potentially None values before formatting --- try: pnl = float(pos.get('pnl', 0.0)) pnl_str = f"${pnl:,.2f}" @@ -343,27 +292,18 @@ class MainApp: while True: self.read_prices() self.read_market_caps() - self.get_overall_db_status() self.read_strategy_statuses() self.read_executor_status() self.check_process_status() self.display_dashboard() - time.sleep(2) - + time.sleep(0.5) if __name__ == "__main__": setup_logging('normal', 'MainApp') if not os.path.exists(LOGS_DIR): os.makedirs(LOGS_DIR) - - logging.info(f"Running coin lister: '{COIN_LISTER_SCRIPT}'...") - try: - subprocess.run([sys.executable, COIN_LISTER_SCRIPT], check=True, capture_output=True, text=True) - except subprocess.CalledProcessError as e: - logging.error(f"Failed to run '{COIN_LISTER_SCRIPT}'. Error: {e.stderr}") - sys.exit(1) - + processes = {} strategy_configs = {} @@ -382,37 +322,40 @@ if __name__ == "__main__": required_timeframes.add(tf) if not required_timeframes: - logging.warning("No timeframes required by any enabled strategy. Resampler will not run effectively.") + logging.warning("No timeframes required by any enabled strategy.") - - processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True) - processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) - processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True) - processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) - - for name, config in strategy_configs.items(): - if config.get("enabled", False): - if not os.path.exists(config['script']): - logging.error(f"Strategy script '{config['script']}' for strategy '{name}' not found. Skipping.") - continue - proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) - processes[f"Strategy: {name}"] = proc + with multiprocessing.Manager() as manager: + shared_prices = manager.dict() - for name, proc in processes.items(): - logging.info(f"Starting process '{name}'...") - proc.start() - - time.sleep(3) + processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True) + processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True) + processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True) + processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) + processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, daemon=True) + + for name, config in strategy_configs.items(): + if config.get("enabled", False): + if not os.path.exists(config['script']): + logging.error(f"Strategy script '{config['script']}' for '{name}' not found. Skipping.") + continue + proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) + processes[f"Strategy: {name}"] = proc - app = MainApp(coins_to_watch=WATCHED_COINS, processes=processes, strategy_configs=strategy_configs) - try: - app.run() - except KeyboardInterrupt: - logging.info("Shutting down...") - for proc in processes.values(): - if proc.is_alive(): proc.terminate() - for proc in processes.values(): - if proc.is_alive(): proc.join() - logging.info("Shutdown complete.") - sys.exit(0) + for name, proc in processes.items(): + logging.info(f"Starting process '{name}'...") + proc.start() + + time.sleep(3) + + app = MainApp(coins_to_watch=WATCHED_COINS, processes=processes, strategy_configs=strategy_configs, shared_prices=shared_prices) + try: + app.run() + except KeyboardInterrupt: + logging.info("Shutting down...") + for proc in processes.values(): + if proc.is_alive(): proc.terminate() + for proc in processes.values(): + if proc.is_alive(): proc.join() + logging.info("Shutdown complete.") + sys.exit(0)