Update __init__.py
This commit is contained in:
@ -1,20 +1,21 @@
|
|||||||
"""Pstryk Energy integration."""
|
"""Pstryk Energy integration."""
|
||||||
import logging
|
import logging
|
||||||
|
import asyncio
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
from homeassistant.components import mqtt
|
from homeassistant.components import mqtt
|
||||||
from homeassistant.helpers.event import async_track_point_in_time
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
import json
|
|
||||||
|
|
||||||
from .mqtt_publisher import PstrykMqttPublisher
|
from .mqtt_publisher import PstrykMqttPublisher
|
||||||
|
from .mqtt_common import setup_periodic_mqtt_publish
|
||||||
from .services import async_setup_services, async_unload_services
|
from .services import async_setup_services, async_unload_services
|
||||||
from .const import (
|
from .const import (
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
CONF_MQTT_ENABLED,
|
CONF_MQTT_ENABLED,
|
||||||
CONF_MQTT_TOPIC_BUY,
|
CONF_MQTT_TOPIC_BUY,
|
||||||
CONF_MQTT_TOPIC_SELL,
|
CONF_MQTT_TOPIC_SELL,
|
||||||
|
CONF_MQTT_48H_MODE,
|
||||||
DEFAULT_MQTT_TOPIC_BUY,
|
DEFAULT_MQTT_TOPIC_BUY,
|
||||||
DEFAULT_MQTT_TOPIC_SELL
|
DEFAULT_MQTT_TOPIC_SELL
|
||||||
)
|
)
|
||||||
@ -45,6 +46,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
mqtt_enabled = entry.options.get(CONF_MQTT_ENABLED, False)
|
mqtt_enabled = entry.options.get(CONF_MQTT_ENABLED, False)
|
||||||
mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
|
mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
|
||||||
mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
|
mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
|
||||||
|
mqtt_48h_mode = entry.options.get(CONF_MQTT_48H_MODE, False)
|
||||||
|
|
||||||
|
# Store 48h mode in hass.data for coordinators
|
||||||
|
hass.data[DOMAIN][f"{entry.entry_id}_mqtt_48h_mode"] = mqtt_48h_mode
|
||||||
|
|
||||||
if mqtt_enabled:
|
if mqtt_enabled:
|
||||||
# Check if MQTT is available
|
# Check if MQTT is available
|
||||||
@ -73,172 +78,42 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
async def start_mqtt_publisher(_):
|
async def start_mqtt_publisher(_):
|
||||||
"""Start the MQTT publisher after a short delay to ensure coordinators are ready."""
|
"""Start the MQTT publisher after a short delay to ensure coordinators are ready."""
|
||||||
await mqtt_publisher.async_initialize()
|
await mqtt_publisher.async_initialize()
|
||||||
await mqtt_publisher.schedule_periodic_updates(interval_minutes=5)
|
|
||||||
|
|
||||||
# Also start automatic force_retain to ensure messages stay in MQTT
|
# Wait for coordinators to be created by sensor platform
|
||||||
await setup_automatic_retain(
|
max_wait = 30 # Maximum 30 seconds wait
|
||||||
|
wait_interval = 1 # Check every second
|
||||||
|
waited = 0
|
||||||
|
|
||||||
|
while waited < max_wait:
|
||||||
|
buy_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_buy")
|
||||||
|
sell_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_sell")
|
||||||
|
|
||||||
|
if buy_coordinator and sell_coordinator:
|
||||||
|
_LOGGER.debug("Coordinators ready, starting MQTT periodic publishing")
|
||||||
|
break
|
||||||
|
|
||||||
|
await asyncio.sleep(wait_interval)
|
||||||
|
waited += wait_interval
|
||||||
|
else:
|
||||||
|
_LOGGER.warning("Coordinators not ready after %d seconds, MQTT publishing may fail", max_wait)
|
||||||
|
|
||||||
|
# Use common function for periodic publishing
|
||||||
|
await setup_periodic_mqtt_publish(
|
||||||
hass,
|
hass,
|
||||||
entry.entry_id,
|
entry.entry_id,
|
||||||
mqtt_topic_buy,
|
mqtt_topic_buy,
|
||||||
mqtt_topic_sell
|
mqtt_topic_sell,
|
||||||
|
interval_minutes=60
|
||||||
)
|
)
|
||||||
|
|
||||||
# Schedule the initialization to happen shortly after setup is complete
|
# Schedule the initialization to happen shortly after setup is complete
|
||||||
hass.async_create_task(start_mqtt_publisher(None))
|
hass.async_create_task(start_mqtt_publisher(None))
|
||||||
|
|
||||||
_LOGGER.info("EVCC MQTT Bridge enabled for Pstryk Energy, publishing to %s and %s",
|
_LOGGER.info("EVCC MQTT Bridge enabled for Pstryk Energy (48h mode: %s), publishing to %s and %s",
|
||||||
mqtt_topic_buy, mqtt_topic_sell)
|
mqtt_48h_mode, mqtt_topic_buy, mqtt_topic_sell)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours=168):
|
|
||||||
"""Set up automatic republishing of MQTT messages to ensure they remain retained."""
|
|
||||||
_LOGGER.info("Setting up automatic MQTT message retention for buy topic %s and sell topic %s",
|
|
||||||
buy_topic, sell_topic)
|
|
||||||
|
|
||||||
# Get buy and sell coordinators
|
|
||||||
buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy")
|
|
||||||
sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell")
|
|
||||||
|
|
||||||
if not buy_coordinator or not buy_coordinator.data or not sell_coordinator or not sell_coordinator.data:
|
|
||||||
_LOGGER.error("No data available for entry %s, will retry later", entry_id)
|
|
||||||
# Schedule a retry in 30 seconds
|
|
||||||
now = dt_util.now()
|
|
||||||
next_run = now + timedelta(seconds=30)
|
|
||||||
hass.async_create_task(
|
|
||||||
async_track_point_in_time(
|
|
||||||
hass,
|
|
||||||
lambda _: setup_automatic_retain(hass, entry_id, buy_topic, sell_topic, retain_hours),
|
|
||||||
dt_util.as_utc(next_run)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Get or create MQTT publisher
|
|
||||||
mqtt_publisher = hass.data[DOMAIN].get(f"{entry_id}_mqtt")
|
|
||||||
if not mqtt_publisher:
|
|
||||||
from .mqtt_publisher import PstrykMqttPublisher
|
|
||||||
mqtt_publisher = PstrykMqttPublisher(hass, entry_id, buy_topic, sell_topic)
|
|
||||||
await mqtt_publisher.async_initialize()
|
|
||||||
|
|
||||||
# Format prices
|
|
||||||
buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
|
|
||||||
sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
|
|
||||||
|
|
||||||
if not buy_prices or not sell_prices:
|
|
||||||
_LOGGER.error("No price data available to publish for automatic retain")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Sort prices and create payloads
|
|
||||||
buy_prices.sort(key=lambda x: x["start"])
|
|
||||||
sell_prices.sort(key=lambda x: x["start"])
|
|
||||||
|
|
||||||
buy_payload = json.dumps(buy_prices)
|
|
||||||
sell_payload = json.dumps(sell_prices)
|
|
||||||
|
|
||||||
# Publish immediately with retain flag
|
|
||||||
await hass.services.async_call(
|
|
||||||
"mqtt",
|
|
||||||
"publish",
|
|
||||||
{
|
|
||||||
"topic": buy_topic,
|
|
||||||
"payload": buy_payload,
|
|
||||||
"qos": 1,
|
|
||||||
"retain": True
|
|
||||||
},
|
|
||||||
blocking=True
|
|
||||||
)
|
|
||||||
|
|
||||||
await hass.services.async_call(
|
|
||||||
"mqtt",
|
|
||||||
"publish",
|
|
||||||
{
|
|
||||||
"topic": sell_topic,
|
|
||||||
"payload": sell_payload,
|
|
||||||
"qos": 1,
|
|
||||||
"retain": True
|
|
||||||
},
|
|
||||||
blocking=True
|
|
||||||
)
|
|
||||||
|
|
||||||
_LOGGER.info("Published initial retained messages for buy and sell topics")
|
|
||||||
|
|
||||||
# Set up recurring republish every 5 minutes to ensure retention
|
|
||||||
now = dt_util.now()
|
|
||||||
retain_key = f"{entry_id}_auto_retain"
|
|
||||||
|
|
||||||
# Cancel any existing task
|
|
||||||
if retain_key in hass.data[DOMAIN] and callable(hass.data[DOMAIN][retain_key]):
|
|
||||||
hass.data[DOMAIN][retain_key]()
|
|
||||||
hass.data[DOMAIN].pop(retain_key, None)
|
|
||||||
|
|
||||||
# Function to republish data periodically
|
|
||||||
async def republish_retain_periodic(now=None):
|
|
||||||
"""Republish MQTT message with retain flag periodically."""
|
|
||||||
try:
|
|
||||||
# Get fresh data
|
|
||||||
if buy_coordinator.data and sell_coordinator.data:
|
|
||||||
buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
|
|
||||||
sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
|
|
||||||
|
|
||||||
if buy_prices and sell_prices:
|
|
||||||
buy_prices.sort(key=lambda x: x["start"])
|
|
||||||
sell_prices.sort(key=lambda x: x["start"])
|
|
||||||
|
|
||||||
buy_payload = json.dumps(buy_prices)
|
|
||||||
sell_payload = json.dumps(sell_prices)
|
|
||||||
|
|
||||||
# Publish with retain flag
|
|
||||||
await hass.services.async_call(
|
|
||||||
"mqtt",
|
|
||||||
"publish",
|
|
||||||
{
|
|
||||||
"topic": buy_topic,
|
|
||||||
"payload": buy_payload,
|
|
||||||
"qos": 1,
|
|
||||||
"retain": True
|
|
||||||
},
|
|
||||||
blocking=True
|
|
||||||
)
|
|
||||||
|
|
||||||
await hass.services.async_call(
|
|
||||||
"mqtt",
|
|
||||||
"publish",
|
|
||||||
{
|
|
||||||
"topic": sell_topic,
|
|
||||||
"payload": sell_payload,
|
|
||||||
"qos": 1,
|
|
||||||
"retain": True
|
|
||||||
},
|
|
||||||
blocking=True
|
|
||||||
)
|
|
||||||
|
|
||||||
_LOGGER.debug("Auto-republished retained messages to buy and sell topics")
|
|
||||||
|
|
||||||
# Schedule next run
|
|
||||||
next_run = dt_util.now() + timedelta(minutes=5)
|
|
||||||
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
|
|
||||||
hass, republish_retain_periodic, dt_util.as_utc(next_run)
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
_LOGGER.error("Error in automatic MQTT retain process: %s", str(e))
|
|
||||||
# Try to reschedule anyway
|
|
||||||
next_run = dt_util.now() + timedelta(minutes=5)
|
|
||||||
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
|
|
||||||
hass, republish_retain_periodic, dt_util.as_utc(next_run)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Schedule first run
|
|
||||||
next_run = now + timedelta(minutes=5)
|
|
||||||
hass.data[DOMAIN][retain_key] = async_track_point_in_time(
|
|
||||||
hass, republish_retain_periodic, dt_util.as_utc(next_run)
|
|
||||||
)
|
|
||||||
|
|
||||||
_LOGGER.info(
|
|
||||||
"Automatic MQTT message retention activated for buy and sell topics (every 5 minutes)"
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||||
"""Clean up coordinators and cancel scheduled tasks."""
|
"""Clean up coordinators and cancel scheduled tasks."""
|
||||||
# Clean up auto retain task
|
# Clean up auto retain task
|
||||||
@ -267,8 +142,14 @@ async def _cleanup_coordinators(hass: HomeAssistant, entry: ConfigEntry) -> None
|
|||||||
if hasattr(coordinator, '_unsub_midnight') and coordinator._unsub_midnight:
|
if hasattr(coordinator, '_unsub_midnight') and coordinator._unsub_midnight:
|
||||||
coordinator._unsub_midnight()
|
coordinator._unsub_midnight()
|
||||||
coordinator._unsub_midnight = None
|
coordinator._unsub_midnight = None
|
||||||
|
if hasattr(coordinator, '_unsub_afternoon') and coordinator._unsub_afternoon:
|
||||||
|
coordinator._unsub_afternoon()
|
||||||
|
coordinator._unsub_afternoon = None
|
||||||
# Remove from hass data
|
# Remove from hass data
|
||||||
hass.data[DOMAIN].pop(key, None)
|
hass.data[DOMAIN].pop(key, None)
|
||||||
|
|
||||||
|
# Clean up mqtt 48h mode flag
|
||||||
|
hass.data[DOMAIN].pop(f"{entry.entry_id}_mqtt_48h_mode", None)
|
||||||
|
|
||||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Unload sensor platform and clear data."""
|
"""Unload sensor platform and clear data."""
|
||||||
|
|||||||
Reference in New Issue
Block a user