Update update_coordinator.py
@@ -2,106 +2,22 @@
 import logging
 from datetime import timedelta
 import asyncio
-import aiohttp
-import async_timeout
 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
 from homeassistant.helpers.event import async_track_point_in_time
 from homeassistant.util import dt as dt_util
 from homeassistant.helpers.translation import async_get_translations
 from .const import (
     API_URL,
-    API_TIMEOUT,
     BUY_ENDPOINT,
     SELL_ENDPOINT,
     DOMAIN,
-    CONF_MQTT_48H_MODE,
-    CONF_RETRY_ATTEMPTS,
-    CONF_RETRY_DELAY,
     DEFAULT_RETRY_ATTEMPTS,
     DEFAULT_RETRY_DELAY
 )
+from .api_client import PstrykAPIClient

 _LOGGER = logging.getLogger(__name__)

-class ExponentialBackoffRetry:
-    """Exponential backoff implementation for retry attempts."""
-
-    def __init__(self, max_retries=DEFAULT_RETRY_ATTEMPTS, base_delay=DEFAULT_RETRY_DELAY):
-        """Initialize the retry mechanism.
-
-        Args:
-            max_retries: Maximum number of attempts
-            base_delay: Base delay in seconds (increased exponentially)
-        """
-        self.max_retries = max_retries
-        self.base_delay = base_delay
-        self._translations = {}
-
-    async def load_translations(self, hass):
-        """Load translations for the current language."""
-        try:
-            self._translations = await async_get_translations(
-                hass, hass.config.language, DOMAIN, ["debug"]
-            )
-        except Exception as ex:
-            _LOGGER.warning("Failed to load translations for retry mechanism: %s", ex)
-
-    async def execute(self, func, *args, price_type=None, **kwargs):
-        """Execute a function with retries.
-
-        Args:
-            func: Async function to execute
-            args, kwargs: Function arguments
-            price_type: Price type (for logging)
-
-        Returns:
-            The function result
-
-        Raises:
-            UpdateFailed: After all attempts have been exhausted
-        """
-        last_exception = None
-        for retry in range(self.max_retries):
-            try:
-                return await func(*args, **kwargs)
-            except Exception as err:
-                last_exception = err
-                # Do not wait after the last attempt
-                if retry < self.max_retries - 1:
-                    delay = self.base_delay * (2 ** retry)
-
-                    # Use the translated message if available
-                    retry_msg = self._translations.get(
-                        "debug.retry_attempt",
-                        "Retry {retry}/{max_retries} after error: {error} (delay: {delay}s)"
-                    ).format(
-                        retry=retry + 1,
-                        max_retries=self.max_retries,
-                        error=str(err),
-                        delay=round(delay, 1)
-                    )
-
-                    _LOGGER.debug(retry_msg)
-                    await asyncio.sleep(delay)
-
-        # If all attempts failed and we hit a timeout
-        if isinstance(last_exception, asyncio.TimeoutError) and price_type:
-            timeout_msg = self._translations.get(
-                "debug.timeout_after_retries",
-                "Timeout fetching {price_type} data from API after {retries} retries"
-            ).format(price_type=price_type, retries=self.max_retries)
-
-            _LOGGER.error(timeout_msg)
-
-            api_timeout_msg = self._translations.get(
-                "debug.api_timeout_message",
-                "API timeout after {timeout} seconds (tried {retries} times)"
-            ).format(timeout=API_TIMEOUT, retries=self.max_retries)
-
-            raise UpdateFailed(api_timeout_msg)
-
-        # For other error types
-        raise last_exception

 def convert_price(value):
     """Convert price string to float."""
@@ -111,174 +27,60 @@ def convert_price(value):
         _LOGGER.warning("Price conversion error: %s", e)
         return None


 class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
     """Coordinator to fetch both current price and today's table."""

-    def __del__(self):
-        """Properly clean up when object is deleted."""
-        if hasattr(self, '_unsub_hourly') and self._unsub_hourly:
-            self._unsub_hourly()
-        if hasattr(self, '_unsub_midnight') and self._unsub_midnight:
-            self._unsub_midnight()
-        if hasattr(self, '_unsub_afternoon') and self._unsub_afternoon:
-            self._unsub_afternoon()
-
-    def __init__(self, hass, api_key, price_type, mqtt_48h_mode=False, retry_attempts=None, retry_delay=None):
+    def __init__(self, hass, api_client: PstrykAPIClient, price_type, mqtt_48h_mode=False, retry_attempts=None, retry_delay=None):
         """Initialize the coordinator."""
         self.hass = hass
-        self.api_key = api_key
+        self.api_client = api_client
         self.price_type = price_type
         self.mqtt_48h_mode = mqtt_48h_mode
         self._unsub_hourly = None
         self._unsub_midnight = None
         self._unsub_afternoon = None
-        # Initialize translations
         self._translations = {}
-        # Track if we had tomorrow prices in last update
         self._had_tomorrow_prices = False

-        # Get retry configuration from entry options
-        if retry_attempts is None or retry_delay is None:
-            # Try to find the config entry to get retry options
-            for entry in hass.config_entries.async_entries(DOMAIN):
-                if entry.data.get("api_key") == api_key:
-                    retry_attempts = entry.options.get(CONF_RETRY_ATTEMPTS, DEFAULT_RETRY_ATTEMPTS)
-                    retry_delay = entry.options.get(CONF_RETRY_DELAY, DEFAULT_RETRY_DELAY)
-                    break
-            else:
-                # Use defaults if no matching entry found
-                retry_attempts = DEFAULT_RETRY_ATTEMPTS
-                retry_delay = DEFAULT_RETRY_DELAY
+        # Get retry configuration
+        if retry_attempts is None:
+            retry_attempts = DEFAULT_RETRY_ATTEMPTS
+        if retry_delay is None:
+            retry_delay = DEFAULT_RETRY_DELAY

-        # Initialize the retry mechanism with configurable values
-        self.retry_mechanism = ExponentialBackoffRetry(max_retries=retry_attempts, base_delay=retry_delay)
+        self.retry_attempts = retry_attempts
+        self.retry_delay = retry_delay

-        # Set a default update interval as a fallback (1 hour)
-        # This ensures data is refreshed even if scheduled updates fail
+        # Set update interval as fallback
         update_interval = timedelta(hours=1)

         super().__init__(
             hass,
             _LOGGER,
             name=f"{DOMAIN}_{price_type}",
-            update_interval=update_interval,  # Add fallback interval
+            update_interval=update_interval,
         )

-    async def _make_api_request(self, url):
-        """Make API request with proper error handling."""
-        async with aiohttp.ClientSession() as session:
-            async with async_timeout.timeout(API_TIMEOUT):
-                resp = await session.get(
-                    url,
-                    headers={"Authorization": self.api_key, "Accept": "application/json"}
-                )
-
-                # Handle the different error codes
-                if resp.status == 401:
-                    error_msg = self._translations.get(
-                        "debug.api_error_401",
-                        "API authentication failed for {price_type} - invalid API key"
-                    ).format(price_type=self.price_type)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_401_user",
-                        "API authentication failed - invalid API key"
-                    ))
-                elif resp.status == 403:
-                    error_msg = self._translations.get(
-                        "debug.api_error_403",
-                        "API access forbidden for {price_type} - permissions issue"
-                    ).format(price_type=self.price_type)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_403_user",
-                        "API access forbidden - check permissions"
-                    ))
-                elif resp.status == 404:
-                    error_msg = self._translations.get(
-                        "debug.api_error_404",
-                        "API endpoint not found for {price_type} - check URL"
-                    ).format(price_type=self.price_type)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_404_user",
-                        "API endpoint not found"
-                    ))
-                elif resp.status == 429:
-                    error_msg = self._translations.get(
-                        "debug.api_error_429",
-                        "API rate limit exceeded for {price_type}"
-                    ).format(price_type=self.price_type)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_429_user",
-                        "API rate limit exceeded - try again later"
-                    ))
-                elif resp.status == 502:
-                    error_msg = self._translations.get(
-                        "debug.api_error_502",
-                        "API Gateway error (502) for {price_type} - server may be down"
-                    ).format(price_type=self.price_type)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_502_user",
-                        "API Gateway error (502) - server may be down"
-                    ))
-                elif 500 <= resp.status < 600:
-                    error_msg = self._translations.get(
-                        "debug.api_error_5xx",
-                        "API server error ({status}) for {price_type} - server issue"
-                    ).format(status=resp.status, price_type=self.price_type)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_5xx_user",
-                        "API server error ({status}) - server issue"
-                    ).format(status=resp.status))
-                elif resp.status != 200:
-                    error_text = await resp.text()
-                    # Show only the first 50 characters of the error to keep the log short
-                    short_error = error_text[:50] + ("..." if len(error_text) > 50 else "")
-                    error_msg = self._translations.get(
-                        "debug.api_error_generic",
-                        "API error {status} for {price_type}: {error}"
-                    ).format(status=resp.status, price_type=self.price_type, error=short_error)
-                    _LOGGER.error(error_msg)
-                    raise UpdateFailed(self._translations.get(
-                        "debug.api_error_generic_user",
-                        "API error {status}: {error}"
-                    ).format(status=resp.status, error=short_error))
-
-                return await resp.json()
-
     def _is_likely_placeholder_data(self, prices_for_day):
-        """Check if prices for a day are likely placeholders.
-
-        Returns True if:
-        - There are no prices
-        - ALL prices have exactly the same value (suggesting API returned default values)
-        - There are too many consecutive hours with the same value (e.g., 10+ hours)
-        """
+        """Check if prices for a day are likely placeholders."""
         if not prices_for_day:
             return True

-        # Get all price values
         price_values = [p.get("price") for p in prices_for_day if p.get("price") is not None]

         if not price_values:
             return True

-        # If we have less than 20 prices for a day, it's incomplete data
         if len(price_values) < 20:
             _LOGGER.debug(f"Only {len(price_values)} prices for the day, likely incomplete data")
             return True

-        # Check if ALL values are identical
         unique_values = set(price_values)
         if len(unique_values) == 1:
             _LOGGER.debug(f"All {len(price_values)} prices have the same value ({price_values[0]}), likely placeholders")
             return True

-        # Additional check: if more than 90% of values are the same, it's suspicious
         most_common_value = max(set(price_values), key=price_values.count)
         count_most_common = price_values.count(most_common_value)
         if count_most_common / len(price_values) > 0.9:
@@ -287,30 +89,6 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):

         return False

-    def _count_consecutive_same_values(self, prices):
-        """Count maximum consecutive hours with the same price."""
-        if not prices:
-            return 0
-
-        # Sort by time to ensure consecutive checking
-        sorted_prices = sorted(prices, key=lambda x: x.get("start", ""))
-
-        max_consecutive = 1
-        current_consecutive = 1
-        last_value = None
-
-        for price in sorted_prices:
-            value = price.get("price")
-            if value is not None:
-                if value == last_value:
-                    current_consecutive += 1
-                    max_consecutive = max(max_consecutive, current_consecutive)
-                else:
-                    current_consecutive = 1
-                last_value = value
-
-        return max_consecutive
-
     async def _check_and_publish_mqtt(self, new_data):
         """Check if we should publish to MQTT after update."""
         if not self.mqtt_48h_mode:
@@ -319,58 +97,39 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
         now = dt_util.now()
         tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")

-        # Check if tomorrow prices are available in new data
         all_prices = new_data.get("prices", [])
         tomorrow_prices = [p for p in all_prices if p["start"].startswith(tomorrow)]

-        # Check if tomorrow's data is valid (not placeholders)
         has_valid_tomorrow_prices = (
             len(tomorrow_prices) >= 20 and
             not self._is_likely_placeholder_data(tomorrow_prices)
         )

-        # Log what we found for debugging
-        if tomorrow_prices:
-            unique_values = set(p.get("price") for p in tomorrow_prices if p.get("price") is not None)
-            consecutive = self._count_consecutive_same_values(tomorrow_prices)
-            _LOGGER.debug(
-                f"Tomorrow has {len(tomorrow_prices)} prices, "
-                f"{len(unique_values)} unique values, "
-                f"max {consecutive} consecutive same values, "
-                f"valid: {has_valid_tomorrow_prices}"
-            )
-
-        # If we didn't have valid tomorrow prices before, but now we do, publish to MQTT immediately
         if not self._had_tomorrow_prices and has_valid_tomorrow_prices:
             _LOGGER.info("Valid tomorrow prices detected for %s, triggering immediate MQTT publish", self.price_type)

             # Find our config entry
             entry_id = None
             for entry in self.hass.config_entries.async_entries(DOMAIN):
-                if entry.data.get("api_key") == self.api_key:
+                if self.api_client.api_key == entry.data.get("api_key"):
                     entry_id = entry.entry_id
                     break

             if entry_id:
-                # Check if both coordinators are initialized before publishing
                 buy_coordinator = self.hass.data[DOMAIN].get(f"{entry_id}_buy")
                 sell_coordinator = self.hass.data[DOMAIN].get(f"{entry_id}_sell")

                 if not buy_coordinator or not sell_coordinator:
                     _LOGGER.debug("Coordinators not yet initialized, skipping MQTT publish for now")
-                    # Don't update _had_tomorrow_prices so we'll try again on next update
                     return

-                # Get MQTT topics from config
                 from .const import CONF_MQTT_TOPIC_BUY, CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_SELL
                 entry = self.hass.config_entries.async_get_entry(entry_id)
                 mqtt_topic_buy = entry.options.get(CONF_MQTT_TOPIC_BUY, DEFAULT_MQTT_TOPIC_BUY)
                 mqtt_topic_sell = entry.options.get(CONF_MQTT_TOPIC_SELL, DEFAULT_MQTT_TOPIC_SELL)

-                # Wait a moment for both coordinators to update
                 await asyncio.sleep(5)

-                # Publish to MQTT
                 from .mqtt_common import publish_mqtt_prices
                 success = await publish_mqtt_prices(self.hass, entry_id, mqtt_topic_buy, mqtt_topic_sell)
@@ -379,146 +138,116 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
                 else:
                     _LOGGER.error("Failed to publish to MQTT after detecting tomorrow prices")

-        # Update state for next check
         self._had_tomorrow_prices = has_valid_tomorrow_prices

     async def _async_update_data(self):
         """Fetch 48h of frames and extract current + today's list."""
         _LOGGER.debug("Starting %s price update (48h mode: %s)", self.price_type, self.mqtt_48h_mode)

-        # Store the previous data for fallback
         previous_data = None
         if hasattr(self, 'data') and self.data:
             previous_data = self.data.copy() if self.data else None
             if previous_data:
                 previous_data["is_cached"] = True

-        # Get today's start in LOCAL time, then convert to UTC
-        # This ensures we get the correct "today" for the user's timezone
         today_local = dt_util.now().replace(hour=0, minute=0, second=0, microsecond=0)
         window_end_local = today_local + timedelta(days=2)

-        # Convert to UTC for API request
         start_utc = dt_util.as_utc(today_local)
         end_utc = dt_util.as_utc(window_end_local)

         start_str = start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
         end_str = end_utc.strftime("%Y-%m-%dT%H:%M:%SZ")

         endpoint_tpl = BUY_ENDPOINT if self.price_type == "buy" else SELL_ENDPOINT
         endpoint = endpoint_tpl.format(start=start_str, end=end_str)
         url = f"{API_URL}{endpoint}"

         _LOGGER.debug("Requesting %s data from %s", self.price_type, url)

-        # Get current time in UTC for price comparison
         now_utc = dt_util.utcnow()

         try:
             # Load translations
-            await self.retry_mechanism.load_translations(self.hass)
-
             try:
                 self._translations = await async_get_translations(
                     self.hass, self.hass.config.language, DOMAIN, ["debug"]
                 )
             except Exception as ex:
                 _LOGGER.warning("Failed to load translations for coordinator: %s", ex)

-            # Use retry mechanism
-            data = await self.retry_mechanism.execute(
-                self._make_api_request,
+            # Use shared API client
+            data = await self.api_client.fetch(
                 url,
-                price_type=self.price_type
+                max_retries=self.retry_attempts,
+                base_delay=self.retry_delay
             )

             frames = data.get("frames", [])
             if not frames:
                 _LOGGER.warning("No frames returned for %s prices", self.price_type)

             prices = []
             current_price = None

             for f in frames:
                 val = convert_price(f.get("price_gross"))
                 if val is None:
                     continue

                 start = dt_util.parse_datetime(f["start"])
                 end = dt_util.parse_datetime(f["end"])

                 if not start or not end:
                     _LOGGER.warning("Invalid datetime format in frames for %s", self.price_type)
                     continue

-                # Convert to local time for display (we need this for hourly data)
                 local_start = dt_util.as_local(start).strftime("%Y-%m-%dT%H:%M:%S")
                 prices.append({"start": local_start, "price": val})

-                # Check if this is the current price
                 if start <= now_utc < end:
                     current_price = val

-            # Filter today's prices
             today_local = dt_util.now().strftime("%Y-%m-%d")
             prices_today = [p for p in prices if p["start"].startswith(today_local)]

             _LOGGER.debug("Successfully fetched %s price data: current=%s, today_prices=%d, total_prices=%d",
                 self.price_type, current_price, len(prices_today), len(prices))

             new_data = {
                 "prices_today": prices_today,
                 "prices": prices,
                 "current": current_price,
                 "is_cached": False,
             }

-            # Check if we should publish to MQTT
             if self.mqtt_48h_mode:
                 await self._check_and_publish_mqtt(new_data)

             return new_data

-        except aiohttp.ClientError as err:
-            error_msg = self._translations.get(
-                "debug.network_error",
-                "Network error fetching {price_type} data: {error}"
-            ).format(price_type=self.price_type, error=str(err))
-            _LOGGER.error(error_msg)
-
-            if previous_data:
-                cache_msg = self._translations.get(
-                    "debug.using_cache",
-                    "Using cached data from previous update due to API failure"
-                )
-                _LOGGER.warning(cache_msg)
-                return previous_data
-
-            raise UpdateFailed(self._translations.get(
-                "debug.network_error_user",
-                "Network error: {error}"
-            ).format(error=err))
+        except UpdateFailed:
+            # UpdateFailed already has proper error message from API client
+            if previous_data:
+                _LOGGER.warning("Using cached data from previous update due to API failure")
+                return previous_data
+            raise

         except Exception as err:
             error_msg = self._translations.get(
                 "debug.unexpected_error",
                 "Unexpected error fetching {price_type} data: {error}"
             ).format(price_type=self.price_type, error=str(err))
             _LOGGER.exception(error_msg)

             if previous_data:
-                cache_msg = self._translations.get(
-                    "debug.using_cache",
-                    "Using cached data from previous update due to API failure"
-                )
-                _LOGGER.warning(cache_msg)
+                _LOGGER.warning("Using cached data from previous update due to API failure")
                 return previous_data

             raise UpdateFailed(self._translations.get(
                 "debug.unexpected_error_user",
                 "Error: {error}"
             ).format(error=err))

     def schedule_hourly_update(self):
         """Schedule next refresh 1 min after each full hour."""
@@ -527,7 +256,6 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             self._unsub_hourly = None

         now = dt_util.now()
-        # Keep original timing: 1 minute past the hour
         next_run = (now.replace(minute=0, second=0, microsecond=0)
                     + timedelta(hours=1, minutes=1))
@ -539,50 +267,37 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def _check_tomorrow_prices_after_15(self):
|
async def _check_tomorrow_prices_after_15(self):
|
||||||
"""Check if tomorrow prices are available and publish to MQTT if needed.
|
"""Check if tomorrow prices are available and publish to MQTT if needed."""
|
||||||
This runs after 15:00 until midnight in 48h mode.
|
|
||||||
"""
|
|
||||||
if not self.mqtt_48h_mode:
|
if not self.mqtt_48h_mode:
|
||||||
return
|
return
|
||||||
|
|
||||||
now = dt_util.now()
|
now = dt_util.now()
|
||||||
# Only run between 15:00 and 23:59
|
|
||||||
if now.hour < 15:
|
if now.hour < 15:
|
||||||
return
|
return
|
||||||
|
|
||||||
tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")
|
tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
# Check if we already have valid tomorrow data
|
|
||||||
if self.data and self.data.get("prices"):
|
if self.data and self.data.get("prices"):
|
||||||
tomorrow_prices = [p for p in self.data.get("prices", []) if p.get("start", "").startswith(tomorrow)]
|
tomorrow_prices = [p for p in self.data.get("prices", []) if p.get("start", "").startswith(tomorrow)]
|
||||||
|
|
||||||
# Check if tomorrow's data is valid (not placeholders)
|
|
||||||
has_valid_tomorrow = (
|
has_valid_tomorrow = (
|
||||||
len(tomorrow_prices) >= 20 and
|
len(tomorrow_prices) >= 20 and
|
||||||
not self._is_likely_placeholder_data(tomorrow_prices)
|
not self._is_likely_placeholder_data(tomorrow_prices)
|
||||||
)
|
)
|
||||||
|
|
||||||
if has_valid_tomorrow:
|
if has_valid_tomorrow:
|
||||||
# We already have valid data, nothing to do
|
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
_LOGGER.info("Missing or invalid tomorrow prices at %s, will refresh data", now.strftime("%H:%M"))
|
_LOGGER.info("Missing or invalid tomorrow prices at %s, will refresh data", now.strftime("%H:%M"))
|
||||||
|
|
||||||
# If we don't have valid tomorrow data, force a refresh
|
|
||||||
# The refresh will automatically trigger MQTT publish via _check_and_publish_mqtt
|
|
||||||
await self.async_request_refresh()
|
await self.async_request_refresh()
|
||||||
|
|
||||||
async def _handle_hourly_update(self, _):
|
async def _handle_hourly_update(self, _):
|
||||||
"""Handle hourly update."""
|
"""Handle hourly update."""
|
||||||
_LOGGER.debug("Running scheduled hourly update for %s", self.price_type)
|
_LOGGER.debug("Running scheduled hourly update for %s", self.price_type)
|
||||||
|
|
||||||
# First do the regular update
|
|
||||||
await self.async_request_refresh()
|
await self.async_request_refresh()
|
||||||
|
|
||||||
# Then check if we need tomorrow prices (only in 48h mode after 15:00)
|
|
||||||
await self._check_tomorrow_prices_after_15()
|
await self._check_tomorrow_prices_after_15()
|
||||||
|
|
||||||
# Schedule next hourly update
|
|
||||||
self.schedule_hourly_update()
|
self.schedule_hourly_update()
|
||||||
|
|
||||||
def schedule_midnight_update(self):
|
def schedule_midnight_update(self):
|
||||||
@@ -592,7 +307,6 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             self._unsub_midnight = None

         now = dt_util.now()
-        # Keep original timing: 1 minute past midnight
         next_mid = (now + timedelta(days=1)).replace(hour=0, minute=1, second=0, microsecond=0)

         _LOGGER.debug("Scheduling next midnight update for %s at %s",
@@ -620,13 +334,9 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):

         now = dt_util.now()

-        # Determine next check time
-        # If we're before 14:00, start at 14:00
         if now.hour < 14:
             next_check = now.replace(hour=14, minute=0, second=0, microsecond=0)
-        # If we're between 14:00-15:00, find next 15-minute slot
         elif now.hour == 14:
-            # Calculate minutes to next 15-minute mark
             current_minutes = now.minute
             if current_minutes < 15:
                 next_minutes = 15
@@ -635,19 +345,15 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
             elif current_minutes < 45:
                 next_minutes = 45
             else:
-                # Move to 15:00
                 next_check = now.replace(hour=15, minute=0, second=0, microsecond=0)
                 next_minutes = None

             if next_minutes is not None:
                 next_check = now.replace(minute=next_minutes, second=0, microsecond=0)
-        # If we're at 15:00 or later, schedule for tomorrow 14:00
         else:
             next_check = (now + timedelta(days=1)).replace(hour=14, minute=0, second=0, microsecond=0)

-        # Make sure next_check is in the future
         if next_check <= now:
-            # This shouldn't happen, but just in case
             next_check = next_check + timedelta(minutes=15)

         _LOGGER.info("Scheduling afternoon update check for %s at %s (48h mode, checking every 15min between 14:00-15:00)",
@@ -663,25 +369,10 @@ class PstrykDataUpdateCoordinator(DataUpdateCoordinator):
         _LOGGER.debug("Running scheduled afternoon update check for %s at %s",
             self.price_type, now.strftime("%H:%M"))

-        # Perform the update
         await self.async_request_refresh()

-        # Schedule next check only if we're still in the 14:00-15:00 window
         if now.hour < 15:
             self.schedule_afternoon_update()
         else:
-            # We've finished the afternoon window, schedule for tomorrow
             _LOGGER.info("Finished afternoon update window for %s, next cycle tomorrow", self.price_type)
             self.schedule_afternoon_update()
-
-    def unschedule_all_updates(self):
-        """Unschedule all updates."""
-        if self._unsub_hourly:
-            self._unsub_hourly()
-            self._unsub_hourly = None
-        if self._unsub_midnight:
-            self._unsub_midnight()
-            self._unsub_midnight = None
-        if self._unsub_afternoon:
-            self._unsub_afternoon()
-            self._unsub_afternoon = None
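Reviewer note: the retry and backoff behaviour removed above does not disappear. The coordinator now forwards max_retries / base_delay straight to the shared PstrykAPIClient.fetch() call in _async_update_data and simply re-raises the UpdateFailed the client produces. The client itself lives in api_client.py and is not part of this diff, so the helper below only restates, as a standalone sketch, the schedule the removed ExponentialBackoffRetry class used (base_delay * 2 ** attempt seconds between attempts); the function name and default values here are illustrative, not the client's actual API.

    import asyncio

    async def retry_with_backoff(func, *args, max_retries=4, base_delay=2.0):
        """Retry an async callable with exponential backoff (sketch of the removed logic)."""
        last_exc = None
        for attempt in range(max_retries):
            try:
                return await func(*args)
            except Exception as exc:  # the removed class also caught broad exceptions
                last_exc = exc
                if attempt < max_retries - 1:
                    # waits 2.0 s, 4.0 s, 8.0 s between attempts with the defaults above
                    await asyncio.sleep(base_delay * (2 ** attempt))
        raise last_exc

Keeping that loop in one shared client lets the per-status UpdateFailed messages be raised once in api_client.py and merely re-raised here via the new "except UpdateFailed: ... raise" branch.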