import os
import time
import logging
import sys
import math
import json
from typing import Optional

from dotenv import load_dotenv

# --- FIX: Add project root to sys.path to import local modules ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)

# Now we can import from root (these imports must stay below the sys.path tweak)
from logging_utils import setup_logging
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants

# Load environment variables from .env in current directory
dotenv_path = os.path.join(current_dir, '.env')
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path)
else:
    # Fallback to default search
    load_dotenv()

# Setup Logging using project convention
setup_logging("normal", "CLP_HEDGER")

# --- CONFIGURATION DEFAULTS ---
REBALANCE_THRESHOLD = 0.15  # Minimum |target - actual| short size (in coin units) before trading
CHECK_INTERVAL = 30  # Seconds between monitor-loop iterations
LEVERAGE = 5
STATUS_FILE = "hedge_status.json"  # Resolved relative to the current working directory

# Gap Recovery Configuration
PRICE_BUFFER_PCT = 0.004  # 0.4% buffer around the entry price to prevent churn
TIME_BUFFER_SECONDS = 120  # 2 minutes wait between mode switches


def get_manual_position_config() -> Optional[dict]:
    """Return the first OPEN MANUAL position entry from the status file.

    Reads ``STATUS_FILE`` as JSON and iterates its entries, returning the
    first one whose ``type`` is ``'MANUAL'`` and ``status`` is ``'OPEN'``.

    Returns:
        The matching entry, or ``None`` when the file does not exist, cannot
        be read/parsed (the error is logged), or has no matching entry.
    """
    if not os.path.exists(STATUS_FILE):
        return None

    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)
        for entry in data:
            if entry.get('type') == 'MANUAL' and entry.get('status') == 'OPEN':
                return entry
    except Exception as e:
        # Best effort: a broken status file must not crash the hedger.
        logging.error(f"ERROR reading status file: {e}")

    return None


class HyperliquidStrategy:
    """Computes the target short size for hedging a concentrated-liquidity position.

    The strategy has two modes:

    * ``"NORMAL"``   - target short = pool ETH exposure + ``static_long``.
    * ``"RECOVERY"`` - target short = 0 while price sits between the buffered
      entry price and ``recovery_target``.

    Mode switches are rate-limited by ``TIME_BUFFER_SECONDS``.
    """

    def __init__(self, entry_weth, entry_price, low_range, high_range, start_price, static_long=0.4):
        """Store the pool configuration and derive gap/liquidity values.

        Args:
            entry_weth: ETH amount held by the pool position at entry.
            entry_price: Price at which the pool position was entered.
            low_range: Lower price bound of the pool range.
            high_range: Upper price bound of the pool range.
            start_price: Market price when the strategy was started.
            static_long: Constant amount added to the pool delta to form the
                full-hedge short target.

        Note:
            If the liquidity calculation fails (e.g. ``entry_price`` equals
            ``high_range`` or a price is negative) the error is logged and the
            process exits via ``sys.exit(1)``.
        """
        # Your Pool Configuration
        self.entry_weth = entry_weth
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.static_long = static_long

        # Gap Recovery State
        self.start_price = start_price
        # GAP = max(0, ENTRY - START). If Start > Entry (we are winning), Gap is 0.
        self.gap = max(0.0, entry_price - start_price)
        self.recovery_target = entry_price + (2 * self.gap)

        self.current_mode = "NORMAL"  # "NORMAL" (100% Hedge) or "RECOVERY" (0% Hedge)
        # 0 means "never switched", so the first switch is not delayed by the time buffer.
        self.last_switch_time = 0

        logging.info(f"Strategy Init. Start Px: {start_price:.2f} | Gap: {self.gap:.2f} | Recovery Tgt: {self.recovery_target:.2f}")

        # Calculate Constant Liquidity (L) once
        # Formula: L = x / (1/sqrt(P) - 1/sqrt(Pb))
        try:
            sqrt_P = math.sqrt(entry_price)
            sqrt_Pb = math.sqrt(high_range)
            self.L = entry_weth / ((1 / sqrt_P) - (1 / sqrt_Pb))
            logging.info(f"Liquidity (L): {self.L:.4f}")
        except Exception as e:
            logging.error(f"Error calculating liquidity: {e}")
            sys.exit(1)

    def get_pool_delta(self, current_price):
        """Calculates how much ETH the pool currently holds (The Risk).

        Args:
            current_price: Current market price.

        Returns:
            ``0.0`` at or above ``high_range``; otherwise
            ``L * (1/sqrt(p) - 1/sqrt(high_range))`` where ``p`` is
            ``current_price`` clamped from below at ``low_range``.
        """
        # If price is above range, you hold 0 ETH (100% USDC)
        if current_price >= self.high_range:
            return 0.0

        sqrt_Pb = math.sqrt(self.high_range)

        # If price is below range, you hold Max ETH
        if current_price <= self.low_range:
            sqrt_Pa = math.sqrt(self.low_range)
            return self.L * ((1 / sqrt_Pa) - (1 / sqrt_Pb))

        # If in range, calculate active ETH
        sqrt_P = math.sqrt(current_price)
        return self.L * ((1 / sqrt_P) - (1 / sqrt_Pb))

    def calculate_rebalance(self, current_price, current_short_position_size):
        """Determine whether a trade is needed and the exact order size.

        May update ``current_mode`` and ``last_switch_time`` as a side effect.

        Args:
            current_price: Current market price.
            current_short_position_size: Current position size; only its
                absolute value is used.

        Returns:
            A dict with keys ``current_price``, ``pool_delta``,
            ``target_short``, ``raw_target``, ``current_short``, ``diff``
            (target minus current short), ``action`` (``"SELL"`` when
            ``diff > 0`` else ``"BUY"``) and ``mode``.
        """
        # 1. Base Target (Full Hedge)
        pool_delta = self.get_pool_delta(current_price)
        raw_target_short = pool_delta + self.static_long

        # 2. Determine Mode (Normal vs Recovery)
        # Buffers
        entry_upper = self.entry_price * (1 + PRICE_BUFFER_PCT)
        entry_lower = self.entry_price * (1 - PRICE_BUFFER_PCT)

        desired_mode = self.current_mode  # Default to staying same

        if self.current_mode == "NORMAL":
            # Switch to RECOVERY if:
            # Price > Entry + Buffer AND Price < Recovery Target
            if entry_upper < current_price < self.recovery_target:
                desired_mode = "RECOVERY"
        elif self.current_mode == "RECOVERY":
            # Switch back to NORMAL if:
            # Price < Entry - Buffer (Fell back down) OR Price > Recovery Target (Finished)
            if current_price < entry_lower or current_price >= self.recovery_target:
                desired_mode = "NORMAL"

        # 3. Apply Time Buffer
        now = time.time()
        if desired_mode != self.current_mode:
            if (now - self.last_switch_time) >= TIME_BUFFER_SECONDS:
                logging.info(f"🔄 MODE SWITCH: {self.current_mode} -> {desired_mode} (Px: {current_price:.2f})")
                self.current_mode = desired_mode
                self.last_switch_time = now
            else:
                logging.info(f"⏳ Mode Switch Delayed (Time Buffer). Pending: {desired_mode}")

        # 4. Set Final Target based on Mode
        if self.current_mode == "RECOVERY":
            target_short_size = 0.0
            logging.info(f"🩹 RECOVERY MODE ACTIVE (0% Hedge). Target: {self.recovery_target:.2f}")
        else:
            target_short_size = raw_target_short

        # 5. Calculate Difference
        # NOTE(review): abs() means a long (positive) position is counted as
        # if it were a short of the same size - confirm this is intended.
        current_short = abs(current_short_position_size)
        diff = target_short_size - current_short

        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": target_short_size,
            "raw_target": raw_target_short,
            "current_short": current_short,
            "diff": diff,
            # Positive = SELL more (Add Short), Negative = BUY (Reduce Short)
            "action": "SELL" if diff > 0 else "BUY",
            "mode": self.current_mode,
        }


def round_to_sz_decimals(amount: float, sz_decimals: int = 4) -> float:
    """Round the absolute value of ``amount`` to ``sz_decimals`` decimal places.

    Hyperliquid requires specific rounding 'szDecimals'.
    For ETH, this is usually 4 (e.g., 1.2345).

    Uses the built-in ``round()`` (not floor), so the result may be slightly
    larger than ``abs(amount)``.
    """
    return round(abs(amount), sz_decimals)


def round_to_sig_figs(x: float, sig_figs: int = 5) -> float:
    """Round ``x`` to ``sig_figs`` significant figures.

    Hyperliquid prices generally require 5 significant figures.
    Example: ``round_to_sig_figs(3123.456, 5)`` gives ``3123.5``.

    Returns:
        ``0.0`` when ``x`` is zero (log10 is undefined there), otherwise the
        rounded value.
    """
    if x == 0:
        return 0.0
    magnitude = int(math.floor(math.log10(abs(x))))
    return round(x, sig_figs - magnitude - 1)


class CLPHedger:
    """Monitor loop that keeps a Hyperliquid short in line with the strategy target."""

    def __init__(self):
        """Load credentials, connect to the API and prepare the strategy.

        Reads ``HEDGER_PRIVATE_KEY`` (falling back to ``AGENT_PRIVATE_KEY``)
        and ``MAIN_WALLET_ADDRESS`` from the environment. Exits the process
        with status 1 when no private key is configured.
        """
        self.private_key = os.environ.get("HEDGER_PRIVATE_KEY") or os.environ.get("AGENT_PRIVATE_KEY")
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")

        if not self.private_key:
            logging.error("No private key found (HEDGER_PRIVATE_KEY or AGENT_PRIVATE_KEY) in .env")
            sys.exit(1)

        if not self.vault_address:
            logging.warning("MAIN_WALLET_ADDRESS not found in .env. Assuming Agent is the Vault (not strictly recommended for CLPs).")

        self.account = Account.from_key(self.private_key)

        # API Connection
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)

        # Note: If this agent is trading on behalf of a Vault (Main Account),
        # the exchange object needs the vault's address as `account_address`.
        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)

        # Load Manual Config from JSON
        self.manual_config = get_manual_position_config()
        self.coin_symbol = "ETH"  # Default, but will try to read from JSON
        self.sz_decimals = 4
        self.strategy = None

        if self.manual_config:
            self.coin_symbol = self.manual_config.get('coin_symbol', 'ETH')

            if self.manual_config.get('hedge_enabled', False):
                self._init_strategy()
            else:
                logging.warning("MANUAL position found but 'hedge_enabled' is FALSE. Hedger will remain idle.")
        else:
            logging.warning("No MANUAL position found in hedge_status.json. Hedger will remain idle.")

        # Set Leverage on Initialization (if coin symbol known)
        try:
            logging.info(f"Setting leverage to {LEVERAGE}x (Cross) for {self.coin_symbol}...")
            self.exchange.update_leverage(LEVERAGE, self.coin_symbol, is_cross=True)
        except Exception as e:
            # Non-fatal: keep running with whatever leverage is already set.
            logging.error(f"Failed to update leverage: {e}")

        # Fetch meta once to get szDecimals
        self.sz_decimals = self._get_sz_decimals(self.coin_symbol)

        logging.info(f"CLP Hedger initialized. Agent: {self.account.address}. Coin: {self.coin_symbol} (Decimals: {self.sz_decimals})")

    def _init_strategy(self):
        """Build ``self.strategy`` from ``self.manual_config``.

        Leaves ``self.strategy`` untouched when no market price is available
        yet (the run loop retries), and sets it to ``None`` when the config
        is missing required keys or construction fails.
        """
        try:
            entry_p = self.manual_config['entry_price']
            lower = self.manual_config['range_lower']
            upper = self.manual_config['range_upper']
            static_long = self.manual_config.get('static_long', 0.0)
            # Require entry_amount0 (or entry_weth)
            entry_weth = self.manual_config.get('entry_amount0', 0.45)  # Default to 0.45 if missing for now

            start_price = self.get_market_price(self.coin_symbol)
            if start_price is None:
                logging.warning("Waiting for initial price to start strategy...")
                # Logic will retry in run loop
                return

            self.strategy = HyperliquidStrategy(
                entry_weth=entry_weth,
                entry_price=entry_p,
                low_range=lower,
                high_range=upper,
                start_price=start_price,
                static_long=static_long,
            )
            logging.info(f"Strategy Initialized for {self.coin_symbol}.")

        except Exception as e:
            logging.error(f"Failed to init strategy: {e}")
            self.strategy = None

    def _get_sz_decimals(self, coin):
        """Return ``szDecimals`` for ``coin`` from exchange metadata, or 4 as fallback."""
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                if asset["name"] == coin:
                    return asset["szDecimals"]
            logging.warning(f"Could not find szDecimals for {coin}, defaulting to 4.")
            return 4
        except Exception as e:
            logging.error(f"Failed to fetch meta: {e}")
            return 4

    def get_funding_rate(self, coin):
        """Return the funding rate for ``coin`` as a float.

        Returns ``0.0`` when the coin is not listed or the request fails
        (the failure is logged).
        """
        try:
            meta, asset_ctxs = self.info.meta_and_asset_ctxs()
            for i, asset in enumerate(meta["universe"]):
                if asset["name"] == coin:
                    # Funding rate is in the asset context at same index
                    return float(asset_ctxs[i]["funding"])
            return 0.0
        except Exception as e:
            logging.error(f"Error fetching funding rate: {e}")
            return 0.0

    def get_market_price(self, coin):
        """Return the mid price for ``coin`` as a float, or ``None`` on any failure."""
        try:
            # Get all mids is efficient
            mids = self.info.all_mids()
            if coin in mids:
                return float(mids[coin])
            logging.error(f"Price for {coin} not found in all_mids.")
            return None
        except Exception as e:
            logging.error(f"Error fetching price: {e}")
            return None

    def get_current_position(self, coin):
        """Return the signed position size for ``coin`` (positive = long, negative = short).

        Returns ``0.0`` when there is no position for ``coin``.

        NOTE(review): ``0.0`` is also returned when the request fails, so a
        transient API error is indistinguishable from "no position" and the
        caller may trade as if unhedged. Consider signalling failure
        separately.
        """
        try:
            # We need the User State of the Vault (or the account we are trading for)
            user_state = self.info.user_state(self.vault_address or self.account.address)
            for pos in user_state["assetPositions"]:
                if pos["position"]["coin"] == coin:
                    # szi is the size. Positive = Long, Negative = Short.
                    return float(pos["position"]["szi"])
            return 0.0  # No position
        except Exception as e:
            logging.error(f"Error fetching position: {e}")
            return 0.0

    def execute_trade(self, coin, is_buy, size, price):
        """Submit an immediate-or-cancel limit order that crosses the book.

        Args:
            coin: Symbol to trade.
            is_buy: ``True`` to buy (reduce the short), ``False`` to sell.
            size: Order size, already rounded for the exchange.
            price: Reference price; the limit is set 5% through it.

        All failures (API error statuses and exceptions) are logged, never raised.
        """
        logging.info(f"🚀 EXECUTING: {coin} {'BUY' if is_buy else 'SELL'} {size} @ ~{price}")

        # Check for reduceOnly logic
        # If we are BUYING to reduce a SHORT, it is reduceOnly.
        # If we are SELLING to increase a SHORT, it is NOT reduceOnly.
        # Since we are essentially managing a Short hedge:
        # Action BUY = Reducing Hedge -> reduceOnly=True
        # Action SELL = Increasing Hedge -> reduceOnly=False
        reduce_only = is_buy

        try:
            # "Market" order emulation: an IOC limit order priced well through
            # the current price so it fills immediately or is cancelled.
            slippage = 0.05  # 5% slippage tolerance
            raw_limit_px = price * ((1 + slippage) if is_buy else (1 - slippage))
            limit_px = round_to_sig_figs(raw_limit_px, 5)

            order_result = self.exchange.order(
                coin,
                is_buy,
                size,
                limit_px,
                {"limit": {"tif": "Ioc"}},
                reduce_only=reduce_only,
            )

            status = order_result["status"]
            if status == "ok":
                response_data = order_result["response"]["data"]
                # An "ok" envelope can still carry a per-order error.
                if "statuses" in response_data and "error" in response_data["statuses"][0]:
                    logging.error(f"Order API Error: {response_data['statuses'][0]['error']}")
                else:
                    logging.info(f"✅ Trade Success: {response_data}")
            else:
                logging.error(f"Order Failed: {order_result}")

        except Exception as e:
            logging.error(f"Exception during trade execution: {e}")

    def close_all_positions(self):
        """Flatten the current position in ``self.coin_symbol``.

        Fetches price and position, then trades the opposite side for the
        full (rounded) size. Does nothing when the price is unavailable or
        the position is zero. Errors are logged, never raised.
        """
        logging.info("Attempting to close all open positions...")
        coin = self.coin_symbol
        try:
            # 1. Get latest price
            price = self.get_market_price(coin)
            if price is None:
                logging.error("Could not fetch price to close positions. Aborting close.")
                return

            # 2. Get current position
            current_pos = self.get_current_position(coin)
            if current_pos == 0:
                logging.info("No open positions to close.")
                return

            # 3. Determine Side and Size
            # If Short (-), we need to Buy (+).
            # If Long (+), we need to Sell (-).
            is_buy = current_pos < 0
            abs_size = abs(current_pos)

            # Ensure size is rounded correctly for the API
            final_size = round_to_sz_decimals(abs_size, self.sz_decimals)

            if final_size == 0:
                logging.info("Position size effectively 0 after rounding.")
                return

            logging.info(f"Closing Position: {current_pos} {coin} -> Action: {'BUY' if is_buy else 'SELL'} {final_size}")

            # 4. Execute
            self.execute_trade(coin, is_buy, final_size, price)

        except Exception as e:
            logging.error(f"Error during close_all_positions: {e}")

    def run(self):
        """Run the hedge monitor loop until interrupted.

        Each iteration reloads the status file, idles (closing positions
        once) when hedging is disabled, otherwise computes the rebalance and
        trades when the difference reaches ``REBALANCE_THRESHOLD``.
        ``KeyboardInterrupt`` closes positions and exits the loop; any other
        exception is logged and the loop continues after 10 seconds.
        """
        logging.info(f"Starting Hedge Monitor Loop. Interval: {CHECK_INTERVAL}s")

        while True:
            try:
                # Reload Config periodically
                self.manual_config = get_manual_position_config()

                # Check Global Enable Switch
                if not self.manual_config or not self.manual_config.get('hedge_enabled', False):
                    # If previously active, close the hedge. Safety first.
                    if self.strategy is not None:
                        logging.info("Hedge Disabled. Closing any remaining positions.")
                        self.close_all_positions()
                        self.strategy = None
                    time.sleep(CHECK_INTERVAL)
                    continue

                # If enabled but strategy not init, Init it.
                if self.strategy is None:
                    self._init_strategy()
                    if self.strategy is None:  # Init failed
                        time.sleep(CHECK_INTERVAL)
                        continue

                coin = self.coin_symbol

                # 1. Get Data
                price = self.get_market_price(coin)
                if price is None:
                    time.sleep(5)
                    continue

                funding_rate = self.get_funding_rate(coin)
                current_pos_size = self.get_current_position(coin)

                # 2. Calculate Logic
                # Pass raw size (e.g. -1.5). The strategy handles the logic.
                calc = self.strategy.calculate_rebalance(price, current_pos_size)

                diff_abs = abs(calc['diff'])
                trade_size = round_to_sz_decimals(diff_abs, self.sz_decimals)

                # Logging Status
                status_msg = (
                    f"Price: {price:.2f} | Fund: {funding_rate:.6f} | "
                    f"Mode: {calc['mode']} | "
                    f"Pool Delta: {calc['pool_delta']:.3f} | "
                    f"Tgt Short: {calc['target_short']:.3f} | "
                    f"Act Short: {calc['current_short']:.3f} | "
                    f"Diff: {calc['diff']:.3f}"
                )
                # Show the full-hedge target next to the (zeroed) recovery target.
                if calc['mode'] == "RECOVERY":
                    status_msg += f" | 🩹 REC MODE ({calc['raw_target']:.3f} -> {calc['target_short']:.3f})"

                logging.info(status_msg)

                # 3. Check Threshold
                if diff_abs >= REBALANCE_THRESHOLD:
                    if trade_size > 0:
                        logging.info(f"⚡ THRESHOLD TRIGGERED ({diff_abs:.3f} >= {REBALANCE_THRESHOLD})")
                        is_buy = (calc['action'] == "BUY")
                        self.execute_trade(coin, is_buy, trade_size, price)
                    else:
                        logging.info("Trade size rounds to 0. Skipping.")

                time.sleep(CHECK_INTERVAL)

            except KeyboardInterrupt:
                logging.info("Stopping Hedger...")
                self.close_all_positions()
                break
            except Exception as e:
                # Top-level boundary: log with traceback and keep the loop alive.
                logging.error(f"Loop Error: {e}", exc_info=True)
                time.sleep(10)


if __name__ == "__main__":
    hedger = CLPHedger()
    hedger.run()