size taken from monitored wallet

This commit is contained in:
2025-11-02 22:38:31 +01:00
parent d650bb5fe2
commit 5f9109c3a9
3 changed files with 261 additions and 201 deletions

View File

@ -11,12 +11,16 @@ from strategies.base_strategy import BaseStrategy
class CopyTraderStrategy(BaseStrategy):
"""
An event-driven strategy that monitors a target wallet address and
copies its trades for a specific set of allowed coins, using
per-coin size and leverage settings.
copies its trades for a specific set of allowed coins.
This strategy is STATEFUL and tracks its own positions.
This strategy is STATELESS. It translates a target's fill direction
(e.g., "Open Long") directly into an explicit signal
(e.g., "OPEN_LONG") for the PositionManager.
"""
def __init__(self, strategy_name: str, params: dict, trade_signal_queue, shared_status: dict = None):
# --- MODIFIED: Pass the correct queue to the parent ---
# The event-driven copy trader should send orders to the order_execution_queue
# We will assume the queue passed in is the correct one (as setup in main_app.py)
super().__init__(strategy_name, params, trade_signal_queue, shared_status)
self.target_address = self.params.get("target_address", "").lower()
@ -33,9 +37,9 @@ class CopyTraderStrategy(BaseStrategy):
self.info = None # Will be initialized in the run loop
# --- MODIFIED: Load and manage its own position state ---
self.position_state_file = os.path.join("_data", f"strategy_state_{self.strategy_name}.json")
self.current_positions = self._load_position_state()
# --- REMOVED: All local state management ---
# self.position_state_file = ...
# self.current_positions = ...
# --- MODIFIED: Check if shared_status is None before using it ---
if self.shared_status is None:
@ -48,26 +52,9 @@ class CopyTraderStrategy(BaseStrategy):
self.start_time_utc = datetime.now(timezone.utc)
logging.info(f"Strategy initialized. Ignoring all trades before {self.start_time_utc.isoformat()}")
logging.info(f"Loaded positions: {self.current_positions}")
def _load_position_state(self) -> dict:
"""Loads the strategy's current open positions from a file."""
if os.path.exists(self.position_state_file):
try:
with open(self.position_state_file, 'r') as f:
logging.info(f"Loading existing position state from {self.position_state_file}")
return json.load(f)
except (IOError, json.JSONDecodeError):
logging.warning(f"Could not read position state file {self.position_state_file}. Starting fresh.")
return {} # { "ETH": {"side": "long", "size": 0.01, "entry": 3000}, ... }
def _save_position_state(self):
"""Saves the strategy's current open positions to a file."""
try:
with open(self.position_state_file, 'w') as f:
json.dump(self.current_positions, f, indent=4)
except IOError as e:
logging.error(f"Failed to save position state: {e}")
# --- REMOVED: _load_position_state ---
# --- REMOVED: _save_position_state ---
def calculate_signals(self, df):
# This strategy is event-driven, so it does not use polling-based signal calculation.
@ -76,17 +63,19 @@ class CopyTraderStrategy(BaseStrategy):
def send_explicit_signal(self, signal: str, coin: str, price: float, trade_params: dict, size: float):
"""Helper to send a formatted signal to the PositionManager."""
config = {
# --- MODIFIED: Ensure agent is read from params ---
"agent": self.params.get("agent"),
"parameters": trade_params
}
# --- MODIFIED: Use self.trade_signal_queue (which is the queue passed in) ---
self.trade_signal_queue.put({
"strategy_name": self.strategy_name,
"signal": signal, # e.g., "OPEN_LONG", "CLOSE_SHORT"
"coin": coin,
"signal_price": price,
"config": config,
"size": size # Explicitly pass size
"size": size # Explicitly pass size (or leverage for leverage updates)
})
logging.info(f"Explicit signal SENT: {signal} {coin} @ {price}, Size: {size}")
@ -96,23 +85,45 @@ class CopyTraderStrategy(BaseStrategy):
every time the monitored address has an event.
"""
try:
# --- NEW: Add logging to see ALL messages ---
logging.debug(f"Received WebSocket message: {message}")
channel = message.get("channel")
if channel not in ("user", "userFills", "userEvents"):
# --- NEW: Added debug logging ---
logging.debug(f"Ignoring message from unhandled channel: {channel}")
return
data = message.get("data")
if not data:
# --- NEW: Added debug logging ---
logging.debug("Message received with no 'data' field. Ignoring.")
return
fills = data.get("fills", [])
if not fills:
return
# --- NEW: Check for user address FIRST ---
user_address = data.get("user", "").lower()
if user_address != self.target_address:
if not user_address:
logging.debug("Received message with 'data' but no 'user'. Ignoring.")
return
# --- MODIFIED: Check for 'fills' vs. other event types ---
# This check is still valid for userFills
if "fills" not in data or not data.get("fills"):
# This is a userEvent, but not a fill (e.g., order placement, cancel, withdrawal)
event_type = data.get("type") # e.g., 'order', 'cancel', 'withdrawal'
if event_type:
logging.debug(f"Received non-fill user event: '{event_type}'. Ignoring.")
else:
logging.debug(f"Received 'data' message with no 'fills'. Ignoring.")
return
# --- This line is now safe to run ---
if user_address != self.target_address:
# This shouldn't happen if the subscription is correct, but good to check
logging.warning(f"Received fill for wrong user: {user_address}")
return
fills = data.get("fills")
logging.debug(f"Received {len(fills)} fill(s) for user {user_address}")
for fill in fills:
@ -125,71 +136,108 @@ class CopyTraderStrategy(BaseStrategy):
coin = fill.get('coin').upper()
if coin in self.allowed_coins:
side = fill.get('side')
price = float(fill.get('px'))
fill_size = float(fill.get('sz'))
# Get our strategy's configured trade size for this coin
coin_config = self.coins_to_copy.get(coin)
if not coin_config or not coin_config.get("size"):
logging.warning(f"No trade size specified for {coin}. Ignoring fill.")
# --- MODIFIED: Use the target's fill size ---
fill_size = float(fill.get('sz')) # Target's size
if fill_size == 0:
logging.warning(f"Ignoring fill with size 0.")
continue
strategy_trade_size = coin_config.get("size")
# --- NEW: Get the fill direction ---
# "dir": "Open Long", "Close Long", "Open Short", "Close Short"
fill_direction = fill.get("dir")
# --- NEW: Get startPosition to calculate flip sizes ---
start_pos_size = float(fill.get('startPosition', 0.0))
if not fill_direction:
logging.warning(f"Fill message missing 'dir'. Ignoring fill: {fill}")
continue
# Get our strategy's configured leverage for this coin
coin_config = self.coins_to_copy.get(coin)
# --- REMOVED: Check for coin_config.get("size") ---
# --- REMOVED: strategy_trade_size = coin_config.get("size") ---
# Prepare config for the signal
trade_params = self.params.copy()
trade_params.update(coin_config)
if coin_config:
trade_params.update(coin_config)
# Get our current position state for this coin
current_local_pos = self.current_positions.get(coin)
current_local_side = current_local_pos.get("side") if current_local_pos else None
# --- REMOVED: All stateful logic (current_local_pos, etc.) ---
# --- MODIFIED: Expanded logic to handle flip directions ---
signal_sent = False
if side == "B": # Target bought
if current_local_side == "short":
# Flip: Close short, then open long
logging.warning(f"[{coin}] Target BOUGHT, we are SHORT. Flipping to LONG.")
self.send_explicit_signal("CLOSE_SHORT", coin, price, trade_params, current_local_pos.get("size"))
self.send_explicit_signal("OPEN_LONG", coin, price, trade_params, strategy_trade_size)
self.current_positions[coin] = {"side": "long", "size": strategy_trade_size, "entry": price}
signal_sent = True
elif current_local_side is None:
# New: Open long
logging.warning(f"[{coin}] Target BOUGHT, we are FLAT. Opening LONG.")
self.send_explicit_signal("OPEN_LONG", coin, price, trade_params, strategy_trade_size)
self.current_positions[coin] = {"side": "long", "size": strategy_trade_size, "entry": price}
signal_sent = True
else: # We are already long
logging.info(f"[{coin}] Target BOUGHT, we are already LONG. Ignoring.")
elif side == "A": # Target sold
if current_local_side == "long":
# Flip: Close long, then open short
logging.warning(f"[{coin}] Target SOLD, we are LONG. Flipping to SHORT.")
self.send_explicit_signal("CLOSE_LONG", coin, price, trade_params, current_local_pos.get("size"))
self.send_explicit_signal("OPEN_SHORT", coin, price, trade_params, strategy_trade_size)
self.current_positions[coin] = {"side": "short", "size": strategy_trade_size, "entry": price}
signal_sent = True
elif current_local_side is None:
# New: Open short
logging.warning(f"[{coin}] Target SOLD, we are FLAT. Opening SHORT.")
self.send_explicit_signal("OPEN_SHORT", coin, price, trade_params, strategy_trade_size)
self.current_positions[coin] = {"side": "short", "size": strategy_trade_size, "entry": price}
signal_sent = True
else: # We are already short
logging.info(f"[{coin}] Target SOLD, we are already SHORT. Ignoring.")
dashboard_signal = ""
if fill_direction == "Open Long":
logging.warning(f"[{coin}] Target action: {fill_direction}. Sending signal: OPEN_LONG")
self.send_explicit_signal("OPEN_LONG", coin, price, trade_params, fill_size)
signal_sent = True
dashboard_signal = "OPEN_LONG"
elif fill_direction == "Close Long":
logging.warning(f"[{coin}] Target action: {fill_direction}. Sending signal: CLOSE_LONG")
self.send_explicit_signal("CLOSE_LONG", coin, price, trade_params, fill_size)
signal_sent = True
dashboard_signal = "CLOSE_LONG"
elif fill_direction == "Open Short":
logging.warning(f"[{coin}] Target action: {fill_direction}. Sending signal: OPEN_SHORT")
self.send_explicit_signal("OPEN_SHORT", coin, price, trade_params, fill_size)
signal_sent = True
dashboard_signal = "OPEN_SHORT"
elif fill_direction == "Close Short":
logging.warning(f"[{coin}] Target action: {fill_direction}. Sending signal: CLOSE_SHORT")
self.send_explicit_signal("CLOSE_SHORT", coin, price, trade_params, fill_size)
signal_sent = True
dashboard_signal = "CLOSE_SHORT"
elif fill_direction == "Short > Long":
logging.warning(f"[{coin}] Target action: {fill_direction}. Sending CLOSE_SHORT then OPEN_LONG.")
close_size = abs(start_pos_size)
open_size = fill_size - close_size
if close_size > 0:
self.send_explicit_signal("CLOSE_SHORT", coin, price, trade_params, close_size)
if open_size > 0:
self.send_explicit_signal("OPEN_LONG", coin, price, trade_params, open_size)
signal_sent = True
dashboard_signal = "FLIP_TO_LONG"
elif fill_direction == "Long > Short":
logging.warning(f"[{coin}] Target action: {fill_direction}. Sending CLOSE_LONG then OPEN_SHORT.")
close_size = abs(start_pos_size)
open_size = fill_size - close_size
if close_size > 0:
self.send_explicit_signal("CLOSE_LONG", coin, price, trade_params, close_size)
if open_size > 0:
self.send_explicit_signal("OPEN_SHORT", coin, price, trade_params, open_size)
signal_sent = True
dashboard_signal = "FLIP_TO_SHORT"
if signal_sent:
# Update dashboard status
self.current_signal = f"{side} @ {coin}"
self.current_signal = dashboard_signal # Show the action
self.signal_price = price
self.last_signal_change_utc = trade_time.isoformat()
# --- MODIFIED: Save BOTH status files ---
self.coin = coin # Update coin for dashboard
self.size = fill_size # Update size for dashboard
self._save_status() # For dashboard
self._save_position_state() # For our internal tracking
logging.info(f"Source trade logged: {json.dumps(fill)}")
else:
logging.info(f"[{coin}] Ignoring unhandled fill direction: {fill_direction}")
else:
logging.info(f"Ignoring fill for unmonitored coin: {coin}")
@ -203,9 +251,12 @@ class CopyTraderStrategy(BaseStrategy):
try:
logging.info("Connecting to Hyperliquid WebSocket...")
self.info = Info(constants.MAINNET_API_URL, skip_ws=False)
# --- MODIFIED: Reverted to 'userFills' as requested ---
subscription = {"type": "userFills", "user": self.target_address}
self.info.subscribe(subscription, self.on_fill_message)
logging.info(f"Subscribed to 'userFills' for target address: {self.target_address}")
return True
except Exception as e:
logging.error(f"Failed to connect or subscribe: {e}")
@ -217,8 +268,6 @@ class CopyTraderStrategy(BaseStrategy):
This method overrides the default polling loop. It establishes a
persistent WebSocket connection and runs a watchdog to ensure
it stays connected.
It also catches KeyboardInterrupt to gracefully shut down positions.
"""
try:
if not self._connect_and_subscribe():
@ -226,6 +275,40 @@ class CopyTraderStrategy(BaseStrategy):
time.sleep(60)
return
# --- MODIFIED: Add a small delay to ensure Info object is ready for REST calls ---
logging.info("Connection established. Waiting 2 seconds for Info client to be ready...")
time.sleep(2)
# --- END MODIFICATION ---
# --- NEW: Set initial leverage for all monitored coins ---
logging.info("Setting initial leverage for all monitored coins...")
try:
all_mids = self.info.all_mids()
for coin_key, coin_config in self.coins_to_copy.items():
coin = coin_key.upper()
# Use a failsafe price of 1.0 if coin not in mids (e.g., new listing)
current_price = float(all_mids.get(coin, 1.0))
leverage_long = coin_config.get('leverage_long', 2)
leverage_short = coin_config.get('leverage_short', 2)
# Prepare config for the signal
trade_params = self.params.copy()
trade_params.update(coin_config)
# Send LONG leverage update
# The 'size' param is used to pass the leverage value for this signal type
self.send_explicit_signal("UPDATE_LEVERAGE_LONG", coin, current_price, trade_params, leverage_long)
# Send SHORT leverage update
self.send_explicit_signal("UPDATE_LEVERAGE_SHORT", coin, current_price, trade_params, leverage_short)
logging.info(f"Sent initial leverage signals for {coin} (Long: {leverage_long}x, Short: {leverage_short}x)")
except Exception as e:
logging.error(f"Failed to set initial leverage: {e}", exc_info=True)
# --- END NEW LEVERAGE LOGIC ---
# Save the initial "WAIT" status
self._save_status()
@ -253,36 +336,9 @@ class CopyTraderStrategy(BaseStrategy):
except Exception as e:
logging.error(f"An error occurred in the watchdog loop: {e}", exc_info=True)
except KeyboardInterrupt: # --- THIS IS THE GRACEFUL SHUTDOWN LOGIC ---
logging.warning(f"Shutdown signal received. Closing all open positions for '{self.strategy_name}'...")
# Use a copy of the items to avoid runtime modification errors
for coin, position in list(self.current_positions.items()):
current_side = position.get("side")
trade_size = position.get("size")
if not current_side or not trade_size:
continue
# Find the config for this coin
coin_config = self.coins_to_copy.get(coin.upper(), {})
trade_params = self.params.copy()
trade_params.update(coin_config)
# Use the last entry price as a placeholder for the market close order
price = position.get("entry", 1) # Use 1 as a failsafe
if current_side == "long":
logging.warning(f"Sending CLOSE_LONG for {coin}, {price}, {trade_size}...")
#self.send_explicit_signal("CLOSE_LONG", coin, price, trade_params, trade_size)
#del self.current_positions[coin] # Assume it will close
elif current_side == "short":
logging.warning(f"Sending CLOSE_SHORT for {coin}, {price}, {trade_size} ...")
#self.send_explicit_signal("CLOSE_SHORT", coin, price, trade_params, trade_size)
#del self.current_positions[coin] # Assume it will close
self._save_position_state() # Save the new empty state
logging.info("All closing signals sent. Exiting strategy.")
except KeyboardInterrupt:
# --- MODIFIED: No positions to close, just exit ---
logging.warning(f"Shutdown signal received. Exiting strategy '{self.strategy_name}'.")
except Exception as e:
logging.error(f"An unhandled error occurred in run_event_loop: {e}", exc_info=True)