171 lines
7.9 KiB
Python
171 lines
7.9 KiB
Python
import logging
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import multiprocessing
|
|
import numpy as np # Import numpy to handle np.float64
|
|
|
|
from logging_utils import setup_logging
|
|
from trade_log import log_trade
|
|
|
|
class PositionManager:
    """
    (Stateless) Listens for EXPLICIT signals (e.g., "OPEN_LONG") from all
    strategies and converts them into specific execution orders
    (e.g., "market_open") for the TradeExecutor.

    It holds NO position state: every signal is translated on its own and
    nothing is remembered between signals.

    Expected signal shape (a dict read from ``trade_signal_queue``):
        strategy_name -- label used as the log prefix and passed to log_trade
        config        -- dict; only its optional "agent" entry is read
        coin          -- string, upper-cased before use
        signal        -- one of the keys of _TRADE_SIGNALS / _LEVERAGE_SIGNALS
        signal_price  -- positive finite number, forwarded as ``limit_px``
        size          -- positive finite number (for the UPDATE_LEVERAGE_*
                         signals this field carries the leverage value)
    """

    # Trade signal -> (executor action, is_buy, phrase used in the log line).
    # Closing a LONG is a SELL order; closing a SHORT is a BUY order.
    _TRADE_SIGNALS = {
        "OPEN_LONG": ("market_open", True, "Opening LONG for"),
        "OPEN_SHORT": ("market_open", False, "Opening SHORT for"),
        "CLOSE_LONG": ("market_close", False, "Closing LONG position for"),
        "CLOSE_SHORT": ("market_close", True, "Closing SHORT position for"),
    }

    # Leverage signal -> (is_buy flag sent to the executor, side label for the log).
    _LEVERAGE_SIGNALS = {
        "UPDATE_LEVERAGE_LONG": (True, "LONG"),
        "UPDATE_LEVERAGE_SHORT": (False, "SHORT"),
    }

    def __init__(self, log_level: str, trade_signal_queue: multiprocessing.Queue, order_execution_queue: multiprocessing.Queue):
        """
        Store the two queues this manager bridges.

        Args:
            log_level: Accepted for interface compatibility; not read here.
                Note: Logging is set up by the run_position_manager function.
            trade_signal_queue: Queue the strategy signals are read from.
            order_execution_queue: Queue the execution orders are written to.
        """
        self.trade_signal_queue = trade_signal_queue
        self.order_execution_queue = order_execution_queue

        logging.info("Position Manager (Stateless) started.")

    @staticmethod
    def _to_native(value):
        """Return the plain Python equivalent of a NumPy scalar; pass anything else through."""
        if isinstance(value, np.generic):
            return value.item()
        return value

    @staticmethod
    def _is_positive_finite(value):
        """
        Return True only for a value that compares as 0 < value < infinity.

        NaN fails both comparisons and infinity fails the upper bound, so
        neither is accepted. None, strings and other non-comparable values
        raise inside the comparison and are reported as invalid instead of
        propagating the exception.
        """
        try:
            return bool(0 < value < float("inf"))
        except (TypeError, ValueError):
            return False

    def send_order(self, agent: str, action: str, coin: str, is_buy: bool, size: float, reduce_only: bool = False, limit_px=None, sl_px=None, tp_px=None):
        """Helper function to put a standardized order onto the execution queue.

        Args:
            agent: Name of the agent the order is sent for.
            action: Executor action, e.g. "market_open", "market_close",
                "update_leverage".
            coin: Coin symbol.
            is_buy: True for a buy order, False for a sell order.
            size: Order size (or the leverage value for "update_leverage").
            reduce_only: Forwarded unchanged in the order.
            limit_px: Optional limit price.
            sl_px: Optional stop-loss price.
            tp_px: Optional take-profit price.
        """
        order_data = {
            "agent": agent,
            "action": action,
            "coin": coin,
            "is_buy": is_buy,
            "size": size,
            "reduce_only": reduce_only,
            "limit_px": limit_px,
            "sl_px": sl_px,
            "tp_px": tp_px,
        }
        logging.info(f"Sending order to executor: {order_data}")
        self.order_execution_queue.put(order_data)

    def _handle_signal(self, trade_signal):
        """Validate one strategy signal and emit the matching execution order, if any."""
        logging.info(f"Received signal: {trade_signal}")

        name = trade_signal['strategy_name']
        config = trade_signal['config']
        coin = trade_signal['coin'].upper()

        # The signal is the explicit action requested by the strategy.
        desired_signal = trade_signal['signal']

        # Unwrap NumPy scalars so only plain Python numbers are queued.
        signal_price = self._to_native(trade_signal.get('signal_price'))
        if not self._is_positive_finite(signal_price):
            logging.warning(f"[{name}] Signal received with invalid or missing price ({signal_price}). Skipping.")
            return

        # The size is read from the ROOT of the trade signal.
        size = self._to_native(trade_signal.get('size'))
        if not self._is_positive_finite(size):
            logging.error(f"[{name}] Signal received with no 'size' or invalid size ({size}). Skipping trade.")
            return

        agent_name = (config.get("agent") or "default").lower()
        logging.info(f"[{name}] Agent set to: {agent_name}")

        # The pure signal price is used directly as the limit price.
        limit_px = signal_price
        logging.info(f"[{name}] Using pure signal price for limit_px: {limit_px}")

        if desired_signal in self._TRADE_SIGNALS:
            action, is_buy, phrase = self._TRADE_SIGNALS[desired_signal]
            logging.warning(f"[{name}] ACTION: {phrase} {coin}.")
            self.send_order(agent_name, action, coin, is_buy, size, limit_px=limit_px)
            log_trade(strategy=name, coin=coin, action=desired_signal, price=signal_price, size=size, signal=desired_signal)
        elif desired_signal in self._LEVERAGE_SIGNALS:
            is_buy, side = self._LEVERAGE_SIGNALS[desired_signal]
            logging.warning(f"[{name}] ACTION: Updating {side} leverage for {coin} to {size}x")
            # 'size' field holds the leverage value for this signal
            self.send_order(agent_name, "update_leverage", coin, is_buy, size)
        else:
            logging.warning(f"[{name}] Received unknown signal '{desired_signal}'. No action taken.")

    def run(self):
        """
        Main execution loop. Blocks and waits for a signal from the queue.
        Converts explicit strategy signals into execution orders.

        The loop never returns. Falsy items taken from the queue are ignored.
        Any exception raised while handling a signal is logged with its
        traceback, followed by a one-second pause, and the loop continues.
        """
        logging.info("Position Manager started. Waiting for signals...")
        while True:
            try:
                trade_signal = self.trade_signal_queue.get()
                if not trade_signal:
                    continue

                self._handle_signal(trade_signal)

            except Exception as e:
                logging.error(f"An error occurred in the position manager loop: {e}", exc_info=True)
                time.sleep(1)
|
|
|
|
# This script is no longer run directly, but is called by main_app.py
|
|
|