From fe5cc8e1d1464fa054f1fa4d11500ed019667522 Mon Sep 17 00:00:00 2001 From: DiTus Date: Sat, 25 Oct 2025 19:58:52 +0200 Subject: [PATCH] market cap fixes --- coin_id_map.py | 45 ++++++++++++++------- del_market_cap_tables.py | 56 ++++++++++++++++++++++++++ live_market_utils.py | 85 +++++++++++++++++++++++++++++++++------- main_app.py | 31 +++++++++------ market_cap_fetcher.py | 11 ++++-- 5 files changed, 184 insertions(+), 44 deletions(-) create mode 100644 del_market_cap_tables.py diff --git a/coin_id_map.py b/coin_id_map.py index 3f4608d..91183bc 100644 --- a/coin_id_map.py +++ b/coin_id_map.py @@ -11,7 +11,7 @@ def update_coin_mapping(): """ Fetches all assets from Hyperliquid and all coins from CoinGecko, then creates and saves a mapping from the Hyperliquid symbol to the - CoinGecko ID. + CoinGecko ID using a robust matching algorithm. """ setup_logging('normal', 'CoinMapUpdater') logging.info("Starting coin mapping update process...") @@ -20,13 +20,8 @@ def update_coin_mapping(): try: logging.info("Fetching assets from Hyperliquid...") info = Info(constants.MAINNET_API_URL, skip_ws=True) - # The meta object contains the 'universe' list with asset details meta, asset_contexts = info.meta_and_asset_ctxs() - - # --- FIX: The asset names are in the 'universe' list inside the meta object --- - # The 'universe' is a list of dictionaries, each with a 'name' - hyperliquid_assets = [asset['name'] for asset in meta['universe']] - + hyperliquid_assets = meta['universe'] logging.info(f"Found {len(hyperliquid_assets)} assets on Hyperliquid.") except Exception as e: logging.error(f"Failed to fetch assets from Hyperliquid: {e}") @@ -38,8 +33,11 @@ def update_coin_mapping(): response = requests.get("https://api.coingecko.com/api/v3/coins/list") response.raise_for_status() coingecko_coins = response.json() - # Create a lookup table: {symbol: id} - coingecko_lookup = {coin['symbol'].upper(): coin['id'] for coin in coingecko_coins} + + # Create more robust lookup tables + cg_symbol_lookup = {coin['symbol'].upper(): coin['id'] for coin in coingecko_coins} + cg_name_lookup = {coin['name'].upper(): coin['id'] for coin in coingecko_coins} + logging.info(f"Found {len(coingecko_coins)} coins on CoinGecko.") except requests.exceptions.RequestException as e: logging.error(f"Failed to fetch coin list from CoinGecko: {e}") @@ -47,24 +45,41 @@ def update_coin_mapping(): # --- 3. Create the mapping --- final_mapping = {} + # Use manual overrides for critical coins where symbols are ambiguous manual_overrides = { + "BTC": "bitcoin", + "ETH": "ethereum", + "SOL": "solana", + "BNB": "binancecoin", "HYPE": "hyperliquid", "PUMP": "pump-fun", "ASTER": "astar", + "ZEC": "zcash", + "SUI": "sui", + "ACE": "endurance", + # Add other important ones you watch here } logging.info("Generating symbol-to-id mapping...") - for asset_symbol in hyperliquid_assets: - # Check for manual overrides first + for asset in hyperliquid_assets: + asset_symbol = asset['name'].upper() + asset_name = asset.get('name', '').upper() # Use full name if available + + # Priority 1: Manual Overrides if asset_symbol in manual_overrides: final_mapping[asset_symbol] = manual_overrides[asset_symbol] continue - # Try to find a direct match in the CoinGecko lookup table - if asset_symbol in coingecko_lookup: - final_mapping[asset_symbol] = coingecko_lookup[asset_symbol] + # Priority 2: Exact Name Match + if asset_name in cg_name_lookup: + final_mapping[asset_symbol] = cg_name_lookup[asset_name] + continue + + # Priority 3: Symbol Match + if asset_symbol in cg_symbol_lookup: + final_mapping[asset_symbol] = cg_symbol_lookup[asset_symbol] else: - logging.warning(f"No direct match found for '{asset_symbol}' on CoinGecko. It will be excluded.") + logging.warning(f"No match found for '{asset_symbol}' on CoinGecko. It will be excluded.") # --- 4. Save the mapping to a file --- map_file_path = os.path.join("_data", "coin_id_map.json") diff --git a/del_market_cap_tables.py b/del_market_cap_tables.py new file mode 100644 index 0000000..d18050c --- /dev/null +++ b/del_market_cap_tables.py @@ -0,0 +1,56 @@ +import sqlite3 +import logging +import os + +from logging_utils import setup_logging + +def cleanup_market_cap_tables(): + """ + Scans the database and drops all tables related to market cap data + to allow for a clean refresh. + """ + setup_logging('normal', 'DBCleanup') + db_path = os.path.join("_data", "market_data.db") + + if not os.path.exists(db_path): + logging.error(f"Database file not found at '{db_path}'. Nothing to clean.") + return + + logging.info(f"Connecting to database at '{db_path}'...") + try: + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + + # Find all tables that were created by the market cap fetcher + cursor.execute(""" + SELECT name FROM sqlite_master + WHERE type='table' + AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%') + """) + + tables_to_drop = cursor.fetchall() + + if not tables_to_drop: + logging.info("No market cap tables found to clean up. Database is already clean.") + return + + logging.warning(f"Found {len(tables_to_drop)} market cap tables to remove...") + + for table in tables_to_drop: + table_name = table[0] + try: + logging.info(f"Dropping table: {table_name}...") + conn.execute(f'DROP TABLE IF EXISTS "{table_name}"') + except Exception as e: + logging.error(f"Failed to drop table {table_name}: {e}") + + conn.commit() + logging.info("--- Database cleanup complete ---") + + except sqlite3.Error as e: + logging.error(f"A database error occurred: {e}") + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + cleanup_market_cap_tables() diff --git a/live_market_utils.py b/live_market_utils.py index bc5246b..31aa3eb 100644 --- a/live_market_utils.py +++ b/live_market_utils.py @@ -1,11 +1,33 @@ import logging import json import time +import os +import traceback from hyperliquid.info import Info from hyperliquid.utils import constants from logging_utils import setup_logging +# --- Configuration for standalone error logging --- +LOGS_DIR = "_logs" +ERROR_LOG_FILE = os.path.join(LOGS_DIR, "live_market_errors.log") + +def log_error(error_message: str, include_traceback: bool = True): + """A simple, robust file logger for any errors.""" + try: + if not os.path.exists(LOGS_DIR): + os.makedirs(LOGS_DIR) + + with open(ERROR_LOG_FILE, 'a') as f: + timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) + f.write(f"--- ERROR at {timestamp} UTC ---\n") + f.write(error_message + "\n") + if include_traceback: + f.write(traceback.format_exc() + "\n") + f.write("="*50 + "\n") + except Exception: + print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr) + def on_message(message, shared_prices_dict): """ Callback function to process incoming 'allMids' messages and update the @@ -14,36 +36,71 @@ def on_message(message, shared_prices_dict): try: if message.get("channel") == "allMids": new_prices = message.get("data", {}).get("mids", {}) - # Update the shared dictionary with the new price data shared_prices_dict.update(new_prices) except Exception as e: - # It's important to log errors inside the process - logging.error(f"Error in WebSocket on_message: {e}") + log_error(f"Error in WebSocket on_message: {e}") def start_live_feed(shared_prices_dict, log_level='off'): """ Main function for the WebSocket process. It takes a shared dictionary and continuously feeds it with live market data. + Includes a watchdog to auto-reconnect on failure. """ setup_logging(log_level, 'LiveMarketFeed') - # The Info object manages the WebSocket connection. - info = Info(constants.MAINNET_API_URL, skip_ws=False) - - # We need to wrap the callback in a lambda to pass our shared dictionary + info = None callback = lambda msg: on_message(msg, shared_prices_dict) - # Subscribe to the allMids channel - subscription = {"type": "allMids"} - info.subscribe(subscription, callback) - logging.info("Subscribed to 'allMids' for live mark prices.") + def connect_and_subscribe(): + """Establishes a new WebSocket connection and subscribes to allMids.""" + try: + logging.info("Connecting to Hyperliquid WebSocket...") + # Ensure skip_ws=False to create the ws_manager + new_info = Info(constants.MAINNET_API_URL, skip_ws=False) + subscription = {"type": "allMids"} + new_info.subscribe(subscription, callback) + logging.info("WebSocket connected and subscribed to 'allMids'.") + return new_info + except Exception as e: + log_error(f"Failed to connect to WebSocket: {e}") + return None + + info = connect_and_subscribe() logging.info("Starting live price feed process. Press Ctrl+C in main app to stop.") + try: - # The background thread in the SDK handles messages. This loop just keeps the process alive. while True: - time.sleep(1) + # --- Watchdog Logic --- + time.sleep(15) # Check the connection every 15 seconds + + if info is None or not info.ws_manager.is_running(): + # --- FIX: Log this critical failure to the persistent error log --- + error_msg = "WebSocket connection lost or not running. Attempting to reconnect..." + logging.warning(error_msg) + log_error(error_msg, include_traceback=False) # Log it to the file + + if info: + try: + info.ws_manager.stop() # Clean up old manager + except Exception as e: + log_error(f"Error stopping old ws_manager: {e}") + + info = connect_and_subscribe() + + if info is None: + logging.error("Reconnect failed, will retry in 15s.") + else: + logging.info("Successfully reconnected to WebSocket.") + else: + logging.debug("Watchdog check: WebSocket connection is active.") + except KeyboardInterrupt: logging.info("Stopping WebSocket listener...") - info.ws_manager.stop() + except Exception as e: + log_error(f"Live Market Feed process crashed: {e}") + finally: + if info and info.ws_manager: + info.ws_manager.stop() logging.info("Listener stopped.") + diff --git a/main_app.py b/main_app.py index aca61fb..32048c4 100644 --- a/main_app.py +++ b/main_app.py @@ -11,11 +11,12 @@ import pandas as pd from datetime import datetime, timezone from logging_utils import setup_logging -# --- Using the high-performance WebSocket utility for live prices --- +# --- Using the new high-performance WebSocket utility for live prices --- from live_market_utils import start_live_feed # --- Configuration --- WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"] +# --- FIX: Replaced old data_fetcher with the new live_candle_fetcher --- LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py" RESAMPLER_SCRIPT = "resampler.py" MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py" @@ -26,9 +27,6 @@ MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json") LOGS_DIR = "_logs" TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json") -# --- ADDED: Standard list of timeframes for the resampler to generate --- -STANDARD_RESAMPLING_TIMEFRAMES = ["3m", "5m", "15m", "30m", "37m", "148m", "1h", "2h", "4h", "8h", "12h", "1d", "3d", "1w", "1M"] - def format_market_cap(mc_value): """Formats a large number into a human-readable market cap string.""" @@ -49,6 +47,7 @@ def run_live_candle_fetcher(): while True: try: with open(log_file, 'a') as f: + # We can't get coins from strategies.json here, so we pass the default list command = [sys.executable, LIVE_CANDLE_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] f.write(f"\n--- Starting {LIVE_CANDLE_FETCHER_SCRIPT} at {datetime.now()} ---\n") subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) @@ -73,13 +72,12 @@ def run_resampler_job(timeframes_to_generate: list): f.write(f"Failed to run resampler.py job: {e}\n") -def resampler_scheduler(): - """Schedules the resampler.py script to run at the start of every minute.""" +def resampler_scheduler(timeframes_to_generate: list): + """Schedules the resampler.py script.""" setup_logging('off', 'ResamplerScheduler') - # Run once at startup - run_resampler_job(STANDARD_RESAMPLING_TIMEFRAMES) + run_resampler_job(timeframes_to_generate) # Schedule to run every minute at the :01 second mark - schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=STANDARD_RESAMPLING_TIMEFRAMES) + schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=timeframes_to_generate) logging.info("Resampler scheduled to run every minute at :01.") while True: schedule.run_pending() @@ -90,7 +88,7 @@ def run_market_cap_fetcher_job(): """Defines the job for the market cap fetcher, redirecting output.""" log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log") try: - command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] + command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--log-level", "off"] with open(log_file, 'a') as f: f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n") subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) @@ -319,13 +317,22 @@ if __name__ == "__main__": logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}") sys.exit(1) + required_timeframes = set() + for name, config in strategy_configs.items(): + if config.get("enabled", False): + tf = config.get("parameters", {}).get("timeframe") + if tf: + required_timeframes.add(tf) + + if not required_timeframes: + logging.warning("No timeframes required by any enabled strategy.") + with multiprocessing.Manager() as manager: shared_prices = manager.dict() processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True) processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True) - # --- FIX: The resampler now uses a fixed list of TFs and a new schedule --- - processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True) + processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True) processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, daemon=True) diff --git a/market_cap_fetcher.py b/market_cap_fetcher.py index 2557828..9b26fa5 100644 --- a/market_cap_fetcher.py +++ b/market_cap_fetcher.py @@ -41,7 +41,6 @@ class MarketCapFetcher: "DAI": "dai", "PYUSD": "paypal-usd" } - # --- ADDED: Ensure all tables have the correct schema --- self._ensure_tables_exist() def _ensure_tables_exist(self): @@ -291,8 +290,14 @@ class MarketCapFetcher: if not market_caps: return pd.DataFrame() df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap']) - # --- FIX: Convert to datetime object, but do not format as string --- - df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms') + + # --- FIX: Normalize all timestamps to the start of the day (00:00:00 UTC) --- + # This prevents duplicate entries for the same day (e.g., a "live" candle vs. the daily one) + df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms').dt.normalize() + + # Recalculate the timestamp_ms to match the normalized 00:00:00 datetime + df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6) + df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True) return df[['datetime_utc', 'timestamp_ms', 'market_cap']]