import argparse import logging import os import sys import sqlite3 import pandas as pd # Assuming logging_utils.py is in the same directory from logging_utils import setup_logging class Migrator: """ Reads 1-minute candle data from CSV files and migrates it into an SQLite database for improved performance and easier access. """ def __init__(self, log_level: str): setup_logging(log_level, 'Migrator') self.source_folder = os.path.join("_data", "candles") self.db_path = os.path.join("_data", "market_data.db") def run(self): """ Main execution function to find all CSV files and migrate them to the database. """ if not os.path.exists(self.source_folder): logging.error(f"Source data folder '{self.source_folder}' not found. " "Please ensure data has been fetched first.") sys.exit(1) csv_files = [f for f in os.listdir(self.source_folder) if f.endswith('_1m.csv')] if not csv_files: logging.warning("No 1-minute CSV files found in the source folder to migrate.") return logging.info(f"Found {len(csv_files)} source CSV files to migrate to SQLite.") # Connect to the SQLite database (it will be created if it doesn't exist) with sqlite3.connect(self.db_path) as conn: for file_name in csv_files: coin = file_name.split('_')[0] table_name = f"{coin}_1m" file_path = os.path.join(self.source_folder, file_name) logging.info(f"Migrating '{file_name}' to table '{table_name}'...") try: # 1. Load the entire CSV file into a pandas DataFrame. df = pd.read_csv(file_path) if df.empty: logging.warning(f"CSV file '{file_name}' is empty. Skipping.") continue # 2. Convert the timestamp column to a proper datetime object. df['datetime_utc'] = pd.to_datetime(df['datetime_utc']) # 3. Write the DataFrame to the SQLite database. # 'replace' will drop the table first if it exists and create a new one. # This is ideal for a migration script to ensure a clean import. df.to_sql( table_name, conn, if_exists='replace', index=False # Do not write the pandas DataFrame index as a column ) # 4. (Optional but Recommended) Create an index on the timestamp for fast queries. logging.debug(f"Creating index on 'datetime_utc' for table '{table_name}'...") conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_time ON {table_name}(datetime_utc);") logging.info(f"Successfully migrated {len(df)} rows to '{table_name}'.") except Exception as e: logging.error(f"Failed to process and migrate file '{file_name}': {e}") logging.info("--- Database migration complete ---") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Migrate 1-minute candle data from CSV files to an SQLite database.") parser.add_argument( "--log-level", default="normal", choices=['off', 'normal', 'debug'], help="Set the logging level for the script." ) args = parser.parse_args() migrator = Migrator(log_level=args.log_level) migrator.run()