"""
Main entry point for data collector service
Integrates WebSocket client, buffer, database, indicators, and brain
"""

import asyncio
import logging
import os
import signal
import sys
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Set, Tuple

import yaml

from .websocket_client import HyperliquidWebSocket, Candle
from .candle_buffer import CandleBuffer
from .database import DatabaseManager
from .custom_timeframe_generator import CustomTimeframeGenerator
from .indicator_engine import IndicatorEngine, IndicatorConfig
from .brain import Brain
from .backfill import HyperliquidBackfill


_LOG_DIR = '/app/logs'


def _resolve_log_level() -> int:
    """Return the logging level named by LOG_LEVEL, falling back to INFO.

    The lookup is case-insensitive and tolerates unknown names so that a
    typo in the environment cannot crash the service at import time.
    """
    level = getattr(logging, os.getenv('LOG_LEVEL', 'INFO').upper(), None)
    return level if isinstance(level, int) else logging.INFO


def _build_log_handlers() -> List[logging.Handler]:
    """Return the log handlers: stdout always, plus a file when the log dir exists.

    Only one stream handler is installed; adding a second one when the log
    directory is missing would emit every record twice.
    """
    handlers: List[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if os.path.exists(_LOG_DIR):
        handlers.append(logging.FileHandler(f'{_LOG_DIR}/collector.log'))
    return handlers


# Configure logging
logging.basicConfig(
    level=_resolve_log_level(),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_build_log_handlers()
)
logger = logging.getLogger(__name__)


class DataCollector:
    """
    Main data collection orchestrator
    Manages WebSocket connection, buffering, and database writes
    """

    STANDARD_INTERVALS = ["1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "8h", "12h", "1d", "3d", "1w"]

    # Custom (non-exchange) timeframes derived locally from stored candles.
    _CUSTOM_INTERVALS = ('37m', '148m')

    # Upper bound on how long shutdown waits for in-flight background updates.
    _BACKGROUND_DRAIN_TIMEOUT_SECONDS = 10.0

    def __init__(
        self,
        symbol: str = "BTC",
        interval: str = "1m"
    ):
        """Create an idle collector; nothing is connected until start() runs.

        Args:
            symbol: Market symbol to collect.
            interval: Candle interval subscribed to on the WebSocket.
        """
        self.symbol = symbol
        self.interval = interval

        # Components (all created in start())
        self.db: Optional[DatabaseManager] = None
        self.buffer: Optional[CandleBuffer] = None
        self.websocket: Optional[HyperliquidWebSocket] = None
        self.custom_tf_generator: Optional[CustomTimeframeGenerator] = None
        self.indicator_engine: Optional[IndicatorEngine] = None
        self.brain: Optional[Brain] = None

        # State
        self.is_running = False
        self._stop_event = asyncio.Event()
        self._tasks = []
        # Strong references to fire-and-forget tasks; the event loop only
        # keeps weak references, so untracked tasks may be garbage collected.
        self._background_tasks: Set[asyncio.Task] = set()
        self._shutdown_started = False
        self._shutdown_complete = asyncio.Event()

    async def start(self) -> None:
        """Initialize and start all components, then block until stopped.

        Returns only after a stop has been requested (signal or stop())
        and shutdown has finished.

        Raises:
            Exception: Any error raised while initializing a component is
                logged and re-raised after cleanup.
        """
        logger.info(f"Starting DataCollector for {self.symbol}")

        try:
            # Initialize database
            self.db = DatabaseManager()
            await self.db.connect()

            # Run startup backfill for all intervals
            await self._startup_backfill()

            # Initialize custom timeframe generator
            self.custom_tf_generator = CustomTimeframeGenerator(self.db)
            await self.custom_tf_generator.initialize()

            # Regenerate custom timeframes after startup backfill
            await self._regenerate_custom_timeframes()

            # Initialize indicator engine
            # Hardcoded config for now, eventually load from yaml
            indicator_configs = [
                IndicatorConfig("ma44", "sma", 44, ["37m", "148m", "1d"]),
                IndicatorConfig("ma125", "sma", 125, ["37m", "148m", "1d"])
            ]
            self.indicator_engine = IndicatorEngine(self.db, indicator_configs)

            # Initialize brain
            self.brain = Brain(self.db, self.indicator_engine)

            # Initialize buffer
            self.buffer = CandleBuffer(
                max_size=1000,
                flush_interval_seconds=30,
                batch_size=100,
                on_flush_callback=self._on_buffer_flush
            )
            await self.buffer.start()

            # Initialize WebSocket client
            self.websocket = HyperliquidWebSocket(
                symbol=self.symbol,
                interval=self.interval,
                on_candle_callback=self._on_candle,
                on_error_callback=self._on_error
            )

            # Setup signal handlers
            self._setup_signal_handlers()

            # Connect to WebSocket
            await self.websocket.connect()

            # Start main loops
            self.is_running = True
            self._tasks = [
                asyncio.create_task(self.websocket.receive_loop()),
                asyncio.create_task(self._health_check_loop()),
                asyncio.create_task(self._monitoring_loop())
            ]

            logger.info("DataCollector started successfully")

            # Wait for stop signal
            await self._stop_event.wait()

        except Exception as e:
            logger.error(f"Failed to start DataCollector: {type(e).__name__}: {e!r}")
            raise
        finally:
            # Runs for both a clean stop and a failed start, so partially
            # initialized components are always released.
            await self.stop()

    @staticmethod
    def _parse_gap_bounds(gap: dict) -> Tuple[datetime, datetime]:
        """Parse the ISO-8601 'gap_start'/'gap_end' strings of a gap record.

        A trailing 'Z' is rewritten to '+00:00' because
        datetime.fromisoformat() rejects it before Python 3.11.
        """
        gap_start = datetime.fromisoformat(gap['gap_start'].replace('Z', '+00:00'))
        gap_end = datetime.fromisoformat(gap['gap_end'].replace('Z', '+00:00'))
        return gap_start, gap_end

    async def _fill_detected_gaps(self, backfill: HyperliquidBackfill, interval: str) -> None:
        """Fetch and insert candles for every gap the database reports for *interval*."""
        gaps = await self.db.detect_gaps(self.symbol, interval)
        if not gaps:
            return

        logger.info(f"{interval}: {len(gaps)} gaps detected")
        for gap in gaps:
            gap_start, gap_end = self._parse_gap_bounds(gap)
            logger.info(f" Filling gap: {gap_start} to {gap_end}")

            candles = await backfill.fetch_candles(interval, gap_start, gap_end)
            if candles:
                inserted = await self.db.insert_candles(candles)
                logger.info(f" Inserted {inserted} candles for gap")

            # Brief pause between requests to go easy on the API
            await asyncio.sleep(0.2)

    async def _catch_up_to_now(self, backfill: HyperliquidBackfill, interval: str) -> None:
        """Backfill *interval* from its latest stored candle up to the current time.

        When no candle exists yet, the last 7 days are backfilled instead.
        """
        latest = await self.db.get_latest_candle(self.symbol, interval)
        now = datetime.now(timezone.utc)

        if not latest:
            # No data exists, backfill last 7 days
            logger.info(f"{interval}: No data, backfilling 7 days...")
            count = await backfill.backfill_interval(interval, days_back=7)
            logger.info(f" Inserted {count} candles")
            return

        last_time = latest['time']
        gap_minutes = (now - last_time).total_seconds() / 60

        if gap_minutes > 2:
            logger.info(f"{interval}: {gap_minutes:.0f} min behind, backfilling to now...")
            candles = await backfill.fetch_candles(interval, last_time, now)
            if candles:
                inserted = await self.db.insert_candles(candles)
                logger.info(f" Inserted {inserted} candles")
        else:
            logger.info(f"{interval}: up to date")

    async def _startup_backfill(self) -> None:
        """
        Backfill missing data on startup for all standard intervals.
        Uses both gap detection AND time-based backfill for robustness.

        Failures are logged (with traceback) and never propagate: a failing
        interval is skipped so the remaining intervals are still processed.
        """
        logger.info("Running startup backfill for all intervals...")

        try:
            async with HyperliquidBackfill(self.db, self.symbol, self.STANDARD_INTERVALS) as backfill:
                for interval in self.STANDARD_INTERVALS:
                    try:
                        # First, use gap detection to find any holes
                        await self._fill_detected_gaps(backfill, interval)

                        # Second, check if we're behind current time
                        await self._catch_up_to_now(backfill, interval)

                        await asyncio.sleep(0.2)
                    except Exception as e:
                        logger.exception(f"Startup backfill failed for {interval}: {e}")
                        continue
        except Exception as e:
            logger.exception(f"Startup backfill error: {e}")

        logger.info("Startup backfill complete")

    async def _regenerate_custom_timeframes(self) -> None:
        """
        Regenerate custom timeframes (37m, 148m) only from gaps.
        Only generates candles that are missing, not all from beginning.

        Does nothing when the generator has not been created. Per-interval
        failures are logged and do not stop the remaining intervals.
        """
        if not self.custom_tf_generator:
            return

        logger.info("Checking custom timeframes for gaps...")

        try:
            for interval in self._CUSTOM_INTERVALS:
                try:
                    count = await self.custom_tf_generator.generate_from_gap(interval)
                    if count > 0:
                        logger.info(f"{interval}: Generated {count} candles")
                    else:
                        logger.info(f"{interval}: Up to date")
                except Exception as e:
                    logger.error(f"Failed to regenerate {interval}: {e}")
        except Exception as e:
            logger.error(f"Custom timeframe regeneration error: {e}")

        logger.info("Custom timeframe check complete")

    @staticmethod
    async def _shutdown_component(name: str, shutdown: Callable[[], Awaitable[Any]]) -> None:
        """Await one component's shutdown coroutine, logging instead of raising.

        Keeping failures local ensures one broken component cannot prevent
        the remaining components from being released.
        """
        try:
            await shutdown()
        except Exception:
            logger.exception(f"Error while stopping {name}")

    async def _drain_background_tasks(self) -> None:
        """Let in-flight background tasks finish, cancelling any that overrun the timeout."""
        pending = {task for task in self._background_tasks if not task.done()}
        if not pending:
            return

        _, overdue = await asyncio.wait(
            pending, timeout=self._BACKGROUND_DRAIN_TIMEOUT_SECONDS
        )
        for task in overdue:
            task.cancel()
        if overdue:
            await asyncio.gather(*overdue, return_exceptions=True)

    async def stop(self) -> None:
        """Graceful shutdown.

        Safe to call more than once and from several tasks: the first call
        performs the shutdown, later calls wait until it has completed.
        Also cleans up after a start() that failed part-way, when some
        components exist although the collector never reached running state.
        """
        if self._shutdown_started:
            # Another caller is (or was) shutting down; wait for it so the
            # event loop is not torn down in the middle of the cleanup.
            await self._shutdown_complete.wait()
            return

        nothing_started = not self.is_running and all(
            component is None
            for component in (self.db, self.buffer, self.websocket)
        )
        if nothing_started:
            return

        self._shutdown_started = True
        logger.info("Stopping DataCollector...")

        try:
            self.is_running = False
            self._stop_event.set()

            # Cancel tasks
            for task in self._tasks:
                if not task.done():
                    task.cancel()

            # Wait for tasks to complete
            if self._tasks:
                await asyncio.gather(*self._tasks, return_exceptions=True)

            # Stop components: stop the data source first, then flush the
            # buffer, then let dependent background work finish, and only
            # then close the database they all write to.
            if self.websocket is not None:
                await self._shutdown_component("WebSocket", self.websocket.disconnect)

            if self.buffer is not None:
                await self._shutdown_component("buffer", self.buffer.stop)

            await self._drain_background_tasks()

            if self.db is not None:
                await self._shutdown_component("database", self.db.disconnect)
        finally:
            self._shutdown_complete.set()

        logger.info("DataCollector stopped")

    async def _on_candle(self, candle: Candle) -> None:
        """Handle incoming candle from WebSocket by adding it to the buffer.

        Errors are logged and swallowed so a single bad candle does not
        break the caller.
        """
        try:
            # Add to buffer
            await self.buffer.add(candle)
            logger.debug(f"Received candle: {candle.time} - Close: {candle.close}")
        except Exception as e:
            logger.error(f"Error processing candle: {e}")

    def _spawn_background(self, coro: Coroutine[Any, Any, Any], name: str) -> asyncio.Task:
        """Schedule *coro* as a task and hold a reference until it completes."""
        task = asyncio.create_task(coro, name=name)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _on_buffer_flush(self, candles: list) -> None:
        """Handle buffer flush - write to database and update custom timeframes.

        Raises:
            Exception: Database write errors are logged and re-raised so
                the buffer can retry the batch.
        """
        try:
            inserted = await self.db.insert_candles(candles)
            logger.info(f"Flushed {inserted} candles to database")

            # Update custom timeframes (37m, 148m) in background
            if self.custom_tf_generator and inserted > 0:
                self._spawn_background(
                    self._update_custom_timeframes(candles),
                    name="custom_tf_update"
                )
        except Exception as e:
            logger.error(f"Failed to write candles to database: {e}")
            raise  # Re-raise to trigger buffer retry

    async def _update_custom_timeframes(self, candles: list) -> None:
        """
        Update custom timeframes in background, then trigger indicators/brain.

        This chain ensures that indicators are computed on fresh candle data,
        and the brain evaluates on fresh indicator data. An empty batch is a
        no-op. Errors are logged and never raised (non-critical path).
        """
        if not candles:
            return

        try:
            # 1. Update custom candles (37m, 148m, etc.)
            if self.custom_tf_generator is not None:
                await self.custom_tf_generator.update_realtime(candles)
                logger.debug("Custom timeframes updated")

            if not self.indicator_engine:
                return

            # 2. Trigger indicator updates for configured intervals
            # We use the timestamp of the last candle as the trigger point
            trigger_time = candles[-1].time

            for interval in self.indicator_engine.get_configured_intervals():
                # Get the correct bucket start time for this interval
                # e.g., if trigger_time is 09:48:00, 37m bucket might start at 09:25:00
                if self.custom_tf_generator:
                    bucket_start = self.custom_tf_generator.get_bucket_start(trigger_time, interval)
                else:
                    bucket_start = trigger_time

                # Compute indicators for this bucket
                raw_indicators = await self.indicator_engine.on_interval_update(
                    self.symbol, interval, bucket_start
                )

                # Filter out None values to satisfy type checker
                indicators = {k: v for k, v in raw_indicators.items() if v is not None}

                # 3. Evaluate brain if we have fresh indicators
                if self.brain and indicators:
                    await self.brain.evaluate(
                        self.symbol, interval, bucket_start, indicators
                    )

        except Exception as e:
            logger.error(f"Failed to update custom timeframes/indicators: {e}")
            # Don't raise - this is non-critical

    async def _on_error(self, error: Exception) -> None:
        """Handle WebSocket errors"""
        logger.error(f"WebSocket error: {error}")
        # Could implement alerting here (Telegram, etc.)

    async def _health_check_loop(self) -> None:
        """Periodic health checks: once a minute, log WebSocket and buffer stats."""
        while self.is_running:
            try:
                await asyncio.sleep(60)  # Check every minute
                if not self.is_running:
                    break

                # Check WebSocket health
                health = self.websocket.get_connection_health()
                silence = health['seconds_since_last_message']
                if silence and silence > 120:
                    logger.warning("No messages received for 2+ minutes")
                    # Could trigger reconnection or alert

                # Log stats
                buffer_stats = self.buffer.get_stats()
                logger.info(f"Health: {health}, Buffer: {buffer_stats.to_dict()}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in health check: {e}")

    async def _monitoring_loop(self) -> None:
        """Periodic monitoring and maintenance tasks (gap repair, DB health) every 5 minutes."""
        while self.is_running:
            try:
                await asyncio.sleep(300)  # Every 5 minutes
                if not self.is_running:
                    break

                # Detect gaps
                gaps = await self.db.detect_gaps(self.symbol, self.interval)
                if gaps:
                    logger.warning(f"Detected {len(gaps)} data gaps: {gaps}")
                    await self._backfill_gaps(gaps)

                # Log database health
                health = await self.db.get_health_stats()
                logger.info(f"Database health: {health}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}")

    async def _backfill_gaps(self, gaps: list) -> None:
        """Backfill detected data gaps from Hyperliquid.

        Args:
            gaps: Gap records carrying 'gap_start', 'gap_end' (ISO-8601
                strings) and 'missing_candles'.

        Errors are logged and not raised.
        """
        if not gaps:
            return

        logger.info(f"Starting backfill for {len(gaps)} gaps...")

        try:
            async with HyperliquidBackfill(self.db, self.symbol, [self.interval]) as backfill:
                for gap in gaps:
                    gap_start, gap_end = self._parse_gap_bounds(gap)

                    logger.info(f"Backfilling gap: {gap_start} to {gap_end} ({gap['missing_candles']} candles)")

                    candles = await backfill.fetch_candles(self.interval, gap_start, gap_end)

                    if not candles:
                        logger.warning(f"No candles available for gap {gap_start} to {gap_end}")
                        continue

                    inserted = await self.db.insert_candles(candles)
                    logger.info(f"Backfilled {inserted} candles for gap {gap_start}")

                    # Update custom timeframes and indicators for backfilled data
                    if inserted > 0:
                        await self._update_custom_timeframes(candles)

        except Exception as e:
            logger.error(f"Backfill failed: {e}")

    def _setup_signal_handlers(self) -> None:
        """Setup handlers for graceful shutdown.

        The handlers only set the stop event; start() then performs the
        shutdown in its own task, so cleanup cannot be cancelled when the
        event loop winds down. Must be called while the event loop is
        running.
        """
        loop = asyncio.get_running_loop()

        def request_stop(sig: int) -> None:
            logger.info(f"Received signal {sig}, shutting down...")
            self._stop_event.set()

        def fallback_handler(sig, frame):
            # signal.signal handlers run outside the loop's callback
            # machinery, so hand the work over to the loop thread-safely.
            loop.call_soon_threadsafe(request_stop, sig)

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, request_stop, int(sig))
            except NotImplementedError:
                # Event loops without add_signal_handler support (e.g. on Windows)
                signal.signal(sig, fallback_handler)


async def main():
    """Main entry point: run a BTC 1m collector until it is stopped."""
    collector = DataCollector(
        symbol="BTC",
        interval="1m"
    )

    try:
        await collector.start()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error: {type(e).__name__}: {e!r}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())