Source code for bio_rtd.uo.sc_uo

__all__ = ['AlternatingChromatography', 'ACC', 'PCC', 'PCCWithWashDesorption']
__version__ = '0.2'
__author__ = 'Jure Sencar'

import typing as _typing
import numpy as _np
import scipy.interpolate as _interp

import bio_rtd.utils as _utils
import bio_rtd.core as _core
import bio_rtd.pdf as _pdf

[docs]class AlternatingChromatography(_core.UnitOperation): def __init__(self, t: _np.ndarray, uo_id: str, load_bt: _core.ChromatographyLoadBreakthrough, peak_shape_pdf: _core.PDF, gui_title: str = "ACC"): super().__init__(t, uo_id, gui_title) # get load breakthrough profile self.load_bt = load_bt # get elution peak pdf self.elution_peak_shape = peak_shape_pdf # process buffer species that are not binding to the column self.non_binding_species = [] # column volume float = -1 # column volume """Column volume""" self.column_porosity_retentate = -1 # column porosity for product """Column rtm""" self.ft_mean_retentate = -1 # flow-through time for product (cv = f * ft_mean_retentate / porosity_retentate) # duration of equilibration step self.equilibration_cv = -1 self.equilibration_t = -1 # equilibration step flow rate self.equilibration_f = -1 self.equilibration_f_rel = 1 # equilibration flow rate relative to the load flow rate # duration of the load phase self.load_cv = -1 # load duration in CV self.load_c_end_ss: _typing.Optional[_np.ndarray] = None # breakthrough concentration upper limit (CV << load) self.load_c_end_relative_ss = -1 # relative breakthrough concentration upper limit (CV << load) self.load_c_end_estimate_with_iterative_solver = False # finer optimization of cycle length determination self.load_c_end_estimate_with_iterative_solver_max_iter = 1000 # extend first load phase (to achieve a faster steady-state) self.load_extend_first_cycle = False self.load_extend_first_cycle_cv = -1 self.load_extend_first_cycle_t = -1 # target load linear velocity (in order to estimate column length / diameter ratio) self.load_target_lin_velocity = -1 # units need to match other units chosen for the model # duration of the wash step self.wash_cv = -1 self.wash_t = -1 # wash step flow rate self.wash_f = -1 self.wash_f_rel = 1 # wash flow rate relative to the load flow rate # Elution peak is scaled down by 1 - `unaccounted_losses_rel`. # Peak cut criteria is applied after the scale down. self.unaccounted_losses_rel = 0 # duration of elution step self.elution_cv = -1 self.elution_t = -1 # elution flow rate self.elution_f = -1 self.elution_f_rel = 1 # elution flow rate relative to the load flow rate self.elution_buffer_c: _np.ndarray = _np.array([]) # peak position within elution step (1st moment, not max) self.elution_peak_position_cv = -1 self.elution_peak_position_t = -1 # elution peak cut # start (one of them needs to be defined) self.elution_peak_cut_start_t = -1 self.elution_peak_cut_start_cv = -1 self.elution_peak_cut_start_c_rel_to_peak_max = -1 self.elution_peak_cut_start_peak_area_share = -1 # end (one of them needs to be defined) self.elution_peak_cut_end_t = -1 self.elution_peak_cut_end_cv = -1 self.elution_peak_cut_end_c_rel_to_peak_max = -1 self.elution_peak_cut_end_peak_area_share = -1 # duration of regeneration step self.regeneration_cv = -1 self.regeneration_t = -1 # regeneration step flow rate self.regeneration_f = -1 self.regeneration_f_rel = 1 # wash flow rate relative to the load flow rate # option to simulate desorption during wash step self.wash_desorption = False # For PCC # option to capture load breakthrough onto next column self.load_recycle = False # propagation dynamics of unbound material through the column self.load_recycle_pdf: _typing.Optional[_core.PDF] = None # option to capture breakthrough material during wash step onto third column (2nd is on load step) self.wash_recycle = False # duration of wash recycling self.wash_recycle_duration_cv = -1 self.wash_recycle_duration_t = -1 @_core.UnitOperation.log.setter def log(self, logger: _core._logger.RtdLogger): self._logger = logger # propagate logger across other elements with logging self._logger.set_data_tree(self._instance_id, self._log_tree) if self.load_recycle_pdf is not None: self.load_recycle_pdf.set_logger_from_parent(self.uo_id, logger) if self.load_recycle_pdf is not None: self.elution_peak_shape.set_logger_from_parent(self.uo_id, logger) if self.load_recycle_pdf is not None: self.load_bt.set_logger_from_parent(self.uo_id, logger) def _get_flow_value(self, step_name: str, var_name: str, flow: float, rel_flow: float) -> float: if flow > 0: self.log.i_data(self._log_tree, var_name, flow) elif rel_flow > 0: flow = rel_flow * self._load_f self.log.i_data(self._log_tree, var_name, flow) else: self.log.w(step_name + " step flow rate is not defined, using load flow rate instead") flow = self._load_f return flow def _get_time_value(self, step_name: str, var_name: str, t: float, cv: float, flow: float) -> float: t_sum = max(t, 0) if cv > 0: assert flow > 0, step_name + ": Flow rate must be defined ( > 0 ) if the duration was specified in CVs." assert self._cv > 0, "CV must be determined (by `calc_cv`) before calculating duration based on CVs" t_sum += cv * self._cv / flow # sum if t <= 0 and cv <= 0: # log self.log.w(step_name + " time is not defined") else: self.log.i_data(self._log_tree, var_name, t_sum) return t_sum # return def _assert_non_binding_species(self): # make sure binding species list is valid if len(self.non_binding_species) > 0: assert max(self.non_binding_species) < self._n_species, \ "Index of non_binding_species too large (indexes start with 0)" assert list(set(self.non_binding_species)) \ == list(self.non_binding_species), \ "List of non_binding_species should have ascending order" assert len(self.non_binding_species) < self._n_species, \ "All species cannot be non-binding." self.log.i_data(self._log_tree, 'non_binding_species', self.non_binding_species) def _calc_load_f(self): assert self._is_flow_box_shaped(), "Inlet flow must be box shaped" self._load_f = self._f.max() self.log.d_data(self._log_tree, 'load_f', self._load_f) def _calc_cv(self): self._ensure_single_non_negative_parameter( self.log.ERROR, self.log.ERROR,, ft_mean_retentate=self.ft_mean_retentate, ) if > 0: self._cv = else: # self.ft_mean_retentate > 0: assert self.column_porosity_retentate > 0, \ "porosity_retentate must be defined to calc CV from " \ "ft_mean_retentate" assert self._load_f > 0, "load flow rate must be defined to calc CV from ft_mean_retentate" self._cv = self.ft_mean_retentate * self._load_f / self.column_porosity_retentate self.log.i_data(self._log_tree, 'cv', self._cv) def _report_column_dimensions(self): # report column dimensions based on desired load linear velocity if self.load_target_lin_velocity > 0: self._col_h = self._cv * self.load_target_lin_velocity / self._load_f self.log.i_data(self._log_tree, "column_h", self._col_h) self.log.i_data(self._log_tree, "column_d", (self._cv / self._col_h / _np.pi) ** 0.5 * 2) def _calc_equilibration_t(self): if self.equilibration_cv > 0: # flow rate eq_f = self._get_flow_value( "Equilibration", "equilibration_f", self.equilibration_f, self.equilibration_f_rel ) # duration self._equilibration_t = self._get_time_value( "Equilibration", "equilibration_t", self.equilibration_t, self.equilibration_cv, eq_f ) else: self._equilibration_t = max(self.equilibration_t, 0) self.log.i_data(self._log_tree, 'equilibration_t', self._equilibration_t) def _calc_wash_t_and_f(self): # flow rate self._wash_f = self._get_flow_value("Wash", "wash_f", self.wash_f, self.wash_f_rel) # duration self._wash_t = self._get_time_value("Wash", "wash_t", self.wash_t, self.wash_cv, self._wash_f) def _calc_elution_t_and_f(self): # flow rate self._elution_f = self._get_flow_value("Elution", "elution_f", self.elution_f, self.elution_f_rel) # duration self._elution_t = self._get_time_value("Elution", "elution_t", self.elution_t, self.elution_cv, self._elution_f) def _calc_elution_peak_t(self): # elution peak mean position (1st momentum) self._elution_peak_t = self._get_time_value( "elution peak position", "elution_peak_position_t", self.elution_peak_position_t, self.elution_peak_position_cv, self._elution_f ) def _update_elution_peak_pdf(self): assert self._elution_peak_t > 0 assert self._elution_f > 0 # calc elution peak shape self.elution_peak_shape.update_pdf( rt_mean=self._elution_peak_t, v_void=self._elution_peak_t * self._elution_f, f=self._elution_f ) self._p_elution_peak = self.elution_peak_shape.get_p() \ * (1 - self.unaccounted_losses_rel) self.log.d_data(self._log_tree, "p_elution_peak", self._p_elution_peak) def _calc_elution_peak_cut_i_start_and_i_end(self): elution_peak_pdf: _np.ndarray = self._p_elution_peak.copy() # start peak cut self._ensure_single_non_negative_parameter( log_level_multiple=self.log.ERROR, log_level_none=self.log.WARNING, elution_peak_cut_start_peak_area_share=self.elution_peak_cut_start_peak_area_share, elution_peak_cut_start_c_rel_to_peak_max=self.elution_peak_cut_start_c_rel_to_peak_max, elution_peak_cut_start_cv=self.elution_peak_cut_start_cv, elution_peak_cut_start_t=self.elution_peak_cut_start_t ) # calc elution_peak_cut_start_i if self.elution_peak_cut_start_peak_area_share >= 0: elution_peak_cut_start_i = _utils.vectors.true_start( _np.cumsum(elution_peak_pdf * self._dt) >= self.elution_peak_cut_start_peak_area_share ) elif self.elution_peak_cut_start_c_rel_to_peak_max >= 0: elution_peak_cut_start_i = _utils.vectors.true_start( elution_peak_pdf >= self.elution_peak_cut_start_c_rel_to_peak_max * elution_peak_pdf.max() ) elif self.elution_peak_cut_start_cv >= 0: elution_peak_cut_start_i = int(self.elution_peak_cut_start_cv * self._cv / self._elution_f / self._dt) elif self.elution_peak_cut_start_t >= 0: elution_peak_cut_start_i = int(self.elution_peak_cut_start_t / self._dt) else: self.log.w("Elution peak cut start is not defined. Now collecting from the beginning of the elution phase") elution_peak_cut_start_i = 0 # log results self.log.i_data(self._log_tree, "elution_peak_cut_start_i", elution_peak_cut_start_i) self.log.i_data(self._log_tree, "elution_peak_cut_start_t", elution_peak_cut_start_i * self._dt) # end peak cut self._ensure_single_non_negative_parameter( log_level_multiple=self.log.ERROR, log_level_none=self.log.WARNING, elution_peak_cut_end_peak_area_share=self.elution_peak_cut_end_peak_area_share, elution_peak_cut_end_c_rel_to_peak_max=self.elution_peak_cut_end_c_rel_to_peak_max, elution_peak_cut_end_cv=self.elution_peak_cut_end_cv, elution_peak_cut_end_t=self.elution_peak_cut_end_t, ) # calc elution_peak_cut_end_i if self.elution_peak_cut_end_peak_area_share >= 0: elution_peak_cut_end_i = _utils.vectors.true_start( _np.cumsum(elution_peak_pdf * self._dt) >= (1 - self.elution_peak_cut_end_peak_area_share) ) elif self.elution_peak_cut_end_c_rel_to_peak_max >= 0: elution_peak_cut_end_i = _utils.vectors.true_end( elution_peak_pdf >= self.elution_peak_cut_end_c_rel_to_peak_max * elution_peak_pdf.max() ) elif self.elution_peak_cut_end_cv >= 0: elution_peak_cut_end_i = int(self.elution_peak_cut_end_cv * self._cv / self._elution_f / self._dt) elif self.elution_peak_cut_end_t >= 0: elution_peak_cut_end_i = _utils.vectors.true_end(self._t < self.elution_peak_cut_end_t) else: self.log.w("Elution peak cut end is not defined. Now collecting to the end of the elution phase") elution_peak_cut_end_i = elution_peak_pdf.size # log results self.log.i_data(self._log_tree, "elution_peak_cut_end_i", elution_peak_cut_end_i) self.log.i_data(self._log_tree, "elution_peak_cut_end_t", elution_peak_cut_end_i * self._dt) self._elution_peak_cut_start_i = elution_peak_cut_start_i self._elution_peak_cut_end_i = elution_peak_cut_end_i if self._elution_peak_cut_end_i * self._dt < self._elution_peak_t: self.log.w("Peak end is cut before its maximum") if self._elution_peak_cut_end_i * self._dt > self._elution_t: self.log.w("Peak cut end exceeds elution step duration") return elution_peak_cut_start_i, elution_peak_cut_end_i def _calc_elution_peak_mask(self): self._elution_peak_mask = _np.ones(int(round(self._elution_t / self._dt)), dtype=bool) self._elution_peak_mask[self._elution_peak_cut_end_i:] = False self._elution_peak_mask[:self._elution_peak_cut_start_i] = False self.log.d_data(self._log_tree, "elution_peak_interval", self._elution_peak_mask) def _update_load_btc(self): assert self._cv > 0, "CV must be defined by now" self.load_bt.update_btc_parameters(cv=self._cv) def _calc_regeneration_t(self): if self.regeneration_cv > 0: # flow rate eq_f = self._get_flow_value( "Regeneration", "regeneration_f", self.regeneration_f, self.regeneration_f_rel ) # duration self._regeneration_t = self._get_time_value( "Regeneration", "regeneration_t", self.regeneration_t, self.regeneration_cv, eq_f ) else: self._regeneration_t = max(self.regeneration_t, 0) self.log.i_data(self._log_tree, 'regeneration_t', self._regeneration_t) def _update_load_recycle_pdf(self, flow): """ Updates pdf that describes propagation dynamics of unbound and desorbed material throughout the column. """ assert self.load_recycle_pdf is not None, "load_recycle_pdf must be defined" assert self.column_porosity_retentate > 0, "Retentate porosity must be defined" assert self._cv > 0, "CV must be defined by now" v_void = self._cv * self.column_porosity_retentate self.load_recycle_pdf.update_pdf(v_void=v_void, f=flow, rt_mean=v_void / flow) self._p_load_recycle_pdf = self.load_recycle_pdf.get_p() def _calc_load_recycle_wash_i(self): if self.wash_recycle_duration_t > 0 \ or self.wash_recycle_duration_cv > 0: self._wash_recycle_i_duration = int(self._get_time_value( "Wash recycle", "load_wash_recycle_t", self.wash_recycle_duration_t, self.wash_recycle_duration_cv, self._wash_f ) / self._dt) else: # same as wash duration assert self._wash_t > 0 self._wash_recycle_i_duration = int(round(self._wash_t / self._dt)) def _get_load_bt_cycle_switch_limit(self, load_c_ss: _np.ndarray) -> _np.ndarray: assert self.load_c_end_ss is not None or self.load_c_end_relative_ss > 0, \ "Load step duration should be defined!" # calc if self.load_c_end_ss is not None: load_c_end_ss = self.load_c_end_ss if self.load_c_end_relative_ss > 0: self.log.w("Cycle time defined by `load_c_end_ss` and `load_c_end_relative_ss`." "Simulation is using `load_c_end_ss`.") else: # self.load_c_end_relative_ss > 0 load_c_end_ss = self.load_c_end_relative_ss * load_c_ss # add to log self.log.i_data(self._log_tree, 'load_c_end_ss', load_c_end_ss) return load_c_end_ss # noinspection DuplicatedCode def _calc_cycle_t(self): """ Calculates cycle time. Optional delay of first cycle is not part of this function """ assert self._cv > 0 assert self._load_f > 0 if self.load_cv > 0: t_cycle = self.load_cv * self._cv / self._load_f if self.load_c_end_ss is not None or self.load_c_end_relative_ss > 0: self.log.w("Cycle time defined in more than one way. Simulation is using `load_cv`.") else: # ## get bt profile for constant inlet ## # inlet c binding_species = [i for i in range(self._n_species) if i not in self.non_binding_species] load_c_ss = self._estimate_steady_state_mean_c(binding_species) # simulate first cycle at constant load concentration f_first_load = self._load_f * _np.ones(self._t.size) c_first_load = load_c_ss * _np.ones([len(binding_species), self._t.size]) bt_first_load: _np.ndarray = load_c_ss - self.load_bt.calc_c_bound(f_first_load, c_first_load) # propagate breakthrough bt_first_load_out, bt_first_wash_out = self._sim_c_recycle_propagation(f_first_load, bt_first_load, None) # calc cycle duration load_c_end_ss = self._get_load_bt_cycle_switch_limit(load_c_ss) # noinspection PyTypeChecker i_t_first_cycle = _utils.vectors.true_start(bt_first_load_out.sum(0) >= load_c_end_ss.sum()) t_cycle = i_t_first_cycle * self._dt # wash desorption if self.wash_desorption and self.wash_recycle: c_wash_desorbed = self._sim_c_wash_desorption( f_first_load[:i_t_first_cycle], c_first_load[:, :i_t_first_cycle] - bt_first_load[:, :i_t_first_cycle] ) else: c_wash_desorbed = None bt_first_load_out, bt_first_wash_out = self._sim_c_recycle_propagation( f_first_load[:i_t_first_cycle], bt_first_load[:, :i_t_first_cycle], c_wash_desorbed ) if self.load_recycle: if not self.load_c_end_estimate_with_iterative_solver: self.log.w("Estimating cycle duration: Assuming sharp breakthrough profile") i_load_recycle_start = self._wash_recycle_i_duration if self.wash_recycle else 0 m_load_recycle = \ bt_first_load_out[:, i_load_recycle_start:i_t_first_cycle].sum() * self._load_f * self._dt _t_diff = m_load_recycle / self._load_f / load_c_ss.sum() t_cycle -= _t_diff self._load_recycle_m_ss = m_load_recycle self.log.i_data(self._log_tree, 'm_load_recycle_ss', m_load_recycle) self.log.i_data(self._log_tree, 'shorten_cycle_t_due_to_bt_recycle', _t_diff) if self.wash_recycle: if not self.load_c_end_estimate_with_iterative_solver: self.log.w("Estimating cycle duration: Assuming sharp breakthrough profile") m_wash_recycle = bt_first_wash_out[:, :self._wash_recycle_i_duration].sum() * self._wash_f * self._dt _t_diff = m_wash_recycle / self._load_f / load_c_ss.sum() t_cycle -= _t_diff self._wash_recycle_m_ss = m_wash_recycle self.log.i_data(self._log_tree, 'm_wash_recycle_ss', m_wash_recycle) self.log.i_data(self._log_tree, 'shorten_cycle_t_due_to_wash_recycle', _t_diff) if self.load_c_end_estimate_with_iterative_solver and (self.wash_recycle or self.load_recycle): c_load_fist_cycle = load_c_ss * _np.ones([len(binding_species), i_t_first_cycle * 2]) def sim_cycle(f_load, c_load, i_prev_cycle) -> (_np.ndarray, _np.ndarray, int): # load bt_load: _np.ndarray = c_load - self.load_bt.calc_c_bound(f_load, c_load) # propagate breakthrough bt_load_out, _ = self._sim_c_recycle_propagation(f_load, bt_load, None) # 'stop' load at specified breakthrough criteria # noinspection PyTypeChecker i_cycle_duration = _utils.vectors.true_start(bt_load_out.sum(0) >= load_c_end_ss.sum()) # cut load at specified time bt_load = bt_load[:, :i_cycle_duration] # wash desorption if self.wash_desorption and self.wash_recycle: c_first_wash_desorbed = self._sim_c_wash_desorption( f_load[:i_cycle_duration], c_load[:, :i_cycle_duration] - bt_load[:, :i_cycle_duration] ) else: c_first_wash_desorbed = None # propagate load and wash leftovers bt_load_out, bt_wash_out = self._sim_c_recycle_propagation( f_load[:i_cycle_duration], bt_load, c_first_wash_desorbed ) # construct load for next cycle # recycle load if self.load_recycle: rec_load = bt_load_out[:, i_prev_cycle:i_cycle_duration] else: rec_load = _np.zeros_like(bt_load_out[:, i_prev_cycle:i_cycle_duration]) # next load profiles c_next_load = _np.concatenate((rec_load, c_load_fist_cycle), axis=1) f_next_load = self._load_f * _np.ones(c_next_load.shape[1]) wash_recycle_i_duration = self._wash_recycle_i_duration if self.wash_recycle else 0 m_load_recycle_ss = \ bt_first_load_out[:, wash_recycle_i_duration:i_t_first_cycle].sum() * self._load_f * self._dt self._load_recycle_m_ss = m_load_recycle_ss self.log.i_data(self._log_tree, 'm_load_recycle_ss', m_load_recycle_ss) # recycle wash if self.wash_recycle: c_next_load[:, :self._wash_recycle_i_duration] = bt_wash_out[:, :self._wash_recycle_i_duration] f_next_load[:self._wash_recycle_i_duration] = self._wash_f m_wash_recycle_ss = \ bt_wash_out[:, :self._wash_recycle_i_duration].sum() * self._wash_f * self._dt self._wash_recycle_m_ss = m_wash_recycle_ss self.log.i_data(self._log_tree, 'm_wash_recycle_ss', m_wash_recycle_ss) # return next load and cycle duration return f_next_load, c_next_load, i_cycle_duration - i_prev_cycle f_load_cycle = self._load_f * _np.ones(c_load_fist_cycle.shape[1]) c_load_cycle = c_load_fist_cycle i_t_cycle_prev = i_t_first_cycle i_t_cycle_estimate = 0 # loop until cycle duration converges for i in range(self.load_c_end_estimate_with_iterative_solver_max_iter): if abs(i_t_cycle_prev - i_t_cycle_estimate) <= 1: self.log.i_data(self._log_tree, "t_cycle_optimization_loop_iter", i) break i_t_cycle_prev = i_t_cycle_estimate f_load_cycle, c_load_cycle, i_t_cycle_estimate = \ sim_cycle(f_load_cycle, c_load_cycle, i_t_cycle_prev) # print([i, i_t_cycle_prev, i_t_cycle_estimate]) if abs(i_t_cycle_prev - i_t_cycle_estimate) > 1: self.log.w("Cycle duration estimator did not converge") t_cycle = i_t_cycle_estimate * self._dt elif self.load_c_end_estimate_with_iterative_solver: self.log.w("No need to use iterative solver in case of no recycling of load and/or wash") self._cycle_t = t_cycle self.log.i_data(self._log_tree, 'cycle_t', t_cycle) # noinspection DuplicatedCode def _calc_first_cycle_extension_t(self): if not self.load_recycle and not self.wash_recycle: self.log.w("Estimation of first cycle extension requested on a process without load recycle") self._first_cycle_extension_t = 0 return if not self.load_extend_first_cycle: self.log.w("Estimation of first cycle extension requested on a process without extended first cycle") self._first_cycle_extension_t = 0 return if self.load_extend_first_cycle_t > 0: self._first_cycle_extension_t = self.load_extend_first_cycle_t return if self.load_extend_first_cycle_cv >= 0: assert self._cv > 0, "CV should be defined by now" assert self._load_f > 0, "Load flow rate should be defined by now" self._first_cycle_extension_t = self.load_extend_first_cycle_cv * self._cv / self._load_f elif self.load_cv > 0: raise NotImplementedError( "Estimation of first cycle extension is only supported if the cycle length " "is defined by breakthrough cutoff criteria. This is due to the fact that if all the " "breakthrough material gets recycles, there is no single steady-state." ) else: binding_species = [i for i in range(self._n_species) if i not in self.non_binding_species] load_c_ss = self._estimate_steady_state_mean_c(binding_species) # simulate first cycle at constant load concentration f_first_load = self._load_f * _np.ones(self._t.size) c_first_load = load_c_ss * _np.ones([len(binding_species), self._t.size]) bt_first_load: _np.ndarray = load_c_ss - self.load_bt.calc_c_bound(f_first_load, c_first_load) # propagate breakthrough bt_first_load_out, _ = self._sim_c_recycle_propagation(f_first_load, bt_first_load, None) load_c_end_ss = self._get_load_bt_cycle_switch_limit(load_c_ss) # noinspection PyTypeChecker i_t_first_cycle = _utils.vectors.true_start(bt_first_load_out.sum(0) >= load_c_end_ss.sum()) dm = 0 if self.load_recycle: assert hasattr(self, "_load_recycle_m_ss"), "Function `_calc_cycle_t()` should already be called." dm += self._load_recycle_m_ss if self.wash_recycle: assert hasattr(self, "_wash_recycle_m_ss"), "Function `_calc_cycle_t()` should already be called." dm += self._wash_recycle_m_ss di = 0 if dm > 0: m_ext_bt = _np.cumsum(bt_first_load_out.sum(0)[i_t_first_cycle:]) * self._load_f * self._dt di += _utils.vectors.true_start(m_ext_bt >= dm) self._first_cycle_extension_t = di * self._dt def _calc_cycle_start_i_list(self): assert self._cycle_t > 0, "Cycle length must have been determined (by `_calc_cycle_t`) by now" flow_i_start, flow_i_end = _utils.vectors.true_start_and_end(self._f > 0) if self.load_extend_first_cycle: assert self._first_cycle_extension_t >= 0, \ "Prolong of first load cycle is set to True, but the length is undefined" if self._first_cycle_extension_t == 0: self.log.w("Prolong of first load cycle is set to True, but the length of the extension is 0") load_extend_first_cycle_t = self._first_cycle_extension_t self.log.i_data(self._log_tree, "load_extend_first_cycle_t", load_extend_first_cycle_t) else: load_extend_first_cycle_t = 0 cycle_start_t_list = _np.arange( self._t[flow_i_start] + load_extend_first_cycle_t, self._t[flow_i_end - 1], self._cycle_t ) cycle_start_t_list[0] = self._t[flow_i_start] self._cycle_start_i_list = _np.rint(cycle_start_t_list / self._dt).astype(_np.int32) self.log.i_data(self._log_tree, "cycle_start_t_list", cycle_start_t_list) def _prepare_simulation(self): """Prepare everything before simulation loop over cycles""" self._assert_non_binding_species() self._calc_load_f() self._calc_cv() # might depend on load_f self._report_column_dimensions() # optional self._calc_equilibration_t() self._calc_wash_t_and_f() self._calc_elution_t_and_f() self._calc_elution_peak_t() self._update_elution_peak_pdf() self._calc_elution_peak_cut_i_start_and_i_end() self._calc_elution_peak_mask() self._calc_regeneration_t() # prepare for estimation of cycle length self._update_load_btc() if self.load_recycle: self._update_load_recycle_pdf(self._wash_f) if self.wash_recycle: self._calc_load_recycle_wash_i() # calc cycle time self._calc_cycle_t() if self.load_extend_first_cycle: self._calc_first_cycle_extension_t() # calc cycle start positions -> column switch time points (at load) self._calc_cycle_start_i_list() # make sure cycle duration is long enough _t_cycle_except_load = self._equilibration_t + self._wash_t + self._elution_t + self._regeneration_t if self._cycle_t < _t_cycle_except_load: self.log.e(f"Load step ({self._cycle_t}) should not be shorter" f" than eq_t + wash_t + elution_t + regeneration_t" f" ({_t_cycle_except_load: .6})!") def _sim_c_load_binding(self, f_load: _np.ndarray, c_load: _np.ndarray) -> (_np.ndarray, _np.ndarray): """Evaluates load against breakthrough profile to determine what part of load binds.""" assert f_load.size == c_load.shape[1], \ "f_load and c_load must have the same length" assert c_load.shape[0] == \ self._n_species - len(self.non_binding_species), \ "c_load must contain all binding species" c_bound = self.load_bt.calc_c_bound(f_load, c_load) return c_bound, c_load - c_bound # returns bound and unbound parts def _sim_c_wash_desorption(self, f_load: _np.ndarray, c_bound: _np.ndarray) -> _np.ndarray: """Returns concentration profile of desorbed material during wash step.""" # Not implemented in core this class, as there is no consensus on typical dynamics or way to describe it. raise NotImplementedError("Function not implemented in this class") def _sim_c_recycle_propagation(self, f_load: _np.ndarray, c_load_unbound: _np.ndarray, c_wash_desorbed: _typing.Optional[_np.ndarray]) -> (_np.ndarray, _np.ndarray): """ Propagate the unbound (breakthrough during load) and desorbed (during wash) material through the column Parameters ---------- f_load Flow rate profile of load step (which includes load step + possible wash and load recycle). c_load_unbound Concentration profile of overloaded material during load step (+ previous wash and load recycle). c_wash_desorbed Concentration profile of desorbed material during wash step. Returns ------- (_np.ndarray, _np.ndarray) c_unbound_propagated Propagated (through the column) concentration profile of overloaded material during load step. c_wash_desorbed_propagated Propagated (through the column) concentration profile of desorbed material during wash step. """ assert hasattr(self, "_wash_f") and self._wash_f > 0 assert hasattr(self, "_wash_t") and self._wash_t > 0 assert self.load_recycle_pdf is not None assert c_load_unbound.shape[0] == \ self._n_species - len(self.non_binding_species) assert c_load_unbound.shape[1] == f_load.size if c_wash_desorbed is None or c_wash_desorbed.size == 0: c_wash_desorbed = _np.zeros([ self._n_species - len(self.non_binding_species), int(round(self._wash_t / self._dt))]) else: assert c_wash_desorbed.shape[0] == \ self._n_species - len(self.non_binding_species) assert c_wash_desorbed.shape[1] == \ int(round(self._wash_t / self._dt)) # combine on volumetric scale v_load = self._dt * f_load.cumsum() v_wash = v_load[-1] + self._dt * _np.arange(1, c_wash_desorbed.shape[1] + 1) * self._wash_f min_flow = min(f_load.min(), self._wash_f) dv = min_flow * self._dt v = _np.arange(dv, (v_wash[-1] if v_wash.size > 0 else v_load[-1]) + dv, dv) c_v_combined = _interp.interp1d( _np.concatenate((v_load, v_wash), axis=0), _np.concatenate((c_load_unbound, c_wash_desorbed), axis=1), fill_value="extrapolate" )(v) c_v_combined[c_v_combined < 0] = 0 # simulate traveling of leftover material through the column self._update_load_recycle_pdf(min_flow) c_v_combined_propagated = _utils.convolution.time_conv( self._dt, c_v_combined, self._p_load_recycle_pdf) # split back on time scale c_combined_propagated = _interp.interp1d( v, c_v_combined_propagated, fill_value="extrapolate" )(_np.concatenate((v_load, v_wash), axis=0)) c_combined_propagated[c_combined_propagated < 0] = 0 c_unbound_propagated = c_combined_propagated[:, :v_load.size] c_wash_desorbed_propagated = c_combined_propagated[:, v_load.size:] return c_unbound_propagated, c_wash_desorbed_propagated def _sim_c_elution_desorption(self, m_bound: _np.ndarray) -> (_np.ndarray, _np.ndarray): """ Simulate elution step. Parameters ---------- m_bound Vector with amount of product being bound to the column. `m_bound.size == n_species` Returns ------- (_np.ndarray, _np.ndarray) c_elution Outlet concentration profile during the elution. mask_elution_peak Boolean vector. The peak is collected where the value is `True`. """ assert self._elution_f > 0 assert self._elution_t > 0 i_elution_duration = int(round(self._elution_t / self._dt)) c_elution = \ self._p_elution_peak[_np.newaxis, :i_elution_duration] * \ m_bound[:, _np.newaxis] / self._elution_f if c_elution.shape[1] < i_elution_duration: c_elution = _np.pad(c_elution, ((0, 0), (0, i_elution_duration - c_elution.shape[1])), mode="constant") mask_elution_peak = self._elution_peak_mask return c_elution, mask_elution_peak def _sim_c_elution_buffer(self, n_time_steps: int) -> _np.ndarray: """Feel free to override this function if you want to simulate linear gradient""" # elution buffer composition elution_buffer_composition = self.elution_buffer_c.reshape(self.elution_buffer_c.size, 1) assert elution_buffer_composition.size == 0 or elution_buffer_composition.size == self._n_species, \ "Elution buffer composition must be either empty or have a concentration value for each specie" assert _np.all(elution_buffer_composition >= 0), "Concentration values in elution buffer must be >= 0" if elution_buffer_composition.size == 0: elution_buffer_composition = _np.zeros([self._n_species, 1]) self.log.i_data(self._log_tree, "elution_buffer_composition", elution_buffer_composition) return elution_buffer_composition * _np.ones_like(self._t[:n_time_steps]) # noinspection PyMethodMayBeStatic,PyUnusedLocal def _sim_c_regeneration(self, m_bound: _np.ndarray) -> _typing.Optional[_np.ndarray]: """ Simulate regeneration step. Parameters ---------- m_bound Vector with amount of product being bound to the column. `m_bound.size == n_species` Returns ------- _typing.Optional[_np.ndarray] Outlet concentration profile during regeneration step. """ c_regeneration = None return c_regeneration def _sim_c_out_cycle(self, f_load: _np.ndarray, c_load: _np.ndarray) -> (_typing.Optional[_np.ndarray], _typing.Optional[_np.ndarray], _np.ndarray, _np.ndarray, _typing.Optional[_np.ndarray]): """ Simulates load-wash-elution-regeneration steps. Regeneration is optional. This function can be replaced in case user wants to use some other evaluation of bind-elution dynamics. Parameters ---------- f_load: _np.ndarray Inlet load flow rate profile. It might be changing due to wash recycle. c_load: _np.ndarray Inlet load concentration profile. Returns ------- (_typing.Optional[_np.ndarray], _typing.Optional[_np.ndarray], _np.ndarray, _np.ndarray, _typing.Optional[_np.ndarray]) Concentration profiles at the outlet of the column for load, wash, elution and regeneration steps. Elution peak cut is applied in this function. Elution peak must be defined. Profiles that are `None` are considered being zero. """ assert self._load_f > 0 assert self._wash_f > 0 assert self._wash_t > 0 assert self._elution_f > 0 assert self._elution_t > 0 assert self._load_f > 0 assert self._cv > 0 # evaluate binding c_bound, c_unbound = self._sim_c_load_binding(f_load, c_load) # log m_load = (c_load * f_load[_np.newaxis, :]).sum(1) * self._dt m_bound = (c_bound * f_load[_np.newaxis, :]).sum(1) * self._dt self.log.i_data(self._cycle_tree, "column_utilization", m_bound / self._cv / self.load_bt.get_total_bc()) self.log.i_data(self._cycle_tree, "m_load", m_load) self.log.i_data(self._cycle_tree, "m_bound", m_bound) self.log.i_data(self._cycle_tree, "m_unbound", m_load - m_bound) self.log.d_data(self._cycle_tree, "f_load", f_load) self.log.d_data(self._cycle_tree, "c_load", c_load) self.log.d_data(self._cycle_tree, "c_bound", c_bound) self.log.d_data(self._cycle_tree, "c_unbound", c_unbound) # evaluate desorption during wash c_wash_desorbed = None if self.wash_desorption: c_wash_desorbed = self._sim_c_wash_desorption(f_load, c_bound) if c_wash_desorbed.size > 0: m_bound -= c_wash_desorbed.sum(1) # subtract desorbed material from bound material # log self.log.i_data(self._cycle_tree, "m_wash_desorbed", c_wash_desorbed.sum(1) * self._wash_f * self._dt) self.log.d_data(self._cycle_tree, "c_wash_desorbed", c_wash_desorbed) # propagate unbound and desorbed material throughout the column c_out_load = c_unbound c_out_wash = c_wash_desorbed if self.load_recycle or self.wash_recycle: c_out_load, c_out_wash = self._sim_c_recycle_propagation(f_load, c_unbound, c_wash_desorbed) # get elution peak c_out_elution, elution_peak_mask = self._sim_c_elution_desorption(m_bound) # log m_elution_peak = (c_out_elution * elution_peak_mask[_np.newaxis, :]).sum(1) * self._elution_f * self._dt m_elution = c_out_elution.sum(1) * self._elution_f * self._dt self.log.i_data(self._cycle_tree, "m_elution_peak", m_elution_peak) self.log.i_data(self._cycle_tree, "m_elution", m_elution) self.log.i_data(self._cycle_tree, "m_elution_peak_cut_loss", m_elution - m_elution_peak) # get regeneration peak c_out_regeneration = self._sim_c_regeneration(m_bound - c_out_elution.sum(1) * self._elution_f * self._dt) return c_out_load, c_out_wash, c_out_elution, elution_peak_mask, c_out_regeneration def _calculate(self): # pre calculate parameters and repetitive profiles self._prepare_simulation() binding_species = [i for i in range(self._n_species) if i not in self.non_binding_species] assert len(binding_species) > 0 # copy inlet vectors c_in_load = self._c[binding_species].copy() f_in_load = self._f.copy() f_in_i_end = min(_utils.vectors.true_end(f_in_load > 0), self._t.size) c_in_load[:, f_in_i_end:] = 0 # clear for results self._c[:] = 0 self._f[:] = 0 # prepare logger log_data_cycles = list() self.log.set_branch(self._log_tree, "cycles", log_data_cycles) # util variable previous_c_bt_wash: _typing.Optional[_np.ndarray] = None for i in range(self._cycle_start_i_list.size): """ In one cycle we focus on load-wash-elution-regeneration-equilibration on the column. The loading step starts at `self._cycle_start_i_list[i]`. """ # prepare logger for this cycle self._cycle_tree = dict() log_data_cycles.append(self._cycle_tree) # Load start and end time as the column sees it if i > 0 and self.load_recycle: # column sees leftovers from previous load during recycling cycle_load_i_start = self._cycle_start_i_list[i - 1] else: cycle_load_i_start = self._cycle_start_i_list[i] # Calc cycle end (either next cycle or end or simulation time) if i + 1 < self._cycle_start_i_list.size: cycle_load_i_end = self._cycle_start_i_list[i + 1] else: cycle_load_i_end = f_in_i_end - 1 # log results self.log.i_data(self._cycle_tree, "i_cycle_load_start", cycle_load_i_start) self.log.i_data(self._cycle_tree, "i_cycle_load_step_start", self._cycle_start_i_list[i]) self.log.i_data(self._cycle_tree, "i_cycle_load_end", cycle_load_i_end) # calc profiles at column outlet c_out_load, c_out_wash, c_out_elution, b_out_elution, c_out_regeneration = self._sim_c_out_cycle( f_in_load[cycle_load_i_start:cycle_load_i_end], c_in_load[:, cycle_load_i_start:cycle_load_i_end] ) self.log.d_data(self._cycle_tree, "c_out_load", c_out_load) self.log.d_data(self._cycle_tree, "c_out_wash", c_out_wash) self.log.d_data(self._cycle_tree, "c_out_elution", c_out_elution) self.log.d_data(self._cycle_tree, "b_out_elution", b_out_elution) self.log.d_data(self._cycle_tree, "c_out_regeneration", c_out_regeneration) # load recycle if self.load_recycle: # recycle load during the load step i_load_start_rel = self._cycle_start_i_list[i] - cycle_load_i_start c_load_recycle = c_out_load[:, i_load_start_rel:] c_in_load[:, self._cycle_start_i_list[i]:cycle_load_i_end] = c_load_recycle self.log.i_data(self._cycle_tree, "m_load_recycle", c_load_recycle.sum(1) * self._load_f * self._dt) self.log.d_data(self._cycle_tree, "c_load_recycle", c_load_recycle) # losses during load == bt through 2nd column == bt before start of load step c_loss_bt_2nd_column = c_out_load[:, i_load_start_rel] self.log.d_data(self._cycle_tree, "m_loss_bt_2nd_column", c_loss_bt_2nd_column.sum() * self._dt * self._load_f) self.log.i_data(self._cycle_tree, "c_loss_bt_2nd_column", c_loss_bt_2nd_column) else: # report losses during load m_loss_load = c_out_load.sum() * self._dt * self._load_f self.log.i_data(self._cycle_tree, "m_loss_load", m_loss_load) # wash recycle if self.wash_recycle: if previous_c_bt_wash is not None and previous_c_bt_wash.size > 0: # clip wash recycle duration if needed i_wash_duration = min(self._wash_recycle_i_duration, self._t.size - self._cycle_start_i_list[i]) # report losses due to discarding load bt while recycling wash s = c_in_load[:, self._cycle_start_i_list[i]:self._cycle_start_i_list[i] + i_wash_duration] self.log.i_data(self._cycle_tree, "m_loss_load_bt_during_wash_recycle", s.sum() * self._dt * self._load_f) self.log.d_data(self._cycle_tree, "c_lost_load_during_wash_recycle", s) self.log.d_data(self._cycle_tree, "c_wash_recycle", previous_c_bt_wash[:, :i_wash_duration]) self.log.i_data(self._cycle_tree, "m_wash_recycle", previous_c_bt_wash[:, :i_wash_duration].sum(1) * self._dt * self._wash_f) # apply previous wash recycle onto the inlet profile s[:] = previous_c_bt_wash[:, :i_wash_duration] f_in_load[self._cycle_start_i_list[i]:self._cycle_start_i_list[i] + i_wash_duration] = self._wash_f # save wash from this cycle to be applied during the next cycle previous_c_bt_wash = c_out_wash else: # report losses during wash if c_out_wash is None: c_out_wash = _np.zeros([len(binding_species), int(round(self._wash_t / self._dt))]) m_loss_wash = c_out_wash.sum() * self._dt * self._load_f self.log.i_data(self._cycle_tree, "m_loss_wash", m_loss_wash) # elution [i_el_rel_start, i_el_rel_end] = _utils.vectors.true_start_and_end(b_out_elution) i_el_start = min(self._t.size, cycle_load_i_end + c_out_wash.shape[1] + i_el_rel_start) i_el_end = min(self._t.size, cycle_load_i_end + c_out_wash.shape[1] + i_el_rel_end) i_el_rel_end = i_el_rel_start + i_el_end - i_el_start # log self.log.i_data(self._cycle_tree, "i_elution_start", i_el_start) self.log.i_data(self._cycle_tree, "i_elution_end", i_el_end) # write to global outlet self._f[i_el_start:i_el_end] = self._elution_f self._c[binding_species, i_el_start:i_el_end] = c_out_elution[:, i_el_rel_start:i_el_rel_end]
[docs]class ACC(AlternatingChromatography): def _sim_c_wash_desorption(self, f_load: _np.ndarray, c_bound: _np.ndarray) -> _np.ndarray: raise NotImplementedError("Function not implemented in this class")
[docs]class PCC(AlternatingChromatography): def __init__(self, t: _np.ndarray, uo_id: str, load_bt: _core.ChromatographyLoadBreakthrough, load_recycle_pdf: _core.PDF, column_porosity_retentate: float, peak_shape_pdf: _core.PDF, gui_title: str = "PCC"): super().__init__(t, uo_id, load_bt, peak_shape_pdf, gui_title) self.load_recycle = True self.wash_recycle = False # how fast is the protein traveling through the column during overloaded conditions (in CVs) self.column_porosity_retentate = column_porosity_retentate # corresponding peak shape self.load_recycle_pdf = load_recycle_pdf def _sim_c_wash_desorption(self, f_load: _np.ndarray, c_bound: _np.ndarray) -> _np.ndarray: raise NotImplementedError("Function not implemented in this class")
[docs]class PCCWithWashDesorption(PCC): def __init__(self, t: _np.ndarray, uo_id: str, load_bt: _core.ChromatographyLoadBreakthrough, load_recycle_pdf: _core.PDF, column_porosity_retentate: float, peak_shape_pdf: _core.PDF, gui_title: str = "PCCWithWashDesorption"): super().__init__(t, uo_id, load_bt, load_recycle_pdf, column_porosity_retentate, peak_shape_pdf, gui_title) self.load_recycle = True self.wash_recycle = True self.wash_desorption = True # one of those must be defined self.wash_desorption_desorbable_material_share = -1 self.wash_desorption_desorbable_above_dbc = -1 # this one must be defined self.wash_desorption_tail_half_time_cv = -1 def _sim_c_wash_desorption(self, f_load: _np.ndarray, c_bound: _np.ndarray) -> _np.ndarray: assert self.wash_desorption_tail_half_time_cv > 0 assert self._load_f > 0 assert self._wash_f > 0 assert self._wash_t > 0 assert self._cv > 0 assert self.wash_desorption_desorbable_material_share > 0 \ or self.wash_desorption_desorbable_above_dbc > 0 assert f_load.size == c_bound.shape[1] assert c_bound.shape[0] \ == self._n_species - len(self.non_binding_species) m_bound = (c_bound * f_load[_np.newaxis, :]).sum(1)[:, _np.newaxis] * self._dt k = -1 if self.wash_desorption_desorbable_material_share > 0: k = self.wash_desorption_desorbable_material_share if self.wash_desorption_desorbable_above_dbc > 0: if k > 0: self.log.w("share of desorbable material defined twice!! " "Using load_recycle_wash_desorbable_material_share") else: k = max(0, 1 - self.wash_desorption_desorbable_above_dbc * self._cv / m_bound.sum()) assert 1 >= k >= 0 i_wash_duration = int(round(self._wash_t / self._dt)) # generate exponential tail exp_pdf = _pdf.TanksInSeries(self._t[:i_wash_duration], 1) tau = self.wash_desorption_tail_half_time_cv * self._cv / self._wash_f / _np.log(2) exp_pdf.update_pdf(rt_mean=tau) # scale desorbed material concentration due to differences in flow rate (ensure mass balance) c_desorbed = m_bound * k * exp_pdf.get_p()[_np.newaxis, :i_wash_duration] / self._wash_f c_desorbed = _np.pad(c_desorbed, ((0, 0), (0, i_wash_duration - c_desorbed.shape[1])), mode="constant") # cut return c_desorbed