- Split inline JS into separate ES module files (indicators/, strategies/, ui/, utils/)
- Fix brain.py strategy registry to use MAStrategy directly instead of missing modules
- Add auto-backfill for detected data gaps in collector monitoring loop
- Fix chart resize on sidebar toggle
- Fix chart scrollToTime -> setVisibleLogicalRange
227 lines
7.5 KiB
Python
227 lines
7.5 KiB
Python
"""
Brain - Strategy evaluation and decision logging.

Pure strategy logic is kept separate from DB I/O for testability.
"""
|
|
|
|
import json
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Optional, Any, List, Callable
|
|
|
|
from .database import DatabaseManager
|
|
from .indicator_engine import IndicatorEngine
|
|
from src.strategies.base import BaseStrategy, StrategySignal, SignalType
|
|
from src.strategies.ma_strategy import MAStrategy
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
def _create_ma44() -> BaseStrategy:
    """Build an MAStrategy whose config sets "period" to 44."""
    ma_config = {"period": 44}
    return MAStrategy(config=ma_config)
|
|
|
|
def _create_ma125() -> BaseStrategy:
    """Build an MAStrategy whose config sets "period" to 125."""
    ma_config = {"period": 125}
    return MAStrategy(config=ma_config)
|
|
|
|
# Maps a strategy name to a zero-argument callable that returns a strategy
# instance. "ma_trend" uses the MAStrategy class itself as the factory; the
# other entries use the period-specific factory functions.
STRATEGY_REGISTRY: Dict[str, Callable[[], BaseStrategy]] = dict(
    ma_trend=MAStrategy,
    ma44_strategy=_create_ma44,
    ma125_strategy=_create_ma125,
)
|
|
|
|
def load_strategy(strategy_name: str) -> BaseStrategy:
    """
    Instantiate the strategy registered under ``strategy_name``.

    When the name is not present in STRATEGY_REGISTRY, a warning is logged
    and the "ma_trend" entry is used instead.
    """
    factory = STRATEGY_REGISTRY.get(strategy_name)
    if factory is None:
        logger.warning(f"Strategy {strategy_name} not found, defaulting to ma_trend")
        factory = STRATEGY_REGISTRY["ma_trend"]

    return factory()
|
|
|
|
@dataclass
class Decision:
    """The outcome of one brain evaluation, ready to be persisted."""

    time: datetime
    symbol: str
    interval: str
    # e.g. "buy", "sell", "hold"; per the original note this now maps to SignalType
    decision_type: str
    strategy: str
    confidence: float
    price_at_decision: float
    indicator_snapshot: Dict[str, Any]
    candle_snapshot: Dict[str, Any]
    reasoning: str
    backtest_id: Optional[str] = None

    def to_db_tuple(self) -> tuple:
        """
        Return the fields as a positional tuple for a DB insert.

        Field order follows the declaration order above; the two snapshot
        dicts are serialized to JSON strings.
        """
        scalar_head = (
            self.time,
            self.symbol,
            self.interval,
            self.decision_type,
            self.strategy,
            self.confidence,
            self.price_at_decision,
        )
        # Serialize indicator snapshot first, then candle snapshot.
        json_snapshots = tuple(
            json.dumps(snapshot)
            for snapshot in (self.indicator_snapshot, self.candle_snapshot)
        )
        scalar_tail = (self.reasoning, self.backtest_id)
        return scalar_head + json_snapshots + scalar_tail
|
|
|
|
|
|
class Brain:
    """
    Evaluates market conditions using a loaded Strategy.

    Holds a database manager, an indicator engine and one active strategy
    instance; each evaluation produces a Decision.
    """

    def __init__(
        self,
        db: DatabaseManager,
        indicator_engine: IndicatorEngine,
        strategy: str = "ma44_strategy",
    ):
        """
        Store the collaborators and load the strategy by name.

        ``strategy`` is kept as given in ``strategy_name``; the instance
        returned by ``load_strategy`` is kept in ``active_strategy``.
        """
        self.db = db
        self.indicator_engine = indicator_engine
        self.strategy_name = strategy
        self.active_strategy: BaseStrategy = load_strategy(strategy)

        logger.info(f"Brain initialized with strategy: {self.active_strategy.name}")

    async def evaluate(
        self,
        symbol: str,
        interval: str,
        timestamp: datetime,
        indicators: Optional[Dict[str, float]] = None,
        backtest_id: Optional[str] = None,
        current_position: Optional[Dict[str, Any]] = None,
    ) -> Decision:
        """
        Evaluate market conditions and produce a decision.

        If ``indicators`` is None they are fetched from the indicator engine
        for (symbol, interval, timestamp). When no candle exists for that
        key, a "hold" decision is returned without being stored. Otherwise
        the active strategy analyzes the candle, and the resulting decision
        is written to the DB and returned.
        """
        # Resolve indicator values when the caller did not supply them.
        if indicators is None:
            indicators = await self.indicator_engine.get_values_at(
                symbol, interval, timestamp
            )

        # Look up the candle that triggered this evaluation.
        row = await self._get_candle(symbol, interval, timestamp)
        if not row:
            return self._create_empty_decision(
                timestamp, symbol, interval, indicators, backtest_id
            )

        close_price = float(row["close"])
        snapshot = {
            "time": row["time"].isoformat(),
            **{key: float(row[key]) for key in ("open", "high", "low")},
            "close": close_price,
            "volume": float(row["volume"]),
        }

        # Hand the actual signal generation over to the strategy.
        signal: StrategySignal = self.active_strategy.analyze(
            snapshot, indicators, current_position
        )

        result = Decision(
            time=timestamp,
            symbol=symbol,
            interval=interval,
            decision_type=signal.type.value,
            strategy=self.strategy_name,
            confidence=signal.confidence,
            price_at_decision=close_price,
            indicator_snapshot=indicators,
            candle_snapshot=snapshot,
            reasoning=signal.reasoning,
            backtest_id=backtest_id,
        )

        # Persist before handing the decision back to the caller.
        await self._store_decision(result)
        return result

    def _create_empty_decision(
        self,
        timestamp: datetime,
        symbol: str,
        interval: str,
        indicators: Optional[Dict[str, float]],
        backtest_id: Optional[str],
    ) -> Decision:
        """Build the "hold" decision used when no candle data is available."""
        snapshot_of_indicators = indicators or {}
        return Decision(
            time=timestamp,
            symbol=symbol,
            interval=interval,
            decision_type="hold",
            strategy=self.strategy_name,
            confidence=0.0,
            price_at_decision=0.0,
            indicator_snapshot=snapshot_of_indicators,
            candle_snapshot={},
            reasoning="No candle data available",
            backtest_id=backtest_id,
        )

    async def _get_candle(
        self,
        symbol: str,
        interval: str,
        timestamp: datetime,
    ) -> Optional[Dict[str, Any]]:
        """Fetch the candle matching symbol, interval and time, or None."""
        query = """
            SELECT time, open, high, low, close, volume
            FROM candles
            WHERE symbol = $1 AND interval = $2 AND time = $3
        """
        async with self.db.acquire() as conn:
            record = await conn.fetchrow(query, symbol, interval, timestamp)

        if not record:
            return None
        return dict(record)

    async def _store_decision(self, decision: Decision) -> None:
        """Insert the decision as one row of the decisions table."""
        # Note: We might want to skip writing every single HOLD to DB to save
        # space if simulating millions of candles, but keeping it for now for
        # full traceability.
        statement = """
            INSERT INTO decisions (
                time, symbol, interval, decision_type, strategy,
                confidence, price_at_decision, indicator_snapshot,
                candle_snapshot, reasoning, backtest_id
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
        """
        values = decision.to_db_tuple()
        async with self.db.acquire() as conn:
            await conn.execute(statement, *values)

    async def get_recent_decisions(
        self,
        symbol: str,
        limit: int = 20,
        backtest_id: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        Return up to ``limit`` decisions for ``symbol``, newest first.

        With a ``backtest_id`` only rows of that backtest are returned;
        without one only rows whose backtest_id IS NULL are returned.
        """
        if backtest_id is not None:
            query = """
                SELECT time, symbol, interval, decision_type, strategy,
                       confidence, price_at_decision, indicator_snapshot,
                       candle_snapshot, reasoning, backtest_id
                FROM decisions
                WHERE symbol = $1 AND backtest_id = $2
                ORDER BY time DESC
                LIMIT $3
            """
            params = (symbol, backtest_id, limit)
        else:
            query = """
                SELECT time, symbol, interval, decision_type, strategy,
                       confidence, price_at_decision, indicator_snapshot,
                       candle_snapshot, reasoning, backtest_id
                FROM decisions
                WHERE symbol = $1 AND backtest_id IS NULL
                ORDER BY time DESC
                LIMIT $2
            """
            params = (symbol, limit)

        async with self.db.acquire() as conn:
            records = await conn.fetch(query, *params)

        return [dict(record) for record in records]

    def reset_state(self) -> None:
        """Reset internal state tracking (currently a no-op)."""
        return None
|