diff --git a/_data/backtesting_conf.json b/_data/backtesting_conf.json new file mode 100644 index 0000000..09cd709 --- /dev/null +++ b/_data/backtesting_conf.json @@ -0,0 +1,27 @@ +{ + "sma_cross_eth_1m": { + "strategy_name": "sma_cross_1", + "optimization_params": { + "fast": { + "start": 4, + "end": 15, + "step": 1 + }, + "slow": { + "start": 20, + "end": 60, + "step": 1 + } + } + }, + "sma_44d_btc": { + "strategy_name": "sma_cross_2", + "optimization_params": { + "sma_period": { + "start": 20, + "end": 250, + "step": 1 + } + } + } +} diff --git a/_data/strategies.json b/_data/strategies.json index 668ef58..01e0dc1 100644 --- a/_data/strategies.json +++ b/_data/strategies.json @@ -5,7 +5,7 @@ "agent": "scalper", "parameters": { "coin": "ETH", - "timeframe": "1m", + "timeframe": "5m", "slow": 44, "fast": 7, "size": 0.0028, diff --git a/_data/strategy_status_sma_cross_1.json b/_data/strategy_status_sma_cross_1.json index c83b076..02e9869 100644 --- a/_data/strategy_status_sma_cross_1.json +++ b/_data/strategy_status_sma_cross_1.json @@ -1,7 +1,7 @@ { "strategy_name": "sma_cross_1", "current_signal": "SELL", - "last_signal_change_utc": "2025-10-18T13:03:00+00:00", - "signal_price": 3871.9, - "last_checked_utc": "2025-10-18T13:55:05.015097+00:00" + "last_signal_change_utc": "2025-10-18T16:19:00+00:00", + "signal_price": 3870.5, + "last_checked_utc": "2025-10-18T16:29:05.035278+00:00" } \ No newline at end of file diff --git a/_data/strategy_status_sma_cross_2.json b/_data/strategy_status_sma_cross_2.json index e5d23a6..33adf7c 100644 --- a/_data/strategy_status_sma_cross_2.json +++ b/_data/strategy_status_sma_cross_2.json @@ -3,5 +3,5 @@ "current_signal": "SELL", "last_signal_change_utc": "2025-10-14T00:00:00+00:00", "signal_price": 113026.0, - "last_checked_utc": "2025-10-18T13:55:45.714315+00:00" + "last_checked_utc": "2025-10-18T16:28:52.112584+00:00" } \ No newline at end of file diff --git a/backtester.py b/backtester.py new file mode 100644 index 
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
import json
from datetime import datetime, timedelta
import itertools
import multiprocessing
from functools import partial
from typing import Optional
import time

from logging_utils import setup_logging


def _run_single_simulation(df: pd.DataFrame, params: dict) -> list:
    """
    Core simulation logic. Takes a DataFrame and parameters, returns a list of trades.

    Two parameter shapes are supported:
      * ``fast`` + ``slow``  -> dual-SMA crossover (long while fast SMA > slow SMA)
      * ``sma_period``       -> single-SMA filter (long while close > SMA)

    Operates on a copy so the caller's DataFrame is never mutated.
    Only realized (closed) trades are returned; a position still open at the
    end of the data contributes no PNL.

    :param df: price history with a ``close`` column.
    :param params: strategy parameters (see shapes above).
    :return: list of ``{'pnl_pct': float}`` dicts, one per closed trade.
    """
    df = df.copy()  # the original dropped rows in place, mutating the caller's frame

    fast_ma_period = params.get('fast', 0)
    slow_ma_period = params.get('slow', 0)
    sma_period = params.get('sma_period', 0)

    if fast_ma_period and slow_ma_period:
        df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean()
        df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean()
        df['signal'] = (df['fast_sma'] > df['slow_sma']).astype(int)
    elif sma_period:
        df['sma'] = df['close'].rolling(window=sma_period).mean()
        df['signal'] = (df['close'] > df['sma']).astype(int)
    else:
        # Unknown parameter shape: nothing to simulate.
        return []

    df.dropna(inplace=True)
    if df.empty:
        return []

    # diff() yields +1 on a flat->long flip, -1 on long->flat, 0/NaN otherwise.
    df['position'] = df['signal'].diff()

    trades = []
    entry_price = 0.0
    # itertuples is much faster than iterrows and we only read two columns.
    for row in df.itertuples():
        if row.position == 1:
            if entry_price == 0:  # only enter if flat
                entry_price = row.close
        elif row.position == -1:
            if entry_price != 0:  # only exit if in a position
                pnl = (row.close - entry_price) / entry_price
                trades.append({'pnl_pct': pnl})
                entry_price = 0.0

    return trades


def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str,
                      start_date: str, end_date: str) -> tuple[dict, list]:
    """
    Multiprocessing worker: loads its own data (each process needs its own
    sqlite connection), runs one simulation, and returns (params, trades)
    together so unordered pool results stay matched to their inputs.

    Errors are reported per-worker and yield an empty trade list rather than
    crashing the whole pool.
    """
    df = pd.DataFrame()
    try:
        with sqlite3.connect(db_path) as conn:
            # Table names cannot be bound as SQL parameters; coin/timeframe come
            # from local config (not user input) and are double-quoted defensively.
            query = f'SELECT datetime_utc, close FROM "{coin}_{timeframe}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
            df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
            if not df.empty:
                df.set_index('datetime_utc', inplace=True)
    except Exception as e:
        # print, not logging: child processes may not inherit the parent's
        # logging configuration (depends on the start method).
        print(f"Worker error loading data for params {params}: {e}")
        return (params, [])

    if df.empty:
        return (params, [])

    trades = _run_single_simulation(df, params)
    return (params, trades)


class Backtester:
    """
    A class to run historical simulations (backtests) with parameter optimization
    and forward testing on trading strategies, using multiple cores to speed up the process.
    """

    def __init__(self, log_level: str, strategy_name_to_test: str):
        setup_logging(log_level, 'Backtester')
        self.db_path = os.path.join("_data", "market_data.db")

        # Which parameter grid to sweep (from backtesting_conf.json).
        self.backtest_config = self._load_backtest_config(strategy_name_to_test)
        if not self.backtest_config:
            logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found in '_data/backtesting_conf.json'.")
            sys.exit(1)

        # The underlying strategy definition (coin, timeframe, ...).
        self.strategy_name = self.backtest_config.get('strategy_name')
        self.strategy_config = self._load_strategy_config()
        if not self.strategy_config:
            logging.error(f"Strategy '{self.strategy_name}' not found in '_data/strategies.json'.")
            sys.exit(1)

        self.params = self.strategy_config.get('parameters', {})
        self.coin = self.params.get('coin')
        self.timeframe = self.params.get('timeframe')

        # Walk-forward split: optimize on everything up to ~4 weeks ago,
        # validate on the most recent 4 weeks (held out of the optimization).
        self.forward_test_start_date = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d")
        self.backtest_end_date = (datetime.now() - timedelta(weeks=4, days=1)).strftime("%Y-%m-%d")
        self.full_history_start_date = "2020-01-01"
        self.pool = None  # kept as an attribute so __main__ can terminate on Ctrl-C

    def _load_backtest_config(self, name_to_test: str) -> Optional[dict]:
        """Loads the specific backtest configuration from the JSON file.

        Returns None (and logs) if the file is missing/invalid or the key is absent.
        """
        config_path = os.path.join("_data", "backtesting_conf.json")
        try:
            with open(config_path, 'r') as f:
                return json.load(f).get(name_to_test)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logging.error(f"Could not load backtesting configuration: {e}")
            return None

    def _load_strategy_config(self) -> Optional[dict]:
        """Loads the general strategy configuration.

        Returns None (and logs) if the file is missing/invalid or the key is absent.
        """
        config_path = os.path.join("_data", "strategies.json")
        try:
            with open(config_path, 'r') as f:
                return json.load(f).get(self.strategy_name)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logging.error(f"Could not load strategy configuration: {e}")
            return None

    def load_data(self, start_date, end_date) -> pd.DataFrame:
        """Loads historical data for a specific period for single-threaded tasks.

        Returns an empty DataFrame on any failure (no data, bad table, db error).
        """
        table_name = f"{self.coin}_{self.timeframe}"
        logging.info(f"Loading data for {table_name} from {start_date} to {end_date}...")
        try:
            with sqlite3.connect(self.db_path) as conn:
                query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
                df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
                if df.empty:
                    logging.warning("No data found for the specified date range.")
                    return pd.DataFrame()

                df.set_index('datetime_utc', inplace=True)
                return df
        except Exception as e:
            logging.error(f"Failed to load data for backtest: {e}")
            return pd.DataFrame()

    def run_optimization(self):
        """
        Runs the backtest simulation for all parameter combinations in parallel,
        provides progress updates, and finds the best result (by cumulative PNL),
        then hands the winner to the forward test.
        """
        param_configs = self.backtest_config.get('optimization_params', {})
        param_names = list(param_configs.keys())
        param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()]

        all_combinations = list(itertools.product(*param_ranges))
        param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations]

        # Cap at 60 workers but never exceed what the machine actually has
        # (the original hard-coded 60 regardless of available cores).
        num_cores = min(60, multiprocessing.cpu_count() or 1)
        logging.info(f"Starting optimization... Testing {len(all_combinations)} parameter combinations using {num_cores} cores.")

        self.pool = multiprocessing.Pool(processes=num_cores)

        worker = partial(
            simulation_worker,
            db_path=self.db_path,
            coin=self.coin,
            timeframe=self.timeframe,
            start_date=self.full_history_start_date,
            end_date=self.backtest_end_date
        )

        results = []
        total_tasks = len(param_dicts)
        completed_tasks = 0
        last_update_time = time.time()

        logging.info("Optimization running... Progress updates will be provided every minute.")

        # imap_unordered yields results as soon as any worker finishes,
        # which keeps the progress reporting responsive.
        for params_result, trades_result in self.pool.imap_unordered(worker, param_dicts):
            completed_tasks += 1
            if trades_result:
                total_pnl = sum(t['pnl_pct'] for t in trades_result)
                results.append({'params': params_result, 'pnl': total_pnl, 'trades': len(trades_result)})

            current_time = time.time()
            if current_time - last_update_time >= 60:
                progress = (completed_tasks / total_tasks) * 100
                logging.info(f"Progress: {progress:.2f}% complete ({completed_tasks}/{total_tasks} combinations tested).")
                last_update_time = current_time

        logging.info(f"Progress: 100.00% complete ({completed_tasks}/{total_tasks} combinations tested).")

        self.pool.close()
        self.pool.join()
        self.pool = None

        if not results:
            logging.error("Optimization produced no trades. Cannot determine best parameters.")
            return

        best_result = max(results, key=lambda x: x['pnl'])
        logging.info("\n--- Optimization Complete ---")
        logging.info(f"Best parameters found: {best_result['params']} with PNL: {best_result['pnl']*100:.2f}% over {best_result['trades']} trades.")

        self.run_forward_test(best_result['params'])

    def run_forward_test(self, best_params):
        """Runs a backtest on the forward-testing period using the best parameters.

        Prints a side-by-side report: in-sample (historical) performance vs.
        out-of-sample performance over the held-out last 4 weeks.
        """
        logging.info("\n--- Starting Forward Test (Walk-Forward Validation) ---")
        forward_test_df = self.load_data(self.forward_test_start_date, datetime.now().strftime("%Y-%m-%d"))
        if forward_test_df.empty:
            return

        trades = _run_single_simulation(forward_test_df, best_params)

        print("\n--- Final Comparison Report ---")
        print(f"\nBest Parameters from Backtest: {best_params}")

        print("\n--- Backtest Period Performance (Historical) ---")
        backtest_df = self.load_data(self.full_history_start_date, self.backtest_end_date)
        historical_trades = _run_single_simulation(backtest_df, best_params)
        self._generate_report(historical_trades)

        print("\n--- Forward Test Performance (Last 4 Weeks) ---")
        self._generate_report(trades)

    def _generate_report(self, trades: list):
        """Calculates and prints key performance metrics for a list of trades."""
        if not trades:
            print("No trades were executed during this period.")
            return

        num_trades = len(trades)
        wins = [t for t in trades if t['pnl_pct'] > 0]
        total_pnl = sum(t['pnl_pct'] for t in trades)

        print(f"Total Trades: {num_trades}")
        print(f"Win Rate: {(len(wins) / num_trades) * 100 if num_trades > 0 else 0:.2f}%")
        print(f"Total PNL (Cumulative %): {total_pnl * 100:.2f}%")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run a historical backtest with optimization for a trading strategy.")
    parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).")
    parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    args = parser.parse_args()

    backtester = Backtester(
        log_level=args.log_level,
        strategy_name_to_test=args.strategy
    )

    try:
        backtester.run_optimization()
    except KeyboardInterrupt:
        logging.info("\nBacktest optimization cancelled by user.")
    finally:
        # Clean up workers if the sweep was interrupted mid-flight.
        if backtester.pool:
            logging.info("Terminating worker processes...")
            backtester.pool.terminate()
            backtester.pool.join()
            logging.info("Worker processes terminated.")