feat: implement automated direction (1D SMA44) and asset management (v1.5.0)

This commit is contained in:
Gemini CLI
2026-03-05 23:12:30 +01:00
parent e553a1dd48
commit 6e6e9db5cc

View File

@ -68,7 +68,7 @@ class DatabaseManager:
class PingPongBot: class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"): def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.4.2" self.version = "1.5.0"
with open(config_path, 'r') as f: with open(config_path, 'r') as f:
self.config = yaml.safe_load(f) self.config = yaml.safe_load(f)
@ -87,15 +87,22 @@ class PingPongBot:
self.db = DatabaseManager() self.db = DatabaseManager()
self.symbol = self.config['symbol'].upper() # e.g. BTCUSDT # Base settings
self.db_symbol = self.symbol.replace("USDT", "") # e.g. BTC self.base_coin = self.config['symbol'].upper().replace("USDT", "").replace("USDC", "") # e.g. BTC
self.db_symbol = self.base_coin
self.interval = str(self.config['interval']) self.interval = str(self.config['interval'])
# Map interval to DB format: '1' -> '1m'
self.db_interval = self.interval + "m" if self.interval.isdigit() else self.interval self.db_interval = self.interval + "m" if self.interval.isdigit() else self.interval
self.direction = self.config['direction'].lower() # Dynamic Strategy State
self.direction = None # 'long' or 'short'
self.category = None # 'linear' or 'inverse'
self.symbol = None # 'BTCUSDC' or 'BTCUSD'
# State # Tracking for SMA(44, 1D)
self.ma_44_val = 0.0
self.last_ma_check_time = 0
# Bot State
self.last_candle_time = None self.last_candle_time = None
self.last_candle_price = 0.0 self.last_candle_price = 0.0
self.current_indicators = { self.current_indicators = {
@ -111,7 +118,7 @@ class PingPongBot:
self.start_time = datetime.now() self.start_time = datetime.now()
self.console = Console() self.console = Console()
# Parameters # Fixed Parameters from Config
self.tp_pct = float(self.config.get('take_profit_pct', 1.5)) / 100.0 self.tp_pct = float(self.config.get('take_profit_pct', 1.5)) / 100.0
self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15)) self.partial_exit_pct = float(self.config.get('partial_exit_pct', 0.15))
self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0)) self.min_val_usd = float(self.config.get('min_position_value_usd', 15.0))
@ -135,16 +142,7 @@ class PingPongBot:
hurst_cfg = self.config['hurst'] hurst_cfg = self.config['hurst']
mcl = hurst_cfg['period'] / 2 mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2)) mcl_2 = int(round(mcl / 2))
df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
# Vectorized TR calculation
df['tr'] = np.maximum(
df['high'] - df['low'],
np.maximum(
abs(df['high'] - df['close'].shift(1)),
abs(df['low'] - df['close'].shift(1))
)
)
df['ma_mcl'] = self.rma(df['close'], mcl) df['ma_mcl'] = self.rma(df['close'], mcl)
df['atr_mcl'] = self.rma(df['tr'], mcl) df['atr_mcl'] = self.rma(df['tr'], mcl)
df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl']) df['center'] = df['ma_mcl'].shift(mcl_2).fillna(df['ma_mcl'])
@ -157,31 +155,124 @@ class PingPongBot:
self.current_indicators["rsi"] = {"value": float(last_row['rsi']), "timestamp": now_str} self.current_indicators["rsi"] = {"value": float(last_row['rsi']), "timestamp": now_str}
self.current_indicators["hurst_lower"] = {"value": float(last_row['hurst_lower']), "timestamp": now_str} self.current_indicators["hurst_lower"] = {"value": float(last_row['hurst_lower']), "timestamp": now_str}
self.current_indicators["hurst_upper"] = {"value": float(last_row['hurst_upper']), "timestamp": now_str} self.current_indicators["hurst_upper"] = {"value": float(last_row['hurst_upper']), "timestamp": now_str}
return df return df
async def update_direction(self):
"""Logic Point I: 1D MA44 check and Point II: Asset/Perp selection"""
try:
logger.info("Checking direction based on SMA(44, 1D)...")
candles_1d = await self.db.get_candles(self.db_symbol, "1d", limit=50)
if not candles_1d or len(candles_1d) < 44:
self.status_msg = "Error: Not enough 1D data for MA44"
return False
df_1d = pd.DataFrame(candles_1d[::-1])
df_1d['close'] = df_1d['close'].astype(float)
self.ma_44_val = df_1d['close'].rolling(window=44).mean().iloc[-1]
# Get current price from exchange
ticker = await asyncio.to_thread(self.session.get_tickers, category="linear", symbol=f"{self.base_coin}USDC")
current_price = float(ticker['result']['list'][0]['lastPrice'])
self.market_price = current_price
new_direction = "long" if current_price > self.ma_44_val else "short"
if new_direction != self.direction:
logger.info(f"DIRECTION CHANGE: {self.direction} -> {new_direction}")
self.status_msg = f"Switching to {new_direction.upper()}"
# 1. Close all positions (Point III.3)
if self.direction is not None:
await self.close_all_positions()
# 2. Swap Assets on Spot (Point II)
await self.swap_assets(new_direction)
# 3. Update configuration
self.direction = new_direction
if self.direction == "long":
self.category = "inverse"
self.symbol = f"{self.base_coin}USD"
else:
self.category = "linear"
self.symbol = f"{self.base_coin}USDC"
logger.info(f"Bot configured for {self.direction.upper()} | Symbol: {self.symbol} | Category: {self.category}")
self.last_candle_time = None # Force indicator recalculation
return True
return False
except Exception as e:
logger.error(f"Direction Update Error: {e}")
self.status_msg = f"Dir Error: {str(e)[:20]}"
return False
async def close_all_positions(self):
"""Closes any active position in the current category/symbol"""
try:
pos = await asyncio.to_thread(self.session.get_positions, category=self.category, symbol=self.symbol)
if pos['retCode'] == 0:
for p in pos['result']['list']:
if float(p.get('size', 0)) > 0:
logger.info(f"Closing existing position: {p['size']} {self.symbol}")
await self.place_order(float(p['size']), is_close=True)
except Exception as e:
logger.error(f"Error closing positions: {e}")
async def swap_assets(self, target_direction):
"""Point II: Exchange BTC/USDC on Spot market"""
try:
logger.info(f"Swapping assets for {target_direction.upper()} mode...")
spot_symbol = f"{self.base_coin}USDC"
# Get Spot Balances
balance = await asyncio.to_thread(self.session.get_wallet_balance, category="spot", coin=f"{self.base_coin},USDC")
coins = {c['coin']: float(c['walletBalance']) for c in balance['result']['list'][0]['coin']}
if target_direction == "short":
# Need USDC: Sell all BTC
btc_bal = coins.get(self.base_coin, 0)
if btc_bal > 0.0001:
logger.info(f"Spot: Selling {btc_bal} {self.base_coin} for USDC")
await asyncio.to_thread(self.session.place_order,
category="spot", symbol=spot_symbol, side="Sell", orderType="Market", qty=str(btc_bal)
)
else:
# Need BTC: Buy BTC with all USDC
usdc_bal = coins.get("USDC", 0)
if usdc_bal > 1.0:
logger.info(f"Spot: Buying {self.base_coin} with {usdc_bal} USDC")
# Spot Market Buy using orderAmount (spending USDC)
await asyncio.to_thread(self.session.place_order,
category="spot", symbol=spot_symbol, side="Buy", orderType="Market",
qty=str(usdc_bal), marketUnit="quote"
)
await asyncio.sleep(2) # Wait for spot settlement
except Exception as e:
logger.error(f"Asset Swap Error: {e}")
async def update_exchange_data(self): async def update_exchange_data(self):
"""Fetch Price, Balance, Position every 15s""" """Fetch Price, Balance, Position every 15s"""
try: try:
# Wrap synchronous pybit calls in asyncio.to_thread ticker = await asyncio.to_thread(self.session.get_tickers, category=self.category, symbol=self.symbol)
ticker = await asyncio.to_thread(self.session.get_tickers, category="linear", symbol=self.symbol)
if ticker['retCode'] == 0: if ticker['retCode'] == 0:
self.market_price = float(ticker['result']['list'][0]['lastPrice']) self.market_price = float(ticker['result']['list'][0]['lastPrice'])
pos = await asyncio.to_thread(self.session.get_positions, category="linear", symbol=self.symbol, settleCoin="USDT") pos = await asyncio.to_thread(self.session.get_positions, category=self.category, symbol=self.symbol, settleCoin="USDT" if self.category == "linear" else None)
if pos['retCode'] == 0: if pos['retCode'] == 0:
active = [p for p in pos['result']['list'] if float(p.get('size', 0)) > 0] active = [p for p in pos['result']['list'] if float(p.get('size', 0)) > 0]
self.position = active[0] if active else None self.position = active[0] if active else None
wallet = await asyncio.to_thread(self.session.get_wallet_balance, category="linear", accountType="UNIFIED", coin="USDT") # Use appropriate coin for balance based on category
target_coin = "USDC" if self.category == "linear" else self.base_coin
wallet = await asyncio.to_thread(self.session.get_wallet_balance, category=self.category, accountType="UNIFIED", coin=target_coin)
if wallet['retCode'] == 0: if wallet['retCode'] == 0:
result_list = wallet['result']['list'] res_list = wallet['result']['list']
if result_list: if res_list:
self.wallet_balance = float(result_list[0].get('totalWalletBalance', 0)) # In inverse, we value in BTC, but dashboard usually shows USD.
if self.wallet_balance == 0: # We'll stick to totalWalletBalance which is usually USD-equiv in UTA
coin_info = result_list[0].get('coin', []) self.wallet_balance = float(res_list[0].get('totalWalletBalance', 0))
if coin_info:
self.wallet_balance = float(coin_info[0].get('walletBalance', 0))
except Exception as e: except Exception as e:
logger.error(f"Exchange Sync Error: {e}") logger.error(f"Exchange Sync Error: {e}")
@ -189,13 +280,12 @@ class PingPongBot:
last, prev = df.iloc[-1], df.iloc[-2] last, prev = df.iloc[-1], df.iloc[-2]
rsi_cfg, hurst_cfg = self.config['rsi'], self.config['hurst'] rsi_cfg, hurst_cfg = self.config['rsi'], self.config['hurst']
# Long Signals # Signals defined by crossover
l_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \ l_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower']) (hurst_cfg['enabled_for_open'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
l_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \ l_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_close'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']) (hurst_cfg['enabled_for_close'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
# Short Signals
s_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \ s_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper']) (hurst_cfg['enabled_for_open'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \ s_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
@ -217,10 +307,19 @@ class PingPongBot:
await self.place_order(qty, is_close=True) await self.place_order(qty, is_close=True)
elif signal == "open": elif signal == "open":
cur_notional = float(self.position['size']) * last_price if self.position else 0 cur_qty = float(self.position['size']) if self.position else 0
ping_notional = self.pos_size_margin * self.leverage # Notional calculation differs between Linear and Inverse
if self.category == "linear":
cur_notional = cur_qty * last_price
ping_notional = self.pos_size_margin * self.leverage
qty_to_open = ping_notional / last_price
else: # Inverse
cur_notional = cur_qty # Inverse size is in USD usually, but Bybit V5 size is in contracts (USD)
ping_notional = self.pos_size_margin * self.leverage
qty_to_open = ping_notional # For Inverse BTCUSD, Qty is USD amount
if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev: if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev:
await self.place_order(ping_notional / last_price, is_close=False) await self.place_order(qty_to_open, is_close=False)
else: else:
self.status_msg = "Max Leverage Reached" self.status_msg = "Max Leverage Reached"
@ -228,12 +327,15 @@ class PingPongBot:
side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy" side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy"
pos_idx = 1 if self.direction == "long" else 2 pos_idx = 1 if self.direction == "long" else 2
try: try:
# Rounding: Linear BTC needs 3 decimals, Inverse BTCUSD needs integers (USD)
qty_str = str(int(qty)) if self.category == "inverse" else str(round(qty, 3))
res = await asyncio.to_thread(self.session.place_order, res = await asyncio.to_thread(self.session.place_order,
category="linear", symbol=self.symbol, side=side, orderType="Market", category=self.category, symbol=self.symbol, side=side, orderType="Market",
qty=str(round(qty, 3)), reduceOnly=is_close, positionIdx=pos_idx qty=qty_str, reduceOnly=is_close, positionIdx=pos_idx
) )
if res['retCode'] == 0: if res['retCode'] == 0:
self.last_signal = f"{side} {qty:.3f}" self.last_signal = f"{side} {qty_str}"
self.status_msg = f"Order Success: {side}" self.status_msg = f"Order Success: {side}"
else: else:
self.status_msg = f"Order Error: {res['retMsg']}" self.status_msg = f"Order Error: {res['retMsg']}"
@ -241,23 +343,22 @@ class PingPongBot:
logger.error(f"Trade Error: {e}") logger.error(f"Trade Error: {e}")
def render_dashboard(self): def render_dashboard(self):
# standard print based dashboard
self.console.print("\n" + "="*60) self.console.print("\n" + "="*60)
cfg_table = Table(title=f"PING-PONG BOT v{self.version} [{self.direction.upper()}]", box=box.ROUNDED, expand=True) title = f"PING-PONG BOT v{self.version} [{self.direction.upper() if self.direction else 'INIT'}]"
cfg_table = Table(title=title, box=box.ROUNDED, expand=True)
cfg_table.add_column("Property"); cfg_table.add_column("Value") cfg_table.add_column("Property"); cfg_table.add_column("Value")
cfg_table.add_row("Symbol", self.symbol); cfg_table.add_row("Price", f"${self.market_price:.2f}") cfg_table.add_row("Symbol", self.symbol or "N/A"); cfg_table.add_row("Category", self.category or "N/A")
cfg_table.add_row("Market Price", f"${self.market_price:.2f}"); cfg_table.add_row("SMA(44, 1D)", f"${self.ma_44_val:.2f}")
cfg_table.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:.2f})") cfg_table.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:.2f})")
ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True) ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated") ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated")
# Explicit order: Hurst Upper, Hurst Lower, RSI
for k in ["hurst_upper", "hurst_lower", "rsi"]: for k in ["hurst_upper", "hurst_lower", "rsi"]:
v = self.current_indicators[k] v = self.current_indicators[k]
ind_table.add_row(k.upper().replace("_", " "), f"{v['value']:.2f}", v['timestamp']) ind_table.add_row(k.upper().replace("_", " "), f"{v['value']:.2f}", v['timestamp'])
pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True) pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True)
pos_table.add_column("Wallet"); pos_table.add_column("Size"); pos_table.add_column("Entry"); pos_table.add_column("PnL") pos_table.add_column("Account Equity"); pos_table.add_column("Size"); pos_table.add_column("Entry"); pos_table.add_column("PnL")
if self.position: if self.position:
pnl = float(self.position['unrealisedPnl']) pnl = float(self.position['unrealisedPnl'])
pos_table.add_row(f"${self.wallet_balance:.2f}", self.position['size'], self.position['avgPrice'], f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}") pos_table.add_row(f"${self.wallet_balance:.2f}", self.position['size'], self.position['avgPrice'], f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}")
@ -265,21 +366,29 @@ class PingPongBot:
pos_table.add_row(f"${self.wallet_balance:.2f}", "0", "-", "-") pos_table.add_row(f"${self.wallet_balance:.2f}", "0", "-", "-")
self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table) self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table)
self.console.print(f"[dim]Status: {self.status_msg} | Signal: {self.last_signal}[/]") self.console.print(f"[dim]Status: {self.status_msg} | Last Signal: {self.last_signal}[/]")
self.console.print("="*60 + "\n") self.console.print("="*60 + "\n")
async def run(self): async def run(self):
await self.db.connect() await self.db.connect()
await self.update_direction() # Initial point I
last_exchange_update = 0 last_exchange_update = 0
while True: while True:
try: try:
now = time.time() now = time.time()
# 1. Exchange Sync (15s)
# 1. Periodically check direction (every 2m - Point III.2)
if now - self.last_ma_check_time >= 120:
await self.update_direction()
self.last_ma_check_time = now
# 2. Exchange Sync (15s)
if now - last_exchange_update >= 15: if now - last_exchange_update >= 15:
await self.update_exchange_data() await self.update_exchange_data()
last_exchange_update = now last_exchange_update = now
# 2. DB Sync (5s) # 3. DB Sync (5s)
candles = await self.db.get_candles(self.db_symbol, self.db_interval, limit=100) candles = await self.db.get_candles(self.db_symbol, self.db_interval, limit=100)
if candles: if candles:
latest = candles[0] latest = candles[0]
@ -291,9 +400,7 @@ class PingPongBot:
if signal: await self.execute_trade(signal) if signal: await self.execute_trade(signal)
self.last_candle_time = latest['time'] self.last_candle_time = latest['time']
self.last_candle_price = latest['close'] self.last_candle_price = latest['close']
self.status_msg = f"New Candle: {latest['time']}" self.status_msg = f"New Candle: {latest['time'].strftime('%H:%M:%S')}"
else:
self.status_msg = f"No candles found for {self.db_symbol}/{self.db_interval}"
self.render_dashboard() self.render_dashboard()
except Exception as e: except Exception as e: