Update sensor.py

This commit is contained in:
balgerion
2025-05-02 12:45:49 +02:00
committed by GitHub
parent e9a958823d
commit c7e5eb3afc

View File

@ -1,6 +1,7 @@
"""Sensor platform for Pstryk Energy integration.""" """Sensor platform for Pstryk Energy integration."""
import logging import logging
import asyncio import asyncio
from datetime import datetime, timedelta
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.components.sensor import SensorEntity, SensorStateClass from homeassistant.components.sensor import SensorEntity, SensorStateClass
@ -8,6 +9,7 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from .update_coordinator import PstrykDataUpdateCoordinator from .update_coordinator import PstrykDataUpdateCoordinator
from .const import DOMAIN from .const import DOMAIN
from homeassistant.helpers.translation import async_get_translations
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -16,7 +18,7 @@ async def async_setup_entry(
entry: ConfigEntry, entry: ConfigEntry,
async_add_entities, async_add_entities,
) -> None: ) -> None:
"""Set up the four Pstryk sensors via the coordinator.""" """Set up the two Pstryk sensors via the coordinator."""
api_key = hass.data[DOMAIN][entry.entry_id]["api_key"] api_key = hass.data[DOMAIN][entry.entry_id]["api_key"]
buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5)) buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5))
sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5)) sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5))
@ -81,21 +83,41 @@ async def async_setup_entry(
coordinator.schedule_midnight_update() coordinator.schedule_midnight_update()
hass.data[DOMAIN][key] = coordinator hass.data[DOMAIN][key] = coordinator
entities.append(PstrykCurrentPriceSensor(coordinator, price_type)) # Create only one sensor per price type that combines both current price and table data
top = buy_top if price_type == "buy" else sell_top top = buy_top if price_type == "buy" else sell_top
entities.append(PstrykPriceTableSensor(coordinator, price_type, top)) entities.append(PstrykPriceSensor(coordinator, price_type, top))
async_add_entities(entities, True) async_add_entities(entities, True)
class PstrykCurrentPriceSensor(CoordinatorEntity, SensorEntity): class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
"""Current price sensor.""" """Combined price sensor with table data attributes."""
_attr_state_class = SensorStateClass.MEASUREMENT _attr_state_class = SensorStateClass.MEASUREMENT
def __init__(self, coordinator: PstrykDataUpdateCoordinator, price_type: str): def __init__(self, coordinator: PstrykDataUpdateCoordinator, price_type: str, top_count: int):
super().__init__(coordinator) super().__init__(coordinator)
self.price_type = price_type self.price_type = price_type
self.top_count = top_count
self._attr_device_class = "monetary" self._attr_device_class = "monetary"
self._translations = {}
async def async_added_to_hass(self):
"""When entity is added to Home Assistant."""
await super().async_added_to_hass()
# Load translations
self._translations = await self._load_translations()
async def _load_translations(self):
"""Load translations for the current language."""
translations = {}
try:
translations = await async_get_translations(
self.hass, self.hass.config.language, DOMAIN, ["entity", "debug"]
)
except Exception as ex:
_LOGGER.warning("Failed to load translations: %s", ex)
return translations
@property @property
def name(self) -> str: def name(self) -> str:
@ -103,7 +125,7 @@ class PstrykCurrentPriceSensor(CoordinatorEntity, SensorEntity):
@property @property
def unique_id(self) -> str: def unique_id(self) -> str:
return f"{DOMAIN}_{self.price_type}_current" return f"{DOMAIN}_{self.price_type}_price"
@property @property
def native_value(self): def native_value(self):
@ -115,58 +137,128 @@ class PstrykCurrentPriceSensor(CoordinatorEntity, SensorEntity):
def native_unit_of_measurement(self) -> str: def native_unit_of_measurement(self) -> str:
return "PLN/kWh" return "PLN/kWh"
@property def _get_next_hour_price(self) -> dict:
def available(self) -> bool: """Get price data for the next hour."""
"""Return if entity is available.""" if not self.coordinator.data:
return self.coordinator.last_update_success and self.coordinator.data is not None return None
now = dt_util.as_local(dt_util.utcnow())
next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
class PstrykPriceTableSensor(CoordinatorEntity, SensorEntity): # Use translations for debug messages
"""Today's price table sensor.""" debug_msg = self._translations.get(
_attr_state_class = SensorStateClass.MEASUREMENT "debug.looking_for_next_hour",
"Looking for price for next hour: {next_hour}"
).format(next_hour=next_hour.isoformat())
_LOGGER.debug(debug_msg)
def __init__(self, coordinator: PstrykDataUpdateCoordinator, price_type: str, top_count: int): # Check if we're looking for the next day's hour (midnight)
super().__init__(coordinator) is_looking_for_next_day = next_hour.day != now.day
self.price_type = price_type
self.top_count = top_count
@property # First check in prices_today
def name(self) -> str: if not is_looking_for_next_day or self.coordinator.data.get("prices_today"):
return f"Pstryk {self.price_type.title()} Price Table" for price_data in self.coordinator.data.get("prices_today", []):
if "start" not in price_data:
continue
@property try:
def unique_id(self) -> str: price_datetime = dt_util.parse_datetime(price_data["start"])
return f"{DOMAIN}_{self.price_type}_table" if not price_datetime:
continue
@property price_datetime = dt_util.as_local(price_datetime)
def native_value(self) -> int:
# number of price slots today if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day:
if self.coordinator.data is None: return price_data.get("price")
return 0 except Exception as e:
return len(self.coordinator.data.get("prices_today", [])) error_msg = self._translations.get(
"debug.error_processing_date",
"Error processing date: {error}"
).format(error=str(e))
_LOGGER.error(error_msg)
# If looking for midnight hour (next day), also check prices (full 48h list)
if is_looking_for_next_day and self.coordinator.data.get("prices"):
next_day_msg = self._translations.get(
"debug.looking_for_next_day",
"Looking for next day price in full price list (48h)"
)
_LOGGER.debug(next_day_msg)
for price_data in self.coordinator.data.get("prices", []):
if "start" not in price_data:
continue
try:
price_datetime = dt_util.parse_datetime(price_data["start"])
if not price_datetime:
continue
price_datetime = dt_util.as_local(price_datetime)
# Check if this is 00:00 of the next day
if price_datetime.hour == 0 and price_datetime.day == next_hour.day:
return price_data.get("price")
except Exception as e:
full_list_error_msg = self._translations.get(
"debug.error_processing_full_list",
"Error processing date for full list: {error}"
).format(error=str(e))
_LOGGER.error(full_list_error_msg)
# If no price found for next hour
if is_looking_for_next_day:
midnight_msg = self._translations.get(
"debug.no_price_midnight",
"No price found for next day midnight. Data probably not loaded yet."
)
_LOGGER.info(midnight_msg)
else:
no_price_msg = self._translations.get(
"debug.no_price_next_hour",
"No price found for next hour: {next_hour}"
).format(next_hour=next_hour.isoformat())
_LOGGER.warning(no_price_msg)
return None
@property @property
def extra_state_attributes(self) -> dict: def extra_state_attributes(self) -> dict:
"""Include the price table attributes in the current price sensor."""
now = dt_util.as_local(dt_util.utcnow())
# Get translated attribute name
next_hour_key = self._translations.get(
"entity.sensor.next_hour",
"Next hour"
)
if self.coordinator.data is None: if self.coordinator.data is None:
return { return {
next_hour_key: None,
"all_prices": [], "all_prices": [],
"best_prices": [], "best_prices": [],
"top_count": self.top_count, "top_count": self.top_count,
"last_updated": dt_util.as_local(dt_util.utcnow()).isoformat(), "last_updated": now.isoformat(),
"price_count": 0,
"data_available": False "data_available": False
} }
next_hour_data = self._get_next_hour_price()
today = self.coordinator.data.get("prices_today", []) today = self.coordinator.data.get("prices_today", [])
sorted_prices = sorted( sorted_prices = sorted(
today, today,
key=lambda x: x["price"], key=lambda x: x["price"],
reverse=(self.price_type == "sell"), reverse=(self.price_type == "sell"),
) )
return { return {
next_hour_key: next_hour_data,
"all_prices": today, "all_prices": today,
"best_prices": sorted_prices[: self.top_count], "best_prices": sorted_prices[: self.top_count],
"top_count": self.top_count, "top_count": self.top_count,
"last_updated": dt_util.as_local(dt_util.utcnow()).isoformat(), "price_count": len(today),
"last_updated": now.isoformat(),
"data_available": True "data_available": True
} }