diff --git a/requirements.txt b/requirements.txt index 3c4849c..d45f1d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ numpy pyyaml python-dotenv rich +asyncpg diff --git a/src/strategies/ping_pong_bot.py b/src/strategies/ping_pong_bot.py index 4cbf057..6212233 100644 --- a/src/strategies/ping_pong_bot.py +++ b/src/strategies/ping_pong_bot.py @@ -9,12 +9,14 @@ import asyncio import pandas as pd import numpy as np from datetime import datetime, timezone +from typing import List, Dict, Any, Optional from dotenv import load_dotenv from rich.console import Console from rich.table import Table from rich.panel import Panel from rich.layout import Layout from rich import box +import asyncpg # Try to import pybit try: @@ -37,14 +39,41 @@ logging.basicConfig( ) logger = logging.getLogger("PingPongBot") +class DatabaseManager: + """Minimal Database Manager for the bot""" + def __init__(self): + self.host = os.getenv('DB_HOST', '20.20.20.20') + self.port = int(os.getenv('DB_PORT', 5433)) + self.database = os.getenv('DB_NAME', 'btc_data') + self.user = os.getenv('DB_USER', 'btc_bot') + self.password = os.getenv('DB_PASSWORD', '') + self.pool = None + + async def connect(self): + self.pool = await asyncpg.create_pool( + host=self.host, port=self.port, user=self.user, + password=self.password, database=self.database + ) + logger.info("Connected to Database") + + async def get_candles(self, symbol: str, interval: str, limit: int = 100): + async with self.pool.acquire() as conn: + rows = await conn.fetch(''' + SELECT time, open, high, low, close, volume + FROM candles + WHERE symbol = $1 AND interval = $2 + ORDER BY time DESC LIMIT $3 + ''', symbol, interval, limit) + return [dict(r) for r in rows] + class PingPongBot: def __init__(self, config_path="config/ping_pong_config.yaml"): - self.version = "1.2.1" + self.version = "1.3.0" with open(config_path, 'r') as f: self.config = yaml.safe_load(f) - self.api_key = os.getenv("API_KEY") - self.api_secret = os.getenv("API_SECRET") + self.api_key = os.getenv("BYBIT_API_KEY") or os.getenv("API_KEY") + self.api_secret = os.getenv("BYBIT_API_SECRET") or os.getenv("API_SECRET") if not self.api_key or not self.api_secret: raise ValueError("API_KEY and API_SECRET must be set in .env file") @@ -55,12 +84,15 @@ class PingPongBot: api_secret=self.api_secret, ) + self.db = DatabaseManager() + self.symbol = self.config['symbol'] - self.interval = self.config['interval'] + self.interval = str(self.config['interval']) self.direction = self.config['direction'].lower() # State self.last_candle_time = None + self.last_candle_price = 0.0 self.current_indicators = { "rsi": {"value": 0.0, "timestamp": "N/A"}, "hurst_lower": {"value": 0.0, "timestamp": "N/A"}, @@ -68,63 +100,44 @@ class PingPongBot: } self.position = None self.wallet_balance = 0 + self.market_price = 0.0 self.status_msg = "Initializing..." self.last_signal = None self.start_time = datetime.now() self.console = Console() - # Grid parameters from config - self.tp_pct = self.config['take_profit_pct'] / 100.0 - self.partial_exit_pct = self.config['partial_exit_pct'] - self.min_val_usd = self.config['min_position_value_usd'] - self.pos_size_margin = self.config['pos_size_margin'] - self.leverage = self.config['exchange_leverage'] - self.max_eff_lev = self.config['max_effective_leverage'] + # Parameters + self.tp_pct = float(self.config.get('take_profit_pct', 1.5)) / 100.0 + self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15)) + self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0)) + self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0)) + self.leverage = float(self.config.get('exchange_leverage', 3.0)) + self.max_eff_lev = float(self.config.get('max_effective_leverage', 1.0)) def rma(self, series, length): - """Rolling Moving Average (Wilder's Smoothing) - matches Pine Script ta.rma""" alpha = 1 / length return series.ewm(alpha=alpha, adjust=False).mean() def calculate_indicators(self, df): - """Calculate RSI and Hurst Bands matching the JS/Dashboard implementation""" - # 1. RSI + # RSI rsi_cfg = self.config['rsi'] delta = df['close'].diff() - gain = (delta.where(delta > 0, 0)) - loss = (-delta.where(delta < 0, 0)) + gain = delta.where(delta > 0, 0) + loss = -delta.where(delta < 0, 0) + df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period'])))) - avg_gain = self.rma(gain, rsi_cfg['period']) - avg_loss = self.rma(loss, rsi_cfg['period']) - - rs = avg_gain / avg_loss - df['rsi'] = 100 - (100 / (1 + rs)) - - # 2. Hurst Bands + # Hurst hurst_cfg = self.config['hurst'] - mcl_t = hurst_cfg['period'] - mcm = hurst_cfg['multiplier'] - - mcl = mcl_t / 2 + mcl = hurst_cfg['period'] / 2 mcl_2 = int(round(mcl / 2)) - - df['h_l'] = df['high'] - df['low'] - df['h_pc'] = abs(df['high'] - df['close'].shift(1)) - df['l_pc'] = abs(df['low'] - df['close'].shift(1)) - df['tr'] = df[['h_l', 'h_pc', 'l_pc']].max(axis=1) - + df['tr'] = df[['high', 'low', 'close']].apply(lambda x: max(x[0]-x[1], abs(x[0]-x[2]), abs(x[1]-x[2])), axis=1) df['ma_mcl'] = self.rma(df['close'], mcl) - df['max_tr'] = df['tr'].rolling(window=int(mcl)).max() # ATR proxy or just RMA(TR)? RMA(TR) is standard for ATR df['atr_mcl'] = self.rma(df['tr'], mcl) - - df['center'] = df['ma_mcl'].shift(mcl_2) - df['center'] = df['center'].fillna(df['ma_mcl']) - - mcm_off = mcm * df['atr_mcl'] + df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl']) + mcm_off = hurst_cfg['multiplier'] * df['atr_mcl'] df['hurst_upper'] = df['center'] + mcm_off df['hurst_lower'] = df['center'] - mcm_off - # Update current indicator state last_row = df.iloc[-1] now_str = datetime.now().strftime("%H:%M:%S") self.current_indicators["rsi"] = {"value": last_row['rsi'], "timestamp": now_str} @@ -133,267 +146,131 @@ class PingPongBot: return df - def render_dashboard(self): - """Render a clean summary of the bot state""" - # 1. Config Table - cfg_table = Table(title=f"[bold cyan]PING-PONG BOT CONFIG (v{self.version})[/]", box=box.ROUNDED, expand=True) - cfg_table.add_column("Property", style="dim") - cfg_table.add_column("Value", style="bold white") - cfg_table.add_row("Symbol", f"{self.symbol} ({self.interval}m)") - cfg_table.add_row("Direction", f"{self.direction.upper()}") - cfg_table.add_row("Margin/Ping", f"${self.pos_size_margin} (Lev: {self.leverage}x)") - cfg_table.add_row("Max Account Lev", f"{self.max_eff_lev}x") - cfg_table.add_row("Partial Exit", f"{self.partial_exit_pct*100}%") - - # 2. Indicators Table - ind_table = Table(title="[bold yellow]TECHNICAL INDICATORS[/]", box=box.ROUNDED, expand=True) - ind_table.add_column("Indicator", style="dim") - ind_table.add_column("Current Value", justify="right") - ind_table.add_column("Last Update", justify="center") - ind_table.add_row("Hurst Upper", f"{self.current_indicators['hurst_upper']['value']:.2f}", self.current_indicators['hurst_upper']['timestamp']) - ind_table.add_row("Hurst Lower", f"{self.current_indicators['hurst_lower']['value']:.2f}", self.current_indicators['hurst_lower']['timestamp']) - ind_table.add_row("RSI", f"{self.current_indicators['rsi']['value']:.2f}", self.current_indicators['rsi']['timestamp']) - - # 3. Position Table - pos_table = Table(title="[bold green]ACTIVE POSITION & ACCOUNT[/]", box=box.ROUNDED, expand=True) - pos_table.add_column("Property", style="dim") - pos_table.add_column("Value", style="bold white") - pos_table.add_row("Wallet Balance", f"${self.wallet_balance:.2f}") - - if self.position: - p = self.position - pnl = float(p.get('unrealisedPnl', 0)) - pnl_color = "green" if pnl >= 0 else "red" - pos_table.add_row("Position Size", f"{p['size']}") - pos_table.add_row("Entry Price", f"{p['avgPrice']}") - pos_table.add_row("Unrealized PnL", f"[{pnl_color}]${pnl:.2f}[/]") - - # Current Effective Leverage - last_price = self.current_indicators.get("price", 0) - if last_price > 0: - current_notional = float(p['size']) * last_price - current_lev = current_notional / max(self.wallet_balance, 1.0) - pos_table.add_row("Current Eff. Lev", f"{current_lev:.2f}x") - else: - pos_table.add_row("Position", "NONE (Scanning...)") - - pos_table.add_row("Last Signal", str(self.last_signal or "None")) - pos_table.add_row("Status", f"[bold blue]{self.status_msg}[/]") - - self.console.print("\n" + "="*50) - self.console.print(cfg_table) - self.console.print(ind_table) - self.console.print(pos_table) - self.console.print(f"[dim]Uptime: {str(datetime.now() - self.start_time).split('.')[0]} | Last Run: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}[/]\n") - - async def fetch_data(self): - """Fetch latest Klines from Bybit V5""" + async def update_exchange_data(self): + """Fetch Price, Balance, Position every 15s""" try: - response = self.session.get_kline( - category="linear", - symbol=self.symbol, - interval=self.interval, - limit=200 - ) + # 1. Price + ticker = self.session.get_tickers(category="linear", symbol=self.symbol) + if ticker['retCode'] == 0: + self.market_price = float(ticker['result']['list'][0]['lastPrice']) - if response['retCode'] != 0: - self.status_msg = f"API Error: {response['retMsg']}" - return None - - klines = response['result']['list'] - df = pd.DataFrame(klines, columns=['start_time', 'open', 'high', 'low', 'close', 'volume', 'turnover']) - df = df.astype(float) - df = df.iloc[::-1].reset_index(drop=True) - - self.current_indicators["price"] = df.iloc[-1]['close'] - return self.calculate_indicators(df) + # 2. Position + pos = self.session.get_positions(category="linear", symbol=self.symbol, settleCoin="USDT") + if pos['retCode'] == 0: + active = [p for p in pos['result']['list'] if float(p['size']) > 0] + self.position = active[0] if active else None + # 3. Balance + wallet = self.session.get_wallet_balance(category="linear", accountType="UNIFIED", coin="USDT") + if wallet['retCode'] == 0: + self.wallet_balance = float(wallet['result']['list'][0].get('totalWalletBalance', 0)) except Exception as e: - logger.error(f"Error fetching data: {e}") - self.status_msg = f"Fetch Error: {str(e)}" - return None - - async def update_account_info(self): - """Update position and balance information""" - try: - pos_response = self.session.get_positions( - category="linear", - symbol=self.symbol, - settleCoin="USDT" - ) - - if pos_response['retCode'] == 0: - active_pos = [p for p in pos_response['result']['list'] if float(p['size']) > 0] - self.position = active_pos[0] if active_pos else None - - wallet_response = self.session.get_wallet_balance( - category="linear", accountType="UNIFIED", coin="USDT" - ) - - if wallet_response['retCode'] == 0: - result_list = wallet_response['result']['list'] - if result_list: - self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0)) - if self.wallet_balance == 0: - coin_info = result_list[0].get('coin', []) - if coin_info: - self.wallet_balance = float(coin_info[0].get('walletBalance', 0)) - except Exception as e: - logger.error(f"Error updating account info: {e}") + logger.error(f"Exchange Sync Error: {e}") def check_signals(self, df): - """Strict Crossover Signal Logic matching STRATEGY_PING_PONG.md""" - if len(df) < 2: - return None - - last = df.iloc[-1] - prev = df.iloc[-2] - rsi_cfg = self.config['rsi'] - hurst_cfg = self.config['hurst'] + last, prev = df.iloc[-1], df.iloc[-2] + rsi_cfg, hurst_cfg = self.config['rsi'], self.config['hurst'] - open_signal = False - close_signal = False - - # 1. RSI Crossovers - # BUY (Long): Crossed UP through oversold - rsi_buy_long = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold'] - # SELL (Long): Crossed DOWN through overbought - rsi_sell_long = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought'] - - # BUY (Short): Crossed DOWN through overbought - rsi_buy_short = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought'] - # SELL (Short): Crossed UP through oversold - rsi_sell_short = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold'] - - # 2. Hurst Crossovers - # BUY (Long): Price crossed DOWN below lower band - hurst_buy_long = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'] - # SELL (Long): Price crossed UP above upper band - hurst_sell_long = prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'] - - # BUY (Short): Price crossed UP above upper band - hurst_buy_short = prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'] - # SELL (Short): Price crossed DOWN below lower band - hurst_sell_short = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'] + # Long Signals + l_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \ + (hurst_cfg['enabled_for_open'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']) + l_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \ + (hurst_cfg['enabled_for_close'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']) + # Short Signals + s_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \ + (hurst_cfg['enabled_for_open'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']) + s_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \ + (hurst_cfg['enabled_for_close'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']) + if self.direction == 'long': - if (rsi_cfg['enabled_for_open'] and rsi_buy_long) or (hurst_cfg['enabled_for_open'] and hurst_buy_long): - open_signal = True - if (rsi_cfg['enabled_for_close'] and rsi_sell_long) or (hurst_cfg['enabled_for_close'] and hurst_sell_long): - close_signal = True - else: # Short - if (rsi_cfg['enabled_for_open'] and rsi_buy_short) or (hurst_cfg['enabled_for_open'] and hurst_buy_short): - open_signal = True - if (rsi_cfg['enabled_for_close'] and rsi_sell_short) or (hurst_cfg['enabled_for_close'] and hurst_sell_short): - close_signal = True - - return "open" if open_signal else ("close" if close_signal else None) - - async def execute_trade_logic(self, df, signal): - """Execute Ping-Pong logic: Partial exits and Accumulation""" - last_price = float(df.iloc[-1]['close']) - - # 1. Closing & Partial Exit - if signal == "close" and self.position: - current_qty = float(self.position['size']) - qty_to_close = current_qty * self.partial_exit_pct - remaining_qty = current_qty - qty_to_close - - # Minimum Value Rule - if (remaining_qty * last_price) < self.min_val_usd: - qty_to_close = current_qty - self.status_msg = "Signal Close: Entire position (Min Value rule)" - else: - self.status_msg = f"Signal Close: Partial Exit {self.partial_exit_pct*100}%" - - self.place_order(qty_to_close, last_price, is_close=True) - return - - # 2. Opening & Accumulation - if signal == "open": - current_qty = float(self.position['size']) if self.position else 0 - current_notional = current_qty * last_price - - ping_notional = self.pos_size_margin * self.leverage - projected_notional = current_notional + ping_notional - - # Risk Filter: Total Effective Leverage - effective_leverage = projected_notional / max(self.wallet_balance, 1.0) - - if effective_leverage <= self.max_eff_lev: - qty_to_open = ping_notional / last_price - qty_to_open = round(qty_to_open, 3) - - self.status_msg = f"Signal Open: Accumulating {qty_to_open} units" - self.place_order(qty_to_open, last_price, is_close=False) - else: - self.status_msg = f"Open Ignored: Max Lev Reached ({effective_leverage:.2f}x)" - - def place_order(self, qty, price, is_close=False): - """Send Market Order to Bybit V5""" - side = "" - if self.direction == "long": - side = "Sell" if is_close else "Buy" + return "open" if l_open else ("close" if l_close else None) else: - side = "Buy" if is_close else "Sell" + return "open" if s_open else ("close" if s_close else None) + + async def execute_trade(self, signal): + if not signal: return + last_price = self.market_price + + if signal == "close" and self.position: + qty = float(self.position['size']) * self.partial_exit_pct + if (float(self.position['size']) - qty) * last_price < self.min_val_usd: + qty = float(self.position['size']) + self.place_order(qty, is_close=True) - pos_idx = 1 if self.direction == "long" else 2 - - try: - response = self.session.place_order( - category="linear", - symbol=self.symbol, - side=side, - orderType="Market", - qty=str(qty), - timeInForce="GTC", - reduceOnly=is_close, - positionIdx=pos_idx - ) - - if response['retCode'] == 0: - logger.info(f"Order Placed: {side} {qty} {self.symbol}") - self.last_signal = f"{side} {qty} @ Market" + elif signal == "open": + cur_notional = float(self.position['size']) * last_price if self.position else 0 + ping_notional = self.pos_size_margin * self.leverage + if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev: + self.place_order(ping_notional / last_price, is_close=False) else: - logger.error(f"Order Failed: {response['retMsg']}") - self.status_msg = f"Order Error: {response['retMsg']}" - + self.status_msg = "Max Leverage Reached" + + def place_order(self, qty, is_close=False): + side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy" + pos_idx = 1 if self.direction == "long" else 2 + try: + res = self.session.place_order( + category="linear", symbol=self.symbol, side=side, orderType="Market", + qty=str(round(qty, 3)), reduceOnly=is_close, positionIdx=pos_idx + ) + if res['retCode'] == 0: + self.last_signal = f"{side} {qty:.3f}" + self.status_msg = f"Order Success: {side}" + else: + self.status_msg = f"Order Error: {res['retMsg']}" except Exception as e: - logger.error(f"Execution Error: {e}") - self.status_msg = f"Exec Error: {str(e)}" + logger.error(f"Trade Error: {e}") + + def render_dashboard(self): + self.console.clear() + cfg_table = Table(title=f"PING-PONG BOT v{self.version} [{self.direction.upper()}]", box=box.ROUNDED, expand=True) + cfg_table.add_column("Property"); cfg_table.add_column("Value") + cfg_table.add_row("Symbol", self.symbol); cfg_table.add_row("Price", f"${self.market_price:.2f}") + cfg_table.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:.2f})") + + ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True) + ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated") + for k, v in self.current_indicators.items(): + if k != "price": ind_table.add_row(k.upper(), f"{v['value']:.2f}", v['timestamp']) + + pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True) + pos_table.add_column("Wallet"); pos_table.add_column("Size"); pos_table.add_column("Entry"); pos_table.add_column("PnL") + if self.position: + pnl = float(self.position['unrealisedPnl']) + pos_table.add_row(f"${self.wallet_balance:.2f}", self.position['size'], self.position['avgPrice'], f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}") + else: + pos_table.add_row(f"${self.wallet_balance:.2f}", "0", "-", "-") + + self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table) + self.console.print(f"[dim]Status: {self.status_msg} | Signal: {self.last_signal}[/]") async def run(self): - """Main loop with strict New Candle detection""" - logger.info(f"Bot started for {self.symbol} in {self.direction} mode") + await self.db.connect() + last_exchange_update = 0 while True: - # 1. Update Account & Market Data - await self.update_account_info() - df = await self.fetch_data() + now = time.time() + # 1. Exchange Sync (15s) + if now - last_exchange_update >= 15: + await self.update_exchange_data() + last_exchange_update = now - if df is not None: - current_candle_time = df.iloc[-1]['start_time'] - - # 2. Strict Crossover Check on New Candle ONLY - if self.last_candle_time is not None and current_candle_time != self.last_candle_time: + # 2. DB Sync (5s) + candles = await self.db.get_candles(self.symbol, self.interval, limit=100) + if candles: + latest = candles[0] + if latest['time'] != self.last_candle_time: + df = pd.DataFrame(candles[::-1]) + df = self.calculate_indicators(df) signal = self.check_signals(df) - if signal: - logger.info(f"CROSSOVER DETECTED: {signal.upper()} @ {df.iloc[-1]['close']}") - await self.execute_trade_logic(df, signal) - else: - self.status_msg = "New Candle: No Crossover" - - self.last_candle_time = current_candle_time + if signal: await self.execute_trade(signal) + self.last_candle_time = latest['time'] + self.last_candle_price = latest['close'] + self.status_msg = f"New Candle processed: {latest['time']}" - # 3. Render Summary Dashboard self.render_dashboard() - - await asyncio.sleep(self.config.get('loop_interval_seconds', 10)) + await asyncio.sleep(5) if __name__ == "__main__": - try: - bot = PingPongBot() - asyncio.run(bot.run()) - except KeyboardInterrupt: - print("\nBot Stopped by User") - except Exception as e: - logger.exception("Critical Error in main loop") + bot = PingPongBot() + asyncio.run(bot.run())