Source code for bio_rtd.peak_shapes

"""
Peak shapes based on mean residence time (`rt_mean` == first momentum)

For un-clipped peak, the integral over the peak over time == 1.
For un-clipped peak, first momentum == `rt_mean`.
"""

__version__ = '0.2'
__author__ = 'Jure Sencar'

import typing as _typing
import numpy as _np

from scipy import special as _special

from bio_rtd.logger import RtdLogger as _RtdLogger


[docs]def gaussian(t: _np.ndarray, rt_mean: float, sigma: float, logger: _typing.Union[_RtdLogger, None] = None) -> _np.ndarray: if logger: # warnings for unsuitable peak parameters if sigma < 4 * (t[1] - t[0]): logger.w("Gaussian peak: sigma < 4 * dt") if rt_mean + 3 * sigma > t[-1]: logger.w("Gaussian peak: rt_mean + 3 * sigma > t[-1]") if rt_mean - 3 * sigma < t[0]: logger.w("Gaussian peak: rt_mean - 3 * sigma < t[0]") p = - (t - rt_mean) ** 2 / 2 / sigma ** 2 p[p > 700] = 700 p[p < -700] = -700 p = _np.exp(p) / sigma / _np.sqrt(2 * _np.pi) if logger: _report_suspicious_peak_shape(t, p, rt_mean, logger) return p
[docs]def emg(t, rt_mean, sigma, skew, logger: _typing.Union[_RtdLogger, None] = None): # first momentum of emg = rt_mean = t0 + 1 / skew t0 = rt_mean - 1 / skew # exp argument p = skew / 2 * (2 * (t0 - t) + skew * sigma ** 2) if logger: # warnings for unsuitable peak parameters if sigma < 4 * (t[1] - t[0]): logger.w("EMG peak: sigma < 4 * dt") if rt_mean + 3 * sigma > t[-1]: logger.w("EMG peak: rt_mean + 3 * sigma > t[-1]") if t0 < 3 * sigma: logger.w("EMG peak: t0 < 3 * sigma; t0 = rt_mean - 1 / skew") if skew < 1 / 40: logger.w("EMG peak: skew < 1/40") if skew > 10: logger.w("EMG peak: skew > 10") if _np.any(p > 200): logger.w("EMG peak: exp argument (p) > 200") if _np.any(p < -200): logger.w("EMG peak: exp argument (p) < -200") p[p > 700] = 700 p[p < -700] = -700 # curve p = _np.exp(p) * _special.erfc((t0 - t + skew * sigma ** 2) / (2 ** 0.5 * sigma)) * skew / 2 if logger: _report_suspicious_peak_shape(t, p, rt_mean, logger) return p
[docs]def skew_normal(t, rt_mean, sigma, skew, logger: _typing.Union[_RtdLogger, None] = None): if logger: # warnings for unsuitable peak parameters if sigma < 4 * (t[1] - t[0]): logger.w("Skewed normal peak: sigma < 4 * dt") if rt_mean + 3 * sigma > t[-1]: logger.w("Skewed normal peak: rt_mean + 3 * sigma > t[-1]") if rt_mean < 3 * sigma: logger.w("Skewed normal peak: rt_mean < 3 * sigma") if skew < -20: logger.w("Skewed normal peak: skew < -20") if skew > 20: logger.w("Skewed normal peak: skew > 20") # rt_mean = t0 + sigma * np.sqrt(2 / np.pi) * skew / (1 + skew**2) t0 = rt_mean - sigma * _np.sqrt(2 / _np.pi) * skew / _np.sqrt(1 + skew ** 2) # skew x = (t - t0) / sigma p = 2 * gaussian(t, t0, sigma) * 1 / 2 * (1 + _special.erf(skew * x / _np.sqrt(2))) if logger: _report_suspicious_peak_shape(t, p, rt_mean, logger) return p
[docs]def tanks_in_series(t: _np.ndarray, rt_mean: float, n_tanks: int, logger: _typing.Union[_RtdLogger, None] = None) -> _np.ndarray: """`rt_mean` is for entire unit operation (all tanks together)""" if logger: # warnings for unsuitable peak parameters if rt_mean > t[-1] / 4: logger.w("Tanks in series peak: rt_mean > t[-1] / 4") if t[0] > 0: logger.e("Tanks in series peak: Initial time point > 0") if n_tanks < 1: logger.e("Tanks in series peak: n_tanks " + str(n_tanks) + " (< 1)") if n_tanks > 50: logger.w("Tanks in series peak: n_tanks " + str(n_tanks) + " (> 50)") if n_tanks == 1: p = _np.exp(_np.clip(-t / rt_mean, -100, 0)) / rt_mean else: p = t ** (n_tanks - 1) / _np.math.factorial(n_tanks - 1) / (rt_mean / n_tanks) ** n_tanks \ * _np.exp(_np.clip(-t / rt_mean * n_tanks, -100, 0)) if logger: _report_suspicious_peak_shape(t, p, rt_mean, logger, ignore_min_start=True) return p
def _report_suspicious_peak_shape( t: _np.ndarray, p: _np.ndarray, rt_mean: float, logger: _typing.Union[_RtdLogger, None], ignore_min_start=False ): dt = t[1] - t[0] # check normalization s = p.sum() * dt if abs(s - 1) > 0.1: logger.e("Peak shape: integral: " + str(s)) elif abs(s - 1) > 0.01: logger.w("Peak shape: integral: " + str(s)) # check values at edges if not ignore_min_start: rel_start = p[0] / p.max() if rel_start > 0.05: logger.e("Peak shape: relative value at start: " + str(rel_start)) elif rel_start > 0.001: logger.w("Peak shape: relative value at start: " + str(rel_start)) rel_end = p[-1] / p.max() if rel_end > 0.05: logger.e("Peak shape: relative value at end: " + str(rel_end)) elif rel_end > 0.001: logger.w("Peak shape: relative value at end: " + str(rel_end)) # check rt_mean p_rt_mean = _np.sum(t * p) * dt rel_diff = abs(rt_mean - p_rt_mean) / rt_mean if rel_diff > 0.1: logger.e("Peak shape: relative difference in rt_mean: " + str(rel_diff)) elif rel_diff > 0.01: logger.w("Peak shape: relative difference in rt_mean: " + str(rel_diff))