From b65bd37de027e42c3f1f9d86ea0333a460ae5e3c Mon Sep 17 00:00:00 2001 From: balgerion <133121849+balgerion@users.noreply.github.com> Date: Sun, 27 Apr 2025 23:04:33 +0200 Subject: [PATCH] Create sensor.py --- custom_components/sensor.py | 222 ++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 custom_components/sensor.py diff --git a/custom_components/sensor.py b/custom_components/sensor.py new file mode 100644 index 0000000..25429fd --- /dev/null +++ b/custom_components/sensor.py @@ -0,0 +1,222 @@ +import logging +from datetime import timedelta +from homeassistant.components.sensor import SensorEntity +from homeassistant.helpers.event import async_track_point_in_time +from homeassistant.util import dt as dt_util +import aiohttp +import async_timeout +import asyncio + +from .const import API_URL, DOMAIN + +_LOGGER = logging.getLogger(__name__) + +SCAN_INTERVAL = timedelta(minutes=30) + +def convert_price(value): + try: + return float(str(value).replace(",", ".").strip()) + except (ValueError, TypeError) as e: + _LOGGER.warning("Price conversion error: %s", str(e)) + return None + +async def async_setup_entry(hass, config_entry, async_add_entities): + config = config_entry.data + sensors = [ + PstrykCurrentPriceSensor(config["api_key"], "buy"), + PstrykCurrentPriceSensor(config["api_key"], "sell"), + PstrykPriceTableSensor(config["api_key"], "buy", config.get("buy_top", 5)), + PstrykPriceTableSensor(config["api_key"], "sell", config.get("sell_top", 5)) + ] + async_add_entities(sensors, True) + +class PstrykCurrentPriceSensor(SensorEntity): + def __init__(self, api_key, price_type): + self._api_key = api_key + self._price_type = price_type + self._state = None + self._available = True + self._unsub_update = None + + @property + def name(self): return f"Pstryk Current {self._price_type.title()} Price" + + @property + def unique_id(self): return f"pstryk_current_{self._price_type}_price" + + @property + def native_value(self): return self._state + + @property + def native_unit_of_measurement(self): return "PLN/kWh" + + @property + def available(self): return self._available + + async def async_added_to_hass(self): + await super().async_added_to_hass() + self._schedule_next_update() + + def _schedule_next_update(self): + if self._unsub_update: + self._unsub_update() + + now = dt_util.utcnow() + next_update = (now + timedelta(hours=1)).replace( + minute=1, second=0, microsecond=0 + ) + + self._unsub_update = async_track_point_in_time( + self.hass, self._async_update_task, next_update + ) + _LOGGER.debug( + "Następna aktualizacja %s zaplanowana na %s", + self._price_type, + dt_util.as_local(next_update).strftime("%Y-%m-%d %H:%M:%S") + ) + + async def _async_update_task(self, _): + try: + await self.async_update(no_throttle=True) + self.async_write_ha_state() + finally: + self._schedule_next_update() + + async def async_update(self, **kwargs): + try: + today = dt_util.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + window_end = today + timedelta(days=1) + endpoint = "pricing/" if self._price_type == "buy" else "prosumer-pricing/" + url = f"{API_URL}{endpoint}?resolution=hour&window_start={today.strftime('%Y-%m-%dT%H:%M:%SZ')}&window_end={window_end.strftime('%Y-%m-%dT%H:%M:%SZ')}" + + async with aiohttp.ClientSession() as session: + response = await session.get(url, headers={"Authorization": self._api_key, "Accept": "application/json"}) + data = await response.json() + + now = dt_util.utcnow() + current_price = None + for frame in data.get("frames", []): + start = dt_util.parse_datetime(frame["start"]) + end = dt_util.parse_datetime(frame["end"]) + if start <= now < end: + current_price = convert_price(frame["price_gross"]) + break + + self._state = current_price + self._available = current_price is not None + _LOGGER.debug( + "Zaktualizowano %s: %s PLN (UTC: %s)", + self._price_type, + self._state, + now.strftime("%Y-%m-%d %H:%M:%S") + ) + + except Exception as e: + _LOGGER.error("Błąd aktualizacji %s: %s", self._price_type, str(e)) + self._state = None + self._available = False + + async def async_will_remove_from_hass(self): + if self._unsub_update: + self._unsub_update() + +class PstrykPriceTableSensor(SensorEntity): + def __init__(self, api_key, price_type, top_count): + self._api_key = api_key + self._price_type = price_type + self._top_count = top_count + self._state = None + self._attributes = {} + self._available = True + self._unsub_update = None + + @property + def name(self): return f"Pstryk {self._price_type.title()} Price Table" + + @property + def unique_id(self): return f"pstryk_{self._price_type}_price_table" + + @property + def native_value(self): return self._state + + @property + def extra_state_attributes(self): return self._attributes + + @property + def available(self): return self._available + + async def async_added_to_hass(self): + await super().async_added_to_hass() + self._schedule_next_update() + + def _schedule_next_update(self): + if self._unsub_update: + self._unsub_update() + + now = dt_util.utcnow() + next_update = (now + timedelta(hours=1)).replace( + minute=1, second=0, microsecond=0 + ) + + self._unsub_update = async_track_point_in_time( + self.hass, self._async_update_task, next_update + ) + + async def _async_update_task(self, _): + try: + await self.async_update(no_throttle=True) + self.async_write_ha_state() + finally: + self._schedule_next_update() + + def _convert_time(self, utc_str): + try: + return dt_util.as_local(dt_util.parse_datetime(utc_str)).strftime("%H:%M") + except Exception as e: + _LOGGER.error("Błąd konwersji czasu: %s", str(e)) + return "N/A" + + async def async_update(self, **kwargs): + try: + today = dt_util.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + window_end = today + timedelta(days=1) + endpoint = "pricing/" if self._price_type == "buy" else "prosumer-pricing/" + url = f"{API_URL}{endpoint}?resolution=hour&window_start={today.strftime('%Y-%m-%dT%H:%M:%SZ')}&window_end={window_end.strftime('%Y-%m-%dT%H:%M:%SZ')}" + + async with aiohttp.ClientSession() as session: + response = await session.get(url, headers={"Authorization": self._api_key, "Accept": "application/json"}) + data = await response.json() + + prices = [] + for frame in data.get("frames", []): + price = convert_price(frame.get("price_gross")) + if price is not None: + prices.append({ + "start_utc": frame["start"], + "start_local": self._convert_time(frame["start"]), + "price": price + }) + + sorted_prices = sorted(prices, key=lambda x: x["price"], reverse=(self._price_type == "sell")) + self._state = len(prices) + self._attributes = { + "all_prices": prices, + "best_prices": sorted_prices[:self._top_count], + "top_count": self._top_count, + "last_updated": dt_util.utcnow().isoformat() + } + self._available = True + _LOGGER.debug( + "Zaktualizowano tabelę %s, liczba pozycji: %d", + self._price_type, + len(prices) + ) + + except Exception as e: + _LOGGER.error("Błąd aktualizacji tabeli %s: %s", self._price_type, str(e)) + self._state = None + self._available = False + + async def async_will_remove_from_hass(self): + if self._unsub_update: + self._unsub_update()