feat: refactor bot to use DB for candles with 5s/15s polling sync (v1.3.0)

This commit is contained in:
Gemini CLI
2026-03-05 22:11:17 +01:00
parent 9ffcf4c8c4
commit 7c6faebead
2 changed files with 160 additions and 282 deletions

View File

@ -9,12 +9,14 @@ import asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich import box
import asyncpg
# Try to import pybit
try:
@ -37,14 +39,41 @@ logging.basicConfig(
)
logger = logging.getLogger("PingPongBot")
class DatabaseManager:
"""Minimal Database Manager for the bot"""
def __init__(self):
self.host = os.getenv('DB_HOST', '20.20.20.20')
self.port = int(os.getenv('DB_PORT', 5433))
self.database = os.getenv('DB_NAME', 'btc_data')
self.user = os.getenv('DB_USER', 'btc_bot')
self.password = os.getenv('DB_PASSWORD', '')
self.pool = None
async def connect(self):
self.pool = await asyncpg.create_pool(
host=self.host, port=self.port, user=self.user,
password=self.password, database=self.database
)
logger.info("Connected to Database")
async def get_candles(self, symbol: str, interval: str, limit: int = 100):
async with self.pool.acquire() as conn:
rows = await conn.fetch('''
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2
ORDER BY time DESC LIMIT $3
''', symbol, interval, limit)
return [dict(r) for r in rows]
class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.2.1"
self.version = "1.3.0"
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.api_key = os.getenv("API_KEY")
self.api_secret = os.getenv("API_SECRET")
self.api_key = os.getenv("BYBIT_API_KEY") or os.getenv("API_KEY")
self.api_secret = os.getenv("BYBIT_API_SECRET") or os.getenv("API_SECRET")
if not self.api_key or not self.api_secret:
raise ValueError("API_KEY and API_SECRET must be set in .env file")
@ -55,12 +84,15 @@ class PingPongBot:
api_secret=self.api_secret,
)
self.db = DatabaseManager()
self.symbol = self.config['symbol']
self.interval = self.config['interval']
self.interval = str(self.config['interval'])
self.direction = self.config['direction'].lower()
# State
self.last_candle_time = None
self.last_candle_price = 0.0
self.current_indicators = {
"rsi": {"value": 0.0, "timestamp": "N/A"},
"hurst_lower": {"value": 0.0, "timestamp": "N/A"},
@ -68,63 +100,44 @@ class PingPongBot:
}
self.position = None
self.wallet_balance = 0
self.market_price = 0.0
self.status_msg = "Initializing..."
self.last_signal = None
self.start_time = datetime.now()
self.console = Console()
# Grid parameters from config
self.tp_pct = self.config['take_profit_pct'] / 100.0
self.partial_exit_pct = self.config['partial_exit_pct']
self.min_val_usd = self.config['min_position_value_usd']
self.pos_size_margin = self.config['pos_size_margin']
self.leverage = self.config['exchange_leverage']
self.max_eff_lev = self.config['max_effective_leverage']
# Parameters
self.tp_pct = float(self.config.get('take_profit_pct', 1.5)) / 100.0
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0))
self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0))
self.leverage = float(self.config.get('exchange_leverage', 3.0))
self.max_eff_lev = float(self.config.get('max_effective_leverage', 1.0))
def rma(self, series, length):
"""Rolling Moving Average (Wilder's Smoothing) - matches Pine Script ta.rma"""
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def calculate_indicators(self, df):
"""Calculate RSI and Hurst Bands matching the JS/Dashboard implementation"""
# 1. RSI
# RSI
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0))
loss = (-delta.where(delta < 0, 0))
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
avg_gain = self.rma(gain, rsi_cfg['period'])
avg_loss = self.rma(loss, rsi_cfg['period'])
rs = avg_gain / avg_loss
df['rsi'] = 100 - (100 / (1 + rs))
# 2. Hurst Bands
# Hurst
hurst_cfg = self.config['hurst']
mcl_t = hurst_cfg['period']
mcm = hurst_cfg['multiplier']
mcl = mcl_t / 2
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['h_l'] = df['high'] - df['low']
df['h_pc'] = abs(df['high'] - df['close'].shift(1))
df['l_pc'] = abs(df['low'] - df['close'].shift(1))
df['tr'] = df[['h_l', 'h_pc', 'l_pc']].max(axis=1)
df['tr'] = df[['high', 'low', 'close']].apply(lambda x: max(x[0]-x[1], abs(x[0]-x[2]), abs(x[1]-x[2])), axis=1)
df['ma_mcl'] = self.rma(df['close'], mcl)
df['max_tr'] = df['tr'].rolling(window=int(mcl)).max() # ATR proxy or just RMA(TR)? RMA(TR) is standard for ATR
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2)
df['center'] = df['center'].fillna(df['ma_mcl'])
mcm_off = mcm * df['atr_mcl']
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
# Update current indicator state
last_row = df.iloc[-1]
now_str = datetime.now().strftime("%H:%M:%S")
self.current_indicators["rsi"] = {"value": last_row['rsi'], "timestamp": now_str}
@ -133,267 +146,131 @@ class PingPongBot:
return df
def render_dashboard(self):
"""Render a clean summary of the bot state"""
# 1. Config Table
cfg_table = Table(title=f"[bold cyan]PING-PONG BOT CONFIG (v{self.version})[/]", box=box.ROUNDED, expand=True)
cfg_table.add_column("Property", style="dim")
cfg_table.add_column("Value", style="bold white")
cfg_table.add_row("Symbol", f"{self.symbol} ({self.interval}m)")
cfg_table.add_row("Direction", f"{self.direction.upper()}")
cfg_table.add_row("Margin/Ping", f"${self.pos_size_margin} (Lev: {self.leverage}x)")
cfg_table.add_row("Max Account Lev", f"{self.max_eff_lev}x")
cfg_table.add_row("Partial Exit", f"{self.partial_exit_pct*100}%")
# 2. Indicators Table
ind_table = Table(title="[bold yellow]TECHNICAL INDICATORS[/]", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator", style="dim")
ind_table.add_column("Current Value", justify="right")
ind_table.add_column("Last Update", justify="center")
ind_table.add_row("Hurst Upper", f"{self.current_indicators['hurst_upper']['value']:.2f}", self.current_indicators['hurst_upper']['timestamp'])
ind_table.add_row("Hurst Lower", f"{self.current_indicators['hurst_lower']['value']:.2f}", self.current_indicators['hurst_lower']['timestamp'])
ind_table.add_row("RSI", f"{self.current_indicators['rsi']['value']:.2f}", self.current_indicators['rsi']['timestamp'])
# 3. Position Table
pos_table = Table(title="[bold green]ACTIVE POSITION & ACCOUNT[/]", box=box.ROUNDED, expand=True)
pos_table.add_column("Property", style="dim")
pos_table.add_column("Value", style="bold white")
pos_table.add_row("Wallet Balance", f"${self.wallet_balance:.2f}")
if self.position:
p = self.position
pnl = float(p.get('unrealisedPnl', 0))
pnl_color = "green" if pnl >= 0 else "red"
pos_table.add_row("Position Size", f"{p['size']}")
pos_table.add_row("Entry Price", f"{p['avgPrice']}")
pos_table.add_row("Unrealized PnL", f"[{pnl_color}]${pnl:.2f}[/]")
# Current Effective Leverage
last_price = self.current_indicators.get("price", 0)
if last_price > 0:
current_notional = float(p['size']) * last_price
current_lev = current_notional / max(self.wallet_balance, 1.0)
pos_table.add_row("Current Eff. Lev", f"{current_lev:.2f}x")
else:
pos_table.add_row("Position", "NONE (Scanning...)")
pos_table.add_row("Last Signal", str(self.last_signal or "None"))
pos_table.add_row("Status", f"[bold blue]{self.status_msg}[/]")
self.console.print("\n" + "="*50)
self.console.print(cfg_table)
self.console.print(ind_table)
self.console.print(pos_table)
self.console.print(f"[dim]Uptime: {str(datetime.now() - self.start_time).split('.')[0]} | Last Run: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}[/]\n")
async def fetch_data(self):
"""Fetch latest Klines from Bybit V5"""
async def update_exchange_data(self):
"""Fetch Price, Balance, Position every 15s"""
try:
response = self.session.get_kline(
category="linear",
symbol=self.symbol,
interval=self.interval,
limit=200
)
# 1. Price
ticker = self.session.get_tickers(category="linear", symbol=self.symbol)
if ticker['retCode'] == 0:
self.market_price = float(ticker['result']['list'][0]['lastPrice'])
if response['retCode'] != 0:
self.status_msg = f"API Error: {response['retMsg']}"
return None
klines = response['result']['list']
df = pd.DataFrame(klines, columns=['start_time', 'open', 'high', 'low', 'close', 'volume', 'turnover'])
df = df.astype(float)
df = df.iloc[::-1].reset_index(drop=True)
self.current_indicators["price"] = df.iloc[-1]['close']
return self.calculate_indicators(df)
# 2. Position
pos = self.session.get_positions(category="linear", symbol=self.symbol, settleCoin="USDT")
if pos['retCode'] == 0:
active = [p for p in pos['result']['list'] if float(p['size']) > 0]
self.position = active[0] if active else None
# 3. Balance
wallet = self.session.get_wallet_balance(category="linear", accountType="UNIFIED", coin="USDT")
if wallet['retCode'] == 0:
self.wallet_balance = float(wallet['result']['list'][0].get('totalWalletBalance', 0))
except Exception as e:
logger.error(f"Error fetching data: {e}")
self.status_msg = f"Fetch Error: {str(e)}"
return None
async def update_account_info(self):
"""Update position and balance information"""
try:
pos_response = self.session.get_positions(
category="linear",
symbol=self.symbol,
settleCoin="USDT"
)
if pos_response['retCode'] == 0:
active_pos = [p for p in pos_response['result']['list'] if float(p['size']) > 0]
self.position = active_pos[0] if active_pos else None
wallet_response = self.session.get_wallet_balance(
category="linear", accountType="UNIFIED", coin="USDT"
)
if wallet_response['retCode'] == 0:
result_list = wallet_response['result']['list']
if result_list:
self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0))
if self.wallet_balance == 0:
coin_info = result_list[0].get('coin', [])
if coin_info:
self.wallet_balance = float(coin_info[0].get('walletBalance', 0))
except Exception as e:
logger.error(f"Error updating account info: {e}")
logger.error(f"Exchange Sync Error: {e}")
def check_signals(self, df):
"""Strict Crossover Signal Logic matching STRATEGY_PING_PONG.md"""
if len(df) < 2:
return None
last = df.iloc[-1]
prev = df.iloc[-2]
rsi_cfg = self.config['rsi']
hurst_cfg = self.config['hurst']
last, prev = df.iloc[-1], df.iloc[-2]
rsi_cfg, hurst_cfg = self.config['rsi'], self.config['hurst']
open_signal = False
close_signal = False
# 1. RSI Crossovers
# BUY (Long): Crossed UP through oversold
rsi_buy_long = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']
# SELL (Long): Crossed DOWN through overbought
rsi_sell_long = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']
# BUY (Short): Crossed DOWN through overbought
rsi_buy_short = prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']
# SELL (Short): Crossed UP through oversold
rsi_sell_short = prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']
# 2. Hurst Crossovers
# BUY (Long): Price crossed DOWN below lower band
hurst_buy_long = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']
# SELL (Long): Price crossed UP above upper band
hurst_sell_long = prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']
# BUY (Short): Price crossed UP above upper band
hurst_buy_short = prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']
# SELL (Short): Price crossed DOWN below lower band
hurst_sell_short = prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']
# Long Signals
l_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
l_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_close'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
# Short Signals
s_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
(hurst_cfg['enabled_for_close'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
if self.direction == 'long':
if (rsi_cfg['enabled_for_open'] and rsi_buy_long) or (hurst_cfg['enabled_for_open'] and hurst_buy_long):
open_signal = True
if (rsi_cfg['enabled_for_close'] and rsi_sell_long) or (hurst_cfg['enabled_for_close'] and hurst_sell_long):
close_signal = True
else: # Short
if (rsi_cfg['enabled_for_open'] and rsi_buy_short) or (hurst_cfg['enabled_for_open'] and hurst_buy_short):
open_signal = True
if (rsi_cfg['enabled_for_close'] and rsi_sell_short) or (hurst_cfg['enabled_for_close'] and hurst_sell_short):
close_signal = True
return "open" if open_signal else ("close" if close_signal else None)
async def execute_trade_logic(self, df, signal):
"""Execute Ping-Pong logic: Partial exits and Accumulation"""
last_price = float(df.iloc[-1]['close'])
# 1. Closing & Partial Exit
if signal == "close" and self.position:
current_qty = float(self.position['size'])
qty_to_close = current_qty * self.partial_exit_pct
remaining_qty = current_qty - qty_to_close
# Minimum Value Rule
if (remaining_qty * last_price) < self.min_val_usd:
qty_to_close = current_qty
self.status_msg = "Signal Close: Entire position (Min Value rule)"
else:
self.status_msg = f"Signal Close: Partial Exit {self.partial_exit_pct*100}%"
self.place_order(qty_to_close, last_price, is_close=True)
return
# 2. Opening & Accumulation
if signal == "open":
current_qty = float(self.position['size']) if self.position else 0
current_notional = current_qty * last_price
ping_notional = self.pos_size_margin * self.leverage
projected_notional = current_notional + ping_notional
# Risk Filter: Total Effective Leverage
effective_leverage = projected_notional / max(self.wallet_balance, 1.0)
if effective_leverage <= self.max_eff_lev:
qty_to_open = ping_notional / last_price
qty_to_open = round(qty_to_open, 3)
self.status_msg = f"Signal Open: Accumulating {qty_to_open} units"
self.place_order(qty_to_open, last_price, is_close=False)
else:
self.status_msg = f"Open Ignored: Max Lev Reached ({effective_leverage:.2f}x)"
def place_order(self, qty, price, is_close=False):
"""Send Market Order to Bybit V5"""
side = ""
if self.direction == "long":
side = "Sell" if is_close else "Buy"
return "open" if l_open else ("close" if l_close else None)
else:
side = "Buy" if is_close else "Sell"
return "open" if s_open else ("close" if s_close else None)
async def execute_trade(self, signal):
if not signal: return
last_price = self.market_price
if signal == "close" and self.position:
qty = float(self.position['size']) * self.partial_exit_pct
if (float(self.position['size']) - qty) * last_price < self.min_val_usd:
qty = float(self.position['size'])
self.place_order(qty, is_close=True)
pos_idx = 1 if self.direction == "long" else 2
try:
response = self.session.place_order(
category="linear",
symbol=self.symbol,
side=side,
orderType="Market",
qty=str(qty),
timeInForce="GTC",
reduceOnly=is_close,
positionIdx=pos_idx
)
if response['retCode'] == 0:
logger.info(f"Order Placed: {side} {qty} {self.symbol}")
self.last_signal = f"{side} {qty} @ Market"
elif signal == "open":
cur_notional = float(self.position['size']) * last_price if self.position else 0
ping_notional = self.pos_size_margin * self.leverage
if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev:
self.place_order(ping_notional / last_price, is_close=False)
else:
logger.error(f"Order Failed: {response['retMsg']}")
self.status_msg = f"Order Error: {response['retMsg']}"
self.status_msg = "Max Leverage Reached"
def place_order(self, qty, is_close=False):
side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy"
pos_idx = 1 if self.direction == "long" else 2
try:
res = self.session.place_order(
category="linear", symbol=self.symbol, side=side, orderType="Market",
qty=str(round(qty, 3)), reduceOnly=is_close, positionIdx=pos_idx
)
if res['retCode'] == 0:
self.last_signal = f"{side} {qty:.3f}"
self.status_msg = f"Order Success: {side}"
else:
self.status_msg = f"Order Error: {res['retMsg']}"
except Exception as e:
logger.error(f"Execution Error: {e}")
self.status_msg = f"Exec Error: {str(e)}"
logger.error(f"Trade Error: {e}")
def render_dashboard(self):
self.console.clear()
cfg_table = Table(title=f"PING-PONG BOT v{self.version} [{self.direction.upper()}]", box=box.ROUNDED, expand=True)
cfg_table.add_column("Property"); cfg_table.add_column("Value")
cfg_table.add_row("Symbol", self.symbol); cfg_table.add_row("Price", f"${self.market_price:.2f}")
cfg_table.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:.2f})")
ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated")
for k, v in self.current_indicators.items():
if k != "price": ind_table.add_row(k.upper(), f"{v['value']:.2f}", v['timestamp'])
pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True)
pos_table.add_column("Wallet"); pos_table.add_column("Size"); pos_table.add_column("Entry"); pos_table.add_column("PnL")
if self.position:
pnl = float(self.position['unrealisedPnl'])
pos_table.add_row(f"${self.wallet_balance:.2f}", self.position['size'], self.position['avgPrice'], f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}")
else:
pos_table.add_row(f"${self.wallet_balance:.2f}", "0", "-", "-")
self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table)
self.console.print(f"[dim]Status: {self.status_msg} | Signal: {self.last_signal}[/]")
async def run(self):
"""Main loop with strict New Candle detection"""
logger.info(f"Bot started for {self.symbol} in {self.direction} mode")
await self.db.connect()
last_exchange_update = 0
while True:
# 1. Update Account & Market Data
await self.update_account_info()
df = await self.fetch_data()
now = time.time()
# 1. Exchange Sync (15s)
if now - last_exchange_update >= 15:
await self.update_exchange_data()
last_exchange_update = now
if df is not None:
current_candle_time = df.iloc[-1]['start_time']
# 2. Strict Crossover Check on New Candle ONLY
if self.last_candle_time is not None and current_candle_time != self.last_candle_time:
# 2. DB Sync (5s)
candles = await self.db.get_candles(self.symbol, self.interval, limit=100)
if candles:
latest = candles[0]
if latest['time'] != self.last_candle_time:
df = pd.DataFrame(candles[::-1])
df = self.calculate_indicators(df)
signal = self.check_signals(df)
if signal:
logger.info(f"CROSSOVER DETECTED: {signal.upper()} @ {df.iloc[-1]['close']}")
await self.execute_trade_logic(df, signal)
else:
self.status_msg = "New Candle: No Crossover"
self.last_candle_time = current_candle_time
if signal: await self.execute_trade(signal)
self.last_candle_time = latest['time']
self.last_candle_price = latest['close']
self.status_msg = f"New Candle processed: {latest['time']}"
# 3. Render Summary Dashboard
self.render_dashboard()
await asyncio.sleep(self.config.get('loop_interval_seconds', 10))
await asyncio.sleep(5)
if __name__ == "__main__":
try:
bot = PingPongBot()
asyncio.run(bot.run())
except KeyboardInterrupt:
print("\nBot Stopped by User")
except Exception as e:
logger.exception("Critical Error in main loop")
bot = PingPongBot()
asyncio.run(bot.run())