backword forward strategy

This commit is contained in:
2025-10-18 18:29:06 +02:00
parent 2b55851136
commit 6812c481e5
5 changed files with 295 additions and 5 deletions

View File

@ -0,0 +1,27 @@
{
"sma_cross_eth_1m": {
"strategy_name": "sma_cross_1",
"optimization_params": {
"fast": {
"start": 4,
"end": 15,
"step": 1
},
"slow": {
"start": 20,
"end": 60,
"step": 1
}
}
},
"sma_44d_btc": {
"strategy_name": "sma_cross_2",
"optimization_params": {
"sma_period": {
"start": 20,
"end": 250,
"step": 1
}
}
}
}

View File

@ -5,7 +5,7 @@
"agent": "scalper",
"parameters": {
"coin": "ETH",
"timeframe": "1m",
"timeframe": "5m",
"slow": 44,
"fast": 7,
"size": 0.0028,

View File

@ -1,7 +1,7 @@
{
"strategy_name": "sma_cross_1",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-18T13:03:00+00:00",
"signal_price": 3871.9,
"last_checked_utc": "2025-10-18T13:55:05.015097+00:00"
"last_signal_change_utc": "2025-10-18T16:19:00+00:00",
"signal_price": 3870.5,
"last_checked_utc": "2025-10-18T16:29:05.035278+00:00"
}

View File

@ -3,5 +3,5 @@
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-14T00:00:00+00:00",
"signal_price": 113026.0,
"last_checked_utc": "2025-10-18T13:55:45.714315+00:00"
"last_checked_utc": "2025-10-18T16:28:52.112584+00:00"
}

263
backtester.py Normal file
View File

@ -0,0 +1,263 @@
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
import json
from datetime import datetime, timedelta
import itertools
import multiprocessing
from functools import partial
import time
from logging_utils import setup_logging
def _run_single_simulation(df: pd.DataFrame, params: dict) -> list:
"""
Core simulation logic. Takes a DataFrame and parameters, returns a list of trades.
This is a pure function to be used by different data loaders.
"""
fast_ma_period = params.get('fast', 0)
slow_ma_period = params.get('slow', 0)
sma_period = params.get('sma_period', 0)
if fast_ma_period and slow_ma_period:
df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean()
df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean()
df['signal'] = (df['fast_sma'] > df['slow_sma']).astype(int)
elif sma_period:
df['sma'] = df['close'].rolling(window=sma_period).mean()
df['signal'] = (df['close'] > df['sma']).astype(int)
else:
return []
df.dropna(inplace=True)
if df.empty: return []
df['position'] = df['signal'].diff()
trades = []
entry_price = 0
for i, row in df.iterrows():
if row['position'] == 1:
if entry_price == 0: # Only enter if flat
entry_price = row['close']
elif row['position'] == -1:
if entry_price != 0: # Only exit if in a position
pnl = (row['close'] - entry_price) / entry_price
trades.append({'pnl_pct': pnl})
entry_price = 0
return trades
def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]:
"""
A worker function for multiprocessing. It loads its own data, runs the
simulation, and returns the parameters and results together.
"""
df = pd.DataFrame()
try:
with sqlite3.connect(db_path) as conn:
query = f'SELECT datetime_utc, close FROM "{coin}_{timeframe}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
if not df.empty:
df.set_index('datetime_utc', inplace=True)
except Exception as e:
print(f"Worker error loading data for params {params}: {e}")
return (params, [])
if df.empty:
return (params, [])
trades = _run_single_simulation(df, params)
return (params, trades)
class Backtester:
"""
A class to run historical simulations (backtests) with parameter optimization
and forward testing on trading strategies, using multiple cores to speed up the process.
"""
def __init__(self, log_level: str, strategy_name_to_test: str):
setup_logging(log_level, 'Backtester')
self.db_path = os.path.join("_data", "market_data.db")
self.backtest_config = self._load_backtest_config(strategy_name_to_test)
if not self.backtest_config:
logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found in '_data/backtesting_conf.json'.")
sys.exit(1)
self.strategy_name = self.backtest_config.get('strategy_name')
self.strategy_config = self._load_strategy_config()
if not self.strategy_config:
logging.error(f"Strategy '{self.strategy_name}' not found in '_data/strategies.json'.")
sys.exit(1)
self.params = self.strategy_config.get('parameters', {})
self.coin = self.params.get('coin')
self.timeframe = self.params.get('timeframe')
self.forward_test_start_date = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d")
self.backtest_end_date = (datetime.now() - timedelta(weeks=4, days=1)).strftime("%Y-%m-%d")
self.full_history_start_date = "2020-01-01"
self.pool = None
def _load_backtest_config(self, name_to_test: str) -> dict:
"""Loads the specific backtest configuration from the JSON file."""
config_path = os.path.join("_data", "backtesting_conf.json")
try:
with open(config_path, 'r') as f:
return json.load(f).get(name_to_test)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load backtesting configuration: {e}")
return None
def _load_strategy_config(self) -> dict:
"""Loads the general strategy configuration."""
config_path = os.path.join("_data", "strategies.json")
try:
with open(config_path, 'r') as f:
return json.load(f).get(self.strategy_name)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategy configuration: {e}")
return None
def load_data(self, start_date, end_date) -> pd.DataFrame:
"""Loads historical data for a specific period for single-threaded tasks."""
table_name = f"{self.coin}_{self.timeframe}"
logging.info(f"Loading data for {table_name} from {start_date} to {end_date}...")
try:
with sqlite3.connect(self.db_path) as conn:
query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
if df.empty:
logging.warning("No data found for the specified date range.")
return pd.DataFrame()
df.set_index('datetime_utc', inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data for backtest: {e}")
return pd.DataFrame()
def run_optimization(self):
"""
Runs the backtest simulation for all parameter combinations in parallel,
provides progress updates, and finds the best result.
"""
param_configs = self.backtest_config.get('optimization_params', {})
param_names = list(param_configs.keys())
param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()]
all_combinations = list(itertools.product(*param_ranges))
param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations]
logging.info(f"Starting optimization... Testing {len(all_combinations)} parameter combinations using up to 60 cores.")
num_cores = 60
self.pool = multiprocessing.Pool(processes=num_cores)
worker = partial(
simulation_worker,
db_path=self.db_path,
coin=self.coin,
timeframe=self.timeframe,
start_date=self.full_history_start_date,
end_date=self.backtest_end_date
)
results = []
total_tasks = len(param_dicts)
completed_tasks = 0
last_update_time = time.time()
logging.info("Optimization running... Progress updates will be provided every minute.")
# Use imap_unordered to get results as they are completed
for params_result, trades_result in self.pool.imap_unordered(worker, param_dicts):
completed_tasks += 1
if trades_result:
total_pnl = sum(t['pnl_pct'] for t in trades_result)
results.append({'params': params_result, 'pnl': total_pnl, 'trades': len(trades_result)})
current_time = time.time()
if current_time - last_update_time >= 60:
progress = (completed_tasks / total_tasks) * 100
logging.info(f"Progress: {progress:.2f}% complete ({completed_tasks}/{total_tasks} combinations tested).")
last_update_time = current_time
logging.info(f"Progress: 100.00% complete ({completed_tasks}/{total_tasks} combinations tested).")
self.pool.close()
self.pool.join()
self.pool = None
if not results:
logging.error("Optimization produced no trades. Cannot determine best parameters.")
return
best_result = max(results, key=lambda x: x['pnl'])
logging.info(f"\n--- Optimization Complete ---")
logging.info(f"Best parameters found: {best_result['params']} with PNL: {best_result['pnl']*100:.2f}% over {best_result['trades']} trades.")
self.run_forward_test(best_result['params'])
def run_forward_test(self, best_params):
"""Runs a backtest on the forward-testing period using the best parameters."""
logging.info("\n--- Starting Forward Test (Walk-Forward Validation) ---")
forward_test_df = self.load_data(self.forward_test_start_date, datetime.now().strftime("%Y-%m-%d"))
if forward_test_df.empty:
return
trades = _run_single_simulation(forward_test_df, best_params)
print("\n--- Final Comparison Report ---")
print(f"\nBest Parameters from Backtest: {best_params}")
print("\n--- Backtest Period Performance (Historical) ---")
backtest_df = self.load_data(self.full_history_start_date, self.backtest_end_date)
historical_trades = _run_single_simulation(backtest_df, best_params)
self._generate_report(historical_trades)
print("\n--- Forward Test Performance (Last 4 Weeks) ---")
self._generate_report(trades)
def _generate_report(self, trades: list):
"""Calculates and prints key performance metrics."""
if not trades:
print("No trades were executed during this period.")
return
num_trades = len(trades)
wins = [t for t in trades if t['pnl_pct'] > 0]
total_pnl = sum(t['pnl_pct'] for t in trades)
print(f"Total Trades: {num_trades}")
print(f"Win Rate: {(len(wins) / num_trades) * 100 if num_trades > 0 else 0:.2f}%")
print(f"Total PNL (Cumulative %): {total_pnl * 100:.2f}%")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a historical backtest with optimization for a trading strategy.")
parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
backtester = Backtester(
log_level=args.log_level,
strategy_name_to_test=args.strategy
)
try:
backtester.run_optimization()
except KeyboardInterrupt:
logging.info("\nBacktest optimization cancelled by user.")
finally:
if backtester.pool:
logging.info("Terminating worker processes...")
backtester.pool.terminate()
backtester.pool.join()
logging.info("Worker processes terminated.")