"""Module with abstract classes."""
__all__ = ['Inlet', 'UnitOperation',
'RtdModel', 'UserInterface',
'PDF', 'ChromatographyLoadBreakthrough',
'ParameterSetList']
__version__ = '0.7'
__author__ = 'Jure Sencar'
import typing as _typing
import numpy as _np
from abc import ABC as _ABC, abstractmethod as _abstractmethod
from collections import OrderedDict as _OrderedDict
from bio_rtd import logger as _logger
from bio_rtd import adj_par as _adj_par
from bio_rtd import utils as _utils
[docs]class DefaultLoggerLogic(_ABC):
# noinspection PyProtectedMember
"""Default binding of the `RtdLogger` to a class.
The class holds a reference to a `RtdLogger` logger instance
and plants a data tree in the logger.
Parameters
----------
logger_parent_id
Custom unique id that belongs to the instance of the class.
It is used to plant a data tree in the `RtdLogger`.
Examples
--------
>>> logger_parent_id = "parent_unit_operation"
>>> l = DefaultLoggerLogic(logger_parent_id)
>>> isinstance(l.log, _logger.DefaultLogger)
True
>>> l.log.e("Error Description") # log error
Traceback (most recent call last):
RuntimeError: Error Description
>>> l.log.w("Warning Description") # log waring
Warning Description
>>> l.log.i("Info") # log info
>>> l.log.log_data = True
>>> l.log.log_level = _logger.RtdLogger.DEBUG
>>> l.log.i_data(l._log_tree, "a", 3) # store value in logger
>>> l.log.d_data(l._log_tree, "b", 7) # store at DEBUG level
>>> l.log.get_data_tree(logger_parent_id)["b"]
7
>>> l.log = _logger.StrictLogger()
>>> l.log.w("Warning Info")
Traceback (most recent call last):
RuntimeError: Warning Info
Notes
-----
See the documentation of the `RtdLogger`.
"""
def __init__(self, logger_parent_id: str):
self._instance_id = logger_parent_id
self._logger: _typing.Union[_logger.RtdLogger, None] = None
self._log_tree = dict() # place to store logged data
@property
def log(self) -> _logger.RtdLogger:
"""Logger.
If logger is not set, then a `DefaultLogger` is instantiated.
Setter also plants a data tree into passed logger.
"""
if self._logger is None:
self.log = _logger.DefaultLogger() # init default logger
return self._logger
@log.setter
def log(self, logger: _logger.RtdLogger):
self._logger = logger
self._logger.set_data_tree(self._instance_id, self._log_tree)
[docs] def set_logger_from_parent(self, parent_id: str, logger: _logger.RtdLogger):
"""Inherit logger from parent.
Parameters
----------
parent_id
logger
"""
self._logger = logger
self._logger.set_data_tree(f"{parent_id}/{self._instance_id}",
self._log_tree)
[docs]class Inlet(DefaultLoggerLogic, _ABC):
"""Generates starting flow rate and concentration profiles.
Parameters
----------
t
Simulation time vector
Starts with 0 and has a constant time step.
species_list
List with names of simulating process fluid species.
inlet_id
Unique identifier of an instance. It is stored in :attr:`uo_id`.
gui_title
Readable title of an instance.
Attributes
----------
species_list : list of str
List with names of simulating process fluid species.
uo_id : str
Unique identifier of the :class:`Inlet` instance.
gui_title : str
Readable title of the :class:`Inlet` instance.
adj_par_list : list of :class:`bio_rtd.adj_par.AdjustableParameter`
List of adjustable parameters exposed to the GUI.
"""
def __init__(self, t: _np.ndarray, species_list: _typing.Sequence[str],
inlet_id: str, gui_title: str):
super().__init__(inlet_id) # logger
# Assert proper time vector.
assert t[0] == 0, "t should start with 0"
assert len(t.shape) == 1, "t should be a 1D np.ndarray"
self._t = _np.linspace(0, t[-1], t.size)
self._dt = t[-1] / (t.size - 1)
assert _np.all(_np.abs(self._t - t) < 0.001 * self._dt / t.size), \
"t should have a fixed step size"
# Species
self.species_list = species_list
self._n_species = len(self.species_list)
# Strings
self.uo_id: str = inlet_id
self.gui_title = gui_title
# Placeholders
self.place_inlet_before_uo_id: _typing.Optional[str] = None
self.adj_par_list: _typing.Sequence[_adj_par.AdjustableParameter] = ()
# Outputs
self._f_out = _np.zeros_like(t)
self._c_out = _np.zeros([self._n_species, t.size])
@_abstractmethod
def _refresh(self): # pragma: no cover
"""Re-calculates `self._f_out` and `self._c_out`."""
pass
[docs] def get_result(self) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Get flow rate and concentration profiles.
Returns
-------
f_out
Flow rate profile.
c_out
Concentration profile.
"""
return self._f_out, self._c_out
[docs] def get_t(self) -> _np.ndarray:
"""Get simulation time vector."""
return self._t
[docs] def get_n_species(self) -> int:
"""Get number of process fluid species."""
return self._n_species
[docs]class UnitOperation(DefaultLoggerLogic, _ABC):
"""Processes flow rate and concentration profiles.
Parameters
----------
t
Global simulation time vector.
It must starts with 0 and have a constant time step.
uo_id
Unique identifier.
gui_title
Readable title for GUI.
Attributes
----------
adj_par_list
List of adjustable parameters exposed to the GUI.
gui_hidden
Hide the of the unit operation (default False).
discard_inlet_until_t
Discard inlet until given time.
discard_inlet_until_min_c
Discard inlet until given concentration is reached.
discard_inlet_until_min_c_rel
Discard inlet until given concentration relative to is reached.
Specified concentration is relative to the max concentration.
discard_inlet_n_cycles
Discard first n cycles of the periodic inlet flow rate profile.
discard_outlet_until_t
Discard outlet until given time.
discard_outlet_until_min_c
Discard outlet until given concentration is reached.
discard_outlet_until_min_c_rel
Discard outlet until given concentration relative to is reached.
Specified concentration is relative to the max concentration.
discard_outlet_n_cycles
Discard first n cycles of the periodic outlet flow rate profile.
"""
def __init__(self, t: _np.ndarray, uo_id: str, gui_title: str = ""):
super().__init__(uo_id) # logger
# simulation time vector
assert t[0] == 0, "Time vector must start with 0"
self._t = t
self._dt = t[-1] / (t.size - 1) # time step
# id and title
self.uo_id = uo_id
self.gui_title = gui_title
# hide unit operation from plots
self.gui_hidden = False
# adjustable parameter list
self.adj_par_list: _typing.Sequence[_adj_par.AdjustableParameter] = ()
"""Settings"""
# start-up phase (optional initial delay)
self.discard_inlet_until_t = -1
self.discard_inlet_until_min_c: _np.ndarray = _np.array([])
self.discard_inlet_until_min_c_rel: _np.ndarray = _np.array([])
self.discard_inlet_n_cycles = -1
# shout-down phase (optional early stop)
self.discard_outlet_until_t = -1
self.discard_outlet_until_min_c: _np.ndarray = _np.array([])
self.discard_outlet_until_min_c_rel: _np.ndarray = _np.array([])
self.discard_outlet_n_cycles = -1
# placeholders, populated during simulation
self._c: _np.ndarray = _np.array([]) # concentration profiles
self._f: _np.array = _np.array([]) # flow rate profile
self._n_species: int = 0 # number of species
def _assert_valid_species_list(self, species: _typing.Sequence[int]):
"""Species indexes start with 0.
List must be ordered in ascending order (to prevent bugs).
List must have unique values (again, to prevent bugs).
"""
if len(species) == 0:
self.log.w("Species list is empty")
else:
assert max(species) < self._n_species, \
"Index of species should be less than number of species"
assert min(species) >= 0, \
"Index of species should not be negative"
assert len(set(species)) == len(species), \
"Vector with species should not have duplicate values"
assert list(set(species)) == species, \
"Values in vector with species must be ascending"
def _is_flow_box_shaped(self) -> bool:
"""Constant profile with optional leading or trailing zeros."""
assert _np.all(self._f >= 0), "Flow rate is negative!!!"
if _np.all(self._f == 0):
self.log.w("Flow rate is 0!")
return False
max_flow_start, max_flow_end = \
_utils.vectors.true_start_and_end(self._f == self._f.max())
if _np.any(self._f[:max_flow_start] > 0):
return False
elif _np.any(self._f[max_flow_end:] > 0):
return False
elif _np.any(self._f[max_flow_start:max_flow_end] != self._f.max()):
return False
else:
return True
def _i_flow_on(self) -> _typing.Sequence[int]:
"""Detect when the flow rate switches from off to on.
In case of periodic flow rate, the function returns all
switching events.
Returns
-------
i_interval_start : Sequence[int]
Indexes of time points at which the flow rate turns on.
Each index corresponds to a leading non-zero value.
"""
if _np.all(self._f == 0):
self.log.w("Flow rate is 0!")
return []
assert _np.all(self._f[self._f != 0] == self._f.max()), \
"flow rate must have a constant 'on' value"
return list(_np.argwhere(_np.diff(self._f, prepend=0) > 0).flatten())
def _assert_periodic_flow(self) -> (_typing.Sequence[int], int, float):
"""Assert and provides info about periodic flow rate.
Only last period is allowed to be shorter than others.
Returns
-------
i_flow_start_list : Sequence[int]
Indexes of time-points at which the flow rate gets turned on
i_flow_on_duration : int
Number of time-points of flow 'on' interval.
t_cycle_duration
Duration of average cycle ('on' + 'off' interval)
"""
# Get all flow on positions.
i_flow_start_list = self._i_flow_on()
assert len(i_flow_start_list) > 0, "Flow is 0"
assert len(i_flow_start_list) > 1, \
"Periodic flow must have at least 2 cycles"
# Get average cycle duration.
t_cycle_duration = _np.mean(_np.diff(i_flow_start_list)) * self._dt
assert _np.all(_np.abs(
_np.diff(i_flow_start_list) - t_cycle_duration / self._dt
) <= 1), "Periodic flow rate must have fixed cycle duration"
# Assert periods.
i_flow_on_duration = \
_utils.vectors.true_start(self._f[i_flow_start_list[0]:] == 0)
on_mask = _np.zeros_like(self._f, dtype=bool)
for i in range(len(i_flow_start_list)):
# Get next stop.
if i + 1 == len(i_flow_start_list):
i_cycle_flow_on_duration = _utils.vectors.true_end(
self._f[i_flow_start_list[i]:] > 0
)
else:
i_cycle_flow_on_duration = _utils.vectors.true_start(
self._f[i_flow_start_list[i]:] == 0
)
i_flow_off = i_flow_start_list[i] + i_cycle_flow_on_duration
on_mask[i_flow_start_list[i]:i_flow_off] = True
if i + 1 == len(i_flow_start_list):
# allow last cycle to be clipped
assert i_cycle_flow_on_duration - i_flow_on_duration <= 1
else:
# allow to be 1 time step off the first cycle
assert abs(i_cycle_flow_on_duration - i_flow_on_duration) <= 1
# Flow can be either off or on at the constant value.
assert _np.all(self._f[on_mask] == self._f.max())
assert _np.all(self._f[~on_mask] == 0)
return i_flow_start_list, i_flow_on_duration, t_cycle_duration
def _estimate_steady_state_mean_f(self) -> (float, float):
"""Estimate mean flow rate in a period of periodic flow rate.
Uses `self._assert_periodic_flow()` to get data about flow rate.
Returns
-------
(float, float)
f_mean
Mean flow rate in one cycle.
t_cycle_duration
Duration of a cycle ('on' + 'off' interval).
"""
_, i_flow_on_d, t_cycle_duration = self._assert_periodic_flow()
if i_flow_on_d < 6:
self.log.w(
f"Flow is turned on for {i_flow_on_d} (< 6) time points")
f_mean = self._f.max() * i_flow_on_d * self._dt / t_cycle_duration
return f_mean, t_cycle_duration
def _estimate_steady_state_mean_c(
self,
species: _typing.Optional[_typing.Sequence[int]] = None
) -> _np.ndarray:
"""Estimate mean concentration after start-up phase.
In case of box shaped flow rate, the steady-state concentration
is estimated as an average concentration between the first and
the last crossing of 90 % of the max concentration of the sum
across relevant species.
In case of periodic flow rate, the steady-state concentration
is estimated as an average concentration during the cycle with
the largest average concentration.
If possible, define the model so that this function is not
needed; estimates can in general lead to unintuitive results.
Parameters
----------
species : list of int
List of indexes of relevant species. (indexes start with 0).
If not specified, all species are selected.
Returns
-------
(float, float)
f_mean
Mean flow rate in one cycle.
t_cycle_duration
Duration of a cycle ('on' + 'off' interval).
"""
if species is None:
species = range(self._n_species)
else:
self._assert_valid_species_list(species)
assert self._c.size > 0
if _np.all(self._c[species] == 0):
self.log.w("Concentration is zero for all relevant components")
load_c_ss = _np.zeros([len(species), 1])
self.log.i_data(self._log_tree, 'load_c_ss', load_c_ss)
return load_c_ss
if self._is_flow_box_shaped():
c = self._c[species][:, self._f > 0]
i_st, i_end = _utils.vectors.true_start_and_end(
c.sum(0) >= c.sum(0).max() * 0.9
)
self.log.w(f"Steady-state concentration is being estimated"
f" as an average concentration withing 90 %"
f" between the first and the last crossing of 90 %"
f" of the max concentration for sum across"
f" species: {species}")
load_c_ss = _np.mean(c[:, i_st:i_end], 1)[:, _np.newaxis]
self.log.i_data(self._log_tree, 'load_c_ss', load_c_ss)
return load_c_ss
else:
# Get info about periodic flow rate.
i_flow_on_start_list, i_flow_on_duration, t_cycle_duration = \
self._assert_periodic_flow()
self.log.w(f"Steady-state concentration is being estimated"
f" as an average concentration in the cycle"
f" with the largest average concentration"
f" summed across species: {species}")
m_max = 0
c_max = _np.zeros([len(species), 1])
for i, i_st in enumerate(i_flow_on_start_list):
if i_st + i_flow_on_duration >= self._t.size:
continue
_c_max = self._c[species, i_st:i_st + i_flow_on_duration] \
.mean(1)
if _c_max.sum() > m_max:
c_max = _c_max
m_max = _c_max.sum()
# Add dimension: (n_species,) -> (n_species, 1)
load_c_ss = c_max[:, _np.newaxis]
self.log.i_data(self._log_tree, 'load_c_ss', load_c_ss)
return load_c_ss
def _ensure_single_non_negative_parameter(self, log_level_multiple: int,
log_level_none: int, **kwargs):
"""Check if one or multiple parameters are non-negative.
Parameters
----------
log_level_multiple : int
Log level at which the function reports to `RtdLogger` in
case of multiple non-negative parameters.
log_level_none : int
Log level at which the function reports to `RtdLogger` in
case of no non-negative parameters.
"""
non_null_keys = [key for key in kwargs.keys() if kwargs.get(key) >= 0]
if len(non_null_keys) > 1:
self.log.log(
log_level_multiple,
f"Only one of the parameters: {non_null_keys}"
f" should be defined!")
elif len(non_null_keys) == 0:
self.log.log(
log_level_none,
f"One of the parameters: {kwargs.keys()} should be defined!")
def _cut_start_of_c_and_f(self, outlet: bool = False):
"""Trim beginning of flow rate and concentration profiles
Parameters
----------
outlet
Current profiles have already been processed by this
`UnitOperation`.
"""
if self._f.max() == 0:
self.log.w("Flow rate is 0!")
self._f *= 0
self._c *= 0
return
if outlet:
discard_until_t = self.discard_outlet_until_t
discard_n_cycles = self.discard_outlet_n_cycles
discard_until_min_c = self.discard_outlet_until_min_c
discard_until_min_c_rel = self.discard_outlet_until_min_c_rel
else:
discard_until_t = self.discard_inlet_until_t
discard_n_cycles = self.discard_inlet_n_cycles
discard_until_min_c = self.discard_inlet_until_min_c
discard_until_min_c_rel = self.discard_inlet_until_min_c_rel
def get_cycle_start(_i, _i_start_list, _i_duration):
for _i_start in _i_start_list:
if _i <= _i_start + _i_duration:
return _i_start
i_init_cut = -1
_aux_has_min_c = discard_until_min_c.size > 0 \
and discard_until_min_c.max() > 0
_aux_has_min_c_rel = discard_until_min_c_rel.size > 0 \
and discard_until_min_c_rel.max() > 0
if _aux_has_min_c:
assert discard_until_min_c.size == self._n_species
discard_until_min_c = \
discard_until_min_c.reshape(self._n_species, 1)
if _aux_has_min_c_rel:
assert discard_until_min_c_rel.size == self._n_species
discard_until_min_c_rel = \
discard_until_min_c_rel.reshape(self._n_species, 1)
self._ensure_single_non_negative_parameter(
self.log.ERROR, 0,
discard_until_t=discard_until_t,
discard_until_min_c=1 if _aux_has_min_c else -1,
discard_until_min_c_rel=1 if _aux_has_min_c_rel else -1,
discard_n_cycles=discard_n_cycles,
)
if discard_until_t > 0:
if self._is_flow_box_shaped():
i_init_cut = int(discard_until_t / self._dt)
else:
i_start_list, i_duration, _ = self._assert_periodic_flow()
i_init_cut = get_cycle_start(int(discard_until_t / self._dt),
i_start_list, i_duration)
elif _aux_has_min_c or _aux_has_min_c_rel:
if _aux_has_min_c:
lim = _np.all(self._c >= discard_until_min_c, 0) \
* (self._f > 0)
else:
c_lim = discard_until_min_c_rel \
* self._c.max(1)[:, _np.newaxis]
lim = _np.all(self._c >= c_lim, 0) * (self._f > 0)
if _np.any(lim):
if self._is_flow_box_shaped():
i_init_cut = _utils.vectors.true_start(lim)
else:
i_start_list, i_duration, _ = self._assert_periodic_flow()
i_init_cut = get_cycle_start(
_utils.vectors.true_start(lim),
i_start_list, i_duration)
else:
if _aux_has_min_c:
c = discard_until_min_c
else:
c = discard_until_min_c_rel
self.log.w(f"Threshold concentration profile"
f" {c} was never reached!")
self._f[:] = 0
self._c[:] = 0
return
elif discard_n_cycles > 0:
i_flow_on_start_list, _, _ = self._assert_periodic_flow()
if len(i_flow_on_start_list) > discard_n_cycles:
i_init_cut = i_flow_on_start_list[discard_n_cycles]
else:
self.log.w(f"All the cycles {len(i_flow_on_start_list)}"
f" are removed!")
self._f[:] = 0
self._c[:] = 0
return
if i_init_cut > 0:
self._f[:i_init_cut] = 0
self._c[:, :i_init_cut] = 0
[docs] def evaluate(self, f_in: _np.array, c_in: _np.ndarray
) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Evaluate the propagation through the unit operation.
Parameters
----------
c_in
Inlet concentration profile with shape
(n_species, n_time_steps).
f_in
Inlet flow rate profile with shape (n_time_steps,).
Returns
-------
f_out
Outlet flow rate profile.
c_out
Outlet concentration profile.
"""
# Make a copy of inlet profiles, so we don't modify the originals.
self._f = f_in.copy()
self._c = c_in.copy()
# Clear log.
self._log_tree.clear()
# Make sure concentration is not negative.
if _np.any(self._c < 0):
raise AssertionError(
f"Negative concentration at unit operation: {self.uo_id}")
# Make sure flow is not negative.
if _np.any(self._f < 0):
raise AssertionError(
f"Negative flow at unit operation: {self.uo_id}")
# Make sure flow is still there.
if self._f.max() == 0:
self.log.w(f"No inlet flow at unit operation: {self.uo_id}")
return self._f * 0, self._c * 0
# Calculate number of tracking species.
self._n_species = self._c.shape[0]
# Cut off start if required.
self._cut_start_of_c_and_f(outlet=False)
# (Re-)calculate the outlet profiles.
self._calculate()
# Cutoff start of outlet if needed.
self._cut_start_of_c_and_f(outlet=True)
return self._f, self._c
@_abstractmethod
def _calculate(self): # pragma: no cover
"""Re-calculates the c_out and f_out from c_in and f_in.
Function should use and modify `self._f` and `self._c`.
"""
pass
[docs] def get_result(self) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Returns existing flow rate and concentration profiles.
Returns
-------
f_out
Outlet flow rate profile.
c_out
Outlet concentration profile.
"""
return self._f, self._c
[docs]class ParameterSetList(_ABC):
"""Abstract class for asserting keys in key-value pairs.
Key-value pairs passed to `assert_and_get_provided_kv_pairs` should
contain all keys from (at least) one of the key groups in
`_possible_key_groups`. The method returns key-value pars with keys
from that group and all passed keys that can be also found in
`_optional_keys`.
Examples
--------
>>> class DummyClass(ParameterSetList):
... _possible_key_groups = [['par_1'], ['par_2a', 'par_2b']]
... _optional_keys = ['key_plus_1', 'key_plus_2']
>>>
>>> dc = DummyClass()
>>> dc.assert_and_get_provided_kv_pairs(par_1=1, par_2a=2)
{'par_1': 1}
>>> dc.assert_and_get_provided_kv_pairs(par_2a=1, par_2b=2,
... key_plus_1=3, key_plus_9=2)
{'par_2a': 1, 'par_2b': 2, 'key_plus_1': 3}
>>> dc.assert_and_get_provided_kv_pairs(
... key_plus_1=1) # doctest: +ELLIPSIS
Traceback (most recent call last):
KeyError: "Keys ... do not contain any of the required groups: ...
"""
@property
@_abstractmethod
def _possible_key_groups(self) \
-> _typing.Sequence[_typing.Sequence[str]]: # pragma: no cover
"""Possible key combinations.
To override it in a new class, simply define an attribute, e.g.:
_possible_key_groups = [['v_void'], ['f', 'rt_mean']]
"""
raise NotImplementedError
@property
@_abstractmethod
def _optional_keys(self) -> _typing.Sequence[str]: # pragma: no cover
"""Optional additional keys.
To override it in a new class, simply define an attribute, e.g.:
_optional_keys = ['skew', 't_delay']
"""
raise NotImplementedError
[docs] def assert_and_get_provided_kv_pairs(self, **kwargs) -> dict:
"""
Parameters
----------
kwargs
Inputs to `calc_pdf(**kwargs)` function
Returns
-------
dict
Filtered `kwargs` so the keys contain first possible key
group in `_possible_key_groups` and any number of optional
keys from `_optional_keys`.
Raises
------
ValueError
If `**kwargs` do not contain keys from any of the groups
in `_possible_key_groups`.
"""
for group in self._possible_key_groups:
if any([key not in kwargs.keys() for key in group]):
continue
else:
# Get keys from groups.
d = {key: kwargs.get(key) for key in group}
# Get optional keys.
d_extra = {key: kwargs.get(key)
for key in self._optional_keys
if key in kwargs.keys()}
# Combine and return.
return {**d, **d_extra}
raise KeyError(f"Keys {list(kwargs.keys())} do not contain any of"
f" the required groups: {self._possible_key_groups}")
[docs]class PDF(ParameterSetList, DefaultLoggerLogic, _ABC):
"""Abstract class for defining probability distribution functions.
Parameters
----------
t
Simulation time vector.
pdf_id
Unique identifier of the PDF instance.
Attributes
----------
trim_and_normalize
Trim edges of the peak by the threshold at the relative signal
specified by `cutoff_relative_to_max`. Default = True.
cutoff_relative_to_max
Cutoff as a share of max value of the pdf (default 0.0001).
It is defined to avoid very long tails of the distribution.
Methods
-------
get_p()
Get calculated PDF.
update_pdf(**kwargs)
Re-calculate PDF based on specified parameters.
Abstract Methods
----------------
_calc_pdf(kw_pars: dict)
Calculate new pdf for a given set of parameters. The keys of the
`kw_pars` include keys from one of the group in
`_possible_key_groups` and any optional subset of keys from
`_optional_keys`.
"""
def __init__(self, t: _np.ndarray, pdf_id: str = ""):
super().__init__(pdf_id)
assert t[0] == 0
assert t[-1] > 0
self._dt = t[-1] / (t.size - 1)
self._t_steps_max = t.size
# apply cutoff
self.trim_and_normalize = True
self.cutoff_relative_to_max = 0.0001
# placeholder for the result of the pdf calculation
self._pdf: _np.array = _np.array([])
def _apply_cutoff_and_normalize(self):
# Get cutoff positions.
i_start, i_end = _utils.vectors.true_start_and_end(
self._pdf >= self.cutoff_relative_to_max * self._pdf.max()
)
# 0 at front.
self._pdf[:i_start] = 0
# Cut at end.
self._pdf = self._pdf[:i_end]
# Normalize.
self._pdf = self._pdf / self._pdf[:i_end].sum() / self._dt
[docs] def update_pdf(self, **kwargs):
"""Re-calculate PDF based on specified parameters.
Parameters
----------
kwargs
Should contain keys from one of the group in
`self._possible_key_groups`.
It may contain additional keys from `self._optional_keys`.
"""
kw = self.assert_and_get_provided_kv_pairs(**kwargs)
self._pdf = self._calc_pdf(kw)
if self.trim_and_normalize:
self._apply_cutoff_and_normalize()
[docs] @_abstractmethod
def _calc_pdf(self, kw_pars: dict) -> _np.ndarray: # pragma: no cover
"""Calculation of probability distribution.
Parameters
----------
kw_pars: dict
Key-value parameters.
Keys contain keys from one of the groups specified in
`self._possible_key_groups`.
Keys may contain additional keys from `self._optional_keys`.
"""
raise NotImplementedError
[docs] def get_p(self) -> _np.ndarray:
"""Get probability distribution.
Returns
-------
p: np.ndarray
Evaluated probability distribution function.
`sum(p * self._dt) == 1`
Corresponding time axis starts with 0 and has a fixed step
of self._dt.
"""
assert self._pdf.size > 0, f"PDF is empty. Make sure `update_pdf`" \
f" was called before `get_p`."
if self._pdf.size <= 5:
self.log.e("PDF should have a length of at least 5 time steps.")
return self._pdf
[docs]class ChromatographyLoadBreakthrough(ParameterSetList,
DefaultLoggerLogic, _ABC):
"""What parts of the load bind to the column.
Parameters
----------
bt_profile_id
Unique identifier of the PDF instance. Used for logs.
Notes
-----
See docstring of `ParameterSetList` for information about key
groups.
"""
def __init__(self, dt: float,
bt_profile_id: str = "ChromatographyLoadBreakthrough"):
super().__init__(bt_profile_id)
assert dt > 0
self._dt = dt
[docs] def update_btc_parameters(self, **kwargs) -> None:
"""Update binding dynamics for a given set of parameters."""
kw = self.assert_and_get_provided_kv_pairs(**kwargs)
self._update_btc_parameters(kw)
[docs] def calc_c_bound(self,
f_load: _np.ndarray,
c_load: _np.ndarray) -> _np.ndarray:
"""Calculates what parts of load bin to the column.
Parameters
---------
f_load
Load flow rate profile.
c_load
Load concentration profile. Concentration profile should
include only species which bind to the column.
Returns
-------
c_bound
Parts of the load that binds to the column during the
load step. `c_bound` has the same shape as `c_load`.
Notes
-----
This is default implementation. The user is welcome to override
this function in a custom child class.
"""
# All species are summed together.
m_cum_sum = _np.cumsum(
(c_load * f_load[_np.newaxis, :]).sum(0)) * self._dt
if m_cum_sum[-1] == 0: # empty concentration vector
return _np.zeros_like(c_load)
unbound_to_load_ratio = \
self._calc_unbound_to_load_ratio(m_cum_sum.flatten())
c_bound = c_load * (1 - unbound_to_load_ratio[_np.newaxis, :])
return c_bound
@_abstractmethod
def _update_btc_parameters(self,
kw_pars: dict) -> None: # pragma: no cover
"""Update binding dynamics for a given key-value set.
Parameters
----------
kw_pars: dict
Key-value parameters.
Keys contain keys from one of the groups specified in
`self._possible_key_groups` and may contain additional keys
from `self._optional_keys`.
"""
pass
@_abstractmethod
def _calc_unbound_to_load_ratio(self, loaded_material: _np.ndarray
) -> _np.ndarray: # pragma: no cover
"""Calculates what share of the load binds to the column.
Typical implementation would be just direct evaluation of the
breakthrough curve. See the `ConstantPatternSolution` subclass.
Parameters
----------
loaded_material
Cumulative sum of the amount of loaded material.
Returns
-------
bound_to_unbound_ratio
Ratio between the amount of captured material and the
amount of load material. `bound_to_unbound_ratio` has
the same size as `loaded_material`.
"""
pass
[docs] @_abstractmethod
def get_total_bc(self) -> float: # pragma: no cover
"""Total binding capacity
Useful for determining column utilization.
"""
pass
[docs]class RtdModel(DefaultLoggerLogic, _ABC):
"""Combines inlet and a train of unit operations into a model.
The logger assigned to the instance of RtdModel is propagated
throughout `inlet` and unit operations in `dsp_uo_chain`.
Parameters
----------
inlet
Inlet profile.
dsp_uo_chain
Sequence of unit operations. The sequence needs to be in order.
logger
Logger that the model uses for sending status messages and
storing intermediate data.
title
Title of the model.
desc
Description of the model.
Methods
-------
recalculate(start_at, on_update_callback)
Recalculates the process fluid propagation, starting at
`start_at` unit operation (-1 for inlet and entire process).
Callback function can be specified. It receives the index of
the just updated unit operation.
"""
def __init__(self,
inlet: Inlet,
dsp_uo_chain: _typing.Sequence[UnitOperation],
logger: _typing.Optional[_logger.RtdLogger] = None,
title: str = "RtdModel",
desc: str = ""):
super().__init__(title)
# Ensure unique `uo_id`s.
ids = [uo.uo_id for uo in dsp_uo_chain]
assert len(ids) == len(set(ids)), \
"Each unit operation must have a unique id (`uo_id`)"
# Bind parameters.
self.inlet = inlet
self.dsp_uo_chain = dsp_uo_chain
self.title = title
self.desc = desc
# Init data log tree with empty dict for each unit operation.
self._log_tree = _OrderedDict(
{inlet.uo_id: {}, **{uo.uo_id: {} for uo in dsp_uo_chain}})
self.log = logger if logger is not None else self.log
@DefaultLoggerLogic.log.setter
def log(self, logger: _logger.RtdLogger):
self._logger = logger
self._logger.set_data_tree(self._instance_id, self._log_tree)
# Pass logger to inlet and unit operations.
self.inlet.log = logger
for uo in self.dsp_uo_chain:
uo.log = logger
[docs] def get_dsp_uo(self, uo_id: str) -> UnitOperation:
"""Get reference to a `UnitOperation` with specified `uo_id`."""
for uo in self.dsp_uo_chain:
if uo.uo_id == uo_id:
return uo
raise KeyError(f"Unit operation `{uo_id}` not found.\n"
f"Available: {[uo.uo_id for uo in self.dsp_uo_chain]}")
[docs] def recalculate(self,
start_at: int = 0,
on_update_callback: _typing.Optional[
_typing.Callable[[int], None]
] = None):
"""Recalculate process fluid propagation.
Parameters
----------
start_at
The index of first unit operation that needs to be
re-evaluated. Default = 0.
If -1, then the inlet profile is also re-evaluated.
on_update_callback
Optional callback function which receives an integer.
The integer corresponds to the index of re-evaluated unit
operation (-1 for inlet).
This can serve as a trigger for updating UI after
re-evaluation of individual unit operations.
"""
# Evaluate inlet profile.
if start_at == -1:
self.inlet._refresh()
self._notify_updated(-1, on_update_callback)
start_at = 0
# Get outlet of previous unit operation.
if start_at == 0:
f, c = self.inlet.get_result()
else:
f, c = self.dsp_uo_chain[start_at - 1].get_result()
# Evaluate subsequent unit operations.
for i in range(start_at, len(self.dsp_uo_chain)):
f, c = self.dsp_uo_chain[i].evaluate(f, c)
self._notify_updated(i, on_update_callback)
def _notify_updated(self, uo_i: int, on_update_callback):
# Show INFO log.
if uo_i == -1:
uo = self.inlet
self.log.i("Inlet profile updated")
else:
uo = self.dsp_uo_chain[uo_i]
self.log.i(f"Unit operation `{uo.uo_id}` updated")
# Store profiles in DEBUG data log.
if self.log.log_level <= self.log.DEBUG:
f, c = uo.get_result()
self.log.d_data(self._log_tree[uo.uo_id], "f", f)
self.log.d_data(self._log_tree[uo.uo_id], "c", c)
# Call callback function if specified.
if on_update_callback:
on_update_callback(uo_i)
[docs]class UserInterface(_ABC):
"""Wrapper around RtdModel suitable for building GUI on top of it.
Parameters
----------
rtd_model : RtdModel
RTD model.
Attributes
----------
species_label : list of str
Labels of the species in concentration array.
x_label
Label of x axis (time). Default = 't'
y_label_c
Label of y axis (concentration). Default = 'c'
y_label_f
Label of y axis (flow rate). Default = 'f'
start_at : int
The index of unit operation (starting with 0) at which the
re-evaluation starts. The value of -1 means that the inlet
profile is also reevaluated.
"""
def __init__(self, rtd_model: RtdModel):
self.rtd_model = rtd_model
# default values - update them after init
self.start_at = -1
self.species_label = self.rtd_model.inlet.species_list
self.x_label = 't'
self.y_label_c = 'c'
self.y_label_f = 'f'
[docs] def recalculate(self, forced=False):
"""Re-evaluates the model from the `start_at` index onwards.
Parameters
----------
forced
If true, the entire model (inlet + unit operations) is
re-evaluated. The same can be achieved by setting
`self.start_at` to -1.
"""
start_at = -1 if forced else self.start_at
def callback_fun(i):
if i == -1:
f, c = self.rtd_model.inlet.get_result()
else:
f, c = self.rtd_model.dsp_uo_chain[i].get_result()
self._update_ui_for_uo(i, f, c)
# Re-calculate the model.
self.rtd_model.recalculate(start_at, callback_fun)
[docs] @_abstractmethod
def build_ui(self): # pragma: no cover
"""Build the UI from scratch."""
raise NotImplementedError
@_abstractmethod
def _update_ui_for_uo(self, uo_i, f, c): # pragma: no cover
"""Update the UI for specific unit operation.
Parameters
----------
uo_i: int
Index of unit operation (-1 for inlet profile).
Indexes for DSP unit operation train start with 0.
c: _np.ndarray
Outlet concentration profile.
f: _np.ndarray
Outlet flow profile.
"""
raise NotImplementedError