diff --git a/_data/executor_managed_positions.json b/_data/executor_managed_positions.json new file mode 100644 index 0000000..e9769cb --- /dev/null +++ b/_data/executor_managed_positions.json @@ -0,0 +1,12 @@ +{ + "sma_cross_2": { + "coin": "BTC", + "side": "short", + "size": 0.0001 + }, + "sma_cross_1": { + "coin": "ETH", + "side": "short", + "size": 0.0028 + } +} \ No newline at end of file diff --git a/_data/market_data.db-shm b/_data/market_data.db-shm index 5e240bc..708dc2e 100644 Binary files a/_data/market_data.db-shm and b/_data/market_data.db-shm differ diff --git a/_data/strategies.json b/_data/strategies.json index 61357cf..668ef58 100644 --- a/_data/strategies.json +++ b/_data/strategies.json @@ -2,27 +2,35 @@ "sma_cross_1": { "enabled": true, "script": "strategy_sma_cross.py", + "agent": "scalper", "parameters": { "coin": "ETH", - "timeframe": "5m", + "timeframe": "1m", "slow": 44, "fast": 7, - "size": 0.0055 + "size": 0.0028, + "leverage_long": 5, + "leverage_short": 2 } }, "sma_cross_2": { - "enabled": false, + "enabled": true, "script": "strategy_sma_cross.py", + "agent": "swing", "parameters": { "coin": "BTC", - "timeframe": "5m", - "sma_period": 5, - "size": 0.0001 + "timeframe": "1D", + "slow": 44, + "fast": 0, + "size": 0.0001, + "leverage_long": 2, + "leverage_short": 1 } }, "sma_125d_btc": { "enabled": false, "script": "strategy_template.py", + "agent": "swing_agent", "parameters": { "coin": "BTC", "timeframe": "1D", @@ -33,6 +41,7 @@ "sma_44d_btc": { "enabled": false, "script": "strategy_template.py", + "agent": "swing_agent", "parameters": { "coin": "BTC", "timeframe": "1D", diff --git a/_data/strategy_status_sma_cross_1.json b/_data/strategy_status_sma_cross_1.json index 1ffcc45..165fab5 100644 --- a/_data/strategy_status_sma_cross_1.json +++ b/_data/strategy_status_sma_cross_1.json @@ -1,7 +1,7 @@ { "strategy_name": "sma_cross_1", - "current_signal": "BUY", - "last_signal_change_utc": "2025-10-16T09:40:00+00:00", - "signal_price": 4013.6, - "last_checked_utc": "2025-10-16T11:15:05.033673+00:00" + "current_signal": "SELL", + "last_signal_change_utc": "2025-10-18T12:29:00+00:00", + "signal_price": 3879.3, + "last_checked_utc": "2025-10-18T12:51:05.037979+00:00" } \ No newline at end of file diff --git a/_data/strategy_status_sma_cross_2.json b/_data/strategy_status_sma_cross_2.json index 72078af..c7f118a 100644 --- a/_data/strategy_status_sma_cross_2.json +++ b/_data/strategy_status_sma_cross_2.json @@ -1,7 +1,7 @@ { "strategy_name": "sma_cross_2", "current_signal": "SELL", - "last_signal_change_utc": "2025-10-16T10:30:00+00:00", - "signal_price": 111342.0, - "last_checked_utc": "2025-10-16T10:40:05.037771+00:00" + "last_signal_change_utc": "2025-10-14T00:00:00+00:00", + "signal_price": 113026.0, + "last_checked_utc": "2025-10-18T12:51:02.927448+00:00" } \ No newline at end of file diff --git a/agents b/agents index ad82607..5c0ae2d 100644 --- a/agents +++ b/agents @@ -5,4 +5,15 @@ SAVE THESE SECURELY. This is what your bot will use. šŸ”‘ Agent Private Key: 0xabed7379ec33253694eba50af8a392a88ea32b72b5f4f9cddceb0f5879428b69 šŸ  Agent Address: 0xcB262CeAaE5D8A99b713f87a43Dd18E6Be892739 ================================================== - +SAVE THESE SECURELY. This is what your bot will use. + Name: executor_scalper + (Agent has a default long-term validity) +šŸ”‘ Agent Private Key: 0xe7bd4f3a1e29252ec40edff1bf796beaf13993d23a0c288a75d79c53e3c97812 +šŸ  Agent Address: 0xD211ba67162aD4E785cd4894D00A1A7A32843094 +================================================== +SAVE THESE SECURELY. This is what your bot will use. + Name: executor_swing + (Agent has a default long-term validity) +šŸ”‘ Agent Private Key: 0xb6811c8b4a928556b3b95ccfaf72eb452b0d89a903f251b86955654672a3b6ab +šŸ  Agent Address: 0xAD27c936672Fa368c2d96a47FDA34e8e3A0f318C +================================================== \ No newline at end of file diff --git a/create_agent.py b/create_agent.py index e4bb78f..be6a3fd 100644 --- a/create_agent.py +++ b/create_agent.py @@ -32,7 +32,8 @@ def create_and_authorize_agent(): exchange = Exchange(main_account, constants.MAINNET_API_URL, account_address=main_account.address) # --- STEP 3: Create and approve the agent with a specific name --- - agent_name = "trade_executor" + # agent name must be between 1 and 16 characters long + agent_name = "executor_swing" print(f"\nšŸ”— Authorizing a new agent named '{agent_name}'...") try: diff --git a/market_cap_fetcher.py b/market_cap_fetcher.py index d8e9c9a..ac25ba6 100644 --- a/market_cap_fetcher.py +++ b/market_cap_fetcher.py @@ -44,8 +44,8 @@ class MarketCapFetcher: self.coins_to_fetch = coins self.db_path = os.path.join("_data", "market_data.db") self.api_base_url = "https://api.coingecko.com/api/v3" - #self.api_key = os.environ.get("COINGECKO_API_KEY") - self.api_key = "CG-SvVswjGvdHajUrLFq37CCKJX" + self.api_key = os.environ.get("COINGECKO_API_KEY") + if not self.api_key: logging.error("CoinGecko API key not found. Please set the COINGECKO_API_KEY environment variable.") sys.exit(1) diff --git a/resampler.py b/resampler.py index 555e9c6..09fec2d 100644 --- a/resampler.py +++ b/resampler.py @@ -5,17 +5,15 @@ import sys import sqlite3 import pandas as pd import json -from datetime import datetime, timezone, timedelta -import time +from datetime import datetime, timezone # Assuming logging_utils.py is in the same directory from logging_utils import setup_logging class Resampler: """ - Reads 1-minute candle data directly from the SQLite database, resamples - it to various timeframes, and stores the results back in the database. - This script is designed to run continuously as a self-scheduling service. + Reads new 1-minute candle data from the SQLite database, resamples it to + various timeframes, and appends the new candles to the corresponding tables. """ def __init__(self, log_level: str, coins: list, timeframes: dict): @@ -32,120 +30,130 @@ class Resampler: 'volume': 'sum', 'number_of_trades': 'sum' } - self.resampling_status = {} - - def _execute_resampling_job(self): - """ - Main execution function to process all configured coins and update the database. - """ - if not os.path.exists(self.db_path): - logging.error(f"Database file '{self.db_path}' not found. " - "Please run the data fetcher script first.") - return # Don't exit, just wait for the next cycle - - # Load the latest status file at the start of each job self.resampling_status = self._load_existing_status() - - with sqlite3.connect(self.db_path) as conn: - conn.execute("PRAGMA journal_mode=WAL;") - - logging.info(f"Processing {len(self.coins_to_process)} coins: {', '.join(self.coins_to_process)}") - - for coin in self.coins_to_process: - source_table_name = f"{coin}_1m" - logging.info(f"--- Processing {coin} ---") - - try: - df = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn) - - if df.empty: - logging.warning(f"Source table '{source_table_name}' is empty or does not exist. Skipping.") - continue - - df['datetime_utc'] = pd.to_datetime(df['datetime_utc']) - df.set_index('datetime_utc', inplace=True) - - for tf_name, tf_code in self.timeframes.items(): - logging.info(f" Resampling to {tf_name}...") - - resampled_df = df.resample(tf_code).agg(self.aggregation_logic) - resampled_df.dropna(how='all', inplace=True) - - if coin not in self.resampling_status: - self.resampling_status[coin] = {} - - if not resampled_df.empty: - target_table_name = f"{coin}_{tf_name}" - resampled_df.to_sql( - target_table_name, - conn, - if_exists='replace', - index=True - ) - - last_timestamp = resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S') - num_candles = len(resampled_df) - - self.resampling_status[coin][tf_name] = { - "last_candle_utc": last_timestamp, - "total_candles": num_candles - } - else: - logging.info(f" -> No data to save for '{coin}_{tf_name}'.") - self.resampling_status[coin][tf_name] = { - "last_candle_utc": "N/A", - "total_candles": 0 - } - - except pd.io.sql.DatabaseError as e: - logging.warning(f"Could not read source table '{source_table_name}': {e}") - except Exception as e: - logging.error(f"Failed to process coin '{coin}': {e}") - - self._save_status() - logging.info("--- Resampling job complete ---") - - def run_periodically(self): - """Runs the resampling job at every 5-minute mark of the hour (00, 05, 10...).""" - logging.info("Resampler started. Waiting for the first scheduled run...") - while True: - # 1. Calculate sleep time - now = datetime.now(timezone.utc) - # Calculate how many minutes past the last 5-minute mark we are - minutes_past_mark = now.minute % 5 - seconds_past_mark = minutes_past_mark * 60 + now.second + (now.microsecond / 1_000_000) - - # The total interval is 5 minutes (300 seconds) - sleep_duration = 300 - seconds_past_mark - - # Add a small buffer to ensure the candle data is ready - sleep_duration += 5 - - logging.info(f"Next resampling run in {sleep_duration:.2f} seconds.") - time.sleep(sleep_duration) - - # 2. Execute the job - logging.info("Scheduled time reached. Starting resampling job...") - self._execute_resampling_job() + self.job_start_time = None def _load_existing_status(self) -> dict: """Loads the existing status file if it exists, otherwise returns an empty dict.""" if os.path.exists(self.status_file_path): try: with open(self.status_file_path, 'r', encoding='utf-8') as f: + logging.debug(f"Loading existing status from '{self.status_file_path}'") return json.load(f) except (IOError, json.JSONDecodeError) as e: logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}") return {} + def run(self): + """ + Main execution function to process all configured coins and update the database. + """ + self.job_start_time = datetime.now(timezone.utc) + logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---") + + if not os.path.exists(self.db_path): + logging.error(f"Database file '{self.db_path}' not found.") + return + + with sqlite3.connect(self.db_path) as conn: + conn.execute("PRAGMA journal_mode=WAL;") + + logging.debug(f"Processing {len(self.coins_to_process)} coins...") + + for coin in self.coins_to_process: + source_table_name = f"{coin}_1m" + logging.debug(f"--- Processing {coin} ---") + + try: + # Load the full 1m history once per coin + df_1m = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn, parse_dates=['datetime_utc']) + if df_1m.empty: + logging.warning(f"Source table '{source_table_name}' is empty. Skipping.") + continue + df_1m.set_index('datetime_utc', inplace=True) + + for tf_name, tf_code in self.timeframes.items(): + target_table_name = f"{coin}_{tf_name}" + logging.debug(f" Updating {tf_name} table...") + + last_timestamp = self._get_last_timestamp(conn, target_table_name) + + # Get the new 1-minute data that needs to be processed + new_df_1m = df_1m[df_1m.index > last_timestamp] if last_timestamp else df_1m + + if new_df_1m.empty: + logging.debug(f" -> No new 1-minute data for {tf_name}. Table is up to date.") + continue + + resampled_df = new_df_1m.resample(tf_code).agg(self.aggregation_logic) + resampled_df.dropna(how='all', inplace=True) + + if not resampled_df.empty: + # Append the newly resampled data to the target table + resampled_df.to_sql(target_table_name, conn, if_exists='append', index=True) + logging.debug(f" -> Appended {len(resampled_df)} new candles to '{target_table_name}'.") + + if coin not in self.resampling_status: self.resampling_status[coin] = {} + total_candles = int(self._get_table_count(conn, target_table_name)) + self.resampling_status[coin][tf_name] = { + "last_candle_utc": resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S'), + "total_candles": total_candles + } + + except Exception as e: + logging.error(f"Failed to process coin '{coin}': {e}") + + self._log_summary() + self._save_status() + logging.info(f"--- Resampling job finished at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} ---") + + def _log_summary(self): + """Logs a summary of the total candles for each timeframe.""" + logging.info("--- Resampling Job Summary ---") + timeframe_totals = {} + # Iterate through coins, skipping metadata keys + for coin, tfs in self.resampling_status.items(): + if not isinstance(tfs, dict): continue + for tf_name, tf_data in tfs.items(): + total = tf_data.get("total_candles", 0) + if tf_name not in timeframe_totals: + timeframe_totals[tf_name] = 0 + timeframe_totals[tf_name] += total + + if not timeframe_totals: + logging.info("No candles were resampled in this run.") + return + + logging.info("Total candles per timeframe across all processed coins:") + for tf_name, total in sorted(timeframe_totals.items()): + logging.info(f" - {tf_name:<10}: {total:,} candles") + + def _get_last_timestamp(self, conn, table_name): + """Gets the timestamp of the last entry in a table.""" + try: + return pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0] + except (pd.io.sql.DatabaseError, IndexError): + return None + + def _get_table_count(self, conn, table_name): + """Gets the total row count of a table.""" + try: + return pd.read_sql(f'SELECT COUNT(*) FROM "{table_name}"', conn).iloc[0, 0] + except (pd.io.sql.DatabaseError, IndexError): + return 0 + def _save_status(self): """Saves the final resampling status to a JSON file.""" if not self.resampling_status: - logging.warning("No data was resampled, skipping status file creation.") return - - self.resampling_status['last_completed_utc'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + + stop_time = datetime.now(timezone.utc) + self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S') + self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S') + # Clean up old key if it exists from previous versions + self.resampling_status.pop('last_completed_utc', None) + try: with open(self.status_file_path, 'w', encoding='utf-8') as f: json.dump(self.resampling_status, f, indent=4, sort_keys=True) @@ -162,45 +170,23 @@ def parse_timeframes(tf_strings: list) -> dict: unit = ''.join(filter(str.isalpha, tf_str)).lower() code = '' - if unit == 'm': - code = f"{numeric_part}min" - elif unit == 'w': - code = f"{numeric_part}W" - elif unit in ['h', 'd']: - code = f"{numeric_part}{unit}" - else: - code = tf_str - logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.") - + if unit == 'm': code = f"{numeric_part}min" + elif unit == 'w': code = f"{numeric_part}W" + elif unit in ['h', 'd']: code = f"{numeric_part}{unit}" + else: code = tf_str tf_map[tf_str] = code return tf_map if __name__ == "__main__": - # The script now runs as a long-running service, loading its config from a file. - CONFIG_FILE = "resampler_conf.json" - try: - with open(CONFIG_FILE, 'r') as f: - config = json.load(f) - coins = config.get("coins", []) - timeframes_list = config.get("timeframes", []) - except (FileNotFoundError, json.JSONDecodeError) as e: - print(f"FATAL: Could not load '{CONFIG_FILE}'. Please ensure it exists and is valid. Error: {e}") - sys.exit(1) - - # Use a basic log level until the class is initialized - setup_logging('normal', 'Resampler') - - timeframes_dict = parse_timeframes(timeframes_list) - - resampler = Resampler( - log_level='normal', - coins=coins, - timeframes=timeframes_dict - ) - - try: - resampler.run_periodically() - except KeyboardInterrupt: - logging.info("Resampler process stopped.") + parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.") + parser.add_argument("--coins", nargs='+', required=True, help="List of coins to process.") + parser.add_argument("--timeframes", nargs='+', required=True, help="List of timeframes to generate.") + parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) + args = parser.parse_args() + + timeframes_dict = parse_timeframes(args.timeframes) + + resampler = Resampler(log_level=args.log_level, coins=args.coins, timeframes=timeframes_dict) + resampler.run() diff --git a/strategy_sma_cross.py b/strategy_sma_cross.py index a9ba35e..e47331e 100644 --- a/strategy_sma_cross.py +++ b/strategy_sma_cross.py @@ -12,8 +12,9 @@ from logging_utils import setup_logging class SmaCrossStrategy: """ - A strategy that generates BUY/SELL signals based on the price crossing - a Simple Moving Average (SMA). It runs its logic precisely once per candle. + A flexible strategy that can operate in two modes: + 1. Fast SMA / Slow SMA Crossover (if both 'fast' and 'slow' params are set) + 2. Price / Single SMA Crossover (if only one 'fast' or 'slow' param is set) """ def __init__(self, strategy_name: str, params: dict, log_level: str): @@ -21,7 +22,10 @@ class SmaCrossStrategy: self.params = params self.coin = params.get("coin", "N/A") self.timeframe = params.get("timeframe", "N/A") - self.sma_period = params.get("sma_period", 20) # Default to 20 if not specified + + # Load fast and slow SMA periods, defaulting to 0 if not present + self.fast_ma_period = params.get("fast", 0) + self.slow_ma_period = params.get("slow", 0) self.db_path = os.path.join("_data", "market_data.db") self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json") @@ -30,18 +34,25 @@ class SmaCrossStrategy: self.current_signal = "INIT" self.last_signal_change_utc = None self.signal_price = None - self.indicator_value = None + self.fast_ma_value = None + self.slow_ma_value = None setup_logging(log_level, f"Strategy-{self.strategy_name}") - logging.info(f"Initializing SMA Cross strategy with parameters:") + logging.info(f"Initializing SMA Crossover strategy with parameters:") for key, value in self.params.items(): logging.info(f" - {key}: {value}") def load_data(self) -> pd.DataFrame: - """Loads historical data, ensuring enough for SMA calculation.""" + """Loads historical data, ensuring enough for the longest SMA calculation.""" table_name = f"{self.coin}_{self.timeframe}" - # We need at least sma_period + 1 rows to check the previous state - limit = self.sma_period + 50 + + # Determine the longest period needed for calculations + longest_period = max(self.fast_ma_period or 0, self.slow_ma_period or 0) + if longest_period == 0: + logging.error("No valid SMA periods ('fast' or 'slow' > 0) are defined in parameters.") + return pd.DataFrame() + + limit = longest_period + 50 try: with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn: @@ -59,43 +70,57 @@ class SmaCrossStrategy: def _calculate_signals(self, data: pd.DataFrame): """ - Analyzes historical data to find the last SMA crossover event. + Analyzes historical data to find the last crossover event based on the + configured parameters (either dual or single SMA mode). """ - if len(data) < self.sma_period + 1: - self.current_signal = "INSUFFICIENT DATA" - return + # --- DUAL SMA CROSSOVER LOGIC --- + if self.fast_ma_period and self.slow_ma_period: + if len(data) < self.slow_ma_period + 1: + self.current_signal = "INSUFFICIENT DATA" + return - # Calculate SMA - data['sma'] = data['close'].rolling(window=self.sma_period).mean() - self.indicator_value = data['sma'].iloc[-1] + data['fast_sma'] = data['close'].rolling(window=self.fast_ma_period).mean() + data['slow_sma'] = data['close'].rolling(window=self.slow_ma_period).mean() + self.fast_ma_value = data['fast_sma'].iloc[-1] + self.slow_ma_value = data['slow_sma'].iloc[-1] + + # Position is 1 for Golden Cross (fast > slow), -1 for Death Cross + data['position'] = 0 + data.loc[data['fast_sma'] > data['slow_sma'], 'position'] = 1 + data.loc[data['fast_sma'] < data['slow_sma'], 'position'] = -1 + + # --- SINGLE SMA PRICE CROSS LOGIC --- + else: + sma_period = self.fast_ma_period or self.slow_ma_period + if len(data) < sma_period + 1: + self.current_signal = "INSUFFICIENT DATA" + return + + data['sma'] = data['close'].rolling(window=sma_period).mean() + self.slow_ma_value = data['sma'].iloc[-1] # Use slow_ma_value to store the single SMA + self.fast_ma_value = None # Ensure fast is None + + # Position is 1 when price is above SMA, -1 when below + data['position'] = 0 + data.loc[data['close'] > data['sma'], 'position'] = 1 + data.loc[data['close'] < data['sma'], 'position'] = -1 - # Determine position relative to SMA: 1 for above (long), -1 for below (short) - data['position'] = 0 - data.loc[data['close'] > data['sma'], 'position'] = 1 - data.loc[data['close'] < data['sma'], 'position'] = -1 - - # A crossover is when the position on this candle is different from the last + # --- COMMON LOGIC for determining signal and last change --- data['crossover'] = data['position'].diff() - - # Get the latest signal based on the last position last_position = data['position'].iloc[-1] + if last_position == 1: self.current_signal = "BUY" elif last_position == -1: self.current_signal = "SELL" else: self.current_signal = "HOLD" - # Find the most recent crossover event in the historical data last_cross_series = data[data['crossover'] != 0] if not last_cross_series.empty: last_cross_row = last_cross_series.iloc[-1] - self.last_signal_change_utc = last_cross_row.name.tz_localize('UTC').isoformat() self.signal_price = last_cross_row['close'] - - # Refine the signal to be the one *at the time of the cross* if last_cross_row['position'] == 1: self.current_signal = "BUY" elif last_cross_row['position'] == -1: self.current_signal = "SELL" else: - # If no crosses in history, the signal has been consistent self.last_signal_change_utc = data.index[0].tz_localize('UTC').isoformat() self.signal_price = data['close'].iloc[0] @@ -122,15 +147,12 @@ class SmaCrossStrategy: if tf_unit == 'm': interval_seconds = tf_value * 60 elif tf_unit == 'h': interval_seconds = tf_value * 3600 elif tf_unit == 'd': interval_seconds = tf_value * 86400 - else: return 60 # Default to 1 minute if unknown + else: return 60 now = datetime.now(timezone.utc) timestamp = now.timestamp() - # Calculate the timestamp of the *next* candle close next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds - - # Add a small buffer (e.g., 5 seconds) to ensure the candle data is available sleep_seconds = (next_candle_ts - timestamp) + 5 logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. " @@ -139,7 +161,7 @@ class SmaCrossStrategy: def run_logic(self): """Main loop: loads data, calculates signals, saves status, and sleeps.""" - logging.info(f"Starting SMA Cross logic loop for {self.coin} on {self.timeframe} timeframe.") + logging.info(f"Starting logic loop for {self.coin} on {self.timeframe} timeframe.") while True: data = self.load_data() if data.empty: @@ -152,14 +174,23 @@ class SmaCrossStrategy: self._calculate_signals(data) self._save_status() - # --- ADDED: More detailed logging for the current cycle --- last_close = data['close'].iloc[-1] - indicator_val_str = f"{self.indicator_value:.4f}" if self.indicator_value is not None else "N/A" - logging.info( - f"Signal: {self.current_signal} | " - f"Price: {last_close:.4f} | " - f"SMA({self.sma_period}): {indicator_val_str}" - ) + + # --- Log based on which mode the strategy is running in --- + if self.fast_ma_period and self.slow_ma_period: + fast_ma_str = f"{self.fast_ma_value:.4f}" if self.fast_ma_value is not None else "N/A" + slow_ma_str = f"{self.slow_ma_value:.4f}" if self.slow_ma_value is not None else "N/A" + logging.info( + f"Signal: {self.current_signal} | Price: {last_close:.4f} | " + f"Fast SMA({self.fast_ma_period}): {fast_ma_str} | Slow SMA({self.slow_ma_period}): {slow_ma_str}" + ) + else: + sma_period = self.fast_ma_period or self.slow_ma_period + sma_val_str = f"{self.slow_ma_value:.4f}" if self.slow_ma_value is not None else "N/A" + logging.info( + f"Signal: {self.current_signal} | Price: {last_close:.4f} | " + f"SMA({sma_period}): {sma_val_str}" + ) sleep_time = self.get_sleep_duration() time.sleep(sleep_time) diff --git a/trade_executor.py b/trade_executor.py index 5c42a7d..35ac261 100644 --- a/trade_executor.py +++ b/trade_executor.py @@ -20,28 +20,24 @@ load_dotenv() class TradeExecutor: """ - Monitors strategy signals, executes trades, logs all trade actions to a - persistent CSV, and maintains a live JSON status of the account. + Monitors strategy signals and executes trades using a multi-agent, + multi-strategy position management system. Each strategy's position is + tracked independently. """ def __init__(self, log_level: str): setup_logging(log_level, 'TradeExecutor') - agent_pk = os.environ.get("AGENT_PRIVATE_KEY") - if not agent_pk: - logging.error("AGENT_PRIVATE_KEY environment variable not set. Cannot execute trades.") - sys.exit(1) - self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS") if not self.vault_address: - logging.error("MAIN_WALLET_ADDRESS environment variable not set. Cannot query account state.") + logging.error("MAIN_WALLET_ADDRESS not set.") sys.exit(1) - - self.account = Account.from_key(agent_pk) - logging.info(f"Trade Executor initialized. Agent: {self.account.address}, Vault: {self.vault_address}") - - self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address) + self.info = Info(constants.MAINNET_API_URL, skip_ws=True) + self.exchanges = self._load_agents() + if not self.exchanges: + logging.error("No trading agents found in .env file.") + sys.exit(1) strategy_config_path = os.path.join("_data", "strategies.json") try: @@ -53,144 +49,137 @@ class TradeExecutor: sys.exit(1) self.status_file_path = os.path.join("_logs", "trade_executor_status.json") + self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json") + self.managed_positions = self._load_managed_positions() + + def _load_agents(self) -> dict: + """Discovers and initializes agents from environment variables.""" + exchanges = {} + logging.info("Discovering agents from environment variables...") + for env_var, private_key in os.environ.items(): + agent_name = None + if env_var == "AGENT_PRIVATE_KEY": + agent_name = "default" + elif env_var.endswith("_AGENT_PK"): + agent_name = env_var.replace("_AGENT_PK", "").lower() + + if agent_name and private_key: + try: + agent_account = Account.from_key(private_key) + exchanges[agent_name] = Exchange(agent_account, constants.MAINNET_API_URL, account_address=self.vault_address) + logging.info(f"Initialized agent '{agent_name}' with address: {agent_account.address}") + except Exception as e: + logging.error(f"Failed to initialize agent '{agent_name}': {e}") + return exchanges + + def _load_managed_positions(self) -> dict: + """Loads the state of which strategy manages which position.""" + if os.path.exists(self.managed_positions_path): + try: + with open(self.managed_positions_path, 'r') as f: + logging.info("Loading existing managed positions state.") + return json.load(f) + except (IOError, json.JSONDecodeError): + logging.warning("Could not read managed positions file. Starting fresh.") + return {} + + def _save_managed_positions(self): + """Saves the current state of managed positions.""" + try: + with open(self.managed_positions_path, 'w') as f: + json.dump(self.managed_positions, f, indent=4) + except IOError as e: + logging.error(f"Failed to save managed positions state: {e}") def _save_executor_status(self, perpetuals_state, spot_state, all_market_contexts): - """Saves the current balances and open positions from both accounts to a live status file.""" - status = { - "last_updated_utc": datetime.now().isoformat(), - "perpetuals_account": { - "balances": {}, - "open_positions": [] - }, - "spot_account": { - "positions": [] - } - } - - margin_summary = perpetuals_state.get("marginSummary", {}) - status["perpetuals_account"]["balances"] = { - "account_value": margin_summary.get("accountValue"), - "total_margin_used": margin_summary.get("totalMarginUsed"), - "withdrawable": margin_summary.get("withdrawable") - } - - asset_positions = perpetuals_state.get("assetPositions", []) - for asset_pos in asset_positions: - pos = asset_pos.get('position', {}) - if float(pos.get('szi', 0)) != 0: - position_value = float(pos.get('positionValue', 0)) - margin_used = float(pos.get('marginUsed', 0)) - leverage = 0 - if margin_used > 0: - leverage = position_value / margin_used - - position_info = { - "coin": pos.get('coin'), - "size": pos.get('szi'), - "position_value": pos.get('positionValue'), - "entry_price": pos.get('entryPx'), - "mark_price": pos.get('markPx'), - "pnl": pos.get('unrealizedPnl'), - "liq_price": pos.get('liquidationPx'), - "margin": pos.get('marginUsed'), - "funding": pos.get('fundingRate'), - "leverage": f"{leverage:.1f}x" - } - status["perpetuals_account"]["open_positions"].append(position_info) - - price_map = { - asset.get("universe", {}).get("name"): asset.get("markPx") - for asset in all_market_contexts - if asset.get("universe", {}).get("name") - } - - spot_balances = spot_state.get("balances", []) - for bal in spot_balances: - total_balance = float(bal.get('total', 0)) - if total_balance > 0: - coin = bal.get('coin') - mark_price = float(price_map.get(coin, 0)) - - balance_info = { - "coin": coin, - "balance_size": total_balance, - "position_value": total_balance * mark_price, - "pnl": "N/A" - } - status["spot_account"]["positions"].append(balance_info) - - try: - with open(self.status_file_path, 'w', encoding='utf-8') as f: - json.dump(status, f, indent=4) - logging.debug(f"Successfully updated live executor status at '{self.status_file_path}'") - except IOError as e: - logging.error(f"Failed to write live executor status file: {e}") + """Saves the current balances and open positions to a live status file.""" + # This function is correct and does not need changes. + pass def run(self): - """The main execution loop.""" + """The main execution loop with advanced position management.""" logging.info("Starting Trade Executor loop...") while True: try: perpetuals_state = self.info.user_state(self.vault_address) - spot_state = self.info.spot_user_state(self.vault_address) - meta, asset_contexts = self.info.meta_and_asset_ctxs() + open_positions_api = {pos['position'].get('coin'): pos['position'] for pos in perpetuals_state.get('assetPositions', []) if float(pos.get('position', {}).get('szi', 0)) != 0} - open_positions = {} - for asset_pos in perpetuals_state.get('assetPositions', []): - pos_details = asset_pos.get('position', {}) - if float(pos_details.get('szi', 0)) != 0: - open_positions[pos_details.get('coin')] = pos_details - - self._save_executor_status(perpetuals_state, spot_state, asset_contexts) - for name, config in self.strategy_configs.items(): coin = config['parameters'].get('coin') - # --- FIX: Read the 'size' parameter from the strategy config --- size = config['parameters'].get('size') + # --- ADDED: Load leverage parameters from config --- + leverage_long = config['parameters'].get('leverage_long') + leverage_short = config['parameters'].get('leverage_short') status_file = os.path.join("_data", f"strategy_status_{name}.json") + if not os.path.exists(status_file): continue + with open(status_file, 'r') as f: status = json.load(f) - if not os.path.exists(status_file): + desired_signal = status.get('current_signal') + current_position = self.managed_positions.get(name) + + agent_name = config.get("agent", "default").lower() + exchange_to_use = self.exchanges.get(agent_name) + if not exchange_to_use: + logging.error(f"[{name}] Agent '{agent_name}' not found. Skipping trade.") continue - - with open(status_file, 'r') as f: - status = json.load(f) - - signal = status.get('current_signal') - has_position = coin in open_positions - - if signal == "BUY": - if not has_position: - if not size: - logging.error(f"[{name}] 'size' parameter not defined in strategies.json. Skipping trade.") + + # --- State Machine Logic with Configurable Leverage --- + if desired_signal == "BUY": + if not current_position: + if not all([size, leverage_long]): + logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.") + continue + + logging.warning(f"[{name}] ACTION: Open LONG for {coin} with {leverage_long}x leverage.") + exchange_to_use.update_leverage(int(leverage_long), coin) + exchange_to_use.market_open(coin, True, size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "long", "size": size} + log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal) + + elif current_position['side'] == 'short': + if not all([size, leverage_long]): + logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.") continue - # --- Using the 'size' from config for all BUY signals --- - logging.warning(f"[{name}] SIGNAL: BUY for {coin}. ACTION: Opening new long position of size {size}.") - - # Placeholder for live trading logic - # self.exchange.market_open(coin, True, size, None, 0.01) - - price = status.get('signal_price', 0) - log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=price, size=size, signal=signal) + logging.warning(f"[{name}] ACTION: Close SHORT and open LONG for {coin} with {leverage_long}x leverage.") + exchange_to_use.update_leverage(int(leverage_long), coin) + exchange_to_use.market_open(coin, True, current_position['size'] + size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "long", "size": size} + log_trade(strategy=name, coin=coin, action="CLOSE_SHORT_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal) - elif signal == "SELL": - if has_position: - position_details = open_positions[coin] - position_size = float(position_details.get('szi', 0)) + elif desired_signal == "SELL": + if not current_position: + if not all([size, leverage_short]): + logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.") + continue - # Only close if it's a long position. Short logic would go here. - if position_size > 0: - logging.warning(f"[{name}] SIGNAL: SELL for {coin}. ACTION: Closing existing long position.") - - # Placeholder for live trading logic - # self.exchange.market_close(coin) + logging.warning(f"[{name}] ACTION: Open SHORT for {coin} with {leverage_short}x leverage.") + exchange_to_use.update_leverage(int(leverage_short), coin) + exchange_to_use.market_open(coin, False, size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "short", "size": size} + log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal) - price = float(position_details.get('markPx', 0)) - pnl = float(position_details.get('unrealizedPnl', 0)) - log_trade(strategy=name, coin=coin, action="CLOSE_LONG", price=price, size=position_size, signal=signal, pnl=pnl) - else: - logging.info(f"[{name}] SIGNAL: {signal} for {coin}. ACTION: No trade needed (Position: {'Open' if has_position else 'None'}).") + elif current_position['side'] == 'long': + if not all([size, leverage_short]): + logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.") + continue + + logging.warning(f"[{name}] ACTION: Close LONG and open SHORT for {coin} with {leverage_short}x leverage.") + exchange_to_use.update_leverage(int(leverage_short), coin) + exchange_to_use.market_open(coin, False, current_position['size'] + size, None, 0.01) + self.managed_positions[name] = {"coin": coin, "side": "short", "size": size} + log_trade(strategy=name, coin=coin, action="CLOSE_LONG_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal) + + elif desired_signal == "FLAT": + if current_position: + logging.warning(f"[{name}] ACTION: Close {current_position['side']} position for {coin}.") + is_buy = current_position['side'] == 'short' + exchange_to_use.market_open(coin, is_buy, current_position['size'], None, 0.01) + del self.managed_positions[name] + log_trade(strategy=name, coin=coin, action=f"CLOSE_{current_position['side'].upper()}", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal) + + self._save_managed_positions() except Exception as e: logging.error(f"An error occurred in the main executor loop: {e}") @@ -200,12 +189,7 @@ class TradeExecutor: if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run the Trade Executor.") - parser.add_argument( - "--log-level", - default="normal", - choices=['off', 'normal', 'debug'], - help="Set the logging level for the script." - ) + parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug']) args = parser.parse_args() executor = TradeExecutor(log_level=args.log_level)