import argparse
import logging
import os
import sqlite3
import sys
from datetime import datetime
from typing import Optional, Tuple

import pandas as pd

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
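# If logging_utils is unavailable in your checkout, a minimal stand-in like the
# commented sketch below (an assumption, not the project's real helper) keeps
# the script runnable with the same 'off' / 'normal' / 'debug' levels:
#
#   def setup_logging(level: str, name: str) -> None:
#       levels = {'off': logging.CRITICAL, 'normal': logging.INFO, 'debug': logging.DEBUG}
#       logging.basicConfig(level=levels.get(level, logging.INFO),
#                           format=f'%(asctime)s [{name}] %(levelname)s: %(message)s')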


class CsvImporter:
    """
    Imports historical candle data from a large CSV file into the SQLite
    database, adding only the rows that are not already present.
    """

    def __init__(self, log_level: str, csv_path: str, coin: str):
        setup_logging(log_level, 'CsvImporter')
        if not os.path.exists(csv_path):
            logging.error(f"CSV file not found at '{csv_path}'. Please check the path.")
            sys.exit(1)

        self.csv_path = csv_path
        self.coin = coin
        # Table name follows the '<coin>_1m' naming pattern, e.g. 'BTC_1m'.
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")
        # Make sure the data directory exists so sqlite3 can create/open the DB file.
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        # Maps the CSV headers to the column names used in the database table.
        self.column_mapping = {
            'Open time': 'datetime_utc',
            'Open': 'open',
            'High': 'high',
            'Low': 'low',
            'Close': 'close',
            'Volume': 'volume',
            'Number of trades': 'number_of_trades'
        }

    def run(self):
        """Orchestrates the entire import and verification process."""
        logging.info(f"Starting import process for '{self.coin}' from '{self.csv_path}'...")

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")

            # 1. Get the current state of the database
            db_oldest, db_newest, initial_row_count = self._get_db_state(conn)

            # 2. Read, clean, and filter the CSV data
            new_data_df = self._process_and_filter_csv(db_oldest, db_newest)

            if new_data_df.empty:
                logging.info("No new data to import. Database is already up-to-date with the CSV file.")
                return

            # 3. Append the new data to the database
            self._append_to_db(new_data_df, conn)

            # 4. Summarize and verify the import
            self._summarize_import(initial_row_count, len(new_data_df), conn)

    def _get_db_state(self, conn) -> Tuple[Optional[datetime], Optional[datetime], int]:
        """Gets the oldest and newest timestamps and total row count from the DB table."""
        try:
            oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]

            oldest_dt = pd.to_datetime(oldest) if oldest else None
            newest_dt = pd.to_datetime(newest) if newest else None

            if oldest_dt:
                logging.info(f"Database contains data from {oldest_dt} to {newest_dt}.")
            else:
                logging.info("Database table is empty. A full import will be performed.")

            return oldest_dt, newest_dt, count
        except pd.io.sql.DatabaseError:
            logging.info(f"Table '{self.table_name}' not found. It will be created.")
            return None, None, 0

    def _process_and_filter_csv(self, db_oldest: datetime, db_newest: datetime) -> pd.DataFrame:
        """Reads the CSV and returns a DataFrame of only the missing data."""
        logging.info("Reading and processing CSV file. This may take a moment for large files...")
        # usecols keeps memory usage down by loading only the columns we store.
        df = pd.read_csv(self.csv_path, usecols=list(self.column_mapping.keys()))

        # Clean and format the data
        df.rename(columns=self.column_mapping, inplace=True)
        df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])

        # Derive the millisecond timestamp from the datetime column:
        # datetime64[ns] values are nanoseconds since the epoch, so integer-divide by 10**6.
df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)
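        # e.g. 2021-01-01 00:00:00 UTC -> 1609459200000 ms.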

        # Keep only rows outside the range already stored in the DB
        # (this extends the edges of the table; it does not fill interior gaps).
        if db_oldest and db_newest:
            # Data strictly before the oldest record or strictly after the newest record
            df_filtered = df[(df['datetime_utc'] < db_oldest) | (df['datetime_utc'] > db_newest)]
        else:
            # If the DB is empty, all data is new
            df_filtered = df

        logging.info(f"Found {len(df_filtered):,} new rows to import.")
        return df_filtered

    def _append_to_db(self, df: pd.DataFrame, conn):
        """Appends the DataFrame to the SQLite table."""
        logging.info(f"Appending {len(df):,} new rows to the database...")
        df.to_sql(self.table_name, conn, if_exists='append', index=False)
        logging.info("Append operation complete.")

    def _summarize_import(self, initial_count: int, added_count: int, conn):
        """Prints a final summary and verification of the import."""
        logging.info("--- Import Summary & Verification ---")

        try:
            final_count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]

            print(f"\n{'Status':<20}: SUCCESS")
            print("-" * 40)
            print(f"{'Initial Row Count':<20}: {initial_count:,}")
            print(f"{'Rows Added':<20}: {added_count:,}")
            print(f"{'Final Row Count':<20}: {final_count:,}")
            print("-" * 40)
            print(f"{'New Oldest Record':<20}: {new_oldest}")
            print(f"{'New Newest Record':<20}: {new_newest}")

            # Verification check
            if final_count == initial_count + added_count:
                logging.info("Verification successful: Final row count matches expected count.")
            else:
                logging.warning("Verification warning: Final row count does not match expected count.")
        except Exception as e:
            logging.error(f"Could not generate summary. Error: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Import historical CSV data into the SQLite database.")
    parser.add_argument("--file", required=True, help="Path to the large CSV file to import.")
    parser.add_argument("--coin", default="BTC", help="The coin symbol for this data (e.g., BTC).")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin)
importer.run()
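
    # Example invocation (script name and file path are illustrative):
    #   python import_csv.py --file BTC_1m_history.csv --coin BTC --log-level debug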