commit 5800fb6e2c5556dbd258e8d41901910a0e868ecb Author: DiTus Date: Mon Oct 13 11:28:09 2025 +0200 Initial commit diff --git a/!migrate_to_sqlite.py b/!migrate_to_sqlite.py new file mode 100644 index 0000000..be0d7a5 --- /dev/null +++ b/!migrate_to_sqlite.py @@ -0,0 +1,92 @@ +import argparse +import logging +import os +import sys +import sqlite3 +import pandas as pd + +# Assuming logging_utils.py is in the same directory +from logging_utils import setup_logging + +class Migrator: + """ + Reads 1-minute candle data from CSV files and migrates it into an + SQLite database for improved performance and easier access. + """ + + def __init__(self, log_level: str): + setup_logging(log_level, 'Migrator') + self.source_folder = os.path.join("_data", "candles") + self.db_path = os.path.join("_data", "market_data.db") + + def run(self): + """ + Main execution function to find all CSV files and migrate them to the database. + """ + if not os.path.exists(self.source_folder): + logging.error(f"Source data folder '{self.source_folder}' not found. " + "Please ensure data has been fetched first.") + sys.exit(1) + + csv_files = [f for f in os.listdir(self.source_folder) if f.endswith('_1m.csv')] + + if not csv_files: + logging.warning("No 1-minute CSV files found in the source folder to migrate.") + return + + logging.info(f"Found {len(csv_files)} source CSV files to migrate to SQLite.") + + # Connect to the SQLite database (it will be created if it doesn't exist) + with sqlite3.connect(self.db_path) as conn: + for file_name in csv_files: + coin = file_name.split('_')[0] + table_name = f"{coin}_1m" + file_path = os.path.join(self.source_folder, file_name) + + logging.info(f"Migrating '{file_name}' to table '{table_name}'...") + + try: + # 1. Load the entire CSV file into a pandas DataFrame. + df = pd.read_csv(file_path) + + if df.empty: + logging.warning(f"CSV file '{file_name}' is empty. Skipping.") + continue + + # 2. Convert the timestamp column to a proper datetime object. + df['datetime_utc'] = pd.to_datetime(df['datetime_utc']) + + # 3. Write the DataFrame to the SQLite database. + # 'replace' will drop the table first if it exists and create a new one. + # This is ideal for a migration script to ensure a clean import. + df.to_sql( + table_name, + conn, + if_exists='replace', + index=False # Do not write the pandas DataFrame index as a column + ) + + # 4. (Optional but Recommended) Create an index on the timestamp for fast queries. + logging.debug(f"Creating index on 'datetime_utc' for table '{table_name}'...") + conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_time ON {table_name}(datetime_utc);") + + logging.info(f"Successfully migrated {len(df)} rows to '{table_name}'.") + + except Exception as e: + logging.error(f"Failed to process and migrate file '{file_name}': {e}") + + logging.info("--- Database migration complete ---") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Migrate 1-minute candle data from CSV files to an SQLite database.") + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + migrator = Migrator(log_level=args.log_level) + migrator.run() diff --git a/__pycache__/logging_utils.cpython-313.pyc b/__pycache__/logging_utils.cpython-313.pyc new file mode 100644 index 0000000..9515649 Binary files /dev/null and b/__pycache__/logging_utils.cpython-313.pyc differ diff --git a/_data/candles/!clean_csv.py b/_data/candles/!clean_csv.py new file mode 100644 index 0000000..e6d1508 --- /dev/null +++ b/_data/candles/!clean_csv.py @@ -0,0 +1,66 @@ +import pandas as pd +import os + +def process_csv_in_directory(directory_path='.'): + """ + Finds all CSV files in a specified directory, removes duplicate rows, + and saves the cleaned data to new files. + + Args: + directory_path (str): The path to the directory containing the CSV files. + Defaults to the current directory '.'. + """ + # 1. Get a list of all files in the specified directory + try: + all_files = os.listdir(directory_path) + except FileNotFoundError: + print(f"Error: The directory '{directory_path}' was not found.") + return + + # 2. Filter the list to include only CSV files + csv_files = [f for f in all_files if f.endswith('.csv')] + + if not csv_files: + print("No CSV files found in the directory.") + return + + print(f"Found {len(csv_files)} CSV files to process...\n") + + # 3. Loop through each CSV file and process it + for filename in csv_files: + file_path = os.path.join(directory_path, filename) + + try: + # --- Step 1: Open the CSV file --- + print(f"--- Processing file: {filename} ---") + df = pd.read_csv(file_path) + initial_rows = len(df) + print(f"Initial rows: {initial_rows}") + + # --- Step 2: Remove doubled (duplicate) rows --- + df.drop_duplicates(inplace=True) + final_rows = len(df) + + # --- Step 3: Print summary --- + duplicates_removed = initial_rows - final_rows + print(f"Duplicate rows removed: {duplicates_removed}") + print(f"Final rows: {final_rows}") + + # --- Step 4: Save the updated CSV file --- + # Create a new filename to avoid overwriting the original + new_filename = filename.replace('.csv', '_cleaned.csv') + new_file_path = os.path.join(directory_path, new_filename) + + # Save the cleaned DataFrame to the new file + # index=False prevents pandas from writing the DataFrame index as a column + df.to_csv(new_file_path, index=False) + print(f"Cleaned data saved to: '{new_filename}'\n") + + except Exception as e: + print(f"Could not process {filename}. Error: {e}\n") + +# --- How to use it --- +# Run the function on the current directory +# To specify a different directory, pass it as an argument, +# e.g., process_csv_in_directory('/path/to/your/files') +process_csv_in_directory() \ No newline at end of file diff --git a/_data/coin_precision.json b/_data/coin_precision.json new file mode 100644 index 0000000..fb9d0e8 --- /dev/null +++ b/_data/coin_precision.json @@ -0,0 +1,219 @@ +{ + "0G": 0, + "2Z": 0, + "AAVE": 2, + "ACE": 2, + "ADA": 0, + "AI": 1, + "AI16Z": 1, + "AIXBT": 0, + "ALGO": 0, + "ALT": 0, + "ANIME": 0, + "APE": 1, + "APEX": 0, + "APT": 2, + "AR": 2, + "ARB": 1, + "ARK": 0, + "ASTER": 0, + "ATOM": 2, + "AVAX": 2, + "AVNT": 0, + "BABY": 0, + "BADGER": 1, + "BANANA": 1, + "BCH": 3, + "BERA": 1, + "BIGTIME": 0, + "BIO": 0, + "BLAST": 0, + "BLUR": 0, + "BLZ": 0, + "BNB": 3, + "BNT": 0, + "BOME": 0, + "BRETT": 0, + "BSV": 2, + "BTC": 5, + "CAKE": 1, + "CANTO": 0, + "CATI": 0, + "CELO": 0, + "CFX": 0, + "CHILLGUY": 0, + "COMP": 2, + "CRV": 1, + "CYBER": 1, + "DOGE": 0, + "DOOD": 0, + "DOT": 1, + "DYDX": 1, + "DYM": 1, + "EIGEN": 2, + "ENA": 0, + "ENS": 2, + "ETC": 2, + "ETH": 4, + "ETHFI": 1, + "FARTCOIN": 1, + "FET": 0, + "FIL": 1, + "FRIEND": 1, + "FTM": 0, + "FTT": 1, + "FXS": 1, + "GALA": 0, + "GAS": 1, + "GMT": 0, + "GMX": 2, + "GOAT": 0, + "GRASS": 1, + "GRIFFAIN": 0, + "HBAR": 0, + "HEMI": 0, + "HMSTR": 0, + "HPOS": 0, + "HYPE": 2, + "HYPER": 0, + "ILV": 2, + "IMX": 1, + "INIT": 0, + "INJ": 1, + "IO": 1, + "IOTA": 0, + "IP": 1, + "JELLY": 0, + "JTO": 0, + "JUP": 0, + "KAITO": 0, + "KAS": 0, + "LAUNCHCOIN": 0, + "LAYER": 0, + "LDO": 1, + "LINEA": 0, + "LINK": 1, + "LISTA": 0, + "LOOM": 0, + "LTC": 2, + "MANTA": 1, + "MATIC": 1, + "MAV": 0, + "MAVIA": 1, + "ME": 1, + "MELANIA": 1, + "MEME": 0, + "MERL": 0, + "MET": 0, + "MEW": 0, + "MINA": 0, + "MKR": 4, + "MNT": 1, + "MON": 0, + "MOODENG": 0, + "MORPHO": 1, + "MOVE": 0, + "MYRO": 0, + "NEAR": 1, + "NEIROETH": 0, + "NEO": 2, + "NFTI": 1, + "NIL": 0, + "NOT": 0, + "NTRN": 0, + "NXPC": 0, + "OGN": 0, + "OM": 1, + "OMNI": 2, + "ONDO": 0, + "OP": 1, + "ORBS": 0, + "ORDI": 2, + "OX": 0, + "PANDORA": 5, + "PAXG": 3, + "PENDLE": 0, + "PENGU": 0, + "PEOPLE": 0, + "PIXEL": 0, + "PNUT": 1, + "POL": 0, + "POLYX": 0, + "POPCAT": 0, + "PROMPT": 0, + "PROVE": 0, + "PUMP": 0, + "PURR": 0, + "PYTH": 0, + "RDNT": 0, + "RENDER": 1, + "REQ": 0, + "RESOLV": 0, + "REZ": 0, + "RLB": 0, + "RNDR": 1, + "RSR": 0, + "RUNE": 1, + "S": 0, + "SAGA": 1, + "SAND": 0, + "SCR": 1, + "SEI": 0, + "SHIA": 0, + "SKY": 0, + "SNX": 1, + "SOL": 2, + "SOPH": 0, + "SPX": 1, + "STBL": 0, + "STG": 0, + "STRAX": 0, + "STRK": 1, + "STX": 1, + "SUI": 1, + "SUPER": 0, + "SUSHI": 1, + "SYRUP": 0, + "TAO": 3, + "TIA": 1, + "TNSR": 1, + "TON": 1, + "TRB": 2, + "TRUMP": 1, + "TRX": 0, + "TST": 0, + "TURBO": 0, + "UMA": 1, + "UNI": 1, + "UNIBOT": 3, + "USTC": 0, + "USUAL": 1, + "VINE": 0, + "VIRTUAL": 1, + "VVV": 2, + "W": 1, + "WCT": 0, + "WIF": 0, + "WLD": 1, + "WLFI": 0, + "XAI": 1, + "XLM": 0, + "XPL": 0, + "XRP": 0, + "YGG": 0, + "YZY": 0, + "ZEC": 2, + "ZEN": 2, + "ZEREBRO": 0, + "ZETA": 1, + "ZK": 0, + "ZORA": 0, + "ZRO": 1, + "kBONK": 0, + "kDOGS": 0, + "kFLOKI": 0, + "kLUNC": 0, + "kNEIRO": 1, + "kPEPE": 0, + "kSHIB": 0 +} \ No newline at end of file diff --git a/agents b/agents new file mode 100644 index 0000000..e9ce135 --- /dev/null +++ b/agents @@ -0,0 +1,3 @@ +agent 001 +wallet: 0x7773833262f020c7979ec8aae38455c17ba4040c +Private Key: 0x659326d719a4322244d6e7f28e7fa2780f034e9f6a342ef1919664817e6248df \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e69de29 diff --git a/coin_precision.json b/coin_precision.json new file mode 100644 index 0000000..f961508 --- /dev/null +++ b/coin_precision.json @@ -0,0 +1,217 @@ +{ + "0G": 0, + "2Z": 0, + "AAVE": 2, + "ACE": 2, + "ADA": 0, + "AI": 1, + "AI16Z": 1, + "AIXBT": 0, + "ALGO": 0, + "ALT": 0, + "ANIME": 0, + "APE": 1, + "APEX": 0, + "APT": 2, + "AR": 2, + "ARB": 1, + "ARK": 0, + "ASTER": 0, + "ATOM": 2, + "AVAX": 2, + "AVNT": 0, + "BABY": 0, + "BADGER": 1, + "BANANA": 1, + "BCH": 3, + "BERA": 1, + "BIGTIME": 0, + "BIO": 0, + "BLAST": 0, + "BLUR": 0, + "BLZ": 0, + "BNB": 3, + "BNT": 0, + "BOME": 0, + "BRETT": 0, + "BSV": 2, + "BTC": 5, + "CAKE": 1, + "CANTO": 0, + "CATI": 0, + "CELO": 0, + "CFX": 0, + "CHILLGUY": 0, + "COMP": 2, + "CRV": 1, + "CYBER": 1, + "DOGE": 0, + "DOOD": 0, + "DOT": 1, + "DYDX": 1, + "DYM": 1, + "EIGEN": 2, + "ENA": 0, + "ENS": 2, + "ETC": 2, + "ETH": 4, + "ETHFI": 1, + "FARTCOIN": 1, + "FET": 0, + "FIL": 1, + "FRIEND": 1, + "FTM": 0, + "FTT": 1, + "FXS": 1, + "GALA": 0, + "GAS": 1, + "GMT": 0, + "GMX": 2, + "GOAT": 0, + "GRASS": 1, + "GRIFFAIN": 0, + "HBAR": 0, + "HEMI": 0, + "HMSTR": 0, + "HPOS": 0, + "HYPE": 2, + "HYPER": 0, + "ILV": 2, + "IMX": 1, + "INIT": 0, + "INJ": 1, + "IO": 1, + "IOTA": 0, + "IP": 1, + "JELLY": 0, + "JTO": 0, + "JUP": 0, + "KAITO": 0, + "KAS": 0, + "LAUNCHCOIN": 0, + "LAYER": 0, + "LDO": 1, + "LINEA": 0, + "LINK": 1, + "LISTA": 0, + "LOOM": 0, + "LTC": 2, + "MANTA": 1, + "MATIC": 1, + "MAV": 0, + "MAVIA": 1, + "ME": 1, + "MELANIA": 1, + "MEME": 0, + "MERL": 0, + "MEW": 0, + "MINA": 0, + "MKR": 4, + "MNT": 1, + "MOODENG": 0, + "MORPHO": 1, + "MOVE": 0, + "MYRO": 0, + "NEAR": 1, + "NEIROETH": 0, + "NEO": 2, + "NFTI": 1, + "NIL": 0, + "NOT": 0, + "NTRN": 0, + "NXPC": 0, + "OGN": 0, + "OM": 1, + "OMNI": 2, + "ONDO": 0, + "OP": 1, + "ORBS": 0, + "ORDI": 2, + "OX": 0, + "PANDORA": 5, + "PAXG": 3, + "PENDLE": 0, + "PENGU": 0, + "PEOPLE": 0, + "PIXEL": 0, + "PNUT": 1, + "POL": 0, + "POLYX": 0, + "POPCAT": 0, + "PROMPT": 0, + "PROVE": 0, + "PUMP": 0, + "PURR": 0, + "PYTH": 0, + "RDNT": 0, + "RENDER": 1, + "REQ": 0, + "RESOLV": 0, + "REZ": 0, + "RLB": 0, + "RNDR": 1, + "RSR": 0, + "RUNE": 1, + "S": 0, + "SAGA": 1, + "SAND": 0, + "SCR": 1, + "SEI": 0, + "SHIA": 0, + "SKY": 0, + "SNX": 1, + "SOL": 2, + "SOPH": 0, + "SPX": 1, + "STBL": 0, + "STG": 0, + "STRAX": 0, + "STRK": 1, + "STX": 1, + "SUI": 1, + "SUPER": 0, + "SUSHI": 1, + "SYRUP": 0, + "TAO": 3, + "TIA": 1, + "TNSR": 1, + "TON": 1, + "TRB": 2, + "TRUMP": 1, + "TRX": 0, + "TST": 0, + "TURBO": 0, + "UMA": 1, + "UNI": 1, + "UNIBOT": 3, + "USTC": 0, + "USUAL": 1, + "VINE": 0, + "VIRTUAL": 1, + "VVV": 2, + "W": 1, + "WCT": 0, + "WIF": 0, + "WLD": 1, + "WLFI": 0, + "XAI": 1, + "XLM": 0, + "XPL": 0, + "XRP": 0, + "YGG": 0, + "YZY": 0, + "ZEC": 2, + "ZEN": 2, + "ZEREBRO": 0, + "ZETA": 1, + "ZK": 0, + "ZORA": 0, + "ZRO": 1, + "kBONK": 0, + "kDOGS": 0, + "kFLOKI": 0, + "kLUNC": 0, + "kNEIRO": 1, + "kPEPE": 0, + "kSHIB": 0 +} \ No newline at end of file diff --git a/constraints.txt b/constraints.txt new file mode 100644 index 0000000..3fc14b1 --- /dev/null +++ b/constraints.txt @@ -0,0 +1,33 @@ +# requirements.txt + +annotated-types==0.7.0 +bitarray==3.7.1 +certifi==2025.10.5 +charset-normalizer==3.4.3 +ckzg==2.1.5 +cytoolz==1.0.1 +eth-account==0.13.7 +eth-hash==0.7.1 +eth-keyfile==0.8.1 +eth-keys==0.7.0 +eth-rlp==2.2.0 +eth-typing==5.2.1 +eth-utils==5.3.1 +eth_abi==5.2.0 +hexbytes==1.3.1 +hyperliquid-python-sdk==0.19.0 +idna==3.10 +msgpack==1.1.1 +parsimonious==0.10.0 +pycryptodome==3.23.0 +pydantic==2.11.10 +pydantic_core==2.33.2 +regex==2025.9.18 +requests==2.32.5 +rlp==4.1.0 +schedule==1.2.2 +toolz==1.0.0 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +urllib3==2.5.0 +websocket-client==1.8.0 diff --git a/data_fetcher.py b/data_fetcher.py new file mode 100644 index 0000000..63041cd --- /dev/null +++ b/data_fetcher.py @@ -0,0 +1,195 @@ +import argparse +import json +import logging +import os +import sys +import time +import sqlite3 +import pandas as pd +from datetime import datetime, timedelta, timezone + +from hyperliquid.info import Info +from hyperliquid.utils import constants +from hyperliquid.utils.error import ClientError + +# Assuming logging_utils.py is in the same directory +from logging_utils import setup_logging + + +class CandleFetcherDB: + """ + Fetches 1-minute candle data and saves/updates it directly in an SQLite database. + """ + + def __init__(self, coins_to_fetch: list, interval: str, days_back: int): + self.info = Info(constants.MAINNET_API_URL, skip_ws=True) + self.coins = self._resolve_coins(coins_to_fetch) + self.interval = interval + self.days_back = days_back + self.db_path = os.path.join("_data", "market_data.db") + self.column_rename_map = { + 't': 'timestamp_ms', 'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume', 'n': 'number_of_trades' + } + + def _resolve_coins(self, coins_arg: list) -> list: + """Determines the final list of coins to fetch.""" + if coins_arg and "all" in [c.lower() for c in coins_arg]: + logging.info("Fetching data for all available coins.") + try: + with open("coin_precision.json", 'r') as f: + return list(json.load(f).keys()) + except FileNotFoundError: + logging.error("'coin_precision.json' not found. Please run list_coins.py first.") + sys.exit(1) + else: + logging.info(f"Fetching data for specified coins: {coins_arg}") + return coins_arg + + def run(self): + """Starts the data fetching process and reports status after each coin.""" + with sqlite3.connect(self.db_path, timeout=10) as self.conn: + self.conn.execute("PRAGMA journal_mode=WAL;") + for coin in self.coins: + logging.info(f"--- Starting process for {coin} ---") + num_updated = self._update_data_for_coin(coin) + self._report_status(coin, num_updated) + time.sleep(1) + + def _report_status(self, last_coin: str, num_updated: int): + """Saves the status of the fetcher run to a JSON file.""" + status_file = os.path.join("_data", "fetcher_status.json") + status = { + "last_updated_coin": last_coin, + "num_updated_candles": num_updated, + "last_run_timestamp_utc": datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + } + try: + with open(status_file, 'w', encoding='utf-8') as f: + json.dump(status, f, indent=4) + logging.info(f"Updated status file. Last processed coin: {last_coin} ({num_updated} candles)") + except IOError as e: + logging.error(f"Failed to write status file: {e}") + + + def _get_start_time(self, coin: str) -> (int, bool): + """Checks the database for an existing table and returns the last timestamp.""" + table_name = f"{coin}_{self.interval}" + try: + cursor = self.conn.cursor() + cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';") + if cursor.fetchone(): + query = f'SELECT MAX(timestamp_ms) FROM "{table_name}"' + last_ts = pd.read_sql(query, self.conn).iloc[0, 0] + if pd.notna(last_ts): + logging.info(f"Existing table '{table_name}' found. Resuming from timestamp: {last_ts}") + return int(last_ts), True + except Exception as e: + logging.warning(f"Could not read from table '{table_name}'. Re-fetching history. Error: {e}") + + start_dt = datetime.now() - timedelta(days=self.days_back) + start_ms = int(start_dt.timestamp() * 1000) + logging.info(f"No valid data in '{table_name}'. Fetching last {self.days_back} days.") + return start_ms, False + + def _update_data_for_coin(self, coin: str) -> int: + """Fetches, processes, and saves new candle data for a single coin to the database.""" + start_time_ms, table_existed = self._get_start_time(coin) + end_time_ms = int(time.time() * 1000) + + if start_time_ms >= end_time_ms: + logging.warning(f"Start time is in the future for {coin}. Skipping.") + return 0 + + all_candles = self._fetch_candles_aggressively(coin, start_time_ms, end_time_ms) + + if not all_candles: + logging.info(f"No new data found for {coin}.") + return 0 + + df = pd.DataFrame(all_candles) + df.drop_duplicates(subset=['t'], keep='last', inplace=True) + if table_existed: + df = df[df['t'] > start_time_ms] + df.sort_values(by='t', inplace=True) + + if not df.empty: + return self._save_to_sqlite_with_pandas(df, coin, table_existed) + else: + logging.info(f"No new candles to append for {coin}.") + return 0 + + def _fetch_candles_aggressively(self, coin, start_ms, end_ms): + """Uses a greedy loop to fetch data efficiently.""" + all_candles = [] + current_start_time = start_ms + while current_start_time < end_ms: + candle_batch = self._fetch_batch_with_retry(coin, current_start_time, end_ms) + if not candle_batch: + break + all_candles.extend(candle_batch) + last_ts = candle_batch[-1]["t"] + if last_ts < current_start_time: + break + current_start_time = last_ts + 1 + time.sleep(0.25) + return all_candles + + def _fetch_batch_with_retry(self, coin, start_ms, end_ms): + """Performs a single API call with a retry mechanism.""" + max_retries = 3 + for attempt in range(max_retries): + try: + return self.info.candles_snapshot(coin, self.interval, start_ms, end_ms) + except ClientError as e: + if e.status_code == 429 and attempt < max_retries - 1: + logging.warning("Rate limited. Retrying...") + time.sleep(2) + else: + logging.error(f"API Error for {coin}: {e}.") + return None + return None + + def _save_to_sqlite_with_pandas(self, df: pd.DataFrame, coin: str, is_append: bool) -> int: + """Saves a pandas DataFrame to an SQLite table and returns the number of saved rows.""" + table_name = f"{coin}_{self.interval}" + try: + df.rename(columns=self.column_rename_map, inplace=True) + df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms') + final_df = df[['datetime_utc', 'timestamp_ms', 'open', 'high', 'low', 'close', 'volume', 'number_of_trades']] + + write_mode = 'append' if is_append else 'replace' + final_df.to_sql(table_name, self.conn, if_exists=write_mode, index=False) + + self.conn.execute(f'CREATE INDEX IF NOT EXISTS "idx_{table_name}_time" ON "{table_name}"(datetime_utc);') + + num_saved = len(final_df) + logging.info(f"Successfully saved {num_saved} candles to table '{table_name}'") + return num_saved + except Exception as e: + logging.error(f"Failed to write to SQLite table '{table_name}': {e}") + return 0 + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Fetch historical candle data and save to SQLite.") + parser.add_argument( + "--coins", + nargs='+', + default=["BTC", "ETH"], + help="List of coins to fetch (e.g., BTC ETH), or 'all' to fetch all coins." + ) + parser.add_argument("--interval", default="1m", help="Candle interval (e.g., 1m, 5m, 1h).") + parser.add_argument("--days", type=int, default=7, help="Number of days of history to fetch for new coins.") + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level." + ) + + args = parser.parse_args() + setup_logging(args.log_level, 'DataFetcherDB') + + fetcher = CandleFetcherDB(coins_to_fetch=args.coins, interval=args.interval, days_back=args.days) + fetcher.run() + diff --git a/data_fetcher_old.py b/data_fetcher_old.py new file mode 100644 index 0000000..5af87fd --- /dev/null +++ b/data_fetcher_old.py @@ -0,0 +1,213 @@ +import argparse +import json +import logging +import os +import sys +import time +from collections import deque +from datetime import datetime, timedelta +import csv + +from hyperliquid.info import Info +from hyperliquid.utils import constants +from hyperliquid.utils.error import ClientError + +# Assuming logging_utils.py is in the same directory +from logging_utils import setup_logging + + +class CandleFetcher: + """ + A class to fetch and manage historical candle data from Hyperliquid. + """ + + def __init__(self, coins_to_fetch: list, interval: str, days_back: int): + self.info = Info(constants.MAINNET_API_URL, skip_ws=True) + self.coins = self._resolve_coins(coins_to_fetch) + self.interval = interval + self.days_back = days_back + self.data_folder = os.path.join("_data", "candles") + self.csv_headers = [ + 'datetime_utc', 'timestamp_ms', 'open', 'high', 'low', 'close', 'volume', 'number_of_trades' + ] + self.header_mapping = { + 't': 'timestamp_ms', 'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume', 'n': 'number_of_trades' + } + + def _resolve_coins(self, coins_arg: list) -> list: + """Determines the final list of coins to fetch.""" + if coins_arg and "all" in [c.lower() for c in coins_arg]: + logging.info("Fetching data for all available coins.") + try: + with open("coin_precision.json", 'r') as f: + return list(json.load(f).keys()) + except FileNotFoundError: + logging.error("'coin_precision.json' not found. Please run list_coins.py first.") + sys.exit(1) + else: + logging.info(f"Fetching data for specified coins: {coins_arg}") + return coins_arg + + def run(self): + """Starts the data fetching process for all configured coins.""" + if not os.path.exists(self.data_folder): + os.makedirs(self.data_folder) + logging.info(f"Created data directory: '{self.data_folder}'") + + for coin in self.coins: + logging.info(f"--- Starting process for {coin} ---") + self._update_data_for_coin(coin) + time.sleep(1) # Be polite to the API between processing different coins + + def _get_start_time(self, file_path: str) -> (int, bool): + """Checks for an existing file and returns the last timestamp, or a default start time.""" + if os.path.exists(file_path): + try: + with open(file_path, 'r', newline='', encoding='utf-8') as f: + reader = csv.reader(f) + header = next(reader) + timestamp_index = header.index('timestamp_ms') + last_row = deque(reader, maxlen=1) + if last_row: + last_timestamp = int(last_row[0][timestamp_index]) + logging.info(f"Existing file found. Resuming from timestamp: {last_timestamp}") + return last_timestamp, True + except (IOError, ValueError, StopIteration, IndexError) as e: + logging.warning(f"Could not read '{file_path}'. Re-fetching history. Error: {e}") + + # If file doesn't exist or is invalid, fetch history + start_dt = datetime.now() - timedelta(days=self.days_back) + start_ms = int(start_dt.timestamp() * 1000) + logging.info(f"No valid data file. Fetching last {self.days_back} days.") + return start_ms, False + + def _update_data_for_coin(self, coin: str): + """Fetches and appends new candle data for a single coin.""" + file_path = os.path.join(self.data_folder, f"{coin}_{self.interval}.csv") + start_time_ms, file_existed = self._get_start_time(file_path) + end_time_ms = int(time.time() * 1000) + + if start_time_ms >= end_time_ms: + logging.warning(f"Start time ({datetime.fromtimestamp(start_time_ms/1000)}) is in the future. " + f"This can be caused by an incorrect system clock. No data will be fetched for {coin}.") + return + + all_candles = self._fetch_candles_aggressively(coin, start_time_ms, end_time_ms) + + if not all_candles: + logging.info(f"No new data found for {coin}.") + return + + # --- FIX: Robust de-duplication and filtering --- + # This explicitly processes candles to ensure only new, unique ones are kept. + new_unique_candles = [] + seen_timestamps = set() + + # If updating an existing file, add the last known timestamp to the seen set + # to prevent re-adding the exact same candle. + if file_existed: + seen_timestamps.add(start_time_ms) + + # Sort all fetched candles to process them chronologically + all_candles.sort(key=lambda c: c['t']) + + for candle in all_candles: + timestamp = candle['t'] + # Only process candles that are strictly newer than the last saved one + if timestamp > start_time_ms: + # Add the candle only if we haven't already added this timestamp + if timestamp not in seen_timestamps: + new_unique_candles.append(candle) + seen_timestamps.add(timestamp) + + if new_unique_candles: + self._save_to_csv(new_unique_candles, file_path, file_existed) + else: + logging.info(f"No new candles to append for {coin}.") + + def _fetch_candles_aggressively(self, coin, start_ms, end_ms): + """ + Uses a greedy, self-correcting loop to fetch data efficiently. + This is faster as it reduces the number of API calls. + """ + all_candles = [] + current_start_time = start_ms + total_duration = end_ms - start_ms + + while current_start_time < end_ms: + progress = ((current_start_time - start_ms) / total_duration) * 100 if total_duration > 0 else 100 + current_time_str = datetime.fromtimestamp(current_start_time / 1000).strftime('%Y-%m-%d %H:%M:%S') + logging.info(f"Fetching {coin}: {progress:.2f}% complete. Current: {current_time_str}") + + candle_batch = self._fetch_batch_with_retry(coin, current_start_time, end_ms) + + if not candle_batch: + logging.info("No more candles returned from API. Fetch complete.") + break + + all_candles.extend(candle_batch) + last_candle_timestamp = candle_batch[-1]["t"] + + if last_candle_timestamp < current_start_time: + logging.warning("API returned older candles than requested. Breaking loop to prevent issues.") + break + + current_start_time = last_candle_timestamp + 1 + time.sleep(0.25) # Small delay to be polite + + return all_candles + + def _fetch_batch_with_retry(self, coin, start_ms, end_ms): + """Performs a single API call with a retry mechanism.""" + max_retries = 3 + for attempt in range(max_retries): + try: + return self.info.candles_snapshot(coin, self.interval, start_ms, end_ms) + except ClientError as e: + if e.status_code == 429 and attempt < max_retries - 1: + logging.warning("Rate limited. Retrying in 2 seconds...") + time.sleep(2) + else: + logging.error(f"API Error for {coin}: {e}. Skipping batch.") + return None + return None + + + def _save_to_csv(self, candles: list, file_path: str, is_append: bool): + """Saves a list of candle data to a CSV file.""" + processed_candles = [] + for candle in candles: + new_candle = {self.header_mapping[k]: v for k, v in candle.items() if k in self.header_mapping} + new_candle['datetime_utc'] = datetime.fromtimestamp(candle['t'] / 1000).strftime('%Y-%m-%d %H:%M:%S') + processed_candles.append(new_candle) + + write_mode = 'a' if is_append else 'w' + try: + with open(file_path, write_mode, newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=self.csv_headers) + if not is_append: + writer.writeheader() + writer.writerows(processed_candles) + logging.info(f"Successfully saved {len(processed_candles)} candles to '{file_path}'") + except IOError as e: + logging.error(f"Failed to write to file '{file_path}': {e}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Fetch historical candle data from Hyperliquid.") + parser.add_argument( + "--coins", + nargs='+', + default=["BTC", "ETH"], + help="List of coins to fetch (e.g., BTC ETH), or 'all' to fetch all coins." + ) + parser.add_argument("--interval", default="1m", help="Candle interval (e.g., 1m, 5m, 1h).") + parser.add_argument("--days", type=int, default=7, help="Number of days of history to fetch for new coins.") + + args = parser.parse_args() + + setup_logging('normal', 'DataFetcher') + + fetcher = CandleFetcher(coins_to_fetch=args.coins, interval=args.interval, days_back=args.days) + fetcher.run() + diff --git a/list_coins.py b/list_coins.py new file mode 100644 index 0000000..96a4191 --- /dev/null +++ b/list_coins.py @@ -0,0 +1,61 @@ +import json +import logging +from hyperliquid.info import Info +from hyperliquid.utils import constants + +# Import the setup function from our new logging module +from logging_utils import setup_logging + +def save_coin_precision_data(): + """ + Connects to the Hyperliquid API, gets a list of all listed coins + and their trade size precision, and saves it to a JSON file. + """ + logging.info("Fetching asset information from Hyperliquid...") + + try: + info = Info(constants.MAINNET_API_URL, skip_ws=True) + meta_data = info.meta_and_asset_ctxs()[0] + all_assets = meta_data.get("universe", []) + + if not all_assets: + logging.error("Could not retrieve asset information from the meta object.") + return + + # Create a dictionary mapping the coin name to its precision + coin_precision_map = {} + for asset in all_assets: + name = asset.get("name") + precision = asset.get("szDecimals") + + if name is not None and precision is not None: + coin_precision_map[name] = precision + + # Save the dictionary to a JSON file + file_name = "_data/coin_precision.json" + with open(file_name, 'w', encoding='utf-8') as f: + # indent=4 makes the file readable; sort_keys keeps it organized + json.dump(coin_precision_map, f, indent=4, sort_keys=True) + + logging.info(f"Successfully saved coin precision data to '{file_name}'") + + # Provide an example of how to use the generated file + # print("\n--- Example Usage in another script ---") + # print("import json") + # print("\n# Load the data from the file") + # print("with open('coin_precision.json', 'r') as f:") + # print(" precision_data = json.load(f)") + # print("\n# Access the precision for a specific coin") + # print("eth_precision = precision_data.get('ETH')") + # print("print(f'The size precision for ETH is: {eth_precision}')") + + + except Exception as e: + logging.error(f"An error occurred: {e}") + + +if __name__ == "__main__": + # Setup logging with a specified level and process name + setup_logging('off', 'CoinLister') + save_coin_precision_data() + diff --git a/logging_utils.py b/logging_utils.py new file mode 100644 index 0000000..9ad66af --- /dev/null +++ b/logging_utils.py @@ -0,0 +1,40 @@ +import logging +import sys + +def setup_logging(log_level: str, process_name: str): + """ + Configures logging for a process. + + Args: + log_level: The desired logging level ('off', 'normal', 'debug'). + process_name: The name of the current process for log formatting. + """ + level_map = { + 'normal': logging.INFO, + 'debug': logging.DEBUG, + } + + if log_level == 'off': + logging.getLogger().addHandler(logging.NullHandler()) + return + + log_level_val = level_map.get(log_level.lower()) + if log_level_val is None: + print(f"Invalid log level '{log_level}'. Defaulting to 'normal'.") + log_level_val = logging.INFO + + logger = logging.getLogger() + if logger.hasHandlers(): + logger.handlers.clear() + + handler = logging.StreamHandler(sys.stdout) + + # --- FIX: Added a date format that includes the timezone name (%Z) --- + formatter = logging.Formatter( + f'%(asctime)s - {process_name} - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S %Z' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(log_level_val) + diff --git a/main_app.py b/main_app.py new file mode 100644 index 0000000..c6682ce --- /dev/null +++ b/main_app.py @@ -0,0 +1,190 @@ +import json +import logging +import os +import sys +import time +import subprocess +import multiprocessing +import schedule +import sqlite3 +import pandas as pd +from datetime import datetime, timezone + +from logging_utils import setup_logging + +# --- Configuration --- +WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"] +COIN_LISTER_SCRIPT = "list_coins.py" +MARKET_FEEDER_SCRIPT = "market.py" +DATA_FETCHER_SCRIPT = "data_fetcher.py" +RESAMPLER_SCRIPT = "resampler.py" # Restored resampler script +PRICE_DATA_FILE = os.path.join("_data", "current_prices.json") +DB_PATH = os.path.join("_data", "market_data.db") +STATUS_FILE = os.path.join("_data", "fetcher_status.json") + + +def run_market_feeder(): + """Target function to run the market.py script in a separate process.""" + setup_logging('normal', 'MarketFeedProcess') + logging.info("Market feeder process started.") + try: + subprocess.run([sys.executable, MARKET_FEEDER_SCRIPT], check=True) + except subprocess.CalledProcessError as e: + logging.error(f"Market feeder script failed with error: {e}") + except KeyboardInterrupt: + logging.info("Market feeder process stopping.") + + +def run_data_fetcher_job(): + """Defines the job to be run by the scheduler for the data fetcher.""" + logging.info(f"Scheduler starting data_fetcher.py task for {', '.join(WATCHED_COINS)}...") + try: + command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "off"] + subprocess.run(command, check=True) + logging.info("data_fetcher.py task finished successfully.") + except Exception as e: + logging.error(f"Failed to run data_fetcher.py job: {e}") + + +def data_fetcher_scheduler(): + """Schedules and runs the data_fetcher.py script periodically.""" + setup_logging('normal', 'DataFetcherScheduler') + run_data_fetcher_job() + schedule.every(1).minutes.do(run_data_fetcher_job) + logging.info("Data fetcher scheduled to run every 1 minute.") + while True: + schedule.run_pending() + time.sleep(1) + +# --- Restored Resampler Functions --- +def run_resampler_job(): + """Defines the job to be run by the scheduler for the resampler.""" + logging.info(f"Scheduler starting resampler.py task for {', '.join(WATCHED_COINS)}...") + try: + # Uses default timeframes configured within resampler.py + command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] + subprocess.run(command, check=True) + logging.info("resampler.py task finished successfully.") + except Exception as e: + logging.error(f"Failed to run resampler.py job: {e}") + + +def resampler_scheduler(): + """Schedules and runs the resampler.py script periodically.""" + setup_logging('normal', 'ResamplerScheduler') + run_resampler_job() + schedule.every(4).minutes.do(run_resampler_job) + logging.info("Resampler scheduled to run every 4 minutes.") + while True: + schedule.run_pending() + time.sleep(1) +# --- End of Restored Functions --- + +class MainApp: + def __init__(self, coins_to_watch: list): + self.watched_coins = coins_to_watch + self.prices = {} + self.last_db_update_info = "Initializing..." + + def read_prices(self): + """Reads the latest prices from the JSON file.""" + if not os.path.exists(PRICE_DATA_FILE): + return + try: + with open(PRICE_DATA_FILE, 'r', encoding='utf-8') as f: + self.prices = json.load(f) + except (json.JSONDecodeError, IOError): + logging.debug("Could not read price file (might be locked).") + + def get_overall_db_status(self): + """Reads the fetcher status from the status file.""" + if not os.path.exists(STATUS_FILE): + self.last_db_update_info = "Status file not found." + return + try: + with open(STATUS_FILE, 'r', encoding='utf-8') as f: + status = json.load(f) + coin = status.get("last_updated_coin") + timestamp_utc_str = status.get("last_run_timestamp_utc") + num_candles = status.get("num_updated_candles", 0) + + if timestamp_utc_str: + dt_naive = datetime.strptime(timestamp_utc_str, '%Y-%m-%d %H:%M:%S') + dt_utc = dt_naive.replace(tzinfo=timezone.utc) + dt_local = dt_utc.astimezone(None) + timestamp_display = dt_local.strftime('%Y-%m-%d %H:%M:%S %Z') + else: + timestamp_display = "N/A" + + self.last_db_update_info = f"{coin} at {timestamp_display} ({num_candles} candles)" + except (IOError, json.JSONDecodeError) as e: + self.last_db_update_info = "Error reading status file." + logging.error(f"Could not read status file: {e}") + + def display_dashboard(self): + """Displays a formatted table for prices and DB status.""" + print("\x1b[H\x1b[J", end="") + + print("--- Market Dashboard ---") + table_width = 26 + print("-" * table_width) + print(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} |") + print("-" * table_width) + for i, coin in enumerate(self.watched_coins, 1): + price = self.prices.get(coin, "Loading...") + print(f"{i:<2} | {coin:<6} | {price:>10} |") + print("-" * table_width) + print(f"DB Status: Last coin updated -> {self.last_db_update_info}") + sys.stdout.flush() + + def run(self): + """Main loop to read and display data.""" + while True: + self.read_prices() + self.get_overall_db_status() + self.display_dashboard() + time.sleep(2) + + +if __name__ == "__main__": + setup_logging('normal', 'MainApp') + + logging.info(f"Running coin lister: '{COIN_LISTER_SCRIPT}'...") + try: + subprocess.run([sys.executable, COIN_LISTER_SCRIPT], check=True, capture_output=True, text=True) + except subprocess.CalledProcessError as e: + logging.error(f"Failed to run '{COIN_LISTER_SCRIPT}'. Error: {e.stderr}") + sys.exit(1) + + logging.info(f"Starting market feeder ('{MARKET_FEEDER_SCRIPT}')...") + market_process = multiprocessing.Process(target=run_market_feeder, daemon=True) + market_process.start() + + logging.info(f"Starting historical data fetcher ('{DATA_FETCHER_SCRIPT}')...") + fetcher_process = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) + fetcher_process.start() + + # --- Restored Resampler Process Start --- + logging.info(f"Starting resampler ('{RESAMPLER_SCRIPT}')...") + resampler_process = multiprocessing.Process(target=resampler_scheduler, daemon=True) + resampler_process.start() + # --- End Resampler Process Start --- + + time.sleep(3) + + app = MainApp(coins_to_watch=WATCHED_COINS) + try: + app.run() + except KeyboardInterrupt: + logging.info("Shutting down...") + market_process.terminate() + fetcher_process.terminate() + # --- Restored Resampler Shutdown --- + resampler_process.terminate() + market_process.join() + fetcher_process.join() + resampler_process.join() + # --- End Resampler Shutdown --- + logging.info("Shutdown complete.") + sys.exit(0) + diff --git a/market.py b/market.py new file mode 100644 index 0000000..1c69d89 --- /dev/null +++ b/market.py @@ -0,0 +1,130 @@ +import json +import logging +import os +import sys +import time +from datetime import datetime + +from hyperliquid.info import Info +from hyperliquid.utils import constants +from logging_utils import setup_logging + + +class MarketData: + """ + Manages fetching and storing real-time market data for all coins. + """ + def __init__(self, coins_file="_data/coin_precision.json"): + self.info = Info(constants.MAINNET_API_URL, skip_ws=True) + self.coins_file = coins_file + self.target_coins = self.load_coins() + self.current_prices = {} + self.data_folder = "_data" + self.output_file = os.path.join(self.data_folder, "current_prices.json") + + def load_coins(self) -> list: + """Loads the list of target coins from the precision data file.""" + if not os.path.exists(self.coins_file): + logging.error(f"'{self.coins_file}' not found. Please run the coin lister script first.") + sys.exit(1) + + with open(self.coins_file, 'r') as f: + data = json.load(f) + logging.info(f"Loaded {len(data)} coins from '{self.coins_file}'.") + return list(data.keys()) + + def fetch_and_update_prices(self): + """Fetches the latest market data and updates the price dictionary.""" + try: + # The API returns a tuple: (static_meta_data, dynamic_asset_contexts) + meta_data, asset_contexts = self.info.meta_and_asset_ctxs() + + if not asset_contexts or "universe" not in meta_data: + logging.warning("API did not return sufficient market data.") + return + + universe = meta_data["universe"] + + # Create a temporary dictionary by pairing the static name with the dynamic price. + # The two lists are ordered by the same asset index. + api_prices = {} + for asset_meta, asset_context in zip(universe, asset_contexts): + coin_name = asset_meta.get("name") + mark_price = asset_context.get("markPx") + if coin_name and mark_price: + api_prices[coin_name] = mark_price + + # Update our price dictionary for the coins we are tracking + for coin in self.target_coins: + if coin in api_prices: + self.current_prices[coin] = api_prices[coin] + else: + self.current_prices.pop(coin, None) # Remove if it's no longer in the context + + except Exception as e: + logging.error(f"An error occurred while fetching prices: {e}") + + def display_prices(self): + """Displays the current prices in a formatted table if debug is enabled.""" + if not logging.getLogger().isEnabledFor(logging.DEBUG): + return + + # Use ANSI escape codes for a smoother, in-place update + print("\x1b[H\x1b[J", end="") + + print("--- Hyperliquid Market Prices ---") + print(f"Last Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + # Adjust width for the new row number column + table_width = 40 + print("-" * table_width) + print(f"{'#':<4} | {'Coin':<10} | {'Price':>20}") + print("-" * table_width) + + sorted_coins = sorted(self.current_prices.keys()) + for i, coin in enumerate(sorted_coins, 1): + price = self.current_prices.get(coin, "N/A") + print(f"{i:<4} | {coin:<10} | {price:>20}") + + print("-" * table_width) + # Flush the output to ensure it's displayed immediately + sys.stdout.flush() + + def save_prices_to_file(self): + """Atomically saves the current prices to a JSON file in the _data folder.""" + # Ensure the data directory exists + if not os.path.exists(self.data_folder): + os.makedirs(self.data_folder) + logging.info(f"Created data directory: '{self.data_folder}'") + + temp_file = f"{self.output_file}.tmp" + try: + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(self.current_prices, f, indent=4) + # Atomic move/rename + os.replace(temp_file, self.output_file) + logging.debug(f"Prices successfully saved to '{self.output_file}'") + except Exception as e: + logging.error(f"Failed to save prices to file: {e}") + + def run(self): + """Starts the main loop to fetch and update market data.""" + logging.info("Starting market data feed. Press Ctrl+C to stop.") + while True: + self.fetch_and_update_prices() + # Save data (and its log message) BEFORE clearing and displaying + self.save_prices_to_file() + self.display_prices() + time.sleep(1) + + +if __name__ == "__main__": + # Change 'debug' to 'normal' to hide the price table + setup_logging('normal', 'MarketFeed') + + market_data = MarketData() + try: + market_data.run() + except KeyboardInterrupt: + logging.info("Market data feed stopped by user.") + sys.exit(0) + diff --git a/market_old.py b/market_old.py new file mode 100644 index 0000000..ea81112 --- /dev/null +++ b/market_old.py @@ -0,0 +1,150 @@ +from hyperliquid.info import Info +from hyperliquid.utils import constants +import time +import os +import sys + +def get_asset_prices(asset_names=["BTC", "ETH", "SOL", "BNB", "FARTCOIN", "PUMP", "TRUMP", "ZEC"]): + """ + Connects to the Hyperliquid API to get the current mark price of specified assets. + + Args: + asset_names (list): A list of asset names to retrieve prices for. + + Returns: + list: A list of dictionaries, where each dictionary contains the name and mark price of an asset. + Returns an empty list if the API call fails or no assets are found. + """ + try: + info = Info(constants.MAINNET_API_URL, skip_ws=True) + meta, asset_contexts = info.meta_and_asset_ctxs() + + universe = meta.get("universe", []) + asset_data = [] + + for name in asset_names: + try: + index = next(i for i, asset in enumerate(universe) if asset["name"] == name) + context = asset_contexts[index] + asset_data.append({ + "name": name, + "mark_price": context.get("markPx") + }) + except StopIteration: + print(f"Warning: Could not find asset '{name}' in the API response.") + + return asset_data + + except KeyError: + print("Error: A KeyError occurred. The structure of the API response may have changed.") + return [] + except Exception as e: + print(f"An unexpected error occurred: {e}") + return [] + +def clear_console(): + # Cross-platform clear screen + if os.name == 'nt': + os.system('cls') + else: + print('\033c', end='') + +def display_prices_table(prices, previous_prices): + """ + Displays a list of asset prices in a formatted table with price change indicators. + Clears the console before displaying to keep the table in the same place. + Args: + prices (list): A list of asset data dictionaries from get_asset_prices. + previous_prices (dict): A dictionary of previous prices with asset names as keys. + """ + clear_console() + if not prices: + print("No price data to display.") + return + + # Filter prices to only include assets in assets_to_track + tracked_assets = {asset['name'] for asset in assets_to_track} + prices = [asset for asset in prices if asset['name'] in tracked_assets] + + # ANSI color codes + GREEN = '\033[92m' + RED = '\033[91m' + RESET = '\033[0m' + + print(f"{'Asset':<12} | {'Mark Price':<20} | {'Change'}") + print("-" * 40) + for asset in prices: + current_price = float(asset['mark_price']) if asset['mark_price'] else 0 + previous_price = previous_prices.get(asset['name'], 0) + + indicator = " " + color = RESET + if previous_price and current_price > previous_price: + indicator = "↑" + color = GREEN + elif previous_price and current_price < previous_price: + indicator = "↓" + color = RED + + # Use precision set in assets_to_track + precision = next((a['precision'] for a in assets_to_track if a['name'] == asset['name']), 2) + price_str = f"${current_price:,.{precision}f}" if current_price else "N/A" + print(f"{asset['name']:<12} | {color}{price_str:<20}{RESET} | {color}{indicator}{RESET}") + """ + Displays a list of asset prices in a formatted table with price change indicators. + Clears the console before displaying to keep the table in the same place. + Args: + prices (list): A list of asset data dictionaries from get_asset_prices. + previous_prices (dict): A dictionary of previous prices with asset names as keys. + """ + clear_console() + if not prices: + print("No price data to display.") + return + + # ANSI color codes + GREEN = '\033[92m' + RED = '\033[91m' + RESET = '\033[0m' + + print("\n") + print("-" * 38) + print(f"{'Asset':<8} | {'Mark Price':<15} | {'Change':<6} |") + print("-" * 38) + for asset in prices: + current_price = float(asset['mark_price']) if asset['mark_price'] else 0 + previous_price = previous_prices.get(asset['name'], 0) + + indicator = " " + color = RESET + if previous_price and current_price > previous_price: + indicator = "↑" + color = GREEN + elif previous_price and current_price < previous_price: + indicator = "↓" + color = RED + + # Use precision set in assets_to_track + precision = next((a['precision'] for a in assets_to_track if a['name'] == asset['name']), 2) + price_str = f"${current_price:,.{precision}f}" if current_price else "N/A" + print(f"{asset['name']:<8} | {color}{price_str:<15}{RESET} | {color}{indicator:<4}{RESET} | ") + print("-" * 38) +if __name__ == "__main__": + assets_to_track = [ + {"name": "BTC", "precision": 0} + ] + previous_prices = {} + + while True: + # Pass only the asset names to get_asset_prices + asset_names = [a["name"] for a in assets_to_track] + current_prices_data = get_asset_prices(asset_names) + display_prices_table(current_prices_data, previous_prices) + + # Update previous_prices for the next iteration + for asset in current_prices_data: + if asset['mark_price']: + previous_prices[asset['name']] = float(asset['mark_price']) + + time.sleep(1) # Add a delay to avoid overwhelming the API + diff --git a/resampler.py b/resampler.py new file mode 100644 index 0000000..f9740a9 --- /dev/null +++ b/resampler.py @@ -0,0 +1,186 @@ +import argparse +import logging +import os +import sys +import sqlite3 +import pandas as pd +import json +from datetime import datetime, timezone + +# Assuming logging_utils.py is in the same directory +from logging_utils import setup_logging + +class Resampler: + """ + Reads 1-minute candle data directly from the SQLite database, resamples + it to various timeframes, and stores the results back in the database. + """ + + def __init__(self, log_level: str, coins: list, timeframes: dict): + setup_logging(log_level, 'Resampler') + self.db_path = os.path.join("_data", "market_data.db") + self.status_file_path = os.path.join("_data", "resampling_status.json") + self.coins_to_process = coins + self.timeframes = timeframes + self.aggregation_logic = { + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum', + 'number_of_trades': 'sum' + } + self.resampling_status = self._load_existing_status() + + def _load_existing_status(self) -> dict: + """Loads the existing status file if it exists, otherwise returns an empty dict.""" + if os.path.exists(self.status_file_path): + try: + with open(self.status_file_path, 'r', encoding='utf-8') as f: + logging.info(f"Loading existing status from '{self.status_file_path}'") + return json.load(f) + except (IOError, json.JSONDecodeError) as e: + logging.warning(f"Could not read existing status file. Starting fresh. Error: {e}") + return {} + + def run(self): + """ + Main execution function to process all configured coins and update the database. + """ + if not os.path.exists(self.db_path): + logging.error(f"Database file '{self.db_path}' not found. " + "Please run the data fetcher script first.") + sys.exit(1) + + with sqlite3.connect(self.db_path) as conn: + conn.execute("PRAGMA journal_mode=WAL;") + + logging.info(f"Processing {len(self.coins_to_process)} coins: {', '.join(self.coins_to_process)}") + + for coin in self.coins_to_process: + source_table_name = f"{coin}_1m" + logging.info(f"--- Processing {coin} ---") + + try: + df = pd.read_sql(f'SELECT * FROM "{source_table_name}"', conn) + + if df.empty: + logging.warning(f"Source table '{source_table_name}' is empty or does not exist. Skipping.") + continue + + df['datetime_utc'] = pd.to_datetime(df['datetime_utc']) + df.set_index('datetime_utc', inplace=True) + + for tf_name, tf_code in self.timeframes.items(): + logging.info(f" Resampling to {tf_name}...") + + resampled_df = df.resample(tf_code).agg(self.aggregation_logic) + resampled_df.dropna(how='all', inplace=True) + + if coin not in self.resampling_status: + self.resampling_status[coin] = {} + + if not resampled_df.empty: + target_table_name = f"{coin}_{tf_name}" + resampled_df.to_sql( + target_table_name, + conn, + if_exists='replace', + index=True + ) + + last_timestamp = resampled_df.index[-1].strftime('%Y-%m-%d %H:%M:%S') + num_candles = len(resampled_df) + + self.resampling_status[coin][tf_name] = { + "last_candle_utc": last_timestamp, + "total_candles": num_candles + } + else: + logging.info(f" -> No data to save for '{coin}_{tf_name}'.") + self.resampling_status[coin][tf_name] = { + "last_candle_utc": "N/A", + "total_candles": 0 + } + + except pd.io.sql.DatabaseError as e: + logging.warning(f"Could not read source table '{source_table_name}': {e}") + except Exception as e: + logging.error(f"Failed to process coin '{coin}': {e}") + + self._save_status() + logging.info("--- Resampling process complete ---") + + def _save_status(self): + """Saves the final resampling status to a JSON file.""" + if not self.resampling_status: + logging.warning("No data was resampled, skipping status file creation.") + return + + self.resampling_status['last_completed_utc'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + + try: + with open(self.status_file_path, 'w', encoding='utf-8') as f: + json.dump(self.resampling_status, f, indent=4, sort_keys=True) + logging.info(f"Successfully saved resampling status to '{self.status_file_path}'") + except IOError as e: + logging.error(f"Failed to write resampling status file: {e}") + + +def parse_timeframes(tf_strings: list) -> dict: + """Converts a list of timeframe strings into a dictionary for pandas.""" + tf_map = {} + for tf_str in tf_strings: + numeric_part = ''.join(filter(str.isdigit, tf_str)) + unit = ''.join(filter(str.isalpha, tf_str)).lower() + + code = '' + if unit == 'm': + code = f"{numeric_part}min" + elif unit in ['h', 'd', 'w']: + code = f"{numeric_part}{unit}" + else: + code = tf_str + logging.warning(f"Unrecognized timeframe unit in '{tf_str}'. Using as-is.") + + tf_map[tf_str] = code + return tf_map + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Resample 1-minute candle data from SQLite to other timeframes.") + parser.add_argument( + "--coins", + nargs='+', + default=["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"], + help="List of coins to process." + ) + parser.add_argument( + "--timeframes", + nargs='+', + default=['4m', '5m', '15m', '30m', '37m', '148m', '4h', '12h', '1d', '1w'], + help="List of timeframes to generate (e.g., 5m 1h 1d)." + ) + parser.add_argument( + "--timeframe", + dest="timeframes", + nargs='+', + help=argparse.SUPPRESS + ) + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + timeframes_dict = parse_timeframes(args.timeframes) + + resampler = Resampler( + log_level=args.log_level, + coins=args.coins, + timeframes=timeframes_dict + ) + resampler.run() + diff --git a/sdk/hyperliquid-python-sdk b/sdk/hyperliquid-python-sdk new file mode 160000 index 0000000..64b252e --- /dev/null +++ b/sdk/hyperliquid-python-sdk @@ -0,0 +1 @@ +Subproject commit 64b252e99d1cc211a5edc7346387fbbdae4cbdbc