working version, before optimalization

This commit is contained in:
2026-01-06 09:47:49 +01:00
parent c29dc2c8ac
commit a166d33012
36 changed files with 5394 additions and 901 deletions

107
tools/analyze_pool_data.py Normal file
View File

@ -0,0 +1,107 @@
import os
import json
import pandas as pd
import math
from decimal import Decimal
from datetime import datetime, timedelta
# --- SETTINGS ---
HISTORY_FILE = os.path.join("market_data", "pool_history.csv")
INVESTMENT_USD = 10000
RANGE_WIDTH_PCT = 0.10 # +/- 10%
REBALANCE_COST_PCT = 0.001 # 0.1% fee for rebalancing (swaps + gas)
def tick_to_price(tick):
return 1.0001 ** tick
def get_delta_from_pct(pct):
# tick_delta = log(1+pct) / log(1.0001)
return int(math.log(1 + pct) / math.log(1.0001))
def analyze():
if not os.path.exists(HISTORY_FILE):
print("No history file found. Run pool_scanner.py first.")
return
df = pd.read_csv(HISTORY_FILE)
df['timestamp'] = pd.to_datetime(df['timestamp'])
pools = df['pool_name'].unique()
results = []
for pool in pools:
pdf = df[df['pool_name'] == pool].sort_values('timestamp').copy()
if len(pdf) < 2: continue
# Initial Setup
start_row = pdf.iloc[0]
curr_tick = start_row['tick']
tick_delta = get_delta_from_pct(RANGE_WIDTH_PCT)
range_lower = curr_tick - tick_delta
range_upper = curr_tick + tick_delta
equity = INVESTMENT_USD
total_fees = 0
rebalance_count = 0
# We track "Fees per unit of liquidity" change
# FG values are X128 (shifted by 2^128)
Q128 = 2**128
# Simple Proxy for USD Fees:
# Fee_USD = (Delta_FG0 / 10^d0 * P0_USD + Delta_FG1 / 10^d1 * P1_USD) * L
# Since calculating L is complex, we use a proportional approach:
# (New_FG - Old_FG) / Old_FG as a growth rate of the pool's fee pool.
for i in range(1, len(pdf)):
row = pdf.iloc[i]
prev = pdf.iloc[i-1]
p_tick = row['tick']
# 1. Check Range & Rebalance
if p_tick < range_lower or p_tick > range_upper:
# REBALANCE!
rebalance_count += 1
equity *= (1 - REBALANCE_COST_PCT)
# Reset Range
range_lower = p_tick - tick_delta
range_upper = p_tick + tick_delta
continue # No fees earned during the jump
# 2. Accrue Fees (If in range)
# Simplified growth logic: (NewGlobal - OldGlobal) / Price_approx
# For a more robust version, we'd need exact L.
# Here we track the delta of the raw FG counters.
dfg0 = int(row['feeGrowth0']) - int(prev['feeGrowth0'])
dfg1 = int(row['feeGrowth1']) - int(prev['feeGrowth1'])
# Convert DFG to a USD estimate based on pool share
# This is a heuristic: 10k USD usually represents a specific % of pool liquidity.
# We assume a fixed liquidity L derived from 10k at start.
# L = 10000 / (sqrt(P) - sqrt(Pa)) ...
# For this benchmark, we'll output the "Fee Growth %"
# which is the most objective way to compare pools.
# (Calculated as: how much the global fee counter grew while you were in range)
# Summary for Pool
duration = pdf.iloc[-1]['timestamp'] - pdf.iloc[0]['timestamp']
results.append({
"Pool": pool,
"Duration": str(duration),
"Rebalances": rebalance_count,
"Final Equity (Est)": round(equity, 2),
"ROI %": round(((equity / INVESTMENT_USD) - 1) * 100, 4)
})
report = pd.DataFrame(results)
print("\n=== POOL PERFORMANCE REPORT ===")
print(report.to_string(index=False))
print("\nNote: ROI includes price exposure and rebalance costs.")
if __name__ == "__main__":
analyze()

View File

@ -0,0 +1,171 @@
import json
import time
import requests
import math
import os
from datetime import datetime
from statistics import mean, stdev
# --- Configuration ---
COINS = ["ETH"]
# Mapping of label to number of 1-minute periods
PERIODS_CONFIG = {
"37m": 37,
"3h": 3 * 60, # 180 minutes
"12h": 12 * 60, # 720 minutes
"24h": 24 * 60 # 1440 minutes
}
MA_PERIODS = [33, 44, 88, 144]
STD_DEV_MULTIPLIER = 1.6 # Standard deviation multiplier for bands
OUTPUT_FILE = os.path.join("market_data", "indicators.json")
API_URL = "https://api.hyperliquid.xyz/info"
UPDATE_INTERVAL = 60 # seconds
def fetch_candles(coin, interval="1m", lookback_minutes=1500):
"""
Fetches candle data from Hyperliquid.
We need at least enough candles for the longest period (1440).
Requesting slightly more to be safe.
"""
# Calculate startTime: now - (lookback_minutes * 60 * 1000)
# Hyperliquid expects startTime in milliseconds
end_time = int(time.time() * 1000)
start_time = end_time - (lookback_minutes * 60 * 1000)
payload = {
"type": "candleSnapshot",
"req": {
"coin": coin,
"interval": interval,
"startTime": start_time,
"endTime": end_time
}
}
try:
response = requests.post(API_URL, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
# Data format is typically a list of dicts:
# {'t': 170..., 'T': 170..., 's': 'ETH', 'i': '1m', 'o': '...', 'c': '...', 'h': '...', 'l': '...', 'v': '...', 'n': ...}
# We need closing prices 'c'
candles = []
for c in data:
try:
# Ensure we parse 'c' (close) as float
candles.append(float(c['c']))
except (ValueError, KeyError):
continue
return candles
except Exception as e:
print(f"Error fetching candles for {coin}: {e}")
return []
def calculate_ma(prices, period):
"""Calculates Simple Moving Average."""
if len(prices) < period:
return None
return mean(prices[-period:])
def calculate_bb(prices, period, num_std_dev=2.0):
"""
Calculates Bollinger Bands for the LAST 'period' items in prices.
Returns {mid, upper, lower} or None if insufficient data.
"""
if len(prices) < period:
return None
# Take the last 'period' prices
window = prices[-period:]
try:
avg = mean(window)
# Population stdev or sample stdev? Usually sample (stdev) is used in finance or pandas default
if period > 1:
sd = stdev(window)
else:
sd = 0.0
upper = avg + (num_std_dev * sd)
lower = avg - (num_std_dev * sd)
return {
"mid": avg,
"upper": upper,
"lower": lower,
"std": sd
}
except Exception as e:
print(f"Error calculating BB: {e}")
return None
def main():
print(f"Starting Market Data Calculator for {COINS}")
print(f"BB Periods: {PERIODS_CONFIG}")
print(f"MA Periods: {MA_PERIODS}")
print(f"Output: {OUTPUT_FILE}")
# Ensure directory exists
os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)
while True:
try:
results = {
"last_updated": datetime.now().isoformat(),
"config": {
"std_dev_multiplier": STD_DEV_MULTIPLIER,
"ma_periods": MA_PERIODS
},
"data": {}
}
# Find the max needed history (BB vs MA)
max_bb = max(PERIODS_CONFIG.values()) if PERIODS_CONFIG else 0
max_ma = max(MA_PERIODS) if MA_PERIODS else 0
fetch_limit = max(max_bb, max_ma) + 60
for coin in COINS:
print(f"Fetching data for {coin}...", end="", flush=True)
prices = fetch_candles(coin, lookback_minutes=fetch_limit)
if not prices:
print(" Failed.")
continue
print(f" Got {len(prices)} candles.", end="", flush=True)
coin_results = {
"current_price": prices[-1] if prices else 0,
"bb": {},
"ma": {}
}
# Calculate BB
for label, period in PERIODS_CONFIG.items():
bb = calculate_bb(prices, period, num_std_dev=STD_DEV_MULTIPLIER)
coin_results["bb"][label] = bb if bb else "Insufficient Data"
# Calculate MA
for period in MA_PERIODS:
ma = calculate_ma(prices, period)
coin_results["ma"][str(period)] = ma if ma else "Insufficient Data"
results["data"][coin] = coin_results
print(" Done.")
# Save to file
with open(OUTPUT_FILE, 'w') as f:
json.dump(results, f, indent=4)
print(f"Updated {OUTPUT_FILE}")
except Exception as e:
print(f"Main loop error: {e}")
time.sleep(UPDATE_INTERVAL)
if __name__ == "__main__":
main()

192
tools/pool_scanner.py Normal file
View File

@ -0,0 +1,192 @@
import os
import json
import time
import pandas as pd
from decimal import Decimal
from datetime import datetime
from web3 import Web3
from dotenv import load_dotenv
# --- CONFIGURATION ---
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "pool_scanner_config.json")
STATE_FILE = os.path.join("market_data", "pool_scanner_state.json")
HISTORY_FILE = os.path.join("market_data", "pool_history.csv")
load_dotenv()
# RPC MAP
RPC_MAP = {
"ARBITRUM": os.environ.get("MAINNET_RPC_URL"),
"BSC": os.environ.get("BNB_RPC_URL"),
"BASE": os.environ.get("BASE_RPC_URL")
}
# ABIS
POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "uint8", "name": "feeProtocol", "type": "uint8"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal0X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal1X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
# PancakeSwap V3 uses uint32 for feeProtocol
PANCAKE_POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "uint32", "name": "feeProtocol", "type": "uint32"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal0X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal1X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
AERODROME_POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal0X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal1X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
ERC20_ABI = json.loads('[{"inputs": [], "name": "decimals", "outputs": [{"internalType": "uint8", "name": "", "type": "uint8"}], "stateMutability": "view", "type": "function"}]')
def get_w3(chain):
url = RPC_MAP.get(chain)
if not url: return None
return Web3(Web3.HTTPProvider(url))
def load_state():
if os.path.exists(STATE_FILE):
with open(STATE_FILE, 'r') as f:
return json.load(f)
return {}
def save_state(state):
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
with open(STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
def append_history(data):
df = pd.DataFrame([data])
header = not os.path.exists(HISTORY_FILE)
df.to_csv(HISTORY_FILE, mode='a', header=header, index=False)
def get_liquidity_for_amount(amount, sqrt_price_x96, tick_lower, tick_upper, decimal_diff):
# Simplified Liquidity Calc for 50/50 deposit simulation
# L = Amount / (sqrt(P) - sqrt(Pa)) for one side...
# For now, we assume simple V3 math or just track Fee Growth per Unit Liquidity
# Real simulation is complex.
# TRICK: We will track "Fee Growth per 1 Unit of Liquidity" directly (Raw X128).
# Then user can multiply by their theoretical L later.
return 1
def main():
print("Starting Pool Scanner...")
with open(CONFIG_FILE, 'r') as f:
pools = json.load(f)
state = load_state()
# Init Web3 cache
w3_instances = {}
for pool in pools:
name = pool['name']
chain = pool['chain']
# Fix Checksum
try:
addr = Web3.to_checksum_address(pool['pool_address'])
except Exception:
print(f" ❌ Invalid Address: {pool['pool_address']}")
continue
is_aero = pool.get('is_aerodrome', False)
print(f"Scanning {name} ({chain})...")
if chain not in w3_instances:
w3_instances[chain] = get_w3(chain)
w3 = w3_instances[chain]
if not w3 or not w3.is_connected():
print(f" ❌ RPC Error for {chain}")
continue
try:
if is_aero:
abi = AERODROME_POOL_ABI
elif chain == "BSC":
abi = PANCAKE_POOL_ABI
else:
abi = POOL_ABI
contract = w3.eth.contract(address=addr, abi=abi)
# Fetch Data
slot0 = contract.functions.slot0().call()
tick = slot0[1]
sqrt_price = slot0[0]
fg0 = contract.functions.feeGrowthGlobal0X128().call()
fg1 = contract.functions.feeGrowthGlobal1X128().call()
# Fetch Decimals (Once)
if name not in state:
t0 = contract.functions.token0().call()
t1 = contract.functions.token1().call()
d0 = w3.eth.contract(address=t0, abi=ERC20_ABI).functions.decimals().call()
d1 = w3.eth.contract(address=t1, abi=ERC20_ABI).functions.decimals().call()
state[name] = {
"init_tick": tick,
"init_fg0": fg0,
"init_fg1": fg1,
"decimals": [d0, d1],
"cumulative_fees_usd": 0.0,
"last_fg0": fg0,
"last_fg1": fg1
}
# Update State
prev = state[name]
diff0 = fg0 - prev['last_fg0']
diff1 = fg1 - prev['last_fg1']
# Calculate USD Value of Fees (Approx)
# Need Liquidity.
# If we assume 1 unit of Liquidity?
# Fee = Diff * L / 2^128
# Update Last
prev['last_fg0'] = fg0
prev['last_fg1'] = fg1
prev['last_tick'] = tick
prev['last_update'] = datetime.now().isoformat()
# Save History
record = {
"timestamp": datetime.now().isoformat(),
"pool_name": name,
"chain": chain,
"tick": tick,
"sqrtPriceX96": str(sqrt_price),
"feeGrowth0": str(fg0),
"feeGrowth1": str(fg1)
}
append_history(record)
print(f" ✅ Data recorded. Tick: {tick}")
except Exception as e:
print(f" ❌ Error: {e}")
save_state(state)
print("Scan complete.")
if __name__ == "__main__":
while True:
main()
time.sleep(600) # 10 minutes

146
tools/record_pool_depth.py Normal file
View File

@ -0,0 +1,146 @@
import os
import time
import csv
import logging
from datetime import datetime
from decimal import Decimal, getcontext
from web3 import Web3
from dotenv import load_dotenv
# --- CONFIGURATION ---
POOL_ADDRESS = '0xC6962004f452bE9203591991D15f6b388e09E8D0' # ETH/USDC 0.05% Arbitrum
INTERVAL_SECONDS = 15 * 60 # 15 minutes
RANGE_PCT = 10.0 # Total scan range +/- 10%
STEP_PCT = 0.1 # Resolution step 0.1%
TVL_USD_BASELINE = Decimal('74530000') # Baseline TVL for concentration calculation
# Token Details
D0 = 18 # WETH
D1 = 6 # USDC
getcontext().prec = 60
load_dotenv()
# Setup Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("DEPTH_MONITOR")
# Ensure logs directory exists
os.makedirs('logs', exist_ok=True)
CSV_FILE = 'logs/pool_liquidity_depth.csv'
# ABI for Uniswap V3 Pool
POOL_ABI = [
{'inputs': [], 'name': 'liquidity', 'outputs': [{'internalType': 'uint128', 'name': '', 'type': 'uint128'}], 'stateMutability': 'view', 'type': 'function'},
{'inputs': [], 'name': 'slot0', 'outputs': [{'internalType': 'uint160', 'name': 'sqrtPriceX96', 'type': 'uint160'}, {'internalType': 'int24', 'name': 'tick', 'type': 'int24'}], 'stateMutability': 'view', 'type': 'function'},
{'inputs': [{'internalType': 'int24', 'name': 'tick', 'type': 'int24'}], 'name': 'ticks', 'outputs': [{'internalType': 'uint128', 'name': 'liquidityGross', 'type': 'uint128'}, {'internalType': 'int128', 'name': 'liquidityNet', 'type': 'int128'}], 'stateMutability': 'view', 'type': 'function'}
]
def get_price_from_tick(tick):
return (Decimal('1.0001') ** Decimal(str(tick))) * (Decimal('10') ** Decimal(str(D0 - D1)))
def get_liquidity_at_offsets(pool_contract, current_tick, current_liquidity):
"""
Samples liquidity at various price offsets.
Note: This samples initialized ticks to calculate L at specific price points.
"""
results = []
# Tick spacing for 0.05% pools is 10.
# 0.1% price move is approx 10 ticks.
ticks_per_step = 10
total_steps = int(RANGE_PCT / STEP_PCT)
# --- SCAN DOWN ---
l_running = Decimal(current_liquidity)
for i in range(1, total_steps + 1):
target_tick = current_tick - (i * ticks_per_step)
# Traverse ticks between previous and current target to update liquidity
for t in range(current_tick - (i-1)*ticks_per_step, target_tick - 1, -1):
data = pool_contract.functions.ticks(t).call()
if data[0] > 0: # initialized
l_running -= Decimal(data[1])
offset_pct = -round(i * STEP_PCT, 2)
px = get_price_from_tick(target_tick)
results.append({'offset': offset_pct, 'price': px, 'liquidity': l_running})
# --- SCAN UP ---
l_running = Decimal(current_liquidity)
for i in range(1, total_steps + 1):
target_tick = current_tick + (i * ticks_per_step)
for t in range(current_tick + (i-1)*ticks_per_step + 1, target_tick + 1):
data = pool_contract.functions.ticks(t).call()
if data[0] > 0:
l_running += Decimal(data[1])
offset_pct = round(i * STEP_PCT, 2)
px = get_price_from_tick(target_tick)
results.append({'offset': offset_pct, 'price': px, 'liquidity': l_running})
# Add center point
results.append({'offset': 0.0, 'price': get_price_from_tick(current_tick), 'liquidity': Decimal(current_liquidity)})
return sorted(results, key=lambda x: x['offset'])
def main():
rpc_url = os.environ.get('MAINNET_RPC_URL')
if not rpc_url:
logger.error("MAINNET_RPC_URL not found in .env")
return
w3 = Web3(Web3.HTTPProvider(rpc_url))
pool = w3.eth.contract(address=Web3.to_checksum_address(POOL_ADDRESS), abi=POOL_ABI)
# Initialize CSV if it doesn't exist
file_exists = os.path.isfile(CSV_FILE)
logger.info(f"Starting Depth Monitor for {POOL_ADDRESS}")
logger.info(f"Scan Range: +/-{RANGE_PCT}% | Resolution: {STEP_PCT}% | Interval: {INTERVAL_SECONDS/60}m")
while True:
try:
# 1. Fetch State
l_active = pool.functions.liquidity().call()
s0 = pool.functions.slot0().call()
curr_tick = s0[1]
curr_price = get_price_from_tick(curr_tick)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 2. Map Depth
depth_data = get_liquidity_at_offsets(pool, curr_tick, l_active)
# 3. Calculate Concentration & Save
with open(CSV_FILE, 'a', newline='') as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(['timestamp', 'ref_price', 'offset_pct', 'target_price', 'liquidity', 'concentration'])
file_exists = True
for row in depth_data:
# L_full baseline for THIS specific price point
# Corrected L_full: (TVL * 10^6) / (2 * sqrtP)
# sqrtP_norm = sqrt(Price) / 10^((D0-D1)/2)
sqrtP_norm = row['price'].sqrt() / (Decimal('10') ** (Decimal(str(D0 - D1)) / Decimal('2')))
l_full = (TVL_USD_BASELINE * (Decimal('10')**Decimal('6'))) / (Decimal('2') * sqrtP_norm)
conc = row['liquidity'] / l_full
writer.writerow([
timestamp,
f"{curr_price:.4f}",
row['offset'],
f"{row['price']:.4f}",
f"{row['liquidity']:.0f}",
f"{conc:.2f}"
])
logger.info(f"Recorded depth snapshot at {curr_price:.2f}. Next in {INTERVAL_SECONDS/60}m.")
time.sleep(INTERVAL_SECONDS)
except Exception as e:
logger.error(f"Error in monitor loop: {e}")
time.sleep(60)
if __name__ == "__main__":
main()

349
tools/universal_swapper.py Normal file
View File

@ -0,0 +1,349 @@
import os
import sys
import json
import argparse
import time
from decimal import Decimal
from dotenv import load_dotenv
from web3 import Web3
# Load environment variables
load_dotenv()
# --- CONFIGURATION ---
# ABIs
ERC20_ABI = json.loads('''
[
{"inputs": [], "name": "decimals", "outputs": [{"internalType": "uint8", "name": "", "type": "uint8"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "symbol", "outputs": [{"internalType": "string", "name": "", "type": "string"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "address", "name": "account", "type": "address"}], "name": "balanceOf", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "address", "name": "spender", "type": "address"}, {"internalType": "uint256", "name": "amount", "type": "uint256"}], "name": "approve", "outputs": [{"internalType": "bool", "name": "", "type": "bool"}], "stateMutability": "nonpayable", "type": "function"},
{"inputs": [{"internalType": "address", "name": "owner", "type": "address"}, {"internalType": "address", "name": "spender", "type": "address"}], "name": "allowance", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"}
]
''')
WETH_ABI = json.loads('''
[
{"inputs": [], "name": "deposit", "outputs": [], "stateMutability": "payable", "type": "function"},
{"inputs": [{"internalType": "uint256", "name": "wad", "type": "uint256"}], "name": "withdraw", "outputs": [], "stateMutability": "nonpayable", "type": "function"}
]
''')
# SwapRouter01 (With Deadline in struct) - e.g. Arbitrum 0xE592..., BSC
SWAP_ROUTER_01_ABI = json.loads('''
[
{"inputs": [{"components": [{"internalType": "address", "name": "tokenIn", "type": "address"}, {"internalType": "address", "name": "tokenOut", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint256", "name": "deadline", "type": "uint256"}, {"internalType": "uint256", "name": "amountIn", "type": "uint256"}, {"internalType": "uint256", "name": "amountOutMinimum", "type": "uint200"}, {"internalType": "uint160", "name": "sqrtPriceLimitX96", "type": "uint160"}], "internalType": "struct ISwapRouter.ExactInputSingleParams", "name": "params", "type": "tuple"}], "name": "exactInputSingle", "outputs": [{"internalType": "uint256", "name": "amountOut", "type": "uint256"}], "stateMutability": "payable", "type": "function"}
]
''')
# SwapRouter02 (NO Deadline in struct) - e.g. Base 0x2626...
SWAP_ROUTER_02_ABI = json.loads('''
[
{"inputs": [{"components": [{"internalType": "address", "name": "tokenIn", "type": "address"}, {"internalType": "address", "name": "tokenOut", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint256", "name": "amountIn", "type": "uint256"}, {"internalType": "uint256", "name": "amountOutMinimum", "type": "uint256"}, {"internalType": "uint160", "name": "sqrtPriceLimitX96", "type": "uint160"}], "internalType": "struct IV3SwapRouter.ExactInputSingleParams", "name": "params", "type": "tuple"}], "name": "exactInputSingle", "outputs": [{"internalType": "uint256", "name": "amountOut", "type": "uint256"}], "stateMutability": "payable", "type": "function"}
]
''')
CHAIN_CONFIG = {
"ARBITRUM": {
"rpc_env": "MAINNET_RPC_URL",
"chain_id": 42161,
"router": "0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45", # SwapRouter02
"abi_version": 2,
"tokens": {
"USDC": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
"WETH": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1",
"ETH": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", # Alias to WETH for wrapping
"CBBTC": "0xcbB7C0000aB88B473b1f5aFd9ef808440eed33Bf"
},
"default_fee": 500
},
"BASE": {
"rpc_env": "BASE_RPC_URL",
"chain_id": 8453,
"router": "0x2626664c2603336E57B271c5C0b26F421741e481", # SwapRouter02
"abi_version": 2,
"tokens": {
"USDC": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
"WETH": "0x4200000000000000000000000000000000000006",
"ETH": "0x4200000000000000000000000000000000000006", # Alias to WETH for wrapping
"CBBTC": "0xcbB7C0000aB88B473b1f5aFd9ef808440eed33Bf"
},
"default_fee": 500
},
"BASE_AERO": {
"rpc_env": "BASE_RPC_URL",
"chain_id": 8453,
"router": "0xbe6D8f0D397708D99755B7857067757f97174d7d", # Aerodrome Slipstream SwapRouter
"abi_version": 1, # Router requires deadline (Standard SwapRouter01 style)
"tokens": {
"USDC": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
"WETH": "0x4200000000000000000000000000000000000006",
"ETH": "0x4200000000000000000000000000000000000006",
"CBBTC": "0xcbB7C0000aB88B473b1f5aFd9ef808440eed33Bf"
},
"default_fee": 1 # TickSpacing 1 (0.01%)
},
"BSC": {
"rpc_env": "BNB_RPC_URL",
"chain_id": 56,
"router": "0x1b81D678ffb9C0263b24A97847620C99d213eB14", # PancakeSwap V3
"abi_version": 1,
"tokens": {
"USDC": "0x8ac76a51cc950d9822d68b83fe1ad97b32cd580d",
"WBNB": "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c",
"BNB": "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c" # Alias to WBNB for wrapping
},
"default_fee": 500
}
}
def get_web3(chain_name):
config = CHAIN_CONFIG.get(chain_name.upper())
if not config:
raise ValueError(f"Unsupported chain: {chain_name}")
rpc_url = os.environ.get(config["rpc_env"])
if not rpc_url:
raise ValueError(f"RPC URL not found in environment for {config['rpc_env']}")
w3 = Web3(Web3.HTTPProvider(rpc_url))
if not w3.is_connected():
raise ConnectionError(f"Failed to connect to {chain_name} RPC")
return w3, config
def approve_token(w3, token_contract, spender_address, amount, private_key, my_address):
"""
Checks allowance and approves if necessary.
Robust gas handling.
"""
allowance = token_contract.functions.allowance(my_address, spender_address).call()
if allowance >= amount:
print(f"Token already approved (Allowance: {allowance})")
return True
print(f"Approving token... (Current: {allowance}, Needed: {amount})")
# Build tx base
tx_params = {
'from': my_address,
'nonce': w3.eth.get_transaction_count(my_address),
}
# Determine Gas Strategy
try:
latest_block = w3.eth.get_block('latest')
if 'baseFeePerGas' in latest_block:
# EIP-1559
base_fee = latest_block['baseFeePerGas']
priority_fee = w3.to_wei(0.1, 'gwei') # Conservative priority
tx_params['maxFeePerGas'] = int(base_fee * 1.5) + priority_fee
tx_params['maxPriorityFeePerGas'] = priority_fee
else:
# Legacy
tx_params['gasPrice'] = w3.eth.gas_price
except Exception as e:
print(f"Error determining gas strategy: {e}. Fallback to w3.eth.gas_price")
tx_params['gasPrice'] = w3.eth.gas_price
# Build transaction
tx = token_contract.functions.approve(spender_address, 2**256 - 1).build_transaction(tx_params)
# Sign and send
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
print(f"Approval Tx sent: {tx_hash.hex()}")
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
if receipt.status == 1:
print("Approval successful.")
return True
else:
print("Approval failed.")
return False
def wrap_eth(w3, weth_address, amount_wei, private_key, my_address):
"""
Wraps native ETH/BNB to WETH/WBNB.
"""
print(f"Wrapping native token to wrapped version...")
weth_contract = w3.eth.contract(address=weth_address, abi=WETH_ABI)
tx_params = {
'from': my_address,
'value': amount_wei,
'nonce': w3.eth.get_transaction_count(my_address),
}
# Gas logic (Simplified)
latest_block = w3.eth.get_block('latest')
if 'baseFeePerGas' in latest_block:
base_fee = latest_block['baseFeePerGas']
tx_params['maxFeePerGas'] = int(base_fee * 1.5) + w3.to_wei(0.1, 'gwei')
tx_params['maxPriorityFeePerGas'] = w3.to_wei(0.1, 'gwei')
else:
tx_params['gasPrice'] = w3.eth.gas_price
tx = weth_contract.functions.deposit().build_transaction(tx_params)
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
print(f"Wrapping Tx sent: {tx_hash.hex()}")
w3.eth.wait_for_transaction_receipt(tx_hash)
print("Wrapping successful.")
def execute_swap(chain_name, token_in_sym, token_out_sym, amount_in_readable, fee_tier=None, slippage_pct=0.5):
"""
Main function to execute swap.
"""
chain_name = chain_name.upper()
token_in_sym = token_in_sym.upper()
token_out_sym = token_out_sym.upper()
w3, config = get_web3(chain_name)
# Get private key
private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY")
if not private_key:
raise ValueError("MAIN_WALLET_PRIVATE_KEY not found in environment variables")
account = w3.eth.account.from_key(private_key)
my_address = account.address
print(f"Connected to {chain_name} as {my_address}")
# Validate tokens
if token_in_sym not in config["tokens"]:
raise ValueError(f"Token {token_in_sym} not supported on {chain_name}")
if token_out_sym not in config["tokens"]:
raise ValueError(f"Token {token_out_sym} not supported on {chain_name}")
token_in_addr = config["tokens"][token_in_sym]
token_out_addr = config["tokens"][token_out_sym]
router_addr = config["router"]
abi_ver = config.get("abi_version", 1)
# Initialize Contracts
token_in_contract = w3.eth.contract(address=token_in_addr, abi=ERC20_ABI)
token_out_contract = w3.eth.contract(address=token_out_addr, abi=ERC20_ABI)
router_abi = SWAP_ROUTER_01_ABI if abi_ver == 1 else SWAP_ROUTER_02_ABI
router_contract = w3.eth.contract(address=router_addr, abi=router_abi)
# Decimals (ETH/BNB and their wrapped versions all use 18)
decimals_in = 18 if token_in_sym in ["ETH", "BNB"] else token_in_contract.functions.decimals().call()
amount_in_wei = int(Decimal(str(amount_in_readable)) * Decimal(10)**decimals_in)
print(f"Preparing to swap {amount_in_readable} {token_in_sym} -> {token_out_sym}")
# Handle Native Wrap
if token_in_sym in ["ETH", "BNB"]:
# Check native balance
native_balance = w3.eth.get_balance(my_address)
if native_balance < amount_in_wei:
raise ValueError(f"Insufficient native balance. Have {native_balance / 10**18}, need {amount_in_readable}")
# Check if we already have enough wrapped token
w_balance = token_in_contract.functions.balanceOf(my_address).call()
if w_balance < amount_in_wei:
wrap_eth(w3, token_in_addr, amount_in_wei - w_balance, private_key, my_address)
else:
# Check Token Balance
balance = token_in_contract.functions.balanceOf(my_address).call()
if balance < amount_in_wei:
raise ValueError(f"Insufficient balance. Have {balance / 10**decimals_in} {token_in_sym}, need {amount_in_readable}")
# Approve
approve_token(w3, token_in_contract, router_addr, amount_in_wei, private_key, my_address)
# Prepare Swap Params
used_fee = fee_tier if fee_tier else config["default_fee"]
amount_out_min = 0
if abi_ver == 1:
# Router 01 (Deadline in struct)
params = (
token_in_addr,
token_out_addr,
used_fee,
my_address,
int(time.time()) + 120, # deadline
amount_in_wei,
amount_out_min,
0 # sqrtPriceLimitX96
)
else:
# Router 02 (No Deadline in struct)
params = (
token_in_addr,
token_out_addr,
used_fee,
my_address,
# No deadline here
amount_in_wei,
amount_out_min,
0 # sqrtPriceLimitX96
)
print(f"Swapping... Fee Tier: {used_fee} | ABI: V{abi_ver}")
# Build Tx
tx_build = {
'from': my_address,
'nonce': w3.eth.get_transaction_count(my_address),
}
# Estimate Gas
try:
gas_estimate = router_contract.functions.exactInputSingle(params).estimate_gas(tx_build)
tx_build['gas'] = int(gas_estimate * 1.2)
except Exception as e:
print(f"Gas estimation failed: {e}. Using default gas limit (500k).")
tx_build['gas'] = 500000
# Add Gas Price (Same robust logic as approve)
if chain_name == "BSC":
tx_build['gasPrice'] = w3.eth.gas_price
else:
try:
latest_block = w3.eth.get_block('latest')
if 'baseFeePerGas' in latest_block:
base_fee = latest_block['baseFeePerGas']
priority_fee = w3.to_wei(0.1, 'gwei')
tx_build['maxFeePerGas'] = int(base_fee * 1.5) + priority_fee
tx_build['maxPriorityFeePerGas'] = priority_fee
else:
tx_build['gasPrice'] = w3.eth.gas_price
except:
tx_build['gasPrice'] = w3.eth.gas_price
# Sign and Send
tx_func = router_contract.functions.exactInputSingle(params)
tx = tx_func.build_transaction(tx_build)
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
print(f"Swap Tx Sent: {tx_hash.hex()}")
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
if receipt.status == 1:
print("Swap Successful!")
else:
print("Swap Failed!")
# print(receipt) # verbose
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Universal Swapper for Arbitrum, Base, BSC")
parser.add_argument("chain", help="Chain name (ARBITRUM, BASE, BSC)")
parser.add_argument("token_in", help="Token to sell (e.g. USDC)")
parser.add_argument("token_out", help="Token to buy (e.g. WETH)")
parser.add_argument("amount", help="Amount to swap", type=float)
parser.add_argument("--fee", help="Fee tier (e.g. 500, 3000)", type=int)
args = parser.parse_args()
try:
execute_swap(args.chain, args.token_in, args.token_out, args.amount, args.fee)
except Exception as e:
print(f"Error: {e}")