262 lines
9.7 KiB
Python
262 lines
9.7 KiB
Python
"""MQTT Publisher for Pstryk Energy integration."""
|
|
import logging
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
import asyncio
|
|
from homeassistant.helpers.entity import Entity
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.util import dt as dt_util
|
|
from homeassistant.components import mqtt
|
|
from homeassistant.helpers.translation import async_get_translations
|
|
from homeassistant.helpers.event import async_track_time_interval
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
DEFAULT_MQTT_TOPIC_BUY,
|
|
DEFAULT_MQTT_TOPIC_SELL
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
class PstrykMqttPublisher:
|
|
"""Class to handle publishing Pstryk energy prices to MQTT for EVCC."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
entry_id: str,
|
|
mqtt_topic_buy: str = DEFAULT_MQTT_TOPIC_BUY,
|
|
mqtt_topic_sell: str = DEFAULT_MQTT_TOPIC_SELL
|
|
):
|
|
"""Initialize the MQTT publisher."""
|
|
self.hass = hass
|
|
self.entry_id = entry_id
|
|
self.mqtt_topic_buy = mqtt_topic_buy
|
|
self.mqtt_topic_sell = mqtt_topic_sell
|
|
self._publish_task = None
|
|
self._initialized = False
|
|
self._translations = {}
|
|
self._unsub_timer = None
|
|
self._last_published = None
|
|
|
|
async def async_initialize(self):
|
|
"""Initialize the publisher and load translations."""
|
|
if self._initialized:
|
|
return True
|
|
|
|
# Load translations
|
|
try:
|
|
self._translations = await async_get_translations(
|
|
self.hass, self.hass.config.language, DOMAIN, ["mqtt"]
|
|
)
|
|
except Exception as ex:
|
|
_LOGGER.warning("Failed to load translations for MQTT publisher: %s", ex)
|
|
|
|
self._initialized = True
|
|
return True
|
|
|
|
def _is_day_data_valid(self, day_prices):
|
|
"""Check if prices for a specific day look like real data.
|
|
|
|
Returns False if the data appears to be placeholders.
|
|
"""
|
|
if not day_prices:
|
|
return False
|
|
|
|
price_values = [p.get("price") for p in day_prices if p.get("price") is not None]
|
|
|
|
if not price_values:
|
|
return False
|
|
|
|
# Need at least 20 hours for a valid day
|
|
if len(price_values) < 20:
|
|
return False
|
|
|
|
# If all values are identical, it's likely placeholder data
|
|
unique_values = set(price_values)
|
|
if len(unique_values) == 1:
|
|
return False
|
|
|
|
# If more than 90% of values are the same, probably placeholders
|
|
most_common = max(set(price_values), key=price_values.count)
|
|
if price_values.count(most_common) / len(price_values) > 0.9:
|
|
return False
|
|
|
|
return True
|
|
|
|
def _format_prices_for_evcc(self, prices_data, price_type):
|
|
"""Format prices in EVCC expected format.
|
|
|
|
EVCC expects a list of objects with the following structure:
|
|
[
|
|
{
|
|
"start": "2024-05-07T00:00:00Z", // ISO timestamp in UTC
|
|
"end": "2024-05-07T01:00:00Z", // ISO timestamp in UTC (next hour)
|
|
"value": 0.1234 // Price in PLN/kWh
|
|
}
|
|
]
|
|
"""
|
|
if not prices_data or "prices" not in prices_data:
|
|
return []
|
|
|
|
formatted_prices = []
|
|
|
|
# First, check if we're in 48h mode
|
|
mqtt_48h_mode = self.hass.data[DOMAIN].get(f"{self.entry_id}_mqtt_48h_mode", False)
|
|
|
|
# Get current date for comparison - in local time
|
|
now_local = dt_util.as_local(dt_util.utcnow())
|
|
today_date = now_local.date()
|
|
tomorrow_date = (now_local + timedelta(days=1)).date()
|
|
|
|
# Get all prices from coordinator
|
|
all_prices = prices_data.get("prices", [])
|
|
|
|
# Process prices and group by date
|
|
prices_by_date = {}
|
|
for price_entry in all_prices:
|
|
try:
|
|
if "start" not in price_entry or "price" not in price_entry:
|
|
continue
|
|
|
|
# Parse the datetime (it's already in local time from coordinator)
|
|
price_datetime = dt_util.parse_datetime(price_entry["start"])
|
|
if not price_datetime:
|
|
continue
|
|
|
|
# Ensure it's local time
|
|
price_datetime_local = dt_util.as_local(price_datetime)
|
|
price_date = price_datetime_local.date()
|
|
|
|
if price_date not in prices_by_date:
|
|
prices_by_date[price_date] = []
|
|
|
|
prices_by_date[price_date].append(price_entry)
|
|
|
|
except Exception as e:
|
|
_LOGGER.error("Error processing price entry: %s", str(e))
|
|
|
|
# Determine which days to include
|
|
days_to_include = []
|
|
|
|
if mqtt_48h_mode:
|
|
# In 48h mode, include today and tomorrow
|
|
if today_date in prices_by_date:
|
|
days_to_include.append(today_date)
|
|
|
|
if tomorrow_date in prices_by_date:
|
|
# Check if tomorrow's data is valid
|
|
tomorrow_prices = prices_by_date[tomorrow_date]
|
|
if self._is_day_data_valid(tomorrow_prices):
|
|
days_to_include.append(tomorrow_date)
|
|
_LOGGER.info(f"Including {len(tomorrow_prices)} tomorrow prices in MQTT publish")
|
|
else:
|
|
_LOGGER.info(f"Excluding tomorrow prices from MQTT - appear to be placeholders or incomplete")
|
|
else:
|
|
# In 24h mode, only include today
|
|
if today_date in prices_by_date:
|
|
days_to_include.append(today_date)
|
|
|
|
# Process selected days
|
|
for date_to_include in sorted(days_to_include):
|
|
day_prices = prices_by_date.get(date_to_include, [])
|
|
|
|
# Sort by time to ensure correct order
|
|
day_prices.sort(key=lambda x: x["start"])
|
|
|
|
for price_entry in day_prices:
|
|
try:
|
|
# Validate price is a number
|
|
try:
|
|
price_value = float(price_entry["price"])
|
|
except (TypeError, ValueError):
|
|
_LOGGER.warning("Invalid price value: %s", price_entry.get("price"))
|
|
continue
|
|
|
|
# Parse local time and convert to UTC ISO format
|
|
local_dt = dt_util.parse_datetime(price_entry["start"])
|
|
if not local_dt:
|
|
continue
|
|
|
|
# Ensure it's local time and convert to UTC
|
|
local_dt = dt_util.as_local(local_dt)
|
|
utc_dt = dt_util.as_utc(local_dt)
|
|
|
|
# Calculate end time (1 hour later)
|
|
end_dt = utc_dt + timedelta(hours=1)
|
|
|
|
# Format times in ISO format with Z suffix for UTC
|
|
start_str = utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
formatted_prices.append({
|
|
"start": start_str,
|
|
"end": end_str,
|
|
"value": price_value
|
|
})
|
|
except Exception as e:
|
|
_LOGGER.error("Error formatting price for EVCC: %s", str(e))
|
|
|
|
# Log summary
|
|
if formatted_prices:
|
|
first_time = formatted_prices[0]["start"]
|
|
last_time = formatted_prices[-1]["start"]
|
|
_LOGGER.debug(f"Formatted {len(formatted_prices)} prices for MQTT from {first_time} to {last_time}")
|
|
|
|
# Verify we have complete days
|
|
hours_by_date = {}
|
|
for fp in formatted_prices:
|
|
date_part = fp["start"][:10] # YYYY-MM-DD
|
|
if date_part not in hours_by_date:
|
|
hours_by_date[date_part] = 0
|
|
hours_by_date[date_part] += 1
|
|
|
|
for date, hours in hours_by_date.items():
|
|
if hours != 24:
|
|
_LOGGER.warning(f"Incomplete day {date}: only {hours} hours instead of 24")
|
|
else:
|
|
_LOGGER.warning("No prices formatted for MQTT")
|
|
|
|
return formatted_prices
|
|
|
|
async def publish_prices(self):
|
|
"""Publish prices to MQTT using common function."""
|
|
from .mqtt_common import publish_mqtt_prices
|
|
|
|
success = await publish_mqtt_prices(
|
|
self.hass,
|
|
self.entry_id,
|
|
self.mqtt_topic_buy,
|
|
self.mqtt_topic_sell
|
|
)
|
|
|
|
if success:
|
|
self._last_published = dt_util.now()
|
|
|
|
return success
|
|
|
|
async def schedule_periodic_updates(self, interval_minutes=60):
|
|
"""Schedule periodic updates to MQTT (default: 60 minutes)."""
|
|
from .mqtt_common import setup_periodic_mqtt_publish
|
|
|
|
# Use common function for periodic publishing
|
|
await setup_periodic_mqtt_publish(
|
|
self.hass,
|
|
self.entry_id,
|
|
self.mqtt_topic_buy,
|
|
self.mqtt_topic_sell,
|
|
interval_minutes
|
|
)
|
|
|
|
return True
|
|
|
|
def unsubscribe(self):
|
|
"""Unsubscribe from all events."""
|
|
# Cleanup is handled by common function in mqtt_common.py
|
|
_LOGGER.debug("MQTT publisher cleanup requested")
|
|
|
|
@property
|
|
def last_published(self):
|
|
"""Return the timestamp of the last successful publish."""
|
|
return self._last_published
|