#!/usr/bin/env python3
"""
Telegram Monitor for CLP Position Notifications

Monitors the hedge status JSON file (hedge_status.json) for newly opened
positions and sends Telegram notifications.
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import logging
|
|
import hashlib
|
|
import requests
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Optional, Dict, List, Tuple, Any
|
|
from dotenv import load_dotenv
|
|
|
|
# --- SETUP PROJECT PATH ---
# Directory holding this script; appended to sys.path so modules that live
# next to it can be imported by name.
_script_path = os.path.abspath(__file__)
current_dir = os.path.split(_script_path)[0]
project_root = os.path.split(current_dir)[0]
sys.path.append(current_dir)

# --- LOGGING SETUP ---
# Make sure the 'logs' directory next to the script exists (no error if it
# is already there).
_logs_dir = os.path.join(current_dir, 'logs')
os.makedirs(_logs_dir, exist_ok=True)
|
|
|
|
class UnixMsLogFilter(logging.Filter):
    """Logging filter that stamps every record with a ``unix_ms`` attribute.

    ``unix_ms`` is the record's creation time converted from seconds to
    whole milliseconds, so formatters can reference ``%(unix_ms)d``.
    """

    def filter(self, record):
        """Attach ``unix_ms`` to ``record``; never rejects a record."""
        created_ms = record.created * 1000
        setattr(record, "unix_ms", int(created_ms))
        return True
|
|
|
|
# Dedicated logger for this script: it does not propagate to the root logger,
# and any handlers already attached are dropped before new ones are added.
logger = logging.getLogger("TELEGRAM_MONITOR")
logger.handlers.clear()
logger.propagate = False
logger.setLevel(logging.INFO)

# Console handler
console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

# File handler
from clp_config import TARGET_DEX

# The log file name is prefixed with the configured TARGET_DEX value.
_log_file_path = os.path.join(current_dir, 'logs', f'{TARGET_DEX}_telegram_monitor.log')
file_formatter = logging.Formatter('%(unix_ms)d, %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler(_log_file_path, encoding='utf-8')
# The filter supplies the ``unix_ms`` attribute required by file_formatter.
file_handler.addFilter(UnixMsLogFilter())
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)
|
|
|
|
# --- CONFIGURATION ---
# Load variables from the .env file located next to this script before any
# of the settings below are read from the environment.
_dotenv_path = os.path.join(current_dir, '.env')
load_dotenv(_dotenv_path)

# Notifications are enabled only when the variable equals "true"
# (case-insensitive); anything else, including unset, disables them.
_enabled_flag = os.environ.get('TELEGRAM_MONITOR_ENABLED', 'False')
TELEGRAM_ENABLED = _enabled_flag.lower() == 'true'

TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', '')
TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '')

# Seconds between two checks of the status file.
_check_interval_raw = os.environ.get('TELEGRAM_CHECK_INTERVAL_SECONDS', '60')
TELEGRAM_CHECK_INTERVAL = int(_check_interval_raw)

from clp_config import STATUS_FILE

TELEGRAM_STATE_FILE = os.environ.get('TELEGRAM_STATE_FILE', 'telegram_monitor_state.json')

# Timeout, in seconds, passed to every HTTP request sent to Telegram.
_timeout_raw = os.environ.get('TELEGRAM_TIMEOUT_SECONDS', '10')
TELEGRAM_TIMEOUT = int(_timeout_raw)

# The monitored file defaults to STATUS_FILE from clp_config unless overridden.
HEDGE_STATUS_FILE = os.environ.get('HEDGE_STATUS_FILE', STATUS_FILE)
|
|
|
|
class TelegramNotifier:
    """Handles Telegram Bot API communication and message formatting.

    Args:
        bot_token: Telegram bot token. It is embedded in every API URL, so it
            is masked before any exception text is written to the logs.
        chat_id: Identifier of the chat that receives the notifications.
    """

    def __init__(self, bot_token: str, chat_id: str):
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{bot_token}"

    def _redact(self, text: str) -> str:
        """Return ``text`` with every occurrence of the bot token masked."""
        if self.bot_token:
            return text.replace(self.bot_token, "<redacted>")
        return text

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Coerce ``value`` to float; ``None`` or non-numeric input gives ``default``."""
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _to_int(value: Any) -> Optional[int]:
        """Coerce ``value`` to whole seconds; return ``None`` when not possible."""
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _to_decimal(value: Any) -> Decimal:
        """Coerce ``value`` to a finite Decimal; anything else becomes ``Decimal(0)``."""
        try:
            number = Decimal(str(value))
        except (ArithmeticError, TypeError, ValueError):
            # decimal.InvalidOperation derives from ArithmeticError.
            return Decimal(0)
        return number if number.is_finite() else Decimal(0)

    def test_connection(self) -> bool:
        """Test Telegram bot connection.

        Returns:
            True when the ``getMe`` endpoint answers with HTTP 200, False on
            any other status or on any exception (which is logged).
        """
        try:
            url = f"{self.base_url}/getMe"
            response = requests.get(url, timeout=TELEGRAM_TIMEOUT)
            return response.status_code == 200
        except Exception as e:
            # Exception text may contain the request URL, and with it the token.
            logger.error(f"Telegram connection test failed: {self._redact(str(e))}")
            return False

    def send_message(self, text: str) -> bool:
        """Send message to Telegram chat.

        Args:
            text: Message body, sent with ``parse_mode`` set to ``Markdown``.

        Returns:
            True when the API response reports ``ok``; False when
            notifications are disabled, credentials are missing, the API
            reports an error, or the request raises.
        """
        if not TELEGRAM_ENABLED or not self.bot_token or not self.chat_id:
            logger.debug("Telegram notifications disabled or missing credentials")
            return False

        try:
            url = f"{self.base_url}/sendMessage"
            # NOTE(review): with Markdown parsing, data containing characters
            # such as "_" or "*" may be rejected by Telegram - confirm.
            payload = {
                'chat_id': self.chat_id,
                'text': text,
                'parse_mode': 'Markdown'
            }

            response = requests.post(url, json=payload, timeout=TELEGRAM_TIMEOUT)
            result = response.json()

            if result.get('ok'):
                logger.info("Telegram notification sent successfully")
                return True

            logger.error(f"Telegram API error: {result}")
            return False

        except Exception as e:
            # Exception text may contain the request URL, and with it the token.
            logger.error(f"Failed to send Telegram message: {self._redact(str(e))}")
            return False

    def format_position_message(self, last_closed: Optional[Dict], current_open: Dict) -> str:
        """Format position data into readable message.

        Numeric fields are coerced defensively: a field that is missing,
        ``None`` or not parseable as a number is treated as 0 instead of
        raising, and timestamps are reduced to whole seconds.

        Args:
            last_closed: Most recently closed position, or None if there is none.
            current_open: The newly opened position.

        Returns:
            The multi-line notification text.
        """
        num = self._to_float
        lines = ["NEW CLP POSITION DETECTED\n"]

        # Previous closed position section
        if last_closed:
            lines.append("LAST CLOSED POSITION:")
            lines.append(f"• Token ID: {last_closed.get('token_id', 'N/A')}")
            lines.append(f"• Entry: ${num(last_closed.get('entry_price', 0)):.2f}")
            lines.append(f"• Target Value: ${num(last_closed.get('target_value', 0)):.2f}")

            # Add duration if both timestamps are available
            opened_at = self._to_int(last_closed.get('timestamp_open'))
            closed_at = self._to_int(last_closed.get('timestamp_close'))
            if opened_at and closed_at:
                duration = closed_at - opened_at
                hours = duration // 3600
                minutes = (duration % 3600) // 60
                lines.append(f"• Duration: {hours}h {minutes}m")

            # Add hedge performance if available (zero values are omitted)
            hedge_pnl = num(last_closed.get('hedge_pnl_realized', 0))
            if hedge_pnl != 0:
                lines.append(f"• Hedge PnL: ${hedge_pnl:.2f}")

            hedge_fees = num(last_closed.get('hedge_fees_paid', 0))
            if hedge_fees != 0:
                lines.append(f"• Hedge Fees: ${hedge_fees:.2f}")
        else:
            lines.append("LAST CLOSED POSITION: None")

        lines.append("")  # Empty line

        # Current opened position section
        lines.append("CURRENTLY OPENED:")
        lines.append(f"• Token ID: {current_open.get('token_id', 'N/A')}")
        lines.append(f"• Entry: ${num(current_open.get('entry_price', 0)):.2f}")
        lines.append(f"• Target Value: ${num(current_open.get('target_value', 0)):.2f}")

        # Range information (shown only when both bounds are non-zero)
        range_lower = num(current_open.get('range_lower', 0))
        range_upper = num(current_open.get('range_upper', 0))
        if range_lower and range_upper:
            lines.append(f"• Range: ${range_lower:.2f} - ${range_upper:.2f}")

        # Initial amounts (shown only when both are non-zero)
        amount0 = num(current_open.get('amount0_initial', 0))
        amount1 = num(current_open.get('amount1_initial', 0))
        if amount0 and amount1:
            lines.append(f"• Initial: {amount0:.4f} ETH + {amount1:.2f} USDC")

        # Time since opening
        opened_at = self._to_int(current_open.get('timestamp_open'))
        if opened_at:
            age = int(time.time()) - opened_at
            hours = age // 3600
            minutes = (age % 3600) // 60
            lines.append(f"• Time: {hours}h {minutes}m ago")

        # Performance comparison if we have both positions
        if last_closed and current_open:
            lines.append("")  # Empty line
            lines.append("PERFORMANCE COMPARISON:")

            # Entry price change
            last_entry = self._to_decimal(last_closed.get('entry_price', 0))
            curr_entry = self._to_decimal(current_open.get('entry_price', 0))
            if last_entry > 0:
                entry_change = curr_entry - last_entry
                entry_change_pct = (entry_change / last_entry) * 100
                sign = "+" if entry_change >= 0 else ""
                lines.append(f"• Entry Change: {sign}${entry_change:.2f} ({sign}{entry_change_pct:.2f}%)")

            # Value change
            last_value = self._to_decimal(last_closed.get('target_value', 0))
            curr_value = self._to_decimal(current_open.get('target_value', 0))
            if last_value > 0:
                value_change = curr_value - last_value
                value_change_pct = (value_change / last_value) * 100
                sign = "+" if value_change >= 0 else ""
                lines.append(f"• Value Change: {sign}${value_change:.2f} ({sign}{value_change_pct:.2f}%)")

            # Hedge PnL trend
            last_hedge_pnl = num(last_closed.get('hedge_pnl_realized', 0))
            curr_hedge_equity = num(current_open.get('hedge_equity_usd', 0))
            if last_hedge_pnl != 0 and curr_hedge_equity != 0:
                hedge_trend = curr_hedge_equity - last_hedge_pnl
                sign = "+" if hedge_trend >= 0 else ""
                lines.append(f"• Hedge PnL Trend: ${last_hedge_pnl:.2f} -> ${curr_hedge_equity:.2f} ({sign}${hedge_trend:.2f})")

        return "\n".join(lines)
|
|
|
|
|
|
class PositionMonitor:
    """Monitors hedge_status.json for changes.

    A new notification is due when the first entry with status ``OPEN`` has
    a ``token_id`` that is not recorded in the persisted monitor state.
    The snapshot that triggered a detection is kept and reused for building
    the notification and for updating the state, so a write to the status
    file between those steps cannot make them disagree.

    Args:
        json_file_path: Path of the status JSON file to watch.
        state_file_path: Path of the JSON file holding the monitor state.
    """

    def __init__(self, json_file_path: str, state_file_path: str):
        self.json_file_path = json_file_path
        self.state_file_path = state_file_path
        self.last_known_data = []
        self.last_file_hash = ""
        # Snapshot awaiting notification; None when nothing is pending.
        self._pending_data: Optional[List[Dict]] = None
        self.state = self.load_state()

    def load_state(self) -> Dict[str, Any]:
        """Load monitor state from file.

        Returns:
            The stored state, or a fresh default state when the file is
            missing, unreadable, or does not contain a JSON object.
        """
        try:
            if os.path.exists(self.state_file_path):
                with open(self.state_file_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                # Later code calls .get() on the state, so reject other shapes.
                logger.warning("Could not load state file: expected a JSON object")
        except Exception as e:
            logger.warning(f"Could not load state file: {e}")

        return {
            "last_known_open_positions": [],
            "last_processed_timestamp": 0,
            "last_file_hash": ""
        }

    def save_state(self):
        """Save monitor state to file; failures are logged, not raised."""
        try:
            with open(self.state_file_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, indent=2)
        except Exception as e:
            logger.error(f"Could not save state file: {e}")

    def get_file_hash(self, data: List[Dict]) -> str:
        """Generate hash of file content to detect changes.

        The MD5 digest is taken over the key-sorted JSON serialization of
        ``data``; it is used only for change detection.
        """
        content = json.dumps(data, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()

    def safe_read_json(self) -> List[Dict]:
        """Safely read JSON file with retry logic.

        Returns:
            The parsed list, or an empty list when the file is missing, its
            top-level value is not a list, or it could not be read and
            parsed within 3 attempts (1 second apart).
        """
        for attempt in range(1, 4):
            try:
                if not os.path.exists(self.json_file_path):
                    logger.warning(f"JSON file not found: {self.json_file_path}")
                    return []

                with open(self.json_file_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)

            except (ValueError, OSError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError,
                # e.g. when the file is read while it is being rewritten.
                logger.warning(f"Attempt {attempt}: Error reading JSON file: {e}")
                time.sleep(1)
                continue

            if isinstance(loaded, list):
                return loaded

            logger.warning(f"JSON file does not contain a list: {self.json_file_path}")
            return []

        logger.error("Failed to read JSON file after 3 attempts")
        return []

    def extract_notification_data(self, data: List[Dict]) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Extract last closed and current open positions.

        Entries that are not dicts are ignored.

        Returns:
            ``(last_closed, current_open)``: the ``CLOSED`` entry with the
            greatest ``timestamp_open`` (missing or null counts as 0; the
            first one wins on ties) and the first ``OPEN`` entry. Either
            may be None.
        """
        positions = [item for item in data if isinstance(item, dict)]

        # Find current open position
        current_open = next(
            (item for item in positions if item.get('status') == 'OPEN'),
            None,
        )

        # Find most recent closed position
        closed_positions = [item for item in positions if item.get('status') == 'CLOSED']
        last_closed = max(
            closed_positions,
            key=lambda item: item.get('timestamp_open') or 0,
            default=None,
        )

        return last_closed, current_open

    def check_for_changes(self) -> bool:
        """Check if there are changes requiring notification.

        Returns:
            True when the file content differs from the last processed
            content and the open position's token id is not yet recorded.
        """
        # Any earlier snapshot is superseded by this fresh read.
        self._pending_data = None
        current_data = self.safe_read_json()

        if not current_data:
            return False

        # Check if file content actually changed
        current_hash = self.get_file_hash(current_data)
        if current_hash == self.state.get("last_file_hash", ""):
            return False

        # Extract positions
        _, current_open = self.extract_notification_data(current_data)

        if not current_open:
            # No open position, nothing to notify about
            return False

        current_open_id = current_open.get('token_id')
        last_known_opens = self.state.get("last_known_open_positions", [])

        # Check if this is a new open position
        if current_open_id not in last_known_opens:
            # New position detected! Keep the snapshot for the follow-up calls.
            self.last_known_data = current_data
            self._pending_data = current_data
            return True

        return False

    def _notification_source(self) -> List[Dict]:
        """Return the pending snapshot if there is one, else re-read the file."""
        if self._pending_data is not None:
            return self._pending_data
        return self.safe_read_json()

    def get_notification_data(self) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Get data for notification.

        Uses the snapshot stored by the last successful ``check_for_changes``
        when one is pending; otherwise reads the file.
        """
        return self.extract_notification_data(self._notification_source())

    def update_state(self):
        """Update internal state after notification.

        Records the open position's token id, the current time and the
        content hash, persists them, and clears the pending snapshot.
        Nothing is recorded when there is no data or no open position.
        """
        current_data = self._notification_source()
        if current_data:
            _, current_open = self.extract_notification_data(current_data)

            if current_open:
                # Update state with current open positions
                self.state["last_known_open_positions"] = [current_open.get('token_id')]
                self.state["last_processed_timestamp"] = int(time.time())
                self.state["last_file_hash"] = self.get_file_hash(current_data)
                self.save_state()

        self._pending_data = None
|
|
|
|
|
|
def _handle_new_position(monitor, notifier) -> None:
    """Notify about a freshly detected position and record it in the state.

    The state is updated only after a successful send, so a failed
    notification is retried on the next cycle. With Telegram disabled the
    state is still updated so the same position is not detected again.
    """
    if not (TELEGRAM_ENABLED and notifier):
        logger.info("Telegram disabled or notifier not available - skipping notification")
        monitor.update_state()  # Still update state to avoid loops
        return

    last_closed, current_open = monitor.get_notification_data()
    if not current_open:
        logger.warning("Position change detected but no open position found")
        return

    message = notifier.format_position_message(last_closed, current_open)
    if notifier.send_message(message):
        monitor.update_state()
        logger.info(f"Notification sent for position {current_open.get('token_id')}")
    else:
        logger.error("Failed to send notification")


def main():
    """Main monitoring loop.

    Validates the Telegram configuration (when enabled), then checks the
    status file every TELEGRAM_CHECK_INTERVAL seconds until interrupted
    with Ctrl-C. Errors inside one cycle are logged with their traceback
    and the loop continues.
    """
    logger.info("🤖 Telegram Monitor Starting...")

    notifier = None
    if not TELEGRAM_ENABLED:
        logger.info("📵 Telegram notifications disabled (TELEGRAM_MONITOR_ENABLED=False)")
    else:
        if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
            logger.error("❌ Telegram enabled but missing BOT_TOKEN or CHAT_ID")
            return

        # Initialize notifier and test connection
        notifier = TelegramNotifier(TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID)
        if not notifier.test_connection():
            logger.error("❌ Telegram connection failed - check token and network")
            return

        logger.info(f"✅ Telegram connection established to chat ID: {TELEGRAM_CHAT_ID}")

    # Initialize monitor
    monitor = PositionMonitor(HEDGE_STATUS_FILE, TELEGRAM_STATE_FILE)

    logger.info(f"Monitoring file: {HEDGE_STATUS_FILE}")
    logger.info(f"Check interval: {TELEGRAM_CHECK_INTERVAL} seconds")
    logger.info(f"State file: {TELEGRAM_STATE_FILE}")

    try:
        while True:
            try:
                if monitor.check_for_changes():
                    logger.info("New position opening detected!")
                    _handle_new_position(monitor, notifier)
                else:
                    logger.debug("No changes detected")
            except Exception as e:
                # KeyboardInterrupt is not an Exception subclass, so Ctrl-C
                # passes through to the outer handler.
                logger.exception(f"Error in monitoring loop: {e}")

            # Wait before the next cycle, also after an error.
            time.sleep(TELEGRAM_CHECK_INTERVAL)

    except KeyboardInterrupt:
        logger.info("Shutting down Telegram Monitor...")

    logger.info("Telegram Monitor stopped")


if __name__ == "__main__":
    main()