import argparse
import json
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone, timedelta

import pandas as pd

from logging_utils import setup_logging


class SmaCrossStrategy:
    """
    A flexible strategy that can operate in two modes:
    1. Fast SMA / Slow SMA Crossover (if both 'fast' and 'slow' params are set)
    2. Price / Single SMA Crossover (if only one 'fast' or 'slow' param is set)

    Each pass of the loop reads recent rows from the SQLite table
    "<coin>_<timeframe>", derives a BUY / SELL / HOLD signal, and writes the
    result to a per-strategy JSON status file under "_data".
    """

    def __init__(self, strategy_name: str, params: dict, log_level: str):
        """
        Store the configuration, reset the signal state and set up logging.

        Args:
            strategy_name: Instance name; used in the status file name and
                in the logger name.
            params: Strategy parameters. The keys read here are "coin",
                "timeframe", "fast" and "slow".
            log_level: Passed through unchanged to setup_logging().
        """
        self.strategy_name = strategy_name
        self.params = params
        self.coin = params.get("coin", "N/A")
        self.timeframe = params.get("timeframe", "N/A")

        # Load fast and slow SMA periods, defaulting to 0 if not present
        self.fast_ma_period = params.get("fast", 0)
        self.slow_ma_period = params.get("slow", 0)

        self.db_path = os.path.join("_data", "market_data.db")
        self.status_file_path = os.path.join(
            "_data", f"strategy_status_{self.strategy_name}.json"
        )

        # Strategy state variables
        self.current_signal = "INIT"
        self.last_signal_change_utc = None
        self.signal_price = None
        self.fast_ma_value = None
        self.slow_ma_value = None

        setup_logging(log_level, f"Strategy-{self.strategy_name}")
        logging.info("Initializing SMA Crossover strategy with parameters:")
        for key, value in self.params.items():
            logging.info(f"  - {key}: {value}")

    def load_data(self) -> pd.DataFrame:
        """
        Loads historical data, ensuring enough for the longest SMA calculation.

        Returns:
            The most recent ``longest_period + 50`` rows, indexed by
            'datetime_utc' in ascending order. An empty DataFrame is returned
            when no SMA period is configured, the table is empty, or the
            read fails (the failure is logged, not raised).
        """
        table_name = f"{self.coin}_{self.timeframe}"

        # Determine the longest period needed for calculations
        longest_period = max(self.fast_ma_period or 0, self.slow_ma_period or 0)
        if longest_period == 0:
            logging.error("No valid SMA periods ('fast' or 'slow' > 0) are defined in parameters.")
            return pd.DataFrame()

        limit = longest_period + 50
        try:
            # Identifiers cannot be bound as SQL parameters, so escape any
            # embedded double quote by doubling it.
            quoted_table = table_name.replace('"', '""')
            query = f'SELECT * FROM "{quoted_table}" ORDER BY datetime_utc DESC LIMIT {limit}'

            # sqlite3's connection context manager only commits or rolls
            # back; it never closes. Close explicitly so this polling loop
            # does not leave one open handle behind per iteration.
            conn = sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True)
            try:
                df = pd.read_sql(query, conn)
            finally:
                conn.close()

            if df.empty:
                return pd.DataFrame()

            df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
            df.set_index('datetime_utc', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            logging.error(f"Failed to load data from table '{table_name}': {e}")
            return pd.DataFrame()

    @staticmethod
    def _to_utc_iso(ts) -> str:
        """Return *ts* as a UTC ISO-8601 string, whether it is naive or tz-aware."""
        # tz_localize() raises TypeError on an already-aware timestamp, so
        # aware values are converted instead.
        if ts.tzinfo is None:
            return ts.tz_localize('UTC').isoformat()
        return ts.tz_convert('UTC').isoformat()

    def _calculate_signals(self, data: pd.DataFrame):
        """
        Analyzes historical data to find the last crossover event based on the
        configured parameters (either dual or single SMA mode).

        Helper columns ('fast_sma' / 'slow_sma' or 'sma', plus 'position' and
        'crossover') are added to *data* in place. The method updates
        current_signal, last_signal_change_utc, signal_price and the stored
        SMA values; it returns nothing.
        """
        # --- DUAL SMA CROSSOVER LOGIC ---
        if self.fast_ma_period and self.slow_ma_period:
            # Use the longer of the two windows so a config with fast > slow
            # is still checked against the window that needs the most rows.
            required_rows = max(self.fast_ma_period, self.slow_ma_period) + 1
            if len(data) < required_rows:
                self.current_signal = "INSUFFICIENT DATA"
                return

            data['fast_sma'] = data['close'].rolling(window=self.fast_ma_period).mean()
            data['slow_sma'] = data['close'].rolling(window=self.slow_ma_period).mean()
            self.fast_ma_value = data['fast_sma'].iloc[-1]
            self.slow_ma_value = data['slow_sma'].iloc[-1]

            # Position is 1 for Golden Cross (fast > slow), -1 for Death Cross
            data['position'] = 0
            data.loc[data['fast_sma'] > data['slow_sma'], 'position'] = 1
            data.loc[data['fast_sma'] < data['slow_sma'], 'position'] = -1

        # --- SINGLE SMA PRICE CROSS LOGIC ---
        else:
            sma_period = self.fast_ma_period or self.slow_ma_period
            if len(data) < sma_period + 1:
                self.current_signal = "INSUFFICIENT DATA"
                return

            data['sma'] = data['close'].rolling(window=sma_period).mean()
            self.slow_ma_value = data['sma'].iloc[-1]  # Use slow_ma_value to store the single SMA
            self.fast_ma_value = None  # Ensure fast is None

            # Position is 1 when price is above SMA, -1 when below
            data['position'] = 0
            data.loc[data['close'] > data['sma'], 'position'] = 1
            data.loc[data['close'] < data['sma'], 'position'] = -1

        # --- COMMON LOGIC for determining signal and last change ---
        # diff() leaves NaN in the first row and NaN != 0 is True, which would
        # make the first row look like a crossover; treat it as "no change".
        data['crossover'] = data['position'].diff().fillna(0)

        last_position = data['position'].iloc[-1]
        if last_position == 1:
            self.current_signal = "BUY"
        elif last_position == -1:
            self.current_signal = "SELL"
        else:
            self.current_signal = "HOLD"

        last_cross_series = data[data['crossover'] != 0]
        if not last_cross_series.empty:
            last_cross_row = last_cross_series.iloc[-1]
            self.last_signal_change_utc = self._to_utc_iso(last_cross_row.name)
            # float() keeps the value JSON-serializable even when the
            # 'close' column holds NumPy integer scalars.
            self.signal_price = float(last_cross_row['close'])
            if last_cross_row['position'] == 1:
                self.current_signal = "BUY"
            elif last_cross_row['position'] == -1:
                self.current_signal = "SELL"
        else:
            # No position change in the loaded window: report its first row.
            self.last_signal_change_utc = self._to_utc_iso(data.index[0])
            self.signal_price = float(data['close'].iloc[0])

    def _save_status(self):
        """
        Saves the current strategy state to its JSON file.

        Serialization and file errors are logged rather than raised.
        """
        status = {
            "strategy_name": self.strategy_name,
            "current_signal": self.current_signal,
            "last_signal_change_utc": self.last_signal_change_utc,
            "signal_price": self.signal_price,
            "last_checked_utc": datetime.now(timezone.utc).isoformat()
        }

        # Serialize before opening the file: opening with 'w' truncates it,
        # so a failure during serialization must not happen after that point.
        try:
            payload = json.dumps(status, indent=4)
        except (TypeError, ValueError) as e:
            logging.error(f"Failed to serialize status: {e}")
            return

        try:
            with open(self.status_file_path, 'w', encoding='utf-8') as f:
                f.write(payload)
        except OSError as e:
            logging.error(f"Failed to write status file: {e}")

    def get_sleep_duration(self) -> float:
        """
        Calculates seconds to sleep until the next full candle closes.

        The timeframe is read as digits plus a unit letter ('m', 'h' or 'd').
        Candle boundaries are multiples of the interval counted from the Unix
        epoch, and 5 seconds are added past the boundary.

        Returns:
            Seconds to sleep, or 60 when the timeframe has no usable number
            or an unrecognized unit.
        """
        fallback_seconds = 60.0
        timeframe = str(self.timeframe)
        digits = ''.join(filter(str.isdigit, timeframe))
        tf_unit = ''.join(filter(str.isalpha, timeframe))

        try:
            tf_value = int(digits)
        except ValueError:
            # No digits at all (e.g. the "N/A" default) or digit-like
            # characters that int() rejects.
            tf_value = 0

        seconds_per_unit = {'m': 60, 'h': 3600, 'd': 86400}.get(tf_unit)
        if seconds_per_unit is None or tf_value <= 0:
            # A zero interval would divide by zero below.
            return fallback_seconds
        interval_seconds = tf_value * seconds_per_unit

        now = datetime.now(timezone.utc)
        timestamp = now.timestamp()

        next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds
        sleep_seconds = (next_candle_ts - timestamp) + 5

        logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. "
                     f"Sleeping for {sleep_seconds:.2f} seconds.")
        return sleep_seconds

    def run_logic(self):
        """
        Main loop: loads data, calculates signals, saves status, and sleeps.

        Runs until the process is interrupted. When no data can be loaded the
        status is saved as "NO DATA" and the load is retried after 60 seconds.
        """
        logging.info(f"Starting logic loop for {self.coin} on {self.timeframe} timeframe.")
        while True:
            data = self.load_data()
            if data.empty:
                logging.warning("No data loaded. Waiting 1 minute before retrying...")
                self.current_signal = "NO DATA"
                self._save_status()
                time.sleep(60)
                continue

            self._calculate_signals(data)
            self._save_status()

            last_close = data['close'].iloc[-1]

            # --- Log based on which mode the strategy is running in ---
            if self.fast_ma_period and self.slow_ma_period:
                fast_ma_str = f"{self.fast_ma_value:.4f}" if self.fast_ma_value is not None else "N/A"
                slow_ma_str = f"{self.slow_ma_value:.4f}" if self.slow_ma_value is not None else "N/A"
                logging.info(
                    f"Signal: {self.current_signal} | Price: {last_close:.4f} | "
                    f"Fast SMA({self.fast_ma_period}): {fast_ma_str} | Slow SMA({self.slow_ma_period}): {slow_ma_str}"
                )
            else:
                sma_period = self.fast_ma_period or self.slow_ma_period
                sma_val_str = f"{self.slow_ma_value:.4f}" if self.slow_ma_value is not None else "N/A"
                logging.info(
                    f"Signal: {self.current_signal} | Price: {last_close:.4f} | "
                    f"SMA({sma_period}): {sma_val_str}"
                )

            sleep_time = self.get_sleep_duration()
            time.sleep(sleep_time)


def main():
    """Parse the command line, build the strategy and run it until stopped."""
    parser = argparse.ArgumentParser(description="Run an SMA Crossover trading strategy.")
    parser.add_argument("--name", required=True, help="The name of the strategy instance from the config.")
    parser.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.")
    parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
    args = parser.parse_args()

    try:
        strategy_params = json.loads(args.params)
        strategy = SmaCrossStrategy(
            strategy_name=args.name,
            params=strategy_params,
            log_level=args.log_level
        )
        strategy.run_logic()
    except KeyboardInterrupt:
        logging.info("Strategy process stopped.")
    except Exception as e:
        logging.error(f"A critical error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()