Source code for idf_analysis.idf_backend

import math
import warnings
from pathlib import Path
from typing import Literal

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as sps

from .definitions import SERIES, METHOD, PARAM
from .in_out import write_yaml, read_yaml
from .little_helpers import duration_steps_readable, minutes_readable, get_progress_bar
from .parameter_formulas import FORMULA_REGISTER, _Formula, register_formulas_to_yaml, LinearFormula
from .sww_utils import year_delta, guess_freq, rain_events, agg_events


[docs] class IdfParameters: def __init__(self, series_kind: str or Literal['partial', 'annual'] = SERIES.PARTIAL, worksheet: str or Literal['KOSTRA', 'convective_vs_advective', 'ATV-A_121'] = METHOD.KOSTRA, extended_durations=False): self.series_kind = series_kind self._durations = None self._set_default_durations(extended_durations) self.parameters_series = {} # parameters u and w (distribution function) from the event series analysis self.parameters_final = {} # parameters of the distribution function after the regression # {lower duration bound in minutes: {parameter u or w: {'function': function-name}}} if worksheet is not None: self.set_parameter_approaches_from_worksheet(worksheet) self._interims = None # type: ExtremeValueParameters def calc_from_series(self, series): self._interims = ExtremeValueParameters(series, self.durations) self._interims.evaluate(self.series_kind) self.parameters_series[PARAM.U] = np.array(self._interims.u) self.parameters_series[PARAM.W] = np.array(self._interims.w) self._calc_params() def reverse_engineering(self, idf_table, linear_interpolation=False): durations = idf_table.index.values u = idf_table[1].values w_dat = idf_table.sub(u, axis=0) # -- w = [] for dur in durations: dat = w_dat.loc[dur] log_tn_i = np.log(dat.index.values) w.append(sum(dat.values * log_tn_i) / sum(log_tn_i ** 2)) # --- self.durations = durations self.parameters_series[PARAM.U] = np.array(u) self.parameters_series[PARAM.W] = np.array(w) if linear_interpolation: self.clear_parameter_approaches() self.add_parameter_approach(0, 'linear', 'linear') self._calc_params() # from u,w to a_u, a_w, ... else: self._calc_params() @property def durations(self): return np.array(self._durations) @durations.setter def durations(self, durations): self._durations = list(durations) def _set_default_durations(self, extended_durations=False): # Suggestion from ATV-A 121 (ATV-DVWK-Regelwerk 2/2001) # sampling points of the duration steps in minutes duration_steps = [5, 10, 15, 20, 30, 45, 60, 90] # self._duration_steps += [i * 60 for i in [3, 4.5, 6, 7.5, 10, 12, 18]] # duration steps in hours duration_steps += [i * 60 for i in [2, 3, 4, 6, 9, 12, 18]] # duration steps in hours if extended_durations: duration_steps += [i * 60 * 24 for i in [1, 2, 3, 4, 5, 6]] # duration steps in days self.durations = duration_steps def filter_durations(self, freq_minutes): self.limit_duration(lowest=freq_minutes) self.limit_durations_from_freq(freq_minutes) def limit_duration(self, lowest=None, highest=None): # bool_array = self.durations == False bool_array = np.array([False]*self.durations.size) if lowest is not None: bool_array |= self.durations >= lowest if highest is not None: bool_array |= self.durations <= highest self.durations = self.durations[bool_array] for param in PARAM.U_AND_W: if param in self.parameters_series: self.parameters_series[param] = self.parameters_series[param][bool_array] def limit_durations_from_freq(self, freq_minutes): # only multiple of freq # Aus DWA-M 531 Abschnitt 2: # > Da Niederschlagsmesser Tageswerte liefern, sind hier nur Auswertungen für Regendauern möglich, die ein Vielfaches von 24 h betragen. self.durations = self.durations[(self.durations % freq_minutes) == 0]
[docs] def set_parameter_approaches_from_worksheet(self, worksheet): """ Set approaches depending on the duration and the parameter. Args: worksheet (str): worksheet name for the analysis: - 'DWA-A_531' - 'ATV-A_121' - 'DWA-A_531_advektiv' (yet not implemented) Returns: list[dict]: table of approaches depending on the duration and the parameter """ self.parameters_final = read_yaml(Path(__file__).parent / 'approaches' / (worksheet + '.yaml')) if self.parameters_series: self._calc_params()
def add_parameter_approach(self, duration_bound, approach_u, approach_w): self.parameters_final[duration_bound] = { PARAM.U: {PARAM.FUNCTION: approach_u}, PARAM.W: {PARAM.FUNCTION: approach_w}, } def clear_parameter_approaches(self): self.parameters_final = {} def _iter_params(self): # parameters_final has the lower bound duration step as a key # making a zip with lower bound duration step and upper bound duration step # last upper bound duration step is infinity lower_bounds = list(self.parameters_final.keys()) return zip(lower_bounds, lower_bounds[1:] + [np.inf]) def _calc_params(self): """ Calculate parameters a_u, a_w, b_u and b_w and add it to the dict. """ for dur_lower_bound, dur_upper_bound in self._iter_params(): # bool array for this duration range param_part = (self.durations >= dur_lower_bound) & (self.durations <= dur_upper_bound) if param_part.sum() <= 1: # Only one duration step in this duration range. # Only one value available in the series for this regression. del self.parameters_final[dur_lower_bound] continue # {'u': {'function': ...}, 'w': {'function': ...}} params_dur = self.parameters_final[dur_lower_bound] # type: dict[str, (_Formula or dict)] # selected duration steps for the duration range dur = self.durations[param_part] for p in PARAM.U_AND_W: # u or w # array of parameter values u|w values_series = self.parameters_series[p][param_part] # --- # convert dict with approach-string to Formula-object if not isinstance(params_dur[p], _Formula): approach = params_dur[p][PARAM.FUNCTION] if approach not in FORMULA_REGISTER: # if approach is not defined in code - raise Error raise NotImplementedError(f'{approach=}') params_dur[p] = _Formula.from_dict(params_dur[p]) # init object if not params_dur[p].is_fitted: # if fit was not already run params_dur[p].fit(dur, values_series) # --- self._balance_parameter_change() def _balance_parameter_change(self): last = {PARAM.U: None, PARAM.W: None} for dur_lower_bound, dur_upper_bound in self._iter_params(): duration_change = dur_upper_bound for p in PARAM.U_AND_W: # u or w formula_this = self.parameters_final[dur_lower_bound][p] # type: _Formula if dur_upper_bound not in self.parameters_final: # last range if last[p] is None: # last was linear OR only one formula continue formula_this.fit(formula_this.durations, formula_this.values, *last[p]) continue formula_next = self.parameters_final[dur_upper_bound][p] # type: _Formula if isinstance(formula_next, LinearFormula) and isinstance(formula_this, LinearFormula): last[p] = None continue elif isinstance(formula_next, LinearFormula): value_mean = formula_this.get_param(duration_change) elif isinstance(formula_this, LinearFormula): value_mean = formula_next.get_param(duration_change) else: value_mean = (formula_this.get_param(duration_change) + formula_next.get_param(duration_change))/2 if last[p] is None: formula_this.fit(formula_this.durations, formula_this.values, duration_change, value_mean) else: formula_this.fit(formula_this.durations, formula_this.values, [last[p][0], duration_change], [last[p][1], value_mean]) last[p] = (duration_change, value_mean)
[docs] def measured_points(self, return_periods, max_duration=None): """ Get the calculation results of the rainfall with u and w without the estimation of the formula. Args: return_periods (float | np.array | list | pd.Series): return period in [a] max_duration (float): max duration in [min] Returns: pd.Series: series with duration as index and the height of the rainfall as data """ interim_results = pd.DataFrame(index=self.durations) interim_results.index.name = PARAM.DUR interim_results[PARAM.U] = self.parameters_series[PARAM.U] interim_results[PARAM.W] = self.parameters_series[PARAM.W] if max_duration is not None: interim_results = interim_results.loc[:max_duration].copy() return pd.Series(index=interim_results.index, data=interim_results[PARAM.U] + interim_results[PARAM.W] * np.log(return_periods))
def interim_plot_parameters(self): dur = self.durations fig, axes = plt.subplots(2, sharex=True) for ax, label in zip(axes, PARAM.U_AND_W): y = self.parameters_series[label] ax.plot(dur, y, lw=0, marker='.') for lower, upper in self._iter_params(): dur_ = dur[(dur >= lower) & (dur <= upper)] y_pred = self.parameters_final[lower][label].get_param(dur_) # y_pred = self.get_array_param(label, dur) ax.plot(dur_, y_pred, lw=1, label=f'${self.parameters_final[lower][label].latex_formula}$') ax.grid(ls=':', lw=0.5) ax.legend(title='Formula') for d in self.parameters_final: ax.axvline(d, color='black', lw=0.7, ls='--') ax.set_xscale('log', base=math.e) ax.set_xticks(dur) ax.set_xticklabels(duration_steps_readable(dur), rotation=90) ax.set_title(f'Parameter: {label}') return fig def get_duration_section(self, duration, param): for lower, upper in self._iter_params(): if lower <= duration <= upper: return self.parameters_final[lower][param]
[docs] def get_scalar_param(self, p, duration): """ Args: p (str): name of the parameter 'u' or 'w' duration (float | int): in minutes Returns: (float, float): u, w """ param = self.get_duration_section(duration, p) # type: _Formula if param is None: return np.nan return param.get_param(duration)
[docs] def get_array_param(self, p, duration): """ Args: p (str): name of the parameter 'u' or 'w' duration (numpy.ndarray): in minutes Returns: (numpy.ndarray, numpy.ndarray): u, w """ return np.vectorize(lambda d: self.get_scalar_param(p, d))(duration)
[docs] def get_u_w(self, duration): """ calculate the u and w parameters depending on the durations Args: duration (numpy.ndarray| list | float | int): in minutes Returns: (float, float): u and w """ if isinstance(duration, (list, np.ndarray)): func = self.get_array_param else: func = self.get_scalar_param return (func(p, duration) for p in PARAM.U_AND_W)
def to_yaml(self, filename): register_formulas_to_yaml() def to_basic(a): return list([round(float(i), 4) for i in a]) data = { PARAM.SERIES: self.series_kind, PARAM.DUR: to_basic(self.durations), PARAM.PARAMS_SERIES: {p: to_basic(l) for p, l in self.parameters_series.items()}, PARAM.PARAMS_FINAL: self.parameters_final } write_yaml(data, filename) def pprint(self): from pprint import pprint def to_basic(a): return list([round(float(i), 4) for i in a]) pprint({ PARAM.SERIES: self.series_kind, PARAM.DUR: to_basic(self.durations), PARAM.PARAMS_SERIES: {p: to_basic(l) for p, l in self.parameters_series.items()}, PARAM.PARAMS_FINAL: self.parameters_final }) @classmethod def from_yaml(cls, filename, worksheet=None): data = read_yaml(filename) p = cls(series_kind=data[PARAM.SERIES]) p.durations = data[PARAM.DUR] # list to numpy.array p.parameters_series = {p: np.array(l) for p, l in data[PARAM.PARAMS_SERIES].items()} if PARAM.PARAMS_FINAL in data: p.parameters_final = data[PARAM.PARAMS_FINAL] else: p.set_parameter_approaches_from_worksheet(worksheet) p._calc_params() return p
class ExtremeValueParameters: def __init__(self, series, duration_steps): self.series = series self.duration_steps = duration_steps self.base_frequency = guess_freq(self.series.index) # DateOffset/Timedelta self.base_delta = pd.Timedelta(self.base_frequency) # measuring time in years measurement_start, measurement_end = self.series.index[[0, -1]] self.measurement_period = (measurement_end - measurement_start) / year_delta(years=1) if round(self.measurement_period, 1) < 10: warnings.warn("The measurement period is too short. The results may be inaccurate! " "It is recommended to use at least ten years. " f"(-> Currently {self.measurement_period}a used)") # acc. to DWA-A 531 chap. 4.2: # The values must be independent of each other for the statistical evaluations. # estimated four hours acc. (Schilling, 1984) # for larger durations - use the duration as minimal gap self.min_event_gap = pd.Timedelta(hours=4) # Use only the (2-3 multiplied with the number of measuring years) of the biggest # values in the database (-> acc. to ATV-A 121 chap. 4.3; DWA-A 531 chap. 4.4). # As a requirement for the extreme value distribution. self.threshold_sample_size = int(math.floor(self.measurement_period * math.e)) self._data = {} self.u = [] self.w = [] self._intensities = {} @staticmethod def _improve_factor(interval): """ correction factor acc. to DWA-A 531 chap. 4.3 Args: interval (float): length of the interval: number of observations per duration Returns: float: correction factor """ improve_factor = {1: 1.14, 2: 1.07, 3: 1.04, 4: 1.03, 5: 1.00, 6: 1.00} return np.interp(interval, list(improve_factor.keys()), list(improve_factor.values())) @staticmethod def _plotting_formula(k, l, m): """ plotting function acc. to DWA-A 531 chap. 5.1.3 for the partial series Args: k (float or np.ndarray): running index l (float or np.ndarray): sample size m (float or np.ndarray): measurement period Returns: float: estimated empirical return period """ return (l + 0.2) * m / ((k - 0.4) * l) def evaluate(self, series_kind: str or Literal['partial', 'annual'] = SERIES.PARTIAL): """ Statistical analysis for each duration step. acc. to DWA-A 531 chap. 5.1 Save the parameters of the distribution function as interim results. Args: series_kind (str): which kind of series should be used to evaluate the extreme values. """ pbar = get_progress_bar(self.duration_steps, desc='Calculating Parameters u and w') for duration_integer in pbar: try: pbar.set_description(f'Calculating Parameters u and w for duration {duration_integer:0.0f}') except: ... if series_kind == SERIES.ANNUAL: x, y = self.annual_series(duration_integer) elif series_kind == SERIES.PARTIAL: x, y = self.partial_series(duration_integer) else: raise NotImplementedError(f"Unknown series kind {series_kind}") res = sps.linregress(x, y) self.u.append(res.intercept) self.w.append(res.slope) self._data[duration_integer] = {'x': x, 'y': y, 'u': res.intercept, 'w': res.slope, 'series_kind': series_kind} def get_intensities(self, duration_integer: int): if duration_integer not in self._intensities: duration = pd.Timedelta(minutes=duration_integer) if duration < self.base_delta: return if duration < self.min_event_gap: min_gap = self.min_event_gap else: min_gap = duration events = rain_events(self.series, min_gap=min_gap) # Correction factor acc. to DWA-A 531 chap. 4.3 improve = self._improve_factor(duration / self.base_delta) roll_sum = self.series.rolling(duration).sum() events['i'] = agg_events(events, roll_sum, 'max') * improve events['ix'] = agg_events(events, roll_sum, 'idxmax') self._intensities[duration_integer] = events.set_index('ix')['i'] return self._intensities[duration_integer] def annual_series(self, duration_integer: int): """ Create an annual series of the maximum overlapping sum per year and calculate the "u" and "w" parameters. acc. to DWA-A 531 chap. 5.1.5 Gumbel distribution | https://en.wikipedia.org/wiki/Gumbel_distribution Args: duration_integer (int): Duration step in minutes. Returns: dict[str, float]: Parameter u and w from the annual series for a specific duration step as a tuple. """ annually_series = self.get_intensities(duration_integer).resample('YE').max().sort_values(ascending=False).values x = -np.log(np.log((annually_series.size + 0.2) / (annually_series.size - sps.rankdata(annually_series)[::-1] + 0.6))) return x, annually_series def partial_series(self, duration_integer: int): """ Create a partial series of the largest overlapping sums and calculate the "u" and "w" parameters. acc. to DWA-A 531 chap. 5.1.4 Exponential distribution | https://en.wikipedia.org/wiki/Exponential_distribution Args: duration_integer (int): Duration step in minutes. Returns: dict[str, float]: parameter u and w from the partial series for a specific duration step as a tuple """ partially_series = self.get_intensities(duration_integer).sort_values(ascending=False).values if partially_series.size < self.threshold_sample_size: warnings.warn('Fewer events in series than recommended for extreme value analysis. Use the results with mindfulness.') else: partially_series = partially_series[:self.threshold_sample_size] x = np.log(self._plotting_formula(sps.rankdata(partially_series)[::-1], partially_series.size, self.measurement_period)) return x, partially_series def plot_series(self, ncols=3): n_plots = len(self.duration_steps) n_cols = min(ncols, n_plots) n_rows = math.ceil(n_plots / n_cols) fig, axes = plt.subplots(nrows=n_rows, ncols=n_cols) fig.suptitle(f'Interim Results of {self._data[self.duration_steps[0]]["series_kind"]}-series Plot') if n_plots == 1: axes = np.array([axes]) axes = axes.flatten() for ax, duration_integer in zip(axes, self.duration_steps): data = self._data[duration_integer] x = data['x'] # probability y = data['y'] # intensity line = data['u'] + x * data['w'] ax.axhline(data["u"], lw=0.7, color='black') ax.axvline(0, lw=0.7, color='black') ax.plot(x, line, label='Fitted model', color='C1', linewidth=2) ax.scatter(x, y, s=30, label='Observed data', color='C2', alpha=0.7) dur_plot = minutes_readable(duration_integer) ax.set_ylabel(f'Intensity (mm/{dur_plot})') ax.set_xlabel(r'$ln(T_n) = ln(\frac{L + 0.2}{k-0.4}*\frac{M}{L})$') ax.set_title(f'Duration: {dur_plot}\nu={data["u"]:0.2f}, w={data["w"]:0.2f}') ax2 = ax.twiny() xlim = ax.get_xlim() ax2.set_xlim(math.exp(xlim[0]), math.exp(xlim[1])) ax.legend() ax.grid(linestyle='--', alpha=0.7) # Hide empty subplots for ax in axes[n_plots:]: ax.set_visible(False) return fig