resampler much faster

resampler.py | 138 lines changed

--- a/resampler.py
+++ b/resampler.py
@@ -5,7 +5,7 @@ import sys
 import sqlite3
 import pandas as pd
 import json
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
 
 # Assuming logging_utils.py is in the same directory
 from logging_utils import setup_logging
@@ -13,7 +13,8 @@ from logging_utils import setup_logging
 class Resampler:
     """
     Reads new 1-minute candle data from the SQLite database, resamples it to
-    various timeframes, and appends the new candles to the corresponding tables.
+    various timeframes, and upserts the new candles to the corresponding tables,
+    preventing data duplication.
     """
 
     def __init__(self, log_level: str, coins: list, timeframes: dict):
@@ -32,6 +33,51 @@ class Resampler:
         }
         self.resampling_status = self._load_existing_status()
         self.job_start_time = None
+        self._ensure_tables_exist()
+
+    def _ensure_tables_exist(self):
+        """
+        Ensures all resampled tables exist with a PRIMARY KEY on datetime_utc.
+        Attempts to migrate existing tables if the schema is incorrect.
+        """
+        with sqlite3.connect(self.db_path) as conn:
+            for coin in self.coins_to_process:
+                for tf_name in self.timeframes.keys():
+                    table_name = f"{coin}_{tf_name}"
+                    cursor = conn.cursor()
+                    cursor.execute(f"PRAGMA table_info('{table_name}')")
+                    columns = cursor.fetchall()
+                    if columns:
+                        pk_found = any(col[1] == 'datetime_utc' and col[5] == 1 for col in columns)
+                        if not pk_found:
+                            logging.warning(f"Schema migration needed for table '{table_name}'.")
+                            try:
+                                conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
+                                self._create_resampled_table(conn, table_name)
+                                conn.execute(f'INSERT OR IGNORE INTO "{table_name}" SELECT * FROM "{table_name}_old"')
+                                conn.execute(f'DROP TABLE "{table_name}_old"')
+                                conn.commit()
+                                logging.info(f"Successfully migrated schema for '{table_name}'.")
+                            except Exception as e:
+                                logging.error(f"FATAL: Migration for '{table_name}' failed: {e}. Please delete 'market_data.db' and restart.")
+                                sys.exit(1)
+                    else:
+                        self._create_resampled_table(conn, table_name)
+        logging.info("All resampled table schemas verified.")
+
+    def _create_resampled_table(self, conn, table_name):
+        """Creates a new resampled table with the correct schema."""
+        conn.execute(f'''
+            CREATE TABLE "{table_name}" (
+                datetime_utc TEXT PRIMARY KEY,
+                open REAL,
+                high REAL,
+                low REAL,
+                close REAL,
+                volume REAL,
+                number_of_trades INTEGER
+            )
+        ''')
+
     def _load_existing_status(self) -> dict:
         """Loads the existing status file if it exists, otherwise returns an empty dict."""
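A note on the PRAGMA check in the hunk above: sqlite3 returns one row per column in the form (cid, name, type, notnull, dflt_value, pk), so col[1] is the column name and col[5] is non-zero when the column is part of the primary key. A minimal standalone illustration (the table name here is hypothetical, not from the commit):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE "BTC_5m" (datetime_utc TEXT PRIMARY KEY, close REAL)')
    columns = conn.execute("PRAGMA table_info('BTC_5m')").fetchall()
    # Each row is (cid, name, type, notnull, dflt_value, pk)
    assert any(col[1] == 'datetime_utc' and col[5] == 1 for col in columns)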
@@ -51,6 +97,14 @@ class Resampler:
         self.job_start_time = datetime.now(timezone.utc)
         logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---")
 
+        if '1m' in self.timeframes:
+            logging.debug("Ignoring '1m' timeframe as it is the source resolution.")
+            del self.timeframes['1m']
+
+        if not self.timeframes:
+            logging.warning("No timeframes to process after filtering. Exiting job.")
+            return
+
         if not os.path.exists(self.db_path):
             logging.error(f"Database file '{self.db_path}' not found.")
             return
@@ -61,37 +115,58 @@ class Resampler:
             logging.debug(f"Processing {len(self.coins_to_process)} coins...")
 
             for coin in self.coins_to_process:
-                source_table_name = f"{coin}_1m"
                 logging.debug(f"--- Processing {coin} ---")
 
                 try:
-                    # Load the full 1m history once per coin
-                    df_1m = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn, parse_dates=['datetime_utc'])
-                    if df_1m.empty:
-                        logging.warning(f"Source table '{source_table_name}' is empty. Skipping.")
-                        continue
-                    df_1m.set_index('datetime_utc', inplace=True)
-
                     for tf_name, tf_code in self.timeframes.items():
                         target_table_name = f"{coin}_{tf_name}"
+                        source_table_name = f"{coin}_1m"
                         logging.debug(f"  Updating {tf_name} table...")
 
                         last_timestamp = self._get_last_timestamp(conn, target_table_name)
 
-                        # Get the new 1-minute data that needs to be processed
-                        new_df_1m = df_1m[df_1m.index > last_timestamp] if last_timestamp else df_1m
+                        query = f'SELECT * FROM "{source_table_name}"'
+                        params = ()
+                        if last_timestamp:
+                            query += ' WHERE datetime_utc >= ?'
+                            try:
+                                # --- FIX: Try the fast method first ---
+                                interval_delta = pd.to_timedelta(tf_code)
+                                query_start_date = last_timestamp - interval_delta
+                            except ValueError:
+                                # --- FIX: Fall back to the safe method for special timeframes ---
+                                logging.debug(f"Cannot create timedelta for '{tf_code}'. Using safe 32-day lookback.")
+                                query_start_date = last_timestamp - timedelta(days=32)
+
+                            params = (query_start_date.strftime('%Y-%m-%d %H:%M:%S'),)
 
-                        if new_df_1m.empty:
+                        df_1m = pd.read_sql(query, conn, params=params, parse_dates=['datetime_utc'])
+
+                        if df_1m.empty:
                             logging.debug(f"  -> No new 1-minute data for {tf_name}. Table is up to date.")
                             continue
 
-                        resampled_df = new_df_1m.resample(tf_code).agg(self.aggregation_logic)
+                        df_1m.set_index('datetime_utc', inplace=True)
+                        resampled_df = df_1m.resample(tf_code).agg(self.aggregation_logic)
                         resampled_df.dropna(how='all', inplace=True)
 
                         if not resampled_df.empty:
-                            # Append the newly resampled data to the target table
-                            resampled_df.to_sql(target_table_name, conn, if_exists='append', index=True)
-                            logging.debug(f"  -> Appended {len(resampled_df)} new candles to '{target_table_name}'.")
+                            records_to_upsert = []
+                            for index, row in resampled_df.iterrows():
+                                records_to_upsert.append((
+                                    index.strftime('%Y-%m-%d %H:%M:%S'),
+                                    row['open'], row['high'], row['low'], row['close'],
+                                    row['volume'], row['number_of_trades']
+                                ))
+
+                            cursor = conn.cursor()
+                            cursor.executemany(f'''
+                                INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, open, high, low, close, volume, number_of_trades)
+                                VALUES (?, ?, ?, ?, ?, ?, ?)
+                            ''', records_to_upsert)
+                            conn.commit()
+
+                            logging.debug(f"  -> Upserted {len(resampled_df)} candles into '{target_table_name}'.")
 
                         if coin not in self.resampling_status: self.resampling_status[coin] = {}
                         total_candles = int(self._get_table_count(conn, target_table_name))
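The hunk above swaps the blind append for a windowed re-read plus an upsert: only rows from roughly one interval before the last stored candle are fetched, that window is re-resampled, and INSERT OR REPLACE (keyed on the datetime_utc PRIMARY KEY) overwrites the previously incomplete final candle instead of duplicating it. A minimal sketch of that idiom, with hypothetical table name and values:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE "BTC_4h" (datetime_utc TEXT PRIMARY KEY, close REAL)')

    # First pass: the 04:00 candle is still forming
    conn.executemany('INSERT OR REPLACE INTO "BTC_4h" VALUES (?, ?)',
                     [("2024-01-01 00:00:00", 100.0), ("2024-01-01 04:00:00", 101.0)])

    # Next pass re-reads the overlapping window; the same key updates in place
    conn.executemany('INSERT OR REPLACE INTO "BTC_4h" VALUES (?, ?)',
                     [("2024-01-01 04:00:00", 102.5)])
    conn.commit()

    print(conn.execute('SELECT * FROM "BTC_4h" ORDER BY datetime_utc').fetchall())
    # [('2024-01-01 00:00:00', 100.0), ('2024-01-01 04:00:00', 102.5)]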
@@ -111,7 +186,6 @@ class Resampler:
         """Logs a summary of the total candles for each timeframe."""
         logging.info("--- Resampling Job Summary ---")
         timeframe_totals = {}
-        # Iterate through coins, skipping metadata keys
         for coin, tfs in self.resampling_status.items():
             if not isinstance(tfs, dict): continue
             for tf_name, tf_data in tfs.items():
@@ -129,9 +203,10 @@ class Resampler:
             logging.info(f"  - {tf_name:<10}: {total:,} candles")
 
     def _get_last_timestamp(self, conn, table_name):
-        """Gets the timestamp of the last entry in a table."""
+        """Gets the timestamp of the last entry in a table as a pandas Timestamp."""
         try:
-            return pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
+            timestamp_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
+            return pd.to_datetime(timestamp_str) if timestamp_str else None
         except (pd.io.sql.DatabaseError, IndexError):
             return None
 
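This return-type change matters to the run loop above, which subtracts pd.to_timedelta(tf_code) or timedelta(days=32) from the returned value; that arithmetic works on a pandas Timestamp but would raise a TypeError on the raw string SQLite hands back. A quick illustration:

    import pandas as pd
    from datetime import timedelta

    ts = pd.to_datetime("2024-01-01 12:00:00")
    print(ts - pd.to_timedelta("4h"))   # 2024-01-01 08:00:00
    print(ts - timedelta(days=32))      # 2023-11-30 12:00:00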
@@ -151,7 +226,6 @@ class Resampler:
         self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S')
         self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S')
 
-        # Clean up old key if it exists from previous versions
         self.resampling_status.pop('last_completed_utc', None)
 
         try:
@@ -167,14 +241,24 @@ def parse_timeframes(tf_strings: list) -> dict:
     tf_map = {}
     for tf_str in tf_strings:
         numeric_part = ''.join(filter(str.isdigit, tf_str))
-        unit = ''.join(filter(str.isalpha, tf_str)).lower()
+        unit = ''.join(filter(str.isalpha, tf_str))  # Keep case for 'M'
 
+        key = tf_str
         code = ''
-        if unit == 'm': code = f"{numeric_part}min"
-        elif unit == 'w': code = f"{numeric_part}W"
-        elif unit in ['h', 'd']: code = f"{numeric_part}{unit}"
-        else: code = tf_str
-        tf_map[tf_str] = code
+        if unit == 'm':
+            code = f"{numeric_part}min"
+        elif unit.lower() == 'w':
+            code = f"{numeric_part}W-MON"
+        elif unit == 'M':
+            code = f"{numeric_part}MS"
+            key = f"{numeric_part}month"
+        elif unit.lower() in ['h', 'd']:
+            code = f"{numeric_part}{unit.lower()}"
+        else:
+            code = tf_str
+            logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.")
+
+        tf_map[key] = code
     return tf_map
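For illustration, the rewritten parse_timeframes maps CLI-style strings to pandas resample codes as sketched below: '1M' now survives the case-sensitive check as a month-start rule ('MS') instead of being lowercased into minutes, and weekly candles are anchored to Monday.

    print(parse_timeframes(['5m', '4h', '1d', '1w', '1M']))
    # {'5m': '5min', '4h': '4h', '1d': '1d', '1w': '1W-MON', '1month': '1MS'}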