timestamp_ms column added to all tables as primary key

2025-10-22 22:22:13 +02:00
parent afbb4e4976
commit 5805601218
2 changed files with 125 additions and 79 deletions

View File

@ -17,7 +17,7 @@ from logging_utils import setup_logging
class MarketCapFetcher:
"""
Fetches historical daily market cap data from the CoinGecko API and
intelligently updates the SQLite database for all coins found in the coin map.
intelligently upserts it into the SQLite database for all coins.
"""
def __init__(self, log_level: str):
@ -34,16 +34,50 @@ class MarketCapFetcher:
logging.error("Coin ID map is empty. Run 'update_coin_map.py' to generate it.")
sys.exit(1)
# --- FIX: The list of coins to fetch is now all coins from the map ---
self.coins_to_fetch = list(self.COIN_ID_MAP.keys())
self.STABLECOIN_ID_MAP = {
"USDT": "tether",
"USDC": "usd-coin",
"USDE": "ethena-usde",
"DAI": "dai",
"PYUSD": "paypal-usd"
"USDT": "tether", "USDC": "usd-coin", "USDE": "ethena-usde",
"DAI": "dai", "PYUSD": "paypal-usd"
}
# --- ADDED: Ensure all tables have the correct schema ---
self._ensure_tables_exist()
def _ensure_tables_exist(self):
"""Ensures all market cap tables exist with timestamp_ms as PRIMARY KEY."""
all_tables_to_check = [f"{coin}_market_cap" for coin in self.coins_to_fetch]
all_tables_to_check.extend(["STABLECOINS_market_cap", "TOTAL_market_cap_daily"])
with sqlite3.connect(self.db_path) as conn:
for table_name in all_tables_to_check:
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info('{table_name}')")
columns = cursor.fetchall()
if columns:
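# PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk): col[1] is the column name, col[5] the primary-key flag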
pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
if not pk_found:
logging.warning(f"Schema for table '{table_name}' is incorrect. Dropping and recreating table.")
try:
conn.execute(f'DROP TABLE "{table_name}"')
self._create_market_cap_table(conn, table_name)
logging.info(f"Successfully recreated schema for '{table_name}'.")
except Exception as e:
logging.error(f"FATAL: Failed to recreate table '{table_name}': {e}. Please delete 'market_data.db' and restart.")
sys.exit(1)
else:
self._create_market_cap_table(conn, table_name)
logging.info("All market cap table schemas verified.")
def _create_market_cap_table(self, conn, table_name):
"""Creates a new market cap table with the correct schema."""
conn.execute(f'''
CREATE TABLE IF NOT EXISTS "{table_name}" (
datetime_utc TEXT,
timestamp_ms INTEGER PRIMARY KEY,
market_cap REAL
)
''')
def _load_coin_id_map(self) -> dict:
"""Loads the dynamically generated coin-to-id mapping."""
@ -55,6 +89,27 @@ class MarketCapFetcher:
logging.error(f"Could not load '{map_file_path}'. Please run 'update_coin_map.py' first. Error: {e}")
return {}
def _upsert_market_cap_data(self, conn, table_name: str, df: pd.DataFrame):
"""Upserts a DataFrame of market cap data into the specified table."""
if df.empty:
return
records_to_upsert = []
for index, row in df.iterrows():
records_to_upsert.append((
row['datetime_utc'].strftime('%Y-%m-%d %H:%M:%S'),
int(row['timestamp_ms']),  # cast to a plain int; sqlite3 cannot bind numpy integer scalars
row['market_cap']
))
cursor = conn.cursor()
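# INSERT OR REPLACE keys on the timestamp_ms PRIMARY KEY, so re-fetched days overwrite their existing rows instead of duplicating them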
cursor.executemany(f'''
INSERT OR REPLACE INTO "{table_name}" (datetime_utc, timestamp_ms, market_cap)
VALUES (?, ?, ?)
''', records_to_upsert)
conn.commit()
logging.info(f"Successfully upserted {len(records_to_upsert)} records into '{table_name}'.")
def run(self):
"""
Main execution function to process all configured coins and update the database.
@ -63,7 +118,6 @@ class MarketCapFetcher:
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL;")
# 1. Process individual coins from the map
for coin_symbol in self.coins_to_fetch:
coin_id = self.COIN_ID_MAP.get(coin_symbol.upper())
if not coin_id:
@ -76,30 +130,21 @@ class MarketCapFetcher:
logging.error(f"An unexpected error occurred while processing {coin_symbol}: {e}")
time.sleep(2)
# 2. Process and aggregate stablecoins
self._update_stablecoin_aggregate(conn)
# 3. Process total market cap metrics
self._update_total_market_cap(conn)
# 4. Save a summary of the latest data
self._save_summary(conn)
logging.info("--- Market cap fetch process complete ---")
def _save_summary(self, conn):
"""
Queries the last record from each market cap table and saves a summary to a JSON file.
"""
# ... (This function is unchanged)
logging.info("--- Generating Market Cap Summary ---")
summary_data = {}
summary_file_path = os.path.join("_data", "market_cap_data.json")
try:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%');")
tables = [row[0] for row in cursor.fetchall()]
for table_name in tables:
try:
df_last = pd.read_sql(f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT 1', conn)
@ -107,38 +152,24 @@ class MarketCapFetcher:
summary_data[table_name] = df_last.to_dict('records')[0]
except Exception as e:
logging.error(f"Could not read last record from table '{table_name}': {e}")
if summary_data:
summary_data['summary_last_updated_utc'] = datetime.now(timezone.utc).isoformat()
with open(summary_file_path, 'w', encoding='utf-8') as f:
json.dump(summary_data, f, indent=4)
logging.info(f"Successfully saved market cap summary to '{summary_file_path}'")
else:
logging.warning("No data found to create a summary.")
except Exception as e:
logging.error(f"Failed to generate summary: {e}")
def _update_total_market_cap(self, conn):
"""
Fetches the current total market cap and saves it for the current date.
"""
"""Fetches the current total market cap and upserts it for the current date."""
logging.info("--- Processing Total Market Cap ---")
table_name = "TOTAL_market_cap_daily"
try:
today_date = datetime.now(timezone.utc).date()
cursor = conn.cursor()
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
table_exists = cursor.fetchone()
if table_exists:
cursor.execute(f"SELECT 1 FROM \"{table_name}\" WHERE date(datetime_utc) = ? LIMIT 1", (today_date.isoformat(),))
if cursor.fetchone():
logging.info(f"Total market cap for {today_date} already exists. Skipping.")
return
today_dt = pd.to_datetime(today_date)
today_ts = int(today_dt.timestamp() * 1000)
logging.info("Fetching current global market data...")
url = f"{self.api_base_url}/global"
@ -150,10 +181,11 @@ class MarketCapFetcher:
if total_mc:
df_total = pd.DataFrame([{
'datetime_utc': pd.to_datetime(today_date),
'datetime_utc': today_dt,
'timestamp_ms': today_ts,
'market_cap': total_mc
}])
df_total.to_sql(table_name, conn, if_exists='append', index=False)
self._upsert_market_cap_data(conn, table_name, df_total)
logging.info(f"Saved total market cap for {today_date}: ${total_mc:,.2f}")
except requests.exceptions.RequestException as e:
@ -161,7 +193,6 @@ class MarketCapFetcher:
except Exception as e:
logging.error(f"An error occurred while updating total market cap: {e}")
def _update_stablecoin_aggregate(self, conn):
"""Fetches data for all stablecoins and saves the aggregated market cap."""
logging.info("--- Processing aggregated stablecoin market cap ---")
@ -171,7 +202,6 @@ class MarketCapFetcher:
logging.info(f"Fetching historical data for stablecoin: {symbol}...")
df = self._fetch_historical_data(coin_id, days=365)
if not df.empty:
df['coin'] = symbol
all_stablecoin_df = pd.concat([all_stablecoin_df, df])
time.sleep(2)
@ -179,31 +209,30 @@ class MarketCapFetcher:
logging.warning("No data fetched for any stablecoins. Cannot create aggregate.")
return
aggregated_df = all_stablecoin_df.groupby(all_stablecoin_df['datetime_utc'].dt.date)['market_cap'].sum().reset_index()
aggregated_df['datetime_utc'] = pd.to_datetime(aggregated_df['datetime_utc'])
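# Group on the shared daily timestamp_ms and sum the per-coin caps; this assumes the API returns aligned daily timestamps for each stablecoin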
aggregated_df = all_stablecoin_df.groupby('timestamp_ms').agg(
datetime_utc=('datetime_utc', 'first'),
market_cap=('market_cap', 'sum')
).reset_index()
table_name = "STABLECOINS_market_cap"
last_date_in_db = self._get_last_date_from_db(table_name, conn)
last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True)
if last_date_in_db:
aggregated_df = aggregated_df[aggregated_df['datetime_utc'] > last_date_in_db]
aggregated_df = aggregated_df[aggregated_df['timestamp_ms'] > last_date_in_db]
if not aggregated_df.empty:
aggregated_df.to_sql(table_name, conn, if_exists='append', index=False)
logging.info(f"Successfully saved {len(aggregated_df)} daily records to '{table_name}'.")
self._upsert_market_cap_data(conn, table_name, aggregated_df)
else:
logging.info("Aggregated stablecoin data is already up-to-date.")
def _update_market_cap_for_coin(self, coin_id: str, coin_symbol: str, conn):
"""Fetches and appends new market cap data for a single coin."""
table_name = f"{coin_symbol}_market_cap"
last_date_in_db = self._get_last_date_from_db(table_name, conn)
last_date_in_db = self._get_last_date_from_db(table_name, conn, is_timestamp_ms=True)
days_to_fetch = 365
if last_date_in_db:
delta_days = (datetime.now() - last_date_in_db).days
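# last_date_in_db is now an epoch-millisecond integer, so convert it back to an aware datetime before taking the day difference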
delta_days = (datetime.now(timezone.utc) - datetime.fromtimestamp(last_date_in_db/1000, tz=timezone.utc)).days
if delta_days <= 0:
logging.info(f"Market cap data for '{coin_symbol}' is already up-to-date.")
return
@ -218,24 +247,30 @@ class MarketCapFetcher:
return
if last_date_in_db:
df = df[df['datetime_utc'] > last_date_in_db]
df = df[df['timestamp_ms'] > last_date_in_db]
if not df.empty:
df.to_sql(table_name, conn, if_exists='append', index=False)
logging.info(f"Successfully saved {len(df)} new daily market cap records for {coin_symbol}.")
self._upsert_market_cap_data(conn, table_name, df)
else:
logging.info(f"Data was fetched, but no new records needed saving for '{coin_symbol}'.")
def _get_last_date_from_db(self, table_name: str, conn) -> pd.Timestamp:
"""Gets the most recent date from a market cap table as a pandas Timestamp."""
def _get_last_date_from_db(self, table_name: str, conn, is_timestamp_ms: bool = False):
"""Gets the most recent date or timestamp from a market cap table."""
try:
cursor = conn.cursor()
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
if not cursor.fetchone():
return None
col_to_query = "timestamp_ms" if is_timestamp_ms else "datetime_utc"
last_val = pd.read_sql(f'SELECT MAX({col_to_query}) FROM "{table_name}"', conn).iloc[0, 0]
if pd.isna(last_val):
return None
if is_timestamp_ms:
return int(last_val)
return pd.to_datetime(last_val)
last_date_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
return pd.to_datetime(last_date_str) if last_date_str else None
except Exception as e:
logging.error(f"Could not read last date from table '{table_name}': {e}")
return None
@ -256,9 +291,10 @@ class MarketCapFetcher:
if not market_caps: return pd.DataFrame()
df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap'])
# --- FIX: Convert to datetime object, but do not format as string ---
df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
df.drop_duplicates(subset=['datetime_utc'], keep='last', inplace=True)
return df[['datetime_utc', 'market_cap']]
df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True)
return df[['datetime_utc', 'timestamp_ms', 'market_cap']]
except requests.exceptions.RequestException as e:
logging.error(f"API request failed for {coin_id}: {e}.")
@ -267,7 +303,6 @@ class MarketCapFetcher:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fetch historical market cap data from CoinGecko.")
# --- FIX: The --coins argument is no longer needed as the script is now fully automated ---
parser.add_argument(
"--log-level",
default="normal",
@ -276,7 +311,6 @@ if __name__ == "__main__":
)
args = parser.parse_args()
# The 'coins' argument is no longer passed to the constructor
fetcher = MarketCapFetcher(log_level=args.log_level)
fetcher.run()

View File

@ -37,7 +37,7 @@ class Resampler:
def _ensure_tables_exist(self):
"""
Ensures all resampled tables exist with a PRIMARY KEY on datetime_utc.
Ensures all resampled tables exist with a PRIMARY KEY on timestamp_ms.
Attempts to migrate existing tables if the schema is incorrect.
"""
with sqlite3.connect(self.db_path) as conn:
@ -48,13 +48,22 @@ class Resampler:
cursor.execute(f"PRAGMA table_info('{table_name}')")
columns = cursor.fetchall()
if columns:
pk_found = any(col[1] == 'datetime_utc' and col[5] == 1 for col in columns)
# --- FIX: Check for the correct PRIMARY KEY on timestamp_ms ---
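# col[1] is the column name and col[5] the primary-key flag in the PRAGMA table_info output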
pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
if not pk_found:
logging.warning(f"Schema migration needed for table '{table_name}'.")
try:
conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
self._create_resampled_table(conn, table_name)
conn.execute(f'INSERT OR IGNORE INTO "{table_name}" SELECT * FROM "{table_name}_old"')
# Copy the old rows across, generating the timestamp_ms column from datetime_utc
logging.info(f" -> Migrating data for '{table_name}'...")
old_df = pd.read_sql(f'SELECT * FROM "{table_name}_old"', conn, parse_dates=['datetime_utc'])
if not old_df.empty:
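# .astype('int64') on a datetime64[ns] column gives nanoseconds since the epoch; integer-dividing by 10**6 converts to milliseconds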
old_df['timestamp_ms'] = (old_df['datetime_utc'].astype('int64') // 10**6)
# Keep only unique timestamps, preserving the last entry
old_df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True)
old_df.to_sql(table_name, conn, if_exists='append', index=False)
logging.info(f" -> Data migration complete.")
conn.execute(f'DROP TABLE "{table_name}_old"')
conn.commit()
logging.info(f"Successfully migrated schema for '{table_name}'.")
@ -67,9 +76,11 @@ class Resampler:
def _create_resampled_table(self, conn, table_name):
"""Creates a new resampled table with the correct schema."""
# --- FIX: Set PRIMARY KEY on timestamp_ms for performance and uniqueness ---
conn.execute(f'''
CREATE TABLE "{table_name}" (
datetime_utc TEXT PRIMARY KEY,
datetime_utc TEXT,
timestamp_ms INTEGER PRIMARY KEY,
open REAL,
high REAL,
low REAL,
@ -123,22 +134,21 @@ class Resampler:
source_table_name = f"{coin}_1m"
logging.debug(f" Updating {tf_name} table...")
last_timestamp = self._get_last_timestamp(conn, target_table_name)
last_timestamp_ms = self._get_last_timestamp(conn, target_table_name)
query = f'SELECT * FROM "{source_table_name}"'
params = ()
if last_timestamp:
query += ' WHERE datetime_utc >= ?'
if last_timestamp_ms:
query += ' WHERE timestamp_ms >= ?'
# Go back one interval to rebuild the last (potentially partial) candle
try:
# --- FIX: Try the fast method first ---
interval_delta = pd.to_timedelta(tf_code)
query_start_date = last_timestamp - interval_delta
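# Express the resample interval in milliseconds so it can be subtracted from the integer timestamp_ms cursor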
interval_delta_ms = pd.to_timedelta(tf_code).total_seconds() * 1000
except ValueError:
# --- FIX: Fall back to the safe method for special timeframes ---
logging.debug(f"Cannot create timedelta for '{tf_code}'. Using safe 32-day lookback.")
query_start_date = last_timestamp - timedelta(days=32)
# Fall back to a safe 32-day lookback for special timeframes
interval_delta_ms = timedelta(days=32).total_seconds() * 1000
params = (query_start_date.strftime('%Y-%m-%d %H:%M:%S'),)
query_start_ms = last_timestamp_ms - interval_delta_ms
params = (query_start_ms,)
df_1m = pd.read_sql(query, conn, params=params, parse_dates=['datetime_utc'])
@ -155,14 +165,15 @@ class Resampler:
for index, row in resampled_df.iterrows():
records_to_upsert.append((
index.strftime('%Y-%m-%d %H:%M:%S'),
int(index.timestamp() * 1000), # Generate timestamp_ms
row['open'], row['high'], row['low'], row['close'],
row['volume'], row['number_of_trades']
))
cursor = conn.cursor()
cursor.executemany(f'''
INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, open, high, low, close, volume, number_of_trades)
VALUES (?, ?, ?, ?, ?, ?, ?)
INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', records_to_upsert)
conn.commit()
@ -203,10 +214,11 @@ class Resampler:
logging.info(f" - {tf_name:<10}: {total:,} candles")
def _get_last_timestamp(self, conn, table_name):
"""Gets the timestamp of the last entry in a table as a pandas Timestamp."""
"""Gets the millisecond timestamp of the last entry in a table."""
try:
timestamp_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
return pd.to_datetime(timestamp_str) if timestamp_str else None
# --- FIX: Query for the integer timestamp_ms, not the text datetime_utc ---
timestamp_ms = pd.read_sql(f'SELECT MAX(timestamp_ms) FROM "{table_name}"', conn).iloc[0, 0]
return int(timestamp_ms) if pd.notna(timestamp_ms) else None
except (pd.io.sql.DatabaseError, IndexError):
return None