Create mqtt_common.py
This commit is contained in:
122
custom_components/pstryk/mqtt_common.py
Normal file
122
custom_components/pstryk/mqtt_common.py
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
"""Common MQTT functionality for Pstryk Energy integration."""
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from datetime import timedelta
|
||||||
|
from homeassistant.util import dt as dt_util
|
||||||
|
from homeassistant.helpers.event import async_track_point_in_time
|
||||||
|
|
||||||
|
from .const import DOMAIN
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic):
|
||||||
|
"""Publish prices to MQTT - common function used by all components."""
|
||||||
|
# Get coordinators
|
||||||
|
buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy")
|
||||||
|
sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell")
|
||||||
|
|
||||||
|
if not buy_coordinator or not sell_coordinator:
|
||||||
|
_LOGGER.debug("Price coordinators not found for entry %s (this is normal during startup)", entry_id)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not buy_coordinator.data or not sell_coordinator.data:
|
||||||
|
_LOGGER.debug("No price data available for entry %s (coordinators may still be loading)", entry_id)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get or create MQTT publisher
|
||||||
|
mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt")
|
||||||
|
if not mqtt_publisher:
|
||||||
|
from .mqtt_publisher import PstrykMqttPublisher
|
||||||
|
mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic)
|
||||||
|
await mqtt_publisher.async_initialize()
|
||||||
|
|
||||||
|
# Format prices
|
||||||
|
buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
|
||||||
|
sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
|
||||||
|
|
||||||
|
if not buy_prices or not sell_prices:
|
||||||
|
_LOGGER.error("No valid prices to publish")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Sort prices
|
||||||
|
buy_prices.sort(key=lambda x: x["start"])
|
||||||
|
sell_prices.sort(key=lambda x: x["start"])
|
||||||
|
|
||||||
|
# Create payloads
|
||||||
|
buy_payload = json.dumps(buy_prices)
|
||||||
|
sell_payload = json.dumps(sell_prices)
|
||||||
|
|
||||||
|
# Publish with retain flag
|
||||||
|
await hass.services.async_call(
|
||||||
|
"mqtt",
|
||||||
|
"publish",
|
||||||
|
{
|
||||||
|
"topic": buy_topic,
|
||||||
|
"payload": buy_payload,
|
||||||
|
"qos": 1,
|
||||||
|
"retain": True
|
||||||
|
},
|
||||||
|
blocking=True
|
||||||
|
)
|
||||||
|
|
||||||
|
await hass.services.async_call(
|
||||||
|
"mqtt",
|
||||||
|
"publish",
|
||||||
|
{
|
||||||
|
"topic": sell_topic,
|
||||||
|
"payload": sell_payload,
|
||||||
|
"qos": 1,
|
||||||
|
"retain": True
|
||||||
|
},
|
||||||
|
blocking=True
|
||||||
|
)
|
||||||
|
|
||||||
|
_LOGGER.info("Published prices to MQTT topics %s and %s", buy_topic, sell_topic)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def setup_periodic_mqtt_publish(hass, entry_id, buy_topic, sell_topic, interval_minutes=60):
|
||||||
|
"""Set up periodic MQTT publishing with automatic republishing."""
|
||||||
|
retain_key = f"{entry_id}_auto_retain"
|
||||||
|
|
||||||
|
# Cancel any existing task
|
||||||
|
if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]):
|
||||||
|
hass.data[DOMAIN][retain_key]()
|
||||||
|
hass.data[DOMAIN].pop(retain_key, None)
|
||||||
|
|
||||||
|
# Function to republish data periodically
|
||||||
|
async def republish_retain_periodic(now=None):
|
||||||
|
"""Republish MQTT message with retain flag periodically."""
|
||||||
|
try:
|
||||||
|
success = await publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic)
|
||||||
|
if success:
|
||||||
|
_LOGGER.debug("Auto-republished retained messages")
|
||||||
|
|
||||||
|
# Schedule next run
|
||||||
|
next_run = dt_util.now() + timedelta(minutes=interval_minutes)
|
||||||
|
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
|
||||||
|
hass, republish_retain_periodic, dt_util.as_utc(next_run)
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
_LOGGER.error("Error in automatic MQTT retain process: %s", str(e))
|
||||||
|
# Try to reschedule anyway
|
||||||
|
next_run = dt_util.now() + timedelta(minutes=interval_minutes)
|
||||||
|
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
|
||||||
|
hass, republish_retain_periodic, dt_util.as_utc(next_run)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publish immediately
|
||||||
|
await publish_mqtt_prices(hass, entry_id, buy_topic, sell_topic)
|
||||||
|
|
||||||
|
# Schedule first periodic run
|
||||||
|
next_run = dt_util.now() + timedelta(minutes=interval_minutes)
|
||||||
|
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
|
||||||
|
hass, republish_retain_periodic, dt_util.as_utc(next_run)
|
||||||
|
)
|
||||||
|
|
||||||
|
_LOGGER.info(
|
||||||
|
"Automatic MQTT publishing activated (every %d minutes)",
|
||||||
|
interval_minutes
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user