Update mqtt_publisher.py

This commit is contained in:
balgerion
2025-06-09 13:54:14 +02:00
committed by GitHub
parent c17c091e99
commit b01cc8df66

View File

@ -33,6 +33,7 @@ class PstrykMqttPublisher:
self.entry_id = entry_id
self.mqtt_topic_buy = mqtt_topic_buy
self.mqtt_topic_sell = mqtt_topic_sell
self._publish_task = None
self._initialized = False
self._translations = {}
self._unsub_timer = None
@ -53,6 +54,35 @@ class PstrykMqttPublisher:
self._initialized = True
return True
def _is_day_data_valid(self, day_prices):
"""Check if prices for a specific day look like real data.
Returns False if the data appears to be placeholders.
"""
if not day_prices:
return False
price_values = [p.get("price") for p in day_prices if p.get("price") is not None]
if not price_values:
return False
# Need at least 20 hours for a valid day
if len(price_values) < 20:
return False
# If all values are identical, it's likely placeholder data
unique_values = set(price_values)
if len(unique_values) == 1:
return False
# If more than 90% of values are the same, probably placeholders
most_common = max(set(price_values), key=price_values.count)
if price_values.count(most_common) / len(price_values) > 0.9:
return False
return True
def _format_prices_for_evcc(self, prices_data, price_type):
"""Format prices in EVCC expected format.
@ -71,11 +101,52 @@ class PstrykMqttPublisher:
formatted_prices = []
for price_entry in prices_data.get("prices", []):
# First, check if we're in 48h mode
mqtt_48h_mode = self.hass.data[DOMAIN].get(f"{self.entry_id}_mqtt_48h_mode", False)
# Get current date for comparison
now = dt_util.as_local(dt_util.utcnow())
today_str = now.strftime("%Y-%m-%d")
tomorrow_str = (now + timedelta(days=1)).strftime("%Y-%m-%d")
# Get the appropriate price list based on mode
if mqtt_48h_mode:
price_list = prices_data.get("prices", [])
# In 48h mode, we need to check if tomorrow's data is valid
# Separate today and tomorrow prices
today_prices = [p for p in price_list if p.get("start", "").startswith(today_str)]
tomorrow_prices = [p for p in price_list if p.get("start", "").startswith(tomorrow_str)]
# Always include today's prices
prices_to_process = today_prices.copy()
# Only include tomorrow if the data looks valid
if tomorrow_prices and self._is_day_data_valid(tomorrow_prices):
prices_to_process.extend(tomorrow_prices)
_LOGGER.info(f"Including {len(tomorrow_prices)} tomorrow prices in MQTT publish")
else:
if tomorrow_prices:
_LOGGER.info(f"Excluding {len(tomorrow_prices)} tomorrow prices from MQTT - appear to be placeholders")
else:
_LOGGER.debug("No tomorrow prices available for MQTT publish")
else:
# In normal mode, use only today's prices
prices_to_process = prices_data.get("prices_today", [])
# Process the selected prices
for price_entry in prices_to_process:
try:
if "start" not in price_entry or "price" not in price_entry:
continue
# Validate price is a number
try:
price_value = float(price_entry["price"])
except (TypeError, ValueError):
_LOGGER.warning("Invalid price value: %s", price_entry.get("price"))
continue
# Parse local time and convert to UTC ISO format
local_dt = dt_util.parse_datetime(price_entry["start"])
if not local_dt:
@ -91,7 +162,7 @@ class PstrykMqttPublisher:
end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
# EVCC w trybie custom pokazuje ceny jako "gr" (grosze),
price_value = price_entry["price"] / 1
price_value = price_value / 1
formatted_prices.append({
"start": start_str,
@ -101,147 +172,35 @@ class PstrykMqttPublisher:
except Exception as e:
_LOGGER.error("Error formatting price for EVCC: %s", str(e))
_LOGGER.debug(f"Formatted {len(formatted_prices)} prices for MQTT from {len(prices_to_process)} input prices")
return formatted_prices
async def publish_prices(self):
"""Publish prices to MQTT in EVCC format for both buy and sell."""
try:
if not self._initialized:
await self.async_initialize()
# Get both buy and sell coordinators
buy_coordinator = self.hass.data[DOMAIN].get(f"{self.entry_id}_buy")
sell_coordinator = self.hass.data[DOMAIN].get(f"{self.entry_id}_sell")
if not buy_coordinator or not sell_coordinator:
_LOGGER.error("Unable to find Pstryk coordinators for MQTT publishing")
return False
# Check if data is available
if not buy_coordinator.data:
_LOGGER.warning("No buy price data available for MQTT publishing")
return False
if not sell_coordinator.data:
_LOGGER.warning("No sell price data available for MQTT publishing")
return False
# Format prices for EVCC
buy_prices = self._format_prices_for_evcc(buy_coordinator.data, "buy")
sell_prices = self._format_prices_for_evcc(sell_coordinator.data, "sell")
if not buy_prices:
_LOGGER.warning("No valid buy prices available to publish to MQTT")
return False
if not sell_prices:
_LOGGER.warning("No valid sell prices available to publish to MQTT")
return False
# Sort prices by time to ensure chronological order
buy_prices.sort(key=lambda x: x["start"])
sell_prices.sort(key=lambda x: x["start"])
# Convert to JSON
buy_payload = json.dumps(buy_prices)
sell_payload = json.dumps(sell_prices)
# Log before publishing
_LOGGER.debug(
"Publishing buy prices to MQTT topic %s with RETAIN=TRUE, QoS=1, payload length: %d bytes",
self.mqtt_topic_buy,
len(buy_payload)
)
_LOGGER.debug(
"Publishing sell prices to MQTT topic %s with RETAIN=TRUE, QoS=1, payload length: %d bytes",
self.mqtt_topic_sell,
len(sell_payload)
)
# Publish buy prices to MQTT with explicit retain flag
await mqtt.async_publish(
self.hass,
self.mqtt_topic_buy,
buy_payload,
qos=1,
retain=True
)
# Publish sell prices to MQTT with explicit retain flag
await mqtt.async_publish(
self.hass,
self.mqtt_topic_sell,
sell_payload,
qos=1,
retain=True
)
# Double-check with a direct publish to ensure they're retained
await self.hass.services.async_call(
"mqtt",
"publish",
{
"topic": self.mqtt_topic_buy,
"payload": buy_payload,
"qos": 1,
"retain": True
},
blocking=True
)
await self.hass.services.async_call(
"mqtt",
"publish",
{
"topic": self.mqtt_topic_sell,
"payload": sell_payload,
"qos": 1,
"retain": True
},
blocking=True
)
now = dt_util.now()
self._last_published = now
_LOGGER.info(
"Published %d buy prices to %s and %d sell prices to %s WITH RETAIN FLAG",
len(buy_prices),
self.mqtt_topic_buy,
len(sell_prices),
self.mqtt_topic_sell
)
return True
except Exception as e:
_LOGGER.error("Error publishing Pstryk prices to MQTT: %s", str(e))
return False
async def schedule_periodic_updates(self, interval_minutes=5):
"""Schedule periodic updates to MQTT."""
if self._unsub_timer:
self._unsub_timer()
self._unsub_timer = None
# Initial publish
await self.publish_prices()
"""Publish prices to MQTT using common function."""
from .mqtt_common import publish_mqtt_prices
# Schedule periodic updates
async def periodic_publish(_now=None):
"""Handle periodic publishing."""
await self.publish_prices()
# POPRAWKA: użycie bezpośrednio funkcji async_track_time_interval zamiast przez hass.helpers.event
self._unsub_timer = async_track_time_interval(
self.hass,
periodic_publish,
timedelta(minutes=interval_minutes)
success = await publish_mqtt_prices(
self.hass,
self.entry_id,
self.mqtt_topic_buy,
self.mqtt_topic_sell
)
_LOGGER.debug(
"Scheduled periodic MQTT publishing every %d minutes",
if success:
self._last_published = dt_util.now()
return success
async def schedule_periodic_updates(self, interval_minutes=60):
"""Schedule periodic updates to MQTT (default: 60 minutes)."""
from .mqtt_common import setup_periodic_mqtt_publish
# Use common function for periodic publishing
await setup_periodic_mqtt_publish(
self.hass,
self.entry_id,
self.mqtt_topic_buy,
self.mqtt_topic_sell,
interval_minutes
)
@ -249,10 +208,8 @@ class PstrykMqttPublisher:
def unsubscribe(self):
"""Unsubscribe from all events."""
if self._unsub_timer:
self._unsub_timer()
self._unsub_timer = None
_LOGGER.debug("Unsubscribed from MQTT publishing timer")
# Cleanup is handled by common function in mqtt_common.py
_LOGGER.debug("MQTT publisher cleanup requested")
@property
def last_published(self):