feat: align ping_pong_bot logic with STRATEGY_PING_PONG.md

This commit is contained in:
Gemini CLI
2026-03-05 21:21:34 +01:00
parent d08f12c6aa
commit 07c71ab209

View File

@ -113,6 +113,7 @@ class PingPongBot:
df['tr'] = df[['h_l', 'h_pc', 'l_pc']].max(axis=1) df['tr'] = df[['h_l', 'h_pc', 'l_pc']].max(axis=1)
df['ma_mcl'] = self.rma(df['close'], mcl) df['ma_mcl'] = self.rma(df['close'], mcl)
df['max_tr'] = df['tr'].rolling(window=int(mcl)).max() # ATR proxy or just RMA(TR)? RMA(TR) is standard for ATR
df['atr_mcl'] = self.rma(df['tr'], mcl) df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2) df['center'] = df['ma_mcl'].shift(mcl_2)
@ -133,18 +134,15 @@ class PingPongBot:
def render_dashboard(self): def render_dashboard(self):
"""Render a clean summary of the bot state""" """Render a clean summary of the bot state"""
# We don't clear to avoid flickering in some terminals, just print a separator or use rich.Live if needed
# But for simple docker logs, clear is usually fine or just printing the table.
# 1. Config Table # 1. Config Table
cfg_table = Table(title="[bold cyan]PING-PONG BOT CONFIG[/]", box=box.ROUNDED, expand=True) cfg_table = Table(title="[bold cyan]PING-PONG BOT CONFIG[/]", box=box.ROUNDED, expand=True)
cfg_table.add_column("Property", style="dim") cfg_table.add_column("Property", style="dim")
cfg_table.add_column("Value", style="bold white") cfg_table.add_column("Value", style="bold white")
cfg_table.add_row("Symbol", f"{self.symbol} ({self.interval}m)") cfg_table.add_row("Symbol", f"{self.symbol} ({self.interval}m)")
cfg_table.add_row("Direction", f"{self.direction.upper()}") cfg_table.add_row("Direction", f"{self.direction.upper()}")
cfg_table.add_row("Capital/Margin", f"${self.pos_size_margin} (Lev: {self.leverage}x)") cfg_table.add_row("Margin/Ping", f"${self.pos_size_margin} (Lev: {self.leverage}x)")
cfg_table.add_row("Max Account Lev", f"{self.max_eff_lev}x")
cfg_table.add_row("Partial Exit", f"{self.partial_exit_pct*100}%") cfg_table.add_row("Partial Exit", f"{self.partial_exit_pct*100}%")
cfg_table.add_row("TP Target", f"{self.tp_pct*100}%")
# 2. Indicators Table # 2. Indicators Table
ind_table = Table(title="[bold yellow]TECHNICAL INDICATORS[/]", box=box.ROUNDED, expand=True) ind_table = Table(title="[bold yellow]TECHNICAL INDICATORS[/]", box=box.ROUNDED, expand=True)
@ -168,7 +166,13 @@ class PingPongBot:
pos_table.add_row("Position Size", f"{p['size']}") pos_table.add_row("Position Size", f"{p['size']}")
pos_table.add_row("Entry Price", f"{p['avgPrice']}") pos_table.add_row("Entry Price", f"{p['avgPrice']}")
pos_table.add_row("Unrealized PnL", f"[{pnl_color}]${pnl:.2f}[/]") pos_table.add_row("Unrealized PnL", f"[{pnl_color}]${pnl:.2f}[/]")
pos_table.add_row("Liquidation", f"{p.get('liqPrice', 'N/A')}")
# Current Effective Leverage
last_price = self.current_indicators.get("price", 0)
if last_price > 0:
current_notional = float(p['size']) * last_price
current_lev = current_notional / max(self.wallet_balance, 1.0)
pos_table.add_row("Current Eff. Lev", f"{current_lev:.2f}x")
else: else:
pos_table.add_row("Position", "NONE (Scanning...)") pos_table.add_row("Position", "NONE (Scanning...)")
@ -184,7 +188,6 @@ class PingPongBot:
async def fetch_data(self): async def fetch_data(self):
"""Fetch latest Klines from Bybit V5""" """Fetch latest Klines from Bybit V5"""
try: try:
# We fetch 200 candles to ensure indicators stabilize
response = self.session.get_kline( response = self.session.get_kline(
category="linear", category="linear",
symbol=self.symbol, symbol=self.symbol,
@ -197,11 +200,11 @@ class PingPongBot:
return None return None
klines = response['result']['list'] klines = response['result']['list']
# Bybit returns newest first, we need oldest first
df = pd.DataFrame(klines, columns=['start_time', 'open', 'high', 'low', 'close', 'volume', 'turnover']) df = pd.DataFrame(klines, columns=['start_time', 'open', 'high', 'low', 'close', 'volume', 'turnover'])
df = df.astype(float) df = df.astype(float)
df = df.iloc[::-1].reset_index(drop=True) df = df.iloc[::-1].reset_index(drop=True)
self.current_indicators["price"] = df.iloc[-1]['close']
return self.calculate_indicators(df) return self.calculate_indicators(df)
except Exception as e: except Exception as e:
@ -212,7 +215,6 @@ class PingPongBot:
async def update_account_info(self): async def update_account_info(self):
"""Update position and balance information""" """Update position and balance information"""
try: try:
# Get Position (with settleCoin for Hedge Mode compatibility)
pos_response = self.session.get_positions( pos_response = self.session.get_positions(
category="linear", category="linear",
symbol=self.symbol, symbol=self.symbol,
@ -220,146 +222,120 @@ class PingPongBot:
) )
if pos_response['retCode'] == 0: if pos_response['retCode'] == 0:
positions = pos_response['result']['list'] active_pos = [p for p in pos_response['result']['list'] if float(p['size']) > 0]
active_pos = [p for p in positions if float(p['size']) > 0] self.position = active_pos[0] if active_pos else None
if active_pos:
self.position = active_pos[0]
else:
self.position = None
# Get Balance
wallet_response = self.session.get_wallet_balance( wallet_response = self.session.get_wallet_balance(
category="linear", category="linear", accountType="UNIFIED", coin="USDT"
accountType="UNIFIED",
coin="USDT"
) )
if wallet_response['retCode'] == 0: if wallet_response['retCode'] == 0:
result_list = wallet_response['result']['list'] result_list = wallet_response['result']['list']
if result_list: if result_list:
# Priority 1: totalWalletBalance (for UTA pooled funds)
self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0)) self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0))
# If totalWalletBalance is 0, check the specific coin
if self.wallet_balance == 0: if self.wallet_balance == 0:
coin_info = result_list[0].get('coin', []) coin_info = result_list[0].get('coin', [])
if coin_info: if coin_info:
self.wallet_balance = float(coin_info[0].get('walletBalance', 0)) self.wallet_balance = float(coin_info[0].get('walletBalance', 0))
else:
logger.error(f"Wallet API Error: {wallet_response['retMsg']}")
except Exception as e: except Exception as e:
logger.error(f"Error updating account info: {e}") logger.error(f"Error updating account info: {e}")
def check_signals(self, df): def check_signals(self, df):
"""Determine if we should Open or Close based on indicators""" """Strict Crossover Signal Logic matching STRATEGY_PING_PONG.md"""
if len(df) < 2: if len(df) < 2:
return None return None
last = df.iloc[-1] last = df.iloc[-1]
prev = df.iloc[-2] prev = df.iloc[-2]
rsi_cfg = self.config['rsi'] rsi_cfg = self.config['rsi']
hurst_cfg = self.config['hurst'] hurst_cfg = self.config['hurst']
open_signal = False open_signal = False
close_signal = False close_signal = False
# 1. RSI Signals # 1. RSI Crossovers
rsi_buy = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold'] # BUY (Long): Crossed UP through oversold
rsi_sell = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought'] rsi_buy_long = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']
# SELL (Long): Crossed DOWN through overbought
rsi_sell_long = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']
# 2. Hurst Signals # BUY (Short): Crossed DOWN through overbought
hurst_buy = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'] rsi_buy_short = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']
hurst_sell = prev['close'] > prev['hurst_upper'] and last['close'] <= last['hurst_upper'] # SELL (Short): Crossed UP through oversold
rsi_sell_short = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']
# 2. Hurst Crossovers
# BUY (Long): Price crossed DOWN below lower band
hurst_buy_long = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']
# SELL (Long): Price crossed UP above upper band
hurst_sell_long = prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']
# BUY (Short): Price crossed UP above upper band
hurst_buy_short = prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']
# SELL (Short): Price crossed DOWN below lower band
hurst_sell_short = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']
# Logic for LONG
if self.direction == 'long': if self.direction == 'long':
if (rsi_cfg['enabled_for_open'] and rsi_buy) or (hurst_cfg['enabled_for_open'] and hurst_buy): if (rsi_cfg['enabled_for_open'] and rsi_buy_long) or (hurst_cfg['enabled_for_open'] and hurst_buy_long):
open_signal = True open_signal = True
if (rsi_cfg['enabled_for_close'] and rsi_sell) or (hurst_cfg['enabled_for_close'] and hurst_sell): if (rsi_cfg['enabled_for_close'] and rsi_sell_long) or (hurst_cfg['enabled_for_close'] and hurst_sell_long):
close_signal = True close_signal = True
# Logic for SHORT else: # Short
else: if (rsi_cfg['enabled_for_open'] and rsi_buy_short) or (hurst_cfg['enabled_for_open'] and hurst_buy_short):
if (rsi_cfg['enabled_for_open'] and rsi_sell) or (hurst_cfg['enabled_for_open'] and hurst_sell):
open_signal = True open_signal = True
if (rsi_cfg['enabled_for_close'] and rsi_buy) or (hurst_cfg['enabled_for_close'] and hurst_buy): if (rsi_cfg['enabled_for_close'] and rsi_sell_short) or (hurst_cfg['enabled_for_close'] and hurst_sell_short):
close_signal = True close_signal = True
return "open" if open_signal else ("close" if close_signal else None) return "open" if open_signal else ("close" if close_signal else None)
async def execute_trade_logic(self, df, signal): async def execute_trade_logic(self, df, signal):
"""Apply the Ping-Pong strategy logic (Accumulation + TP)""" """Execute Ping-Pong logic: Partial exits and Accumulation"""
last_price = float(df.iloc[-1]['close']) last_price = float(df.iloc[-1]['close'])
# 1. Check Take Profit (TP) # 1. Closing & Partial Exit
if self.position:
avg_price = float(self.position['avgPrice'])
current_qty = float(self.position['size'])
is_tp = False
if self.direction == 'long':
if last_price >= avg_price * (1 + self.tp_pct):
is_tp = True
else:
if last_price <= avg_price * (1 - self.tp_pct):
is_tp = True
if is_tp:
qty_to_close = current_qty * self.partial_exit_pct
remaining_qty = current_qty - qty_to_close
# Min size check
if (remaining_qty * last_price) < self.min_val_usd:
qty_to_close = current_qty
self.status_msg = "TP: Closing Full Position (Min Size reached)"
else:
self.status_msg = f"TP: Closing Partial {self.partial_exit_pct*100}%"
self.place_order(qty_to_close, last_price, is_close=True)
return
# 2. Check Close Signal
if signal == "close" and self.position: if signal == "close" and self.position:
current_qty = float(self.position['size']) current_qty = float(self.position['size'])
qty_to_close = current_qty * self.partial_exit_pct qty_to_close = current_qty * self.partial_exit_pct
if (current_qty - qty_to_close) * last_price < self.min_val_usd: remaining_qty = current_qty - qty_to_close
qty_to_close = current_qty
# Minimum Value Rule
if (remaining_qty * last_price) < self.min_val_usd:
qty_to_close = current_qty
self.status_msg = "Signal Close: Entire position (Min Value rule)"
else:
self.status_msg = f"Signal Close: Partial Exit {self.partial_exit_pct*100}%"
self.status_msg = "Signal: Closing Position (Partial/Full)"
self.place_order(qty_to_close, last_price, is_close=True) self.place_order(qty_to_close, last_price, is_close=True)
return return
# 3. Check Open/Accumulate Signal # 2. Opening & Accumulation
if signal == "open": if signal == "open":
# Check Max Effective Leverage
current_qty = float(self.position['size']) if self.position else 0 current_qty = float(self.position['size']) if self.position else 0
current_notional = current_qty * last_price current_notional = current_qty * last_price
entry_notional = self.pos_size_margin * self.leverage ping_notional = self.pos_size_margin * self.leverage
projected_notional = current_notional + entry_notional projected_notional = current_notional + ping_notional
# Risk Filter: Total Effective Leverage
effective_leverage = projected_notional / max(self.wallet_balance, 1.0) effective_leverage = projected_notional / max(self.wallet_balance, 1.0)
if effective_leverage <= self.max_eff_lev: if effective_leverage <= self.max_eff_lev:
qty_to_open = entry_notional / last_price qty_to_open = ping_notional / last_price
# Round qty based on symbol precision (simplified)
qty_to_open = round(qty_to_open, 3) qty_to_open = round(qty_to_open, 3)
self.status_msg = f"Signal: Opening/Accumulating {qty_to_open} units" self.status_msg = f"Signal Open: Accumulating {qty_to_open} units"
self.place_order(qty_to_open, last_price, is_close=False) self.place_order(qty_to_open, last_price, is_close=False)
else: else:
self.status_msg = f"Signal Ignored: Max Leverage {effective_leverage:.2f} > {self.max_eff_lev}" self.status_msg = f"Open Ignored: Max Lev Reached ({effective_leverage:.2f}x)"
def place_order(self, qty, price, is_close=False): def place_order(self, qty, price, is_close=False):
"""Send order to Bybit V5""" """Send Market Order to Bybit V5"""
side = "" side = ""
if self.direction == "long": if self.direction == "long":
side = "Sell" if is_close else "Buy" side = "Sell" if is_close else "Buy"
else: else:
side = "Buy" if is_close else "Sell" side = "Buy" if is_close else "Sell"
# Hedge Mode Index: 1 for Long, 2 for Short
pos_idx = 1 if self.direction == "long" else 2 pos_idx = 1 if self.direction == "long" else 2
try: try:
@ -386,23 +362,28 @@ class PingPongBot:
self.status_msg = f"Exec Error: {str(e)}" self.status_msg = f"Exec Error: {str(e)}"
async def run(self): async def run(self):
"""Main loop""" """Main loop with strict New Candle detection"""
logger.info(f"Bot started for {self.symbol} in {self.direction} mode") logger.info(f"Bot started for {self.symbol} in {self.direction} mode")
while True: while True:
# 1. Update Account # 1. Update Account & Market Data
await self.update_account_info() await self.update_account_info()
# 2. Fetch Data & Calculate Indicators
df = await self.fetch_data() df = await self.fetch_data()
if df is not None: if df is not None:
# 3. Strategy Logic current_candle_time = df.iloc[-1]['start_time']
signal = self.check_signals(df)
if signal: # 2. Strict Crossover Check on New Candle ONLY
logger.info(f"Signal detected: {signal} @ {df.iloc[-1]['close']}") if self.last_candle_time is not None and current_candle_time != self.last_candle_time:
await self.execute_trade_logic(df, signal) signal = self.check_signals(df)
if signal:
logger.info(f"CROSSOVER DETECTED: {signal.upper()} @ {df.iloc[-1]['close']}")
await self.execute_trade_logic(df, signal)
else:
self.status_msg = "New Candle: No Crossover"
self.last_candle_time = current_candle_time
# 4. Render Summary Dashboard # 3. Render Summary Dashboard
self.render_dashboard() self.render_dashboard()
await asyncio.sleep(self.config.get('loop_interval_seconds', 10)) await asyncio.sleep(self.config.get('loop_interval_seconds', 10))