Source code for vital_sqi.sqi.rpeaks_sqi

"""Signal Quality Indexes based on R peak detection.
This module includes various R peak detection-based metrics for evaluating ECG and PPG signals.
Metrics:
- Ratio of ectopic beats
- Correlogram
- MSQ: Consistency evaluation between two R-peak detectors
- Interpolation-based SQI
"""

import numpy as np
from vital_sqi.common.utils import HiddenPrints

# from hrvanalysis.preprocessing import (
#     remove_outliers,
#     remove_ectopic_beats,
#     interpolate_nan_values,
# )
import warnings
from statsmodels.tsa.stattools import acf
from vital_sqi.common.rpeak_detection import PeakDetector
from scipy import signal
from vitalDSP.transforms.beats_transformation import RRTransformation


[docs] def ectopic_sqi( s, rule_index=1, sample_rate=100, rpeak_detector=6, wave_type="PPG", low_rri=300, high_rri=2000, ): """ Compute the ratio of ectopic (invalid) beats in a signal. The function first removes out-of-range RR intervals (outliers), then applies an interpolation-based ectopic detection method to the remaining intervals. Parameters ---------- s : array-like Raw waveform signal (PPG or ECG). rule_index : int, optional Controls which step of ectopic analysis to perform: * ``0`` — return the outlier ratio only (no ectopic removal) * ``1`` — apply *adaptive* ectopic removal after outlier filtering * ``2`` — apply *linear* ectopic removal after outlier filtering * ``3`` — apply *spline* ectopic removal after outlier filtering Default is ``1``. sample_rate : int, optional Sampling frequency in Hz (default ``100``). rpeak_detector : int, optional Peak detector index used by ``RRTransformation`` (default ``6``). This parameter is passed through but the actual detector is managed internally by ``vitalDSP``. wave_type : str, optional ``'PPG'`` or ``'ECG'`` (default ``'PPG'``). low_rri : int, optional Minimum acceptable RR interval in milliseconds (default ``300``). high_rri : int, optional Maximum acceptable RR interval in milliseconds (default ``2000``). Returns ------- float A ratio in ``[0, 1]`` where ``0`` means no ectopic/outlier beats and ``1`` means all beats are invalid. Returns ``np.nan`` on error. """ rules = ["adaptive", "linear", "spline"] try: # Validate rule_index if rule_index not in range(0, len(rules) + 1): raise ValueError( f"Invalid rule_index: {rule_index}. Must be between 0 and {len(rules)}." ) # Initialize RRTransformation for the signal with HiddenPrints(): transformer = RRTransformation( signal=s, fs=sample_rate, signal_type=wave_type ) rr_intervals = transformer.compute_rr_intervals() if len(rr_intervals) < 2: raise ValueError("Insufficient RR intervals for analysis.") # Remove outliers based on RRI bounds and calculate outlier ratio rr_intervals_cleaned = transformer.remove_invalid_rr_intervals( rr_intervals, min_rr=low_rri / 1000, max_rr=high_rri / 1000 ) number_outliers = np.isnan(rr_intervals_cleaned).sum() total_rr_intervals = len(rr_intervals_cleaned) outlier_ratio = number_outliers / max(total_rr_intervals, 1) if rule_index == 0: return outlier_ratio # Impute invalid intervals for ectopic beat removal interpolated_rr_intervals = transformer.impute_rr_intervals( rr_intervals_cleaned ) selected_rule = rules[rule_index - 1] nn_intervals = remove_ectopic_beats( interpolated_rr_intervals, method=selected_rule ) number_ectopics = np.isnan(nn_intervals).sum() ectopic_ratio = number_ectopics / max(len(nn_intervals), 1) return ectopic_ratio except Exception as e: warnings.warn(f"Unexpected error in ectopic_sqi: {e}") return np.nan
[docs] def remove_ectopic_beats(rr_intervals, method="adaptive"): """ Mark ectopic beats in an RR-interval array as NaN. Parameters ---------- rr_intervals : np.ndarray RR intervals in seconds, possibly containing NaN values from a prior outlier-removal step. method : str, optional Detection algorithm. One of: * ``'adaptive'`` — compares each interval to a 5-point running mean; deviations > 20 % are flagged. * ``'linear'`` — fits a linear trend to valid intervals; deviations > 15 % are flagged. * ``'spline'`` — fits a smoothing spline; deviations > 10 % are flagged. Default is ``'adaptive'``. Returns ------- np.ndarray Copy of *rr_intervals* with ectopic positions set to ``NaN``. If an error occurs the original array is returned unchanged. Example ------- >>> import numpy as np >>> from vital_sqi.sqi.rpeaks_sqi import remove_ectopic_beats >>> rr = np.array([0.8, 1.2, 1.0, 2.5, 0.9, 0.85]) >>> clean = remove_ectopic_beats(rr, method="adaptive") """ try: if method not in ["adaptive", "linear", "spline"]: raise ValueError( f"Invalid method: {method}. Choose 'adaptive', 'linear', or 'spline'." ) rr_intervals_cleaned = np.copy(rr_intervals) valid_intervals = rr_intervals_cleaned[~np.isnan(rr_intervals_cleaned)] if len(valid_intervals) < 3: raise ValueError("Not enough valid RR intervals for ectopic detection.") if method == "adaptive": # Detect ectopic beats using a running mean and threshold running_mean = np.convolve(valid_intervals, np.ones(5) / 5, mode="same") deviations = np.abs(valid_intervals - running_mean) threshold = 0.2 * running_mean # 20% deviation considered ectopic ectopic_mask = deviations > threshold rr_intervals_cleaned[~np.isnan(rr_intervals_cleaned)] = np.where( ectopic_mask, np.nan, valid_intervals ) elif method == "linear": # Use linear interpolation to fit a trend and remove ectopic beats linear_fit = np.polyval( np.polyfit(np.arange(len(valid_intervals)), valid_intervals, 1), np.arange(len(valid_intervals)), ) deviations = np.abs(valid_intervals - linear_fit) threshold = 0.15 * linear_fit # 15% deviation considered ectopic ectopic_mask = deviations > threshold rr_intervals_cleaned[~np.isnan(rr_intervals_cleaned)] = np.where( ectopic_mask, np.nan, valid_intervals ) elif method == "spline": # Use spline fitting to detect and remove ectopic beats from scipy.interpolate import UnivariateSpline spline = UnivariateSpline( np.arange(len(valid_intervals)), valid_intervals, s=0.1 ) spline_fit = spline(np.arange(len(valid_intervals))) deviations = np.abs(valid_intervals - spline_fit) threshold = 0.1 * spline_fit # 10% deviation considered ectopic ectopic_mask = deviations > threshold rr_intervals_cleaned[~np.isnan(rr_intervals_cleaned)] = np.where( ectopic_mask, np.nan, valid_intervals ) return rr_intervals_cleaned except Exception as e: warnings.warn(f"Error in remove_ectopic_beats: {e}") return rr_intervals
[docs] def correlogram_sqi(s, sample_rate=100, wave_type="PPG", time_lag=3, n_selection=3): """ Compute the Correlogram SQI from the autocorrelation function (ACF). The ACF is computed up to *time_lag* seconds. The top *n_selection* ACF peaks are selected and their mean is returned as a single quality score. A high score (close to 1) indicates a highly periodic signal; a low score indicates poor periodicity or noise. Parameters ---------- s : array-like Raw waveform signal (PPG or ECG). sample_rate : int, optional Sampling frequency in Hz (default ``100``). wave_type : str, optional ``'PPG'`` or ``'ECG'`` (default ``'PPG'``). Currently unused but retained for API consistency with other SQI functions. time_lag : int, optional Duration in seconds over which ACF is computed (default ``3``). The signal must be at least ``time_lag * sample_rate`` samples long. n_selection : int, optional Number of top ACF peaks to average (default ``3``). Returns ------- float Mean ACF value of the top *n_selection* peaks, in ``[-1, 1]``. Returns ``np.nan`` if the signal is too short, no peaks are found, or an error occurs. """ try: nlags = time_lag * sample_rate if len(s) < nlags: raise ValueError("Signal length is too short for the specified time lag.") # Compute autocorrelation corr = acf(s, nlags=nlags, fft=True) # Find peaks in the autocorrelation function corr_peaks_idx = signal.find_peaks(corr)[0] corr_peaks_value = corr[corr_peaks_idx] if len(corr_peaks_idx) == 0: warnings.warn("No peaks detected in the autocorrelation function.") return np.nan # Select top peaks based on autocorrelation values top_values = np.sort(corr_peaks_value)[ -n_selection: ] # Select top `n_selection` values # Compute SQI as the mean of the top peak values sqi_value = np.mean(top_values) return sqi_value except Exception as e: warnings.warn(f"Error in correlogram_sqi: {e}") return np.nan
[docs] def interpolation_sqi(s): """ Interpolation-based SQI to assess consistency in R-R intervals. Parameters ---------- s : array-like Input signal. Returns ------- float Interpolation SQI value. """ # Placeholder, no clear implementation available return 0.0
[docs] def msq_sqi(s, peak_detector_1=7, peak_detector_2=6, wave_type="PPG"): """ Computes the Modified Signal Quality (MSQ) SQI based on agreement between two R-peak detectors. This SQI is used to evaluate the consistency of peaks detected by different algorithms. Parameters ---------- s : array-like Input signal. peak_detector_1 : int, optional Type of the primary peak detection algorithm. Default is 7 (Billauer). peak_detector_2 : int, optional Type of the secondary peak detection algorithm. Default is 6 (Scipy). wave_type : str, optional Type of signal, either 'PPG' or 'ECG'. Default is 'PPG'. Returns ------- float MSQ SQI value indicating consistency between detectors. """ if not isinstance(s, (np.ndarray, list)) or len(s) == 0: warnings.warn("Input signal is empty or invalid.") return np.nan if wave_type == "ECG": warnings.warn( "msq_sqi for ECG currently uses two PPG-style detectors as a proxy. " "For a meaningful ECG MSQ, two distinct detectors (e.g. Pan-Tompkins " "and Hamilton) are needed. Returning NaN until Phase-2 ECG detectors " "are available.", RuntimeWarning, stacklevel=2, ) return np.nan try: detector = PeakDetector(wave_type=wave_type) # Both wave types use ppg_detector with two different algorithm indices # because ecg_detector does not expose a detector_type parameter. peaks_1, _ = detector.ppg_detector(s, detector_type=peak_detector_1) peaks_2, _ = detector.ppg_detector( s, detector_type=peak_detector_2, preprocess=False ) # Check if either detector found no peaks if len(peaks_1) == 0 or len(peaks_2) == 0: warnings.warn("No peaks detected by one or both detectors.") return 0.0 # Calculate the percentage of matching peaks between the two detectors intersection_peaks = np.intersect1d(peaks_1, peaks_2) peak1_dom = len(intersection_peaks) / max(len(peaks_1), 1) peak2_dom = len(intersection_peaks) / max(len(peaks_2), 1) # Return minimum of two ratios for robustness return min(peak1_dom, peak2_dom) except Exception as e: warnings.warn(f"Error in msq_sqi: {e}") return np.nan
[docs] def amplitude_consistency_sqi(s, sample_rate=100, wave_type="PPG", peak_detector=6): """ Measure beat-to-beat amplitude variability as a signal quality indicator. High beat-to-beat variability in peak amplitude indicates motion artifact or poor electrode contact. The metric is the coefficient of variation (std / mean) of detected peak amplitudes. A clean signal has low CV (< 0.1); an artifact-corrupted signal has high CV (> 0.3). Parameters ---------- s : array_like Raw waveform signal (PPG or ECG). sample_rate : int, optional Sampling frequency in Hz (default ``100``). wave_type : str, optional ``'PPG'`` or ``'ECG'`` (default ``'PPG'``). peak_detector : int, optional Peak detector index (default ``6``). Returns ------- float Coefficient of variation of peak amplitudes, or ``np.nan`` if fewer than 2 peaks are detected. """ try: detector = PeakDetector(wave_type=wave_type) if wave_type == "PPG": peaks, _ = detector.ppg_detector(s, detector_type=peak_detector) else: peaks, _ = detector.ecg_detector(s) s = np.asarray(s, dtype=float) if len(peaks) < 2: warnings.warn("amplitude_consistency_sqi: fewer than 2 peaks detected.") return np.nan amplitudes = s[np.asarray(peaks, dtype=int)] mean_amp = np.mean(amplitudes) if mean_amp == 0: return np.nan return float(np.std(amplitudes, ddof=1) / np.abs(mean_amp)) except Exception as e: warnings.warn(f"Error in amplitude_consistency_sqi: {e}") return np.nan
# from vitalDSP.utils.synthesize_data import generate_ecg_signal # if __name__ == "__main__": # sfecg = 256 # N = 100 # Anoise = 0.05 # hrmean = 70 # ecg_signal = generate_ecg_signal( # sfecg=sfecg, N=N, Anoise=Anoise, hrmean=hrmean # ) # # result_correlogram = correlogram_sqi(ecg_signal, sample_rate=sfecg, wave_type="ECG") # # print(result_correlogram) # result_ectopic = ectopic_sqi(ecg_signal, sample_rate=sfecg, wave_type="ECG") # print(result_ectopic)