bid, ask, last traded price

This commit is contained in:
2025-11-04 13:34:49 +01:00
parent dfec8dcf01
commit 8c35bc2fae
2 changed files with 214 additions and 115 deletions

View File

@ -23,12 +23,12 @@ from strategies.base_strategy import BaseStrategy
WATCHED_COINS = ["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"]
LIVE_CANDLE_FETCHER_SCRIPT = "live_candle_fetcher.py"
RESAMPLER_SCRIPT = "resampler.py"
MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py"
# --- REMOVED: Market Cap Fetcher ---
# --- REMOVED: trade_executor.py is no longer a script ---
DASHBOARD_DATA_FETCHER_SCRIPT = "dashboard_data_fetcher.py"
STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json")
DB_PATH = os.path.join("_data", "market_data.db")
MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json")
# --- REMOVED: Market Cap File ---
LOGS_DIR = "_logs"
TRADE_EXECUTOR_STATUS_FILE = os.path.join(LOGS_DIR, "trade_executor_status.json")
@ -145,51 +145,9 @@ def resampler_scheduler(timeframes_to_generate: list):
logging.info("ResamplerScheduler shutting down.")
def run_market_cap_fetcher_job():
"""Defines the job for the market cap fetcher, redirecting output."""
log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log")
try:
command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--log-level", "off"]
with open(log_file, 'a') as f:
f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n")
subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT)
except Exception as e:
with open(log_file, 'a') as f:
f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n")
f.write(f"Failed to run {MARKET_CAP_FETCHER_SCRIPT} job: {e}\n")
# --- REMOVED: run_market_cap_fetcher_job function ---
def market_cap_fetcher_scheduler():
"""Schedules the market_cap_fetcher.py script to run daily at a specific UTC time."""
# --- GRACEFUL SHUTDOWN HANDLER ---
import signal
shutdown_requested = False
def handle_shutdown_signal(signum, frame):
nonlocal shutdown_requested
try:
logging.info(f"Shutdown signal ({signum}) received. Exiting loop...")
except NameError:
print(f"[MarketCapScheduler] Shutdown signal ({signum}) received. Exiting loop...")
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_shutdown_signal)
signal.signal(signal.SIGINT, handle_shutdown_signal)
# --- END GRACEFUL SHUTDOWN HANDLER ---
setup_logging('off', 'MarketCapScheduler')
schedule.every().day.at("00:15", "UTC").do(run_market_cap_fetcher_job)
while not shutdown_requested: # <-- MODIFIED
schedule.run_pending()
# Sleep for 60 seconds, but check for shutdown flag every second
for _ in range(60):
if shutdown_requested:
break
time.sleep(1)
logging.info("MarketCapScheduler shutting down.")
# --- REMOVED: market_cap_fetcher_scheduler function ---
def run_trade_executor(order_execution_queue: multiprocessing.Queue):
@ -390,7 +348,7 @@ class MainApp:
self.watched_coins = coins_to_watch
self.shared_prices = shared_prices
self.prices = {}
self.market_caps = {}
# --- REMOVED: self.market_caps ---
self.open_positions = {}
self.background_processes = processes
self.process_status = {}
@ -400,23 +358,12 @@ class MainApp:
def read_prices(self):
"""Reads the latest prices directly from the shared memory dictionary."""
try:
self.prices = dict(self.shared_prices)
# --- FIX: Use .copy() for thread-safe iteration ---
self.prices = self.shared_prices.copy()
except Exception as e:
logging.debug("Could not read from shared prices dict: {e}")
logging.debug(f"Could not read from shared prices dict: {e}")
def read_market_caps(self):
"""Reads the latest market cap summary from its JSON file."""
if os.path.exists(MARKET_CAP_SUMMARY_FILE):
try:
with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
for coin in self.watched_coins:
table_key = f"{coin}_market_cap"
if table_key in summary_data:
self.market_caps[coin] = summary_data[table_key].get('market_cap')
except (json.JSONDecodeError, IOError):
logging.debug("Could not read market cap summary file.")
# --- REMOVED: read_market_caps method ---
def read_strategy_statuses(self):
"""Reads the status JSON file for each enabled strategy."""
@ -439,7 +386,9 @@ class MainApp:
if os.path.exists(TRADE_EXECUTOR_STATUS_FILE):
try:
with open(TRADE_EXECUTOR_STATUS_FILE, 'r', encoding='utf-8') as f:
self.open_positions = json.load(f)
# --- FIX: Read the 'open_positions' key from the file ---
status_data = json.load(f)
self.open_positions = status_data.get('open_positions', {})
except (IOError, json.JSONDecodeError):
logging.debug("Could not read trade executor status file.")
else:
@ -450,32 +399,59 @@ class MainApp:
for name, process in self.background_processes.items():
self.process_status[name] = "Running" if process.is_alive() else "STOPPED"
def _format_price(self, price_val, width=10):
"""Helper function to format prices for the dashboard."""
try:
price_float = float(price_val)
if price_float < 1:
price_str = f"{price_float:>{width}.6f}"
elif price_float < 100:
price_str = f"{price_float:>{width}.4f}"
else:
price_str = f"{price_float:>{width}.2f}"
except (ValueError, TypeError):
price_str = f"{'Loading...':>{width}}"
return price_str
def display_dashboard(self):
"""Displays a formatted dashboard with side-by-side tables."""
print("\x1b[H\x1b[J", end="") # Clear screen
left_table_lines = ["--- Market Dashboard ---"]
left_table_width = 44
# --- MODIFIED: Adjusted width for new columns ---
left_table_width = 65
left_table_lines.append("-" * left_table_width)
left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Live Price':>10} | {'Market Cap':>15} |")
# --- MODIFIED: Replaced Market Cap with Gap ---
left_table_lines.append(f"{'#':<2} | {'Coin':^6} | {'Best Bid':>10} | {'Live Price':>10} | {'Best Ask':>10} | {'Gap':>10} |")
left_table_lines.append("-" * left_table_width)
for i, coin in enumerate(self.watched_coins, 1):
price_str = self.prices.get(coin, "Loading...")
# Format the price string
try:
price_float = float(price_str)
if price_float < 1:
price_str = f"{price_float:>10.6f}"
elif price_float < 100:
price_str = f"{price_float:>10.4f}"
else:
price_str = f"{price_float:>10.2f}"
except (ValueError, TypeError):
price_str = f"{'Loading...':>10}"
# --- MODIFIED: Fetch all three price types ---
mid_price = self.prices.get(coin, "Loading...")
bid_price = self.prices.get(f"{coin}_bid", "Loading...")
ask_price = self.prices.get(f"{coin}_ask", "Loading...")
market_cap = self.market_caps.get(coin)
formatted_mc = format_market_cap(market_cap)
left_table_lines.append(f"{i:<2} | {coin:^6} | {price_str} | {formatted_mc:>15} |")
# --- MODIFIED: Use the new formatting helper ---
formatted_mid = self._format_price(mid_price)
formatted_bid = self._format_price(bid_price)
formatted_ask = self._format_price(ask_price)
# --- MODIFIED: Calculate gap ---
gap_str = f"{'Loading...':>10}"
try:
# Calculate the spread
gap_val = float(ask_price) - float(bid_price)
# Format gap with high precision, similar to price
if gap_val < 1:
gap_str = f"{gap_val:>{10}.6f}"
else:
gap_str = f"{gap_val:>{10}.4f}"
except (ValueError, TypeError):
pass # Keep 'Loading...'
# --- REMOVED: Market Cap logic ---
# --- MODIFIED: Print all price columns including gap ---
left_table_lines.append(f"{i:<2} | {coin:^6} | {formatted_bid} | {formatted_mid} | {formatted_ask} | {gap_str} |")
left_table_lines.append("-" * left_table_width)
right_table_lines = ["--- Strategy Status ---"]
@ -502,10 +478,13 @@ class MainApp:
coin = status.get('coin', config_params.get('coin', 'N/A'))
# --- FIX: Handle nested 'coins_to_copy' logic for size ---
if 'coins_to_copy' in config_params:
size = status.get('size', 'Multi')
else:
size = config_params.get('size', 'N/A')
# --- MODIFIED: Read 'size' from status first, then config, then 'Multi' ---
size = status.get('size')
if not size:
if 'coins_to_copy' in config_params:
size = 'Multi'
else:
size = config_params.get('size', 'N/A')
timeframe = config_params.get('timeframe', 'N/A')
@ -515,10 +494,16 @@ class MainApp:
size_display = f"{size:>8}"
if isinstance(size, (int, float)):
size_display = f"{size:>8.4f}" # Format size to 4 decimal places
# --- MODIFIED: More flexible size formatting ---
if size < 0.0001:
size_display = f"{size:>8.6f}"
elif size < 1:
size_display = f"{size:>8.4f}"
else:
size_display = f"{size:>8.2f}"
# --- END NEW LOGIC ---
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size:>8} |")
right_table_lines.append(f"{i:^2} | {name:<25} | {coin:^6} | {signal:^8} | {price_display:>12} | {last_change_display:>17} | {timeframe:^5} | {size_display} |")
right_table_lines.append("-" * right_table_width)
output_lines = []
@ -536,7 +521,32 @@ class MainApp:
output_lines.append(f"{'Account':<10} | {'Coin':<6} | {'Size':>15} | {'Entry Price':>12} | {'Mark Price':>12} | {'PNL':>15} | {'Leverage':>10} |")
output_lines.append("-" * pos_table_width)
# --- REMOVED: Background Processes section ---
# --- FIX: Correctly read and display open positions ---
if not self.open_positions:
output_lines.append(f"{'No open positions.':^{pos_table_width}}")
else:
for account, positions in self.open_positions.items():
if not positions:
continue
for coin, pos in positions.items():
try:
size_f = float(pos.get('size', 0))
entry_f = float(pos.get('entry_price', 0))
mark_f = float(self.prices.get(coin, 0))
pnl_f = (mark_f - entry_f) * size_f if size_f > 0 else (entry_f - mark_f) * abs(size_f)
lev = pos.get('leverage', 1)
size_str = f"{size_f:>{15}.5f}"
entry_str = f"{entry_f:>{12}.2f}"
mark_str = f"{mark_f:>{12}.2f}"
pnl_str = f"{pnl_f:>{15}.2f}"
lev_str = f"{lev}x"
output_lines.append(f"{account:<10} | {coin:<6} | {size_str} | {entry_str} | {mark_str} | {pnl_str} | {lev_str:>10} |")
except (ValueError, TypeError):
output_lines.append(f"{account:<10} | {coin:<6} | {'Error parsing data...':^{pos_table_width-20}} |")
output_lines.append("-" * pos_table_width)
final_output = "\n".join(output_lines)
print(final_output)
@ -546,7 +556,7 @@ class MainApp:
"""Main loop to read data, display dashboard, and check processes."""
while True:
self.read_prices()
self.read_market_caps()
# --- REMOVED: self.read_market_caps() ---
self.read_strategy_statuses()
self.read_executor_status()
# --- REMOVED: self.check_process_status() ---
@ -584,10 +594,16 @@ if __name__ == "__main__":
# --- REVERTED: All processes are daemon=True and in one dict ---
processes["Live Market Feed"] = multiprocessing.Process(target=start_live_feed, args=(shared_prices, 'off'), daemon=True)
# --- FIX: Pass WATCHED_COINS to the start_live_feed process ---
# --- MODIFICATION: Set log level back to 'off' ---
processes["Live Market Feed"] = multiprocessing.Process(
target=start_live_feed,
args=(shared_prices, WATCHED_COINS, 'off'),
daemon=True
)
processes["Live Candle Fetcher"] = multiprocessing.Process(target=run_live_candle_fetcher, daemon=True)
processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, args=(list(required_timeframes),), daemon=True)
processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True)
# --- REMOVED: Market Cap Fetcher Process ---
processes["Dashboard Data"] = multiprocessing.Process(target=run_dashboard_data_fetcher, daemon=True)
processes["Position Manager"] = multiprocessing.Process(
@ -665,3 +681,5 @@ if __name__ == "__main__":
logging.info("Shutdown complete.")
sys.exit(0)