import os
import time
import logging
import sys
import json
import glob
import math
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Optional, Dict, Any, List, Union

from dotenv import load_dotenv

# --- FIX: Add project root to sys.path to import local modules ---
# The parent of this file's directory is appended so sibling packages resolve.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)

# Import local modules (optional helper; fall back to plain logging if absent)
try:
    from logging_utils import setup_logging
except ImportError:
    setup_logging = None
    # Ensure root logger is clean if we can't use setup_logging
    logging.getLogger().handlers.clear()
    logging.basicConfig(level=logging.INFO)

from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from clp_config import CLP_PROFILES, DEFAULT_STRATEGY

# Load environment variables: use the .env next to this file when it exists,
# otherwise let load_dotenv fall back to its own lookup (argument None).
dotenv_path = os.path.join(current_dir, '.env')
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path)
else:
    load_dotenv(None)

# --- LOGGING SETUP ---
# Ensure logs directory exists
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)


class UnixMsLogFilter(logging.Filter):
    """Attach ``unix_ms`` (record creation time in integer milliseconds) to each record.

    The attribute is consumed by formatters that reference ``%(unix_ms)d``.
    The filter never rejects a record.
    """

    def filter(self, record):
        record.unix_ms = int(record.created * 1000)
        return True


# Dedicated, non-propagating logger so records are not duplicated by the root logger.
logger = logging.getLogger("CLP_HEDGER")
logger.setLevel(logging.INFO)
logger.propagate = False  # Prevent double logging from root logger
logger.handlers.clear()  # Clear existing handlers to prevent duplicates

# Console handler: "<asctime> - <message>" on stdout.
console_handler = logging.StreamHandler(sys.stdout)
console_fmt = logging.Formatter('%(asctime)s - %(message)s')
console_handler.setFormatter(console_fmt)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

# File handler: UTF-8 log file inside the logs directory.
log_file = os.path.join(log_dir, 'clp_hedger.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# The file handler's format references %(unix_ms)d, so the filter that sets
# that attribute must be attached before the formatter is used.
file_handler.addFilter(UnixMsLogFilter())
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(message)s')
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)

# --- DECIMAL PRECISION CONFIGURATION ---
getcontext().prec = 50

# --- CONFIGURATION ---
# Settings are loaded from clp_config.py into self.coin_configs


# --- HELPER FUNCTIONS ---

def to_decimal(value: Any) -> Decimal:
    """Convert ``value`` to ``Decimal`` via its string form; ``None`` becomes 0.

    Going through ``str`` avoids carrying binary float noise into the Decimal.
    Raises ``decimal.InvalidOperation`` when the string form is not numeric.
    """
    if value is None:
        return Decimal("0")
    return Decimal(str(value))


def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
    """Round ``amount`` toward zero to ``sz_decimals`` places and return a float for the SDK."""
    if amount == 0:
        return 0.0

    quantizer = Decimal("1").scaleb(-sz_decimals)
    rounded = amount.quantize(quantizer, rounding=ROUND_DOWN)
    return float(rounded)


def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
    """Round ``x`` to ``sig_figs`` significant figures and return a float for the SDK."""
    if x == 0:
        return 0.0
    # Use string formatting for sig figs as it's robust
    return float(f"{x:.{sig_figs}g}")


def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
    """Return the tradable size as a float, or ``0.0`` when the order should be skipped.

    The size is first rounded down to ``sz_decimals`` places, and the checks are
    applied to that rounded size because it is the quantity the caller submits.
    (Checking the unrounded size could approve an order whose rounded notional
    is below the minimum.)

    Args:
        size: Requested size; non-positive values yield ``0.0``.
        sz_decimals: Number of decimal places allowed for the size.
        min_order_value: Minimum notional (``size * price``) accepted.
        price: Price used to compute the notional.
    """
    if size <= 0:
        return 0.0

    step = Decimal("1").scaleb(-sz_decimals)
    rounded = size.quantize(step, rounding=ROUND_DOWN)

    # Dust: smaller than one size increment after rounding down.
    if rounded < step:
        return 0.0

    # Minimum order value, evaluated on what will actually be sent.
    if rounded * price < min_order_value:
        return 0.0

    return float(rounded)


def update_position_stats(file_path: str, token_id: int, stats_data: Dict):
    """Merge ``stats_data`` into the entry with ``token_id`` inside a JSON status file.

    The file may hold a single object or a list of objects; a single object is
    treated as a one-element list (and is written back as a list). Nothing is
    written when the file does not exist or no entry matches. Errors are logged,
    never raised.

    The new content is written to a temporary sibling file and then moved over
    the original with ``os.replace`` so readers do not observe a truncated or
    half-written file, and a failed dump leaves the original untouched.
    """
    if not os.path.exists(file_path):
        return

    # Suffix does not match "*_status.json", so directory scans ignore it.
    tmp_path = f"{file_path}.{os.getpid()}.tmp"
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)

        if isinstance(data, dict):
            data = [data]

        target = next((entry for entry in data if entry.get('token_id') == token_id), None)
        if target is None:
            return
        target.update(stats_data)

        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, file_path)
    except Exception as e:
        logger.error(f"Error updating JSON stats in {file_path}: {e}")
        # Best-effort cleanup; the temp file may not have been created.
        try:
            os.remove(tmp_path)
        except OSError:
            pass

# --- STRATEGY CLASS ---
class HyperliquidStrategy:
    """Model of one liquidity-range position used to size its short hedge.

    Stores the position's entry data and a liquidity figure ``L``. ``L`` comes
    from the supplied raw ``liquidity`` when positive, otherwise it is derived
    from the entry amounts and the price range.
    """

    def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
                 entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal,
                 liquidity: int = 0, liquidity_scale: Decimal = Decimal("1e-12")):
        """Store entry parameters and compute ``self.L``.

        Args:
            entry_amount0: Initial amount of the first token.
            entry_amount1: Initial amount of the second token.
            target_value: Position value, used only by the last-resort ``L`` estimate.
            entry_price: Price at which the position was opened.
            low_range: Lower bound of the price range.
            high_range: Upper bound of the price range.
            start_price: Price when this strategy object was created.
            liquidity: Raw liquidity value; when > 0 it takes priority.
            liquidity_scale: Multiplier applied to ``liquidity``.
        """
        self.entry_amount0 = entry_amount0
        self.entry_amount1 = entry_amount1
        self.target_value = target_value
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.start_price = start_price

        self.gap = max(Decimal("0.0"), entry_price - start_price)
        self.recovery_target = entry_price + (Decimal("2") * self.gap)

        self.L = Decimal("0.0")

        # Priority: Use exact Liquidity from Contract if available
        if liquidity > 0:
            self.L = Decimal(liquidity) * liquidity_scale
        else:
            try:
                sqrt_P = entry_price.sqrt()
                sqrt_Pa = low_range.sqrt()
                sqrt_Pb = high_range.sqrt()

                # 1) Derive L from the first token amount.
                if entry_amount0 > 0:
                    denom0 = (Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb)
                    if denom0 > Decimal("1e-10"):
                        self.L = entry_amount0 / denom0

                # 2) Otherwise derive it from the second token amount.
                if self.L == 0 and entry_amount1 > 0:
                    denom1 = sqrt_P - sqrt_Pa
                    if denom1 > Decimal("1e-10"):
                        self.L = entry_amount1 / denom1

                # 3) Last resort: estimate from target value at the lower bound.
                if self.L == 0:
                    max_eth = target_value / low_range
                    denom_h = (Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb)
                    if denom_h > 0:
                        self.L = max_eth / denom_h
            except Exception as e:
                logger.error(f"Error calculating liquidity: {e}")

    def get_pool_delta(self, current_price: Decimal) -> Decimal:
        """Return the pool delta at ``current_price``.

        Zero at or above ``high_range``; constant (its maximum) at or below
        ``low_range``; ``L * (sqrt(Pb) - sqrt(P)) / (sqrt(P) * sqrt(Pb))`` inside.
        """
        if current_price >= self.high_range:
            return Decimal("0.0")

        if current_price <= self.low_range:
            sqrt_Pa = self.low_range.sqrt()
            sqrt_Pb = self.high_range.sqrt()
            return self.L * ((Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb))

        sqrt_P = current_price.sqrt()
        sqrt_Pb = self.high_range.sqrt()
        return self.L * (sqrt_Pb - sqrt_P) / (sqrt_P * sqrt_Pb)

    def get_compensation_boost(self) -> Decimal:
        """Return the maximum hedge adjustment fraction for this range's relative width.

        Narrower ranges get a larger boost: 0.15 below 2% width, 0.10 below 5%,
        0.075 otherwise (also used when ``low_range`` is not positive).
        """
        if self.low_range <= 0:
            return Decimal("0.075")

        range_width_pct = (self.high_range - self.low_range) / self.low_range

        if range_width_pct < Decimal("0.02"):  # <2% range
            return Decimal("0.15")
        elif range_width_pct < Decimal("0.05"):  # <5% range
            return Decimal("0.10")
        else:  # >=5% range
            return Decimal("0.075")

    def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal) -> Dict:
        """Compute the target short for this strategy at ``current_price``.

        The pool delta is scaled by ``1 + adj_pct``, where ``adj_pct`` is the
        negated distance from entry (normalised by half the range width) times
        the compensation boost, clamped to ``[-boost, +boost]``.

        Returns a dict with keys ``current_price``, ``pool_delta``,
        ``target_short``, ``current_short``, ``diff``, ``action`` ("SELL" when
        ``diff > 0`` else "BUY") and ``adj_pct``.
        """
        # Note: current_short_size here is virtual (just for this specific strategy),
        # but the unified hedger will use the 'target_short' output primarily.
        pool_delta = self.get_pool_delta(current_price)

        # --- ASYMMETRIC COMPENSATION ---
        adj_pct = Decimal("0.0")
        range_width = self.high_range - self.low_range

        if range_width > 0:
            dist = current_price - self.entry_price
            half_width = range_width / Decimal("2")
            norm_dist = dist / half_width
            max_boost = self.get_compensation_boost()
            adj_pct = -norm_dist * max_boost
            adj_pct = max(-max_boost, min(max_boost, adj_pct))

        raw_target_short = pool_delta
        adjusted_target_short = raw_target_short * (Decimal("1.0") + adj_pct)

        diff = adjusted_target_short - abs(current_short_size)

        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": adjusted_target_short,
            "current_short": abs(current_short_size),
            "diff": diff,
            "action": "SELL" if diff > 0 else "BUY",
            "adj_pct": adj_pct
        }


# --- UNIFIED HEDGER CLASS ---
class UnifiedHedger:
    """Single loop that nets the hedge targets of all active strategies per coin.

    Strategies are discovered from ``*_status.json`` files next to this module;
    their target shorts are summed per coin and compared with the account's
    position to decide whether to place, keep or cancel orders.
    """

    def __init__(self):
        """Read credentials from the environment and create the API clients.

        Exits the process with status 1 when ``HEDGER_PRIVATE_KEY`` is not set.
        """
        self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")

        if not self.private_key:
            logger.error("No HEDGER_PRIVATE_KEY found in .env")
            sys.exit(1)

        self.account = Account.from_key(self.private_key)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)

        # Maps (file_path, token_id) -> Strategy Instance
        self.strategies: Dict[tuple, HyperliquidStrategy] = {}
        # Maps (file_path, token_id) -> State Data (accumulated pnl etc)
        self.strategy_states: Dict[tuple, Dict] = {}

        # Unified State
        self.coin_configs: Dict[str, Dict] = {}  # Symbol -> Config (thresholds, decimals)
        self.active_coins = set()
        self.api_backoff_until = 0

        # Market Data Cache
        self.last_prices = {}
        self.price_history = {}  # Symbol -> List[Decimal]
        self.last_trade_times = {}  # Symbol -> timestamp

        # Shadow Orders (Global List)
        self.shadow_orders = []

        self.startup_time = time.time()

        logger.info(f"[CLP_HEDGER] Master Hedger initialized. Agent: {self.account.address}")
        self._init_coin_configs()

    @staticmethod
    def _new_aggregate() -> Dict:
        """Return an empty per-coin aggregate record."""
        return {'target_short': Decimal("0"), 'contributors': 0, 'is_at_edge': False,
                'adj_pct': Decimal("0"), 'is_closing': False}

    def _init_coin_configs(self):
        """Pre-load configuration for known coins from CLP_PROFILES."""
        for profile_data in CLP_PROFILES.values():
            symbol = profile_data.get("COIN_SYMBOL")
            if not symbol:
                continue
            if symbol not in self.coin_configs:
                # Init with Defaults
                self.coin_configs[symbol] = DEFAULT_STRATEGY.copy()
                self.coin_configs[symbol]["sz_decimals"] = 4  # Will be updated by API
            # Update with Profile Specifics
            self.coin_configs[symbol].update(profile_data)

    def _get_sz_decimals(self, coin: str) -> int:
        """Return ``szDecimals`` for ``coin`` from exchange metadata, or 4 if unavailable."""
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                if asset["name"] == coin:
                    return asset["szDecimals"]
        except Exception as e:
            # Deliberate fallback, but leave a trace instead of failing silently.
            logger.warning(f"Failed to fetch szDecimals for {coin}, defaulting to 4: {e}")
        return 4

    def update_coin_decimals(self):
        """Refresh ``sz_decimals`` in ``coin_configs`` from exchange metadata.

        Coins that are active but have no config yet get a copy of
        ``DEFAULT_STRATEGY``. Failures are logged and otherwise ignored.
        """
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                coin = asset["name"]
                if coin in self.coin_configs:
                    self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
                elif coin in self.active_coins:
                    self.coin_configs[coin] = DEFAULT_STRATEGY.copy()
                    self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
        except Exception as e:
            logger.error(f"Failed to update coin decimals: {e}")

    def calculate_volatility(self, coin: str) -> Decimal:
        """Return std-dev / mean of the cached price history for ``coin``.

        Returns 0 when fewer than 30 samples are cached, when the mean is 0, or
        when the computation fails.
        """
        history = self.price_history.get(coin, [])
        if not history or len(history) < 30:
            return Decimal("0.0")

        try:
            n = len(history)
            mean = sum(history) / n
            variance = sum(pow(p - mean, 2) for p in history) / n
            std_dev = variance.sqrt()

            if mean == 0:
                return Decimal("0.0")
            return std_dev / mean
        except Exception:
            return Decimal("0.0")

    def _prefetch_shadow_books(self, l2_snapshots: Dict):
        """Load order books into ``l2_snapshots`` for coins with pending shadow orders.

        Books already present are reused. A failed fetch is logged and skipped;
        the affected shadow orders are simply re-checked on a later tick.
        """
        for coin in {order.get('coin') for order in self.shadow_orders}:
            if not coin or coin in l2_snapshots:
                continue
            try:
                l2_snapshots[coin] = self.info.l2_snapshot(coin)
            except Exception as e:
                logger.warning(f"[SHADOW] Could not fetch order book for {coin}: {e}")

    def check_shadow_orders(self, l2_snapshots: Dict):
        """Check if pending shadow (theoretical Maker) orders would have been filled.

        An order counts as filled when the best ask is at or below a BUY price,
        or the best bid is at or above a SELL price. Orders past ``expires_at``
        are dropped even when no order book is available for their coin, so the
        pending list cannot grow without bound.

        Args:
            l2_snapshots: Coin -> snapshot dict whose ``'levels'`` is ``[bids, asks]``.
        """
        if not self.shadow_orders:
            return

        now = time.time()
        remaining = []

        for order in self.shadow_orders:
            coin = order.get('coin')
            snapshot = l2_snapshots.get(coin) if coin else None
            # Snapshot: [ [ {px, sz, n}, ... ], [ {px, sz, n}, ... ] ] -> [Bids, Asks]
            levels = snapshot['levels'] if snapshot else None

            filled = False
            if levels and levels[0] and levels[1]:
                best_bid = to_decimal(levels[0][0]['px'])
                best_ask = to_decimal(levels[1][0]['px'])
                if order['side'] == 'BUY':
                    # Filled if Ask <= Our Bid
                    filled = best_ask <= order['price']
                else:
                    # Filled if Bid >= Our Ask
                    filled = best_bid >= order['price']

            if filled:
                timeout = self.coin_configs.get(coin, {}).get("SHADOW_ORDER_TIMEOUT", 600)
                # Older records have no 'created_at'; reconstruct it from the expiry.
                created_at = order.get('created_at', order['expires_at'] - timeout)
                duration = now - created_at
                logger.info(f"[SHADOW] ✅ SUCCESS: {coin} Maker {order['side']} @ {order['price']:.2f} filled in {duration:.1f}s")
            elif now > order['expires_at']:
                logger.info(f"[SHADOW] ❌ FAILED: {coin} Maker {order['side']} @ {order['price']:.2f} timed out")
            else:
                remaining.append(order)

        self.shadow_orders = remaining

    def scan_strategies(self) -> bool:
        """Scans all *_status.json files and updates active strategies.

        Returns False if any read failed. Strategies are removed only when their
        file has disappeared or was processed completely and no longer lists them.
        """
        status_files = glob.glob(os.path.join(current_dir, "*_status.json"))
        known_files = set(status_files)

        active_ids = set()
        successful_files = set()
        has_errors = False
        tracked_statuses = ('OPEN', 'PENDING_HEDGE', 'CLOSING')

        for file_path in status_files:
            # Filename convention: {TARGET_DEX}_status.json
            filename = os.path.basename(file_path)
            dex_name = filename.replace("_status.json", "")

            profile = CLP_PROFILES.get(dex_name)
            if not profile:
                # No profile for this file -> skip it
                continue

            coin_symbol = profile.get("COIN_SYMBOL", "ETH")
            self.active_coins.add(coin_symbol)

            try:
                with open(file_path, 'r') as f:
                    content = f.read().strip()

                if not content:
                    # Empty file (being written?) -> treat as error to be safe
                    raise ValueError("Empty file")

                data = json.loads(content)
                if isinstance(data, dict):
                    data = [data]

                for entry in data:
                    if entry.get('type') != 'AUTOMATIC' or entry.get('status') not in tracked_statuses:
                        continue

                    token_id = entry['token_id']
                    key = (file_path, token_id)
                    active_ids.add(key)

                    if key not in self.strategies:
                        # Init Strategy (may be deferred until a price is known)
                        self._init_single_strategy(key, entry, coin_symbol)
                        if key in self.strategy_states:
                            self.strategy_states[key]['status'] = entry.get('status', 'OPEN')
                    elif key in self.strategy_states:
                        # Refresh PnL/Fees from file in case they changed
                        state = self.strategy_states[key]
                        state['pnl'] = to_decimal(entry.get('hedge_pnl_realized', 0))
                        state['fees'] = to_decimal(entry.get('hedge_fees_paid', 0))
                        state['status'] = entry.get('status', 'OPEN')

                # Mark as safe to sync only after every entry was processed;
                # a failure part-way through must not purge later strategies.
                successful_files.add(file_path)

            except Exception as e:
                logger.error(f"Error reading {filename}: {e}. Skipping updates.")
                has_errors = True

        # Remove stale strategies ONLY from files we successfully read
        for k in list(self.strategies.keys()):
            file_origin = k[0]
            file_gone = file_origin not in known_files
            dropped = file_origin in successful_files and k not in active_ids
            if file_gone or dropped:
                logger.info(f"Strategy {k[1]} removed (Closed/Gone).")
                del self.strategies[k]
                if k in self.strategy_states:
                    del self.strategy_states[k]

        return not has_errors

    def _init_single_strategy(self, key, position_data, coin_symbol):
        """Create the strategy and state for ``key`` from one status-file entry.

        Does nothing when no price is cached for ``coin_symbol`` yet (the next
        scan retries). Errors are logged, never raised.
        """
        try:
            entry_amount0 = to_decimal(position_data.get('amount0_initial', 0))
            entry_amount1 = to_decimal(position_data.get('amount1_initial', 0))
            target_value = to_decimal(position_data.get('target_value', 50))

            entry_price = to_decimal(position_data['entry_price'])
            lower = to_decimal(position_data['range_lower'])
            upper = to_decimal(position_data['range_upper'])

            liquidity_val = int(position_data.get('liquidity', 0))

            d0 = int(position_data.get('token0_decimals', 18))
            d1 = int(position_data.get('token1_decimals', 6))
            liquidity_scale = Decimal("10") ** Decimal(str(-(d0 + d1) / 2))

            start_price = self.last_prices.get(coin_symbol)
            if start_price is None:
                # Will init next loop when price available
                return

            strat = HyperliquidStrategy(
                entry_amount0, entry_amount1, target_value,
                entry_price, lower, upper, start_price,
                liquidity_val, liquidity_scale
            )

            # Fix: Use persistent start time from JSON to track all fills
            ts_open = position_data.get('timestamp_open')
            start_time_ms = int(ts_open * 1000) if ts_open else int(time.time() * 1000)

            self.strategies[key] = strat
            self.strategy_states[key] = {
                "coin": coin_symbol,
                "start_time": start_time_ms,
                "pnl": to_decimal(position_data.get('hedge_pnl_realized', 0)),
                "fees": to_decimal(position_data.get('hedge_fees_paid', 0)),
                "hedge_TotPnL": to_decimal(position_data.get('hedge_TotPnL', 0)),  # Total Closed PnL
                "entry_price": entry_price,  # Store for fishing logic
                "status": position_data.get('status', 'OPEN')
            }
            logger.info(f"[STRAT] Init {key[1]} ({coin_symbol}) | Range: {lower}-{upper}")

        except Exception as e:
            logger.error(f"Failed to init strategy {key[1]}: {e}")

    def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal, order_type: str = "Alo") -> Optional[int]:
        """Submit a limit order and return its order id, or ``None`` if nothing rests/fills.

        The size is validated against the coin's size decimals and minimum order
        value; the price is rounded to 5 significant figures. Buys are sent as
        reduce-only. An exception whose text contains "429" starts a 30 s API backoff.
        """
        config = self.coin_configs.get(coin, {})
        sz_decimals = config.get("sz_decimals", 4)
        min_order = config.get("MIN_ORDER_VALUE_USD", Decimal("10.0"))

        validated_size_float = validate_trade_size(size, sz_decimals, min_order, price)
        if validated_size_float == 0:
            return None

        price_float = round_to_sig_figs_precise(price, 5)

        logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")

        try:
            order_result = self.exchange.order(coin, is_buy, validated_size_float, price_float,
                                               {"limit": {"tif": order_type}}, reduce_only=is_buy)
            status = order_result["status"]

            if status == "ok":
                response_data = order_result["response"]["data"]
                if "statuses" in response_data:
                    status_obj = response_data["statuses"][0]
                    if "resting" in status_obj:
                        return status_obj["resting"]["oid"]
                    elif "filled" in status_obj:
                        logger.info("Order filled immediately.")
                        return status_obj["filled"]["oid"]
                    elif "error" in status_obj:
                        err_msg = status_obj['error']
                        if "Post only order would have immediately matched" in err_msg:
                            logger.warning("[RETRY] Maker order rejected (Crossed). Will retry.")
                        else:
                            logger.error(f"Order API Error: {err_msg}")
            else:
                logger.error(f"Order Failed: {order_result}")

        except Exception as e:
            if "429" in str(e):
                logger.warning("Rate limit hit (429). Backing off.")
                self.api_backoff_until = time.time() + 30
            else:
                logger.error(f"Exception during trade: {e}")

        return None

    def get_open_orders(self) -> List[Dict]:
        """Return the account's open orders, or ``[]`` when the request fails (logged)."""
        try:
            return self.info.open_orders(self.vault_address or self.account.address)
        except Exception as e:
            logger.error(f"Error fetching open orders: {e}")
            return []

    def cancel_order(self, coin: str, oid: int):
        """Cancel order ``oid`` on ``coin``; failures are logged, not raised."""
        try:
            self.exchange.cancel(coin, oid)
        except Exception as e:
            logger.error(f"Error cancelling order: {e}")

    def _update_closed_pnl(self, coin: str):
        """Fetch fills from API and sum closedPnl and fees for active strategies.

        For every strategy on ``coin``, fills at or after the strategy's start
        time are summed, stored in its state and written to its status file.
        """
        try:
            # 1. Identify relevant strategies for this coin
            active_strats = [k for k, v in self.strategy_states.items() if v['coin'] == coin]
            if not active_strats:
                return

            # 2. Fetch fills once and keep only this coin's
            fills = self.info.user_fills(self.vault_address or self.account.address)
            coin_fills = [fill for fill in fills if fill['coin'] == coin]

            for key in active_strats:
                start_time = self.strategy_states[key]['start_time']
                total_closed_pnl = Decimal("0")
                total_fees = Decimal("0")

                for fill in coin_fills:
                    if fill['time'] >= start_time:
                        total_closed_pnl += to_decimal(fill.get('closedPnl', 0))
                        total_fees += to_decimal(fill.get('fee', 0))

                # Update State
                self.strategy_states[key]['hedge_TotPnL'] = total_closed_pnl
                self.strategy_states[key]['fees'] = total_fees

                # Write to JSON
                file_path, token_id = key
                update_position_stats(file_path, token_id, {
                    "hedge_TotPnL": float(total_closed_pnl),
                    "hedge_fees_paid": float(total_fees)
                })
                logger.info(f"[PnL] Updated {coin} | Closed PnL: ${total_closed_pnl:.2f} | Fees: ${total_fees:.2f}")

        except Exception as e:
            logger.error(f"Failed to update closed PnL/Fees for {coin}: {e}")

    def run(self):
        """Run the hedging loop until interrupted.

        Each tick: honour API backoff, rescan strategy files, fetch market and
        account data, aggregate target shorts per coin, evaluate shadow orders,
        then per coin reconcile resting orders and place a maker (Alo) or taker
        (Ioc) order when the position is outside its threshold.
        """
        logger.info("Starting Unified Hedger Loop...")
        self.update_coin_decimals()

        # --- LOG SETTINGS ON START ---
        logger.info("=== HEDGER CONFIGURATION ===")
        for symbol, config in self.coin_configs.items():
            logger.info(f"--- {symbol} ---")
            for k, v in config.items():
                logger.info(f" {k}: {v}")
        logger.info("============================")

        while True:
            try:
                # 1. API Backoff
                if time.time() < self.api_backoff_until:
                    time.sleep(1)
                    continue

                # 2. Update Strategies
                if not self.scan_strategies():
                    logger.warning("Strategy scan failed (read error). Skipping execution tick.")
                    time.sleep(1)
                    continue

                # 3. Fetch Market Data (Centralized)
                account_address = self.vault_address or self.account.address
                try:
                    mids = self.info.all_mids()
                    user_state = self.info.user_state(account_address)
                    # Called directly (not via get_open_orders) so a failed fetch
                    # skips the tick instead of looking like "no resting orders",
                    # which could lead to duplicate orders.
                    open_orders = self.info.open_orders(account_address)
                    l2_snapshots = {}  # Cache for snapshots
                except Exception as e:
                    logger.error(f"API Error fetching data: {e}")
                    time.sleep(1)
                    continue

                # Map Open Orders
                orders_map = {}
                for o in open_orders:
                    orders_map.setdefault(o['coin'], []).append(o)

                # Map current positions
                current_positions = {}  # Coin -> Size
                current_pnls = {}  # Coin -> Unrealized PnL
                for pos in user_state["assetPositions"]:
                    position = pos["position"]
                    c = position["coin"]
                    current_positions[c] = to_decimal(position["szi"])
                    current_pnls[c] = to_decimal(position["unrealizedPnl"])

                # 4. Aggregate Targets
                # Coin -> { 'target_short': Decimal, 'contributors': int, 'is_at_edge': bool, ... }
                aggregates = {}

                # First, update all prices from mids for active coins
                for coin in self.active_coins:
                    if coin in mids:
                        price = to_decimal(mids[coin])
                        self.last_prices[coin] = price

                        # Update Price History (bounded to the last 300 samples)
                        history = self.price_history.setdefault(coin, [])
                        history.append(price)
                        if len(history) > 300:
                            history.pop(0)

                for key, strat in self.strategies.items():
                    coin = self.strategy_states[key]['coin']
                    status = self.strategy_states[key].get('status', 'OPEN')

                    if coin not in self.last_prices:
                        continue
                    price = self.last_prices[coin]

                    # Calc Logic
                    calc = strat.calculate_rebalance(price, Decimal("0"))

                    if coin not in aggregates:
                        aggregates[coin] = self._new_aggregate()

                    if status == 'CLOSING':
                        # If Closing, we want target to be 0 for this strategy
                        logger.info(f"[STRAT] {key[1]} is CLOSING -> Force Target 0")
                        aggregates[coin]['is_closing'] = True
                        # Do not add to target_short
                    else:
                        aggregates[coin]['target_short'] += calc['target_short']
                        aggregates[coin]['contributors'] += 1
                        aggregates[coin]['adj_pct'] = calc['adj_pct']

                        # Check Edge Proximity for Cleanup
                        config = self.coin_configs.get(coin, {})
                        enable_cleanup = config.get("ENABLE_EDGE_CLEANUP", True)
                        cleanup_margin = config.get("EDGE_CLEANUP_MARGIN_PCT", Decimal("0.02"))

                        if enable_cleanup:
                            dist_bottom_pct = (price - strat.low_range) / strat.low_range
                            dist_top_pct = (strat.high_range - price) / strat.high_range

                            range_width_pct = (strat.high_range - strat.low_range) / strat.low_range
                            safety_margin_pct = range_width_pct * cleanup_margin

                            if dist_bottom_pct < safety_margin_pct or dist_top_pct < safety_margin_pct:
                                aggregates[coin]['is_at_edge'] = True

                # Check Shadow Orders (Pre-Execution). The snapshot cache starts
                # empty every tick, so the books they need are loaded first.
                if self.shadow_orders:
                    self._prefetch_shadow_books(l2_snapshots)
                self.check_shadow_orders(l2_snapshots)

                # 5. Execute Per Coin
                # Union of coins with Active Strategies OR Active Positions
                coins_to_process = set(aggregates.keys())
                for c, pos in current_positions.items():
                    if abs(pos) > 0:
                        coins_to_process.add(c)

                for coin in coins_to_process:
                    data = aggregates.get(coin) or self._new_aggregate()

                    price = self.last_prices.get(coin, Decimal("0"))
                    if price == 0:
                        continue

                    target_short_abs = data['target_short']  # Always positive (magnitude of short)
                    target_position = -target_short_abs  # We want to be Short, so negative size

                    current_pos = current_positions.get(coin, Decimal("0"))

                    diff = target_position - current_pos  # e.g. -1.0 - (-0.8) = -0.2 (Sell 0.2)
                    diff_abs = abs(diff)

                    # Thresholds
                    config = self.coin_configs.get(coin, {})
                    min_thresh = config.get("min_threshold", Decimal("0.008"))

                    # Volatility Multiplier
                    vol_pct = self.calculate_volatility(coin)
                    base_vol = Decimal("0.0005")
                    if vol_pct > 0:
                        vol_mult = max(Decimal("1.0"), min(Decimal("3.0"), vol_pct / base_vol))
                    else:
                        vol_mult = Decimal("1.0")

                    base_rebalance_pct = config.get("BASE_REBALANCE_THRESHOLD_PCT", Decimal("0.20"))
                    thresh_pct = min(Decimal("0.15"), base_rebalance_pct * vol_mult)
                    dynamic_thresh = max(min_thresh, abs(target_position) * thresh_pct)

                    # FORCE EDGE CLEANUP
                    enable_edge_cleanup = config.get("ENABLE_EDGE_CLEANUP", True)
                    if data['is_at_edge'] and enable_edge_cleanup:
                        if dynamic_thresh > min_thresh:
                            dynamic_thresh = min_thresh

                    # Check Trigger
                    action_needed = diff_abs > dynamic_thresh

                    # Determine Intent
                    is_buy_bool = diff > 0
                    side_str = "BUY" if is_buy_bool else "SELL"

                    # Manage Existing Orders
                    existing_orders = orders_map.get(coin, [])
                    force_taker_retry = False

                    # Fishing Config
                    enable_fishing = config.get("ENABLE_FISHING", False)
                    fishing_timeout = config.get("FISHING_TIMEOUT_FALLBACK", 30)

                    # Check Existing Orders for compatibility
                    order_matched = False
                    price_buffer_pct = config.get("PRICE_BUFFER_PCT", Decimal("0.0015"))
                    maker_timeout = config.get("MAKER_ORDER_TIMEOUT", 300)

                    for o in existing_orders:
                        o_oid = o['oid']
                        o_price = to_decimal(o['limitPx'])
                        o_side = o['side']  # 'B' or 'A'
                        o_timestamp = o.get('timestamp', int(time.time() * 1000))

                        is_same_side = (o_side == 'B' and is_buy_bool) or (o_side == 'A' and not is_buy_bool)

                        # Price Check (within buffer)
                        dist_pct = abs(price - o_price) / price

                        # Maker Timeout Check (General)
                        order_age_sec = (int(time.time() * 1000) - o_timestamp) / 1000.0

                        if is_same_side and order_age_sec > maker_timeout:
                            logger.info(f"[TIMEOUT] {coin} Order {o_oid} expired ({order_age_sec:.1f}s > {maker_timeout}s). Cancelling to refresh.")
                            self.cancel_order(coin, o_oid)
                            continue

                        # Fishing Timeout Check
                        if enable_fishing and is_same_side and order_age_sec > fishing_timeout:
                            logger.info(f"[FISHING] {coin} Order {o_oid} timed out ({order_age_sec:.1f}s > {fishing_timeout}s). Cancelling for Taker retry.")
                            self.cancel_order(coin, o_oid)
                            force_taker_retry = True
                            continue  # Do not mark matched, let it flow to execution

                        if is_same_side and dist_pct < price_buffer_pct:
                            order_matched = True
                            if int(time.time()) % 10 == 0:
                                logger.info(f"[WAIT] {coin} Pending {side_str} Order {o_oid} @ {o_price} (Dist: {dist_pct*100:.3f}%) | Age: {order_age_sec:.1f}s")
                            break
                        else:
                            logger.info(f"Cancelling stale order {o_oid} ({o_side} @ {o_price})")
                            self.cancel_order(coin, o_oid)

                    # --- EXECUTION LOGIC ---
                    if not order_matched:
                        if action_needed or force_taker_retry:
                            bypass_cooldown = False
                            force_maker = False

                            # 0. Forced Taker Retry (Fishing Timeout)
                            if force_taker_retry:
                                bypass_cooldown = True
                                logger.info(f"[RETRY] {coin} Fishing Failed -> Force Taker")

                            # 1. Urgent Closing -> Taker
                            elif data.get('is_closing', False):
                                bypass_cooldown = True
                                logger.info(f"[URGENT] {coin} Closing Strategy -> Force Taker Exit")

                            # 2. Ghost/Cleanup -> Maker
                            elif data.get('contributors', 0) == 0:
                                if time.time() - self.startup_time > 5:
                                    force_maker = True
                                    logger.info(f"[CLEANUP] {coin} Ghost Position -> Force Maker Reduce")
                                else:
                                    logger.info(f"[STARTUP] Skipping Ghost Cleanup for {coin} (Grace Period)")
                                    continue  # Skip execution for this coin

                            # Large Hedge Check (Only Force Taker if AT EDGE)
                            large_hedge_mult = config.get("LARGE_HEDGE_MULTIPLIER", Decimal("5.0"))
                            is_large_hedge = diff_abs > (dynamic_thresh * large_hedge_mult) and not force_maker
                            if is_large_hedge and data.get('is_at_edge', False):
                                bypass_cooldown = True
                                logger.info(f"[WARN] LARGE HEDGE (Edge Protection): {diff_abs:.4f} > {dynamic_thresh:.4f} (x{large_hedge_mult})")
                            elif is_large_hedge:
                                # Large hedge but safe zone -> Maker is fine, just log it
                                logger.info(f"[INFO] Large Hedge (Safe Zone): {diff_abs:.4f}. Using Standard Execution.")

                            last_trade = self.last_trade_times.get(coin, 0)
                            min_time_trade = config.get("MIN_TIME_BETWEEN_TRADES", 60)
                            can_trade = bypass_cooldown or (time.time() - last_trade > min_time_trade)

                            if can_trade:
                                # Get Orderbook for Price
                                if coin not in l2_snapshots:
                                    l2_snapshots[coin] = self.info.l2_snapshot(coin)

                                levels = l2_snapshots[coin]['levels']
                                if not levels[0] or not levels[1]:
                                    continue

                                bid = to_decimal(levels[0][0]['px'])
                                ask = to_decimal(levels[1][0]['px'])

                                # Order type: urgent (bypass_cooldown) and not a
                                # forced-maker cleanup -> Ioc; everything else -> Alo.
                                create_shadow = False
                                if bypass_cooldown and not force_maker:
                                    exec_price = ask * Decimal("1.001") if is_buy_bool else bid * Decimal("0.999")
                                    order_type = "Ioc"
                                    create_shadow = True
                                else:
                                    # Fishing / Standard Maker
                                    exec_price = bid if is_buy_bool else ask
                                    order_type = "Alo"

                                logger.info(f"[TRIG] Net {coin}: {side_str} {diff_abs:.4f} | Tgt: {target_position:.4f} / Cur: {current_pos:.4f} | Thresh: {dynamic_thresh:.4f} | Type: {order_type}")

                                oid = self.place_limit_order(coin, is_buy_bool, diff_abs, exec_price, order_type)
                                if oid:
                                    self.last_trade_times[coin] = time.time()

                                    # Shadow Order: record what a maker order at the
                                    # touch would have been, to evaluate it later.
                                    if create_shadow:
                                        shadow_price = bid if is_buy_bool else ask
                                        shadow_timeout = config.get("SHADOW_ORDER_TIMEOUT", 600)
                                        created_at = time.time()
                                        self.shadow_orders.append({
                                            'coin': coin,
                                            'side': side_str,
                                            'price': shadow_price,
                                            'created_at': created_at,
                                            'expires_at': created_at + shadow_timeout
                                        })
                                        logger.info(f"[SHADOW] Created Maker {side_str} @ {shadow_price:.2f}")

                                    # Sleep for API Lag
                                    logger.info("Sleeping 10s to allow position update...")
                                    time.sleep(10)

                                    # --- UPDATE CLOSED PnL FROM API ---
                                    self._update_closed_pnl(coin)
                            else:
                                # Cooldown: nothing to do this tick
                                pass
                        else:
                            # Action NOT needed
                            # Cleanup any dangling orders
                            for o in existing_orders:
                                logger.info(f"Cancelling idle order {o['oid']} ({o['side']} @ {o['limitPx']})")
                                self.cancel_order(coin, o['oid'])

                            # --- IDLE LOGGING ---
                            # Aggregate Gamma estimate: 0.5 * Sum(L) * P^-1.5
                            total_L = Decimal("0")
                            for key, strat in self.strategies.items():
                                if self.strategy_states[key]['coin'] == coin:
                                    total_L += strat.L

                            if total_L > 0 and price > 0:
                                gamma = (Decimal("0.5") * total_L * (price ** Decimal("-1.5")))
                                if gamma > 0:
                                    # Equilibrium Price (Diff = 0)
                                    p_mid = price + (diff / gamma)

                                    # Triggers
                                    p_buy = price + (dynamic_thresh + diff) / gamma
                                    p_sell = price - (dynamic_thresh - diff) / gamma

                                    if int(time.time()) % 30 == 0:
                                        pad = " " if coin == "BNB" else ""
                                        adj_val = data.get('adj_pct', Decimal("0")) * 100

                                        # PnL Calc
                                        unrealized = current_pnls.get(coin, Decimal("0"))
                                        closed_pnl_total = Decimal("0")
                                        fees_total = Decimal("0")
                                        for s_state in self.strategy_states.values():
                                            if s_state['coin'] == coin:
                                                closed_pnl_total += s_state.get('hedge_TotPnL', Decimal("0"))
                                                fees_total += s_state.get('fees', Decimal("0"))

                                        total_pnl = (closed_pnl_total - fees_total) + unrealized

                                        pnl_pad = " " if unrealized >= 0 else ""
                                        tot_pnl_pad = " " if total_pnl >= 0 else ""

                                        logger.info(f"[IDLE] {coin} | Px: {price:.2f}{pad} | M: {p_mid:.1f}{pad} | B: {p_buy:.1f}{pad} / S: {p_sell:.1f}{pad} | delta: {target_position:.4f}({diff:+.4f}) | Adj: {adj_val:+.2f}%, Vol: {vol_mult:.2f}, Thr: {dynamic_thresh:.4f} | PnL: {unrealized:.2f}{pnl_pad} | TotPnL: {total_pnl:.2f}{tot_pnl_pad}")
                                else:
                                    if int(time.time()) % 30 == 0:
                                        logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f} (Thresh: {dynamic_thresh:.4f})")
                            else:
                                if int(time.time()) % 30 == 0:
                                    logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f} (Thresh: {dynamic_thresh:.4f})")

                time.sleep(DEFAULT_STRATEGY.get("CHECK_INTERVAL", 1))

            except KeyboardInterrupt:
                logger.info("Stopping...")
                break
            except Exception as e:
                logger.error(f"Loop Error: {e}", exc_info=True)
                time.sleep(5)


if __name__ == "__main__":
    hedger = UnifiedHedger()
    hedger.run()