Update update_coordinator.py

Author: balgerion
Date: 2025-06-28 11:35:34 +02:00
Committed by: GitHub
Parent: 0ac076dd6b
Commit: edcdc280bc

@@ -250,6 +250,67 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             return await resp.json()
 
+    def _is_likely_placeholder_data(self, prices_for_day):
+        """Check if prices for a day are likely placeholders.
+
+        Returns True if:
+        - There are no prices
+        - Fewer than 20 prices were returned for the day (incomplete data)
+        - ALL prices have exactly the same value (suggesting the API returned default values)
+        - More than 90% of the values are identical (suspiciously uniform)
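+
+        Illustrative example: 24 hourly frames all priced 0.00 -> True
+        (single unique value); 24 frames with varied prices -> False.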
"""
if not prices_for_day:
return True
# Get all price values
price_values = [p.get("price") for p in prices_for_day if p.get("price") is not None]
if not price_values:
return True
# If we have less than 20 prices for a day, it's incomplete data
if len(price_values) < 20:
_LOGGER.debug(f"Only {len(price_values)} prices for the day, likely incomplete data")
return True
# Check if ALL values are identical
unique_values = set(price_values)
if len(unique_values) == 1:
_LOGGER.debug(f"All {len(price_values)} prices have the same value ({price_values[0]}), likely placeholders")
return True
# Additional check: if more than 90% of values are the same, it's suspicious
most_common_value = max(set(price_values), key=price_values.count)
count_most_common = price_values.count(most_common_value)
if count_most_common / len(price_values) > 0.9:
_LOGGER.debug(f"{count_most_common}/{len(price_values)} prices have value {most_common_value}, likely placeholders")
return True
return False
def _count_consecutive_same_values(self, prices):
"""Count maximum consecutive hours with the same price."""
+        if not prices:
+            return 0
+
+        # Sort by time to ensure consecutive checking
+        sorted_prices = sorted(prices, key=lambda x: x.get("start", ""))
+
+        max_consecutive = 1
+        current_consecutive = 1
+        last_value = None
+
+        for price in sorted_prices:
+            value = price.get("price")
+            if value is not None:
+                if value == last_value:
+                    current_consecutive += 1
+                    max_consecutive = max(max_consecutive, current_consecutive)
+                else:
+                    current_consecutive = 1
+                last_value = value
+
+        return max_consecutive
+
     async def _check_and_publish_mqtt(self, new_data):
         """Check if we should publish to MQTT after update."""
         if not self.mqtt_48h_mode:
@@ -261,11 +322,27 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
 
         # Check if tomorrow prices are available in new data
         all_prices = new_data.get("prices", [])
         tomorrow_prices = [p for p in all_prices if p["start"].startswith(tomorrow)]
-        has_tomorrow_prices = len(tomorrow_prices) > 0
-
-        # If we didn't have tomorrow prices before, but now we do, publish to MQTT
-        if not self._had_tomorrow_prices and has_tomorrow_prices:
-            _LOGGER.info("Tomorrow prices detected for %s, triggering MQTT publish", self.price_type)
+
+        # Check if tomorrow's data is valid (not placeholders)
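+        # Assumption: 20 frames is the floor for a near-complete day
+        # (a DST-shortened day still has 23 hourly frames)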
+        has_valid_tomorrow_prices = (
+            len(tomorrow_prices) >= 20 and
+            not self._is_likely_placeholder_data(tomorrow_prices)
+        )
+
+        # Log what we found for debugging
+        if tomorrow_prices:
+            unique_values = set(p.get("price") for p in tomorrow_prices if p.get("price") is not None)
+            consecutive = self._count_consecutive_same_values(tomorrow_prices)
+            _LOGGER.debug(
+                f"Tomorrow has {len(tomorrow_prices)} prices, "
+                f"{len(unique_values)} unique values, "
+                f"max {consecutive} consecutive same values, "
+                f"valid: {has_valid_tomorrow_prices}"
+            )
+
+        # If we didn't have valid tomorrow prices before, but now we do, publish to MQTT immediately
+        if not self._had_tomorrow_prices and has_valid_tomorrow_prices:
+            _LOGGER.info("Valid tomorrow prices detected for %s, triggering immediate MQTT publish", self.price_type)
 
             # Find our config entry
             entry_id = None
@@ -298,141 +375,150 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             success = await publish_mqtt_prices(self.hass, entry_id, mqtt_topic_buy, mqtt_topic_sell)
             if success:
-                _LOGGER.info("Successfully published 48h prices to MQTT after detecting tomorrow prices")
+                _LOGGER.info("Successfully published 48h prices to MQTT after detecting valid tomorrow prices")
             else:
                 _LOGGER.error("Failed to publish to MQTT after detecting tomorrow prices")
 
         # Update state for next check
-        self._had_tomorrow_prices = has_tomorrow_prices
+        self._had_tomorrow_prices = has_valid_tomorrow_prices
 
     async def _async_update_data(self):
-        """Fetch 48h of frames and extract current + today's list."""
-        _LOGGER.debug("Starting %s price update (48h mode: %s)", self.price_type, self.mqtt_48h_mode)
-        # Store the previous data for fallback
-        previous_data = None
-        if hasattr(self, 'data') and self.data:
-            previous_data = self.data.copy() if self.data else None
-            if previous_data:
-                # Mark as cached data in case we end up using it
-                previous_data["is_cached"] = True
-        today_local = dt_util.now().replace(hour=0, minute=0, second=0, microsecond=0)
-        window_end_local = today_local + timedelta(days=2)
-        start_utc = dt_util.as_utc(today_local).strftime("%Y-%m-%dT%H:%M:%SZ")
-        end_utc = dt_util.as_utc(window_end_local).strftime("%Y-%m-%dT%H:%M:%SZ")
-        endpoint_tpl = BUY_ENDPOINT if self.price_type == "buy" else SELL_ENDPOINT
-        endpoint = endpoint_tpl.format(start=start_utc, end=end_utc)
-        url = f"{API_URL}{endpoint}"
-        _LOGGER.debug("Requesting %s data from %s", self.price_type, url)
-        try:
-            # Load translations for the retry mechanism
-            await self.retry_mechanism.load_translations(self.hass)
-            # Load translations for the coordinator
-            try:
-                self._translations = await async_get_translations(
-                    self.hass, self.hass.config.language, DOMAIN, ["debug"]
-                )
-            except Exception as ex:
-                _LOGGER.warning("Failed to load translations for coordinator: %s", ex)
-            # Use the retry mechanism with the price_type parameter
-            # We don't need to catch asyncio.TimeoutError here, because
-            # it is already handled in execute() with the appropriate translations
-            data = await self.retry_mechanism.execute(
-                self._make_api_request,
-                url,
-                price_type=self.price_type
-            )
-            frames = data.get("frames", [])
-            if not frames:
-                _LOGGER.warning("No frames returned for %s prices", self.price_type)
-            # Get current time in UTC for price comparison
-            now_utc = dt_util.utcnow()
-            prices = []
-            current_price = None
-            for f in frames:
-                val = convert_price(f.get("price_gross"))
-                if val is None:
-                    continue
-                start = dt_util.parse_datetime(f["start"])
-                end = dt_util.parse_datetime(f["end"])
-                # Validate the parsed datetimes
-                if not start or not end:
-                    _LOGGER.warning("Invalid datetime format in frames for %s", self.price_type)
-                    continue
-                local_start = dt_util.as_local(start).strftime("%Y-%m-%dT%H:%M:%S")
-                prices.append({"start": local_start, "price": val})
-                if start <= now_utc < end:
-                    current_price = val
-            # only today's entries
-            today_str = today_local.strftime("%Y-%m-%d")
-            prices_today = [p for p in prices if p["start"].startswith(today_str)]
-            _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d, total_prices=%d",
-                          self.price_type, current_price, len(prices_today), len(prices))
-            new_data = {
-                "prices_today": prices_today,
-                "prices": prices,
-                "current": current_price,
-                "is_cached": False,  # Data fetched directly from the API
-            }
-            # Check if we should publish to MQTT (only for first coordinator that detects new tomorrow prices)
-            if self.mqtt_48h_mode:
-                await self._check_and_publish_mqtt(new_data)
-            return new_data
-        except aiohttp.ClientError as err:
-            error_msg = self._translations.get(
-                "debug.network_error",
-                "Network error fetching {price_type} data: {error}"
-            ).format(price_type=self.price_type, error=str(err))
-            _LOGGER.error(error_msg)
-            if previous_data:
-                cache_msg = self._translations.get(
-                    "debug.using_cache",
-                    "Using cached data from previous update due to API failure"
-                )
-                _LOGGER.warning(cache_msg)
-                return previous_data
-            raise UpdateFailed(self._translations.get(
-                "debug.network_error_user",
-                "Network error: {error}"
-            ).format(error=err))
-        except Exception as err:
-            error_msg = self._translations.get(
-                "debug.unexpected_error",
-                "Unexpected error fetching {price_type} data: {error}"
-            ).format(price_type=self.price_type, error=str(err))
-            _LOGGER.exception(error_msg)
-            if previous_data:
-                cache_msg = self._translations.get(
-                    "debug.using_cache",
-                    "Using cached data from previous update due to API failure"
-                )
-                _LOGGER.warning(cache_msg)
-                return previous_data
-            raise UpdateFailed(self._translations.get(
-                "debug.unexpected_error_user",
-                "Error: {error}"
-            ).format(error=err))
+        """Fetch 48h of frames and extract current + today's list."""
+        _LOGGER.debug("Starting %s price update (48h mode: %s)", self.price_type, self.mqtt_48h_mode)
+
+        # Store the previous data for fallback
+        previous_data = None
+        if hasattr(self, 'data') and self.data:
+            previous_data = self.data.copy() if self.data else None
+            if previous_data:
+                previous_data["is_cached"] = True
+
+        # Get today's start in LOCAL time, then convert to UTC
+        # This ensures we get the correct "today" for the user's timezone
+        today_local = dt_util.now().replace(hour=0, minute=0, second=0, microsecond=0)
+        window_end_local = today_local + timedelta(days=2)
+
+        # Convert to UTC for API request
+        start_utc = dt_util.as_utc(today_local)
+        end_utc = dt_util.as_utc(window_end_local)
+        start_str = start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
+        end_str = end_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+        endpoint_tpl = BUY_ENDPOINT if self.price_type == "buy" else SELL_ENDPOINT
+        endpoint = endpoint_tpl.format(start=start_str, end=end_str)
+        url = f"{API_URL}{endpoint}"
+        _LOGGER.debug("Requesting %s data from %s", self.price_type, url)
+
+        try:
+            # Load translations
+            await self.retry_mechanism.load_translations(self.hass)
+            try:
+                self._translations = await async_get_translations(
+                    self.hass, self.hass.config.language, DOMAIN, ["debug"]
+                )
+            except Exception as ex:
+                _LOGGER.warning("Failed to load translations for coordinator: %s", ex)
+
+            # Use retry mechanism
+            data = await self.retry_mechanism.execute(
+                self._make_api_request,
+                url,
+                price_type=self.price_type
+            )
+            frames = data.get("frames", [])
+            if not frames:
+                _LOGGER.warning("No frames returned for %s prices", self.price_type)
+
+            # Get current time in UTC for price comparison
+            now_utc = dt_util.utcnow()
+            prices = []
+            current_price = None
+            for f in frames:
+                val = convert_price(f.get("price_gross"))
+                if val is None:
+                    continue
+                start = dt_util.parse_datetime(f["start"])
+                end = dt_util.parse_datetime(f["end"])
+                if not start or not end:
+                    _LOGGER.warning("Invalid datetime format in frames for %s", self.price_type)
+                    continue
+
+                # Convert to local time for display (we need this for hourly data)
+                local_start = dt_util.as_local(start).strftime("%Y-%m-%dT%H:%M:%S")
+                prices.append({"start": local_start, "price": val})
+
+                # Check if this is the current price
+                if start <= now_utc < end:
+                    current_price = val
+
+            # Filter today's prices
+            today_local = dt_util.now().strftime("%Y-%m-%d")
+            prices_today = [p for p in prices if p["start"].startswith(today_local)]
+            _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d, total_prices=%d",
+                          self.price_type, current_price, len(prices_today), len(prices))
+
+            new_data = {
+                "prices_today": prices_today,
+                "prices": prices,
+                "current": current_price,
+                "is_cached": False,
+            }
+
+            # Check if we should publish to MQTT
+            if self.mqtt_48h_mode:
+                await self._check_and_publish_mqtt(new_data)
+            return new_data
+        except aiohttp.ClientError as err:
+            error_msg = self._translations.get(
+                "debug.network_error",
+                "Network error fetching {price_type} data: {error}"
+            ).format(price_type=self.price_type, error=str(err))
+            _LOGGER.error(error_msg)
+            if previous_data:
+                cache_msg = self._translations.get(
+                    "debug.using_cache",
+                    "Using cached data from previous update due to API failure"
+                )
+                _LOGGER.warning(cache_msg)
+                return previous_data
+            raise UpdateFailed(self._translations.get(
+                "debug.network_error_user",
+                "Network error: {error}"
+            ).format(error=err))
+        except Exception as err:
+            error_msg = self._translations.get(
+                "debug.unexpected_error",
+                "Unexpected error fetching {price_type} data: {error}"
+            ).format(price_type=self.price_type, error=str(err))
+            _LOGGER.exception(error_msg)
+            if previous_data:
+                cache_msg = self._translations.get(
+                    "debug.using_cache",
+                    "Using cached data from previous update due to API failure"
+                )
+                _LOGGER.warning(cache_msg)
+                return previous_data
+            raise UpdateFailed(self._translations.get(
+                "debug.unexpected_error_user",
+                "Error: {error}"
+            ).format(error=err))
 
     def schedule_hourly_update(self):
         """Schedule next refresh 1 min after each full hour."""
@@ -452,10 +538,51 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             self.hass, self._handle_hourly_update, dt_util.as_utc(next_run)
         )
 
+    async def _check_tomorrow_prices_after_15(self):
+        """Check if tomorrow prices are available and publish to MQTT if needed.
+
+        This runs after 15:00 until midnight in 48h mode.
+        """
+        if not self.mqtt_48h_mode:
+            return
+
+        now = dt_util.now()
+        # Only run between 15:00 and 23:59
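+        # (Day-ahead prices are typically published in the afternoon; assumed rationale for the 15:00 gate)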
+        if now.hour < 15:
+            return
+
+        tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")
+
+        # Check if we already have valid tomorrow data
+        if self.data and self.data.get("prices"):
+            tomorrow_prices = [p for p in self.data.get("prices", []) if p.get("start", "").startswith(tomorrow)]
+            # Check if tomorrow's data is valid (not placeholders)
+            has_valid_tomorrow = (
+                len(tomorrow_prices) >= 20 and
+                not self._is_likely_placeholder_data(tomorrow_prices)
+            )
+            if has_valid_tomorrow:
+                # We already have valid data, nothing to do
+                return
+            else:
+                _LOGGER.info("Missing or invalid tomorrow prices at %s, will refresh data", now.strftime("%H:%M"))
+
+        # If we don't have valid tomorrow data, force a refresh
+        # The refresh will automatically trigger MQTT publish via _check_and_publish_mqtt
+        await self.async_request_refresh()
+
     async def _handle_hourly_update(self, _):
         """Handle hourly update."""
         _LOGGER.debug("Running scheduled hourly update for %s", self.price_type)
+        # First do the regular update
         await self.async_request_refresh()
+        # Then check if we need tomorrow prices (only in 48h mode after 15:00)
+        await self._check_tomorrow_prices_after_15()
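+        # Note: if tomorrow's prices are still missing or look like placeholders,
+        # the check above requests one additional refresh within the same hour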
 
         # Schedule next hourly update
         self.schedule_hourly_update()
 
     def schedule_midnight_update(self):