166 lines
6.9 KiB
Python
166 lines
6.9 KiB
Python
from abc import ABC, abstractmethod
|
|
import pandas as pd
|
|
import json
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
import sqlite3
|
|
import multiprocessing
|
|
import time
|
|
|
|
from logging_utils import setup_logging
|
|
from hyperliquid.info import Info
|
|
from hyperliquid.utils import constants
|
|
|
|
class BaseStrategy(ABC):
|
|
"""
|
|
An abstract base class that defines the blueprint for all trading strategies.
|
|
It provides common functionality like loading data, saving status, and state management.
|
|
"""
|
|
|
|
def __init__(self, strategy_name: str, params: dict, trade_signal_queue: multiprocessing.Queue = None, shared_status: dict = None):
|
|
self.strategy_name = strategy_name
|
|
self.params = params
|
|
self.trade_signal_queue = trade_signal_queue
|
|
# Optional multiprocessing.Manager().dict() to hold live status (avoids file IO)
|
|
self.shared_status = shared_status
|
|
|
|
self.coin = params.get("coin", "N/A")
|
|
self.timeframe = params.get("timeframe", "N/A")
|
|
self.db_path = os.path.join("_data", "market_data.db")
|
|
self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")
|
|
|
|
self.current_signal = "INIT"
|
|
self.last_signal_change_utc = None
|
|
self.signal_price = None
|
|
|
|
# Note: Logging is set up by the run_strategy function
|
|
|
|
def load_data(self) -> pd.DataFrame:
|
|
"""Loads historical data for the configured coin and timeframe."""
|
|
table_name = f"{self.coin}_{self.timeframe}"
|
|
|
|
periods = [v for k, v in self.params.items() if 'period' in k or '_ma' in k or 'slow' in k or 'fast' in k]
|
|
limit = max(periods) + 50 if periods else 500
|
|
|
|
try:
|
|
with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn:
|
|
query = f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT {limit}'
|
|
df = pd.read_sql(query, conn, parse_dates=['datetime_utc'])
|
|
if df.empty: return pd.DataFrame()
|
|
df.set_index('datetime_utc', inplace=True)
|
|
df.sort_index(inplace=True)
|
|
return df
|
|
except Exception as e:
|
|
logging.error(f"Failed to load data from table '{table_name}': {e}")
|
|
return pd.DataFrame()
|
|
|
|
@abstractmethod
|
|
def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""The core logic of the strategy. Must be implemented by child classes."""
|
|
pass
|
|
|
|
def calculate_signals_and_state(self, df: pd.DataFrame) -> bool:
|
|
"""
|
|
A wrapper that calls the strategy's signal calculation, determines
|
|
the last signal change, and returns True if the signal has changed.
|
|
"""
|
|
df_with_signals = self.calculate_signals(df)
|
|
df_with_signals.dropna(inplace=True)
|
|
if df_with_signals.empty:
|
|
return False
|
|
|
|
df_with_signals['position_change'] = df_with_signals['signal'].diff()
|
|
|
|
last_signal_int = df_with_signals['signal'].iloc[-1]
|
|
new_signal_str = "HOLD"
|
|
if last_signal_int == 1: new_signal_str = "BUY"
|
|
elif last_signal_int == -1: new_signal_str = "SELL"
|
|
|
|
signal_changed = False
|
|
if self.current_signal == "INIT":
|
|
if new_signal_str == "BUY": self.current_signal = "INIT_BUY"
|
|
elif new_signal_str == "SELL": self.current_signal = "INIT_SELL"
|
|
else: self.current_signal = "HOLD"
|
|
signal_changed = True
|
|
elif new_signal_str != self.current_signal:
|
|
self.current_signal = new_signal_str
|
|
signal_changed = True
|
|
|
|
if signal_changed:
|
|
last_change_series = df_with_signals[df_with_signals['position_change'] != 0]
|
|
if not last_change_series.empty:
|
|
last_change_row = last_change_series.iloc[-1]
|
|
self.last_signal_change_utc = last_change_row.name.tz_localize('UTC').isoformat()
|
|
self.signal_price = last_change_row['close']
|
|
|
|
return signal_changed
|
|
|
|
def _save_status(self):
|
|
"""Saves the current strategy state to its JSON file."""
|
|
status = {
|
|
"strategy_name": self.strategy_name,
|
|
"current_signal": self.current_signal,
|
|
"last_signal_change_utc": self.last_signal_change_utc,
|
|
"signal_price": self.signal_price,
|
|
"last_checked_utc": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
# If a shared status dict is provided (Manager.dict()), update it instead of writing files
|
|
try:
|
|
if self.shared_status is not None:
|
|
try:
|
|
# store the status under the strategy name for easy lookup
|
|
self.shared_status[self.strategy_name] = status
|
|
except Exception:
|
|
# Manager proxies may not accept nested mutable objects consistently; assign a copy
|
|
self.shared_status[self.strategy_name] = dict(status)
|
|
else:
|
|
with open(self.status_file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(status, f, indent=4)
|
|
except IOError as e:
|
|
logging.error(f"Failed to write status file for {self.strategy_name}: {e}")
|
|
|
|
def run_polling_loop(self):
|
|
"""
|
|
The default execution loop for polling-based strategies (e.g., SMAs).
|
|
"""
|
|
while True:
|
|
df = self.load_data()
|
|
if df.empty:
|
|
logging.warning("No data loaded. Waiting 1 minute...")
|
|
time.sleep(60)
|
|
continue
|
|
|
|
signal_changed = self.calculate_signals_and_state(df.copy())
|
|
self._save_status()
|
|
|
|
if signal_changed or self.current_signal == "INIT_BUY" or self.current_signal == "INIT_SELL":
|
|
logging.warning(f"New signal detected: {self.current_signal}")
|
|
self.trade_signal_queue.put({
|
|
"strategy_name": self.strategy_name,
|
|
"signal": self.current_signal,
|
|
"coin": self.coin,
|
|
"signal_price": self.signal_price,
|
|
"config": {"agent": self.params.get("agent"), "parameters": self.params}
|
|
})
|
|
if self.current_signal == "INIT_BUY": self.current_signal = "BUY"
|
|
if self.current_signal == "INIT_SELL": self.current_signal = "SELL"
|
|
|
|
logging.info(f"Current Signal: {self.current_signal}")
|
|
time.sleep(60)
|
|
|
|
def run_event_loop(self):
|
|
"""
|
|
A placeholder for event-driven (WebSocket) strategies.
|
|
Child classes must override this.
|
|
"""
|
|
logging.error("run_event_loop() is not implemented for this strategy.")
|
|
time.sleep(3600) # Sleep for an hour to prevent rapid error loops
|
|
|
|
def on_fill_message(self, message):
|
|
"""
|
|
Placeholder for the WebSocket callback.
|
|
Child classes must override this.
|
|
"""
|
|
pass
|