""" Module provides tools to deal with Windy's point forecast API. """ from dataclasses import dataclass from datetime import datetime from enum import Enum from functools import reduce import numpy as np def _json(value): try: return value.json() except AttributeError: return value def _convert_notation(unit): return unit.replace("-1", "^-1") class _StrEnum(Enum): def __init__(self, value): self._index = len(self.__class__.__members__) def __lt__(self, other): if self.__class__ is other.__class__: return self._index < other._index return NotImplemented def __le__(self, other): if self.__class__ is other.__class__: return self._index <= other._index return NotImplemented def __gt__(self, other): if self.__class__ is other.__class__: return self._index > other._index return NotImplemented def __ge__(self, other): if self.__class__ is other.__class__: return self._index >= other._index return NotImplemented def __str__(self): return self.value def json(self): return self.value class Model(_StrEnum): """ Numerical models available for use with point forecast API. """ AROME = "arome" GEOS5 = "geos5" GFS = "gfs" GFSWAVE = "gfsWave" ICONEU = "iconEu" NAMALASKA = "namAlaska" NAMCONUS = "namConus" NAMHAWAII = "namHawaii" class Level(_StrEnum): """ Selectable levels for some of the input parameters that support them. """ SURFACE = "surface" H1000 = "1000h" H950 = "950h" H925 = "925h" H900 = "900h" H850 = "850h" H800 = "800h" H700 = "700h" H600 = "600h" H500 = "500h" H400 = "400h" H300 = "300h" H200 = "200h" H150 = "150h" def pressure(self): if self is Level.SURFACE: raise ValueError return float(self.value[:-1]) @dataclass class Request: """ Wraps raw JSON request expected by Windy's API. """ key: str lat: float lon: float model: Model parameters: list = None levels: list = None def json(self): body = { 'key': self.key, 'lat': self.lat, 'lon': self.lon, 'model': _json(self.model), 'parameters': self.parameters or [], } if self.levels: body['levels'] = [_json(x) for x in self.levels] return body class Prediction: """ Predicted values for each of the requested parameters along with their associated time point. Effectively a time slice of the entire Response. """ def __init__(self, response, index=0): self._response = response self._index = index @property def timestamp(self) -> datetime: return self._response.timestamps[self._index] @property def parameters(self) -> tuple: return self._response.parameters def at(self, parameter, level): level = self._response.raw_predictions['level'][self._index].index(level) return self._response.raw_predictions[parameter][self._index][level] def __iter__(self): return iter(self.parameters) def __getitem__(self, parameter): return self._response.raw_predictions[parameter][self._index] class Response: """ Parses raw JSON response from Windy's API to allow easier access to prepared pint-based vertical profiles of each of the parameters from the requested forecast scope. Profiles are accessed by the parameter name (each of the self.parameters) and then indexed by the predictions time (respectively to self.timestamps). Wrapper to access parameters of a certain prediction time point is available: >>> for prediction in response: >>> print(prediction.timestamp, prediction['temp']) Otherwise, timestamps list and raw_predicitions structured as described above are available for direct access. """ _INTERNAL_FIELDS = ('ts', 'units', 'warning') def __init__(self, registry, raw): self.timestamps = [datetime.fromtimestamp(x // 1000) for x in raw['ts']] def _levels_per_parameter(): # (raw, self._INTERNAL_FIELDS) levels = {} for key in raw: if key in self._INTERNAL_FIELDS: continue parameter, level = key.split("-") level = Level(level) if parameter in levels: levels[parameter].add(level) else: levels[parameter] = {level} return levels levels = _levels_per_parameter() all_levels = tuple(sorted(reduce(lambda x, y: x | y, levels.values()))) for parameter in levels: levels[parameter] = tuple(sorted(levels[parameter])) parameters = tuple(sorted(levels.keys())) def _parse_unit_map(): # (registry, raw, levels, parameters) return {x: registry(_convert_notation(raw['units'][f'{x}-{levels[x][0]}'])) for x in parameters} units = _parse_unit_map() def _generate_raw_pressure(): # (all_levels, registry, units, self.timestamps) generated = {} for level in all_levels: if level is Level.SURFACE: continue pressure = (level.pressure() * registry.hPa).m_as(units['pressure']) generated[f'pressure-{level}'] = [pressure for _ in range(len(self.timestamps))] return generated def _prepare_pressure_sort(): # (self.timestamps, raw, all_levels) pressure_sort = [] for index in range(len(self.timestamps)): surface = raw[f'pressure-{Level.SURFACE}'][index] new_index = 0 for new_index, level in enumerate(all_levels[1:], start=0): if surface > raw[f'pressure-{level}'][index]: break pressure_sort.append(new_index) return pressure_sort pressure_sort = [] if 'pressure' in parameters: raw.update(_generate_raw_pressure()) levels['pressure'] = all_levels pressure_sort = _prepare_pressure_sort() self.raw_predictions = {} for parameter in parameters: profiles = [] for index in range(len(self.timestamps)): profile = [] for level in levels[parameter]: try: profile.append(raw[f'{parameter}-{level}'][index]) except KeyError: pass if pressure_sort: new_index = pressure_sort[index] profile[0], profile[new_index] = profile[new_index], profile[0] profiles.append(np.array(profile) * units[parameter]) self.raw_predictions[parameter] = profiles levels = [] for index in range(len(self.timestamps)): profile = list(all_levels) if pressure_sort: new_index = pressure_sort[index] profile[0], profile[new_index] = profile[new_index], profile[0] levels.append(profile) self.raw_predictions['level'] = levels def __len__(self): return len(self.timestamps) @property def parameters(self) -> tuple: return tuple(self.raw_predictions.keys()) def predictions(self) -> Prediction: """ Yields Prediction for each time point available in this Response. """ for index in range(len(self.timestamps)): yield Prediction(self, index) def __iter__(self): return self.predictions() @dataclass class PointForecast: """ Represents the point forecast endpoint bound to *path*. Once created it can be called with Request object or with the same arguments that would be used to initialize the Request. The request is made using the passed *ctx*, which is usually a Windy instance. """ path: str def __call__(self, ctx, *args, **kwargs): try: body = args[0].json() except (IndexError, AttributeError): body = Request(*args, **kwargs).json() response = ctx.session.post(ctx.api + self.path, json=body) response.raise_for_status() return Response(ctx.registry, response.json())