From 933537d7595e4090425fd6d0c2addd562ffc41b4 Mon Sep 17 00:00:00 2001 From: BTC Bot Date: Wed, 11 Feb 2026 22:27:51 +0100 Subject: [PATCH] Initial commit: BTC Bot with dashboard, TA analysis, and 14 timeframes --- .env.example | 28 + .gitignore | 59 ++ 466c0182639b | 0 7778c368c489 | 0 AGENTS.md | 223 +++++++ MIGRATION_CHECKLIST.md | 120 ++++ PROJECT_CONTEXT.md | 439 ++++++++++++ README.md | 305 +++++++++ Running | 0 SYNOLOGY_INSTALL.md | 251 +++++++ config/data_config.yaml | 85 +++ docker/Dockerfile.api | 22 + docker/Dockerfile.collector | 20 + docker/Dockerfile.timescaledb | 1 + docker/docker-compose.yml | 83 +++ docker/init-scripts/01-schema.sql | 139 ++++ docker/init-scripts/02-optimization.sql | 43 ++ docker/timescaledb.conf | 40 ++ requirements.txt | 23 + scripts/backfill.sh | 36 + scripts/backup.sh | 37 ++ scripts/deploy.sh | 59 ++ scripts/health_check.sh | 31 + scripts/verify_files.sh | 33 + src/api/dashboard/static/index.html | 842 ++++++++++++++++++++++++ src/api/server.py | 376 +++++++++++ src/data_collector/__init__.py | 13 + src/data_collector/backfill.py | 366 ++++++++++ src/data_collector/candle_buffer.py | 224 +++++++ src/data_collector/database.py | 255 +++++++ src/data_collector/main.py | 236 +++++++ src/data_collector/websocket_client.py | 300 +++++++++ 32 files changed, 4689 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 466c0182639b create mode 100644 7778c368c489 create mode 100644 AGENTS.md create mode 100644 MIGRATION_CHECKLIST.md create mode 100644 PROJECT_CONTEXT.md create mode 100644 README.md create mode 100644 Running create mode 100644 SYNOLOGY_INSTALL.md create mode 100644 config/data_config.yaml create mode 100644 docker/Dockerfile.api create mode 100644 docker/Dockerfile.collector create mode 100644 docker/Dockerfile.timescaledb create mode 100644 docker/docker-compose.yml create mode 100644 docker/init-scripts/01-schema.sql create mode 100644 docker/init-scripts/02-optimization.sql 
create mode 100644 docker/timescaledb.conf create mode 100644 requirements.txt create mode 100644 scripts/backfill.sh create mode 100644 scripts/backup.sh create mode 100644 scripts/deploy.sh create mode 100644 scripts/health_check.sh create mode 100644 scripts/verify_files.sh create mode 100644 src/api/dashboard/static/index.html create mode 100644 src/api/server.py create mode 100644 src/data_collector/__init__.py create mode 100644 src/data_collector/backfill.py create mode 100644 src/data_collector/candle_buffer.py create mode 100644 src/data_collector/database.py create mode 100644 src/data_collector/main.py create mode 100644 src/data_collector/websocket_client.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ee094a7 --- /dev/null +++ b/.env.example @@ -0,0 +1,28 @@ +# BTC Bot Environment Configuration +# Copy this file to .env and fill in your values + +# Database Configuration +DB_HOST=timescaledb +DB_PORT=5432 +DB_NAME=btc_data +DB_USER=btc_bot +DB_PASSWORD=change_this_to_secure_password + +# Hyperliquid Configuration +HYPERLIQUID_API_KEY=optional_for_trading +HYPERLIQUID_API_SECRET=optional_for_trading + +# Base RPC (for validation) +BASE_RPC_URL=https://base-mainnet.g.alchemy.com/v2/YOUR_ALCHEMY_API_KEY + +# Coinbase API (for validation) +COINBASE_API_KEY=optional + +# Telegram Notifications (optional) +TELEGRAM_BOT_TOKEN=your_bot_token_here +TELEGRAM_CHAT_ID=your_chat_id_here + +# API Server +API_HOST=0.0.0.0 +API_PORT=8000 +API_SECRET_KEY=change_this_secret_key \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b766298 --- /dev/null +++ b/.gitignore @@ -0,0 +1,59 @@ +# Environment variables +.env +.env.local +.env.*.local + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +ENV/ +env/ + +# IDE 
+.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Docker +.docker/ + +# Data directories +data/ +backups/ +logs/ +exports/ + +# Temporary files +*.tmp +*.log +.DS_Store +Thumbs.db + +# Node modules (if any) +node_modules/ + +# Local development +*.local diff --git a/466c0182639b b/466c0182639b new file mode 100644 index 0000000..e69de29 diff --git a/7778c368c489 b/7778c368c489 new file mode 100644 index 0000000..e69de29 diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..54c1525 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,223 @@ +# AGENTS.md - AI Coding Assistant Guidelines + +## Project Overview +BTC Accumulation Bot - Data Collection Phase. High-performance async data collection +system for cbBTC on Hyperliquid with TimescaleDB storage. Python 3.11, asyncio, +FastAPI, asyncpg, WebSockets. + +## Build/Run Commands + +### Docker (Primary deployment - Synology DS218+) +```bash +# Build and start all services (timescaledb, data_collector, api_server) +cd docker && docker-compose up -d --build + +# View logs +docker-compose logs -f data_collector +docker-compose logs -f api_server + +# Full deploy (creates dirs, pulls, builds, starts) +bash scripts/deploy.sh +``` + +### Development +```bash +# API server (requires DB running) +cd src/api && uvicorn server:app --reload --host 0.0.0.0 --port 8000 +# Docs: http://localhost:8000/docs | Dashboard: http://localhost:8000/dashboard + +# Data collector +cd src/data_collector && python -m data_collector.main +``` + +### Testing +```bash +# Run all tests +pytest + +# Run a specific test file +pytest tests/data_collector/test_websocket_client.py + +# Run a single test by name +pytest tests/data_collector/test_websocket_client.py::test_websocket_connection -v + +# Run with coverage +pytest --cov=src --cov-report=html +``` +Note: The tests/ directory structure exists but test files have not been written yet. +When creating tests, use pytest with pytest-asyncio for async test support. 
+ +### Linting & Formatting +```bash +# No config files exist for these tools; use these flags: +flake8 src/ --max-line-length=100 --extend-ignore=E203,W503 +black --check src/ # Check formatting +black src/ # Auto-format +mypy src/ --ignore-missing-imports +``` + +## Project Structure +``` +src/ +├── data_collector/ # WebSocket client, buffer, database +│ ├── __init__.py +│ ├── main.py # Entry point, orchestration, signal handling +│ ├── websocket_client.py # Hyperliquid WS client, Candle dataclass +│ ├── candle_buffer.py # Circular buffer with async flush +│ ├── database.py # asyncpg/TimescaleDB interface +│ └── backfill.py # Historical data backfill from REST API +└── api/ + ├── server.py # FastAPI app, all endpoints + └── dashboard/static/ + └── index.html # Real-time web dashboard +config/data_config.yaml # Non-secret operational config +docker/ +├── docker-compose.yml # 3-service orchestration +├── Dockerfile.api / .collector # python:3.11-slim based +└── init-scripts/ # 01-schema.sql, 02-optimization.sql +scripts/ # deploy.sh, backup.sh, health_check.sh, backfill.sh +tests/data_collector/ # Test directory (empty - tests not yet written) +``` + +## Code Style Guidelines + +### Imports +Group in this order, separated by blank lines: +1. Standard library (`import asyncio`, `from datetime import datetime`) +2. Third-party (`import websockets`, `import asyncpg`, `from fastapi import FastAPI`) +3. Local/relative (`from .websocket_client import Candle`) + +Use relative imports (`.module`) within the `data_collector` package. +Use absolute imports for third-party packages. 
+ +### Formatting +- Line length: 100 characters max +- Indentation: 4 spaces +- Strings: double quotes (single only to avoid escaping) +- Trailing commas in multi-line collections +- Formatter: black + +### Type Hints +- Required on all function parameters and return values +- `Optional[Type]` for nullable values +- `List[Type]`, `Dict[str, Any]` from `typing` module +- `@dataclass` for data-holding classes (e.g., `Candle`, `BufferStats`) +- Callable types for callbacks: `Callable[[Candle], Awaitable[None]]` + +### Naming Conventions +- Classes: `PascalCase` (DataCollector, CandleBuffer) +- Functions/variables: `snake_case` (get_candles, buffer_size) +- Constants: `UPPER_SNAKE_CASE` (DB_HOST, MAX_BUFFER_SIZE) +- Private methods: `_leading_underscore` (_handle_reconnect, _flush_loop) + +### Docstrings +- Triple double quotes on all modules, classes, and public methods +- Brief one-line description on first line +- Optional blank line + detail if needed +- No Args/Returns sections (not strict Google-style) +```python +"""Add a candle to the buffer +Returns True if added, False if buffer full and candle dropped""" +``` + +### Error Handling +- `try/except` with specific exceptions (never bare `except:`) +- Log errors with `logger.error()` before re-raising in critical paths +- Catch `asyncio.CancelledError` to break loops cleanly +- Use `finally` blocks for cleanup (always call `self.stop()`) +- Use `@asynccontextmanager` for resource acquisition (DB connections) + +### Async Patterns +- `async/await` for all I/O operations +- `asyncio.Lock()` for thread-safe buffer access +- `asyncio.Event()` for stop/flush coordination +- `asyncio.create_task()` for background loops +- `asyncio.gather(*tasks, return_exceptions=True)` for parallel cleanup +- `asyncio.wait_for(coro, timeout)` for graceful shutdown +- `asyncio.run(main())` as the entry point + +### Logging +- Module-level: `logger = logging.getLogger(__name__)` in every file +- Format: `'%(asctime)s - %(name)s - 
%(levelname)s - %(message)s'` +- Log level from env: `getattr(logging, os.getenv('LOG_LEVEL', 'INFO'))` +- Use f-strings in log messages with relevant context +- Levels: DEBUG (candle receipt), INFO (lifecycle), WARNING (gaps), ERROR (failures) + +### Database (asyncpg + TimescaleDB) +- Connection pool: `asyncpg.create_pool(min_size=1, max_size=N)` +- `@asynccontextmanager` wrapper for connection acquisition +- Batch inserts with `executemany()` +- Upserts with `ON CONFLICT ... DO UPDATE` +- Positional params: `$1, $2, ...` (not `%s`) +- Use `conn.fetch()`, `conn.fetchrow()`, `conn.fetchval()` for results + +### Configuration +- Secrets via environment variables (`os.getenv('DB_PASSWORD')`) +- Non-secret config in `config/data_config.yaml` +- Constructor defaults fall back to env vars +- Never commit `.env` files (contains real credentials) + +## Common Tasks + +### Add New API Endpoint +1. Add route in `src/api/server.py` with `@app.get()`/`@app.post()` +2. Type-hint query params with `Query()`; return `dict` or raise `HTTPException` +3. Use `asyncpg` pool for database queries + +### Add New Data Source +1. Create module in `src/data_collector/` following `websocket_client.py` pattern +2. Implement async `connect()`, `disconnect()`, `receive()` methods +3. Use callback architecture: `on_data`, `on_error` callables + +### Database Schema Changes +1. Update `docker/init-scripts/01-schema.sql` +2. Update `DatabaseManager` methods in `src/data_collector/database.py` +3. Rebuild: `docker-compose down -v && docker-compose up -d --build` + +### Writing Tests +1. Create test files in `tests/data_collector/` (e.g., `test_websocket_client.py`) +2. Use `pytest-asyncio` for async tests: `@pytest.mark.asyncio` +3. Mock external services (WebSocket, database) with `unittest.mock` +4. Descriptive names: `test_websocket_reconnection_with_backoff` + +### Historical Data Backfill +The `backfill.py` module downloads historical candle data from Hyperliquid's REST API. 
+ +**API Limitations:** +- Max 5000 candles per coin/interval combination +- 500 candles per response (requires pagination) +- Available intervals: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M + +**Usage - Python Module:** +```python +from data_collector.backfill import HyperliquidBackfill +from data_collector.database import DatabaseManager + +async with HyperliquidBackfill(db, coin="BTC", intervals=["1m", "1h"]) as backfill: + # Backfill last 7 days for all configured intervals + results = await backfill.backfill_all_intervals(days_back=7) + + # Or backfill specific interval + count = await backfill.backfill_interval("1m", days_back=3) +``` + +**Usage - CLI:** +```bash +# Backfill 7 days of 1m candles for BTC +cd src/data_collector && python -m data_collector.backfill --coin BTC --days 7 --intervals 1m + +# Backfill multiple intervals +python -m data_collector.backfill --coin BTC --days 30 --intervals 1m 5m 1h + +# Backfill MAXIMUM available data (5000 candles per interval) +python -m data_collector.backfill --coin BTC --days max --intervals 1m 1h 1d + +# Or use the convenience script +bash scripts/backfill.sh BTC 7 "1m 5m 1h" +bash scripts/backfill.sh BTC max "1m 1h 1d" # Maximum data +``` + +**Data Coverage by Interval:** +- 1m candles: ~3.5 days (5000 candles) +- 1h candles: ~7 months (5000 candles) +- 1d candles: ~13.7 years (5000 candles) diff --git a/MIGRATION_CHECKLIST.md b/MIGRATION_CHECKLIST.md new file mode 100644 index 0000000..4eaae88 --- /dev/null +++ b/MIGRATION_CHECKLIST.md @@ -0,0 +1,120 @@ +# Quick Migration Checklist + +## Pre-Migration (Current Location) + +- [ ] Review PROJECT_CONTEXT.md +- [ ] Ensure all Dockerfiles don't have `apt-get` commands +- [ ] Note any custom modifications made +- [ ] Export any existing data (if needed): + ```bash + docker exec btc_timescale pg_dump -U btc_bot btc_data > backup.sql + ``` + +## Copy Files + +Copy entire `btc_bot/` folder to new location: + +```bash +# Option 1: Direct copy +cp -r btc_bot 
/new/location/ + +# Option 2: Tar archive +tar -czvf btc_bot_backup.tar.gz btc_bot/ +# Copy to new location +tar -xzvf btc_bot_backup.tar.gz -C /new/location/ + +# Option 3: rsync +rsync -avz --progress btc_bot/ user@new-host:/path/to/btc_bot/ +``` + +## Post-Migration (New Location) + +- [ ] Run migration helper: `./scripts/migrate.sh` +- [ ] Verify `.env` file exists and has correct password +- [ ] Check file permissions: `ls -la scripts/*.sh` +- [ ] Review `docker/timescaledb.conf` for any local adjustments +- [ ] Update `config/data_config.yaml` if paths changed + +## Deploy + +- [ ] Run: `./scripts/deploy.sh` +- [ ] Check container status: `docker-compose ps` +- [ ] Test health endpoint: `curl http://localhost:8000/api/v1/health` +- [ ] Open dashboard: `http://your-ip:8000/dashboard` +- [ ] Verify database: `docker exec btc_timescale psql -U btc_bot -d btc_data -c "SELECT version();"` + +## Verification + +- [ ] WebSocket connecting (check logs) +- [ ] Candles appearing in database +- [ ] Dashboard loading +- [ ] API responding +- [ ] No errors in logs + +## Optional: Import Old Data + +If you exported data from old location: + +```bash +docker exec -i btc_timescale psql -U btc_bot -d btc_data < backup.sql +``` + +## Troubleshooting Migration Issues + +### Permission Denied on Scripts +```bash +chmod +x scripts/*.sh +``` + +### Docker Build Fails +```bash +cd docker +docker-compose down +docker system prune -f +docker-compose build --no-cache +``` + +### Database Connection Failed +```bash +# Check if .env has correct DB_PASSWORD +cat .env | grep DB_PASSWORD + +# Verify database is running +docker-compose ps + +# Check database logs +docker-compose logs timescaledb +``` + +### Port Already in Use +```bash +# Check what's using port 5432 or 8000 +sudo netstat -tulpn | grep 5432 +sudo netstat -tulpn | grep 8000 + +# Change ports in docker-compose.yml if needed +``` + +## Post-Migration Cleanup (Old Location) + +After confirming new location works: + +```bash +# 
Old location - stop and remove containers +cd docker +docker-compose down -v # -v removes volumes (WARNING: deletes data!) + +# Or just stop without deleting data +docker-compose down +``` + +## Notes + +- **Database data**: Will be fresh (empty) in new location unless you export/import +- **Logs**: Start fresh in new location +- **Configuration**: .env file needs to be recreated in new location +- **Backups**: Update backup scripts to point to new path + +--- + +**Remember**: The project is now portable and ready for deployment anywhere with Docker support! \ No newline at end of file diff --git a/PROJECT_CONTEXT.md b/PROJECT_CONTEXT.md new file mode 100644 index 0000000..1612edd --- /dev/null +++ b/PROJECT_CONTEXT.md @@ -0,0 +1,439 @@ +# BTC Bot Project - Migration Context + +**Created:** 2024-02-11 +**Phase:** 1 of 4 (Data Collection) - COMPLETE +**Status:** Ready for deployment on Synology DS218+ + +--- + +## 📁 Project Structure + +``` +btc_bot/ +├── docker/ # Docker configurations +│ ├── docker-compose.yml # Main orchestration file +│ ├── Dockerfile.collector # Data collector service (no apt-get) +│ ├── Dockerfile.api # API server service +│ ├── timescaledb.conf # Database optimization for NAS +│ └── init-scripts/ # Auto-run SQL on first start +│ ├── 01-schema.sql # Main tables & hypertables +│ └── 02-optimization.sql # Indexes & compression +│ +├── config/ +│ └── data_config.yaml # Data collection settings +│ +├── src/ +│ ├── data_collector/ # Data ingestion module +│ │ ├── __init__.py +│ │ ├── main.py # Entry point & orchestrator +│ │ ├── websocket_client.py # Hyperliquid WebSocket client +│ │ ├── candle_buffer.py # In-memory circular buffer +│ │ └── database.py # TimescaleDB interface +│ │ +│ └── api/ # REST API & dashboard +│ ├── server.py # FastAPI application +│ └── dashboard/ +│ └── static/ +│ └── index.html # Real-time web dashboard +│ +├── scripts/ # Operations +│ ├── deploy.sh # One-command deployment +│ ├── backup.sh # Automated backup script +│ └── 
health_check.sh # Health monitoring +│ +├── requirements.txt # Python dependencies +├── .env.example # Environment template +└── README.md # Full documentation +``` + +--- + +## ✅ Completed Features + +### Phase 1: Data Collection (DONE) + +**Components Built:** + +1. **Hyperliquid WebSocket Client** + - Real-time cbBTC-PERP 1m candles + - Auto-reconnection with exponential backoff + - Connection health monitoring + - File: `src/data_collector/websocket_client.py` + +2. **Candle Buffer System** + - Circular buffer (1000 candles max) + - Automatic batching (every 30s or 100 candles) + - Gap detection + - File: `src/data_collector/candle_buffer.py` + +3. **TimescaleDB Integration** + - Hypertables with weekly partitioning + - Automatic compression after 7 days + - Connection pooling + - Batch inserts with conflict resolution + - File: `src/data_collector/database.py` + +4. **Main Orchestrator** + - Async event loop + - Health monitoring (every 60s) + - Gap detection (every 5 min) + - Graceful shutdown handling + - File: `src/data_collector/main.py` + +5. **REST API** + - FastAPI with auto-generated docs + - Endpoints: /candles, /candles/latest, /health, /export/csv + - Real-time dashboard with charts + - File: `src/api/server.py` + +6. **Database Schema** + - `candles` - Main price data (hypertable) + - `indicators` - Computed values (hypertable) + - `data_quality` - Issues & gaps log + - `collector_state` - Metadata tracking + - Compression enabled for old data + - Files: `docker/init-scripts/*.sql` + +7. 
**Operations Scripts** + - Automated deployment + - Backup with retention + - Health monitoring + - Files: `scripts/*.sh` + +--- + +## ⚙️ Configuration + +### Environment Variables (.env) + +```bash +# Database +DB_HOST=timescaledb # Use 'timescaledb' for Docker, 'localhost' for direct +DB_PORT=5432 +DB_NAME=btc_data +DB_USER=btc_bot +DB_PASSWORD=your_secure_password_here + +# Validation (optional) +BASE_RPC_URL=https://base-mainnet.g.alchemy.com/v2/YOUR_KEY + +# Notifications (optional) +TELEGRAM_BOT_TOKEN=your_token +TELEGRAM_CHAT_ID=your_chat_id +``` + +### Data Collection Settings (config/data_config.yaml) + +```yaml +# Key settings: +# - Primary: Hyperliquid WebSocket +# - Symbol: cbBTC-PERP +# - Interval: 1m (base), custom intervals computed +# - Buffer: 1000 candles, 30s flush +# - Validation: Every 5 minutes +``` + +--- + +## 🚀 Deployment Steps + +### Prerequisites +- Synology DS218+ (or similar NAS) +- Docker package installed +- 6GB RAM recommended (upgraded from 2GB) +- SSH access enabled + +### Deploy Command + +```bash +# On NAS: +cd /volume1/btc_bot +chmod +x scripts/deploy.sh +./scripts/deploy.sh +``` + +### Post-Deployment Verification + +```bash +# Check services +cd docker +docker-compose ps + +# View logs +docker-compose logs -f data_collector +docker-compose logs -f api_server + +# Test database +docker exec btc_timescale psql -U btc_bot -d btc_data -c "SELECT COUNT(*) FROM candles;" + +# Access dashboard +http://your-nas-ip:8000/dashboard +``` + +--- + +## 📊 Database Access + +### Direct Connection +```bash +# From NAS +docker exec -it btc_timescale psql -U btc_bot -d btc_data + +# Useful queries: +# Latest data: SELECT * FROM candles ORDER BY time DESC LIMIT 10; +# Check gaps: SELECT * FROM data_quality WHERE resolved = false; +# Health: SELECT * FROM data_health; +``` + +### Connection String +``` +postgresql://btc_bot:password@localhost:5432/btc_data +``` + +--- + +## 🔧 Known Issues & Solutions + +### 1. 
Docker DNS Resolution (FIXED) +**Problem:** `apt-get update` fails in containers +**Solution:** Removed `apt-get` from Dockerfiles - using pre-compiled Python packages only +**Files Modified:** `docker/Dockerfile.collector`, `docker/Dockerfile.api` + +### 2. CPU Architecture (opencode incompatibility) +**Problem:** Intel Atom D2701 lacks SSE4.2 instructions +**Solution:** Run opencode on modern PC, connect to NAS via VS Code Remote-SSH +**Workflow:** Edit on PC → Files on NAS via SSH → Docker sees changes + +### 3. Memory Constraints on DS218+ +**Mitigation:** +- TimescaleDB limited to 1.5GB RAM +- Collector limited to 256MB +- API limited to 512MB +- Compression enabled after 7 days + +--- + +## 📈 Storage Estimates + +| Data Type | Growth Rate | Compression | +|-----------|-------------|-------------| +| 1m Candles | ~50MB/year | ~70% reduction | +| Indicators | ~100MB/year | ~70% reduction | +| Backups | Configurable | gzip compressed | + +**Total with 1 year retention:** ~200MB compressed + +--- + +## 🎯 Next Phases (TODO) + +### Phase 2: Indicators & Brain +**Status:** Not Started +**Components:** +- [ ] RSI, MACD, EMA calculations +- [ ] Custom interval builder (37m, etc.) 
+- [ ] Indicator storage in database +- [ ] Backfill system for gaps +- [ ] Brain/decision engine +- [ ] Weighted signal combination + +**Files to Create:** +- `src/indicators/*.py` (base, rsi, macd, ema, bollinger, volume) +- `src/brain/decision_engine.py` +- `src/brain/weights.py` + +### Phase 3: Wallet & Execution +**Status:** Not Started +**Components:** +- [ ] Web3.py EVM integration +- [ ] Wallet management (EOA) +- [ ] Uniswap V3 swap execution +- [ ] Gas management +- [ ] Aave V3 integration (supply cbBTC) + +**Files to Create:** +- `src/execution/wallet.py` +- `src/execution/uniswap.py` +- `src/execution/aave.py` +- `src/execution/gas_manager.py` + +### Phase 4: Trading Bot Integration +**Status:** Not Started +**Components:** +- [ ] Signal → Trade execution flow +- [ ] $25 USDC trade size +- [ ] Automatic Aave deposit +- [ ] Risk management +- [ ] Telegram notifications +- [ ] Performance tracking + +--- + +## 🔌 Integration Points + +### Hyperliquid API +- **WebSocket:** `wss://api.hyperliquid.xyz/ws` +- **REST:** `https://api.hyperliquid.xyz/info` +- **Subscription:** `{"method": "subscribe", "subscription": {"type": "candle", "coin": "cbBTC", "interval": "1m"}}` + +### Base Chain (for validation) +- **RPC:** Alchemy/Infura/QuickNode +- **Contract:** cbBTC token on Base +- **Purpose:** Cross-validate Hyperliquid prices + +### Aave V3 (Phase 3) +- **Network:** Base +- **Pool:** `0xA238Dd80C259a72e81d7e4664a9801593F98d1c5` +- **Action:** Supply cbBTC as collateral + +--- + +## 📋 Dependencies + +### Python Packages (requirements.txt) +Key packages: +- `websockets` - WebSocket client +- `asyncpg` - PostgreSQL async driver +- `fastapi` + `uvicorn` - API server +- `pandas` + `numpy` - Data processing +- `web3` - Ethereum integration (Phase 3) +- `pydantic` - Data validation +- `pyyaml` - Configuration + +### Docker Images +- `timescale/timescaledb:2.11.2-pg15` +- `python:3.11-slim` (for custom builds) + +--- + +## 🎨 Development Workflow + +### Recommended 
Setup +``` +PC (opencode + VS Code) ──SSH──► NAS (Docker containers) +``` + +### VS Code Extensions +- Remote - SSH +- Python +- Docker (optional) + +### File Editing Options +1. **VS Code Remote-SSH:** Edit directly on NAS +2. **SSHFS:** Mount NAS locally +3. **WinSCP:** Sync local ↔ remote +4. **Synology Drive:** Bidirectional sync + +--- + +## 🔒 Security Notes + +1. **Environment File:** `.env` contains secrets - never commit to git +2. **Database:** Not exposed externally by default +3. **API:** No authentication (assumes local network) +4. **Wallet Keys:** Will be in `.env` for Phase 3 - use hardware wallet for large amounts + +--- + +## 📞 Troubleshooting Guide + +### Container Won't Start +```bash +# Check logs +docker-compose logs service_name + +# Common fixes: +docker-compose down +docker system prune -f +docker-compose build --no-cache +docker-compose up -d +``` + +### Database Connection Issues +```bash +# Check if DB is ready +docker exec btc_timescale pg_isready -U btc_bot + +# Reset (WARNING: deletes all data!) +docker-compose down -v +docker-compose up -d +``` + +### High Memory Usage +- Edit `docker/timescaledb.conf` - reduce `shared_buffers` +- Edit `docker/docker-compose.yml` - reduce memory limits +- Enable more aggressive compression + +### Data Gaps Detected +- Check WebSocket logs for disconnections +- Verify Hyperliquid API status +- Consider implementing REST API backfill (Phase 2) + +--- + +## 📝 Additional Notes + +### Design Decisions +1. **1m candles as base:** Custom intervals computed on-demand +2. **TimescaleDB over InfluxDB:** Better SQL support, handles custom intervals +3. **Docker over native:** Easier deployment, resource isolation +4. **Asyncio:** Handles many concurrent connections efficiently +5. 
**Batch writes:** Minimizes database load on NAS + +### Performance Targets +- **Latency:** < 1s from trade to database +- **Throughput:** Handle 1 candle/minute easily +- **Memory:** < 2.5GB total usage +- **Storage:** < 1GB/year with compression + +### Future Enhancements +- [ ] Multi-asset support (ETH, SOL, etc.) +- [ ] Historical backfill from REST API +- [ ] Machine learning layer for signals +- [ ] WebSocket multiplexing for multiple symbols +- [ ] Prometheus metrics export +- [ ] Grafana dashboard (alternative to custom UI) + +--- + +## ✅ Pre-Migration Checklist + +Before moving to new folder/location: + +- [ ] All files listed in structure present +- [ ] Dockerfiles don't contain `apt-get` commands +- [ ] `.env` file configured with your passwords +- [ ] `scripts/*.sh` have execute permissions (`chmod +x`) +- [ ] `docker/init-scripts/*.sql` present +- [ ] `requirements.txt` includes all dependencies +- [ ] Tested on current location (if possible) + +--- + +## 🚀 Quick Start in New Location + +```bash +# 1. Copy all files to new location +cp -r btc_bot /new/location/ + +# 2. Set permissions +chmod +x /new/location/btc_bot/scripts/*.sh + +# 3. Configure environment +cd /new/location/btc_bot +cp .env.example .env +nano .env # Edit passwords + +# 4. Deploy +cd docker +docker-compose up -d + +# 5. Verify +docker-compose ps +curl http://localhost:8000/api/v1/health +``` + +--- + +**End of Context File** +**Ready for migration to new location** \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..c410a8d --- /dev/null +++ b/README.md @@ -0,0 +1,305 @@ +# BTC Accumulation Bot - Data Collection Phase + +High-performance data collection system for cbBTC on Hyperliquid with TimescaleDB storage on Synology DS218+. 
+ +## Architecture Overview + +- **Data Source**: Hyperliquid WebSocket (primary) +- **Database**: TimescaleDB (PostgreSQL extension) on NAS +- **Collection**: 1-minute candles with automatic batching +- **API**: FastAPI with real-time dashboard +- **Deployment**: Docker Compose on Synology + +## Project Structure + +``` +btc_bot/ +├── docker/ # Docker configurations +│ ├── docker-compose.yml +│ ├── Dockerfile.collector +│ ├── Dockerfile.api +│ └── init-scripts/ # Database initialization +├── config/ # YAML configurations +├── src/ +│ ├── data_collector/ # WebSocket client & database writer +│ └── api/ # REST API & dashboard +├── scripts/ # Deployment & backup scripts +└── requirements.txt +``` + +## Prerequisites + +1. **Synology DS218+** with: + - Docker package installed + - SSH access enabled + - 6GB RAM recommended (upgrade from stock 2GB) + +2. **Network**: + - Static IP for NAS (recommended) + - Port 5432 (database) and 8000 (API) available + +## Installation + +### 1. Clone Repository on NAS + +```bash +ssh user@your-nas-ip +cd /volume1 +mkdir -p btc_bot +cd btc_bot +# Copy project files here +``` + +### 2. Configure Environment + +```bash +# Copy example environment file +cp .env.example .env + +# Edit with your settings +nano .env +``` + +Required settings: +- `DB_PASSWORD`: Strong password for database +- `BASE_RPC_URL`: Alchemy/Infura API key for Base chain validation +- `TELEGRAM_BOT_TOKEN` and `CHAT_ID`: For notifications (optional) + +### 3. Deploy + +```bash +chmod +x scripts/deploy.sh +./scripts/deploy.sh +``` + +This will: +1. Create necessary directories +2. Build Docker images +3. Start TimescaleDB +4. Initialize database schema +5. Start data collector +6. Start API server + +### 4. 
Verify Installation + +```bash +# Check container status +cd docker +docker-compose ps + +# View logs +docker-compose logs -f data_collector +docker-compose logs -f api_server + +# Test database connection +docker exec btc_timescale psql -U btc_bot -d btc_data -c "SELECT COUNT(*) FROM candles;" +``` + +## Usage + +### Web Dashboard + +Access the dashboard at: `http://your-nas-ip:8000/dashboard` + +Features: +- Real-time price chart +- 24h statistics +- Recent candles table +- CSV export +- Auto-refresh every 30 seconds + +### REST API + +#### Get Candles +```bash +curl "http://your-nas-ip:8000/api/v1/candles?symbol=cbBTC-PERP&interval=1m&limit=100" +``` + +#### Get Latest Candle +```bash +curl "http://your-nas-ip:8000/api/v1/candles/latest?symbol=cbBTC-PERP&interval=1m" +``` + +#### Export CSV +```bash +curl "http://your-nas-ip:8000/api/v1/export/csv?symbol=cbBTC-PERP&days=7" -o cbBTC_7d.csv +``` + +#### Health Check +```bash +curl "http://your-nas-ip:8000/api/v1/health" +``` + +### API Documentation + +Interactive API docs available at: `http://your-nas-ip:8000/docs` + +## Database Access + +Connect directly to TimescaleDB: + +```bash +# From NAS +docker exec -it btc_timescale psql -U btc_bot -d btc_data + +# From remote (if port 5432 forwarded) +psql -h your-nas-ip -p 5432 -U btc_bot -d btc_data +``` + +### Useful Queries + +```sql +-- Check latest data +SELECT * FROM candles ORDER BY time DESC LIMIT 10; + +-- Check data gaps (last 24h) +SELECT * FROM data_quality +WHERE time > NOW() - INTERVAL '24 hours' +AND resolved = false; + +-- Database statistics +SELECT * FROM data_health; + +-- Compression status +SELECT chunk_name, compression_status +FROM timescaledb_information.chunks +WHERE hypertable_name = 'candles'; +``` + +## Backup & Maintenance + +### Automated Backups + +Setup scheduled task in Synology DSM: + +1. Open **Control Panel** → **Task Scheduler** +2. Create **Triggered Task** → **User-defined script** +3. Schedule: Every 6 hours +4. 
Command: `/volume1/btc_bot/scripts/backup.sh` + +### Manual Backup + +```bash +cd /volume1/btc_bot +./scripts/backup.sh +``` + +Backups stored in: `/volume1/btc_bot/backups/` + +### Health Monitoring + +Add to Task Scheduler (every 5 minutes): + +```bash +/volume1/btc_bot/scripts/health_check.sh +``` + +### Database Maintenance + +```bash +# Manual compression (runs automatically after 7 days) +docker exec btc_timescale psql -U btc_bot -d btc_data -c "SELECT compress_chunk(i) FROM show_chunks('candles') i;" + +# Vacuum and analyze +docker exec btc_timescale psql -U btc_bot -d btc_data -c "VACUUM ANALYZE candles;" +``` + +## Troubleshooting + +### High Memory Usage + +If DS218+ runs out of memory: + +```bash +# Reduce memory limits in docker-compose.yml +# Edit docker/docker-compose.yml +deploy: + resources: + limits: + memory: 1G # Reduce from 1.5G +``` + +Then restart: +```bash +cd docker +docker-compose down +docker-compose up -d +``` + +### Data Gaps + +If gaps detected: + +```bash +# Check logs +docker-compose logs data_collector | grep -i gap + +# Manual backfill (not yet implemented - will be in Phase 2) +``` + +### WebSocket Disconnections + +Normal behavior - client auto-reconnects. Check: + +```bash +# Connection health +docker-compose logs data_collector | grep -i "reconnect" +``` + +### Disk Space + +Monitor usage: + +```bash +du -sh /volume1/btc_bot/data +du -sh /volume1/btc_bot/backups +``` + +Expected growth: +- 1m candles: ~50MB/year (compressed) +- Indicators: ~100MB/year +- Backups: Varies based on retention + +## Performance Tuning + +For DS218+ with limited resources: + +1. **Buffer size**: Reduce in `config/data_config.yaml`: + ```yaml + buffer: + max_size: 500 # From 1000 + flush_interval_seconds: 60 # From 30 + ``` + +2. **Database connections**: Reduce pool size: + ```yaml + database: + pool_size: 3 # From 5 + ``` + +3. **Compression**: Already enabled after 7 days + +## Security Considerations + +1. 
**Environment file**: `.env` contains secrets - never commit to git +2. **Database**: Not exposed externally by default +3. **API**: No authentication (local network only) +4. **Firewall**: Only open port 8000 if needed externally (use VPN instead) + +## Next Steps (Phase 2) + +1. **Backfill system**: REST API integration for gap filling +2. **Indicators**: RSI, MACD, EMA computation engine +3. **Brain**: Decision engine with configurable rules +4. **Execution**: EVM wallet integration for cbBTC trading +5. **Aave**: Automatic yield generation on collected cbBTC + +## Support + +- **API Issues**: Check logs with `docker-compose logs api_server` +- **Data Issues**: Check logs with `docker-compose logs data_collector` +- **Database Issues**: Check logs with `docker-compose logs timescaledb` + +## License + +Private project - not for redistribution \ No newline at end of file diff --git a/Running b/Running new file mode 100644 index 0000000..e69de29 diff --git a/SYNOLOGY_INSTALL.md b/SYNOLOGY_INSTALL.md new file mode 100644 index 0000000..726544b --- /dev/null +++ b/SYNOLOGY_INSTALL.md @@ -0,0 +1,251 @@ +# Installing BTC Bot on Synology NAS + +Tested on Synology DS218+ (DSM 7.x, 2 GB RAM). The system runs three Docker +containers: TimescaleDB, a data collector (WebSocket to Hyperliquid), and a +FastAPI server with a web dashboard. + +## Prerequisites + +1. **Docker** -- install from Synology Package Center. +2. **SSH access** -- enable in DSM: Control Panel > Terminal & SNMP > Enable SSH. +3. **Git** (optional) -- install the Git Server package, or copy files manually. + +## Step 1 -- Set NAS DNS + +Docker on Synology often can't resolve hostnames during image builds. Fix this +before anything else: + +**DSM > Control Panel > Network > General tab > set DNS Server:** + +| Field | Value | +|-------|-------| +| Preferred DNS | `8.8.8.8` | +| Alternate DNS | `8.8.4.4` | + +Click Apply. 
+ +## Step 2 -- Get the code onto the NAS + +SSH into the NAS and clone/copy the project: + +```bash +ssh your_user@nas_ip +cd /volume1/homes/your_user # or wherever you prefer +git clone btc_bot +cd btc_bot +``` + +Or copy files via SMB/SCP to `/volume1/homes/your_user/btc_bot`. + +## Step 3 -- Create directories + +```bash +sudo mkdir -p /volume1/btc_bot/{data,backups,logs,exports} +``` + +These are mounted as volumes by the containers. + +## Step 4 -- Configure environment + +```bash +cp .env.example .env +nano .env # or vi +``` + +At minimum, set a strong password for these two values: + +``` +DB_PASSWORD=your_secure_password_here +API_SECRET_KEY=your_secret_key_here +``` + +The remaining values can stay at their defaults for data collection. + +## Step 5 -- Build images + +The standard `docker-compose build` may fail with DNS errors inside the build +container. Use `docker build` with `--network host` to bypass this: + +```bash +cd ~/btc_bot + +# Build both images (uses host network for DNS resolution) +docker build --network host -f docker/Dockerfile.collector -t btc_collector . +docker build --network host -f docker/Dockerfile.api -t btc_api . +``` + +If you make code changes later, re-run these two commands to rebuild. + +## Step 6 -- Start services + +```bash +cd ~/btc_bot/docker +docker-compose up -d --no-build +``` + +The `--no-build` flag tells compose to use the images you built in Step 5 +instead of trying to build through its own (potentially broken) network. + +TimescaleDB will initialize the schema automatically from +`docker/init-scripts/01-schema.sql` on first start. 
+ +## Step 7 -- Verify + +```bash +# Check all three containers are running +docker-compose ps + +# Watch collector logs (Ctrl+C to stop) +docker-compose logs -f data_collector + +# Check database health +docker exec btc_timescale pg_isready -U btc_bot -d btc_data + +# Run the health check script +bash ~/btc_bot/scripts/health_check.sh +``` + +The API dashboard is available at: **http://nas_ip:8000/dashboard** +API docs (Swagger): **http://nas_ip:8000/docs** + +## Setting up automated backups + +The backup script dumps the database every run and keeps 30 days of history. + +### Option A -- DSM Task Scheduler (recommended) + +1. Open DSM > Control Panel > Task Scheduler +2. Create > Scheduled Task > User-defined script +3. Set schedule: every 6 hours +4. User-defined script: + ``` + /bin/bash /volume1/homes/your_user/btc_bot/scripts/backup.sh + ``` +5. Run as: `root` (needed for docker exec) + +### Option B -- Cron (via SSH) + +```bash +sudo crontab -e +``` + +Add this line (runs every 6 hours): + +``` +0 */6 * * * /bin/bash /volume1/homes/your_user/btc_bot/scripts/backup.sh >> /volume1/btc_bot/logs/backup.log 2>&1 +``` + +Backups are stored in `/volume1/btc_bot/backups/`. + +## Setting up health monitoring + +Add another scheduled task (every 5 minutes) to check if everything is running: + +``` +/bin/bash /volume1/homes/your_user/btc_bot/scripts/health_check.sh +``` + +## Common operations + +### View logs + +```bash +cd ~/btc_bot/docker +docker-compose logs -f data_collector # collector +docker-compose logs -f api_server # API +docker-compose logs -f timescaledb # database +``` + +### Restart a single service + +```bash +cd ~/btc_bot/docker +docker-compose restart data_collector +``` + +### Stop everything + +```bash +cd ~/btc_bot/docker +docker-compose down +``` + +### Rebuild after code changes + +```bash +cd ~/btc_bot +docker build --network host -f docker/Dockerfile.collector -t btc_collector . +docker build --network host -f docker/Dockerfile.api -t btc_api . 
+cd docker +docker-compose up -d --no-build +``` + +### Reset database (destroys all data) + +Only do this if you need a fresh schema: + +```bash +cd ~/btc_bot/docker +docker-compose down -v +docker-compose up -d --no-build +``` + +### Restore from backup + +```bash +gunzip /volume1/btc_bot/backups/btc_data_YYYYMMDD_HHMM.dump.gz +docker exec -i btc_timescale pg_restore -U btc_bot -d btc_data --clean < /volume1/btc_bot/backups/btc_data_YYYYMMDD_HHMM.dump +``` + +## Resource usage + +The system is tuned for the DS218+ (2 GB RAM, dual-core): + +| Container | Memory limit | Typical usage | +|-----------|-------------|---------------| +| TimescaleDB | 1.5 GB | ~400-800 MB | +| Data Collector | 256 MB | ~50-100 MB | +| API Server | 512 MB | ~50-100 MB | + +PostgreSQL settings in `docker/timescaledb.conf` are optimized for this +hardware. If running on a more powerful NAS, increase `shared_buffers` and +`effective_cache_size` proportionally. + +## Troubleshooting + +### DNS errors during build + +Use `--network host` when building (Step 5). If it still fails, check that +your NAS can resolve DNS from the host: + +```bash +nslookup pypi.org +``` + +If that fails, fix DNS in DSM (Step 1). + +### Container keeps restarting + +```bash +docker logs btc_collector # check error output +docker logs btc_api +``` + +Common causes: wrong `DB_PASSWORD` in `.env`, database not ready yet (wait +30 seconds after first start), or port 5432/8000 already in use. + +### No data appearing + +1. Check collector logs: `docker-compose logs data_collector` +2. Verify WebSocket connectivity: the NAS needs outbound access to + `wss://api.hyperliquid.xyz/ws` (port 443) +3. 
Check database: `docker exec btc_timescale psql -U btc_bot -d btc_data -c "SELECT COUNT(*) FROM candles;"` + +### Ports used + +| Port | Service | Direction | +|------|---------|-----------| +| 5432 | PostgreSQL | Internal (+ exposed to host) | +| 8000 | API/Dashboard | Inbound from LAN | +| 443 | PyPI, Hyperliquid WS | Outbound | +| 53 | DNS | Outbound | diff --git a/config/data_config.yaml b/config/data_config.yaml new file mode 100644 index 0000000..1705128 --- /dev/null +++ b/config/data_config.yaml @@ -0,0 +1,85 @@ +# Data Collection Configuration +data_collection: + # Primary data source + primary_exchange: "hyperliquid" + + # Assets to collect + assets: + cbBTC: + symbol: "cbBTC-PERP" + enabled: true + base_asset: "cbBTC" + quote_asset: "USD" + + # Validation settings + validation: + enabled: true + tolerance_percent: 1.0 # 1% price divergence allowed + check_interval_minutes: 5 + + # Reference sources for cross-validation + references: + uniswap_v3: + enabled: true + chain: "base" + pool_address: "0x4f1480ba4F40f2A41a718c8699E64976b222b56d" # cbBTC/USDC + rpc_url: "https://base-mainnet.g.alchemy.com/v2/YOUR_API_KEY" + + coinbase: + enabled: true + api_url: "https://api.exchange.coinbase.com" + + # Intervals to collect (1m is base, others computed) + intervals: + - "1m" # Base collection + + # WebSocket settings + websocket: + url: "wss://api.hyperliquid.xyz/ws" + reconnect_attempts: 10 + reconnect_delays: [1, 2, 5, 10, 30, 60, 120, 300, 600, 900] # seconds + ping_interval: 30 + ping_timeout: 10 + + # Buffer settings + buffer: + max_size: 1000 # candles in memory + flush_interval_seconds: 30 + batch_size: 100 + + # Database settings + database: + host: "${DB_HOST}" + port: ${DB_PORT} + name: "${DB_NAME}" + user: "${DB_USER}" + password: "${DB_PASSWORD}" + pool_size: 5 + max_overflow: 10 + + # Backfill settings + backfill: + enabled: true + max_gap_minutes: 60 + rest_api_url: "https://api.hyperliquid.xyz/info" + + # Quality monitoring + quality_monitor: + 
enabled: true + check_interval_seconds: 300 # 5 minutes + anomaly_detection: + price_change_threshold: 0.10 # 10% + volume_spike_std: 5.0 # 5 sigma + +# Logging +logging: + level: "INFO" + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: "/app/logs/collector.log" + max_size_mb: 100 + backup_count: 10 + +# Performance +performance: + max_cpu_percent: 80 + max_memory_mb: 256 \ No newline at end of file diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api new file mode 100644 index 0000000..4da9afb --- /dev/null +++ b/docker/Dockerfile.api @@ -0,0 +1,22 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Copy requirements first (for better caching) +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY src/ ./src/ +COPY config/ ./config/ + +# Set Python path +ENV PYTHONPATH=/app + +# Expose API port +EXPOSE 8000 + +# Run the API server +CMD ["uvicorn", "src.api.server:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/docker/Dockerfile.collector b/docker/Dockerfile.collector new file mode 100644 index 0000000..89fe450 --- /dev/null +++ b/docker/Dockerfile.collector @@ -0,0 +1,20 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Copy requirements first (for better caching) +COPY requirements.txt . 
+ +# Install Python dependencies +# --no-cache-dir reduces image size +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY src/ ./src/ +COPY config/ ./config/ + +# Set Python path +ENV PYTHONPATH=/app + +# Run the collector +CMD ["python", "-m", "src.data_collector.main"] \ No newline at end of file diff --git a/docker/Dockerfile.timescaledb b/docker/Dockerfile.timescaledb new file mode 100644 index 0000000..b9d6f67 --- /dev/null +++ b/docker/Dockerfile.timescaledb @@ -0,0 +1 @@ +timescale/timescaledb:2.11.2-pg15 \ No newline at end of file diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..297f5e2 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,83 @@ +version: '3.8' + +services: + timescaledb: + image: timescale/timescaledb:2.11.2-pg15 + container_name: btc_timescale + environment: + POSTGRES_USER: btc_bot + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_DB: btc_data + TZ: Europe/Warsaw + volumes: + - /volume1/btc_bot/data:/var/lib/postgresql/data + - /volume1/btc_bot/backups:/backups + - ./timescaledb.conf:/etc/postgresql/postgresql.conf + - ./init-scripts:/docker-entrypoint-initdb.d + ports: + - "5433:5432" + command: postgres -c config_file=/etc/postgresql/postgresql.conf + restart: unless-stopped + deploy: + resources: + limits: + memory: 1.5G + reservations: + memory: 512M + healthcheck: + test: ["CMD-SHELL", "pg_isready -U btc_bot -d btc_data"] + interval: 10s + timeout: 5s + retries: 5 + + data_collector: + build: + context: .. 
+ dockerfile: docker/Dockerfile.collector + image: btc_collector + container_name: btc_collector + network_mode: host + environment: + - DB_HOST=localhost + - DB_PORT=5433 + - DB_NAME=btc_data + - DB_USER=btc_bot + - DB_PASSWORD=${DB_PASSWORD} + - LOG_LEVEL=INFO + volumes: + - /volume1/btc_bot/logs:/app/logs + - ../config:/app/config:ro + depends_on: + timescaledb: + condition: service_healthy + restart: unless-stopped + deploy: + resources: + limits: + memory: 256M + reservations: + memory: 128M + + api_server: + build: + context: .. + dockerfile: docker/Dockerfile.api + image: btc_api + container_name: btc_api + network_mode: host + environment: + - DB_HOST=localhost + - DB_PORT=5433 + - DB_NAME=btc_data + - DB_USER=btc_bot + - DB_PASSWORD=${DB_PASSWORD} + volumes: + - /volume1/btc_bot/exports:/app/exports + - ../config:/app/config:ro + depends_on: + - timescaledb + restart: unless-stopped + deploy: + resources: + limits: + memory: 512M \ No newline at end of file diff --git a/docker/init-scripts/01-schema.sql b/docker/init-scripts/01-schema.sql new file mode 100644 index 0000000..5a1f2c0 --- /dev/null +++ b/docker/init-scripts/01-schema.sql @@ -0,0 +1,139 @@ +-- 1. Enable TimescaleDB extension +CREATE EXTENSION IF NOT EXISTS timescaledb; + +-- 2. Create candles table (main data storage) +CREATE TABLE IF NOT EXISTS candles ( + time TIMESTAMPTZ NOT NULL, + symbol TEXT NOT NULL, + interval TEXT NOT NULL, + open DECIMAL(18,8) NOT NULL, + high DECIMAL(18,8) NOT NULL, + low DECIMAL(18,8) NOT NULL, + close DECIMAL(18,8) NOT NULL, + volume DECIMAL(18,8) NOT NULL, + validated BOOLEAN DEFAULT FALSE, + source TEXT DEFAULT 'hyperliquid', + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- 3. Convert to hypertable (partitioned by time) +SELECT create_hypertable('candles', 'time', + chunk_time_interval => INTERVAL '7 days', + if_not_exists => TRUE +); + +-- 4. 
Create unique constraint for upserts (required by ON CONFLICT) +ALTER TABLE candles + ADD CONSTRAINT candles_unique_candle + UNIQUE (time, symbol, interval); + +-- 5. Create indexes for efficient queries +CREATE INDEX IF NOT EXISTS idx_candles_symbol_time + ON candles (symbol, interval, time DESC); + +CREATE INDEX IF NOT EXISTS idx_candles_validated + ON candles (validated) WHERE validated = FALSE; + +-- 5. Create indicators table (computed values) +CREATE TABLE IF NOT EXISTS indicators ( + time TIMESTAMPTZ NOT NULL, + symbol TEXT NOT NULL, + interval TEXT NOT NULL, + indicator_name TEXT NOT NULL, + value DECIMAL(18,8) NOT NULL, + parameters JSONB, + computed_at TIMESTAMPTZ DEFAULT NOW() +); + +-- 6. Convert indicators to hypertable +SELECT create_hypertable('indicators', 'time', + chunk_time_interval => INTERVAL '7 days', + if_not_exists => TRUE +); + +-- 7. Create index for indicators +CREATE INDEX IF NOT EXISTS idx_indicators_lookup + ON indicators (symbol, interval, indicator_name, time DESC); + +-- 8. Create data quality log table +CREATE TABLE IF NOT EXISTS data_quality ( + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + check_type TEXT NOT NULL, + severity TEXT NOT NULL, + symbol TEXT, + details JSONB, + resolved BOOLEAN DEFAULT FALSE +); + +CREATE INDEX IF NOT EXISTS idx_quality_unresolved + ON data_quality (resolved) WHERE resolved = FALSE; + +CREATE INDEX IF NOT EXISTS idx_quality_time + ON data_quality (time DESC); + +-- 9. Create collector state tracking table +CREATE TABLE IF NOT EXISTS collector_state ( + id SERIAL PRIMARY KEY, + symbol TEXT NOT NULL UNIQUE, + last_candle_time TIMESTAMPTZ, + last_validation_time TIMESTAMPTZ, + total_candles BIGINT DEFAULT 0, + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- 10. Insert initial state for cbBTC +INSERT INTO collector_state (symbol, last_candle_time) +VALUES ('cbBTC', NULL) +ON CONFLICT (symbol) DO NOTHING; + +-- 11. 
Enable compression for old data (after 7 days) +ALTER TABLE candles SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'symbol,interval' +); + +ALTER TABLE indicators SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'symbol,interval,indicator_name' +); + +-- 12. Add compression policies +SELECT add_compression_policy('candles', INTERVAL '7 days', if_not_exists => TRUE); +SELECT add_compression_policy('indicators', INTERVAL '7 days', if_not_exists => TRUE); + +-- 13. Create function to update collector state +CREATE OR REPLACE FUNCTION update_collector_state() +RETURNS TRIGGER AS $$ +BEGIN + INSERT INTO collector_state (symbol, last_candle_time, total_candles) + VALUES (NEW.symbol, NEW.time, 1) + ON CONFLICT (symbol) + DO UPDATE SET + last_candle_time = NEW.time, + total_candles = collector_state.total_candles + 1, + updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- 14. Create trigger to auto-update state +DROP TRIGGER IF EXISTS trigger_update_state ON candles; +CREATE TRIGGER trigger_update_state + AFTER INSERT ON candles + FOR EACH ROW + EXECUTE FUNCTION update_collector_state(); + +-- 15. 
Create view for data health check +CREATE OR REPLACE VIEW data_health AS +SELECT + symbol, + COUNT(*) as total_candles, + COUNT(*) FILTER (WHERE validated) as validated_candles, + MAX(time) as latest_candle, + MIN(time) as earliest_candle, + NOW() - MAX(time) as time_since_last +FROM candles +GROUP BY symbol; + +-- Success message +SELECT 'Database schema initialized successfully' as status; \ No newline at end of file diff --git a/docker/init-scripts/02-optimization.sql b/docker/init-scripts/02-optimization.sql new file mode 100644 index 0000000..4f81995 --- /dev/null +++ b/docker/init-scripts/02-optimization.sql @@ -0,0 +1,43 @@ +-- Create a read-only user for API access (optional security) +DO $$ +BEGIN + IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'btc_api') THEN + CREATE USER btc_api WITH PASSWORD 'api_password_change_me'; + END IF; +END +$$; + +-- Grant read-only permissions +GRANT CONNECT ON DATABASE btc_data TO btc_api; +GRANT USAGE ON SCHEMA public TO btc_api; +GRANT SELECT ON ALL TABLES IN SCHEMA public TO btc_api; + +-- Grant sequence access for ID columns +GRANT USAGE ON ALL SEQUENCES IN SCHEMA public TO btc_api; + +-- Apply to future tables +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO btc_api; + +-- Create continuous aggregate for hourly stats (optional optimization) +CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_stats +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 hour', time) as bucket, + symbol, + interval, + FIRST(open, time) as first_open, + MAX(high) as max_high, + MIN(low) as min_low, + LAST(close, time) as last_close, + SUM(volume) as total_volume, + COUNT(*) as candle_count +FROM candles +GROUP BY bucket, symbol, interval; + +-- Add refresh policy for continuous aggregate +SELECT add_continuous_aggregate_policy('hourly_stats', + start_offset => INTERVAL '1 month', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour', + if_not_exists => TRUE +); \ No newline at end of file diff 
--git a/docker/timescaledb.conf b/docker/timescaledb.conf new file mode 100644 index 0000000..7e38535 --- /dev/null +++ b/docker/timescaledb.conf @@ -0,0 +1,40 @@ +# Optimized for Synology DS218+ (2GB RAM, dual-core CPU) + +# Required for TimescaleDB +shared_preload_libraries = 'timescaledb' + +# Memory settings +shared_buffers = 256MB +effective_cache_size = 768MB +work_mem = 16MB +maintenance_work_mem = 128MB + +# Connection settings +listen_addresses = '*' +max_connections = 50 +max_worker_processes = 2 +max_parallel_workers_per_gather = 1 +max_parallel_workers = 2 +max_parallel_maintenance_workers = 1 + +# Write performance +wal_buffers = 16MB +checkpoint_completion_target = 0.9 +random_page_cost = 1.1 +effective_io_concurrency = 200 + +# TimescaleDB settings +timescaledb.max_background_workers = 4 + +# Logging (use default pg_log directory inside PGDATA) +logging_collector = on +log_directory = 'log' +log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' +log_rotation_age = 1d +log_rotation_size = 100MB +log_min_messages = warning +log_min_error_statement = error + +# Auto-vacuum for hypertables +autovacuum_max_workers = 2 +autovacuum_naptime = 10s \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b7bd767 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,23 @@ +websockets>=11.0,<13.0 +asyncio-mqtt>=0.13 +python-dotenv>=1.0.0 +pyyaml>=6.0 +asyncpg>=0.29.0 +psycopg2-binary>=2.9.9 +SQLAlchemy>=2.0.0 +pandas>=2.0.0 +numpy>=1.24.0 +aiohttp>=3.8.0 +aiofiles>=23.0.0 +web3>=6.0.0 +fastapi>=0.104.0 +uvicorn>=0.24.0 +python-multipart>=0.0.6 +jinja2>=3.1.0 +python-json-logger>=2.0.7 +structlog>=23.0.0 +rich>=13.0.0 +pydantic>=2.0.0 +pydantic-settings>=2.0.0 +python-dateutil>=2.8.0 +schedule>=1.2.0 \ No newline at end of file diff --git a/scripts/backfill.sh b/scripts/backfill.sh new file mode 100644 index 0000000..151f6f2 --- /dev/null +++ b/scripts/backfill.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Backfill script for 
Hyperliquid historical data +# Usage: ./backfill.sh [coin] [days|max] [intervals...] +# Examples: +# ./backfill.sh BTC 7 "1m" # Last 7 days of 1m candles +# ./backfill.sh BTC max "1m 1h 1d" # Maximum available data for all intervals + +set -e + +COIN=${1:-BTC} +DAYS=${2:-7} +INTERVALS=${3:-"1m"} + +echo "=== Hyperliquid Historical Data Backfill ===" +echo "Coin: $COIN" +if [ "$DAYS" == "max" ]; then + echo "Mode: MAXIMUM (up to 5000 candles per interval)" +else + echo "Days: $DAYS" +fi +echo "Intervals: $INTERVALS" +echo "" + +# Change to project root +cd "$(dirname "$0")/.." + +# Run backfill inside Docker container +docker exec btc_collector python -m src.data_collector.backfill \ + --coin "$COIN" \ + --days "$DAYS" \ + --intervals $INTERVALS \ + --db-host localhost \ + --db-port 5433 + +echo "" +echo "=== Backfill Complete ===" diff --git a/scripts/backup.sh b/scripts/backup.sh new file mode 100644 index 0000000..fddef8d --- /dev/null +++ b/scripts/backup.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Backup script for Synology DS218+ +# Run via Task Scheduler every 6 hours + +BACKUP_DIR="/volume1/btc_bot/backups" +DB_NAME="btc_data" +DB_USER="btc_bot" +RETENTION_DAYS=30 +DATE=$(date +%Y%m%d_%H%M) + +echo "Starting backup at $(date)" + +# Create backup directory if not exists +mkdir -p $BACKUP_DIR + +# Create backup +docker exec btc_timescale pg_dump -U $DB_USER -Fc $DB_NAME > $BACKUP_DIR/btc_data_$DATE.dump + +# Compress +if [ -f "$BACKUP_DIR/btc_data_$DATE.dump" ]; then + gzip $BACKUP_DIR/btc_data_$DATE.dump + echo "Backup created: btc_data_$DATE.dump.gz" + + # Calculate size + SIZE=$(du -h $BACKUP_DIR/btc_data_$DATE.dump.gz | cut -f1) + echo "Backup size: $SIZE" +else + echo "Error: Backup file not created" + exit 1 +fi + +# Delete old backups +DELETED=$(find $BACKUP_DIR -name "*.dump.gz" -mtime +$RETENTION_DAYS | wc -l) +find $BACKUP_DIR -name "*.dump.gz" -mtime +$RETENTION_DAYS -delete + +echo "Deleted $DELETED old backup(s)" +echo "Backup completed at $(date)" \ No 
newline at end of file diff --git a/scripts/deploy.sh b/scripts/deploy.sh new file mode 100644 index 0000000..a49477a --- /dev/null +++ b/scripts/deploy.sh @@ -0,0 +1,59 @@ +#!/bin/bash +# Deployment script for Synology DS218+ + +set -e + +echo "=== BTC Bot Data Collector Deployment ===" +echo "" + +# Check if running on Synology +if [ ! -d "/volume1" ]; then + echo "Warning: This script is designed for Synology NAS" + echo "Continuing anyway..." +fi + +# Create directories +echo "Creating directories..." +mkdir -p /volume1/btc_bot/{data,backups,logs,exports} + +# Check if Docker is installed +if ! command -v docker &> /dev/null; then + echo "Error: Docker not found. Please install Docker package from Synology Package Center" + exit 1 +fi + +# Copy configuration +echo "Setting up configuration..." +if [ ! -f "/volume1/btc_bot/.env" ]; then + cp .env.example /volume1/btc_bot/.env + echo "Created .env file. Please edit /volume1/btc_bot/.env with your settings" +fi + +# Build and start services +echo "Building and starting services..." +cd docker +docker-compose pull +docker-compose build --no-cache +docker-compose up -d + +# Wait for database +echo "Waiting for database to be ready..." +sleep 10 + +# Check status +echo "" +echo "=== Status ===" +docker-compose ps + +echo "" +echo "=== Logs (last 20 lines) ===" +docker-compose logs --tail=20 + +echo "" +echo "=== Deployment Complete ===" +echo "Database available at: localhost:5432" +echo "API available at: http://localhost:8000" +echo "" +echo "To view logs: docker-compose logs -f" +echo "To stop: docker-compose down" +echo "To backup: ./scripts/backup.sh" \ No newline at end of file diff --git a/scripts/health_check.sh b/scripts/health_check.sh new file mode 100644 index 0000000..b61d503 --- /dev/null +++ b/scripts/health_check.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# Health check script for cron/scheduler + +# Check if containers are running +if ! 
docker ps | grep -q "btc_timescale"; then + echo "ERROR: TimescaleDB container not running" + # Send notification (if configured) + exit 1 +fi + +if ! docker ps | grep -q "btc_collector"; then + echo "ERROR: Data collector container not running" + exit 1 +fi + +# Check database connectivity +docker exec btc_timescale pg_isready -U btc_bot -d btc_data > /dev/null 2>&1 +if [ $? -ne 0 ]; then + echo "ERROR: Cannot connect to database" + exit 1 +fi + +# Check if recent data exists +LATEST=$(docker exec btc_timescale psql -U btc_bot -d btc_data -t -c "SELECT MAX(time) FROM candles WHERE time > NOW() - INTERVAL '5 minutes';" 2>/dev/null) +if [ -z "$LATEST" ]; then + echo "WARNING: No recent data in database" + exit 1 +fi + +echo "OK: All systems operational" +exit 0 \ No newline at end of file diff --git a/scripts/verify_files.sh b/scripts/verify_files.sh new file mode 100644 index 0000000..5e906a9 --- /dev/null +++ b/scripts/verify_files.sh @@ -0,0 +1,33 @@ +#!/bin/bash +# BTC Bot Dashboard Setup Script +# Run this from ~/btc_bot to verify all files exist + +echo "=== BTC Bot File Verification ===" +echo "" + +FILES=( + "src/api/server.py" + "src/api/websocket_manager.py" + "src/api/dashboard/static/index.html" + "docker/Dockerfile.api" + "docker/Dockerfile.collector" +) + +for file in "${FILES[@]}"; do + if [ -f "$file" ]; then + size=$(stat -f%z "$file" 2>/dev/null || stat -c%s "$file" 2>/dev/null || echo "unknown") + echo "✓ $file (${size} bytes)" + else + echo "✗ $file (MISSING)" + fi +done + +echo "" +echo "=== Next Steps ===" +echo "1. If all files exist, rebuild:" +echo " cd ~/btc_bot" +echo " docker build --network host --no-cache -f docker/Dockerfile.api -t btc_api ." +echo " cd docker && docker-compose up -d" +echo "" +echo "2. 
Check logs:" +echo " docker logs btc_api --tail 20" diff --git a/src/api/dashboard/static/index.html b/src/api/dashboard/static/index.html new file mode 100644 index 0000000..021cd8f --- /dev/null +++ b/src/api/dashboard/static/index.html @@ -0,0 +1,842 @@ + + + + + + BTC Trading Dashboard + + + + +
+
+ BTC/USD + +
+ +
+
+ +
+
+ Live +
+
+ +
+
+ Price + -- +
+
+ Change + -- +
+
+ High + -- +
+
+ Low + -- +
+
+ +
+
+
+
+ +
+
+
+ Technical Analysis + 1D +
+
+ -- + + +
+
+
+
Loading technical analysis...
+
+
+
+ + + + diff --git a/src/api/server.py b/src/api/server.py new file mode 100644 index 0000000..8723601 --- /dev/null +++ b/src/api/server.py @@ -0,0 +1,376 @@ +""" +Simplified FastAPI server - working version +Removes the complex WebSocket manager that was causing issues +""" + +import os +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Optional +from contextlib import asynccontextmanager + +from fastapi import FastAPI, HTTPException, Query +from fastapi.staticfiles import StaticFiles +from fastapi.responses import StreamingResponse +from fastapi.middleware.cors import CORSMiddleware +import asyncpg +import csv +import io + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Database connection settings +DB_HOST = os.getenv('DB_HOST', 'localhost') +DB_PORT = int(os.getenv('DB_PORT', 5432)) +DB_NAME = os.getenv('DB_NAME', 'btc_data') +DB_USER = os.getenv('DB_USER', 'btc_bot') +DB_PASSWORD = os.getenv('DB_PASSWORD', '') + + +async def get_db_pool(): + """Create database connection pool""" + return await asyncpg.create_pool( + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + user=DB_USER, + password=DB_PASSWORD, + min_size=1, + max_size=10 + ) + + +pool = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage application lifespan""" + global pool + pool = await get_db_pool() + logger.info("API Server started successfully") + yield + if pool: + await pool.close() + logger.info("API Server stopped") + + +app = FastAPI( + title="BTC Bot Data API", + description="REST API for accessing BTC candle data", + version="1.1.0", + lifespan=lifespan +) + +# Enable CORS +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/") +async def root(): + """Root endpoint""" + return { + "message": "BTC Bot Data API", + "docs": "/docs", + "dashboard": "/dashboard", + "status": 
"operational" + } + + +@app.get("/api/v1/candles") +async def get_candles( + symbol: str = Query("BTC", description="Trading pair symbol"), + interval: str = Query("1m", description="Candle interval"), + start: Optional[datetime] = Query(None, description="Start time (ISO format)"), + end: Optional[datetime] = Query(None, description="End time (ISO format)"), + limit: int = Query(1000, ge=1, le=10000, description="Maximum number of candles") +): + """Get candle data for a symbol""" + async with pool.acquire() as conn: + query = """ + SELECT time, symbol, interval, open, high, low, close, volume, validated + FROM candles + WHERE symbol = $1 AND interval = $2 + """ + params = [symbol, interval] + + if start: + query += f" AND time >= ${len(params) + 1}" + params.append(start) + + if end: + query += f" AND time <= ${len(params) + 1}" + params.append(end) + + query += f" ORDER BY time DESC LIMIT ${len(params) + 1}" + params.append(limit) + + rows = await conn.fetch(query, *params) + + return { + "symbol": symbol, + "interval": interval, + "count": len(rows), + "candles": [dict(row) for row in rows] + } + + +@app.get("/api/v1/candles/latest") +async def get_latest_candle(symbol: str = "BTC", interval: str = "1m"): + """Get the most recent candle""" + async with pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT time, symbol, interval, open, high, low, close, volume + FROM candles + WHERE symbol = $1 AND interval = $2 + ORDER BY time DESC + LIMIT 1 + """, symbol, interval) + + if not row: + raise HTTPException(status_code=404, detail="No data found") + + return dict(row) + + +@app.get("/api/v1/stats") +async def get_stats(symbol: str = "BTC"): + """Get trading statistics""" + async with pool.acquire() as conn: + # Get latest price and 24h stats + latest = await conn.fetchrow(""" + SELECT close, time + FROM candles + WHERE symbol = $1 AND interval = '1m' + ORDER BY time DESC + LIMIT 1 + """, symbol) + + day_ago = await conn.fetchrow(""" + SELECT close + FROM 
@app.get("/api/v1/health")
async def health_check():
    """
    System health check.

    Verifies database connectivity and reports per-symbol candle activity
    over the last 24 hours. Returns HTTP 503 if the database is unreachable.
    """
    try:
        async with pool.acquire() as conn:
            # BUGFIX: use `fetch`, not `fetchrow` — the GROUP BY yields one row
            # per symbol and fetchrow silently dropped all but an arbitrary one.
            # This also matches DatabaseManager.get_health_stats, which runs
            # the identical query with fetch.
            latest = await conn.fetch("""
                SELECT symbol, MAX(time) as last_time, COUNT(*) as count
                FROM candles
                WHERE time > NOW() - INTERVAL '24 hours'
                GROUP BY symbol
            """)

        return {
            "status": "healthy",
            "database": "connected",
            # One entry per symbol with data in the last 24h; None when empty.
            "latest_candles": [dict(row) for row in latest] if latest else None,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail=f"Health check failed: {str(e)}")


@app.get("/api/v1/ta")
async def get_technical_analysis(
    symbol: str = Query("BTC", description="Trading pair symbol"),
    interval: str = Query("1d", description="Candle interval")
):
    """
    Get technical analysis for a symbol.

    Computes MA 44 and MA 125, a trend direction/strength classification,
    and recent support/resistance levels from the most recent 200 candles.
    Returns an error payload (HTTP 200) when fewer than 50 candles exist.
    """
    try:
        async with pool.acquire() as conn:
            # 200 rows comfortably covers the 125-period moving average.
            rows = await conn.fetch("""
                SELECT time, open, high, low, close, volume
                FROM candles
                WHERE symbol = $1 AND interval = $2
                ORDER BY time DESC
                LIMIT 200
            """, symbol, interval)

        if len(rows) < 50:
            return {
                "symbol": symbol,
                "interval": interval,
                "error": "Not enough data for technical analysis",
                "min_required": 50,
                "available": len(rows)
            }

        # Rows come back newest-first; reverse to chronological order.
        candles = list(reversed(rows))
        closes = [float(c['close']) for c in candles]

        def calculate_ma(data, period):
            # Simple moving average over the trailing `period` values;
            # None when there is not enough history.
            if len(data) < period:
                return None
            return sum(data[-period:]) / period

        ma_44 = calculate_ma(closes, 44)
        ma_125 = calculate_ma(closes, 125)

        current_price = closes[-1]

        # Trend: price above both MAs with fast above slow = bullish;
        # the mirror image = bearish; anything else = consolidation.
        if ma_44 and ma_125:
            if current_price > ma_44 > ma_125:
                trend = "Bullish"
                trend_strength = "Strong" if current_price > ma_44 * 1.05 else "Moderate"
            elif current_price < ma_44 < ma_125:
                trend = "Bearish"
                trend_strength = "Strong" if current_price < ma_44 * 0.95 else "Moderate"
            else:
                trend = "Neutral"
                trend_strength = "Consolidation"
        else:
            trend = "Unknown"
            trend_strength = "Insufficient data"

        # Support/resistance from the swing extremes of the last 20 candles.
        highs = [float(c['high']) for c in candles[-20:]]
        lows = [float(c['low']) for c in candles[-20:]]

        resistance = max(highs)
        support = min(lows)

        # Position of the current price inside the support/resistance band,
        # expressed as 0-100 (defaults to 50 for a degenerate zero-width band).
        price_range = resistance - support
        if price_range > 0:
            position = (current_price - support) / price_range * 100
        else:
            position = 50

        return {
            "symbol": symbol,
            "interval": interval,
            "timestamp": datetime.utcnow().isoformat(),
            "current_price": round(current_price, 2),
            "moving_averages": {
                "ma_44": round(ma_44, 2) if ma_44 else None,
                "ma_125": round(ma_125, 2) if ma_125 else None,
                "price_vs_ma44": round((current_price / ma_44 - 1) * 100, 2) if ma_44 else None,
                "price_vs_ma125": round((current_price / ma_125 - 1) * 100, 2) if ma_125 else None
            },
            "trend": {
                "direction": trend,
                "strength": trend_strength,
                "signal": "Buy" if trend == "Bullish" and trend_strength == "Strong" else
                          "Sell" if trend == "Bearish" and trend_strength == "Strong" else "Hold"
            },
            "levels": {
                "resistance": round(resistance, 2),
                "support": round(support, 2),
                "position_in_range": round(position, 1)
            },
            "ai_placeholder": {
                "available": False,
                "message": "AI analysis available via Gemini or local LLM",
                "action": "Click to analyze with AI"
            }
        }

    except Exception as e:
        logger.error(f"Technical analysis error: {e}")
        raise HTTPException(status_code=500, detail=f"Technical analysis failed: {str(e)}")


@app.get("/api/v1/export/csv")
async def export_csv(
    symbol: str = "BTC",
    interval: str = "1m",
    days: int = Query(7, ge=1, le=365, description="Number of days to export")
):
    """
    Export candle data as a streamed CSV download.

    Raises HTTP 404 when no rows exist in the requested window.
    """
    # NOTE(review): `utcnow()` is naive; assumes the `time` column treats naive
    # timestamps as UTC — confirm against the schema/session timezone.
    start_date = datetime.utcnow() - timedelta(days=days)

    async with pool.acquire() as conn:
        query = """
            SELECT time, open, high, low, close, volume
            FROM candles
            WHERE symbol = $1 AND interval = $2 AND time >= $3
            ORDER BY time
        """
        rows = await conn.fetch(query, symbol, interval, start_date)

    if not rows:
        raise HTTPException(status_code=404, detail="No data found for export")

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(['timestamp', 'open', 'high', 'low', 'close', 'volume'])

    for row in rows:
        writer.writerow([
            row['time'].isoformat(),
            row['open'],
            row['high'],
            row['low'],
            row['close'],
            row['volume']
        ])

    return StreamingResponse(
        io.BytesIO(output.getvalue().encode()),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename={symbol}_{interval}_{days}d.csv"
        }
    )


# Serve static files for dashboard
app.mount("/dashboard", StaticFiles(directory="src/api/dashboard/static", html=True),
          name="dashboard")
class HyperliquidBackfill:
    """
    Backfills historical candle data from the Hyperliquid REST API.

    API limitations:
    - Max 5000 candles per coin/interval combination
    - 500 candles per response (requires pagination)
    - Available intervals: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M
    """

    API_URL = "https://api.hyperliquid.xyz/info"
    MAX_CANDLES_PER_REQUEST = 500
    MAX_TOTAL_CANDLES = 5000

    # Standard timeframes supported by Hyperliquid
    INTERVALS = [
        "1m", "3m", "5m", "15m", "30m",
        "1h", "2h", "4h", "8h", "12h",
        "1d", "3d", "1w", "1M"
    ]

    def __init__(
        self,
        db: DatabaseManager,
        coin: str = "BTC",
        intervals: Optional[List[str]] = None
    ):
        self.db = db
        self.coin = coin
        self.intervals = intervals or ["1m"]  # Default to 1m
        # Lazily created in __aenter__; required by _fetch_batch.
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Async context manager entry: open the HTTP session."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the HTTP session."""
        if self.session:
            await self.session.close()

    async def fetch_candles(
        self,
        interval: str,
        start_time: datetime,
        end_time: Optional[datetime] = None
    ) -> List[Candle]:
        """
        Fetch candles for a specific interval with pagination.

        Args:
            interval: Candle interval (e.g., "1m", "1h", "1d")
            start_time: Start time (inclusive)
            end_time: End time (inclusive, defaults to now)

        Returns:
            List of Candle objects (at most MAX_TOTAL_CANDLES, in API order)

        Raises:
            ValueError: if `interval` is not one of INTERVALS
        """
        if interval not in self.INTERVALS:
            raise ValueError(f"Invalid interval: {interval}. Must be one of {self.INTERVALS}")

        end_time = end_time or datetime.now(timezone.utc)

        # The API works in epoch milliseconds.
        start_ms = int(start_time.timestamp() * 1000)
        end_ms = int(end_time.timestamp() * 1000)

        all_candles = []
        total_fetched = 0

        while total_fetched < self.MAX_TOTAL_CANDLES:
            logger.info(f"Fetching {interval} candles from {datetime.fromtimestamp(start_ms/1000, tz=timezone.utc)} "
                        f"(batch {total_fetched//self.MAX_CANDLES_PER_REQUEST + 1})")

            try:
                batch = await self._fetch_batch(interval, start_ms, end_ms)

                if not batch:
                    logger.info(f"No more {interval} candles available")
                    break

                all_candles.extend(batch)
                total_fetched += len(batch)

                logger.info(f"Fetched {len(batch)} {interval} candles (total: {total_fetched})")

                # A short batch means the API has no further data in range.
                if len(batch) < self.MAX_CANDLES_PER_REQUEST:
                    break

                # Advance the cursor past the last candle to avoid re-fetching it.
                last_candle = batch[-1]
                start_ms = int(last_candle.time.timestamp() * 1000) + 1

                # Small delay to avoid rate limiting
                await asyncio.sleep(0.1)

            except Exception as e:
                logger.error(f"Error fetching {interval} candles: {e}")
                break

        logger.info(f"Backfill complete for {interval}: {len(all_candles)} candles total")
        return all_candles

    async def _fetch_batch(
        self,
        interval: str,
        start_ms: int,
        end_ms: int
    ) -> List[Candle]:
        """Fetch a single batch of candles from the API (up to 500)."""
        if not self.session:
            raise RuntimeError("Session not initialized. Use async context manager.")

        payload = {
            "type": "candleSnapshot",
            "req": {
                "coin": self.coin,
                "interval": interval,
                "startTime": start_ms,
                "endTime": end_ms
            }
        }

        async with self.session.post(self.API_URL, json=payload) as response:
            if response.status != 200:
                text = await response.text()
                raise Exception(f"API error {response.status}: {text}")

            data = await response.json()

        if not isinstance(data, list):
            logger.warning(f"Unexpected response format: {data}")
            return []

        candles = []
        for item in data:
            try:
                candle = self._parse_candle_item(item, interval)
                if candle:
                    candles.append(candle)
            except Exception as e:
                # Skip malformed entries rather than aborting the batch.
                logger.warning(f"Failed to parse candle: {item}, error: {e}")

        return candles

    def _parse_candle_item(self, data: Dict[str, Any], interval: str) -> Optional[Candle]:
        """Parse a single candle item from an API response; None on bad data."""
        try:
            # Format: {"t": 1770812400000, "T": ..., "s": "BTC", "i": "1m", "o": "67164.0", ...}
            timestamp_ms = int(data.get("t", 0))
            timestamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)

            return Candle(
                time=timestamp,
                symbol=self.coin,
                interval=interval,
                open=float(data.get("o", 0)),
                high=float(data.get("h", 0)),
                low=float(data.get("l", 0)),
                close=float(data.get("c", 0)),
                volume=float(data.get("v", 0))
            )
        except (KeyError, ValueError, TypeError) as e:
            logger.error(f"Failed to parse candle data: {e}, data: {data}")
            return None

    async def backfill_interval(
        self,
        interval: str,
        days_back: int = 7
    ) -> int:
        """
        Backfill a specific interval for the last N days.

        Args:
            interval: Candle interval
            days_back: Number of days to backfill (use 0 for max available)

        Returns:
            Number of candles inserted
        """
        if days_back == 0:
            # 0 is the sentinel for "fetch everything the API will give us".
            return await self.backfill_max(interval)

        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=days_back)

        logger.info(f"Starting backfill for {interval}: {start_time} to {end_time}")

        candles = await self.fetch_candles(interval, start_time, end_time)

        if not candles:
            logger.warning(f"No candles fetched for {interval}")
            return 0

        inserted = await self.db.insert_candles(candles)
        logger.info(f"Inserted {inserted} candles for {interval}")

        return inserted

    async def backfill_max(self, interval: str) -> int:
        """
        Backfill maximum available data (up to 5000 candles) for an interval.

        Args:
            interval: Candle interval

        Returns:
            Number of candles inserted
        """
        logger.info(f"Fetching maximum available {interval} data (up to 5000 candles)")

        # Start from 2020 to capture everything; Hyperliquid launched around
        # 2023, so this predates all available data.
        start_time = datetime(2020, 1, 1, tzinfo=timezone.utc)
        end_time = datetime.now(timezone.utc)

        logger.info(f"Fetching {interval} candles from {start_time} to {end_time}")

        candles = await self.fetch_candles(interval, start_time, end_time)

        if not candles:
            logger.warning(f"No candles fetched for {interval}")
            return 0

        inserted = await self.db.insert_candles(candles)
        logger.info(f"Inserted {inserted} candles for {interval} (max available)")

        return inserted

    def _interval_to_minutes(self, interval: str) -> int:
        """Convert an interval string to minutes (unknown intervals -> 1)."""
        mapping = {
            "1m": 1, "3m": 3, "5m": 5, "15m": 15, "30m": 30,
            "1h": 60, "2h": 120, "4h": 240, "8h": 480, "12h": 720,
            "1d": 1440, "3d": 4320, "1w": 10080, "1M": 43200
        }
        return mapping.get(interval, 1)

    async def backfill_all_intervals(
        self,
        days_back: int = 7
    ) -> Dict[str, int]:
        """
        Backfill all configured intervals.

        A failure in one interval is logged and recorded as 0 so the
        remaining intervals still run.

        Args:
            days_back: Number of days to backfill

        Returns:
            Dictionary mapping interval to count inserted
        """
        results = {}

        for interval in self.intervals:
            try:
                count = await self.backfill_interval(interval, days_back)
                results[interval] = count
            except Exception as e:
                logger.error(f"Failed to backfill {interval}: {e}")
                results[interval] = 0

        return results

    async def get_earliest_candle_time(self, interval: str) -> Optional[datetime]:
        """
        Get the earliest candle time available for an interval.

        NOTE: expensive — fetches up to 5000 candles just to inspect
        their timestamps.
        """
        start_time = datetime(2020, 1, 1, tzinfo=timezone.utc)
        end_time = datetime.now(timezone.utc)

        candles = await self.fetch_candles(interval, start_time, end_time)

        if candles:
            earliest = min(c.time for c in candles)
            logger.info(f"Earliest {interval} candle available: {earliest}")
            return earliest
        return None


async def main():
    """CLI entry point for backfill"""
    import argparse
    import os

    # BUGFIX: configure logging FIRST — previously basicConfig ran after the
    # "Backfill mode" logger.info calls below, so those messages were lost.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    parser = argparse.ArgumentParser(description="Backfill Hyperliquid historical data")
    parser.add_argument("--coin", default="BTC", help="Coin symbol (default: BTC)")
    parser.add_argument("--intervals", nargs="+", default=["1m"],
                        help="Intervals to backfill (default: 1m)")
    parser.add_argument("--days", type=str, default="7",
                        help="Days to backfill (default: 7, use 'max' for maximum available)")
    parser.add_argument("--db-host", default=os.getenv("DB_HOST", "localhost"),
                        help="Database host (default: localhost or DB_HOST env)")
    parser.add_argument("--db-port", type=int, default=int(os.getenv("DB_PORT", 5432)),
                        help="Database port (default: 5432 or DB_PORT env)")
    parser.add_argument("--db-name", default=os.getenv("DB_NAME", "btc_data"),
                        help="Database name (default: btc_data or DB_NAME env)")
    parser.add_argument("--db-user", default=os.getenv("DB_USER", "btc_bot"),
                        help="Database user (default: btc_bot or DB_USER env)")
    parser.add_argument("--db-password", default=os.getenv("DB_PASSWORD", ""),
                        help="Database password (default: from DB_PASSWORD env)")

    args = parser.parse_args()

    # Parse days argument; "max" maps to the 0 sentinel used by backfill_interval.
    if args.days.lower() == "max":
        days_back = 0
        logger.info("Backfill mode: MAX (fetching up to 5000 candles per interval)")
    else:
        days_back = int(args.days)
        logger.info(f"Backfill mode: Last {days_back} days")

    # Initialize database
    db = DatabaseManager(
        host=args.db_host,
        port=args.db_port,
        database=args.db_name,
        user=args.db_user,
        password=args.db_password
    )

    await db.connect()

    try:
        async with HyperliquidBackfill(db, args.coin, args.intervals) as backfill:
            results = await backfill.backfill_all_intervals(days_back)

            print("\n=== Backfill Summary ===")
            for interval, count in results.items():
                print(f"{interval}: {count} candles")
            print(f"Total: {sum(results.values())} candles")

    finally:
        await db.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
@dataclass
class BufferStats:
    """Statistics for buffer performance monitoring"""
    # Counters are cumulative over the buffer's lifetime.
    total_added: int = 0
    total_flushed: int = 0
    total_dropped: int = 0
    last_flush_time: Optional[datetime] = None
    avg_batch_size: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the stats for logging / health endpoints."""
        return {
            'total_added': self.total_added,
            'total_flushed': self.total_flushed,
            'total_dropped': self.total_dropped,
            'last_flush_time': self.last_flush_time.isoformat() if self.last_flush_time else None,
            'avg_batch_size': round(self.avg_batch_size, 2)
        }


class CandleBuffer:
    """
    Asyncio-safe circular buffer for candle data.

    Automatically flushes to the configured callback in batches, either when
    `batch_size` candles accumulate or every `flush_interval_seconds`.
    """

    def __init__(
        self,
        max_size: int = 1000,
        flush_interval_seconds: float = 30.0,
        batch_size: int = 100,
        on_flush_callback: Optional[Callable[[List[Candle]], Awaitable[None]]] = None
    ):
        self.max_size = max_size
        self.flush_interval = flush_interval_seconds
        self.batch_size = batch_size
        self.on_flush = on_flush_callback

        # deque(maxlen=...) evicts from the opposite end when full, giving the
        # "circular" behavior; access is serialized with an asyncio.Lock.
        self._buffer: deque = deque(maxlen=max_size)
        self._lock = asyncio.Lock()
        self._flush_event = asyncio.Event()
        self._stop_event = asyncio.Event()

        self.stats = BufferStats()
        self._batch_sizes: deque = deque(maxlen=100)  # For averaging

        # Background flush task handle
        self._flush_task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        """Start the background flush task"""
        self._flush_task = asyncio.create_task(self._flush_loop())
        logger.info(f"CandleBuffer started (max_size={self.max_size}, flush_interval={self.flush_interval}s)")

    async def stop(self) -> None:
        """Stop the buffer and flush remaining data"""
        self._stop_event.set()
        self._flush_event.set()  # Wake up flush loop

        if self._flush_task:
            try:
                # wait_for cancels the task for us if the timeout elapses.
                await asyncio.wait_for(self._flush_task, timeout=10.0)
            except asyncio.TimeoutError:
                logger.warning("Flush task did not stop in time")

        # Final flush
        await self.flush()
        logger.info("CandleBuffer stopped")

    async def add(self, candle: Candle) -> bool:
        """
        Add a candle to the buffer.

        BUGFIX(doc): always returns True. The previous docstring claimed False
        was returned when the buffer was full, but the maxlen deque evicts the
        OLDEST candle to make room and the new candle is always appended; the
        eviction is counted in stats.total_dropped instead.
        """
        async with self._lock:
            if len(self._buffer) >= self.max_size:
                logger.warning(f"Buffer full, dropping oldest candle. Size: {len(self._buffer)}")
                self.stats.total_dropped += 1

            self._buffer.append(candle)
            self.stats.total_added += 1

            # Trigger immediate flush if batch size reached
            if len(self._buffer) >= self.batch_size:
                self._flush_event.set()

            return True

    async def add_many(self, candles: List[Candle]) -> int:
        """Add multiple candles to the buffer; returns how many were accepted."""
        added = 0
        for candle in candles:
            if await self.add(candle):
                added += 1
        return added

    async def get_batch(self, n: Optional[int] = None) -> List[Candle]:
        """Get up to N candles from buffer (without removing)"""
        async with self._lock:
            n = n or len(self._buffer)
            return list(self._buffer)[:n]

    async def flush(self) -> int:
        """
        Manually flush buffer to callback.

        Returns number of candles flushed. On callback failure the candles
        are re-queued at the front so no data is lost.
        """
        candles_to_flush: List[Candle] = []

        async with self._lock:
            if not self._buffer:
                return 0

            candles_to_flush = list(self._buffer)
            self._buffer.clear()

        if candles_to_flush and self.on_flush:
            try:
                await self.on_flush(candles_to_flush)

                # Update stats
                self.stats.total_flushed += len(candles_to_flush)
                self.stats.last_flush_time = datetime.now(timezone.utc)
                self._batch_sizes.append(len(candles_to_flush))
                self.stats.avg_batch_size = sum(self._batch_sizes) / len(self._batch_sizes)

                logger.debug(f"Flushed {len(candles_to_flush)} candles")
                return len(candles_to_flush)

            except Exception as e:
                logger.error(f"Flush callback failed: {e}")
                # Put candles back in buffer.
                # NOTE(review): if new candles arrived during the failed flush,
                # appendleft on a full maxlen deque evicts from the RIGHT
                # (newest) end without updating total_dropped — confirm this
                # trade-off is acceptable.
                async with self._lock:
                    for candle in reversed(candles_to_flush):
                        self._buffer.appendleft(candle)
                return 0
        elif candles_to_flush:
            # No callback configured: data is discarded but still counted.
            self.stats.total_flushed += len(candles_to_flush)
            return len(candles_to_flush)

        return 0

    async def _flush_loop(self) -> None:
        """Background task to periodically flush buffer"""
        while not self._stop_event.is_set():
            try:
                # Wait for flush interval or until triggered by add()
                await asyncio.wait_for(
                    self._flush_event.wait(),
                    timeout=self.flush_interval
                )
                self._flush_event.clear()

                # Flush if we have data
                buffer_size = await self.get_buffer_size()
                if buffer_size > 0:
                    await self.flush()

            except asyncio.TimeoutError:
                # Flush interval reached, flush if we have data
                buffer_size = await self.get_buffer_size()
                if buffer_size > 0:
                    await self.flush()

            except Exception as e:
                logger.error(f"Error in flush loop: {e}")
                await asyncio.sleep(1)

    def get_stats(self) -> BufferStats:
        """Get current buffer statistics"""
        return self.stats

    async def get_buffer_size(self) -> int:
        """Get current buffer size"""
        async with self._lock:
            return len(self._buffer)

    def detect_gaps(self, candles: List[Candle]) -> List[Dict[str, Any]]:
        """
        Detect gaps in candle sequence.

        NOTE(review): assumes 1-minute candles (expected_diff = 60s) — confirm
        callers only pass 1m data, or thread the interval through.
        """
        if len(candles) < 2:
            return []

        gaps = []
        sorted_candles = sorted(candles, key=lambda c: c.time)

        for i in range(1, len(sorted_candles)):
            prev = sorted_candles[i-1]
            curr = sorted_candles[i]

            # Calculate expected interval (1 minute)
            expected_diff = 60  # seconds
            actual_diff = (curr.time - prev.time).total_seconds()

            if actual_diff > expected_diff * 1.5:  # Allow 50% tolerance
                gaps.append({
                    'from_time': prev.time.isoformat(),
                    'to_time': curr.time.isoformat(),
                    'missing_candles': int(actual_diff / expected_diff) - 1,
                    'duration_seconds': actual_diff
                })

        return gaps
class DatabaseManager:
    """Manages TimescaleDB connections and operations"""

    # Interval string -> candle duration in seconds (used by detect_gaps).
    # Matches the timeframes supported by the collector.
    _INTERVAL_SECONDS = {
        "1m": 60, "3m": 180, "5m": 300, "15m": 900, "30m": 1800,
        "1h": 3600, "2h": 7200, "4h": 14400, "8h": 28800, "12h": 43200,
        "1d": 86400, "3d": 259200, "1w": 604800, "1M": 2592000,
    }

    def __init__(
        self,
        host: str = None,
        port: int = None,
        database: str = None,
        user: str = None,
        password: str = None,
        pool_size: int = 5
    ):
        # Explicit arguments win; environment variables are the fallback.
        self.host = host or os.getenv('DB_HOST', 'localhost')
        self.port = port or int(os.getenv('DB_PORT', 5432))
        self.database = database or os.getenv('DB_NAME', 'btc_data')
        self.user = user or os.getenv('DB_USER', 'btc_bot')
        self.password = password or os.getenv('DB_PASSWORD', '')
        self.pool_size = pool_size

        self.pool: Optional[Pool] = None

    async def connect(self) -> None:
        """Initialize connection pool and verify connectivity."""
        try:
            self.pool = await asyncpg.create_pool(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password,
                min_size=1,
                max_size=self.pool_size,
                command_timeout=60
            )

            # Test connection
            async with self.pool.acquire() as conn:
                version = await conn.fetchval('SELECT version()')
                logger.info(f"Connected to database: {version[:50]}...")

            logger.info(f"Database pool created (size: {self.pool_size})")

        except Exception as e:
            logger.error(f"Failed to connect to database: {type(e).__name__}: {e!r}")
            raise

    async def disconnect(self) -> None:
        """Close connection pool"""
        if self.pool:
            await self.pool.close()
            logger.info("Database pool closed")

    @asynccontextmanager
    async def acquire(self):
        """Context manager for acquiring a pooled connection."""
        if not self.pool:
            raise RuntimeError("Database not connected")
        async with self.pool.acquire() as conn:
            yield conn

    async def insert_candles(self, candles: List[Candle]) -> int:
        """
        Batch insert candles into database (upsert on time/symbol/interval).

        Returns the number of candles submitted; executemany does not report
        per-row insert-vs-update counts.
        """
        if not candles:
            return 0

        # Prepare values for batch insert
        values = [
            (
                c.time,
                c.symbol,
                c.interval,
                c.open,
                c.high,
                c.low,
                c.close,
                c.volume,
                False,          # validated
                'hyperliquid'   # source
            )
            for c in candles
        ]

        async with self.acquire() as conn:
            await conn.executemany('''
                INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated, source)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (time, symbol, interval)
                DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close,
                    volume = EXCLUDED.volume,
                    source = EXCLUDED.source
            ''', values)

        inserted = len(candles)
        logger.debug(f"Inserted/updated {inserted} candles")
        return inserted

    async def get_candles(
        self,
        symbol: str,
        interval: str,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """Query candles (newest first), optionally bounded by start/end."""
        query = '''
            SELECT time, symbol, interval, open, high, low, close, volume, validated
            FROM candles
            WHERE symbol = $1 AND interval = $2
        '''
        params = [symbol, interval]

        # Placeholders are numbered dynamically because start/end are optional.
        if start:
            query += ' AND time >= $3'
            params.append(start)

        if end:
            query += f' AND time <= ${len(params) + 1}'
            params.append(end)

        query += f' ORDER BY time DESC LIMIT ${len(params) + 1}'
        params.append(limit)

        async with self.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [dict(row) for row in rows]

    async def get_latest_candle(self, symbol: str, interval: str) -> Optional[Dict[str, Any]]:
        """Get the most recent candle for a symbol, or None if none exist."""
        async with self.acquire() as conn:
            row = await conn.fetchrow('''
                SELECT time, symbol, interval, open, high, low, close, volume
                FROM candles
                WHERE symbol = $1 AND interval = $2
                ORDER BY time DESC
                LIMIT 1
            ''', symbol, interval)

        return dict(row) if row else None

    async def detect_gaps(
        self,
        symbol: str,
        interval: str,
        since: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """
        Detect missing candles in the database using window functions.

        BUGFIX/generalization: the gap math previously hard-coded 1-minute
        candles ('/ 60' and a fixed 2-minute threshold) regardless of the
        `interval` argument. The candle duration is now derived from
        `interval`; behavior for '1m' is unchanged (threshold = 2 candle
        widths), and unknown intervals fall back to 60 seconds as before.
        """
        # NOTE(review): `since` is naive UTC — confirm how the `time` column
        # type and session timezone interpret naive timestamps.
        since = since or datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        step_seconds = self._INTERVAL_SECONDS.get(interval, 60)

        async with self.acquire() as conn:
            # LAG pairs each candle with its predecessor; a spacing of more
            # than two candle widths is reported as a gap.
            rows = await conn.fetch('''
                WITH ordered AS (
                    SELECT
                        time,
                        LAG(time) OVER (ORDER BY time) as prev_time
                    FROM candles
                    WHERE symbol = $1
                      AND interval = $2
                      AND time >= $3
                )
                SELECT
                    prev_time as gap_start,
                    time as gap_end,
                    EXTRACT(EPOCH FROM (time - prev_time)) / $4 - 1 as missing_candles
                FROM ordered
                WHERE EXTRACT(EPOCH FROM (time - prev_time)) > $4 * 2
                ORDER BY prev_time
            ''', symbol, interval, since, step_seconds)

        return [
            {
                'gap_start': row['gap_start'].isoformat(),
                'gap_end': row['gap_end'].isoformat(),
                'missing_candles': int(row['missing_candles'])
            }
            for row in rows
        ]

    async def log_quality_issue(
        self,
        check_type: str,
        severity: str,
        symbol: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ) -> None:
        """Log a data quality issue"""
        # NOTE(review): `details` is passed as a Python dict for $4 — if the
        # column is json/jsonb, asyncpg needs a registered codec or an explicit
        # json.dumps; confirm against the schema and pool setup.
        async with self.acquire() as conn:
            await conn.execute('''
                INSERT INTO data_quality (check_type, severity, symbol, details)
                VALUES ($1, $2, $3, $4)
            ''', check_type, severity, symbol, details)

        logger.warning(f"Quality issue logged: {check_type} ({severity})")

    async def get_health_stats(self) -> Dict[str, Any]:
        """Get database health statistics"""
        async with self.acquire() as conn:
            # Get table sizes
            table_stats = await conn.fetch('''
                SELECT
                    relname as table_name,
                    pg_size_pretty(pg_total_relation_size(relid)) as size,
                    n_live_tup as row_count
                FROM pg_stat_user_tables
                WHERE relname IN ('candles', 'indicators', 'data_quality')
            ''')

            # Get latest candle activity per symbol (last 24 hours)
            latest = await conn.fetch('''
                SELECT symbol, MAX(time) as last_time, COUNT(*) as count
                FROM candles
                WHERE time > NOW() - INTERVAL '24 hours'
                GROUP BY symbol
            ''')

            return {
                'tables': [dict(row) for row in table_stats],
                'latest_candles': [dict(row) for row in latest],
                'unresolved_issues': await conn.fetchval('''
                    SELECT COUNT(*) FROM data_quality WHERE resolved = FALSE
                ''')
            }
try: + # Initialize database + self.db = DatabaseManager() + await self.db.connect() + + # Initialize buffer + self.buffer = CandleBuffer( + max_size=1000, + flush_interval_seconds=30, + batch_size=100, + on_flush_callback=self._on_buffer_flush + ) + await self.buffer.start() + + # Initialize WebSocket client + self.websocket = HyperliquidWebSocket( + symbol=self.symbol, + interval=self.interval, + on_candle_callback=self._on_candle, + on_error_callback=self._on_error + ) + + # Setup signal handlers + self._setup_signal_handlers() + + # Connect to WebSocket + await self.websocket.connect() + + # Start main loops + self.is_running = True + self._tasks = [ + asyncio.create_task(self.websocket.receive_loop()), + asyncio.create_task(self._health_check_loop()), + asyncio.create_task(self._monitoring_loop()) + ] + + logger.info("DataCollector started successfully") + + # Wait for stop signal + await self._stop_event.wait() + + except Exception as e: + logger.error(f"Failed to start DataCollector: {type(e).__name__}: {e!r}") + raise + finally: + await self.stop() + + async def stop(self) -> None: + """Graceful shutdown""" + if not self.is_running: + return + + logger.info("Stopping DataCollector...") + self.is_running = False + self._stop_event.set() + + # Cancel tasks + for task in self._tasks: + if not task.done(): + task.cancel() + + # Wait for tasks to complete + if self._tasks: + await asyncio.gather(*self._tasks, return_exceptions=True) + + # Stop components + if self.websocket: + await self.websocket.disconnect() + + if self.buffer: + await self.buffer.stop() + + if self.db: + await self.db.disconnect() + + logger.info("DataCollector stopped") + + async def _on_candle(self, candle: Candle) -> None: + """Handle incoming candle from WebSocket""" + try: + # Add to buffer + await self.buffer.add(candle) + logger.debug(f"Received candle: {candle.time} - Close: {candle.close}") + except Exception as e: + logger.error(f"Error processing candle: {e}") + + async def 
_on_buffer_flush(self, candles: list) -> None: + """Handle buffer flush - write to database""" + try: + inserted = await self.db.insert_candles(candles) + logger.info(f"Flushed {inserted} candles to database") + except Exception as e: + logger.error(f"Failed to write candles to database: {e}") + raise # Re-raise to trigger buffer retry + + async def _on_error(self, error: Exception) -> None: + """Handle WebSocket errors""" + logger.error(f"WebSocket error: {error}") + # Could implement alerting here (Telegram, etc.) + + async def _health_check_loop(self) -> None: + """Periodic health checks""" + while self.is_running: + try: + await asyncio.sleep(60) # Check every minute + + if not self.is_running: + break + + # Check WebSocket health + health = self.websocket.get_connection_health() + + if health['seconds_since_last_message'] and health['seconds_since_last_message'] > 120: + logger.warning("No messages received for 2+ minutes") + # Could trigger reconnection or alert + + # Log stats + buffer_stats = self.buffer.get_stats() + logger.info(f"Health: {health}, Buffer: {buffer_stats.to_dict()}") + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in health check: {e}") + + async def _monitoring_loop(self) -> None: + """Periodic monitoring and maintenance tasks""" + while self.is_running: + try: + await asyncio.sleep(300) # Every 5 minutes + + if not self.is_running: + break + + # Detect gaps + gaps = await self.db.detect_gaps(self.symbol, self.interval) + if gaps: + logger.warning(f"Detected {len(gaps)} data gaps: {gaps}") + # Could trigger backfill here + + # Log database health + health = await self.db.get_health_stats() + logger.info(f"Database health: {health}") + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in monitoring loop: {e}") + + def _setup_signal_handlers(self) -> None: + """Setup handlers for graceful shutdown""" + def signal_handler(sig, frame): + logger.info(f"Received 
signal {sig}, shutting down...") + asyncio.create_task(self.stop()) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + +async def main(): + """Main entry point""" + collector = DataCollector( + symbol="BTC", + interval="1m" + ) + + try: + await collector.start() + except KeyboardInterrupt: + logger.info("Interrupted by user") + except Exception as e: + logger.error(f"Fatal error: {type(e).__name__}: {e!r}") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/data_collector/websocket_client.py b/src/data_collector/websocket_client.py new file mode 100644 index 0000000..253e6cf --- /dev/null +++ b/src/data_collector/websocket_client.py @@ -0,0 +1,300 @@ +""" +Hyperliquid WebSocket Client for cbBTC Data Collection +Optimized for Synology DS218+ with automatic reconnection +""" + +import asyncio +import json +import logging +from datetime import datetime, timezone +from typing import Optional, Dict, Any, Callable, Awaitable, List +from dataclasses import dataclass +import websockets +from websockets.exceptions import ConnectionClosed, InvalidStatusCode +from websockets.typing import Data + + +logger = logging.getLogger(__name__) + + +@dataclass +class Candle: + """Represents a single candlestick""" + time: datetime + symbol: str + interval: str + open: float + high: float + low: float + close: float + volume: float + + def to_dict(self) -> Dict[str, Any]: + return { + 'time': self.time, + 'symbol': self.symbol, + 'interval': self.interval, + 'open': self.open, + 'high': self.high, + 'low': self.low, + 'close': self.close, + 'volume': self.volume + } + + +class HyperliquidWebSocket: + """ + WebSocket client for Hyperliquid exchange + Handles connection, reconnection, and candle data parsing + """ + + def __init__( + self, + symbol: str = "BTC", + interval: str = "1m", + url: str = "wss://api.hyperliquid.xyz/ws", + reconnect_delays: Optional[List[int]] = None, + 
on_candle_callback: Optional[Callable[[Candle], Awaitable[None]]] = None, + on_error_callback: Optional[Callable[[Exception], Awaitable[None]]] = None + ): + self.symbol = symbol + self.interval = interval + self.url = url + self.reconnect_delays = reconnect_delays or [1, 2, 5, 10, 30, 60, 120, 300, 600, 900] + self.on_candle = on_candle_callback + self.on_error = on_error_callback + + self.websocket: Optional[websockets.WebSocketClientProtocol] = None + self.is_running = False + self.reconnect_count = 0 + self.last_message_time: Optional[datetime] = None + self.last_candle_time: Optional[datetime] = None + self._should_stop = False + + async def connect(self) -> None: + """Establish WebSocket connection with subscription""" + try: + logger.info(f"Connecting to Hyperliquid WebSocket: {self.url}") + + self.websocket = await websockets.connect( + self.url, + ping_interval=None, + ping_timeout=None, + close_timeout=10 + ) + + # Subscribe to candle data + subscribe_msg = { + "method": "subscribe", + "subscription": { + "type": "candle", + "coin": self.symbol, + "interval": self.interval + } + } + + await self.websocket.send(json.dumps(subscribe_msg)) + response = await self.websocket.recv() + logger.info(f"Subscription response: {response}") + + self.reconnect_count = 0 + self.is_running = True + logger.info(f"Successfully connected and subscribed to {self.symbol} {self.interval} candles") + + except Exception as e: + logger.error(f"Failed to connect: {e}") + raise + + async def disconnect(self) -> None: + """Gracefully close connection""" + self._should_stop = True + self.is_running = False + if self.websocket: + try: + await self.websocket.close() + logger.info("WebSocket connection closed") + except Exception as e: + logger.warning(f"Error closing WebSocket: {e}") + + async def receive_loop(self) -> None: + """Main message receiving loop""" + while self.is_running and not self._should_stop: + try: + if not self.websocket: + raise ConnectionClosed(None, None) + + 
message = await self.websocket.recv() + self.last_message_time = datetime.now(timezone.utc) + + await self._handle_message(message) + + except ConnectionClosed as e: + if self._should_stop: + break + logger.warning(f"WebSocket connection closed: {e}") + await self._handle_reconnect() + + except Exception as e: + logger.error(f"Error in receive loop: {e}") + if self.on_error: + await self.on_error(e) + await asyncio.sleep(1) + + async def _handle_message(self, message: Data) -> None: + """Parse and process incoming WebSocket message""" + try: + # Convert bytes to string if necessary + if isinstance(message, bytes): + message = message.decode('utf-8') + + data = json.loads(message) + + # Handle subscription confirmation + if data.get("channel") == "subscriptionResponse": + logger.info(f"Subscription confirmed: {data}") + return + + # Handle candle data + if data.get("channel") == "candle": + candle_data = data.get("data", {}) + if candle_data: + candle = self._parse_candle(candle_data) + if candle: + self.last_candle_time = candle.time + if self.on_candle: + await self.on_candle(candle) + + # Handle ping/pong + if "ping" in data and self.websocket: + await self.websocket.send(json.dumps({"pong": data["ping"]})) + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse message: {e}") + except Exception as e: + logger.error(f"Error handling message: {e}") + + def _parse_candle(self, data: Any) -> Optional[Candle]: + """Parse candle data from WebSocket message""" + try: + # Hyperliquid candle format: [open, high, low, close, volume, timestamp] + if isinstance(data, list) and len(data) >= 6: + open_price = float(data[0]) + high = float(data[1]) + low = float(data[2]) + close = float(data[3]) + volume = float(data[4]) + timestamp_ms = int(data[5]) + elif isinstance(data, dict): + # New format: {'t': 1770812400000, 'T': ..., 's': 'BTC', 'i': '1m', 'o': '67164.0', 'c': ..., 'h': ..., 'l': ..., 'v': ..., 'n': ...} + if 't' in data and 'o' in data: + open_price = 
float(data.get("o", 0)) + high = float(data.get("h", 0)) + low = float(data.get("l", 0)) + close = float(data.get("c", 0)) + volume = float(data.get("v", 0)) + timestamp_ms = int(data.get("t", 0)) + else: + # Old format fallback + open_price = float(data.get("open", 0)) + high = float(data.get("high", 0)) + low = float(data.get("low", 0)) + close = float(data.get("close", 0)) + volume = float(data.get("volume", 0)) + timestamp_ms = int(data.get("time", 0)) + else: + logger.warning(f"Unknown candle format: {data}") + return None + + timestamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) + + return Candle( + time=timestamp, + symbol=self.symbol, + interval=self.interval, + open=open_price, + high=high, + low=low, + close=close, + volume=volume + ) + + except (KeyError, ValueError, TypeError) as e: + logger.error(f"Failed to parse candle data: {e}, data: {data}") + return None + + async def _handle_reconnect(self) -> None: + """Handle reconnection with exponential backoff""" + if self._should_stop: + return + + if self.reconnect_count >= len(self.reconnect_delays): + logger.error("Max reconnection attempts reached") + self.is_running = False + if self.on_error: + await self.on_error(Exception("Max reconnection attempts reached")) + return + + delay = self.reconnect_delays[self.reconnect_count] + self.reconnect_count += 1 + + logger.info(f"Reconnecting in {delay} seconds (attempt {self.reconnect_count})...") + await asyncio.sleep(delay) + + try: + await self.connect() + except Exception as e: + logger.error(f"Reconnection failed: {e}") + + def get_connection_health(self) -> Dict[str, Any]: + """Return connection health metrics""" + now = datetime.now(timezone.utc) + return { + "is_connected": self.websocket is not None and self.is_running, + "is_running": self.is_running, + "reconnect_count": self.reconnect_count, + "last_message_time": self.last_message_time.isoformat() if self.last_message_time else None, + "last_candle_time": 
self.last_candle_time.isoformat() if self.last_candle_time else None, + "seconds_since_last_message": (now - self.last_message_time).total_seconds() if self.last_message_time else None + } + + +async def test_websocket(): + """Test function for WebSocket client""" + candles_received = [] + stop_event = asyncio.Event() + + async def on_candle(candle: Candle): + candles_received.append(candle) + print(f"Candle: {candle.time} - O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") + if len(candles_received) >= 5: + print("Received 5 candles, stopping...") + stop_event.set() + + client = HyperliquidWebSocket( + symbol="cbBTC-PERP", + interval="1m", + on_candle_callback=on_candle + ) + + try: + await client.connect() + # Run receive loop in background + receive_task = asyncio.create_task(client.receive_loop()) + # Wait for stop event + await stop_event.wait() + await client.disconnect() + await receive_task + except KeyboardInterrupt: + print("\nStopping...") + finally: + await client.disconnect() + print(f"Total candles received: {len(candles_received)}") + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + asyncio.run(test_websocket()) \ No newline at end of file