"""Sensor platform for Pstryk Energy integration.""" import logging import asyncio import math from datetime import timedelta from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.components.sensor import SensorEntity, SensorStateClass, SensorDeviceClass from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.event import async_track_state_change_event from homeassistant.util import dt as dt_util from .update_coordinator import PstrykDataUpdateCoordinator from .energy_cost_coordinator import PstrykCostDataUpdateCoordinator from .api_client import PstrykAPIClient from .const import ( DOMAIN, CONF_MQTT_48H_MODE, CONF_RETRY_ATTEMPTS, CONF_RETRY_DELAY, DEFAULT_RETRY_ATTEMPTS, DEFAULT_RETRY_DELAY, # Battery recommendation constants CONF_BATTERY_ENABLED, CONF_BATTERY_SOC_ENTITY, CONF_BATTERY_CAPACITY, CONF_BATTERY_CHARGE_RATE, CONF_BATTERY_DISCHARGE_RATE, CONF_BATTERY_MIN_SOC, CONF_BATTERY_CHARGE_HOURS, CONF_BATTERY_DISCHARGE_MULTIPLIER, DEFAULT_BATTERY_CAPACITY, DEFAULT_BATTERY_CHARGE_RATE, DEFAULT_BATTERY_DISCHARGE_RATE, DEFAULT_BATTERY_MIN_SOC, DEFAULT_BATTERY_CHARGE_HOURS, DEFAULT_BATTERY_DISCHARGE_MULTIPLIER, ) from homeassistant.helpers.translation import async_get_translations _LOGGER = logging.getLogger(__name__) # Store translations globally to avoid reloading for each sensor _TRANSLATIONS_CACHE = {} # Cache for manifest version - load at module import time (outside event loop) _VERSION_CACHE = None def _load_version_sync() -> str: """Load version synchronously at module import time.""" try: import json import os manifest_path = os.path.join(os.path.dirname(__file__), "manifest.json") with open(manifest_path, "r") as f: manifest = json.load(f) return manifest.get("version", "unknown") except Exception: return "unknown" # Load version once at module import (not in event loop) _VERSION_CACHE = _load_version_sync() def get_integration_version(hass: HomeAssistant) -> str: """Get integration version from manifest.json.""" return _VERSION_CACHE async def async_setup_entry( hass: HomeAssistant, entry: ConfigEntry, async_add_entities, ) -> None: """Set up the Pstryk sensors via the coordinator.""" api_key = hass.data[DOMAIN][entry.entry_id]["api_key"] buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5)) sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5)) buy_worst = entry.options.get("buy_worst", entry.data.get("buy_worst", 5)) sell_worst = entry.options.get("sell_worst", entry.data.get("sell_worst", 5)) mqtt_48h_mode = entry.options.get(CONF_MQTT_48H_MODE, False) retry_attempts = entry.options.get(CONF_RETRY_ATTEMPTS, DEFAULT_RETRY_ATTEMPTS) retry_delay = entry.options.get(CONF_RETRY_DELAY, DEFAULT_RETRY_DELAY) _LOGGER.debug("Setting up Pstryk sensors with buy_top=%d, sell_top=%d, buy_worst=%d, sell_worst=%d, mqtt_48h_mode=%s, retry_attempts=%d, retry_delay=%ds", buy_top, sell_top, buy_worst, sell_worst, mqtt_48h_mode, retry_attempts, retry_delay) # Load translations once for all sensors global _TRANSLATIONS_CACHE if not _TRANSLATIONS_CACHE: try: _TRANSLATIONS_CACHE = await async_get_translations( hass, hass.config.language, DOMAIN ) except Exception as ex: _LOGGER.warning("Failed to load translations: %s", ex) _TRANSLATIONS_CACHE = {} # Cleanup old coordinators if they exist for price_type in ("buy", "sell"): key = f"{entry.entry_id}_{price_type}" coordinator = hass.data[DOMAIN].get(key) if coordinator: _LOGGER.debug("Cleaning up existing %s coordinator", price_type) # Cancel scheduled updates if hasattr(coordinator, '_unsub_hourly') and coordinator._unsub_hourly: coordinator._unsub_hourly() if hasattr(coordinator, '_unsub_midnight') and coordinator._unsub_midnight: coordinator._unsub_midnight() if hasattr(coordinator, '_unsub_afternoon') and coordinator._unsub_afternoon: coordinator._unsub_afternoon() # Remove from hass data hass.data[DOMAIN].pop(key, None) # Cleanup old cost coordinator if exists cost_key = f"{entry.entry_id}_cost" cost_coordinator = hass.data[DOMAIN].get(cost_key) if cost_coordinator: _LOGGER.debug("Cleaning up existing cost coordinator") if hasattr(cost_coordinator, '_unsub_hourly') and cost_coordinator._unsub_hourly: cost_coordinator._unsub_hourly() if hasattr(cost_coordinator, '_unsub_midnight') and cost_coordinator._unsub_midnight: cost_coordinator._unsub_midnight() hass.data[DOMAIN].pop(cost_key, None) # Create shared API client (or reuse existing one) api_client_key = f"{entry.entry_id}_api_client" if api_client_key not in hass.data[DOMAIN]: api_client = PstrykAPIClient(hass, api_key) hass.data[DOMAIN][api_client_key] = api_client else: api_client = hass.data[DOMAIN][api_client_key] entities = [] coordinators = [] # Create price coordinators first for price_type in ("buy", "sell"): key = f"{entry.entry_id}_{price_type}" coordinator = PstrykDataUpdateCoordinator( hass, api_client, price_type, mqtt_48h_mode, retry_attempts, retry_delay ) coordinators.append((coordinator, price_type, key)) # Create cost coordinator (will be initialized as unavailable for lazy loading) cost_coordinator = PstrykCostDataUpdateCoordinator(hass, api_client) cost_coordinator.last_update_success = False coordinators.append((cost_coordinator, "cost", cost_key)) # Initialize ONLY price coordinators immediately (fast startup) # Cost coordinator will be loaded lazily in background _LOGGER.info("Starting quick initialization - loading price coordinators only") async def safe_initial_fetch(coord, coord_type): """Safely fetch initial data for coordinator with timeout.""" try: # Add timeout to prevent blocking startup data = await asyncio.wait_for( coord._async_update_data(), timeout=20.0 # 20 seconds max per coordinator ) coord.data = data coord.last_update_success = True _LOGGER.debug("Successfully initialized %s coordinator", coord_type) return True except asyncio.TimeoutError: _LOGGER.warning("Timeout initializing %s coordinator - will retry later", coord_type) coord.last_update_success = False return False except Exception as err: _LOGGER.error("Failed initial fetch for %s coordinator: %s", coord_type, err) coord.last_update_success = False return err # Load only price coordinators immediately for fast startup price_coordinators = [(c, t, k) for c, t, k in coordinators if t in ("buy", "sell")] initial_refresh_tasks = [ safe_initial_fetch(coordinator, coordinator_type) for coordinator, coordinator_type, _ in price_coordinators ] refresh_results = await asyncio.gather(*initial_refresh_tasks, return_exceptions=True) # Track failed coordinators for quick retry failed_coordinators = [] # Check results for price coordinators for i, (coordinator, coordinator_type, key) in enumerate(price_coordinators): if isinstance(refresh_results[i], Exception) or refresh_results[i] is False: _LOGGER.warning("Coordinator %s failed initial load - scheduling retry with backoff", coordinator_type) failed_coordinators.append((coordinator, coordinator_type)) # Schedule exponential backoff retry for failed coordinators # Delays: 2, 4, 8, 16, 32 minutes (5 attempts) if failed_coordinators: async def exponential_backoff_retry(): """Retry failed coordinators with exponential backoff.""" base_delay = 120 # 2 minutes max_attempts = 5 for attempt in range(max_attempts): delay = base_delay * (2 ** attempt) # 2, 4, 8, 16, 32 minutes # Check if any coordinators still need retry coords_to_retry = [ (c, t) for c, t in failed_coordinators if not c.last_update_success ] if not coords_to_retry: _LOGGER.info("All coordinators recovered, stopping backoff retry") return _LOGGER.info( "Backoff retry attempt %d/%d in %d seconds for %d coordinator(s)", attempt + 1, max_attempts, delay, len(coords_to_retry) ) await asyncio.sleep(delay) for coord, coord_type in coords_to_retry: if not coord.last_update_success: _LOGGER.info("Retry attempt %d for %s coordinator", attempt + 1, coord_type) try: await coord.async_request_refresh() if coord.last_update_success: _LOGGER.info("%s coordinator recovered on attempt %d", coord_type, attempt + 1) except Exception as e: _LOGGER.warning("Retry attempt %d failed for %s: %s", attempt + 1, coord_type, e) # Final check still_failed = [t for c, t in failed_coordinators if not c.last_update_success] if still_failed: _LOGGER.error( "Coordinators %s failed after %d retry attempts. Will use hourly schedule.", still_failed, max_attempts ) hass.async_create_task(exponential_backoff_retry()) # Store all coordinators and set up scheduling buy_coord = None sell_coord = None for coordinator, coordinator_type, key in coordinators: # Store coordinator hass.data[DOMAIN][key] = coordinator # Schedule updates for price coordinators if coordinator_type in ("buy", "sell"): coordinator.schedule_hourly_update() coordinator.schedule_midnight_update() # Schedule afternoon update if 48h mode is enabled if mqtt_48h_mode: coordinator.schedule_afternoon_update() # Create ONLY current price sensors (fast, immediate) top = buy_top if coordinator_type == "buy" else sell_top worst = buy_worst if coordinator_type == "buy" else sell_worst entities.append(PstrykPriceSensor(coordinator, coordinator_type, top, worst, entry.entry_id)) # Store coordinator references for later use if coordinator_type == "buy": buy_coord = coordinator elif coordinator_type == "sell": sell_coord = coordinator # Schedule updates for cost coordinator elif coordinator_type == "cost": coordinator.schedule_hourly_update() coordinator.schedule_midnight_update() # Create remaining sensors (average price + financial balance) - they will show as unavailable initially remaining_entities = [] # Create average price sensors for buy if buy_coord: for period in ("daily", "monthly", "yearly"): remaining_entities.append(PstrykAveragePriceSensor( cost_coordinator, buy_coord, period, entry.entry_id )) # Create average price sensors for sell if sell_coord: for period in ("daily", "monthly", "yearly"): remaining_entities.append(PstrykAveragePriceSensor( cost_coordinator, sell_coord, period, entry.entry_id )) # Create financial balance sensors for period in ("daily", "monthly", "yearly"): remaining_entities.append(PstrykFinancialBalanceSensor( cost_coordinator, period, entry.entry_id )) # Create battery recommendation sensor if enabled battery_enabled = entry.options.get(CONF_BATTERY_ENABLED, False) if battery_enabled and buy_coord: battery_sensor = PstrykBatteryRecommendationSensor( coordinator=buy_coord, entry_id=entry.entry_id, soc_entity_id=entry.options.get(CONF_BATTERY_SOC_ENTITY, ""), capacity=entry.options.get(CONF_BATTERY_CAPACITY, DEFAULT_BATTERY_CAPACITY), charge_rate=entry.options.get(CONF_BATTERY_CHARGE_RATE, DEFAULT_BATTERY_CHARGE_RATE), discharge_rate=entry.options.get(CONF_BATTERY_DISCHARGE_RATE, DEFAULT_BATTERY_DISCHARGE_RATE), min_soc=entry.options.get(CONF_BATTERY_MIN_SOC, DEFAULT_BATTERY_MIN_SOC), charge_hours_count=entry.options.get(CONF_BATTERY_CHARGE_HOURS, DEFAULT_BATTERY_CHARGE_HOURS), discharge_multiplier=entry.options.get(CONF_BATTERY_DISCHARGE_MULTIPLIER, DEFAULT_BATTERY_DISCHARGE_MULTIPLIER), ) remaining_entities.append(battery_sensor) _LOGGER.info("Battery recommendation sensor enabled with SoC entity: %s", entry.options.get(CONF_BATTERY_SOC_ENTITY, "not configured")) # Register ALL sensors immediately: # - Current price sensors (2) with data # - Remaining sensors (15) as unavailable until cost coordinator loads _LOGGER.info("Registering %d current price sensors with data and %d additional sensors as unavailable", len(entities), len(remaining_entities)) async_add_entities(entities + remaining_entities) # Load cost coordinator data in background - sensors will automatically update when data arrives async def lazy_load_cost_data(): """Load cost coordinator data in background - sensors update automatically via coordinator.""" _LOGGER.info("Waiting 15 seconds before loading cost coordinator data") await asyncio.sleep(15) _LOGGER.info("Loading cost coordinator data in background") try: # Load cost coordinator with all resolutions data = await cost_coordinator._async_update_data(fetch_all=True) cost_coordinator.data = data cost_coordinator.last_update_success = True # Notify all listening sensors that data is available cost_coordinator.async_update_listeners() _LOGGER.info("Cost coordinator loaded successfully - %d sensors updated", len(remaining_entities)) except Exception as err: _LOGGER.warning("Failed to load cost coordinator: %s. %d sensors remain unavailable.", err, len(remaining_entities)) cost_coordinator.last_update_success = False cost_coordinator.data = None # Start background data loading hass.async_create_task(lazy_load_cost_data()) class PstrykPriceSensor(CoordinatorEntity, SensorEntity): """Combined price sensor with table data attributes.""" # Note: state_class removed - MONETARY device_class doesn't support MEASUREMENT def __init__(self, coordinator: PstrykDataUpdateCoordinator, price_type: str, top_count: int, worst_count: int, entry_id: str): super().__init__(coordinator) self.price_type = price_type self.top_count = top_count self.worst_count = worst_count self.entry_id = entry_id self._attr_device_class = SensorDeviceClass.MONETARY self._cached_sorted_prices = None self._last_data_hash = None async def async_added_to_hass(self): """When entity is added to Home Assistant.""" await super().async_added_to_hass() @property def name(self) -> str: return f"Pstryk Current {self.price_type.title()} Price" @property def unique_id(self) -> str: return f"{DOMAIN}_{self.price_type}_price" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": get_integration_version(self.hass), } def _get_current_price(self): """Get current price based on current time.""" if not self.coordinator.data or not self.coordinator.data.get("prices"): return None now_utc = dt_util.utcnow() for price_entry in self.coordinator.data.get("prices", []): try: if "start" not in price_entry: continue price_datetime = dt_util.parse_datetime(price_entry["start"]) if not price_datetime: continue # Konwersja do UTC dla porównania price_datetime_utc = dt_util.as_utc(price_datetime) price_end_utc = price_datetime_utc + timedelta(hours=1) if price_datetime_utc <= now_utc < price_end_utc: return price_entry.get("price") except Exception as e: _LOGGER.error("Error determining current price: %s", str(e)) return None @property def native_value(self): if self.coordinator.data is None: return None # Próbujemy znaleźć aktualną cenę na podstawie czasu current_price = self._get_current_price() # Jeśli nie znaleźliśmy, używamy wartości z koordynatora if current_price is None: current_price = self.coordinator.data.get("current") return current_price @property def native_unit_of_measurement(self) -> str: return "PLN/kWh" def _get_next_hour_price(self) -> dict: """Get price data for the next hour.""" if not self.coordinator.data: return None now = dt_util.as_local(dt_util.utcnow()) next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) # Use translations for debug messages debug_msg = _TRANSLATIONS_CACHE.get( "debug.looking_for_next_hour", "Looking for price for next hour: {next_hour}" ).format(next_hour=next_hour.strftime("%Y-%m-%d %H:%M:%S")) _LOGGER.debug(debug_msg) # Check if we're looking for the next day's hour (midnight) is_looking_for_next_day = next_hour.day != now.day # First check in prices_today price_found = None if self.coordinator.data.get("prices_today"): for price_data in self.coordinator.data.get("prices_today", []): if "start" not in price_data: continue try: price_datetime = dt_util.parse_datetime(price_data["start"]) if not price_datetime: continue price_datetime = dt_util.as_local(price_datetime) if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day: price_found = price_data.get("price") _LOGGER.debug("Found price for %s in today's list: %s", next_hour.strftime("%Y-%m-%d %H:%M:%S"), price_found) return price_found except Exception as e: error_msg = _TRANSLATIONS_CACHE.get( "debug.error_processing_date", "Error processing date: {error}" ).format(error=str(e)) _LOGGER.error(error_msg) # Always check the full list as a fallback, regardless of day if self.coordinator.data.get("prices"): _LOGGER.debug("Looking for price in full 48h list as fallback") for price_data in self.coordinator.data.get("prices", []): if "start" not in price_data: continue try: price_datetime = dt_util.parse_datetime(price_data["start"]) if not price_datetime: continue price_datetime = dt_util.as_local(price_datetime) # Check if this matches the hour and day we're looking for if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day: price_found = price_data.get("price") _LOGGER.debug("Found price for %s in full 48h list: %s", next_hour.strftime("%Y-%m-%d %H:%M:%S"), price_found) return price_found except Exception as e: full_list_error_msg = _TRANSLATIONS_CACHE.get( "debug.error_processing_full_list", "Error processing date for full list: {error}" ).format(error=str(e)) _LOGGER.error(full_list_error_msg) # If no price found for next hour if is_looking_for_next_day: midnight_msg = _TRANSLATIONS_CACHE.get( "debug.no_price_midnight", "No price found for next day midnight. Data probably not loaded yet." ) _LOGGER.info(midnight_msg) else: no_price_msg = _TRANSLATIONS_CACHE.get( "debug.no_price_next_hour", "No price found for next hour: {next_hour}" ).format(next_hour=next_hour.strftime("%Y-%m-%d %H:%M:%S")) _LOGGER.warning(no_price_msg) return None def _get_cached_sorted_prices(self, today): """Get cached sorted prices or compute if data changed.""" # Create a simple hash of the data to detect changes data_hash = hash(tuple((p["start"], p["price"]) for p in today)) if self._last_data_hash != data_hash or self._cached_sorted_prices is None: _LOGGER.debug("Price data changed, recalculating sorted prices") # Sortowanie dla najlepszych cen sorted_best = sorted( today, key=lambda x: x["price"], reverse=(self.price_type == "sell"), ) # Sortowanie dla najgorszych cen (odwrotna kolejność sortowania) sorted_worst = sorted( today, key=lambda x: x["price"], reverse=(self.price_type != "sell"), ) self._cached_sorted_prices = { "best": sorted_best[: self.top_count], "worst": sorted_worst[: self.worst_count] } self._last_data_hash = data_hash return self._cached_sorted_prices def _is_likely_placeholder_data(self, prices_for_day): """Check if prices for a day are likely placeholders. Returns True if: - There are no prices - ALL prices have exactly the same value (suggesting API returned default values) - There are too many consecutive hours with the same value (e.g., 10+ hours) """ if not prices_for_day: return True # Get all price values price_values = [p.get("price") for p in prices_for_day if p.get("price") is not None] if not price_values: return True # If we have less than 20 prices for a day, it's incomplete data if len(price_values) < 20: _LOGGER.debug(f"Only {len(price_values)} prices for the day, likely incomplete data") return True # Check if ALL values are identical unique_values = set(price_values) if len(unique_values) == 1: _LOGGER.debug(f"All {len(price_values)} prices have the same value ({price_values[0]}), likely placeholders") return True # Additional check: if more than 90% of values are the same, it's suspicious most_common_value = max(set(price_values), key=price_values.count) count_most_common = price_values.count(most_common_value) if count_most_common / len(price_values) > 0.9: _LOGGER.debug(f"{count_most_common}/{len(price_values)} prices have value {most_common_value}, likely placeholders") return True return False def _count_consecutive_same_values(self, prices): """Count maximum consecutive hours with the same price.""" if not prices: return 0 # Sort by time to ensure consecutive checking sorted_prices = sorted(prices, key=lambda x: x.get("start", "")) max_consecutive = 1 current_consecutive = 1 last_value = None for price in sorted_prices: value = price.get("price") if value is not None: if value == last_value: current_consecutive += 1 max_consecutive = max(max_consecutive, current_consecutive) else: current_consecutive = 1 last_value = value return max_consecutive def _get_mqtt_price_count(self): """Get the actual count of prices that would be published to MQTT.""" if not self.coordinator.data: return 0 if not self.coordinator.mqtt_48h_mode: # If not in 48h mode, we only publish today's prices prices_today = self.coordinator.data.get("prices_today", []) return len(prices_today) else: # In 48h mode, we need to count valid prices all_prices = self.coordinator.data.get("prices", []) # Just count today's prices as they're always valid now = dt_util.as_local(dt_util.utcnow()) today_str = now.strftime("%Y-%m-%d") today_prices = [p for p in all_prices if p.get("start", "").startswith(today_str)] # For tomorrow, check if data looks valid tomorrow_str = (now + timedelta(days=1)).strftime("%Y-%m-%d") tomorrow_prices = [p for p in all_prices if p.get("start", "").startswith(tomorrow_str)] # Count today's prices always valid_count = len(today_prices) # Add tomorrow's prices only if they look like real data if tomorrow_prices and not self._is_likely_placeholder_data(tomorrow_prices): valid_count += len(tomorrow_prices) return valid_count def _get_sunrise_sunset_average(self, today_prices): """Calculate average price between sunrise and sunset.""" if not today_prices: return None # Get sun entity sun_entity = self.hass.states.get("sun.sun") if not sun_entity: _LOGGER.debug("Sun entity not available") return None # Get sunrise and sunset times from attributes sunrise_attr = sun_entity.attributes.get("next_rising") sunset_attr = sun_entity.attributes.get("next_setting") if not sunrise_attr or not sunset_attr: _LOGGER.debug("Sunrise/sunset times not available") return None # Parse sunrise and sunset times try: sunrise = dt_util.parse_datetime(sunrise_attr) sunset = dt_util.parse_datetime(sunset_attr) if not sunrise or not sunset: return None # Convert to local time sunrise_local = dt_util.as_local(sunrise) sunset_local = dt_util.as_local(sunset) # If sunrise is tomorrow, use today's sunrise from calculation now = dt_util.now() if sunrise_local.date() > now.date(): # Calculate approximate sunrise for today (subtract 24h) sunrise_local = sunrise_local - timedelta(days=1) # If sunset is tomorrow, we're after sunset today if sunset_local.date() > now.date(): # Use previous sunset sunset_local = sunset_local - timedelta(days=1) _LOGGER.debug(f"Calculating s/s average between {sunrise_local.strftime('%H:%M')} and {sunset_local.strftime('%H:%M')}") # Get prices between sunrise and sunset sunrise_sunset_prices = [] for price_entry in today_prices: if "start" not in price_entry or "price" not in price_entry: continue price_time = dt_util.parse_datetime(price_entry["start"]) if not price_time: continue price_time_local = dt_util.as_local(price_time) # Check if price hour is between sunrise and sunset # We check the start of the hour if sunrise_local <= price_time_local < sunset_local: price_value = price_entry.get("price") if price_value is not None: sunrise_sunset_prices.append(price_value) # Calculate average if sunrise_sunset_prices: avg = round(sum(sunrise_sunset_prices) / len(sunrise_sunset_prices), 2) _LOGGER.debug(f"S/S average calculated from {len(sunrise_sunset_prices)} hours: {avg}") return avg else: _LOGGER.debug("No prices found between sunrise and sunset") return None except Exception as e: _LOGGER.error(f"Error calculating sunrise/sunset average: {e}") return None @property def extra_state_attributes(self) -> dict: """Include the price table attributes in the current price sensor.""" now = dt_util.as_local(dt_util.utcnow()) # Get translated attribute names from cache next_hour_key = _TRANSLATIONS_CACHE.get( "entity.sensor.next_hour", "Next hour" ) using_cached_key = _TRANSLATIONS_CACHE.get( "entity.sensor.using_cached_data", "Using cached data" ) all_prices_key = _TRANSLATIONS_CACHE.get( "entity.sensor.all_prices", "All prices" ) best_prices_key = _TRANSLATIONS_CACHE.get( "entity.sensor.best_prices", "Best prices" ) worst_prices_key = _TRANSLATIONS_CACHE.get( "entity.sensor.worst_prices", "Worst prices" ) best_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.best_count", "Best count" ) worst_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.worst_count", "Worst count" ) price_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.price_count", "Price count" ) last_updated_key = _TRANSLATIONS_CACHE.get( "entity.sensor.last_updated", "Last updated" ) avg_price_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price", "Average price today" ) avg_price_remaining_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price_remaining", "Average price remaining" ) avg_price_full_day_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price_full_day", "Average price full day" ) tomorrow_available_key = _TRANSLATIONS_CACHE.get( "entity.sensor.tomorrow_available", "Tomorrow prices available" ) mqtt_price_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.mqtt_price_count", "MQTT price count" ) # Add sunrise/sunset average key avg_price_sunrise_sunset_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price_sunrise_sunset", "Average price today s/s" ) if self.coordinator.data is None: return { f"{avg_price_key} /0": None, f"{avg_price_key} /24": None, avg_price_sunrise_sunset_key: None, next_hour_key: None, all_prices_key: [], "all_prices": [], "prices_today": [], best_prices_key: [], worst_prices_key: [], best_count_key: self.top_count, worst_count_key: self.worst_count, price_count_key: 0, using_cached_key: False, tomorrow_available_key: False, mqtt_price_count_key: 0 } next_hour_data = self._get_next_hour_price() today = self.coordinator.data.get("prices_today", []) all_prices_list = self.coordinator.data.get("prices", []) is_cached = self.coordinator.data.get("is_cached", False) # Calculate average price for remaining hours today (from current hour) avg_price_remaining = None remaining_hours_count = 0 avg_price_full_day = None if today: # Full day average (all 24 hours) total_price_full = sum(p.get("price", 0) for p in today if p.get("price") is not None) valid_prices_count_full = sum(1 for p in today if p.get("price") is not None) if valid_prices_count_full > 0: avg_price_full_day = round(total_price_full / valid_prices_count_full, 2) # Remaining hours average (from current hour onwards) current_hour = now.strftime("%Y-%m-%dT%H:") remaining_prices = [] for p in today: if p.get("price") is not None and p.get("start", "") >= current_hour: remaining_prices.append(p.get("price")) remaining_hours_count = len(remaining_prices) if remaining_hours_count > 0: avg_price_remaining = round(sum(remaining_prices) / remaining_hours_count, 2) # Calculate sunrise to sunset average avg_price_sunrise_sunset = self._get_sunrise_sunset_average(today) # Create keys with hour count in user's preferred format avg_price_remaining_with_hours = f"{avg_price_key} /{remaining_hours_count}" avg_price_full_day_with_hours = f"{avg_price_key} /24" # Check if tomorrow's prices are available (more robust check) tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d") tomorrow_prices = [] # Only check for tomorrow prices if we have a reasonable amount of data if len(all_prices_list) > 0: tomorrow_prices = [p for p in all_prices_list if p.get("start", "").startswith(tomorrow)] # Log what we found for debugging if tomorrow_prices: unique_values = set(p.get("price") for p in tomorrow_prices if p.get("price") is not None) consecutive = self._count_consecutive_same_values(tomorrow_prices) _LOGGER.debug( f"Tomorrow has {len(tomorrow_prices)} prices, " f"{len(unique_values)} unique values, " f"max {consecutive} consecutive same values" ) # Tomorrow is available only if: # 1. We have at least 20 hours of data for tomorrow # 2. The data doesn't look like placeholders tomorrow_available = ( len(tomorrow_prices) >= 20 and not self._is_likely_placeholder_data(tomorrow_prices) ) # Get cached sorted prices sorted_prices = self._get_cached_sorted_prices(today) if today else {"best": [], "worst": []} # Get actual MQTT price count mqtt_price_count = self._get_mqtt_price_count() return { avg_price_remaining_with_hours: avg_price_remaining, avg_price_full_day_with_hours: avg_price_full_day, avg_price_sunrise_sunset_key: avg_price_sunrise_sunset, next_hour_key: next_hour_data, all_prices_key: today, "all_prices": all_prices_list, "prices_today": today, best_prices_key: sorted_prices["best"], worst_prices_key: sorted_prices["worst"], best_count_key: self.top_count, worst_count_key: self.worst_count, price_count_key: len(today), last_updated_key: now.strftime("%Y-%m-%d %H:%M:%S"), using_cached_key: is_cached, tomorrow_available_key: tomorrow_available, "tomorrow_available": tomorrow_available, mqtt_price_count_key: mqtt_price_count, "mqtt_48h_mode": self.coordinator.mqtt_48h_mode } @property def available(self) -> bool: """Return if entity is available.""" return self.coordinator.last_update_success and self.coordinator.data is not None class PstrykAveragePriceSensor(RestoreEntity, SensorEntity): """Average price sensor using weighted averages from API data.""" # Note: state_class removed - MONETARY device_class doesn't support MEASUREMENT def __init__(self, cost_coordinator: PstrykCostDataUpdateCoordinator, price_coordinator: PstrykDataUpdateCoordinator, period: str, entry_id: str): """Initialize the average price sensor.""" self.cost_coordinator = cost_coordinator self.price_coordinator = price_coordinator self.price_type = price_coordinator.price_type self.period = period # 'daily', 'monthly' or 'yearly' self.entry_id = entry_id self._attr_device_class = SensorDeviceClass.MONETARY self._state = None self._energy_bought = 0.0 self._energy_sold = 0.0 self._total_cost = 0.0 self._total_revenue = 0.0 async def async_added_to_hass(self): """Restore state when entity is added.""" await super().async_added_to_hass() # Subscribe to cost coordinator updates self.async_on_remove( self.cost_coordinator.async_add_listener(self._handle_cost_update) ) # Restore previous state last_state = await self.async_get_last_state() if last_state and last_state.state not in (None, "unknown", "unavailable"): try: self._state = float(last_state.state) # Restore attributes if last_state.attributes: self._energy_bought = float(last_state.attributes.get("energy_bought", 0)) self._energy_sold = float(last_state.attributes.get("energy_sold", 0)) self._total_cost = float(last_state.attributes.get("total_cost", 0)) self._total_revenue = float(last_state.attributes.get("total_revenue", 0)) _LOGGER.debug("Restored weighted average for %s %s: %s", self.price_type, self.period, self._state) except (ValueError, TypeError): _LOGGER.warning("Could not restore state for %s", self.name) @property def name(self) -> str: """Return the name of the sensor.""" period_name = _TRANSLATIONS_CACHE.get( f"entity.sensor.period_{self.period}", self.period.title() ) return f"Pstryk {self.price_type.title()} {period_name} Average" @property def unique_id(self) -> str: """Return unique ID.""" return f"{DOMAIN}_{self.price_type}_{self.period}_average" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": get_integration_version(self.hass), } @property def native_value(self): """Return the state of the sensor.""" return self._state @property def native_unit_of_measurement(self) -> str: """Return the unit of measurement.""" return "PLN/kWh" @property def extra_state_attributes(self) -> dict: """Return extra state attributes.""" period_key = _TRANSLATIONS_CACHE.get( "entity.sensor.period", "Period" ) calculation_method_key = _TRANSLATIONS_CACHE.get( "entity.sensor.calculation_method", "Calculation method" ) energy_bought_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_bought", "Energy bought" ) energy_sold_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_sold", "Energy sold" ) total_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.total_cost", "Total cost" ) total_revenue_key = _TRANSLATIONS_CACHE.get( "entity.sensor.total_revenue", "Total revenue" ) attrs = { period_key: self.period, calculation_method_key: "Weighted average", } # Add energy and cost data if available if self.price_type == "buy" and self._energy_bought > 0: attrs[energy_bought_key] = round(self._energy_bought, 2) attrs[total_cost_key] = round(self._total_cost, 2) elif self.price_type == "sell" and self._energy_sold > 0: attrs[energy_sold_key] = round(self._energy_sold, 2) attrs[total_revenue_key] = round(self._total_revenue, 2) # Add last updated at the bottom last_updated_key = _TRANSLATIONS_CACHE.get( "entity.sensor.last_updated", "Last updated" ) now = dt_util.now() attrs[last_updated_key] = now.strftime("%Y-%m-%d %H:%M:%S") return attrs @callback def _handle_cost_update(self) -> None: """Handle updated data from the cost coordinator.""" if not self.cost_coordinator or not self.cost_coordinator.data: return period_data = self.cost_coordinator.data.get(self.period) if not period_data: return # Calculate weighted average based on actual costs and usage if self.price_type == "buy": # For buy price: total cost / total energy bought total_cost = abs(period_data.get("total_cost", 0)) # Already calculated in coordinator energy_bought = period_data.get("fae_usage", 0) # kWh from usage API if energy_bought > 0: self._state = round(total_cost / energy_bought, 4) self._energy_bought = energy_bought self._total_cost = total_cost else: self._state = None elif self.price_type == "sell": # For sell price: total revenue / total energy sold total_revenue = period_data.get("total_sold", 0) energy_sold = period_data.get("rae_usage", 0) # kWh from usage API if energy_sold > 0: self._state = round(total_revenue / energy_sold, 4) self._energy_sold = energy_sold self._total_revenue = total_revenue else: self._state = None self.async_write_ha_state() @property def available(self) -> bool: """Return if entity is available.""" return (self.cost_coordinator is not None and self.cost_coordinator.last_update_success and self.cost_coordinator.data is not None) class PstrykFinancialBalanceSensor(CoordinatorEntity, SensorEntity): """Financial balance sensor that gets data directly from API.""" _attr_state_class = SensorStateClass.TOTAL _attr_device_class = SensorDeviceClass.MONETARY def __init__(self, coordinator: PstrykCostDataUpdateCoordinator, period: str, entry_id: str): """Initialize the financial balance sensor.""" super().__init__(coordinator) self.period = period # 'daily', 'monthly', or 'yearly' self.entry_id = entry_id @property def name(self) -> str: """Return the name of the sensor.""" period_name = _TRANSLATIONS_CACHE.get( f"entity.sensor.period_{self.period}", self.period.title() ) balance_text = _TRANSLATIONS_CACHE.get( "entity.sensor.financial_balance", "Financial Balance" ) return f"Pstryk {period_name} {balance_text}" @property def unique_id(self) -> str: """Return unique ID.""" return f"{DOMAIN}_financial_balance_{self.period}" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": get_integration_version(self.hass), } @property def native_value(self): """Return the state of the sensor from API data.""" if not self.coordinator.data: return None period_data = self.coordinator.data.get(self.period) if not period_data or "total_balance" not in period_data: return None # Get the balance value from API balance = period_data.get("total_balance") # Return exact value from API without rounding return balance @property def native_unit_of_measurement(self) -> str: """Return the unit of measurement.""" return "PLN" @property def icon(self) -> str: """Return the icon based on balance.""" if self.native_value is None: return "mdi:currency-usd-off" elif self.native_value < 0: return "mdi:cash-minus" # We're paying elif self.native_value > 0: return "mdi:cash-plus" # We're earning else: return "mdi:cash" @property def extra_state_attributes(self) -> dict: """Return extra state attributes from API data.""" if not self.coordinator.data or not self.coordinator.data.get(self.period): return {} period_data = self.coordinator.data.get(self.period) frame = period_data.get("frame", {}) # Get translated attribute names buy_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.buy_cost", "Buy cost" ) sell_revenue_key = _TRANSLATIONS_CACHE.get( "entity.sensor.sell_revenue", "Sell revenue" ) period_key = _TRANSLATIONS_CACHE.get( "entity.sensor.period", "Period" ) net_balance_key = _TRANSLATIONS_CACHE.get( "entity.sensor.balance", "Balance" ) energy_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.buy_cost", "Buy cost" ) distribution_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.distribution_cost", "Distribution cost" ) excise_key = _TRANSLATIONS_CACHE.get( "entity.sensor.excise", "Excise" ) vat_key = _TRANSLATIONS_CACHE.get( "entity.sensor.vat", "VAT" ) service_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.service_cost", "Service cost" ) energy_bought_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_bought", "Energy bought" ) energy_sold_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_sold", "Energy sold" ) attrs = { period_key: self.period, net_balance_key: period_data.get("total_balance", 0), buy_cost_key: period_data.get("total_cost", 0), sell_revenue_key: period_data.get("total_sold", 0), energy_bought_key: period_data.get("fae_usage", 0), energy_sold_key: period_data.get("rae_usage", 0), } # Add detailed cost breakdown if available if frame: # Konwertuj daty UTC na lokalne start_utc = frame.get("start") end_utc = frame.get("end") start_local = dt_util.as_local(dt_util.parse_datetime(start_utc)) if start_utc else None end_local = dt_util.as_local(dt_util.parse_datetime(end_utc)) if end_utc else None attrs.update({ energy_cost_key: frame.get("fae_cost", 0), distribution_cost_key: frame.get("var_dist_cost_net", 0) + frame.get("fix_dist_cost_net", 0), excise_key: frame.get("excise", 0), vat_key: frame.get("vat", 0), service_cost_key: frame.get("service_cost_net", 0), "start": start_local.strftime("%Y-%m-%d") if start_local else None, "end": end_local.strftime("%Y-%m-%d") if end_local else None, }) # Add last updated at the bottom last_updated_key = _TRANSLATIONS_CACHE.get( "entity.sensor.last_updated", "Last updated" ) now = dt_util.now() attrs[last_updated_key] = now.strftime("%Y-%m-%d %H:%M:%S") return attrs @property def available(self) -> bool: """Return if entity is available.""" return self.coordinator.last_update_success and self.coordinator.data is not None class PstrykBatteryRecommendationSensor(CoordinatorEntity, SensorEntity, RestoreEntity): """Battery charging recommendation sensor based on dynamic prices.""" # State values STATE_CHARGE = "charge" STATE_DISCHARGE = "discharge" STATE_STANDBY = "standby" def __init__( self, coordinator: PstrykDataUpdateCoordinator, entry_id: str, soc_entity_id: str, capacity: int, charge_rate: int, discharge_rate: int, min_soc: int, charge_hours_count: int, discharge_multiplier: float, ): """Initialize the battery recommendation sensor.""" super().__init__(coordinator) self.entry_id = entry_id self._soc_entity_id = soc_entity_id self._capacity = capacity self._charge_rate = charge_rate self._discharge_rate = discharge_rate self._min_soc = min_soc self._charge_hours_count = charge_hours_count self._discharge_multiplier = discharge_multiplier self._attr_icon = "mdi:battery-clock" self._unsub_soc_listener = None self._stored_energy_price = 0.0 # Weighted average cost of energy in battery async def async_added_to_hass(self) -> None: """Run when entity is added to hass.""" await super().async_added_to_hass() # Restore state last_state = await self.async_get_last_state() if last_state: try: # Restore stored energy price if available if "stored_energy_price" in last_state.attributes: self._stored_energy_price = float(last_state.attributes["stored_energy_price"]) _LOGGER.debug("Restored stored energy price: %.4f PLN/kWh", self._stored_energy_price) except (ValueError, TypeError): _LOGGER.warning("Could not restore stored energy price") # Subscribe to SoC entity state changes for immediate updates if self._soc_entity_id: @callback def _async_soc_state_changed(event) -> None: """Handle SoC entity state changes.""" new_state = event.data.get("new_state") old_state = event.data.get("old_state") if new_state is None or new_state.state in ("unknown", "unavailable"): return # Update weighted average cost if SoC increased (Charging) if old_state and old_state.state not in ("unknown", "unavailable"): try: old_soc = float(old_state.state) new_soc = float(new_state.state) if new_soc > old_soc: self._update_weighted_cost(old_soc, new_soc) except ValueError: pass _LOGGER.debug( "SoC changed from %s to %s, triggering update", old_state.state if old_state else "None", new_state.state ) # Schedule an update self.async_write_ha_state() self._unsub_soc_listener = async_track_state_change_event( self.hass, [self._soc_entity_id], _async_soc_state_changed ) _LOGGER.info( "Battery recommendation sensor now listening to SoC changes from %s", self._soc_entity_id ) def _update_weighted_cost(self, old_soc: float, new_soc: float): """Calculate new weighted average cost when charging.""" # Get current price current_price = self.coordinator.data.get("current") if current_price is None: return # Cannot calculate without price # Calculate energy chunks # Capacity is in kWh. SoC is %. # Energy = (SoC / 100) * Capacity energy_old = (old_soc / 100.0) * self._capacity energy_added = ((new_soc - old_soc) / 100.0) * self._capacity # If battery was empty OR if stored price is uninitialized (0.0), take new price as baseline if energy_old <= 0.1 or self._stored_energy_price == 0.0: self._stored_energy_price = current_price else: # Weighted Average: # (Old_kWh * Old_Price) + (Added_kWh * Current_Price) # --------------------------------------------------- # (Old_kWh + Added_kWh) total_value = (energy_old * self._stored_energy_price) + (energy_added * current_price) total_energy = energy_old + energy_added if total_energy > 0: self._stored_energy_price = total_value / total_energy _LOGGER.debug( "Updated stored energy price: %.4f PLN/kWh (Added %.2f kWh @ %.2f)", self._stored_energy_price, energy_added, current_price ) async def async_will_remove_from_hass(self) -> None: """Run when entity is removed from hass.""" await super().async_will_remove_from_hass() # Unsubscribe from SoC entity state changes if self._unsub_soc_listener: self._unsub_soc_listener() self._unsub_soc_listener = None _LOGGER.debug("Unsubscribed from SoC state changes") @property def name(self) -> str: """Return the name of the sensor.""" return "Pstryk Battery Recommendation" @property def unique_id(self) -> str: """Return unique ID.""" return f"{DOMAIN}_battery_recommendation" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": get_integration_version(self.hass), } def _get_current_soc(self) -> float | None: """Get current SoC from configured entity.""" if not self._soc_entity_id: return None state = self.hass.states.get(self._soc_entity_id) if state is None or state.state in ("unknown", "unavailable"): return None try: return float(state.state) except (ValueError, TypeError): _LOGGER.warning("Cannot parse SoC value from %s: %s", self._soc_entity_id, state.state) return None def _get_prices_with_hours(self) -> list[dict]: """Get prices with hour information from coordinator.""" if not self.coordinator.data: return [] prices = self.coordinator.data.get("prices", []) if not prices: return [] result = [] for price_entry in prices: try: start_str = price_entry.get("start", "") price = price_entry.get("price") if not start_str or price is None: continue dt = dt_util.parse_datetime(start_str) if dt: dt_local = dt_util.as_local(dt) result.append({ "hour": dt_local.hour, "price": price, "datetime": dt_local, "date": dt_local.date() }) except Exception as e: _LOGGER.debug("Error parsing price entry: %s", e) return result def _calculate_recommendation(self) -> tuple[str, dict]: """Calculate battery recommendation based on prices and SoC.""" now = dt_util.now() current_hour = now.hour current_soc = self._get_current_soc() prices = self._get_prices_with_hours() # Default attributes attrs = { "current_price": None, "current_soc": current_soc, "stored_energy_price": round(self._stored_energy_price, 4), "avg_charge_price": None, "discharge_threshold": None, "charge_hours": [], "discharge_hours": [], "standby_hours": [], "soc_forecast": [], "emergency_charge": False, "pre_peak_charge": False, "critical_hour": None, "reason": "No data available", "next_state_change": None, "next_state": None, "prices_horizon": "unknown", "config": { "charge_hours_count": self._charge_hours_count, "discharge_multiplier": self._discharge_multiplier, "min_soc": self._min_soc, "charge_rate": self._charge_rate, "discharge_rate": self._discharge_rate, "capacity": self._capacity, "soc_entity": self._soc_entity_id, }, "last_updated": now.strftime("%Y-%m-%d %H:%M:%S"), } if not prices or len(prices) < 12: return self.STATE_STANDBY, attrs # Get today's prices only for hour classification today = now.date() today_prices = [p for p in prices if p["date"] == today] if len(today_prices) < 12: attrs["reason"] = f"Insufficient price data for today ({len(today_prices)} hours)" return self.STATE_STANDBY, attrs # ============================================================ # ENHANCED ALGORITHM: Multi-phase arbitrage detection # ============================================================ # Instead of just picking N cheapest hours globally, we: # 1. Find primary charge hours (night - cheapest globally) # 2. Identify peaks (morning 7-10, evening 15-20) # 3. Identify mid-day valley (11-14) # 4. If mid-day valley is profitable vs evening peak, charge there too # ============================================================ # Round-trip efficiency factor (20% losses = multiply by 1.25 to break even) EFFICIENCY_FACTOR = 1.25 # Time block definitions NIGHT_HOURS = set(range(0, 6)) # 00:00 - 05:59 MORNING_PEAK = set(range(6, 11)) # 06:00 - 10:59 MIDDAY_VALLEY = set(range(11, 15)) # 11:00 - 14:59 EVENING_PEAK = set(range(15, 21)) # 15:00 - 20:59 LATE_EVENING = set(range(21, 24)) # 21:00 - 23:59 # Helper to get prices for a set of hours def get_prices_for_hours(hours_set): return [p for p in today_prices if p["hour"] in hours_set] def avg_price(price_list): if not price_list: return 0 return sum(p["price"] for p in price_list) / len(price_list) # Get prices for each block night_prices = get_prices_for_hours(NIGHT_HOURS) morning_peak_prices = get_prices_for_hours(MORNING_PEAK) midday_prices = get_prices_for_hours(MIDDAY_VALLEY) evening_peak_prices = get_prices_for_hours(EVENING_PEAK) late_evening_prices = get_prices_for_hours(LATE_EVENING) # Calculate average prices per block avg_night = avg_price(night_prices) avg_morning_peak = avg_price(morning_peak_prices) avg_midday = avg_price(midday_prices) avg_evening_peak = avg_price(evening_peak_prices) avg_late_evening = avg_price(late_evening_prices) # Sort by price to find cheapest hours globally sorted_by_price = sorted(today_prices, key=lambda x: x["price"]) # PRIMARY CHARGE: N cheapest hours (typically night) primary_charge_data = sorted_by_price[:self._charge_hours_count] charge_hours = set(p["hour"] for p in primary_charge_data) avg_charge_price = avg_price(primary_charge_data) # DISCHARGE THRESHOLD based on primary charge price discharge_threshold = avg_charge_price * self._discharge_multiplier # INTRA-DAY ARBITRAGE CHECK # If mid-day valley price * efficiency < evening peak price, it's profitable # to charge during mid-day and discharge in evening midday_arbitrage_profitable = False midday_charge_hours = set() if midday_prices and evening_peak_prices: # Find the 2-3 cheapest hours in mid-day valley sorted_midday = sorted(midday_prices, key=lambda x: x["price"]) cheapest_midday = sorted_midday[:3] # Top 3 cheapest in valley avg_cheapest_midday = avg_price(cheapest_midday) # Check if charging mid-day is profitable for evening discharge # breakeven = midday_price * 1.25 (accounting for 20% round-trip losses) if avg_cheapest_midday * EFFICIENCY_FACTOR < avg_evening_peak: midday_arbitrage_profitable = True # Add mid-day valley hours where price * efficiency < evening peak avg for p in midday_prices: if p["price"] * EFFICIENCY_FACTOR < avg_evening_peak: midday_charge_hours.add(p["hour"]) charge_hours.add(p["hour"]) # DETERMINE DISCHARGE HOURS # Hours where price >= discharge_threshold AND not in charge_hours discharge_hours = set( p["hour"] for p in today_prices if p["price"] >= discharge_threshold and p["hour"] not in charge_hours ) # STANDBY HOURS = everything else all_hours = set(range(24)) standby_hours = all_hours - charge_hours - discharge_hours # Store arbitrage info in attributes attrs["midday_arbitrage"] = { "profitable": midday_arbitrage_profitable, "midday_charge_hours": sorted(midday_charge_hours), "avg_midday_price": round(avg_midday, 4) if midday_prices else None, "avg_evening_peak": round(avg_evening_peak, 4) if evening_peak_prices else None, "breakeven_price": round(avg_midday * EFFICIENCY_FACTOR, 4) if midday_prices else None, } # Get current price current_price_data = next( (p for p in today_prices if p["hour"] == current_hour), None ) current_price = current_price_data["price"] if current_price_data else None # Update attributes attrs.update({ "current_price": current_price, "avg_charge_price": round(avg_charge_price, 4), "discharge_threshold": round(discharge_threshold, 4), "charge_hours": sorted(charge_hours), "discharge_hours": sorted(discharge_hours), "standby_hours": sorted(standby_hours), "prices_horizon": "48h" if len(prices) > 24 else "24h", }) # SoC-based logic (if SoC available) emergency_charge = False pre_peak_charge = False critical_hour = None if current_soc is not None: # Simulate SoC forward to detect critical situations soc_forecast = self._simulate_soc_forward( current_hour, current_soc, charge_hours, discharge_hours ) attrs["soc_forecast"] = soc_forecast[:12] # Next 12 hours # Check for critical SoC drop # We run this check regardless of current SoC to ensure safety. for entry in soc_forecast: if entry["soc"] < self._min_soc and entry["action"] != "charge": critical_hour = entry["hour"] # Check if there's a charge hour before critical hours_until_critical = (critical_hour - current_hour) % 24 has_charge_before = any( (current_hour + i) % 24 in charge_hours for i in range(hours_until_critical) ) # If no scheduled charge saves us, trigger emergency if not has_charge_before: emergency_charge = True break attrs["critical_hour"] = critical_hour attrs["emergency_charge"] = emergency_charge # --- FORWARD COVERAGE STRATEGY (Pre-Peak Charge) --- # Look ahead 24h for "High Price" blocks where we WANT to discharge # and ensure we have enough SoC to cover them. # 1. Identify Target Discharge Hours in next 24h # We look for prices > discharge_threshold future_discharge_hours = [] # Filter prices for next 24h window # We need to find the index of current hour in the prices list # Since prices are sorted by time, we can just find the current hour entry # Find index of current hour in the main 'prices' list start_index = -1 for idx, p in enumerate(prices): if p["date"] == today and p["hour"] == current_hour: start_index = idx break if start_index != -1: # Look at next 18 hours (typical planning horizon) # CRITICAL FIX: Start looking from NEXT hour (start_index + 1). # We want to find the *upcoming* peak. If we include the current hour, # and the current hour is marginally high (1.23), it becomes the "peak start", # making time_until_peak = 0, which disables Pre-Peak charging. lookahead_window = prices[start_index + 1 : start_index + 19] for p in lookahead_window: if p["price"] >= discharge_threshold: future_discharge_hours.append(p) # 2. Calculate Required Capacity # Required = (Hours * Discharge_Rate) + Min_SoC # We group them into "blocks". If there is a block of 5 hours coming up, # we need 5 * 10% + 20% = 70% SoC at the start of that block. if future_discharge_hours: # Find the start of the first major block first_discharge_hour = future_discharge_hours[0] # Count hours in that block (contiguous or close) # For simplicity, we just count total high hours in next 12h high_hours_count = len([p for p in future_discharge_hours if (p["datetime"] - first_discharge_hour["datetime"]).total_seconds() < 12*3600]) required_soc = (high_hours_count * self._discharge_rate) + self._min_soc # 3. Gap Analysis # Hysteresis Logic: # If we are already charging due to coverage, we want to KEEP charging # until we have a buffer (e.g., +5%) to prevent flip-flopping. threshold_soc = required_soc + 2.0 # CRITICAL FIX: Only plan coverage charging if current price is LOW. # If we are already in the high-price zone (current_price >= threshold), # we should just discharge what we have and then stop. We should NOT panic-charge # expensive energy just to discharge it again. # REFINEMENT: "Low" is relative. 1.23 is high compared to night (0.80), # but LOW compared to the upcoming peak (1.60). # We should charge if current price is notably cheaper than the peak we are protecting against. # Find min price in the upcoming discharge block min_future_peak_price = min(p["price"] for p in future_discharge_hours) if future_discharge_hours else 0 # Allow charging if: # 1. Price is generally cheap (< threshold) # OR # 2. Price is cheaper than the future peak (arbitrage opportunity to avoid running dry) # We apply a safety margin (e.g., current must be < 95% of future peak min) is_cheap_enough = False if current_price is not None: if current_price < discharge_threshold: is_cheap_enough = True elif current_price < (min_future_peak_price * 0.95): is_cheap_enough = True if current_soc < threshold_soc and is_cheap_enough: # We have a deficit AND it is cheap enough to charge! # Check if we are currently in the "Pre-Peak" window (before the high price starts) time_until_peak = (first_discharge_hour["datetime"] - now).total_seconds() / 3600 if 0 < time_until_peak < 6: # If peak is approaching (within 6 hours) # We need to charge NOW if this is a relatively cheap hour compared to the peak # or if it's the only chance left. # Find all hours between now and peak available_hours = prices[start_index : start_index + int(time_until_peak) + 1] # Sort them by price available_hours_sorted = sorted(available_hours, key=lambda x: x["price"]) # How many hours do we need to charge to fill the gap? # Gap = 30%. Charge rate = 30%/h. -> Need 1 hour. soc_deficit = threshold_soc - current_soc hours_needed = max(1, math.ceil(soc_deficit / self._charge_rate)) # Pick the cheapest N hours cheapest_pre_peak = available_hours_sorted[:hours_needed] # Is NOW one of them? if any(p["hour"] == current_hour and p["date"] == today for p in cheapest_pre_peak): pre_peak_charge = True attrs["pre_peak_charge"] = True attrs["reason"] = f"Forward Coverage: Charging for upcoming {high_hours_count}h peak (Target {threshold_soc:.0f}%)" # Add to charge set for visualization consistency charge_hours.add(current_hour) # Final decision # First check: if battery is full (100%), don't charge - switch to standby if current_soc is not None and current_soc >= 99.5: # Hysteresis for top-off if current_hour in discharge_hours: state = self.STATE_DISCHARGE reason = f"Battery full, discharging (price {current_price:.2f} >= threshold {discharge_threshold:.2f})" else: state = self.STATE_STANDBY reason = "Battery full (100%), waiting for discharge opportunity" elif emergency_charge: state = self.STATE_CHARGE reason = f"EMERGENCY: SoC will drop below {self._min_soc}% at {critical_hour}:00" elif pre_peak_charge: state = self.STATE_CHARGE # Reason already set above elif current_hour in charge_hours: state = self.STATE_CHARGE # Check if this is a midday arbitrage hour or primary cheap hour if current_hour in midday_charge_hours: reason = f"Mid-day arbitrage charge (price {current_price:.2f} profitable vs evening peak {avg_evening_peak:.2f})" elif not pre_peak_charge: # Avoid overwriting coverage reason reason = f"Cheapest hour (price {current_price:.2f} PLN/kWh in top {self._charge_hours_count} lowest)" elif current_hour in discharge_hours: if current_soc is not None and current_soc <= self._min_soc: state = self.STATE_STANDBY reason = f"Would discharge but SoC ({current_soc:.0f}%) at minimum" else: state = self.STATE_DISCHARGE reason = f"Price {current_price:.2f} >= threshold {discharge_threshold:.2f}" else: state = self.STATE_STANDBY reason = "Price between thresholds" attrs["reason"] = reason # Find next state change next_change = self._find_next_state_change( current_hour, state, charge_hours, discharge_hours ) if next_change: attrs["next_state_change"] = f"{next_change['hour']:02d}:00" attrs["next_state"] = next_change["state"] return state, attrs def _simulate_soc_forward( self, from_hour: int, start_soc: float, charge_hours: set, discharge_hours: set ) -> list[dict]: """Simulate SoC for next 24 hours.""" forecast = [] soc = start_soc for i in range(24): hour = (from_hour + i) % 24 if hour in charge_hours: # Charging: use configured charge rate, cap at 100 soc = min(100, soc + self._charge_rate) action = "charge" elif hour in discharge_hours: # Discharging: use configured discharge rate, floor at 0 soc = max(0, soc - self._discharge_rate) action = "discharge" else: # Standby: minimal drain (base consumption ~2%/h) soc = max(0, soc - 2) action = "standby" forecast.append({ "hour": hour, "soc": round(soc, 1), "action": action }) return forecast def _find_next_state_change( self, current_hour: int, current_state: str, charge_hours: set, discharge_hours: set ) -> dict | None: """Find when the next state change will occur.""" for i in range(1, 25): hour = (current_hour + i) % 24 if hour in charge_hours: next_state = self.STATE_CHARGE elif hour in discharge_hours: next_state = self.STATE_DISCHARGE else: next_state = self.STATE_STANDBY if next_state != current_state: return {"hour": hour, "state": next_state} return None @property def native_value(self) -> str: """Return the current recommendation state.""" state, _ = self._calculate_recommendation() return state @property def extra_state_attributes(self) -> dict: """Return extra state attributes.""" _, attrs = self._calculate_recommendation() return attrs @property def available(self) -> bool: """Return if entity is available.""" return self.coordinator.last_update_success and self.coordinator.data is not None