Files
ha_pstryk/custom_components/pstryk/mqtt_common.py
2025-06-09 13:56:23 +02:00

123 lines
4.3 KiB
Python

"""Common MQTT functionality for Pstryk Energy integration."""
import logging
import json
from datetime import timedelta
from homeassistant.util import dt as dt_util
from homeassistant.helpers.event import async_track_point_in_time
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
async def publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic):
"""Publish prices to MQTT - common function used by all components."""
# Get coordinators
buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy")
sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell")
if not buy_coordinator or not sell_coordinator:
_LOGGER.debug("Price coordinators not found for entry %s (this is normal during startup)", entry_id)
return False
if not buy_coordinator.data or not sell_coordinator.data:
_LOGGER.debug("No price data available for entry %s (coordinators may still be loading)", entry_id)
return False
# Get or create MQTT publisher
mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt")
if not mqtt_publisher:
from .mqtt_publisher import PstrykMqttPublisher
mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic)
await mqtt_publisher.async_initialize()
# Format prices
buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
if not buy_prices or not sell_prices:
_LOGGER.error("No valid prices to publish")
return False
# Sort prices
buy_prices.sort(key=lambda x: x["start"])
sell_prices.sort(key=lambda x: x["start"])
# Create payloads
buy_payload = json.dumps(buy_prices)
sell_payload = json.dumps(sell_prices)
# Publish with retain flag
await hass.services.async_call(
"mqtt",
"publish",
{
"topic": buy_topic,
"payload": buy_payload,
"qos": 1,
"retain": True
},
blocking=True
)
await hass.services.async_call(
"mqtt",
"publish",
{
"topic": sell_topic,
"payload": sell_payload,
"qos": 1,
"retain": True
},
blocking=True
)
_LOGGER.info("Published prices to MQTT topics %s and %s", buy_topic, sell_topic)
return True
async def setup_periodic_mqtt_publish(hass, entry_id, buy_topic, sell_topic, interval_minutes=60):
"""Set up periodic MQTT publishing with automatic republishing."""
retain_key = f"{entry_id}_auto_retain"
# Cancel any existing task
if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]):
hass.data[DOMAIN][retain_key]()
hass.data[DOMAIN].pop(retain_key, None)
# Function to republish data periodically
async def republish_retain_periodic(now=None):
"""Republish MQTT message with retain flag periodically."""
try:
success = await publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic)
if success:
_LOGGER.debug("Auto-republished retained messages")
# Schedule next run
next_run = dt_util.now() + timedelta(minutes=interval_minutes)
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
hass, republish_retain_periodic, dt_util.as_utc(next_run)
)
except Exception as e:
_LOGGER.error("Error in automatic MQTT retain process: %s", str(e))
# Try to reschedule anyway
next_run = dt_util.now() + timedelta(minutes=interval_minutes)
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
hass, republish_retain_periodic, dt_util.as_utc(next_run)
)
# Publish immediately
await publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic)
# Schedule first periodic run
next_run = dt_util.now() + timedelta(minutes=interval_minutes)
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
hass, republish_retain_periodic, dt_util.as_utc(next_run)
)
_LOGGER.info(
"Automatic MQTT publishing activated (every %d minutes)",
interval_minutes
)