Files
btc-trading/src/strategies/ping_pong_bot.py

328 lines
14 KiB
Python

import os
import time
import yaml
import hmac
import hashlib
import json
import logging
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich import box
import asyncpg
# Try to import pybit
try:
from pybit.unified_trading import HTTP
except ImportError:
print("Error: 'pybit' library not found. Please install it with: pip install pybit")
exit(1)
# Load environment variables
load_dotenv()
log_level = os.getenv("LOG_LEVEL", "INFO")
# Setup Logging
logging.basicConfig(
level=getattr(logging, log_level),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger("PingPongBot")
class DatabaseManager:
"""Minimal Database Manager for the bot"""
def __init__(self):
self.host = os.getenv('DB_HOST', '20.20.20.20')
self.port = int(os.getenv('DB_PORT', 5433))
self.database = os.getenv('DB_NAME', 'btc_data')
self.user = os.getenv('DB_USER', 'btc_bot')
self.password = os.getenv('DB_PASSWORD', '')
self.pool = None
async def connect(self):
self.pool = await asyncpg.create_pool(
host=self.host, port=self.port, user=self.user,
password=self.password, database=self.database
)
logger.info("Connected to Database")
async def get_candles(self, symbol: str, interval: str, limit: int = 100):
async with self.pool.acquire() as conn:
rows = await conn.fetch('''
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2
ORDER BY time DESC LIMIT $3
''', symbol, interval, limit)
return [dict(r) for r in rows]
class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.3.7"
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.api_key = os.getenv("BYBIT_API_KEY") or os.getenv("API_KEY")
self.api_secret = os.getenv("BYBIT_API_SECRET") or os.getenv("API_SECRET")
if not self.api_key or not self.api_secret:
raise ValueError("API_KEY and API_SECRET must be set in .env file")
self.session = HTTP(
testnet=False,
api_key=self.api_key,
api_secret=self.api_secret
)
self.db = DatabaseManager()
self.symbol = self.config['symbol'].upper()
self.db_symbol = self.symbol.replace("USDT", "")
self.interval = str(self.config['interval'])
self.db_interval = self.interval + "m" if self.interval.isdigit() else self.interval
self.direction = self.config['direction'].lower()
# State
self.last_candle_time = None
self.last_candle_price = 0.0
self.current_indicators = {
"rsi": {"value": 0.0, "timestamp": "N/A"},
"hurst_lower": {"value": 0.0, "timestamp": "N/A"},
"hurst_upper": {"value": 0.0, "timestamp": "N/A"}
}
self.position = None
self.wallet_balance = 0
self.market_price = 0.0
self.status_msg = "Initializing..."
self.last_signal = None
self.start_time = datetime.now()
self.last_heartbeat = datetime.now()
self.console = Console()
# Parameters
self.tp_pct = float(self.config.get('take_profit_pct', 1.5)) / 100.0
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0))
self.pos_size_margin = float(self.config.get('pos_size_margin', 20.0))
self.leverage = float(self.config.get('exchange_leverage', 3.0))
self.max_eff_lev = float(self.config.get('max_effective_leverage', 1.0))
def rma(self, series, length):
alpha = 1 / length
return series.ewm(alpha=alpha, adjust=False).mean()
def calculate_indicators(self, df):
rsi_cfg = self.config['rsi']
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
df['rsi'] = 100 - (100 / (1 + (self.rma(gain, rsi_cfg['period']) / self.rma(loss, rsi_cfg['period']))))
hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(
df['high'] - df['low'],
np.maximum(
abs(df['high'] - df['close'].shift(1)),
abs(df['low'] - df['close'].shift(1))
)
)
df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
mcm_off = hurst_cfg['multiplier'] * df['atr_mcl']
df['hurst_upper'] = df['center'] + mcm_off
df['hurst_lower'] = df['center'] - mcm_off
last_row = df.iloc[-1]
now_str = datetime.now().strftime("%H:%M:%S")
self.current_indicators["rsi"] = {"value": float(last_row['rsi']), "timestamp": now_str}
self.current_indicators["hurst_lower"] = {"value": float(last_row['hurst_lower']), "timestamp": now_str}
self.current_indicators["hurst_upper"] = {"value": float(last_row['hurst_upper']), "timestamp": now_str}
return df
async def update_exchange_data(self):
"""Fetch Price, Balance, Position every 15s with timeout"""
try:
# Wrap synchronous pybit calls in asyncio.to_thread for better async behavior
ticker = await asyncio.to_thread(self.session.get_tickers, category="linear", symbol=self.symbol)
if ticker['retCode'] == 0:
self.market_price = float(ticker['result']['list'][0]['lastPrice'])
pos = await asyncio.to_thread(self.session.get_positions, category="linear", symbol=self.symbol, settleCoin="USDT")
if pos['retCode'] == 0:
active = [p for p in pos['result']['list'] if float(p['size']) > 0]
self.position = active[0] if active else None
wallet = await asyncio.to_thread(self.session.get_wallet_balance, category="linear", accountType="UNIFIED", coin="USDT")
if wallet['retCode'] == 0:
result_list = wallet['result']['list']
if result_list:
self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0))
if self.wallet_balance == 0:
coin_info = result_list[0].get('coin', [])
if coin_info:
self.wallet_balance = float(coin_info[0].get('walletBalance', 0))
except Exception as e:
logger.error(f"Exchange Sync Error: {e}")
def check_signals(self, df):
last, prev = df.iloc[-1], df.iloc[-2]
rsi_cfg, hurst_cfg = self.config['rsi'], self.config['hurst']
l_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
l_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_close'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
(hurst_cfg['enabled_for_close'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
if self.direction == 'long':
return "open" if l_open else ("close" if l_close else None)
else:
return "open" if s_open else ("close" if s_close else None)
async def execute_trade(self, signal):
if not signal: return
last_price = self.market_price
if signal == "close" and self.position:
qty = float(self.position['size']) * self.partial_exit_pct
if (float(self.position['size']) - qty) * last_price < self.min_val_usd:
qty = float(self.position['size'])
await self.place_order(qty, is_close=True)
elif signal == "open":
cur_notional = float(self.position['size']) * last_price if self.position else 0
ping_notional = self.pos_size_margin * self.leverage
if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev:
await self.place_order(ping_notional / last_price, is_close=False)
else:
self.status_msg = "Max Leverage Reached"
async def place_order(self, qty, is_close=False):
side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy"
pos_idx = 1 if self.direction == "long" else 2
try:
res = await asyncio.to_thread(self.session.place_order,
category="linear", symbol=self.symbol, side=side, orderType="Market",
qty=str(round(qty, 3)), reduceOnly=is_close, positionIdx=pos_idx
)
if res['retCode'] == 0:
self.last_signal = f"{side} {qty:.3f}"
self.status_msg = f"Order Success: {side}"
else:
self.status_msg = f"Order Error: {res['retMsg']}"
except Exception as e:
logger.error(f"Trade Error: {e}")
def print_dashboard(self):
"""Prints a clean summary to the logs"""
now = datetime.now().strftime("%H:%M:%S")
self.last_heartbeat = datetime.now()
# 1. Header
header = Table(title=f"PING-PONG BOT v{self.version} Dashboard", box=box.ROUNDED, expand=True)
header.add_column("Property"); header.add_column("Value")
header.add_row("Symbol", self.symbol)
header.add_row("Market Price", f"${self.market_price:,.2f}")
header.add_row("Direction", self.direction.upper())
header.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:,.2f})")
# 2. Indicators
inds = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
inds.add_column("Indicator"); inds.add_column("Value"); inds.add_column("Updated")
inds.add_row("Hurst Upper", f"{self.current_indicators['hurst_upper']['value']:.2f}", self.current_indicators['hurst_upper']['timestamp'])
inds.add_row("Hurst Lower", f"{self.current_indicators['hurst_lower']['value']:.2f}", self.current_indicators['hurst_lower']['timestamp'])
inds.add_row("RSI", f"{self.current_indicators['rsi']['value']:.2f}", self.current_indicators['rsi']['timestamp'])
# 3. Position
pos = Table(title="PORTFOLIO & STATUS", box=box.ROUNDED, expand=True)
pos.add_column("Property"); pos.add_column("Value")
pos.add_row("Wallet Balance", f"${self.wallet_balance:,.2f}")
if self.position:
pnl = float(self.position['unrealisedPnl'])
pos.add_row("Position Size", str(self.position['size']))
pos.add_row("Entry Price", f"${float(self.position['avgPrice']):,.2f}")
pos.add_row("Unrealized PnL", f"[bold {'green' if pnl>=0 else 'red'}]${pnl:,.2f}")
else:
pos_row = "NONE"
pos.add_row("Position", "NONE")
pos.add_row("Status", f"[bold blue]{self.status_msg}[/]")
pos.add_row("Last Signal", str(self.last_signal or "None"))
pos.add_row("Heartbeat", f"[italic]{now}[/]")
self.console.print("\n")
self.console.print(header)
self.console.print(inds)
self.console.print(pos)
self.console.print("-" * 50)
async def run(self):
logger.info("Bot starting...")
await self.db.connect()
last_exchange_update = 0
while True:
try:
now_ts = time.time()
# 1. Exchange Sync (15s)
if now_ts - last_exchange_update >= 15:
# Wrapped in timeout to prevent hanging
await asyncio.wait_for(self.update_exchange_data(), timeout=10)
last_exchange_update = now_ts
# 2. DB Sync (5s)
candles = await self.db.get_candles(self.db_symbol, self.db_interval, limit=100)
if candles:
latest = candles[0]
if latest['time'] != self.last_candle_time:
df = pd.DataFrame(candles[::-1])
df = df.astype({'open': float, 'high': float, 'low': float, 'close': float, 'volume': float})
df = self.calculate_indicators(df)
signal = self.check_signals(df)
if signal:
logger.info(f"Signal detected: {signal}")
await self.execute_trade(signal)
self.last_candle_time = latest['time']
self.last_candle_price = latest['close']
self.status_msg = f"New Candle: {latest['time']}"
else:
self.status_msg = f"No candles found for {self.db_symbol}/{self.db_interval}"
# 3. Print Dashboard
self.print_dashboard()
except asyncio.TimeoutError:
logger.error("Exchange update timed out")
self.status_msg = "Exchange Update Timeout"
except Exception as e:
logger.exception(f"Loop Error: {e}")
self.status_msg = f"Error: {str(e)[:50]}"
await asyncio.sleep(5)
if __name__ == "__main__":
bot = PingPongBot()
asyncio.run(bot.run())