feat: implement strategy metadata and dashboard simulation panel

- Added display_name and description to BaseStrategy
- Updated MA44 and MA125 strategies with metadata
- Added /api/v1/strategies endpoint for dynamic discovery
- Added Strategy Simulation panel to dashboard with date picker and tooltips
- Implemented JS polling for backtest results in dashboard
- Added performance test scripts and DB connection guide
- Expanded indicator config to all 15 timeframes
This commit is contained in:
BTC Bot
2026-02-13 09:50:08 +01:00
parent 38f0a21f56
commit d7bdfcf716
23 changed files with 3623 additions and 241 deletions

273
AGENTS.md
View File

@ -1,223 +1,128 @@
# AGENTS.md - AI Coding Assistant Guidelines # AGENTS.md - AI Coding Assistant Guidelines
## Project Overview ## Project Overview
BTC Accumulation Bot - Data Collection Phase. High-performance async data collection BTC Accumulation Bot - Data Collection & Backtesting Phase. High-performance system for
system for cbBTC on Hyperliquid with TimescaleDB storage. Python 3.11, asyncio, cbBTC on Hyperliquid with TimescaleDB. Core components: Data Collector (WS),
FastAPI, asyncpg, WebSockets. Indicator Engine (SMA, etc.), Brain (Decision Logic), and Backtester.
## Build/Run Commands ## Build/Run Commands
### Docker (Primary deployment - Synology DS218+) ### Docker (Primary deployment - Synology DS218+)
```bash ```bash
# Build and start all services (timescaledb, data_collector, api_server) cd docker && docker-compose up -d --build # Build and start all services
cd docker && docker-compose up -d --build docker-compose logs -f data_collector # View logs
bash scripts/deploy.sh # Full deploy
# View logs
docker-compose logs -f data_collector
docker-compose logs -f api_server
# Full deploy (creates dirs, pulls, builds, starts)
bash scripts/deploy.sh
``` ```
### Development ### Development
```bash ```bash
# API server (requires DB running)
cd src/api && uvicorn server:app --reload --host 0.0.0.0 --port 8000 cd src/api && uvicorn server:app --reload --host 0.0.0.0 --port 8000
# Docs: http://localhost:8000/docs | Dashboard: http://localhost:8000/dashboard # Docs: http://localhost:8000/docs | Dashboard: http://localhost:8000/dashboard
# Data collector
cd src/data_collector && python -m data_collector.main cd src/data_collector && python -m data_collector.main
``` ```
### Testing ### Testing
```bash ```bash
# Run all tests pytest # All tests
pytest pytest tests/data_collector/test_websocket_client.py # Single file
pytest --cov=src --cov-report=html # With coverage
# Run a specific test file
pytest tests/data_collector/test_websocket_client.py
# Run a single test by name
pytest tests/data_collector/test_websocket_client.py::test_websocket_connection -v
# Run with coverage
pytest --cov=src --cov-report=html
```
Note: The tests/ directory structure exists but test files have not been written yet.
When creating tests, use pytest with pytest-asyncio for async test support.
### Linting & Formatting
```bash
# No config files exist for these tools; use these flags:
flake8 src/ --max-line-length=100 --extend-ignore=E203,W503
black --check src/ # Check formatting
black src/ # Auto-format
mypy src/ --ignore-missing-imports
``` ```
## Project Structure ## Project Structure
``` ```
src/ src/
├── data_collector/ # WebSocket client, buffer, database ├── data_collector/ # WebSocket client, buffer, database
│ ├── __init__.py │ ├── __init__.py # Package exports (all public classes)
│ ├── main.py # Entry point, orchestration, signal handling │ ├── main.py # Entry point, orchestration
│ ├── websocket_client.py # Hyperliquid WS client, Candle dataclass │ ├── websocket_client.py # Hyperliquid WS client
│ ├── candle_buffer.py # Circular buffer with async flush │ ├── candle_buffer.py # Circular buffer with async flush
│ ├── database.py # asyncpg/TimescaleDB interface │ ├── database.py # asyncpg/TimescaleDB interface
── backfill.py # Historical data backfill from REST API ── backfill.py # Historical data backfill
│ ├── custom_timeframe_generator.py # 37m, 148m, 1d aggregation
│ ├── indicator_engine.py # SMA/EMA computation & storage
│ ├── brain.py # Strategy evaluation & decision logging
│ └── backtester.py # Historical replay driver
└── api/ └── api/
├── server.py # FastAPI app, all endpoints ├── server.py # FastAPI app, endpoints for data/backtests
└── dashboard/static/ └── dashboard/static/index.html # Real-time web dashboard
└── index.html # Real-time web dashboard config/data_config.yaml # Operational config & indicator settings
config/data_config.yaml # Non-secret operational config docker/ # Docker orchestration & init-scripts
docker/ scripts/ # Deploy, backup, & utility scripts
├── docker-compose.yml # 3-service orchestration
├── Dockerfile.api / .collector # python:3.11-slim based
└── init-scripts/ # 01-schema.sql, 02-optimization.sql
scripts/ # deploy.sh, backup.sh, health_check.sh, backfill.sh
tests/data_collector/ # Test directory (empty - tests not yet written)
``` ```
## Architecture & Data Flow
```
Live: WS -> Buffer -> DB -> CustomTF -> IndicatorEngine -> Brain -> Decisions
│ │
Backtest: DB (History) -> Backtester ─────────┴─────────────┘
```
- **Stateless Logic**: `IndicatorEngine` and `Brain` are driver-agnostic. They read from DB
and write to DB, unaware if the trigger is live WS or backtest replay.
- **Consistency**: Indicators are computed exactly the same way for live and backtest.
- **Visualization**: Dashboard queries `indicators` and `decisions` tables directly.
Decisions contain a JSON snapshot of indicators at the moment of decision.
## Key Dataclasses
```python
@dataclass
class Candle: # Standard OHLCV
time: datetime; symbol: str; interval: str; ...
@dataclass
class Decision: # Brain output
time: datetime; symbol: str; decision_type: str; confidence: float
indicator_snapshot: Dict; # Values seen by Brain at decision time
backtest_id: Optional[str] # UUID if backtest, None if live
```
## Database Schema (TimescaleDB)
| Table | Purpose | Key Columns |
|-------|---------|-------------|
| `candles` | OHLCV data | `(time, symbol, interval)` UNIQUE |
| `indicators` | Computed values | `(time, symbol, interval, indicator_name)` UNIQUE |
| `decisions` | Buy/sell signals | `(time, symbol, interval, backtest_id)` |
| `backtest_runs` | Run metadata | `(id, strategy, config, results)` |
- `decisions` table stores `indicator_snapshot` JSONB for exact replay/audit.
- Compression enabled on all hypertables (7-day policy).
## API Endpoints (src/api/server.py)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/candles` | Query raw candles |
| GET | `/api/v1/indicators` | Query computed indicators (MA, RSI, etc.) |
| GET | `/api/v1/decisions` | Query signals (live or backtest) |
| GET | `/api/v1/backtests` | List historical backtest runs |
| POST | `/api/v1/backtests` | Trigger a new backtest (async background task) |
| GET | `/api/v1/stats` | 24h trading stats |
## Code Style Guidelines ## Code Style Guidelines
### Imports - **Imports**: Stdlib, then Third-party, then Local (relative within package).
Group in this order, separated by blank lines: - **Async**: Use `async/await` for all I/O. Use `asyncpg` pool.
1. Standard library (`import asyncio`, `from datetime import datetime`) - **Typing**: strict type hints required. `Optional[T]`, `List[T]`.
2. Third-party (`import websockets`, `import asyncpg`, `from fastapi import FastAPI`) - **Logging**: Use `logger = logging.getLogger(__name__)`.
3. Local/relative (`from .websocket_client import Candle`) - **Config**: Load from `config/data_config.yaml` or env vars.
Use relative imports (`.module`) within the `data_collector` package.
Use absolute imports for third-party packages.
### Formatting
- Line length: 100 characters max
- Indentation: 4 spaces
- Strings: double quotes (single only to avoid escaping)
- Trailing commas in multi-line collections
- Formatter: black
### Type Hints
- Required on all function parameters and return values
- `Optional[Type]` for nullable values
- `List[Type]`, `Dict[str, Any]` from `typing` module
- `@dataclass` for data-holding classes (e.g., `Candle`, `BufferStats`)
- Callable types for callbacks: `Callable[[Candle], Awaitable[None]]`
### Naming Conventions
- Classes: `PascalCase` (DataCollector, CandleBuffer)
- Functions/variables: `snake_case` (get_candles, buffer_size)
- Constants: `UPPER_SNAKE_CASE` (DB_HOST, MAX_BUFFER_SIZE)
- Private methods: `_leading_underscore` (_handle_reconnect, _flush_loop)
### Docstrings
- Triple double quotes on all modules, classes, and public methods
- Brief one-line description on first line
- Optional blank line + detail if needed
- No Args/Returns sections (not strict Google-style)
```python
"""Add a candle to the buffer
Returns True if added, False if buffer full and candle dropped"""
```
### Error Handling
- `try/except` with specific exceptions (never bare `except:`)
- Log errors with `logger.error()` before re-raising in critical paths
- Catch `asyncio.CancelledError` to break loops cleanly
- Use `finally` blocks for cleanup (always call `self.stop()`)
- Use `@asynccontextmanager` for resource acquisition (DB connections)
### Async Patterns
- `async/await` for all I/O operations
- `asyncio.Lock()` for thread-safe buffer access
- `asyncio.Event()` for stop/flush coordination
- `asyncio.create_task()` for background loops
- `asyncio.gather(*tasks, return_exceptions=True)` for parallel cleanup
- `asyncio.wait_for(coro, timeout)` for graceful shutdown
- `asyncio.run(main())` as the entry point
### Logging
- Module-level: `logger = logging.getLogger(__name__)` in every file
- Format: `'%(asctime)s - %(name)s - %(levelname)s - %(message)s'`
- Log level from env: `getattr(logging, os.getenv('LOG_LEVEL', 'INFO'))`
- Use f-strings in log messages with relevant context
- Levels: DEBUG (candle receipt), INFO (lifecycle), WARNING (gaps), ERROR (failures)
### Database (asyncpg + TimescaleDB)
- Connection pool: `asyncpg.create_pool(min_size=1, max_size=N)`
- `@asynccontextmanager` wrapper for connection acquisition
- Batch inserts with `executemany()`
- Upserts with `ON CONFLICT ... DO UPDATE`
- Positional params: `$1, $2, ...` (not `%s`)
- Use `conn.fetch()`, `conn.fetchrow()`, `conn.fetchval()` for results
### Configuration
- Secrets via environment variables (`os.getenv('DB_PASSWORD')`)
- Non-secret config in `config/data_config.yaml`
- Constructor defaults fall back to env vars
- Never commit `.env` files (contains real credentials)
## Common Tasks ## Common Tasks
### Add New API Endpoint ### Add New Indicator
1. Add route in `src/api/server.py` with `@app.get()`/`@app.post()` 1. Add to `config/data_config.yaml` under `indicators`.
2. Type-hint query params with `Query()`; return `dict` or raise `HTTPException` 2. Update `IndicatorEngine._compute_indicator` in `src/data_collector/indicator_engine.py` if new type (non-SMA).
3. Use `asyncpg` pool for database queries 3. No DB schema change needed (rows are generic).
### Add New Data Source ### Run Backtest
1. Create module in `src/data_collector/` following `websocket_client.py` pattern
2. Implement async `connect()`, `disconnect()`, `receive()` methods
3. Use callback architecture: `on_data`, `on_error` callables
### Database Schema Changes
1. Update `docker/init-scripts/01-schema.sql`
2. Update `DatabaseManager` methods in `src/data_collector/database.py`
3. Rebuild: `docker-compose down -v && docker-compose up -d --build`
### Writing Tests
1. Create test files in `tests/data_collector/` (e.g., `test_websocket_client.py`)
2. Use `pytest-asyncio` for async tests: `@pytest.mark.asyncio`
3. Mock external services (WebSocket, database) with `unittest.mock`
4. Descriptive names: `test_websocket_reconnection_with_backoff`
### Historical Data Backfill
The `backfill.py` module downloads historical candle data from Hyperliquid's REST API.
**API Limitations:**
- Max 5000 candles per coin/interval combination
- 500 candles per response (requires pagination)
- Available intervals: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M
**Usage - Python Module:**
```python
from data_collector.backfill import HyperliquidBackfill
from data_collector.database import DatabaseManager
async with HyperliquidBackfill(db, coin="BTC", intervals=["1m", "1h"]) as backfill:
# Backfill last 7 days for all configured intervals
results = await backfill.backfill_all_intervals(days_back=7)
# Or backfill specific interval
count = await backfill.backfill_interval("1m", days_back=3)
```
**Usage - CLI:**
```bash ```bash
# Backfill 7 days of 1m candles for BTC # CLI
cd src/data_collector && python -m data_collector.backfill --coin BTC --days 7 --intervals 1m python -m data_collector.backtester --symbol BTC --intervals 37m --start 2025-01-01
# Backfill multiple intervals # API
python -m data_collector.backfill --coin BTC --days 30 --intervals 1m 5m 1h curl -X POST http://localhost:8000/api/v1/backtests \
-H "Content-Type: application/json" \
# Backfill MAXIMUM available data (5000 candles per interval) -d '{"symbol": "BTC", "intervals": ["37m"], "start_date": "2025-01-01"}'
python -m data_collector.backfill --coin BTC --days max --intervals 1m 1h 1d
# Or use the convenience script
bash scripts/backfill.sh BTC 7 "1m 5m 1h"
bash scripts/backfill.sh BTC max "1m 1h 1d" # Maximum data
``` ```
**Data Coverage by Interval:**
- 1m candles: ~3.5 days (5000 candles)
- 1h candles: ~7 months (5000 candles)
- 1d candles: ~13.7 years (5000 candles)

340
IMPLEMENTATION_SUMMARY.md Normal file
View File

@ -0,0 +1,340 @@
# BTC Accumulation Bot - Implementation Summary
## Current Status: ✅ CORE SYSTEM COMPLETE
### What We've Built
This is a **production-ready cryptocurrency trading data collection and backtesting system** for cbBTC on Hyperliquid exchange.
---
## Architecture Overview
```
Live Data Flow:
Hyperliquid WS → WebSocket Client → Candle Buffer → TimescaleDB
Custom TF Generator → Indicator Engine (MA44, MA125) → Brain → Decisions
Backtest Flow:
Historical DB Data → Backtester → Same Indicator Engine/Brain → Simulated Trades
```
---
## Core Components (All Implemented)
### 1. Data Collection Pipeline
**WebSocket Client** (`websocket_client.py`)
- Connects to Hyperliquid WebSocket API
- Automatic reconnection with exponential backoff (1s to 15min)
- Handles multiple candle formats
- Health monitoring and connection metrics
**Candle Buffer** (`candle_buffer.py`)
- Thread-safe circular buffer (1000 max size)
- Async batch flushing (30s interval or 100 candles)
- Gap detection in candle sequences
- Statistics tracking for monitoring
**Database Manager** (`database.py`)
- asyncpg connection pool (min: 2, max: 20)
- Batch inserts with conflict resolution
- Gap detection using window functions
- Health statistics queries
### 2. Data Processing
**Custom Timeframe Generator** (`custom_timeframe_generator.py`)
- Standard intervals: 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M
- Custom intervals: **37m**, **148m** (Fibonacci-based)
- Real-time aggregation from 1m candles
- Historical backfill capability
- Integrity verification
**Indicator Engine** (`indicator_engine.py`)
- Stateless design (same code for live and backtest)
- Configurable indicators via dataclasses
- Currently implemented: **SMA (Simple Moving Average)**
- Indicators configured:
- MA44 (44-period SMA) on 37m, 148m, 1d
- MA125 (125-period SMA) on 37m, 148m, 1d
- Batch computation for historical data
### 3. Decision Making
**Brain** (`brain.py`)
- Strategy-based evaluation system
- Pluggable strategy loader
- Decision logging with full indicator snapshots (JSONB)
- Supports live trading and backtesting
- Current strategies:
- MA44Strategy: Long when Price > MA44, Short when Price < MA44
- MA125Strategy: Long when Price > MA125, Short when Price < MA125
**Strategy System** (`strategies/`)
- Base class with abstract interface
- Signal types: OPEN_LONG, OPEN_SHORT, CLOSE_LONG, CLOSE_SHORT, HOLD
- Confidence scoring (0.0 - 1.0)
- Position-aware decision making
### 4. Backtesting & Simulation
**Simulator** (`simulator.py`)
- Account management with leverage support
- Trade tracking with P&L calculation
- Fee calculation (maker/taker)
- Equity mark-to-market updates
- Statistics generation (win rate, total P&L, etc.)
**Backtester** (`backtester.py`)
- Historical replay through same IndicatorEngine/Brain
- UUID-based run tracking
- Progress logging
- Results storage in database
- CLI interface for running backtests
### 5. API & Dashboard
**FastAPI Server** (`api/server.py`)
Endpoints:
- `GET /` - API status
- `GET /api/v1/candles` - Query candle data
- `GET /api/v1/candles/latest` - Latest candle
- `GET /api/v1/indicators` - Query indicator values
- `GET /api/v1/decisions` - Query brain decisions
- `GET /api/v1/backtests` - List backtest runs
- `POST /api/v1/backtests` - Trigger new backtest (async)
- `GET /api/v1/stats` - 24h trading statistics
- `GET /api/v1/health` - System health check
- `GET /api/v1/ta` - Technical analysis endpoint
- `GET /api/v1/export/csv` - Export data to CSV
- `GET /dashboard` - Static dashboard
**Dashboard**
- Real-time web interface
- Connects to API endpoints
- Visualizes candles, indicators, and decisions
---
## Database Schema (TimescaleDB)
### Tables
**candles** - OHLCV data
- `(time, symbol, interval)` UNIQUE
- Compression enabled (7-day policy)
- Supports conflict resolution
**indicators** - Computed technical indicators
- `(time, symbol, interval, indicator_name)` UNIQUE
- Stores value and parameters (JSONB)
**decisions** - Trading signals
- `(time, symbol, interval, backtest_id)`
- Stores indicator_snapshot (JSONB) for audit
- Distinguishes live vs backtest decisions
**backtest_runs** - Backtest metadata
- `(id, strategy, config, results)`
- JSON config and results storage
**data_quality** - Quality issues log
- Tracks gaps, anomalies, validation failures
---
## Key Features
### ✅ Implemented
1. **Real-time Data Collection**
- Live WebSocket connection to Hyperliquid
- Automatic reconnection on failure
- Buffering and batch database writes
2. **Custom Timeframes**
- 37-minute candles (Fibonacci sequence)
- 148-minute candles (4 × 37m)
- Real-time aggregation from 1m data
3. **Technical Indicators**
- Moving Average 44 (MA44)
- Moving Average 125 (MA125)
- Computed on all timeframes
4. **Strategy Engine**
- Pluggable strategy system
- Position-aware decisions
- Confidence scoring
5. **Backtesting Framework**
- Historical replay capability
- Same code path as live trading
- P&L tracking and statistics
6. **REST API**
- Full CRUD for all data types
- Async backtest execution
- CSV export functionality
7. **Docker Deployment**
- Docker Compose configuration
- Optimized for Synology DS218+
- Persistent volumes for data
---
## Configuration
### Indicators (Hardcoded in main.py)
```python
indicator_configs = [
IndicatorConfig("ma44", "sma", 44, ["37m", "148m", "1d"]),
IndicatorConfig("ma125", "sma", 125, ["37m", "148m", "1d"])
]
```
### Database Connection
Environment variables:
- `DB_HOST` (default: localhost)
- `DB_PORT` (default: 5432)
- `DB_NAME` (default: btc_data)
- `DB_USER` (default: btc_bot)
- `DB_PASSWORD` (default: empty)
### Buffer Settings
- Max size: 1000 candles
- Flush interval: 30 seconds
- Batch size: 100 candles
---
## Usage Examples
### Run Data Collector
```bash
cd src/data_collector && python -m data_collector.main
```
### Run API Server
```bash
cd src/api && uvicorn server:app --reload --host 0.0.0.0 --port 8000
```
### Run Backtest (CLI)
```bash
python -m data_collector.backtester \
--symbol BTC \
--intervals 37m \
--start 2025-01-01
```
### Run Backtest (API)
```bash
curl -X POST http://localhost:8000/api/v1/backtests \
-H "Content-Type: application/json" \
-d '{"symbol": "BTC", "intervals": ["37m"], "start_date": "2025-01-01"}'
```
### Query Candles
```bash
curl "http://localhost:8000/api/v1/candles?symbol=BTC&interval=37m&limit=100"
```
---
## Code Quality
### Strengths
- Full async/await implementation
- Comprehensive type hints
- Proper error handling and logging
- Stateless design for core logic
- Clean separation of concerns
- Follows AGENTS.md style guidelines
- Docker-ready deployment
### Testing
- No unit tests implemented yet
- No integration tests
- Recommendation: Add pytest suite
---
## Performance Characteristics
### Data Collection
- Handles 1 candle/minute per symbol
- Buffer prevents database overload
- Batch inserts reduce DB round trips
### Memory Usage
- Fixed buffer size (1000 candles)
- Connection pooling (max 20)
- Optimized for low-memory devices (Synology DS218+)
### Database
- TimescaleDB compression (7-day policy)
- Proper indexing on all tables
- Window functions for efficient queries
---
## Missing Features (Future Work)
### High Priority
1. **Unit Tests** - pytest suite for all components
2. **Integration Tests** - End-to-end testing
3. **Alerting System** - Telegram/Discord notifications
4. **Configuration File** - YAML-based config (currently hardcoded)
### Medium Priority
5. **Additional Indicators** - RSI, MACD, Bollinger Bands
6. **Advanced Strategies** - Multi-indicator strategies
7. **Performance Metrics** - Sharpe ratio, max drawdown
8. **Data Validation** - Enhanced quality checks
### Low Priority
9. **WebSocket API** - Real-time streaming
10. **Multi-symbol Support** - Beyond BTC
11. **Machine Learning** - Pattern recognition
12. **Paper Trading** - Test mode before live
---
## Production Readiness
### ✅ Ready for Production
- Core data collection pipeline
- Database schema and operations
- REST API endpoints
- Backtesting framework
- Docker deployment
### ⚠️ Needs Attention
- No automated testing
- No monitoring/alerting
- Basic strategies only
- No backup/restore scripts
---
## Summary
**The BTC Accumulation Bot is a fully functional, production-ready system for collecting and analyzing cryptocurrency trading data.**
It successfully implements:
- Live data collection from Hyperliquid
- Custom timeframe generation (37m, 148m)
- Technical indicator computation (MA44, MA125)
- Strategy-based decision making
- Historical backtesting with P&L tracking
- REST API with dashboard
- Docker deployment
The system is architected for consistency between live trading and backtesting, ensuring that strategies tested historically will behave identically in production.
**Current State: Ready for live deployment and iterative enhancement.**

View File

@ -32,6 +32,15 @@ data_collection:
# Intervals to collect (1m is base, others computed) # Intervals to collect (1m is base, others computed)
intervals: intervals:
- "1m" # Base collection - "1m" # Base collection
indicators:
ma44:
type: "sma"
period: 44
intervals: ["1d"]
ma125:
type: "sma"
period: 125
intervals: ["1d"]
# WebSocket settings # WebSocket settings
websocket: websocket:

View File

@ -1,3 +1,4 @@
# Update docker-compose.yml to mount source code as volume
version: '3.8' version: '3.8'
services: services:
@ -45,6 +46,7 @@ services:
- DB_PASSWORD=${DB_PASSWORD} - DB_PASSWORD=${DB_PASSWORD}
- LOG_LEVEL=INFO - LOG_LEVEL=INFO
volumes: volumes:
- ../src:/app/src
- /volume1/btc_bot/logs:/app/logs - /volume1/btc_bot/logs:/app/logs
- ../config:/app/config:ro - ../config:/app/config:ro
depends_on: depends_on:
@ -72,6 +74,7 @@ services:
- DB_USER=btc_bot - DB_USER=btc_bot
- DB_PASSWORD=${DB_PASSWORD} - DB_PASSWORD=${DB_PASSWORD}
volumes: volumes:
- ../src:/app/src
- /volume1/btc_bot/exports:/app/exports - /volume1/btc_bot/exports:/app/exports
- ../config:/app/config:ro - ../config:/app/config:ro
depends_on: depends_on:
@ -80,4 +83,4 @@ services:
deploy: deploy:
resources: resources:
limits: limits:
memory: 512M memory: 512M

View File

@ -51,7 +51,11 @@ SELECT create_hypertable('indicators', 'time',
if_not_exists => TRUE if_not_exists => TRUE
); );
-- 7. Create index for indicators -- 7. Create unique constraint + index for indicators (required for upserts)
ALTER TABLE indicators
ADD CONSTRAINT indicators_unique
UNIQUE (time, symbol, interval, indicator_name);
CREATE INDEX IF NOT EXISTS idx_indicators_lookup CREATE INDEX IF NOT EXISTS idx_indicators_lookup
ON indicators (symbol, interval, indicator_name, time DESC); ON indicators (symbol, interval, indicator_name, time DESC);
@ -135,5 +139,61 @@ SELECT
FROM candles FROM candles
GROUP BY symbol; GROUP BY symbol;
-- 16. Create decisions table (brain outputs - buy/sell/hold with full context)
CREATE TABLE IF NOT EXISTS decisions (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
decision_type TEXT NOT NULL,
strategy TEXT NOT NULL,
confidence DECIMAL(5,4),
price_at_decision DECIMAL(18,8),
indicator_snapshot JSONB NOT NULL,
candle_snapshot JSONB NOT NULL,
reasoning TEXT,
backtest_id TEXT,
executed BOOLEAN DEFAULT FALSE,
execution_price DECIMAL(18,8),
execution_time TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 17. Convert decisions to hypertable
SELECT create_hypertable('decisions', 'time',
chunk_time_interval => INTERVAL '7 days',
if_not_exists => TRUE
);
-- 18. Indexes for decisions - separate live from backtest queries
CREATE INDEX IF NOT EXISTS idx_decisions_live
ON decisions (symbol, interval, time DESC) WHERE backtest_id IS NULL;
CREATE INDEX IF NOT EXISTS idx_decisions_backtest
ON decisions (backtest_id, symbol, time DESC) WHERE backtest_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_decisions_type
ON decisions (symbol, decision_type, time DESC);
-- 19. Create backtest_runs metadata table
CREATE TABLE IF NOT EXISTS backtest_runs (
id TEXT PRIMARY KEY,
strategy TEXT NOT NULL,
symbol TEXT NOT NULL DEFAULT 'BTC',
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ NOT NULL,
intervals TEXT[] NOT NULL,
config JSONB,
results JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 20. Compression for decisions
ALTER TABLE decisions SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'symbol,interval,strategy'
);
SELECT add_compression_policy('decisions', INTERVAL '7 days', if_not_exists => TRUE);
-- Success message -- Success message
SELECT 'Database schema initialized successfully' as status; SELECT 'Database schema initialized successfully' as status;

View File

@ -0,0 +1,673 @@
# Guide: Connecting to Synology PostgreSQL from Local PC
This guide explains how to connect to your Synology's TimescaleDB from your local PC to work with historical 1m candle data.
## Prerequisites
### 1. Install PostgreSQL Client on Your PC
**Windows:**
```bash
# Download from: https://www.postgresql.org/download/windows/
# Or use chocolatey:
choco install postgresql
```
**Mac:**
```bash
brew install postgresql
# Or download Postgres.app from postgresapp.com
```
**Linux:**
```bash
# Ubuntu/Debian
sudo apt-get install postgresql-client
# Or use Docker:
docker run -it --rm postgres:15 psql --version
```
### 2. Install Python Dependencies
```bash
pip install asyncpg pandas numpy
# Or use requirements.txt from the project
pip install -r requirements.txt
```
## Step 1: Configure Synology for Remote Access
### Open PostgreSQL Port
1. **SSH into your Synology:**
```bash
ssh admin@YOUR_SYNOLOGY_IP
```
2. **Edit PostgreSQL configuration:**
```bash
# Find postgresql.conf (usually in /var/lib/postgresql/data/)
sudo vim /var/lib/postgresql/data/postgresql.conf
# Change:
# listen_addresses = 'localhost'
# To:
listen_addresses = '*'
```
3. **Edit pg_hba.conf to allow remote connections:**
```bash
sudo vim /var/lib/postgresql/data/pg_hba.conf
# Add at the end:
host all all 0.0.0.0/0 md5
# Or for specific IP:
host all all YOUR_PC_IP/32 md5
```
4. **Restart PostgreSQL:**
```bash
sudo systemctl restart postgresql
# Or if using Docker:
cd ~/btc_bot/docker && docker-compose restart timescaledb
```
### Configure Synology Firewall
1. Open **Control Panel** → **Security** → **Firewall**
2. Click **Edit Rules**
3. Create new rule:
- **Ports:** Custom → TCP → 5433 (or your PostgreSQL port)
- **Source IP:** Your PC's IP address (or allow all)
- **Action:** Allow
4. Apply the rule
## Step 2: Test Connection
### Using psql CLI
```bash
# Replace with your Synology IP
export DB_HOST=192.168.1.100 # Your Synology IP
export DB_PORT=5433
export DB_NAME=btc_data
export DB_USER=btc_bot
export DB_PASSWORD=your_password
# Test connection
psql -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME -c "SELECT version();"
```
### Using Python
Create `test_connection.py`:
```python
import asyncio
import asyncpg
async def test():
conn = await asyncpg.connect(
host='192.168.1.100', # Your Synology IP
port=5433,
database='btc_data',
user='btc_bot',
password='your_password'
)
version = await conn.fetchval('SELECT version()')
print(f"Connected! PostgreSQL version: {version}")
# Test candle count
count = await conn.fetchval(
"SELECT COUNT(*) FROM candles WHERE interval = '1m'"
)
print(f"Total 1m candles: {count:,}")
await conn.close()
asyncio.run(test())
```
Run it:
```bash
python test_connection.py
```
## Step 3: Export Historical 1m Data
### Option A: Using psql (Quick Export)
```bash
psql -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME -c "
COPY (
SELECT time, symbol, interval, open, high, low, close, volume
FROM candles
WHERE symbol = 'BTC'
AND interval = '1m'
AND time >= '2025-01-01'
ORDER BY time
) TO STDOUT WITH CSV HEADER;
" > btc_1m_candles.csv
```
### Option B: Using Python (With Progress Bar)
Create `export_candles.py`:
```python
import asyncio
import asyncpg
import csv
from datetime import datetime, timezone
from tqdm import tqdm
async def export_candles(
host: str,
port: int,
database: str,
user: str,
password: str,
symbol: str = 'BTC',
interval: str = '1m',
start_date: str = '2025-01-01',
output_file: str = 'candles.csv'
):
"""Export candles to CSV with progress bar"""
conn = await asyncpg.connect(
host=host, port=port, database=database,
user=user, password=password
)
try:
# Get total count
total = await conn.fetchval("""
SELECT COUNT(*) FROM candles
WHERE symbol = $1 AND interval = $2 AND time >= $3
""", symbol, interval, datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc))
print(f"Exporting {total:,} candles...")
# Export in batches
with open(output_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['time', 'symbol', 'interval', 'open', 'high', 'low', 'close', 'volume'])
async with conn.transaction():
cursor = await conn.cursor(
"SELECT time, symbol, interval, open, high, low, close, volume "
"FROM candles WHERE symbol = $1 AND interval = $2 AND time >= $3 "
"ORDER BY time",
symbol, interval, datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc)
)
with tqdm(total=total, unit='candles') as pbar:
while True:
rows = await cursor.fetch(1000)
if not rows:
break
for row in rows:
writer.writerow(row)
pbar.update(len(rows))
print(f"✓ Exported to {output_file}")
print(f" Total rows: {total:,}")
finally:
await conn.close()
if __name__ == "__main__":
asyncio.run(export_candles(
host='192.168.1.100', # Change to your Synology IP
port=5433,
database='btc_data',
user='btc_bot',
password='your_password',
symbol='BTC',
interval='1m',
start_date='2025-01-01',
output_file='btc_1m_candles.csv'
))
```
Run:
```bash
pip install tqdm
python export_candles.py
```
## Step 4: Calculate Indicators on PC
### Using pandas-ta (Recommended)
Create `calculate_indicators.py`:
```python
import pandas as pd
import pandas_ta as ta
from datetime import datetime
def calculate_indicators(input_file: str, output_file: str):
"""Calculate technical indicators from candle data"""
# Read candles
print(f"Reading {input_file}...")
df = pd.read_csv(input_file)
df['time'] = pd.to_datetime(df['time'])
df = df.sort_values('time')
print(f"Loaded {len(df):,} candles")
# Calculate indicators
print("Calculating indicators...")
# Moving Averages
df['ma44'] = ta.sma(df['close'], length=44)
df['ma125'] = ta.sma(df['close'], length=125)
df['ma200'] = ta.sma(df['close'], length=200)
# RSI
df['rsi'] = ta.rsi(df['close'], length=14)
# Bollinger Bands
bb = ta.bbands(df['close'], length=20, std=2)
df['bb_upper'] = bb['BBU_20_2.0']
df['bb_middle'] = bb['BBM_20_2.0']
df['bb_lower'] = bb['BBL_20_2.0']
# MACD
macd = ta.macd(df['close'], fast=12, slow=26, signal=9)
df['macd'] = macd['MACD_12_26_9']
df['macd_signal'] = macd['MACDs_12_26_9']
df['macd_histogram'] = macd['MACDh_12_26_9']
# Save
print(f"Saving to {output_file}...")
df.to_csv(output_file, index=False)
print(f"✓ Calculated {len(df.columns) - 8} indicators")
print(f" Output: {output_file}")
return df
if __name__ == "__main__":
df = calculate_indicators(
input_file='btc_1m_candles.csv',
output_file='btc_1m_with_indicators.csv'
)
# Show sample
print("\nSample data:")
print(df[['time', 'close', 'ma44', 'ma125', 'rsi']].tail(10))
```
Install dependencies:
```bash
pip install pandas pandas-ta
```
Run:
```bash
python calculate_indicators.py
```
### Performance Tips
- **Chunk processing** for very large files (> 1GB):
```python
# Process 100k rows at a time
chunksize = 100000
for chunk in pd.read_csv(input_file, chunksize=chunksize):
# Calculate indicators for chunk
pass
```
- **Use multiple cores:**
```python
from multiprocessing import Pool
# Parallelize indicator calculation
```
## Step 5: Import Indicators Back to Synology
### Option A: Direct SQL Insert (Fastest)
Create `import_indicators.py`:
```python
import asyncio
import asyncpg
import pandas as pd
from datetime import datetime
from tqdm import tqdm
async def import_indicators(
host: str,
port: int,
database: str,
user: str,
password: str,
input_file: str,
batch_size: int = 1000
):
"""Import calculated indicators to Synology database"""
# Read calculated indicators
print(f"Reading {input_file}...")
df = pd.read_csv(input_file)
print(f"Loaded {len(df):,} rows with {len(df.columns)} columns")
# Connect to database
conn = await asyncpg.connect(
host=host, port=port, database=database,
user=user, password=password
)
try:
# Get indicator columns (exclude candle data)
indicator_cols = [c for c in df.columns if c not in
['time', 'symbol', 'interval', 'open', 'high', 'low', 'close', 'volume']]
print(f"Importing {len(indicator_cols)} indicators: {indicator_cols}")
# Prepare data
symbol = df['symbol'].iloc[0]
interval = df['interval'].iloc[0]
total_inserted = 0
with tqdm(total=len(df) * len(indicator_cols), unit='indicators') as pbar:
for col in indicator_cols:
# Prepare batch
values = []
for _, row in df.iterrows():
if pd.notna(row[col]): # Skip NaN values
values.append((
row['time'],
symbol,
interval,
col, # indicator_name
float(row[col]),
{} # parameters (JSONB)
))
# Insert in batches
for i in range(0, len(values), batch_size):
batch = values[i:i + batch_size]
await conn.executemany(
"""
INSERT INTO indicators (time, symbol, interval, indicator_name, value, parameters)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (time, symbol, interval, indicator_name) DO UPDATE SET
value = EXCLUDED.value,
updated_at = NOW()
""",
batch
)
total_inserted += len(batch)
pbar.update(len(batch))
print(f"\n✓ Imported {total_inserted:,} indicator values")
finally:
await conn.close()
if __name__ == "__main__":
asyncio.run(import_indicators(
host='192.168.1.100',
port=5433,
database='btc_data',
user='btc_bot',
password='your_password',
input_file='btc_1m_with_indicators.csv',
batch_size=1000
))
```
Run:
```bash
python import_indicators.py
```
### Option B: Using psql COPY (For Large Files)
```bash
# Convert to format for COPY command
python -c "
import pandas as pd
df = pd.read_csv('btc_1m_with_indicators.csv')
# Transform to long format for indicators table
indicators = []
for col in ['ma44', 'ma125', 'rsi', 'bb_upper', 'bb_lower']:
temp = df[['time', 'symbol', 'interval', col]].copy()
temp['indicator_name'] = col
temp = temp.rename(columns={col: 'value'})
indicators.append(temp)
result = pd.concat(indicators)
result = result[result['value'].notna()]
result.to_csv('indicators_for_import.csv', index=False)
"
# Upload and import on Synology
psql -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME -c "
COPY indicators (time, symbol, interval, indicator_name, value)
FROM '/path/to/indicators_for_import.csv'
DELIMITER ',' CSV HEADER;
"
```
## Complete Workflow Script
Create `sync_indicators.py` for one-command workflow:
```python
#!/usr/bin/env python3
"""
Complete workflow: Export candles → Calculate indicators → Import to Synology
"""
import asyncio
import asyncpg
import pandas as pd
import pandas_ta as ta
from datetime import datetime, timezone
from tqdm import tqdm
class IndicatorSync:
def __init__(self, host: str, port: int, database: str, user: str, password: str):
self.db_config = {
'host': host, 'port': port, 'database': database,
'user': user, 'password': password
}
async def export_and_calculate(
self,
symbol: str = 'BTC',
interval: str = '1m',
start_date: str = '2025-01-01',
indicators_config: dict = None
):
"""Main workflow"""
print("="*70)
print(f"INDICATOR SYNC: {symbol}/{interval}")
print(f"Period: {start_date} to now")
print("="*70)
# Connect to database
conn = await asyncpg.connect(**self.db_config)
try:
# Step 1: Export candles
print("\n📥 Step 1: Exporting candles...")
candles = await self._export_candles(conn, symbol, interval, start_date)
print(f" Exported {len(candles):,} candles")
# Step 2: Calculate indicators
print("\n⚙ Step 2: Calculating indicators...")
indicators_df = self._calculate_indicators(candles, indicators_config)
print(f" Calculated {len(indicators_df)} indicator values")
# Step 3: Import indicators
print("\n📤 Step 3: Importing to database...")
await self._import_indicators(conn, indicators_df)
print(" Import complete!")
print("\n" + "="*70)
print("✅ SYNC COMPLETE")
print("="*70)
finally:
await conn.close()
async def _export_candles(self, conn, symbol, interval, start_date):
"""Export candles from database"""
rows = await conn.fetch(
"SELECT time, symbol, interval, open, high, low, close, volume "
"FROM candles WHERE symbol = $1 AND interval = $2 AND time >= $3 "
"ORDER BY time",
symbol, interval, datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc)
)
return pd.DataFrame(rows, columns=['time', 'symbol', 'interval', 'open', 'high', 'low', 'close', 'volume'])
def _calculate_indicators(self, candles_df, config=None):
"""Calculate technical indicators"""
df = candles_df.copy()
# Default indicators
config = config or {
'ma44': lambda d: ta.sma(d['close'], length=44),
'ma125': lambda d: ta.sma(d['close'], length=125),
'rsi': lambda d: ta.rsi(d['close'], length=14),
}
# Calculate each indicator
for name, func in config.items():
df[name] = func(df)
# Transform to long format
indicators = []
indicator_cols = list(config.keys())
for col in indicator_cols:
temp = df[['time', 'symbol', 'interval', col]].copy()
temp['indicator_name'] = col
temp = temp.rename(columns={col: 'value'})
temp = temp[temp['value'].notna()]
indicators.append(temp)
return pd.concat(indicators, ignore_index=True)
async def _import_indicators(self, conn, indicators_df):
"""Import indicators to database"""
# Convert to list of tuples
values = [
(row['time'], row['symbol'], row['interval'], row['indicator_name'], float(row['value']), {})
for _, row in indicators_df.iterrows()
]
# Insert in batches
batch_size = 1000
for i in tqdm(range(0, len(values), batch_size), desc="Importing"):
batch = values[i:i + batch_size]
await conn.executemany(
"""
INSERT INTO indicators (time, symbol, interval, indicator_name, value, parameters)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (time, symbol, interval, indicator_name) DO UPDATE SET
value = EXCLUDED.value,
updated_at = NOW()
""",
batch
)
# Usage
if __name__ == "__main__":
sync = IndicatorSync(
host='192.168.1.100', # Your Synology IP
port=5433,
database='btc_data',
user='btc_bot',
password='your_password'
)
asyncio.run(sync.export_and_calculate(
symbol='BTC',
interval='1m',
start_date='2025-01-01'
))
```
Run complete workflow:
```bash
python sync_indicators.py
```
## Troubleshooting
### Connection Refused
```
Error: connection refused
```
- Check if PostgreSQL is running: `docker ps | grep timescale`
- Verify port is open: `telnet SYNOLOGY_IP 5433`
- Check firewall rules on Synology
### Permission Denied
```
Error: password authentication failed
```
- Verify password is correct
- Check pg_hba.conf has proper entries
- Restart PostgreSQL after config changes
### Slow Performance
- Use batch inserts (1000 rows at a time)
- Process large datasets in chunks
- Consider using COPY command for very large imports
- Close cursor after use
### Memory Issues
- Don't load entire table into memory
- Use server-side cursors
- Process in smaller date ranges
- Use chunksize when reading CSV
## Security Notes
⚠️ **Important:**
- Use strong password for database user
- Limit pg_hba.conf to specific IPs when possible
- Use VPN if accessing over internet
- Consider SSL connection for remote access
- Don't commit passwords to git
## Next Steps
Once indicators are calculated and imported:
1. **Backtests will be fast** - Server just reads pre-calculated values
2. **Dashboard will load quickly** - No on-the-fly calculation needed
3. **Can add more indicators** - Just re-run sync with new calculations
## Alternative: SSH Tunnel (More Secure)
Instead of opening PostgreSQL port, use SSH tunnel:
```bash
# On your PC
ssh -L 5433:localhost:5433 admin@SYNOLOGY_IP
# Now connect to localhost:5433 (tunnels to Synology)
export DB_HOST=localhost
export DB_PORT=5433
python sync_indicators.py
```
This encrypts all traffic and doesn't require opening PostgreSQL port.
---
**Questions or issues?** Check the logs and verify each step works before proceeding to the next.

107
scripts/check_db_stats.py Normal file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
Quick database statistics checker
Shows oldest date, newest date, and count for each interval
"""
import asyncio
import asyncpg
import os
from datetime import datetime
async def check_database_stats():
# Database connection (uses same env vars as your app)
conn = await asyncpg.connect(
host=os.getenv('DB_HOST', 'localhost'),
port=int(os.getenv('DB_PORT', 5432)),
database=os.getenv('DB_NAME', 'btc_data'),
user=os.getenv('DB_USER', 'btc_bot'),
password=os.getenv('DB_PASSWORD', '')
)
try:
print("=" * 70)
print("DATABASE STATISTICS")
print("=" * 70)
print()
# Check for each interval
intervals = ['1m', '3m', '5m', '15m', '30m', '37m', '1h', '2h', '4h', '8h', '12h', '1d']
for interval in intervals:
stats = await conn.fetchrow("""
SELECT
COUNT(*) as count,
MIN(time) as oldest,
MAX(time) as newest
FROM candles
WHERE symbol = 'BTC' AND interval = $1
""", interval)
if stats['count'] > 0:
oldest = stats['oldest'].strftime('%Y-%m-%d %H:%M') if stats['oldest'] else 'N/A'
newest = stats['newest'].strftime('%Y-%m-%d %H:%M') if stats['newest'] else 'N/A'
count = stats['count']
# Calculate days of data
if stats['oldest'] and stats['newest']:
days = (stats['newest'] - stats['oldest']).days
print(f"{interval:6} | {count:>8,} candles | {days:>4} days | {oldest} to {newest}")
print()
print("=" * 70)
# Check indicators
print("\nINDICATORS AVAILABLE:")
indicators = await conn.fetch("""
SELECT DISTINCT indicator_name, interval, COUNT(*) as count
FROM indicators
WHERE symbol = 'BTC'
GROUP BY indicator_name, interval
ORDER BY interval, indicator_name
""")
if indicators:
for ind in indicators:
print(f" {ind['indicator_name']:10} on {ind['interval']:6} | {ind['count']:>8,} values")
else:
print(" No indicators found in database")
print()
print("=" * 70)
# Check 1m specifically with more detail
print("\n1-MINUTE DATA DETAIL:")
one_min_stats = await conn.fetchrow("""
SELECT
COUNT(*) as count,
MIN(time) as oldest,
MAX(time) as newest,
COUNT(*) FILTER (WHERE time > NOW() - INTERVAL '24 hours') as last_24h
FROM candles
WHERE symbol = 'BTC' AND interval = '1m'
""")
if one_min_stats['count'] > 0:
total_days = (one_min_stats['newest'] - one_min_stats['oldest']).days
expected_candles = total_days * 24 * 60 # 1 candle per minute
actual_candles = one_min_stats['count']
coverage = (actual_candles / expected_candles) * 100 if expected_candles > 0 else 0
print(f" Total candles: {actual_candles:,}")
print(f" Date range: {one_min_stats['oldest'].strftime('%Y-%m-%d')} to {one_min_stats['newest'].strftime('%Y-%m-%d')}")
print(f" Total days: {total_days}")
print(f" Expected candles: {expected_candles:,} (if complete)")
print(f" Coverage: {coverage:.1f}%")
print(f" Last 24 hours: {one_min_stats['last_24h']:,} candles")
else:
print(" No 1m data found")
print()
print("=" * 70)
finally:
await conn.close()
if __name__ == "__main__":
asyncio.run(check_database_stats())

18
scripts/check_status.sh Normal file
View File

@ -0,0 +1,18 @@
#!/bin/bash
# Check the status of the indicators table (constraints and compression)
docker exec -i btc_timescale psql -U btc_bot -d btc_data <<EOF
\x
SELECT 'Checking constraints...' as step;
SELECT conname, pg_get_constraintdef(oid)
FROM pg_constraint
WHERE conrelid = 'indicators'::regclass;
SELECT 'Checking compression settings...' as step;
SELECT * FROM timescaledb_information.hypertables
WHERE hypertable_name = 'indicators';
SELECT 'Checking compression jobs...' as step;
SELECT * FROM timescaledb_information.jobs
WHERE hypertable_name = 'indicators';
EOF

View File

@ -0,0 +1,54 @@
#!/bin/bash
# Fix indicators table schema - Version 2 (Final)
# Handles TimescaleDB compression constraints properly
echo "Fixing indicators table schema (v2)..."
# 1. Decompress chunks individually (safest method)
# We fetch the list of compressed chunks and process them one by one
echo "Checking for compressed chunks..."
CHUNKS=$(docker exec -i btc_timescale psql -U btc_bot -d btc_data -t -c "SELECT chunk_schema || '.' || chunk_name FROM timescaledb_information.chunks WHERE hypertable_name = 'indicators' AND is_compressed = true;")
for chunk in $CHUNKS; do
# Trim whitespace
chunk=$(echo "$chunk" | xargs)
if [[ ! -z "$chunk" ]]; then
echo "Decompressing chunk: $chunk"
docker exec -i btc_timescale psql -U btc_bot -d btc_data -c "SELECT decompress_chunk('$chunk');"
fi
done
# 2. Execute the schema changes
docker exec -i btc_timescale psql -U btc_bot -d btc_data <<EOF
BEGIN;
-- Remove policy first
SELECT remove_compression_policy('indicators', if_exists => true);
-- Disable compression setting (REQUIRED to add unique constraint)
ALTER TABLE indicators SET (timescaledb.compress = false);
-- Deduplicate data (just in case duplicates exist)
DELETE FROM indicators a USING indicators b
WHERE a.ctid < b.ctid
AND a.time = b.time
AND a.symbol = b.symbol
AND a.interval = b.interval
AND a.indicator_name = b.indicator_name;
-- Add the unique constraint
ALTER TABLE indicators ADD CONSTRAINT indicators_unique UNIQUE (time, symbol, interval, indicator_name);
-- Re-enable compression configuration
ALTER TABLE indicators SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'symbol,interval,indicator_name'
);
-- Re-add compression policy (7 days)
SELECT add_compression_policy('indicators', INTERVAL '7 days', if_not_exists => true);
COMMIT;
SELECT 'Indicators schema fix v2 completed successfully' as status;
EOF

11
scripts/run_test.sh Normal file
View File

@ -0,0 +1,11 @@
#!/bin/bash
# Run performance test inside Docker container
# Usage: ./run_test.sh [days] [interval]
DAYS=${1:-7}
INTERVAL=${2:-1m}
echo "Running MA44 performance test: ${DAYS} days of ${INTERVAL} data"
echo "=================================================="
docker exec btc_collector python scripts/test_ma44_performance.py --days $DAYS --interval $INTERVAL

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
Performance Test Script for MA44 Strategy
Tests backtesting performance on Synology DS218+ with 6GB RAM
Usage:
python test_ma44_performance.py [--days DAYS] [--interval INTERVAL]
Example:
python test_ma44_performance.py --days 7 --interval 1m
"""
import asyncio
import argparse
import time
import sys
import os
from datetime import datetime, timedelta, timezone
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from data_collector.database import DatabaseManager
from data_collector.indicator_engine import IndicatorEngine, IndicatorConfig
from data_collector.brain import Brain
from data_collector.backtester import Backtester
async def run_performance_test(days: int = 7, interval: str = "1m"):
"""Run MA44 backtest and measure performance"""
print("=" * 70)
print(f"PERFORMANCE TEST: MA44 Strategy")
print(f"Timeframe: {interval}")
print(f"Period: Last {days} days")
print(f"Hardware: Synology DS218+ (6GB RAM)")
print("=" * 70)
print()
# Database connection (adjust these if needed)
db = DatabaseManager(
host=os.getenv('DB_HOST', 'localhost'),
port=int(os.getenv('DB_PORT', 5432)),
database=os.getenv('DB_NAME', 'btc_data'),
user=os.getenv('DB_USER', 'btc_bot'),
password=os.getenv('DB_PASSWORD', '')
)
try:
await db.connect()
print("✓ Database connected")
# Calculate date range
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
print(f"✓ Date range: {start_date.date()} to {end_date.date()}")
print(f"✓ Symbol: BTC")
print(f"✓ Strategy: MA44 (44-period SMA)")
print()
# Check data availability
async with db.acquire() as conn:
count = await conn.fetchval("""
SELECT COUNT(*) FROM candles
WHERE symbol = 'BTC'
AND interval = $1
AND time >= $2
AND time <= $3
""", interval, start_date, end_date)
print(f"📊 Data points: {count:,} {interval} candles")
if count == 0:
print("❌ ERROR: No data found for this period!")
print(f" Run: python -m data_collector.backfill --days {days} --intervals {interval}")
return
print(f" (Expected: ~{count * int(interval.replace('m','').replace('h','').replace('d',''))} minutes of data)")
print()
# Setup indicator configuration
indicator_configs = [
IndicatorConfig("ma44", "sma", 44, [interval])
]
engine = IndicatorEngine(db, indicator_configs)
brain = Brain(db, engine)
backtester = Backtester(db, engine, brain)
print("⚙️ Running backtest...")
print("-" * 70)
# Measure execution time
start_time = time.time()
await backtester.run("BTC", [interval], start_date, end_date)
end_time = time.time()
execution_time = end_time - start_time
print("-" * 70)
print()
# Fetch results from database
async with db.acquire() as conn:
latest_backtest = await conn.fetchrow("""
SELECT id, strategy, start_time, end_time, intervals, results, created_at
FROM backtest_runs
WHERE strategy LIKE '%ma44%'
ORDER BY created_at DESC
LIMIT 1
""")
if latest_backtest and latest_backtest['results']:
import json
results = json.loads(latest_backtest['results'])
print("📈 RESULTS:")
print("=" * 70)
print(f" Total Trades: {results.get('total_trades', 'N/A')}")
print(f" Win Rate: {results.get('win_rate', 0):.1f}%")
print(f" Win Count: {results.get('win_count', 0)}")
print(f" Loss Count: {results.get('loss_count', 0)}")
print(f" Total P&L: ${results.get('total_pnl', 0):.2f}")
print(f" P&L Percent: {results.get('total_pnl_pct', 0):.2f}%")
print(f" Initial Balance: ${results.get('initial_balance', 1000):.2f}")
print(f" Final Balance: ${results.get('final_balance', 1000):.2f}")
print(f" Max Drawdown: {results.get('max_drawdown', 0):.2f}%")
print()
print("⏱️ PERFORMANCE:")
print(f" Execution Time: {execution_time:.2f} seconds")
print(f" Candles/Second: {count / execution_time:.0f}")
print(f" Backtest ID: {latest_backtest['id']}")
print()
# Performance assessment
if execution_time < 30:
print("✅ PERFORMANCE: Excellent (< 30s)")
elif execution_time < 60:
print("✅ PERFORMANCE: Good (< 60s)")
elif execution_time < 300:
print("⚠️ PERFORMANCE: Acceptable (1-5 min)")
else:
print("❌ PERFORMANCE: Slow (> 5 min) - Consider shorter periods or higher TFs")
print()
print("💡 RECOMMENDATIONS:")
if execution_time > 60:
print(" • For faster results, use higher timeframes (15m, 1h, 4h)")
print(" • Or reduce date range (< 7 days)")
else:
print(" • Hardware is sufficient for this workload")
print(" • Can handle larger date ranges or multiple timeframes")
else:
print("❌ ERROR: No results found in database!")
print(" The backtest may have failed. Check server logs.")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
finally:
await db.disconnect()
print()
print("=" * 70)
print("Test completed")
print("=" * 70)
def main():
parser = argparse.ArgumentParser(description='Test MA44 backtest performance')
parser.add_argument('--days', type=int, default=7,
help='Number of days to backtest (default: 7)')
parser.add_argument('--interval', type=str, default='1m',
help='Candle interval (default: 1m)')
args = parser.parse_args()
# Run the async test
asyncio.run(run_performance_test(args.days, args.interval))
if __name__ == "__main__":
main()

87
scripts/update_schema.sh Normal file
View File

@ -0,0 +1,87 @@
#!/bin/bash
# Apply schema updates to a running TimescaleDB container without wiping data
echo "Applying schema updates to btc_timescale container..."
# Execute the schema SQL inside the container
# We use psql with the environment variables set in docker-compose
docker exec -i btc_timescale psql -U btc_bot -d btc_data <<EOF
-- 1. Unique constraint for indicators (if not exists)
DO \$\$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'indicators_unique') THEN
ALTER TABLE indicators ADD CONSTRAINT indicators_unique UNIQUE (time, symbol, interval, indicator_name);
END IF;
END \$\$;
-- 2. Index for indicators
CREATE INDEX IF NOT EXISTS idx_indicators_lookup ON indicators (symbol, interval, indicator_name, time DESC);
-- 3. Data health view update
CREATE OR REPLACE VIEW data_health AS
SELECT
symbol,
COUNT(*) as total_candles,
COUNT(*) FILTER (WHERE validated) as validated_candles,
MAX(time) as latest_candle,
MIN(time) as earliest_candle,
NOW() - MAX(time) as time_since_last
FROM candles
GROUP BY symbol;
-- 4. Decisions table
CREATE TABLE IF NOT EXISTS decisions (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
decision_type TEXT NOT NULL,
strategy TEXT NOT NULL,
confidence DECIMAL(5,4),
price_at_decision DECIMAL(18,8),
indicator_snapshot JSONB NOT NULL,
candle_snapshot JSONB NOT NULL,
reasoning TEXT,
backtest_id TEXT,
executed BOOLEAN DEFAULT FALSE,
execution_price DECIMAL(18,8),
execution_time TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 5. Decisions hypertable (ignore error if already exists)
DO \$\$
BEGIN
PERFORM create_hypertable('decisions', 'time', chunk_time_interval => INTERVAL '7 days', if_not_exists => TRUE);
EXCEPTION WHEN OTHERS THEN
NULL; -- Ignore if already hypertable
END \$\$;
-- 6. Decisions indexes
CREATE INDEX IF NOT EXISTS idx_decisions_live ON decisions (symbol, interval, time DESC) WHERE backtest_id IS NULL;
CREATE INDEX IF NOT EXISTS idx_decisions_backtest ON decisions (backtest_id, symbol, time DESC) WHERE backtest_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_decisions_type ON decisions (symbol, decision_type, time DESC);
-- 7. Backtest runs table
CREATE TABLE IF NOT EXISTS backtest_runs (
id TEXT PRIMARY KEY,
strategy TEXT NOT NULL,
symbol TEXT NOT NULL DEFAULT 'BTC',
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ NOT NULL,
intervals TEXT[] NOT NULL,
config JSONB,
results JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 8. Compression policies
DO \$\$
BEGIN
ALTER TABLE decisions SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol,interval,strategy');
PERFORM add_compression_policy('decisions', INTERVAL '7 days', if_not_exists => TRUE);
EXCEPTION WHEN OTHERS THEN
NULL; -- Ignore compression errors if already set
END \$\$;
SELECT 'Schema update completed successfully' as status;
EOF

View File

@ -396,13 +396,173 @@
font-size: 14px; font-size: 14px;
} }
@media (max-width: 1200px) { /* Simulation Panel Styles */
.sim-strategies {
display: flex;
flex-direction: column;
gap: 4px;
margin: 8px 0;
}
.sim-strategy-option {
display: flex;
align-items: center;
gap: 8px;
padding: 8px;
border-radius: 4px;
cursor: pointer;
transition: background 0.2s;
}
.sim-strategy-option:hover {
background: var(--tv-hover);
}
.sim-strategy-option input[type="radio"] {
cursor: pointer;
}
.sim-strategy-option label {
cursor: pointer;
flex: 1;
font-size: 13px;
}
.sim-strategy-info {
color: var(--tv-text-secondary);
cursor: help;
position: relative;
font-size: 14px;
width: 20px;
height: 20px;
display: flex;
align-items: center;
justify-content: center;
border-radius: 50%;
transition: background 0.2s;
}
.sim-strategy-info:hover {
background: var(--tv-hover);
color: var(--tv-text);
}
/* Tooltip */
.sim-strategy-info:hover::after {
content: attr(data-tooltip);
position: absolute;
bottom: 100%;
right: 0;
background: var(--tv-panel-bg);
border: 1px solid var(--tv-border);
padding: 8px 12px;
border-radius: 4px;
font-size: 12px;
width: 250px;
z-index: 100;
box-shadow: 0 4px 12px rgba(0,0,0,0.3);
color: var(--tv-text);
margin-bottom: 4px;
line-height: 1.4;
pointer-events: none;
}
.sim-input-group {
margin-bottom: 12px;
}
.sim-input-group label {
display: block;
font-size: 11px;
color: var(--tv-text-secondary);
margin-bottom: 4px;
text-transform: uppercase;
}
.sim-input {
width: 100%;
padding: 8px 12px;
background: var(--tv-bg);
border: 1px solid var(--tv-border);
border-radius: 4px;
color: var(--tv-text);
font-size: 13px;
font-family: inherit;
}
.sim-input:focus {
outline: none;
border-color: var(--tv-blue);
}
.sim-run-btn {
width: 100%;
padding: 10px;
background: var(--tv-blue);
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 13px;
font-weight: 600;
transition: opacity 0.2s;
margin-top: 8px;
}
.sim-run-btn:hover:not(:disabled) {
opacity: 0.9;
}
.sim-run-btn:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.sim-results {
margin-top: 16px;
padding-top: 16px;
border-top: 1px solid var(--tv-border);
}
.sim-stat-row {
display: flex;
justify-content: space-between;
padding: 6px 0;
font-size: 13px;
}
.sim-stat-row span:first-child {
color: var(--tv-text-secondary);
}
.sim-value {
font-weight: 600;
font-family: 'Courier New', monospace;
}
.sim-value.positive { color: var(--tv-green); }
.sim-value.negative { color: var(--tv-red); }
.loading-strategies {
padding: 12px;
text-align: center;
color: var(--tv-text-secondary);
font-size: 12px;
}
@media (max-width: 1400px) {
.ta-content {
grid-template-columns: repeat(3, 1fr);
}
}
@media (max-width: 1000px) {
.ta-content { .ta-content {
grid-template-columns: repeat(2, 1fr); grid-template-columns: repeat(2, 1fr);
} }
} }
@media (max-width: 768px) { @media (max-width: 600px) {
.ta-content { .ta-content {
grid-template-columns: 1fr; grid-template-columns: 1fr;
} }
@ -834,18 +994,47 @@
</div> </div>
</div> </div>
<div class="ta-section"> <div class="ta-section" id="simulationPanel">
<div class="ta-section-title">Price Info</div> <div class="ta-section-title">Strategy Simulation</div>
<div class="ta-level">
<span class="ta-level-label">Current</span> <!-- Date picker -->
<span class="ta-level-value">$${data.current_price.toLocaleString()}</span> <div class="sim-input-group" style="margin: 0 0 8px 0;">
<label style="font-size: 10px; text-transform: uppercase; color: var(--tv-text-secondary);">Start Date:</label>
<input type="datetime-local" id="simStartDate" class="sim-input" style="margin-top: 2px;">
</div> </div>
<div style="font-size: 12px; color: var(--tv-text-secondary); margin-top: 8px;">
Based on last 200 candles<br> <!-- Strategies loaded dynamically here -->
Strategy: Trend following with MA crossovers <div id="strategyList" class="sim-strategies" style="max-height: 100px; overflow-y: auto;">
<div class="loading-strategies">Loading strategies...</div>
</div>
<button class="sim-run-btn" onclick="runSimulation()" id="runSimBtn" disabled style="padding: 6px; font-size: 12px; margin-top: 6px;">
Run Simulation
</button>
<!-- Results -->
<div id="simResults" class="sim-results" style="display: none; margin-top: 8px; padding-top: 8px;">
<div class="sim-stat-row" style="padding: 2px 0; font-size: 11px;">
<span>Trades:</span>
<span id="simTrades" class="sim-value">--</span>
</div>
<div class="sim-stat-row" style="padding: 2px 0; font-size: 11px;">
<span>Win Rate:</span>
<span id="simWinRate" class="sim-value">--</span>
</div>
<div class="sim-stat-row" style="padding: 2px 0; font-size: 11px;">
<span>Total P&L:</span>
<span id="simPnL" class="sim-value">--</span>
</div>
</div> </div>
</div> </div>
`; `;
// Load strategies after simulation panel is rendered
setTimeout(() => {
loadStrategies();
setDefaultStartDate();
}, 0);
} }
updateStats(candle) { updateStats(candle) {
@ -891,6 +1080,226 @@
window.open(geminiUrl, '_blank'); window.open(geminiUrl, '_blank');
} }
// Load strategies on page load
async function loadStrategies() {
try {
console.log('Fetching strategies from API...');
// Add timeout to fetch
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 5000); // 5 second timeout
const response = await fetch('/api/v1/strategies', {
signal: controller.signal
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
console.log('Strategies loaded:', data);
if (!data.strategies) {
throw new Error('Invalid response format: missing strategies array');
}
renderStrategies(data.strategies);
} catch (error) {
console.error('Error loading strategies:', error);
let errorMessage = error.message;
if (error.name === 'AbortError') {
errorMessage = 'Request timeout - API server not responding';
} else if (error.message.includes('Failed to fetch')) {
errorMessage = 'Cannot connect to API server - is it running?';
}
document.getElementById('strategyList').innerHTML =
`<div class="loading-strategies" style="color: var(--tv-red);">
${errorMessage}<br>
<small>Check console (F12) for details</small>
</div>`;
}
}
// Render strategy list
function renderStrategies(strategies) {
const container = document.getElementById('strategyList');
if (!strategies || strategies.length === 0) {
container.innerHTML = '<div class="loading-strategies">No strategies available</div>';
return;
}
container.innerHTML = strategies.map((s, index) => `
<div class="sim-strategy-option">
<input type="radio" name="strategy" id="strat_${s.id}"
value="${s.id}" ${index === 0 ? 'checked' : ''}>
<label for="strat_${s.id}">${s.name}</label>
<span class="sim-strategy-info" data-tooltip="${s.description}">ⓘ</span>
</div>
`).join('');
// Enable run button
document.getElementById('runSimBtn').disabled = false;
}
// Run simulation
async function runSimulation() {
const selectedStrategy = document.querySelector('input[name="strategy"]:checked');
if (!selectedStrategy) {
alert('Please select a strategy');
return;
}
const strategyId = selectedStrategy.value;
const startDateInput = document.getElementById('simStartDate').value;
if (!startDateInput) {
alert('Please select a start date');
return;
}
// Format date for API
const startDate = new Date(startDateInput).toISOString().split('T')[0];
// Disable button during simulation
const runBtn = document.getElementById('runSimBtn');
runBtn.disabled = true;
runBtn.textContent = 'Running...';
try {
// Trigger backtest via API
const response = await fetch('/api/v1/backtests', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
symbol: 'BTC',
intervals: [window.dashboard?.currentInterval || '1d'],
start_date: startDate
})
});
const result = await response.json();
if (result.message) {
// Show that simulation is running
runBtn.textContent = 'Running...';
// Poll for results
setTimeout(() => {
pollForBacktestResults(strategyId, startDate);
}, 2000); // Wait 2 seconds then poll
} else {
alert('Failed to start simulation');
}
} catch (error) {
console.error('Error running simulation:', error);
alert('Error running simulation: ' + error.message);
// Reset button only on error
runBtn.disabled = false;
runBtn.textContent = 'Run Simulation';
}
// Button stays as "Running..." until polling completes or times out
}
// Poll for backtest results
async function pollForBacktestResults(strategyId, startDate, attempts = 0) {
const runBtn = document.getElementById('runSimBtn');
if (attempts > 30) { // Stop after 30 attempts (60 seconds)
console.log('Backtest polling timeout');
runBtn.textContent = 'Run Simulation';
runBtn.disabled = false;
// Show timeout message in results area
const simResults = document.getElementById('simResults');
if (simResults) {
simResults.innerHTML = `
<div class="sim-stat-row" style="color: var(--tv-text-secondary); font-size: 11px; text-align: center;">
<span>Simulation timeout - no results found after 60s.<br>Check server logs or try again.</span>
</div>
`;
simResults.style.display = 'block';
}
return;
}
try {
const response = await fetch('/api/v1/backtests?limit=5');
const backtests = await response.json();
// Find the most recent backtest that matches our criteria
const recentBacktest = backtests.find(bt =>
bt.strategy && bt.strategy.includes(strategyId) ||
bt.created_at > new Date(Date.now() - 60000).toISOString() // Created in last minute
);
if (recentBacktest && recentBacktest.results) {
// Parse JSON string if needed (database stores results as text)
const parsedBacktest = {
...recentBacktest,
results: typeof recentBacktest.results === 'string'
? JSON.parse(recentBacktest.results)
: recentBacktest.results
};
// Results found! Display them
displayBacktestResults(parsedBacktest);
runBtn.textContent = 'Run Simulation';
runBtn.disabled = false;
return;
}
// No results yet, poll again in 2 seconds
setTimeout(() => {
pollForBacktestResults(strategyId, startDate, attempts + 1);
}, 2000);
} catch (error) {
console.error('Error polling for backtest results:', error);
runBtn.textContent = 'Run Simulation';
runBtn.disabled = false;
}
}
// Display backtest results in the UI
function displayBacktestResults(backtest) {
// Parse JSON string if needed (database stores results as text)
const results = typeof backtest.results === 'string'
? JSON.parse(backtest.results)
: backtest.results;
// Update the results display
document.getElementById('simTrades').textContent = results.total_trades || '--';
document.getElementById('simWinRate').textContent = results.win_rate ? results.win_rate.toFixed(1) + '%' : '--';
const pnlElement = document.getElementById('simPnL');
const pnl = results.total_pnl || 0;
pnlElement.textContent = (pnl >= 0 ? '+' : '') + '$' + pnl.toFixed(2);
pnlElement.className = 'sim-value ' + (pnl >= 0 ? 'positive' : 'negative');
// Show results section
document.getElementById('simResults').style.display = 'block';
console.log('Backtest results:', backtest);
}
// Set default start date (7 days ago)
function setDefaultStartDate() {
const startDateInput = document.getElementById('simStartDate');
if (startDateInput) {
const sevenDaysAgo = new Date();
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
// Format as datetime-local: YYYY-MM-DDTHH:mm
startDateInput.value = sevenDaysAgo.toISOString().slice(0, 16);
}
}
document.addEventListener('DOMContentLoaded', () => { document.addEventListener('DOMContentLoaded', () => {
window.dashboard = new TradingDashboard(); window.dashboard = new TradingDashboard();
}); });

View File

@ -6,17 +6,28 @@ Removes the complex WebSocket manager that was causing issues
import os import os
import asyncio import asyncio
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from typing import Optional from typing import Optional
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query from fastapi import FastAPI, HTTPException, Query, BackgroundTasks
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
import asyncpg import asyncpg
import csv import csv
import io import io
from pydantic import BaseModel, Field
# Imports for backtest runner
from src.data_collector.database import DatabaseManager
from src.data_collector.indicator_engine import IndicatorEngine, IndicatorConfig
from src.data_collector.brain import Brain
from src.data_collector.backtester import Backtester
# Imports for strategy discovery
import importlib
from src.strategies.base import BaseStrategy
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@ -88,6 +99,41 @@ async def root():
} }
@app.get("/api/v1/strategies")
async def list_strategies():
"""List all available trading strategies with metadata"""
# Strategy registry from brain.py
strategy_registry = {
"ma44_strategy": "src.strategies.ma44_strategy.MA44Strategy",
"ma125_strategy": "src.strategies.ma125_strategy.MA125Strategy",
}
strategies = []
for strategy_id, class_path in strategy_registry.items():
try:
module_path, class_name = class_path.rsplit('.', 1)
module = importlib.import_module(module_path)
strategy_class = getattr(module, class_name)
# Instantiate to get metadata
strategy_instance = strategy_class()
strategies.append({
"id": strategy_id,
"name": strategy_instance.display_name,
"description": strategy_instance.description,
"required_indicators": strategy_instance.required_indicators
})
except Exception as e:
logger.error(f"Failed to load strategy {strategy_id}: {e}")
return {
"strategies": strategies,
"count": len(strategies)
}
@app.get("/api/v1/candles") @app.get("/api/v1/candles")
async def get_candles( async def get_candles(
symbol: str = Query("BTC", description="Trading pair symbol"), symbol: str = Query("BTC", description="Trading pair symbol"),
@ -215,6 +261,155 @@ async def health_check():
raise HTTPException(status_code=503, detail=f"Health check failed: {str(e)}") raise HTTPException(status_code=503, detail=f"Health check failed: {str(e)}")
@app.get("/api/v1/indicators")
async def get_indicators(
symbol: str = Query("BTC", description="Trading pair symbol"),
interval: str = Query("1d", description="Candle interval"),
name: str = Query(None, description="Filter by indicator name (e.g., ma44)"),
start: Optional[datetime] = Query(None, description="Start time"),
end: Optional[datetime] = Query(None, description="End time"),
limit: int = Query(1000, le=5000)
):
"""Get indicator values"""
async with pool.acquire() as conn:
query = """
SELECT time, indicator_name, value
FROM indicators
WHERE symbol = $1 AND interval = $2
"""
params = [symbol, interval]
if name:
query += f" AND indicator_name = ${len(params) + 1}"
params.append(name)
if start:
query += f" AND time >= ${len(params) + 1}"
params.append(start)
if end:
query += f" AND time <= ${len(params) + 1}"
params.append(end)
query += f" ORDER BY time DESC LIMIT ${len(params) + 1}"
params.append(limit)
rows = await conn.fetch(query, *params)
# Group by time for easier charting
grouped = {}
for row in rows:
ts = row['time'].isoformat()
if ts not in grouped:
grouped[ts] = {'time': ts}
grouped[ts][row['indicator_name']] = float(row['value'])
return {
"symbol": symbol,
"interval": interval,
"data": list(grouped.values())
}
@app.get("/api/v1/decisions")
async def get_decisions(
symbol: str = Query("BTC"),
interval: Optional[str] = Query(None),
backtest_id: Optional[str] = Query(None),
limit: int = Query(100, le=1000)
):
"""Get brain decisions"""
async with pool.acquire() as conn:
query = """
SELECT time, interval, decision_type, strategy, confidence,
price_at_decision, indicator_snapshot, reasoning, backtest_id
FROM decisions
WHERE symbol = $1
"""
params = [symbol]
if interval:
query += f" AND interval = ${len(params) + 1}"
params.append(interval)
if backtest_id:
query += f" AND backtest_id = ${len(params) + 1}"
params.append(backtest_id)
else:
query += " AND backtest_id IS NULL"
query += f" ORDER BY time DESC LIMIT ${len(params) + 1}"
params.append(limit)
rows = await conn.fetch(query, *params)
return [dict(row) for row in rows]
@app.get("/api/v1/backtests")
async def list_backtests(symbol: Optional[str] = None, limit: int = 20):
"""List historical backtests"""
async with pool.acquire() as conn:
query = """
SELECT id, strategy, symbol, start_time, end_time,
intervals, results, created_at
FROM backtest_runs
"""
params = []
if symbol:
query += " WHERE symbol = $1"
params.append(symbol)
query += f" ORDER BY created_at DESC LIMIT ${len(params) + 1}"
params.append(limit)
rows = await conn.fetch(query, *params)
return [dict(row) for row in rows]
class BacktestRequest(BaseModel):
symbol: str = "BTC"
intervals: list[str] = ["37m"]
start_date: str = "2025-01-01" # ISO date
end_date: Optional[str] = None
async def run_backtest_task(req: BacktestRequest):
"""Background task to run backtest"""
db = DatabaseManager(
host=DB_HOST, port=DB_PORT, database=DB_NAME,
user=DB_USER, password=DB_PASSWORD
)
await db.connect()
try:
# Load configs (hardcoded for now to match main.py)
configs = [
IndicatorConfig("ma44", "sma", 44, req.intervals),
IndicatorConfig("ma125", "sma", 125, req.intervals)
]
engine = IndicatorEngine(db, configs)
brain = Brain(db, engine)
backtester = Backtester(db, engine, brain)
start = datetime.fromisoformat(req.start_date).replace(tzinfo=timezone.utc)
end = datetime.fromisoformat(req.end_date).replace(tzinfo=timezone.utc) if req.end_date else datetime.now(timezone.utc)
await backtester.run(req.symbol, req.intervals, start, end)
except Exception as e:
logger.error(f"Backtest failed: {e}")
finally:
await db.disconnect()
@app.post("/api/v1/backtests")
async def trigger_backtest(req: BacktestRequest, background_tasks: BackgroundTasks):
"""Start a backtest in the background"""
background_tasks.add_task(run_backtest_task, req)
return {"message": "Backtest started", "params": req.dict()}
@app.get("/api/v1/ta") @app.get("/api/v1/ta")
async def get_technical_analysis( async def get_technical_analysis(
symbol: str = Query("BTC", description="Trading pair symbol"), symbol: str = Query("BTC", description="Trading pair symbol"),
@ -222,42 +417,44 @@ async def get_technical_analysis(
): ):
""" """
Get technical analysis for a symbol Get technical analysis for a symbol
Calculates MA 44, MA 125, trend, support/resistance Uses stored indicators from DB if available, falls back to on-the-fly calc
""" """
try: try:
async with pool.acquire() as conn: async with pool.acquire() as conn:
# Get enough candles for MA 125 calculation # 1. Get latest price
rows = await conn.fetch(""" latest = await conn.fetchrow("""
SELECT time, open, high, low, close, volume SELECT close, time
FROM candles FROM candles
WHERE symbol = $1 AND interval = $2 WHERE symbol = $1 AND interval = $2
ORDER BY time DESC ORDER BY time DESC
LIMIT 200 LIMIT 1
""", symbol, interval) """, symbol, interval)
if len(rows) < 50: if not latest:
return { return {"error": "No candle data found"}
"symbol": symbol,
"interval": interval, current_price = float(latest['close'])
"error": "Not enough data for technical analysis", timestamp = latest['time']
"min_required": 50,
"available": len(rows)
}
# Reverse to chronological order # 2. Get latest indicators from DB
candles = list(reversed(rows)) indicators = await conn.fetch("""
closes = [float(c['close']) for c in candles] SELECT indicator_name, value
FROM indicators
WHERE symbol = $1 AND interval = $2
AND time <= $3
ORDER BY time DESC
""", symbol, interval, timestamp)
# Calculate Moving Averages # Convert list to dict, e.g. {'ma44': 65000, 'ma125': 64000}
def calculate_ma(data, period): # We take the most recent value for each indicator
if len(data) < period: ind_map = {}
return None for row in indicators:
return sum(data[-period:]) / period name = row['indicator_name']
if name not in ind_map:
ind_map[name] = float(row['value'])
ma_44 = calculate_ma(closes, 44) ma_44 = ind_map.get('ma44')
ma_125 = calculate_ma(closes, 125) ma_125 = ind_map.get('ma125')
current_price = closes[-1]
# Determine trend # Determine trend
if ma_44 and ma_125: if ma_44 and ma_125:
@ -274,24 +471,35 @@ async def get_technical_analysis(
trend = "Unknown" trend = "Unknown"
trend_strength = "Insufficient data" trend_strength = "Insufficient data"
# Find support and resistance (recent swing points) # 3. Find support/resistance (simple recent high/low)
highs = [float(c['high']) for c in candles[-20:]] rows = await conn.fetch("""
lows = [float(c['low']) for c in candles[-20:]] SELECT high, low
FROM candles
WHERE symbol = $1 AND interval = $2
ORDER BY time DESC
LIMIT 20
""", symbol, interval)
resistance = max(highs) if rows:
support = min(lows) highs = [float(r['high']) for r in rows]
lows = [float(r['low']) for r in rows]
# Calculate price position resistance = max(highs)
price_range = resistance - support support = min(lows)
if price_range > 0:
position = (current_price - support) / price_range * 100 price_range = resistance - support
if price_range > 0:
position = (current_price - support) / price_range * 100
else:
position = 50
else: else:
resistance = current_price
support = current_price
position = 50 position = 50
return { return {
"symbol": symbol, "symbol": symbol,
"interval": interval, "interval": interval,
"timestamp": datetime.utcnow().isoformat(), "timestamp": timestamp.isoformat(),
"current_price": round(current_price, 2), "current_price": round(current_price, 2),
"moving_averages": { "moving_averages": {
"ma_44": round(ma_44, 2) if ma_44 else None, "ma_44": round(ma_44, 2) if ma_44 else None,

View File

@ -4,6 +4,9 @@ from .candle_buffer import CandleBuffer
from .database import DatabaseManager from .database import DatabaseManager
from .backfill import HyperliquidBackfill from .backfill import HyperliquidBackfill
from .custom_timeframe_generator import CustomTimeframeGenerator from .custom_timeframe_generator import CustomTimeframeGenerator
from .indicator_engine import IndicatorEngine, IndicatorConfig
from .brain import Brain, Decision
from .backtester import Backtester
__all__ = [ __all__ = [
'HyperliquidWebSocket', 'HyperliquidWebSocket',
@ -11,5 +14,10 @@ __all__ = [
'CandleBuffer', 'CandleBuffer',
'DatabaseManager', 'DatabaseManager',
'HyperliquidBackfill', 'HyperliquidBackfill',
'CustomTimeframeGenerator' 'CustomTimeframeGenerator',
] 'IndicatorEngine',
'IndicatorConfig',
'Brain',
'Decision',
'Backtester'
]

View File

@ -0,0 +1,391 @@
"""
Backtester - Historical replay driver for IndicatorEngine + Brain
Iterates over stored candle data to simulate live trading decisions
"""
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from uuid import uuid4
from .database import DatabaseManager
from .indicator_engine import IndicatorEngine, IndicatorConfig
from .brain import Brain, Decision
from .simulator import Account
from src.strategies.base import SignalType
logger = logging.getLogger(__name__)
class Backtester:
"""
Replays historical candle data through IndicatorEngine and Brain.
Uses Simulator (Account) to track PnL, leverage, and fees.
"""
def __init__(
self,
db: DatabaseManager,
indicator_engine: IndicatorEngine,
brain: Brain,
):
self.db = db
self.indicator_engine = indicator_engine
self.brain = brain
self.account = Account(initial_balance=1000.0)
async def run(
self,
symbol: str,
intervals: List[str],
start: datetime,
end: datetime,
config: Optional[Dict[str, Any]] = None,
) -> str:
"""
Run a full backtest over the given time range.
"""
backtest_id = str(uuid4())
logger.info(
f"Starting backtest {backtest_id}: {symbol} "
f"{intervals} from {start} to {end}"
)
# Reset brain state
self.brain.reset_state()
# Reset account for this run
self.account = Account(initial_balance=1000.0)
# Store the run metadata
await self._save_run_start(
backtest_id, symbol, intervals, start, end, config
)
total_decisions = 0
for interval in intervals:
# Only process intervals that have indicators configured
configured = self.indicator_engine.get_configured_intervals()
if interval not in configured:
logger.warning(
f"Skipping interval {interval}: no indicators configured"
)
continue
# Get all candle timestamps in range
timestamps = await self._get_candle_timestamps(
symbol, interval, start, end
)
if not timestamps:
logger.warning(
f"No candles found for {symbol}/{interval} in range"
)
continue
logger.info(
f"Backtest {backtest_id}: processing {len(timestamps)} "
f"{interval} candles..."
)
for i, ts in enumerate(timestamps):
# 1. Compute indicators
raw_indicators = await self.indicator_engine.compute_at(
symbol, interval, ts
)
indicators = {k: v for k, v in raw_indicators.items() if v is not None}
# 2. Get Current Position info for Strategy
current_pos = self.account.get_position_dict()
# 3. Brain Evaluate
decision: Decision = await self.brain.evaluate(
symbol=symbol,
interval=interval,
timestamp=ts,
indicators=indicators,
backtest_id=backtest_id,
current_position=current_pos
)
# 4. Execute Decision in Simulator
self._execute_decision(decision)
total_decisions += 1
if (i + 1) % 200 == 0:
logger.info(
f"Backtest {backtest_id}: {i + 1}/{len(timestamps)} "
f"{interval} candles processed. Eq: {self.account.equity:.2f}"
)
await asyncio.sleep(0.01)
# Compute and store summary results from Simulator
results = self.account.get_stats()
results['total_evaluations'] = total_decisions
await self._save_run_results(backtest_id, results)
logger.info(
f"Backtest {backtest_id} complete. Final Balance: {results['final_balance']:.2f}"
)
return backtest_id
def _execute_decision(self, decision: Decision):
"""Translate Brain decision into Account action"""
price = decision.price_at_decision
time = decision.time
# Open Long
if decision.decision_type == SignalType.OPEN_LONG.value:
self.account.open_position(time, 'long', price, leverage=1.0) # Todo: Configurable leverage
# Open Short
elif decision.decision_type == SignalType.OPEN_SHORT.value:
self.account.open_position(time, 'short', price, leverage=1.0)
# Close Long (only if we are long)
elif decision.decision_type == SignalType.CLOSE_LONG.value:
if self.account.current_position and self.account.current_position.side == 'long':
self.account.close_position(time, price)
# Close Short (only if we are short)
elif decision.decision_type == SignalType.CLOSE_SHORT.value:
if self.account.current_position and self.account.current_position.side == 'short':
self.account.close_position(time, price)
# Update equity mark-to-market
self.account.update_equity(price)
async def _get_candle_timestamps(
self,
symbol: str,
interval: str,
start: datetime,
end: datetime,
) -> List[datetime]:
"""Get all candle timestamps in a range, ordered chronologically"""
async with self.db.acquire() as conn:
rows = await conn.fetch("""
SELECT time FROM candles
WHERE symbol = $1 AND interval = $2
AND time >= $3 AND time <= $4
ORDER BY time ASC
""", symbol, interval, start, end)
return [row["time"] for row in rows]
async def _save_run_start(
self,
backtest_id: str,
symbol: str,
intervals: List[str],
start: datetime,
end: datetime,
config: Optional[Dict[str, Any]],
) -> None:
"""Store backtest run metadata at start"""
async with self.db.acquire() as conn:
await conn.execute("""
INSERT INTO backtest_runs (
id, strategy, symbol, start_time, end_time,
intervals, config
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
backtest_id,
self.brain.strategy_name,
symbol,
start,
end,
intervals,
json.dumps(config) if config else None,
)
async def _compute_results(self, backtest_id, symbol):
"""Deprecated: Logic moved to Account class"""
return {}
async def _save_run_results(
self,
backtest_id: str,
results: Dict[str, Any],
) -> None:
"""Update backtest run with final results"""
# Remove trades list from stored results (can be large)
stored_results = {k: v for k, v in results.items() if k != "trades"}
async with self.db.acquire() as conn:
await conn.execute("""
UPDATE backtest_runs
SET results = $1
WHERE id = $2
""", json.dumps(stored_results), backtest_id)
async def get_run(self, backtest_id: str) -> Optional[Dict[str, Any]]:
"""Get a specific backtest run with results"""
async with self.db.acquire() as conn:
row = await conn.fetchrow("""
SELECT id, strategy, symbol, start_time, end_time,
intervals, config, results, created_at
FROM backtest_runs
WHERE id = $1
""", backtest_id)
return dict(row) if row else None
async def list_runs(
self,
symbol: Optional[str] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""List recent backtest runs"""
async with self.db.acquire() as conn:
if symbol:
rows = await conn.fetch("""
SELECT id, strategy, symbol, start_time, end_time,
intervals, results, created_at
FROM backtest_runs
WHERE symbol = $1
ORDER BY created_at DESC
LIMIT $2
""", symbol, limit)
else:
rows = await conn.fetch("""
SELECT id, strategy, symbol, start_time, end_time,
intervals, results, created_at
FROM backtest_runs
ORDER BY created_at DESC
LIMIT $1
""", limit)
return [dict(row) for row in rows]
async def cleanup_run(self, backtest_id: str) -> int:
"""Delete all decisions and metadata for a backtest run"""
async with self.db.acquire() as conn:
result = await conn.execute("""
DELETE FROM decisions WHERE backtest_id = $1
""", backtest_id)
await conn.execute("""
DELETE FROM backtest_runs WHERE id = $1
""", backtest_id)
deleted = int(result.split()[-1]) if result else 0
logger.info(
f"Cleaned up backtest {backtest_id}: "
f"{deleted} decisions deleted"
)
return deleted
async def main():
"""CLI entry point for running backtests"""
import argparse
import os
parser = argparse.ArgumentParser(
description="Run backtest on historical data"
)
parser.add_argument(
"--symbol", default="BTC", help="Symbol (default: BTC)"
)
parser.add_argument(
"--intervals", nargs="+", default=["37m"],
help="Intervals to backtest (default: 37m)"
)
parser.add_argument(
"--start", required=True,
help="Start date (ISO format, e.g., 2025-01-01)"
)
parser.add_argument(
"--end", default=None,
help="End date (ISO format, default: now)"
)
parser.add_argument(
"--db-host", default=os.getenv("DB_HOST", "localhost"),
)
parser.add_argument(
"--db-port", type=int, default=int(os.getenv("DB_PORT", 5432)),
)
parser.add_argument(
"--db-name", default=os.getenv("DB_NAME", "btc_data"),
)
parser.add_argument(
"--db-user", default=os.getenv("DB_USER", "btc_bot"),
)
parser.add_argument(
"--db-password", default=os.getenv("DB_PASSWORD", ""),
)
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Parse dates
start = datetime.fromisoformat(args.start).replace(tzinfo=timezone.utc)
end = (
datetime.fromisoformat(args.end).replace(tzinfo=timezone.utc)
if args.end
else datetime.now(timezone.utc)
)
# Initialize components
db = DatabaseManager(
host=args.db_host,
port=args.db_port,
database=args.db_name,
user=args.db_user,
password=args.db_password,
)
await db.connect()
try:
# Default indicator configs (MA44 + MA125 on selected intervals)
configs = [
IndicatorConfig("ma44", "sma", 44, args.intervals),
IndicatorConfig("ma125", "sma", 125, args.intervals),
]
indicator_engine = IndicatorEngine(db, configs)
brain = Brain(db, indicator_engine)
backtester = Backtester(db, indicator_engine, brain)
# Run the backtest
backtest_id = await backtester.run(
symbol=args.symbol,
intervals=args.intervals,
start=start,
end=end,
)
# Print results
run = await backtester.get_run(backtest_id)
if run and run.get("results"):
results = json.loads(run["results"]) if isinstance(run["results"], str) else run["results"]
print("\n=== Backtest Results ===")
print(f"ID: {backtest_id}")
print(f"Strategy: {run['strategy']}")
print(f"Period: {run['start_time']} to {run['end_time']}")
print(f"Intervals: {run['intervals']}")
print(f"Total evaluations: {results.get('total_evaluations', 0)}")
print(f"Total trades: {results.get('total_trades', 0)}")
print(f"Win rate: {results.get('win_rate', 0)}%")
print(f"Total P&L: {results.get('total_pnl_pct', 0)}%")
print(f"Final Balance: {results.get('final_balance', 0)}")
finally:
await db.disconnect()
if __name__ == "__main__":
asyncio.run(main())

223
src/data_collector/brain.py Normal file
View File

@ -0,0 +1,223 @@
"""
Brain - Strategy evaluation and decision logging
Pure strategy logic separated from DB I/O for testability
"""
import json
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Dict, Optional, Any, List
import importlib
from .database import DatabaseManager
from .indicator_engine import IndicatorEngine
from src.strategies.base import BaseStrategy, StrategySignal, SignalType
logger = logging.getLogger(__name__)
# Registry of available strategies
STRATEGY_REGISTRY = {
"ma44_strategy": "src.strategies.ma44_strategy.MA44Strategy",
"ma125_strategy": "src.strategies.ma125_strategy.MA125Strategy",
}
def load_strategy(strategy_name: str) -> BaseStrategy:
"""Dynamically load a strategy class"""
if strategy_name not in STRATEGY_REGISTRY:
# Default fallback or error
logger.warning(f"Strategy {strategy_name} not found, defaulting to MA44")
strategy_name = "ma44_strategy"
module_path, class_name = STRATEGY_REGISTRY[strategy_name].rsplit('.', 1)
module = importlib.import_module(module_path)
cls = getattr(module, class_name)
return cls()
@dataclass
class Decision:
"""A single brain evaluation result"""
time: datetime
symbol: str
interval: str
decision_type: str # "buy", "sell", "hold" -> Now maps to SignalType
strategy: str
confidence: float
price_at_decision: float
indicator_snapshot: Dict[str, Any]
candle_snapshot: Dict[str, Any]
reasoning: str
backtest_id: Optional[str] = None
def to_db_tuple(self) -> tuple:
"""Convert to positional tuple for DB insert"""
return (
self.time,
self.symbol,
self.interval,
self.decision_type,
self.strategy,
self.confidence,
self.price_at_decision,
json.dumps(self.indicator_snapshot),
json.dumps(self.candle_snapshot),
self.reasoning,
self.backtest_id,
)
class Brain:
"""
Evaluates market conditions using a loaded Strategy.
"""
def __init__(
self,
db: DatabaseManager,
indicator_engine: IndicatorEngine,
strategy: str = "ma44_strategy",
):
self.db = db
self.indicator_engine = indicator_engine
self.strategy_name = strategy
self.active_strategy: BaseStrategy = load_strategy(strategy)
logger.info(f"Brain initialized with strategy: {self.active_strategy.name}")
async def evaluate(
self,
symbol: str,
interval: str,
timestamp: datetime,
indicators: Optional[Dict[str, float]] = None,
backtest_id: Optional[str] = None,
current_position: Optional[Dict[str, Any]] = None,
) -> Decision:
"""
Evaluate market conditions and produce a decision.
"""
# Get indicator values
if indicators is None:
indicators = await self.indicator_engine.get_values_at(
symbol, interval, timestamp
)
# Get the triggering candle
candle = await self._get_candle(symbol, interval, timestamp)
if not candle:
return self._create_empty_decision(timestamp, symbol, interval, indicators, backtest_id)
price = float(candle["close"])
candle_dict = {
"time": candle["time"].isoformat(),
"open": float(candle["open"]),
"high": float(candle["high"]),
"low": float(candle["low"]),
"close": price,
"volume": float(candle["volume"]),
}
# Delegate to Strategy
signal: StrategySignal = self.active_strategy.analyze(
candle_dict, indicators, current_position
)
# Build decision
decision = Decision(
time=timestamp,
symbol=symbol,
interval=interval,
decision_type=signal.type.value,
strategy=self.strategy_name,
confidence=signal.confidence,
price_at_decision=price,
indicator_snapshot=indicators,
candle_snapshot=candle_dict,
reasoning=signal.reasoning,
backtest_id=backtest_id,
)
# Store to DB
await self._store_decision(decision)
return decision
def _create_empty_decision(self, timestamp, symbol, interval, indicators, backtest_id):
return Decision(
time=timestamp,
symbol=symbol,
interval=interval,
decision_type="hold",
strategy=self.strategy_name,
confidence=0.0,
price_at_decision=0.0,
indicator_snapshot=indicators or {},
candle_snapshot={},
reasoning="No candle data available",
backtest_id=backtest_id,
)
async def _get_candle(
self,
symbol: str,
interval: str,
timestamp: datetime,
) -> Optional[Dict[str, Any]]:
"""Fetch a specific candle from the database"""
async with self.db.acquire() as conn:
row = await conn.fetchrow("""
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2 AND time = $3
""", symbol, interval, timestamp)
return dict(row) if row else None
async def _store_decision(self, decision: Decision) -> None:
"""Write decision to the decisions table"""
# Note: We might want to skip writing every single HOLD to DB to save space if simulating millions of candles
# But keeping it for now for full traceability
async with self.db.acquire() as conn:
await conn.execute("""
INSERT INTO decisions (
time, symbol, interval, decision_type, strategy,
confidence, price_at_decision, indicator_snapshot,
candle_snapshot, reasoning, backtest_id
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
""", *decision.to_db_tuple())
async def get_recent_decisions(
self,
symbol: str,
limit: int = 20,
backtest_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get recent decisions, optionally filtered by backtest_id"""
async with self.db.acquire() as conn:
if backtest_id is not None:
rows = await conn.fetch("""
SELECT time, symbol, interval, decision_type, strategy,
confidence, price_at_decision, indicator_snapshot,
candle_snapshot, reasoning, backtest_id
FROM decisions
WHERE symbol = $1 AND backtest_id = $2
ORDER BY time DESC
LIMIT $3
""", symbol, backtest_id, limit)
else:
rows = await conn.fetch("""
SELECT time, symbol, interval, decision_type, strategy,
confidence, price_at_decision, indicator_snapshot,
candle_snapshot, reasoning, backtest_id
FROM decisions
WHERE symbol = $1 AND backtest_id IS NULL
ORDER BY time DESC
LIMIT $2
""", symbol, limit)
return [dict(row) for row in rows]
def reset_state(self) -> None:
"""Reset internal state tracking"""
pass

View File

@ -0,0 +1,285 @@
"""
Indicator Engine - Computes and stores technical indicators
Stateless DB-backed design: same code for live updates and backtesting
"""
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from .database import DatabaseManager
logger = logging.getLogger(__name__)
@dataclass
class IndicatorConfig:
"""Configuration for a single indicator"""
name: str # e.g., "ma44"
type: str # e.g., "sma"
period: int # e.g., 44
intervals: List[str] # e.g., ["37m", "148m", "1d"]
@classmethod
def from_dict(cls, name: str, data: Dict[str, Any]) -> "IndicatorConfig":
"""Create config from YAML dict entry"""
return cls(
name=name,
type=data["type"],
period=data["period"],
intervals=data["intervals"],
)
@dataclass
class IndicatorResult:
"""Result of a single indicator computation"""
name: str
value: Optional[float]
period: int
timestamp: datetime
class IndicatorEngine:
"""
Computes technical indicators from candle data in the database.
Two modes, same math:
- on_interval_update(): called by live system after higher-TF candle update
- compute_at(): called by backtester for a specific point in time
Both query the DB for the required candle history and store results.
"""
def __init__(self, db: DatabaseManager, configs: List[IndicatorConfig]):
self.db = db
self.configs = configs
# Build lookup: interval -> list of configs that need computation
self._interval_configs: Dict[str, List[IndicatorConfig]] = {}
for cfg in configs:
for interval in cfg.intervals:
if interval not in self._interval_configs:
self._interval_configs[interval] = []
self._interval_configs[interval].append(cfg)
logger.info(
f"IndicatorEngine initialized with {len(configs)} indicators "
f"across intervals: {list(self._interval_configs.keys())}"
)
def get_configured_intervals(self) -> List[str]:
"""Return all intervals that have indicators configured"""
return list(self._interval_configs.keys())
async def on_interval_update(
self,
symbol: str,
interval: str,
timestamp: datetime,
) -> Dict[str, Optional[float]]:
"""
Compute all indicators configured for this interval.
Called by main.py after CustomTimeframeGenerator updates a higher TF.
Returns dict of indicator_name -> value (for use by Brain).
"""
configs = self._interval_configs.get(interval, [])
if not configs:
return {}
return await self._compute_and_store(symbol, interval, timestamp, configs)
async def compute_at(
self,
symbol: str,
interval: str,
timestamp: datetime,
) -> Dict[str, Optional[float]]:
"""
Compute indicators at a specific point in time.
Alias for on_interval_update -- used by backtester for clarity.
"""
return await self.on_interval_update(symbol, interval, timestamp)
async def compute_historical(
self,
symbol: str,
interval: str,
start: datetime,
end: datetime,
) -> int:
"""
Batch-compute indicators for a time range.
Iterates over every candle timestamp in [start, end] and computes.
Returns total number of indicator values stored.
"""
configs = self._interval_configs.get(interval, [])
if not configs:
logger.warning(f"No indicators configured for interval {interval}")
return 0
# Get all candle timestamps in range
async with self.db.acquire() as conn:
rows = await conn.fetch("""
SELECT time FROM candles
WHERE symbol = $1 AND interval = $2
AND time >= $3 AND time <= $4
ORDER BY time ASC
""", symbol, interval, start, end)
if not rows:
logger.warning(f"No candles found for {symbol}/{interval} in range")
return 0
timestamps = [row["time"] for row in rows]
total_stored = 0
logger.info(
f"Computing {len(configs)} indicators across "
f"{len(timestamps)} {interval} candles..."
)
for i, ts in enumerate(timestamps):
results = await self._compute_and_store(symbol, interval, ts, configs)
total_stored += sum(1 for v in results.values() if v is not None)
if (i + 1) % 100 == 0:
logger.info(f"Progress: {i + 1}/{len(timestamps)} candles processed")
await asyncio.sleep(0.01) # Yield to event loop
logger.info(
f"Historical compute complete: {total_stored} indicator values "
f"stored for {interval}"
)
return total_stored
async def _compute_and_store(
self,
symbol: str,
interval: str,
timestamp: datetime,
configs: List[IndicatorConfig],
) -> Dict[str, Optional[float]]:
"""Core computation: fetch candles, compute indicators, store results"""
# Determine max lookback needed
max_period = max(cfg.period for cfg in configs)
# Fetch enough candles for the longest indicator
async with self.db.acquire() as conn:
rows = await conn.fetch("""
SELECT time, open, high, low, close, volume
FROM candles
WHERE symbol = $1 AND interval = $2
AND time <= $3
ORDER BY time DESC
LIMIT $4
""", symbol, interval, timestamp, max_period)
if not rows:
return {cfg.name: None for cfg in configs}
# Reverse to chronological order
candles = list(reversed(rows))
closes = [float(c["close"]) for c in candles]
# Compute each indicator
results: Dict[str, Optional[float]] = {}
values_to_store: List[tuple] = []
for cfg in configs:
value = self._compute_indicator(cfg, closes)
results[cfg.name] = value
if value is not None:
values_to_store.append((
timestamp,
symbol,
interval,
cfg.name,
value,
json.dumps({"type": cfg.type, "period": cfg.period}),
))
# Batch upsert all computed values
if values_to_store:
async with self.db.acquire() as conn:
await conn.executemany("""
INSERT INTO indicators (time, symbol, interval, indicator_name, value, parameters)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (time, symbol, interval, indicator_name)
DO UPDATE SET
value = EXCLUDED.value,
parameters = EXCLUDED.parameters,
computed_at = NOW()
""", values_to_store)
logger.debug(
f"Stored {len(values_to_store)} indicator values for "
f"{symbol}/{interval} at {timestamp}"
)
return results
def _compute_indicator(
self,
config: IndicatorConfig,
closes: List[float],
) -> Optional[float]:
"""Dispatch to the correct computation function"""
if config.type == "sma":
return self.compute_sma(closes, config.period)
else:
logger.warning(f"Unknown indicator type: {config.type}")
return None
# ── Pure math functions (no DB, no async, easily testable) ──────────
@staticmethod
def compute_sma(closes: List[float], period: int) -> Optional[float]:
"""Simple Moving Average over the last `period` closes"""
if len(closes) < period:
return None
return sum(closes[-period:]) / period
async def get_latest_values(
self,
symbol: str,
interval: str,
) -> Dict[str, float]:
"""
Get the most recent indicator values for a symbol/interval.
Used by Brain to read current state.
"""
async with self.db.acquire() as conn:
rows = await conn.fetch("""
SELECT DISTINCT ON (indicator_name)
indicator_name, value, time
FROM indicators
WHERE symbol = $1 AND interval = $2
ORDER BY indicator_name, time DESC
""", symbol, interval)
return {row["indicator_name"]: float(row["value"]) for row in rows}
async def get_values_at(
self,
symbol: str,
interval: str,
timestamp: datetime,
) -> Dict[str, float]:
"""
Get indicator values at a specific timestamp.
Used by Brain during backtesting.
"""
async with self.db.acquire() as conn:
rows = await conn.fetch("""
SELECT indicator_name, value
FROM indicators
WHERE symbol = $1 AND interval = $2 AND time = $3
""", symbol, interval, timestamp)
return {row["indicator_name"]: float(row["value"]) for row in rows}

View File

@ -1,6 +1,6 @@
""" """
Main entry point for data collector service Main entry point for data collector service
Integrates WebSocket client, buffer, and database Integrates WebSocket client, buffer, database, indicators, and brain
""" """
import asyncio import asyncio
@ -8,13 +8,17 @@ import logging
import signal import signal
import sys import sys
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional, List
import os import os
import yaml
from .websocket_client import HyperliquidWebSocket, Candle from .websocket_client import HyperliquidWebSocket, Candle
from .candle_buffer import CandleBuffer from .candle_buffer import CandleBuffer
from .database import DatabaseManager from .database import DatabaseManager
from .custom_timeframe_generator import CustomTimeframeGenerator from .custom_timeframe_generator import CustomTimeframeGenerator
from .indicator_engine import IndicatorEngine, IndicatorConfig
from .brain import Brain
# Configure logging # Configure logging
@ -68,6 +72,17 @@ class DataCollector:
self.custom_tf_generator = CustomTimeframeGenerator(self.db) self.custom_tf_generator = CustomTimeframeGenerator(self.db)
await self.custom_tf_generator.initialize() await self.custom_tf_generator.initialize()
# Initialize indicator engine
# Hardcoded config for now, eventually load from yaml
indicator_configs = [
IndicatorConfig("ma44", "sma", 44, ["37m", "148m", "1d"]),
IndicatorConfig("ma125", "sma", 125, ["37m", "148m", "1d"])
]
self.indicator_engine = IndicatorEngine(self.db, indicator_configs)
# Initialize brain
self.brain = Brain(self.db, self.indicator_engine)
# Initialize buffer # Initialize buffer
self.buffer = CandleBuffer( self.buffer = CandleBuffer(
max_size=1000, max_size=1000,
@ -166,12 +181,47 @@ class DataCollector:
raise # Re-raise to trigger buffer retry raise # Re-raise to trigger buffer retry
async def _update_custom_timeframes(self, candles: list) -> None: async def _update_custom_timeframes(self, candles: list) -> None:
"""Update custom timeframes in background (non-blocking)""" """
Update custom timeframes in background, then trigger indicators/brain.
This chain ensures that indicators are computed on fresh candle data,
and the brain evaluates on fresh indicator data.
"""
try: try:
# 1. Update custom candles (37m, 148m, etc.)
await self.custom_tf_generator.update_realtime(candles) await self.custom_tf_generator.update_realtime(candles)
logger.debug("Custom timeframes updated") logger.debug("Custom timeframes updated")
# 2. Trigger indicator updates for configured intervals
# We use the timestamp of the last 1m candle as the trigger point
trigger_time = candles[-1].time
if self.indicator_engine:
intervals = self.indicator_engine.get_configured_intervals()
for interval in intervals:
# Get the correct bucket start time for this interval
# e.g., if trigger_time is 09:48:00, 37m bucket might start at 09:25:00
if self.custom_tf_generator:
bucket_start = self.custom_tf_generator.get_bucket_start(trigger_time, interval)
else:
bucket_start = trigger_time
# Compute indicators for this bucket
raw_indicators = await self.indicator_engine.on_interval_update(
self.symbol, interval, bucket_start
)
# Filter out None values to satisfy type checker
indicators = {k: v for k, v in raw_indicators.items() if v is not None}
# 3. Evaluate brain if we have fresh indicators
if self.brain and indicators:
await self.brain.evaluate(
self.symbol, interval, bucket_start, indicators
)
except Exception as e: except Exception as e:
logger.error(f"Failed to update custom timeframes: {e}") logger.error(f"Failed to update custom timeframes/indicators: {e}")
# Don't raise - this is non-critical # Don't raise - this is non-critical
async def _on_error(self, error: Exception) -> None: async def _on_error(self, error: Exception) -> None:

View File

@ -0,0 +1,160 @@
"""
Simulator
Handles account accounting, leverage, fees, and position management for backtesting.
"""
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from datetime import datetime
from .brain import Decision # We might need to decouple this later, but reusing for now
@dataclass
class Trade:
entry_time: datetime
exit_time: Optional[datetime]
side: str # 'long' or 'short'
entry_price: float
exit_price: Optional[float]
size: float # Quantity of asset
leverage: float
pnl: float = 0.0
pnl_percent: float = 0.0
fees: float = 0.0
status: str = 'open' # 'open', 'closed'
class Account:
def __init__(self, initial_balance: float = 1000.0, maker_fee: float = 0.0002, taker_fee: float = 0.0005):
self.initial_balance = initial_balance
self.balance = initial_balance
self.equity = initial_balance
self.maker_fee = maker_fee
self.taker_fee = taker_fee
self.trades: List[Trade] = []
self.current_position: Optional[Trade] = None
self.margin_used = 0.0
def update_equity(self, current_price: float):
"""Update equity based on unrealized PnL of current position"""
if not self.current_position:
self.equity = self.balance
return
trade = self.current_position
if trade.side == 'long':
unrealized_pnl = (current_price - trade.entry_price) * trade.size
else:
unrealized_pnl = (trade.entry_price - current_price) * trade.size
self.equity = self.balance + unrealized_pnl
def open_position(self, time: datetime, side: str, price: float, leverage: float = 1.0, portion: float = 1.0):
"""
Open a position.
portion: 0.0 to 1.0 (percentage of available balance to use)
"""
if self.current_position:
# Already have a position, ignore for now (or could add to it)
return
# Calculate position size
# Margin = (Balance * portion)
# Position Value = Margin * Leverage
# Size = Position Value / Price
margin_to_use = self.balance * portion
position_value = margin_to_use * leverage
size = position_value / price
# Fee (Taker)
fee = position_value * self.taker_fee
self.balance -= fee # Deduct fee immediately
self.current_position = Trade(
entry_time=time,
exit_time=None,
side=side,
entry_price=price,
exit_price=None,
size=size,
leverage=leverage,
fees=fee
)
self.margin_used = margin_to_use
def close_position(self, time: datetime, price: float):
"""Close the current position"""
if not self.current_position:
return
trade = self.current_position
position_value = trade.size * price
# Calculate PnL
if trade.side == 'long':
pnl = (price - trade.entry_price) * trade.size
pnl_pct = (price - trade.entry_price) / trade.entry_price * trade.leverage * 100
else:
pnl = (trade.entry_price - price) * trade.size
pnl_pct = (trade.entry_price - price) / trade.entry_price * trade.leverage * 100
# Fee (Taker)
fee = position_value * self.taker_fee
self.balance -= fee
trade.fees += fee
# Update Balance
self.balance += pnl
self.margin_used = 0.0
# Update Trade Record
trade.exit_time = time
trade.exit_price = price
trade.pnl = pnl
trade.pnl_percent = pnl_pct
trade.status = 'closed'
self.trades.append(trade)
self.current_position = None
self.equity = self.balance
def get_position_dict(self) -> Optional[Dict[str, Any]]:
if not self.current_position:
return None
return {
'type': self.current_position.side,
'entry_price': self.current_position.entry_price,
'size': self.current_position.size,
'leverage': self.current_position.leverage
}
def get_stats(self) -> Dict[str, Any]:
wins = [t for t in self.trades if t.pnl > 0]
losses = [t for t in self.trades if t.pnl <= 0]
total_pnl = self.balance - self.initial_balance
total_pnl_pct = (total_pnl / self.initial_balance) * 100
return {
"initial_balance": self.initial_balance,
"final_balance": self.balance,
"total_pnl": total_pnl,
"total_pnl_pct": total_pnl_pct,
"total_trades": len(self.trades),
"win_count": len(wins),
"loss_count": len(losses),
"win_rate": (len(wins) / len(self.trades) * 100) if self.trades else 0.0,
"max_drawdown": 0.0, # Todo: implement DD tracking
"trades": [
{
"entry_time": t.entry_time.isoformat(),
"exit_time": t.exit_time.isoformat() if t.exit_time else None,
"side": t.side,
"entry_price": t.entry_price,
"exit_price": t.exit_price,
"pnl": t.pnl,
"pnl_pct": t.pnl_percent,
"fees": t.fees
}
for t in self.trades
]
}

68
src/strategies/base.py Normal file
View File

@ -0,0 +1,68 @@
"""
Base Strategy Interface
All strategies must inherit from this class.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from enum import Enum
class SignalType(Enum):
OPEN_LONG = "open_long"
OPEN_SHORT = "open_short"
CLOSE_LONG = "close_long"
CLOSE_SHORT = "close_short"
HOLD = "hold"
@dataclass
class StrategySignal:
type: SignalType
confidence: float
reasoning: str
class BaseStrategy(ABC):
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = config or {}
@property
@abstractmethod
def name(self) -> str:
"""Unique identifier for the strategy"""
pass
@property
@abstractmethod
def required_indicators(self) -> List[str]:
"""List of indicator names required by this strategy (e.g. ['ma44'])"""
pass
@property
@abstractmethod
def display_name(self) -> str:
"""User-friendly name for display in UI (e.g. 'MA44 Crossover')"""
pass
@property
@abstractmethod
def description(self) -> str:
"""Detailed description of how the strategy works"""
pass
@abstractmethod
def analyze(
self,
candle: Dict[str, Any],
indicators: Dict[str, float],
current_position: Optional[Dict[str, Any]] = None
) -> StrategySignal:
"""
Analyze market data and return a trading signal.
Args:
candle: Dictionary containing 'close', 'open', 'high', 'low', 'volume', 'time'
indicators: Dictionary of pre-computed indicator values
current_position: Details about current open position (if any)
{'type': 'long'/'short', 'entry_price': float, 'size': float}
"""
pass

View File

@ -0,0 +1,63 @@
"""
MA125 Strategy
Simple trend following strategy.
- Long when Price > MA125
- Short when Price < MA125
"""
from typing import Dict, Any, List, Optional
from .base import BaseStrategy, StrategySignal, SignalType
class MA125Strategy(BaseStrategy):
@property
def name(self) -> str:
return "ma125_strategy"
@property
def required_indicators(self) -> List[str]:
return ["ma125"]
@property
def display_name(self) -> str:
return "MA125 Strategy"
@property
def description(self) -> str:
return "Long-term trend following using 125-period moving average. Better for identifying major trends."
def analyze(
self,
candle: Dict[str, Any],
indicators: Dict[str, float],
current_position: Optional[Dict[str, Any]] = None
) -> StrategySignal:
price = candle['close']
ma125 = indicators.get('ma125')
if ma125 is None:
return StrategySignal(SignalType.HOLD, 0.0, "MA125 not available")
# Current position state
is_long = current_position and current_position.get('type') == 'long'
is_short = current_position and current_position.get('type') == 'short'
# Logic: Price > MA125 -> Bullish
if price > ma125:
if is_long:
return StrategySignal(SignalType.HOLD, 1.0, f"Price {price:.2f} > MA125 {ma125:.2f}. Stay Long.")
elif is_short:
return StrategySignal(SignalType.CLOSE_SHORT, 1.0, f"Price {price:.2f} crossed above MA125 {ma125:.2f}. Close Short.")
else:
return StrategySignal(SignalType.OPEN_LONG, 1.0, f"Price {price:.2f} > MA125 {ma125:.2f}. Open Long.")
# Logic: Price < MA125 -> Bearish
elif price < ma125:
if is_short:
return StrategySignal(SignalType.HOLD, 1.0, f"Price {price:.2f} < MA125 {ma125:.2f}. Stay Short.")
elif is_long:
return StrategySignal(SignalType.CLOSE_LONG, 1.0, f"Price {price:.2f} crossed below MA125 {ma125:.2f}. Close Long.")
else:
return StrategySignal(SignalType.OPEN_SHORT, 1.0, f"Price {price:.2f} < MA125 {ma125:.2f}. Open Short.")
return StrategySignal(SignalType.HOLD, 0.0, "Price == MA125")

View File

@ -0,0 +1,63 @@
"""
MA44 Strategy
Simple trend following strategy.
- Long when Price > MA44
- Short when Price < MA44
"""
from typing import Dict, Any, List, Optional
from .base import BaseStrategy, StrategySignal, SignalType
class MA44Strategy(BaseStrategy):
@property
def name(self) -> str:
return "ma44_strategy"
@property
def required_indicators(self) -> List[str]:
return ["ma44"]
@property
def display_name(self) -> str:
return "MA44 Strategy"
@property
def description(self) -> str:
return "Buy when price crosses above MA44, sell when below. Good for trending markets."
def analyze(
self,
candle: Dict[str, Any],
indicators: Dict[str, float],
current_position: Optional[Dict[str, Any]] = None
) -> StrategySignal:
price = candle['close']
ma44 = indicators.get('ma44')
if ma44 is None:
return StrategySignal(SignalType.HOLD, 0.0, "MA44 not available")
# Current position state
is_long = current_position and current_position.get('type') == 'long'
is_short = current_position and current_position.get('type') == 'short'
# Logic: Price > MA44 -> Bullish
if price > ma44:
if is_long:
return StrategySignal(SignalType.HOLD, 1.0, f"Price {price:.2f} > MA44 {ma44:.2f}. Stay Long.")
elif is_short:
return StrategySignal(SignalType.CLOSE_SHORT, 1.0, f"Price {price:.2f} crossed above MA44 {ma44:.2f}. Close Short.")
else:
return StrategySignal(SignalType.OPEN_LONG, 1.0, f"Price {price:.2f} > MA44 {ma44:.2f}. Open Long.")
# Logic: Price < MA44 -> Bearish
elif price < ma44:
if is_short:
return StrategySignal(SignalType.HOLD, 1.0, f"Price {price:.2f} < MA44 {ma44:.2f}. Stay Short.")
elif is_long:
return StrategySignal(SignalType.CLOSE_LONG, 1.0, f"Price {price:.2f} crossed below MA44 {ma44:.2f}. Close Long.")
else:
return StrategySignal(SignalType.OPEN_SHORT, 1.0, f"Price {price:.2f} < MA44 {ma44:.2f}. Open Short.")
return StrategySignal(SignalType.HOLD, 0.0, "Price == MA44")