import argparse import logging import os import sys import sqlite3 import pandas as pd import json from datetime import datetime, timezone, timedelta # Assuming logging_utils.py is in the same directory from logging_utils import setup_logging class Resampler: """ Reads new 1-minute candle data from the SQLite database, resamples it to various timeframes, and upserts the new candles to the corresponding tables, preventing data duplication. """ def __init__(self, log_level: str, coins: list, timeframes: dict): setup_logging(log_level, 'Resampler') self.db_path = os.path.join("_data", "market_data.db") self.status_file_path = os.path.join("_data", "resampling_status.json") self.coins_to_process = coins self.timeframes = timeframes self.aggregation_logic = { 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum', 'number_of_trades': 'sum' } self.resampling_status = self._load_existing_status() self.job_start_time = None self._ensure_tables_exist() def _ensure_tables_exist(self): """ Ensures all resampled tables exist with a PRIMARY KEY on timestamp_ms. Attempts to migrate existing tables if the schema is incorrect. """ with sqlite3.connect(self.db_path) as conn: for coin in self.coins_to_process: for tf_name in self.timeframes.keys(): table_name = f"{coin}_{tf_name}" cursor = conn.cursor() cursor.execute(f"PRAGMA table_info('{table_name}')") columns = cursor.fetchall() if columns: # --- FIX: Check for the correct PRIMARY KEY on timestamp_ms --- pk_found = any(col[1] == 'timestamp_ms' and col[5] == 1 for col in columns) if not pk_found: logging.warning(f"Schema migration needed for table '{table_name}'.") try: conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"') self._create_resampled_table(conn, table_name) # Copy data, ensuring to create the timestamp_ms logging.info(f" -> Migrating data for '{table_name}'...") old_df = pd.read_sql(f'SELECT * FROM "{table_name}_old"', conn, parse_dates=['datetime_utc']) if not old_df.empty: old_df['timestamp_ms'] = (old_df['datetime_utc'].astype('int64') // 10**6) # Keep only unique timestamps, preserving the last entry old_df.drop_duplicates(subset=['timestamp_ms'], keep='last', inplace=True) old_df.to_sql(table_name, conn, if_exists='append', index=False) logging.info(f" -> Data migration complete.") conn.execute(f'DROP TABLE "{table_name}_old"') conn.commit() logging.info(f"Successfully migrated schema for '{table_name}'.") except Exception as e: logging.error(f"FATAL: Migration for '{table_name}' failed: {e}. Please delete 'market_data.db' and restart.") sys.exit(1) else: self._create_resampled_table(conn, table_name) logging.info("All resampled table schemas verified.") def _create_resampled_table(self, conn, table_name): """Creates a new resampled table with the correct schema.""" # --- FIX: Set PRIMARY KEY on timestamp_ms for performance and uniqueness --- conn.execute(f''' CREATE TABLE "{table_name}" ( datetime_utc TEXT, timestamp_ms INTEGER PRIMARY KEY, open REAL, high REAL, low REAL, close REAL, volume REAL, number_of_trades INTEGER ) ''') def _load_existing_status(self) -> dict: """Loads the existing status file if it exists, otherwise returns an empty dict.""" if os.path.exists(self.status_file_path): try: with open(self.status_file_path, 'r', encoding='utf-8') as f: logging.debug(f"Loading existing status from '{self.status_file_path}'") return json.load(f) except (IOError, json.JSONDecodeError) as e: logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}") return {} def run(self): """ Main execution function to process all configured coins and update the database. """ self.job_start_time = datetime.now(timezone.utc) logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---") if '1m' in self.timeframes: logging.debug("Ignoring '1m' timeframe as it is the source resolution.") del self.timeframes['1m'] if not self.timeframes: logging.warning("No timeframes to process after filtering. Exiting job.") return if not os.path.exists(self.db_path): logging.error(f"Database file '{self.db_path}' not found.") return with sqlite3.connect(self.db_path) as conn: conn.execute("PRAGMA journal_mode=WAL;") logging.debug(f"Processing {len(self.coins_to_process)} coins...") for coin in self.coins_to_process: logging.debug(f"--- Processing {coin} ---") try: for tf_name, tf_code in self.timeframes.items(): target_table_name = f"{coin}_{tf_name}" source_table_name = f"{coin}_1m" logging.debug(f" Updating {tf_name} table...") last_timestamp_ms = self._get_last_timestamp(conn, target_table_name) query = f'SELECT * FROM "{source_table_name}"' params = () if last_timestamp_ms: query += ' WHERE timestamp_ms >= ?' # Go back one interval to rebuild the last (potentially partial) candle try: interval_delta_ms = pd.to_timedelta(tf_code).total_seconds() * 1000 except ValueError: # Fall back to a safe 32-day lookback for special timeframes interval_delta_ms = timedelta(days=32).total_seconds() * 1000 query_start_ms = last_timestamp_ms - interval_delta_ms params = (query_start_ms,) df_1m = pd.read_sql(query, conn, params=params, parse_dates=['datetime_utc']) if df_1m.empty: logging.debug(f" -> No new 1-minute data for {tf_name}. Table is up to date.") continue df_1m.set_index('datetime_utc', inplace=True) resampled_df = df_1m.resample(tf_code).agg(self.aggregation_logic) resampled_df.dropna(how='all', inplace=True) if not resampled_df.empty: records_to_upsert = [] for index, row in resampled_df.iterrows(): records_to_upsert.append(( index.strftime('%Y-%m-%d %H:%M:%S'), int(index.timestamp() * 1000), # Generate timestamp_ms row['open'], row['high'], row['low'], row['close'], row['volume'], row['number_of_trades'] )) cursor = conn.cursor() cursor.executemany(f''' INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, timestamp_ms, open, high, low, close, volume, number_of_trades) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', records_to_upsert) conn.commit() logging.debug(f" -> Upserted {len(resampled_df)} candles into '{target_table_name}'.") if coin not in self.resampling_status: self.resampling_status[coin] = {} total_candles = int(self._get_table_count(conn, target_table_name)) self.resampling_status[coin][tf_name] = { "last_candle_utc": resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S'), "total_candles": total_candles } except Exception as e: logging.error(f"Failed to process coin '{coin}': {e}") self._log_summary() self._save_status() logging.info(f"--- Resampling job finished at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} ---") def _log_summary(self): """Logs a summary of the total candles for each timeframe.""" logging.info("--- Resampling Job Summary ---") timeframe_totals = {} for coin, tfs in self.resampling_status.items(): if not isinstance(tfs, dict): continue for tf_name, tf_data in tfs.items(): total = tf_data.get("total_candles", 0) if tf_name not in timeframe_totals: timeframe_totals[tf_name] = 0 timeframe_totals[tf_name] += total if not timeframe_totals: logging.info("No candles were resampled in this run.") return logging.info("Total candles per timeframe across all processed coins:") for tf_name, total in sorted(timeframe_totals.items()): logging.info(f" - {tf_name:<10}: {total:,} candles") def _get_last_timestamp(self, conn, table_name): """Gets the millisecond timestamp of the last entry in a table.""" try: # --- FIX: Query for the integer timestamp_ms, not the text datetime_utc --- timestamp_ms = pd.read_sql(f'SELECT MAX(timestamp_ms) FROM "{table_name}"', conn).iloc[0, 0] return int(timestamp_ms) if pd.notna(timestamp_ms) else None except (pd.io.sql.DatabaseError, IndexError): return None def _get_table_count(self, conn, table_name): """Gets the total row count of a table.""" try: return pd.read_sql(f'SELECT COUNT(*) FROM "{table_name}"', conn).iloc[0, 0] except (pd.io.sql.DatabaseError, IndexError): return 0 def _save_status(self): """Saves the final resampling status to a JSON file.""" if not self.resampling_status: return stop_time = datetime.now(timezone.utc) self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S') self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S') self.resampling_status.pop('last_completed_utc', None) try: with open(self.status_file_path, 'w', encoding='utf-8') as f: json.dump(self.resampling_status, f, indent=4, sort_keys=True) logging.info(f"Successfully saved resampling status to '{self.status_file_path}'") except IOError as e: logging.error(f"Failed to write resampling status file: {e}") def parse_timeframes(tf_strings: list) -> dict: """Converts a list of timeframe strings into a dictionary for pandas.""" tf_map = {} for tf_str in tf_strings: numeric_part = ''.join(filter(str.isdigit, tf_str)) unit = ''.join(filter(str.isalpha, tf_str)) # Keep case for 'M' key = tf_str code = '' if unit == 'm': code = f"{numeric_part}min" elif unit.lower() == 'w': code = f"{numeric_part}W-MON" elif unit == 'M': code = f"{numeric_part}MS" key = f"{numeric_part}month" elif unit.lower() in ['h', 'd']: code = f"{numeric_part}{unit.lower()}" else: code = tf_str logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.") tf_map[key] = code return tf_map if __name__ == "__main__": parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.") parser.add_argument("--coins", nargs='+', required=True, help="List of coins to process.") parser.add_argument("--timeframes", nargs='+', required=True, help="List of timeframes to generate.") parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) args = parser.parse_args() timeframes_dict = parse_timeframes(args.timeframes) resampler = Resampler(log_level=args.log_level, coins=args.coins, timeframes=timeframes_dict) resampler.run()