Implement custom timeframes (37m, 148m) with real-time refresh for all intervals
This commit is contained in:
299
src/data_collector/custom_timeframe_generator.py
Normal file
299
src/data_collector/custom_timeframe_generator.py
Normal file
@@ -0,0 +1,299 @@
|
||||
"""
|
||||
Custom Timeframe Generator
|
||||
Generates both standard and custom timeframes from 1m data
|
||||
Updates "building" candles in real-time
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import calendar
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Optional, Dict, Tuple
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .database import DatabaseManager
|
||||
from .websocket_client import Candle
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class CustomCandle(Candle):
    """Candle extended with a completion flag.

    ``is_complete`` marks whether the bucket held its full complement of
    source candles at aggregation time; it is persisted to the ``validated``
    column so "building" (partial) candles can be overwritten later.
    """
    # True once the bucket contains the expected number of source candles.
    is_complete: bool = True
|
||||
|
||||
|
||||
class CustomTimeframeGenerator:
    """
    Manages and generates multiple timeframes from 1m candles.

    Standard intervals use clock/calendar-aligned boundaries (e.g. 5m candles
    start at :00, :05, ...). Custom intervals (37m, 148m) use continuous
    bucketing anchored at the first recorded 1m candle, so their buckets have
    no relation to clock boundaries.
    """

    # Standard intervals (Hyperliquid supported)
    STANDARD_INTERVALS = {
        '3m': {'type': 'min', 'value': 3},
        '5m': {'type': 'min', 'value': 5},
        '15m': {'type': 'min', 'value': 15},
        '30m': {'type': 'min', 'value': 30},
        '1h': {'type': 'hour', 'value': 1},
        '2h': {'type': 'hour', 'value': 2},
        '4h': {'type': 'hour', 'value': 4},
        '8h': {'type': 'hour', 'value': 8},
        '12h': {'type': 'hour', 'value': 12},
        '1d': {'type': 'day', 'value': 1},
        '3d': {'type': 'day', 'value': 3},
        '1w': {'type': 'week', 'value': 1},
        '1M': {'type': 'month', 'value': 1}
    }

    # Custom intervals: 37m is aggregated from 1m; 148m from 37m (4 x 37m).
    CUSTOM_INTERVALS = {
        '37m': {'minutes': 37, 'source': '1m'},
        '148m': {'minutes': 148, 'source': '37m'}
    }

    def __init__(self, db: DatabaseManager):
        self.db = db
        # Timestamp of the earliest 1m candle; anchor for custom bucketing.
        self.first_1m_time: Optional[datetime] = None
        # Fixed anchor date so 3d buckets stay stable across restarts.
        self.anchor_3d = datetime(2020, 1, 1, tzinfo=timezone.utc)

    async def initialize(self) -> None:
        """Get first 1m timestamp for custom continuous bucketing."""
        async with self.db.acquire() as conn:
            first = await conn.fetchval("""
                SELECT MIN(time)
                FROM candles
                WHERE interval = '1m' AND symbol = 'BTC'
            """)
            if first:
                self.first_1m_time = first
                logger.info(f"TF Generator: First 1m candle at {first}")
            else:
                logger.warning("TF Generator: No 1m data found")

    def get_bucket_start(self, timestamp: datetime, interval: str) -> datetime:
        """Return the start of the bucket containing ``timestamp``.

        Custom intervals bucket continuously from ``first_1m_time``; standard
        intervals snap to clock/calendar boundaries. Unknown intervals fall
        back to returning ``timestamp`` unchanged.
        """
        # Custom intervals: fixed-size buckets counted from the first 1m candle.
        if interval in self.CUSTOM_INTERVALS:
            if not self.first_1m_time:
                return timestamp  # Fallback if not initialized
            minutes = self.CUSTOM_INTERVALS[interval]['minutes']
            delta = timestamp - self.first_1m_time
            bucket_num = int(delta.total_seconds() // (minutes * 60))
            return self.first_1m_time + timedelta(minutes=bucket_num * minutes)

        # Handle standard intervals
        if interval not in self.STANDARD_INTERVALS:
            return timestamp

        cfg = self.STANDARD_INTERVALS[interval]
        t = timestamp.replace(second=0, microsecond=0)

        if cfg['type'] == 'min':
            n = cfg['value']
            return t - timedelta(minutes=t.minute % n)
        elif cfg['type'] == 'hour':
            n = cfg['value']
            t = t.replace(minute=0)
            return t - timedelta(hours=t.hour % n)
        elif cfg['type'] == 'day':
            n = cfg['value']
            t = t.replace(hour=0, minute=0)
            if n == 1:
                return t
            else:  # 3d: align to buckets counted from the fixed anchor date
                days_since_anchor = (t - self.anchor_3d).days
                return t - timedelta(days=days_since_anchor % n)
        elif cfg['type'] == 'week':
            t = t.replace(hour=0, minute=0)
            return t - timedelta(days=t.weekday())  # Monday start
        elif cfg['type'] == 'month':
            return t.replace(day=1, hour=0, minute=0)

        return t

    def get_expected_1m_count(self, bucket_start: datetime, interval: str) -> int:
        """Return the number of 1m candles a *full* bucket of ``interval`` holds.

        ``bucket_start`` is only needed for '1M', whose length varies by month.
        Unknown intervals default to 1.
        """
        if interval in self.CUSTOM_INTERVALS:
            return self.CUSTOM_INTERVALS[interval]['minutes']

        if interval in self.STANDARD_INTERVALS:
            cfg = self.STANDARD_INTERVALS[interval]
            if cfg['type'] == 'min':
                return cfg['value']
            if cfg['type'] == 'hour':
                return cfg['value'] * 60
            if cfg['type'] == 'day':
                return cfg['value'] * 1440
            if cfg['type'] == 'week':
                return 7 * 1440
            if cfg['type'] == 'month':
                # Month length varies; look it up for this specific bucket.
                _, days = calendar.monthrange(bucket_start.year, bucket_start.month)
                return days * 1440
        return 1

    def _next_bucket_start(self, bucket_start: datetime, interval: str) -> datetime:
        """Return the start of the bucket following ``bucket_start``.

        This doubles as the exclusive end of the current bucket. Shared by
        aggregation (bucket end) and historical generation (cursor advance)
        so the two can never drift apart. Unknown intervals advance by one
        minute as a defensive fallback.
        """
        if interval == '1M':
            _, days = calendar.monthrange(bucket_start.year, bucket_start.month)
            return bucket_start + timedelta(days=days)
        if interval in self.STANDARD_INTERVALS:
            cfg = self.STANDARD_INTERVALS[interval]
            if cfg['type'] == 'min':
                return bucket_start + timedelta(minutes=cfg['value'])
            if cfg['type'] == 'hour':
                return bucket_start + timedelta(hours=cfg['value'])
            if cfg['type'] == 'day':
                return bucket_start + timedelta(days=cfg['value'])
            return bucket_start + timedelta(weeks=1)  # week ('1M' handled above)
        if interval in self.CUSTOM_INTERVALS:
            minutes = self.CUSTOM_INTERVALS[interval]['minutes']
            return bucket_start + timedelta(minutes=minutes)
        return bucket_start + timedelta(minutes=1)

    async def aggregate_and_upsert(self, symbol: str, interval: str, bucket_start: datetime) -> None:
        """Aggregate source data for one bucket and upsert the result.

        148m candles are built from 37m candles; every other interval is
        built directly from 1m. The candle is flagged complete only when the
        bucket holds its full complement of source rows; partial ("building")
        candles are stored too and overwritten as more source data arrives.
        """
        if interval == '148m':
            # 148m = 4 x 37m, so aggregate from the 37m series.
            source_interval = '37m'
            expected_count = 4
        else:
            source_interval = '1m'
            expected_count = self.get_expected_1m_count(bucket_start, interval)

        bucket_end = self._next_bucket_start(bucket_start, interval)

        async with self.db.acquire() as conn:
            rows = await conn.fetch("""
                SELECT time, open, high, low, close, volume
                FROM candles
                WHERE symbol = $1 AND interval = $2
                AND time >= $3 AND time < $4
                ORDER BY time ASC
            """, symbol, source_interval, bucket_start, bucket_end)

        if not rows:
            return

        is_complete = len(rows) >= expected_count

        # OHLCV aggregation: first open, max high, min low, last close, summed volume.
        candle = CustomCandle(
            time=bucket_start,
            symbol=symbol,
            interval=interval,
            open=float(rows[0]['open']),
            high=max(float(r['high']) for r in rows),
            low=min(float(r['low']) for r in rows),
            close=float(rows[-1]['close']),
            volume=sum(float(r['volume']) for r in rows),
            is_complete=is_complete
        )

        await self._upsert_candle(candle)

    async def _upsert_candle(self, c: CustomCandle) -> None:
        """Upsert a single candle; ``is_complete`` maps to the ``validated`` column."""
        async with self.db.acquire() as conn:
            await conn.execute("""
                INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (time, symbol, interval) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close,
                    volume = EXCLUDED.volume,
                    validated = EXCLUDED.validated,
                    created_at = NOW()
            """, c.time, c.symbol, c.interval, c.open, c.high, c.low, c.close, c.volume, c.is_complete)

    async def update_realtime(self, new_1m_candles: List[Candle]) -> None:
        """
        Update ALL timeframes (standard and custom) based on new 1m candles.
        Called after 1m buffer flush.
        """
        if not new_1m_candles:
            return

        if not self.first_1m_time:
            await self.initialize()

        symbol = new_1m_candles[0].symbol

        # 1. Update all intervals derived directly from 1m (standard + 37m).
        #    A flush may straddle a bucket boundary, so refresh the distinct
        #    buckets touched by ANY new candle — using only the last candle's
        #    bucket would leave the just-closed bucket missing its final data.
        intervals_to_update = list(self.STANDARD_INTERVALS.keys()) + ['37m']

        tasks = []
        for interval in intervals_to_update:
            bucket_starts = {self.get_bucket_start(c.time, interval) for c in new_1m_candles}
            for bucket_start in bucket_starts:
                tasks.append(self.aggregate_and_upsert(symbol, interval, bucket_start))

        await asyncio.gather(*tasks)

        # 2. Update 148m last: it aggregates 37m, which must be current first.
        for bucket_start in {self.get_bucket_start(c.time, '148m') for c in new_1m_candles}:
            await self.aggregate_and_upsert(symbol, '148m', bucket_start)

    async def generate_historical(self, interval: str, batch_size: int = 5000) -> int:
        """
        Force recalculation of all candles for a timeframe from source data.

        Walks every bucket from the first to the last available source candle
        and re-aggregates it. Returns the number of buckets processed.
        ``batch_size`` is kept for interface stability but currently unused.
        """
        if not self.first_1m_time:
            await self.initialize()

        if not self.first_1m_time:
            return 0

        # Custom intervals declare their own source series; default to 1m.
        config = self.CUSTOM_INTERVALS.get(interval) or {'source': '1m'}
        source_interval = config.get('source', '1m')

        logger.info(f"Generating historical {interval} from {source_interval}...")

        # Get date range available in source data
        async with self.db.acquire() as conn:
            min_max = await conn.fetchrow("""
                SELECT MIN(time), MAX(time) FROM candles
                WHERE symbol = 'BTC' AND interval = $1
            """, source_interval)

        if not min_max or not min_max[0]:
            return 0

        curr = self.get_bucket_start(min_max[0], interval)
        end = min_max[1]

        total_processed = 0
        while curr <= end:
            await self.aggregate_and_upsert('BTC', interval, curr)
            total_processed += 1
            curr = self._next_bucket_start(curr, interval)

            if total_processed % 100 == 0:
                logger.info(f"Generated {total_processed} {interval} candles...")
                # Yield to the event loop so real-time tasks are not starved.
                await asyncio.sleep(0.01)

        return total_processed

    async def verify_integrity(self, interval: str) -> Dict:
        """Return summary stats for an interval: counts, time range, and
        complete/incomplete ("building") candle counts via ``validated``."""
        async with self.db.acquire() as conn:
            stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_candles,
                    MIN(time) as earliest,
                    MAX(time) as latest,
                    COUNT(*) FILTER (WHERE validated = TRUE) as complete_candles,
                    COUNT(*) FILTER (WHERE validated = FALSE) as incomplete_candles
                FROM candles
                WHERE interval = $1 AND symbol = 'BTC'
            """, interval)
            return dict(stats) if stats else {}
|
||||
Reference in New Issue
Block a user