diff --git a/src/data_collector/custom_timeframe_generator.py b/src/data_collector/custom_timeframe_generator.py
index b3e782a..55b690f 100644
--- a/src/data_collector/custom_timeframe_generator.py
+++ b/src/data_collector/custom_timeframe_generator.py
@@ -131,7 +131,7 @@ class CustomTimeframeGenerator:
             return days * 1440
         return 1
 
-    async def aggregate_and_upsert(self, symbol: str, interval: str, bucket_start: datetime) -> None:
+    async def aggregate_and_upsert(self, symbol: str, interval: str, bucket_start: datetime, conn=None) -> None:
         """Aggregate 1m data for a specific bucket and upsert"""
         bucket_end = bucket_start  # Initialize
 
@@ -159,55 +159,70 @@ class CustomTimeframeGenerator:
         else:
             bucket_end = bucket_start + timedelta(minutes=1)
 
-        async with self.db.acquire() as conn:
-            rows = await conn.fetch(f"""
-                SELECT time, open, high, low, close, volume
-                FROM candles
-                WHERE symbol = $1 AND interval = $2
-                AND time >= $3 AND time < $4
-                ORDER BY time ASC
-            """, symbol, source_interval, bucket_start, bucket_end)
-
-            if not rows:
-                return
+        # Use provided connection or acquire a new one
+        if conn is None:
+            async with self.db.acquire() as connection:
+                await self._process_aggregation(connection, symbol, interval, source_interval, bucket_start, bucket_end, expected_count)
+        else:
+            await self._process_aggregation(conn, symbol, interval, source_interval, bucket_start, bucket_end, expected_count)
 
-            # Aggregate
-            is_complete = len(rows) >= expected_count
-
-            candle = CustomCandle(
-                time=bucket_start,
-                symbol=symbol,
-                interval=interval,
-                open=float(rows[0]['open']),
-                high=max(float(r['high']) for r in rows),
-                low=min(float(r['low']) for r in rows),
-                close=float(rows[-1]['close']),
-                volume=sum(float(r['volume']) for r in rows),
-                is_complete=is_complete
-            )
-
-            await self._upsert_candle(candle)
+    async def _process_aggregation(self, conn, symbol, interval, source_interval, bucket_start, bucket_end, expected_count):
+        """Internal method to perform aggregation using a specific connection"""
+        rows = await conn.fetch(f"""
+            SELECT time, open, high, low, close, volume
+            FROM candles
+            WHERE symbol = $1 AND interval = $2
+            AND time >= $3 AND time < $4
+            ORDER BY time ASC
+        """, symbol, source_interval, bucket_start, bucket_end)
+
+        if not rows:
+            return
 
-    async def _upsert_candle(self, c: CustomCandle) -> None:
-        """Upsert a single candle"""
-        async with self.db.acquire() as conn:
-            await conn.execute("""
-                INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated)
-                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
-                ON CONFLICT (time, symbol, interval) DO UPDATE SET
-                    open = EXCLUDED.open,
-                    high = EXCLUDED.high,
-                    low = EXCLUDED.low,
-                    close = EXCLUDED.close,
-                    volume = EXCLUDED.volume,
-                    validated = EXCLUDED.validated,
-                    created_at = NOW()
-            """, c.time, c.symbol, c.interval, c.open, c.high, c.low, c.close, c.volume, c.is_complete)
+        # Aggregate
+        is_complete = len(rows) >= expected_count
+
+        candle = CustomCandle(
+            time=bucket_start,
+            symbol=symbol,
+            interval=interval,
+            open=float(rows[0]['open']),
+            high=max(float(r['high']) for r in rows),
+            low=min(float(r['low']) for r in rows),
+            close=float(rows[-1]['close']),
+            volume=sum(float(r['volume']) for r in rows),
+            is_complete=is_complete
+        )
+
+        await self._upsert_candle(candle, conn)
+
+    async def _upsert_candle(self, c: CustomCandle, conn=None) -> None:
+        """Upsert a single candle using provided connection or acquiring a new one"""
+        query = """
+            INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+            ON CONFLICT (time, symbol, interval) DO UPDATE SET
+                open = EXCLUDED.open,
+                high = EXCLUDED.high,
+                low = EXCLUDED.low,
+                close = EXCLUDED.close,
+                volume = EXCLUDED.volume,
+                validated = EXCLUDED.validated,
+                created_at = NOW()
+        """
+        values = (c.time, c.symbol, c.interval, c.open, c.high, c.low, c.close, c.volume, c.is_complete)
+
+        if conn is None:
+            async with self.db.acquire() as connection:
+                await connection.execute(query, *values)
+        else:
+            await conn.execute(query, *values)
 
     async def update_realtime(self, new_1m_candles: List[Candle]) -> None:
         """
         Update ALL timeframes (standard and custom) based on new 1m candles.
         Called after 1m buffer flush.
+        Uses a single connection for all updates sequentially to prevent pool exhaustion.
         """
         if not new_1m_candles:
             return
@@ -215,21 +230,29 @@ class CustomTimeframeGenerator:
         if not self.first_1m_time:
             await self.initialize()
 
+        if not self.first_1m_time:
+            return
+
         symbol = new_1m_candles[0].symbol
 
-        # 1. Update all intervals except 1m and 148m
-        intervals_to_update = list(self.STANDARD_INTERVALS.keys()) + ['37m']
-
-        tasks = []
-        for interval in intervals_to_update:
-            bucket_start = self.get_bucket_start(new_1m_candles[-1].time, interval)
-            tasks.append(self.aggregate_and_upsert(symbol, interval, bucket_start))
+        async with self.db.acquire() as conn:
+            # 1. Update all standard intervals + 37m sequentially
+            # sequential is required because we are sharing the same connection 'conn'
+            intervals_to_update = list(self.STANDARD_INTERVALS.keys()) + ['37m']
 
-        await asyncio.gather(*tasks)
-
-        # 2. Update 148m (it depends on 37m being updated first)
-        bucket_148m = self.get_bucket_start(new_1m_candles[-1].time, '148m')
-        await self.aggregate_and_upsert(symbol, '148m', bucket_148m)
+            for interval in intervals_to_update:
+                try:
+                    bucket_start = self.get_bucket_start(new_1m_candles[-1].time, interval)
+                    await self.aggregate_and_upsert(symbol, interval, bucket_start, conn=conn)
+                except Exception as e:
+                    logger.error(f"Error updating interval {interval}: {e}")
+
+            # 2. Update 148m (it depends on 37m being updated first)
+            try:
+                bucket_148m = self.get_bucket_start(new_1m_candles[-1].time, '148m')
+                await self.aggregate_and_upsert(symbol, '148m', bucket_148m, conn=conn)
+            except Exception as e:
+                logger.error(f"Error updating interval 148m: {e}")
 
     async def generate_historical(self, interval: str, batch_size: int = 5000) -> int:
         """