import argparse
import logging
import os
import sys
import sqlite3

import pandas as pd

# Script to fix missing millisecond timestamps in the database after import from
# CSVs (this is already fixed in import_csv.py).

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class DatabaseFixer:
    """
    Scans the SQLite database for rows with missing millisecond timestamps
    and updates them based on the datetime_utc column.
    """

    def __init__(self, log_level: str, coin: str):
        setup_logging(log_level, 'TimestampFixer')
        self.coin = coin
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")

    def run(self):
        """Orchestrates the entire database update and verification process."""
        logging.info(f"Starting timestamp fix process for table '{self.table_name}'...")

        if not os.path.exists(self.db_path):
            logging.error(f"Database file not found at '{self.db_path}'. Exiting.")
            sys.exit(1)

        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("PRAGMA journal_mode=WAL;")

                # 1. Check how many rows need fixing
                rows_to_fix_count = self._count_rows_to_fix(conn)
                if rows_to_fix_count == 0:
                    logging.info(f"No rows with missing timestamps found in '{self.table_name}'. No action needed.")
                    return

                logging.info(f"Found {rows_to_fix_count:,} rows with missing timestamps to update.")

                # 2. Process the table in chunks to conserve memory
                updated_count = self._process_in_chunks(conn)

                # 3. Provide a final summary
                self._summarize_update(rows_to_fix_count, updated_count)

        except Exception as e:
            logging.error(f"A critical error occurred: {e}")

    def _count_rows_to_fix(self, conn) -> int:
        """Counts the number of rows where timestamp_ms is NULL."""
        try:
            return pd.read_sql(
                f'SELECT COUNT(*) FROM "{self.table_name}" WHERE timestamp_ms IS NULL', conn
            ).iloc[0, 0]
        except pd.io.sql.DatabaseError:
            logging.error(f"Table '{self.table_name}' not found in the database. Cannot fix timestamps.")
            sys.exit(1)

    def _process_in_chunks(self, conn) -> int:
        """Reads, calculates, and updates timestamps in manageable chunks."""
        total_updated = 0
        chunk_size = 50000  # Process 50,000 rows at a time

        # We select the special 'rowid' column to uniquely identify each row for updating
        query = f'SELECT rowid, datetime_utc FROM "{self.table_name}" WHERE timestamp_ms IS NULL'

        for chunk_df in pd.read_sql_query(query, conn, chunksize=chunk_size):
            if chunk_df.empty:
                break

            logging.info(f"Processing a chunk of {len(chunk_df)} rows...")

            # Calculate the missing timestamps
            chunk_df['datetime_utc'] = pd.to_datetime(chunk_df['datetime_utc'])
            chunk_df['timestamp_ms'] = (chunk_df['datetime_utc'].astype('int64') // 10**6)
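            # Note on the conversion above: pandas stores datetimes as int64
            # nanoseconds since the Unix epoch, so integer division by 10**6
            # yields milliseconds. For example (assuming datetime_utc values are
            # UTC), pd.Timestamp("2024-01-01 00:00:00").value // 10**6 gives
            # 1704067200000.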

            # Prepare data for the update command: a list of (timestamp, rowid) tuples.
            # .tolist() converts the numpy scalars to plain Python ints, which
            # sqlite3 can bind without a custom adapter.
            update_data = list(zip(chunk_df['timestamp_ms'].tolist(), chunk_df['rowid'].tolist()))

            # Use executemany for a fast bulk update
            cursor = conn.cursor()
            cursor.executemany(
                f'UPDATE "{self.table_name}" SET timestamp_ms = ? WHERE rowid = ?',
                update_data
            )
            conn.commit()

            total_updated += len(chunk_df)
            logging.info(f"Updated {total_updated} rows so far...")

        return total_updated

    def _summarize_update(self, expected_count: int, actual_count: int):
        """Prints a final summary of the update process."""
        logging.info("--- Timestamp Fix Summary ---")
        print(f"\n{'Status':<25}: COMPLETE")
        print("-" * 40)
        print(f"{'Table Processed':<25}: {self.table_name}")
        print(f"{'Rows Needing Update':<25}: {expected_count:,}")
        print(f"{'Rows Successfully Updated':<25}: {actual_count:,}")

        if expected_count == actual_count:
            logging.info("Verification successful: All necessary rows have been updated.")
        else:
            logging.warning("Verification warning: The number of updated rows does not match the expected count.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fix missing millisecond timestamps in the SQLite database.")
    parser.add_argument("--coin", default="BTC", help="The coin symbol for the table to fix (e.g., BTC).")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    fixer = DatabaseFixer(log_level=args.log_level, coin=args.coin)
    fixer.run()
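# Example invocation (the script filename below is assumed; the database at
# _data/market_data.db is expected to contain a "<COIN>_1m" table with
# datetime_utc and timestamp_ms columns, as produced by import_csv.py):
#
#   python fix_timestamps.py --coin BTC --log-level debug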