Files
hyper/_data/candles/hyperliquid-historical.py

231 lines
7.3 KiB
Python

import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError
import os
import argparse
from datetime import datetime, timedelta
import asyncio
import lz4.frame
from pathlib import Path
import csv
import json
# MUST USE PATHLIB INSTEAD
DIR_PATH = Path(__file__).parent
BUCKET = "hyperliquid-archive"
CSV_HEADER = ["datetime", "timestamp", "level", "price", "size", "number"]
# s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
# s3.download_file('hyperliquid-archive', 'market_data/20230916/9/l2Book/SOL.lz4', f"{dir_path}/SOL.lz4")
# earliest date: 20230415/0/
def get_args():
parser = argparse.ArgumentParser(description="Retrieve historical tick level market data from Hyperliquid exchange")
subparser = parser.add_subparsers(dest="tool", required=True, help="tool: download, decompress, to_csv")
global_parser = subparser.add_parser("global_settings", add_help=False)
global_parser.add_argument("t", metavar="Tickers", help="Tickers of assets to be downloaded seperated by spaces. e.g. BTC ETH", nargs="+")
global_parser.add_argument("--all", help="Apply action to all available dates and times.", action="store_true", default=False)
global_parser.add_argument("--anonymous", help="Use anonymous (unsigned) S3 requests. Defaults to signed requests if not provided.", action="store_true", default=False)
global_parser.add_argument("-sd", metavar="Start date", help="Starting date as one unbroken string formatted: YYYYMMDD. e.g. 20230916")
global_parser.add_argument("-sh", metavar="Start hour", help="Hour of the starting day as an integer between 0 and 23. e.g. 9 Default: 0", type=int, default=0)
global_parser.add_argument("-ed", metavar="End date", help="Ending date as one unbroken string formatted: YYYYMMDD. e.g. 20230916")
global_parser.add_argument("-eh", metavar="End hour", help="Hour of the ending day as an integer between 0 and 23. e.g. 9 Default: 23", type=int, default=23)
download_parser = subparser.add_parser("download", help="Download historical market data", parents=[global_parser])
decompress_parser = subparser.add_parser("decompress", help="Decompress downloaded lz4 data", parents=[global_parser])
to_csv_parser = subparser.add_parser("to_csv", help="Convert decompressed downloads into formatted CSV", parents=[global_parser])
return parser.parse_args()
def make_date_list(start_date, end_date):
start_date = datetime.strptime(start_date, '%Y%m%d')
end_date = datetime.strptime(end_date, '%Y%m%d')
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date.strftime('%Y%m%d'))
current_date += timedelta(days=1)
return date_list
def make_date_hour_list(date_list, start_hour, end_hour, delimiter="/"):
date_hour_list = []
end_date = date_list[-1]
hour = start_hour
end = 23
for date in date_list:
if date == end_date:
end = end_hour
while hour <= end:
date_hour = date + delimiter + str(hour)
date_hour_list.append(date_hour)
hour += 1
hour = 0
return date_hour_list
async def download_object(s3, asset, date_hour):
date_and_hour = date_hour.split("/")
key = f"market_data/{date_hour}/l2Book/{asset}.lz4"
dest = f"{DIR_PATH}/downloads/{asset}/{date_and_hour[0]}-{date_and_hour[1]}.lz4"
try:
s3.download_file(BUCKET, key, dest)
except ClientError as e:
# Print a concise message and continue. Common errors: 403 Forbidden, 404 Not Found.
code = e.response.get('Error', {}).get('Code') if hasattr(e, 'response') else 'Unknown'
print(f"Failed to download {key}: {code} - {e}")
return
async def download_objects(s3, assets, date_hour_list):
print(f"Downloading {len(date_hour_list)} objects...")
for asset in assets:
await asyncio.gather(*[download_object(s3, asset, date_hour) for date_hour in date_hour_list])
async def decompress_file(asset, date_hour):
lz_file_path = DIR_PATH / "downloads" / asset / f"{date_hour}.lz4"
file_path = DIR_PATH / "downloads" / asset / date_hour
if not lz_file_path.is_file():
print(f"decompress_file: file not found: {lz_file_path}")
return
with lz4.frame.open(lz_file_path, mode='r') as lzfile:
data = lzfile.read()
with open(file_path, "wb") as file:
file.write(data)
async def decompress_files(assets, date_hour_list):
print(f"Decompressing {len(date_hour_list)} files...")
for asset in assets:
await asyncio.gather(*[decompress_file(asset, date_hour) for date_hour in date_hour_list])
def write_rows(csv_writer, line):
rows = []
entry = json.loads(line)
date_time = entry["time"]
timestamp = str(entry["raw"]["data"]["time"])
all_orders = entry["raw"]["data"]["levels"]
for i, order_level in enumerate(all_orders):
level = str(i + 1)
for order in order_level:
price = order["px"]
size = order["sz"]
number = str(order["n"])
rows.append([date_time, timestamp, level, price, size, number])
for row in rows:
csv_writer.writerow(row)
async def convert_file(asset, date_hour):
file_path = DIR_PATH / "downloads" / asset / date_hour
csv_path = DIR_PATH / "csv" / asset / f"{date_hour}.csv"
with open(csv_path, "w", newline='') as csv_file:
csv_writer = csv.writer(csv_file, dialect="excel")
csv_writer.writerow(CSV_HEADER)
with open(file_path) as file:
for line in file:
write_rows(csv_writer, line)
async def files_to_csv(assets, date_hour_list):
print(f"Converting {len(date_hour_list)} files to CSV...")
for asset in assets:
await asyncio.gather(*[convert_file(asset, date_hour) for date_hour in date_hour_list])
def main():
print(DIR_PATH)
args = get_args()
# Create S3 client according to whether anonymous access was requested.
if getattr(args, 'anonymous', False):
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
else:
s3 = boto3.client('s3')
downloads_path = DIR_PATH / "downloads"
downloads_path.mkdir(exist_ok=True)
csv_path = DIR_PATH / "csv"
csv_path.mkdir(exist_ok=True)
for asset in args.t:
downloads_asset_path = downloads_path / asset
downloads_asset_path.mkdir(exist_ok=True)
csv_asset_path = csv_path / asset
csv_asset_path.mkdir(exist_ok=True)
date_list = make_date_list(args.sd, args.ed)
loop = asyncio.new_event_loop()
if args.tool == "download":
date_hour_list = make_date_hour_list(date_list, args.sh, args.eh)
loop.run_until_complete(download_objects(s3, args.t, date_hour_list))
loop.close()
if args.tool == "decompress":
date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-")
loop.run_until_complete(decompress_files(args.t, date_hour_list))
loop.close()
if args.tool == "to_csv":
date_hour_list = make_date_hour_list(date_list, args.sh, args.eh, delimiter="-")
loop.run_until_complete(files_to_csv(args.t, date_hour_list))
loop.close()
print("Done")
if __name__ == "__main__":
main()