import json
import logging
import os
import sys
import time
from datetime import datetime

from hyperliquid.info import Info
from hyperliquid.utils import constants

from logging_utils import setup_logging


class MarketData:
    """
    Manages fetching and storing real-time market data for all coins.

    The set of tracked coins is read once from ``coins_file`` at construction
    time. Each call to ``fetch_and_update_prices`` refreshes
    ``current_prices`` (coin name -> mark price as returned by the API), and
    ``save_prices_to_file`` persists that mapping to
    ``_data/current_prices.json``.
    """

    def __init__(self, coins_file="_data/coin_precision.json"):
        """
        Create the API client and load the list of coins to track.

        Args:
            coins_file: Path to a JSON object whose keys are the coin names
                to track.
        """
        self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
        self.coins_file = coins_file
        self.target_coins = self.load_coins()
        self.current_prices = {}
        self.data_folder = "_data"
        self.output_file = os.path.join(self.data_folder, "current_prices.json")

    def load_coins(self) -> list:
        """
        Loads the list of target coins from the precision data file.

        Returns:
            The top-level keys of the JSON document in ``self.coins_file``.

        Raises:
            SystemExit: With status 1 if ``self.coins_file`` does not exist.
        """
        if not os.path.exists(self.coins_file):
            logging.error(f"'{self.coins_file}' not found. Please run the coin lister script first.")
            sys.exit(1)

        # Read explicitly as UTF-8 (matching how this class writes its own
        # JSON output) instead of relying on the platform default encoding.
        with open(self.coins_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        logging.info(f"Loaded {len(data)} coins from '{self.coins_file}'.")
        return list(data.keys())

    def fetch_and_update_prices(self):
        """
        Fetches the latest market data and updates the price dictionary.

        Tracked coins present in the API response get their entry in
        ``self.current_prices`` overwritten; tracked coins absent from the
        response are removed. Any exception raised while fetching or parsing
        is logged and swallowed so the caller's polling loop keeps running.
        """
        try:
            # The API returns a tuple: (static_meta_data, dynamic_asset_contexts)
            meta_data, asset_contexts = self.info.meta_and_asset_ctxs()

            if not asset_contexts or "universe" not in meta_data:
                logging.warning("API did not return sufficient market data.")
                return

            universe = meta_data["universe"]

            # Create a temporary dictionary by pairing the static name with the
            # dynamic price. The two lists are ordered by the same asset index.
            api_prices = {}
            for asset_meta, asset_context in zip(universe, asset_contexts):
                coin_name = asset_meta.get("name")
                mark_price = asset_context.get("markPx")
                # Skip entries with a missing/empty name or price.
                if coin_name and mark_price:
                    api_prices[coin_name] = mark_price

            # Update our price dictionary for the coins we are tracking
            for coin in self.target_coins:
                if coin in api_prices:
                    self.current_prices[coin] = api_prices[coin]
                else:
                    # Remove if it's no longer in the context
                    self.current_prices.pop(coin, None)

        except Exception as e:
            logging.error(f"An error occurred while fetching prices: {e}")

    def display_prices(self):
        """
        Displays the current prices in a formatted table if debug is enabled.

        Does nothing unless the root logger is enabled for DEBUG. Otherwise
        clears the terminal and prints one row per coin, sorted by coin name.
        """
        if not logging.getLogger().isEnabledFor(logging.DEBUG):
            return

        # Use ANSI escape codes for a smoother, in-place update
        print("\x1b[H\x1b[J", end="")

        print("--- Hyperliquid Market Prices ---")
        print(f"Last Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Adjust width for the new row number column
        table_width = 40
        print("-" * table_width)
        print(f"{'#':<4} | {'Coin':<10} | {'Price':>20}")
        print("-" * table_width)

        sorted_coins = sorted(self.current_prices.keys())
        for i, coin in enumerate(sorted_coins, 1):
            price = self.current_prices.get(coin, "N/A")
            print(f"{i:<4} | {coin:<10} | {price:>20}")
        print("-" * table_width)

        # Flush the output to ensure it's displayed immediately
        sys.stdout.flush()

    def save_prices_to_file(self):
        """
        Atomically saves the current prices to a JSON file in the _data folder.

        The data is written to ``<output_file>.tmp`` and then moved over
        ``self.output_file`` with ``os.replace``. Write/rename failures are
        logged rather than raised, and the temporary file is removed so a
        failed attempt does not leave a stale partial file behind.
        """
        # Ensure the data directory exists. Attempting the creation directly
        # (instead of checking first) avoids a check-then-create race; the
        # creation is only logged when it actually happened.
        try:
            os.makedirs(self.data_folder)
        except FileExistsError:
            pass
        else:
            logging.info(f"Created data directory: '{self.data_folder}'")

        temp_file = f"{self.output_file}.tmp"

        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(self.current_prices, f, indent=4)
            # Atomic move/rename
            os.replace(temp_file, self.output_file)
            logging.debug(f"Prices successfully saved to '{self.output_file}'")
        except Exception as e:
            logging.error(f"Failed to save prices to file: {e}")
            # Best-effort cleanup of the partial temp file; it may not exist
            # if the failure happened before it was created.
            try:
                os.remove(temp_file)
            except OSError:
                pass

    def run(self):
        """
        Starts the main loop to fetch and update market data.

        Loops forever: fetch, save, display, then sleep one second. The loop
        has no exit condition of its own; it ends only when an exception
        (e.g. ``KeyboardInterrupt``) propagates to the caller.
        """
        logging.info("Starting market data feed. Press Ctrl+C to stop.")
        while True:
            self.fetch_and_update_prices()
            # Save data (and its log message) BEFORE clearing and displaying
            self.save_prices_to_file()
            self.display_prices()
            time.sleep(1)


if __name__ == "__main__":
    # Change 'normal' to 'debug' to show the price table (display_prices only
    # prints when DEBUG logging is enabled).
    setup_logging('normal', 'MarketFeed')

    market_data = MarketData()
    try:
        market_data.run()
    except KeyboardInterrupt:
        logging.info("Market data feed stopped by user.")
        sys.exit(0)