Files
hyper/strategies/base_strategy.py
2025-10-25 21:51:25 +02:00

93 lines
3.8 KiB
Python

from abc import ABC, abstractmethod
import pandas as pd
import json
import os
import logging
from datetime import datetime, timezone
import sqlite3
from logging_utils import setup_logging
class BaseStrategy(ABC):
    """
    An abstract base class that defines the blueprint for all trading strategies.

    It provides the functionality shared by every strategy: loading recent
    candles from the SQLite market-data database, deriving the current signal
    state from a frame of computed signals, and persisting that state to a
    per-strategy JSON status file. Subclasses only implement
    ``calculate_signals``.
    """

    def __init__(self, strategy_name: str, params: dict):
        """
        Store the strategy configuration and reset the signal state.

        Args:
            strategy_name: Identifier used in log messages and in the status
                file name.
            params: Strategy parameters. ``coin`` and ``timeframe`` select the
                table read by ``load_data``; both default to ``"N/A"``.
        """
        # Note: log_level is not needed here as logging is set up by the process
        self.strategy_name = strategy_name
        self.params = params
        self.coin = params.get("coin", "N/A")
        self.timeframe = params.get("timeframe", "N/A")
        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")
        # Signal state; filled in by calculate_signals_and_state().
        self.current_signal = "INIT"
        self.last_signal_change_utc = None  # ISO-8601 string once known
        self.signal_price = None  # 'close' of the row where the signal last changed
        logging.info(f"Initializing with parameters: {self.params}")

    def load_data(self) -> pd.DataFrame:
        """
        Loads historical data for the configured coin and timeframe.

        The most recent rows of the table ``"<coin>_<timeframe>"`` are read
        through a read-only connection. The row limit is the largest numeric
        period-like parameter plus 50, or 500 when there is none.

        Returns:
            A DataFrame indexed by ``datetime_utc`` in ascending order, or an
            empty DataFrame when the table has no rows or loading fails (the
            failure is logged, not raised).
        """
        table_name = f"{self.coin}_{self.timeframe}"
        # Only real numbers can take part in max(); a non-numeric value under a
        # matching key (e.g. "slow": "n/a") is ignored instead of raising.
        periods = [
            value
            for key, value in self.params.items()
            if ('period' in key or '_ma' in key or 'slow' in key or 'fast' in key)
            and isinstance(value, (int, float))
            and not isinstance(value, bool)
        ]
        limit = int(max(periods)) + 50 if periods else 500
        # Double any embedded quote so the name stays a single quoted identifier.
        quoted_table = table_name.replace('"', '""')
        query = f'SELECT * FROM "{quoted_table}" ORDER BY datetime_utc DESC LIMIT {limit}'
        try:
            conn = sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True)
            try:
                df = pd.read_sql(query, conn, parse_dates=['datetime_utc'])
            finally:
                # Using the connection as a context manager only commits or
                # rolls back; it never closes, so close explicitly.
                conn.close()
            if df.empty:
                return pd.DataFrame()
            df.set_index('datetime_utc', inplace=True)
            # Rows were fetched newest-first; restore chronological order.
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            logging.error(f"Failed to load data from table '{table_name}': {e}")
            return pd.DataFrame()

    @abstractmethod
    def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        The core logic of the strategy. Must be implemented by child classes.

        ``calculate_signals_and_state`` reads the ``signal`` and ``close``
        columns of the returned frame.
        """

    def calculate_signals_and_state(self, df: pd.DataFrame) -> None:
        """
        A wrapper that calls the strategy's signal calculation and then
        determines the last signal change from the historical data.

        Updates ``current_signal`` ("BUY" for 1, "SELL" for -1, otherwise
        "HOLD"), ``last_signal_change_utc`` and ``signal_price``. The state is
        left untouched when no complete rows remain after dropping NaNs.

        Note: the frame returned by ``calculate_signals`` is modified in place
        (rows containing NaN are dropped and a ``position_change`` column is
        added).
        """
        df_with_signals = self.calculate_signals(df)
        df_with_signals.dropna(inplace=True)
        if df_with_signals.empty:
            return

        df_with_signals['position_change'] = df_with_signals['signal'].diff()

        last_signal = df_with_signals['signal'].iloc[-1]
        if last_signal == 1:
            self.current_signal = "BUY"
        elif last_signal == -1:
            self.current_signal = "SELL"
        else:
            self.current_signal = "HOLD"

        # The first row's diff is NaN, which compares unequal to 0, so the
        # oldest row always qualifies; it is the fallback when the signal never
        # changes inside the loaded window.
        last_change_series = df_with_signals[df_with_signals['position_change'] != 0]
        if not last_change_series.empty:
            last_change_row = last_change_series.iloc[-1]
            changed_at = pd.Timestamp(last_change_row.name)
            # tz_localize() raises on tz-aware timestamps, so only localize
            # naive ones and convert aware ones to UTC.
            if changed_at.tzinfo is None:
                changed_at = changed_at.tz_localize('UTC')
            else:
                changed_at = changed_at.tz_convert('UTC')
            self.last_signal_change_utc = changed_at.isoformat()
            # Plain float keeps the value JSON-serializable even when the
            # column holds numpy integers.
            self.signal_price = float(last_change_row['close'])

    def _save_status(self) -> None:
        """
        Saves the current strategy state to its JSON file.

        The payload is first written to ``<status_file_path>.tmp`` and then
        moved over the real file with ``os.replace``, so a failed dump never
        leaves a truncated status file in place. Failures are logged, not
        raised.
        """
        status = {
            "strategy_name": self.strategy_name,
            "current_signal": self.current_signal,
            "last_signal_change_utc": self.last_signal_change_utc,
            "signal_price": self.signal_price,
            "last_checked_utc": datetime.now(timezone.utc).isoformat()
        }
        tmp_path = f"{self.status_file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(status, f, indent=4)
            os.replace(tmp_path, self.status_file_path)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError come from json.dump on unserializable state.
            logging.error(f"Failed to write status file for {self.strategy_name}: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup: the temp file may never have been created.
                pass