import argparse
import csv
import json
import logging
import os
import sys
import time
from collections import deque
from datetime import datetime, timedelta, timezone

from hyperliquid.info import Info
from hyperliquid.utils import constants
from hyperliquid.utils.error import ClientError

# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging


class CandleFetcher:
    """
    A class to fetch and manage historical candle data from Hyperliquid.
    """

    def __init__(self, coins_to_fetch: list, interval: str, days_back: int):
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.coins = self._resolve_coins(coins_to_fetch)
        self.interval = interval
        self.days_back = days_back
        self.data_folder = os.path.join("_data", "candles")
        self.csv_headers = [
            'datetime_utc', 'timestamp_ms', 'open', 'high', 'low', 'close',
            'volume', 'number_of_trades'
        ]
        # Maps the API's single-letter candle keys to descriptive CSV columns.
        self.header_mapping = {
            't': 'timestamp_ms', 'o': 'open', 'h': 'high', 'l': 'low',
            'c': 'close', 'v': 'volume', 'n': 'number_of_trades'
        }
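
    # For reference, a raw candle returned by candles_snapshot is a dict keyed
    # as in header_mapping above; an illustrative (not verbatim) example:
    #   {'t': 1704067200000, 'o': '37000.0', 'h': '37120.5', 'l': '36880.0',
    #    'c': '37050.0', 'v': '12.345', 'n': 42}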

    def _resolve_coins(self, coins_arg: list) -> list:
        """Determines the final list of coins to fetch."""
        if coins_arg and "all" in [c.lower() for c in coins_arg]:
            logging.info("Fetching data for all available coins.")
            try:
                with open("coin_precision.json", 'r') as f:
                    return list(json.load(f).keys())
            except FileNotFoundError:
                logging.error("'coin_precision.json' not found. Please run list_coins.py first.")
                sys.exit(1)
        else:
            logging.info(f"Fetching data for specified coins: {coins_arg}")
            return coins_arg
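
    # Only the top-level keys of coin_precision.json are used here, so a file
    # shaped like {"BTC": ..., "ETH": ...} (illustrative) yields ["BTC", "ETH"].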

    def run(self):
        """Starts the data fetching process for all configured coins."""
        if not os.path.exists(self.data_folder):
            os.makedirs(self.data_folder)
            logging.info(f"Created data directory: '{self.data_folder}'")

        for coin in self.coins:
            logging.info(f"--- Starting process for {coin} ---")
            self._update_data_for_coin(coin)
            time.sleep(1)  # Be polite to the API between processing different coins

    def _get_start_time(self, file_path: str) -> tuple[int, bool]:
        """Checks for an existing file and returns (start_timestamp_ms, file_existed)."""
        if os.path.exists(file_path):
            try:
                with open(file_path, 'r', newline='', encoding='utf-8') as f:
                    reader = csv.reader(f)
                    header = next(reader)
                    timestamp_index = header.index('timestamp_ms')
                    # deque with maxlen=1 keeps only the final row, so the file
                    # is scanned without holding it all in memory.
                    last_row = deque(reader, maxlen=1)
                    if last_row:
                        last_timestamp = int(last_row[0][timestamp_index])
                        logging.info(f"Existing file found. Resuming from timestamp: {last_timestamp}")
                        return last_timestamp, True
            except (IOError, ValueError, StopIteration, IndexError) as e:
                logging.warning(f"Could not read '{file_path}'. Re-fetching history. Error: {e}")

        # If the file doesn't exist or is invalid, fetch the default history window.
        start_dt = datetime.now() - timedelta(days=self.days_back)
        start_ms = int(start_dt.timestamp() * 1000)
        logging.info(f"No valid data file. Fetching last {self.days_back} days.")
        return start_ms, False

    def _update_data_for_coin(self, coin: str):
        """Fetches and appends new candle data for a single coin."""
        file_path = os.path.join(self.data_folder, f"{coin}_{self.interval}.csv")
        start_time_ms, file_existed = self._get_start_time(file_path)
        end_time_ms = int(time.time() * 1000)

        if start_time_ms >= end_time_ms:
            logging.warning(f"Start time ({datetime.fromtimestamp(start_time_ms / 1000)}) is in the future. "
                            f"This can be caused by an incorrect system clock. No data will be fetched for {coin}.")
            return

        all_candles = self._fetch_candles_aggressively(coin, start_time_ms, end_time_ms)

        if not all_candles:
            logging.info(f"No new data found for {coin}.")
            return

        # De-duplicate and filter so that only new, unique candles are kept.
        new_unique_candles = []
        seen_timestamps = set()

        # If updating an existing file, mark the last saved timestamp as seen
        # to prevent re-appending the exact same candle.
        if file_existed:
            seen_timestamps.add(start_time_ms)

        # Sort all fetched candles so they are processed chronologically.
        all_candles.sort(key=lambda c: c['t'])

        for candle in all_candles:
            timestamp = candle['t']
            # Keep only candles strictly newer than the last saved one, and
            # only the first candle seen for each timestamp.
            if timestamp > start_time_ms and timestamp not in seen_timestamps:
                new_unique_candles.append(candle)
                seen_timestamps.add(timestamp)

        if new_unique_candles:
            self._save_to_csv(new_unique_candles, file_path, is_append=file_existed)
        else:
            logging.info(f"No new candles to append for {coin}.")

    def _fetch_candles_aggressively(self, coin, start_ms, end_ms):
        """
        Fetches data with a greedy, self-correcting loop: each call requests
        the full remaining window, then advances the cursor past the last
        candle returned. This minimizes the number of API calls.
        """
        all_candles = []
        current_start_time = start_ms
        total_duration = end_ms - start_ms

        while current_start_time < end_ms:
            progress = ((current_start_time - start_ms) / total_duration) * 100 if total_duration > 0 else 100
            current_time_str = datetime.fromtimestamp(current_start_time / 1000).strftime('%Y-%m-%d %H:%M:%S')
            logging.info(f"Fetching {coin}: {progress:.2f}% complete. Current: {current_time_str}")

            candle_batch = self._fetch_batch_with_retry(coin, current_start_time, end_ms)

            if not candle_batch:
                logging.info("No more candles returned from API. Fetch complete.")
                break

            all_candles.extend(candle_batch)
            last_candle_timestamp = candle_batch[-1]["t"]

            # Guard against a misbehaving response rewinding the cursor, which
            # would loop forever.
            if last_candle_timestamp < current_start_time:
                logging.warning("API returned older candles than requested. Breaking loop to prevent issues.")
                break

            current_start_time = last_candle_timestamp + 1
            time.sleep(0.25)  # Small delay to be polite

        return all_candles
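
    # Illustrative pagination arithmetic (assuming the API caps each response
    # at N candles): the cursor advances to the last candle's 't' + 1 on each
    # pass, so a 7-day backfill at 1m resolution (10,080 candles) completes in
    # ceil(10080 / N) calls, e.g. 3 calls if N were 5000.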

    def _fetch_batch_with_retry(self, coin, start_ms, end_ms):
        """Performs a single API call with a retry mechanism."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return self.info.candles_snapshot(coin, self.interval, start_ms, end_ms)
            except ClientError as e:
                # HTTP 429: rate limited; back off briefly and try again.
                if e.status_code == 429 and attempt < max_retries - 1:
                    logging.warning("Rate limited. Retrying in 2 seconds...")
                    time.sleep(2)
                else:
                    logging.error(f"API Error for {coin}: {e}. Skipping batch.")
                    return None
        return None  # All retries exhausted.

    def _save_to_csv(self, candles: list, file_path: str, is_append: bool):
        """Saves a list of candle data to a CSV file."""
        processed_candles = []
        for candle in candles:
            new_candle = {self.header_mapping[k]: v for k, v in candle.items() if k in self.header_mapping}
            # 't' is a millisecond epoch timestamp; render it in UTC so the
            # value actually matches the 'datetime_utc' column name.
            new_candle['datetime_utc'] = datetime.fromtimestamp(
                candle['t'] / 1000, tz=timezone.utc
            ).strftime('%Y-%m-%d %H:%M:%S')
            processed_candles.append(new_candle)

        write_mode = 'a' if is_append else 'w'
        try:
            with open(file_path, write_mode, newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=self.csv_headers)
                if not is_append:
                    writer.writeheader()
                writer.writerows(processed_candles)
            logging.info(f"Successfully saved {len(processed_candles)} candles to '{file_path}'")
        except IOError as e:
            logging.error(f"Failed to write to file '{file_path}': {e}")
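
# For reference, the CSV produced by _save_to_csv looks roughly like this
# (values are illustrative):
#   datetime_utc,timestamp_ms,open,high,low,close,volume,number_of_trades
#   2024-01-01 00:00:00,1704067200000,37000.0,37120.5,36880.0,37050.0,12.345,42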


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fetch historical candle data from Hyperliquid.")
    parser.add_argument(
        "--coins",
        nargs='+',
        default=["BTC", "ETH"],
        help="List of coins to fetch (e.g., BTC ETH), or 'all' to fetch all coins."
    )
    parser.add_argument("--interval", default="1m", help="Candle interval (e.g., 1m, 5m, 1h).")
    parser.add_argument("--days", type=int, default=7, help="Number of days of history to fetch for new coins.")

    args = parser.parse_args()

    setup_logging('normal', 'DataFetcher')

    fetcher = CandleFetcher(coins_to_fetch=args.coins, interval=args.interval, days_back=args.days)
    fetcher.run()
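
# Example invocations (assuming this script is saved as fetch_candles.py):
#   python fetch_candles.py --coins BTC ETH --interval 1m --days 7
#   python fetch_candles.py --coins all --interval 1h --days 30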