From 25df8b8ba9d169bd2ef9f215967328a399fa9435 Mon Sep 17 00:00:00 2001
From: DiTus
Date: Thu, 16 Oct 2025 13:18:39 +0200
Subject: [PATCH] trade_executor, agent creator

---
 __pycache__/trade_log.cpython-313.pyc   | Bin 0 -> 2574 bytes
 _data/market_cap_data.json              |   6 +-
 _data/strategies.json                   |  39 ++---
 _data/strategy_status_sma_125d_btc.json |   2 +-
 _data/strategy_status_sma_125d_eth.json |   2 +-
 _data/strategy_status_sma_44d_btc.json  |   2 +-
 _data/strategy_status_sma_5m_eth.json   |   8 +-
 _data/strategy_status_sma_cross_1.json  |   6 +-
 _data/strategy_status_sma_cross_2.json  |   8 +-
 agents                                  |  11 +-
 create_agent.py                         |  69 ++++++++
 main_app.py                             | 139 ++++++++++-----
 resampler.py                            | 111 ++++++------
 trade_executor.py                       | 216 ++++++++++++++++++++++++
 trade_log.py                            |  55 ++++++
 15 files changed, 541 insertions(+), 133 deletions(-)
 create mode 100644 __pycache__/trade_log.cpython-313.pyc
 create mode 100644 create_agent.py
 create mode 100644 trade_executor.py
 create mode 100644 trade_log.py

diff --git a/__pycache__/trade_log.cpython-313.pyc b/__pycache__/trade_log.cpython-313.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..0827178c6472ec9c94d54d5193290dac2664acdb
GIT binary patch
literal 2574
[base85 blob omitted: 2574 bytes of compiled CPython 3.13 bytecode, corrupted in extraction]

diff --git a/main_app.py b/main_app.py
[index line and the opening of the first hunk were lost where the binary blob was truncated]
+        left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |")
         left_table_lines.append("-" * left_table_width)
         for i, coin in enumerate(self.watched_coins, 1):
             price = self.prices.get(coin, "Loading...")
@@ -241,12 +256,11 @@ class MainApp:
             left_table_lines.append(f"{i:<2} | {coin:^6} | {price:>10} | {formatted_mc:>15} |")
         left_table_lines.append("-" * left_table_width)
 
-        # --- Build Right Table (Strategy Status) ---
         right_table_lines = []
-        right_table_width = 148
+        right_table_width = 154
         right_table_lines.append("--- Strategy Status ---")
         right_table_lines.append("-" * right_table_width)
-        right_table_lines.append(f"{'#':<2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':<8} | {'Signal Price':>12} | {'Last Change (Local)':>22} | {'TF':^5} | {'Parameters':<45} |")
+        right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} | {'Parameters':<45} |")
         right_table_lines.append("-" * right_table_width)
         for i, (name, status) in enumerate(self.strategy_statuses.items(), 1):
             signal = status.get('current_signal', 'N/A')
@@ -255,7 +269,6 @@ class MainApp:
             last_change = status.get('last_signal_change_utc')
             last_change_display = 'Never'
             if last_change:
-                # Convert UTC timestamp from file to local time for display
                 dt_utc = datetime.fromisoformat(last_change.replace('Z', '+00:00')).replace(tzinfo=timezone.utc)
                 dt_local = dt_utc.astimezone(None)
                 last_change_display = dt_local.strftime('%Y-%m-%d %H:%M')
@@ -263,14 +276,14 @@ class MainApp:
             config_params = self.strategy_configs.get(name, {}).get('parameters', {})
             coin = config_params.get('coin', 'N/A')
             timeframe = config_params.get('timeframe', 'N/A')
+            size = config_params.get('size', 'N/A')
 
-            other_params = {k: v for k, v in config_params.items() if k not in ['coin', 'timeframe']}
+            other_params = {k: v for k, v in config_params.items() if k not in ['coin', 'timeframe', 'size']}
             params_str = ", ".join([f"{k}={v}" for k, v in other_params.items()])
 
-            right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:<8} | {price_display:>12} | {last_change_display:>22} | {timeframe:^5} | {params_str:<45} |")
+            right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} | {params_str:<45} |")
 
         right_table_lines.append("-" * right_table_width)
 
-        # --- Combine Tables Side-by-Side ---
         output_lines = []
         max_rows = max(len(left_table_lines), len(right_table_lines))
         separator = "    "
@@ -280,8 +293,43 @@ class MainApp:
             right_part = indent + right_table_lines[i] if i < len(right_table_lines) else ""
             output_lines.append(f"{left_part}{separator}{right_part}")
 
-        # --- Add Bottom Sections ---
         output_lines.append(f"\nDB Status: Last update -> {self.last_db_update_info}")
+
+        output_lines.append("\n--- Open Positions ---")
+        pos_table_width = 100
+        output_lines.append("-" * pos_table_width)
+        output_lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |")
+        output_lines.append("-" * pos_table_width)
+
+        perps_positions = 
self.open_positions.get('perpetuals_account', {}).get('open_positions', []) + spot_positions = self.open_positions.get('spot_account', {}).get('positions', []) + + if not perps_positions and not spot_positions: + output_lines.append("No open positions found.") + else: + for pos in perps_positions: + # --- FIX: Safely handle potentially None values before formatting --- + try: + pnl = float(pos.get('pnl', 0.0)) + pnl_str = f"${pnl:,.2f}" + except (ValueError, TypeError): + pnl_str = "Error" + + coin = pos.get('coin') or '-' + size = pos.get('size') or '-' + entry_price = pos.get('entry_price') or '-' + mark_price = pos.get('mark_price') or '-' + leverage = pos.get('leverage') or '-' + + output_lines.append(f"{'Perps':<10} | {coin:<6} | {size:>15} | {entry_price:>12} | {mark_price:>12} | {pnl_str:>15} | {leverage:>10} |") + + for pos in spot_positions: + pnl = pos.get('pnl', 'N/A') + coin = pos.get('coin') or '-' + balance_size = pos.get('balance_size') or '-' + output_lines.append(f"{'Spot':<10} | {coin:<6} | {balance_size:>15} | {'-':>12} | {'-':>12} | {pnl:>15} | {'-':>10} |") + output_lines.append("-" * pos_table_width) + output_lines.append("\n--- Background Processes ---") for name, status in self.process_status.items(): output_lines.append(f"{name:<25}: {status}") @@ -297,6 +345,7 @@ class MainApp: self.read_market_caps() self.get_overall_db_status() self.read_strategy_statuses() + self.read_executor_status() self.check_process_status() self.display_dashboard() time.sleep(2) @@ -318,25 +367,37 @@ if __name__ == "__main__": processes = {} strategy_configs = {} - processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True) - processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) - processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True) - processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) - try: with open(STRATEGY_CONFIG_FILE, 'r') as f: strategy_configs = json.load(f) - for name, config in strategy_configs.items(): - if config.get("enabled", False): - if not os.path.exists(config['script']): - logging.error(f"Strategy script '{config['script']}' for strategy '{name}' not found. Skipping.") - continue - proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) - processes[f"Strategy: {name}"] = proc except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}") + sys.exit(1) + + required_timeframes = set() + for name, config in strategy_configs.items(): + if config.get("enabled", False): + tf = config.get("parameters", {}).get("timeframe") + if tf: + required_timeframes.add(tf) + + if not required_timeframes: + logging.warning("No timeframes required by any enabled strategy. 
Resampler will not run effectively.") + + + processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True) + processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) + processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True) + processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) + + for name, config in strategy_configs.items(): + if config.get("enabled", False): + if not os.path.exists(config['script']): + logging.error(f"Strategy script '{config['script']}' for strategy '{name}' not found. Skipping.") + continue + proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) + processes[f"Strategy: {name}"] = proc - # Launch all processes for name, proc in processes.items(): logging.info(f"Starting process '{name}'...") proc.start() diff --git a/resampler.py b/resampler.py index ee36a58..555e9c6 100644 --- a/resampler.py +++ b/resampler.py @@ -5,7 +5,8 @@ import sys import sqlite3 import pandas as pd import json -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta +import time # Assuming logging_utils.py is in the same directory from logging_utils import setup_logging @@ -14,6 +15,7 @@ class Resampler: """ Reads 1-minute candle data directly from the SQLite database, resamples it to various timeframes, and stores the results back in the database. + This script is designed to run continuously as a self-scheduling service. """ def __init__(self, log_level: str, coins: list, timeframes: dict): @@ -30,27 +32,19 @@ class Resampler: 'volume': 'sum', 'number_of_trades': 'sum' } - self.resampling_status = self._load_existing_status() + self.resampling_status = {} - def _load_existing_status(self) -> dict: - """Loads the existing status file if it exists, otherwise returns an empty dict.""" - if os.path.exists(self.status_file_path): - try: - with open(self.status_file_path, 'r', encoding='utf-8') as f: - logging.info(f"Loading existing status from '{self.status_file_path}'") - return json.load(f) - except (IOError, json.JSONDecodeError) as e: - logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}") - return {} - - def run(self): + def _execute_resampling_job(self): """ Main execution function to process all configured coins and update the database. """ if not os.path.exists(self.db_path): logging.error(f"Database file '{self.db_path}' not found. " "Please run the data fetcher script first.") - sys.exit(1) + return # Don't exit, just wait for the next cycle + + # Load the latest status file at the start of each job + self.resampling_status = self._load_existing_status() with sqlite3.connect(self.db_path) as conn: conn.execute("PRAGMA journal_mode=WAL;") @@ -109,7 +103,40 @@ class Resampler: logging.error(f"Failed to process coin '{coin}': {e}") self._save_status() - logging.info("--- Resampling process complete ---") + logging.info("--- Resampling job complete ---") + + def run_periodically(self): + """Runs the resampling job at every 5-minute mark of the hour (00, 05, 10...).""" + logging.info("Resampler started. Waiting for the first scheduled run...") + while True: + # 1. 
Calculate sleep time + now = datetime.now(timezone.utc) + # Calculate how many minutes past the last 5-minute mark we are + minutes_past_mark = now.minute % 5 + seconds_past_mark = minutes_past_mark * 60 + now.second + (now.microsecond / 1_000_000) + + # The total interval is 5 minutes (300 seconds) + sleep_duration = 300 - seconds_past_mark + + # Add a small buffer to ensure the candle data is ready + sleep_duration += 5 + + logging.info(f"Next resampling run in {sleep_duration:.2f} seconds.") + time.sleep(sleep_duration) + + # 2. Execute the job + logging.info("Scheduled time reached. Starting resampling job...") + self._execute_resampling_job() + + def _load_existing_status(self) -> dict: + """Loads the existing status file if it exists, otherwise returns an empty dict.""" + if os.path.exists(self.status_file_path): + try: + with open(self.status_file_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (IOError, json.JSONDecodeError) as e: + logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}") + return {} def _save_status(self): """Saves the final resampling status to a JSON file.""" @@ -138,7 +165,6 @@ def parse_timeframes(tf_strings: list) -> dict: if unit == 'm': code = f"{numeric_part}min" elif unit == 'w': - # --- FIX: Use uppercase 'W' for weeks to avoid deprecation warning --- code = f"{numeric_part}W" elif unit in ['h', 'd']: code = f"{numeric_part}{unit}" @@ -151,39 +177,30 @@ def parse_timeframes(tf_strings: list) -> dict: if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.") - parser.add_argument( - "--coins", - nargs='+', - default=["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"], - help="List of coins to process." - ) - parser.add_argument( - "--timeframes", - nargs='+', - default=['4m', '5m', '15m', '30m', '37m', '148m', '4h', '12h', '1d', '1w'], - help="List of timeframes to generate (e.g., 5m 1h 1d)." - ) - parser.add_argument( - "--timeframe", - dest="timeframes", - nargs='+', - help=argparse.SUPPRESS - ) - parser.add_argument( - "--log-level", - default="normal", - choices=['off', 'normal', 'debug'], - help="Set the logging level for the script." - ) - args = parser.parse_args() + # The script now runs as a long-running service, loading its config from a file. + CONFIG_FILE = "resampler_conf.json" + try: + with open(CONFIG_FILE, 'r') as f: + config = json.load(f) + coins = config.get("coins", []) + timeframes_list = config.get("timeframes", []) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"FATAL: Could not load '{CONFIG_FILE}'. Please ensure it exists and is valid. 
Error: {e}") + sys.exit(1) + + # Use a basic log level until the class is initialized + setup_logging('normal', 'Resampler') - timeframes_dict = parse_timeframes(args.timeframes) + timeframes_dict = parse_timeframes(timeframes_list) resampler = Resampler( - log_level=args.log_level, - coins=args.coins, + log_level='normal', + coins=coins, timeframes=timeframes_dict ) - resampler.run() + + try: + resampler.run_periodically() + except KeyboardInterrupt: + logging.info("Resampler process stopped.") diff --git a/trade_executor.py b/trade_executor.py new file mode 100644 index 0000000..5c42a7d --- /dev/null +++ b/trade_executor.py @@ -0,0 +1,216 @@ +import argparse +import logging +import os +import sys +import json +import time +from datetime import datetime + +from eth_account import Account +from hyperliquid.exchange import Exchange +from hyperliquid.info import Info +from hyperliquid.utils import constants +from dotenv import load_dotenv + +from logging_utils import setup_logging +from trade_log import log_trade + +# Load environment variables from a .env file +load_dotenv() + +class TradeExecutor: + """ + Monitors strategy signals, executes trades, logs all trade actions to a + persistent CSV, and maintains a live JSON status of the account. + """ + + def __init__(self, log_level: str): + setup_logging(log_level, 'TradeExecutor') + + agent_pk = os.environ.get("AGENT_PRIVATE_KEY") + if not agent_pk: + logging.error("AGENT_PRIVATE_KEY environment variable not set. Cannot execute trades.") + sys.exit(1) + + self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS") + if not self.vault_address: + logging.error("MAIN_WALLET_ADDRESS environment variable not set. Cannot query account state.") + sys.exit(1) + + self.account = Account.from_key(agent_pk) + logging.info(f"Trade Executor initialized. 
Agent: {self.account.address}, Vault: {self.vault_address}")
+
+        self.exchange = Exchange(self.account, constants.MAINNET_API_URL, account_address=self.vault_address)
+        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
+
+        strategy_config_path = os.path.join("_data", "strategies.json")
+        try:
+            with open(strategy_config_path, 'r') as f:
+                self.strategy_configs = {name: config for name, config in json.load(f).items() if config.get("enabled")}
+            logging.info(f"Loaded {len(self.strategy_configs)} enabled strategies.")
+        except (FileNotFoundError, json.JSONDecodeError) as e:
+            logging.error(f"Could not load strategies from '{strategy_config_path}': {e}")
+            sys.exit(1)
+
+        self.status_file_path = os.path.join("_logs", "trade_executor_status.json")
+
+    def _save_executor_status(self, perpetuals_state, spot_state, all_market_contexts):
+        """Saves the current balances and open positions from both accounts to a live status file."""
+        status = {
+            "last_updated_utc": datetime.now(timezone.utc).isoformat(),
+            "perpetuals_account": {
+                "balances": {},
+                "open_positions": []
+            },
+            "spot_account": {
+                "positions": []
+            }
+        }
+
+        margin_summary = perpetuals_state.get("marginSummary", {})
+        status["perpetuals_account"]["balances"] = {
+            "account_value": margin_summary.get("accountValue"),
+            "total_margin_used": margin_summary.get("totalMarginUsed"),
+            "withdrawable": margin_summary.get("withdrawable")
+        }
+
+        asset_positions = perpetuals_state.get("assetPositions", [])
+        for asset_pos in asset_positions:
+            pos = asset_pos.get('position', {})
+            if float(pos.get('szi', 0)) != 0:
+                position_value = float(pos.get('positionValue', 0))
+                margin_used = float(pos.get('marginUsed', 0))
+                leverage = 0
+                if margin_used > 0:
+                    leverage = position_value / margin_used
+
+                position_info = {
+                    "coin": pos.get('coin'),
+                    "size": pos.get('szi'),
+                    "position_value": pos.get('positionValue'),
+                    "entry_price": pos.get('entryPx'),
+                    "mark_price": pos.get('markPx'),
+                    "pnl": pos.get('unrealizedPnl'),
+                    "liq_price": pos.get('liquidationPx'),
+                    "margin": pos.get('marginUsed'),
+                    "funding": pos.get('fundingRate'),
+                    "leverage": f"{leverage:.1f}x"
+                }
+                status["perpetuals_account"]["open_positions"].append(position_info)
+
+        price_map = {
+            asset.get("universe", {}).get("name"): asset.get("markPx")
+            for asset in all_market_contexts
+            if asset.get("universe", {}).get("name")
+        }
+
+        spot_balances = spot_state.get("balances", [])
+        for bal in spot_balances:
+            total_balance = float(bal.get('total', 0))
+            if total_balance > 0:
+                coin = bal.get('coin')
+                # Guard against a missing or None mark price for this coin
+                mark_price = float(price_map.get(coin) or 0)
+
+                balance_info = {
+                    "coin": coin,
+                    "balance_size": total_balance,
+                    "position_value": total_balance * mark_price,
+                    "pnl": "N/A"
+                }
+                status["spot_account"]["positions"].append(balance_info)
+
+        try:
+            with open(self.status_file_path, 'w', encoding='utf-8') as f:
+                json.dump(status, f, indent=4)
+            logging.debug(f"Successfully updated live executor status at '{self.status_file_path}'")
+        except IOError as e:
+            logging.error(f"Failed to write live executor status file: {e}")
+
+    def run(self):
+        """The main execution loop."""
+        logging.info("Starting Trade Executor loop...")
+        while True:
+            try:
+                perpetuals_state = self.info.user_state(self.vault_address)
+                spot_state = self.info.spot_user_state(self.vault_address)
+                meta, asset_contexts = self.info.meta_and_asset_ctxs()
+
+                open_positions = {}
+                for asset_pos in perpetuals_state.get('assetPositions', []):
+                    pos_details = asset_pos.get('position', {})
+                    if float(pos_details.get('szi', 0)) != 
0: + open_positions[pos_details.get('coin')] = pos_details + + self._save_executor_status(perpetuals_state, spot_state, asset_contexts) + + for name, config in self.strategy_configs.items(): + coin = config['parameters'].get('coin') + # --- FIX: Read the 'size' parameter from the strategy config --- + size = config['parameters'].get('size') + + status_file = os.path.join("_data", f"strategy_status_{name}.json") + + if not os.path.exists(status_file): + continue + + with open(status_file, 'r') as f: + status = json.load(f) + + signal = status.get('current_signal') + has_position = coin in open_positions + + if signal == "BUY": + if not has_position: + if not size: + logging.error(f"[{name}] 'size' parameter not defined in strategies.json. Skipping trade.") + continue + + # --- Using the 'size' from config for all BUY signals --- + logging.warning(f"[{name}] SIGNAL: BUY for {coin}. ACTION: Opening new long position of size {size}.") + + # Placeholder for live trading logic + # self.exchange.market_open(coin, True, size, None, 0.01) + + price = status.get('signal_price', 0) + log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=price, size=size, signal=signal) + + elif signal == "SELL": + if has_position: + position_details = open_positions[coin] + position_size = float(position_details.get('szi', 0)) + + # Only close if it's a long position. Short logic would go here. + if position_size > 0: + logging.warning(f"[{name}] SIGNAL: SELL for {coin}. ACTION: Closing existing long position.") + + # Placeholder for live trading logic + # self.exchange.market_close(coin) + + price = float(position_details.get('markPx', 0)) + pnl = float(position_details.get('unrealizedPnl', 0)) + log_trade(strategy=name, coin=coin, action="CLOSE_LONG", price=price, size=position_size, signal=signal, pnl=pnl) + else: + logging.info(f"[{name}] SIGNAL: {signal} for {coin}. ACTION: No trade needed (Position: {'Open' if has_position else 'None'}).") + + except Exception as e: + logging.error(f"An error occurred in the main executor loop: {e}") + + time.sleep(15) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run the Trade Executor.") + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + executor = TradeExecutor(log_level=args.log_level) + try: + executor.run() + except KeyboardInterrupt: + logging.info("Trade Executor stopped.") + diff --git a/trade_log.py b/trade_log.py new file mode 100644 index 0000000..6360dd5 --- /dev/null +++ b/trade_log.py @@ -0,0 +1,55 @@ +import os +import csv +from datetime import datetime, timezone +import threading + +# A lock to prevent race conditions when multiple strategies might log at once in the future +log_lock = threading.Lock() + +def log_trade(strategy: str, coin: str, action: str, price: float, size: float, signal: str, pnl: float = 0.0): + """ + Appends a record of a trade action to a persistent CSV log file. + + Args: + strategy (str): The name of the strategy that triggered the action. + coin (str): The coin being traded (e.g., 'BTC'). + action (str): The action taken (e.g., 'OPEN_LONG', 'CLOSE_LONG'). + price (float): The execution price of the trade. + size (float): The size of the trade. + signal (str): The signal that triggered the trade (e.g., 'BUY', 'SELL'). + pnl (float, optional): The realized profit and loss for closing trades. Defaults to 0.0. 
+ """ + log_dir = "_logs" + file_path = os.path.join(log_dir, "trade_history.csv") + + # Ensure the logs directory exists + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + # Define the headers for the CSV file + headers = ["timestamp_utc", "strategy", "coin", "action", "price", "size", "signal", "pnl"] + + # Check if the file needs a header + file_exists = os.path.isfile(file_path) + + with log_lock: + try: + with open(file_path, 'a', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=headers) + + if not file_exists: + writer.writeheader() + + writer.writerow({ + "timestamp_utc": datetime.now(timezone.utc).isoformat(), + "strategy": strategy, + "coin": coin, + "action": action, + "price": price, + "size": size, + "signal": signal, + "pnl": pnl + }) + except IOError as e: + # If logging fails, print an error to the main console as a fallback. + print(f"CRITICAL: Failed to write to trade log file: {e}")
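
Usage sketch (supplementary, not part of the commit): the snippet below drives log_trade() the same way trade_executor.py's BUY path does, using a made-up strategy entry. The strategy name sma_cross_demo, the script filename, and all prices and sizes are illustrative assumptions; only the log_trade() signature and the _logs/trade_history.csv columns come from trade_log.py above.

import csv
import os

from trade_log import log_trade

# A strategy entry shaped the way trade_executor.py reads _data/strategies.json:
# 'coin' and 'size' live under "parameters", and the executor skips BUY signals
# whose strategy defines no 'size'. All values here are illustrative.
example_config = {
    "sma_cross_demo": {
        "enabled": True,
        "script": "strategy_sma_cross.py",
        "parameters": {"coin": "BTC", "timeframe": "5m", "size": 0.001},
    }
}

# A status payload shaped like _data/strategy_status_<name>.json, as the
# strategy processes in this repo write it (current_signal / signal_price).
example_status = {"current_signal": "BUY", "signal_price": 65000.0}

params = example_config["sma_cross_demo"]["parameters"]
if example_status["current_signal"] == "BUY" and params.get("size"):
    # Mirrors the executor's OPEN_LONG path: appends one row to
    # _logs/trade_history.csv, creating the directory and header on first use.
    log_trade(
        strategy="sma_cross_demo",
        coin=params["coin"],
        action="OPEN_LONG",
        price=example_status["signal_price"],
        size=params["size"],
        signal="BUY",
    )

# Read back the row that log_trade appended.
with open(os.path.join("_logs", "trade_history.csv"), newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        print(row["timestamp_utc"], row["strategy"], row["action"], row["price"], row["size"], row["pnl"])

Note that log_lock serializes writers within one process only; strategies launched as separate processes by main_app.py each hold their own lock, so concurrent appends from different processes are not strictly serialized.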