"""Periodic Uniswap V3 liquidity-depth monitor.

Every INTERVAL_SECONDS this script snapshots the pool's active liquidity at
price offsets from -RANGE_PCT% to +RANGE_PCT% (in STEP_PCT% increments) and
appends the resulting depth profile to a CSV file, together with a
"concentration" ratio against a fixed full-range-liquidity baseline.
"""

import csv
import logging
import os
import time
from datetime import datetime
from decimal import Decimal, getcontext

from dotenv import load_dotenv
from web3 import Web3

# --- CONFIGURATION ---
POOL_ADDRESS = '0xC6962004f452bE9203591991D15f6b388e09E8D0'  # ETH/USDC 0.05% Arbitrum
INTERVAL_SECONDS = 15 * 60                # 15 minutes between snapshots
RANGE_PCT = 10.0                          # Total scan range +/- 10%
STEP_PCT = 0.1                            # Resolution step 0.1%
TVL_USD_BASELINE = Decimal('74530000')    # Baseline TVL for concentration calculation

# Token decimals (token0 = WETH, token1 = USDC)
D0 = 18  # WETH
D1 = 6   # USDC

# High precision: tick exponentiation (1.0001**tick) and sqrt need headroom.
getcontext().prec = 60
load_dotenv()

# Setup Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("DEPTH_MONITOR")

# Ensure logs directory exists
os.makedirs('logs', exist_ok=True)
CSV_FILE = 'logs/pool_liquidity_depth.csv'

# Minimal Uniswap V3 pool ABI — only the three view functions we call.
# NOTE(review): the on-chain slot0() returns 7 values; this ABI declares only
# the first two (sqrtPriceX96, tick). Decoding relies on the installed
# web3/eth-abi tolerating the extra trailing return data — confirm against the
# pinned library versions.
POOL_ABI = [
    {'inputs': [], 'name': 'liquidity',
     'outputs': [{'internalType': 'uint128', 'name': '', 'type': 'uint128'}],
     'stateMutability': 'view', 'type': 'function'},
    {'inputs': [], 'name': 'slot0',
     'outputs': [{'internalType': 'uint160', 'name': 'sqrtPriceX96', 'type': 'uint160'},
                 {'internalType': 'int24', 'name': 'tick', 'type': 'int24'}],
     'stateMutability': 'view', 'type': 'function'},
    {'inputs': [{'internalType': 'int24', 'name': 'tick', 'type': 'int24'}],
     'name': 'ticks',
     'outputs': [{'internalType': 'uint128', 'name': 'liquidityGross', 'type': 'uint128'},
                 {'internalType': 'int128', 'name': 'liquidityNet', 'type': 'int128'}],
     'stateMutability': 'view', 'type': 'function'},
]


def get_price_from_tick(tick):
    """Convert a tick index to a decimal-adjusted token1/token0 price.

    Price = 1.0001**tick scaled by 10**(D0 - D1) to account for the
    differing token decimals (18 for WETH, 6 for USDC).
    """
    return (Decimal('1.0001') ** Decimal(str(tick))) * (Decimal('10') ** Decimal(str(D0 - D1)))


def get_liquidity_at_offsets(pool_contract, current_tick, current_liquidity):
    """Build a liquidity profile at price offsets around the current tick.

    Walks initialized ticks outward from ``current_tick`` in both directions,
    applying each crossed tick's liquidityNet to a running liquidity value
    (added when crossing upward, subtracted when crossing downward, per the
    Uniswap V3 tick-crossing rule).

    Args:
        pool_contract: web3 contract instance for the pool.
        current_tick: tick from slot0.
        current_liquidity: in-range liquidity from liquidity().

    Returns:
        List of dicts {'offset': float pct, 'price': Decimal,
        'liquidity': Decimal}, sorted by offset (center point included at 0.0).

    NOTE(review): this issues one eth_call per tick scanned (~2000 per
    snapshot at the default range/step); consider multicall/batching if the
    RPC provider rate-limits.
    """
    results = []
    # Tick spacing for 0.05% pools is 10; each tick is a ~0.01% price move,
    # so one STEP_PCT (0.1%) step spans ~10 ticks.
    ticks_per_step = 10
    total_steps = int(RANGE_PCT / STEP_PCT)

    # --- SCAN DOWN ---
    # Moving DOWN from prev_tick to target_tick crosses ticks in
    # (target_tick, prev_tick]; crossing a tick downward subtracts its
    # liquidityNet. BUGFIX: the original iterated down to target_tick
    # *inclusive*, which both applied the boundary tick one step early and
    # re-applied it at the start of the next step (double count), skewing
    # every deeper offset.
    l_running = Decimal(current_liquidity)
    for i in range(1, total_steps + 1):
        prev_tick = current_tick - (i - 1) * ticks_per_step
        target_tick = current_tick - i * ticks_per_step
        # prev_tick down to target_tick + 1 (exclusive stop at target_tick).
        for t in range(prev_tick, target_tick, -1):
            data = pool_contract.functions.ticks(t).call()
            if data[0] > 0:  # liquidityGross > 0 => tick is initialized
                l_running -= Decimal(data[1])
        offset_pct = -round(i * STEP_PCT, 2)
        px = get_price_from_tick(target_tick)
        results.append({'offset': offset_pct, 'price': px, 'liquidity': l_running})

    # --- SCAN UP ---
    # Moving UP from prev_tick to target_tick crosses ticks in
    # [prev_tick + 1, target_tick]; crossing upward adds liquidityNet.
    l_running = Decimal(current_liquidity)
    for i in range(1, total_steps + 1):
        prev_tick = current_tick + (i - 1) * ticks_per_step
        target_tick = current_tick + i * ticks_per_step
        for t in range(prev_tick + 1, target_tick + 1):
            data = pool_contract.functions.ticks(t).call()
            if data[0] > 0:
                l_running += Decimal(data[1])
        offset_pct = round(i * STEP_PCT, 2)
        px = get_price_from_tick(target_tick)
        results.append({'offset': offset_pct, 'price': px, 'liquidity': l_running})

    # Add center point (current price, unmodified in-range liquidity).
    results.append({'offset': 0.0,
                    'price': get_price_from_tick(current_tick),
                    'liquidity': Decimal(current_liquidity)})
    return sorted(results, key=lambda x: x['offset'])


def main():
    """Poll the pool forever, appending one depth snapshot per interval.

    Requires MAINNET_RPC_URL in the environment (loaded from .env). On any
    per-iteration error the loop logs and retries after 60 seconds.
    """
    rpc_url = os.environ.get('MAINNET_RPC_URL')
    if not rpc_url:
        logger.error("MAINNET_RPC_URL not found in .env")
        return

    w3 = Web3(Web3.HTTPProvider(rpc_url))
    pool = w3.eth.contract(address=Web3.to_checksum_address(POOL_ADDRESS), abi=POOL_ABI)

    # Only write the CSV header once, on first creation of the file.
    file_exists = os.path.isfile(CSV_FILE)

    logger.info(f"Starting Depth Monitor for {POOL_ADDRESS}")
    logger.info(f"Scan Range: +/-{RANGE_PCT}% | Resolution: {STEP_PCT}% | Interval: {INTERVAL_SECONDS/60}m")

    while True:
        try:
            # 1. Fetch State
            l_active = pool.functions.liquidity().call()
            s0 = pool.functions.slot0().call()
            curr_tick = s0[1]
            curr_price = get_price_from_tick(curr_tick)
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # 2. Map Depth
            depth_data = get_liquidity_at_offsets(pool, curr_tick, l_active)

            # 3. Calculate Concentration & Save
            with open(CSV_FILE, 'a', newline='') as f:
                writer = csv.writer(f)
                if not file_exists:
                    writer.writerow(['timestamp', 'ref_price', 'offset_pct',
                                     'target_price', 'liquidity', 'concentration'])
                    file_exists = True
                for row in depth_data:
                    # L_full baseline for THIS specific price point:
                    #   L_full = (TVL * 10^6) / (2 * sqrtP_norm)
                    # where sqrtP_norm = sqrt(Price) / 10^((D0-D1)/2)
                    # normalizes the decimal-adjusted price back to raw
                    # sqrt-price units. (Assumes the fixed TVL baseline split
                    # 50/50 across a full-range position — review.)
                    sqrtP_norm = row['price'].sqrt() / (Decimal('10') ** (Decimal(str(D0 - D1)) / Decimal('2')))
                    l_full = (TVL_USD_BASELINE * (Decimal('10') ** Decimal('6'))) / (Decimal('2') * sqrtP_norm)
                    conc = row['liquidity'] / l_full
                    writer.writerow([
                        timestamp,
                        f"{curr_price:.4f}",
                        row['offset'],
                        f"{row['price']:.4f}",
                        f"{row['liquidity']:.0f}",
                        f"{conc:.2f}"
                    ])

            logger.info(f"Recorded depth snapshot at {curr_price:.2f}. Next in {INTERVAL_SECONDS/60}m.")
            time.sleep(INTERVAL_SECONDS)

        except Exception as e:
            # Broad catch is deliberate: keep the monitor alive across RPC
            # hiccups; back off briefly before retrying.
            logger.error(f"Error in monitor loop: {e}")
            time.sleep(60)


if __name__ == "__main__":
    main()