live market websocket and monitoring wallets
This commit is contained in:
221
address_monitor.py
Normal file
221
address_monitor.py
Normal file
@ -0,0 +1,221 @@
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import argparse
|
||||
from datetime import datetime, timezone
|
||||
from hyperliquid.info import Info
|
||||
from hyperliquid.utils import constants
|
||||
from collections import deque
|
||||
import logging
|
||||
import csv
|
||||
|
||||
from logging_utils import setup_logging
|
||||
|
||||
# --- Configuration ---
|
||||
DEFAULT_ADDRESSES_TO_WATCH = [
|
||||
#"0xd4c1f7e8d876c4749228d515473d36f919583d1d",
|
||||
"0x0fd468a73084daa6ea77a9261e40fdec3e67e0c7",
|
||||
# "0x4d69495d16fab95c3c27b76978affa50301079d0",
|
||||
# "0x09bc1cf4d9f0b59e1425a8fde4d4b1f7d3c9410d",
|
||||
"0xc6ac58a7a63339898aeda32499a8238a46d88e84",
|
||||
"0xa8ef95dbd3db55911d3307930a84b27d6e969526",
|
||||
# "0x4129c62faf652fea61375dcd9ca8ce24b2bb8b95",
|
||||
"0xbf1935fe7ab6d0aa3ee8d3da47c2f80e215b2a1c",
|
||||
]
|
||||
MAX_FILLS_TO_DISPLAY = 10
|
||||
LOGS_DIR = "_logs"
|
||||
recent_fills = {}
|
||||
_lines_printed = 0
|
||||
|
||||
TABLE_HEADER = f"{'Time (UTC)':<10} | {'Coin':<6} | {'Side':<5} | {'Size':>15} | {'Price':>15} | {'Value (USD)':>20}"
|
||||
TABLE_WIDTH = len(TABLE_HEADER)
|
||||
|
||||
def log_fill_to_csv(address: str, fill_data: dict):
|
||||
"""Appends a single fill record to the CSV file for a specific address."""
|
||||
log_file_path = os.path.join(LOGS_DIR, f"fills_{address}.csv")
|
||||
file_exists = os.path.exists(log_file_path)
|
||||
|
||||
# The CSV will store a flattened version of the decoded fill
|
||||
csv_row = {
|
||||
'time_utc': fill_data['time'].isoformat(),
|
||||
'coin': fill_data['coin'],
|
||||
'side': fill_data['side'],
|
||||
'price': fill_data['price'],
|
||||
'size': fill_data['size'],
|
||||
'value_usd': fill_data['value']
|
||||
}
|
||||
|
||||
try:
|
||||
with open(log_file_path, 'a', newline='', encoding='utf-8') as f:
|
||||
writer = csv.DictWriter(f, fieldnames=csv_row.keys())
|
||||
if not file_exists:
|
||||
writer.writeheader()
|
||||
writer.writerow(csv_row)
|
||||
except IOError as e:
|
||||
logging.error(f"Failed to write to CSV log for {address}: {e}")
|
||||
|
||||
def on_message(message):
|
||||
"""
|
||||
Callback function to process incoming userEvents from the WebSocket.
|
||||
"""
|
||||
try:
|
||||
logging.debug(f"Received message: {message}")
|
||||
channel = message.get("channel")
|
||||
if channel in ("user", "userFills"):
|
||||
data = message.get("data")
|
||||
if not data:
|
||||
return
|
||||
|
||||
user_address = data.get("user", "").lower()
|
||||
fills = data.get("fills", [])
|
||||
|
||||
if user_address in recent_fills and fills:
|
||||
logging.info(f"Fill detected for user: {user_address}")
|
||||
for fill_data in fills:
|
||||
decoded_fill = {
|
||||
"time": datetime.fromtimestamp(fill_data['time'] / 1000, tz=timezone.utc),
|
||||
"coin": fill_data['coin'],
|
||||
"side": "BUY" if fill_data['side'] == "B" else "SELL",
|
||||
"price": float(fill_data['px']),
|
||||
"size": float(fill_data['sz']),
|
||||
"value": float(fill_data['px']) * float(fill_data['sz']),
|
||||
}
|
||||
recent_fills[user_address].append(decoded_fill)
|
||||
# --- ADDED: Log every fill to its CSV file ---
|
||||
log_fill_to_csv(user_address, decoded_fill)
|
||||
|
||||
except (KeyError, TypeError, ValueError) as e:
|
||||
logging.error(f"Error processing message: {e} | Data: {message}")
|
||||
|
||||
def build_fills_table(address: str, fills: deque) -> list:
|
||||
"""Builds the formatted lines for a single address's fills table."""
|
||||
lines = []
|
||||
short_address = f"{address[:6]}...{address[-4:]}"
|
||||
|
||||
lines.append(f"--- Fills for {short_address} ---")
|
||||
lines.append(TABLE_HEADER)
|
||||
lines.append("-" * TABLE_WIDTH)
|
||||
|
||||
for fill in list(fills):
|
||||
lines.append(
|
||||
f"{fill['time'].strftime('%H:%M:%S'):<10} | "
|
||||
f"{fill['coin']:<6} | "
|
||||
f"{fill['side']:<5} | "
|
||||
f"{fill['size']:>15.4f} | "
|
||||
f"{fill['price']:>15,.2f} | "
|
||||
f"${fill['value']:>18,.2f}"
|
||||
)
|
||||
|
||||
padding_needed = MAX_FILLS_TO_DISPLAY - len(fills)
|
||||
for _ in range(padding_needed):
|
||||
lines.append("")
|
||||
|
||||
return lines
|
||||
|
||||
def display_dashboard():
|
||||
"""
|
||||
Clears the screen and prints a two-column layout of recent fills tables.
|
||||
"""
|
||||
global _lines_printed
|
||||
|
||||
if _lines_printed > 0:
|
||||
print(f"\x1b[{_lines_printed}A", end="")
|
||||
|
||||
output_lines = ["--- Live Address Fill Monitor ---", ""]
|
||||
|
||||
addresses_to_display = list(recent_fills.keys())
|
||||
num_addresses = len(addresses_to_display)
|
||||
mid_point = (num_addresses + 1) // 2
|
||||
left_column_addresses = addresses_to_display[:mid_point]
|
||||
right_column_addresses = addresses_to_display[mid_point:]
|
||||
|
||||
separator = " | "
|
||||
|
||||
for i in range(mid_point):
|
||||
left_address = left_column_addresses[i]
|
||||
left_table_lines = build_fills_table(left_address, recent_fills[left_address])
|
||||
|
||||
right_table_lines = []
|
||||
if i < len(right_column_addresses):
|
||||
right_address = right_column_addresses[i]
|
||||
right_table_lines = build_fills_table(right_address, recent_fills[right_address])
|
||||
|
||||
table_height = 3 + MAX_FILLS_TO_DISPLAY
|
||||
for j in range(table_height):
|
||||
left_part = left_table_lines[j] if j < len(left_table_lines) else ""
|
||||
right_part = right_table_lines[j] if j < len(right_table_lines) else ""
|
||||
output_lines.append(f"{left_part:<{TABLE_WIDTH}}{separator}{right_part}")
|
||||
output_lines.append("")
|
||||
|
||||
final_output = "\n".join(output_lines) + "\n\x1b[J"
|
||||
print(final_output, end="")
|
||||
|
||||
_lines_printed = len(output_lines)
|
||||
sys.stdout.flush()
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main function to set up the WebSocket and run the display loop.
|
||||
"""
|
||||
global recent_fills
|
||||
parser = argparse.ArgumentParser(description="Monitor live fills for specific wallet addresses on Hyperliquid.")
|
||||
parser.add_argument(
|
||||
"--addresses",
|
||||
nargs='+',
|
||||
default=DEFAULT_ADDRESSES_TO_WATCH,
|
||||
help="A space-separated list of Ethereum addresses to monitor."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--log-level",
|
||||
default="normal",
|
||||
choices=['off', 'normal', 'debug'],
|
||||
help="Set the logging level for the script."
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
setup_logging(args.log_level, 'AddressMonitor')
|
||||
|
||||
# --- ADDED: Ensure the logs directory exists ---
|
||||
if not os.path.exists(LOGS_DIR):
|
||||
os.makedirs(LOGS_DIR)
|
||||
|
||||
addresses_to_watch = []
|
||||
for addr in args.addresses:
|
||||
clean_addr = addr.strip().lower()
|
||||
if len(clean_addr) == 42 and clean_addr.startswith('0x'):
|
||||
addresses_to_watch.append(clean_addr)
|
||||
else:
|
||||
logging.warning(f"Invalid or malformed address provided: '{addr}'. Skipping.")
|
||||
|
||||
recent_fills = {addr: deque(maxlen=MAX_FILLS_TO_DISPLAY) for addr in addresses_to_watch}
|
||||
|
||||
if not addresses_to_watch:
|
||||
print("No valid addresses configured to watch. Exiting.", file=sys.stderr)
|
||||
return
|
||||
|
||||
info = Info(constants.MAINNET_API_URL, skip_ws=False)
|
||||
|
||||
for addr in addresses_to_watch:
|
||||
try:
|
||||
info.subscribe({"type": "userFills", "user": addr}, on_message)
|
||||
logging.debug(f"Queued subscribe for userFills: {addr}")
|
||||
time.sleep(0.02)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to subscribe for {addr}: {e}")
|
||||
|
||||
logging.info(f"Subscribed to userFills for {len(addresses_to_watch)} addresses")
|
||||
|
||||
print("\nDisplaying live fill data... Press Ctrl+C to stop.")
|
||||
try:
|
||||
while True:
|
||||
display_dashboard()
|
||||
time.sleep(0.2)
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping WebSocket listener...")
|
||||
info.ws_manager.stop()
|
||||
print("Listener stopped.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user