Files
uniswap_auto_clp/clp_scalper_hedger.py

1230 lines
59 KiB
Python

import os
import time
import logging
import sys
import math
import json
import threading
from decimal import Decimal, getcontext, ROUND_DOWN, ROUND_HALF_UP
from dotenv import load_dotenv
# --- FIX: Add project root to sys.path to import local modules ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
# Now we can import from root
from logging_utils import setup_logging
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Load environment variables from .env in current directory
dotenv_path = os.path.join(current_dir, '.env')
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path)
else:
# Fallback to default search
load_dotenv()
# Configure logging and get logger instance
logger = setup_logging("info", "SCALPER_HEDGER")
logger.propagate = False # Prevent double logging from root logger
# Update root logger to ensure all logging calls go to our handlers
root_logger = logging.getLogger()
root_logger.handlers.clear()
for handler in logger.handlers:
root_logger.addHandler(handler)
root_logger.setLevel(logger.level)
# --- DECIMAL PRECISION CONFIGURATION ---
# Set high precision for calculations to avoid float_to_wire serialization errors
getcontext().prec = 28
def safe_decimal_from_float(value):
"""Safely convert float to Decimal without precision loss"""
if value is None:
return Decimal('0')
return Decimal(str(value))
def round_to_sz_decimals_precise(amount, sz_decimals):
"""
Round amount to specified decimals using Decimal for precise rounding
Avoids float_to_wire serialization errors
"""
if amount == 0:
return 0.0
# Convert to Decimal precisely
decimal_amount = safe_decimal_from_float(abs(amount))
# Create rounding quantizer
quantizer = Decimal('1').scaleb(-sz_decimals) # Equivalent to 10^(-sz_decimals)
# Round using ROUND_DOWN to avoid exceeding limits
rounded = decimal_amount.quantize(quantizer, rounding=ROUND_DOWN)
# Convert back to float for API compatibility
return float(rounded)
def round_to_sig_figs_precise(x, sig_figs=5):
"""
Round to significant figures using Decimal for precision
Ensures compatibility with Hyperliquid's 5 sig fig requirement
"""
if x == 0:
return 0.0
decimal_x = safe_decimal_from_float(x)
# Simple approach: use string-based rounding for significant figures
str_x = f"{decimal_x:.{sig_figs}g}"
return float(str_x)
def validate_trade_size(size, sz_decimals, min_order_value=10.0, price=3000.0):
"""
Validate and adjust trade size to meet exchange requirements
"""
if size <= 0:
return 0.0
# Round to correct decimals
rounded_size = round_to_sz_decimals_precise(size, sz_decimals)
# Check minimum order value
order_value = rounded_size * price
if order_value < min_order_value:
return 0.0
# Ensure not too small (avoid dust)
min_size = 10 ** (-sz_decimals)
if rounded_size < min_size:
return 0.0
return rounded_size
# --- CONFIGURATION ---
COIN_SYMBOL = "ETH"
CHECK_INTERVAL = 1 # Optimized for speed (was 5)
LEVERAGE = 5 # 3x Leverage
STATUS_FILE = "hedge_status.json"
# Import enhanced order functions
import sys
import os
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
from enhanced_order_functions import get_price_momentum_pct, get_dynamic_price_buffer
# REMOVED: Uniswap Spread Monitoring for cleaner delta-zero hedging
# - Eliminated external RPC dependencies
# - Reduced complexity and failure points
# - Focused on core delta-zero hedging mission
# - Improved performance and reliability
# --- STRATEGY ZONES (Percent of Range Width) ---
# Bottom Hedge Zone: Covers entire range (0.0 to 1.5) -> Always Active
ZONE_BOTTOM_HEDGE_LIMIT = 1
# Close Zone: Disabled (Set > 1.0)
ZONE_CLOSE_START = 10.0
ZONE_CLOSE_END = 11.0
# Top Hedge Zone: Disabled/Redundant
ZONE_TOP_HEDGE_START = 10.0
# --- ORDER SETTINGS ---
PRICE_BUFFER_PCT = 0.0015 # 0.25% price move triggers order update (Optimized for capital safety)
MIN_THRESHOLD_ETH = 0.012 # Minimum trade size in ETH (~$35, Optimized for significant trades)
MIN_ORDER_VALUE_USD = 10.0 # Minimum order value for API safety
# --- CAPITAL SAFETY PARAMETERS ---
DYNAMIC_THRESHOLD_MULTIPLIER = 1.3 # Reduce from 1.5 for smoother operation with 7k position
MIN_TIME_BETWEEN_TRADES = 25 # Reduce from 30 for more responsive 7k hedging
MAX_HEDGE_MULTIPLIER = 1.25 # Increase from 1.2 for adequate 7k position buffer
# --- RANGE EDGE PROTECTION PARAMETERS ---
# Conservative settings for $8000 CLP positions with higher capital efficiency
EDGE_PROXIMITY_PCT = 0.04 # 5% of range width from edge (conservative fee protection)
VELOCITY_THRESHOLD_PCT = 0.0005 # Multi-timeframe velocity threshold (0.05% smoothed over 5s) for emergency override
POSITION_OPEN_EDGE_PROXIMITY_PCT = 0.06 # 7% (very conservative when earning fees)
POSITION_CLOSED_EDGE_PROXIMITY_PCT = 0.025 # 3% (standard when position closed)
LARGE_HEDGE_MULTIPLIER = 2.8 # More forgiving for large hedge requirements
# Multi-Timeframe Velocity Calculation (Option 3B):
# - 1s velocity: Immediate response for extreme moves (>0.2% per second)
# - 5s average: Smoothed signal for sustained directional moves
# - Reduces false triggers from 1s noise while maintaining emergency response capability
# REMOVED: UniswapPriceMonitor class for cleaner delta-zero hedging
# Benefits:
# - Eliminated external RPC dependencies
# - Reduced threading complexity
# - Removed external failure points
# - Focused on core delta-zero hedging mission
# - Improved system reliability and performance
def get_active_automatic_position():
if not os.path.exists(STATUS_FILE):
return None
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
for entry in data:
if entry.get('type') == 'AUTOMATIC' and entry.get('status') in ['OPEN', 'PENDING_HEDGE', 'CLOSING']:
return entry
except Exception as e:
logging.error(f"ERROR reading status file: {e}")
return None
def update_position_zones_in_json(token_id, zones_data):
"""Updates the active position in JSON with calculated zone prices and formats the entry."""
if not os.path.exists(STATUS_FILE): return
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
updated = False
for i, entry in enumerate(data):
if entry.get('type') == 'AUTOMATIC' and (entry.get('status') == 'OPEN' or entry.get('status') == 'PENDING_HEDGE') and entry.get('token_id') == token_id:
# Merge Zones
for k, v in zones_data.items():
entry[k] = v
# Format & Reorder
open_ts = entry.get('timestamp_open', int(time.time()))
opened_str = time.strftime('%H:%M %d/%m/%y', time.localtime(open_ts))
# Reconstruct Dict in Order
new_entry = {
"type": entry.get('type'),
"token_id": entry.get('token_id'),
"opened": opened_str,
"status": entry.get('status'),
"entry_price": round(entry.get('entry_price', 0), 2),
"target_value": round(entry.get('target_value', 0), 2),
# Amounts might be string or float or int. Ensure float.
"amount0_initial": round(float(entry.get('amount0_initial', 0)), 4),
"amount1_initial": round(float(entry.get('amount1_initial', 0)), 2),
"range_upper": round(entry.get('range_upper', 0), 2),
"zone_top_start_price": entry.get('zone_top_start_price'),
"zone_close_top_price": entry.get('zone_close_top_price'),
"zone_close_bottom_price": entry.get('zone_close_bottom_price'),
"zone_bottom_limit_price": entry.get('zone_bottom_limit_price'),
"range_lower": round(entry.get('range_lower', 0), 2),
"static_long": entry.get('static_long', 0.0),
"timestamp_open": open_ts,
"timestamp_close": entry.get('timestamp_close')
}
data[i] = new_entry
updated = True
break
if updated:
with open(STATUS_FILE, 'w') as f:
json.dump(data, f, indent=2)
logging.info(f"Updated JSON with Formatted Zone Prices for Position {token_id}")
except Exception as e:
logging.error(f"Error updating JSON zones: {e}")
# Legacy functions replaced with precise decimal versions above
def round_to_sig_figs(x, sig_figs=5):
"""Legacy wrapper - use round_to_sig_figs_precise"""
return round_to_sig_figs_precise(x, sig_figs)
def round_to_sz_decimals(amount, sz_decimals=4):
"""Legacy wrapper - use round_to_sz_decimals_precise"""
return round_to_sz_decimals_precise(amount, sz_decimals)
def update_position_stats(token_id, stats_data):
"""Updates the active position in JSON with stats (zones, pnl, fees)."""
if not os.path.exists(STATUS_FILE): return
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
updated = False
for i, entry in enumerate(data):
if entry.get('type') == 'AUTOMATIC' and entry.get('status') in ['OPEN', 'PENDING_HEDGE', 'CLOSING'] and entry.get('token_id') == token_id:
# Merge Stats
for k, v in stats_data.items():
entry[k] = v
# Format & Reorder (Preserve existing logic)
open_ts = entry.get('timestamp_open', int(time.time()))
opened_str = time.strftime('%H:%M %d/%m/%y', time.localtime(open_ts))
new_entry = {
"type": entry.get('type'),
"token_id": entry.get('token_id'),
"opened": opened_str,
"status": entry.get('status'),
"entry_price": round(entry.get('entry_price', 0), 2),
"target_value": round(entry.get('target_value', 0), 2),
"amount0_initial": round(float(entry.get('amount0_initial', 0)), 4),
"amount1_initial": round(float(entry.get('amount1_initial', 0)), 2),
"range_upper": round(entry.get('range_upper', 0), 2),
"zone_top_start_price": entry.get('zone_top_start_price'),
"zone_close_top_price": entry.get('zone_close_top_price'),
"zone_close_bottom_price": entry.get('zone_close_bottom_price'),
"zone_bottom_limit_price": entry.get('zone_bottom_limit_price'),
"range_lower": round(entry.get('range_lower', 0), 2),
"static_long": entry.get('static_long', 0.0),
# New Stats
"hedge_pnl_realized": round(entry.get('hedge_pnl_realized', 0.0), 2),
"hedge_fees_paid": round(entry.get('hedge_fees_paid', 0.0), 2),
"timestamp_open": open_ts,
"timestamp_close": entry.get('timestamp_close')
}
data[i] = new_entry
updated = True
break
if updated:
with open(STATUS_FILE, 'w') as f:
json.dump(data, f, indent=2)
# logging.info(f"Updated JSON stats for Position {token_id}")
except Exception as e:
logging.error(f"Error updating JSON stats: {e}")
class HyperliquidStrategy:
def __init__(self, entry_amount0, entry_amount1, target_value, entry_price, low_range, high_range, start_price, static_long=0.0):
self.entry_amount0 = entry_amount0
self.entry_amount1 = entry_amount1
self.target_value = target_value
self.entry_price = entry_price
self.low_range = low_range
self.high_range = high_range
self.static_long = static_long
self.start_price = start_price
self.gap = max(0.0, entry_price - start_price)
self.recovery_target = entry_price + (2 * self.gap)
self.current_mode = "NORMAL"
self.last_switch_time = 0
logging.info(f"Strategy Init. Start Px: {start_price:.2f} | Gap: {self.gap:.2f} | Recovery Tgt: {self.recovery_target:.2f}")
try:
sqrt_P = math.sqrt(entry_price)
sqrt_Pa = math.sqrt(low_range)
sqrt_Pb = math.sqrt(high_range)
self.L = 0.0
# Method 1: Use Amount0 (WETH)
if entry_amount0 > 0:
# If amount is huge (Wei), scale it. If small (ETH), use as is.
if entry_amount0 > 1000: amount0_eth = entry_amount0 / 10**18
else: amount0_eth = entry_amount0
denom0 = (1/sqrt_P) - (1/sqrt_Pb)
if denom0 > 0.00000001:
self.L = amount0_eth / denom0
logging.info(f"Calculated L from Amount0: {self.L:.4f}")
# Method 2: Use Amount1 (USDC)
if self.L == 0.0 and entry_amount1 > 0:
if entry_amount1 > 100000: amount1_usdc = entry_amount1 / 10**6
else: amount1_usdc = entry_amount1
denom1 = sqrt_P - sqrt_Pa
if denom1 > 0.00000001:
self.L = amount1_usdc / denom1
logging.info(f"Calculated L from Amount1: {self.L:.4f}")
# Method 3: Fallback Heuristic
if self.L == 0.0:
logging.warning("Amounts missing or 0. Using Target Value Heuristic.")
max_eth_heuristic = target_value / low_range
denom_h = (1/sqrt_Pa) - (1/sqrt_Pb)
if denom_h > 0:
self.L = max_eth_heuristic / denom_h
logging.info(f"Calculated L from Target Value: {self.L:.4f}")
else:
logging.error("Critical: Denominator 0 in Heuristic. Invalid Range?")
self.L = 0.0
except Exception as e:
logging.error(f"Error calculating liquidity: {e}")
sys.exit(1)
def get_pool_delta(self, current_price):
if current_price >= self.high_range: return 0.0
if current_price <= self.low_range:
sqrt_Pa = math.sqrt(self.low_range)
sqrt_Pb = math.sqrt(self.high_range)
return self.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
sqrt_P = math.sqrt(current_price)
sqrt_Pb = math.sqrt(self.high_range)
return self.L * ((1/sqrt_P) - (1/sqrt_Pb))
def calculate_rebalance(self, current_price, current_short_position_size):
pool_delta = self.get_pool_delta(current_price)
# --- Over-Hedge Logic ---
overhedge_pct = 0.0
range_width = self.high_range - self.low_range
if range_width > 0:
price_pct = (current_price - self.low_range) / range_width
# If below 0.8 (80%) of range
if price_pct < 0.8:
# Formula: 0.75% boost for every 0.1 drop below 0.8
# Example: At 0.6 (60%), diff is 0.2. (0.2/0.1)*0.0075 = 0.015 (1.5%)
overhedge_pct = ((0.8 - max(0.0, price_pct)) / 0.1) * 0.0075
raw_target_short = pool_delta + self.static_long
# Apply Boost
adjusted_target_short = raw_target_short * (1.0 + overhedge_pct)
target_short_size = adjusted_target_short
diff = target_short_size - abs(current_short_position_size)
return {
"current_price": current_price,
"pool_delta": pool_delta,
"target_short": target_short_size,
"current_short": abs(current_short_position_size),
"diff": diff,
"action": "SELL" if diff > 0 else "BUY",
"mode": "OVERHEDGE" if overhedge_pct > 0 else "NORMAL",
"overhedge_pct": overhedge_pct
}
class ScalperHedger:
def __init__(self):
self.private_key = os.environ.get("SCALPER_AGENT_PK")
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.private_key:
logging.error("No SCALPER_AGENT_PK found in .env")
sys.exit(1)
self.account = Account.from_key(self.private_key)
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
try:
logging.info(f"Setting leverage to {LEVERAGE}x (Cross)...")
self.exchange.update_leverage(LEVERAGE, COIN_SYMBOL, is_cross=True)
except Exception as e:
logging.error(f"Failed to update leverage: {e}")
self.strategy = None
self.sz_decimals = self._get_sz_decimals(COIN_SYMBOL)
self.active_position_id = None
self.active_order = None
# --- Capital Safety Tracking Variables ---
self.last_price = None # For volatility detection
self.last_trade_time = 0 # For minimum time between trades
# --- Velocity Tracking for Edge Protection ---
self.last_price_for_velocity = None # For velocity calculations
self.price_history = [] # Track last N prices for velocity
self.velocity_history = [] # Track velocity history for multi-timeframe analysis
# --- Price Momentum Tracking ---
self.price_momentum_history = [] # Track last 5 price changes for momentum
# --- Order Management Enhancements ---
self.order_placement_time = 0 # Track when orders are placed
self.original_order_side = None # Track original order intent (BUY/SELL)
# --- Order Management Enhancements ---
self.order_placement_time = 0 # Track when orders are placed
self.original_order_side = None # Track original order intent (BUY/SELL)
# --- PnL Tracking ---
self.strategy_start_time = 0
self.last_pnl_check_time = 0
self.trade_history_seen = set() # Store fill IDs to avoid double counting
self.accumulated_pnl = 0.0
self.accumulated_fees = 0.0
# REMOVED: Uniswap Monitor for cleaner delta-zero hedging
# Benefits: No external RPC calls, no threading overhead, focused on core mission
logging.info(f"[DELTA] Delta-Zero Scalper Hedger initialized. Agent: {self.account.address}")
logging.info(f"[SAFE] Capital Safety: Price Buffer {PRICE_BUFFER_PCT*100:.1f}% | Min Threshold {MIN_THRESHOLD_ETH} ETH (~${MIN_THRESHOLD_ETH*3000:.0f} USD)")
logging.info(f"[TRIG] Dynamic Protection: Volatility Multiplier {DYNAMIC_THRESHOLD_MULTIPLIER}x | Trade Cooldown {MIN_TIME_BETWEEN_TRADES}s | Max Hedge {MAX_HEDGE_MULTIPLIER*100:.0f}%")
logging.info(f"[INFO] Uniswap spread monitoring removed for cleaner delta-zero hedging")
def _init_strategy(self, position_data):
try:
entry_amount0 = position_data.get('amount0_initial', 0)
entry_amount1 = position_data.get('amount1_initial', 0)
target_value = position_data.get('target_value', 50.0)
entry_price = position_data['entry_price']
lower = position_data['range_lower']
upper = position_data['range_upper']
static_long = position_data.get('static_long', 0.0)
start_price = self.get_market_price(COIN_SYMBOL)
if start_price is None:
logging.warning("Waiting for initial price to start strategy...")
return
self.strategy = HyperliquidStrategy(
entry_amount0=entry_amount0,
entry_amount1=entry_amount1,
target_value=target_value,
entry_price=entry_price,
low_range=lower,
high_range=upper,
start_price=start_price,
static_long=static_long
)
# Reset tracking variables for new strategy
self.last_price = start_price
self.last_trade_time = 0
self.last_price_for_velocity = start_price
self.price_history = [start_price] # Initialize price history for velocity
self.velocity_history = [] # Initialize velocity history for multi-timeframe analysis
# Reset PnL Tracking
self.strategy_start_time = int(time.time() * 1000) # MS
self.trade_history_seen = set()
self.accumulated_pnl = 0.0
self.accumulated_fees = 0.0
self.active_position_id = position_data['token_id']
# Init JSON stats
update_position_stats(self.active_position_id, {
"hedge_pnl_realized": 0.0,
"hedge_fees_paid": 0.0
})
logging.info(f"[DELTA] Delta-Zero Strategy Initialized for Position {position_data['token_id']}.")
logging.info(f"[INFO] CLP Range: ${lower:.2f} - ${upper:.2f} | Entry: ${entry_price:.2f} | Width: {((upper-lower)/lower)*100:.2f}%")
logging.info(f"[TRIG] Delta-Zero Hedging ACTIVE across entire CLP range with capital safety protections")
logging.info(f"[SAFE] Edge Protection: {EDGE_PROXIMITY_PCT*100:.1f}% proximity | Velocity: {VELOCITY_THRESHOLD_PCT*100:.2f}% threshold | Position-aware: OPEN={POSITION_OPEN_EDGE_PROXIMITY_PCT*100:.1f}% | CLOSED={POSITION_CLOSED_EDGE_PROXIMITY_PCT*100:.1f}%")
self.active_position_id = position_data['token_id']
except Exception as e:
logging.error(f"Failed to init strategy: {e}")
self.strategy = None
def track_fills_and_pnl(self, force=False):
"""Fetches recent fills, filters by strategy start, accumulates PnL/Fees, and updates JSON."""
try:
now = time.time()
# Check every 10 seconds unless forced
if not force and now - self.last_pnl_check_time < 10:
return
self.last_pnl_check_time = now
# Get user fills (returns list of recent fills)
user_fills = self.info.user_fills(self.vault_address or self.account.address)
new_activity = False
for fill in user_fills:
# Check Coin
if fill['coin'] != COIN_SYMBOL: continue
# Check Time (fill['time'] is ms)
if fill['time'] < self.strategy_start_time: continue
# Check duplication via unique 'tid'
fill_id = fill.get('tid')
if not fill_id: continue
if fill_id in self.trade_history_seen:
continue
# New Fill Found
self.trade_history_seen.add(fill_id)
fees = float(fill['fee'])
pnl = float(fill['closedPnl']) # Realized PnL from this trade (if closing)
self.accumulated_fees += fees
self.accumulated_pnl += pnl
new_activity = True
logging.info(f"[FILL] New Fill Processed: {fill['side']} {fill['sz']} @ {fill['px']} | Fee: ${fees:.4f} | Realized PnL: ${pnl:.4f}")
if new_activity:
logging.info(f"[PNL] Total Strategy PnL (Hedge): ${self.accumulated_pnl:.2f} | Fees Paid: ${self.accumulated_fees:.2f}")
update_position_stats(self.active_position_id, {
"hedge_pnl_realized": self.accumulated_pnl,
"hedge_fees_paid": self.accumulated_fees
})
except Exception as e:
logging.error(f"Error tracking fills: {e}")
def _get_sz_decimals(self, coin):
try:
meta = self.info.meta()
for asset in meta["universe"]:
if asset["name"] == coin:
return asset["szDecimals"]
return 4
except: return 4
def get_order_book_levels(self, coin):
try:
l2_snapshot = self.info.l2_snapshot(coin)
if l2_snapshot and 'levels' in l2_snapshot:
bids = l2_snapshot['levels'][0]
asks = l2_snapshot['levels'][1]
if bids and asks:
best_bid = float(bids[0]['px'])
best_ask = float(asks[0]['px'])
mid = (best_bid + best_ask) / 2
return {'bid': best_bid, 'ask': best_ask, 'mid': mid}
return None
except:
return None
def get_market_price(self, coin):
try:
mids = self.info.all_mids()
if coin in mids: return float(mids[coin])
except: pass
return None
def get_order_book_mid(self, coin):
try:
l2_snapshot = self.info.l2_snapshot(coin)
if l2_snapshot and 'levels' in l2_snapshot:
bids = l2_snapshot['levels'][0]
asks = l2_snapshot['levels'][1]
if bids and asks:
best_bid = float(bids[0]['px'])
best_ask = float(asks[0]['px'])
return (best_bid + best_ask) / 2
return self.get_market_price(coin)
except:
return self.get_market_price(coin)
def get_funding_rate(self, coin):
try:
meta, asset_ctxs = self.info.meta_and_asset_ctxs()
for i, asset in enumerate(meta["universe"]):
if asset["name"] == coin:
return float(asset_ctxs[i]["funding"])
return 0.0
except: return 0.0
def get_current_position(self, coin):
try:
user_state = self.info.user_state(self.vault_address or self.account.address)
for pos in user_state["assetPositions"]:
if pos["position"]["coin"] == coin:
return {
'size': float(pos["position"]["szi"]),
'pnl': float(pos["position"]["unrealizedPnl"])
}
return {'size': 0.0, 'pnl': 0.0}
except: return {'size': 0.0, 'pnl': 0.0}
def get_open_orders(self):
try:
return self.info.open_orders(self.vault_address or self.account.address)
except: return []
def cancel_order(self, coin, oid):
logging.info(f"Cancelling order {oid}...")
try:
return self.exchange.cancel(coin, oid)
except Exception as e:
logging.error(f"Error cancelling order: {e}")
def place_limit_order(self, coin, is_buy, size, price, order_type="Alo"):
# NEW: Validate and round size using decimal precision to avoid float_to_wire errors
validated_size = validate_trade_size(size, self.sz_decimals, MIN_ORDER_VALUE_USD, price)
if validated_size == 0:
logging.error(f"Trade size {size} is too small or invalid after validation")
return None
logging.info(f"[ORDER] PLACING {order_type.upper()}: {coin} {'BUY' if is_buy else 'SELL'} {validated_size:.8f} @ {price:.2f}")
reduce_only = is_buy
try:
# Use precise rounding for price to avoid serialization issues
limit_px = round_to_sig_figs_precise(price, 5)
# Log actual values being sent to API for debugging
logging.info(f"[API] API Call: Size={validated_size:.8f}, Price={limit_px:.2f}, Type={order_type}")
# Use specified TIF (Alo, Ioc, Gtc)
order_result = self.exchange.order(coin, is_buy, validated_size, limit_px, {"limit": {"tif": order_type}}, reduce_only=reduce_only)
status = order_result["status"]
if status == "ok":
response_data = order_result["response"]["data"]
if "statuses" in response_data:
status_obj = response_data["statuses"][0]
if "error" in status_obj:
logging.error(f"Order API Error: {status_obj['error']}")
return None
# Parse OID from nested structure
oid = None
if "resting" in status_obj:
oid = status_obj["resting"]["oid"]
elif "filled" in status_obj:
oid = status_obj["filled"]["oid"]
logging.info("Order filled immediately.")
if oid:
logging.info(f"[OK]: OID {oid}")
return oid
else:
logging.warning(f"Order placed but OID not found in: {status_obj}")
return None
else:
logging.error(f"Order Failed: {order_result}")
return None
except Exception as e:
logging.error(f"Exception during trade: {e}")
return None
def get_price_momentum_pct(self, current_price):
"""Calculate price momentum percentage over last 5 intervals"""
if not hasattr(self, 'price_momentum_history') or len(self.price_momentum_history) < 2:
return 0.0
recent_prices = self.price_momentum_history[-5:] # Last 5 prices
if len(recent_prices) < 2:
return 0.0
# Calculate momentum as percentage change
oldest_price = recent_prices[0]
momentum_pct = (current_price - oldest_price) / oldest_price
return momentum_pct
def get_dynamic_price_buffer(self):
"""Calculate dynamic price buffer based on market conditions"""
if not MOMENTUM_ADJUSTMENT_ENABLED:
return PRICE_BUFFER_PCT
current_price = self.last_price if self.last_price else 0
momentum_pct = self.get_price_momentum_pct(current_price)
base_buffer = PRICE_BUFFER_PCT
# Adjust buffer based on momentum and position direction
if self.original_order_side == "BUY":
# For BUY orders: tolerate more upside movement
if momentum_pct > 0.005: # Strong upward momentum
dynamic_buffer = base_buffer * 2.0
elif momentum_pct > 0.002: # Moderate upward momentum
dynamic_buffer = base_buffer * 1.5
else:
dynamic_buffer = base_buffer
elif self.original_order_side == "SELL":
# For SELL orders: tolerate more downside movement
if momentum_pct < -0.005: # Strong downward momentum
dynamic_buffer = base_buffer * 2.0
elif momentum_pct < -0.002: # Moderate downward momentum
dynamic_buffer = base_buffer * 1.5
else:
dynamic_buffer = base_buffer
else:
dynamic_buffer = base_buffer
return min(dynamic_buffer, MAX_PRICE_BUFFER_PCT)
def update_price_momentum_history(self, current_price):
"""Track price history for momentum calculation"""
if not hasattr(self, 'price_momentum_history'):
self.price_momentum_history = []
self.price_momentum_history.append(current_price)
if len(self.price_momentum_history) > 10: # Keep last 10 prices
self.price_momentum_history = self.price_momentum_history[-10:]
def manage_orders(self):
"""
Enhanced order management with directional awareness and dynamic price buffering.
Returns: True if an order exists and is valid (don't trade), False if no order (can trade).
"""
open_orders = self.get_open_orders()
my_orders = [o for o in open_orders if o['coin'] == COIN_SYMBOL]
if not my_orders:
self.active_order = None
return False
if len(my_orders) > 1:
logging.warning("Multiple open orders found. Cancelling all for safety.")
for o in my_orders:
self.cancel_order(COIN_SYMBOL, o['oid'])
self.active_order = None
return False
order = my_orders[0]
oid = order['oid']
order_price = float(order['limitPx'])
current_mid = self.get_order_book_mid(COIN_SYMBOL)
pct_diff = abs(current_mid - order_price) / order_price
# Get dynamic price buffer based on market conditions
dynamic_buffer = self.get_dynamic_price_buffer()
# Apply dynamic buffer with enhanced logic
dynamic_buffer = self.get_dynamic_price_buffer()
enhanced_pct_diff = pct_diff * (1 + abs(momentum_pct) * 0.5) if hasattr(self, 'get_price_momentum_pct') else pct_diff
if enhanced_pct_diff > dynamic_buffer:
# Update order side tracking before cancelling
if hasattr(self, 'active_order'):
order_side = "BUY" if my_orders[0]['side'].lower() == 'buy' else "SELL"
if not hasattr(self, 'original_order_side') or self.original_order_side != order_side:
self.original_order_side = order_side
logging.info(f"New order direction tracked: {self.original_order_side}")
logging.info(f"Price moved {pct_diff*100:.3f}% > {dynamic_buffer*100:.3f}% (Dynamic: {self.get_dynamic_price_buffer()*100:.3f}%). Cancelling/Replacing order {oid}.")
self.cancel_order(COIN_SYMBOL, oid)
self.active_order = None
return False
else:
logging.info(f"Pending Order {oid} @ {order_price:.2f} is within range ({pct_diff*100:.3f}%). Dynamic Buffer: {self.get_dynamic_price_buffer()*100:.3f}% Waiting.")
return True
def close_all_positions(self, force_taker=False):
logging.info("Closing all positions (Market Order)...")
try:
# Cancel open orders first
open_orders = self.get_open_orders()
for o in open_orders:
if o['coin'] == COIN_SYMBOL:
self.cancel_order(COIN_SYMBOL, o['oid'])
price = self.get_market_price(COIN_SYMBOL)
pos_data = self.get_current_position(COIN_SYMBOL)
current_pos = pos_data['size']
if current_pos == 0: return
is_buy_to_close = current_pos < 0
final_size = round_to_sz_decimals(abs(current_pos), self.sz_decimals)
if final_size == 0: return
# --- ATTEMPT MAKER CLOSE (Alo) ---
if not force_taker:
try:
book_levels = self.get_order_book_levels(COIN_SYMBOL)
TICK_SIZE = 0.1
if is_buy_to_close: # We are short, need to buy to close
maker_price = book_levels['bid'] - TICK_SIZE
else: # We are long, need to sell to close
maker_price = book_levels['ask'] + TICK_SIZE
logging.info(f"Attempting MAKER CLOSE (Alo): {COIN_SYMBOL} {'BUY' if is_buy_to_close else 'SELL'} {final_size} @ {maker_price:.2f}")
order_result = self.exchange.order(COIN_SYMBOL, is_buy_to_close, final_size, round_to_sig_figs(maker_price, 5), {"limit": {"tif": "Alo"}}, reduce_only=True)
status = order_result["status"]
if status == "ok":
response_data = order_result["response"]["data"]
if "statuses" in response_data and "resting" in response_data["statuses"][0]:
logging.info(f"✅ MAKER CLOSE Order Placed (Alo). OID: {response_data['statuses'][0]['resting']['oid']}")
return
elif "statuses" in response_data and "filled" in response_data["statuses"][0]:
logging.info(f"✅ MAKER CLOSE Order Filled (Alo). OID: {response_data['statuses'][0]['filled']['oid']}")
return
else:
# Fallback if Alo didn't rest or fill immediately in an expected way
logging.warning(f"Alo order result unclear: {order_result}. Falling back to Market Close.")
elif status == "error":
if "Post only order would have immediately matched" in order_result["response"]["data"]["statuses"][0].get("error", ""):
logging.warning("Alo order would have immediately matched. Falling back to Market Close for guaranteed fill.")
else:
logging.error(f"Alo order failed with unknown error: {order_result}. Falling back to Market Close.")
else:
logging.warning(f"Alo order failed with status {status}. Falling back to Market Close.")
except Exception as e:
logging.error(f"Exception during Alo close attempt: {e}. Falling back to Market Close.", exc_info=True)
# --- FALLBACK TO MARKET CLOSE (Ioc) for guaranteed fill ---
logging.info(f"Falling back to MARKET CLOSE (Ioc): {COIN_SYMBOL} {'BUY' if is_buy_to_close else 'SELL'} {final_size} @ {price:.2f} (guaranteed)")
self.exchange.order(COIN_SYMBOL, is_buy_to_close, final_size, round_to_sig_figs(price * (1.05 if is_buy_to_close else 0.95), 5), {"limit": {"tif": "Ioc"}}, reduce_only=True)
self.active_position_id = None
logging.info("✅ MARKET CLOSE Order Placed (Ioc).")
except Exception as e:
logging.error(f"Error closing positions: {e}", exc_info=True)
def run(self):
logging.info(f"Starting Scalper Monitor Loop. Interval: {CHECK_INTERVAL}s")
while True:
try:
active_pos = get_active_automatic_position()
# 1. Global Disable / No Position Check
if not active_pos or not active_pos.get('hedge_enabled', True):
if self.strategy is not None:
logging.info("Hedge Disabled or Position Missing. Closing.")
self.close_all_positions(force_taker=True)
self.strategy = None
time.sleep(CHECK_INTERVAL)
continue
# 2. Explicit CLOSING Status Check
if active_pos.get('status') == 'CLOSING':
logging.info(f"[ALERT] {active_pos['token_id']} is CLOSING. Forcing hedge close.")
self.close_all_positions(force_taker=True)
self.strategy = None
time.sleep(CHECK_INTERVAL)
continue
if self.strategy is None or self.active_position_id != active_pos['token_id']:
logging.info(f"New position {active_pos['token_id']} detected or strategy not initialized. Initializing strategy.")
self._init_strategy(active_pos)
if self.strategy is None:
time.sleep(CHECK_INTERVAL)
continue
if self.strategy is None: continue
# --- ORDER MANAGEMENT ---
if self.manage_orders():
time.sleep(CHECK_INTERVAL)
continue
# 2. Market Data
book_levels = self.get_order_book_levels(COIN_SYMBOL)
if book_levels is None:
# logging.warning("Order book data unavailable. Skipping cycle.")
time.sleep(0.1) # Short sleep before retry
continue
price = book_levels['mid']
funding_rate = self.get_funding_rate(COIN_SYMBOL)
pos_data = self.get_current_position(COIN_SYMBOL)
current_pos_size = pos_data['size']
current_pnl = pos_data['pnl']
# REMOVED: Uniswap spread monitoring for cleaner delta-zero hedging
# Benefits:
# - No external RPC dependency
# - Eliminated spread text overhead
# - Focused on core hedging decisions
# - Cleaner logs with essential information only
spread_text = "" # Empty since spread monitoring removed
# 3. Calculate Logic
calc = self.strategy.calculate_rebalance(price, current_pos_size)
diff_abs = abs(calc['diff'])
# Log ETH price with delta calculation for debugging
eth_price = self.get_market_price(COIN_SYMBOL)
price_delta = eth_price - (self.last_price if self.last_price else 0)
# --- LOGGING OVERHEDGE ---
oh_text = ""
if calc.get('overhedge_pct', 0) > 0:
oh_text = f" | [OH] OH: +{calc['overhedge_pct']*100:.2f}%"
# 4. Dynamic Threshold Calculation
sqrt_Pa = math.sqrt(self.strategy.low_range)
sqrt_Pb = math.sqrt(self.strategy.high_range)
max_potential_eth = self.strategy.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
# Use MIN_THRESHOLD_ETH from config
rebalance_threshold = max(MIN_THRESHOLD_ETH, max_potential_eth * 0.05)
# 5. Determine Hedge Zone
clp_low_range = self.strategy.low_range
clp_high_range = self.strategy.high_range
range_width = clp_high_range - clp_low_range
# Calculate Prices for Zones
# If config > 9, set to None (Disabled Zone)
zone_bottom_limit_price = (clp_low_range + (range_width * ZONE_BOTTOM_HEDGE_LIMIT)) if ZONE_BOTTOM_HEDGE_LIMIT <= 9 else None
zone_close_bottom_price = (clp_low_range + (range_width * ZONE_CLOSE_START)) if ZONE_CLOSE_START <= 9 else None
zone_close_top_price = (clp_low_range + (range_width * ZONE_CLOSE_END)) if ZONE_CLOSE_END <= 9 else None
zone_top_start_price = (clp_low_range + (range_width * ZONE_TOP_HEDGE_START)) if ZONE_TOP_HEDGE_START <= 9 else None
# Update JSON with zone prices if they are None (initially set by uniswap_manager.py)
if active_pos.get('zone_bottom_limit_price') is None:
update_position_zones_in_json(active_pos['token_id'], {
'zone_top_start_price': round(zone_top_start_price, 2) if zone_top_start_price else None,
'zone_close_top_price': round(zone_close_top_price, 2) if zone_close_top_price else None,
'zone_close_bottom_price': round(zone_close_bottom_price, 2) if zone_close_bottom_price else None,
'zone_bottom_limit_price': round(zone_bottom_limit_price, 2) if zone_bottom_limit_price else None
})
# --- DELTA-ZERO HEDGING: Active throughout CLP range ---
# Delta-zero hedging is now active across the entire CLP range
in_hedge_zone = (price >= clp_low_range and price <= clp_high_range)
# Close zone check (for emergency shutdown)
in_close_zone = False
if zone_close_bottom_price is not None and zone_close_top_price is not None:
in_close_zone = (price >= zone_close_bottom_price and price <= zone_close_top_price)
# --- DELTA-ZERO HEDGING EXECUTION LOGIC ---
if in_close_zone:
logging.info(f"ZONE: CLOSE ({price:.2f} in {zone_close_bottom_price:.2f}-{zone_close_top_price:.2f}). PNL: ${current_pnl:.2f}. Closing all hedge positions.")
self.close_all_positions(force_taker=True)
time.sleep(CHECK_INTERVAL)
continue
elif in_hedge_zone:
# DELTA-ZERO HEDGING: Active throughout CLP range
pct_position = (price - clp_low_range) / range_width
# Dynamic threshold adjustment for volatility protection
dynamic_threshold = rebalance_threshold
if hasattr(self, 'last_price') and self.last_price:
price_change_pct = abs(price - self.last_price) / self.last_price
if price_change_pct > 0.003: # >0.3% change = high volatility (adjusted for multi-timeframe)
dynamic_threshold *= DYNAMIC_THRESHOLD_MULTIPLIER
volatility_text = f" | [VOL] HIGH VOLATILITY ({price_change_pct*100:.2f}%)"
else:
volatility_text = ""
else:
volatility_text = ""
# Calculate velocity first, then update price history (Multi-timeframe approach)
if (hasattr(self, 'last_price_for_velocity') and
self.last_price_for_velocity and
hasattr(self, 'price_history') and
len(self.price_history) >= 2):
# Option 3B: Multi-Timeframe Velocity Calculation
# 1-second velocity (instantaneous)
velocity_1s = (price - self.last_price_for_velocity) / self.last_price_for_velocity
# 5-second average velocity (smoother)
velocity_5s = 0.0
if len(self.price_history) >= 5:
price_5s_ago = self.price_history[-5]
velocity_5s = (price - price_5s_ago) / price_5s_ago / 5 # Per second average
# Choose velocity: Use 5s average for normal conditions, 1s for extreme moves
if abs(velocity_1s) > 0.002: # If 1s move is extreme (>0.2%), use it
price_velocity = velocity_1s
else: # Otherwise use 5s average for smoother signals
price_velocity = velocity_5s
# Add validation to prevent extreme readings
if abs(price_velocity) > 0.5: # Cap at 50% change per interval
price_velocity = 0.5 if price_velocity > 0 else -0.5
# Update velocity history for tracking
if not hasattr(self, 'velocity_history'):
self.velocity_history = []
self.velocity_history.append(velocity_1s)
if len(self.velocity_history) > 10: # Keep last 10 velocity readings
self.velocity_history = self.velocity_history[-10:]
else:
price_velocity = 0.0
velocity_1s = 0.0
velocity_5s = 0.0
# Update price history for velocity tracking
if hasattr(self, 'price_history'):
self.price_history.append(price)
# Keep only last 10 prices for velocity calculation (increased from 5)
if len(self.price_history) > 10:
self.price_history = self.price_history[-10:]
# --- COMPREHENSIVE EDGE PROTECTION LOGIC ---
can_trade = True
override_text = ""
cooldown_text = ""
# --- MULTI-LAYER OVERRIDE CONDITIONS ---
bypass_cooldown = False
override_reason = ""
# 1. CRITICAL: Already outside CLP range (highest priority)
if price < clp_low_range or price > clp_high_range:
bypass_cooldown = True
override_reason = "OUTSIDE RANGE (CRITICAL)"
if price < clp_low_range:
override_reason += " (BELOW)"
else:
override_reason += " (ABOVE)"
# 2. URGENT: Within edge proximity AND position still open
elif (hasattr(active_pos, 'status') and
active_pos.get('status') == 'OPEN'):
# Use position-aware edge proximity
position_edge_proximity = POSITION_OPEN_EDGE_PROXIMITY_PCT
distance_from_bottom = price - clp_low_range
distance_from_top = clp_high_range - price
range_width = clp_high_range - clp_low_range
edge_distance = range_width * position_edge_proximity
is_near_bottom = distance_from_bottom <= edge_distance
is_near_top = distance_from_top <= edge_distance
if is_near_bottom or is_near_top:
bypass_cooldown = True
override_reason = f"EDGE PROXIMITY ({position_edge_proximity*100:.1f}% edge)"
if is_near_bottom:
override_reason += f" ({distance_from_bottom:.2f} from bottom)"
else:
override_reason += f" ({distance_from_top:.2f} from top)"
# 3. EMERGENCY: High velocity toward range edge (using smoothed velocity)
elif abs(price_velocity) > VELOCITY_THRESHOLD_PCT:
# Only if moving toward edge
moving_toward_bottom = price_velocity < 0 and price < (clp_low_range * 1.05)
moving_toward_top = price_velocity > 0 and price > (clp_high_range * 0.95)
if moving_toward_bottom or moving_toward_top:
bypass_cooldown = True
# Improved logging with actual price movement context
if self.last_price_for_velocity:
actual_price_move = price - self.last_price_for_velocity
override_reason = f"HIGH VELOCITY ({price_velocity*100:.2f}%/interval, ${actual_price_move:+.2f})"
else:
override_reason = f"HIGH VELOCITY ({price_velocity*100:.2f}%/interval)"
# 4. LARGE GAP: Target hedge is significantly different
elif abs(calc['diff']) > (dynamic_threshold * LARGE_HEDGE_MULTIPLIER):
bypass_cooldown = True
override_reason = f"LARGE HEDGE NEEDED ({abs(calc['diff']):.4f} vs {dynamic_threshold:.4f})"
# Apply cooldown override logic
if bypass_cooldown:
can_trade = True
cooldown_text = f" | 🚨 OVERRIDE: {override_reason}"
self.last_price_for_velocity = price
logging.info(f"[WARN] COOLDOWN BYPASSED: {override_reason}")
elif hasattr(self, 'last_trade_time'):
time_since_last = time.time() - self.last_trade_time
if time_since_last < MIN_TIME_BETWEEN_TRADES:
can_trade = False
cooldown_text = f" | [WAIT] COOLDOWN ({MIN_TIME_BETWEEN_TRADES - time_since_last:.0f}s)"
# Update velocity and momentum tracking
self.last_price_for_velocity = price
self.update_price_momentum_history(price)
if diff_abs > dynamic_threshold and can_trade:
# Use precise decimal rounding to avoid float_to_wire errors
trade_size = round_to_sz_decimals_precise(diff_abs, self.sz_decimals)
# Safety cap: Prevent position from exceeding maximum hedge multiplier
max_allowed_size = calc['target_short'] * MAX_HEDGE_MULTIPLIER
if abs(calc['current_short']) + trade_size > max_allowed_size:
trade_size = max_allowed_size - abs(calc['current_short'])
# Use precise decimal rounding to avoid float_to_wire errors
trade_size = round_to_sz_decimals_precise(trade_size, self.sz_decimals)
safety_text = f" | [SAFE] SIZE CAP ({max_allowed_size:.4f})"
else:
safety_text = ""
min_trade_size = MIN_ORDER_VALUE_USD / price
if trade_size < min_trade_size:
logger.info(f"[DELTA] DELTA-ZERO: Idle. Trade size {trade_size:.4f} < Min {min_trade_size:.4f} (${MIN_ORDER_VALUE_USD:.2f}). Pos: {pct_position*100:.1f}% | PNL: ${current_pnl:.2f}{spread_text}{oh_text}{volatility_text} | ETH: ${eth_price:.2f}{price_delta:+.2f})")
elif trade_size > 0.0001: # Minimum meaningful trade
# Determine Order Type and Urgency
order_type = "Alo" # Default to Maker
is_initial_entry = abs(calc['current_short']) < (trade_size * 0.1) # Less than 10% of target is open
if bypass_cooldown or is_initial_entry:
order_type = "Ioc" # Taker for urgency or start
urgency_reason = "URGENT" if bypass_cooldown else "INITIAL"
logging.info(f"[TRIG] DELTA-ZERO TRIGGERED ({urgency_reason}): {diff_abs:.4f} >= {dynamic_threshold:.4f}. Pos: {pct_position*100:.1f}% | PNL: ${current_pnl:.2f}{spread_text}{oh_text}{volatility_text}{safety_text}")
else:
logging.info(f"[TRIG] DELTA-ZERO TRIGGERED (PASSIVE): {diff_abs:.4f} >= {dynamic_threshold:.4f}. Pos: {pct_position*100:.1f}% | PNL: ${current_pnl:.2f}{spread_text}{oh_text}{volatility_text}{safety_text}")
# Execute
TICK_SIZE = 0.2
is_buy = (calc['action'] == "BUY")
if order_type == "Ioc":
# Taker Price: Cross the spread + slippage tolerance
# Buy at Ask + buffer, Sell at Bid - buffer
# 0.1% slippage tolerance for taker orders
if is_buy:
exec_price = book_levels['ask'] * 1.001
else:
exec_price = book_levels['bid'] * 0.999
else:
# Maker Price: Passive offset
if is_buy:
exec_price = book_levels['bid'] - TICK_SIZE
else:
exec_price = book_levels['ask'] + TICK_SIZE
order_id = self.place_limit_order(COIN_SYMBOL, is_buy, trade_size, exec_price, order_type=order_type)
if order_id:
self.last_trade_time = time.time()
self.track_fills_and_pnl(force=True)
else:
logging.info(f"[DELTA] DELTA-ZERO: Trade size rounds to 0. Pos: {pct_position*100:.1f}% | PNL: ${current_pnl:.2f}{spread_text}{oh_text}{volatility_text}{cooldown_text}")
else:
if not can_trade:
reason = f"Cooldown ({MIN_TIME_BETWEEN_TRADES}s)"
else:
reason = f"Threshold ({diff_abs:.4f} < {dynamic_threshold:.4f})"
# Add velocity context for debugging (show multi-timeframe)
if abs(price_velocity) > 0.001:
velocity_text = f" | Vel: {price_velocity*100:+.2f}% (1s:{velocity_1s*100:+.2f}%,5s:{velocity_5s*100:+.2f}%)"
else:
velocity_text = ""
logger.info(f"[DELTA] DELTA-ZERO: Idle. {reason}. Pos: {pct_position*100:.1f}% | PNL: ${current_pnl:.2f}{spread_text}{oh_text}{volatility_text}{velocity_text}{cooldown_text}")
else:
# OUTSIDE CLP RANGE:
# 1. If ABOVE Range: We are 100% USDC. CLOSE HEDGE.
# 2. If BELOW Range: We are 100% ETH. HOLD HEDGE (Don't Close).
if price > clp_high_range:
zone_text = f"ABOVE range ({price:.2f} > {clp_high_range:.2f})"
logging.info(f"[OUT] OUTSIDE CLP RANGE: {zone_text}. Closing hedge (100% USDC). PNL: ${current_pnl:.2f}")
self.close_all_positions(force_taker=True)
elif price < clp_low_range:
zone_text = f"BELOW range ({price:.2f} < {clp_low_range:.2f})"
# Log periodically (every ~10s) to avoid spam
if int(time.time()) % 20 == 0:
logger.info(f"[HOLD] OUTSIDE CLP RANGE: {zone_text}. Holding hedge (100% ETH). Waiting for Manager signal.")
time.sleep(CHECK_INTERVAL)
continue
# Update PnL/Fees periodically
self.track_fills_and_pnl()
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
logging.info("Stopping Hedger...")
self.close_all_positions()
break
except Exception as e:
logging.error(f"Loop Error: {e}", exc_info=True)
time.sleep(10)
if __name__ == "__main__":
hedger = ScalperHedger()
hedger.run()