import os
import time
import logging
import sys
import json
import math
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Optional, Dict, Any, List, Union

from dotenv import load_dotenv

# --- FIX: Add project root to sys.path to import local modules ---
# The parent of this file's directory is appended so that sibling packages
# (e.g. logging_utils) resolve when the script is launched directly.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)

# Import local modules
try:
    from logging_utils import setup_logging
except ImportError:
    # Local helper unavailable: fall back to a basic root-logger configuration.
    logging.basicConfig(level=logging.INFO)
    setup_logging = None

from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants

# Load environment variables
# A .env file next to this script is preferred; otherwise load_dotenv is
# called with None and performs its own default lookup.
dotenv_path = os.path.join(current_dir, '.env')
load_dotenv(dotenv_path if os.path.exists(dotenv_path) else None)

# --- LOGGING SETUP ---
# Ensure logs directory exists
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)


# Custom Filter for Millisecond Unix Timestamp (Matching Manager style)
class UnixMsLogFilter(logging.Filter):
    """Attach ``unix_ms`` (record creation time in whole milliseconds) to each record."""

    def filter(self, record):
        # record.created is seconds since the epoch as a float.
        record.unix_ms = int(record.created * 1000)
        # Never drops a record; the filter only decorates it.
        return True


# Configure Logging
logger = logging.getLogger("SCALPER_HEDGER")
logger.setLevel(logging.INFO)
logger.handlers.clear()  # Clear existing handlers to prevent duplicates

# Console Handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_fmt)
logger.addHandler(console_handler)

# File Handler
# The file format is prefixed with %(unix_ms)d, so the filter that supplies
# that attribute must be attached to this handler.
log_file = os.path.join(log_dir, 'clp_hedger.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.addFilter(UnixMsLogFilter())
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)
# --- DECIMAL PRECISION CONFIGURATION ---
getcontext().prec = 50

# --- CONFIGURATION ---
COIN_SYMBOL = "ETH"
CHECK_INTERVAL = 1
LEVERAGE = 5
STATUS_FILE = "hedge_status.json"

# Strategy Zones
ZONE_BOTTOM_HEDGE_LIMIT = Decimal("1.0")
ZONE_CLOSE_START = Decimal("10.0")
ZONE_CLOSE_END = Decimal("11.0")
ZONE_TOP_HEDGE_START = Decimal("10.0")

# Order Settings
PRICE_BUFFER_PCT = Decimal("0.0015")  # 0.15%
MIN_THRESHOLD_ETH = Decimal("0.012")
MIN_ORDER_VALUE_USD = Decimal("10.0")

# Capital Safety
DYNAMIC_THRESHOLD_MULTIPLIER = Decimal("1.3")
MIN_TIME_BETWEEN_TRADES = 25
MAX_HEDGE_MULTIPLIER = Decimal("1.25")

# Edge Protection
EDGE_PROXIMITY_PCT = Decimal("0.04")
VELOCITY_THRESHOLD_PCT = Decimal("0.0005")
POSITION_OPEN_EDGE_PROXIMITY_PCT = Decimal("0.06")
POSITION_CLOSED_EDGE_PROXIMITY_PCT = Decimal("0.025")
LARGE_HEDGE_MULTIPLIER = Decimal("2.8")


# --- HELPER FUNCTIONS ---
def to_decimal(value: Any) -> Decimal:
    """Safely convert value to Decimal.

    ``None`` becomes ``Decimal("0")``. Every other value is converted through
    ``str(value)`` so that floats keep their short repr (``0.1`` -> ``0.1``)
    instead of their full binary expansion.

    Raises:
        decimal.InvalidOperation: if ``str(value)`` is not a valid number.
    """
    if value is None:
        return Decimal("0")
    return Decimal(str(value))


def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
    """Round Decimal amount to specific decimals and return float for SDK.

    The amount is truncated toward zero (ROUND_DOWN) to ``sz_decimals``
    decimal places, so the result never exceeds the requested magnitude.
    """
    if amount == 0:
        return 0.0
    # 10 ** -sz_decimals expressed as an exact Decimal exponent.
    quantizer = Decimal("1").scaleb(-sz_decimals)
    rounded = amount.quantize(quantizer, rounding=ROUND_DOWN)
    return float(rounded)


def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
    """Round Decimal to significant figures and return float for SDK."""
    if x == 0:
        return 0.0
    # Use string formatting for sig figs as it's robust. The "g" presentation
    # may produce scientific notation for large values; float() parses both.
    return float(f"{x:.{sig_figs}g}")


def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
    """Validate trade size against minimums.

    The size is first truncated to ``sz_decimals`` places, and the checks are
    then applied to that truncated size, because that is the quantity that is
    actually submitted. Checking the unrounded size would let through orders
    whose notional drops below ``min_order_value`` after truncation.

    Args:
        size: Requested order size in base units.
        sz_decimals: Number of decimal places allowed for the size.
        min_order_value: Minimum notional (``size * price``) for an order.
        price: Price used to compute the notional.

    Returns:
        The truncated size as a float, or ``0.0`` when the size is
        non-positive, truncates to nothing (dust), or is below the minimum
        notional.
    """
    if size <= 0:
        return 0.0

    min_size = Decimal("1").scaleb(-sz_decimals)
    rounded = size.quantize(min_size, rounding=ROUND_DOWN)

    # Check dust: anything smaller than one size increment truncates to zero.
    if rounded < min_size:
        return 0.0

    # Check minimum order value on the size that will really be sent.
    if rounded * price < min_order_value:
        return 0.0

    return float(rounded)
# --- STATE MANAGEMENT ---
# Statuses under which an AUTOMATIC position is still considered live.
_ACTIVE_STATUSES = ('OPEN', 'PENDING_HEDGE', 'CLOSING')


def _is_active_automatic(entry: Any) -> bool:
    """Return True if ``entry`` is a dict describing a live AUTOMATIC position."""
    return (
        isinstance(entry, dict)
        and entry.get('type') == 'AUTOMATIC'
        and entry.get('status') in _ACTIVE_STATUSES
    )


def get_active_automatic_position() -> Optional[Dict]:
    """Return the first live AUTOMATIC position from STATUS_FILE, if any.

    The file may hold a list of position dicts or a single dict (legacy).
    Returns ``None`` when the file is missing, unreadable, or has no match;
    read errors are logged rather than raised.
    """
    if not os.path.exists(STATUS_FILE):
        return None
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)
        # Expecting a list of positions
        if isinstance(data, list):
            for entry in data:
                if _is_active_automatic(entry):
                    return entry
        # Fallback if single dict (legacy)
        elif _is_active_automatic(data):
            return data
    except Exception as e:
        logger.error(f"ERROR reading status file: {e}")
    return None


def _update_status_entry(token_id: int, fields: Dict) -> bool:
    """Merge ``fields`` into the STATUS_FILE entry whose token_id matches.

    A legacy single-dict file is rewritten as a one-element list. The new
    content is written to a temporary file and swapped in with os.replace so
    that a failure while serializing cannot leave a truncated status file.

    Returns:
        True if a matching entry was found and the file rewritten.

    Raises:
        Whatever open/json/os raise; callers log these.
    """
    with open(STATUS_FILE, 'r') as f:
        data = json.load(f)
    # Ensure list
    if isinstance(data, dict):
        data = [data]

    for entry in data:
        if entry.get('token_id') == token_id:
            entry.update(fields)
            break
    else:
        return False

    tmp_path = f"{STATUS_FILE}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, STATUS_FILE)
    finally:
        # Only present here if the write or the swap failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return True


def update_position_zones_in_json(token_id: int, zones_data: Dict):
    """Merge ``zones_data`` into the matching position entry in STATUS_FILE.

    Does nothing if the file is missing or no entry matches. Errors are
    logged, not raised.
    """
    if not os.path.exists(STATUS_FILE):
        return
    try:
        if _update_status_entry(token_id, zones_data):
            logger.info(f"Updated JSON zones for Position {token_id}")
    except Exception as e:
        logger.error(f"Error updating JSON zones: {e}")


def update_position_stats(token_id: int, stats_data: Dict):
    """Merge ``stats_data`` into the matching position entry in STATUS_FILE.

    Does nothing if the file is missing or no entry matches. Errors are
    logged, not raised.
    """
    if not os.path.exists(STATUS_FILE):
        return
    try:
        _update_status_entry(token_id, stats_data)
    except Exception as e:
        logger.error(f"Error updating JSON stats: {e}")


# --- STRATEGY CLASS ---
class HyperliquidStrategy:
    """Computes the short size needed to offset a ranged liquidity position.

    A liquidity scalar ``L`` is derived once from the entry data, and the
    position's base-asset exposure ("pool delta") at a price P inside the
    range is ``L * (1/sqrt(P) - 1/sqrt(high_range))``.
    NOTE(review): these look like the standard concentrated-liquidity
    formulas; confirm against the pool this is meant to hedge.
    """

    def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
                 entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal):
        """Store the inputs and derive ``L``.

        ``L`` is taken from the first method that yields a usable value:
        amount0, then amount1, then a heuristic based on ``target_value``.
        If none succeeds ``L`` stays 0 and an error is logged.

        Any exception during the calculation is logged and terminates the
        process via ``sys.exit(1)``.
        """
        self.entry_amount0 = entry_amount0
        self.entry_amount1 = entry_amount1
        self.target_value = target_value
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.start_price = start_price

        # How far the price had already fallen below entry when hedging began.
        self.gap = max(Decimal("0.0"), entry_price - start_price)
        self.recovery_target = entry_price + (Decimal("2") * self.gap)

        self.L = Decimal("0.0")
        try:
            sqrt_P = entry_price.sqrt()
            sqrt_Pa = low_range.sqrt()
            sqrt_Pb = high_range.sqrt()

            # Method 1: Amount0 (WETH)
            if entry_amount0 > 0:
                # Assuming amount0 is already in standard units (ETH) from JSON
                denom0 = (Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb)
                if denom0 > Decimal("1e-10"):
                    self.L = entry_amount0 / denom0
                    logger.info(f"Calculated L from Amount0: {self.L:.4f}")

            # Method 2: Amount1 (USDC)
            if self.L == 0 and entry_amount1 > 0:
                denom1 = sqrt_P - sqrt_Pa
                if denom1 > Decimal("1e-10"):
                    self.L = entry_amount1 / denom1
                    logger.info(f"Calculated L from Amount1: {self.L:.4f}")

            # Method 3: Target Value Heuristic
            if self.L == 0:
                logger.warning("Amounts missing. Using Target Value Heuristic.")
                max_eth = target_value / low_range
                denom_h = (Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb)
                if denom_h > 0:
                    self.L = max_eth / denom_h
                    logger.info(f"Calculated L from Target Value: {self.L:.4f}")
                else:
                    logger.error("Critical: Invalid Range for L calculation")
        except Exception as e:
            logger.error(f"Error calculating liquidity: {e}")
            sys.exit(1)

    def get_pool_delta(self, current_price: Decimal) -> Decimal:
        """Return the position's base-asset exposure at ``current_price``.

        Zero at or above ``high_range``; clamped to its value at ``low_range``
        for any price at or below it.
        """
        if current_price >= self.high_range:
            return Decimal("0.0")

        # Below the range the exposure no longer grows, so evaluate at the floor.
        effective_price = max(current_price, self.low_range)
        sqrt_P = effective_price.sqrt()
        sqrt_Pb = self.high_range.sqrt()
        return self.L * ((Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb))

    def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal) -> Dict:
        """Compare the desired short with the current one.

        The target short is the pool delta, boosted by an over-hedge
        percentage while the price sits in the lower 80% of the range.

        Args:
            current_price: Price at which to evaluate the position.
            current_short_size: Current hedge size; only its absolute value
                is used, so either sign convention is accepted.

        Returns:
            Dict with ``current_price``, ``pool_delta``, ``target_short``,
            ``current_short``, ``diff`` (target minus current), ``action``
            ("SELL" when diff > 0, otherwise "BUY") and ``overhedge_pct``.
        """
        pool_delta = self.get_pool_delta(current_price)

        # Over-Hedge Logic
        overhedge_pct = Decimal("0.0")
        range_width = self.high_range - self.low_range
        if range_width > 0:
            price_pct = (current_price - self.low_range) / range_width
            # If below 80% of range
            if price_pct < Decimal("0.8"):
                # Formula: 0.75% boost for every 0.1 drop below 0.8
                # (price_pct is floored at 0, capping the boost at 6%).
                diff_factor = (Decimal("0.8") - max(Decimal("0.0"), price_pct)) / Decimal("0.1")
                overhedge_pct = diff_factor * Decimal("0.0075")

        raw_target_short = pool_delta
        adjusted_target_short = raw_target_short * (Decimal("1.0") + overhedge_pct)
        current_short = abs(current_short_size)
        diff = adjusted_target_short - current_short

        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": adjusted_target_short,
            "current_short": current_short,
            "diff": diff,
            "action": "SELL" if diff > 0 else "BUY",
            "overhedge_pct": overhedge_pct
        }


# --- MAIN HEDGER CLASS ---
class ScalperHedger:
    """Polling loop that keeps a Hyperliquid short sized to the strategy target.

    Position parameters come from STATUS_FILE; market and account data come
    from the Hyperliquid ``Info`` client; orders go through ``Exchange``.
    """

    def __init__(self):
        """Create the API clients, set leverage and initialise runtime state.

        Exits the process if HEDGER_PRIVATE_KEY is not set. A failure to set
        leverage is logged and otherwise ignored.
        """
        self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")

        if not self.private_key:
            logger.error("No HEDGER_PRIVATE_KEY found in .env")
            sys.exit(1)

        self.account = Account.from_key(self.private_key)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)

        # Set Leverage
        try:
            logger.info(f"Setting leverage to {LEVERAGE}x (Cross)...")
            self.exchange.update_leverage(LEVERAGE, COIN_SYMBOL, is_cross=True)
        except Exception as e:
            logger.error(f"Failed to update leverage: {e}")

        self.strategy: Optional[HyperliquidStrategy] = None
        self.sz_decimals = self._get_sz_decimals(COIN_SYMBOL)
        self.active_position_id = None

        # Safety & State
        self.last_price: Optional[Decimal] = None
        self.last_trade_time = 0

        # Velocity Tracking
        self.last_price_for_velocity: Optional[Decimal] = None
        self.price_history: List[Decimal] = []
        self.velocity_history: List[Decimal] = []

        # PnL Tracking
        self.strategy_start_time = 0
        self.last_pnl_check_time = 0
        self.trade_history_seen = set()
        self.accumulated_pnl = Decimal("0.0")
        self.accumulated_fees = Decimal("0.0")

        # Order Tracking
        self.original_order_side = None

        logger.info(f"[DELTA] Delta-Zero Scalper Hedger initialized. Agent: {self.account.address}")

    def _init_strategy(self, position_data: Dict):
        """Build a strategy for ``position_data`` and reset per-position state.

        Leaves ``self.strategy`` untouched if no market price is available
        yet, and sets it to ``None`` if construction raises an Exception.
        Also zeroes the hedge PnL/fee fields for the position in STATUS_FILE.
        """
        try:
            entry_amount0 = to_decimal(position_data.get('amount0_initial', 0))
            entry_amount1 = to_decimal(position_data.get('amount1_initial', 0))
            target_value = to_decimal(position_data.get('target_value', 50))
            entry_price = to_decimal(position_data['entry_price'])
            lower = to_decimal(position_data['range_lower'])
            upper = to_decimal(position_data['range_upper'])

            start_price = self.get_market_price(COIN_SYMBOL)
            if start_price is None:
                logger.warning("Waiting for initial price to start strategy...")
                return

            self.strategy = HyperliquidStrategy(
                entry_amount0=entry_amount0,
                entry_amount1=entry_amount1,
                target_value=target_value,
                entry_price=entry_price,
                low_range=lower,
                high_range=upper,
                start_price=start_price
            )

            # Reset State
            self.last_price = start_price
            self.last_trade_time = 0
            self.price_history = [start_price]
            # Milliseconds, to compare against the 'time' field of fills.
            self.strategy_start_time = int(time.time() * 1000)
            self.trade_history_seen = set()
            self.accumulated_pnl = Decimal("0.0")
            self.accumulated_fees = Decimal("0.0")
            self.active_position_id = position_data['token_id']

            update_position_stats(self.active_position_id, {
                "hedge_pnl_realized": 0.0,
                "hedge_fees_paid": 0.0
            })

            logger.info(f"[DELTA] Strat Init: Pos {self.active_position_id} | Range: {lower}-{upper} | Entry: {entry_price} | Start Px: {start_price:.2f}")
        except Exception as e:
            logger.error(f"Failed to init strategy: {e}")
            self.strategy = None

    def _get_sz_decimals(self, coin: str) -> int:
        """Return the exchange's size precision for ``coin`` (default 4).

        The default is used when the coin is not listed or the metadata
        request fails.
        """
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                if asset["name"] == coin:
                    return asset["szDecimals"]
            return 4
        except Exception as e:
            # Exception (not a bare except) so Ctrl-C / sys.exit still propagate.
            logger.warning(f"Could not read szDecimals for {coin}, defaulting to 4: {e}")
            return 4

    def get_market_price(self, coin: str) -> Optional[Decimal]:
        """Return the current mid price for ``coin``, or None if unavailable."""
        try:
            mids = self.info.all_mids()
            if coin in mids:
                return to_decimal(mids[coin])
        except Exception as e:
            logger.warning(f"Failed to fetch mid price for {coin}: {e}")
        return None

    def get_order_book_levels(self, coin: str) -> Optional[Dict[str, Decimal]]:
        """Return best ``bid``, best ``ask`` and their ``mid`` for ``coin``.

        Returns None if the snapshot is missing, either side is empty, or the
        request fails.
        """
        try:
            snapshot = self.info.l2_snapshot(coin)
            if snapshot and 'levels' in snapshot:
                bids = snapshot['levels'][0]
                asks = snapshot['levels'][1]
                if bids and asks:
                    best_bid = to_decimal(bids[0]['px'])
                    best_ask = to_decimal(asks[0]['px'])
                    mid = (best_bid + best_ask) / Decimal("2")
                    return {'bid': best_bid, 'ask': best_ask, 'mid': mid}
            return None
        except Exception as e:
            logger.warning(f"Failed to fetch order book for {coin}: {e}")
            return None

    def _fetch_position(self, coin: str) -> Optional[Dict[str, Decimal]]:
        """Return ``{'size', 'pnl'}`` for ``coin``, or None if the query failed.

        A zero-size dict means the account state was read and holds no
        position in ``coin``; None means the state is unknown. Callers that
        size trades need that distinction.
        """
        try:
            user_state = self.info.user_state(self.vault_address or self.account.address)
            for pos in user_state["assetPositions"]:
                if pos["position"]["coin"] == coin:
                    return {
                        'size': to_decimal(pos["position"]["szi"]),
                        'pnl': to_decimal(pos["position"]["unrealizedPnl"])
                    }
            return {'size': Decimal("0"), 'pnl': Decimal("0")}
        except Exception as e:
            logger.error(f"Failed to fetch position for {coin}: {e}")
            return None

    def get_current_position(self, coin: str) -> Dict[str, Decimal]:
        """Return ``{'size', 'pnl'}`` for ``coin``; zeros if none or on failure.

        Kept for callers that rely on always receiving a dict. Code that
        trades on the result should use ``_fetch_position`` so that an API
        failure is not mistaken for a flat position.
        """
        pos = self._fetch_position(coin)
        if pos is None:
            return {'size': Decimal("0"), 'pnl': Decimal("0")}
        return pos

    def get_open_orders(self) -> List[Dict]:
        """Return the account's open orders, or an empty list on failure."""
        try:
            return self.info.open_orders(self.vault_address or self.account.address)
        except Exception as e:
            logger.warning(f"Failed to fetch open orders: {e}")
            return []

    def cancel_order(self, coin: str, oid: int):
        """Cancel order ``oid``; returns the exchange response, or None on error."""
        logger.info(f"Cancelling order {oid}...")
        try:
            return self.exchange.cancel(coin, oid)
        except Exception as e:
            logger.error(f"Error cancelling order: {e}")

    def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal,
                          order_type: str = "Alo") -> Optional[int]:
        """Validate and submit a limit order.

        ``order_type`` is passed to the exchange as the time-in-force. The
        order is sent with ``reduce_only`` equal to ``is_buy``: buys can only
        reduce exposure, sells are unrestricted.
        NOTE(review): this fits a short-only hedge; confirm it is intended
        when a sell is used to close a long.

        Returns:
            The order id if the order is resting or filled, otherwise None
            (invalid size, API error, rejected order or exception).
        """
        # Validate using Decimal logic
        validated_size_float = validate_trade_size(size, self.sz_decimals, MIN_ORDER_VALUE_USD, price)
        if validated_size_float == 0:
            logger.error(f"Trade size {size} invalid after validation")
            return None

        price_float = round_to_sig_figs_precise(price, 5)
        logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")

        try:
            order_result = self.exchange.order(coin, is_buy, validated_size_float, price_float,
                                               {"limit": {"tif": order_type}}, reduce_only=is_buy)
            status = order_result["status"]
            if status == "ok":
                response_data = order_result["response"]["data"]
                if "statuses" in response_data:
                    status_obj = response_data["statuses"][0]
                    if "resting" in status_obj:
                        return status_obj["resting"]["oid"]
                    elif "filled" in status_obj:
                        logger.info("Order filled immediately.")
                        return status_obj["filled"]["oid"]
                    elif "error" in status_obj:
                        logger.error(f"Order API Error: {status_obj['error']}")
            else:
                logger.error(f"Order Failed: {order_result}")
        except Exception as e:
            logger.error(f"Exception during trade: {e}")
        return None

    def manage_orders(self) -> bool:
        """Returns True if there is an active order that should prevent new trades.

        With several open orders for the coin, all are cancelled. With one,
        it is cancelled when the mid price has moved more than
        PRICE_BUFFER_PCT away from its limit price; otherwise it is kept.
        """
        open_orders = self.get_open_orders()
        my_orders = [o for o in open_orders if o['coin'] == COIN_SYMBOL]

        if not my_orders:
            return False

        if len(my_orders) > 1:
            logger.warning("Multiple orders found. Cancelling all.")
            for o in my_orders:
                self.cancel_order(COIN_SYMBOL, o['oid'])
            return False

        order = my_orders[0]
        oid = order['oid']
        order_price = to_decimal(order['limitPx'])

        # Check if price moved too far
        levels = self.get_order_book_levels(COIN_SYMBOL)
        if not levels:
            return True  # Keep order if data missing

        current_mid = levels['mid']
        pct_diff = abs(current_mid - order_price) / order_price

        # Dynamic Buffer logic (Simplified for Decimal)
        # Using base buffer for now, can be enhanced
        if pct_diff > PRICE_BUFFER_PCT:
            logger.info(f"Price moved {pct_diff*100:.3f}% > {PRICE_BUFFER_PCT*100:.3f}%. Cancelling {oid}.")
            self.cancel_order(COIN_SYMBOL, oid)
            return False

        logger.info(f"Order {oid} within range ({pct_diff*100:.3f}%). Waiting.")
        return True

    def track_fills_and_pnl(self, force: bool = False):
        """Accumulate fees and closed PnL from new fills and persist the totals.

        Polls at most once every 10 seconds unless ``force`` is True. Only
        fills for COIN_SYMBOL at or after ``strategy_start_time`` that have
        not been seen before are counted. Errors are logged, not raised.
        """
        try:
            now = time.time()
            if not force and now - self.last_pnl_check_time < 10:
                return
            self.last_pnl_check_time = now

            user_fills = self.info.user_fills(self.vault_address or self.account.address)
            new_activity = False

            for fill in user_fills:
                if fill['coin'] != COIN_SYMBOL:
                    continue
                if fill['time'] < self.strategy_start_time:
                    continue

                fill_id = fill.get('tid')
                if fill_id in self.trade_history_seen:
                    continue
                self.trade_history_seen.add(fill_id)

                fees = to_decimal(fill['fee'])
                pnl = to_decimal(fill['closedPnl'])
                self.accumulated_fees += fees
                self.accumulated_pnl += pnl
                new_activity = True
                logger.info(f"[FILL] {fill['side']} {fill['sz']} @ {fill['px']} | Fee: {fees} | PnL: {pnl}")

            if new_activity:
                # Convert back to float for JSON compatibility
                update_position_stats(self.active_position_id, {
                    "hedge_pnl_realized": round(float(self.accumulated_pnl), 2),
                    "hedge_fees_paid": round(float(self.accumulated_fees), 2)
                })
        except Exception as e:
            logger.error(f"Error tracking fills: {e}")

    def close_all_positions(self, force_taker: bool = False):
        """Cancel open orders for the coin and flatten the position.

        Unless ``force_taker`` is True, a post-only ("Alo") order one tick
        away from the touch is tried first; if it is accepted the method
        returns. Otherwise an "Ioc" order is sent 5% through the mid price.
        If the position cannot be read, nothing is closed and an error is
        logged. Errors are logged, not raised.
        """
        logger.info("Closing all positions...")
        try:
            # 1. Cancel Orders
            open_orders = self.get_open_orders()
            for o in open_orders:
                if o['coin'] == COIN_SYMBOL:
                    self.cancel_order(COIN_SYMBOL, o['oid'])

            # 2. Get Position
            pos_data = self._fetch_position(COIN_SYMBOL)
            if pos_data is None:
                # Unknown is not the same as flat: make the failure visible.
                logger.error("Could not read current position; hedge NOT closed.")
                return
            current_pos = pos_data['size']
            if current_pos == 0:
                return

            is_buy_to_close = current_pos < 0
            # Use Decimal absolute
            final_size = abs(current_pos)

            # --- MAKER CLOSE ---
            if not force_taker:
                levels = self.get_order_book_levels(COIN_SYMBOL)
                if levels:
                    tick_size = Decimal("0.1")
                    price = levels['bid'] - tick_size if is_buy_to_close else levels['ask'] + tick_size
                    logger.info(f"Attempting Maker Close: {final_size} @ {price}")
                    oid = self.place_limit_order(COIN_SYMBOL, is_buy_to_close, final_size, price, "Alo")
                    if oid:
                        logger.info(f"Close Order Placed: {oid}")
                        return

            # --- TAKER CLOSE ---
            market_price = self.get_market_price(COIN_SYMBOL)
            if market_price:
                # 5% slippage for guaranteed close
                slip = Decimal("1.05") if is_buy_to_close else Decimal("0.95")
                limit_price = market_price * slip
                logger.info(f"Executing Taker Close: {final_size} @ {limit_price}")
                self.place_limit_order(COIN_SYMBOL, is_buy_to_close, final_size, limit_price, "Ioc")
                self.active_position_id = None
        except Exception as e:
            logger.error(f"Error closing positions: {e}")

    def run(self):
        """Run the hedging loop until interrupted.

        Each cycle reads the status file, closes the hedge when it is
        disabled or the position is CLOSING, (re)initialises the strategy
        when the tracked position changes, and otherwise rebalances the short
        with an IOC order when the size difference exceeds the threshold and
        the trade cooldown has elapsed. KeyboardInterrupt closes positions
        and stops the loop; any other exception is logged and the loop
        resumes after 5 seconds.
        """
        logger.info(f"Starting Hedger Loop ({CHECK_INTERVAL}s)...")

        while True:
            try:
                active_pos = get_active_automatic_position()

                # Check Global Disable or Missing Position
                if not active_pos or not active_pos.get('hedge_enabled', True):
                    if self.strategy is not None:
                        logger.info("Hedge Disabled/Missing. Closing.")
                        self.close_all_positions(force_taker=True)
                        self.strategy = None
                    time.sleep(CHECK_INTERVAL)
                    continue

                # Check CLOSING status (from Manager)
                if active_pos.get('status') == 'CLOSING':
                    logger.info(f"[ALERT] Position {active_pos['token_id']} is CLOSING. Closing Hedge.")
                    self.close_all_positions(force_taker=True)
                    self.strategy = None
                    time.sleep(CHECK_INTERVAL)
                    continue

                # Initialize Strategy if needed
                if self.strategy is None or self.active_position_id != active_pos['token_id']:
                    self._init_strategy(active_pos)
                    if self.strategy is None:
                        time.sleep(CHECK_INTERVAL)
                        continue

                # --- CYCLE START ---
                # 1. Manage Orders
                if self.manage_orders():
                    time.sleep(CHECK_INTERVAL)
                    continue

                # 2. Market Data
                levels = self.get_order_book_levels(COIN_SYMBOL)
                if not levels:
                    time.sleep(0.1)
                    continue

                price = levels['mid']
                pos_data = self._fetch_position(COIN_SYMBOL)
                if pos_data is None:
                    # Trading on an assumed flat position would re-open the
                    # whole hedge on top of the existing one.
                    logger.warning("[SKIP] Position state unavailable; skipping cycle.")
                    time.sleep(CHECK_INTERVAL)
                    continue
                current_size = pos_data['size']
                current_pnl = pos_data['pnl']

                # 3. Calculate Logic
                calc = self.strategy.calculate_rebalance(price, current_size)
                diff_abs = abs(calc['diff'])

                # 4. Thresholds
                # Largest exposure the position can reach (its delta at the
                # bottom of the range); 5% of it is the base threshold.
                max_potential_eth = self.strategy.get_pool_delta(self.strategy.low_range)
                rebalance_threshold = max(MIN_THRESHOLD_ETH, max_potential_eth * Decimal("0.05"))

                # Volatility Adjustment
                if self.last_price:
                    pct_change = abs(price - self.last_price) / self.last_price
                    if pct_change > Decimal("0.003"):
                        rebalance_threshold *= DYNAMIC_THRESHOLD_MULTIPLIER
                self.last_price = price

                # 5. Check Zones
                # Assuming simple in-range check for now as zone logic was complex float math
                # Using Strategy ranges
                in_range = self.strategy.low_range <= price <= self.strategy.high_range
                if not in_range:
                    if price > self.strategy.high_range:
                        logger.info(f"[OUT] ABOVE RANGE ({price:.2f}). Closing Hedge.")
                        self.close_all_positions(force_taker=True)
                    elif price < self.strategy.low_range:
                        # Throttle this message to roughly one in twenty seconds.
                        if int(time.time()) % 20 == 0:
                            logger.info(f"[HOLD] BELOW RANGE ({price:.2f}). Holding Hedge.")
                    time.sleep(CHECK_INTERVAL)
                    continue

                # 6. Execute Trade
                if diff_abs > rebalance_threshold:
                    if time.time() - self.last_trade_time > MIN_TIME_BETWEEN_TRADES:
                        is_buy = (calc['action'] == "BUY")
                        # Taker execution for rebalance
                        exec_price = levels['ask'] * Decimal("1.001") if is_buy else levels['bid'] * Decimal("0.999")
                        logger.info(f"[TRIG] Rebalance: {calc['action']} {diff_abs:.4f} (Diff > {rebalance_threshold:.4f})")
                        oid = self.place_limit_order(COIN_SYMBOL, is_buy, diff_abs, exec_price, "Ioc")
                        if oid:
                            self.last_trade_time = time.time()
                            self.track_fills_and_pnl(force=True)
                    else:
                        logger.info(f"[WAIT] Cooldown. Diff: {diff_abs:.4f}")
                else:
                    logger.info(f"[IDLE] Px: {price:.2f} | Diff: {diff_abs:.4f} < {rebalance_threshold:.4f} | PnL: {current_pnl:.2f}")

                self.track_fills_and_pnl()
                time.sleep(CHECK_INTERVAL)

            except KeyboardInterrupt:
                logger.info("Stopping...")
                self.close_all_positions()
                break
            except Exception as e:
                logger.error(f"Loop Error: {e}", exc_info=True)
                time.sleep(5)


if __name__ == "__main__":
    hedger = ScalperHedger()
    hedger.run()