bid, ask, last traded price

This commit is contained in:
2025-11-04 13:34:49 +01:00
parent 5f9109c3a9
commit 596fcde0bf
2 changed files with 214 additions and 115 deletions

View File

@ -3,6 +3,7 @@ import json
import time
import os
import traceback
import sys
from hyperliquid.info import Info
from hyperliquid.utils import constants
@ -28,38 +29,114 @@ def log_error(error_message: str, include_traceback: bool = True):
except Exception:
print(f"CRITICAL: Failed to write to error log file: {error_message}", file=sys.stderr)
def on_message(message, shared_prices_dict):
"""
Callback function to process incoming 'allMids' messages and update the
shared memory dictionary directly.
Callback function to process incoming WebSocket messages for 'bbo' and 'trades'
and update the shared memory dictionary.
"""
try:
if message.get("channel") == "allMids":
new_prices = message.get("data", {}).get("mids", {})
shared_prices_dict.update(new_prices)
logging.debug(f"Received WebSocket message: {message}")
channel = message.get("channel")
# --- Parser 1: Handle Best Bid/Offer messages ---
if channel == "bbo":
data = message.get("data")
if not data:
logging.warning("BBO message received with no data.")
return
coin = data.get("coin")
if not coin:
logging.warning("BBO data received with no coin identifier.")
return
bid_ask_data = data.get("bbo")
if not bid_ask_data or not isinstance(bid_ask_data, list) or len(bid_ask_data) < 2:
logging.warning(f"[{coin}] Received BBO message with invalid 'bbo' array: {bid_ask_data}")
return
try:
bid_price_str = bid_ask_data[0].get('px')
ask_price_str = bid_ask_data[1].get('px')
if not bid_price_str or not ask_price_str:
logging.warning(f"[{coin}] BBO data missing 'px' field.")
return
bid_price = float(bid_price_str)
ask_price = float(ask_price_str)
# Update the shared dictionary for Bid and Ask
shared_prices_dict[f"{coin}_bid"] = bid_price
shared_prices_dict[f"{coin}_ask"] = ask_price
logging.info(f"Updated {coin} (BBO): Bid={bid_price:.4f}, Ask={ask_price:.4f}")
except (ValueError, TypeError, IndexError) as e:
logging.error(f"[{coin}] Error parsing BBO data: {e}. Data: {bid_ask_data}")
# --- Parser 2: Handle Live Trade messages ---
elif channel == "trades":
trade_list = message.get("data")
if not trade_list or not isinstance(trade_list, list) or len(trade_list) == 0:
logging.warning(f"Received 'trades' message with invalid data: {trade_list}")
return
# Process all trades in the batch
for trade in trade_list:
try:
coin = trade.get("coin")
price_str = trade.get("px")
if not coin or not price_str:
logging.warning(f"Trade data missing 'coin' or 'px': {trade}")
continue
price = float(price_str)
# Update the shared dictionary for the "Live Price" column
shared_prices_dict[coin] = price
logging.info(f"Updated {coin} (Live Price) to last trade: {price:.4f}")
except (ValueError, TypeError) as e:
logging.error(f"Error parsing trade data: {e}. Data: {trade}")
except Exception as e:
log_error(f"Error in WebSocket on_message: {e}")
def start_live_feed(shared_prices_dict, log_level='off'):
def start_live_feed(shared_prices_dict, coins_to_watch: list, log_level='off'):
"""
Main function for the WebSocket process. It takes a shared dictionary
and continuously feeds it with live market data.
Includes a watchdog to auto-reconnect on failure.
Main function for the WebSocket process.
Subscribes to BOTH 'bbo' and 'trades' for all watched coins.
"""
setup_logging(log_level, 'LiveMarketFeed')
setup_logging(log_level, 'LiveMarketFeed_Combined')
info = None
callback = lambda msg: on_message(msg, shared_prices_dict)
def connect_and_subscribe():
"""Establishes a new WebSocket connection and subscribes to allMids."""
"""Establishes a new WebSocket connection and subscribes to both streams."""
try:
logging.info("Connecting to Hyperliquid WebSocket...")
# Ensure skip_ws=False to create the ws_manager
new_info = Info(constants.MAINNET_API_URL, skip_ws=False)
subscription = {"type": "allMids"}
new_info.subscribe(subscription, callback)
logging.info("WebSocket connected and subscribed to 'allMids'.")
# --- MODIFIED: Subscribe to 'bbo' AND 'trades' for each coin ---
for coin in coins_to_watch:
# Subscribe to Best Bid/Offer
bbo_sub = {"type": "bbo", "coin": coin}
new_info.subscribe(bbo_sub, callback)
logging.info(f"Subscribed to 'bbo' for {coin}.")
# Subscribe to Live Trades
trades_sub = {"type": "trades", "coin": coin}
new_info.subscribe(trades_sub, callback)
logging.info(f"Subscribed to 'trades' for {coin}.")
logging.info("WebSocket connected and all subscriptions sent.")
return new_info
except Exception as e:
log_error(f"Failed to connect to WebSocket: {e}")
@ -67,24 +144,28 @@ def start_live_feed(shared_prices_dict, log_level='off'):
info = connect_and_subscribe()
logging.info("Starting live price feed process. Press Ctrl+C in main app to stop.")
if info is None:
logging.critical("Initial WebSocket connection failed. Exiting process.")
log_error("Initial WebSocket connection failed. Exiting process.", include_traceback=False)
time.sleep(10) # Wait before letting the process manager restart it
return
logging.info("Starting Combined (BBO + Trades) live price feed process.")
try:
while True:
# --- Watchdog Logic ---
time.sleep(15) # Check the connection every 15 seconds
# --- FIX: Changed 'is_running()' to the correct method 'is_alive()' ---
if info is None or not info.ws_manager.is_alive():
error_msg = "WebSocket connection lost or not running. Attempting to reconnect..."
if not info.ws_manager.is_alive():
error_msg = "WebSocket connection lost. Attempting to reconnect..."
logging.warning(error_msg)
log_error(error_msg, include_traceback=False) # Log it to the file
if info and info.ws_manager: # Check if ws_manager exists before stopping
try:
info.ws_manager.stop() # Clean up old manager
except Exception as e:
log_error(f"Error stopping old ws_manager: {e}")
try:
info.ws_manager.stop() # Clean up old manager
except Exception as e:
log_error(f"Error stopping old ws_manager: {e}")
info = connect_and_subscribe()
@ -102,5 +183,5 @@ def start_live_feed(shared_prices_dict, log_level='off'):
finally:
if info and info.ws_manager:
info.ws_manager.stop()
logging.info("Listener stopped.")
logging.info("Combined Listener stopped.")

View File

@ -23,12 +23,12 @@ from strategies.base_strategy import BaseStrategy
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py"
MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
# --- REMOVED: Market Cap Fetcher ---
# --- REMOVED: trade_executor.py is no longer a script ---
DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
DB_PATH = os.path.join("_data", "market_data.db")
MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
# --- REMOVED: Market Cap File ---
LOGS_DIR = "_logs"
TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")
@ -145,51 +145,9 @@ def resampler_scheduler(timeframes_to_generate: list):
logging.info("ResamplerScheduler shutting down.")
def run_market_cap_fetcher_job():
"""Defines the job for the market cap fetcher, redirecting output."""
log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log")
try:
command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--log-level", "off"]
with open(log_file, 'a') as f:
f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except Exception as e:
with open(log_file, 'a') as f:
f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
f.write(f"Failed to run {MARKET_CAP_FETCHER_SCRIPT} job: {e}\n")
# --- REMOVED: run_market_cap_fetcher_job function ---
def market_cap_fetcher_scheduler():
"""Schedules the market_cap_fetcher.py script to run daily at a specific UTC time."""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
shutdown_requested = False
def handle_shutdown_signal(signum, frame):
nonlocal shutdown_requested
try:
logging.info(f"Shutdown signal ({signum}) received. Exiting loop...")
except NameError:
print(f"[MarketCapScheduler] Shutdown signal ({signum}) received. Exiting loop...")
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_shutdown_signal)
signal.signal(signal.SIGINT, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
setup_logging('off', 'MarketCapScheduler')
schedule.every().day.at("00:15", "UTC").do(run_market_cap_fetcher_job)
while not shutdown_requested: # <-- MODIFIED
schedule.run_pending()
# Sleep for 60 seconds, but check for shutdown flag every second
for _ in range(60):
if shutdown_requested:
break
time.sleep(1)
logging.info("MarketCapScheduler shutting down.")
# --- REMOVED: market_cap_fetcher_scheduler function ---
def run_trade_executor(order_execution_queue: multiprocessing.Queue):
@ -390,7 +348,7 @@ class MainApp:
self.watched_coins = coins_to_watch
self.shared_prices = shared_prices
self.prices = {}
self.market_caps = {}
# --- REMOVED: self.market_caps ---
self.open_positions = {}
self.background_processes = processes
self.process_status = {}
@ -400,23 +358,12 @@ class MainApp:
def read_prices(self):
"""Reads the latest prices directly from the shared memory dictionary."""
try:
self.prices = dict(self.shared_prices)
# --- FIX: Use .copy() for thread-safe iteration ---
self.prices = self.shared_prices.copy()
except Exception as e:
logging.debug("Could not read from shared prices dict: {e}")
logging.debug(f"Could not read from shared prices dict: {e}")
def read_market_caps(self):
"""Reads the latest market cap summary from its JSON file."""
if os.path.exists(MARKET_CAP_SUMMARY_FILE):
try:
with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
for coin in self.watched_coins:
table_key = f"{coin}_market_cap"
if table_key in summary_data:
self.market_caps[coin] = summary_data[table_key].get('market_cap')
except (json.JSONDecodeError, IOError):
logging.debug("Could not read market cap summary file.")
# --- REMOVED: read_market_caps method ---
def read_strategy_statuses(self):
"""Reads the status JSON file for each enabled strategy."""
@ -439,7 +386,9 @@ class MainApp:
if os.path.exists(TRADE_EXECUTOR_STATUS_FILE):
try:
with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f:
self.open_positions = json.load(f)
# --- FIX: Read the 'open_positions' key from the file ---
status_data = json.load(f)
self.open_positions = status_data.get('open_positions', {})
except (IOError, json.JSONDecodeError):
logging.debug("Could not read trade executor status file.")
else:
@ -450,32 +399,59 @@ class MainApp:
for name, process in self.background_processes.items():
self.process_status[name] = "Running" if process.is_alive() else "STOPPED"
def _format_price(self, price_val, width=10):
"""Helper function to format prices for the dashboard."""
try:
price_float = float(price_val)
if price_float < 1:
price_str = f"{price_float:>{width}.6f}"
elif price_float < 100:
price_str = f"{price_float:>{width}.4f}"
else:
price_str = f"{price_float:>{width}.2f}"
except (ValueError, TypeError):
price_str = f"{'Loading...':>{width}}"
return price_str
def display_dashboard(self):
"""Displays a formatted dashboard with side-by-side tables."""
print("\x1b[H\x1b[J", end="") # Clear screen
left_table_lines = ["--- Market Dashboard ---"]
left_table_width = 44
# --- MODIFIED: Adjusted width for new columns ---
left_table_width = 65
left_table_lines.append("-" * left_table_width)
left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |")
# --- MODIFIED: Replaced Market Cap with Gap ---
left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Best Bid':>10} | {'Live Price':>10} | {'Best Ask':>10} | {'Gap':>10} |")
left_table_lines.append("-" * left_table_width)
for i, coin in enumerate(self.watched_coins, 1):
price_str = self.prices.get(coin, "Loading...")
# Format the price string
try:
price_float = float(price_str)
if price_float < 1:
price_str = f"{price_float:>10.6f}"
elif price_float < 100:
price_str = f"{price_float:>10.4f}"
else:
price_str = f"{price_float:>10.2f}"
except (ValueError, TypeError):
price_str = f"{'Loading...':>10}"
# --- MODIFIED: Fetch all three price types ---
mid_price = self.prices.get(coin, "Loading...")
bid_price = self.prices.get(f"{coin}_bid", "Loading...")
ask_price = self.prices.get(f"{coin}_ask", "Loading...")
market_cap = self.market_caps.get(coin)
formatted_mc = format_market_cap(market_cap)
left_table_lines.append(f"{i:<2} | {coin:^6} | {price_str} | {formatted_mc:>15} |")
# --- MODIFIED: Use the new formatting helper ---
formatted_mid = self._format_price(mid_price)
formatted_bid = self._format_price(bid_price)
formatted_ask = self._format_price(ask_price)
# --- MODIFIED: Calculate gap ---
gap_str = f"{'Loading...':>10}"
try:
# Calculate the spread
gap_val = float(ask_price) - float(bid_price)
# Format gap with high precision, similar to price
if gap_val < 1:
gap_str = f"{gap_val:>{10}.6f}"
else:
gap_str = f"{gap_val:>{10}.4f}"
except (ValueError, TypeError):
pass # Keep 'Loading...'
# --- REMOVED: Market Cap logic ---
# --- MODIFIED: Print all price columns including gap ---
left_table_lines.append(f"{i:<2} | {coin:^6} | {formatted_bid} | {formatted_mid} | {formatted_ask} | {gap_str} |")
left_table_lines.append("-" * left_table_width)
right_table_lines = ["--- Strategy Status ---"]
@ -502,10 +478,13 @@ class MainApp:
coin = status.get('coin', config_params.get('coin', 'N/A'))
# --- FIX: Handle nested 'coins_to_copy' logic for size ---
if 'coins_to_copy' in config_params:
size = status.get('size', 'Multi')
else:
size = config_params.get('size', 'N/A')
# --- MODIFIED: Read 'size' from status first, then config, then 'Multi' ---
size = status.get('size')
if not size:
if 'coins_to_copy' in config_params:
size = 'Multi'
else:
size = config_params.get('size', 'N/A')
timeframe = config_params.get('timeframe', 'N/A')
@ -515,10 +494,16 @@ class MainApp:
size_display = f"{size:>8}"
if isinstance(size, (int, float)):
size_display = f"{size:>8.4f}" # Format size to 4 decimal places
# --- MODIFIED: More flexible size formatting ---
if size < 0.0001:
size_display = f"{size:>8.6f}"
elif size < 1:
size_display = f"{size:>8.4f}"
else:
size_display = f"{size:>8.2f}"
# --- END NEW LOGIC ---
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} |")
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size_display} |")
right_table_lines.append("-" * right_table_width)
output_lines = []
@ -536,7 +521,32 @@ class MainApp:
output_lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |")
output_lines.append("-" * pos_table_width)
# --- REMOVED: Background Processes section ---
# --- FIX: Correctly read and display open positions ---
if not self.open_positions:
output_lines.append(f"{'No open positions.':^{pos_table_width}}")
else:
for account, positions in self.open_positions.items():
if not positions:
continue
for coin, pos in positions.items():
try:
size_f = float(pos.get('size', 0))
entry_f = float(pos.get('entry_price', 0))
mark_f = float(self.prices.get(coin, 0))
pnl_f = (mark_f - entry_f) * size_f if size_f > 0 else (entry_f - mark_f) * abs(size_f)
lev = pos.get('leverage', 1)
size_str = f"{size_f:>{15}.5f}"
entry_str = f"{entry_f:>{12}.2f}"
mark_str = f"{mark_f:>{12}.2f}"
pnl_str = f"{pnl_f:>{15}.2f}"
lev_str = f"{lev}x"
output_lines.append(f"{account:<10} | {coin:<6} | {size_str} | {entry_str} | {mark_str} | {pnl_str} | {lev_str:>10} |")
except (ValueError, TypeError):
output_lines.append(f"{account:<10} | {coin:<6} | {'Error parsing data...':^{pos_table_width-20}} |")
output_lines.append("-" * pos_table_width)
final_output = "\n".join(output_lines)
print(final_output)
@ -546,7 +556,7 @@ class MainApp:
"""Main loop to read data, display dashboard, and check processes."""
while True:
self.read_prices()
self.read_market_caps()
# --- REMOVED: self.read_market_caps() ---
self.read_strategy_statuses()
self.read_executor_status()
# --- REMOVED: self.check_process_status() ---
@ -584,10 +594,16 @@ if __name__ == "__main__":
# --- REVERTED: All processes are daemon=True and in one dict ---
processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True)
# --- FIX: Pass WATCHED_COINS to the start_live_feed process ---
# --- MODIFICATION: Set log level back to 'off' ---
processes["Live Market Feed"] = multiprocessing.Process(
target=start_live_feed,
args=(shared_prices, WATCHED_COINS, 'off'),
daemon=True
)
processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)
# --- REMOVED: Market Cap Fetcher Process ---
processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True)
processes["Position Manager"] = multiprocessing.Process(
@ -665,3 +681,5 @@ if __name__ == "__main__":
logging.info("Shutdown complete.")
sys.exit(0)