# hyper/whale_tracker.py
# (368 lines, 15 KiB, Python)
import json
import os
import time
import requests
import logging
import argparse
import sys
from datetime import datetime, timedelta
# --- Configuration ---
# !! IMPORTANT: Update this to your actual Hyperliquid API endpoint !!
API_ENDPOINT = "https://api.hyperliquid.xyz/info"

# Data files: the wallet list we read, and the snapshot file we write.
INPUT_FILE = os.path.join("_data", "wallets_to_track.json")
OUTPUT_FILE = os.path.join("_data", "wallets_info.json")

# Log destination.
LOGS_DIR = "_logs"
LOG_FILE = os.path.join(LOGS_DIR, "whale_tracker.log")

# Polling intervals (in seconds) for each periodic task in the run loop.
POLL_INTERVALS = {
    'core_data': 10,         # 5-15s range
    'open_orders': 20,       # 15-30s range
    'account_metrics': 180,  # 1-5m range
    'ledger_updates': 600,   # 5-15m range
    'save_data': 5,          # How often to write to wallets_info.json
    'reload_wallets': 60,    # Check for wallet list changes every 60s
}
class HyperliquidAPI:
    """
    Client to handle POST requests to the Hyperliquid info endpoint.

    All requests share a single `requests.Session` for connection reuse.
    Every public method returns the decoded JSON response, or None on
    any failure (failures are logged, never raised to the caller).
    """

    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        logging.info(f"API Client initialized for endpoint: {base_url}")

    def post_request(self, payload):
        """
        Internal helper to send POST requests and handle errors.

        :param payload: dict serialized as the JSON request body.
        :return: decoded JSON response, or None on any error.
        """
        # FIX: pre-bind `response` so the JSONDecodeError handler below can
        # reference it directly instead of probing `locals()` as the
        # original did.
        response = None
        try:
            response = self.session.post(self.base_url, json=payload, timeout=10)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            return response.json()
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP Error: {e.response.status_code} for {e.request.url}. Response: {e.response.text}")
        except requests.exceptions.ConnectionError as e:
            logging.error(f"Connection Error: {e}")
        except requests.exceptions.Timeout:
            logging.error(f"Request timed out for payload: {payload.get('type')}")
        except json.JSONDecodeError:
            text = response.text if response is not None else 'No response text'
            logging.error(f"Failed to decode JSON response. Response text: {text}")
        except Exception as e:
            logging.error(f"An unexpected error occurred in post_request: {e}", exc_info=True)
        return None

    def get_user_state(self, user_address: str):
        """Fetch the clearinghouse (margin/position) state for a user."""
        payload = {"type": "clearinghouseState", "user": user_address}
        return self.post_request(payload)

    def get_open_orders(self, user_address: str):
        """Fetch the user's currently open orders."""
        payload = {"type": "openOrders", "user": user_address}
        return self.post_request(payload)

    def get_user_rate_limit(self, user_address: str):
        """Fetch the user's API rate-limit usage metrics."""
        payload = {"type": "userRateLimit", "user": user_address}
        return self.post_request(payload)

    def get_user_ledger_updates(self, user_address: str, start_time_ms: int, end_time_ms: int):
        """Fetch non-funding ledger updates in [start_time_ms, end_time_ms] (epoch ms)."""
        payload = {
            "type": "userNonFundingLedgerUpdates",
            "user": user_address,
            "startTime": start_time_ms,
            "endTime": end_time_ms
        }
        return self.post_request(payload)
class WalletTracker:
    """
    Main class to track wallets, process data, and store results.

    Keeps one entry per wallet name in `self.wallets_data`, refreshed by
    the various poll_* methods and persisted by save_data_to_json().
    """

    def __init__(self, api_client, wallets_to_track):
        """
        :param api_client: HyperliquidAPI-like object exposing the get_* methods.
        :param wallets_to_track: list of dicts, each with 'name' and 'address' keys.
        """
        self.api = api_client
        self.wallets = wallets_to_track  # This is the list of dicts
        self.wallets_by_name = {w['name']: w for w in self.wallets}
        # Accumulated per-wallet results, keyed by wallet name.
        self.wallets_data = {
            wallet['name']: {"address": wallet['address']} for wallet in self.wallets
        }
        logging.info(f"WalletTracker initialized for {len(self.wallets)} wallets.")

    def reload_wallets(self):
        """
        Checks the INPUT_FILE for changes and updates the tracked wallet list.

        New wallets get a fresh entry in wallets_data; removed wallets are
        dropped, which keeps the saved JSON clean automatically.
        """
        logging.debug("Reloading wallet list...")
        try:
            with open(INPUT_FILE, 'r') as f:
                new_wallets_list = json.load(f)
            if not isinstance(new_wallets_list, list):
                logging.warning(f"Failed to reload '{INPUT_FILE}': content is not a list.")
                return
            new_wallets_by_name = {w['name']: w for w in new_wallets_list}
            old_names = set(self.wallets_by_name.keys())
            new_names = set(new_wallets_by_name.keys())
            added_names = new_names - old_names
            removed_names = old_names - new_names
            if not added_names and not removed_names:
                logging.debug("Wallet list is unchanged.")
                return  # No changes
            # Update internal wallet list
            self.wallets = new_wallets_list
            self.wallets_by_name = new_wallets_by_name
            # Add new wallets to wallets_data
            for name in added_names:
                self.wallets_data[name] = {"address": self.wallets_by_name[name]['address']}
                logging.info(f"Added new wallet to track: {name}")
            # Remove old wallets from wallets_data
            for name in removed_names:
                if name in self.wallets_data:
                    del self.wallets_data[name]
                    logging.info(f"Removed wallet from tracking: {name}")
            logging.info(f"Wallet list reloaded. Tracking {len(self.wallets)} wallets.")
        # FIX: KeyError added — a wallet entry missing 'name'/'address' previously
        # fell through to the broad handler below.
        except (FileNotFoundError, json.JSONDecodeError, ValueError, KeyError) as e:
            logging.error(f"Failed to reload and parse '{INPUT_FILE}': {e}")
        except Exception as e:
            logging.error(f"Unexpected error during wallet reload: {e}", exc_info=True)

    def calculate_core_metrics(self, state_data: dict) -> dict:
        """
        Performs calculations based on user_state (clearinghouseState) data.

        :param state_data: raw clearinghouseState response dict.
        :return: dict with the raw state plus derived margin/leverage metrics;
                 only {"raw_state": ...} when the margin summary is missing.
        """
        if not state_data or 'crossMarginSummary' not in state_data:
            logging.warning("Core state data is missing 'crossMarginSummary'.")
            return {"raw_state": state_data}
        summary = state_data['crossMarginSummary']
        account_value = float(summary.get('accountValue', 0))
        margin_used = float(summary.get('totalMarginUsed', 0))
        # Calculations (guard against division by a zero/negative account value)
        margin_utilization = (margin_used / account_value) if account_value > 0 else 0
        available_margin = account_value - margin_used
        total_position_value = 0
        for pos in state_data.get('assetPositions', []):
            position = pos.get('position', {})
            # FIX: Hyperliquid reports a position's notional as 'positionValue';
            # the original read 'value', which made this sum silently 0.
            # 'value' is kept as a fallback for older/alternate payloads.
            pos_value_str = position.get('positionValue', position.get('value', '0'))
            try:
                total_position_value += float(pos_value_str)
            except (ValueError, TypeError):
                logging.warning(f"Could not parse position value: {pos_value_str}")
                continue
        portfolio_leverage = (total_position_value / account_value) if account_value > 0 else 0
        # Return calculated metrics alongside raw data
        return {
            "raw_state": state_data,
            "account_value": account_value,
            "margin_used": margin_used,
            "margin_utilization": margin_utilization,
            "available_margin": available_margin,
            "total_position_value": total_position_value,
            "portfolio_leverage": portfolio_leverage
        }

    def poll_core_data(self):
        """Fetch clearinghouse state for every wallet and store derived metrics."""
        logging.debug("Polling Core Data...")
        # Use self.wallets which is updated by reload_wallets
        for wallet in self.wallets:
            name = wallet['name']
            address = wallet['address']
            state_data = self.api.get_user_state(address)
            if state_data:
                calculated_data = self.calculate_core_metrics(state_data)
                # Ensure wallet hasn't been removed by a concurrent reload
                if name in self.wallets_data:
                    self.wallets_data[name]['core_state'] = calculated_data
            time.sleep(0.1)  # Avoid bursting requests

    def poll_open_orders(self):
        """Fetch open orders for every wallet."""
        logging.debug("Polling Open Orders...")
        for wallet in self.wallets:
            name = wallet['name']
            address = wallet['address']
            orders_data = self.api.get_open_orders(address)
            if orders_data:
                # TODO: Add calculations for 'pending_margin_required' if logic is available
                if name in self.wallets_data:
                    self.wallets_data[name]['open_orders'] = {"raw_orders": orders_data}
            time.sleep(0.1)

    def poll_account_metrics(self):
        """Fetch API rate-limit / account metrics for every wallet."""
        logging.debug("Polling Account Metrics...")
        for wallet in self.wallets:
            name = wallet['name']
            address = wallet['address']
            metrics_data = self.api.get_user_rate_limit(address)
            if metrics_data:
                if name in self.wallets_data:
                    self.wallets_data[name]['account_metrics'] = metrics_data
            time.sleep(0.1)

    def poll_ledger_updates(self):
        """Fetch non-funding ledger updates for the last 15 minutes, per wallet."""
        logging.debug("Polling Ledger Updates...")
        now = datetime.now()
        end_time_ms = int(now.timestamp() * 1000)
        start_time_ms = int((now - timedelta(minutes=15)).timestamp() * 1000)
        for wallet in self.wallets:
            name = wallet['name']
            address = wallet['address']
            ledger_data = self.api.get_user_ledger_updates(address, start_time_ms, end_time_ms)
            if ledger_data:
                if name in self.wallets_data:
                    self.wallets_data[name]['ledger_updates'] = ledger_data
            time.sleep(0.1)

    @staticmethod
    def _remove_temp_file(temp_file):
        """Best-effort removal of a leftover temp file after a failed save."""
        try:
            os.remove(temp_file)
        except OSError:
            pass

    def save_data_to_json(self):
        """
        Atomically writes the current wallet data to the output JSON file.

        Writes to a temp file then os.replace()s it into place, so readers
        never observe a half-written file. The temp file is cleaned up on
        ANY failure (the original leaked it on I/O errors).
        """
        logging.debug(f"Saving data to {OUTPUT_FILE}...")
        temp_file = OUTPUT_FILE + ".tmp"
        try:
            # self.wallets_data is automatically kept clean by reload_wallets
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(self.wallets_data, f, indent=2)
            # Atomic rename (move)
            os.replace(temp_file, OUTPUT_FILE)
        # FIX: json.dump raises TypeError/ValueError on unserializable data
        # (never JSONDecodeError, which the original caught); OSError covers
        # I/O failures (IOError is its alias).
        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Failed to write wallet data to file: {e}")
            self._remove_temp_file(temp_file)
        except Exception as e:
            logging.error(f"An unexpected error occurred during file save: {e}")
            self._remove_temp_file(temp_file)
class WhaleTrackerRunner:
    """
    Manages the polling loop using last-run timestamps instead of a complex scheduler.

    Each task fires when more than its POLL_INTERVALS entry (seconds) has
    elapsed since it last ran; the loop ticks roughly once per second.
    """

    def __init__(self, api_client, wallets, shared_whale_data_dict=None):  # Kept arg for compatibility
        self.tracker = WalletTracker(api_client, wallets)
        self.last_poll_times = {key: 0 for key in POLL_INTERVALS}
        self.poll_intervals = POLL_INTERVALS
        logging.info("WhaleTrackerRunner initialized to save to JSON file.")

    def update_shared_data(self):
        """
        This function is no longer called by the run loop.
        It's kept here to prevent errors if imported elsewhere, but is now unused.
        """
        logging.debug("No shared dict, saving data to JSON file.")
        self.tracker.save_data_to_json()

    def run(self):
        """Run the polling loop forever, dispatching whichever tasks are due."""
        logging.info("Starting main polling loop...")
        # Dispatch table replaces the original chain of near-identical if
        # blocks; order matches the original (wallet reload happens first).
        scheduled_tasks = (
            ('reload_wallets', self.tracker.reload_wallets),
            ('core_data', self.tracker.poll_core_data),
            ('open_orders', self.tracker.poll_open_orders),
            ('account_metrics', self.tracker.poll_account_metrics),
            ('ledger_updates', self.tracker.poll_ledger_updates),
            ('save_data', self.tracker.save_data_to_json),
        )
        while True:
            try:
                now = time.time()
                for task_key, task_fn in scheduled_tasks:
                    if now - self.last_poll_times[task_key] > self.poll_intervals[task_key]:
                        task_fn()
                        self.last_poll_times[task_key] = now
                # Sleep for a short duration to prevent busy-waiting
                time.sleep(1)
            except Exception as e:
                logging.critical(f"Unhandled exception in main loop: {e}", exc_info=True)
                time.sleep(10)
def setup_logging(log_level_str: str, process_name: str):
    """
    Configures logging for the script.

    Logs always go to LOG_FILE; they are mirrored to stdout only when the
    process is attached to a TTY (interactive run).

    :param log_level_str: 'off', 'normal', or 'debug' (unknown values -> INFO).
    :param process_name: tag embedded in every log line.
    """
    try:
        # FIX: exist_ok avoids the check-then-create race of the original
        # os.path.exists() + os.makedirs() pair.
        os.makedirs(LOGS_DIR, exist_ok=True)
    except OSError as e:
        print(f"Failed to create logs directory {LOGS_DIR}: {e}")
        return
    level_map = {
        'debug': logging.DEBUG,
        'normal': logging.INFO,
        'off': logging.NOTSET
    }
    log_level = level_map.get(log_level_str.lower(), logging.INFO)
    if log_level == logging.NOTSET:
        # 'off': leave logging completely unconfigured.
        return
    handlers_list = [logging.FileHandler(LOG_FILE, mode='a')]
    if sys.stdout.isatty():
        handlers_list.append(logging.StreamHandler(sys.stdout))
    logging.basicConfig(
        level=log_level,
        format=f"%(asctime)s.%(msecs)03d | {process_name:<20} | %(levelname)-8s | %(message)s",
        datefmt='%Y-%m-%d %H:%M:%S',
        handlers=handlers_list
    )
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Hyperliquid Whale Tracker")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
setup_logging(args.log_level, "WhaleTracker")
# Load wallets to track
wallets_to_track = []
try:
with open(INPUT_FILE, 'r') as f:
wallets_to_track = json.load(f)
if not isinstance(wallets_to_track, list) or not wallets_to_track:
raise ValueError(f"'{INPUT_FILE}' is empty or not a list.")
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
logging.critical(f"Failed to load '{INPUT_FILE}': {e}. Exiting.")
sys.exit(1)
# Initialize API client
api_client = HyperliquidAPI(base_url=API_ENDPOINT)
# Initialize and run the tracker
runner = WhaleTrackerRunner(api_client, wallets_to_track, shared_whale_data_dict=None)
try:
runner.run()
except KeyboardInterrupt:
logging.info("Whale Tracker shutting down.")
sys.exit(0)