import argparse
import json
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone, timedelta

import pandas as pd
import requests
from dotenv import load_dotenv

load_dotenv()

from logging_utils import setup_logging


class MarketCapFetcher:
    """
    Fetches historical daily market cap data from the CoinGecko API and
    intelligently upserts it into the SQLite database for all coins.
    """

    # Seconds allowed for any CoinGecko HTTP call. Without an explicit
    # timeout, requests.get() can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, log_level: str):
        """
        Configure logging, resolve paths/credentials, load the coin-id map,
        and make sure all destination tables exist.

        Exits the process (status 1) when the API key or the coin-id map is
        missing, since nothing useful can be done without them.
        """
        setup_logging(log_level, 'MarketCapFetcher')
        self.db_path = os.path.join("_data", "market_data.db")
        self.api_base_url = "https://api.coingecko.com/api/v3"
        self.api_key = os.environ.get("COINGECKO_API_KEY")
        if not self.api_key:
            logging.error("CoinGecko API key not found. Please set the COINGECKO_API_KEY environment variable.")
            sys.exit(1)

        self.COIN_ID_MAP = self._load_coin_id_map()
        if not self.COIN_ID_MAP:
            logging.error("Coin ID map is empty. Run 'update_coin_map.py' to generate it.")
            sys.exit(1)
        self.coins_to_fetch = list(self.COIN_ID_MAP.keys())

        # Stablecoins are aggregated separately into STABLECOINS_market_cap.
        self.STABLECOIN_ID_MAP = {
            "USDT": "tether",
            "USDC": "usd-coin",
            "USDE": "ethena-usde",
            "DAI": "dai",
            "PYUSD": "paypal-usd"
        }
        self._ensure_tables_exist()

    def _ensure_tables_exist(self):
        """Ensures all market cap tables exist with timestamp_ms as PRIMARY KEY."""
        all_tables_to_check = [f"{coin}_market_cap" for coin in self.coins_to_fetch]
        all_tables_to_check.extend(["STABLECOINS_market_cap", "TOTAL_market_cap_daily"])
        with sqlite3.connect(self.db_path) as conn:
            for table_name in all_tables_to_check:
                cursor = conn.cursor()
                cursor.execute(f"PRAGMA table_info('{table_name}')")
                columns = cursor.fetchall()
                if columns:
                    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
                    pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                    if not pk_found:
                        # Legacy schema without the PK: INSERT OR REPLACE would not
                        # dedupe, so rebuild the table from scratch.
                        logging.warning(f"Schema for table '{table_name}' is incorrect. Dropping and recreating table.")
                        try:
                            conn.execute(f'DROP TABLE "{table_name}"')
                            self._create_market_cap_table(conn, table_name)
                            logging.info(f"Successfully recreated schema for '{table_name}'.")
                        except Exception as e:
                            logging.error(f"FATAL: Failed to recreate table '{table_name}': {e}. Please delete 'market_data.db' and restart.")
                            sys.exit(1)
                else:
                    self._create_market_cap_table(conn, table_name)
        logging.info("All market cap table schemas verified.")

    def _create_market_cap_table(self, conn, table_name):
        """Creates a new market cap table with the correct schema."""
        # timestamp_ms as PRIMARY KEY lets INSERT OR REPLACE act as an upsert.
        conn.execute(f'''
            CREATE TABLE IF NOT EXISTS "{table_name}" (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                market_cap REAL
            )
        ''')

    def _load_coin_id_map(self) -> dict:
        """Loads the dynamically generated coin-to-id mapping.

        Returns an empty dict (and logs an error) when the map file is
        missing or unparsable, so the caller can decide to abort.
        """
        map_file_path = os.path.join("_data", "coin_id_map.json")
        try:
            with open(map_file_path, 'r') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logging.error(f"Could not load '{map_file_path}'. Please run 'update_coin_map.py' first. Error: {e}")
            return {}

    def _upsert_market_cap_data(self, conn, table_name: str, df: pd.DataFrame):
        """Upserts a DataFrame of market cap data into the specified table.

        Expects columns: datetime_utc (datetime-like), timestamp_ms (int),
        market_cap (float). No-op on an empty frame.
        """
        if df.empty:
            return

        # itertuples is much faster than iterrows and avoids per-row Series
        # construction; behavior is identical for these three columns.
        records_to_upsert = [
            (
                row.datetime_utc.strftime('%Y-%m-%d %H:%M:%S'),
                row.timestamp_ms,
                row.market_cap,
            )
            for row in df.itertuples(index=False)
        ]

        cursor = conn.cursor()
        cursor.executemany(f'''
            INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, market_cap)
            VALUES (?, ?, ?)
        ''', records_to_upsert)
        conn.commit()
        logging.info(f"Successfully upserted {len(records_to_upsert)} records into '{table_name}'.")

    def run(self):
        """
        Main execution function to process all configured coins and update the database.
        """
        logging.info("Starting historical market cap fetch process from CoinGecko...")
        with sqlite3.connect(self.db_path) as conn:
            # WAL mode allows concurrent readers while this script writes.
            conn.execute("PRAGMA journal_mode=WAL;")
            for coin_symbol in self.coins_to_fetch:
                coin_id = self.COIN_ID_MAP.get(coin_symbol.upper())
                if not coin_id:
                    logging.warning(f"No CoinGecko ID found for '{coin_symbol}'. Skipping.")
                    continue
                logging.info(f"--- Processing {coin_symbol} ({coin_id}) ---")
                try:
                    self._update_market_cap_for_coin(coin_id, coin_symbol, conn)
                except Exception as e:
                    # One bad coin must not abort the whole run.
                    logging.error(f"An unexpected error occurred while processing {coin_symbol}: {e}")
                # Throttle to stay under CoinGecko's free-tier rate limit.
                time.sleep(2)

            self._update_stablecoin_aggregate(conn)
            self._update_total_market_cap(conn)
            self._save_summary(conn)
        logging.info("--- Market cap fetch process complete ---")

    def _save_summary(self, conn):
        """Writes the latest row of every market-cap table to a JSON summary file."""
        logging.info("--- Generating Market Cap Summary ---")
        summary_data = {}
        summary_file_path = os.path.join("_data", "market_cap_data.json")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%');")
            tables = [row[0] for row in cursor.fetchall()]
            for table_name in tables:
                try:
                    df_last = pd.read_sql(f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT 1', conn)
                    if not df_last.empty:
                        summary_data[table_name] = df_last.to_dict('records')[0]
                except Exception as e:
                    logging.error(f"Could not read last record from table '{table_name}': {e}")

            if summary_data:
                summary_data['summary_last_updated_utc'] = datetime.now(timezone.utc).isoformat()
                with open(summary_file_path, 'w', encoding='utf-8') as f:
                    json.dump(summary_data, f, indent=4)
                logging.info(f"Successfully saved market cap summary to '{summary_file_path}'")
            else:
                logging.warning("No data found to create a summary.")
        except Exception as e:
            logging.error(f"Failed to generate summary: {e}")

    def _update_total_market_cap(self, conn):
        """Fetches the current total market cap and upserts it for the current date."""
        logging.info("--- Processing Total Market Cap ---")
        table_name = "TOTAL_market_cap_daily"
        try:
            today_date = datetime.now(timezone.utc).date()
            today_dt = pd.to_datetime(today_date)
            today_ts = int(today_dt.timestamp() * 1000)

            logging.info("Fetching current global market data...")
            url = f"{self.api_base_url}/global"
            headers = {"x-cg-demo-api-key": self.api_key}
            # FIX: explicit timeout so a hung connection cannot stall the run.
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            global_data = response.json().get('data', {})
            total_mc = global_data.get('total_market_cap', {}).get('usd')

            if total_mc:
                df_total = pd.DataFrame([{
                    'datetime_utc': today_dt,
                    'timestamp_ms': today_ts,
                    'market_cap': total_mc
                }])
                self._upsert_market_cap_data(conn, table_name, df_total)
                logging.info(f"Saved total market cap for {today_date}: ${total_mc:,.2f}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to fetch global market data: {e}")
        except Exception as e:
            logging.error(f"An error occurred while updating total market cap: {e}")

    def _update_stablecoin_aggregate(self, conn):
        """Fetches data for all stablecoins and saves the aggregated market cap."""
        logging.info("--- Processing aggregated stablecoin market cap ---")
        all_stablecoin_df = pd.DataFrame()
        for symbol, coin_id in self.STABLECOIN_ID_MAP.items():
            logging.info(f"Fetching historical data for stablecoin: {symbol}...")
            df = self._fetch_historical_data(coin_id, days=365)
            if not df.empty:
                all_stablecoin_df = pd.concat([all_stablecoin_df, df])
            # Throttle between per-coin API calls (rate limit).
            time.sleep(2)

        if all_stablecoin_df.empty:
            logging.warning("No data fetched for any stablecoins. Cannot create aggregate.")
            return

        # Sum the individual stablecoin caps per (normalized) daily timestamp.
        aggregated_df = all_stablecoin_df.groupby('timestamp_ms').agg(
            datetime_utc=('datetime_utc', 'first'),
            market_cap=('market_cap', 'sum')
        ).reset_index()

        table_name = "STABLECOINS_market_cap"
        last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True)
        if last_date_in_db:
            aggregated_df = aggregated_df[aggregated_df['timestamp_ms'] > last_date_in_db]

        if not aggregated_df.empty:
            self._upsert_market_cap_data(conn, table_name, aggregated_df)
        else:
            logging.info("Aggregated stablecoin data is already up-to-date.")

    def _update_market_cap_for_coin(self, coin_id: str, coin_symbol: str, conn):
        """Fetches and appends new market cap data for a single coin."""
        table_name = f"{coin_symbol}_market_cap"
        last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True)

        days_to_fetch = 365
        if last_date_in_db:
            delta_days = (datetime.now(timezone.utc) - datetime.fromtimestamp(last_date_in_db / 1000, tz=timezone.utc)).days
            if delta_days <= 0:
                logging.info(f"Market cap data for '{coin_symbol}' is already up-to-date.")
                return
            # +1 day of overlap so the boundary day is re-fetched; capped at
            # 365 (CoinGecko demo-tier history limit).
            days_to_fetch = min(delta_days + 1, 365)
        else:
            logging.info(f"No existing data found. Fetching initial {days_to_fetch} days for {coin_symbol}.")

        df = self._fetch_historical_data(coin_id, days=days_to_fetch)
        if df.empty:
            logging.warning(f"No market cap data returned from API for {coin_symbol}.")
            return

        if last_date_in_db:
            df = df[df['timestamp_ms'] > last_date_in_db]

        if not df.empty:
            self._upsert_market_cap_data(conn, table_name, df)
        else:
            logging.info(f"Data was fetched, but no new records needed saving for '{coin_symbol}'.")

    def _get_last_date_from_db(self, table_name: str, conn, is_timestamp_ms: bool = False):
        """Gets the most recent date or timestamp from a market cap table.

        Returns None when the table does not exist, is empty, or cannot be
        read. Returns an int (ms) when is_timestamp_ms, else a pandas
        Timestamp.
        """
        try:
            cursor = conn.cursor()
            # FIX: bind the table name as a parameter instead of interpolating
            # it into the SQL string.
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table_name,))
            if not cursor.fetchone():
                return None

            col_to_query = "timestamp_ms" if is_timestamp_ms else "datetime_utc"
            last_val = pd.read_sql(f'SELECT MAX({col_to_query}) FROM "{table_name}"', conn).iloc[0, 0]

            if pd.isna(last_val):
                return None
            if is_timestamp_ms:
                return int(last_val)
            return pd.to_datetime(last_val)
        except Exception as e:
            logging.error(f"Could not read last date from table '{table_name}': {e}")
            return None

    def _fetch_historical_data(self, coin_id: str, days: int) -> pd.DataFrame:
        """Fetches historical market chart data from CoinGecko for a specified number of days.

        Returns a DataFrame with columns [datetime_utc, timestamp_ms,
        market_cap]; empty on API failure or when no data is returned.
        """
        url = f"{self.api_base_url}/coins/{coin_id}/market_chart"
        params = {
            "vs_currency": "usd",
            "days": days,
            "interval": "daily"
        }
        headers = {"x-cg-demo-api-key": self.api_key}
        try:
            logging.debug(f"Fetching last {days} days for {coin_id}...")
            # FIX: explicit timeout so a hung connection cannot stall the run.
            response = requests.get(url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            market_caps = data.get('market_caps', [])
            if not market_caps:
                return pd.DataFrame()

            df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap'])

            # Normalize all timestamps to the start of the day (00:00:00 UTC).
            # This prevents duplicate entries for the same day (e.g. a "live"
            # candle vs. the daily one).
            df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms').dt.normalize()
            # Recalculate timestamp_ms to match the normalized 00:00:00 datetime.
            df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)
            # Keep the latest value when normalization collapses two rows onto
            # the same day.
            df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True)

            return df[['datetime_utc', 'timestamp_ms', 'market_cap']]
        except requests.exceptions.RequestException as e:
            logging.error(f"API request failed for {coin_id}: {e}.")
            return pd.DataFrame()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fetch historical market cap data from CoinGecko.")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()
    fetcher = MarketCapFetcher(log_level=args.log_level)
    fetcher.run()