diff --git a/live_market_utils.py b/live_market_utils.py index 31aa3eb..ae0eab3 100644 --- a/live_market_utils.py +++ b/live_market_utils.py @@ -74,13 +74,13 @@ def start_live_feed(shared_prices_dict, log_level='off'): # --- Watchdog Logic --- time.sleep(15) # Check the connection every 15 seconds - if info is None or not info.ws_manager.is_running(): - # --- FIX: Log this critical failure to the persistent error log --- + # --- FIX: Changed 'is_running()' to the correct method 'is_alive()' --- + if info is None or not info.ws_manager.is_alive(): error_msg = "WebSocket connection lost or not running. Attempting to reconnect..." logging.warning(error_msg) log_error(error_msg, include_traceback=False) # Log it to the file - if info: + if info and info.ws_manager: # Check if ws_manager exists before stopping try: info.ws_manager.stop() # Clean up old manager except Exception as e: diff --git a/main_app.py b/main_app.py index 32048c4..b425701 100644 --- a/main_app.py +++ b/main_app.py @@ -9,10 +9,13 @@ import schedule import sqlite3 import pandas as pd from datetime import datetime, timezone +import importlib from logging_utils import setup_logging # --- Using the new high-performance WebSocket utility for live prices --- from live_market_utils import start_live_feed +# --- Import the base class for type hinting (optional but good practice) --- +from strategies.base_strategy import BaseStrategy # --- Configuration --- WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"] @@ -108,21 +111,70 @@ def market_cap_fetcher_scheduler(): def run_strategy(strategy_name: str, config: dict): - """Target function to run a strategy, redirecting its output to a log file.""" - log_file = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log") - script_name = config['script'] - command = [sys.executable, script_name, "--name", strategy_name, "--log-level", "normal"] + """ + This function BECOMES the strategy runner. It is executed as a separate + process by multiprocessing. + """ + # These imports only happen in the new, lightweight process + import importlib + import os + import sys + import time + import logging + from logging_utils import setup_logging + from strategies.base_strategy import BaseStrategy + + # --- Setup logging to file for this specific process --- + log_file_path = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log") + try: + # Redirect stdout and stderr of this process to its log file + sys.stdout = open(log_file_path, 'a') + sys.stderr = sys.stdout + except Exception as e: + print(f"Failed to open log file for {strategy_name}: {e}") + + # Setup logging *within this process* + setup_logging('normal', f"Strategy-{strategy_name}") + + # --- Main resilient loop (was previously in main_app) --- while True: try: - with open(log_file, 'a') as f: - f.write(f"\n--- Starting strategy '{strategy_name}' at {datetime.now()} ---\n") - subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) - except (subprocess.CalledProcessError, Exception) as e: - with open(log_file, 'a') as f: - f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") - f.write(f"Strategy '{strategy_name}' failed: {e}. Restarting...\n") + logging.info(f"--- Starting strategy '{strategy_name}' ---") + + # 1. Load the strategy class + if 'class' not in config: + logging.error(f"Strategy config for '{strategy_name}' is missing the 'class' key. Exiting.") + return + + module_path, class_name = config['class'].rsplit('.', 1) + module = importlib.import_module(module_path) + StrategyClass = getattr(module, class_name) + strategy = StrategyClass(strategy_name, config['parameters']) # Log level is now handled here + + # 2. Run the strategy's logic loop + logging.info(f"Starting main logic loop for {strategy.coin} on {strategy.timeframe}.") + while True: + df = strategy.load_data() + if df.empty: + logging.warning("No data loaded. Waiting 1 minute...") + time.sleep(60) + continue + + strategy.calculate_signals_and_state(df.copy()) + strategy._save_status() + + logging.info(f"Current Signal: {strategy.current_signal}") + time.sleep(60) # Simple 1-minute wait + + except KeyboardInterrupt: + logging.info("Strategy process stopping.") + return # Exit the outer loop on Ctrl+C + except Exception as e: + logging.error(f"Strategy '{strategy_name}' failed: {e}", exc_info=True) + logging.info("Restarting strategy in 10 seconds...") time.sleep(10) + def run_trade_executor(): """Target function to run the trade_executor.py script in a resilient loop.""" log_file = os.path.join(LOGS_DIR, "trade_executor.log") @@ -207,10 +259,22 @@ class MainApp: left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |") left_table_lines.append("-" * left_table_width) for i, coin in enumerate(self.watched_coins, 1): - price = self.prices.get(coin, "Loading...") + price_str = self.prices.get(coin, "Loading...") + # Format the price string + try: + price_float = float(price_str) + if price_float < 1: + price_str = f"{price_float:>10.6f}" + elif price_float < 100: + price_str = f"{price_float:>10.4f}" + else: + price_str = f"{price_float:>10.2f}" + except (ValueError, TypeError): + price_str = f"{'Loading...':>10}" + market_cap = self.market_caps.get(coin) formatted_mc = format_market_cap(market_cap) - left_table_lines.append(f"{i:<2} | {coin:^6} | {price:>10} | {formatted_mc:>15} |") + left_table_lines.append(f"{i:<2} | {coin:^6} | {price_str} | {formatted_mc:>15} |") left_table_lines.append("-" * left_table_width) right_table_lines = ["--- Strategy Status ---"] @@ -234,7 +298,7 @@ class MainApp: timeframe = config_params.get('timeframe', 'N/A') size = config_params.get('size', 'N/A') - other_params = {k: v for k, v in config_params.items() if k not in ['coin', 'timeframe', 'size']} + other_params = {k: v for k, v in config.get('parameters', {}).items() if k not in ['coin', 'timeframe', 'size']} params_str = ", ".join([f"{k}={v}" for k, v in other_params.items()]) right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} | {params_str:<45} |") right_table_lines.append("-" * right_table_width) @@ -338,8 +402,9 @@ if __name__ == "__main__": for name, config in strategy_configs.items(): if config.get("enabled", False): - if not os.path.exists(config['script']): - logging.error(f"Strategy script '{config['script']}' for '{name}' not found. Skipping.") + # --- FIX: Check for the 'class' key, not the 'script' key --- + if 'class' not in config: + logging.error(f"Strategy '{name}' is missing 'class' key. Skipping.") continue proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) processes[f"Strategy: {name}"] = proc diff --git a/position_monitor.py b/position_monitor.py new file mode 100644 index 0000000..e20ec5d --- /dev/null +++ b/position_monitor.py @@ -0,0 +1,159 @@ +import os +import sys +import time +import json +import argparse +from datetime import datetime, timezone +from hyperliquid.info import Info +from hyperliquid.utils import constants +from dotenv import load_dotenv +import logging + +from logging_utils import setup_logging + +# Load .env file +load_dotenv() + +class PositionMonitor: + """ + A standalone, read-only dashboard for monitoring all open perpetuals + positions, spot balances, and their associated strategies. + """ + + def __init__(self, log_level: str): + setup_logging(log_level, 'PositionMonitor') + + self.wallet_address = os.environ.get("MAIN_WALLET_ADDRESS") + if not self.wallet_address: + logging.error("MAIN_WALLET_ADDRESS not set in .env file. Cannot proceed.") + sys.exit(1) + + self.info = Info(constants.MAINNET_API_URL, skip_ws=True) + self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json") + self._lines_printed = 0 + + logging.info(f"Monitoring vault address: {self.wallet_address}") + + def load_managed_positions(self) -> dict: + """Loads the state of which strategy manages which position.""" + if os.path.exists(self.managed_positions_path): + try: + with open(self.managed_positions_path, 'r') as f: + # Create a reverse map: {coin: strategy_name} + data = json.load(f) + return {v['coin']: k for k, v in data.items()} + except (IOError, json.JSONDecodeError): + logging.warning("Could not read managed positions file.") + return {} + + def run(self): + """Main loop to continuously refresh the dashboard.""" + try: + while True: + self.display_dashboard() + time.sleep(5) # Refresh every 5 seconds + except KeyboardInterrupt: + logging.info("Position monitor stopped.") + + def display_dashboard(self): + """Fetches all data and draws the dashboard without blinking.""" + if self._lines_printed > 0: + print(f"\x1b[{self._lines_printed}A", end="") + + output_lines = [] + try: + perp_state = self.info.user_state(self.wallet_address) + spot_state = self.info.spot_user_state(self.wallet_address) + coin_to_strategy_map = self.load_managed_positions() + + output_lines.append(f"--- Live Position Monitor for {self.wallet_address[:6]}...{self.wallet_address[-4:]} ---") + + # --- 1. Perpetuals Account Summary --- + margin_summary = perp_state.get('marginSummary', {}) + account_value = float(margin_summary.get('accountValue', 0)) + margin_used = float(margin_summary.get('totalMarginUsed', 0)) + utilization = (margin_used / account_value) * 100 if account_value > 0 else 0 + + output_lines.append("\n--- Perpetuals Account Summary ---") + output_lines.append(f" Account Value: ${account_value:,.2f} | Margin Used: ${margin_used:,.2f} | Utilization: {utilization:.2f}%") + + # --- 2. Spot Balances Summary --- + output_lines.append("\n--- Spot Balances ---") + spot_balances = spot_state.get('balances', []) + if not spot_balances: + output_lines.append(" No spot balances found.") + else: + balances_str = ", ".join([f"{b.get('coin')}: {float(b.get('total', 0)):,.4f}" for b in spot_balances if float(b.get('total', 0)) > 0]) + output_lines.append(f" {balances_str}") + + # --- 3. Open Positions Table --- + output_lines.append("\n--- Open Perpetual Positions ---") + positions = perp_state.get('assetPositions', []) + open_positions = [p for p in positions if p.get('position') and float(p['position'].get('szi', 0)) != 0] + + if not open_positions: + output_lines.append(" No open perpetual positions found.") + output_lines.append("") # Add a line for stable refresh + else: + self.build_positions_table(open_positions, coin_to_strategy_map, output_lines) + + except Exception as e: + output_lines = [f"An error occurred: {e}"] + + final_output = "\n".join(output_lines) + "\n\x1b[J" # \x1b[J clears to end of screen + print(final_output, end="") + + self._lines_printed = len(output_lines) + sys.stdout.flush() + + def build_positions_table(self, positions: list, coin_to_strategy_map: dict, output_lines: list): + """Builds the text for the positions summary table.""" + header = f"| {'Strategy':<25} | {'Coin':<6} | {'Side':<5} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |" + output_lines.append(header) + output_lines.append("-" * len(header)) + + for position in positions: + pos = position.get('position', {}) + coin = pos.get('coin', 'Unknown') + size = float(pos.get('szi', 0)) + entry_px = float(pos.get('entryPx', 0)) + mark_px = float(pos.get('markPx', 0)) + unrealized_pnl = float(pos.get('unrealizedPnl', 0)) + + # Get leverage + position_value = float(pos.get('positionValue', 0)) + margin_used = float(pos.get('marginUsed', 0)) + leverage = (position_value / margin_used) if margin_used > 0 else 0 + + side_text = "LONG" if size > 0 else "SHORT" + pnl_sign = "+" if unrealized_pnl >= 0 else "" + + # Find the strategy that owns this coin + strategy_name = coin_to_strategy_map.get(coin, "Unmanaged") + + # Format all values as strings + strategy_str = f"{strategy_name:<25}" + coin_str = f"{coin:<6}" + side_str = f"{side_text:<5}" + size_str = f"{size:>15.4f}" + entry_str = f"${entry_px:>11,.2f}" + mark_str = f"${mark_px:>11,.2f}" + pnl_str = f"{pnl_sign}${unrealized_pnl:>14,.2f}" + lev_str = f"{leverage:>9.1f}x" + + output_lines.append(f"| {strategy_str} | {coin_str} | {side_str} | {size_str} | {entry_str} | {mark_str} | {pnl_str} | {lev_str} |") + + output_lines.append("-" * len(header)) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Monitor a Hyperliquid wallet's positions in real-time.") + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + monitor = PositionMonitor(log_level=args.log_level) + monitor.run() diff --git a/strategies/base_strategy.py b/strategies/base_strategy.py index e40452c..ebeb063 100644 --- a/strategies/base_strategy.py +++ b/strategies/base_strategy.py @@ -6,13 +6,16 @@ import logging from datetime import datetime, timezone import sqlite3 +from logging_utils import setup_logging + class BaseStrategy(ABC): """ An abstract base class that defines the blueprint for all trading strategies. - It provides common functionality like loading data and saving status. + It provides common functionality like loading data, saving status, and state management. """ - def __init__(self, strategy_name: str, params: dict, log_level: str): + def __init__(self, strategy_name: str, params: dict): + # Note: log_level is not needed here as logging is set up by the process self.strategy_name = strategy_name self.params = params self.coin = params.get("coin", "N/A") @@ -20,21 +23,17 @@ class BaseStrategy(ABC): self.db_path = os.path.join("_data", "market_data.db") self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json") - # --- ADDED: State variables required for status reporting --- self.current_signal = "INIT" self.last_signal_change_utc = None self.signal_price = None - # This will be set up by the child class after it's initialized - # setup_logging(log_level, f"Strategy-{self.strategy_name}") - # logging.info(f"Initializing with parameters: {self.params}") + logging.info(f"Initializing with parameters: {self.params}") def load_data(self) -> pd.DataFrame: """Loads historical data for the configured coin and timeframe.""" table_name = f"{self.coin}_{self.timeframe}" - # Dynamically determine the number of candles needed based on all possible period parameters - periods = [v for k, v in self.params.items() if 'period' in k or '_ma' in k or 'slow' in k] + periods = [v for k, v in self.params.items() if 'period' in k or '_ma' in k or 'slow' in k or 'fast' in k] limit = max(periods) + 50 if periods else 500 try: @@ -51,10 +50,30 @@ class BaseStrategy(ABC): @abstractmethod def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: - """ - The core logic of the strategy. Must be implemented by child classes. - """ + """The core logic of the strategy. Must be implemented by child classes.""" pass + + def calculate_signals_and_state(self, df: pd.DataFrame): + """ + A wrapper that calls the strategy's signal calculation and then + determines the last signal change from the historical data. + """ + df_with_signals = self.calculate_signals(df) + df_with_signals.dropna(inplace=True) + if df_with_signals.empty: return + + df_with_signals['position_change'] = df_with_signals['signal'].diff() + + last_signal = df_with_signals['signal'].iloc[-1] + if last_signal == 1: self.current_signal = "BUY" + elif last_signal == -1: self.current_signal = "SELL" + else: self.current_signal = "HOLD" + + last_change_series = df_with_signals[df_with_signals['position_change'] != 0] + if not last_change_series.empty: + last_change_row = last_change_series.iloc[-1] + self.last_signal_change_utc = last_change_row.name.tz_localize('UTC').isoformat() + self.signal_price = last_change_row['close'] def _save_status(self): """Saves the current strategy state to its JSON file.""" diff --git a/strategies/ma_cross_strategy.py b/strategies/ma_cross_strategy.py index 0002375..3effd1e 100644 --- a/strategies/ma_cross_strategy.py +++ b/strategies/ma_cross_strategy.py @@ -7,29 +7,23 @@ class MaCrossStrategy(BaseStrategy): A strategy based on a fast Simple Moving Average (SMA) crossing a slow SMA. """ - def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: - # Support multiple naming conventions: some configs use 'fast'/'slow' - # while others use 'short_ma'/'long_ma'. Normalize here so both work. - fast_ma_period = self.params.get('short_ma') or self.params.get('fast') or 0 - slow_ma_period = self.params.get('long_ma') or self.params.get('slow') or 0 - - # If parameters are missing, return a neutral signal frame. - if not fast_ma_period or not slow_ma_period: - logging.warning(f"Missing MA period parameters (fast={fast_ma_period}, slow={slow_ma_period}).") - df['signal'] = 0 - return df + def __init__(self, strategy_name: str, params: dict, log_level: str): + super().__init__(strategy_name, params) + self.fast_ma_period = self.params.get('short_ma') or self.params.get('fast') or 0 + self.slow_ma_period = self.params.get('long_ma') or self.params.get('slow') or 0 - if len(df) < slow_ma_period: - logging.warning(f"Not enough data for MA periods {fast_ma_period}/{slow_ma_period}. Need {slow_ma_period}, have {len(df)}.") + def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: + if not self.fast_ma_period or not self.slow_ma_period or len(df) < self.slow_ma_period: + logging.warning(f"Not enough data for MA periods.") df['signal'] = 0 return df - df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean() - df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean() + df['fast_sma'] = df['close'].rolling(window=self.fast_ma_period).mean() + df['slow_sma'] = df['close'].rolling(window=self.slow_ma_period).mean() - # Signal is 1 for Golden Cross (fast > slow), -1 for Death Cross df['signal'] = 0 df.loc[df['fast_sma'] > df['slow_sma'], 'signal'] = 1 df.loc[df['fast_sma'] < df['slow_sma'], 'signal'] = -1 return df + diff --git a/strategies/single_sma_strategy.py b/strategies/single_sma_strategy.py index 52a1d33..4dd2ddb 100644 --- a/strategies/single_sma_strategy.py +++ b/strategies/single_sma_strategy.py @@ -6,19 +6,21 @@ class SingleSmaStrategy(BaseStrategy): """ A strategy based on the price crossing a single Simple Moving Average (SMA). """ + def __init__(self, strategy_name: str, params: dict): + super().__init__(strategy_name, params) + self.sma_period = self.params.get('sma_period', 0) + def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: - sma_period = self.params.get('sma_period', 0) - - if not sma_period or len(df) < sma_period: - logging.warning(f"Not enough data for SMA period {sma_period}. Need {sma_period}, have {len(df)}.") + if not self.sma_period or len(df) < self.sma_period: + logging.warning(f"Not enough data for SMA period {self.sma_period}.") df['signal'] = 0 return df - df['sma'] = df['close'].rolling(window=sma_period).mean() + df['sma'] = df['close'].rolling(window=self.sma_period).mean() - # Signal is 1 when price is above SMA, -1 when below df['signal'] = 0 df.loc[df['close'] > df['sma'], 'signal'] = 1 df.loc[df['close'] < df['sma'], 'signal'] = -1 return df +