Source code for idf_analysis.idf_backend

# Module metadata: authorship, contact, version and license strings.
__author__ = "Markus Pichler"
__credits__ = ["Markus Pichler"]
__maintainer__ = "Markus Pichler"
__email__ = "markus.pichler@tugraz.at"
__version__ = "0.1"
__license__ = "MIT"

from collections import OrderedDict

import pandas as pd
import numpy as np
from pathlib import Path

from .definitions import *
from .event_series_analysis import calculate_u_w, iter_event_series
from .in_out import write_yaml, read_yaml
from .parameter_formulation_class import FORMULATION_REGISTER, _Formulation, register_formulations_to_yaml


class IdfParameters:
    """
    Parameters ``u`` and ``w`` of the rainfall distribution function per duration step.

    The object stores the duration steps, the raw ``u``/``w`` values from the event series
    analysis (``parameters_series``) and the fitted formulations per duration range
    (``parameters_final``), keyed by the lower duration bound of each range.
    """

    def __init__(self, series_kind=SERIES.PARTIAL, worksheet=METHOD.KOSTRA, extended_durations=False):
        """
        Args:
            series_kind (str): kind of the event series (stored and passed on to ``calculate_u_w``).
            worksheet (str): name of the worksheet whose approaches file is loaded.
            extended_durations (bool): also add duration steps of 1 to 6 days.
        """
        self.series_kind = series_kind

        self._durations = None
        self._set_default_durations(extended_durations)

        # parameters u and w (distribution function) from the event series analysis
        self.parameters_series = {}

        # parameters of the distribution function after the regression
        # {lower duration bound in minutes: {parameter u or w: {'function': function-name}}}
        self.parameters_final = {}
        self.set_parameter_approaches_from_worksheet(worksheet)

    def __bool__(self):
        """The object is truthy as soon as series parameters are available."""
        return bool(self.parameters_series)

    def calc_from_series(self, series):
        """
        Calculate u and w for every duration step from a series and fit the formulations.

        Args:
            series: rain series handed over unchanged to ``calculate_u_w``.
        """
        u_w = calculate_u_w(series, self.durations, self.series_kind)
        for p in PARAM.U_AND_W:
            self.parameters_series[p] = np.array([d[p] for d in u_w.values()])
        self._calc_params()

    # TODO: get_event_max_series(series) - collect the rolling sum values per duration
    #  using iter_event_series(series, self.durations).

    def reverse_engineering(self, idf_table, linear_interpolation=False):
        """
        Derive u and w from an existing IDF table.

        ``u`` is taken from column ``1`` of the table. ``w`` is the slope of a least-squares
        line through the origin of ``(value - u)`` over ``ln(column label)`` for each duration.

        Args:
            idf_table (pd.DataFrame): table with the durations as index and a column labelled ``1``;
                the column labels are used as argument of the logarithm.
            linear_interpolation (bool): replace all approaches by a single linear one
                starting at duration 0.
        """
        durations = idf_table.index.values
        u = idf_table[1].values
        w_dat = idf_table.sub(u, axis=0)

        w = []
        for dur in durations:
            dat = w_dat.loc[dur]
            log_tn_i = np.log(dat.index.values)
            # regression through the origin: w = sum(y * x) / sum(x^2)
            w.append(sum(dat.values * log_tn_i) / sum(log_tn_i ** 2))

        self.durations = durations
        self.parameters_series[PARAM.U] = np.array(u)
        self.parameters_series[PARAM.W] = np.array(w)

        if linear_interpolation:
            self.clear_parameter_approaches()
            self.add_parameter_approach(0, 'linear', 'linear')

        self._calc_params()

    @property
    def durations(self):
        """numpy.ndarray: duration steps as array (a new array on every access)."""
        return np.array(self._durations)

    @durations.setter
    def durations(self, durations):
        self._durations = list(durations)

    def _set_default_durations(self, extended_durations=False):
        """
        Set the default sampling points of the duration steps in minutes.

        Args:
            extended_durations (bool): also add duration steps of 1 to 6 days.
        """
        # Suggestion from ATV-A 121 (ATV-DVWK-Regelwerk 2/2001)
        # sampling points of the duration steps in minutes
        duration_steps = [5, 10, 15, 20, 30, 45, 60, 90]
        # duration steps in hours
        duration_steps += [i * 60 for i in [2, 3, 4, 6, 9, 12, 18]]
        if extended_durations:
            # duration steps in days
            duration_steps += [i * 60 * 24 for i in [1, 2, 3, 4, 5, 6]]
        self.durations = duration_steps

    def filter_durations(self, freq_minutes):
        """Drop all duration steps shorter than ``freq_minutes``."""
        self.limit_duration(lowest=freq_minutes)

    def limit_duration(self, lowest=None, highest=None):
        """
        Keep only the duration steps within ``[lowest, highest]`` (both bounds inclusive).

        Already available series parameters are filtered with the same mask.
        Bounds that are ``None`` are not applied; without any bound nothing is removed.

        Args:
            lowest (float | int): lowest duration to keep.
            highest (float | int): highest duration to keep.
        """
        durations = self.durations
        # Start with "keep everything" and narrow down, so both bounds must hold at once.
        keep = np.ones(durations.shape, dtype=bool)
        if lowest is not None:
            keep &= durations >= lowest
        if highest is not None:
            keep &= durations <= highest

        self.durations = durations[keep]

        for param in PARAM.U_AND_W:
            if param in self.parameters_series:
                self.parameters_series[param] = self.parameters_series[param][keep]

    def set_parameter_approaches_from_worksheet(self, worksheet):
        """
        Set approaches depending on the duration and the parameter.

        The approaches are read from ``approaches/<worksheet>.yaml`` next to this module
        and stored in ``parameters_final``. Nothing is returned.

        Args:
            worksheet (str): worksheet name for the analysis:

                - 'DWA-A_531'
                - 'ATV-A_121'
                - 'DWA-A_531_advektiv' (yet not implemented)
        """
        self.parameters_final = read_yaml(Path(__file__).parent / 'approaches' / (worksheet + '.yaml'))

    def add_parameter_approach(self, duration_bound, approach_u, approach_w):
        """
        Add (or overwrite) the approaches of the duration range starting at ``duration_bound``.

        Args:
            duration_bound (float | int): lower duration bound of the range.
            approach_u (str): function name for the parameter u.
            approach_w (str): function name for the parameter w.
        """
        self.parameters_final[duration_bound] = {
            PARAM.U: {PARAM.FUNCTION: approach_u},
            PARAM.W: {PARAM.FUNCTION: approach_w},
        }

    def clear_parameter_approaches(self):
        """Remove all approaches."""
        self.parameters_final = {}

    def _iter_params(self):
        """Iterate over (lower bound, upper bound) of every duration range; the last range is open (inf)."""
        lower_bounds = list(self.parameters_final.keys())
        return zip(lower_bounds, lower_bounds[1:] + [np.inf])

    def _calc_params(self, params_mean=None, duration_mean=None):
        """
        Fit the formulations of u and w for every duration range and store them in ``parameters_final``.

        Without ``params_mean`` and with more than one range, the method calls itself once
        more to balance the transition between the first two ranges.

        Args:
            params_mean (dict[float]): per parameter the value passed as ``param_mean`` to the fit.
            duration_mean (float): duration passed as ``duration_mean`` to the fit.
        """
        # _iter_params() works on a copy of the keys, so deleting entries below is safe.
        for dur_lower_bound, dur_upper_bound in self._iter_params():
            param_part = (self.durations >= dur_lower_bound) & (self.durations <= dur_upper_bound)

            if param_part.sum() == 1:
                # Only one duration step in this duration range.
                # Only one value available in the series for this regression.
                del self.parameters_final[dur_lower_bound]
                continue

            params_dur = self.parameters_final[dur_lower_bound]
            dur = self.durations[param_part]

            for p in PARAM.U_AND_W:  # u or w
                values_series = self.parameters_series[p][param_part]
                param_mean = params_mean[p] if params_mean else None

                if not isinstance(params_dur[p], _Formulation):
                    approach = params_dur[p][PARAM.FUNCTION]
                    if approach not in FORMULATION_REGISTER:
                        raise NotImplementedError(f'{approach=}')
                    params_dur[p] = _Formulation.from_dict(params_dur[p])  # init object

                # fit when not yet fitted, or refit when a balancing value is given
                if params_dur[p].a is None or params_dur[p].b is None or param_mean is not None:
                    params_dur[p].fit(dur, values_series, param_mean=param_mean, duration_mean=duration_mean)

        # ----------------------------
        # balance_parameter_change
        # TODO only between Not linear ranges
        if params_mean is None and (len(self.parameters_final) > 1):
            print('_balance_parameter_change')
            # the balance between the different duration ranges acc. to DWA-A 531 chap. 5.2.4
            duration_step = list(self.parameters_final.keys())[1]
            durations = np.array([duration_step - 0.001, duration_step + 0.001])

            # if the interim results end here
            if any(durations < self.durations.min()) or any(durations > self.durations.max()):
                return

            u, w = self.get_u_w(durations)
            self._calc_params(params_mean={PARAM.U: np.mean(u), PARAM.W: np.mean(w)},
                              duration_mean=duration_step)

    def measured_points(self, return_periods, max_duration=None):
        """
        get the calculation results of the rainfall with u and w without the estimation of the formulation

        Args:
            return_periods (float | np.array | list | pd.Series): return period in [a]
            max_duration (float): max duration in [min]

        Returns:
            pd.Series: series with duration as index and the height of the rainfall as data
        """
        interim_results = pd.DataFrame(index=self.durations)
        interim_results.index.name = PARAM.DUR
        interim_results[PARAM.U] = self.parameters_series[PARAM.U]
        interim_results[PARAM.W] = self.parameters_series[PARAM.W]

        if max_duration is not None:
            interim_results = interim_results.loc[:max_duration].copy()

        return pd.Series(index=interim_results.index,
                         data=interim_results[PARAM.U] + interim_results[PARAM.W] * np.log(return_periods))

    def get_duration_section(self, duration, param):
        """
        Get the entry of ``parameters_final`` for the first range containing ``duration``.

        Args:
            duration (float | int): in minutes
            param (str): name of the parameter 'u' or 'w'

        Returns:
            entry stored for the parameter in that range, or None if no range matches.
        """
        for lower, upper in self._iter_params():
            if lower <= duration <= upper:
                return self.parameters_final[lower][param]
        return None

    def get_scalar_param(self, p, duration):
        """
        Evaluate one parameter for a single duration.

        Args:
            p (str): name of the parameter 'u' or 'w'
            duration (float | int): in minutes

        Returns:
            float: value of the parameter, or NaN if no duration range matches.
        """
        param = self.get_duration_section(duration, p)  # type: _Formulation
        if param is None:
            # np.nan instead of the alias np.NaN, which was removed in NumPy 2.0
            return np.nan
        return param.get_param(duration)

    def get_array_param(self, p, duration):
        """
        Evaluate one parameter element-wise for several durations.

        Args:
            p (str): name of the parameter 'u' or 'w'
            duration (numpy.ndarray): in minutes

        Returns:
            numpy.ndarray: values of the parameter
        """
        return np.vectorize(lambda d: self.get_scalar_param(p, d))(duration)

    def get_u_w(self, duration):
        """
        calculate the u and w parameters depending on the durations

        Args:
            duration (numpy.ndarray| list | float | int): in minutes

        Returns:
            generator: yields one result per parameter in ``PARAM.U_AND_W``
            (can be unpacked like ``u, w = ...``); the values are computed lazily.
        """
        if isinstance(duration, (list, np.ndarray)):
            func = self.get_array_param
        else:
            func = self.get_scalar_param

        return (func(p, duration) for p in PARAM.U_AND_W)

    @classmethod
    def from_interim_results_file(cls, interim_results_fn, worksheet=METHOD.KOSTRA):
        """
        DEPRECIATED: for compatibility reasons. To use the old file and convert it to the new parameters file

        Args:
            interim_results_fn (str | Path): csv-file with the durations in the first column (index).
            worksheet (str): worksheet name for the approaches.

        Returns:
            IdfParameters: object with the series parameters of the file and fitted formulations.
        """
        interim_results = pd.read_csv(interim_results_fn, index_col=0, header=0)
        p = cls(worksheet=worksheet)
        p.durations = interim_results.index.values
        # NOTE(review): assumes the columns are named PARAM.U and PARAM.W (same layout as the
        #  table built in measured_points) - confirm against an old interim results file.
        for param in PARAM.U_AND_W:
            p.parameters_series[param] = interim_results[param].values
        p._calc_params()
        return p

    def to_yaml(self, filename):
        """
        Write series kind, durations, series parameters and final parameters to a yaml-file.

        Args:
            filename (str | Path): path of the file to write.
        """
        register_formulations_to_yaml()

        def to_basic(a):
            # plain python floats, rounded to 4 decimals, for a readable file
            return [round(float(i), 4) for i in a]

        data = OrderedDict({
            PARAM.SERIES: self.series_kind,
            PARAM.DUR: to_basic(self.durations),
            PARAM.PARAMS_SERIES: {p: to_basic(l) for p, l in self.parameters_series.items()},
            PARAM.PARAMS_FINAL: self.parameters_final
        })
        write_yaml(data, filename)

    @classmethod
    def from_yaml(cls, filename):
        """
        Read the parameters from a yaml-file.

        A file whose content is a list is treated as the old format and
        handed over to ``from_yaml_depreciated``.

        Args:
            filename (str | Path): path of the file to read.

        Returns:
            IdfParameters: object with the parameters of the file.
        """
        data = read_yaml(filename)
        if isinstance(data, list):
            return cls.from_yaml_depreciated(filename)

        p = cls(series_kind=data[PARAM.SERIES])
        p.durations = data[PARAM.DUR]
        # list to numpy.array
        p.parameters_series = {name: np.array(values) for name, values in data[PARAM.PARAMS_SERIES].items()}
        p.parameters_final = data[PARAM.PARAMS_FINAL]
        p._calc_params()
        return p

    @classmethod
    def from_yaml_depreciated(cls, filename, series_kind=SERIES.PARTIAL):
        """
        Read a parameters file in the old (list based) format.

        If the file name contains '.yaml', the old file is renamed to '..._OLD.yaml'
        and the parameters are written in the new format under the original name.

        Args:
            filename (str | Path): path of the file to read.
            series_kind (str): kind of the event series (not part of the old format).

        Returns:
            IdfParameters: object with the parameters of the file.
        """
        data = read_yaml(filename)
        p = cls(series_kind=series_kind)
        p.durations = []
        p.parameters_series = {PARAM.U: [], PARAM.W: []}
        p.parameters_final = {}

        for part in data:
            start_dur = float(part['von'])
            # skip the first entry of the part when it is the lower bound itself
            start_idx = 1 if start_dur in part[COL.DUR] else 0
            p._durations += part[COL.DUR][start_idx:]

            part_params = {}
            for u_or_w in PARAM.U_AND_W:
                part_params[u_or_w] = {PARAM.FUNCTION: part[u_or_w]}
                if part[u_or_w] != APPROACH.LIN:
                    part_params[u_or_w][PARAM.A] = part[f'{PARAM.A}_{u_or_w}']
                    part_params[u_or_w][PARAM.B] = part[f'{PARAM.B}_{u_or_w}']
                p.parameters_series[u_or_w] += part[PARAM.VALUES(u_or_w)][start_idx:]

            p.parameters_final[start_dur] = part_params

        p.parameters_series = {param: np.array(l) for param, l in p.parameters_series.items()}

        # Path instead of os.rename: no dependency on an `os` import and str/Path both work.
        filename_str = str(filename)
        if '.yaml' in filename_str:
            Path(filename).rename(filename_str.replace('.yaml', '_OLD.yaml'))
            # only write after the old file was backed up, so it is never overwritten
            p.to_yaml(filename)
        return p