market monitor with timestamp works

This commit is contained in:
2025-08-02 22:28:44 +02:00
parent 7a8ce7e416
commit 64e94ef61a
2 changed files with 181 additions and 0 deletions

53
app.py Normal file
View File

@ -0,0 +1,53 @@
import time
from datetime import datetime
import threading
from market import market_info_loop, get_latest_price, get_all_latest_prices, get_latest_price_with_timestamp, set_debug
def start_market_data_thread(debug=False):
"""Start market data collection in a separate thread"""
market_thread = threading.Thread(target=market_info_loop, daemon=True)
market_thread.start()
return market_thread
def main():
print("Starting app with market data collection...")
print("-" * 40)
# Start market data collection in background thread (debug=False by default)
start_market_data_thread(debug=True)
# Give some time for initial market data to be collected
print("Waiting for initial market data...")
time.sleep(10)
try:
while True:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] update!")
# Show some sample prices with timestamps
eth_data = get_latest_price_with_timestamp("ETHUSD")
btc_data = get_latest_price_with_timestamp("BTCUSD")
shib_data = get_latest_price_with_timestamp("SHIBUSD")
if eth_data:
print(f"ETH/USD: ${eth_data['price']:.2f} (updated: {eth_data['timestamp_str']})")
if btc_data:
print(f"BTC/USD: ${btc_data['price']:.2f} (updated: {btc_data['timestamp_str']})")
if shib_data:
print(f"SHIB/USD: ${shib_data['price']:.8f} (updated: {shib_data['timestamp_str']})")
# Show total number of markets with prices
all_prices = get_all_latest_prices()
print(f"Total markets tracked: {len(all_prices)}")
# Sleep for 60 seconds (1 minute)
time.sleep(60)
except KeyboardInterrupt:
print("\nApp stopped by user")
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()

128
market.py
View File

@ -0,0 +1,128 @@
import time
import os
import asyncio
from datetime import datetime
from flextrade.flextrade_client import Client
from flextrade.constants.markets import (
BASE_MARKET_ETH_USD, BASE_MARKET_BTC_USD, BASE_MARKET_BNB_USD,
BASE_MARKET_SHIB_USD, BASE_MARKET_PEPE_USD, BASE_MARKET_SUI_USD,
BASE_MARKET_DOGE_USD, BASE_MARKET_AAVE_USD, BASE_MARKET_HBAR_USD,
BASE_MARKET_VIRTUAL_USD, BASE_MARKET_ADA_USD, BASE_MARKET_PENDLE_USD,
BASE_MARKET_TRX_USD, BASE_MARKET_AVAX_USD, BASE_MARKET_UNI_USD,
BASE_MARKET_SOL_USD, BASE_MARKET_LINK_USD, BASE_MARKET_XRP_USD,
BASE_MARKET_TON_USD
)
from dotenv import load_dotenv
load_dotenv()
RPC_URL = os.getenv("RPC_URL")
PRIVATE_KEY = os.getenv("PRIVATE_KEY")
# List of all markets to test
MARKETS = [
BASE_MARKET_ETH_USD,
BASE_MARKET_BTC_USD,
BASE_MARKET_BNB_USD,
BASE_MARKET_SHIB_USD,
BASE_MARKET_PEPE_USD,
BASE_MARKET_SUI_USD,
BASE_MARKET_DOGE_USD,
BASE_MARKET_AAVE_USD,
BASE_MARKET_HBAR_USD,
BASE_MARKET_VIRTUAL_USD,
BASE_MARKET_ADA_USD,
BASE_MARKET_PENDLE_USD,
BASE_MARKET_TRX_USD,
BASE_MARKET_AVAX_USD,
BASE_MARKET_UNI_USD,
BASE_MARKET_SOL_USD,
BASE_MARKET_LINK_USD,
BASE_MARKET_XRP_USD,
BASE_MARKET_TON_USD
]
# Global variable to store latest prices with timestamps
latest_prices = {}
# Debug flag to control printing
DEBUG = False
def print_market_info(market_info):
if DEBUG:
timestamp = datetime.now().strftime("%H:%M:%S")
print('[{0}] {1} : {2:.4f}'.format(timestamp, market_info["market"], market_info["price"]))
def market_info():
"""Fetch market information once and update global prices"""
client = Client(
eth_private_key=PRIVATE_KEY,
rpc_url=RPC_URL
)
if DEBUG:
print("Fetching market data...\n")
for market in MARKETS:
try:
market_info_data = client.public.get_market_info(market)
print_market_info(market_info_data)
# Store the latest price and timestamp in global dictionary
market_name = market_info_data["market"]
current_timestamp = datetime.now()
latest_prices[market_name] = {
'price': market_info_data["price"],
'timestamp': current_timestamp,
'timestamp_str': current_timestamp.strftime("%Y-%m-%d %H:%M:%S")
}
time.sleep(2) # Wait 2 seconds between requests
except Exception as e:
if DEBUG:
print(f"Error fetching market {market}: {e}")
print('-' * 50)
def market_info_loop():
"""Continuously fetch market information in a loop"""
if DEBUG:
print("Starting market data loop - press Ctrl+C to stop")
print("-" * 50)
try:
while True:
market_info()
except KeyboardInterrupt:
if DEBUG:
print("\nMarket data loop stopped by user")
except Exception as e:
if DEBUG:
print(f"An error occurred: {e}")
def set_debug(enabled):
"""Enable or disable debug printing"""
global DEBUG
DEBUG = enabled
def get_latest_price(market_name):
"""Get the latest price for a specific market"""
market_data = latest_prices.get(market_name, None)
return market_data['price'] if market_data else None
def get_latest_price_with_timestamp(market_name):
"""Get the latest price with timestamp for a specific market"""
return latest_prices.get(market_name, None)
def get_all_latest_prices():
"""Get all latest prices with timestamps"""
return latest_prices.copy()
def get_all_latest_prices_only():
"""Get all latest prices without timestamps (backward compatibility)"""
return {market: data['price'] for market, data in latest_prices.items()}
if __name__ == '__main__':
# Enable debug when running directly
set_debug(True)
market_info_loop()