Compare commits

...

2 Commits

Author SHA1 Message Date
56d0237bbf feat: implement backtesting environment and refine strategy logic (v1.7.5)
- Extract strategy logic into a reusable PingPongStrategy class for bot and backtester.
- Create src/strategies/backtest_engine.py for local historical testing using DB data.
- Add BACKTESTING_GUIDE.md with instructions on how to use the new environment.
- Update dashboard to show Net Realized PnL (including fees).
- Verified bot and backtester compatibility with existing Docker setup.
2026-03-08 19:56:53 +01:00
f544b06753 feat: enhance trade tracking with fees, PnL, and refined logging (v1.7.3)
- Implement real-time fee and realized PnL tracking using get_executions.
- Rename 'side' column to 'trade' in CSV log and dashboard (Enter/Exit labels).
- Add automatic CSV header migration (side -> trade).
- Enhance dashboard with session PnL (USD/BTC), total fees, and used leverage.
- Improve signal detection with candle-internal crossover logic.
- Add robust retry mechanism with failure window tracking.
- Sync exchange leverage automatically based on direction.
- Update config with robustness and mode-specific leverage settings.
2026-03-07 22:57:51 +01:00
5 changed files with 521 additions and 64 deletions

View File

@ -36,10 +36,10 @@ uvicorn src.api.server:app --reload --host 0.0.0.0 --port 8000
### Testing
```bash
# Test database connection
python test_db.py
python -c "from src.data_collector.database import get_db; print('Database connection test successful')"
# Run single test (no existing test framework found but for any future tests)
python -m pytest <test_file>.py::test_<function_name> -v
# Run single test (using pytest framework)
python -m pytest tests/ -v -k "test_function_name"
```
### Environment Setup

56
BACKTESTING_GUIDE.md Normal file
View File

@ -0,0 +1,56 @@
# Ping-Pong Bot: Backtesting & Optimization Guide
This guide explains how to use the local backtesting environment to test and optimize your BTC trading strategy using historical data from your PostgreSQL database.
## 1. Prerequisites
The backtesting engine requires `pandas`, `numpy`, and `asyncpg`. These are already installed in your `btc_ping_pong_bot` Docker container.
To run the backtester, use the following command:
```bash
docker exec -it btc_ping_pong_bot python src/strategies/backtest_engine.py
```
## 2. Backtest Engine (`backtest_engine.py`)
The backtest engine reuses the core logic from `ping_pong_bot.py` via the `PingPongStrategy` class.
### Key Features:
* **Virtual Exchange:** Simulates a $1,000 account with customizable leverage and fees.
* **Fee Simulation:** Applies a 0.05% taker fee (configurable) to every entry and exit.
* **Mark-to-Market:** Calculates real-time equity based on current price and position size.
* **Data Sourcing:** Automatically pulls the last 10,000 candles from your `btc_data` database.
### How to Run:
```bash
# From the project root
docker exec -it btc_ping_pong_bot python src/strategies/backtest_engine.py
```
## 3. Strategy Optimization (Optional)
To find the absolute best parameters for RSI and Hurst, you can use **Optuna**.
### Installation:
Inside the Docker container:
```bash
docker exec -it btc_ping_pong_bot pip install optuna
```
### Planned Optimizer (`optimize_strategy.py`):
Once installed, we can implement an optimization script that searches for:
* `rsi_period`: 7 to 21
* `hurst_multiplier`: 1.2 to 2.5
* `partial_exit_pct`: 0.05 to 0.30
## 4. Local DB Data
The engine connects to your local PostgreSQL DB using the credentials in your `.env` file. It specifically queries the `candles` table for the symbol and interval defined in `config/ping_pong_config.yaml`.
## 5. Interpreting Results
* **Final Equity:** Your simulated account balance after all trades.
* **ROI:** Return on Investment (Percentage).
* **Total Fees:** Total cost paid to the "Virtual Exchange". High fees indicate over-trading.
* **Trade Count:** Total number of Enter/Exit signals triggered.
## 6. Next Steps
1. Run the backtester to see baseline performance.
2. Adjust parameters in `config/ping_pong_config.yaml`.
3. Rerun the backtest to see the impact of your changes.
4. (Optional) Ask me to implement the `optimize_strategy.py` script once you have Optuna installed.

View File

@ -1,7 +1,7 @@
# Ping-Pong Strategy Configuration
# Trading Pair & Timeframe
symbol: BTCUSDT
symbol: BTCUSD
interval: "1" # Minutes (1, 3, 5, 15, 30, 60, 120, 240, 360, 720, D, W, M)
# Indicator Settings
@ -9,25 +9,36 @@ rsi:
period: 14
overbought: 70
oversold: 30
TF: 1 # same as symbol's interval
enabled_for_open: true
enabled_for_close: true
hurst:
period: 30
multiplier: 1.8
TF: 1 # same as symbol's interval
enabled_for_open: true
enabled_for_close: true
# Strategy Settings
direction: "long" # "long" or "short"
capital: 1000.0 # Initial capital for calculations (informational)
exchange_leverage: 3.0 # Multiplier for each 'ping' size
max_effective_leverage: 1.0 # Cap on total position size relative to equity
leverage_long: 10.0 # Leverage for LONG mode
leverage_short: 5.0 # Leverage for SHORT mode
max_effective_leverage: 2.5 # Cap on total position size relative to equity
pos_size_margin: 20.0 # Margin per 'ping' (USD)
take_profit_pct: 1.5 # Target profit percentage per exit (1.5 = 1.5%)
#take_profit_pct: 1.5 # Target profit percentage per exit (1.5 = 1.5%)
partial_exit_pct: 0.15 # 15% of position closed on each TP hit
min_position_value_usd: 15.0 # Minimum remaining value to keep position open
# Execution Settings
loop_interval_seconds: 10 # How often to check for new data
loop_interval_seconds: 5 # How often to check for new data
debug_mode: false
# Robustness Settings
robustness:
enabled: true
max_retries: 3
retry_window_seconds: 300 # 5 minutes
autostart_on_reboot: true

View File

@ -0,0 +1,142 @@
import pandas as pd
import numpy as np
import yaml
import os
import asyncio
import asyncpg
from datetime import datetime
from ping_pong_bot import PingPongStrategy
class BacktestEngine:
def __init__(self, config_path="config/ping_pong_config.yaml"):
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.strategy = PingPongStrategy(self.config)
self.direction = self.config.get('direction', 'long')
self.strategy.direction = self.direction
# Virtual Exchange State
self.balance = 1000.0 # Starting USD
self.equity = 1000.0
self.position_size = 0.0 # BTC
self.position_value = 0.0 # USD
self.entry_price = 0.0
# Settings
self.fee_rate = 0.0005 # 0.05% Taker
self.leverage = float(self.config.get('leverage_long' if self.direction == 'long' else 'leverage_short', 5.0))
self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0))
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.trades = []
self.equity_curve = []
async def load_data(self, symbol, interval, limit=5000):
conn = await asyncpg.connect(
host=os.getenv('DB_HOST', '20.20.20.20'),
port=int(os.getenv('DB_PORT', 5433)),
user=os.getenv('DB_USER', 'btc_bot'),
password=os.getenv('DB_PASSWORD', ''),
database=os.getenv('DB_NAME', 'btc_data')
)
rows = await conn.fetch('''
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2
ORDER BY time ASC LIMIT $3
''', symbol, interval, limit)
await conn.close()
df = pd.DataFrame([dict(r) for r in rows])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
return df
def run(self, df):
print(f"Starting backtest on {len(df)} candles...")
df = self.strategy.calculate_indicators(df)
# Start after enough candles for indicators
start_idx = max(self.config['rsi']['period'], self.config['hurst']['period']) + 5
for i in range(start_idx, len(df)):
current_df = df.iloc[:i+1]
price = df.iloc[i]['close']
time = df.iloc[i]['time']
signal = self.strategy.check_signals(current_df)
if signal == "open":
# Entry Logic
qty_usd = self.pos_size_margin * self.leverage
qty_btc = qty_usd / price
fee = qty_usd * self.fee_rate
self.balance -= fee
if self.direction == "long":
self.position_size += qty_btc
else: # Short
self.position_size -= qty_btc
self.entry_price = price # Simplified avg entry
self.trades.append({"time": time, "type": "Enter", "price": price, "fee": fee})
elif signal == "close" and abs(self.position_size) > 0:
# Exit Logic
qty_btc_exit = abs(self.position_size) * self.partial_exit_pct
qty_usd_exit = qty_btc_exit * price
fee = qty_usd_exit * self.fee_rate
# Realized PnL
if self.direction == "long":
pnl = qty_btc_exit * (price - self.entry_price)
self.position_size -= qty_btc_exit
else: # Short
pnl = qty_btc_exit * (self.entry_price - price)
self.position_size += qty_btc_exit
self.balance += (pnl - fee)
self.trades.append({"time": time, "type": "Exit", "price": price, "pnl": pnl, "fee": fee})
# Mark to Market Equity
unrealized = 0
if self.direction == "long":
unrealized = self.position_size * (price - self.entry_price) if self.position_size > 0 else 0
else:
unrealized = abs(self.position_size) * (self.entry_price - price) if self.position_size < 0 else 0
self.equity = self.balance + unrealized
self.equity_curve.append({"time": time, "equity": self.equity})
self.print_results()
def print_results(self):
total_pnl = self.equity - 1000.0
roi = (total_pnl / 1000.0) * 100
fees = sum(t['fee'] for t in self.trades)
print("\n" + "="*30)
print(" BACKTEST RESULTS ")
print("="*30)
print(f"Total Trades: {len(self.trades)}")
print(f"Final Equity: ${self.equity:.2f}")
print(f"Total PnL: ${total_pnl:.2f}")
print(f"ROI: {roi:.2f}%")
print(f"Total Fees: ${fees:.2f}")
print("="*30)
async def main():
engine = BacktestEngine()
# Assume BTC/1m for now
symbol = engine.config['symbol'].replace("USDT", "").replace("USD", "")
interval = engine.config['interval'] + "m"
df = await engine.load_data(symbol, interval, limit=10000)
if not df.empty:
engine.run(df)
else:
print("No data found in DB.")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -84,12 +84,93 @@ class DatabaseManager:
logger.error(f"DB Query Error for {symbol} {interval}: {e}")
return []
class PingPongStrategy:
"""Core Strategy Logic for Ping-Pong Scalping"""
def __init__(self, config):
self.config = config
self.direction = config.get('direction', 'long')
def rma(self, series, length):
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def calculate_indicators(self, df):
# RSI
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
# Hurst
hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
return df
def check_signals(self, df):
if len(df) < 3: return None
# finished = candle that just closed (e.g. 10:30)
# prev = candle before that (e.g. 10:29)
finished = df.iloc[-2]
prev = df.iloc[-3]
rsi_cfg, hurst_cfg = self.config['rsi'] or {}, self.config['hurst'] or {}
def is_crossing_up(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed up BETWEEN candles
between = p_val < p_band and c_close >= c_band
# 2. Crossed up WITHIN this candle
within = c_open is not None and c_open < c_band and c_close >= c_band
return between or within
def is_crossing_down(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed down BETWEEN candles
between = p_val > p_band and c_close <= c_band
# 2. Crossed down WITHIN this candle
within = c_open is not None and c_open > c_band and c_close <= c_band
return between or within
# Hurst Signals
h_upper_cross_down = is_crossing_down(prev['close'], prev['hurst_upper'], finished['open'], finished['close'], finished['hurst_upper'])
h_lower_cross_down = is_crossing_down(prev['close'], prev['hurst_lower'], finished['open'], finished['close'], finished['hurst_lower'])
# RSI Signals
rsi_cross_up = is_crossing_up(prev['rsi'], rsi_cfg.get('oversold', 30), None, finished['rsi'], rsi_cfg.get('oversold', 30))
rsi_cross_down = is_crossing_down(prev['rsi'], rsi_cfg.get('overbought', 70), None, finished['rsi'], rsi_cfg.get('overbought', 70))
l_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_open') and h_lower_cross_down)
l_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_close') and h_upper_cross_down)
s_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_open') and h_upper_cross_down)
s_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_close') and h_lower_cross_down)
if self.direction == 'long':
return "open" if l_open else ("close" if l_close else None)
else:
return "open" if s_open else ("close" if s_close else None)
class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.5.7"
self.version = "1.7.5"
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.strategy = PingPongStrategy(self.config)
# Explicitly load from ENV to ensure they are available
self.api_key = os.getenv("BYBIT_API_KEY") or os.getenv("API_KEY")
self.api_secret = os.getenv("BYBIT_API_SECRET") or os.getenv("API_SECRET")
@ -132,51 +213,77 @@ class PingPongBot:
# Bot State
self.last_candle_time = None
self.last_candle_open = 0.0
self.last_candle_close = 0.0
self.last_candle_price = 0.0
self.current_indicators = {
"rsi": {"value": 0.0, "timestamp": "N/A"},
"hurst_lower": {"value": 0.0, "timestamp": "N/A"},
"hurst_upper": {"value": 0.0, "timestamp": "N/A"}
}
self.failure_history = []
self.position = None
self.wallet_balance = 0
self.available_balance = 0
self.start_equity = 0.0
self.start_equity_btc = 0.0
self.session_pnl = 0.0
self.session_pnl_btc = 0.0
self.total_fees = 0.0
self.total_realized_pnl = 0.0
self.market_price = 0.0
self.status_msg = "Initializing..."
self.last_signal = None
self.start_time = datetime.now()
self.console = Console()
# Transaction Logging
self.tx_log_path = "logs/ping_pong_transactions.csv"
self._init_tx_log()
# Fixed Parameters from Config
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0))
self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0))
self.leverage = float(self.config.get('exchange_leverage', 3.0))
self.leverage_long = float(self.config.get('leverage_long', 10.0))
self.leverage_short = float(self.config.get('leverage_short', 3.0))
self.leverage = 1.0 # Current leverage
self.max_eff_lev = float(self.config.get('max_effective_leverage', 1.0))
def rma(self, series, length):
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def _init_tx_log(self):
"""Ensures CSV header exists and is up to date"""
header = "time,version,direction,symbol,trade,qty,price,leverage,pnl,fee,status\n"
if not os.path.exists(self.tx_log_path):
os.makedirs(os.path.dirname(self.tx_log_path), exist_ok=True)
with open(self.tx_log_path, 'w') as f:
f.write(header)
else:
# Check if we need to update the header from 'side' to 'trade'
try:
with open(self.tx_log_path, 'r') as f:
first_line = f.readline()
if "side" in first_line:
with open(self.tx_log_path, 'r') as f:
lines = f.readlines()
if lines:
lines[0] = header
with open(self.tx_log_path, 'w') as f:
f.writelines(lines)
logger.info("Updated CSV log header: 'side' -> 'trade'")
except Exception as e:
logger.error(f"Failed to update CSV header: {e}")
async def log_transaction(self, trade, qty, price, pnl=0, fee=0, status="Success"):
"""Appends a trade record to CSV"""
try:
with open(self.tx_log_path, 'a') as f:
t_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{t_str},{self.version},{self.direction},{self.symbol},{trade},{qty},{price},{self.leverage},{pnl},{fee},{status}\n")
except Exception as e:
logger.error(f"Failed to write to CSV log: {e}")
def calculate_indicators(self, df):
# RSI
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
# Hurst
hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
df = self.strategy.calculate_indicators(df)
last_row = df.iloc[-1]
now_str = datetime.now().strftime("%H:%M:%S")
self.current_indicators["rsi"] = {"value": float(last_row['rsi']), "timestamp": now_str}
@ -214,19 +321,25 @@ class PingPongBot:
await self.close_all_positions()
self.direction = new_direction
self.strategy.direction = new_direction
if self.direction == "long":
self.category = "inverse"
self.symbol = f"{self.base_coin}USD"
self.settle_coin = self.base_coin
self.leverage = self.leverage_long
else:
self.category = "linear"
self.symbol = "BTCPERP" if self.base_coin == "BTC" else f"{self.base_coin}USDC"
self.settle_coin = "USDC"
self.leverage = self.leverage_short
# Perform swap
await self.swap_assets(new_direction)
logger.info(f"Bot configured for {self.direction.upper()} | Symbol: {self.symbol} | Category: {self.category}")
# Sync Leverage with Bybit
await self.set_exchange_leverage()
logger.info(f"Bot configured for {self.direction.upper()} | Symbol: {self.symbol} | Category: {self.category} | Leverage: {self.leverage}")
self.last_candle_time = None
return True
@ -236,6 +349,26 @@ class PingPongBot:
self.status_msg = f"Dir Error: {str(e)[:20]}"
return False
async def set_exchange_leverage(self):
"""Points Bybit API to set account leverage for current category/symbol"""
try:
if not self.category or not self.symbol: return
logger.info(f"Setting exchange leverage to {self.leverage}x for {self.symbol}...")
res = await asyncio.to_thread(self.session.set_leverage,
category=self.category,
symbol=self.symbol,
buyLeverage=str(self.leverage),
sellLeverage=str(self.leverage)
)
if res['retCode'] == 0:
logger.info(f"Leverage successfully set to {self.leverage}x")
elif res['retCode'] == 110043: # Leverage not modified
logger.info(f"Leverage is already {self.leverage}x")
else:
logger.warning(f"Bybit Leverage Warning: {res['retMsg']} (Code: {res['retCode']})")
except Exception as e:
logger.error(f"Failed to set leverage on Bybit: {e}")
async def close_all_positions(self):
"""Closes any active position in the current category/symbol"""
try:
@ -305,30 +438,25 @@ class PingPongBot:
if wallet['retCode'] == 0:
res_list = wallet['result']['list']
if res_list:
self.wallet_balance = float(res_list[0].get('totalWalletBalance', 0))
# Use totalEquity for NAV (Net Asset Value) tracking
current_equity = float(res_list[0].get('totalEquity', 0))
self.wallet_balance = current_equity
self.available_balance = float(res_list[0].get('totalAvailableBalance', 0))
# Calculate BTC-equivalent equity
current_equity_btc = current_equity / max(self.market_price, 1)
if self.start_equity == 0.0:
self.start_equity = current_equity
self.start_equity_btc = current_equity_btc
self.session_pnl = current_equity - self.start_equity
self.session_pnl_btc = current_equity_btc - self.start_equity_btc
except Exception as e:
logger.error(f"Exchange Sync Error: {e}")
def check_signals(self, df):
if len(df) < 2: return None
last, prev = df.iloc[-1], df.iloc[-2]
rsi_cfg, hurst_cfg = self.config['rsi'] or {}, self.config['hurst'] or {}
# Signals defined by crossover
l_open = (rsi_cfg.get('enabled_for_open') and prev['rsi'] < rsi_cfg.get('oversold', 30) and last['rsi'] >= rsi_cfg.get('oversold', 30)) or \
(hurst_cfg.get('enabled_for_open') and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
l_close = (rsi_cfg.get('enabled_for_close') and prev['rsi'] > rsi_cfg.get('overbought', 70) and last['rsi'] <= rsi_cfg.get('overbought', 70)) or \
(hurst_cfg.get('enabled_for_close') and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_open = (rsi_cfg.get('enabled_for_open') and prev['rsi'] > rsi_cfg.get('overbought', 70) and last['rsi'] <= rsi_cfg.get('overbought', 70)) or \
(hurst_cfg.get('enabled_for_open') and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_close = (rsi_cfg.get('enabled_for_close') and prev['rsi'] < rsi_cfg.get('oversold', 30) and last['rsi'] >= rsi_cfg.get('oversold', 30)) or \
(hurst_cfg.get('enabled_for_close') and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
if self.direction == 'long':
return "open" if l_open else ("close" if l_close else None)
else:
return "open" if s_open else ("close" if s_close else None)
return self.strategy.check_signals(df)
async def execute_trade(self, signal):
if not signal or not self.market_price: return
@ -359,7 +487,10 @@ class PingPongBot:
async def place_order(self, qty, is_close=False):
if not self.category or not self.symbol: return
side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy"
pos_idx = 1 if self.direction == "long" else 2
trade = "Exit" if is_close else "Enter"
# Using positionIdx=0 for One-Way Mode to avoid Error 10001
pos_idx = 0
try:
qty_str = str(int(qty)) if self.category == "inverse" else str(round(qty, 3))
@ -368,10 +499,46 @@ class PingPongBot:
qty=qty_str, reduceOnly=is_close, positionIdx=pos_idx
)
if res['retCode'] == 0:
self.last_signal = f"{side} {qty_str}"
self.status_msg = f"Order Success: {side}"
order_id = res['result']['orderId']
self.last_signal = f"{trade} {qty_str}"
self.status_msg = f"Order Success: {trade}"
# Fetch execution details for fees and PnL
await asyncio.sleep(1.5) # Wait for fill and indexing
exec_info = await asyncio.to_thread(self.session.get_executions,
category=self.category,
symbol=self.symbol,
orderId=order_id)
exec_fee = 0.0
exec_pnl = 0.0
exec_price = self.market_price
if exec_info['retCode'] == 0 and exec_info['result']['list']:
fills = exec_info['result']['list']
# Fees and closedPnl are in settleCoin (BTC for inverse, USDC for linear)
exec_fee = sum(float(f.get('execFee', 0)) for f in fills)
exec_pnl = sum(float(f.get('closedPnl', 0)) for f in fills)
exec_price = float(fills[0].get('execPrice', self.market_price))
# Convert to USD if in BTC for consistent tracking
if self.category == "inverse":
usd_fee = exec_fee * exec_price
usd_pnl = exec_pnl * exec_price
else:
usd_fee = exec_fee
usd_pnl = exec_pnl
self.total_fees += usd_fee
self.total_realized_pnl += usd_pnl
await self.log_transaction(trade, qty_str, exec_price, pnl=usd_pnl, fee=usd_fee, status="Filled")
else:
await self.log_transaction(trade, qty_str, self.market_price, status="Filled (No Exec Info)")
else:
self.status_msg = f"Order Error: {res['retMsg']}"
logger.error(f"Bybit Order Error: {res['retMsg']} (Code: {res['retCode']})")
await self.log_transaction(trade, qty_str, self.market_price, status=f"Error: {res['retMsg']}")
except Exception as e:
logger.error(f"Trade Error: {e}")
@ -382,7 +549,24 @@ class PingPongBot:
cfg_table.add_column("Property"); cfg_table.add_column("Value")
cfg_table.add_row("Symbol", self.symbol or "N/A"); cfg_table.add_row("Category", self.category or "N/A")
cfg_table.add_row("Market Price", f"${self.market_price:.2f}"); cfg_table.add_row("SMA(44, 1D)", f"${self.ma_44_val:.2f}")
cfg_table.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:.2f})")
cfg_table.add_row("Last Candle", f"{self.last_candle_time}")
cfg_table.add_row("Candle O / C", f"${self.last_candle_open:.2f} / ${self.last_candle_close:.2f}")
cfg_table.add_row("Leverage", f"{self.leverage}x")
# Running Stats
runtime = datetime.now() - self.start_time
runtime_str = str(runtime).split('.')[0] # Remove microseconds
pnl_color = "green" if self.session_pnl >= 0 else "red"
pnl_btc_color = "green" if self.session_pnl_btc >= 0 else "red"
net_realized_pnl = self.total_realized_pnl - self.total_fees
cfg_table.add_row("Running Time", runtime_str)
cfg_table.add_row("Session PnL (USD)", f"[bold {pnl_color}]{'$' if self.session_pnl >= 0 else '-$'}{abs(self.session_pnl):.2f}[/]")
cfg_table.add_row("Session PnL (BTC)", f"[bold {pnl_btc_color}]{'{:+.6f}'.format(self.session_pnl_btc)} BTC[/]")
cfg_table.add_row("Total Fees", f"[bold red]-${self.total_fees:.2f}[/]")
cfg_table.add_row("Gross Realized PnL", f"[bold {'green' if self.total_realized_pnl >= 0 else 'red'}]${self.total_realized_pnl:.2f}[/]")
cfg_table.add_row("Net Realized PnL", f"[bold {'green' if net_realized_pnl >= 0 else 'red'}]${net_realized_pnl:.2f}[/]")
ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated")
@ -391,12 +575,31 @@ class PingPongBot:
ind_table.add_row(k.upper().replace("_", " "), f"{v['value']:.2f}", v['timestamp'])
pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True)
pos_table.add_column("Account Equity"); pos_table.add_column("Size"); pos_table.add_column("Entry"); pos_table.add_column("PnL")
pos_table.add_column("Account Equity"); pos_table.add_column("Available"); pos_table.add_column("Size (BTC/USD)"); pos_table.add_column("Used Lev"); pos_table.add_column("PnL")
if self.position:
p_size = float(self.position['size'])
pnl = float(self.position['unrealisedPnl'])
pos_table.add_row(f"${self.wallet_balance:.2f}", self.position['size'], self.position['avgPrice'], f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}")
# Categorize by Inverse (BTCUSD) vs Linear (BTCPERP)
if self.category == "inverse":
size_usd = p_size
size_btc = size_usd / max(self.market_price, 1)
else:
size_btc = p_size
size_usd = size_btc * self.market_price
used_lev = size_usd / max(self.wallet_balance, 1)
pnl_str = f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}[/]"
pos_table.add_row(
f"${self.wallet_balance:.2f}",
f"${self.available_balance:.2f}",
f"{size_btc:.3f} / ${size_usd:.1f}",
f"{used_lev:.2f}x ({self.max_eff_lev}x)",
pnl_str
)
else:
pos_table.add_row(f"${self.wallet_balance:.2f}", "0", "-", "-")
pos_table.add_row(f"${self.wallet_balance:.2f}", f"${self.available_balance:.2f}", "0 / $0", f"0.00x ({self.max_eff_lev}x)", "-")
self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table)
self.console.print(f"[dim]Status: {self.status_msg} | Last Signal: {self.last_signal}[/]")
@ -433,7 +636,9 @@ class PingPongBot:
signal = self.check_signals(df)
if signal: await self.execute_trade(signal)
self.last_candle_time = latest['time']
self.last_candle_price = latest['close']
self.last_candle_open = float(latest['open'])
self.last_candle_close = float(latest['close'])
self.last_candle_price = self.last_candle_close
self.status_msg = f"New Candle: {latest['time'].strftime('%H:%M:%S')}"
self.render_dashboard()
@ -444,6 +649,49 @@ class PingPongBot:
await asyncio.sleep(5)
from math import floor
import sys
async def run_with_retries():
config_path = "config/ping_pong_config.yaml"
# Load config to see robustness settings
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
print(f"CRITICAL: Failed to load config: {e}")
sys.exit(1)
robust_cfg = config.get('robustness', {})
if not robust_cfg.get('enabled', True):
bot = PingPongBot(config_path)
await bot.run()
return
max_retries = robust_cfg.get('max_retries', 3)
window = robust_cfg.get('retry_window_seconds', 300)
failure_history = []
while True:
try:
bot = PingPongBot(config_path)
await bot.run()
# If run() returns normally, it means the bot stopped gracefully
break
except Exception as e:
now = time.time()
failure_history.append(now)
# Keep only failures within the window
failure_history = [t for t in failure_history if now - t <= window]
if len(failure_history) > max_retries:
logger.error(f"FATAL: Too many failures ({len(failure_history)}) within {window}s. Stopping bot.")
sys.exit(1)
wait_time = min(30, 5 * len(failure_history))
logger.warning(f"Bot crashed! Retry {len(failure_history)}/{max_retries} in {wait_time}s... Error: {e}")
await asyncio.sleep(wait_time)
if __name__ == "__main__":
bot = PingPongBot()
asyncio.run(bot.run())
asyncio.run(run_with_retries())