diff --git a/__pycache__/logging_utils.cpython-313.pyc b/__pycache__/logging_utils.cpython-313.pyc index 9515649..6a8df43 100644 Binary files a/__pycache__/logging_utils.cpython-313.pyc and b/__pycache__/logging_utils.cpython-313.pyc differ diff --git a/_data/candles/hyperliquid-historical.py b/_data/candles/hyperliquid-historical.py new file mode 100644 index 0000000..d87bfba --- /dev/null +++ b/_data/candles/hyperliquid-historical.py @@ -0,0 +1,231 @@ +import boto3 +from botocore import UNSIGNED +from botocore.config import Config +from botocore.exceptions import ClientError +import os +import argparse +from datetime import datetime, timedelta +import asyncio +import lz4.frame +from pathlib import Path +import csv +import json + + + +# MUST USE PATHLIB INSTEAD +DIR_PATH = Path(__file__).parent +BUCKET = "hyperliquid-archive" +CSV_HEADER = ["datetime", "timestamp", "level", "price", "size", "number"] + +# s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED)) +# s3.download_file('hyperliquid-archive', 'market_data/20230916/9/l2Book/SOL.lz4', f"{dir_path}/SOL.lz4") + +# earliest date: 20230415/0/ + + + +def get_args(): + parser = argparse.ArgumentParser(description="Retrieve historical tick level market data from Hyperliquid exchange") + subparser = parser.add_subparsers(dest="tool", required=True, help="tool: download, decompress, to_csv") + + global_parser = subparser.add_parser("global_settings", add_help=False) + global_parser.add_argument("t", metavar="Tickers", help="Tickers of assets to be downloaded seperated by spaces. e.g. BTC ETH", nargs="+") + global_parser.add_argument("--all", help="Apply action to all available dates and times.", action="store_true", default=False) + global_parser.add_argument("--anonymous", help="Use anonymous (unsigned) S3 requests. Defaults to signed requests if not provided.", action="store_true", default=False) + global_parser.add_argument("-sd", metavar="Start date", help="Starting date as one unbroken string formatted: YYYYMMDD. e.g. 20230916") + global_parser.add_argument("-sh", metavar="Start hour", help="Hour of the starting day as an integer between 0 and 23. e.g. 9 Default: 0", type=int, default=0) + global_parser.add_argument("-ed", metavar="End date", help="Ending date as one unbroken string formatted: YYYYMMDD. e.g. 20230916") + global_parser.add_argument("-eh", metavar="End hour", help="Hour of the ending day as an integer between 0 and 23. e.g. 9 Default: 23", type=int, default=23) + + + download_parser = subparser.add_parser("download", help="Download historical market data", parents=[global_parser]) + decompress_parser = subparser.add_parser("decompress", help="Decompress downloaded lz4 data", parents=[global_parser]) + to_csv_parser = subparser.add_parser("to_csv", help="Convert decompressed downloads into formatted CSV", parents=[global_parser]) + + + return parser.parse_args() + + + + +def make_date_list(start_date, end_date): + start_date = datetime.strptime(start_date, '%Y%m%d') + end_date = datetime.strptime(end_date, '%Y%m%d') + + date_list = [] + + current_date = start_date + while current_date <= end_date: + date_list.append(current_date.strftime('%Y%m%d')) + current_date += timedelta(days=1) + + return date_list + + + + +def make_date_hour_list(date_list, start_hour, end_hour, delimiter="/"): + date_hour_list = [] + end_date = date_list[-1] + hour = start_hour + end = 23 + for date in date_list: + if date == end_date: + end = end_hour + + while hour <= end: + date_hour = date + delimiter + str(hour) + date_hour_list.append(date_hour) + hour += 1 + + hour = 0 + + return date_hour_list + + + + +async def download_object(s3, asset, date_hour): + date_and_hour = date_hour.split("/") + key = f"market_data/{date_hour}/l2Book/{asset}.lz4" + dest = f"{DIR_PATH}/downloads/{asset}/{date_and_hour[0]}-{date_and_hour[1]}.lz4" + try: + s3.download_file(BUCKET, key, dest) + except ClientError as e: + # Print a concise message and continue. Common errors: 403 Forbidden, 404 Not Found. + code = e.response.get('Error', {}).get('Code') if hasattr(e, 'response') else 'Unknown' + print(f"Failed to download {key}: {code} - {e}") + return + + + + +async def download_objects(s3, assets, date_hour_list): + print(f"Downloading {len(date_hour_list)} objects...") + for asset in assets: + await asyncio.gather(*[download_object(s3, asset, date_hour) for date_hour in date_hour_list]) + + + + +async def decompress_file(asset, date_hour): + lz_file_path = DIR_PATH / "downloads" / asset / f"{date_hour}.lz4" + file_path = DIR_PATH / "downloads" / asset / date_hour + + if not lz_file_path.is_file(): + print(f"decompress_file: file not found: {lz_file_path}") + return + + with lz4.frame.open(lz_file_path, mode='r') as lzfile: + data = lzfile.read() + with open(file_path, "wb") as file: + file.write(data) + + + + +async def decompress_files(assets, date_hour_list): + print(f"Decompressing {len(date_hour_list)} files...") + for asset in assets: + await asyncio.gather(*[decompress_file(asset, date_hour) for date_hour in date_hour_list]) + + + + +def write_rows(csv_writer, line): + rows = [] + entry = json.loads(line) + date_time = entry["time"] + timestamp = str(entry["raw"]["data"]["time"]) + all_orders = entry["raw"]["data"]["levels"] + + for i, order_level in enumerate(all_orders): + level = str(i + 1) + for order in order_level: + price = order["px"] + size = order["sz"] + number = str(order["n"]) + + rows.append([date_time, timestamp, level, price, size, number]) + + for row in rows: + csv_writer.writerow(row) + + + + + +async def convert_file(asset, date_hour): + file_path = DIR_PATH / "downloads" / asset / date_hour + csv_path = DIR_PATH / "csv" / asset / f"{date_hour}.csv" + + with open(csv_path, "w", newline='') as csv_file: + csv_writer = csv.writer(csv_file, dialect="excel") + csv_writer.writerow(CSV_HEADER) + + with open(file_path) as file: + for line in file: + write_rows(csv_writer, line) + + + + +async def files_to_csv(assets, date_hour_list): + print(f"Converting {len(date_hour_list)} files to CSV...") + for asset in assets: + await asyncio.gather(*[convert_file(asset, date_hour) for date_hour in date_hour_list]) + + + + + +def main(): + print(DIR_PATH) + args = get_args() + + # Create S3 client according to whether anonymous access was requested. + if getattr(args, 'anonymous', False): + s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED)) + else: + s3 = boto3.client('s3') + + downloads_path = DIR_PATH / "downloads" + downloads_path.mkdir(exist_ok=True) + + csv_path = DIR_PATH / "csv" + csv_path.mkdir(exist_ok=True) + + for asset in args.t: + downloads_asset_path = downloads_path / asset + downloads_asset_path.mkdir(exist_ok=True) + csv_asset_path = csv_path / asset + csv_asset_path.mkdir(exist_ok=True) + + date_list = make_date_list(args.sd, args.ed) + loop = asyncio.new_event_loop() + + if args.tool == "download": + date_hour_list = make_date_hour_list(date_list, args.sh, args.eh) + loop.run_until_complete(download_objects(s3, args.t, date_hour_list)) + loop.close() + + if args.tool == "decompress": + date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-") + loop.run_until_complete(decompress_files(args.t, date_hour_list)) + loop.close() + + if args.tool == "to_csv": + date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-") + loop.run_until_complete(files_to_csv(args.t, date_hour_list)) + loop.close() + + + print("Done") + + + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/_data/candles/requirements.txt b/_data/candles/requirements.txt new file mode 100644 index 0000000..d26aba1 --- /dev/null +++ b/_data/candles/requirements.txt @@ -0,0 +1,8 @@ +boto3==1.34.131 +botocore==1.34.131 +jmespath==1.0.1 +lz4==4.3.3 +python-dateutil==2.9.0.post0 +s3transfer==0.10.1 +six==1.16.0 +urllib3==2.2.2 \ No newline at end of file diff --git a/_data/market_data.db-shm b/_data/market_data.db-shm new file mode 100644 index 0000000..289c224 Binary files /dev/null and b/_data/market_data.db-shm differ diff --git a/_data/strategies.json b/_data/strategies.json new file mode 100644 index 0000000..d541d78 --- /dev/null +++ b/_data/strategies.json @@ -0,0 +1,48 @@ +{ + "rsi_strategy_eth": { + "enabled": true, + "script": "strategy_template.py", + "parameters": { + "coin": "ETH", + "timeframe": "5m", + "rsi_period": 14 + } + }, + "ma_cross_btc": { + "enabled": true, + "script": "strategy_template.py", + "parameters": { + "coin": "BTC", + "timeframe": "1h", + "short_ma": 10, + "long_ma": 50 + } + }, + "sma_125d_btc": { + "enabled": true, + "script": "strategy_template.py", + "parameters": { + "coin": "BTC", + "timeframe": "1D", + "sma_period": 125 + } + }, + "sma_44d_btc": { + "enabled": true, + "script": "strategy_template.py", + "parameters": { + "coin": "BTC", + "timeframe": "1D", + "sma_period": 44 + } + }, + "disabled_strategy": { + "enabled": false, + "script": "strategy_template.py", + "parameters": { + "coin": "SOL", + "timeframe": "15m" + } + } +} + diff --git a/import_csv.py b/import_csv.py new file mode 100644 index 0000000..3757ef2 --- /dev/null +++ b/import_csv.py @@ -0,0 +1,147 @@ +import argparse +import logging +import os +import sys +import sqlite3 +import pandas as pd +from datetime import datetime + +# Assuming logging_utils.py is in the same directory +from logging_utils import setup_logging + +class CsvImporter: + """ + Imports historical candle data from a large CSV file into the SQLite database, + intelligently adding only the missing data. + """ + + def __init__(self, log_level: str, csv_path: str, coin: str): + setup_logging(log_level, 'CsvImporter') + if not os.path.exists(csv_path): + logging.error(f"CSV file not found at '{csv_path}'. Please check the path.") + sys.exit(1) + + self.csv_path = csv_path + self.coin = coin + self.table_name = f"{self.coin}_1m" + self.db_path = os.path.join("_data", "market_data.db") + self.column_mapping = { + 'Open time': 'datetime_utc', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + 'Number of trades': 'number_of_trades' + } + + def run(self): + """Orchestrates the entire import and verification process.""" + logging.info(f"Starting import process for '{self.coin}' from '{self.csv_path}'...") + + with sqlite3.connect(self.db_path) as conn: + conn.execute("PRAGMA journal_mode=WAL;") + + # 1. Get the current state of the database + db_oldest, db_newest, initial_row_count = self._get_db_state(conn) + + # 2. Read, clean, and filter the CSV data + new_data_df = self._process_and_filter_csv(db_oldest, db_newest) + + if new_data_df.empty: + logging.info("No new data to import. Database is already up-to-date with the CSV file.") + return + + # 3. Append the new data to the database + self._append_to_db(new_data_df, conn) + + # 4. Summarize and verify the import + self._summarize_import(initial_row_count, len(new_data_df), conn) + + def _get_db_state(self, conn) -> (datetime, datetime, int): + """Gets the oldest and newest timestamps and total row count from the DB table.""" + try: + oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0] + newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0] + count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0] + + oldest_dt = pd.to_datetime(oldest) if oldest else None + newest_dt = pd.to_datetime(newest) if newest else None + + if oldest_dt: + logging.info(f"Database contains data from {oldest_dt} to {newest_dt}.") + else: + logging.info("Database table is empty. A full import will be performed.") + + return oldest_dt, newest_dt, count + except pd.io.sql.DatabaseError: + logging.info(f"Table '{self.table_name}' not found. It will be created.") + return None, None, 0 + + def _process_and_filter_csv(self, db_oldest: datetime, db_newest: datetime) -> pd.DataFrame: + """Reads the CSV and returns a DataFrame of only the missing data.""" + logging.info("Reading and processing CSV file. This may take a moment for large files...") + df = pd.read_csv(self.csv_path, usecols=self.column_mapping.keys()) + + # Clean and format the data + df.rename(columns=self.column_mapping, inplace=True) + df['datetime_utc'] = pd.to_datetime(df['datetime_utc']) + + # Filter the data to find only rows that are outside the range of what's already in the DB + if db_oldest and db_newest: + # Get data from before the oldest record and after the newest record + df_filtered = df[(df['datetime_utc'] < db_oldest) | (df['datetime_utc'] > db_newest)] + else: + # If the DB is empty, all data is new + df_filtered = df + + logging.info(f"Found {len(df_filtered):,} new rows to import.") + return df_filtered + + def _append_to_db(self, df: pd.DataFrame, conn): + """Appends the DataFrame to the SQLite table.""" + logging.info(f"Appending {len(df):,} new rows to the database...") + df.to_sql(self.table_name, conn, if_exists='append', index=False) + logging.info("Append operation complete.") + + def _summarize_import(self, initial_count: int, added_count: int, conn): + """Prints a final summary and verification of the import.""" + logging.info("--- Import Summary & Verification ---") + + try: + final_count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0] + new_oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0] + new_newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0] + + print(f"\n{'Status':<20}: SUCCESS") + print("-" * 40) + print(f"{'Initial Row Count':<20}: {initial_count:,}") + print(f"{'Rows Added':<20}: {added_count:,}") + print(f"{'Final Row Count':<20}: {final_count:,}") + print("-" * 40) + print(f"{'New Oldest Record':<20}: {new_oldest}") + print(f"{'New Newest Record':<20}: {new_newest}") + + # Verification check + if final_count == initial_count + added_count: + logging.info("Verification successful: Final row count matches expected count.") + else: + logging.warning("Verification warning: Final row count does not match expected count.") + except Exception as e: + logging.error(f"Could not generate summary. Error: {e}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Import historical CSV data into the SQLite database.") + parser.add_argument("--file", required=True, help="Path to the large CSV file to import.") + parser.add_argument("--coin", default="BTC", help="The coin symbol for this data (e.g., BTC).") + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin) + importer.run() diff --git a/logging_utils.py b/logging_utils.py index 9ad66af..490363e 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -1,5 +1,29 @@ import logging import sys +from datetime import datetime + +class LocalTimeFormatter(logging.Formatter): + """ + Custom formatter to display time with milliseconds and a (UTC+HH) offset. + """ + def formatTime(self, record, datefmt=None): + # Convert log record's creation time to a local, timezone-aware datetime object + dt = datetime.fromtimestamp(record.created).astimezone() + + # Format the main time part + time_part = dt.strftime('%Y-%m-%d %H:%M:%S') + + # Get the UTC offset and format it as (UTC+HH) + offset = dt.utcoffset() + offset_str = "" + if offset is not None: + offset_hours = int(offset.total_seconds() / 3600) + sign = '+' if offset_hours >= 0 else '' + offset_str = f" (UTC{sign}{offset_hours})" + + # --- FIX: Cast record.msecs from float to int before formatting --- + # Combine time, milliseconds, and the offset string + return f"{time_part},{int(record.msecs):03d}{offset_str}" def setup_logging(log_level: str, process_name: str): """ @@ -29,10 +53,9 @@ def setup_logging(log_level: str, process_name: str): handler = logging.StreamHandler(sys.stdout) - # --- FIX: Added a date format that includes the timezone name (%Z) --- - formatter = logging.Formatter( - f'%(asctime)s - {process_name} - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S %Z' + # This will produce timestamps like: 2025-10-13 14:30:00,123 (UTC+2) + formatter = LocalTimeFormatter( + f'%(asctime)s - {process_name} - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) diff --git a/main_app.py b/main_app.py index b3f446a..6cd9306 100644 --- a/main_app.py +++ b/main_app.py @@ -17,121 +17,141 @@ WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SU COIN_LISTER_SCRIPT = "list_coins.py" MARKET_FEEDER_SCRIPT = "market.py" DATA_FETCHER_SCRIPT = "data_fetcher.py" -RESAMPLER_SCRIPT = "resampler.py" # Restored resampler script +RESAMPLER_SCRIPT = "resampler.py" +STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json") PRICE_DATA_FILE = os.path.join("_data", "current_prices.json") DB_PATH = os.path.join("_data", "market_data.db") STATUS_FILE = os.path.join("_data", "fetcher_status.json") +LOGS_DIR = "_logs" # Directory to store logs from background processes def run_market_feeder(): - """Target function to run the market.py script in a separate process.""" - setup_logging('off', 'MarketFeedProcess') - logging.info("Market feeder process started.") - try: - # Pass the log level to the script - subprocess.run([sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"], check=True) - except subprocess.CalledProcessError as e: - logging.error(f"Market feeder script failed with error: {e}") - except KeyboardInterrupt: - logging.info("Market feeder process stopping.") + """Target function to run market.py and redirect its output to a log file.""" + log_file = os.path.join(LOGS_DIR, "market_feeder.log") + while True: + try: + with open(log_file, 'a') as f: + subprocess.run( + [sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "normal"], + check=True, stdout=f, stderr=subprocess.STDOUT + ) + except (subprocess.CalledProcessError, Exception) as e: + logging.error(f"Market feeder script failed: {e}. Restarting...") + time.sleep(5) def run_data_fetcher_job(): - """Defines the job to be run by the scheduler for the data fetcher.""" - logging.info(f"Scheduler starting data_fetcher.py task for {', '.join(WATCHED_COINS)}...") + """Defines the job for the data fetcher, redirecting output to a log file.""" + log_file = os.path.join(LOGS_DIR, "data_fetcher.log") + logging.info(f"Scheduler starting data_fetcher.py task...") try: - command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "off"] - subprocess.run(command, check=True) - logging.info("data_fetcher.py task finished successfully.") + command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "normal"] + with open(log_file, 'a') as f: + subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) except Exception as e: logging.error(f"Failed to run data_fetcher.py job: {e}") def data_fetcher_scheduler(): - """Schedules and runs the data_fetcher.py script periodically.""" + """Schedules the data_fetcher.py script.""" setup_logging('off', 'DataFetcherScheduler') run_data_fetcher_job() schedule.every(1).minutes.do(run_data_fetcher_job) - logging.info("Data fetcher scheduled to run every 1 minute.") while True: schedule.run_pending() time.sleep(1) -# --- Restored Resampler Functions --- + def run_resampler_job(): - """Defines the job to be run by the scheduler for the resampler.""" - logging.info(f"Scheduler starting resampler.py task for {', '.join(WATCHED_COINS)}...") + """Defines the job for the resampler, redirecting output to a log file.""" + log_file = os.path.join(LOGS_DIR, "resampler.log") + logging.info(f"Scheduler starting resampler.py task...") try: - # Uses default timeframes configured within resampler.py - command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] - subprocess.run(command, check=True) - logging.info("resampler.py task finished successfully.") + command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "normal"] + with open(log_file, 'a') as f: + subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) except Exception as e: logging.error(f"Failed to run resampler.py job: {e}") def resampler_scheduler(): - """Schedules and runs the resampler.py script periodically.""" + """Schedules the resampler.py script.""" setup_logging('off', 'ResamplerScheduler') run_resampler_job() schedule.every(4).minutes.do(run_resampler_job) - logging.info("Resampler scheduled to run every 4 minutes.") while True: schedule.run_pending() time.sleep(1) -# --- End of Restored Functions --- + + +def run_strategy(strategy_name: str, config: dict): + """Target function to run a strategy, redirecting its output to a log file.""" + log_file = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log") + script_name = config['script'] + params_str = json.dumps(config['parameters']) + command = [sys.executable, script_name, "--name", strategy_name, "--params", params_str, "--log-level", "normal"] + while True: + try: + with open(log_file, 'a') as f: + f.write(f"\n--- Starting strategy '{strategy_name}' at {datetime.now()} ---\n") + subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) + except (subprocess.CalledProcessError, Exception) as e: + logging.error(f"Strategy '{strategy_name}' failed: {e}. Restarting...") + time.sleep(10) + class MainApp: - def __init__(self, coins_to_watch: list): + def __init__(self, coins_to_watch: list, processes: dict): self.watched_coins = coins_to_watch self.prices = {} self.last_db_update_info = "Initializing..." - self._lines_printed = 0 # To track how many lines we printed last time + self._lines_printed = 0 + self.background_processes = processes + self.process_status = {} def read_prices(self): """Reads the latest prices from the JSON file.""" - if not os.path.exists(PRICE_DATA_FILE): - return - try: - with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f: - self.prices = json.load(f) - except (json.JSONDecodeError, IOError): - logging.debug("Could not read price file (might be locked).") + if os.path.exists(PRICE_DATA_FILE): + try: + with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f: + self.prices = json.load(f) + except (json.JSONDecodeError, IOError): + logging.debug("Could not read price file.") def get_overall_db_status(self): """Reads the fetcher status from the status file.""" - if not os.path.exists(STATUS_FILE): - self.last_db_update_info = "Status file not found." - return - try: - with open(STATUS_FILE, 'r', encoding='utf-8') as f: - status = json.load(f) - coin = status.get("last_updated_coin") - timestamp_utc_str = status.get("last_run_timestamp_utc") - num_candles = status.get("num_updated_candles", 0) + if os.path.exists(STATUS_FILE): + try: + with open(STATUS_FILE, 'r', encoding='utf-8') as f: + status = json.load(f) + coin = status.get("last_updated_coin") + timestamp_utc_str = status.get("last_run_timestamp_utc") + num_candles = status.get("num_updated_candles", 0) + if timestamp_utc_str: + dt_utc = datetime.fromisoformat(timestamp_utc_str.replace('Z', '+00:00')).replace(tzinfo=timezone.utc) + dt_local = dt_utc.astimezone(None) + + # --- FIX: Manually format the UTC offset --- + offset = dt_local.utcoffset() + offset_hours = int(offset.total_seconds() / 3600) + sign = '+' if offset_hours >= 0 else '' + offset_str = f"(UTC{sign}{offset_hours})" + timestamp_display = f"{dt_local.strftime('%Y-%m-%d %H:%M:%S')} {offset_str}" + else: + timestamp_display = "N/A" + self.last_db_update_info = f"{coin} at {timestamp_display} ({num_candles} candles)" + except (IOError, json.JSONDecodeError): + self.last_db_update_info = "Error reading status file." - if timestamp_utc_str: - dt_naive = datetime.strptime(timestamp_utc_str, '%Y-%m-%d %H:%M:%S') - dt_utc = dt_naive.replace(tzinfo=timezone.utc) - dt_local = dt_utc.astimezone(None) - timestamp_display = dt_local.strftime('%Y-%m-%d %H:%M:%S %Z') - else: - timestamp_display = "N/A" - - self.last_db_update_info = f"{coin} at {timestamp_display} ({num_candles} candles)" - except (IOError, json.JSONDecodeError) as e: - self.last_db_update_info = "Error reading status file." - logging.error(f"Could not read status file: {e}") + def check_process_status(self): + """Checks if the background processes are still running.""" + for name, process in self.background_processes.items(): + self.process_status[name] = "Running" if process.is_alive() else "STOPPED" def display_dashboard(self): - """Displays a formatted table for prices and DB status without blinking.""" - # Move the cursor up to overwrite the previous output - if self._lines_printed > 0: - print(f"\x1b[{self._lines_printed}A", end="") - - # Build the output as a single string - output_lines = [] - output_lines.append("--- Market Dashboard ---") + """Displays a formatted table without blinking.""" + if self._lines_printed > 0: print(f"\x1b[{self._lines_printed}A", end="") + output_lines = ["--- Market Dashboard ---"] table_width = 26 output_lines.append("-" * table_width) output_lines.append(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} |") @@ -140,23 +160,25 @@ class MainApp: price = self.prices.get(coin, "Loading...") output_lines.append(f"{i:<2} | {coin:<6} | {price:>10} |") output_lines.append("-" * table_width) - output_lines.append(f"DB Status: Last coin updated -> {self.last_db_update_info}") - # Join lines and add a code to clear from cursor to end of screen - # This prevents artifacts if the new output is shorter than the old one. + output_lines.append("DB Status:") + output_lines.append(f" Last update -> {self.last_db_update_info}") + + output_lines.append("--- Background Processes ---") + for name, status in self.process_status.items(): + output_lines.append(f"{name:<25}: {status}") + final_output = "\n".join(output_lines) + "\n\x1b[J" print(final_output, end="") - - # Store the number of lines printed for the next iteration self._lines_printed = len(output_lines) - sys.stdout.flush() def run(self): - """Main loop to read and display data.""" + """Main loop to read data, display dashboard, and check processes.""" while True: self.read_prices() self.get_overall_db_status() + self.check_process_status() self.display_dashboard() time.sleep(2) @@ -164,6 +186,10 @@ class MainApp: if __name__ == "__main__": setup_logging('normal', 'MainApp') + # Create logs directory if it doesn't exist + if not os.path.exists(LOGS_DIR): + os.makedirs(LOGS_DIR) + logging.info(f"Running coin lister: '{COIN_LISTER_SCRIPT}'...") try: subprocess.run([sys.executable, COIN_LISTER_SCRIPT], check=True, capture_output=True, text=True) @@ -171,35 +197,40 @@ if __name__ == "__main__": logging.error(f"Failed to run '{COIN_LISTER_SCRIPT}'. Error: {e.stderr}") sys.exit(1) - logging.info(f"Starting market feeder ('{MARKET_FEEDER_SCRIPT}')...") - market_process = multiprocessing.Process(target=run_market_feeder, daemon=True) - market_process.start() + processes = {} - logging.info(f"Starting historical data fetcher ('{DATA_FETCHER_SCRIPT}')...") - fetcher_process = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) - fetcher_process.start() + # Start Data Pipeline Processes + processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True) + processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) + processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True) - # --- Restored Resampler Process Start --- - logging.info(f"Starting resampler ('{RESAMPLER_SCRIPT}')...") - resampler_process = multiprocessing.Process(target=resampler_scheduler, daemon=True) - resampler_process.start() - # --- End Resampler Process Start --- + # Start Strategy Processes based on config + try: + with open(STRATEGY_CONFIG_FILE, 'r') as f: + strategy_configs = json.load(f) + for name, config in strategy_configs.items(): + if config.get("enabled", False): + proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True) + processes[f"Strategy: {name}"] = proc + except (FileNotFoundError, json.JSONDecodeError) as e: + logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}") + + # Launch all processes + for name, proc in processes.items(): + logging.info(f"Starting process '{name}'...") + proc.start() time.sleep(3) - app = MainApp(coins_to_watch=WATCHED_COINS) + app = MainApp(coins_to_watch=WATCHED_COINS, processes=processes) try: app.run() except KeyboardInterrupt: logging.info("Shutting down...") - market_process.terminate() - fetcher_process.terminate() - # --- Restored Resampler Shutdown --- - resampler_process.terminate() - market_process.join() - fetcher_process.join() - resampler_process.join() - # --- End Resampler Shutdown --- + for proc in processes.values(): + if proc.is_alive(): proc.terminate() + for proc in processes.values(): + if proc.is_alive(): proc.join() logging.info("Shutdown complete.") sys.exit(0) diff --git a/strategy_template.py b/strategy_template.py new file mode 100644 index 0000000..911fb7c --- /dev/null +++ b/strategy_template.py @@ -0,0 +1,132 @@ +import argparse +import logging +import sys +import time +import pandas as pd +import sqlite3 +import json +import os + +from logging_utils import setup_logging + +class TradingStrategy: + """ + A template for a trading strategy that reads data from the SQLite database + and executes its logic in a loop. + """ + + def __init__(self, strategy_name: str, params: dict, log_level: str): + self.strategy_name = strategy_name + self.params = params + self.coin = params.get("coin", "N/A") + self.timeframe = params.get("timeframe", "N/A") + self.db_path = os.path.join("_data", "market_data.db") + + # Load strategy-specific parameters + self.rsi_period = params.get("rsi_period") + self.short_ma = params.get("short_ma") + self.long_ma = params.get("long_ma") + self.sma_period = params.get("sma_period") + + setup_logging(log_level, f"Strategy-{self.strategy_name}") + logging.info(f"Initializing strategy with parameters: {self.params}") + + def load_data(self) -> pd.DataFrame: + """Loads historical data for the configured coin and timeframe from the database.""" + table_name = f"{self.coin}_{self.timeframe}" + # Ensure we load enough data for the longest indicator period + limit = 500 + if self.sma_period and self.sma_period > limit: + limit = self.sma_period + 50 # Add a buffer + elif self.long_ma and self.long_ma > limit: + limit = self.long_ma + 50 + + try: + with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn: + query = f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT {limit}' + df = pd.read_sql(query, conn) + df['datetime_utc'] = pd.to_datetime(df['datetime_utc']) + df.set_index('datetime_utc', inplace=True) + df.sort_index(inplace=True) # Ensure data is chronological + return df + except Exception as e: + logging.error(f"Failed to load data from table '{table_name}': {e}") + return pd.DataFrame() + + def run_logic(self): + """ + The main loop where the strategy's logic is executed. + This should be implemented with your specific trading rules. + """ + logging.info(f"Starting main logic loop for {self.coin} on {self.timeframe} timeframe.") + while True: + data = self.load_data() + + if data.empty: + logging.warning("No data loaded. Waiting before retrying...") + time.sleep(60) + continue + + last_close = data['close'].iloc[-1] + logging.info(f"Latest data loaded. Last close price for {self.coin}: {last_close}") + + # --- SMA Strategy Logic --- + if self.sma_period: + if len(data) < self.sma_period: + logging.warning(f"Not enough data to calculate {self.sma_period}-period SMA. " + f"Need {self.sma_period}, have {len(data)}.") + else: + # Calculate the Simple Moving Average + sma = data['close'].rolling(window=self.sma_period).mean().iloc[-1] + logging.info(f"Current Price: {last_close}, {self.sma_period}-period SMA: {sma:.4f}") + + if last_close > sma: + logging.warning("--- BUY SIGNAL --- (Price is above SMA)") + elif last_close < sma: + logging.warning("--- SELL SIGNAL --- (Price is below SMA)") + else: + logging.info("--- HOLD SIGNAL --- (Price is at SMA)") + + # --- RSI Strategy Logic (Placeholder) --- + if self.rsi_period: + logging.info(f"RSI Period is set to: {self.rsi_period}. (RSI calculation not implemented).") + + # --- MA Cross Strategy Logic (Placeholder) --- + if self.short_ma and self.long_ma: + logging.info(f"Short MA: {self.short_ma}, Long MA: {self.long_ma}. (MA Cross logic not implemented).") + + logging.info("Logic execution finished. Waiting for next cycle.") + time.sleep(60) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run a trading strategy.") + parser.add_argument("--name", required=True, help="The name of the strategy instance from the config.") + parser.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.") + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + + args = parser.parse_args() + + try: + strategy_params = json.loads(args.params) + strategy = TradingStrategy( + strategy_name=args.name, + params=strategy_params, + log_level=args.log_level + ) + strategy.run_logic() + except json.JSONDecodeError: + logging.error("Failed to decode JSON from --params argument.") + sys.exit(1) + except KeyboardInterrupt: + logging.info("Strategy process stopped.") + sys.exit(0) + except Exception as e: + logging.error(f"A critical error occurred: {e}") + sys.exit(1) +