From 70f3d48336e60b0a5eb570f4b64cd842b32d007f Mon Sep 17 00:00:00 2001 From: DiTus Date: Mon, 20 Oct 2025 20:46:48 +0200 Subject: [PATCH] live market websocket and monitoring wallets --- _data/backtesting_conf.json | 23 +- _data/executor_managed_positions.json | 5 - _data/strategies.json | 55 ++-- _data/strategy_status_sma_cross_1.json | 8 +- _data/strategy_status_sma_cross_2.json | 6 +- address_monitor.py | 221 +++++++++++++++ backtester.py | 355 +++++++++++++++++-------- basic_ws.py | 31 +++ live_market.py | 258 ++++++++++++++++++ strategies/base_strategy.py | 73 +++++ strategies/ma_cross_strategy.py | 35 +++ strategies/single_sma_strategy.py | 24 ++ strategy_runner.py | 85 ++++++ 13 files changed, 996 insertions(+), 183 deletions(-) create mode 100644 address_monitor.py create mode 100644 basic_ws.py create mode 100644 live_market.py create mode 100644 strategies/base_strategy.py create mode 100644 strategies/ma_cross_strategy.py create mode 100644 strategies/single_sma_strategy.py create mode 100644 strategy_runner.py diff --git a/_data/backtesting_conf.json b/_data/backtesting_conf.json index 09cd709..66c76cf 100644 --- a/_data/backtesting_conf.json +++ b/_data/backtesting_conf.json @@ -1,25 +1,16 @@ { - "sma_cross_eth_1m": { - "strategy_name": "sma_cross_1", + "sma_cross_eth_5m": { + "strategy_name": "sma_cross_2", + "script": "strategies.ma_cross_strategy.MaCrossStrategy", "optimization_params": { "fast": { - "start": 4, - "end": 15, + "start": 5, + "end": 150, "step": 1 }, "slow": { - "start": 20, - "end": 60, - "step": 1 - } - } - }, - "sma_44d_btc": { - "strategy_name": "sma_cross_2", - "optimization_params": { - "sma_period": { - "start": 20, - "end": 250, + "start": 0, + "end": 0, "step": 1 } } diff --git a/_data/executor_managed_positions.json b/_data/executor_managed_positions.json index e9769cb..4a80fe9 100644 --- a/_data/executor_managed_positions.json +++ b/_data/executor_managed_positions.json @@ -3,10 +3,5 @@ "coin": "BTC", "side": "short", "size": 0.0001 - }, - "sma_cross_1": { - "coin": "ETH", - "side": "short", - "size": 0.0028 } } \ No newline at end of file diff --git a/_data/strategies.json b/_data/strategies.json index 01e0dc1..e4ad53e 100644 --- a/_data/strategies.json +++ b/_data/strategies.json @@ -1,52 +1,31 @@ { - "sma_cross_1": { + "sma_cross_eth_5m": { "enabled": true, - "script": "strategy_sma_cross.py", - "agent": "scalper", + "script": "strategy_runner.py", + "class": "strategies.ma_cross_strategy.MaCrossStrategy", + "agent": "scalper_agent", "parameters": { "coin": "ETH", - "timeframe": "5m", - "slow": 44, - "fast": 7, - "size": 0.0028, - "leverage_long": 5, - "leverage_short": 2 + "timeframe": "1m", + "short_ma": 7, + "long_ma": 44, + "size": 0.0055, + "leverage_long": 5, + "leverage_short": 5 } }, - "sma_cross_2": { + "sma_125d_btc": { "enabled": true, - "script": "strategy_sma_cross.py", - "agent": "swing", - "parameters": { - "coin": "BTC", - "timeframe": "1D", - "slow": 44, - "fast": 0, - "size": 0.0001, - "leverage_long": 2, - "leverage_short": 1 - } - }, - "sma_125d_btc": { - "enabled": false, - "script": "strategy_template.py", + "script": "strategy_runner.py", + "class": "strategies.single_sma_strategy.SingleSmaStrategy", "agent": "swing_agent", "parameters": { "coin": "BTC", - "timeframe": "1D", - "sma_period": 125, - "size": 0.0001 - } - }, - "sma_44d_btc": { - "enabled": false, - "script": "strategy_template.py", - "agent": "swing_agent", - "parameters": { - "coin": "BTC", - "timeframe": "1D", + "timeframe": "1d", "sma_period": 44, - "size": 0.0001 + "size": 0.0001, + "leverage_long": 2, + "leverage_short": 1 } } } diff --git a/_data/strategy_status_sma_cross_1.json b/_data/strategy_status_sma_cross_1.json index 2944d79..e9a5b44 100644 --- a/_data/strategy_status_sma_cross_1.json +++ b/_data/strategy_status_sma_cross_1.json @@ -1,7 +1,7 @@ { "strategy_name": "sma_cross_1", - "current_signal": "SELL", - "last_signal_change_utc": "2025-10-18T16:19:00+00:00", - "signal_price": 3870.5, - "last_checked_utc": "2025-10-18T16:40:05.039625+00:00" + "current_signal": "FLAT", + "last_signal_change_utc": "2025-10-18T20:22:00+00:00", + "signal_price": 3893.9, + "last_checked_utc": "2025-10-18T20:30:05.021192+00:00" } \ No newline at end of file diff --git a/_data/strategy_status_sma_cross_2.json b/_data/strategy_status_sma_cross_2.json index 95a0419..a4b5055 100644 --- a/_data/strategy_status_sma_cross_2.json +++ b/_data/strategy_status_sma_cross_2.json @@ -1,7 +1,7 @@ { "strategy_name": "sma_cross_2", "current_signal": "SELL", - "last_signal_change_utc": "2025-10-14T00:00:00+00:00", - "signal_price": 113026.0, - "last_checked_utc": "2025-10-18T16:40:09.950516+00:00" + "last_signal_change_utc": "2025-10-20T00:00:00+00:00", + "signal_price": 110811.0, + "last_checked_utc": "2025-10-20T18:45:51.578502+00:00" } \ No newline at end of file diff --git a/address_monitor.py b/address_monitor.py new file mode 100644 index 0000000..9bd768a --- /dev/null +++ b/address_monitor.py @@ -0,0 +1,221 @@ +import os +import sys +import time +import json +import argparse +from datetime import datetime, timezone +from hyperliquid.info import Info +from hyperliquid.utils import constants +from collections import deque +import logging +import csv + +from logging_utils import setup_logging + +# --- Configuration --- +DEFAULT_ADDRESSES_TO_WATCH = [ + #"0xd4c1f7e8d876c4749228d515473d36f919583d1d", + "0x0fd468a73084daa6ea77a9261e40fdec3e67e0c7", + # "0x4d69495d16fab95c3c27b76978affa50301079d0", + # "0x09bc1cf4d9f0b59e1425a8fde4d4b1f7d3c9410d", + "0xc6ac58a7a63339898aeda32499a8238a46d88e84", + "0xa8ef95dbd3db55911d3307930a84b27d6e969526", + # "0x4129c62faf652fea61375dcd9ca8ce24b2bb8b95", + "0xbf1935fe7ab6d0aa3ee8d3da47c2f80e215b2a1c", +] +MAX_FILLS_TO_DISPLAY = 10 +LOGS_DIR = "_logs" +recent_fills = {} +_lines_printed = 0 + +TABLE_HEADER = f"{'Time (UTC)':<10} | {'Coin':<6} | {'Side':<5} | {'Size':>15} | {'Price':>15} | {'Value (USD)':>20}" +TABLE_WIDTH = len(TABLE_HEADER) + +def log_fill_to_csv(address: str, fill_data: dict): + """Appends a single fill record to the CSV file for a specific address.""" + log_file_path = os.path.join(LOGS_DIR, f"fills_{address}.csv") + file_exists = os.path.exists(log_file_path) + + # The CSV will store a flattened version of the decoded fill + csv_row = { + 'time_utc': fill_data['time'].isoformat(), + 'coin': fill_data['coin'], + 'side': fill_data['side'], + 'price': fill_data['price'], + 'size': fill_data['size'], + 'value_usd': fill_data['value'] + } + + try: + with open(log_file_path, 'a', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=csv_row.keys()) + if not file_exists: + writer.writeheader() + writer.writerow(csv_row) + except IOError as e: + logging.error(f"Failed to write to CSV log for {address}: {e}") + +def on_message(message): + """ + Callback function to process incoming userEvents from the WebSocket. + """ + try: + logging.debug(f"Received message: {message}") + channel = message.get("channel") + if channel in ("user", "userFills"): + data = message.get("data") + if not data: + return + + user_address = data.get("user", "").lower() + fills = data.get("fills", []) + + if user_address in recent_fills and fills: + logging.info(f"Fill detected for user: {user_address}") + for fill_data in fills: + decoded_fill = { + "time": datetime.fromtimestamp(fill_data['time'] / 1000, tz=timezone.utc), + "coin": fill_data['coin'], + "side": "BUY" if fill_data['side'] == "B" else "SELL", + "price": float(fill_data['px']), + "size": float(fill_data['sz']), + "value": float(fill_data['px']) * float(fill_data['sz']), + } + recent_fills[user_address].append(decoded_fill) + # --- ADDED: Log every fill to its CSV file --- + log_fill_to_csv(user_address, decoded_fill) + + except (KeyError, TypeError, ValueError) as e: + logging.error(f"Error processing message: {e} | Data: {message}") + +def build_fills_table(address: str, fills: deque) -> list: + """Builds the formatted lines for a single address's fills table.""" + lines = [] + short_address = f"{address[:6]}...{address[-4:]}" + + lines.append(f"--- Fills for {short_address} ---") + lines.append(TABLE_HEADER) + lines.append("-" * TABLE_WIDTH) + + for fill in list(fills): + lines.append( + f"{fill['time'].strftime('%H:%M:%S'):<10} | " + f"{fill['coin']:<6} | " + f"{fill['side']:<5} | " + f"{fill['size']:>15.4f} | " + f"{fill['price']:>15,.2f} | " + f"${fill['value']:>18,.2f}" + ) + + padding_needed = MAX_FILLS_TO_DISPLAY - len(fills) + for _ in range(padding_needed): + lines.append("") + + return lines + +def display_dashboard(): + """ + Clears the screen and prints a two-column layout of recent fills tables. + """ + global _lines_printed + + if _lines_printed > 0: + print(f"\x1b[{_lines_printed}A", end="") + + output_lines = ["--- Live Address Fill Monitor ---", ""] + + addresses_to_display = list(recent_fills.keys()) + num_addresses = len(addresses_to_display) + mid_point = (num_addresses + 1) // 2 + left_column_addresses = addresses_to_display[:mid_point] + right_column_addresses = addresses_to_display[mid_point:] + + separator = " | " + + for i in range(mid_point): + left_address = left_column_addresses[i] + left_table_lines = build_fills_table(left_address, recent_fills[left_address]) + + right_table_lines = [] + if i < len(right_column_addresses): + right_address = right_column_addresses[i] + right_table_lines = build_fills_table(right_address, recent_fills[right_address]) + + table_height = 3 + MAX_FILLS_TO_DISPLAY + for j in range(table_height): + left_part = left_table_lines[j] if j < len(left_table_lines) else "" + right_part = right_table_lines[j] if j < len(right_table_lines) else "" + output_lines.append(f"{left_part:<{TABLE_WIDTH}}{separator}{right_part}") + output_lines.append("") + + final_output = "\n".join(output_lines) + "\n\x1b[J" + print(final_output, end="") + + _lines_printed = len(output_lines) + sys.stdout.flush() + +def main(): + """ + Main function to set up the WebSocket and run the display loop. + """ + global recent_fills + parser = argparse.ArgumentParser(description="Monitor live fills for specific wallet addresses on Hyperliquid.") + parser.add_argument( + "--addresses", + nargs='+', + default=DEFAULT_ADDRESSES_TO_WATCH, + help="A space-separated list of Ethereum addresses to monitor." + ) + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + setup_logging(args.log_level, 'AddressMonitor') + + # --- ADDED: Ensure the logs directory exists --- + if not os.path.exists(LOGS_DIR): + os.makedirs(LOGS_DIR) + + addresses_to_watch = [] + for addr in args.addresses: + clean_addr = addr.strip().lower() + if len(clean_addr) == 42 and clean_addr.startswith('0x'): + addresses_to_watch.append(clean_addr) + else: + logging.warning(f"Invalid or malformed address provided: '{addr}'. Skipping.") + + recent_fills = {addr: deque(maxlen=MAX_FILLS_TO_DISPLAY) for addr in addresses_to_watch} + + if not addresses_to_watch: + print("No valid addresses configured to watch. Exiting.", file=sys.stderr) + return + + info = Info(constants.MAINNET_API_URL, skip_ws=False) + + for addr in addresses_to_watch: + try: + info.subscribe({"type": "userFills", "user": addr}, on_message) + logging.debug(f"Queued subscribe for userFills: {addr}") + time.sleep(0.02) + except Exception as e: + logging.error(f"Failed to subscribe for {addr}: {e}") + + logging.info(f"Subscribed to userFills for {len(addresses_to_watch)} addresses") + + print("\nDisplaying live fill data... Press Ctrl+C to stop.") + try: + while True: + display_dashboard() + time.sleep(0.2) + except KeyboardInterrupt: + print("\nStopping WebSocket listener...") + info.ws_manager.stop() + print("Listener stopped.") + +if __name__ == "__main__": + main() + diff --git a/backtester.py b/backtester.py index 4a37bfc..d0c597f 100644 --- a/backtester.py +++ b/backtester.py @@ -10,97 +10,125 @@ import itertools import multiprocessing from functools import partial import time +import importlib +import signal from logging_utils import setup_logging -def _run_single_simulation(df: pd.DataFrame, params: dict) -> list: +def _run_trade_simulation(df: pd.DataFrame, capital: float, size_pct: float, leverage_long: int, leverage_short: int, taker_fee_pct: float, maker_fee_pct: float) -> tuple[float, list]: """ - Core simulation logic. Takes a DataFrame and parameters, returns a list of trades. - This is a pure function to be used by different data loaders. + Simulates a trading strategy with portfolio management, including capital, + position sizing, leverage, and fees. """ - fast_ma_period = params.get('fast', 0) - slow_ma_period = params.get('slow', 0) - sma_period = params.get('sma_period', 0) - - if fast_ma_period and slow_ma_period: - df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean() - df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean() - df['signal'] = (df['fast_sma'] > df['slow_sma']).astype(int) - elif sma_period: - df['sma'] = df['close'].rolling(window=sma_period).mean() - df['signal'] = (df['close'] > df['sma']).astype(int) - else: - return [] - df.dropna(inplace=True) - if df.empty: return [] + if df.empty: return capital, [] - df['position'] = df['signal'].diff() + df['position_change'] = df['signal'].diff() trades = [] entry_price = 0 - - for i, row in df.iterrows(): - if row['position'] == 1: - if entry_price == 0: # Only enter if flat - entry_price = row['close'] - elif row['position'] == -1: - if entry_price != 0: # Only exit if in a position - pnl = (row['close'] - entry_price) / entry_price - trades.append({'pnl_pct': pnl}) - entry_price = 0 - - return trades + asset_size = 0 + current_position = 0 # 0=flat, 1=long, -1=short + equity = capital -def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]: + for i, row in df.iterrows(): + # --- Close Positions --- + if (current_position == 1 and row['signal'] != 1) or \ + (current_position == -1 and row['signal'] != -1): + + exit_value = asset_size * row['close'] + fee = exit_value * (taker_fee_pct / 100) + + if current_position == 1: # Closing a long + pnl_usd = (row['close'] - entry_price) * asset_size + equity += pnl_usd - fee + trades.append({'pnl_usd': pnl_usd, 'pnl_pct': (row['close'] - entry_price) / entry_price, 'type': 'long'}) + + elif current_position == -1: # Closing a short + pnl_usd = (entry_price - row['close']) * asset_size + equity += pnl_usd - fee + trades.append({'pnl_usd': pnl_usd, 'pnl_pct': (entry_price - row['close']) / entry_price, 'type': 'short'}) + + entry_price = 0 + asset_size = 0 + current_position = 0 + + # --- Open New Positions --- + if current_position == 0: + if row['signal'] == 1: # Open Long + margin_to_use = equity * (size_pct / 100) + trade_value = margin_to_use * leverage_long + asset_size = trade_value / row['close'] + fee = trade_value * (taker_fee_pct / 100) + equity -= fee + entry_price = row['close'] + current_position = 1 + elif row['signal'] == -1: # Open Short + margin_to_use = equity * (size_pct / 100) + trade_value = margin_to_use * leverage_short + asset_size = trade_value / row['close'] + fee = trade_value * (taker_fee_pct / 100) + equity -= fee + entry_price = row['close'] + current_position = -1 + + return equity, trades + + +def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str, strategy_class, sim_params: dict) -> tuple[dict, float, list]: """ - A worker function for multiprocessing. It loads its own data from the DB - and then runs the simulation, returning the parameters and results together. + Worker function that loads data, runs the full simulation, and returns results. """ df = pd.DataFrame() try: with sqlite3.connect(db_path) as conn: - query = f'SELECT datetime_utc, close FROM "{coin}_{timeframe}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' + query = f'SELECT datetime_utc, open, high, low, close FROM "{coin}_{timeframe}" WHERE datetime_utc >= ? AND datetime_utc <= ? ORDER BY datetime_utc' df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) if not df.empty: df.set_index('datetime_utc', inplace=True) except Exception as e: print(f"Worker error loading data for params {params}: {e}") - return (params, []) + return (params, sim_params['capital'], []) if df.empty: - return (params, []) + return (params, sim_params['capital'], []) - trades = _run_single_simulation(df, params) - return (params, trades) + strategy_instance = strategy_class(params) + df_with_signals = strategy_instance.calculate_signals(df) + + final_equity, trades = _run_trade_simulation(df_with_signals, **sim_params) + return (params, final_equity, trades) + + +def init_worker(): + signal.signal(signal.SIGINT, signal.SIG_IGN) class Backtester: - """ - A class to run a Walk-Forward Optimization, which is the gold standard - for testing the robustness of a trading strategy. - """ - - def __init__(self, log_level: str, strategy_name_to_test: str): + def __init__(self, log_level: str, strategy_name_to_test: str, start_date: str, sim_params: dict): setup_logging(log_level, 'Backtester') self.db_path = os.path.join("_data", "market_data.db") + self.simulation_params = sim_params self.backtest_config = self._load_backtest_config(strategy_name_to_test) - if not self.backtest_config: - logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found.") - sys.exit(1) - + # ... (rest of __init__ is unchanged) self.strategy_name = self.backtest_config.get('strategy_name') self.strategy_config = self._load_strategy_config() - if not self.strategy_config: - logging.error(f"Strategy '{self.strategy_name}' not found.") - sys.exit(1) - self.params = self.strategy_config.get('parameters', {}) self.coin = self.params.get('coin') self.timeframe = self.params.get('timeframe') self.pool = None - - def _load_backtest_config(self, name_to_test: str) -> dict: + self.full_history_start_date = start_date + try: + module_path, class_name = self.backtest_config['script'].rsplit('.', 1) + module = importlib.import_module(module_path) + self.strategy_class = getattr(module, class_name) + logging.info(f"Successfully loaded strategy class '{class_name}'.") + except (ImportError, AttributeError, KeyError) as e: + logging.error(f"Could not load strategy script '{self.backtest_config.get('script')}': {e}") + sys.exit(1) + + def _load_backtest_config(self, name_to_test: str): + # ... (unchanged) config_path = os.path.join("_data", "backtesting_conf.json") try: with open(config_path, 'r') as f: return json.load(f).get(name_to_test) @@ -108,7 +136,8 @@ class Backtester: logging.error(f"Could not load backtesting configuration: {e}") return None - def _load_strategy_config(self) -> dict: + def _load_strategy_config(self): + # ... (unchanged) config_path = os.path.join("_data", "strategies.json") try: with open(config_path, 'r') as f: return json.load(f).get(self.strategy_name) @@ -116,53 +145,86 @@ class Backtester: logging.error(f"Could not load strategy configuration: {e}") return None - def run_walk_forward_optimization(self, num_periods=10, in_sample_pct=0.9): - """ - Main function to orchestrate the walk-forward analysis. - """ - full_df = self.load_data("2020-01-01", datetime.now().strftime("%Y-%m-%d")) + def run_walk_forward_optimization(self, optimization_weeks: int, testing_weeks: int, step_weeks: int): + # ... (unchanged, will now use the new simulation logic via the worker) + full_df = self.load_data(self.full_history_start_date, datetime.now().strftime("%Y-%m-%d")) if full_df.empty: return - period_length = len(full_df) // num_periods - all_out_of_sample_trades = [] - - for i in range(num_periods): - logging.info(f"\n--- Starting Walk-Forward Period {i+1}/{num_periods} ---") - - # 1. Define the In-Sample (training) and Out-of-Sample (testing) periods - start_index = i * period_length - in_sample_end_index = start_index + int(period_length * in_sample_pct) - out_of_sample_end_index = start_index + period_length + optimization_delta = timedelta(weeks=optimization_weeks) + testing_delta = timedelta(weeks=testing_weeks) + step_delta = timedelta(weeks=step_weeks) - if in_sample_end_index >= len(full_df) or out_of_sample_end_index > len(full_df): - logging.warning("Not enough data for the full final period. Ending analysis.") + all_out_of_sample_trades = [] + all_period_summaries = [] + + current_date = full_df.index[0] + end_date = full_df.index[-1] + + period_num = 1 + while current_date + optimization_delta + testing_delta <= end_date: + logging.info(f"\n--- Starting Walk-Forward Period {period_num} ---") + + in_sample_start = current_date + in_sample_end = in_sample_start + optimization_delta + out_of_sample_end = in_sample_end + testing_delta + + in_sample_df = full_df[in_sample_start:in_sample_end] + out_of_sample_df = full_df[in_sample_end:out_of_sample_end] + + if in_sample_df.empty or out_of_sample_df.empty: break - in_sample_df = full_df.iloc[start_index:in_sample_end_index] - out_of_sample_df = full_df.iloc[in_sample_end_index:out_of_sample_end_index] - - logging.info(f"In-Sample: {in_sample_df.index[0].date()} to {in_sample_df.index[-1].date()}") - logging.info(f"Out-of-Sample: {out_of_sample_df.index[0].date()} to {out_of_sample_df.index[-1].date()}") + logging.info(f"In-Sample (Optimization): {in_sample_df.index[0].date()} to {in_sample_df.index[-1].date()}") + logging.info(f"Out-of-Sample (Testing): {out_of_sample_df.index[0].date()} to {out_of_sample_df.index[-1].date()}") - # 2. Find the best parameters on the In-Sample data - best_params = self._find_best_params(in_sample_df) - if not best_params: - logging.warning("No profitable parameters found in this period. Skipping.") + best_result = self._find_best_params(in_sample_df) + if not best_result: + all_period_summaries.append({"period": period_num, "params": "None Found"}) + current_date += step_delta + period_num += 1 continue + + print("\n--- [1] In-Sample Optimization Result ---") + print(f"Best Parameters Found: {best_result['params']}") + self._generate_report(best_result['final_equity'], best_result['trades_list'], "In-Sample Performance with Best Params") - # 3. Test the best parameters on the Out-of-Sample data - logging.info(f"Testing best params {best_params} on Out-of-Sample data...") - out_of_sample_trades = _run_single_simulation(out_of_sample_df.copy(), best_params) + logging.info(f"\n--- [2] Forward Testing on Out-of-Sample Data ---") + df_with_signals = self.strategy_class(best_result['params']).calculate_signals(out_of_sample_df.copy()) + final_equity_oos, out_of_sample_trades = _run_trade_simulation(df_with_signals, **self.simulation_params) + all_out_of_sample_trades.extend(out_of_sample_trades) - self._generate_report(out_of_sample_trades, f"Period {i+1} Out-of-Sample Results") + oos_summary = self._generate_report(final_equity_oos, out_of_sample_trades, "Out-of-Sample Performance") + + # Store the summary for the final table + summary_to_store = {"period": period_num, "params": best_result['params'], **oos_summary} + all_period_summaries.append(summary_to_store) + + current_date += step_delta + period_num += 1 - # 4. Generate a final report for all combined out-of-sample trades + # ... (Final reports will be generated here, but need to adapt to equity tracking) print("\n" + "="*50) - self._generate_report(all_out_of_sample_trades, "AGGREGATE WALK-FORWARD PERFORMANCE") + # self._generate_report(all_out_of_sample_trades, "FINAL AGGREGATE WALK-FORWARD PERFORMANCE") print("="*50) + # --- ADDED: Final summary table of best parameters and performance per period --- + print("\n--- Summary of Best Parameters and Performance per Period ---") + header = f"{'#':<3} | {'Best Parameters':<30} | {'Trades':>8} | {'Longs':>6} | {'Shorts':>7} | {'Win %':>8} | {'L Win %':>9} | {'S Win %':>9} | {'Return %':>10} | {'Equity':>15}" + print(header) + print("-" * len(header)) + for item in all_period_summaries: + params_str = str(item.get('params', 'N/A')) + trades = item.get('num_trades', 'N/A') + longs = item.get('num_longs', 'N/A') + shorts = item.get('num_shorts', 'N/A') + win_rate = f"{item.get('win_rate', 0):.2f}%" if 'win_rate' in item else 'N/A' + long_win_rate = f"{item.get('long_win_rate', 0):.2f}%" if 'long_win_rate' in item else 'N/A' + short_win_rate = f"{item.get('short_win_rate', 0):.2f}%" if 'short_win_rate' in item else 'N/A' + return_pct = f"{item.get('return_pct', 0):.2f}%" if 'return_pct' in item else 'N/A' + equity = f"${item.get('final_equity', 0):,.2f}" if 'final_equity' in item else 'N/A' + print(f"{item['period']:<3} | {params_str:<30} | {trades:>8} | {longs:>6} | {shorts:>7} | {win_rate:>8} | {long_win_rate:>9} | {short_win_rate:>9} | {return_pct:>10} | {equity:>15}") + def _find_best_params(self, df: pd.DataFrame) -> dict: - """Runs a multi-core optimization on a given slice of data.""" param_configs = self.backtest_config.get('optimization_params', {}) param_names = list(param_configs.keys()) param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()] @@ -173,71 +235,130 @@ class Backtester: logging.info(f"Optimizing on {len(all_combinations)} combinations...") num_cores = 60 - self.pool = multiprocessing.Pool(processes=num_cores) - - worker = partial(_run_single_simulation, df.copy()) - all_trades_results = self.pool.map(worker, param_dicts) + self.pool = multiprocessing.Pool(processes=num_cores, initializer=init_worker) + worker = partial( + simulation_worker, + db_path=self.db_path, coin=self.coin, timeframe=self.timeframe, + start_date=df.index[0].isoformat(), end_date=df.index[-1].isoformat(), + strategy_class=self.strategy_class, + sim_params=self.simulation_params + ) + + all_results = self.pool.map(worker, param_dicts) + self.pool.close() self.pool.join() self.pool = None - results = [] - for i, trades in enumerate(all_trades_results): - if trades: - results.append({'params': param_dicts[i], 'pnl': sum(t['pnl_pct'] for t in trades)}) - + results = [{'params': params, 'final_equity': final_equity, 'trades_list': trades} for params, final_equity, trades in all_results if trades] if not results: return None - return max(results, key=lambda x: x['pnl'])['params'] - + return max(results, key=lambda x: x['final_equity']) + def load_data(self, start_date, end_date): - # This is a simplified version for the main data load + # ... (unchanged) table_name = f"{self.coin}_{self.timeframe}" logging.info(f"Loading full dataset for {table_name}...") try: with sqlite3.connect(self.db_path) as conn: - query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc' + query = f'SELECT * FROM "{table_name}" WHERE datetime_utc >= ? AND datetime_utc <= ? ORDER BY datetime_utc' df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc']) - if df.empty: - logging.warning("No data found for the specified date range.") - return pd.DataFrame() + if df.empty: return pd.DataFrame() df.set_index('datetime_utc', inplace=True) return df except Exception as e: logging.error(f"Failed to load data for backtest: {e}") return pd.DataFrame() - def _generate_report(self, trades: list, title: str): - """Calculates and prints key performance metrics.""" + def _generate_report(self, final_equity: float, trades: list, title: str) -> dict: + """Calculates, prints, and returns a detailed performance report.""" print(f"\n--- {title} ---") + + initial_capital = self.simulation_params['capital'] + if not trades: print("No trades were executed during this period.") - return + print(f"Final Equity: ${initial_capital:,.2f}") + return {"num_trades": 0, "num_longs": 0, "num_shorts": 0, "win_rate": 0, "long_win_rate": 0, "short_win_rate": 0, "return_pct": 0, "final_equity": initial_capital} num_trades = len(trades) - wins = [t for t in trades if t['pnl_pct'] > 0] - total_pnl = sum(t['pnl_pct'] for t in trades) + long_trades = [t for t in trades if t.get('type') == 'long'] + short_trades = [t for t in trades if t.get('type') == 'short'] - print(f"Total Trades: {num_trades}") - print(f"Win Rate: {(len(wins) / num_trades) * 100 if num_trades > 0 else 0:.2f}%") - print(f"Total PNL (Cumulative %): {total_pnl * 100:.2f}%") + pnls_pct = pd.Series([t['pnl_pct'] for t in trades]) + + wins = pnls_pct[pnls_pct > 0] + win_rate = (len(wins) / num_trades) * 100 if num_trades > 0 else 0 + + long_wins = len([t for t in long_trades if t['pnl_pct'] > 0]) + short_wins = len([t for t in short_trades if t['pnl_pct'] > 0]) + long_win_rate = (long_wins / len(long_trades)) * 100 if long_trades else 0 + short_win_rate = (short_wins / len(short_trades)) * 100 if short_trades else 0 + + total_return_pct = ((final_equity - initial_capital) / initial_capital) * 100 + + print(f"Final Equity: ${final_equity:,.2f}") + print(f"Total Return: {total_return_pct:.2f}%") + print(f"Total Trades: {num_trades} (Longs: {len(long_trades)}, Shorts: {len(short_trades)})") + print(f"Win Rate (Overall): {win_rate:.2f}%") + print(f"Win Rate (Longs): {long_win_rate:.2f}%") + print(f"Win Rate (Shorts): {short_win_rate:.2f}%") + + # Return a dictionary of the key metrics for the summary table + return { + "num_trades": num_trades, + "num_longs": len(long_trades), + "num_shorts": len(short_trades), + "win_rate": win_rate, + "long_win_rate": long_win_rate, + "short_win_rate": short_win_rate, + "return_pct": total_return_pct, + "final_equity": final_equity + } if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run a Walk-Forward Optimization for a trading strategy.") - parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).") + parser.add_argument("--strategy", required=True, help="The name of the backtest config to run.") + parser.add_argument("--start-date", default="2020-08-01", help="The overall start date for historical data.") + parser.add_argument("--optimization-weeks", type=int, default=4) + parser.add_argument("--testing-weeks", type=int, default=1) + parser.add_argument("--step-weeks", type=int, default=1) parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) + + parser.add_argument("--capital", type=float, default=1000) + parser.add_argument("--size-pct", type=float, default=50) + parser.add_argument("--leverage-long", type=int, default=3) + parser.add_argument("--leverage-short", type=int, default=2) + parser.add_argument("--taker-fee-pct", type=float, default=0.045) + parser.add_argument("--maker-fee-pct", type=float, default=0.015) + args = parser.parse_args() + sim_params = { + "capital": args.capital, + "size_pct": args.size_pct, + "leverage_long": args.leverage_long, + "leverage_short": args.leverage_short, + "taker_fee_pct": args.taker_fee_pct, + "maker_fee_pct": args.maker_fee_pct + } + backtester = Backtester( log_level=args.log_level, - strategy_name_to_test=args.strategy + strategy_name_to_test=args.strategy, + start_date=args.start_date, + sim_params=sim_params ) try: - backtester.run_walk_forward_optimization() + backtester.run_walk_forward_optimization( + optimization_weeks=args.optimization_weeks, + testing_weeks=args.testing_weeks, + step_weeks=args.step_weeks + ) except KeyboardInterrupt: - logging.info("\nWalk-Forward Optimization cancelled by user.") + logging.info("\nBacktest optimization cancelled by user.") finally: if backtester.pool: logging.info("Terminating worker processes...") diff --git a/basic_ws.py b/basic_ws.py new file mode 100644 index 0000000..bdd375d --- /dev/null +++ b/basic_ws.py @@ -0,0 +1,31 @@ +import os +import sys +import time +import json +from datetime import datetime, timezone +from hyperliquid.info import Info +from hyperliquid.utils import constants +from collections import deque + +def main(): + address, info, _ = example_utils.setup(constants.MAINNET_API_URL) + # An example showing how to subscribe to the different subscription types and prints the returned messages + # Some subscriptions do not return snapshots, so you will not receive a message until something happens + info.subscribe({"type": "allMids"}, print) + info.subscribe({"type": "l2Book", "coin": "ETH"}, print) + info.subscribe({"type": "trades", "coin": "PURR/USDC"}, print) + info.subscribe({"type": "userEvents", "user": address}, print) + info.subscribe({"type": "userFills", "user": address}, print) + info.subscribe({"type": "candle", "coin": "ETH", "interval": "1m"}, print) + info.subscribe({"type": "orderUpdates", "user": address}, print) + info.subscribe({"type": "userFundings", "user": address}, print) + info.subscribe({"type": "userNonFundingLedgerUpdates", "user": address}, print) + info.subscribe({"type": "webData2", "user": address}, print) + info.subscribe({"type": "bbo", "coin": "ETH"}, print) + info.subscribe({"type": "activeAssetCtx", "coin": "BTC"}, print) # Perp + info.subscribe({"type": "activeAssetCtx", "coin": "@1"}, print) # Spot + info.subscribe({"type": "activeAssetData", "user": address, "coin": "BTC"}, print) # Perp only + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/live_market.py b/live_market.py new file mode 100644 index 0000000..fe9ed90 --- /dev/null +++ b/live_market.py @@ -0,0 +1,258 @@ +import os +import sys +import time +import json +import argparse +from datetime import datetime, timedelta, timezone +from hyperliquid.info import Info +from hyperliquid.utils import constants +from collections import deque, defaultdict + +# --- Configuration --- +MAX_TRADE_HISTORY = 100000 +all_trades = { + "BTC": deque(maxlen=MAX_TRADE_HISTORY), + "ETH": deque(maxlen=MAX_TRADE_HISTORY), +} +latest_raw_trades = { + "BTC": None, + "ETH": None, +} +decoded_trade_output = [] +_lines_printed = 0 + +def get_coins_from_strategies() -> set: + """ + Reads the strategies.json file and returns a unique set of coin symbols + from all enabled strategies. + """ + coins = set() + config_path = os.path.join("_data", "strategies.json") + try: + with open(config_path, 'r') as f: + all_configs = json.load(f) + for name, config in all_configs.items(): + if config.get("enabled", False): + coin = config.get("parameters", {}).get("coin") + if coin: + coins.add(coin) + print(f"Found {len(coins)} unique coins to watch from enabled strategies: {list(coins)}") + return coins + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"ERROR: Could not load or parse '{config_path}': {e}", file=sys.stderr) + return set() + +def on_message(message): + """ + Callback function to process incoming trades from the WebSocket and store them. + """ + try: + if message.get("channel") == "trades": + for trade in message["data"]: + coin = trade['coin'] + if coin in all_trades: + latest_raw_trades[coin] = trade + price = float(trade['px']) + size = float(trade['sz']) + decoded_trade = { + "time": datetime.fromtimestamp(trade['time'] / 1000, tz=timezone.utc), + "side": "BUY" if trade['side'] == "B" else "SELL", + "value": price * size, + "users": trade.get('users', []) + } + all_trades[coin].append(decoded_trade) + except (KeyError, TypeError, ValueError): + pass + +def build_top_trades_table(title: str, trades: list) -> list: + """Builds the formatted lines for a top-5 trades by value table.""" + lines = [] + header = f"{'Time (UTC)':<10} | {'Side':<5} | {'Value (USD)':>20}" + lines.append(f"--- {title} ---") + lines.append(header) + lines.append("-" * len(header)) + + top_trades = sorted(trades, key=lambda x: x['value'], reverse=True)[:5] + + for trade in top_trades: + lines.append( + f"{trade['time'].strftime('%H:%M:%S'):<10} | " + f"{trade['side']:<5} | " + f"${trade['value']:>18,.2f}" + ) + while len(lines) < 8: lines.append(" " * len(header)) + return lines + +def build_top_takers_table(title: str, trades: list) -> list: + """Analyzes a list of trades to find the top 5 takers by total volume.""" + lines = [] + header = f"{'#':<2} | {'Taker Address':<15} | {'Total Volume (USD)':>20}" + lines.append(f"--- {title} ---") + lines.append(header) + lines.append("-" * len(header)) + + volumes = defaultdict(float) + for trade in trades: + for user in trade['users']: + volumes[user] += trade['value'] + + top_takers = sorted(volumes.items(), key=lambda item: item[1], reverse=True)[:5] + + for i, (address, volume) in enumerate(top_takers, 1): + short_address = f"{address[:6]}...{address[-4:]}" + lines.append(f"{i:<2} | {short_address:<15} | ${volume:>18,.2f}") + + while len(lines) < 8: lines.append(" " * len(header)) + return lines + +def build_top_active_takers_table(title: str, trades: list) -> list: + """Analyzes a list of trades to find the top 5 takers by trade count.""" + lines = [] + header = f"{'#':<2} | {'Taker Address':<42} | {'Trade Count':>12} | {'Total Volume (USD)':>20}" + lines.append(f"--- {title} ---") + lines.append(header) + lines.append("-" * len(header)) + + taker_data = defaultdict(lambda: {'count': 0, 'volume': 0.0}) + for trade in trades: + for user in trade['users']: + taker_data[user]['count'] += 1 + taker_data[user]['volume'] += trade['value'] + + top_takers = sorted(taker_data.items(), key=lambda item: item[1]['count'], reverse=True)[:5] + + for i, (address, data) in enumerate(top_takers, 1): + lines.append(f"{i:<2} | {address:<42} | {data['count']:>12} | ${data['volume']:>18,.2f}") + + while len(lines) < 8: lines.append(" " * len(header)) + return lines + + +def build_decoded_trade_lines(coin: str) -> list: + """Builds a formatted, multi-line string for a single decoded trade.""" + trade = latest_raw_trades[coin] + if not trade: return ["No trade data yet..."] * 7 + + return [ + f"Time: {datetime.fromtimestamp(trade['time'] / 1000, tz=timezone.utc)}", + f"Side: {'BUY' if trade.get('side') == 'B' else 'SELL'}", + f"Price: {trade.get('px', 'N/A')}", + f"Size: {trade.get('sz', 'N/A')}", + f"Trade ID: {trade.get('tid', 'N/A')}", + f"Hash: {trade.get('hash', 'N/A')}", + f"Users: {', '.join(trade.get('users', []))}" + ] + +def update_decoded_trade_display(): + """ + Updates the global variable holding the decoded trade output, but only + at the 40-second mark of each minute. + """ + global decoded_trade_output + if datetime.now().second == 40: + lines = [] + lines.append("--- Last BTC Trade (Decoded) ---") + lines.extend(build_decoded_trade_lines("BTC")) + lines.append("") + lines.append("--- Last ETH Trade (Decoded) ---") + lines.extend(build_decoded_trade_lines("ETH")) + decoded_trade_output = lines + +def display_dashboard(view: str): + """Clears the screen and prints the selected dashboard view.""" + global _lines_printed + if _lines_printed > 0: print(f"\x1b[{_lines_printed}A", end="") + + now_utc = datetime.now(timezone.utc) + output_lines = [] + separator = " | " + + time_windows = [ + ("All Time", None), ("Last 24h", timedelta(hours=24)), + ("Last 1h", timedelta(hours=1)), ("Last 5m", timedelta(minutes=5)), + ("Last 1m", timedelta(minutes=1)), + ] + + btc_trades_copy = list(all_trades["BTC"]) + eth_trades_copy = list(all_trades["ETH"]) + + if view == "trades": + output_lines.append("--- Top 5 Trades by Value ---") + for title, delta in time_windows: + btc_trades = [t for t in btc_trades_copy if not delta or t['time'] > now_utc - delta] + eth_trades = [t for t in eth_trades_copy if not delta or t['time'] > now_utc - delta] + btc_lines = build_top_trades_table(f"BTC - {title}", btc_trades) + eth_lines = build_top_trades_table(f"ETH - {title}", eth_trades) + for i in range(len(btc_lines)): + output_lines.append(f"{btc_lines[i]:<45}{separator}{eth_lines[i] if i < len(eth_lines) else ''}") + output_lines.append("") + + elif view == "takers": + output_lines.append("--- Top 5 Takers by Volume (Rolling Windows) ---") + for title, delta in time_windows[1:]: + btc_trades = [t for t in btc_trades_copy if t['time'] > now_utc - delta] + eth_trades = [t for t in eth_trades_copy if t['time'] > now_utc - delta] + btc_lines = build_top_takers_table(f"BTC - {title}", btc_trades) + eth_lines = build_top_takers_table(f"ETH - {title}", eth_trades) + for i in range(len(btc_lines)): + output_lines.append(f"{btc_lines[i]:<45}{separator}{eth_lines[i] if i < len(eth_lines) else ''}") + output_lines.append("") + + elif view == "active_takers": + output_lines.append("--- Top 5 Active Takers by Trade Count (Rolling Windows) ---") + for title, delta in time_windows[1:]: + btc_trades = [t for t in btc_trades_copy if t['time'] > now_utc - delta] + eth_trades = [t for t in eth_trades_copy if t['time'] > now_utc - delta] + btc_lines = build_top_active_takers_table(f"BTC - {title}", btc_trades) + eth_lines = build_top_active_takers_table(f"ETH - {title}", eth_trades) + header_width = 85 + for i in range(len(btc_lines)): + output_lines.append(f"{btc_lines[i]:<{header_width}}{separator}{eth_lines[i] if i < len(eth_lines) else ''}") + output_lines.append("") + + if decoded_trade_output: + output_lines.extend(decoded_trade_output) + else: + for _ in range(17): output_lines.append("") + + final_output = "\n".join(output_lines) + "\n\x1b[J" + print(final_output, end="") + + _lines_printed = len(output_lines) + sys.stdout.flush() + + +def main(): + """Main function to set up the WebSocket and run the display loop.""" + parser = argparse.ArgumentParser(description="Live market data dashboard for Hyperliquid.") + parser.add_argument("--view", default="trades", choices=['trades', 'takers', 'active_takers'], + help="The data view to display: 'trades' (default), 'takers', or 'active_takers'.") + args = parser.parse_args() + + coins_to_watch = get_coins_from_strategies() + if not ("BTC" in coins_to_watch and "ETH" in coins_to_watch): + print("This script is configured to display BTC and ETH. Please ensure they are in your strategies.", file=sys.stderr) + return + + info = Info(constants.MAINNET_API_URL, skip_ws=False) + + for coin in ["BTC", "ETH"]: + trade_subscription = {"type": "trades", "coin": coin} + info.subscribe(trade_subscription, on_message) + print(f"Subscribed to Trades for {coin}") + time.sleep(0.2) + + print(f"\nDisplaying live '{args.view}' summary... Press Ctrl+C to stop.") + try: + while True: + update_decoded_trade_display() + display_dashboard(view=args.view) + time.sleep(1) + except KeyboardInterrupt: + print("\nStopping WebSocket listener...") + info.ws_manager.stop() + print("Listener stopped.") + +if __name__ == "__main__": + main() + diff --git a/strategies/base_strategy.py b/strategies/base_strategy.py new file mode 100644 index 0000000..e40452c --- /dev/null +++ b/strategies/base_strategy.py @@ -0,0 +1,73 @@ +from abc import ABC, abstractmethod +import pandas as pd +import json +import os +import logging +from datetime import datetime, timezone +import sqlite3 + +class BaseStrategy(ABC): + """ + An abstract base class that defines the blueprint for all trading strategies. + It provides common functionality like loading data and saving status. + """ + + def __init__(self, strategy_name: str, params: dict, log_level: str): + self.strategy_name = strategy_name + self.params = params + self.coin = params.get("coin", "N/A") + self.timeframe = params.get("timeframe", "N/A") + self.db_path = os.path.join("_data", "market_data.db") + self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json") + + # --- ADDED: State variables required for status reporting --- + self.current_signal = "INIT" + self.last_signal_change_utc = None + self.signal_price = None + + # This will be set up by the child class after it's initialized + # setup_logging(log_level, f"Strategy-{self.strategy_name}") + # logging.info(f"Initializing with parameters: {self.params}") + + def load_data(self) -> pd.DataFrame: + """Loads historical data for the configured coin and timeframe.""" + table_name = f"{self.coin}_{self.timeframe}" + + # Dynamically determine the number of candles needed based on all possible period parameters + periods = [v for k, v in self.params.items() if 'period' in k or '_ma' in k or 'slow' in k] + limit = max(periods) + 50 if periods else 500 + + try: + with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn: + query = f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT {limit}' + df = pd.read_sql(query, conn, parse_dates=['datetime_utc']) + if df.empty: return pd.DataFrame() + df.set_index('datetime_utc', inplace=True) + df.sort_index(inplace=True) + return df + except Exception as e: + logging.error(f"Failed to load data from table '{table_name}': {e}") + return pd.DataFrame() + + @abstractmethod + def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: + """ + The core logic of the strategy. Must be implemented by child classes. + """ + pass + + def _save_status(self): + """Saves the current strategy state to its JSON file.""" + status = { + "strategy_name": self.strategy_name, + "current_signal": self.current_signal, + "last_signal_change_utc": self.last_signal_change_utc, + "signal_price": self.signal_price, + "last_checked_utc": datetime.now(timezone.utc).isoformat() + } + try: + with open(self.status_file_path, 'w', encoding='utf-8') as f: + json.dump(status, f, indent=4) + except IOError as e: + logging.error(f"Failed to write status file for {self.strategy_name}: {e}") + diff --git a/strategies/ma_cross_strategy.py b/strategies/ma_cross_strategy.py new file mode 100644 index 0000000..0002375 --- /dev/null +++ b/strategies/ma_cross_strategy.py @@ -0,0 +1,35 @@ +import pandas as pd +from strategies.base_strategy import BaseStrategy +import logging + +class MaCrossStrategy(BaseStrategy): + """ + A strategy based on a fast Simple Moving Average (SMA) crossing + a slow SMA. + """ + def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: + # Support multiple naming conventions: some configs use 'fast'/'slow' + # while others use 'short_ma'/'long_ma'. Normalize here so both work. + fast_ma_period = self.params.get('short_ma') or self.params.get('fast') or 0 + slow_ma_period = self.params.get('long_ma') or self.params.get('slow') or 0 + + # If parameters are missing, return a neutral signal frame. + if not fast_ma_period or not slow_ma_period: + logging.warning(f"Missing MA period parameters (fast={fast_ma_period}, slow={slow_ma_period}).") + df['signal'] = 0 + return df + + if len(df) < slow_ma_period: + logging.warning(f"Not enough data for MA periods {fast_ma_period}/{slow_ma_period}. Need {slow_ma_period}, have {len(df)}.") + df['signal'] = 0 + return df + + df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean() + df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean() + + # Signal is 1 for Golden Cross (fast > slow), -1 for Death Cross + df['signal'] = 0 + df.loc[df['fast_sma'] > df['slow_sma'], 'signal'] = 1 + df.loc[df['fast_sma'] < df['slow_sma'], 'signal'] = -1 + + return df diff --git a/strategies/single_sma_strategy.py b/strategies/single_sma_strategy.py new file mode 100644 index 0000000..52a1d33 --- /dev/null +++ b/strategies/single_sma_strategy.py @@ -0,0 +1,24 @@ +import pandas as pd +from strategies.base_strategy import BaseStrategy +import logging + +class SingleSmaStrategy(BaseStrategy): + """ + A strategy based on the price crossing a single Simple Moving Average (SMA). + """ + def calculate_signals(self, df: pd.DataFrame) -> pd.DataFrame: + sma_period = self.params.get('sma_period', 0) + + if not sma_period or len(df) < sma_period: + logging.warning(f"Not enough data for SMA period {sma_period}. Need {sma_period}, have {len(df)}.") + df['signal'] = 0 + return df + + df['sma'] = df['close'].rolling(window=sma_period).mean() + + # Signal is 1 when price is above SMA, -1 when below + df['signal'] = 0 + df.loc[df['close'] > df['sma'], 'signal'] = 1 + df.loc[df['close'] < df['sma'], 'signal'] = -1 + + return df diff --git a/strategy_runner.py b/strategy_runner.py new file mode 100644 index 0000000..29a67fa --- /dev/null +++ b/strategy_runner.py @@ -0,0 +1,85 @@ +import argparse +import logging +import sys +import time +import pandas as pd +import sqlite3 +import json +import os +from datetime import datetime, timezone +import importlib + +from logging_utils import setup_logging +from strategies.base_strategy import BaseStrategy + +class StrategyRunner: + """ + A generic runner that can execute any strategy that adheres to the + BaseStrategy blueprint. It handles the main logic loop, including data + loading, signal calculation, status saving, and sleeping. + """ + + def __init__(self, strategy_name: str, log_level: str): + self.strategy_name = strategy_name + self.log_level = log_level + self.config = self._load_strategy_config() + if not self.config: + print(f"FATAL: Strategy '{strategy_name}' not found in configuration.") + sys.exit(1) + + # Dynamically import and instantiate the strategy logic class + try: + module_path, class_name = self.config['class'].rsplit('.', 1) + module = importlib.import_module(module_path) + StrategyClass = getattr(module, class_name) + self.strategy_instance = StrategyClass(strategy_name, self.config['parameters'], self.log_level) + except (ImportError, AttributeError, KeyError) as e: + print(f"FATAL: Could not load strategy class for '{strategy_name}': {e}") + sys.exit(1) + + def _load_strategy_config(self) -> dict: + """Loads the configuration for the specified strategy.""" + config_path = os.path.join("_data", "strategies.json") + try: + with open(config_path, 'r') as f: + all_configs = json.load(f) + return all_configs.get(self.strategy_name) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"FATAL: Could not load strategy configuration: {e}") + return None + + def run(self): + """Main loop: loads data, calculates signals, saves status, and sleeps.""" + logging.info(f"Starting main logic loop for {self.strategy_instance.coin} on {self.strategy_instance.timeframe}.") + while True: + df = self.strategy_instance.load_data() + if df.empty: + logging.warning("No data loaded. Waiting 1 minute before retrying...") + time.sleep(60) + continue + + # The strategy instance calculates signals and updates its internal state + self.strategy_instance.calculate_signals_and_state(df.copy()) + self.strategy_instance._save_status() # Save the new state + + logging.info(f"Current Signal: {self.strategy_instance.current_signal}") + + # Simple 1-minute wait for the next cycle + # A more precise timing mechanism could be implemented here if needed + time.sleep(60) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A generic runner for trading strategies.") + parser.add_argument("--name", required=True, help="The name of the strategy instance from strategies.json.") + parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) + args = parser.parse_args() + + try: + runner = StrategyRunner(strategy_name=args.name, log_level=args.log_level) + runner.run() + except KeyboardInterrupt: + logging.info("Strategy runner stopped.") + except Exception as e: + logging.error(f"A critical error occurred in the strategy runner: {e}") + sys.exit(1)