From 656c4a9448e6dd5411f9facdb83b14f676eac46e Mon Sep 17 00:00:00 2001
From: DiTus
Date: Tue, 21 Oct 2025 23:07:07 +0200
Subject: [PATCH] Make the resampler much faster

The resampler now reads only the 1-minute candles written since the last
stored candle of each target table, instead of reloading the full 1m history
per coin, and writes via INSERT OR REPLACE so the job can run every minute
without duplicating rows.
---
 main_app.py  |  32 ++++++------
 resampler.py | 138 +++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 125 insertions(+), 45 deletions(-)

diff --git a/main_app.py b/main_app.py
index a53256b..aca61fb 100644
--- a/main_app.py
+++ b/main_app.py
@@ -16,7 +16,6 @@ from live_market_utils import start_live_feed
 
 # --- Configuration ---
 WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
-# --- FIX: Replaced old data_fetcher with the new live_candle_fetcher ---
 LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
 RESAMPLER_SCRIPT = "resampler.py"
 MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
@@ -27,6 +26,9 @@ MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
 LOGS_DIR = "_logs"
 TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")
 
+# --- ADDED: Standard list of timeframes for the resampler to generate ---
+STANDARD_RESAMPLING_TIMEFRAMES = ["3m", "5m", "15m", "30m", "37m", "148m", "1h", "2h", "4h", "8h", "12h", "1d", "3d", "1w", "1M"]
+
 
 def format_market_cap(mc_value):
     """Formats a large number into a human-readable market cap string."""
@@ -61,7 +63,7 @@ def run_resampler_job(timeframes_to_generate: list):
     """Defines the job for the resampler, redirecting output to a log file."""
     log_file = os.path.join(LOGS_DIR, "resampler.log")
     try:
-        command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--timeframes"] + timeframes_to_generate + ["--log-level", "off"]
+        command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--timeframes"] + timeframes_to_generate + ["--log-level", "normal"]
         with open(log_file, 'a') as f:
             f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n")
             subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
@@ -71,14 +73,17 @@ def run_resampler_job(timeframes_to_generate: list):
         f.write(f"Failed to run resampler.py job: {e}\n")
 
 
-def resampler_scheduler(timeframes_to_generate: list):
-    """Schedules the resampler.py script."""
+def resampler_scheduler():
+    """Schedules the resampler.py script to run at the start of every minute."""
     setup_logging('off', 'ResamplerScheduler')
-    run_resampler_job(timeframes_to_generate)
-    schedule.every(4).minutes.do(run_resampler_job, timeframes_to_generate)
+    # Run once at startup
+    run_resampler_job(STANDARD_RESAMPLING_TIMEFRAMES)
+    # Schedule to run every minute at the :01 second mark
+    schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=STANDARD_RESAMPLING_TIMEFRAMES)
+    logging.info("Resampler scheduled to run every minute at :01.")
     while True:
         schedule.run_pending()
-        time.sleep(1)
+        time.sleep(1)  # Check every second so the scheduled :01 mark is not missed
 
 
 def run_market_cap_fetcher_job():
@@ -314,22 +319,13 @@ if __name__ == "__main__":
         logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")
         sys.exit(1)
 
-    required_timeframes = set()
-    for name, config in strategy_configs.items():
-        if config.get("enabled", False):
-            tf = config.get("parameters", {}).get("timeframe")
-            if tf:
-                required_timeframes.add(tf)
-
-    if not required_timeframes:
-        logging.warning("No timeframes required by any enabled strategy.")
-
     with multiprocessing.Manager() as manager:
         shared_prices = manager.dict()
 
         processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True)
         processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
-        processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
+        # --- FIX: The resampler now uses a fixed list of TFs and a new schedule ---
+        processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True)
         processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)
         processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, daemon=True)
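A note on the new schedule: with the `schedule` library used above, `every().minute.at(":01")` takes a seconds offset within the minute, so the job fires at HH:MM:01 of every minute, just after each 1-minute candle closes. A minimal standalone sketch of the pattern (the job body is a placeholder, not the real resampler job):

    import time
    import schedule

    def tick():
        print("resampling tick")

    # ":01" is a seconds offset, so this fires at second 01 of every minute
    schedule.every().minute.at(":01").do(tick)

    while True:
        schedule.run_pending()
        time.sleep(1)  # poll once per second so the :01 mark is not missed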
diff --git a/resampler.py b/resampler.py
index 09fec2d..ea489b9 100644
--- a/resampler.py
+++ b/resampler.py
@@ -5,7 +5,7 @@ import sys
 import sqlite3
 import pandas as pd
 import json
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
 
 # Assuming logging_utils.py is in the same directory
 from logging_utils import setup_logging
@@ -13,7 +13,8 @@ from logging_utils import setup_logging
 
 class Resampler:
     """
     Reads new 1-minute candle data from the SQLite database, resamples it to
-    various timeframes, and appends the new candles to the corresponding tables.
+    various timeframes, and upserts the new candles to the corresponding tables,
+    preventing data duplication.
     """
 
@@ -32,6 +33,51 @@ class Resampler:
         }
         self.resampling_status = self._load_existing_status()
         self.job_start_time = None
+        self._ensure_tables_exist()
+
+    def _ensure_tables_exist(self):
+        """
+        Ensures all resampled tables exist with a PRIMARY KEY on datetime_utc.
+        Attempts to migrate existing tables if the schema is incorrect.
+        """
+        with sqlite3.connect(self.db_path) as conn:
+            for coin in self.coins_to_process:
+                for tf_name in self.timeframes.keys():
+                    table_name = f"{coin}_{tf_name}"
+                    cursor = conn.cursor()
+                    cursor.execute(f"PRAGMA table_info('{table_name}')")
+                    columns = cursor.fetchall()
+                    if columns:
+                        pk_found = any(col[1] == 'datetime_utc' and col[5] == 1 for col in columns)
+                        if not pk_found:
+                            logging.warning(f"Schema migration needed for table '{table_name}'.")
+                            try:
+                                conn.execute(f'ALTER TABLE "{table_name}" RENAME TO "{table_name}_old"')
+                                self._create_resampled_table(conn, table_name)
+                                conn.execute(f'INSERT OR IGNORE INTO "{table_name}" SELECT * FROM "{table_name}_old"')
+                                conn.execute(f'DROP TABLE "{table_name}_old"')
+                                conn.commit()
+                                logging.info(f"Successfully migrated schema for '{table_name}'.")
+                            except Exception as e:
+                                logging.error(f"FATAL: Migration for '{table_name}' failed: {e}. Please delete 'market_data.db' and restart.")
+                                sys.exit(1)
+                    else:
+                        self._create_resampled_table(conn, table_name)
+        logging.info("All resampled table schemas verified.")
+
+    def _create_resampled_table(self, conn, table_name):
+        """Creates a new resampled table with the correct schema."""
+        conn.execute(f'''
+            CREATE TABLE "{table_name}" (
+                datetime_utc TEXT PRIMARY KEY,
+                open REAL,
+                high REAL,
+                low REAL,
+                close REAL,
+                volume REAL,
+                number_of_trades INTEGER
+            )
+        ''')
 
     def _load_existing_status(self) -> dict:
         """Loads the existing status file if it exists, otherwise returns an empty dict."""
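For context on the PRAGMA check above: `table_info` returns one row per column as (cid, name, type, notnull, dflt_value, pk), where `pk` is the column's 1-based position within the primary key (0 if it is not part of it). A small self-contained illustration with a dummy table name:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE "BTC_5m" (datetime_utc TEXT PRIMARY KEY, close REAL)')
    columns = conn.execute("PRAGMA table_info('BTC_5m')").fetchall()
    # col[1] is the column name, col[5] its position in the primary key
    pk_found = any(col[1] == 'datetime_utc' and col[5] == 1 for col in columns)
    print(pk_found)  # True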
@@ -51,6 +97,14 @@
         self.job_start_time = datetime.now(timezone.utc)
         logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---")
 
+        if '1m' in self.timeframes:
+            logging.debug("Ignoring '1m' timeframe as it is the source resolution.")
+            del self.timeframes['1m']
+
+        if not self.timeframes:
+            logging.warning("No timeframes to process after filtering. Exiting job.")
+            return
+
         if not os.path.exists(self.db_path):
             logging.error(f"Database file '{self.db_path}' not found.")
             return
@@ -61,37 +115,58 @@
         logging.debug(f"Processing {len(self.coins_to_process)} coins...")
 
         for coin in self.coins_to_process:
-            source_table_name = f"{coin}_1m"
             logging.debug(f"--- Processing {coin} ---")
             try:
-                # Load the full 1m history once per coin
-                df_1m = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn, parse_dates=['datetime_utc'])
-                if df_1m.empty:
-                    logging.warning(f"Source table '{source_table_name}' is empty. Skipping.")
-                    continue
-                df_1m.set_index('datetime_utc', inplace=True)
-
                 for tf_name, tf_code in self.timeframes.items():
                     target_table_name = f"{coin}_{tf_name}"
+                    source_table_name = f"{coin}_1m"
                     logging.debug(f"  Updating {tf_name} table...")
                     last_timestamp = self._get_last_timestamp(conn, target_table_name)
 
-                    # Get the new 1-minute data that needs to be processed
-                    new_df_1m = df_1m[df_1m.index > last_timestamp] if last_timestamp else df_1m
+                    query = f'SELECT * FROM "{source_table_name}"'
+                    params = ()
+                    if last_timestamp:
+                        query += ' WHERE datetime_utc >= ?'
+                        try:
+                            # --- FIX: Try the fast method first ---
+                            interval_delta = pd.to_timedelta(tf_code)
+                            query_start_date = last_timestamp - interval_delta
+                        except ValueError:
+                            # --- FIX: Fall back to the safe method for special timeframes ---
+                            logging.debug(f"Cannot create timedelta for '{tf_code}'. Using safe 32-day lookback.")
+                            query_start_date = last_timestamp - timedelta(days=32)
+
+                        params = (query_start_date.strftime('%Y-%m-%d %H:%M:%S'),)
 
-                    if new_df_1m.empty:
+                    df_1m = pd.read_sql(query, conn, params=params, parse_dates=['datetime_utc'])
+
+                    if df_1m.empty:
                         logging.debug(f"  -> No new 1-minute data for {tf_name}. Table is up to date.")
                         continue
 
-                    resampled_df = new_df_1m.resample(tf_code).agg(self.aggregation_logic)
+                    df_1m.set_index('datetime_utc', inplace=True)
+                    resampled_df = df_1m.resample(tf_code).agg(self.aggregation_logic)
                     resampled_df.dropna(how='all', inplace=True)
 
                     if not resampled_df.empty:
-                        # Append the newly resampled data to the target table
-                        resampled_df.to_sql(target_table_name, conn, if_exists='append', index=True)
-                        logging.debug(f"  -> Appended {len(resampled_df)} new candles to '{target_table_name}'.")
+                        records_to_upsert = []
+                        for index, row in resampled_df.iterrows():
+                            records_to_upsert.append((
+                                index.strftime('%Y-%m-%d %H:%M:%S'),
+                                row['open'], row['high'], row['low'], row['close'],
+                                row['volume'], row['number_of_trades']
+                            ))
+
+                        cursor = conn.cursor()
+                        cursor.executemany(f'''
+                            INSERT OR REPLACE INTO "{target_table_name}" (datetime_utc, open, high, low, close, volume, number_of_trades)
+                            VALUES (?, ?, ?, ?, ?, ?, ?)
+                        ''', records_to_upsert)
+                        conn.commit()
+
+                        logging.debug(f"  -> Upserted {len(resampled_df)} candles into '{target_table_name}'.")
                         if coin not in self.resampling_status:
                             self.resampling_status[coin] = {}
                         total_candles = int(self._get_table_count(conn, target_table_name))
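The INSERT OR REPLACE above is what makes re-running the job safe: because `datetime_utc` is the PRIMARY KEY, rewriting a candle that already exists replaces the row instead of appending a duplicate, so the still-open last candle is simply refreshed on each run. A toy demonstration with dummy values:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE "BTC_5m" (datetime_utc TEXT PRIMARY KEY, close REAL)')
    # Two writes to the same candle timestamp, e.g. first a partial then a
    # refreshed version of the same bucket
    rows = [("2025-10-21 23:05:00", 1.0), ("2025-10-21 23:05:00", 2.0)]
    conn.executemany('INSERT OR REPLACE INTO "BTC_5m" VALUES (?, ?)', rows)
    print(conn.execute('SELECT * FROM "BTC_5m"').fetchall())
    # [('2025-10-21 23:05:00', 2.0)] -- one row; the later write wins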
@@ -111,7 +186,6 @@
         """Logs a summary of the total candles for each timeframe."""
         logging.info("--- Resampling Job Summary ---")
         timeframe_totals = {}
-        # Iterate through coins, skipping metadata keys
         for coin, tfs in self.resampling_status.items():
             if not isinstance(tfs, dict): continue
             for tf_name, tf_data in tfs.items():
@@ -129,9 +203,10 @@
             logging.info(f"  - {tf_name:<10}: {total:,} candles")
 
     def _get_last_timestamp(self, conn, table_name):
-        """Gets the timestamp of the last entry in a table."""
+        """Gets the timestamp of the last entry in a table as a pandas Timestamp."""
         try:
-            return pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
+            timestamp_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
+            return pd.to_datetime(timestamp_str) if timestamp_str else None
         except (pd.io.sql.DatabaseError, IndexError):
             return None
 
@@ -151,7 +226,6 @@
         self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S')
         self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S')
 
-        # Clean up old key if it exists from previous versions
        self.resampling_status.pop('last_completed_utc', None)
 
         try:
@@ -167,14 +241,24 @@ def parse_timeframes(tf_strings: list) -> dict:
     tf_map = {}
     for tf_str in tf_strings:
         numeric_part = ''.join(filter(str.isdigit, tf_str))
-        unit = ''.join(filter(str.isalpha, tf_str)).lower()
+        unit = ''.join(filter(str.isalpha, tf_str))  # Keep case for 'M'
+        key = tf_str
         code = ''
-        if unit == 'm': code = f"{numeric_part}min"
-        elif unit == 'w': code = f"{numeric_part}W"
-        elif unit in ['h', 'd']: code = f"{numeric_part}{unit}"
-        else: code = tf_str
-        tf_map[tf_str] = code
+        if unit == 'm':
+            code = f"{numeric_part}min"
+        elif unit.lower() == 'w':
+            code = f"{numeric_part}W-MON"
+        elif unit == 'M':
+            code = f"{numeric_part}MS"
+            key = f"{numeric_part}month"
+        elif unit.lower() in ['h', 'd']:
+            code = f"{numeric_part}{unit.lower()}"
+        else:
+            code = tf_str
+            logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.")
+
+        tf_map[key] = code
     return tf_map
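The keys of the returned map become table-name suffixes and the values are pandas offset aliases: "min" for minutes, "W-MON" for Monday-anchored weeks, and "MS" for month starts. Note that month and week aliases are not fixed-length timedeltas, which is why the resampling loop above falls back to the 32-day lookback for them. Expected behaviour of the new mapping, assuming resampler.py imports cleanly:

    from resampler import parse_timeframes

    print(parse_timeframes(["3m", "1h", "1w", "1M"]))
    # {'3m': '3min', '1h': '1h', '1w': '1W-MON', '1month': '1MS'}

The "1M" input is the reason the unit's case is preserved: lowercasing would collapse monthly candles into 1-minute codes, while the "1month" key keeps the monthly table name distinct from the "{coin}_1m" source table.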