Source code for vital_sqi.preprocess.preprocess_signal

import numpy as np
import pandas as pd
from scipy import signal
from typing import Optional
from vital_sqi.common.generate_template import squeeze_template


[docs] def taper_signal( s: np.ndarray, window: Optional[np.ndarray] = None, shift_min_to_zero: bool = True ) -> np.ndarray: """ Applies a tapering window to the signal and optionally shifts the minimum value to zero. Parameters ---------- s : np.ndarray Input signal as a 1D array of floats. window : np.ndarray, optional Window shape to apply, defaults to Tukey window if None. shift_min_to_zero : bool, optional If True, shifts the signal minimum value to zero. Returns ------- np.ndarray Tapered and optionally shifted signal. """ if shift_min_to_zero: s = s - np.min(s) if window is None: window = signal.windows.tukey(len(s), 0.9) return s * window
[docs] def smooth_signal( s: np.ndarray, window_len: int = 5, window: str = "flat" ) -> np.ndarray: """ Smooths the signal using a specified window. Parameters ---------- s : np.ndarray 1D signal array. window_len : int, optional Size of the smoothing window, must be greater than 2. window : str, optional Window type for smoothing. Options: 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'. Returns ------- np.ndarray Smoothed signal. """ if not isinstance(window_len, int) or window_len < 3: raise ValueError("window_len must be an integer greater than or equal to 3.") if window not in ["flat", "hanning", "hamming", "bartlett", "blackman"]: raise ValueError( "window must be 'flat', 'hanning', 'hamming', 'bartlett', or 'blackman'." ) if s.ndim != 1: raise ValueError("smoothing only supports 1D arrays.") if len(s) < window_len: raise ValueError("Input signal length must be greater than window size.") # Extend the signal at both ends for smoothing s_ext = np.r_[s[window_len - 1 : 0 : -1], s, s[-2 : -window_len - 1 : -1]] if window == "flat": w = np.ones(window_len) else: w = getattr(np, window)(window_len) return np.convolve(w / w.sum(), s_ext, mode="same")[ window_len - 1 : -window_len + 1 ]
[docs] def scale_pattern(s: np.ndarray, window_size: int) -> np.ndarray: """ Scales or resamples the signal to a specified window size for comparison with a template. Parameters ---------- s : np.ndarray Input signal as a 1D array of floats. window_size : int The desired size of the output signal. Returns ------- np.ndarray Resampled and smoothed signal to match the desired window size. """ if len(s) == window_size: return s elif len(s) < window_size: # Use linear interpolation for efficient upsampling scaled_signal = np.interp( np.linspace(0, len(s) - 1, window_size), np.arange(len(s)), s ) else: # Downsample using a predefined squeeze function scaled_signal = squeeze_template(s, window_size) # Ensure the smoothing window length is valid window_len = min(5, len(scaled_signal)) return smooth_signal(scaled_signal, window_len=window_len)