diff --git a/_data/strategy_status_sma_cross_1.json b/_data/strategy_status_sma_cross_1.json index 02e9869..2944d79 100644 --- a/_data/strategy_status_sma_cross_1.json +++ b/_data/strategy_status_sma_cross_1.json @@ -3,5 +3,5 @@ "current_signal": "SELL", "last_signal_change_utc": "2025-10-18T16:19:00+00:00", "signal_price": 3870.5, - "last_checked_utc": "2025-10-18T16:29:05.035278+00:00" + "last_checked_utc": "2025-10-18T16:40:05.039625+00:00" } \ No newline at end of file diff --git a/_data/strategy_status_sma_cross_2.json b/_data/strategy_status_sma_cross_2.json index 33adf7c..95a0419 100644 --- a/_data/strategy_status_sma_cross_2.json +++ b/_data/strategy_status_sma_cross_2.json @@ -3,5 +3,5 @@ "current_signal": "SELL", "last_signal_change_utc": "2025-10-14T00:00:00+00:00", "signal_price": 113026.0, - "last_checked_utc": "2025-10-18T16:28:52.112584+00:00" + "last_checked_utc": "2025-10-18T16:40:09.950516+00:00" } \ No newline at end of file diff --git a/backtester.py b/backtester.py index 8a6e532..4a37bfc 100644 --- a/backtester.py +++ b/backtester.py @@ -53,8 +53,8 @@ def _run_single_simulation(df: pd.DataFrame, params: dict) -> list: def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]: """ - A worker function for multiprocessing. It loads its own data, runs the - simulation, and returns the parameters and results together. + A worker function for multiprocessing. It loads its own data from the DB + and then runs the simulation, returning the parameters and results together. """ df = pd.DataFrame() try: @@ -76,8 +76,8 @@ def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, sta class Backtester: """ - A class to run historical simulations (backtests) with parameter optimization - and forward testing on trading strategies, using multiple cores to speed up the process. + A class to run a Walk-Forward Optimization, which is the gold standard + for testing the robustness of a trading strategy. """ def __init__(self, log_level: str, strategy_name_to_test: str): @@ -86,67 +86,83 @@ class Backtester: self.backtest_config = self._load_backtest_config(strategy_name_to_test) if not self.backtest_config: - logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found in '_data/backtesting_conf.json'.") + logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found.") sys.exit(1) self.strategy_name = self.backtest_config.get('strategy_name') self.strategy_config = self._load_strategy_config() if not self.strategy_config: - logging.error(f"Strategy '{self.strategy_name}' not found in '_data/strategies.json'.") + logging.error(f"Strategy '{self.strategy_name}' not found.") sys.exit(1) self.params = self.strategy_config.get('parameters', {}) self.coin = self.params.get('coin') self.timeframe = self.params.get('timeframe') - - self.forward_test_start_date = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d") - self.backtest_end_date = (datetime.now() - timedelta(weeks=4, days=1)).strftime("%Y-%m-%d") - self.full_history_start_date = "2020-01-01" self.pool = None def _load_backtest_config(self, name_to_test: str) -> dict: - """Loads the specific backtest configuration from the JSON file.""" config_path = os.path.join("_data", "backtesting_conf.json") try: - with open(config_path, 'r') as f: - return json.load(f).get(name_to_test) + with open(config_path, 'r') as f: return json.load(f).get(name_to_test) except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load backtesting configuration: {e}") return None def _load_strategy_config(self) -> dict: - """Loads the general strategy configuration.""" config_path = os.path.join("_data", "strategies.json") try: - with open(config_path, 'r') as f: - return json.load(f).get(self.strategy_name) + with open(config_path, 'r') as f: return json.load(f).get(self.strategy_name) except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load strategy configuration: {e}") return None - def load_data(self, start_date, end_date) -> pd.DataFrame: - """Loads historical data for a specific period for single-threaded tasks.""" - table_name = f"{self.coin}_{self.timeframe}" - logging.info(f"Loading data for {table_name} from {start_date} to {end_date}...") - try: - with sqlite3.connect(self.db_path) as conn: - query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' - df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) - if df.empty: - logging.warning("No data found for the specified date range.") - return pd.DataFrame() - - df.set_index('datetime_utc', inplace=True) - return df - except Exception as e: - logging.error(f"Failed to load data for backtest: {e}") - return pd.DataFrame() + def run_walk_forward_optimization(self, num_periods=10, in_sample_pct=0.9): + """ + Main function to orchestrate the walk-forward analysis. + """ + full_df = self.load_data("2020-01-01", datetime.now().strftime("%Y-%m-%d")) + if full_df.empty: return - def run_optimization(self): - """ - Runs the backtest simulation for all parameter combinations in parallel, - provides progress updates, and finds the best result. - """ + period_length = len(full_df) // num_periods + all_out_of_sample_trades = [] + + for i in range(num_periods): + logging.info(f"\n--- Starting Walk-Forward Period {i+1}/{num_periods} ---") + + # 1. Define the In-Sample (training) and Out-of-Sample (testing) periods + start_index = i * period_length + in_sample_end_index = start_index + int(period_length * in_sample_pct) + out_of_sample_end_index = start_index + period_length + + if in_sample_end_index >= len(full_df) or out_of_sample_end_index > len(full_df): + logging.warning("Not enough data for the full final period. Ending analysis.") + break + + in_sample_df = full_df.iloc[start_index:in_sample_end_index] + out_of_sample_df = full_df.iloc[in_sample_end_index:out_of_sample_end_index] + + logging.info(f"In-Sample: {in_sample_df.index[0].date()} to {in_sample_df.index[-1].date()}") + logging.info(f"Out-of-Sample: {out_of_sample_df.index[0].date()} to {out_of_sample_df.index[-1].date()}") + + # 2. Find the best parameters on the In-Sample data + best_params = self._find_best_params(in_sample_df) + if not best_params: + logging.warning("No profitable parameters found in this period. Skipping.") + continue + + # 3. Test the best parameters on the Out-of-Sample data + logging.info(f"Testing best params {best_params} on Out-of-Sample data...") + out_of_sample_trades = _run_single_simulation(out_of_sample_df.copy(), best_params) + all_out_of_sample_trades.extend(out_of_sample_trades) + self._generate_report(out_of_sample_trades, f"Period {i+1} Out-of-Sample Results") + + # 4. Generate a final report for all combined out-of-sample trades + print("\n" + "="*50) + self._generate_report(all_out_of_sample_trades, "AGGREGATE WALK-FORWARD PERFORMANCE") + print("="*50) + + def _find_best_params(self, df: pd.DataFrame) -> dict: + """Runs a multi-core optimization on a given slice of data.""" param_configs = self.backtest_config.get('optimization_params', {}) param_names = list(param_configs.keys()) param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()] @@ -154,78 +170,46 @@ class Backtester: all_combinations = list(itertools.product(*param_ranges)) param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations] - logging.info(f"Starting optimization... Testing {len(all_combinations)} parameter combinations using up to 60 cores.") + logging.info(f"Optimizing on {len(all_combinations)} combinations...") num_cores = 60 self.pool = multiprocessing.Pool(processes=num_cores) - worker = partial( - simulation_worker, - db_path=self.db_path, - coin=self.coin, - timeframe=self.timeframe, - start_date=self.full_history_start_date, - end_date=self.backtest_end_date - ) - - results = [] - total_tasks = len(param_dicts) - completed_tasks = 0 - last_update_time = time.time() - - logging.info("Optimization running... Progress updates will be provided every minute.") - - # Use imap_unordered to get results as they are completed - for params_result, trades_result in self.pool.imap_unordered(worker, param_dicts): - completed_tasks += 1 - if trades_result: - total_pnl = sum(t['pnl_pct'] for t in trades_result) - results.append({'params': params_result, 'pnl': total_pnl, 'trades': len(trades_result)}) - - current_time = time.time() - if current_time - last_update_time >= 60: - progress = (completed_tasks / total_tasks) * 100 - logging.info(f"Progress: {progress:.2f}% complete ({completed_tasks}/{total_tasks} combinations tested).") - last_update_time = current_time - - logging.info(f"Progress: 100.00% complete ({completed_tasks}/{total_tasks} combinations tested).") + worker = partial(_run_single_simulation, df.copy()) + all_trades_results = self.pool.map(worker, param_dicts) self.pool.close() self.pool.join() self.pool = None - if not results: - logging.error("Optimization produced no trades. Cannot determine best parameters.") - return + results = [] + for i, trades in enumerate(all_trades_results): + if trades: + results.append({'params': param_dicts[i], 'pnl': sum(t['pnl_pct'] for t in trades)}) - best_result = max(results, key=lambda x: x['pnl']) - logging.info(f"\n--- Optimization Complete ---") - logging.info(f"Best parameters found: {best_result['params']} with PNL: {best_result['pnl']*100:.2f}% over {best_result['trades']} trades.") + if not results: return None + return max(results, key=lambda x: x['pnl'])['params'] - self.run_forward_test(best_result['params']) + def load_data(self, start_date, end_date): + # This is a simplified version for the main data load + table_name = f"{self.coin}_{self.timeframe}" + logging.info(f"Loading full dataset for {table_name}...") + try: + with sqlite3.connect(self.db_path) as conn: + query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' + df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) + if df.empty: + logging.warning("No data found for the specified date range.") + return pd.DataFrame() + df.set_index('datetime_utc', inplace=True) + return df + except Exception as e: + logging.error(f"Failed to load data for backtest: {e}") + return pd.DataFrame() - def run_forward_test(self, best_params): - """Runs a backtest on the forward-testing period using the best parameters.""" - logging.info("\n--- Starting Forward Test (Walk-Forward Validation) ---") - forward_test_df = self.load_data(self.forward_test_start_date, datetime.now().strftime("%Y-%m-%d")) - if forward_test_df.empty: - return - - trades = _run_single_simulation(forward_test_df, best_params) - - print("\n--- Final Comparison Report ---") - print(f"\nBest Parameters from Backtest: {best_params}") - - print("\n--- Backtest Period Performance (Historical) ---") - backtest_df = self.load_data(self.full_history_start_date, self.backtest_end_date) - historical_trades = _run_single_simulation(backtest_df, best_params) - self._generate_report(historical_trades) - - print("\n--- Forward Test Performance (Last 4 Weeks) ---") - self._generate_report(trades) - - def _generate_report(self, trades: list): + def _generate_report(self, trades: list, title: str): """Calculates and prints key performance metrics.""" + print(f"\n--- {title} ---") if not trades: print("No trades were executed during this period.") return @@ -240,7 +224,7 @@ class Backtester: if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Run a historical backtest with optimization for a trading strategy.") + parser = argparse.ArgumentParser(description="Run a Walk-Forward Optimization for a trading strategy.") parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).") parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) args = parser.parse_args() @@ -251,9 +235,9 @@ if __name__ == "__main__": ) try: - backtester.run_optimization() + backtester.run_walk_forward_optimization() except KeyboardInterrupt: - logging.info("\nBacktest optimization cancelled by user.") + logging.info("\nWalk-Forward Optimization cancelled by user.") finally: if backtester.pool: logging.info("Terminating worker processes...")