import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime

from dotenv import load_dotenv
from eth_account import Account
from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging

# Load .env file so MAIN_WALLET_ADDRESS (and friends) are available via os.environ.
load_dotenv()


class DashboardDataFetcher:
    """
    A dedicated, lightweight process that runs in a loop to fetch and save the
    account's state (balances, positions) for the main dashboard to display.
    """

    def __init__(self, log_level: str):
        """
        Configure logging, resolve the target vault address from the
        environment, and build the file paths used for status output.

        Args:
            log_level: Logging verbosity ('off', 'normal', or 'debug').

        Exits the process with status 1 if MAIN_WALLET_ADDRESS is not set —
        without it there is no account to query.
        """
        setup_logging(log_level, 'DashboardDataFetcher')
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
        if not self.vault_address:
            logging.error("MAIN_WALLET_ADDRESS not set in .env file. Cannot proceed.")
            sys.exit(1)

        # Read-only API client; no websocket needed for periodic polling.
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)

        # Use absolute paths so behavior is independent of the working directory.
        project_root = os.path.dirname(os.path.abspath(__file__))
        self.status_file_path = os.path.join(project_root, "_logs", "trade_executor_status.json")
        self.managed_positions_path = os.path.join(project_root, "_data", "executor_managed_positions.json")

        logging.info(f"Dashboard Data Fetcher initialized for vault: {self.vault_address}")

    def load_managed_positions(self) -> dict:
        """
        Load the state of which strategy manages which position.

        Returns:
            A reverse map of {coin: strategy_name}. Empty dict if the file is
            missing or unreadable (best-effort: positions then show as
            "Unmanaged" rather than crashing the fetcher).
        """
        if os.path.exists(self.managed_positions_path):
            try:
                with open(self.managed_positions_path, 'r') as f:
                    data = json.load(f)
                # File maps {strategy_name: {"coin": ...}}; invert to {coin: strategy_name}.
                return {v['coin']: k for k, v in data.items()}
            except (IOError, json.JSONDecodeError):
                logging.warning("Could not read managed positions file.")
        return {}

    def fetch_and_save_status(self):
        """
        Fetch perpetuals + spot account state from the API and atomically
        write a JSON snapshot to the status file for the dashboard to read.

        Any failure is logged and swallowed so the polling loop keeps running.
        """
        try:
            perpetuals_state = self.info.user_state(self.vault_address)
            spot_state = self.info.spot_user_state(self.vault_address)
            # meta is unused here; only the per-asset market contexts are needed.
            _meta, all_market_contexts = self.info.meta_and_asset_ctxs()
            coin_to_strategy_map = self.load_managed_positions()

            status = {
                "last_updated_utc": datetime.now().isoformat(),
                "perpetuals_account": {
                    "balances": {},
                    "open_positions": []
                },
                "spot_account": {
                    "positions": []
                }
            }

            # 1. Extract Perpetuals Account Data
            margin_summary = perpetuals_state.get("marginSummary", {})
            status["perpetuals_account"]["balances"] = {
                "account_value": margin_summary.get("accountValue"),
                "total_margin_used": margin_summary.get("totalMarginUsed"),
                "withdrawable": margin_summary.get("withdrawable")
            }

            asset_positions = perpetuals_state.get("assetPositions", [])
            for asset_pos in asset_positions:
                pos = asset_pos.get('position', {})
                # Skip flat positions (size of zero).
                if float(pos.get('szi', 0)) != 0:
                    coin = pos.get('coin')
                    position_value = float(pos.get('positionValue', 0))
                    margin_used = float(pos.get('marginUsed', 0))
                    # Effective leverage = notional / margin; guard divide-by-zero.
                    leverage = position_value / margin_used if margin_used > 0 else 0
                    position_info = {
                        "coin": coin,
                        "strategy": coin_to_strategy_map.get(coin, "Unmanaged"),
                        "size": pos.get('szi'),
                        "position_value": pos.get('positionValue'),
                        "entry_price": pos.get('entryPx'),
                        "mark_price": pos.get('markPx'),
                        "pnl": pos.get('unrealizedPnl'),
                        "liq_price": pos.get('liquidationPx'),
                        "margin": pos.get('marginUsed'),
                        "funding": pos.get('fundingRate'),
                        "leverage": f"{leverage:.1f}x"
                    }
                    status["perpetuals_account"]["open_positions"].append(position_info)

            # 2. Extract Spot Account Data
            # NOTE(review): assumes each market context carries
            # {"universe": {"name": ...}, "markPx": ...} — confirm against the
            # hyperliquid SDK's meta_and_asset_ctxs() response shape.
            price_map = {
                asset.get("universe", {}).get("name"): asset.get("markPx")
                for asset in all_market_contexts
                if asset.get("universe", {}).get("name")
            }

            spot_balances = spot_state.get("balances", [])
            for bal in spot_balances:
                total_balance = float(bal.get('total', 0))
                if total_balance > 0:
                    coin = bal.get('coin')
                    # `or 0` guards against a present-but-None mark price,
                    # which float() would otherwise reject with a TypeError.
                    mark_price = float(price_map.get(coin) or 0)
                    status["spot_account"]["positions"].append({
                        "coin": coin,
                        "balance_size": total_balance,
                        "position_value": total_balance * mark_price,
                        "pnl": "N/A"
                    })

            # 3. Ensure directory exists and write to file
            logs_dir = os.path.dirname(self.status_file_path)
            os.makedirs(logs_dir, exist_ok=True)

            # Atomic write (tmp file + rename) so the dashboard never reads a
            # partially written JSON file.
            temp_file_path = self.status_file_path + ".tmp"
            with open(temp_file_path, 'w', encoding='utf-8') as f:
                json.dump(status, f, indent=4)
            os.replace(temp_file_path, self.status_file_path)

            logging.debug("Successfully updated dashboard status file.")

        except Exception as e:
            # Boundary catch: one failed poll must not kill the loop.
            logging.error(f"Failed to fetch or save account status: {e}")

    def run(self):
        """Main loop to periodically fetch and save data."""
        while True:
            self.fetch_and_save_status()
            time.sleep(5)  # Update dashboard data every 5 seconds


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run the Dashboard Data Fetcher.")
    parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    args = parser.parse_args()

    fetcher = DashboardDataFetcher(log_level=args.log_level)
    try:
        fetcher.run()
    except KeyboardInterrupt:
        logging.info("Dashboard Data Fetcher stopped.")