"""Live terminal dashboard of BTC and ETH trades from the Hyperliquid feed.

Trades arrive through a WebSocket callback (``on_message``) and are kept in
bounded in-memory deques.  The main loop redraws one of three summary views in
place once per second using ANSI cursor movement.
"""

import os
import sys
import time
import json
import argparse
from datetime import datetime, timedelta, timezone
from hyperliquid.info import Info
from hyperliquid.utils import constants
from collections import deque, defaultdict

# --- Configuration ---
MAX_TRADE_HISTORY = 100000

# Number of entries shown in every "top N" table.
TOP_N = 5
# Every table is padded to this many rows (3 header rows + TOP_N entries) so
# the in-place redraw always emits the same number of lines.
TABLE_ROWS = 8
# Second of the minute from which the decoded-trade panel is refreshed.
DECODED_REFRESH_SECOND = 40
# Rows in the decoded-trade panel: 2 titles + 2 * 7 detail rows + 1 spacer.
DECODED_PANEL_HEIGHT = 17
# Coins this dashboard displays side by side.
DISPLAY_COINS = ("BTC", "ETH")

# Decoded trades per coin, oldest first; appended to by the WebSocket callback.
all_trades = {
    "BTC": deque(maxlen=MAX_TRADE_HISTORY),
    "ETH": deque(maxlen=MAX_TRADE_HISTORY),
}
# Most recent raw trade payload per coin (only trades that decoded cleanly).
latest_raw_trades = {
    "BTC": None,
    "ETH": None,
}
# Cached lines of the decoded-trade panel, rebuilt about once per minute.
decoded_trade_output = []
# Number of lines emitted by the previous redraw (used to move the cursor up).
_lines_printed = 0
# Minute (naive local time, seconds zeroed) of the last decoded-panel refresh.
_last_decoded_refresh = None


def get_coins_from_strategies() -> set:
    """
    Reads the strategies.json file and returns a unique set of coin symbols
    from all enabled strategies.

    Returns:
        The set of ``parameters.coin`` values of every strategy whose
        ``enabled`` flag is truthy, or an empty set if the file cannot be
        read or parsed (the error is reported on stderr).
    """
    coins = set()
    config_path = os.path.join("_data", "strategies.json")
    try:
        with open(config_path, 'r', encoding="utf-8") as f:
            all_configs = json.load(f)
        for config in all_configs.values():
            if config.get("enabled", False):
                coin = config.get("parameters", {}).get("coin")
                if coin:
                    coins.add(coin)
        print(f"Found {len(coins)} unique coins to watch from enabled strategies: {list(coins)}")
        return coins
    except (OSError, json.JSONDecodeError) as e:
        # OSError covers FileNotFoundError as well as permission problems.
        print(f"ERROR: Could not load or parse '{config_path}': {e}", file=sys.stderr)
        return set()


def on_message(message):
    """
    Callback function to process incoming trades from the WebSocket and store them.

    Each trade in a "trades" message is validated on its own: a malformed
    trade is skipped without discarding the rest of the batch, and a trade is
    only published to ``latest_raw_trades`` / ``all_trades`` after it decoded
    successfully, so the display code never sees a half-valid payload.
    """
    if not isinstance(message, dict) or message.get("channel") != "trades":
        return
    trades = message.get("data")
    if not isinstance(trades, (list, tuple)):
        return

    for trade in trades:
        try:
            coin = trade['coin']
            if coin not in all_trades:
                continue
            price = float(trade['px'])
            size = float(trade['sz'])
            decoded_trade = {
                "time": datetime.fromtimestamp(trade['time'] / 1000, tz=timezone.utc),
                "side": "BUY" if trade['side'] == "B" else "SELL",
                "value": price * size,
                "users": trade.get('users') or [],
            }
        except (KeyError, TypeError, ValueError, OverflowError, OSError):
            # Deliberately silent: printing here would corrupt the in-place
            # dashboard redraw. The bad trade is simply dropped.
            continue
        latest_raw_trades[coin] = trade
        all_trades[coin].append(decoded_trade)


def _table_frame(title: str, header: str) -> list:
    """Return the three fixed rows (title, header, rule) that start a table."""
    return [f"--- {title} ---", header, "-" * len(header)]


def _pad_table(lines: list, width: int) -> list:
    """Pad ``lines`` in place with blank rows up to TABLE_ROWS and return it."""
    while len(lines) < TABLE_ROWS:
        lines.append(" " * width)
    return lines


def build_top_trades_table(title: str, trades: list) -> list:
    """Builds the formatted lines for a top-5 trades by value table.

    Args:
        title: Text placed in the ``--- title ---`` row.
        trades: Decoded trade dicts with ``time``, ``side`` and ``value``.

    Returns:
        A list of at least TABLE_ROWS display lines.
    """
    header = f"{'Time (UTC)':<10} | {'Side':<5} | {'Value (USD)':>20}"
    lines = _table_frame(title, header)

    top_trades = sorted(trades, key=lambda x: x['value'], reverse=True)[:TOP_N]
    for trade in top_trades:
        lines.append(
            f"{trade['time'].strftime('%H:%M:%S'):<10} | "
            f"{trade['side']:<5} | "
            f"${trade['value']:>18,.2f}"
        )
    return _pad_table(lines, len(header))


def build_top_takers_table(title: str, trades: list) -> list:
    """Analyzes a list of trades to find the top 5 takers by total volume.

    NOTE(review): every address in ``trade['users']`` is credited with the
    trade's value. If the feed lists both counterparties there, the maker is
    counted as well -- confirm against the Hyperliquid trade schema.

    Args:
        title: Text placed in the ``--- title ---`` row.
        trades: Decoded trade dicts with ``users`` and ``value``.

    Returns:
        A list of at least TABLE_ROWS display lines.
    """
    header = f"{'#':<2} | {'Taker Address':<15} | {'Total Volume (USD)':>20}"
    lines = _table_frame(title, header)

    volumes = defaultdict(float)
    for trade in trades:
        for user in trade['users']:
            volumes[user] += trade['value']

    top_takers = sorted(volumes.items(), key=lambda item: item[1], reverse=True)[:TOP_N]
    for i, (address, volume) in enumerate(top_takers, 1):
        # Abbreviate the address so the row fits the 15-character column.
        short_address = f"{address[:6]}...{address[-4:]}"
        lines.append(f"{i:<2} | {short_address:<15} | ${volume:>18,.2f}")
    return _pad_table(lines, len(header))


def build_top_active_takers_table(title: str, trades: list) -> list:
    """Analyzes a list of trades to find the top 5 takers by trade count.

    NOTE(review): as in ``build_top_takers_table``, every address in
    ``trade['users']`` is counted -- confirm that this matches the intended
    definition of "taker".

    Args:
        title: Text placed in the ``--- title ---`` row.
        trades: Decoded trade dicts with ``users`` and ``value``.

    Returns:
        A list of at least TABLE_ROWS display lines.
    """
    header = f"{'#':<2} | {'Taker Address':<42} | {'Trade Count':>12} | {'Total Volume (USD)':>20}"
    lines = _table_frame(title, header)

    taker_data = defaultdict(lambda: {'count': 0, 'volume': 0.0})
    for trade in trades:
        for user in trade['users']:
            taker_data[user]['count'] += 1
            taker_data[user]['volume'] += trade['value']

    top_takers = sorted(taker_data.items(), key=lambda item: item[1]['count'], reverse=True)[:TOP_N]
    for i, (address, data) in enumerate(top_takers, 1):
        lines.append(f"{i:<2} | {address:<42} | {data['count']:>12} | ${data['volume']:>18,.2f}")
    return _pad_table(lines, len(header))


def build_decoded_trade_lines(coin: str) -> list:
    """Builds a formatted, multi-line string for a single decoded trade.

    Args:
        coin: Key into ``latest_raw_trades`` ("BTC" or "ETH").

    Returns:
        Seven display lines describing the latest raw trade for ``coin``, or
        seven placeholder lines if no trade has been received yet.
    """
    trade = latest_raw_trades[coin]
    if not trade:
        return ["No trade data yet..."] * 7

    users = trade.get('users') or []
    return [
        f"Time: {datetime.fromtimestamp(trade['time'] / 1000, tz=timezone.utc)}",
        f"Side: {'BUY' if trade.get('side') == 'B' else 'SELL'}",
        f"Price: {trade.get('px', 'N/A')}",
        f"Size: {trade.get('sz', 'N/A')}",
        f"Trade ID: {trade.get('tid', 'N/A')}",
        f"Hash: {trade.get('hash', 'N/A')}",
        f"Users: {', '.join(str(user) for user in users)}"
    ]


def update_decoded_trade_display():
    """
    Updates the global variable holding the decoded trade output once per
    minute, on the first call at or after the 40-second mark.

    The caller's loop period is one second plus rendering time, so an exact
    ``second == 40`` test can skip a whole minute; remembering the minute of
    the last refresh makes the update happen exactly once per minute instead.
    """
    global decoded_trade_output, _last_decoded_refresh

    now = datetime.now()
    if now.second < DECODED_REFRESH_SECOND:
        return
    current_minute = now.replace(second=0, microsecond=0)
    if current_minute == _last_decoded_refresh:
        return  # already refreshed during this minute
    _last_decoded_refresh = current_minute

    lines = []
    lines.append("--- Last BTC Trade (Decoded) ---")
    lines.extend(build_decoded_trade_lines("BTC"))
    lines.append("")
    lines.append("--- Last ETH Trade (Decoded) ---")
    lines.extend(build_decoded_trade_lines("ETH"))
    decoded_trade_output = lines


# Rolling windows offered by the dashboard; ``None`` means "no time limit".
_TIME_WINDOWS = [
    ("All Time", None),
    ("Last 24h", timedelta(hours=24)),
    ("Last 1h", timedelta(hours=1)),
    ("Last 5m", timedelta(minutes=5)),
    ("Last 1m", timedelta(minutes=1)),
]

# view name -> (heading, table builder, windows shown, width of the left column)
_VIEW_LAYOUTS = {
    "trades": (
        "--- Top 5 Trades by Value ---",
        build_top_trades_table,
        _TIME_WINDOWS,
        45,
    ),
    "takers": (
        "--- Top 5 Takers by Volume (Rolling Windows) ---",
        build_top_takers_table,
        _TIME_WINDOWS[1:],
        45,
    ),
    "active_takers": (
        "--- Top 5 Active Takers by Trade Count (Rolling Windows) ---",
        build_top_active_takers_table,
        _TIME_WINDOWS[1:],
        85,
    ),
}


def _trades_since(trades: list, cutoff) -> list:
    """Return the trades strictly newer than ``cutoff`` (all of them if None)."""
    if cutoff is None:
        return list(trades)
    return [t for t in trades if t['time'] > cutoff]


def _merge_columns(left: list, right: list, width: int, separator: str = " | ") -> list:
    """Join two tables row by row; ``left`` sets the row count and is padded to ``width``."""
    padded_right = list(right) + [""] * (len(left) - len(right))
    return [f"{l:<{width}}{separator}{r}" for l, r in zip(left, padded_right)]


def display_dashboard(view: str):
    """Clears the screen and prints the selected dashboard view.

    Args:
        view: One of "trades", "takers" or "active_takers". Any other value
            renders only the decoded-trade panel.
    """
    global _lines_printed

    # Move the cursor back to the top of the previous frame so it is overwritten.
    if _lines_printed > 0:
        print(f"\x1b[{_lines_printed}A", end="")

    now_utc = datetime.now(timezone.utc)
    output_lines = []

    # Snapshot the deques once so every window of this frame sees the same data
    # even while the WebSocket callback keeps appending.
    btc_trades_copy = list(all_trades["BTC"])
    eth_trades_copy = list(all_trades["ETH"])

    layout = _VIEW_LAYOUTS.get(view)
    if layout is not None:
        heading, build_table, windows, column_width = layout
        output_lines.append(heading)
        for title, delta in windows:
            cutoff = None if delta is None else now_utc - delta
            btc_lines = build_table(f"BTC - {title}", _trades_since(btc_trades_copy, cutoff))
            eth_lines = build_table(f"ETH - {title}", _trades_since(eth_trades_copy, cutoff))
            output_lines.extend(_merge_columns(btc_lines, eth_lines, column_width))
            output_lines.append("")

    if decoded_trade_output:
        output_lines.extend(decoded_trade_output)
    else:
        # Reserve the panel's height so the frame size does not change later.
        output_lines.extend([""] * DECODED_PANEL_HEIGHT)

    # "\x1b[J" erases anything left below the new frame.
    final_output = "\n".join(output_lines) + "\n\x1b[J"
    print(final_output, end="")

    _lines_printed = len(output_lines)
    sys.stdout.flush()


def main():
    """Main function to set up the WebSocket and run the display loop."""
    parser = argparse.ArgumentParser(description="Live market data dashboard for Hyperliquid.")
    parser.add_argument("--view", default="trades", choices=['trades', 'takers', 'active_takers'],
                        help="The data view to display: 'trades' (default), 'takers', or 'active_takers'.")
    args = parser.parse_args()

    coins_to_watch = get_coins_from_strategies()
    if not ("BTC" in coins_to_watch and "ETH" in coins_to_watch):
        print("This script is configured to display BTC and ETH. Please ensure they are in your strategies.", file=sys.stderr)
        return

    info = Info(constants.MAINNET_API_URL, skip_ws=False)

    try:
        for coin in DISPLAY_COINS:
            trade_subscription = {"type": "trades", "coin": coin}
            info.subscribe(trade_subscription, on_message)
            print(f"Subscribed to Trades for {coin}")
            time.sleep(0.2)

        print(f"\nDisplaying live '{args.view}' summary... Press Ctrl+C to stop.")

        while True:
            update_decoded_trade_display()
            display_dashboard(view=args.view)
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the listener on every exit path, not only on Ctrl+C, so an
        # unexpected error does not leave the WebSocket running.
        print("\nStopping WebSocket listener...")
        info.ws_manager.stop()
        print("Listener stopped.")


if __name__ == "__main__":
    main()