Files
hyper/resampler.py
2025-10-18 15:10:46 +02:00

193 lines
8.4 KiB
Python

import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
import json
from datetime import datetime, timezone
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
class Resampler:
    """
    Reads 1-minute candle data from the SQLite database, resamples it to
    various timeframes, and writes the resulting candles to the corresponding tables.

    Each run refreshes the most recent candle already stored in a target table
    (it may have been built from an incomplete bucket) and appends every newer
    candle, so repeated runs never produce duplicate timestamps.
    """

    def __init__(self, log_level: str, coins: list, timeframes: dict):
        """
        Args:
            log_level: Level name forwarded to ``setup_logging``.
            coins: Coin names; each is expected to have a ``<coin>_1m`` source table.
            timeframes: Mapping of timeframe name (used as the table suffix)
                to the pandas resample code for that timeframe.
        """
        setup_logging(log_level, 'Resampler')
        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join("_data", "resampling_status.json")
        self.coins_to_process = coins
        self.timeframes = timeframes
        # How each 1-minute column is folded into a higher-timeframe candle.
        self.aggregation_logic = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum',
            'number_of_trades': 'sum'
        }
        self.resampling_status = self._load_existing_status()
        self.job_start_time = None

    @staticmethod
    def _quote_identifier(name) -> str:
        """Returns ``name`` as a double-quoted SQL identifier, doubling embedded quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    def _load_existing_status(self) -> dict:
        """Loads the existing status file if it exists, otherwise returns an empty dict."""
        if os.path.exists(self.status_file_path):
            try:
                with open(self.status_file_path, 'r', encoding='utf-8') as f:
                    logging.debug(f"Loading existing status from '{self.status_file_path}'")
                    loaded = json.load(f)
                # The rest of the class indexes the status by coin name, so
                # anything other than a JSON object is unusable.
                if isinstance(loaded, dict):
                    return loaded
                logging.warning("Could not read existing status file. Starting fresh. Error: top-level JSON value is not an object")
            except (IOError, json.JSONDecodeError) as e:
                logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}")
        return {}

    def run(self):
        """
        Main execution function to process all configured coins and update the database.

        A failure while processing one coin is logged and does not stop the
        remaining coins. The status summary is logged and saved at the end.
        """
        self.job_start_time = datetime.now(timezone.utc)
        logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---")
        if not os.path.exists(self.db_path):
            logging.error(f"Database file '{self.db_path}' not found.")
            return

        conn = sqlite3.connect(self.db_path)
        try:
            # ``with conn`` only commits/rolls back; closing is done in ``finally``.
            with conn:
                conn.execute("PRAGMA journal_mode=WAL;")
                logging.debug(f"Processing {len(self.coins_to_process)} coins...")
                for coin in self.coins_to_process:
                    logging.debug(f"--- Processing {coin} ---")
                    try:
                        self._process_coin(conn, coin)
                    except Exception as e:
                        # Per-coin boundary: discard any half-written change
                        # for this coin and carry on with the next one.
                        conn.rollback()
                        logging.error(f"Failed to process coin '{coin}': {e}")
        finally:
            conn.close()

        self._log_summary()
        self._save_status()
        logging.info(f"--- Resampling job finished at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} ---")

    def _process_coin(self, conn, coin):
        """Loads the 1-minute history of ``coin`` once and updates every configured timeframe."""
        source_table_name = f"{coin}_1m"
        df_1m = pd.read_sql(
            f'SELECT * FROM {self._quote_identifier(source_table_name)}',
            conn,
            parse_dates=['datetime_utc'],
        )
        if df_1m.empty:
            logging.warning(f"Source table '{source_table_name}' is empty. Skipping.")
            return
        df_1m.set_index('datetime_utc', inplace=True)
        # 'first'/'last' aggregation depends on row order inside each bucket.
        if not df_1m.index.is_monotonic_increasing:
            df_1m = df_1m.sort_index(kind="stable")

        for tf_name, tf_code in self.timeframes.items():
            self._update_timeframe(conn, coin, tf_name, tf_code, df_1m)

    def _update_timeframe(self, conn, coin, tf_name, tf_code, df_1m):
        """
        Resamples ``df_1m`` with ``tf_code`` and writes the candles that are
        new or need refreshing to the ``<coin>_<tf_name>`` table.
        """
        target_table_name = f"{coin}_{tf_name}"
        logging.debug(f" Updating {tf_name} table...")
        last_timestamp = self._get_last_timestamp(conn, target_table_name)

        # Resample the full history so that bucket boundaries are identical on
        # every run, regardless of how the resample code labels its buckets.
        resampled_df = df_1m.resample(tf_code).agg(self.aggregation_logic)
        # Buckets without any 1-minute rows have NaN prices but a summed
        # volume of 0, so emptiness must be judged on the price columns only.
        price_columns = [col for col, how in self.aggregation_logic.items() if how != 'sum']
        resampled_df.dropna(subset=price_columns or None, how='all', inplace=True)

        if last_timestamp is not None:
            # Keep the last stored candle too: it may have been computed from
            # a bucket that was still incomplete during the previous run.
            resampled_df = resampled_df[resampled_df.index >= last_timestamp]
        if resampled_df.empty:
            logging.debug(f" -> No new 1-minute data for {tf_name}. Table is up to date.")
            return

        if last_timestamp is not None:
            # Remove the candle being refreshed; the DELETE and the insert
            # below share one transaction, so a failed insert undoes it.
            conn.execute(
                f'DELETE FROM {self._quote_identifier(target_table_name)} WHERE datetime_utc >= ?',
                (last_timestamp,),
            )
        resampled_df.to_sql(target_table_name, conn, if_exists='append', index=True)
        logging.debug(f" -> Appended {len(resampled_df)} new candles to '{target_table_name}'.")

        coin_status = self.resampling_status.setdefault(coin, {})
        coin_status[tf_name] = {
            "last_candle_utc": resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S'),
            # Plain int so the value stays JSON-serialisable.
            "total_candles": int(self._get_table_count(conn, target_table_name))
        }

    def _log_summary(self):
        """Logs a summary of the total candles for each timeframe."""
        logging.info("--- Resampling Job Summary ---")
        timeframe_totals = {}
        # Iterate through coins, skipping metadata keys (their values are not dicts)
        for coin, tfs in self.resampling_status.items():
            if not isinstance(tfs, dict):
                continue
            for tf_name, tf_data in tfs.items():
                timeframe_totals[tf_name] = timeframe_totals.get(tf_name, 0) + tf_data.get("total_candles", 0)

        if not timeframe_totals:
            logging.info("No candles were resampled in this run.")
            return

        logging.info("Total candles per timeframe across all processed coins:")
        for tf_name, total in sorted(timeframe_totals.items()):
            logging.info(f" - {tf_name:<10}: {total:,} candles")

    def _get_last_timestamp(self, conn, table_name):
        """
        Gets the timestamp of the last entry in a table.

        Returns None when the table cannot be queried or holds no rows.
        """
        try:
            value = pd.read_sql(
                f'SELECT MAX(datetime_utc) FROM {self._quote_identifier(table_name)}', conn
            ).iloc[0, 0]
        except (pd.io.sql.DatabaseError, IndexError):
            return None
        # MAX() over an empty table yields NULL, which may surface as None or NaN.
        return None if pd.isna(value) else value

    def _get_table_count(self, conn, table_name):
        """Gets the total row count of a table, or 0 when it cannot be queried."""
        try:
            return pd.read_sql(
                f'SELECT COUNT(*) FROM {self._quote_identifier(table_name)}', conn
            ).iloc[0, 0]
        except (pd.io.sql.DatabaseError, IndexError):
            return 0

    def _save_status(self):
        """Saves the final resampling status to a JSON file (skipped when there is no status)."""
        if not self.resampling_status:
            return

        stop_time = datetime.now(timezone.utc)
        # Fall back to the stop time if the job start was never recorded.
        start_time = self.job_start_time or stop_time
        self.resampling_status['job_start_time_utc'] = start_time.strftime('%Y-%m-%d %H:%M:%S')
        self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S')
        # Clean up old key if it exists from previous versions
        self.resampling_status.pop('last_completed_utc', None)

        try:
            with open(self.status_file_path, 'w', encoding='utf-8') as f:
                json.dump(self.resampling_status, f, indent=4, sort_keys=True)
            logging.info(f"Successfully saved resampling status to '{self.status_file_path}'")
        except IOError as e:
            logging.error(f"Failed to write resampling status file: {e}")
def parse_timeframes(tf_strings: list) -> dict:
    """
    Converts a list of timeframe strings into a dictionary for pandas.

    Each input string (e.g. ``"5m"``, ``"1h"``, ``"1d"``, ``"1w"``, ``"1M"``)
    becomes a key whose value is the matching pandas frequency code.
    Upper-case ``M`` means months (mapped to month-start ``MS``) and is kept
    distinct from lower-case ``m`` (minutes). Strings with an unrecognised
    unit are passed through unchanged.
    """
    tf_map = {}
    for tf_str in tf_strings:
        numeric_part = ''.join(filter(str.isdigit, tf_str))
        raw_unit = ''.join(filter(str.isalpha, tf_str))
        unit = raw_unit.lower()

        if raw_unit == 'M':
            # Must be checked before lower-casing, otherwise a monthly
            # timeframe would silently be treated as minutes.
            code = f"{numeric_part}MS"
        elif unit == 'm':
            code = f"{numeric_part}min"
        elif unit == 'w':
            code = f"{numeric_part}W"
        elif unit == 'h':
            code = f"{numeric_part}h"
        elif unit == 'd':
            # 'D' is the canonical pandas day alias.
            code = f"{numeric_part}D"
        else:
            code = tf_str
        tf_map[tf_str] = code
    return tf_map
if __name__ == "__main__":
    # Command-line entry point: read the coin/timeframe selection and run one resampling job.
    cli = argparse.ArgumentParser(
        description="Resample 1-minute candle data from SQLite to other timeframes."
    )
    cli.add_argument(
        "--coins",
        nargs='+',
        required=True,
        help="List of coins to process.",
    )
    cli.add_argument(
        "--timeframes",
        nargs='+',
        required=True,
        help="List of timeframes to generate.",
    )
    cli.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
    )
    cli_args = cli.parse_args()

    # Translate the user-facing timeframe names into pandas codes before building the job.
    pandas_codes = parse_timeframes(cli_args.timeframes)
    job = Resampler(
        log_level=cli_args.log_level,
        coins=cli_args.coins,
        timeframes=pandas_codes,
    )
    job.run()