first strategies, import script for BTC history
import_csv.py · 147 lines · new file
@@ -0,0 +1,147 @@
import argparse
import logging
import os
import sqlite3
import sys
from datetime import datetime
from typing import Optional, Tuple

import pandas as pd

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class CsvImporter:
    """
    Imports historical candle data from a large CSV file into the SQLite database,
    intelligently adding only the missing data.
    """

    def __init__(self, log_level: str, csv_path: str, coin: str):
        setup_logging(log_level, 'CsvImporter')
        if not os.path.exists(csv_path):
            logging.error(f"CSV file not found at '{csv_path}'. Please check the path.")
            sys.exit(1)

        self.csv_path = csv_path
        self.coin = coin
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")
        # Maps the CSV's column headers to the database schema.
        self.column_mapping = {
            'Open time': 'datetime_utc',
            'Open': 'open',
            'High': 'high',
            'Low': 'low',
            'Close': 'close',
            'Volume': 'volume',
            'Number of trades': 'number_of_trades'
        }

    def run(self):
        """Orchestrates the entire import and verification process."""
        logging.info(f"Starting import process for '{self.coin}' from '{self.csv_path}'...")

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")

            # 1. Get the current state of the database
            db_oldest, db_newest, initial_row_count = self._get_db_state(conn)

            # 2. Read, clean, and filter the CSV data
            new_data_df = self._process_and_filter_csv(db_oldest, db_newest)

            if new_data_df.empty:
                logging.info("No new data to import. Database is already up to date with the CSV file.")
                return

            # 3. Append the new data to the database
            self._append_to_db(new_data_df, conn)

            # 4. Summarize and verify the import
            self._summarize_import(initial_row_count, len(new_data_df), conn)

    def _get_db_state(self, conn) -> Tuple[Optional[datetime], Optional[datetime], int]:
        """Gets the oldest and newest timestamps and total row count from the DB table."""
        try:
            oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]

            oldest_dt = pd.to_datetime(oldest) if oldest else None
            newest_dt = pd.to_datetime(newest) if newest else None

            if oldest_dt:
                logging.info(f"Database contains data from {oldest_dt} to {newest_dt}.")
            else:
                logging.info("Database table is empty. A full import will be performed.")

            return oldest_dt, newest_dt, count
        except pd.io.sql.DatabaseError:
            logging.info(f"Table '{self.table_name}' not found. It will be created.")
            return None, None, 0

    def _process_and_filter_csv(self, db_oldest: Optional[datetime], db_newest: Optional[datetime]) -> pd.DataFrame:
        """Reads the CSV and returns a DataFrame of only the missing data."""
        logging.info("Reading and processing CSV file. This may take a moment for large files...")
        df = pd.read_csv(self.csv_path, usecols=list(self.column_mapping.keys()))

        # Clean and format the data
        df.rename(columns=self.column_mapping, inplace=True)
        df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])

        # Keep only rows that fall outside the range already covered by the DB
        if db_oldest and db_newest:
            # Data from before the oldest record and after the newest record
            df_filtered = df[(df['datetime_utc'] < db_oldest) | (df['datetime_utc'] > db_newest)]
        else:
            # If the DB is empty, all data is new
            df_filtered = df
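        # Note: only the boundaries are compared. For example, with a DB range
        # of [2024-01-02, 2024-01-05], CSV rows at 2024-01-01 or 2024-01-06 are
        # imported, but a missing candle at 2024-01-03 12:00 would not be
        # detected -- interior gaps are assumed to already be filled.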

        logging.info(f"Found {len(df_filtered):,} new rows to import.")
        return df_filtered

    def _append_to_db(self, df: pd.DataFrame, conn):
        """Appends the DataFrame to the SQLite table."""
        logging.info(f"Appending {len(df):,} new rows to the database...")
        df.to_sql(self.table_name, conn, if_exists='append', index=False)
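        # if_exists='append' also creates the table on the first import,
        # inferring column types from the DataFrame.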
        logging.info("Append operation complete.")

    def _summarize_import(self, initial_count: int, added_count: int, conn):
        """Prints a final summary and verification of the import."""
        logging.info("--- Import Summary & Verification ---")

        try:
            final_count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
            new_newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]

            print(f"\n{'Status':<20}: SUCCESS")
            print("-" * 40)
            print(f"{'Initial Row Count':<20}: {initial_count:,}")
            print(f"{'Rows Added':<20}: {added_count:,}")
            print(f"{'Final Row Count':<20}: {final_count:,}")
            print("-" * 40)
            print(f"{'New Oldest Record':<20}: {new_oldest}")
            print(f"{'New Newest Record':<20}: {new_newest}")

            # Verification check
            if final_count == initial_count + added_count:
                logging.info("Verification successful: Final row count matches expected count.")
            else:
                logging.warning("Verification warning: Final row count does not match expected count.")
        except Exception as e:
            logging.error(f"Could not generate summary. Error: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Import historical CSV data into the SQLite database.")
    parser.add_argument("--file", required=True, help="Path to the large CSV file to import.")
    parser.add_argument("--coin", default="BTC", help="The coin symbol for this data (e.g., BTC).")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin)
    importer.run()
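Usage note: a typical invocation, assuming a Binance-style 1-minute kline export at the path shown (the CSV filename is illustrative):

    python import_csv.py --file _data/btcusdt_1m.csv --coin BTC --log-level normal

The script resolves the target database to _data/market_data.db and writes to the BTC_1m table; re-running it with the same CSV once the data is present logs "No new data to import" and exits.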
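For reference, logging_utils.py is not part of this commit. A minimal sketch of what its setup_logging(level, name) helper might look like, given only how it is called above (the level strings 'off', 'normal', 'debug' come from the CLI choices; everything else here is an assumption, not the committed module):

    # logging_utils.py -- hypothetical sketch, not the committed module
    import logging

    def setup_logging(log_level: str, component: str) -> None:
        """Configure root logging for the given component."""
        # Map the script's level names onto stdlib levels (assumed mapping;
        # 'off' is approximated by suppressing everything below CRITICAL).
        levels = {'off': logging.CRITICAL, 'normal': logging.INFO, 'debug': logging.DEBUG}
        logging.basicConfig(
            level=levels.get(log_level, logging.INFO),
            format=f"%(asctime)s [{component}] %(levelname)s: %(message)s",
        )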