# File: uniswap_auto_clp/clp_hedger.py
# (861 lines, 35 KiB, Python)

import os
import time
import logging
import sys
import json
import math
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Optional, Dict, Any, List, Union
from dotenv import load_dotenv
# --- FIX: Add project root to sys.path to import local modules ---
# current_dir is the directory containing this file; its parent is treated as
# the project root and appended to sys.path so sibling modules can be imported.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
# Import local modules
# logging_utils is optional: when the import fails, setup_logging is set to None
# and the root logger is reset to a plain INFO-level basicConfig.
try:
    from logging_utils import setup_logging
except ImportError:
    setup_logging = None
    # Ensure root logger is clean if we can't use setup_logging
    logging.getLogger().handlers.clear()
    logging.basicConfig(level=logging.INFO)
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Load environment variables
# A .env file next to this script is used when it exists; otherwise None is
# passed and python-dotenv applies its own default lookup
# (NOTE(review): presumably find_dotenv() -- confirm against python-dotenv docs).
dotenv_path = os.path.join(current_dir, '.env')
load_dotenv(dotenv_path if os.path.exists(dotenv_path) else None)
# --- LOGGING SETUP ---
# Ensure logs directory exists (created next to this script; no error if present)
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)
# Custom Filter for Millisecond Unix Timestamp (Matching Manager style)
class UnixMsLogFilter(logging.Filter):
    """Attach a ``unix_ms`` attribute (creation time in whole milliseconds) to records.

    The filter never rejects a record; it only decorates it so that a formatter
    can reference ``%(unix_ms)d``.
    """

    def filter(self, record):
        """Stamp *record* with ``unix_ms`` and let it through."""
        created_ms = record.created * 1000
        record.unix_ms = int(created_ms)
        return True
# Configure Logging
# A dedicated "HEDGER" logger with two handlers of its own (console + file).
# Both handlers and the logger itself are set to INFO.
logger = logging.getLogger("HEDGER")
logger.setLevel(logging.INFO)
logger.propagate = False # Prevent double logging from root logger
logger.handlers.clear() # Clear existing handlers to prevent duplicates
# Console Handler: writes to stdout with a human-readable timestamp.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_fmt)
logger.addHandler(console_handler)
# File Handler: writes UTF-8 to logs/clp_hedger.log. Each line is prefixed with
# the millisecond Unix timestamp that UnixMsLogFilter attaches to the record;
# the filter is installed on this handler only, so only file_fmt may use %(unix_ms)d.
log_file = os.path.join(log_dir, 'clp_hedger.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.addFilter(UnixMsLogFilter())
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)
# --- DECIMAL PRECISION CONFIGURATION ---
# Applies to the current thread's Decimal context (50 significant digits).
getcontext().prec = 50
# --- CONFIGURATION ---
COIN_SYMBOL = "ETH"  # coin name used for every exchange query and order in this file
CHECK_INTERVAL = 1  # seconds slept between main-loop iterations (passed to time.sleep)
LEVERAGE = 5  # cross leverage requested at start-up
STATUS_FILE = "hedge_status.json"  # relative path: resolved against the working directory
# Strategy Zones
# NOTE(review): the four ZONE_* values are not referenced in the code visible here.
ZONE_BOTTOM_HEDGE_LIMIT = Decimal("1.0")
ZONE_CLOSE_START = Decimal("10.0")
ZONE_CLOSE_END = Decimal("11.0")
ZONE_TOP_HEDGE_START = Decimal("10.0")
# Order Settings
PRICE_BUFFER_PCT = Decimal("0.0015") # 0.15%
MIN_THRESHOLD_ETH = Decimal("0.008") # ~$24 @ 3k
MIN_ORDER_VALUE_USD = Decimal("10.0")  # orders whose notional is below this are not submitted
# Capital Safety
DYNAMIC_THRESHOLD_MULTIPLIER = Decimal("1.2")  # widens the rebalance threshold after a >0.3% tick-to-tick move
MIN_TIME_BETWEEN_TRADES = 25  # seconds; compared against time.time() differences
# NOTE(review): MAX_HEDGE_MULTIPLIER is not referenced in the code visible here.
MAX_HEDGE_MULTIPLIER = Decimal("1.25")
# Edge Protection
# NOTE(review): EDGE_PROXIMITY_PCT, VELOCITY_THRESHOLD_PCT and the two
# POSITION_*_EDGE_PROXIMITY_PCT values are not referenced in the code visible here.
EDGE_PROXIMITY_PCT = Decimal("0.04")
VELOCITY_THRESHOLD_PCT = Decimal("0.0005")
POSITION_OPEN_EDGE_PROXIMITY_PCT = Decimal("0.06")
POSITION_CLOSED_EDGE_PROXIMITY_PCT = Decimal("0.025")
LARGE_HEDGE_MULTIPLIER = Decimal("2.8")  # a delta gap above threshold * this value bypasses the trade cooldown
# --- HELPER FUNCTIONS ---
def to_decimal(value: Any) -> Decimal:
    """Convert *value* to ``Decimal`` via its string form; ``None`` becomes zero.

    Going through ``str`` avoids importing binary floating-point noise
    (``0.1`` becomes ``Decimal("0.1")``). Values whose string form is not a
    valid decimal literal raise ``decimal.InvalidOperation``.
    """
    return Decimal("0") if value is None else Decimal(str(value))
def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
    """Truncate *amount* to *sz_decimals* decimal places and return it as a float.

    Rounding is ROUND_DOWN (towards zero), so the result never exceeds the
    magnitude of the input. The float return type is what the SDK expects.
    """
    if amount == 0:
        return 0.0
    # One unit in the last permitted decimal place, e.g. 0.0001 for sz_decimals=4.
    step = Decimal("1").scaleb(-sz_decimals)
    truncated = amount.quantize(step, rounding=ROUND_DOWN)
    return float(truncated)
def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
    """Round *x* to *sig_figs* significant figures and return it as a float.

    The rounding is delegated to Decimal's ``g`` presentation type; the
    formatted text is then parsed back into a float for the SDK.
    """
    if x == 0:
        return 0.0
    # Use string formatting for sig figs as it's robust
    spec = f".{sig_figs}g"
    return float(format(x, spec))
def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
    """Return the order size to submit, or ``0.0`` when the order must be skipped.

    The size is first truncated (ROUND_DOWN) to *sz_decimals* decimal places,
    and every check is then applied to that truncated value, because it is
    the quantity that is actually sent. Checking the unrounded size, as was
    done previously, let through orders whose notional fell below
    *min_order_value* once truncated (e.g. 0.00339 @ 3000 = 10.17, but the
    submitted 0.0033 is only worth 9.90).

    Args:
        size: Desired order size; non-positive values are rejected.
        sz_decimals: Number of decimal places allowed for the size.
        min_order_value: Minimum notional (size * price) for an order.
        price: Price used to compute the notional.

    Returns:
        The truncated size as a float, or ``0.0`` if the size is non-positive,
        truncates to zero (dust), or is worth less than *min_order_value*.
    """
    if size <= 0:
        return 0.0

    # One unit in the last permitted decimal place, e.g. 0.0001 for sz_decimals=4.
    step = Decimal(1).scaleb(-sz_decimals)
    rounded = size.quantize(step, rounding=ROUND_DOWN)

    # Dust: anything smaller than one step truncates to zero.
    if rounded <= 0:
        return 0.0

    # Minimum notional, evaluated on the size that will really be submitted.
    if rounded * price < min_order_value:
        return 0.0

    return float(rounded)
# --- STATE MANAGEMENT ---
def get_active_automatic_position() -> Optional[Dict]:
    """Return the first active AUTOMATIC position recorded in STATUS_FILE.

    The file may hold a list of position dicts or, in the legacy layout, a
    single dict. An entry qualifies when its ``type`` is ``AUTOMATIC`` and its
    ``status`` is one of OPEN, PENDING_HEDGE or CLOSING.

    Returns:
        The matching entry, or ``None`` when the file is missing, holds no
        matching entry, or cannot be read/parsed (the error is logged).
    """
    if not os.path.exists(STATUS_FILE):
        return None

    live_statuses = ['OPEN', 'PENDING_HEDGE', 'CLOSING']
    try:
        with open(STATUS_FILE, 'r') as f:
            payload = json.load(f)

        if isinstance(payload, list):
            # Expecting a list of positions
            candidates = payload
        elif isinstance(payload, dict):
            # Fallback if single dict (legacy)
            candidates = [payload]
        else:
            candidates = []

        for candidate in candidates:
            if candidate.get('type') == 'AUTOMATIC' and candidate.get('status') in live_statuses:
                return candidate
    except Exception as e:
        logger.error(f"ERROR reading status file: {e}")
    return None
def update_position_zones_in_json(token_id: int, zones_data: Dict):
    """Merge *zones_data* into the STATUS_FILE entry whose ``token_id`` matches.

    The file is rewritten only when a matching entry exists. The new content
    is written to a temporary file in the same directory and swapped in with
    ``os.replace``, so a concurrent reader sees either the old or the new
    file, never the truncated or half-written state that opening the live
    file with ``'w'`` produces.

    This is still a read-modify-write cycle: changes another process makes
    between the read and the replace are overwritten.

    Args:
        token_id: Value compared against each entry's ``token_id``.
        zones_data: Keys/values merged into the matching entry.

    All errors are logged and swallowed; nothing is raised to the caller.
    """
    if not os.path.exists(STATUS_FILE):
        return
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)
        # Ensure list (a legacy single-dict file is rewritten as a one-element list)
        if isinstance(data, dict):
            data = [data]

        target = next((entry for entry in data if entry.get('token_id') == token_id), None)
        if target is None:
            return
        target.update(zones_data)

        # Per-process temp name so two writers never share a temp file.
        tmp_path = f"{STATUS_FILE}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, STATUS_FILE)
        finally:
            # Only present if the dump or the replace failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        logger.info(f"Updated JSON zones for Position {token_id}")
    except Exception as e:
        logger.error(f"Error updating JSON zones: {e}")
def update_position_stats(token_id: int, stats_data: Dict):
    """Merge *stats_data* into the STATUS_FILE entry whose ``token_id`` matches.

    The file is rewritten only when a matching entry exists. The new content
    is written to a temporary file in the same directory and swapped in with
    ``os.replace``, so a concurrent reader sees either the old or the new
    file, never the truncated or half-written state that opening the live
    file with ``'w'`` produces.

    This is still a read-modify-write cycle: changes another process makes
    between the read and the replace are overwritten.

    Args:
        token_id: Value compared against each entry's ``token_id``.
        stats_data: Keys/values merged into the matching entry.

    All errors are logged and swallowed; nothing is raised to the caller.
    """
    if not os.path.exists(STATUS_FILE):
        return
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)
        # A legacy single-dict file is rewritten as a one-element list.
        if isinstance(data, dict):
            data = [data]

        target = next((entry for entry in data if entry.get('token_id') == token_id), None)
        if target is None:
            return
        target.update(stats_data)

        # Per-process temp name so two writers never share a temp file.
        tmp_path = f"{STATUS_FILE}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, STATUS_FILE)
        finally:
            # Only present if the dump or the replace failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    except Exception as e:
        logger.error(f"Error updating JSON stats: {e}")
# --- STRATEGY CLASS ---
class HyperliquidStrategy:
    """Target-short calculator for a concentrated-liquidity position.

    A liquidity figure ``L`` is derived once at construction from the entry
    amounts (or, failing that, from the target value) and is then used to
    compute the amount-0 exposure of the position at any price inside
    ``[low_range, high_range]``.
    """

    def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
                 entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal):
        """Store the position parameters and derive ``self.L``.

        ``L`` is taken from the first method that yields a usable value:
        amount0, then amount1, then a heuristic based on *target_value*.
        If none does, ``L`` stays 0 and an error is logged. Any exception
        raised during the derivation is logged and terminates the process
        via ``sys.exit(1)``.
        """
        self.entry_amount0 = entry_amount0
        self.entry_amount1 = entry_amount1
        self.target_value = target_value
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.start_price = start_price

        # Distance the market has dropped below entry at start-up (never negative).
        self.gap = max(Decimal("0.0"), entry_price - start_price)
        self.recovery_target = entry_price + (Decimal("2") * self.gap)

        self.L = Decimal("0.0")
        unit = Decimal("1")
        tiny = Decimal("1e-10")
        try:
            root_entry = entry_price.sqrt()
            root_low = low_range.sqrt()
            root_high = high_range.sqrt()

            # Method 1: Amount0 (WETH)
            if entry_amount0 > 0:
                # Assuming amount0 is already in standard units (ETH) from JSON
                span0 = (unit / root_entry) - (unit / root_high)
                if span0 > tiny:
                    self.L = entry_amount0 / span0
                    logger.info(f"Calculated L from Amount0: {self.L:.4f}")

            # Method 2: Amount1 (USDC)
            if self.L == 0 and entry_amount1 > 0:
                span1 = root_entry - root_low
                if span1 > tiny:
                    self.L = entry_amount1 / span1
                    logger.info(f"Calculated L from Amount1: {self.L:.4f}")

            # Method 3: Target Value Heuristic
            if self.L == 0:
                logger.warning("Amounts missing. Using Target Value Heuristic.")
                max_eth = target_value / low_range
                span_full = (unit / root_low) - (unit / root_high)
                if span_full > 0:
                    self.L = max_eth / span_full
                    logger.info(f"Calculated L from Target Value: {self.L:.4f}")
                else:
                    logger.error("Critical: Invalid Range for L calculation")
        except Exception as e:
            logger.error(f"Error calculating liquidity: {e}")
            sys.exit(1)

    def get_pool_delta(self, current_price: Decimal) -> Decimal:
        """Return ``L * (1/sqrt(p) - 1/sqrt(high_range))`` with ``p`` clamped.

        At or above ``high_range`` the result is 0. At or below ``low_range``
        the price is clamped to ``low_range``, giving the maximum value.
        """
        if current_price >= self.high_range:
            return Decimal("0.0")

        # Below the range the exposure stops growing: evaluate at the lower bound.
        effective_price = self.low_range if current_price <= self.low_range else current_price
        inv_root_price = Decimal("1") / effective_price.sqrt()
        inv_root_high = Decimal("1") / self.high_range.sqrt()
        return self.L * (inv_root_price - inv_root_high)

    def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal) -> Dict:
        """Compare the held short with the (over-hedged) target at *current_price*.

        The target is the pool delta increased by an over-hedge percentage
        that grows by 0.75% for each 0.1 the price sits below 80% of the
        range (the position in the range is floored at 0).

        Returns:
            Dict with ``current_price``, ``pool_delta``, ``target_short``,
            ``current_short`` (absolute value of *current_short_size*),
            ``diff`` (target minus current), ``action`` ("SELL" when diff is
            positive, otherwise "BUY") and ``overhedge_pct``.
        """
        pool_delta = self.get_pool_delta(current_price)

        # Over-Hedge Logic
        boost = Decimal("0.0")
        width = self.high_range - self.low_range
        if width > 0:
            position_in_range = (current_price - self.low_range) / width
            cutoff = Decimal("0.8")
            if position_in_range < cutoff:
                # Formula: 0.75% boost for every 0.1 drop below 0.8
                floored = max(Decimal("0.0"), position_in_range)
                steps_below = (cutoff - floored) / Decimal("0.1")
                boost = steps_below * Decimal("0.0075")

        target = pool_delta * (Decimal("1.0") + boost)
        held = abs(current_short_size)
        gap = target - held
        side = "SELL" if gap > 0 else "BUY"

        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": target,
            "current_short": held,
            "diff": gap,
            "action": side,
            "overhedge_pct": boost,
        }
# --- MAIN HEDGER CLASS ---
class ScalperHedger:
def __init__(self):
    """Connect to the exchange and initialise all runtime state.

    Reads HEDGER_PRIVATE_KEY (required) and MAIN_WALLET_ADDRESS (optional)
    from the environment. When the key is missing an error is logged and the
    process exits with status 1. A failure to set leverage is logged but does
    not abort construction.
    """
    self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
    self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
    if not self.private_key:
        logger.error("No HEDGER_PRIVATE_KEY found in .env")
        sys.exit(1)
    # The signing account is derived from the key; queries elsewhere in this
    # class use vault_address when set and fall back to this account's address.
    self.account = Account.from_key(self.private_key)
    self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
    self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
    # Set Leverage
    try:
        logger.info(f"Setting leverage to {LEVERAGE}x (Cross)...")
        self.exchange.update_leverage(LEVERAGE, COIN_SYMBOL, is_cross=True)
    except Exception as e:
        logger.error(f"Failed to update leverage: {e}")
    # No strategy until run() sees an active position and calls _init_strategy().
    self.strategy: Optional[HyperliquidStrategy] = None
    # Size precision for COIN_SYMBOL; _get_sz_decimals falls back to 4.
    self.sz_decimals = self._get_sz_decimals(COIN_SYMBOL)
    self.active_position_id = None
    # Safety & State
    self.last_price: Optional[Decimal] = None
    self.last_trade_time = 0
    # Velocity Tracking
    # NOTE(review): last_price_for_velocity and velocity_history are not used
    # by any code visible in this file.
    self.last_price_for_velocity: Optional[Decimal] = None
    self.price_history: List[Decimal] = []
    self.velocity_history: List[Decimal] = []
    # PnL Tracking
    self.strategy_start_time = 0
    self.last_pnl_check_time = 0
    self.trade_history_seen = set()
    self.accumulated_pnl = Decimal("0.0")
    self.accumulated_fees = Decimal("0.0")
    # Logging Rate Limiting
    self.last_idle_log_time = 0
    self.last_pending_log_time = 0
    # Order Tracking
    # NOTE(review): original_order_side is not read anywhere in the visible code.
    self.original_order_side = None
    logger.info(f"[DELTA] Delta-Zero Scalper Hedger initialized. Agent: {self.account.address}")
def calculate_volatility(self) -> Decimal:
    """Return the relative volatility of ``self.price_history``.

    The value is the population standard deviation of the stored prices
    divided by their mean (e.g. 0.001 = 0.1%).

    Returns:
        ``Decimal("0.0")`` when fewer than 30 prices are stored, when the
        mean is zero, or when the computation fails (the error is logged).
    """
    samples = self.price_history
    if not samples or len(samples) < 30:
        return Decimal("0.0")

    try:
        count = len(samples)
        average = sum(samples) / count
        # Population variance: mean of squared deviations from the average.
        squared_deviation_total = sum((px - average) ** 2 for px in samples)
        sigma = (squared_deviation_total / count).sqrt()
        if average == 0:
            return Decimal("0.0")
        return sigma / average
    except Exception as e:
        logger.error(f"Error calculating volatility: {e}")
        return Decimal("0.0")
def get_dynamic_edge_proximity(self, price: Decimal) -> Decimal:
    """Return the edge-proximity fraction, scaled by position value.

    Starts at 4% and adds 4% for every 10,000 USD of position value; the
    result is kept within [4%, 15%]. The value is the strategy's target
    value, or, when that is zero and a last price is known, the current
    hedge size times the last price. *price* is accepted but not used.
    """
    floor_pct = Decimal("0.04")
    ceiling_pct = Decimal("0.15")
    # +0.04 per 10,000 USD  ->  0.04 / 10000 = 0.000004 per USD
    pct_per_usd = Decimal("0.000004")

    # Estimate Position Value (Use Target Value as proxy for total risk)
    if self.strategy:
        exposure_usd = self.strategy.target_value
    else:
        exposure_usd = Decimal("0")

    # Fallback to current hedge value if target not set
    if exposure_usd == 0 and self.last_price:
        hedge = self.get_current_position(COIN_SYMBOL)
        exposure_usd = abs(hedge['size']) * self.last_price

    scaled = floor_pct + exposure_usd * pct_per_usd
    return max(floor_pct, min(ceiling_pct, scaled))
def _init_strategy(self, position_data: Dict):
    """Build a fresh strategy for *position_data* and reset per-position state.

    ``entry_price``, ``range_lower``, ``range_upper`` and ``token_id`` are
    required keys; the initial amounts default to 0 and ``target_value`` to
    50. If no market price is available yet, a warning is logged and nothing
    is changed. Any exception is logged and leaves ``self.strategy`` as None.
    On success the position's realised PnL and fee stats in the status file
    are reset to 0.0.
    """
    try:
        optional = position_data.get
        entry_amount0 = to_decimal(optional('amount0_initial', 0))
        entry_amount1 = to_decimal(optional('amount1_initial', 0))
        target_value = to_decimal(optional('target_value', 50))
        # Mandatory fields: a missing key raises KeyError, handled below.
        entry_price = to_decimal(position_data['entry_price'])
        lower = to_decimal(position_data['range_lower'])
        upper = to_decimal(position_data['range_upper'])

        start_price = self.get_market_price(COIN_SYMBOL)
        if start_price is None:
            logger.warning("Waiting for initial price to start strategy...")
            return

        self.strategy = HyperliquidStrategy(
            entry_amount0=entry_amount0, entry_amount1=entry_amount1,
            target_value=target_value, entry_price=entry_price,
            low_range=lower, high_range=upper, start_price=start_price,
        )

        # Reset State
        self.last_price = start_price
        self.last_trade_time = 0
        self.price_history = [start_price]
        # Milliseconds; fills older than this are ignored by PnL tracking.
        self.strategy_start_time = int(time.time() * 1000)
        self.trade_history_seen = set()
        self.accumulated_pnl = Decimal("0.0")
        self.accumulated_fees = Decimal("0.0")
        self.active_position_id = position_data['token_id']

        zeroed_stats = {"hedge_pnl_realized": 0.0, "hedge_fees_paid": 0.0}
        update_position_stats(self.active_position_id, zeroed_stats)
        logger.info(f"[DELTA] Strat Init: Pos {self.active_position_id} | Range: {lower}-{upper} | Entry: {entry_price} | Start Px: {start_price:.2f}")
    except Exception as e:
        logger.error(f"Failed to init strategy: {e}")
        self.strategy = None
def _get_sz_decimals(self, coin: str) -> int:
    """Return the exchange's ``szDecimals`` for *coin*, defaulting to 4.

    The default is used when the coin is not listed in the exchange metadata
    or when the metadata request fails. A failure is now logged, and only
    ``Exception`` is caught, so Ctrl-C / ``SystemExit`` are no longer
    swallowed as they were by the previous bare ``except``.
    """
    fallback_decimals = 4
    try:
        meta = self.info.meta()
        for asset in meta["universe"]:
            if asset["name"] == coin:
                return asset["szDecimals"]
    except Exception as e:
        logger.warning(f"Could not read szDecimals for {coin}: {e}. Using {fallback_decimals}.")
    return fallback_decimals
def get_market_price(self, coin: str) -> Optional[Decimal]:
    """Return the exchange mid price for *coin*, or ``None`` if unavailable.

    ``None`` is returned when the coin is missing from the mids snapshot or
    when the request/conversion fails. A failure is now logged instead of
    being silently discarded, and only ``Exception`` is caught, so Ctrl-C /
    ``SystemExit`` are no longer swallowed as they were by the previous
    bare ``except``.
    """
    try:
        mids = self.info.all_mids()
        if coin in mids:
            return to_decimal(mids[coin])
    except Exception as e:
        logger.warning(f"Failed to fetch mid price for {coin}: {e}")
    return None
def get_order_book_levels(self, coin: str) -> Optional[Dict[str, Decimal]]:
    """Return best bid, best ask and their midpoint for *coin*.

    Returns:
        ``{'bid': ..., 'ask': ..., 'mid': ...}`` as Decimals, or ``None``
        when the snapshot is empty, lacks ``levels``, has an empty side, or
        the request fails. A failure is now logged, and only ``Exception``
        is caught, so Ctrl-C / ``SystemExit`` are no longer swallowed as
        they were by the previous bare ``except``.
    """
    try:
        snapshot = self.info.l2_snapshot(coin)
        if snapshot and 'levels' in snapshot:
            # levels[0] is read as the bid side and levels[1] as the ask side.
            bids = snapshot['levels'][0]
            asks = snapshot['levels'][1]
            if bids and asks:
                best_bid = to_decimal(bids[0]['px'])
                best_ask = to_decimal(asks[0]['px'])
                mid = (best_bid + best_ask) / Decimal("2")
                return {'bid': best_bid, 'ask': best_ask, 'mid': mid}
    except Exception as e:
        logger.warning(f"Failed to fetch order book for {coin}: {e}")
    return None
def get_current_position(self, coin: str) -> Dict[str, Decimal]:
    """Return the signed size and unrealised PnL of the *coin* position.

    The account queried is ``vault_address`` when set, otherwise the signing
    account's own address.

    Returns:
        ``{'size': ..., 'pnl': ...}`` as Decimals; both are zero when the
        account holds no position in *coin*.

    Raises:
        RuntimeError: when the account state cannot be fetched or parsed.
            Previously this case returned a zero position, which is
            indistinguishable from "flat": the run loop would then treat the
            whole target as missing and open a second hedge, and
            ``close_all_positions`` would skip the close. Every caller in
            this file already runs inside an ``except Exception`` handler,
            so the failure is logged there and the action is skipped.
    """
    try:
        user_state = self.info.user_state(self.vault_address or self.account.address)
        for pos in user_state["assetPositions"]:
            details = pos["position"]
            if details["coin"] == coin:
                return {
                    'size': to_decimal(details["szi"]),
                    'pnl': to_decimal(details["unrealizedPnl"])
                }
    except Exception as e:
        raise RuntimeError(f"Could not fetch {coin} position: {e}") from e
    # Request succeeded and the coin is not listed: genuinely flat.
    return {'size': Decimal("0"), 'pnl': Decimal("0")}
def get_open_orders(self) -> List[Dict]:
    """Return the account's open orders, or an empty list if the query fails.

    The account queried is ``vault_address`` when set, otherwise the signing
    account's own address. A failure is now logged instead of being silently
    discarded, and only ``Exception`` is caught, so Ctrl-C / ``SystemExit``
    are no longer swallowed as they were by the previous bare ``except``.
    """
    try:
        return self.info.open_orders(self.vault_address or self.account.address)
    except Exception as e:
        logger.warning(f"Failed to fetch open orders: {e}")
        return []
def cancel_order(self, coin: str, oid: int):
    """Cancel order *oid* on *coin*.

    Returns:
        Whatever ``exchange.cancel`` returns, or ``None`` when the call
        raises (the error is logged).
    """
    logger.info(f"Cancelling order {oid}...")
    try:
        outcome = self.exchange.cancel(coin, oid)
    except Exception as e:
        logger.error(f"Error cancelling order: {e}")
        return None
    return outcome
def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal, order_type: str = "Alo") -> Optional[int]:
    """Validate and submit a limit order; return its order id when accepted.

    The size goes through ``validate_trade_size`` and the price is rounded to
    5 significant figures. Buy orders are sent with ``reduce_only=True`` and
    sell orders with ``reduce_only=False`` (the flag is set to *is_buy*).

    Args:
        order_type: Passed to the exchange as the limit order's ``tif``.

    Returns:
        The ``oid`` of the resting or immediately filled order; ``None`` when
        the size is rejected by validation, the exchange reports an error or
        a non-"ok" status, or the call raises (each failure is logged).
    """
    # Validate using Decimal logic
    validated_size_float = validate_trade_size(size, self.sz_decimals, MIN_ORDER_VALUE_USD, price)
    if validated_size_float == 0:
        logger.error(f"Trade size {size} invalid after validation")
        return None

    price_float = round_to_sig_figs_precise(price, 5)
    logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")

    try:
        order_result = self.exchange.order(
            coin, is_buy, validated_size_float, price_float,
            {"limit": {"tif": order_type}}, reduce_only=is_buy,
        )
        if order_result["status"] != "ok":
            logger.error(f"Order Failed: {order_result}")
            return None

        response_data = order_result["response"]["data"]
        if "statuses" not in response_data:
            return None

        # Only the first status entry is inspected.
        status_obj = response_data["statuses"][0]
        if "resting" in status_obj:
            return status_obj["resting"]["oid"]
        if "filled" in status_obj:
            logger.info("Order filled immediately.")
            return status_obj["filled"]["oid"]
        if "error" in status_obj:
            logger.error(f"Order API Error: {status_obj['error']}")
    except Exception as e:
        logger.error(f"Exception during trade: {e}")
    return None
def manage_orders(self) -> bool:
    """Review resting COIN_SYMBOL orders; return True if one should block new trades.

    - No order: returns False.
    - More than one order: all are cancelled and False is returned.
    - Exactly one order: it is cancelled (False) when the book mid has moved
      more than PRICE_BUFFER_PCT away from its limit price; otherwise it is
      kept (True). If the book cannot be read the order is kept (True).
    """
    my_orders = [o for o in self.get_open_orders() if o['coin'] == COIN_SYMBOL]
    order_count = len(my_orders)

    if order_count == 0:
        return False

    if order_count > 1:
        logger.warning("Multiple orders found. Cancelling all.")
        for stale in my_orders:
            self.cancel_order(COIN_SYMBOL, stale['oid'])
        return False

    (order,) = my_orders
    oid = order['oid']
    order_price = to_decimal(order['limitPx'])

    # Check if price moved too far
    levels = self.get_order_book_levels(COIN_SYMBOL)
    if not levels:
        return True  # Keep order if data missing

    pct_diff = abs(levels['mid'] - order_price) / order_price
    # Dynamic Buffer logic (Simplified for Decimal)
    # Using base buffer for now, can be enhanced
    dynamic_buffer = PRICE_BUFFER_PCT

    if pct_diff > dynamic_buffer:
        logger.info(f"Price moved {pct_diff*100:.3f}% > {dynamic_buffer*100:.3f}%. Cancelling {oid}.")
        self.cancel_order(COIN_SYMBOL, oid)
        return False

    # Order is still close enough to the market; log at most every 10 seconds.
    if time.time() - self.last_pending_log_time > 10:
        logger.info(f"Order {oid} within range ({pct_diff*100:.3f}% < {dynamic_buffer*100:.3f}%). Waiting.")
        self.last_pending_log_time = time.time()
    return True
def track_fills_and_pnl(self, force: bool = False):
    """Accumulate fees and closed PnL from new fills and persist the totals.

    Unless *force* is True, the exchange is queried at most once every 10
    seconds. Only COIN_SYMBOL fills at or after ``strategy_start_time`` whose
    ``tid`` has not been seen before are counted. When at least one new fill
    is found, the rounded totals are written to the status file. Any
    exception is logged and swallowed.
    """
    try:
        now = time.time()
        throttled = (not force) and (now - self.last_pnl_check_time < 10)
        if throttled:
            return
        self.last_pnl_check_time = now

        user_fills = self.info.user_fills(self.vault_address or self.account.address)
        new_activity = False
        for fill in user_fills:
            if fill['coin'] != COIN_SYMBOL or fill['time'] < self.strategy_start_time:
                continue
            fill_id = fill.get('tid')
            if fill_id in self.trade_history_seen:
                continue
            self.trade_history_seen.add(fill_id)

            fees = to_decimal(fill['fee'])
            pnl = to_decimal(fill['closedPnl'])
            self.accumulated_fees += fees
            self.accumulated_pnl += pnl
            new_activity = True
            logger.info(f"[FILL] {fill['side']} {fill['sz']} @ {fill['px']} | Fee: {fees} | PnL: {pnl}")

        if not new_activity:
            return

        # Convert back to float for JSON compatibility
        totals = {
            "hedge_pnl_realized": round(float(self.accumulated_pnl), 2),
            "hedge_fees_paid": round(float(self.accumulated_fees), 2)
        }
        update_position_stats(self.active_position_id, totals)
    except Exception as e:
        logger.error(f"Error tracking fills: {e}")
def close_all_positions(self, force_taker: bool = False):
    """Cancel resting COIN_SYMBOL orders and submit an order to flatten the position.

    Args:
        force_taker: When False, a post-only ("Alo") close is attempted first
            and the method returns as soon as that order is accepted. When
            True, or when the maker attempt is not accepted, an "Ioc" order
            priced 5% through the market is sent instead.

    Nothing is submitted when the reported position size is zero. The result
    of the taker order is not checked. Any exception is logged and swallowed.
    """
    logger.info("Closing all positions...")
    try:
        # 1. Cancel Orders
        open_orders = self.get_open_orders()
        for o in open_orders:
            if o['coin'] == COIN_SYMBOL:
                self.cancel_order(COIN_SYMBOL, o['oid'])
        # 2. Get Position
        pos_data = self.get_current_position(COIN_SYMBOL)
        current_pos = pos_data['size']
        if current_pos == 0: return
        # A negative size is a short, which is closed by buying.
        is_buy_to_close = current_pos < 0
        # Use Decimal absolute
        final_size = abs(current_pos)
        # --- MAKER CLOSE ---
        if not force_taker:
            levels = self.get_order_book_levels(COIN_SYMBOL)
            if levels:
                # NOTE(review): tick size is hard-coded to 0.1 -- confirm it matches the market.
                tick_size = Decimal("0.1")
                # Price one tick away from the touch on the passive side
                # (below the bid for a buy, above the ask for a sell).
                price = levels['bid'] - tick_size if is_buy_to_close else levels['ask'] + tick_size
                logger.info(f"Attempting Maker Close: {final_size} @ {price}")
                oid = self.place_limit_order(COIN_SYMBOL, is_buy_to_close, final_size, price, "Alo")
                if oid:
                    logger.info(f"Close Order Placed: {oid}")
                    return
        # --- TAKER CLOSE ---
        market_price = self.get_market_price(COIN_SYMBOL)
        if market_price:
            # 5% slippage for guaranteed close
            slip = Decimal("1.05") if is_buy_to_close else Decimal("0.95")
            limit_price = market_price * slip
            logger.info(f"Executing Taker Close: {final_size} @ {limit_price}")
            self.place_limit_order(COIN_SYMBOL, is_buy_to_close, final_size, limit_price, "Ioc")
            # NOTE(review): the active position id is cleared only on this taker
            # path; the zero-size and maker-close returns above leave it set.
            self.active_position_id = None
    except Exception as e:
        logger.error(f"Error closing positions: {e}")
def run(self):
    """Run the hedging loop until interrupted.

    Each iteration: read the active position from the status file; close the
    hedge when the position is missing, disabled or CLOSING; (re)initialise
    the strategy when needed; let a resting order play out; otherwise compute
    the delta gap and, when it exceeds a volatility-scaled threshold and the
    cooldown allows it, send an IOC rebalance order.

    KeyboardInterrupt closes the hedge (maker-first) and ends the loop. Any
    other exception is logged with a traceback and the loop resumes after 5
    seconds.
    """
    logger.info(f"Starting Hedger Loop ({CHECK_INTERVAL}s)...")
    while True:
        try:
            active_pos = get_active_automatic_position()
            # Check Global Disable or Missing Position
            # NOTE(review): get_active_automatic_position() also returns None when
            # the status file cannot be read or parsed, so a transient read error
            # takes this branch and force-closes the hedge.
            if not active_pos or not active_pos.get('hedge_enabled', True):
                if self.strategy is not None:
                    logger.info("Hedge Disabled/Missing. Closing.")
                    self.close_all_positions(force_taker=True)
                    self.strategy = None
                time.sleep(CHECK_INTERVAL)
                continue
            # Check CLOSING status (from Manager)
            if active_pos.get('status') == 'CLOSING':
                logger.info(f"[ALERT] Position {active_pos['token_id']} is CLOSING. Closing Hedge.")
                self.close_all_positions(force_taker=True)
                self.strategy = None
                time.sleep(CHECK_INTERVAL)
                continue
            # Initialize Strategy if needed
            if self.strategy is None or self.active_position_id != active_pos['token_id']:
                self._init_strategy(active_pos)
                if self.strategy is None:
                    time.sleep(CHECK_INTERVAL)
                    continue
            # --- CYCLE START ---
            # 1. Manage Orders
            # A resting order that is still near the market blocks new trades this cycle.
            if self.manage_orders():
                time.sleep(CHECK_INTERVAL)
                continue
            # 2. Market Data
            levels = self.get_order_book_levels(COIN_SYMBOL)
            if not levels:
                # NOTE(review): retries after 0.1s rather than CHECK_INTERVAL.
                time.sleep(0.1)
                continue
            price = levels['mid']
            pos_data = self.get_current_position(COIN_SYMBOL)
            current_size = pos_data['size']
            # NOTE(review): current_pnl is not used later in this method.
            current_pnl = pos_data['pnl']
            # 3. Calculate Logic
            calc = self.strategy.calculate_rebalance(price, current_size)
            diff_abs = abs(calc['diff'])
            # Update Price History (Max 300 items = 5 mins @ 1s)
            self.price_history.append(price)
            if len(self.price_history) > 300:
                self.price_history.pop(0)
            # 4. Thresholds
            # max_potential_eth is the pool delta at the lower bound of the range.
            sqrt_Pa = self.strategy.low_range.sqrt()
            sqrt_Pb = self.strategy.high_range.sqrt()
            max_potential_eth = self.strategy.L * ((Decimal("1")/sqrt_Pa) - (Decimal("1")/sqrt_Pb))
            # --- Dynamic Threshold Optimization (ATR/Vol Based) ---
            # 1. Calculate Volatility
            vol_pct = self.calculate_volatility()
            # 2. Volatility Multiplier
            # Base Vol assumption: 0.05% (0.0005) per window.
            # If Vol is 0.15%, mult = 3x. Cap at 3.0x. Min 1.0x.
            base_vol_ref = Decimal("0.0005")
            vol_multiplier = Decimal("1.0")
            if vol_pct > 0:
                vol_multiplier = max(Decimal("1.0"), min(Decimal("3.0"), vol_pct / base_vol_ref))
            # 3. Base Threshold Calculation (Range Dependent)
            range_width_pct = (self.strategy.high_range - self.strategy.low_range) / self.strategy.low_range
            # Ensure we satisfy PRICE_BUFFER_PCT (0.15%) minimum
            base_threshold_pct = max(Decimal("0.05"), PRICE_BUFFER_PCT / range_width_pct if range_width_pct > 0 else Decimal("0.05"))
            # 4. Apply Multiplier
            target_threshold_pct = base_threshold_pct * vol_multiplier
            # 5. Safety Cap
            # Limit threshold to 20% of the total range width (relative) to prevent staying unhedged too long
            # e.g. if range is 1% wide, max threshold is 0.2% deviation.
            # If range is 10% wide, max threshold is 2% deviation.
            # Absolute hard cap at 15% delta deviation.
            # NOTE(review): min() of two constants always evaluates to 0.15; the
            # range-relative 20% limit described above is not applied here.
            safety_cap = min(Decimal("0.15"), Decimal("0.20"))
            final_threshold_pct = min(target_threshold_pct, safety_cap)
            rebalance_threshold = max(MIN_THRESHOLD_ETH, max_potential_eth * final_threshold_pct)
            # Volatility Adjustment (Instantaneous Shock)
            # Keep this for sudden spikes that haven't affected the 5-min average yet
            if self.last_price:
                pct_change = abs(price - self.last_price) / self.last_price
                if pct_change > Decimal("0.003"):
                    rebalance_threshold *= DYNAMIC_THRESHOLD_MULTIPLIER
            self.last_price = price
            # 5. Check Zones
            # Assuming simple in-range check for now as zone logic was complex float math
            # Using Strategy ranges
            in_range = self.strategy.low_range <= price <= self.strategy.high_range
            if not in_range:
                if price > self.strategy.high_range:
                    logger.info(f"[OUT] ABOVE RANGE ({price:.2f}). Closing Hedge.")
                    self.close_all_positions(force_taker=True)
                elif price < self.strategy.low_range:
                    # Logged only when the current whole second is a multiple of 20.
                    if int(time.time()) % 20 == 0:
                        logger.info(f"[HOLD] BELOW RANGE ({price:.2f}). Holding Hedge.")
                # Out of range: no rebalancing this cycle.
                time.sleep(CHECK_INTERVAL)
                continue
            # 6. Execute Trade (with Edge Protection)
            bypass_cooldown = False
            override_reason = ""
            # Edge Proximity Check
            if active_pos.get('status') == 'OPEN':
                # Dynamic Proximity Calculation
                position_edge_proximity = self.get_dynamic_edge_proximity(price)
                range_width = self.strategy.high_range - self.strategy.low_range
                distance_from_bottom = price - self.strategy.low_range
                distance_from_top = self.strategy.high_range - price
                edge_distance = range_width * position_edge_proximity
                is_near_bottom = distance_from_bottom <= edge_distance
                is_near_top = distance_from_top <= edge_distance
                if is_near_bottom or is_near_top:
                    bypass_cooldown = True
                    override_reason = f"EDGE PROXIMITY ({position_edge_proximity*100:.1f}% dyn-edge)"
                    if is_near_bottom:
                        override_reason += f" ({distance_from_bottom:.2f} from bottom)"
                    else:
                        override_reason += f" ({distance_from_top:.2f} from top)"
            # Large Hedge Check
            if not bypass_cooldown:
                if diff_abs > (rebalance_threshold * LARGE_HEDGE_MULTIPLIER):
                    bypass_cooldown = True
                    override_reason = f"LARGE HEDGE NEEDED ({diff_abs:.4f} vs {rebalance_threshold:.4f})"
            can_trade = False
            cooldown_text = ""
            if diff_abs > rebalance_threshold:
                if bypass_cooldown:
                    can_trade = True
                    logger.info(f"[WARN] COOLDOWN BYPASSED: {override_reason}")
                elif time.time() - self.last_trade_time > MIN_TIME_BETWEEN_TRADES:
                    can_trade = True
                else:
                    time_left = MIN_TIME_BETWEEN_TRADES - (time.time() - self.last_trade_time)
                    cooldown_text = f" | [WAIT] Cooldown ({time_left:.0f}s)"
                if can_trade:
                    is_buy = (calc['action'] == "BUY")
                    # Taker execution for rebalance
                    # Priced 0.1% through the touch so the IOC order can cross the spread.
                    exec_price = levels['ask'] * Decimal("1.001") if is_buy else levels['bid'] * Decimal("0.999")
                    urgency = "URGENT" if bypass_cooldown else "NORMAL"
                    logger.info(f"[TRIG] Rebalance ({urgency}): {calc['action']} {diff_abs:.4f} > {rebalance_threshold:.4f} | Book: {levels['bid']}/{levels['ask']} | Vol: {vol_pct*100:.3f}% x{vol_multiplier:.1f} | Thresh: {final_threshold_pct*100:.1f}%")
                    oid = self.place_limit_order(COIN_SYMBOL, is_buy, diff_abs, exec_price, "Ioc")
                    if oid:
                        self.last_trade_time = time.time()
                        self.track_fills_and_pnl(force=True)
                else:
                    # Gap exceeds the threshold but the cooldown is active; log at most every 30s.
                    if time.time() - self.last_idle_log_time > 30:
                        logger.info(f"[WAIT] Cooldown. Diff: {diff_abs:.4f}{cooldown_text}")
                        self.last_idle_log_time = time.time()
            else:
                # Gap within the threshold: nothing to do; log at most every 30s.
                if time.time() - self.last_idle_log_time > 30:
                    logger.info(f"[IDLE] Px: {price:.2f} | Diff: {diff_abs:.4f} < {rebalance_threshold:.4f} (Vol: {vol_pct*100:.3f}% x{vol_multiplier:.1f} | Thresh: {final_threshold_pct*100:.1f}%) | TotPnL: {self.accumulated_pnl:.2f}")
                    self.last_idle_log_time = time.time()
            self.track_fills_and_pnl()
            time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            logger.info("Stopping...")
            self.close_all_positions()
            break
        except Exception as e:
            logger.error(f"Loop Error: {e}", exc_info=True)
            time.sleep(5)
# Script entry point: build the hedger (which connects to the exchange and
# sets leverage in its constructor) and run its loop until interrupted.
if __name__ == "__main__":
    hedger = ScalperHedger()
    hedger.run()