import CSV files
Binary file not shown.
@@ -1,11 +1,11 @@
 {
-    "rsi_strategy_eth": {
+    "sma_125d_eth": {
         "enabled": true,
         "script": "strategy_template.py",
         "parameters": {
             "coin": "ETH",
-            "timeframe": "5m",
-            "rsi_period": 14
+            "timeframe": "1D",
+            "sma_period": 125
         }
     },
     "ma_cross_btc": {

fix_timestamps.py  (new file, +118 lines)
@@ -0,0 +1,118 @@
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd

# script to fix missing millisecond timestamps in the database after import from CSVs (this is already fixed in import_csv.py)
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class DatabaseFixer:
    """
    Scans the SQLite database for rows with missing millisecond timestamps
    and updates them based on the datetime_utc column.
    """

    def __init__(self, log_level: str, coin: str):
        setup_logging(log_level, 'TimestampFixer')
        self.coin = coin
        self.table_name = f"{self.coin}_1m"
        self.db_path = os.path.join("_data", "market_data.db")

    def run(self):
        """Orchestrates the entire database update and verification process."""
        logging.info(f"Starting timestamp fix process for table '{self.table_name}'...")

        if not os.path.exists(self.db_path):
            logging.error(f"Database file not found at '{self.db_path}'. Exiting.")
            sys.exit(1)

        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("PRAGMA journal_mode=WAL;")

                # 1. Check how many rows need fixing
                rows_to_fix_count = self._count_rows_to_fix(conn)
                if rows_to_fix_count == 0:
                    logging.info(f"No rows with missing timestamps found in '{self.table_name}'. No action needed.")
                    return

                logging.info(f"Found {rows_to_fix_count:,} rows with missing timestamps to update.")

                # 2. Process the table in chunks to conserve memory
                updated_count = self._process_in_chunks(conn)

                # 3. Provide a final summary
                self._summarize_update(rows_to_fix_count, updated_count)

        except Exception as e:
            logging.error(f"A critical error occurred: {e}")

    def _count_rows_to_fix(self, conn) -> int:
        """Counts the number of rows where timestamp_ms is NULL."""
        try:
            return pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}" WHERE timestamp_ms IS NULL', conn).iloc[0, 0]
        except pd.io.sql.DatabaseError:
            logging.error(f"Table '{self.table_name}' not found in the database. Cannot fix timestamps.")
            sys.exit(1)

    def _process_in_chunks(self, conn) -> int:
        """Reads, calculates, and updates timestamps in manageable chunks."""
        total_updated = 0
        chunk_size = 50000  # Process 50,000 rows at a time

        # We select the special 'rowid' column to uniquely identify each row for updating
        query = f'SELECT rowid, datetime_utc FROM "{self.table_name}" WHERE timestamp_ms IS NULL'

        for chunk_df in pd.read_sql_query(query, conn, chunksize=chunk_size):
            if chunk_df.empty:
                break

            logging.info(f"Processing a chunk of {len(chunk_df)} rows...")

            # Calculate the missing timestamps
            chunk_df['datetime_utc'] = pd.to_datetime(chunk_df['datetime_utc'])
            chunk_df['timestamp_ms'] = (chunk_df['datetime_utc'].astype('int64') // 10**6)

            # Prepare data for the update command: a list of (timestamp, rowid) tuples
            update_data = list(zip(chunk_df['timestamp_ms'], chunk_df['rowid']))

            # Use executemany for a fast bulk update
            cursor = conn.cursor()
            cursor.executemany(f'UPDATE "{self.table_name}" SET timestamp_ms = ? WHERE rowid = ?', update_data)
            conn.commit()

            total_updated += len(chunk_df)
            logging.info(f"Updated {total_updated} rows so far...")

        return total_updated

    def _summarize_update(self, expected_count: int, actual_count: int):
        """Prints a final summary of the update process."""
        logging.info("--- Timestamp Fix Summary ---")
        print(f"\n{'Status':<25}: COMPLETE")
        print("-" * 40)
        print(f"{'Table Processed':<25}: {self.table_name}")
        print(f"{'Rows Needing Update':<25}: {expected_count:,}")
        print(f"{'Rows Successfully Updated':<25}: {actual_count:,}")

        if expected_count == actual_count:
            logging.info("Verification successful: All necessary rows have been updated.")
        else:
            logging.warning("Verification warning: The number of updated rows does not match the expected count.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fix missing millisecond timestamps in the SQLite database.")
    parser.add_argument("--coin", default="BTC", help="The coin symbol for the table to fix (e.g., BTC).")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    fixer = DatabaseFixer(log_level=args.log_level, coin=args.coin)
    fixer.run()
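
The script is driven by the argparse block at the bottom of the file. As a minimal sketch, equivalent programmatic use would look roughly like this (the ETH value is purely illustrative, and this assumes fix_timestamps.py is importable as a module):

# Hypothetical direct use of DatabaseFixer, mirroring the __main__ block above.
from fix_timestamps import DatabaseFixer

fixer = DatabaseFixer(log_level="debug", coin="ETH")  # targets table 'ETH_1m' in _data/market_data.db
fixer.run()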

import_csv.py
@@ -23,6 +23,7 @@ class CsvImporter:
         self.csv_path = csv_path
         self.coin = coin
+        # --- FIX: Corrected the f-string syntax for the table name ---
         self.table_name = f"{self.coin}_1m"
         self.db_path = os.path.join("_data", "market_data.db")
         self.column_mapping = {
@@ -87,6 +88,10 @@ class CsvImporter:
         df.rename(columns=self.column_mapping, inplace=True)
         df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
+
+        # --- FIX: Calculate the millisecond timestamp from the datetime column ---
+        # This converts the datetime to nanoseconds and then to milliseconds.
+        df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)

         # Filter the data to find only rows that are outside the range of what's already in the DB
         if db_oldest and db_newest:
             # Get data from before the oldest record and after the newest record
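
As a sanity check on this conversion: pandas stores datetime64[ns] values as nanoseconds since the Unix epoch, so integer-dividing by 10**6 yields milliseconds. A minimal standalone sketch (the sample datetimes are illustrative only):

import pandas as pd

# Illustrative values, not taken from the real CSVs.
s = pd.to_datetime(pd.Series(["2021-01-01 00:00:00", "2021-01-01 00:01:00"]))
timestamp_ms = s.astype('int64') // 10**6  # nanoseconds since epoch -> milliseconds
print(timestamp_ms.tolist())  # [1609459200000, 1609459260000]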
@@ -145,3 +150,5 @@ if __name__ == "__main__":

     importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin)
     importer.run()
+
+
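
For reference, a minimal sketch of equivalent programmatic use of the importer (the CSV path is a placeholder, and this assumes the module is importable as import_csv, as referenced in fix_timestamps.py):

# Hypothetical direct use of CsvImporter, mirroring its __main__ block.
from import_csv import CsvImporter

importer = CsvImporter(log_level="normal", csv_path="example_export.csv", coin="BTC")
importer.run()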