resampler.py
@@ -5,17 +5,15 @@ import sys
import sqlite3
import pandas as pd
import json
from datetime import datetime, timezone, timedelta
import time
from datetime import datetime, timezone

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging

class Resampler:
    """
    Reads 1-minute candle data directly from the SQLite database, resamples
    it to various timeframes, and stores the results back in the database.
    This script is designed to run continuously as a self-scheduling service.
    Reads new 1-minute candle data from the SQLite database, resamples it to
    various timeframes, and appends the new candles to the corresponding tables.
    """

    def __init__(self, log_level: str, coins: list, timeframes: dict):
@@ -32,120 +30,130 @@ class Resampler:
            'volume': 'sum',
            'number_of_trades': 'sum'
        }
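        # NOTE: this hunk begins mid-dictionary; the elided earlier entries
        # presumably map the OHLC columns to pandas aggregators (e.g.
        # 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last')
        # so that .agg() yields valid candles at the coarser timeframe.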
        self.resampling_status = {}

    def _execute_resampling_job(self):
        """
        Main execution function to process all configured coins and update the database.
        """
        if not os.path.exists(self.db_path):
            logging.error(f"Database file '{self.db_path}' not found. "
                          "Please run the data fetcher script first.")
            return  # Don't exit, just wait for the next cycle

        # Load the latest status file at the start of each job
        self.resampling_status = self._load_existing_status()

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")
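            # WAL mode lets readers proceed concurrently with a single writer,
            # so this service can read while the fetcher process keeps writing.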

            logging.info(f"Processing {len(self.coins_to_process)} coins: {', '.join(self.coins_to_process)}")

            for coin in self.coins_to_process:
                source_table_name = f"{coin}_1m"
                logging.info(f"--- Processing {coin} ---")

                try:
                    df = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn)

                    if df.empty:
                        logging.warning(f"Source table '{source_table_name}' is empty or does not exist. Skipping.")
                        continue

                    df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
                    df.set_index('datetime_utc', inplace=True)

                    for tf_name, tf_code in self.timeframes.items():
                        logging.info(f" Resampling to {tf_name}...")

                        resampled_df = df.resample(tf_code).agg(self.aggregation_logic)
                        resampled_df.dropna(how='all', inplace=True)

                        if coin not in self.resampling_status:
                            self.resampling_status[coin] = {}

                        if not resampled_df.empty:
                            target_table_name = f"{coin}_{tf_name}"
                            resampled_df.to_sql(
                                target_table_name,
                                conn,
                                if_exists='replace',
                                index=True
                            )

                            last_timestamp = resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S')
                            num_candles = len(resampled_df)

                            self.resampling_status[coin][tf_name] = {
                                "last_candle_utc": last_timestamp,
                                "total_candles": num_candles
                            }
                        else:
                            logging.info(f" -> No data to save for '{coin}_{tf_name}'.")
                            self.resampling_status[coin][tf_name] = {
                                "last_candle_utc": "N/A",
                                "total_candles": 0
                            }

                except pd.io.sql.DatabaseError as e:
                    logging.warning(f"Could not read source table '{source_table_name}': {e}")
                except Exception as e:
                    logging.error(f"Failed to process coin '{coin}': {e}")

        self._save_status()
        logging.info("--- Resampling job complete ---")

    def run_periodically(self):
        """Runs the resampling job at every 5-minute mark of the hour (00, 05, 10, ...)."""
        logging.info("Resampler started. Waiting for the first scheduled run...")
        while True:
            # 1. Calculate sleep time
            now = datetime.now(timezone.utc)
            # Calculate how many minutes past the last 5-minute mark we are
            minutes_past_mark = now.minute % 5
            seconds_past_mark = minutes_past_mark * 60 + now.second + (now.microsecond / 1_000_000)

            # The total interval is 5 minutes (300 seconds)
            sleep_duration = 300 - seconds_past_mark

            # Add a small buffer to ensure the candle data is ready
            sleep_duration += 5
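            # Worked example (hypothetical clock time): at 10:03:30.000 UTC,
            # minutes_past_mark = 3 and seconds_past_mark = 210.0, so
            # sleep_duration = 300 - 210 + 5 = 95 and the job wakes at
            # 10:05:05 UTC, just after the next 5-minute mark.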

            logging.info(f"Next resampling run in {sleep_duration:.2f} seconds.")
            time.sleep(sleep_duration)

            # 2. Execute the job
            logging.info("Scheduled time reached. Starting resampling job...")
            self._execute_resampling_job()
            self.job_start_time = None

    def _load_existing_status(self) -> dict:
        """Loads the existing status file if it exists, otherwise returns an empty dict."""
        if os.path.exists(self.status_file_path):
            try:
                with open(self.status_file_path, 'r', encoding='utf-8') as f:
                    logging.debug(f"Loading existing status from '{self.status_file_path}'")
                    return json.load(f)
            except (IOError, json.JSONDecodeError) as e:
                logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}")
        return {}

    def run(self):
        """
        Main execution function to process all configured coins and update the database.
        """
        self.job_start_time = datetime.now(timezone.utc)
        logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---")

        if not os.path.exists(self.db_path):
            logging.error(f"Database file '{self.db_path}' not found.")
            return

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")

            logging.debug(f"Processing {len(self.coins_to_process)} coins...")

            for coin in self.coins_to_process:
                source_table_name = f"{coin}_1m"
                logging.debug(f"--- Processing {coin} ---")

                try:
                    # Load the full 1m history once per coin
                    df_1m = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn, parse_dates=['datetime_utc'])
                    if df_1m.empty:
                        logging.warning(f"Source table '{source_table_name}' is empty. Skipping.")
                        continue
                    df_1m.set_index('datetime_utc', inplace=True)

                    for tf_name, tf_code in self.timeframes.items():
                        target_table_name = f"{coin}_{tf_name}"
                        logging.debug(f" Updating {tf_name} table...")

                        last_timestamp = self._get_last_timestamp(conn, target_table_name)

                        # Get the new 1-minute data that needs to be processed
                        new_df_1m = df_1m[df_1m.index > last_timestamp] if last_timestamp else df_1m
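                        # e.g. (hypothetical value) if the target table's newest
                        # row is '2024-01-01 08:00:00', only 1-minute rows strictly
                        # after that instant survive the filter; pandas parses the
                        # string returned by _get_last_timestamp for the comparison.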

                        if new_df_1m.empty:
                            logging.debug(f" -> No new 1-minute data for {tf_name}. Table is up to date.")
                            continue

                        resampled_df = new_df_1m.resample(tf_code).agg(self.aggregation_logic)
                        resampled_df.dropna(how='all', inplace=True)

                        if not resampled_df.empty:
                            # Append the newly resampled data to the target table
                            resampled_df.to_sql(target_table_name, conn, if_exists='append', index=True)
                            logging.debug(f" -> Appended {len(resampled_df)} new candles to '{target_table_name}'.")
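                            # NOTE: 'append' assumes these candles are strictly
                            # newer than the stored ones; a resample bin that
                            # straddles last_timestamp would be re-emitted under
                            # the same label instead of updating the existing row.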

                            if coin not in self.resampling_status: self.resampling_status[coin] = {}
                            total_candles = int(self._get_table_count(conn, target_table_name))
                            self.resampling_status[coin][tf_name] = {
                                "last_candle_utc": resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S'),
                                "total_candles": total_candles
                            }

                except Exception as e:
                    logging.error(f"Failed to process coin '{coin}': {e}")

        self._log_summary()
        self._save_status()
        logging.info(f"--- Resampling job finished at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} ---")

    def _log_summary(self):
        """Logs a summary of the total candles for each timeframe."""
        logging.info("--- Resampling Job Summary ---")
        timeframe_totals = {}
        # Iterate through coins, skipping metadata keys
        for coin, tfs in self.resampling_status.items():
            if not isinstance(tfs, dict): continue
            for tf_name, tf_data in tfs.items():
                total = tf_data.get("total_candles", 0)
                if tf_name not in timeframe_totals:
                    timeframe_totals[tf_name] = 0
                timeframe_totals[tf_name] += total

        if not timeframe_totals:
            logging.info("No candles were resampled in this run.")
            return

        logging.info("Total candles per timeframe across all processed coins:")
        for tf_name, total in sorted(timeframe_totals.items()):
            logging.info(f" - {tf_name:<10}: {total:,} candles")

    def _get_last_timestamp(self, conn, table_name):
        """Gets the timestamp of the last entry in a table."""
        try:
            return pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
        except (pd.io.sql.DatabaseError, IndexError):
            return None

    def _get_table_count(self, conn, table_name):
        """Gets the total row count of a table."""
        try:
            return pd.read_sql(f'SELECT COUNT(*) FROM "{table_name}"', conn).iloc[0, 0]
        except (pd.io.sql.DatabaseError, IndexError):
            return 0

    def _save_status(self):
        """Saves the final resampling status to a JSON file."""
        if not self.resampling_status:
            logging.warning("No data was resampled, skipping status file creation.")
            return

        self.resampling_status['last_completed_utc'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

        stop_time = datetime.now(timezone.utc)
        self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S')
        self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S')

        # Clean up old key if it exists from previous versions
        self.resampling_status.pop('last_completed_utc', None)

        try:
            with open(self.status_file_path, 'w', encoding='utf-8') as f:
                json.dump(self.resampling_status, f, indent=4, sort_keys=True)
@@ -162,45 +170,23 @@ def parse_timeframes(tf_strings: list) -> dict:
        unit = ''.join(filter(str.isalpha, tf_str)).lower()

        code = ''
        if unit == 'm':
            code = f"{numeric_part}min"
        elif unit == 'w':
            code = f"{numeric_part}W"
        elif unit in ['h', 'd']:
            code = f"{numeric_part}{unit}"
        else:
            code = tf_str
            logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.")

        if unit == 'm': code = f"{numeric_part}min"
        elif unit == 'w': code = f"{numeric_part}W"
        elif unit in ['h', 'd']: code = f"{numeric_part}{unit}"
        else: code = tf_str
        tf_map[tf_str] = code
    return tf_map
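
# A quick sanity check (hypothetical inputs) of the mapping produced above:
#   parse_timeframes(["5m", "4h", "1d", "1w"])
#   -> {"5m": "5min", "4h": "4h", "1d": "1d", "1w": "1W"}
# i.e. pandas resample offset aliases keyed by the human-readable string.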


if __name__ == "__main__":
    # The script now runs as a long-running service, loading its config from a file.
    CONFIG_FILE = "resampler_conf.json"
    try:
        with open(CONFIG_FILE, 'r') as f:
            config = json.load(f)
        coins = config.get("coins", [])
        timeframes_list = config.get("timeframes", [])
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"FATAL: Could not load '{CONFIG_FILE}'. Please ensure it exists and is valid. Error: {e}")
        sys.exit(1)
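    # A minimal resampler_conf.json might look like this (hypothetical values;
    # only the "coins" and "timeframes" keys are read above):
    # {
    #     "coins": ["BTCUSDT", "ETHUSDT"],
    #     "timeframes": ["5m", "15m", "1h", "4h", "1d"]
    # }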

    # Use a basic log level until the class is initialized
    setup_logging('normal', 'Resampler')

    timeframes_dict = parse_timeframes(timeframes_list)

    resampler = Resampler(
        log_level='normal',
        coins=coins,
        timeframes=timeframes_dict
    )

    try:
        resampler.run_periodically()
    except KeyboardInterrupt:
        logging.info("Resampler process stopped.")
    parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.")
    parser.add_argument("--coins", nargs='+', required=True, help="List of coins to process.")
    parser.add_argument("--timeframes", nargs='+', required=True, help="List of timeframes to generate.")
    parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    args = parser.parse_args()

    timeframes_dict = parse_timeframes(args.timeframes)

    resampler = Resampler(log_level=args.log_level, coins=args.coins, timeframes=timeframes_dict)
    resampler.run()
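
# Example invocation of the argparse entry point (hypothetical coin names):
#   python resampler.py --coins BTCUSDT ETHUSDT --timeframes 5m 1h 1d --log-level debug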