import argparse import logging import os import sys import sqlite3 import pandas as pd import json from datetime import datetime, timedelta import itertools import multiprocessing from functools import partial import time from logging_utils import setup_logging def _run_single_simulation(df: pd.DataFrame, params: dict) -> list: """ Core simulation logic. Takes a DataFrame and parameters, returns a list of trades. This is a pure function to be used by different data loaders. """ fast_ma_period = params.get('fast', 0) slow_ma_period = params.get('slow', 0) sma_period = params.get('sma_period', 0) if fast_ma_period and slow_ma_period: df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean() df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean() df['signal'] = (df['fast_sma'] > df['slow_sma']).astype(int) elif sma_period: df['sma'] = df['close'].rolling(window=sma_period).mean() df['signal'] = (df['close'] > df['sma']).astype(int) else: return [] df.dropna(inplace=True) if df.empty: return [] df['position'] = df['signal'].diff() trades = [] entry_price = 0 for i, row in df.iterrows(): if row['position'] == 1: if entry_price == 0: # Only enter if flat entry_price = row['close'] elif row['position'] == -1: if entry_price != 0: # Only exit if in a position pnl = (row['close'] - entry_price) / entry_price trades.append({'pnl_pct': pnl}) entry_price = 0 return trades def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]: """ A worker function for multiprocessing. It loads its own data, runs the simulation, and returns the parameters and results together. """ df = pd.DataFrame() try: with sqlite3.connect(db_path) as conn: query = f'SELECT datetime_utc, close FROM "{coin}_{timeframe}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) if not df.empty: df.set_index('datetime_utc', inplace=True) except Exception as e: print(f"Worker error loading data for params {params}: {e}") return (params, []) if df.empty: return (params, []) trades = _run_single_simulation(df, params) return (params, trades) class Backtester: """ A class to run historical simulations (backtests) with parameter optimization and forward testing on trading strategies, using multiple cores to speed up the process. """ def __init__(self, log_level: str, strategy_name_to_test: str): setup_logging(log_level, 'Backtester') self.db_path = os.path.join("_data", "market_data.db") self.backtest_config = self._load_backtest_config(strategy_name_to_test) if not self.backtest_config: logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found in '_data/backtesting_conf.json'.") sys.exit(1) self.strategy_name = self.backtest_config.get('strategy_name') self.strategy_config = self._load_strategy_config() if not self.strategy_config: logging.error(f"Strategy '{self.strategy_name}' not found in '_data/strategies.json'.") sys.exit(1) self.params = self.strategy_config.get('parameters', {}) self.coin = self.params.get('coin') self.timeframe = self.params.get('timeframe') self.forward_test_start_date = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d") self.backtest_end_date = (datetime.now() - timedelta(weeks=4, days=1)).strftime("%Y-%m-%d") self.full_history_start_date = "2020-01-01" self.pool = None def _load_backtest_config(self, name_to_test: str) -> dict: """Loads the specific backtest configuration from the JSON file.""" config_path = os.path.join("_data", "backtesting_conf.json") try: with open(config_path, 'r') as f: return json.load(f).get(name_to_test) except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load backtesting configuration: {e}") return None def _load_strategy_config(self) -> dict: """Loads the general strategy configuration.""" config_path = os.path.join("_data", "strategies.json") try: with open(config_path, 'r') as f: return json.load(f).get(self.strategy_name) except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load strategy configuration: {e}") return None def load_data(self, start_date, end_date) -> pd.DataFrame: """Loads historical data for a specific period for single-threaded tasks.""" table_name = f"{self.coin}_{self.timeframe}" logging.info(f"Loading data for {table_name} from {start_date} to {end_date}...") try: with sqlite3.connect(self.db_path) as conn: query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) if df.empty: logging.warning("No data found for the specified date range.") return pd.DataFrame() df.set_index('datetime_utc', inplace=True) return df except Exception as e: logging.error(f"Failed to load data for backtest: {e}") return pd.DataFrame() def run_optimization(self): """ Runs the backtest simulation for all parameter combinations in parallel, provides progress updates, and finds the best result. """ param_configs = self.backtest_config.get('optimization_params', {}) param_names = list(param_configs.keys()) param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()] all_combinations = list(itertools.product(*param_ranges)) param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations] logging.info(f"Starting optimization... Testing {len(all_combinations)} parameter combinations using up to 60 cores.") num_cores = 60 self.pool = multiprocessing.Pool(processes=num_cores) worker = partial( simulation_worker, db_path=self.db_path, coin=self.coin, timeframe=self.timeframe, start_date=self.full_history_start_date, end_date=self.backtest_end_date ) results = [] total_tasks = len(param_dicts) completed_tasks = 0 last_update_time = time.time() logging.info("Optimization running... Progress updates will be provided every minute.") # Use imap_unordered to get results as they are completed for params_result, trades_result in self.pool.imap_unordered(worker, param_dicts): completed_tasks += 1 if trades_result: total_pnl = sum(t['pnl_pct'] for t in trades_result) results.append({'params': params_result, 'pnl': total_pnl, 'trades': len(trades_result)}) current_time = time.time() if current_time - last_update_time >= 60: progress = (completed_tasks / total_tasks) * 100 logging.info(f"Progress: {progress:.2f}% complete ({completed_tasks}/{total_tasks} combinations tested).") last_update_time = current_time logging.info(f"Progress: 100.00% complete ({completed_tasks}/{total_tasks} combinations tested).") self.pool.close() self.pool.join() self.pool = None if not results: logging.error("Optimization produced no trades. Cannot determine best parameters.") return best_result = max(results, key=lambda x: x['pnl']) logging.info(f"\n--- Optimization Complete ---") logging.info(f"Best parameters found: {best_result['params']} with PNL: {best_result['pnl']*100:.2f}% over {best_result['trades']} trades.") self.run_forward_test(best_result['params']) def run_forward_test(self, best_params): """Runs a backtest on the forward-testing period using the best parameters.""" logging.info("\n--- Starting Forward Test (Walk-Forward Validation) ---") forward_test_df = self.load_data(self.forward_test_start_date, datetime.now().strftime("%Y-%m-%d")) if forward_test_df.empty: return trades = _run_single_simulation(forward_test_df, best_params) print("\n--- Final Comparison Report ---") print(f"\nBest Parameters from Backtest: {best_params}") print("\n--- Backtest Period Performance (Historical) ---") backtest_df = self.load_data(self.full_history_start_date, self.backtest_end_date) historical_trades = _run_single_simulation(backtest_df, best_params) self._generate_report(historical_trades) print("\n--- Forward Test Performance (Last 4 Weeks) ---") self._generate_report(trades) def _generate_report(self, trades: list): """Calculates and prints key performance metrics.""" if not trades: print("No trades were executed during this period.") return num_trades = len(trades) wins = [t for t in trades if t['pnl_pct'] > 0] total_pnl = sum(t['pnl_pct'] for t in trades) print(f"Total Trades: {num_trades}") print(f"Win Rate: {(len(wins) / num_trades) * 100 if num_trades > 0 else 0:.2f}%") print(f"Total PNL (Cumulative %): {total_pnl * 100:.2f}%") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run a historical backtest with optimization for a trading strategy.") parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).") parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) args = parser.parse_args() backtester = Backtester( log_level=args.log_level, strategy_name_to_test=args.strategy ) try: backtester.run_optimization() except KeyboardInterrupt: logging.info("\nBacktest optimization cancelled by user.") finally: if backtester.pool: logging.info("Terminating worker processes...") backtester.pool.terminate() backtester.pool.join() logging.info("Worker processes terminated.")