import argparse
import csv
import json
import logging
import os
import sys
import time
from collections import deque
from datetime import datetime, timedelta, timezone

from hyperliquid.info import Info
from hyperliquid.utils import constants
from hyperliquid.utils.error import ClientError

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class CandleFetcher:
    """
    A class to fetch and manage historical candle data from Hyperliquid.
    """

    def __init__(self, coins_to_fetch: list, interval: str, days_back: int):
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.coins = self._resolve_coins(coins_to_fetch)
        self.interval = interval
        self.days_back = days_back
        self.data_folder = os.path.join("_data", "candles")
        self.csv_headers = [
            'datetime_utc', 'timestamp_ms', 'open', 'high', 'low',
            'close', 'volume', 'number_of_trades'
        ]
        self.header_mapping = {
            't': 'timestamp_ms', 'o': 'open', 'h': 'high', 'l': 'low',
            'c': 'close', 'v': 'volume', 'n': 'number_of_trades'
        }

    def _resolve_coins(self, coins_arg: list) -> list:
        """Determines the final list of coins to fetch."""
        if coins_arg and "all" in [c.lower() for c in coins_arg]:
            logging.info("Fetching data for all available coins.")
            try:
                with open("coin_precision.json", 'r') as f:
                    return list(json.load(f).keys())
            except FileNotFoundError:
                logging.error("'coin_precision.json' not found. Please run list_coins.py first.")
                sys.exit(1)
        else:
            logging.info(f"Fetching data for specified coins: {coins_arg}")
            return coins_arg

    def run(self):
        """Starts the data fetching process for all configured coins."""
        if not os.path.exists(self.data_folder):
            os.makedirs(self.data_folder)
            logging.info(f"Created data directory: '{self.data_folder}'")

        for coin in self.coins:
            logging.info(f"--- Starting process for {coin} ---")
            self._update_data_for_coin(coin)
            time.sleep(1)  # Be polite to the API between processing different coins

    def _get_start_time(self, file_path: str) -> tuple[int, bool]:
        """Checks for an existing file and returns the last timestamp, or a default start time."""
        if os.path.exists(file_path):
            try:
                with open(file_path, 'r', newline='', encoding='utf-8') as f:
                    reader = csv.reader(f)
                    header = next(reader)
                    timestamp_index = header.index('timestamp_ms')
                    last_row = deque(reader, maxlen=1)
                    if last_row:
                        last_timestamp = int(last_row[0][timestamp_index])
                        logging.info(f"Existing file found. Resuming from timestamp: {last_timestamp}")
                        return last_timestamp, True
            except (IOError, ValueError, StopIteration, IndexError) as e:
                logging.warning(f"Could not read '{file_path}'. Re-fetching history. Error: {e}")

        # If the file doesn't exist or is invalid, fetch the full configured history window.
        start_dt = datetime.now() - timedelta(days=self.days_back)
        start_ms = int(start_dt.timestamp() * 1000)
        logging.info(f"No valid data file. Fetching last {self.days_back} days.")
        return start_ms, False

    def _update_data_for_coin(self, coin: str):
        """Fetches and appends new candle data for a single coin."""
        file_path = os.path.join(self.data_folder, f"{coin}_{self.interval}.csv")
        start_time_ms, file_existed = self._get_start_time(file_path)
        end_time_ms = int(time.time() * 1000)

        if start_time_ms >= end_time_ms:
            logging.warning(f"Start time ({datetime.fromtimestamp(start_time_ms / 1000)}) is in the future. "
                            f"This can be caused by an incorrect system clock. "
                            f"No data will be fetched for {coin}.")
            return

        all_candles = self._fetch_candles_aggressively(coin, start_time_ms, end_time_ms)

        if not all_candles:
            logging.info(f"No new data found for {coin}.")
            return

        # --- FIX: Robust de-duplication and filtering ---
        # This explicitly processes candles to ensure only new, unique ones are kept.
        new_unique_candles = []
        seen_timestamps = set()

        # If updating an existing file, add the last known timestamp to the seen set
        # to prevent re-adding the exact same candle.
        if file_existed:
            seen_timestamps.add(start_time_ms)

        # Sort all fetched candles to process them chronologically
        all_candles.sort(key=lambda c: c['t'])

        for candle in all_candles:
            timestamp = candle['t']
            # Only process candles that are strictly newer than the last saved one
            if timestamp > start_time_ms:
                # Add the candle only if we haven't already added this timestamp
                if timestamp not in seen_timestamps:
                    new_unique_candles.append(candle)
                    seen_timestamps.add(timestamp)

        if new_unique_candles:
            self._save_to_csv(new_unique_candles, file_path, file_existed)
        else:
            logging.info(f"No new candles to append for {coin}.")
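
    # For reference, each raw candle handled above is a dict keyed by the short
    # names in header_mapping. A sketch of one element with illustrative values
    # (exact value types and any extra keys may vary by SDK version):
    #
    #   {'t': 1700000000000, 'o': '35000.0', 'h': '35100.0',
    #    'l': '34950.0', 'c': '35050.0', 'v': '12.34', 'n': 321}
    #
    # The 't' field is the open timestamp in milliseconds, which is why it
    # serves as both the sort key and the de-duplication key.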

    def _fetch_candles_aggressively(self, coin, start_ms, end_ms):
        """
        Uses a greedy, self-correcting loop to fetch data efficiently.
        This is faster as it reduces the number of API calls.
        """
        all_candles = []
        current_start_time = start_ms
        total_duration = end_ms - start_ms

        while current_start_time < end_ms:
            progress = ((current_start_time - start_ms) / total_duration) * 100 if total_duration > 0 else 100
            current_time_str = datetime.fromtimestamp(current_start_time / 1000).strftime('%Y-%m-%d %H:%M:%S')
            logging.info(f"Fetching {coin}: {progress:.2f}% complete. Current: {current_time_str}")

            candle_batch = self._fetch_batch_with_retry(coin, current_start_time, end_ms)

            if not candle_batch:
                logging.info("No more candles returned from API. Fetch complete.")
                break

            all_candles.extend(candle_batch)

            last_candle_timestamp = candle_batch[-1]["t"]
            if last_candle_timestamp < current_start_time:
                logging.warning("API returned older candles than requested. Breaking loop to prevent issues.")
                break

            current_start_time = last_candle_timestamp + 1
            time.sleep(0.25)  # Small delay to be polite

        return all_candles

    def _fetch_batch_with_retry(self, coin, start_ms, end_ms):
        """Performs a single API call with a retry mechanism."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return self.info.candles_snapshot(coin, self.interval, start_ms, end_ms)
            except ClientError as e:
                if e.status_code == 429 and attempt < max_retries - 1:
                    logging.warning("Rate limited. Retrying in 2 seconds...")
                    time.sleep(2)
                else:
                    logging.error(f"API Error for {coin}: {e}. Skipping batch.")
                    return None
        return None
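
    # Note: the retry above waits a fixed 2 seconds between attempts. If the
    # API rate-limits more aggressively in practice, exponential backoff is a
    # drop-in alternative (a sketch, not part of the original logic):
    #
    #   time.sleep(2 ** attempt)  # waits 1 s, then 2 s, across the three attempts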
Skipping batch.") return None return None def _save_to_csv(self, candles: list, file_path: str, is_append: bool): """Saves a list of candle data to a CSV file.""" processed_candles = [] for candle in candles: new_candle = {self.header_mapping[k]: v for k, v in candle.items() if k in self.header_mapping} new_candle['datetime_utc'] = datetime.fromtimestamp(candle['t'] / 1000).strftime('%Y-%m-%d %H:%M:%S') processed_candles.append(new_candle) write_mode = 'a' if is_append else 'w' try: with open(file_path, write_mode, newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=self.csv_headers) if not is_append: writer.writeheader() writer.writerows(processed_candles) logging.info(f"Successfully saved {len(processed_candles)} candles to '{file_path}'") except IOError as e: logging.error(f"Failed to write to file '{file_path}': {e}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Fetch historical candle data from Hyperliquid.") parser.add_argument( "--coins", nargs='+', default=["BTC", "ETH"], help="List of coins to fetch (e.g., BTC ETH), or 'all' to fetch all coins." ) parser.add_argument("--interval", default="1m", help="Candle interval (e.g., 1m, 5m, 1h).") parser.add_argument("--days", type=int, default=7, help="Number of days of history to fetch for new coins.") args = parser.parse_args() setup_logging('normal', 'DataFetcher') fetcher = CandleFetcher(coins_to_fetch=args.coins, interval=args.interval, days_back=args.days) fetcher.run()