Update update_coordinator.py
@@ -8,14 +8,25 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
 from homeassistant.helpers.event import async_track_point_in_time
 from homeassistant.util import dt as dt_util
 from homeassistant.helpers.translation import async_get_translations
-from .const import API_URL, API_TIMEOUT, BUY_ENDPOINT, SELL_ENDPOINT, DOMAIN
+from .const import (
+    API_URL,
+    API_TIMEOUT,
+    BUY_ENDPOINT,
+    SELL_ENDPOINT,
+    DOMAIN,
+    CONF_MQTT_48H_MODE,
+    CONF_RETRY_ATTEMPTS,
+    CONF_RETRY_DELAY,
+    DEFAULT_RETRY_ATTEMPTS,
+    DEFAULT_RETRY_DELAY
+)
 
 _LOGGER = logging.getLogger(__name__)
 
 class ExponentialBackoffRetry:
     """Exponential backoff implementation for retrying failed attempts."""
 
-    def __init__(self, max_retries=3, base_delay=20.0):
+    def __init__(self, max_retries=DEFAULT_RETRY_ATTEMPTS, base_delay=DEFAULT_RETRY_DELAY):
         """Initialize the retry mechanism.
 
         Args:
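The commit moves the retry defaults into constants, but the class body sits mostly outside this hunk. For orientation, a minimal sketch of what an exponential-backoff executor of this shape typically looks like — the `execute` signature, the doubling formula, and the broad exception handling are assumptions, not the committed code:

    import asyncio
    import logging

    _LOGGER = logging.getLogger(__name__)

    class ExponentialBackoffRetry:
        """Sketch: retry an async call, doubling the delay after each failure."""

        def __init__(self, max_retries=3, base_delay=20.0):
            self.max_retries = max_retries
            self.base_delay = base_delay

        async def execute(self, func, *args, **kwargs):
            for attempt in range(self.max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as err:  # the real class is likely more selective
                    if attempt + 1 >= self.max_retries:
                        raise
                    delay = self.base_delay * (2 ** attempt)  # 20s, 40s, 80s with the old defaults
                    _LOGGER.warning("Attempt %d failed (%s); retrying in %.0fs",
                                    attempt + 1, err, delay)
                    await asyncio.sleep(delay)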
@@ -109,16 +120,38 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             self._unsub_hourly()
+        if hasattr(self, '_unsub_midnight') and self._unsub_midnight:
+            self._unsub_midnight()
+        if hasattr(self, '_unsub_afternoon') and self._unsub_afternoon:
+            self._unsub_afternoon()
 
-    def __init__(self, hass, api_key, price_type):
+    def __init__(self, hass, api_key, price_type, mqtt_48h_mode=False, retry_attempts=None, retry_delay=None):
         """Initialize the coordinator."""
         self.hass = hass
         self.api_key = api_key
         self.price_type = price_type
+        self.mqtt_48h_mode = mqtt_48h_mode
         self._unsub_hourly = None
         self._unsub_midnight = None
-        # Initialize the retry mechanism with 3 attempts and a longer delay
-        self.retry_mechanism = ExponentialBackoffRetry(max_retries=3, base_delay=20.0)
+        self._unsub_afternoon = None
         # Initialize translations
         self._translations = {}
+        # Track if we had tomorrow prices in last update
+        self._had_tomorrow_prices = False
+
+        # Get retry configuration from entry options
+        if retry_attempts is None or retry_delay is None:
+            # Try to find the config entry to get retry options
+            for entry in hass.config_entries.async_entries(DOMAIN):
+                if entry.data.get("api_key") == api_key:
+                    retry_attempts = entry.options.get(CONF_RETRY_ATTEMPTS, DEFAULT_RETRY_ATTEMPTS)
+                    retry_delay = entry.options.get(CONF_RETRY_DELAY, DEFAULT_RETRY_DELAY)
+                    break
+            else:
+                # Use defaults if no matching entry found
+                retry_attempts = DEFAULT_RETRY_ATTEMPTS
+                retry_delay = DEFAULT_RETRY_DELAY
+
+        # Initialize the retry mechanism with the configurable values
+        self.retry_mechanism = ExponentialBackoffRetry(max_retries=retry_attempts, base_delay=retry_delay)
 
         # Set a default update interval as a fallback (1 hour)
         # This ensures data is refreshed even if scheduled updates fail
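Note the `for ... else` in the retry lookup above: the `else` branch runs only when the loop finishes without hitting `break`, i.e. when no config entry matched the API key. A standalone illustration of the idiom with hypothetical data, not the integration's real entries:

    entries = [{"api_key": "key-a"}, {"api_key": "key-b"}]

    for entry in entries:
        if entry["api_key"] == "key-c":
            found = entry
            break
    else:
        # No break happened: nothing matched, so fall back to a default
        found = None

    assert found is None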
@@ -142,27 +175,139 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
 
                 # Handle the different error codes
                 if resp.status == 401:
-                    _LOGGER.error("API authentication failed for %s - invalid API key", self.price_type)
-                    raise UpdateFailed("API authentication failed - invalid API key")
+                    error_msg = self._translations.get(
+                        "debug.api_error_401",
+                        "API authentication failed for {price_type} - invalid API key"
+                    ).format(price_type=self.price_type)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_401_user",
+                        "API authentication failed - invalid API key"
+                    ))
                 elif resp.status == 403:
-                    _LOGGER.error("API access forbidden for %s - permissions issue", self.price_type)
-                    raise UpdateFailed("API access forbidden - check permissions")
+                    error_msg = self._translations.get(
+                        "debug.api_error_403",
+                        "API access forbidden for {price_type} - permissions issue"
+                    ).format(price_type=self.price_type)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_403_user",
+                        "API access forbidden - check permissions"
+                    ))
                 elif resp.status == 404:
-                    _LOGGER.error("API endpoint not found for %s - check URL", self.price_type)
-                    raise UpdateFailed("API endpoint not found")
+                    error_msg = self._translations.get(
+                        "debug.api_error_404",
+                        "API endpoint not found for {price_type} - check URL"
+                    ).format(price_type=self.price_type)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_404_user",
+                        "API endpoint not found"
+                    ))
                 elif resp.status == 429:
-                    _LOGGER.error("API rate limit exceeded for %s", self.price_type)
-                    raise UpdateFailed("API rate limit exceeded - try again later")
+                    error_msg = self._translations.get(
+                        "debug.api_error_429",
+                        "API rate limit exceeded for {price_type}"
+                    ).format(price_type=self.price_type)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_429_user",
+                        "API rate limit exceeded - try again later"
+                    ))
+                elif resp.status == 502:
+                    error_msg = self._translations.get(
+                        "debug.api_error_502",
+                        "API Gateway error (502) for {price_type} - server may be down"
+                    ).format(price_type=self.price_type)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_502_user",
+                        "API Gateway error (502) - server may be down"
+                    ))
+                elif 500 <= resp.status < 600:
+                    error_msg = self._translations.get(
+                        "debug.api_error_5xx",
+                        "API server error ({status}) for {price_type} - server issue"
+                    ).format(status=resp.status, price_type=self.price_type)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_5xx_user",
+                        "API server error ({status}) - server issue"
+                    ).format(status=resp.status))
                 elif resp.status != 200:
                     error_text = await resp.text()
-                    _LOGGER.error("API error %s for %s: %s", resp.status, self.price_type, error_text)
-                    raise UpdateFailed(f"API error {resp.status}: {error_text[:100]}")
+                    # Show only the first 50 characters of the error to keep the log short
+                    short_error = error_text[:50] + ("..." if len(error_text) > 50 else "")
+                    error_msg = self._translations.get(
+                        "debug.api_error_generic",
+                        "API error {status} for {price_type}: {error}"
+                    ).format(status=resp.status, price_type=self.price_type, error=short_error)
+                    _LOGGER.error(error_msg)
+                    raise UpdateFailed(self._translations.get(
+                        "debug.api_error_generic_user",
+                        "API error {status}: {error}"
+                    ).format(status=resp.status, error=short_error))
 
                 return await resp.json()
 
+    async def _check_and_publish_mqtt(self, new_data):
+        """Check if we should publish to MQTT after update."""
+        if not self.mqtt_48h_mode:
+            return
+
+        now = dt_util.now()
+        tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")
+
+        # Check if tomorrow prices are available in new data
+        all_prices = new_data.get("prices", [])
+        tomorrow_prices = [p for p in all_prices if p["start"].startswith(tomorrow)]
+        has_tomorrow_prices = len(tomorrow_prices) > 0
+
+        # If we didn't have tomorrow prices before, but now we do, publish to MQTT
+        if not self._had_tomorrow_prices and has_tomorrow_prices:
+            _LOGGER.info("Tomorrow prices detected for %s, triggering MQTT publish", self.price_type)
+
+            # Find our config entry
+            entry_id = None
+            for entry in self.hass.config_entries.async_entries(DOMAIN):
+                if entry.data.get("api_key") == self.api_key:
+                    entry_id = entry.entry_id
+                    break
+
+            if entry_id:
+                # Check if both coordinators are initialized before publishing
+                buy_coordinator = self.hass.data[DOMAIN].get(f"{entry_id}_buy")
+                sell_coordinator = self.hass.data[DOMAIN].get(f"{entry_id}_sell")
+
+                if not buy_coordinator or not sell_coordinator:
+                    _LOGGER.debug("Coordinators not yet initialized, skipping MQTT publish for now")
+                    # Don't update _had_tomorrow_prices so we'll try again on next update
+                    return
+
+                # Get MQTT topics from config
+                from .const import CONF_MQTT_TOPIC_BUY, CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_SELL
+                entry = self.hass.config_entries.async_get_entry(entry_id)
+                mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
+                mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)
+
+                # Wait a moment for both coordinators to update
+                await asyncio.sleep(5)
+
+                # Publish to MQTT
+                from .mqtt_common import publish_mqtt_prices
+                success = await publish_mqtt_prices(self.hass, entry_id, mqtt_topic_buy, mqtt_topic_sell)
+
+                if success:
+                    _LOGGER.info("Successfully published 48h prices to MQTT after detecting tomorrow prices")
+                else:
+                    _LOGGER.error("Failed to publish to MQTT after detecting tomorrow prices")
+
+        # Update state for next check
+        self._had_tomorrow_prices = has_tomorrow_prices
+
     async def _async_update_data(self):
         """Fetch 48h of frames and extract current + today's list."""
-        _LOGGER.debug("Starting %s price update", self.price_type)
+        _LOGGER.debug("Starting %s price update (48h mode: %s)", self.price_type, self.mqtt_48h_mode)
 
         # Store the previous data for fallback
         previous_data = None
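The new _check_and_publish_mqtt is edge-triggered: it publishes only on the transition from "no tomorrow prices" to "tomorrow prices present", tracked through _had_tomorrow_prices. The date test itself reduces to a string-prefix match on each frame's "start" stamp; a self-contained sketch using plain datetime instead of Home Assistant's dt_util, with a made-up sample frame:

    from datetime import datetime, timedelta

    def has_tomorrow_prices(prices: list[dict]) -> bool:
        """True if any price frame starts on tomorrow's local date."""
        tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
        return any(p["start"].startswith(tomorrow) for p in prices)

    # Edge trigger: publish only on the False -> True transition
    had = False
    now_has = has_tomorrow_prices([{"start": "2025-05-02T00:00:00+02:00"}])
    should_publish = not had and now_has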
@@ -187,6 +332,14 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
         # Load translations for the retry mechanism
         await self.retry_mechanism.load_translations(self.hass)
 
+        # Load translations for the coordinator itself
+        try:
+            self._translations = await async_get_translations(
+                self.hass, self.hass.config.language, DOMAIN, ["debug"]
+            )
+        except Exception as ex:
+            _LOGGER.warning("Failed to load translations for coordinator: %s", ex)
+
         # Use the retry mechanism with the price_type parameter
         # We don't need to catch asyncio.TimeoutError here because
         # it is already handled in execute() with the appropriate translations
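The guarded load above is what makes every `self._translations.get(key, default)` call in the error handlers safe: if the load failed (or has not run yet), the dict stays empty and the English default template is used. Roughly, with a hypothetical helper that is not part of the commit:

    def localized(translations: dict, key: str, default: str, **kwargs) -> str:
        """Translated template if available, English default otherwise."""
        return translations.get(key, default).format(**kwargs)

    # With an empty dict (load failed), the English default is used:
    localized({}, "debug.api_error_401",
              "API authentication failed for {price_type}", price_type="buy")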
@@ -225,28 +378,61 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             today_str = today_local.strftime("%Y-%m-%d")
             prices_today = [p for p in prices if p["start"].startswith(today_str)]
 
-            _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d",
-                          self.price_type, current_price, len(prices_today))
+            _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d, total_prices=%d",
+                          self.price_type, current_price, len(prices_today), len(prices))
 
-            return {
+            new_data = {
                 "prices_today": prices_today,
                 "prices": prices,
                 "current": current_price,
                 "is_cached": False,  # Data fetched directly from the API
             }
+
+            # Check if we should publish to MQTT (only for first coordinator that detects new tomorrow prices)
+            if self.mqtt_48h_mode:
+                await self._check_and_publish_mqtt(new_data)
+
+            return new_data
 
         except aiohttp.ClientError as err:
-            _LOGGER.error("Network error fetching %s data: %s", self.price_type, str(err))
+            error_msg = self._translations.get(
+                "debug.network_error",
+                "Network error fetching {price_type} data: {error}"
+            ).format(price_type=self.price_type, error=str(err))
+            _LOGGER.error(error_msg)
 
             if previous_data:
-                _LOGGER.warning("Using cached data from previous update due to API failure")
+                cache_msg = self._translations.get(
+                    "debug.using_cache",
+                    "Using cached data from previous update due to API failure"
+                )
+                _LOGGER.warning(cache_msg)
                 return previous_data
-            raise UpdateFailed(f"Network error: {err}")
+
+            raise UpdateFailed(self._translations.get(
+                "debug.network_error_user",
+                "Network error: {error}"
+            ).format(error=err))
 
         except Exception as err:
-            _LOGGER.exception("Unexpected error fetching %s data: %s", self.price_type, str(err))
+            error_msg = self._translations.get(
+                "debug.unexpected_error",
+                "Unexpected error fetching {price_type} data: {error}"
+            ).format(price_type=self.price_type, error=str(err))
+            _LOGGER.exception(error_msg)
 
             if previous_data:
-                _LOGGER.warning("Using cached data from previous update due to API failure")
+                cache_msg = self._translations.get(
+                    "debug.using_cache",
+                    "Using cached data from previous update due to API failure"
+                )
+                _LOGGER.warning(cache_msg)
                 return previous_data
-            raise UpdateFailed(f"Error: {err}")
+
+            raise UpdateFailed(self._translations.get(
+                "debug.unexpected_error_user",
+                "Error: {error}"
+            ).format(error=err))
 
     def schedule_hourly_update(self):
         """Schedule next refresh 1 min after each full hour."""
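Both except branches above implement the same fallback: log the localized error, serve previous_data if there is one, and only raise UpdateFailed when no cached payload exists. The shape, reduced to a sketch — the function name and the is_cached re-marking are assumptions, not the committed behavior:

    async def fetch_with_fallback(fetch, previous_data):
        """Serve stale data on failure rather than erroring out."""
        try:
            return await fetch()
        except ConnectionError:
            if previous_data is not None:
                # Flag staleness so downstream sensors can expose it
                return {**previous_data, "is_cached": True}
            raise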
@@ -260,7 +446,7 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
                     + timedelta(hours=1, minutes=1))
 
         _LOGGER.debug("Scheduling next hourly update for %s at %s",
-                      self.price_type, next_run.isoformat())
+                      self.price_type, next_run.strftime("%Y-%m-%d %H:%M:%S"))
 
         self._unsub_hourly = async_track_point_in_time(
             self.hass, self._handle_hourly_update, dt_util.as_utc(next_run)
@@ -283,7 +469,7 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
         next_mid = (now + timedelta(days=1)).replace(hour=0, minute=1, second=0, microsecond=0)
 
         _LOGGER.debug("Scheduling next midnight update for %s at %s",
-                      self.price_type, next_mid.isoformat())
+                      self.price_type, next_mid.strftime("%Y-%m-%d %H:%M:%S"))
 
         self._unsub_midnight = async_track_point_in_time(
             self.hass, self._handle_midnight_update, dt_util.as_utc(next_mid)
@@ -294,3 +480,81 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
         _LOGGER.debug("Running scheduled midnight update for %s", self.price_type)
         await self.async_request_refresh()
         self.schedule_midnight_update()
+
+    def schedule_afternoon_update(self):
+        """Schedule frequent updates between 14:00-15:00 for 48h mode."""
+        if self._unsub_afternoon:
+            self._unsub_afternoon()
+            self._unsub_afternoon = None
+
+        if not self.mqtt_48h_mode:
+            _LOGGER.debug("Afternoon updates not scheduled for %s - 48h mode is disabled", self.price_type)
+            return
+
+        now = dt_util.now()
+
+        # Determine next check time
+        # If we're before 14:00, start at 14:00
+        if now.hour < 14:
+            next_check = now.replace(hour=14, minute=0, second=0, microsecond=0)
+        # If we're between 14:00-15:00, find the next 15-minute slot
+        elif now.hour == 14:
+            # Calculate minutes to the next 15-minute mark
+            current_minutes = now.minute
+            if current_minutes < 15:
+                next_minutes = 15
+            elif current_minutes < 30:
+                next_minutes = 30
+            elif current_minutes < 45:
+                next_minutes = 45
+            else:
+                # Move to 15:00
+                next_check = now.replace(hour=15, minute=0, second=0, microsecond=0)
+                next_minutes = None
+
+            if next_minutes is not None:
+                next_check = now.replace(minute=next_minutes, second=0, microsecond=0)
+        # If we're at 15:00 or later, schedule for tomorrow at 14:00
+        else:
+            next_check = (now + timedelta(days=1)).replace(hour=14, minute=0, second=0, microsecond=0)
+
+        # Make sure next_check is in the future
+        if next_check <= now:
+            # This shouldn't happen, but just in case
+            next_check = next_check + timedelta(minutes=15)
+
+        _LOGGER.info("Scheduling afternoon update check for %s at %s (48h mode, checking every 15min between 14:00-15:00)",
+                     self.price_type, next_check.strftime("%Y-%m-%d %H:%M:%S"))
+
+        self._unsub_afternoon = async_track_point_in_time(
+            self.hass, self._handle_afternoon_update, dt_util.as_utc(next_check)
+        )
+
+    async def _handle_afternoon_update(self, _):
+        """Handle afternoon update for 48h mode."""
+        now = dt_util.now()
+        _LOGGER.debug("Running scheduled afternoon update check for %s at %s",
+                      self.price_type, now.strftime("%H:%M"))
+
+        # Perform the update
+        await self.async_request_refresh()
+
+        # Schedule next check only if we're still in the 14:00-15:00 window
+        if now.hour < 15:
+            self.schedule_afternoon_update()
+        else:
+            # We've finished the afternoon window, schedule for tomorrow
+            _LOGGER.info("Finished afternoon update window for %s, next cycle tomorrow", self.price_type)
+            self.schedule_afternoon_update()
+
+    def unschedule_all_updates(self):
+        """Unschedule all updates."""
+        if self._unsub_hourly:
+            self._unsub_hourly()
+            self._unsub_hourly = None
+        if self._unsub_midnight:
+            self._unsub_midnight()
+            self._unsub_midnight = None
+        if self._unsub_afternoon:
+            self._unsub_afternoon()
+            self._unsub_afternoon = None
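The if/elif ladder in schedule_afternoon_update walks to the next quarter-hour mark by hand. For comparison, the same result in one arithmetic step — a sketch, not the committed code:

    from datetime import datetime, timedelta

    def next_quarter_hour(now: datetime) -> datetime:
        """Round up to the next :00 / :15 / :30 / :45 boundary."""
        minutes = (now.minute // 15 + 1) * 15
        return (now.replace(minute=0, second=0, microsecond=0)
                + timedelta(minutes=minutes))

    # e.g. 14:07 -> 14:15, 14:52 -> 15:00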