import json
import logging
import os
import time
from datetime import datetime, timezone

from hyperliquid.info import Info
from hyperliquid.utils import constants

from strategies.base_strategy import BaseStrategy


class CopyTraderStrategy(BaseStrategy):
    """
    An event-driven strategy that monitors a target wallet address and
    copies its trades for a specific set of allowed coins, using per-coin
    size and leverage settings.

    This strategy is STATEFUL: it tracks its own positions in
    ``self.current_positions`` and persists them to a JSON file under
    ``_data/``.
    """

    # Fill side code -> (verb used in log messages, side we want to hold,
    # side we would have to flip out of).
    _SIDE_RULES = {
        "B": ("BOUGHT", "long", "short"),
        "A": ("SOLD", "short", "long"),
    }

    # Upper bound on remembered fill identities, so the de-duplication
    # registry cannot grow without limit in a long-running process.
    _MAX_TRACKED_FILLS = 5000

    def __init__(self, strategy_name: str, params: dict, trade_signal_queue, shared_status: dict = None):
        """
        Read the copy-trading configuration and restore persisted state.

        Args:
            strategy_name: Name used in emitted signals and in the state
                file name.
            params: Strategy parameters. Keys read here: "target_address"
                (required) and "coins_to_copy" (mapping of coin -> per-coin
                settings; each entry needs a truthy "size" to be traded).
            trade_signal_queue: Queue that receives the emitted signals.
            shared_status: Optional dict with previously published status
                values; replaced by an empty dict when None.

        Raises:
            ValueError: If no target address is configured.
        """
        super().__init__(strategy_name, params, trade_signal_queue, shared_status)

        # `or` guards also cover keys that are present but explicitly None,
        # which would otherwise crash on .lower() / .items().
        self.target_address = (self.params.get("target_address") or "").lower()
        configured_coins = self.params.get("coins_to_copy") or {}
        # Convert all coin keys to uppercase for consistency
        self.coins_to_copy = {k.upper(): v for k, v in configured_coins.items()}
        self.allowed_coins = list(self.coins_to_copy.keys())

        if not self.target_address:
            logging.error("No 'target_address' specified in parameters for copy trader.")
            raise ValueError("target_address is required")

        if not self.allowed_coins:
            logging.warning("No 'coins_to_copy' configured. This strategy will not copy any trades.")

        self.info = None  # Will be initialized in the run loop

        # Identities of fills already acted upon (insertion-ordered dict
        # used as a bounded, ordered set). See _already_processed().
        self._seen_fill_ids = {}

        # The strategy loads and manages its own position state.
        self.position_state_file = os.path.join("_data", f"strategy_state_{self.strategy_name}.json")
        self.current_positions = self._load_position_state()

        if self.shared_status is None:
            logging.warning("No shared_status dictionary provided. Initializing a new one.")
            self.shared_status = {}

        self.current_signal = self.shared_status.get("current_signal", "WAIT")
        self.signal_price = self.shared_status.get("signal_price")
        self.last_signal_change_utc = self.shared_status.get("last_signal_change_utc")

        # Fills timestamped before this instant are treated as historical.
        self.start_time_utc = datetime.now(timezone.utc)
        logging.info(f"Strategy initialized. Ignoring all trades before {self.start_time_utc.isoformat()}")
        logging.info(f"Loaded positions: {self.current_positions}")

    def _load_position_state(self) -> dict:
        """
        Load the strategy's open positions from the state file.

        Returns:
            The persisted mapping, e.g.
            ``{"ETH": {"side": "long", "size": 0.01, "entry": 3000}}``,
            or an empty dict when the file is missing, unreadable, not
            valid JSON, or does not contain a JSON object.
        """
        try:
            with open(self.position_state_file, 'r') as f:
                logging.info(f"Loading existing position state from {self.position_state_file}")
                state = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logging.warning(f"Could not read position state file {self.position_state_file}. Starting fresh.")
            return {}

        if not isinstance(state, dict):
            # The rest of the class calls .get()/.items() on this value, so
            # any other JSON type (list, string, ...) is unusable.
            logging.warning(f"Could not read position state file {self.position_state_file}. Starting fresh.")
            return {}
        return state

    def _save_position_state(self):
        """
        Persist the strategy's open positions to the state file.

        The data is written to a sibling temporary file and then moved
        over the real file, so an interruption mid-write cannot leave a
        truncated state file behind. I/O failures are logged, not raised.
        """
        tmp_path = f"{self.position_state_file}.tmp"
        try:
            state_dir = os.path.dirname(self.position_state_file)
            if state_dir:
                os.makedirs(state_dir, exist_ok=True)
            with open(tmp_path, 'w') as f:
                json.dump(self.current_positions, f, indent=4)
            os.replace(tmp_path, self.position_state_file)
        except OSError as e:
            logging.error(f"Failed to save position state: {e}")

    def calculate_signals(self, df):
        """No-op: this strategy is event-driven and does not poll for signals."""
        pass

    def send_explicit_signal(self, signal: str, coin: str, price: float, trade_params: dict, size: float):
        """
        Put a formatted trade signal on ``trade_signal_queue``.

        Args:
            signal: Action name, e.g. "OPEN_LONG" or "CLOSE_SHORT".
            coin: Coin symbol the signal applies to.
            price: Price of the source fill that triggered the signal.
            trade_params: Parameters forwarded under config["parameters"].
            size: Position size to trade, passed explicitly.
        """
        config = {
            "agent": self.params.get("agent"),
            "parameters": trade_params,
        }

        self.trade_signal_queue.put({
            "strategy_name": self.strategy_name,
            "signal": signal,  # e.g., "OPEN_LONG", "CLOSE_SHORT"
            "coin": coin,
            "signal_price": price,
            "config": config,
            "size": size,  # Explicitly pass size
        })
        logging.info(f"Explicit signal SENT: {signal} {coin} @ {price}, Size: {size}")

    def on_fill_message(self, message):
        """
        WebSocket callback: copy the target's fills for allowed coins.

        Messages on other channels, without fills, or for a different user
        are ignored. Each fill is handled independently, so one malformed
        fill is logged and skipped without discarding the rest of the
        batch. Never raises.
        """
        try:
            channel = message.get("channel")
            if channel not in ("user", "userFills", "userEvents"):
                return

            data = message.get("data")
            if not data:
                return

            fills = data.get("fills", [])
            if not fills:
                return

            user_address = data.get("user", "").lower()
            if user_address != self.target_address:
                return

            logging.debug(f"Received {len(fills)} fill(s) for user {user_address}")
        except Exception as e:
            logging.error(f"Error in on_fill_message: {e}", exc_info=True)
            return

        for fill in fills:
            try:
                self._process_fill(fill)
            except Exception as e:
                logging.error(f"Error in on_fill_message: {e}", exc_info=True)

    def _already_processed(self, fill: dict) -> bool:
        """
        Return True if this fill was seen before; otherwise remember it.

        NOTE(review): this assumes the feed can re-deliver fills, e.g. as a
        snapshot sent on every (re)subscribe after the watchdog reconnects -
        confirm against the Hyperliquid WebSocket docs. Without this guard a
        replayed buy/sell sequence would run through the position state
        machine a second time and emit spurious flip signals.
        """
        # Composite identity: the trade id alone is not used because it is
        # not verified here to be unique per fill.
        fill_id = (
            fill.get("tid"),
            fill.get("oid"),
            fill.get("hash"),
            fill.get("time"),
            fill.get("coin"),
            fill.get("side"),
            fill.get("px"),
            fill.get("sz"),
        )
        if fill_id in self._seen_fill_ids:
            return True

        self._seen_fill_ids[fill_id] = None
        if len(self._seen_fill_ids) > self._MAX_TRACKED_FILLS:
            # dicts keep insertion order, so the first key is the oldest.
            self._seen_fill_ids.pop(next(iter(self._seen_fill_ids)))
        return False

    def _process_fill(self, fill: dict) -> None:
        """Apply a single source fill to our position state and emit signals."""
        # Check if the trade is new or historical (fill time is in ms).
        trade_time = datetime.fromtimestamp(fill['time'] / 1000, tz=timezone.utc)
        if trade_time < self.start_time_utc:
            logging.info(f"Ignoring stale/historical trade from {trade_time.isoformat()}")
            return

        coin = fill.get('coin').upper()
        if coin not in self.allowed_coins:
            logging.info(f"Ignoring fill for unmonitored coin: {coin}")
            return

        if self._already_processed(fill):
            logging.debug(f"[{coin}] Skipping fill that was already processed.")
            return

        side = fill.get('side')
        price = float(fill.get('px'))

        # Get our strategy's configured trade size for this coin
        coin_config = self.coins_to_copy.get(coin)
        if not coin_config or not coin_config.get("size"):
            logging.warning(f"No trade size specified for {coin}. Ignoring fill.")
            return
        strategy_trade_size = coin_config.get("size")

        rule = self._SIDE_RULES.get(side)
        if rule is None:
            # Neither a buy ("B") nor a sell ("A"): nothing to copy.
            return
        verb, wanted_side, opposite_side = rule

        # Per-coin settings override the strategy-wide parameters.
        trade_params = self.params.copy()
        trade_params.update(coin_config)

        # Get our current position state for this coin
        current_local_pos = self.current_positions.get(coin)
        current_local_side = current_local_pos.get("side") if current_local_pos else None

        if current_local_side == opposite_side:
            # Flip: close the opposite position with its recorded size,
            # then open the new one with the configured size.
            logging.warning(
                f"[{coin}] Target {verb}, we are {opposite_side.upper()}. Flipping to {wanted_side.upper()}."
            )
            self.send_explicit_signal(
                f"CLOSE_{opposite_side.upper()}", coin, price, trade_params, current_local_pos.get("size")
            )
        elif current_local_side is None:
            logging.warning(f"[{coin}] Target {verb}, we are FLAT. Opening {wanted_side.upper()}.")
        else:
            logging.info(f"[{coin}] Target {verb}, we are already {wanted_side.upper()}. Ignoring.")
            return

        self.send_explicit_signal(f"OPEN_{wanted_side.upper()}", coin, price, trade_params, strategy_trade_size)
        self.current_positions[coin] = {"side": wanted_side, "size": strategy_trade_size, "entry": price}

        # Update dashboard status
        self.current_signal = f"{side} @ {coin}"
        self.signal_price = price
        self.last_signal_change_utc = trade_time.isoformat()

        # Save BOTH status files
        self._save_status()  # For dashboard
        self._save_position_state()  # For our internal tracking

        logging.info(f"Source trade logged: {json.dumps(fill)}")

    def _connect_and_subscribe(self):
        """
        Open a new WebSocket connection and subscribe to the target's
        ``userFills`` channel.

        Returns:
            True on success. On failure the error is logged, any client
            created during this attempt is stopped, ``self.info`` is reset
            to None and False is returned.
        """
        info = None
        try:
            logging.info("Connecting to Hyperliquid WebSocket...")
            info = Info(constants.MAINNET_API_URL, skip_ws=False)
            self.info = info
            subscription = {"type": "userFills", "user": self.target_address}
            info.subscribe(subscription, self.on_fill_message)
            logging.info(f"Subscribed to 'userFills' for target address: {self.target_address}")
            return True
        except Exception as e:
            logging.error(f"Failed to connect or subscribe: {e}")
            # If the client was created but subscribing failed, stop its
            # websocket manager instead of abandoning it while running.
            ws_manager = getattr(info, "ws_manager", None)
            if ws_manager is not None:
                try:
                    ws_manager.stop()
                except Exception as stop_error:
                    logging.error(f"Error stopping old ws_manager: {stop_error}")
            self.info = None
            return False

    def run_event_loop(self):
        """
        Run the strategy: connect, then watch the connection until stopped.

        This overrides the default polling loop. After the initial
        subscription a watchdog checks the WebSocket every 15 seconds and
        reconnects when it is down. On KeyboardInterrupt the open positions
        are logged and the position state is saved; the WebSocket is
        stopped on exit.
        """
        try:
            if not self._connect_and_subscribe():
                # If connection fails on start, wait 60s before letting the process restart
                time.sleep(60)
                return

            # Save the initial "WAIT" status
            self._save_status()

            while True:
                try:
                    time.sleep(15)  # Check the connection every 15 seconds

                    if self.info is None or not self.info.ws_manager.is_alive():
                        logging.error("WebSocket connection lost. Attempting to reconnect...")

                        if self.info and self.info.ws_manager:
                            try:
                                self.info.ws_manager.stop()
                            except Exception as e:
                                logging.error(f"Error stopping old ws_manager: {e}")

                        if not self._connect_and_subscribe():
                            logging.error("Reconnect failed, will retry in 15s.")
                        else:
                            logging.info("Successfully reconnected to WebSocket.")
                            self._save_status()
                    else:
                        logging.debug("Watchdog check: WebSocket connection is active.")

                except Exception as e:
                    # Keep the watchdog alive; KeyboardInterrupt is not an
                    # Exception subclass and still reaches the handler below.
                    logging.error(f"An error occurred in the watchdog loop: {e}", exc_info=True)

        except KeyboardInterrupt:
            # --- GRACEFUL SHUTDOWN LOGIC ---
            logging.warning(f"Shutdown signal received. Closing all open positions for '{self.strategy_name}'...")

            # Use a copy of the items to avoid runtime modification errors
            for coin, position in list(self.current_positions.items()):
                current_side = position.get("side")
                trade_size = position.get("size")

                if not current_side or not trade_size:
                    continue

                # Find the config for this coin
                coin_config = self.coins_to_copy.get(coin.upper(), {})
                trade_params = self.params.copy()
                trade_params.update(coin_config)

                # Use the last entry price as a placeholder for the market close order
                price = position.get("entry", 1)  # Use 1 as a failsafe

                # NOTE(review): the actual dispatch below is disabled, so
                # these log lines are advisory only - no close signal is
                # queued and current_positions is left untouched. Confirm
                # whether closing on shutdown is meant to be re-enabled.
                if current_side == "long":
                    logging.warning(f"Sending CLOSE_LONG for {coin}, {price}, {trade_size}...")
                    # self.send_explicit_signal("CLOSE_LONG", coin, price, trade_params, trade_size)
                    # del self.current_positions[coin]  # Assume it will close
                elif current_side == "short":
                    logging.warning(f"Sending CLOSE_SHORT for {coin}, {price}, {trade_size} ...")
                    # self.send_explicit_signal("CLOSE_SHORT", coin, price, trade_params, trade_size)
                    # del self.current_positions[coin]  # Assume it will close

            # Persists the state as-is (unchanged while dispatch is disabled).
            self._save_position_state()
            logging.info("All closing signals sent. Exiting strategy.")

        except Exception as e:
            logging.error(f"An unhandled error occurred in run_event_loop: {e}", exc_info=True)

        finally:
            if self.info and self.info.ws_manager and self.info.ws_manager.is_alive():
                try:
                    self.info.ws_manager.stop()
                    logging.info("WebSocket connection stopped.")
                except Exception as e:
                    logging.error(f"Error stopping ws_manager on exit: {e}")