# Files
# hyper/strategy_sma_cross.py
# 2025-10-18 15:10:46 +02:00
#
# 220 lines
# 9.4 KiB
# Python

import argparse
import logging
import sys
import time
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timezone, timedelta
from logging_utils import setup_logging
class SmaCrossStrategy:
    """
    A flexible strategy that can operate in two modes:
    1. Fast SMA / Slow SMA Crossover (if both 'fast' and 'slow' params are set)
    2. Price / Single SMA Crossover (if only one 'fast' or 'slow' param is set)

    Candles are read from a SQLite table named ``"<coin>_<timeframe>"`` and
    the resulting signal is written to a per-strategy JSON status file.
    """

    def __init__(self, strategy_name: str, params: dict, log_level: str):
        """Store the configuration, reset the state and configure logging.

        Args:
            strategy_name: Name of this strategy instance; used in the status
                file name and in the logger name.
            params: Strategy parameters. The keys read here are ``coin``,
                ``timeframe``, ``fast`` and ``slow``.
            log_level: Passed through unchanged to ``setup_logging``.
        """
        self.strategy_name = strategy_name
        self.params = params
        self.coin = params.get("coin", "N/A")
        self.timeframe = params.get("timeframe", "N/A")

        # Load fast and slow SMA periods, defaulting to 0 if not present
        self.fast_ma_period = params.get("fast", 0)
        self.slow_ma_period = params.get("slow", 0)

        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")

        # Strategy state variables
        self.current_signal = "INIT"
        self.last_signal_change_utc = None
        self.signal_price = None
        self.fast_ma_value = None
        self.slow_ma_value = None

        setup_logging(log_level, f"Strategy-{self.strategy_name}")
        logging.info("Initializing SMA Crossover strategy with parameters:")
        for key, value in self.params.items():
            logging.info(f" - {key}: {value}")

    def load_data(self) -> pd.DataFrame:
        """Loads historical data, ensuring enough for the longest SMA calculation.

        Returns:
            The most recent ``longest_period + 50`` rows of the table, indexed
            by ``datetime_utc`` in ascending order. An empty DataFrame is
            returned when no SMA period is configured, the table is empty, or
            any error occurs while reading.
        """
        table_name = f"{self.coin}_{self.timeframe}"

        # Determine the longest period needed for calculations
        longest_period = max(self.fast_ma_period or 0, self.slow_ma_period or 0)
        if longest_period <= 0:
            logging.error("No valid SMA periods ('fast' or 'slow' > 0) are defined in parameters.")
            return pd.DataFrame()

        limit = longest_period + 50
        try:
            # sqlite3's connection context manager only scopes a transaction;
            # it does not close the connection. Close explicitly so the
            # polling loop does not leak one handle per iteration.
            conn = sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True)
            try:
                # Identifiers cannot be bound as parameters, so escape any
                # embedded double quote instead.
                quoted_table = table_name.replace('"', '""')
                query = f'SELECT * FROM "{quoted_table}" ORDER BY datetime_utc DESC LIMIT {int(limit)}'
                df = pd.read_sql(query, conn)
            finally:
                conn.close()

            if df.empty:
                return pd.DataFrame()

            df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
            df.set_index('datetime_utc', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            logging.error(f"Failed to load data from table '{table_name}': {e}")
            return pd.DataFrame()

    @staticmethod
    def _to_utc_iso(timestamp) -> str:
        """Return *timestamp* as an ISO-8601 string in UTC.

        Naive timestamps are taken to already be in UTC; tz-aware ones are
        converted, because ``tz_localize`` raises on tz-aware input.
        """
        ts = pd.Timestamp(timestamp)
        ts = ts.tz_localize('UTC') if ts.tzinfo is None else ts.tz_convert('UTC')
        return ts.isoformat()

    def _calculate_signals(self, data: pd.DataFrame):
        """
        Analyzes historical data to find the last crossover event based on the
        configured parameters (either dual or single SMA mode).

        Working columns (``fast_sma``/``slow_sma`` or ``sma``, plus
        ``position`` and ``crossover``) are added to *data* in place. The
        results are stored on the instance: ``current_signal``,
        ``last_signal_change_utc``, ``signal_price`` and the latest SMA values.
        With too few rows, ``current_signal`` becomes "INSUFFICIENT DATA" and
        nothing else is updated.
        """
        # --- DUAL SMA CROSSOVER LOGIC ---
        if self.fast_ma_period and self.slow_ma_period:
            # Both averages need a full window, whichever one is longer.
            if len(data) < max(self.fast_ma_period, self.slow_ma_period) + 1:
                self.current_signal = "INSUFFICIENT DATA"
                return

            data['fast_sma'] = data['close'].rolling(window=self.fast_ma_period).mean()
            data['slow_sma'] = data['close'].rolling(window=self.slow_ma_period).mean()
            self.fast_ma_value = data['fast_sma'].iloc[-1]
            self.slow_ma_value = data['slow_sma'].iloc[-1]

            # Position is 1 for Golden Cross (fast > slow), -1 for Death Cross
            data['position'] = 0
            data.loc[data['fast_sma'] > data['slow_sma'], 'position'] = 1
            data.loc[data['fast_sma'] < data['slow_sma'], 'position'] = -1
        # --- SINGLE SMA PRICE CROSS LOGIC ---
        else:
            sma_period = self.fast_ma_period or self.slow_ma_period
            if len(data) < sma_period + 1:
                self.current_signal = "INSUFFICIENT DATA"
                return

            data['sma'] = data['close'].rolling(window=sma_period).mean()
            self.slow_ma_value = data['sma'].iloc[-1]  # Use slow_ma_value to store the single SMA
            self.fast_ma_value = None  # Ensure fast is None

            # Position is 1 when price is above SMA, -1 when below
            data['position'] = 0
            data.loc[data['close'] > data['sma'], 'position'] = 1
            data.loc[data['close'] < data['sma'], 'position'] = -1

        # --- COMMON LOGIC for determining signal and last change ---
        data['crossover'] = data['position'].diff()

        last_position = int(data['position'].iloc[-1])
        self.current_signal = {1: "BUY", -1: "SELL"}.get(last_position, "HOLD")

        # diff() leaves NaN in the first row and NaN != 0 is True, so treat it
        # as "no change"; otherwise the first row always looks like a cross.
        # NOTE(review): the 0 -> +/-1 step where the SMA window first fills is
        # still counted as a change, as it was before.
        changed = data['crossover'].fillna(0) != 0
        if changed.any():
            last_cross_row = data.loc[changed].iloc[-1]
            change_time = last_cross_row.name
            change_price = last_cross_row['close']
        else:
            # No change inside the loaded window: report the oldest candle.
            change_time = data.index[0]
            change_price = data['close'].iloc[0]

        self.last_signal_change_utc = self._to_utc_iso(change_time)
        # Plain float so the value is always JSON serializable, even when the
        # close column has an integer dtype.
        self.signal_price = float(change_price)

    def _save_status(self):
        """Saves the current strategy state to its JSON file.

        Failures are logged and swallowed so that a bad write never stops the
        polling loop.
        """
        status = {
            "strategy_name": self.strategy_name,
            "current_signal": self.current_signal,
            "last_signal_change_utc": self.last_signal_change_utc,
            "signal_price": self.signal_price,
            "last_checked_utc": datetime.now(timezone.utc).isoformat()
        }
        try:
            # Serialize before opening the file so a serialization error
            # cannot truncate the previous, still valid, status file.
            payload = json.dumps(status, indent=4)
            with open(self.status_file_path, 'w', encoding='utf-8') as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Failed to write status file: {e}")

    def get_sleep_duration(self) -> float:
        """Calculates seconds to sleep until the next full candle closes.

        The timeframe is read as ``<number><unit>`` with unit ``m``, ``h`` or
        ``d``. Five seconds are added on top of the time remaining until the
        next interval boundary.

        Returns:
            Seconds to sleep, or 60 when the timeframe cannot be interpreted
            (unknown unit, missing number, or a non-positive number).
        """
        timeframe = str(self.timeframe)
        digits = ''.join(filter(str.isdigit, timeframe))
        tf_unit = ''.join(filter(str.isalpha, timeframe))

        try:
            tf_value = int(digits)
        except ValueError:
            # No digits at all (e.g. the "N/A" default) or digits int() rejects.
            tf_value = 0

        unit_seconds = {'m': 60, 'h': 3600, 'd': 86400}.get(tf_unit)
        if unit_seconds is None or tf_value <= 0:
            logging.warning(f"Unrecognized timeframe '{self.timeframe}'. Falling back to 60 seconds.")
            return 60

        interval_seconds = tf_value * unit_seconds
        timestamp = datetime.now(timezone.utc).timestamp()
        next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds
        sleep_seconds = (next_candle_ts - timestamp) + 5

        logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. "
                     f"Sleeping for {sleep_seconds:.2f} seconds.")
        return sleep_seconds

    @staticmethod
    def _format_ma(value) -> str:
        """Format an SMA value for logging, or "N/A" when it is not set."""
        return f"{value:.4f}" if value is not None else "N/A"

    def _run_once(self) -> float:
        """Run one load/calculate/save/log cycle and return the seconds to sleep."""
        data = self.load_data()
        if data.empty:
            logging.warning("No data loaded. Waiting 1 minute before retrying...")
            self.current_signal = "NO DATA"
            self._save_status()
            return 60

        self._calculate_signals(data)
        self._save_status()

        last_close = data['close'].iloc[-1]

        # --- Log based on which mode the strategy is running in ---
        if self.fast_ma_period and self.slow_ma_period:
            fast_ma_str = self._format_ma(self.fast_ma_value)
            slow_ma_str = self._format_ma(self.slow_ma_value)
            logging.info(
                f"Signal: {self.current_signal} | Price: {last_close:.4f} | "
                f"Fast SMA({self.fast_ma_period}): {fast_ma_str} | Slow SMA({self.slow_ma_period}): {slow_ma_str}"
            )
        else:
            sma_period = self.fast_ma_period or self.slow_ma_period
            sma_val_str = self._format_ma(self.slow_ma_value)
            logging.info(
                f"Signal: {self.current_signal} | Price: {last_close:.4f} | "
                f"SMA({sma_period}): {sma_val_str}"
            )

        return self.get_sleep_duration()

    def run_logic(self):
        """Main loop: loads data, calculates signals, saves status, and sleeps.

        Runs until the process is interrupted; it never returns normally.
        """
        logging.info(f"Starting logic loop for {self.coin} on {self.timeframe} timeframe.")
        while True:
            time.sleep(self._run_once())
def main() -> None:
    """Parse the command line, build the strategy and run it until stopped.

    Exits with status 1 when the parameters are invalid or the strategy
    raises an unexpected error. Ctrl-C stops the process without an error
    status.
    """
    parser = argparse.ArgumentParser(description="Run an SMA Crossover trading strategy.")
    parser.add_argument("--name", required=True, help="The name of the strategy instance from the config.")
    parser.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.")
    parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    args = parser.parse_args()

    try:
        strategy_params = json.loads(args.params)
        # The strategy calls .get()/.items() on the parameters, so anything
        # other than a JSON object would fail later with an opaque error.
        if not isinstance(strategy_params, dict):
            raise ValueError("--params must be a JSON object")

        strategy = SmaCrossStrategy(
            strategy_name=args.name,
            params=strategy_params,
            log_level=args.log_level
        )
        strategy.run_logic()
    except KeyboardInterrupt:
        logging.info("Strategy process stopped.")
    except Exception as e:
        # logging.exception keeps the traceback, which a bare str(e) loses.
        logging.exception(f"A critical error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()