Pipeline (vital_sqi.pipeline)

End-to-end signal-to-decision pipeline: load a recording, segment it, compute SQIs, and classify each segment as "accept" or "reject".

High-level entry points

vital_sqi.pipeline.pipeline_highlevel.get_ecg_sqis(file_name, sqi_dict_filename, file_type, signal_idx=1, timestamp_idx=0, sampling_rate=None, start_datetime=None, split_type=0, duration=30, overlapping=None, peak_detector=6)[source]

Computes SQIs for ECG segments and returns the segments along with the SQIs.

Parameters:
  • All parameters are the same as for get_ppg_sqis, with the addition of:

  • file_type (str) – Type of the ECG file.

  • channel_num (int, optional) – Number of channels in the ECG signal (default is None).

  • channel_name (list, optional) – Names of channels in the ECG signal (default is None).

Returns:

Segments list and signal object with SQIs.

Return type:

tuple

vital_sqi.pipeline.pipeline_highlevel.get_ppg_sqis(file_name, signal_idx, timestamp_idx, sqi_dict_filename, info_idx=None, timestamp_unit='ms', sampling_rate=None, start_datetime=None, split_type=0, duration=30, overlapping=None, peak_detector=6, delete_signal=True)[source]

Computes SQIs for PPG segments and returns the segments along with the SQIs.

Parameters:
  • file_name (str) – Path to the PPG file.

  • sqi_dict_filename (str) – Path to the SQI dictionary.

  • signal_idx (int) – Index of the signal column in the file.

  • timestamp_idx (int) – Index of the timestamp column in the file.

  • info_idx (list, optional) – List of indices for additional information columns (default is None).

  • timestamp_unit (str, optional) – Time unit for the timestamps (default is ‘ms’).

  • sampling_rate (int, optional) – Sampling rate of the signal (default is None).

  • start_datetime (datetime, optional) – Start datetime of the signal (default is None).

  • split_type (int, optional) – Type of segment split (default is 0).

  • duration (int, optional) – Duration of each segment in seconds (default is 30).

  • overlapping (float, optional) – Overlapping ratio between segments (default is None).

  • peak_detector (int, optional) – Method for peak detection (default is 6).

  • delete_signal (bool, optional) – Whether to delete original signals after segmentation (default is True).

Returns:

Segments and signal object with SQIs.

Return type:

tuple

vital_sqi.pipeline.pipeline_highlevel.get_qualified_ecg(file_name, file_type, sqi_dict_filename, rule_dict_filename, ruleset_order, signal_idx=1, timestamp_idx=0, channel_num=None, channel_name=None, predefined_reject=False, sampling_rate=None, start_datetime=None, split_type=0, duration=30, auto_mode=False, lower_bound=0.1, upper_bound=0.9, overlapping=None, peak_detector=6, segment_name=None, save_image=False, output_dir=None)[source]

Extracts SQIs for ECG, classifies segments, and saves accepted/rejected segments.

Parameters:
  • All parameters are the same as for get_qualified_ppg, with the addition of:

  • file_type (str) – Type of the ECG file.

Returns:

Signal object containing classified segments and SQIs.

Return type:

signal_obj

vital_sqi.pipeline.pipeline_highlevel.get_qualified_ppg(file_name, sqi_dict_filename, signal_idx, timestamp_idx, rule_dict_filename, ruleset_order, predefined_reject=False, info_idx=None, timestamp_unit='ms', sampling_rate=None, start_datetime=None, split_type=0, duration=30, overlapping=None, peak_detector=6, auto_mode=False, lower_bound=0.05, upper_bound=0.95, segment_name=None, save_image=False, output_dir=None, delete_signal=False)[source]

Extracts SQIs for PPG, classifies segments, and saves accepted/rejected segments.

Parameters:
  • All parameters are the same as for get_ppg_sqis, with the addition of:

  • rule_dict_filename (str) – Path to the rule dictionary file.

  • ruleset_order (dict) – Order of rulesets for classification.

  • predefined_reject (bool, optional) – If True, use predefined rejection criteria (default is False).

  • save_image (bool, optional) – If True, saves segment images (default is False).

  • output_dir (str, optional) – Directory to save accepted/rejected segments (default is None, which uses the current directory).

  • delete_signal (bool, optional) – Whether to delete original signals after segmentation (default is False).

Returns:

Signal object containing classified segments and SQIs.

Return type:

signal_obj

Core pipeline functions

Signal Quality Index (SQI) Processing and Classification Utilities

vital_sqi.pipeline.pipeline_functions.classify_segments(sqis, rule_dict_filename, ruleset_order, auto_mode=True, lower_bound=0.05, upper_bound=0.95, mode='legacy', robust_config=None, target_accept_rate=0.85)[source]

Classify each segment as 'accept' or 'reject' using threshold rules.

The classifier builds a RuleSet from the rules named in ruleset_order, then runs RuleSet.execute on every segment row. Rules are evaluated in ascending integer key order; the first 'reject' short-circuits evaluation for that segment (linear early-exit, not recursive).
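The evaluation order described above can be sketched in plain Python. This is a minimal illustration, not the RuleSet implementation: the band rules, the helper names (make_band_rule, evaluate_segment) and the threshold values are all hypothetical.

```python
from typing import Callable, Dict

# Hypothetical stand-in for a rule: maps one SQI value to a decision string.
RuleFn = Callable[[float], str]


def make_band_rule(lower: float, upper: float) -> RuleFn:
    """Accept values inside [lower, upper]; reject everything else."""
    def rule(value: float) -> str:
        return "accept" if lower <= value <= upper else "reject"
    return rule


def evaluate_segment(row: Dict[str, float],
                     ruleset_order: Dict[int, str],
                     rules: Dict[str, RuleFn]) -> str:
    """Walk the rules in ascending key order and stop at the first reject."""
    for key in sorted(ruleset_order):
        name = ruleset_order[key]
        if rules[name](row[name]) == "reject":
            return "reject"  # early exit: later rules are never evaluated
    return "accept"


# Illustrative thresholds only; real bounds come from the rule file.
ruleset_order = {2: "perfusion_sqi", 1: "kurtosis_sqi"}
rules = {
    "kurtosis_sqi": make_band_rule(-1.0, 5.0),
    "perfusion_sqi": make_band_rule(0.5, 20.0),
}
good = evaluate_segment({"kurtosis_sqi": 2.0, "perfusion_sqi": 3.0},
                        ruleset_order, rules)
bad = evaluate_segment({"kurtosis_sqi": 9.0, "perfusion_sqi": 3.0},
                       ruleset_order, rules)
```

Because the loop sorts the keys, the insertion order of ruleset_order does not matter; kurtosis_sqi (key 1) is checked before perfusion_sqi (key 2).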

Threshold-selection strategies (auto_mode argument)

auto_mode=False or "manual"

Thresholds stored in rule_dict_filename are used exactly as written. Use this when you want to apply externally calibrated bounds without adapting them to the current recording.

auto_mode=True or "quantile" (default)

Replace each rule’s bounds with the empirical lower_bound and upper_bound quantiles of the SQI values observed across all segments. This is simple and predictable, but when many independent rules are active the joint accept rate can fall well below upper_bound - lower_bound, because each rule trims its own tails.

auto_mode="tune"

Auto-tune the per-rule quantile so the joint accept rate targets target_accept_rate (default 0.85). Under the independence approximation each rule keeps target ** (1/n_rules) of its distribution, splitting the trim symmetrically across both tails. Much more forgiving than plain "quantile" mode when several rules are active. See vital_sqi.rule.auto_threshold.per_rule_quantile() for the underlying math.
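The arithmetic behind the "quantile" and "tune" strategies can be checked with a few lines of Python. This is a sketch of the independence approximation described above, not the code in vital_sqi.rule.auto_threshold.per_rule_quantile(); the function names here are hypothetical.

```python
def joint_accept_rate(lower: float, upper: float, n_rules: int) -> float:
    """Joint accept rate if every rule keeps (upper - lower) independently."""
    return (upper - lower) ** n_rules


def per_rule_bounds(target_accept_rate: float, n_rules: int):
    """Quantile pair that makes the joint rate hit the target.

    Each rule keeps target ** (1 / n_rules) of its distribution, and the
    trimmed mass is split evenly between the two tails.
    """
    keep = target_accept_rate ** (1.0 / n_rules)
    trim = (1.0 - keep) / 2.0
    return trim, 1.0 - trim


n_rules = 4
# Plain quantile mode with the default 0.05 / 0.95 bounds: 0.9 ** 4.
plain = joint_accept_rate(0.05, 0.95, n_rules)
# Tune mode: widen each rule's band so the joint rate lands on 0.85.
tuned_lower, tuned_upper = per_rule_bounds(0.85, n_rules)
tuned = joint_accept_rate(tuned_lower, tuned_upper, n_rules)
```

With four rules the default quantile bounds keep only about 66% of segments under independence, whereas the tuned bounds keep roughly 96% per rule and about 85% jointly.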

Degenerate rules — SQIs whose distribution collapses to a single value across the recording (e.g. zero_crossings_rate_sqi on mean-centred PPG) — are dropped from the rule set with a warning instead of producing a 0-width “reject everything” band.
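One way to detect such degenerate rules is to count the distinct non-NaN values each SQI takes across segments. The sketch below is an assumption about how the check could work, not the library's code; drop_degenerate_rules is a hypothetical helper and the sample values are made up.

```python
import math
import warnings


def drop_degenerate_rules(sqi_values, ruleset_order):
    """Return ruleset_order without rules whose SQI has no spread.

    sqi_values maps an SQI name to its values across all segments.
    A column that is constant (or entirely NaN) is treated as degenerate.
    """
    kept = {}
    for key in sorted(ruleset_order):
        name = ruleset_order[key]
        distinct = {v for v in sqi_values[name] if not math.isnan(v)}
        if len(distinct) <= 1:
            warnings.warn(f"Dropping degenerate rule {name!r}: no spread")
            continue
        kept[key] = name
    return kept


sqi_values = {
    "kurtosis_sqi": [2.1, 3.4, 1.9, 2.8],
    "zero_crossings_rate_sqi": [0.0, 0.0, 0.0, 0.0],  # constant column
}
ruleset_order = {1: "kurtosis_sqi", 2: "zero_crossings_rate_sqi"}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    kept = drop_degenerate_rules(sqi_values, ruleset_order)
```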

Parameters:
  • sqis (list of DataFrame) – One DataFrame per segment produced by extract_sqi(). Every DataFrame must have the SQI column names referenced in ruleset_order.

  • rule_dict_filename (str) – Path to a rule_dict.json file. Each entry must have keys "name" (SQI column name) and "def" (list of threshold conditions accepted by update_rule()). The calibrated file at vital_sqi/resource/rule_dict.json is the default starting point.

  • ruleset_order (dict) – Maps integer priority keys to rule names present in the rule file, e.g. {1: "kurtosis_sqi", 2: "perfusion_sqi"}. Lower key = evaluated first. Only rules listed here participate in classification.

  • auto_mode (bool or str, optional) – See above. True is an alias for "quantile"; False is an alias for "manual". Default True.

  • lower_bound (float, optional) – Lower quantile for "quantile" mode (default 0.05).

  • upper_bound (float, optional) – Upper quantile for "quantile" mode (default 0.95).

  • target_accept_rate (float, optional) – Joint accept rate target for "tune" mode (default 0.85). Ignored unless auto_mode == "tune".

Returns:
  • ruleset (RuleSet) – The RuleSet used for classification. Only rules with usable (non-degenerate) bands are included.

  • sqis (list of DataFrame) – The input list with an added "decision" column ('accept' or 'reject') in each DataFrame.

Raises:
  • FileNotFoundError – If rule_dict_filename does not exist.

  • KeyError – If a rule name from ruleset_order is absent from the rule file.

  • ValueError – If auto_mode is not one of the documented values.

Examples

>>> ruleset_order = {1: "kurtosis_sqi", 2: "perfusion_sqi"}
>>> ruleset, sqis = classify_segments(
...     sqis, "vital_sqi/resource/rule_dict.json",
...     ruleset_order, auto_mode="tune", target_accept_rate=0.85,
... )
>>> decisions = [df["decision"].iloc[0] for df in sqis]

vital_sqi.pipeline.pipeline_functions.extract_segment_sqi(s, sqi_list, sqi_names, sqi_arg_list, wave_type)[source]

Extract all SQIs for a single signal segment.

Peak detection is performed once per segment and the results are reused across all per-beat SQI functions via the _peak_list / _trough_list private keyword arguments injected into get_sqi().

Parameters:
  • s (DataFrame) – Segment signal data. Second column (index 1) must contain the raw waveform values.

  • sqi_list (list of callable) – SQI functions to evaluate, in the same order as sqi_names.

  • sqi_names (list of str) – Identifiers for each SQI, matched against keys in sqi_arg_list.

  • sqi_arg_list (dict) – Mapping of SQI name → keyword-argument dict. Each dict is forwarded to get_sqi() and ultimately to the underlying SQI function.

  • wave_type (str) – 'PPG' or 'ECG'; controls peak detector branch.

Returns:

One entry per SQI column produced (multi-element SQIs generate _mean_sqi, _median_sqi, _std_sqi columns via get_sqi_dict()).

Return type:

Series

vital_sqi.pipeline.pipeline_functions.extract_sqi(segments, milestones, sqi_dict_filename, wave_type='PPG', n_jobs=1)[source]

Extract all configured SQIs for every segment and return a result DataFrame.

This is the top-level entry point for batch SQI extraction. Internally it calls extract_segment_sqi() for each segment, which handles:

  • Routing HRV SQIs through a single cached get_nn() call per segment.

  • Routing signal-level SQIs directly to the SQI function.

  • Catching per-SQI exceptions and returning NaN for failed SQIs.
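The last point, isolating failures so that one broken SQI does not abort the whole segment, can be illustrated as follows. This is a simplified sketch with hypothetical helpers (safe_sqi, extract_all) and toy SQI functions; the real routing through get_sqi() is more involved.

```python
import math


def safe_sqi(sqi_func, signal, **kwargs):
    """Run one SQI function; return NaN instead of propagating an error."""
    try:
        return sqi_func(signal, **kwargs)
    except Exception:
        return math.nan


def extract_all(signal, sqi_funcs):
    """Evaluate every SQI on one segment, keeping results for the good ones."""
    return {name: safe_sqi(func, signal) for name, func in sqi_funcs.items()}


# Toy SQI functions for illustration; "broken" always raises.
sqi_funcs = {
    "mean": lambda s: sum(s) / len(s),
    "broken": lambda s: 1 / 0,
}
row = extract_all([1.0, 2.0, 3.0], sqi_funcs)
```

The failed SQI shows up as NaN in the segment's row, so the other columns remain usable.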

Column names in the output follow these rules:

  • Scalar SQIs → one column named by the sqi_dict key.

  • Dict-returning SQIs (e.g. poincare_sqi) → one column per dict key (sd1, sd2, area, ratio).

  • Per-beat SQIs returning a list → three columns: {key}_mean_sqi, {key}_median_sqi, {key}_std_sqi.

Parameters:
  • segments (list of DataFrame) – Segmented signal DataFrames produced by split_segment(). Each DataFrame must have two columns: timestamps (column 0) and raw waveform values (column 1).

  • milestones (DataFrame) – Two-column DataFrame with start_idx and end_idx (sample positions in the original recording) for each segment.

  • sqi_dict_filename (str) –

    Path to the JSON SQI configuration file. The calibrated default is vital_sqi/resource/sqi_dict.json.

    Format:

    {
      "user_label":  {"sqi": "registered_function_name", "args": {...}},
      "kurtosis":    {"sqi": "kurtosis_sqi", "args": {"axis": 0}},
      "poincare":    {"sqi": "poincare_sqi", "args": {}}
    }
    

    "sqi" must be a key in sqi_mapping. "args" are keyword arguments forwarded verbatim to the SQI function.

  • wave_type (str, optional) – 'PPG' (default) or 'ECG'. Passed to every SQI that accepts a wave_type parameter and controls peak detector branch selection.

Returns:

One row per segment. Columns are SQI labels from the config file (expanded for multi-output SQIs) plus start_idx and end_idx.

Return type:

pd.DataFrame
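Before running a long extraction it can help to sanity-check the configuration format described under sqi_dict_filename. The sketch below uses only the standard library and a hypothetical helper, parse_sqi_config. It assumes every entry carries both "sqi" and "args"; it does not check names against sqi_mapping.

```python
import json

REQUIRED_KEYS = {"sqi", "args"}


def parse_sqi_config(text):
    """Parse SQI config JSON and check each entry's basic shape."""
    config = json.loads(text)
    for label, entry in config.items():
        if not isinstance(entry, dict):
            raise ValueError(f"{label!r} must map to an object")
        missing = REQUIRED_KEYS - set(entry)
        if missing:
            raise ValueError(f"{label!r} is missing keys: {sorted(missing)}")
        if not isinstance(entry["args"], dict):
            raise ValueError(f"{label!r}: 'args' must be an object")
    return config


example = """
{
  "kurtosis": {"sqi": "kurtosis_sqi", "args": {"axis": 0}},
  "poincare": {"sqi": "poincare_sqi", "args": {}}
}
"""
config = parse_sqi_config(example)
function_names = [entry["sqi"] for entry in config.values()]
```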

Examples

>>> from vital_sqi.pipeline.pipeline_functions import extract_sqi
>>> sqi_df = extract_sqi(segments, milestones,
...                      "vital_sqi/resource/sqi_dict.json",
...                      wave_type="PPG")
>>> print(sqi_df.columns.tolist())
['kurtosis', 'perfusion', 'sd1', 'sd2', 'area', 'ratio', ..., 'start_idx', 'end_idx']

vital_sqi.pipeline.pipeline_functions.generate_rule(rule_name, rule_def)[source]

Generate a Rule object from rule definition.

Parameters:
  • rule_name (str) – Rule name.

  • rule_def (dict) – Rule definitions.

Returns:

Created rule object.

Return type:

Rule

vital_sqi.pipeline.pipeline_functions.get_decision_segments(segments, decision, reject_decision)[source]

Separate accepted and rejected segments based on decisions.

Parameters:
  • segments (list) – List of all segments.

  • decision (list) – Decisions from SQI evaluation (‘accept’/‘reject’).

  • reject_decision (list) – Additional rejection criteria.

Returns:

Accepted and rejected segments.

Return type:

tuple of lists
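A rough sketch of the split is shown below. How the two decision lists are combined is not spelled out here, so the example assumes a segment is accepted only when both lists say 'accept'. split_by_decision is a hypothetical name, not the library function.

```python
def split_by_decision(segments, decision, reject_decision):
    """Partition segments into (accepted, rejected).

    Assumption: a segment is kept only if both the SQI decision and the
    additional rejection criterion are 'accept'.
    """
    accepted, rejected = [], []
    for segment, sqi_result, extra_result in zip(segments, decision,
                                                 reject_decision):
        if sqi_result == "accept" and extra_result == "accept":
            accepted.append(segment)
        else:
            rejected.append(segment)
    return accepted, rejected


segments = ["seg0", "seg1", "seg2"]  # placeholders for segment DataFrames
decision = ["accept", "reject", "accept"]
reject_decision = ["accept", "accept", "reject"]
accepted, rejected = split_by_decision(segments, decision, reject_decision)
```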

vital_sqi.pipeline.pipeline_functions.get_reject_segments(segments, wave_type)[source]

Return accept/reject decisions for each segment based on wave type.

Parameters:
  • segments (list) – List of signal DataFrames.

  • wave_type (str) – Type of waveform (‘PPG’ or ‘ECG’).

Returns:

Series with ‘accept’ or ‘reject’ for each segment.

Return type:

Series

vital_sqi.pipeline.pipeline_functions.get_sqi(sqi_func, sqi_name, s, per_beat=False, use_mean_beat=True, mean_resample_size=100, wave_type='PPG', peak_detector=6, _nn_intervals=None, _signal_values=None, _peak_list=None, _trough_list=None, **kwargs)[source]

Compute SQI for a single signal segment.

Parameters:
  • sqi_func (callable) – SQI function to apply.

  • sqi_name (str) – Identifier for this SQI, used as the column name in the output dict.

  • s (DataFrame, Series, or array-like) – Signal data. When a DataFrame is passed the second column (index 1) is used as the signal; a Series is converted directly; anything else is coerced via np.asarray.

  • per_beat (bool, optional) – If True perform per-beat SQI computation via per_beat_sqi() (default False).

  • use_mean_beat (bool, optional) – Passed through to per_beat_sqi(); only relevant when per_beat is True (default True).

  • mean_resample_size (int, optional) – Passed through to per_beat_sqi(); only relevant when per_beat is True (default 100).

  • wave_type (str, optional) – 'PPG' or 'ECG'. Controls which peak detector branch is used when per_beat is True, and is forwarded to SQI functions that accept a wave_type parameter (default 'PPG').

  • peak_detector (int, optional) – Peak detector index (0–7) passed to PeakDetector when per_beat is True (default 6).

  • _signal_values (np.ndarray, optional (internal)) – Pre-extracted signal array injected by extract_segment_sqi() to avoid redundant array conversion. Not intended for direct use.

  • _peak_list (array-like, optional (internal)) – Pre-computed peak indices injected by extract_segment_sqi().

  • _trough_list (array-like, optional (internal)) – Pre-computed trough indices injected by extract_segment_sqi().

  • **kwargs – Additional keyword arguments forwarded to sqi_func.

Returns:

Mapping of column name(s) to scalar SQI value(s), as produced by get_sqi_dict().

Return type:

dict

vital_sqi.pipeline.pipeline_functions.get_sqi_dict(sqis, sqi_name)[source]

Package a raw SQI result into a {column_name: value} dict for DataFrame insertion.

Parameters:
  • sqis (float, int, np.floating, np.ndarray, list, or dict) – Raw value(s) returned by an SQI function.

  • sqi_name (str) – Base column name for this SQI.

Returns:

Mapping of column name(s) to value(s). Rules:

  • correlogram_sqi → single {"correlogram_sqi": scalar}.

  • dict input → returned unchanged.

  • Scalar (float / int / np.floating / np.integer) → {sqi_name: scalar}.

  • 1-element list or ndarray → {sqi_name: value}.

  • Multi-element list or ndarray → three columns: {sqi_name}_mean_sqi, {sqi_name}_median_sqi, {sqi_name}_std_sqi.

Return type:

dict
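The packaging rules above can be approximated with the standard library. This is a sketch, not the library's get_sqi_dict(): package_sqi is a hypothetical name, the correlogram_sqi special case is omitted, and the population standard deviation is an assumption.

```python
import statistics
from numbers import Real


def package_sqi(value, sqi_name):
    """Turn a raw SQI result into a {column_name: value} dict."""
    if isinstance(value, dict):
        return value  # dict results are passed through unchanged
    if isinstance(value, Real):
        return {sqi_name: value}  # plain scalar
    values = list(value)  # assumes a non-empty sequence
    if len(values) == 1:
        return {sqi_name: values[0]}
    # Multi-element result: summarize as three columns.
    return {
        f"{sqi_name}_mean_sqi": statistics.fmean(values),
        f"{sqi_name}_median_sqi": statistics.median(values),
        # Population standard deviation; the library's choice may differ.
        f"{sqi_name}_std_sqi": statistics.pstdev(values),
    }


scalar_cols = package_sqi(0.5, "kurtosis")
single_cols = package_sqi([0.7], "perfusion")
multi_cols = package_sqi([1.0, 2.0, 3.0], "skewness")
dict_cols = package_sqi({"sd1": 0.1, "sd2": 0.2}, "poincare")
```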

vital_sqi.pipeline.pipeline_functions.map_decision(decision)[source]

Map decision string to integer for processing.

Parameters:

decision (str) – ‘accept’ or ‘reject’

Returns:

0 for ‘accept’, 1 for ‘reject’

Return type:

int

vital_sqi.pipeline.pipeline_functions.per_beat_sqi(sqi_func, troughs, signal, use_mean_beat, mean_resample_size, taper=False, **kwargs)[source]

Compute SQI per beat by dividing the signal based on trough indices.

Parameters:
  • sqi_func (callable) – SQI function with signature f(beat_array, **kwargs) -> scalar.

  • troughs (array-like of int) – Indices marking the start of each beat (typically returned by PeakDetector). Requires at least two entries to form one beat.

  • signal (array-like) – Raw signal values for a single segment.

  • use_mean_beat (bool) – If True, resample every beat to mean_resample_size samples, average them into one mean beat, and apply sqi_func once. The single result is then replicated to produce one value per beat interval. If False, apply sqi_func independently to each beat.

  • mean_resample_size (int) – Number of samples to use when resampling beats (only relevant when use_mean_beat is True).

  • taper (bool, optional) – If True, apply taper_signal() to each beat before SQI calculation (default False).

  • **kwargs – Additional keyword arguments forwarded to sqi_func.

Returns:

One SQI value per beat interval (len(troughs) - 1 elements in the normal case). Returns [-np.inf] when fewer than two troughs are found or when no valid beats remain after filtering.

Return type:

list of float
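The two modes can be illustrated with a simplified, standard-library sketch. It is not the library implementation: per_beat and resample are hypothetical names, tapering is left out, beats are assumed to be the slices between consecutive troughs, and only empty slices are filtered.

```python
import math


def resample(beat, size):
    """Linearly interpolate beat onto size evenly spaced points."""
    if len(beat) == 1 or size == 1:
        return [float(beat[0])] * size
    out = []
    for i in range(size):
        pos = i * (len(beat) - 1) / (size - 1)
        left = int(math.floor(pos))
        right = min(left + 1, len(beat) - 1)
        frac = pos - left
        out.append(beat[left] * (1 - frac) + beat[right] * frac)
    return out


def per_beat(sqi_func, troughs, signal, use_mean_beat, mean_resample_size):
    """Apply sqi_func per beat, or once to the averaged beat."""
    if len(troughs) < 2:
        return [-math.inf]
    beats = [signal[a:b] for a, b in zip(troughs[:-1], troughs[1:]) if b > a]
    if not beats:
        return [-math.inf]
    if use_mean_beat:
        resampled = [resample(b, mean_resample_size) for b in beats]
        mean_beat = [sum(col) / len(col) for col in zip(*resampled)]
        return [sqi_func(mean_beat)] * len(beats)  # one value, replicated
    return [sqi_func(b) for b in beats]


def amplitude(beat):
    """Toy SQI: peak-to-trough amplitude of a beat."""
    return max(beat) - min(beat)


signal = [0, 1, 2, 1, 0, 1, 3, 1, 0]
troughs = [0, 4, 8]
individual = per_beat(amplitude, troughs, signal, False, 4)
averaged = per_beat(amplitude, troughs, signal, True, 4)
too_few = per_beat(amplitude, [0], signal, False, 4)
```

In this toy signal the two beats have amplitudes 2 and 3, so the per-beat mode returns both values while the mean-beat mode returns the amplitude of the averaged beat twice.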