From 93363750aea07a19753b74bdf4d758205866cf5f Mon Sep 17 00:00:00 2001 From: DiTus Date: Mon, 27 Oct 2025 21:54:33 +0100 Subject: [PATCH] fixes, old way to handle strategies --- address_monitor.py | 4 +- dashboard_data_fetcher.py | 136 +++++++++++ main_app.py | 172 +++++++++----- strategies/base_strategy.py | 112 +++++++-- strategies/copy_trader_strategy.py | 178 ++++++++++++++ strategies/ma_cross_strategy.py | 7 +- strategies/single_sma_strategy.py | 7 +- trade_executor.py | 283 +++++++++++++--------- whale_tracker.py | 367 +++++++++++++++++++++++++++++ 9 files changed, 1063 insertions(+), 203 deletions(-) create mode 100644 dashboard_data_fetcher.py create mode 100644 strategies/copy_trader_strategy.py create mode 100644 whale_tracker.py diff --git a/address_monitor.py b/address_monitor.py index 9bd768a..ced5e44 100644 --- a/address_monitor.py +++ b/address_monitor.py @@ -15,13 +15,13 @@ from logging_utils import setup_logging # --- Configuration --- DEFAULT_ADDRESSES_TO_WATCH = [ #"0xd4c1f7e8d876c4749228d515473d36f919583d1d", - "0x0fd468a73084daa6ea77a9261e40fdec3e67e0c7", + "0x47930c76790c865217472f2ddb4d14c640ee450a", # "0x4d69495d16fab95c3c27b76978affa50301079d0", # "0x09bc1cf4d9f0b59e1425a8fde4d4b1f7d3c9410d", "0xc6ac58a7a63339898aeda32499a8238a46d88e84", "0xa8ef95dbd3db55911d3307930a84b27d6e969526", # "0x4129c62faf652fea61375dcd9ca8ce24b2bb8b95", - "0xbf1935fe7ab6d0aa3ee8d3da47c2f80e215b2a1c", + "0x32885a6adac4375858E6edC092EfDDb0Ef46484C", ] MAX_FILLS_TO_DISPLAY = 10 LOGS_DIR = "_logs" diff --git a/dashboard_data_fetcher.py b/dashboard_data_fetcher.py new file mode 100644 index 0000000..789fbeb --- /dev/null +++ b/dashboard_data_fetcher.py @@ -0,0 +1,136 @@ +import logging +import os +import sys +import json +import time +import argparse # <-- THE FIX: Added this import +from datetime import datetime +from eth_account import Account +from hyperliquid.info import Info +from hyperliquid.utils import constants +from dotenv import load_dotenv + +from logging_utils import setup_logging + +# Load .env file +load_dotenv() + +class DashboardDataFetcher: + """ + A dedicated, lightweight process that runs in a loop to fetch and save + the account's state (balances, positions) for the main dashboard to display. + """ + + def __init__(self, log_level: str): + setup_logging(log_level, 'DashboardDataFetcher') + + self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS") + if not self.vault_address: + logging.error("MAIN_WALLET_ADDRESS not set in .env file. Cannot proceed.") + sys.exit(1) + + self.info = Info(constants.MAINNET_API_URL, skip_ws=True) + self.status_file_path = os.path.join("_logs", "trade_executor_status.json") + self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json") + logging.info(f"Dashboard Data Fetcher initialized for vault: {self.vault_address}") + + def load_managed_positions(self) -> dict: + """Loads the state of which strategy manages which position.""" + if os.path.exists(self.managed_positions_path): + try: + with open(self.managed_positions_path, 'r') as f: + data = json.load(f) + # Create a reverse map: {coin: strategy_name} + return {v['coin']: k for k, v in data.items()} + except (IOError, json.JSONDecodeError): + logging.warning("Could not read managed positions file.") + return {} + + def fetch_and_save_status(self): + """Fetches all account data and saves it to the JSON status file.""" + try: + perpetuals_state = self.info.user_state(self.vault_address) + spot_state = self.info.spot_user_state(self.vault_address) + meta, all_market_contexts = self.info.meta_and_asset_ctxs() + coin_to_strategy_map = self.load_managed_positions() + + status = { + "last_updated_utc": datetime.now().isoformat(), + "perpetuals_account": { "balances": {}, "open_positions": [] }, + "spot_account": { "positions": [] } + } + + # 1. Extract Perpetuals Account Data + margin_summary = perpetuals_state.get("marginSummary", {}) + status["perpetuals_account"]["balances"] = { + "account_value": margin_summary.get("accountValue"), + "total_margin_used": margin_summary.get("totalMarginUsed"), + "withdrawable": margin_summary.get("withdrawable") + } + + asset_positions = perpetuals_state.get("assetPositions", []) + for asset_pos in asset_positions: + pos = asset_pos.get('position', {}) + if float(pos.get('szi', 0)) != 0: + coin = pos.get('coin') + position_value = float(pos.get('positionValue', 0)) + margin_used = float(pos.get('marginUsed', 0)) + leverage = position_value / margin_used if margin_used > 0 else 0 + + position_info = { + "coin": coin, + "strategy": coin_to_strategy_map.get(coin, "Unmanaged"), + "size": pos.get('szi'), + "position_value": pos.get('positionValue'), + "entry_price": pos.get('entryPx'), + "mark_price": pos.get('markPx'), + "pnl": pos.get('unrealizedPnl'), + "liq_price": pos.get('liquidationPx'), + "margin": pos.get('marginUsed'), + "funding": pos.get('fundingRate'), + "leverage": f"{leverage:.1f}x" + } + status["perpetuals_account"]["open_positions"].append(position_info) + + # 2. Extract Spot Account Data + price_map = { asset.get("universe", {}).get("name"): asset.get("markPx") for asset in all_market_contexts if asset.get("universe", {}).get("name") } + spot_balances = spot_state.get("balances", []) + for bal in spot_balances: + total_balance = float(bal.get('total', 0)) + if total_balance > 0: + coin = bal.get('coin') + mark_price = float(price_map.get(coin, 0)) + status["spot_account"]["positions"].append({ + "coin": coin, "balance_size": total_balance, + "position_value": total_balance * mark_price, "pnl": "N/A" + }) + + # 3. Write to file + # Use atomic write to prevent partial reads from main_app + temp_file_path = self.status_file_path + ".tmp" + with open(temp_file_path, 'w', encoding='utf-8') as f: + json.dump(status, f, indent=4) + # Rename is atomic + os.replace(temp_file_path, self.status_file_path) + + logging.debug(f"Successfully updated dashboard status file.") + + except Exception as e: + logging.error(f"Failed to fetch or save account status: {e}") + + def run(self): + """Main loop to periodically fetch and save data.""" + while True: + self.fetch_and_save_status() + time.sleep(5) # Update dashboard data every 5 seconds + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run the Dashboard Data Fetcher.") + parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) + args = parser.parse_args() + + fetcher = DashboardDataFetcher(log_level=args.log_level) + try: + fetcher.run() + except KeyboardInterrupt: + logging.info("Dashboard Data Fetcher stopped.") diff --git a/main_app.py b/main_app.py index b425701..5e7cb5d 100644 --- a/main_app.py +++ b/main_app.py @@ -19,11 +19,11 @@ from strategies.base_strategy import BaseStrategy # --- Configuration --- WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"] -# --- FIX: Replaced old data_fetcher with the new live_candle_fetcher --- LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py" RESAMPLER_SCRIPT = "resampler.py" MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py" TRADE_EXECUTOR_SCRIPT = "trade_executor.py" +DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py" STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json") DB_PATH = os.path.join("_data", "market_data.db") MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json") @@ -65,6 +65,7 @@ def run_resampler_job(timeframes_to_generate: list): """Defines the job for the resampler, redirecting output to a log file.""" log_file = os.path.join(LOGS_DIR, "resampler.log") try: + # --- MODIFIED: No longer needs to check for empty list, coins are from WATCHED_COINS --- command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--timeframes"] + timeframes_to_generate + ["--log-level", "normal"] with open(log_file, 'a') as f: f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n") @@ -78,10 +79,15 @@ def run_resampler_job(timeframes_to_generate: list): def resampler_scheduler(timeframes_to_generate: list): """Schedules the resampler.py script.""" setup_logging('off', 'ResamplerScheduler') + + if not timeframes_to_generate: + logging.warning("Resampler scheduler started but no timeframes were provided to generate. The process will idle.") + return # Exit the function if there's nothing to do + run_resampler_job(timeframes_to_generate) # Schedule to run every minute at the :01 second mark schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=timeframes_to_generate) - logging.info("Resampler scheduled to run every minute at :01.") + logging.info(f"Resampler scheduled to run every minute at :01 for {timeframes_to_generate}.") while True: schedule.run_pending() time.sleep(1) # Check every second to not miss the scheduled time @@ -110,10 +116,32 @@ def market_cap_fetcher_scheduler(): time.sleep(60) -def run_strategy(strategy_name: str, config: dict): +def run_trade_executor(trade_signal_queue): + """ + Target function to run the trade_executor.py script in a resilient loop. + It passes the shared signal queue to the executor. + """ + log_file = os.path.join(LOGS_DIR, "trade_executor.log") + while True: + try: + with open(log_file, 'a') as f: + f.write(f"\n--- Starting Trade Executor at {datetime.now()} ---\n") + + from trade_executor import TradeExecutor + + executor = TradeExecutor(log_level="normal", trade_signal_queue=trade_signal_queue) + executor.run() # This will block and run forever + + except (subprocess.CalledProcessError, Exception) as e: + with open(log_file, 'a') as f: + f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") + f.write(f"Trade Executor failed: {e}. Restarting...\n") + time.sleep(10) + +def run_strategy(strategy_name: str, config: dict, trade_signal_queue: multiprocessing.Queue): """ This function BECOMES the strategy runner. It is executed as a separate - process by multiprocessing. + process and pushes signals to the shared queue. """ # These imports only happen in the new, lightweight process import importlib @@ -127,21 +155,17 @@ def run_strategy(strategy_name: str, config: dict): # --- Setup logging to file for this specific process --- log_file_path = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log") try: - # Redirect stdout and stderr of this process to its log file - sys.stdout = open(log_file_path, 'a') + sys.stdout = open(log_file_path, 'a', buffering=1) # 1 = line buffering sys.stderr = sys.stdout except Exception as e: print(f"Failed to open log file for {strategy_name}: {e}") - # Setup logging *within this process* setup_logging('normal', f"Strategy-{strategy_name}") - # --- Main resilient loop (was previously in main_app) --- while True: try: logging.info(f"--- Starting strategy '{strategy_name}' ---") - # 1. Load the strategy class if 'class' not in config: logging.error(f"Strategy config for '{strategy_name}' is missing the 'class' key. Exiting.") return @@ -149,44 +173,37 @@ def run_strategy(strategy_name: str, config: dict): module_path, class_name = config['class'].rsplit('.', 1) module = importlib.import_module(module_path) StrategyClass = getattr(module, class_name) - strategy = StrategyClass(strategy_name, config['parameters']) # Log level is now handled here + + strategy = StrategyClass(strategy_name, config['parameters'], trade_signal_queue) - # 2. Run the strategy's logic loop - logging.info(f"Starting main logic loop for {strategy.coin} on {strategy.timeframe}.") - while True: - df = strategy.load_data() - if df.empty: - logging.warning("No data loaded. Waiting 1 minute...") - time.sleep(60) - continue - - strategy.calculate_signals_and_state(df.copy()) - strategy._save_status() - - logging.info(f"Current Signal: {strategy.current_signal}") - time.sleep(60) # Simple 1-minute wait + if config.get("is_event_driven", False): + logging.info(f"Starting EVENT-DRIVEN logic loop...") + strategy.run_event_loop() # This is a blocking call + else: + logging.info(f"Starting POLLING logic loop...") + strategy.run_polling_loop() # This is the original blocking call except KeyboardInterrupt: logging.info("Strategy process stopping.") - return # Exit the outer loop on Ctrl+C + return except Exception as e: logging.error(f"Strategy '{strategy_name}' failed: {e}", exc_info=True) logging.info("Restarting strategy in 10 seconds...") time.sleep(10) -def run_trade_executor(): - """Target function to run the trade_executor.py script in a resilient loop.""" - log_file = os.path.join(LOGS_DIR, "trade_executor.log") +def run_dashboard_data_fetcher(): + """Target function to run the dashboard_data_fetcher.py script.""" + log_file = os.path.join(LOGS_DIR, "dashboard_data_fetcher.log") while True: try: with open(log_file, 'a') as f: - f.write(f"\n--- Starting Trade Executor at {datetime.now()} ---\n") - subprocess.run([sys.executable, TRADE_EXECUTOR_SCRIPT, "--log-level", "normal"], check=True, stdout=f, stderr=subprocess.STDOUT) + f.write(f"\n--- Starting Dashboard Data Fetcher at {datetime.now()} ---\n") + subprocess.run([sys.executable, DASHBOARD_DATA_FETCHER_SCRIPT, "--log-level", "normal"], check=True, stdout=f, stderr=subprocess.STDOUT) except (subprocess.CalledProcessError, Exception) as e: with open(log_file, 'a') as f: f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") - f.write(f"Trade Executor failed: {e}. Restarting...\n") + f.write(f"Dashboard Data Fetcher failed: {e}. Restarting...\n") time.sleep(10) @@ -207,13 +224,15 @@ class MainApp: try: self.prices = dict(self.shared_prices) except Exception as e: - logging.debug(f"Could not read from shared prices dict: {e}") + logging.debug("Could not read from shared prices dict: {e}") def read_market_caps(self): + """Reads the latest market cap summary from its JSON file.""" if os.path.exists(MARKET_CAP_SUMMARY_FILE): try: with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f: summary_data = json.load(f) + for coin in self.watched_coins: table_key = f"{coin}_market_cap" if table_key in summary_data: @@ -222,6 +241,7 @@ class MainApp: logging.debug("Could not read market cap summary file.") def read_strategy_statuses(self): + """Reads the status JSON file for each enabled strategy.""" enabled_statuses = {} for name, config in self.strategy_configs.items(): if config.get("enabled", False): @@ -237,6 +257,7 @@ class MainApp: self.strategy_statuses = enabled_statuses def read_executor_status(self): + """Reads the live status file from the trade executor.""" if os.path.exists(TRADE_EXECUTOR_STATUS_FILE): try: with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f: @@ -247,11 +268,13 @@ class MainApp: self.open_positions = {} def check_process_status(self): + """Checks if the background processes are still running.""" for name, process in self.background_processes.items(): self.process_status[name] = "Running" if process.is_alive() else "STOPPED" def display_dashboard(self): - print("\x1b[H\x1b[J", end="") + """Displays a formatted dashboard with side-by-side tables.""" + print("\x1b[H\x1b[J", end="") # Clear screen left_table_lines = ["--- Market Dashboard ---"] left_table_width = 44 @@ -278,9 +301,11 @@ class MainApp: left_table_lines.append("-" * left_table_width) right_table_lines = ["--- Strategy Status ---"] - right_table_width = 154 + # --- FIX: Adjusted table width after removing parameters --- + right_table_width = 105 right_table_lines.append("-" * right_table_width) - right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} | {'Parameters':<45} |") + # --- FIX: Removed 'Parameters' from header --- + right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} |") right_table_lines.append("-" * right_table_width) for i, (name, status) in enumerate(self.strategy_statuses.items(), 1): signal = status.get('current_signal', 'N/A') @@ -294,13 +319,40 @@ class MainApp: last_change_display = dt_local.strftime('%Y-%m-%d %H:%M') config_params = self.strategy_configs.get(name, {}).get('parameters', {}) - coin = config_params.get('coin', 'N/A') - timeframe = config_params.get('timeframe', 'N/A') - size = config_params.get('size', 'N/A') - other_params = {k: v for k, v in config.get('parameters', {}).items() if k not in ['coin', 'timeframe', 'size']} - params_str = ", ".join([f"{k}={v}" for k, v in other_params.items()]) - right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} | {params_str:<45} |") + # --- NEW ROBUST LOGIC --- + # 1. Get Timeframe (always from config) + timeframe = config_params.get('timeframe', 'N/A') + + # 2. Get Coin: Try status file first (live), then config file (static) + coin = status.get('coin', config_params.get('coin', 'N/A')) + + # 3. Get Size: Try status file first, then config file + size_from_status = status.get('size', None) + size_from_config = config_params.get('size', None) + + size = "N/A" + if size_from_status is not None: + size = size_from_status # Use live status from copy_trader + elif size_from_config is not None: + size = size_from_config # Use config from simple strategy + elif 'coins_to_copy' in config_params: + # Special case: copy_trader, but status file is old (no 'size' field) + if coin != 'N/A' and coin != 'Multi': + # Try to find size in config if we know the coin from status + # --- SYNTAX FIX: Removed extra ".get(" --- + size = config_params.get('coins_to_copy', {}).get(coin, {}).get('size', 'Multi') + else: + coin = 'Multi' # It's a copy trader, but we don't know the coin + size = 'Multi' + + size_display = f"{size:>8}" if isinstance(size, (int, float)) else f"{str(size):>8}" + # --- END OF NEW LOGIC --- + + # --- FIX: Removed parameter string logic --- + + # --- FIX: Removed 'params_str' from the formatted line --- + right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size_display} |") right_table_lines.append("-" * right_table_width) output_lines = [] @@ -346,9 +398,7 @@ class MainApp: output_lines.append(f"{'Spot':<10} | {coin:<6} | {balance_size:>15} | {'-':>12} | {'-':>12} | {pnl:>15} | {'-':>10} |") output_lines.append("-" * pos_table_width) - output_lines.append("\n--- Background Processes ---") - for name, status in self.process_status.items(): - output_lines.append(f"{name:<25}: {status}") + # --- REMOVED: Background Processes Section --- final_output = "\n".join(output_lines) print(final_output) @@ -361,7 +411,7 @@ class MainApp: self.read_market_caps() self.read_strategy_statuses() self.read_executor_status() - self.check_process_status() + # --- REMOVED: self.check_process_status() --- self.display_dashboard() time.sleep(0.5) @@ -381,32 +431,34 @@ if __name__ == "__main__": logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}") sys.exit(1) - required_timeframes = set() - for name, config in strategy_configs.items(): - if config.get("enabled", False): - tf = config.get("parameters", {}).get("timeframe") - if tf: - required_timeframes.add(tf) - - if not required_timeframes: - logging.warning("No timeframes required by any enabled strategy.") + # --- MODIFIED: Removed dynamic timeframe logic --- + # --- NEW: Hardcoded timeframes for the resampler --- + resampler_timeframes = [ + "3m", "5m", "15m", "30m", "1h", "2h", "4h", "8h", + "12h", "1d", "3d", "1w", "1M", "148m", "37m" + ] + logging.info(f"Using hardcoded timeframes for resampler: {resampler_timeframes}") + # --- END NEW --- with multiprocessing.Manager() as manager: shared_prices = manager.dict() + trade_signal_queue = manager.Queue() processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True) processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True) - processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True) + # --- MODIFIED: Pass the new hardcoded list to the resampler process --- + processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(resampler_timeframes,), daemon=True) processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) - processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, daemon=True) + + processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, args=(trade_signal_queue,), daemon=True) + processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True) for name, config in strategy_configs.items(): if config.get("enabled", False): - # --- FIX: Check for the 'class' key, not the 'script' key --- if 'class' not in config: logging.error(f"Strategy '{name}' is missing 'class' key. Skipping.") continue - proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) + proc = multiprocessing.Process(target=run_strategy, args=(name, config, trade_signal_queue), daemon=True) processes[f"Strategy: {name}"] = proc for name, proc in processes.items(): @@ -424,6 +476,6 @@ if __name__ == "__main__": if proc.is_alive(): proc.terminate() for proc in processes.values(): if proc.is_alive(): proc.join() - logging.info("Shutdown complete.") - sys.exit(0) + logging.info("Shutdown complete.") + sys.exit(0) diff --git a/strategies/base_strategy.py b/strategies/base_strategy.py index ebeb063..afe3ff6 100644 --- a/strategies/base_strategy.py +++ b/strategies/base_strategy.py @@ -5,8 +5,12 @@ import os import logging from datetime import datetime, timezone import sqlite3 +import multiprocessing +import time from logging_utils import setup_logging +from hyperliquid.info import Info +from hyperliquid.utils import constants class BaseStrategy(ABC): """ @@ -14,20 +18,23 @@ class BaseStrategy(ABC): It provides common functionality like loading data, saving status, and state management. """ - def __init__(self, strategy_name: str, params: dict): - # Note: log_level is not needed here as logging is set up by the process + def __init__(self, strategy_name: str, params: dict, trade_signal_queue: multiprocessing.Queue = None, shared_status: dict = None): self.strategy_name = strategy_name self.params = params + self.trade_signal_queue = trade_signal_queue + # Optional multiprocessing.Manager().dict() to hold live status (avoids file IO) + self.shared_status = shared_status + self.coin = params.get("coin", "N/A") self.timeframe = params.get("timeframe", "N/A") self.db_path = os.path.join("_data", "market_data.db") self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json") - + self.current_signal = "INIT" self.last_signal_change_utc = None self.signal_price = None - logging.info(f"Initializing with parameters: {self.params}") + # Note: Logging is set up by the run_strategy function def load_data(self) -> pd.DataFrame: """Loads historical data for the configured coin and timeframe.""" @@ -53,27 +60,41 @@ class BaseStrategy(ABC): """The core logic of the strategy. Must be implemented by child classes.""" pass - def calculate_signals_and_state(self, df: pd.DataFrame): + def calculate_signals_and_state(self, df: pd.DataFrame) -> bool: """ - A wrapper that calls the strategy's signal calculation and then - determines the last signal change from the historical data. + A wrapper that calls the strategy's signal calculation, determines + the last signal change, and returns True if the signal has changed. """ df_with_signals = self.calculate_signals(df) df_with_signals.dropna(inplace=True) - if df_with_signals.empty: return + if df_with_signals.empty: + return False df_with_signals['position_change'] = df_with_signals['signal'].diff() - last_signal = df_with_signals['signal'].iloc[-1] - if last_signal == 1: self.current_signal = "BUY" - elif last_signal == -1: self.current_signal = "SELL" - else: self.current_signal = "HOLD" + last_signal_int = df_with_signals['signal'].iloc[-1] + new_signal_str = "HOLD" + if last_signal_int == 1: new_signal_str = "BUY" + elif last_signal_int == -1: new_signal_str = "SELL" - last_change_series = df_with_signals[df_with_signals['position_change'] != 0] - if not last_change_series.empty: - last_change_row = last_change_series.iloc[-1] - self.last_signal_change_utc = last_change_row.name.tz_localize('UTC').isoformat() - self.signal_price = last_change_row['close'] + signal_changed = False + if self.current_signal == "INIT": + if new_signal_str == "BUY": self.current_signal = "INIT_BUY" + elif new_signal_str == "SELL": self.current_signal = "INIT_SELL" + else: self.current_signal = "HOLD" + signal_changed = True + elif new_signal_str != self.current_signal: + self.current_signal = new_signal_str + signal_changed = True + + if signal_changed: + last_change_series = df_with_signals[df_with_signals['position_change'] != 0] + if not last_change_series.empty: + last_change_row = last_change_series.iloc[-1] + self.last_signal_change_utc = last_change_row.name.tz_localize('UTC').isoformat() + self.signal_price = last_change_row['close'] + + return signal_changed def _save_status(self): """Saves the current strategy state to its JSON file.""" @@ -84,9 +105,62 @@ class BaseStrategy(ABC): "signal_price": self.signal_price, "last_checked_utc": datetime.now(timezone.utc).isoformat() } + # If a shared status dict is provided (Manager.dict()), update it instead of writing files try: - with open(self.status_file_path, 'w', encoding='utf-8') as f: - json.dump(status, f, indent=4) + if self.shared_status is not None: + try: + # store the status under the strategy name for easy lookup + self.shared_status[self.strategy_name] = status + except Exception: + # Manager proxies may not accept nested mutable objects consistently; assign a copy + self.shared_status[self.strategy_name] = dict(status) + else: + with open(self.status_file_path, 'w', encoding='utf-8') as f: + json.dump(status, f, indent=4) except IOError as e: logging.error(f"Failed to write status file for {self.strategy_name}: {e}") + def run_polling_loop(self): + """ + The default execution loop for polling-based strategies (e.g., SMAs). + """ + while True: + df = self.load_data() + if df.empty: + logging.warning("No data loaded. Waiting 1 minute...") + time.sleep(60) + continue + + signal_changed = self.calculate_signals_and_state(df.copy()) + self._save_status() + + if signal_changed or self.current_signal == "INIT_BUY" or self.current_signal == "INIT_SELL": + logging.warning(f"New signal detected: {self.current_signal}") + self.trade_signal_queue.put({ + "strategy_name": self.strategy_name, + "signal": self.current_signal, + "coin": self.coin, + "signal_price": self.signal_price, + "config": {"agent": self.params.get("agent"), "parameters": self.params} + }) + if self.current_signal == "INIT_BUY": self.current_signal = "BUY" + if self.current_signal == "INIT_SELL": self.current_signal = "SELL" + + logging.info(f"Current Signal: {self.current_signal}") + time.sleep(60) + + def run_event_loop(self): + """ + A placeholder for event-driven (WebSocket) strategies. + Child classes must override this. + """ + logging.error("run_event_loop() is not implemented for this strategy.") + time.sleep(3600) # Sleep for an hour to prevent rapid error loops + + def on_fill_message(self, message): + """ + Placeholder for the WebSocket callback. + Child classes must override this. + """ + pass + diff --git a/strategies/copy_trader_strategy.py b/strategies/copy_trader_strategy.py new file mode 100644 index 0000000..9f086ea --- /dev/null +++ b/strategies/copy_trader_strategy.py @@ -0,0 +1,178 @@ +import logging +import time +import json +from datetime import datetime, timezone +from hyperliquid.info import Info +from hyperliquid.utils import constants + +from strategies.base_strategy import BaseStrategy + +class CopyTraderStrategy(BaseStrategy): + """ + An event-driven strategy that monitors a target wallet address and + copies its trades for a specific set of allowed coins, using + per-coin size and leverage settings. + """ + def __init__(self, strategy_name: str, params: dict, trade_signal_queue, shared_status: dict = None): + super().__init__(strategy_name, params, trade_signal_queue, shared_status) + + self.target_address = self.params.get("target_address", "").lower() + + self.coins_to_copy = self.params.get("coins_to_copy", {}) + self.allowed_coins = list(self.coins_to_copy.keys()) + + if not self.target_address: + logging.error("No 'target_address' specified in parameters for copy trader.") + raise ValueError("target_address is required") + if not self.allowed_coins: + logging.warning("No 'coins_to_copy' configured. This strategy will not copy any trades.") + + self.info = None # Will be initialized in the run loop + + # --- FIX: Set initial state to "WAIT" --- + self.current_signal = "WAIT" + + # Record the strategy's start time to ignore historical data + self.start_time_utc = datetime.now(timezone.utc) + logging.info(f"Strategy initialized. Ignoring all trades before {self.start_time_utc.isoformat()}") + + def calculate_signals(self, df): + # This strategy is event-driven, so it does not use polling-based signal calculation. + pass + + def on_fill_message(self, message): + """ + This is the callback function that gets triggered by the WebSocket + every time the monitored address has an event. + """ + try: + channel = message.get("channel") + if channel not in ("user", "userFills", "userEvents"): + return + + data = message.get("data") + if not data: + return + + fills = data.get("fills", []) + if not fills: + return + + user_address = data.get("user", "").lower() + + if user_address != self.target_address: + return + + logging.debug(f"Received {len(fills)} fill(s) for user {user_address}") + + for fill in fills: + # Check if the trade is new or historical + trade_time = datetime.fromtimestamp(fill['time'] / 1000, tz=timezone.utc) + if trade_time < self.start_time_utc: + logging.info(f"Ignoring stale/historical trade from {trade_time.isoformat()}") + continue + + coin = fill.get('coin') + + if coin in self.allowed_coins: + side = fill.get('side') + price = float(fill.get('px')) + + signal = "HOLD" + if side == "B": + signal = "BUY" + elif side == "A": + signal = "SELL" + + coin_config = self.coins_to_copy.get(coin) + if not coin_config or not coin_config.get("size"): + logging.warning(f"No trade size specified for {coin}. Ignoring fill.") + continue + + # --- 1. Create the trade-specific config --- + trade_params = self.params.copy() + trade_params.update(coin_config) + trade_config = { + "agent": self.params.get("agent"), + "parameters": trade_params + } + + # --- 2. (PRIORITY) Put the signal on the queue for the executor --- + self.trade_signal_queue.put({ + "strategy_name": self.strategy_name, + "signal": signal, + "coin": coin, + "signal_price": price, + "config": trade_config + }) + + # --- 3. (Secondary) Update internal state and log --- + self.current_signal = signal + self.signal_price = price + self.last_signal_change_utc = trade_time.isoformat() + self._save_status() # Update the dashboard status file + + logging.warning(f"Copy trade signal SENT for {coin}: {signal} @ {price}, Size: {coin_config['size']}") + logging.info(f"Source trade logged: {json.dumps(fill)}") + + else: + logging.info(f"Ignoring fill for unmonitored coin: {coin}") + + except Exception as e: + logging.error(f"Error in on_fill_message: {e}", exc_info=True) + + def _connect_and_subscribe(self): + """ + Establishes a new WebSocket connection and subscribes to the userFills channel. + """ + try: + logging.info("Connecting to Hyperliquid WebSocket...") + self.info = Info(constants.MAINNET_API_URL, skip_ws=False) + subscription = {"type": "userFills", "user": self.target_address} + self.info.subscribe(subscription, self.on_fill_message) + logging.info(f"Subscribed to 'userFills' for target address: {self.target_address}") + return True + except Exception as e: + logging.error(f"Failed to connect or subscribe: {e}") + self.info = None + return False + + def run_event_loop(self): + """ + This method overrides the default polling loop. It establishes a + persistent WebSocket connection and runs a watchdog to ensure + it stays connected. + """ + if not self._connect_and_subscribe(): + # If connection fails on start, wait 60s before letting the process restart + time.sleep(60) + return + + # --- ADDED: Save the initial "WAIT" status --- + self._save_status() + + while True: + try: + time.sleep(15) # Check the connection every 15 seconds + + if self.info is None or not self.info.ws_manager.is_alive(): + logging.error(f"WebSocket connection lost. Attempting to reconnect...") + + if self.info and self.info.ws_manager: + try: + self.info.ws_manager.stop() + except Exception as e: + logging.error(f"Error stopping old ws_manager: {e}") + + if not self._connect_and_subscribe(): + logging.error("Reconnect failed, will retry in 15s.") + else: + logging.info("Successfully reconnected to WebSocket.") + # After reconnecting, save the current status again + self._save_status() + else: + logging.debug("Watchdog check: WebSocket connection is active.") + + except Exception as e: + logging.error(f"An error occurred in the watchdog loop: {e}", exc_info=True) + diff --git a/strategies/ma_cross_strategy.py b/strategies/ma_cross_strategy.py index 3effd1e..ff756a2 100644 --- a/strategies/ma_cross_strategy.py +++ b/strategies/ma_cross_strategy.py @@ -7,8 +7,10 @@ class MaCrossStrategy(BaseStrategy): A strategy based on a fast Simple Moving Average (SMA) crossing a slow SMA. """ - def __init__(self, strategy_name: str, params: dict, log_level: str): - super().__init__(strategy_name, params) + # --- FIX: Changed 3rd argument from log_level to trade_signal_queue --- + def __init__(self, strategy_name: str, params: dict, trade_signal_queue): + # --- FIX: Passed trade_signal_queue to the parent class --- + super().__init__(strategy_name, params, trade_signal_queue) self.fast_ma_period = self.params.get('short_ma') or self.params.get('fast') or 0 self.slow_ma_period = self.params.get('long_ma') or self.params.get('slow') or 0 @@ -26,4 +28,3 @@ class MaCrossStrategy(BaseStrategy): df.loc[df['fast_sma'] < df['slow_sma'], 'signal'] = -1 return df - diff --git a/strategies/single_sma_strategy.py b/strategies/single_sma_strategy.py index 4dd2ddb..a05f95e 100644 --- a/strategies/single_sma_strategy.py +++ b/strategies/single_sma_strategy.py @@ -6,8 +6,10 @@ class SingleSmaStrategy(BaseStrategy): """ A strategy based on the price crossing a single Simple Moving Average (SMA). """ - def __init__(self, strategy_name: str, params: dict): - super().__init__(strategy_name, params) + # --- FIX: Added trade_signal_queue to the constructor --- + def __init__(self, strategy_name: str, params: dict, trade_signal_queue): + # --- FIX: Passed trade_signal_queue to the parent class --- + super().__init__(strategy_name, params, trade_signal_queue) self.sma_period = self.params.get('sma_period', 0) def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: @@ -23,4 +25,3 @@ class SingleSmaStrategy(BaseStrategy): df.loc[df['close'] < df['sma'], 'signal'] = -1 return df - diff --git a/trade_executor.py b/trade_executor.py index 35ac261..4a1f40c 100644 --- a/trade_executor.py +++ b/trade_executor.py @@ -5,6 +5,7 @@ import sys import json import time from datetime import datetime +import multiprocessing from eth_account import Account from hyperliquid.exchange import Exchange @@ -20,40 +21,41 @@ load_dotenv() class TradeExecutor: """ - Monitors strategy signals and executes trades using a multi-agent, - multi-strategy position management system. Each strategy's position is - tracked independently. + Monitors a shared queue for strategy signals and executes trades. + This script is now a dedicated, event-driven consumer. """ - def __init__(self, log_level: str): + def __init__(self, log_level: str, trade_signal_queue: multiprocessing.Queue, shared_executor_status: dict = None): setup_logging(log_level, 'TradeExecutor') - + + self.trade_signal_queue = trade_signal_queue + + # Optional Manager.dict() to store live managed positions and other executor status + self.shared_executor_status = shared_executor_status + self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS") if not self.vault_address: logging.error("MAIN_WALLET_ADDRESS not set.") - sys.exit(1) + # --- FIX: Raise an exception instead of sys.exit() --- + # This allows the main_app process manager to catch and log the error. + raise ValueError("MAIN_WALLET_ADDRESS not set in environment.") + # --- FIX: Corrected constant name from MAIN_NET_API_URL to MAINNET_API_URL --- self.info = Info(constants.MAINNET_API_URL, skip_ws=True) self.exchanges = self._load_agents() if not self.exchanges: logging.error("No trading agents found in .env file.") - sys.exit(1) + # --- FIX: Raise an exception instead of sys.exit() --- + raise ValueError("No trading agents found in .env file. Check AGENT_PRIVATE_KEY or _AGENT_PK vars.") - strategy_config_path = os.path.join("_data", "strategies.json") - try: - with open(strategy_config_path, 'r') as f: - self.strategy_configs = {name: config for name, config in json.load(f).items() if config.get("enabled")} - logging.info(f"Loaded {len(self.strategy_configs)} enabled strategies.") - except (FileNotFoundError, json.JSONDecodeError) as e: - logging.error(f"Could not load strategies from '{strategy_config_path}': {e}") - sys.exit(1) - - self.status_file_path = os.path.join("_logs", "trade_executor_status.json") self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json") self.managed_positions = self._load_managed_positions() + logging.info(f"TradeExecutor initialized. Agents available: {list(self.exchanges.keys())}") def _load_agents(self) -> dict: - """Discovers and initializes agents from environment variables.""" + """ + Discovers and initializes agents by scanning for environment variables. + """ exchanges = {} logging.info("Discovering agents from environment variables...") for env_var, private_key in os.environ.items(): @@ -74,10 +76,20 @@ class TradeExecutor: def _load_managed_positions(self) -> dict: """Loads the state of which strategy manages which position.""" + # Prefer shared in-memory state when available + try: + if self.shared_executor_status is not None: + mgr = self.shared_executor_status.get('managed_positions') if isinstance(self.shared_executor_status, dict) else None + if mgr: + logging.info("Loading managed positions from shared executor status.") + return dict(mgr) + except Exception: + logging.debug("Unable to read managed positions from shared status. Falling back to file.") + if os.path.exists(self.managed_positions_path): try: with open(self.managed_positions_path, 'r') as f: - logging.info("Loading existing managed positions state.") + logging.info("Loading existing managed positions state from file.") return json.load(f) except (IOError, json.JSONDecodeError): logging.warning("Could not read managed positions file. Starting fresh.") @@ -86,115 +98,154 @@ class TradeExecutor: def _save_managed_positions(self): """Saves the current state of managed positions.""" try: - with open(self.managed_positions_path, 'w') as f: - json.dump(self.managed_positions, f, indent=4) + if self.shared_executor_status is not None: + try: + # store under a known key + self.shared_executor_status['managed_positions'] = dict(self.managed_positions) + except Exception: + # fallback: try direct assignment + self.shared_executor_status['managed_positions'] = self.managed_positions + else: + with open(self.managed_positions_path, 'w') as f: + json.dump(self.managed_positions, f, indent=4) except IOError as e: logging.error(f"Failed to save managed positions state: {e}") - def _save_executor_status(self, perpetuals_state, spot_state, all_market_contexts): - """Saves the current balances and open positions to a live status file.""" - # This function is correct and does not need changes. - pass - def run(self): - """The main execution loop with advanced position management.""" - logging.info("Starting Trade Executor loop...") + """ + Main execution loop. Blocks and waits for a signal from the queue. + """ + logging.info("Trade Executor started. Waiting for signals...") while True: try: - perpetuals_state = self.info.user_state(self.vault_address) - open_positions_api = {pos['position'].get('coin'): pos['position'] for pos in perpetuals_state.get('assetPositions', []) if float(pos.get('position', {}).get('szi', 0)) != 0} - - for name, config in self.strategy_configs.items(): - coin = config['parameters'].get('coin') - size = config['parameters'].get('size') - # --- ADDED: Load leverage parameters from config --- - leverage_long = config['parameters'].get('leverage_long') - leverage_short = config['parameters'].get('leverage_short') - - status_file = os.path.join("_data", f"strategy_status_{name}.json") - if not os.path.exists(status_file): continue - with open(status_file, 'r') as f: status = json.load(f) - - desired_signal = status.get('current_signal') - current_position = self.managed_positions.get(name) - - agent_name = config.get("agent", "default").lower() - exchange_to_use = self.exchanges.get(agent_name) - if not exchange_to_use: - logging.error(f"[{name}] Agent '{agent_name}' not found. Skipping trade.") + trade_signal = self.trade_signal_queue.get() + if not trade_signal: + continue + + logging.info(f"Received signal: {trade_signal}") + + # Basic validation and debug information to help trace gaps + if 'config' not in trade_signal: + logging.error(f"Signal missing 'config' key. Ignoring: {trade_signal}") + continue + if 'strategy_name' not in trade_signal: + logging.error(f"Signal missing 'strategy_name' key. Ignoring: {trade_signal}") + continue + # Special command handling + if isinstance(trade_signal, dict) and trade_signal.get('_cmd') == 'CLOSE_ALL': + target_agent = trade_signal.get('agent') + logging.warning(f"Received CLOSE_ALL command for agent: {target_agent}") + if not target_agent: + logging.error("CLOSE_ALL command missing 'agent' field. Ignoring.") continue - # --- State Machine Logic with Configurable Leverage --- - if desired_signal == "BUY": - if not current_position: - if not all([size, leverage_long]): - logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.") - continue - - logging.warning(f"[{name}] ACTION: Open LONG for {coin} with {leverage_long}x leverage.") - exchange_to_use.update_leverage(int(leverage_long), coin) - exchange_to_use.market_open(coin, True, size, None, 0.01) - self.managed_positions[name] = {"coin": coin, "side": "long", "size": size} - log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal) + # Iterate managed positions and close those opened by the target agent + to_close = [s for s, v in self.managed_positions.items() if v.get('agent') == target_agent] + if not to_close: + logging.info(f"No managed positions found for agent '{target_agent}'.") + continue - elif current_position['side'] == 'short': - if not all([size, leverage_long]): - logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.") - continue + for sname in to_close: + pos = self.managed_positions.get(sname) + if not pos: + continue + coin = pos.get('coin') + side = pos.get('side') + size = pos.get('size') + # Determine is_buy to neutralize the position + is_buy = True if side == 'short' else False + logging.warning(f"[CLOSE_ALL] Closing {side} position for strategy {sname}, coin {coin}, size {size}") + try: + # Use the agent's exchange if available + exch = self.exchanges.get(target_agent) + if exch: + exch.market_open(coin, is_buy, size, None, 0.01) + else: + logging.error(f"Exchange object for agent '{target_agent}' not found. Skipping live close for {sname}.") + except Exception as e: + logging.error(f"Error closing position for {sname}: {e}") + # remove from managed positions regardless to avoid stuck state + try: + del self.managed_positions[sname] + except KeyError: + pass - logging.warning(f"[{name}] ACTION: Close SHORT and open LONG for {coin} with {leverage_long}x leverage.") - exchange_to_use.update_leverage(int(leverage_long), coin) - exchange_to_use.market_open(coin, True, current_position['size'] + size, None, 0.01) - self.managed_positions[name] = {"coin": coin, "side": "long", "size": size} - log_trade(strategy=name, coin=coin, action="CLOSE_SHORT_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal) - - elif desired_signal == "SELL": - if not current_position: - if not all([size, leverage_short]): - logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.") - continue - - logging.warning(f"[{name}] ACTION: Open SHORT for {coin} with {leverage_short}x leverage.") - exchange_to_use.update_leverage(int(leverage_short), coin) - exchange_to_use.market_open(coin, False, size, None, 0.01) - self.managed_positions[name] = {"coin": coin, "side": "short", "size": size} - log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal) - - elif current_position['side'] == 'long': - if not all([size, leverage_short]): - logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.") - continue - - logging.warning(f"[{name}] ACTION: Close LONG and open SHORT for {coin} with {leverage_short}x leverage.") - exchange_to_use.update_leverage(int(leverage_short), coin) - exchange_to_use.market_open(coin, False, current_position['size'] + size, None, 0.01) - self.managed_positions[name] = {"coin": coin, "side": "short", "size": size} - log_trade(strategy=name, coin=coin, action="CLOSE_LONG_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal) - - elif desired_signal == "FLAT": - if current_position: - logging.warning(f"[{name}] ACTION: Close {current_position['side']} position for {coin}.") - is_buy = current_position['side'] == 'short' - exchange_to_use.market_open(coin, is_buy, current_position['size'], None, 0.01) - del self.managed_positions[name] - log_trade(strategy=name, coin=coin, action=f"CLOSE_{current_position['side'].upper()}", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal) - self._save_managed_positions() + logging.info(f"CLOSE_ALL for agent '{target_agent}' completed.") + continue + + name = trade_signal['strategy_name'] + config = trade_signal['config'] + params = config.get('parameters', {}) + coin = trade_signal['coin'] + desired_signal = trade_signal['signal'] + status = trade_signal + + size = params.get('size') + if size is None: + logging.error(f"[{name}] No 'size' in parameters: {params}. Skipping.") + continue + leverage_long = int(params.get('leverage_long', 2)) + leverage_short = int(params.get('leverage_short', 2)) + current_position = self.managed_positions.get(name) + + agent_name = (config.get("agent") or "default").lower() + exchange_to_use = self.exchanges.get(agent_name) + if not exchange_to_use: + logging.error(f"[{name}] Agent '{agent_name}' not found. Available agents: {list(self.exchanges.keys())}. Skipping trade.") + continue + + # --- State Machine Logic (now runs instantly on signal) --- + if desired_signal == "BUY" or desired_signal == "INIT_BUY": + if not current_position: + logging.warning(f"[{name}] ACTION: Setting leverage to {leverage_long}x and opening LONG for {coin}.") + exchange_to_use.update_leverage(leverage_long, coin) + exchange_to_use.market_open(coin, True, size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "long", "size": size} + log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal) + elif current_position['side'] == 'short': + logging.warning(f"[{name}] ACTION: Closing SHORT and opening LONG for {coin} with {leverage_long}x leverage.") + exchange_to_use.update_leverage(leverage_long, coin) + # 1. Close the short by buying back (this is a market_open, but is_buy=True) + exchange_to_use.market_open(coin, True, current_position['size'], None, 0.01) + log_trade(strategy=name, coin=coin, action="CLOSE_SHORT", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal) + # 2. Open the new long + exchange_to_use.market_open(coin, True, size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "long", "size": size} + log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal) + + elif desired_signal == "SELL" or desired_signal == "INIT_SELL": + if not current_position: + logging.warning(f"[{name}] ACTION: Setting leverage to {leverage_short}x and opening SHORT for {coin}.") + exchange_to_use.update_leverage(leverage_short, coin) + exchange_to_use.market_open(coin, False, size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "short", "size": size} + log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal) + elif current_position['side'] == 'long': + logging.warning(f"[{name}] ACTION: Closing LONG and opening SHORT for {coin} with {leverage_short}x leverage.") + exchange_to_use.update_leverage(leverage_short, coin) + # 1. Close the long by selling + exchange_to_use.market_open(coin, False, current_position['size'], None, 0.01) + log_trade(strategy=name, coin=coin, action="CLOSE_LONG", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal) + # 2. Open the new short + exchange_to_use.market_open(coin, False, size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "short", "size": size} + # --- FIX: Corrected typo from 'signal.desired_signal' to 'signal=desired_signal' --- + log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal) + + elif desired_signal == "FLAT": + if current_position: + logging.warning(f"[{name}] ACTION: Close {current_position['side']} position for {coin}.") + is_buy = current_position['side'] == 'short' + exchange_to_use.market_open(coin, is_buy, current_position['size'], None, 0.01) + del self.managed_positions[name] + log_trade(strategy=name, coin=coin, action=f"CLOSE_{current_position['side'].upper()}", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal) + + self._save_managed_positions() except Exception as e: - logging.error(f"An error occurred in the main executor loop: {e}") + logging.error(f"An error occurred in the main executor loop: {e}", exc_info=True) + time.sleep(1) - time.sleep(15) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Run the Trade Executor.") - parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) - args = parser.parse_args() - - executor = TradeExecutor(log_level=args.log_level) - try: - executor.run() - except KeyboardInterrupt: - logging.info("Trade Executor stopped.") +# This script is no longer run directly, but is called by main_app.py diff --git a/whale_tracker.py b/whale_tracker.py new file mode 100644 index 0000000..69fc29a --- /dev/null +++ b/whale_tracker.py @@ -0,0 +1,367 @@ +import json +import os +import time +import requests +import logging +import argparse +import sys +from datetime import datetime, timedelta + +# --- Configuration --- +# !! IMPORTANT: Update this to your actual Hyperliquid API endpoint !! +API_ENDPOINT = "https://api.hyperliquid.xyz/info" + +INPUT_FILE = os.path.join("_data", "wallets_to_track.json") +OUTPUT_FILE = os.path.join("_data", "wallets_info.json") +LOGS_DIR = "_logs" +LOG_FILE = os.path.join(LOGS_DIR, "whale_tracker.log") + +# Polling intervals (in seconds) +POLL_INTERVALS = { + 'core_data': 10, # 5-15s range + 'open_orders': 20, # 15-30s range + 'account_metrics': 180, # 1-5m range + 'ledger_updates': 600, # 5-15m range + 'save_data': 5, # How often to write to wallets_info.json + 'reload_wallets': 60 # Check for wallet list changes every 60s +} + +class HyperliquidAPI: + """ + Client to handle POST requests to the Hyperliquid info endpoint. + """ + def __init__(self, base_url): + self.base_url = base_url + self.session = requests.Session() + logging.info(f"API Client initialized for endpoint: {base_url}") + + def post_request(self, payload): + """ + Internal helper to send POST requests and handle errors. + """ + try: + response = self.session.post(self.base_url, json=payload, timeout=10) + response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) + return response.json() + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP Error: {e.response.status_code} for {e.request.url}. Response: {e.response.text}") + except requests.exceptions.ConnectionError as e: + logging.error(f"Connection Error: {e}") + except requests.exceptions.Timeout: + logging.error(f"Request timed out for payload: {payload.get('type')}") + except json.JSONDecodeError: + logging.error(f"Failed to decode JSON response. Response text: {response.text if 'response' in locals() else 'No response text'}") + except Exception as e: + logging.error(f"An unexpected error occurred in post_request: {e}", exc_info=True) + return None + + def get_user_state(self, user_address: str): + payload = {"type": "clearinghouseState", "user": user_address} + return self.post_request(payload) + + def get_open_orders(self, user_address: str): + payload = {"type": "openOrders", "user": user_address} + return self.post_request(payload) + + def get_user_rate_limit(self, user_address: str): + payload = {"type": "userRateLimit", "user": user_address} + return self.post_request(payload) + + def get_user_ledger_updates(self, user_address: str, start_time_ms: int, end_time_ms: int): + payload = { + "type": "userNonFundingLedgerUpdates", + "user": user_address, + "startTime": start_time_ms, + "endTime": end_time_ms + } + return self.post_request(payload) + +class WalletTracker: + """ + Main class to track wallets, process data, and store results. + """ + def __init__(self, api_client, wallets_to_track): + self.api = api_client + self.wallets = wallets_to_track # This is the list of dicts + self.wallets_by_name = {w['name']: w for w in self.wallets} + self.wallets_data = { + wallet['name']: {"address": wallet['address']} for wallet in self.wallets + } + logging.info(f"WalletTracker initialized for {len(self.wallets)} wallets.") + + def reload_wallets(self): + """ + Checks the INPUT_FILE for changes and updates the tracked wallet list. + """ + logging.debug("Reloading wallet list...") + try: + with open(INPUT_FILE, 'r') as f: + new_wallets_list = json.load(f) + if not isinstance(new_wallets_list, list): + logging.warning(f"Failed to reload '{INPUT_FILE}': content is not a list.") + return + + new_wallets_by_name = {w['name']: w for w in new_wallets_list} + old_names = set(self.wallets_by_name.keys()) + new_names = set(new_wallets_by_name.keys()) + + added_names = new_names - old_names + removed_names = old_names - new_names + + if not added_names and not removed_names: + logging.debug("Wallet list is unchanged.") + return # No changes + + # Update internal wallet list + self.wallets = new_wallets_list + self.wallets_by_name = new_wallets_by_name + + # Add new wallets to wallets_data + for name in added_names: + self.wallets_data[name] = {"address": self.wallets_by_name[name]['address']} + logging.info(f"Added new wallet to track: {name}") + + # Remove old wallets from wallets_data + for name in removed_names: + if name in self.wallets_data: + del self.wallets_data[name] + logging.info(f"Removed wallet from tracking: {name}") + + logging.info(f"Wallet list reloaded. Tracking {len(self.wallets)} wallets.") + + except (FileNotFoundError, json.JSONDecodeError, ValueError) as e: + logging.error(f"Failed to reload and parse '{INPUT_FILE}': {e}") + except Exception as e: + logging.error(f"Unexpected error during wallet reload: {e}", exc_info=True) + + + def calculate_core_metrics(self, state_data: dict) -> dict: + """ + Performs calculations based on user_state data. + """ + if not state_data or 'crossMarginSummary' not in state_data: + logging.warning("Core state data is missing 'crossMarginSummary'.") + return {"raw_state": state_data} + + summary = state_data['crossMarginSummary'] + account_value = float(summary.get('accountValue', 0)) + margin_used = float(summary.get('totalMarginUsed', 0)) + + # Calculations + margin_utilization = (margin_used / account_value) if account_value > 0 else 0 + available_margin = account_value - margin_used + + total_position_value = 0 + if 'assetPositions' in state_data: + for pos in state_data.get('assetPositions', []): + try: + # Use 'value' for position value + pos_value_str = pos.get('position', {}).get('value', '0') + total_position_value += float(pos_value_str) + except (ValueError, TypeError): + logging.warning(f"Could not parse position value: {pos.get('position', {}).get('value')}") + continue + + portfolio_leverage = (total_position_value / account_value) if account_value > 0 else 0 + + # Return calculated metrics alongside raw data + return { + "raw_state": state_data, + "account_value": account_value, + "margin_used": margin_used, + "margin_utilization": margin_utilization, + "available_margin": available_margin, + "total_position_value": total_position_value, + "portfolio_leverage": portfolio_leverage + } + + def poll_core_data(self): + logging.debug("Polling Core Data...") + # Use self.wallets which is updated by reload_wallets + for wallet in self.wallets: + name = wallet['name'] + address = wallet['address'] + state_data = self.api.get_user_state(address) + if state_data: + calculated_data = self.calculate_core_metrics(state_data) + # Ensure wallet hasn't been removed by a concurrent reload + if name in self.wallets_data: + self.wallets_data[name]['core_state'] = calculated_data + time.sleep(0.1) # Avoid bursting requests + + def poll_open_orders(self): + logging.debug("Polling Open Orders...") + for wallet in self.wallets: + name = wallet['name'] + address = wallet['address'] + orders_data = self.api.get_open_orders(address) + if orders_data: + # TODO: Add calculations for 'pending_margin_required' if logic is available + if name in self.wallets_data: + self.wallets_data[name]['open_orders'] = {"raw_orders": orders_data} + time.sleep(0.1) + + def poll_account_metrics(self): + logging.debug("Polling Account Metrics...") + for wallet in self.wallets: + name = wallet['name'] + address = wallet['address'] + metrics_data = self.api.get_user_rate_limit(address) + if metrics_data: + if name in self.wallets_data: + self.wallets_data[name]['account_metrics'] = metrics_data + time.sleep(0.1) + + def poll_ledger_updates(self): + logging.debug("Polling Ledger Updates...") + end_time_ms = int(datetime.now().timestamp() * 1000) + start_time_ms = int((datetime.now() - timedelta(minutes=15)).timestamp() * 1000) + + for wallet in self.wallets: + name = wallet['name'] + address = wallet['address'] + ledger_data = self.api.get_user_ledger_updates(address, start_time_ms, end_time_ms) + if ledger_data: + if name in self.wallets_data: + self.wallets_data[name]['ledger_updates'] = ledger_data + time.sleep(0.1) + + def save_data_to_json(self): + """ + Atomically writes the current wallet data to the output JSON file. + (No longer needs cleaning logic) + """ + logging.debug(f"Saving data to {OUTPUT_FILE}...") + + temp_file = OUTPUT_FILE + ".tmp" + try: + # Save the data + with open(temp_file, 'w', encoding='utf-8') as f: + # self.wallets_data is automatically kept clean by reload_wallets + json.dump(self.wallets_data, f, indent=2) + # Atomic rename (move) + os.replace(temp_file, OUTPUT_FILE) + except (IOError, json.JSONDecodeError) as e: + logging.error(f"Failed to write wallet data to file: {e}") + except Exception as e: + logging.error(f"An unexpected error occurred during file save: {e}") + if os.path.exists(temp_file): + os.remove(temp_file) + +class WhaleTrackerRunner: + """ + Manages the polling loop using last-run timestamps instead of a complex scheduler. + """ + def __init__(self, api_client, wallets, shared_whale_data_dict=None): # Kept arg for compatibility + self.tracker = WalletTracker(api_client, wallets) + self.last_poll_times = {key: 0 for key in POLL_INTERVALS} + self.poll_intervals = POLL_INTERVALS + logging.info("WhaleTrackerRunner initialized to save to JSON file.") + + def update_shared_data(self): + """ + This function is no longer called by the run loop. + It's kept here to prevent errors if imported elsewhere, but is now unused. + """ + logging.debug("No shared dict, saving data to JSON file.") + self.tracker.save_data_to_json() + + + def run(self): + logging.info("Starting main polling loop...") + while True: + try: + now = time.time() + + if now - self.last_poll_times['reload_wallets'] > self.poll_intervals['reload_wallets']: + self.tracker.reload_wallets() + self.last_poll_times['reload_wallets'] = now + + if now - self.last_poll_times['core_data'] > self.poll_intervals['core_data']: + self.tracker.poll_core_data() + self.last_poll_times['core_data'] = now + + if now - self.last_poll_times['open_orders'] > self.poll_intervals['open_orders']: + self.tracker.poll_open_orders() + self.last_poll_times['open_orders'] = now + + if now - self.last_poll_times['account_metrics'] > self.poll_intervals['account_metrics']: + self.tracker.poll_account_metrics() + self.last_poll_times['account_metrics'] = now + + if now - self.last_poll_times['ledger_updates'] > self.poll_intervals['ledger_updates']: + self.tracker.poll_ledger_updates() + self.last_poll_times['ledger_updates'] = now + + if now - self.last_poll_times['save_data'] > self.poll_intervals['save_data']: + self.tracker.save_data_to_json() # <-- NEW + self.last_poll_times['save_data'] = now + + # Sleep for a short duration to prevent busy-waiting + time.sleep(1) + + except Exception as e: + logging.critical(f"Unhandled exception in main loop: {e}", exc_info=True) + time.sleep(10) + +def setup_logging(log_level_str: str, process_name: str): + """Configures logging for the script.""" + if not os.path.exists(LOGS_DIR): + try: + os.makedirs(LOGS_DIR) + except OSError as e: + print(f"Failed to create logs directory {LOGS_DIR}: {e}") + return + + level_map = { + 'debug': logging.DEBUG, + 'normal': logging.INFO, + 'off': logging.NOTSET + } + log_level = level_map.get(log_level_str.lower(), logging.INFO) + + if log_level == logging.NOTSET: + return + + handlers_list = [logging.FileHandler(LOG_FILE, mode='a')] + + if sys.stdout.isatty(): + handlers_list.append(logging.StreamHandler(sys.stdout)) + + logging.basicConfig( + level=log_level, + format=f"%(asctime)s.%(msecs)03d | {process_name:<20} | %(levelname)-8s | %(message)s", + datefmt='%Y-%m-%d %H:%M:%S', + handlers=handlers_list + ) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Hyperliquid Whale Tracker") + parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) + args = parser.parse_args() + + setup_logging(args.log_level, "WhaleTracker") + + # Load wallets to track + wallets_to_track = [] + try: + with open(INPUT_FILE, 'r') as f: + wallets_to_track = json.load(f) + if not isinstance(wallets_to_track, list) or not wallets_to_track: + raise ValueError(f"'{INPUT_FILE}' is empty or not a list.") + except (FileNotFoundError, json.JSONDecodeError, ValueError) as e: + logging.critical(f"Failed to load '{INPUT_FILE}': {e}. Exiting.") + sys.exit(1) + + # Initialize API client + api_client = HyperliquidAPI(base_url=API_ENDPOINT) + + # Initialize and run the tracker + runner = WhaleTrackerRunner(api_client, wallets_to_track, shared_whale_data_dict=None) + + try: + runner.run() + except KeyboardInterrupt: + logging.info("Whale Tracker shutting down.") + sys.exit(0) +