"""Sensor platform for Pstryk Energy integration.""" import logging import asyncio from datetime import datetime, timedelta from decimal import Decimal, ROUND_HALF_UP from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.components.sensor import SensorEntity, SensorStateClass, SensorDeviceClass from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers import entity_registry as er from homeassistant.util import dt as dt_util from .update_coordinator import PstrykDataUpdateCoordinator from .energy_cost_coordinator import PstrykCostDataUpdateCoordinator from .const import ( DOMAIN, CONF_MQTT_48H_MODE, CONF_RETRY_ATTEMPTS, CONF_RETRY_DELAY, DEFAULT_RETRY_ATTEMPTS, DEFAULT_RETRY_DELAY ) from homeassistant.helpers.translation import async_get_translations _LOGGER = logging.getLogger(__name__) # Store translations globally to avoid reloading for each sensor _TRANSLATIONS_CACHE = {} async def async_setup_entry( hass: HomeAssistant, entry: ConfigEntry, async_add_entities, ) -> None: """Set up the Pstryk sensors via the coordinator.""" api_key = hass.data[DOMAIN][entry.entry_id]["api_key"] buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5)) sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5)) buy_worst = entry.options.get("buy_worst", entry.data.get("buy_worst", 5)) sell_worst = entry.options.get("sell_worst", entry.data.get("sell_worst", 5)) mqtt_48h_mode = entry.options.get(CONF_MQTT_48H_MODE, False) retry_attempts = entry.options.get(CONF_RETRY_ATTEMPTS, DEFAULT_RETRY_ATTEMPTS) retry_delay = entry.options.get(CONF_RETRY_DELAY, DEFAULT_RETRY_DELAY) _LOGGER.debug("Setting up Pstryk sensors with buy_top=%d, sell_top=%d, buy_worst=%d, sell_worst=%d, mqtt_48h_mode=%s, retry_attempts=%d, retry_delay=%ds", buy_top, sell_top, buy_worst, sell_worst, mqtt_48h_mode, retry_attempts, retry_delay) # Load translations once for all sensors global _TRANSLATIONS_CACHE if not _TRANSLATIONS_CACHE: try: _TRANSLATIONS_CACHE = await async_get_translations( hass, hass.config.language, DOMAIN, ["entity", "debug"] ) except Exception as ex: _LOGGER.warning("Failed to load translations: %s", ex) _TRANSLATIONS_CACHE = {} # Cleanup old coordinators if they exist for price_type in ("buy", "sell"): key = f"{entry.entry_id}_{price_type}" coordinator = hass.data[DOMAIN].get(key) if coordinator: _LOGGER.debug("Cleaning up existing %s coordinator", price_type) # Cancel scheduled updates if hasattr(coordinator, '_unsub_hourly') and coordinator._unsub_hourly: coordinator._unsub_hourly() if hasattr(coordinator, '_unsub_midnight') and coordinator._unsub_midnight: coordinator._unsub_midnight() if hasattr(coordinator, '_unsub_afternoon') and coordinator._unsub_afternoon: coordinator._unsub_afternoon() # Remove from hass data hass.data[DOMAIN].pop(key, None) # Cleanup old cost coordinator if exists cost_key = f"{entry.entry_id}_cost" cost_coordinator = hass.data[DOMAIN].get(cost_key) if cost_coordinator: _LOGGER.debug("Cleaning up existing cost coordinator") if hasattr(cost_coordinator, '_unsub_hourly') and cost_coordinator._unsub_hourly: cost_coordinator._unsub_hourly() if hasattr(cost_coordinator, '_unsub_midnight') and cost_coordinator._unsub_midnight: cost_coordinator._unsub_midnight() hass.data[DOMAIN].pop(cost_key, None) entities = [] coordinators = [] # Create price coordinators first for price_type in ("buy", "sell"): key = f"{entry.entry_id}_{price_type}" coordinator = PstrykDataUpdateCoordinator( hass, api_key, price_type, mqtt_48h_mode, retry_attempts, retry_delay ) coordinators.append((coordinator, price_type, key)) # Create cost coordinator cost_coordinator = PstrykCostDataUpdateCoordinator(hass, api_key, retry_attempts, retry_delay) coordinators.append((cost_coordinator, "cost", cost_key)) # Initialize coordinators in parallel to save time initial_refresh_tasks = [] for coordinator, coordinator_type, _ in coordinators: # Check if we're in the setup process or reloading try: # Newer Home Assistant versions from homeassistant.config_entries import ConfigEntryState is_setup = entry.state == ConfigEntryState.SETUP_IN_PROGRESS except ImportError: # Older Home Assistant versions - try another approach is_setup = not hass.data[DOMAIN].get(f"{entry.entry_id}_initialized", False) if is_setup: initial_refresh_tasks.append(coordinator.async_config_entry_first_refresh()) else: initial_refresh_tasks.append(coordinator.async_refresh()) refresh_results = await asyncio.gather(*initial_refresh_tasks, return_exceptions=True) # Mark as initialized after first setup hass.data[DOMAIN][f"{entry.entry_id}_initialized"] = True # Process coordinators and set up sensors for i, (coordinator, coordinator_type, key) in enumerate(coordinators): # Check if initial refresh succeeded if isinstance(refresh_results[i], Exception): _LOGGER.error("Failed to initialize %s coordinator: %s", coordinator_type, str(refresh_results[i])) # Still add coordinator and set up sensors even if initial load failed # Store coordinator hass.data[DOMAIN][key] = coordinator # Schedule updates for price coordinators if coordinator_type in ("buy", "sell"): coordinator.schedule_hourly_update() coordinator.schedule_midnight_update() # Schedule afternoon update if 48h mode is enabled if mqtt_48h_mode: coordinator.schedule_afternoon_update() # Schedule updates for cost coordinator elif coordinator_type == "cost": coordinator.schedule_hourly_update() coordinator.schedule_midnight_update() # Create price sensors if coordinator_type in ("buy", "sell"): top = buy_top if coordinator_type == "buy" else sell_top worst = buy_worst if coordinator_type == "buy" else sell_worst entities.append(PstrykPriceSensor(coordinator, coordinator_type, top, worst, entry.entry_id)) # Create average price sensors (with both coordinators) entities.append(PstrykAveragePriceSensor( cost_coordinator, coordinator, # Pass the actual price coordinator, not string! "monthly", entry.entry_id )) entities.append(PstrykAveragePriceSensor( cost_coordinator, coordinator, # Pass the actual price coordinator, not string! "yearly", entry.entry_id )) # Create financial balance sensors using cost coordinator entities.append(PstrykFinancialBalanceSensor( cost_coordinator, "daily", entry.entry_id )) entities.append(PstrykFinancialBalanceSensor( cost_coordinator, "monthly", entry.entry_id )) entities.append(PstrykFinancialBalanceSensor( cost_coordinator, "yearly", entry.entry_id )) async_add_entities(entities, True) class PstrykPriceSensor(CoordinatorEntity, SensorEntity): """Combined price sensor with table data attributes.""" _attr_state_class = SensorStateClass.MEASUREMENT def __init__(self, coordinator: PstrykDataUpdateCoordinator, price_type: str, top_count: int, worst_count: int, entry_id: str): super().__init__(coordinator) self.price_type = price_type self.top_count = top_count self.worst_count = worst_count self.entry_id = entry_id self._attr_device_class = SensorDeviceClass.MONETARY self._cached_sorted_prices = None self._last_data_hash = None async def async_added_to_hass(self): """When entity is added to Home Assistant.""" await super().async_added_to_hass() @property def name(self) -> str: return f"Pstryk Current {self.price_type.title()} Price" @property def unique_id(self) -> str: return f"{DOMAIN}_{self.price_type}_price" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": "1.7.1", } def _get_current_price(self): """Get current price based on current time.""" if not self.coordinator.data or not self.coordinator.data.get("prices"): return None now_utc = dt_util.utcnow() for price_entry in self.coordinator.data.get("prices", []): try: if "start" not in price_entry: continue price_datetime = dt_util.parse_datetime(price_entry["start"]) if not price_datetime: continue # Konwersja do UTC dla porównania price_datetime_utc = dt_util.as_utc(price_datetime) price_end_utc = price_datetime_utc + timedelta(hours=1) if price_datetime_utc <= now_utc < price_end_utc: return price_entry.get("price") except Exception as e: _LOGGER.error("Error determining current price: %s", str(e)) return None @property def native_value(self): if self.coordinator.data is None: return None # Próbujemy znaleźć aktualną cenę na podstawie czasu current_price = self._get_current_price() # Jeśli nie znaleźliśmy, używamy wartości z koordynatora if current_price is None: current_price = self.coordinator.data.get("current") return current_price @property def native_unit_of_measurement(self) -> str: return "PLN/kWh" def _get_next_hour_price(self) -> dict: """Get price data for the next hour.""" if not self.coordinator.data: return None now = dt_util.as_local(dt_util.utcnow()) next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) # Use translations for debug messages debug_msg = _TRANSLATIONS_CACHE.get( "debug.looking_for_next_hour", "Looking for price for next hour: {next_hour}" ).format(next_hour=next_hour.strftime("%Y-%m-%d %H:%M:%S")) _LOGGER.debug(debug_msg) # Check if we're looking for the next day's hour (midnight) is_looking_for_next_day = next_hour.day != now.day # First check in prices_today price_found = None if self.coordinator.data.get("prices_today"): for price_data in self.coordinator.data.get("prices_today", []): if "start" not in price_data: continue try: price_datetime = dt_util.parse_datetime(price_data["start"]) if not price_datetime: continue price_datetime = dt_util.as_local(price_datetime) if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day: price_found = price_data.get("price") _LOGGER.debug("Found price for %s in today's list: %s", next_hour.strftime("%Y-%m-%d %H:%M:%S"), price_found) return price_found except Exception as e: error_msg = _TRANSLATIONS_CACHE.get( "debug.error_processing_date", "Error processing date: {error}" ).format(error=str(e)) _LOGGER.error(error_msg) # Always check the full list as a fallback, regardless of day if self.coordinator.data.get("prices"): _LOGGER.debug("Looking for price in full 48h list as fallback") for price_data in self.coordinator.data.get("prices", []): if "start" not in price_data: continue try: price_datetime = dt_util.parse_datetime(price_data["start"]) if not price_datetime: continue price_datetime = dt_util.as_local(price_datetime) # Check if this matches the hour and day we're looking for if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day: price_found = price_data.get("price") _LOGGER.debug("Found price for %s in full 48h list: %s", next_hour.strftime("%Y-%m-%d %H:%M:%S"), price_found) return price_found except Exception as e: full_list_error_msg = _TRANSLATIONS_CACHE.get( "debug.error_processing_full_list", "Error processing date for full list: {error}" ).format(error=str(e)) _LOGGER.error(full_list_error_msg) # If no price found for next hour if is_looking_for_next_day: midnight_msg = _TRANSLATIONS_CACHE.get( "debug.no_price_midnight", "No price found for next day midnight. Data probably not loaded yet." ) _LOGGER.info(midnight_msg) else: no_price_msg = _TRANSLATIONS_CACHE.get( "debug.no_price_next_hour", "No price found for next hour: {next_hour}" ).format(next_hour=next_hour.strftime("%Y-%m-%d %H:%M:%S")) _LOGGER.warning(no_price_msg) return None def _get_cached_sorted_prices(self, today): """Get cached sorted prices or compute if data changed.""" # Create a simple hash of the data to detect changes data_hash = hash(tuple((p["start"], p["price"]) for p in today)) if self._last_data_hash != data_hash or self._cached_sorted_prices is None: _LOGGER.debug("Price data changed, recalculating sorted prices") # Sortowanie dla najlepszych cen sorted_best = sorted( today, key=lambda x: x["price"], reverse=(self.price_type == "sell"), ) # Sortowanie dla najgorszych cen (odwrotna kolejność sortowania) sorted_worst = sorted( today, key=lambda x: x["price"], reverse=(self.price_type != "sell"), ) self._cached_sorted_prices = { "best": sorted_best[: self.top_count], "worst": sorted_worst[: self.worst_count] } self._last_data_hash = data_hash return self._cached_sorted_prices def _is_likely_placeholder_data(self, prices_for_day): """Check if prices for a day are likely placeholders. Returns True if: - There are no prices - ALL prices have exactly the same value (suggesting API returned default values) - There are too many consecutive hours with the same value (e.g., 10+ hours) """ if not prices_for_day: return True # Get all price values price_values = [p.get("price") for p in prices_for_day if p.get("price") is not None] if not price_values: return True # If we have less than 20 prices for a day, it's incomplete data if len(price_values) < 20: _LOGGER.debug(f"Only {len(price_values)} prices for the day, likely incomplete data") return True # Check if ALL values are identical unique_values = set(price_values) if len(unique_values) == 1: _LOGGER.debug(f"All {len(price_values)} prices have the same value ({price_values[0]}), likely placeholders") return True # Additional check: if more than 90% of values are the same, it's suspicious most_common_value = max(set(price_values), key=price_values.count) count_most_common = price_values.count(most_common_value) if count_most_common / len(price_values) > 0.9: _LOGGER.debug(f"{count_most_common}/{len(price_values)} prices have value {most_common_value}, likely placeholders") return True return False def _count_consecutive_same_values(self, prices): """Count maximum consecutive hours with the same price.""" if not prices: return 0 # Sort by time to ensure consecutive checking sorted_prices = sorted(prices, key=lambda x: x.get("start", "")) max_consecutive = 1 current_consecutive = 1 last_value = None for price in sorted_prices: value = price.get("price") if value is not None: if value == last_value: current_consecutive += 1 max_consecutive = max(max_consecutive, current_consecutive) else: current_consecutive = 1 last_value = value return max_consecutive def _get_mqtt_price_count(self): """Get the actual count of prices that would be published to MQTT.""" if not self.coordinator.data: return 0 if not self.coordinator.mqtt_48h_mode: # If not in 48h mode, we only publish today's prices prices_today = self.coordinator.data.get("prices_today", []) return len(prices_today) else: # In 48h mode, we need to count valid prices all_prices = self.coordinator.data.get("prices", []) # Just count today's prices as they're always valid now = dt_util.as_local(dt_util.utcnow()) today_str = now.strftime("%Y-%m-%d") today_prices = [p for p in all_prices if p.get("start", "").startswith(today_str)] # For tomorrow, check if data looks valid tomorrow_str = (now + timedelta(days=1)).strftime("%Y-%m-%d") tomorrow_prices = [p for p in all_prices if p.get("start", "").startswith(tomorrow_str)] # Count today's prices always valid_count = len(today_prices) # Add tomorrow's prices only if they look like real data if tomorrow_prices and not self._is_likely_placeholder_data(tomorrow_prices): valid_count += len(tomorrow_prices) return valid_count def _get_sunrise_sunset_average(self, today_prices): """Calculate average price between sunrise and sunset.""" if not today_prices: return None # Get sun entity sun_entity = self.hass.states.get("sun.sun") if not sun_entity: _LOGGER.debug("Sun entity not available") return None # Get sunrise and sunset times from attributes sunrise_attr = sun_entity.attributes.get("next_rising") sunset_attr = sun_entity.attributes.get("next_setting") if not sunrise_attr or not sunset_attr: _LOGGER.debug("Sunrise/sunset times not available") return None # Parse sunrise and sunset times try: sunrise = dt_util.parse_datetime(sunrise_attr) sunset = dt_util.parse_datetime(sunset_attr) if not sunrise or not sunset: return None # Convert to local time sunrise_local = dt_util.as_local(sunrise) sunset_local = dt_util.as_local(sunset) # If sunrise is tomorrow, use today's sunrise from calculation now = dt_util.now() if sunrise_local.date() > now.date(): # Calculate approximate sunrise for today (subtract 24h) sunrise_local = sunrise_local - timedelta(days=1) # If sunset is tomorrow, we're after sunset today if sunset_local.date() > now.date(): # Use previous sunset sunset_local = sunset_local - timedelta(days=1) _LOGGER.debug(f"Calculating s/s average between {sunrise_local.strftime('%H:%M')} and {sunset_local.strftime('%H:%M')}") # Get prices between sunrise and sunset sunrise_sunset_prices = [] for price_entry in today_prices: if "start" not in price_entry or "price" not in price_entry: continue price_time = dt_util.parse_datetime(price_entry["start"]) if not price_time: continue price_time_local = dt_util.as_local(price_time) # Check if price hour is between sunrise and sunset # We check the start of the hour if sunrise_local <= price_time_local < sunset_local: price_value = price_entry.get("price") if price_value is not None: sunrise_sunset_prices.append(price_value) # Calculate average if sunrise_sunset_prices: avg = round(sum(sunrise_sunset_prices) / len(sunrise_sunset_prices), 2) _LOGGER.debug(f"S/S average calculated from {len(sunrise_sunset_prices)} hours: {avg}") return avg else: _LOGGER.debug("No prices found between sunrise and sunset") return None except Exception as e: _LOGGER.error(f"Error calculating sunrise/sunset average: {e}") return None @property def extra_state_attributes(self) -> dict: """Include the price table attributes in the current price sensor.""" now = dt_util.as_local(dt_util.utcnow()) # Get translated attribute names from cache next_hour_key = _TRANSLATIONS_CACHE.get( "entity.sensor.next_hour", "Next hour" ) using_cached_key = _TRANSLATIONS_CACHE.get( "entity.sensor.using_cached_data", "Using cached data" ) all_prices_key = _TRANSLATIONS_CACHE.get( "entity.sensor.all_prices", "All prices" ) best_prices_key = _TRANSLATIONS_CACHE.get( "entity.sensor.best_prices", "Best prices" ) worst_prices_key = _TRANSLATIONS_CACHE.get( "entity.sensor.worst_prices", "Worst prices" ) best_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.best_count", "Best count" ) worst_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.worst_count", "Worst count" ) price_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.price_count", "Price count" ) last_updated_key = _TRANSLATIONS_CACHE.get( "entity.sensor.last_updated", "Last updated" ) avg_price_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price", "Average price today" ) avg_price_remaining_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price_remaining", "Average price remaining" ) avg_price_full_day_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price_full_day", "Average price full day" ) tomorrow_available_key = _TRANSLATIONS_CACHE.get( "entity.sensor.tomorrow_available", "Tomorrow prices available" ) mqtt_price_count_key = _TRANSLATIONS_CACHE.get( "entity.sensor.mqtt_price_count", "MQTT price count" ) # Add sunrise/sunset average key avg_price_sunrise_sunset_key = _TRANSLATIONS_CACHE.get( "entity.sensor.avg_price_sunrise_sunset", "Average price today s/s" ) if self.coordinator.data is None: return { f"{avg_price_key} /0": None, f"{avg_price_key} /24": None, avg_price_sunrise_sunset_key: None, next_hour_key: None, all_prices_key: [], best_prices_key: [], worst_prices_key: [], best_count_key: self.top_count, worst_count_key: self.worst_count, price_count_key: 0, using_cached_key: False, tomorrow_available_key: False, mqtt_price_count_key: 0 } next_hour_data = self._get_next_hour_price() today = self.coordinator.data.get("prices_today", []) is_cached = self.coordinator.data.get("is_cached", False) # Calculate average price for remaining hours today (from current hour) avg_price_remaining = None remaining_hours_count = 0 avg_price_full_day = None if today: # Full day average (all 24 hours) total_price_full = sum(p.get("price", 0) for p in today if p.get("price") is not None) valid_prices_count_full = sum(1 for p in today if p.get("price") is not None) if valid_prices_count_full > 0: avg_price_full_day = round(total_price_full / valid_prices_count_full, 2) # Remaining hours average (from current hour onwards) current_hour = now.strftime("%Y-%m-%dT%H:") remaining_prices = [] for p in today: if p.get("price") is not None and p.get("start", "") >= current_hour: remaining_prices.append(p.get("price")) remaining_hours_count = len(remaining_prices) if remaining_hours_count > 0: avg_price_remaining = round(sum(remaining_prices) / remaining_hours_count, 2) # Calculate sunrise to sunset average avg_price_sunrise_sunset = self._get_sunrise_sunset_average(today) # Create keys with hour count in user's preferred format avg_price_remaining_with_hours = f"{avg_price_key} /{remaining_hours_count}" avg_price_full_day_with_hours = f"{avg_price_key} /24" # Check if tomorrow's prices are available (more robust check) all_prices = self.coordinator.data.get("prices", []) tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d") tomorrow_prices = [] # Only check for tomorrow prices if we have a reasonable amount of data if len(all_prices) > 0: tomorrow_prices = [p for p in all_prices if p.get("start", "").startswith(tomorrow)] # Log what we found for debugging if tomorrow_prices: unique_values = set(p.get("price") for p in tomorrow_prices if p.get("price") is not None) consecutive = self._count_consecutive_same_values(tomorrow_prices) _LOGGER.debug( f"Tomorrow has {len(tomorrow_prices)} prices, " f"{len(unique_values)} unique values, " f"max {consecutive} consecutive same values" ) # Tomorrow is available only if: # 1. We have at least 20 hours of data for tomorrow # 2. The data doesn't look like placeholders tomorrow_available = ( len(tomorrow_prices) >= 20 and not self._is_likely_placeholder_data(tomorrow_prices) ) # Get cached sorted prices sorted_prices = self._get_cached_sorted_prices(today) if today else {"best": [], "worst": []} # Get actual MQTT price count mqtt_price_count = self._get_mqtt_price_count() return { avg_price_remaining_with_hours: avg_price_remaining, avg_price_full_day_with_hours: avg_price_full_day, avg_price_sunrise_sunset_key: avg_price_sunrise_sunset, next_hour_key: next_hour_data, all_prices_key: today, best_prices_key: sorted_prices["best"], worst_prices_key: sorted_prices["worst"], best_count_key: self.top_count, worst_count_key: self.worst_count, price_count_key: len(today), last_updated_key: now.strftime("%Y-%m-%d %H:%M:%S"), using_cached_key: is_cached, tomorrow_available_key: tomorrow_available, mqtt_price_count_key: mqtt_price_count, "mqtt_48h_mode": self.coordinator.mqtt_48h_mode } @property def available(self) -> bool: """Return if entity is available.""" return self.coordinator.last_update_success and self.coordinator.data is not None class PstrykAveragePriceSensor(RestoreEntity, SensorEntity): """Average price sensor using weighted averages from API data.""" _attr_state_class = SensorStateClass.MEASUREMENT def __init__(self, cost_coordinator: PstrykCostDataUpdateCoordinator, price_coordinator: PstrykDataUpdateCoordinator, period: str, entry_id: str): """Initialize the average price sensor.""" self.cost_coordinator = cost_coordinator self.price_coordinator = price_coordinator self.price_type = price_coordinator.price_type self.period = period # 'monthly' or 'yearly' self.entry_id = entry_id self._attr_device_class = SensorDeviceClass.MONETARY self._state = None self._energy_bought = 0.0 self._energy_sold = 0.0 self._total_cost = 0.0 self._total_revenue = 0.0 async def async_added_to_hass(self): """Restore state when entity is added.""" await super().async_added_to_hass() # Subscribe to cost coordinator updates self.async_on_remove( self.cost_coordinator.async_add_listener(self._handle_cost_update) ) # Restore previous state last_state = await self.async_get_last_state() if last_state and last_state.state not in (None, "unknown", "unavailable"): try: self._state = float(last_state.state) # Restore attributes if last_state.attributes: self._energy_bought = float(last_state.attributes.get("energy_bought", 0)) self._energy_sold = float(last_state.attributes.get("energy_sold", 0)) self._total_cost = float(last_state.attributes.get("total_cost", 0)) self._total_revenue = float(last_state.attributes.get("total_revenue", 0)) _LOGGER.debug("Restored weighted average for %s %s: %s", self.price_type, self.period, self._state) except (ValueError, TypeError): _LOGGER.warning("Could not restore state for %s", self.name) @property def name(self) -> str: """Return the name of the sensor.""" period_name = _TRANSLATIONS_CACHE.get( f"entity.sensor.period_{self.period}", self.period.title() ) return f"Pstryk {self.price_type.title()} {period_name} Average" @property def unique_id(self) -> str: """Return unique ID.""" return f"{DOMAIN}_{self.price_type}_{self.period}_average" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": "1.7.1", } @property def native_value(self): """Return the state of the sensor.""" return self._state @property def native_unit_of_measurement(self) -> str: """Return the unit of measurement.""" return "PLN/kWh" @property def extra_state_attributes(self) -> dict: """Return extra state attributes.""" period_key = _TRANSLATIONS_CACHE.get( "entity.sensor.period", "Period" ) calculation_method_key = _TRANSLATIONS_CACHE.get( "entity.sensor.calculation_method", "Calculation method" ) energy_bought_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_bought", "Energy bought" ) energy_sold_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_sold", "Energy sold" ) total_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.total_cost", "Total cost" ) total_revenue_key = _TRANSLATIONS_CACHE.get( "entity.sensor.total_revenue", "Total revenue" ) attrs = { period_key: self.period, calculation_method_key: "Weighted average", } # Add energy and cost data if available if self.price_type == "buy" and self._energy_bought > 0: attrs[energy_bought_key] = round(self._energy_bought, 2) attrs[total_cost_key] = round(self._total_cost, 2) elif self.price_type == "sell" and self._energy_sold > 0: attrs[energy_sold_key] = round(self._energy_sold, 2) attrs[total_revenue_key] = round(self._total_revenue, 2) # Add last updated at the bottom last_updated_key = _TRANSLATIONS_CACHE.get( "entity.sensor.last_updated", "Last updated" ) now = dt_util.now() attrs[last_updated_key] = now.strftime("%Y-%m-%d %H:%M:%S") return attrs @callback def _handle_cost_update(self) -> None: """Handle updated data from the cost coordinator.""" if not self.cost_coordinator or not self.cost_coordinator.data: return period_data = self.cost_coordinator.data.get(self.period) if not period_data: return # Calculate weighted average based on actual costs and usage if self.price_type == "buy": # For buy price: total cost / total energy bought total_cost = abs(period_data.get("total_cost", 0)) # Already calculated in coordinator energy_bought = period_data.get("fae_usage", 0) # kWh from usage API if energy_bought > 0: self._state = round(total_cost / energy_bought, 4) self._energy_bought = energy_bought self._total_cost = total_cost else: self._state = None elif self.price_type == "sell": # For sell price: total revenue / total energy sold total_revenue = period_data.get("total_sold", 0) energy_sold = period_data.get("rae_usage", 0) # kWh from usage API if energy_sold > 0: self._state = round(total_revenue / energy_sold, 4) self._energy_sold = energy_sold self._total_revenue = total_revenue else: self._state = None self.async_write_ha_state() @property def available(self) -> bool: """Return if entity is available.""" return (self.cost_coordinator is not None and self.cost_coordinator.last_update_success and self.cost_coordinator.data is not None) class PstrykFinancialBalanceSensor(CoordinatorEntity, SensorEntity): """Financial balance sensor that gets data directly from API.""" _attr_state_class = SensorStateClass.TOTAL _attr_device_class = SensorDeviceClass.MONETARY def __init__(self, coordinator: PstrykCostDataUpdateCoordinator, period: str, entry_id: str): """Initialize the financial balance sensor.""" super().__init__(coordinator) self.period = period # 'daily', 'monthly', or 'yearly' self.entry_id = entry_id @property def name(self) -> str: """Return the name of the sensor.""" period_name = _TRANSLATIONS_CACHE.get( f"entity.sensor.period_{self.period}", self.period.title() ) balance_text = _TRANSLATIONS_CACHE.get( "entity.sensor.financial_balance", "Financial Balance" ) return f"Pstryk {period_name} {balance_text}" @property def unique_id(self) -> str: """Return unique ID.""" return f"{DOMAIN}_financial_balance_{self.period}" @property def device_info(self): """Return device information.""" return { "identifiers": {(DOMAIN, "pstryk_energy")}, "name": "Pstryk Energy", "manufacturer": "Pstryk", "model": "Energy Price Monitor", "sw_version": "1.7.1", } @property def native_value(self): """Return the state of the sensor from API data.""" if not self.coordinator.data: return None period_data = self.coordinator.data.get(self.period) if not period_data or "total_balance" not in period_data: return None # Get the balance value from API balance = period_data.get("total_balance") # Return exact value from API without rounding return balance @property def native_unit_of_measurement(self) -> str: """Return the unit of measurement.""" return "PLN" @property def icon(self) -> str: """Return the icon based on balance.""" if self.native_value is None: return "mdi:currency-usd-off" elif self.native_value < 0: return "mdi:cash-minus" # We're paying elif self.native_value > 0: return "mdi:cash-plus" # We're earning else: return "mdi:cash" @property def extra_state_attributes(self) -> dict: """Return extra state attributes from API data.""" if not self.coordinator.data or not self.coordinator.data.get(self.period): return {} period_data = self.coordinator.data.get(self.period) frame = period_data.get("frame", {}) # Get translated attribute names buy_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.buy_cost", "Buy cost" ) sell_revenue_key = _TRANSLATIONS_CACHE.get( "entity.sensor.sell_revenue", "Sell revenue" ) period_key = _TRANSLATIONS_CACHE.get( "entity.sensor.period", "Period" ) net_balance_key = _TRANSLATIONS_CACHE.get( "entity.sensor.balance", "Balance" ) energy_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.buy_cost", "Buy cost" ) distribution_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.distribution_cost", "Distribution cost" ) excise_key = _TRANSLATIONS_CACHE.get( "entity.sensor.excise", "Excise" ) vat_key = _TRANSLATIONS_CACHE.get( "entity.sensor.vat", "VAT" ) service_cost_key = _TRANSLATIONS_CACHE.get( "entity.sensor.service_cost", "Service cost" ) energy_bought_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_bought", "Energy bought" ) energy_sold_key = _TRANSLATIONS_CACHE.get( "entity.sensor.energy_sold", "Energy sold" ) attrs = { period_key: self.period, net_balance_key: period_data.get("total_balance", 0), buy_cost_key: period_data.get("total_cost", 0), sell_revenue_key: period_data.get("total_sold", 0), energy_bought_key: period_data.get("fae_usage", 0), energy_sold_key: period_data.get("rae_usage", 0), } # Add detailed cost breakdown if available if frame: # Konwertuj daty UTC na lokalne start_utc = frame.get("start") end_utc = frame.get("end") start_local = dt_util.as_local(dt_util.parse_datetime(start_utc)) if start_utc else None end_local = dt_util.as_local(dt_util.parse_datetime(end_utc)) if end_utc else None attrs.update({ energy_cost_key: frame.get("fae_cost", 0), distribution_cost_key: frame.get("var_dist_cost_net", 0) + frame.get("fix_dist_cost_net", 0), excise_key: frame.get("excise", 0), vat_key: frame.get("vat", 0), service_cost_key: frame.get("service_cost_net", 0), "start": start_local.strftime("%Y-%m-%d") if start_local else None, "end": end_local.strftime("%Y-%m-%d") if end_local else None, }) # Add last updated at the bottom last_updated_key = _TRANSLATIONS_CACHE.get( "entity.sensor.last_updated", "Last updated" ) now = dt_util.now() attrs[last_updated_key] = now.strftime("%Y-%m-%d %H:%M:%S") return attrs @property def available(self) -> bool: """Return if entity is available.""" return self.coordinator.last_update_success and self.coordinator.data is not None