feat: implement backtesting environment and refine strategy logic (v1.7.5)

- Extract strategy logic into a reusable PingPongStrategy class for bot and backtester.
- Create src/strategies/backtest_engine.py for local historical testing using DB data.
- Add BACKTESTING_GUIDE.md with instructions on how to use the new environment.
- Update dashboard to show Net Realized PnL (including fees).
- Verified bot and backtester compatibility with existing Docker setup.
This commit is contained in:
Gemini CLI
2026-03-08 19:56:53 +01:00
parent f544b06753
commit 56d0237bbf
3 changed files with 287 additions and 71 deletions

View File

@ -0,0 +1,142 @@
import pandas as pd
import numpy as np
import yaml
import os
import asyncio
import asyncpg
from datetime import datetime
from ping_pong_bot import PingPongStrategy
class BacktestEngine:
def __init__(self, config_path="config/ping_pong_config.yaml"):
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.strategy = PingPongStrategy(self.config)
self.direction = self.config.get('direction', 'long')
self.strategy.direction = self.direction
# Virtual Exchange State
self.balance = 1000.0 # Starting USD
self.equity = 1000.0
self.position_size = 0.0 # BTC
self.position_value = 0.0 # USD
self.entry_price = 0.0
# Settings
self.fee_rate = 0.0005 # 0.05% Taker
self.leverage = float(self.config.get('leverage_long' if self.direction == 'long' else 'leverage_short', 5.0))
self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0))
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.trades = []
self.equity_curve = []
async def load_data(self, symbol, interval, limit=5000):
conn = await asyncpg.connect(
host=os.getenv('DB_HOST', '20.20.20.20'),
port=int(os.getenv('DB_PORT', 5433)),
user=os.getenv('DB_USER', 'btc_bot'),
password=os.getenv('DB_PASSWORD', ''),
database=os.getenv('DB_NAME', 'btc_data')
)
rows = await conn.fetch('''
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2
ORDER BY time ASC LIMIT $3
''', symbol, interval, limit)
await conn.close()
df = pd.DataFrame([dict(r) for r in rows])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
return df
def run(self, df):
print(f"Starting backtest on {len(df)} candles...")
df = self.strategy.calculate_indicators(df)
# Start after enough candles for indicators
start_idx = max(self.config['rsi']['period'], self.config['hurst']['period']) + 5
for i in range(start_idx, len(df)):
current_df = df.iloc[:i+1]
price = df.iloc[i]['close']
time = df.iloc[i]['time']
signal = self.strategy.check_signals(current_df)
if signal == "open":
# Entry Logic
qty_usd = self.pos_size_margin * self.leverage
qty_btc = qty_usd / price
fee = qty_usd * self.fee_rate
self.balance -= fee
if self.direction == "long":
self.position_size += qty_btc
else: # Short
self.position_size -= qty_btc
self.entry_price = price # Simplified avg entry
self.trades.append({"time": time, "type": "Enter", "price": price, "fee": fee})
elif signal == "close" and abs(self.position_size) > 0:
# Exit Logic
qty_btc_exit = abs(self.position_size) * self.partial_exit_pct
qty_usd_exit = qty_btc_exit * price
fee = qty_usd_exit * self.fee_rate
# Realized PnL
if self.direction == "long":
pnl = qty_btc_exit * (price - self.entry_price)
self.position_size -= qty_btc_exit
else: # Short
pnl = qty_btc_exit * (self.entry_price - price)
self.position_size += qty_btc_exit
self.balance += (pnl - fee)
self.trades.append({"time": time, "type": "Exit", "price": price, "pnl": pnl, "fee": fee})
# Mark to Market Equity
unrealized = 0
if self.direction == "long":
unrealized = self.position_size * (price - self.entry_price) if self.position_size > 0 else 0
else:
unrealized = abs(self.position_size) * (self.entry_price - price) if self.position_size < 0 else 0
self.equity = self.balance + unrealized
self.equity_curve.append({"time": time, "equity": self.equity})
self.print_results()
def print_results(self):
total_pnl = self.equity - 1000.0
roi = (total_pnl / 1000.0) * 100
fees = sum(t['fee'] for t in self.trades)
print("\n" + "="*30)
print(" BACKTEST RESULTS ")
print("="*30)
print(f"Total Trades: {len(self.trades)}")
print(f"Final Equity: ${self.equity:.2f}")
print(f"Total PnL: ${total_pnl:.2f}")
print(f"ROI: {roi:.2f}%")
print(f"Total Fees: ${fees:.2f}")
print("="*30)
async def main():
engine = BacktestEngine()
# Assume BTC/1m for now
symbol = engine.config['symbol'].replace("USDT", "").replace("USD", "")
interval = engine.config['interval'] + "m"
df = await engine.load_data(symbol, interval, limit=10000)
if not df.empty:
engine.run(df)
else:
print("No data found in DB.")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -84,12 +84,93 @@ class DatabaseManager:
logger.error(f"DB Query Error for {symbol} {interval}: {e}")
return []
class PingPongStrategy:
"""Core Strategy Logic for Ping-Pong Scalping"""
def __init__(self, config):
self.config = config
self.direction = config.get('direction', 'long')
def rma(self, series, length):
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def calculate_indicators(self, df):
# RSI
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
# Hurst
hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
return df
def check_signals(self, df):
if len(df) < 3: return None
# finished = candle that just closed (e.g. 10:30)
# prev = candle before that (e.g. 10:29)
finished = df.iloc[-2]
prev = df.iloc[-3]
rsi_cfg, hurst_cfg = self.config['rsi'] or {}, self.config['hurst'] or {}
def is_crossing_up(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed up BETWEEN candles
between = p_val < p_band and c_close >= c_band
# 2. Crossed up WITHIN this candle
within = c_open is not None and c_open < c_band and c_close >= c_band
return between or within
def is_crossing_down(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed down BETWEEN candles
between = p_val > p_band and c_close <= c_band
# 2. Crossed down WITHIN this candle
within = c_open is not None and c_open > c_band and c_close <= c_band
return between or within
# Hurst Signals
h_upper_cross_down = is_crossing_down(prev['close'], prev['hurst_upper'], finished['open'], finished['close'], finished['hurst_upper'])
h_lower_cross_down = is_crossing_down(prev['close'], prev['hurst_lower'], finished['open'], finished['close'], finished['hurst_lower'])
# RSI Signals
rsi_cross_up = is_crossing_up(prev['rsi'], rsi_cfg.get('oversold', 30), None, finished['rsi'], rsi_cfg.get('oversold', 30))
rsi_cross_down = is_crossing_down(prev['rsi'], rsi_cfg.get('overbought', 70), None, finished['rsi'], rsi_cfg.get('overbought', 70))
l_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_open') and h_lower_cross_down)
l_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_close') and h_upper_cross_down)
s_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_open') and h_upper_cross_down)
s_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_close') and h_lower_cross_down)
if self.direction == 'long':
return "open" if l_open else ("close" if l_close else None)
else:
return "open" if s_open else ("close" if s_close else None)
class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.7.3"
self.version = "1.7.5"
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.strategy = PingPongStrategy(self.config)
# Explicitly load from ENV to ensure they are available
self.api_key = os.getenv("BYBIT_API_KEY") or os.getenv("API_KEY")
self.api_secret = os.getenv("BYBIT_API_SECRET") or os.getenv("API_SECRET")
@ -201,30 +282,8 @@ class PingPongBot:
except Exception as e:
logger.error(f"Failed to write to CSV log: {e}")
def rma(self, series, length):
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def calculate_indicators(self, df):
# RSI
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
# Hurst
hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
df = self.strategy.calculate_indicators(df)
last_row = df.iloc[-1]
now_str = datetime.now().strftime("%H:%M:%S")
self.current_indicators["rsi"] = {"value": float(last_row['rsi']), "timestamp": now_str}
@ -262,6 +321,7 @@ class PingPongBot:
await self.close_all_positions()
self.direction = new_direction
self.strategy.direction = new_direction
if self.direction == "long":
self.category = "inverse"
self.symbol = f"{self.base_coin}USD"
@ -396,52 +456,7 @@ class PingPongBot:
logger.error(f"Exchange Sync Error: {e}")
def check_signals(self, df):
if len(df) < 3: return None
# finished = candle that just closed (e.g. 10:30)
# prev = candle before that (e.g. 10:29)
finished = df.iloc[-2]
prev = df.iloc[-3]
rsi_cfg, hurst_cfg = self.config['rsi'] or {}, self.config['hurst'] or {}
def is_crossing_up(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed up BETWEEN candles
between = p_val < p_band and c_close >= c_band
# 2. Crossed up WITHIN this candle
within = c_open is not None and c_open < c_band and c_close >= c_band
return between or within
def is_crossing_down(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed down BETWEEN candles
between = p_val > p_band and c_close <= c_band
# 2. Crossed down WITHIN this candle
within = c_open is not None and c_open > c_band and c_close <= c_band
return between or within
# Hurst Signals - Only using 'is_crossing_down' as requested
h_upper_cross_down = is_crossing_down(prev['close'], prev['hurst_upper'], finished['open'], finished['close'], finished['hurst_upper'])
h_lower_cross_down = is_crossing_down(prev['close'], prev['hurst_lower'], finished['open'], finished['close'], finished['hurst_lower'])
# RSI Signals
rsi_cross_up = is_crossing_up(prev['rsi'], rsi_cfg.get('oversold', 30), None, finished['rsi'], rsi_cfg.get('oversold', 30))
rsi_cross_down = is_crossing_down(prev['rsi'], rsi_cfg.get('overbought', 70), None, finished['rsi'], rsi_cfg.get('overbought', 70))
l_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_open') and h_lower_cross_down)
l_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_close') and h_upper_cross_down)
s_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_open') and h_upper_cross_down)
s_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_close') and h_lower_cross_down)
if self.direction == 'long':
return "open" if l_open else ("close" if l_close else None)
else:
return "open" if s_open else ("close" if s_close else None)
return self.strategy.check_signals(df)
async def execute_trade(self, signal):
if not signal or not self.market_price: return
@ -544,11 +559,14 @@ class PingPongBot:
pnl_color = "green" if self.session_pnl >= 0 else "red"
pnl_btc_color = "green" if self.session_pnl_btc >= 0 else "red"
net_realized_pnl = self.total_realized_pnl - self.total_fees
cfg_table.add_row("Running Time", runtime_str)
cfg_table.add_row("Session PnL (USD)", f"[bold {pnl_color}]{'$' if self.session_pnl >= 0 else '-$'}{abs(self.session_pnl):.2f}[/]")
cfg_table.add_row("Session PnL (BTC)", f"[bold {pnl_btc_color}]{'{:+.6f}'.format(self.session_pnl_btc)} BTC[/]")
cfg_table.add_row("Total Fees", f"[bold red]-${self.total_fees:.2f}[/]")
cfg_table.add_row("Realized PnL", f"[bold {'green' if self.total_realized_pnl >= 0 else 'red'}]${self.total_realized_pnl:.2f}[/]")
cfg_table.add_row("Gross Realized PnL", f"[bold {'green' if self.total_realized_pnl >= 0 else 'red'}]${self.total_realized_pnl:.2f}[/]")
cfg_table.add_row("Net Realized PnL", f"[bold {'green' if net_realized_pnl >= 0 else 'red'}]${net_realized_pnl:.2f}[/]")
ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated")