368 lines
15 KiB
Python
368 lines
15 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import requests
|
|
import logging
|
|
import argparse
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
|
|
# --- Configuration ---
|
|
# !! IMPORTANT: Update this to your actual Hyperliquid API endpoint !!
|
|
API_ENDPOINT = "https://api.hyperliquid.xyz/info"
|
|
|
|
INPUT_FILE = os.path.join("_data", "wallets_to_track.json")
|
|
OUTPUT_FILE = os.path.join("_data", "wallets_info.json")
|
|
LOGS_DIR = "_logs"
|
|
LOG_FILE = os.path.join(LOGS_DIR, "whale_tracker.log")
|
|
|
|
# Polling intervals (in seconds)
|
|
POLL_INTERVALS = {
|
|
'core_data': 10, # 5-15s range
|
|
'open_orders': 20, # 15-30s range
|
|
'account_metrics': 180, # 1-5m range
|
|
'ledger_updates': 600, # 5-15m range
|
|
'save_data': 5, # How often to write to wallets_info.json
|
|
'reload_wallets': 60 # Check for wallet list changes every 60s
|
|
}
|
|
|
|
class HyperliquidAPI:
|
|
"""
|
|
Client to handle POST requests to the Hyperliquid info endpoint.
|
|
"""
|
|
def __init__(self, base_url):
|
|
self.base_url = base_url
|
|
self.session = requests.Session()
|
|
logging.info(f"API Client initialized for endpoint: {base_url}")
|
|
|
|
def post_request(self, payload):
|
|
"""
|
|
Internal helper to send POST requests and handle errors.
|
|
"""
|
|
try:
|
|
response = self.session.post(self.base_url, json=payload, timeout=10)
|
|
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
logging.error(f"HTTP Error: {e.response.status_code} for {e.request.url}. Response: {e.response.text}")
|
|
except requests.exceptions.ConnectionError as e:
|
|
logging.error(f"Connection Error: {e}")
|
|
except requests.exceptions.Timeout:
|
|
logging.error(f"Request timed out for payload: {payload.get('type')}")
|
|
except json.JSONDecodeError:
|
|
logging.error(f"Failed to decode JSON response. Response text: {response.text if 'response' in locals() else 'No response text'}")
|
|
except Exception as e:
|
|
logging.error(f"An unexpected error occurred in post_request: {e}", exc_info=True)
|
|
return None
|
|
|
|
def get_user_state(self, user_address: str):
|
|
payload = {"type": "clearinghouseState", "user": user_address}
|
|
return self.post_request(payload)
|
|
|
|
def get_open_orders(self, user_address: str):
|
|
payload = {"type": "openOrders", "user": user_address}
|
|
return self.post_request(payload)
|
|
|
|
def get_user_rate_limit(self, user_address: str):
|
|
payload = {"type": "userRateLimit", "user": user_address}
|
|
return self.post_request(payload)
|
|
|
|
def get_user_ledger_updates(self, user_address: str, start_time_ms: int, end_time_ms: int):
|
|
payload = {
|
|
"type": "userNonFundingLedgerUpdates",
|
|
"user": user_address,
|
|
"startTime": start_time_ms,
|
|
"endTime": end_time_ms
|
|
}
|
|
return self.post_request(payload)
|
|
|
|
class WalletTracker:
|
|
"""
|
|
Main class to track wallets, process data, and store results.
|
|
"""
|
|
def __init__(self, api_client, wallets_to_track):
|
|
self.api = api_client
|
|
self.wallets = wallets_to_track # This is the list of dicts
|
|
self.wallets_by_name = {w['name']: w for w in self.wallets}
|
|
self.wallets_data = {
|
|
wallet['name']: {"address": wallet['address']} for wallet in self.wallets
|
|
}
|
|
logging.info(f"WalletTracker initialized for {len(self.wallets)} wallets.")
|
|
|
|
def reload_wallets(self):
|
|
"""
|
|
Checks the INPUT_FILE for changes and updates the tracked wallet list.
|
|
"""
|
|
logging.debug("Reloading wallet list...")
|
|
try:
|
|
with open(INPUT_FILE, 'r') as f:
|
|
new_wallets_list = json.load(f)
|
|
if not isinstance(new_wallets_list, list):
|
|
logging.warning(f"Failed to reload '{INPUT_FILE}': content is not a list.")
|
|
return
|
|
|
|
new_wallets_by_name = {w['name']: w for w in new_wallets_list}
|
|
old_names = set(self.wallets_by_name.keys())
|
|
new_names = set(new_wallets_by_name.keys())
|
|
|
|
added_names = new_names - old_names
|
|
removed_names = old_names - new_names
|
|
|
|
if not added_names and not removed_names:
|
|
logging.debug("Wallet list is unchanged.")
|
|
return # No changes
|
|
|
|
# Update internal wallet list
|
|
self.wallets = new_wallets_list
|
|
self.wallets_by_name = new_wallets_by_name
|
|
|
|
# Add new wallets to wallets_data
|
|
for name in added_names:
|
|
self.wallets_data[name] = {"address": self.wallets_by_name[name]['address']}
|
|
logging.info(f"Added new wallet to track: {name}")
|
|
|
|
# Remove old wallets from wallets_data
|
|
for name in removed_names:
|
|
if name in self.wallets_data:
|
|
del self.wallets_data[name]
|
|
logging.info(f"Removed wallet from tracking: {name}")
|
|
|
|
logging.info(f"Wallet list reloaded. Tracking {len(self.wallets)} wallets.")
|
|
|
|
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
|
|
logging.error(f"Failed to reload and parse '{INPUT_FILE}': {e}")
|
|
except Exception as e:
|
|
logging.error(f"Unexpected error during wallet reload: {e}", exc_info=True)
|
|
|
|
|
|
def calculate_core_metrics(self, state_data: dict) -> dict:
|
|
"""
|
|
Performs calculations based on user_state data.
|
|
"""
|
|
if not state_data or 'crossMarginSummary' not in state_data:
|
|
logging.warning("Core state data is missing 'crossMarginSummary'.")
|
|
return {"raw_state": state_data}
|
|
|
|
summary = state_data['crossMarginSummary']
|
|
account_value = float(summary.get('accountValue', 0))
|
|
margin_used = float(summary.get('totalMarginUsed', 0))
|
|
|
|
# Calculations
|
|
margin_utilization = (margin_used / account_value) if account_value > 0 else 0
|
|
available_margin = account_value - margin_used
|
|
|
|
total_position_value = 0
|
|
if 'assetPositions' in state_data:
|
|
for pos in state_data.get('assetPositions', []):
|
|
try:
|
|
# Use 'value' for position value
|
|
pos_value_str = pos.get('position', {}).get('value', '0')
|
|
total_position_value += float(pos_value_str)
|
|
except (ValueError, TypeError):
|
|
logging.warning(f"Could not parse position value: {pos.get('position', {}).get('value')}")
|
|
continue
|
|
|
|
portfolio_leverage = (total_position_value / account_value) if account_value > 0 else 0
|
|
|
|
# Return calculated metrics alongside raw data
|
|
return {
|
|
"raw_state": state_data,
|
|
"account_value": account_value,
|
|
"margin_used": margin_used,
|
|
"margin_utilization": margin_utilization,
|
|
"available_margin": available_margin,
|
|
"total_position_value": total_position_value,
|
|
"portfolio_leverage": portfolio_leverage
|
|
}
|
|
|
|
def poll_core_data(self):
|
|
logging.debug("Polling Core Data...")
|
|
# Use self.wallets which is updated by reload_wallets
|
|
for wallet in self.wallets:
|
|
name = wallet['name']
|
|
address = wallet['address']
|
|
state_data = self.api.get_user_state(address)
|
|
if state_data:
|
|
calculated_data = self.calculate_core_metrics(state_data)
|
|
# Ensure wallet hasn't been removed by a concurrent reload
|
|
if name in self.wallets_data:
|
|
self.wallets_data[name]['core_state'] = calculated_data
|
|
time.sleep(0.1) # Avoid bursting requests
|
|
|
|
def poll_open_orders(self):
|
|
logging.debug("Polling Open Orders...")
|
|
for wallet in self.wallets:
|
|
name = wallet['name']
|
|
address = wallet['address']
|
|
orders_data = self.api.get_open_orders(address)
|
|
if orders_data:
|
|
# TODO: Add calculations for 'pending_margin_required' if logic is available
|
|
if name in self.wallets_data:
|
|
self.wallets_data[name]['open_orders'] = {"raw_orders": orders_data}
|
|
time.sleep(0.1)
|
|
|
|
def poll_account_metrics(self):
|
|
logging.debug("Polling Account Metrics...")
|
|
for wallet in self.wallets:
|
|
name = wallet['name']
|
|
address = wallet['address']
|
|
metrics_data = self.api.get_user_rate_limit(address)
|
|
if metrics_data:
|
|
if name in self.wallets_data:
|
|
self.wallets_data[name]['account_metrics'] = metrics_data
|
|
time.sleep(0.1)
|
|
|
|
def poll_ledger_updates(self):
|
|
logging.debug("Polling Ledger Updates...")
|
|
end_time_ms = int(datetime.now().timestamp() * 1000)
|
|
start_time_ms = int((datetime.now() - timedelta(minutes=15)).timestamp() * 1000)
|
|
|
|
for wallet in self.wallets:
|
|
name = wallet['name']
|
|
address = wallet['address']
|
|
ledger_data = self.api.get_user_ledger_updates(address, start_time_ms, end_time_ms)
|
|
if ledger_data:
|
|
if name in self.wallets_data:
|
|
self.wallets_data[name]['ledger_updates'] = ledger_data
|
|
time.sleep(0.1)
|
|
|
|
def save_data_to_json(self):
|
|
"""
|
|
Atomically writes the current wallet data to the output JSON file.
|
|
(No longer needs cleaning logic)
|
|
"""
|
|
logging.debug(f"Saving data to {OUTPUT_FILE}...")
|
|
|
|
temp_file = OUTPUT_FILE + ".tmp"
|
|
try:
|
|
# Save the data
|
|
with open(temp_file, 'w', encoding='utf-8') as f:
|
|
# self.wallets_data is automatically kept clean by reload_wallets
|
|
json.dump(self.wallets_data, f, indent=2)
|
|
# Atomic rename (move)
|
|
os.replace(temp_file, OUTPUT_FILE)
|
|
except (IOError, json.JSONDecodeError) as e:
|
|
logging.error(f"Failed to write wallet data to file: {e}")
|
|
except Exception as e:
|
|
logging.error(f"An unexpected error occurred during file save: {e}")
|
|
if os.path.exists(temp_file):
|
|
os.remove(temp_file)
|
|
|
|
class WhaleTrackerRunner:
|
|
"""
|
|
Manages the polling loop using last-run timestamps instead of a complex scheduler.
|
|
"""
|
|
def __init__(self, api_client, wallets, shared_whale_data_dict=None): # Kept arg for compatibility
|
|
self.tracker = WalletTracker(api_client, wallets)
|
|
self.last_poll_times = {key: 0 for key in POLL_INTERVALS}
|
|
self.poll_intervals = POLL_INTERVALS
|
|
logging.info("WhaleTrackerRunner initialized to save to JSON file.")
|
|
|
|
def update_shared_data(self):
|
|
"""
|
|
This function is no longer called by the run loop.
|
|
It's kept here to prevent errors if imported elsewhere, but is now unused.
|
|
"""
|
|
logging.debug("No shared dict, saving data to JSON file.")
|
|
self.tracker.save_data_to_json()
|
|
|
|
|
|
def run(self):
|
|
logging.info("Starting main polling loop...")
|
|
while True:
|
|
try:
|
|
now = time.time()
|
|
|
|
if now - self.last_poll_times['reload_wallets'] > self.poll_intervals['reload_wallets']:
|
|
self.tracker.reload_wallets()
|
|
self.last_poll_times['reload_wallets'] = now
|
|
|
|
if now - self.last_poll_times['core_data'] > self.poll_intervals['core_data']:
|
|
self.tracker.poll_core_data()
|
|
self.last_poll_times['core_data'] = now
|
|
|
|
if now - self.last_poll_times['open_orders'] > self.poll_intervals['open_orders']:
|
|
self.tracker.poll_open_orders()
|
|
self.last_poll_times['open_orders'] = now
|
|
|
|
if now - self.last_poll_times['account_metrics'] > self.poll_intervals['account_metrics']:
|
|
self.tracker.poll_account_metrics()
|
|
self.last_poll_times['account_metrics'] = now
|
|
|
|
if now - self.last_poll_times['ledger_updates'] > self.poll_intervals['ledger_updates']:
|
|
self.tracker.poll_ledger_updates()
|
|
self.last_poll_times['ledger_updates'] = now
|
|
|
|
if now - self.last_poll_times['save_data'] > self.poll_intervals['save_data']:
|
|
self.tracker.save_data_to_json() # <-- NEW
|
|
self.last_poll_times['save_data'] = now
|
|
|
|
# Sleep for a short duration to prevent busy-waiting
|
|
time.sleep(1)
|
|
|
|
except Exception as e:
|
|
logging.critical(f"Unhandled exception in main loop: {e}", exc_info=True)
|
|
time.sleep(10)
|
|
|
|
def setup_logging(log_level_str: str, process_name: str):
|
|
"""Configures logging for the script."""
|
|
if not os.path.exists(LOGS_DIR):
|
|
try:
|
|
os.makedirs(LOGS_DIR)
|
|
except OSError as e:
|
|
print(f"Failed to create logs directory {LOGS_DIR}: {e}")
|
|
return
|
|
|
|
level_map = {
|
|
'debug': logging.DEBUG,
|
|
'normal': logging.INFO,
|
|
'off': logging.NOTSET
|
|
}
|
|
log_level = level_map.get(log_level_str.lower(), logging.INFO)
|
|
|
|
if log_level == logging.NOTSET:
|
|
return
|
|
|
|
handlers_list = [logging.FileHandler(LOG_FILE, mode='a')]
|
|
|
|
if sys.stdout.isatty():
|
|
handlers_list.append(logging.StreamHandler(sys.stdout))
|
|
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format=f"%(asctime)s.%(msecs)03d | {process_name:<20} | %(levelname)-8s | %(message)s",
|
|
datefmt='%Y-%m-%d %H:%M:%S',
|
|
handlers=handlers_list
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Hyperliquid Whale Tracker")
|
|
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
|
|
args = parser.parse_args()
|
|
|
|
setup_logging(args.log_level, "WhaleTracker")
|
|
|
|
# Load wallets to track
|
|
wallets_to_track = []
|
|
try:
|
|
with open(INPUT_FILE, 'r') as f:
|
|
wallets_to_track = json.load(f)
|
|
if not isinstance(wallets_to_track, list) or not wallets_to_track:
|
|
raise ValueError(f"'{INPUT_FILE}' is empty or not a list.")
|
|
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
|
|
logging.critical(f"Failed to load '{INPUT_FILE}': {e}. Exiting.")
|
|
sys.exit(1)
|
|
|
|
# Initialize API client
|
|
api_client = HyperliquidAPI(base_url=API_ENDPOINT)
|
|
|
|
# Initialize and run the tracker
|
|
runner = WhaleTrackerRunner(api_client, wallets_to_track, shared_whale_data_dict=None)
|
|
|
|
try:
|
|
runner.run()
|
|
except KeyboardInterrupt:
|
|
logging.info("Whale Tracker shutting down.")
|
|
sys.exit(0)
|
|
|