Initial commit - BTC Trading Dashboard
- FastAPI backend with PostgreSQL database connection - Frontend dashboard with lightweight-charts - Technical indicators (SMA, EMA, RSI, MACD, Bollinger Bands, etc.) - Trading strategy simulation and backtesting - Database connection to NAS at 20.20.20.20:5433 - Development server setup and documentation
This commit is contained in:
440
src/data_collector/main.py
Normal file
440
src/data_collector/main.py
Normal file
@ -0,0 +1,440 @@
|
||||
"""
|
||||
Main entry point for data collector service
|
||||
Integrates WebSocket client, buffer, database, indicators, and brain
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, List
|
||||
import os
|
||||
|
||||
import yaml
|
||||
|
||||
from .websocket_client import HyperliquidWebSocket, Candle
|
||||
from .candle_buffer import CandleBuffer
|
||||
from .database import DatabaseManager
|
||||
from .custom_timeframe_generator import CustomTimeframeGenerator
|
||||
from .indicator_engine import IndicatorEngine, IndicatorConfig
|
||||
from .brain import Brain
|
||||
from .backfill import HyperliquidBackfill
|
||||
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=getattr(logging, os.getenv('LOG_LEVEL', 'INFO')),
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(sys.stdout),
|
||||
logging.FileHandler('/app/logs/collector.log') if os.path.exists('/app/logs') else logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataCollector:
    """Orchestrate market-data collection for a single symbol.

    Wires together the Hyperliquid WebSocket feed, an in-memory candle
    buffer, the database writer, custom-timeframe generation, indicator
    computation, and the trading brain.
    """

    # Natively-supported exchange intervals that are kept backfilled.
    STANDARD_INTERVALS = [
        "1m", "3m", "5m", "15m", "30m",
        "1h", "2h", "4h", "8h", "12h",
        "1d", "3d", "1w",
    ]
|
||||
def __init__(
|
||||
self,
|
||||
symbol: str = "BTC",
|
||||
interval: str = "1m"
|
||||
):
|
||||
self.symbol = symbol
|
||||
self.interval = interval
|
||||
|
||||
# Components
|
||||
self.db: Optional[DatabaseManager] = None
|
||||
self.buffer: Optional[CandleBuffer] = None
|
||||
self.websocket: Optional[HyperliquidWebSocket] = None
|
||||
self.custom_tf_generator: Optional[CustomTimeframeGenerator] = None
|
||||
|
||||
# State
|
||||
self.is_running = False
|
||||
self._stop_event = asyncio.Event()
|
||||
self._tasks = []
|
||||
|
||||
    async def start(self) -> None:
        """Initialize and start all components, then block until shutdown.

        Order matters: the database must be connected before backfill;
        backfill must complete before custom timeframes are regenerated so
        the 1m base data is whole; indicators/brain/buffer are wired before
        the WebSocket starts delivering live candles.  This coroutine waits
        on the stop event and always tears down via stop() on exit.

        Raises:
            Exception: Any startup failure is logged and re-raised.
        """
        logger.info(f"Starting DataCollector for {self.symbol}")

        try:
            # Initialize database
            self.db = DatabaseManager()
            await self.db.connect()

            # Run startup backfill for all intervals
            await self._startup_backfill()

            # Initialize custom timeframe generator
            self.custom_tf_generator = CustomTimeframeGenerator(self.db)
            await self.custom_tf_generator.initialize()

            # Regenerate custom timeframes after startup backfill
            await self._regenerate_custom_timeframes()

            # Initialize indicator engine
            # Hardcoded config for now, eventually load from yaml
            indicator_configs = [
                IndicatorConfig("ma44", "sma", 44, ["37m", "148m", "1d"]),
                IndicatorConfig("ma125", "sma", 125, ["37m", "148m", "1d"])
            ]
            self.indicator_engine = IndicatorEngine(self.db, indicator_configs)

            # Initialize brain
            self.brain = Brain(self.db, self.indicator_engine)

            # Initialize buffer; flushed batches land in _on_buffer_flush
            self.buffer = CandleBuffer(
                max_size=1000,
                flush_interval_seconds=30,
                batch_size=100,
                on_flush_callback=self._on_buffer_flush
            )
            await self.buffer.start()

            # Initialize WebSocket client (not yet connected)
            self.websocket = HyperliquidWebSocket(
                symbol=self.symbol,
                interval=self.interval,
                on_candle_callback=self._on_candle,
                on_error_callback=self._on_error
            )

            # Setup signal handlers before connecting so shutdown works
            # even if the connect phase hangs
            self._setup_signal_handlers()

            # Connect to WebSocket
            await self.websocket.connect()

            # Start main loops
            self.is_running = True
            self._tasks = [
                asyncio.create_task(self.websocket.receive_loop()),
                asyncio.create_task(self._health_check_loop()),
                asyncio.create_task(self._monitoring_loop())
            ]

            logger.info("DataCollector started successfully")

            # Wait for stop signal
            await self._stop_event.wait()

        except Exception as e:
            logger.error(f"Failed to start DataCollector: {type(e).__name__}: {e!r}")
            raise
        finally:
            # stop() is idempotent (it guards on is_running), so calling it
            # here as well as from signal handling is safe.
            await self.stop()
|
||||
|
||||
async def _startup_backfill(self) -> None:
|
||||
"""
|
||||
Backfill missing data on startup for all standard intervals.
|
||||
Uses both gap detection AND time-based backfill for robustness.
|
||||
"""
|
||||
logger.info("Running startup backfill for all intervals...")
|
||||
|
||||
try:
|
||||
async with HyperliquidBackfill(self.db, self.symbol, self.STANDARD_INTERVALS) as backfill:
|
||||
for interval in self.STANDARD_INTERVALS:
|
||||
try:
|
||||
# First, use gap detection to find any holes
|
||||
gaps = await self.db.detect_gaps(self.symbol, interval)
|
||||
|
||||
if gaps:
|
||||
logger.info(f"{interval}: {len(gaps)} gaps detected")
|
||||
for gap in gaps:
|
||||
gap_start = datetime.fromisoformat(gap['gap_start'].replace('Z', '+00:00'))
|
||||
gap_end = datetime.fromisoformat(gap['gap_end'].replace('Z', '+00:00'))
|
||||
|
||||
logger.info(f" Filling gap: {gap_start} to {gap_end}")
|
||||
candles = await backfill.fetch_candles(interval, gap_start, gap_end)
|
||||
|
||||
if candles:
|
||||
inserted = await self.db.insert_candles(candles)
|
||||
logger.info(f" Inserted {inserted} candles for gap")
|
||||
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Second, check if we're behind current time
|
||||
latest = await self.db.get_latest_candle(self.symbol, interval)
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
if latest:
|
||||
last_time = latest['time']
|
||||
gap_minutes = (now - last_time).total_seconds() / 60
|
||||
|
||||
if gap_minutes > 2:
|
||||
logger.info(f"{interval}: {gap_minutes:.0f} min behind, backfilling to now...")
|
||||
candles = await backfill.fetch_candles(interval, last_time, now)
|
||||
|
||||
if candles:
|
||||
inserted = await self.db.insert_candles(candles)
|
||||
logger.info(f" Inserted {inserted} candles")
|
||||
else:
|
||||
logger.info(f"{interval}: up to date")
|
||||
else:
|
||||
# No data exists, backfill last 7 days
|
||||
logger.info(f"{interval}: No data, backfilling 7 days...")
|
||||
count = await backfill.backfill_interval(interval, days_back=7)
|
||||
logger.info(f" Inserted {count} candles")
|
||||
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Startup backfill failed for {interval}: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Startup backfill error: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
logger.info("Startup backfill complete")
|
||||
|
||||
async def _regenerate_custom_timeframes(self) -> None:
|
||||
"""
|
||||
Regenerate custom timeframes (37m, 148m) only from gaps.
|
||||
Only generates candles that are missing, not all from beginning.
|
||||
"""
|
||||
if not self.custom_tf_generator:
|
||||
return
|
||||
|
||||
logger.info("Checking custom timeframes for gaps...")
|
||||
|
||||
try:
|
||||
for interval in ['37m', '148m']:
|
||||
try:
|
||||
count = await self.custom_tf_generator.generate_from_gap(interval)
|
||||
if count > 0:
|
||||
logger.info(f"{interval}: Generated {count} candles")
|
||||
else:
|
||||
logger.info(f"{interval}: Up to date")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to regenerate {interval}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Custom timeframe regeneration error: {e}")
|
||||
|
||||
logger.info("Custom timeframe check complete")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Graceful shutdown"""
|
||||
if not self.is_running:
|
||||
return
|
||||
|
||||
logger.info("Stopping DataCollector...")
|
||||
self.is_running = False
|
||||
self._stop_event.set()
|
||||
|
||||
# Cancel tasks
|
||||
for task in self._tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
|
||||
# Wait for tasks to complete
|
||||
if self._tasks:
|
||||
await asyncio.gather(*self._tasks, return_exceptions=True)
|
||||
|
||||
# Stop components
|
||||
if self.websocket:
|
||||
await self.websocket.disconnect()
|
||||
|
||||
if self.buffer:
|
||||
await self.buffer.stop()
|
||||
|
||||
if self.db:
|
||||
await self.db.disconnect()
|
||||
|
||||
logger.info("DataCollector stopped")
|
||||
|
||||
async def _on_candle(self, candle: Candle) -> None:
|
||||
"""Handle incoming candle from WebSocket"""
|
||||
try:
|
||||
# Add to buffer
|
||||
await self.buffer.add(candle)
|
||||
logger.debug(f"Received candle: {candle.time} - Close: {candle.close}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing candle: {e}")
|
||||
|
||||
async def _on_buffer_flush(self, candles: list) -> None:
|
||||
"""Handle buffer flush - write to database and update custom timeframes"""
|
||||
try:
|
||||
inserted = await self.db.insert_candles(candles)
|
||||
logger.info(f"Flushed {inserted} candles to database")
|
||||
|
||||
# Update custom timeframes (37m, 148m) in background
|
||||
if self.custom_tf_generator and inserted > 0:
|
||||
asyncio.create_task(
|
||||
self._update_custom_timeframes(candles),
|
||||
name="custom_tf_update"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write candles to database: {e}")
|
||||
raise # Re-raise to trigger buffer retry
|
||||
|
||||
    async def _update_custom_timeframes(self, candles: list) -> None:
        """
        Update custom timeframes in background, then trigger indicators/brain.

        This chain ensures that indicators are computed on fresh candle data,
        and the brain evaluates on fresh indicator data.

        Args:
            candles: The batch of 1m candles just flushed to the database.
                Assumed non-empty — candles[-1].time is the trigger point.
                TODO confirm all callers guarantee this.
        """
        try:
            # 1. Update custom candles (37m, 148m, etc.)
            await self.custom_tf_generator.update_realtime(candles)
            logger.debug("Custom timeframes updated")

            # 2. Trigger indicator updates for configured intervals
            # We use the timestamp of the last 1m candle as the trigger point
            trigger_time = candles[-1].time

            if self.indicator_engine:
                intervals = self.indicator_engine.get_configured_intervals()
                for interval in intervals:
                    # Get the correct bucket start time for this interval
                    # e.g., if trigger_time is 09:48:00, 37m bucket might start at 09:25:00
                    if self.custom_tf_generator:
                        bucket_start = self.custom_tf_generator.get_bucket_start(trigger_time, interval)
                    else:
                        # Generator missing: fall back to the raw trigger time
                        bucket_start = trigger_time

                    # Compute indicators for this bucket
                    raw_indicators = await self.indicator_engine.on_interval_update(
                        self.symbol, interval, bucket_start
                    )

                    # Filter out None values to satisfy type checker
                    indicators = {k: v for k, v in raw_indicators.items() if v is not None}

                    # 3. Evaluate brain if we have fresh indicators
                    if self.brain and indicators:
                        await self.brain.evaluate(
                            self.symbol, interval, bucket_start, indicators
                        )

        except Exception as e:
            logger.error(f"Failed to update custom timeframes/indicators: {e}")
            # Don't raise - this is non-critical
|
||||
|
||||
async def _on_error(self, error: Exception) -> None:
|
||||
"""Handle WebSocket errors"""
|
||||
logger.error(f"WebSocket error: {error}")
|
||||
# Could implement alerting here (Telegram, etc.)
|
||||
|
||||
async def _health_check_loop(self) -> None:
|
||||
"""Periodic health checks"""
|
||||
while self.is_running:
|
||||
try:
|
||||
await asyncio.sleep(60) # Check every minute
|
||||
|
||||
if not self.is_running:
|
||||
break
|
||||
|
||||
# Check WebSocket health
|
||||
health = self.websocket.get_connection_health()
|
||||
|
||||
if health['seconds_since_last_message'] and health['seconds_since_last_message'] > 120:
|
||||
logger.warning("No messages received for 2+ minutes")
|
||||
# Could trigger reconnection or alert
|
||||
|
||||
# Log stats
|
||||
buffer_stats = self.buffer.get_stats()
|
||||
logger.info(f"Health: {health}, Buffer: {buffer_stats.to_dict()}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in health check: {e}")
|
||||
|
||||
async def _monitoring_loop(self) -> None:
|
||||
"""Periodic monitoring and maintenance tasks"""
|
||||
while self.is_running:
|
||||
try:
|
||||
await asyncio.sleep(300) # Every 5 minutes
|
||||
|
||||
if not self.is_running:
|
||||
break
|
||||
|
||||
# Detect gaps
|
||||
gaps = await self.db.detect_gaps(self.symbol, self.interval)
|
||||
if gaps:
|
||||
logger.warning(f"Detected {len(gaps)} data gaps: {gaps}")
|
||||
await self._backfill_gaps(gaps)
|
||||
|
||||
# Log database health
|
||||
health = await self.db.get_health_stats()
|
||||
logger.info(f"Database health: {health}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in monitoring loop: {e}")
|
||||
|
||||
async def _backfill_gaps(self, gaps: list) -> None:
|
||||
"""Backfill detected data gaps from Hyperliquid"""
|
||||
if not gaps:
|
||||
return
|
||||
|
||||
logger.info(f"Starting backfill for {len(gaps)} gaps...")
|
||||
|
||||
try:
|
||||
async with HyperliquidBackfill(self.db, self.symbol, [self.interval]) as backfill:
|
||||
for gap in gaps:
|
||||
gap_start = datetime.fromisoformat(gap['gap_start'].replace('Z', '+00:00'))
|
||||
gap_end = datetime.fromisoformat(gap['gap_end'].replace('Z', '+00:00'))
|
||||
|
||||
logger.info(f"Backfilling gap: {gap_start} to {gap_end} ({gap['missing_candles']} candles)")
|
||||
|
||||
candles = await backfill.fetch_candles(self.interval, gap_start, gap_end)
|
||||
|
||||
if candles:
|
||||
inserted = await self.db.insert_candles(candles)
|
||||
logger.info(f"Backfilled {inserted} candles for gap {gap_start}")
|
||||
|
||||
# Update custom timeframes and indicators for backfilled data
|
||||
if inserted > 0:
|
||||
await self._update_custom_timeframes(candles)
|
||||
else:
|
||||
logger.warning(f"No candles available for gap {gap_start} to {gap_end}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Backfill failed: {e}")
|
||||
|
||||
def _setup_signal_handlers(self) -> None:
|
||||
"""Setup handlers for graceful shutdown"""
|
||||
def signal_handler(sig, frame):
|
||||
logger.info(f"Received signal {sig}, shutting down...")
|
||||
asyncio.create_task(self.stop())
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
|
||||
async def main():
    """Entry point: run a BTC 1-minute collector until interrupted or fatal error."""
    collector = DataCollector(symbol="BTC", interval="1m")

    try:
        await collector.start()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        # Anything else is fatal: log it and exit non-zero for the supervisor.
        logger.error(f"Fatal error: {type(e).__name__}: {e!r}")
        sys.exit(1)
|
||||
|
||||
|
||||
# Script entry point: hand control to asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
||||
Reference in New Issue
Block a user