Files
hyper/dashboard_data_fetcher.py
opencode 25e9a22a8e Fix DashboardDataFetcher path resolution error
- Use absolute path for status file to ensure consistency across subprocess execution
- Add os.makedirs() call to ensure _logs directory exists
- Prevents 'No such file or directory' error when running as subprocess
- Fixes issue: [Errno 2] No such file or directory: '_logs/trade_executor_status.json.tmp'
2025-11-11 00:38:07 +01:00

144 lines
6.3 KiB
Python

import logging
import os
import sys
import json
import time
import argparse # <-- THE FIX: Added this import
from datetime import datetime
from eth_account import Account
from hyperliquid.info import Info
from hyperliquid.utils import constants
from dotenv import load_dotenv
from logging_utils import setup_logging
# Load .env file
load_dotenv()
class DashboardDataFetcher:
"""
A dedicated, lightweight process that runs in a loop to fetch and save
the account's state (balances, positions) for the main dashboard to display.
"""
def __init__(self, log_level: str):
setup_logging(log_level, 'DashboardDataFetcher')
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.vault_address:
logging.error("MAIN_WALLET_ADDRESS not set in .env file. Cannot proceed.")
sys.exit(1)
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
# Use absolute path to ensure consistency across different working directories
project_root = os.path.dirname(os.path.abspath(__file__))
self.status_file_path = os.path.join(project_root, "_logs", "trade_executor_status.json")
self.managed_positions_path = os.path.join(project_root, "_data", "executor_managed_positions.json")
logging.info(f"Dashboard Data Fetcher initialized for vault: {self.vault_address}")
def load_managed_positions(self) -> dict:
"""Loads the state of which strategy manages which position."""
if os.path.exists(self.managed_positions_path):
try:
with open(self.managed_positions_path, 'r') as f:
data = json.load(f)
# Create a reverse map: {coin: strategy_name}
return {v['coin']: k for k, v in data.items()}
except (IOError, json.JSONDecodeError):
logging.warning("Could not read managed positions file.")
return {}
def fetch_and_save_status(self):
"""Fetches all account data and saves it to JSON status file."""
try:
perpetuals_state = self.info.user_state(self.vault_address)
spot_state = self.info.spot_user_state(self.vault_address)
meta, all_market_contexts = self.info.meta_and_asset_ctxs()
coin_to_strategy_map = self.load_managed_positions()
status = {
"last_updated_utc": datetime.now().isoformat(),
"perpetuals_account": { "balances": {}, "open_positions": [] },
"spot_account": { "positions": [] }
}
# 1. Extract Perpetuals Account Data
margin_summary = perpetuals_state.get("marginSummary", {})
status["perpetuals_account"]["balances"] = {
"account_value": margin_summary.get("accountValue"),
"total_margin_used": margin_summary.get("totalMarginUsed"),
"withdrawable": margin_summary.get("withdrawable")
}
asset_positions = perpetuals_state.get("assetPositions", [])
for asset_pos in asset_positions:
pos = asset_pos.get('position', {})
if float(pos.get('szi', 0)) != 0:
coin = pos.get('coin')
position_value = float(pos.get('positionValue', 0))
margin_used = float(pos.get('marginUsed', 0))
leverage = position_value / margin_used if margin_used > 0 else 0
position_info = {
"coin": coin,
"strategy": coin_to_strategy_map.get(coin, "Unmanaged"),
"size": pos.get('szi'),
"position_value": pos.get('positionValue'),
"entry_price": pos.get('entryPx'),
"mark_price": pos.get('markPx'),
"pnl": pos.get('unrealizedPnl'),
"liq_price": pos.get('liquidationPx'),
"margin": pos.get('marginUsed'),
"funding": pos.get('fundingRate'),
"leverage": f"{leverage:.1f}x"
}
status["perpetuals_account"]["open_positions"].append(position_info)
# 2. Extract Spot Account Data
price_map = { asset.get("universe", {}).get("name"): asset.get("markPx") for asset in all_market_contexts if asset.get("universe", {}).get("name") }
spot_balances = spot_state.get("balances", [])
for bal in spot_balances:
total_balance = float(bal.get('total', 0))
if total_balance > 0:
coin = bal.get('coin')
mark_price = float(price_map.get(coin, 0))
status["spot_account"]["positions"].append({
"coin": coin, "balance_size": total_balance,
"position_value": total_balance * mark_price, "pnl": "N/A"
})
# 3. Ensure directory exists and write to file
# Ensure the _logs directory exists
logs_dir = os.path.dirname(self.status_file_path)
os.makedirs(logs_dir, exist_ok=True)
# Use atomic write to prevent partial reads from main_app
temp_file_path = self.status_file_path + ".tmp"
with open(temp_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
# Rename is atomic
os.replace(temp_file_path, self.status_file_path)
logging.debug(f"Successfully updated dashboard status file.")
except Exception as e:
logging.error(f"Failed to fetch or save account status: {e}")
def run(self):
"""Main loop to periodically fetch and save data."""
while True:
self.fetch_and_save_status()
time.sleep(5) # Update dashboard data every 5 seconds
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run the Dashboard Data Fetcher.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
fetcher = DashboardDataFetcher(log_level=args.log_level)
try:
fetcher.run()
except KeyboardInterrupt:
logging.info("Dashboard Data Fetcher stopped.")