import argparse import logging import os import sys import sqlite3 import pandas as pd import json from datetime import datetime, timedelta import itertools import multiprocessing from functools import partial import time from logging_utils import setup_logging def _run_single_simulation(df: pd.DataFrame, params: dict) -> list: """ Core simulation logic. Takes a DataFrame and parameters, returns a list of trades. This is a pure function to be used by different data loaders. """ fast_ma_period = params.get('fast', 0) slow_ma_period = params.get('slow', 0) sma_period = params.get('sma_period', 0) if fast_ma_period and slow_ma_period: df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean() df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean() df['signal'] = (df['fast_sma'] > df['slow_sma']).astype(int) elif sma_period: df['sma'] = df['close'].rolling(window=sma_period).mean() df['signal'] = (df['close'] > df['sma']).astype(int) else: return [] df.dropna(inplace=True) if df.empty: return [] df['position'] = df['signal'].diff() trades = [] entry_price = 0 for i, row in df.iterrows(): if row['position'] == 1: if entry_price == 0: # Only enter if flat entry_price = row['close'] elif row['position'] == -1: if entry_price != 0: # Only exit if in a position pnl = (row['close'] - entry_price) / entry_price trades.append({'pnl_pct': pnl}) entry_price = 0 return trades def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]: """ A worker function for multiprocessing. It loads its own data from the DB and then runs the simulation, returning the parameters and results together. """ df = pd.DataFrame() try: with sqlite3.connect(db_path) as conn: query = f'SELECT datetime_utc, close FROM "{coin}_{timeframe}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) if not df.empty: df.set_index('datetime_utc', inplace=True) except Exception as e: print(f"Worker error loading data for params {params}: {e}") return (params, []) if df.empty: return (params, []) trades = _run_single_simulation(df, params) return (params, trades) class Backtester: """ A class to run a Walk-Forward Optimization, which is the gold standard for testing the robustness of a trading strategy. """ def __init__(self, log_level: str, strategy_name_to_test: str): setup_logging(log_level, 'Backtester') self.db_path = os.path.join("_data", "market_data.db") self.backtest_config = self._load_backtest_config(strategy_name_to_test) if not self.backtest_config: logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found.") sys.exit(1) self.strategy_name = self.backtest_config.get('strategy_name') self.strategy_config = self._load_strategy_config() if not self.strategy_config: logging.error(f"Strategy '{self.strategy_name}' not found.") sys.exit(1) self.params = self.strategy_config.get('parameters', {}) self.coin = self.params.get('coin') self.timeframe = self.params.get('timeframe') self.pool = None def _load_backtest_config(self, name_to_test: str) -> dict: config_path = os.path.join("_data", "backtesting_conf.json") try: with open(config_path, 'r') as f: return json.load(f).get(name_to_test) except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load backtesting configuration: {e}") return None def _load_strategy_config(self) -> dict: config_path = os.path.join("_data", "strategies.json") try: with open(config_path, 'r') as f: return json.load(f).get(self.strategy_name) except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load strategy configuration: {e}") return None def run_walk_forward_optimization(self, num_periods=10, in_sample_pct=0.9): """ Main function to orchestrate the walk-forward analysis. """ full_df = self.load_data("2020-01-01", datetime.now().strftime("%Y-%m-%d")) if full_df.empty: return period_length = len(full_df) // num_periods all_out_of_sample_trades = [] for i in range(num_periods): logging.info(f"\n--- Starting Walk-Forward Period {i+1}/{num_periods} ---") # 1. Define the In-Sample (training) and Out-of-Sample (testing) periods start_index = i * period_length in_sample_end_index = start_index + int(period_length * in_sample_pct) out_of_sample_end_index = start_index + period_length if in_sample_end_index >= len(full_df) or out_of_sample_end_index > len(full_df): logging.warning("Not enough data for the full final period. Ending analysis.") break in_sample_df = full_df.iloc[start_index:in_sample_end_index] out_of_sample_df = full_df.iloc[in_sample_end_index:out_of_sample_end_index] logging.info(f"In-Sample: {in_sample_df.index[0].date()} to {in_sample_df.index[-1].date()}") logging.info(f"Out-of-Sample: {out_of_sample_df.index[0].date()} to {out_of_sample_df.index[-1].date()}") # 2. Find the best parameters on the In-Sample data best_params = self._find_best_params(in_sample_df) if not best_params: logging.warning("No profitable parameters found in this period. Skipping.") continue # 3. Test the best parameters on the Out-of-Sample data logging.info(f"Testing best params {best_params} on Out-of-Sample data...") out_of_sample_trades = _run_single_simulation(out_of_sample_df.copy(), best_params) all_out_of_sample_trades.extend(out_of_sample_trades) self._generate_report(out_of_sample_trades, f"Period {i+1} Out-of-Sample Results") # 4. Generate a final report for all combined out-of-sample trades print("\n" + "="*50) self._generate_report(all_out_of_sample_trades, "AGGREGATE WALK-FORWARD PERFORMANCE") print("="*50) def _find_best_params(self, df: pd.DataFrame) -> dict: """Runs a multi-core optimization on a given slice of data.""" param_configs = self.backtest_config.get('optimization_params', {}) param_names = list(param_configs.keys()) param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()] all_combinations = list(itertools.product(*param_ranges)) param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations] logging.info(f"Optimizing on {len(all_combinations)} combinations...") num_cores = 60 self.pool = multiprocessing.Pool(processes=num_cores) worker = partial(_run_single_simulation, df.copy()) all_trades_results = self.pool.map(worker, param_dicts) self.pool.close() self.pool.join() self.pool = None results = [] for i, trades in enumerate(all_trades_results): if trades: results.append({'params': param_dicts[i], 'pnl': sum(t['pnl_pct'] for t in trades)}) if not results: return None return max(results, key=lambda x: x['pnl'])['params'] def load_data(self, start_date, end_date): # This is a simplified version for the main data load table_name = f"{self.coin}_{self.timeframe}" logging.info(f"Loading full dataset for {table_name}...") try: with sqlite3.connect(self.db_path) as conn: query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) if df.empty: logging.warning("No data found for the specified date range.") return pd.DataFrame() df.set_index('datetime_utc', inplace=True) return df except Exception as e: logging.error(f"Failed to load data for backtest: {e}") return pd.DataFrame() def _generate_report(self, trades: list, title: str): """Calculates and prints key performance metrics.""" print(f"\n--- {title} ---") if not trades: print("No trades were executed during this period.") return num_trades = len(trades) wins = [t for t in trades if t['pnl_pct'] > 0] total_pnl = sum(t['pnl_pct'] for t in trades) print(f"Total Trades: {num_trades}") print(f"Win Rate: {(len(wins) / num_trades) * 100 if num_trades > 0 else 0:.2f}%") print(f"Total PNL (Cumulative %): {total_pnl * 100:.2f}%") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run a Walk-Forward Optimization for a trading strategy.") parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).") parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) args = parser.parse_args() backtester = Backtester( log_level=args.log_level, strategy_name_to_test=args.strategy ) try: backtester.run_walk_forward_optimization() except KeyboardInterrupt: logging.info("\nWalk-Forward Optimization cancelled by user.") finally: if backtester.pool: logging.info("Terminating worker processes...") backtester.pool.terminate() backtester.pool.join() logging.info("Worker processes terminated.")