Files
ha_pstryk/custom_components/sensor.py
2025-04-27 23:04:33 +02:00

223 lines
8.0 KiB
Python

import logging
from datetime import timedelta
from homeassistant.components.sensor import SensorEntity
from homeassistant.helpers.event import async_track_point_in_time
from homeassistant.util import dt as dt_util
import aiohttp
import async_timeout
import asyncio
from .const import API_URL, DOMAIN
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=30)
def convert_price(value):
try:
return float(str(value).replace(",", ".").strip())
except (ValueError, TypeError) as e:
_LOGGER.warning("Price conversion error: %s", str(e))
return None
async def async_setup_entry(hass, config_entry, async_add_entities):
config = config_entry.data
sensors = [
PstrykCurrentPriceSensor(config["api_key"], "buy"),
PstrykCurrentPriceSensor(config["api_key"], "sell"),
PstrykPriceTableSensor(config["api_key"], "buy", config.get("buy_top", 5)),
PstrykPriceTableSensor(config["api_key"], "sell", config.get("sell_top", 5))
]
async_add_entities(sensors, True)
class PstrykCurrentPriceSensor(SensorEntity):
def __init__(self, api_key, price_type):
self._api_key = api_key
self._price_type = price_type
self._state = None
self._available = True
self._unsub_update = None
@property
def name(self): return f"Pstryk Current {self._price_type.title()} Price"
@property
def unique_id(self): return f"pstryk_current_{self._price_type}_price"
@property
def native_value(self): return self._state
@property
def native_unit_of_measurement(self): return "PLN/kWh"
@property
def available(self): return self._available
async def async_added_to_hass(self):
await super().async_added_to_hass()
self._schedule_next_update()
def _schedule_next_update(self):
if self._unsub_update:
self._unsub_update()
now = dt_util.utcnow()
next_update = (now + timedelta(hours=1)).replace(
minute=1, second=0, microsecond=0
)
self._unsub_update = async_track_point_in_time(
self.hass, self._async_update_task, next_update
)
_LOGGER.debug(
"Następna aktualizacja %s zaplanowana na %s",
self._price_type,
dt_util.as_local(next_update).strftime("%Y-%m-%d %H:%M:%S")
)
async def _async_update_task(self, _):
try:
await self.async_update(no_throttle=True)
self.async_write_ha_state()
finally:
self._schedule_next_update()
async def async_update(self, **kwargs):
try:
today = dt_util.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
window_end = today + timedelta(days=1)
endpoint = "pricing/" if self._price_type == "buy" else "prosumer-pricing/"
url = f"{API_URL}{endpoint}?resolution=hour&window_start={today.strftime('%Y-%m-%dT%H:%M:%SZ')}&window_end={window_end.strftime('%Y-%m-%dT%H:%M:%SZ')}"
async with aiohttp.ClientSession() as session:
response = await session.get(url, headers={"Authorization": self._api_key, "Accept": "application/json"})
data = await response.json()
now = dt_util.utcnow()
current_price = None
for frame in data.get("frames", []):
start = dt_util.parse_datetime(frame["start"])
end = dt_util.parse_datetime(frame["end"])
if start <= now < end:
current_price = convert_price(frame["price_gross"])
break
self._state = current_price
self._available = current_price is not None
_LOGGER.debug(
"Zaktualizowano %s: %s PLN (UTC: %s)",
self._price_type,
self._state,
now.strftime("%Y-%m-%d %H:%M:%S")
)
except Exception as e:
_LOGGER.error("Błąd aktualizacji %s: %s", self._price_type, str(e))
self._state = None
self._available = False
async def async_will_remove_from_hass(self):
if self._unsub_update:
self._unsub_update()
class PstrykPriceTableSensor(SensorEntity):
def __init__(self, api_key, price_type, top_count):
self._api_key = api_key
self._price_type = price_type
self._top_count = top_count
self._state = None
self._attributes = {}
self._available = True
self._unsub_update = None
@property
def name(self): return f"Pstryk {self._price_type.title()} Price Table"
@property
def unique_id(self): return f"pstryk_{self._price_type}_price_table"
@property
def native_value(self): return self._state
@property
def extra_state_attributes(self): return self._attributes
@property
def available(self): return self._available
async def async_added_to_hass(self):
await super().async_added_to_hass()
self._schedule_next_update()
def _schedule_next_update(self):
if self._unsub_update:
self._unsub_update()
now = dt_util.utcnow()
next_update = (now + timedelta(hours=1)).replace(
minute=1, second=0, microsecond=0
)
self._unsub_update = async_track_point_in_time(
self.hass, self._async_update_task, next_update
)
async def _async_update_task(self, _):
try:
await self.async_update(no_throttle=True)
self.async_write_ha_state()
finally:
self._schedule_next_update()
def _convert_time(self, utc_str):
try:
return dt_util.as_local(dt_util.parse_datetime(utc_str)).strftime("%H:%M")
except Exception as e:
_LOGGER.error("Błąd konwersji czasu: %s", str(e))
return "N/A"
async def async_update(self, **kwargs):
try:
today = dt_util.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
window_end = today + timedelta(days=1)
endpoint = "pricing/" if self._price_type == "buy" else "prosumer-pricing/"
url = f"{API_URL}{endpoint}?resolution=hour&window_start={today.strftime('%Y-%m-%dT%H:%M:%SZ')}&window_end={window_end.strftime('%Y-%m-%dT%H:%M:%SZ')}"
async with aiohttp.ClientSession() as session:
response = await session.get(url, headers={"Authorization": self._api_key, "Accept": "application/json"})
data = await response.json()
prices = []
for frame in data.get("frames", []):
price = convert_price(frame.get("price_gross"))
if price is not None:
prices.append({
"start_utc": frame["start"],
"start_local": self._convert_time(frame["start"]),
"price": price
})
sorted_prices = sorted(prices, key=lambda x: x["price"], reverse=(self._price_type == "sell"))
self._state = len(prices)
self._attributes = {
"all_prices": prices,
"best_prices": sorted_prices[:self._top_count],
"top_count": self._top_count,
"last_updated": dt_util.utcnow().isoformat()
}
self._available = True
_LOGGER.debug(
"Zaktualizowano tabelę %s, liczba pozycji: %d",
self._price_type,
len(prices)
)
except Exception as e:
_LOGGER.error("Błąd aktualizacji tabeli %s: %s", self._price_type, str(e))
self._state = None
self._available = False
async def async_will_remove_from_hass(self):
if self._unsub_update:
self._unsub_update()