From 58056012181bc5a35840a2c870f6877aaa3566d6 Mon Sep 17 00:00:00 2001
From: DiTus
Date: Wed, 22 Oct 2025 22:22:13 +0200
Subject: [PATCH] timestamp_ms column added to all tables as primary key

All market cap and resampled tables now carry an INTEGER timestamp_ms
column as their PRIMARY KEY. Writes go through INSERT OR REPLACE upserts
keyed on timestamp_ms, resampled tables are migrated in place, market cap
tables with the wrong schema are dropped and recreated, and incremental
fetches compare millisecond timestamps instead of datetime strings.
---
 market_cap_fetcher.py | 154 ++++++++++++++++++++++++++----------------
 resampler.py          |  50 ++++++++------
 2 files changed, 125 insertions(+), 79 deletions(-)

diff --git a/market_cap_fetcher.py b/market_cap_fetcher.py
index 95877f0..2557828 100644
--- a/market_cap_fetcher.py
+++ b/market_cap_fetcher.py
@@ -17,7 +17,7 @@ from logging_utils import setup_logging
 class MarketCapFetcher:
     """
     Fetches historical daily market cap data from the CoinGecko API and
-    intelligently updates the SQLite database for all coins found in the coin map.
+    intelligently upserts it into the SQLite database for all coins.
     """
 
     def __init__(self, log_level: str):
@@ -34,16 +34,50 @@ class MarketCapFetcher:
             logging.error("Coin ID map is empty. Run 'update_coin_map.py' to generate it.")
             sys.exit(1)
 
-        # --- FIX: The list of coins to fetch is now all coins from the map ---
         self.coins_to_fetch = list(self.COIN_ID_MAP.keys())
 
         self.STABLECOIN_ID_MAP = {
-            "USDT": "tether",
-            "USDC": "usd-coin",
-            "USDE": "ethena-usde",
-            "DAI": "dai",
-            "PYUSD": "paypal-usd"
+            "USDT": "tether", "USDC": "usd-coin", "USDE": "ethena-usde",
+            "DAI": "dai", "PYUSD": "paypal-usd"
         }
+
+        # --- ADDED: Ensure all tables have the correct schema ---
+        self._ensure_tables_exist()
+
+    def _ensure_tables_exist(self):
+        """Ensures all market cap tables exist with timestamp_ms as PRIMARY KEY."""
+        all_tables_to_check = [f"{coin}_market_cap" for coin in self.coins_to_fetch]
+        all_tables_to_check.extend(["STABLECOINS_market_cap", "TOTAL_market_cap_daily"])
+
+        with sqlite3.connect(self.db_path) as conn:
+            for table_name in all_tables_to_check:
+                cursor = conn.cursor()
+                cursor.execute(f"PRAGMA table_info('{table_name}')")
+                columns = cursor.fetchall()
+                if columns:
+                    pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
+                    if not pk_found:
+                        logging.warning(f"Schema for table '{table_name}' is incorrect. Dropping and recreating table.")
+                        try:
+                            conn.execute(f'DROP TABLE "{table_name}"')
+                            self._create_market_cap_table(conn, table_name)
+                            logging.info(f"Successfully recreated schema for '{table_name}'.")
+                        except Exception as e:
+                            logging.error(f"FATAL: Failed to recreate table '{table_name}': {e}. Please delete 'market_data.db' and restart.")
+                            sys.exit(1)
+                else:
+                    self._create_market_cap_table(conn, table_name)
+        logging.info("All market cap table schemas verified.")
+
+    def _create_market_cap_table(self, conn, table_name):
+        """Creates a new market cap table with the correct schema."""
+        conn.execute(f'''
+            CREATE TABLE IF NOT EXISTS "{table_name}" (
+                datetime_utc TEXT,
+                timestamp_ms INTEGER PRIMARY KEY,
+                market_cap REAL
+            )
+        ''')
 
     def _load_coin_id_map(self) -> dict:
         """Loads the dynamically generated coin-to-id mapping."""
@@ -55,6 +89,27 @@ class MarketCapFetcher:
             logging.error(f"Could not load '{map_file_path}'. Please run 'update_coin_map.py' first. Error: {e}")
Error: {e}") return {} + def _upsert_market_cap_data(self, conn, table_name: str, df: pd.DataFrame): + """Upserts a DataFrame of market cap data into the specified table.""" + if df.empty: + return + + records_to_upsert = [] + for index, row in df.iterrows(): + records_to_upsert.append(( + row['datetime_utc'].strftime('%Y-%m-%d %H:%M:%S'), + row['timestamp_ms'], + row['market_cap'] + )) + + cursor = conn.cursor() + cursor.executemany(f''' + INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, market_cap) + VALUES (?, ?, ?) + ''', records_to_upsert) + conn.commit() + logging.info(f"Successfully upserted {len(records_to_upsert)} records into '{table_name}'.") + def run(self): """ Main execution function to process all configured coins and update the database. @@ -63,7 +118,6 @@ class MarketCapFetcher: with sqlite3.connect(self.db_path) as conn: conn.execute("PRAGMA journal_mode=WAL;") - # 1. Process individual coins from the map for coin_symbol in self.coins_to_fetch: coin_id = self.COIN_ID_MAP.get(coin_symbol.upper()) if not coin_id: @@ -76,30 +130,21 @@ class MarketCapFetcher: logging.error(f"An unexpected error occurred while processing {coin_symbol}: {e}") time.sleep(2) - # 2. Process and aggregate stablecoins self._update_stablecoin_aggregate(conn) - - # 3. Process total market cap metrics self._update_total_market_cap(conn) - - # 4. Save a summary of the latest data self._save_summary(conn) logging.info("--- Market cap fetch process complete ---") def _save_summary(self, conn): - """ - Queries the last record from each market cap table and saves a summary to a JSON file. - """ + # ... (This function is unchanged) logging.info("--- Generating Market Cap Summary ---") summary_data = {} summary_file_path = os.path.join("_data", "market_cap_data.json") - try: cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%');") tables = [row[0] for row in cursor.fetchall()] - for table_name in tables: try: df_last = pd.read_sql(f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT 1', conn) @@ -107,38 +152,24 @@ class MarketCapFetcher: summary_data[table_name] = df_last.to_dict('records')[0] except Exception as e: logging.error(f"Could not read last record from table '{table_name}': {e}") - if summary_data: summary_data['summary_last_updated_utc'] = datetime.now(timezone.utc).isoformat() - with open(summary_file_path, 'w', encoding='utf-8') as f: json.dump(summary_data, f, indent=4) logging.info(f"Successfully saved market cap summary to '{summary_file_path}'") else: logging.warning("No data found to create a summary.") - except Exception as e: logging.error(f"Failed to generate summary: {e}") def _update_total_market_cap(self, conn): - """ - Fetches the current total market cap and saves it for the current date. - """ + """Fetches the current total market cap and upserts it for the current date.""" logging.info("--- Processing Total Market Cap ---") table_name = "TOTAL_market_cap_daily" - try: today_date = datetime.now(timezone.utc).date() - - cursor = conn.cursor() - cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';") - table_exists = cursor.fetchone() - - if table_exists: - cursor.execute(f"SELECT 1 FROM \"{table_name}\" WHERE date(datetime_utc) = ? LIMIT 1", (today_date.isoformat(),)) - if cursor.fetchone(): - logging.info(f"Total market cap for {today_date} already exists. 
Skipping.") - return + today_dt = pd.to_datetime(today_date) + today_ts = int(today_dt.timestamp() * 1000) logging.info("Fetching current global market data...") url = f"{self.api_base_url}/global" @@ -150,10 +181,11 @@ class MarketCapFetcher: if total_mc: df_total = pd.DataFrame([{ - 'datetime_utc': pd.to_datetime(today_date), + 'datetime_utc': today_dt, + 'timestamp_ms': today_ts, 'market_cap': total_mc }]) - df_total.to_sql(table_name, conn, if_exists='append', index=False) + self._upsert_market_cap_data(conn, table_name, df_total) logging.info(f"Saved total market cap for {today_date}: ${total_mc:,.2f}") except requests.exceptions.RequestException as e: @@ -161,7 +193,6 @@ class MarketCapFetcher: except Exception as e: logging.error(f"An error occurred while updating total market cap: {e}") - def _update_stablecoin_aggregate(self, conn): """Fetches data for all stablecoins and saves the aggregated market cap.""" logging.info("--- Processing aggregated stablecoin market cap ---") @@ -171,7 +202,6 @@ class MarketCapFetcher: logging.info(f"Fetching historical data for stablecoin: {symbol}...") df = self._fetch_historical_data(coin_id, days=365) if not df.empty: - df['coin'] = symbol all_stablecoin_df = pd.concat([all_stablecoin_df, df]) time.sleep(2) @@ -179,31 +209,30 @@ class MarketCapFetcher: logging.warning("No data fetched for any stablecoins. Cannot create aggregate.") return - aggregated_df = all_stablecoin_df.groupby(all_stablecoin_df['datetime_utc'].dt.date)['market_cap'].sum().reset_index() - aggregated_df['datetime_utc'] = pd.to_datetime(aggregated_df['datetime_utc']) + aggregated_df = all_stablecoin_df.groupby('timestamp_ms').agg( + datetime_utc=('datetime_utc', 'first'), + market_cap=('market_cap', 'sum') + ).reset_index() table_name = "STABLECOINS_market_cap" - last_date_in_db = self._get_last_date_from_db(table_name, conn) + last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True) if last_date_in_db: - aggregated_df = aggregated_df[aggregated_df['datetime_utc'] > last_date_in_db] + aggregated_df = aggregated_df[aggregated_df['timestamp_ms'] > last_date_in_db] if not aggregated_df.empty: - aggregated_df.to_sql(table_name, conn, if_exists='append', index=False) - logging.info(f"Successfully saved {len(aggregated_df)} daily records to '{table_name}'.") + self._upsert_market_cap_data(conn, table_name, aggregated_df) else: logging.info("Aggregated stablecoin data is already up-to-date.") - def _update_market_cap_for_coin(self, coin_id: str, coin_symbol: str, conn): """Fetches and appends new market cap data for a single coin.""" table_name = f"{coin_symbol}_market_cap" - - last_date_in_db = self._get_last_date_from_db(table_name, conn) + last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True) days_to_fetch = 365 if last_date_in_db: - delta_days = (datetime.now() - last_date_in_db).days + delta_days = (datetime.now(timezone.utc) - datetime.fromtimestamp(last_date_in_db/1000, tz=timezone.utc)).days if delta_days <= 0: logging.info(f"Market cap data for '{coin_symbol}' is already up-to-date.") return @@ -218,24 +247,30 @@ class MarketCapFetcher: return if last_date_in_db: - df = df[df['datetime_utc'] > last_date_in_db] + df = df[df['timestamp_ms'] > last_date_in_db] if not df.empty: - df.to_sql(table_name, conn, if_exists='append', index=False) - logging.info(f"Successfully saved {len(df)} new daily market cap records for {coin_symbol}.") + self._upsert_market_cap_data(conn, table_name, df) else: logging.info(f"Data was 
 
-    def _get_last_date_from_db(self, table_name: str, conn) -> pd.Timestamp:
-        """Gets the most recent date from a market cap table as a pandas Timestamp."""
+    def _get_last_date_from_db(self, table_name: str, conn, is_timestamp_ms: bool = False):
+        """Gets the most recent date or timestamp from a market cap table."""
         try:
             cursor = conn.cursor()
             cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
             if not cursor.fetchone():
                 return None
+
+            col_to_query = "timestamp_ms" if is_timestamp_ms else "datetime_utc"
+            last_val = pd.read_sql(f'SELECT MAX({col_to_query}) FROM "{table_name}"', conn).iloc[0, 0]
+
+            if pd.isna(last_val):
+                return None
+            if is_timestamp_ms:
+                return int(last_val)
+            return pd.to_datetime(last_val)
 
-            last_date_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
-            return pd.to_datetime(last_date_str) if last_date_str else None
         except Exception as e:
             logging.error(f"Could not read last date from table '{table_name}': {e}")
             return None
@@ -256,9 +291,10 @@
             if not market_caps:
                 return pd.DataFrame()
 
             df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap'])
+            # --- FIX: Convert to datetime object, but do not format as string ---
             df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
-            df.drop_duplicates(subset=['datetime_utc'], keep='last', inplace=True)
-            return df[['datetime_utc', 'market_cap']]
+            df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True)
+            return df[['datetime_utc', 'timestamp_ms', 'market_cap']]
 
         except requests.exceptions.RequestException as e:
             logging.error(f"API request failed for {coin_id}: {e}.")
@@ -267,7 +303,6 @@
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Fetch historical market cap data from CoinGecko.")
-    # --- FIX: The --coins argument is no longer needed as the script is now fully automated ---
     parser.add_argument(
         "--log-level",
         default="normal",
@@ -276,7 +311,6 @@
     )
     args = parser.parse_args()
 
-    # The 'coins' argument is no longer passed to the constructor
     fetcher = MarketCapFetcher(log_level=args.log_level)
     fetcher.run()
 
diff --git a/resampler.py b/resampler.py
index ea489b9..ffbeb59 100644
--- a/resampler.py
+++ b/resampler.py
@@ -37,7 +37,7 @@ class Resampler:
 
     def _ensure_tables_exist(self):
         """
-        Ensures all resampled tables exist with a PRIMARY KEY on datetime_utc.
+        Ensures all resampled tables exist with a PRIMARY KEY on timestamp_ms.
         Attempts to migrate existing tables if the schema is incorrect.
""" with sqlite3.connect(self.db_path) as conn: @@ -48,13 +48,22 @@ class Resampler: cursor.execute(f"PRAGMA table_info('{table_name}')") columns = cursor.fetchall() if columns: - pk_found = any(col[1] == 'datetime_utc' and col[5] == 1 for col in columns) + # --- FIX: Check for the correct PRIMARY KEY on timestamp_ms --- + pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns) if not pk_found: logging.warning(f"Schema migration needed for table '{table_name}'.") try: conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"') self._create_resampled_table(conn, table_name) - conn.execute(f'INSERT OR IGNORE INTO "{table_name}" SELECT * FROM "{table_name}_old"') + # Copy data, ensuring to create the timestamp_ms + logging.info(f" -> Migrating data for '{table_name}'...") + old_df = pd.read_sql(f'SELECT * FROM "{table_name}_old"', conn, parse_dates=['datetime_utc']) + if not old_df.empty: + old_df['timestamp_ms'] = (old_df['datetime_utc'].astype('int64') // 10**6) + # Keep only unique timestamps, preserving the last entry + old_df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True) + old_df.to_sql(table_name, conn, if_exists='append', index=False) + logging.info(f" -> Data migration complete.") conn.execute(f'DROP TABLE "{table_name}_old"') conn.commit() logging.info(f"Successfully migrated schema for '{table_name}'.") @@ -67,9 +76,11 @@ class Resampler: def _create_resampled_table(self, conn, table_name): """Creates a new resampled table with the correct schema.""" + # --- FIX: Set PRIMARY KEY on timestamp_ms for performance and uniqueness --- conn.execute(f''' CREATE TABLE "{table_name}" ( - datetime_utc TEXT PRIMARY KEY, + datetime_utc TEXT, + timestamp_ms INTEGER PRIMARY KEY, open REAL, high REAL, low REAL, @@ -123,22 +134,21 @@ class Resampler: source_table_name = f"{coin}_1m" logging.debug(f" Updating {tf_name} table...") - last_timestamp = self._get_last_timestamp(conn, target_table_name) + last_timestamp_ms = self._get_last_timestamp(conn, target_table_name) query = f'SELECT * FROM "{source_table_name}"' params = () - if last_timestamp: - query += ' WHERE datetime_utc >= ?' + if last_timestamp_ms: + query += ' WHERE timestamp_ms >= ?' + # Go back one interval to rebuild the last (potentially partial) candle try: - # --- FIX: Try the fast method first --- - interval_delta = pd.to_timedelta(tf_code) - query_start_date = last_timestamp - interval_delta + interval_delta_ms = pd.to_timedelta(tf_code).total_seconds() * 1000 except ValueError: - # --- FIX: Fall back to the safe method for special timeframes --- - logging.debug(f"Cannot create timedelta for '{tf_code}'. 
Using safe 32-day lookback.") - query_start_date = last_timestamp - timedelta(days=32) + # Fall back to a safe 32-day lookback for special timeframes + interval_delta_ms = timedelta(days=32).total_seconds() * 1000 - params = (query_start_date.strftime('%Y-%m-%d %H:%M:%S'),) + query_start_ms = last_timestamp_ms - interval_delta_ms + params = (query_start_ms,) df_1m = pd.read_sql(query, conn, params=params, parse_dates=['datetime_utc']) @@ -155,14 +165,15 @@ class Resampler: for index, row in resampled_df.iterrows(): records_to_upsert.append(( index.strftime('%Y-%m-%d %H:%M:%S'), + int(index.timestamp() * 1000), # Generate timestamp_ms row['open'], row['high'], row['low'], row['close'], row['volume'], row['number_of_trades'] )) cursor = conn.cursor() cursor.executemany(f''' - INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, open, high, low, close, volume, number_of_trades) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', records_to_upsert) conn.commit() @@ -203,10 +214,11 @@ class Resampler: logging.info(f" - {tf_name:<10}: {total:,} candles") def _get_last_timestamp(self, conn, table_name): - """Gets the timestamp of the last entry in a table as a pandas Timestamp.""" + """Gets the millisecond timestamp of the last entry in a table.""" try: - timestamp_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0] - return pd.to_datetime(timestamp_str) if timestamp_str else None + # --- FIX: Query for the integer timestamp_ms, not the text datetime_utc --- + timestamp_ms = pd.read_sql(f'SELECT MAX(timestamp_ms) FROM "{table_name}"', conn).iloc[0, 0] + return int(timestamp_ms) if pd.notna(timestamp_ms) else None except (pd.io.sql.DatabaseError, IndexError): return None