# Listing metadata (from the source viewer): 943 lines, 39 KiB, Python.
import os
import time
import logging
import sys
import json
import math
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Optional, Dict, Any, List, Union
from dotenv import load_dotenv

# --- FIX: Add project root to sys.path to import local modules ---
# NOTE: this must run before the `logging_utils` import below, which is why
# the remaining imports are not grouped at the very top of the file.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)

# Import local modules
try:
    from logging_utils import setup_logging
except ImportError:
    setup_logging = None
    # Ensure root logger is clean if we can't use setup_logging
    logging.getLogger().handlers.clear()
    logging.basicConfig(level=logging.INFO)

from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants

# Load environment variables.
# Prefer the .env file next to this script; passing None lets python-dotenv
# fall back to its own default lookup.
dotenv_path = os.path.join(current_dir, '.env')
load_dotenv(dotenv_path if os.path.exists(dotenv_path) else None)

# --- LOGGING SETUP ---
# Ensure logs directory exists
log_dir = os.path.join(current_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)

# Custom Filter for Millisecond Unix Timestamp (Matching Manager style)
class UnixMsLogFilter(logging.Filter):
    """Logging filter that stamps each record with a ``unix_ms`` attribute.

    ``unix_ms`` is the record creation time in whole milliseconds since the
    epoch, so a formatter can reference it as ``%(unix_ms)d``.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Attach ``unix_ms`` to *record*; never drops a record."""
        record.unix_ms = int(record.created * 1000)
        return True
# Configure Logging
logger = logging.getLogger("HEDGER")
logger.setLevel(logging.INFO)
logger.propagate = False  # Prevent double logging from root logger
logger.handlers.clear()  # Clear existing handlers to prevent duplicates

# Console Handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_fmt)
logger.addHandler(console_handler)

# File Handler
# The UnixMsLogFilter is attached to this handler only, because only the file
# format references %(unix_ms)d.
log_file = os.path.join(log_dir, 'clp_hedger.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.addFilter(UnixMsLogFilter())
file_fmt = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)
# --- DECIMAL PRECISION CONFIGURATION ---
getcontext().prec = 50

# --- CONFIGURATION ---
COIN_SYMBOL = "ETH"
CHECK_INTERVAL = 1  # seconds slept between main-loop cycles
LEVERAGE = 5
STATUS_FILE = "hedge_status.json"  # relative path: resolved against the working directory

# Strategy Zones
ZONE_BOTTOM_HEDGE_LIMIT = Decimal("1.0")
ZONE_CLOSE_START = Decimal("10.0")
ZONE_CLOSE_END = Decimal("11.0")
ZONE_TOP_HEDGE_START = Decimal("10.0")

# Order Settings
PRICE_BUFFER_PCT = Decimal("0.0015")  # 0.15%
MIN_THRESHOLD_ETH = Decimal("0.008")  # ~$24 @ 3k
MIN_ORDER_VALUE_USD = Decimal("10.0")

# Capital Safety
DYNAMIC_THRESHOLD_MULTIPLIER = Decimal("1.2")
MIN_TIME_BETWEEN_TRADES = 25  # seconds
MAX_HEDGE_MULTIPLIER = Decimal("1.25")

# Edge Protection
EDGE_PROXIMITY_PCT = Decimal("0.04")
VELOCITY_THRESHOLD_PCT = Decimal("0.0005")
POSITION_OPEN_EDGE_PROXIMITY_PCT = Decimal("0.06")
POSITION_CLOSED_EDGE_PROXIMITY_PCT = Decimal("0.025")
LARGE_HEDGE_MULTIPLIER = Decimal("2.8")
# --- HELPER FUNCTIONS ---

def to_decimal(value: Any) -> Decimal:
    """Convert *value* to ``Decimal``, mapping ``None`` to zero.

    The value goes through ``str()`` first so floats keep their short
    repr (``0.1`` becomes ``Decimal("0.1")``) instead of their full binary
    expansion.

    Raises:
        decimal.InvalidOperation: if ``str(value)`` is not a valid number.
    """
    if value is None:
        return Decimal("0")
    return Decimal(str(value))
def round_to_sz_decimals_precise(amount: Decimal, sz_decimals: int) -> float:
    """Truncate *amount* to *sz_decimals* decimal places and return a float.

    Rounding is ``ROUND_DOWN`` (toward zero), so the result never exceeds the
    requested magnitude. The float return type is what the SDK call sites in
    this file pass on.
    """
    if amount == 0:
        return 0.0

    # 10 ** -sz_decimals, e.g. Decimal("0.0001") for sz_decimals == 4.
    quantizer = Decimal("1").scaleb(-sz_decimals)
    rounded = amount.quantize(quantizer, rounding=ROUND_DOWN)
    return float(rounded)
def round_to_sig_figs_precise(x: Decimal, sig_figs: int = 5) -> float:
    """Round *x* to *sig_figs* significant figures and return a float."""
    if x == 0:
        return 0.0
    # The "g" presentation type rounds to the requested number of significant
    # digits; float() then parses either fixed or scientific notation.
    return float(f"{x:.{sig_figs}g}")
def validate_trade_size(size: Decimal, sz_decimals: int, min_order_value: Decimal, price: Decimal) -> float:
    """Return the tradable size as a float, or ``0.0`` if the order is invalid.

    The size is truncated (``ROUND_DOWN``) to *sz_decimals* places first, and
    the checks then run on that truncated size, because that is the quantity
    actually sent. Checking the unrounded size could accept an order whose
    rounded notional falls below *min_order_value*.

    Args:
        size: Requested size in coin units.
        sz_decimals: Number of decimal places allowed for the size.
        min_order_value: Minimum order notional (size * price).
        price: Price used to compute the notional.

    Returns:
        The truncated size, or ``0.0`` when the size is non-positive, rounds
        to less than one size increment, or is below the minimum notional.
    """
    if size <= 0:
        return 0.0

    # Smallest size increment, e.g. Decimal("0.0001") for sz_decimals == 4.
    quantum = Decimal("1").scaleb(-sz_decimals)
    rounded = size.quantize(quantum, rounding=ROUND_DOWN)

    # Check dust: nothing left after truncation.
    if rounded < quantum:
        return 0.0

    # Check minimum order value on the size that will really be submitted.
    if rounded * price < min_order_value:
        return 0.0

    return float(rounded)
# --- STATE MANAGEMENT ---

def get_active_automatic_position() -> Optional[Dict]:
    """Return the first active AUTOMATIC position from ``STATUS_FILE``.

    The file may hold a list of position dicts or, in the legacy layout, a
    single dict. An entry is active when its ``type`` is ``'AUTOMATIC'`` and
    its ``status`` is ``'OPEN'``, ``'PENDING_HEDGE'`` or ``'CLOSING'``.

    Returns:
        The matching entry, or ``None`` when the file is missing, unreadable,
        not valid JSON, or contains no active entry.
    """
    if not os.path.exists(STATUS_FILE):
        return None

    active_statuses = ('OPEN', 'PENDING_HEDGE', 'CLOSING')
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)

        if isinstance(data, list):
            # Expecting a list of positions
            candidates = data
        elif isinstance(data, dict):
            # Fallback if single dict (legacy)
            candidates = [data]
        else:
            candidates = []

        for entry in candidates:
            # Skip malformed (non-dict) entries instead of aborting the scan.
            if not isinstance(entry, dict):
                continue
            if entry.get('type') == 'AUTOMATIC' and entry.get('status') in active_statuses:
                return entry
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding errors.
        logger.error(f"ERROR reading status file: {e}")
    return None
def _update_status_entry(token_id: int, updates: Dict) -> bool:
    """Merge *updates* into the ``STATUS_FILE`` entry whose token_id matches.

    Only the first matching entry is touched. A legacy single-dict file is
    treated as a one-element list (and is written back as a list).

    Returns:
        True when a matching entry exists, False otherwise (including when
        the file is missing). Errors propagate to the caller.
    """
    if not os.path.exists(STATUS_FILE):
        return False

    with open(STATUS_FILE, 'r') as f:
        data = json.load(f)

    # Ensure list
    if isinstance(data, dict):
        data = [data]

    target = None
    for entry in data:
        if isinstance(entry, dict) and entry.get('token_id') == token_id:
            target = entry
            break
    if target is None:
        return False

    # Skip the rewrite when nothing would change; this function is called
    # every loop cycle, so avoiding no-op writes keeps file churn down.
    if all(key in target and target[key] == value for key, value in updates.items()):
        return True

    target.update(updates)
    # Serialise before opening for write so a serialisation error cannot
    # leave a truncated status file behind.
    payload = json.dumps(data, indent=2)
    with open(STATUS_FILE, 'w') as f:
        f.write(payload)
    return True


def update_position_zones_in_json(token_id: int, zones_data: Dict):
    """Merge *zones_data* into the status-file entry for *token_id*.

    Does nothing when the file or the entry is missing. Errors are logged,
    never raised.
    """
    try:
        if _update_status_entry(token_id, zones_data):
            logger.info(f"Updated JSON zones for Position {token_id}")
    except Exception as e:
        logger.error(f"Error updating JSON zones: {e}")


def update_position_stats(token_id: int, stats_data: Dict):
    """Merge *stats_data* into the status-file entry for *token_id*.

    Does nothing when the file or the entry is missing. Errors are logged,
    never raised.
    """
    try:
        _update_status_entry(token_id, stats_data)
    except Exception as e:
        logger.error(f"Error updating JSON stats: {e}")
# --- STRATEGY CLASS ---

class HyperliquidStrategy:
    """Computes the short size needed to offset a ranged liquidity position.

    The position is summarised by a liquidity constant ``L`` derived at
    construction time; ``get_pool_delta`` then gives the position's exposure
    to the base asset at a given price, and ``calculate_rebalance`` compares
    that (plus an over-hedge margin) with the current short.
    """

    def __init__(self, entry_amount0: Decimal, entry_amount1: Decimal, target_value: Decimal,
                 entry_price: Decimal, low_range: Decimal, high_range: Decimal, start_price: Decimal):
        """Store the position parameters and derive the liquidity constant ``L``.

        ``L`` is taken from the first method that yields a usable value:
        amount0, then amount1, then a heuristic based on *target_value*.
        If none applies, ``L`` stays at zero and an error is logged.

        NOTE: any exception during the calculation terminates the process via
        ``sys.exit(1)``; ``SystemExit`` is not caught by ``except Exception``
        in callers.
        """
        self.entry_amount0 = entry_amount0
        self.entry_amount1 = entry_amount1
        self.target_value = target_value
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.start_price = start_price

        # How far the price had already dropped below entry when hedging started.
        self.gap = max(Decimal("0.0"), entry_price - start_price)
        self.recovery_target = entry_price + (Decimal("2") * self.gap)

        self.L = Decimal("0.0")
        try:
            sqrt_P = entry_price.sqrt()
            sqrt_Pa = low_range.sqrt()
            sqrt_Pb = high_range.sqrt()

            # Method 1: Amount0 (WETH)
            if entry_amount0 > 0:
                # Assuming amount0 is already in standard units (ETH) from JSON
                denom0 = (Decimal("1") / sqrt_P) - (Decimal("1") / sqrt_Pb)
                if denom0 > Decimal("1e-10"):
                    self.L = entry_amount0 / denom0
                    logger.info(f"Calculated L from Amount0: {self.L:.4f}")

            # Method 2: Amount1 (USDC)
            if self.L == 0 and entry_amount1 > 0:
                denom1 = sqrt_P - sqrt_Pa
                if denom1 > Decimal("1e-10"):
                    self.L = entry_amount1 / denom1
                    logger.info(f"Calculated L from Amount1: {self.L:.4f}")

            # Method 3: Target Value Heuristic
            if self.L == 0:
                logger.warning("Amounts missing. Using Target Value Heuristic.")
                max_eth = target_value / low_range
                denom_h = (Decimal("1") / sqrt_Pa) - (Decimal("1") / sqrt_Pb)
                if denom_h > 0:
                    self.L = max_eth / denom_h
                    logger.info(f"Calculated L from Target Value: {self.L:.4f}")
                else:
                    logger.error("Critical: Invalid Range for L calculation")

        except Exception as e:
            logger.error(f"Error calculating liquidity: {e}")
            sys.exit(1)

    def get_pool_delta(self, current_price: Decimal) -> Decimal:
        """Return the position's base-asset exposure at *current_price*.

        Zero at or above the upper bound; constant (its maximum) at or below
        the lower bound; ``L * (1/sqrt(P) - 1/sqrt(Pb))`` in between.
        """
        if current_price >= self.high_range:
            return Decimal("0.0")

        if current_price <= self.low_range:
            sqrt_Pa = self.low_range.sqrt()
            sqrt_Pb = self.high_range.sqrt()
            return self.L * ((Decimal("1")/sqrt_Pa) - (Decimal("1")/sqrt_Pb))

        sqrt_P = current_price.sqrt()
        sqrt_Pb = self.high_range.sqrt()
        return self.L * ((Decimal("1")/sqrt_P) - (Decimal("1")/sqrt_Pb))

    def calculate_rebalance(self, current_price: Decimal, current_short_size: Decimal) -> Dict:
        """Compare the target short with the current short.

        Args:
            current_price: Price used to evaluate the pool delta.
            current_short_size: Current position size; only its absolute
                value is used.

        Returns:
            Dict with ``current_price``, ``pool_delta``, ``target_short``,
            ``current_short``, ``diff`` (target minus current), ``action``
            (``"SELL"`` when diff > 0, otherwise ``"BUY"``) and
            ``overhedge_pct``.
        """
        pool_delta = self.get_pool_delta(current_price)

        # Over-Hedge Logic
        overhedge_pct = Decimal("0.0")
        range_width = self.high_range - self.low_range

        if range_width > 0:
            price_pct = (current_price - self.low_range) / range_width

            # If below 80% of range
            if price_pct < Decimal("0.8"):
                # Formula: 0.75% boost for every 0.1 drop below 0.8
                # (price_pct is floored at 0, so the boost tops out at 6%).
                diff_factor = (Decimal("0.8") - max(Decimal("0.0"), price_pct)) / Decimal("0.1")
                overhedge_pct = diff_factor * Decimal("0.0075")

        raw_target_short = pool_delta
        adjusted_target_short = raw_target_short * (Decimal("1.0") + overhedge_pct)

        diff = adjusted_target_short - abs(current_short_size)

        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": adjusted_target_short,
            "current_short": abs(current_short_size),
            "diff": diff,
            "action": "SELL" if diff > 0 else "BUY",
            "overhedge_pct": overhedge_pct
        }
# --- MAIN HEDGER CLASS ---

class ScalperHedger:
    """Main loop that keeps a short on ``COIN_SYMBOL`` aligned with the
    active AUTOMATIC position read from the status file.

    The instance owns the exchange/info clients, the current
    ``HyperliquidStrategy``, and the bookkeeping for cooldowns, realised
    PnL/fees and simulated ("shadow") maker orders.
    """

    def __init__(self):
        """Create API clients from environment variables and set leverage.

        Exits the process when ``HEDGER_PRIVATE_KEY`` is not set. A failure
        to set leverage is logged and otherwise ignored.
        """
        self.private_key = os.environ.get("HEDGER_PRIVATE_KEY")
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")

        if not self.private_key:
            logger.error("No HEDGER_PRIVATE_KEY found in .env")
            sys.exit(1)

        self.account = Account.from_key(self.private_key)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)

        # Set Leverage
        try:
            logger.info(f"Setting leverage to {LEVERAGE}x (Cross)...")
            self.exchange.update_leverage(LEVERAGE, COIN_SYMBOL, is_cross=True)
        except Exception as e:
            logger.error(f"Failed to update leverage: {e}")

        self.strategy: Optional[HyperliquidStrategy] = None
        self.sz_decimals = self._get_sz_decimals(COIN_SYMBOL)
        self.active_position_id = None

        # Safety & State
        self.last_price: Optional[Decimal] = None
        self.last_trade_time = 0

        # Velocity Tracking
        self.last_price_for_velocity: Optional[Decimal] = None
        self.price_history: List[Decimal] = []
        self.velocity_history: List[Decimal] = []

        # PnL Tracking
        self.strategy_start_time = 0
        self.last_pnl_check_time = 0
        self.trade_history_seen = set()
        self.accumulated_pnl = Decimal("0.0")
        self.accumulated_fees = Decimal("0.0")

        # Logging Rate Limiting
        self.last_idle_log_time = 0
        self.last_pending_log_time = 0

        # Order Tracking
        self.original_order_side = None
        self.shadow_orders = []  # Store theoretical Maker orders for analysis

        logger.info(f"[DELTA] Delta-Zero Scalper Hedger initialized. Agent: {self.account.address}")

    def calculate_volatility(self) -> Decimal:
        """Return the population standard deviation of ``price_history``
        divided by its mean (e.g. 0.001 = 0.1%).

        Returns zero when fewer than 30 prices are available, when the mean
        is zero, or when the calculation fails.
        """
        if not self.price_history or len(self.price_history) < 30:
            return Decimal("0.0")

        try:
            # 1. Mean
            n = len(self.price_history)
            mean = sum(self.price_history) / n

            # 2. Variance (Sum of squared diffs)
            variance = sum(pow(p - mean, 2) for p in self.price_history) / n

            # 3. Std Dev
            std_dev = variance.sqrt()

            # 4. Volatility %
            if mean == 0:
                return Decimal("0.0")
            return std_dev / mean

        except Exception as e:
            logger.error(f"Error calculating volatility: {e}")
            return Decimal("0.0")

    def get_dynamic_edge_proximity(self, price: Decimal) -> Decimal:
        """Return the edge-proximity fraction, scaled by position value.

        Base: 4%. Scale: +4% per $10k value. Cap: 15%.
        Larger positions get a wider buffer. *price* is currently unused.
        """
        base_pct = Decimal("0.04")

        # Estimate Position Value (Use Target Value as proxy for total risk)
        # If strategy not ready, fallback to 0
        val_usd = self.strategy.target_value if self.strategy else Decimal("0")

        # Fallback to current hedge value if target not set
        if val_usd == 0 and self.last_price:
            pos = self.get_current_position(COIN_SYMBOL)
            val_usd = abs(pos['size']) * self.last_price

        # Scaling: +0.04 (4%) for every 10,000 USD
        # Factor = 0.04 / 10000 = 0.000004
        scaling_factor = Decimal("0.000004")
        total = base_pct + val_usd * scaling_factor

        # Cap at 15% (0.15) and Min at 4% (0.04)
        return max(base_pct, min(Decimal("0.15"), total))

    def _init_strategy(self, position_data: Dict):
        """Build ``self.strategy`` from a status-file entry and reset state.

        On any failure, including an unavailable market price,
        ``self.strategy`` is set to ``None`` so the caller retries next cycle
        instead of continuing with a strategy built for another position.
        """
        try:
            entry_amount0 = to_decimal(position_data.get('amount0_initial', 0))
            entry_amount1 = to_decimal(position_data.get('amount1_initial', 0))
            target_value = to_decimal(position_data.get('target_value', 50))

            entry_price = to_decimal(position_data['entry_price'])
            lower = to_decimal(position_data['range_lower'])
            upper = to_decimal(position_data['range_upper'])

            start_price = self.get_market_price(COIN_SYMBOL)
            if start_price is None:
                logger.warning("Waiting for initial price to start strategy...")
                # Drop any strategy left over from a previous position.
                self.strategy = None
                return

            self.strategy = HyperliquidStrategy(
                entry_amount0=entry_amount0,
                entry_amount1=entry_amount1,
                target_value=target_value,
                entry_price=entry_price,
                low_range=lower,
                high_range=upper,
                start_price=start_price
            )

            # Reset State
            self.last_price = start_price
            self.last_trade_time = 0
            self.price_history = [start_price]

            # Fills older than this timestamp (ms) are ignored by PnL tracking.
            self.strategy_start_time = int(time.time() * 1000)
            self.trade_history_seen = set()

            # Resume PnL from file if available, otherwise 0.0
            self.accumulated_pnl = to_decimal(position_data.get('hedge_pnl_realized', 0.0))
            self.accumulated_fees = to_decimal(position_data.get('hedge_fees_paid', 0.0))

            self.active_position_id = position_data['token_id']

            logger.info(f"[DELTA] Strat Init: Pos {self.active_position_id} | Range: {lower}-{upper} | Entry: {entry_price} | Start Px: {start_price:.2f} | Resumed PnL: {self.accumulated_pnl:.2f}")

        except Exception as e:
            logger.error(f"Failed to init strategy: {e}")
            self.strategy = None

    def _get_sz_decimals(self, coin: str) -> int:
        """Return the ``szDecimals`` for *coin* from exchange metadata.

        Falls back to 4 when the coin is not listed or the lookup fails.
        """
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                if asset["name"] == coin:
                    return asset["szDecimals"]
            return 4
        except Exception as e:
            # `except Exception` (not bare) so Ctrl-C / SystemExit still propagate.
            logger.warning(f"Could not load szDecimals for {coin}, using 4: {e}")
            return 4

    def get_market_price(self, coin: str) -> Optional[Decimal]:
        """Return the mid price for *coin*, or ``None`` if unavailable."""
        try:
            mids = self.info.all_mids()
            if coin in mids:
                return to_decimal(mids[coin])
        except Exception as e:
            logger.warning(f"Could not fetch market price for {coin}: {e}")
        return None

    def get_order_book_levels(self, coin: str) -> Optional[Dict[str, Decimal]]:
        """Return ``{'bid', 'ask', 'mid'}`` from the top of the book.

        Returns ``None`` when the snapshot is missing, one side is empty, or
        the request fails.
        """
        try:
            snapshot = self.info.l2_snapshot(coin)
            if snapshot and 'levels' in snapshot:
                bids = snapshot['levels'][0]
                asks = snapshot['levels'][1]
                if bids and asks:
                    best_bid = to_decimal(bids[0]['px'])
                    best_ask = to_decimal(asks[0]['px'])
                    mid = (best_bid + best_ask) / Decimal("2")
                    return {'bid': best_bid, 'ask': best_ask, 'mid': mid}
            return None
        except Exception as e:
            logger.warning(f"Could not fetch order book for {coin}: {e}")
            return None

    def get_current_position(self, coin: str) -> Dict[str, Decimal]:
        """Return the current position for *coin*.

        Returns:
            Dict with ``size`` (signed), ``pnl`` (unrealised), ``equity``
            (account value) and ``ok``. ``ok`` is False when the account
            state could not be read; the numeric fields are then zero
            placeholders and must not be taken to mean "no position".
        """
        try:
            user_state = self.info.user_state(self.vault_address or self.account.address)

            # Extract total account equity (marginSummary.accountValue)
            equity = Decimal("0")
            if "marginSummary" in user_state and "accountValue" in user_state["marginSummary"]:
                equity = to_decimal(user_state["marginSummary"]["accountValue"])

            for pos in user_state["assetPositions"]:
                if pos["position"]["coin"] == coin:
                    return {
                        'size': to_decimal(pos["position"]["szi"]),
                        'pnl': to_decimal(pos["position"]["unrealizedPnl"]),
                        'equity': equity,
                        'ok': True
                    }
            return {'size': Decimal("0"), 'pnl': Decimal("0"), 'equity': equity, 'ok': True}
        except Exception as e:
            logger.error(f"Could not fetch position for {coin}: {e}")
            return {'size': Decimal("0"), 'pnl': Decimal("0"), 'equity': Decimal("0"), 'ok': False}

    def get_open_orders(self) -> List[Dict]:
        """Return the account's open orders, or ``[]`` when the request fails."""
        try:
            return self.info.open_orders(self.vault_address or self.account.address)
        except Exception as e:
            logger.warning(f"Could not fetch open orders: {e}")
            return []

    def check_shadow_orders(self, levels: Dict[str, Decimal]):
        """
        Check if pending shadow (theoretical Maker) orders would have been filled.

        Filled and expired orders are logged and dropped; the rest are kept.
        """
        if not self.shadow_orders or not levels:
            return

        now = time.time()
        remaining_orders = []

        for order in self.shadow_orders:
            # 1. Check Fill
            # Only book snapshots are available (no trade prints), so a fill
            # is counted only when the book has crossed the shadow price:
            # best ask at/below a shadow bid, or best bid at/above a shadow ask.
            filled = False
            # Seconds elapsed since the shadow order was created.
            fill_time = now - (order['expires_at'] - order['timeout_duration'])

            if order['side'] == 'BUY':
                if levels['ask'] <= order['price']:
                    filled = True
            else:  # SELL
                if levels['bid'] >= order['price']:
                    filled = True

            if filled:
                logger.info(f"[SHADOW] ✅ SUCCESS: Maker {order['side']} @ {order['price']:.2f} filled in {fill_time:.1f}s (Timeout: {order['timeout_duration']:.0f}s)")
                continue  # Remove from list

            # 2. Check Expiry
            if now > order['expires_at']:
                logger.info(f"[SHADOW] ❌ FAILED: Maker {order['side']} @ {order['price']:.2f} timed out after {order['timeout_duration']:.0f}s")
                continue  # Remove from list

            remaining_orders.append(order)

        self.shadow_orders = remaining_orders

    def cancel_order(self, coin: str, oid: int):
        """Cancel order *oid*; returns the exchange response, or ``None`` on error."""
        logger.info(f"Cancelling order {oid}...")
        try:
            return self.exchange.cancel(coin, oid)
        except Exception as e:
            logger.error(f"Error cancelling order: {e}")

    def place_limit_order(self, coin: str, is_buy: bool, size: Decimal, price: Decimal, order_type: str = "Alo") -> Optional[int]:
        """Validate and submit a limit order.

        Args:
            coin: Asset symbol.
            is_buy: True for a buy, False for a sell.
            size: Requested size; validated and truncated before sending.
            price: Limit price; rounded to 5 significant figures.
            order_type: Time-in-force string passed to the exchange
                (this file uses ``"Alo"`` and ``"Ioc"``).

        Returns:
            The order id when the order rests or fills, otherwise ``None``.
        """
        # Validate using Decimal logic
        validated_size_float = validate_trade_size(size, self.sz_decimals, MIN_ORDER_VALUE_USD, price)

        if validated_size_float == 0:
            logger.error(f"Trade size {size} invalid after validation")
            return None

        price_float = round_to_sig_figs_precise(price, 5)

        logger.info(f"[ORDER] {order_type.upper()} {coin} {'BUY' if is_buy else 'SELL'} {validated_size_float} @ {price_float}")

        try:
            # reduce_only mirrors is_buy: every buy is sent as reduce-only,
            # every sell is not. NOTE(review): presumably because the hedge
            # is always a short — confirm.
            order_result = self.exchange.order(coin, is_buy, validated_size_float, price_float, {"limit": {"tif": order_type}}, reduce_only=is_buy)
            status = order_result["status"]

            if status == "ok":
                response_data = order_result["response"]["data"]
                if "statuses" in response_data:
                    status_obj = response_data["statuses"][0]
                    if "resting" in status_obj:
                        return status_obj["resting"]["oid"]
                    elif "filled" in status_obj:
                        logger.info("Order filled immediately.")
                        return status_obj["filled"]["oid"]
                    elif "error" in status_obj:
                        logger.error(f"Order API Error: {status_obj['error']}")
            else:
                logger.error(f"Order Failed: {order_result}")

        except Exception as e:
            logger.error(f"Exception during trade: {e}")
        return None

    def manage_orders(self) -> bool:
        """Returns True if there is an active order that should prevent new trades."""
        open_orders = self.get_open_orders()
        my_orders = [o for o in open_orders if o['coin'] == COIN_SYMBOL]

        if not my_orders:
            return False

        if len(my_orders) > 1:
            logger.warning("Multiple orders found. Cancelling all.")
            for o in my_orders:
                self.cancel_order(COIN_SYMBOL, o['oid'])
            return False

        order = my_orders[0]
        oid = order['oid']
        order_price = to_decimal(order['limitPx'])

        # Check if price moved too far
        levels = self.get_order_book_levels(COIN_SYMBOL)
        if not levels:
            return True  # Keep order if data missing

        current_mid = levels['mid']
        pct_diff = abs(current_mid - order_price) / order_price

        # Dynamic Buffer logic (Simplified for Decimal)
        # Using base buffer for now, can be enhanced
        dynamic_buffer = PRICE_BUFFER_PCT
        if pct_diff > dynamic_buffer:
            logger.info(f"Price moved {pct_diff*100:.3f}% > {dynamic_buffer*100:.3f}%. Cancelling {oid}.")
            self.cancel_order(COIN_SYMBOL, oid)
            return False

        # Rate-limit the "waiting" log to once every 10 seconds.
        if time.time() - self.last_pending_log_time > 10:
            logger.info(f"Order {oid} within range ({pct_diff*100:.3f}% < {dynamic_buffer*100:.3f}%). Waiting.")
            self.last_pending_log_time = time.time()

        return True

    def track_fills_and_pnl(self, force: bool = False):
        """Accumulate realised PnL and fees from new fills and persist them.

        Runs at most once every 10 seconds unless *force* is True. Only fills
        for ``COIN_SYMBOL`` at or after ``strategy_start_time`` and not seen
        before (by ``tid``) are counted. Errors are logged, never raised.
        """
        try:
            now = time.time()
            if not force and now - self.last_pnl_check_time < 10:
                return
            self.last_pnl_check_time = now

            user_fills = self.info.user_fills(self.vault_address or self.account.address)
            new_activity = False

            for fill in user_fills:
                if fill['coin'] != COIN_SYMBOL:
                    continue
                if fill['time'] < self.strategy_start_time:
                    continue

                fill_id = fill.get('tid')
                if fill_id in self.trade_history_seen:
                    continue

                self.trade_history_seen.add(fill_id)
                fees = to_decimal(fill['fee'])
                pnl = to_decimal(fill['closedPnl'])

                self.accumulated_fees += fees
                self.accumulated_pnl += pnl
                new_activity = True
                logger.info(f"[FILL] {fill['side']} {fill['sz']} @ {fill['px']} | Fee: {fees} | PnL: {pnl}")

            if new_activity:
                # Convert back to float for JSON compatibility
                update_position_stats(self.active_position_id, {
                    "hedge_pnl_realized": round(float(self.accumulated_pnl), 2),
                    "hedge_fees_paid": round(float(self.accumulated_fees), 2)
                })
        except Exception as e:
            logger.error(f"Error tracking fills: {e}")

    def close_all_positions(self, force_taker: bool = False):
        """Cancel open orders for ``COIN_SYMBOL`` and close the position.

        Without *force_taker*, a post-only ("Alo") order one tick away from
        the book is tried first; if it is placed the method returns.
        Otherwise (or when the maker order is not placed) an "Ioc" order with
        5% slippage allowance is sent and ``active_position_id`` is cleared.
        Errors are logged, never raised.
        """
        logger.info("Closing all positions...")
        try:
            # 1. Cancel Orders
            open_orders = self.get_open_orders()
            for o in open_orders:
                if o['coin'] == COIN_SYMBOL:
                    self.cancel_order(COIN_SYMBOL, o['oid'])

            # 2. Get Position
            pos_data = self.get_current_position(COIN_SYMBOL)
            if not pos_data.get('ok', True):
                # Size is unknown, not zero: make the failed close visible.
                logger.error("Cannot close: position state unavailable.")
                return
            current_pos = pos_data['size']

            if current_pos == 0:
                return

            is_buy_to_close = current_pos < 0
            # Use Decimal absolute
            final_size = abs(current_pos)

            # --- MAKER CLOSE ---
            if not force_taker:
                levels = self.get_order_book_levels(COIN_SYMBOL)
                if levels:
                    tick_size = Decimal("0.1")
                    price = levels['bid'] - tick_size if is_buy_to_close else levels['ask'] + tick_size

                    logger.info(f"Attempting Maker Close: {final_size} @ {price}")
                    oid = self.place_limit_order(COIN_SYMBOL, is_buy_to_close, final_size, price, "Alo")
                    if oid:
                        logger.info(f"Close Order Placed: {oid}")
                        return

            # --- TAKER CLOSE ---
            market_price = self.get_market_price(COIN_SYMBOL)
            if market_price:
                # 5% slippage for guaranteed close
                slip = Decimal("1.05") if is_buy_to_close else Decimal("0.95")
                limit_price = market_price * slip
                logger.info(f"Executing Taker Close: {final_size} @ {limit_price}")
                self.place_limit_order(COIN_SYMBOL, is_buy_to_close, final_size, limit_price, "Ioc")
                self.active_position_id = None

        except Exception as e:
            logger.error(f"Error closing positions: {e}")

    def run(self):
        """Run the hedging loop until interrupted.

        Each cycle: read the active position from the status file; close the
        hedge if hedging is disabled, the position is missing or CLOSING;
        (re)initialise the strategy when the position changes; wait while an
        order is resting; otherwise compute the required short and, when the
        difference exceeds the dynamic threshold and the cooldown allows,
        send an "Ioc" rebalance order. ``KeyboardInterrupt`` closes the
        position and exits; any other exception is logged and the loop
        resumes after 5 seconds.
        """
        logger.info(f"Starting Hedger Loop ({CHECK_INTERVAL}s)...")

        while True:
            try:
                active_pos = get_active_automatic_position()

                # Check Global Disable or Missing Position
                if not active_pos or not active_pos.get('hedge_enabled', True):
                    if self.strategy is not None:
                        logger.info("Hedge Disabled/Missing. Closing.")
                        self.close_all_positions(force_taker=True)
                        self.strategy = None
                    time.sleep(CHECK_INTERVAL)
                    continue

                # Check CLOSING status (from Manager)
                if active_pos.get('status') == 'CLOSING':
                    logger.info(f"[ALERT] Position {active_pos['token_id']} is CLOSING. Closing Hedge.")
                    self.close_all_positions(force_taker=True)
                    self.strategy = None
                    time.sleep(CHECK_INTERVAL)
                    continue

                # Initialize Strategy if needed
                if self.strategy is None or self.active_position_id != active_pos['token_id']:
                    self._init_strategy(active_pos)
                    if self.strategy is None:
                        time.sleep(CHECK_INTERVAL)
                        continue

                # --- CYCLE START ---

                # 1. Manage Orders
                if self.manage_orders():
                    time.sleep(CHECK_INTERVAL)
                    continue

                # 2. Market Data
                levels = self.get_order_book_levels(COIN_SYMBOL)
                if not levels:
                    time.sleep(0.1)
                    continue

                # Check Shadow Orders (Market Maker Simulation)
                self.check_shadow_orders(levels)

                price = levels['mid']
                pos_data = self.get_current_position(COIN_SYMBOL)
                if not pos_data.get('ok', True):
                    # An API failure must not be read as "no hedge open":
                    # that would sell the full target on top of the
                    # existing short. Skip this cycle instead.
                    logger.warning("Position state unavailable. Skipping cycle.")
                    time.sleep(CHECK_INTERVAL)
                    continue
                current_size = pos_data['size']
                current_equity = pos_data['equity']

                # Update JSON with latest equity stats
                update_position_stats(self.active_position_id, {
                    "hedge_equity_usd": float(current_equity)
                })

                # 3. Calculate Logic
                calc = self.strategy.calculate_rebalance(price, current_size)
                diff_abs = abs(calc['diff'])

                # Update Price History (Max 300 items = 5 mins @ 1s)
                self.price_history.append(price)
                if len(self.price_history) > 300:
                    self.price_history.pop(0)

                # 4. Thresholds
                sqrt_Pa = self.strategy.low_range.sqrt()
                sqrt_Pb = self.strategy.high_range.sqrt()
                max_potential_eth = self.strategy.L * ((Decimal("1")/sqrt_Pa) - (Decimal("1")/sqrt_Pb))

                # --- Dynamic Threshold Optimization (ATR/Vol Based) ---

                # 1. Calculate Volatility
                vol_pct = self.calculate_volatility()

                # 2. Volatility Multiplier
                # Base Vol assumption: 0.05% (0.0005) per window.
                # If Vol is 0.15%, mult = 3x. Cap at 3.0x. Min 1.0x.
                base_vol_ref = Decimal("0.0005")
                vol_multiplier = Decimal("1.0")
                if vol_pct > 0:
                    vol_multiplier = max(Decimal("1.0"), min(Decimal("3.0"), vol_pct / base_vol_ref))

                # 3. Base Threshold Calculation (Range Dependent)
                range_width_pct = (self.strategy.high_range - self.strategy.low_range) / self.strategy.low_range

                # Ensure we satisfy PRICE_BUFFER_PCT (0.15%) minimum
                base_threshold_pct = max(Decimal("0.05"), PRICE_BUFFER_PCT / range_width_pct if range_width_pct > 0 else Decimal("0.05"))

                # 4. Apply Multiplier
                target_threshold_pct = base_threshold_pct * vol_multiplier

                # 5. Safety Cap
                # Hard cap: the threshold never exceeds 15% of the maximum
                # pool delta, to prevent staying unhedged too long.
                safety_cap = Decimal("0.15")

                final_threshold_pct = min(target_threshold_pct, safety_cap)

                rebalance_threshold = max(MIN_THRESHOLD_ETH, max_potential_eth * final_threshold_pct)

                # Volatility Adjustment (Instantaneous Shock)
                # Keep this for sudden spikes that haven't affected the 5-min average yet
                if self.last_price:
                    pct_change = abs(price - self.last_price) / self.last_price
                    if pct_change > Decimal("0.003"):
                        rebalance_threshold *= DYNAMIC_THRESHOLD_MULTIPLIER

                self.last_price = price

                # 5. Check Zones
                # Assuming simple in-range check for now as zone logic was complex float math
                # Using Strategy ranges
                in_range = self.strategy.low_range <= price <= self.strategy.high_range

                if not in_range:
                    if price > self.strategy.high_range:
                        logger.info(f"[OUT] ABOVE RANGE ({price:.2f}). Closing Hedge.")
                        self.close_all_positions(force_taker=True)
                    elif price < self.strategy.low_range:
                        # Crude log throttle: only log on seconds divisible by 20.
                        if int(time.time()) % 20 == 0:
                            logger.info(f"[HOLD] BELOW RANGE ({price:.2f}). Holding Hedge.")
                    time.sleep(CHECK_INTERVAL)
                    continue

                # 6. Execute Trade (with Edge Protection)
                bypass_cooldown = False
                override_reason = ""

                # Edge Proximity Check
                if active_pos.get('status') == 'OPEN':
                    # Dynamic Proximity Calculation
                    position_edge_proximity = self.get_dynamic_edge_proximity(price)

                    range_width = self.strategy.high_range - self.strategy.low_range
                    distance_from_bottom = price - self.strategy.low_range
                    distance_from_top = self.strategy.high_range - price

                    edge_distance = range_width * position_edge_proximity

                    is_near_bottom = distance_from_bottom <= edge_distance
                    is_near_top = distance_from_top <= edge_distance

                    if is_near_bottom or is_near_top:
                        bypass_cooldown = True
                        override_reason = f"EDGE PROXIMITY ({position_edge_proximity*100:.1f}% dyn-edge)"
                        if is_near_bottom:
                            override_reason += f" ({distance_from_bottom:.2f} from bottom)"
                        else:
                            override_reason += f" ({distance_from_top:.2f} from top)"
                    # Large Hedge Check
                    # NOTE(review): the recovered source had lost its
                    # indentation; this check is assumed to belong to the
                    # OPEN-status branch — confirm against the original.
                    if not bypass_cooldown:
                        if diff_abs > (rebalance_threshold * LARGE_HEDGE_MULTIPLIER):
                            bypass_cooldown = True
                            override_reason = f"LARGE HEDGE NEEDED ({diff_abs:.4f} vs {rebalance_threshold:.4f})"

                can_trade = False
                cooldown_text = ""

                if diff_abs > rebalance_threshold:
                    if bypass_cooldown:
                        can_trade = True
                        logger.info(f"[WARN] COOLDOWN BYPASSED: {override_reason}")
                    elif time.time() - self.last_trade_time > MIN_TIME_BETWEEN_TRADES:
                        can_trade = True
                    else:
                        time_left = MIN_TIME_BETWEEN_TRADES - (time.time() - self.last_trade_time)
                        cooldown_text = f" | [WAIT] Cooldown ({time_left:.0f}s)"

                    if can_trade:
                        is_buy = (calc['action'] == "BUY")
                        # Taker execution for rebalance
                        exec_price = levels['ask'] * Decimal("1.001") if is_buy else levels['bid'] * Decimal("0.999")

                        urgency = "URGENT" if bypass_cooldown else "NORMAL"
                        logger.info(f"[TRIG] Rebalance ({urgency}): {calc['action']} {diff_abs:.4f} > {rebalance_threshold:.4f} | Book: {levels['bid']}/{levels['ask']} | Vol: {vol_pct*100:.3f}% x{vol_multiplier:.1f} | Thresh: {final_threshold_pct*100:.1f}%")

                        oid = self.place_limit_order(COIN_SYMBOL, is_buy, diff_abs, exec_price, "Ioc")
                        if oid:
                            self.last_trade_time = time.time()
                            self.track_fills_and_pnl(force=True)

                            # --- Shadow Order Creation ---
                            # Simulate: "What if we placed a Maker order instead?"
                            # NOTE(review): assumed to run only after a
                            # successful order — confirm against the original.
                            try:
                                # Fixed Timeout for Data Collection (10 min)
                                # We want to capture the full distribution of fill times.
                                dynamic_timeout = 600.0

                                # Shadow Price (Passive)
                                # If we Taker BUY, we would have Maker BUY at BID
                                # If we Taker SELL, we would have Maker SELL at ASK
                                shadow_price = levels['bid'] if is_buy else levels['ask']

                                self.shadow_orders.append({
                                    'side': 'BUY' if is_buy else 'SELL',
                                    'price': shadow_price,
                                    'timeout_duration': dynamic_timeout,
                                    'expires_at': time.time() + dynamic_timeout
                                })
                                logger.info(f"[SHADOW] Created Maker {'BUY' if is_buy else 'SELL'} @ {shadow_price:.2f} (Timeout: {dynamic_timeout:.0f}s)")
                            except Exception as e:
                                logger.error(f"Shadow logic error: {e}")
                    else:
                        if time.time() - self.last_idle_log_time > 30:
                            logger.info(f"[WAIT] Cooldown. Diff: {diff_abs:.4f}{cooldown_text}")
                            self.last_idle_log_time = time.time()
                else:
                    if time.time() - self.last_idle_log_time > 30:
                        logger.info(f"[IDLE] Px: {price:.2f} | Diff: {diff_abs:.4f} < {rebalance_threshold:.4f} (Vol: {vol_pct*100:.3f}% x{vol_multiplier:.1f} | Thresh: {final_threshold_pct*100:.1f}%) | TotPnL: {self.accumulated_pnl:.2f}")
                        self.last_idle_log_time = time.time()

                self.track_fills_and_pnl()
                time.sleep(CHECK_INTERVAL)

            except KeyboardInterrupt:
                logger.info("Stopping...")
                self.close_all_positions()
                break
            except Exception as e:
                logger.error(f"Loop Error: {e}", exc_info=True)
                time.sleep(5)
# Script entry point: build the hedger (reads credentials from the
# environment) and run its loop until interrupted.
if __name__ == "__main__":
    hedger = ScalperHedger()
    hedger.run()