import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError
import argparse
from datetime import datetime, timedelta, timezone
import asyncio
import lz4.frame
from pathlib import Path
import csv
import json

DIR_PATH = Path(__file__).parent
BUCKET = "hyperliquid-archive"
CSV_HEADER = ["datetime", "timestamp", "level", "price", "size", "number"]

# Example one-off download:
# s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
# s3.download_file('hyperliquid-archive', 'market_data/20230916/9/l2Book/SOL.lz4', f"{DIR_PATH}/SOL.lz4")
# Earliest object in the archive: 20230415/0/


def get_args():
    parser = argparse.ArgumentParser(description="Retrieve historical tick-level market data from the Hyperliquid exchange")
    subparser = parser.add_subparsers(dest="tool", required=True, help="tool: download, decompress, to_csv")
    # Shared arguments live on a standalone parent parser. (Creating it via
    # subparser.add_parser would also register "global_settings" as a
    # selectable tool, which is not intended.)
    global_parser = argparse.ArgumentParser(add_help=False)
    global_parser.add_argument("t", metavar="TICKERS", help="Tickers of assets to be downloaded, separated by spaces. e.g. BTC ETH", nargs="+")
    global_parser.add_argument("--all", help="Apply the action to all available dates and times.", action="store_true", default=False)
    global_parser.add_argument("--anonymous", help="Use anonymous (unsigned) S3 requests. Defaults to signed requests if not provided.", action="store_true", default=False)
    global_parser.add_argument("-sd", metavar="START_DATE", help="Starting date as one unbroken string formatted YYYYMMDD. e.g. 20230916")
    global_parser.add_argument("-sh", metavar="START_HOUR", help="Hour of the starting day as an integer between 0 and 23. e.g. 9 Default: 0", type=int, default=0)
    global_parser.add_argument("-ed", metavar="END_DATE", help="Ending date as one unbroken string formatted YYYYMMDD. e.g. 20230916")
    global_parser.add_argument("-eh", metavar="END_HOUR", help="Hour of the ending day as an integer between 0 and 23. e.g. 9 Default: 23", type=int, default=23)
    subparser.add_parser("download", help="Download historical market data", parents=[global_parser])
    subparser.add_parser("decompress", help="Decompress downloaded lz4 data", parents=[global_parser])
    subparser.add_parser("to_csv", help="Convert decompressed downloads into formatted CSV", parents=[global_parser])
    return parser.parse_args()


def make_date_list(start_date, end_date):
    start_date = datetime.strptime(start_date, '%Y%m%d')
    end_date = datetime.strptime(end_date, '%Y%m%d')
    date_list = []
    current_date = start_date
    while current_date <= end_date:
        date_list.append(current_date.strftime('%Y%m%d'))
        current_date += timedelta(days=1)
    return date_list


def make_date_hour_list(date_list, start_hour, end_hour, delimiter="/"):
    # Expand each date into "date<delimiter>hour" entries: the first day starts
    # at start_hour, the last day stops at end_hour, and full days in between
    # cover hours 0-23.
    date_hour_list = []
    end_date = date_list[-1]
    hour = start_hour
    end = 23
    for date in date_list:
        if date == end_date:
            end = end_hour
        while hour <= end:
            date_hour_list.append(date + delimiter + str(hour))
            hour += 1
        hour = 0
    return date_hour_list


async def download_object(s3, asset, date_hour):
    date, hour = date_hour.split("/")
    key = f"market_data/{date_hour}/l2Book/{asset}.lz4"
    dest = f"{DIR_PATH}/downloads/{asset}/{date}-{hour}.lz4"
    try:
        s3.download_file(BUCKET, key, dest)
    except ClientError as e:
        # Print a concise message and continue. Common errors: 403 Forbidden, 404 Not Found.
        code = e.response.get('Error', {}).get('Code', 'Unknown')
        print(f"Failed to download {key}: {code} - {e}")
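
# Note: boto3's download_file is a blocking call, so the asyncio.gather in
# download_objects (below) does not actually overlap any network I/O; objects
# still download one at a time. The sketch below shows one way to get real
# concurrency. It is a hypothetical drop-in alternative to download_object,
# assuming Python 3.9+ (for asyncio.to_thread) and relying on boto3 clients
# being thread-safe, as the boto3 documentation states.
async def download_object_threaded(s3, asset, date_hour):
    date, hour = date_hour.split("/")
    key = f"market_data/{date_hour}/l2Book/{asset}.lz4"
    dest = f"{DIR_PATH}/downloads/{asset}/{date}-{hour}.lz4"
    try:
        # Run the blocking transfer in a worker thread so gather() can overlap requests.
        await asyncio.to_thread(s3.download_file, BUCKET, key, dest)
    except ClientError as e:
        print(f"Failed to download {key}: {e}")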
async def download_objects(s3, assets, date_hour_list):
    print(f"Downloading {len(date_hour_list)} objects per asset...")
    for asset in assets:
        await asyncio.gather(*[download_object(s3, asset, date_hour) for date_hour in date_hour_list])


async def decompress_file(asset, date_hour):
    lz_file_path = DIR_PATH / "downloads" / asset / f"{date_hour}.lz4"
    file_path = DIR_PATH / "downloads" / asset / date_hour
    if not lz_file_path.is_file():
        print(f"decompress_file: file not found: {lz_file_path}")
        return
    with lz4.frame.open(lz_file_path, mode='r') as lzfile:
        data = lzfile.read()
    with open(file_path, "wb") as file:
        file.write(data)


async def decompress_files(assets, date_hour_list):
    print(f"Decompressing {len(date_hour_list)} files per asset...")
    for asset in assets:
        await asyncio.gather(*[decompress_file(asset, date_hour) for date_hour in date_hour_list])


def write_rows(csv_writer, line):
    # Each decompressed line is a JSON L2 book snapshot shaped like:
    # {"time": ..., "raw": {"data": {"time": ..., "levels": [[{"px": ..., "sz": ..., "n": ...}, ...], ...]}}}
    entry = json.loads(line)
    date_time = entry["time"]
    timestamp = str(entry["raw"]["data"]["time"])
    all_orders = entry["raw"]["data"]["levels"]
    for i, order_level in enumerate(all_orders):
        level = str(i + 1)
        for order in order_level:
            csv_writer.writerow([date_time, timestamp, level, order["px"], order["sz"], str(order["n"])])


async def convert_file(asset, date_hour):
    file_path = DIR_PATH / "downloads" / asset / date_hour
    csv_path = DIR_PATH / "csv" / asset / f"{date_hour}.csv"
    if not file_path.is_file():
        print(f"convert_file: file not found: {file_path}")
        return
    with open(csv_path, "w", newline='') as csv_file:
        csv_writer = csv.writer(csv_file, dialect="excel")
        csv_writer.writerow(CSV_HEADER)
        with open(file_path) as file:
            for line in file:
                write_rows(csv_writer, line)


async def files_to_csv(assets, date_hour_list):
    print(f"Converting {len(date_hour_list)} files per asset to CSV...")
    for asset in assets:
        await asyncio.gather(*[convert_file(asset, date_hour) for date_hour in date_hour_list])


def main():
    args = get_args()

    # Create the S3 client according to whether anonymous access was requested.
    if args.anonymous:
        s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    else:
        s3 = boto3.client('s3')

    downloads_path = DIR_PATH / "downloads"
    downloads_path.mkdir(exist_ok=True)
    csv_path = DIR_PATH / "csv"
    csv_path.mkdir(exist_ok=True)
    for asset in args.t:
        (downloads_path / asset).mkdir(exist_ok=True)
        (csv_path / asset).mkdir(exist_ok=True)

    if args.all:
        # The archive's earliest object is 20230415/0 (see the note near the
        # top); assume it runs through the current UTC date.
        args.sd, args.sh = "20230415", 0
        args.ed, args.eh = datetime.now(timezone.utc).strftime('%Y%m%d'), 23
    if not (args.sd and args.ed):
        raise SystemExit("Provide -sd and -ed, or pass --all.")

    date_list = make_date_list(args.sd, args.ed)
    if args.tool == "download":
        date_hour_list = make_date_hour_list(date_list, args.sh, args.eh)
        asyncio.run(download_objects(s3, args.t, date_hour_list))
    elif args.tool == "decompress":
        date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-")
        asyncio.run(decompress_files(args.t, date_hour_list))
    elif args.tool == "to_csv":
        date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-")
        asyncio.run(files_to_csv(args.t, date_hour_list))
    print("Done")


if __name__ == "__main__":
    main()
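
# Example invocations, derived from the argparse setup above (the script
# filename is assumed here; any name works):
#
#   python hyperliquid_history.py download BTC ETH -sd 20230916 -sh 9 -ed 20230917 -eh 12 --anonymous
#   python hyperliquid_history.py decompress BTC ETH -sd 20230916 -sh 9 -ed 20230917 -eh 12
#   python hyperliquid_history.py to_csv BTC ETH --all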