From 2e901ac95e2a1105c6a7bc59ec17ff9b7ff9425a Mon Sep 17 00:00:00 2001 From: Gemini CLI Date: Thu, 5 Mar 2026 15:09:14 +0100 Subject: [PATCH] feat: refactor ping_pong_bot to use database for candles and sync crossover logic with dashboard --- config/ping_pong_config.yaml | 12 +- src/strategies/ping_pong_bot.py | 409 ++++++++++++++++++-------------- 2 files changed, 239 insertions(+), 182 deletions(-) diff --git a/config/ping_pong_config.yaml b/config/ping_pong_config.yaml index 9b6873a..5753e10 100644 --- a/config/ping_pong_config.yaml +++ b/config/ping_pong_config.yaml @@ -10,24 +10,24 @@ rsi: overbought: 70 oversold: 30 enabled_for_open: true - enabled_for_close: false + enabled_for_close: true hurst: period: 30 multiplier: 1.8 enabled_for_open: true - enabled_for_close: false + enabled_for_close: true # Strategy Settings direction: "long" # "long" or "short" capital: 1000.0 # Initial capital for calculations (informational) -exchange_leverage: 1.0 # Multiplier for each 'ping' size -max_effective_leverage: 5.0 # Cap on total position size relative to equity -pos_size_margin: 10.0 # Margin per 'ping' (USD) +exchange_leverage: 3.0 # Multiplier for each 'ping' size +max_effective_leverage: 1.0 # Cap on total position size relative to equity +pos_size_margin: 20.0 # Margin per 'ping' (USD) take_profit_pct: 1.5 # Target profit percentage per exit (1.5 = 1.5%) partial_exit_pct: 0.15 # 15% of position closed on each TP hit min_position_value_usd: 15.0 # Minimum remaining value to keep position open # Execution Settings -loop_interval_seconds: 5 # How often to check for new data +loop_interval_seconds: 10 # How often to check for new data debug_mode: false diff --git a/src/strategies/ping_pong_bot.py b/src/strategies/ping_pong_bot.py index 57724ed..167dc25 100644 --- a/src/strategies/ping_pong_bot.py +++ b/src/strategies/ping_pong_bot.py @@ -10,14 +10,24 @@ import pandas as pd import numpy as np from datetime import datetime, timezone from dotenv import load_dotenv +from rich.console import Console +from rich.table import Table +from rich.panel import Panel +from rich.layout import Layout +from rich import box -# Try to import pybit, if not available, we'll suggest installing it +# Try to import pybit try: from pybit.unified_trading import HTTP except ImportError: - print("Error: 'pybit' library not found. Please install it with: pip install pybit") + print("Error: 'pybit' library not found.") exit(1) +# Import DatabaseManager from the project +import sys +sys.path.append(os.path.join(os.getcwd(), 'src')) +from data_collector.database import DatabaseManager + # Load environment variables load_dotenv() log_level = os.getenv("LOG_LEVEL", "INFO") @@ -32,6 +42,8 @@ logging.basicConfig( ) logger = logging.getLogger("PingPongBot") +console = Console() + class PingPongBot: def __init__(self, config_path="config/ping_pong_config.yaml"): with open(config_path, 'r') as f: @@ -43,49 +55,75 @@ class PingPongBot: if not self.api_key or not self.api_secret: raise ValueError("API_KEY and API_SECRET must be set in .env file") + # Bybit Session self.session = HTTP( testnet=False, api_key=self.api_key, api_secret=self.api_secret, ) - self.symbol = self.config['symbol'] - self.interval = self.config['interval'] + # Database Manager + self.db = DatabaseManager( + host=os.getenv('DB_HOST', '20.20.20.20'), + port=int(os.getenv('DB_PORT', 5433)), + database=os.getenv('DB_NAME', 'btc_data'), + user=os.getenv('DB_USER', 'btc_bot'), + password=os.getenv('DB_PASSWORD', '') + ) + + # Strategy Direction & Category Logic self.direction = self.config['direction'].lower() + self.category = "inverse" if self.direction == "long" else "linear" + + # Symbol Adjustment + raw_symbol = self.config['symbol'].upper() + self.base_coin = raw_symbol.replace("USDT", "").replace("USD", "") + if self.category == "inverse": + self.symbol = f"{self.base_coin}USD" + else: + self.symbol = f"{self.base_coin}USDT" + + self.interval = str(self.config['interval']) # State - self.last_candle_time = None - self.current_indicators = {} + self.last_processed_candle = None + self.last_account_update = 0 + self.last_price_update = 0 + self.current_market_price = 0.0 + self.position = None self.wallet_balance = 0 self.status_msg = "Initializing..." self.last_signal = None self.start_time = datetime.now() - # Grid parameters from config - self.tp_pct = self.config['take_profit_pct'] / 100.0 - self.partial_exit_pct = self.config['partial_exit_pct'] - self.min_val_usd = self.config['min_position_value_usd'] - self.pos_size_margin = self.config['pos_size_margin'] - self.leverage = self.config['exchange_leverage'] - self.max_eff_lev = self.config['max_effective_leverage'] + # Ping-Pong Parameters (as per dashboard simulation) + self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15)) + self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0)) + self.pos_size_margin = float(self.config.get('pos_size_margin', 10.0)) + self.leverage = float(self.config.get('exchange_leverage', 1.0)) + self.max_eff_lev = float(self.config.get('max_effective_leverage', 5.0)) + + # Indicator Values for Summary + self.indicator_data = { + "rsi": {"last": 0, "prev": 0, "signal": "-"}, + "hurst": {"last_l": 0, "last_u": 0, "prev_l": 0, "prev_u": 0, "signal": "-"} + } def rma(self, series, length): - """Rolling Moving Average (Wilder's Smoothing) - matches Pine Script ta.rma""" + """Rolling Moving Average (Wilder's Smoothing)""" alpha = 1 / length return series.ewm(alpha=alpha, adjust=False).mean() def calculate_indicators(self, df): - """Calculate RSI and Hurst Bands matching the JS/Dashboard implementation""" + """Calculate RSI and Hurst Bands""" # 1. RSI rsi_cfg = self.config['rsi'] delta = df['close'].diff() gain = (delta.where(delta > 0, 0)) loss = (-delta.where(delta < 0, 0)) - avg_gain = self.rma(gain, rsi_cfg['period']) avg_loss = self.rma(loss, rsi_cfg['period']) - rs = avg_gain / avg_loss df['rsi'] = 100 - (100 / (1 + rs)) @@ -93,23 +131,18 @@ class PingPongBot: hurst_cfg = self.config['hurst'] mcl_t = hurst_cfg['period'] mcm = hurst_cfg['multiplier'] - mcl = mcl_t / 2 mcl_2 = int(round(mcl / 2)) - # True Range df['h_l'] = df['high'] - df['low'] df['h_pc'] = abs(df['high'] - df['close'].shift(1)) df['l_pc'] = abs(df['low'] - df['close'].shift(1)) df['tr'] = df[['h_l', 'h_pc', 'l_pc']].max(axis=1) - # RMA of Close and ATR df['ma_mcl'] = self.rma(df['close'], mcl) df['atr_mcl'] = self.rma(df['tr'], mcl) - # Historical Offset df['center'] = df['ma_mcl'].shift(mcl_2) - # Fill first values where shift produces NaN with the MA itself (as done in JS: historical_ma || src) df['center'] = df['center'].fillna(df['ma_mcl']) mcm_off = mcm * df['atr_mcl'] @@ -118,79 +151,59 @@ class PingPongBot: return df - async def fetch_data(self): - """Fetch latest Klines from Bybit V5""" + async def fetch_db_data(self): + """Fetch last 100 candles from DB""" try: - # We fetch 200 candles to ensure indicators stabilize - response = self.session.get_kline( - category="linear", - symbol=self.symbol, - interval=self.interval, - limit=200 - ) + db_symbol = f"{self.base_coin}USDT" + candles = await self.db.get_candles(symbol=db_symbol, interval=self.interval, limit=100) - if response['retCode'] != 0: - self.status_msg = f"API Error: {response['retMsg']}" + if not candles: + self.status_msg = f"DB Error: No data for {db_symbol}" return None - klines = response['result']['list'] - # Bybit returns newest first, we need oldest first - df = pd.DataFrame(klines, columns=['start_time', 'open', 'high', 'low', 'close', 'volume', 'turnover']) - df = df.astype(float) - df = df.iloc[::-1].reset_index(drop=True) - + df = pd.DataFrame(candles) + df = df.sort_values('time').reset_index(drop=True) return self.calculate_indicators(df) except Exception as e: - logger.error(f"Error fetching data: {e}") - self.status_msg = f"Fetch Error: {str(e)}" + logger.error(f"Error fetching DB data: {e}") return None - async def update_account_info(self): - """Update position and balance information""" + async def update_market_price(self): + """Fetch current price from exchange every 15s""" try: - # Get Position - pos_response = self.session.get_positions( - category="linear", - symbol=self.symbol - ) - + response = self.session.get_tickers(category=self.category, symbol=self.symbol) + if response['retCode'] == 0: + self.current_market_price = float(response['result']['list'][0]['lastPrice']) + except Exception as e: + logger.error(f"Error updating market price: {e}") + + async def update_account_info(self): + """Update position and balance""" + try: + pos_response = self.session.get_positions(category=self.category, symbol=self.symbol) if pos_response['retCode'] == 0: - positions = pos_response['result']['list'] - active_pos = [p for p in positions if float(p['size']) > 0] - if active_pos: - self.position = active_pos[0] - else: - self.position = None + active_pos = [p for p in pos_response['result']['list'] if float(p['size']) > 0] + self.position = active_pos[0] if active_pos else None - # Get Balance + target_coin = "USDT" if self.category == "linear" else self.base_coin wallet_response = self.session.get_wallet_balance( - category="linear", - accountType="UNIFIED", - coin="USDT" + category=self.category, accountType="UNIFIED", coin=target_coin ) if wallet_response['retCode'] == 0: result_list = wallet_response['result']['list'] if result_list: - # Priority 1: totalWalletBalance (for UTA pooled funds) self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0)) - - # If totalWalletBalance is 0, check the specific coin if self.wallet_balance == 0: - coin_info = result_list[0].get('coin', []) - if coin_info: - self.wallet_balance = float(coin_info[0].get('walletBalance', 0)) - else: - logger.error(f"Wallet API Error: {wallet_response['retMsg']}") - + self.wallet_balance = float(result_list[0].get('totalEquity', 0)) except Exception as e: logger.error(f"Error updating account info: {e}") def check_signals(self, df): - """Determine if we should Open or Close based on indicators""" + """Strict Crossover Signal Logic matching Dashboard""" if len(df) < 2: - return None + return None, {} last = df.iloc[-1] prev = df.iloc[-2] @@ -198,97 +211,97 @@ class PingPongBot: rsi_cfg = self.config['rsi'] hurst_cfg = self.config['hurst'] - open_signal = False - close_signal = False + signals = {"rsi": None, "hurst": None} - # 1. RSI Signals - rsi_buy = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold'] - rsi_sell = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought'] + # 1. RSI Crossovers + # BUY: Crossed UP through oversold + if prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']: + signals["rsi"] = "BUY" + # SELL: Crossed DOWN through overbought + elif prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']: + signals["rsi"] = "SELL" + + # 2. Hurst Crossovers + # BUY: Price crossed DOWN below lower band + if prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']: + signals["hurst"] = "BUY" + # SELL: Price crossed UP above upper band + elif prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']: + signals["hurst"] = "SELL" + + # Store for summary + self.indicator_data["rsi"] = {"last": last['rsi'], "prev": prev['rsi'], "signal": signals["rsi"] or "-"} + self.indicator_data["hurst"] = { + "last_l": last['hurst_lower'], "last_u": last['hurst_upper'], + "prev_l": prev['hurst_lower'], "prev_u": prev['hurst_upper'], + "signal": signals["hurst"] or "-" + } - # 2. Hurst Signals - hurst_buy = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'] - hurst_sell = prev['close'] > prev['hurst_upper'] and last['close'] <= last['hurst_upper'] - - # Logic for LONG + final_signal = None + # Ping-Pong Strategy logic if self.direction == 'long': - if (rsi_cfg['enabled_for_open'] and rsi_buy) or (hurst_cfg['enabled_for_open'] and hurst_buy): - open_signal = True - if (rsi_cfg['enabled_for_close'] and rsi_sell) or (hurst_cfg['enabled_for_close'] and hurst_sell): - close_signal = True - # Logic for SHORT - else: - if (rsi_cfg['enabled_for_open'] and rsi_sell) or (hurst_cfg['enabled_for_open'] and hurst_sell): - open_signal = True - if (rsi_cfg['enabled_for_close'] and rsi_buy) or (hurst_cfg['enabled_for_close'] and hurst_buy): - close_signal = True + # Accumulate on ANY buy signal + if (rsi_cfg['enabled_for_open'] and signals["rsi"] == "BUY") or (hurst_cfg['enabled_for_open'] and signals["hurst"] == "BUY"): + final_signal = "open" + # Offload on ANY sell signal + elif (rsi_cfg['enabled_for_close'] and signals["rsi"] == "SELL") or (hurst_cfg['enabled_for_close'] and signals["hurst"] == "SELL"): + final_signal = "close" + else: # Short + # Short Open on SELL signals + if (rsi_cfg['enabled_for_open'] and signals["rsi"] == "SELL") or (hurst_cfg['enabled_for_open'] and signals["hurst"] == "SELL"): + final_signal = "open" + # Short Close on BUY signals + elif (rsi_cfg['enabled_for_close'] and signals["rsi"] == "BUY") or (hurst_cfg['enabled_for_close'] and signals["hurst"] == "BUY"): + final_signal = "close" - return "open" if open_signal else ("close" if close_signal else None) + return final_signal, signals - async def execute_trade_logic(self, df, signal): - """Apply the Ping-Pong strategy logic (Accumulation + TP)""" + async def execute_trade_logic(self, df, final_signal): + """Execute Ping-Pong logic: Partial exits on 'close' signals""" last_price = float(df.iloc[-1]['close']) - # 1. Check Take Profit (TP) - if self.position: - avg_price = float(self.position['avgPrice']) - current_qty = float(self.position['size']) - - is_tp = False - if self.direction == 'long': - if last_price >= avg_price * (1 + self.tp_pct): - is_tp = True - else: - if last_price <= avg_price * (1 - self.tp_pct): - is_tp = True - - if is_tp: - qty_to_close = current_qty * self.partial_exit_pct - remaining_qty = current_qty - qty_to_close - - # Min size check - if (remaining_qty * last_price) < self.min_val_usd: - qty_to_close = current_qty - self.status_msg = "TP: Closing Full Position (Min Size reached)" - else: - self.status_msg = f"TP: Closing Partial {self.partial_exit_pct*100}%" - - self.place_order(qty_to_close, last_price, is_close=True) - return - - # 2. Check Close Signal - if signal == "close" and self.position: + # 1. Close/Partial Exit + if final_signal == "close" and self.position: current_qty = float(self.position['size']) qty_to_close = current_qty * self.partial_exit_pct - if (current_qty - qty_to_close) * last_price < self.min_val_usd: - qty_to_close = current_qty + remaining_qty = current_qty - qty_to_close - self.status_msg = "Signal: Closing Position (Partial/Full)" - self.place_order(qty_to_close, last_price, is_close=True) + # Check remaining value in USD + remaining_val_usd = remaining_qty if self.category == "inverse" else remaining_qty * last_price + + if remaining_val_usd < self.min_val_usd: + # Close Full + self.status_msg = f"Ping-Pong: Closing Full Position ({current_qty})" + await self.place_order(current_qty, last_price, is_close=True) + else: + # Close Partial (15%) + self.status_msg = f"Ping-Pong: Partial Exit ({qty_to_close:.3f})" + await self.place_order(qty_to_close, last_price, is_close=True) return - # 3. Check Open/Accumulate Signal - if signal == "open": - # Check Max Effective Leverage + # 2. Open/Accumulate + if final_signal == "open": current_qty = float(self.position['size']) if self.position else 0 - current_notional = current_qty * last_price + if self.category == "inverse": + entry_notional = self.pos_size_margin * self.leverage + qty_to_open = int(entry_notional) + current_notional = current_qty + else: + entry_notional = self.pos_size_margin * self.leverage + qty_to_open = round(entry_notional / last_price, 3) + current_notional = current_qty * last_price - entry_notional = self.pos_size_margin * self.leverage projected_notional = current_notional + entry_notional - effective_leverage = projected_notional / max(self.wallet_balance, 1.0) if effective_leverage <= self.max_eff_lev: - qty_to_open = entry_notional / last_price - # Round qty based on symbol precision (simplified) - qty_to_open = round(qty_to_open, 3) - - self.status_msg = f"Signal: Opening/Accumulating {qty_to_open} units" - self.place_order(qty_to_open, last_price, is_close=False) + self.status_msg = f"Ping-Pong: Accumulating {qty_to_open}" + await self.place_order(qty_to_open, last_price, is_close=False) else: - self.status_msg = f"Signal Ignored: Max Leverage {effective_leverage:.2f} > {self.max_eff_lev}" + self.status_msg = f"Max Leverage reached: {effective_leverage:.2f}" - def place_order(self, qty, price, is_close=False): - """Send order to Bybit V5""" + async def place_order(self, qty, price, is_close=False): + """Send Market Order""" side = "" if self.direction == "long": side = "Sell" if is_close else "Buy" @@ -296,61 +309,105 @@ class PingPongBot: side = "Buy" if is_close else "Sell" try: + # Round qty based on Bybit standards (Inverse: integer USD, Linear: BTC precision) + if self.category == "inverse": + qty_str = str(int(float(qty))) + else: + qty_str = f"{float(qty):.3f}" + response = self.session.place_order( - category="linear", - symbol=self.symbol, - side=side, - orderType="Market", - qty=str(qty), - timeInForce="GTC", - reduceOnly=is_close + category=self.category, symbol=self.symbol, side=side, + orderType="Market", qty=qty_str, timeInForce="GTC", reduceOnly=is_close ) - if response['retCode'] == 0: - logger.info(f"Order Placed: {side} {qty} {self.symbol}") - self.last_signal = f"{side} {qty} @ Market" + logger.info(f"Order Placed: {side} {qty_str} {self.symbol}") + self.last_signal = f"{side} {qty_str} @ Market" else: logger.error(f"Order Failed: {response['retMsg']}") self.status_msg = f"Order Error: {response['retMsg']}" - except Exception as e: logger.error(f"Execution Error: {e}") - self.status_msg = f"Exec Error: {str(e)}" + + def log_summary(self): + """Display summary table""" + title = f"PING-PONG BOT: {self.symbol} [{self.category.upper()}] ({self.direction.upper()})" + acc_table = Table(title=title, box=box.ROUNDED, expand=True) + acc_table.add_column("Property", style="cyan") + acc_table.add_column("Value", style="white") + + acc_table.add_row("Exchange Price", f"{self.current_market_price:.2f}") + acc_table.add_row("Wallet Balance", f"{self.wallet_balance:.2f} USD") + + if self.position: + acc_table.add_row("Position Size", f"{self.position['size']}") + acc_table.add_row("Avg Entry", f"{self.position['avgPrice']}") + acc_table.add_row("Unrealized PnL", f"{self.position['unrealisedPnl']} USDT") + else: + acc_table.add_row("Position", "None") + + acc_table.add_row("Last Action", f"{self.last_signal or 'None'}") + acc_table.add_row("Status", f"{self.status_msg}") + + ind_table = Table(title=f"INDICATORS (Timeframe: {self.interval}m | Source: DB)", box=box.ROUNDED, expand=True) + ind_table.add_column("Indicator", style="cyan") + ind_table.add_column("Value", style="white") + ind_table.add_column("Crossover Signal", style="bold yellow") + + rsi = self.indicator_data["rsi"] + ind_table.add_row("RSI", f"{rsi['last']:.2f}", rsi["signal"]) + + h = self.indicator_data["hurst"] + ind_table.add_row("Hurst Lower", f"{h['last_l']:.2f}", h["signal"] if h["signal"] == "BUY" else "-") + ind_table.add_row("Hurst Upper", f"{h['last_u']:.2f}", h["signal"] if h["signal"] == "SELL" else "-") + + console.print("\n") + console.print(acc_table) + console.print(ind_table) + console.print(f"--- Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---\n") async def run(self): - """Main loop""" - logger.info(f"Bot started for {self.symbol} in {self.direction} mode") + """Refactored loop: DB polling every 5s, Price polling every 15s""" + logger.info(f"Bot started: {self.symbol} | Ping-Pong Logic (Partial Exits)") + await self.db.connect() + while True: - # 1. Update Account - await self.update_account_info() + now = time.time() - # 2. Fetch Data & Calculate Indicators - df = await self.fetch_data() + # 1. Update Market Price (every 15s) + if now - self.last_price_update >= 15: + await self.update_market_price() + await self.update_account_info() + self.last_price_update = now + self.log_summary() + # 2. Check DB for New Data (every 5s) + df = await self.fetch_db_data() if df is not None: - # 3. Check for New Candle (for signal processing) - last_price = float(df.iloc[-1]['close']) + latest_candle_time = df.iloc[-1]['time'] - # 4. Strategy Logic - signal = self.check_signals(df) - if signal: - logger.info(f"Signal detected: {signal} @ {last_price}") - await self.execute_trade_logic(df, signal) - - # 5. Simple status log - if self.position: - logger.info(f"Price: {last_price:.2f} | Pos: {self.position['size']} @ {self.position['avgPrice']} | Wallet: {self.wallet_balance:.2f}") - else: - logger.info(f"Price: {last_price:.2f} | No Position | Wallet: {self.wallet_balance:.2f}") + # 3. New Candle Logic + if latest_candle_time != self.last_processed_candle: + self.last_processed_candle = latest_candle_time + + # 4. Recalculate Indicators and Check Signals + final_signal, _ = self.check_signals(df) + + # 5. Execute Trade on Crossover + if final_signal: + logger.info(f"CROSSOVER SIGNAL: {final_signal.upper()}") + await self.execute_trade_logic(df, final_signal) + else: + self.status_msg = "Scanning (Wait for Crossover)" + + self.log_summary() - await asyncio.sleep(self.config.get('loop_interval_seconds', 5)) + await asyncio.sleep(5) if __name__ == "__main__": try: bot = PingPongBot() asyncio.run(bot.run()) except KeyboardInterrupt: - print("\nBot Stopped by User") + print("\nBot Stopped") except Exception as e: - print(f"\nCritical Error: {e}") - logger.exception("Critical Error in main loop") + logger.exception(f"Critical Error: {e}")