Source code for vital_sqi.sqi.waveform_sqi

"""
Implementation of waveform-based SQIs (Signal Quality Indices):
- For ECG based on DiMarco2012.
- For PPG (to be extended).
"""

import warnings
import scipy.signal as sn
import numpy as np
from vitalDSP.physiological_features.waveform import WaveformMorphology


[docs] def band_energy_sqi(signal, sampling_rate=100, band=None, nperseg=2048): """ Compute the peak value of the time marginal of the energy distribution in a frequency band. Parameters ---------- signal : array-like The input signal. sampling_rate : int, optional Sampling rate of the signal. Default is 100 Hz. band : list, optional Frequency band [low, high]. If None, the entire spectrum is used. Default is None. nperseg : int, optional Length of each segment for the Short-Time Fourier Transform. Default is 2048. Returns ------- float Maximum time marginal power in the specified frequency band. Raises ------ AssertionError If the band is not a valid list or has invalid values. Example ------- >>> import numpy as np >>> from vital_sqi.sqi.waveform_sqi import band_energy_sqi >>> signal = np.sin(2 * np.pi * 0.01 * np.arange(1000)) >>> band_energy_sqi(signal, sampling_rate=100, band=[0.1, 0.5]) 0.3141592653589793 """ assert np.isreal(sampling_rate), "Expected a numeric sampling rate value." if len(signal) < nperseg: nperseg = len(signal) f, t, spec = sn.stft( signal, fs=sampling_rate, window="hann", nperseg=nperseg, noverlap=(nperseg // 2), detrend=False, return_onesided=True, boundary="zeros", padded=True, ) if band is None: max_time_marginal = float(np.max(np.sum(np.abs(spec), axis=0))) else: assert isinstance(band, list) and band[0] <= band[1], "Invalid band values." idx = np.where((f > band[0]) & (f <= band[1]))[0] if len(idx) == 0: return 0.0 max_time_marginal = float(np.max(np.sum(np.abs(spec[idx]), axis=0))) return max_time_marginal
[docs] def lf_energy_sqi(signal, sampling_rate, band=[0, 0.5]): """ Low-Frequency Energy SQI. Parameters ---------- signal : array-like The input signal. sampling_rate : int Sampling rate of the signal. band : list, optional Frequency band. Default is [0, 0.5]. Returns ------- float Low-frequency energy SQI. """ return band_energy_sqi(signal, sampling_rate, band)
[docs] def qrs_energy_sqi(signal, sampling_rate, band=[5, 25]): """ QRS Energy SQI. Parameters ---------- signal : array-like The input signal. sampling_rate : int Sampling rate of the signal. band : list, optional Frequency band. Default is [5, 25]. Returns ------- float QRS energy SQI. """ return band_energy_sqi(signal, sampling_rate, band)
[docs] def hf_energy_sqi(signal, sampling_rate, band=None): """ High-Frequency Energy SQI. Parameters ---------- signal : array-like The input signal. sampling_rate : int Sampling rate of the signal. band : list, optional Frequency band [low_hz, high_hz]. Defaults to ``[100, np.inf]``. For signals sampled at 100 Hz (Nyquist = 50 Hz) this band is above Nyquist and will always return 0; pass an appropriate band for the actual sampling rate (e.g. ``[20, 50]`` for 100 Hz data). Returns ------- float High-frequency energy SQI. """ if band is None: band = [100, np.inf] nyquist = sampling_rate / 2.0 if band[0] >= nyquist: warnings.warn( f"hf_energy_sqi: band lower bound ({band[0]} Hz) is at or above Nyquist " f"({nyquist} Hz) for sampling_rate={sampling_rate}. " "Returning NaN. Pass a band appropriate for this sampling rate.", UserWarning, stacklevel=2, ) return np.nan return band_energy_sqi(signal, sampling_rate, band)
[docs] def vhf_norm_power_sqi(signal, sampling_rate, band=None, nperseg=2048): """ Very High-Frequency Normalized Power SQI. Parameters ---------- signal : array-like The input signal. sampling_rate : int Sampling rate of the signal. band : list, optional Frequency band [low_hz, high_hz]. Defaults to ``[150, np.inf]``. For signals sampled at 100 Hz (Nyquist = 50 Hz) this band is above Nyquist and will return NaN; pass a band within the Nyquist limit (e.g. ``[30, 50]`` for 100 Hz data). nperseg : int, optional Length of each segment for the Short-Time Fourier Transform. Default is 2048. Returns ------- float Normalized power in the very high-frequency band, or NaN if the band contains no STFT bins. """ if band is None: band = [150, np.inf] nyquist = sampling_rate / 2.0 if band[0] >= nyquist: warnings.warn( f"vhf_norm_power_sqi: band lower bound ({band[0]} Hz) is at or above " f"Nyquist ({nyquist} Hz) for sampling_rate={sampling_rate}. " "Returning NaN. Pass a band appropriate for this sampling rate.", UserWarning, stacklevel=2, ) return np.nan if len(signal) < nperseg: nperseg = len(signal) f, t, spec = sn.stft( signal, fs=sampling_rate, window="hann", nperseg=nperseg, noverlap=(nperseg // 2), detrend=False, return_onesided=True, boundary="zeros", padded=True, ) idx = np.where((f > band[0]) & (f <= band[1]))[0] if len(idx) == 0: warnings.warn( f"vhf_norm_power_sqi: band {band} is above Nyquist ({sampling_rate / 2} Hz). " "Returning NaN." ) return np.nan freq_marginal = np.sum(np.abs(spec[idx]), axis=0) peak = max(freq_marginal) if peak == 0: return np.nan np_vhf = float(np.median(freq_marginal) / peak) return np_vhf
[docs] def qrs_a_sqi(signal, sampling_rate): """ QRS Amplitude SQI. Parameters ---------- signal : array-like The input ECG signal. sampling_rate : int Sampling rate of the signal. Returns ------- float Median peak-to-nadir amplitude difference of QRS complexes. Example ------- >>> signal = np.random.randn(1000) >>> qrs_a_sqi(signal, sampling_rate=100) 0.75 """ waveform = WaveformMorphology(signal, signal_type="ECG", fs=sampling_rate) r_peaks = waveform.r_peaks qrs_session = waveform.detect_qrs_session(r_peaks) qrs_a = np.median( [np.max(signal[qrs]) - np.min(signal[qrs]) for qrs in qrs_session] ) return qrs_a