no blinking on the dashboard

This commit is contained in:
2025-10-13 13:26:34 +02:00
parent 5800fb6e2c
commit ad4cf07585
3 changed files with 118 additions and 100 deletions

View File

@ -25,10 +25,11 @@ STATUS_FILE = os.path.join("_data", "fetcher_status.json")
def run_market_feeder():
"""Target function to run the market.py script in a separate process."""
setup_logging('normal', 'MarketFeedProcess')
setup_logging('off', 'MarketFeedProcess')
logging.info("Market feeder process started.")
try:
subprocess.run([sys.executable, MARKET_FEEDER_SCRIPT], check=True)
# Pass the log level to the script
subprocess.run([sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"], check=True)
except subprocess.CalledProcessError as e:
logging.error(f"Market feeder script failed with error: {e}")
except KeyboardInterrupt:
@ -48,7 +49,7 @@ def run_data_fetcher_job():
def data_fetcher_scheduler():
"""Schedules and runs the data_fetcher.py script periodically."""
setup_logging('normal', 'DataFetcherScheduler')
setup_logging('off', 'DataFetcherScheduler')
run_data_fetcher_job()
schedule.every(1).minutes.do(run_data_fetcher_job)
logging.info("Data fetcher scheduled to run every 1 minute.")
@ -71,7 +72,7 @@ def run_resampler_job():
def resampler_scheduler():
"""Schedules and runs the resampler.py script periodically."""
setup_logging('normal', 'ResamplerScheduler')
setup_logging('off', 'ResamplerScheduler')
run_resampler_job()
schedule.every(4).minutes.do(run_resampler_job)
logging.info("Resampler scheduled to run every 4 minutes.")
@ -85,6 +86,7 @@ class MainApp:
self.watched_coins = coins_to_watch
self.prices = {}
self.last_db_update_info = "Initializing..."
self._lines_printed = 0 # To track how many lines we printed last time
def read_prices(self):
"""Reads the latest prices from the JSON file."""
@ -122,19 +124,32 @@ class MainApp:
logging.error(f"Could not read status file: {e}")
def display_dashboard(self):
"""Displays a formatted table for prices and DB status."""
print("\x1b[H\x1b[J", end="")
"""Displays a formatted table for prices and DB status without blinking."""
# Move the cursor up to overwrite the previous output
if self._lines_printed > 0:
print(f"\x1b[{self._lines_printed}A", end="")
print("--- Market Dashboard ---")
# Build the output as a single string
output_lines = []
output_lines.append("--- Market Dashboard ---")
table_width = 26
print("-" * table_width)
print(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} |")
print("-" * table_width)
output_lines.append("-" * table_width)
output_lines.append(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} |")
output_lines.append("-" * table_width)
for i, coin in enumerate(self.watched_coins, 1):
price = self.prices.get(coin, "Loading...")
print(f"{i:<2} | {coin:<6} | {price:>10} |")
print("-" * table_width)
print(f"DB Status: Last coin updated -> {self.last_db_update_info}")
output_lines.append(f"{i:<2} | {coin:<6} | {price:>10} |")
output_lines.append("-" * table_width)
output_lines.append(f"DB Status: Last coin updated -> {self.last_db_update_info}")
# Join lines and add a code to clear from cursor to end of screen
# This prevents artifacts if the new output is shorter than the old one.
final_output = "\n".join(output_lines) + "\n\x1b[J"
print(final_output, end="")
# Store the number of lines printed for the next iteration
self._lines_printed = len(output_lines)
sys.stdout.flush()
def run(self):

172
market.py
View File

@ -3,128 +3,128 @@ import logging
import os
import sys
import time
import argparse
from datetime import datetime
from hyperliquid.info import Info
from hyperliquid.utils import constants
from hyperliquid.utils.error import ClientError
# Assuming logging_utils.py is in the same directory
from logging_utils import setup_logging
class MarketDataFeeder:
"""
Fetches live market prices for all coins from Hyperliquid, displays them
in a table, and saves the data for other scripts to use.
"""
class MarketData:
"""
Manages fetching and storing real-time market data for all coins.
"""
def __init__(self, coins_file="_data/coin_precision.json"):
def __init__(self):
self.info = Info(constants.MAINNET_API_URL, skip_ws=True)
self.coins_file = coins_file
self.target_coins = self.load_coins()
self.current_prices = {}
self.coins = self._load_coins()
self.data_folder = "_data"
self.output_file = os.path.join(self.data_folder, "current_prices.json")
self.price_file = os.path.join(self.data_folder, "current_prices.json")
def load_coins(self) -> list:
"""Loads the list of target coins from the precision data file."""
if not os.path.exists(self.coins_file):
logging.error(f"'{self.coins_file}' not found. Please run the coin lister script first.")
def _load_coins(self) -> list:
"""Loads the list of coins from the coin_precision.json file."""
try:
with open("coin_precision.json", 'r') as f:
coins_data = json.load(f)
logging.info(f"Loaded {len(coins_data)} coins from 'coin_precision.json'.")
return list(coins_data.keys())
except FileNotFoundError:
logging.error("'coin_precision.json' not found. Please run list_coins.py first.")
sys.exit(1)
except (IOError, json.JSONDecodeError) as e:
logging.error(f"Failed to load or parse 'coin_precision.json': {e}")
sys.exit(1)
with open(self.coins_file, 'r') as f:
data = json.load(f)
logging.info(f"Loaded {len(data)} coins from '{self.coins_file}'.")
return list(data.keys())
def fetch_and_update_prices(self):
"""Fetches the latest market data and updates the price dictionary."""
"""Fetches all asset contexts and updates the local price dictionary."""
try:
# The API returns a tuple: (static_meta_data, dynamic_asset_contexts)
meta_data, asset_contexts = self.info.meta_and_asset_ctxs()
if not asset_contexts or "universe" not in meta_data:
logging.warning("API did not return sufficient market data.")
return
universe = meta_data["universe"]
# Create a temporary dictionary by pairing the static name with the dynamic price.
# The two lists are ordered by the same asset index.
api_prices = {}
for asset_meta, asset_context in zip(universe, asset_contexts):
coin_name = asset_meta.get("name")
mark_price = asset_context.get("markPx")
if coin_name and mark_price:
api_prices[coin_name] = mark_price
# Update our price dictionary for the coins we are tracking
for coin in self.target_coins:
if coin in api_prices:
self.current_prices[coin] = api_prices[coin]
else:
self.current_prices.pop(coin, None) # Remove if it's no longer in the context
except Exception as e:
logging.error(f"An error occurred while fetching prices: {e}")
def display_prices(self):
"""Displays the current prices in a formatted table if debug is enabled."""
if not logging.getLogger().isEnabledFor(logging.DEBUG):
return
meta, asset_contexts = self.info.meta_and_asset_ctxs()
# Use ANSI escape codes for a smoother, in-place update
print("\x1b[H\x1b[J", end="")
prices = {}
# The API returns two lists. The first has static info (name), the second has live data (markPx).
# We need to pair them up. They are guaranteed to be in the same order.
for asset_info, asset_ctx in zip(meta["universe"], asset_contexts):
coin_name = asset_info["name"]
if asset_ctx: # The context can sometimes be null for unlisted assets
mark_price = asset_ctx["markPx"]
prices[coin_name] = float(mark_price)
return prices
except ClientError as e:
logging.error(f"An API error occurred: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during fetch: {e}")
return None
def save_prices_to_file(self, prices: dict):
"""Saves the current price dictionary to a JSON file."""
if not os.path.exists(self.data_folder):
os.makedirs(self.data_folder)
try:
with open(self.price_file, 'w', encoding='utf-8') as f:
json.dump(prices, f, indent=4)
logging.debug(f"Prices successfully saved to '{self.price_file}'")
except IOError as e:
logging.error(f"Failed to write to price file: {e}")
def display_prices(self, prices: dict):
"""Clears the screen and displays a formatted table of all prices."""
# Use ANSI escape codes to clear the screen and move the cursor to the top-left
print("\x1b[H\x1b[J", end="")
print("--- Hyperliquid Market Prices ---")
print(f"Last Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Adjust width for the new row number column
table_width = 40
print("-" * table_width)
print(f"{'#':<4} | {'Coin':<10} | {'Price':>20}")
print("-" * table_width)
sorted_coins = sorted(self.current_prices.keys())
for i, coin in enumerate(sorted_coins, 1):
price = self.current_prices.get(coin, "N/A")
# Sort prices by coin name for a consistent display
sorted_prices = sorted(prices.items())
for i, (coin, price) in enumerate(sorted_prices, 1):
print(f"{i:<4} | {coin:<10} | {price:>20}")
print("-" * table_width)
# Flush the output to ensure it's displayed immediately
sys.stdout.flush()
def save_prices_to_file(self):
"""Atomically saves the current prices to a JSON file in the _data folder."""
# Ensure the data directory exists
if not os.path.exists(self.data_folder):
os.makedirs(self.data_folder)
logging.info(f"Created data directory: '{self.data_folder}'")
temp_file = f"{self.output_file}.tmp"
try:
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(self.current_prices, f, indent=4)
# Atomic move/rename
os.replace(temp_file, self.output_file)
logging.debug(f"Prices successfully saved to '{self.output_file}'")
except Exception as e:
logging.error(f"Failed to save prices to file: {e}")
def run(self):
"""Starts the main loop to fetch and update market data."""
"""Main loop to fetch, display, and save prices every second."""
logging.info("Starting market data feed. Press Ctrl+C to stop.")
while True:
self.fetch_and_update_prices()
# Save data (and its log message) BEFORE clearing and displaying
self.save_prices_to_file()
self.display_prices()
current_prices = self.fetch_and_update_prices()
if current_prices:
self.save_prices_to_file(current_prices)
# Only display the table if logging is set to DEBUG
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
self.display_prices(current_prices)
time.sleep(1)
if __name__ == "__main__":
# Change 'debug' to 'normal' to hide the price table
setup_logging('normal', 'MarketFeed')
parser = argparse.ArgumentParser(description="Fetch live price data from Hyperliquid.")
parser.add_argument(
"--log-level",
default="normal",
choices=['off', 'normal', 'debug'],
help="Set the logging level for the script."
)
args = parser.parse_args()
setup_logging(args.log_level, 'MarketFeed')
market_data = MarketData()
feeder = MarketDataFeeder()
try:
market_data.run()
feeder.run()
except KeyboardInterrupt:
logging.info("Market data feed stopped by user.")
logging.info("Market data feeder stopped.")
sys.exit(0)

View File

@ -137,7 +137,10 @@ def parse_timeframes(tf_strings: list) -> dict:
code = ''
if unit == 'm':
code = f"{numeric_part}min"
elif unit in ['h', 'd', 'w']:
elif unit == 'w':
# --- FIX: Use uppercase 'W' for weeks to avoid deprecation warning ---
code = f"{numeric_part}W"
elif unit in ['h', 'd']:
code = f"{numeric_part}{unit}"
else:
code = tf_str