Update sensor.py

This commit is contained in:
balgerion
2025-06-28 11:34:53 +02:00
committed by GitHub
parent a625941e8e
commit 1e7fee32bf

View File

@ -2,12 +2,16 @@
import logging import logging
import asyncio import asyncio
from datetime import datetime, timedelta from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant, callback
from homeassistant.components.sensor import SensorEntity, SensorStateClass from homeassistant.components.sensor import SensorEntity, SensorStateClass, SensorDeviceClass
from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from .update_coordinator import PstrykDataUpdateCoordinator from .update_coordinator import PstrykDataUpdateCoordinator
from .energy_cost_coordinator import PstrykCostDataUpdateCoordinator
from .const import ( from .const import (
DOMAIN, DOMAIN,
CONF_MQTT_48H_MODE, CONF_MQTT_48H_MODE,
@ -28,7 +32,7 @@ async def async_setup_entry(
entry: ConfigEntry, entry: ConfigEntry,
async_add_entities, async_add_entities,
) -> None: ) -> None:
"""Set up the two Pstryk sensors via the coordinator.""" """Set up the Pstryk sensors via the coordinator."""
api_key = hass.data[DOMAIN][entry.entry_id]["api_key"] api_key = hass.data[DOMAIN][entry.entry_id]["api_key"]
buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5)) buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5))
sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5)) sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5))
@ -67,11 +71,22 @@ async def async_setup_entry(
coordinator._unsub_afternoon() coordinator._unsub_afternoon()
# Remove from hass data # Remove from hass data
hass.data[DOMAIN].pop(key, None) hass.data[DOMAIN].pop(key, None)
# Cleanup old cost coordinator if exists
cost_key = f"{entry.entry_id}_cost"
cost_coordinator = hass.data[DOMAIN].get(cost_key)
if cost_coordinator:
_LOGGER.debug("Cleaning up existing cost coordinator")
if hasattr(cost_coordinator, '_unsub_hourly') and cost_coordinator._unsub_hourly:
cost_coordinator._unsub_hourly()
if hasattr(cost_coordinator, '_unsub_midnight') and cost_coordinator._unsub_midnight:
cost_coordinator._unsub_midnight()
hass.data[DOMAIN].pop(cost_key, None)
entities = [] entities = []
coordinators = [] coordinators = []
# Create coordinators first # Create price coordinators first
for price_type in ("buy", "sell"): for price_type in ("buy", "sell"):
key = f"{entry.entry_id}_{price_type}" key = f"{entry.entry_id}_{price_type}"
coordinator = PstrykDataUpdateCoordinator( coordinator = PstrykDataUpdateCoordinator(
@ -84,9 +99,13 @@ async def async_setup_entry(
) )
coordinators.append((coordinator, price_type, key)) coordinators.append((coordinator, price_type, key))
# Create cost coordinator
cost_coordinator = PstrykCostDataUpdateCoordinator(hass, api_key)
coordinators.append((cost_coordinator, "cost", cost_key))
# Initialize coordinators in parallel to save time # Initialize coordinators in parallel to save time
initial_refresh_tasks = [] initial_refresh_tasks = []
for coordinator, _, _ in coordinators: for coordinator, coordinator_type, _ in coordinators:
# Check if we're in the setup process or reloading # Check if we're in the setup process or reloading
try: try:
# Newer Home Assistant versions # Newer Home Assistant versions
@ -107,27 +126,60 @@ async def async_setup_entry(
hass.data[DOMAIN][f"{entry.entry_id}_initialized"] = True hass.data[DOMAIN][f"{entry.entry_id}_initialized"] = True
# Process coordinators and set up sensors # Process coordinators and set up sensors
for i, (coordinator, price_type, key) in enumerate(coordinators): for i, (coordinator, coordinator_type, key) in enumerate(coordinators):
# Check if initial refresh succeeded # Check if initial refresh succeeded
if isinstance(refresh_results[i], Exception): if isinstance(refresh_results[i], Exception):
_LOGGER.error("Failed to initialize %s coordinator: %s", _LOGGER.error("Failed to initialize %s coordinator: %s",
price_type, str(refresh_results[i])) coordinator_type, str(refresh_results[i]))
# Still add coordinator and set up sensors even if initial load failed # Still add coordinator and set up sensors even if initial load failed
# Schedule updates # Store coordinator
coordinator.schedule_hourly_update()
coordinator.schedule_midnight_update()
# Schedule afternoon update if 48h mode is enabled
if mqtt_48h_mode:
coordinator.schedule_afternoon_update()
hass.data[DOMAIN][key] = coordinator hass.data[DOMAIN][key] = coordinator
# Schedule updates for price coordinators
if coordinator_type in ("buy", "sell"):
coordinator.schedule_hourly_update()
coordinator.schedule_midnight_update()
# Schedule afternoon update if 48h mode is enabled
if mqtt_48h_mode:
coordinator.schedule_afternoon_update()
# Schedule updates for cost coordinator
elif coordinator_type == "cost":
coordinator.schedule_hourly_update()
coordinator.schedule_midnight_update()
# Create only one sensor per price type that combines both current price and table data # Create price sensors
top = buy_top if price_type == "buy" else sell_top if coordinator_type in ("buy", "sell"):
worst = buy_worst if price_type == "buy" else sell_worst top = buy_top if coordinator_type == "buy" else sell_top
entities.append(PstrykPriceSensor(coordinator, price_type, top, worst, entry.entry_id)) worst = buy_worst if coordinator_type == "buy" else sell_worst
entities.append(PstrykPriceSensor(coordinator, coordinator_type, top, worst, entry.entry_id))
# Create average price sensors (with both coordinators)
entities.append(PstrykAveragePriceSensor(
cost_coordinator,
coordinator, # Pass the actual price coordinator, not string!
"monthly",
entry.entry_id
))
entities.append(PstrykAveragePriceSensor(
cost_coordinator,
coordinator, # Pass the actual price coordinator, not string!
"yearly",
entry.entry_id
))
# Create financial balance sensors using cost coordinator
entities.append(PstrykFinancialBalanceSensor(
cost_coordinator, "daily", entry.entry_id
))
entities.append(PstrykFinancialBalanceSensor(
cost_coordinator, "monthly", entry.entry_id
))
entities.append(PstrykFinancialBalanceSensor(
cost_coordinator, "yearly", entry.entry_id
))
async_add_entities(entities, True) async_add_entities(entities, True)
@ -142,7 +194,7 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
self.top_count = top_count self.top_count = top_count
self.worst_count = worst_count self.worst_count = worst_count
self.entry_id = entry_id self.entry_id = entry_id
self._attr_device_class = "monetary" self._attr_device_class = SensorDeviceClass.MONETARY
self._cached_sorted_prices = None self._cached_sorted_prices = None
self._last_data_hash = None self._last_data_hash = None
@ -166,7 +218,7 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
"name": "Pstryk Energy", "name": "Pstryk Energy",
"manufacturer": "Pstryk", "manufacturer": "Pstryk",
"model": "Energy Price Monitor", "model": "Energy Price Monitor",
"sw_version": "1.6.2", "sw_version": "1.7.1",
} }
def _get_current_price(self): def _get_current_price(self):
@ -420,6 +472,83 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
valid_count += len(tomorrow_prices) valid_count += len(tomorrow_prices)
return valid_count return valid_count
def _get_sunrise_sunset_average(self, today_prices):
"""Calculate average price between sunrise and sunset."""
if not today_prices:
return None
# Get sun entity
sun_entity = self.hass.states.get("sun.sun")
if not sun_entity:
_LOGGER.debug("Sun entity not available")
return None
# Get sunrise and sunset times from attributes
sunrise_attr = sun_entity.attributes.get("next_rising")
sunset_attr = sun_entity.attributes.get("next_setting")
if not sunrise_attr or not sunset_attr:
_LOGGER.debug("Sunrise/sunset times not available")
return None
# Parse sunrise and sunset times
try:
sunrise = dt_util.parse_datetime(sunrise_attr)
sunset = dt_util.parse_datetime(sunset_attr)
if not sunrise or not sunset:
return None
# Convert to local time
sunrise_local = dt_util.as_local(sunrise)
sunset_local = dt_util.as_local(sunset)
# If sunrise is tomorrow, use today's sunrise from calculation
now = dt_util.now()
if sunrise_local.date() > now.date():
# Calculate approximate sunrise for today (subtract 24h)
sunrise_local = sunrise_local - timedelta(days=1)
# If sunset is tomorrow, we're after sunset today
if sunset_local.date() > now.date():
# Use previous sunset
sunset_local = sunset_local - timedelta(days=1)
_LOGGER.debug(f"Calculating s/s average between {sunrise_local.strftime('%H:%M')} and {sunset_local.strftime('%H:%M')}")
# Get prices between sunrise and sunset
sunrise_sunset_prices = []
for price_entry in today_prices:
if "start" not in price_entry or "price" not in price_entry:
continue
price_time = dt_util.parse_datetime(price_entry["start"])
if not price_time:
continue
price_time_local = dt_util.as_local(price_time)
# Check if price hour is between sunrise and sunset
# We check the start of the hour
if sunrise_local <= price_time_local < sunset_local:
price_value = price_entry.get("price")
if price_value is not None:
sunrise_sunset_prices.append(price_value)
# Calculate average
if sunrise_sunset_prices:
avg = round(sum(sunrise_sunset_prices) / len(sunrise_sunset_prices), 2)
_LOGGER.debug(f"S/S average calculated from {len(sunrise_sunset_prices)} hours: {avg}")
return avg
else:
_LOGGER.debug("No prices found between sunrise and sunset")
return None
except Exception as e:
_LOGGER.error(f"Error calculating sunrise/sunset average: {e}")
return None
@property @property
def extra_state_attributes(self) -> dict: def extra_state_attributes(self) -> dict:
@ -497,10 +626,17 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
"MQTT price count" "MQTT price count"
) )
# Add sunrise/sunset average key
avg_price_sunrise_sunset_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.avg_price_sunrise_sunset",
"Average price today s/s"
)
if self.coordinator.data is None: if self.coordinator.data is None:
return { return {
f"{avg_price_key} /0": None, f"{avg_price_key} /0": None,
f"{avg_price_key} /24": None, f"{avg_price_key} /24": None,
avg_price_sunrise_sunset_key: None,
next_hour_key: None, next_hour_key: None,
all_prices_key: [], all_prices_key: [],
best_prices_key: [], best_prices_key: [],
@ -541,6 +677,9 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
if remaining_hours_count > 0: if remaining_hours_count > 0:
avg_price_remaining = round(sum(remaining_prices) / remaining_hours_count, 2) avg_price_remaining = round(sum(remaining_prices) / remaining_hours_count, 2)
# Calculate sunrise to sunset average
avg_price_sunrise_sunset = self._get_sunrise_sunset_average(today)
# Create keys with hour count in user's preferred format # Create keys with hour count in user's preferred format
avg_price_remaining_with_hours = f"{avg_price_key} /{remaining_hours_count}" avg_price_remaining_with_hours = f"{avg_price_key} /{remaining_hours_count}"
avg_price_full_day_with_hours = f"{avg_price_key} /24" avg_price_full_day_with_hours = f"{avg_price_key} /24"
@ -581,6 +720,7 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
return { return {
avg_price_remaining_with_hours: avg_price_remaining, avg_price_remaining_with_hours: avg_price_remaining,
avg_price_full_day_with_hours: avg_price_full_day, avg_price_full_day_with_hours: avg_price_full_day,
avg_price_sunrise_sunset_key: avg_price_sunrise_sunset,
next_hour_key: next_hour_data, next_hour_key: next_hour_data,
all_prices_key: today, all_prices_key: today,
best_prices_key: sorted_prices["best"], best_prices_key: sorted_prices["best"],
@ -599,3 +739,352 @@ class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
def available(self) -> bool: def available(self) -> bool:
"""Return if entity is available.""" """Return if entity is available."""
return self.coordinator.last_update_success and self.coordinator.data is not None return self.coordinator.last_update_success and self.coordinator.data is not None
class PstrykAveragePriceSensor(RestoreEntity, SensorEntity):
"""Average price sensor using weighted averages from API data."""
_attr_state_class = SensorStateClass.MEASUREMENT
def __init__(self, cost_coordinator: PstrykCostDataUpdateCoordinator,
price_coordinator: PstrykDataUpdateCoordinator,
period: str, entry_id: str):
"""Initialize the average price sensor."""
self.cost_coordinator = cost_coordinator
self.price_coordinator = price_coordinator
self.price_type = price_coordinator.price_type
self.period = period # 'monthly' or 'yearly'
self.entry_id = entry_id
self._attr_device_class = SensorDeviceClass.MONETARY
self._state = None
self._energy_bought = 0.0
self._energy_sold = 0.0
self._total_cost = 0.0
self._total_revenue = 0.0
async def async_added_to_hass(self):
"""Restore state when entity is added."""
await super().async_added_to_hass()
# Subscribe to cost coordinator updates
self.async_on_remove(
self.cost_coordinator.async_add_listener(self._handle_cost_update)
)
# Restore previous state
last_state = await self.async_get_last_state()
if last_state and last_state.state not in (None, "unknown", "unavailable"):
try:
self._state = float(last_state.state)
# Restore attributes
if last_state.attributes:
self._energy_bought = float(last_state.attributes.get("energy_bought", 0))
self._energy_sold = float(last_state.attributes.get("energy_sold", 0))
self._total_cost = float(last_state.attributes.get("total_cost", 0))
self._total_revenue = float(last_state.attributes.get("total_revenue", 0))
_LOGGER.debug("Restored weighted average for %s %s: %s",
self.price_type, self.period, self._state)
except (ValueError, TypeError):
_LOGGER.warning("Could not restore state for %s", self.name)
@property
def name(self) -> str:
"""Return the name of the sensor."""
period_name = _TRANSLATIONS_CACHE.get(
f"entity.sensor.period_{self.period}",
self.period.title()
)
return f"Pstryk {self.price_type.title()} {period_name} Average"
@property
def unique_id(self) -> str:
"""Return unique ID."""
return f"{DOMAIN}_{self.price_type}_{self.period}_average"
@property
def device_info(self):
"""Return device information."""
return {
"identifiers": {(DOMAIN, "pstryk_energy")},
"name": "Pstryk Energy",
"manufacturer": "Pstryk",
"model": "Energy Price Monitor",
"sw_version": "1.7.1",
}
@property
def native_value(self):
"""Return the state of the sensor."""
return self._state
@property
def native_unit_of_measurement(self) -> str:
"""Return the unit of measurement."""
return "PLN/kWh"
@property
def extra_state_attributes(self) -> dict:
"""Return extra state attributes."""
period_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.period",
"Period"
)
calculation_method_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.calculation_method",
"Calculation method"
)
energy_bought_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.energy_bought",
"Energy bought"
)
energy_sold_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.energy_sold",
"Energy sold"
)
total_cost_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.total_cost",
"Total cost"
)
total_revenue_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.total_revenue",
"Total revenue"
)
attrs = {
period_key: self.period,
calculation_method_key: "Weighted average",
}
# Add energy and cost data if available
if self.price_type == "buy" and self._energy_bought > 0:
attrs[energy_bought_key] = round(self._energy_bought, 2)
attrs[total_cost_key] = round(self._total_cost, 2)
elif self.price_type == "sell" and self._energy_sold > 0:
attrs[energy_sold_key] = round(self._energy_sold, 2)
attrs[total_revenue_key] = round(self._total_revenue, 2)
# Add last updated at the bottom
last_updated_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.last_updated",
"Last updated"
)
now = dt_util.now()
attrs[last_updated_key] = now.strftime("%Y-%m-%d %H:%M:%S")
return attrs
@callback
def _handle_cost_update(self) -> None:
"""Handle updated data from the cost coordinator."""
if not self.cost_coordinator or not self.cost_coordinator.data:
return
period_data = self.cost_coordinator.data.get(self.period)
if not period_data:
return
# Calculate weighted average based on actual costs and usage
if self.price_type == "buy":
# For buy price: total cost / total energy bought
total_cost = abs(period_data.get("total_cost", 0)) # Already calculated in coordinator
energy_bought = period_data.get("fae_usage", 0) # kWh from usage API
if energy_bought > 0:
self._state = round(total_cost / energy_bought, 4)
self._energy_bought = energy_bought
self._total_cost = total_cost
else:
self._state = None
elif self.price_type == "sell":
# For sell price: total revenue / total energy sold
total_revenue = period_data.get("total_sold", 0)
energy_sold = period_data.get("rae_usage", 0) # kWh from usage API
if energy_sold > 0:
self._state = round(total_revenue / energy_sold, 4)
self._energy_sold = energy_sold
self._total_revenue = total_revenue
else:
self._state = None
self.async_write_ha_state()
@property
def available(self) -> bool:
"""Return if entity is available."""
return (self.cost_coordinator is not None and
self.cost_coordinator.last_update_success and
self.cost_coordinator.data is not None)
class PstrykFinancialBalanceSensor(CoordinatorEntity, SensorEntity):
"""Financial balance sensor that gets data directly from API."""
_attr_state_class = SensorStateClass.TOTAL
_attr_device_class = SensorDeviceClass.MONETARY
def __init__(self, coordinator: PstrykCostDataUpdateCoordinator,
period: str, entry_id: str):
"""Initialize the financial balance sensor."""
super().__init__(coordinator)
self.period = period # 'daily', 'monthly', or 'yearly'
self.entry_id = entry_id
@property
def name(self) -> str:
"""Return the name of the sensor."""
period_name = _TRANSLATIONS_CACHE.get(
f"entity.sensor.period_{self.period}",
self.period.title()
)
balance_text = _TRANSLATIONS_CACHE.get(
"entity.sensor.financial_balance",
"Financial Balance"
)
return f"Pstryk {period_name} {balance_text}"
@property
def unique_id(self) -> str:
"""Return unique ID."""
return f"{DOMAIN}_financial_balance_{self.period}"
@property
def device_info(self):
"""Return device information."""
return {
"identifiers": {(DOMAIN, "pstryk_energy")},
"name": "Pstryk Energy",
"manufacturer": "Pstryk",
"model": "Energy Price Monitor",
"sw_version": "1.7.1",
}
@property
def native_value(self):
"""Return the state of the sensor from API data."""
if not self.coordinator.data:
return None
period_data = self.coordinator.data.get(self.period)
if not period_data or "total_balance" not in period_data:
return None
# Get the balance value from API
balance = period_data.get("total_balance")
# Return exact value from API without rounding
return balance
@property
def native_unit_of_measurement(self) -> str:
"""Return the unit of measurement."""
return "PLN"
@property
def icon(self) -> str:
"""Return the icon based on balance."""
if self.native_value is None:
return "mdi:currency-usd-off"
elif self.native_value < 0:
return "mdi:cash-minus" # We're paying
elif self.native_value > 0:
return "mdi:cash-plus" # We're earning
else:
return "mdi:cash"
@property
def extra_state_attributes(self) -> dict:
"""Return extra state attributes from API data."""
if not self.coordinator.data or not self.coordinator.data.get(self.period):
return {}
period_data = self.coordinator.data.get(self.period)
frame = period_data.get("frame", {})
# Get translated attribute names
buy_cost_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.buy_cost",
"Buy cost"
)
sell_revenue_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.sell_revenue",
"Sell revenue"
)
period_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.period",
"Period"
)
net_balance_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.balance",
"Balance"
)
energy_cost_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.buy_cost",
"Buy cost"
)
distribution_cost_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.distribution_cost",
"Distribution cost"
)
excise_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.excise",
"Excise"
)
vat_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.vat",
"VAT"
)
service_cost_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.service_cost",
"Service cost"
)
energy_bought_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.energy_bought",
"Energy bought"
)
energy_sold_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.energy_sold",
"Energy sold"
)
attrs = {
period_key: self.period,
net_balance_key: period_data.get("total_balance", 0),
buy_cost_key: period_data.get("total_cost", 0),
sell_revenue_key: period_data.get("total_sold", 0),
energy_bought_key: period_data.get("fae_usage", 0),
energy_sold_key: period_data.get("rae_usage", 0),
}
# Add detailed cost breakdown if available
if frame:
# Konwertuj daty UTC na lokalne
start_utc = frame.get("start")
end_utc = frame.get("end")
start_local = dt_util.as_local(dt_util.parse_datetime(start_utc)) if start_utc else None
end_local = dt_util.as_local(dt_util.parse_datetime(end_utc)) if end_utc else None
attrs.update({
energy_cost_key: frame.get("fae_cost", 0),
distribution_cost_key: frame.get("var_dist_cost_net", 0) + frame.get("fix_dist_cost_net", 0),
excise_key: frame.get("excise", 0),
vat_key: frame.get("vat", 0),
service_cost_key: frame.get("service_cost_net", 0),
"start": start_local.strftime("%Y-%m-%d") if start_local else None,
"end": end_local.strftime("%Y-%m-%d") if end_local else None,
})
# Add last updated at the bottom
last_updated_key = _TRANSLATIONS_CACHE.get(
"entity.sensor.last_updated",
"Last updated"
)
now = dt_util.now()
attrs[last_updated_key] = now.strftime("%Y-%m-%d %H:%M:%S")
return attrs
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self.coordinator.data is not None