import logging import os import sys import json import time import multiprocessing import numpy as np # Import numpy to handle np.float64 from logging_utils import setup_logging from trade_log import log_trade class PositionManager: """ (Stateless) Listens for EXPLICIT signals (e.g., "OPEN_LONG") from all strategies and converts them into specific execution orders (e.g., "market_open") for the TradeExecutor. It holds NO position state. """ def __init__(self, log_level: str, trade_signal_queue: multiprocessing.Queue, order_execution_queue: multiprocessing.Queue): # Note: Logging is set up by the run_position_manager function self.trade_signal_queue = trade_signal_queue self.order_execution_queue = order_execution_queue # --- REMOVED: All state management --- logging.info("Position Manager (Stateless) started.") # --- REMOVED: _load_managed_positions method --- # --- REMOVED: _save_managed_positions method --- # --- REMOVED: All tick/rounding/meta logic --- def send_order(self, agent: str, action: str, coin: str, is_buy: bool, size: float, reduce_only: bool = False, limit_px=None, sl_px=None, tp_px=None): """Helper function to put a standardized order onto the execution queue.""" order_data = { "agent": agent, "action": action, "coin": coin, "is_buy": is_buy, "size": size, "reduce_only": reduce_only, "limit_px": limit_px, "sl_px": sl_px, "tp_px": tp_px, } logging.info(f"Sending order to executor: {order_data}") self.order_execution_queue.put(order_data) def run(self): """ Main execution loop. Blocks and waits for a signal from the queue. Converts explicit strategy signals into execution orders. """ logging.info("Position Manager started. Waiting for signals...") while True: try: trade_signal = self.trade_signal_queue.get() if not trade_signal: continue logging.info(f"Received signal: {trade_signal}") name = trade_signal['strategy_name'] config = trade_signal['config'] params = config['parameters'] coin = trade_signal['coin'].upper() # --- NEW: The signal is now the explicit action --- desired_signal = trade_signal['signal'] status = trade_signal signal_price = status.get('signal_price') if isinstance(signal_price, np.float64): signal_price = float(signal_price) if not signal_price or signal_price <= 0: logging.warning(f"[{name}] Signal received with invalid or missing price ({signal_price}). Skipping.") continue # --- This logic is still needed for copy_trader's nested config --- # --- But ONLY for finding leverage, not size --- if 'coins_to_copy' in params: logging.info(f"[{name}] Detected 'coins_to_copy'. Entering copy_trader logic...") matching_coin_key = None for key in params['coins_to_copy'].keys(): if key.upper() == coin: matching_coin_key = key break if matching_coin_key: coin_specific_config = params['coins_to_copy'][matching_coin_key] else: coin_specific_config = {} # --- REMOVED: size = coin_specific_config.get('size') --- params['leverage_long'] = coin_specific_config.get('leverage_long', 2) params['leverage_short'] = coin_specific_config.get('leverage_short', 2) # --- FIX: Read the size from the ROOT of the trade signal --- size = trade_signal.get('size') if not size or size <= 0: logging.error(f"[{name}] Signal received with no 'size' or invalid size ({size}). Skipping trade.") continue # --- END FIX --- leverage_long = int(params.get('leverage_long', 2)) leverage_short = int(params.get('leverage_short', 2)) agent_name = (config.get("agent") or "default").lower() logging.info(f"[{name}] Agent set to: {agent_name}") # --- REMOVED: current_position check --- # --- Use pure signal_price directly for the limit_px --- limit_px = signal_price logging.info(f"[{name}] Using pure signal price for limit_px: {limit_px}") # --- NEW: Stateless Signal-to-Order Conversion --- if desired_signal == "OPEN_LONG": logging.warning(f"[{name}] ACTION: Opening LONG for {coin}.") # --- REMOVED: Leverage update signal --- self.send_order(agent_name, "market_open", coin, True, size, limit_px=limit_px) log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=signal_price, size=size, signal=desired_signal) elif desired_signal == "OPEN_SHORT": logging.warning(f"[{name}] ACTION: Opening SHORT for {coin}.") # --- REMOVED: Leverage update signal --- self.send_order(agent_name, "market_open", coin, False, size, limit_px=limit_px) log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=signal_price, size=size, signal=desired_signal) elif desired_signal == "CLOSE_LONG": logging.warning(f"[{name}] ACTION: Closing LONG position for {coin}.") # A "market_close" for a LONG is a SELL order self.send_order(agent_name, "market_close", coin, False, size, limit_px=limit_px) log_trade(strategy=name, coin=coin, action="CLOSE_LONG", price=signal_price, size=size, signal=desired_signal) elif desired_signal == "CLOSE_SHORT": logging.warning(f"[{name}] ACTION: Closing SHORT position for {coin}.") # A "market_close" for a SHORT is a BUY order self.send_order(agent_name, "market_close", coin, True, size, limit_px=limit_px) log_trade(strategy=name, coin=coin, action="CLOSE_SHORT", price=signal_price, size=size, signal=desired_signal) # --- NEW: Handle leverage update signals --- elif desired_signal == "UPDATE_LEVERAGE_LONG": logging.warning(f"[{name}] ACTION: Updating LONG leverage for {coin} to {size}x") # 'size' field holds the leverage value for this signal self.send_order(agent_name, "update_leverage", coin, True, size) elif desired_signal == "UPDATE_LEVERAGE_SHORT": logging.warning(f"[{name}] ACTION: Updating SHORT leverage for {coin} to {size}x") # 'size' field holds the leverage value for this signal self.send_order(agent_name, "update_leverage", coin, False, size) else: logging.warning(f"[{name}] Received unknown signal '{desired_signal}'. No action taken.") # --- REMOVED: _save_managed_positions() --- except Exception as e: logging.error(f"An error occurred in the position manager loop: {e}", exc_info=True) time.sleep(1) # This script is no longer run directly, but is called by main_app.py