Initial commit: BTC Bot with dashboard, TA analysis, and 14 timeframes

This commit is contained in:
BTC Bot
2026-02-11 22:27:51 +01:00
commit 933537d759
32 changed files with 4689 additions and 0 deletions

View File

@ -0,0 +1,842 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>BTC Trading Dashboard</title>
<script src="https://unpkg.com/lightweight-charts@4.1.0/dist/lightweight-charts.standalone.production.js"></script>
<style>
:root {
--tv-bg: #131722;
--tv-panel-bg: #1e222d;
--tv-border: #2a2e39;
--tv-text: #d1d4dc;
--tv-text-secondary: #787b86;
--tv-green: #26a69a;
--tv-red: #ef5350;
--tv-blue: #2962ff;
--tv-hover: #2a2e39;
}
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: var(--tv-bg);
color: var(--tv-text);
height: 100vh;
display: flex;
flex-direction: column;
overflow: hidden;
}
.toolbar {
background: var(--tv-panel-bg);
border-bottom: 1px solid var(--tv-border);
padding: 8px 16px;
display: flex;
align-items: center;
gap: 16px;
height: 56px;
}
.toolbar-left {
display: flex;
align-items: center;
gap: 12px;
flex: 1;
overflow: hidden;
}
.symbol-badge {
background: var(--tv-bg);
padding: 6px 12px;
border-radius: 4px;
font-weight: 600;
font-size: 14px;
border: 1px solid var(--tv-border);
white-space: nowrap;
}
.timeframe-scroll {
display: flex;
gap: 2px;
overflow-x: auto;
scrollbar-width: thin;
scrollbar-color: var(--tv-border) transparent;
flex: 1;
}
.timeframe-scroll::-webkit-scrollbar {
height: 4px;
}
.timeframe-scroll::-webkit-scrollbar-track {
background: transparent;
}
.timeframe-scroll::-webkit-scrollbar-thumb {
background: var(--tv-border);
border-radius: 2px;
}
.timeframe-btn {
background: transparent;
border: none;
color: var(--tv-text-secondary);
padding: 6px 10px;
font-size: 12px;
cursor: pointer;
border-radius: 4px;
transition: all 0.2s;
white-space: nowrap;
min-width: 36px;
}
.timeframe-btn:hover {
background: var(--tv-hover);
color: var(--tv-text);
}
.timeframe-btn.active {
background: var(--tv-blue);
color: white;
}
.connection-status {
display: flex;
align-items: center;
gap: 8px;
margin-left: auto;
white-space: nowrap;
}
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: var(--tv-green);
}
.status-text {
font-size: 12px;
color: var(--tv-text-secondary);
}
.stats-panel {
background: var(--tv-panel-bg);
border-bottom: 1px solid var(--tv-border);
padding: 8px 16px;
display: flex;
gap: 32px;
height: 44px;
align-items: center;
}
.stat-item {
display: flex;
flex-direction: column;
}
.stat-label {
font-size: 10px;
color: var(--tv-text-secondary);
text-transform: uppercase;
letter-spacing: 0.5px;
}
.stat-value {
font-size: 14px;
font-weight: 600;
}
.stat-value.positive { color: var(--tv-green); }
.stat-value.negative { color: var(--tv-red); }
.main-container {
flex: 1;
display: flex;
flex-direction: column;
overflow: hidden;
}
.chart-wrapper {
flex: 2;
position: relative;
background: var(--tv-bg);
min-height: 0;
}
#chart {
width: 100%;
height: 100%;
}
.ta-panel {
flex: 1;
background: var(--tv-panel-bg);
border-top: 1px solid var(--tv-border);
display: flex;
flex-direction: column;
min-height: 200px;
max-height: 400px;
}
.ta-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 12px 16px;
border-bottom: 1px solid var(--tv-border);
background: var(--tv-bg);
}
.ta-title {
font-size: 14px;
font-weight: 600;
display: flex;
align-items: center;
gap: 8px;
}
.ta-interval {
background: var(--tv-blue);
color: white;
padding: 2px 8px;
border-radius: 4px;
font-size: 11px;
}
.ta-actions {
display: flex;
gap: 8px;
}
.ta-btn {
background: var(--tv-bg);
border: 1px solid var(--tv-border);
color: var(--tv-text);
padding: 6px 12px;
border-radius: 4px;
cursor: pointer;
font-size: 12px;
transition: all 0.2s;
}
.ta-btn:hover {
background: var(--tv-hover);
border-color: var(--tv-blue);
}
.ta-btn.ai-btn {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border: none;
color: white;
}
.ta-btn.ai-btn:hover {
opacity: 0.9;
}
.ta-last-update {
font-size: 11px;
color: var(--tv-text-secondary);
}
.ta-content {
flex: 1;
padding: 16px;
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 16px;
overflow-y: auto;
}
.ta-section {
background: var(--tv-bg);
border: 1px solid var(--tv-border);
border-radius: 8px;
padding: 12px;
}
.ta-section-title {
font-size: 11px;
color: var(--tv-text-secondary);
text-transform: uppercase;
margin-bottom: 8px;
letter-spacing: 0.5px;
}
.ta-trend {
display: flex;
align-items: center;
gap: 8px;
font-size: 18px;
font-weight: 600;
}
.ta-trend.bullish { color: var(--tv-green); }
.ta-trend.bearish { color: var(--tv-red); }
.ta-trend.neutral { color: var(--tv-text-secondary); }
.ta-strength {
font-size: 12px;
color: var(--tv-text-secondary);
margin-top: 4px;
}
.ta-signal {
display: inline-block;
padding: 4px 12px;
border-radius: 4px;
font-size: 12px;
font-weight: 600;
margin-top: 8px;
}
.ta-signal.buy {
background: rgba(38, 166, 154, 0.2);
color: var(--tv-green);
}
.ta-signal.sell {
background: rgba(239, 83, 80, 0.2);
color: var(--tv-red);
}
.ta-signal.hold {
background: rgba(120, 123, 134, 0.2);
color: var(--tv-text-secondary);
}
.ta-ma-row {
display: flex;
justify-content: space-between;
padding: 6px 0;
border-bottom: 1px solid var(--tv-border);
font-size: 13px;
}
.ta-ma-row:last-child {
border-bottom: none;
}
.ta-ma-label {
color: var(--tv-text-secondary);
}
.ta-ma-value {
font-weight: 600;
}
.ta-ma-change {
font-size: 11px;
margin-left: 4px;
}
.ta-ma-change.positive { color: var(--tv-green); }
.ta-ma-change.negative { color: var(--tv-red); }
.ta-level {
display: flex;
justify-content: space-between;
padding: 8px 0;
font-size: 14px;
}
.ta-level-label {
color: var(--tv-text-secondary);
}
.ta-level-value {
font-weight: 600;
font-family: 'Courier New', monospace;
}
.ta-position-bar {
width: 100%;
height: 6px;
background: var(--tv-border);
border-radius: 3px;
margin-top: 8px;
position: relative;
}
.ta-position-marker {
position: absolute;
width: 12px;
height: 12px;
background: var(--tv-blue);
border-radius: 50%;
top: 50%;
transform: translate(-50%, -50%);
border: 2px solid var(--tv-bg);
}
.ta-loading {
display: flex;
align-items: center;
justify-content: center;
height: 100%;
color: var(--tv-text-secondary);
font-size: 14px;
}
.ta-error {
display: flex;
align-items: center;
justify-content: center;
height: 100%;
color: var(--tv-red);
font-size: 14px;
}
@media (max-width: 1200px) {
.ta-content {
grid-template-columns: repeat(2, 1fr);
}
}
@media (max-width: 768px) {
.ta-content {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<div class="toolbar">
<div class="toolbar-left">
<span class="symbol-badge">BTC/USD</span>
<div class="timeframe-scroll" id="timeframeContainer">
<!-- Timeframes will be inserted here by JS -->
</div>
</div>
<div class="connection-status">
<div class="status-dot" id="statusDot"></div>
<span class="status-text" id="statusText">Live</span>
</div>
</div>
<div class="stats-panel">
<div class="stat-item">
<span class="stat-label">Price</span>
<span class="stat-value" id="currentPrice">--</span>
</div>
<div class="stat-item">
<span class="stat-label">Change</span>
<span class="stat-value" id="priceChange">--</span>
</div>
<div class="stat-item">
<span class="stat-label">High</span>
<span class="stat-value" id="dailyHigh">--</span>
</div>
<div class="stat-item">
<span class="stat-label">Low</span>
<span class="stat-value" id="dailyLow">--</span>
</div>
</div>
<div class="main-container">
<div class="chart-wrapper">
<div id="chart"></div>
</div>
<div class="ta-panel" id="taPanel">
<div class="ta-header">
<div class="ta-title">
Technical Analysis
<span class="ta-interval" id="taInterval">1D</span>
</div>
<div class="ta-actions">
<span class="ta-last-update" id="taLastUpdate">--</span>
<button class="ta-btn ai-btn" id="aiBtn" onclick="openAIAnalysis()">
🤖 AI Analysis
</button>
<button class="ta-btn" onclick="refreshTA()">
🔄 Refresh
</button>
</div>
</div>
<div class="ta-content" id="taContent">
<div class="ta-loading">Loading technical analysis...</div>
</div>
</div>
</div>
<script>
// Drives the BTC candlestick dashboard: owns the lightweight-charts instance,
// timeframe switching, the 15-second refresh poll, lazy scroll-back history
// loading, and the technical-analysis (TA) panel.
class TradingDashboard {
    constructor() {
        this.chart = null;
        this.candleSeries = null;
        this.currentInterval = '1d';
        // Must stay in sync with the intervals the backend exposes.
        this.intervals = ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '8h', '12h', '1d', '3d', '1w', '1M'];
        this.allData = new Map();   // interval -> merged, time-sorted candle cache
        this.isLoading = false;     // prevents overlapping fetches
        this.hasInitialLoad = false;
        this.taData = null;
        this.init();
    }
    init() {
        this.createTimeframeButtons();
        this.initChart();
        this.initEventListeners();
        this.loadInitialData();
        this.loadTA();
        // Poll the API for fresh candles every 15 seconds.
        setInterval(() => this.loadNewData(), 15000);
    }
    // Builds one toolbar button per supported interval.
    createTimeframeButtons() {
        const container = document.getElementById('timeframeContainer');
        this.intervals.forEach(interval => {
            const btn = document.createElement('button');
            btn.className = 'timeframe-btn';
            btn.dataset.interval = interval;
            btn.textContent = interval;
            if (interval === this.currentInterval) {
                btn.classList.add('active');
            }
            btn.addEventListener('click', () => this.switchTimeframe(interval));
            container.appendChild(btn);
        });
    }
    // Creates the lightweight-charts candlestick chart with a dark theme.
    initChart() {
        const chartContainer = document.getElementById('chart');
        this.chart = LightweightCharts.createChart(chartContainer, {
            layout: {
                background: { color: '#131722' },
                textColor: '#d1d4dc',
            },
            grid: {
                vertLines: { color: '#2a2e39' },
                horzLines: { color: '#2a2e39' },
            },
            crosshair: {
                mode: LightweightCharts.CrosshairMode.Normal,
            },
            rightPriceScale: {
                borderColor: '#2a2e39',
            },
            timeScale: {
                borderColor: '#2a2e39',
                timeVisible: true,
            },
        });
        this.candleSeries = this.chart.addCandlestickSeries({
            upColor: '#26a69a',
            downColor: '#ef5350',
            borderUpColor: '#26a69a',
            borderDownColor: '#ef5350',
            wickUpColor: '#26a69a',
            wickDownColor: '#ef5350',
        });
        // Lazy-load older history when the user scrolls near the left edge.
        this.chart.timeScale().subscribeVisibleLogicalRangeChange(this.onVisibleRangeChange.bind(this));
        window.addEventListener('resize', () => {
            this.chart.applyOptions({
                width: chartContainer.clientWidth,
                height: chartContainer.clientHeight,
            });
        });
    }
    // Keyboard shortcuts for switching timeframes (ignored while typing in
    // inputs or when a button has focus).
    initEventListeners() {
        document.addEventListener('keydown', (e) => {
            if (e.target.tagName === 'INPUT' || e.target.tagName === 'BUTTON') return;
            const shortcuts = {
                '1': '1m', '2': '3m', '3': '5m', '4': '15m', '5': '30m',
                '6': '1h', '7': '2h', '8': '4h', '9': '8h', '0': '12h',
                'd': '1d', 'D': '1d', 'w': '1w', 'W': '1w', 'm': '1M', 'M': '1M'
            };
            if (shortcuts[e.key]) {
                this.switchTimeframe(shortcuts[e.key]);
            }
        });
    }
    async loadInitialData() {
        await this.loadData(500, true);
        this.hasInitialLoad = true;
    }
    // Fetches candles for the current interval, merges them into the cache and
    // redraws. fitToContent=true resets the zoom (first load / interval switch).
    async loadData(limit = 500, fitToContent = false) {
        if (this.isLoading) return;
        this.isLoading = true;
        try {
            const visibleRange = this.chart.timeScale().getVisibleLogicalRange();
            const response = await fetch(`/api/v1/candles?symbol=BTC&interval=${this.currentInterval}&limit=${limit}`);
            const data = await response.json();
            if (data.candles && data.candles.length > 0) {
                // API returns newest-first; the chart wants chronological order.
                const chartData = data.candles.reverse().map(c => ({
                    time: Math.floor(new Date(c.time).getTime() / 1000),
                    open: parseFloat(c.open),
                    high: parseFloat(c.high),
                    low: parseFloat(c.low),
                    close: parseFloat(c.close)
                }));
                const existingData = this.allData.get(this.currentInterval) || [];
                const mergedData = this.mergeData(existingData, chartData);
                this.allData.set(this.currentInterval, mergedData);
                this.candleSeries.setData(mergedData);
                if (fitToContent) {
                    this.chart.timeScale().fitContent();
                } else if (visibleRange) {
                    // Preserve the user's scroll position across refreshes.
                    this.chart.timeScale().setVisibleLogicalRange(visibleRange);
                }
                const latest = mergedData[mergedData.length - 1];
                this.updateStats(latest);
            }
        } catch (error) {
            console.error('Error loading data:', error);
        } finally {
            this.isLoading = false;
        }
    }
    // Periodic poll: pulls the 100 most recent candles and merges them in.
    async loadNewData() {
        if (!this.hasInitialLoad) return;
        try {
            const response = await fetch(`/api/v1/candles?symbol=BTC&interval=${this.currentInterval}&limit=100`);
            const data = await response.json();
            if (data.candles && data.candles.length > 0) {
                const chartData = data.candles.reverse().map(c => ({
                    time: Math.floor(new Date(c.time).getTime() / 1000),
                    open: parseFloat(c.open),
                    high: parseFloat(c.high),
                    low: parseFloat(c.low),
                    close: parseFloat(c.close)
                }));
                const existingData = this.allData.get(this.currentInterval) || [];
                const mergedData = this.mergeData(existingData, chartData);
                this.allData.set(this.currentInterval, mergedData);
                this.candleSeries.setData(mergedData);
                const latest = mergedData[mergedData.length - 1];
                this.updateStats(latest);
            }
        } catch (error) {
            console.error('Error loading new data:', error);
        }
    }
    // De-duplicates by candle time (new data wins) and returns a sorted array.
    mergeData(existing, newData) {
        const dataMap = new Map();
        existing.forEach(c => dataMap.set(c.time, c));
        newData.forEach(c => dataMap.set(c.time, c));
        return Array.from(dataMap.values()).sort((a, b) => a.time - b.time);
    }
    onVisibleRangeChange() {
        if (!this.hasInitialLoad || this.isLoading) return;
        const visibleRange = this.chart.timeScale().getVisibleLogicalRange();
        if (!visibleRange) return;
        const data = this.candleSeries.data();
        if (!data || data.length === 0) return;
        // Within 10 bars of the left edge: fetch history older than the
        // oldest candle currently loaded.
        if (visibleRange.from < 10) {
            const oldestCandle = data[0];
            if (oldestCandle) {
                this.loadHistoricalData(oldestCandle.time);
            }
        }
    }
    // Approximate seconds per candle for an interval string such as '15m',
    // '4h', '1d', '1w' or '1M' (month approximated as 30 days).
    intervalSeconds(interval) {
        const unitSeconds = { m: 60, h: 3600, d: 86400, w: 604800, M: 2592000 };
        const unit = interval.slice(-1);
        const count = parseInt(interval, 10) || 1;
        return count * (unitSeconds[unit] || 60);
    }
    async loadHistoricalData(beforeTime) {
        if (this.isLoading) return;
        this.isLoading = true;
        try {
            const endTime = new Date(beforeTime * 1000);
            // Bug fix: the look-back window was a fixed 24 hours, which fetched
            // at most one candle on daily/weekly/monthly timeframes. Size the
            // window to ~500 candles of the current interval instead, matching
            // the API's limit=500 below.
            const lookbackMs = this.intervalSeconds(this.currentInterval) * 500 * 1000;
            const startTime = new Date(endTime.getTime() - lookbackMs);
            const response = await fetch(
                `/api/v1/candles?symbol=BTC&interval=${this.currentInterval}&start=${startTime.toISOString()}&end=${endTime.toISOString()}&limit=500`
            );
            const data = await response.json();
            if (data.candles && data.candles.length > 0) {
                const chartData = data.candles.reverse().map(c => ({
                    time: Math.floor(new Date(c.time).getTime() / 1000),
                    open: parseFloat(c.open),
                    high: parseFloat(c.high),
                    low: parseFloat(c.low),
                    close: parseFloat(c.close)
                }));
                const existingData = this.allData.get(this.currentInterval) || [];
                const mergedData = this.mergeData(existingData, chartData);
                this.allData.set(this.currentInterval, mergedData);
                this.candleSeries.setData(mergedData);
            }
        } catch (error) {
            console.error('Error loading historical data:', error);
        } finally {
            this.isLoading = false;
        }
    }
    // Fetches the TA summary for the current interval and renders it.
    async loadTA() {
        try {
            const response = await fetch(`/api/v1/ta?symbol=BTC&interval=${this.currentInterval}`);
            this.taData = await response.json();
            this.renderTA();
        } catch (error) {
            console.error('Error loading TA:', error);
            document.getElementById('taContent').innerHTML = '<div class="ta-error">Failed to load technical analysis</div>';
        }
    }
    // Renders the four TA panel sections from the last /api/v1/ta response.
    renderTA() {
        if (!this.taData || this.taData.error) {
            document.getElementById('taContent').innerHTML = `<div class="ta-error">${this.taData?.error || 'No data available'}</div>`;
            return;
        }
        const data = this.taData;
        const trendClass = data.trend.direction.toLowerCase();
        const signalClass = data.trend.signal.toLowerCase();
        const ma44Change = data.moving_averages.price_vs_ma44;
        const ma125Change = data.moving_averages.price_vs_ma125;
        document.getElementById('taInterval').textContent = this.currentInterval.toUpperCase();
        document.getElementById('taLastUpdate').textContent = new Date().toLocaleTimeString();
        document.getElementById('taContent').innerHTML = `
            <div class="ta-section">
                <div class="ta-section-title">Trend Analysis</div>
                <div class="ta-trend ${trendClass}">
                    ${data.trend.direction} ${trendClass === 'bullish' ? '↑' : trendClass === 'bearish' ? '↓' : '→'}
                </div>
                <div class="ta-strength">${data.trend.strength}</div>
                <span class="ta-signal ${signalClass}">${data.trend.signal}</span>
            </div>
            <div class="ta-section">
                <div class="ta-section-title">Moving Averages</div>
                <div class="ta-ma-row">
                    <span class="ta-ma-label">MA 44</span>
                    <span class="ta-ma-value">
                        ${data.moving_averages.ma_44 ? data.moving_averages.ma_44.toFixed(2) : 'N/A'}
                        ${ma44Change !== null ? `<span class="ta-ma-change ${ma44Change >= 0 ? 'positive' : 'negative'}">${ma44Change >= 0 ? '+' : ''}${ma44Change.toFixed(1)}%</span>` : ''}
                    </span>
                </div>
                <div class="ta-ma-row">
                    <span class="ta-ma-label">MA 125</span>
                    <span class="ta-ma-value">
                        ${data.moving_averages.ma_125 ? data.moving_averages.ma_125.toFixed(2) : 'N/A'}
                        ${ma125Change !== null ? `<span class="ta-ma-change ${ma125Change >= 0 ? 'positive' : 'negative'}">${ma125Change >= 0 ? '+' : ''}${ma125Change.toFixed(1)}%</span>` : ''}
                    </span>
                </div>
            </div>
            <div class="ta-section">
                <div class="ta-section-title">Key Levels</div>
                <div class="ta-level">
                    <span class="ta-level-label">Resistance</span>
                    <span class="ta-level-value">$${data.levels.resistance.toLocaleString()}</span>
                </div>
                <div class="ta-level">
                    <span class="ta-level-label">Support</span>
                    <span class="ta-level-value">$${data.levels.support.toLocaleString()}</span>
                </div>
                <div class="ta-position-bar">
                    <div class="ta-position-marker" style="left: ${data.levels.position_in_range}%"></div>
                </div>
                <div style="font-size: 11px; color: var(--tv-text-secondary); margin-top: 4px; text-align: center;">
                    Position in range: ${data.levels.position_in_range.toFixed(1)}%
                </div>
            </div>
            <div class="ta-section">
                <div class="ta-section-title">Price Info</div>
                <div class="ta-level">
                    <span class="ta-level-label">Current</span>
                    <span class="ta-level-value">$${data.current_price.toLocaleString()}</span>
                </div>
                <div style="font-size: 12px; color: var(--tv-text-secondary); margin-top: 8px;">
                    Based on last 200 candles<br>
                    Strategy: Trend following with MA crossovers
                </div>
            </div>
        `;
    }
    // Updates the top stats bar from the most recent candle.
    updateStats(candle) {
        const price = candle.close;
        // Change is measured within the latest candle (close vs its open).
        const change = ((price - candle.open) / candle.open * 100);
        document.getElementById('currentPrice').textContent = price.toFixed(2);
        document.getElementById('currentPrice').className = 'stat-value ' + (change >= 0 ? 'positive' : 'negative');
        document.getElementById('priceChange').textContent = (change >= 0 ? '+' : '') + change.toFixed(2) + '%';
        document.getElementById('priceChange').className = 'stat-value ' + (change >= 0 ? 'positive' : 'negative');
        document.getElementById('dailyHigh').textContent = candle.high.toFixed(2);
        document.getElementById('dailyLow').textContent = candle.low.toFixed(2);
    }
    switchTimeframe(interval) {
        if (!this.intervals.includes(interval) || interval === this.currentInterval) return;
        this.currentInterval = interval;
        this.hasInitialLoad = false;
        document.querySelectorAll('.timeframe-btn').forEach(btn => {
            btn.classList.toggle('active', btn.dataset.interval === interval);
        });
        // Drop the cache for the target interval so the switch fetches fresh data.
        this.allData.delete(interval);
        this.loadInitialData();
        this.loadTA();
    }
}
// Manual refresh hook for the TA panel's 🔄 button; no-op before the
// dashboard has booted.
function refreshTA() {
    const dash = window.dashboard;
    if (!dash) return;
    dash.loadTA();
}
// Opens Gemini in a new tab, pre-seeded with an analysis prompt for the
// currently selected timeframe (falls back to '1d' before boot).
function openAIAnalysis() {
    const interval = window.dashboard?.currentInterval || '1d';
    const symbol = 'BTC';
    const prompt = `Analyze Bitcoin (${symbol}) ${interval} chart. Current trend, support/resistance levels, and trading recommendation. Technical indicators: MA44, MA125.`;
    window.open(`https://gemini.google.com/app?prompt=${encodeURIComponent(prompt)}`, '_blank');
}
// Boot the dashboard once the DOM is ready; stored on window so the inline
// onclick handlers (refreshTA / openAIAnalysis) can reach the instance.
document.addEventListener('DOMContentLoaded', () => {
    window.dashboard = new TradingDashboard();
});
</script>
</body>
</html>

376
src/api/server.py Normal file
View File

@ -0,0 +1,376 @@
"""
Simplified FastAPI server - working version
Removes the complex WebSocket manager that was causing issues
"""
import os
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncpg
import csv
import io
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database connection settings, overridable via environment variables.
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5432))
DB_NAME = os.getenv('DB_NAME', 'btc_data')
DB_USER = os.getenv('DB_USER', 'btc_bot')
# NOTE(review): empty default password — presumably supplied via env in
# deployment; confirm.
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
async def get_db_pool():
    """Open and return an asyncpg connection pool built from the DB_* settings."""
    settings = dict(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        min_size=1,
        max_size=10,
    )
    return await asyncpg.create_pool(**settings)
# Global connection pool; created on startup by the lifespan handler below.
pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan: open the DB pool on startup, close it on shutdown."""
    global pool
    pool = await get_db_pool()
    logger.info("API Server started successfully")
    yield
    if pool:
        await pool.close()
    logger.info("API Server stopped")

app = FastAPI(
    title="BTC Bot Data API",
    description="REST API for accessing BTC candle data",
    version="1.1.0",
    lifespan=lifespan
)

# Enable CORS
# NOTE(review): wildcard origins combined with allow_credentials=True —
# browsers refuse credentialed requests against "*"; confirm whether
# credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Landing endpoint: service identity plus pointers to docs and dashboard."""
    info = {
        "message": "BTC Bot Data API",
        "docs": "/docs",
        "dashboard": "/dashboard",
        "status": "operational",
    }
    return info
@app.get("/api/v1/candles")
async def get_candles(
    symbol: str = Query("BTC", description="Trading pair symbol"),
    interval: str = Query("1m", description="Candle interval"),
    start: Optional[datetime] = Query(None, description="Start time (ISO format)"),
    end: Optional[datetime] = Query(None, description="End time (ISO format)"),
    limit: int = Query(1000, ge=1, le=10000, description="Maximum number of candles")
):
    """Get candle data for a symbol.

    Returns up to ``limit`` candles, newest first; ``start`` and ``end``
    narrow the time range when provided.
    """
    async with pool.acquire() as conn:
        # Base query; optional filters are appended using positional
        # placeholders ($n) so all values stay parameterized — the f-strings
        # below only interpolate the placeholder index, never user input.
        query = """
            SELECT time, symbol, interval, open, high, low, close, volume, validated
            FROM candles
            WHERE symbol = $1 AND interval = $2
        """
        params = [symbol, interval]
        if start:
            # Next placeholder index = current parameter count + 1.
            query += f" AND time >= ${len(params) + 1}"
            params.append(start)
        if end:
            query += f" AND time <= ${len(params) + 1}"
            params.append(end)
        query += f" ORDER BY time DESC LIMIT ${len(params) + 1}"
        params.append(limit)
        rows = await conn.fetch(query, *params)
        return {
            "symbol": symbol,
            "interval": interval,
            "count": len(rows),
            "candles": [dict(row) for row in rows]
        }
@app.get("/api/v1/candles/latest")
async def get_latest_candle(symbol: str = "BTC", interval: str = "1m"):
    """Get the most recent candle for ``symbol``/``interval``.

    Raises:
        HTTPException: 404 when no candle exists for the pair.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow("""
            SELECT time, symbol, interval, open, high, low, close, volume
            FROM candles
            WHERE symbol = $1 AND interval = $2
            ORDER BY time DESC
            LIMIT 1
        """, symbol, interval)
        if not row:
            raise HTTPException(status_code=404, detail="No data found")
        return dict(row)
@app.get("/api/v1/stats")
async def get_stats(symbol: str = "BTC"):
    """Get trading statistics: current price plus 24h change, high, low and volume.

    All figures are derived from the 1m candle series.

    Raises:
        HTTPException: 404 when no 1m candles exist for the symbol.
    """
    async with pool.acquire() as conn:
        # Get latest price and 24h stats
        latest = await conn.fetchrow("""
            SELECT close, time
            FROM candles
            WHERE symbol = $1 AND interval = '1m'
            ORDER BY time DESC
            LIMIT 1
        """, symbol)
        # Reference close from ~24 hours ago, used for the percentage change.
        day_ago = await conn.fetchrow("""
            SELECT close
            FROM candles
            WHERE symbol = $1 AND interval = '1m' AND time <= NOW() - INTERVAL '24 hours'
            ORDER BY time DESC
            LIMIT 1
        """, symbol)
        stats_24h = await conn.fetchrow("""
            SELECT
                MAX(high) as high_24h,
                MIN(low) as low_24h,
                SUM(volume) as volume_24h
            FROM candles
            WHERE symbol = $1 AND interval = '1m' AND time > NOW() - INTERVAL '24 hours'
        """, symbol)
        if not latest:
            raise HTTPException(status_code=404, detail="No data found")
        current_price = float(latest['close'])
        # Fall back to the current price (0% change) when no candle is old enough.
        previous_price = float(day_ago['close']) if day_ago else current_price
        change_24h = ((current_price - previous_price) / previous_price * 100) if previous_price else 0
        return {
            "symbol": symbol,
            "current_price": current_price,
            "change_24h": round(change_24h, 2),
            "high_24h": float(stats_24h['high_24h']) if stats_24h['high_24h'] else current_price,
            "low_24h": float(stats_24h['low_24h']) if stats_24h['low_24h'] else current_price,
            "volume_24h": float(stats_24h['volume_24h']) if stats_24h['volume_24h'] else 0,
            "last_update": latest['time'].isoformat()
        }
@app.get("/api/v1/health")
async def health_check():
    """System health check: verifies DB connectivity and reports recent candle activity.

    Raises:
        HTTPException: 503 when the database query fails.
    """
    try:
        async with pool.acquire() as conn:
            # NOTE(review): fetchrow returns only the first row of the
            # GROUP BY result — with multiple symbols only one is reported;
            # confirm whether a per-symbol breakdown (fetch) is wanted.
            latest = await conn.fetchrow("""
                SELECT symbol, MAX(time) as last_time, COUNT(*) as count
                FROM candles
                WHERE time > NOW() - INTERVAL '24 hours'
                GROUP BY symbol
            """)
            return {
                "status": "healthy",
                "database": "connected",
                "latest_candles": dict(latest) if latest else None,
                "timestamp": datetime.utcnow().isoformat()
            }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail=f"Health check failed: {str(e)}")
@app.get("/api/v1/ta")
async def get_technical_analysis(
    symbol: str = Query("BTC", description="Trading pair symbol"),
    interval: str = Query("1d", description="Candle interval")
):
    """
    Get technical analysis for a symbol

    Calculates MA 44, MA 125, trend, support/resistance over the last
    200 candles of the requested interval.

    Raises:
        HTTPException: 500 when the analysis fails unexpectedly.
    """
    try:
        async with pool.acquire() as conn:
            # Get enough candles for MA 125 calculation
            rows = await conn.fetch("""
                SELECT time, open, high, low, close, volume
                FROM candles
                WHERE symbol = $1 AND interval = $2
                ORDER BY time DESC
                LIMIT 200
            """, symbol, interval)
            # Below 50 candles the analysis is meaningless; return a soft
            # error payload (HTTP 200) that the dashboard renders inline.
            if len(rows) < 50:
                return {
                    "symbol": symbol,
                    "interval": interval,
                    "error": "Not enough data for technical analysis",
                    "min_required": 50,
                    "available": len(rows)
                }
            # Reverse to chronological order
            candles = list(reversed(rows))
            closes = [float(c['close']) for c in candles]
            # Calculate Moving Averages (simple MA over the trailing window;
            # None when there is not enough history).
            def calculate_ma(data, period):
                if len(data) < period:
                    return None
                return sum(data[-period:]) / period
            ma_44 = calculate_ma(closes, 44)
            ma_125 = calculate_ma(closes, 125)
            current_price = closes[-1]
            # Determine trend: bullish/bearish only when price and both MAs
            # are strictly stacked; "Strong" when price is >5% beyond MA 44.
            if ma_44 and ma_125:
                if current_price > ma_44 > ma_125:
                    trend = "Bullish"
                    trend_strength = "Strong" if current_price > ma_44 * 1.05 else "Moderate"
                elif current_price < ma_44 < ma_125:
                    trend = "Bearish"
                    trend_strength = "Strong" if current_price < ma_44 * 0.95 else "Moderate"
                else:
                    trend = "Neutral"
                    trend_strength = "Consolidation"
            else:
                trend = "Unknown"
                trend_strength = "Insufficient data"
            # Find support and resistance (recent swing points over last 20 candles)
            highs = [float(c['high']) for c in candles[-20:]]
            lows = [float(c['low']) for c in candles[-20:]]
            resistance = max(highs)
            support = min(lows)
            # Calculate price position within the support/resistance band (0-100%).
            price_range = resistance - support
            if price_range > 0:
                position = (current_price - support) / price_range * 100
            else:
                position = 50
            return {
                "symbol": symbol,
                "interval": interval,
                "timestamp": datetime.utcnow().isoformat(),
                "current_price": round(current_price, 2),
                "moving_averages": {
                    "ma_44": round(ma_44, 2) if ma_44 else None,
                    "ma_125": round(ma_125, 2) if ma_125 else None,
                    "price_vs_ma44": round((current_price / ma_44 - 1) * 100, 2) if ma_44 else None,
                    "price_vs_ma125": round((current_price / ma_125 - 1) * 100, 2) if ma_125 else None
                },
                "trend": {
                    "direction": trend,
                    "strength": trend_strength,
                    # Actionable signal only on strong directional trends.
                    "signal": "Buy" if trend == "Bullish" and trend_strength == "Strong" else
                              "Sell" if trend == "Bearish" and trend_strength == "Strong" else "Hold"
                },
                "levels": {
                    "resistance": round(resistance, 2),
                    "support": round(support, 2),
                    "position_in_range": round(position, 1)
                },
                "ai_placeholder": {
                    "available": False,
                    "message": "AI analysis available via Gemini or local LLM",
                    "action": "Click to analyze with AI"
                }
            }
    except Exception as e:
        logger.error(f"Technical analysis error: {e}")
        raise HTTPException(status_code=500, detail=f"Technical analysis failed: {str(e)}")
@app.get("/api/v1/export/csv")
async def export_csv(
    symbol: str = "BTC",
    interval: str = "1m",
    days: int = Query(7, ge=1, le=365, description="Number of days to export")
):
    """Export candle data to CSV (last ``days`` days, chronological order).

    Raises:
        HTTPException: 404 when no rows match the requested range.
    """
    # NOTE(review): utcnow() is naive — confirm how this compares against the
    # DB 'time' column's timezone handling in asyncpg.
    start_date = datetime.utcnow() - timedelta(days=days)
    async with pool.acquire() as conn:
        query = """
            SELECT time, open, high, low, close, volume
            FROM candles
            WHERE symbol = $1 AND interval = $2 AND time >= $3
            ORDER BY time
        """
        rows = await conn.fetch(query, symbol, interval, start_date)
        if not rows:
            raise HTTPException(status_code=404, detail="No data found for export")
        # Build the CSV in memory, then stream it back as a file download.
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        for row in rows:
            writer.writerow([
                row['time'].isoformat(),
                row['open'],
                row['high'],
                row['low'],
                row['close'],
                row['volume']
            ])
        output.seek(0)
        return StreamingResponse(
            io.BytesIO(output.getvalue().encode()),
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename={symbol}_{interval}_{days}d.csv"
            }
        )
# Serve static files for dashboard (html=True serves index.html at /dashboard).
app.mount("/dashboard", StaticFiles(directory="src/api/dashboard/static", html=True), name="dashboard")

if __name__ == "__main__":
    # Development entry point; production deployments would typically invoke
    # uvicorn/gunicorn externally instead.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -0,0 +1,13 @@
# Data collector module: re-exports the collector package's main classes.
from .websocket_client import HyperliquidWebSocket, Candle
from .candle_buffer import CandleBuffer
from .database import DatabaseManager
from .backfill import HyperliquidBackfill

# Public API of the package.
__all__ = [
    'HyperliquidWebSocket',
    'Candle',
    'CandleBuffer',
    'DatabaseManager',
    'HyperliquidBackfill'
]

View File

@ -0,0 +1,366 @@
"""
Hyperliquid Historical Data Backfill Module
Downloads candle data from Hyperliquid REST API with pagination support
"""
import asyncio
import logging
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional
import aiohttp
from .database import DatabaseManager
from .websocket_client import Candle
logger = logging.getLogger(__name__)
class HyperliquidBackfill:
    """
    Backfills historical candle data from Hyperliquid REST API

    API Limitations:
    - Max 5000 candles per coin/interval combination
    - 500 candles per response (requires pagination)
    - Available intervals: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M
    """
    API_URL = "https://api.hyperliquid.xyz/info"
    MAX_CANDLES_PER_REQUEST = 500   # page size returned by the API
    MAX_TOTAL_CANDLES = 5000        # hard API cap per coin/interval
    # Standard timeframes supported by Hyperliquid
    INTERVALS = [
        "1m", "3m", "5m", "15m", "30m",
        "1h", "2h", "4h", "8h", "12h",
        "1d", "3d", "1w", "1M"
    ]
def __init__(
    self,
    db: DatabaseManager,
    coin: str = "BTC",
    intervals: Optional[List[str]] = None
):
    """Create a backfiller for ``coin`` over the given candle intervals.

    Args:
        db: Database manager used to persist fetched candles.
        coin: Coin symbol as known to Hyperliquid.
        intervals: Intervals to backfill; defaults to ["1m"].
    """
    self.db = db
    self.coin = coin
    self.intervals = intervals or ["1m"]  # Default to 1m
    # HTTP session is opened lazily via the async context manager.
    self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
    """Async context manager entry: open the shared HTTP session."""
    self.session = aiohttp.ClientSession()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit: close the HTTP session if it was opened."""
    if self.session:
        await self.session.close()
async def fetch_candles(
    self,
    interval: str,
    start_time: datetime,
    end_time: Optional[datetime] = None
) -> List[Candle]:
    """
    Fetch candles for a specific interval with pagination

    Pages forward in time in 500-candle batches until the API returns a
    short batch, the 5000-candle cap is reached, or an error occurs
    (errors end the loop; partial results are still returned).

    Args:
        interval: Candle interval (e.g., "1m", "1h", "1d")
        start_time: Start time (inclusive)
        end_time: End time (inclusive, defaults to now)

    Returns:
        List of Candle objects

    Raises:
        ValueError: If ``interval`` is not one of ``INTERVALS``.
    """
    if interval not in self.INTERVALS:
        raise ValueError(f"Invalid interval: {interval}. Must be one of {self.INTERVALS}")
    end_time = end_time or datetime.now(timezone.utc)
    # Convert to milliseconds
    start_ms = int(start_time.timestamp() * 1000)
    end_ms = int(end_time.timestamp() * 1000)
    all_candles = []
    total_fetched = 0
    while total_fetched < self.MAX_TOTAL_CANDLES:
        logger.info(f"Fetching {interval} candles from {datetime.fromtimestamp(start_ms/1000, tz=timezone.utc)} "
                    f"(batch {total_fetched//self.MAX_CANDLES_PER_REQUEST + 1})")
        try:
            batch = await self._fetch_batch(interval, start_ms, end_ms)
            if not batch:
                logger.info(f"No more {interval} candles available")
                break
            all_candles.extend(batch)
            total_fetched += len(batch)
            logger.info(f"Fetched {len(batch)} {interval} candles (total: {total_fetched})")
            # Check if we got less than max, means we're done
            if len(batch) < self.MAX_CANDLES_PER_REQUEST:
                break
            # Update start_time for next batch (last candle's time + 1ms)
            last_candle = batch[-1]
            start_ms = int(last_candle.time.timestamp() * 1000) + 1
            # Small delay to avoid rate limiting
            await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Error fetching {interval} candles: {e}")
            break
    logger.info(f"Backfill complete for {interval}: {len(all_candles)} candles total")
    return all_candles
async def _fetch_batch(
    self,
    interval: str,
    start_ms: int,
    end_ms: int
) -> List[Candle]:
    """Fetch a single batch of candles from the API.

    Args:
        interval: Candle interval string.
        start_ms: Window start, Unix epoch milliseconds.
        end_ms: Window end, Unix epoch milliseconds.

    Returns:
        Parsed candles; unparseable items are logged and skipped.

    Raises:
        RuntimeError: If called outside the async context manager.
        Exception: On a non-200 API response.
    """
    if not self.session:
        raise RuntimeError("Session not initialized. Use async context manager.")
    # Hyperliquid "info" endpoint request envelope for candle snapshots.
    payload = {
        "type": "candleSnapshot",
        "req": {
            "coin": self.coin,
            "interval": interval,
            "startTime": start_ms,
            "endTime": end_ms
        }
    }
    async with self.session.post(self.API_URL, json=payload) as response:
        if response.status != 200:
            text = await response.text()
            raise Exception(f"API error {response.status}: {text}")
        data = await response.json()
        if not isinstance(data, list):
            logger.warning(f"Unexpected response format: {data}")
            return []
        candles = []
        for item in data:
            try:
                candle = self._parse_candle_item(item, interval)
                if candle:
                    candles.append(candle)
            except Exception as e:
                logger.warning(f"Failed to parse candle: {item}, error: {e}")
        return candles
def _parse_candle_item(self, data: Dict[str, Any], interval: str) -> Optional[Candle]:
    """
    Parse a single candle item from an API response.

    Expected format (prices/volume arrive as strings):
        {"t": 1770812400000, "T": ..., "s": "BTC", "i": "1m",
         "o": "67164.0", "h": ..., "l": ..., "c": ..., "v": ...}

    Returns:
        A Candle, or None if the item is malformed or missing fields.
    """
    try:
        # Index directly instead of .get(key, 0): a missing field must be
        # treated as a parse failure, not silently become a 1970-01-01
        # zero-price candle that would poison the database.
        timestamp_ms = int(data["t"])
        timestamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
        return Candle(
            time=timestamp,
            symbol=self.coin,
            interval=interval,
            open=float(data["o"]),
            high=float(data["h"]),
            low=float(data["l"]),
            close=float(data["c"]),
            volume=float(data["v"])
        )
    except (KeyError, ValueError, TypeError) as e:
        logger.error(f"Failed to parse candle data: {e}, data: {data}")
        return None
async def backfill_interval(
    self,
    interval: str,
    days_back: int = 7
) -> int:
    """
    Backfill one interval covering the last ``days_back`` days.

    Args:
        interval: Candle interval
        days_back: Number of days to backfill (use 0 for max available)

    Returns:
        Number of candles inserted
    """
    # days_back == 0 is the sentinel for "everything the API will give us".
    if days_back == 0:
        return await self.backfill_max(interval)

    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days_back)
    logger.info(f"Starting backfill for {interval}: {window_start} to {window_end}")

    candles = await self.fetch_candles(interval, window_start, window_end)
    if not candles:
        logger.warning(f"No candles fetched for {interval}")
        return 0

    inserted = await self.db.insert_candles(candles)
    logger.info(f"Inserted {inserted} candles for {interval}")
    return inserted
async def backfill_max(self, interval: str) -> int:
    """
    Backfill as much history as the API exposes (up to 5000 candles).

    Args:
        interval: Candle interval

    Returns:
        Number of candles inserted
    """
    logger.info(f"Fetching maximum available {interval} data (up to 5000 candles)")

    # Start well before Hyperliquid existed (it launched around 2023) so the
    # request window is guaranteed to cover everything the exchange can return.
    window_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
    window_end = datetime.now(timezone.utc)
    logger.info(f"Fetching {interval} candles from {window_start} to {window_end}")

    candles = await self.fetch_candles(interval, window_start, window_end)
    if not candles:
        logger.warning(f"No candles fetched for {interval}")
        return 0

    inserted = await self.db.insert_candles(candles)
    logger.info(f"Inserted {inserted} candles for {interval} (max available)")
    return inserted
def _interval_to_minutes(self, interval: str) -> int:
    """Return the duration of *interval* in minutes.

    Unknown interval strings fall back to 1 minute.
    """
    minutes_by_interval = {
        "1m": 1, "3m": 3, "5m": 5, "15m": 15, "30m": 30,
        "1h": 60, "2h": 120, "4h": 240, "8h": 480, "12h": 720,
        "1d": 1440, "3d": 4320, "1w": 10080, "1M": 43200,
    }
    try:
        return minutes_by_interval[interval]
    except KeyError:
        return 1
async def backfill_all_intervals(
    self,
    days_back: int = 7
) -> Dict[str, int]:
    """
    Backfill every configured interval, one at a time.

    A failure on one interval is logged and recorded as 0; the remaining
    intervals are still processed.

    Args:
        days_back: Number of days to backfill

    Returns:
        Dictionary mapping interval to count inserted
    """
    inserted_by_interval: Dict[str, int] = {}
    for interval in self.intervals:
        try:
            inserted_by_interval[interval] = await self.backfill_interval(interval, days_back)
        except Exception as e:
            logger.error(f"Failed to backfill {interval}: {e}")
            inserted_by_interval[interval] = 0
    return inserted_by_interval
async def get_earliest_candle_time(self, interval: str) -> Optional[datetime]:
    """Return the timestamp of the earliest candle the API has for *interval*.

    Returns None when the API yields no candles at all.
    """
    # Probe from well before the exchange existed; the API returns only what it has.
    probe_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
    probe_end = datetime.now(timezone.utc)
    candles = await self.fetch_candles(interval, probe_start, probe_end)
    if not candles:
        return None
    earliest = min(candle.time for candle in candles)
    logger.info(f"Earliest {interval} candle available: {earliest}")
    return earliest
async def main():
    """CLI entry point for the backfill tool.

    Parses database/interval options, runs a backfill over all requested
    intervals, and prints a per-interval summary.
    """
    import argparse
    import os

    # Configure logging BEFORE anything logs.  The original configured it
    # after the mode-selection logger.info() calls below, so those messages
    # could be dropped (no handler installed yet).
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    parser = argparse.ArgumentParser(description="Backfill Hyperliquid historical data")
    parser.add_argument("--coin", default="BTC", help="Coin symbol (default: BTC)")
    parser.add_argument("--intervals", nargs="+", default=["1m"],
                        help="Intervals to backfill (default: 1m)")
    parser.add_argument("--days", type=str, default="7",
                        help="Days to backfill (default: 7, use 'max' for maximum available)")
    parser.add_argument("--db-host", default=os.getenv("DB_HOST", "localhost"),
                        help="Database host (default: localhost or DB_HOST env)")
    parser.add_argument("--db-port", type=int, default=int(os.getenv("DB_PORT", 5432)),
                        help="Database port (default: 5432 or DB_PORT env)")
    parser.add_argument("--db-name", default=os.getenv("DB_NAME", "btc_data"),
                        help="Database name (default: btc_data or DB_NAME env)")
    parser.add_argument("--db-user", default=os.getenv("DB_USER", "btc_bot"),
                        help="Database user (default: btc_bot or DB_USER env)")
    parser.add_argument("--db-password", default=os.getenv("DB_PASSWORD", ""),
                        help="Database password (default: from DB_PASSWORD env)")
    args = parser.parse_args()

    # 'max' maps to days_back == 0, the sentinel backfill_interval() understands.
    if args.days.lower() == "max":
        days_back = 0
        logger.info("Backfill mode: MAX (fetching up to 5000 candles per interval)")
    else:
        days_back = int(args.days)
        logger.info(f"Backfill mode: Last {days_back} days")

    db = DatabaseManager(
        host=args.db_host,
        port=args.db_port,
        database=args.db_name,
        user=args.db_user,
        password=args.db_password
    )
    await db.connect()
    try:
        async with HyperliquidBackfill(db, args.coin, args.intervals) as backfill:
            results = await backfill.backfill_all_intervals(days_back)
            print("\n=== Backfill Summary ===")
            for interval, count in results.items():
                print(f"{interval}: {count} candles")
            print(f"Total: {sum(results.values())} candles")
    finally:
        # Always release the pool, even if the backfill raised.
        await db.disconnect()


if __name__ == "__main__":
    asyncio.run(main())

View File

@ -0,0 +1,224 @@
"""
In-memory candle buffer with automatic batching
Optimized for low memory footprint on DS218+
"""
import asyncio
import logging
from collections import deque
from datetime import datetime, timezone
from typing import Dict, List, Optional, Callable, Any, Awaitable
from dataclasses import dataclass, field
from .websocket_client import Candle
logger = logging.getLogger(__name__)
@dataclass
class BufferStats:
    """Statistics for buffer performance monitoring."""

    total_added: int = 0        # candles accepted by add()
    total_flushed: int = 0      # candles handed off by flush()
    total_dropped: int = 0      # candles evicted because the buffer was full
    last_flush_time: Optional[datetime] = None
    avg_batch_size: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the stats for logging / JSON output."""
        flushed_at = self.last_flush_time
        return {
            'total_added': self.total_added,
            'total_flushed': self.total_flushed,
            'total_dropped': self.total_dropped,
            'last_flush_time': flushed_at.isoformat() if flushed_at is not None else None,
            'avg_batch_size': round(self.avg_batch_size, 2),
        }
class CandleBuffer:
    """
    Buffer for candle data, guarded by an asyncio.Lock.

    Coroutine-safe within one event loop (the lock is an asyncio.Lock, so it
    does NOT protect against access from other threads).  Candles accumulate
    in a bounded deque and are flushed to an async callback either when
    ``batch_size`` is reached or every ``flush_interval_seconds``, whichever
    comes first.
    """
    def __init__(
        self,
        max_size: int = 1000,
        flush_interval_seconds: float = 30.0,
        batch_size: int = 100,
        on_flush_callback: Optional[Callable[[List[Candle]], Awaitable[None]]] = None
    ):
        self.max_size = max_size
        self.flush_interval = flush_interval_seconds
        self.batch_size = batch_size
        self.on_flush = on_flush_callback
        # Bounded deque: append() on a full deque(maxlen=...) silently evicts
        # the oldest entry.
        self._buffer: deque = deque(maxlen=max_size)
        self._lock = asyncio.Lock()
        self._flush_event = asyncio.Event()  # set to request an immediate flush
        self._stop_event = asyncio.Event()   # set to terminate the flush loop
        self.stats = BufferStats()
        self._batch_sizes: deque = deque(maxlen=100)  # recent batch sizes, for averaging
        # Background flush task (created in start()).
        self._flush_task: Optional[asyncio.Task] = None
    async def start(self) -> None:
        """Start the background flush task."""
        self._flush_task = asyncio.create_task(self._flush_loop())
        logger.info(f"CandleBuffer started (max_size={self.max_size}, flush_interval={self.flush_interval}s)")
    async def stop(self) -> None:
        """Stop the buffer and flush remaining data."""
        self._stop_event.set()
        self._flush_event.set()  # Wake up flush loop so it observes the stop flag
        if self._flush_task:
            try:
                await asyncio.wait_for(self._flush_task, timeout=10.0)
            except asyncio.TimeoutError:
                logger.warning("Flush task did not stop in time")
        # Final flush of anything still buffered
        await self.flush()
        logger.info("CandleBuffer stopped")
    async def add(self, candle: Candle) -> bool:
        """
        Add a candle to the buffer.

        Always returns True: when the buffer is already at max_size, the
        deque's maxlen evicts the OLDEST candle (counted in
        stats.total_dropped) and the new candle is still appended.
        """
        async with self._lock:
            if len(self._buffer) >= self.max_size:
                logger.warning(f"Buffer full, dropping oldest candle. Size: {len(self._buffer)}")
                self.stats.total_dropped += 1
            self._buffer.append(candle)
            self.stats.total_added += 1
            # Trigger immediate flush if batch size reached
            if len(self._buffer) >= self.batch_size:
                self._flush_event.set()
            return True
    async def add_many(self, candles: List[Candle]) -> int:
        """Add multiple candles; returns how many were accepted."""
        added = 0
        for candle in candles:
            if await self.add(candle):
                added += 1
        return added
    async def get_batch(self, n: Optional[int] = None) -> List[Candle]:
        """Return up to N of the oldest buffered candles (without removing them)."""
        async with self._lock:
            n = n or len(self._buffer)
            return list(self._buffer)[:n]
    async def flush(self) -> int:
        """
        Drain the buffer and hand its contents to the flush callback.

        If the callback raises, the drained candles are re-inserted at the
        front of the buffer so a later flush can retry them.

        Returns:
            Number of candles flushed (0 on failure or empty buffer).
        """
        candles_to_flush: List[Candle] = []
        async with self._lock:
            if not self._buffer:
                return 0
            candles_to_flush = list(self._buffer)
            self._buffer.clear()
        if candles_to_flush and self.on_flush:
            try:
                await self.on_flush(candles_to_flush)
                # Update stats
                self.stats.total_flushed += len(candles_to_flush)
                self.stats.last_flush_time = datetime.now(timezone.utc)
                self._batch_sizes.append(len(candles_to_flush))
                self.stats.avg_batch_size = sum(self._batch_sizes) / len(self._batch_sizes)
                logger.debug(f"Flushed {len(candles_to_flush)} candles")
                return len(candles_to_flush)
            except Exception as e:
                logger.error(f"Flush callback failed: {e}")
                # Put candles back in buffer, preserving original order.
                # NOTE(review): if new candles arrived while the callback ran,
                # re-inserting into the maxlen-bounded deque can evict entries
                # from the right end — confirm this is acceptable for retry.
                async with self._lock:
                    for candle in reversed(candles_to_flush):
                        self._buffer.appendleft(candle)
                return 0
        elif candles_to_flush:
            # No callback configured: count the candles as flushed and discard.
            self.stats.total_flushed += len(candles_to_flush)
            return len(candles_to_flush)
        return 0
    async def _flush_loop(self) -> None:
        """Background task: flush on demand (event) or every flush_interval."""
        while not self._stop_event.is_set():
            try:
                # Wait for an explicit flush request or the periodic timeout
                await asyncio.wait_for(
                    self._flush_event.wait(),
                    timeout=self.flush_interval
                )
                self._flush_event.clear()
                # Flush if we have data
                buffer_size = await self.get_buffer_size()
                if buffer_size > 0:
                    await self.flush()
            except asyncio.TimeoutError:
                # Flush interval elapsed without an explicit request
                buffer_size = await self.get_buffer_size()
                if buffer_size > 0:
                    await self.flush()
            except Exception as e:
                logger.error(f"Error in flush loop: {e}")
                await asyncio.sleep(1)  # brief backoff so a persistent error can't spin
    def get_stats(self) -> BufferStats:
        """Return the live statistics object (not a copy)."""
        return self.stats
    async def get_buffer_size(self) -> int:
        """Return the current number of buffered candles."""
        async with self._lock:
            return len(self._buffer)
    def detect_gaps(self, candles: List[Candle]) -> List[Dict[str, Any]]:
        """
        Detect gaps in a candle sequence.

        Assumes 1-minute candles: expected spacing is hardcoded to 60s, and
        any spacing above 90s (50% tolerance) is reported as a gap.

        Returns:
            List of gap dicts (from_time, to_time, missing_candles,
            duration_seconds).
        """
        if len(candles) < 2:
            return []
        gaps = []
        sorted_candles = sorted(candles, key=lambda c: c.time)
        for i in range(1, len(sorted_candles)):
            prev = sorted_candles[i-1]
            curr = sorted_candles[i]
            # Calculate expected interval (1 minute)
            expected_diff = 60  # seconds
            actual_diff = (curr.time - prev.time).total_seconds()
            if actual_diff > expected_diff * 1.5:  # Allow 50% tolerance
                gaps.append({
                    'from_time': prev.time.isoformat(),
                    'to_time': curr.time.isoformat(),
                    'missing_candles': int(actual_diff / expected_diff) - 1,
                    'duration_seconds': actual_diff
                })
        return gaps

View File

@ -0,0 +1,255 @@
"""
Database interface for TimescaleDB
Optimized for batch inserts and low resource usage
"""
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from typing import List, Dict, Any, Optional
import os
import asyncpg
from asyncpg import Pool
from .websocket_client import Candle
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Manages TimescaleDB connections and operations via an asyncpg pool.

    Connection parameters fall back to DB_* environment variables, then to
    local defaults.  Call connect() before using any query method.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        database: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        pool_size: int = 5
    ):
        # Explicit arguments win; fall back to environment, then defaults.
        self.host = host or os.getenv('DB_HOST', 'localhost')
        self.port = port or int(os.getenv('DB_PORT', 5432))
        self.database = database or os.getenv('DB_NAME', 'btc_data')
        self.user = user or os.getenv('DB_USER', 'btc_bot')
        self.password = password or os.getenv('DB_PASSWORD', '')
        self.pool_size = pool_size
        self.pool: Optional[Pool] = None

    async def connect(self) -> None:
        """Initialize the connection pool and verify connectivity.

        Raises:
            Exception: Propagates any asyncpg connection error after logging.
        """
        try:
            self.pool = await asyncpg.create_pool(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password,
                min_size=1,
                max_size=self.pool_size,
                command_timeout=60
            )
            # Round-trip a trivial query so failures surface here, not later.
            async with self.pool.acquire() as conn:
                version = await conn.fetchval('SELECT version()')
                logger.info(f"Connected to database: {version[:50]}...")
            logger.info(f"Database pool created (size: {self.pool_size})")
        except Exception as e:
            logger.error(f"Failed to connect to database: {type(e).__name__}: {e!r}")
            raise

    async def disconnect(self) -> None:
        """Close the connection pool (no-op if never connected)."""
        if self.pool:
            await self.pool.close()
            logger.info("Database pool closed")

    @asynccontextmanager
    async def acquire(self):
        """Async context manager yielding a pooled connection.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.pool:
            raise RuntimeError("Database not connected")
        async with self.pool.acquire() as conn:
            yield conn

    async def insert_candles(self, candles: List[Candle]) -> int:
        """
        Batch upsert candles (ON CONFLICT updates OHLCV and source).

        Returns:
            Number of candles submitted.  asyncpg's executemany() returns
            None, so there is no per-row result to report.
        """
        if not candles:
            return 0
        values = [
            (
                c.time,
                c.symbol,
                c.interval,
                c.open,
                c.high,
                c.low,
                c.close,
                c.volume,
                False,  # validated: fresh data starts unvalidated
                'hyperliquid'  # source
            )
            for c in candles
        ]
        async with self.acquire() as conn:
            # executemany batches the prepared statement efficiently; it has
            # no useful return value, so we don't bind it.
            await conn.executemany('''
                INSERT INTO candles (time, symbol, interval, open, high, low, close, volume, validated, source)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (time, symbol, interval)
                DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close,
                    volume = EXCLUDED.volume,
                    source = EXCLUDED.source
            ''', values)
            inserted = len(candles)
            logger.debug(f"Inserted/updated {inserted} candles")
            return inserted

    async def get_candles(
        self,
        symbol: str,
        interval: str,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """Query candles, newest first.

        Args:
            symbol: Trading symbol (e.g. "BTC").
            interval: Candle interval (e.g. "1m").
            start: Optional inclusive lower time bound.
            end: Optional inclusive upper time bound.
            limit: Maximum number of rows returned.

        Returns:
            List of row dicts ordered by time DESC.
        """
        query = '''
            SELECT time, symbol, interval, open, high, low, close, volume, validated
            FROM candles
            WHERE symbol = $1 AND interval = $2
        '''
        params: List[Any] = [symbol, interval]
        # Placeholders are numbered dynamically because start/end are optional.
        if start:
            query += ' AND time >= $3'
            params.append(start)
        if end:
            query += f' AND time <= ${len(params) + 1}'
            params.append(end)
        query += f' ORDER BY time DESC LIMIT ${len(params) + 1}'
        params.append(limit)
        async with self.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [dict(row) for row in rows]

    async def get_latest_candle(self, symbol: str, interval: str) -> Optional[Dict[str, Any]]:
        """Return the most recent candle row for (symbol, interval), or None."""
        async with self.acquire() as conn:
            row = await conn.fetchrow('''
                SELECT time, symbol, interval, open, high, low, close, volume
                FROM candles
                WHERE symbol = $1 AND interval = $2
                ORDER BY time DESC
                LIMIT 1
            ''', symbol, interval)
            return dict(row) if row else None

    async def detect_gaps(
        self,
        symbol: str,
        interval: str,
        since: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """
        Detect missing candles since *since* (default: start of today, UTC).

        Uses a LAG window function server-side; any spacing over 2 minutes
        between consecutive rows is reported as a gap (the threshold assumes
        1-minute candles).
        """
        # Local import: the module header only imports `datetime`.
        from datetime import timezone

        # Use an aware UTC timestamp: datetime.utcnow() is naive (and
        # deprecated since Python 3.12), which risks mismatches against the
        # timestamptz column.
        since = since or datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        async with self.acquire() as conn:
            # Find gaps using lead/lag window functions
            rows = await conn.fetch('''
                WITH ordered AS (
                    SELECT
                        time,
                        LAG(time) OVER (ORDER BY time) as prev_time
                    FROM candles
                    WHERE symbol = $1
                    AND interval = $2
                    AND time >= $3
                    ORDER BY time
                )
                SELECT
                    prev_time as gap_start,
                    time as gap_end,
                    EXTRACT(EPOCH FROM (time - prev_time)) / 60 - 1 as missing_candles
                FROM ordered
                WHERE time - prev_time > INTERVAL '2 minutes'
                ORDER BY prev_time
            ''', symbol, interval, since)
            return [
                {
                    'gap_start': row['gap_start'].isoformat(),
                    'gap_end': row['gap_end'].isoformat(),
                    'missing_candles': int(row['missing_candles'])
                }
                for row in rows
            ]

    async def log_quality_issue(
        self,
        check_type: str,
        severity: str,
        symbol: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ) -> None:
        """Record a data-quality issue in the data_quality table."""
        # NOTE(review): passing a dict for `details` assumes a JSON codec is
        # registered on the pool (asyncpg does not encode dicts to json/jsonb
        # by default) — confirm against the schema, or json.dumps() it here.
        async with self.acquire() as conn:
            await conn.execute('''
                INSERT INTO data_quality (check_type, severity, symbol, details)
                VALUES ($1, $2, $3, $4)
            ''', check_type, severity, symbol, details)
            logger.warning(f"Quality issue logged: {check_type} ({severity})")

    async def get_health_stats(self) -> Dict[str, Any]:
        """
        Return database health statistics.

        Includes table sizes/row counts, per-symbol candle freshness over the
        last 24h, and the number of unresolved data-quality issues.
        """
        async with self.acquire() as conn:
            # Get table sizes
            table_stats = await conn.fetch('''
                SELECT
                    relname as table_name,
                    pg_size_pretty(pg_total_relation_size(relid)) as size,
                    n_live_tup as row_count
                FROM pg_stat_user_tables
                WHERE relname IN ('candles', 'indicators', 'data_quality')
            ''')
            # Get latest candles
            latest = await conn.fetch('''
                SELECT symbol, MAX(time) as last_time, COUNT(*) as count
                FROM candles
                WHERE time > NOW() - INTERVAL '24 hours'
                GROUP BY symbol
            ''')
            return {
                'tables': [dict(row) for row in table_stats],
                'latest_candles': [dict(row) for row in latest],
                'unresolved_issues': await conn.fetchval('''
                    SELECT COUNT(*) FROM data_quality WHERE resolved = FALSE
                ''')
            }

236
src/data_collector/main.py Normal file
View File

@ -0,0 +1,236 @@
"""
Main entry point for data collector service
Integrates WebSocket client, buffer, and database
"""
import asyncio
import logging
import signal
import sys
from datetime import datetime, timezone
from typing import Optional
import os
from .websocket_client import HyperliquidWebSocket, Candle
from .candle_buffer import CandleBuffer
from .database import DatabaseManager
# Configure logging: always log to stdout; add a file handler only when the
# container's log directory exists.  Building the handler list conditionally
# fixes a bug in the previous `A if cond else B` form, which added a SECOND
# StreamHandler when /app/logs was absent — producing duplicate log lines.
_log_handlers = [logging.StreamHandler(sys.stdout)]
if os.path.exists('/app/logs'):
    _log_handlers.append(logging.FileHandler('/app/logs/collector.log'))
logging.basicConfig(
    level=getattr(logging, os.getenv('LOG_LEVEL', 'INFO')),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers
)
logger = logging.getLogger(__name__)
class DataCollector:
    """
    Main data collection orchestrator.

    Wires together the WebSocket client (source), the CandleBuffer
    (batching), and the DatabaseManager (sink), and supervises them with
    health-check and monitoring loops until a stop signal arrives.
    """
    def __init__(
        self,
        symbol: str = "BTC",
        interval: str = "1m"
    ):
        self.symbol = symbol
        self.interval = interval
        # Components (created lazily in start())
        self.db: Optional[DatabaseManager] = None
        self.buffer: Optional[CandleBuffer] = None
        self.websocket: Optional[HyperliquidWebSocket] = None
        # State
        self.is_running = False
        self._stop_event = asyncio.Event()
        self._tasks = []

    async def start(self) -> None:
        """Initialize all components and run until a stop signal arrives."""
        logger.info(f"Starting DataCollector for {self.symbol}")
        try:
            # Database first: nothing else is useful without a sink.
            self.db = DatabaseManager()
            await self.db.connect()
            # Buffer batches candles and persists them via _on_buffer_flush.
            self.buffer = CandleBuffer(
                max_size=1000,
                flush_interval_seconds=30,
                batch_size=100,
                on_flush_callback=self._on_buffer_flush
            )
            await self.buffer.start()
            # WebSocket client feeds candles into _on_candle.
            self.websocket = HyperliquidWebSocket(
                symbol=self.symbol,
                interval=self.interval,
                on_candle_callback=self._on_candle,
                on_error_callback=self._on_error
            )
            # Setup signal handlers (requires the running loop).
            self._setup_signal_handlers()
            # Connect to WebSocket
            await self.websocket.connect()
            # Start main loops
            self.is_running = True
            self._tasks = [
                asyncio.create_task(self.websocket.receive_loop()),
                asyncio.create_task(self._health_check_loop()),
                asyncio.create_task(self._monitoring_loop())
            ]
            logger.info("DataCollector started successfully")
            # Block until stop() or a signal handler sets the event.
            await self._stop_event.wait()
        except Exception as e:
            logger.error(f"Failed to start DataCollector: {type(e).__name__}: {e!r}")
            raise
        finally:
            await self.stop()

    async def stop(self) -> None:
        """Graceful shutdown: cancel loops, then stop components in order."""
        if not self.is_running:
            return
        logger.info("Stopping DataCollector...")
        self.is_running = False
        self._stop_event.set()
        # Cancel supervision tasks
        for task in self._tasks:
            if not task.done():
                task.cancel()
        if self._tasks:
            # return_exceptions so one failing task doesn't mask the rest.
            await asyncio.gather(*self._tasks, return_exceptions=True)
        # Stop the source before buffer/sink so no candles arrive mid-teardown.
        if self.websocket:
            await self.websocket.disconnect()
        if self.buffer:
            await self.buffer.stop()
        if self.db:
            await self.db.disconnect()
        logger.info("DataCollector stopped")

    async def _on_candle(self, candle: Candle) -> None:
        """WebSocket callback: enqueue an incoming candle into the buffer."""
        try:
            await self.buffer.add(candle)
            logger.debug(f"Received candle: {candle.time} - Close: {candle.close}")
        except Exception as e:
            logger.error(f"Error processing candle: {e}")

    async def _on_buffer_flush(self, candles: list) -> None:
        """Buffer callback: persist a batch; re-raise so the buffer retries."""
        try:
            inserted = await self.db.insert_candles(candles)
            logger.info(f"Flushed {inserted} candles to database")
        except Exception as e:
            logger.error(f"Failed to write candles to database: {e}")
            raise  # Re-raise to trigger buffer retry

    async def _on_error(self, error: Exception) -> None:
        """WebSocket error callback (currently log-only)."""
        logger.error(f"WebSocket error: {error}")
        # Could implement alerting here (Telegram, etc.)

    async def _health_check_loop(self) -> None:
        """Log connection/buffer health once a minute; warn on silent feeds."""
        while self.is_running:
            try:
                await asyncio.sleep(60)  # Check every minute
                if not self.is_running:
                    break
                # Check WebSocket health
                health = self.websocket.get_connection_health()
                if health['seconds_since_last_message'] and health['seconds_since_last_message'] > 120:
                    logger.warning("No messages received for 2+ minutes")
                    # Could trigger reconnection or alert
                # Log stats
                buffer_stats = self.buffer.get_stats()
                logger.info(f"Health: {health}, Buffer: {buffer_stats.to_dict()}")
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in health check: {e}")

    async def _monitoring_loop(self) -> None:
        """Every 5 minutes: report data gaps and database health."""
        while self.is_running:
            try:
                await asyncio.sleep(300)  # Every 5 minutes
                if not self.is_running:
                    break
                # Detect gaps
                gaps = await self.db.detect_gaps(self.symbol, self.interval)
                if gaps:
                    logger.warning(f"Detected {len(gaps)} data gaps: {gaps}")
                    # Could trigger backfill here
                # Log database health
                health = await self.db.get_health_stats()
                logger.info(f"Database health: {health}")
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}")

    def _setup_signal_handlers(self) -> None:
        """
        Install SIGINT/SIGTERM handlers for graceful shutdown.

        Uses loop.add_signal_handler, which is safe for touching asyncio
        state; the previous signal.signal + asyncio.create_task approach
        called into asyncio from a synchronous signal handler, which is not
        guaranteed to be safe.  Falls back to signal.signal on platforms
        (e.g. Windows) where add_signal_handler is unavailable.
        """
        def _request_stop(sig_name: str) -> None:
            logger.info(f"Received signal {sig_name}, shutting down...")
            # start() is awaiting this event; setting it unwinds into stop().
            self._stop_event.set()

        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, _request_stop, sig.name)
            except NotImplementedError:
                # add_signal_handler is not implemented on some platforms.
                signal.signal(sig, lambda s, frame: _request_stop(signal.Signals(s).name))
async def main():
    """Run a BTC 1-minute DataCollector until it stops or fails."""
    collector = DataCollector(symbol="BTC", interval="1m")
    try:
        await collector.start()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        # Any unexpected failure is fatal for the service.
        logger.error(f"Fatal error: {type(e).__name__}: {e!r}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

View File

@ -0,0 +1,300 @@
"""
Hyperliquid WebSocket Client for cbBTC Data Collection
Optimized for Synology DS218+ with automatic reconnection
"""
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any, Callable, Awaitable, List
from dataclasses import dataclass
import websockets
from websockets.exceptions import ConnectionClosed, InvalidStatusCode
from websockets.typing import Data
logger = logging.getLogger(__name__)
@dataclass
class Candle:
    """A single OHLCV candlestick for one (symbol, interval) pair."""
    time: datetime
    symbol: str
    interval: str
    open: float
    high: float
    low: float
    close: float
    volume: float

    def to_dict(self) -> Dict[str, Any]:
        """Return the candle as a plain dict (keys match the DB columns)."""
        return {
            name: getattr(self, name)
            for name in (
                'time', 'symbol', 'interval',
                'open', 'high', 'low', 'close', 'volume',
            )
        }
class HyperliquidWebSocket:
    """
    WebSocket client for the Hyperliquid exchange.

    Subscribes to candle updates for one (symbol, interval) pair, parses
    incoming messages into Candle objects, and reconnects with a bounded
    backoff schedule when the connection drops.
    """
    def __init__(
        self,
        symbol: str = "BTC",
        interval: str = "1m",
        url: str = "wss://api.hyperliquid.xyz/ws",
        reconnect_delays: Optional[List[int]] = None,
        on_candle_callback: Optional[Callable[[Candle], Awaitable[None]]] = None,
        on_error_callback: Optional[Callable[[Exception], Awaitable[None]]] = None
    ):
        self.symbol = symbol
        self.interval = interval
        self.url = url
        # Backoff schedule in seconds; one entry per consecutive reconnect attempt.
        self.reconnect_delays = reconnect_delays or [1, 2, 5, 10, 30, 60, 120, 300, 600, 900]
        self.on_candle = on_candle_callback
        self.on_error = on_error_callback
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.is_running = False
        self.reconnect_count = 0  # index into reconnect_delays; reset on successful connect
        self.last_message_time: Optional[datetime] = None  # any message received
        self.last_candle_time: Optional[datetime] = None   # candle messages only
        self._should_stop = False
    async def connect(self) -> None:
        """Open the WebSocket and subscribe to candle updates.

        Raises:
            Exception: Propagates any connection/subscription error after logging.
        """
        try:
            logger.info(f"Connecting to Hyperliquid WebSocket: {self.url}")
            # Library-level keepalive pings are disabled; the server's
            # application-level ping is answered in _handle_message instead.
            self.websocket = await websockets.connect(
                self.url,
                ping_interval=None,
                ping_timeout=None,
                close_timeout=10
            )
            # Subscribe to candle data
            subscribe_msg = {
                "method": "subscribe",
                "subscription": {
                    "type": "candle",
                    "coin": self.symbol,
                    "interval": self.interval
                }
            }
            await self.websocket.send(json.dumps(subscribe_msg))
            response = await self.websocket.recv()
            logger.info(f"Subscription response: {response}")
            self.reconnect_count = 0
            self.is_running = True
            logger.info(f"Successfully connected and subscribed to {self.symbol} {self.interval} candles")
        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            raise
    async def disconnect(self) -> None:
        """Gracefully close the connection and stop the receive loop."""
        self._should_stop = True
        self.is_running = False
        if self.websocket:
            try:
                await self.websocket.close()
                logger.info("WebSocket connection closed")
            except Exception as e:
                logger.warning(f"Error closing WebSocket: {e}")
    async def receive_loop(self) -> None:
        """Main message receiving loop; reconnects on connection loss."""
        while self.is_running and not self._should_stop:
            try:
                if not self.websocket:
                    # NOTE(review): ConnectionClosed's constructor arguments
                    # differ across websockets versions — confirm (None, None)
                    # is valid for the pinned version.
                    raise ConnectionClosed(None, None)
                message = await self.websocket.recv()
                self.last_message_time = datetime.now(timezone.utc)
                await self._handle_message(message)
            except ConnectionClosed as e:
                if self._should_stop:
                    break
                logger.warning(f"WebSocket connection closed: {e}")
                await self._handle_reconnect()
            except Exception as e:
                logger.error(f"Error in receive loop: {e}")
                if self.on_error:
                    await self.on_error(e)
                await asyncio.sleep(1)
    async def _handle_message(self, message: Data) -> None:
        """Parse one incoming WebSocket message and dispatch it."""
        try:
            # Convert bytes to string if necessary
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            data = json.loads(message)
            # Handle subscription confirmation
            if data.get("channel") == "subscriptionResponse":
                logger.info(f"Subscription confirmed: {data}")
                return
            # Handle candle data
            if data.get("channel") == "candle":
                candle_data = data.get("data", {})
                if candle_data:
                    candle = self._parse_candle(candle_data)
                    if candle:
                        self.last_candle_time = candle.time
                        if self.on_candle:
                            await self.on_candle(candle)
            # Answer the server's application-level ping to keep the session alive
            if "ping" in data and self.websocket:
                await self.websocket.send(json.dumps({"pong": data["ping"]}))
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse message: {e}")
        except Exception as e:
            logger.error(f"Error handling message: {e}")
    def _parse_candle(self, data: Any) -> Optional[Candle]:
        """Parse candle data (list or dict format) into a Candle, or None on failure."""
        try:
            # Hyperliquid candle format: [open, high, low, close, volume, timestamp]
            if isinstance(data, list) and len(data) >= 6:
                open_price = float(data[0])
                high = float(data[1])
                low = float(data[2])
                close = float(data[3])
                volume = float(data[4])
                timestamp_ms = int(data[5])
            elif isinstance(data, dict):
                # New format: {'t': 1770812400000, 'T': ..., 's': 'BTC', 'i': '1m', 'o': '67164.0', 'c': ..., 'h': ..., 'l': ..., 'v': ..., 'n': ...}
                if 't' in data and 'o' in data:
                    open_price = float(data.get("o", 0))
                    high = float(data.get("h", 0))
                    low = float(data.get("l", 0))
                    close = float(data.get("c", 0))
                    volume = float(data.get("v", 0))
                    timestamp_ms = int(data.get("t", 0))
                else:
                    # Old format fallback (long key names)
                    open_price = float(data.get("open", 0))
                    high = float(data.get("high", 0))
                    low = float(data.get("low", 0))
                    close = float(data.get("close", 0))
                    volume = float(data.get("volume", 0))
                    timestamp_ms = int(data.get("time", 0))
            else:
                logger.warning(f"Unknown candle format: {data}")
                return None
            timestamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
            return Candle(
                time=timestamp,
                symbol=self.symbol,
                interval=self.interval,
                open=open_price,
                high=high,
                low=low,
                close=close,
                volume=volume
            )
        except (KeyError, ValueError, TypeError) as e:
            logger.error(f"Failed to parse candle data: {e}, data: {data}")
            return None
    async def _handle_reconnect(self) -> None:
        """Reconnect using the backoff schedule; gives up after the last delay."""
        if self._should_stop:
            return
        if self.reconnect_count >= len(self.reconnect_delays):
            logger.error("Max reconnection attempts reached")
            self.is_running = False
            if self.on_error:
                await self.on_error(Exception("Max reconnection attempts reached"))
            return
        delay = self.reconnect_delays[self.reconnect_count]
        self.reconnect_count += 1
        logger.info(f"Reconnecting in {delay} seconds (attempt {self.reconnect_count})...")
        await asyncio.sleep(delay)
        try:
            await self.connect()
        except Exception as e:
            # Failure here is not fatal: receive_loop will hit ConnectionClosed
            # again and re-enter this method with the next (longer) delay.
            logger.error(f"Reconnection failed: {e}")
    def get_connection_health(self) -> Dict[str, Any]:
        """Return connection health metrics for monitoring/logging."""
        now = datetime.now(timezone.utc)
        return {
            "is_connected": self.websocket is not None and self.is_running,
            "is_running": self.is_running,
            "reconnect_count": self.reconnect_count,
            "last_message_time": self.last_message_time.isoformat() if self.last_message_time else None,
            "last_candle_time": self.last_candle_time.isoformat() if self.last_candle_time else None,
            "seconds_since_last_message": (now - self.last_message_time).total_seconds() if self.last_message_time else None
        }
async def test_websocket():
    """Manual smoke test: subscribe and print the first five candles."""
    received = []
    done = asyncio.Event()

    async def on_candle(candle: Candle):
        received.append(candle)
        print(f"Candle: {candle.time} - O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}")
        if len(received) >= 5:
            print("Received 5 candles, stopping...")
            done.set()

    client = HyperliquidWebSocket(
        symbol="cbBTC-PERP",
        interval="1m",
        on_candle_callback=on_candle
    )
    try:
        await client.connect()
        # Drain messages in the background while we wait for five candles.
        receive_task = asyncio.create_task(client.receive_loop())
        await done.wait()
        await client.disconnect()
        await receive_task
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        # Safe to call again: disconnect() is idempotent in practice.
        await client.disconnect()
        print(f"Total candles received: {len(received)}")


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    asyncio.run(test_websocket())