import math
import warnings
from pathlib import Path
import webbrowser
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.optimize as spo
from matplotlib.backends.backend_pdf import PdfPages
from .arg_parser import heavy_rain_parser
from .definitions import *
from .idf_backend import IdfParameters
from .in_out import import_series
from .little_helpers import (minutes_readable, height2rate, delta2min, rate2height, frame_looper, event_caption,
duration_steps_readable, get_progress_bar)
from .plot_helpers import idf_bar_axes
from .sww_utils import (guess_freq, rain_events, agg_events, event_duration, resample_rain_series, rain_bar_plot,
from .synthetic_rainseries import _BlockRain, _EulerRain
class IntensityDurationFrequencyAnalyse:
heavy rain as a function of the duration and the return period acc. to DWA-A 531 (2012)
This program reads the measurement data of the rainfall
and calculates the distribution of the rainfall as a function of the return period and the duration
for duration steps up to 12 hours (and more) and return period in a range of '0.5a <= T_n <= 100a'
_series (pandas.Series): rain time-series
_freq (pandas.DateOffset): frequency of the rain series
_return_periods_frame (pandas.DataFrame): with return periods of all given durations
_rain_events (pandas.DataFrame):
_rainfall_sum_frame (pandas.DataFrame): with rain sums of all given durations
def __init__(self, series_kind=SERIES.PARTIAL, worksheet=METHOD.KOSTRA, extended_durations=False, unit='mm'):
Heavy rainfall intensity as a function of duration and return period acc. to DWA-A 531 (2012).
This program reads the measurement data of the rainfall
and calculates the distribution of the rainfall as a function of the return period and the duration
for duration steps up to 12 hours (and more) and return period in a range of 0.5a and 100a.
series_kind (str): ['partial', 'annual']
worksheet (str): ['DWA-A_531', 'ATV-A_121', 'DWA-A_531_advektiv']
extended_durations (bool): add [720, 1080, 1440, 2880, 4320, 5760, 7200, 8640] minutes to the calculation
unit (str, optional): Unit for rainfall (default: 'mm').
self._series = None
self._freq = None
# how to calculate the idf curves
self._parameters = IdfParameters(series_kind=series_kind, worksheet=worksheet,
self._unit = unit
self._return_periods_frame = None
self._rain_events = None
self._rainfall_sum_frame = None
self._duration_steps_for_output = None
# __________________________________________________________________________________________________________________
def series(self) -> pd.Series:
if self._series is None:
raise IdfError('No Series defined for IDF-Analysis!')
return self._series
def series(self, series: pd.Series):
self._series = series
def set_series(self, series, unit='mm'):
set the series for the analysis
series (pandas.Series): precipitation time-series
unit (str, optional): Unit for rainfall (default: 'mm').
if not isinstance(series, pd.Series):
raise IdfError('The series has to be a pandas Series.')
if not isinstance(series.index, pd.DatetimeIndex):
raise IdfError('The series has to have a DatetimeIndex.')
# if is not None:
# series = remove_timezone(series)
self._freq = guess_freq(series.index)
freq_minutes = delta2min(self._freq)
self.series = series.replace(0, np.nan).dropna()
self._return_periods_frame = None
self._rain_events = None
self._rainfall_sum_frame = None
self._unit = unit
# __________________________________________________________________________________________________________________
def duration_steps(self):
get duration steps (in minutes) for the parameter calculation and basic evaluations
list | numpy.ndarray: duration steps in minutes
return self.parameters.durations
def duration_steps(self, durations):
Set duration steps (in minutes) for the parameter calculation and basic evaluations.
durations (list | numpy.ndarray): duration steps in minutes
if not isinstance(durations, (list, np.ndarray)):
raise IdfError(f'Duration steps have to be {(list, np.ndarray)} got "{type(durations)}"')
self.parameters.durations = durations
def duration_steps_for_output(self):
get duration steps (in minutes) for the parameter calculation and basic evaluations
list | numpy.ndarray: duration steps in minutes
if self._duration_steps_for_output is None:
self._duration_steps_for_output = self.duration_steps.copy()
return self._duration_steps_for_output
def duration_steps_for_output(self, durations):
Set duration steps (in minutes) for the parameter calculation and basic evaluations.
durations (list | numpy.ndarray): duration steps in minutes
if not isinstance(durations, (list, np.ndarray)):
raise IdfError(f'Duration steps have to be {(list, np.ndarray)} got "{type(durations)}"')
self._duration_steps_for_output = durations
# __________________________________________________________________________________________________________________
def parameters(self):
get the calculation parameters
calculation method depending on the used worksheet and on the duration
also the parameters for each method
to save some time and save the parameters with
and read them later with :func:`IntensityDurationFrequencyAnalyse.read_parameters`
IdfParameters: calculation parameters
if not self._parameters.parameters_series and self._series is not None:
return self._parameters
def write_parameters(self, filename):
save parameters as yaml-file to save computation time.
filename (str): filename for the parameters yaml-file
def read_parameters(self, filename, worksheet=None):
Read parameters from a .yaml-file to save computation time.
Extract interim results from parameters.
filename (str, Path): filename of the parameters yaml-file.
worksheet (str): ['DWA-A_531', 'ATV-A_121', 'DWA-A_531_advektiv']
self._parameters = IdfParameters.from_yaml(filename, worksheet)
def auto_save_parameters(self, filename: str or Path):
"""Auto-save the parameters as a yaml-file to save computation time."""
if isinstance(filename, str):
filename = Path(filename)
if filename.is_file():
if not filename.parent.is_dir():
# __________________________________________________________________________________________________________________
def depth_of_rainfall(self, duration, return_period):
calculate the height of the rainfall h in L/m² = mm (respectively the unit of the series)
duration (int | float | list | numpy.ndarray | pandas.Series): duration: in minutes
return_period (float): in years
int | float | list | numpy.ndarray | pandas.Series: height of the rainfall h in L/m² = mm (respectively the unit of the series)
if self.parameters.series_kind == SERIES.ANNUAL:
if return_period < 5:
print('WARNING: Using an annual series and a return period < 5 a will result in faulty values!')
if return_period <= 10:
return_period = np.exp(1.0 / return_period) / (np.exp(1.0 / return_period) - 1.0)
log_tn = -np.log(np.log(return_period / (return_period - 1.0)))
log_tn = np.log(return_period)
u, w = self.parameters.get_u_w(duration)
return u + w * log_tn
# __________________________________________________________________________________________________________________
def rain_flow_rate(self, duration, return_period):
Convert the height of rainfall to the specific rain flow rate in [l/(s*ha)].
if 2 array-like parameters are give, an element-wise calculation will be made.
So the length of the array must be the same.
duration (int | float | list | numpy.ndarray | pandas.Series): in minutes
return_period (float): in years
int | float | list | numpy.ndarray | pandas.Series: specific rain flow rate in [l/(s*ha)]
known_units = {'mm', 'l/m²', 'l/m2'}
if self._unit.lower() not in known_units:
raise NotImplementedError(f'rain_flow_rate function not implemented for unit={self._unit} ({known_units=})')
height_of_rainfall = self.depth_of_rainfall(duration=duration, return_period=return_period)
return height2rate(height_of_rainfall=height_of_rainfall, duration=duration)
# __________________________________________________________________________________________________________________
def r_720_1(self):
rain flow rate in [l/(s*ha)] for a duration of 12h and a return period of 1 year
float: rain flow rate in [l/(s*ha)]
return self.rain_flow_rate(duration=720, return_period=1)
# __________________________________________________________________________________________________________________
def get_return_period(self, height_of_rainfall, duration):
calculate the return period, when the height of rainfall and the duration are given
height_of_rainfall (float): in [mm]
duration (int | float | list | numpy.ndarray | pandas.Series): in minutes
int | float | list | numpy.ndarray | pandas.Series: return period in years
u, w = self.parameters.get_u_w(duration)
return np.exp((height_of_rainfall - u) / w)
# __________________________________________________________________________________________________________________
def get_duration(self, height_of_rainfall, return_period):
calculate the duration, when the height of rainfall and the return period are given
height_of_rainfall (float): in [mm]
return_period (float): in years
float: duration in minutes
return spo.newton(lambda d: self.depth_of_rainfall(d, return_period) - height_of_rainfall, x0=1)
# __________________________________________________________________________________________________________________
def result_table(self, durations=None, return_periods=None, add_names=False, add_unit=True, as_intensity=False):
Get an idf-table of rainfall depth with return periods as columns and durations as rows.
durations (list | numpy.ndarray): list of durations in minutes for the table
return_periods (list): list of return periods in years for the table
add_names (bool): weather to use expressive names as index- & column-label
add_unit (bool): weather to add units to index- & column-label
pandas.DataFrame: idf table
if durations is None:
durations = self.duration_steps_for_output
if return_periods is None:
return_periods = [1, 2, 3, 5, 10, 20, 25, 30, 50, 75, 100]
result_table = {}
for t in return_periods:
result_table[t] = self.depth_of_rainfall(durations, t)
if as_intensity:
result_table[t] /= durations / 60 # mm/h
result_table = pd.DataFrame(result_table, index=durations)
result_table.index = result_table.index.astype(int) # there should be no float in minutes
if add_names: = 'duration' + (' (min)' if add_unit else '')
result_table.columns = pd.MultiIndex.from_tuples([(rp, round(1 / rp, 3)) for rp in result_table.columns])
result_table.columns.names = ['return period' + (' (a)' if add_unit else ''),
'frequency' + (' (1/a)' if add_unit else '')]
return result_table
def get_rainfall_sum_frame(self, series=None, durations=None):
Get a rainfall sum frame for any series with the duration steps as columns.
Default: The time-series and the duration-steps of the analysis.
series (pandas.Series, Optional): rainfall time-series
durations (list, Optional): list of durations in minutes which are of interest (default: pre-defined durations)
pandas.DataFrame: Rain sum depending on the duration per datetime-index.
if durations is None:
durations = self.duration_steps_for_output
if series is None:
if self._rainfall_sum_frame is not None:
return self._rainfall_sum_frame[durations]
ts = self.series.copy()
freq = self._freq
freq = guess_freq(series.index)
ts = series.copy()
ts = ts.asfreq(freq).fillna(0)
# ts = series.replace(0, np.nan).dropna()
df = pd.DataFrame(index=ts.index)
# df = {}
freq_num = delta2min(freq)
for d in frame_looper(ts.index.size, columns=durations, label='rainfall_sum'):
if d % freq_num != 0:
warnings.warn(f'Using durations (= {d} minutes), '
f'which are not a multiple of the base frequency (= {freq_num} minutes) of the series, '
f'will lead to misinterpretations.')
df[d] = ts.rolling(pd.Timedelta(minutes=d)).sum()
# printable_names (bool): if durations should be as readable in dataframe, else in minutes
# df = df.rename(minutes_readable, axis=0)
return df # .round(2)
def rainfall_sum_frame(self):
Get the rainfall sum over the whole time-series for the default duration steps.
pandas.DataFrame: Rain sum depending on the duration per datetime-index.
if self._rainfall_sum_frame is None:
self._rainfall_sum_frame = self.get_rainfall_sum_frame()
return self._rainfall_sum_frame
def get_return_periods_frame(self, series=None, durations=None):
Get the return periods for any time-series with the duration steps as columns.
Default: The time-series and the duration-steps of the analysis.
Is NaN if rainfall sum is smaller than 0.1 mm.
series (pandas.Series, Optional): rainfall time-series
durations (list, Optional): Durations in minutes which are of interest (default: pre-defined durations)
pandas.DataFrame: Return periods depending on the duration per datetime-index.
sums = self.get_rainfall_sum_frame(series=series, durations=durations)
df = pd.DataFrame(index=sums.index)
# df = {}
for d in frame_looper(sums.index.size, columns=sums.columns, label='return_periods'):
df[d] = self.get_return_period(height_of_rainfall=sums[d][sums[d] >= 0.1], duration=d)
return df # .fillna(0)#.round(2)
def return_periods_frame(self):
Get the return periods over the whole time-series for the default duration steps.
Is NaN if rainfall sum is smaller than 0.1 mm.
pandas.DataFrame: data-frame of return periods where the columns are the duration steps
if self._return_periods_frame is None:
self._return_periods_frame = self.get_return_periods_frame()
return self._return_periods_frame
def write_return_periods_frame(self, filename, **kwargs):
"""save the return-periods dataframe as a parquet-file to save computation time."""
df = self.return_periods_frame.copy()
df.columns = df.columns.to_series().astype(str)
df.round(2).to_parquet(filename, **kwargs)
def read_return_periods_frame(self, filename, **kwargs):
"""read the return-periods dataframe as a parquet-file to save computation time."""
df = pd.read_parquet(filename, **kwargs)
df.columns = df.columns.to_series().astype(int)
self._return_periods_frame = df
def auto_save_return_periods_frame(self, filename: Path or str):
"""auto-save the return-periods dataframe as a parquet-file to save computation time."""
if isinstance(filename, str):
filename = Path(filename)
if filename.is_file():
def rain_events(self):
get the all the rain events of the time-series
default minimal gap between events is 4 hours
pandas.DataFrame: data-frame of events with start-, end-time and duration
if self._rain_events is None:
events = rain_events(self.series, min_gap=max(pd.Timedelta(hours=4), self._freq))
events[COL.DUR] = event_duration(events) + pd.Timedelta(self._freq)
events[COL.LP] = agg_events(events, self.series, 'sum').round(1)
events[COL.LAST] = events[COL.START] - events[COL.END].shift()
# events = events.sort_values(by=COL.LP, ascending=False)
self._rain_events = events
return self._rain_events
def write_rain_events(self, filename, sep=';', decimal='.'):
"""save the rain-events dataframe as a csv-file for external use or to save computation time."""
self.rain_events.to_csv(filename, index=False, sep=sep, decimal=decimal)
def read_rain_events(self, filename, sep=';', decimal='.'):
"""read the rain-events dataframe as a csv-file to save computation time."""
events = pd.read_csv(filename, skipinitialspace=True, sep=sep, decimal=decimal)
events[COL.START] = pd.to_datetime(events[COL.START])
events[COL.END] = pd.to_datetime(events[COL.END])
events[COL.DUR] = pd.to_timedelta(events[COL.DUR])
events[COL.LAST] = pd.to_timedelta(events[COL.LAST])
self._rain_events = events
def auto_save_rain_events(self, filename: Path or str, sep=';', decimal='.'):
"""auto-save the rain-events dataframe as a csv-file to save computation time."""
if isinstance(filename, str):
filename = Path(filename)
if filename.is_file():
self.read_rain_events(filename, sep=sep, decimal=decimal)
self.write_rain_events(filename, sep=sep, decimal=decimal)
def add_max_return_periods_to_events(self, events):
if COL.MAX_PERIOD not in events:
return_periods_frame = self.return_periods_frame
# maximum return period for every timestep
max_periods = return_periods_frame.max(axis=1).fillna(0) # fill NaN -> weil < 0.1 gefiltert wurde
datetime_max = agg_events(events, max_periods, 'idxmax')
datetime_max = np.where(np.isnan(datetime_max), events[COL.START].values, datetime_max)
# alternative:
# [xv if c else yv for c, xv, yv in zip(np.isnan(datetime_max), events[COL.START].values, datetime_max)]
if is not None:
datetime_max = pd.DatetimeIndex(datetime_max).tz_localize('utc').tz_convert(
events[COL.MAX_PERIOD] = max_periods[datetime_max].values
_select_tn_frame = return_periods_frame.loc[datetime_max].copy()
_select_tn_frame.loc[:, 0] = 0
events[COL.MAX_PERIOD_DURATION] = _select_tn_frame.idxmax(axis=1, skipna=True).values
events[COL.MAX_PERIOD_DURATION] = events[COL.MAX_PERIOD_DURATION].replace(0, np.nan)
def get_max_event_intensities_frame(self, events):
sum_frame = self.rainfall_sum_frame
di = {}
for duration in self.duration_steps:
di[duration] = agg_events(events, sum_frame[duration], 'max').round(2)
return pd.DataFrame(di, index=events.index)
def add_max_intensities_to_events(self, events, column_format='max_sum_{:0.0f}'):
Add the maximum intensities for all duration steps to the events table.
events (pandas.DataFrame): events table
column_format (str): format of the column names.
pandas.DataFrame: events table including the columns with the maximum intensities
return pd.concat([events, self.get_max_event_intensities_frame(events).rename(columns=column_format.format)],
def get_max_return_periods_per_durations_frame(self, events):
return_periods_frame = self.return_periods_frame
di = {}
for duration in self.duration_steps:
di[duration] = agg_events(events, return_periods_frame[duration], 'max').round(2)
return pd.DataFrame(di, index=events.index)
def add_max_return_periods_per_duration_to_events(self, events, column_format='max_return_period_{:0.0f}'):
Add the maximum return periods for all duration steps to the events table.
events (pandas.DataFrame): events table
column_format (str): format of the column names.
pandas.DataFrame: events table including the columns with the maximum return periods
return pd.concat(
[events, self.get_max_return_periods_per_durations_frame(events).rename(columns=column_format.format)],
def from_idf_table(cls, idf_table, worksheet=METHOD.KOSTRA, linear_interpolation=True):
Create an IDF-analysis-object based on an idf-tabel (i.e. from a given KOSTRA table)
idf_table (pandas.DataFrame): idf-table with index=Durations and columns=return Period and values=Rainheight
worksheet (str | optional): name of the worksheet to use. default: 'KOSTRA'
linear_interpolation (bool): only use linear interpolation between parameters.
IntensityDurationFrequencyAnalyse: idf-object
idf = cls(worksheet=worksheet)
idf._parameters.reverse_engineering(idf_table, linear_interpolation=linear_interpolation)
idf.parameters.series_kind = 'from IDF table'
return idf
# ############################################### DESIGN RAINFALL ##################################################
def model_rain_block(self):
Create a model block rain class.
_BlockRain: Synthetic model block rain.
return _BlockRain(self)
def model_rain_euler(self):
Create a model Euler rain class.
_EulerRain: Synthetic model Euler rain.
return _EulerRain(self)
# ############################################### PLOTS ############################################################
def event_report(self, filename, min_event_rain_sum=25, min_return_period=0.5, durations=None):
create pdf file with the biggest rain events
for each event is represented by a plot of the rain series
and a IDF analysis where the return periods are calculated
filename (str): path (directory + filename) for the created pdf-report
min_event_rain_sum (float): only events with a bigger rain sum will be created
min_return_period (float): only events with a bigger return period will be analysed
(the plot will be created anyway)
durations (list[int]): analysed durations
(default: [5, 10, 15, 20, 30, 45, 60, 90, 120, 180, 240, 360, 540, 720, 1080, 1440, 2880, 4320])
events = self.rain_events
main_events = events[
(events[COL.LP] > min_event_rain_sum) & (events[COL.MAX_PERIOD] > min_return_period)].sort_values(
by=COL.MAX_PERIOD, ascending=False).to_dict(
column_name = 'Precipitation'
pdf = PdfPages(filename)
for _, event in get_progress_bar(main_events.items()):
fig, caption = self.event_plot(event, min_return_period=min_return_period, column_name=column_name)
# -------------------------------------
fig.get_axes()[0].set_title(caption + '\n\n\n')
# DIN A4
fig.set_size_inches(w=8.27, h=11.69)
# fig.tight_layout()
def event_plot(self, event, durations=None, column_name='Precipitation', min_return_period=1., german_caption=False,
if isinstance(event, pd.Series):
event = event.to_dict()
plot_range = slice(event[COL.START] - pd.Timedelta(self._freq), event[COL.END] + pd.Timedelta(self._freq))
return_periods_frame = self.return_periods_frame[plot_range]
if COL.MAX_PERIOD not in event:
event[COL.MAX_PERIOD] = return_periods_frame.max().max()
event[COL.MAX_PERIOD_DURATION] = return_periods_frame.max().idxmax()
sum_frame_event = self.rainfall_sum_frame[plot_range]
ts = self.series[plot_range].resample(self._freq).sum().fillna(0).copy()
# -------------------------------------
fig = plt.figure()
if event[COL.MAX_PERIOD] < min_return_period:
rain_ax = fig.add_subplot(111)
if max_duration is None:
if durations is not None:
max_dur = max(durations)
max_dur = max(self.duration_steps_for_output)
max_dur = max_duration
return_periods_frame_extended = self.get_return_periods_frame(
self.series[event[COL.START] - pd.Timedelta(minutes=max_dur):
event[COL.END] + pd.Timedelta(self._freq)].resample(self._freq).sum(),
idf_bar_ax = fig.add_subplot(211)
idf_bar_ax = idf_bar_axes(idf_bar_ax, return_periods_frame_extended)
rain_ax = fig.add_subplot(212, sharex=idf_bar_ax)
# -------------------------------------
ts_sum, minutes = resample_rain_series(ts)
rain_ax = rain_bar_plot(ts_sum, rain_ax)
rain_ax.set_ylabel(f'{column_name} in {self._unit}/{minutes if minutes != 1 else ""}min')
if ts.index.size > 1:
rain_ax.set_xlim(ts.index[0], ts.index[-1])
return fig, event_caption(event, self._unit, lang='de' if german_caption else 'en')
# alias
result_figure = curve_figure
def event_return_period_report(self, filename, min_return_period=1):
events = self.rain_events
main_events = events[events[COL.MAX_PERIOD] > min_return_period].sort_values(by=COL.MAX_PERIOD,
pdf = PdfPages(filename)
for _, event in get_progress_bar(main_events.to_dict(orient='index').items()):
fig, ax = self.return_period_event_figure(event)
# -------------------------------------
# DIN A4
fig.set_size_inches(h=11.69 / 2, w=8.27)
# fig.tight_layout()
def return_period_event_figure(self, event):
period_line = self.return_periods_frame[event[COL.START]:event[COL.END]].max()
# period_line[period_line < 0.75] = np.nan
period_line = period_line.dropna()
ax = period_line.plot() # type: plt.Axes
event_ = event.copy()
event_[COL.MAX_PERIOD] = period_line.max()
event_[COL.MAX_PERIOD_DURATION] = period_line.idxmax()
ax.set_title(event_caption(event_, unit=self._unit, lang='en'))
# ax.set_xscale('log')
# ax.set_yscale('log')
# print(ax.get_ylim())
# ax.set_ylim(0.01, 300)
# exit()
ax.set_xticklabels([minutes_readable(m) for m in period_line.index])
ax.set_xlabel('duration steps')
ax.set_ylabel('return period in years')
return ax.get_figure(), ax
def return_period_scatter(self, min_event_sum=25, durations=None):
if durations is None:
durations = [5, 10, 15, 20, 30, 45, 60, 90, 120, 180, 240, 360, 540, 720, 1080, 1440, 2880, 4320]
dur_short = durations[:durations.index(90)]
dur_long = durations[durations.index(90):]
events = self.rain_events
events[COL.LP] = agg_events(events, self.series, 'sum')
events = events[events[COL.LP] > min_event_sum].copy()
tn_long_list = {}
tn_short_list = {}
for _, event in events.iterrows():
start = event[COL.START]
end = event[COL.END]
# save true
idf_table = self.return_periods_frame[start:end]
# idf_table = idf_table.rename(columns=minutes_readable)
# idf_table[idf_table < min_return_period] = np.nan
tn = idf_table.loc[start:end]
tn_short = tn[dur_short].max().max()
tn_long = tn[dur_long].max().max()
if tn_long > tn_short:
tn_long_list[start] = tn_long
tn_short_list[start] = tn_short
# check()
fig, ax = plt.subplots()
ax.scatter(x=list(tn_short_list.keys()), y=list(tn_short_list.values()), color='red')
ax.scatter(x=list(tn_long_list.keys()), y=list(tn_long_list.values()), color='blue')
fig = ax.get_figure()
ax.set_ylabel('Return Period in a')
def line_in_legend(color=None, marker=None, lw=None, ls=None, **kwargs):
from matplotlib.lines import Line2D
return Line2D([0], [0], color=color, marker=marker, linewidth=lw, linestyle=ls, **kwargs)
custom_lines = []
custom_lines.append(line_in_legend(color='red', marker='o', lw=0))
custom_lines.append(line_in_legend(color='blue', marker='o', lw=0))
# -----------------
l1 = ax.legend(custom_lines, ['< 60 min', '> 60 min'], loc='best', title='max Duration')
return fig, ax
# ############################################### CL TOOL ##########################################################
def command_line_tool(cls):
user = heavy_rain_parser()
# --------------------------------------------------
# use the same directory as the input file and make as subdir with the name of the input_file + "_idf_data"
out = '{label}_idf_data'.format(label='.'.join(user.input.split('.')[:-1]))
out = Path(out)
if not out.is_dir():
action = 'Creating'
action = 'Using'
print(f'{action} the subfolder "{out}" for the interim- and final-results.')
prefix = 'idf_'
# --------------------------------------------------
idf = cls(series_kind=user.series_kind, worksheet=user.worksheet, extended_durations=True)
# --------------------------------------------------
parameters_fn = out / f'{prefix}parameters.yaml'
if parameters_fn.is_file():
print(f'Found existing interim-results in "{parameters_fn}" and using them for calculations.')
print(f'Start reading the time-series {user.input} for the analysis.')
ts = import_series(user.input).replace(0, np.nan).dropna()
# --------------------------------------------------
print('Finished reading.')
# --------------------------------------------------
# --------------------------------------------------
h = user.height_of_rainfall
r = user.flow_rate_of_rainfall
d = user.duration
t = user.return_period
if r is not None:
if h is None and d is not None:
h = rate2height(rain_flow_rate=r, duration=d)
elif d is None and h is not None:
d = h / r * 1000 / 6
if user.r_720_1:
d = 720
t = 1
if any((h, d, t)):
if all((d, t)):
elif all((d, h)):
t = idf.get_return_period(h, d)
print(f'The return period is {t:0.1f} years.')
elif all((h, t)):
d = idf.get_duration(h, t)
print(f'The duration is {d:0.1f} minutes.')
print(f'Resultierende Regenhöhe h_N(T_n={t:0.1f}a, D={d:0.1f}min) = {idf.depth_of_rainfall(d, t):0.2f} mm')
f'Resultierende Regenspende r_N(T_n={t:0.1f}a, D={d:0.1f}min) = {idf.rain_flow_rate(d, t):0.2f} L/(s*ha)')
# --------------------------------------------------
if user.plot:
fig, ax = idf.curve_figure()
plot_fn = out / f'{prefix}curves_plot.png'
fig.savefig(plot_fn, dpi=260)
print(f'Created the IDF-curves-plot and saved the file as "{plot_fn}".')
# --------------------------------------------------
if user.export_table:
table = idf.result_table(add_names=True)
table_fn = out / f'{prefix}table.csv'
table.to_csv(table_fn, sep=';', decimal=',', float_format='%0.2f')
print(f'Created the IDF-curves-plot and saved the file as "{table_fn}".')