market cap fixes
This commit is contained in:
@ -11,7 +11,7 @@ def update_coin_mapping():
|
||||
"""
|
||||
Fetches all assets from Hyperliquid and all coins from CoinGecko,
|
||||
then creates and saves a mapping from the Hyperliquid symbol to the
|
||||
CoinGecko ID.
|
||||
CoinGecko ID using a robust matching algorithm.
|
||||
"""
|
||||
setup_logging('normal', 'CoinMapUpdater')
|
||||
logging.info("Starting coin mapping update process...")
|
||||
@ -20,13 +20,8 @@ def update_coin_mapping():
|
||||
try:
|
||||
logging.info("Fetching assets from Hyperliquid...")
|
||||
info = Info(constants.MAINNET_API_URL, skip_ws=True)
|
||||
# The meta object contains the 'universe' list with asset details
|
||||
meta, asset_contexts = info.meta_and_asset_ctxs()
|
||||
|
||||
# --- FIX: The asset names are in the 'universe' list inside the meta object ---
|
||||
# The 'universe' is a list of dictionaries, each with a 'name'
|
||||
hyperliquid_assets = [asset['name'] for asset in meta['universe']]
|
||||
|
||||
hyperliquid_assets = meta['universe']
|
||||
logging.info(f"Found {len(hyperliquid_assets)} assets on Hyperliquid.")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to fetch assets from Hyperliquid: {e}")
|
||||
@ -38,8 +33,11 @@ def update_coin_mapping():
|
||||
response = requests.get("https://api.coingecko.com/api/v3/coins/list")
|
||||
response.raise_for_status()
|
||||
coingecko_coins = response.json()
|
||||
# Create a lookup table: {symbol: id}
|
||||
coingecko_lookup = {coin['symbol'].upper(): coin['id'] for coin in coingecko_coins}
|
||||
|
||||
# Create more robust lookup tables
|
||||
cg_symbol_lookup = {coin['symbol'].upper(): coin['id'] for coin in coingecko_coins}
|
||||
cg_name_lookup = {coin['name'].upper(): coin['id'] for coin in coingecko_coins}
|
||||
|
||||
logging.info(f"Found {len(coingecko_coins)} coins on CoinGecko.")
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Failed to fetch coin list from CoinGecko: {e}")
|
||||
@ -47,24 +45,41 @@ def update_coin_mapping():
|
||||
|
||||
# --- 3. Create the mapping ---
|
||||
final_mapping = {}
|
||||
# Use manual overrides for critical coins where symbols are ambiguous
|
||||
manual_overrides = {
|
||||
"BTC": "bitcoin",
|
||||
"ETH": "ethereum",
|
||||
"SOL": "solana",
|
||||
"BNB": "binancecoin",
|
||||
"HYPE": "hyperliquid",
|
||||
"PUMP": "pump-fun",
|
||||
"ASTER": "astar",
|
||||
"ZEC": "zcash",
|
||||
"SUI": "sui",
|
||||
"ACE": "endurance",
|
||||
# Add other important ones you watch here
|
||||
}
|
||||
|
||||
logging.info("Generating symbol-to-id mapping...")
|
||||
for asset_symbol in hyperliquid_assets:
|
||||
# Check for manual overrides first
|
||||
for asset in hyperliquid_assets:
|
||||
asset_symbol = asset['name'].upper()
|
||||
asset_name = asset.get('name', '').upper() # Use full name if available
|
||||
|
||||
# Priority 1: Manual Overrides
|
||||
if asset_symbol in manual_overrides:
|
||||
final_mapping[asset_symbol] = manual_overrides[asset_symbol]
|
||||
continue
|
||||
|
||||
# Try to find a direct match in the CoinGecko lookup table
|
||||
if asset_symbol in coingecko_lookup:
|
||||
final_mapping[asset_symbol] = coingecko_lookup[asset_symbol]
|
||||
# Priority 2: Exact Name Match
|
||||
if asset_name in cg_name_lookup:
|
||||
final_mapping[asset_symbol] = cg_name_lookup[asset_name]
|
||||
continue
|
||||
|
||||
# Priority 3: Symbol Match
|
||||
if asset_symbol in cg_symbol_lookup:
|
||||
final_mapping[asset_symbol] = cg_symbol_lookup[asset_symbol]
|
||||
else:
|
||||
logging.warning(f"No direct match found for '{asset_symbol}' on CoinGecko. It will be excluded.")
|
||||
logging.warning(f"No match found for '{asset_symbol}' on CoinGecko. It will be excluded.")
|
||||
|
||||
# --- 4. Save the mapping to a file ---
|
||||
map_file_path = os.path.join("_data", "coin_id_map.json")
|
||||
|
||||
56
del_market_cap_tables.py
Normal file
56
del_market_cap_tables.py
Normal file
@ -0,0 +1,56 @@
|
||||
import sqlite3
|
||||
import logging
|
||||
import os
|
||||
|
||||
from logging_utils import setup_logging
|
||||
|
||||
def cleanup_market_cap_tables():
|
||||
"""
|
||||
Scans the database and drops all tables related to market cap data
|
||||
to allow for a clean refresh.
|
||||
"""
|
||||
setup_logging('normal', 'DBCleanup')
|
||||
db_path = os.path.join("_data", "market_data.db")
|
||||
|
||||
if not os.path.exists(db_path):
|
||||
logging.error(f"Database file not found at '{db_path}'. Nothing to clean.")
|
||||
return
|
||||
|
||||
logging.info(f"Connecting to database at '{db_path}'...")
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Find all tables that were created by the market cap fetcher
|
||||
cursor.execute("""
|
||||
SELECT name FROM sqlite_master
|
||||
WHERE type='table'
|
||||
AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%')
|
||||
""")
|
||||
|
||||
tables_to_drop = cursor.fetchall()
|
||||
|
||||
if not tables_to_drop:
|
||||
logging.info("No market cap tables found to clean up. Database is already clean.")
|
||||
return
|
||||
|
||||
logging.warning(f"Found {len(tables_to_drop)} market cap tables to remove...")
|
||||
|
||||
for table in tables_to_drop:
|
||||
table_name = table[0]
|
||||
try:
|
||||
logging.info(f"Dropping table: {table_name}...")
|
||||
conn.execute(f'DROP TABLE IF EXISTS "{table_name}"')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to drop table {table_name}: {e}")
|
||||
|
||||
conn.commit()
|
||||
logging.info("--- Database cleanup complete ---")
|
||||
|
||||
except sqlite3.Error as e:
|
||||
logging.error(f"A database error occurred: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"An unexpected error occurred: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
cleanup_market_cap_tables()
|
||||
@ -1,11 +1,33 @@
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
import traceback
|
||||
from hyperliquid.info import Info
|
||||
from hyperliquid.utils import constants
|
||||
|
||||
from logging_utils import setup_logging
|
||||
|
||||
# --- Configuration for standalone error logging ---
|
||||
LOGS_DIR = "_logs"
|
||||
ERROR_LOG_FILE = os.path.join(LOGS_DIR, "live_market_errors.log")
|
||||
|
||||
def log_error(error_message: str, include_traceback: bool = True):
|
||||
"""A simple, robust file logger for any errors."""
|
||||
try:
|
||||
if not os.path.exists(LOGS_DIR):
|
||||
os.makedirs(LOGS_DIR)
|
||||
|
||||
with open(ERROR_LOG_FILE, 'a') as f:
|
||||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
|
||||
f.write(f"--- ERROR at {timestamp} UTC ---\n")
|
||||
f.write(error_message + "\n")
|
||||
if include_traceback:
|
||||
f.write(traceback.format_exc() + "\n")
|
||||
f.write("="*50 + "\n")
|
||||
except Exception:
|
||||
print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr)
|
||||
|
||||
def on_message(message, shared_prices_dict):
|
||||
"""
|
||||
Callback function to process incoming 'allMids' messages and update the
|
||||
@ -14,36 +36,71 @@ def on_message(message, shared_prices_dict):
|
||||
try:
|
||||
if message.get("channel") == "allMids":
|
||||
new_prices = message.get("data", {}).get("mids", {})
|
||||
# Update the shared dictionary with the new price data
|
||||
shared_prices_dict.update(new_prices)
|
||||
except Exception as e:
|
||||
# It's important to log errors inside the process
|
||||
logging.error(f"Error in WebSocket on_message: {e}")
|
||||
log_error(f"Error in WebSocket on_message: {e}")
|
||||
|
||||
def start_live_feed(shared_prices_dict, log_level='off'):
|
||||
"""
|
||||
Main function for the WebSocket process. It takes a shared dictionary
|
||||
and continuously feeds it with live market data.
|
||||
Includes a watchdog to auto-reconnect on failure.
|
||||
"""
|
||||
setup_logging(log_level, 'LiveMarketFeed')
|
||||
|
||||
# The Info object manages the WebSocket connection.
|
||||
info = Info(constants.MAINNET_API_URL, skip_ws=False)
|
||||
|
||||
# We need to wrap the callback in a lambda to pass our shared dictionary
|
||||
info = None
|
||||
callback = lambda msg: on_message(msg, shared_prices_dict)
|
||||
|
||||
# Subscribe to the allMids channel
|
||||
subscription = {"type": "allMids"}
|
||||
info.subscribe(subscription, callback)
|
||||
logging.info("Subscribed to 'allMids' for live mark prices.")
|
||||
def connect_and_subscribe():
|
||||
"""Establishes a new WebSocket connection and subscribes to allMids."""
|
||||
try:
|
||||
logging.info("Connecting to Hyperliquid WebSocket...")
|
||||
# Ensure skip_ws=False to create the ws_manager
|
||||
new_info = Info(constants.MAINNET_API_URL, skip_ws=False)
|
||||
subscription = {"type": "allMids"}
|
||||
new_info.subscribe(subscription, callback)
|
||||
logging.info("WebSocket connected and subscribed to 'allMids'.")
|
||||
return new_info
|
||||
except Exception as e:
|
||||
log_error(f"Failed to connect to WebSocket: {e}")
|
||||
return None
|
||||
|
||||
info = connect_and_subscribe()
|
||||
|
||||
logging.info("Starting live price feed process. Press Ctrl+C in main app to stop.")
|
||||
|
||||
try:
|
||||
# The background thread in the SDK handles messages. This loop just keeps the process alive.
|
||||
while True:
|
||||
time.sleep(1)
|
||||
# --- Watchdog Logic ---
|
||||
time.sleep(15) # Check the connection every 15 seconds
|
||||
|
||||
if info is None or not info.ws_manager.is_running():
|
||||
# --- FIX: Log this critical failure to the persistent error log ---
|
||||
error_msg = "WebSocket connection lost or not running. Attempting to reconnect..."
|
||||
logging.warning(error_msg)
|
||||
log_error(error_msg, include_traceback=False) # Log it to the file
|
||||
|
||||
if info:
|
||||
try:
|
||||
info.ws_manager.stop() # Clean up old manager
|
||||
except Exception as e:
|
||||
log_error(f"Error stopping old ws_manager: {e}")
|
||||
|
||||
info = connect_and_subscribe()
|
||||
|
||||
if info is None:
|
||||
logging.error("Reconnect failed, will retry in 15s.")
|
||||
else:
|
||||
logging.info("Successfully reconnected to WebSocket.")
|
||||
else:
|
||||
logging.debug("Watchdog check: WebSocket connection is active.")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Stopping WebSocket listener...")
|
||||
info.ws_manager.stop()
|
||||
except Exception as e:
|
||||
log_error(f"Live Market Feed process crashed: {e}")
|
||||
finally:
|
||||
if info and info.ws_manager:
|
||||
info.ws_manager.stop()
|
||||
logging.info("Listener stopped.")
|
||||
|
||||
|
||||
31
main_app.py
31
main_app.py
@ -11,11 +11,12 @@ import pandas as pd
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from logging_utils import setup_logging
|
||||
# --- Using the high-performance WebSocket utility for live prices ---
|
||||
# --- Using the new high-performance WebSocket utility for live prices ---
|
||||
from live_market_utils import start_live_feed
|
||||
|
||||
# --- Configuration ---
|
||||
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
|
||||
# --- FIX: Replaced old data_fetcher with the new live_candle_fetcher ---
|
||||
LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
|
||||
RESAMPLER_SCRIPT = "resampler.py"
|
||||
MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
|
||||
@ -26,9 +27,6 @@ MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
|
||||
LOGS_DIR = "_logs"
|
||||
TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")
|
||||
|
||||
# --- ADDED: Standard list of timeframes for the resampler to generate ---
|
||||
STANDARD_RESAMPLING_TIMEFRAMES = ["3m", "5m", "15m", "30m", "37m", "148m", "1h", "2h", "4h", "8h", "12h", "1d", "3d", "1w", "1M"]
|
||||
|
||||
|
||||
def format_market_cap(mc_value):
|
||||
"""Formats a large number into a human-readable market cap string."""
|
||||
@ -49,6 +47,7 @@ def run_live_candle_fetcher():
|
||||
while True:
|
||||
try:
|
||||
with open(log_file, 'a') as f:
|
||||
# We can't get coins from strategies.json here, so we pass the default list
|
||||
command = [sys.executable, LIVE_CANDLE_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
|
||||
f.write(f"\n--- Starting {LIVE_CANDLE_FETCHER_SCRIPT} at {datetime.now()} ---\n")
|
||||
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
|
||||
@ -73,13 +72,12 @@ def run_resampler_job(timeframes_to_generate: list):
|
||||
f.write(f"Failed to run resampler.py job: {e}\n")
|
||||
|
||||
|
||||
def resampler_scheduler():
|
||||
"""Schedules the resampler.py script to run at the start of every minute."""
|
||||
def resampler_scheduler(timeframes_to_generate: list):
|
||||
"""Schedules the resampler.py script."""
|
||||
setup_logging('off', 'ResamplerScheduler')
|
||||
# Run once at startup
|
||||
run_resampler_job(STANDARD_RESAMPLING_TIMEFRAMES)
|
||||
run_resampler_job(timeframes_to_generate)
|
||||
# Schedule to run every minute at the :01 second mark
|
||||
schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=STANDARD_RESAMPLING_TIMEFRAMES)
|
||||
schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=timeframes_to_generate)
|
||||
logging.info("Resampler scheduled to run every minute at :01.")
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
@ -90,7 +88,7 @@ def run_market_cap_fetcher_job():
|
||||
"""Defines the job for the market cap fetcher, redirecting output."""
|
||||
log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log")
|
||||
try:
|
||||
command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
|
||||
command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--log-level", "off"]
|
||||
with open(log_file, 'a') as f:
|
||||
f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n")
|
||||
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
|
||||
@ -319,13 +317,22 @@ if __name__ == "__main__":
|
||||
logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")
|
||||
sys.exit(1)
|
||||
|
||||
required_timeframes = set()
|
||||
for name, config in strategy_configs.items():
|
||||
if config.get("enabled", False):
|
||||
tf = config.get("parameters", {}).get("timeframe")
|
||||
if tf:
|
||||
required_timeframes.add(tf)
|
||||
|
||||
if not required_timeframes:
|
||||
logging.warning("No timeframes required by any enabled strategy.")
|
||||
|
||||
with multiprocessing.Manager() as manager:
|
||||
shared_prices = manager.dict()
|
||||
|
||||
processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True)
|
||||
processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
|
||||
# --- FIX: The resampler now uses a fixed list of TFs and a new schedule ---
|
||||
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True)
|
||||
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
|
||||
processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)
|
||||
processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, daemon=True)
|
||||
|
||||
|
||||
@ -41,7 +41,6 @@ class MarketCapFetcher:
|
||||
"DAI": "dai", "PYUSD": "paypal-usd"
|
||||
}
|
||||
|
||||
# --- ADDED: Ensure all tables have the correct schema ---
|
||||
self._ensure_tables_exist()
|
||||
|
||||
def _ensure_tables_exist(self):
|
||||
@ -291,8 +290,14 @@ class MarketCapFetcher:
|
||||
if not market_caps: return pd.DataFrame()
|
||||
|
||||
df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap'])
|
||||
# --- FIX: Convert to datetime object, but do not format as string ---
|
||||
df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
|
||||
|
||||
# --- FIX: Normalize all timestamps to the start of the day (00:00:00 UTC) ---
|
||||
# This prevents duplicate entries for the same day (e.g., a "live" candle vs. the daily one)
|
||||
df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms').dt.normalize()
|
||||
|
||||
# Recalculate the timestamp_ms to match the normalized 00:00:00 datetime
|
||||
df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)
|
||||
|
||||
df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True)
|
||||
return df[['datetime_utc', 'timestamp_ms', 'market_cap']]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user