"""Live terminal dashboard of Hyperliquid fills for a set of wallet addresses.

The script subscribes to the ``userFills`` WebSocket feed for every watched
address, keeps the most recent fills per address in memory, redraws a
two-column table in place, and appends fills to one CSV file per address.
"""

import argparse
import csv
import json
import logging
import os
import sys
import time
from collections import deque
from datetime import datetime, timezone

from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging

# --- Configuration ---
DEFAULT_ADDRESSES_TO_WATCH = [
    #"0xd4c1f7e8d876c4749228d515473d36f919583d1d",
    "0x47930c76790c865217472f2ddb4d14c640ee450a",
    # "0x4d69495d16fab95c3c27b76978affa50301079d0",
    # "0x09bc1cf4d9f0b59e1425a8fde4d4b1f7d3c9410d",
    "0xc6ac58a7a63339898aeda32499a8238a46d88e84",
    "0xa8ef95dbd3db55911d3307930a84b27d6e969526",
    # "0x4129c62faf652fea61375dcd9ca8ce24b2bb8b95",
    "0x32885a6adac4375858E6edC092EfDDb0Ef46484C",
]
MAX_FILLS_TO_DISPLAY = 10
LOGS_DIR = "_logs"

# Maps a lowercase address to a bounded deque of its latest decoded fills.
# Written by the WebSocket callback, read by the display loop.
recent_fills = {}
# Number of terminal lines emitted by the previous redraw (0 before the first).
_lines_printed = 0

TABLE_HEADER = f"{'Time (UTC)':<10} | {'Coin':<6} | {'Side':<5} | {'Size':>15} | {'Price':>15} | {'Value (USD)':>20}"
TABLE_WIDTH = len(TABLE_HEADER)

# Column order of the per-address CSV logs.
_CSV_FIELDNAMES = ('time_utc', 'coin', 'side', 'price', 'size', 'value_usd')
_HEX_DIGITS = frozenset("0123456789abcdef")


def log_fill_to_csv(address: str, fill_data: dict) -> None:
    """Append a single decoded fill to the CSV log of ``address``.

    Args:
        address: Address used to build the file name ``fills_<address>.csv``
            inside ``LOGS_DIR``.
        fill_data: Decoded fill with the keys ``time`` (a ``datetime``),
            ``coin``, ``side``, ``price``, ``size`` and ``value``.

    I/O failures are logged and swallowed so that a logging problem never
    interrupts the caller.
    """
    log_file_path = os.path.join(LOGS_DIR, f"fills_{address}.csv")

    # The CSV stores a flattened version of the decoded fill.
    csv_row = {
        'time_utc': fill_data['time'].isoformat(),
        'coin': fill_data['coin'],
        'side': fill_data['side'],
        'price': fill_data['price'],
        'size': fill_data['size'],
        'value_usd': fill_data['value'],
    }

    try:
        with open(log_file_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=_CSV_FIELDNAMES)
            # Decide on the header from the opened file itself: this also
            # covers a file that exists but is empty, and avoids the gap
            # between a separate existence check and the open.
            if f.tell() == 0:
                writer.writeheader()
            writer.writerow(csv_row)
    except OSError as e:
        logging.error("Failed to write to CSV log for %s: %s", address, e)


def _decode_fill(fill_data: dict) -> dict:
    """Convert one raw fill (``time``, ``coin``, ``side``, ``px``, ``sz``) to display form."""
    price = float(fill_data['px'])
    size = float(fill_data['sz'])
    return {
        # The raw timestamp is divided by 1000 before conversion, i.e. it is
        # treated as milliseconds since the epoch.
        "time": datetime.fromtimestamp(fill_data['time'] / 1000, tz=timezone.utc),
        "coin": fill_data['coin'],
        "side": "BUY" if fill_data['side'] == "B" else "SELL",
        "price": price,
        "size": size,
        "value": price * size,
    }


def on_message(message):
    """Handle one WebSocket message and record fills of watched addresses.

    Only messages on the ``user`` / ``userFills`` channels whose ``data.user``
    is a key of ``recent_fills`` are processed. Every decodable fill is
    appended to that address's deque; fills that arrive as live updates are
    also appended to the address's CSV log.

    Malformed messages or fills are logged and skipped; nothing is raised.
    """
    try:
        logging.debug("Received message: %s", message)

        if message.get("channel") not in ("user", "userFills"):
            return

        data = message.get("data")
        if not data:
            return

        user_address = data.get("user", "").lower()
        fills = data.get("fills", [])
        if user_address not in recent_fills or not fills:
            return

        # NOTE(review): per the Hyperliquid WebSocket docs the first userFills
        # message is flagged ``isSnapshot`` and carries historical fills.
        # Writing those to the CSV would re-append the same rows on every
        # start, so they only seed the display. When the flag is absent the
        # message is treated as a live update.
        is_snapshot = bool(data.get("isSnapshot"))

        logging.info("Fill detected for user: %s", user_address)
        for fill_data in fills:
            # Decode each fill on its own so that one malformed entry does
            # not discard the remaining fills of the message.
            try:
                decoded_fill = _decode_fill(fill_data)
            except (KeyError, TypeError, ValueError) as e:
                logging.error("Error processing message: %s | Data: %s", e, message)
                continue

            recent_fills[user_address].append(decoded_fill)
            if not is_snapshot:
                log_fill_to_csv(user_address, decoded_fill)

    except (AttributeError, KeyError, TypeError, ValueError) as e:
        logging.error("Error processing message: %s | Data: %s", e, message)


def build_fills_table(address: str, fills: deque) -> list:
    """Build the text lines of one address's fills table.

    Args:
        address: Full address; shown shortened to its first 6 and last 4
            characters.
        fills: Decoded fills to render, one table row each.

    Returns:
        A list of lines: title, header, rule, one line per fill, then empty
        lines padding the row area up to ``MAX_FILLS_TO_DISPLAY``.
    """
    # Copy once: the deque can be appended to by the WebSocket callback while
    # the table is being built, and rows and padding must agree.
    rows = list(fills)

    lines = [
        f"--- Fills for {address[:6]}...{address[-4:]} ---",
        TABLE_HEADER,
        "-" * TABLE_WIDTH,
    ]
    lines.extend(
        f"{fill['time'].strftime('%H:%M:%S'):<10} | "
        f"{fill['coin']:<6} | "
        f"{fill['side']:<5} | "
        f"{fill['size']:>15.4f} | "
        f"{fill['price']:>15,.2f} | "
        f"${fill['value']:>18,.2f}"
        for fill in rows
    )
    # A negative count yields no padding.
    lines.extend([""] * (MAX_FILLS_TO_DISPLAY - len(rows)))
    return lines


def display_dashboard():
    """Redraw the two-column dashboard of recent fills in place.

    Moves the cursor up over the previous frame, prints one table per watched
    address (first half of the addresses on the left, the rest on the right),
    and remembers the frame height in ``_lines_printed`` for the next redraw.
    """
    global _lines_printed

    if _lines_printed > 0:
        # ANSI "cursor up": return to the first line of the previous frame.
        print(f"\x1b[{_lines_printed}A", end="")

    output_lines = ["--- Live Address Fill Monitor ---", ""]

    addresses_to_display = list(recent_fills)
    mid_point = (len(addresses_to_display) + 1) // 2
    left_column_addresses = addresses_to_display[:mid_point]
    right_column_addresses = addresses_to_display[mid_point:]

    separator = " | "
    table_height = 3 + MAX_FILLS_TO_DISPLAY

    for i, left_address in enumerate(left_column_addresses):
        left_table_lines = build_fills_table(left_address, recent_fills[left_address])

        # With an odd number of addresses the last row has no right table.
        right_table_lines = []
        if i < len(right_column_addresses):
            right_address = right_column_addresses[i]
            right_table_lines = build_fills_table(right_address, recent_fills[right_address])

        for j in range(table_height):
            left_part = left_table_lines[j] if j < len(left_table_lines) else ""
            right_part = right_table_lines[j] if j < len(right_table_lines) else ""
            output_lines.append(f"{left_part:<{TABLE_WIDTH}}{separator}{right_part}")
        output_lines.append("")

    # "\x1b[K" erases the rest of each line so that a shorter row does not
    # leave stale characters from a longer row of the previous frame;
    # "\x1b[J" erases everything below the frame.
    final_output = "\n".join(f"{line}\x1b[K" for line in output_lines) + "\n\x1b[J"
    print(final_output, end="")

    _lines_printed = len(output_lines)
    sys.stdout.flush()


def _is_valid_address(address: str) -> bool:
    """Return True if ``address`` is ``0x`` followed by exactly 40 lowercase hex digits."""
    return (
        len(address) == 42
        and address.startswith('0x')
        and all(ch in _HEX_DIGITS for ch in address[2:])
    )


def main():
    """Parse arguments, subscribe to fills and run the display loop.

    Command-line options:
        --addresses: space-separated addresses to monitor
            (default ``DEFAULT_ADDRESSES_TO_WATCH``).
        --log-level: one of ``off``, ``normal``, ``debug``.

    The loop runs until interrupted with Ctrl+C; the WebSocket manager is
    stopped on the way out in every case.
    """
    parser = argparse.ArgumentParser(description="Monitor live fills for specific wallet addresses on Hyperliquid.")
    parser.add_argument(
        "--addresses",
        nargs='+',
        default=DEFAULT_ADDRESSES_TO_WATCH,
        help="A space-separated list of Ethereum addresses to monitor."
    )
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    setup_logging(args.log_level, 'AddressMonitor')

    # Ensure the logs directory exists (no error if it already does).
    os.makedirs(LOGS_DIR, exist_ok=True)

    # Normalise, validate and de-duplicate while keeping the given order.
    # A repeated address would otherwise be subscribed twice.
    valid_addresses = {}
    for addr in args.addresses:
        clean_addr = addr.strip().lower()
        if _is_valid_address(clean_addr):
            valid_addresses[clean_addr] = None
        else:
            logging.warning("Invalid or malformed address provided: '%s'. Skipping.", addr)
    addresses_to_watch = list(valid_addresses)

    # Update in place so every reference to the module-level dict stays valid.
    recent_fills.clear()
    recent_fills.update(
        {addr: deque(maxlen=MAX_FILLS_TO_DISPLAY) for addr in addresses_to_watch}
    )

    if not addresses_to_watch:
        print("No valid addresses configured to watch. Exiting.", file=sys.stderr)
        return

    info = Info(constants.MAINNET_API_URL, skip_ws=False)

    try:
        subscribed = 0
        for addr in addresses_to_watch:
            try:
                info.subscribe({"type": "userFills", "user": addr}, on_message)
            except Exception as e:
                logging.error("Failed to subscribe for %s: %s", addr, e)
                continue
            logging.debug("Queued subscribe for userFills: %s", addr)
            subscribed += 1
            # Short pause between subscription requests.
            time.sleep(0.02)
        # Report only the subscriptions that did not raise.
        logging.info("Subscribed to userFills for %d addresses", subscribed)

        print("\nDisplaying live fill data... Press Ctrl+C to stop.")
        while True:
            display_dashboard()
            time.sleep(0.2)
    except KeyboardInterrupt:
        print("\nStopping WebSocket listener...")
    finally:
        # Stop the listener on any exit path, not only on Ctrl+C.
        info.ws_manager.stop()
        print("Listener stopped.")


if __name__ == "__main__":
    main()