891 lines
42 KiB
Python
891 lines
42 KiB
Python
import os
|
|
import time
|
|
import logging
|
|
import sys
|
|
import json
|
|
import glob
|
|
import math
|
|
from decimal import Decimal, getcontext, ROUND_DOWN
|
|
from typing import Optional, Dict, Any, List, Union
|
|
from dotenv import load_dotenv
|
|
|
|
# --- FIX: Add project root to sys.path to import local modules ---
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
project_root = os.path.dirname(current_dir)
|
|
sys.path.append(project_root)
|
|
|
|
# Import local modules
|
|
try:
|
|
from logging_utils import setup_logging
|
|
except ImportError:
|
|
setup_logging = None
|
|
# Ensure root logger is clean if we can't use setup_logging
|
|
logging.getLogger().handlers.clear()
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
from eth_account import Account
|
|
from hyperliquid.exchange import Exchange
|
|
from hyperliquid.info import Info
|
|
from hyperliquid.utils import constants
|
|
from clp_config import CLP_PROFILES, DEFAULT_STRATEGY
|
|
|
|
# Load environment variables
|
|
dotenv_path = os.path.join(current_dir, '.env')
|
|
load_dotenv(dotenv_path if os.path.exists(dotenv_path) else None)
|
|
|
|
# --- LOGGING SETUP ---
|
|
# Ensure logs directory exists
|
|
log_dir = os.path.join(current_dir, 'logs')
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# Custom Filter for Millisecond Unix Timestamp (Matching Manager style)
|
|
class UnixMsLogFilter(logging.Filter):
|
|
def filter(self, record):
|
|
record.unix_ms = int(record.created * 1000)
|
|
return True
|
|
|
|
# Configure Logging
|
|
logger = logging.getLogger("CLP_HEDGER")
|
|
logger.setLevel(logging.INFO)
|
|
logger.propagate = False # Prevent double logging from root logger
|
|
logger.handlers.clear() # Clear existing handlers to prevent duplicates
|
|
|
|
# Console Handler
|
|
console_handler = logging.StreamHandler(sys.stdout)
|
|
console_handler.setLevel(logging.INFO)
|
|
console_fmt = logging.Formatter('%(asctime)s - %(message)s')
|
|
console_handler.setFormatter(console_fmt)
|
|
logger.addHandler(console_handler)
|
|
|
|
# File Handler
|
|
log_file = os.path.join(log_dir, 'clp_hedger.log')
|
|
file_handler = logging.FileHandler(log_file, encoding='utf-8')
|
|
file_handler.setLevel(logging.INFO)
|
|
file_handler.addFilter(UnixMsLogFilter())
|
|
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(message)s')
|
|
file_handler.setFormatter(file_fmt)
|
|
logger.addHandler(file_handler)
|
|
|
|
# --- DECIMAL PRECISION CONFIGURATION ---
|
|
getcontext().prec = 50
|
|
|
|
# --- CONFIGURATION ---
|
|
# Settings are loaded from clp_config.py into self.coin_configs
|
|
|
|
# --- HELPER FUNCTIONS ---
|
|
|
|
def to_decimal(value: Any) -> Decimal:
|
|
"""Safely convert value to Decimal."""
|
|
if value is None:
|
|
return Decimal("0")
|
|
return Decimal(str(value))
|
|
|
|
def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
|
|
"""Round Decimal amount to specific decimals and return float for SDK."""
|
|
if amount == 0:
|
|
return 0.0
|
|
|
|
quantizer = Decimal("1").scaleb(-sz_decimals)
|
|
rounded = amount.quantize(quantizer, rounding=ROUND_DOWN)
|
|
return float(rounded)
|
|
|
|
def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
|
|
"""Round Decimal to significant figures and return float for SDK."""
|
|
if x == 0:
|
|
return 0.0
|
|
# Use string formatting for sig figs as it's robust
|
|
return float(f"{x:.{sig_figs}g}")
|
|
|
|
def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
|
|
"""Validate trade size against minimums."""
|
|
if size <= 0:
|
|
return 0.0
|
|
|
|
# Check minimum order value
|
|
order_value = size * price
|
|
if order_value < min_order_value:
|
|
return 0.0
|
|
|
|
# Check dust
|
|
min_size = Decimal("10") ** (-sz_decimals)
|
|
if size < min_size:
|
|
return 0.0
|
|
|
|
return round_to_sz_decimals_precise(size, sz_decimals)
|
|
|
|
def update_position_stats(file_path: str, token_id: int, stats_data: Dict):
|
|
if not os.path.exists(file_path): return
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
if isinstance(data, dict): data = [data]
|
|
|
|
updated = False
|
|
for entry in data:
|
|
if entry.get('token_id') == token_id:
|
|
entry.update(stats_data)
|
|
updated = True
|
|
break
|
|
|
|
if updated:
|
|
with open(file_path, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
logger.error(f"Error updating JSON stats in {file_path}: {e}")
|
|
|
|
# --- STRATEGY CLASS ---
|
|
|
|
class HyperliquidStrategy:
|
|
def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
|
|
entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal,
|
|
liquidity: int = 0, liquidity_scale: Decimal = Decimal("1e-12")):
|
|
self.entry_amount0 = entry_amount0
|
|
self.entry_amount1 = entry_amount1
|
|
self.target_value = target_value
|
|
self.entry_price = entry_price
|
|
self.low_range = low_range
|
|
self.high_range = high_range
|
|
self.start_price = start_price
|
|
|
|
self.gap = max(Decimal("0.0"), entry_price - start_price)
|
|
self.recovery_target = entry_price + (Decimal("2") * self.gap)
|
|
|
|
self.L = Decimal("0.0")
|
|
|
|
# Priority: Use exact Liquidity from Contract if available
|
|
if liquidity > 0:
|
|
self.L = Decimal(liquidity) * liquidity_scale
|
|
# Calculate implied delta at entry for verification
|
|
implied_delta = self.get_pool_delta(entry_price)
|
|
# Log removed to reduce spam in unified logger
|
|
# logger.info(f"Using Exact Liquidity: {self.L:.4f} (Raw: {liquidity}, Scale: {liquidity_scale})")
|
|
else:
|
|
try:
|
|
sqrt_P = entry_price.sqrt()
|
|
sqrt_Pa = low_range.sqrt()
|
|
sqrt_Pb = high_range.sqrt()
|
|
|
|
if entry_amount0 > 0:
|
|
denom0 = (Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb)
|
|
if denom0 > Decimal("1e-10"):
|
|
self.L = entry_amount0 / denom0
|
|
|
|
if self.L == 0 and entry_amount1 > 0:
|
|
denom1 = sqrt_P - sqrt_Pa
|
|
if denom1 > Decimal("1e-10"):
|
|
self.L = entry_amount1 / denom1
|
|
|
|
if self.L == 0:
|
|
max_eth = target_value / low_range
|
|
denom_h = (Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb)
|
|
if denom_h > 0:
|
|
self.L = max_eth / denom_h
|
|
except Exception as e:
|
|
logger.error(f"Error calculating liquidity: {e}")
|
|
|
|
def get_pool_delta(self, current_price: Decimal) -> Decimal:
|
|
if current_price >= self.high_range:
|
|
return Decimal("0.0")
|
|
|
|
if current_price <= self.low_range:
|
|
sqrt_Pa = self.low_range.sqrt()
|
|
sqrt_Pb = self.high_range.sqrt()
|
|
return self.L * ((Decimal("1")/sqrt_Pa) - (Decimal("1")/sqrt_Pb))
|
|
|
|
sqrt_P = current_price.sqrt()
|
|
sqrt_Pb = self.high_range.sqrt()
|
|
return self.L * (sqrt_Pb - sqrt_P) / (sqrt_P * sqrt_Pb)
|
|
|
|
def get_compensation_boost(self) -> Decimal:
|
|
if self.low_range <= 0: return Decimal("0.075")
|
|
range_width_pct = (self.high_range - self.low_range) / self.low_range
|
|
|
|
# Use default strategy values if not available in instance context,
|
|
# but typically these are constant. For now hardcode per plan or use safe defaults.
|
|
# Since this is inside Strategy which doesn't know about global config easily,
|
|
# we'll implement the logic defined in the plan directly.
|
|
|
|
if range_width_pct < Decimal("0.02"): # <2% range
|
|
return Decimal("0.15") # Double protection for narrow ranges
|
|
elif range_width_pct < Decimal("0.05"): # <5% range
|
|
return Decimal("0.10") # Moderate for medium ranges
|
|
else: # >=5% range
|
|
return Decimal("0.075") # Standard for wide ranges
|
|
|
|
def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal, strategy_type: str = "ASYMMETRIC") -> Dict:
|
|
# Note: current_short_size here is virtual (just for this specific strategy),
|
|
# but the unified hedger will use the 'target_short' output primarily.
|
|
|
|
pool_delta = self.get_pool_delta(current_price)
|
|
|
|
# --- ASYMMETRIC COMPENSATION ---
|
|
adj_pct = Decimal("0.0")
|
|
|
|
if strategy_type == "ASYMMETRIC":
|
|
range_width = self.high_range - self.low_range
|
|
|
|
if range_width > 0:
|
|
dist = current_price - self.entry_price
|
|
half_width = range_width / Decimal("2")
|
|
norm_dist = dist / half_width
|
|
max_boost = self.get_compensation_boost()
|
|
adj_pct = -norm_dist * max_boost
|
|
adj_pct = max(-max_boost, min(max_boost, adj_pct))
|
|
|
|
raw_target_short = pool_delta
|
|
adjusted_target_short = raw_target_short * (Decimal("1.0") + adj_pct)
|
|
|
|
diff = adjusted_target_short - abs(current_short_size)
|
|
|
|
return {
|
|
"current_price": current_price,
|
|
"pool_delta": pool_delta,
|
|
"target_short": adjusted_target_short,
|
|
"current_short": abs(current_short_size),
|
|
"diff": diff,
|
|
"action": "SELL" if diff > 0 else "BUY",
|
|
"adj_pct": adj_pct
|
|
}
|
|
|
|
# --- UNIFIED HEDGER CLASS ---
|
|
|
|
class UnifiedHedger:
|
|
def __init__(self):
|
|
self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
|
|
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
|
|
|
|
if not self.private_key:
|
|
logger.error("No HEDGER_PRIVATE_KEY found in .env")
|
|
sys.exit(1)
|
|
|
|
self.account = Account.from_key(self.private_key)
|
|
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
|
|
self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
|
|
|
|
# Maps (file_path, token_id) -> Strategy Instance
|
|
self.strategies: Dict[tuple, HyperliquidStrategy] = {}
|
|
# Maps (file_path, token_id) -> State Data (accumulated pnl etc)
|
|
self.strategy_states: Dict[tuple, Dict] = {}
|
|
|
|
# Unified State
|
|
self.coin_configs: Dict[str, Dict] = {} # Symbol -> Config (thresholds, decimals)
|
|
self.active_coins = set()
|
|
self.api_backoff_until = 0
|
|
|
|
# Market Data Cache
|
|
self.last_prices = {}
|
|
self.price_history = {} # Symbol -> List[Decimal]
|
|
self.last_trade_times = {} # Symbol -> timestamp
|
|
self.last_idle_log_times = {} # Symbol -> timestamp
|
|
|
|
# Shadow Orders (Global List)
|
|
self.shadow_orders = []
|
|
|
|
self.startup_time = time.time()
|
|
|
|
logger.info(f"[CLP_HEDGER] Master Hedger initialized. Agent: {self.account.address}")
|
|
self._init_coin_configs()
|
|
|
|
def _init_coin_configs(self):
|
|
"""Pre-load configuration for known coins from CLP_PROFILES."""
|
|
for profile_key, profile_data in CLP_PROFILES.items():
|
|
symbol = profile_data.get("COIN_SYMBOL")
|
|
if symbol:
|
|
if symbol not in self.coin_configs:
|
|
# Init with Defaults
|
|
self.coin_configs[symbol] = DEFAULT_STRATEGY.copy()
|
|
self.coin_configs[symbol]["sz_decimals"] = 4 # Will be updated by API
|
|
|
|
# Update with Profile Specifics
|
|
self.coin_configs[symbol].update(profile_data)
|
|
|
|
def _get_sz_decimals(self, coin: str) -> int:
|
|
try:
|
|
meta = self.info.meta()
|
|
for asset in meta["universe"]:
|
|
if asset["name"] == coin:
|
|
return asset["szDecimals"]
|
|
return 4
|
|
except: return 4
|
|
|
|
def update_coin_decimals(self):
|
|
try:
|
|
meta = self.info.meta()
|
|
for asset in meta["universe"]:
|
|
coin = asset["name"]
|
|
if coin in self.coin_configs:
|
|
self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
|
|
elif coin in self.active_coins:
|
|
self.coin_configs[coin] = DEFAULT_STRATEGY.copy()
|
|
self.coin_configs[coin]["sz_decimals"] = asset["szDecimals"]
|
|
except Exception as e:
|
|
logger.error(f"Failed to update coin decimals: {e}")
|
|
|
|
def calculate_volatility(self, coin: str) -> Decimal:
|
|
history = self.price_history.get(coin, [])
|
|
if not history or len(history) < 30:
|
|
return Decimal("0.0")
|
|
try:
|
|
n = len(history)
|
|
mean = sum(history) / n
|
|
variance = sum([pow(p - mean, 2) for p in history]) / n
|
|
std_dev = variance.sqrt()
|
|
if mean == 0: return Decimal("0.0")
|
|
return std_dev / mean
|
|
except: return Decimal("0.0")
|
|
|
|
def check_shadow_orders(self, l2_snapshots: Dict):
|
|
"""Check if pending shadow (theoretical Maker) orders would have been filled."""
|
|
if not self.shadow_orders: return
|
|
|
|
now = time.time()
|
|
remaining = []
|
|
|
|
for order in self.shadow_orders:
|
|
coin = order.get('coin')
|
|
if not coin or coin not in l2_snapshots:
|
|
remaining.append(order)
|
|
continue
|
|
|
|
levels = l2_snapshots[coin]['levels']
|
|
# Snapshot: [ [ {px, sz, n}, ... ], [ {px, sz, n}, ... ] ] -> [Bids, Asks]
|
|
# Bids = levels[0], Asks = levels[1]
|
|
if not levels[0] or not levels[1]:
|
|
remaining.append(order)
|
|
continue
|
|
|
|
best_bid = to_decimal(levels[0][0]['px'])
|
|
best_ask = to_decimal(levels[1][0]['px'])
|
|
|
|
filled = False
|
|
if order['side'] == 'BUY':
|
|
# Filled if Ask <= Our Bid
|
|
if best_ask <= order['price']: filled = True
|
|
else: # SELL
|
|
# Filled if Bid >= Our Ask
|
|
if best_bid >= order['price']: filled = True
|
|
|
|
if filled:
|
|
timeout = self.coin_configs.get(coin, {}).get("SHADOW_ORDER_TIMEOUT", 600)
|
|
duration = now - (order['expires_at'] - timeout)
|
|
logger.info(f"[SHADOW] ✅ SUCCESS: {coin} Maker {order['side']} @ {order['price']:.2f} filled in {duration:.1f}s")
|
|
elif now > order['expires_at']:
|
|
logger.info(f"[SHADOW] ❌ FAILED: {coin} Maker {order['side']} @ {order['price']:.2f} timed out")
|
|
else:
|
|
remaining.append(order)
|
|
|
|
self.shadow_orders = remaining
|
|
|
|
def scan_strategies(self) -> bool:
|
|
"""Scans all *_status.json files and updates active strategies. Returns False if any read failed."""
|
|
status_files = glob.glob(os.path.join(current_dir, "*_status.json"))
|
|
# logger.info(f"[DEBUG] Scanning {len(status_files)} status files...")
|
|
|
|
active_ids = set()
|
|
successful_files = set()
|
|
has_errors = False
|
|
|
|
for file_path in status_files:
|
|
# Determine Profile/Coin from filename or content?
|
|
# Filename convention: {TARGET_DEX}_status.json
|
|
filename = os.path.basename(file_path)
|
|
dex_name = filename.replace("_status.json", "")
|
|
|
|
profile = CLP_PROFILES.get(dex_name)
|
|
if not profile:
|
|
# Fallback: Try to guess or skip
|
|
continue
|
|
|
|
coin_symbol = profile.get("COIN_SYMBOL", "ETH")
|
|
self.active_coins.add(coin_symbol)
|
|
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
content = f.read().strip()
|
|
if not content:
|
|
# Empty file (being written?) -> treat as error to be safe
|
|
raise ValueError("Empty file")
|
|
data = json.loads(content)
|
|
|
|
successful_files.add(file_path) # Mark as safe to sync
|
|
|
|
if isinstance(data, dict): data = [data]
|
|
|
|
for entry in data:
|
|
if entry.get('type') == 'AUTOMATIC' and entry.get('status') in ['OPEN', 'PENDING_HEDGE', 'CLOSING']:
|
|
token_id = entry['token_id']
|
|
key = (file_path, token_id)
|
|
active_ids.add(key)
|
|
|
|
# Init or Update Strategy
|
|
if key not in self.strategies:
|
|
self._init_single_strategy(key, entry, coin_symbol)
|
|
# Init Status
|
|
if key in self.strategy_states:
|
|
self.strategy_states[key]['status'] = entry.get('status', 'OPEN')
|
|
else:
|
|
# Refresh PnL/Fees from file in case they changed
|
|
if key in self.strategy_states:
|
|
self.strategy_states[key]['pnl'] = to_decimal(entry.get('hedge_pnl_realized', 0))
|
|
self.strategy_states[key]['fees'] = to_decimal(entry.get('hedge_fees_paid', 0))
|
|
self.strategy_states[key]['status'] = entry.get('status', 'OPEN')
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading {filename}: {e}. Skipping updates.")
|
|
has_errors = True
|
|
|
|
# Remove stale strategies ONLY from files we successfully read
|
|
current_keys = list(self.strategies.keys())
|
|
for k in current_keys:
|
|
file_origin = k[0]
|
|
# If the file is gone (not in status_files) OR we successfully read it and the key is missing:
|
|
if file_origin not in status_files or (file_origin in successful_files and k not in active_ids):
|
|
logger.info(f"Strategy {k[1]} removed (Closed/Gone).")
|
|
del self.strategies[k]
|
|
if k in self.strategy_states: del self.strategy_states[k]
|
|
|
|
return not has_errors
|
|
|
|
def _init_single_strategy(self, key, position_data, coin_symbol):
|
|
try:
|
|
entry_amount0 = to_decimal(position_data.get('amount0_initial', 0))
|
|
entry_amount1 = to_decimal(position_data.get('amount1_initial', 0))
|
|
target_value = to_decimal(position_data.get('target_value', 50))
|
|
entry_price = to_decimal(position_data['entry_price'])
|
|
lower = to_decimal(position_data['range_lower'])
|
|
upper = to_decimal(position_data['range_upper'])
|
|
liquidity_val = int(position_data.get('liquidity', 0))
|
|
|
|
d0 = int(position_data.get('token0_decimals', 18))
|
|
d1 = int(position_data.get('token1_decimals', 6))
|
|
liquidity_scale = Decimal("10") ** Decimal(str(-(d0 + d1) / 2))
|
|
|
|
start_price = self.last_prices.get(coin_symbol)
|
|
if start_price is None:
|
|
# Will init next loop when price available
|
|
return
|
|
|
|
strat = HyperliquidStrategy(
|
|
entry_amount0, entry_amount1, target_value, entry_price, lower, upper, start_price,
|
|
liquidity_val, liquidity_scale
|
|
)
|
|
|
|
# Fix: Use persistent start time from JSON to track all fills
|
|
ts_open = position_data.get('timestamp_open')
|
|
start_time_ms = int(ts_open * 1000) if ts_open else int(time.time() * 1000)
|
|
|
|
self.strategies[key] = strat
|
|
self.strategy_states[key] = {
|
|
"coin": coin_symbol,
|
|
"start_time": start_time_ms,
|
|
"pnl": to_decimal(position_data.get('hedge_pnl_realized', 0)),
|
|
"fees": to_decimal(position_data.get('hedge_fees_paid', 0)),
|
|
"hedge_TotPnL": to_decimal(position_data.get('hedge_TotPnL', 0)), # NEW: Total Closed PnL
|
|
"entry_price": entry_price, # Store for fishing logic
|
|
"status": position_data.get('status', 'OPEN')
|
|
}
|
|
logger.info(f"[STRAT] Init {key[1]} ({coin_symbol}) | Range: {lower}-{upper}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to init strategy {key[1]}: {e}")
|
|
|
|
def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal, order_type: str = "Alo") -> Optional[int]:
|
|
config = self.coin_configs.get(coin, {})
|
|
sz_decimals = config.get("sz_decimals", 4)
|
|
min_order = config.get("MIN_ORDER_VALUE_USD", Decimal("10.0"))
|
|
|
|
validated_size_float = validate_trade_size(size, sz_decimals, min_order, price)
|
|
if validated_size_float == 0:
|
|
return None
|
|
|
|
price_float = round_to_sig_figs_precise(price, 5)
|
|
|
|
logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")
|
|
|
|
try:
|
|
order_result = self.exchange.order(coin, is_buy, validated_size_float, price_float, {"limit": {"tif": order_type}}, reduce_only=is_buy)
|
|
status = order_result["status"]
|
|
|
|
if status == "ok":
|
|
response_data = order_result["response"]["data"]
|
|
if "statuses" in response_data:
|
|
status_obj = response_data["statuses"][0]
|
|
if "resting" in status_obj:
|
|
return status_obj["resting"]["oid"]
|
|
elif "filled" in status_obj:
|
|
logger.info("Order filled immediately.")
|
|
return status_obj["filled"]["oid"]
|
|
elif "error" in status_obj:
|
|
err_msg = status_obj['error']
|
|
if "Post only order would have immediately matched" in err_msg:
|
|
logger.warning(f"[RETRY] Maker order rejected (Crossed). Will retry.")
|
|
else:
|
|
logger.error(f"Order API Error: {err_msg}")
|
|
else:
|
|
logger.error(f"Order Failed: {order_result}")
|
|
except Exception as e:
|
|
if "429" in str(e):
|
|
logger.warning(f"Rate limit hit (429). Backing off.")
|
|
self.api_backoff_until = time.time() + 30
|
|
else:
|
|
logger.error(f"Exception during trade: {e}")
|
|
return None
|
|
|
|
def get_open_orders(self) -> List[Dict]:
|
|
try:
|
|
return self.info.open_orders(self.vault_address or self.account.address)
|
|
except:
|
|
return []
|
|
|
|
def cancel_order(self, coin: str, oid: int):
|
|
try:
|
|
self.exchange.cancel(coin, oid)
|
|
except Exception as e:
|
|
logger.error(f"Error cancelling order: {e}")
|
|
|
|
def _update_closed_pnl(self, coin: str):
|
|
"""Fetch fills from API and sum closedPnl and fees for active strategies."""
|
|
try:
|
|
# 1. Identify relevant strategies for this coin
|
|
active_strats = [k for k, v in self.strategy_states.items() if v['coin'] == coin]
|
|
if not active_strats: return
|
|
|
|
# 2. Fetch all fills (This is heavy, maybe cache or limit?)
|
|
# SDK user_fills returns recent fills.
|
|
fills = self.info.user_fills(self.vault_address or self.account.address)
|
|
|
|
for key in active_strats:
|
|
start_time = self.strategy_states[key]['start_time']
|
|
total_closed_pnl = Decimal("0")
|
|
total_fees = Decimal("0")
|
|
|
|
for fill in fills:
|
|
if fill['coin'] == coin:
|
|
# Check timestamp
|
|
if fill['time'] >= start_time:
|
|
# Sum closedPnl
|
|
total_closed_pnl += to_decimal(fill.get('closedPnl', 0))
|
|
# Sum fees
|
|
total_fees += to_decimal(fill.get('fee', 0))
|
|
|
|
# Update State
|
|
self.strategy_states[key]['hedge_TotPnL'] = total_closed_pnl
|
|
self.strategy_states[key]['fees'] = total_fees
|
|
|
|
# Write to JSON
|
|
file_path, token_id = key
|
|
update_position_stats(file_path, token_id, {
|
|
"hedge_TotPnL": float(total_closed_pnl),
|
|
"hedge_fees_paid": float(total_fees)
|
|
})
|
|
logger.info(f"[PnL] Updated {coin} | Closed PnL: ${total_closed_pnl:.2f} | Fees: ${total_fees:.2f}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update closed PnL/Fees for {coin}: {e}")
|
|
|
|
def run(self):
|
|
logger.info("Starting Unified Hedger Loop...")
|
|
self.update_coin_decimals()
|
|
|
|
# --- LOG SETTINGS ON START ---
|
|
logger.info("=== HEDGER CONFIGURATION ===")
|
|
for symbol, config in self.coin_configs.items():
|
|
logger.info(f"--- {symbol} ---")
|
|
for k, v in config.items():
|
|
logger.info(f" {k}: {v}")
|
|
logger.info("============================")
|
|
|
|
while True:
|
|
try:
|
|
# 1. API Backoff
|
|
if time.time() < self.api_backoff_until:
|
|
time.sleep(1)
|
|
continue
|
|
|
|
# 2. Update Strategies
|
|
if not self.scan_strategies():
|
|
logger.warning("Strategy scan failed (read error). Skipping execution tick.")
|
|
time.sleep(1)
|
|
continue
|
|
|
|
# 3. Fetch Market Data (Centralized)
|
|
try:
|
|
mids = self.info.all_mids()
|
|
user_state = self.info.user_state(self.vault_address or self.account.address)
|
|
open_orders = self.get_open_orders()
|
|
l2_snapshots = {} # Cache for snapshots
|
|
except Exception as e:
|
|
logger.error(f"API Error fetching data: {e}")
|
|
time.sleep(1)
|
|
continue
|
|
|
|
# Map Open Orders
|
|
orders_map = {}
|
|
for o in open_orders:
|
|
c = o['coin']
|
|
if c not in orders_map: orders_map[c] = []
|
|
orders_map[c].append(o)
|
|
|
|
# Parse User State
|
|
account_value = Decimal("0")
|
|
if "marginSummary" in user_state and "accountValue" in user_state["marginSummary"]:
|
|
account_value = to_decimal(user_state["marginSummary"]["accountValue"])
|
|
|
|
# Map current positions
|
|
current_positions = {} # Coin -> Size
|
|
current_pnls = {} # Coin -> Unrealized PnL
|
|
current_entry_pxs = {} # Coin -> Entry Price (NEW)
|
|
for pos in user_state["assetPositions"]:
|
|
c = pos["position"]["coin"]
|
|
s = to_decimal(pos["position"]["szi"])
|
|
u = to_decimal(pos["position"]["unrealizedPnl"])
|
|
e = to_decimal(pos["position"]["entryPx"])
|
|
current_positions[c] = s
|
|
current_pnls[c] = u
|
|
current_entry_pxs[c] = e
|
|
|
|
# 4. Aggregate Targets
|
|
# Coin -> { 'target_short': Decimal, 'contributors': int, 'is_at_edge': bool }
|
|
aggregates = {}
|
|
|
|
# First, update all prices from mids for active coins
|
|
for coin in self.active_coins:
|
|
if coin in mids:
|
|
price = to_decimal(mids[coin])
|
|
self.last_prices[coin] = price
|
|
|
|
# Update Price History
|
|
if coin not in self.price_history: self.price_history[coin] = []
|
|
self.price_history[coin].append(price)
|
|
if len(self.price_history[coin]) > 300: self.price_history[coin].pop(0)
|
|
|
|
for key, strat in self.strategies.items():
|
|
coin = self.strategy_states[key]['coin']
|
|
status = self.strategy_states[key].get('status', 'OPEN')
|
|
if coin not in self.last_prices: continue
|
|
price = self.last_prices[coin]
|
|
|
|
# Get Config & Strategy Type
|
|
config = self.coin_configs.get(coin, {})
|
|
strategy_type = config.get("HEDGE_STRATEGY", "ASYMMETRIC")
|
|
|
|
# Calc Logic
|
|
calc = strat.calculate_rebalance(price, Decimal("0"), strategy_type)
|
|
|
|
if coin not in aggregates:
|
|
aggregates[coin] = {'target_short': Decimal("0"), 'contributors': 0, 'is_at_edge': False, 'is_at_bottom_edge': False, 'adj_pct': Decimal("0"), 'is_closing': False}
|
|
|
|
if status == 'CLOSING':
|
|
# If Closing, we want target to be 0 for this strategy
|
|
logger.info(f"[STRAT] {key[1]} is CLOSING -> Force Target 0")
|
|
aggregates[coin]['is_closing'] = True
|
|
# Do not add to target_short
|
|
else:
|
|
aggregates[coin]['target_short'] += calc['target_short']
|
|
|
|
aggregates[coin]['contributors'] += 1
|
|
aggregates[coin]['adj_pct'] = calc['adj_pct']
|
|
|
|
# Check Edge Proximity for Cleanup
|
|
config = self.coin_configs.get(coin, {})
|
|
enable_cleanup = config.get("ENABLE_EDGE_CLEANUP", True)
|
|
cleanup_margin = config.get("EDGE_CLEANUP_MARGIN_PCT", Decimal("0.02"))
|
|
|
|
if enable_cleanup:
|
|
dist_bottom_pct = (price - strat.low_range) / strat.low_range
|
|
dist_top_pct = (strat.high_range - price) / strat.high_range
|
|
range_width_pct = (strat.high_range - strat.low_range) / strat.low_range
|
|
safety_margin_pct = range_width_pct * cleanup_margin
|
|
|
|
if dist_bottom_pct < safety_margin_pct or dist_top_pct < safety_margin_pct:
|
|
aggregates[coin]['is_at_edge'] = True
|
|
if dist_bottom_pct < safety_margin_pct:
|
|
aggregates[coin]['is_at_bottom_edge'] = True
|
|
|
|
# Check Shadow Orders (Pre-Execution)
|
|
self.check_shadow_orders(l2_snapshots)
|
|
|
|
# 5. Execute Per Coin
|
|
# Union of coins with Active Strategies OR Active Positions
|
|
coins_to_process = set(aggregates.keys())
|
|
for c, pos in current_positions.items():
|
|
if abs(pos) > 0: coins_to_process.add(c)
|
|
|
|
for coin in coins_to_process:
|
|
data = aggregates.get(coin, {'target_short': Decimal("0"), 'contributors': 0, 'is_at_edge': False, 'adj_pct': Decimal("0"), 'is_closing': False})
|
|
|
|
price = self.last_prices.get(coin, Decimal("0"))
|
|
if price == 0: continue
|
|
|
|
target_short_abs = data['target_short']
|
|
target_position = -target_short_abs
|
|
current_pos = current_positions.get(coin, Decimal("0"))
|
|
diff = target_position - current_pos
|
|
diff_abs = abs(diff)
|
|
|
|
# Thresholds
|
|
config = self.coin_configs.get(coin, {})
|
|
min_thresh = config.get("MIN_HEDGE_THRESHOLD", Decimal("0.008"))
|
|
vol_pct = self.calculate_volatility(coin)
|
|
base_vol = Decimal("0.0005")
|
|
vol_mult = max(Decimal("1.0"), min(Decimal("3.0"), vol_pct / base_vol)) if vol_pct > 0 else Decimal("1.0")
|
|
base_rebalance_pct = config.get("BASE_REBALANCE_THRESHOLD_PCT", Decimal("0.20"))
|
|
thresh_pct = min(Decimal("0.15"), base_rebalance_pct * vol_mult)
|
|
dynamic_thresh = max(min_thresh, abs(target_position) * thresh_pct)
|
|
|
|
if data['is_at_edge'] and config.get("ENABLE_EDGE_CLEANUP", True):
|
|
if dynamic_thresh > min_thresh: dynamic_thresh = min_thresh
|
|
|
|
action_needed = diff_abs > dynamic_thresh
|
|
is_buy_bool = diff > 0
|
|
side_str = "BUY" if is_buy_bool else "SELL"
|
|
|
|
# Manage Existing Orders
|
|
existing_orders = orders_map.get(coin, [])
|
|
force_taker_retry = False
|
|
order_matched = False
|
|
price_buffer_pct = config.get("PRICE_BUFFER_PCT", Decimal("0.0015"))
|
|
|
|
for o in existing_orders:
|
|
o_oid = o['oid']
|
|
o_price = to_decimal(o['limitPx'])
|
|
o_side = o['side']
|
|
o_timestamp = o.get('timestamp', int(time.time()*1000))
|
|
is_same_side = (o_side == 'B' and is_buy_bool) or (o_side == 'A' and not is_buy_bool)
|
|
order_age_sec = (int(time.time()*1000) - o_timestamp) / 1000.0
|
|
|
|
if is_same_side and order_age_sec > config.get("MAKER_ORDER_TIMEOUT", 300):
|
|
logger.info(f"[TIMEOUT] {coin} Order {o_oid} expired. Cancelling.")
|
|
self.cancel_order(coin, o_oid)
|
|
continue
|
|
|
|
if config.get("ENABLE_FISHING", False) and is_same_side and order_age_sec > config.get("FISHING_TIMEOUT_FALLBACK", 30):
|
|
logger.info(f"[FISHING] {coin} Order {o_oid} timed out. Retrying as Taker.")
|
|
self.cancel_order(coin, o_oid)
|
|
force_taker_retry = True
|
|
continue
|
|
|
|
if is_same_side and (abs(price - o_price) / price) < price_buffer_pct:
|
|
order_matched = True
|
|
if int(time.time()) % 15 == 0:
|
|
logger.info(f"[WAIT] {coin} Pending {side_str} @ {o_price} | Age: {order_age_sec:.1f}s")
|
|
break
|
|
else:
|
|
logger.info(f"Cancelling stale order {o_oid} ({o_side} @ {o_price})")
|
|
self.cancel_order(coin, o_oid)
|
|
|
|
# Determine Urgency / Bypass Cooldown
|
|
bypass_cooldown = False
|
|
force_maker = False
|
|
if not order_matched and (action_needed or force_taker_retry):
|
|
if force_taker_retry: bypass_cooldown = True
|
|
elif data.get('is_closing', False): bypass_cooldown = True
|
|
elif data.get('contributors', 0) == 0:
|
|
if time.time() - self.startup_time > 5: force_maker = True
|
|
else: continue # Skip startup ghost positions
|
|
|
|
large_hedge_mult = config.get("LARGE_HEDGE_MULTIPLIER", Decimal("5.0"))
|
|
if diff_abs > (dynamic_thresh * large_hedge_mult) and not force_maker and data.get('is_at_edge', False):
|
|
# Prevent IOC for BUYs at bottom edge
|
|
if not (is_buy_bool and data.get('is_at_bottom_edge', False)):
|
|
bypass_cooldown = True
|
|
|
|
# --- ASYMMETRIC HEDGE CHECK ---
|
|
is_asymmetric_blocked = False
|
|
p_mid_asym = Decimal("0")
|
|
strategy_type = config.get("HEDGE_STRATEGY", "ASYMMETRIC")
|
|
|
|
if strategy_type == "ASYMMETRIC" and is_buy_bool and not bypass_cooldown:
|
|
total_L_asym = Decimal("0")
|
|
for k_strat, strat_inst in self.strategies.items():
|
|
if self.strategy_states[k_strat]['coin'] == coin:
|
|
total_L_asym += strat_inst.L
|
|
|
|
gamma_asym = (Decimal("0.5") * total_L_asym * (price ** Decimal("-1.5")))
|
|
if gamma_asym > 0:
|
|
p_mid_asym = price - (diff_abs / gamma_asym)
|
|
if not data.get('is_at_edge', False) and price >= p_mid_asym:
|
|
is_asymmetric_blocked = True
|
|
|
|
# --- EXECUTION ---
|
|
if not order_matched and not is_asymmetric_blocked:
|
|
if action_needed or force_taker_retry:
|
|
last_trade = self.last_trade_times.get(coin, 0)
|
|
min_time = config.get("MIN_TIME_BETWEEN_TRADES", 60)
|
|
|
|
if bypass_cooldown or (time.time() - last_trade > min_time):
|
|
if coin not in l2_snapshots: l2_snapshots[coin] = self.info.l2_snapshot(coin)
|
|
levels = l2_snapshots[coin]['levels']
|
|
if levels[0] and levels[1]:
|
|
bid, ask = to_decimal(levels[0][0]['px']), to_decimal(levels[1][0]['px'])
|
|
if bypass_cooldown and not force_maker:
|
|
exec_price = ask * Decimal("1.001") if is_buy_bool else bid * Decimal("0.999")
|
|
order_type = "Ioc"
|
|
else:
|
|
exec_price = bid if is_buy_bool else ask
|
|
order_type = "Alo"
|
|
|
|
logger.info(f"[TRIG] {coin} {side_str} {diff_abs:.4f} | Cur: {current_pos:.4f} | Type: {order_type}")
|
|
oid = self.place_limit_order(coin, is_buy_bool, diff_abs, exec_price, order_type)
|
|
if oid:
|
|
self.last_trade_times[coin] = time.time()
|
|
if order_type == "Ioc":
|
|
shadow_price = bid if is_buy_bool else ask
|
|
self.shadow_orders.append({'coin': coin, 'side': side_str, 'price': shadow_price, 'expires_at': time.time() + config.get("SHADOW_ORDER_TIMEOUT", 600)})
|
|
|
|
logger.info("Sleeping 10s for position update...")
|
|
time.sleep(10)
|
|
self._update_closed_pnl(coin)
|
|
else:
|
|
# Idle Cleanup
|
|
if existing_orders and not order_matched:
|
|
for o in existing_orders: self.cancel_order(coin, o['oid'])
|
|
|
|
# --- THROTTLED STATUS LOGGING ---
|
|
now = time.time()
|
|
last_log = self.last_idle_log_times.get(coin, 0)
|
|
monitor_interval = config.get("MONITOR_INTERVAL_SECONDS", 60)
|
|
|
|
if now - last_log >= monitor_interval:
|
|
self.last_idle_log_times[coin] = now
|
|
if is_asymmetric_blocked:
|
|
logger.info(f"[ASYMMETRIC] Blocking BUY. Px ({price:.2f}) >= Eq ({p_mid_asym:.2f}) & Not Edge")
|
|
|
|
total_L_log = Decimal("0")
|
|
for k_strat, strat_inst in self.strategies.items():
|
|
if self.strategy_states[k_strat]['coin'] == coin:
|
|
total_L_log += strat_inst.L
|
|
|
|
if total_L_log > 0 and price > 0:
|
|
gamma_log = (Decimal("0.5") * total_L_log * (price ** Decimal("-1.5")))
|
|
if gamma_log > 0:
|
|
p_mid_log = price - (diff / gamma_log) # Corrected equilibrium formula
|
|
p_buy = price + (dynamic_thresh + diff) / gamma_log
|
|
p_sell = price - (dynamic_thresh - diff) / gamma_log
|
|
pad = " " if coin == "BNB" else ""
|
|
|
|
unrealized = current_pnls.get(coin, Decimal("0"))
|
|
closed_pnl = sum(s['hedge_TotPnL'] for s in self.strategy_states.values() if s['coin'] == coin)
|
|
fees = sum(s['fees'] for s in self.strategy_states.values() if s['coin'] == coin)
|
|
total_pnl = (closed_pnl - fees) + unrealized
|
|
|
|
logger.info(f"[IDLE] {coin} | Px: {price:.2f}{pad} | M: {p_mid_log:.1f}{pad} | B: {p_buy:.1f}{pad} / S: {p_sell:.1f}{pad} | delta: {target_position:.4f}({diff:+.4f}) | Adj: {data.get('adj_pct',0)*100:+.2f}%, Vol: {vol_mult:.2f}, Thr: {dynamic_thresh:.4f} | PnL: {unrealized:.2f} | TotPnL: {total_pnl:.2f}")
|
|
else:
|
|
logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f} (Thresh: {dynamic_thresh:.4f})")
|
|
else:
|
|
logger.info(f"[IDLE] {coin} | Px: {price:.2f} | delta: {target_position:.4f} | Diff: {diff:.4f}")
|
|
|
|
time.sleep(DEFAULT_STRATEGY.get("CHECK_INTERVAL", 1))
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Stopping...")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Loop Error: {e}", exc_info=True)
|
|
time.sleep(5)
|
|
|
|
if __name__ == "__main__":
|
|
hedger = UnifiedHedger()
|
|
hedger.run()
|