# File: uniswap_auto_clp/florida/tools/pool_scanner.py
# (File-viewer metadata captured with this copy: 193 lines, 8.9 KiB, Python)

import os
import json
import time
import pandas as pd
from decimal import Decimal
from datetime import datetime
from web3 import Web3
from dotenv import load_dotenv
# --- CONFIGURATION ---
# The pool list is read from a JSON file located beside this script.
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "pool_scanner_config.json")
# State and history paths are relative, so they resolve against the
# process's current working directory, not the script's directory.
STATE_FILE = os.path.join("market_data", "pool_scanner_state.json")
HISTORY_FILE = os.path.join("market_data", "pool_history.csv")

# Must run before RPC_MAP is built, because RPC_MAP reads os.environ.
load_dotenv()

# RPC MAP
# Chain label -> RPC endpoint URL; a value is None when its variable is unset.
# NOTE(review): "ARBITRUM" reads MAINNET_RPC_URL -- confirm that variable
# really holds an Arbitrum endpoint.
RPC_MAP = {
    chain_label: os.environ.get(env_var)
    for chain_label, env_var in (
        ("ARBITRUM", "MAINNET_RPC_URL"),
        ("BSC", "BNB_RPC_URL"),
        ("BASE", "BASE_RPC_URL"),
    )
}
# ABIS
# Each pool ABI below declares only the five view functions this scanner
# calls: slot0, feeGrowthGlobal0X128, feeGrowthGlobal1X128, token0, token1.
# The three variants differ only in the outputs declared for slot0.
# POOL_ABI is the fallback main() uses when a pool is neither flagged
# "is_aerodrome" nor on the "BSC" chain; its slot0 declares feeProtocol as uint8.
POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "uint8", "name": "feeProtocol", "type": "uint8"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal0X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal1X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
# PancakeSwap V3 uses uint32 for feeProtocol
# (that is the only difference from POOL_ABI). main() selects this ABI when
# the pool's chain is "BSC" and the pool is not flagged "is_aerodrome".
PANCAKE_POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "uint32", "name": "feeProtocol", "type": "uint32"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal0X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal1X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
# Aerodrome variant: slot0 declares six outputs and has no feeProtocol field.
# main() selects this ABI when the pool config sets "is_aerodrome" to a truthy value.
AERODROME_POOL_ABI = json.loads('''
[
{"inputs": [], "name": "slot0", "outputs": [{"internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"}, {"internalType": "int24", "name": "tick", "type": "int24"}, {"internalType": "uint16", "name": "observationIndex", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinality", "type": "uint16"}, {"internalType": "uint16", "name": "observationCardinalityNext", "type": "uint16"}, {"internalType": "bool", "name": "unlocked", "type": "bool"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal0X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "feeGrowthGlobal1X128", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token0", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "token1", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]
''')
# Minimal ERC-20 ABI exposing only decimals(); main() uses it to read the
# decimals of a pool's token0 and token1 the first time the pool is seen.
ERC20_ABI = json.loads('[{"inputs": [], "name": "decimals", "outputs": [{"internalType": "uint8", "name": "", "type": "uint8"}], "stateMutability": "view", "type": "function"}]')
def get_w3(chain):
    """Build a Web3 HTTP client for ``chain``.

    Args:
        chain: Key into ``RPC_MAP`` (e.g. "ARBITRUM", "BSC", "BASE").

    Returns:
        A ``Web3`` instance backed by an ``HTTPProvider`` for the configured
        URL, or ``None`` when the chain is unknown or its URL is unset/empty.
    """
    rpc_url = RPC_MAP.get(chain)
    if rpc_url:
        provider = Web3.HTTPProvider(rpc_url)
        return Web3(provider)
    return None
def load_state():
    """Load the persisted scanner state from ``STATE_FILE``.

    Returns:
        The decoded JSON object as a dict. An empty dict is returned when the
        file does not exist, or when it cannot be used (invalid JSON,
        undecodable bytes, or a top-level value that is not an object).

    An unusable file is moved aside to ``STATE_FILE + ".corrupt"`` (best
    effort) and a warning is printed, so a damaged file cannot crash the
    long-running scan loop and its contents stay available for inspection.
    """
    try:
        # EAFP: open directly instead of exists()+open(), which can race.
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        problem = f"invalid JSON ({exc})"
    else:
        if isinstance(loaded, dict):
            return loaded
        # main() indexes the state by pool name, so anything but a dict is unusable.
        problem = f"expected a JSON object, got {type(loaded).__name__}"

    try:
        os.replace(STATE_FILE, STATE_FILE + ".corrupt")
    except OSError:
        # Could not move it aside; the next save will simply overwrite it.
        pass
    print(f"  ⚠️ State file unusable: {problem}. Starting with empty state.")
    return {}
def save_state(state):
    """Persist ``state`` to ``STATE_FILE`` as indented JSON.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the real file with ``os.replace``. If serialization fails or the process
    is interrupted mid-write, the previous state file is therefore left
    intact instead of being truncated.

    Args:
        state: JSON-serializable mapping of pool name -> per-pool state.

    Raises:
        TypeError / ValueError: if ``state`` is not JSON-serializable.
        OSError: if the directory or file cannot be written.
    """
    directory = os.path.dirname(STATE_FILE)
    if directory:
        # os.makedirs('') raises, so only create a directory when there is one.
        os.makedirs(directory, exist_ok=True)

    tmp_path = f"{STATE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, STATE_FILE)
    except BaseException:
        # Do not leave a half-written temp file behind; then re-raise.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
def append_history(data):
    """Append one record as a row to the CSV at ``HISTORY_FILE``.

    The parent directory is created if needed, so the first row of a fresh
    run is not lost to a missing directory. The header row is written only
    when the file is missing or empty.

    Args:
        data: Mapping of column name -> value for a single row.

    Raises:
        OSError: if the directory or file cannot be written.
    """
    directory = os.path.dirname(HISTORY_FILE)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # An existing zero-byte file still needs its header.
    needs_header = (
        not os.path.exists(HISTORY_FILE) or os.path.getsize(HISTORY_FILE) == 0
    )
    row = pd.DataFrame([data])
    row.to_csv(HISTORY_FILE, mode='a', header=needs_header, index=False)
def get_liquidity_for_amount(amount, sqrt_price_x96, tick_lower, tick_upper, decimal_diff):
    """Placeholder for a 50/50 deposit liquidity simulation; always returns 1.

    All five arguments are currently ignored. A real V3 liquidity calculation
    (roughly L = Amount / (sqrt(P) - sqrt(Pa)) for one side) is complex, so
    the scanner instead records raw X128 fee growth, i.e. fee growth per one
    unit of liquidity. A user can multiply that by a theoretical L later.

    Returns:
        The constant ``1`` (one unit of liquidity).
    """
    unit_liquidity = 1
    return unit_liquidity
def _select_pool_abi(chain, is_aero):
    """Return the pool ABI whose slot0 layout matches this pool."""
    if is_aero:
        return AERODROME_POOL_ABI
    if chain == "BSC":
        return PANCAKE_POOL_ABI
    return POOL_ABI


def _scan_pool(w3, name, chain, addr, is_aero, state):
    """Read one pool on-chain, refresh ``state[name]`` and append a history row.

    Returns the pool's current tick. Exceptions from web3 calls or from the
    history write propagate to the caller.
    """
    contract = w3.eth.contract(address=addr, abi=_select_pool_abi(chain, is_aero))

    # Fetch Data
    slot0 = contract.functions.slot0().call()
    sqrt_price, tick = slot0[0], slot0[1]
    fg0 = contract.functions.feeGrowthGlobal0X128().call()
    fg1 = contract.functions.feeGrowthGlobal1X128().call()

    # Fetch Decimals (Once) -- only the first time this pool name is seen.
    if name not in state:
        t0 = contract.functions.token0().call()
        t1 = contract.functions.token1().call()
        d0 = w3.eth.contract(address=t0, abi=ERC20_ABI).functions.decimals().call()
        d1 = w3.eth.contract(address=t1, abi=ERC20_ABI).functions.decimals().call()
        state[name] = {
            "init_tick": tick,
            "init_fg0": fg0,
            "init_fg1": fg1,
            "decimals": [d0, d1],
            "cumulative_fees_usd": 0.0,
            "last_fg0": fg0,
            "last_fg1": fg1
        }

    # TODO: USD fee estimation is not implemented yet (needs a liquidity
    # figure: Fee = Diff * L / 2^128).
    # NOTE(review): Uniswap V3 documents the feeGrowthGlobal accumulators as
    # allowed to overflow uint256, so a future delta should presumably be
    # taken modulo 2**256 -- confirm.

    # One timestamp so the state entry and the history row agree exactly.
    now = datetime.now().isoformat()

    # Update Last
    entry = state[name]
    entry['last_fg0'] = fg0
    entry['last_fg1'] = fg1
    entry['last_tick'] = tick
    entry['last_update'] = now

    # Save History (big integers are stored as strings to keep full precision)
    append_history({
        "timestamp": now,
        "pool_name": name,
        "chain": chain,
        "tick": tick,
        "sqrtPriceX96": str(sqrt_price),
        "feeGrowth0": str(fg0),
        "feeGrowth1": str(fg1)
    })
    return tick


def main():
    """Run one scan pass over every pool listed in ``CONFIG_FILE``.

    Each config entry is expected to provide ``name``, ``chain`` and
    ``pool_address``; ``is_aerodrome`` is optional and defaults to False.
    For every usable entry the pool's slot0 and fee-growth counters are
    read, the in-memory state is updated and a row is appended to the
    history CSV. The state is written back once, after all pools.

    A bad entry, an unreachable RPC or a failing pool is reported and
    skipped; it never aborts the pass. Errors reading the config file itself
    are not caught and propagate to the caller.
    """
    print("Starting Pool Scanner...")
    with open(CONFIG_FILE, 'r', encoding="utf-8") as f:
        pools = json.load(f)
    state = load_state()

    # Init Web3 cache: one client per chain per pass (None when unconfigured).
    w3_instances = {}
    for pool in pools:
        # .get() so one malformed entry cannot crash the whole pass.
        name = pool.get('name')
        chain = pool.get('chain')
        raw_addr = pool.get('pool_address')
        if name is None or chain is None:
            print(f"  ❌ Invalid pool entry (missing name/chain): {pool}")
            continue

        # Fix Checksum
        try:
            addr = Web3.to_checksum_address(raw_addr)
        except Exception:
            # Report the value already read; indexing the dict again here
            # would raise from inside the handler when the key is missing.
            print(f"  ❌ Invalid Address: {raw_addr}")
            continue

        is_aero = pool.get('is_aerodrome', False)
        print(f"Scanning {name} ({chain})...")

        if chain not in w3_instances:
            w3_instances[chain] = get_w3(chain)
        w3 = w3_instances[chain]
        if not w3 or not w3.is_connected():
            print(f"  ❌ RPC Error for {chain}")
            continue

        # Top-level boundary for this pool: report the failure and move on.
        try:
            tick = _scan_pool(w3, name, chain, addr, is_aero, state)
        except Exception as e:
            print(f"  ❌ Error: {e}")
        else:
            print(f"  ✅ Data recorded. Tick: {tick}")

    save_state(state)
    print("Scan complete.")
if __name__ == "__main__":
    # Script entry point: run a full scan, pause, repeat forever.
    scan_interval_seconds = 600  # 10 minutes
    while True:
        main()
        time.sleep(scan_interval_seconds)