Files
uniswap_auto_clp/clp_manager.py

1200 lines
55 KiB
Python

import os
import sys
import time
import json
import re
import logging
import math
from decimal import Decimal, getcontext
from datetime import datetime
from typing import Optional, Dict, Tuple, Any, List
from web3 import Web3
from web3.exceptions import TimeExhausted, ContractLogicError
from web3.middleware import ExtraDataToPOAMiddleware # FIX for Web3.py v6+
from eth_account import Account
from eth_account.signers.local import LocalAccount
from dotenv import load_dotenv
# --- IMPORTS FOR KPI ---
# The KPI tracker is optional: when it cannot be imported, `log_kpi_snapshot`
# is set to None. The "not found" warning is emitted at the end of this setup
# section, once `logger` exists. Calling `logging.warning()` at this point
# would implicitly run `logging.basicConfig()` with default settings, which
# turns the explicit fallback `logging.basicConfig(...)` below into a no-op.
try:
    from tools.kpi_tracker import log_kpi_snapshot
except ImportError:
    log_kpi_snapshot = None

# Set Decimal precision high enough for EVM math
getcontext().prec = 60

# --- LOGGING SETUP ---
current_dir = os.path.dirname(os.path.abspath(__file__))
# Make modules that live next to this file importable.
sys.path.append(current_dir)

# Ensure logs directory exists
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)

try:
    # Preferred path: the project's own logging helper builds the logger.
    from logging_utils import setup_logging
    logger = setup_logging("normal", "UNISWAP_MANAGER")
except ImportError:
    # Fallback: plain root configuration plus a named logger.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger("UNISWAP_MANAGER")

# Deferred warning for the optional KPI import above.
if log_kpi_snapshot is None:
    logger.warning("KPI Tracker not found. Performance logging disabled.")
# Custom Filter for Millisecond Unix Timestamp
class UnixMsLogFilter(logging.Filter):
    """Logging filter that stamps every record with its creation time in ms.

    The value is stored on the record as ``unix_ms`` (``record.created``
    multiplied by 1000 and truncated to an int), so a formatter can use
    ``%(unix_ms)d``.
    """

    def filter(self, record):
        """Attach ``unix_ms`` to ``record``; always lets the record through."""
        created_ms = record.created * 1000
        setattr(record, "unix_ms", int(created_ms))
        # This filter only enriches records, it never drops them.
        return True
# Add File Handler
# Attach a UTF-8 file handler to `logger` that writes to
# <log_dir>/uniswap_manager.log and accepts records at INFO level or above.
log_file = os.path.join(log_dir, 'uniswap_manager.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# The filter adds `unix_ms` to each record so the format string below can use it.
file_handler.addFilter(UnixMsLogFilter())
# Every line starts with the millisecond Unix timestamp, followed by the
# human-readable time, logger name, level and message.
formatter = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
from clp_abis import (
NONFUNGIBLE_POSITION_MANAGER_ABI,
UNISWAP_V3_POOL_ABI,
ERC20_ABI,
UNISWAP_V3_FACTORY_ABI,
AERODROME_FACTORY_ABI,
AERODROME_POOL_ABI,
AERODROME_NPM_ABI,
SWAP_ROUTER_ABI,
WETH9_ABI
)
from clp_config import get_current_config, STATUS_FILE
from tools.universal_swapper import execute_swap
# --- GET ACTIVE DEX CONFIG ---
# CONFIG is whatever clp_config.get_current_config() returns; it is read below
# like a dict (`CONFIG.get(...)` / `CONFIG[...]`).
CONFIG = get_current_config()
# Maps a DEX profile key to a chain name. It is looked up with the TARGET_DEX
# environment variable when a swap is delegated to the universal swapper.
DEX_TO_CHAIN = {
    "UNISWAP_V3": "ARBITRUM",
    "UNISWAP_wide": "ARBITRUM",
    "PANCAKESWAP_BNB": "BSC",
    "WETH_CBBTC_BASE": "BASE",
    "UNISWAP_BASE_CL": "BASE",
    "AERODROME_BASE_CL": "BASE",
    "AERODROME_WETH-USDC_008": "BASE"
}
# --- CONFIGURATION FROM STRATEGY ---
# Each value falls back to the default given here when the key is absent.
MONITOR_INTERVAL_SECONDS = CONFIG.get("MONITOR_INTERVAL_SECONDS", 60)
CLOSE_POSITION_ENABLED = CONFIG.get("CLOSE_POSITION_ENABLED", True)
OPEN_POSITION_ENABLED = CONFIG.get("OPEN_POSITION_ENABLED", True)
REBALANCE_ON_CLOSE_BELOW_RANGE = CONFIG.get("REBALANCE_ON_CLOSE_BELOW_RANGE", True)
# NOTE: the config keys differ from the constant names for the next two values
# ("TARGET_INVESTMENT_AMOUNT" and "INITIAL_HEDGE_CAPITAL").
TARGET_INVESTMENT_VALUE_USDC = CONFIG.get("TARGET_INVESTMENT_AMOUNT", 2000)
INITIAL_HEDGE_CAPITAL_USDC = CONFIG.get("INITIAL_HEDGE_CAPITAL", 1000)
# NOTE(review): presumably a fraction (0.01 == 1%), matching how --force is
# divided by 100 in main() — confirm against the strategy config.
RANGE_WIDTH_PCT = CONFIG.get("RANGE_WIDTH_PCT", Decimal("0.01"))
RANGE_MODE = CONFIG.get("RANGE_MODE", "FIXED")
# Used as a fraction: mint minimums are computed as amount * (1 - SLIPPAGE_TOLERANCE).
SLIPPAGE_TOLERANCE = CONFIG.get("SLIPPAGE_TOLERANCE", Decimal("0.02"))
# Passed as the timeout when waiting for a transaction receipt.
TRANSACTION_TIMEOUT_SECONDS = CONFIG.get("TRANSACTION_TIMEOUT_SECONDS", 30)
# --- AUTO RANGE HELPERS ---
def get_market_indicators(file_path: Optional[str] = None) -> Optional[Dict]:
    """Load the market indicators JSON file if it exists and is fresh.

    Args:
        file_path: Location of the indicators file. Defaults to
            ``market_data/indicators.json`` relative to the current working
            directory.

    Returns:
        The parsed JSON document, or ``None`` when the file is missing, has no
        ``last_updated`` field, is older than five minutes, or cannot be
        read or parsed (the latter is logged as an error).
    """
    max_age_seconds = 300  # freshness window (5 minutes)
    if file_path is None:
        file_path = os.path.join("market_data", "indicators.json")
    if not os.path.exists(file_path):
        return None
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Check Freshness (5m)
        last_updated_str = data.get("last_updated")
        if not last_updated_str:
            return None
        last_updated = datetime.fromisoformat(last_updated_str)
        # Compare like with like: a timezone-aware timestamp needs an aware
        # "now" (subtracting naive from aware raises TypeError). With
        # tzinfo=None this is the plain naive local time used before.
        now = datetime.now(last_updated.tzinfo)
        if (now - last_updated).total_seconds() > max_age_seconds:
            logger.warning("⚠️ Market indicators file is stale (>5m).")
            return None
        return data
    except Exception as e:
        logger.error(f"Error reading indicators: {e}")
        return None
def calculate_dynamic_range_pct(coin: str, indicators: Optional[Dict] = None) -> Optional[Decimal]:
    """Derive a range width (as a fraction of price) from the 12h Bollinger band.

    The result is the larger of the distances from the current price to the
    lower and upper band, divided by the current price.

    Args:
        coin: Token symbol; ``WETH``/``WBNB`` are mapped to ``ETH``/``BNB``.
        indicators: Already-loaded indicators document. When omitted it is
            fetched with ``get_market_indicators()``.

    Returns:
        The range fraction, or ``None`` when indicators are unavailable, the
        coin is unknown, the price or the MA 88 lies outside the 12h band, or
        the indicator values are malformed.
    """
    if indicators is None:
        indicators = get_market_indicators()
    if not indicators:
        return None
    # Normalize symbols (Hyperliquid uses ETH, BNB while DEX uses WETH, WBNB)
    symbol_map = {"WETH": "ETH", "WBNB": "BNB"}
    lookup_coin = symbol_map.get(coin.upper(), coin.upper())
    coin_data = indicators.get("data", {}).get(lookup_coin)
    if not coin_data:
        return None
    try:
        price = Decimal(str(coin_data["current_price"]))
        bb12 = coin_data["bb"]["12h"]
        bb_low = Decimal(str(bb12["lower"]))
        bb_high = Decimal(str(bb12["upper"]))
        ma88 = Decimal(str(coin_data["ma"]["88"]))
        # Condition 2: Price inside BB 12h
        if not (bb_low <= price <= bb_high):
            logger.warning(f"⚖️ AUTO: Price {price:.2f} is outside BB 12h ({bb_low:.2f} - {bb_high:.2f}). Skipping AUTO.")
            return None
        # Condition 3: MA 88 inside BB 12h
        if not (bb_low <= ma88 <= bb_high):
            logger.warning(f"⚖️ AUTO: MA 88 {ma88:.2f} is outside BB 12h. Skipping AUTO.")
            return None
        # Calculation: Max distance to BB edge
        dist_low = abs(price - bb_low)
        dist_high = abs(price - bb_high)
        max_dist = max(dist_low, dist_high)
        return max_dist / price
    except (KeyError, TypeError, ValueError, ArithmeticError) as e:
        # ArithmeticError covers decimal.InvalidOperation (unparsable number)
        # and decimal.DivisionByZero (zero price); neither is a ValueError.
        logger.error(f"Error in dynamic range calc: {e}")
        return None
# --- CONFIGURATION CONSTANTS ---
# These keys are read with CONFIG[...] and therefore must exist in the active
# DEX profile; a missing key raises KeyError at import time.
NONFUNGIBLE_POSITION_MANAGER_ADDRESS = CONFIG["NPM_ADDRESS"]
# Router address not strictly needed for Manager if using universal_swapper, but kept for ref
UNISWAP_V3_SWAP_ROUTER_ADDRESS = CONFIG["ROUTER_ADDRESS"]
# The names say WETH/USDC, but the values are generic: the profile's wrapped
# native token address and its TOKEN_B address.
WETH_ADDRESS = CONFIG["WRAPPED_NATIVE_ADDRESS"]
USDC_ADDRESS = CONFIG["TOKEN_B_ADDRESS"]
# Falls back to 500 when the profile does not define POOL_FEE.
POOL_FEE = CONFIG.get("POOL_FEE", 500)
# --- HELPER FUNCTIONS ---
def clean_address(addr: str) -> str:
    """Validate ``addr`` and return it in checksummed form.

    Raises:
        ValueError: If ``Web3.is_address`` rejects ``addr``.
    """
    if Web3.is_address(addr):
        return Web3.to_checksum_address(addr)
    raise ValueError(f"Invalid address: {addr}")
def to_decimal(value: Any, decimals: int = 0) -> Decimal:
    """Convert ``value`` to ``Decimal``, dividing by ``10 ** decimals``.

    A value that already is a ``Decimal`` is returned unchanged; ``decimals``
    is not applied to it.
    """
    if not isinstance(value, Decimal):
        divisor = Decimal(10) ** decimals
        return Decimal(value) / divisor
    return value
def to_wei_int(value: Decimal, decimals: int) -> int:
    """Scale ``value`` up by ``10 ** decimals`` and truncate it to an int."""
    multiplier = Decimal(10) ** decimals
    scaled = value * multiplier
    return int(scaled)
def get_gas_params(w3: Web3) -> Dict[str, int]:
    """Get dynamic gas parameters for EIP-1559.

    Reads ``baseFeePerGas`` from the latest block and the node's suggested
    priority fee (falling back to 0.1 gwei when that value is falsy).

    Returns:
        A dict with ``maxFeePerGas`` and ``maxPriorityFeePerGas`` in wei.
    """
    latest_block = w3.eth.get_block("latest")
    base_fee = latest_block['baseFeePerGas']
    # Priority fee: node suggestion, or 0.1 gwei when none is returned
    max_priority_fee = w3.eth.max_priority_fee or Web3.to_wei(0.1, 'gwei')
    # Max Fee = Base Fee * 1.25 + Priority Fee.
    # Integer arithmetic (x5 // 4) keeps the wei amount exact instead of
    # routing it through a float.
    max_fee = base_fee * 5 // 4 + max_priority_fee
    return {
        'maxFeePerGas': max_fee,
        'maxPriorityFeePerGas': max_priority_fee
    }
def send_transaction_robust(
    w3: Web3,
    account: LocalAccount,
    func_call: Any,
    value: int = 0,
    gas_limit: Optional[int] = None,
    extra_msg: str = ""
) -> Optional[Any]:
    """
    Builds, signs, sends, and waits for a transaction with timeout and status check.

    Args:
        w3: Connected Web3 instance.
        account: Signing account; its address is used as sender and for the nonce.
        func_call: Contract function call object. It is probed with ``hasattr``
            for ``call``/``estimate_gas`` and must provide ``build_transaction``.
        value: Native-token amount (wei) attached to the transaction.
        gas_limit: Explicit gas limit. When omitted, the estimate plus a 20%
            buffer is used (the estimate is still performed either way).
        extra_msg: Label included in every log message for this transaction.

    Returns:
        The receipt when the transaction was mined with ``status == 1``.
        ``None`` when simulation/estimation fails, the transaction reverts,
        the receipt wait times out, or any other exception occurs. On a
        timeout the transaction has already been broadcast.
    """
    try:
        # 1. Prepare Params
        # Use 'pending' to ensure we get the correct nonce if a tx was just sent/mined
        tx_params = {
            'from': account.address,
            'nonce': w3.eth.get_transaction_count(account.address, 'pending'),
            'value': value,
            'chainId': w3.eth.chain_id,
        }
        # 2. Add Gas Params
        gas_fees = get_gas_params(w3)
        tx_params.update(gas_fees)
        # 3. Simulate (Call) & Estimate Gas
        try:
            # If function call object provided
            if hasattr(func_call, 'call'):
                func_call.call({'from': account.address, 'value': value}) # Safety Dry-Run
                estimated_gas = func_call.estimate_gas({'from': account.address, 'value': value})
            else:
                # Raw transaction construction if func_call is just params dict (rare here)
                estimated_gas = 200000
            tx_params['gas'] = gas_limit if gas_limit else int(estimated_gas * 1.2) # 20% buffer
            # Build
            if hasattr(func_call, 'build_transaction'):
                tx = func_call.build_transaction(tx_params)
            else:
                # Caught by the outer `except Exception` and logged there.
                raise ValueError("Invalid function call object")
        except ContractLogicError as e:
            # Only contract reverts are handled here; nothing has been sent yet.
            logger.error(f"❌ Simulation/Estimation failed for {extra_msg}: {e}")
            return None
        # 4. Sign
        signed_tx = account.sign_transaction(tx)
        # 5. Send
        tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
        logger.info(f"📤 Sent {extra_msg} | Hash: {tx_hash.hex()}")
        # 6. Wait for Receipt
        receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=TRANSACTION_TIMEOUT_SECONDS)
        # 7. Verify Status
        if receipt.status == 1:
            logger.info(f"✅ Executed {extra_msg} | Block: {receipt.blockNumber}")
            return receipt
        else:
            logger.error(f"❌ Transaction Reverted {extra_msg} | Hash: {tx_hash.hex()}")
            return None
    except TimeExhausted:
        logger.error(f"⌛ Transaction Timeout {extra_msg} - Check Mempool")
        # In a full production bot, we would implement gas bumping here.
        return None
    except Exception as e:
        logger.error(f"❌ Transaction Error {extra_msg}: {e}")
        return None
def price_from_sqrt_price_x96(sqrt_price_x96: int, token0_decimals: int, token1_decimals: int) -> Decimal:
    """
    Returns price of Token0 in terms of Token1.

    The Q64.96 square-root price is divided by 2**96, squared, and then
    multiplied by 10 ** (token0_decimals - token1_decimals).
    """
    # Price = (T1 / 10^d1) / (T0 / 10^d0) = (T1 / T0) * 10^(d0 - d1)
    decimals_shift = Decimal(10) ** (token0_decimals - token1_decimals)
    ratio = Decimal(sqrt_price_x96) / (Decimal(2) ** 96)
    raw_price = ratio ** 2
    return raw_price * decimals_shift
def price_from_tick(tick: int, token0_decimals: int, token1_decimals: int) -> Decimal:
    """Return ``1.0001 ** tick`` multiplied by ``10 ** (token0_decimals - token1_decimals)``."""
    decimals_shift = Decimal(10) ** (token0_decimals - token1_decimals)
    raw_price = Decimal("1.0001") ** tick
    return raw_price * decimals_shift
def get_sqrt_ratio_at_tick(tick: int) -> int:
    """Return ``sqrt(1.0001 ** tick) * 2**96`` truncated to an integer.

    The computation uses ``Decimal`` with 60 significant digits in a local
    context. Float arithmetic keeps only about 53 significant bits, which is
    far fewer than the bits of a Q64.96 value.

    Args:
        tick: Pool tick (an integer; may be negative).

    Returns:
        The square-root price ratio for ``tick`` in Q64.96 fixed-point form.
    """
    # Local import: the module-level import only brings in Decimal/getcontext.
    from decimal import localcontext

    with localcontext() as ctx:
        # Fixed precision so the result does not depend on the caller's context.
        ctx.prec = 60
        sqrt_ratio = (Decimal("1.0001") ** Decimal(tick)).sqrt()
        return int(sqrt_ratio * (Decimal(2) ** 96))
def get_amounts_for_liquidity(sqrt_ratio_current: int, sqrt_ratio_a: int, sqrt_ratio_b: int, liquidity: int) -> Tuple[int, int]:
    """Return the (amount0, amount1) held by ``liquidity`` within a price range.

    All ratios are Q64.96 integers. ``sqrt_ratio_a`` and ``sqrt_ratio_b`` may
    be given in either order. Everything is computed with integer floor
    division.

    Returns:
        Only token0 when the current ratio is at or below the range, only
        token1 when it is at or above the range, otherwise both.
    """
    lower, upper = sorted((sqrt_ratio_a, sqrt_ratio_b))
    q96 = 1 << 96
    scaled_liquidity = liquidity * q96

    # Current price at/below the range: the position is entirely token0.
    if sqrt_ratio_current <= lower:
        return scaled_liquidity // lower - scaled_liquidity // upper, 0

    # Current price at/above the range: the position is entirely token1.
    if sqrt_ratio_current >= upper:
        return 0, (liquidity * (upper - lower)) // q96

    # Current price inside the range: a mix of both tokens.
    token0_amount = scaled_liquidity // sqrt_ratio_current - scaled_liquidity // upper
    token1_amount = (liquidity * (sqrt_ratio_current - lower)) // q96
    return token0_amount, token1_amount
# --- CORE LOGIC ---
def get_position_details(w3: Web3, npm_contract, factory_contract, token_id: int):
    """Fetch a position's static data and build its pool contract.

    Reads ``positions(token_id)`` from the position manager, the symbol and
    decimals of both tokens, and the pool address from the factory.

    Returns:
        ``(details, pool_contract)`` where ``details`` is a dict with token
        addresses/symbols/decimals, fee, tick bounds, liquidity and the pool
        address. ``(None, None)`` when the factory returns the zero address
        or any call fails (failures are logged).
    """
    try:
        raw_position = npm_contract.functions.positions(token_id).call()
        # The position tuple must have exactly twelve fields; only some are used.
        (_nonce, _operator, addr0, addr1, fee, tick_lo, tick_hi, liq,
         _fee_growth0, _fee_growth1, _owed0, _owed1) = raw_position

        erc20_0 = w3.eth.contract(address=addr0, abi=ERC20_ABI)
        erc20_1 = w3.eth.contract(address=addr1, abi=ERC20_ABI)

        # One RPC call per field (no multicall).
        sym0 = erc20_0.functions.symbol().call()
        sym1 = erc20_1.functions.symbol().call()
        dec0 = erc20_0.functions.decimals().call()
        dec1 = erc20_1.functions.decimals().call()

        pool_address = factory_contract.functions.getPool(addr0, addr1, fee).call()
        if pool_address == '0x0000000000000000000000000000000000000000':
            return None, None

        # The pool ABI depends on whether the active profile name mentions Aerodrome.
        is_aerodrome = "AERODROME" in CONFIG.get("NAME", "").upper()
        pool_abi = AERODROME_POOL_ABI if is_aerodrome else UNISWAP_V3_POOL_ABI
        pool_contract = w3.eth.contract(address=pool_address, abi=pool_abi)

        details = {
            "token0_address": addr0,
            "token1_address": addr1,
            "token0_symbol": sym0,
            "token1_symbol": sym1,
            "token0_decimals": dec0,
            "token1_decimals": dec1,
            "fee": fee,
            "tickLower": tick_lo,
            "tickUpper": tick_hi,
            "liquidity": liq,
            "pool_address": pool_address,
        }
        return details, pool_contract
    except Exception as e:
        logger.error(f"❌ Error fetching position details for ID {token_id}: {e}")
        return None, None
def get_pool_dynamic_data(pool_contract) -> Optional[Dict[str, Any]]:
    """Read ``slot0`` from the pool and return its first two fields.

    Returns:
        ``{"sqrtPriceX96": slot0[0], "tick": slot0[1]}``, or ``None`` when the
        call fails (the failure is logged).
    """
    try:
        state = pool_contract.functions.slot0().call()
        snapshot = {"sqrtPriceX96": state[0], "tick": state[1]}
    except Exception as e:
        logger.error(f"❌ Pool data fetch failed: {e}")
        return None
    return snapshot
def calculate_mint_amounts(current_tick, tick_lower, tick_upper, investment_value_token1: Decimal, decimals0, decimals1, sqrt_price_current_x96) -> Tuple[int, int]:
    """
    Calculates required token amounts for a target investment value.

    The token split is taken from a reference liquidity of ``2**128`` between
    the range bounds, using sqrt ratios derived from the ticks (including
    ``current_tick``). ``sqrt_price_current_x96`` is used only to value token0
    in token1. Both reference amounts are then scaled so their combined value
    equals ``investment_value_token1``.

    Returns:
        ``(amount0_wei, amount1_wei)``, or ``(0, 0)`` when the reference
        position has no positive value.
    """
    # Range geometry, all derived from ticks.
    ratio_now = get_sqrt_ratio_at_tick(current_tick)
    ratio_lo = get_sqrt_ratio_at_tick(tick_lower)
    ratio_hi = get_sqrt_ratio_at_tick(tick_upper)

    # Price of T0 in T1, from the pool's actual sqrt price.
    token0_price = price_from_sqrt_price_x96(sqrt_price_current_x96, decimals0, decimals1)

    # Amounts for an arbitrary reference liquidity; only their ratio matters.
    reference_liquidity = 1 << 128
    ref0_wei, ref1_wei = get_amounts_for_liquidity(ratio_now, ratio_lo, ratio_hi, reference_liquidity)

    ref0_units = Decimal(ref0_wei) / Decimal(10**decimals0)
    ref1_units = Decimal(ref1_wei) / Decimal(10**decimals1)

    # Value of the reference position in Token1 terms.
    reference_value = (ref0_units * token0_price) + ref1_units
    if reference_value <= 0:
        return 0, 0

    factor = investment_value_token1 / reference_value
    return int(Decimal(ref0_wei) * factor), int(Decimal(ref1_wei) * factor)
def ensure_allowance(w3: Web3, account: LocalAccount, token_address: str, spender_address: str, amount_needed: int) -> bool:
    """
    Checks if allowance is sufficient, approves if not.

    When the current allowance is non-zero but too small, a reset-to-zero
    approval is sent first (its result is not checked), followed by an
    approval for exactly ``amount_needed``.

    Returns:
        ``True`` when the allowance already suffices or the approval produced
        a receipt; ``False`` when the approval failed or an exception occurred.
    """
    try:
        erc20 = w3.eth.contract(address=token_address, abi=ERC20_ABI)
        current_allowance = erc20.functions.allowance(account.address, spender_address).call()
        if current_allowance >= amount_needed:
            return True

        logger.info(f"🔓 Approving {token_address} for {spender_address}...")

        # Some tokens (USDT) fail if approving from non-zero to non-zero,
        # so an existing allowance is cleared before the new one is set.
        if current_allowance > 0:
            reset_call = erc20.functions.approve(spender_address, 0)
            send_transaction_robust(w3, account, reset_call, extra_msg="Reset Allowance")

        approve_call = erc20.functions.approve(spender_address, amount_needed)
        approval_receipt = send_transaction_robust(
            w3,
            account,
            approve_call,
            extra_msg=f"Approve {token_address}",
        )
        return approval_receipt is not None
    except Exception as e:
        logger.error(f"❌ Allowance check/approve failed: {e}")
        return False
def check_and_swap_for_deposit(w3: Web3, router_contract, account: LocalAccount, token0: str, token1: str, amount0_needed: int, amount1_needed: int, sqrt_price_x96: int, d0: int, d1: int) -> bool:
    """
    Checks balances, wraps ETH if needed, and swaps ONLY the required surplus to meet deposit requirements.
    Uses universal_swapper for the swap execution.

    Args:
        w3: Connected Web3 instance.
        router_contract: Not referenced in this function body.
        account: Wallet whose balances are checked and which signs the wrap.
        token0, token1: Token addresses (checksummed internally).
        amount0_needed, amount1_needed: Required balances in smallest units.
        sqrt_price_x96: Pool sqrt price used to estimate the swap size.
        d0, d1: Decimals of token0 and token1.

    Returns:
        True when both balances cover the required amounts (initially, after
        wrapping, or after the swap); False otherwise.
    """
    token0 = clean_address(token0)
    token1 = clean_address(token1)
    token0_c = w3.eth.contract(address=token0, abi=ERC20_ABI)
    token1_c = w3.eth.contract(address=token1, abi=ERC20_ABI)
    bal0 = token0_c.functions.balanceOf(account.address).call()
    bal1 = token1_c.functions.balanceOf(account.address).call()
    # Calculate Deficits
    deficit0 = max(0, amount0_needed - bal0)
    deficit1 = max(0, amount1_needed - bal1)
    weth_lower = WETH_ADDRESS.lower()
    # --- AUTO WRAP ETH ---
    # Only attempted when the token that is short is the wrapped native token.
    if (deficit0 > 0 and token0.lower() == weth_lower) or (deficit1 > 0 and token1.lower() == weth_lower):
        eth_bal = w3.eth.get_balance(account.address)
        # Keep 0.01 ETH for gas
        gas_reserve = Web3.to_wei(0.01, 'ether')
        available_eth = max(0, eth_bal - gas_reserve)
        wrap_needed = 0
        if token0.lower() == weth_lower: wrap_needed += deficit0
        if token1.lower() == weth_lower: wrap_needed += deficit1
        # Wrap at most what is available after the gas reserve.
        amount_to_wrap = min(wrap_needed, available_eth)
        if amount_to_wrap > 0:
            logger.info(f"🌯 Wrapping {Web3.from_wei(amount_to_wrap, 'ether')} ETH...")
            weth_c = w3.eth.contract(address=WETH_ADDRESS, abi=WETH9_ABI)
            receipt = send_transaction_robust(w3, account, weth_c.functions.deposit(), value=amount_to_wrap, extra_msg="Wrap ETH")
            if receipt:
                # Refresh Balances
                bal0 = token0_c.functions.balanceOf(account.address).call()
                bal1 = token1_c.functions.balanceOf(account.address).call()
                deficit0 = max(0, amount0_needed - bal0)
                deficit1 = max(0, amount1_needed - bal1)
    if deficit0 == 0 and deficit1 == 0:
        return True
    # --- SWAP SURPLUS ---
    # Smart Swap: Calculate exactly how much we need to swap
    # Price of Token0 in terms of Token1
    price_0_in_1 = price_from_sqrt_price_x96(sqrt_price_x96, d0, d1)
    # Chain for the swapper is chosen from the TARGET_DEX env var (default ARBITRUM).
    chain_name = DEX_TO_CHAIN.get(os.environ.get("TARGET_DEX", "UNISWAP_V3"), "ARBITRUM")
    token_in_sym, token_out_sym = None, None
    amount_in_float = 0.0
    # 3% extra input on top of the spot-price estimate.
    buffer_multiplier = Decimal("1.03")
    # A swap is only planned when exactly one side is short and the other has
    # a surplus; if both sides are short neither branch prepares a swap.
    if deficit0 > 0 and bal1 > amount1_needed:
        # Need T0 (ETH), Have extra T1 (USDC)
        # Swap T1 -> T0
        # Cost in T1 = Deficit0 * Price(T0 in T1)
        cost_in_t1 = Decimal(deficit0) / Decimal(10**d0) * price_0_in_1
        # Convert back to T1 Wei and apply buffer
        amount_in_needed = int(cost_in_t1 * Decimal(10**d1) * buffer_multiplier)
        surplus1 = bal1 - amount1_needed
        if surplus1 >= amount_in_needed:
            # Get Symbols
            token_in_sym = token1_c.functions.symbol().call().upper()
            token_out_sym = token0_c.functions.symbol().call().upper()
            amount_in_float = float(Decimal(amount_in_needed) / Decimal(10**d1))
            logger.info(f"🧮 Calc: Need {deficit0} T0. Cost ~{amount_in_needed} T1. Surplus: {surplus1}")
        else:
            logger.warning(f"❌ Insufficient Surplus T1. Need {amount_in_needed}, Have {surplus1}")
    elif deficit1 > 0 and bal0 > amount0_needed:
        # Need T1 (USDC), Have extra T0 (ETH)
        # Swap T0 -> T1
        # Cost in T0 = Deficit1 / Price(T0 in T1)
        if price_0_in_1 > 0:
            cost_in_t0 = (Decimal(deficit1) / Decimal(10**d1)) / price_0_in_1
            amount_in_needed = int(cost_in_t0 * Decimal(10**d0) * buffer_multiplier)
            surplus0 = bal0 - amount0_needed
            if surplus0 >= amount_in_needed:
                token_in_sym = token0_c.functions.symbol().call().upper()
                token_out_sym = token1_c.functions.symbol().call().upper()
                amount_in_float = float(Decimal(amount_in_needed) / Decimal(10**d0))
                logger.info(f"🧮 Calc: Need {deficit1} T1. Cost ~{amount_in_needed} T0. Surplus: {surplus0}")
            else:
                logger.warning(f"❌ Insufficient Surplus T0. Need {amount_in_needed}, Have {surplus0}")
    if token_in_sym and amount_in_float > 0:
        logger.info(f"🔄 Delegating Swap to Universal Swapper: {amount_in_float} {token_in_sym} -> {token_out_sym} on {chain_name}...")
        try:
            # Use Standard Fee (500) if configured fee is weird (like 1 for Aerodrome tickSpacing)
            # This ensures the standard router finds a valid pool (WETH/USDC 0.05%)
            swap_fee = POOL_FEE if POOL_FEE >= 100 else 500
            # Call Universal Swapper
            # Its return value is not inspected; success is judged by the
            # balance re-check below.
            execute_swap(chain_name, token_in_sym, token_out_sym, amount_in_float, fee_tier=swap_fee)
            # Wait for node indexing
            logger.info("⏳ Waiting for balance update...")
            time.sleep(2)
            # Retry check loop
            # Up to three balance reads, two seconds apart.
            for i in range(3):
                bal0 = token0_c.functions.balanceOf(account.address).call()
                bal1 = token1_c.functions.balanceOf(account.address).call()
                if bal0 >= amount0_needed and bal1 >= amount1_needed:
                    logger.info("✅ Balances sufficient.")
                    return True
                if i < 2:
                    logger.info(f"⏳ Balance not updated yet, retrying ({i+1}/3)...")
                    time.sleep(2)
            logger.warning(f"⚠️ Swap executed but still short? T0: {bal0}/{amount0_needed}, T1: {bal1}/{amount1_needed}")
            return False
        except Exception as e:
            logger.error(f"❌ Universal Swap Failed: {e}")
            return False
    # Reached when no swap could be planned (no usable surplus, or both sides short).
    logger.warning(f"❌ Insufficient funds (No suitable swap found). T0: {bal0}/{amount0_needed}, T1: {bal1}/{amount1_needed}")
    return False
def mint_new_position(w3: Web3, npm_contract, account: LocalAccount, token0: str, token1: str, amount0: int, amount1: int, tick_lower: int, tick_upper: int, d0: int, d1: int) -> Optional[Dict]:
    """
    Approves tokens and mints a new V3 position.

    Args:
        w3: Connected Web3 instance.
        npm_contract: Position manager contract used for ``mint``/``positions``.
        account: Signing account and recipient of the position NFT.
        token0, token1: Token addresses of the pair.
        amount0, amount1: Desired deposit amounts in smallest units.
        tick_lower, tick_upper: Requested range bounds.
        d0, d1: Token decimals, used only to format the log line.

    Returns:
        A dict with ``token_id``, ``liquidity``, ``amount0``, ``amount1``,
        ``tick_lower`` and ``tick_upper`` on success. ``None`` when an
        approval or the mint transaction fails, or when the receipt logs
        cannot be parsed into a token id.
    """
    logger.info("🚀 Minting new position...")
    # 1. Approve
    if not ensure_allowance(w3, account, token0, NONFUNGIBLE_POSITION_MANAGER_ADDRESS, amount0):
        return None
    if not ensure_allowance(w3, account, token1, NONFUNGIBLE_POSITION_MANAGER_ADDRESS, amount1):
        return None

    # 2. Calculate Min Amounts (Slippage Protection)
    # SLIPPAGE_TOLERANCE is a fraction taken from the strategy config.
    amount0_min = int(Decimal(amount0) * (Decimal(1) - SLIPPAGE_TOLERANCE))
    amount1_min = int(Decimal(amount1) * (Decimal(1) - SLIPPAGE_TOLERANCE))

    # 3. Mint
    base_params = [
        token0, token1, POOL_FEE,
        tick_lower, tick_upper,
        amount0, amount1,
        amount0_min, amount1_min,
        account.address,
        int(time.time()) + 180  # deadline: three minutes from now
    ]
    # Aerodrome Slipstream expects sqrtPriceX96 as the last parameter.
    # Detection matches the ABI selection in main(): either the TARGET_DEX
    # env var or the profile name may identify Aerodrome. Checking only one
    # of them can pair the Aerodrome ABI with an 11-field tuple.
    target_dex_name = os.environ.get("TARGET_DEX", "").upper()
    if "AERODROME" in target_dex_name or "AERODROME" in CONFIG.get("NAME", "").upper():
        base_params.append(0)  # sqrtPriceX96
    params = tuple(base_params)

    receipt = send_transaction_robust(w3, account, npm_contract.functions.mint(params), extra_msg="Mint Position")
    if receipt and receipt.status == 1:
        # Parse Logs
        try:
            # Transfer Event (Topic0)
            transfer_topic = Web3.keccak(text="Transfer(address,address,uint256)").hex()
            # IncreaseLiquidity Event (Topic0)
            increase_liq_topic = Web3.keccak(text="IncreaseLiquidity(uint256,uint128,uint256,uint256)").hex()
            minted_data = {'token_id': None, 'liquidity': 0, 'amount0': 0, 'amount1': 0}
            for log in receipt.logs:
                topics = [t.hex() for t in log['topics']]
                if not topics:
                    # Anonymous events carry no signature topic.
                    continue
                # Capture Token ID
                # Only an NFT-style Transfer has the token id as a fourth
                # topic; ERC-20 Transfers (three topics) are skipped. A mint
                # is a Transfer whose `from` topic is the zero address.
                if topics[0] == transfer_topic and len(topics) == 4:
                    if int(topics[1], 16) == 0:
                        minted_data['token_id'] = int(topics[3], 16)
                # Capture Amounts
                if topics[0] == increase_liq_topic:
                    # data layout: liquidity(uint128), amount0(uint256),
                    # amount1(uint256), each padded to 32 bytes.
                    data = log['data'].hex()
                    if data.startswith('0x'):
                        data = data[2:]
                    minted_data['liquidity'] = int(data[0:64], 16)
                    minted_data['amount0'] = int(data[64:128], 16)
                    minted_data['amount1'] = int(data[128:192], 16)

            if minted_data['token_id']:
                # Format for Log using actual decimals
                fmt_amt0 = Decimal(minted_data['amount0']) / Decimal(10**d0)
                fmt_amt1 = Decimal(minted_data['amount1']) / Decimal(10**d1)
                logger.info(f"✅ POSITION OPENED | ID: {minted_data['token_id']} | Deposited: {fmt_amt0:.6f} + {fmt_amt1:.6f}")
                # --- VERIFY TICKS ON-CHAIN ---
                try:
                    pos_data = npm_contract.functions.positions(minted_data['token_id']).call()
                    # pos_data structure: nonce, operator, t0, t1, fee, tickLower, tickUpper, ...
                    minted_data['tick_lower'] = pos_data[5]
                    minted_data['tick_upper'] = pos_data[6]
                    logger.info(f"🔗 Verified Ticks: {minted_data['tick_lower']} <-> {minted_data['tick_upper']}")
                except Exception as e:
                    logger.warning(f"⚠️ Could not verify ticks immediately: {e}")
                    # Fallback to requested ticks if fetch fails
                    minted_data['tick_lower'] = tick_lower
                    minted_data['tick_upper'] = tick_upper
                return minted_data

            # The transaction succeeded but no mint Transfer was recognised;
            # say so instead of returning None silently.
            logger.warning("Minted but no position token ID was found in the receipt logs.")
        except Exception as e:
            logger.warning(f"Minted but failed to parse details: {e}")
    return None
def decrease_liquidity(w3: Web3, npm_contract, account: LocalAccount, token_id: int, liquidity: int, d0: int, d1: int) -> bool:
    """Remove ``liquidity`` from position ``token_id``.

    The call uses zero minimum amounts and a deadline three minutes from now.
    After a successful transaction the withdrawn amounts are read from the
    ``DecreaseLiquidity`` event purely for logging.

    Args:
        w3: Connected Web3 instance.
        npm_contract: Position manager contract.
        account: Signing account.
        token_id: Position NFT id.
        liquidity: Amount of liquidity to remove; ``0`` is a no-op.
        d0, d1: Token decimals, used only to format the log line.

    Returns:
        ``True`` when there was nothing to remove or the transaction succeeded
        (even if log parsing failed); ``False`` when the transaction failed.
    """
    if liquidity == 0:
        return True
    logger.info(f"📉 Decreasing Liquidity for {token_id}...")
    params = (
        token_id,
        liquidity,
        0, 0,  # amountMin0, amountMin1
        int(time.time()) + 180
    )
    receipt = send_transaction_robust(w3, account, npm_contract.functions.decreaseLiquidity(params), extra_msg=f"Decrease Liq {token_id}")
    if receipt and receipt.status == 1:
        try:
            # Parse DecreaseLiquidity Event
            decrease_topic = Web3.keccak(text="DecreaseLiquidity(uint256,uint128,uint256,uint256)").hex()
            amt0, amt1 = 0, 0
            for log in receipt.logs:
                topics = [t.hex() for t in log['topics']]
                # Need the signature topic plus the indexed tokenId topic.
                if len(topics) < 2 or topics[0] != decrease_topic:
                    continue
                # Check tokenID (topic 1)
                if int(topics[1], 16) != token_id:
                    continue
                data = log['data'].hex()
                # Strip the "0x" prefix only when present: depending on the
                # hexbytes version `.hex()` may or may not include it, and an
                # unconditional [2:] shifts every field by one byte.
                if data.startswith('0x'):
                    data = data[2:]
                # liquidity (32), amt0 (32), amt1 (32)
                amt0 = int(data[64:128], 16)
                amt1 = int(data[128:192], 16)
                break
            fmt_amt0 = Decimal(amt0) / Decimal(10**d0)
            fmt_amt1 = Decimal(amt1) / Decimal(10**d1)
            logger.info(f"📉 POSITION CLOSED (Liquidity Removed) | ID: {token_id} | Withdrawn: {fmt_amt0:.6f} + {fmt_amt1:.6f}")
        except Exception as e:
            logger.warning(f"Closed but failed to parse details: {e}")
        return True
    return False
def collect_fees(w3: Web3, npm_contract, account: LocalAccount, token_id: int) -> bool:
    """Send a ``collect`` call for ``token_id`` with the wallet as recipient.

    Both amount caps are set to the largest 128-bit unsigned value.

    Returns:
        ``True`` when the transaction produced a receipt, ``False`` otherwise.
    """
    logger.info(f"💰 Collecting Fees for {token_id}...")
    uint128_max = 2**128 - 1
    collect_call = npm_contract.functions.collect(
        (token_id, account.address, uint128_max, uint128_max)
    )
    receipt = send_transaction_robust(w3, account, collect_call, extra_msg=f"Collect Fees {token_id}")
    if receipt is None:
        return False
    return True
# --- STATE MANAGEMENT ---
def load_status_data(path: Optional[str] = None) -> List[Dict]:
    """Load the position status list from the JSON status file.

    Args:
        path: File to read. Defaults to ``STATUS_FILE``.

    Returns:
        The parsed JSON content, or an empty list when the file does not
        exist or cannot be read or parsed (the latter is logged as an error).
    """
    status_path = STATUS_FILE if path is None else path
    if not os.path.exists(status_path):
        return []
    try:
        with open(status_path, 'r') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Still best-effort (returns []), but no longer silent and no longer
        # swallowing KeyboardInterrupt/SystemExit like a bare `except:`.
        logger.error(f"❌ Could not read status file {status_path}: {e}")
        return []
def save_status_data(data: List[Dict], path: Optional[str] = None):
    """Write the position status list to the JSON status file.

    The data is serialized before the file is opened, so a value that cannot
    be JSON-encoded raises ``TypeError`` without truncating the existing file.

    Args:
        data: Status entries to persist.
        path: File to write. Defaults to ``STATUS_FILE``.

    Raises:
        TypeError: If ``data`` contains a value that is not JSON serializable.
    """
    status_path = STATUS_FILE if path is None else path
    payload = json.dumps(data, indent=2)
    with open(status_path, 'w') as f:
        f.write(payload)
def update_position_status(token_id: int, status: str, extra_data: Optional[Dict] = None):
    """Set the status of a tracked position and persist the status file.

    An entry is created only when ``status`` is ``"OPEN"`` or
    ``"PENDING_HEDGE"``; for any other status an unknown ``token_id`` is
    ignored. When ``status`` is ``"CLOSED"`` the close time is recorded as
    ``timestamp_close`` (Unix seconds) and ``time_close`` (formatted string).

    Args:
        token_id: Position NFT id used to look up the entry.
        status: New status value.
        extra_data: Additional fields merged into the entry.
    """
    data = load_status_data()
    # Find existing or create new
    entry = next((item for item in data if item.get('token_id') == token_id), None)
    if not entry:
        if status in ["OPEN", "PENDING_HEDGE"]:
            entry = {"type": "AUTOMATIC", "token_id": token_id}
            data.append(entry)
        else:
            return  # Can't update non-existent position unless opening
    entry['status'] = status
    # `None` replaces the former mutable `{}` default argument.
    if extra_data:
        entry.update(extra_data)
    if status == "CLOSED":
        now = datetime.now()
        entry['timestamp_close'] = int(now.timestamp())
        entry['time_close'] = now.strftime("%d.%m.%y %H:%M:%S")
    save_status_data(data)
    logger.info(f"💾 Updated Position {token_id} status to {status}")
import argparse
import requests
# --- REAL-TIME ORACLE HELPER ---
def get_realtime_price(coin: str) -> Optional[Decimal]:
    """Fetches current mid-price directly from Hyperliquid API (low latency).

    Posts an ``allMids`` request with a two-second timeout and looks up the
    symbol after replacing ``WETH`` with ``ETH`` and ``WBNB`` with ``BNB``.

    Returns:
        The mid price as ``Decimal``, or ``None`` when the response status is
        not 200, the symbol is absent, or the request fails (failures are
        logged as warnings).
    """
    try:
        response = requests.post(
            "https://api.hyperliquid.xyz/info",
            json={"type": "allMids"},
            timeout=2,
        )
        if response.status_code != 200:
            return None
        mids = response.json()
        # Hyperliquid symbols are usually clean (ETH, BNB);
        # map the wrapped-token spellings onto them.
        symbol = coin.upper().replace("WETH", "ETH").replace("WBNB", "BNB")
        if symbol not in mids:
            return None
        return Decimal(mids[symbol])
    except Exception as e:
        logger.warning(f"⚠️ Failed to fetch realtime Oracle price: {e}")
        return None
# --- MAIN LOOP ---
def main():
    """Entry point: connect to the chain, then monitor, close and open positions in a loop.

    Startup
        * Parses ``--force <width>``; when given, the first new position is
          opened with that width and the oracle / AUTO entry checks are skipped.
        * Loads ``.env``; the RPC URL comes from the env var named by
          ``CONFIG["RPC_ENV_VAR"]`` and the key from ``MAIN_WALLET_PRIVATE_KEY``
          or ``PRIVATE_KEY``. Returns early if either is missing or the RPC is
          not reachable.
        * Builds the position-manager, factory and router contracts, using the
          Aerodrome ABIs when ``TARGET_DEX`` or ``CONFIG["NAME"]`` contains
          "AERODROME".

    Each loop iteration
        * If the status file has an OPEN position of type AUTOMATIC: values it,
          persists fees/PnL, optionally logs a KPI snapshot, and closes it
          (decrease liquidity + collect) when it is out of range or the AUTO
          reposition conditions are met.
        * Otherwise, if ``OPEN_POSITION_ENABLED``: runs the entry checks, swaps
          into the required token amounts, mints a new position and records it.
        * Sleeps ``MONITOR_INTERVAL_SECONDS`` with an active position, else 37s.

    The loop ends on KeyboardInterrupt; any other exception is logged and the
    loop continues after ``MONITOR_INTERVAL_SECONDS``.
    """
    # --- ARGUMENT PARSING ---
    parser = argparse.ArgumentParser(description="Uniswap CLP Manager")
    parser.add_argument("--force", type=float, help="Force open a position with specific range width (e.g., 0.75), ignoring AUTO safe checks.")
    args = parser.parse_args()

    force_mode_active = False
    force_width_pct = Decimal("0")
    if args.force:
        force_mode_active = True
        force_width_pct = Decimal(str(args.force)) / 100  # Convert 0.75 -> 0.0075
        logger.warning(f"🚨 FORCE MODE ACTIVE: Will bypass safe checks for FIRST position with width {args.force}%")

    logger.info(f"🔷 {CONFIG['NAME']} Manager V2 Starting...")
    load_dotenv(override=True)

    # Dynamically load the RPC based on DEX Profile
    rpc_url = os.environ.get(CONFIG["RPC_ENV_VAR"])
    private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY") or os.environ.get("PRIVATE_KEY")
    if not rpc_url or not private_key:
        logger.error("❌ Missing RPC or Private Key in .env")
        return

    w3 = Web3(Web3.HTTPProvider(rpc_url))
    if not w3.is_connected():
        logger.error("❌ Could not connect to RPC")
        return

    # FIX: Inject POA middleware for BNB Chain/Polygon/etc. (Web3.py v6+)
    w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0)

    account = Account.from_key(private_key)
    logger.info(f"👤 Wallet: {account.address}")

    # Contracts
    target_dex_name = os.environ.get("TARGET_DEX", "").upper()
    # One flag drives every ABI choice (NPM, factory, pool) so they cannot disagree.
    is_aerodrome = "AERODROME" in target_dex_name or "AERODROME" in CONFIG.get("NAME", "").upper()

    if is_aerodrome:
        logger.info("✈️ Using Aerodrome NPM ABI")
        npm = w3.eth.contract(address=clean_address(NONFUNGIBLE_POSITION_MANAGER_ADDRESS), abi=AERODROME_NPM_ABI)
    else:
        npm = w3.eth.contract(address=clean_address(NONFUNGIBLE_POSITION_MANAGER_ADDRESS), abi=NONFUNGIBLE_POSITION_MANAGER_ABI)

    factory_addr = npm.functions.factory().call()

    # Select Factory ABI based on DEX type
    if is_aerodrome:
        logger.info("✈️ Using Aerodrome Factory ABI (tickSpacing instead of fee)")
        factory_abi = AERODROME_FACTORY_ABI
    else:
        factory_abi = UNISWAP_V3_FACTORY_ABI

    factory = w3.eth.contract(address=factory_addr, abi=factory_abi)
    router = w3.eth.contract(address=clean_address(UNISWAP_V3_SWAP_ROUTER_ADDRESS), abi=SWAP_ROUTER_ABI)

    while True:
        try:
            status_data = load_status_data()
            open_positions = [p for p in status_data if p.get('status') == 'OPEN']
            active_auto_pos = next((p for p in open_positions if p.get('type') == 'AUTOMATIC'), None)

            if active_auto_pos:
                token_id = active_auto_pos['token_id']
                pos_details, pool_c = get_position_details(w3, npm, factory, token_id)

                if pos_details:
                    pool_data = get_pool_dynamic_data(pool_c)
                    current_tick = pool_data['tick']

                    # Check Range
                    tick_lower = pos_details['tickLower']
                    tick_upper = pos_details['tickUpper']
                    in_range = tick_lower <= current_tick < tick_upper

                    # Calculate Prices for logging
                    d0 = pos_details['token0_decimals']
                    d1 = pos_details['token1_decimals']
                    price_0_in_1 = price_from_tick(current_tick, d0, d1)

                    # --- SMART STABLE DETECTION ---
                    # Determine which token is the "Stable" side to anchor USD value
                    stable_symbols = ["USDC", "USDT", "DAI", "FDUSD", "USDS"]
                    is_t1_stable = any(s in pos_details['token1_symbol'].upper() for s in stable_symbols)
                    is_t0_stable = any(s in pos_details['token0_symbol'].upper() for s in stable_symbols)
                    # T1 is the quote side when it is stable, and also in the
                    # fallback case where neither token is recognised as stable.
                    # Prices AND valuations below use this same flag, so the
                    # fallback no longer mixes T1-quoted prices with T0-quoted sums.
                    t1_is_quote = is_t1_stable or not is_t0_stable

                    if t1_is_quote:
                        # Standard: T0=Volatile, T1=Stable (or fallback). Price = T1 per T0
                        current_price = price_0_in_1
                        lower_price = price_from_tick(tick_lower, d0, d1)
                        upper_price = price_from_tick(tick_upper, d0, d1)
                    else:
                        # Inverted: T0=Stable, T1=Volatile. We want the price of T1 in terms of T0,
                        # so the tick bounds swap roles.
                        current_price = Decimal("1") / price_0_in_1
                        lower_price = Decimal("1") / price_from_tick(tick_upper, d0, d1)
                        upper_price = Decimal("1") / price_from_tick(tick_lower, d0, d1)

                    status_msg = "✅ IN RANGE" if in_range else "⚠️ OUT OF RANGE"

                    # Calculate Unclaimed Fees (Simulation)
                    total_fees_usd = Decimal("0")
                    try:
                        # Call collect with zero address to simulate fee estimation
                        fees_sim = npm.functions.collect((token_id, "0x0000000000000000000000000000000000000000", 2**128-1, 2**128-1)).call({'from': account.address})
                        u0 = to_decimal(fees_sim[0], d0)
                        u1 = to_decimal(fees_sim[1], d1)

                        if t1_is_quote:
                            total_fees_usd = (u0 * current_price) + u1
                        else:
                            total_fees_usd = u0 + (u1 * current_price)
                    except Exception as e:
                        logger.debug(f"Fee simulation failed for {token_id}: {e}")

                    # Calculate Total PnL (Fees + Price Appreciation/Depreciation)
                    # We need the initial investment value (target_value)
                    initial_value = Decimal(str(active_auto_pos.get('target_value', 0)))

                    curr_amt0_wei, curr_amt1_wei = get_amounts_for_liquidity(
                        pool_data['sqrtPriceX96'],
                        get_sqrt_ratio_at_tick(tick_lower),
                        get_sqrt_ratio_at_tick(tick_upper),
                        pos_details['liquidity']
                    )
                    curr_amt0 = Decimal(curr_amt0_wei) / Decimal(10**d0)
                    curr_amt1 = Decimal(curr_amt1_wei) / Decimal(10**d1)

                    if t1_is_quote:
                        current_pos_value_usd = (curr_amt0 * current_price) + curr_amt1
                    else:
                        current_pos_value_usd = curr_amt0 + (curr_amt1 * current_price)

                    pnl_unrealized = current_pos_value_usd - initial_value
                    total_pnl_usd = pnl_unrealized + total_fees_usd

                    # --- PERSIST PERFORMANCE TO JSON ---
                    update_position_status(token_id, "OPEN", {
                        "clp_fees": round(float(total_fees_usd), 2),
                        "clp_TotPnL": round(float(total_pnl_usd), 2)
                    })

                    # Calculate Fees/h
                    fees_per_h_str = "0.00"
                    ts_open = active_auto_pos.get('timestamp_open')
                    if ts_open:
                        hours_open = (time.time() - ts_open) / 3600
                        if hours_open > 0.01:
                            fees_per_h_str = f"{float(total_fees_usd) / hours_open:.2f}"

                    pnl_text = f" | TotPnL: ${total_pnl_usd:.2f} (Fees: ${total_fees_usd:.2f} | ${fees_per_h_str}/h)"
                    logger.info(f"Position {token_id}: {status_msg} | Price: {current_price:.4f} [{lower_price:.4f} - {upper_price:.4f}]{pnl_text}")

                    # --- KPI LOGGING ---
                    if log_kpi_snapshot:
                        snapshot = {
                            'initial_eth': active_auto_pos.get('amount0_initial', 0),
                            'initial_usdc': active_auto_pos.get('amount1_initial', 0),
                            'initial_hedge_usdc': INITIAL_HEDGE_CAPITAL_USDC,
                            'current_eth_price': float(current_price),
                            'uniswap_pos_value_usd': float(current_pos_value_usd),
                            'uniswap_fees_claimed_usd': 0.0,  # Not tracked accumulated yet in JSON, using Unclaimed mainly
                            'uniswap_fees_unclaimed_usd': float(total_fees_usd),

                            # Hedge Data (from JSON updated by clp_hedger)
                            'hedge_equity_usd': float(active_auto_pos.get('hedge_equity_usd', 0.0)),
                            'hedge_pnl_realized_usd': active_auto_pos.get('hedge_pnl_realized', 0.0),
                            'hedge_fees_paid_usd': active_auto_pos.get('hedge_fees_paid', 0.0)
                        }
                        # We use 'target_value' as a proxy for 'Initial Hedge Equity' + 'Initial Uni Val' if strictly tracking strategy?
                        # For now, let's pass what we have.
                        # To get 'hedge_equity', we ideally need clp_hedger to write it to JSON.
                        # Current implementation of kpi_tracker uses 'hedge_equity' in NAV.
                        # If we leave it 0, NAV will be underreported.
                        # WORKAROUND: Assume Hedge PnL Realized IS the equity change if we ignore margin.
                        log_kpi_snapshot(snapshot)

                    # --- REPOSITION LOGIC ---
                    pos_range_mode = active_auto_pos.get("range_mode", RANGE_MODE)

                    if pos_range_mode == "AUTO" and CLOSE_POSITION_ENABLED:
                        coin_for_dynamic = pos_details['token0_symbol'] if not is_t0_stable else pos_details['token1_symbol']
                        new_range_width = calculate_dynamic_range_pct(coin_for_dynamic)

                        if new_range_width:
                            # Use initial width from JSON, or current config width as fallback
                            old_range_width = Decimal(str(active_auto_pos.get("range_width_initial", RANGE_WIDTH_PCT)))

                            # Both ratios are guarded against a zero denominator. An
                            # exception here would abort the iteration BEFORE the
                            # out-of-range close logic below, so a record without
                            # 'target_value' could never be closed in AUTO mode.
                            # With an unknown baseline the ratio is treated as 0,
                            # i.e. it never triggers a reposition on its own.

                            # Condition A: Difference > 20%
                            if old_range_width > 0:
                                width_diff_pct = abs(new_range_width - old_range_width) / old_range_width
                            else:
                                width_diff_pct = Decimal("0")

                            # Condition B: Profit > 0.1%
                            if initial_value > 0:
                                profit_pct = total_pnl_usd / initial_value
                            else:
                                profit_pct = Decimal("0")

                            logger.info(f"📊 AUTO Check: CurRange {old_range_width*100:.2f}%, NewRange {new_range_width*100:.2f}% | Diff {width_diff_pct*100:.1f}% | Profit {profit_pct*100:.2f}%")

                            if width_diff_pct > 0.20 and profit_pct > 0.001:
                                logger.warning(f"🔄 REPOSITION TRIGGERED: Width Diff {width_diff_pct*100:.1f}%, Profit {profit_pct*100:.2f}%")
                                # Set in_range to False to force the closing logic below
                                in_range = False
                        else:
                            logger.warning(f"⚖️ AUTO Check Skipped: Market indicators for {coin_for_dynamic} are stale or conditions not met.")

                    if not in_range and CLOSE_POSITION_ENABLED:
                        logger.warning(f"🛑 Closing Position {token_id} (Out of Range)")
                        # NOTE(review): if decrease_liquidity fails, the record stays
                        # "CLOSING"; the next iteration then finds no OPEN position and
                        # takes the open-new-position branch. Confirm this is intended.
                        update_position_status(token_id, "CLOSING")

                        # 1. Remove Liquidity
                        if decrease_liquidity(w3, npm, account, token_id, pos_details['liquidity'], d0, d1):
                            # 2. Collect Fees
                            collect_fees(w3, npm, account, token_id)
                            update_position_status(token_id, "CLOSED")

                            # 3. Optional Rebalance (Sell 50% WETH if fell below)
                            if REBALANCE_ON_CLOSE_BELOW_RANGE and current_tick < tick_lower:
                                # Simple rebalance logic here (similar to original check_and_swap surplus logic)
                                pass

            elif OPEN_POSITION_ENABLED:
                logger.info("🔍 No active position. Analyzing market (Fast scan: 37s)...")

                # Setup logic for new position
                tA = clean_address(WETH_ADDRESS)
                tB = clean_address(USDC_ADDRESS)

                # Pool token order is by address (token0 < token1)
                if tA.lower() < tB.lower():
                    token0, token1 = tA, tB
                else:
                    token0, token1 = tB, tA

                fee = POOL_FEE

                pool_addr = factory.functions.getPool(token0, token1, fee).call()
                pool_abi = AERODROME_POOL_ABI if is_aerodrome else UNISWAP_V3_POOL_ABI
                pool_c = w3.eth.contract(address=pool_addr, abi=pool_abi)

                pool_data = get_pool_dynamic_data(pool_c)

                if pool_data:
                    tick = pool_data['tick']

                    # --- PRE-CALCULATE ESSENTIALS ---
                    # Fetch Decimals & Symbols immediately (Required for Oracle Check)
                    t0_c = w3.eth.contract(address=token0, abi=ERC20_ABI)
                    t1_c = w3.eth.contract(address=token1, abi=ERC20_ABI)
                    d0 = t0_c.functions.decimals().call()
                    d1 = t1_c.functions.decimals().call()

                    t0_sym = t0_c.functions.symbol().call().upper()
                    t1_sym = t1_c.functions.symbol().call().upper()

                    stable_symbols = ["USDC", "USDT", "DAI", "FDUSD", "USDS"]
                    is_t1_stable = any(s in t1_sym for s in stable_symbols)
                    is_t0_stable = any(s in t0_sym for s in stable_symbols)
                    # Same convention as the sizing fallback below ("assume T1 is
                    # stable"): T1 is the quote side unless only T0 is stable.
                    t1_is_quote = is_t1_stable or not is_t0_stable

                    price_0_in_1 = price_from_sqrt_price_x96(pool_data['sqrtPriceX96'], d0, d1)

                    # Define coin_sym early for Guard Rails
                    coin_sym = CONFIG.get("COIN_SYMBOL", "ETH")

                    # --- ORACLE GUARD RAIL ---
                    # Protect against Pool/Oracle divergence (Manipulation/Depeg/Lag)
                    if not force_mode_active:
                        oracle_price = get_realtime_price(coin_sym)

                        if oracle_price:
                            pool_price_dec = price_0_in_1 if t1_is_quote else (Decimal("1") / price_0_in_1)
                            divergence = abs(pool_price_dec - oracle_price) / oracle_price

                            if divergence > Decimal("0.0025"):  # 0.25% Tolerance
                                logger.warning(f"⚠️ Price Divergence! Pool: {pool_price_dec:.2f} vs Oracle: {oracle_price:.2f} (Diff: {divergence*100:.2f}%). Aborting.")
                                time.sleep(10)
                                continue
                        else:
                            logger.warning("⚠️ Could not fetch Oracle price. Proceeding with caution (or consider aborting).")

                    # --- DYNAMIC RANGE CALCULATION ---
                    active_range_width = RANGE_WIDTH_PCT
                    current_range_mode = RANGE_MODE

                    # 1. PRIORITY: Force Mode
                    if force_mode_active:
                        logger.warning(f"🚨 FORCE OVERRIDE: Using forced width {force_width_pct*100:.2f}% (Ignoring safe checks)")
                        active_range_width = force_width_pct
                        current_range_mode = "FIXED"

                    # 2. AUTO Mode (Only if not forced)
                    elif RANGE_MODE == "AUTO":
                        dynamic_width = calculate_dynamic_range_pct(coin_sym)
                        if dynamic_width:
                            active_range_width = dynamic_width
                            logger.info(f"⚖️ AUTO Range Activated: {active_range_width*100:.4f}%")
                        else:
                            logger.info(f"⛔ AUTO conditions not met. Waiting for safe entry...")
                            time.sleep(MONITOR_INTERVAL_SECONDS)
                            continue  # Skip logic

                    # 3. FIXED Mode (Default Fallback) is already set by initial active_range_width

                    # Define Range: number of ticks spanning the width (each tick is a 1.0001 price step)
                    tick_delta = int(math.log(1 + float(active_range_width)) / math.log(1.0001))

                    # Fetch actual tick spacing from pool
                    tick_spacing = pool_c.functions.tickSpacing().call()
                    logger.info(f"📏 Tick Spacing: {tick_spacing}")

                    # Floor both bounds onto the tick-spacing grid
                    tick_lower = (tick - tick_delta) // tick_spacing * tick_spacing
                    tick_upper = (tick + tick_delta) // tick_spacing * tick_spacing

                    # Calculate Amounts
                    # "MAX" sizing is not implemented. It must be checked BEFORE the
                    # Decimal conversion, which raises on a non-numeric string and
                    # previously made the MAX branch unreachable.
                    if str(TARGET_INVESTMENT_VALUE_USDC).upper() == "MAX":
                        logger.error("❌ TARGET_INVESTMENT_VALUE_USDC=MAX is not supported by this manager. Configure a fixed USD amount.")
                        time.sleep(MONITOR_INTERVAL_SECONDS)
                        continue

                    # Determine Investment Value in Token1 terms
                    target_usd = Decimal(str(TARGET_INVESTMENT_VALUE_USDC))

                    if is_t1_stable:
                        # T1 is stable (e.g. ETH/USDC). Target 2000 USD = 2000 Token1.
                        investment_val_token1 = target_usd
                    elif is_t0_stable:
                        # T0 is stable (e.g. USDT/BNB). Target 2000 USD = 2000 Token0.
                        # We need value in Token1.
                        # Price 0 in 1 = (BNB per USDT) approx 0.0012
                        # Val T1 = Val T0 * Price(0 in 1)
                        investment_val_token1 = target_usd * price_0_in_1
                        logger.info(f"💱 Converted ${target_usd} -> {investment_val_token1:.4f} {t1_sym} (Price: {price_0_in_1:.6f})")
                    else:
                        # Fallback: Assume T1 is Stable (Dangerous but standard default)
                        logger.warning("⚠️ Could not detect Stable token. Assuming T1 is stable.")
                        investment_val_token1 = target_usd

                    amt0, amt1 = calculate_mint_amounts(tick, tick_lower, tick_upper, investment_val_token1, d0, d1, pool_data['sqrtPriceX96'])

                    if check_and_swap_for_deposit(w3, router, account, token0, token1, amt0, amt1, pool_data['sqrtPriceX96'], d0, d1):

                        # --- STALE DATA PROTECTION (Pre-Mint) ---
                        # Check if price moved significantly during calculation/swap
                        pre_mint_data = get_pool_dynamic_data(pool_c)
                        if pre_mint_data:
                            tick_diff = abs(pre_mint_data['tick'] - pool_data['tick'])
                            # 13 ticks ~ 0.13% price move. Abort if volatile.
                            if tick_diff > 13:
                                logger.warning(f"⚠️ Price moved too much ({tick_diff} ticks) during setup/swap. Aborting mint to prevent bad entry.")
                                time.sleep(5)
                                continue

                        minted = mint_new_position(w3, npm, account, token0, token1, amt0, amt1, tick_lower, tick_upper, d0, d1)
                        if minted:
                            # --- DISABLE FORCE MODE AFTER FIRST MINT ---
                            if force_mode_active:
                                logger.info("🛑 FORCE MODE CONSUMED: Returning to standard AUTO checks for future positions.")
                                force_mode_active = False

                            # --- RE-FETCH PRICE FOR ACCURATE ENTRY DATA (Post-Mint) ---
                            fresh_pool_data = get_pool_dynamic_data(pool_c)
                            if fresh_pool_data:
                                fresh_tick = fresh_pool_data['tick']
                                price_0_in_1 = price_from_tick(fresh_tick, d0, d1)
                                logger.info(f"🔄 Refreshed Entry Tick: {fresh_tick} (Was: {pool_data['tick']})")
                            else:
                                price_0_in_1 = price_from_tick(pool_data['tick'], d0, d1)

                            fmt_amt0 = float(Decimal(minted['amount0']) / Decimal(10**d0))
                            fmt_amt1 = float(Decimal(minted['amount1']) / Decimal(10**d1))

                            if t1_is_quote:
                                entry_price = float(price_0_in_1)
                                actual_value = (fmt_amt0 * entry_price) + fmt_amt1
                                r_upper = float(price_from_tick(minted['tick_upper'], d0, d1))
                                r_lower = float(price_from_tick(minted['tick_lower'], d0, d1))
                            else:
                                # Inverted (T0 is stable): reciprocal prices, so the tick bounds swap
                                entry_price = float(Decimal("1") / price_0_in_1)
                                actual_value = fmt_amt0 + (fmt_amt1 * entry_price)
                                r_upper = float(Decimal("1") / price_from_tick(minted['tick_lower'], d0, d1))
                                r_lower = float(Decimal("1") / price_from_tick(minted['tick_upper'], d0, d1))

                            # Prepare ordered data with specific rounding
                            new_position_data = {
                                "type": "AUTOMATIC",
                                "target_value": round(float(actual_value), 2),
                                "entry_price": round(entry_price, 4),
                                "amount0_initial": round(fmt_amt0, 4),
                                "amount1_initial": round(fmt_amt1, 4),
                                "liquidity": str(minted['liquidity']),
                                "range_upper": round(r_upper, 4),
                                "range_lower": round(r_lower, 4),
                                "token0_decimals": d0,
                                "token1_decimals": d1,
                                "range_mode": current_range_mode,
                                "range_width_initial": float(active_range_width),
                                "timestamp_open": int(time.time()),
                                "time_open": datetime.now().strftime("%d.%m.%y %H:%M:%S")
                            }

                            update_position_status(minted['token_id'], "OPEN", new_position_data)

            # Dynamic Sleep: 37s if no position, else configured interval
            sleep_time = MONITOR_INTERVAL_SECONDS if active_auto_pos else 37
            time.sleep(sleep_time)

        except KeyboardInterrupt:
            logger.info("👋 Exiting...")
            break
        except Exception as e:
            logger.error(f"❌ Main Loop Error: {e}")
            time.sleep(MONITOR_INTERVAL_SECONDS)
# Start the manager loop only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()