Compare commits

11 Commits

Author SHA1 Message Date
1cf05a5b69 WALK-FORWARD testing 2025-10-18 18:40:50 +02:00
e7c7158c68 backword forward strategy 2025-10-18 18:29:06 +02:00
a30f75fae0 wiki 2025-10-18 15:55:53 +02:00
6f822483cc Update README.md 2025-10-18 13:40:28 +00:00
fa5b06d1f6 readme 2025-10-18 15:13:40 +02:00
2a3cf5cacf readme.md 2025-10-18 15:10:46 +02:00
2269fbed9e trade_executor, agent creator 2025-10-16 13:18:39 +02:00
7ef6bd4d14 strategy status table 2025-10-15 18:32:12 +02:00
a69581a07b added market caps 2025-10-14 23:08:37 +02:00
76c993ce76 imort CSV files 2025-10-14 19:15:35 +02:00
903f4ff434 first stategies, import script for BTC histry 2025-10-14 10:48:26 +02:00
45 changed files with 2806 additions and 198 deletions

24
.env.example Normal file
View File

@ -0,0 +1,24 @@
# Example environment variables for the Hyperliquid trading toolkit
# Copy this file to .env and fill in real values. Do NOT commit your real .env file.
# Main wallet (used only to authorize agents on-chain)
# Example: MAIN_WALLET_PRIVATE_KEY=0x...
MAIN_WALLET_PRIVATE_KEY=
MAIN_WALLET_ADDRESS=
# Agent keys (private keys authorized via create_agent.py)
# Preferred patterns:
# - AGENT_PRIVATE_KEY: default agent
# - <NAME>_AGENT_PK or <NAME>_AGENT_PRIVATE_KEY: per-agent keys (e.g., SCALPER_AGENT_PK)
# Example: AGENT_PRIVATE_KEY=0x...
AGENT_PRIVATE_KEY=
# Example per-agent key:
# SCALPER_AGENT_PK=
# SWING_AGENT_PK=
# Optional: CoinGecko API key to reduce rate limits for market cap fetches
COINGECKO_API_KEY=
# Optional: Set a custom environment for development/testing
# E.g., DEBUG=true
DEBUG=

88
README.md Normal file
View File

@ -0,0 +1,88 @@
# Automated Crypto Trading Bot
This project is a sophisticated, multi-process automated trading bot designed to interact with the Hyperliquid decentralized exchange. It features a robust data pipeline, a flexible strategy engine, multi-agent trade execution, and a live terminal dashboard for real-time monitoring.
<!-- It's a good idea to take a screenshot of your dashboard and upload it to a service like Imgur to include here -->
## Features
* **Multi-Process Architecture**: Core components (data fetching, trading, strategies) run in parallel processes for maximum performance and stability.
* **Comprehensive Data Pipeline**:
* Live price feeds for all assets.
* Historical candle data collection for any coin and timeframe.
* Historical market cap data fetching from the CoinGecko API.
* **High-Performance Database**: Uses SQLite with pandas for fast, indexed storage and retrieval of all market data.
* **Configuration-Driven Strategies**: Trading strategies are defined and managed in a simple JSON file (`_data/strategies.json`), allowing for easy configuration without code changes.
* **Multi-Agent Trading**: Supports multiple, independent trading agents for advanced risk segregation and PNL tracking.
* **Live Terminal Dashboard**: A real-time, flicker-free dashboard to monitor live prices, market caps, strategy signals, and the status of all background processes.
* **Secure Key Management**: Uses a `.env` file to securely manage all private keys and API keys, keeping them separate from the codebase.
## Project Structure
The project is composed of several key scripts that work together:
* **`main_app.py`**: The central orchestrator. It launches all background processes and displays the main monitoring dashboard.
* **`trade_executor.py`**: The trading "brain." It reads signals from all active strategies and executes trades using the appropriate agent.
* **`data_fetcher.py`**: A background service that collects 1-minute historical candle data and saves it to the SQLite database.
* **`resampler.py`**: A background service that reads the 1-minute data and generates all other required timeframes (e.g., 5m, 1h, 1d).
* **`market_cap_fetcher.py`**: A scheduled service to download daily market cap data.
* **`strategy_*.py`**: Individual files containing the logic for different types of trading strategies (e.g., SMA Crossover).
* **`_data/strategies.json`**: The configuration file for defining and enabling/disabling your trading strategies.
* **`.env`**: The secure file for storing all your private keys and API keys.
## Installation
1. **Clone the Repository**
```bash
git clone [https://github.com/your-username/your-repo-name.git](https://github.com/your-username/your-repo-name.git)
cd your-repo-name
```
2. **Create and Activate a Virtual Environment**
```bash
# For Windows
python -m venv .venv
.\.venv\Scripts\activate
# For macOS/Linux
python3 -m venv .venv
source .venv/bin/activate
```
3. **Install Dependencies**
```bash
pip install -r requirements.txt
```
## Getting Started: Configuration
Before running the application, you must configure your wallets, agents, and API keys.
1. Create the .env File In the root of the project, create a file named .env. Copy the following content into it and replace the placeholder values with your actual keys.
2. **Activate Your Main Wallet on Hyperliquid**
The `trade_executor.py` script will fail if your main wallet is not registered.
* Go to the Hyperliquid website, connect your main wallet, and make a small deposit. This is a one-time setup step.
3. **Create and Authorize Trading Agents**
The `trade_executor.py` uses secure "agent" keys that can trade but cannot withdraw. You need to generate these and authorize them with your main wallet.
* Run the `create_agent.py` script
```bash
python create_agent.py
```
The script will output a new Agent Private Key. Copy this key and add it to your .env file (e.g., as SCALPER_AGENT_PK). Repeat this for each agent you want to create.
4. **Configure**
Your Strategies Open the `_data/strategies.json` file to define which strategies you want to run.
* Set "enabled": true to activate a strategy.
* Assign an "agent" (e.g., "scalper", "swing") to each strategy. The agent name must correspond to a key in your .env file (e.g., SCALPER_AGENT_PK -> "scalper").
* Configure the parameters for each strategy, such as the coin, timeframe, and any indicator settings.
##Usage##
Once everything is configured, you can run the main application from your terminal:
```bash
python main_app.py
```
## Documentation
Detailed project documentation is available in the `WIKI/` directory. Start with the summary page:
`WIKI/SUMMARY.md`
This contains links and explanations for `OVERVIEW.md`, `SETUP.md`, `SCRIPTS.md`, and other helpful pages that describe usage, data layout, agent management, development notes, and troubleshooting.

5
WIKI/.gitattributes vendored Normal file
View File

@ -0,0 +1,5 @@
# Treat markdown files as text with LF normalization
*.md text eol=lf
# Ensure JSON files are treated as text
*.json text

34
WIKI/AGENTS.md Normal file
View File

@ -0,0 +1,34 @@
Agents and Keys
This project supports running multiple agent identities (private keys) to place orders on Hyperliquid. Agents are lightweight keys authorized on-chain by your main wallet.
Agent storage and environment
- For security, agent private keys should be stored as environment variables and not checked into source control.
- Supported patterns:
- `AGENT_PRIVATE_KEY` (single default agent)
- `<NAME>_AGENT_PK` or `<NAME>_AGENT_PRIVATE_KEY` (per-agent keys)
Discovering agents
- `trade_executor.py` scans environment variables for agent keys and loads them into `Exchange` objects so each agent can sign orders independently.
Creating and authorizing agents
- Use `create_agent.py` with your `MAIN_WALLET_PRIVATE_KEY` to authorize a new agent name. The script will attempt to call `exchange.approve_agent(agent_name)` and print the returned agent private key.
Security notes
- Never commit private keys to Git. Keep them in a secure secrets store or local `.env` file excluded from version control.
- Rotate keys if they are ever exposed and re-authorize agents using your main wallet.
Example `.env` snippet
MAIN_WALLET_PRIVATE_KEY=<your-main-wallet-private-key>
MAIN_WALLET_ADDRESS=<your-main-wallet-address>
AGENT_PRIVATE_KEY=<agent-private-key>
EXECUTOR_SCALPER_AGENT_PK=<agent-private-key-for-scalper>
File `agents`
- This repository may contain a local `agents` file used as a quick snapshot; treat it as insecure and remove it from the repo or add it to `.gitignore` if it contains secrets.

20
WIKI/CONTRIBUTING.md Normal file
View File

@ -0,0 +1,20 @@
Contributing
Thanks for considering contributing! Please follow these guidelines to make the process smooth.
How to contribute
1. Fork the repository and create a feature branch for your change.
2. Keep changes focused and add tests where appropriate.
3. Submit a Pull Request with a clear description and the reason for the change.
Coding standards
- Keep functions small and well-documented.
- Use the existing logging utilities for consistent output.
- Prefer safe, incremental changes for financial code.
Security and secrets
- Never commit private keys, API keys, or secrets. Use environment variables or a secrets manager.
- If you accidentally commit secrets, rotate them immediately.

31
WIKI/DATA.md Normal file
View File

@ -0,0 +1,31 @@
Data layout and formats
This section describes the `_data/` directory and the important files used by the scripts.
Important files
- `_data/market_data.db` — SQLite database that stores candle tables. Tables are typically named `<COIN>_<INTERVAL>` (e.g., `BTC_1m`, `ETH_5m`).
- `_data/coin_precision.json` — Mapping of coin names to their size precision (created by `list_coins.py`).
- `_data/current_prices.json` — Latest market prices that `market.py` writes.
- `_data/fetcher_status.json` — Last run metadata from `data_fetcher.py`.
- `_data/market_cap_data.json` — Market cap summary saved by `market_cap_fetcher.py`.
- `_data/strategies.json` — Configuration for strategies (enabled flag, parameters).
- `_data/strategy_status_<name>.json` — Per-strategy runtime status including last signal and price.
- `_data/executor_managed_positions.json` — Which strategy is currently managing which live position (used by `trade_executor`).
Candle schema
Each candle table contains columns similar to:
- `timestamp_ms` (INTEGER) — milliseconds since epoch
- `open`, `high`, `low`, `close` (FLOAT)
- `volume` (FLOAT)
- `number_of_trades` (INTEGER)
Trade logs
- Persistent trade history is stored in `_logs/trade_history.csv` with the following columns: `timestamp_utc`, `strategy`, `coin`, `action`, `price`, `size`, `signal`, `pnl`.
Backups and maintenance
- Periodically back up `_data/market_data.db`. The WAL and SHM files are also present when SQLite uses WAL mode.
- Keep JSON config/state files under version control only if they contain no secrets.

24
WIKI/DEVELOPMENT.md Normal file
View File

@ -0,0 +1,24 @@
Development and testing
Code style and conventions
- Python 3.11+ with typing hints where helpful.
- Use `logging_utils.setup_logging` for consistent logs across scripts.
Running tests
- This repository doesn't currently include a formal test suite. Suggested quick checks:
- Run `python list_coins.py` to verify connectivity to Hyperliquid Info.
- Run `python -m pyflakes .` or `python -m pylint` if you have linters installed.
Adding a new strategy
1. Create a new script following the pattern in `strategy_template.py`.
2. Add an entry to `_data/strategies.json` with `enabled: true` and relevant parameters.
3. Ensure the strategy writes a status JSON file (`_data/strategy_status_<name>.json`) and uses `trade_log.log_trade` to record actions.
Recommended improvements (low-risk)
- Add a lightweight unit test suite (pytest) for core functions like timeframe parsing, SQL helpers, and signal calculation.
- Add CI (GitHub Actions) to run flake/pylint and unit tests on PRs.
- Move secrets handling to a `.env.example` and document environment variables in `WIKI/SETUP.md`.

29
WIKI/OVERVIEW.md Normal file
View File

@ -0,0 +1,29 @@
Hyperliquid Trading Toolkit
This repository contains a collection of utility scripts, data fetchers, resamplers, trading strategies, and a trade executor for working with Hyperliquid trading APIs and crawled data. It is organized to support data collection, transformation, strategy development, and automated execution via agents.
Key components
- Data fetching and management: `data_fetcher.py`, `market.py`, `resampler.py`, `market_cap_fetcher.py`, `list_coins.py`
- Strategies: `strategy_sma_cross.py`, `strategy_template.py`, `strategy_sma_125d.py` (if present)
- Execution: `trade_executor.py`, `create_agent.py`, `agents` helper
- Utilities: `logging_utils.py`, `trade_log.py`
- Data storage: SQLite database in `_data/market_data.db` and JSON files in `_data`
Intended audience
- Developers building strategies and automations on Hyperliquid
- Data engineers collecting and processing market data
- Operators running the fetchers and executors on a scheduler or as system services
Project goals
- Reliable collection of 1m candles and resampling to common timeframes
- Clean separation between data, strategies, and execution
- Lightweight logging and traceable trade records
Where to start
- Read `WIKI/SETUP.md` to prepare your environment
- Use `WIKI/SCRIPTS.md` for a description of individual scripts and how to run them
- Inspect `WIKI/AGENTS.md` to understand agent keys and how to manage them

47
WIKI/SCRIPTS.md Normal file
View File

@ -0,0 +1,47 @@
Scripts and How to Use Them
This file documents the main scripts in the repository and their purpose, typical runtime parameters, and key notes.
list_coins.py
- Purpose: Fetches asset metadata from Hyperliquid (name and size/precision) and saves `_data/coin_precision.json`.
- Usage: `python list_coins.py`
- Notes: Reads `hyperliquid.info.Info` and writes a JSON file. Useful to run before market feeders.
market.py (MarketDataFeeder)
- Purpose: Fetches live prices from Hyperliquid and writes `_data/current_prices.json` while printing a live table.
- Usage: `python market.py --log-level normal`
- Notes: Expects `_data/coin_precision.json` to exist.
data_fetcher.py (CandleFetcherDB)
- Purpose: Fetches historical 1m candles and stores them in `_data/market_data.db` using a table-per-coin naming convention.
- Usage: `python data_fetcher.py --coins BTC ETH --interval 1m --days 7`
- Notes: Can be run regularly by a scheduler to keep the DB up to date.
resampler.py (Resampler)
- Purpose: Reads 1m candles from SQLite and resamples to configured timeframes (e.g. 5m, 15m, 1h), appending new candles to tables.
- Usage: `python resampler.py --coins BTC ETH --timeframes 5m 15m 1h --log-level normal`
market_cap_fetcher.py (MarketCapFetcher)
- Purpose: Pulls CoinGecko market cap numbers and maintains historical daily tables in the same SQLite DB.
- Usage: `python market_cap_fetcher.py --coins BTC ETH --log-level normal`
- Notes: Optional `COINGECKO_API_KEY` in `.env` avoids throttling.
strategy_sma_cross.py (SmaCrossStrategy)
- Purpose: Run an SMA-based trading strategy. Reads candles from `_data/market_data.db` and writes status to `_data/strategy_status_<name>.json`.
- Usage: `python strategy_sma_cross.py --name sma_cross_1 --params '{"coin":"BTC","timeframe":"1m","fast":5,"slow":20}' --log-level normal`
trade_executor.py (TradeExecutor)
- Purpose: Orchestrates agent-based order execution using agent private keys found in environment variables. Uses `_data/strategies.json` to determine active strategies.
- Usage: `python trade_executor.py --log-level normal`
- Notes: Requires `MAIN_WALLET_ADDRESS` and agent keys. See `create_agent.py` to authorize agents on-chain.
create_agent.py
- Purpose: Authorizes a new on-chain agent using your main wallet (requires `MAIN_WALLET_PRIVATE_KEY`).
- Usage: `python create_agent.py`
- Notes: Prints the new agent private key to stdout — save it securely.
trade_log.py
- Purpose: Provides a thread-safe CSV trade history logger. Used by the executor and strategies to record actions.
Other utility scripts
- import_csv.py, fix_timestamps.py, list_coins.py, etc. — see file headers for details.

42
WIKI/SETUP.md Normal file
View File

@ -0,0 +1,42 @@
Setup and Installation
Prerequisites
- Python 3.11+ (project uses modern dependencies)
- Git (optional)
- A Hyperliquid account and an activated main wallet if you want to authorize agents and trade
Virtual environment
1. Create a virtual environment:
python -m venv .venv
2. Activate the virtual environment (PowerShell on Windows):
.\.venv\Scripts\Activate.ps1
3. Upgrade pip and install dependencies:
python -m pip install --upgrade pip
pip install -r requirements.txt
Configuration
- Copy `.env.example` to `.env` and set the following variables as required:
- MAIN_WALLET_PRIVATE_KEY (used by `create_agent.py` to authorize agents)
- MAIN_WALLET_ADDRESS (used by `trade_executor.py`)
- AGENT_PRIVATE_KEY or per-agent keys like `EXECUTOR_SCALPER_AGENT_PK`
- Optional: COINGECKO_API_KEY for `market_cap_fetcher.py` to avoid rate limits
Data directory
- The project writes and reads data from the `_data/` folder. Ensure the directory exists and is writable by the user running the scripts.
Quick test
After installing packages, run `list_coins.py` in a dry run to verify connectivity to the Hyperliquid info API:
python list_coins.py
If you encounter import errors, ensure the virtual environment is active and the `requirements.txt` dependencies are installed.

15
WIKI/SUMMARY.md Normal file
View File

@ -0,0 +1,15 @@
Project Wiki Summary
This directory contains human-friendly documentation for the project. Files:
- `OVERVIEW.md` — High-level overview and where to start
- `SETUP.md` — Environment setup and quick test steps
- `SCRIPTS.md` — Per-script documentation and usage examples
- `AGENTS.md` — How agents work and secure handling of keys
- `DATA.md` — Data folder layout and schema notes
- `DEVELOPMENT.md` — Developer guidance and recommended improvements
- `CONTRIBUTING.md` — How to contribute safely
- `TROUBLESHOOTING.md` — Common problems and solutions
Notes:
- These pages were generated from repository source files and common patterns in trading/data projects. Validate any sensitive information (agent keys) and remove them from the repository when sharing.

21
WIKI/TROUBLESHOOTING.md Normal file
View File

@ -0,0 +1,21 @@
Troubleshooting common issues
1. Import errors
- Ensure the virtual environment is active.
- Run `pip install -r requirements.txt`.
2. Agent authorization failures
- Ensure your main wallet is activated on Hyperliquid and has funds.
- The `create_agent.py` script will print helpful messages if the vault (main wallet) cannot act.
3. SQLite locked errors
- Increase the SQLite timeout when opening connections (this project uses a 10s timeout in fetcher). Close other programs that may hold the DB open.
4. Missing coin precision file
- Run `python list_coins.py` to regenerate `_data/coin_precision.json`.
5. Rate limits from CoinGecko
- Set `COINGECKO_API_KEY` in your `.env` file and ensure the fetcher respects backoff.
6. Agent keys in `agents` file or other local files
- Treat any `agents` file with private keys as compromised; rotate keys and remove the file from the repository.

Binary file not shown.

View File

@ -0,0 +1,27 @@
{
"sma_cross_eth_1m": {
"strategy_name": "sma_cross_1",
"optimization_params": {
"fast": {
"start": 4,
"end": 15,
"step": 1
},
"slow": {
"start": 20,
"end": 60,
"step": 1
}
}
},
"sma_44d_btc": {
"strategy_name": "sma_cross_2",
"optimization_params": {
"sma_period": {
"start": 20,
"end": 250,
"step": 1
}
}
}
}

View File

@ -0,0 +1,231 @@
import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError
import os
import argparse
from datetime import datetime, timedelta
import asyncio
import lz4.frame
from pathlib import Path
import csv
import json
# MUST USE PATHLIB INSTEAD
DIR_PATH = Path(__file__).parent
BUCKET = "hyperliquid-archive"
CSV_HEADER = ["datetime", "timestamp", "level", "price", "size", "number"]
# s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
# s3.download_file('hyperliquid-archive', 'market_data/20230916/9/l2Book/SOL.lz4', f"{dir_path}/SOL.lz4")
# earliest date: 20230415/0/
def get_args():
parser = argparse.ArgumentParser(description="Retrieve historical tick level market data from Hyperliquid exchange")
subparser = parser.add_subparsers(dest="tool", required=True, help="tool: download, decompress, to_csv")
global_parser = subparser.add_parser("global_settings", add_help=False)
global_parser.add_argument("t", metavar="Tickers", help="Tickers of assets to be downloaded seperated by spaces. e.g. BTC ETH", nargs="+")
global_parser.add_argument("--all", help="Apply action to all available dates and times.", action="store_true", default=False)
global_parser.add_argument("--anonymous", help="Use anonymous (unsigned) S3 requests. Defaults to signed requests if not provided.", action="store_true", default=False)
global_parser.add_argument("-sd", metavar="Start date", help="Starting date as one unbroken string formatted: YYYYMMDD. e.g. 20230916")
global_parser.add_argument("-sh", metavar="Start hour", help="Hour of the starting day as an integer between 0 and 23. e.g. 9 Default: 0", type=int, default=0)
global_parser.add_argument("-ed", metavar="End date", help="Ending date as one unbroken string formatted: YYYYMMDD. e.g. 20230916")
global_parser.add_argument("-eh", metavar="End hour", help="Hour of the ending day as an integer between 0 and 23. e.g. 9 Default: 23", type=int, default=23)
download_parser = subparser.add_parser("download", help="Download historical market data", parents=[global_parser])
decompress_parser = subparser.add_parser("decompress", help="Decompress downloaded lz4 data", parents=[global_parser])
to_csv_parser = subparser.add_parser("to_csv", help="Convert decompressed downloads into formatted CSV", parents=[global_parser])
return parser.parse_args()
def make_date_list(start_date, end_date):
start_date = datetime.strptime(start_date, '%Y%m%d')
end_date = datetime.strptime(end_date, '%Y%m%d')
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date.strftime('%Y%m%d'))
current_date += timedelta(days=1)
return date_list
def make_date_hour_list(date_list, start_hour, end_hour, delimiter="/"):
date_hour_list = []
end_date = date_list[-1]
hour = start_hour
end = 23
for date in date_list:
if date == end_date:
end = end_hour
while hour <= end:
date_hour = date + delimiter + str(hour)
date_hour_list.append(date_hour)
hour += 1
hour = 0
return date_hour_list
async def download_object(s3, asset, date_hour):
date_and_hour = date_hour.split("/")
key = f"market_data/{date_hour}/l2Book/{asset}.lz4"
dest = f"{DIR_PATH}/downloads/{asset}/{date_and_hour[0]}-{date_and_hour[1]}.lz4"
try:
s3.download_file(BUCKET, key, dest)
except ClientError as e:
# Print a concise message and continue. Common errors: 403 Forbidden, 404 Not Found.
code = e.response.get('Error', {}).get('Code') if hasattr(e, 'response') else 'Unknown'
print(f"Failed to download {key}: {code} - {e}")
return
async def download_objects(s3, assets, date_hour_list):
print(f"Downloading {len(date_hour_list)} objects...")
for asset in assets:
await asyncio.gather(*[download_object(s3, asset, date_hour) for date_hour in date_hour_list])
async def decompress_file(asset, date_hour):
lz_file_path = DIR_PATH / "downloads" / asset / f"{date_hour}.lz4"
file_path = DIR_PATH / "downloads" / asset / date_hour
if not lz_file_path.is_file():
print(f"decompress_file: file not found: {lz_file_path}")
return
with lz4.frame.open(lz_file_path, mode='r') as lzfile:
data = lzfile.read()
with open(file_path, "wb") as file:
file.write(data)
async def decompress_files(assets, date_hour_list):
print(f"Decompressing {len(date_hour_list)} files...")
for asset in assets:
await asyncio.gather(*[decompress_file(asset, date_hour) for date_hour in date_hour_list])
def write_rows(csv_writer, line):
rows = []
entry = json.loads(line)
date_time = entry["time"]
timestamp = str(entry["raw"]["data"]["time"])
all_orders = entry["raw"]["data"]["levels"]
for i, order_level in enumerate(all_orders):
level = str(i + 1)
for order in order_level:
price = order["px"]
size = order["sz"]
number = str(order["n"])
rows.append([date_time, timestamp, level, price, size, number])
for row in rows:
csv_writer.writerow(row)
async def convert_file(asset, date_hour):
file_path = DIR_PATH / "downloads" / asset / date_hour
csv_path = DIR_PATH / "csv" / asset / f"{date_hour}.csv"
with open(csv_path, "w", newline='') as csv_file:
csv_writer = csv.writer(csv_file, dialect="excel")
csv_writer.writerow(CSV_HEADER)
with open(file_path) as file:
for line in file:
write_rows(csv_writer, line)
async def files_to_csv(assets, date_hour_list):
print(f"Converting {len(date_hour_list)} files to CSV...")
for asset in assets:
await asyncio.gather(*[convert_file(asset, date_hour) for date_hour in date_hour_list])
def main():
print(DIR_PATH)
args = get_args()
# Create S3 client according to whether anonymous access was requested.
if getattr(args, 'anonymous', False):
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
else:
s3 = boto3.client('s3')
downloads_path = DIR_PATH / "downloads"
downloads_path.mkdir(exist_ok=True)
csv_path = DIR_PATH / "csv"
csv_path.mkdir(exist_ok=True)
for asset in args.t:
downloads_asset_path = downloads_path / asset
downloads_asset_path.mkdir(exist_ok=True)
csv_asset_path = csv_path / asset
csv_asset_path.mkdir(exist_ok=True)
date_list = make_date_list(args.sd, args.ed)
loop = asyncio.new_event_loop()
if args.tool == "download":
date_hour_list = make_date_hour_list(date_list, args.sh, args.eh)
loop.run_until_complete(download_objects(s3, args.t, date_hour_list))
loop.close()
if args.tool == "decompress":
date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-")
loop.run_until_complete(decompress_files(args.t, date_hour_list))
loop.close()
if args.tool == "to_csv":
date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-")
loop.run_until_complete(files_to_csv(args.t, date_hour_list))
loop.close()
print("Done")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,8 @@
boto3==1.34.131
botocore==1.34.131
jmespath==1.0.1
lz4==4.3.3
python-dateutil==2.9.0.post0
s3transfer==0.10.1
six==1.16.0
urllib3==2.2.2

View File

@ -0,0 +1,12 @@
{
"sma_cross_2": {
"coin": "BTC",
"side": "short",
"size": 0.0001
},
"sma_cross_1": {
"coin": "ETH",
"side": "short",
"size": 0.0028
}
}

View File

@ -0,0 +1,47 @@
{
"BTC_market_cap": {
"datetime_utc": "2025-10-14 19:07:32",
"market_cap": 2254100854707.6426
},
"ETH_market_cap": {
"datetime_utc": "2025-10-14 19:07:45",
"market_cap": 498260644977.71
},
"SOL_market_cap": {
"datetime_utc": "2025-10-14 19:07:54",
"market_cap": 110493585034.85222
},
"BNB_market_cap": {
"datetime_utc": "2025-10-14 19:08:01",
"market_cap": 169461959349.39044
},
"ZEC_market_cap": {
"datetime_utc": "2025-10-14 19:08:32",
"market_cap": 3915238492.7266335
},
"SUI_market_cap": {
"datetime_utc": "2025-10-14 19:08:51",
"market_cap": 10305847774.680008
},
"STABLECOINS_market_cap": {
"datetime_utc": "2025-10-14 00:00:00",
"market_cap": 551315140796.8396
},
"ASTER_market_cap": {
"datetime_utc": "2025-10-14 20:47:18",
"market_cap": 163953008.77347806
},
"HYPE_market_cap": {
"datetime_utc": "2025-10-14 20:55:21",
"market_cap": 10637373991.458858
},
"TOTAL_market_cap_daily": {
"datetime_utc": "2025-10-16 00:00:00",
"market_cap": 3849619103702.8604
},
"PUMP_market_cap": {
"datetime_utc": "2025-10-14 21:02:30",
"market_cap": 1454398647.593871
},
"summary_last_updated_utc": "2025-10-16T00:16:09.640449+00:00"
}

BIN
_data/market_data.db-shm Normal file

Binary file not shown.

BIN
_data/market_data.db-wal Normal file

Binary file not shown.

53
_data/strategies.json Normal file
View File

@ -0,0 +1,53 @@
{
"sma_cross_1": {
"enabled": true,
"script": "strategy_sma_cross.py",
"agent": "scalper",
"parameters": {
"coin": "ETH",
"timeframe": "5m",
"slow": 44,
"fast": 7,
"size": 0.0028,
"leverage_long": 5,
"leverage_short": 2
}
},
"sma_cross_2": {
"enabled": true,
"script": "strategy_sma_cross.py",
"agent": "swing",
"parameters": {
"coin": "BTC",
"timeframe": "1D",
"slow": 44,
"fast": 0,
"size": 0.0001,
"leverage_long": 2,
"leverage_short": 1
}
},
"sma_125d_btc": {
"enabled": false,
"script": "strategy_template.py",
"agent": "swing_agent",
"parameters": {
"coin": "BTC",
"timeframe": "1D",
"sma_period": 125,
"size": 0.0001
}
},
"sma_44d_btc": {
"enabled": false,
"script": "strategy_template.py",
"agent": "swing_agent",
"parameters": {
"coin": "BTC",
"timeframe": "1D",
"sma_period": 44,
"size": 0.0001
}
}
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "ma_cross_btc",
"current_signal": "HOLD",
"last_signal_change_utc": "2025-10-12T17:00:00+00:00",
"signal_price": 114286.0,
"last_checked_utc": "2025-10-15T11:48:55.092260+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_125d_btc",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-14T00:00:00+00:00",
"signal_price": 113026.0,
"last_checked_utc": "2025-10-16T10:42:03.203292+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_125d_eth",
"current_signal": "BUY",
"last_signal_change_utc": "2025-08-26T00:00:00+00:00",
"signal_price": 4600.63,
"last_checked_utc": "2025-10-15T17:35:17.663159+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_44d_btc",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-14T00:00:00+00:00",
"signal_price": 113026.0,
"last_checked_utc": "2025-10-16T10:42:03.202977+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_5m_eth",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-15T17:30:00+00:00",
"signal_price": 3937.5,
"last_checked_utc": "2025-10-15T17:35:05.035566+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_cross",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-15T11:45:00+00:00",
"signal_price": 111957.0,
"last_checked_utc": "2025-10-15T12:10:05.048434+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_cross_1",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-18T16:19:00+00:00",
"signal_price": 3870.5,
"last_checked_utc": "2025-10-18T16:40:05.039625+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_cross_2",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-14T00:00:00+00:00",
"signal_price": 113026.0,
"last_checked_utc": "2025-10-18T16:40:09.950516+00:00"
}

View File

@ -0,0 +1,7 @@
{
"strategy_name": "sma_cross_eth_5m",
"current_signal": "SELL",
"last_signal_change_utc": "2025-10-15T11:45:00+00:00",
"signal_price": 4106.1,
"last_checked_utc": "2025-10-15T12:05:05.022308+00:00"
}

22
agents
View File

@ -1,3 +1,19 @@
agent 001
wallet: 0x7773833262f020c7979ec8aae38455c17ba4040c
Private Key: 0x659326d719a4322244d6e7f28e7fa2780f034e9f6a342ef1919664817e6248df
==================================================
SAVE THESE SECURELY. This is what your bot will use.
Name: trade_executor
(Agent has a default long-term validity)
🔑 Agent Private Key: 0xabed7379ec33253694eba50af8a392a88ea32b72b5f4f9cddceb0f5879428b69
🏠 Agent Address: 0xcB262CeAaE5D8A99b713f87a43Dd18E6Be892739
==================================================
SAVE THESE SECURELY. This is what your bot will use.
Name: executor_scalper
(Agent has a default long-term validity)
🔑 Agent Private Key: 0xe7bd4f3a1e29252ec40edff1bf796beaf13993d23a0c288a75d79c53e3c97812
🏠 Agent Address: 0xD211ba67162aD4E785cd4894D00A1A7A32843094
==================================================
SAVE THESE SECURELY. This is what your bot will use.
Name: executor_swing
(Agent has a default long-term validity)
🔑 Agent Private Key: 0xb6811c8b4a928556b3b95ccfaf72eb452b0d89a903f251b86955654672a3b6ab
🏠 Agent Address: 0xAD27c936672Fa368c2d96a47FDA34e8e3A0f318C
==================================================

247
backtester.py Normal file
View File

@ -0,0 +1,247 @@
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
import json
from datetime import datetime, timedelta
import itertools
import multiprocessing
from functools import partial
import time
from logging_utils import setup_logging
def _run_single_simulation(df: pd.DataFrame, params: dict) -> list:
"""
Core simulation logic. Takes a DataFrame and parameters, returns a list of trades.
This is a pure function to be used by different data loaders.
"""
fast_ma_period = params.get('fast', 0)
slow_ma_period = params.get('slow', 0)
sma_period = params.get('sma_period', 0)
if fast_ma_period and slow_ma_period:
df['fast_sma'] = df['close'].rolling(window=fast_ma_period).mean()
df['slow_sma'] = df['close'].rolling(window=slow_ma_period).mean()
df['signal'] = (df['fast_sma'] > df['slow_sma']).astype(int)
elif sma_period:
df['sma'] = df['close'].rolling(window=sma_period).mean()
df['signal'] = (df['close'] > df['sma']).astype(int)
else:
return []
df.dropna(inplace=True)
if df.empty: return []
df['position'] = df['signal'].diff()
trades = []
entry_price = 0
for i, row in df.iterrows():
if row['position'] == 1:
if entry_price == 0: # Only enter if flat
entry_price = row['close']
elif row['position'] == -1:
if entry_price != 0: # Only exit if in a position
pnl = (row['close'] - entry_price) / entry_price
trades.append({'pnl_pct': pnl})
entry_price = 0
return trades
def simulation_worker(params: dict, db_path: str, coin: str, timeframe: str, start_date: str, end_date: str) -> tuple[dict, list]:
"""
A worker function for multiprocessing. It loads its own data from the DB
and then runs the simulation, returning the parameters and results together.
"""
df = pd.DataFrame()
try:
with sqlite3.connect(db_path) as conn:
query = f'SELECT datetime_utc, close FROM "{coin}_{timeframe}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
if not df.empty:
df.set_index('datetime_utc', inplace=True)
except Exception as e:
print(f"Worker error loading data for params {params}: {e}")
return (params, [])
if df.empty:
return (params, [])
trades = _run_single_simulation(df, params)
return (params, trades)
class Backtester:
"""
A class to run a Walk-Forward Optimization, which is the gold standard
for testing the robustness of a trading strategy.
"""
def __init__(self, log_level: str, strategy_name_to_test: str):
setup_logging(log_level, 'Backtester')
self.db_path = os.path.join("_data", "market_data.db")
self.backtest_config = self._load_backtest_config(strategy_name_to_test)
if not self.backtest_config:
logging.error(f"Backtest configuration for '{strategy_name_to_test}' not found.")
sys.exit(1)
self.strategy_name = self.backtest_config.get('strategy_name')
self.strategy_config = self._load_strategy_config()
if not self.strategy_config:
logging.error(f"Strategy '{self.strategy_name}' not found.")
sys.exit(1)
self.params = self.strategy_config.get('parameters', {})
self.coin = self.params.get('coin')
self.timeframe = self.params.get('timeframe')
self.pool = None
def _load_backtest_config(self, name_to_test: str) -> dict:
config_path = os.path.join("_data", "backtesting_conf.json")
try:
with open(config_path, 'r') as f: return json.load(f).get(name_to_test)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load backtesting configuration: {e}")
return None
def _load_strategy_config(self) -> dict:
config_path = os.path.join("_data", "strategies.json")
try:
with open(config_path, 'r') as f: return json.load(f).get(self.strategy_name)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategy configuration: {e}")
return None
def run_walk_forward_optimization(self, num_periods=10, in_sample_pct=0.9):
"""
Main function to orchestrate the walk-forward analysis.
"""
full_df = self.load_data("2020-01-01", datetime.now().strftime("%Y-%m-%d"))
if full_df.empty: return
period_length = len(full_df) // num_periods
all_out_of_sample_trades = []
for i in range(num_periods):
logging.info(f"\n--- Starting Walk-Forward Period {i+1}/{num_periods} ---")
# 1. Define the In-Sample (training) and Out-of-Sample (testing) periods
start_index = i * period_length
in_sample_end_index = start_index + int(period_length * in_sample_pct)
out_of_sample_end_index = start_index + period_length
if in_sample_end_index >= len(full_df) or out_of_sample_end_index > len(full_df):
logging.warning("Not enough data for the full final period. Ending analysis.")
break
in_sample_df = full_df.iloc[start_index:in_sample_end_index]
out_of_sample_df = full_df.iloc[in_sample_end_index:out_of_sample_end_index]
logging.info(f"In-Sample: {in_sample_df.index[0].date()} to {in_sample_df.index[-1].date()}")
logging.info(f"Out-of-Sample: {out_of_sample_df.index[0].date()} to {out_of_sample_df.index[-1].date()}")
# 2. Find the best parameters on the In-Sample data
best_params = self._find_best_params(in_sample_df)
if not best_params:
logging.warning("No profitable parameters found in this period. Skipping.")
continue
# 3. Test the best parameters on the Out-of-Sample data
logging.info(f"Testing best params {best_params} on Out-of-Sample data...")
out_of_sample_trades = _run_single_simulation(out_of_sample_df.copy(), best_params)
all_out_of_sample_trades.extend(out_of_sample_trades)
self._generate_report(out_of_sample_trades, f"Period {i+1} Out-of-Sample Results")
# 4. Generate a final report for all combined out-of-sample trades
print("\n" + "="*50)
self._generate_report(all_out_of_sample_trades, "AGGREGATE WALK-FORWARD PERFORMANCE")
print("="*50)
def _find_best_params(self, df: pd.DataFrame) -> dict:
"""Runs a multi-core optimization on a given slice of data."""
param_configs = self.backtest_config.get('optimization_params', {})
param_names = list(param_configs.keys())
param_ranges = [range(p['start'], p['end'] + 1, p['step']) for p in param_configs.values()]
all_combinations = list(itertools.product(*param_ranges))
param_dicts = [dict(zip(param_names, combo)) for combo in all_combinations]
logging.info(f"Optimizing on {len(all_combinations)} combinations...")
num_cores = 60
self.pool = multiprocessing.Pool(processes=num_cores)
worker = partial(_run_single_simulation, df.copy())
all_trades_results = self.pool.map(worker, param_dicts)
self.pool.close()
self.pool.join()
self.pool = None
results = []
for i, trades in enumerate(all_trades_results):
if trades:
results.append({'params': param_dicts[i], 'pnl': sum(t['pnl_pct'] for t in trades)})
if not results: return None
return max(results, key=lambda x: x['pnl'])['params']
def load_data(self, start_date, end_date):
# This is a simplified version for the main data load
table_name = f"{self.coin}_{self.timeframe}"
logging.info(f"Loading full dataset for {table_name}...")
try:
with sqlite3.connect(self.db_path) as conn:
query = f'SELECT * FROM "{table_name}" WHERE date(datetime_utc) >= ? AND date(datetime_utc) <= ? ORDER BY datetime_utc'
df = pd.read_sql(query, conn, params=(start_date, end_date), parse_dates=['datetime_utc'])
if df.empty:
logging.warning("No data found for the specified date range.")
return pd.DataFrame()
df.set_index('datetime_utc', inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data for backtest: {e}")
return pd.DataFrame()
def _generate_report(self, trades: list, title: str):
"""Calculates and prints key performance metrics."""
print(f"\n--- {title} ---")
if not trades:
print("No trades were executed during this period.")
return
num_trades = len(trades)
wins = [t for t in trades if t['pnl_pct'] > 0]
total_pnl = sum(t['pnl_pct'] for t in trades)
print(f"Total Trades: {num_trades}")
print(f"Win Rate: {(len(wins) / num_trades) * 100 if num_trades > 0 else 0:.2f}%")
print(f"Total PNL (Cumulative %): {total_pnl * 100:.2f}%")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a Walk-Forward Optimization for a trading strategy.")
parser.add_argument("--strategy", required=True, help="The name of the backtest config to run (from backtesting_conf.json).")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
backtester = Backtester(
log_level=args.log_level,
strategy_name_to_test=args.strategy
)
try:
backtester.run_walk_forward_optimization()
except KeyboardInterrupt:
logging.info("\nWalk-Forward Optimization cancelled by user.")
finally:
if backtester.pool:
logging.info("Terminating worker processes...")
backtester.pool.terminate()
backtester.pool.join()
logging.info("Worker processes terminated.")

70
create_agent.py Normal file
View File

@ -0,0 +1,70 @@
import os
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants
from dotenv import load_dotenv
from datetime import datetime, timedelta
import json
# Load environment variables from a .env file if it exists
load_dotenv()
def create_and_authorize_agent():
"""
Creates and authorizes a new agent key pair using your main wallet,
following the correct SDK pattern.
"""
# --- STEP 1: Load your main wallet ---
# This is the wallet that holds the funds and has been activated on Hyperliquid.
main_wallet_private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY")
if not main_wallet_private_key:
main_wallet_private_key = input("Please enter the private key of your MAIN trading wallet: ")
try:
main_account = Account.from_key(main_wallet_private_key)
print(f"\n✅ Loaded main wallet: {main_account.address}")
except Exception as e:
print(f"❌ Error: Invalid main wallet private key provided. Details: {e}")
return
# --- STEP 2: Initialize the Exchange with your MAIN account ---
# This object is used to send the authorization transaction.
exchange = Exchange(main_account, constants.MAINNET_API_URL, account_address=main_account.address)
# --- STEP 3: Create and approve the agent with a specific name ---
# agent name must be between 1 and 16 characters long
agent_name = "executor_swing"
print(f"\n🔗 Authorizing a new agent named '{agent_name}'...")
try:
# --- FIX: Pass only the agent name string to the function ---
approve_result, agent_private_key = exchange.approve_agent(agent_name)
if approve_result.get("status") == "ok":
# Derive the agent's public address from the key we received
agent_account = Account.from_key(agent_private_key)
print("\n🎉 SUCCESS! Agent has been authorized on-chain.")
print("="*50)
print("SAVE THESE SECURELY. This is what your bot will use.")
print(f" Name: {agent_name}")
print(f" (Agent has a default long-term validity)")
print(f"🔑 Agent Private Key: {agent_private_key}")
print(f"🏠 Agent Address: {agent_account.address}")
print("="*50)
print("\nYou can now set this private key as the AGENT_PRIVATE_KEY environment variable.")
else:
print("\n❌ ERROR: Agent authorization failed.")
print(" Response:", approve_result)
if "Vault may not perform this action" in str(approve_result):
print("\n ACTION REQUIRED: This error means your main wallet (vault) has not been activated. "
"Please go to the Hyperliquid website, connect this wallet, and make a deposit to activate it.")
except Exception as e:
print(f"\nAn unexpected error occurred during authorization: {e}")
if __name__ == "__main__":
create_and_authorize_agent()

118
fix_timestamps.py Normal file
View File

@ -0,0 +1,118 @@
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
# script to fix missing millisecond timestamps in the database after import from CSVs (this is already fixed in import_csv.py)
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
class DatabaseFixer:
"""
Scans the SQLite database for rows with missing millisecond timestamps
and updates them based on the datetime_utc column.
"""
def __init__(self, log_level: str, coin: str):
setup_logging(log_level, 'TimestampFixer')
self.coin = coin
self.table_name = f"{self.coin}_1m"
self.db_path = os.path.join("_data", "market_data.db")
def run(self):
"""Orchestrates the entire database update and verification process."""
logging.info(f"Starting timestamp fix process for table '{self.table_name}'...")
if not os.path.exists(self.db_path):
logging.error(f"Database file not found at '{self.db_path}'. Exiting.")
sys.exit(1)
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL;")
# 1. Check how many rows need fixing
rows_to_fix_count = self._count_rows_to_fix(conn)
if rows_to_fix_count == 0:
logging.info(f"No rows with missing timestamps found in '{self.table_name}'. No action needed.")
return
logging.info(f"Found {rows_to_fix_count:,} rows with missing timestamps to update.")
# 2. Process the table in chunks to conserve memory
updated_count = self._process_in_chunks(conn)
# 3. Provide a final summary
self._summarize_update(rows_to_fix_count, updated_count)
except Exception as e:
logging.error(f"A critical error occurred: {e}")
def _count_rows_to_fix(self, conn) -> int:
"""Counts the number of rows where timestamp_ms is NULL."""
try:
return pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}" WHERE timestamp_ms IS NULL', conn).iloc[0, 0]
except pd.io.sql.DatabaseError:
logging.error(f"Table '{self.table_name}' not found in the database. Cannot fix timestamps.")
sys.exit(1)
def _process_in_chunks(self, conn) -> int:
"""Reads, calculates, and updates timestamps in manageable chunks."""
total_updated = 0
chunk_size = 50000 # Process 50,000 rows at a time
# We select the special 'rowid' column to uniquely identify each row for updating
query = f'SELECT rowid, datetime_utc FROM "{self.table_name}" WHERE timestamp_ms IS NULL'
for chunk_df in pd.read_sql_query(query, conn, chunksize=chunk_size):
if chunk_df.empty:
break
logging.info(f"Processing a chunk of {len(chunk_df)} rows...")
# Calculate the missing timestamps
chunk_df['datetime_utc'] = pd.to_datetime(chunk_df['datetime_utc'])
chunk_df['timestamp_ms'] = (chunk_df['datetime_utc'].astype('int64') // 10**6)
# Prepare data for the update command: a list of (timestamp, rowid) tuples
update_data = list(zip(chunk_df['timestamp_ms'], chunk_df['rowid']))
# Use executemany for a fast bulk update
cursor = conn.cursor()
cursor.executemany(f'UPDATE "{self.table_name}" SET timestamp_ms = ? WHERE rowid = ?', update_data)
conn.commit()
total_updated += len(chunk_df)
logging.info(f"Updated {total_updated} rows so far...")
return total_updated
def _summarize_update(self, expected_count: int, actual_count: int):
"""Prints a final summary of the update process."""
logging.info("--- Timestamp Fix Summary ---")
print(f"\n{'Status':<25}: COMPLETE")
print("-" * 40)
print(f"{'Table Processed':<25}: {self.table_name}")
print(f"{'Rows Needing Update':<25}: {expected_count:,}")
print(f"{'Rows Successfully Updated':<25}: {actual_count:,}")
if expected_count == actual_count:
logging.info("Verification successful: All necessary rows have been updated.")
else:
logging.warning("Verification warning: The number of updated rows does not match the expected count.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fix missing millisecond timestamps in the SQLite database.")
parser.add_argument("--coin", default="BTC", help="The coin symbol for the table to fix (e.g., BTC).")
parser.add_argument(
"--log-level",
default="normal",
choices=['off', 'normal', 'debug'],
help="Set the logging level for the script."
)
args = parser.parse_args()
fixer = DatabaseFixer(log_level=args.log_level, coin=args.coin)
fixer.run()

154
import_csv.py Normal file
View File

@ -0,0 +1,154 @@
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
from datetime import datetime
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
class CsvImporter:
"""
Imports historical candle data from a large CSV file into the SQLite database,
intelligently adding only the missing data.
"""
def __init__(self, log_level: str, csv_path: str, coin: str):
setup_logging(log_level, 'CsvImporter')
if not os.path.exists(csv_path):
logging.error(f"CSV file not found at '{csv_path}'. Please check the path.")
sys.exit(1)
self.csv_path = csv_path
self.coin = coin
# --- FIX: Corrected the f-string syntax for the table name ---
self.table_name = f"{self.coin}_1m"
self.db_path = os.path.join("_data", "market_data.db")
self.column_mapping = {
'Open time': 'datetime_utc',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
'Number of trades': 'number_of_trades'
}
def run(self):
"""Orchestrates the entire import and verification process."""
logging.info(f"Starting import process for '{self.coin}' from '{self.csv_path}'...")
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL;")
# 1. Get the current state of the database
db_oldest, db_newest, initial_row_count = self._get_db_state(conn)
# 2. Read, clean, and filter the CSV data
new_data_df = self._process_and_filter_csv(db_oldest, db_newest)
if new_data_df.empty:
logging.info("No new data to import. Database is already up-to-date with the CSV file.")
return
# 3. Append the new data to the database
self._append_to_db(new_data_df, conn)
# 4. Summarize and verify the import
self._summarize_import(initial_row_count, len(new_data_df), conn)
def _get_db_state(self, conn) -> (datetime, datetime, int):
"""Gets the oldest and newest timestamps and total row count from the DB table."""
try:
oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]
oldest_dt = pd.to_datetime(oldest) if oldest else None
newest_dt = pd.to_datetime(newest) if newest else None
if oldest_dt:
logging.info(f"Database contains data from {oldest_dt} to {newest_dt}.")
else:
logging.info("Database table is empty. A full import will be performed.")
return oldest_dt, newest_dt, count
except pd.io.sql.DatabaseError:
logging.info(f"Table '{self.table_name}' not found. It will be created.")
return None, None, 0
def _process_and_filter_csv(self, db_oldest: datetime, db_newest: datetime) -> pd.DataFrame:
"""Reads the CSV and returns a DataFrame of only the missing data."""
logging.info("Reading and processing CSV file. This may take a moment for large files...")
df = pd.read_csv(self.csv_path, usecols=self.column_mapping.keys())
# Clean and format the data
df.rename(columns=self.column_mapping, inplace=True)
df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
# --- FIX: Calculate the millisecond timestamp from the datetime column ---
# This converts the datetime to nanoseconds and then to milliseconds.
df['timestamp_ms'] = (df['datetime_utc'].astype('int64') // 10**6)
# Filter the data to find only rows that are outside the range of what's already in the DB
if db_oldest and db_newest:
# Get data from before the oldest record and after the newest record
df_filtered = df[(df['datetime_utc'] < db_oldest) | (df['datetime_utc'] > db_newest)]
else:
# If the DB is empty, all data is new
df_filtered = df
logging.info(f"Found {len(df_filtered):,} new rows to import.")
return df_filtered
def _append_to_db(self, df: pd.DataFrame, conn):
"""Appends the DataFrame to the SQLite table."""
logging.info(f"Appending {len(df):,} new rows to the database...")
df.to_sql(self.table_name, conn, if_exists='append', index=False)
logging.info("Append operation complete.")
def _summarize_import(self, initial_count: int, added_count: int, conn):
"""Prints a final summary and verification of the import."""
logging.info("--- Import Summary & Verification ---")
try:
final_count = pd.read_sql(f'SELECT COUNT(*) FROM "{self.table_name}"', conn).iloc[0, 0]
new_oldest = pd.read_sql(f'SELECT MIN(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
new_newest = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{self.table_name}"', conn).iloc[0, 0]
print(f"\n{'Status':<20}: SUCCESS")
print("-" * 40)
print(f"{'Initial Row Count':<20}: {initial_count:,}")
print(f"{'Rows Added':<20}: {added_count:,}")
print(f"{'Final Row Count':<20}: {final_count:,}")
print("-" * 40)
print(f"{'New Oldest Record':<20}: {new_oldest}")
print(f"{'New Newest Record':<20}: {new_newest}")
# Verification check
if final_count == initial_count + added_count:
logging.info("Verification successful: Final row count matches expected count.")
else:
logging.warning("Verification warning: Final row count does not match expected count.")
except Exception as e:
logging.error(f"Could not generate summary. Error: {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Import historical CSV data into the SQLite database.")
parser.add_argument("--file", required=True, help="Path to the large CSV file to import.")
parser.add_argument("--coin", default="BTC", help="The coin symbol for this data (e.g., BTC).")
parser.add_argument(
"--log-level",
default="normal",
choices=['off', 'normal', 'debug'],
help="Set the logging level for the script."
)
args = parser.parse_args()
importer = CsvImporter(log_level=args.log_level, csv_path=args.file, coin=args.coin)
importer.run()

View File

@ -1,5 +1,29 @@
import logging
import sys
from datetime import datetime
class LocalTimeFormatter(logging.Formatter):
"""
Custom formatter to display time with milliseconds and a (UTC+HH) offset.
"""
def formatTime(self, record, datefmt=None):
# Convert log record's creation time to a local, timezone-aware datetime object
dt = datetime.fromtimestamp(record.created).astimezone()
# Format the main time part
time_part = dt.strftime('%Y-%m-%d %H:%M:%S')
# Get the UTC offset and format it as (UTC+HH)
offset = dt.utcoffset()
offset_str = ""
if offset is not None:
offset_hours = int(offset.total_seconds() / 3600)
sign = '+' if offset_hours >= 0 else ''
offset_str = f" (UTC{sign}{offset_hours})"
# --- FIX: Cast record.msecs from float to int before formatting ---
# Combine time, milliseconds, and the offset string
return f"{time_part},{int(record.msecs):03d}{offset_str}"
def setup_logging(log_level: str, process_name: str):
"""
@ -29,10 +53,9 @@ def setup_logging(log_level: str, process_name: str):
handler = logging.StreamHandler(sys.stdout)
# --- FIX: Added a date format that includes the timezone name (%Z) ---
formatter = logging.Formatter(
f'%(asctime)s - {process_name} - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S %Z'
# This will produce timestamps like: 2025-10-13 14:30:00,123 (UTC+2)
formatter = LocalTimeFormatter(
f'%(asctime)s - {process_name} - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)

View File

@ -17,146 +17,336 @@ WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SU
COIN_LISTER_SCRIPT = "list_coins.py"
MARKET_FEEDER_SCRIPT = "market.py"
DATA_FETCHER_SCRIPT = "data_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py" # Restored resampler script
RESAMPLER_SCRIPT = "resampler.py"
MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
PRICE_DATA_FILE = os.path.join("_data", "current_prices.json")
DB_PATH = os.path.join("_data", "market_data.db")
STATUS_FILE = os.path.join("_data", "fetcher_status.json")
MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
LOGS_DIR = "_logs"
TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")
def format_market_cap(mc_value):
"""Formats a large number into a human-readable market cap string."""
if not isinstance(mc_value, (int, float)) or mc_value == 0:
return "N/A"
if mc_value >= 1_000_000_000_000:
return f"${mc_value / 1_000_000_000_000:.2f}T"
if mc_value >= 1_000_000_000:
return f"${mc_value / 1_000_000_000:.2f}B"
if mc_value >= 1_000_000:
return f"${mc_value / 1_000_000:.2f}M"
return f"${mc_value:,.2f}"
def run_market_feeder():
"""Target function to run the market.py script in a separate process."""
setup_logging('off', 'MarketFeedProcess')
logging.info("Market feeder process started.")
try:
# Pass the log level to the script
subprocess.run([sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"], check=True)
except subprocess.CalledProcessError as e:
logging.error(f"Market feeder script failed with error: {e}")
except KeyboardInterrupt:
logging.info("Market feeder process stopping.")
"""Target function to run market.py and redirect its output to a log file."""
log_file = os.path.join(LOGS_DIR, "market_feeder.log")
while True:
try:
with open(log_file, 'a') as f:
subprocess.run(
[sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"],
check=True, stdout=f, stderr=subprocess.STDOUT
)
except (subprocess.CalledProcessError, Exception) as e:
with open(log_file, 'a') as f:
f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
f.write(f"Market feeder script failed: {e}. Restarting...\n")
time.sleep(5)
def run_data_fetcher_job():
"""Defines the job to be run by the scheduler for the data fetcher."""
logging.info(f"Scheduler starting data_fetcher.py task for {', '.join(WATCHED_COINS)}...")
"""Defines the job for the data fetcher, redirecting output to a log file."""
log_file = os.path.join(LOGS_DIR, "data_fetcher.log")
try:
command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "off"]
subprocess.run(command, check=True)
logging.info("data_fetcher.py task finished successfully.")
with open(log_file, 'a') as f:
f.write(f"\n--- Starting data_fetcher.py job at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except Exception as e:
logging.error(f"Failed to run data_fetcher.py job: {e}")
with open(log_file, 'a') as f:
f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
f.write(f"Failed to run data_fetcher.py job: {e}\n")
def data_fetcher_scheduler():
"""Schedules and runs the data_fetcher.py script periodically."""
"""Schedules the data_fetcher.py script."""
setup_logging('off', 'DataFetcherScheduler')
run_data_fetcher_job()
schedule.every(1).minutes.do(run_data_fetcher_job)
logging.info("Data fetcher scheduled to run every 1 minute.")
while True:
schedule.run_pending()
time.sleep(1)
# --- Restored Resampler Functions ---
def run_resampler_job():
"""Defines the job to be run by the scheduler for the resampler."""
logging.info(f"Scheduler starting resampler.py task for {', '.join(WATCHED_COINS)}...")
def run_resampler_job(timeframes_to_generate: list):
"""Defines the job for the resampler, redirecting output to a log file."""
log_file = os.path.join(LOGS_DIR, "resampler.log")
try:
# Uses default timeframes configured within resampler.py
command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
subprocess.run(command, check=True)
logging.info("resampler.py task finished successfully.")
command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--timeframes"] + timeframes_to_generate + ["--log-level", "off"]
with open(log_file, 'a') as f:
f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except Exception as e:
logging.error(f"Failed to run resampler.py job: {e}")
with open(log_file, 'a') as f:
f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
f.write(f"Failed to run resampler.py job: {e}\n")
def resampler_scheduler():
"""Schedules and runs the resampler.py script periodically."""
def resampler_scheduler(timeframes_to_generate: list):
"""Schedules the resampler.py script."""
setup_logging('off', 'ResamplerScheduler')
run_resampler_job()
schedule.every(4).minutes.do(run_resampler_job)
logging.info("Resampler scheduled to run every 4 minutes.")
run_resampler_job(timeframes_to_generate)
schedule.every(4).minutes.do(run_resampler_job, timeframes_to_generate)
while True:
schedule.run_pending()
time.sleep(1)
# --- End of Restored Functions ---
def run_market_cap_fetcher_job():
"""Defines the job for the market cap fetcher, redirecting output."""
log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log")
try:
command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"]
with open(log_file, 'a') as f:
f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except Exception as e:
with open(log_file, 'a') as f:
f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
f.write(f"Failed to run {MARKET_CAP_FETCHER_SCRIPT} job: {e}\n")
def market_cap_fetcher_scheduler():
"""Schedules the market_cap_fetcher.py script to run daily at a specific UTC time."""
setup_logging('off', 'MarketCapScheduler')
schedule.every().day.at("00:15", "UTC").do(run_market_cap_fetcher_job)
while True:
schedule.run_pending()
time.sleep(60)
def run_strategy(strategy_name: str, config: dict):
"""Target function to run a strategy, redirecting its output to a log file."""
log_file = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log")
script_name = config['script']
params_str = json.dumps(config['parameters'])
command = [sys.executable, script_name, "--name", strategy_name, "--params", params_str, "--log-level", "normal"]
while True:
try:
with open(log_file, 'a') as f:
f.write(f"\n--- Starting strategy '{strategy_name}' at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except (subprocess.CalledProcessError, Exception) as e:
with open(log_file, 'a') as f:
f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n")
f.write(f"Strategy '{strategy_name}' failed: {e}. Restarting...\n")
time.sleep(10)
class MainApp:
def __init__(self, coins_to_watch: list):
def __init__(self, coins_to_watch: list, processes: dict, strategy_configs: dict):
self.watched_coins = coins_to_watch
self.prices = {}
self.market_caps = {}
self.last_db_update_info = "Initializing..."
self._lines_printed = 0 # To track how many lines we printed last time
self.open_positions = {}
self.background_processes = processes
self.process_status = {}
self.strategy_configs = strategy_configs
self.strategy_statuses = {}
def read_prices(self):
"""Reads the latest prices from the JSON file."""
if not os.path.exists(PRICE_DATA_FILE):
return
try:
with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f:
self.prices = json.load(f)
except (json.JSONDecodeError, IOError):
logging.debug("Could not read price file (might be locked).")
if os.path.exists(PRICE_DATA_FILE):
try:
with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f:
self.prices = json.load(f)
except (json.JSONDecodeError, IOError):
logging.debug("Could not read price file.")
def read_market_caps(self):
"""Reads the latest market cap summary from its JSON file."""
if os.path.exists(MARKET_CAP_SUMMARY_FILE):
try:
with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
for coin in self.watched_coins:
table_key = f"{coin}_market_cap"
if table_key in summary_data:
self.market_caps[coin] = summary_data[table_key].get('market_cap')
except (json.JSONDecodeError, IOError):
logging.debug("Could not read market cap summary file.")
def read_strategy_statuses(self):
"""Reads the status JSON file for each enabled strategy."""
enabled_statuses = {}
for name, config in self.strategy_configs.items():
if config.get("enabled", False):
status_file = os.path.join("_data", f"strategy_status_{name}.json")
if os.path.exists(status_file):
try:
with open(status_file, 'r', encoding='utf-8') as f:
enabled_statuses[name] = json.load(f)
except (IOError, json.JSONDecodeError):
enabled_statuses[name] = {"error": "Could not read status file."}
else:
enabled_statuses[name] = {"current_signal": "Initializing..."}
self.strategy_statuses = enabled_statuses
def read_executor_status(self):
"""Reads the live status file from the trade executor."""
if os.path.exists(TRADE_EXECUTOR_STATUS_FILE):
try:
with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f:
self.open_positions = json.load(f)
except (IOError, json.JSONDecodeError):
logging.debug("Could not read trade executor status file.")
else:
self.open_positions = {}
def get_overall_db_status(self):
"""Reads the fetcher status from the status file."""
if not os.path.exists(STATUS_FILE):
self.last_db_update_info = "Status file not found."
return
try:
with open(STATUS_FILE, 'r', encoding='utf-8') as f:
status = json.load(f)
coin = status.get("last_updated_coin")
timestamp_utc_str = status.get("last_run_timestamp_utc")
num_candles = status.get("num_updated_candles", 0)
if os.path.exists(STATUS_FILE):
try:
with open(STATUS_FILE, 'r', encoding='utf-8') as f:
status = json.load(f)
coin = status.get("last_updated_coin")
timestamp_utc_str = status.get("last_run_timestamp_utc")
num_candles = status.get("num_updated_candles", 0)
if timestamp_utc_str:
dt_utc = datetime.fromisoformat(timestamp_utc_str.replace('Z', '+00:00')).replace(tzinfo=timezone.utc)
dt_local = dt_utc.astimezone(None)
offset = dt_local.utcoffset()
offset_hours = int(offset.total_seconds() / 3600)
sign = '+' if offset_hours >= 0 else ''
offset_str = f"UTC{sign}{offset_hours}"
timestamp_display = f"{dt_local.strftime('%Y-%m-%d %H:%M:%S')} {offset_str}"
else:
timestamp_display = "N/A"
self.last_db_update_info = f"{coin} at {timestamp_display} | {num_candles} candles"
except (IOError, json.JSONDecodeError):
self.last_db_update_info = "Error reading status file."
if timestamp_utc_str:
dt_naive = datetime.strptime(timestamp_utc_str, '%Y-%m-%d %H:%M:%S')
dt_utc = dt_naive.replace(tzinfo=timezone.utc)
dt_local = dt_utc.astimezone(None)
timestamp_display = dt_local.strftime('%Y-%m-%d %H:%M:%S %Z')
else:
timestamp_display = "N/A"
self.last_db_update_info = f"{coin} at {timestamp_display} ({num_candles} candles)"
except (IOError, json.JSONDecodeError) as e:
self.last_db_update_info = "Error reading status file."
logging.error(f"Could not read status file: {e}")
def check_process_status(self):
"""Checks if the background processes are still running."""
for name, process in self.background_processes.items():
self.process_status[name] = "Running" if process.is_alive() else "STOPPED"
def display_dashboard(self):
"""Displays a formatted table for prices and DB status without blinking."""
# Move the cursor up to overwrite the previous output
if self._lines_printed > 0:
print(f"\x1b[{self._lines_printed}A", end="")
# Build the output as a single string
output_lines = []
output_lines.append("--- Market Dashboard ---")
table_width = 26
output_lines.append("-" * table_width)
output_lines.append(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} |")
output_lines.append("-" * table_width)
"""Displays a formatted dashboard with side-by-side tables."""
print("\x1b[H\x1b[J", end="") # Clear screen
left_table_lines = []
left_table_width = 44
left_table_lines.append("--- Market Dashboard ---")
left_table_lines.append("-" * left_table_width)
left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |")
left_table_lines.append("-" * left_table_width)
for i, coin in enumerate(self.watched_coins, 1):
price = self.prices.get(coin, "Loading...")
output_lines.append(f"{i:<2} | {coin:<6} | {price:>10} |")
output_lines.append("-" * table_width)
output_lines.append(f"DB Status: Last coin updated -> {self.last_db_update_info}")
market_cap = self.market_caps.get(coin)
formatted_mc = format_market_cap(market_cap)
left_table_lines.append(f"{i:<2} | {coin:^6} | {price:>10} | {formatted_mc:>15} |")
left_table_lines.append("-" * left_table_width)
right_table_lines = []
right_table_width = 154
right_table_lines.append("--- Strategy Status ---")
right_table_lines.append("-" * right_table_width)
right_table_lines.append(f"{'#':^2} | {'Strategy Name':<25} | {'Coin':^6} | {'Signal':^8} | {'Signal Price':>12} | {'Last Change':>17} | {'TF':^5} | {'Size':^8} | {'Parameters':<45} |")
right_table_lines.append("-" * right_table_width)
for i, (name, status) in enumerate(self.strategy_statuses.items(), 1):
signal = status.get('current_signal', 'N/A')
price = status.get('signal_price')
price_display = f"{price:.4f}" if isinstance(price, (int, float)) else "-"
last_change = status.get('last_signal_change_utc')
last_change_display = 'Never'
if last_change:
dt_utc = datetime.fromisoformat(last_change.replace('Z', '+00:00')).replace(tzinfo=timezone.utc)
dt_local = dt_utc.astimezone(None)
last_change_display = dt_local.strftime('%Y-%m-%d %H:%M')
config_params = self.strategy_configs.get(name, {}).get('parameters', {})
coin = config_params.get('coin', 'N/A')
timeframe = config_params.get('timeframe', 'N/A')
size = config_params.get('size', 'N/A')
other_params = {k: v for k, v in config_params.items() if k not in ['coin', 'timeframe', 'size']}
params_str = ", ".join([f"{k}={v}" for k, v in other_params.items()])
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} | {params_str:<45} |")
right_table_lines.append("-" * right_table_width)
# Join lines and add a code to clear from cursor to end of screen
# This prevents artifacts if the new output is shorter than the old one.
final_output = "\n".join(output_lines) + "\n\x1b[J"
print(final_output, end="")
# Store the number of lines printed for the next iteration
self._lines_printed = len(output_lines)
output_lines = []
max_rows = max(len(left_table_lines), len(right_table_lines))
separator = " "
indent = " " * 10
for i in range(max_rows):
left_part = left_table_lines[i] if i < len(left_table_lines) else " " * left_table_width
right_part = indent + right_table_lines[i] if i < len(right_table_lines) else ""
output_lines.append(f"{left_part}{separator}{right_part}")
output_lines.append(f"\nDB Status: Last update -> {self.last_db_update_info}")
output_lines.append("\n--- Open Positions ---")
pos_table_width = 100
output_lines.append("-" * pos_table_width)
output_lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |")
output_lines.append("-" * pos_table_width)
perps_positions = self.open_positions.get('perpetuals_account', {}).get('open_positions', [])
spot_positions = self.open_positions.get('spot_account', {}).get('positions', [])
if not perps_positions and not spot_positions:
output_lines.append("No open positions found.")
else:
for pos in perps_positions:
# --- FIX: Safely handle potentially None values before formatting ---
try:
pnl = float(pos.get('pnl', 0.0))
pnl_str = f"${pnl:,.2f}"
except (ValueError, TypeError):
pnl_str = "Error"
coin = pos.get('coin') or '-'
size = pos.get('size') or '-'
entry_price = pos.get('entry_price') or '-'
mark_price = pos.get('mark_price') or '-'
leverage = pos.get('leverage') or '-'
output_lines.append(f"{'Perps':<10} | {coin:<6} | {size:>15} | {entry_price:>12} | {mark_price:>12} | {pnl_str:>15} | {leverage:>10} |")
for pos in spot_positions:
pnl = pos.get('pnl', 'N/A')
coin = pos.get('coin') or '-'
balance_size = pos.get('balance_size') or '-'
output_lines.append(f"{'Spot':<10} | {coin:<6} | {balance_size:>15} | {'-':>12} | {'-':>12} | {pnl:>15} | {'-':>10} |")
output_lines.append("-" * pos_table_width)
output_lines.append("\n--- Background Processes ---")
for name, status in self.process_status.items():
output_lines.append(f"{name:<25}: {status}")
final_output = "\n".join(output_lines)
print(final_output)
sys.stdout.flush()
def run(self):
"""Main loop to read and display data."""
"""Main loop to read data, display dashboard, and check processes."""
while True:
self.read_prices()
self.read_market_caps()
self.get_overall_db_status()
self.read_strategy_statuses()
self.read_executor_status()
self.check_process_status()
self.display_dashboard()
time.sleep(2)
@ -164,6 +354,9 @@ class MainApp:
if __name__ == "__main__":
setup_logging('normal', 'MainApp')
if not os.path.exists(LOGS_DIR):
os.makedirs(LOGS_DIR)
logging.info(f"Running coin lister: '{COIN_LISTER_SCRIPT}'...")
try:
subprocess.run([sys.executable, COIN_LISTER_SCRIPT], check=True, capture_output=True, text=True)
@ -171,35 +364,55 @@ if __name__ == "__main__":
logging.error(f"Failed to run '{COIN_LISTER_SCRIPT}'. Error: {e.stderr}")
sys.exit(1)
logging.info(f"Starting market feeder ('{MARKET_FEEDER_SCRIPT}')...")
market_process = multiprocessing.Process(target=run_market_feeder, daemon=True)
market_process.start()
processes = {}
strategy_configs = {}
logging.info(f"Starting historical data fetcher ('{DATA_FETCHER_SCRIPT}')...")
fetcher_process = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True)
fetcher_process.start()
try:
with open(STRATEGY_CONFIG_FILE, 'r') as f:
strategy_configs = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}")
sys.exit(1)
required_timeframes = set()
for name, config in strategy_configs.items():
if config.get("enabled", False):
tf = config.get("parameters", {}).get("timeframe")
if tf:
required_timeframes.add(tf)
# --- Restored Resampler Process Start ---
logging.info(f"Starting resampler ('{RESAMPLER_SCRIPT}')...")
resampler_process = multiprocessing.Process(target=resampler_scheduler, daemon=True)
resampler_process.start()
# --- End Resampler Process Start ---
if not required_timeframes:
logging.warning("No timeframes required by any enabled strategy. Resampler will not run effectively.")
processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True)
processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True)
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)
for name, config in strategy_configs.items():
if config.get("enabled", False):
if not os.path.exists(config['script']):
logging.error(f"Strategy script '{config['script']}' for strategy '{name}' not found. Skipping.")
continue
proc = multiprocessing.Process(target=run_strategy, args=(name, config), daemon=True)
processes[f"Strategy: {name}"] = proc
for name, proc in processes.items():
logging.info(f"Starting process '{name}'...")
proc.start()
time.sleep(3)
app = MainApp(coins_to_watch=WATCHED_COINS)
app = MainApp(coins_to_watch=WATCHED_COINS, processes=processes, strategy_configs=strategy_configs)
try:
app.run()
except KeyboardInterrupt:
logging.info("Shutting down...")
market_process.terminate()
fetcher_process.terminate()
# --- Restored Resampler Shutdown ---
resampler_process.terminate()
market_process.join()
fetcher_process.join()
resampler_process.join()
# --- End Resampler Shutdown ---
for proc in processes.values():
if proc.is_alive(): proc.terminate()
for proc in processes.values():
if proc.is_alive(): proc.join()
logging.info("Shutdown complete.")
sys.exit(0)

283
market_cap_fetcher.py Normal file
View File

@ -0,0 +1,283 @@
import argparse
import logging
import os
import sys
import sqlite3
import pandas as pd
import requests
import time
from datetime import datetime, timezone, timedelta
import json
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
class MarketCapFetcher:
"""
Fetches historical daily market cap data from the CoinGecko API and
intelligently updates the SQLite database. It processes individual coins,
aggregates stablecoins, and captures total market cap metrics.
"""
COIN_ID_MAP = {
"BTC": "bitcoin",
"ETH": "ethereum",
"SOL": "solana",
"BNB": "binancecoin",
"HYPE": "hyperliquid",
"ASTER": "astar",
"ZEC": "zcash",
"PUMP": "pump-fun", # Correct ID is 'pump-fun'
"SUI": "sui"
}
STABLECOIN_ID_MAP = {
"USDT": "tether",
"USDC": "usd-coin",
"USDE": "ethena-usde",
"DAI": "dai",
"PYUSD": "paypal-usd"
}
def __init__(self, log_level: str, coins: list):
setup_logging(log_level, 'MarketCapFetcher')
self.coins_to_fetch = coins
self.db_path = os.path.join("_data", "market_data.db")
self.api_base_url = "https://api.coingecko.com/api/v3"
self.api_key = os.environ.get("COINGECKO_API_KEY")
if not self.api_key:
logging.error("CoinGecko API key not found. Please set the COINGECKO_API_KEY environment variable.")
sys.exit(1)
def run(self):
"""
Main execution function to process all configured coins and update the database.
"""
logging.info("Starting historical market cap fetch process from CoinGecko...")
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL;")
# 1. Process individual coins
for coin_symbol in self.coins_to_fetch:
coin_id = self.COIN_ID_MAP.get(coin_symbol.upper())
if not coin_id:
logging.warning(f"No CoinGecko ID found for '{coin_symbol}'. Skipping.")
continue
logging.info(f"--- Processing {coin_symbol} ({coin_id}) ---")
try:
self._update_market_cap_for_coin(coin_id, coin_symbol, conn)
except Exception as e:
logging.error(f"An unexpected error occurred while processing {coin_symbol}: {e}")
time.sleep(2)
# 2. Process and aggregate stablecoins
self._update_stablecoin_aggregate(conn)
# 3. Process total market cap metrics
self._update_total_market_cap(conn)
# 4. Save a summary of the latest data
self._save_summary(conn)
logging.info("--- Market cap fetch process complete ---")
def _save_summary(self, conn):
"""
Queries the last record from each market cap table and saves a summary to a JSON file.
"""
logging.info("--- Generating Market Cap Summary ---")
summary_data = {}
summary_file_path = os.path.join("_data", "market_cap_data.json")
try:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%');")
tables = [row[0] for row in cursor.fetchall()]
for table_name in tables:
try:
df_last = pd.read_sql(f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT 1', conn)
if not df_last.empty:
summary_data[table_name] = df_last.to_dict('records')[0]
except Exception as e:
logging.error(f"Could not read last record from table '{table_name}': {e}")
if summary_data:
summary_data['summary_last_updated_utc'] = datetime.now(timezone.utc).isoformat()
with open(summary_file_path, 'w', encoding='utf-8') as f:
json.dump(summary_data, f, indent=4)
logging.info(f"Successfully saved market cap summary to '{summary_file_path}'")
else:
logging.warning("No data found to create a summary.")
except Exception as e:
logging.error(f"Failed to generate summary: {e}")
def _update_total_market_cap(self, conn):
"""
Fetches the current total market cap and saves it for the current date.
"""
logging.info("--- Processing Total Market Cap ---")
table_name = "TOTAL_market_cap_daily"
try:
# --- FIX: Use the current date instead of yesterday's ---
today_date = datetime.now(timezone.utc).date()
cursor = conn.cursor()
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
table_exists = cursor.fetchone()
if table_exists:
# Check if we already have a record for today
cursor.execute(f"SELECT 1 FROM \"{table_name}\" WHERE date(datetime_utc) = ? LIMIT 1", (today_date.isoformat(),))
if cursor.fetchone():
logging.info(f"Total market cap for {today_date} already exists. Skipping.")
return
logging.info("Fetching current global market data...")
url = f"{self.api_base_url}/global"
headers = {"x-cg-demo-api-key": self.api_key}
response = requests.get(url, headers=headers)
response.raise_for_status()
global_data = response.json().get('data', {})
total_mc = global_data.get('total_market_cap', {}).get('usd')
if total_mc:
df_total = pd.DataFrame([{
'datetime_utc': pd.to_datetime(today_date),
'market_cap': total_mc
}])
df_total.to_sql(table_name, conn, if_exists='append', index=False)
logging.info(f"Saved total market cap for {today_date}: ${total_mc:,.2f}")
except requests.exceptions.RequestException as e:
logging.error(f"Failed to fetch global market data: {e}")
except Exception as e:
logging.error(f"An error occurred while updating total market cap: {e}")
def _update_stablecoin_aggregate(self, conn):
"""Fetches data for all stablecoins and saves the aggregated market cap."""
logging.info("--- Processing aggregated stablecoin market cap ---")
all_stablecoin_df = pd.DataFrame()
for symbol, coin_id in self.STABLECOIN_ID_MAP.items():
logging.info(f"Fetching historical data for stablecoin: {symbol}...")
df = self._fetch_historical_data(coin_id, days=365)
if not df.empty:
df['coin'] = symbol
all_stablecoin_df = pd.concat([all_stablecoin_df, df])
time.sleep(2)
if all_stablecoin_df.empty:
logging.warning("No data fetched for any stablecoins. Cannot create aggregate.")
return
aggregated_df = all_stablecoin_df.groupby(all_stablecoin_df['datetime_utc'].dt.date)['market_cap'].sum().reset_index()
aggregated_df['datetime_utc'] = pd.to_datetime(aggregated_df['datetime_utc'])
table_name = "STABLECOINS_market_cap"
last_date_in_db = self._get_last_date_from_db(table_name, conn)
if last_date_in_db:
aggregated_df = aggregated_df[aggregated_df['datetime_utc'] > last_date_in_db]
if not aggregated_df.empty:
aggregated_df.to_sql(table_name, conn, if_exists='append', index=False)
logging.info(f"Successfully saved {len(aggregated_df)} daily records to '{table_name}'.")
else:
logging.info("Aggregated stablecoin data is already up-to-date.")
def _update_market_cap_for_coin(self, coin_id: str, coin_symbol: str, conn):
"""Fetches and appends new market cap data for a single coin."""
table_name = f"{coin_symbol}_market_cap"
last_date_in_db = self._get_last_date_from_db(table_name, conn)
days_to_fetch = 365
if last_date_in_db:
delta_days = (datetime.now() - last_date_in_db).days
if delta_days <= 0:
logging.info(f"Market cap data for '{coin_symbol}' is already up-to-date.")
return
days_to_fetch = min(delta_days + 1, 365)
else:
logging.info(f"No existing data found. Fetching initial {days_to_fetch} days for {coin_symbol}.")
df = self._fetch_historical_data(coin_id, days=days_to_fetch)
if df.empty:
logging.warning(f"No market cap data returned from API for {coin_symbol}.")
return
if last_date_in_db:
df = df[df['datetime_utc'] > last_date_in_db]
if not df.empty:
df.to_sql(table_name, conn, if_exists='append', index=False)
logging.info(f"Successfully saved {len(df)} new daily market cap records for {coin_symbol}.")
else:
logging.info(f"Data was fetched, but no new records needed saving for '{coin_symbol}'.")
def _get_last_date_from_db(self, table_name: str, conn) -> pd.Timestamp:
"""Gets the most recent date from a market cap table as a pandas Timestamp."""
try:
cursor = conn.cursor()
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
if not cursor.fetchone():
return None
last_date_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
return pd.to_datetime(last_date_str) if last_date_str else None
except Exception as e:
logging.error(f"Could not read last date from table '{table_name}': {e}")
return None
def _fetch_historical_data(self, coin_id: str, days: int) -> pd.DataFrame:
"""Fetches historical market chart data from CoinGecko for a specified number of days."""
url = f"{self.api_base_url}/coins/{coin_id}/market_chart"
params = { "vs_currency": "usd", "days": days, "interval": "daily" }
headers = {"x-cg-demo-api-key": self.api_key}
try:
logging.debug(f"Fetching last {days} days for {coin_id}...")
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
market_caps = data.get('market_caps', [])
if not market_caps: return pd.DataFrame()
df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap'])
df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
df.drop_duplicates(subset=['datetime_utc'], keep='last', inplace=True)
return df[['datetime_utc', 'market_cap']]
except requests.exceptions.RequestException as e:
logging.error(f"API request failed for {coin_id}: {e}.")
return pd.DataFrame()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fetch historical market cap data from CoinGecko.")
parser.add_argument(
"--coins",
nargs='+',
default=["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"],
help="List of coin symbols to fetch (e.g., BTC ETH)."
)
parser.add_argument(
"--log-level",
default="normal",
choices=['off', 'normal', 'debug'],
help="Set the logging level for the script."
)
args = parser.parse_args()
fetcher = MarketCapFetcher(log_level=args.log_level, coins=args.coins)
fetcher.run()

BIN
requirements.txt Normal file

Binary file not shown.

View File

@ -12,8 +12,8 @@ from logging_utils import setup_logging
class Resampler:
"""
Reads 1-minute candle data directly from the SQLite database, resamples
it to various timeframes, and stores the results back in the database.
Reads new 1-minute candle data from the SQLite database, resamples it to
various timeframes, and appends the new candles to the corresponding tables.
"""
def __init__(self, log_level: str, coins: list, timeframes: dict):
@ -31,13 +31,14 @@ class Resampler:
'number_of_trades': 'sum'
}
self.resampling_status = self._load_existing_status()
self.job_start_time = None
def _load_existing_status(self) -> dict:
"""Loads the existing status file if it exists, otherwise returns an empty dict."""
if os.path.exists(self.status_file_path):
try:
with open(self.status_file_path, 'r', encoding='utf-8') as f:
logging.info(f"Loading existing status from '{self.status_file_path}'")
logging.debug(f"Loading existing status from '{self.status_file_path}'")
return json.load(f)
except (IOError, json.JSONDecodeError) as e:
logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}")
@ -47,78 +48,112 @@ class Resampler:
"""
Main execution function to process all configured coins and update the database.
"""
self.job_start_time = datetime.now(timezone.utc)
logging.info(f"--- Resampling job started at {self.job_start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} ---")
if not os.path.exists(self.db_path):
logging.error(f"Database file '{self.db_path}' not found. "
"Please run the data fetcher script first.")
sys.exit(1)
logging.error(f"Database file '{self.db_path}' not found.")
return
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL;")
logging.info(f"Processing {len(self.coins_to_process)} coins: {', '.join(self.coins_to_process)}")
logging.debug(f"Processing {len(self.coins_to_process)} coins...")
for coin in self.coins_to_process:
source_table_name = f"{coin}_1m"
logging.info(f"--- Processing {coin} ---")
logging.debug(f"--- Processing {coin} ---")
try:
df = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn)
if df.empty:
logging.warning(f"Source table '{source_table_name}' is empty or does not exist. Skipping.")
# Load the full 1m history once per coin
df_1m = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn, parse_dates=['datetime_utc'])
if df_1m.empty:
logging.warning(f"Source table '{source_table_name}' is empty. Skipping.")
continue
df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
df.set_index('datetime_utc', inplace=True)
df_1m.set_index('datetime_utc', inplace=True)
for tf_name, tf_code in self.timeframes.items():
logging.info(f" Resampling to {tf_name}...")
target_table_name = f"{coin}_{tf_name}"
logging.debug(f" Updating {tf_name} table...")
resampled_df = df.resample(tf_code).agg(self.aggregation_logic)
last_timestamp = self._get_last_timestamp(conn, target_table_name)
# Get the new 1-minute data that needs to be processed
new_df_1m = df_1m[df_1m.index > last_timestamp] if last_timestamp else df_1m
if new_df_1m.empty:
logging.debug(f" -> No new 1-minute data for {tf_name}. Table is up to date.")
continue
resampled_df = new_df_1m.resample(tf_code).agg(self.aggregation_logic)
resampled_df.dropna(how='all', inplace=True)
if coin not in self.resampling_status:
self.resampling_status[coin] = {}
if not resampled_df.empty:
target_table_name = f"{coin}_{tf_name}"
resampled_df.to_sql(
target_table_name,
conn,
if_exists='replace',
index=True
)
last_timestamp = resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S')
num_candles = len(resampled_df)
# Append the newly resampled data to the target table
resampled_df.to_sql(target_table_name, conn, if_exists='append', index=True)
logging.debug(f" -> Appended {len(resampled_df)} new candles to '{target_table_name}'.")
if coin not in self.resampling_status: self.resampling_status[coin] = {}
total_candles = int(self._get_table_count(conn, target_table_name))
self.resampling_status[coin][tf_name] = {
"last_candle_utc": last_timestamp,
"total_candles": num_candles
}
else:
logging.info(f" -> No data to save for '{coin}_{tf_name}'.")
self.resampling_status[coin][tf_name] = {
"last_candle_utc": "N/A",
"total_candles": 0
"last_candle_utc": resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S'),
"total_candles": total_candles
}
except pd.io.sql.DatabaseError as e:
logging.warning(f"Could not read source table '{source_table_name}': {e}")
except Exception as e:
logging.error(f"Failed to process coin '{coin}': {e}")
self._log_summary()
self._save_status()
logging.info("--- Resampling process complete ---")
logging.info(f"--- Resampling job finished at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} ---")
def _log_summary(self):
"""Logs a summary of the total candles for each timeframe."""
logging.info("--- Resampling Job Summary ---")
timeframe_totals = {}
# Iterate through coins, skipping metadata keys
for coin, tfs in self.resampling_status.items():
if not isinstance(tfs, dict): continue
for tf_name, tf_data in tfs.items():
total = tf_data.get("total_candles", 0)
if tf_name not in timeframe_totals:
timeframe_totals[tf_name] = 0
timeframe_totals[tf_name] += total
if not timeframe_totals:
logging.info("No candles were resampled in this run.")
return
logging.info("Total candles per timeframe across all processed coins:")
for tf_name, total in sorted(timeframe_totals.items()):
logging.info(f" - {tf_name:<10}: {total:,} candles")
def _get_last_timestamp(self, conn, table_name):
"""Gets the timestamp of the last entry in a table."""
try:
return pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0]
except (pd.io.sql.DatabaseError, IndexError):
return None
def _get_table_count(self, conn, table_name):
"""Gets the total row count of a table."""
try:
return pd.read_sql(f'SELECT COUNT(*) FROM "{table_name}"', conn).iloc[0, 0]
except (pd.io.sql.DatabaseError, IndexError):
return 0
def _save_status(self):
"""Saves the final resampling status to a JSON file."""
if not self.resampling_status:
logging.warning("No data was resampled, skipping status file creation.")
return
self.resampling_status['last_completed_utc'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
stop_time = datetime.now(timezone.utc)
self.resampling_status['job_start_time_utc'] = self.job_start_time.strftime('%Y-%m-%d %H:%M:%S')
self.resampling_status['job_stop_time_utc'] = stop_time.strftime('%Y-%m-%d %H:%M:%S')
# Clean up old key if it exists from previous versions
self.resampling_status.pop('last_completed_utc', None)
try:
with open(self.status_file_path, 'w', encoding='utf-8') as f:
json.dump(self.resampling_status, f, indent=4, sort_keys=True)
@ -135,55 +170,23 @@ def parse_timeframes(tf_strings: list) -> dict:
unit = ''.join(filter(str.isalpha, tf_str)).lower()
code = ''
if unit == 'm':
code = f"{numeric_part}min"
elif unit == 'w':
# --- FIX: Use uppercase 'W' for weeks to avoid deprecation warning ---
code = f"{numeric_part}W"
elif unit in ['h', 'd']:
code = f"{numeric_part}{unit}"
else:
code = tf_str
logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.")
if unit == 'm': code = f"{numeric_part}min"
elif unit == 'w': code = f"{numeric_part}W"
elif unit in ['h', 'd']: code = f"{numeric_part}{unit}"
else: code = tf_str
tf_map[tf_str] = code
return tf_map
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.")
parser.add_argument(
"--coins",
nargs='+',
default=["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"],
help="List of coins to process."
)
parser.add_argument(
"--timeframes",
nargs='+',
default=['4m', '5m', '15m', '30m', '37m', '148m', '4h', '12h', '1d', '1w'],
help="List of timeframes to generate (e.g., 5m 1h 1d)."
)
parser.add_argument(
"--timeframe",
dest="timeframes",
nargs='+',
help=argparse.SUPPRESS
)
parser.add_argument(
"--log-level",
default="normal",
choices=['off', 'normal', 'debug'],
help="Set the logging level for the script."
)
parser.add_argument("--coins", nargs='+', required=True, help="List of coins to process.")
parser.add_argument("--timeframes", nargs='+', required=True, help="List of timeframes to generate.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
timeframes_dict = parse_timeframes(args.timeframes)
resampler = Resampler(
log_level=args.log_level,
coins=args.coins,
timeframes=timeframes_dict
)
resampler = Resampler(log_level=args.log_level, coins=args.coins, timeframes=timeframes_dict)
resampler.run()

219
strategy_sma_cross.py Normal file
View File

@ -0,0 +1,219 @@
import argparse
import logging
import sys
import time
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timezone, timedelta
from logging_utils import setup_logging
class SmaCrossStrategy:
"""
A flexible strategy that can operate in two modes:
1. Fast SMA / Slow SMA Crossover (if both 'fast' and 'slow' params are set)
2. Price / Single SMA Crossover (if only one 'fast' or 'slow' param is set)
"""
def __init__(self, strategy_name: str, params: dict, log_level: str):
self.strategy_name = strategy_name
self.params = params
self.coin = params.get("coin", "N/A")
self.timeframe = params.get("timeframe", "N/A")
# Load fast and slow SMA periods, defaulting to 0 if not present
self.fast_ma_period = params.get("fast", 0)
self.slow_ma_period = params.get("slow", 0)
self.db_path = os.path.join("_data", "market_data.db")
self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")
# Strategy state variables
self.current_signal = "INIT"
self.last_signal_change_utc = None
self.signal_price = None
self.fast_ma_value = None
self.slow_ma_value = None
setup_logging(log_level, f"Strategy-{self.strategy_name}")
logging.info(f"Initializing SMA Crossover strategy with parameters:")
for key, value in self.params.items():
logging.info(f" - {key}: {value}")
def load_data(self) -> pd.DataFrame:
"""Loads historical data, ensuring enough for the longest SMA calculation."""
table_name = f"{self.coin}_{self.timeframe}"
# Determine the longest period needed for calculations
longest_period = max(self.fast_ma_period or 0, self.slow_ma_period or 0)
if longest_period == 0:
logging.error("No valid SMA periods ('fast' or 'slow' > 0) are defined in parameters.")
return pd.DataFrame()
limit = longest_period + 50
try:
with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn:
query = f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT {limit}'
df = pd.read_sql(query, conn)
if df.empty: return pd.DataFrame()
df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
df.set_index('datetime_utc', inplace=True)
df.sort_index(inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data from table '{table_name}': {e}")
return pd.DataFrame()
def _calculate_signals(self, data: pd.DataFrame):
"""
Analyzes historical data to find the last crossover event based on the
configured parameters (either dual or single SMA mode).
"""
# --- DUAL SMA CROSSOVER LOGIC ---
if self.fast_ma_period and self.slow_ma_period:
if len(data) < self.slow_ma_period + 1:
self.current_signal = "INSUFFICIENT DATA"
return
data['fast_sma'] = data['close'].rolling(window=self.fast_ma_period).mean()
data['slow_sma'] = data['close'].rolling(window=self.slow_ma_period).mean()
self.fast_ma_value = data['fast_sma'].iloc[-1]
self.slow_ma_value = data['slow_sma'].iloc[-1]
# Position is 1 for Golden Cross (fast > slow), -1 for Death Cross
data['position'] = 0
data.loc[data['fast_sma'] > data['slow_sma'], 'position'] = 1
data.loc[data['fast_sma'] < data['slow_sma'], 'position'] = -1
# --- SINGLE SMA PRICE CROSS LOGIC ---
else:
sma_period = self.fast_ma_period or self.slow_ma_period
if len(data) < sma_period + 1:
self.current_signal = "INSUFFICIENT DATA"
return
data['sma'] = data['close'].rolling(window=sma_period).mean()
self.slow_ma_value = data['sma'].iloc[-1] # Use slow_ma_value to store the single SMA
self.fast_ma_value = None # Ensure fast is None
# Position is 1 when price is above SMA, -1 when below
data['position'] = 0
data.loc[data['close'] > data['sma'], 'position'] = 1
data.loc[data['close'] < data['sma'], 'position'] = -1
# --- COMMON LOGIC for determining signal and last change ---
data['crossover'] = data['position'].diff()
last_position = data['position'].iloc[-1]
if last_position == 1: self.current_signal = "BUY"
elif last_position == -1: self.current_signal = "SELL"
else: self.current_signal = "HOLD"
last_cross_series = data[data['crossover'] != 0]
if not last_cross_series.empty:
last_cross_row = last_cross_series.iloc[-1]
self.last_signal_change_utc = last_cross_row.name.tz_localize('UTC').isoformat()
self.signal_price = last_cross_row['close']
if last_cross_row['position'] == 1: self.current_signal = "BUY"
elif last_cross_row['position'] == -1: self.current_signal = "SELL"
else:
self.last_signal_change_utc = data.index[0].tz_localize('UTC').isoformat()
self.signal_price = data['close'].iloc[0]
def _save_status(self):
"""Saves the current strategy state to its JSON file."""
status = {
"strategy_name": self.strategy_name,
"current_signal": self.current_signal,
"last_signal_change_utc": self.last_signal_change_utc,
"signal_price": self.signal_price,
"last_checked_utc": datetime.now(timezone.utc).isoformat()
}
try:
with open(self.status_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
except IOError as e:
logging.error(f"Failed to write status file: {e}")
def get_sleep_duration(self) -> int:
"""Calculates seconds to sleep until the next full candle closes."""
tf_value = int(''.join(filter(str.isdigit, self.timeframe)))
tf_unit = ''.join(filter(str.isalpha, self.timeframe))
if tf_unit == 'm': interval_seconds = tf_value * 60
elif tf_unit == 'h': interval_seconds = tf_value * 3600
elif tf_unit == 'd': interval_seconds = tf_value * 86400
else: return 60
now = datetime.now(timezone.utc)
timestamp = now.timestamp()
next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds
sleep_seconds = (next_candle_ts - timestamp) + 5
logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. "
f"Sleeping for {sleep_seconds:.2f} seconds.")
return sleep_seconds
def run_logic(self):
"""Main loop: loads data, calculates signals, saves status, and sleeps."""
logging.info(f"Starting logic loop for {self.coin} on {self.timeframe} timeframe.")
while True:
data = self.load_data()
if data.empty:
logging.warning("No data loaded. Waiting 1 minute before retrying...")
self.current_signal = "NO DATA"
self._save_status()
time.sleep(60)
continue
self._calculate_signals(data)
self._save_status()
last_close = data['close'].iloc[-1]
# --- Log based on which mode the strategy is running in ---
if self.fast_ma_period and self.slow_ma_period:
fast_ma_str = f"{self.fast_ma_value:.4f}" if self.fast_ma_value is not None else "N/A"
slow_ma_str = f"{self.slow_ma_value:.4f}" if self.slow_ma_value is not None else "N/A"
logging.info(
f"Signal: {self.current_signal} | Price: {last_close:.4f} | "
f"Fast SMA({self.fast_ma_period}): {fast_ma_str} | Slow SMA({self.slow_ma_period}): {slow_ma_str}"
)
else:
sma_period = self.fast_ma_period or self.slow_ma_period
sma_val_str = f"{self.slow_ma_value:.4f}" if self.slow_ma_value is not None else "N/A"
logging.info(
f"Signal: {self.current_signal} | Price: {last_close:.4f} | "
f"SMA({sma_period}): {sma_val_str}"
)
sleep_time = self.get_sleep_duration()
time.sleep(sleep_time)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run an SMA Crossover trading strategy.")
parser.add_argument("--name", required=True, help="The name of the strategy instance from the config.")
parser.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
try:
strategy_params = json.loads(args.params)
strategy = SmaCrossStrategy(
strategy_name=args.name,
params=strategy_params,
log_level=args.log_level
)
strategy.run_logic()
except KeyboardInterrupt:
logging.info("Strategy process stopped.")
except Exception as e:
logging.error(f"A critical error occurred: {e}")
sys.exit(1)

186
strategy_template.py Normal file
View File

@ -0,0 +1,186 @@
import argparse
import logging
import sys
import time
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timezone, timedelta
from logging_utils import setup_logging
class TradingStrategy:
"""
A template for a trading strategy that reads data from the SQLite database
and executes its logic in a loop, running once per candle.
"""
def __init__(self, strategy_name: str, params: dict, log_level: str):
self.strategy_name = strategy_name
self.params = params
self.coin = params.get("coin", "N/A")
self.timeframe = params.get("timeframe", "N/A")
self.db_path = os.path.join("_data", "market_data.db")
self.status_file_path = os.path.join("_data", f"strategy_status_{self.strategy_name}.json")
# Strategy state variables
self.current_signal = "INIT"
self.last_signal_change_utc = None
self.signal_price = None
self.indicator_value = None
# Load strategy-specific parameters from config
self.rsi_period = params.get("rsi_period")
self.short_ma = params.get("short_ma")
self.long_ma = params.get("long_ma")
self.sma_period = params.get("sma_period")
setup_logging(log_level, f"Strategy-{self.strategy_name}")
logging.info(f"Initializing strategy with parameters: {self.params}")
def load_data(self) -> pd.DataFrame:
"""Loads historical data, ensuring enough for the longest indicator period."""
table_name = f"{self.coin}_{self.timeframe}"
limit = 500
# Determine required data limit based on the longest configured indicator
periods = [p for p in [self.sma_period, self.long_ma, self.rsi_period] if p is not None]
if periods:
limit = max(periods) + 50
try:
with sqlite3.connect(f"file:{self.db_path}?mode=ro", uri=True) as conn:
query = f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT {limit}'
df = pd.read_sql(query, conn)
if df.empty: return pd.DataFrame()
df['datetime_utc'] = pd.to_datetime(df['datetime_utc'])
df.set_index('datetime_utc', inplace=True)
df.sort_index(inplace=True)
return df
except Exception as e:
logging.error(f"Failed to load data from table '{table_name}': {e}")
return pd.DataFrame()
def _calculate_signals(self, data: pd.DataFrame):
"""
Analyzes historical data to find the last signal crossover event.
This method should be expanded to handle different strategy types.
"""
if self.sma_period:
if len(data) < self.sma_period + 1:
self.current_signal = "INSUFFICIENT DATA"
return
data['sma'] = data['close'].rolling(window=self.sma_period).mean()
self.indicator_value = data['sma'].iloc[-1]
data['position'] = 0
data.loc[data['close'] > data['sma'], 'position'] = 1
data.loc[data['close'] < data['sma'], 'position'] = -1
data['crossover'] = data['position'].diff()
last_position = data['position'].iloc[-1]
if last_position == 1: self.current_signal = "BUY"
elif last_position == -1: self.current_signal = "SELL"
else: self.current_signal = "HOLD"
last_cross_series = data[data['crossover'] != 0]
if not last_cross_series.empty:
last_cross_row = last_cross_series.iloc[-1]
self.last_signal_change_utc = last_cross_row.name.tz_localize('UTC').isoformat()
self.signal_price = last_cross_row['close']
if last_cross_row['position'] == 1: self.current_signal = "BUY"
elif last_cross_row['position'] == -1: self.current_signal = "SELL"
else:
self.last_signal_change_utc = data.index[0].tz_localize('UTC').isoformat()
self.signal_price = data['close'].iloc[0]
elif self.rsi_period:
logging.info(f"RSI logic not implemented for period {self.rsi_period}.")
self.current_signal = "NOT IMPLEMENTED"
elif self.short_ma and self.long_ma:
logging.info(f"MA Cross logic not implemented for {self.short_ma}/{self.long_ma}.")
self.current_signal = "NOT IMPLEMENTED"
def _save_status(self):
"""Saves the current strategy state to its JSON file."""
status = {
"strategy_name": self.strategy_name,
"current_signal": self.current_signal,
"last_signal_change_utc": self.last_signal_change_utc,
"signal_price": self.signal_price,
"last_checked_utc": datetime.now(timezone.utc).isoformat()
}
try:
with open(self.status_file_path, 'w', encoding='utf-8') as f:
json.dump(status, f, indent=4)
except IOError as e:
logging.error(f"Failed to write status file: {e}")
def get_sleep_duration(self) -> int:
"""Calculates seconds to sleep until the next full candle closes."""
if not self.timeframe: return 60
tf_value = int(''.join(filter(str.isdigit, self.timeframe)))
tf_unit = ''.join(filter(str.isalpha, self.timeframe))
if tf_unit == 'm': interval_seconds = tf_value * 60
elif tf_unit == 'h': interval_seconds = tf_value * 3600
elif tf_unit == 'd': interval_seconds = tf_value * 86400
else: return 60
now = datetime.now(timezone.utc)
timestamp = now.timestamp()
next_candle_ts = ((timestamp // interval_seconds) + 1) * interval_seconds
sleep_seconds = (next_candle_ts - timestamp) + 5
logging.info(f"Next candle closes at {datetime.fromtimestamp(next_candle_ts, tz=timezone.utc)}. "
f"Sleeping for {sleep_seconds:.2f} seconds.")
return sleep_seconds
def run_logic(self):
"""Main loop: loads data, calculates signals, saves status, and sleeps."""
logging.info(f"Starting main logic loop for {self.coin} on {self.timeframe} timeframe.")
while True:
data = self.load_data()
if data.empty:
logging.warning("No data loaded. Waiting 1 minute before retrying...")
self.current_signal = "NO DATA"
self._save_status()
time.sleep(60)
continue
self._calculate_signals(data)
self._save_status()
last_close = data['close'].iloc[-1]
indicator_val_str = f"{self.indicator_value:.4f}" if self.indicator_value is not None else "N/A"
logging.info(f"Signal: {self.current_signal} | Price: {last_close:.4f} | Indicator: {indicator_val_str}")
sleep_time = self.get_sleep_duration()
time.sleep(sleep_time)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a trading strategy.")
parser.add_argument("--name", required=True, help="The name of the strategy instance from the config.")
parser.add_argument("--params", required=True, help="A JSON string of the strategy's parameters.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
try:
strategy_params = json.loads(args.params)
strategy = TradingStrategy(
strategy_name=args.name,
params=strategy_params,
log_level=args.log_level
)
strategy.run_logic()
except KeyboardInterrupt:
logging.info("Strategy process stopped.")
except Exception as e:
logging.error(f"A critical error occurred: {e}")
sys.exit(1)

200
trade_executor.py Normal file
View File

@ -0,0 +1,200 @@
import argparse
import logging
import os
import sys
import json
import time
from datetime import datetime
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from dotenv import load_dotenv
from logging_utils import setup_logging
from trade_log import log_trade
# Load environment variables from a .env file
load_dotenv()
class TradeExecutor:
"""
Monitors strategy signals and executes trades using a multi-agent,
multi-strategy position management system. Each strategy's position is
tracked independently.
"""
def __init__(self, log_level: str):
setup_logging(log_level, 'TradeExecutor')
self.vault_address = os.environ.get("MAIN_WALLET_ADDRESS")
if not self.vault_address:
logging.error("MAIN_WALLET_ADDRESS not set.")
sys.exit(1)
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.exchanges = self._load_agents()
if not self.exchanges:
logging.error("No trading agents found in .env file.")
sys.exit(1)
strategy_config_path = os.path.join("_data", "strategies.json")
try:
with open(strategy_config_path, 'r') as f:
self.strategy_configs = {name: config for name, config in json.load(f).items() if config.get("enabled")}
logging.info(f"Loaded {len(self.strategy_configs)} enabled strategies.")
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Could not load strategies from '{strategy_config_path}': {e}")
sys.exit(1)
self.status_file_path = os.path.join("_logs", "trade_executor_status.json")
self.managed_positions_path = os.path.join("_data", "executor_managed_positions.json")
self.managed_positions = self._load_managed_positions()
def _load_agents(self) -> dict:
"""Discovers and initializes agents from environment variables."""
exchanges = {}
logging.info("Discovering agents from environment variables...")
for env_var, private_key in os.environ.items():
agent_name = None
if env_var == "AGENT_PRIVATE_KEY":
agent_name = "default"
elif env_var.endswith("_AGENT_PK"):
agent_name = env_var.replace("_AGENT_PK", "").lower()
if agent_name and private_key:
try:
agent_account = Account.from_key(private_key)
exchanges[agent_name] = Exchange(agent_account, constants.MAINNET_API_URL, account_address=self.vault_address)
logging.info(f"Initialized agent '{agent_name}' with address: {agent_account.address}")
except Exception as e:
logging.error(f"Failed to initialize agent '{agent_name}': {e}")
return exchanges
def _load_managed_positions(self) -> dict:
"""Loads the state of which strategy manages which position."""
if os.path.exists(self.managed_positions_path):
try:
with open(self.managed_positions_path, 'r') as f:
logging.info("Loading existing managed positions state.")
return json.load(f)
except (IOError, json.JSONDecodeError):
logging.warning("Could not read managed positions file. Starting fresh.")
return {}
def _save_managed_positions(self):
"""Saves the current state of managed positions."""
try:
with open(self.managed_positions_path, 'w') as f:
json.dump(self.managed_positions, f, indent=4)
except IOError as e:
logging.error(f"Failed to save managed positions state: {e}")
def _save_executor_status(self, perpetuals_state, spot_state, all_market_contexts):
"""Saves the current balances and open positions to a live status file."""
# This function is correct and does not need changes.
pass
def run(self):
"""The main execution loop with advanced position management."""
logging.info("Starting Trade Executor loop...")
while True:
try:
perpetuals_state = self.info.user_state(self.vault_address)
open_positions_api = {pos['position'].get('coin'): pos['position'] for pos in perpetuals_state.get('assetPositions', []) if float(pos.get('position', {}).get('szi', 0)) != 0}
for name, config in self.strategy_configs.items():
coin = config['parameters'].get('coin')
size = config['parameters'].get('size')
# --- ADDED: Load leverage parameters from config ---
leverage_long = config['parameters'].get('leverage_long')
leverage_short = config['parameters'].get('leverage_short')
status_file = os.path.join("_data", f"strategy_status_{name}.json")
if not os.path.exists(status_file): continue
with open(status_file, 'r') as f: status = json.load(f)
desired_signal = status.get('current_signal')
current_position = self.managed_positions.get(name)
agent_name = config.get("agent", "default").lower()
exchange_to_use = self.exchanges.get(agent_name)
if not exchange_to_use:
logging.error(f"[{name}] Agent '{agent_name}' not found. Skipping trade.")
continue
# --- State Machine Logic with Configurable Leverage ---
if desired_signal == "BUY":
if not current_position:
if not all([size, leverage_long]):
logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Open LONG for {coin} with {leverage_long}x leverage.")
exchange_to_use.update_leverage(int(leverage_long), coin)
exchange_to_use.market_open(coin, True, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "long", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_LONG", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif current_position['side'] == 'short':
if not all([size, leverage_long]):
logging.error(f"[{name}] 'size' or 'leverage_long' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Close SHORT and open LONG for {coin} with {leverage_long}x leverage.")
exchange_to_use.update_leverage(int(leverage_long), coin)
exchange_to_use.market_open(coin, True, current_position['size'] + size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "long", "size": size}
log_trade(strategy=name, coin=coin, action="CLOSE_SHORT_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif desired_signal == "SELL":
if not current_position:
if not all([size, leverage_short]):
logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Open SHORT for {coin} with {leverage_short}x leverage.")
exchange_to_use.update_leverage(int(leverage_short), coin)
exchange_to_use.market_open(coin, False, size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "short", "size": size}
log_trade(strategy=name, coin=coin, action="OPEN_SHORT", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif current_position['side'] == 'long':
if not all([size, leverage_short]):
logging.error(f"[{name}] 'size' or 'leverage_short' not defined. Skipping.")
continue
logging.warning(f"[{name}] ACTION: Close LONG and open SHORT for {coin} with {leverage_short}x leverage.")
exchange_to_use.update_leverage(int(leverage_short), coin)
exchange_to_use.market_open(coin, False, current_position['size'] + size, None, 0.01)
self.managed_positions[name] = {"coin": coin, "side": "short", "size": size}
log_trade(strategy=name, coin=coin, action="CLOSE_LONG_&_REVERSE", price=status.get('signal_price', 0), size=size, signal=desired_signal)
elif desired_signal == "FLAT":
if current_position:
logging.warning(f"[{name}] ACTION: Close {current_position['side']} position for {coin}.")
is_buy = current_position['side'] == 'short'
exchange_to_use.market_open(coin, is_buy, current_position['size'], None, 0.01)
del self.managed_positions[name]
log_trade(strategy=name, coin=coin, action=f"CLOSE_{current_position['side'].upper()}", price=status.get('signal_price', 0), size=current_position['size'], signal=desired_signal)
self._save_managed_positions()
except Exception as e:
logging.error(f"An error occurred in the main executor loop: {e}")
time.sleep(15)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run the Trade Executor.")
parser.add_argument("--log-level", default="normal", choices=['off', 'normal', 'debug'])
args = parser.parse_args()
executor = TradeExecutor(log_level=args.log_level)
try:
executor.run()
except KeyboardInterrupt:
logging.info("Trade Executor stopped.")

55
trade_log.py Normal file
View File

@ -0,0 +1,55 @@
import os
import csv
from datetime import datetime, timezone
import threading
# A lock to prevent race conditions when multiple strategies might log at once in the future
log_lock = threading.Lock()
def log_trade(strategy: str, coin: str, action: str, price: float, size: float, signal: str, pnl: float = 0.0):
"""
Appends a record of a trade action to a persistent CSV log file.
Args:
strategy (str): The name of the strategy that triggered the action.
coin (str): The coin being traded (e.g., 'BTC').
action (str): The action taken (e.g., 'OPEN_LONG', 'CLOSE_LONG').
price (float): The execution price of the trade.
size (float): The size of the trade.
signal (str): The signal that triggered the trade (e.g., 'BUY', 'SELL').
pnl (float, optional): The realized profit and loss for closing trades. Defaults to 0.0.
"""
log_dir = "_logs"
file_path = os.path.join(log_dir, "trade_history.csv")
# Ensure the logs directory exists
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# Define the headers for the CSV file
headers = ["timestamp_utc", "strategy", "coin", "action", "price", "size", "signal", "pnl"]
# Check if the file needs a header
file_exists = os.path.isfile(file_path)
with log_lock:
try:
with open(file_path, 'a', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=headers)
if not file_exists:
writer.writeheader()
writer.writerow({
"timestamp_utc": datetime.now(timezone.utc).isoformat(),
"strategy": strategy,
"coin": coin,
"action": action,
"price": price,
"size": size,
"signal": signal,
"pnl": pnl
})
except IOError as e:
# If logging fails, print an error to the main console as a fallback.
print(f"CRITICAL: Failed to write to trade log file: {e}")