diff --git a/custom_components/pstryk/__init__.py b/custom_components/pstryk/__init__.py index 179c55f..e8343f9 100644 --- a/custom_components/pstryk/__init__.py +++ b/custom_components/pstryk/__init__.py @@ -2,13 +2,32 @@ import logging from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant -from .const import DOMAIN +from homeassistant.components import mqtt +from homeassistant.helpers.event import async_track_point_in_time +from datetime import timedelta +from homeassistant.util import dt as dt_util +import json + +from .mqtt_publisher import PstrykMqttPublisher +from .services import async_setup_services, async_unload_services +from .const import ( + DOMAIN, + CONF_MQTT_ENABLED, + CONF_MQTT_TOPIC_BUY, + CONF_MQTT_TOPIC_SELL, + DEFAULT_MQTT_TOPIC_BUY, + DEFAULT_MQTT_TOPIC_SELL +) _LOGGER = logging.getLogger(__name__) async def async_setup(hass: HomeAssistant, config: dict) -> bool: - """Only set up hass.data structure (no YAML config).""" + """Set up hass.data structure and services.""" hass.data.setdefault(DOMAIN, {}) + + # Set up services + await async_setup_services(hass) + return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: @@ -21,10 +40,221 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: await hass.config_entries.async_forward_entry_setup(entry, "sensor") _LOGGER.debug("Pstryk entry setup: %s", entry.entry_id) + + # Setup MQTT publisher if enabled + mqtt_enabled = entry.options.get(CONF_MQTT_ENABLED, False) + mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY) + mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL) + + if mqtt_enabled: + # Check if MQTT is available + if not hass.services.has_service("mqtt", "publish"): + _LOGGER.error("MQTT integration is not enabled. Cannot setup EVCC bridge.") + # Display persistent notification to user + hass.components.persistent_notification.create( + "MQTT integration is not enabled. EVCC MQTT Bridge for Pstryk Energy " + "cannot function. Please configure MQTT integration in Home Assistant.", + title="Pstryk Energy MQTT Error", + notification_id=f"{DOMAIN}_mqtt_error_{entry.entry_id}" + ) + # Still return True to allow the rest of the integration to work + return True + + # Create and store the MQTT publisher + mqtt_publisher = PstrykMqttPublisher( + hass, + entry.entry_id, + mqtt_topic_buy, + mqtt_topic_sell + ) + hass.data[DOMAIN][f"{entry.entry_id}_mqtt"] = mqtt_publisher + + # We need to wait until sensors are fully setup + async def start_mqtt_publisher(_): + """Start the MQTT publisher after a short delay to ensure coordinators are ready.""" + await mqtt_publisher.async_initialize() + await mqtt_publisher.schedule_periodic_updates(interval_minutes=5) + + # Also start automatic force_retain to ensure messages stay in MQTT + await setup_automatic_retain( + hass, + entry.entry_id, + mqtt_topic_buy, + mqtt_topic_sell + ) + + # Schedule the initialization to happen shortly after setup is complete + hass.async_create_task(start_mqtt_publisher(None)) + + _LOGGER.info("EVCC MQTT Bridge enabled for Pstryk Energy, publishing to %s and %s", + mqtt_topic_buy, mqtt_topic_sell) + return True +async def setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours=168): + """Set up automatic republishing of MQTT messages to ensure they remain retained.""" + _LOGGER.info("Setting up automatic MQTT message retention for buy topic %s and sell topic %s", + buy_topic, sell_topic) + + # Get buy and sell coordinators + buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy") + sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell") + + if not buy_coordinator or not buy_coordinator.data or not sell_coordinator or not sell_coordinator.data: + _LOGGER.error("No data available for entry %s, will retry later", entry_id) + # Schedule a retry in 30 seconds + now = dt_util.now() + next_run = now + timedelta(seconds=30) + hass.async_create_task( + async_track_point_in_time( + hass, + lambda _: setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours), + dt_util.as_utc(next_run) + ) + ) + return + + # Get or create MQTT publisher + mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt") + if not mqtt_publisher: + from .mqtt_publisher import PstrykMqttPublisher + mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic) + await mqtt_publisher.async_initialize() + + # Format prices + buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy") + sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell") + + if not buy_prices or not sell_prices: + _LOGGER.error("No price data available to publish for automatic retain") + return + + # Sort prices and create payloads + buy_prices.sort(key=lambda x: x["start"]) + sell_prices.sort(key=lambda x: x["start"]) + + buy_payload = json.dumps(buy_prices) + sell_payload = json.dumps(sell_prices) + + # Publish immediately with retain flag + await hass.services.async_call( + "mqtt", + "publish", + { + "topic": buy_topic, + "payload": buy_payload, + "qos": 1, + "retain": True + }, + blocking=True + ) + + await hass.services.async_call( + "mqtt", + "publish", + { + "topic": sell_topic, + "payload": sell_payload, + "qos": 1, + "retain": True + }, + blocking=True + ) + + _LOGGER.info("Published initial retained messages for buy and sell topics") + + # Set up recurring republish every 5 minutes to ensure retention + now = dt_util.now() + retain_key = f"{entry_id}_auto_retain" + + # Cancel any existing task + if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]): + hass.data[DOMAIN][retain_key]() + hass.data[DOMAIN].pop(retain_key, None) + + # Function to republish data periodically + async def republish_retain_periodic(now=None): + """Republish MQTT message with retain flag periodically.""" + try: + # Get fresh data + if buy_coordinator.data and sell_coordinator.data: + buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy") + sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell") + + if buy_prices and sell_prices: + buy_prices.sort(key=lambda x: x["start"]) + sell_prices.sort(key=lambda x: x["start"]) + + buy_payload = json.dumps(buy_prices) + sell_payload = json.dumps(sell_prices) + + # Publish with retain flag + await hass.services.async_call( + "mqtt", + "publish", + { + "topic": buy_topic, + "payload": buy_payload, + "qos": 1, + "retain": True + }, + blocking=True + ) + + await hass.services.async_call( + "mqtt", + "publish", + { + "topic": sell_topic, + "payload": sell_payload, + "qos": 1, + "retain": True + }, + blocking=True + ) + + _LOGGER.debug("Auto-republished retained messages to buy and sell topics") + + # Schedule next run + next_run = dt_util.now() + timedelta(minutes=5) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain_periodic, dt_util.as_utc(next_run) + ) + + except Exception as e: + _LOGGER.error("Error in automatic MQTT retain process: %s", str(e)) + # Try to reschedule anyway + next_run = dt_util.now() + timedelta(minutes=5) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain_periodic, dt_util.as_utc(next_run) + ) + + # Schedule first run + next_run = now + timedelta(minutes=5) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain_periodic, dt_util.as_utc(next_run) + ) + + _LOGGER.info( + "Automatic MQTT message retention activated for buy and sell topics (every 5 minutes)" + ) + async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None: """Clean up coordinators and cancel scheduled tasks.""" + # Clean up auto retain task + retain_key = f"{entry.entry_id}_auto_retain" + if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]): + hass.data[DOMAIN][retain_key]() + hass.data[DOMAIN].pop(retain_key, None) + + # Clean up MQTT publisher if exists + mqtt_publisher = hass.data[DOMAIN].get(f"{entry.entry_id}_mqtt") + if mqtt_publisher: + _LOGGER.debug("Cleaning up MQTT publisher for entry %s", entry.entry_id) + mqtt_publisher.unsubscribe() + hass.data[DOMAIN].pop(f"{entry.entry_id}_mqtt", None) + + # Clean up coordinators for price_type in ("buy", "sell"): key = f"{entry.entry_id}_{price_type}" coordinator = hass.data[DOMAIN].get(key) @@ -58,6 +288,11 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: if key.startswith(f"{entry.entry_id}_"): hass.data[DOMAIN].pop(key, None) + # If this is the last entry, unload services + entries = hass.config_entries.async_entries(DOMAIN) + if len(entries) <= 1: # This is the last or only entry + await async_unload_services(hass) + return unload_ok async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None: