""" Simplified FastAPI server - working version Removes the complex WebSocket manager that was causing issues """ import os import asyncio import logging from dotenv import load_dotenv load_dotenv() from datetime import datetime, timedelta, timezone from typing import Optional, List from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Query, BackgroundTasks, Response from fastapi.staticfiles import StaticFiles from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware import asyncpg import csv import io from pydantic import BaseModel, Field # Imports for backtest runner from src.data_collector.database import DatabaseManager from src.data_collector.indicator_engine import IndicatorEngine, IndicatorConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Database connection settings DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = int(os.getenv('DB_PORT', 5432)) DB_NAME = os.getenv('DB_NAME', 'btc_data') DB_USER = os.getenv('DB_USER', 'btc_bot') DB_PASSWORD = os.getenv('DB_PASSWORD', '') async def get_db_pool(): """Create database connection pool""" logger.info(f"Connecting to database: {DB_HOST}:{DB_PORT}/{DB_NAME} as {DB_USER}") return await asyncpg.create_pool( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD, min_size=2, max_size=20, max_inactive_connection_lifetime=300 ) pool = None @asynccontextmanager async def lifespan(app: FastAPI): """Manage application lifespan""" global pool pool = await get_db_pool() logger.info("API Server started successfully") yield if pool: await pool.close() logger.info("API Server stopped") app = FastAPI( title="BTC Bot Data API", description="REST API for accessing BTC candle data", version="1.1.0", lifespan=lifespan ) # Enable CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): """Root endpoint""" return { "message": "BTC Bot Data API", "docs": "/docs", "dashboard": "/dashboard", "status": "operational" } @app.get("/api/v1/candles") async def get_candles( symbol: str = Query("BTC", description="Trading pair symbol"), interval: str = Query("1m", description="Candle interval"), start: Optional[datetime] = Query(None, description="Start time (ISO format)"), end: Optional[datetime] = Query(None, description="End time (ISO format)"), limit: int = Query(1000, ge=1, le=10000, description="Maximum number of candles") ): """Get candle data for a symbol""" async with pool.acquire() as conn: query = """ SELECT time, symbol, interval, open, high, low, close, volume, validated FROM candles WHERE symbol = $1 AND interval = $2 """ params = [symbol, interval] if start: query += f" AND time >= ${len(params) + 1}" params.append(start) if end: query += f" AND time <= ${len(params) + 1}" params.append(end) query += f" ORDER BY time DESC LIMIT ${len(params) + 1}" params.append(limit) rows = await conn.fetch(query, *params) return { "symbol": symbol, "interval": interval, "count": len(rows), "candles": [dict(row) for row in rows] } from typing import Optional, List # ... @app.get("/api/v1/candles/bulk") async def get_candles_bulk( symbol: str = Query("BTC"), timeframes: List[str] = Query(["1h"]), start: datetime = Query(...), end: Optional[datetime] = Query(None), ): """Get multiple timeframes of candles in a single request for client-side processing""" logger.info(f"Bulk candle request: {symbol}, TFs: {timeframes}, Start: {start}, End: {end}") if not end: end = datetime.now(timezone.utc) results = {} async with pool.acquire() as conn: for tf in timeframes: rows = await conn.fetch(""" SELECT time, open, high, low, close, volume FROM candles WHERE symbol = $1 AND interval = $2 AND time >= $3 AND time <= $4 ORDER BY time ASC """, symbol, tf, start, end) results[tf] = [ { "time": r['time'].isoformat(), "open": float(r['open']), "high": float(r['high']), "low": float(r['low']), "close": float(r['close']), "volume": float(r['volume']) } for r in rows ] logger.info(f"Returning {sum(len(v) for v in results.values())} candles total") return results @app.get("/api/v1/candles/latest") async def get_latest_candle(symbol: str = "BTC", interval: str = "1m"): """Get the most recent candle""" async with pool.acquire() as conn: row = await conn.fetchrow(""" SELECT time, symbol, interval, open, high, low, close, volume FROM candles WHERE symbol = $1 AND interval = $2 ORDER BY time DESC LIMIT 1 """, symbol, interval) if not row: raise HTTPException(status_code=404, detail="No data found") return dict(row) @app.get("/api/v1/stats") async def get_stats(symbol: str = "BTC"): """Get trading statistics""" async with pool.acquire() as conn: # Get latest price and 24h stats latest = await conn.fetchrow(""" SELECT close, time FROM candles WHERE symbol = $1 AND interval = '1m' ORDER BY time DESC LIMIT 1 """, symbol) day_ago = await conn.fetchrow(""" SELECT close FROM candles WHERE symbol = $1 AND interval = '1m' AND time <= NOW() - INTERVAL '24 hours' ORDER BY time DESC LIMIT 1 """, symbol) stats_24h = await conn.fetchrow(""" SELECT MAX(high) as high_24h, MIN(low) as low_24h, SUM(volume) as volume_24h FROM candles WHERE symbol = $1 AND interval = '1m' AND time > NOW() - INTERVAL '24 hours' """, symbol) if not latest: raise HTTPException(status_code=404, detail="No data found") current_price = float(latest['close']) previous_price = float(day_ago['close']) if day_ago else current_price change_24h = ((current_price - previous_price) / previous_price * 100) if previous_price else 0 return { "symbol": symbol, "current_price": current_price, "change_24h": round(change_24h, 2), "high_24h": float(stats_24h['high_24h']) if stats_24h['high_24h'] else current_price, "low_24h": float(stats_24h['low_24h']) if stats_24h['low_24h'] else current_price, "volume_24h": float(stats_24h['volume_24h']) if stats_24h['volume_24h'] else 0, "last_update": latest['time'].isoformat() } @app.get("/api/v1/health") async def health_check(): """System health check""" try: async with pool.acquire() as conn: latest = await conn.fetchrow(""" SELECT symbol, MAX(time) as last_time, COUNT(*) as count FROM candles WHERE time > NOW() - INTERVAL '24 hours' GROUP BY symbol """) return { "status": "healthy", "database": "connected", "latest_candles": dict(latest) if latest else None, "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Health check failed: {e}") raise HTTPException(status_code=503, detail=f"Health check failed: {str(e)}") @app.get("/api/v1/indicators") async def get_indicators( symbol: str = Query("BTC", description="Trading pair symbol"), interval: str = Query("1d", description="Candle interval"), name: str = Query(None, description="Filter by indicator name (e.g., ma44)"), start: Optional[datetime] = Query(None, description="Start time"), end: Optional[datetime] = Query(None, description="End time"), limit: int = Query(1000, le=5000) ): """Get indicator values""" async with pool.acquire() as conn: query = """ SELECT time, indicator_name, value FROM indicators WHERE symbol = $1 AND interval = $2 """ params = [symbol, interval] if name: query += f" AND indicator_name = ${len(params) + 1}" params.append(name) if start: query += f" AND time >= ${len(params) + 1}" params.append(start) if end: query += f" AND time <= ${len(params) + 1}" params.append(end) query += f" ORDER BY time DESC LIMIT ${len(params) + 1}" params.append(limit) rows = await conn.fetch(query, *params) # Group by time for easier charting grouped = {} for row in rows: ts = row['time'].isoformat() if ts not in grouped: grouped[ts] = {'time': ts} grouped[ts][row['indicator_name']] = float(row['value']) return { "symbol": symbol, "interval": interval, "data": list(grouped.values()) } @app.get("/api/v1/decisions") async def get_decisions( symbol: str = Query("BTC"), interval: Optional[str] = Query(None), backtest_id: Optional[str] = Query(None), limit: int = Query(100, le=1000) ): """Get brain decisions""" async with pool.acquire() as conn: query = """ SELECT time, interval, decision_type, strategy, confidence, price_at_decision, indicator_snapshot, reasoning, backtest_id FROM decisions WHERE symbol = $1 """ params = [symbol] if interval: query += f" AND interval = ${len(params) + 1}" params.append(interval) if backtest_id: query += f" AND backtest_id = ${len(params) + 1}" params.append(backtest_id) else: query += " AND backtest_id IS NULL" query += f" ORDER BY time DESC LIMIT ${len(params) + 1}" params.append(limit) rows = await conn.fetch(query, *params) return [dict(row) for row in rows] @app.get("/api/v1/backtests") async def list_backtests(symbol: Optional[str] = None, limit: int = 20): """List historical backtests""" async with pool.acquire() as conn: query = """ SELECT id, strategy, symbol, start_time, end_time, intervals, results, created_at FROM backtest_runs """ params = [] if symbol: query += " WHERE symbol = $1" params.append(symbol) query += f" ORDER BY created_at DESC LIMIT ${len(params) + 1}" params.append(limit) rows = await conn.fetch(query, *params) return [dict(row) for row in rows] @app.get("/api/v1/ta") async def get_technical_analysis( symbol: str = Query("BTC", description="Trading pair symbol"), interval: str = Query("1d", description="Candle interval") ): """ Get technical analysis for a symbol Uses stored indicators from DB if available, falls back to on-the-fly calc """ try: async with pool.acquire() as conn: # 1. Get latest price latest = await conn.fetchrow(""" SELECT close, time FROM candles WHERE symbol = $1 AND interval = $2 ORDER BY time DESC LIMIT 1 """, symbol, interval) if not latest: return {"error": "No candle data found"} current_price = float(latest['close']) timestamp = latest['time'] # 2. Get latest indicators from DB indicators = await conn.fetch(""" SELECT indicator_name, value FROM indicators WHERE symbol = $1 AND interval = $2 AND time <= $3 ORDER BY time DESC """, symbol, interval, timestamp) # Convert list to dict, e.g. {'ma44': 65000, 'ma125': 64000} # We take the most recent value for each indicator ind_map = {} for row in indicators: name = row['indicator_name'] if name not in ind_map: ind_map[name] = float(row['value']) ma_44 = ind_map.get('ma44') ma_125 = ind_map.get('ma125') # Determine trend if ma_44 and ma_125: if current_price > ma_44 > ma_125: trend = "Bullish" trend_strength = "Strong" if current_price > ma_44 * 1.05 else "Moderate" elif current_price < ma_44 < ma_125: trend = "Bearish" trend_strength = "Strong" if current_price < ma_44 * 0.95 else "Moderate" else: trend = "Neutral" trend_strength = "Consolidation" else: trend = "Unknown" trend_strength = "Insufficient data" # 3. Find support/resistance (simple recent high/low) rows = await conn.fetch(""" SELECT high, low FROM candles WHERE symbol = $1 AND interval = $2 ORDER BY time DESC LIMIT 20 """, symbol, interval) if rows: highs = [float(r['high']) for r in rows] lows = [float(r['low']) for r in rows] resistance = max(highs) support = min(lows) price_range = resistance - support if price_range > 0: position = (current_price - support) / price_range * 100 else: position = 50 else: resistance = current_price support = current_price position = 50 return { "symbol": symbol, "interval": interval, "timestamp": timestamp.isoformat(), "current_price": round(current_price, 2), "moving_averages": { "ma_44": round(ma_44, 2) if ma_44 else None, "ma_125": round(ma_125, 2) if ma_125 else None, "price_vs_ma44": round((current_price / ma_44 - 1) * 100, 2) if ma_44 else None, "price_vs_ma125": round((current_price / ma_125 - 1) * 100, 2) if ma_125 else None }, "trend": { "direction": trend, "strength": trend_strength, "signal": "Buy" if trend == "Bullish" and trend_strength == "Strong" else "Sell" if trend == "Bearish" and trend_strength == "Strong" else "Hold" }, "levels": { "resistance": round(resistance, 2), "support": round(support, 2), "position_in_range": round(position, 1) }, "ai_placeholder": { "available": False, "message": "AI analysis available via Gemini or local LLM", "action": "Click to analyze with AI" } } except Exception as e: logger.error(f"Technical analysis error: {e}") raise HTTPException(status_code=500, detail=f"Technical analysis failed: {str(e)}") @app.get("/api/v1/export/csv") async def export_csv( symbol: str = "BTC", interval: str = "1m", days: int = Query(7, ge=1, le=365, description="Number of days to export") ): """Export candle data to CSV""" start_date = datetime.utcnow() - timedelta(days=days) async with pool.acquire() as conn: query = """ SELECT time, open, high, low, close, volume FROM candles WHERE symbol = $1 AND interval = $2 AND time >= $3 ORDER BY time """ rows = await conn.fetch(query, symbol, interval, start_date) if not rows: raise HTTPException(status_code=404, detail="No data found for export") output = io.StringIO() writer = csv.writer(output) writer.writerow(['timestamp', 'open', 'high', 'low', 'close', 'volume']) for row in rows: writer.writerow([ row['time'].isoformat(), row['open'], row['high'], row['low'], row['close'], row['volume'] ]) output.seek(0) return StreamingResponse( io.BytesIO(output.getvalue().encode()), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename={symbol}_{interval}_{days}d.csv" } ) # Serve static files for dashboard app.mount("/dashboard", StaticFiles(directory="src/api/dashboard/static", html=True), name="dashboard") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)