feat: implement rich.live for non-blinking dashboard (v1.3.4)

This commit is contained in:
Gemini CLI
2026-03-05 22:28:22 +01:00
parent 38d8e5f1de
commit fcfa2244f2

View File

@ -15,6 +15,7 @@ from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich.live import Live
from rich import box
import asyncpg
@ -68,7 +69,7 @@ class DatabaseManager:
class PingPongBot:
def __init__(self, config_path="config/ping_pong_config.yaml"):
self.version = "1.3.3"
self.version = "1.3.4"
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
@ -89,7 +90,6 @@ class PingPongBot:
self.symbol = self.config['symbol'].upper() # e.g. BTCUSDT
self.db_symbol = self.symbol.replace("USDT", "") # e.g. BTC
self.interval = str(self.config['interval'])
# Map interval to DB format: '1' -> '1m'
self.db_interval = self.interval + "m" if self.interval.isdigit() else self.interval
self.direction = self.config['direction'].lower()
@ -135,7 +135,6 @@ class PingPongBot:
mcl = hurst_cfg['period'] / 2
mcl_2 = int(round(mcl / 2))
# Vectorized TR calculation
df['tr'] = np.maximum(
df['high'] - df['low'],
np.maximum(
@ -162,18 +161,15 @@ class PingPongBot:
async def update_exchange_data(self):
"""Fetch Price, Balance, Position every 15s"""
try:
# 1. Price
ticker = self.session.get_tickers(category="linear", symbol=self.symbol)
if ticker['retCode'] == 0:
self.market_price = float(ticker['result']['list'][0]['lastPrice'])
# 2. Position
pos = self.session.get_positions(category="linear", symbol=self.symbol, settleCoin="USDT")
if pos['retCode'] == 0:
active = [p for p in pos['result']['list'] if float(p['size']) > 0]
self.position = active[0] if active else None
# 3. Balance
wallet = self.session.get_wallet_balance(category="linear", accountType="UNIFIED", coin="USDT")
if wallet['retCode'] == 0:
result_list = wallet['result']['list']
@ -190,13 +186,11 @@ class PingPongBot:
last, prev = df.iloc[-1], df.iloc[-2]
rsi_cfg, hurst_cfg = self.config['rsi'], self.config['hurst']
# Long Signals
l_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] > prev['hurst_lower'] and last['close'] <= last['hurst_lower'])
l_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_close'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
# Short Signals
s_open = (rsi_cfg['enabled_for_open'] and prev['rsi'] > rsi_cfg['overbought'] and last['rsi'] <= rsi_cfg['overbought']) or \
(hurst_cfg['enabled_for_open'] and prev['close'] < prev['hurst_upper'] and last['close'] >= last['hurst_upper'])
s_close = (rsi_cfg['enabled_for_close'] and prev['rsi'] < rsi_cfg['oversold'] and last['rsi'] >= rsi_cfg['oversold']) or \
@ -215,17 +209,17 @@ class PingPongBot:
qty = float(self.position['size']) * self.partial_exit_pct
if (float(self.position['size']) - qty) * last_price < self.min_val_usd:
qty = float(self.position['size'])
self.place_order(qty, is_close=True)
await self.place_order(qty, is_close=True)
elif signal == "open":
cur_notional = float(self.position['size']) * last_price if self.position else 0
ping_notional = self.pos_size_margin * self.leverage
if (cur_notional + ping_notional) / max(self.wallet_balance, 1) <= self.max_eff_lev:
self.place_order(ping_notional / last_price, is_close=False)
await self.place_order(ping_notional / last_price, is_close=False)
else:
self.status_msg = "Max Leverage Reached"
def place_order(self, qty, is_close=False):
async def place_order(self, qty, is_close=False):
side = "Sell" if (self.direction == "long" and is_close) or (self.direction == "short" and not is_close) else "Buy"
pos_idx = 1 if self.direction == "long" else 2
try:
@ -241,57 +235,76 @@ class PingPongBot:
except Exception as e:
logger.error(f"Trade Error: {e}")
def render_dashboard(self):
self.console.clear()
cfg_table = Table(title=f"PING-PONG BOT v{self.version} [{self.direction.upper()}]", box=box.ROUNDED, expand=True)
cfg_table.add_column("Property"); cfg_table.add_column("Value")
cfg_table.add_row("Symbol", self.symbol); cfg_table.add_row("Price", f"${self.market_price:.2f}")
cfg_table.add_row("Last Candle", f"{self.last_candle_time} (@${self.last_candle_price:.2f})")
def generate_dashboard(self):
"""Generates the dashboard layout without printing directly"""
layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="indicators", size=7),
Layout(name="position", size=6),
Layout(name="footer", size=2)
)
ind_table = Table(title="INDICATORS", box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator"); ind_table.add_column("Value"); ind_table.add_column("Updated")
for k, v in self.current_indicators.items():
if k != "price": ind_table.add_row(k.upper(), f"{v['value']:.2f}", v['timestamp'])
# Header
header_table = Table(box=box.ROUNDED, expand=True, show_header=False)
header_table.add_row(f"[bold cyan]PING-PONG BOT v{self.version}[/] | Symbol: [bold white]{self.symbol}[/] | Price: [bold yellow]${self.market_price:,.2f}[/]")
layout["header"].update(Panel(header_table))
pos_table = Table(title="POSITION", box=box.ROUNDED, expand=True)
pos_table.add_column("Wallet"); pos_table.add_column("Size"); pos_table.add_column("Entry"); pos_table.add_column("PnL")
# Indicators
ind_table = Table(box=box.ROUNDED, expand=True)
ind_table.add_column("Indicator", style="dim"); ind_table.add_column("Value", justify="right"); ind_table.add_column("Last Update", justify="center")
ind_table.add_row("Hurst Upper", f"{self.current_indicators['hurst_upper']['value']:.2f}", self.current_indicators['hurst_upper']['timestamp'])
ind_table.add_row("Hurst Lower", f"{self.current_indicators['hurst_lower']['value']:.2f}", self.current_indicators['hurst_lower']['timestamp'])
ind_table.add_row("RSI", f"{self.current_indicators['rsi']['value']:.2f}", self.current_indicators['rsi']['timestamp'])
layout["indicators"].update(Panel(ind_table, title="[bold yellow]TECHNICAL INDICATORS[/]"))
# Position
pos_table = Table(box=box.ROUNDED, expand=True)
pos_table.add_column("Account Balance", justify="center"); pos_table.add_column("Position Size", justify="center"); pos_table.add_column("Entry Price", justify="center"); pos_table.add_column("Unrealized PnL", justify="center")
if self.position:
pnl = float(self.position['unrealisedPnl'])
pos_table.add_row(f"${self.wallet_balance:.2f}", self.position['size'], self.position['avgPrice'], f"[bold {'green' if pnl>=0 else 'red'}]${pnl:.2f}")
pos_table.add_row(f"${self.wallet_balance:,.2f}", str(self.position['size']), f"${float(self.position['avgPrice']):,.2f}", f"[bold {'green' if pnl>=0 else 'red'}]${pnl:,.2f}")
else:
pos_table.add_row(f"${self.wallet_balance:.2f}", "0", "-", "-")
pos_table.add_row(f"${self.wallet_balance:,.2f}", "0", "-", "-")
layout["position"].update(Panel(pos_table, title="[bold green]PORTFOLIO STATUS[/]"))
self.console.print(cfg_table); self.console.print(ind_table); self.console.print(pos_table)
self.console.print(f"[dim]Status: {self.status_msg} | Signal: {self.last_signal}[/]")
# Footer
footer_text = f"Status: [bold blue]{self.status_msg}[/] | Last Signal: [bold yellow]{self.last_signal or 'N/A'}[/] | Last Candle: {self.last_candle_time or 'N/A'}"
layout["footer"].update(Panel(footer_text))
return layout
async def run(self):
await self.db.connect()
last_exchange_update = 0
while True:
now = time.time()
# 1. Exchange Sync (15s)
if now - last_exchange_update >= 15:
await self.update_exchange_data()
last_exchange_update = now
# 2. DB Sync (5s)
candles = await self.db.get_candles(self.db_symbol, self.db_interval, limit=100)
if candles:
latest = candles[0]
if latest['time'] != self.last_candle_time:
df = pd.DataFrame(candles[::-1])
df = df.astype({'open': float, 'high': float, 'low': float, 'close': float, 'volume': float})
df = self.calculate_indicators(df)
signal = self.check_signals(df)
if signal: await self.execute_trade(signal)
self.last_candle_time = latest['time']
self.last_candle_price = latest['close']
self.status_msg = f"New Candle processed: {latest['time']}"
else:
self.status_msg = f"No candles found for {self.db_symbol} / {self.db_interval}"
with Live(self.generate_dashboard(), refresh_per_second=2) as live:
while True:
now = time.time()
# 1. Exchange Sync (15s)
if now - last_exchange_update >= 15:
await self.update_exchange_data()
last_exchange_update = now
self.render_dashboard()
await asyncio.sleep(5)
# 2. DB Sync (5s)
candles = await self.db.get_candles(self.db_symbol, self.db_interval, limit=100)
if candles:
latest = candles[0]
if latest['time'] != self.last_candle_time:
df = pd.DataFrame(candles[::-1])
df = df.astype({'open': float, 'high': float, 'low': float, 'close': float, 'volume': float})
df = self.calculate_indicators(df)
signal = self.check_signals(df)
if signal: await self.execute_trade(signal)
self.last_candle_time = latest['time']
self.last_candle_price = latest['close']
self.status_msg = f"New Candle processed: {latest['time']}"
else:
self.status_msg = f"No candles found for {self.db_symbol} / {self.db_interval}"
# 3. Update Dashboard
live.update(self.generate_dashboard())
await asyncio.sleep(5)
if __name__ == "__main__":
bot = PingPongBot()