From 8725aaafe4c8e5aec3675ab81ad2d40a8238fe3d Mon Sep 17 00:00:00 2001 From: balgerion <133121849+balgerion@users.noreply.github.com> Date: Mon, 9 Jun 2025 13:53:32 +0200 Subject: [PATCH] Update services.py --- custom_components/pstryk/services.py | 204 ++++++--------------------- 1 file changed, 40 insertions(+), 164 deletions(-) diff --git a/custom_components/pstryk/services.py b/custom_components/pstryk/services.py index 0cb1fdd..06f6171 100644 --- a/custom_components/pstryk/services.py +++ b/custom_components/pstryk/services.py @@ -1,7 +1,6 @@ """Services for Pstryk Energy integration.""" import logging import voluptuous as vol -import json from homeassistant.core import HomeAssistant, ServiceCall, callback from homeassistant.helpers import config_validation as cv from homeassistant.helpers.typing import ConfigType @@ -11,6 +10,7 @@ from homeassistant.helpers.event import async_track_point_in_time from datetime import timedelta from homeassistant.util import dt as dt_util +from .mqtt_common import publish_mqtt_prices, setup_periodic_mqtt_publish from .const import ( DOMAIN, DEFAULT_MQTT_TOPIC_BUY, @@ -79,46 +79,12 @@ async def async_setup_services(hass: HomeAssistant) -> None: # Publish for all entries or the specified one for entry in config_entries: - key = f"{entry.entry_id}_mqtt" - mqtt_publisher = hass.data[DOMAIN].get(key) + # Get topics from options or use override/default + mqtt_topic_buy = topic_buy_override or entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY) + mqtt_topic_sell = topic_sell_override or entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL) - # If publisher not found, create a temporary one - if not mqtt_publisher: - # Check if we have buy coordinator - buy_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_buy") - sell_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_sell") - if not buy_coordinator or not sell_coordinator: - _LOGGER.error("Price coordinators not found for entry %s", entry.entry_id) - continue - - # Dynamically import the publisher class - from .mqtt_publisher import PstrykMqttPublisher - - # Get topic from options or use override/default - mqtt_topic_buy = topic_buy_override - mqtt_topic_sell = topic_sell_override - if not mqtt_topic_buy: - mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY) - if not mqtt_topic_sell: - mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL) - - # Create temporary publisher - mqtt_publisher = PstrykMqttPublisher( - hass, - entry.entry_id, - mqtt_topic_buy, - mqtt_topic_sell - ) - await mqtt_publisher.async_initialize() - else: - # Use the override topics if provided - if topic_buy_override: - mqtt_publisher.mqtt_topic_buy = topic_buy_override - if topic_sell_override: - mqtt_publisher.mqtt_topic_sell = topic_sell_override - - # Publish prices - success = await mqtt_publisher.publish_prices() + # Use common function to publish + success = await publish_mqtt_prices(hass, entry.entry_id, mqtt_topic_buy, mqtt_topic_sell) if success: _LOGGER.info("Manual MQTT publish to EVCC completed for entry %s", entry.entry_id) @@ -147,94 +113,25 @@ async def async_setup_services(hass: HomeAssistant) -> None: # Process all entries or just the specified one for entry in config_entries: - # Get MQTT publisher - mqtt_publisher = hass.data[DOMAIN].get(f"{entry.entry_id}_mqtt") + # Get topics from options or use overrides + mqtt_topic_buy = topic_buy_override or entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY) + mqtt_topic_sell = topic_sell_override or entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL) - # Get buy and sell coordinators - buy_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_buy") - sell_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_sell") + # First immediate publish + success = await publish_mqtt_prices(hass, entry.entry_id, mqtt_topic_buy, mqtt_topic_sell) - if not buy_coordinator or not buy_coordinator.data or not sell_coordinator or not sell_coordinator.data: - _LOGGER.error("No data available for entry %s", entry.entry_id) + if not success: + _LOGGER.error("Failed to publish initial retained messages for entry %s", entry.entry_id) continue - # If no publisher, create a temporary one - if not mqtt_publisher: - # Import publisher class - from .mqtt_publisher import PstrykMqttPublisher - - # Get topics from options or use overrides - mqtt_topic_buy = topic_buy_override or entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY) - mqtt_topic_sell = topic_sell_override or entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL) - - # Create publisher - mqtt_publisher = PstrykMqttPublisher( - hass, - entry.entry_id, - mqtt_topic_buy, - mqtt_topic_sell - ) - await mqtt_publisher.async_initialize() - else: - # Update topics if overrides provided - if topic_buy_override: - mqtt_publisher.mqtt_topic_buy = topic_buy_override - if topic_sell_override: - mqtt_publisher.mqtt_topic_sell = topic_sell_override - - # Format prices - buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy") - sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell") - - if not buy_prices or not sell_prices: - _LOGGER.error("No price data available to publish") - continue - - # Sort prices - buy_prices.sort(key=lambda x: x["start"]) - sell_prices.sort(key=lambda x: x["start"]) - - # Create payloads - buy_payload = json.dumps(buy_prices) - sell_payload = json.dumps(sell_prices) - - # Get topics - buy_topic = mqtt_publisher.mqtt_topic_buy - sell_topic = mqtt_publisher.mqtt_topic_sell - _LOGGER.info( - "Setting up scheduled retain for buy topic %s and sell topic %s for %d hours", - buy_topic, - sell_topic, + "Setting up scheduled retain for topics %s and %s for %d hours", + mqtt_topic_buy, + mqtt_topic_sell, retain_hours ) - # First immediate publish - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": buy_topic, - "payload": buy_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": sell_topic, - "payload": sell_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - # Schedule periodic re-publishing to ensure message stays retained + # Schedule periodic re-publishing now = dt_util.now() end_time = now + timedelta(hours=retain_hours) @@ -247,52 +144,31 @@ async def async_setup_services(hass: HomeAssistant) -> None: # Create a function that will republish and reschedule itself async def republish_retain(now=None): - # Publish with retain - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": buy_topic, - "payload": buy_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) + """Republish retained messages periodically.""" + success = await publish_mqtt_prices(hass, entry.entry_id, mqtt_topic_buy, mqtt_topic_sell) - await hass.services.async_call( - "mqtt", - "publish", - { - "topic": sell_topic, - "payload": sell_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - current_time = dt_util.now() - _LOGGER.debug( - "Re-published retained messages (will continue until %s)", - end_time.isoformat() - ) - - # If we've reached the end time, stop - if current_time >= end_time: - _LOGGER.info( - "Finished scheduled retain after %d hours", - retain_hours + if success: + current_time = dt_util.now() + _LOGGER.debug( + "Re-published retained messages (will continue until %s)", + end_time.strftime("%Y-%m-%d %H:%M:%S") + ) + + # If we've reached the end time, stop + if current_time >= end_time: + _LOGGER.info( + "Finished scheduled retain after %d hours", + retain_hours + ) + if retain_key in hass.data[DOMAIN]: + hass.data[DOMAIN].pop(retain_key, None) + return + + # Otherwise, schedule the next run in 1 hour + next_run = current_time + timedelta(hours=1) + hass.data[DOMAIN][retain_key] = async_track_point_in_time( + hass, republish_retain, dt_util.as_utc(next_run) ) - if retain_key in hass.data[DOMAIN]: - hass.data[DOMAIN].pop(retain_key, None) - return - - # Otherwise, schedule the next run in 1 hour - next_run = current_time + timedelta(hours=1) - hass.data[DOMAIN][retain_key] = async_track_point_in_time( - hass, republish_retain, dt_util.as_utc(next_run) - ) # Schedule first run in 1 hour next_run = now + timedelta(hours=1)