Files
hyper/clp_hedger/clp_scalper_hedger.py
2025-12-14 22:11:36 +01:00

562 lines
23 KiB
Python

import os
import time
import logging
import sys
import math
import json
from dotenv import load_dotenv
# --- Make the project root importable so modules living there can be loaded ---
current_dir = os.path.split(os.path.abspath(__file__))[0]
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
# Imports from the project root resolve from this point on
from logging_utils import setup_logging
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Load environment variables: use the .env beside this script when it exists,
# otherwise fall back to load_dotenv()'s default search.
dotenv_path = os.path.join(current_dir, '.env')
if not os.path.exists(dotenv_path):
    load_dotenv()
else:
    load_dotenv(dotenv_path)
setup_logging("normal", "SCALPER_HEDGER")
# --- CONFIGURATION ---
COIN_SYMBOL = "ETH"
CHECK_INTERVAL = 1 # Seconds slept between monitor-loop iterations (fast check for scalper)
LEVERAGE = 5 # 5x leverage (requested as cross margin when the hedger starts)
STATUS_FILE = "hedge_status.json"
# --- STRATEGY ZONES (fraction of range width, measured up from the lower bound) ---
# NOTE(review): earlier comments described 15% / 20% / 85% boundaries, which do not
# match the values below. The comments now describe what the code uses — confirm
# which set of boundaries is actually intended.
# Bottom Hedge Zone: price at or below 50% of the range -> Active Hedging
ZONE_BOTTOM_HEDGE_LIMIT = 0.5
# Close Zone: 51% to 52% (inclusive) -> Close All Hedges (Flatten)
ZONE_CLOSE_START = 0.51
ZONE_CLOSE_END = 0.52
# Middle Zone: anything not covered by another zone (50%-51% and 52%-80%) -> Idle (No new orders, keep existing)
# Implied by gaps between other zones.
# Top Hedge Zone: price at or above 80% of the range -> Active Hedging
ZONE_TOP_HEDGE_START = 0.8
# --- ORDER SETTINGS ---
PRICE_BUFFER_PCT = 0.0005 # 0.05% price move triggers order update
MIN_THRESHOLD_ETH = 0.01 # Floor (in ETH) for the rebalance threshold computed in the monitor loop
MIN_ORDER_VALUE_USD = 10.0 # Minimum order value for API safety
def get_active_automatic_position():
    """Return the first open AUTOMATIC position recorded in STATUS_FILE.

    Returns:
        The matching entry (a dict), or None when the file is missing,
        cannot be read or parsed, is not a JSON list, or contains no entry
        with type 'AUTOMATIC' and status 'OPEN'.
    """
    if not os.path.exists(STATUS_FILE):
        return None
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding failures.
        logging.error(f"ERROR reading status file: {e}")
        return None

    if not isinstance(data, list):
        logging.error(f"ERROR reading status file: expected a list, got {type(data).__name__}")
        return None

    for entry in data:
        # Skip malformed rows (e.g. null) instead of aborting the scan, so a
        # valid position later in the file is still found.
        if not isinstance(entry, dict):
            continue
        if entry.get('type') == 'AUTOMATIC' and entry.get('status') == 'OPEN':
            return entry
    return None
def update_position_zones_in_json(token_id, zones_data):
    """Merge zone prices into the open AUTOMATIC position and rewrite STATUS_FILE.

    The matching entry (same ``token_id``, type 'AUTOMATIC', status 'OPEN')
    is rebuilt with a fixed key order and rounded numeric fields. Keys that
    are not part of that fixed layout are appended unchanged, so fields such
    as ``hedge_enabled`` are not lost by the rewrite.

    Args:
        token_id: Identifier of the position to update.
        zones_data: Mapping of zone-price keys to values, merged into the entry.

    Any failure is logged and swallowed; the function always returns None.
    """
    if not os.path.exists(STATUS_FILE):
        return
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)

        updated = False
        for i, entry in enumerate(data):
            if not isinstance(entry, dict):
                continue
            is_target = (
                entry.get('type') == 'AUTOMATIC'
                and entry.get('status') == 'OPEN'
                and entry.get('token_id') == token_id
            )
            if not is_target:
                continue

            # Merge zones
            entry.update(zones_data)

            # Human-readable open time, rendered in local time
            open_ts = entry.get('timestamp_open', int(time.time()))
            opened_str = time.strftime('%H:%M %d/%m/%y', time.localtime(open_ts))

            # Rebuild the entry in a fixed, readable key order
            new_entry = {
                "type": entry.get('type'),
                "token_id": entry.get('token_id'),
                "opened": opened_str,
                "status": entry.get('status'),
                "entry_price": round(entry.get('entry_price', 0), 2),
                "target_value": round(entry.get('target_value', 0), 2),
                # Amounts might be string or float or int. Ensure float.
                "amount0_initial": round(float(entry.get('amount0_initial', 0)), 4),
                "amount1_initial": round(float(entry.get('amount1_initial', 0)), 2),
                "range_upper": round(entry.get('range_upper', 0), 2),
                "zone_top_start_price": entry.get('zone_top_start_price'),
                "zone_close_top_price": entry.get('zone_close_top_price'),
                "zone_close_bottom_price": entry.get('zone_close_bottom_price'),
                "zone_bottom_limit_price": entry.get('zone_bottom_limit_price'),
                "range_lower": round(entry.get('range_lower', 0), 2),
                "static_long": entry.get('static_long', 0.0),
                "timestamp_open": open_ts,
                "timestamp_close": entry.get('timestamp_close'),
            }
            # Keep any field outside the fixed layout instead of dropping it.
            for key, value in entry.items():
                new_entry.setdefault(key, value)

            data[i] = new_entry
            updated = True
            break

        if updated:
            with open(STATUS_FILE, 'w') as f:
                json.dump(data, f, indent=2)
            logging.info(f"Updated JSON with Formatted Zone Prices for Position {token_id}")
    except Exception as e:
        logging.error(f"Error updating JSON zones: {e}")
def round_to_sig_figs(x, sig_figs=5):
    """Round ``x`` to ``sig_figs`` significant figures; zero yields 0.0."""
    if x == 0:
        return 0.0
    # Power of ten of the leading digit decides how many decimals to keep.
    magnitude = int(math.floor(math.log10(abs(x))))
    decimals = sig_figs - magnitude - 1
    return round(x, decimals)
def round_to_sz_decimals(amount, sz_decimals=4):
    """Return the absolute value of ``amount`` rounded to ``sz_decimals`` places."""
    magnitude = abs(amount)
    return round(magnitude, sz_decimals)
class HyperliquidStrategy:
    """Computes the target short size that offsets a range-bound liquidity position.

    A liquidity constant ``L`` is derived once at construction from the
    initial token amounts (or, failing that, from ``target_value``) and is
    then used to estimate the position's exposure at any price.
    """

    def __init__(self, entry_amount0, entry_amount1, target_value, entry_price, low_range, high_range, start_price, static_long=0.0):
        """Store the position parameters and derive the liquidity constant ``L``.

        Args:
            entry_amount0: Initial amount of token0 (WETH per the comments
                below). May be an int, float, numeric string or None.
            entry_amount1: Initial amount of token1 (USDC per the comments
                below). Same accepted types as ``entry_amount0``.
            target_value: Position value, used only by the fallback heuristic.
            entry_price: Price at which the position was opened.
            low_range: Lower price bound of the range.
            high_range: Upper price bound of the range.
            start_price: Market price when the strategy starts.
            static_long: Extra size added to the target short.

        Exits the process with status 1 if the liquidity calculation raises
        (for example on a non-numeric amount or a non-positive range bound).
        """
        self.entry_amount0 = entry_amount0
        self.entry_amount1 = entry_amount1
        self.target_value = target_value
        self.entry_price = entry_price
        self.low_range = low_range
        self.high_range = high_range
        self.static_long = static_long
        self.start_price = start_price
        self.gap = max(0.0, entry_price - start_price)
        self.recovery_target = entry_price + (2 * self.gap)
        self.current_mode = "NORMAL"
        self.last_switch_time = 0
        logging.info(f"Strategy Init. Start Px: {start_price:.2f} | Gap: {self.gap:.2f} | Recovery Tgt: {self.recovery_target:.2f}")
        try:
            # The status file may hold amounts as strings; normalise before
            # comparing them with numbers.
            amount0 = self._to_number(entry_amount0)
            amount1 = self._to_number(entry_amount1)

            sqrt_P = math.sqrt(entry_price)
            sqrt_Pa = math.sqrt(low_range)
            sqrt_Pb = math.sqrt(high_range)
            self.L = 0.0

            # Method 1: Use Amount0 (WETH)
            if amount0 > 0:
                # If amount is huge (Wei), scale it. If small (ETH), use as is.
                amount0_eth = amount0 / 10**18 if amount0 > 1000 else amount0
                denom0 = (1/sqrt_P) - (1/sqrt_Pb)
                if denom0 > 0.00000001:
                    self.L = amount0_eth / denom0
                    logging.info(f"Calculated L from Amount0: {self.L:.4f}")

            # Method 2: Use Amount1 (USDC)
            if self.L == 0.0 and amount1 > 0:
                # Large values are treated as 6-decimal base units.
                amount1_usdc = amount1 / 10**6 if amount1 > 100000 else amount1
                denom1 = sqrt_P - sqrt_Pa
                if denom1 > 0.00000001:
                    self.L = amount1_usdc / denom1
                    logging.info(f"Calculated L from Amount1: {self.L:.4f}")

            # Method 3: Fallback Heuristic
            if self.L == 0.0:
                logging.warning("Amounts missing or 0. Using Target Value Heuristic.")
                max_eth_heuristic = target_value / low_range
                denom_h = (1/sqrt_Pa) - (1/sqrt_Pb)
                if denom_h > 0:
                    self.L = max_eth_heuristic / denom_h
                    logging.info(f"Calculated L from Target Value: {self.L:.4f}")
                else:
                    logging.error("Critical: Denominator 0 in Heuristic. Invalid Range?")
                    self.L = 0.0
        except Exception as e:
            logging.error(f"Error calculating liquidity: {e}")
            sys.exit(1)

    @staticmethod
    def _to_number(value):
        """Return ``value`` as a number: None becomes 0.0, strings are parsed as float."""
        if value is None:
            return 0.0
        if isinstance(value, (int, float)):
            return value
        return float(value)

    def get_pool_delta(self, current_price):
        """Return the position's token0 exposure at ``current_price``.

        The result is 0.0 at or above ``high_range``, the maximum
        ``L * (1/sqrt(low) - 1/sqrt(high))`` at or below ``low_range``, and
        ``L * (1/sqrt(price) - 1/sqrt(high))`` in between.
        """
        if current_price >= self.high_range:
            return 0.0
        sqrt_Pb = math.sqrt(self.high_range)
        if current_price <= self.low_range:
            sqrt_Pa = math.sqrt(self.low_range)
            return self.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
        sqrt_P = math.sqrt(current_price)
        return self.L * ((1/sqrt_P) - (1/sqrt_Pb))

    def calculate_rebalance(self, current_price, current_short_position_size):
        """Compare the target short with the current position.

        Args:
            current_price: Price used to evaluate the exposure.
            current_short_position_size: Current position size; only its
                absolute value is used.

        Returns:
            A dict with ``current_price``, ``pool_delta``, ``target_short``,
            ``current_short``, ``diff`` (target minus current), ``action``
            ("SELL" when ``diff`` is positive, otherwise "BUY") and ``mode``
            (always "NORMAL").
        """
        pool_delta = self.get_pool_delta(current_price)
        target_short_size = pool_delta + self.static_long
        current_short = abs(current_short_position_size)
        diff = target_short_size - current_short
        return {
            "current_price": current_price,
            "pool_delta": pool_delta,
            "target_short": target_short_size,
            "current_short": current_short,
            "diff": diff,
            "action": "SELL" if diff > 0 else "BUY",
            "mode": "NORMAL"
        }
class ScalperHedger:
    """Monitor loop that hedges the active status-file position with Hyperliquid orders."""

    def __init__(self):
        """Read credentials from the environment, build the API clients and set leverage.

        Exits the process with status 1 when SCALPER_AGENT_PK is not set.
        A failure to update leverage is logged and does not stop start-up.
        """
        self.private_key = os.environ.get("SCALPER_AGENT_PK")
        self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
        if not self.private_key:
            logging.error("No SCALPER_AGENT_PK found in .env")
            sys.exit(1)
        self.account = Account.from_key(self.private_key)
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
        try:
            logging.info(f"Setting leverage to {LEVERAGE}x (Cross)...")
            self.exchange.update_leverage(LEVERAGE, COIN_SYMBOL, is_cross=True)
        except Exception as e:
            logging.error(f"Failed to update leverage: {e}")
        self.strategy = None
        self.sz_decimals = self._get_sz_decimals(COIN_SYMBOL)
        self.active_position_id = None
        self.active_order = None
        logging.info(f"Scalper Hedger initialized. Agent: {self.account.address}")

    def _init_strategy(self, position_data):
        """Build ``self.strategy`` from a status-file entry.

        Leaves ``self.strategy`` untouched when no market price is available
        yet, and sets it to None when construction fails.
        """
        try:
            entry_amount0 = position_data.get('amount0_initial', 0)
            entry_amount1 = position_data.get('amount1_initial', 0)
            target_value = position_data.get('target_value', 50.0)
            entry_price = position_data['entry_price']
            lower = position_data['range_lower']
            upper = position_data['range_upper']
            static_long = position_data.get('static_long', 0.0)
            start_price = self.get_market_price(COIN_SYMBOL)
            if start_price is None:
                logging.warning("Waiting for initial price to start strategy...")
                return
            self.strategy = HyperliquidStrategy(
                entry_amount0=entry_amount0,
                entry_amount1=entry_amount1,
                target_value=target_value,
                entry_price=entry_price,
                low_range=lower,
                high_range=upper,
                start_price=start_price,
                static_long=static_long
            )
            logging.info(f"Strategy Initialized for Position {position_data['token_id']}.")
            self.active_position_id = position_data['token_id']
        except Exception as e:
            logging.error(f"Failed to init strategy: {e}")
            self.strategy = None

    def _get_sz_decimals(self, coin):
        """Return the ``szDecimals`` of ``coin`` from exchange metadata, or 4 as a fallback."""
        try:
            meta = self.info.meta()
            for asset in meta["universe"]:
                if asset["name"] == coin:
                    return asset["szDecimals"]
            return 4
        except Exception:
            # Catch Exception, not everything, so Ctrl-C still reaches the caller.
            return 4

    def get_market_price(self, coin):
        """Return the mid price of ``coin`` from ``all_mids``, or None when unavailable."""
        try:
            mids = self.info.all_mids()
            if coin in mids:
                return float(mids[coin])
        except Exception:
            pass
        return None

    def get_order_book_mid(self, coin):
        """Return the mean of best bid and best ask, falling back to ``get_market_price``.

        May return None when neither source yields a price.
        """
        try:
            l2_snapshot = self.info.l2_snapshot(coin)
            if l2_snapshot and 'levels' in l2_snapshot:
                bids = l2_snapshot['levels'][0]
                asks = l2_snapshot['levels'][1]
                if bids and asks:
                    best_bid = float(bids[0]['px'])
                    best_ask = float(asks[0]['px'])
                    return (best_bid + best_ask) / 2
            return self.get_market_price(coin)
        except Exception:
            return self.get_market_price(coin)

    def get_funding_rate(self, coin):
        """Return the ``funding`` value reported for ``coin``, or 0.0 when unavailable."""
        try:
            meta, asset_ctxs = self.info.meta_and_asset_ctxs()
            for i, asset in enumerate(meta["universe"]):
                if asset["name"] == coin:
                    return float(asset_ctxs[i]["funding"])
            return 0.0
        except Exception:
            return 0.0

    def _fetch_position_size(self, coin):
        """Return the signed position size of ``coin``; None when the lookup fails.

        A successful lookup that finds no position for ``coin`` returns 0.0,
        which lets callers tell "flat" apart from "could not ask".
        """
        try:
            user_state = self.info.user_state(self.vault_address or self.account.address)
            for pos in user_state["assetPositions"]:
                if pos["position"]["coin"] == coin:
                    return float(pos["position"]["szi"])
            return 0.0
        except Exception as e:
            logging.warning(f"Could not fetch position for {coin}: {e}")
            return None

    def get_current_position(self, coin):
        """Return the signed position size of ``coin``, or 0.0 when none exists or the lookup fails."""
        size = self._fetch_position_size(coin)
        return 0.0 if size is None else size

    def get_open_orders(self):
        """Return the account's open orders, or an empty list when the lookup fails."""
        try:
            return self.info.open_orders(self.vault_address or self.account.address)
        except Exception:
            return []

    def cancel_order(self, coin, oid):
        """Cancel order ``oid``; returns the exchange response, or None after a logged error."""
        logging.info(f"Cancelling order {oid}...")
        try:
            return self.exchange.cancel(coin, oid)
        except Exception as e:
            logging.error(f"Error cancelling order: {e}")

    def place_limit_order(self, coin, is_buy, size, price):
        """Submit a Gtc limit order and return its order id, or None on any failure.

        Buy orders are sent reduce-only; sell orders are not.
        """
        logging.info(f"🕒 PLACING LIMIT: {coin} {'BUY' if is_buy else 'SELL'} {size} @ {price:.2f}")
        # NOTE(review): presumably buys only ever shrink the short hedge — confirm.
        reduce_only = is_buy
        try:
            # Gtc order (Maker)
            limit_px = round_to_sig_figs(price, 5)
            order_result = self.exchange.order(coin, is_buy, size, limit_px, {"limit": {"tif": "Gtc"}}, reduce_only=reduce_only)
            if order_result["status"] != "ok":
                logging.error(f"Order Failed: {order_result}")
                return None

            response_data = order_result["response"]["data"]
            if "statuses" not in response_data:
                return None

            status_obj = response_data["statuses"][0]
            if "error" in status_obj:
                logging.error(f"Order API Error: {status_obj['error']}")
                return None

            # Parse OID from nested structure
            oid = None
            if "resting" in status_obj:
                oid = status_obj["resting"]["oid"]
            elif "filled" in status_obj:
                oid = status_obj["filled"]["oid"]
                logging.info("Order filled immediately.")

            if oid:
                logging.info(f"✅ Limit Order Placed: OID {oid}")
                return oid
            logging.warning(f"Order placed but OID not found in: {status_obj}")
            return None
        except Exception as e:
            logging.error(f"Exception during trade: {e}")
            return None

    def manage_orders(self):
        """
        Checks open orders.
        Returns: True if an order exists and is valid (don't trade), False if no order (can trade).

        More than one open order cancels all of them. A single order is
        cancelled when the mid price has moved more than PRICE_BUFFER_PCT away
        from its limit price. If no mid price is available the order is kept
        and True is returned, so nothing is traded without a price.
        """
        open_orders = self.get_open_orders()
        my_orders = [o for o in open_orders if o['coin'] == COIN_SYMBOL]
        if not my_orders:
            self.active_order = None
            return False

        if len(my_orders) > 1:
            logging.warning("Multiple open orders found. Cancelling all for safety.")
            for o in my_orders:
                self.cancel_order(COIN_SYMBOL, o['oid'])
            self.active_order = None
            return False

        order = my_orders[0]
        oid = order['oid']
        order_price = float(order['limitPx'])
        current_mid = self.get_order_book_mid(COIN_SYMBOL)
        if current_mid is None:
            # The price feed can fail; without a price the drift cannot be judged.
            logging.warning(f"No market price available. Keeping pending order {oid} for now.")
            return True

        pct_diff = abs(current_mid - order_price) / order_price
        if pct_diff > PRICE_BUFFER_PCT:
            logging.info(f"Price moved {pct_diff*100:.3f}% > {PRICE_BUFFER_PCT*100}%. Cancelling/Replacing order {oid}.")
            self.cancel_order(COIN_SYMBOL, oid)
            self.active_order = None
            return False

        logging.info(f"Pending Order {oid} @ {order_price:.2f} is within range ({pct_diff*100:.3f}%). Waiting.")
        return True

    def close_all_positions(self):
        """Cancel open orders for COIN_SYMBOL and flatten the position with an Ioc order.

        The closing order is priced 5% through the market and sent
        reduce-only. Nothing is sent when the position is flat, its size is
        unknown, or no market price is available. Errors are logged.
        """
        logging.info("Closing all positions (Market Order)...")
        try:
            # Cancel open orders first
            for o in self.get_open_orders():
                if o['coin'] == COIN_SYMBOL:
                    self.cancel_order(COIN_SYMBOL, o['oid'])

            price = self.get_market_price(COIN_SYMBOL)
            current_pos = self._fetch_position_size(COIN_SYMBOL)
            if current_pos is None:
                # A failed lookup must not be mistaken for "already flat".
                logging.error("Error closing: position size unavailable.")
                return
            if current_pos == 0:
                return

            is_buy = current_pos < 0
            final_size = round_to_sz_decimals(abs(current_pos), self.sz_decimals)
            if final_size == 0:
                return
            if price is None:
                logging.error("Error closing: no market price available.")
                return

            # Market order for closing
            self.exchange.order(COIN_SYMBOL, is_buy, final_size, round_to_sig_figs(price * (1.05 if is_buy else 0.95), 5), {"limit": {"tif": "Ioc"}}, reduce_only=True)
            self.active_position_id = None
        except Exception as e:
            logging.error(f"Error closing: {e}")

    def run(self):
        """Run the monitor loop until interrupted.

        Each cycle reads the active position from the status file,
        initialises the strategy when needed, manages the pending order, then
        either closes the hedge (close zone), rebalances it (hedge zones) or
        idles (middle zone). KeyboardInterrupt closes all positions and stops
        the loop; any other exception is logged and followed by a 10s pause.
        """
        logging.info(f"Starting Scalper Monitor Loop. Interval: {CHECK_INTERVAL}s")
        while True:
            try:
                active_pos = get_active_automatic_position()

                # Check Global Enable Switch
                if not active_pos or not active_pos.get('hedge_enabled', True):
                    if self.strategy is not None:
                        logging.info("Hedge Disabled or Position Closed. Closing remaining positions.")
                        self.close_all_positions()
                        self.strategy = None
                    time.sleep(CHECK_INTERVAL)
                    continue

                if self.strategy is None or self.active_position_id != active_pos['token_id']:
                    logging.info(f"New position {active_pos['token_id']} detected or strategy not initialized. Initializing strategy.")
                    self._init_strategy(active_pos)
                    if self.strategy is None:
                        time.sleep(CHECK_INTERVAL)
                        continue

                # --- ORDER MANAGEMENT ---
                if self.manage_orders():
                    time.sleep(CHECK_INTERVAL)
                    continue

                # 2. Market Data
                price = self.get_order_book_mid(COIN_SYMBOL)
                if price is None:
                    time.sleep(5)
                    continue
                current_pos_size = self._fetch_position_size(COIN_SYMBOL)
                if current_pos_size is None:
                    # Treating an unknown position as flat could place a
                    # full-size duplicate hedge, so skip this cycle instead.
                    logging.warning("Position size unavailable. Skipping this cycle.")
                    time.sleep(CHECK_INTERVAL)
                    continue

                # 3. Calculate Logic
                calc = self.strategy.calculate_rebalance(price, current_pos_size)
                diff_abs = abs(calc['diff'])

                # 4. Dynamic Threshold Calculation
                sqrt_Pa = math.sqrt(self.strategy.low_range)
                sqrt_Pb = math.sqrt(self.strategy.high_range)
                max_potential_eth = self.strategy.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
                # Use MIN_THRESHOLD_ETH from config
                rebalance_threshold = max(MIN_THRESHOLD_ETH, max_potential_eth * 0.05)

                # 5. Determine Hedge Zone
                clp_low_range = self.strategy.low_range
                clp_high_range = self.strategy.high_range
                range_width = clp_high_range - clp_low_range

                # Calculate Prices for Zones
                zone_bottom_limit_price = clp_low_range + (range_width * ZONE_BOTTOM_HEDGE_LIMIT)
                zone_close_bottom_price = clp_low_range + (range_width * ZONE_CLOSE_START)
                zone_close_top_price = clp_low_range + (range_width * ZONE_CLOSE_END)
                zone_top_start_price = clp_low_range + (range_width * ZONE_TOP_HEDGE_START)

                # Update JSON with zone prices if missing
                if 'zone_bottom_limit_price' not in active_pos:
                    update_position_zones_in_json(active_pos['token_id'], {
                        'zone_top_start_price': round(zone_top_start_price, 2),
                        'zone_close_top_price': round(zone_close_top_price, 2),
                        'zone_close_bottom_price': round(zone_close_bottom_price, 2),
                        'zone_bottom_limit_price': round(zone_bottom_limit_price, 2)
                    })

                # Check Zones
                in_close_zone = (price >= zone_close_bottom_price and price <= zone_close_top_price)
                in_hedge_zone = (price <= zone_bottom_limit_price) or (price >= zone_top_start_price)

                # --- Execute Logic ---
                if in_close_zone:
                    logging.info(f"ZONE: CLOSE ({price:.2f} in {zone_close_bottom_price:.2f}-{zone_close_top_price:.2f}). Closing all hedge positions.")
                    self.close_all_positions()
                    time.sleep(CHECK_INTERVAL)
                    continue
                elif in_hedge_zone:
                    # HEDGE NORMALLY
                    if diff_abs > rebalance_threshold:
                        trade_size = round_to_sz_decimals(diff_abs, self.sz_decimals)

                        # --- SOFT START LOGIC (Bottom Zone Only) ---
                        # If in Bottom Zone, opening a NEW Short (SELL), and current position is 0 -> Cut size by 50%
                        if (price <= zone_bottom_limit_price) and (current_pos_size == 0) and (calc['action'] == "SELL"):
                            logging.info(f"🔰 SOFT START: Reducing initial hedge size by 50% in Bottom Zone.")
                            trade_size = round_to_sz_decimals(trade_size * 0.5, self.sz_decimals)

                        min_trade_size = MIN_ORDER_VALUE_USD / price
                        if trade_size < min_trade_size:
                            logging.info(f"Idle. Trade size {trade_size} < Min Order Size {min_trade_size:.4f} (${MIN_ORDER_VALUE_USD:.2f})")
                        elif trade_size > 0:
                            logging.info(f"⚡ THRESHOLD TRIGGERED ({diff_abs:.4f} >= {rebalance_threshold:.4f}). In Hedge Zone.")
                            is_buy = (calc['action'] == "BUY")
                            self.place_limit_order(COIN_SYMBOL, is_buy, trade_size, price)
                        else:
                            logging.info("Trade size rounds to 0. Skipping.")
                    else:
                        logging.info(f"Idle. Diff {diff_abs:.4f} < Threshold {rebalance_threshold:.4f}. In Hedge Zone.")
                else:
                    # MIDDLE ZONE (IDLE)
                    pct_position = (price - clp_low_range) / range_width
                    logging.info(f"Idle. In Middle Zone ({pct_position*100:.1f}%). No Actions.")

                time.sleep(CHECK_INTERVAL)
            except KeyboardInterrupt:
                logging.info("Stopping Hedger...")
                self.close_all_positions()
                break
            except Exception as e:
                logging.error(f"Loop Error: {e}", exc_info=True)
                time.sleep(10)
if __name__ == "__main__":
    # Script entry point: build the hedger and hand control to its monitor loop.
    ScalperHedger().run()