WALK-FORWARD testing

This commit is contained in:
2025-10-18 18:40:50 +02:00
parent 6812c481e5
commit 64f7866083
3 changed files with 86 additions and 102 deletions

View File

@ -3,5 +3,5 @@
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-18T16:19:00+00:00",
"signal_price": 3870.5,
"last_checked_utc": "2025-10-18T16:29:05.035278+00:00"
"last_checked_utc": "2025-10-18T16:40:05.039625+00:00"
}

View File

@ -3,5 +3,5 @@
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-14T00:00:00+00:00",
"signal_price": 113026.0,
"last_checked_utc": "2025-10-18T16:28:52.112584+00:00"
"last_checked_utc": "2025-10-18T16:40:09.950516+00:00"
}

View File

@ -53,8 +53,8 @@ def _run_single_simulation(df: pd.DataFrame, params: dict) -> list:
def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]:
"""
A worker function for multiprocessing. It loads its own data, runs the
simulation, and returns the parameters and results together.
A worker function for multiprocessing. It loads its own data from the DB
and then runs the simulation, returning the parameters and results together.
"""
df = pd.DataFrame()
try:
@ -76,8 +76,8 @@ def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, sta
class Backtester:
"""
A class to run historical simulations (backtests) with parameter optimization
and forward testing on trading strategies, using multiple cores to speed up the process.
A class to run a Walk-Forward Optimization, which is the gold standard
for testing the robustness of a trading strategy.
"""
def __init__(self, log_level: str, strategy_name_to_test: str):
@ -86,67 +86,83 @@ class Backtester:
self.backtest_config = self._load_backtest_config(strategy_name_to_test)
if not self.backtest_config:
logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found in '_data/backtesting_conf.json'.")
logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found.")
sys.exit(1)
self.strategy_name = self.backtest_config.get('strategy_name')
self.strategy_config = self._load_strategy_config()
if not self.strategy_config:
logging.error(f"Strategy '{self.strategy_name}' not found in '_data/strategies.json'.")
logging.error(f"Strategy '{self.strategy_name}' not found.")
sys.exit(1)
self.params = self.strategy_config.get('parameters', {})
self.coin = self.params.get('coin')
self.timeframe = self.params.get('timeframe')
self.forward_test_start_date = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d")
self.backtest_end_date = (datetime.now() - timedelta(weeks=4, days=1)).strftime("%Y-%m-%d")
self.full_history_start_date = "2020-01-01"
self.pool = None
def _load_backtest_config(self, name_to_test: str) -> dict:
"""Loads the specific backtest configuration from the JSON file."""
config_path = os.path.join("_data", "backtesting_conf.json")
try:
with open(config_path, 'r') as f:
return json.load(f).get(name_to_test)
with open(config_path, 'r') as f: return json.load(f).get(name_to_test)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load backtesting configuration: {e}")
return None
def _load_strategy_config(self) -> dict:
"""Loads the general strategy configuration."""
config_path = os.path.join("_data", "strategies.json")
try:
with open(config_path, 'r') as f:
return json.load(f).get(self.strategy_name)
with open(config_path, 'r') as f: return json.load(f).get(self.strategy_name)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategy configuration: {e}")
return None
def load_data(self, start_date, end_date) -> pd.DataFrame:
"""Loads historical data for a specific period for single-threaded tasks."""
table_name = f"{self.coin}_{self.timeframe}"
logging.info(f"Loading data for {table_name} from {start_date} to {end_date}...")
try:
with sqlite3.connect(self.db_path) as conn:
query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
if df.empty:
logging.warning("No data found for the specified date range.")
return pd.DataFrame()
df.set_index('datetime_utc', inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data for backtest: {e}")
return pd.DataFrame()
def run_walk_forward_optimization(self, num_periods=10, in_sample_pct=0.9):
"""
Main function to orchestrate the walk-forward analysis.
"""
full_df = self.load_data("2020-01-01", datetime.now().strftime("%Y-%m-%d"))
if full_df.empty: return
def run_optimization(self):
"""
Runs the backtest simulation for all parameter combinations in parallel,
provides progress updates, and finds the best result.
"""
period_length = len(full_df) // num_periods
all_out_of_sample_trades = []
for i in range(num_periods):
logging.info(f"\n--- Starting Walk-Forward Period {i+1}/{num_periods} ---")
# 1. Define the In-Sample (training) and Out-of-Sample (testing) periods
start_index = i * period_length
in_sample_end_index = start_index + int(period_length * in_sample_pct)
out_of_sample_end_index = start_index + period_length
if in_sample_end_index >= len(full_df) or out_of_sample_end_index > len(full_df):
logging.warning("Not enough data for the full final period. Ending analysis.")
break
in_sample_df = full_df.iloc[start_index:in_sample_end_index]
out_of_sample_df = full_df.iloc[in_sample_end_index:out_of_sample_end_index]
logging.info(f"In-Sample: {in_sample_df.index[0].date()} to {in_sample_df.index[-1].date()}")
logging.info(f"Out-of-Sample: {out_of_sample_df.index[0].date()} to {out_of_sample_df.index[-1].date()}")
# 2. Find the best parameters on the In-Sample data
best_params = self._find_best_params(in_sample_df)
if not best_params:
logging.warning("No profitable parameters found in this period. Skipping.")
continue
# 3. Test the best parameters on the Out-of-Sample data
logging.info(f"Testing best params {best_params} on Out-of-Sample data...")
out_of_sample_trades = _run_single_simulation(out_of_sample_df.copy(), best_params)
all_out_of_sample_trades.extend(out_of_sample_trades)
self._generate_report(out_of_sample_trades, f"Period {i+1} Out-of-Sample Results")
# 4. Generate a final report for all combined out-of-sample trades
print("\n" + "="*50)
self._generate_report(all_out_of_sample_trades, "AGGREGATE WALK-FORWARD PERFORMANCE")
print("="*50)
def _find_best_params(self, df: pd.DataFrame) -> dict:
"""Runs a multi-core optimization on a given slice of data."""
param_configs = self.backtest_config.get('optimization_params', {})
param_names = list(param_configs.keys())
param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()]
@ -154,78 +170,46 @@ class Backtester:
all_combinations = list(itertools.product(*param_ranges))
param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations]
logging.info(f"Starting optimization... Testing {len(all_combinations)} parameter combinations using up to 60 cores.")
logging.info(f"Optimizing on {len(all_combinations)} combinations...")
num_cores = 60
self.pool = multiprocessing.Pool(processes=num_cores)
worker = partial(
simulation_worker,
db_path=self.db_path,
coin=self.coin,
timeframe=self.timeframe,
start_date=self.full_history_start_date,
end_date=self.backtest_end_date
)
results = []
total_tasks = len(param_dicts)
completed_tasks = 0
last_update_time = time.time()
logging.info("Optimization running... Progress updates will be provided every minute.")
# Use imap_unordered to get results as they are completed
for params_result, trades_result in self.pool.imap_unordered(worker, param_dicts):
completed_tasks += 1
if trades_result:
total_pnl = sum(t['pnl_pct'] for t in trades_result)
results.append({'params': params_result, 'pnl': total_pnl, 'trades': len(trades_result)})
current_time = time.time()
if current_time - last_update_time >= 60:
progress = (completed_tasks / total_tasks) * 100
logging.info(f"Progress: {progress:.2f}% complete ({completed_tasks}/{total_tasks} combinations tested).")
last_update_time = current_time
logging.info(f"Progress: 100.00% complete ({completed_tasks}/{total_tasks} combinations tested).")
worker = partial(_run_single_simulation, df.copy())
all_trades_results = self.pool.map(worker, param_dicts)
self.pool.close()
self.pool.join()
self.pool = None
if not results:
logging.error("Optimization produced no trades. Cannot determine best parameters.")
return
results = []
for i, trades in enumerate(all_trades_results):
if trades:
results.append({'params': param_dicts[i], 'pnl': sum(t['pnl_pct'] for t in trades)})
best_result = max(results, key=lambda x: x['pnl'])
logging.info(f"\n--- Optimization Complete ---")
logging.info(f"Best parameters found: {best_result['params']} with PNL: {best_result['pnl']*100:.2f}% over {best_result['trades']} trades.")
if not results: return None
return max(results, key=lambda x: x['pnl'])['params']
self.run_forward_test(best_result['params'])
def load_data(self, start_date, end_date):
# This is a simplified version for the main data load
table_name = f"{self.coin}_{self.timeframe}"
logging.info(f"Loading full dataset for {table_name}...")
try:
with sqlite3.connect(self.db_path) as conn:
query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
if df.empty:
logging.warning("No data found for the specified date range.")
return pd.DataFrame()
df.set_index('datetime_utc', inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data for backtest: {e}")
return pd.DataFrame()
def run_forward_test(self, best_params):
"""Runs a backtest on the forward-testing period using the best parameters."""
logging.info("\n--- Starting Forward Test (Walk-Forward Validation) ---")
forward_test_df = self.load_data(self.forward_test_start_date, datetime.now().strftime("%Y-%m-%d"))
if forward_test_df.empty:
return
trades = _run_single_simulation(forward_test_df, best_params)
print("\n--- Final Comparison Report ---")
print(f"\nBest Parameters from Backtest: {best_params}")
print("\n--- Backtest Period Performance (Historical) ---")
backtest_df = self.load_data(self.full_history_start_date, self.backtest_end_date)
historical_trades = _run_single_simulation(backtest_df, best_params)
self._generate_report(historical_trades)
print("\n--- Forward Test Performance (Last 4 Weeks) ---")
self._generate_report(trades)
def _generate_report(self, trades: list):
def _generate_report(self, trades: list, title: str):
"""Calculates and prints key performance metrics."""
print(f"\n--- {title} ---")
if not trades:
print("No trades were executed during this period.")
return
@ -240,7 +224,7 @@ class Backtester:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a historical backtest with optimization for a trading strategy.")
parser = argparse.ArgumentParser(description="Run a Walk-Forward Optimization for a trading strategy.")
parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
@ -251,9 +235,9 @@ if __name__ == "__main__":
)
try:
backtester.run_optimization()
backtester.run_walk_forward_optimization()
except KeyboardInterrupt:
logging.info("\nBacktest optimization cancelled by user.")
logging.info("\nWalk-Forward Optimization cancelled by user.")
finally:
if backtester.pool:
logging.info("Terminating worker processes...")