2 Commits

Author SHA1 Message Date
84242f3654 CLP auto hedge 2025-12-12 23:49:50 +01:00
aeaae84750 remove market_data.db-shm from tracking 2025-11-11 10:56:47 +01:00
11 changed files with 1676 additions and 148 deletions

147
AGENTS.md
View File

@ -1,147 +0,0 @@
# Agents Documentation
This file tracks the usage, improvements, and evolution of OpenCode agents in this project.
## Agent Overview
### Available Agents
| Agent | Mode | Purpose | Last Updated | Usage Count |
|--------|------|---------|--------------|-------------|
| sessionsummary | subagent | Generates structured session summaries for GEMINI.md | 2025-11-11 | 3+ |
| build | primary | Full development work with all tools | - | - |
| plan | primary | Planning and analysis without making changes | - | - |
| general | subagent | Research and multi-step tasks | - | - |
| cleanup | subagent | Repository cleanup and organization | - | - |
| docs-writer | subagent | Technical writing and documentation | - | - |
| review | subagent | Code review and quality assessment | - | - |
| security | subagent | Security auditing and vulnerability analysis | - | - |
## Session History
### 2025-11-10 (Initial Session)
**Agents Used**: sessionsummary (manual implementation)
**Session Summary**:
- Fixed urllib3 SSL compatibility warning by downgrading from 2.5.0 to 1.26.20
- Created initial sessionsummary agent (incorrect Python implementation)
- User corrected approach to use OpenCode.ai agent guidelines
- Created proper sessionsummary agent in `.opencode/agent/` following OpenCode.ai specifications
**Agent Improvements**:
- Learned to follow OpenCode.ai agent guidelines instead of custom implementations
- Established proper agent configuration with YAML frontmatter and permissions
---
### 2025-11-11 (Dashboard Fix Session)
**Agents Used**: sessionsummary (manual), sessionsummary (subagent)
**Session Summary**:
- Started new Gemini session
- User requested file organization with .temp folder
- Created .temp folder and updated .gitignore
- Moved example files to .temp folder
- Fixed critical DashboardDataFetcher path resolution error
- Added session summaries to GEMINI.md
**Key Technical Fix**:
- **Issue**: `DashboardDataFetcher - ERROR - Failed to fetch or save account status: [Errno 2] No such file or directory`
- **Root Cause**: Path resolution issue when running as subprocess from main_app.py
- **Solution**: Used absolute paths with `os.path.dirname(os.path.abspath(__file__))`
- **Result**: DashboardDataFetcher now works correctly
**Agent Improvements**:
- Enhanced sessionsummary agent usage for better documentation
- Improved file organization practices
- Established better debugging workflow
---
## Agent Configuration Details
### sessionsummary
**File**: `.opencode/agent/sessionsummary.md`
**Configuration**:
```yaml
---
description: Analyzes development sessions and generates structured summary reports for GEMINI.md
mode: subagent
model: anthropic/claude-sonnet-4-20250514
temperature: 0.1
tools:
write: true
edit: true
bash: false
permission:
bash: "deny"
webfetch: "deny"
---
```
**Purpose**: Analyzes development sessions and generates structured summary reports for GEMINI.md
**Key Features**:
- Follows exact session summary format as specified
- Integrates with GEMINI.md automatically
- Provides structured analysis of session objectives, accomplishments, decisions, and next steps
- Uses proper OpenCode.ai agent configuration with permissions
**Usage**: `@sessionsummary please analyze our current session and add summary to GEMINI.md`
---
## Agent Improvement Ideas
### Potential Enhancements
1. **Automated Session Detection**
- Automatically detect when sessions start/end
- Prompt for session summary creation
- Track session duration and productivity metrics
2. **Enhanced sessionsummary Agent**
- Add code analysis capabilities
- Track git commits during session
- Generate metrics on lines of code added/removed
3. **Cross-Session Analytics**
- Track most frequently used agents
- Identify common patterns in development work
- Generate productivity reports
4. **Integration with Project Tools**
- Auto-detect files modified during session
- Link to specific commits/PRs
- Integrate with issue tracking
### Agent Usage Statistics
**Total Sessions Documented**: 2
**Most Used Agent**: sessionsummary (100%)
**Average Session Length**: 2-3 hours
**Common Themes**: Bug fixes, file organization, documentation
---
## Maintenance
### Updating This File
This AGENTS.md file should be updated:
- At the end of each session where agents are used
- When new agents are created or modified
- When agent configurations are changed
- When significant agent improvements are implemented
### Agent File Locations
- **Agent Definitions**: `.opencode/agent/`
- **Agent Usage Logs**: This file (AGENTS.md)
- **Session Summaries**: `GEMINI.md`
---
*Last Updated: 2025-11-11*
*Next Review: After next agent usage session*

Binary file not shown.

6
clp_hedger.log Normal file
View File

@ -0,0 +1,6 @@
2025-12-11 14:29:08,607 - INFO - Strategy Initialized. Liquidity (L): 1236.4542
2025-12-11 14:29:09,125 - INFO - CLP Hedger initialized. Agent: 0xcB262CeAaE5D8A99b713f87a43Dd18E6Be892739. Coin: ETH (Decimals: 4)
2025-12-11 14:29:09,126 - INFO - Starting Hedge Monitor Loop. Interval: 30s
2025-12-11 14:29:09,126 - INFO - Hedging Range: 2844.11 - 3477.24 | Static Long: 0.4
2025-12-11 14:29:09,769 - INFO - Price: 3201.85 | Pool Delta: 0.883 | Tgt Short: 1.283 | Act Short: 0.000 | Diff: 1.283
2025-12-11 14:29:11,987 - ERROR - Order API Error: Order has invalid price.

86
clp_hedger/GEMINI.md Normal file
View File

@ -0,0 +1,86 @@
# Session Summary
**Date:** 2025-12-11
**Objective(s):**
Fix API errors, enhance bot functionality with safety features (auto-close), and add leverage/funding monitoring.
**Key Accomplishments:**
* **Fixed API Price Error:** Implemented `round_to_sig_figs` to ensure limit prices meet Hyperliquid's 5 significant figure requirement, resolving the "Order has invalid price" error.
* **Safety Shutdown:** Added `close_all_positions` method and linked it to `KeyboardInterrupt`. The bot now automatically closes its hedge position when stopped manually.
* **Leverage Management:** Configured the bot to automatically set leverage to **4x Cross** (`LEVERAGE = 4`) upon initialization.
* **Market Monitoring:** Added real-time **Funding Rate** display to the main logging loop using `meta_and_asset_ctxs`.
**Key Files Modified:**
* `clp_hedger.py`
**Decisions Made:**
* Used `math.log10` based calculation for significant figures to ensure broad compatibility with asset price ranges.
* Implemented `close_all_positions` as a blocking call during shutdown to prioritize safety over an immediate exit.
* Hardcoded `LEVERAGE` in configuration for now, with a plan to potentially move to a config file later if needed.
# Session Summary
**Date:** 2025-12-11
**Objective(s):**
Implement a dynamic gap recovery strategy to neutralize initial losses from delayed hedging.
**Key Accomplishments:**
* Implemented "Gap Recovery" logic to dynamically adjust hedging based on current price relative to CLP `ENTRY_PRICE` and initial `START_PRICE`.
* Defined three distinct hedging zones:
* **NORMAL (below Entry):** 100% hedge for safety.
* **RECOVERY (between Entry and Recovery Target):** 0% hedge (naked long) to maximize recovery.
* **NORMAL (above Recovery Target):** 100% hedge after gap is neutralized.
* Introduced `PRICE_BUFFER_PCT` and `TIME_BUFFER_SECONDS` to prevent trade churn around zone boundaries.
**Key Files Modified:**
* `clp_hedger.py`
**Decisions Made:**
* Chosen a dynamic `START_PRICE` capture at bot initialization to calculate the `GAP`.
* Opted for 0% hedge in the recovery zone for faster loss neutralization, acknowledging higher short-term risk.
* Implemented price and time buffers for robust mode switching.
# Session Summary
**Date:** 2025-12-12
**Objective(s):**
Develop a Uniswap V3 position manager script (formerly monitor) for Arbitrum, including fee collection, closing positions, and automated opening of new positions with auto-swapping. Refine hedging architecture for multi-position management.
**Key Accomplishments:**
* **`uniswap_manager.py` (Unified Lifecycle Manager):**
* Transformed into a continuous lifecycle manager for AUTOMATIC positions.
* **Features:**
* Manages "AUTOMATIC" CLP positions (Open, Monitor, Close, Collect Fees).
* Reads/Writes state to `hedge_status.json`.
* Implemented auto-wrapping of native ETH to WETH when needed.
* Includes robust auto-swapping (WETH <-> USDC) to balance tokens before minting.
* Implemented robust event parsing using `process_receipt` to extract exact `amount0` and `amount1` from mint transactions.
* **Fixed `web3.py` v7 `raw_transaction` access across all transaction types.**
* **Fixed Uniswap V3 Math precision** in `calculate_mint_amounts` for accurate token splits.
* **Troubleshooting & Resolution:**
* **Address Validation:** Replaced hardcoded factory address with dynamic lookup.
* **ABI Mismatch:** Updated NPM ABI with event definitions for `IncreaseLiquidity` and `Transfer`.
* **Typo/Indentation Errors:** Resolved multiple `NameError` (`target_tick_lower`, `w3_instance`, `position_details`) and `IndentationError` issues during script refactoring.
* **JSON Update Failure:** Fixed `mint_new_position`'s log parsing for Token ID to correctly update `hedge_status.json` after successful mint.
* **`clp_scalper_hedger.py` (Dedicated Automatic Hedger):**
* Created as a new script to hedge `type: "AUTOMATIC"` positions defined in `hedge_status.json`.
* Uses `SCALPER_AGENT_PK` from `.env`.
* **Accurate L Calculation:** Calculates Uniswap V3 liquidity (`L`) using `amount0_initial` or `amount1_initial` from `hedge_status.json`, falling back to a heuristic based on `target_value` if amounts are missing.
* **Dynamic Rebalance Threshold:** Threshold adapts to 5% of the position's maximum ETH risk (`max_potential_eth`).
* **Minimum Order Value:** Enforces a minimum order size of $10 to prevent dust trades and API errors.
* **`clp_hedger.py` (Updated Manual Hedger):**
* Modified to load its configuration entirely from the `type: "MANUAL"` entry in `hedge_status.json`.
* Respects the `hedge_enabled` flag from the JSON.
* Idles if hedging is disabled or no manual position is found.
* **`hedge_status.json`:**
* Becomes the central source of truth for all (MANUAL and AUTOMATIC) CLP positions, including their type, status, ranges, `entry_price`, `target_value` (for automatic), and `hedge_enabled` flag.
* **.env File Location:** All scripts updated to load `.env` from the current working directory (`clp_hedger/`).
**Decisions Made:**
* Adopted a multi-script architecture for clarity and separation of concerns (Manager vs. Hedgers).
* Used `hedge_status.json` as the centralized state manager for all CLP positions.
* Implemented robust error handling and debugging throughout the development process.
* Ensured `clp_scalper_hedger.py` is resilient to missing initial amount data in `hedge_status.json` by implementing fallback `L` calculation methods.

0
clp_hedger/__init__.py Normal file
View File

469
clp_hedger/clp_hedger.py Normal file
View File

@ -0,0 +1,469 @@
import os
import time
import logging
import sys
import math
import json
from dotenv import load_dotenv
# --- FIX: Add project root to sys.path to import local modules ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
# Now we can import from root
from logging_utils import setup_logging
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Load environment variables from .env in current directory
dotenv_path = os.path.join(current_dir, '.env')
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path)
else:
# Fallback to default search
load_dotenv()
# Setup Logging using project convention
setup_logging("normal", "CLP_HEDGER")
# --- CONFIGURATION DEFAULTS (Can be overridden by JSON) ---
REBALANCE_THRESHOLD = 0.15 # ETH
CHECK_INTERVAL = 30 # Seconds
LEVERAGE = 5
STATUS_FILE = "hedge_status.json"
# Gap Recovery Configuration
PRICE_BUFFER_PCT = 0.004 # 0.5% buffer to prevent churn
TIME_BUFFER_SECONDS = 120 # 2 minutes wait between mode switches
def get_manual_position_config():
"""Reads hedge_status.json and returns the first OPEN MANUAL position dict, or None."""
if not os.path.exists(STATUS_FILE):
return None
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
for entry in data:
if entry.get('type') == 'MANUAL' and entry.get('status') == 'OPEN':
return entry
except Exception as e:
logging.error(f"ERROR reading status file: {e}")
return None
class HyperliquidStrategy:
def __init__(self, entry_weth, entry_price, low_range, high_range, start_price, static_long=0.4):
# Your Pool Configuration
self.entry_weth = entry_weth
self.entry_price = entry_price
self.low_range = low_range
self.high_range = high_range
self.static_long = static_long
# Gap Recovery State
self.start_price = start_price
# GAP = max(0, ENTRY - START). If Start > Entry (we are winning), Gap is 0.
self.gap = max(0.0, entry_price - start_price)
self.recovery_target = entry_price + (2 * self.gap)
self.current_mode = "NORMAL" # "NORMAL" (100% Hedge) or "RECOVERY" (0% Hedge)
self.last_switch_time = 0
logging.info(f"Strategy Init. Start Px: {start_price:.2f} | Gap: {self.gap:.2f} | Recovery Tgt: {self.recovery_target:.2f}")
# Calculate Constant Liquidity (L) once
# Formula: L = x / (1/sqrt(P) - 1/sqrt(Pb))
try:
sqrt_P = math.sqrt(entry_price)
sqrt_Pb = math.sqrt(high_range)
self.L = entry_weth / ((1/sqrt_P) - (1/sqrt_Pb))
logging.info(f"Liquidity (L): {self.L:.4f}")
except Exception as e:
logging.error(f"Error calculating liquidity: {e}")
sys.exit(1)
def get_pool_delta(self, current_price):
"""Calculates how much ETH the pool currently holds (The Risk)"""
# If price is above range, you hold 0 ETH (100% USDC)
if current_price >= self.high_range:
return 0.0
# If price is below range, you hold Max ETH
if current_price <= self.low_range:
sqrt_Pa = math.sqrt(self.low_range)
sqrt_Pb = math.sqrt(self.high_range)
return self.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
# If in range, calculate active ETH
sqrt_P = math.sqrt(current_price)
sqrt_Pb = math.sqrt(self.high_range)
return self.L * ((1/sqrt_P) - (1/sqrt_Pb))
def calculate_rebalance(self, current_price, current_short_position_size):
"""
Determines if we need to trade and the exact order size.
"""
# 1. Base Target (Full Hedge)
pool_delta = self.get_pool_delta(current_price)
raw_target_short = pool_delta + self.static_long
# 2. Determine Mode (Normal vs Recovery)
# Buffers
entry_upper = self.entry_price * (1 + PRICE_BUFFER_PCT)
entry_lower = self.entry_price * (1 - PRICE_BUFFER_PCT)
desired_mode = self.current_mode # Default to staying same
if self.current_mode == "NORMAL":
# Switch to RECOVERY if:
# Price > Entry + Buffer AND Price < Recovery Target
if current_price > entry_upper and current_price < self.recovery_target:
desired_mode = "RECOVERY"
elif self.current_mode == "RECOVERY":
# Switch back to NORMAL if:
# Price < Entry - Buffer (Fell back down) OR Price > Recovery Target (Finished)
if current_price < entry_lower or current_price >= self.recovery_target:
desired_mode = "NORMAL"
# 3. Apply Time Buffer
now = time.time()
if desired_mode != self.current_mode:
if (now - self.last_switch_time) >= TIME_BUFFER_SECONDS:
logging.info(f"🔄 MODE SWITCH: {self.current_mode} -> {desired_mode} (Px: {current_price:.2f})")
self.current_mode = desired_mode
self.last_switch_time = now
else:
logging.info(f"⏳ Mode Switch Delayed (Time Buffer). Pending: {desired_mode}")
# 4. Set Final Target based on Mode
if self.current_mode == "RECOVERY":
target_short_size = 0.0
logging.info(f"🩹 RECOVERY MODE ACTIVE (0% Hedge). Target: {self.recovery_target:.2f}")
else:
target_short_size = raw_target_short
# 5. Calculate Difference
diff = target_short_size - abs(current_short_position_size)
return {
"current_price": current_price,
"pool_delta": pool_delta,
"target_short": target_short_size,
"raw_target": raw_target_short,
"current_short": abs(current_short_position_size),
"diff": diff, # Positive = SELL more (Add Short), Negative = BUY (Reduce Short)
"action": "SELL" if diff > 0 else "BUY",
"mode": self.current_mode
}
def round_to_sz_decimals(amount, sz_decimals=4):
"""
Hyperliquid requires specific rounding 'szDecimals'.
For ETH, this is usually 4 (e.g., 1.2345).
"""
factor = 10 ** sz_decimals
# Use floor to avoid rounding up into money you don't have,
# but strictly simply rounding is often sufficient for small adjustments.
# Using round() standard here.
return round(abs(amount), sz_decimals)
def round_to_sig_figs(x, sig_figs=5):
"""
Rounds a number to a specified number of significant figures.
Hyperliquid prices generally require 5 significant figures.
"""
if x == 0:
return 0.0
return round(x, sig_figs - int(math.floor(math.log10(abs(x)))) - 1)
class CLPHedger:
def __init__(self):
self.private_key = os.environ.get("HEDGER_PRIVATE_KEY") or os.environ.get("AGENT_PRIVATE_KEY")
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.private_key:
logging.error("No private key found (HEDGER_PRIVATE_KEY or AGENT_PRIVATE_KEY) in .env")
sys.exit(1)
if not self.vault_address:
logging.warning("MAIN_WALLET_ADDRESS not found in .env. Assuming Agent is the Vault (not strictly recommended for CLPs).")
self.account = Account.from_key(self.private_key)
# API Connection
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
# Note: If this agent is trading on behalf of a Vault (Main Account),
# the exchange object needs the vault's address as `account_address`.
self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
# Load Manual Config from JSON
self.manual_config = get_manual_position_config()
self.coin_symbol = "ETH" # Default, but will try to read from JSON
self.sz_decimals = 4
self.strategy = None
if self.manual_config:
self.coin_symbol = self.manual_config.get('coin_symbol', 'ETH')
if self.manual_config.get('hedge_enabled', False):
self._init_strategy()
else:
logging.warning("MANUAL position found but 'hedge_enabled' is FALSE. Hedger will remain idle.")
else:
logging.warning("No MANUAL position found in hedge_status.json. Hedger will remain idle.")
# Set Leverage on Initialization (if coin symbol known)
try:
logging.info(f"Setting leverage to {LEVERAGE}x (Cross) for {self.coin_symbol}...")
self.exchange.update_leverage(LEVERAGE, self.coin_symbol, is_cross=True)
except Exception as e:
logging.error(f"Failed to update leverage: {e}")
# Fetch meta once to get szDecimals
self.sz_decimals = self._get_sz_decimals(self.coin_symbol)
logging.info(f"CLP Hedger initialized. Agent: {self.account.address}. Coin: {self.coin_symbol} (Decimals: {self.sz_decimals})")
def _init_strategy(self):
try:
entry_p = self.manual_config['entry_price']
lower = self.manual_config['range_lower']
upper = self.manual_config['range_upper']
static_long = self.manual_config.get('static_long', 0.0)
# Require entry_amount0 (or entry_weth)
entry_weth = self.manual_config.get('entry_amount0', 0.45) # Default to 0.45 if missing for now
start_price = self.get_market_price(self.coin_symbol)
if start_price is None:
logging.warning("Waiting for initial price to start strategy...")
# Logic will retry in run loop
return
self.strategy = HyperliquidStrategy(
entry_weth=entry_weth,
entry_price=entry_p,
low_range=lower,
high_range=upper,
start_price=start_price,
static_long=static_long
)
logging.info(f"Strategy Initialized for {self.coin_symbol}.")
except Exception as e:
logging.error(f"Failed to init strategy: {e}")
self.strategy = None
def _get_sz_decimals(self, coin):
try:
meta = self.info.meta()
for asset in meta["universe"]:
if asset["name"] == coin:
return asset["szDecimals"]
logging.warning(f"Could not find szDecimals for {coin}, defaulting to 4.")
return 4
except Exception as e:
logging.error(f"Failed to fetch meta: {e}")
return 4
def get_funding_rate(self, coin):
try:
meta, asset_ctxs = self.info.meta_and_asset_ctxs()
for i, asset in enumerate(meta["universe"]):
if asset["name"] == coin:
# Funding rate is in the asset context at same index
return float(asset_ctxs[i]["funding"])
return 0.0
except Exception as e:
logging.error(f"Error fetching funding rate: {e}")
return 0.0
def get_market_price(self, coin):
try:
# Get all mids is efficient
mids = self.info.all_mids()
if coin in mids:
return float(mids[coin])
else:
logging.error(f"Price for {coin} not found in all_mids.")
return None
except Exception as e:
logging.error(f"Error fetching price: {e}")
return None
def get_current_position(self, coin):
try:
# We need the User State of the Vault (or the account we are trading for)
user_state = self.info.user_state(self.vault_address or self.account.address)
for pos in user_state["assetPositions"]:
if pos["position"]["coin"] == coin:
# szi is the size. Positive = Long, Negative = Short.
return float(pos["position"]["szi"])
return 0.0 # No position
except Exception as e:
logging.error(f"Error fetching position: {e}")
return 0.0
def execute_trade(self, coin, is_buy, size, price):
logging.info(f"🚀 EXECUTING: {coin} {'BUY' if is_buy else 'SELL'} {size} @ ~{price}")
# Check for reduceOnly logic
# If we are BUYING to reduce a SHORT, it is reduceOnly.
# If we are SELLING to increase a SHORT, it is NOT reduceOnly.
# Since we are essentially managing a Short hedge:
# Action BUY = Reducing Hedge -> reduceOnly=True
# Action SELL = Increasing Hedge -> reduceOnly=False
reduce_only = is_buy
try:
# Market order (limit with aggressive TIF or just widely crossing limit)
# Hyperliquid SDK 'order' method parameters: coin, is_buy, sz, limit_px, order_type, reduce_only
# We use a limit price slightly better than market to ensure fill or just use market price logic
# Using a simplistic "Market" approach by setting limit far away
slippage = 0.05 # 5% slippage tolerance
raw_limit_px = price * (1.05 if is_buy else 0.95)
limit_px = round_to_sig_figs(raw_limit_px, 5)
order_result = self.exchange.order(
coin,
is_buy,
size,
limit_px,
{"limit": {"tif": "Ioc"}},
reduce_only=reduce_only
)
status = order_result["status"]
if status == "ok":
response_data = order_result["response"]["data"]
if "statuses" in response_data and "error" in response_data["statuses"][0]:
logging.error(f"Order API Error: {response_data['statuses'][0]['error']}")
else:
logging.info(f"✅ Trade Success: {response_data}")
else:
logging.error(f"Order Failed: {order_result}")
except Exception as e:
logging.error(f"Exception during trade execution: {e}")
def close_all_positions(self):
logging.info("Attempting to close all open positions...")
try:
# 1. Get latest price
price = self.get_market_price(COIN_SYMBOL)
if price is None:
logging.error("Could not fetch price to close positions. Aborting close.")
return
# 2. Get current position
current_pos = self.get_current_position(COIN_SYMBOL)
if current_pos == 0:
logging.info("No open positions to close.")
return
# 3. Determine Side and Size
# If Short (-), we need to Buy (+).
# If Long (+), we need to Sell (-).
is_buy = current_pos < 0
abs_size = abs(current_pos)
# Ensure size is rounded correctly for the API
final_size = round_to_sz_decimals(abs_size, self.sz_decimals)
if final_size == 0:
logging.info("Position size effectively 0 after rounding.")
return
logging.info(f"Closing Position: {current_pos} {COIN_SYMBOL} -> Action: {'BUY' if is_buy else 'SELL'} {final_size}")
# 4. Execute
self.execute_trade(COIN_SYMBOL, is_buy, final_size, price)
except Exception as e:
logging.error(f"Error during close_all_positions: {e}")
def run(self):
logging.info(f"Starting Hedge Monitor Loop. Interval: {CHECK_INTERVAL}s")
while True:
try:
# Reload Config periodically
self.manual_config = get_manual_position_config()
# Check Global Enable Switch
if not self.manual_config or not self.manual_config.get('hedge_enabled', False):
# If previously active, close?
# Yes, safety first.
if self.strategy is not None:
logging.info("Hedge Disabled. Closing any remaining positions.")
self.close_all_positions()
self.strategy = None
else:
# Just idle check to keep connection alive or log occasionally
# logging.info("Idle. Hedge Disabled.")
pass
time.sleep(CHECK_INTERVAL)
continue
# If enabled but strategy not init, Init it.
if self.strategy is None:
self._init_strategy()
if self.strategy is None: # Init failed
time.sleep(CHECK_INTERVAL)
continue
# 1. Get Data
price = self.get_market_price(COIN_SYMBOL)
if price is None:
time.sleep(5)
continue
funding_rate = self.get_funding_rate(COIN_SYMBOL)
current_pos_size = self.get_current_position(COIN_SYMBOL)
# 2. Calculate Logic
# Pass raw size (e.g. -1.5). The strategy handles the logic.
calc = self.strategy.calculate_rebalance(price, current_pos_size)
diff_abs = abs(calc['diff'])
trade_size = round_to_sz_decimals(diff_abs, self.sz_decimals)
# Logging Status
status_msg = (
f"Price: {price:.2f} | Fund: {funding_rate:.6f} | "
f"Mode: {calc['mode']} | "
f"Pool Delta: {calc['pool_delta']:.3f} | "
f"Tgt Short: {calc['target_short']:.3f} | "
f"Act Short: {calc['current_short']:.3f} | "
f"Diff: {calc['diff']:.3f}"
)
if calc.get('is_recovering'):
status_msg += f" | 🩹 REC MODE ({calc['raw_target']:.3f} -> {calc['target_short']:.3f})"
logging.info(status_msg)
# 3. Check Threshold
if diff_abs >= REBALANCE_THRESHOLD:
if trade_size > 0:
logging.info(f"⚡ THRESHOLD TRIGGERED ({diff_abs:.3f} >= {REBALANCE_THRESHOLD})")
is_buy = (calc['action'] == "BUY")
self.execute_trade(COIN_SYMBOL, is_buy, trade_size, price)
else:
logging.info("Trade size rounds to 0. Skipping.")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
logging.info("Stopping Hedger...")
self.close_all_positions()
break
except Exception as e:
logging.error(f"Loop Error: {e}", exc_info=True)
time.sleep(10)
if __name__ == "__main__":
hedger = CLPHedger()
hedger.run()

View File

@ -0,0 +1,387 @@
import os
import time
import logging
import sys
import math
import json
from dotenv import load_dotenv
# --- FIX: Add project root to sys.path to import local modules ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
# Now we can import from root
from logging_utils import setup_logging
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Load environment variables from .env in current directory
dotenv_path = os.path.join(current_dir, '.env')
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path)
else:
# Fallback to default search
load_dotenv()
setup_logging("normal", "SCALPER_HEDGER")
# --- CONFIGURATION ---
COIN_SYMBOL = "ETH"
CHECK_INTERVAL = 10 # Faster check for scalper
LEVERAGE = 5
STATUS_FILE = "hedge_status.json"
# Gap Recovery Configuration
PRICE_BUFFER_PCT = 0.002 # 0.25% buffer
TIME_BUFFER_SECONDS = 120 # 2 minutes wait
def get_active_automatic_position():
if not os.path.exists(STATUS_FILE):
return None
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
for entry in data:
if entry.get('type') == 'AUTOMATIC' and entry.get('status') == 'OPEN':
return entry
except Exception as e:
logging.error(f"ERROR reading status file: {e}")
return None
def round_to_sig_figs(x, sig_figs=5):
if x == 0: return 0.0
return round(x, sig_figs - int(math.floor(math.log10(abs(x)))) - 1)
def round_to_sz_decimals(amount, sz_decimals=4):
return round(abs(amount), sz_decimals)
class HyperliquidStrategy:
def __init__(self, entry_amount0, entry_amount1, target_value, entry_price, low_range, high_range, start_price, static_long=0.0):
self.entry_amount0 = entry_amount0
self.entry_amount1 = entry_amount1
self.target_value = target_value
self.entry_price = entry_price
self.low_range = low_range
self.high_range = high_range
self.static_long = static_long
self.start_price = start_price
self.gap = max(0.0, entry_price - start_price)
self.recovery_target = entry_price + (2 * self.gap)
self.current_mode = "NORMAL"
self.last_switch_time = 0
logging.info(f"Strategy Init. Start Px: {start_price:.2f} | Gap: {self.gap:.2f} | Recovery Tgt: {self.recovery_target:.2f}")
try:
sqrt_P = math.sqrt(entry_price)
sqrt_Pa = math.sqrt(low_range)
sqrt_Pb = math.sqrt(high_range)
self.L = 0.0
# Method 1: Use Amount0 (WETH)
# Formula: L = amount0 / (1/sqrtP - 1/sqrtPb)
if entry_amount0 > 0:
amount0_eth = entry_amount0 / 10**18
denom0 = (1/sqrt_P) - (1/sqrt_Pb)
if denom0 > 0.00000001:
self.L = amount0_eth / denom0
logging.info(f"Calculated L from Amount0: {self.L:.4f}")
# Method 2: Use Amount1 (USDC) if Method 1 failed or yielded 0
# Formula: L = amount1 / (sqrtP - sqrtPa)
# Note: Price in formula is Token1/Token0? No, sqrtPrice is sqrt(Token1/Token0).
# Yes. Amount1 = L * (sqrtP - sqrtPa)
if self.L == 0.0 and entry_amount1 > 0:
amount1_usdc = entry_amount1 / 10**6 # USDC is 6 decimals
denom1 = sqrt_P - sqrt_Pa
if denom1 > 0.00000001:
self.L = amount1_usdc / denom1
logging.info(f"Calculated L from Amount1: {self.L:.4f}")
# Method 3: Fallback Heuristic using Target Value
# Max ETH = Value / LowerPrice.
# L = MaxETH / (1/sqrtPa - 1/sqrtPb)
if self.L == 0.0:
logging.warning("Amounts missing or 0. Using Target Value Heuristic.")
max_eth_heuristic = target_value / low_range
denom_h = (1/sqrt_Pa) - (1/sqrt_Pb)
if denom_h > 0:
self.L = max_eth_heuristic / denom_h
logging.info(f"Calculated L from Target Value: {self.L:.4f}")
else:
logging.error("Critical: Denominator 0 in Heuristic. Invalid Range?")
self.L = 0.0
except Exception as e:
logging.error(f"Error calculating liquidity: {e}")
sys.exit(1)
def get_pool_delta(self, current_price):
if current_price >= self.high_range: return 0.0
if current_price <= self.low_range:
sqrt_Pa = math.sqrt(self.low_range)
sqrt_Pb = math.sqrt(self.high_range)
return self.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
sqrt_P = math.sqrt(current_price)
sqrt_Pb = math.sqrt(self.high_range)
return self.L * ((1/sqrt_P) - (1/sqrt_Pb))
def calculate_rebalance(self, current_price, current_short_position_size):
pool_delta = self.get_pool_delta(current_price)
raw_target_short = pool_delta + self.static_long
entry_upper = self.entry_price * (1 + PRICE_BUFFER_PCT)
entry_lower = self.entry_price * (1 - PRICE_BUFFER_PCT)
desired_mode = self.current_mode
if self.current_mode == "NORMAL":
if current_price > entry_upper and current_price < self.recovery_target:
desired_mode = "RECOVERY"
elif self.current_mode == "RECOVERY":
if current_price < entry_lower or current_price >= self.recovery_target:
desired_mode = "NORMAL"
now = time.time()
if desired_mode != self.current_mode:
if (now - self.last_switch_time) >= TIME_BUFFER_SECONDS:
logging.info(f"🔄 MODE SWITCH: {self.current_mode} -> {desired_mode} (Px: {current_price:.2f})")
self.current_mode = desired_mode
self.last_switch_time = now
else:
logging.info(f"⏳ Mode Switch Delayed (Time Buffer). Pending: {desired_mode}")
if self.current_mode == "RECOVERY":
target_short_size = 0.0
logging.info(f"🩹 RECOVERY MODE ACTIVE (0% Hedge). Target: {self.recovery_target:.2f}")
else:
target_short_size = raw_target_short
diff = target_short_size - abs(current_short_position_size)
return {
"current_price": current_price,
"pool_delta": pool_delta,
"target_short": target_short_size,
"current_short": abs(current_short_position_size),
"diff": diff,
"action": "SELL" if diff > 0 else "BUY",
"mode": self.current_mode
}
class ScalperHedger:
def __init__(self):
self.private_key = os.environ.get("SCALPER_AGENT_PK")
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.private_key:
logging.error("No SCALPER_AGENT_PK found in .env")
sys.exit(1)
self.account = Account.from_key(self.private_key)
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
try:
logging.info(f"Setting leverage to {LEVERAGE}x (Cross)...")
self.exchange.update_leverage(LEVERAGE, COIN_SYMBOL, is_cross=True)
except Exception as e:
logging.error(f"Failed to update leverage: {e}")
self.strategy = None
self.sz_decimals = self._get_sz_decimals(COIN_SYMBOL)
self.active_position_id = None
logging.info(f"Scalper Hedger initialized. Agent: {self.account.address}")
def _init_strategy(self, position_data):
try:
entry_amount0 = position_data.get('amount0_initial', 0)
entry_amount1 = position_data.get('amount1_initial', 0)
target_value = position_data.get('target_value', 50.0)
entry_price = position_data['entry_price']
lower = position_data['range_lower']
upper = position_data['range_upper']
static_long = position_data.get('static_long', 0.0)
start_price = self.get_market_price(COIN_SYMBOL)
if start_price is None:
logging.warning("Waiting for initial price to start strategy...")
return
self.strategy = HyperliquidStrategy(
entry_amount0=entry_amount0,
entry_amount1=entry_amount1,
target_value=target_value,
entry_price=entry_price,
low_range=lower,
high_range=upper,
start_price=start_price,
static_long=static_long
)
logging.info(f"Strategy Initialized for Position {position_data['token_id']}.")
self.active_position_id = position_data['token_id']
except Exception as e:
logging.error(f"Failed to init strategy: {e}")
self.strategy = None
def _get_sz_decimals(self, coin):
try:
meta = self.info.meta()
for asset in meta["universe"]:
if asset["name"] == coin:
return asset["szDecimals"]
return 4
except: return 4
def get_market_price(self, coin):
try:
mids = self.info.all_mids()
if coin in mids: return float(mids[coin])
except: pass
return None
def get_funding_rate(self, coin):
try:
meta, asset_ctxs = self.info.meta_and_asset_ctxs()
for i, asset in enumerate(meta["universe"]):
if asset["name"] == coin:
return float(asset_ctxs[i]["funding"])
return 0.0
except: return 0.0
def get_current_position(self, coin):
try:
user_state = self.info.user_state(self.vault_address or self.account.address)
for pos in user_state["assetPositions"]:
if pos["position"]["coin"] == coin:
return float(pos["position"]["szi"])
return 0.0
except: return 0.0
def execute_trade(self, coin, is_buy, size, price):
logging.info(f"🚀 EXECUTING: {coin} {'BUY' if is_buy else 'SELL'} {size} @ ~{price}")
reduce_only = is_buy
try:
raw_limit_px = price * (1.05 if is_buy else 0.95)
limit_px = round_to_sig_figs(raw_limit_px, 5)
order_result = self.exchange.order(coin, is_buy, size, limit_px, {"limit": {"tif": "Ioc"}}, reduce_only=reduce_only)
status = order_result["status"]
if status == "ok":
response_data = order_result["response"]["data"]
if "statuses" in response_data and "error" in response_data["statuses"][0]:
logging.error(f"Order API Error: {response_data['statuses'][0]['error']}")
else:
logging.info(f"✅ Trade Success")
else:
logging.error(f"Order Failed: {order_result}")
except Exception as e:
logging.error(f"Exception during trade: {e}")
def close_all_positions(self):
logging.info("Closing all positions (Safety/Closed State)...")
try:
price = self.get_market_price(COIN_SYMBOL)
current_pos = self.get_current_position(COIN_SYMBOL)
if current_pos == 0: return
is_buy = current_pos < 0
final_size = round_to_sz_decimals(abs(current_pos), self.sz_decimals)
if final_size == 0: return
self.execute_trade(COIN_SYMBOL, is_buy, final_size, price)
self.active_position_id = None
except Exception as e:
logging.error(f"Error closing: {e}")
def run(self):
logging.info(f"Starting Scalper Monitor Loop. Interval: {CHECK_INTERVAL}s")
while True:
try:
active_pos = get_active_automatic_position()
# Check Global Enable Switch
if not active_pos or not active_pos.get('hedge_enabled', True):
if self.strategy is not None:
logging.info("Hedge Disabled or Position Closed. Closing remaining positions.")
self.close_all_positions()
self.strategy = None
else:
pass
time.sleep(CHECK_INTERVAL)
continue
# Initialize Strategy if needed
if self.strategy is None or self.active_position_id != active_pos['token_id']:
logging.info(f"New position {active_pos['token_id']} detected or strategy not initialized. Initializing strategy.")
self._init_strategy(active_pos)
if self.strategy is None:
time.sleep(CHECK_INTERVAL)
continue
# Double Check Strategy validity
if self.strategy is None:
continue
# 2. Market Data
price = self.get_market_price(COIN_SYMBOL)
if price is None:
time.sleep(5)
continue
funding_rate = self.get_funding_rate(COIN_SYMBOL)
current_pos_size = self.get_current_position(COIN_SYMBOL)
# 3. Calculate
calc = self.strategy.calculate_rebalance(price, current_pos_size)
diff_abs = abs(calc['diff'])
# 4. Dynamic Threshold Calculation
sqrt_Pa = math.sqrt(self.strategy.low_range)
sqrt_Pb = math.sqrt(self.strategy.high_range)
max_potential_eth = self.strategy.L * ((1/sqrt_Pa) - (1/sqrt_Pb))
min_threshold = 0.001
rebalance_threshold = max(min_threshold, max_potential_eth * 0.05)
# 5. Execute with Min Order Value check
if diff_abs > rebalance_threshold:
trade_size = round_to_sz_decimals(diff_abs, self.sz_decimals)
min_order_value_usd = 10.0
min_trade_size = min_order_value_usd / price
if trade_size < min_trade_size:
logging.info(f"Idle. Trade size {trade_size} < Min Order Size {min_trade_size:.4f} (${min_order_value_usd})")
elif trade_size > 0:
logging.info(f"⚡ THRESHOLD TRIGGERED ({diff_abs:.4f} >= {rebalance_threshold:.4f})")
is_buy = (calc['action'] == "BUY")
self.execute_trade(COIN_SYMBOL, is_buy, trade_size, price)
else:
logging.info("Trade size rounds to 0. Skipping.")
else:
logging.info(f"Idle. Diff {diff_abs:.4f} < Threshold {rebalance_threshold:.4f}")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
logging.info("Stopping Hedger...")
self.close_all_positions()
break
except Exception as e:
logging.error(f"Loop Error: {e}", exc_info=True)
time.sleep(10)
if __name__ == "__main__":
hedger = ScalperHedger()
hedger.run()

View File

@ -0,0 +1,16 @@
[
{
"type": "AUTOMATIC",
"token_id": 5154921,
"status": "OPEN",
"entry_price": 3088.180203068298,
"range_lower": 3071.745207606606,
"range_upper": 3102.615208978462,
"target_value": 99.31729381997206,
"amount0_initial": 0,
"amount1_initial": 0,
"static_long": 0.0,
"timestamp_open": 1765575924,
"timestamp_close": null
}
]

View File

@ -0,0 +1,711 @@
import os
import time
import json
import re
from web3 import Web3
from eth_account import Account
from dotenv import load_dotenv
# --- Helper Functions ---
def clean_address(addr):
return re.sub(r'[^0-9a-fA-FxX]', '', addr)
def price_from_sqrt_price_x96(sqrt_price_x96, token0_decimals, token1_decimals):
price = (sqrt_price_x96 / (2**96))**2
price = price * (10**(token0_decimals - token1_decimals))
return price
def price_from_tick(tick, token0_decimals, token1_decimals):
price = 1.0001**tick
price = price * (10**(token0_decimals - token1_decimals))
return price
def from_wei(amount, decimals):
return amount / (10**decimals)
# --- V3 Math Helpers ---
def get_sqrt_ratio_at_tick(tick):
return int((1.0001 ** (tick / 2)) * (2 ** 96))
def get_liquidity_for_amount0(sqrt_ratio_a, sqrt_ratio_b, amount0):
if sqrt_ratio_a > sqrt_ratio_b:
sqrt_ratio_a, sqrt_ratio_b = sqrt_ratio_b, sqrt_ratio_a
return int(amount0 * sqrt_ratio_a * sqrt_ratio_b / (sqrt_ratio_b - sqrt_ratio_a))
def get_liquidity_for_amount1(sqrt_ratio_a, sqrt_ratio_b, amount1):
if sqrt_ratio_a > sqrt_ratio_b:
sqrt_ratio_a, sqrt_ratio_b = sqrt_ratio_b, sqrt_ratio_a
return int(amount1 / (sqrt_ratio_b - sqrt_ratio_a))
def get_amounts_for_liquidity(sqrt_ratio_current, sqrt_ratio_a, sqrt_ratio_b, liquidity):
if sqrt_ratio_a > sqrt_ratio_b:
sqrt_ratio_a, sqrt_ratio_b = sqrt_ratio_b, sqrt_ratio_a
amount0 = 0
amount1 = 0
Q96 = 1 << 96
if sqrt_ratio_current <= sqrt_ratio_a:
amount0 = ((liquidity * Q96) // sqrt_ratio_a) - ((liquidity * Q96) // sqrt_ratio_b)
elif sqrt_ratio_current < sqrt_ratio_b:
amount0 = ((liquidity * Q96) // sqrt_ratio_current) - ((liquidity * Q96) // sqrt_ratio_b)
amount1 = (liquidity * (sqrt_ratio_current - sqrt_ratio_a)) // Q96
else:
amount1 = (liquidity * (sqrt_ratio_b - sqrt_ratio_a)) // Q96
return amount0, amount1
# --- Configuration ---
RPC_URL = os.environ.get("MAINNET_RPC_URL")
POSITION_TOKEN_ID = int(os.environ.get("POSITION_TOKEN_ID", "0"))
PRIVATE_KEY = os.environ.get("MAIN_WALLET_PRIVATE_KEY") or os.environ.get("PRIVATE_KEY")
MONITOR_INTERVAL_SECONDS = 30
COLLECT_FEES_ENABLED = False
CLOSE_POSITION_ENABLED = True
CLOSE_IF_OUT_OF_RANGE_ONLY = True
OPEN_POSITION_ENABLED = True
TARGET_INVESTMENT_VALUE_TOKEN1 = 100.0
RANGE_WIDTH_PCT = 0.005
STATUS_FILE = "hedge_status.json"
# --- JSON State Helpers ---
def get_active_automatic_position():
if not os.path.exists(STATUS_FILE):
return None
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
for entry in data:
if entry.get('type') == 'AUTOMATIC' and entry.get('status') == 'OPEN':
return entry
except Exception as e:
print(f"ERROR reading status file: {e}")
return None
def get_all_open_positions():
"""Reads hedge_status.json and returns a list of all OPEN positions (Manual and Automatic)."""
if not os.path.exists(STATUS_FILE):
return []
try:
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
return [entry for entry in data if entry.get('status') == 'OPEN']
except Exception as e:
print(f"ERROR reading status file: {e}")
return []
def update_hedge_status_file(action, position_data):
current_data = []
if os.path.exists(STATUS_FILE):
try:
with open(STATUS_FILE, 'r') as f:
current_data = json.load(f)
except:
current_data = []
if action == "OPEN":
new_entry = {
"type": "AUTOMATIC",
"token_id": position_data['token_id'],
"status": "OPEN",
"entry_price": position_data['entry_price'],
"range_lower": position_data['range_lower'],
"range_upper": position_data['range_upper'],
"target_value": position_data.get('target_value', 0.0),
"amount0_initial": position_data.get('amount0', 0),
"amount1_initial": position_data.get('amount1', 0),
"static_long": 0.0,
"timestamp_open": int(time.time()),
"timestamp_close": None
}
current_data.append(new_entry)
print(f"Recorded new AUTOMATIC position {position_data['token_id']} in {STATUS_FILE}")
elif action == "CLOSE":
found = False
for entry in current_data:
if (
entry.get('type') == "AUTOMATIC" and
entry.get('status') == "OPEN" and
entry.get('token_id') == position_data['token_id']
):
entry['status'] = "CLOSED"
entry['timestamp_close'] = int(time.time())
found = True
print(f"Marked position {entry['token_id']} as CLOSED in {STATUS_FILE}")
break
if not found:
print(f"WARNING: Could not find open AUTOMATIC position {position_data['token_id']} to close.")
with open(STATUS_FILE, 'w') as f:
json.dump(current_data, f, indent=2)
# --- ABIs ---
# Simplified for length, usually loaded from huge string
NONFUNGIBLE_POSITION_MANAGER_ABI = json.loads('''
[
{"inputs": [], "name": "factory", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}], "name": "positions", "outputs": [{"internalType": "uint96", "name": "nonce", "type": "uint96"}, {"internalType": "address", "name": "operator", "type": "address"}, {"internalType": "address", "name": "token0", "type": "address"}, {"internalType": "address", "name": "token1", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "int24", "name": "tickLower", "type": "int24"}, {"internalType": "int24", "name": "tickUpper", "type": "int24"}, {"internalType": "uint128", "name": "liquidity", "type": "uint128"}, {"internalType": "uint256", "name": "feeGrowthInside0LastX128", "type": "uint256"}, {"internalType": "uint256", "name": "feeGrowthInside1LastX128", "type": "uint256"}, {"internalType": "uint128", "name": "tokensOwed0", "type": "uint128"}, {"internalType": "uint128", "name": "tokensOwed1", "type": "uint128"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"components": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint128", "name": "amount0Max", "type": "uint128"}, {"internalType": "uint128", "name": "amount1Max", "type": "uint128"}], "internalType": "struct INonfungiblePositionManager.CollectParams", "name": "params", "type": "tuple"}], "name": "collect", "outputs": [{"internalType": "uint256", "name": "amount0", "type": "uint256"}, {"internalType": "uint256", "name": "amount1", "type": "uint256"}], "stateMutability": "payable", "type": "function"},
{"inputs": [{"components": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}, {"internalType": "uint128", "name": "liquidity", "type": "uint128"}, {"internalType": "uint256", "name": "amount0Min", "type": "uint256"}, {"internalType": "uint256", "name": "amount1Min", "type": "uint256"}, {"internalType": "uint256", "name": "deadline", "type": "uint256"}], "internalType": "struct INonfungiblePositionManager.DecreaseLiquidityParams", "name": "params", "type": "tuple"}], "name": "decreaseLiquidity", "outputs": [{"internalType": "uint256", "name": "amount0", "type": "uint256"}, {"internalType": "uint256", "name": "amount1", "type": "uint256"}], "stateMutability": "payable", "type": "function"},
{"inputs": [{"components": [{"internalType": "address", "name": "token0", "type": "address"}, {"internalType": "address", "name": "token1", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "int24", "name": "tickLower", "type": "int24"}, {"internalType": "int24", "name": "tickUpper", "type": "int24"}, {"internalType": "uint256", "name": "amount0Desired", "type": "uint256"}, {"internalType": "uint256", "name": "amount1Desired", "type": "uint256"}, {"internalType": "uint256", "name": "amount0Min", "type": "uint256"}, {"internalType": "uint256", "name": "amount1Min", "type": "uint256"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint256", "name": "deadline", "type": "uint256"}], "internalType": "struct INonfungiblePositionManager.MintParams", "name": "params", "type": "tuple"}], "name": "mint", "outputs": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}, {"internalType": "uint128", "name": "liquidity", "type": "uint128"}, {"internalType": "uint256", "name": "amount0", "type": "uint256"}, {"internalType": "uint256", "name": "amount1", "type": "uint256"}], "stateMutability": "payable", "type": "function"}
]
''')
UNISWAP_V3_POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "uint8", "name": "feeProtocol", "type": "uint8"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "fee", "outputs": [{"internalType": "uint24", "name": "", "type": "uint24"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "liquidity", "outputs": [{"internalType": "uint128", "name": "", "type": "uint128"}], "stateMutability": "view", "type": "function"}
]
''')
ERC20_ABI = json.loads('''
[
{"inputs": [], "name": "decimals", "outputs": [{"internalType": "uint8", "name": "", "type": "uint8"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "symbol", "outputs": [{"internalType": "string", "name": "", "type": "string"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "address", "name": "account", "type": "address"}], "name": "balanceOf", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "address", "name": "spender", "type": "address"}, {"internalType": "uint256", "name": "amount", "type": "uint256"}], "name": "approve", "outputs": [{"internalType": "bool", "name": "", "type": "bool"}], "stateMutability": "nonpayable", "type": "function"},
{"inputs": [{"internalType": "address", "name": "owner", "type": "address"}, {"internalType": "address", "name": "spender", "type": "address"}], "name": "allowance", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"}
]
''')
UNISWAP_V3_FACTORY_ABI = json.loads('''
[
{"inputs": [{"internalType": "address", "name": "tokenA", "type": "address"}, {"internalType": "address", "name": "tokenB", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}], "name": "getPool", "outputs": [{"internalType": "address", "name": "pool", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
SWAP_ROUTER_ABI = json.loads('''
[
{"inputs": [{"components": [{"internalType": "address", "name": "tokenIn", "type": "address"}, {"internalType": "address", "name": "tokenOut", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint256", "name": "deadline", "type": "uint256"}, {"internalType": "uint256", "name": "amountIn", "type": "uint256"}, {"internalType": "uint256", "name": "amountOutMinimum", "type": "uint256"}, {"internalType": "uint160", "name": "sqrtPriceLimitX96", "type": "uint160"}], "internalType": "struct ISwapRouter.ExactInputSingleParams", "name": "params", "type": "tuple"}], "name": "exactInputSingle", "outputs": [{"internalType": "uint256", "name": "amountOut", "type": "uint256"}], "stateMutability": "payable", "type": "function"}
]
''')
WETH9_ABI = json.loads('''
[
{"constant": false, "inputs": [], "name": "deposit", "outputs": [], "payable": true, "stateMutability": "payable", "type": "function"},
{"constant": false, "inputs": [{"name": "wad", "type": "uint256"}], "name": "withdraw", "outputs": [], "payable": false, "stateMutability": "nonpayable", "type": "function"}
]
''')
NONFUNGIBLE_POSITION_MANAGER_ADDRESS = bytes.fromhex("C36442b4" + "a4522E87" + "1399CD71" + "7aBDD847" + "Ab11FE88")
UNISWAP_V3_SWAP_ROUTER_ADDRESS = bytes.fromhex("E592427A0AEce92De3Edee1F18E0157C05861564")
WETH_ADDRESS = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1" # Arbitrum WETH
# --- Core Logic Functions ---
def get_position_details(w3_instance, npm_c, factory_c, token_id):
try:
position_data = npm_c.functions.positions(token_id).call()
(nonce, operator, token0_address, token1_address, fee, tickLower, tickUpper, liquidity,
feeGrowthInside0, feeGrowthInside1, tokensOwed0, tokensOwed1) = position_data
token0_contract = w3_instance.eth.contract(address=token0_address, abi=ERC20_ABI)
token1_contract = w3_instance.eth.contract(address=token1_address, abi=ERC20_ABI)
token0_symbol = token0_contract.functions.symbol().call()
token1_symbol = token1_contract.functions.symbol().call()
token0_decimals = token0_contract.functions.decimals().call()
token1_decimals = token1_contract.functions.decimals().call()
pool_address = factory_c.functions.getPool(token0_address, token1_address, fee).call()
if pool_address == '0x0000000000000000000000000000000000000000':
return None, None
pool_contract = w3_instance.eth.contract(address=pool_address, abi=UNISWAP_V3_POOL_ABI)
return {
"token0_address": token0_address, "token1_address": token1_address,
"token0_symbol": token0_symbol, "token1_symbol": token1_symbol,
"token0_decimals": token0_decimals, "token1_decimals": token1_decimals,
"fee": fee, "tickLower": tickLower, "tickUpper": tickUpper, "liquidity": liquidity,
"pool_address": pool_address
}, pool_contract
except Exception as e:
print(f"ERROR fetching position details: {e}")
return None, None
def get_pool_dynamic_data(pool_c):
try:
slot0_data = pool_c.functions.slot0().call()
return {"sqrtPriceX96": slot0_data[0], "tick": slot0_data[1]}
except Exception as e:
print(f"ERROR fetching pool dynamic data: {e}")
return None
def calculate_mint_amounts(current_tick, tick_lower, tick_upper, investment_value_token1, decimals0, decimals1, sqrt_price_current_x96):
sqrt_price_current = get_sqrt_ratio_at_tick(current_tick)
sqrt_price_lower = get_sqrt_ratio_at_tick(tick_lower)
sqrt_price_upper = get_sqrt_ratio_at_tick(tick_upper)
# 1. Get Price of Token0 in terms of Token1 (e.g., WETH price in USDC)
price_of_token0_in_token1_units = price_from_sqrt_price_x96(sqrt_price_current_x96, decimals0, decimals1)
# 2. Estimate Amounts for a Test Liquidity (L_test)
L_test = 1 << 128
amt0_test, amt1_test = get_amounts_for_liquidity(sqrt_price_current, sqrt_price_lower, sqrt_price_upper, L_test)
# 3. Adjust test amounts for decimals to get "Real Units" (e.g., 0.1 WETH, 500 USDC)
real_amt0_test = amt0_test / (10**decimals0)
real_amt1_test = amt1_test / (10**decimals1)
# 4. Calculate Total Value of Test Position in Token1 terms (e.g., Total in USDC)
value_test = (real_amt0_test * price_of_token0_in_token1_units) + real_amt1_test
if value_test == 0:
return 0, 0
# 5. Scale to Target Investment Value
scale = investment_value_token1 / value_test
# 6. Calculate Final Amounts (raw integer units for contract call)
final_amt0 = int(amt0_test * scale)
final_amt1 = int(amt1_test * scale)
return final_amt0, final_amt1
def check_and_swap(w3_instance, router_contract, account, token0, token1, amount0_needed, amount1_needed):
token0_contract = w3_instance.eth.contract(address=token0, abi=ERC20_ABI)
token1_contract = w3_instance.eth.contract(address=token1, abi=ERC20_ABI)
bal0 = token0_contract.functions.balanceOf(account.address).call()
bal1 = token1_contract.functions.balanceOf(account.address).call()
deficit0 = max(0, amount0_needed - bal0)
deficit1 = max(0, amount1_needed - bal1)
# --- AUTO-WRAP ETH LOGIC ---
# Check if we need WETH and have Native ETH
# WETH Address Check (Case insensitive)
weth_addr_lower = WETH_ADDRESS.lower()
# Check Token0 (Deficit0)
if (deficit0 > 0 or deficit1 > 0) and token0.lower() == weth_addr_lower:
native_bal = w3_instance.eth.get_balance(account.address)
gas_reserve = 2 * 10**16 # 0.02 ETH gas reserve
available_native = max(0, native_bal - gas_reserve)
# Determine how much to wrap
# If we have deficit1 (need USDC), we likely need to wrap more WETH to swap it.
# Strategy: If deficit1 > 0, wrap ALL available native ETH (up to reasonable limit?).
# Or just wrap what we have.
amount_to_wrap = 0
if deficit0 > 0:
amount_to_wrap = deficit0
if deficit1 > 0:
# We need to buy Token1. We need surplus Token0.
# Wrap all remaining available native ETH to facilitate swap.
amount_to_wrap = available_native
# Safety clamp
amount_to_wrap = min(amount_to_wrap, available_native)
if amount_to_wrap > 0:
print(f"Auto-Wrapping {from_wei(amount_to_wrap, 18)} ETH to WETH...")
weth_contract = w3_instance.eth.contract(address=token0, abi=WETH9_ABI)
wrap_txn = weth_contract.functions.deposit().build_transaction({
'from': account.address,
'value': amount_to_wrap,
'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 100000,
'maxFeePerGas': w3_instance.eth.gas_price * 2,
'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee,
'chainId': w3_instance.eth.chain_id
})
signed_wrap = w3_instance.eth.account.sign_transaction(wrap_txn, private_key=account.key)
raw_wrap = signed_wrap.rawTransaction if hasattr(signed_wrap, 'rawTransaction') else signed_wrap.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw_wrap)
print(f"Wrap Sent: {tx_hash.hex()}")
w3_instance.eth.wait_for_transaction_receipt(tx_hash)
# Refresh Balance
bal0 = token0_contract.functions.balanceOf(account.address).call()
deficit0 = max(0, amount0_needed - bal0)
else:
if deficit0 > 0:
print(f"Insufficient Native ETH to wrap. Need: {from_wei(deficit0, 18)}, Have: {from_wei(available_native, 18)}")
# Check Token1 (Deficit1) - Assuming Token1 could be WETH too
if deficit1 > 0 and token1.lower() == weth_addr_lower:
native_bal = w3_instance.eth.get_balance(account.address)
gas_reserve = 10**16
available_native = max(0, native_bal - gas_reserve)
if available_native >= deficit1:
print(f"Auto-Wrapping {from_wei(deficit1, 18)} ETH to WETH...")
weth_contract = w3_instance.eth.contract(address=token1, abi=WETH9_ABI)
wrap_txn = weth_contract.functions.deposit().build_transaction({
'from': account.address,
'value': deficit1,
'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 100000,
'maxFeePerGas': w3_instance.eth.gas_price * 2,
'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee,
'chainId': w3_instance.eth.chain_id
})
signed_wrap = w3_instance.eth.account.sign_transaction(wrap_txn, private_key=account.key)
raw_wrap = signed_wrap.rawTransaction if hasattr(signed_wrap, 'rawTransaction') else signed_wrap.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw_wrap)
print(f"Wrap Sent: {tx_hash.hex()}")
w3_instance.eth.wait_for_transaction_receipt(tx_hash)
# Refresh Balance
bal1 = token1_contract.functions.balanceOf(account.address).call()
deficit1 = max(0, amount1_needed - bal1)
if deficit0 == 0 and deficit1 == 0:
return True
if deficit0 > 0 and bal1 > amount1_needed:
surplus1 = bal1 - amount1_needed
print(f"Swapping surplus Token1 ({surplus1}) for Token0...")
approve_txn = token1_contract.functions.approve(router_contract.address, surplus1).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 100000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee,
'chainId': w3_instance.eth.chain_id
})
signed = w3_instance.eth.account.sign_transaction(approve_txn, private_key=account.key)
raw = signed.rawTransaction if hasattr(signed, 'rawTransaction') else signed.raw_transaction
w3_instance.eth.send_raw_transaction(raw)
time.sleep(2)
params = (token1, token0, 500, account.address, int(time.time()) + 120, surplus1, 0, 0)
swap_txn = router_contract.functions.exactInputSingle(params).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 300000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee,
'chainId': w3_instance.eth.chain_id
})
signed_swap = w3_instance.eth.account.sign_transaction(swap_txn, private_key=account.key)
raw_swap = signed_swap.rawTransaction if hasattr(signed_swap, 'rawTransaction') else signed_swap.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw_swap)
print(f"Swap Sent: {tx_hash.hex()}")
w3_instance.eth.wait_for_transaction_receipt(tx_hash)
return True
elif deficit1 > 0 and bal0 > amount0_needed:
surplus0 = bal0 - amount0_needed
print(f"Swapping surplus Token0 ({surplus0}) for Token1...")
approve_txn = token0_contract.functions.approve(router_contract.address, surplus0).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 100000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee,
'chainId': w3_instance.eth.chain_id
})
signed = w3_instance.eth.account.sign_transaction(approve_txn, private_key=account.key)
raw = signed.rawTransaction if hasattr(signed, 'rawTransaction') else signed.raw_transaction
w3_instance.eth.send_raw_transaction(raw)
time.sleep(2)
params = (token0, token1, 500, account.address, int(time.time()) + 120, surplus0, 0, 0)
swap_txn = router_contract.functions.exactInputSingle(params).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 300000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee,
'chainId': w3_instance.eth.chain_id
})
signed_swap = w3_instance.eth.account.sign_transaction(swap_txn, private_key=account.key)
raw_swap = signed_swap.rawTransaction if hasattr(signed_swap, 'rawTransaction') else signed_swap.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw_swap)
print(f"Swap Sent: {tx_hash.hex()}")
w3_instance.eth.wait_for_transaction_receipt(tx_hash)
return True
print("❌ Insufficient funds for required amounts.")
return False
def get_token_balances(w3_instance, account_address, token0_address, token1_address):
try:
token0_contract = w3_instance.eth.contract(address=token0_address, abi=ERC20_ABI)
token1_contract = w3_instance.eth.contract(address=token1_address, abi=ERC20_ABI)
b0 = token0_contract.functions.balanceOf(account_address).call()
b1 = token1_contract.functions.balanceOf(account_address).call()
return b0, b1
except: return 0, 0
def decrease_liquidity(w3_instance, npm_contract, account, position_id, liquidity_amount):
try:
txn = npm_contract.functions.decreaseLiquidity((position_id, liquidity_amount, 0, 0, int(time.time()) + 180)).build_transaction({
'from': account.address, 'gas': 1000000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee, 'nonce': w3_instance.eth.get_transaction_count(account.address), 'chainId': w3_instance.eth.chain_id
})
signed = w3_instance.eth.account.sign_transaction(txn, private_key=account.key)
raw = signed.rawTransaction if hasattr(signed, 'rawTransaction') else signed.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw)
print(f"Decrease Sent: {tx_hash.hex()}")
w3_instance.eth.wait_for_transaction_receipt(tx_hash)
return True
except Exception as e:
print(f"Error decreasing: {e}")
return False
def mint_new_position(w3_instance, npm_contract, account, token0, token1, amount0, amount1, tick_lower, tick_upper):
print(f"\n--- Attempting to Mint ---")
try:
token0_c = w3_instance.eth.contract(address=token0, abi=ERC20_ABI)
token1_c = w3_instance.eth.contract(address=token1, abi=ERC20_ABI)
# Approve 0
txn0 = token0_c.functions.approve(npm_contract.address, amount0).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 100000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee, 'chainId': w3_instance.eth.chain_id
})
signed0 = w3_instance.eth.account.sign_transaction(txn0, private_key=account.key)
raw0 = signed0.rawTransaction if hasattr(signed0, 'rawTransaction') else signed0.raw_transaction
w3_instance.eth.send_raw_transaction(raw0)
time.sleep(2)
# Approve 1
txn1 = token1_c.functions.approve(npm_contract.address, amount1).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 100000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee, 'chainId': w3_instance.eth.chain_id
})
signed1 = w3_instance.eth.account.sign_transaction(txn1, private_key=account.key)
raw1 = signed1.rawTransaction if hasattr(signed1, 'rawTransaction') else signed1.raw_transaction
w3_instance.eth.send_raw_transaction(raw1)
time.sleep(2)
# Mint
params = (token0, token1, 500, tick_lower, tick_upper, amount0, amount1, 0, 0, account.address, int(time.time()) + 180)
mint_txn = npm_contract.functions.mint(params).build_transaction({
'from': account.address, 'nonce': w3_instance.eth.get_transaction_count(account.address),
'gas': 800000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee, 'chainId': w3_instance.eth.chain_id
})
signed_mint = w3_instance.eth.account.sign_transaction(mint_txn, private_key=account.key)
raw_mint = signed_mint.rawTransaction if hasattr(signed_mint, 'rawTransaction') else signed_mint.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw_mint)
print(f"Mint Sent: {tx_hash.hex()}")
receipt = w3_instance.eth.wait_for_transaction_receipt(tx_hash)
if receipt.status == 1:
print("✅ Mint Successful!")
result_data = {'token_id': None, 'liquidity': 0, 'amount0': 0, 'amount1': 0}
# Event Topics
transfer_topic = "ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
increase_liquidity_topic = "3067048beee31b25b2f1681f88dac838c8bba36af25bfb2b7cf7473a5847e35f"
for log in receipt['logs']:
topic0 = log['topics'][0].hex().replace("0x", "")
# Parse Token ID from Transfer
if topic0 == transfer_topic and len(log['topics']) > 3:
result_data['token_id'] = int(log['topics'][3].hex(), 16)
# Parse Amounts from IncreaseLiquidity
# Event: IncreaseLiquidity(uint256 indexed tokenId, uint128 liquidity, uint256 amount0, uint256 amount1)
# Indexed args are in topics. Non-indexed in data.
# tokenId is indexed (Topic 1).
# Data: liquidity (32 bytes), amount0 (32 bytes), amount1 (32 bytes) = 96 bytes total
if topic0 == increase_liquidity_topic:
data_hex = log['data'].hex().replace("0x", "")
if len(data_hex) >= 192: # 3 * 64 chars
# Split data into 3 chunks of 64 chars (32 bytes)
liquidity_hex = data_hex[0:64]
amount0_hex = data_hex[64:128]
amount1_hex = data_hex[128:192]
result_data['liquidity'] = int(liquidity_hex, 16)
result_data['amount0'] = int(amount0_hex, 16)
result_data['amount1'] = int(amount1_hex, 16)
print(f"Captured Actual Mint Amounts: {result_data['amount0']} Token0 / {result_data['amount1']} Token1")
if result_data['token_id']:
return result_data
return None
else:
print("❌ Mint Failed!")
return None
except Exception as e:
print(f"Mint Error: {e}")
return None
def collect_fees(w3_instance, npm_contract, account, position_id):
try:
txn = npm_contract.functions.collect((position_id, account.address, 2**128-1, 2**128-1)).build_transaction({
'from': account.address, 'gas': 1000000, 'maxFeePerGas': w3_instance.eth.gas_price * 2, 'maxPriorityFeePerGas': w3_instance.eth.max_priority_fee, 'nonce': w3_instance.eth.get_transaction_count(account.address), 'chainId': w3_instance.eth.chain_id
})
signed = w3_instance.eth.account.sign_transaction(txn, private_key=account.key)
raw = signed.rawTransaction if hasattr(signed, 'rawTransaction') else signed.raw_transaction
tx_hash = w3_instance.eth.send_raw_transaction(raw)
print(f"Collect Sent: {tx_hash.hex()}")
w3_instance.eth.wait_for_transaction_receipt(tx_hash)
return True
except: return False
def main():
print(f"CWD: {os.getcwd()}")
# Load .env from current directory
load_dotenv(override=True)
rpc_url = os.environ.get("MAINNET_RPC_URL")
private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY") or os.environ.get("PRIVATE_KEY")
if not rpc_url or not private_key:
print("Missing RPC or Private Key.")
return
w3 = Web3(Web3.HTTPProvider(rpc_url))
if not w3.is_connected():
print("RPC Connection Failed")
return
print(f"Connected to Chain ID: {w3.eth.chain_id}")
account = Account.from_key(private_key)
w3.eth.default_account = account.address
print(f"Wallet: {account.address}")
npm_contract = w3.eth.contract(address=NONFUNGIBLE_POSITION_MANAGER_ADDRESS, abi=NONFUNGIBLE_POSITION_MANAGER_ABI)
factory_addr = npm_contract.functions.factory().call()
factory_contract = w3.eth.contract(address=factory_addr, abi=UNISWAP_V3_FACTORY_ABI)
router_contract = w3.eth.contract(address=UNISWAP_V3_SWAP_ROUTER_ADDRESS, abi=SWAP_ROUTER_ABI)
print("\n--- STARTING LIFECYCLE MANAGER ---")
while True:
try:
# 1. Get All Open Positions
all_positions = get_all_open_positions()
# Check if we have an active AUTOMATIC position
active_automatic_position = next((p for p in all_positions if p['type'] == 'AUTOMATIC'), None)
if all_positions:
print("\n" + "="*60)
print(f"Monitoring at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
for position in all_positions:
token_id = position['token_id']
pos_type = position['type']
# Fetch Details
pos_details, pool_c = get_position_details(w3, npm_contract, factory_contract, token_id)
if not pos_details:
print(f"ERROR: Could not get details for Position {token_id}. Skipping.")
continue
pool_data = get_pool_dynamic_data(pool_c)
current_tick = pool_data['tick']
# Calculate Fees (Simulation)
unclaimed0 = 0
unclaimed1 = 0
try:
fees_sim = npm_contract.functions.collect((token_id, "0x0000000000000000000000000000000000000000", 2**128-1, 2**128-1)).call()
unclaimed0 = from_wei(fees_sim[0], pos_details['token0_decimals'])
unclaimed1 = from_wei(fees_sim[1], pos_details['token1_decimals'])
except: pass
# Check Range
is_out_of_range = False
status_str = "IN RANGE"
if current_tick < pos_details['tickLower']:
is_out_of_range = True
status_str = "OUT OF RANGE (BELOW)"
elif current_tick >= pos_details['tickUpper']:
is_out_of_range = True
status_str = "OUT OF RANGE (ABOVE)"
print(f"\nID: {token_id} | Type: {pos_type} | Status: {status_str}")
print(f" Range: {position['range_lower']:.2f} - {position['range_upper']:.2f}")
print(f" Fees: {unclaimed0:.4f} {pos_details['token0_symbol']} / {unclaimed1:.4f} {pos_details['token1_symbol']}")
# --- AUTO CLOSE LOGIC (AUTOMATIC ONLY) ---
if pos_type == 'AUTOMATIC' and CLOSE_POSITION_ENABLED and is_out_of_range:
print(f"⚠️ Automatic Position {token_id} is OUT OF RANGE! Initiating Close...")
liq = pos_details['liquidity']
if liq > 0:
if decrease_liquidity(w3, npm_contract, account, token_id, liq):
time.sleep(5)
collect_fees(w3, npm_contract, account, token_id)
update_hedge_status_file("CLOSE", {'token_id': token_id})
print("Position Closed & Status Updated.")
# We don't break loop here, let it finish monitoring others,
# but next main loop iteration will see it closed.
else:
print("Liquidity 0. Marking closed.")
update_hedge_status_file("CLOSE", {'token_id': token_id})
# 2. Opening Logic (If no active automatic position)
if not active_automatic_position and OPEN_POSITION_ENABLED:
print("\n[OPENING] No active automatic position. Starting Open Sequence...")
# Get Pool (WETH/USDC)
token0 = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1" # WETH
token1 = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831" # USDC
pool_addr = factory_contract.functions.getPool(token0, token1, 500).call()
pool_c = w3.eth.contract(address=pool_addr, abi=UNISWAP_V3_POOL_ABI)
pool_data = get_pool_dynamic_data(pool_c)
tick = pool_data['tick']
# Range +/- 2%
import math
tick_delta = int(math.log(1 + RANGE_WIDTH_PCT) / math.log(1.0001))
spacing = 10
lower = (tick - tick_delta) // spacing * spacing
upper = (tick + tick_delta) // spacing * spacing
# Amounts
token0_c = w3.eth.contract(address=token0, abi=ERC20_ABI)
token1_c = w3.eth.contract(address=token1, abi=ERC20_ABI)
try:
d0 = token0_c.functions.decimals().call()
d1 = token1_c.functions.decimals().call()
except:
print("Error fetching decimals")
time.sleep(5)
continue
amt0, amt1 = calculate_mint_amounts(tick, lower, upper, TARGET_INVESTMENT_VALUE_TOKEN1, d0, d1, pool_data['sqrtPriceX96'])
amt0_buf, amt1_buf = int(amt0 * 1.02), int(amt1 * 1.02)
# 4. Swap & Mint
if check_and_swap(w3, router_contract, account, token0, token1, amt0_buf, amt1_buf):
mint_result = mint_new_position(w3, npm_contract, account, token0, token1, amt0, amt1, lower, upper)
if mint_result:
# Calculate Actual Value
try:
s0 = token0_c.functions.symbol().call()
s1 = token1_c.functions.symbol().call()
except:
s0, s1 = "T0", "T1"
real_amt0 = from_wei(mint_result['amount0'], d0)
real_amt1 = from_wei(mint_result['amount1'], d1)
entry_price = price_from_sqrt_price_x96(pool_data['sqrtPriceX96'], d0, d1)
# Value in Token1 terms (e.g. USDC)
actual_value = (real_amt0 * entry_price) + real_amt1
print(f"ACTUAL MINT VALUE: {actual_value:.2f} {s1}/{s0}")
pos_data = {
'token_id': mint_result['token_id'],
'entry_price': entry_price,
'range_lower': price_from_tick(lower, d0, d1),
'range_upper': price_from_tick(upper, d0, d1),
'target_value': actual_value, # Save Actual Value as Target for hedging accuracy
'amount0_initial': mint_result['amount0'],
'amount1_initial': mint_result['amount1']
}
update_hedge_status_file("OPEN", pos_data)
print("Cycle Complete. Monitoring.")
elif not all_positions:
print("No open positions (Manual or Automatic). Waiting...")
time.sleep(MONITOR_INTERVAL_SECONDS)
except KeyboardInterrupt:
print("\nManager stopped.")
break
except Exception as e:
print(f"Error in Main Loop: {e}")
time.sleep(MONITOR_INTERVAL_SECONDS)
if __name__ == "__main__":
main()

View File

@ -33,7 +33,7 @@ def create_and_authorize_agent():
# --- STEP 3: Create and approve the agent with a specific name --- # --- STEP 3: Create and approve the agent with a specific name ---
# agent name must be between 1 and 16 characters long # agent name must be between 1 and 16 characters long
agent_name = "executor_swing" agent_name = "executor_SCALPER"
print(f"\n🔗 Authorizing a new agent named '{agent_name}'...") print(f"\n🔗 Authorizing a new agent named '{agent_name}'...")
try: try:

Binary file not shown.