fixes, old way to handle strategies

This commit is contained in:
2025-10-27 21:54:33 +01:00
parent 541a71d2a6
commit 93363750ae
9 changed files with 1063 additions and 203 deletions

View File

@ -15,13 +15,13 @@ from logging_utils import setup_logging
# --- Configuration ---
DEFAULT_ADDRESSES_TO_WATCH = [
#"0xd4c1f7e8d876c4749228d515473d36f919583d1d",
"0x0fd468a73084daa6ea77a9261e40fdec3e67e0c7",
"0x47930c76790c865217472f2ddb4d14c640ee450a",
# "0x4d69495d16fab95c3c27b76978affa50301079d0",
# "0x09bc1cf4d9f0b59e1425a8fde4d4b1f7d3c9410d",
"0xc6ac58a7a63339898aeda32499a8238a46d88e84",
"0xa8ef95dbd3db55911d3307930a84b27d6e969526",
# "0x4129c62faf652fea61375dcd9ca8ce24b2bb8b95",
"0xbf1935fe7ab6d0aa3ee8d3da47c2f80e215b2a1c",
"0x32885a6adac4375858E6edC092EfDDb0Ef46484C",
]
MAX_FILLS_TO_DISPLAY = 10
LOGS_DIR = "_logs"

136
dashboard_data_fetcher.py Normal file
View File

@ -0,0 +1,136 @@
import logging
import os
import sys
import json
import time
import argparse # <-- THE FIX: Added this import
from datetime import datetime
from eth_account import Account
from hyperliquid.info import Info
from hyperliquid.utils import constants
from dotenv import load_dotenv
from logging_utils import setup_logging
# Load .env file
load_dotenv()
class DashboardDataFetcher:
"""
A dedicated, lightweight process that runs in a loop to fetch and save
the account's state (balances, positions) for the main dashboard to display.
"""
def __init__(self, log_level: str):
setup_logging(log_level, 'DashboardDataFetcher')
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.vault_address:
logging.error("MAIN_WALLET_ADDRESS not set in .env file. Cannot proceed.")
sys.exit(1)
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.status_file_path = os.path.join("_logs", "trade_executor_status.json")
self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json")
logging.info(f"Dashboard Data Fetcher initialized for vault: {self.vault_address}")
def load_managed_positions(self) -> dict:
"""Loads the state of which strategy manages which position."""
if os.path.exists(self.managed_positions_path):
try:
with open(self.managed_positions_path, 'r') as f:
data = json.load(f)
# Create a reverse map: {coin: strategy_name}
return {v['coin']: k for k, v in data.items()}
except (IOError, json.JSONDecodeError):
logging.warning("Could not read managed positions file.")
return {}
def fetch_and_save_status(self):
"""Fetches all account data and saves it to the JSON status file."""
try:
perpetuals_state = self.info.user_state(self.vault_address)
spot_state = self.info.spot_user_state(self.vault_address)
meta, all_market_contexts = self.info.meta_and_asset_ctxs()
coin_to_strategy_map = self.load_managed_positions()
status = {
"last_updated_utc": datetime.now().isoformat(),
"perpetuals_account": { "balances": {}, "open_positions": [] },
"spot_account": { "positions": [] }
}
# 1. Extract Perpetuals Account Data
margin_summary = perpetuals_state.get("marginSummary", {})
status["perpetuals_account"]["balances"] = {
"account_value": margin_summary.get("accountValue"),
"total_margin_used": margin_summary.get("totalMarginUsed"),
"withdrawable": margin_summary.get("withdrawable")
}
asset_positions = perpetuals_state.get("assetPositions", [])
for asset_pos in asset_positions:
pos = asset_pos.get('position', {})
if float(pos.get('szi', 0)) != 0:
coin = pos.get('coin')
position_value = float(pos.get('positionValue', 0))
margin_used = float(pos.get('marginUsed', 0))
leverage = position_value / margin_used if margin_used > 0 else 0
position_info = {
"coin": coin,
"strategy": coin_to_strategy_map.get(coin, "Unmanaged"),
"size": pos.get('szi'),
"position_value": pos.get('positionValue'),
"entry_price": pos.get('entryPx'),
"mark_price": pos.get('markPx'),
"pnl": pos.get('unrealizedPnl'),
"liq_price": pos.get('liquidationPx'),
"margin": pos.get('marginUsed'),
"funding": pos.get('fundingRate'),
"leverage": f"{leverage:.1f}x"
}
status["perpetuals_account"]["open_positions"].append(position_info)
# 2. Extract Spot Account Data
price_map = { asset.get("universe", {}).get("name"): asset.get("markPx") for asset in all_market_contexts if asset.get("universe", {}).get("name") }
spot_balances = spot_state.get("balances", [])
for bal in spot_balances:
total_balance = float(bal.get('total', 0))
if total_balance > 0:
coin = bal.get('coin')
mark_price = float(price_map.get(coin, 0))
status["spot_account"]["positions"].append({
"coin": coin, "balance_size": total_balance,
"position_value": total_balance * mark_price, "pnl": "N/A"
})
# 3. Write to file
# Use atomic write to prevent partial reads from main_app
temp_file_path = self.status_file_path + ".tmp"
with open(temp_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
# Rename is atomic
os.replace(temp_file_path, self.status_file_path)
logging.debug(f"Successfully updated dashboard status file.")
except Exception as e:
logging.error(f"Failed to fetch or save account status: {e}")
def run(self):
"""Main loop to periodically fetch and save data."""
while True:
self.fetch_and_save_status()
time.sleep(5) # Update dashboard data every 5 seconds
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run the Dashboard Data Fetcher.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
fetcher = DashboardDataFetcher(log_level=args.log_level)
try:
fetcher.run()
except KeyboardInterrupt:
logging.info("Dashboard Data Fetcher stopped.")

View File

@ -19,11 +19,11 @@ from strategies.base_strategy import BaseStrategy
# --- Configuration ---
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
# --- FIX: Replaced old data_fetcher with the new live_candle_fetcher ---
LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py"
MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
TRADE_EXECUTOR_SCRIPT = "trade_executor.py"
DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
DB_PATH = os.path.join("_data", "market_data.db")
MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
@ -65,6 +65,7 @@ def run_resampler_job(timeframes_to_generate: list):
"""Defines the job for the resampler, redirecting output to a log file."""
log_file = os.path.join(LOGS_DIR, "resampler.log")
try:
# --- MODIFIED: No longer needs to check for empty list, coins are from WATCHED_COINS ---
command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--timeframes"] + timeframes_to_generate + ["--log-level", "normal"]
with open(log_file, 'a') as f:
f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n")
@ -78,10 +79,15 @@ def run_resampler_job(timeframes_to_generate: list):
def resampler_scheduler(timeframes_to_generate: list):
"""Schedules the resampler.py script."""
setup_logging('off', 'ResamplerScheduler')
if not timeframes_to_generate:
logging.warning("Resampler scheduler started but no timeframes were provided to generate. The process will idle.")
return # Exit the function if there's nothing to do
run_resampler_job(timeframes_to_generate)
# Schedule to run every minute at the :01 second mark
schedule.every().minute.at(":01").do(run_resampler_job, timeframes_to_generate=timeframes_to_generate)
logging.info("Resampler scheduled to run every minute at :01.")
logging.info(f"Resampler scheduled to run every minute at :01 for {timeframes_to_generate}.")
while True:
schedule.run_pending()
time.sleep(1) # Check every second to not miss the scheduled time
@ -110,10 +116,32 @@ def market_cap_fetcher_scheduler():
time.sleep(60)
def run_strategy(strategy_name: str, config: dict):
def run_trade_executor(trade_signal_queue):
"""
Target function to run the trade_executor.py script in a resilient loop.
It passes the shared signal queue to the executor.
"""
log_file = os.path.join(LOGS_DIR, "trade_executor.log")
while True:
try:
with open(log_file, 'a') as f:
f.write(f"\n--- Starting Trade Executor at {datetime.now()} ---\n")
from trade_executor import TradeExecutor
executor = TradeExecutor(log_level="normal", trade_signal_queue=trade_signal_queue)
executor.run() # This will block and run forever
except (subprocess.CalledProcessError, Exception) as e:
with open(log_file, 'a') as f:
f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
f.write(f"Trade Executor failed: {e}. Restarting...\n")
time.sleep(10)
def run_strategy(strategy_name: str, config: dict, trade_signal_queue: multiprocessing.Queue):
"""
This function BECOMES the strategy runner. It is executed as a separate
process by multiprocessing.
process and pushes signals to the shared queue.
"""
# These imports only happen in the new, lightweight process
import importlib
@ -127,21 +155,17 @@ def run_strategy(strategy_name: str, config: dict):
# --- Setup logging to file for this specific process ---
log_file_path = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log")
try:
# Redirect stdout and stderr of this process to its log file
sys.stdout = open(log_file_path, 'a')
sys.stdout = open(log_file_path, 'a', buffering=1) # 1 = line buffering
sys.stderr = sys.stdout
except Exception as e:
print(f"Failed to open log file for {strategy_name}: {e}")
# Setup logging *within this process*
setup_logging('normal', f"Strategy-{strategy_name}")
# --- Main resilient loop (was previously in main_app) ---
while True:
try:
logging.info(f"--- Starting strategy '{strategy_name}' ---")
# 1. Load the strategy class
if 'class' not in config:
logging.error(f"Strategy config for '{strategy_name}' is missing the 'class' key. Exiting.")
return
@ -149,44 +173,37 @@ def run_strategy(strategy_name: str, config: dict):
module_path, class_name = config['class'].rsplit('.', 1)
module = importlib.import_module(module_path)
StrategyClass = getattr(module, class_name)
strategy = StrategyClass(strategy_name, config['parameters']) # Log level is now handled here
strategy = StrategyClass(strategy_name, config['parameters'], trade_signal_queue)
# 2. Run the strategy's logic loop
logging.info(f"Starting main logic loop for {strategy.coin} on {strategy.timeframe}.")
while True:
df = strategy.load_data()
if df.empty:
logging.warning("No data loaded. Waiting 1 minute...")
time.sleep(60)
continue
strategy.calculate_signals_and_state(df.copy())
strategy._save_status()
logging.info(f"Current Signal: {strategy.current_signal}")
time.sleep(60) # Simple 1-minute wait
if config.get("is_event_driven", False):
logging.info(f"Starting EVENT-DRIVEN logic loop...")
strategy.run_event_loop() # This is a blocking call
else:
logging.info(f"Starting POLLING logic loop...")
strategy.run_polling_loop() # This is the original blocking call
except KeyboardInterrupt:
logging.info("Strategy process stopping.")
return # Exit the outer loop on Ctrl+C
return
except Exception as e:
logging.error(f"Strategy '{strategy_name}' failed: {e}", exc_info=True)
logging.info("Restarting strategy in 10 seconds...")
time.sleep(10)
def run_trade_executor():
"""Target function to run the trade_executor.py script in a resilient loop."""
log_file = os.path.join(LOGS_DIR, "trade_executor.log")
def run_dashboard_data_fetcher():
"""Target function to run the dashboard_data_fetcher.py script."""
log_file = os.path.join(LOGS_DIR, "dashboard_data_fetcher.log")
while True:
try:
with open(log_file, 'a') as f:
f.write(f"\n--- Starting Trade Executor at {datetime.now()} ---\n")
subprocess.run([sys.executable, TRADE_EXECUTOR_SCRIPT, "--log-level", "normal"], check=True, stdout=f, stderr=subprocess.STDOUT)
f.write(f"\n--- Starting Dashboard Data Fetcher at {datetime.now()} ---\n")
subprocess.run([sys.executable, DASHBOARD_DATA_FETCHER_SCRIPT, "--log-level", "normal"], check=True, stdout=f, stderr=subprocess.STDOUT)
except (subprocess.CalledProcessError, Exception) as e:
with open(log_file, 'a') as f:
f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
f.write(f"Trade Executor failed: {e}. Restarting...\n")
f.write(f"Dashboard Data Fetcher failed: {e}. Restarting...\n")
time.sleep(10)
@ -207,13 +224,15 @@ class MainApp:
try:
self.prices = dict(self.shared_prices)
except Exception as e:
logging.debug(f"Could not read from shared prices dict: {e}")
logging.debug("Could not read from shared prices dict: {e}")
def read_market_caps(self):
"""Reads the latest market cap summary from its JSON file."""
if os.path.exists(MARKET_CAP_SUMMARY_FILE):
try:
with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
for coin in self.watched_coins:
table_key = f"{coin}_market_cap"
if table_key in summary_data:
@ -222,6 +241,7 @@ class MainApp:
logging.debug("Could not read market cap summary file.")
def read_strategy_statuses(self):
"""Reads the status JSON file for each enabled strategy."""
enabled_statuses = {}
for name, config in self.strategy_configs.items():
if config.get("enabled", False):
@ -237,6 +257,7 @@ class MainApp:
self.strategy_statuses = enabled_statuses
def read_executor_status(self):
"""Reads the live status file from the trade executor."""
if os.path.exists(TRADE_EXECUTOR_STATUS_FILE):
try:
with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f:
@ -247,11 +268,13 @@ class MainApp:
self.open_positions = {}
def check_process_status(self):
"""Checks if the background processes are still running."""
for name, process in self.background_processes.items():
self.process_status[name] = "Running" if process.is_alive() else "STOPPED"
def display_dashboard(self):
print("\x1b[H\x1b[J", end="")
"""Displays a formatted dashboard with side-by-side tables."""
print("\x1b[H\x1b[J", end="") # Clear screen
left_table_lines = ["--- Market Dashboard ---"]
left_table_width = 44
@ -278,9 +301,11 @@ class MainApp:
left_table_lines.append("-" * left_table_width)
right_table_lines = ["--- Strategy Status ---"]
right_table_width = 154
# --- FIX: Adjusted table width after removing parameters ---
right_table_width = 105
right_table_lines.append("-" * right_table_width)
right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} | {'Parameters':<45} |")
# --- FIX: Removed 'Parameters' from header ---
right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} |")
right_table_lines.append("-" * right_table_width)
for i, (name, status) in enumerate(self.strategy_statuses.items(), 1):
signal = status.get('current_signal', 'N/A')
@ -294,13 +319,40 @@ class MainApp:
last_change_display = dt_local.strftime('%Y-%m-%d %H:%M')
config_params = self.strategy_configs.get(name, {}).get('parameters', {})
coin = config_params.get('coin', 'N/A')
timeframe = config_params.get('timeframe', 'N/A')
size = config_params.get('size', 'N/A')
other_params = {k: v for k, v in config.get('parameters', {}).items() if k not in ['coin', 'timeframe', 'size']}
params_str = ", ".join([f"{k}={v}" for k, v in other_params.items()])
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} | {params_str:<45} |")
# --- NEW ROBUST LOGIC ---
# 1. Get Timeframe (always from config)
timeframe = config_params.get('timeframe', 'N/A')
# 2. Get Coin: Try status file first (live), then config file (static)
coin = status.get('coin', config_params.get('coin', 'N/A'))
# 3. Get Size: Try status file first, then config file
size_from_status = status.get('size', None)
size_from_config = config_params.get('size', None)
size = "N/A"
if size_from_status is not None:
size = size_from_status # Use live status from copy_trader
elif size_from_config is not None:
size = size_from_config # Use config from simple strategy
elif 'coins_to_copy' in config_params:
# Special case: copy_trader, but status file is old (no 'size' field)
if coin != 'N/A' and coin != 'Multi':
# Try to find size in config if we know the coin from status
# --- SYNTAX FIX: Removed extra ".get(" ---
size = config_params.get('coins_to_copy', {}).get(coin, {}).get('size', 'Multi')
else:
coin = 'Multi' # It's a copy trader, but we don't know the coin
size = 'Multi'
size_display = f"{size:>8}" if isinstance(size, (int, float)) else f"{str(size):>8}"
# --- END OF NEW LOGIC ---
# --- FIX: Removed parameter string logic ---
# --- FIX: Removed 'params_str' from the formatted line ---
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size_display} |")
right_table_lines.append("-" * right_table_width)
output_lines = []
@ -346,9 +398,7 @@ class MainApp:
output_lines.append(f"{'Spot':<10} | {coin:<6} | {balance_size:>15} | {'-':>12} | {'-':>12} | {pnl:>15} | {'-':>10} |")
output_lines.append("-" * pos_table_width)
output_lines.append("\n--- Background Processes ---")
for name, status in self.process_status.items():
output_lines.append(f"{name:<25}: {status}")
# --- REMOVED: Background Processes Section ---
final_output = "\n".join(output_lines)
print(final_output)
@ -361,7 +411,7 @@ class MainApp:
self.read_market_caps()
self.read_strategy_statuses()
self.read_executor_status()
self.check_process_status()
# --- REMOVED: self.check_process_status() ---
self.display_dashboard()
time.sleep(0.5)
@ -381,32 +431,34 @@ if __name__ == "__main__":
logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")
sys.exit(1)
required_timeframes = set()
for name, config in strategy_configs.items():
if config.get("enabled", False):
tf = config.get("parameters", {}).get("timeframe")
if tf:
required_timeframes.add(tf)
if not required_timeframes:
logging.warning("No timeframes required by any enabled strategy.")
# --- MODIFIED: Removed dynamic timeframe logic ---
# --- NEW: Hardcoded timeframes for the resampler ---
resampler_timeframes = [
"3m", "5m", "15m", "30m", "1h", "2h", "4h", "8h",
"12h", "1d", "3d", "1w", "1M", "148m", "37m"
]
logging.info(f"Using hardcoded timeframes for resampler: {resampler_timeframes}")
# --- END NEW ---
with multiprocessing.Manager() as manager:
shared_prices = manager.dict()
trade_signal_queue = manager.Queue()
processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True)
processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
# --- MODIFIED: Pass the new hardcoded list to the resampler process ---
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(resampler_timeframes,), daemon=True)
processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)
processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, daemon=True)
processes["Trade Executor"] = multiprocessing.Process(target=run_trade_executor, args=(trade_signal_queue,), daemon=True)
processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True)
for name, config in strategy_configs.items():
if config.get("enabled", False):
# --- FIX: Check for the 'class' key, not the 'script' key ---
if 'class' not in config:
logging.error(f"Strategy '{name}' is missing 'class' key. Skipping.")
continue
proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True)
proc = multiprocessing.Process(target=run_strategy, args=(name, config, trade_signal_queue), daemon=True)
processes[f"Strategy: {name}"] = proc
for name, proc in processes.items():
@ -424,6 +476,6 @@ if __name__ == "__main__":
if proc.is_alive(): proc.terminate()
for proc in processes.values():
if proc.is_alive(): proc.join()
logging.info("Shutdown complete.")
sys.exit(0)
logging.info("Shutdown complete.")
sys.exit(0)

View File

@ -5,8 +5,12 @@ import os
import logging
from datetime import datetime, timezone
import sqlite3
import multiprocessing
import time
from logging_utils import setup_logging
from hyperliquid.info import Info
from hyperliquid.utils import constants
class BaseStrategy(ABC):
"""
@ -14,20 +18,23 @@ class BaseStrategy(ABC):
It provides common functionality like loading data, saving status, and state management.
"""
def __init__(self, strategy_name: str, params: dict):
# Note: log_level is not needed here as logging is set up by the process
def __init__(self, strategy_name: str, params: dict, trade_signal_queue: multiprocessing.Queue = None, shared_status: dict = None):
self.strategy_name = strategy_name
self.params = params
self.trade_signal_queue = trade_signal_queue
# Optional multiprocessing.Manager().dict() to hold live status (avoids file IO)
self.shared_status = shared_status
self.coin = params.get("coin", "N/A")
self.timeframe = params.get("timeframe", "N/A")
self.db_path = os.path.join("_data", "market_data.db")
self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")
self.current_signal = "INIT"
self.last_signal_change_utc = None
self.signal_price = None
logging.info(f"Initializing with parameters: {self.params}")
# Note: Logging is set up by the run_strategy function
def load_data(self) -> pd.DataFrame:
"""Loads historical data for the configured coin and timeframe."""
@ -53,27 +60,41 @@ class BaseStrategy(ABC):
"""The core logic of the strategy. Must be implemented by child classes."""
pass
def calculate_signals_and_state(self, df: pd.DataFrame):
def calculate_signals_and_state(self, df: pd.DataFrame) -> bool:
"""
A wrapper that calls the strategy's signal calculation and then
determines the last signal change from the historical data.
A wrapper that calls the strategy's signal calculation, determines
the last signal change, and returns True if the signal has changed.
"""
df_with_signals = self.calculate_signals(df)
df_with_signals.dropna(inplace=True)
if df_with_signals.empty: return
if df_with_signals.empty:
return False
df_with_signals['position_change'] = df_with_signals['signal'].diff()
last_signal = df_with_signals['signal'].iloc[-1]
if last_signal == 1: self.current_signal = "BUY"
elif last_signal == -1: self.current_signal = "SELL"
else: self.current_signal = "HOLD"
last_signal_int = df_with_signals['signal'].iloc[-1]
new_signal_str = "HOLD"
if last_signal_int == 1: new_signal_str = "BUY"
elif last_signal_int == -1: new_signal_str = "SELL"
last_change_series = df_with_signals[df_with_signals['position_change'] != 0]
if not last_change_series.empty:
last_change_row = last_change_series.iloc[-1]
self.last_signal_change_utc = last_change_row.name.tz_localize('UTC').isoformat()
self.signal_price = last_change_row['close']
signal_changed = False
if self.current_signal == "INIT":
if new_signal_str == "BUY": self.current_signal = "INIT_BUY"
elif new_signal_str == "SELL": self.current_signal = "INIT_SELL"
else: self.current_signal = "HOLD"
signal_changed = True
elif new_signal_str != self.current_signal:
self.current_signal = new_signal_str
signal_changed = True
if signal_changed:
last_change_series = df_with_signals[df_with_signals['position_change'] != 0]
if not last_change_series.empty:
last_change_row = last_change_series.iloc[-1]
self.last_signal_change_utc = last_change_row.name.tz_localize('UTC').isoformat()
self.signal_price = last_change_row['close']
return signal_changed
def _save_status(self):
"""Saves the current strategy state to its JSON file."""
@ -84,9 +105,62 @@ class BaseStrategy(ABC):
"signal_price": self.signal_price,
"last_checked_utc": datetime.now(timezone.utc).isoformat()
}
# If a shared status dict is provided (Manager.dict()), update it instead of writing files
try:
with open(self.status_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
if self.shared_status is not None:
try:
# store the status under the strategy name for easy lookup
self.shared_status[self.strategy_name] = status
except Exception:
# Manager proxies may not accept nested mutable objects consistently; assign a copy
self.shared_status[self.strategy_name] = dict(status)
else:
with open(self.status_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
except IOError as e:
logging.error(f"Failed to write status file for {self.strategy_name}: {e}")
def run_polling_loop(self):
"""
The default execution loop for polling-based strategies (e.g., SMAs).
"""
while True:
df = self.load_data()
if df.empty:
logging.warning("No data loaded. Waiting 1 minute...")
time.sleep(60)
continue
signal_changed = self.calculate_signals_and_state(df.copy())
self._save_status()
if signal_changed or self.current_signal == "INIT_BUY" or self.current_signal == "INIT_SELL":
logging.warning(f"New signal detected: {self.current_signal}")
self.trade_signal_queue.put({
"strategy_name": self.strategy_name,
"signal": self.current_signal,
"coin": self.coin,
"signal_price": self.signal_price,
"config": {"agent": self.params.get("agent"), "parameters": self.params}
})
if self.current_signal == "INIT_BUY": self.current_signal = "BUY"
if self.current_signal == "INIT_SELL": self.current_signal = "SELL"
logging.info(f"Current Signal: {self.current_signal}")
time.sleep(60)
def run_event_loop(self):
"""
A placeholder for event-driven (WebSocket) strategies.
Child classes must override this.
"""
logging.error("run_event_loop() is not implemented for this strategy.")
time.sleep(3600) # Sleep for an hour to prevent rapid error loops
def on_fill_message(self, message):
"""
Placeholder for the WebSocket callback.
Child classes must override this.
"""
pass

View File

@ -0,0 +1,178 @@
import logging
import time
import json
from datetime import datetime, timezone
from hyperliquid.info import Info
from hyperliquid.utils import constants
from strategies.base_strategy import BaseStrategy
class CopyTraderStrategy(BaseStrategy):
"""
An event-driven strategy that monitors a target wallet address and
copies its trades for a specific set of allowed coins, using
per-coin size and leverage settings.
"""
def __init__(self, strategy_name: str, params: dict, trade_signal_queue, shared_status: dict = None):
super().__init__(strategy_name, params, trade_signal_queue, shared_status)
self.target_address = self.params.get("target_address", "").lower()
self.coins_to_copy = self.params.get("coins_to_copy", {})
self.allowed_coins = list(self.coins_to_copy.keys())
if not self.target_address:
logging.error("No 'target_address' specified in parameters for copy trader.")
raise ValueError("target_address is required")
if not self.allowed_coins:
logging.warning("No 'coins_to_copy' configured. This strategy will not copy any trades.")
self.info = None # Will be initialized in the run loop
# --- FIX: Set initial state to "WAIT" ---
self.current_signal = "WAIT"
# Record the strategy's start time to ignore historical data
self.start_time_utc = datetime.now(timezone.utc)
logging.info(f"Strategy initialized. Ignoring all trades before {self.start_time_utc.isoformat()}")
def calculate_signals(self, df):
# This strategy is event-driven, so it does not use polling-based signal calculation.
pass
def on_fill_message(self, message):
"""
This is the callback function that gets triggered by the WebSocket
every time the monitored address has an event.
"""
try:
channel = message.get("channel")
if channel not in ("user", "userFills", "userEvents"):
return
data = message.get("data")
if not data:
return
fills = data.get("fills", [])
if not fills:
return
user_address = data.get("user", "").lower()
if user_address != self.target_address:
return
logging.debug(f"Received {len(fills)} fill(s) for user {user_address}")
for fill in fills:
# Check if the trade is new or historical
trade_time = datetime.fromtimestamp(fill['time'] / 1000, tz=timezone.utc)
if trade_time < self.start_time_utc:
logging.info(f"Ignoring stale/historical trade from {trade_time.isoformat()}")
continue
coin = fill.get('coin')
if coin in self.allowed_coins:
side = fill.get('side')
price = float(fill.get('px'))
signal = "HOLD"
if side == "B":
signal = "BUY"
elif side == "A":
signal = "SELL"
coin_config = self.coins_to_copy.get(coin)
if not coin_config or not coin_config.get("size"):
logging.warning(f"No trade size specified for {coin}. Ignoring fill.")
continue
# --- 1. Create the trade-specific config ---
trade_params = self.params.copy()
trade_params.update(coin_config)
trade_config = {
"agent": self.params.get("agent"),
"parameters": trade_params
}
# --- 2. (PRIORITY) Put the signal on the queue for the executor ---
self.trade_signal_queue.put({
"strategy_name": self.strategy_name,
"signal": signal,
"coin": coin,
"signal_price": price,
"config": trade_config
})
# --- 3. (Secondary) Update internal state and log ---
self.current_signal = signal
self.signal_price = price
self.last_signal_change_utc = trade_time.isoformat()
self._save_status() # Update the dashboard status file
logging.warning(f"Copy trade signal SENT for {coin}: {signal} @ {price}, Size: {coin_config['size']}")
logging.info(f"Source trade logged: {json.dumps(fill)}")
else:
logging.info(f"Ignoring fill for unmonitored coin: {coin}")
except Exception as e:
logging.error(f"Error in on_fill_message: {e}", exc_info=True)
def _connect_and_subscribe(self):
"""
Establishes a new WebSocket connection and subscribes to the userFills channel.
"""
try:
logging.info("Connecting to Hyperliquid WebSocket...")
self.info = Info(constants.MAINNET_API_URL, skip_ws=False)
subscription = {"type": "userFills", "user": self.target_address}
self.info.subscribe(subscription, self.on_fill_message)
logging.info(f"Subscribed to 'userFills' for target address: {self.target_address}")
return True
except Exception as e:
logging.error(f"Failed to connect or subscribe: {e}")
self.info = None
return False
def run_event_loop(self):
"""
This method overrides the default polling loop. It establishes a
persistent WebSocket connection and runs a watchdog to ensure
it stays connected.
"""
if not self._connect_and_subscribe():
# If connection fails on start, wait 60s before letting the process restart
time.sleep(60)
return
# --- ADDED: Save the initial "WAIT" status ---
self._save_status()
while True:
try:
time.sleep(15) # Check the connection every 15 seconds
if self.info is None or not self.info.ws_manager.is_alive():
logging.error(f"WebSocket connection lost. Attempting to reconnect...")
if self.info and self.info.ws_manager:
try:
self.info.ws_manager.stop()
except Exception as e:
logging.error(f"Error stopping old ws_manager: {e}")
if not self._connect_and_subscribe():
logging.error("Reconnect failed, will retry in 15s.")
else:
logging.info("Successfully reconnected to WebSocket.")
# After reconnecting, save the current status again
self._save_status()
else:
logging.debug("Watchdog check: WebSocket connection is active.")
except Exception as e:
logging.error(f"An error occurred in the watchdog loop: {e}", exc_info=True)

View File

@ -7,8 +7,10 @@ class MaCrossStrategy(BaseStrategy):
A strategy based on a fast Simple Moving Average (SMA) crossing
a slow SMA.
"""
def __init__(self, strategy_name: str, params: dict, log_level: str):
super().__init__(strategy_name, params)
# --- FIX: Changed 3rd argument from log_level to trade_signal_queue ---
def __init__(self, strategy_name: str, params: dict, trade_signal_queue):
# --- FIX: Passed trade_signal_queue to the parent class ---
super().__init__(strategy_name, params, trade_signal_queue)
self.fast_ma_period = self.params.get('short_ma') or self.params.get('fast') or 0
self.slow_ma_period = self.params.get('long_ma') or self.params.get('slow') or 0
@ -26,4 +28,3 @@ class MaCrossStrategy(BaseStrategy):
df.loc[df['fast_sma'] < df['slow_sma'], 'signal'] = -1
return df

View File

@ -6,8 +6,10 @@ class SingleSmaStrategy(BaseStrategy):
"""
A strategy based on the price crossing a single Simple Moving Average (SMA).
"""
def __init__(self, strategy_name: str, params: dict):
super().__init__(strategy_name, params)
# --- FIX: Added trade_signal_queue to the constructor ---
def __init__(self, strategy_name: str, params: dict, trade_signal_queue):
# --- FIX: Passed trade_signal_queue to the parent class ---
super().__init__(strategy_name, params, trade_signal_queue)
self.sma_period = self.params.get('sma_period', 0)
def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
@ -23,4 +25,3 @@ class SingleSmaStrategy(BaseStrategy):
df.loc[df['close'] < df['sma'], 'signal'] = -1
return df

View File

@ -5,6 +5,7 @@ import sys
import json
import time
from datetime import datetime
import multiprocessing
from eth_account import Account
from hyperliquid.exchange import Exchange
@ -20,40 +21,41 @@ load_dotenv()
class TradeExecutor:
"""
Monitors strategy signals and executes trades using a multi-agent,
multi-strategy position management system. Each strategy's position is
tracked independently.
Monitors a shared queue for strategy signals and executes trades.
This script is now a dedicated, event-driven consumer.
"""
def __init__(self, log_level: str):
def __init__(self, log_level: str, trade_signal_queue: multiprocessing.Queue, shared_executor_status: dict = None):
setup_logging(log_level, 'TradeExecutor')
self.trade_signal_queue = trade_signal_queue
# Optional Manager.dict() to store live managed positions and other executor status
self.shared_executor_status = shared_executor_status
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.vault_address:
logging.error("MAIN_WALLET_ADDRESS not set.")
sys.exit(1)
# --- FIX: Raise an exception instead of sys.exit() ---
# This allows the main_app process manager to catch and log the error.
raise ValueError("MAIN_WALLET_ADDRESS not set in environment.")
# --- FIX: Corrected constant name from MAIN_NET_API_URL to MAINNET_API_URL ---
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.exchanges = self._load_agents()
if not self.exchanges:
logging.error("No trading agents found in .env file.")
sys.exit(1)
# --- FIX: Raise an exception instead of sys.exit() ---
raise ValueError("No trading agents found in .env file. Check AGENT_PRIVATE_KEY or _AGENT_PK vars.")
strategy_config_path = os.path.join("_data", "strategies.json")
try:
with open(strategy_config_path, 'r') as f:
self.strategy_configs = {name: config for name, config in json.load(f).items() if config.get("enabled")}
logging.info(f"Loaded {len(self.strategy_configs)} enabled strategies.")
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategies from '{strategy_config_path}': {e}")
sys.exit(1)
self.status_file_path = os.path.join("_logs", "trade_executor_status.json")
self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json")
self.managed_positions = self._load_managed_positions()
logging.info(f"TradeExecutor initialized. Agents available: {list(self.exchanges.keys())}")
def _load_agents(self) -> dict:
"""Discovers and initializes agents from environment variables."""
"""
Discovers and initializes agents by scanning for environment variables.
"""
exchanges = {}
logging.info("Discovering agents from environment variables...")
for env_var, private_key in os.environ.items():
@ -74,10 +76,20 @@ class TradeExecutor:
def _load_managed_positions(self) -> dict:
"""Loads the state of which strategy manages which position."""
# Prefer shared in-memory state when available
try:
if self.shared_executor_status is not None:
mgr = self.shared_executor_status.get('managed_positions') if isinstance(self.shared_executor_status, dict) else None
if mgr:
logging.info("Loading managed positions from shared executor status.")
return dict(mgr)
except Exception:
logging.debug("Unable to read managed positions from shared status. Falling back to file.")
if os.path.exists(self.managed_positions_path):
try:
with open(self.managed_positions_path, 'r') as f:
logging.info("Loading existing managed positions state.")
logging.info("Loading existing managed positions state from file.")
return json.load(f)
except (IOError, json.JSONDecodeError):
logging.warning("Could not read managed positions file. Starting fresh.")
@ -86,115 +98,154 @@ class TradeExecutor:
def _save_managed_positions(self):
"""Saves the current state of managed positions."""
try:
with open(self.managed_positions_path, 'w') as f:
json.dump(self.managed_positions, f, indent=4)
if self.shared_executor_status is not None:
try:
# store under a known key
self.shared_executor_status['managed_positions'] = dict(self.managed_positions)
except Exception:
# fallback: try direct assignment
self.shared_executor_status['managed_positions'] = self.managed_positions
else:
with open(self.managed_positions_path, 'w') as f:
json.dump(self.managed_positions, f, indent=4)
except IOError as e:
logging.error(f"Failed to save managed positions state: {e}")
def _save_executor_status(self, perpetuals_state, spot_state, all_market_contexts):
"""Saves the current balances and open positions to a live status file."""
# This function is correct and does not need changes.
pass
def run(self):
"""The main execution loop with advanced position management."""
logging.info("Starting Trade Executor loop...")
"""
Main execution loop. Blocks and waits for a signal from the queue.
"""
logging.info("Trade Executor started. Waiting for signals...")
while True:
try:
perpetuals_state = self.info.user_state(self.vault_address)
open_positions_api = {pos['position'].get('coin'): pos['position'] for pos in perpetuals_state.get('assetPositions', []) if float(pos.get('position', {}).get('szi', 0)) != 0}
for name, config in self.strategy_configs.items():
coin = config['parameters'].get('coin')
size = config['parameters'].get('size')
# --- ADDED: Load leverage parameters from config ---
leverage_long = config['parameters'].get('leverage_long')
leverage_short = config['parameters'].get('leverage_short')
status_file = os.path.join("_data", f"strategy_status_{name}.json")
if not os.path.exists(status_file): continue
with open(status_file, 'r') as f: status = json.load(f)
desired_signal = status.get('current_signal')
current_position = self.managed_positions.get(name)
agent_name = config.get("agent", "default").lower()
exchange_to_use = self.exchanges.get(agent_name)
if not exchange_to_use:
logging.error(f"[{name}] Agent '{agent_name}' not found. Skipping trade.")
trade_signal = self.trade_signal_queue.get()
if not trade_signal:
continue
logging.info(f"Received signal: {trade_signal}")
# Basic validation and debug information to help trace gaps
if 'config' not in trade_signal:
logging.error(f"Signal missing 'config' key. Ignoring: {trade_signal}")
continue
if 'strategy_name' not in trade_signal:
logging.error(f"Signal missing 'strategy_name' key. Ignoring: {trade_signal}")
continue
# Special command handling
if isinstance(trade_signal, dict) and trade_signal.get('_cmd') == 'CLOSE_ALL':
target_agent = trade_signal.get('agent')
logging.warning(f"Received CLOSE_ALL command for agent: {target_agent}")
if not target_agent:
logging.error("CLOSE_ALL command missing 'agent' field. Ignoring.")
continue
# --- State Machine Logic with Configurable Leverage ---
if desired_signal == "BUY":
if not current_position:
if not all([size, leverage_long]):
logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Open LONG for {coin} with {leverage_long}x leverage.")
exchange_to_use.update_leverage(int(leverage_long), coin)
exchange_to_use.market_open(coin, True, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "long", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal)
# Iterate managed positions and close those opened by the target agent
to_close = [s for s, v in self.managed_positions.items() if v.get('agent') == target_agent]
if not to_close:
logging.info(f"No managed positions found for agent '{target_agent}'.")
continue
elif current_position['side'] == 'short':
if not all([size, leverage_long]):
logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.")
continue
for sname in to_close:
pos = self.managed_positions.get(sname)
if not pos:
continue
coin = pos.get('coin')
side = pos.get('side')
size = pos.get('size')
# Determine is_buy to neutralize the position
is_buy = True if side == 'short' else False
logging.warning(f"[CLOSE_ALL] Closing {side} position for strategy {sname}, coin {coin}, size {size}")
try:
# Use the agent's exchange if available
exch = self.exchanges.get(target_agent)
if exch:
exch.market_open(coin, is_buy, size, None, 0.01)
else:
logging.error(f"Exchange object for agent '{target_agent}' not found. Skipping live close for {sname}.")
except Exception as e:
logging.error(f"Error closing position for {sname}: {e}")
# remove from managed positions regardless to avoid stuck state
try:
del self.managed_positions[sname]
except KeyError:
pass
logging.warning(f"[{name}] ACTION: Close SHORT and open LONG for {coin} with {leverage_long}x leverage.")
exchange_to_use.update_leverage(int(leverage_long), coin)
exchange_to_use.market_open(coin, True, current_position['size'] + size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "long", "size": size}
log_trade(strategy=name, coin=coin, action="CLOSE_SHORT_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif desired_signal == "SELL":
if not current_position:
if not all([size, leverage_short]):
logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Open SHORT for {coin} with {leverage_short}x leverage.")
exchange_to_use.update_leverage(int(leverage_short), coin)
exchange_to_use.market_open(coin, False, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "short", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif current_position['side'] == 'long':
if not all([size, leverage_short]):
logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Close LONG and open SHORT for {coin} with {leverage_short}x leverage.")
exchange_to_use.update_leverage(int(leverage_short), coin)
exchange_to_use.market_open(coin, False, current_position['size'] + size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "short", "size": size}
log_trade(strategy=name, coin=coin, action="CLOSE_LONG_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif desired_signal == "FLAT":
if current_position:
logging.warning(f"[{name}] ACTION: Close {current_position['side']} position for {coin}.")
is_buy = current_position['side'] == 'short'
exchange_to_use.market_open(coin, is_buy, current_position['size'], None, 0.01)
del self.managed_positions[name]
log_trade(strategy=name, coin=coin, action=f"CLOSE_{current_position['side'].upper()}", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal)
self._save_managed_positions()
logging.info(f"CLOSE_ALL for agent '{target_agent}' completed.")
continue
name = trade_signal['strategy_name']
config = trade_signal['config']
params = config.get('parameters', {})
coin = trade_signal['coin']
desired_signal = trade_signal['signal']
status = trade_signal
size = params.get('size')
if size is None:
logging.error(f"[{name}] No 'size' in parameters: {params}. Skipping.")
continue
leverage_long = int(params.get('leverage_long', 2))
leverage_short = int(params.get('leverage_short', 2))
current_position = self.managed_positions.get(name)
agent_name = (config.get("agent") or "default").lower()
exchange_to_use = self.exchanges.get(agent_name)
if not exchange_to_use:
logging.error(f"[{name}] Agent '{agent_name}' not found. Available agents: {list(self.exchanges.keys())}. Skipping trade.")
continue
# --- State Machine Logic (now runs instantly on signal) ---
if desired_signal == "BUY" or desired_signal == "INIT_BUY":
if not current_position:
logging.warning(f"[{name}] ACTION: Setting leverage to {leverage_long}x and opening LONG for {coin}.")
exchange_to_use.update_leverage(leverage_long, coin)
exchange_to_use.market_open(coin, True, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "long", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif current_position['side'] == 'short':
logging.warning(f"[{name}] ACTION: Closing SHORT and opening LONG for {coin} with {leverage_long}x leverage.")
exchange_to_use.update_leverage(leverage_long, coin)
# 1. Close the short by buying back (this is a market_open, but is_buy=True)
exchange_to_use.market_open(coin, True, current_position['size'], None, 0.01)
log_trade(strategy=name, coin=coin, action="CLOSE_SHORT", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal)
# 2. Open the new long
exchange_to_use.market_open(coin, True, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "long", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif desired_signal == "SELL" or desired_signal == "INIT_SELL":
if not current_position:
logging.warning(f"[{name}] ACTION: Setting leverage to {leverage_short}x and opening SHORT for {coin}.")
exchange_to_use.update_leverage(leverage_short, coin)
exchange_to_use.market_open(coin, False, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "short", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif current_position['side'] == 'long':
logging.warning(f"[{name}] ACTION: Closing LONG and opening SHORT for {coin} with {leverage_short}x leverage.")
exchange_to_use.update_leverage(leverage_short, coin)
# 1. Close the long by selling
exchange_to_use.market_open(coin, False, current_position['size'], None, 0.01)
log_trade(strategy=name, coin=coin, action="CLOSE_LONG", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal)
# 2. Open the new short
exchange_to_use.market_open(coin, False, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "short", "size": size}
# --- FIX: Corrected typo from 'signal.desired_signal' to 'signal=desired_signal' ---
log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif desired_signal == "FLAT":
if current_position:
logging.warning(f"[{name}] ACTION: Close {current_position['side']} position for {coin}.")
is_buy = current_position['side'] == 'short'
exchange_to_use.market_open(coin, is_buy, current_position['size'], None, 0.01)
del self.managed_positions[name]
log_trade(strategy=name, coin=coin, action=f"CLOSE_{current_position['side'].upper()}", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal)
self._save_managed_positions()
except Exception as e:
logging.error(f"An error occurred in the main executor loop: {e}")
logging.error(f"An error occurred in the main executor loop: {e}", exc_info=True)
time.sleep(1)
time.sleep(15)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run the Trade Executor.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
executor = TradeExecutor(log_level=args.log_level)
try:
executor.run()
except KeyboardInterrupt:
logging.info("Trade Executor stopped.")
# This script is no longer run directly, but is called by main_app.py

367
whale_tracker.py Normal file
View File

@ -0,0 +1,367 @@
import json
import os
import time
import requests
import logging
import argparse
import sys
from datetime import datetime, timedelta
# --- Configuration ---
# !! IMPORTANT: Update this to your actual Hyperliquid API endpoint !!
API_ENDPOINT = "https://api.hyperliquid.xyz/info"
INPUT_FILE = os.path.join("_data", "wallets_to_track.json")
OUTPUT_FILE = os.path.join("_data", "wallets_info.json")
LOGS_DIR = "_logs"
LOG_FILE = os.path.join(LOGS_DIR, "whale_tracker.log")
# Polling intervals (in seconds)
POLL_INTERVALS = {
'core_data': 10, # 5-15s range
'open_orders': 20, # 15-30s range
'account_metrics': 180, # 1-5m range
'ledger_updates': 600, # 5-15m range
'save_data': 5, # How often to write to wallets_info.json
'reload_wallets': 60 # Check for wallet list changes every 60s
}
class HyperliquidAPI:
"""
Client to handle POST requests to the Hyperliquid info endpoint.
"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
logging.info(f"API Client initialized for endpoint: {base_url}")
def post_request(self, payload):
"""
Internal helper to send POST requests and handle errors.
"""
try:
response = self.session.post(self.base_url, json=payload, timeout=10)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
return response.json()
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP Error: {e.response.status_code} for {e.request.url}. Response: {e.response.text}")
except requests.exceptions.ConnectionError as e:
logging.error(f"Connection Error: {e}")
except requests.exceptions.Timeout:
logging.error(f"Request timed out for payload: {payload.get('type')}")
except json.JSONDecodeError:
logging.error(f"Failed to decode JSON response. Response text: {response.text if 'response' in locals() else 'No response text'}")
except Exception as e:
logging.error(f"An unexpected error occurred in post_request: {e}", exc_info=True)
return None
def get_user_state(self, user_address: str):
payload = {"type": "clearinghouseState", "user": user_address}
return self.post_request(payload)
def get_open_orders(self, user_address: str):
payload = {"type": "openOrders", "user": user_address}
return self.post_request(payload)
def get_user_rate_limit(self, user_address: str):
payload = {"type": "userRateLimit", "user": user_address}
return self.post_request(payload)
def get_user_ledger_updates(self, user_address: str, start_time_ms: int, end_time_ms: int):
payload = {
"type": "userNonFundingLedgerUpdates",
"user": user_address,
"startTime": start_time_ms,
"endTime": end_time_ms
}
return self.post_request(payload)
class WalletTracker:
"""
Main class to track wallets, process data, and store results.
"""
def __init__(self, api_client, wallets_to_track):
self.api = api_client
self.wallets = wallets_to_track # This is the list of dicts
self.wallets_by_name = {w['name']: w for w in self.wallets}
self.wallets_data = {
wallet['name']: {"address": wallet['address']} for wallet in self.wallets
}
logging.info(f"WalletTracker initialized for {len(self.wallets)} wallets.")
def reload_wallets(self):
"""
Checks the INPUT_FILE for changes and updates the tracked wallet list.
"""
logging.debug("Reloading wallet list...")
try:
with open(INPUT_FILE, 'r') as f:
new_wallets_list = json.load(f)
if not isinstance(new_wallets_list, list):
logging.warning(f"Failed to reload '{INPUT_FILE}': content is not a list.")
return
new_wallets_by_name = {w['name']: w for w in new_wallets_list}
old_names = set(self.wallets_by_name.keys())
new_names = set(new_wallets_by_name.keys())
added_names = new_names - old_names
removed_names = old_names - new_names
if not added_names and not removed_names:
logging.debug("Wallet list is unchanged.")
return # No changes
# Update internal wallet list
self.wallets = new_wallets_list
self.wallets_by_name = new_wallets_by_name
# Add new wallets to wallets_data
for name in added_names:
self.wallets_data[name] = {"address": self.wallets_by_name[name]['address']}
logging.info(f"Added new wallet to track: {name}")
# Remove old wallets from wallets_data
for name in removed_names:
if name in self.wallets_data:
del self.wallets_data[name]
logging.info(f"Removed wallet from tracking: {name}")
logging.info(f"Wallet list reloaded. Tracking {len(self.wallets)} wallets.")
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
logging.error(f"Failed to reload and parse '{INPUT_FILE}': {e}")
except Exception as e:
logging.error(f"Unexpected error during wallet reload: {e}", exc_info=True)
def calculate_core_metrics(self, state_data: dict) -> dict:
"""
Performs calculations based on user_state data.
"""
if not state_data or 'crossMarginSummary' not in state_data:
logging.warning("Core state data is missing 'crossMarginSummary'.")
return {"raw_state": state_data}
summary = state_data['crossMarginSummary']
account_value = float(summary.get('accountValue', 0))
margin_used = float(summary.get('totalMarginUsed', 0))
# Calculations
margin_utilization = (margin_used / account_value) if account_value > 0 else 0
available_margin = account_value - margin_used
total_position_value = 0
if 'assetPositions' in state_data:
for pos in state_data.get('assetPositions', []):
try:
# Use 'value' for position value
pos_value_str = pos.get('position', {}).get('value', '0')
total_position_value += float(pos_value_str)
except (ValueError, TypeError):
logging.warning(f"Could not parse position value: {pos.get('position', {}).get('value')}")
continue
portfolio_leverage = (total_position_value / account_value) if account_value > 0 else 0
# Return calculated metrics alongside raw data
return {
"raw_state": state_data,
"account_value": account_value,
"margin_used": margin_used,
"margin_utilization": margin_utilization,
"available_margin": available_margin,
"total_position_value": total_position_value,
"portfolio_leverage": portfolio_leverage
}
def poll_core_data(self):
logging.debug("Polling Core Data...")
# Use self.wallets which is updated by reload_wallets
for wallet in self.wallets:
name = wallet['name']
address = wallet['address']
state_data = self.api.get_user_state(address)
if state_data:
calculated_data = self.calculate_core_metrics(state_data)
# Ensure wallet hasn't been removed by a concurrent reload
if name in self.wallets_data:
self.wallets_data[name]['core_state'] = calculated_data
time.sleep(0.1) # Avoid bursting requests
def poll_open_orders(self):
logging.debug("Polling Open Orders...")
for wallet in self.wallets:
name = wallet['name']
address = wallet['address']
orders_data = self.api.get_open_orders(address)
if orders_data:
# TODO: Add calculations for 'pending_margin_required' if logic is available
if name in self.wallets_data:
self.wallets_data[name]['open_orders'] = {"raw_orders": orders_data}
time.sleep(0.1)
def poll_account_metrics(self):
logging.debug("Polling Account Metrics...")
for wallet in self.wallets:
name = wallet['name']
address = wallet['address']
metrics_data = self.api.get_user_rate_limit(address)
if metrics_data:
if name in self.wallets_data:
self.wallets_data[name]['account_metrics'] = metrics_data
time.sleep(0.1)
def poll_ledger_updates(self):
logging.debug("Polling Ledger Updates...")
end_time_ms = int(datetime.now().timestamp() * 1000)
start_time_ms = int((datetime.now() - timedelta(minutes=15)).timestamp() * 1000)
for wallet in self.wallets:
name = wallet['name']
address = wallet['address']
ledger_data = self.api.get_user_ledger_updates(address, start_time_ms, end_time_ms)
if ledger_data:
if name in self.wallets_data:
self.wallets_data[name]['ledger_updates'] = ledger_data
time.sleep(0.1)
def save_data_to_json(self):
"""
Atomically writes the current wallet data to the output JSON file.
(No longer needs cleaning logic)
"""
logging.debug(f"Saving data to {OUTPUT_FILE}...")
temp_file = OUTPUT_FILE + ".tmp"
try:
# Save the data
with open(temp_file, 'w', encoding='utf-8') as f:
# self.wallets_data is automatically kept clean by reload_wallets
json.dump(self.wallets_data, f, indent=2)
# Atomic rename (move)
os.replace(temp_file, OUTPUT_FILE)
except (IOError, json.JSONDecodeError) as e:
logging.error(f"Failed to write wallet data to file: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during file save: {e}")
if os.path.exists(temp_file):
os.remove(temp_file)
class WhaleTrackerRunner:
"""
Manages the polling loop using last-run timestamps instead of a complex scheduler.
"""
def __init__(self, api_client, wallets, shared_whale_data_dict=None): # Kept arg for compatibility
self.tracker = WalletTracker(api_client, wallets)
self.last_poll_times = {key: 0 for key in POLL_INTERVALS}
self.poll_intervals = POLL_INTERVALS
logging.info("WhaleTrackerRunner initialized to save to JSON file.")
def update_shared_data(self):
"""
This function is no longer called by the run loop.
It's kept here to prevent errors if imported elsewhere, but is now unused.
"""
logging.debug("No shared dict, saving data to JSON file.")
self.tracker.save_data_to_json()
def run(self):
logging.info("Starting main polling loop...")
while True:
try:
now = time.time()
if now - self.last_poll_times['reload_wallets'] > self.poll_intervals['reload_wallets']:
self.tracker.reload_wallets()
self.last_poll_times['reload_wallets'] = now
if now - self.last_poll_times['core_data'] > self.poll_intervals['core_data']:
self.tracker.poll_core_data()
self.last_poll_times['core_data'] = now
if now - self.last_poll_times['open_orders'] > self.poll_intervals['open_orders']:
self.tracker.poll_open_orders()
self.last_poll_times['open_orders'] = now
if now - self.last_poll_times['account_metrics'] > self.poll_intervals['account_metrics']:
self.tracker.poll_account_metrics()
self.last_poll_times['account_metrics'] = now
if now - self.last_poll_times['ledger_updates'] > self.poll_intervals['ledger_updates']:
self.tracker.poll_ledger_updates()
self.last_poll_times['ledger_updates'] = now
if now - self.last_poll_times['save_data'] > self.poll_intervals['save_data']:
self.tracker.save_data_to_json() # <-- NEW
self.last_poll_times['save_data'] = now
# Sleep for a short duration to prevent busy-waiting
time.sleep(1)
except Exception as e:
logging.critical(f"Unhandled exception in main loop: {e}", exc_info=True)
time.sleep(10)
def setup_logging(log_level_str: str, process_name: str):
"""Configures logging for the script."""
if not os.path.exists(LOGS_DIR):
try:
os.makedirs(LOGS_DIR)
except OSError as e:
print(f"Failed to create logs directory {LOGS_DIR}: {e}")
return
level_map = {
'debug': logging.DEBUG,
'normal': logging.INFO,
'off': logging.NOTSET
}
log_level = level_map.get(log_level_str.lower(), logging.INFO)
if log_level == logging.NOTSET:
return
handlers_list = [logging.FileHandler(LOG_FILE, mode='a')]
if sys.stdout.isatty():
handlers_list.append(logging.StreamHandler(sys.stdout))
logging.basicConfig(
level=log_level,
format=f"%(asctime)s.%(msecs)03d | {process_name:<20} | %(levelname)-8s | %(message)s",
datefmt='%Y-%m-%d %H:%M:%S',
handlers=handlers_list
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Hyperliquid Whale Tracker")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
setup_logging(args.log_level, "WhaleTracker")
# Load wallets to track
wallets_to_track = []
try:
with open(INPUT_FILE, 'r') as f:
wallets_to_track = json.load(f)
if not isinstance(wallets_to_track, list) or not wallets_to_track:
raise ValueError(f"'{INPUT_FILE}' is empty or not a list.")
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
logging.critical(f"Failed to load '{INPUT_FILE}': {e}. Exiting.")
sys.exit(1)
# Initialize API client
api_client = HyperliquidAPI(base_url=API_ENDPOINT)
# Initialize and run the tracker
runner = WhaleTrackerRunner(api_client, wallets_to_track, shared_whale_data_dict=None)
try:
runner.run()
except KeyboardInterrupt:
logging.info("Whale Tracker shutting down.")
sys.exit(0)