Fix concurrent connection usage in multi-timeframe generator
This commit is contained in:
@ -131,7 +131,7 @@ class CustomTimeframeGenerator:
|
||||
return days * 1440
|
||||
return 1
|
||||
|
||||
async def aggregate_and_upsert(self, symbol: str, interval: str, bucket_start: datetime) -> None:
|
||||
async def aggregate_and_upsert(self, symbol: str, interval: str, bucket_start: datetime, conn=None) -> None:
|
||||
"""Aggregate 1m data for a specific bucket and upsert"""
|
||||
bucket_end = bucket_start # Initialize
|
||||
|
||||
@ -159,55 +159,70 @@ class CustomTimeframeGenerator:
|
||||
else:
|
||||
bucket_end = bucket_start + timedelta(minutes=1)
|
||||
|
||||
async with self.db.acquire() as conn:
|
||||
rows = await conn.fetch(f"""
|
||||
SELECT time, open, high, low, close, volume
|
||||
FROM candles
|
||||
WHERE symbol = $1 AND interval = $2
|
||||
AND time >= $3 AND time < $4
|
||||
ORDER BY time ASC
|
||||
""", symbol, source_interval, bucket_start, bucket_end)
|
||||
|
||||
if not rows:
|
||||
return
|
||||
# Use provided connection or acquire a new one
|
||||
if conn is None:
|
||||
async with self.db.acquire() as connection:
|
||||
await self._process_aggregation(connection, symbol, interval, source_interval, bucket_start, bucket_end, expected_count)
|
||||
else:
|
||||
await self._process_aggregation(conn, symbol, interval, source_interval, bucket_start, bucket_end, expected_count)
|
||||
|
||||
# Aggregate
|
||||
is_complete = len(rows) >= expected_count
|
||||
|
||||
candle = CustomCandle(
|
||||
time=bucket_start,
|
||||
symbol=symbol,
|
||||
interval=interval,
|
||||
open=float(rows[0]['open']),
|
||||
high=max(float(r['high']) for r in rows),
|
||||
low=min(float(r['low']) for r in rows),
|
||||
close=float(rows[-1]['close']),
|
||||
volume=sum(float(r['volume']) for r in rows),
|
||||
is_complete=is_complete
|
||||
)
|
||||
|
||||
await self._upsert_candle(candle)
|
||||
async def _process_aggregation(self, conn, symbol, interval, source_interval, bucket_start, bucket_end, expected_count):
    """Aggregate `source_interval` candles into one `interval` bucket on a given connection.

    Fetches all source candles in [bucket_start, bucket_end), aggregates them
    into a single OHLCV candle, and upserts it reusing `conn` (no extra pool
    acquisition — this is the point of the concurrency fix).

    Args:
        conn: An already-acquired database connection (asyncpg-style).
        symbol: Trading symbol to aggregate.
        interval: Target interval label for the produced candle.
        source_interval: Interval of the rows being aggregated (e.g. '1m').
        bucket_start: Inclusive start of the bucket; becomes the candle time.
        bucket_end: Exclusive end of the bucket.
        expected_count: Number of source rows a complete bucket must contain.
    """
    # Plain string, not f-string: the query has no interpolation and all
    # values go through $n placeholders (no SQL-injection surface).
    rows = await conn.fetch("""
        SELECT time, open, high, low, close, volume
        FROM candles
        WHERE symbol = $1 AND interval = $2
          AND time >= $3 AND time < $4
        ORDER BY time ASC
    """, symbol, source_interval, bucket_start, bucket_end)

    if not rows:
        # Nothing to aggregate for this bucket yet.
        return

    # Bucket is complete once we have at least the expected number of rows.
    is_complete = len(rows) >= expected_count

    candle = CustomCandle(
        time=bucket_start,
        symbol=symbol,
        interval=interval,
        open=float(rows[0]['open']),               # first row opens the bucket
        high=max(float(r['high']) for r in rows),
        low=min(float(r['low']) for r in rows),
        close=float(rows[-1]['close']),            # last row closes the bucket
        volume=sum(float(r['volume']) for r in rows),
        is_complete=is_complete
    )

    # Reuse the same connection for the upsert to avoid nested pool acquires.
    await self._upsert_candle(candle, conn)
|
||||
|
||||
async def _upsert_candle(self, c: CustomCandle, conn=None) -> None:
    """Insert-or-update a single candle row.

    When `conn` is supplied the caller's connection is reused (so callers
    holding a pooled connection don't deadlock the pool); otherwise a fresh
    connection is acquired from the pool just for this statement.
    """
    query = """
        INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        ON CONFLICT (time, symbol, interval) DO UPDATE SET
            open = EXCLUDED.open,
            high = EXCLUDED.high,
            low = EXCLUDED.low,
            close = EXCLUDED.close,
            volume = EXCLUDED.volume,
            validated = EXCLUDED.validated,
            created_at = NOW()
    """
    params = (c.time, c.symbol, c.interval, c.open, c.high, c.low, c.close, c.volume, c.is_complete)

    # Fast path: caller already holds a connection — use it and return.
    if conn is not None:
        await conn.execute(query, *params)
        return

    # No connection supplied: acquire one for the duration of the statement.
    async with self.db.acquire() as pooled_conn:
        await pooled_conn.execute(query, *params)
|
||||
|
||||
async def update_realtime(self, new_1m_candles: List[Candle]) -> None:
    """
    Update ALL timeframes (standard and custom) based on new 1m candles.
    Called after 1m buffer flush.
    Uses a single connection for all updates sequentially to prevent pool exhaustion.
    """
    if not new_1m_candles:
        return

    # Lazily discover the earliest 1m candle; bail out if it is still unknown
    # after initialization (nothing can be bucketed without it).
    if not self.first_1m_time:
        await self.initialize()
    if not self.first_1m_time:
        return

    symbol = new_1m_candles[0].symbol
    # Hoisted loop invariant: every interval buckets off the newest 1m time.
    last_time = new_1m_candles[-1].time

    async with self.db.acquire() as conn:
        # 1. Update all standard intervals + 37m sequentially.
        #    Sequential is required because every update shares the same
        #    connection 'conn' (a connection cannot run concurrent queries).
        intervals_to_update = list(self.STANDARD_INTERVALS.keys()) + ['37m']
        for interval in intervals_to_update:
            await self._update_interval_safe(conn, symbol, interval, last_time)

        # 2. Update 148m last (it depends on 37m being updated first).
        await self._update_interval_safe(conn, symbol, '148m', last_time)

async def _update_interval_safe(self, conn, symbol: str, interval: str, last_time) -> None:
    """Aggregate one interval on `conn`, logging (never raising) failures.

    Best-effort by design: a failure in one interval must not prevent the
    remaining intervals from being updated.
    """
    try:
        bucket_start = self.get_bucket_start(last_time, interval)
        await self.aggregate_and_upsert(symbol, interval, bucket_start, conn=conn)
    except Exception as e:
        logger.error(f"Error updating interval {interval}: {e}")
|
||||
|
||||
async def generate_historical(self, interval: str, batch_size: int = 5000) -> int:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user