diff --git a/custom_components/pstryk/__init__.py b/custom_components/pstryk/__init__.py index e8343f9..8eadd73 100644 --- a/custom_components/pstryk/__init__.py +++ b/custom_components/pstryk/__init__.py @@ -1,20 +1,21 @@ """Pstryk Energy integration.""" import logging +import asyncio from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.components import mqtt -from homeassistant.helpers.event import async_track_point_in_time from datetime import timedelta from homeassistant.util import dt as dt_util -import json from .mqtt_publisher import PstrykMqttPublisher +from .mqtt_common import setup_periodic_mqtt_publish from .services import async_setup_services, async_unload_services from .const import ( DOMAIN, CONF_MQTT_ENABLED, CONF_MQTT_TOPIC_BUY, CONF_MQTT_TOPIC_SELL, + CONF_MQTT_48H_MODE, DEFAULT_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_SELL ) @@ -45,6 +46,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: mqtt_enabled = entry.options.get(CONF_MQTT_ENABLED, False) mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY) mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL) + mqtt_48h_mode = entry.options.get(CONF_MQTT_48H_MODE, False) + + # Store 48h mode in hass.data for coordinators + hass.data[DOMAIN][f"{entry.entry_id}_mqtt_48h_mode"] = mqtt_48h_mode if mqtt_enabled: # Check if MQTT is available @@ -73,172 +78,42 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def start_mqtt_publisher(_): """Start the MQTT publisher after a short delay to ensure coordinators are ready.""" await mqtt_publisher.async_initialize() - await mqtt_publisher.schedule_periodic_updates(interval_minutes=5) - # Also start automatic force_retain to ensure messages stay in MQTT - await setup_automatic_retain( + # Wait for coordinators to be created by sensor platform + max_wait = 30 # Maximum 30 seconds wait + wait_interval = 1 # Check every second + waited = 0 + + while waited < max_wait: + buy_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_buy") + sell_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_sell") + + if buy_coordinator and sell_coordinator: + _LOGGER.debug("Coordinators ready, starting MQTT periodic publishing") + break + + await asyncio.sleep(wait_interval) + waited += wait_interval + else: + _LOGGER.warning("Coordinators not ready after %d seconds, MQTT publishing may fail", max_wait) + + # Use common function for periodic publishing + await setup_periodic_mqtt_publish( hass, entry.entry_id, - mqtt_topic_buy, - mqtt_topic_sell + mqtt_topic_buy, + mqtt_topic_sell, + interval_minutes=60 ) # Schedule the initialization to happen shortly after setup is complete hass.async_create_task(start_mqtt_publisher(None)) - _LOGGER.info("EVCC MQTT Bridge enabled for Pstryk Energy, publishing to %s and %s", - mqtt_topic_buy, mqtt_topic_sell) + _LOGGER.info("EVCC MQTT Bridge enabled for Pstryk Energy (48h mode: %s), publishing to %s and %s", + mqtt_48h_mode, mqtt_topic_buy, mqtt_topic_sell) return True -async def setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours=168): - """Set up automatic republishing of MQTT messages to ensure they remain retained.""" - _LOGGER.info("Setting up automatic MQTT message retention for buy topic %s and sell topic %s", - buy_topic, sell_topic) - - # Get buy and sell coordinators - buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy") - sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell") - - if not buy_coordinator or not buy_coordinator.data or not sell_coordinator or not sell_coordinator.data: - _LOGGER.error("No data available for entry %s, will retry later", entry_id) - # Schedule a retry in 30 seconds - now = dt_util.now() - next_run = now + timedelta(seconds=30) - hass.async_create_task( - async_track_point_in_time( - hass, - lambda _: setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours), - dt_util.as_utc(next_run) - ) - ) - return - - # Get or create MQTT publisher - mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt") - if not mqtt_publisher: - from .mqtt_publisher import PstrykMqttPublisher - mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic) - await mqtt_publisher.async_initialize() - - # Format prices - buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy") - sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell") - - if not buy_prices or not sell_prices: - _LOGGER.error("No price data available to publish for automatic retain") - return - - # Sort prices and create payloads - buy_prices.sort(key=lambda x: x["start"]) - sell_prices.sort(key=lambda x: x["start"]) - - buy_payload = json.dumps(buy_prices) - sell_payload = json.dumps(sell_prices) - - # Publish immediately with retain flag - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": buy_topic, - "payload": buy_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": sell_topic, - "payload": sell_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - _LOGGER.info("Published initial retained messages for buy and sell topics") - - # Set up recurring republish every 5 minutes to ensure retention - now = dt_util.now() - retain_key = f"{entry_id}_auto_retain" - - # Cancel any existing task - if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]): - hass.data[DOMAIN][retain_key]() - hass.data[DOMAIN].pop(retain_key, None) - - # Function to republish data periodically - async def republish_retain_periodic(now=None): - """Republish MQTT message with retain flag periodically.""" - try: - # Get fresh data - if buy_coordinator.data and sell_coordinator.data: - buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy") - sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell") - - if buy_prices and sell_prices: - buy_prices.sort(key=lambda x: x["start"]) - sell_prices.sort(key=lambda x: x["start"]) - - buy_payload = json.dumps(buy_prices) - sell_payload = json.dumps(sell_prices) - - # Publish with retain flag - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": buy_topic, - "payload": buy_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": sell_topic, - "payload": sell_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - _LOGGER.debug("Auto-republished retained messages to buy and sell topics") - - # Schedule next run - next_run = dt_util.now() + timedelta(minutes=5) - hass.data[DOMAIN][retain_key] = async_track_point_in_time( - hass, republish_retain_periodic, dt_util.as_utc(next_run) - ) - - except Exception as e: - _LOGGER.error("Error in automatic MQTT retain process: %s", str(e)) - # Try to reschedule anyway - next_run = dt_util.now() + timedelta(minutes=5) - hass.data[DOMAIN][retain_key] = async_track_point_in_time( - hass, republish_retain_periodic, dt_util.as_utc(next_run) - ) - - # Schedule first run - next_run = now + timedelta(minutes=5) - hass.data[DOMAIN][retain_key] = async_track_point_in_time( - hass, republish_retain_periodic, dt_util.as_utc(next_run) - ) - - _LOGGER.info( - "Automatic MQTT message retention activated for buy and sell topics (every 5 minutes)" - ) - async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None: """Clean up coordinators and cancel scheduled tasks.""" # Clean up auto retain task @@ -267,8 +142,14 @@ async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None if hasattr(coordinator, '_unsub_midnight') and coordinator._unsub_midnight: coordinator._unsub_midnight() coordinator._unsub_midnight = None + if hasattr(coordinator, '_unsub_afternoon') and coordinator._unsub_afternoon: + coordinator._unsub_afternoon() + coordinator._unsub_afternoon = None # Remove from hass data hass.data[DOMAIN].pop(key, None) + + # Clean up mqtt 48h mode flag + hass.data[DOMAIN].pop(f"{entry.entry_id}_mqtt_48h_mode", None) async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload sensor platform and clear data."""