#!/usr/bin/env python3
"""
Telegram Monitor for CLP Position Notifications.

Monitors the hedge_status.json file for new position openings and sends
Telegram notifications.

Importing this module has side effects: it appends this script's directory
to ``sys.path``, creates a ``logs`` directory next to the script, configures
the ``TELEGRAM_MONITOR`` logger (console + file), and loads settings from a
``.env`` file located next to the script.
"""

import os
import sys
import time
import json
import logging
import hashlib
import requests
from datetime import datetime
from decimal import Decimal
from typing import Optional, Dict, List, Tuple, Any
from dotenv import load_dotenv

# --- SETUP PROJECT PATH ---
# The script directory is appended to sys.path before the clp_config imports
# further down.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(current_dir)

# --- LOGGING SETUP ---
_logs_dir = os.path.join(current_dir, 'logs')
os.makedirs(_logs_dir, exist_ok=True)


class UnixMsLogFilter(logging.Filter):
    """Logging filter that adds a ``unix_ms`` attribute to every record."""

    def filter(self, record):
        """Store ``record.created`` as integer milliseconds; always keep the record."""
        created_ms = int(record.created * 1000)
        record.unix_ms = created_ms
        return True


logger = logging.getLogger("TELEGRAM_MONITOR")
logger.setLevel(logging.INFO)
logger.propagate = False
# Drop handlers left over from a previous configuration of this logger.
logger.handlers.clear()

# Console handler
console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

# File handler (the log file name is prefixed with the configured DEX name)
from clp_config import TARGET_DEX
file_formatter = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler(
    os.path.join(_logs_dir, f'{TARGET_DEX}_telegram_monitor.log'),
    encoding='utf-8',
)
file_handler.setFormatter(file_formatter)
# The file format references %(unix_ms)d, which only this filter provides.
file_handler.addFilter(UnixMsLogFilter())
file_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)

# --- CONFIGURATION ---
load_dotenv(os.path.join(current_dir, '.env'))

TELEGRAM_ENABLED = os.environ.get('TELEGRAM_MONITOR_ENABLED', 'False').lower() == 'true'
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', '')
TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '')
TELEGRAM_CHECK_INTERVAL = int(os.environ.get('TELEGRAM_CHECK_INTERVAL_SECONDS', '60'))

from clp_config import STATUS_FILE
# A relative state-file path is resolved against the process working directory.
TELEGRAM_STATE_FILE = os.getenv('TELEGRAM_STATE_FILE', 'telegram_monitor_state.json')
TELEGRAM_TIMEOUT = int(os.getenv('TELEGRAM_TIMEOUT_SECONDS', '10'))
HEDGE_STATUS_FILE = os.getenv('HEDGE_STATUS_FILE', STATUS_FILE)


def _to_float(value: Any, default: float = 0.0) -> float:
    """Coerce a JSON field to ``float``.

    Returns *default* when the value is missing (``None``), not numeric, or
    not finite, so message formatting never raises on a malformed field.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    # NaN is the only float unequal to itself; infinities are rejected as well.
    if number != number or number in (float('inf'), float('-inf')):
        return default
    return number


def _format_duration(seconds: Any) -> str:
    """Render a span in seconds as ``"<hours>h <minutes>m"`` using whole numbers.

    Negative or unparsable spans are rendered as ``"0h 0m"``.
    """
    total_seconds = max(int(_to_float(seconds)), 0)
    hours, remainder = divmod(total_seconds, 3600)
    return f"{hours}h {remainder // 60}m"


class TelegramNotifier:
    """Handles Telegram API communication"""

    def __init__(self, bot_token: str, chat_id: str):
        """Store the credentials and build the bot-specific API base URL."""
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{bot_token}"

    def _redact(self, error: Any) -> str:
        """Return ``str(error)`` with the bot token masked.

        ``base_url`` embeds the token, and exception text from the HTTP layer
        can include the request URL, so errors must not be logged verbatim.
        """
        text = str(error)
        if self.bot_token:
            text = text.replace(self.bot_token, "<redacted>")
        return text

    def test_connection(self) -> bool:
        """Test Telegram bot connection.

        Returns:
            True when the ``getMe`` request answers with HTTP 200, False on
            any other status or on any exception (which is logged).
        """
        try:
            url = f"{self.base_url}/getMe"
            response = requests.get(url, timeout=TELEGRAM_TIMEOUT)
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Telegram connection test failed: {self._redact(e)}")
            return False

    def send_message(self, text: str) -> bool:
        """Send message to Telegram chat.

        Args:
            text: Message body, sent with ``parse_mode`` set to ``Markdown``.

        Returns:
            True when the API response reports ``ok``; False when
            notifications are disabled, credentials are missing, the API
            reports an error, or the request raises.
        """
        if not TELEGRAM_ENABLED or not self.bot_token or not self.chat_id:
            logger.debug("Telegram notifications disabled or missing credentials")
            return False

        try:
            url = f"{self.base_url}/sendMessage"
            payload = {
                'chat_id': self.chat_id,
                'text': text,
                'parse_mode': 'Markdown'
            }
            response = requests.post(url, json=payload, timeout=TELEGRAM_TIMEOUT)
            result = response.json()
        except Exception as e:
            logger.error(f"Failed to send Telegram message: {self._redact(e)}")
            return False

        if result.get('ok'):
            logger.info("Telegram notification sent successfully")
            return True

        logger.error(f"Telegram API error: {result}")
        return False

    @staticmethod
    def _change_line(label: str, previous: Decimal, current: Decimal) -> Optional[str]:
        """Build one "<label>: +$x (+y%)" comparison line, or None if *previous* is not positive."""
        if previous <= 0:
            return None
        change = current - previous
        change_pct = (change / previous) * 100
        sign = "+" if change >= 0 else ""
        return f"• {label}: {sign}${change:.2f} ({sign}{change_pct:.2f}%)"

    def format_position_message(self, last_closed: Optional[Dict], current_open: Dict) -> str:
        """Format position data into readable message.

        Args:
            last_closed: Most recent closed position, or None if there is none.
            current_open: The newly opened position.

        Returns:
            A multi-line text with the last closed position, the currently
            opened position and, when both exist, a comparison section.
            Numeric fields that are missing or not numeric are rendered as 0
            instead of raising.
        """
        lines = ["NEW CLP POSITION DETECTED\n"]

        # Previous closed position section
        if last_closed:
            lines.append("LAST CLOSED POSITION:")
            lines.append(f"• Token ID: {last_closed.get('token_id', 'N/A')}")
            lines.append(f"• Entry: ${_to_float(last_closed.get('entry_price')):.2f}")
            lines.append(f"• Target Value: ${_to_float(last_closed.get('target_value')):.2f}")

            # Add duration if timestamps available
            if last_closed.get('timestamp_open') and last_closed.get('timestamp_close'):
                duration = (_to_float(last_closed['timestamp_close'])
                            - _to_float(last_closed['timestamp_open']))
                lines.append(f"• Duration: {_format_duration(duration)}")

            # Add hedge performance if available
            hedge_pnl = _to_float(last_closed.get('hedge_pnl_realized'))
            if hedge_pnl != 0:
                lines.append(f"• Hedge PnL: ${hedge_pnl:.2f}")

            hedge_fees = _to_float(last_closed.get('hedge_fees_paid'))
            if hedge_fees != 0:
                lines.append(f"• Hedge Fees: ${hedge_fees:.2f}")
        else:
            lines.append("LAST CLOSED POSITION: None")

        lines.append("")  # Empty line

        # Current opened position section
        lines.append("CURRENTLY OPENED:")
        lines.append(f"• Token ID: {current_open.get('token_id', 'N/A')}")
        lines.append(f"• Entry: ${_to_float(current_open.get('entry_price')):.2f}")
        lines.append(f"• Target Value: ${_to_float(current_open.get('target_value')):.2f}")

        # Range information (only shown when both bounds are non-zero)
        range_lower = _to_float(current_open.get('range_lower'))
        range_upper = _to_float(current_open.get('range_upper'))
        if range_lower and range_upper:
            lines.append(f"• Range: ${range_lower:.2f} - ${range_upper:.2f}")

        # Initial amounts (only shown when both are non-zero)
        amount0 = _to_float(current_open.get('amount0_initial'))
        amount1 = _to_float(current_open.get('amount1_initial'))
        if amount0 and amount1:
            lines.append(f"• Initial: {amount0:.4f} ETH + {amount1:.2f} USDC")

        # Time since opening
        if current_open.get('timestamp_open'):
            age = time.time() - _to_float(current_open['timestamp_open'])
            lines.append(f"• Time: {_format_duration(age)} ago")

        # Performance comparison if we have both positions
        if last_closed and current_open:
            lines.append("")  # Empty line
            lines.append("PERFORMANCE COMPARISON:")

            # Entry price change and value change, computed in Decimal.
            for label, key in (("Entry Change", 'entry_price'), ("Value Change", 'target_value')):
                previous = Decimal(str(_to_float(last_closed.get(key))))
                current = Decimal(str(_to_float(current_open.get(key))))
                change_line = self._change_line(label, previous, current)
                if change_line is not None:
                    lines.append(change_line)

            # Hedge PnL trend
            last_hedge_pnl = _to_float(last_closed.get('hedge_pnl_realized'))
            curr_hedge_equity = _to_float(current_open.get('hedge_equity_usd'))
            if last_hedge_pnl != 0 and curr_hedge_equity != 0:
                hedge_trend = curr_hedge_equity - last_hedge_pnl
                sign = "+" if hedge_trend >= 0 else ""
                lines.append(
                    f"• Hedge PnL Trend: ${last_hedge_pnl:.2f} -> "
                    f"${curr_hedge_equity:.2f} ({sign}${hedge_trend:.2f})"
                )

        return "\n".join(lines)


class PositionMonitor:
    """Monitors hedge_status.json for changes"""

    def __init__(self, json_file_path: str, state_file_path: str):
        """Remember both file paths and load the persisted state."""
        self.json_file_path = json_file_path
        self.state_file_path = state_file_path
        # Snapshot of the status file that triggered a pending notification;
        # set by check_for_changes(), consumed by get_notification_data() and
        # update_state(), empty otherwise.
        self.last_known_data = []
        self.last_file_hash = ""
        self.state = self.load_state()

    def load_state(self) -> Dict[str, Any]:
        """Load monitor state from file.

        Returns:
            The dict stored in the state file, or a fresh default state when
            the file is missing, unreadable, or does not contain a JSON object.
        """
        try:
            if os.path.exists(self.state_file_path):
                with open(self.state_file_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                logger.warning("Could not load state file: content is not a JSON object")
        except Exception as e:
            logger.warning(f"Could not load state file: {e}")

        return {
            "last_known_open_positions": [],
            "last_processed_timestamp": 0,
            "last_file_hash": ""
        }

    def save_state(self):
        """Save monitor state to file.

        The state is written to a sibling ``.tmp`` file and then moved over
        the real file with ``os.replace``, so an interrupted write does not
        leave a truncated state file behind. Failures are logged, not raised.
        """
        tmp_path = f"{self.state_file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, indent=2)
            os.replace(tmp_path, self.state_file_path)
        except Exception as e:
            logger.error(f"Could not save state file: {e}")

    def get_file_hash(self, data: List[Dict]) -> str:
        """Generate hash of file content to detect changes.

        The MD5 digest is computed over the key-sorted JSON dump of *data*;
        it is used only for change detection.
        """
        content = json.dumps(data, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()

    def safe_read_json(self) -> List[Dict]:
        """Safely read JSON file with retry logic.

        Returns:
            The parsed list, or ``[]`` when the file does not exist, still
            cannot be read/decoded after 3 attempts (1 second apart), or
            does not contain a JSON list.
        """
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                if not os.path.exists(self.json_file_path):
                    logger.warning(f"JSON file not found: {self.json_file_path}")
                    return []

                with open(self.json_file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (ValueError, OSError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                logger.warning(f"Attempt {attempt}: Error reading JSON file: {e}")
                if attempt < max_attempts:
                    time.sleep(1)  # no point in waiting after the final attempt
                continue

            if not isinstance(data, list):
                logger.error(
                    f"Unexpected JSON structure in {self.json_file_path}: "
                    f"expected a list, got {type(data).__name__}"
                )
                return []
            return data

        logger.error("Failed to read JSON file after 3 attempts")
        return []

    def extract_notification_data(self, data: List[Dict]) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Extract last closed and current open positions.

        Args:
            data: Parsed status-file content; entries that are not dicts are ignored.

        Returns:
            ``(last_closed, current_open)``: the ``CLOSED`` entry with the
            highest ``timestamp_open`` and the first ``OPEN`` entry, each
            ``None`` when no such entry exists.
        """
        positions = [item for item in (data or []) if isinstance(item, dict)]

        # Find current open position (first match wins)
        current_open = next(
            (item for item in positions if item.get('status') == 'OPEN'),
            None,
        )

        # Find most recent closed position by timestamp_open; on ties the
        # earliest entry in file order is kept.
        closed_positions = [item for item in positions if item.get('status') == 'CLOSED']
        last_closed = max(
            closed_positions,
            key=lambda item: _to_float(item.get('timestamp_open')),
            default=None,
        )

        return last_closed, current_open

    def check_for_changes(self) -> bool:
        """Check if there are changes requiring notification.

        Returns:
            True when the status file changed since the last recorded hash
            and its open position's ``token_id`` is not among the known open
            positions. In that case the file content is kept in
            ``last_known_data`` so the notification and the state update
            both work on exactly this snapshot.
        """
        # Discard any snapshot from an earlier cycle (e.g. after a failed send).
        self.last_known_data = []

        current_data = self.safe_read_json()
        if not current_data:
            return False

        # Check if file content actually changed
        current_hash = self.get_file_hash(current_data)
        if current_hash == self.state.get("last_file_hash", ""):
            return False

        # Extract positions
        _, current_open = self.extract_notification_data(current_data)
        if not current_open:
            # No open position, nothing to notify about
            return False

        last_known_opens = self.state.get("last_known_open_positions")
        if not isinstance(last_known_opens, list):
            last_known_opens = []

        # Check if this is a new open position
        if current_open.get('token_id') not in last_known_opens:
            self.last_known_data = current_data
            return True

        return False

    def get_notification_data(self) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Get data for notification.

        Uses the snapshot stored by ``check_for_changes`` when one is
        pending; otherwise reads the status file.
        """
        current_data = self.last_known_data or self.safe_read_json()
        return self.extract_notification_data(current_data)

    def update_state(self):
        """Update internal state after notification.

        Records the open position of the pending snapshot (or of a fresh
        read when no snapshot is pending) together with the current time and
        the content hash, persists the state, and clears the snapshot.
        """
        # Re-reading here could pick up a newer position that was never
        # announced and mark it as known, so prefer the detection snapshot.
        current_data = self.last_known_data or self.safe_read_json()
        if current_data:
            _, current_open = self.extract_notification_data(current_data)
            if current_open:
                # Update state with current open positions
                self.state["last_known_open_positions"] = [current_open.get('token_id')]
                self.state["last_processed_timestamp"] = int(time.time())
                self.state["last_file_hash"] = self.get_file_hash(current_data)
                self.save_state()
        self.last_known_data = []


def _run_check_cycle(monitor: PositionMonitor, notifier: Optional[TelegramNotifier]) -> None:
    """Run one detection pass and, if a new position opened, notify and record it."""
    if not monitor.check_for_changes():
        logger.debug("No changes detected")
        return

    logger.info("New position opening detected!")

    if not (TELEGRAM_ENABLED and notifier):
        logger.info("Telegram disabled or notifier not available - skipping notification")
        monitor.update_state()  # Still update state to avoid loops
        return

    last_closed, current_open = monitor.get_notification_data()
    if not current_open:
        logger.warning("Position change detected but no open position found")
        return

    message = notifier.format_position_message(last_closed, current_open)
    if notifier.send_message(message):
        monitor.update_state()
        logger.info(f"Notification sent for position {current_open.get('token_id')}")
    else:
        # State is left untouched, so the same position is retried next cycle.
        logger.error("Failed to send notification")


def main():
    """Main monitoring loop.

    Validates the Telegram configuration, then polls the status file every
    ``TELEGRAM_CHECK_INTERVAL`` seconds until interrupted with Ctrl+C.
    Errors inside a cycle are logged with a traceback and the loop continues.
    """
    logger.info("🤖 Telegram Monitor Starting...")

    notifier = None
    if not TELEGRAM_ENABLED:
        logger.info("📵 Telegram notifications disabled (TELEGRAM_MONITOR_ENABLED=False)")
    else:
        if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
            logger.error("❌ Telegram enabled but missing BOT_TOKEN or CHAT_ID")
            return

        # Initialize notifier and test connection
        notifier = TelegramNotifier(TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID)
        if not notifier.test_connection():
            logger.error("❌ Telegram connection failed - check token and network")
            return

        logger.info(f"✅ Telegram connection established to chat ID: {TELEGRAM_CHAT_ID}")

    # Initialize monitor
    monitor = PositionMonitor(HEDGE_STATUS_FILE, TELEGRAM_STATE_FILE)

    logger.info(f"Monitoring file: {HEDGE_STATUS_FILE}")
    logger.info(f"Check interval: {TELEGRAM_CHECK_INTERVAL} seconds")
    logger.info(f"State file: {TELEGRAM_STATE_FILE}")

    try:
        while True:
            try:
                _run_check_cycle(monitor, notifier)
            except Exception as e:
                # KeyboardInterrupt is not an Exception subclass, so Ctrl+C
                # falls through to the outer handler and is logged there.
                logger.exception(f"Error in monitoring loop: {e}")
            time.sleep(TELEGRAM_CHECK_INTERVAL)
    except KeyboardInterrupt:
        logger.info("Shutting down Telegram Monitor...")

    logger.info("Telegram Monitor stopped")


if __name__ == "__main__":
    main()