diff --git a/_data/market_cap_data.json b/_data/market_cap_data.json new file mode 100644 index 0000000..834f67d --- /dev/null +++ b/_data/market_cap_data.json @@ -0,0 +1,47 @@ +{ + "BTC_market_cap": { + "datetime_utc": "2025-10-14 19:07:32", + "market_cap": 2254100854707.6426 + }, + "ETH_market_cap": { + "datetime_utc": "2025-10-14 19:07:45", + "market_cap": 498260644977.71 + }, + "SOL_market_cap": { + "datetime_utc": "2025-10-14 19:07:54", + "market_cap": 110493585034.85222 + }, + "BNB_market_cap": { + "datetime_utc": "2025-10-14 19:08:01", + "market_cap": 169461959349.39044 + }, + "ZEC_market_cap": { + "datetime_utc": "2025-10-14 19:08:32", + "market_cap": 3915238492.7266335 + }, + "SUI_market_cap": { + "datetime_utc": "2025-10-14 19:08:51", + "market_cap": 10305847774.680008 + }, + "STABLECOINS_market_cap": { + "datetime_utc": "2025-10-14 00:00:00", + "market_cap": 551315140796.8396 + }, + "ASTER_market_cap": { + "datetime_utc": "2025-10-14 20:47:18", + "market_cap": 163953008.77347806 + }, + "HYPE_market_cap": { + "datetime_utc": "2025-10-14 20:55:21", + "market_cap": 10637373991.458858 + }, + "TOTAL_market_cap_daily": { + "datetime_utc": "2025-10-14 00:00:00", + "market_cap": 3942937396387.7046 + }, + "PUMP_market_cap": { + "datetime_utc": "2025-10-14 21:02:30", + "market_cap": 1454398647.593871 + }, + "summary_last_updated_utc": "2025-10-14T21:08:01.788055+00:00" +} \ No newline at end of file diff --git a/_data/market_data.db-shm b/_data/market_data.db-shm index 93c3d75..66c2db3 100644 Binary files a/_data/market_data.db-shm and b/_data/market_data.db-shm differ diff --git a/main_app.py b/main_app.py index 6cd9306..e223545 100644 --- a/main_app.py +++ b/main_app.py @@ -18,11 +18,26 @@ COIN_LISTER_SCRIPT = "list_coins.py" MARKET_FEEDER_SCRIPT = "market.py" DATA_FETCHER_SCRIPT = "data_fetcher.py" RESAMPLER_SCRIPT = "resampler.py" +MARKET_CAP_FETCHER_SCRIPT = "market_cap_fetcher.py" STRATEGY_CONFIG_FILE = os.path.join("_data", "strategies.json") PRICE_DATA_FILE = os.path.join("_data", "current_prices.json") DB_PATH = os.path.join("_data", "market_data.db") STATUS_FILE = os.path.join("_data", "fetcher_status.json") -LOGS_DIR = "_logs" # Directory to store logs from background processes +MARKET_CAP_SUMMARY_FILE = os.path.join("_data", "market_cap_data.json") +LOGS_DIR = "_logs" + + +def format_market_cap(mc_value): + """Formats a large number into a human-readable market cap string.""" + if not isinstance(mc_value, (int, float)) or mc_value == 0: + return "N/A" + if mc_value >= 1_000_000_000_000: + return f"${mc_value / 1_000_000_000_000:.2f}T" + if mc_value >= 1_000_000_000: + return f"${mc_value / 1_000_000_000:.2f}B" + if mc_value >= 1_000_000: + return f"${mc_value / 1_000_000:.2f}M" + return f"${mc_value:,.2f}" def run_market_feeder(): @@ -32,24 +47,28 @@ def run_market_feeder(): try: with open(log_file, 'a') as f: subprocess.run( - [sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "normal"], + [sys.executable, MARKET_FEEDER_SCRIPT, "--log-level", "off"], check=True, stdout=f, stderr=subprocess.STDOUT ) except (subprocess.CalledProcessError, Exception) as e: - logging.error(f"Market feeder script failed: {e}. Restarting...") - time.sleep(5) + with open(log_file, 'a') as f: + f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") + f.write(f"Market feeder script failed: {e}. Restarting...\n") + time.sleep(5) def run_data_fetcher_job(): """Defines the job for the data fetcher, redirecting output to a log file.""" log_file = os.path.join(LOGS_DIR, "data_fetcher.log") - logging.info(f"Scheduler starting data_fetcher.py task...") try: - command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "normal"] + command = [sys.executable, DATA_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--days", "7", "--log-level", "off"] with open(log_file, 'a') as f: + f.write(f"\n--- Starting data_fetcher.py job at {datetime.now()} ---\n") subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) except Exception as e: - logging.error(f"Failed to run data_fetcher.py job: {e}") + with open(log_file, 'a') as f: + f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n") + f.write(f"Failed to run data_fetcher.py job: {e}\n") def data_fetcher_scheduler(): @@ -65,13 +84,15 @@ def data_fetcher_scheduler(): def run_resampler_job(): """Defines the job for the resampler, redirecting output to a log file.""" log_file = os.path.join(LOGS_DIR, "resampler.log") - logging.info(f"Scheduler starting resampler.py task...") try: - command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "normal"] + command = [sys.executable, RESAMPLER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] with open(log_file, 'a') as f: + f.write(f"\n--- Starting resampler.py job at {datetime.now()} ---\n") subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) except Exception as e: - logging.error(f"Failed to run resampler.py job: {e}") + with open(log_file, 'a') as f: + f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n") + f.write(f"Failed to run resampler.py job: {e}\n") def resampler_scheduler(): @@ -84,26 +105,52 @@ def resampler_scheduler(): time.sleep(1) +def run_market_cap_fetcher_job(): + """Defines the job for the market cap fetcher, redirecting output.""" + log_file = os.path.join(LOGS_DIR, "market_cap_fetcher.log") + try: + command = [sys.executable, MARKET_CAP_FETCHER_SCRIPT, "--coins"] + WATCHED_COINS + ["--log-level", "off"] + with open(log_file, 'a') as f: + f.write(f"\n--- Starting {MARKET_CAP_FETCHER_SCRIPT} job at {datetime.now()} ---\n") + subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) + except Exception as e: + with open(log_file, 'a') as f: + f.write(f"\n--- SCHEDULER ERROR at {datetime.now()} ---\n") + f.write(f"Failed to run {MARKET_CAP_FETCHER_SCRIPT} job: {e}\n") + + +def market_cap_fetcher_scheduler(): + """Schedules the market_cap_fetcher.py script to run daily at a specific UTC time.""" + setup_logging('off', 'MarketCapScheduler') + schedule.every().day.at("00:15", "UTC").do(run_market_cap_fetcher_job) + while True: + schedule.run_pending() + time.sleep(60) + + def run_strategy(strategy_name: str, config: dict): """Target function to run a strategy, redirecting its output to a log file.""" log_file = os.path.join(LOGS_DIR, f"strategy_{strategy_name}.log") script_name = config['script'] params_str = json.dumps(config['parameters']) - command = [sys.executable, script_name, "--name", strategy_name, "--params", params_str, "--log-level", "normal"] + command = [sys.executable, script_name, "--name", strategy_name, "--params", params_str, "--log-level", "off"] while True: try: with open(log_file, 'a') as f: f.write(f"\n--- Starting strategy '{strategy_name}' at {datetime.now()} ---\n") subprocess.run(command, check=True, stdout=f, stderr=subprocess.STDOUT) except (subprocess.CalledProcessError, Exception) as e: - logging.error(f"Strategy '{strategy_name}' failed: {e}. Restarting...") - time.sleep(10) + with open(log_file, 'a') as f: + f.write(f"\n--- PROCESS ERROR at {datetime.now()} ---\n") + f.write(f"Strategy '{strategy_name}' failed: {e}. Restarting...\n") + time.sleep(10) class MainApp: def __init__(self, coins_to_watch: list, processes: dict): self.watched_coins = coins_to_watch self.prices = {} + self.market_caps = {} self.last_db_update_info = "Initializing..." self._lines_printed = 0 self.background_processes = processes @@ -118,6 +165,21 @@ class MainApp: except (json.JSONDecodeError, IOError): logging.debug("Could not read price file.") + def read_market_caps(self): + """Reads the latest market cap summary from its JSON file.""" + if os.path.exists(MARKET_CAP_SUMMARY_FILE): + try: + with open(MARKET_CAP_SUMMARY_FILE, 'r', encoding='utf-8') as f: + summary_data = json.load(f) + + # Extract just the market cap value for each coin + for coin in self.watched_coins: + table_key = f"{coin}_market_cap" + if table_key in summary_data: + self.market_caps[coin] = summary_data[table_key].get('market_cap') + except (json.JSONDecodeError, IOError): + logging.debug("Could not read market cap summary file.") + def get_overall_db_status(self): """Reads the fetcher status from the status file.""" if os.path.exists(STATUS_FILE): @@ -131,15 +193,14 @@ class MainApp: dt_utc = datetime.fromisoformat(timestamp_utc_str.replace('Z', '+00:00')).replace(tzinfo=timezone.utc) dt_local = dt_utc.astimezone(None) - # --- FIX: Manually format the UTC offset --- offset = dt_local.utcoffset() offset_hours = int(offset.total_seconds() / 3600) sign = '+' if offset_hours >= 0 else '' - offset_str = f"(UTC{sign}{offset_hours})" + offset_str = f"UTC{sign}{offset_hours}" timestamp_display = f"{dt_local.strftime('%Y-%m-%d %H:%M:%S')} {offset_str}" else: timestamp_display = "N/A" - self.last_db_update_info = f"{coin} at {timestamp_display} ({num_candles} candles)" + self.last_db_update_info = f"{coin} at {timestamp_display} | {num_candles} candles" except (IOError, json.JSONDecodeError): self.last_db_update_info = "Error reading status file." @@ -149,20 +210,28 @@ class MainApp: self.process_status[name] = "Running" if process.is_alive() else "STOPPED" def display_dashboard(self): - """Displays a formatted table without blinking.""" - if self._lines_printed > 0: print(f"\x1b[{self._lines_printed}A", end="") + """Displays a formatted table without blinking by overwriting previous lines.""" + if self._lines_printed > 0: + print(f"\x1b[{self._lines_printed}A", end="") + output_lines = ["--- Market Dashboard ---"] - table_width = 26 + table_width = 44 output_lines.append("-" * table_width) - output_lines.append(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} |") + output_lines.append(f"{'#':<2} | {'Coin':<6} | {'Live Price':>10} | {'Market Cap':>15} |") output_lines.append("-" * table_width) for i, coin in enumerate(self.watched_coins, 1): price = self.prices.get(coin, "Loading...") - output_lines.append(f"{i:<2} | {coin:<6} | {price:>10} |") + market_cap = self.market_caps.get(coin) + formatted_mc = format_market_cap(market_cap) + output_lines.append(f"{i:<2} | {coin:<6} | {price:>10} | {formatted_mc:>15} |") output_lines.append("-" * table_width) - output_lines.append("DB Status:") - output_lines.append(f" Last update -> {self.last_db_update_info}") + status_prefix = "DB Status: Last update -> " + max_len = 80 + status_message = f"{status_prefix}{self.last_db_update_info}" + if len(status_message) > max_len: + status_message = status_message[:max_len-3] + "..." + output_lines.append(status_message) output_lines.append("--- Background Processes ---") for name, status in self.process_status.items(): @@ -170,6 +239,7 @@ class MainApp: final_output = "\n".join(output_lines) + "\n\x1b[J" print(final_output, end="") + self._lines_printed = len(output_lines) sys.stdout.flush() @@ -177,6 +247,7 @@ class MainApp: """Main loop to read data, display dashboard, and check processes.""" while True: self.read_prices() + self.read_market_caps() self.get_overall_db_status() self.check_process_status() self.display_dashboard() @@ -186,7 +257,6 @@ class MainApp: if __name__ == "__main__": setup_logging('normal', 'MainApp') - # Create logs directory if it doesn't exist if not os.path.exists(LOGS_DIR): os.makedirs(LOGS_DIR) @@ -199,12 +269,11 @@ if __name__ == "__main__": processes = {} - # Start Data Pipeline Processes processes["Market Feeder"] = multiprocessing.Process(target=run_market_feeder, daemon=True) processes["Data Fetcher"] = multiprocessing.Process(target=data_fetcher_scheduler, daemon=True) processes["Resampler"] = multiprocessing.Process(target=resampler_scheduler, daemon=True) + processes["Market Cap Fetcher"] = multiprocessing.Process(target=market_cap_fetcher_scheduler, daemon=True) - # Start Strategy Processes based on config try: with open(STRATEGY_CONFIG_FILE, 'r') as f: strategy_configs = json.load(f) @@ -215,7 +284,6 @@ if __name__ == "__main__": except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Could not load strategies from '{STRATEGY_CONFIG_FILE}': {e}") - # Launch all processes for name, proc in processes.items(): logging.info(f"Starting process '{name}'...") proc.start() diff --git a/market_cap_fetcher.py b/market_cap_fetcher.py new file mode 100644 index 0000000..d8e9c9a --- /dev/null +++ b/market_cap_fetcher.py @@ -0,0 +1,283 @@ +import argparse +import logging +import os +import sys +import sqlite3 +import pandas as pd +import requests +import time +from datetime import datetime, timezone, timedelta +import json + +# Assuming logging_utils.py is in the same directory +from logging_utils import setup_logging + +class MarketCapFetcher: + """ + Fetches historical daily market cap data from the CoinGecko API and + intelligently updates the SQLite database. It processes individual coins, + aggregates stablecoins, and captures total market cap metrics. + """ + + COIN_ID_MAP = { + "BTC": "bitcoin", + "ETH": "ethereum", + "SOL": "solana", + "BNB": "binancecoin", + "HYPE": "hyperliquid", + "ASTER": "astar", + "ZEC": "zcash", + "PUMP": "pump-fun", # Correct ID is 'pump-fun' + "SUI": "sui" + } + + STABLECOIN_ID_MAP = { + "USDT": "tether", + "USDC": "usd-coin", + "USDE": "ethena-usde", + "DAI": "dai", + "PYUSD": "paypal-usd" + } + + def __init__(self, log_level: str, coins: list): + setup_logging(log_level, 'MarketCapFetcher') + self.coins_to_fetch = coins + self.db_path = os.path.join("_data", "market_data.db") + self.api_base_url = "https://api.coingecko.com/api/v3" + #self.api_key = os.environ.get("COINGECKO_API_KEY") + self.api_key = "CG-SvVswjGvdHajUrLFq37CCKJX" + if not self.api_key: + logging.error("CoinGecko API key not found. Please set the COINGECKO_API_KEY environment variable.") + sys.exit(1) + + def run(self): + """ + Main execution function to process all configured coins and update the database. + """ + logging.info("Starting historical market cap fetch process from CoinGecko...") + with sqlite3.connect(self.db_path) as conn: + conn.execute("PRAGMA journal_mode=WAL;") + + # 1. Process individual coins + for coin_symbol in self.coins_to_fetch: + coin_id = self.COIN_ID_MAP.get(coin_symbol.upper()) + if not coin_id: + logging.warning(f"No CoinGecko ID found for '{coin_symbol}'. Skipping.") + continue + logging.info(f"--- Processing {coin_symbol} ({coin_id}) ---") + try: + self._update_market_cap_for_coin(coin_id, coin_symbol, conn) + except Exception as e: + logging.error(f"An unexpected error occurred while processing {coin_symbol}: {e}") + time.sleep(2) + + # 2. Process and aggregate stablecoins + self._update_stablecoin_aggregate(conn) + + # 3. Process total market cap metrics + self._update_total_market_cap(conn) + + # 4. Save a summary of the latest data + self._save_summary(conn) + + logging.info("--- Market cap fetch process complete ---") + + def _save_summary(self, conn): + """ + Queries the last record from each market cap table and saves a summary to a JSON file. + """ + logging.info("--- Generating Market Cap Summary ---") + summary_data = {} + summary_file_path = os.path.join("_data", "market_cap_data.json") + + try: + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%_market_cap' OR name LIKE 'TOTAL_%');") + tables = [row[0] for row in cursor.fetchall()] + + for table_name in tables: + try: + df_last = pd.read_sql(f'SELECT * FROM "{table_name}" ORDER BY datetime_utc DESC LIMIT 1', conn) + if not df_last.empty: + summary_data[table_name] = df_last.to_dict('records')[0] + except Exception as e: + logging.error(f"Could not read last record from table '{table_name}': {e}") + + if summary_data: + summary_data['summary_last_updated_utc'] = datetime.now(timezone.utc).isoformat() + + with open(summary_file_path, 'w', encoding='utf-8') as f: + json.dump(summary_data, f, indent=4) + logging.info(f"Successfully saved market cap summary to '{summary_file_path}'") + else: + logging.warning("No data found to create a summary.") + + except Exception as e: + logging.error(f"Failed to generate summary: {e}") + + def _update_total_market_cap(self, conn): + """ + Fetches the current total market cap and saves it for the current date. + """ + logging.info("--- Processing Total Market Cap ---") + table_name = "TOTAL_market_cap_daily" + + try: + # --- FIX: Use the current date instead of yesterday's --- + today_date = datetime.now(timezone.utc).date() + + cursor = conn.cursor() + cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';") + table_exists = cursor.fetchone() + + if table_exists: + # Check if we already have a record for today + cursor.execute(f"SELECT 1 FROM \"{table_name}\" WHERE date(datetime_utc) = ? LIMIT 1", (today_date.isoformat(),)) + if cursor.fetchone(): + logging.info(f"Total market cap for {today_date} already exists. Skipping.") + return + + logging.info("Fetching current global market data...") + url = f"{self.api_base_url}/global" + headers = {"x-cg-demo-api-key": self.api_key} + response = requests.get(url, headers=headers) + response.raise_for_status() + global_data = response.json().get('data', {}) + total_mc = global_data.get('total_market_cap', {}).get('usd') + + if total_mc: + df_total = pd.DataFrame([{ + 'datetime_utc': pd.to_datetime(today_date), + 'market_cap': total_mc + }]) + df_total.to_sql(table_name, conn, if_exists='append', index=False) + logging.info(f"Saved total market cap for {today_date}: ${total_mc:,.2f}") + + except requests.exceptions.RequestException as e: + logging.error(f"Failed to fetch global market data: {e}") + except Exception as e: + logging.error(f"An error occurred while updating total market cap: {e}") + + + def _update_stablecoin_aggregate(self, conn): + """Fetches data for all stablecoins and saves the aggregated market cap.""" + logging.info("--- Processing aggregated stablecoin market cap ---") + all_stablecoin_df = pd.DataFrame() + + for symbol, coin_id in self.STABLECOIN_ID_MAP.items(): + logging.info(f"Fetching historical data for stablecoin: {symbol}...") + df = self._fetch_historical_data(coin_id, days=365) + if not df.empty: + df['coin'] = symbol + all_stablecoin_df = pd.concat([all_stablecoin_df, df]) + time.sleep(2) + + if all_stablecoin_df.empty: + logging.warning("No data fetched for any stablecoins. Cannot create aggregate.") + return + + aggregated_df = all_stablecoin_df.groupby(all_stablecoin_df['datetime_utc'].dt.date)['market_cap'].sum().reset_index() + aggregated_df['datetime_utc'] = pd.to_datetime(aggregated_df['datetime_utc']) + + table_name = "STABLECOINS_market_cap" + last_date_in_db = self._get_last_date_from_db(table_name, conn) + + if last_date_in_db: + aggregated_df = aggregated_df[aggregated_df['datetime_utc'] > last_date_in_db] + + if not aggregated_df.empty: + aggregated_df.to_sql(table_name, conn, if_exists='append', index=False) + logging.info(f"Successfully saved {len(aggregated_df)} daily records to '{table_name}'.") + else: + logging.info("Aggregated stablecoin data is already up-to-date.") + + + def _update_market_cap_for_coin(self, coin_id: str, coin_symbol: str, conn): + """Fetches and appends new market cap data for a single coin.""" + table_name = f"{coin_symbol}_market_cap" + + last_date_in_db = self._get_last_date_from_db(table_name, conn) + + days_to_fetch = 365 + if last_date_in_db: + delta_days = (datetime.now() - last_date_in_db).days + if delta_days <= 0: + logging.info(f"Market cap data for '{coin_symbol}' is already up-to-date.") + return + days_to_fetch = min(delta_days + 1, 365) + else: + logging.info(f"No existing data found. Fetching initial {days_to_fetch} days for {coin_symbol}.") + + df = self._fetch_historical_data(coin_id, days=days_to_fetch) + + if df.empty: + logging.warning(f"No market cap data returned from API for {coin_symbol}.") + return + + if last_date_in_db: + df = df[df['datetime_utc'] > last_date_in_db] + + if not df.empty: + df.to_sql(table_name, conn, if_exists='append', index=False) + logging.info(f"Successfully saved {len(df)} new daily market cap records for {coin_symbol}.") + else: + logging.info(f"Data was fetched, but no new records needed saving for '{coin_symbol}'.") + + def _get_last_date_from_db(self, table_name: str, conn) -> pd.Timestamp: + """Gets the most recent date from a market cap table as a pandas Timestamp.""" + try: + cursor = conn.cursor() + cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';") + if not cursor.fetchone(): + return None + + last_date_str = pd.read_sql(f'SELECT MAX(datetime_utc) FROM "{table_name}"', conn).iloc[0, 0] + return pd.to_datetime(last_date_str) if last_date_str else None + except Exception as e: + logging.error(f"Could not read last date from table '{table_name}': {e}") + return None + + def _fetch_historical_data(self, coin_id: str, days: int) -> pd.DataFrame: + """Fetches historical market chart data from CoinGecko for a specified number of days.""" + url = f"{self.api_base_url}/coins/{coin_id}/market_chart" + params = { "vs_currency": "usd", "days": days, "interval": "daily" } + headers = {"x-cg-demo-api-key": self.api_key} + + try: + logging.debug(f"Fetching last {days} days for {coin_id}...") + response = requests.get(url, headers=headers) + response.raise_for_status() + data = response.json() + + market_caps = data.get('market_caps', []) + if not market_caps: return pd.DataFrame() + + df = pd.DataFrame(market_caps, columns=['timestamp_ms', 'market_cap']) + df['datetime_utc'] = pd.to_datetime(df['timestamp_ms'], unit='ms') + df.drop_duplicates(subset=['datetime_utc'], keep='last', inplace=True) + return df[['datetime_utc', 'market_cap']] + + except requests.exceptions.RequestException as e: + logging.error(f"API request failed for {coin_id}: {e}.") + return pd.DataFrame() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Fetch historical market cap data from CoinGecko.") + parser.add_argument( + "--coins", + nargs='+', + default=["BTC", "ETH", "SOL", "BNB", "HYPE", "ASTER", "ZEC", "PUMP", "SUI"], + help="List of coin symbols to fetch (e.g., BTC ETH)." + ) + parser.add_argument( + "--log-level", + default="normal", + choices=['off', 'normal', 'debug'], + help="Set the logging level for the script." + ) + args = parser.parse_args() + + fetcher = MarketCapFetcher(log_level=args.log_level, coins=args.coins) + fetcher.run() +