From edcdc280bc305bc486df121cef3e2517cd8c20a0 Mon Sep 17 00:00:00 2001 From: balgerion <133121849+balgerion@users.noreply.github.com> Date: Sat, 28 Jun 2025 11:35:34 +0200 Subject: [PATCH] Update update_coordinator.py --- .../pstryk/update_coordinator.py | 379 ++++++++++++------ 1 file changed, 253 insertions(+), 126 deletions(-) diff --git a/custom_components/pstryk/update_coordinator.py b/custom_components/pstryk/update_coordinator.py index 3da9060..c085b20 100644 --- a/custom_components/pstryk/update_coordinator.py +++ b/custom_components/pstryk/update_coordinator.py @@ -249,6 +249,67 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator): ).format(status=resp.status, error=short_error)) return await resp.json() + + def _is_likely_placeholder_data(self, prices_for_day): + """Check if prices for a day are likely placeholders. + + Returns True if: + - There are no prices + - ALL prices have exactly the same value (suggesting API returned default values) + - There are too many consecutive hours with the same value (e.g., 10+ hours) + """ + if not prices_for_day: + return True + + # Get all price values + price_values = [p.get("price") for p in prices_for_day if p.get("price") is not None] + + if not price_values: + return True + + # If we have less than 20 prices for a day, it's incomplete data + if len(price_values) < 20: + _LOGGER.debug(f"Only {len(price_values)} prices for the day, likely incomplete data") + return True + + # Check if ALL values are identical + unique_values = set(price_values) + if len(unique_values) == 1: + _LOGGER.debug(f"All {len(price_values)} prices have the same value ({price_values[0]}), likely placeholders") + return True + + # Additional check: if more than 90% of values are the same, it's suspicious + most_common_value = max(set(price_values), key=price_values.count) + count_most_common = price_values.count(most_common_value) + if count_most_common / len(price_values) > 0.9: + _LOGGER.debug(f"{count_most_common}/{len(price_values)} prices have value {most_common_value}, likely placeholders") + return True + + return False + + def _count_consecutive_same_values(self, prices): + """Count maximum consecutive hours with the same price.""" + if not prices: + return 0 + + # Sort by time to ensure consecutive checking + sorted_prices = sorted(prices, key=lambda x: x.get("start", "")) + + max_consecutive = 1 + current_consecutive = 1 + last_value = None + + for price in sorted_prices: + value = price.get("price") + if value is not None: + if value == last_value: + current_consecutive += 1 + max_consecutive = max(max_consecutive, current_consecutive) + else: + current_consecutive = 1 + last_value = value + + return max_consecutive async def _check_and_publish_mqtt(self, new_data): """Check if we should publish to MQTT after update.""" @@ -261,11 +322,27 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator): # Check if tomorrow prices are available in new data all_prices = new_data.get("prices", []) tomorrow_prices = [p for p in all_prices if p["start"].startswith(tomorrow)] - has_tomorrow_prices = len(tomorrow_prices) > 0 - # If we didn't have tomorrow prices before, but now we do, publish to MQTT - if not self._had_tomorrow_prices and has_tomorrow_prices: - _LOGGER.info("Tomorrow prices detected for %s, triggering MQTT publish", self.price_type) + # Check if tomorrow's data is valid (not placeholders) + has_valid_tomorrow_prices = ( + len(tomorrow_prices) >= 20 and + not self._is_likely_placeholder_data(tomorrow_prices) + ) + + # Log what we found for debugging + if tomorrow_prices: + unique_values = set(p.get("price") for p in tomorrow_prices if p.get("price") is not None) + consecutive = self._count_consecutive_same_values(tomorrow_prices) + _LOGGER.debug( + f"Tomorrow has {len(tomorrow_prices)} prices, " + f"{len(unique_values)} unique values, " + f"max {consecutive} consecutive same values, " + f"valid: {has_valid_tomorrow_prices}" + ) + + # If we didn't have valid tomorrow prices before, but now we do, publish to MQTT immediately + if not self._had_tomorrow_prices and has_valid_tomorrow_prices: + _LOGGER.info("Valid tomorrow prices detected for %s, triggering immediate MQTT publish", self.price_type) # Find our config entry entry_id = None @@ -298,141 +375,150 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator): success = await publish_mqtt_prices(self.hass, entry_id, mqtt_topic_buy, mqtt_topic_sell) if success: - _LOGGER.info("Successfully published 48h prices to MQTT after detecting tomorrow prices") + _LOGGER.info("Successfully published 48h prices to MQTT after detecting valid tomorrow prices") else: _LOGGER.error("Failed to publish to MQTT after detecting tomorrow prices") # Update state for next check - self._had_tomorrow_prices = has_tomorrow_prices + self._had_tomorrow_prices = has_valid_tomorrow_prices async def _async_update_data(self): - """Fetch 48h of frames and extract current + today's list.""" - _LOGGER.debug("Starting %s price update (48h mode: %s)", self.price_type, self.mqtt_48h_mode) - - # Store the previous data for fallback - previous_data = None - if hasattr(self, 'data') and self.data: - previous_data = self.data.copy() if self.data else None - if previous_data: - # Oznacz jako dane z cache, jeśli będziemy ich używać - previous_data["is_cached"] = True - - today_local = dt_util.now().replace(hour=0, minute=0, second=0, microsecond=0) - window_end_local = today_local + timedelta(days=2) - start_utc = dt_util.as_utc(today_local).strftime("%Y-%m-%dT%H:%M:%SZ") - end_utc = dt_util.as_utc(window_end_local).strftime("%Y-%m-%dT%H:%M:%SZ") - - endpoint_tpl = BUY_ENDPOINT if self.price_type == "buy" else SELL_ENDPOINT - endpoint = endpoint_tpl.format(start=start_utc, end=end_utc) - url = f"{API_URL}{endpoint}" - - _LOGGER.debug("Requesting %s data from %s", self.price_type, url) - - try: - # Załaduj tłumaczenia dla mechanizmu ponowień - await self.retry_mechanism.load_translations(self.hass) + """Fetch 48h of frames and extract current + today's list.""" + _LOGGER.debug("Starting %s price update (48h mode: %s)", self.price_type, self.mqtt_48h_mode) - # Załaduj tłumaczenia dla koordynatora - try: - self._translations = await async_get_translations( - self.hass, self.hass.config.language, DOMAIN, ["debug"] - ) - except Exception as ex: - _LOGGER.warning("Failed to load translations for coordinator: %s", ex) + # Store the previous data for fallback + previous_data = None + if hasattr(self, 'data') and self.data: + previous_data = self.data.copy() if self.data else None + if previous_data: + previous_data["is_cached"] = True - # Użyj mechanizmu ponowień z parametrem price_type - # Nie potrzebujemy łapać asyncio.TimeoutError tutaj, ponieważ - # jest już obsługiwany w execute() z odpowiednimi tłumaczeniami - data = await self.retry_mechanism.execute( - self._make_api_request, - url, - price_type=self.price_type - ) - - frames = data.get("frames", []) - if not frames: - _LOGGER.warning("No frames returned for %s prices", self.price_type) - + # Get today's start in LOCAL time, then convert to UTC + # This ensures we get the correct "today" for the user's timezone + today_local = dt_util.now().replace(hour=0, minute=0, second=0, microsecond=0) + window_end_local = today_local + timedelta(days=2) + + # Convert to UTC for API request + start_utc = dt_util.as_utc(today_local) + end_utc = dt_util.as_utc(window_end_local) + + start_str = start_utc.strftime("%Y-%m-%dT%H:%M:%SZ") + end_str = end_utc.strftime("%Y-%m-%dT%H:%M:%SZ") + + endpoint_tpl = BUY_ENDPOINT if self.price_type == "buy" else SELL_ENDPOINT + endpoint = endpoint_tpl.format(start=start_str, end=end_str) + url = f"{API_URL}{endpoint}" + + _LOGGER.debug("Requesting %s data from %s", self.price_type, url) + + # Get current time in UTC for price comparison now_utc = dt_util.utcnow() - prices = [] - current_price = None - - for f in frames: - val = convert_price(f.get("price_gross")) - if val is None: - continue - start = dt_util.parse_datetime(f["start"]) - end = dt_util.parse_datetime(f["end"]) + + try: + # Load translations + await self.retry_mechanism.load_translations(self.hass) - # Weryfikacja poprawności dat - if not start or not end: - _LOGGER.warning("Invalid datetime format in frames for %s", self.price_type) - continue + try: + self._translations = await async_get_translations( + self.hass, self.hass.config.language, DOMAIN, ["debug"] + ) + except Exception as ex: + _LOGGER.warning("Failed to load translations for coordinator: %s", ex) + + # Use retry mechanism + data = await self.retry_mechanism.execute( + self._make_api_request, + url, + price_type=self.price_type + ) + + frames = data.get("frames", []) + if not frames: + _LOGGER.warning("No frames returned for %s prices", self.price_type) - local_start = dt_util.as_local(start).strftime("%Y-%m-%dT%H:%M:%S") - prices.append({"start": local_start, "price": val}) - if start <= now_utc < end: - current_price = val - - # only today's entries - today_str = today_local.strftime("%Y-%m-%d") - prices_today = [p for p in prices if p["start"].startswith(today_str)] - - _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d, total_prices=%d", - self.price_type, current_price, len(prices_today), len(prices)) - - new_data = { - "prices_today": prices_today, - "prices": prices, - "current": current_price, - "is_cached": False, # Dane bezpośrednio z API - } - - # Check if we should publish to MQTT (only for first coordinator that detects new tomorrow prices) - if self.mqtt_48h_mode: - await self._check_and_publish_mqtt(new_data) - - return new_data - - except aiohttp.ClientError as err: - error_msg = self._translations.get( - "debug.network_error", - "Network error fetching {price_type} data: {error}" - ).format(price_type=self.price_type, error=str(err)) - _LOGGER.error(error_msg) - - if previous_data: - cache_msg = self._translations.get( - "debug.using_cache", - "Using cached data from previous update due to API failure" - ) - _LOGGER.warning(cache_msg) - return previous_data + prices = [] + current_price = None + + for f in frames: + val = convert_price(f.get("price_gross")) + if val is None: + continue + + start = dt_util.parse_datetime(f["start"]) + end = dt_util.parse_datetime(f["end"]) + + if not start or not end: + _LOGGER.warning("Invalid datetime format in frames for %s", self.price_type) + continue + + # Convert to local time for display (we need this for hourly data) + local_start = dt_util.as_local(start).strftime("%Y-%m-%dT%H:%M:%S") + prices.append({"start": local_start, "price": val}) + + # Check if this is the current price + if start <= now_utc < end: + current_price = val + + # Filter today's prices + today_local = dt_util.now().strftime("%Y-%m-%d") + prices_today = [p for p in prices if p["start"].startswith(today_local)] - raise UpdateFailed(self._translations.get( - "debug.network_error_user", - "Network error: {error}" - ).format(error=err)) - - except Exception as err: - error_msg = self._translations.get( - "debug.unexpected_error", - "Unexpected error fetching {price_type} data: {error}" - ).format(price_type=self.price_type, error=str(err)) - _LOGGER.exception(error_msg) - - if previous_data: - cache_msg = self._translations.get( - "debug.using_cache", - "Using cached data from previous update due to API failure" - ) - _LOGGER.warning(cache_msg) - return previous_data + _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d, total_prices=%d", + self.price_type, current_price, len(prices_today), len(prices)) + + new_data = { + "prices_today": prices_today, + "prices": prices, + "current": current_price, + "is_cached": False, + } - raise UpdateFailed(self._translations.get( - "debug.unexpected_error_user", - "Error: {error}" - ).format(error=err)) + # Check if we should publish to MQTT + if self.mqtt_48h_mode: + await self._check_and_publish_mqtt(new_data) + + return new_data + + except aiohttp.ClientError as err: + error_msg = self._translations.get( + "debug.network_error", + "Network error fetching {price_type} data: {error}" + ).format(price_type=self.price_type, error=str(err)) + _LOGGER.error(error_msg) + + if previous_data: + cache_msg = self._translations.get( + "debug.using_cache", + "Using cached data from previous update due to API failure" + ) + _LOGGER.warning(cache_msg) + return previous_data + + raise UpdateFailed(self._translations.get( + "debug.network_error_user", + "Network error: {error}" + ).format(error=err)) + + except Exception as err: + error_msg = self._translations.get( + "debug.unexpected_error", + "Unexpected error fetching {price_type} data: {error}" + ).format(price_type=self.price_type, error=str(err)) + _LOGGER.exception(error_msg) + + if previous_data: + cache_msg = self._translations.get( + "debug.using_cache", + "Using cached data from previous update due to API failure" + ) + _LOGGER.warning(cache_msg) + return previous_data + + raise UpdateFailed(self._translations.get( + "debug.unexpected_error_user", + "Error: {error}" + ).format(error=err)) + def schedule_hourly_update(self): """Schedule next refresh 1 min after each full hour.""" @@ -452,10 +538,51 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator): self.hass, self._handle_hourly_update, dt_util.as_utc(next_run) ) + async def _check_tomorrow_prices_after_15(self): + """Check if tomorrow prices are available and publish to MQTT if needed. + This runs after 15:00 until midnight in 48h mode. + """ + if not self.mqtt_48h_mode: + return + + now = dt_util.now() + # Only run between 15:00 and 23:59 + if now.hour < 15: + return + + tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d") + + # Check if we already have valid tomorrow data + if self.data and self.data.get("prices"): + tomorrow_prices = [p for p in self.data.get("prices", []) if p.get("start", "").startswith(tomorrow)] + + # Check if tomorrow's data is valid (not placeholders) + has_valid_tomorrow = ( + len(tomorrow_prices) >= 20 and + not self._is_likely_placeholder_data(tomorrow_prices) + ) + + if has_valid_tomorrow: + # We already have valid data, nothing to do + return + else: + _LOGGER.info("Missing or invalid tomorrow prices at %s, will refresh data", now.strftime("%H:%M")) + + # If we don't have valid tomorrow data, force a refresh + # The refresh will automatically trigger MQTT publish via _check_and_publish_mqtt + await self.async_request_refresh() + async def _handle_hourly_update(self, _): """Handle hourly update.""" _LOGGER.debug("Running scheduled hourly update for %s", self.price_type) + + # First do the regular update await self.async_request_refresh() + + # Then check if we need tomorrow prices (only in 48h mode after 15:00) + await self._check_tomorrow_prices_after_15() + + # Schedule next hourly update self.schedule_hourly_update() def schedule_midnight_update(self):