# hyper/import_csv.py

import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
from datetime import datetime
from typing import Optional, Tuple
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
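# setup_logging is assumed to accept a level string and a logger name, matching
# the call in __init__ below; its implementation is not shown here.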

class CsvImporter:
    """
    Imports historical candle data from a large CSV file into the SQLite
    database, adding only the rows that are missing from the table.
    """

    def __init__(self, log_level: str, csv_path: str, coin: str):
        setup_logging(log_level, 'CsvImporter')
        if not os.path.exists(csv_path):
            logging.error(f"CSV file not found at '{csv_path}'. Please check the path.")
            sys.exit(1)

        self.csv_path = csv_path
        self.coin = coin
        # Table names follow the "<coin>_1m" convention, e.g. "BTC_1m".
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")

        # Maps the CSV's column headers to the database column names.
        self.column_mapping = {
            'Open time': 'datetime_utc',
            'Open': 'open',
            'High': 'high',
            'Low': 'low',
            'Close': 'close',
            'Volume': 'volume',
            'Number of trades': 'number_of_trades'
        }
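
    # The target table is assumed to contain at least the columns produced in
    # _process_and_filter_csv (datetime_utc, open, high, low, close, volume,
    # number_of_trades, timestamp_ms); to_sql creates it with exactly these
    # columns if it does not exist yet.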

    def run(self):
        """Orchestrates the entire import and verification process."""
        logging.info(f"Starting import process for '{self.coin}' from '{self.csv_path}'...")
        with sqlite3.connect(self.db_path) as conn:
            # WAL mode lets concurrent readers keep working while this import writes.
            conn.execute("PRAGMA journal_mode=WAL;")

            # 1. Get the current state of the database
            db_oldest, db_newest, initial_row_count = self._get_db_state(conn)

            # 2. Read, clean, and filter the CSV data
            new_data_df = self._process_and_filter_csv(db_oldest, db_newest)
            if new_data_df.empty:
                logging.info("No new data to import. Database is already up-to-date with the CSV file.")
                return

            # 3. Append the new data to the database
            self._append_to_db(new_data_df, conn)

            # 4. Summarize and verify the import
            self._summarize_import(initial_row_count, len(new_data_df), conn)

    def _get_db_state(self, conn) -> Tuple[Optional[datetime], Optional[datetime], int]:
        """Gets the oldest and newest timestamps and total row count from the DB table."""
        try:
            oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]

            oldest_dt = pd.to_datetime(oldest) if oldest else None
            newest_dt = pd.to_datetime(newest) if newest else None

            if oldest_dt is not None:
                logging.info(f"Database contains data from {oldest_dt} to {newest_dt}.")
            else:
                logging.info("Database table is empty. A full import will be performed.")
            return oldest_dt, newest_dt, count
        except pd.io.sql.DatabaseError:
            logging.info(f"Table '{self.table_name}' not found. It will be created.")
            return None, None, 0

    def _process_and_filter_csv(self, db_oldest: Optional[datetime], db_newest: Optional[datetime]) -> pd.DataFrame:
        """Reads the CSV and returns a DataFrame of only the missing data."""
        logging.info("Reading and processing CSV file. This may take a moment for large files...")
        df = pd.read_csv(self.csv_path, usecols=list(self.column_mapping.keys()))

        # Clean and format the data
        df.rename(columns=self.column_mapping, inplace=True)
        df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
        # Derive the millisecond timestamp from the datetime column: datetime64[ns]
        # values are nanoseconds since the epoch, so integer-divide by 10**6
        # (e.g. 2024-01-01 00:00:00 UTC -> 1704067200000).
        df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)

        # Keep only rows outside the range already covered by the DB. Note that
        # gaps *inside* [db_oldest, db_newest] are not detected by this check.
        if db_oldest is not None and db_newest is not None:
            df_filtered = df[(df['datetime_utc'] < db_oldest) | (df['datetime_utc'] > db_newest)]
        else:
            # If the DB is empty, all data is new
            df_filtered = df

        logging.info(f"Found {len(df_filtered):,} new rows to import.")
        return df_filtered

    def _append_to_db(self, df: pd.DataFrame, conn):
        """Appends the DataFrame to the SQLite table."""
        logging.info(f"Appending {len(df):,} new rows to the database...")
        # For very large frames, to_sql also accepts chunksize=... to bound memory use.
        df.to_sql(self.table_name, conn, if_exists='append', index=False)
        logging.info("Append operation complete.")

    def _summarize_import(self, initial_count: int, added_count: int, conn):
        """Prints a final summary and verification of the import."""
        logging.info("--- Import Summary & Verification ---")
        try:
            final_count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]

            print(f"\n{'Status':<20}: SUCCESS")
            print("-" * 40)
            print(f"{'Initial Row Count':<20}: {initial_count:,}")
            print(f"{'Rows Added':<20}: {added_count:,}")
            print(f"{'Final Row Count':<20}: {final_count:,}")
            print("-" * 40)
            print(f"{'New Oldest Record':<20}: {new_oldest}")
            print(f"{'New Newest Record':<20}: {new_newest}")

            # Verification check
            if final_count == initial_count + added_count:
                logging.info("Verification successful: Final row count matches expected count.")
            else:
                logging.warning("Verification warning: Final row count does not match expected count.")
        except Exception as e:
            logging.error(f"Could not generate summary. Error: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Import historical CSV data into the SQLite database.")
    parser.add_argument("--file", required=True, help="Path to the large CSV file to import.")
    parser.add_argument("--coin", default="BTC", help="The coin symbol for this data (e.g., BTC).")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin)
    importer.run()
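
# Example invocation (the CSV path is illustrative):
#   python import_csv.py --file _data/BTCUSDT_1m.csv --coin BTC --log-level normal
#
# On completion the script prints a summary (initial/final row counts, rows
# added, oldest/newest records) and verifies that final == initial + added.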