Source code for vital_sqi.common.rpeak_detection

"""R peak detection approaches for PPG and ECG."""

import numpy as np
from sklearn.cluster import KMeans
from scipy import signal
import warnings
import logging
from vital_sqi.common.band_filter import BandpassFilter
from vitalDSP.physiological_features.waveform import WaveformMorphology

# Set up logging configuration
logging.basicConfig(
    level=logging.ERROR, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Detection method constants — PPG
ADAPTIVE_THRESHOLD = 1
COUNT_ORIG_METHOD = 2
CLUSTERER_METHOD = 3
SLOPE_SUM_METHOD = 4
MOVING_AVERAGE_METHOD = 5
DEFAULT = 6
BILLAUER_METHOD = 7
AMPD_METHOD = 8          # Automatic Multiscale Peak Detection (PPG)
LOCAL_MAX_IBI = 9        # Local-max + IBI tracking (PPG)

# Detection method constants — ECG (used by ecg_detector)
ECG_DEFAULT = 10         # vitalDSP WaveformMorphology (kept as gold standard)
PAN_TOMPKINS = 11        # Classic Pan-Tompkins (1985)
HAMILTON = 12            # Hamilton-Tompkins simplified (2002)
ENGZEE = 13              # Engzee-Zeelenberg (1979)


[docs] class PeakDetector: """ Detects peaks in PPG and ECG signals using various algorithms. Parameters ---------- wave_type : str, optional The type of waveform to detect peaks from, either 'PPG' or 'ECG' (default is 'PPG'). fs : int, optional Sampling frequency of the signal (default is 100). Examples -------- >>> detector = PeakDetector(wave_type="PPG", fs=100) >>> signal = np.random.randn(1000) >>> peaks, troughs = detector.ppg_detector(signal) # DEFAULT (vitalDSP) >>> peaks, troughs = detector.ppg_detector(signal, detector_type=ADAPTIVE_THRESHOLD) >>> r, q, s, p, t = PeakDetector(wave_type="ECG", fs=256).ecg_detector(signal) """ def __init__(self, wave_type="PPG", fs=100): if wave_type not in ["PPG", "ECG"]: raise ValueError("Invalid wave_type. Expected 'PPG' or 'ECG'.") self.wave_type = wave_type self.fs = fs
[docs] def ecg_detector(self, s, detector_type=ECG_DEFAULT, get_session=False): """ Detects R-peaks and critical points in ECG signals. The default path (ECG_DEFAULT) uses vitalDSP WaveformMorphology, which provides high-quality derivative-based R-peak detection plus full critical point extraction (Q/S/P/T). Alternative R-peak algorithms (PAN_TOMPKINS, HAMILTON, ENGZEE) also use WaveformMorphology for the critical-point step so that Q/S/P/T quality is never downgraded. Parameters ---------- s : array_like Input ECG signal. detector_type : int, optional R-peak detection algorithm. Available constants: * ``ECG_DEFAULT`` (10) — vitalDSP WaveformMorphology (recommended; also provides Q/S/P/T morphology points) * ``PAN_TOMPKINS`` (11) — classic Pan-Tompkins 1985 with adaptive dual-threshold; bandpass 5–15 Hz * ``HAMILTON`` (12) — Hamilton-Tompkins simplified 2002; single-pass derivative-square-integrate; bandpass 8–16 Hz * ``ENGZEE`` (13) — Engzee-Zeelenberg 1979; dynamic threshold on high-pass filtered derivative All alternatives use vitalDSP WaveformMorphology for Q/S/P/T extraction, anchored to the chosen R-peak indices. Default is ``ECG_DEFAULT``. get_session : bool, optional If True return (r_peaks, ecg_session) instead of the full tuple. Returns ------- tuple (r_peaks, q_valleys, s_valleys, p_peaks, t_peaks) normally, or (r_peaks, ecg_session) when get_session=True. """ valid = {ECG_DEFAULT, PAN_TOMPKINS, HAMILTON, ENGZEE} if detector_type not in valid: raise ValueError( f"Invalid ECG detector_type: {detector_type}. " f"Choose from ECG_DEFAULT={ECG_DEFAULT}, PAN_TOMPKINS={PAN_TOMPKINS}, " f"HAMILTON={HAMILTON}, ENGZEE={ENGZEE}." ) if len(s) == 0: raise ValueError("Input ECG signal is empty.") try: # Step 1 — detect R-peaks with the chosen algorithm if detector_type == PAN_TOMPKINS: r_peaks = self.detect_r_peaks_pan_tompkins(s) elif detector_type == HAMILTON: r_peaks = self.detect_r_peaks_hamilton(s) elif detector_type == ENGZEE: r_peaks = self.detect_r_peaks_engzee(s) else: # ECG_DEFAULT — vitalDSP WaveformMorphology wm = WaveformMorphology(s, signal_type="ECG", fs=self.fs) r_peaks = wm.r_peaks # Step 2 — extract critical points using vitalDSP (always) wm = WaveformMorphology(s, signal_type="ECG", fs=self.fs) # Override the WM r_peaks with those from our chosen detector so that # Q/S/P/T search windows are anchored to the correct peaks. wm.r_peaks = r_peaks q_valleys = wm.detect_q_valley(r_peaks) s_valleys = wm.detect_s_valley(r_peaks) p_peaks = wm.detect_p_peak(r_peaks, q_valleys) t_peaks = wm.detect_t_peak(r_peaks, s_valleys) if get_session: ecg_session = wm.detect_ecg_session( p_peaks=p_peaks, t_peaks=t_peaks ) return r_peaks, ecg_session return r_peaks, q_valleys, s_valleys, p_peaks, t_peaks except Exception as e: logging.error(f"ECG detection failed: {e}") return np.array([]), np.array([]), np.array([]), np.array([]), np.array([])
[docs] def ppg_detector( self, s, detector_type=DEFAULT, get_session=False, preprocess=False, cubing=False, ): """ Detects peaks in PPG signals using specified detector type. Parameters ---------- s : array_like Input PPG signal. detector_type : int, optional Method for peak detection (default is ``DEFAULT`` = 6, which uses vitalDSP WaveformMorphology systolic-peak detection). Available constants: * ``ADAPTIVE_THRESHOLD`` (1) — threshold adapts to local signal amplitude * ``COUNT_ORIG_METHOD`` (2) — count-based local maxima * ``CLUSTERER_METHOD`` (3) — KMeans clustering * ``SLOPE_SUM_METHOD`` (4) — Zong 2003 slope-sum onset * ``MOVING_AVERAGE_METHOD`` (5) — Elgendi two-moving-average * ``DEFAULT`` (6) — vitalDSP WaveformMorphology (recommended) * ``BILLAUER_METHOD`` (7) — Billauer peak/trough tracker * ``AMPD_METHOD`` (8) — Automatic Multiscale Peak Detection * ``LOCAL_MAX_IBI`` (9) — local-max with IBI tracking preprocess : bool, optional Whether to apply filtering to the signal (default is False). cubing : bool, optional Whether to cube the signal for enhanced peak detection (default is False). Returns ------- tuple Detected peaks and troughs. """ _ppg_methods = { ADAPTIVE_THRESHOLD, COUNT_ORIG_METHOD, CLUSTERER_METHOD, SLOPE_SUM_METHOD, MOVING_AVERAGE_METHOD, DEFAULT, BILLAUER_METHOD, AMPD_METHOD, LOCAL_MAX_IBI, } if detector_type not in _ppg_methods: raise ValueError(f"Invalid detector_type: {detector_type}") if len(s) == 0: raise ValueError("Input signal is empty.") if preprocess: bp_filter = BandpassFilter(fs=self.fs) s = bp_filter.signal_highpass_filter(s, cutoff=1, order=2) s = bp_filter.signal_lowpass_filter(s, cutoff=12, order=2) if cubing: s = s**3 if get_session: waveform = WaveformMorphology(s, signal_type="PPG", fs=self.fs) ppg_session = waveform.detect_ppg_session() return waveform.systolic_peaks, ppg_session if detector_type == ADAPTIVE_THRESHOLD: return self.detect_peak_trough_adaptive_threshold(s) elif detector_type == CLUSTERER_METHOD: return self.detect_peak_trough_clusterer(s) elif detector_type == SLOPE_SUM_METHOD: return self.detect_peak_trough_slope_sum(s) elif detector_type == MOVING_AVERAGE_METHOD: return self.detect_peak_trough_moving_average_threshold(s) elif detector_type == COUNT_ORIG_METHOD: return self.detect_peak_trough_count_orig(s) elif detector_type == BILLAUER_METHOD: return self.detect_peak_trough_billauer(s) elif detector_type == AMPD_METHOD: return self.detect_peak_trough_ampd(s) elif detector_type == LOCAL_MAX_IBI: return self.detect_peak_trough_local_max_ibi(s) else: # DEFAULT waveform = WaveformMorphology(s, signal_type="PPG", fs=self.fs) return waveform.systolic_peaks, waveform.detect_troughs()
[docs] def search_for_onset(self, slope_sum, n, local_max): """ Walk back from detection point n to find the pulse onset. Following Zong et al. 2003: onset is where slope_sum drops below 1% of local_max, up to 200 ms before the detection point. """ threshold = 0.01 * local_max lookback = int(0.2 * self.fs) for k in range(n, max(0, n - lookback), -1): if slope_sum[k] < threshold: return k return max(0, n - lookback)
[docs] def detect_peak_trough_slope_sum(self, s): """ Detect peaks and troughs in a signal using the slope sum method. Reference: Zong et al. (2003) Computers in Cardiology 30:259-262. Parameters ---------- s : array_like Input signal. Returns ------- tuple Detected peaks and troughs as lists of indices. """ fs = self.fs # use instance sampling rate window_size = max(1, int(0.128 * fs)) # 128 ms window slope_sum = [] onset_list = [] peak_finalist = [] trough_finalist = [] # Compute slope sum for each sample using a sliding window for n in range(window_size + 1, len(s)): Zk = sum(max(0, s[k] - s[k - 1]) for k in range(n - window_size, n)) slope_sum.append(Zk) slope_sum = np.array(slope_sum) if len(slope_sum) == 0: return np.array([]), np.array([]) # Establish adaptive threshold from first 10 s (or all samples if shorter) init_samples = min(10 * fs, len(slope_sum)) initial_threshold = 3 * np.mean(slope_sum[:init_samples]) threshold_base = initial_threshold # Identify threshold crossings and search for peaks and onsets for n, Z_value in enumerate(slope_sum): actual_threshold = threshold_base * 0.6 if Z_value > actual_threshold: left = max(0, n - 15) right = min(len(slope_sum), n + 15) local_min = np.min(slope_sum[left:right]) local_max = np.max(slope_sum[left:right]) if (local_max - local_min) > abs(local_min) * 2: onset = self.search_for_onset(slope_sum, n, local_max) onset_list.append(onset) # Update threshold as running average (not max) threshold_base = 0.875 * threshold_base + 0.125 * local_max onset_list = np.unique(onset_list) for i in range(1, len(onset_list)): left, right = onset_list[i - 1], onset_list[i] try: peak_finalist.append(int(np.argmax(s[left:right])) + left) trough_finalist.append(int(np.argmin(s[left:right])) + left) except ValueError as e: warnings.warn(f"Peak detection failed at index {i} due to {e}") return np.array(peak_finalist), np.array(trough_finalist)
[docs] def detect_peak_trough_count_orig(self, s): """ Detect peaks and troughs in a signal using local extrema and thresholding. Parameters ---------- s : array_like Input signal. Returns ------- tuple Detected peaks and troughs as numpy arrays. """ # Identify local extrema local_maxima = signal.argrelmax(s)[0] local_minima = signal.argrelmin(s)[0] # Define thresholds based on quantiles peak_threshold = np.quantile(s[local_maxima], 0.75) * 0.2 trough_threshold = np.quantile(s[local_minima], 0.25) * 0.2 # Filter extrema based on thresholds peak_shortlist = local_maxima[s[local_maxima] >= peak_threshold] trough_shortlist = local_minima[s[local_minima] <= trough_threshold] # Initialize lists for peaks and troughs peak_finalist = [] trough_finalist = [] # Traverse through troughs to find peaks left_trough = trough_shortlist[0] for right_trough in trough_shortlist[1:]: # Peaks between the current pair of troughs peaks_in_range = peak_shortlist[ (peak_shortlist > left_trough) & (peak_shortlist < right_trough) ] if len(peaks_in_range) == 0: # No peaks found between the troughs left_trough = ( left_trough if s[left_trough] < s[right_trough] else right_trough ) else: # Select the highest peak peak = peaks_in_range[np.argmax(s[peaks_in_range])] peak_finalist.append(peak) trough_finalist.append(left_trough) left_trough = right_trough # Add the final right trough trough_finalist.append(right_trough) return np.array(peak_finalist), np.array(trough_finalist)
[docs] def detect_peak_trough_adaptive_threshold( self, s, adaptive_size=0.75, overlap=0, sliding=1 ): """ Detect peaks and troughs in a signal using an adaptive threshold approach. Parameters ---------- s : array_like Input signal. adaptive_size : float, optional Window size for adaptive thresholding as a fraction of the sampling rate (default is 0.75). overlap : float, optional Overlapping ratio for the sliding window (default is 0). sliding : int, optional Step size for sliding window (default is 1). Returns ------- tuple Detected peaks and troughs. """ adaptive_window = int(adaptive_size * self.fs) adaptive_threshold = self.get_moving_average(s, int(adaptive_window * 2 + 1)) start_ROIs, end_ROIs = self.get_ROI(s, adaptive_threshold) if not start_ROIs: return np.array([]), np.array([]) # Detect peaks within the regions of interest peak_finalist = [ np.argmax(s[start_ROI : end_ROI + 1]) + start_ROI for start_ROI, end_ROI in zip(start_ROIs, end_ROIs) ] if len(peak_finalist) < 2: return np.array(peak_finalist), np.array([]) # Detect troughs between the peaks; skip degenerate (zero-length) intervals trough_finalist = [] for idx in range(len(peak_finalist) - 1): p1, p2 = peak_finalist[idx], peak_finalist[idx + 1] if p2 > p1: trough_finalist.append(int(np.argmin(s[p1:p2])) + p1) return np.array(peak_finalist), np.array(trough_finalist)
[docs] def get_ROI(self, s, adaptive_threshold, margin=0.1): """ Identify regions of interest (ROIs) in the signal where peaks or troughs are likely to occur. Parameters ---------- s : array_like Input signal. adaptive_threshold : array_like Adaptive threshold values for the signal. margin : float, optional Margin (fraction of the signal range) to include before and after the ROI (default is 0.1). Returns ------- tuple Two lists: start_ROIs and end_ROIs, which contain the start and end indices of the ROIs. Notes ----- - ROIs are defined as contiguous regions where the signal exceeds the adaptive threshold. - Margins can be added to widen the ROIs for more inclusive peak/trough detection. Example ------- >>> s = [0, 1, 3, 7, 5, 2, 0, 6, 9, 8, 4, 1, 0] >>> adaptive_threshold = [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4] >>> peak_detector = PeakDetection(fs=100) >>> start_ROIs, end_ROIs = peak_detector.get_ROI(s, adaptive_threshold) """ # Identify regions where the signal exceeds the adaptive threshold above_threshold = s > adaptive_threshold # Find the start and end indices of these regions start_ROIs = [] end_ROIs = [] for i in range(1, len(above_threshold)): # Transition from below threshold to above threshold: start of an ROI if above_threshold[i] and not above_threshold[i - 1]: start_ROIs.append(i) # Transition from above threshold to below threshold: end of an ROI if not above_threshold[i] and above_threshold[i - 1]: end_ROIs.append(i - 1) # Handle case where the signal ends above the threshold if above_threshold[-1]: end_ROIs.append(len(s) - 1) # Apply margin to widen the ROIs signal_length = len(s) start_ROIs = [ max(0, int(start - margin * signal_length)) for start in start_ROIs ] end_ROIs = [ min(signal_length - 1, int(end + margin * signal_length)) for end in end_ROIs ] return start_ROIs, end_ROIs
[docs] def detect_peak_trough_moving_average_threshold(self, s): """ Detect peaks using a two-moving-average threshold (Elgendi et al. 2013). Reference: Elgendi M. et al., PLoS ONE 8(10):e76585, 2013. A short MA (w1 ≈ 120 ms) rises above a long MA (w2 ≈ 670 ms) baseline to form "blocks of interest"; the raw-signal argmax inside each block is the peak. Parameters ---------- s : array_like Input signal. Returns ------- tuple Detected peaks and troughs. """ s = np.asarray(s, dtype=float) w1 = max(1, int(0.12 * self.fs)) # ~120 ms w2 = max(1, int(0.67 * self.fs)) # ~670 ms ma_peak = self.get_moving_average(s, w1) ma_beat = self.get_moving_average(s, w2) if len(ma_peak) != len(s): ma_peak = ma_peak[: len(s)] if len(ma_beat) != len(s): ma_beat = ma_beat[: len(s)] threshold = ma_beat + 0.02 * np.mean(s) above = ma_peak > threshold # Find rising and falling edges of "blocks of interest" above_int = above.astype(int) rising = np.where(np.diff(above_int) == 1)[0] + 1 falling = np.where(np.diff(above_int) == -1)[0] + 1 # Handle signal ending inside a block; clip to last valid index if above[-1]: falling = np.append(falling, len(s) - 1) # Align: drop unmatched edges if len(rising) == 0 or len(falling) == 0: return np.array([]), np.array([]) if falling[0] < rising[0]: falling = falling[1:] min_len = min(len(rising), len(falling)) rising, falling = rising[:min_len], falling[:min_len] # Raw-signal argmax inside each block → true peak index. # Require the peak value to exceed the mean threshold to reject degenerate # end-of-signal blocks where the entire block is below the signal mean. signal_mean = np.mean(s) peaks_list = [] for i in range(len(rising)): block = s[rising[i]:falling[i]] if len(block) == 0: continue peak_idx = rising[i] + int(np.argmax(block)) if s[peak_idx] > signal_mean: peaks_list.append(peak_idx) peaks = np.array(peaks_list) # Troughs: raw-signal argmin between consecutive peaks troughs = np.array([ peaks[i] + int(np.argmin(s[peaks[i]:peaks[i + 1]])) for i in range(len(peaks) - 1) ]) return peaks, troughs
[docs] def get_moving_average(self, q, w): """ Calculates the moving average of a sequence. Parameters ---------- q : array_like Input sequence. w : int Window size for the moving average. Returns ------- array_like Moving average of the sequence. """ if w <= 0: raise ValueError("Window size must be greater than 0.") try: q_padded = np.pad(q, (w // 2, w - 1 - w // 2), mode="edge") return np.convolve(q_padded, np.ones(w) / w, "valid") except Exception as e: logging.error(f"Moving average calculation failed: {e}") return np.array([])
[docs] def detect_peak_trough_clusterer(self, s): """ Detects peaks and troughs in a signal using a clustering technique. Parameters ---------- s : array_like Input signal. Returns ------- tuple Detected peaks and troughs. """ try: local_maxima = signal.argrelmax(s)[0] local_minima = signal.argrelmin(s)[0] def cluster_extrema(extrema): features = np.vstack([s[extrema], np.gradient(s)[extrema]]).T kmeans = KMeans(n_clusters=2, random_state=0, n_init=10) labels = kmeans.fit_predict(features) cluster = labels[np.argmax(s[extrema])] # Cluster with higher peak return extrema[labels == cluster] peaks = cluster_extrema(local_maxima) troughs = cluster_extrema(local_minima) return peaks, troughs except Exception as e: logging.error(f"Clustering-based peak/trough detection failed: {e}") return np.array([]), np.array([])
[docs] def detect_peak_trough_DEFAULT(self, s): """ Detects peaks and troughs using SciPy's `find_peaks` function. Parameters ---------- s : array_like Input signal. Returns ------- tuple Detected peaks and troughs. """ try: peaks = signal.find_peaks(s)[0] troughs = [ np.argmin(s[peaks[i] : peaks[i + 1]]) + peaks[i] for i in range(len(peaks) - 1) ] return peaks, troughs except Exception as e: logging.error(f"SciPy peak/trough detection failed: {e}") return np.array([]), np.array([])
[docs] def detect_peak_trough_billauer(self, s, delta=0.8): """ Billauer's method for peak and trough detection, translated from MATLAB. Parameters ---------- s : array_like Input signal. delta : float, optional Minimum difference required to consider a peak (default is 0.8). Returns ------- tuple Detected peaks and troughs. """ # Billauer peakdet — records the index of the actual max/min, not the # detection crossing point. Reference: Billauer (2005) peakdet.m. maxtab, mintab = [], [] mn, mx = np.inf, -np.inf mnpos, mxpos = 0, 0 look_for_max = True for i in range(len(s)): if s[i] > mx: mx, mxpos = s[i], i if s[i] < mn: mn, mnpos = s[i], i if look_for_max: if s[i] < mx - delta: maxtab.append(mxpos) # true peak index mn, mnpos = s[i], i look_for_max = False else: if s[i] > mn + delta: mintab.append(mnpos) # true trough index mx, mxpos = s[i], i look_for_max = True return np.array(maxtab), np.array(mintab)
# ========================================================================= # ECG detectors — Phase 2 # =========================================================================
[docs] def detect_r_peaks_pan_tompkins(self, s): """ Pan-Tompkins (1985) R-peak detector. Pipeline: bandpass → derivative → square → moving-window integration → adaptive threshold with two learning periods. Reference: Pan J, Tompkins WJ. IEEE Trans Biomed Eng. 1985;32(3):230-6. Parameters ---------- s : array_like Input ECG signal. Returns ------- np.ndarray Indices of detected R-peaks in the original signal. """ s = np.asarray(s, dtype=float) fs = self.fs # --- 1. Bandpass filter (5–15 Hz passband per Pan-Tompkins) ---------- try: bp = BandpassFilter(fs=fs) s_filt = bp.signal_highpass_filter(s, cutoff=5, order=2) s_filt = bp.signal_lowpass_filter(s_filt, cutoff=15, order=2) except Exception: s_filt = s.copy() # --- 2. Derivative (5-point) ----------------------------------------- # y[n] = (1/8T)(-x[n-2] - 2x[n-1] + 2x[n+1] + x[n+2]) d = np.zeros_like(s_filt) for n in range(2, len(s_filt) - 2): d[n] = (1.0 / (8.0 / fs)) * ( -s_filt[n - 2] - 2 * s_filt[n - 1] + 2 * s_filt[n + 1] + s_filt[n + 2] ) # --- 3. Squaring -------------------------------------------------------- sq = d ** 2 # --- 4. Moving-window integration (150 ms) --------------------------- win = max(1, int(0.150 * fs)) mwi = np.convolve(sq, np.ones(win) / win, mode="same") # --- 5. Adaptive threshold (two-pass) -------------------------------- # Learning period 1: first 2 s for initial threshold estimates learn_n = int(2 * fs) init = mwi[:min(learn_n, len(mwi))] spki = np.max(init) if len(init) > 0 else 1.0 # signal peak estimate npki = np.mean(init) if len(init) > 0 else 0.0 # noise peak estimate threshold_i1 = npki + 0.25 * (spki - npki) min_rr = int(0.2 * fs) # 200 ms refractory period peaks_i = [] last_peak = -min_rr for i in range(len(mwi)): if i - last_peak < min_rr: continue # Local maximum check in ±win/2 neighbourhood lo = max(0, i - win // 2) hi = min(len(mwi), i + win // 2 + 1) if mwi[i] != np.max(mwi[lo:hi]): continue if mwi[i] >= threshold_i1: peaks_i.append(i) spki = 0.125 * mwi[i] + 0.875 * spki last_peak = i else: npki = 0.125 * mwi[i] + 0.875 * npki threshold_i1 = npki + 0.25 * (spki - npki) if len(peaks_i) == 0: return np.array([]) # --- 6. Back-project to original signal: search ±60 ms for true R --- search = int(0.060 * fs) r_peaks = [] for p in peaks_i: lo = max(0, p - search) hi = min(len(s), p + search + 1) r_peaks.append(lo + int(np.argmax(s[lo:hi]))) return np.array(r_peaks)
[docs] def detect_r_peaks_hamilton(self, s): """ Hamilton-Tompkins simplified ECG R-peak detector (2002). Uses a single-pass derivative-square-integrate pipeline with fixed refractory period and mean-based threshold, suitable for real-time use. Reference: Hamilton PS. "Open source ECG analysis." Computers in Cardiology 2002;29:101-104. Parameters ---------- s : array_like Input ECG signal. Returns ------- np.ndarray Indices of detected R-peaks. """ s = np.asarray(s, dtype=float) fs = self.fs # Bandpass 8–16 Hz (Hamilton's recommended range) try: bp = BandpassFilter(fs=fs) s_filt = bp.signal_highpass_filter(s, cutoff=8, order=2) s_filt = bp.signal_lowpass_filter(s_filt, cutoff=16, order=2) except Exception: s_filt = s.copy() # First difference + abs (simplified derivative) d = np.abs(np.diff(s_filt, prepend=s_filt[0])) # Moving average (80 ms) win = max(1, int(0.080 * fs)) ma = np.convolve(d, np.ones(win) / win, mode="same") # Fixed threshold: mean of signal + 0.5 * std threshold = np.mean(ma) + 0.5 * np.std(ma) min_rr = int(0.25 * fs) # 250 ms refractory # Peak detection on integrated signal candidate_peaks = signal.find_peaks(ma, height=threshold, distance=min_rr)[0] if len(candidate_peaks) == 0: return np.array([]) # Back-project to raw signal within ±50 ms search = int(0.050 * fs) r_peaks = [] for p in candidate_peaks: lo = max(0, p - search) hi = min(len(s), p + search + 1) r_peaks.append(lo + int(np.argmax(s[lo:hi]))) return np.array(r_peaks)
[docs] def detect_r_peaks_engzee(self, s): """ Engzee-Zeelenberg ECG R-peak detector (1979), as described in: Engzee R, Zeelenberg C. "A single scan algorithm for QRS-detection and feature extraction." Computers in Cardiology 1979;6:37-42. Uses the absolute first difference of a high-pass filtered signal with a dynamic threshold that adapts every detected beat. Parameters ---------- s : array_like Input ECG signal. Returns ------- np.ndarray Indices of detected R-peaks. """ s = np.asarray(s, dtype=float) fs = self.fs # High-pass filter to remove baseline wander try: bp = BandpassFilter(fs=fs) s_hp = bp.signal_highpass_filter(s, cutoff=0.5, order=2) except Exception: s_hp = s.copy() # Engzee: use the difference signal (approximation of derivative) diff = np.abs(np.diff(s_hp, prepend=s_hp[0])) # Initial threshold from first 2 s of signal init_n = int(2 * fs) threshold = 0.6 * np.max(diff[:min(init_n, len(diff))]) min_rr = int(0.2 * fs) peaks = [] last = -min_rr i = 0 while i < len(diff): if i - last < min_rr: i += 1 continue if diff[i] >= threshold: # Search window: find peak within next 50 ms win_end = min(len(s), i + int(0.050 * fs)) local_peak = i + int(np.argmax(s[i:win_end])) peaks.append(local_peak) # Update threshold: 60% of detected peak value in diff threshold = 0.6 * np.max(diff[max(0, i - int(0.2 * fs)):i + 1]) last = local_peak i = local_peak + 1 else: i += 1 return np.array(peaks)
# ========================================================================= # PPG detectors — Phase 2 # =========================================================================
[docs] def detect_peak_trough_ampd(self, s): """ Automatic Multiscale Peak Detection (AMPD) for PPG signals. AMPD finds peaks without requiring pre-set parameters by computing a Local Maxima Scalogram (LMS) across all window scales and identifying the scale with the fewest maxima (optimal scale), then intersecting maxima across all scales up to that point. Reference: Scholkmann F et al. "An Efficient Algorithm for Automatic Peak Detection in Noisy Periodic and Quasi-Periodic Signals." Algorithms 2012;5(4):588-603. Parameters ---------- s : array_like Input PPG signal. Returns ------- tuple (peaks, troughs) as numpy arrays of indices. """ s = np.asarray(s, dtype=float) N = len(s) L = N // 2 # maximum scale if L < 2: return np.array([]), np.array([]) # Build Local Maxima Scalogram: lms[k, i] = 1 if s[i] is local max at scale k # Use a compact boolean matrix; row k corresponds to window half-width k+1 lms = np.zeros((L - 1, N), dtype=bool) for k in range(1, L): lms[k - 1, k: N - k] = ( (s[k: N - k] > s[: N - 2 * k]) & (s[k: N - k] > s[2 * k:]) ) # Row counts: how many samples each scale treats as a local maximum. # The optimal scale lambda_opt minimises the row count in the physiological # range — it is the scale closest to the true half-period of the signal. # At this scale the LMS has approximately the correct number of peaks. row_counts = lms.sum(axis=1) if not np.any(row_counts > 0): return np.array([]), np.array([]) # Restrict search to scales 1..2*fs (prevents choosing the degenerate # large-scale minimum where only 1 global maximum survives). max_scale = min(L - 2, int(2 * self.fs)) # Among scales with more than 1 peak, find the minimum-count scale. search = row_counts[:max_scale + 1] valid = search > 1 # must find at least 2 peaks to be useful if not np.any(valid): valid = search > 0 candidates = np.where(valid)[0] lambda_opt = candidates[np.argmin(search[candidates])] # Use the optimal scale's local maxima directly as peaks. # This is equivalent to the Scholkmann original — the row with minimum # count has precisely the right number of beats. peaks = np.where(lms[lambda_opt, :])[0] # Refine: for each detected peak, find the true maximum of the raw signal # within ±(lambda_opt//4) samples to snap to the exact waveform peak. refine_w = max(1, lambda_opt // 4) refined = [] for p in peaks: lo = max(0, p - refine_w) hi = min(N, p + refine_w + 1) refined.append(lo + int(np.argmax(s[lo:hi]))) peaks = np.unique(np.array(refined)) if len(peaks) == 0: return np.array([]), np.array([]) # Troughs: argmin between consecutive peaks troughs = np.array([ peaks[i] + int(np.argmin(s[peaks[i]:peaks[i + 1]])) for i in range(len(peaks) - 1) ]) return peaks, troughs
[docs] def detect_peak_trough_local_max_ibi(self, s): """ Local-maximum PPG detector with inter-beat-interval (IBI) tracking. Finds all local maxima above a dynamic threshold, then prunes false positives by enforcing a physiologically plausible IBI window derived from the running median IBI (40–200 BPM, i.e. 300–1500 ms). This is particularly robust to low-amplitude diastolic humps that confuse simpler detectors. Parameters ---------- s : array_like Input PPG signal. Returns ------- tuple (peaks, troughs) as numpy arrays of indices. """ s = np.asarray(s, dtype=float) fs = self.fs # Initial candidates: all local maxima with order ~50 ms candidates = signal.argrelmax(s, order=max(1, int(0.05 * fs)))[0] if len(candidates) == 0: return np.array([]), np.array([]) # Height threshold: above the signal mean (rejects sub-mean noise bumps # while passing all true systolic peaks regardless of waveform offset). amp_threshold = np.mean(s) candidates = candidates[s[candidates] >= amp_threshold] if len(candidates) == 0: return np.array([]), np.array([]) # Enforce minimum refractory (300 ms = 200 BPM upper limit) min_rr = int(0.300 * fs) # Enforce maximum IBI (1500 ms = 40 BPM lower limit) max_rr = int(1.500 * fs) # Greedy forward scan with running-median IBI adaptation peaks = [candidates[0]] for c in candidates[1:]: rr = c - peaks[-1] if rr < min_rr: # Keep the higher of the two close candidates if s[c] > s[peaks[-1]]: peaks[-1] = c continue if rr > max_rr and len(peaks) >= 3: # Large gap — check if a beat was missed (insert midpoint search) median_rr = int(np.median(np.diff(peaks[-3:]))) mid = peaks[-1] + median_rr if mid < c: # Search ±15% of median_rr around expected position search = int(0.15 * median_rr) lo = max(0, mid - search) hi = min(len(s), mid + search + 1) insert_idx = lo + int(np.argmax(s[lo:hi])) if s[insert_idx] >= amp_threshold: peaks.append(insert_idx) peaks.append(c) peaks = np.array(peaks) # Troughs: argmin between consecutive peaks troughs = np.array([ peaks[i] + int(np.argmin(s[peaks[i]:peaks[i + 1]])) for i in range(len(peaks) - 1) ]) return peaks, troughs