import argparse
import logging
import os
import sys
import sqlite3
from datetime import datetime
from typing import Optional, Tuple

import pandas as pd

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class CsvImporter:
    """
    Imports historical candle data from a large CSV file into the SQLite
    database, intelligently adding only the missing data.
    """

    def __init__(self, log_level: str, csv_path: str, coin: str):
        setup_logging(log_level, 'CsvImporter')

        if not os.path.exists(csv_path):
            logging.error(f"CSV file not found at '{csv_path}'. Please check the path.")
            sys.exit(1)

        self.csv_path = csv_path
        self.coin = coin
        # --- FIX: Corrected the f-string syntax for the table name ---
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")
        self.column_mapping = {
            'Open time': 'datetime_utc',
            'Open': 'open',
            'High': 'high',
            'Low': 'low',
            'Close': 'close',
            'Volume': 'volume',
            'Number of trades': 'number_of_trades'
        }

    def run(self):
        """Orchestrates the entire import and verification process."""
        logging.info(f"Starting import process for '{self.coin}' from '{self.csv_path}'...")

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")

            # 1. Get the current state of the database
            db_oldest, db_newest, initial_row_count = self._get_db_state(conn)

            # 2. Read, clean, and filter the CSV data
            new_data_df = self._process_and_filter_csv(db_oldest, db_newest)

            if new_data_df.empty:
                logging.info("No new data to import. Database is already up-to-date with the CSV file.")
                return

            # 3. Append the new data to the database
            self._append_to_db(new_data_df, conn)

            # 4. Summarize and verify the import
            self._summarize_import(initial_row_count, len(new_data_df), conn)

    def _get_db_state(self, conn) -> Tuple[Optional[datetime], Optional[datetime], int]:
        """Gets the oldest and newest timestamps and total row count from the DB table."""
        try:
            oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]

            oldest_dt = pd.to_datetime(oldest) if oldest else None
            newest_dt = pd.to_datetime(newest) if newest else None

            if oldest_dt:
                logging.info(f"Database contains data from {oldest_dt} to {newest_dt}.")
            else:
                logging.info("Database table is empty. A full import will be performed.")

            return oldest_dt, newest_dt, count
        except pd.io.sql.DatabaseError:
            logging.info(f"Table '{self.table_name}' not found. It will be created.")
            return None, None, 0

    def _process_and_filter_csv(self, db_oldest: datetime, db_newest: datetime) -> pd.DataFrame:
        """Reads the CSV and returns a DataFrame of only the missing data."""
        logging.info("Reading and processing CSV file. This may take a moment for large files...")
        df = pd.read_csv(self.csv_path, usecols=list(self.column_mapping.keys()))

        # Clean and format the data
        df.rename(columns=self.column_mapping, inplace=True)
        df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])

        # --- FIX: Calculate the millisecond timestamp from the datetime column ---
        # This converts the datetime to nanoseconds and then to milliseconds.
        df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)

        # Filter the data to find only rows that are outside the range of what's already in the DB
        if db_oldest and db_newest:
            # Get data from before the oldest record and after the newest record
            df_filtered = df[(df['datetime_utc'] < db_oldest) | (df['datetime_utc'] > db_newest)]
        else:
            # If the DB is empty, all data is new
            df_filtered = df

        logging.info(f"Found {len(df_filtered):,} new rows to import.")
        return df_filtered

    def _append_to_db(self, df: pd.DataFrame, conn):
        """Appends the DataFrame to the SQLite table."""
        logging.info(f"Appending {len(df):,} new rows to the database...")
        df.to_sql(self.table_name, conn, if_exists='append', index=False)
        logging.info("Append operation complete.")

    def _summarize_import(self, initial_count: int, added_count: int, conn):
        """Prints a final summary and verification of the import."""
        logging.info("--- Import Summary & Verification ---")
        try:
            final_count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]

            print(f"\n{'Status':<20}: SUCCESS")
            print("-" * 40)
            print(f"{'Initial Row Count':<20}: {initial_count:,}")
            print(f"{'Rows Added':<20}: {added_count:,}")
            print(f"{'Final Row Count':<20}: {final_count:,}")
            print("-" * 40)
            print(f"{'New Oldest Record':<20}: {new_oldest}")
            print(f"{'New Newest Record':<20}: {new_newest}")

            # Verification check
            if final_count == initial_count + added_count:
                logging.info("Verification successful: Final row count matches expected count.")
            else:
                logging.warning("Verification warning: Final row count does not match expected count.")
        except Exception as e:
            logging.error(f"Could not generate summary. Error: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Import historical CSV data into the SQLite database.")
    parser.add_argument("--file", required=True, help="Path to the large CSV file to import.")
    parser.add_argument("--coin", default="BTC", help="The coin symbol for this data (e.g., BTC).")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin)
    importer.run()
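
# Example invocation, shown here as a sketch. The script filename and CSV filename below
# are hypothetical (not taken from this repository); the _data/ directory must already
# exist, since sqlite3.connect() will not create intermediate directories.
#
#   python csv_importer.py --file BTCUSDT_1m_history.csv --coin BTC --log-level normal
#
# With --coin BTC, the importer creates or appends to the "BTC_1m" table inside
# _data/market_data.db, adding only rows outside the date range already stored.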