import os
import time
import logging
import sys
import json
import glob
import math
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Optional, Dict, Any, List, Union
from dotenv import load_dotenv

# --- FIX: Add project root to sys.path to import local modules ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)

# Import local modules
try:
    from logging_utils import setup_logging
except ImportError:
    setup_logging = None
    # Ensure root logger is clean if we can't use setup_logging
    logging.getLogger().handlers.clear()
    logging.basicConfig(level=logging.INFO)

from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from clp_config import CLP_PROFILES, DEFAULT_STRATEGY

# Load environment variables (falls back to dotenv's own lookup when no
# .env file sits next to this script)
dotenv_path = os.path.join(current_dir, '.env')
load_dotenv(dotenv_path if os.path.exists(dotenv_path) else None)

# --- LOGGING SETUP ---
# Ensure logs directory exists
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)


# Custom Filter for Millisecond Unix Timestamp (Matching Manager style)
class UnixMsLogFilter(logging.Filter):
    """Attach ``unix_ms`` (record creation time in integer milliseconds) to each record."""

    def filter(self, record):
        record.unix_ms = int(record.created * 1000)
        # Never drops a record; the filter exists only to add the attribute.
        return True


# Configure Logging
logger = logging.getLogger("CLP_HEDGER")
logger.setLevel(logging.INFO)
logger.propagate = False  # Prevent double logging from root logger
logger.handlers.clear()  # Clear existing handlers to prevent duplicates

# Console Handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_fmt = logging.Formatter('%(asctime)s - %(message)s')
console_handler.setFormatter(console_fmt)
logger.addHandler(console_handler)

# File Handler (the only handler whose format uses %(unix_ms)d, hence the
# filter is attached here and not on the console handler)
log_file = os.path.join(log_dir, 'clp_hedger.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.addFilter(UnixMsLogFilter())
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(message)s')
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)

# --- DECIMAL PRECISION CONFIGURATION ---
getcontext().prec = 50

# --- CONFIGURATION ---
# Settings are loaded from clp_config.py into self.coin_configs


# --- HELPER FUNCTIONS ---
def to_decimal(value: Any) -> Decimal:
    """Convert ``value`` to ``Decimal`` via its string form; ``None`` becomes 0.

    Going through ``str`` avoids importing binary float noise into the
    Decimal. Values whose string form is not a valid number still raise
    ``decimal.InvalidOperation``.
    """
    if value is None:
        return Decimal("0")
    return Decimal(str(value))


def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
    """Truncate ``amount`` to ``sz_decimals`` decimal places and return a float for the SDK.

    Rounding is always toward zero (``ROUND_DOWN``), so the result never
    exceeds the requested amount in magnitude.
    """
    if amount == 0:
        return 0.0
    quantizer = Decimal("1").scaleb(-sz_decimals)
    rounded = amount.quantize(quantizer, rounding=ROUND_DOWN)
    return float(rounded)


def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
    """Round ``x`` to ``sig_figs`` significant figures and return a float for the SDK."""
    if x == 0:
        return 0.0
    # Use string formatting for sig figs as it's robust
    return float(f"{x:.{sig_figs}g}")


def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
    """Return the tradable size as a float, or ``0.0`` when the order must be skipped.

    The size is first truncated to ``sz_decimals`` places, and the checks are
    applied to that truncated size because it is the quantity actually sent.
    Validating the un-rounded size (as this function previously did) could
    approve an order whose rounded notional had dropped below the minimum.

    Args:
        size: Requested order size (base units).
        sz_decimals: Number of decimals allowed for the size.
        min_order_value: Minimum order notional (size * price).
        price: Price used to compute the notional.

    Returns:
        The truncated size, or ``0.0`` if it is non-positive, truncates to
        zero (dust), or its notional is below ``min_order_value``.
    """
    if size <= 0:
        return 0.0

    quantizer = Decimal("1").scaleb(-sz_decimals)
    rounded = size.quantize(quantizer, rounding=ROUND_DOWN)

    # Dust: anything smaller than one size step truncates to zero.
    if rounded <= 0:
        return 0.0

    # Check minimum order value on the size that will really be submitted.
    if rounded * price < to_decimal(min_order_value):
        return 0.0

    return float(rounded)


def update_position_stats(file_path: str, token_id: int, stats_data: Dict):
    """Merge ``stats_data`` into the entry with ``token_id`` inside a JSON status file.

    The file may hold a single object or a list of objects; a single object
    is rewritten as a one-element list. Only the first entry whose
    ``token_id`` matches is updated, and the file is rewritten only when a
    match was found. A missing file is silently ignored; any other failure
    is logged and swallowed.
    """
    if not os.path.exists(file_path):
        return

    try:
        with open(file_path, 'r') as f:
            data = json.load(f)

        if isinstance(data, dict):
            data = [data]

        updated = False
        for entry in data:
            if entry.get('token_id') == token_id:
                entry.update(stats_data)
                updated = True
                break

        if updated:
            with open(file_path, 'w') as f:
                json.dump(data, f, indent=2)
    except Exception as e:
        logger.error(f"Error updating JSON stats in {file_path}: {e}")


# --- STRATEGY CLASS ---
class HyperliquidStrategy:
    """Delta model for one concentrated-liquidity position.

    Holds the position's range and liquidity ``L`` and converts a price into
    the amount of the volatile asset held by the pool position (its delta),
    which is the size the hedge short should offset.
    """

    def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
                 entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal,
                 liquidity: int = 0, liquidity_scale: Decimal = Decimal("1e-12")):
        """Store the position parameters and derive the liquidity ``L``.

        Args:
            entry_amount0: Initial amount of token0.
            entry_amount1: Initial amount of token1.
            target_value: Position value; used as a last-resort input for ``L``.
            entry_price: Price at which the position was opened.
            low_range: Lower price bound of the range.
            high_range: Upper price bound of the range.
            start_price: Market price when this strategy object is created.
            liquidity: Raw liquidity value; when > 0 it takes priority.
            liquidity_scale: Factor applied to ``liquidity`` to obtain ``L``.
        """
        self.entry_amount0 = entry_amount0
        self.entry_amount1 = entry_amount1
        self.target_value = target_value
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.start_price = start_price

        self.gap = max(Decimal("0.0"), entry_price - start_price)
        self.recovery_target = entry_price + (Decimal("2") * self.gap)

        self.L = Decimal("0.0")

        # Priority: Use exact Liquidity from Contract if available
        if liquidity > 0:
            self.L = Decimal(liquidity) * liquidity_scale
            # The entry delta was computed here "for verification" but never
            # read. The call is kept (result discarded) so arithmetic errors
            # on invalid inputs still surface during construction.
            self.get_pool_delta(entry_price)
        else:
            # Fallback chain: derive L from amount0, then amount1, then from
            # target_value. Each step only runs if the previous left L at 0.
            try:
                sqrt_P = entry_price.sqrt()
                sqrt_Pa = low_range.sqrt()
                sqrt_Pb = high_range.sqrt()

                if entry_amount0 > 0:
                    denom0 = (Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb)
                    if denom0 > Decimal("1e-10"):
                        self.L = entry_amount0 / denom0

                if self.L == 0 and entry_amount1 > 0:
                    denom1 = sqrt_P - sqrt_Pa
                    if denom1 > Decimal("1e-10"):
                        self.L = entry_amount1 / denom1

                if self.L == 0:
                    max_eth = target_value / low_range
                    denom_h = (Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb)
                    if denom_h > 0:
                        self.L = max_eth / denom_h
            except Exception as e:
                logger.error(f"Error calculating liquidity: {e}")

    def get_pool_delta(self, current_price: Decimal) -> Decimal:
        """Return the position's asset amount (delta) at ``current_price``.

        At or above the range top the delta is 0. At or below the range
        bottom it is the maximum, ``L * (1/sqrt(Pa) - 1/sqrt(Pb))``. Inside
        the range it is ``L * (sqrt(Pb) - sqrt(P)) / (sqrt(P) * sqrt(Pb))``.
        """
        if current_price >= self.high_range:
            return Decimal("0.0")

        if current_price <= self.low_range:
            sqrt_Pa = self.low_range.sqrt()
            sqrt_Pb = self.high_range.sqrt()
            return self.L * ((Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb))

        sqrt_P = current_price.sqrt()
        sqrt_Pb = self.high_range.sqrt()
        return self.L * (sqrt_Pb - sqrt_P) / (sqrt_P * sqrt_Pb)

    def get_compensation_boost(self) -> Decimal:
        """Return the maximum asymmetric adjustment for this range width.

        Narrower ranges get a larger boost: 0.15 below 2% width, 0.10 below
        5%, otherwise 0.075 (also used when ``low_range`` is not positive).
        """
        if self.low_range <= 0:
            return Decimal("0.075")
        range_width_pct = (self.high_range - self.low_range) / self.low_range

        # The thresholds are hard-coded because this class has no access to
        # the per-coin configuration.
        if range_width_pct < Decimal("0.02"):  # <2% range
            return Decimal("0.15")  # Double protection for narrow ranges
        elif range_width_pct < Decimal("0.05"):  # <5% range
            return Decimal("0.10")  # Moderate for medium ranges
        else:  # >=5% range
            return Decimal("0.075")  # Standard for wide ranges

    def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal,
                            strategy_type: str = "ASYMMETRIC") -> Dict:
        """Compute the target short for this position at ``current_price``.

        With ``strategy_type == "ASYMMETRIC"`` the pool delta is scaled by
        ``1 + adj_pct``, where ``adj_pct`` is proportional to the distance of
        the price from the entry price (negative above entry, positive below)
        and clamped to +/- ``get_compensation_boost()``.

        Note: ``current_short_size`` here is virtual (just for this specific
        strategy); the unified hedger primarily uses ``target_short``.

        Returns:
            Dict with keys ``current_price``, ``pool_delta``, ``target_short``,
            ``current_short``, ``diff``, ``action`` ("SELL" when more short is
            needed, else "BUY") and ``adj_pct``.
        """
        pool_delta = self.get_pool_delta(current_price)

        # --- ASYMMETRIC COMPENSATION ---
        adj_pct = Decimal("0.0")

        if strategy_type == "ASYMMETRIC":
            range_width = self.high_range - self.low_range
            if range_width > 0:
                dist = current_price - self.entry_price
                half_width = range_width / Decimal("2")
                norm_dist = dist / half_width
                max_boost = self.get_compensation_boost()
                adj_pct = -norm_dist * max_boost
                adj_pct = max(-max_boost, min(max_boost, adj_pct))

        raw_target_short = pool_delta
        adjusted_target_short = raw_target_short * (Decimal("1.0") + adj_pct)

        diff = adjusted_target_short - abs(current_short_size)

        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": adjusted_target_short,
            "current_short": abs(current_short_size),
            "diff": diff,
            "action": "SELL" if diff > 0 else "BUY",
            "adj_pct": adj_pct
        }


# --- UNIFIED HEDGER CLASS ---
class UnifiedHedger:
    """Aggregates all active strategies per coin and maintains one hedge position per coin.

    Strategies are discovered from ``*_status.json`` files next to this
    script. Each loop tick sums their target shorts per coin, compares the
    sum with the live exchange position and places/cancels orders.
    """

    def __init__(self):
        """Read credentials from the environment and create the SDK clients.

        Exits the process with status 1 when ``HEDGER_PRIVATE_KEY`` is unset.
        """
        self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")

        if not self.private_key:
            logger.error("No HEDGER_PRIVATE_KEY found in .env")
            sys.exit(1)

        self.account = Account.from_key(self.private_key)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)

        # Maps (file_path, token_id) -> Strategy Instance
        self.strategies: Dict[tuple, HyperliquidStrategy] = {}
        # Maps (file_path, token_id) -> State Data (accumulated pnl etc)
        self.strategy_states: Dict[tuple, Dict] = {}

        # Unified State
        self.coin_configs: Dict[str, Dict] = {}  # Symbol -> Config (thresholds, decimals)
        self.active_coins = set()
        self.api_backoff_until = 0

        # Market Data Cache
        self.last_prices = {}
        self.price_history = {}  # Symbol -> List[Decimal]
        self.last_trade_times = {}  # Symbol -> timestamp
        self.last_idle_log_times = {}  # Symbol -> timestamp

        # Shadow Orders (Global List)
        self.shadow_orders = []

        self.startup_time = time.time()

        logger.info(f"[CLP_HEDGER] Master Hedger initialized. Agent: {self.account.address}")
        self._init_coin_configs()

    def _init_coin_configs(self):
        """Pre-load configuration for known coins from CLP_PROFILES."""
        for profile_key, profile_data in CLP_PROFILES.items():
            symbol = profile_data.get("COIN_SYMBOL")
            if symbol:
                if symbol not in self.coin_configs:
                    # Init with Defaults
                    self.coin_configs[symbol] = DEFAULT_STRATEGY.copy()
                    self.coin_configs[symbol]["sz_decimals"] = 4  # Will be updated by API

                # Update with Profile Specifics (later profiles for the same
                # symbol override earlier ones key by key)
                self.coin_configs[symbol].update(profile_data)

    def _get_sz_decimals(self, coin: str) -> int:
        """Return ``szDecimals`` for ``coin`` from exchange metadata, or 4 if unavailable."""
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                if asset["name"] == coin:
                    return asset["szDecimals"]
            return 4
        except Exception as e:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt.
            logger.warning(f"Failed to fetch sz_decimals for {coin}: {e}")
            return 4

    def update_coin_decimals(self):
        """Refresh ``sz_decimals`` in ``coin_configs`` from exchange metadata.

        Coins that are active but have no config yet get a copy of
        ``DEFAULT_STRATEGY``. Failures are logged and otherwise ignored.
        """
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                coin = asset["name"]
                if coin in self.coin_configs:
                    self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
                elif coin in self.active_coins:
                    self.coin_configs[coin] = DEFAULT_STRATEGY.copy()
                    self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
        except Exception as e:
            logger.error(f"Failed to update coin decimals: {e}")

    def calculate_volatility(self, coin: str) -> Decimal:
        """Return std-dev / mean of the cached price history for ``coin``.

        Returns 0 when fewer than 30 samples are available, when the mean is
        0, or when the computation fails.
        """
        history = self.price_history.get(coin, [])
        if not history or len(history) < 30:
            return Decimal("0.0")

        try:
            n = len(history)
            mean = sum(history) / n
            if mean == 0:
                return Decimal("0.0")
            variance = sum((p - mean) ** 2 for p in history) / n
            std_dev = variance.sqrt()
            return std_dev / mean
        except Exception as e:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt.
            logger.warning(f"Volatility calculation failed for {coin}: {e}")
            return Decimal("0.0")

    def check_shadow_orders(self, l2_snapshots: Dict):
        """Check if pending shadow (theoretical Maker) orders would have been filled.

        A shadow BUY counts as filled when the best ask is at or below its
        price; a shadow SELL when the best bid is at or above its price.
        Orders past ``expires_at`` are dropped as failed.

        Expiry is applied even when no order-book snapshot is available for
        the coin. Previously such orders were kept unconditionally, so they
        never expired and ``self.shadow_orders`` grew without bound.

        Args:
            l2_snapshots: Mapping coin -> L2 snapshot with a ``levels`` entry
                of the form ``[bids, asks]``.
        """
        if not self.shadow_orders:
            return

        now = time.time()
        remaining = []

        for order in self.shadow_orders:
            coin = order.get('coin')
            snapshot = l2_snapshots.get(coin) if coin else None
            # Snapshot: [ [ {px, sz, n}, ... ], [ {px, sz, n}, ... ] ] -> [Bids, Asks]
            levels = snapshot['levels'] if snapshot else None

            filled = False
            if levels and levels[0] and levels[1]:
                best_bid = to_decimal(levels[0][0]['px'])
                best_ask = to_decimal(levels[1][0]['px'])
                if order['side'] == 'BUY':
                    # Filled if Ask <= Our Bid
                    filled = best_ask <= order['price']
                else:  # SELL
                    # Filled if Bid >= Our Ask
                    filled = best_bid >= order['price']

            if filled:
                timeout = self.coin_configs.get(coin, {}).get("SHADOW_ORDER_TIMEOUT", 600)
                # Placement time is recovered as expires_at - timeout.
                duration = now - (order['expires_at'] - timeout)
                logger.info(f"[SHADOW] ✅ SUCCESS: {coin} Maker {order['side']} @ {order['price']:.2f} filled in {duration:.1f}s")
            elif now > order['expires_at']:
                logger.info(f"[SHADOW] ❌ FAILED: {coin} Maker {order['side']} @ {order['price']:.2f} timed out")
            else:
                remaining.append(order)

        self.shadow_orders = remaining

    def scan_strategies(self) -> bool:
        """Scans all *_status.json files and updates active strategies. Returns False if any read failed.

        Files whose name prefix is not a key of ``CLP_PROFILES`` are skipped.
        Only entries with ``type == 'AUTOMATIC'`` and status OPEN,
        PENDING_HEDGE or CLOSING are tracked. A strategy is removed when its
        file disappeared, or when its file was read successfully and no
        longer lists it; a failed read never removes strategies.
        """
        status_files = glob.glob(os.path.join(current_dir, "*_status.json"))
        status_file_set = set(status_files)

        active_ids = set()
        successful_files = set()
        has_errors = False

        for file_path in status_files:
            # Filename convention: {TARGET_DEX}_status.json
            filename = os.path.basename(file_path)
            dex_name = filename.replace("_status.json", "")

            profile = CLP_PROFILES.get(dex_name)
            if not profile:
                # Unknown profile: skip the file
                continue

            coin_symbol = profile.get("COIN_SYMBOL", "ETH")
            self.active_coins.add(coin_symbol)

            try:
                with open(file_path, 'r') as f:
                    content = f.read().strip()
                if not content:
                    # Empty file (being written?) -> treat as error to be safe
                    raise ValueError("Empty file")
                data = json.loads(content)

                successful_files.add(file_path)  # Mark as safe to sync

                if isinstance(data, dict):
                    data = [data]

                for entry in data:
                    if entry.get('type') == 'AUTOMATIC' and entry.get('status') in ['OPEN', 'PENDING_HEDGE', 'CLOSING']:
                        token_id = entry['token_id']
                        key = (file_path, token_id)
                        active_ids.add(key)

                        # Init or Update Strategy
                        if key not in self.strategies:
                            self._init_single_strategy(key, entry, coin_symbol)
                            # Init Status (init may have been deferred, so
                            # the state is not guaranteed to exist)
                            if key in self.strategy_states:
                                self.strategy_states[key]['status'] = entry.get('status', 'OPEN')
                        else:
                            # Refresh PnL/Fees from file in case they changed
                            if key in self.strategy_states:
                                self.strategy_states[key]['pnl'] = to_decimal(entry.get('hedge_pnl_realized', 0))
                                self.strategy_states[key]['fees'] = to_decimal(entry.get('hedge_fees_paid', 0))
                                self.strategy_states[key]['status'] = entry.get('status', 'OPEN')

            except Exception as e:
                logger.error(f"Error reading {filename}: {e}. Skipping updates.")
                has_errors = True

        # Remove stale strategies ONLY from files we successfully read
        for k in list(self.strategies.keys()):
            file_origin = k[0]
            # If the file is gone (not in status_files) OR we successfully read it and the key is missing:
            if file_origin not in status_file_set or (file_origin in successful_files and k not in active_ids):
                logger.info(f"Strategy {k[1]} removed (Closed/Gone).")
                del self.strategies[k]
                if k in self.strategy_states:
                    del self.strategy_states[k]

        return not has_errors

    def _init_single_strategy(self, key, position_data, coin_symbol):
        """Build the strategy and state for one status-file entry.

        Does nothing when no price is cached yet for ``coin_symbol``; the
        next scan retries because the key is still absent from
        ``self.strategies``. Any failure is logged and swallowed.
        """
        try:
            entry_amount0 = to_decimal(position_data.get('amount0_initial', 0))
            entry_amount1 = to_decimal(position_data.get('amount1_initial', 0))
            target_value = to_decimal(position_data.get('target_value', 50))

            entry_price = to_decimal(position_data['entry_price'])
            lower = to_decimal(position_data['range_lower'])
            upper = to_decimal(position_data['range_upper'])

            liquidity_val = int(position_data.get('liquidity', 0))

            d0 = int(position_data.get('token0_decimals', 18))
            d1 = int(position_data.get('token1_decimals', 6))
            # Scale raw liquidity by 10^(-(d0 + d1) / 2)
            liquidity_scale = Decimal("10") ** Decimal(str(-(d0 + d1) / 2))

            start_price = self.last_prices.get(coin_symbol)
            if start_price is None:
                # Will init next loop when price available
                return

            strat = HyperliquidStrategy(
                entry_amount0, entry_amount1, target_value, entry_price,
                lower, upper, start_price, liquidity_val, liquidity_scale
            )

            # Fix: Use persistent start time from JSON to track all fills
            ts_open = position_data.get('timestamp_open')
            start_time_ms = int(ts_open * 1000) if ts_open else int(time.time() * 1000)

            self.strategies[key] = strat
            self.strategy_states[key] = {
                "coin": coin_symbol,
                "start_time": start_time_ms,
                "pnl": to_decimal(position_data.get('hedge_pnl_realized', 0)),
                "fees": to_decimal(position_data.get('hedge_fees_paid', 0)),
                "hedge_TotPnL": to_decimal(position_data.get('hedge_TotPnL', 0)),  # NEW: Total Closed PnL
                "entry_price": entry_price,  # Store for fishing logic
                "status": position_data.get('status', 'OPEN')
            }
            logger.info(f"[STRAT] Init {key[1]} ({coin_symbol}) | Range: {lower}-{upper}")

        except Exception as e:
            logger.error(f"Failed to init strategy {key[1]}: {e}")

    def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal,
                          order_type: str = "Alo") -> Optional[int]:
        """Place a limit order and return its order id, or ``None``.

        ``None`` is returned when the size fails validation, the exchange
        rejects the order, or an exception occurs. A message containing
        "429" triggers a 30 second API backoff. Buy orders are sent with
        ``reduce_only=True`` and sell orders with ``reduce_only=False``.

        Args:
            coin: Asset symbol.
            is_buy: True for a buy, False for a sell.
            size: Requested size; validated and truncated before sending.
            price: Limit price; rounded to 5 significant figures.
            order_type: Time-in-force passed to the SDK (e.g. "Alo", "Ioc").
        """
        config = self.coin_configs.get(coin, {})
        sz_decimals = config.get("sz_decimals", 4)
        min_order = config.get("MIN_ORDER_VALUE_USD", Decimal("10.0"))

        validated_size_float = validate_trade_size(size, sz_decimals, min_order, price)
        if validated_size_float == 0:
            return None

        price_float = round_to_sig_figs_precise(price, 5)

        logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")

        try:
            order_result = self.exchange.order(coin, is_buy, validated_size_float, price_float,
                                               {"limit": {"tif": order_type}}, reduce_only=is_buy)
            status = order_result["status"]

            if status == "ok":
                response_data = order_result["response"]["data"]
                if "statuses" in response_data:
                    status_obj = response_data["statuses"][0]
                    if "resting" in status_obj:
                        return status_obj["resting"]["oid"]
                    elif "filled" in status_obj:
                        logger.info("Order filled immediately.")
                        return status_obj["filled"]["oid"]
                    elif "error" in status_obj:
                        err_msg = status_obj['error']
                        if "Post only order would have immediately matched" in err_msg:
                            logger.warning("[RETRY] Maker order rejected (Crossed). Will retry.")
                        else:
                            logger.error(f"Order API Error: {err_msg}")
            else:
                logger.error(f"Order Failed: {order_result}")

        except Exception as e:
            if "429" in str(e):
                logger.warning("Rate limit hit (429). Backing off.")
                self.api_backoff_until = time.time() + 30
            else:
                logger.error(f"Exception during trade: {e}")

        return None

    def get_open_orders(self) -> List[Dict]:
        """Return open orders for the vault (or agent) address; ``[]`` on failure.

        NOTE(review): an empty list on failure is indistinguishable from
        "no open orders"; callers cannot tell the two apart.
        """
        try:
            return self.info.open_orders(self.vault_address or self.account.address)
        except Exception as e:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt.
            logger.warning(f"Failed to fetch open orders: {e}")
            return []

    def cancel_order(self, coin: str, oid: int):
        """Cancel order ``oid`` on ``coin``; errors are logged, not raised."""
        try:
            self.exchange.cancel(coin, oid)
        except Exception as e:
            logger.error(f"Error cancelling order: {e}")

    def _update_closed_pnl(self, coin: str):
        """Fetch fills from API and sum closedPnl and fees for active strategies.

        For every strategy on ``coin`` the totals include all fills of that
        coin at or after the strategy's ``start_time``. The totals are stored
        in the strategy state and written to its status file.
        """
        try:
            # 1. Identify relevant strategies for this coin
            active_strats = [k for k, v in self.strategy_states.items() if v['coin'] == coin]
            if not active_strats:
                return

            # 2. Fetch all fills (This is heavy, maybe cache or limit?)
            # SDK user_fills returns recent fills.
            fills = self.info.user_fills(self.vault_address or self.account.address)
            # Filter by coin once instead of once per strategy.
            coin_fills = [fill for fill in fills if fill['coin'] == coin]

            for key in active_strats:
                start_time = self.strategy_states[key]['start_time']
                total_closed_pnl = Decimal("0")
                total_fees = Decimal("0")

                for fill in coin_fills:
                    # Check timestamp
                    if fill['time'] >= start_time:
                        # Sum closedPnl
                        total_closed_pnl += to_decimal(fill.get('closedPnl', 0))
                        # Sum fees
                        total_fees += to_decimal(fill.get('fee', 0))

                # Update State
                self.strategy_states[key]['hedge_TotPnL'] = total_closed_pnl
                self.strategy_states[key]['fees'] = total_fees

                # Write to JSON
                file_path, token_id = key
                update_position_stats(file_path, token_id, {
                    "hedge_TotPnL": float(total_closed_pnl),
                    "hedge_fees_paid": float(total_fees)
                })

            # Reports the totals of the last strategy processed above.
            logger.info(f"[PnL] Updated {coin} | Closed PnL: ${total_closed_pnl:.2f} | Fees: ${total_fees:.2f}")

        except Exception as e:
            logger.error(f"Failed to update closed PnL/Fees for {coin}: {e}")

    def run(self):
        """Run the hedging loop until interrupted with Ctrl-C.

        Each tick: honour API backoff, rescan status files, fetch mids /
        account state / open orders, aggregate the target short per coin,
        manage existing orders, then place at most one order per coin and
        emit a throttled status line. Unexpected errors are logged and the
        loop resumes after 5 seconds.
        """
        logger.info("Starting Unified Hedger Loop...")
        self.update_coin_decimals()

        # --- LOG SETTINGS ON START ---
        logger.info("=== HEDGER CONFIGURATION ===")
        for symbol, config in self.coin_configs.items():
            logger.info(f"--- {symbol} ---")
            for k, v in config.items():
                logger.info(f" {k}: {v}")
        logger.info("============================")

        while True:
            try:
                # 1. API Backoff
                if time.time() < self.api_backoff_until:
                    time.sleep(1)
                    continue

                # 2. Update Strategies
                if not self.scan_strategies():
                    logger.warning("Strategy scan failed (read error). Skipping execution tick.")
                    time.sleep(1)
                    continue

                # 3. Fetch Market Data (Centralized)
                try:
                    mids = self.info.all_mids()
                    user_state = self.info.user_state(self.vault_address or self.account.address)
                    open_orders = self.get_open_orders()
                    l2_snapshots = {}  # Cache for snapshots
                except Exception as e:
                    logger.error(f"API Error fetching data: {e}")
                    time.sleep(1)
                    continue

                # Map Open Orders
                orders_map = {}
                for o in open_orders:
                    orders_map.setdefault(o['coin'], []).append(o)

                # Parse User State
                account_value = Decimal("0")
                if "marginSummary" in user_state and "accountValue" in user_state["marginSummary"]:
                    account_value = to_decimal(user_state["marginSummary"]["accountValue"])

                # Map current positions
                current_positions = {}  # Coin -> Size
                current_pnls = {}  # Coin -> Unrealized PnL
                current_entry_pxs = {}  # Coin -> Entry Price (NEW)
                for pos in user_state["assetPositions"]:
                    c = pos["position"]["coin"]
                    s = to_decimal(pos["position"]["szi"])
                    u = to_decimal(pos["position"]["unrealizedPnl"])
                    e = to_decimal(pos["position"]["entryPx"])
                    current_positions[c] = s
                    current_pnls[c] = u
                    current_entry_pxs[c] = e

                # 4. Aggregate Targets
                # Coin -> { 'target_short': Decimal, 'contributors': int, 'is_at_edge': bool, ... }
                aggregates = {}

                # First, update all prices from mids for active coins
                for coin in self.active_coins:
                    if coin in mids:
                        price = to_decimal(mids[coin])
                        self.last_prices[coin] = price

                        # Update Price History (rolling window of 300 samples)
                        if coin not in self.price_history:
                            self.price_history[coin] = []
                        self.price_history[coin].append(price)
                        if len(self.price_history[coin]) > 300:
                            self.price_history[coin].pop(0)

                for key, strat in self.strategies.items():
                    coin = self.strategy_states[key]['coin']
                    status = self.strategy_states[key].get('status', 'OPEN')

                    if coin not in self.last_prices:
                        continue
                    price = self.last_prices[coin]

                    # Get Config & Strategy Type
                    config = self.coin_configs.get(coin, {})
                    strategy_type = config.get("HEDGE_STRATEGY", "ASYMMETRIC")

                    # Calc Logic
                    calc = strat.calculate_rebalance(price, Decimal("0"), strategy_type)

                    if coin not in aggregates:
                        aggregates[coin] = {'target_short': Decimal("0"), 'contributors': 0,
                                            'is_at_edge': False, 'is_at_bottom_edge': False,
                                            'adj_pct': Decimal("0"), 'is_closing': False}

                    if status == 'CLOSING':
                        # If Closing, we want target to be 0 for this strategy
                        logger.info(f"[STRAT] {key[1]} is CLOSING -> Force Target 0")
                        aggregates[coin]['is_closing'] = True
                        # Do not add to target_short
                    else:
                        aggregates[coin]['target_short'] += calc['target_short']

                    aggregates[coin]['contributors'] += 1
                    aggregates[coin]['adj_pct'] = calc['adj_pct']

                    # Check Edge Proximity for Cleanup
                    config = self.coin_configs.get(coin, {})
                    enable_cleanup = config.get("ENABLE_EDGE_CLEANUP", True)
                    cleanup_margin = config.get("EDGE_CLEANUP_MARGIN_PCT", Decimal("0.02"))

                    if enable_cleanup:
                        dist_bottom_pct = (price - strat.low_range) / strat.low_range
                        dist_top_pct = (strat.high_range - price) / strat.high_range

                        range_width_pct = (strat.high_range - strat.low_range) / strat.low_range
                        safety_margin_pct = range_width_pct * cleanup_margin

                        if dist_bottom_pct < safety_margin_pct or dist_top_pct < safety_margin_pct:
                            aggregates[coin]['is_at_edge'] = True
                        if dist_bottom_pct < safety_margin_pct:
                            aggregates[coin]['is_at_bottom_edge'] = True

                # Check Shadow Orders (Pre-Execution)
                # NOTE(review): l2_snapshots is still empty at this point (it is
                # only filled during execution below), so shadow orders can
                # expire here but are never detected as filled — confirm intent.
                self.check_shadow_orders(l2_snapshots)

                # 5. Execute Per Coin
                # Union of coins with Active Strategies OR Active Positions
                coins_to_process = set(aggregates.keys())
                for c, pos in current_positions.items():
                    if abs(pos) > 0:
                        coins_to_process.add(c)

                for coin in coins_to_process:
                    data = aggregates.get(coin, {'target_short': Decimal("0"), 'contributors': 0,
                                                 'is_at_edge': False, 'is_at_bottom_edge': False,
                                                 'adj_pct': Decimal("0"), 'is_closing': False})

                    price = self.last_prices.get(coin, Decimal("0"))
                    if price == 0:
                        continue

                    target_short_abs = data['target_short']
                    target_position = -target_short_abs  # shorts are negative positions
                    current_pos = current_positions.get(coin, Decimal("0"))

                    diff = target_position - current_pos
                    diff_abs = abs(diff)

                    # Thresholds
                    config = self.coin_configs.get(coin, {})
                    min_thresh = config.get("MIN_HEDGE_THRESHOLD", Decimal("0.008"))

                    vol_pct = self.calculate_volatility(coin)
                    base_vol = Decimal("0.0005")
                    # Volatility multiplier is clamped to [1, 3]
                    vol_mult = max(Decimal("1.0"), min(Decimal("3.0"), vol_pct / base_vol)) if vol_pct > 0 else Decimal("1.0")

                    base_rebalance_pct = config.get("BASE_REBALANCE_THRESHOLD_PCT", Decimal("0.20"))
                    thresh_pct = min(Decimal("0.15"), base_rebalance_pct * vol_mult)
                    dynamic_thresh = max(min_thresh, abs(target_position) * thresh_pct)

                    if data['is_at_edge'] and config.get("ENABLE_EDGE_CLEANUP", True):
                        # Near a range edge the threshold is tightened to the minimum
                        if dynamic_thresh > min_thresh:
                            dynamic_thresh = min_thresh

                    action_needed = diff_abs > dynamic_thresh
                    is_buy_bool = diff > 0
                    side_str = "BUY" if is_buy_bool else "SELL"

                    # Manage Existing Orders
                    existing_orders = orders_map.get(coin, [])
                    force_taker_retry = False

                    order_matched = False
                    price_buffer_pct = config.get("PRICE_BUFFER_PCT", Decimal("0.0015"))

                    for o in existing_orders:
                        o_oid = o['oid']
                        o_price = to_decimal(o['limitPx'])
                        o_side = o['side']
                        o_timestamp = o.get('timestamp', int(time.time() * 1000))

                        is_same_side = (o_side == 'B' and is_buy_bool) or (o_side == 'A' and not is_buy_bool)
                        order_age_sec = (int(time.time() * 1000) - o_timestamp) / 1000.0

                        if is_same_side and order_age_sec > config.get("MAKER_ORDER_TIMEOUT", 300):
                            logger.info(f"[TIMEOUT] {coin} Order {o_oid} expired. Cancelling.")
                            self.cancel_order(coin, o_oid)
                            continue

                        if config.get("ENABLE_FISHING", False) and is_same_side and order_age_sec > config.get("FISHING_TIMEOUT_FALLBACK", 30):
                            logger.info(f"[FISHING] {coin} Order {o_oid} timed out. Retrying as Taker.")
                            self.cancel_order(coin, o_oid)
                            force_taker_retry = True
                            continue

                        if is_same_side and (abs(price - o_price) / price) < price_buffer_pct:
                            # A resting order on the right side and close
                            # enough to the market: keep it and stop looking.
                            order_matched = True
                            if int(time.time()) % 15 == 0:
                                logger.info(f"[WAIT] {coin} Pending {side_str} @ {o_price} | Age: {order_age_sec:.1f}s")
                            break
                        else:
                            logger.info(f"Cancelling stale order {o_oid} ({o_side} @ {o_price})")
                            self.cancel_order(coin, o_oid)

                    # Determine Urgency / Bypass Cooldown
                    bypass_cooldown = False
                    force_maker = False

                    if not order_matched and (action_needed or force_taker_retry):
                        if force_taker_retry:
                            bypass_cooldown = True
                        elif data.get('is_closing', False):
                            bypass_cooldown = True
                        elif data.get('contributors', 0) == 0:
                            # Position exists but no strategy claims it
                            if time.time() - self.startup_time > 5:
                                force_maker = True
                            else:
                                continue  # Skip startup ghost positions

                        large_hedge_mult = config.get("LARGE_HEDGE_MULTIPLIER", Decimal("5.0"))
                        if diff_abs > (dynamic_thresh * large_hedge_mult) and not force_maker and data.get('is_at_edge', False):
                            # Prevent IOC for BUYs at bottom edge
                            if not (is_buy_bool and data.get('is_at_bottom_edge', False)):
                                bypass_cooldown = True

                    # --- ASYMMETRIC HEDGE CHECK ---
                    is_asymmetric_blocked = False
                    p_mid_asym = Decimal("0")
                    strategy_type = config.get("HEDGE_STRATEGY", "ASYMMETRIC")

                    if strategy_type == "ASYMMETRIC" and is_buy_bool and not bypass_cooldown:
                        total_L_asym = Decimal("0")
                        for k_strat, strat_inst in self.strategies.items():
                            if self.strategy_states[k_strat]['coin'] == coin:
                                total_L_asym += strat_inst.L

                        # gamma = 0.5 * L * P^-1.5 (delta change per unit price)
                        gamma_asym = (Decimal("0.5") * total_L_asym * (price ** Decimal("-1.5")))
                        if gamma_asym > 0:
                            p_mid_asym = price - (diff_abs / gamma_asym)
                            if not data.get('is_at_edge', False) and price >= p_mid_asym:
                                is_asymmetric_blocked = True

                    # --- EXECUTION ---
                    if not order_matched and not is_asymmetric_blocked:
                        if action_needed or force_taker_retry:
                            last_trade = self.last_trade_times.get(coin, 0)
                            min_time = config.get("MIN_TIME_BETWEEN_TRADES", 60)

                            if bypass_cooldown or (time.time() - last_trade > min_time):
                                if coin not in l2_snapshots:
                                    l2_snapshots[coin] = self.info.l2_snapshot(coin)

                                levels = l2_snapshots[coin]['levels']
                                if levels[0] and levels[1]:
                                    bid, ask = to_decimal(levels[0][0]['px']), to_decimal(levels[1][0]['px'])

                                    if bypass_cooldown and not force_maker:
                                        # Urgent: cross the spread by 0.1%
                                        exec_price = ask * Decimal("1.001") if is_buy_bool else bid * Decimal("0.999")
                                        order_type = "Ioc"
                                    else:
                                        # Passive: join the best level on our side
                                        exec_price = bid if is_buy_bool else ask
                                        order_type = "Alo"

                                    logger.info(f"[TRIG] {coin} {side_str} {diff_abs:.4f} | Cur: {current_pos:.4f} | Type: {order_type}")

                                    oid = self.place_limit_order(coin, is_buy_bool, diff_abs, exec_price, order_type)
                                    if oid:
                                        self.last_trade_times[coin] = time.time()
                                        if order_type == "Ioc":
                                            # Track what a maker order at the
                                            # touch would have done instead
                                            shadow_price = bid if is_buy_bool else ask
                                            self.shadow_orders.append({
                                                'coin': coin,
                                                'side': side_str,
                                                'price': shadow_price,
                                                'expires_at': time.time() + config.get("SHADOW_ORDER_TIMEOUT", 600)
                                            })

                                        logger.info("Sleeping 10s for position update...")
                                        time.sleep(10)
                                        self._update_closed_pnl(coin)
                        else:
                            # Idle Cleanup
                            if existing_orders and not order_matched:
                                for o in existing_orders:
                                    self.cancel_order(coin, o['oid'])

                    # --- THROTTLED STATUS LOGGING ---
                    now = time.time()
                    last_log = self.last_idle_log_times.get(coin, 0)
                    monitor_interval = config.get("MONITOR_INTERVAL_SECONDS", 60)

                    if now - last_log >= monitor_interval:
                        self.last_idle_log_times[coin] = now

                        if is_asymmetric_blocked:
                            logger.info(f"[ASYMMETRIC] Blocking BUY. Px ({price:.2f}) >= Eq ({p_mid_asym:.2f}) & Not Edge")

                        total_L_log = Decimal("0")
                        for k_strat, strat_inst in self.strategies.items():
                            if self.strategy_states[k_strat]['coin'] == coin:
                                total_L_log += strat_inst.L

                        if total_L_log > 0 and price > 0:
                            gamma_log = (Decimal("0.5") * total_L_log * (price ** Decimal("-1.5")))
                            if gamma_log > 0:
                                p_mid_log = price - (diff / gamma_log)  # Corrected equilibrium formula
                                p_buy = price + (dynamic_thresh + diff) / gamma_log
                                p_sell = price - (dynamic_thresh - diff) / gamma_log

                                pad = " " if coin == "BNB" else ""

                                unrealized = current_pnls.get(coin, Decimal("0"))
                                closed_pnl = sum(s['hedge_TotPnL'] for s in self.strategy_states.values() if s['coin'] == coin)
                                fees = sum(s['fees'] for s in self.strategy_states.values() if s['coin'] == coin)
                                total_pnl = (closed_pnl - fees) + unrealized

                                logger.info(f"[IDLE] {coin} | Px: {price:.2f}{pad} | M: {p_mid_log:.1f}{pad} | B: {p_buy:.1f}{pad} / S: {p_sell:.1f}{pad} | delta: {target_position:.4f}({diff:+.4f}) | Adj: {data.get('adj_pct',0)*100:+.2f}%, Vol: {vol_mult:.2f}, Thr: {dynamic_thresh:.4f} | PnL: {unrealized:.2f} | TotPnL: {total_pnl:.2f}")
                            else:
                                logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f} (Thresh: {dynamic_thresh:.4f})")
                        else:
                            logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f}")

                time.sleep(DEFAULT_STRATEGY.get("CHECK_INTERVAL", 1))

            except KeyboardInterrupt:
                logger.info("Stopping...")
                break
            except Exception as e:
                logger.error(f"Loop Error: {e}", exc_info=True)
                time.sleep(5)


if __name__ == "__main__":
    hedger = UnifiedHedger()
    hedger.run()