Update services.py
@@ -1,7 +1,6 @@
 """Services for Pstryk Energy integration."""
 import logging
 import voluptuous as vol
-import json
 from homeassistant.core import HomeAssistant, ServiceCall, callback
 from homeassistant.helpers import config_validation as cv
 from homeassistant.helpers.typing import ConfigType
@@ -11,6 +10,7 @@ from homeassistant.helpers.event import async_track_point_in_time
 from datetime import timedelta
 from homeassistant.util import dt as dt_util
 
+from .mqtt_common import publish_mqtt_prices, setup_periodic_mqtt_publish
 from .const import (
     DOMAIN,
     DEFAULT_MQTT_TOPIC_BUY,
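
The hunks below swap the inline publishing logic for the shared publish_mqtt_prices helper imported above from .mqtt_common. That helper is not part of this diff; the snippet below is only a rough sketch of what such a helper could look like, assuming it wraps the same coordinator lookup and retained mqtt.publish calls the removed inline code performed. Everything beyond the imported name is an assumption, including the EVCC formatting being handled inside the helper.

# Illustrative sketch only -- the real helper lives in mqtt_common.py and may differ.
import json
import logging

from homeassistant.core import HomeAssistant

from .const import DOMAIN

_LOGGER = logging.getLogger(__name__)


async def publish_mqtt_prices(hass: HomeAssistant, entry_id: str,
                              topic_buy: str, topic_sell: str) -> bool:
    """Publish buy/sell prices as retained MQTT messages (sketch)."""
    buy_coordinator = hass.data[DOMAIN].get(f"{entry_id}_buy")
    sell_coordinator = hass.data[DOMAIN].get(f"{entry_id}_sell")
    if not buy_coordinator or not buy_coordinator.data \
            or not sell_coordinator or not sell_coordinator.data:
        _LOGGER.error("Price coordinators not ready for entry %s", entry_id)
        return False

    # The real helper presumably formats the coordinator data for EVCC
    # (as _format_prices_for_evcc did); this sketch publishes it as-is.
    for topic, data in ((topic_buy, buy_coordinator.data),
                        (topic_sell, sell_coordinator.data)):
        await hass.services.async_call(
            "mqtt",
            "publish",
            {"topic": topic, "payload": json.dumps(data), "qos": 1, "retain": True},
            blocking=True,
        )
    return True

Keeping this logic in mqtt_common.py gives the manual publish service, the scheduled-retain service, and the periodic publisher a single code path instead of three copies of the same block.
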
@@ -79,46 +79,12 @@ async def async_setup_services(hass: HomeAssistant) -> None:
 
         # Publish for all entries or the specified one
         for entry in config_entries:
-            key = f"{entry.entry_id}_mqtt"
-            mqtt_publisher = hass.data[DOMAIN].get(key)
+            # Get topics from options or use override/default
+            mqtt_topic_buy = topic_buy_override or entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
+            mqtt_topic_sell = topic_sell_override or entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
 
-            # If publisher not found, create a temporary one
-            if not mqtt_publisher:
-                # Check if we have buy coordinator
-                buy_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_buy")
-                sell_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_sell")
-                if not buy_coordinator or not sell_coordinator:
-                    _LOGGER.error("Price coordinators not found for entry %s", entry.entry_id)
-                    continue
-
-                # Dynamically import the publisher class
-                from .mqtt_publisher import PstrykMqttPublisher
-
-                # Get topic from options or use override/default
-                mqtt_topic_buy = topic_buy_override
-                mqtt_topic_sell = topic_sell_override
-                if not mqtt_topic_buy:
-                    mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
-                if not mqtt_topic_sell:
-                    mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
-
-                # Create temporary publisher
-                mqtt_publisher = PstrykMqttPublisher(
-                    hass,
-                    entry.entry_id,
-                    mqtt_topic_buy,
-                    mqtt_topic_sell
-                )
-                await mqtt_publisher.async_initialize()
-            else:
-                # Use the override topics if provided
-                if topic_buy_override:
-                    mqtt_publisher.mqtt_topic_buy = topic_buy_override
-                if topic_sell_override:
-                    mqtt_publisher.mqtt_topic_sell = topic_sell_override
-
-            # Publish prices
-            success = await mqtt_publisher.publish_prices()
+            # Use common function to publish
+            success = await publish_mqtt_prices(hass, entry.entry_id, mqtt_topic_buy, mqtt_topic_sell)
 
             if success:
                 _LOGGER.info("Manual MQTT publish to EVCC completed for entry %s", entry.entry_id)
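
The refactored handler resolves each topic with a single `override or option` expression. Python's `or` short-circuits on falsy values, so a missing or empty override falls back to the entry option, and a missing option falls back to the default, which is the same behaviour the removed `if not mqtt_topic_buy:` branches spelled out explicitly. A small standalone illustration with made-up values:

DEFAULT_MQTT_TOPIC_BUY = "energy/prices/buy"  # placeholder default for the demo only

def resolve_topic(override, options, key, default):
    # A falsy override (None or "") falls back to the configured option;
    # a missing option falls back to the integration default.
    return override or options.get(key, default)

print(resolve_topic(None, {}, "mqtt_topic_buy", DEFAULT_MQTT_TOPIC_BUY))
# -> energy/prices/buy
print(resolve_topic("", {"mqtt_topic_buy": "custom/buy"}, "mqtt_topic_buy", DEFAULT_MQTT_TOPIC_BUY))
# -> custom/buy
print(resolve_topic("evcc/tariffs/grid", {"mqtt_topic_buy": "custom/buy"}, "mqtt_topic_buy", DEFAULT_MQTT_TOPIC_BUY))
# -> evcc/tariffs/grid
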
@@ -147,94 +113,25 @@ async def async_setup_services(hass: HomeAssistant) -> None:
 
         # Process all entries or just the specified one
         for entry in config_entries:
-            # Get MQTT publisher
-            mqtt_publisher = hass.data[DOMAIN].get(f"{entry.entry_id}_mqtt")
+            # Get topics from options or use overrides
+            mqtt_topic_buy = topic_buy_override or entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
+            mqtt_topic_sell = topic_sell_override or entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
 
-            # Get buy and sell coordinators
-            buy_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_buy")
-            sell_coordinator = hass.data[DOMAIN].get(f"{entry.entry_id}_sell")
+            # First immediate publish
+            success = await publish_mqtt_prices(hass, entry.entry_id, mqtt_topic_buy, mqtt_topic_sell)
 
-            if not buy_coordinator or not buy_coordinator.data or not sell_coordinator or not sell_coordinator.data:
-                _LOGGER.error("No data available for entry %s", entry.entry_id)
+            if not success:
+                _LOGGER.error("Failed to publish initial retained messages for entry %s", entry.entry_id)
                 continue
 
-            # If no publisher, create a temporary one
-            if not mqtt_publisher:
-                # Import publisher class
-                from .mqtt_publisher import PstrykMqttPublisher
-
-                # Get topics from options or use overrides
-                mqtt_topic_buy = topic_buy_override or entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
-                mqtt_topic_sell = topic_sell_override or entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
-
-                # Create publisher
-                mqtt_publisher = PstrykMqttPublisher(
-                    hass,
-                    entry.entry_id,
-                    mqtt_topic_buy,
-                    mqtt_topic_sell
-                )
-                await mqtt_publisher.async_initialize()
-            else:
-                # Update topics if overrides provided
-                if topic_buy_override:
-                    mqtt_publisher.mqtt_topic_buy = topic_buy_override
-                if topic_sell_override:
-                    mqtt_publisher.mqtt_topic_sell = topic_sell_override
-
-            # Format prices
-            buy_prices = mqtt_publisher._format_prices_for_evcc(buy_coordinator.data, "buy")
-            sell_prices = mqtt_publisher._format_prices_for_evcc(sell_coordinator.data, "sell")
-
-            if not buy_prices or not sell_prices:
-                _LOGGER.error("No price data available to publish")
-                continue
-
-            # Sort prices
-            buy_prices.sort(key=lambda x: x["start"])
-            sell_prices.sort(key=lambda x: x["start"])
-
-            # Create payloads
-            buy_payload = json.dumps(buy_prices)
-            sell_payload = json.dumps(sell_prices)
-
-            # Get topics
-            buy_topic = mqtt_publisher.mqtt_topic_buy
-            sell_topic = mqtt_publisher.mqtt_topic_sell
-
             _LOGGER.info(
-                "Setting up scheduled retain for buy topic %s and sell topic %s for %d hours",
-                buy_topic,
-                sell_topic,
+                "Setting up scheduled retain for topics %s and %s for %d hours",
+                mqtt_topic_buy,
+                mqtt_topic_sell,
                 retain_hours
             )
 
-            # First immediate publish
-            await hass.services.async_call(
-                "mqtt",
-                "publish",
-                {
-                    "topic": buy_topic,
-                    "payload": buy_payload,
-                    "qos": 1,
-                    "retain": True
-                },
-                blocking=True
-            )
-
-            await hass.services.async_call(
-                "mqtt",
-                "publish",
-                {
-                    "topic": sell_topic,
-                    "payload": sell_payload,
-                    "qos": 1,
-                    "retain": True
-                },
-                blocking=True
-            )
-
-            # Schedule periodic re-publishing to ensure message stays retained
+            # Schedule periodic re-publishing
             now = dt_util.now()
             end_time = now + timedelta(hours=retain_hours)
 
@@ -247,52 +144,31 @@ async def async_setup_services(hass: HomeAssistant) -> None:
 
             # Create a function that will republish and reschedule itself
             async def republish_retain(now=None):
-                # Publish with retain
-                await hass.services.async_call(
-                    "mqtt",
-                    "publish",
-                    {
-                        "topic": buy_topic,
-                        "payload": buy_payload,
-                        "qos": 1,
-                        "retain": True
-                    },
-                    blocking=True
-                )
+                """Republish retained messages periodically."""
+                success = await publish_mqtt_prices(hass, entry.entry_id, mqtt_topic_buy, mqtt_topic_sell)
 
-                await hass.services.async_call(
-                    "mqtt",
-                    "publish",
-                    {
-                        "topic": sell_topic,
-                        "payload": sell_payload,
-                        "qos": 1,
-                        "retain": True
-                    },
-                    blocking=True
-                )
-
-                current_time = dt_util.now()
-                _LOGGER.debug(
-                    "Re-published retained messages (will continue until %s)",
-                    end_time.isoformat()
-                )
-
-                # If we've reached the end time, stop
-                if current_time >= end_time:
-                    _LOGGER.info(
-                        "Finished scheduled retain after %d hours",
-                        retain_hours
-                    )
-                    if retain_key in hass.data[DOMAIN]:
-                        hass.data[DOMAIN].pop(retain_key, None)
-                    return
-
-                # Otherwise, schedule the next run in 1 hour
-                next_run = current_time + timedelta(hours=1)
-                hass.data[DOMAIN][retain_key] = async_track_point_in_time(
-                    hass, republish_retain, dt_util.as_utc(next_run)
-                )
+                if success:
+                    current_time = dt_util.now()
+                    _LOGGER.debug(
+                        "Re-published retained messages (will continue until %s)",
+                        end_time.strftime("%Y-%m-%d %H:%M:%S")
+                    )
+
+                    # If we've reached the end time, stop
+                    if current_time >= end_time:
+                        _LOGGER.info(
+                            "Finished scheduled retain after %d hours",
+                            retain_hours
+                        )
+                        if retain_key in hass.data[DOMAIN]:
+                            hass.data[DOMAIN].pop(retain_key, None)
+                        return
+
+                    # Otherwise, schedule the next run in 1 hour
+                    next_run = current_time + timedelta(hours=1)
+                    hass.data[DOMAIN][retain_key] = async_track_point_in_time(
+                        hass, republish_retain, dt_util.as_utc(next_run)
+                    )
 
             # Schedule first run in 1 hour
             next_run = now + timedelta(hours=1)
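
async_track_point_in_time returns an unsubscribe callable, and the handler stores it under hass.data[DOMAIN][retain_key], so a pending republish can be cancelled before it fires, for example when the config entry is unloaded. A hedged sketch of such a cleanup step; how retain_key is built is not shown in this diff, so it is simply passed in here:

from homeassistant.core import HomeAssistant

from .const import DOMAIN


def cancel_scheduled_retain(hass: HomeAssistant, retain_key: str) -> None:
    """Cancel a pending scheduled republish, if one is tracked (sketch)."""
    cancel = hass.data[DOMAIN].pop(retain_key, None)
    if cancel is not None:
        # async_track_point_in_time returns a callable that unschedules the listener.
        cancel()
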