Files

1893 lines
78 KiB
Python

"""Sensor platform for Pstryk Energy integration."""
import logging
import asyncio
import math
from datetime import timedelta
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.components.sensor import SensorEntity, SensorStateClass, SensorDeviceClass
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.util import dt as dt_util
from .update_coordinator import PstrykDataUpdateCoordinator
from .energy_cost_coordinator import PstrykCostDataUpdateCoordinator
from .api_client import PstrykAPIClient
from .const import (
DOMAIN,
CONF_MQTT_48H_MODE,
CONF_RETRY_ATTEMPTS,
CONF_RETRY_DELAY,
DEFAULT_RETRY_ATTEMPTS,
DEFAULT_RETRY_DELAY,
# Battery recommendation constants
CONF_BATTERY_ENABLED,
CONF_BATTERY_SOC_ENTITY,
CONF_BATTERY_CAPACITY,
CONF_BATTERY_CHARGE_RATE,
CONF_BATTERY_DISCHARGE_RATE,
CONF_BATTERY_MIN_SOC,
CONF_BATTERY_CHARGE_HOURS,
CONF_BATTERY_DISCHARGE_MULTIPLIER,
DEFAULT_BATTERY_CAPACITY,
DEFAULT_BATTERY_CHARGE_RATE,
DEFAULT_BATTERY_DISCHARGE_RATE,
DEFAULT_BATTERY_MIN_SOC,
DEFAULT_BATTERY_CHARGE_HOURS,
DEFAULT_BATTERY_DISCHARGE_MULTIPLIER,
)
from homeassistant.helpers.translation import async_get_translations
_LOGGER = logging.getLogger(__name__)
# Translation lookups shared by every sensor in this module. Filled once by
# async_setup_entry so individual entities do not reload them.
_TRANSLATIONS_CACHE = {}
# Integration version read from manifest.json. This placeholder is replaced
# at import time, right after _load_version_sync is defined.
_VERSION_CACHE = None
def _load_version_sync() -> str:
"""Load version synchronously at module import time."""
try:
import json
import os
manifest_path = os.path.join(os.path.dirname(__file__), "manifest.json")
with open(manifest_path, "r") as f:
manifest = json.load(f)
return manifest.get("version", "unknown")
except Exception:
return "unknown"
# Resolve the version a single time while the module is imported, so the
# blocking manifest read is never repeated per call.
_VERSION_CACHE = _load_version_sync()


def get_integration_version(hass: HomeAssistant) -> str:
    """Return the integration version cached from manifest.json.

    ``hass`` is accepted for call-site compatibility but is not used; the
    value was read once at import time and is "unknown" if that read failed.
    """
    return _VERSION_CACHE
def _cancel_scheduled_updates(coordinator, unsub_attrs) -> None:
    """Call every stored unsubscribe callback named in ``unsub_attrs``.

    Attributes that are missing or falsy are skipped, so this is safe on a
    coordinator that never scheduled a given update.
    """
    for attr_name in unsub_attrs:
        unsubscribe = getattr(coordinator, attr_name, None)
        if unsubscribe:
            unsubscribe()


async def _initial_fetch_with_timeout(coord, coord_type, timeout=20.0) -> bool:
    """Fetch a coordinator's first data set without blocking startup.

    Returns True when data was loaded and stored on the coordinator, False on
    timeout or any other error (the coordinator is then marked as failed).
    """
    try:
        data = await asyncio.wait_for(coord._async_update_data(), timeout=timeout)
    except asyncio.TimeoutError:
        _LOGGER.warning("Timeout initializing %s coordinator - will retry later", coord_type)
        coord.last_update_success = False
        return False
    except Exception as err:
        _LOGGER.error("Failed initial fetch for %s coordinator: %s", coord_type, err)
        coord.last_update_success = False
        return False
    coord.data = data
    coord.last_update_success = True
    _LOGGER.debug("Successfully initialized %s coordinator", coord_type)
    return True


async def _retry_failed_coordinators(failed_coordinators, base_delay=120, max_attempts=5) -> None:
    """Retry failed coordinators with exponential backoff.

    With the defaults the waits are 2, 4, 8, 16 and 32 minutes. The loop
    stops early as soon as every coordinator reports a successful update.
    """
    for attempt in range(max_attempts):
        delay = base_delay * (2 ** attempt)
        pending = [
            (coord, coord_type)
            for coord, coord_type in failed_coordinators
            if not coord.last_update_success
        ]
        if not pending:
            _LOGGER.info("All coordinators recovered, stopping backoff retry")
            return
        _LOGGER.info(
            "Backoff retry attempt %d/%d in %d seconds for %d coordinator(s)",
            attempt + 1, max_attempts, delay, len(pending)
        )
        await asyncio.sleep(delay)
        for coord, coord_type in pending:
            # A scheduled update may have succeeded while we were sleeping.
            if coord.last_update_success:
                continue
            _LOGGER.info("Retry attempt %d for %s coordinator", attempt + 1, coord_type)
            try:
                await coord.async_request_refresh()
                if coord.last_update_success:
                    _LOGGER.info("%s coordinator recovered on attempt %d", coord_type, attempt + 1)
            except Exception as err:
                _LOGGER.warning("Retry attempt %d failed for %s: %s", attempt + 1, coord_type, err)

    still_failed = [
        coord_type for coord, coord_type in failed_coordinators
        if not coord.last_update_success
    ]
    if still_failed:
        _LOGGER.error(
            "Coordinators %s failed after %d retry attempts. Will use hourly schedule.",
            still_failed, max_attempts
        )


async def _load_cost_data_in_background(cost_coordinator, sensor_count, initial_delay=15) -> None:
    """Load the cost coordinator after startup and notify its listeners.

    ``sensor_count`` is only used in log messages; it is the number of
    sensors that were registered alongside the price sensors.
    """
    _LOGGER.info("Waiting %d seconds before loading cost coordinator data", initial_delay)
    await asyncio.sleep(initial_delay)
    _LOGGER.info("Loading cost coordinator data in background")
    try:
        # Load cost coordinator with all resolutions
        data = await cost_coordinator._async_update_data(fetch_all=True)
        cost_coordinator.data = data
        cost_coordinator.last_update_success = True
        # Notify all listening sensors that data is available
        cost_coordinator.async_update_listeners()
        _LOGGER.info("Cost coordinator loaded successfully - %d sensors updated",
                     sensor_count)
    except Exception as err:
        _LOGGER.warning("Failed to load cost coordinator: %s. %d sensors remain unavailable.",
                        err, sensor_count)
        cost_coordinator.last_update_success = False
        cost_coordinator.data = None


async def async_setup_entry(
    hass: HomeAssistant,
    entry: ConfigEntry,
    async_add_entities,
) -> None:
    """Set up the Pstryk sensors via the coordinators.

    Steps, in order:
      1. Read options and load the shared translation cache.
      2. Cancel and drop coordinators left over from a previous setup.
      3. Create the buy/sell price coordinators and the cost coordinator.
      4. Fetch price data immediately (bounded by a timeout); coordinators
         that fail are retried in the background with exponential backoff.
      5. Store coordinators in ``hass.data``, schedule their updates and
         register all sensors.
      6. Load cost data in a background task so startup is not delayed.
    """
    domain_data = hass.data[DOMAIN]
    api_key = domain_data[entry.entry_id]["api_key"]
    buy_top = entry.options.get("buy_top", entry.data.get("buy_top", 5))
    sell_top = entry.options.get("sell_top", entry.data.get("sell_top", 5))
    buy_worst = entry.options.get("buy_worst", entry.data.get("buy_worst", 5))
    sell_worst = entry.options.get("sell_worst", entry.data.get("sell_worst", 5))
    mqtt_48h_mode = entry.options.get(CONF_MQTT_48H_MODE, False)
    retry_attempts = entry.options.get(CONF_RETRY_ATTEMPTS, DEFAULT_RETRY_ATTEMPTS)
    retry_delay = entry.options.get(CONF_RETRY_DELAY, DEFAULT_RETRY_DELAY)
    _LOGGER.debug("Setting up Pstryk sensors with buy_top=%d, sell_top=%d, buy_worst=%d, sell_worst=%d, mqtt_48h_mode=%s, retry_attempts=%d, retry_delay=%ds",
                  buy_top, sell_top, buy_worst, sell_worst, mqtt_48h_mode, retry_attempts, retry_delay)

    # Load translations once for all sensors
    global _TRANSLATIONS_CACHE
    if not _TRANSLATIONS_CACHE:
        try:
            # NOTE(review): Home Assistant documents the third positional
            # argument of async_get_translations as the translation
            # *category*; confirm that passing DOMAIN here yields the keys
            # the sensors look up. Left unchanged to keep current behavior.
            _TRANSLATIONS_CACHE = await async_get_translations(
                hass, hass.config.language, DOMAIN
            )
        except Exception as ex:
            _LOGGER.warning("Failed to load translations: %s", ex)
            _TRANSLATIONS_CACHE = {}

    # Cleanup old price coordinators if they exist
    for price_type in ("buy", "sell"):
        key = f"{entry.entry_id}_{price_type}"
        stale = domain_data.get(key)
        if stale:
            _LOGGER.debug("Cleaning up existing %s coordinator", price_type)
            _cancel_scheduled_updates(
                stale, ("_unsub_hourly", "_unsub_midnight", "_unsub_afternoon")
            )
            domain_data.pop(key, None)

    # Cleanup old cost coordinator if it exists
    cost_key = f"{entry.entry_id}_cost"
    stale_cost = domain_data.get(cost_key)
    if stale_cost:
        _LOGGER.debug("Cleaning up existing cost coordinator")
        _cancel_scheduled_updates(stale_cost, ("_unsub_hourly", "_unsub_midnight"))
        domain_data.pop(cost_key, None)

    # Create shared API client (or reuse existing one)
    api_client_key = f"{entry.entry_id}_api_client"
    if api_client_key not in domain_data:
        api_client = PstrykAPIClient(hass, api_key)
        domain_data[api_client_key] = api_client
    else:
        api_client = domain_data[api_client_key]

    # Create price coordinators first
    price_coordinators = []
    for price_type in ("buy", "sell"):
        coordinator = PstrykDataUpdateCoordinator(
            hass,
            api_client,
            price_type,
            mqtt_48h_mode,
            retry_attempts,
            retry_delay
        )
        price_coordinators.append(
            (coordinator, price_type, f"{entry.entry_id}_{price_type}")
        )

    # The cost coordinator starts as unavailable; its data is loaded lazily.
    cost_coordinator = PstrykCostDataUpdateCoordinator(hass, api_client)
    cost_coordinator.last_update_success = False

    # Initialize ONLY price coordinators immediately (fast startup)
    _LOGGER.info("Starting quick initialization - loading price coordinators only")
    refresh_results = await asyncio.gather(
        *(
            _initial_fetch_with_timeout(coordinator, price_type)
            for coordinator, price_type, _ in price_coordinators
        ),
        return_exceptions=True,
    )

    # Anything other than an explicit True counts as a failed initial load.
    failed_coordinators = []
    for (coordinator, price_type, _), result in zip(price_coordinators, refresh_results):
        if result is not True:
            _LOGGER.warning("Coordinator %s failed initial load - scheduling retry with backoff",
                            price_type)
            failed_coordinators.append((coordinator, price_type))

    if failed_coordinators:
        hass.async_create_task(_retry_failed_coordinators(failed_coordinators))

    # Store price coordinators, schedule their updates and create the
    # current-price sensors (these have data immediately).
    entities = []
    price_coord_by_type = {}
    limits = {"buy": (buy_top, buy_worst), "sell": (sell_top, sell_worst)}
    for coordinator, price_type, key in price_coordinators:
        domain_data[key] = coordinator
        coordinator.schedule_hourly_update()
        coordinator.schedule_midnight_update()
        # Schedule afternoon update if 48h mode is enabled
        if mqtt_48h_mode:
            coordinator.schedule_afternoon_update()
        top, worst = limits[price_type]
        entities.append(PstrykPriceSensor(coordinator, price_type, top, worst, entry.entry_id))
        price_coord_by_type[price_type] = coordinator

    # Store the cost coordinator and schedule its updates
    domain_data[cost_key] = cost_coordinator
    cost_coordinator.schedule_hourly_update()
    cost_coordinator.schedule_midnight_update()

    # Remaining sensors depend on cost data and start out unavailable:
    # three average sensors per price type plus three balance sensors.
    periods = ("daily", "monthly", "yearly")
    remaining_entities = []
    for price_type in ("buy", "sell"):
        price_coord = price_coord_by_type.get(price_type)
        if price_coord:
            for period in periods:
                remaining_entities.append(PstrykAveragePriceSensor(
                    cost_coordinator, price_coord, period, entry.entry_id
                ))
    for period in periods:
        remaining_entities.append(PstrykFinancialBalanceSensor(
            cost_coordinator, period, entry.entry_id
        ))

    # Create battery recommendation sensor if enabled
    buy_coord = price_coord_by_type.get("buy")
    battery_enabled = entry.options.get(CONF_BATTERY_ENABLED, False)
    if battery_enabled and buy_coord:
        battery_sensor = PstrykBatteryRecommendationSensor(
            coordinator=buy_coord,
            entry_id=entry.entry_id,
            soc_entity_id=entry.options.get(CONF_BATTERY_SOC_ENTITY, ""),
            capacity=entry.options.get(CONF_BATTERY_CAPACITY, DEFAULT_BATTERY_CAPACITY),
            charge_rate=entry.options.get(CONF_BATTERY_CHARGE_RATE, DEFAULT_BATTERY_CHARGE_RATE),
            discharge_rate=entry.options.get(CONF_BATTERY_DISCHARGE_RATE, DEFAULT_BATTERY_DISCHARGE_RATE),
            min_soc=entry.options.get(CONF_BATTERY_MIN_SOC, DEFAULT_BATTERY_MIN_SOC),
            charge_hours_count=entry.options.get(CONF_BATTERY_CHARGE_HOURS, DEFAULT_BATTERY_CHARGE_HOURS),
            discharge_multiplier=entry.options.get(CONF_BATTERY_DISCHARGE_MULTIPLIER, DEFAULT_BATTERY_DISCHARGE_MULTIPLIER),
        )
        remaining_entities.append(battery_sensor)
        _LOGGER.info("Battery recommendation sensor enabled with SoC entity: %s",
                     entry.options.get(CONF_BATTERY_SOC_ENTITY, "not configured"))

    # Register ALL sensors immediately; the cost-based ones update on their
    # own once the background load below notifies the coordinator listeners.
    _LOGGER.info("Registering %d current price sensors with data and %d additional sensors as unavailable",
                 len(entities), len(remaining_entities))
    async_add_entities(entities + remaining_entities)

    # Start background data loading
    hass.async_create_task(
        _load_cost_data_in_background(cost_coordinator, len(remaining_entities))
    )
class PstrykPriceSensor(CoordinatorEntity, SensorEntity):
"""Combined price sensor with table data attributes."""
# Note: state_class removed - MONETARY device_class doesn't support MEASUREMENT
def __init__(self, coordinator: PstrykDataUpdateCoordinator, price_type: str, top_count: int, worst_count: int, entry_id: str):
    """Bind the sensor to a price coordinator and remember its limits.

    ``top_count`` and ``worst_count`` cap how many entries the best/worst
    price attributes expose.
    """
    super().__init__(coordinator)
    # Memo used by _get_cached_sorted_prices(): the last computed result and
    # the hash of the data it was computed from.
    self._last_data_hash = None
    self._cached_sorted_prices = None
    self._attr_device_class = SensorDeviceClass.MONETARY
    self.entry_id = entry_id
    self.worst_count = worst_count
    self.top_count = top_count
    self.price_type = price_type
async def async_added_to_hass(self):
    """Run the inherited setup hooks once the entity is added to Home Assistant."""
    await super().async_added_to_hass()
@property
def name(self) -> str:
    """Return the display name, e.g. "Pstryk Current Buy Price"."""
    kind_label = self.price_type.title()
    return f"Pstryk Current {kind_label} Price"
@property
def unique_id(self) -> str:
    """Return the entity's unique ID, built from the domain and price type."""
    kind = self.price_type
    return f"{DOMAIN}_{kind}_price"
@property
def device_info(self):
    """Describe the "Pstryk Energy" device this sensor is attached to."""
    return dict(
        identifiers={(DOMAIN, "pstryk_energy")},
        name="Pstryk Energy",
        manufacturer="Pstryk",
        model="Energy Price Monitor",
        sw_version=get_integration_version(self.hass),
    )
def _get_current_price(self):
"""Get current price based on current time."""
if not self.coordinator.data or not self.coordinator.data.get("prices"):
return None
now_utc = dt_util.utcnow()
for price_entry in self.coordinator.data.get("prices", []):
try:
if "start" not in price_entry:
continue
price_datetime = dt_util.parse_datetime(price_entry["start"])
if not price_datetime:
continue
# Konwersja do UTC dla porównania
price_datetime_utc = dt_util.as_utc(price_datetime)
price_end_utc = price_datetime_utc + timedelta(hours=1)
if price_datetime_utc <= now_utc < price_end_utc:
return price_entry.get("price")
except Exception as e:
_LOGGER.error("Error determining current price: %s", str(e))
return None
@property
def native_value(self):
    """Return the price for the current hour.

    The time-based lookup is preferred; when it finds nothing, the
    coordinator's own "current" value is used instead.
    """
    if self.coordinator.data is None:
        return None
    price_now = self._get_current_price()
    if price_now is not None:
        return price_now
    # Nothing matched by time: fall back to the coordinator's value.
    return self.coordinator.data.get("current")
@property
def native_unit_of_measurement(self) -> str:
    """Return the unit the price is expressed in."""
    unit = "PLN/kWh"
    return unit
def _get_next_hour_price(self) -> dict:
"""Get price data for the next hour."""
if not self.coordinator.data:
return None
now = dt_util.as_local(dt_util.utcnow())
next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
# Use translations for debug messages
debug_msg = _TRANSLATIONS_CACHE.get(
"debug.looking_for_next_hour",
"Looking for price for next hour: {next_hour}"
).format(next_hour=next_hour.strftime("%Y-%m-%d %H:%M:%S"))
_LOGGER.debug(debug_msg)
# Check if we're looking for the next day's hour (midnight)
is_looking_for_next_day = next_hour.day != now.day
# First check in prices_today
price_found = None
if self.coordinator.data.get("prices_today"):
for price_data in self.coordinator.data.get("prices_today", []):
if "start" not in price_data:
continue
try:
price_datetime = dt_util.parse_datetime(price_data["start"])
if not price_datetime:
continue
price_datetime = dt_util.as_local(price_datetime)
if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day:
price_found = price_data.get("price")
_LOGGER.debug("Found price for %s in today's list: %s", next_hour.strftime("%Y-%m-%d %H:%M:%S"), price_found)
return price_found
except Exception as e:
error_msg = _TRANSLATIONS_CACHE.get(
"debug.error_processing_date",
"Error processing date: {error}"
).format(error=str(e))
_LOGGER.error(error_msg)
# Always check the full list as a fallback, regardless of day
if self.coordinator.data.get("prices"):
_LOGGER.debug("Looking for price in full 48h list as fallback")
for price_data in self.coordinator.data.get("prices", []):
if "start" not in price_data:
continue
try:
price_datetime = dt_util.parse_datetime(price_data["start"])
if not price_datetime:
continue
price_datetime = dt_util.as_local(price_datetime)
# Check if this matches the hour and day we're looking for
if price_datetime.hour == next_hour.hour and price_datetime.day == next_hour.day:
price_found = price_data.get("price")
_LOGGER.debug("Found price for %s in full 48h list: %s", next_hour.strftime("%Y-%m-%d %H:%M:%S"), price_found)
return price_found
except Exception as e:
full_list_error_msg = _TRANSLATIONS_CACHE.get(
"debug.error_processing_full_list",
"Error processing date for full list: {error}"
).format(error=str(e))
_LOGGER.error(full_list_error_msg)
# If no price found for next hour
if is_looking_for_next_day:
midnight_msg = _TRANSLATIONS_CACHE.get(
"debug.no_price_midnight",
"No price found for next day midnight. Data probably not loaded yet."
)
_LOGGER.info(midnight_msg)
else:
no_price_msg = _TRANSLATIONS_CACHE.get(
"debug.no_price_next_hour",
"No price found for next hour: {next_hour}"
).format(next_hour=next_hour.strftime("%Y-%m-%d %H:%M:%S"))
_LOGGER.warning(no_price_msg)
return None
def _get_cached_sorted_prices(self, today):
    """Return today's best and worst price entries, recomputed only on change.

    Args:
        today: list of price entries (dicts with "start" and "price").

    Returns:
        ``{"best": [...], "worst": [...]}`` limited to ``top_count`` and
        ``worst_count`` entries. For "sell" sensors the best prices are the
        highest ones; for every other price type they are the lowest.

    Entries with a missing or None price are left out of the ranking instead
    of raising, matching how the rest of this class treats such entries.
    """
    # A hash of (start, price) pairs is enough to detect changed data.
    data_hash = hash(tuple((p.get("start"), p.get("price")) for p in today))
    if self._cached_sorted_prices is None or self._last_data_hash != data_hash:
        _LOGGER.debug("Price data changed, recalculating sorted prices")
        priced = [p for p in today if p.get("price") is not None]
        selling = self.price_type == "sell"

        def price_of(entry):
            return entry["price"]

        # Two separate stable sorts keep the original tie ordering for both
        # lists (the worst list is not simply the best list reversed).
        sorted_best = sorted(priced, key=price_of, reverse=selling)
        sorted_worst = sorted(priced, key=price_of, reverse=not selling)
        self._cached_sorted_prices = {
            "best": sorted_best[: self.top_count],
            "worst": sorted_worst[: self.worst_count],
        }
        self._last_data_hash = data_hash
    return self._cached_sorted_prices
def _is_likely_placeholder_data(self, prices_for_day):
"""Check if prices for a day are likely placeholders.
Returns True if:
- There are no prices
- ALL prices have exactly the same value (suggesting API returned default values)
- There are too many consecutive hours with the same value (e.g., 10+ hours)
"""
if not prices_for_day:
return True
# Get all price values
price_values = [p.get("price") for p in prices_for_day if p.get("price") is not None]
if not price_values:
return True
# If we have less than 20 prices for a day, it's incomplete data
if len(price_values) < 20:
_LOGGER.debug(f"Only {len(price_values)} prices for the day, likely incomplete data")
return True
# Check if ALL values are identical
unique_values = set(price_values)
if len(unique_values) == 1:
_LOGGER.debug(f"All {len(price_values)} prices have the same value ({price_values[0]}), likely placeholders")
return True
# Additional check: if more than 90% of values are the same, it's suspicious
most_common_value = max(set(price_values), key=price_values.count)
count_most_common = price_values.count(most_common_value)
if count_most_common / len(price_values) > 0.9:
_LOGGER.debug(f"{count_most_common}/{len(price_values)} prices have value {most_common_value}, likely placeholders")
return True
return False
def _count_consecutive_same_values(self, prices):
"""Count maximum consecutive hours with the same price."""
if not prices:
return 0
# Sort by time to ensure consecutive checking
sorted_prices = sorted(prices, key=lambda x: x.get("start", ""))
max_consecutive = 1
current_consecutive = 1
last_value = None
for price in sorted_prices:
value = price.get("price")
if value is not None:
if value == last_value:
current_consecutive += 1
max_consecutive = max(max_consecutive, current_consecutive)
else:
current_consecutive = 1
last_value = value
return max_consecutive
def _get_mqtt_price_count(self):
"""Get the actual count of prices that would be published to MQTT."""
if not self.coordinator.data:
return 0
if not self.coordinator.mqtt_48h_mode:
# If not in 48h mode, we only publish today's prices
prices_today = self.coordinator.data.get("prices_today", [])
return len(prices_today)
else:
# In 48h mode, we need to count valid prices
all_prices = self.coordinator.data.get("prices", [])
# Just count today's prices as they're always valid
now = dt_util.as_local(dt_util.utcnow())
today_str = now.strftime("%Y-%m-%d")
today_prices = [p for p in all_prices if p.get("start", "").startswith(today_str)]
# For tomorrow, check if data looks valid
tomorrow_str = (now + timedelta(days=1)).strftime("%Y-%m-%d")
tomorrow_prices = [p for p in all_prices if p.get("start", "").startswith(tomorrow_str)]
# Count today's prices always
valid_count = len(today_prices)
# Add tomorrow's prices only if they look like real data
if tomorrow_prices and not self._is_likely_placeholder_data(tomorrow_prices):
valid_count += len(tomorrow_prices)
return valid_count
def _get_sunrise_sunset_average(self, today_prices):
"""Calculate average price between sunrise and sunset."""
if not today_prices:
return None
# Get sun entity
sun_entity = self.hass.states.get("sun.sun")
if not sun_entity:
_LOGGER.debug("Sun entity not available")
return None
# Get sunrise and sunset times from attributes
sunrise_attr = sun_entity.attributes.get("next_rising")
sunset_attr = sun_entity.attributes.get("next_setting")
if not sunrise_attr or not sunset_attr:
_LOGGER.debug("Sunrise/sunset times not available")
return None
# Parse sunrise and sunset times
try:
sunrise = dt_util.parse_datetime(sunrise_attr)
sunset = dt_util.parse_datetime(sunset_attr)
if not sunrise or not sunset:
return None
# Convert to local time
sunrise_local = dt_util.as_local(sunrise)
sunset_local = dt_util.as_local(sunset)
# If sunrise is tomorrow, use today's sunrise from calculation
now = dt_util.now()
if sunrise_local.date() > now.date():
# Calculate approximate sunrise for today (subtract 24h)
sunrise_local = sunrise_local - timedelta(days=1)
# If sunset is tomorrow, we're after sunset today
if sunset_local.date() > now.date():
# Use previous sunset
sunset_local = sunset_local - timedelta(days=1)
_LOGGER.debug(f"Calculating s/s average between {sunrise_local.strftime('%H:%M')} and {sunset_local.strftime('%H:%M')}")
# Get prices between sunrise and sunset
sunrise_sunset_prices = []
for price_entry in today_prices:
if "start" not in price_entry or "price" not in price_entry:
continue
price_time = dt_util.parse_datetime(price_entry["start"])
if not price_time:
continue
price_time_local = dt_util.as_local(price_time)
# Check if price hour is between sunrise and sunset
# We check the start of the hour
if sunrise_local <= price_time_local < sunset_local:
price_value = price_entry.get("price")
if price_value is not None:
sunrise_sunset_prices.append(price_value)
# Calculate average
if sunrise_sunset_prices:
avg = round(sum(sunrise_sunset_prices) / len(sunrise_sunset_prices), 2)
_LOGGER.debug(f"S/S average calculated from {len(sunrise_sunset_prices)} hours: {avg}")
return avg
else:
_LOGGER.debug("No prices found between sunrise and sunset")
return None
except Exception as e:
_LOGGER.error(f"Error calculating sunrise/sunset average: {e}")
return None
@property
def extra_state_attributes(self) -> dict:
    """Build the attribute table shown on the current price sensor.

    Attribute names come from the translation cache, with English defaults.
    When the coordinator has no data a skeleton with empty values is
    returned; otherwise the table holds today's averages, the next hour
    price, the price lists, best/worst rankings and availability flags.
    """
    now = dt_util.as_local(dt_util.utcnow())

    # Translated attribute names (the default is used when not translated).
    lookup = _TRANSLATIONS_CACHE.get
    next_hour_key = lookup("entity.sensor.next_hour", "Next hour")
    using_cached_key = lookup("entity.sensor.using_cached_data", "Using cached data")
    all_prices_key = lookup("entity.sensor.all_prices", "All prices")
    best_prices_key = lookup("entity.sensor.best_prices", "Best prices")
    worst_prices_key = lookup("entity.sensor.worst_prices", "Worst prices")
    best_count_key = lookup("entity.sensor.best_count", "Best count")
    worst_count_key = lookup("entity.sensor.worst_count", "Worst count")
    price_count_key = lookup("entity.sensor.price_count", "Price count")
    last_updated_key = lookup("entity.sensor.last_updated", "Last updated")
    avg_price_key = lookup("entity.sensor.avg_price", "Average price today")
    tomorrow_available_key = lookup("entity.sensor.tomorrow_available", "Tomorrow prices available")
    mqtt_price_count_key = lookup("entity.sensor.mqtt_price_count", "MQTT price count")
    avg_price_sunrise_sunset_key = lookup("entity.sensor.avg_price_sunrise_sunset", "Average price today s/s")

    if self.coordinator.data is None:
        return {
            f"{avg_price_key} /0": None,
            f"{avg_price_key} /24": None,
            avg_price_sunrise_sunset_key: None,
            next_hour_key: None,
            all_prices_key: [],
            "all_prices": [],
            "prices_today": [],
            best_prices_key: [],
            worst_prices_key: [],
            best_count_key: self.top_count,
            worst_count_key: self.worst_count,
            price_count_key: 0,
            using_cached_key: False,
            tomorrow_available_key: False,
            mqtt_price_count_key: 0
        }

    next_hour_data = self._get_next_hour_price()
    data = self.coordinator.data
    today = data.get("prices_today", [])
    all_prices_list = data.get("prices", [])
    is_cached = data.get("is_cached", False)

    avg_price_full_day = None
    avg_price_remaining = None
    remaining_hours_count = 0
    if today:
        # Average over every priced entry of the day.
        priced = [p.get("price") for p in today if p.get("price") is not None]
        if priced:
            avg_price_full_day = round(sum(priced) / len(priced), 2)

        # Average from the current hour onwards; entries are selected by
        # comparing their "start" text with the current local hour prefix.
        hour_prefix = now.strftime("%Y-%m-%dT%H:")
        upcoming = [
            p.get("price")
            for p in today
            if p.get("price") is not None and p.get("start", "") >= hour_prefix
        ]
        remaining_hours_count = len(upcoming)
        if remaining_hours_count > 0:
            avg_price_remaining = round(sum(upcoming) / remaining_hours_count, 2)

    avg_price_sunrise_sunset = self._get_sunrise_sunset_average(today)

    # The attribute name carries the number of hours it was averaged over.
    avg_price_remaining_with_hours = f"{avg_price_key} /{remaining_hours_count}"
    avg_price_full_day_with_hours = f"{avg_price_key} /24"

    tomorrow_prefix = (now + timedelta(days=1)).strftime("%Y-%m-%d")
    tomorrow_prices = []
    if len(all_prices_list) > 0:
        tomorrow_prices = [
            p for p in all_prices_list
            if p.get("start", "").startswith(tomorrow_prefix)
        ]
        if tomorrow_prices:
            unique_values = {
                p.get("price") for p in tomorrow_prices if p.get("price") is not None
            }
            consecutive = self._count_consecutive_same_values(tomorrow_prices)
            _LOGGER.debug(
                f"Tomorrow has {len(tomorrow_prices)} prices, "
                f"{len(unique_values)} unique values, "
                f"max {consecutive} consecutive same values"
            )

    # Tomorrow counts as available only with at least 20 entries that do
    # not look like placeholders.
    tomorrow_available = (
        len(tomorrow_prices) >= 20 and
        not self._is_likely_placeholder_data(tomorrow_prices)
    )

    if today:
        sorted_prices = self._get_cached_sorted_prices(today)
    else:
        sorted_prices = {"best": [], "worst": []}

    mqtt_price_count = self._get_mqtt_price_count()

    return {
        avg_price_remaining_with_hours: avg_price_remaining,
        avg_price_full_day_with_hours: avg_price_full_day,
        avg_price_sunrise_sunset_key: avg_price_sunrise_sunset,
        next_hour_key: next_hour_data,
        all_prices_key: today,
        "all_prices": all_prices_list,
        "prices_today": today,
        best_prices_key: sorted_prices["best"],
        worst_prices_key: sorted_prices["worst"],
        best_count_key: self.top_count,
        worst_count_key: self.worst_count,
        price_count_key: len(today),
        last_updated_key: now.strftime("%Y-%m-%d %H:%M:%S"),
        using_cached_key: is_cached,
        tomorrow_available_key: tomorrow_available,
        "tomorrow_available": tomorrow_available,
        mqtt_price_count_key: mqtt_price_count,
        "mqtt_48h_mode": self.coordinator.mqtt_48h_mode
    }
@property
def available(self) -> bool:
    """Report the entity as available only after a successful update with data."""
    source = self.coordinator
    return source.last_update_success and source.data is not None
class PstrykAveragePriceSensor(RestoreEntity, SensorEntity):
"""Average price sensor using weighted averages from API data."""
# Note: state_class removed - MONETARY device_class doesn't support MEASUREMENT
def __init__(self, cost_coordinator: PstrykCostDataUpdateCoordinator,
             price_coordinator: PstrykDataUpdateCoordinator,
             period: str, entry_id: str):
    """Set up an average price sensor for one price type and period.

    The price type is taken from ``price_coordinator``; ``period`` is
    'daily', 'monthly' or 'yearly'.
    """
    self.entry_id = entry_id
    self.period = period
    self.price_coordinator = price_coordinator
    self.price_type = price_coordinator.price_type
    self.cost_coordinator = cost_coordinator
    self._attr_device_class = SensorDeviceClass.MONETARY
    # No value until one is restored or computed from cost data.
    self._state = None
    # Running totals exposed through the extra state attributes.
    self._total_revenue = 0.0
    self._total_cost = 0.0
    self._energy_sold = 0.0
    self._energy_bought = 0.0
async def async_added_to_hass(self):
    """Subscribe to cost updates and restore the last known state.

    The energy/cost totals are restored from the attribute names that
    ``extra_state_attributes`` actually writes (the translated labels such
    as "Energy bought"). The plain snake_case names are still accepted as a
    fallback, and a missing attribute restores as 0.
    """
    await super().async_added_to_hass()

    # Subscribe to cost coordinator updates
    self.async_on_remove(
        self.cost_coordinator.async_add_listener(self._handle_cost_update)
    )

    # Restore previous state
    last_state = await self.async_get_last_state()
    if not last_state or last_state.state in (None, "unknown", "unavailable"):
        return

    # (instance attribute, translation key, default label, legacy key)
    restorable = (
        ("_energy_bought", "entity.sensor.energy_bought", "Energy bought", "energy_bought"),
        ("_energy_sold", "entity.sensor.energy_sold", "Energy sold", "energy_sold"),
        ("_total_cost", "entity.sensor.total_cost", "Total cost", "total_cost"),
        ("_total_revenue", "entity.sensor.total_revenue", "Total revenue", "total_revenue"),
    )
    try:
        self._state = float(last_state.state)
        attributes = last_state.attributes
        if attributes:
            for attr_name, translation_key, default_label, legacy_key in restorable:
                stored_key = _TRANSLATIONS_CACHE.get(translation_key, default_label)
                raw_value = attributes.get(stored_key, attributes.get(legacy_key, 0))
                setattr(self, attr_name, float(raw_value))
        _LOGGER.debug("Restored weighted average for %s %s: %s",
                      self.price_type, self.period, self._state)
    except (ValueError, TypeError):
        _LOGGER.warning("Could not restore state for %s", self.name)
@property
def name(self) -> str:
    """Return the sensor name, using the translated period label if present."""
    fallback_label = self.period.title()
    period_name = _TRANSLATIONS_CACHE.get(
        f"entity.sensor.period_{self.period}", fallback_label
    )
    type_label = self.price_type.title()
    return f"Pstryk {type_label} {period_name} Average"
@property
def unique_id(self) -> str:
    """Return the unique ID, built from the domain, price type and period."""
    kind, span = self.price_type, self.period
    return f"{DOMAIN}_{kind}_{span}_average"
@property
def device_info(self):
    """Describe the "Pstryk Energy" device this sensor is attached to."""
    return dict(
        identifiers={(DOMAIN, "pstryk_energy")},
        name="Pstryk Energy",
        manufacturer="Pstryk",
        model="Energy Price Monitor",
        sw_version=get_integration_version(self.hass),
    )
@property
def native_value(self):
    """Return the stored weighted average, or None when there is none yet."""
    current = self._state
    return current
@property
def native_unit_of_measurement(self) -> str:
    """Return the unit the average price is expressed in."""
    unit = "PLN/kWh"
    return unit
@property
def extra_state_attributes(self) -> dict:
    """Return the period, calculation method and energy/cost totals.

    Attribute names come from the translation cache with English defaults.
    Totals are included only for the sensor's own price type and only when
    the corresponding energy amount is positive. "Last updated" is always
    appended last.
    """
    lookup = _TRANSLATIONS_CACHE.get
    attrs = {
        lookup("entity.sensor.period", "Period"): self.period,
        lookup("entity.sensor.calculation_method", "Calculation method"): "Weighted average",
    }

    # Add energy and cost data if available
    if self.price_type == "buy" and self._energy_bought > 0:
        attrs[lookup("entity.sensor.energy_bought", "Energy bought")] = round(self._energy_bought, 2)
        attrs[lookup("entity.sensor.total_cost", "Total cost")] = round(self._total_cost, 2)
    elif self.price_type == "sell" and self._energy_sold > 0:
        attrs[lookup("entity.sensor.energy_sold", "Energy sold")] = round(self._energy_sold, 2)
        attrs[lookup("entity.sensor.total_revenue", "Total revenue")] = round(self._total_revenue, 2)

    # Add last updated at the bottom
    timestamp = dt_util.now().strftime("%Y-%m-%d %H:%M:%S")
    attrs[lookup("entity.sensor.last_updated", "Last updated")] = timestamp
    return attrs
@callback
def _handle_cost_update(self) -> None:
    """Recompute the weighted average price from cost coordinator data.

    For ``buy`` the average is ``|total_cost| / fae_usage``; for ``sell`` it
    is ``total_sold / rae_usage``. Values reported as None are treated as 0.
    When no energy was bought/sold in the period the state becomes None and
    the stored energy/cost totals are cleared, so attributes from an earlier
    period are not published next to an unknown state.

    Does nothing when the coordinator has no data for this sensor's period.
    """
    if not self.cost_coordinator or not self.cost_coordinator.data:
        return
    period_data = self.cost_coordinator.data.get(self.period)
    if not period_data:
        return

    if self.price_type == "buy":
        # Only the magnitude of the cost is used, hence abs().
        total_cost = abs(period_data.get("total_cost") or 0)
        energy_bought = period_data.get("fae_usage") or 0
        if energy_bought > 0:
            self._state = round(total_cost / energy_bought, 4)
            self._energy_bought = energy_bought
            self._total_cost = total_cost
        else:
            self._state = None
            self._energy_bought = 0.0
            self._total_cost = 0.0
    elif self.price_type == "sell":
        total_revenue = period_data.get("total_sold") or 0
        energy_sold = period_data.get("rae_usage") or 0
        if energy_sold > 0:
            self._state = round(total_revenue / energy_sold, 4)
            self._energy_sold = energy_sold
            self._total_revenue = total_revenue
        else:
            self._state = None
            self._energy_sold = 0.0
            self._total_revenue = 0.0

    self.async_write_ha_state()
@property
def available(self) -> bool:
    """Tell whether the cost coordinator exists, last updated successfully and holds data."""
    coordinator = self.cost_coordinator
    if coordinator is None:
        return False
    update_ok = coordinator.last_update_success
    if not update_ok:
        return update_ok
    return coordinator.data is not None
class PstrykFinancialBalanceSensor(CoordinatorEntity, SensorEntity):
    """Financial balance sensor that gets data directly from API.

    The sensor value is the ``total_balance`` the cost coordinator holds for
    one period. Cost, revenue and energy details are exposed as attributes.
    """

    _attr_state_class = SensorStateClass.TOTAL
    _attr_device_class = SensorDeviceClass.MONETARY

    def __init__(self, coordinator: PstrykCostDataUpdateCoordinator,
                 period: str, entry_id: str):
        """Initialize the financial balance sensor.

        Args:
            coordinator: Cost coordinator whose ``data`` is keyed by period.
            period: 'daily', 'monthly', or 'yearly'.
            entry_id: ID of the config entry this sensor belongs to.
        """
        super().__init__(coordinator)
        self.period = period
        self.entry_id = entry_id

    @staticmethod
    def _local_date(timestamp) -> str | None:
        """Convert an API timestamp string to a local ``YYYY-MM-DD`` date.

        Returns None when the timestamp is missing or cannot be parsed,
        instead of failing on the conversion to local time.
        """
        if not timestamp:
            return None
        parsed = dt_util.parse_datetime(timestamp)
        if parsed is None:
            return None
        return dt_util.as_local(parsed).strftime("%Y-%m-%d")

    @property
    def name(self) -> str:
        """Return the name of the sensor."""
        period_name = _TRANSLATIONS_CACHE.get(
            f"entity.sensor.period_{self.period}",
            self.period.title()
        )
        balance_text = _TRANSLATIONS_CACHE.get(
            "entity.sensor.financial_balance",
            "Financial Balance"
        )
        return f"Pstryk {period_name} {balance_text}"

    @property
    def unique_id(self) -> str:
        """Return unique ID."""
        return f"{DOMAIN}_financial_balance_{self.period}"

    @property
    def device_info(self):
        """Return device information."""
        return {
            "identifiers": {(DOMAIN, "pstryk_energy")},
            "name": "Pstryk Energy",
            "manufacturer": "Pstryk",
            "model": "Energy Price Monitor",
            "sw_version": get_integration_version(self.hass),
        }

    @property
    def native_value(self):
        """Return the balance for this period, or None when it is not available.

        The value is passed through exactly as stored by the coordinator,
        without rounding.
        """
        if not self.coordinator.data:
            return None
        period_data = self.coordinator.data.get(self.period)
        if not period_data or "total_balance" not in period_data:
            return None
        return period_data.get("total_balance")

    @property
    def native_unit_of_measurement(self) -> str:
        """Return the unit of measurement."""
        return "PLN"

    @property
    def icon(self) -> str:
        """Return the icon based on the sign of the balance."""
        # Read the value once; native_value walks the coordinator data each time.
        balance = self.native_value
        if balance is None:
            return "mdi:currency-usd-off"
        if balance < 0:
            return "mdi:cash-minus"  # We're paying
        if balance > 0:
            return "mdi:cash-plus"  # We're earning
        return "mdi:cash"

    @property
    def extra_state_attributes(self) -> dict:
        """Return extra state attributes from API data.

        Always contains the period, balance, buy cost, sell revenue and the
        energy bought/sold. When the period data carries a ``frame``, the
        detailed cost breakdown and the frame's local start/end dates are
        added. Returns an empty dict when there is no data for the period.
        """
        if not self.coordinator.data or not self.coordinator.data.get(self.period):
            return {}
        period_data = self.coordinator.data.get(self.period)
        frame = period_data.get("frame", {})

        def label(translation_key: str, default: str) -> str:
            """Look up a translated attribute name, falling back to English."""
            return _TRANSLATIONS_CACHE.get(translation_key, default)

        attrs = {
            label("entity.sensor.period", "Period"): self.period,
            label("entity.sensor.balance", "Balance"): period_data.get("total_balance", 0),
            label("entity.sensor.buy_cost", "Buy cost"): period_data.get("total_cost", 0),
            label("entity.sensor.sell_revenue", "Sell revenue"): period_data.get("total_sold", 0),
            label("entity.sensor.energy_bought", "Energy bought"): period_data.get("fae_usage", 0),
            label("entity.sensor.energy_sold", "Energy sold"): period_data.get("rae_usage", 0),
        }

        # Add detailed cost breakdown if available
        if frame:
            # The energy-only cost gets its own label. Sharing the "Buy cost"
            # label would overwrite the period's total cost set above.
            energy_cost_key = label("entity.sensor.energy_cost", "Energy cost")
            # None components count as 0 so the sum cannot fail.
            distribution_cost = (
                (frame.get("var_dist_cost_net") or 0)
                + (frame.get("fix_dist_cost_net") or 0)
            )
            attrs.update({
                energy_cost_key: frame.get("fae_cost", 0),
                label("entity.sensor.distribution_cost", "Distribution cost"): distribution_cost,
                label("entity.sensor.excise", "Excise"): frame.get("excise", 0),
                label("entity.sensor.vat", "VAT"): frame.get("vat", 0),
                label("entity.sensor.service_cost", "Service cost"): frame.get("service_cost_net", 0),
                # Convert the frame's timestamps to local dates.
                "start": self._local_date(frame.get("start")),
                "end": self._local_date(frame.get("end")),
            })

        # Add last updated at the bottom
        now = dt_util.now()
        attrs[label("entity.sensor.last_updated", "Last updated")] = now.strftime("%Y-%m-%d %H:%M:%S")
        return attrs

    @property
    def available(self) -> bool:
        """Return if entity is available."""
        return self.coordinator.last_update_success and self.coordinator.data is not None
class PstrykBatteryRecommendationSensor(CoordinatorEntity, SensorEntity, RestoreEntity):
    """Battery charging recommendation sensor based on dynamic prices."""
    # State values: the three recommendations this sensor can report.
    STATE_CHARGE = "charge"
    STATE_DISCHARGE = "discharge"
    STATE_STANDBY = "standby"
def __init__(
    self,
    coordinator: PstrykDataUpdateCoordinator,
    entry_id: str,
    soc_entity_id: str,
    capacity: int,
    charge_rate: int,
    discharge_rate: int,
    min_soc: int,
    charge_hours_count: int,
    discharge_multiplier: float,
):
    """Store the battery configuration used by the recommendation logic.

    Args:
        coordinator: Price coordinator the sensor reads prices from.
        entry_id: ID of the config entry this sensor belongs to.
        soc_entity_id: Entity ID whose state is the battery state of charge.
        capacity: Battery capacity.
        charge_rate: SoC added per charging hour in the forecast.
        discharge_rate: SoC removed per discharging hour in the forecast.
        min_soc: SoC below which the forecast is considered critical.
        charge_hours_count: Number of cheapest hours selected for charging.
        discharge_multiplier: Factor applied to the average charge price to
            obtain the discharge price threshold.
    """
    super().__init__(coordinator)

    # Identity and presentation
    self.entry_id = entry_id
    self._attr_icon = "mdi:battery-clock"

    # Battery description
    self._soc_entity_id = soc_entity_id
    self._capacity = capacity
    self._min_soc = min_soc

    # Charge / discharge behaviour
    self._charge_rate = charge_rate
    self._discharge_rate = discharge_rate
    self._charge_hours_count = charge_hours_count
    self._discharge_multiplier = discharge_multiplier

    # Runtime state
    self._unsub_soc_listener = None
    self._stored_energy_price = 0.0  # Weighted average cost of energy in battery
async def async_added_to_hass(self) -> None:
    """Restore the stored energy price and start tracking the SoC entity.

    The ``stored_energy_price`` attribute of the last state, when present
    and numeric, is restored. If a SoC entity is configured, a state-change
    listener is registered that updates the weighted energy cost whenever
    the SoC rises and then writes the sensor state.
    """
    await super().async_added_to_hass()

    # Restore state
    previous = await self.async_get_last_state()
    if previous:
        try:
            if "stored_energy_price" in previous.attributes:
                self._stored_energy_price = float(previous.attributes["stored_energy_price"])
                _LOGGER.debug("Restored stored energy price: %.4f PLN/kWh", self._stored_energy_price)
        except (ValueError, TypeError):
            _LOGGER.warning("Could not restore stored energy price")

    # Without a SoC entity there is nothing to subscribe to.
    if not self._soc_entity_id:
        return

    unusable_states = ("unknown", "unavailable")

    @callback
    def _async_soc_state_changed(event) -> None:
        """Handle SoC entity state changes."""
        new_state = event.data.get("new_state")
        old_state = event.data.get("old_state")
        if new_state is None or new_state.state in unusable_states:
            return

        # A rising SoC means the battery charged: fold the energy into the cost.
        if old_state and old_state.state not in unusable_states:
            try:
                previous_soc = float(old_state.state)
                latest_soc = float(new_state.state)
                if latest_soc > previous_soc:
                    self._update_weighted_cost(previous_soc, latest_soc)
            except ValueError:
                pass

        _LOGGER.debug(
            "SoC changed from %s to %s, triggering update",
            old_state.state if old_state else "None",
            new_state.state
        )
        # Publish the recalculated recommendation
        self.async_write_ha_state()

    self._unsub_soc_listener = async_track_state_change_event(
        self.hass,
        [self._soc_entity_id],
        _async_soc_state_changed
    )
    _LOGGER.info(
        "Battery recommendation sensor now listening to SoC changes from %s",
        self._soc_entity_id
    )
def _update_weighted_cost(self, old_soc: float, new_soc: float):
    """Update the weighted average cost of stored energy after charging.

    Args:
        old_soc: State of charge before the change, in percent.
        new_soc: State of charge after the change, in percent.

    The energy already stored and the energy just added are both derived
    from ``self._capacity`` and the SoC percentages. The added energy is
    priced at the coordinator's ``current`` price. Nothing changes when the
    coordinator has no data or no current price.
    """
    # This runs from a SoC state-change callback, so the coordinator may not
    # hold any data yet.
    data = self.coordinator.data
    if not data:
        return
    current_price = data.get("current")
    if current_price is None:
        return  # Cannot calculate without price

    # Energy = (SoC / 100) * Capacity
    energy_old = (old_soc / 100.0) * self._capacity
    energy_added = ((new_soc - old_soc) / 100.0) * self._capacity

    # If battery was empty OR stored price is uninitialized (0.0),
    # take the new price as the baseline.
    if energy_old <= 0.1 or self._stored_energy_price == 0.0:
        self._stored_energy_price = current_price
    else:
        # Weighted average:
        #   (old_kWh * old_price + added_kWh * current_price) / (old_kWh + added_kWh)
        total_value = (energy_old * self._stored_energy_price) + (energy_added * current_price)
        total_energy = energy_old + energy_added
        if total_energy > 0:
            self._stored_energy_price = total_value / total_energy

    _LOGGER.debug(
        "Updated stored energy price: %.4f PLN/kWh (Added %.2f kWh @ %.2f)",
        self._stored_energy_price, energy_added, current_price
    )
async def async_will_remove_from_hass(self) -> None:
    """Stop listening to SoC state changes when the entity is removed."""
    await super().async_will_remove_from_hass()

    unsubscribe = self._unsub_soc_listener
    if not unsubscribe:
        return
    unsubscribe()
    self._unsub_soc_listener = None
    _LOGGER.debug("Unsubscribed from SoC state changes")
@property
def name(self) -> str:
    """Give the fixed display name of the recommendation sensor."""
    sensor_name = "Pstryk Battery Recommendation"
    return sensor_name
@property
def unique_id(self) -> str:
    """Compose the unique ID from the integration domain."""
    return "{}_battery_recommendation".format(DOMAIN)
@property
def device_info(self):
    """Describe the "Pstryk Energy" device this sensor is attached to."""
    identifiers = {(DOMAIN, "pstryk_energy")}
    version = get_integration_version(self.hass)
    return dict(
        identifiers=identifiers,
        name="Pstryk Energy",
        manufacturer="Pstryk",
        model="Energy Price Monitor",
        sw_version=version,
    )
def _get_current_soc(self) -> float | None:
"""Get current SoC from configured entity."""
if not self._soc_entity_id:
return None
state = self.hass.states.get(self._soc_entity_id)
if state is None or state.state in ("unknown", "unavailable"):
return None
try:
return float(state.state)
except (ValueError, TypeError):
_LOGGER.warning("Cannot parse SoC value from %s: %s", self._soc_entity_id, state.state)
return None
def _get_prices_with_hours(self) -> list[dict]:
    """Convert the coordinator's price entries into local-time records.

    Each returned dict has the keys ``hour``, ``price``, ``datetime`` and
    ``date``, all derived from the entry's ``start`` converted to local
    time. Entries without a start or price, or whose start cannot be
    parsed, are skipped. Errors on a single entry are logged at debug level.
    """
    data = self.coordinator.data
    if not data:
        return []
    raw_entries = data.get("prices", [])
    if not raw_entries:
        return []

    records = []
    for entry in raw_entries:
        try:
            start_text = entry.get("start", "")
            value = entry.get("price")
            if not start_text or value is None:
                continue
            parsed = dt_util.parse_datetime(start_text)
            if not parsed:
                continue
            local_start = dt_util.as_local(parsed)
            records.append({
                "hour": local_start.hour,
                "price": value,
                "datetime": local_start,
                "date": local_start.date()
            })
        except Exception as err:
            _LOGGER.debug("Error parsing price entry: %s", err)
    return records
def _calculate_recommendation(self) -> tuple[str, dict]:
    """Calculate battery recommendation based on prices and SoC.

    Steps:
      1. Classify today's hours: the N cheapest hours are charge hours,
         hours priced at or above ``avg_charge_price * discharge_multiplier``
         are discharge hours, the rest are standby hours.
      2. Add mid-day (11-14) charge hours when charging then is profitable
         against the evening peak (15-20) after round-trip losses.
      3. If the SoC is known, simulate it forward to detect a drop below
         ``min_soc`` (emergency charge) and check whether the battery must
         be topped up before an upcoming high-price block (pre-peak charge).
      4. Pick the final state and a human-readable reason.

    Returns:
        ``(state, attrs)`` where ``state`` is ``STATE_CHARGE``,
        ``STATE_DISCHARGE`` or ``STATE_STANDBY`` and ``attrs`` is the
        attribute dict describing the decision. With fewer than 12 price
        entries overall, or for today, the state is ``STATE_STANDBY``.
    """
    now = dt_util.now()
    current_hour = now.hour
    current_soc = self._get_current_soc()
    prices = self._get_prices_with_hours()

    # Default attributes
    attrs = {
        "current_price": None,
        "current_soc": current_soc,
        "stored_energy_price": round(self._stored_energy_price, 4),
        "avg_charge_price": None,
        "discharge_threshold": None,
        "charge_hours": [],
        "discharge_hours": [],
        "standby_hours": [],
        "soc_forecast": [],
        "emergency_charge": False,
        "pre_peak_charge": False,
        "critical_hour": None,
        "reason": "No data available",
        "next_state_change": None,
        "next_state": None,
        "prices_horizon": "unknown",
        "config": {
            "charge_hours_count": self._charge_hours_count,
            "discharge_multiplier": self._discharge_multiplier,
            "min_soc": self._min_soc,
            "charge_rate": self._charge_rate,
            "discharge_rate": self._discharge_rate,
            "capacity": self._capacity,
            "soc_entity": self._soc_entity_id,
        },
        "last_updated": now.strftime("%Y-%m-%d %H:%M:%S"),
    }

    if not prices or len(prices) < 12:
        return self.STATE_STANDBY, attrs

    # Get today's prices only for hour classification
    today = now.date()
    today_prices = [p for p in prices if p["date"] == today]
    if len(today_prices) < 12:
        attrs["reason"] = f"Insufficient price data for today ({len(today_prices)} hours)"
        return self.STATE_STANDBY, attrs

    # Round-trip efficiency factor (20% losses = multiply by 1.25 to break even)
    EFFICIENCY_FACTOR = 1.25

    # Time blocks used by the intra-day arbitrage check
    MIDDAY_VALLEY = set(range(11, 15))  # 11:00 - 14:59
    EVENING_PEAK = set(range(15, 21))   # 15:00 - 20:59

    def get_prices_for_hours(hours_set):
        """Return today's price records whose hour is in hours_set."""
        return [p for p in today_prices if p["hour"] in hours_set]

    def avg_price(price_list):
        """Return the mean price of the records, or 0 for an empty list."""
        if not price_list:
            return 0
        return sum(p["price"] for p in price_list) / len(price_list)

    midday_prices = get_prices_for_hours(MIDDAY_VALLEY)
    evening_peak_prices = get_prices_for_hours(EVENING_PEAK)
    avg_midday = avg_price(midday_prices)
    avg_evening_peak = avg_price(evening_peak_prices)

    # PRIMARY CHARGE: N cheapest hours of today
    sorted_by_price = sorted(today_prices, key=lambda x: x["price"])
    primary_charge_data = sorted_by_price[:self._charge_hours_count]
    charge_hours = set(p["hour"] for p in primary_charge_data)
    avg_charge_price = avg_price(primary_charge_data)

    # DISCHARGE THRESHOLD based on primary charge price
    discharge_threshold = avg_charge_price * self._discharge_multiplier

    # INTRA-DAY ARBITRAGE CHECK
    # If mid-day price * efficiency < evening peak price, it is profitable
    # to charge during mid-day and discharge in the evening.
    midday_arbitrage_profitable = False
    midday_charge_hours = set()
    if midday_prices and evening_peak_prices:
        # Average of the (up to) 3 cheapest hours in the mid-day valley
        sorted_midday = sorted(midday_prices, key=lambda x: x["price"])
        cheapest_midday = sorted_midday[:3]
        avg_cheapest_midday = avg_price(cheapest_midday)

        if avg_cheapest_midday * EFFICIENCY_FACTOR < avg_evening_peak:
            midday_arbitrage_profitable = True
            # Add every mid-day hour that is individually profitable
            for p in midday_prices:
                if p["price"] * EFFICIENCY_FACTOR < avg_evening_peak:
                    midday_charge_hours.add(p["hour"])
                    charge_hours.add(p["hour"])

    # DISCHARGE HOURS: price >= discharge_threshold AND not a charge hour
    discharge_hours = set(
        p["hour"] for p in today_prices
        if p["price"] >= discharge_threshold and p["hour"] not in charge_hours
    )

    # STANDBY HOURS = everything else
    all_hours = set(range(24))
    standby_hours = all_hours - charge_hours - discharge_hours

    # Store arbitrage info in attributes
    attrs["midday_arbitrage"] = {
        "profitable": midday_arbitrage_profitable,
        "midday_charge_hours": sorted(midday_charge_hours),
        "avg_midday_price": round(avg_midday, 4) if midday_prices else None,
        "avg_evening_peak": round(avg_evening_peak, 4) if evening_peak_prices else None,
        "breakeven_price": round(avg_midday * EFFICIENCY_FACTOR, 4) if midday_prices else None,
    }

    # Get current price
    current_price_data = next(
        (p for p in today_prices if p["hour"] == current_hour),
        None
    )
    current_price = current_price_data["price"] if current_price_data else None

    # Update attributes
    attrs.update({
        "current_price": current_price,
        "avg_charge_price": round(avg_charge_price, 4),
        "discharge_threshold": round(discharge_threshold, 4),
        "charge_hours": sorted(charge_hours),
        "discharge_hours": sorted(discharge_hours),
        "standby_hours": sorted(standby_hours),
        "prices_horizon": "48h" if len(prices) > 24 else "24h",
    })

    # SoC-based logic (if SoC available)
    emergency_charge = False
    pre_peak_charge = False
    critical_hour = None

    if current_soc is not None:
        # Simulate SoC forward to detect critical situations
        soc_forecast = self._simulate_soc_forward(
            current_hour, current_soc, charge_hours, discharge_hours
        )
        attrs["soc_forecast"] = soc_forecast[:12]  # Next 12 hours

        # Check for critical SoC drop (run regardless of current SoC).
        for entry in soc_forecast:
            if entry["soc"] < self._min_soc and entry["action"] != "charge":
                critical_hour = entry["hour"]
                # Check if there's a charge hour before the critical hour
                hours_until_critical = (critical_hour - current_hour) % 24
                has_charge_before = any(
                    (current_hour + i) % 24 in charge_hours
                    for i in range(hours_until_critical)
                )
                # If no scheduled charge saves us, trigger emergency
                if not has_charge_before:
                    emergency_charge = True
                break

        attrs["critical_hour"] = critical_hour
        attrs["emergency_charge"] = emergency_charge

        # --- FORWARD COVERAGE STRATEGY (Pre-Peak Charge) ---
        # Look ahead for high-price hours where we want to discharge and
        # make sure there is enough SoC to cover them.

        # 1. Identify target discharge hours after the current hour.
        future_discharge_hours = []

        # Find the current hour in the full price list.
        start_index = -1
        for idx, p in enumerate(prices):
            if p["date"] == today and p["hour"] == current_hour:
                start_index = idx
                break

        if start_index != -1:
            # Look at the next 18 hours, starting from the NEXT hour. If the
            # current hour were included and were itself above the
            # threshold, it would become the "peak start" and
            # time_until_peak would be 0, disabling pre-peak charging.
            lookahead_window = prices[start_index + 1 : start_index + 19]
            for p in lookahead_window:
                if p["price"] >= discharge_threshold:
                    future_discharge_hours.append(p)

        # 2. Calculate required capacity:
        #    required = high_hours * discharge_rate + min_soc
        if future_discharge_hours:
            first_discharge_hour = future_discharge_hours[0]

            # For simplicity, count all high hours within 12h of the first.
            high_hours_count = len([
                p for p in future_discharge_hours
                if (p["datetime"] - first_discharge_hour["datetime"]).total_seconds() < 12 * 3600
            ])
            required_soc = (high_hours_count * self._discharge_rate) + self._min_soc

            # 3. Gap analysis. A small buffer (+2 percentage points) on top
            # of the requirement gives hysteresis against flip-flopping.
            threshold_soc = required_soc + 2.0

            # Only plan coverage charging if the current price is low. "Low"
            # means below the discharge threshold, or at least 5% cheaper
            # than the cheapest hour of the peak being protected against.
            min_future_peak_price = min(p["price"] for p in future_discharge_hours)
            is_cheap_enough = False
            if current_price is not None:
                if current_price < discharge_threshold:
                    is_cheap_enough = True
                elif current_price < (min_future_peak_price * 0.95):
                    is_cheap_enough = True

            if current_soc < threshold_soc and is_cheap_enough:
                # There is a deficit and charging now is cheap enough.
                time_until_peak = (first_discharge_hour["datetime"] - now).total_seconds() / 3600
                if 0 < time_until_peak < 6:  # Peak is within 6 hours
                    # All hours between now and the peak, cheapest first
                    available_hours = prices[start_index : start_index + int(time_until_peak) + 1]
                    available_hours_sorted = sorted(available_hours, key=lambda x: x["price"])

                    # Hours of charging needed to close the gap
                    soc_deficit = threshold_soc - current_soc
                    hours_needed = max(1, math.ceil(soc_deficit / self._charge_rate))

                    # Charge now only if NOW is among the cheapest N hours
                    cheapest_pre_peak = available_hours_sorted[:hours_needed]
                    if any(p["hour"] == current_hour and p["date"] == today for p in cheapest_pre_peak):
                        pre_peak_charge = True
                        attrs["pre_peak_charge"] = True
                        attrs["reason"] = f"Forward Coverage: Charging for upcoming {high_hours_count}h peak (Target {threshold_soc:.0f}%)"
                        # Add to charge set for visualization consistency
                        charge_hours.add(current_hour)

    # Final decision
    # First check: a full battery never charges.
    if current_soc is not None and current_soc >= 99.5:  # Hysteresis for top-off
        if current_hour in discharge_hours:
            state = self.STATE_DISCHARGE
            reason = f"Battery full, discharging (price {current_price:.2f} >= threshold {discharge_threshold:.2f})"
        else:
            state = self.STATE_STANDBY
            reason = "Battery full (100%), waiting for discharge opportunity"
    elif emergency_charge:
        state = self.STATE_CHARGE
        reason = f"EMERGENCY: SoC will drop below {self._min_soc}% at {critical_hour}:00"
    elif pre_peak_charge:
        state = self.STATE_CHARGE
        # Keep the forward-coverage reason stored above. `reason` must be
        # bound here because it is written back to attrs below.
        reason = attrs["reason"]
    elif current_hour in charge_hours:
        state = self.STATE_CHARGE
        # Distinguish a mid-day arbitrage hour from a primary cheap hour
        if current_hour in midday_charge_hours:
            reason = f"Mid-day arbitrage charge (price {current_price:.2f} profitable vs evening peak {avg_evening_peak:.2f})"
        else:
            reason = f"Cheapest hour (price {current_price:.2f} PLN/kWh in top {self._charge_hours_count} lowest)"
    elif current_hour in discharge_hours:
        if current_soc is not None and current_soc <= self._min_soc:
            state = self.STATE_STANDBY
            reason = f"Would discharge but SoC ({current_soc:.0f}%) at minimum"
        else:
            state = self.STATE_DISCHARGE
            reason = f"Price {current_price:.2f} >= threshold {discharge_threshold:.2f}"
    else:
        state = self.STATE_STANDBY
        reason = "Price between thresholds"

    attrs["reason"] = reason

    # Find next state change
    next_change = self._find_next_state_change(
        current_hour, state, charge_hours, discharge_hours
    )
    if next_change:
        attrs["next_state_change"] = f"{next_change['hour']:02d}:00"
        attrs["next_state"] = next_change["state"]

    return state, attrs
def _simulate_soc_forward(
self,
from_hour: int,
start_soc: float,
charge_hours: set,
discharge_hours: set
) -> list[dict]:
"""Simulate SoC for next 24 hours."""
forecast = []
soc = start_soc
for i in range(24):
hour = (from_hour + i) % 24
if hour in charge_hours:
# Charging: use configured charge rate, cap at 100
soc = min(100, soc + self._charge_rate)
action = "charge"
elif hour in discharge_hours:
# Discharging: use configured discharge rate, floor at 0
soc = max(0, soc - self._discharge_rate)
action = "discharge"
else:
# Standby: minimal drain (base consumption ~2%/h)
soc = max(0, soc - 2)
action = "standby"
forecast.append({
"hour": hour,
"soc": round(soc, 1),
"action": action
})
return forecast
def _find_next_state_change(
self,
current_hour: int,
current_state: str,
charge_hours: set,
discharge_hours: set
) -> dict | None:
"""Find when the next state change will occur."""
for i in range(1, 25):
hour = (current_hour + i) % 24
if hour in charge_hours:
next_state = self.STATE_CHARGE
elif hour in discharge_hours:
next_state = self.STATE_DISCHARGE
else:
next_state = self.STATE_STANDBY
if next_state != current_state:
return {"hour": hour, "state": next_state}
return None
@property
def native_value(self) -> str:
    """Give the state part of the freshly calculated recommendation."""
    recommendation, _unused_attrs = self._calculate_recommendation()
    return recommendation
@property
def extra_state_attributes(self) -> dict:
    """Give the attribute part of the freshly calculated recommendation."""
    _unused_state, details = self._calculate_recommendation()
    return details
@property
def available(self) -> bool:
    """Tell whether the coordinator last updated successfully and holds data."""
    coordinator = self.coordinator
    update_ok = coordinator.last_update_success
    if not update_ok:
        return update_ok
    return coordinator.data is not None