119 lines
5.0 KiB
Python
119 lines
5.0 KiB
Python
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import sqlite3
|
|
import pandas as pd
|
|
# Script to fix missing millisecond timestamps in the database after an import from CSVs (the underlying issue is already fixed in import_csv.py).
|
|
# Assuming logging_utils.py is in the same directory
|
|
from logging_utils import setup_logging
|
|
|
|
class DatabaseFixer:
    """
    Scans the SQLite database for rows with missing millisecond timestamps
    and updates them based on the datetime_utc column.

    Rows of the ``<coin>_1m`` table whose ``timestamp_ms`` is NULL are read in
    rowid order, ``CHUNK_SIZE`` rows at a time. Each row's ``datetime_utc``
    value is parsed as a UTC datetime, converted to milliseconds since the
    Unix epoch, and written back. One transaction is committed per chunk.
    """

    # Number of rows fetched, converted and updated per transaction.
    CHUNK_SIZE = 50000

    def __init__(self, log_level: str, coin: str):
        """
        Configure logging and derive the table name and database path.

        Args:
            log_level: Verbosity value passed through to ``setup_logging``.
            coin: Coin symbol; the table processed is ``f"{coin}_1m"``.
        """
        setup_logging(log_level, 'TimestampFixer')
        self.coin = coin
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")

    def _quoted_table(self) -> str:
        """Return the table name as a double-quoted SQL identifier.

        Embedded double quotes are doubled so that a coin symbol supplied on
        the command line cannot break out of the identifier.
        """
        escaped = self.table_name.replace('"', '""')
        return f'"{escaped}"'

    def run(self):
        """Orchestrates the entire database update and verification process.

        Exits the process with status 1 if the database file is missing, the
        table cannot be queried, or any unexpected error occurs.
        """
        logging.info(f"Starting timestamp fix process for table '{self.table_name}'...")

        # sqlite3.connect() would silently create an empty database file, so
        # check for the file explicitly first.
        if not os.path.exists(self.db_path):
            logging.error(f"Database file not found at '{self.db_path}'. Exiting.")
            sys.exit(1)

        try:
            conn = sqlite3.connect(self.db_path)
            try:
                conn.execute("PRAGMA journal_mode=WAL;")

                # 1. Check how many rows need fixing
                rows_to_fix_count = self._count_rows_to_fix(conn)
                if rows_to_fix_count == 0:
                    logging.info(f"No rows with missing timestamps found in '{self.table_name}'. No action needed.")
                    return

                logging.info(f"Found {rows_to_fix_count:,} rows with missing timestamps to update.")

                # 2. Process the table in chunks to conserve memory
                updated_count = self._process_in_chunks(conn)

                # 3. Provide a final summary
                self._summarize_update(rows_to_fix_count, updated_count)
            finally:
                # The sqlite3 connection context manager only commits or rolls
                # back; the connection has to be closed explicitly.
                conn.close()
        except Exception as e:
            # Top-level boundary: record the traceback and signal failure to
            # the caller's shell, like the other error paths in this class.
            logging.exception(f"A critical error occurred: {e}")
            sys.exit(1)

    def _count_rows_to_fix(self, conn) -> int:
        """Counts the number of rows where timestamp_ms is NULL.

        Args:
            conn: Open ``sqlite3`` connection to the database.

        Returns:
            The number of rows whose ``timestamp_ms`` is NULL, as a Python int.

        Exits the process with status 1 if the query fails (for example the
        table or the ``timestamp_ms`` column does not exist).
        """
        count_sql = f'SELECT COUNT(*) FROM {self._quoted_table()} WHERE timestamp_ms IS NULL'
        try:
            row = conn.execute(count_sql).fetchone()
        except sqlite3.Error as e:
            logging.error(
                f"Table '{self.table_name}' could not be queried ({e}). Cannot fix timestamps."
            )
            sys.exit(1)
        return int(row[0])

    @staticmethod
    def _build_update_params(rows) -> list:
        """Convert ``(rowid, datetime_utc)`` rows into ``(timestamp_ms, rowid)`` tuples.

        Rows whose ``datetime_utc`` is NULL or parses to NaT are left out, so
        no placeholder value is ever written for them.
        """
        rowids = [rowid for rowid, _ in rows]
        raw_values = pd.Series([value for _, value in rows], index=rowids, dtype="object")

        # utc=True treats naive values as UTC and normalizes offset-aware ones.
        parsed = pd.to_datetime(raw_values, utc=True).dropna()
        if parsed.empty:
            return []

        # Dividing the offset from the epoch by one millisecond gives the right
        # answer whatever resolution pandas chose for the parsed datetimes.
        epoch = pd.Timestamp("1970-01-01", tz="UTC")
        millis = (parsed - epoch) // pd.Timedelta(milliseconds=1)

        # tolist() yields plain Python ints, which sqlite3 can bind directly.
        return list(zip(millis.tolist(), parsed.index.tolist()))

    def _process_in_chunks(self, conn) -> int:
        """Reads, calculates, and updates timestamps in manageable chunks.

        Each chunk is fetched completely before any UPDATE is issued, and the
        next chunk starts after the last rowid seen. This avoids modifying the
        table while a SELECT over it is still being stepped, and lets rows that
        had to be skipped stay NULL without being fetched again.

        Args:
            conn: Open ``sqlite3`` connection to the database.

        Returns:
            The number of rows actually modified, as reported by SQLite.
        """
        table = self._quoted_table()
        # The special 'rowid' column uniquely identifies each row for updating.
        first_page_sql = (
            f'SELECT rowid, datetime_utc FROM {table} '
            'WHERE timestamp_ms IS NULL ORDER BY rowid LIMIT ?'
        )
        next_page_sql = (
            f'SELECT rowid, datetime_utc FROM {table} '
            'WHERE timestamp_ms IS NULL AND rowid > ? ORDER BY rowid LIMIT ?'
        )
        update_sql = f'UPDATE {table} SET timestamp_ms = ? WHERE rowid = ?'

        total_updated = 0
        total_skipped = 0
        last_rowid = None

        while True:
            if last_rowid is None:
                rows = conn.execute(first_page_sql, (self.CHUNK_SIZE,)).fetchall()
            else:
                rows = conn.execute(next_page_sql, (last_rowid, self.CHUNK_SIZE)).fetchall()
            if not rows:
                break
            last_rowid = rows[-1][0]

            logging.info(f"Processing a chunk of {len(rows)} rows...")

            # Prepare data for the update command: a list of (timestamp, rowid) tuples
            update_data = self._build_update_params(rows)
            total_skipped += len(rows) - len(update_data)

            if update_data:
                # 'with conn' commits on success and rolls back on error.
                with conn:
                    cursor = conn.executemany(update_sql, update_data)
                total_updated += cursor.rowcount

            logging.info(f"Updated {total_updated} rows so far...")

        if total_skipped:
            logging.warning(
                f"Skipped {total_skipped} rows whose datetime_utc is NULL or empty; "
                "their timestamp_ms was left unchanged."
            )

        return total_updated

    def _summarize_update(self, expected_count: int, actual_count: int):
        """Prints a final summary of the update process.

        Args:
            expected_count: Rows that had a NULL ``timestamp_ms`` before the run.
            actual_count: Rows reported as updated by the chunked update.
        """
        logging.info("--- Timestamp Fix Summary ---")
        print(f"\n{'Status':<25}: COMPLETE")
        print("-" * 40)
        print(f"{'Table Processed':<25}: {self.table_name}")
        print(f"{'Rows Needing Update':<25}: {expected_count:,}")
        print(f"{'Rows Successfully Updated':<25}: {actual_count:,}")

        if expected_count == actual_count:
            logging.info("Verification successful: All necessary rows have been updated.")
        else:
            logging.warning("Verification warning: The number of updated rows does not match the expected count.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line interface: which coin's table to repair and how verbose to be.
    cli = argparse.ArgumentParser(
        description="Fix missing millisecond timestamps in the SQLite database.",
    )
    cli.add_argument(
        "--coin",
        default="BTC",
        help="The coin symbol for the table to fix (e.g., BTC).",
    )
    cli.add_argument(
        "--log-level",
        choices=['off', 'normal', 'debug'],
        default="normal",
        help="Set the logging level for the script.",
    )
    options = cli.parse_args()

    # Build the fixer from the parsed options and run the whole process.
    DatabaseFixer(log_level=options.log_level, coin=options.coin).run()