diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api index 4da9afb..8432790 100644 --- a/docker/Dockerfile.api +++ b/docker/Dockerfile.api @@ -11,6 +11,7 @@ RUN pip install --no-cache-dir -r requirements.txt # Copy application code COPY src/ ./src/ COPY config/ ./config/ +COPY scripts/ ./scripts/ # Set Python path ENV PYTHONPATH=/app diff --git a/docker/Dockerfile.collector b/docker/Dockerfile.collector index 89fe450..b165c20 100644 --- a/docker/Dockerfile.collector +++ b/docker/Dockerfile.collector @@ -12,6 +12,7 @@ RUN pip install --no-cache-dir -r requirements.txt # Copy application code COPY src/ ./src/ COPY config/ ./config/ +COPY scripts/ ./scripts/ # Set Python path ENV PYTHONPATH=/app diff --git a/scripts/generate_custom_timeframes.py b/scripts/generate_custom_timeframes.py new file mode 100644 index 0000000..8ad0ae9 --- /dev/null +++ b/scripts/generate_custom_timeframes.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +""" +Generate custom timeframes (37m, 148m) from historical 1m data +Run once to backfill all historical data +""" + +import asyncio +import argparse +import logging +import sys +from pathlib import Path + +# Add parent to path +sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) + +from data_collector.database import DatabaseManager +from data_collector.custom_timeframe_generator import CustomTimeframeGenerator + + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def main(): + parser = argparse.ArgumentParser(description='Generate custom timeframe candles') + parser.add_argument('--interval', + default='all', + help='Which interval to generate (default: all, choices: 3m, 5m, 1h, 37m, etc.)') + parser.add_argument('--batch-size', type=int, default=5000, + help='Number of source candles per batch') + parser.add_argument('--verify', action='store_true', + help='Verify integrity after generation') + + args = 
parser.parse_args() + + # Initialize database + db = DatabaseManager() + await db.connect() + + try: + generator = CustomTimeframeGenerator(db) + await generator.initialize() + + if not generator.first_1m_time: + logger.error("No 1m data found in database. Cannot generate custom timeframes.") + return 1 + + if args.interval == 'all': + intervals = list(generator.STANDARD_INTERVALS.keys()) + list(generator.CUSTOM_INTERVALS.keys()) + else: + intervals = [args.interval] + + for interval in intervals: + logger.info("=" * 60) + logger.info(f"Generating {interval} candles") + logger.info("=" * 60) + + # Generate historical data + count = await generator.generate_historical( + interval=interval, + batch_size=args.batch_size + ) + + logger.info(f"Generated {count} {interval} candles") + + # Verify if requested + if args.verify: + logger.info(f"Verifying {interval} integrity...") + stats = await generator.verify_integrity(interval) + logger.info(f"Stats: {stats}") + + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + return 1 + finally: + await db.disconnect() + + logger.info("Custom timeframe generation complete!") + return 0 + + +if __name__ == '__main__': + exit_code = asyncio.run(main()) + sys.exit(exit_code) diff --git a/src/api/dashboard/static/index.html b/src/api/dashboard/static/index.html index 021cd8f..d1e4c27 100644 --- a/src/api/dashboard/static/index.html +++ b/src/api/dashboard/static/index.html @@ -476,7 +476,7 @@ this.chart = null; this.candleSeries = null; this.currentInterval = '1d'; - this.intervals = ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '8h', '12h', '1d', '3d', '1w', '1M']; + this.intervals = ['1m', '3m', '5m', '15m', '30m', '37m', '1h', '2h', '4h', '8h', '12h', '1d', '3d', '1w', '1M']; this.allData = new Map(); this.isLoading = false; this.hasInitialLoad = false; @@ -558,8 +558,8 @@ if (e.target.tagName === 'INPUT' || e.target.tagName === 'BUTTON') return; const shortcuts = { - '1': '1m', '2': '3m', '3': '5m', '4': 
'15m', '5': '30m', - '6': '1h', '7': '2h', '8': '4h', '9': '8h', '0': '12h', + '1': '1m', '2': '3m', '3': '5m', '4': '15m', '5': '30m', '7': '37m', + '6': '1h', '8': '4h', '9': '8h', '0': '12h', 'd': '1d', 'D': '1d', 'w': '1w', 'W': '1w', 'm': '1M', 'M': '1M' }; diff --git a/src/data_collector/__init__.py b/src/data_collector/__init__.py index c8736ac..9eadded 100644 --- a/src/data_collector/__init__.py +++ b/src/data_collector/__init__.py @@ -3,11 +3,13 @@ from .websocket_client import HyperliquidWebSocket, Candle from .candle_buffer import CandleBuffer from .database import DatabaseManager from .backfill import HyperliquidBackfill +from .custom_timeframe_generator import CustomTimeframeGenerator __all__ = [ 'HyperliquidWebSocket', 'Candle', 'CandleBuffer', 'DatabaseManager', - 'HyperliquidBackfill' + 'HyperliquidBackfill', + 'CustomTimeframeGenerator' ] \ No newline at end of file diff --git a/src/data_collector/custom_timeframe_generator.py b/src/data_collector/custom_timeframe_generator.py new file mode 100644 index 0000000..b3e782a --- /dev/null +++ b/src/data_collector/custom_timeframe_generator.py @@ -0,0 +1,299 @@ +""" +Custom Timeframe Generator +Generates both standard and custom timeframes from 1m data +Updates "building" candles in real-time +""" + +import asyncio +import logging +import calendar +from datetime import datetime, timedelta, timezone +from typing import List, Optional, Dict, Tuple +from dataclasses import dataclass + +from .database import DatabaseManager +from .websocket_client import Candle + + +logger = logging.getLogger(__name__) + + +@dataclass +class CustomCandle(Candle): + """Extended candle with completion flag""" + is_complete: bool = True + + +class CustomTimeframeGenerator: + """ + Manages and generates multiple timeframes from 1m candles. + Standard intervals use clock-aligned boundaries. + Custom intervals use continuous bucketing from the first recorded 1m candle. 
+ """ + + # Standard intervals (Hyperliquid supported) + STANDARD_INTERVALS = { + '3m': {'type': 'min', 'value': 3}, + '5m': {'type': 'min', 'value': 5}, + '15m': {'type': 'min', 'value': 15}, + '30m': {'type': 'min', 'value': 30}, + '1h': {'type': 'hour', 'value': 1}, + '2h': {'type': 'hour', 'value': 2}, + '4h': {'type': 'hour', 'value': 4}, + '8h': {'type': 'hour', 'value': 8}, + '12h': {'type': 'hour', 'value': 12}, + '1d': {'type': 'day', 'value': 1}, + '3d': {'type': 'day', 'value': 3}, + '1w': {'type': 'week', 'value': 1}, + '1M': {'type': 'month', 'value': 1} + } + + # Custom intervals + CUSTOM_INTERVALS = { + '37m': {'minutes': 37, 'source': '1m'}, + '148m': {'minutes': 148, 'source': '37m'} + } + + def __init__(self, db: DatabaseManager): + self.db = db + self.first_1m_time: Optional[datetime] = None + # Anchor for 3d candles (fixed date) + self.anchor_3d = datetime(2020, 1, 1, tzinfo=timezone.utc) + + async def initialize(self) -> None: + """Get first 1m timestamp for custom continuous bucketing""" + async with self.db.acquire() as conn: + first = await conn.fetchval(""" + SELECT MIN(time) + FROM candles + WHERE interval = '1m' AND symbol = 'BTC' + """) + if first: + self.first_1m_time = first + logger.info(f"TF Generator: First 1m candle at {first}") + else: + logger.warning("TF Generator: No 1m data found") + + def get_bucket_start(self, timestamp: datetime, interval: str) -> datetime: + """Calculate bucket start time for any interval""" + # Handle custom intervals + if interval in self.CUSTOM_INTERVALS: + if not self.first_1m_time: + return timestamp # Fallback if not initialized + minutes = self.CUSTOM_INTERVALS[interval]['minutes'] + delta = timestamp - self.first_1m_time + bucket_num = int(delta.total_seconds() // (minutes * 60)) + return self.first_1m_time + timedelta(minutes=bucket_num * minutes) + + # Handle standard intervals + if interval not in self.STANDARD_INTERVALS: + return timestamp + + cfg = self.STANDARD_INTERVALS[interval] + t = 
timestamp.replace(second=0, microsecond=0) + + if cfg['type'] == 'min': + n = cfg['value'] + return t - timedelta(minutes=t.minute % n) + elif cfg['type'] == 'hour': + n = cfg['value'] + t = t.replace(minute=0) + return t - timedelta(hours=t.hour % n) + elif cfg['type'] == 'day': + n = cfg['value'] + t = t.replace(hour=0, minute=0) + if n == 1: + return t + else: # 3d + days_since_anchor = (t - self.anchor_3d).days + return t - timedelta(days=days_since_anchor % n) + elif cfg['type'] == 'week': + t = t.replace(hour=0, minute=0) + return t - timedelta(days=t.weekday()) # Monday start + elif cfg['type'] == 'month': + return t.replace(day=1, hour=0, minute=0) + + return t + + def get_expected_1m_count(self, bucket_start: datetime, interval: str) -> int: + """Calculate expected number of 1m candles in a full bucket""" + if interval in self.CUSTOM_INTERVALS: + return self.CUSTOM_INTERVALS[interval]['minutes'] + + if interval in self.STANDARD_INTERVALS: + cfg = self.STANDARD_INTERVALS[interval] + if cfg['type'] == 'min': return cfg['value'] + if cfg['type'] == 'hour': return cfg['value'] * 60 + if cfg['type'] == 'day': return cfg['value'] * 1440 + if cfg['type'] == 'week': return 7 * 1440 + if cfg['type'] == 'month': + _, days = calendar.monthrange(bucket_start.year, bucket_start.month) + return days * 1440 + return 1 + + async def aggregate_and_upsert(self, symbol: str, interval: str, bucket_start: datetime) -> None: + """Aggregate 1m data for a specific bucket and upsert""" + bucket_end = bucket_start # Initialize + + if interval == '148m': + # Aggregate from 37m + source_interval = '37m' + expected_count = 4 + else: + source_interval = '1m' + expected_count = self.get_expected_1m_count(bucket_start, interval) + + # Calculate bucket end + if interval == '1M': + _, days = calendar.monthrange(bucket_start.year, bucket_start.month) + bucket_end = bucket_start + timedelta(days=days) + elif interval in self.STANDARD_INTERVALS: + cfg = self.STANDARD_INTERVALS[interval] + if 
cfg['type'] == 'min': bucket_end = bucket_start + timedelta(minutes=cfg['value']) + elif cfg['type'] == 'hour': bucket_end = bucket_start + timedelta(hours=cfg['value']) + elif cfg['type'] == 'day': bucket_end = bucket_start + timedelta(days=cfg['value']) + elif cfg['type'] == 'week': bucket_end = bucket_start + timedelta(weeks=1) + elif interval in self.CUSTOM_INTERVALS: + minutes = self.CUSTOM_INTERVALS[interval]['minutes'] + bucket_end = bucket_start + timedelta(minutes=minutes) + else: + bucket_end = bucket_start + timedelta(minutes=1) + + async with self.db.acquire() as conn: + rows = await conn.fetch(f""" + SELECT time, open, high, low, close, volume + FROM candles + WHERE symbol = $1 AND interval = $2 + AND time >= $3 AND time < $4 + ORDER BY time ASC + """, symbol, source_interval, bucket_start, bucket_end) + + if not rows: + return + + # Aggregate + is_complete = len(rows) >= expected_count + + candle = CustomCandle( + time=bucket_start, + symbol=symbol, + interval=interval, + open=float(rows[0]['open']), + high=max(float(r['high']) for r in rows), + low=min(float(r['low']) for r in rows), + close=float(rows[-1]['close']), + volume=sum(float(r['volume']) for r in rows), + is_complete=is_complete + ) + + await self._upsert_candle(candle) + + async def _upsert_candle(self, c: CustomCandle) -> None: + """Upsert a single candle""" + async with self.db.acquire() as conn: + await conn.execute(""" + INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (time, symbol, interval) DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + validated = EXCLUDED.validated, + created_at = NOW() + """, c.time, c.symbol, c.interval, c.open, c.high, c.low, c.close, c.volume, c.is_complete) + + async def update_realtime(self, new_1m_candles: List[Candle]) -> None: + """ + Update ALL timeframes (standard 
and custom) based on new 1m candles. + Called after 1m buffer flush. + """ + if not new_1m_candles: + return + + if not self.first_1m_time: + await self.initialize() + + symbol = new_1m_candles[0].symbol + + # 1. Update all intervals except 1m and 148m + intervals_to_update = list(self.STANDARD_INTERVALS.keys()) + ['37m'] + + tasks = [] + for interval in intervals_to_update: + bucket_start = self.get_bucket_start(new_1m_candles[-1].time, interval) + tasks.append(self.aggregate_and_upsert(symbol, interval, bucket_start)) + + await asyncio.gather(*tasks) + + # 2. Update 148m (it depends on 37m being updated first) + bucket_148m = self.get_bucket_start(new_1m_candles[-1].time, '148m') + await self.aggregate_and_upsert(symbol, '148m', bucket_148m) + + async def generate_historical(self, interval: str, batch_size: int = 5000) -> int: + """ + Force recalculation of all candles for a timeframe from 1m data. + """ + if not self.first_1m_time: + await self.initialize() + + if not self.first_1m_time: + return 0 + + config = self.CUSTOM_INTERVALS.get(interval) or {'source': '1m'} + source_interval = config.get('source', '1m') + + logger.info(f"Generating historical {interval} from {source_interval}...") + + # Get date range available in source data + async with self.db.acquire() as conn: + min_max = await conn.fetchrow(""" + SELECT MIN(time), MAX(time) FROM candles + WHERE symbol = 'BTC' AND interval = $1 + """, source_interval) + + if not min_max or not min_max[0]: + return 0 + + curr = self.get_bucket_start(min_max[0], interval) + end = min_max[1] + + total_inserted = 0 + while curr <= end: + await self.aggregate_and_upsert('BTC', interval, curr) + total_inserted += 1 + + # Advance curr + if interval == '1M': + _, days = calendar.monthrange(curr.year, curr.month) + curr += timedelta(days=days) + elif interval in self.STANDARD_INTERVALS: + cfg = self.STANDARD_INTERVALS[interval] + if cfg['type'] == 'min': curr += timedelta(minutes=cfg['value']) + elif cfg['type'] == 'hour': 
curr += timedelta(hours=cfg['value']) + elif cfg['type'] == 'day': curr += timedelta(days=cfg['value']) + elif cfg['type'] == 'week': curr += timedelta(weeks=1) + else: # Custom + minutes = self.CUSTOM_INTERVALS[interval]['minutes'] + curr += timedelta(minutes=minutes) + + if total_inserted % 100 == 0: + logger.info(f"Generated {total_inserted} {interval} candles...") + await asyncio.sleep(0.01) + + return total_inserted + + async def verify_integrity(self, interval: str) -> Dict: + async with self.db.acquire() as conn: + stats = await conn.fetchrow(""" + SELECT + COUNT(*) as total_candles, + MIN(time) as earliest, + MAX(time) as latest, + COUNT(*) FILTER (WHERE validated = TRUE) as complete_candles, + COUNT(*) FILTER (WHERE validated = FALSE) as incomplete_candles + FROM candles + WHERE interval = $1 AND symbol = 'BTC' + """, interval) + return dict(stats) if stats else {} diff --git a/src/data_collector/main.py b/src/data_collector/main.py index 52de0ec..b52c2f4 100644 --- a/src/data_collector/main.py +++ b/src/data_collector/main.py @@ -14,6 +14,7 @@ import os from .websocket_client import HyperliquidWebSocket, Candle from .candle_buffer import CandleBuffer from .database import DatabaseManager +from .custom_timeframe_generator import CustomTimeframeGenerator # Configure logging @@ -47,6 +48,7 @@ class DataCollector: self.db: Optional[DatabaseManager] = None self.buffer: Optional[CandleBuffer] = None self.websocket: Optional[HyperliquidWebSocket] = None + self.custom_tf_generator: Optional[CustomTimeframeGenerator] = None # State self.is_running = False @@ -62,6 +64,10 @@ class DataCollector: self.db = DatabaseManager() await self.db.connect() + # Initialize custom timeframe generator + self.custom_tf_generator = CustomTimeframeGenerator(self.db) + await self.custom_tf_generator.initialize() + # Initialize buffer self.buffer = CandleBuffer( max_size=1000, @@ -144,13 +150,29 @@ class DataCollector: logger.error(f"Error processing candle: {e}") async def 
_on_buffer_flush(self, candles: list) -> None: - """Handle buffer flush - write to database""" + """Handle buffer flush - write to database and update custom timeframes""" try: inserted = await self.db.insert_candles(candles) logger.info(f"Flushed {inserted} candles to database") + + # Update custom timeframes (37m, 148m) in background + if self.custom_tf_generator and inserted > 0: + asyncio.create_task( + self._update_custom_timeframes(candles), + name="custom_tf_update" + ) except Exception as e: logger.error(f"Failed to write candles to database: {e}") raise # Re-raise to trigger buffer retry + + async def _update_custom_timeframes(self, candles: list) -> None: + """Update custom timeframes in background (non-blocking)""" + try: + await self.custom_tf_generator.update_realtime(candles) + logger.debug("Custom timeframes updated") + except Exception as e: + logger.error(f"Failed to update custom timeframes: {e}") + # Don't raise - this is non-critical async def _on_error(self, error: Exception) -> None: """Handle WebSocket errors"""