259 lines
10 KiB
Python
259 lines
10 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import argparse
|
|
from datetime import datetime, timedelta, timezone
|
|
from hyperliquid.info import Info
|
|
from hyperliquid.utils import constants
|
|
from collections import deque, defaultdict
|
|
|
|
# --- Configuration ---
|
|
# Upper bound on the number of decoded trades kept per coin. Each history is
# a deque created with this maxlen, so appending to a full history discards
# the oldest entry.
MAX_TRADE_HISTORY = 100000

# Decoded trade history, one independent bounded deque per tracked coin.
all_trades = {symbol: deque(maxlen=MAX_TRADE_HISTORY) for symbol in ("BTC", "ETH")}

# Most recent raw trade payload per tracked coin; None until one is stored.
latest_raw_trades = dict.fromkeys(("BTC", "ETH"))

# Lines of the decoded-trade panel. Rebound by update_decoded_trade_display()
# and appended to the output by display_dashboard().
decoded_trade_output = []

# Number of lines written by the previous display_dashboard() call; used to
# move the cursor back up before the next redraw.
_lines_printed = 0
|
|
|
|
def get_coins_from_strategies() -> set:
    """
    Collect the coin symbols referenced by enabled strategies.

    Reads ``_data/strategies.json`` (relative to the current working
    directory), which is expected to hold a JSON object mapping strategy
    names to configuration objects. A strategy contributes its
    ``parameters.coin`` value when its ``enabled`` flag is truthy and the
    coin is a non-empty string.

    Returns:
        The set of unique coin symbols. An empty set is returned (and an
        error is written to stderr) when the file cannot be read, is not
        valid JSON, or does not contain a JSON object at the top level.
        Individual malformed strategy entries are skipped rather than
        aborting the whole load.
    """
    config_path = os.path.join("_data", "strategies.json")

    try:
        with open(config_path, 'r', encoding="utf-8") as f:
            all_configs = json.load(f)
    except (OSError, ValueError) as e:
        # OSError covers missing files, permission problems and directories;
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        print(f"ERROR: Could not load or parse '{config_path}': {e}", file=sys.stderr)
        return set()

    if not isinstance(all_configs, dict):
        print(
            f"ERROR: Could not load or parse '{config_path}': "
            f"expected a JSON object at the top level",
            file=sys.stderr,
        )
        return set()

    coins = set()
    for config in all_configs.values():
        # Skip entries that are not objects (e.g. a stray string or null).
        if not isinstance(config, dict) or not config.get("enabled", False):
            continue
        # "parameters" may be absent or explicitly null in the JSON.
        parameters = config.get("parameters")
        if not isinstance(parameters, dict):
            continue
        coin = parameters.get("coin")
        if isinstance(coin, str) and coin:
            coins.add(coin)

    # Sorted so the log line is stable from run to run.
    print(f"Found {len(coins)} unique coins to watch from enabled strategies: {sorted(coins)}")
    return coins
|
|
|
|
def on_message(message):
    """
    WebSocket callback: decode incoming trades and store them per coin.

    Only messages whose ``channel`` is ``"trades"`` are processed. For each
    trade whose coin is tracked in ``all_trades``, the raw payload is saved
    in ``latest_raw_trades`` and a decoded record (UTC time, BUY/SELL side,
    notional value ``px * sz`` and the ``users`` list) is appended to that
    coin's history.

    Each trade is validated on its own: a malformed trade is skipped without
    affecting the other trades in the same message, and nothing is stored
    for a trade that fails to decode. Failures are deliberately silent so
    that nothing is printed from the callback while the dashboard is being
    redrawn in place.

    Args:
        message: The decoded WebSocket message; expected to be a dict with
            ``channel`` and ``data`` keys. Anything else is ignored.
    """
    if not isinstance(message, dict) or message.get("channel") != "trades":
        return

    trades = message.get("data")
    if not isinstance(trades, (list, tuple)):
        return

    for trade in trades:
        try:
            coin = trade['coin']
            if coin not in all_trades:
                continue
            price = float(trade['px'])
            size = float(trade['sz'])
            decoded_trade = {
                # Timestamps are divided by 1000, i.e. treated as milliseconds.
                "time": datetime.fromtimestamp(trade['time'] / 1000, tz=timezone.utc),
                "side": "BUY" if trade['side'] == "B" else "SELL",
                "value": price * size,
                # Normalise a missing or null users field to an empty list so
                # consumers can always iterate it.
                "users": trade.get('users') or [],
            }
        except (KeyError, TypeError, ValueError, AttributeError, OverflowError, OSError):
            # Best-effort feed handling: drop just this trade and carry on.
            continue

        # Store only after a successful decode so the "latest raw trade" and
        # the decoded history never disagree.
        latest_raw_trades[coin] = trade
        all_trades[coin].append(decoded_trade)
|
|
|
|
def build_top_trades_table(title: str, trades: list) -> list:
    """
    Render the five largest trades (by value) as fixed-height table lines.

    Args:
        title: Text placed in the ``--- title ---`` banner line.
        trades: Decoded trade dicts providing ``time`` (a datetime),
            ``side`` and ``value``.

    Returns:
        Exactly eight strings: banner, column header, dashed rule, then up
        to five trade rows in descending value order, padded with blank
        rows of header width when fewer than five trades are supplied.
    """
    column_header = f"{'Time (UTC)':<10} | {'Side':<5} | {'Value (USD)':>20}"
    width = len(column_header)

    ranked = sorted(trades, key=lambda entry: entry['value'], reverse=True)
    rows = [
        f"{entry['time'].strftime('%H:%M:%S'):<10} | "
        f"{entry['side']:<5} | "
        f"${entry['value']:>18,.2f}"
        for entry in ranked[:5]
    ]

    # Keep the table at a constant height so side-by-side tables line up.
    filler = [" " * width] * (5 - len(rows))

    return [f"--- {title} ---", column_header, "-" * width, *rows, *filler]
|
|
|
|
def build_top_takers_table(title: str, trades: list) -> list:
    """
    Render the five addresses with the highest traded volume as table lines.

    Every address listed in a trade's ``users`` field is credited with that
    trade's full ``value``.
    NOTE(review): the table labels these addresses "takers", but this
    function does not distinguish between the entries of ``users`` —
    confirm against the feed's schema that this is the intended attribution.

    Args:
        title: Text placed in the ``--- title ---`` banner line.
        trades: Decoded trade dicts providing ``users`` and ``value``.

    Returns:
        At least eight strings: banner, column header, dashed rule, then up
        to five ranked rows (addresses shortened to ``first6...last4``),
        padded with blank rows of header width.
    """
    column_header = f"{'#':<2} | {'Taker Address':<15} | {'Total Volume (USD)':>20}"
    table = [f"--- {title} ---", column_header, "-" * len(column_header)]

    # Accumulate volume per address in first-seen order.
    volume_by_address = {}
    for entry in trades:
        for participant in entry['users']:
            volume_by_address[participant] = (
                volume_by_address.get(participant, 0.0) + entry['value']
            )

    ranking = sorted(volume_by_address.items(), key=lambda pair: pair[1], reverse=True)

    for rank, (address, volume) in enumerate(ranking[:5], start=1):
        abbreviated = f"{address[:6]}...{address[-4:]}"
        table.append(f"{rank:<2} | {abbreviated:<15} | ${volume:>18,.2f}")

    # Pad to a constant height so side-by-side tables line up.
    missing = 8 - len(table)
    if missing > 0:
        table.extend([" " * len(column_header)] * missing)
    return table
|
|
|
|
def build_top_active_takers_table(title: str, trades: list) -> list:
    """
    Render the five addresses with the most trades as table lines.

    Every address listed in a trade's ``users`` field gets its trade count
    incremented by one and its volume increased by the trade's ``value``.
    NOTE(review): the table labels these addresses "takers", but this
    function does not distinguish between the entries of ``users`` —
    confirm against the feed's schema that this is the intended attribution.

    Args:
        title: Text placed in the ``--- title ---`` banner line.
        trades: Decoded trade dicts providing ``users`` and ``value``.

    Returns:
        At least eight strings: banner, column header, dashed rule, then up
        to five rows ranked by descending trade count (full address, count
        and total volume), padded with blank rows of header width.
    """
    column_header = (
        f"{'#':<2} | {'Taker Address':<42} | {'Trade Count':>12} | {'Total Volume (USD)':>20}"
    )
    table = [f"--- {title} ---", column_header, "-" * len(column_header)]

    # Per-address running totals, kept in first-seen order.
    stats_by_address = {}
    for entry in trades:
        for participant in entry['users']:
            record = stats_by_address.setdefault(participant, {'count': 0, 'volume': 0.0})
            record['count'] += 1
            record['volume'] += entry['value']

    ranking = sorted(
        stats_by_address.items(),
        key=lambda pair: pair[1]['count'],
        reverse=True,
    )

    for rank, (address, record) in enumerate(ranking[:5], start=1):
        table.append(
            f"{rank:<2} | {address:<42} | {record['count']:>12} | ${record['volume']:>18,.2f}"
        )

    # Pad to a constant height so side-by-side tables line up.
    missing = 8 - len(table)
    if missing > 0:
        table.extend([" " * len(column_header)] * missing)
    return table
|
|
|
|
|
|
def build_decoded_trade_lines(coin: str) -> list:
    """
    Build the seven display lines describing the latest raw trade of a coin.

    Args:
        coin: Key into ``latest_raw_trades`` (e.g. ``"BTC"``).

    Returns:
        A list of seven strings (time, side, price, size, trade id, hash,
        users). When no trade has been stored for the coin yet, seven
        copies of a placeholder line are returned instead. Missing or
        unparsable fields are rendered as ``N/A`` (or an empty user list)
        rather than raising, so one odd payload cannot stop the dashboard.
    """
    trade = latest_raw_trades.get(coin)
    if not trade:
        return ["No trade data yet..."] * 7

    # The timestamp is divided by 1000, i.e. treated as milliseconds. It is
    # the only field that needs conversion, so guard it explicitly.
    try:
        timestamp = datetime.fromtimestamp(trade['time'] / 1000, tz=timezone.utc)
    except (KeyError, TypeError, ValueError, OverflowError, OSError):
        timestamp = 'N/A'

    # Tolerate a null users field and non-string entries.
    users = trade.get('users') or []

    return [
        f"Time: {timestamp}",
        f"Side: {'BUY' if trade.get('side') == 'B' else 'SELL'}",
        f"Price: {trade.get('px', 'N/A')}",
        f"Size: {trade.get('sz', 'N/A')}",
        f"Trade ID: {trade.get('tid', 'N/A')}",
        f"Hash: {trade.get('hash', 'N/A')}",
        f"Users: {', '.join(str(user) for user in users)}"
    ]
|
|
|
|
def update_decoded_trade_display():
    """
    Refresh the decoded-trade panel at most once per minute.

    The panel (the module-level ``decoded_trade_output`` list) is rebuilt
    from the latest BTC and ETH trades the first time this function is
    called at or after the 40-second mark of a given minute. Calls before
    second 40, and further calls within an already-refreshed minute, leave
    the panel untouched.

    A ``>= 40`` test combined with a "last refreshed minute" marker is used
    instead of ``second == 40`` because the caller polls roughly once per
    second plus rendering time, so an exact-second comparison can be
    stepped over and that minute's refresh lost.
    """
    global decoded_trade_output

    now = datetime.now(timezone.utc)
    if now.second < 40:
        return

    # Remember which minute was last refreshed on the function object itself
    # so no additional module-level state is required.
    current_minute = now.replace(second=0, microsecond=0)
    if getattr(update_decoded_trade_display, "_last_refresh_minute", None) == current_minute:
        return
    update_decoded_trade_display._last_refresh_minute = current_minute

    lines = ["--- Last BTC Trade (Decoded) ---"]
    lines.extend(build_decoded_trade_lines("BTC"))
    lines.append("")
    lines.append("--- Last ETH Trade (Decoded) ---")
    lines.extend(build_decoded_trade_lines("ETH"))
    decoded_trade_output = lines
|
|
|
|
def display_dashboard(view: str):
    """
    Redraw the dashboard in place for the selected view.

    The cursor is first moved up by the number of lines written by the
    previous call, then the new frame is written over the old one. Each
    line is followed by an "erase to end of line" sequence so that a line
    shorter than the one it replaces leaves no stale characters behind, and
    the frame ends with an "erase below" sequence.

    For each time window, a BTC table and an ETH table are built from a
    snapshot of ``all_trades`` and printed side by side. The decoded-trade
    panel (``decoded_trade_output``) is appended below, or 17 blank lines
    when it has not been populated yet, so the frame height stays constant.

    Args:
        view: ``"trades"``, ``"takers"`` or ``"active_takers"``. Any other
            value renders only the decoded-trade panel.
    """
    global _lines_printed
    if _lines_printed > 0:
        print(f"\x1b[{_lines_printed}A", end="")

    now_utc = datetime.now(timezone.utc)
    separator = " | "

    time_windows = [
        ("All Time", None), ("Last 24h", timedelta(hours=24)),
        ("Last 1h", timedelta(hours=1)), ("Last 5m", timedelta(minutes=5)),
        ("Last 1m", timedelta(minutes=1)),
    ]
    rolling_windows = time_windows[1:]

    # view -> (banner, table builder, windows to show, width of left column)
    layouts = {
        "trades": (
            "--- Top 5 Trades by Value ---",
            build_top_trades_table, time_windows, 45,
        ),
        "takers": (
            "--- Top 5 Takers by Volume (Rolling Windows) ---",
            build_top_takers_table, rolling_windows, 45,
        ),
        "active_takers": (
            "--- Top 5 Active Takers by Trade Count (Rolling Windows) ---",
            build_top_active_takers_table, rolling_windows, 85,
        ),
    }

    # Snapshot the histories once so every window sees the same data.
    btc_trades_copy = list(all_trades["BTC"])
    eth_trades_copy = list(all_trades["ETH"])

    output_lines = []
    layout = layouts.get(view)
    if layout is not None:
        banner, build_table, windows, left_width = layout
        output_lines.append(banner)
        for title, delta in windows:
            if delta is None:
                # "All Time": no filtering.
                btc_trades, eth_trades = btc_trades_copy, eth_trades_copy
            else:
                # Compute the cutoff once per window, not once per trade.
                cutoff = now_utc - delta
                btc_trades = [t for t in btc_trades_copy if t['time'] > cutoff]
                eth_trades = [t for t in eth_trades_copy if t['time'] > cutoff]

            btc_lines = build_table(f"BTC - {title}", btc_trades)
            eth_lines = build_table(f"ETH - {title}", eth_trades)
            for i, left in enumerate(btc_lines):
                right = eth_lines[i] if i < len(eth_lines) else ''
                output_lines.append(f"{left:<{left_width}}{separator}{right}")
            output_lines.append("")

    if decoded_trade_output:
        output_lines.extend(decoded_trade_output)
    else:
        # Same height as a populated panel: 2 banners + 2 * 7 lines + 1 blank.
        output_lines.extend([""] * 17)

    # \x1b[K erases from the cursor to the end of the line; \x1b[J erases
    # everything below the final cursor position.
    final_output = "".join(f"{line}\x1b[K\n" for line in output_lines) + "\x1b[J"
    print(final_output, end="")

    _lines_printed = len(output_lines)
    sys.stdout.flush()
|
|
|
|
|
|
def main():
    """
    Parse the command line, subscribe to the trade feeds and run the display.

    Exits early (after writing a message to stderr) unless both BTC and ETH
    are among the coins returned by get_coins_from_strategies(). Otherwise
    it subscribes on_message to the BTC and ETH trade channels and redraws
    the dashboard about once per second until interrupted with Ctrl+C, at
    which point the WebSocket manager is stopped.
    """
    cli = argparse.ArgumentParser(description="Live market data dashboard for Hyperliquid.")
    cli.add_argument(
        "--view",
        default="trades",
        choices=['trades', 'takers', 'active_takers'],
        help="The data view to display: 'trades' (default), 'takers', or 'active_takers'.",
    )
    options = cli.parse_args()

    required_coins = ("BTC", "ETH")
    watched = get_coins_from_strategies()
    if any(symbol not in watched for symbol in required_coins):
        print("This script is configured to display BTC and ETH. Please ensure they are in your strategies.", file=sys.stderr)
        return

    info = Info(constants.MAINNET_API_URL, skip_ws=False)

    for symbol in required_coins:
        info.subscribe({"type": "trades", "coin": symbol}, on_message)
        print(f"Subscribed to Trades for {symbol}")
        # Short pause between consecutive subscription requests.
        time.sleep(0.2)

    print(f"\nDisplaying live '{options.view}' summary... Press Ctrl+C to stop.")
    try:
        while True:
            update_decoded_trade_display()
            display_dashboard(view=options.view)
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping WebSocket listener...")
        info.ws_manager.stop()
        print("Listener stopped.")
|
|
|
|
# Script entry point: start the dashboard only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|