Source code for idf_analysis.little_helpers

import datetime

import pandas as pd

from .definitions import COL


[docs] def delta2min(time_delta): """ convert timedelta to float in minutes Args: time_delta (pandas.Timedelta, pandas.DateOffset): Returns: float: the timedelta in minutes """ if isinstance(time_delta, pd.DateOffset): time_delta = pd.Timedelta(time_delta) return int(time_delta.total_seconds() / 60)
[docs] def minutes_readable(minutes): """ convert the duration in minutes to a more readable form Args: minutes (float | int): duration in minutes Returns: str: duration as a string """ if minutes <= 60: return f'{minutes:0.0f} min' elif 60 < minutes < 60 * 24: minutes /= 60 if minutes % 1: fmt = '{:0.1f} h' else: fmt = '{:0.0f} h' return fmt.format(minutes) elif 60 * 24 <= minutes: minutes /= 60 * 24 if minutes % 1: fmt = '{:0.1f} d' else: fmt = '{:0.0f} d' return fmt.format(minutes) else: return str(minutes)
[docs] def duration_steps_readable(durations): """ convert the durations to a more readable form Args: durations (list[int | float]): in minutes Returns: list[str]: of the readable duration list """ return [minutes_readable(i) for i in durations]
[docs] def height2rate(height_of_rainfall, duration): """ calculate the specific rain flow rate in [l/(s*ha)] if 2 array-like parameters are give, a element-wise calculation will be made. So the length of the array must be the same. Args: height_of_rainfall (float | np.ndarray | pd.Series): height_of_rainfall: in [mm] duration (float | np.ndarray | pd.Series): in minutes Returns: float | np.ndarray | pd.Series: specific rain flow rate in [l/(s*ha)] """ return height_of_rainfall / duration * (1000 / 6)
[docs] def rate2height(rain_flow_rate, duration): """ convert the rain flow rate to the height of rainfall in [mm] if 2 array-like parameters are give, a element-wise calculation will be made. So the length of the array must be the same. Args: rain_flow_rate (float | np.ndarray | pd.Series): in [l/(s*ha)] duration (float | np.ndarray | pd.Series): in minutes Returns: float | np.ndarray | pd.Series: height of rainfall in [mm] """ return rain_flow_rate * duration / (1000 / 6)
def get_progress_bar(iterator, desc=None): try: from tqdm.auto import tqdm return tqdm(iterator, desc=desc) except ModuleNotFoundError: return iterator def frame_looper(size, columns, label='return periods'): if size > 30000: # if > 3 weeks, use a progressbar return get_progress_bar(columns, desc=f'calculating {label} data-frame') else: return columns def event_caption(event, unit='mm'): caption = 'rain event\n' if (COL.START in event) and (COL.END in event): caption += f'between {event[COL.START]:%Y-%m-%d %H:%M} and ' if f'{event[COL.START]:%Y-%m-%d}' == f'{event[COL.END]:%Y-%m-%d}': caption += f'{event[COL.END]:%H:%M}\n' elif f'{event[COL.START]:%Y-%m-}' == f'{event[COL.END]:%Y-%m-}': caption += f'{event[COL.END]:%d %H:%M}\n' else: caption += f'{event[COL.END]:%Y-%m-%d %H:%M}\n' if COL.LP in event: caption += f'with a total sum of {event[COL.LP]:0.1f} {unit}\n' if COL.DUR in event: caption += f' and a duration of {timedelta_readable(event[COL.DUR])}' caption += '.\n' if COL.MAX_PERIOD in event: caption += f' The maximum return period was {return_period_formatter(event[COL.MAX_PERIOD])} a\n' if COL.MAX_PERIOD_DURATION in event: caption += f' at a duration of {minutes_readable(event[COL.MAX_PERIOD_DURATION])}.' return caption def event_caption_ger(event, unit='mm'): caption = 'Regenereignis\n' if (COL.START in event) and (COL.END in event): caption += f'zwischen {event[COL.START]:%Y-%m-%d %H:%M} und ' if f'{event[COL.START]:%Y-%m-%d}' == f'{event[COL.END]:%Y-%m-%d}': caption += f'{event[COL.END]:%H:%M}\n' elif f'{event[COL.START]:%Y-%m-}' == f'{event[COL.END]:%Y-%m-}': caption += f'{event[COL.END]:%d %H:%M}\n' else: caption += f'{event[COL.END]:%Y-%m-%d %H:%M}\n' if COL.LP in event: caption += f'mit einer Regensumme von {event[COL.LP]:0.1f} {unit}\n' if COL.DUR in event: caption += f' und einer Dauer von {timedelta_readable(event[COL.DUR])}' caption += '.\n' if COL.MAX_PERIOD in event: caption += f' Die maximale Wiederkehrperiode war {return_period_formatter(event[COL.MAX_PERIOD])} a\n' if COL.MAX_PERIOD_DURATION in event: caption += f' bei einer Dauerstufe von {minutes_readable(event[COL.MAX_PERIOD_DURATION])}.' return caption def return_period_formatter(t): if t < 1: return '< 1' elif t > 200: return '$\\gg$ 100' elif t > 100: return '> 100' else: return f'{t:0.1f}'
[docs] def timedelta_components_plus(td, min_freq='min'): """Schaltjahre nicht miteinbezogen""" l = [] if isinstance(td, datetime.timedelta): td = pd.to_timedelta(td) # years, weeks days_year = 365 days_week = 7 for component, value in td.round(min_freq).components._asdict().items(): if component == 'days': years, value = value // days_year, value % days_year l.append([int(years), 'years']) value -= years // 4 weeks, value = value // days_week, value % days_week l.append([int(weeks), 'weeks']) l.append([value, component]) return l
def timedelta_components_readable(l, short=False, sep=', '): result = [] for value, label_component in l: if value > 0: if short: unit_sep = '' unit = label_component[0] else: unit_sep = ' ' unit = label_component if value > 1: unit = label_component[:-1] result.append(f'{value}{unit_sep}{unit}') s = sep.join(result) if not short: # replace last "," with "and" s = ' and '.join(s.rsplit(sep, 1)) return s
[docs] def timedelta_readable(td, min_freq='min', short=False, sep=', '): """Schaltjahre nicht miteinbezogen""" return timedelta_components_readable(timedelta_components_plus(td, min_freq), short=short, sep=sep)
def timedelta_readable2(d1, d2, min_freq='min', short=False, sep=', '): td = d2 - d1 years = None if td > pd.Timedelta(days=365): d2_new = d2.replace(year=d1.year) if d2_new < d1: d2_new = d2_new.replace(year=d1.year + 1) years = d2.year - d2_new.year td = d2_new - d1 l = timedelta_components_plus(td, min_freq) if years is not None: l[0][0] = years return timedelta_components_readable(l, short=short, sep=sep)