"""Pstryk Energy integration."""
import json
import logging
from datetime import timedelta

from homeassistant.components import mqtt, persistent_notification
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.event import async_track_point_in_time
from homeassistant.util import dt as dt_util

from .mqtt_publisher import PstrykMqttPublisher
from .services import async_setup_services, async_unload_services
from .const import (
    DOMAIN,
    CONF_MQTT_ENABLED,
    CONF_MQTT_TOPIC_BUY,
    CONF_MQTT_TOPIC_SELL,
    DEFAULT_MQTT_TOPIC_BUY,
    DEFAULT_MQTT_TOPIC_SELL,
)

_LOGGER = logging.getLogger(__name__)


async def async_setup(hass: HomeAssistant, config: dict) -> bool:
    """Set up hass.data structure and services."""
    hass.data.setdefault(DOMAIN, {})
    # Set up services
    await async_setup_services(hass)
    return True


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Store API key and forward to sensor platform."""
    hass.data[DOMAIN].setdefault(entry.entry_id, {})["api_key"] = entry.data.get("api_key")
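
    # async_reload_entry unloads this entry and runs async_setup_entry again after an
    # options change, so the guard below avoids stacking duplicate update listeners.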
    # Register update listener for option changes - only if not already registered
    if not entry.update_listeners:
        entry.async_on_unload(entry.add_update_listener(async_reload_entry))

    await hass.config_entries.async_forward_entry_setups(entry, ["sensor"])
    _LOGGER.debug("Pstryk entry setup: %s", entry.entry_id)

    # Set up the MQTT publisher if enabled
    mqtt_enabled = entry.options.get(CONF_MQTT_ENABLED, False)
    mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
    mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)

    if mqtt_enabled:
        # Check if MQTT is available
        if not hass.services.has_service("mqtt", "publish"):
            _LOGGER.error("MQTT integration is not enabled. Cannot set up EVCC bridge.")
            # Display persistent notification to user
            persistent_notification.async_create(
                hass,
                "MQTT integration is not enabled. EVCC MQTT Bridge for Pstryk Energy "
                "cannot function. Please configure MQTT integration in Home Assistant.",
                title="Pstryk Energy MQTT Error",
                notification_id=f"{DOMAIN}_mqtt_error_{entry.entry_id}",
            )
            # Still return True to allow the rest of the integration to work
            return True

        # Create and store the MQTT publisher
        mqtt_publisher = PstrykMqttPublisher(
            hass,
            entry.entry_id,
            mqtt_topic_buy,
            mqtt_topic_sell,
        )
        hass.data[DOMAIN][f"{entry.entry_id}_mqtt"] = mqtt_publisher

        # We need to wait until sensors are fully set up
        async def start_mqtt_publisher(_):
            """Start the MQTT publisher after a short delay to ensure coordinators are ready."""
            await mqtt_publisher.async_initialize()
            await mqtt_publisher.schedule_periodic_updates(interval_minutes=5)
            # Also start automatic force_retain to ensure messages stay in MQTT
            await setup_automatic_retain(
                hass,
                entry.entry_id,
                mqtt_topic_buy,
                mqtt_topic_sell,
            )
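
        # Starting the publisher in a background task keeps config entry setup from blocking
        # while the price coordinators created by the sensor platform finish their first refresh.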
        # Schedule the initialization to happen shortly after setup is complete
        hass.async_create_task(start_mqtt_publisher(None))
        _LOGGER.info("EVCC MQTT Bridge enabled for Pstryk Energy, publishing to %s and %s",
                     mqtt_topic_buy, mqtt_topic_sell)

    return True


async def setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours=168):
    """Set up automatic republishing of MQTT messages to ensure they remain retained."""
    _LOGGER.info("Setting up automatic MQTT message retention for buy topic %s and sell topic %s",
                 buy_topic, sell_topic)

    # Get buy and sell coordinators
    buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy")
    sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell")

    if not buy_coordinator or not buy_coordinator.data or not sell_coordinator or not sell_coordinator.data:
        _LOGGER.error("No data available for entry %s, will retry later", entry_id)
        # Schedule a retry in 30 seconds
        now = dt_util.now()
        next_run = now + timedelta(seconds=30)
        async_track_point_in_time(
            hass,
            lambda _: hass.async_create_task(
                setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours)
            ),
            dt_util.as_utc(next_run),
        )
        return

    # Get or create MQTT publisher
    mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt")
    if not mqtt_publisher:
        mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic)
        await mqtt_publisher.async_initialize()

    # Format prices
    buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
    sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
    if not buy_prices or not sell_prices:
        _LOGGER.error("No price data available to publish for automatic retain")
        return

    # Sort prices and create payloads
    buy_prices.sort(key=lambda x: x["start"])
    sell_prices.sort(key=lambda x: x["start"])
    buy_payload = json.dumps(buy_prices)
    sell_payload = json.dumps(sell_prices)
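    # NOTE: the exact entry schema comes from PstrykMqttPublisher._format_prices_for_evcc; each
    # element is assumed to carry at least a "start" timestamp (used for the sort above) plus the
    # price value expected by EVCC's custom tariff source.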

    # Publish immediately with retain flag
    await hass.services.async_call(
        "mqtt",
        "publish",
        {
            "topic": buy_topic,
            "payload": buy_payload,
            "qos": 1,
            "retain": True,
        },
        blocking=True,
    )
    await hass.services.async_call(
        "mqtt",
        "publish",
        {
            "topic": sell_topic,
            "payload": sell_payload,
            "qos": 1,
            "retain": True,
        },
        blocking=True,
    )
    _LOGGER.info("Published initial retained messages for buy and sell topics")

    # Set up recurring republish every 5 minutes to ensure retention
    now = dt_util.now()
    retain_key = f"{entry_id}_auto_retain"

    # Cancel any existing task
    if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]):
        hass.data[DOMAIN][retain_key]()
        hass.data[DOMAIN].pop(retain_key, None)

    # Function to republish data periodically
    async def republish_retain_periodic(now=None):
        """Republish MQTT message with retain flag periodically."""
        try:
            # Get fresh data
            if buy_coordinator.data and sell_coordinator.data:
                buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
                sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
                if buy_prices and sell_prices:
                    buy_prices.sort(key=lambda x: x["start"])
                    sell_prices.sort(key=lambda x: x["start"])
                    buy_payload = json.dumps(buy_prices)
                    sell_payload = json.dumps(sell_prices)
                    # Publish with retain flag
                    await hass.services.async_call(
                        "mqtt",
                        "publish",
                        {
                            "topic": buy_topic,
                            "payload": buy_payload,
                            "qos": 1,
                            "retain": True,
                        },
                        blocking=True,
                    )
                    await hass.services.async_call(
                        "mqtt",
                        "publish",
                        {
                            "topic": sell_topic,
                            "payload": sell_payload,
                            "qos": 1,
                            "retain": True,
                        },
                        blocking=True,
                    )
                    _LOGGER.debug("Auto-republished retained messages to buy and sell topics")

            # Schedule next run
            next_run = dt_util.now() + timedelta(minutes=5)
            hass.data[DOMAIN][retain_key] = async_track_point_in_time(
                hass, republish_retain_periodic, dt_util.as_utc(next_run)
            )
        except Exception as e:
            _LOGGER.error("Error in automatic MQTT retain process: %s", str(e))
            # Try to reschedule anyway
            next_run = dt_util.now() + timedelta(minutes=5)
            hass.data[DOMAIN][retain_key] = async_track_point_in_time(
                hass, republish_retain_periodic, dt_util.as_utc(next_run)
            )

    # Schedule first run
    next_run = now + timedelta(minutes=5)
    hass.data[DOMAIN][retain_key] = async_track_point_in_time(
        hass, republish_retain_periodic, dt_util.as_utc(next_run)
    )
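    # async_track_point_in_time returns an unsubscribe callback; storing it under retain_key
    # lets _cleanup_coordinators cancel the pending republish when the entry is unloaded.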
    _LOGGER.info(
        "Automatic MQTT message retention activated for buy and sell topics (every 5 minutes)"
    )


async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Clean up coordinators and cancel scheduled tasks."""
    # Clean up auto retain task
    retain_key = f"{entry.entry_id}_auto_retain"
    if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]):
        hass.data[DOMAIN][retain_key]()
        hass.data[DOMAIN].pop(retain_key, None)

    # Clean up MQTT publisher if it exists
    mqtt_publisher = hass.data[DOMAIN].get(f"{entry.entry_id}_mqtt")
    if mqtt_publisher:
        _LOGGER.debug("Cleaning up MQTT publisher for entry %s", entry.entry_id)
        mqtt_publisher.unsubscribe()
        hass.data[DOMAIN].pop(f"{entry.entry_id}_mqtt", None)

    # Clean up coordinators
    for price_type in ("buy", "sell"):
        key = f"{entry.entry_id}_{price_type}"
        coordinator = hass.data[DOMAIN].get(key)
        if coordinator:
            _LOGGER.debug("Cleaning up %s coordinator for entry %s", price_type, entry.entry_id)
            # Cancel scheduled updates
            if hasattr(coordinator, "_unsub_hourly") and coordinator._unsub_hourly:
                coordinator._unsub_hourly()
                coordinator._unsub_hourly = None
            if hasattr(coordinator, "_unsub_midnight") and coordinator._unsub_midnight:
                coordinator._unsub_midnight()
                coordinator._unsub_midnight = None
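            # _unsub_hourly and _unsub_midnight are assumed to be unsubscribe callbacks the
            # coordinator stored when scheduling its hourly and midnight refreshes (set up in
            # the sensor platform); the hasattr() checks keep this safe if they were never set.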
            # Remove from hass data
            hass.data[DOMAIN].pop(key, None)


async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload sensor platform and clear data."""
    # First cancel coordinators' scheduled updates
    await _cleanup_coordinators(hass, entry)

    # Then unload the platform
    unload_ok = await hass.config_entries.async_forward_entry_unload(entry, "sensor")

    # Finally clean up data
    if unload_ok:
        if entry.entry_id in hass.data[DOMAIN]:
            hass.data[DOMAIN].pop(entry.entry_id)
        # Clean up any remaining components
        for key in list(hass.data[DOMAIN].keys()):
            if key.startswith(f"{entry.entry_id}_"):
                hass.data[DOMAIN].pop(key, None)

        # If this is the last entry, unload services
        entries = hass.config_entries.async_entries(DOMAIN)
        if len(entries) <= 1:  # This is the last or only entry
            await async_unload_services(hass)

    return unload_ok


async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Reload the config entry when options change."""
    await async_unload_entry(hass, entry)
    await async_setup_entry(hass, entry)