Files
uniswap_auto_clp/florida/tools/collect_fees_v2.py

325 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Fee Collection & Position Recovery Script
Collects all accumulated fees from Uniswap V3 positions
Usage:
python collect_fees_v2.py
"""
import os
import sys
import json
import time
import argparse
# Required libraries
try:
from web3 import Web3
from eth_account import Account
except ImportError as e:
print(f"[ERROR] Missing required library: {e}")
print("Please install with: pip install web3 eth-account python-dotenv")
sys.exit(1)
try:
from dotenv import load_dotenv
except ImportError:
print("[WARNING] python-dotenv not found, using environment variables directly")
def load_dotenv(override=True):
pass
def setup_logging():
"""Setup logging for fee collection"""
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('collect_fees.log', encoding='utf-8')
]
)
return logging.getLogger(__name__)
logger = setup_logging()
# --- Contract ABIs ---
NONFUNGIBLE_POSITION_MANAGER_ABI = json.loads('''
[
{"inputs": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}], "name": "positions", "outputs": [{"internalType": "uint96", "name": "nonce", "type": "uint96"}, {"internalType": "address", "name": "operator", "type": "address"}, {"internalType": "address", "name": "token0", "type": "address"}, {"internalType": "address", "name": "token1", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "int24", "name": "tickLower", "type": "int24"}, {"internalType": "int24", "name": "tickUpper", "type": "int24"}, {"internalType": "uint128", "name": "liquidity", "type": "uint128"}, {"internalType": "uint256", "name": "feeGrowthInside0LastX128", "type": "uint256"}, {"internalType": "uint256", "name": "feeGrowthInside1LastX128", "type": "uint256"}, {"internalType": "uint128", "name": "tokensOwed0", "type": "uint128"}, {"internalType": "uint128", "name": "tokensOwed1", "type": "uint128"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"components": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint128", "name": "amount0Max", "type": "uint128"}, {"internalType": "uint128", "name": "amount1Max", "type": "uint128"}], "internalType": "struct INonfungiblePositionManager.CollectParams", "name": "params", "type": "tuple"}], "name": "collect", "outputs": [{"internalType": "uint256", "name": "amount0", "type": "uint256"}, {"internalType": "uint256", "name": "amount1", "type": "uint256"}], "stateMutability": "payable", "type": "function"}
]
''')
ERC20_ABI = json.loads('''
[
{"inputs": [], "name": "decimals", "outputs": [{"internalType": "uint8", "name": "", "type": "uint8"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "symbol", "outputs": [{"internalType": "string", "name": "", "type": "string"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "address", "name": "account", "type": "address"}], "name": "balanceOf", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"}
]
''')
def load_status_file():
"""Load hedge status file"""
status_file = "hedge_status.json"
if not os.path.exists(status_file):
logger.error(f"Status file {status_file} not found")
return []
try:
with open(status_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading status file: {e}")
return []
def from_wei(amount, decimals):
"""Convert wei to human readable amount"""
if amount is None:
return 0
return amount / (10**decimals)
def get_position_details(w3, npm_contract, token_id):
"""Get detailed position information"""
try:
position_data = npm_contract.functions.positions(token_id).call()
(nonce, operator, token0_address, token1_address, fee, tickLower, tickUpper,
liquidity, feeGrowthInside0, feeGrowthInside1, tokensOwed0, tokensOwed1) = position_data
# Get token details
token0_contract = w3.eth.contract(address=token0_address, abi=ERC20_ABI)
token1_contract = w3.eth.contract(address=token1_address, abi=ERC20_ABI)
token0_symbol = token0_contract.functions.symbol().call()
token1_symbol = token1_contract.functions.symbol().call()
token0_decimals = token0_contract.functions.decimals().call()
token1_decimals = token1_contract.functions.decimals().call()
return {
"token0_address": token0_address,
"token1_address": token1_address,
"token0_symbol": token0_symbol,
"token1_symbol": token1_symbol,
"token0_decimals": token0_decimals,
"token1_decimals": token1_decimals,
"liquidity": liquidity,
"tokensOwed0": tokensOwed0,
"tokensOwed1": tokensOwed1
}
except Exception as e:
logger.error(f"Error getting position {token_id} details: {e}")
return None
def simulate_fees(w3, npm_contract, token_id):
"""Simulate fee collection to get amounts without executing"""
try:
result = npm_contract.functions.collect(
(token_id, "0x0000000000000000000000000000000000000000", 2**128-1, 2**128-1)
).call()
return result[0], result[1] # amount0, amount1
except Exception as e:
logger.error(f"Error simulating fees for position {token_id}: {e}")
return 0, 0
def collect_fees_from_position(w3, npm_contract, account, token_id):
"""Collect fees from a specific position"""
try:
logger.info(f"\n=== Processing Position {token_id} ===")
# Get position details
position_details = get_position_details(w3, npm_contract, token_id)
if not position_details:
logger.error(f"Could not get details for position {token_id}")
return False
logger.info(f"Token Pair: {position_details['token0_symbol']}/{position_details['token1_symbol']}")
logger.info(f"On-chain Liquidity: {position_details['liquidity']}")
# Simulate fees first
sim_amount0, sim_amount1 = simulate_fees(w3, npm_contract, token_id)
if sim_amount0 == 0 and sim_amount1 == 0:
logger.info(f"No fees available for position {token_id}")
return True
logger.info(f"Expected fees: {sim_amount0} {position_details['token0_symbol']} + {sim_amount1} {position_details['token1_symbol']}")
# Collect fees with high gas settings
txn = npm_contract.functions.collect(
(token_id, account.address, 2**128-1, 2**128-1)
).build_transaction({
'from': account.address,
'nonce': w3.eth.get_transaction_count(account.address),
'gas': 300000, # High gas limit
'maxFeePerGas': w3.eth.gas_price * 4, # 4x gas price
'maxPriorityFeePerGas': w3.eth.max_priority_fee * 3,
'chainId': w3.eth.chain_id
})
# Sign and send
signed_txn = w3.eth.account.sign_transaction(txn, private_key=account.key)
tx_hash = w3.eth.send_raw_transaction(signed_txn.raw_transaction)
logger.info(f"Collect fees sent: {tx_hash.hex()}")
logger.info(f"Arbiscan: https://arbiscan.io/tx/{tx_hash.hex()}")
# Wait with extended timeout
receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=600)
if receipt.status == 1:
logger.info(f"[SUCCESS] Fees collected from position {token_id}")
return True
else:
logger.error(f"[ERROR] Fee collection failed for position {token_id}. Status: {receipt.status}")
return False
except Exception as e:
logger.error(f"[ERROR] Fee collection failed for position {token_id}: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Collect fees from Uniswap V3 positions')
parser.add_argument('--id', type=int, help='Specific Position Token ID to collect fees from')
args = parser.parse_args()
logger.info("=== Fee Collection Script v2 ===")
logger.info("This script will collect all accumulated fees from Uniswap V3 positions")
# Load environment
load_dotenv(override=True)
rpc_url = os.environ.get("MAINNET_RPC_URL")
private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY") or os.environ.get("PRIVATE_KEY")
if not rpc_url or not private_key:
logger.error("[ERROR] Missing RPC URL or Private Key")
logger.error("Please ensure MAINNET_RPC_URL and PRIVATE_KEY are set in your .env file")
return
# Connect to Arbitrum
try:
w3 = Web3(Web3.HTTPProvider(rpc_url))
if not w3.is_connected():
logger.error("[ERROR] Failed to connect to Arbitrum RPC")
return
logger.info(f"[SUCCESS] Connected to Chain ID: {w3.eth.chain_id}")
except Exception as e:
logger.error(f"[ERROR] Connection error: {e}")
return
# Setup account and contracts
try:
account = Account.from_key(private_key)
w3.eth.default_account = account.address
logger.info(f"Wallet: {account.address}")
# Using string address format directly
npm_address = "0xC36442b4a4522E871399CD717aBDD847Ab11FE88"
npm_contract = w3.eth.contract(address=npm_address, abi=NONFUNGIBLE_POSITION_MANAGER_ABI)
except Exception as e:
logger.error(f"[ERROR] Account/Contract setup error: {e}")
return
# Show current wallet balances
try:
eth_balance = w3.eth.get_balance(account.address)
logger.info(f"ETH Balance: {eth_balance / 10**18:.6f} ETH")
# Check token balances using basic addresses
try:
weth_address = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"
weth_contract = w3.eth.contract(address=weth_address, abi=ERC20_ABI)
weth_balance = weth_contract.functions.balanceOf(account.address).call()
logger.info(f"WETH Balance: {weth_balance / 10**18:.6f} WETH")
except:
pass
try:
usdc_address = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831"
usdc_contract = w3.eth.contract(address=usdc_address, abi=ERC20_ABI)
usdc_balance = usdc_contract.functions.balanceOf(account.address).call()
logger.info(f"USDC Balance: {usdc_balance / 10**6:.2f} USDC")
except:
pass
except Exception as e:
logger.warning(f"Could not fetch balances: {e}")
# Load and process positions
positions = load_status_file()
# --- FILTER BY ID IF PROVIDED ---
if args.id:
logger.info(f"🎯 Target Mode: Checking specific Position ID {args.id}")
# Check if it exists in the file
target_pos = next((p for p in positions if p.get('token_id') == args.id), None)
if target_pos:
positions = [target_pos]
else:
logger.warning(f"⚠️ Position {args.id} not found in hedge_status.json")
logger.info("Attempting to collect from it anyway (Manual Override)...")
positions = [{'token_id': args.id, 'status': 'MANUAL_OVERRIDE'}]
if not positions:
logger.info("No positions found to process")
return
logger.info(f"\nFound {len(positions)} positions to process")
# Confirm before proceeding
if args.id:
print(f"\nReady to collect fees from Position {args.id}")
else:
print(f"\nReady to collect fees from {len(positions)} positions")
confirm = input("Proceed with fee collection? (y/N): ").strip().lower()
if confirm != 'y':
logger.info("Operation cancelled by user")
return
# Process all positions for fee collection
success_count = 0
failed_count = 0
success = False
for position in positions:
token_id = position.get('token_id')
status = position.get('status', 'UNKNOWN')
if success:
time.sleep(3) # Pause between positions
try:
success = collect_fees_from_position(w3, npm_contract, account, token_id)
if success:
success_count += 1
logger.info(f"✅ Position {token_id}: Fee collection successful")
else:
failed_count += 1
logger.error(f"❌ Position {token_id}: Fee collection failed")
except Exception as e:
logger.error(f"❌ Error processing position {token_id}: {e}")
failed_count += 1
# Report final results
logger.info(f"\n=== Fee Collection Summary ===")
logger.info(f"Total Positions: {len(positions)}")
logger.info(f"Successful: {success_count}")
logger.info(f"Failed: {failed_count}")
if success_count > 0:
logger.info(f"[SUCCESS] Fee collection completed for {success_count} positions!")
logger.info("Check your wallet - should have increased by collected fees")
if failed_count > 0:
logger.warning(f"[WARNING] {failed_count} positions failed. Check collect_fees.log for details.")
logger.info("=== Fee Collection Script Complete ===")
if __name__ == "__main__":
main()