"""Trade executor script.

Polls the exchange for account state, reads the per-strategy signal files
found under ``_data``, decides whether a position should be opened or closed,
records the resulting action through ``log_trade`` and keeps a JSON snapshot
of the account under ``_logs``.

NOTE(review): the actual order placement calls are still placeholders (see the
TODO markers in ``_process_strategy``). Until they are enabled no position is
really opened or closed, so a persistent BUY/SELL signal is re-logged on every
polling cycle.
"""
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from typing import Optional

from dotenv import load_dotenv
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging
from trade_log import log_trade

# Load environment variables from a .env file
load_dotenv()

# Seconds to wait between two polling cycles of the main loop.
POLL_INTERVAL_SECONDS = 15


def _to_float(value, default: float = 0.0) -> float:
    """Return *value* converted to float, or *default* if it cannot be converted.

    The API payloads carry numbers as strings and may contain ``None`` for
    missing fields; a bare ``float(None)`` would raise ``TypeError``.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


class TradeExecutor:
    """
    Monitors strategy signals, decides on trades, records every trade action
    through ``log_trade`` and maintains a live JSON status of the account.
    """

    def __init__(self, log_level: str):
        """Configure logging, credentials, API clients and enabled strategies.

        Args:
            log_level: Level name forwarded to ``setup_logging``.

        Exits the process with status 1 when ``AGENT_PRIVATE_KEY`` or
        ``MAIN_WALLET_ADDRESS`` is not set, or when ``_data/strategies.json``
        cannot be read or parsed.
        """
        setup_logging(log_level, 'TradeExecutor')

        agent_pk = os.environ.get("AGENT_PRIVATE_KEY")
        if not agent_pk:
            logging.error("AGENT_PRIVATE_KEY environment variable not set. Cannot execute trades.")
            sys.exit(1)

        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
        if not self.vault_address:
            logging.error("MAIN_WALLET_ADDRESS environment variable not set. Cannot query account state.")
            sys.exit(1)

        self.account = Account.from_key(agent_pk)
        logging.info(f"Trade Executor initialized. Agent: {self.account.address}, Vault: {self.vault_address}")

        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)

        strategy_config_path = os.path.join("_data", "strategies.json")
        try:
            with open(strategy_config_path, 'r', encoding='utf-8') as f:
                # Only strategies flagged "enabled" are kept.
                self.strategy_configs = {
                    name: config
                    for name, config in json.load(f).items()
                    if config.get("enabled")
                }
            logging.info(f"Loaded {len(self.strategy_configs)} enabled strategies.")
        except (OSError, ValueError, AttributeError) as e:
            # OSError covers a missing/unreadable file, ValueError covers
            # json.JSONDecodeError, AttributeError covers a JSON document that
            # is not a mapping of name -> config dict.
            logging.error(f"Could not load strategies from '{strategy_config_path}': {e}")
            sys.exit(1)

        self.status_file_path = os.path.join("_logs", "trade_executor_status.json")

    @staticmethod
    def _build_mark_price_map(meta: Optional[dict], asset_contexts) -> dict:
        """Return a ``{coin name: mark price}`` mapping.

        Assumes ``meta["universe"]`` and ``asset_contexts`` are index-aligned
        lists, as returned together by ``Info.meta_and_asset_ctxs`` -- TODO
        confirm against the exchange API documentation. Contexts that embed
        their own ``{"universe": {"name": ...}}`` entry are still honoured so
        callers that only pass contexts keep working.
        """
        contexts = asset_contexts or []
        prices = {}

        universe = (meta or {}).get("universe", [])
        for asset_meta, ctx in zip(universe, contexts):
            coin = asset_meta.get("name")
            if coin:
                prices[coin] = ctx.get("markPx")

        for ctx in contexts:
            coin = ctx.get("universe", {}).get("name")
            if coin:
                prices.setdefault(coin, ctx.get("markPx"))

        return prices

    def _save_executor_status(self, perpetuals_state, spot_state, all_market_contexts, meta=None):
        """Saves the current balances and open positions from both accounts to a live status file.

        Args:
            perpetuals_state: Dict returned by ``Info.user_state``.
            spot_state: Dict returned by ``Info.spot_user_state``.
            all_market_contexts: List of per-asset market contexts.
            meta: Optional market metadata whose ``universe`` list names the
                entries of ``all_market_contexts``. Without it, coin names can
                only be resolved from contexts that embed them.

        Write failures are logged and swallowed: the status file is
        informational and must not stop the executor.
        """
        status = {
            # Timezone-aware so the value really is UTC, as the key promises.
            "last_updated_utc": datetime.now(timezone.utc).isoformat(),
            "perpetuals_account": {
                "balances": {},
                "open_positions": []
            },
            "spot_account": {
                "positions": []
            }
        }

        margin_summary = perpetuals_state.get("marginSummary", {})
        status["perpetuals_account"]["balances"] = {
            "account_value": margin_summary.get("accountValue"),
            "total_margin_used": margin_summary.get("totalMarginUsed"),
            # Presumably a top-level field of the user state rather than part
            # of marginSummary (verify against the API docs); the old location
            # is kept as a fallback.
            "withdrawable": perpetuals_state.get(
                "withdrawable", margin_summary.get("withdrawable")
            )
        }

        mark_prices = self._build_mark_price_map(meta, all_market_contexts)

        for asset_pos in perpetuals_state.get("assetPositions", []):
            pos = asset_pos.get('position', {})
            if _to_float(pos.get('szi')) == 0:
                continue

            position_value = _to_float(pos.get('positionValue'))
            margin_used = _to_float(pos.get('marginUsed'))
            # Effective leverage; left at 0 when no margin is reported to
            # avoid dividing by zero.
            leverage = position_value / margin_used if margin_used > 0 else 0

            coin = pos.get('coin')
            mark_price = pos.get('markPx')
            if mark_price is None:
                # Fall back to the market-wide mark price when the position
                # entry itself does not carry one.
                mark_price = mark_prices.get(coin)

            status["perpetuals_account"]["open_positions"].append({
                "coin": coin,
                "size": pos.get('szi'),
                "position_value": pos.get('positionValue'),
                "entry_price": pos.get('entryPx'),
                "mark_price": mark_price,
                "pnl": pos.get('unrealizedPnl'),
                "liq_price": pos.get('liquidationPx'),
                "margin": pos.get('marginUsed'),
                # NOTE(review): confirm the position payload exposes
                # 'fundingRate'; otherwise this is always null.
                "funding": pos.get('fundingRate'),
                "leverage": f"{leverage:.1f}x"
            })

        for bal in spot_state.get("balances", []):
            total_balance = _to_float(bal.get('total'))
            if total_balance <= 0:
                continue

            coin = bal.get('coin')
            # Spot holdings are valued with the mark price of the same-named
            # coin in the price map; coins without a match are valued at 0.
            mark_price = _to_float(mark_prices.get(coin))
            status["spot_account"]["positions"].append({
                "coin": coin,
                "balance_size": total_balance,
                "position_value": total_balance * mark_price,
                "pnl": "N/A"
            })

        # Write to a temporary file and swap it in, so a reader never sees a
        # half-written JSON document.
        tmp_path = f"{self.status_file_path}.tmp"
        try:
            status_dir = os.path.dirname(self.status_file_path)
            if status_dir:
                os.makedirs(status_dir, exist_ok=True)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(status, f, indent=4)
            os.replace(tmp_path, self.status_file_path)
            logging.debug(f"Successfully updated live executor status at '{self.status_file_path}'")
        except OSError as e:
            logging.error(f"Failed to write live executor status file: {e}")

    def _process_strategy(self, name, config, open_positions, mark_prices):
        """Act on the current signal of a single strategy.

        Args:
            name: Strategy name (key in ``strategies.json``).
            config: Strategy configuration dict; ``parameters.coin`` and
                ``parameters.size`` are read from it.
            open_positions: ``{coin: position dict}`` for non-zero positions.
            mark_prices: ``{coin: mark price}`` used when a position entry
                carries no mark price of its own.
        """
        parameters = config.get('parameters') or {}
        coin = parameters.get('coin')
        size = parameters.get('size')
        if not coin:
            logging.error(f"[{name}] 'coin' parameter not defined in strategies.json. Skipping strategy.")
            return

        status_file = os.path.join("_data", f"strategy_status_{name}.json")
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                status = json.load(f)
        except FileNotFoundError:
            # No status file yet for this strategy: nothing to act on.
            return
        except (OSError, ValueError) as e:
            # The file may be unreadable or caught mid-write by its producer;
            # skip this cycle instead of aborting the other strategies.
            logging.warning(f"[{name}] Could not read status file '{status_file}': {e}")
            return

        signal = status.get('current_signal')
        has_position = coin in open_positions

        if signal == "BUY":
            if has_position:
                return
            if not size:
                logging.error(f"[{name}] 'size' parameter not defined in strategies.json. Skipping trade.")
                return

            logging.warning(f"[{name}] SIGNAL: BUY for {coin}. ACTION: Opening new long position of size {size}.")
            # TODO: placeholder for live trading logic, intended call:
            #   self.exchange.market_open(coin, True, size, None, 0.01)
            price = status.get('signal_price', 0)
            log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=price, size=size, signal=signal)

        elif signal == "SELL":
            if not has_position:
                return

            position_details = open_positions[coin]
            position_size = _to_float(position_details.get('szi'))
            # Only close if it's a long position. Short logic would go here.
            if position_size > 0:
                logging.warning(f"[{name}] SIGNAL: SELL for {coin}. ACTION: Closing existing long position.")
                # TODO: placeholder for live trading logic, intended call:
                #   self.exchange.market_close(coin)
                price = _to_float(
                    position_details.get('markPx'),
                    default=_to_float(mark_prices.get(coin)),
                )
                pnl = _to_float(position_details.get('unrealizedPnl'))
                log_trade(strategy=name, coin=coin, action="CLOSE_LONG", price=price,
                          size=position_size, signal=signal, pnl=pnl)

        else:
            logging.info(f"[{name}] SIGNAL: {signal} for {coin}. ACTION: No trade needed (Position: {'Open' if has_position else 'None'}).")

    def _run_cycle(self):
        """Run one polling cycle: fetch state, refresh the status file, process strategies."""
        perpetuals_state = self.info.user_state(self.vault_address)
        spot_state = self.info.spot_user_state(self.vault_address)
        meta, asset_contexts = self.info.meta_and_asset_ctxs()

        # Index the non-zero perpetual positions by coin for quick lookup.
        open_positions = {}
        for asset_pos in perpetuals_state.get('assetPositions', []):
            pos_details = asset_pos.get('position', {})
            if _to_float(pos_details.get('szi')) != 0:
                open_positions[pos_details.get('coin')] = pos_details

        self._save_executor_status(perpetuals_state, spot_state, asset_contexts, meta)
        mark_prices = self._build_mark_price_map(meta, asset_contexts)

        for name, config in self.strategy_configs.items():
            try:
                self._process_strategy(name, config, open_positions, mark_prices)
            except Exception as e:
                # Isolate failures so one broken strategy cannot starve the
                # ones processed after it.
                logging.exception(f"[{name}] Failed to process strategy: {e}")

    def run(self):
        """The main execution loop.

        Runs polling cycles forever, sleeping ``POLL_INTERVAL_SECONDS`` between
        them. Any exception raised by a cycle is logged with its traceback and
        the loop continues.
        """
        logging.info("Starting Trade Executor loop...")
        while True:
            try:
                self._run_cycle()
            except Exception as e:
                logging.exception(f"An error occurred in the main executor loop: {e}")

            time.sleep(POLL_INTERVAL_SECONDS)


def main():
    """Parse command-line arguments and run the executor until interrupted."""
    parser = argparse.ArgumentParser(description="Run the Trade Executor.")
    parser.add_argument(
        "--log-level",
        default="normal",
        choices=['off', 'normal', 'debug'],
        help="Set the logging level for the script."
    )
    args = parser.parse_args()

    executor = TradeExecutor(log_level=args.log_level)
    try:
        executor.run()
    except KeyboardInterrupt:
        logging.info("Trade Executor stopped.")


if __name__ == "__main__":
    main()