Files
uniswap_auto_clp/clp_hedger.py
DiTus b22fdcf741 refactor: Standardize CLP Manager and Hedger modules & cleanup
- **clp_manager.py**: Renamed from 'uniswap_manager.py'. Standardized logic for Uniswap V3 liquidity provision.
- **clp_hedger.py**: Renamed from 'unified_hedger.py'. Consolidated hedging logic including Delta Calculation fixes, EAC (Edge Avoidance), and Fishing order implementation.
- **Cleanup**: Removed legacy 'aerodrome' folder and tools.
- **Monitoring**: Added Telegram monitoring scripts.
- **Config**: Updated gitignore to exclude market data CSVs.
2025-12-31 11:09:33 +01:00

971 lines
45 KiB
Python

import os
import time
import logging
import sys
import json
import glob
import math
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Optional, Dict, Any, List, Union
from dotenv import load_dotenv
# --- FIX: Add project root to sys.path to import local modules ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
# Import local modules
try:
from logging_utils import setup_logging
except ImportError:
setup_logging = None
# Ensure root logger is clean if we can't use setup_logging
logging.getLogger().handlers.clear()
logging.basicConfig(level=logging.INFO)
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from clp_config import CLP_PROFILES, DEFAULT_STRATEGY
# Load environment variables
dotenv_path = os.path.join(current_dir, '.env')
load_dotenv(dotenv_path if os.path.exists(dotenv_path) else None)
# --- LOGGING SETUP ---
# Ensure logs directory exists
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)
# Custom Filter for Millisecond Unix Timestamp (Matching Manager style)
class UnixMsLogFilter(logging.Filter):
def filter(self, record):
record.unix_ms = int(record.created * 1000)
return True
# Configure Logging
logger = logging.getLogger("CLP_HEDGER")
logger.setLevel(logging.INFO)
logger.propagate = False # Prevent double logging from root logger
logger.handlers.clear() # Clear existing handlers to prevent duplicates
# Console Handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_fmt = logging.Formatter('%(asctime)s - %(message)s')
console_handler.setFormatter(console_fmt)
logger.addHandler(console_handler)
# File Handler
log_file = os.path.join(log_dir, 'clp_hedger.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.addFilter(UnixMsLogFilter())
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(message)s')
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)
# --- DECIMAL PRECISION CONFIGURATION ---
getcontext().prec = 50
# --- CONFIGURATION ---
# Settings are loaded from clp_config.py into self.coin_configs
# --- HELPER FUNCTIONS ---
def to_decimal(value: Any) -> Decimal:
"""Safely convert value to Decimal."""
if value is None:
return Decimal("0")
return Decimal(str(value))
def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
"""Round Decimal amount to specific decimals and return float for SDK."""
if amount == 0:
return 0.0
quantizer = Decimal("1").scaleb(-sz_decimals)
rounded = amount.quantize(quantizer, rounding=ROUND_DOWN)
return float(rounded)
def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
"""Round Decimal to significant figures and return float for SDK."""
if x == 0:
return 0.0
# Use string formatting for sig figs as it's robust
return float(f"{x:.{sig_figs}g}")
def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
"""Validate trade size against minimums."""
if size <= 0:
return 0.0
# Check minimum order value
order_value = size * price
if order_value < min_order_value:
return 0.0
# Check dust
min_size = Decimal("10") ** (-sz_decimals)
if size < min_size:
return 0.0
return round_to_sz_decimals_precise(size, sz_decimals)
def update_position_stats(file_path: str, token_id: int, stats_data: Dict):
if not os.path.exists(file_path): return
try:
with open(file_path, 'r') as f:
data = json.load(f)
if isinstance(data, dict): data = [data]
updated = False
for entry in data:
if entry.get('token_id') == token_id:
entry.update(stats_data)
updated = True
break
if updated:
with open(file_path, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
logger.error(f"Error updating JSON stats in {file_path}: {e}")
# --- STRATEGY CLASS ---
class HyperliquidStrategy:
def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal,
liquidity: int = 0, liquidity_scale: Decimal = Decimal("1e-12")):
self.entry_amount0 = entry_amount0
self.entry_amount1 = entry_amount1
self.target_value = target_value
self.entry_price = entry_price
self.low_range = low_range
self.high_range = high_range
self.start_price = start_price
self.gap = max(Decimal("0.0"), entry_price - start_price)
self.recovery_target = entry_price + (Decimal("2") * self.gap)
self.L = Decimal("0.0")
# Priority: Use exact Liquidity from Contract if available
if liquidity > 0:
self.L = Decimal(liquidity) * liquidity_scale
# Calculate implied delta at entry for verification
implied_delta = self.get_pool_delta(entry_price)
# Log removed to reduce spam in unified logger
# logger.info(f"Using Exact Liquidity: {self.L:.4f} (Raw: {liquidity}, Scale: {liquidity_scale})")
else:
try:
sqrt_P = entry_price.sqrt()
sqrt_Pa = low_range.sqrt()
sqrt_Pb = high_range.sqrt()
if entry_amount0 > 0:
denom0 = (Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb)
if denom0 > Decimal("1e-10"):
self.L = entry_amount0 / denom0
if self.L == 0 and entry_amount1 > 0:
denom1 = sqrt_P - sqrt_Pa
if denom1 > Decimal("1e-10"):
self.L = entry_amount1 / denom1
if self.L == 0:
max_eth = target_value / low_range
denom_h = (Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb)
if denom_h > 0:
self.L = max_eth / denom_h
except Exception as e:
logger.error(f"Error calculating liquidity: {e}")
def get_pool_delta(self, current_price: Decimal) -> Decimal:
if current_price >= self.high_range:
return Decimal("0.0")
if current_price <= self.low_range:
sqrt_Pa = self.low_range.sqrt()
sqrt_Pb = self.high_range.sqrt()
return self.L * ((Decimal("1")/sqrt_Pa) - (Decimal("1")/sqrt_Pb))
sqrt_P = current_price.sqrt()
sqrt_Pb = self.high_range.sqrt()
return self.L * (sqrt_Pb - sqrt_P) / (sqrt_P * sqrt_Pb)
def get_compensation_boost(self) -> Decimal:
if self.low_range <= 0: return Decimal("0.075")
range_width_pct = (self.high_range - self.low_range) / self.low_range
# Use default strategy values if not available in instance context,
# but typically these are constant. For now hardcode per plan or use safe defaults.
# Since this is inside Strategy which doesn't know about global config easily,
# we'll implement the logic defined in the plan directly.
if range_width_pct < Decimal("0.02"): # <2% range
return Decimal("0.15") # Double protection for narrow ranges
elif range_width_pct < Decimal("0.05"): # <5% range
return Decimal("0.10") # Moderate for medium ranges
else: # >=5% range
return Decimal("0.075") # Standard for wide ranges
def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal) -> Dict:
# Note: current_short_size here is virtual (just for this specific strategy),
# but the unified hedger will use the 'target_short' output primarily.
pool_delta = self.get_pool_delta(current_price)
# --- ASYMMETRIC COMPENSATION ---
adj_pct = Decimal("0.0")
range_width = self.high_range - self.low_range
if range_width > 0:
dist = current_price - self.entry_price
half_width = range_width / Decimal("2")
norm_dist = dist / half_width
max_boost = self.get_compensation_boost()
adj_pct = -norm_dist * max_boost
adj_pct = max(-max_boost, min(max_boost, adj_pct))
raw_target_short = pool_delta
adjusted_target_short = raw_target_short * (Decimal("1.0") + adj_pct)
diff = adjusted_target_short - abs(current_short_size)
return {
"current_price": current_price,
"pool_delta": pool_delta,
"target_short": adjusted_target_short,
"current_short": abs(current_short_size),
"diff": diff,
"action": "SELL" if diff > 0 else "BUY",
"adj_pct": adj_pct
}
# --- UNIFIED HEDGER CLASS ---
class UnifiedHedger:
def __init__(self):
self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.private_key:
logger.error("No HEDGER_PRIVATE_KEY found in .env")
sys.exit(1)
self.account = Account.from_key(self.private_key)
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
# Maps (file_path, token_id) -> Strategy Instance
self.strategies: Dict[tuple, HyperliquidStrategy] = {}
# Maps (file_path, token_id) -> State Data (accumulated pnl etc)
self.strategy_states: Dict[tuple, Dict] = {}
# Unified State
self.coin_configs: Dict[str, Dict] = {} # Symbol -> Config (thresholds, decimals)
self.active_coins = set()
self.api_backoff_until = 0
# Market Data Cache
self.last_prices = {}
self.price_history = {} # Symbol -> List[Decimal]
self.last_trade_times = {} # Symbol -> timestamp
# Shadow Orders (Global List)
self.shadow_orders = []
self.startup_time = time.time()
logger.info(f"[CLP_HEDGER] Master Hedger initialized. Agent: {self.account.address}")
self._init_coin_configs()
def _init_coin_configs(self):
"""Pre-load configuration for known coins from CLP_PROFILES."""
for profile_key, profile_data in CLP_PROFILES.items():
symbol = profile_data.get("COIN_SYMBOL")
if symbol:
if symbol not in self.coin_configs:
# Init with Defaults
self.coin_configs[symbol] = DEFAULT_STRATEGY.copy()
self.coin_configs[symbol]["sz_decimals"] = 4 # Will be updated by API
# Update with Profile Specifics
self.coin_configs[symbol].update(profile_data)
def _get_sz_decimals(self, coin: str) -> int:
try:
meta = self.info.meta()
for asset in meta["universe"]:
if asset["name"] == coin:
return asset["szDecimals"]
return 4
except: return 4
def update_coin_decimals(self):
try:
meta = self.info.meta()
for asset in meta["universe"]:
coin = asset["name"]
if coin in self.coin_configs:
self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
elif coin in self.active_coins:
self.coin_configs[coin] = DEFAULT_STRATEGY.copy()
self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
except Exception as e:
logger.error(f"Failed to update coin decimals: {e}")
def calculate_volatility(self, coin: str) -> Decimal:
history = self.price_history.get(coin, [])
if not history or len(history) < 30:
return Decimal("0.0")
try:
n = len(history)
mean = sum(history) / n
variance = sum([pow(p - mean, 2) for p in history]) / n
std_dev = variance.sqrt()
if mean == 0: return Decimal("0.0")
return std_dev / mean
except: return Decimal("0.0")
def check_shadow_orders(self, l2_snapshots: Dict):
"""Check if pending shadow (theoretical Maker) orders would have been filled."""
if not self.shadow_orders: return
now = time.time()
remaining = []
for order in self.shadow_orders:
coin = order.get('coin')
if not coin or coin not in l2_snapshots:
remaining.append(order)
continue
levels = l2_snapshots[coin]['levels']
# Snapshot: [ [ {px, sz, n}, ... ], [ {px, sz, n}, ... ] ] -> [Bids, Asks]
# Bids = levels[0], Asks = levels[1]
if not levels[0] or not levels[1]:
remaining.append(order)
continue
best_bid = to_decimal(levels[0][0]['px'])
best_ask = to_decimal(levels[1][0]['px'])
filled = False
if order['side'] == 'BUY':
# Filled if Ask <= Our Bid
if best_ask <= order['price']: filled = True
else: # SELL
# Filled if Bid >= Our Ask
if best_bid >= order['price']: filled = True
if filled:
timeout = self.coin_configs.get(coin, {}).get("SHADOW_ORDER_TIMEOUT", 600)
duration = now - (order['expires_at'] - timeout)
logger.info(f"[SHADOW] ✅ SUCCESS: {coin} Maker {order['side']} @ {order['price']:.2f} filled in {duration:.1f}s")
elif now > order['expires_at']:
logger.info(f"[SHADOW] ❌ FAILED: {coin} Maker {order['side']} @ {order['price']:.2f} timed out")
else:
remaining.append(order)
self.shadow_orders = remaining
def scan_strategies(self) -> bool:
"""Scans all *_status.json files and updates active strategies. Returns False if any read failed."""
status_files = glob.glob(os.path.join(current_dir, "*_status.json"))
# logger.info(f"[DEBUG] Scanning {len(status_files)} status files...")
active_ids = set()
successful_files = set()
has_errors = False
for file_path in status_files:
# Determine Profile/Coin from filename or content?
# Filename convention: {TARGET_DEX}_status.json
filename = os.path.basename(file_path)
dex_name = filename.replace("_status.json", "")
profile = CLP_PROFILES.get(dex_name)
if not profile:
# Fallback: Try to guess or skip
continue
coin_symbol = profile.get("COIN_SYMBOL", "ETH")
self.active_coins.add(coin_symbol)
try:
with open(file_path, 'r') as f:
content = f.read().strip()
if not content:
# Empty file (being written?) -> treat as error to be safe
raise ValueError("Empty file")
data = json.loads(content)
successful_files.add(file_path) # Mark as safe to sync
if isinstance(data, dict): data = [data]
for entry in data:
if entry.get('type') == 'AUTOMATIC' and entry.get('status') in ['OPEN', 'PENDING_HEDGE', 'CLOSING']:
token_id = entry['token_id']
key = (file_path, token_id)
active_ids.add(key)
# Init or Update Strategy
if key not in self.strategies:
self._init_single_strategy(key, entry, coin_symbol)
# Init Status
if key in self.strategy_states:
self.strategy_states[key]['status'] = entry.get('status', 'OPEN')
else:
# Refresh PnL/Fees from file in case they changed
if key in self.strategy_states:
self.strategy_states[key]['pnl'] = to_decimal(entry.get('hedge_pnl_realized', 0))
self.strategy_states[key]['fees'] = to_decimal(entry.get('hedge_fees_paid', 0))
self.strategy_states[key]['status'] = entry.get('status', 'OPEN')
except Exception as e:
logger.error(f"Error reading {filename}: {e}. Skipping updates.")
has_errors = True
# Remove stale strategies ONLY from files we successfully read
current_keys = list(self.strategies.keys())
for k in current_keys:
file_origin = k[0]
# If the file is gone (not in status_files) OR we successfully read it and the key is missing:
if file_origin not in status_files or (file_origin in successful_files and k not in active_ids):
logger.info(f"Strategy {k[1]} removed (Closed/Gone).")
del self.strategies[k]
if k in self.strategy_states: del self.strategy_states[k]
return not has_errors
def _init_single_strategy(self, key, position_data, coin_symbol):
try:
entry_amount0 = to_decimal(position_data.get('amount0_initial', 0))
entry_amount1 = to_decimal(position_data.get('amount1_initial', 0))
target_value = to_decimal(position_data.get('target_value', 50))
entry_price = to_decimal(position_data['entry_price'])
lower = to_decimal(position_data['range_lower'])
upper = to_decimal(position_data['range_upper'])
liquidity_val = int(position_data.get('liquidity', 0))
d0 = int(position_data.get('token0_decimals', 18))
d1 = int(position_data.get('token1_decimals', 6))
liquidity_scale = Decimal("10") ** Decimal(str(-(d0 + d1) / 2))
start_price = self.last_prices.get(coin_symbol)
if start_price is None:
# Will init next loop when price available
return
strat = HyperliquidStrategy(
entry_amount0, entry_amount1, target_value, entry_price, lower, upper, start_price,
liquidity_val, liquidity_scale
)
# Fix: Use persistent start time from JSON to track all fills
ts_open = position_data.get('timestamp_open')
start_time_ms = int(ts_open * 1000) if ts_open else int(time.time() * 1000)
self.strategies[key] = strat
self.strategy_states[key] = {
"coin": coin_symbol,
"start_time": start_time_ms,
"pnl": to_decimal(position_data.get('hedge_pnl_realized', 0)),
"fees": to_decimal(position_data.get('hedge_fees_paid', 0)),
"hedge_TotPnL": to_decimal(position_data.get('hedge_TotPnL', 0)), # NEW: Total Closed PnL
"entry_price": entry_price, # Store for fishing logic
"status": position_data.get('status', 'OPEN')
}
logger.info(f"[STRAT] Init {key[1]} ({coin_symbol}) | Range: {lower}-{upper}")
except Exception as e:
logger.error(f"Failed to init strategy {key[1]}: {e}")
def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal, order_type: str = "Alo") -> Optional[int]:
config = self.coin_configs.get(coin, {})
sz_decimals = config.get("sz_decimals", 4)
min_order = config.get("MIN_ORDER_VALUE_USD", Decimal("10.0"))
validated_size_float = validate_trade_size(size, sz_decimals, min_order, price)
if validated_size_float == 0:
return None
price_float = round_to_sig_figs_precise(price, 5)
logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")
try:
order_result = self.exchange.order(coin, is_buy, validated_size_float, price_float, {"limit": {"tif": order_type}}, reduce_only=is_buy)
status = order_result["status"]
if status == "ok":
response_data = order_result["response"]["data"]
if "statuses" in response_data:
status_obj = response_data["statuses"][0]
if "resting" in status_obj:
return status_obj["resting"]["oid"]
elif "filled" in status_obj:
logger.info("Order filled immediately.")
return status_obj["filled"]["oid"]
elif "error" in status_obj:
err_msg = status_obj['error']
if "Post only order would have immediately matched" in err_msg:
logger.warning(f"[RETRY] Maker order rejected (Crossed). Will retry.")
else:
logger.error(f"Order API Error: {err_msg}")
else:
logger.error(f"Order Failed: {order_result}")
except Exception as e:
if "429" in str(e):
logger.warning(f"Rate limit hit (429). Backing off.")
self.api_backoff_until = time.time() + 30
else:
logger.error(f"Exception during trade: {e}")
return None
def get_open_orders(self) -> List[Dict]:
try:
return self.info.open_orders(self.vault_address or self.account.address)
except:
return []
def cancel_order(self, coin: str, oid: int):
try:
self.exchange.cancel(coin, oid)
except Exception as e:
logger.error(f"Error cancelling order: {e}")
def _update_closed_pnl(self, coin: str):
"""Fetch fills from API and sum closedPnl and fees for active strategies."""
try:
# 1. Identify relevant strategies for this coin
active_strats = [k for k, v in self.strategy_states.items() if v['coin'] == coin]
if not active_strats: return
# 2. Fetch all fills (This is heavy, maybe cache or limit?)
# SDK user_fills returns recent fills.
fills = self.info.user_fills(self.vault_address or self.account.address)
for key in active_strats:
start_time = self.strategy_states[key]['start_time']
total_closed_pnl = Decimal("0")
total_fees = Decimal("0")
for fill in fills:
if fill['coin'] == coin:
# Check timestamp
if fill['time'] >= start_time:
# Sum closedPnl
total_closed_pnl += to_decimal(fill.get('closedPnl', 0))
# Sum fees
total_fees += to_decimal(fill.get('fee', 0))
# Update State
self.strategy_states[key]['hedge_TotPnL'] = total_closed_pnl
self.strategy_states[key]['fees'] = total_fees
# Write to JSON
file_path, token_id = key
update_position_stats(file_path, token_id, {
"hedge_TotPnL": float(total_closed_pnl),
"hedge_fees_paid": float(total_fees)
})
logger.info(f"[PnL] Updated {coin} | Closed PnL: ${total_closed_pnl:.2f} | Fees: ${total_fees:.2f}")
except Exception as e:
logger.error(f"Failed to update closed PnL/Fees for {coin}: {e}")
def run(self):
logger.info("Starting Unified Hedger Loop...")
self.update_coin_decimals()
# --- LOG SETTINGS ON START ---
logger.info("=== HEDGER CONFIGURATION ===")
for symbol, config in self.coin_configs.items():
logger.info(f"--- {symbol} ---")
for k, v in config.items():
logger.info(f" {k}: {v}")
logger.info("============================")
while True:
try:
# 1. API Backoff
if time.time() < self.api_backoff_until:
time.sleep(1)
continue
# 2. Update Strategies
if not self.scan_strategies():
logger.warning("Strategy scan failed (read error). Skipping execution tick.")
time.sleep(1)
continue
# 3. Fetch Market Data (Centralized)
try:
mids = self.info.all_mids()
user_state = self.info.user_state(self.vault_address or self.account.address)
open_orders = self.get_open_orders()
l2_snapshots = {} # Cache for snapshots
except Exception as e:
logger.error(f"API Error fetching data: {e}")
time.sleep(1)
continue
# Map Open Orders
orders_map = {}
for o in open_orders:
c = o['coin']
if c not in orders_map: orders_map[c] = []
orders_map[c].append(o)
# Parse User State
account_value = Decimal("0")
if "marginSummary" in user_state and "accountValue" in user_state["marginSummary"]:
account_value = to_decimal(user_state["marginSummary"]["accountValue"])
# Map current positions
current_positions = {} # Coin -> Size
current_pnls = {} # Coin -> Unrealized PnL
current_entry_pxs = {} # Coin -> Entry Price (NEW)
for pos in user_state["assetPositions"]:
c = pos["position"]["coin"]
s = to_decimal(pos["position"]["szi"])
u = to_decimal(pos["position"]["unrealizedPnl"])
e = to_decimal(pos["position"]["entryPx"])
current_positions[c] = s
current_pnls[c] = u
current_entry_pxs[c] = e
# 4. Aggregate Targets
# Coin -> { 'target_short': Decimal, 'contributors': int, 'is_at_edge': bool }
aggregates = {}
# First, update all prices from mids for active coins
for coin in self.active_coins:
if coin in mids:
price = to_decimal(mids[coin])
self.last_prices[coin] = price
# Update Price History
if coin not in self.price_history: self.price_history[coin] = []
self.price_history[coin].append(price)
if len(self.price_history[coin]) > 300: self.price_history[coin].pop(0)
for key, strat in self.strategies.items():
coin = self.strategy_states[key]['coin']
status = self.strategy_states[key].get('status', 'OPEN')
if coin not in self.last_prices: continue
price = self.last_prices[coin]
# Calc Logic
calc = strat.calculate_rebalance(price, Decimal("0"))
if coin not in aggregates:
aggregates[coin] = {'target_short': Decimal("0"), 'contributors': 0, 'is_at_edge': False, 'adj_pct': Decimal("0"), 'is_closing': False}
if status == 'CLOSING':
# If Closing, we want target to be 0 for this strategy
logger.info(f"[STRAT] {key[1]} is CLOSING -> Force Target 0")
aggregates[coin]['is_closing'] = True
# Do not add to target_short
else:
aggregates[coin]['target_short'] += calc['target_short']
aggregates[coin]['contributors'] += 1
aggregates[coin]['adj_pct'] = calc['adj_pct']
# Check Edge Proximity for Cleanup
config = self.coin_configs.get(coin, {})
enable_cleanup = config.get("ENABLE_EDGE_CLEANUP", True)
cleanup_margin = config.get("EDGE_CLEANUP_MARGIN_PCT", Decimal("0.02"))
if enable_cleanup:
dist_bottom_pct = (price - strat.low_range) / strat.low_range
dist_top_pct = (strat.high_range - price) / strat.high_range
range_width_pct = (strat.high_range - strat.low_range) / strat.low_range
safety_margin_pct = range_width_pct * cleanup_margin
if dist_bottom_pct < safety_margin_pct or dist_top_pct < safety_margin_pct:
aggregates[coin]['is_at_edge'] = True
# Check Shadow Orders (Pre-Execution)
self.check_shadow_orders(l2_snapshots)
# 5. Execute Per Coin
# Union of coins with Active Strategies OR Active Positions
coins_to_process = set(aggregates.keys())
for c, pos in current_positions.items():
if abs(pos) > 0: coins_to_process.add(c)
for coin in coins_to_process:
data = aggregates.get(coin, {'target_short': Decimal("0"), 'contributors': 0, 'is_at_edge': False, 'adj_pct': Decimal("0"), 'is_closing': False})
price = self.last_prices.get(coin, Decimal("0")) # FIX: Explicitly get price for this coin
if price == 0: continue
target_short_abs = data['target_short'] # Always positive (it's a magnitude of short)
target_position = -target_short_abs # We want to be Short, so negative size
current_pos = current_positions.get(coin, Decimal("0"))
diff = target_position - current_pos # e.g. -1.0 - (-0.8) = -0.2 (Sell 0.2)
diff_abs = abs(diff)
# Thresholds
config = self.coin_configs.get(coin, {})
min_thresh = config.get("min_threshold", Decimal("0.008"))
# Volatility Multiplier
vol_pct = self.calculate_volatility(coin)
base_vol = Decimal("0.0005")
vol_mult = max(Decimal("1.0"), min(Decimal("3.0"), vol_pct / base_vol)) if vol_pct > 0 else Decimal("1.0")
base_rebalance_pct = config.get("BASE_REBALANCE_THRESHOLD_PCT", Decimal("0.20"))
thresh_pct = min(Decimal("0.15"), base_rebalance_pct * vol_mult)
dynamic_thresh = max(min_thresh, abs(target_position) * thresh_pct)
# FORCE EDGE CLEANUP
enable_edge_cleanup = config.get("ENABLE_EDGE_CLEANUP", True)
if data['is_at_edge'] and enable_edge_cleanup:
if dynamic_thresh > min_thresh:
# logger.info(f"[EDGE] {coin} forced to min threshold.")
dynamic_thresh = min_thresh
# Check Trigger
action_needed = diff_abs > dynamic_thresh
# Determine Intent (Moved UP for Order Logic)
is_buy_bool = diff > 0
side_str = "BUY" if is_buy_bool else "SELL"
# Manage Existing Orders
existing_orders = orders_map.get(coin, [])
force_taker_retry = False
# Fishing Config
enable_fishing = config.get("ENABLE_FISHING", False)
fishing_timeout = config.get("FISHING_TIMEOUT_FALLBACK", 30)
# Check Existing Orders for compatibility
order_matched = False
price_buffer_pct = config.get("PRICE_BUFFER_PCT", Decimal("0.0015"))
for o in existing_orders:
o_oid = o['oid']
o_price = to_decimal(o['limitPx'])
o_side = o['side'] # 'B' or 'A'
o_timestamp = o.get('timestamp', int(time.time()*1000))
is_same_side = (o_side == 'B' and is_buy_bool) or (o_side == 'A' and not is_buy_bool)
# Price Check (within buffer)
dist_pct = abs(price - o_price) / price
# Maker Timeout Check (General)
maker_timeout = config.get("MAKER_ORDER_TIMEOUT", 300)
order_age_sec = (int(time.time()*1000) - o_timestamp) / 1000.0
if is_same_side and order_age_sec > maker_timeout:
logger.info(f"[TIMEOUT] {coin} Order {o_oid} expired ({order_age_sec:.1f}s > {maker_timeout}s). Cancelling to refresh.")
self.cancel_order(coin, o_oid)
continue
# Fishing Timeout Check
if enable_fishing and is_same_side and order_age_sec > fishing_timeout:
logger.info(f"[FISHING] {coin} Order {o_oid} timed out ({order_age_sec:.1f}s > {fishing_timeout}s). Cancelling for Taker retry.")
self.cancel_order(coin, o_oid)
force_taker_retry = True
continue # Do not mark matched, let it flow to execution
if is_same_side and dist_pct < price_buffer_pct:
order_matched = True
if int(time.time()) % 10 == 0:
logger.info(f"[WAIT] {coin} Pending {side_str} Order {o_oid} @ {o_price} (Dist: {dist_pct*100:.3f}%) | Age: {order_age_sec:.1f}s")
break
else:
logger.info(f"Cancelling stale order {o_oid} ({o_side} @ {o_price})")
self.cancel_order(coin, o_oid)
# --- EXECUTION LOGIC ---
if not order_matched:
if action_needed or force_taker_retry:
bypass_cooldown = False
force_maker = False
# 0. Forced Taker Retry (Fishing Timeout)
if force_taker_retry:
bypass_cooldown = True
logger.info(f"[RETRY] {coin} Fishing Failed -> Force Taker")
# 1. Urgent Closing -> Taker
elif data.get('is_closing', False):
bypass_cooldown = True
logger.info(f"[URGENT] {coin} Closing Strategy -> Force Taker Exit")
# 2. Ghost/Cleanup -> Maker
elif data.get('contributors', 0) == 0:
if time.time() - self.startup_time > 5:
force_maker = True
logger.info(f"[CLEANUP] {coin} Ghost Position -> Force Maker Reduce")
else:
logger.info(f"[STARTUP] Skipping Ghost Cleanup for {coin} (Grace Period)")
continue # Skip execution for this coin
# Large Hedge Check (Only Force Taker if AT EDGE)
large_hedge_mult = config.get("LARGE_HEDGE_MULTIPLIER", Decimal("5.0"))
if diff_abs > (dynamic_thresh * large_hedge_mult) and not force_maker and data.get('is_at_edge', False):
bypass_cooldown = True
logger.info(f"[WARN] LARGE HEDGE (Edge Protection): {diff_abs:.4f} > {dynamic_thresh:.4f} (x{large_hedge_mult})")
elif diff_abs > (dynamic_thresh * large_hedge_mult) and not force_maker:
# Large hedge but safe zone -> Maker is fine, but maybe log it
logger.info(f"[INFO] Large Hedge (Safe Zone): {diff_abs:.4f}. Using Standard Execution.")
last_trade = self.last_trade_times.get(coin, 0)
min_time_trade = config.get("MIN_TIME_BETWEEN_TRADES", 60)
can_trade = False
if bypass_cooldown:
can_trade = True
elif time.time() - last_trade > min_time_trade:
can_trade = True
if can_trade:
# Get Orderbook for Price
if coin not in l2_snapshots:
l2_snapshots[coin] = self.info.l2_snapshot(coin)
levels = l2_snapshots[coin]['levels']
if not levels[0] or not levels[1]: continue
bid = to_decimal(levels[0][0]['px'])
ask = to_decimal(levels[1][0]['px'])
# Price logic
create_shadow = False
# Decide Order Type: Taker (Ioc) or Maker (Alo)
# Taker if: Urgent (bypass_cooldown) OR Fishing Disabled OR Force Maker is False (wait, Force Maker means Alo)
# Logic:
# If Force Maker -> Alo
# Else if Urgent -> Ioc
# Else if Enable Fishing -> Alo
# Else -> Alo (Default non-urgent behavior was Alo anyway?)
# Let's clarify:
# Previous logic: if bypass_cooldown -> Ioc. Else -> Alo.
# New logic:
# If bypass_cooldown -> Ioc
# Else -> Alo (Fishing)
if bypass_cooldown and not force_maker:
exec_price = ask * Decimal("1.001") if is_buy_bool else bid * Decimal("0.999")
order_type = "Ioc"
create_shadow = True
else:
# Fishing / Standard Maker
exec_price = bid if is_buy_bool else ask
order_type = "Alo"
logger.info(f"[TRIG] Net {coin}: {side_str} {diff_abs:.4f} | Tgt: {target_position:.4f} / Cur: {current_pos:.4f} | Thresh: {dynamic_thresh:.4f} | Type: {order_type}")
oid = self.place_limit_order(coin, is_buy_bool, diff_abs, exec_price, order_type)
if oid:
self.last_trade_times[coin] = time.time()
# Shadow Order
if create_shadow:
shadow_price = bid if is_buy_bool else ask
shadow_timeout = config.get("SHADOW_ORDER_TIMEOUT", 600)
self.shadow_orders.append({
'coin': coin,
'side': side_str,
'price': shadow_price,
'expires_at': time.time() + shadow_timeout
})
logger.info(f"[SHADOW] Created Maker {side_str} @ {shadow_price:.2f}")
# UPDATED: Sleep for API Lag (Phase 5.1)
logger.info("Sleeping 10s to allow position update...")
time.sleep(10)
# --- UPDATE CLOSED PnL FROM API ---
self._update_closed_pnl(coin)
else:
# Cooldown log
pass
else:
# Action NOT needed
# Cleanup any dangling orders
if existing_orders:
for o in existing_orders:
logger.info(f"Cancelling idle order {o['oid']} ({o['side']} @ {o['limitPx']})")
self.cancel_order(coin, o['oid'])
# --- IDLE LOGGING (Restored Format) ---
# Calculate aggregate Gamma to estimate triggers
# Gamma = 0.5 * Sum(L) * P^-1.5
# We need Sum(L) for this coin.
total_L = Decimal("0")
# We need to re-iterate or cache L.
# Simpler: Just re-sum L from active strats for this coin.
for key, strat in self.strategies.items():
if self.strategy_states[key]['coin'] == coin:
total_L += strat.L
if total_L > 0 and price > 0:
gamma = (Decimal("0.5") * total_L * (price ** Decimal("-1.5")))
if gamma > 0:
# Equilibrium Price (Diff = 0)
p_mid = price + (diff / gamma)
# Triggers
p_buy = price + (dynamic_thresh + diff) / gamma
p_sell = price - (dynamic_thresh - diff) / gamma
if int(time.time()) % 30 == 0:
pad = " " if coin == "BNB" else ""
adj_val = data.get('adj_pct', Decimal("0")) * 100
# PnL Calc
unrealized = current_pnls.get(coin, Decimal("0"))
closed_pnl_total = Decimal("0")
fees_total = Decimal("0")
for k, s_state in self.strategy_states.items():
if s_state['coin'] == coin:
closed_pnl_total += s_state.get('hedge_TotPnL', Decimal("0"))
fees_total += s_state.get('fees', Decimal("0"))
total_pnl = (closed_pnl_total - fees_total) + unrealized
pnl_pad = " " if unrealized >= 0 else ""
tot_pnl_pad = " " if total_pnl >= 0 else ""
logger.info(f"[IDLE] {coin} | Px: {price:.2f}{pad} | M: {p_mid:.1f}{pad} | B: {p_buy:.1f}{pad} / S: {p_sell:.1f}{pad} | delta: {target_position:.4f}({diff:+.4f}) | Adj: {adj_val:+.2f}%, Vol: {vol_mult:.2f}, Thr: {dynamic_thresh:.4f} | PnL: {unrealized:.2f}{pnl_pad} | TotPnL: {total_pnl:.2f}{tot_pnl_pad}")
else:
if int(time.time()) % 30 == 0:
logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f} (Thresh: {dynamic_thresh:.4f})")
else:
if int(time.time()) % 30 == 0:
logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f} (Thresh: {dynamic_thresh:.4f})")
time.sleep(DEFAULT_STRATEGY.get("CHECK_INTERVAL", 1))
except KeyboardInterrupt:
logger.info("Stopping...")
break
except Exception as e:
logger.error(f"Loop Error: {e}", exc_info=True)
time.sleep(5)
if __name__ == "__main__":
hedger = UnifiedHedger()
hedger.run()