import os import sys import time import json import re import logging import math from decimal import Decimal, getcontext from datetime import datetime from typing import Optional, Dict, Tuple, Any, List from web3 import Web3 from web3.exceptions import TimeExhausted, ContractLogicError from web3.middleware import ExtraDataToPOAMiddleware # FIX for Web3.py v6+ from eth_account import Account from eth_account.signers.local import LocalAccount from dotenv import load_dotenv # --- IMPORTS FOR KPI --- try: from tools.kpi_tracker import log_kpi_snapshot except ImportError: logging.warning("KPI Tracker not found. Performance logging disabled.") log_kpi_snapshot = None # Set Decimal precision high enough for EVM math getcontext().prec = 60 # --- LOGGING SETUP --- current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(current_dir) # Ensure logs directory exists log_dir = os.path.join(current_dir, 'logs') os.makedirs(log_dir, exist_ok=True) try: from logging_utils import setup_logging # Assuming setup_logging might handle file logging if configured, # but to be safe and explicit as requested, we'll add a FileHandler here # or rely on setup_logging if it supports it. # Since I don't see setup_logging code, I will manually add a file handler to the logger. logger = setup_logging("normal", "UNISWAP_MANAGER") except ImportError: logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("UNISWAP_MANAGER") # Custom Filter for Millisecond Unix Timestamp class UnixMsLogFilter(logging.Filter): def filter(self, record): record.unix_ms = int(record.created * 1000) return True # Add File Handler log_file = os.path.join(log_dir, 'uniswap_manager.log') file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.INFO) file_handler.addFilter(UnixMsLogFilter()) formatter = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) logger.addHandler(file_handler) from clp_abis import ( NONFUNGIBLE_POSITION_MANAGER_ABI, UNISWAP_V3_POOL_ABI, ERC20_ABI, UNISWAP_V3_FACTORY_ABI, AERODROME_FACTORY_ABI, AERODROME_POOL_ABI, AERODROME_NPM_ABI, SWAP_ROUTER_ABI, WETH9_ABI ) from clp_config import get_current_config, STATUS_FILE from tools.universal_swapper import execute_swap # --- GET ACTIVE DEX CONFIG --- CONFIG = get_current_config() DEX_TO_CHAIN = { "UNISWAP_V3": "ARBITRUM", "UNISWAP_wide": "ARBITRUM", "PANCAKESWAP_BNB": "BSC", "WETH_CBBTC_BASE": "BASE", "UNISWAP_BASE_CL": "BASE", "AERODROME_BASE_CL": "BASE", "AERODROME_WETH-USDC_008": "BASE" } # --- CONFIGURATION FROM STRATEGY --- MONITOR_INTERVAL_SECONDS = CONFIG.get("MONITOR_INTERVAL_SECONDS", 60) CLOSE_POSITION_ENABLED = CONFIG.get("CLOSE_POSITION_ENABLED", True) OPEN_POSITION_ENABLED = CONFIG.get("OPEN_POSITION_ENABLED", True) REBALANCE_ON_CLOSE_BELOW_RANGE = CONFIG.get("REBALANCE_ON_CLOSE_BELOW_RANGE", True) TARGET_INVESTMENT_VALUE_USDC = CONFIG.get("TARGET_INVESTMENT_AMOUNT", 2000) INITIAL_HEDGE_CAPITAL_USDC = CONFIG.get("INITIAL_HEDGE_CAPITAL", 1000) RANGE_WIDTH_PCT = CONFIG.get("RANGE_WIDTH_PCT", Decimal("0.01")) RANGE_MODE = CONFIG.get("RANGE_MODE", "FIXED") SLIPPAGE_TOLERANCE = CONFIG.get("SLIPPAGE_TOLERANCE", Decimal("0.02")) TRANSACTION_TIMEOUT_SECONDS = CONFIG.get("TRANSACTION_TIMEOUT_SECONDS", 30) # --- AUTO RANGE HELPERS --- def get_market_indicators() -> Optional[Dict]: file_path = os.path.join("market_data", "indicators.json") if not os.path.exists(file_path): return None try: with open(file_path, 'r') as f: data = json.load(f) # Check Freshness (5m) last_updated_str = data.get("last_updated") if not last_updated_str: return None last_updated = datetime.fromisoformat(last_updated_str) if (datetime.now() - last_updated).total_seconds() > 300: logger.warning("⚠️ Market indicators file is stale (>5m).") return None return data except Exception as e: logger.error(f"Error reading indicators: {e}") return None def calculate_dynamic_range_pct(coin: str) -> Optional[Decimal]: indicators = get_market_indicators() if not indicators: return None # Normalize symbols (Hyperliquid uses ETH, BNB while DEX uses WETH, WBNB) symbol_map = {"WETH": "ETH", "WBNB": "BNB"} lookup_coin = symbol_map.get(coin.upper(), coin.upper()) coin_data = indicators.get("data", {}).get(lookup_coin) if not coin_data: return None try: price = Decimal(str(coin_data["current_price"])) bb12 = coin_data["bb"]["12h"] bb_low = Decimal(str(bb12["lower"])) bb_high = Decimal(str(bb12["upper"])) ma88 = Decimal(str(coin_data["ma"]["88"])) # Condition 2: Price inside BB 12h if not (bb_low <= price <= bb_high): logger.warning(f"⚖️ AUTO: Price {price:.2f} is outside BB 12h ({bb_low:.2f} - {bb_high:.2f}). Skipping AUTO.") return None # Condition 3: MA 88 inside BB 12h if not (bb_low <= ma88 <= bb_high): logger.warning(f"⚖️ AUTO: MA 88 {ma88:.2f} is outside BB 12h. Skipping AUTO.") return None # Calculation: Max distance to BB edge dist_low = abs(price - bb_low) dist_high = abs(price - bb_high) max_dist = max(dist_low, dist_high) range_pct = max_dist / price return range_pct except (KeyError, TypeError, ValueError) as e: logger.error(f"Error in dynamic range calc: {e}") return None # --- CONFIGURATION CONSTANTS --- NONFUNGIBLE_POSITION_MANAGER_ADDRESS = CONFIG["NPM_ADDRESS"] # Router address not strictly needed for Manager if using universal_swapper, but kept for ref UNISWAP_V3_SWAP_ROUTER_ADDRESS = CONFIG["ROUTER_ADDRESS"] # Arbitrum WETH/USDC (or generic T0/T1) WETH_ADDRESS = CONFIG["WRAPPED_NATIVE_ADDRESS"] USDC_ADDRESS = CONFIG["TOKEN_B_ADDRESS"] POOL_FEE = CONFIG.get("POOL_FEE", 500) # --- HELPER FUNCTIONS --- def clean_address(addr: str) -> str: """Ensure address is checksummed.""" if not Web3.is_address(addr): raise ValueError(f"Invalid address: {addr}") return Web3.to_checksum_address(addr) def to_decimal(value: Any, decimals: int = 0) -> Decimal: """Convert value to Decimal, optionally scaling down by decimals.""" if isinstance(value, Decimal): return value return Decimal(value) / (Decimal(10) ** decimals) def to_wei_int(value: Decimal, decimals: int) -> int: """Convert Decimal value to integer Wei representation.""" return int(value * (Decimal(10) ** decimals)) def get_gas_params(w3: Web3) -> Dict[str, int]: """Get dynamic gas parameters for EIP-1559.""" latest_block = w3.eth.get_block("latest") base_fee = latest_block['baseFeePerGas'] # Priority fee: 0.1 gwei or dynamic max_priority_fee = w3.eth.max_priority_fee or Web3.to_wei(0.1, 'gwei') # Max Fee = Base Fee * 1.5 + Priority Fee max_fee = int(base_fee * 1.25) + max_priority_fee return { 'maxFeePerGas': max_fee, 'maxPriorityFeePerGas': max_priority_fee } def send_transaction_robust( w3: Web3, account: LocalAccount, func_call: Any, value: int = 0, gas_limit: Optional[int] = None, extra_msg: str = "" ) -> Optional[Any]: """ Builds, signs, sends, and waits for a transaction with timeout and status check. """ try: # 1. Prepare Params # Use 'pending' to ensure we get the correct nonce if a tx was just sent/mined tx_params = { 'from': account.address, 'nonce': w3.eth.get_transaction_count(account.address, 'pending'), 'value': value, 'chainId': w3.eth.chain_id, } # 2. Add Gas Params gas_fees = get_gas_params(w3) tx_params.update(gas_fees) # 3. Simulate (Call) & Estimate Gas try: # If function call object provided if hasattr(func_call, 'call'): func_call.call({'from': account.address, 'value': value}) # Safety Dry-Run estimated_gas = func_call.estimate_gas({'from': account.address, 'value': value}) else: # Raw transaction construction if func_call is just params dict (rare here) estimated_gas = 200000 tx_params['gas'] = gas_limit if gas_limit else int(estimated_gas * 1.2) # 20% buffer # Build if hasattr(func_call, 'build_transaction'): tx = func_call.build_transaction(tx_params) else: raise ValueError("Invalid function call object") except ContractLogicError as e: logger.error(f"❌ Simulation/Estimation failed for {extra_msg}: {e}") return None # 4. Sign signed_tx = account.sign_transaction(tx) # 5. Send tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction) logger.info(f"📤 Sent {extra_msg} | Hash: {tx_hash.hex()}") # 6. Wait for Receipt receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=TRANSACTION_TIMEOUT_SECONDS) # 7. Verify Status if receipt.status == 1: logger.info(f"✅ Executed {extra_msg} | Block: {receipt.blockNumber}") return receipt else: logger.error(f"❌ Transaction Reverted {extra_msg} | Hash: {tx_hash.hex()}") return None except TimeExhausted: logger.error(f"⌛ Transaction Timeout {extra_msg} - Check Mempool") # In a full production bot, we would implement gas bumping here. return None except Exception as e: logger.error(f"❌ Transaction Error {extra_msg}: {e}") return None def price_from_sqrt_price_x96(sqrt_price_x96: int, token0_decimals: int, token1_decimals: int) -> Decimal: """ Returns price of Token0 in terms of Token1. """ sqrt_price = Decimal(sqrt_price_x96) q96 = Decimal(2) ** 96 price = (sqrt_price / q96) ** 2 # Adjust for decimals: Price = (T1 / 10^d1) / (T0 / 10^d0) # = (T1/T0) * (10^d0 / 10^d1) adjustment = Decimal(10) ** (token0_decimals - token1_decimals) return price * adjustment def price_from_tick(tick: int, token0_decimals: int, token1_decimals: int) -> Decimal: price = Decimal("1.0001") ** tick adjustment = Decimal(10) ** (token0_decimals - token1_decimals) return price * adjustment def get_sqrt_ratio_at_tick(tick: int) -> int: return int((1.0001 ** (tick / 2)) * (2 ** 96)) def get_amounts_for_liquidity(sqrt_ratio_current: int, sqrt_ratio_a: int, sqrt_ratio_b: int, liquidity: int) -> Tuple[int, int]: if sqrt_ratio_a > sqrt_ratio_b: sqrt_ratio_a, sqrt_ratio_b = sqrt_ratio_b, sqrt_ratio_a amount0 = 0 amount1 = 0 Q96 = 1 << 96 # Calculations performed in high-precision integer math (EVM style) if sqrt_ratio_current <= sqrt_ratio_a: amount0 = (liquidity * Q96 // sqrt_ratio_a) - (liquidity * Q96 // sqrt_ratio_b) amount1 = 0 elif sqrt_ratio_current < sqrt_ratio_b: amount0 = (liquidity * Q96 // sqrt_ratio_current) - (liquidity * Q96 // sqrt_ratio_b) amount1 = (liquidity * (sqrt_ratio_current - sqrt_ratio_a)) // Q96 else: amount1 = (liquidity * (sqrt_ratio_b - sqrt_ratio_a)) // Q96 amount0 = 0 return amount0, amount1 # --- CORE LOGIC --- def get_position_details(w3: Web3, npm_contract, factory_contract, token_id: int): try: # Check ownership first to avoid errors? positions() works regardless of owner usually. position_data = npm_contract.functions.positions(token_id).call() (nonce, operator, token0_address, token1_address, fee, tickLower, tickUpper, liquidity, feeGrowthInside0, feeGrowthInside1, tokensOwed0, tokensOwed1) = position_data token0_contract = w3.eth.contract(address=token0_address, abi=ERC20_ABI) token1_contract = w3.eth.contract(address=token1_address, abi=ERC20_ABI) # Multi-call optimization could be used here, but keeping simple for now token0_symbol = token0_contract.functions.symbol().call() token1_symbol = token1_contract.functions.symbol().call() token0_decimals = token0_contract.functions.decimals().call() token1_decimals = token1_contract.functions.decimals().call() pool_address = factory_contract.functions.getPool(token0_address, token1_address, fee).call() if pool_address == '0x0000000000000000000000000000000000000000': return None, None pool_abi = AERODROME_POOL_ABI if "AERODROME" in CONFIG.get("NAME", "").upper() else UNISWAP_V3_POOL_ABI pool_contract = w3.eth.contract(address=pool_address, abi=pool_abi) return { "token0_address": token0_address, "token1_address": token1_address, "token0_symbol": token0_symbol, "token1_symbol": token1_symbol, "token0_decimals": token0_decimals, "token1_decimals": token1_decimals, "fee": fee, "tickLower": tickLower, "tickUpper": tickUpper, "liquidity": liquidity, "pool_address": pool_address }, pool_contract except Exception as e: logger.error(f"❌ Error fetching position details for ID {token_id}: {e}") return None, None def get_pool_dynamic_data(pool_contract) -> Optional[Dict[str, Any]]: try: slot0 = pool_contract.functions.slot0().call() return {"sqrtPriceX96": slot0[0], "tick": slot0[1]} except Exception as e: logger.error(f"❌ Pool data fetch failed: {e}") return None def calculate_mint_amounts(current_tick, tick_lower, tick_upper, investment_value_token1: Decimal, decimals0, decimals1, sqrt_price_current_x96) -> Tuple[int, int]: """ Calculates required token amounts for a target investment value. Uses precise Decimal math. """ sqrt_price_current = get_sqrt_ratio_at_tick(current_tick) sqrt_price_lower = get_sqrt_ratio_at_tick(tick_lower) sqrt_price_upper = get_sqrt_ratio_at_tick(tick_upper) # Price of T0 in T1 price_t0_in_t1 = price_from_sqrt_price_x96(sqrt_price_current_x96, decimals0, decimals1) # Calculate amounts for a "Test" liquidity amount L_test = 1 << 128 amt0_test_wei, amt1_test_wei = get_amounts_for_liquidity(sqrt_price_current, sqrt_price_lower, sqrt_price_upper, L_test) amt0_test = Decimal(amt0_test_wei) / Decimal(10**decimals0) amt1_test = Decimal(amt1_test_wei) / Decimal(10**decimals1) # Value in Token1 terms value_test = (amt0_test * price_t0_in_t1) + amt1_test if value_test <= 0: return 0, 0 scale = investment_value_token1 / value_test final_amt0_wei = int(Decimal(amt0_test_wei) * scale) final_amt1_wei = int(Decimal(amt1_test_wei) * scale) return final_amt0_wei, final_amt1_wei def ensure_allowance(w3: Web3, account: LocalAccount, token_address: str, spender_address: str, amount_needed: int) -> bool: """ Checks if allowance is sufficient, approves if not. """ try: token_c = w3.eth.contract(address=token_address, abi=ERC20_ABI) allowance = token_c.functions.allowance(account.address, spender_address).call() if allowance >= amount_needed: return True logger.info(f"🔓 Approving {token_address} for {spender_address}...") # Some tokens (USDT) fail if approving from non-zero to non-zero. # Safe practice: Approve 0 first if allowance > 0, then new amount. if allowance > 0: send_transaction_robust(w3, account, token_c.functions.approve(spender_address, 0), extra_msg="Reset Allowance") # Approve receipt = send_transaction_robust( w3, account, token_c.functions.approve(spender_address, amount_needed), extra_msg=f"Approve {token_address}" ) return receipt is not None except Exception as e: logger.error(f"❌ Allowance check/approve failed: {e}") return False def check_and_swap_for_deposit(w3: Web3, router_contract, account: LocalAccount, token0: str, token1: str, amount0_needed: int, amount1_needed: int, sqrt_price_x96: int, d0: int, d1: int) -> bool: """ Checks balances, wraps ETH if needed, and swaps ONLY the required surplus to meet deposit requirements. Uses universal_swapper for the swap execution. """ token0 = clean_address(token0) token1 = clean_address(token1) token0_c = w3.eth.contract(address=token0, abi=ERC20_ABI) token1_c = w3.eth.contract(address=token1, abi=ERC20_ABI) bal0 = token0_c.functions.balanceOf(account.address).call() bal1 = token1_c.functions.balanceOf(account.address).call() # Calculate Deficits deficit0 = max(0, amount0_needed - bal0) deficit1 = max(0, amount1_needed - bal1) weth_lower = WETH_ADDRESS.lower() # --- AUTO WRAP ETH --- if (deficit0 > 0 and token0.lower() == weth_lower) or (deficit1 > 0 and token1.lower() == weth_lower): eth_bal = w3.eth.get_balance(account.address) # Keep 0.01 ETH for gas gas_reserve = Web3.to_wei(0.01, 'ether') available_eth = max(0, eth_bal - gas_reserve) wrap_needed = 0 if token0.lower() == weth_lower: wrap_needed += deficit0 if token1.lower() == weth_lower: wrap_needed += deficit1 amount_to_wrap = min(wrap_needed, available_eth) if amount_to_wrap > 0: logger.info(f"🌯 Wrapping {Web3.from_wei(amount_to_wrap, 'ether')} ETH...") weth_c = w3.eth.contract(address=WETH_ADDRESS, abi=WETH9_ABI) receipt = send_transaction_robust(w3, account, weth_c.functions.deposit(), value=amount_to_wrap, extra_msg="Wrap ETH") if receipt: # Refresh Balances bal0 = token0_c.functions.balanceOf(account.address).call() bal1 = token1_c.functions.balanceOf(account.address).call() deficit0 = max(0, amount0_needed - bal0) deficit1 = max(0, amount1_needed - bal1) if deficit0 == 0 and deficit1 == 0: return True # --- SWAP SURPLUS --- # Smart Swap: Calculate exactly how much we need to swap # Price of Token0 in terms of Token1 price_0_in_1 = price_from_sqrt_price_x96(sqrt_price_x96, d0, d1) chain_name = DEX_TO_CHAIN.get(os.environ.get("TARGET_DEX", "UNISWAP_V3"), "ARBITRUM") token_in_sym, token_out_sym = None, None amount_in_float = 0.0 buffer_multiplier = Decimal("1.03") if deficit0 > 0 and bal1 > amount1_needed: # Need T0 (ETH), Have extra T1 (USDC) # Swap T1 -> T0 # Cost in T1 = Deficit0 * Price(T0 in T1) cost_in_t1 = Decimal(deficit0) / Decimal(10**d0) * price_0_in_1 # Convert back to T1 Wei and apply buffer amount_in_needed = int(cost_in_t1 * Decimal(10**d1) * buffer_multiplier) surplus1 = bal1 - amount1_needed if surplus1 >= amount_in_needed: # Get Symbols token_in_sym = token1_c.functions.symbol().call().upper() token_out_sym = token0_c.functions.symbol().call().upper() amount_in_float = float(Decimal(amount_in_needed) / Decimal(10**d1)) logger.info(f"🧮 Calc: Need {deficit0} T0. Cost ~{amount_in_needed} T1. Surplus: {surplus1}") else: logger.warning(f"❌ Insufficient Surplus T1. Need {amount_in_needed}, Have {surplus1}") elif deficit1 > 0 and bal0 > amount0_needed: # Need T1 (USDC), Have extra T0 (ETH) # Swap T0 -> T1 # Cost in T0 = Deficit1 / Price(T0 in T1) if price_0_in_1 > 0: cost_in_t0 = (Decimal(deficit1) / Decimal(10**d1)) / price_0_in_1 amount_in_needed = int(cost_in_t0 * Decimal(10**d0) * buffer_multiplier) surplus0 = bal0 - amount0_needed if surplus0 >= amount_in_needed: token_in_sym = token0_c.functions.symbol().call().upper() token_out_sym = token1_c.functions.symbol().call().upper() amount_in_float = float(Decimal(amount_in_needed) / Decimal(10**d0)) logger.info(f"🧮 Calc: Need {deficit1} T1. Cost ~{amount_in_needed} T0. Surplus: {surplus0}") else: logger.warning(f"❌ Insufficient Surplus T0. Need {amount_in_needed}, Have {surplus0}") if token_in_sym and amount_in_float > 0: logger.info(f"🔄 Delegating Swap to Universal Swapper: {amount_in_float} {token_in_sym} -> {token_out_sym} on {chain_name}...") try: # Use Standard Fee (500) if configured fee is weird (like 1 for Aerodrome tickSpacing) # This ensures the standard router finds a valid pool (WETH/USDC 0.05%) swap_fee = POOL_FEE if POOL_FEE >= 100 else 500 # Call Universal Swapper execute_swap(chain_name, token_in_sym, token_out_sym, amount_in_float, fee_tier=swap_fee) # Wait for node indexing logger.info("⏳ Waiting for balance update...") time.sleep(2) # Retry check loop for i in range(3): bal0 = token0_c.functions.balanceOf(account.address).call() bal1 = token1_c.functions.balanceOf(account.address).call() if bal0 >= amount0_needed and bal1 >= amount1_needed: logger.info("✅ Balances sufficient.") return True if i < 2: logger.info(f"⏳ Balance not updated yet, retrying ({i+1}/3)...") time.sleep(2) logger.warning(f"⚠️ Swap executed but still short? T0: {bal0}/{amount0_needed}, T1: {bal1}/{amount1_needed}") return False except Exception as e: logger.error(f"❌ Universal Swap Failed: {e}") return False logger.warning(f"❌ Insufficient funds (No suitable swap found). T0: {bal0}/{amount0_needed}, T1: {bal1}/{amount1_needed}") return False def mint_new_position(w3: Web3, npm_contract, account: LocalAccount, token0: str, token1: str, amount0: int, amount1: int, tick_lower: int, tick_upper: int, d0: int, d1: int) -> Optional[Dict]: """ Approves tokens and mints a new V3 position. """ logger.info("🚀 Minting new position...") # 1. Approve if not ensure_allowance(w3, account, token0, NONFUNGIBLE_POSITION_MANAGER_ADDRESS, amount0): return None if not ensure_allowance(w3, account, token1, NONFUNGIBLE_POSITION_MANAGER_ADDRESS, amount1): return None # 2. Calculate Min Amounts (Slippage Protection) # Using 0.5% slippage tolerance amount0_min = int(Decimal(amount0) * (Decimal(1) - SLIPPAGE_TOLERANCE)) amount1_min = int(Decimal(amount1) * (Decimal(1) - SLIPPAGE_TOLERANCE)) # 3. Mint base_params = [ token0, token1, POOL_FEE, tick_lower, tick_upper, amount0, amount1, amount0_min, amount1_min, account.address, int(time.time()) + 180 ] # Aerodrome Slipstream expects sqrtPriceX96 as the last parameter if "AERODROME" in os.environ.get("TARGET_DEX", "").upper(): base_params.append(0) # sqrtPriceX96 params = tuple(base_params) receipt = send_transaction_robust(w3, account, npm_contract.functions.mint(params), extra_msg="Mint Position") if receipt and receipt.status == 1: # Parse Logs try: # Transfer Event (Topic0) transfer_topic = Web3.keccak(text="Transfer(address,address,uint256)").hex() # IncreaseLiquidity Event (Topic0) increase_liq_topic = Web3.keccak(text="IncreaseLiquidity(uint256,uint128,uint256,uint256)").hex() minted_data = {'token_id': None, 'liquidity': 0, 'amount0': 0, 'amount1': 0} for log in receipt.logs: topics = [t.hex() for t in log['topics']] # Capture Token ID if topics[0] == transfer_topic: if "0000000000000000000000000000000000000000" in topics[1]: minted_data['token_id'] = int(topics[3], 16) # Capture Amounts if topics[0] == increase_liq_topic: # decoding data: liquidity(uint128), amount0(uint256), amount1(uint256) # data is a single hex string, we need to decode it data = log['data'].hex() if data.startswith('0x'): data = data[2:] # liquidity is first 32 bytes (padded), amt0 next 32, amt1 next 32 minted_data['liquidity'] = int(data[0:64], 16) minted_data['amount0'] = int(data[64:128], 16) minted_data['amount1'] = int(data[128:192], 16) if minted_data['token_id']: # Format for Log using actual decimals fmt_amt0 = Decimal(minted_data['amount0']) / Decimal(10**d0) fmt_amt1 = Decimal(minted_data['amount1']) / Decimal(10**d1) logger.info(f"✅ POSITION OPENED | ID: {minted_data['token_id']} | Deposited: {fmt_amt0:.6f} + {fmt_amt1:.6f}") # --- VERIFY TICKS ON-CHAIN --- try: pos_data = npm_contract.functions.positions(minted_data['token_id']).call() # pos_data structure: nonce, operator, t0, t1, fee, tickLower, tickUpper, ... minted_data['tick_lower'] = pos_data[5] minted_data['tick_upper'] = pos_data[6] logger.info(f"🔗 Verified Ticks: {minted_data['tick_lower']} <-> {minted_data['tick_upper']}") except Exception as e: logger.warning(f"⚠️ Could not verify ticks immediately: {e}") # Fallback to requested ticks if fetch fails minted_data['tick_lower'] = tick_lower minted_data['tick_upper'] = tick_upper return minted_data except Exception as e: logger.warning(f"Minted but failed to parse details: {e}") return None def decrease_liquidity(w3: Web3, npm_contract, account: LocalAccount, token_id: int, liquidity: int, d0: int, d1: int) -> bool: if liquidity == 0: return True logger.info(f"📉 Decreasing Liquidity for {token_id}...") params = ( token_id, liquidity, 0, 0, # amountMin0, amountMin1 int(time.time()) + 180 ) receipt = send_transaction_robust(w3, account, npm_contract.functions.decreaseLiquidity(params), extra_msg=f"Decrease Liq {token_id}") if receipt and receipt.status == 1: try: # Parse DecreaseLiquidity Event decrease_topic = Web3.keccak(text="DecreaseLiquidity(uint256,uint128,uint256,uint256)").hex() amt0, amt1 = 0, 0 for log in receipt.logs: topics = [t.hex() for t in log['topics']] if topics[0] == decrease_topic: # Check tokenID (topic 1) if int(topics[1], 16) == token_id: data = log['data'].hex()[2:] # liquidity (32), amt0 (32), amt1 (32) amt0 = int(data[64:128], 16) amt1 = int(data[128:192], 16) break fmt_amt0 = Decimal(amt0) / Decimal(10**d0) fmt_amt1 = Decimal(amt1) / Decimal(10**d1) logger.info(f"📉 POSITION CLOSED (Liquidity Removed) | ID: {token_id} | Withdrawn: {fmt_amt0:.6f} + {fmt_amt1:.6f}") except Exception as e: logger.warning(f"Closed but failed to parse details: {e}") return True return False def collect_fees(w3: Web3, npm_contract, account: LocalAccount, token_id: int) -> bool: logger.info(f"💰 Collecting Fees for {token_id}...") max_val = 2**128 - 1 params = ( token_id, account.address, max_val, max_val ) receipt = send_transaction_robust(w3, account, npm_contract.functions.collect(params), extra_msg=f"Collect Fees {token_id}") return receipt is not None # --- STATE MANAGEMENT --- def load_status_data() -> List[Dict]: if not os.path.exists(STATUS_FILE): return [] try: with open(STATUS_FILE, 'r') as f: return json.load(f) except: return [] def save_status_data(data: List[Dict]): with open(STATUS_FILE, 'w') as f: json.dump(data, f, indent=2) def update_position_status(token_id: int, status: str, extra_data: Dict = {}): data = load_status_data() # Find existing or create new entry = next((item for item in data if item.get('token_id') == token_id), None) if not entry: if status in ["OPEN", "PENDING_HEDGE"]: entry = {"type": "AUTOMATIC", "token_id": token_id} data.append(entry) else: return # Can't update non-existent position unless opening entry['status'] = status entry.update(extra_data) if status == "CLOSED": now = datetime.now() entry['timestamp_close'] = int(now.timestamp()) entry['time_close'] = now.strftime("%d.%m.%y %H:%M:%S") save_status_data(data) logger.info(f"💾 Updated Position {token_id} status to {status}") import argparse import requests # --- REAL-TIME ORACLE HELPER --- def get_realtime_price(coin: str) -> Optional[Decimal]: """Fetches current mid-price directly from Hyperliquid API (low latency).""" try: url = "https://api.hyperliquid.xyz/info" response = requests.post(url, json={"type": "allMids"}, timeout=2) if response.status_code == 200: data = response.json() # Hyperliquid symbols are usually clean (ETH, BNB) # Map common variations just in case target = coin.upper().replace("WETH", "ETH").replace("WBNB", "BNB") if target in data: return Decimal(data[target]) except Exception as e: logger.warning(f"⚠️ Failed to fetch realtime Oracle price: {e}") return None # --- MAIN LOOP --- def main(): # --- ARGUMENT PARSING --- parser = argparse.ArgumentParser(description="Uniswap CLP Manager") parser.add_argument("--force", type=float, help="Force open a position with specific range width (e.g., 0.75), ignoring AUTO safe checks.") args = parser.parse_args() force_mode_active = False force_width_pct = Decimal("0") if args.force: force_mode_active = True force_width_pct = Decimal(str(args.force)) / 100 # Convert 0.75 -> 0.0075 logger.warning(f"🚨 FORCE MODE ACTIVE: Will bypass safe checks for FIRST position with width {args.force}%") logger.info(f"🔷 {CONFIG['NAME']} Manager V2 Starting...") load_dotenv(override=True) # Dynamically load the RPC based on DEX Profile rpc_url = os.environ.get(CONFIG["RPC_ENV_VAR"]) private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY") or os.environ.get("PRIVATE_KEY") if not rpc_url or not private_key: logger.error("❌ Missing RPC or Private Key in .env") return w3 = Web3(Web3.HTTPProvider(rpc_url)) if not w3.is_connected(): logger.error("❌ Could not connect to RPC") return # FIX: Inject POA middleware for BNB Chain/Polygon/etc. (Web3.py v6+) w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) account = Account.from_key(private_key) logger.info(f"👤 Wallet: {account.address}") # Contracts target_dex_name = os.environ.get("TARGET_DEX", "").upper() if "AERODROME" in target_dex_name or "AERODROME" in CONFIG.get("NAME", "").upper(): logger.info("✈️ Using Aerodrome NPM ABI") npm = w3.eth.contract(address=clean_address(NONFUNGIBLE_POSITION_MANAGER_ADDRESS), abi=AERODROME_NPM_ABI) else: npm = w3.eth.contract(address=clean_address(NONFUNGIBLE_POSITION_MANAGER_ADDRESS), abi=NONFUNGIBLE_POSITION_MANAGER_ABI) factory_addr = npm.functions.factory().call() # Select Factory ABI based on DEX type if "AERODROME" in target_dex_name or "AERODROME" in CONFIG.get("NAME", "").upper(): logger.info("✈️ Using Aerodrome Factory ABI (tickSpacing instead of fee)") factory_abi = AERODROME_FACTORY_ABI else: factory_abi = UNISWAP_V3_FACTORY_ABI factory = w3.eth.contract(address=factory_addr, abi=factory_abi) router = w3.eth.contract(address=clean_address(UNISWAP_V3_SWAP_ROUTER_ADDRESS), abi=SWAP_ROUTER_ABI) while True: try: status_data = load_status_data() open_positions = [p for p in status_data if p.get('status') == 'OPEN'] active_auto_pos = next((p for p in open_positions if p.get('type') == 'AUTOMATIC'), None) if active_auto_pos: token_id = active_auto_pos['token_id'] pos_details, pool_c = get_position_details(w3, npm, factory, token_id) if pos_details: pool_data = get_pool_dynamic_data(pool_c) current_tick = pool_data['tick'] # Check Range tick_lower = pos_details['tickLower'] tick_upper = pos_details['tickUpper'] in_range = tick_lower <= current_tick < tick_upper # Calculate Prices for logging price_0_in_1 = price_from_tick(current_tick, pos_details['token0_decimals'], pos_details['token1_decimals']) # --- SMART STABLE DETECTION --- # Determine which token is the "Stable" side to anchor USD value stable_symbols = ["USDC", "USDT", "DAI", "FDUSD", "USDS"] is_t1_stable = any(s in pos_details['token1_symbol'].upper() for s in stable_symbols) is_t0_stable = any(s in pos_details['token0_symbol'].upper() for s in stable_symbols) if is_t1_stable: # Standard: T0=Volatile, T1=Stable. Price = T1 per T0 current_price = price_0_in_1 lower_price = price_from_tick(tick_lower, pos_details['token0_decimals'], pos_details['token1_decimals']) upper_price = price_from_tick(tick_upper, pos_details['token0_decimals'], pos_details['token1_decimals']) elif is_t0_stable: # Inverted: T0=Stable, T1=Volatile. Price = T0 per T1 # We want Price of T1 in terms of T0 current_price = Decimal("1") / price_0_in_1 lower_price = Decimal("1") / price_from_tick(tick_upper, pos_details['token0_decimals'], pos_details['token1_decimals']) upper_price = Decimal("1") / price_from_tick(tick_lower, pos_details['token0_decimals'], pos_details['token1_decimals']) else: # Fallback to T1 current_price = price_0_in_1 lower_price = price_from_tick(tick_lower, pos_details['token0_decimals'], pos_details['token1_decimals']) upper_price = price_from_tick(tick_upper, pos_details['token0_decimals'], pos_details['token1_decimals']) # --- RANGE DISPLAY --- # Calculate ranges from ticks for display purposes real_range_lower = round(float(lower_price), 4) real_range_upper = round(float(upper_price), 4) status_msg = "✅ IN RANGE" if in_range else "⚠️ OUT OF RANGE" # Calculate Unclaimed Fees (Simulation) unclaimed0, unclaimed1, total_fees_usd = 0, 0, 0 try: # Call collect with zero address to simulate fee estimation fees_sim = npm.functions.collect((token_id, "0x0000000000000000000000000000000000000000", 2**128-1, 2**128-1)).call({'from': account.address}) u0 = to_decimal(fees_sim[0], pos_details['token0_decimals']) u1 = to_decimal(fees_sim[1], pos_details['token1_decimals']) if is_t1_stable: total_fees_usd = (u0 * current_price) + u1 else: total_fees_usd = u0 + (u1 * current_price) except Exception as e: logger.debug(f"Fee simulation failed for {token_id}: {e}") # Calculate Total PnL (Fees + Price Appreciation/Depreciation) # We need the initial investment value (target_value) initial_value = Decimal(str(active_auto_pos.get('target_value', 0))) curr_amt0_wei, curr_amt1_wei = get_amounts_for_liquidity( pool_data['sqrtPriceX96'], get_sqrt_ratio_at_tick(tick_lower), get_sqrt_ratio_at_tick(tick_upper), pos_details['liquidity'] ) curr_amt0 = Decimal(curr_amt0_wei) / Decimal(10**pos_details['token0_decimals']) curr_amt1 = Decimal(curr_amt1_wei) / Decimal(10**pos_details['token1_decimals']) if is_t1_stable: current_pos_value_usd = (curr_amt0 * current_price) + curr_amt1 else: current_pos_value_usd = curr_amt0 + (curr_amt1 * current_price) pnl_unrealized = current_pos_value_usd - initial_value total_pnl_usd = pnl_unrealized + total_fees_usd # --- PERSIST PERFORMANCE TO JSON --- update_position_status(token_id, "OPEN", { "clp_fees": round(float(total_fees_usd), 2), "clp_TotPnL": round(float(total_pnl_usd), 2) }) # Calculate Fees/h fees_per_h_str = "0.00" ts_open = active_auto_pos.get('timestamp_open') if ts_open: hours_open = (time.time() - ts_open) / 3600 if hours_open > 0.01: fees_per_h_str = f"{float(total_fees_usd) / hours_open:.2f}" pnl_text = f" | TotPnL: ${total_pnl_usd:.2f} (Fees: ${total_fees_usd:.2f} | ${fees_per_h_str}/h)" logger.info(f"Position {token_id}: {status_msg} | Price: {current_price:.4f} [{lower_price:.4f} - {upper_price:.4f}]{pnl_text}") # --- KPI LOGGING --- if log_kpi_snapshot: snapshot = { 'initial_eth': active_auto_pos.get('amount0_initial', 0), 'initial_usdc': active_auto_pos.get('amount1_initial', 0), 'initial_hedge_usdc': INITIAL_HEDGE_CAPITAL_USDC, 'current_eth_price': float(current_price), 'uniswap_pos_value_usd': float(current_pos_value_usd), 'uniswap_fees_claimed_usd': 0.0, # Not tracked accumulated yet in JSON, using Unclaimed mainly 'uniswap_fees_unclaimed_usd': float(total_fees_usd), # Hedge Data (from JSON updated by clp_hedger) 'hedge_equity_usd': float(active_auto_pos.get('hedge_equity_usd', 0.0)), 'hedge_pnl_realized_usd': active_auto_pos.get('hedge_pnl_realized', 0.0), 'hedge_fees_paid_usd': active_auto_pos.get('hedge_fees_paid', 0.0) } # We use 'target_value' as a proxy for 'Initial Hedge Equity' + 'Initial Uni Val' if strictly tracking strategy? # For now, let's pass what we have. # To get 'hedge_equity', we ideally need clp_hedger to write it to JSON. # Current implementation of kpi_tracker uses 'hedge_equity' in NAV. # If we leave it 0, NAV will be underreported. # WORKAROUND: Assume Hedge PnL Realized IS the equity change if we ignore margin. log_kpi_snapshot(snapshot) # --- REPOSITION LOGIC --- pos_range_mode = active_auto_pos.get("range_mode", RANGE_MODE) if pos_range_mode == "AUTO" and CLOSE_POSITION_ENABLED: coin_for_dynamic = pos_details['token0_symbol'] if not is_t0_stable else pos_details['token1_symbol'] new_range_width = calculate_dynamic_range_pct(coin_for_dynamic) if new_range_width: # Use initial width from JSON, or current config width as fallback old_range_width = Decimal(str(active_auto_pos.get("range_width_initial", RANGE_WIDTH_PCT))) # Condition A: Difference > 20% width_diff_pct = abs(new_range_width - old_range_width) / old_range_width # Condition B: Profit > 0.1% profit_pct = total_pnl_usd / initial_value logger.info(f"📊 AUTO Check: CurRange {old_range_width*100:.2f}%, NewRange {new_range_width*100:.2f}% | Diff {width_diff_pct*100:.1f}% | Profit {profit_pct*100:.2f}%") if width_diff_pct > 0.20 and profit_pct > 0.001: logger.warning(f"🔄 REPOSITION TRIGGERED: Width Diff {width_diff_pct*100:.1f}%, Profit {profit_pct*100:.2f}%") # Set in_range to False to force the closing logic below in_range = False else: logger.warning(f"⚖️ AUTO Check Skipped: Market indicators for {coin_for_dynamic} are stale or conditions not met.") if not in_range and CLOSE_POSITION_ENABLED: logger.warning(f"🛑 Closing Position {token_id} (Out of Range)") update_position_status(token_id, "CLOSING") # 1. Remove Liquidity if decrease_liquidity(w3, npm, account, token_id, pos_details['liquidity'], pos_details['token0_decimals'], pos_details['token1_decimals']): # 2. Collect Fees collect_fees(w3, npm, account, token_id) update_position_status(token_id, "CLOSED") # 3. Optional Rebalance (Sell 50% WETH if fell below) if REBALANCE_ON_CLOSE_BELOW_RANGE and current_tick < tick_lower: # Simple rebalance logic here (similar to original check_and_swap surplus logic) pass elif OPEN_POSITION_ENABLED: logger.info("🔍 No active position. Analyzing market (Fast scan: 37s)...") # Setup logic for new position tA = clean_address(WETH_ADDRESS) tB = clean_address(USDC_ADDRESS) if tA.lower() < tB.lower(): token0, token1 = tA, tB else: token0, token1 = tB, tA fee = POOL_FEE pool_addr = factory.functions.getPool(token0, token1, fee).call() pool_abi = AERODROME_POOL_ABI if "AERODROME" in CONFIG.get("NAME", "").upper() else UNISWAP_V3_POOL_ABI pool_c = w3.eth.contract(address=pool_addr, abi=pool_abi) pool_data = get_pool_dynamic_data(pool_c) if pool_data: tick = pool_data['tick'] # --- PRE-CALCULATE ESSENTIALS --- # Fetch Decimals & Symbols immediately (Required for Oracle Check) t0_c = w3.eth.contract(address=token0, abi=ERC20_ABI) t1_c = w3.eth.contract(address=token1, abi=ERC20_ABI) d0 = t0_c.functions.decimals().call() d1 = t1_c.functions.decimals().call() t0_sym = t0_c.functions.symbol().call().upper() t1_sym = t1_c.functions.symbol().call().upper() stable_symbols = ["USDC", "USDT", "DAI", "FDUSD", "USDS"] is_t1_stable = any(s in t1_sym for s in stable_symbols) is_t0_stable = any(s in t0_sym for s in stable_symbols) price_0_in_1 = price_from_sqrt_price_x96(pool_data['sqrtPriceX96'], d0, d1) # Define coin_sym early for Guard Rails coin_sym = CONFIG.get("COIN_SYMBOL", "ETH") # --- ORACLE GUARD RAIL --- # Protect against Pool/Oracle divergence (Manipulation/Depeg/Lag) if not force_mode_active: oracle_price = get_realtime_price(coin_sym) if oracle_price: pool_price_dec = price_0_in_1 if is_t1_stable else (Decimal("1") / price_0_in_1) divergence = abs(pool_price_dec - oracle_price) / oracle_price if divergence > Decimal("0.0025"): # 0.25% Tolerance logger.warning(f"⚠️ Price Divergence! Pool: {pool_price_dec:.2f} vs Oracle: {oracle_price:.2f} (Diff: {divergence*100:.2f}%). Aborting.") time.sleep(10) continue else: logger.warning("⚠️ Could not fetch Oracle price. Proceeding with caution (or consider aborting).") # --- DYNAMIC RANGE CALCULATION --- active_range_width = RANGE_WIDTH_PCT current_range_mode = RANGE_MODE # 1. PRIORITY: Force Mode if force_mode_active: logger.warning(f"🚨 FORCE OVERRIDE: Using forced width {force_width_pct*100:.2f}% (Ignoring safe checks)") active_range_width = force_width_pct current_range_mode = "FIXED" # 2. AUTO Mode (Only if not forced) elif RANGE_MODE == "AUTO": dynamic_width = calculate_dynamic_range_pct(coin_sym) if dynamic_width: active_range_width = dynamic_width logger.info(f"⚖️ AUTO Range Activated: {active_range_width*100:.4f}%") else: logger.info(f"⛔ AUTO conditions not met. Waiting for safe entry...") time.sleep(MONITOR_INTERVAL_SECONDS) continue # Skip logic # 3. FIXED Mode (Default Fallback) is already set by initial active_range_width # Define Range tick_delta = int(math.log(1 + float(active_range_width)) / math.log(1.0001)) # Fetch actual tick spacing from pool tick_spacing = pool_c.functions.tickSpacing().call() logger.info(f"📏 Tick Spacing: {tick_spacing}") tick_lower = (tick - tick_delta) // tick_spacing * tick_spacing tick_upper = (tick + tick_delta) // tick_spacing * tick_spacing # Calculate Amounts # Target Value logic # Determine Investment Value in Token1 terms target_usd = Decimal(str(TARGET_INVESTMENT_VALUE_USDC)) investment_val_token1 = Decimal("0") if str(TARGET_INVESTMENT_VALUE_USDC).upper() == "MAX": # ... (Existing MAX logic needs update too, but skipping for brevity as user uses fixed amount) pass else: if is_t1_stable: # T1 is stable (e.g. ETH/USDC). Target 2000 USD = 2000 Token1. investment_val_token1 = target_usd elif is_t0_stable: # T0 is stable (e.g. USDT/BNB). Target 2000 USD = 2000 Token0. # We need value in Token1. # Price 0 in 1 = (BNB per USDT) approx 0.0012 # Val T1 = Val T0 * Price(0 in 1) investment_val_token1 = target_usd * price_0_in_1 logger.info(f"💱 Converted ${target_usd} -> {investment_val_token1:.4f} {t1_sym} (Price: {price_0_in_1:.6f})") else: # Fallback: Assume T1 is Stable (Dangerous but standard default) logger.warning("⚠️ Could not detect Stable token. Assuming T1 is stable.") investment_val_token1 = target_usd amt0, amt1 = calculate_mint_amounts(tick, tick_lower, tick_upper, investment_val_token1, d0, d1, pool_data['sqrtPriceX96']) if check_and_swap_for_deposit(w3, router, account, token0, token1, amt0, amt1, pool_data['sqrtPriceX96'], d0, d1): # --- STALE DATA PROTECTION (Pre-Mint) --- # Check if price moved significantly during calculation/swap pre_mint_data = get_pool_dynamic_data(pool_c) if pre_mint_data: tick_diff = abs(pre_mint_data['tick'] - pool_data['tick']) # 13 ticks ~ 0.13% price move. Abort if volatile. if tick_diff > 13: logger.warning(f"⚠️ Price moved too much ({tick_diff} ticks) during setup/swap. Aborting mint to prevent bad entry.") time.sleep(5) continue minted = mint_new_position(w3, npm, account, token0, token1, amt0, amt1, tick_lower, tick_upper, d0, d1) if minted: # --- DISABLE FORCE MODE AFTER FIRST MINT --- if force_mode_active: logger.info("🛑 FORCE MODE CONSUMED: Returning to standard AUTO checks for future positions.") force_mode_active = False # --- RE-FETCH PRICE FOR ACCURATE ENTRY DATA (Post-Mint) --- fresh_pool_data = get_pool_dynamic_data(pool_c) if fresh_pool_data: fresh_tick = fresh_pool_data['tick'] price_0_in_1 = price_from_tick(fresh_tick, d0, d1) logger.info(f"🔄 Refreshed Entry Tick: {fresh_tick} (Was: {pool_data['tick']})") else: price_0_in_1 = price_from_tick(pool_data['tick'], d0, d1) fmt_amt0 = float(Decimal(minted['amount0']) / Decimal(10**d0)) fmt_amt1 = float(Decimal(minted['amount1']) / Decimal(10**d1)) if is_t1_stable: entry_price = float(price_0_in_1) actual_value = (fmt_amt0 * entry_price) + fmt_amt1 r_upper = float(price_from_tick(minted['tick_upper'], d0, d1)) r_lower = float(price_from_tick(minted['tick_lower'], d0, d1)) else: # Inverted (T0 is stable) entry_price = float(Decimal("1") / price_0_in_1) actual_value = fmt_amt0 + (fmt_amt1 * entry_price) r_upper = float(Decimal("1") / price_from_tick(minted['tick_lower'], d0, d1)) r_lower = float(Decimal("1") / price_from_tick(minted['tick_upper'], d0, d1)) # Prepare ordered data with specific rounding new_position_data = { "type": "AUTOMATIC", "target_value": round(float(actual_value), 2), "entry_price": round(entry_price, 4), "amount0_initial": round(fmt_amt0, 4), "amount1_initial": round(fmt_amt1, 4), "liquidity": str(minted['liquidity']), "range_upper": round(r_upper, 4), "range_lower": round(r_lower, 4), "token0_decimals": d0, "token1_decimals": d1, "range_mode": current_range_mode, "range_width_initial": float(active_range_width), "timestamp_open": int(time.time()), "time_open": datetime.now().strftime("%d.%m.%y %H:%M:%S") } update_position_status(minted['token_id'], "OPEN", new_position_data) # Dynamic Sleep: 37s if no position, else configured interval sleep_time = MONITOR_INTERVAL_SECONDS if active_auto_pos else 37 time.sleep(sleep_time) except KeyboardInterrupt: logger.info("👋 Exiting...") break except Exception as e: logger.error(f"❌ Main Loop Error: {e}") time.sleep(MONITOR_INTERVAL_SECONDS) if __name__ == "__main__": main()