218 lines
8.0 KiB
Python
218 lines
8.0 KiB
Python
"""MQTT Publisher for Pstryk Energy integration."""
|
|
import logging
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
import asyncio
|
|
from homeassistant.helpers.entity import Entity
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.util import dt as dt_util
|
|
from homeassistant.components import mqtt
|
|
from homeassistant.helpers.translation import async_get_translations
|
|
from homeassistant.helpers.event import async_track_time_interval
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
DEFAULT_MQTT_TOPIC_BUY,
|
|
DEFAULT_MQTT_TOPIC_SELL
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
class PstrykMqttPublisher:
|
|
"""Class to handle publishing Pstryk energy prices to MQTT for EVCC."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
entry_id: str,
|
|
mqtt_topic_buy: str = DEFAULT_MQTT_TOPIC_BUY,
|
|
mqtt_topic_sell: str = DEFAULT_MQTT_TOPIC_SELL
|
|
):
|
|
"""Initialize the MQTT publisher."""
|
|
self.hass = hass
|
|
self.entry_id = entry_id
|
|
self.mqtt_topic_buy = mqtt_topic_buy
|
|
self.mqtt_topic_sell = mqtt_topic_sell
|
|
self._publish_task = None
|
|
self._initialized = False
|
|
self._translations = {}
|
|
self._unsub_timer = None
|
|
self._last_published = None
|
|
|
|
async def async_initialize(self):
|
|
"""Initialize the publisher and load translations."""
|
|
if self._initialized:
|
|
return True
|
|
|
|
# Load translations
|
|
try:
|
|
self._translations = await async_get_translations(
|
|
self.hass, self.hass.config.language, DOMAIN, ["mqtt"]
|
|
)
|
|
except Exception as ex:
|
|
_LOGGER.warning("Failed to load translations for MQTT publisher: %s", ex)
|
|
|
|
self._initialized = True
|
|
return True
|
|
|
|
def _is_day_data_valid(self, day_prices):
|
|
"""Check if prices for a specific day look like real data.
|
|
|
|
Returns False if the data appears to be placeholders.
|
|
"""
|
|
if not day_prices:
|
|
return False
|
|
|
|
price_values = [p.get("price") for p in day_prices if p.get("price") is not None]
|
|
|
|
if not price_values:
|
|
return False
|
|
|
|
# Need at least 20 hours for a valid day
|
|
if len(price_values) < 20:
|
|
return False
|
|
|
|
# If all values are identical, it's likely placeholder data
|
|
unique_values = set(price_values)
|
|
if len(unique_values) == 1:
|
|
return False
|
|
|
|
# If more than 90% of values are the same, probably placeholders
|
|
most_common = max(set(price_values), key=price_values.count)
|
|
if price_values.count(most_common) / len(price_values) > 0.9:
|
|
return False
|
|
|
|
return True
|
|
|
|
def _format_prices_for_evcc(self, prices_data, price_type):
|
|
"""Format prices in EVCC expected format.
|
|
|
|
EVCC expects a list of objects with the following structure:
|
|
[
|
|
{
|
|
"start": "2024-05-07T00:00:00Z", // ISO timestamp in UTC
|
|
"end": "2024-05-07T01:00:00Z", // ISO timestamp in UTC (next hour)
|
|
"value": 0.1234 // Price in PLN/kWh
|
|
}
|
|
]
|
|
"""
|
|
if not prices_data or "prices" not in prices_data:
|
|
return []
|
|
|
|
formatted_prices = []
|
|
|
|
# First, check if we're in 48h mode
|
|
mqtt_48h_mode = self.hass.data[DOMAIN].get(f"{self.entry_id}_mqtt_48h_mode", False)
|
|
|
|
# Get current date for comparison
|
|
now = dt_util.as_local(dt_util.utcnow())
|
|
today_str = now.strftime("%Y-%m-%d")
|
|
tomorrow_str = (now + timedelta(days=1)).strftime("%Y-%m-%d")
|
|
|
|
# Get the appropriate price list based on mode
|
|
if mqtt_48h_mode:
|
|
price_list = prices_data.get("prices", [])
|
|
|
|
# In 48h mode, we need to check if tomorrow's data is valid
|
|
# Separate today and tomorrow prices
|
|
today_prices = [p for p in price_list if p.get("start", "").startswith(today_str)]
|
|
tomorrow_prices = [p for p in price_list if p.get("start", "").startswith(tomorrow_str)]
|
|
|
|
# Always include today's prices
|
|
prices_to_process = today_prices.copy()
|
|
|
|
# Only include tomorrow if the data looks valid
|
|
if tomorrow_prices and self._is_day_data_valid(tomorrow_prices):
|
|
prices_to_process.extend(tomorrow_prices)
|
|
_LOGGER.info(f"Including {len(tomorrow_prices)} tomorrow prices in MQTT publish")
|
|
else:
|
|
if tomorrow_prices:
|
|
_LOGGER.info(f"Excluding {len(tomorrow_prices)} tomorrow prices from MQTT - appear to be placeholders")
|
|
else:
|
|
_LOGGER.debug("No tomorrow prices available for MQTT publish")
|
|
else:
|
|
# In normal mode, use only today's prices
|
|
prices_to_process = prices_data.get("prices_today", [])
|
|
|
|
# Process the selected prices
|
|
for price_entry in prices_to_process:
|
|
try:
|
|
if "start" not in price_entry or "price" not in price_entry:
|
|
continue
|
|
|
|
# Validate price is a number
|
|
try:
|
|
price_value = float(price_entry["price"])
|
|
except (TypeError, ValueError):
|
|
_LOGGER.warning("Invalid price value: %s", price_entry.get("price"))
|
|
continue
|
|
|
|
# Parse local time and convert to UTC ISO format
|
|
local_dt = dt_util.parse_datetime(price_entry["start"])
|
|
if not local_dt:
|
|
continue
|
|
|
|
utc_dt = dt_util.as_utc(local_dt)
|
|
|
|
# Calculate end time (1 hour later)
|
|
end_dt = utc_dt + timedelta(hours=1)
|
|
|
|
# Format times in ISO format with Z suffix for UTC
|
|
start_str = utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
# EVCC w trybie custom pokazuje ceny jako "gr" (grosze),
|
|
price_value = price_value / 1
|
|
|
|
formatted_prices.append({
|
|
"start": start_str,
|
|
"end": end_str,
|
|
"value": price_value
|
|
})
|
|
except Exception as e:
|
|
_LOGGER.error("Error formatting price for EVCC: %s", str(e))
|
|
|
|
_LOGGER.debug(f"Formatted {len(formatted_prices)} prices for MQTT from {len(prices_to_process)} input prices")
|
|
return formatted_prices
|
|
|
|
async def publish_prices(self):
|
|
"""Publish prices to MQTT using common function."""
|
|
from .mqtt_common import publish_mqtt_prices
|
|
|
|
success = await publish_mqtt_prices(
|
|
self.hass,
|
|
self.entry_id,
|
|
self.mqtt_topic_buy,
|
|
self.mqtt_topic_sell
|
|
)
|
|
|
|
if success:
|
|
self._last_published = dt_util.now()
|
|
|
|
return success
|
|
|
|
async def schedule_periodic_updates(self, interval_minutes=60):
|
|
"""Schedule periodic updates to MQTT (default: 60 minutes)."""
|
|
from .mqtt_common import setup_periodic_mqtt_publish
|
|
|
|
# Use common function for periodic publishing
|
|
await setup_periodic_mqtt_publish(
|
|
self.hass,
|
|
self.entry_id,
|
|
self.mqtt_topic_buy,
|
|
self.mqtt_topic_sell,
|
|
interval_minutes
|
|
)
|
|
|
|
return True
|
|
|
|
def unsubscribe(self):
|
|
"""Unsubscribe from all events."""
|
|
# Cleanup is handled by common function in mqtt_common.py
|
|
_LOGGER.debug("MQTT publisher cleanup requested")
|
|
|
|
@property
|
|
def last_published(self):
|
|
"""Return the timestamp of the last successful publish."""
|
|
return self._last_published
|