From 88e6dd589a188396b1401160d3e8365c0e1782ba Mon Sep 17 00:00:00 2001 From: balgerion <133121849+balgerion@users.noreply.github.com> Date: Mon, 9 Jun 2025 13:56:23 +0200 Subject: [PATCH] Create mqtt_common.py --- custom_components/pstryk/mqtt_common.py | 122 ++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 custom_components/pstryk/mqtt_common.py diff --git a/custom_components/pstryk/mqtt_common.py b/custom_components/pstryk/mqtt_common.py new file mode 100644 index 0000000..577b24d --- /dev/null +++ b/custom_components/pstryk/mqtt_common.py @@ -0,0 +1,122 @@ +"""Common MQTT functionality for Pstryk Energy integration.""" +import logging +import json +from datetime import timedelta +from homeassistant.util import dt as dt_util +from homeassistant.helpers.event import async_track_point_in_time + +from .const import DOMAIN + +_LOGGER = logging.getLogger(__name__) + + +async def publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic): + """Publish prices to MQTT - common function used by all components.""" + # Get coordinators + buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy") + sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell") + + if not buy_coordinator or not sell_coordinator: + _LOGGER.debug("Price coordinators not found for entry %s (this is normal during startup)", entry_id) + return False + + if not buy_coordinator.data or not sell_coordinator.data: + _LOGGER.debug("No price data available for entry %s (coordinators may still be loading)", entry_id) + return False + + # Get or create MQTT publisher + mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt") + if not mqtt_publisher: + from .mqtt_publisher import PstrykMqttPublisher + mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic) + await mqtt_publisher.async_initialize() + + # Format prices + buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy") + sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell") + + if not buy_prices or not sell_prices: + _LOGGER.error("No valid prices to publish") + return False + + # Sort prices + buy_prices.sort(key=lambda x: x["start"]) + sell_prices.sort(key=lambda x: x["start"]) + + # Create payloads + buy_payload = json.dumps(buy_prices) + sell_payload = json.dumps(sell_prices) + + # Publish with retain flag + await hass.services.async_call( + "mqtt", + "publish", + { + "topic": buy_topic, + "payload": buy_payload, + "qos": 1, + "retain": True + }, + blocking=True + ) + + await hass.services.async_call( + "mqtt", + "publish", + { + "topic": sell_topic, + "payload": sell_payload, + "qos": 1, + "retain": True + }, + blocking=True + ) + + _LOGGER.info("Published prices to MQTT topics %s and %s", buy_topic, sell_topic) + return True + + +async def setup_periodic_mqtt_publish(hass, entry_id, buy_topic, sell_topic, interval_minutes=60): + """Set up periodic MQTT publishing with automatic republishing.""" + retain_key = f"{entry_id}_auto_retain" + + # Cancel any existing task + if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]): + hass.data[DOMAIN][retain_key]() + hass.data[DOMAIN].pop(retain_key, None) + + # Function to republish data periodically + async def republish_retain_periodic(now=None): + """Republish MQTT message with retain flag periodically.""" + try: + success = await publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic) + if success: + _LOGGER.debug("Auto-republished retained messages") + + # Schedule next run + next_run = dt_util.now() + timedelta(minutes=interval_minutes) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain_periodic, dt_util.as_utc(next_run) + ) + + except Exception as e: + _LOGGER.error("Error in automatic MQTT retain process: %s", str(e)) + # Try to reschedule anyway + next_run = dt_util.now() + timedelta(minutes=interval_minutes) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain_periodic, dt_util.as_utc(next_run) + ) + + # Publish immediately + await publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic) + + # Schedule first periodic run + next_run = dt_util.now() + timedelta(minutes=interval_minutes) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain_periodic, dt_util.as_utc(next_run) + ) + + _LOGGER.info( + "Automatic MQTT publishing activated (every %d minutes)", + interval_minutes + )