Files
hyper/market_cap_fetcher.py
2025-10-25 19:58:52 +02:00

322 lines
14 KiB
Python

import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
import requests
import time
from datetime import datetime, timezone, timedelta
import json
from dotenv import load_dotenv
load_dotenv()
from logging_utils import setup_logging
class MarketCapFetcher:
    """
    Fetches historical daily market cap data from the CoinGecko API and
    upserts it into the SQLite database for all coins.

    One table named ``<SYMBOL>_market_cap`` is kept per entry of the coin ID
    map, plus ``STABLECOINS_market_cap`` (sum over ``STABLECOIN_ID_MAP``) and
    ``TOTAL_market_cap_daily`` (global total). Every table has the columns
    ``datetime_utc`` (TEXT), ``timestamp_ms`` (INTEGER PRIMARY KEY) and
    ``market_cap`` (REAL).
    """

    # Upper bound, in seconds, for a single HTTP request. Without a timeout
    # `requests` waits forever on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30
    # Pause, in seconds, inserted after each coin / stablecoin is processed.
    REQUEST_DELAY_SECONDS = 2

    def __init__(self, log_level: str):
        """
        Configure logging, load the API key and coin map, and verify tables.

        Args:
            log_level: Verbosity name forwarded to ``setup_logging``.

        Exits the process with status 1 when the ``COINGECKO_API_KEY``
        environment variable is missing or the coin ID map is empty.
        """
        setup_logging(log_level, 'MarketCapFetcher')
        self.db_path = os.path.join("_data", "market_data.db")
        self.api_base_url = "https://api.coingecko.com/api/v3"
        self.api_key = os.environ.get("COINGECKO_API_KEY")
        if not self.api_key:
            logging.error("CoinGecko API key not found. Please set the COINGECKO_API_KEY environment variable.")
            sys.exit(1)
        self.COIN_ID_MAP = self._load_coin_id_map()
        if not self.COIN_ID_MAP:
            logging.error("Coin ID map is empty. Run 'update_coin_map.py' to generate it.")
            sys.exit(1)
        self.coins_to_fetch = list(self.COIN_ID_MAP.keys())
        self.STABLECOIN_ID_MAP = {
            "USDT": "tether", "USDC": "usd-coin", "USDE": "ethena-usde",
            "DAI": "dai", "PYUSD": "paypal-usd"
        }
        self._ensure_tables_exist()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier with embedded quotes escaped."""
        return '"' + str(name).replace('"', '""') + '"'

    def _resolve_coin_id(self, coin_symbol: str):
        """
        Look up the CoinGecko ID for *coin_symbol*.

        The exact key is tried first so that mixed-case symbols stored in the
        map (for example ``kPEPE``) are found; the upper-cased symbol is kept
        as a fallback for callers that pass lower-case input.

        Returns:
            The mapped ID, or a falsy value (normally ``None``) when the
            symbol is unknown.
        """
        return self.COIN_ID_MAP.get(coin_symbol) or self.COIN_ID_MAP.get(coin_symbol.upper())

    def _ensure_tables_exist(self):
        """
        Ensures all market cap tables exist with timestamp_ms as PRIMARY KEY.

        A table that exists without ``timestamp_ms`` as its primary key is
        dropped and recreated (its rows are lost). If that recreation fails
        the process exits with status 1.
        """
        all_tables_to_check = [f"{coin}_market_cap" for coin in self.coins_to_fetch]
        all_tables_to_check.extend(["STABLECOINS_market_cap", "TOTAL_market_cap_daily"])
        # The sqlite3 connection context manager only commits/rolls back; it
        # does not close the connection, so close it explicitly.
        conn = sqlite3.connect(self.db_path)
        try:
            for table_name in all_tables_to_check:
                quoted = self._quote_identifier(table_name)
                columns = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
                if not columns:
                    # PRAGMA table_info returns no rows for a missing table.
                    self._create_market_cap_table(conn, table_name)
                    continue
                # table_info row layout: (cid, name, type, notnull, dflt_value, pk)
                pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                if pk_found:
                    continue
                logging.warning(f"Schema for table '{table_name}' is incorrect. Dropping and recreating table.")
                try:
                    conn.execute(f'DROP TABLE {quoted}')
                    self._create_market_cap_table(conn, table_name)
                    logging.info(f"Successfully recreated schema for '{table_name}'.")
                except Exception as e:
                    logging.error(f"FATAL: Failed to recreate table '{table_name}': {e}. Please delete 'market_data.db' and restart.")
                    sys.exit(1)
            conn.commit()
        finally:
            conn.close()
        logging.info("All market cap table schemas verified.")

    def _create_market_cap_table(self, conn, table_name):
        """Creates a new market cap table with the correct schema (no-op if it already exists)."""
        conn.execute(f'''
            CREATE TABLE IF NOT EXISTS {self._quote_identifier(table_name)} (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                market_cap REAL
            )
        ''')

    def _load_coin_id_map(self) -> dict:
        """
        Loads the dynamically generated coin-to-id mapping.

        Returns:
            The parsed JSON object from ``_data/coin_id_map.json``, or an
            empty dict when the file is missing, is not valid JSON, or does
            not contain a JSON object.
        """
        map_file_path = os.path.join("_data", "coin_id_map.json")
        try:
            with open(map_file_path, 'r', encoding='utf-8') as f:
                coin_map = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logging.error(f"Could not load '{map_file_path}'. Please run 'update_coin_map.py' first. Error: {e}")
            return {}
        if not isinstance(coin_map, dict):
            # The rest of the class calls .keys()/.get() on the map.
            logging.error(f"Could not load '{map_file_path}'. Please run 'update_coin_map.py' first. Error: expected a JSON object.")
            return {}
        return coin_map

    def _upsert_market_cap_data(self, conn, table_name: str, df: pd.DataFrame):
        """
        Upserts a DataFrame of market cap data into the specified table.

        Args:
            conn: Open sqlite3 connection; the insert is committed on it.
            table_name: Destination table.
            df: Frame with ``datetime_utc``, ``timestamp_ms`` and
                ``market_cap`` columns. An empty frame is a no-op.
        """
        if df.empty:
            return
        # Convert to native Python types up front: sqlite3 cannot bind numpy
        # integers, and a missing market cap is stored as NULL.
        records_to_upsert = [
            (
                moment.strftime('%Y-%m-%d %H:%M:%S'),
                int(ts_ms),
                None if pd.isna(cap) else float(cap),
            )
            for moment, ts_ms, cap in zip(df['datetime_utc'], df['timestamp_ms'], df['market_cap'])
        ]
        cursor = conn.cursor()
        cursor.executemany(f'''
            INSERT OR REPLACE INTO {self._quote_identifier(table_name)} (datetime_utc, timestamp_ms, market_cap)
            VALUES (?, ?, ?)
        ''', records_to_upsert)
        conn.commit()
        logging.info(f"Successfully upserted {len(records_to_upsert)} records into '{table_name}'.")

    def run(self):
        """
        Main execution function to process all configured coins and update the database.

        Per-coin failures and a failure of the stablecoin aggregate are logged
        and do not stop the remaining steps (total market cap, summary file).
        """
        logging.info("Starting historical market cap fetch process from CoinGecko...")
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("PRAGMA journal_mode=WAL;")
            for coin_symbol in self.coins_to_fetch:
                coin_id = self._resolve_coin_id(coin_symbol)
                if not coin_id:
                    logging.warning(f"No CoinGecko ID found for '{coin_symbol}'. Skipping.")
                    continue
                logging.info(f"--- Processing {coin_symbol} ({coin_id}) ---")
                try:
                    self._update_market_cap_for_coin(coin_id, coin_symbol, conn)
                except Exception as e:
                    logging.error(f"An unexpected error occurred while processing {coin_symbol}: {e}")
                time.sleep(self.REQUEST_DELAY_SECONDS)
            try:
                self._update_stablecoin_aggregate(conn)
            except Exception as e:
                logging.error(f"An unexpected error occurred while processing the stablecoin aggregate: {e}")
            self._update_total_market_cap(conn)
            self._save_summary(conn)
            conn.commit()
        finally:
            # Always release the database handle, even if a step raised.
            conn.close()
        logging.info("--- Market cap fetch process complete ---")

    def _save_summary(self, conn):
        """
        Write the newest row of every market cap table to a JSON summary file.

        Tables are discovered from ``sqlite_master`` (names ending in
        ``_market_cap`` or starting with ``TOTAL_``). The result is written to
        ``_data/market_cap_data.json`` together with a
        ``summary_last_updated_utc`` stamp. Errors are logged, never raised.
        """
        logging.info("--- Generating Market Cap Summary ---")
        summary_data = {}
        summary_file_path = os.path.join("_data", "market_cap_data.json")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%');")
            tables = [row[0] for row in cursor.fetchall()]
            for table_name in tables:
                try:
                    df_last = pd.read_sql(
                        f'SELECT * FROM {self._quote_identifier(table_name)} ORDER BY datetime_utc DESC LIMIT 1',
                        conn,
                    )
                    if not df_last.empty:
                        summary_data[table_name] = df_last.to_dict('records')[0]
                except Exception as e:
                    logging.error(f"Could not read last record from table '{table_name}': {e}")
            if summary_data:
                summary_data['summary_last_updated_utc'] = datetime.now(timezone.utc).isoformat()
                with open(summary_file_path, 'w', encoding='utf-8') as f:
                    json.dump(summary_data, f, indent=4)
                logging.info(f"Successfully saved market cap summary to '{summary_file_path}'")
            else:
                logging.warning("No data found to create a summary.")
        except Exception as e:
            logging.error(f"Failed to generate summary: {e}")

    def _update_total_market_cap(self, conn):
        """
        Fetches the current total market cap and upserts it for the current date.

        The row is keyed on today's UTC date at 00:00:00, so repeated runs on
        the same day overwrite the same row. Errors are logged, never raised.
        """
        logging.info("--- Processing Total Market Cap ---")
        table_name = "TOTAL_market_cap_daily"
        try:
            today_date = datetime.now(timezone.utc).date()
            today_dt = pd.to_datetime(today_date)
            today_ts = int(today_dt.timestamp() * 1000)
            logging.info("Fetching current global market data...")
            url = f"{self.api_base_url}/global"
            headers = {"x-cg-demo-api-key": self.api_key}
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            global_data = response.json().get('data', {})
            total_mc = global_data.get('total_market_cap', {}).get('usd')
            if total_mc:
                df_total = pd.DataFrame([{
                    'datetime_utc': today_dt,
                    'timestamp_ms': today_ts,
                    'market_cap': total_mc
                }])
                self._upsert_market_cap_data(conn, table_name, df_total)
                logging.info(f"Saved total market cap for {today_date}: ${total_mc:,.2f}")
            else:
                logging.warning("Global market data response did not contain a USD total market cap.")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to fetch global market data: {e}")
        except Exception as e:
            logging.error(f"An error occurred while updating total market cap: {e}")

    def _update_stablecoin_aggregate(self, conn):
        """
        Fetches data for all stablecoins and saves the aggregated market cap.

        The per-day market caps of every coin in ``STABLECOIN_ID_MAP`` are
        summed and rows newer than the latest stored timestamp are inserted.
        If any stablecoin returns no data, nothing is written: a sum over a
        subset would under-report the total, and because only newer
        timestamps are inserted later, that row would never be corrected.
        """
        logging.info("--- Processing aggregated stablecoin market cap ---")
        fetched_frames = []
        missing_symbols = []
        for symbol, coin_id in self.STABLECOIN_ID_MAP.items():
            logging.info(f"Fetching historical data for stablecoin: {symbol}...")
            df = self._fetch_historical_data(coin_id, days=365)
            if df.empty:
                missing_symbols.append(symbol)
            else:
                fetched_frames.append(df)
            time.sleep(self.REQUEST_DELAY_SECONDS)
        if not fetched_frames:
            logging.warning("No data fetched for any stablecoins. Cannot create aggregate.")
            return
        if missing_symbols:
            missing_list = ", ".join(missing_symbols)
            logging.warning(f"No data fetched for stablecoins: {missing_list}. Skipping aggregate to avoid storing a partial sum.")
            return
        all_stablecoin_df = pd.concat(fetched_frames)
        aggregated_df = all_stablecoin_df.groupby('timestamp_ms').agg(
            datetime_utc=('datetime_utc', 'first'),
            market_cap=('market_cap', 'sum')
        ).reset_index()
        table_name = "STABLECOINS_market_cap"
        last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True)
        if last_date_in_db is not None:
            aggregated_df = aggregated_df[aggregated_df['timestamp_ms'] > last_date_in_db]
        if not aggregated_df.empty:
            self._upsert_market_cap_data(conn, table_name, aggregated_df)
        else:
            logging.info("Aggregated stablecoin data is already up-to-date.")

    def _update_market_cap_for_coin(self, coin_id: str, coin_symbol: str, conn):
        """
        Fetches and appends new market cap data for a single coin.

        Args:
            coin_id: CoinGecko coin identifier used in the request URL.
            coin_symbol: Symbol used to build the ``<symbol>_market_cap`` table name.
            conn: Open sqlite3 connection.

        With no stored rows, 365 days are requested. Otherwise only the gap
        since the newest stored timestamp (capped at 365 days) is requested,
        and only rows strictly newer than that timestamp are written.
        """
        table_name = f"{coin_symbol}_market_cap"
        last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True)
        days_to_fetch = 365
        if last_date_in_db is not None:
            last_moment = datetime.fromtimestamp(last_date_in_db / 1000, tz=timezone.utc)
            delta_days = (datetime.now(timezone.utc) - last_moment).days
            if delta_days <= 0:
                logging.info(f"Market cap data for '{coin_symbol}' is already up-to-date.")
                return
            # One extra day so the request window overlaps the stored data.
            days_to_fetch = min(delta_days + 1, 365)
        else:
            logging.info(f"No existing data found. Fetching initial {days_to_fetch} days for {coin_symbol}.")
        df = self._fetch_historical_data(coin_id, days=days_to_fetch)
        if df.empty:
            logging.warning(f"No market cap data returned from API for {coin_symbol}.")
            return
        if last_date_in_db is not None:
            df = df[df['timestamp_ms'] > last_date_in_db]
        if not df.empty:
            self._upsert_market_cap_data(conn, table_name, df)
        else:
            logging.info(f"Data was fetched, but no new records needed saving for '{coin_symbol}'.")

    def _get_last_date_from_db(self, table_name: str, conn, is_timestamp_ms: bool = False):
        """
        Gets the most recent date or timestamp from a market cap table.

        Args:
            table_name: Table to inspect.
            conn: Open sqlite3 connection.
            is_timestamp_ms: When true, read ``timestamp_ms`` and return an
                ``int``; otherwise read ``datetime_utc`` and return the value
                parsed by ``pd.to_datetime``.

        Returns:
            The maximum value, or ``None`` when the table is missing, empty,
            or the query fails (the failure is logged).
        """
        try:
            cursor = conn.cursor()
            # Bind the name as a parameter instead of splicing it into the SQL literal.
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table_name,))
            if not cursor.fetchone():
                return None
            col_to_query = "timestamp_ms" if is_timestamp_ms else "datetime_utc"
            cursor.execute(f'SELECT MAX({col_to_query}) FROM {self._quote_identifier(table_name)}')
            last_val = cursor.fetchone()[0]
            if last_val is None:
                # MAX() over an empty table yields NULL.
                return None
            if is_timestamp_ms:
                return int(last_val)
            return pd.to_datetime(last_val)
        except (sqlite3.Error, ValueError, TypeError) as e:
            logging.error(f"Could not read last date from table '{table_name}': {e}")
            return None

    @staticmethod
    def _market_caps_to_frame(market_caps) -> pd.DataFrame:
        """
        Convert ``[timestamp_ms, market_cap]`` pairs into one row per UTC day.

        Every timestamp is floored to 00:00:00 UTC so a second data point on
        the same day (e.g. a "live" value next to the daily one) collapses
        into a single row; the last pair for a day wins.

        Returns:
            Frame with ``datetime_utc``, ``timestamp_ms``, ``market_cap``
            columns, or an empty ``DataFrame`` for empty input.
        """
        if not market_caps:
            return pd.DataFrame()
        ms_per_day = 24 * 60 * 60 * 1000
        df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap'])
        # Floor in integer milliseconds so the result does not depend on the
        # resolution of the datetime dtype pandas picks.
        raw_ms = df['timestamp_ms'].astype('int64')
        df['timestamp_ms'] = raw_ms - raw_ms % ms_per_day
        df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
        df = df.drop_duplicates(subset=['timestamp_ms'], keep='last')
        return df[['datetime_utc', 'timestamp_ms', 'market_cap']]

    def _fetch_historical_data(self, coin_id: str, days: int) -> pd.DataFrame:
        """
        Fetches historical market chart data from CoinGecko for a specified number of days.

        Args:
            coin_id: CoinGecko coin identifier.
            days: Number of days sent as the ``days`` query parameter.

        Returns:
            Day-normalized frame (see ``_market_caps_to_frame``), or an empty
            ``DataFrame`` when the request fails, the payload is malformed,
            or no ``market_caps`` entries are present.
        """
        url = f"{self.api_base_url}/coins/{coin_id}/market_chart"
        params = {"vs_currency": "usd", "days": days, "interval": "daily"}
        headers = {"x-cg-demo-api-key": self.api_key}
        try:
            logging.debug(f"Fetching last {days} days for {coin_id}...")
            response = requests.get(
                url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            data = response.json()
            return self._market_caps_to_frame(data.get('market_caps', []))
        except requests.exceptions.RequestException as e:
            logging.error(f"API request failed for {coin_id}: {e}.")
            return pd.DataFrame()
        except ValueError as e:
            # Non-JSON body or rows that do not have exactly two fields.
            logging.error(f"Unexpected API response for {coin_id}: {e}.")
            return pd.DataFrame()
if __name__ == "__main__":
    # Command-line entry point: read the desired verbosity, then perform a
    # single fetch-and-store cycle.
    cli = argparse.ArgumentParser(
        description="Fetch historical market cap data from CoinGecko."
    )
    cli.add_argument(
        "--log-level",
        choices=['off', 'normal', 'debug'],
        default="normal",
        help="Set the logging level for the script.",
    )
    options = cli.parse_args()
    MarketCapFetcher(log_level=options.log_level).run()