Files
hyper/fix_timestamps.py
2025-10-14 19:15:35 +02:00

119 lines
5.0 KiB
Python

import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
# script to fix missing millisecond timestamps in the database after import from CSVs (this is already fixed in import_csv.py)
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
class DatabaseFixer:
"""
Scans the SQLite database for rows with missing millisecond timestamps
and updates them based on the datetime_utc column.
"""
def __init__(self, log_level: str, coin: str):
setup_logging(log_level, 'TimestampFixer')
self.coin = coin
self.table_name = f"{self.coin}_1m"
self.db_path = os.path.join("_data", "market_data.db")
def run(self):
"""Orchestrates the entire database update and verification process."""
logging.info(f"Starting timestamp fix process for table '{self.table_name}'...")
if not os.path.exists(self.db_path):
logging.error(f"Database file not found at '{self.db_path}'. Exiting.")
sys.exit(1)
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL;")
# 1. Check how many rows need fixing
rows_to_fix_count = self._count_rows_to_fix(conn)
if rows_to_fix_count == 0:
logging.info(f"No rows with missing timestamps found in '{self.table_name}'. No action needed.")
return
logging.info(f"Found {rows_to_fix_count:,} rows with missing timestamps to update.")
# 2. Process the table in chunks to conserve memory
updated_count = self._process_in_chunks(conn)
# 3. Provide a final summary
self._summarize_update(rows_to_fix_count, updated_count)
except Exception as e:
logging.error(f"A critical error occurred: {e}")
def _count_rows_to_fix(self, conn) -> int:
"""Counts the number of rows where timestamp_ms is NULL."""
try:
return pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}" WHERE timestamp_ms IS NULL', conn).iloc[0, 0]
except pd.io.sql.DatabaseError:
logging.error(f"Table '{self.table_name}' not found in the database. Cannot fix timestamps.")
sys.exit(1)
def _process_in_chunks(self, conn) -> int:
"""Reads, calculates, and updates timestamps in manageable chunks."""
total_updated = 0
chunk_size = 50000 # Process 50,000 rows at a time
# We select the special 'rowid' column to uniquely identify each row for updating
query = f'SELECT rowid, datetime_utc FROM "{self.table_name}" WHERE timestamp_ms IS NULL'
for chunk_df in pd.read_sql_query(query, conn, chunksize=chunk_size):
if chunk_df.empty:
break
logging.info(f"Processing a chunk of {len(chunk_df)} rows...")
# Calculate the missing timestamps
chunk_df['datetime_utc'] = pd.to_datetime(chunk_df['datetime_utc'])
chunk_df['timestamp_ms'] = (chunk_df['datetime_utc'].astype('int64') // 10**6)
# Prepare data for the update command: a list of (timestamp, rowid) tuples
update_data = list(zip(chunk_df['timestamp_ms'], chunk_df['rowid']))
# Use executemany for a fast bulk update
cursor = conn.cursor()
cursor.executemany(f'UPDATE "{self.table_name}" SET timestamp_ms = ? WHERE rowid = ?', update_data)
conn.commit()
total_updated += len(chunk_df)
logging.info(f"Updated {total_updated} rows so far...")
return total_updated
def _summarize_update(self, expected_count: int, actual_count: int):
"""Prints a final summary of the update process."""
logging.info("--- Timestamp Fix Summary ---")
print(f"\n{'Status':<25}: COMPLETE")
print("-" * 40)
print(f"{'Table Processed':<25}: {self.table_name}")
print(f"{'Rows Needing Update':<25}: {expected_count:,}")
print(f"{'Rows Successfully Updated':<25}: {actual_count:,}")
if expected_count == actual_count:
logging.info("Verification successful: All necessary rows have been updated.")
else:
logging.warning("Verification warning: The number of updated rows does not match the expected count.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fix missing millisecond timestamps in the SQLite database.")
parser.add_argument("--coin", default="BTC", help="The coin symbol for the table to fix (e.g., BTC).")
parser.add_argument(
"--log-level",
default="normal",
choices=['off', 'normal', 'debug'],
help="Set the logging level for the script."
)
args = parser.parse_args()
fixer = DatabaseFixer(log_level=args.log_level, coin=args.coin)
fixer.run()