diff --git a/custom_components/pstryk/mqtt_publisher.py b/custom_components/pstryk/mqtt_publisher.py index 761e5d9..b227d59 100644 --- a/custom_components/pstryk/mqtt_publisher.py +++ b/custom_components/pstryk/mqtt_publisher.py @@ -33,6 +33,7 @@ class PstrykMqttPublisher: self.entry_id = entry_id self.mqtt_topic_buy = mqtt_topic_buy self.mqtt_topic_sell = mqtt_topic_sell + self._publish_task = None self._initialized = False self._translations = {} self._unsub_timer = None @@ -53,6 +54,35 @@ class PstrykMqttPublisher: self._initialized = True return True + + def _is_day_data_valid(self, day_prices): + """Check if prices for a specific day look like real data. + + Returns False if the data appears to be placeholders. + """ + if not day_prices: + return False + + price_values = [p.get("price") for p in day_prices if p.get("price") is not None] + + if not price_values: + return False + + # Need at least 20 hours for a valid day + if len(price_values) < 20: + return False + + # If all values are identical, it's likely placeholder data + unique_values = set(price_values) + if len(unique_values) == 1: + return False + + # If more than 90% of values are the same, probably placeholders + most_common = max(set(price_values), key=price_values.count) + if price_values.count(most_common) / len(price_values) > 0.9: + return False + + return True def _format_prices_for_evcc(self, prices_data, price_type): """Format prices in EVCC expected format. @@ -71,11 +101,52 @@ class PstrykMqttPublisher: formatted_prices = [] - for price_entry in prices_data.get("prices", []): + # First, check if we're in 48h mode + mqtt_48h_mode = self.hass.data[DOMAIN].get(f"{self.entry_id}_mqtt_48h_mode", False) + + # Get current date for comparison + now = dt_util.as_local(dt_util.utcnow()) + today_str = now.strftime("%Y-%m-%d") + tomorrow_str = (now + timedelta(days=1)).strftime("%Y-%m-%d") + + # Get the appropriate price list based on mode + if mqtt_48h_mode: + price_list = prices_data.get("prices", []) + + # In 48h mode, we need to check if tomorrow's data is valid + # Separate today and tomorrow prices + today_prices = [p for p in price_list if p.get("start", "").startswith(today_str)] + tomorrow_prices = [p for p in price_list if p.get("start", "").startswith(tomorrow_str)] + + # Always include today's prices + prices_to_process = today_prices.copy() + + # Only include tomorrow if the data looks valid + if tomorrow_prices and self._is_day_data_valid(tomorrow_prices): + prices_to_process.extend(tomorrow_prices) + _LOGGER.info(f"Including {len(tomorrow_prices)} tomorrow prices in MQTT publish") + else: + if tomorrow_prices: + _LOGGER.info(f"Excluding {len(tomorrow_prices)} tomorrow prices from MQTT - appear to be placeholders") + else: + _LOGGER.debug("No tomorrow prices available for MQTT publish") + else: + # In normal mode, use only today's prices + prices_to_process = prices_data.get("prices_today", []) + + # Process the selected prices + for price_entry in prices_to_process: try: if "start" not in price_entry or "price" not in price_entry: continue + # Validate price is a number + try: + price_value = float(price_entry["price"]) + except (TypeError, ValueError): + _LOGGER.warning("Invalid price value: %s", price_entry.get("price")) + continue + # Parse local time and convert to UTC ISO format local_dt = dt_util.parse_datetime(price_entry["start"]) if not local_dt: @@ -91,7 +162,7 @@ class PstrykMqttPublisher: end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") # EVCC w trybie custom pokazuje ceny jako "gr" (grosze), - price_value = price_entry["price"] / 1 + price_value = price_value / 1 formatted_prices.append({ "start": start_str, @@ -101,147 +172,35 @@ class PstrykMqttPublisher: except Exception as e: _LOGGER.error("Error formatting price for EVCC: %s", str(e)) + _LOGGER.debug(f"Formatted {len(formatted_prices)} prices for MQTT from {len(prices_to_process)} input prices") return formatted_prices async def publish_prices(self): - """Publish prices to MQTT in EVCC format for both buy and sell.""" - try: - if not self._initialized: - await self.async_initialize() - - # Get both buy and sell coordinators - buy_coordinator = self.hass.data[DOMAIN].get(f"{self.entry_id}_buy") - sell_coordinator = self.hass.data[DOMAIN].get(f"{self.entry_id}_sell") - - if not buy_coordinator or not sell_coordinator: - _LOGGER.error("Unable to find Pstryk coordinators for MQTT publishing") - return False - - # Check if data is available - if not buy_coordinator.data: - _LOGGER.warning("No buy price data available for MQTT publishing") - return False - - if not sell_coordinator.data: - _LOGGER.warning("No sell price data available for MQTT publishing") - return False - - # Format prices for EVCC - buy_prices = self._format_prices_for_evcc(buy_coordinator.data, "buy") - sell_prices = self._format_prices_for_evcc(sell_coordinator.data, "sell") - - if not buy_prices: - _LOGGER.warning("No valid buy prices available to publish to MQTT") - return False - - if not sell_prices: - _LOGGER.warning("No valid sell prices available to publish to MQTT") - return False - - # Sort prices by time to ensure chronological order - buy_prices.sort(key=lambda x: x["start"]) - sell_prices.sort(key=lambda x: x["start"]) - - # Convert to JSON - buy_payload = json.dumps(buy_prices) - sell_payload = json.dumps(sell_prices) - - # Log before publishing - _LOGGER.debug( - "Publishing buy prices to MQTT topic %s with RETAIN=TRUE, QoS=1, payload length: %d bytes", - self.mqtt_topic_buy, - len(buy_payload) - ) - - _LOGGER.debug( - "Publishing sell prices to MQTT topic %s with RETAIN=TRUE, QoS=1, payload length: %d bytes", - self.mqtt_topic_sell, - len(sell_payload) - ) - - # Publish buy prices to MQTT with explicit retain flag - await mqtt.async_publish( - self.hass, - self.mqtt_topic_buy, - buy_payload, - qos=1, - retain=True - ) - - # Publish sell prices to MQTT with explicit retain flag - await mqtt.async_publish( - self.hass, - self.mqtt_topic_sell, - sell_payload, - qos=1, - retain=True - ) - - # Double-check with a direct publish to ensure they're retained - await self.hass.services.async_call( - "mqtt", - "publish", - { - "topic": self.mqtt_topic_buy, - "payload": buy_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - await self.hass.services.async_call( - "mqtt", - "publish", - { - "topic": self.mqtt_topic_sell, - "payload": sell_payload, - "qos": 1, - "retain": True - }, - blocking=True - ) - - now = dt_util.now() - self._last_published = now - - _LOGGER.info( - "Published %d buy prices to %s and %d sell prices to %s WITH RETAIN FLAG", - len(buy_prices), - self.mqtt_topic_buy, - len(sell_prices), - self.mqtt_topic_sell - ) - - return True - - except Exception as e: - _LOGGER.error("Error publishing Pstryk prices to MQTT: %s", str(e)) - return False - - async def schedule_periodic_updates(self, interval_minutes=5): - """Schedule periodic updates to MQTT.""" - if self._unsub_timer: - self._unsub_timer() - self._unsub_timer = None - - # Initial publish - await self.publish_prices() + """Publish prices to MQTT using common function.""" + from .mqtt_common import publish_mqtt_prices - # Schedule periodic updates - async def periodic_publish(_now=None): - """Handle periodic publishing.""" - await self.publish_prices() - - # POPRAWKA: użycie bezpośrednio funkcji async_track_time_interval zamiast przez hass.helpers.event - self._unsub_timer = async_track_time_interval( - self.hass, - periodic_publish, - timedelta(minutes=interval_minutes) + success = await publish_mqtt_prices( + self.hass, + self.entry_id, + self.mqtt_topic_buy, + self.mqtt_topic_sell ) - _LOGGER.debug( - "Scheduled periodic MQTT publishing every %d minutes", + if success: + self._last_published = dt_util.now() + + return success + + async def schedule_periodic_updates(self, interval_minutes=60): + """Schedule periodic updates to MQTT (default: 60 minutes).""" + from .mqtt_common import setup_periodic_mqtt_publish + + # Use common function for periodic publishing + await setup_periodic_mqtt_publish( + self.hass, + self.entry_id, + self.mqtt_topic_buy, + self.mqtt_topic_sell, interval_minutes ) @@ -249,10 +208,8 @@ class PstrykMqttPublisher: def unsubscribe(self): """Unsubscribe from all events.""" - if self._unsub_timer: - self._unsub_timer() - self._unsub_timer = None - _LOGGER.debug("Unsubscribed from MQTT publishing timer") + # Cleanup is handled by common function in mqtt_common.py + _LOGGER.debug("MQTT publisher cleanup requested") @property def last_published(self):