Files
hyper/strategy_sma_cross.py
2025-10-15 18:32:12 +02:00

189 lines
7.8 KiB
Python

import argparse
import logging
import sys
import time
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timezone, timedelta
from logging_utils import setup_logging
class SmaCrossStrategy:
"""
A strategy that generates BUY/SELL signals based on the price crossing
a Simple Moving Average (SMA). It runs its logic precisely once per candle.
"""
def __init__(self, strategy_name: str, params: dict, log_level: str):
self.strategy_name = strategy_name
self.params = params
self.coin = params.get("coin", "N/A")
self.timeframe = params.get("timeframe", "N/A")
self.sma_period = params.get("sma_period", 20) # Default to 20 if not specified
self.db_path = os.path.join("_data", "market_data.db")
self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")
# Strategy state variables
self.current_signal = "INIT"
self.last_signal_change_utc = None
self.signal_price = None
self.indicator_value = None
setup_logging(log_level, f"Strategy-{self.strategy_name}")
logging.info(f"Initializing SMA Cross strategy with parameters:")
for key, value in self.params.items():
logging.info(f" - {key}: {value}")
def load_data(self) -> pd.DataFrame:
"""Loads historical data, ensuring enough for SMA calculation."""
table_name = f"{self.coin}_{self.timeframe}"
# We need at least sma_period + 1 rows to check the previous state
limit = self.sma_period + 50
try:
with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn:
query = f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT {limit}'
df = pd.read_sql(query, conn)
if df.empty: return pd.DataFrame()
df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
df.set_index('datetime_utc', inplace=True)
df.sort_index(inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data from table '{table_name}': {e}")
return pd.DataFrame()
def _calculate_signals(self, data: pd.DataFrame):
"""
Analyzes historical data to find the last SMA crossover event.
"""
if len(data) < self.sma_period + 1:
self.current_signal = "INSUFFICIENT DATA"
return
# Calculate SMA
data['sma'] = data['close'].rolling(window=self.sma_period).mean()
self.indicator_value = data['sma'].iloc[-1]
# Determine position relative to SMA: 1 for above (long), -1 for below (short)
data['position'] = 0
data.loc[data['close'] > data['sma'], 'position'] = 1
data.loc[data['close'] < data['sma'], 'position'] = -1
# A crossover is when the position on this candle is different from the last
data['crossover'] = data['position'].diff()
# Get the latest signal based on the last position
last_position = data['position'].iloc[-1]
if last_position == 1: self.current_signal = "BUY"
elif last_position == -1: self.current_signal = "SELL"
else: self.current_signal = "HOLD"
# Find the most recent crossover event in the historical data
last_cross_series = data[data['crossover'] != 0]
if not last_cross_series.empty:
last_cross_row = last_cross_series.iloc[-1]
self.last_signal_change_utc = last_cross_row.name.tz_localize('UTC').isoformat()
self.signal_price = last_cross_row['close']
# Refine the signal to be the one *at the time of the cross*
if last_cross_row['position'] == 1: self.current_signal = "BUY"
elif last_cross_row['position'] == -1: self.current_signal = "SELL"
else:
# If no crosses in history, the signal has been consistent
self.last_signal_change_utc = data.index[0].tz_localize('UTC').isoformat()
self.signal_price = data['close'].iloc[0]
def _save_status(self):
"""Saves the current strategy state to its JSON file."""
status = {
"strategy_name": self.strategy_name,
"current_signal": self.current_signal,
"last_signal_change_utc": self.last_signal_change_utc,
"signal_price": self.signal_price,
"last_checked_utc": datetime.now(timezone.utc).isoformat()
}
try:
with open(self.status_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
except IOError as e:
logging.error(f"Failed to write status file: {e}")
def get_sleep_duration(self) -> int:
"""Calculates seconds to sleep until the next full candle closes."""
tf_value = int(''.join(filter(str.isdigit, self.timeframe)))
tf_unit = ''.join(filter(str.isalpha, self.timeframe))
if tf_unit == 'm': interval_seconds = tf_value * 60
elif tf_unit == 'h': interval_seconds = tf_value * 3600
elif tf_unit == 'd': interval_seconds = tf_value * 86400
else: return 60 # Default to 1 minute if unknown
now = datetime.now(timezone.utc)
timestamp = now.timestamp()
# Calculate the timestamp of the *next* candle close
next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds
# Add a small buffer (e.g., 5 seconds) to ensure the candle data is available
sleep_seconds = (next_candle_ts - timestamp) + 5
logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. "
f"Sleeping for {sleep_seconds:.2f} seconds.")
return sleep_seconds
def run_logic(self):
"""Main loop: loads data, calculates signals, saves status, and sleeps."""
logging.info(f"Starting SMA Cross logic loop for {self.coin} on {self.timeframe} timeframe.")
while True:
data = self.load_data()
if data.empty:
logging.warning("No data loaded. Waiting 1 minute before retrying...")
self.current_signal = "NO DATA"
self._save_status()
time.sleep(60)
continue
self._calculate_signals(data)
self._save_status()
# --- ADDED: More detailed logging for the current cycle ---
last_close = data['close'].iloc[-1]
indicator_val_str = f"{self.indicator_value:.4f}" if self.indicator_value is not None else "N/A"
logging.info(
f"Signal: {self.current_signal} | "
f"Price: {last_close:.4f} | "
f"SMA({self.sma_period}): {indicator_val_str}"
)
sleep_time = self.get_sleep_duration()
time.sleep(sleep_time)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run an SMA Crossover trading strategy.")
parser.add_argument("--name", required=True, help="The name of the strategy instance from the config.")
parser.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
try:
strategy_params = json.loads(args.params)
strategy = SmaCrossStrategy(
strategy_name=args.name,
params=strategy_params,
log_level=args.log_level
)
strategy.run_logic()
except KeyboardInterrupt:
logging.info("Strategy process stopped.")
except Exception as e:
logging.error(f"A critical error occurred: {e}")
sys.exit(1)