import logging
import os
import sys
import sqlite3
import pandas as pd
import json
from datetime import datetime, timezone
import time

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class Resampler:
    """
    Reads 1-minute candle data directly from the SQLite database, resamples it
    to various timeframes, and stores the results back in the database.
    This script is designed to run continuously as a self-scheduling service.
    """

    def __init__(self, log_level: str, coins: list, timeframes: dict):
        setup_logging(log_level, 'Resampler')
        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join("_data", "resampling_status.json")
        self.coins_to_process = coins
        self.timeframes = timeframes
        self.aggregation_logic = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum',
            'number_of_trades': 'sum'
        }
        self.resampling_status = {}

    def _execute_resampling_job(self):
        """
        Main execution function to process all configured coins and update the database.
        """
        if not os.path.exists(self.db_path):
            logging.error(f"Database file '{self.db_path}' not found. "
                          "Please run the data fetcher script first.")
            return  # Don't exit, just wait for the next cycle

        # Load the latest status file at the start of each job
        self.resampling_status = self._load_existing_status()

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")
            logging.info(f"Processing {len(self.coins_to_process)} coins: {', '.join(self.coins_to_process)}")

            for coin in self.coins_to_process:
                source_table_name = f"{coin}_1m"
                logging.info(f"--- Processing {coin} ---")
                try:
                    df = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn)
                    if df.empty:
                        logging.warning(f"Source table '{source_table_name}' is empty or does not exist. Skipping.")
                        continue

                    df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
                    df.set_index('datetime_utc', inplace=True)

                    for tf_name, tf_code in self.timeframes.items():
                        logging.info(f"  Resampling to {tf_name}...")
                        resampled_df = df.resample(tf_code).agg(self.aggregation_logic)
                        resampled_df.dropna(how='all', inplace=True)

                        if coin not in self.resampling_status:
                            self.resampling_status[coin] = {}

                        if not resampled_df.empty:
                            target_table_name = f"{coin}_{tf_name}"
                            resampled_df.to_sql(
                                target_table_name, conn,
                                if_exists='replace', index=True
                            )
                            last_timestamp = resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S')
                            num_candles = len(resampled_df)
                            self.resampling_status[coin][tf_name] = {
                                "last_candle_utc": last_timestamp,
                                "total_candles": num_candles
                            }
                        else:
                            logging.info(f"  -> No data to save for '{coin}_{tf_name}'.")
                            self.resampling_status[coin][tf_name] = {
                                "last_candle_utc": "N/A",
                                "total_candles": 0
                            }
                except pd.io.sql.DatabaseError as e:
                    logging.warning(f"Could not read source table '{source_table_name}': {e}")
                except Exception as e:
                    logging.error(f"Failed to process coin '{coin}': {e}")

        self._save_status()
        logging.info("--- Resampling job complete ---")

    def run_periodically(self):
        """Runs the resampling job at every 5-minute mark of the hour (00, 05, 10...)."""
        logging.info("Resampler started. Waiting for the first scheduled run...")
        while True:
            # 1. Calculate how far past the last 5-minute mark we are, in seconds
            now = datetime.now(timezone.utc)
            minutes_past_mark = now.minute % 5
            seconds_past_mark = minutes_past_mark * 60 + now.second + (now.microsecond / 1_000_000)

            # The total interval is 5 minutes (300 seconds)
            sleep_duration = 300 - seconds_past_mark
            # Add a small buffer to ensure the candle data is ready
            sleep_duration += 5

            logging.info(f"Next resampling run in {sleep_duration:.2f} seconds.")
            time.sleep(sleep_duration)

            # 2. Execute the job
            logging.info("Scheduled time reached. Starting resampling job...")
            self._execute_resampling_job()

    def _load_existing_status(self) -> dict:
        """Loads the existing status file if it exists, otherwise returns an empty dict."""
        if os.path.exists(self.status_file_path):
            try:
                with open(self.status_file_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (IOError, json.JSONDecodeError) as e:
                logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}")
        return {}

    def _save_status(self):
        """Saves the final resampling status to a JSON file."""
        if not self.resampling_status:
            logging.warning("No data was resampled, skipping status file creation.")
            return

        self.resampling_status['last_completed_utc'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
        try:
            with open(self.status_file_path, 'w', encoding='utf-8') as f:
                json.dump(self.resampling_status, f, indent=4, sort_keys=True)
            logging.info(f"Successfully saved resampling status to '{self.status_file_path}'")
        except IOError as e:
            logging.error(f"Failed to write resampling status file: {e}")


def parse_timeframes(tf_strings: list) -> dict:
    """Converts a list of timeframe strings into a dictionary of pandas offset aliases."""
    tf_map = {}
    for tf_str in tf_strings:
        numeric_part = ''.join(filter(str.isdigit, tf_str))
        unit = ''.join(filter(str.isalpha, tf_str)).lower()
        if unit == 'm':
            code = f"{numeric_part}min"  # 'm' means minutes; pandas uses 'min'
        elif unit == 'w':
            code = f"{numeric_part}W"    # pandas weekly alias is uppercase 'W'
        elif unit in ('h', 'd'):
            code = f"{numeric_part}{unit}"
        else:
            code = tf_str
            logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.")
        tf_map[tf_str] = code
    return tf_map


if __name__ == "__main__":
    # The script runs as a long-running service, loading its config from a file.
    CONFIG_FILE = "resampler_conf.json"
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            config = json.load(f)
        coins = config.get("coins", [])
        timeframes_list = config.get("timeframes", [])
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"FATAL: Could not load '{CONFIG_FILE}'. Please ensure it exists and is valid. Error: {e}")
        sys.exit(1)

    # Use a basic log level until the class is initialized
    setup_logging('normal', 'Resampler')

    timeframes_dict = parse_timeframes(timeframes_list)
    resampler = Resampler(
        log_level='normal',
        coins=coins,
        timeframes=timeframes_dict
    )
    try:
        resampler.run_periodically()
    except KeyboardInterrupt:
        logging.info("Resampler process stopped.")
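
# ---------------------------------------------------------------------------
# Example resampler_conf.json (a minimal sketch): "coins" and "timeframes" are
# the only keys this script reads. The symbols and timeframe strings below are
# illustrative assumptions, not values from a real deployment. Each coin is
# expected to have a 1-minute source table named "<coin>_1m" in the database.
#
# {
#     "coins": ["BTCUSDT", "ETHUSDT"],
#     "timeframes": ["5m", "15m", "1h", "4h", "1d", "1w"]
# }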
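#
# For reference, parse_timeframes maps a list like the one above to pandas
# offset aliases as follows (derived from the mapping rules in the function):
#
#     parse_timeframes(["5m", "1h", "1d", "1w"])
#     -> {"5m": "5min", "1h": "1h", "1d": "1d", "1w": "1W"}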
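#
# The resulting _data/resampling_status.json takes the shape below (timestamps
# and counts are hypothetical; note that "last_completed_utc" sits alongside
# the per-coin entries at the top level, as written by _save_status()):
#
# {
#     "BTCUSDT": {
#         "5m": {"last_candle_utc": "2024-06-01 12:00:00", "total_candles": 105120},
#         ...
#     },
#     "last_completed_utc": "2024-06-01 12:05:07"
# }
# ---------------------------------------------------------------------------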