# File: hyper/strategy_template.py
# Timestamp: 2025-10-15 18:32:12 +02:00
# Size: 187 lines, 7.8 KiB
# Language: Python

import argparse
import logging
import sys
import time
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timezone, timedelta
from logging_utils import setup_logging
class TradingStrategy:
    """
    A template for a trading strategy that reads candle data from the SQLite
    database and re-evaluates its signal in a loop, once per candle.

    Each pass of the loop loads the most recent rows of the
    ``"<coin>_<timeframe>"`` table, recomputes the signal, and writes the
    current state to ``_data/strategy_status_<strategy_name>.json``.
    Only the SMA strategy (``sma_period``) is implemented; the RSI and
    MA-cross branches are placeholders that report "NOT IMPLEMENTED".
    """

    def __init__(self, strategy_name: str, params: dict, log_level: str):
        """
        Store the configuration and initialise logging.

        Args:
            strategy_name: Name of this strategy instance; used in the status
                file name and the logger name.
            params: Strategy parameters. Keys read here: "coin", "timeframe",
                "rsi_period", "short_ma", "long_ma", "sma_period".
            log_level: Passed through unchanged to ``setup_logging``.
        """
        self.strategy_name = strategy_name
        self.params = params
        self.coin = params.get("coin", "N/A")
        self.timeframe = params.get("timeframe", "N/A")
        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")

        # Strategy state variables
        self.current_signal = "INIT"
        self.last_signal_change_utc = None
        self.signal_price = None
        self.indicator_value = None

        # Load strategy-specific parameters from config
        self.rsi_period = params.get("rsi_period")
        self.short_ma = params.get("short_ma")
        self.long_ma = params.get("long_ma")
        self.sma_period = params.get("sma_period")

        setup_logging(log_level, f"Strategy-{self.strategy_name}")
        logging.info(f"Initializing strategy with parameters: {self.params}")

    def load_data(self) -> pd.DataFrame:
        """
        Load the most recent candles, ensuring enough for the longest indicator period.

        Returns:
            A DataFrame indexed by ``datetime_utc`` in ascending order, or an
            empty DataFrame when the table is empty or the read fails (the
            failure is logged, not raised).
        """
        table_name = f"{self.coin}_{self.timeframe}"
        limit = 500

        # Determine required data limit based on the longest configured indicator
        periods = [p for p in (self.sma_period, self.long_ma, self.rsi_period) if p is not None]
        if periods:
            limit = max(periods) + 50

        # Double any embedded quote so the name cannot break out of the
        # quoted identifier.
        quoted_table = '"' + table_name.replace('"', '""') + '"'

        conn = None
        try:
            query = f'SELECT * FROM {quoted_table} ORDER BY datetime_utc DESC LIMIT {int(limit)}'
            conn = sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True)
            df = pd.read_sql(query, conn)
            if df.empty:
                return pd.DataFrame()
            df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
            df.set_index('datetime_utc', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            logging.error(f"Failed to load data from table '{table_name}': {e}")
            return pd.DataFrame()
        finally:
            # sqlite3's own context manager only commits/rolls back; the
            # connection has to be closed explicitly or one leaks per call.
            if conn is not None:
                conn.close()

    @staticmethod
    def _to_utc_iso(timestamp) -> str:
        """Return *timestamp* as a UTC ISO-8601 string, whether it is naive or tz-aware."""
        ts = pd.Timestamp(timestamp)
        # Naive values are taken to already be UTC; aware values are converted.
        ts = ts.tz_localize('UTC') if ts.tzinfo is None else ts.tz_convert('UTC')
        return ts.isoformat()

    def _calculate_signals(self, data: pd.DataFrame):
        """
        Analyzes historical data to find the last signal crossover event.
        This method should be expanded to handle different strategy types.

        Updates ``current_signal``, ``indicator_value``, ``signal_price`` and
        ``last_signal_change_utc``. For the SMA strategy it also adds the
        working columns 'sma', 'position' and 'crossover' to *data*.

        Args:
            data: Candles in ascending time order with a 'close' column and a
                datetime index.
        """
        if self.sma_period:
            if len(data) < self.sma_period + 1:
                self.current_signal = "INSUFFICIENT DATA"
                return

            data['sma'] = data['close'].rolling(window=self.sma_period).mean()
            self.indicator_value = data['sma'].iloc[-1]

            # +1 above the SMA, -1 below it, 0 when equal or while the SMA is
            # still undefined (comparisons against NaN are False).
            data['position'] = 0
            data.loc[data['close'] > data['sma'], 'position'] = 1
            data.loc[data['close'] < data['sma'], 'position'] = -1

            # Compare only consecutive bars that both have an SMA. A plain
            # diff() over every row marks the first row (NaN != 0) and the end
            # of the warm-up window as crossovers even though nothing crossed.
            has_sma = data['sma'].notna()
            data['crossover'] = 0.0
            data.loc[has_sma, 'crossover'] = (
                data.loc[has_sma, 'position'].diff().fillna(0).to_numpy()
            )

            last_position = data['position'].iloc[-1]
            if last_position == 1:
                self.current_signal = "BUY"
            elif last_position == -1:
                self.current_signal = "SELL"
            else:
                self.current_signal = "HOLD"

            cross_rows = data[data['crossover'] != 0]
            if not cross_rows.empty:
                last_cross_row = cross_rows.iloc[-1]
                change_time = last_cross_row.name
                change_price = last_cross_row['close']
            else:
                # No crossover inside the loaded window: report the oldest
                # loaded candle as the start of the current signal.
                change_time = data.index[0]
                change_price = data['close'].iloc[0]

            self.last_signal_change_utc = self._to_utc_iso(change_time)
            # Plain float so the value is always JSON-serializable.
            self.signal_price = float(change_price)

        elif self.rsi_period:
            logging.info(f"RSI logic not implemented for period {self.rsi_period}.")
            self.current_signal = "NOT IMPLEMENTED"

        elif self.short_ma and self.long_ma:
            logging.info(f"MA Cross logic not implemented for {self.short_ma}/{self.long_ma}.")
            self.current_signal = "NOT IMPLEMENTED"

    def _save_status(self):
        """
        Saves the current strategy state to its JSON file.

        The file is written to a sibling ``.tmp`` path and then moved into
        place, so a concurrent reader never sees a half-written document.
        Failures are logged and swallowed so the main loop keeps running.
        """
        status = {
            "strategy_name": self.strategy_name,
            "current_signal": self.current_signal,
            "last_signal_change_utc": self.last_signal_change_utc,
            "signal_price": self.signal_price,
            "last_checked_utc": datetime.now(timezone.utc).isoformat()
        }
        tmp_path = f"{self.status_file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(status, f, indent=4)
            os.replace(tmp_path, self.status_file_path)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError cover values json cannot serialize.
            logging.error(f"Failed to write status file: {e}")

    def get_sleep_duration(self) -> float:
        """
        Calculates seconds to sleep until the next full candle closes.

        The timeframe is read as "<number><unit>" with unit 'm', 'h' or 'd'.
        Candle boundaries are multiples of the interval counted from the Unix
        epoch; 5 seconds are added on top of the time to the next boundary.

        Returns:
            Seconds to sleep, or 60 when the timeframe is missing, has an
            unknown unit, or has no positive number.
        """
        fallback_seconds = 60
        if not self.timeframe:
            return fallback_seconds

        timeframe = str(self.timeframe)
        digits = ''.join(filter(str.isdigit, timeframe))
        tf_unit = ''.join(filter(str.isalpha, timeframe))
        unit_seconds = {'m': 60, 'h': 3600, 'd': 86400}.get(tf_unit)
        try:
            tf_value = int(digits)
        except ValueError:
            # No usable digits, e.g. the "N/A" default.
            tf_value = 0

        if unit_seconds is None or tf_value <= 0:
            logging.warning(
                f"Unrecognized timeframe '{timeframe}'. Sleeping for {fallback_seconds} seconds."
            )
            return fallback_seconds

        interval_seconds = tf_value * unit_seconds
        timestamp = datetime.now(timezone.utc).timestamp()
        next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds
        sleep_seconds = (next_candle_ts - timestamp) + 5

        logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. "
                     f"Sleeping for {sleep_seconds:.2f} seconds.")
        return sleep_seconds

    def run_logic(self):
        """
        Main loop: loads data, calculates signals, saves status, and sleeps.

        Runs until the process is interrupted. When no data can be loaded the
        status is saved as "NO DATA" and the load is retried after 60 seconds.
        """
        logging.info(f"Starting main logic loop for {self.coin} on {self.timeframe} timeframe.")
        while True:
            data = self.load_data()
            if data.empty:
                logging.warning("No data loaded. Waiting 1 minute before retrying...")
                self.current_signal = "NO DATA"
                self._save_status()
                time.sleep(60)
                continue

            self._calculate_signals(data)
            self._save_status()

            last_close = data['close'].iloc[-1]
            indicator_val_str = f"{self.indicator_value:.4f}" if self.indicator_value is not None else "N/A"
            logging.info(f"Signal: {self.current_signal} | Price: {last_close:.4f} | Indicator: {indicator_val_str}")

            sleep_time = self.get_sleep_duration()
            time.sleep(sleep_time)
if __name__ == "__main__":
    # Command-line entry point: build one strategy instance from the
    # arguments and run its loop until interrupted or until it fails.
    cli = argparse.ArgumentParser(description="Run a trading strategy.")
    cli.add_argument("--name", required=True, help="The name of the strategy instance from the config.")
    cli.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.")
    cli.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    options = cli.parse_args()

    try:
        # The JSON is decoded inside the try so malformed --params is
        # reported through the same error path as any other failure.
        TradingStrategy(
            strategy_name=options.name,
            params=json.loads(options.params),
            log_level=options.log_level,
        ).run_logic()
    except KeyboardInterrupt:
        logging.info("Strategy process stopped.")
    except Exception as exc:
        logging.error(f"A critical error occurred: {exc}")
        sys.exit(1)