Files
btc-trading/src/strategies/ping_pong_bot.py
Gemini CLI cd66a976de feat: implement Mid-Price logic and separate error logging (v1.8.6)
- Implement Mid-Price calculation (Bid+Ask)/2 for Maker orders to improve fill rates.
- Add separate 'logs/ping_pong_errors.log' for WARNING and ERROR messages.
- Add RotatingFileHandler for error logs (5MB cap, 2 backups).
- Refine 'leverage not modified' (110043) handling from ERROR to INFO.
- Improve order verification with explicit status checks and race condition handling.
- Verify script syntax and update version to 1.8.6.
2026-03-08 22:07:11 +01:00

809 lines
38 KiB
Python

import os
import time
import yaml
import hmac
import hashlib
import json
import logging
from logging.handlers import RotatingFileHandler
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich import box
import asyncpg
# Try to import pybit
try:
from pybit.unified_trading import HTTP
except ImportError:
print("Error: 'pybit' library not found. Please install it with: pip install pybit")
exit(1)
# Load environment variables
load_dotenv()
log_level = os.getenv("LOG_LEVEL", "INFO")
# Setup Logging
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
error_log_path = os.path.join(log_dir, "ping_pong_errors.log")
# Create logger
logger = logging.getLogger("PingPongBot")
logger.setLevel(logging.DEBUG) # Catch everything, handlers will filter
# Formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Console Handler (Normal logs)
ch = logging.StreamHandler()
ch.setLevel(getattr(logging, log_level))
ch.setFormatter(formatter)
logger.addHandler(ch)
# Error File Handler (Warnings and Errors only)
fh = RotatingFileHandler(error_log_path, maxBytes=5*1024*1024, backupCount=2)
fh.setLevel(logging.WARNING)
fh.setFormatter(formatter)
logger.addHandler(fh)
class DatabaseManager:
"""Minimal Database Manager for the bot"""
def __init__(self, host, port, database, user, password):
self.host = host
self.port = int(port)
self.database = database
self.user = user
self.password = password
self.pool = None
async def connect(self):
try:
self.pool = await asyncpg.create_pool(
host=self.host, port=self.port, user=self.user,
password=self.password, database=self.database,
min_size=1, max_size=10
)
# Test connection
async with self.pool.acquire() as conn:
res = await conn.fetchval("SELECT 1")
if res == 1:
logger.info(f"Database connection verified at {self.host}:{self.port}")
else:
raise Exception("Database test query failed")
except Exception as e:
logger.error(f"DATABASE CONNECTION FAILED: {e}")
raise
async def get_candles(self, symbol: str, interval: str, limit: int = 100):
if not self.pool:
logger.error("Attempted to query DB before connecting")
return []
try:
async with self.pool.acquire() as conn:
rows = await conn.fetch('''
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2
ORDER BY time DESC LIMIT $3
''', symbol, interval, limit)
return [dict(r) for r in rows]
except Exception as e:
logger.error(f"DB Query Error for {symbol} {interval}: {e}")
return []
class PingPongStrategy:
"""Core Strategy Logic for Ping-Pong Scalping"""
def __init__(self, config):
self.config = config
self.direction = config.get('direction', 'long')
def rma(self, series, length):
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def calculate_indicators(self, df):
# RSI
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
# Hurst
hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
return df
def check_signals(self, df):
if len(df) < 3: return None
# finished = candle that just closed (e.g. 10:30)
# prev = candle before that (e.g. 10:29)
finished = df.iloc[-2]
prev = df.iloc[-3]
rsi_cfg, hurst_cfg = self.config['rsi'] or {}, self.config['hurst'] or {}
def is_crossing_up(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed up BETWEEN candles
between = p_val < p_band and c_close >= c_band
# 2. Crossed up WITHIN this candle
within = c_open is not None and c_open < c_band and c_close >= c_band
return between or within
def is_crossing_down(p_val, p_band, c_open, c_close, c_band):
# 1. Crossed down BETWEEN candles
between = p_val > p_band and c_close <= c_band
# 2. Crossed down WITHIN this candle
within = c_open is not None and c_open > c_band and c_close <= c_band
return between or within
# Hurst Signals
h_upper_cross_down = is_crossing_down(prev['close'], prev['hurst_upper'], finished['open'], finished['close'], finished['hurst_upper'])
h_lower_cross_down = is_crossing_down(prev['close'], prev['hurst_lower'], finished['open'], finished['close'], finished['hurst_lower'])
# RSI Signals
rsi_cross_up = is_crossing_up(prev['rsi'], rsi_cfg.get('oversold', 30), None, finished['rsi'], rsi_cfg.get('oversold', 30))
rsi_cross_down = is_crossing_down(prev['rsi'], rsi_cfg.get('overbought', 70), None, finished['rsi'], rsi_cfg.get('overbought', 70))
l_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_open') and h_lower_cross_down)
l_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_close') and h_upper_cross_down)
s_open = (rsi_cfg.get('enabled_for_open') and rsi_cross_down) or \
(hurst_cfg.get('enabled_for_open') and h_upper_cross_down)
s_close = (rsi_cfg.get('enabled_for_close') and rsi_cross_up) or \
(hurst_cfg.get('enabled_for_close') and h_lower_cross_down)
if self.direction == 'long':
return "open" if l_open else ("close" if l_close else None)
else:
return "open" if s_open else ("close" if s_close else None)
class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.8.6"
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.strategy = PingPongStrategy(self.config)
# Explicitly load from ENV to ensure they are available
self.api_key = os.getenv("BYBIT_API_KEY") or os.getenv("API_KEY")
self.api_secret = os.getenv("BYBIT_API_SECRET") or os.getenv("API_SECRET")
if not self.api_key or not self.api_secret:
raise ValueError("API_KEY and API_SECRET must be set in .env file")
self.session = HTTP(
testnet=False,
api_key=self.api_key,
api_secret=self.api_secret,
timeout=10
)
# Initialize DB with explicit credentials
self.db = DatabaseManager(
host=os.getenv('DB_HOST', '20.20.20.20'),
port=os.getenv('DB_PORT', 5433),
database=os.getenv('DB_NAME', 'btc_data'),
user=os.getenv('DB_USER', 'btc_bot'),
password=os.getenv('DB_PASSWORD', '')
)
# Base settings
raw_symbol = self.config['symbol'].upper()
self.base_coin = raw_symbol.replace("USDT", "").replace("USDC", "").replace("USD", "")
self.db_symbol = self.base_coin
self.interval = str(self.config['interval'])
self.db_interval = self.interval + "m" if self.interval.isdigit() else self.interval
# Dynamic Strategy State
self.direction = None
self.category = None
self.symbol = None
self.settle_coin = None
# Tracking for SMA(44, 1D)
self.ma_44_val = 0.0
self.last_ma_check_time = 0
# Bot State
self.last_candle_time = None
self.last_candle_open = 0.0
self.last_candle_close = 0.0
self.last_candle_price = 0.0
self.current_indicators = {
"rsi": {"value": 0.0, "timestamp": "N/A"},
"hurst_lower": {"value": 0.0, "timestamp": "N/A"},
"hurst_upper": {"value": 0.0, "timestamp": "N/A"}
}
self.failure_history = []
self.position = None
self.wallet_balance = 0
self.available_balance = 0
self.start_equity = 0.0
self.start_equity_btc = 0.0
self.session_pnl = 0.0
self.session_pnl_btc = 0.0
self.total_fees = 0.0
self.total_realized_pnl = 0.0
self.market_price = 0.0
self.status_msg = "Initializing..."
self.last_signal = None
self.start_time = datetime.now()
self.console = Console()
# Transaction Logging
self.tx_log_path = "logs/ping_pong_transactions.csv"
self._init_tx_log()
# Fixed Parameters from Config
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0))
self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0))
self.leverage_long = float(self.config.get('leverage_long', 10.0))
self.leverage_short = float(self.config.get('leverage_short', 3.0))
self.leverage = 1.0 # Current leverage
self.max_eff_lev = float(self.config.get('max_effective_leverage', 1.0))
self.exec_type = self.config.get('execution_type', 'taker').lower()
def _init_tx_log(self):
"""Ensures CSV header exists and is up to date"""
header = "time,version,direction,symbol,trade,qty,price,leverage,pnl,fee,attempts,status\n"
if not os.path.exists(self.tx_log_path):
os.makedirs(os.path.dirname(self.tx_log_path), exist_ok=True)
with open(self.tx_log_path, 'w') as f:
f.write(header)
else:
# Check if we need to update the header
try:
with open(self.tx_log_path, 'r') as f:
first_line = f.readline()
if "attempts" not in first_line:
with open(self.tx_log_path, 'r') as f:
lines = f.readlines()
if lines:
lines[0] = header
with open(self.tx_log_path, 'w') as f:
f.writelines(lines)
logger.info("Updated CSV log header: Added 'attempts' column")
except Exception as e:
logger.error(f"Failed to update CSV header: {e}")
async def log_transaction(self, trade, qty, price, pnl=0, fee=0, attempts=1, status="Success"):
"""Appends a trade record to CSV"""
try:
with open(self.tx_log_path, 'a') as f:
t_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{t_str},{self.version},{self.direction},{self.symbol},{trade},{qty},{price},{self.leverage},{pnl},{fee},{attempts},{status}\n")
except Exception as e:
logger.error(f"Failed to write to CSV log: {e}")
def calculate_indicators(self, df):
df = self.strategy.calculate_indicators(df)
last_row = df.iloc[-1]
now_str = datetime.now().strftime("%H:%M:%S")
self.current_indicators["rsi"] = {"value": float(last_row['rsi']), "timestamp": now_str}
self.current_indicators["hurst_lower"] = {"value": float(last_row['hurst_lower']), "timestamp": now_str}
self.current_indicators["hurst_upper"] = {"value": float(last_row['hurst_upper']), "timestamp": now_str}
return df
async def update_direction(self):
"""Logic Point I: 1D MA44 check and Point II: Asset/Perp selection"""
try:
logger.info(f"Checking direction based on SMA(44, 1D) for {self.db_symbol}...")
candles_1d = await self.db.get_candles(self.db_symbol, "1d", limit=100)
if not candles_1d or len(candles_1d) < 44:
got = len(candles_1d) if candles_1d else 0
self.status_msg = f"Error: Need 44 1D candles (Got {got})"
return False
df_1d = pd.DataFrame(candles_1d[::-1])
df_1d['close'] = df_1d['close'].astype(float)
self.ma_44_val = df_1d['close'].rolling(window=44).mean().iloc[-1]
# Use BTCUSDT (Linear) for reliable initial price check
ticker = await asyncio.to_thread(self.session.get_tickers, category="linear", symbol=f"{self.base_coin}USDT")
current_price = float(ticker['result']['list'][0]['lastPrice'])
self.market_price = current_price
new_direction = "long" if current_price > self.ma_44_val else "short"
if new_direction != self.direction:
logger.info(f"DIRECTION CHANGE: {self.direction} -> {new_direction} (Price: {current_price:.2f}, MA44: {self.ma_44_val:.2f})")
self.status_msg = f"Switching to {new_direction.upper()}"
if self.direction is not None:
await self.close_all_positions()
self.direction = new_direction
self.strategy.direction = new_direction
if self.direction == "long":
self.category = "inverse"
self.symbol = f"{self.base_coin}USD"
self.settle_coin = self.base_coin
self.leverage = self.leverage_long
else:
self.category = "linear"
self.symbol = "BTCPERP" if self.base_coin == "BTC" else f"{self.base_coin}USDC"
self.settle_coin = "USDC"
self.leverage = self.leverage_short
# Perform swap
await self.swap_assets(new_direction)
# Sync Leverage with Bybit
await self.set_exchange_leverage()
logger.info(f"Bot configured for {self.direction.upper()} | Symbol: {self.symbol} | Category: {self.category} | Leverage: {self.leverage}")
self.last_candle_time = None
return True
return False
except Exception as e:
logger.error(f"Direction Update Error: {e}")
self.status_msg = f"Dir Error: {str(e)[:20]}"
return False
async def set_exchange_leverage(self):
"""Points Bybit API to set account leverage for current category/symbol"""
try:
if not self.category or not self.symbol: return
logger.info(f"Setting exchange leverage to {self.leverage}x for {self.symbol}...")
res = await asyncio.to_thread(self.session.set_leverage,
category=self.category,
symbol=self.symbol,
buyLeverage=str(self.leverage),
sellLeverage=str(self.leverage)
)
# If pybit returns normally, check the retCode
if res['retCode'] == 0:
logger.info(f"Leverage successfully set to {self.leverage}x")
elif res['retCode'] == 110043: # Leverage not modified
logger.info(f"Leverage is already {self.leverage}x")
else:
logger.warning(f"Bybit Leverage Warning: {res['retMsg']} (Code: {res['retCode']})")
except Exception as e:
# Check if exception contains "leverage not modified" or code 110043
err_str = str(e)
if "110043" in err_str or "leverage not modified" in err_str.lower():
logger.info(f"Leverage is already correctly set ({self.leverage}x)")
else:
logger.error(f"Failed to set leverage on Bybit: {e}")
async def close_all_positions(self):
"""Closes any active position in the current category/symbol"""
try:
if not self.category or not self.symbol: return
pos = await asyncio.to_thread(self.session.get_positions, category=self.category, symbol=self.symbol)
if pos['retCode'] == 0:
for p in pos['result']['list']:
if float(p.get('size', 0)) > 0:
logger.info(f"Closing existing position: {p['size']} {self.symbol}")
await self.place_order(float(p['size']), is_close=True)
except Exception as e:
logger.error(f"Error closing positions: {e}")
async def swap_assets(self, target_direction):
"""Point II: Exchange BTC/USDC on Spot market with proper rounding"""
try:
logger.info(f"Swapping assets for {target_direction.upper()} mode...")
spot_symbol = f"{self.base_coin}USDC"
# Use accountType='UNIFIED' for UTA accounts
balance = await asyncio.to_thread(self.session.get_wallet_balance, accountType="UNIFIED", coin=f"{self.base_coin},USDC")
coins = {c['coin']: float(c['walletBalance']) for c in balance['result']['list'][0]['coin']}
logger.info(f"Current Balances: {coins}")
if target_direction == "short":
# SHORT: Need USDC, Sell BTC (Max 6 decimals for BTCUSDC spot)
btc_bal = floor(coins.get(self.base_coin, 0) * 1000000) / 1000000
if btc_bal > 0.000001:
logger.info(f"Spot: Selling {btc_bal} {self.base_coin} for USDC")
res = await asyncio.to_thread(self.session.place_order,
category="spot", symbol=spot_symbol, side="Sell", orderType="Market", qty=f"{btc_bal:.6f}"
)
logger.info(f"Swap Result: {res['retMsg']}")
else:
# LONG: Need BTC, Buy BTC with USDC (Max 4 decimals for USDC amount)
usdc_bal = floor(coins.get("USDC", 0) * 10000) / 10000
if usdc_bal > 1.0:
logger.info(f"Spot: Buying {self.base_coin} with {usdc_bal} USDC")
# marketUnit='quote' means spending USDC
res = await asyncio.to_thread(self.session.place_order,
category="spot", symbol=spot_symbol, side="Buy", orderType="Market",
qty=f"{usdc_bal:.4f}", marketUnit="quote"
)
logger.info(f"Swap Result: {res['retMsg']}")
await asyncio.sleep(5) # Wait for spot settlement
except Exception as e:
logger.error(f"Asset Swap Error: {e}")
async def update_exchange_data(self):
if not self.category or not self.symbol: return
try:
ticker = await asyncio.to_thread(self.session.get_tickers, category=self.category, symbol=self.symbol)
if ticker['retCode'] == 0:
self.market_price = float(ticker['result']['list'][0]['lastPrice'])
# settleCoin is only for USDC linear perpetuals
settle_coin = "USDC" if (self.category == "linear" and "USDC" in self.symbol) else None
pos = await asyncio.to_thread(self.session.get_positions, category=self.category, symbol=self.symbol, settleCoin=settle_coin)
if pos['retCode'] == 0:
active = [p for p in pos['result']['list'] if float(p.get('size', 0)) > 0]
self.position = active[0] if active else None
target_coin = self.settle_coin
wallet = await asyncio.to_thread(self.session.get_wallet_balance, category=self.category, accountType="UNIFIED", coin=target_coin)
if wallet['retCode'] == 0:
res_list = wallet['result']['list']
if res_list:
# Use totalEquity for NAV (Net Asset Value) tracking
current_equity = float(res_list[0].get('totalEquity', 0))
self.wallet_balance = current_equity
self.available_balance = float(res_list[0].get('totalAvailableBalance', 0))
# Calculate BTC-equivalent equity
current_equity_btc = current_equity / max(self.market_price, 1)
if self.start_equity == 0.0:
self.start_equity = current_equity
self.start_equity_btc = current_equity_btc
self.session_pnl = current_equity - self.start_equity
self.session_pnl_btc = current_equity_btc - self.start_equity_btc
except Exception as e:
logger.error(f"Exchange Sync Error: {e}")
def check_signals(self, df):
return self.strategy.check_signals(df)
async def execute_trade(self, signal):
if not signal or not self.market_price: return
last_price = self.market_price
if signal == "close" and self.position:
qty = float(self.position['size']) * self.partial_exit_pct
if (float(self.position['size']) - qty) * last_price < self.min_val_usd:
qty = float(self.position['size'])
await self.place_order(qty, is_close=True)
elif signal == "open":
cur_qty = float(self.position['size']) if self.position else 0
if self.category == "linear":
cur_notional = cur_qty * last_price
ping_notional = self.pos_size_margin * self.leverage
qty_to_open = ping_notional / last_price
else: # Inverse
cur_notional = cur_qty
ping_notional = self.pos_size_margin * self.leverage
qty_to_open = ping_notional
if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev:
await self.place_order(qty_to_open, is_close=False)
else:
self.status_msg = "Max Leverage Reached"
async def place_order(self, qty, is_close=False):
if not self.category or not self.symbol: return
side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy"
trade = "Exit" if is_close else "Enter"
pos_idx = 0
qty_str = str(int(qty)) if self.category == "inverse" else str(round(qty, 3))
if self.exec_type != "maker":
try:
res = await asyncio.to_thread(self.session.place_order,
category=self.category, symbol=self.symbol, side=side, orderType="Market",
qty=qty_str, reduceOnly=is_close, positionIdx=pos_idx
)
if res['retCode'] == 0:
await self._process_filled_order(res['result']['orderId'], trade, qty_str, attempts=1)
else:
self.status_msg = f"Order Error: {res['retMsg']}"
except Exception as e:
logger.error(f"Taker Trade Error: {e}")
return
# Maker Chase Logic (Max 5 tries)
max_retries = 5
for attempt in range(1, max_retries + 1):
try:
# Fresh Bid/Ask for Mid-Price Limit order
ticker = await asyncio.to_thread(self.session.get_tickers, category=self.category, symbol=self.symbol)
if ticker['retCode'] == 0 and ticker['result']['list']:
t = ticker['result']['list'][0]
bid = float(t.get('bid1Price', 0))
ask = float(t.get('ask1Price', 0))
last = float(t.get('lastPrice', 0))
if bid > 0 and ask > 0:
self.market_price = (bid + ask) / 2
else:
self.market_price = last
price_str = str(round(self.market_price, 1))
self.status_msg = f"Chase {trade}: {attempt}/{max_retries} @ {price_str} (Mid)"
res = await asyncio.to_thread(self.session.place_order,
category=self.category, symbol=self.symbol, side=side, orderType="Limit",
qty=qty_str, price=price_str, timeInForce="PostOnly",
reduceOnly=is_close, positionIdx=pos_idx
)
if res['retCode'] != 0:
# Specific check for race condition: order filled while trying to place/cancel
if res['retCode'] in [110001, 170213, 170210]:
# Check if actually filled
history = await asyncio.to_thread(self.session.get_order_history,
category=self.category, symbol=self.symbol, limit=1)
if history['retCode'] == 0 and history['result']['list']:
latest = history['result']['list'][0]
if latest['orderStatus'] == "Filled" and float(latest['cumExecQty']) > 0:
await self._process_filled_order(latest['orderId'], trade, qty_str, attempts=attempt)
return
logger.warning(f"Maker rejected (Try {attempt}): {res['retMsg']}")
await asyncio.sleep(2)
continue
order_id = res['result']['orderId']
# Monitor for fill (Wait 10 seconds)
for _ in range(10):
await asyncio.sleep(1)
# Check order history for definitive status
history = await asyncio.to_thread(self.session.get_order_history,
category=self.category, symbol=self.symbol, orderId=order_id)
if history['retCode'] == 0 and history['result']['list']:
status = history['result']['list'][0]['orderStatus']
if status == "Filled":
await self._process_filled_order(order_id, trade, qty_str, attempts=attempt)
return
elif status in ["Cancelled", "Rejected", "Deactivated"]:
break # Go to retry
# Timeout: Cancel and retry
try:
cancel_res = await asyncio.to_thread(self.session.cancel_order, category=self.category, symbol=self.symbol, orderId=order_id)
# Even if successful, double check if it filled in the last millisecond
if cancel_res['retCode'] in [0, 110001, 170213]:
history = await asyncio.to_thread(self.session.get_order_history,
category=self.category, symbol=self.symbol, orderId=order_id)
if history['retCode'] == 0 and history['result']['list'] and history['result']['list'][0]['orderStatus'] == "Filled":
await self._process_filled_order(order_id, trade, qty_str, attempts=attempt)
return
except Exception as ce:
# Handle exception for 110001
if "110001" in str(ce) or "170213" in str(ce):
history = await asyncio.to_thread(self.session.get_order_history,
category=self.category, symbol=self.symbol, orderId=order_id)
if history['retCode'] == 0 and history['result']['list'] and history['result']['list'][0]['orderStatus'] == "Filled":
await self._process_filled_order(order_id, trade, qty_str, attempts=attempt)
return
logger.warning(f"Cancel error during chase: {ce}")
logger.info(f"Maker {trade} timed out, retrying ({attempt}/{max_retries})")
except Exception as e:
logger.error(f"Maker Chase Error (Try {attempt}): {e}")
await asyncio.sleep(2)
self.status_msg = f"{trade} failed after {max_retries} chase attempts"
await self.log_transaction(trade, qty_str, self.market_price, attempts=max_retries, status="Failed (Chase Timeout)")
async def _process_filled_order(self, order_id, trade, qty_str, attempts=1):
"""Finalizes a successful trade by logging fees and PnL"""
self.last_signal = f"{trade} {qty_str}"
self.status_msg = f"Order Success: {trade} ({self.exec_type})"
# Wait for Bybit indexing (multiple attempts if needed)
for _ in range(3):
await asyncio.sleep(1.5)
try:
exec_info = await asyncio.to_thread(self.session.get_executions,
category=self.category,
symbol=self.symbol,
orderId=order_id)
if exec_info['retCode'] == 0 and exec_info['result']['list']:
fills = exec_info['result']['list']
exec_fee = sum(float(f.get('execFee', 0)) for f in fills)
exec_pnl = sum(float(f.get('closedPnl', 0)) for f in fills)
exec_price = float(fills[0].get('execPrice', self.market_price))
if self.category == "inverse":
usd_fee = exec_fee * exec_price
usd_pnl = exec_pnl * exec_price
else:
usd_fee = exec_fee
usd_pnl = exec_pnl
self.total_fees += usd_fee
self.total_realized_pnl += usd_pnl
await self.log_transaction(trade, qty_str, exec_price, pnl=usd_pnl, fee=usd_fee, attempts=attempts, status="Filled")
return
except Exception as e:
logger.error(f"Execution fetch error: {e}")
# Fallback if execution list is still empty after retries
await self.log_transaction(trade, qty_str, self.market_price, attempts=attempts, status=f"Filled ({self.exec_type})")
def render_dashboard(self):
self.console.print("\n" + "="*60)
title = f"PING-PONG BOT v{self.version} [{self.direction.upper() if self.direction else 'INIT'}]"
cfg_table = Table(title=title, box=box.ROUNDED, expand=True)
cfg_table.add_column("Property"); cfg_table.add_column("Value")
cfg_table.add_row("Symbol", self.symbol or "N/A"); cfg_table.add_row("Category", self.category or "N/A")
cfg_table.add_row("Market Price", f"${self.market_price:.2f}"); cfg_table.add_row("SMA(44, 1D)", f"${self.ma_44_val:.2f}")
cfg_table.add_row("Last Candle", f"{self.last_candle_time}")
cfg_table.add_row("Candle O / C", f"${self.last_candle_open:.2f} / ${self.last_candle_close:.2f}")
cfg_table.add_row("Leverage", f"{self.leverage}x")
# Running Stats
runtime = datetime.now() - self.start_time
runtime_str = str(runtime).split('.')[0] # Remove microseconds
pnl_color = "green" if self.session_pnl >= 0 else "red"
pnl_btc_color = "green" if self.session_pnl_btc >= 0 else "red"
net_realized_pnl = self.total_realized_pnl - self.total_fees
cfg_table.add_row("Running Time", runtime_str)
cfg_table.add_row("Session PnL (USD)", f"[bold {pnl_color}]{'$' if self.session_pnl >= 0 else '-$'}{abs(self.session_pnl):.2f}[/]")
cfg_table.add_row("Session PnL (BTC)", f"[bold {pnl_btc_color}]{'{:+.6f}'.format(self.session_pnl_btc)} BTC[/]")
cfg_table.add_row("Total Fees", f"[bold red]-${self.total_fees:.2f}[/]")
cfg_table.add_row("Gross Realized PnL", f"[bold {'green' if self.total_realized_pnl >= 0 else 'red'}]${self.total_realized_pnl:.2f}[/]")
cfg_table.add_row("Net Realized PnL", f"[bold {'green' if net_realized_pnl >= 0 else 'red'}]${net_realized_pnl:.2f}[/]")
ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated")
for k in ["hurst_upper", "hurst_lower", "rsi"]:
v = self.current_indicators[k]
ind_table.add_row(k.upper().replace("_", " "), f"{v['value']:.2f}", v['timestamp'])
pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True)
pos_table.add_column("Account Equity"); pos_table.add_column("Available"); pos_table.add_column("Size (BTC/USD)"); pos_table.add_column("Used Lev"); pos_table.add_column("PnL")
if self.position:
p_size = float(self.position['size'])
pnl = float(self.position['unrealisedPnl'])
# Categorize by Inverse (BTCUSD) vs Linear (BTCPERP)
if self.category == "inverse":
size_usd = p_size
size_btc = size_usd / max(self.market_price, 1)
else:
size_btc = p_size
size_usd = size_btc * self.market_price
used_lev = size_usd / max(self.wallet_balance, 1)
pnl_str = f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}[/]"
pos_table.add_row(
f"${self.wallet_balance:.2f}",
f"${self.available_balance:.2f}",
f"{size_btc:.3f} / ${size_usd:.1f}",
f"{used_lev:.2f}x ({self.max_eff_lev}x)",
pnl_str
)
else:
pos_table.add_row(f"${self.wallet_balance:.2f}", f"${self.available_balance:.2f}", "0 / $0", f"0.00x ({self.max_eff_lev}x)", "-")
self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table)
self.console.print(f"[dim]Status: {self.status_msg} | Last Signal: {self.last_signal}[/]")
self.console.print("="*60 + "\n")
async def run(self):
try:
await self.db.connect()
await self.update_direction()
except Exception as e:
logger.error(f"Startup Failure: {e}")
return
last_exchange_update = 0
while True:
try:
now = time.time()
if now - self.last_ma_check_time >= 120:
await self.update_direction()
self.last_ma_check_time = now
if now - last_exchange_update >= 15:
await self.update_exchange_data()
last_exchange_update = now
candles = await self.db.get_candles(self.db_symbol, self.db_interval, limit=100)
if candles:
latest = candles[0]
if latest['time'] != self.last_candle_time:
df = pd.DataFrame(candles[::-1])
df = df.astype({'open': float, 'high': float, 'low': float, 'close': float, 'volume': float})
df = self.calculate_indicators(df)
signal = self.check_signals(df)
if signal: await self.execute_trade(signal)
self.last_candle_time = latest['time']
self.last_candle_open = float(latest['open'])
self.last_candle_close = float(latest['close'])
self.last_candle_price = self.last_candle_close
self.status_msg = f"New Candle: {latest['time'].strftime('%H:%M:%S')}"
self.render_dashboard()
except Exception as e:
logger.error(f"Loop error: {e}")
self.status_msg = f"Error: {str(e)[:40]}"
await asyncio.sleep(5)
from math import floor
import sys
async def run_with_retries():
config_path = "config/ping_pong_config.yaml"
# Load config to see robustness settings
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
print(f"CRITICAL: Failed to load config: {e}")
sys.exit(1)
robust_cfg = config.get('robustness', {})
if not robust_cfg.get('enabled', True):
bot = PingPongBot(config_path)
await bot.run()
return
max_retries = robust_cfg.get('max_retries', 3)
window = robust_cfg.get('retry_window_seconds', 300)
failure_history = []
while True:
try:
bot = PingPongBot(config_path)
await bot.run()
# If run() returns normally, it means the bot stopped gracefully
break
except Exception as e:
now = time.time()
failure_history.append(now)
# Keep only failures within the window
failure_history = [t for t in failure_history if now - t <= window]
if len(failure_history) > max_retries:
logger.error(f"FATAL: Too many failures ({len(failure_history)}) within {window}s. Stopping bot.")
sys.exit(1)
wait_time = min(30, 5 * len(failure_history))
logger.warning(f"Bot crashed! Retry {len(failure_history)}/{max_retries} in {wait_time}s... Error: {e}")
await asyncio.sleep(wait_time)
if __name__ == "__main__":
asyncio.run(run_with_retries())