import argparse
import json
import logging
import os
import sqlite3
import sys
from datetime import datetime, timezone, timedelta

import pandas as pd

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class Resampler:
    """
    Reads new 1-minute candle data from the SQLite database, resamples it into
    various timeframes, and upserts the new candles into the corresponding
    tables, preventing data duplication.
    """

    def __init__(self, log_level: str, coins: list, timeframes: dict):
        setup_logging(log_level, 'Resampler')
        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join("_data", "resampling_status.json")
        self.coins_to_process = coins
        self.timeframes = timeframes
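        # How each OHLCV column is combined when 1m rows are grouped into a
        # larger bucket: first open, max high, min low, last close, and summed
        # volume / trade counts.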
        self.aggregation_logic = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum',
            'number_of_trades': 'sum'
        }
        self.resampling_status = self._load_existing_status()
        self.job_start_time = None
        self._ensure_tables_exist()

    def _ensure_tables_exist(self):
        """
        Ensures all resampled tables exist with a PRIMARY KEY on timestamp_ms.
        Attempts to migrate existing tables if the schema is incorrect.
        """
        with sqlite3.connect(self.db_path) as conn:
            for coin in self.coins_to_process:
                for tf_name in self.timeframes.keys():
                    table_name = f"{coin}_{tf_name}"
                    cursor = conn.cursor()
                    cursor.execute(f"PRAGMA table_info('{table_name}')")
                    columns = cursor.fetchall()
                    if columns:
                        # --- FIX: Check for the correct PRIMARY KEY on timestamp_ms ---
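                        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk),
                        # so col[1] is the column name and col[5] is non-zero for primary-key columns.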
                        pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns)
                        if not pk_found:
                            logging.warning(f"Schema migration needed for table '{table_name}'.")
                            try:
                                conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
                                self._create_resampled_table(conn, table_name)
                                # Copy the old data across, deriving timestamp_ms from datetime_utc
                                logging.info(f"  -> Migrating data for '{table_name}'...")
                                old_df = pd.read_sql(f'SELECT * FROM "{table_name}_old"', conn, parse_dates=['datetime_utc'])
                                if not old_df.empty:
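                                    # Casting datetime64[ns] to int64 yields nanoseconds
                                    # since the epoch; floor-dividing by 10**6 converts to ms.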
                                    old_df['timestamp_ms'] = old_df['datetime_utc'].astype('int64') // 10**6
                                    # Keep only unique timestamps, preserving the last entry
                                    old_df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True)
                                    old_df.to_sql(table_name, conn, if_exists='append', index=False)
                                    logging.info("  -> Data migration complete.")
                                conn.execute(f'DROP TABLE "{table_name}_old"')
                                conn.commit()
                                logging.info(f"Successfully migrated schema for '{table_name}'.")
                            except Exception as e:
                                logging.error(f"FATAL: Migration for '{table_name}' failed: {e}. Please delete 'market_data.db' and restart.")
                                sys.exit(1)
                    else:
                        self._create_resampled_table(conn, table_name)
        logging.info("All resampled table schemas verified.")

    def _create_resampled_table(self, conn, table_name):
        """Creates a new resampled table with the correct schema."""
        # --- FIX: Set PRIMARY KEY on timestamp_ms for performance and uniqueness ---
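        # In SQLite, an INTEGER PRIMARY KEY column is an alias for the rowid,
        # so upserts and MAX(timestamp_ms) lookups stay index-backed.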
        conn.execute(f'''
            CREATE TABLE "{table_name}" (
                datetime_utc TEXT,
                timestamp_ms INTEGER PRIMARY KEY,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume REAL,
                number_of_trades INTEGER
            )
        ''')

    def _load_existing_status(self) -> dict:
        """Loads the existing status file if it exists, otherwise returns an empty dict."""
        if os.path.exists(self.status_file_path):
            try:
                with open(self.status_file_path, 'r', encoding='utf-8') as f:
                    logging.debug(f"Loading existing status from '{self.status_file_path}'")
                    return json.load(f)
            except (IOError, json.JSONDecodeError) as e:
                logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}")
        return {}

    def run(self):
        """
        Main execution function to process all configured coins and update the database.
        """
        self.job_start_time = datetime.now(timezone.utc)
        logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---")

        if '1m' in self.timeframes:
            logging.debug("Ignoring '1m' timeframe as it is the source resolution.")
            del self.timeframes['1m']

        if not self.timeframes:
            logging.warning("No timeframes to process after filtering. Exiting job.")
            return

        if not os.path.exists(self.db_path):
            logging.error(f"Database file '{self.db_path}' not found.")
            return

        with sqlite3.connect(self.db_path) as conn:
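            # WAL mode allows concurrent readers (e.g. whatever process writes
            # the 1m source tables) to coexist with this writer.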
conn.execute("PRAGMA journal_mode=WAL;")
|
|
|
|
logging.debug(f"Processing {len(self.coins_to_process)} coins...")
|
|
|
|
for coin in self.coins_to_process:
|
|
logging.debug(f"--- Processing {coin} ---")
|
|
|
|
try:
|
|
for tf_name, tf_code in self.timeframes.items():
|
|
target_table_name = f"{coin}_{tf_name}"
|
|
source_table_name = f"{coin}_1m"
|
|
logging.debug(f" Updating {tf_name} table...")
|
|
|
|
last_timestamp_ms = self._get_last_timestamp(conn, target_table_name)
|
|
|
|
query = f'SELECT * FROM "{source_table_name}"'
|
|
params = ()
|
|
if last_timestamp_ms:
|
|
query += ' WHERE timestamp_ms >= ?'
|
|
# Go back one interval to rebuild the last (potentially partial) candle
|
|
try:
|
|
interval_delta_ms = pd.to_timedelta(tf_code).total_seconds() * 1000
|
|
except ValueError:
|
|
# Fall back to a safe 32-day lookback for special timeframes
|
|
interval_delta_ms = timedelta(days=32).total_seconds() * 1000
|
|
|
|
query_start_ms = last_timestamp_ms - interval_delta_ms
|
|
params = (query_start_ms,)
|
|
|
|
                        df_1m = pd.read_sql(query, conn, params=params, parse_dates=['datetime_utc'])

                        if df_1m.empty:
                            logging.debug(f"  -> No new 1-minute data for {tf_name}. Table is up to date.")
                            continue

                        df_1m.set_index('datetime_utc', inplace=True)
                        resampled_df = df_1m.resample(tf_code).agg(self.aggregation_logic)
                        resampled_df.dropna(how='all', inplace=True)

                        if not resampled_df.empty:
                            records_to_upsert = []
                            for index, row in resampled_df.iterrows():
                                records_to_upsert.append((
                                    index.strftime('%Y-%m-%d %H:%M:%S'),
                                    int(index.timestamp() * 1000),  # Generate timestamp_ms
                                    row['open'], row['high'], row['low'], row['close'],
                                    row['volume'], row['number_of_trades']
                                ))

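                            # INSERT OR REPLACE keys on the timestamp_ms PRIMARY KEY:
                            # a re-resampled (previously partial) candle overwrites the
                            # old row instead of creating a duplicate.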
                            cursor = conn.cursor()
                            cursor.executemany(f'''
                                INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades)
                                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                            ''', records_to_upsert)
                            conn.commit()

                            logging.debug(f"  -> Upserted {len(resampled_df)} candles into '{target_table_name}'.")

                            if coin not in self.resampling_status:
                                self.resampling_status[coin] = {}
                            total_candles = int(self._get_table_count(conn, target_table_name))
                            self.resampling_status[coin][tf_name] = {
                                "last_candle_utc": resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S'),
                                "total_candles": total_candles
                            }

                except Exception as e:
                    logging.error(f"Failed to process coin '{coin}': {e}")

        self._log_summary()
        self._save_status()
        logging.info(f"--- Resampling job finished at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} ---")

    def _log_summary(self):
        """Logs a summary of the total candles for each timeframe."""
        logging.info("--- Resampling Job Summary ---")
        timeframe_totals = {}
        for coin, tfs in self.resampling_status.items():
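            # Skip top-level metadata entries (e.g. the job timestamps) that
            # sit alongside the per-coin dicts in the status file.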
            if not isinstance(tfs, dict):
                continue
            for tf_name, tf_data in tfs.items():
                total = tf_data.get("total_candles", 0)
                if tf_name not in timeframe_totals:
                    timeframe_totals[tf_name] = 0
                timeframe_totals[tf_name] += total

        if not timeframe_totals:
            logging.info("No candles were resampled in this run.")
            return

        logging.info("Total candles per timeframe across all processed coins:")
        for tf_name, total in sorted(timeframe_totals.items()):
            logging.info(f"  - {tf_name:<10}: {total:,} candles")

    def _get_last_timestamp(self, conn, table_name):
        """Gets the millisecond timestamp of the last entry in a table."""
        try:
            # --- FIX: Query for the integer timestamp_ms, not the text datetime_utc ---
            timestamp_ms = pd.read_sql(f'SELECT MAX(timestamp_ms) FROM "{table_name}"', conn).iloc[0, 0]
            return int(timestamp_ms) if pd.notna(timestamp_ms) else None
        except (pd.io.sql.DatabaseError, IndexError):
            return None

    def _get_table_count(self, conn, table_name):
        """Gets the total row count of a table."""
        try:
            return pd.read_sql(f'SELECT COUNT(*) FROM "{table_name}"', conn).iloc[0, 0]
        except (pd.io.sql.DatabaseError, IndexError):
            return 0

    def _save_status(self):
        """Saves the final resampling status to a JSON file."""
        if not self.resampling_status:
            return

        stop_time = datetime.now(timezone.utc)
        self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S')
        self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S')

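        # Presumably a key written by an older version of this script; drop it
        # so stale metadata does not linger in the status file.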
        self.resampling_status.pop('last_completed_utc', None)

        try:
            with open(self.status_file_path, 'w', encoding='utf-8') as f:
                json.dump(self.resampling_status, f, indent=4, sort_keys=True)
            logging.info(f"Successfully saved resampling status to '{self.status_file_path}'")
        except IOError as e:
            logging.error(f"Failed to write resampling status file: {e}")


def parse_timeframes(tf_strings: list) -> dict:
    """Converts a list of timeframe strings into a dictionary of pandas resample codes."""
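    # Example mapping (derived from the rules below):
    #   ['5m', '4h', '1d', '1W', '1M'] ->
    #   {'5m': '5min', '4h': '4h', '1d': '1d', '1W': '1W-MON', '1month': '1MS'}
    # Note that 'm' means minutes while 'M' means months, hence the case-sensitive checks.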
    tf_map = {}
    for tf_str in tf_strings:
        numeric_part = ''.join(filter(str.isdigit, tf_str))
        unit = ''.join(filter(str.isalpha, tf_str))  # Keep case to distinguish 'm' from 'M'

        key = tf_str
        code = ''
        if unit == 'm':
            code = f"{numeric_part}min"
        elif unit.lower() == 'w':
            code = f"{numeric_part}W-MON"
        elif unit == 'M':
            code = f"{numeric_part}MS"
            key = f"{numeric_part}month"
        elif unit.lower() in ['h', 'd']:
            code = f"{numeric_part}{unit.lower()}"
        else:
            code = tf_str
            logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.")

        tf_map[key] = code
    return tf_map


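# Example invocation (script name, coins and timeframes are illustrative):
#   python resampler.py --coins BTCUSDT ETHUSDT --timeframes 5m 1h 4h 1d 1W 1M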
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.")
    parser.add_argument("--coins", nargs='+', required=True, help="List of coins to process.")
    parser.add_argument("--timeframes", nargs='+', required=True, help="List of timeframes to generate.")
    parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    args = parser.parse_args()

    timeframes_dict = parse_timeframes(args.timeframes)

    resampler = Resampler(log_level=args.log_level, coins=args.coins, timeframes=timeframes_dict)
    resampler.run()