Source code for vital_sqi.common.generate_template

"""Generating templates of ECG and PPG complexes"""

import numpy as np
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from scipy import signal, interpolate
from scipy.signal import argrelextrema
from scipy.integrate import solve_ivp


[docs] def squeeze_template(s, width): """ Compress a signal template by averaging values within a given width. Parameters ---------- s : array_like Input signal array. width : int Desired compressed length of the output signal. Returns ------- numpy.ndarray Compressed signal. """ s = np.asarray(s) total_len = len(s) width = int(width) if width <= 0: raise ValueError("width must be a positive integer.") span_unit = 2 centroids = (total_len / width) * np.arange(width) lefts = np.clip(centroids.astype(int) - span_unit, 0, total_len) rights = np.clip(centroids.astype(int) + span_unit, 0, total_len) # Cumulative-sum trick: mean over [left, right) = (cumsum[right] - cumsum[left]) / (right - left). csum = np.concatenate(([0.0], np.cumsum(s, dtype=float))) widths = np.maximum(rights - lefts, 1) return (csum[rights] - csum[lefts]) / widths
[docs] def ppg_dual_double_frequency_template(width): """ Generate a PPG template using two sine waveforms with different frequencies. Parameters ---------- width : int The sample size of the generated waveform. Returns ------- numpy.ndarray A 1-D array representing the PPG waveform with a diastolic peak at a low position. """ t = np.linspace(0, 1, width, False) # 1 second sig = np.sin(2 * np.pi * 2 * t - np.pi / 2) + np.sin(2 * np.pi * 1 * t - np.pi / 6) sig_scale = MinMaxScaler().fit_transform(sig.reshape(-1, 1)) return sig_scale.reshape(-1)
[docs] def skew_func(x, e=0, w=1, a=0): """ Generate a skewness distribution. Parameters ---------- x : array_like Input sequence of time points. e : float, optional Location parameter (default is 0). w : float, optional Scale parameter (default is 1). a : float, optional Shape parameter (default is 0). Returns ------- numpy.ndarray A 1-D array of a skewness distribution. """ t = (x - e) / w omega = (1 + erf((a * t) / np.sqrt(2))) / 2 gaussian_dist = 1 / (np.sqrt(2 * np.pi)) * np.exp(-(t**2) / 2) return 2 / w * gaussian_dist * omega
[docs] def ppg_absolute_dual_skewness_template(width, e_1=1, w_1=2.5, e_2=3, w_2=3, a=4): """ Generate a PPG template using two skewness distributions. Parameters ---------- width : int Sample size of the generated waveform. e_1, w_1 : float, optional Parameters for the first skew distribution (default e_1=1, w_1=2.5). e_2, w_2 : float, optional Parameters for the second skew distribution (default e_2=3, w_2=3). a : float, optional Shape parameter for both distributions (default is 4). Returns ------- numpy.ndarray A 1-D array representing the PPG waveform. """ x = np.linspace(0, 11, width, False) p_1 = skew_func(x, e_1, w_1, a) p_2 = skew_func(x, e_2, w_2, a) p_ = np.max([p_1, p_2], axis=0) sig_scale = MinMaxScaler().fit_transform(p_.reshape(-1, 1)) return sig_scale.reshape(-1)
[docs] def ppg_nonlinear_dynamic_system_template(width): """ Generate a PPG template based on a nonlinear dynamic system. Parameters ---------- width : int Desired length of the template. Returns ------- numpy.ndarray A rescaled signal template for PPG. """ x1, x2, u = 0.15, 0.15, 0.5 beta, gamma1, gamma2 = 1, -0.25, 0.25 x1_list, x2_list = [x1], [x2] dt = 0.1 for _ in np.arange(1, 100, dt): y1 = 0.5 * (np.abs(x1 + 1) - np.abs(x1 - 1)) y2 = 0.5 * (np.abs(x2 + 1) - np.abs(x2 - 1)) dx1 = -x1 + (1 + u) * y1 - beta * y2 + gamma1 dx2 = -x2 + (1 + u) * y2 + beta * y1 + gamma2 x1 += dx1 * dt x2 += dx2 * dt x1_list.append(x1) x2_list.append(x2) local_minima = argrelextrema(np.array(x2_list), np.less)[0] s = np.array(x2_list[local_minima[-2] : local_minima[-1] + 1]) rescale_signal = squeeze_template(s, width) window = signal.windows.cosine(len(rescale_signal), 0.5) signal_data_tapered = window * (rescale_signal - min(rescale_signal)) out_scale = MinMaxScaler().fit_transform(signal_data_tapered.reshape(-1, 1)) return out_scale.reshape(-1)
[docs] def interp(ys, mul): """ Perform cubic interpolation and extrapolation on a sequence. Parameters ---------- ys : array_like Input sequence. mul : int Multiplication factor for interpolation. Returns ------- numpy.ndarray Interpolated sequence. """ ys = list(ys) ys.append(2 * ys[-1] - ys[-2]) xs = np.arange(len(ys)) fn = interpolate.interp1d(xs, ys, kind="cubic") new_xs = np.arange(len(ys) - 1, step=1.0 / mul) return fn(new_xs)
[docs] def ecg_dynamic_template( width, sfecg=256, N=256, Anoise=0, hrmean=60, hrstd=1, lfhfratio=0.5, sfint=512, ti=np.array([-70, -15, 0, 15, 100]), ai=np.array([1.2, -5, 30, -7.5, 0.75]), bi=np.array([0.25, 0.1, 0.1, 0.1, 0.4]), ): """ Generate a synthetic ECG signal template. Parameters ---------- width : int Desired length of the output template. sfecg, N, Anoise, hrmean, hrstd, lfhfratio, sfint : int or float, optional Parameters for the ECG generation model. ti, ai, bi : numpy.ndarray, optional Arrays of model parameters. Returns ------- numpy.ndarray Generated ECG template. """ ti = ti * np.pi / 180 hrfact = np.sqrt(hrmean / 60) hrfact2 = np.sqrt(hrfact) bi = hrfact * bi ti = np.multiply([hrfact2, hrfact, 1, hrfact, hrfact2], ti) flo, fhi, flostd, fhistd = 0.1, 0.25, 0.01, 0.01 rr0 = rr_process( flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, 1, 2 ** int(np.ceil(np.log2(N * 60 / hrmean))), ) rr = interp(rr0, sfint) dt = 1 / sfint rrn = np.zeros(len(rr)) tecg, i, Nt = 0, 0, len(rr) while i < Nt: tecg += rr[i] ip = int(np.round(tecg / dt)) rrn[i : ip + 1] = rr[i] i = ip + 1 tspan = np.arange(0, (ip - 1) * dt, dt) args = (rrn, sfint, ti, ai, bi) solv_ode = solve_ivp( ordinary_differential_equation, [tspan[0], tspan[-1]], [1, 0, 0.04], t_eval=np.arange(20.5, 21.5, 0.00001), args=args, ) return solv_ode.y[2]
[docs] def ordinary_differential_equation(t, x_equations, rr, sfint, ti, ai, bi): """ Solve the ordinary differential equation for synthetic ECG generation. Parameters ---------- t : float Time variable. x_equations : array_like Initial values for x, y, and z variables. rr : numpy.ndarray Resampled RR interval sequence. sfint : int Sampling frequency for interpolation. ti, ai, bi : numpy.ndarray Model parameters for ECG generation. Returns ------- list Derivatives of x, y, and z. """ x, y, z = x_equations ta = np.arctan2(y, x) a0 = 1.0 - np.sqrt(x**2 + y**2) ip = int(1 + np.floor(t * sfint)) w0 = 2 * np.pi / (rr[ip] if ip < len(rr) else rr[-1]) dx1dt = a0 * x - w0 * y dx2dt = a0 * y + w0 * x dti = np.fmod(ta - ti, 2 * np.pi) dx3dt = -np.sum(ai * dti * np.exp(-0.5 * (dti / bi) ** 2)) - 1.0 * ( z - 0.005 * np.sin(2 * np.pi * 0.25 * t) ) return [dx1dt, dx2dt, dx3dt]
[docs] def rr_process(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n, seed=0): """ Generate an RR interval time series with bimodal power spectrum. Parameters ---------- flo, fhi : float Low and high frequencies of the Gaussian distributions. flostd, fhistd : float Standard deviations of low and high frequencies. lfhfratio : float Ratio of low-frequency to high-frequency power. hrmean, hrstd : float Mean and standard deviation of heart rate. sfrr : int Sampling frequency for RR intervals. n : int Length of the generated time series. seed : int or None, optional Seed for the random phase generator (default ``0``). Pass ``None`` to draw fresh randomness from the global numpy state. A fixed default makes ``ecg_dynamic_template`` deterministic, so the DTW reference template is reproducible across runs. Returns ------- numpy.ndarray Generated RR interval time series. """ rng = np.random.default_rng(seed) if seed is not None else np.random w1, w2 = 2 * np.pi * flo, 2 * np.pi * fhi c1, c2 = 2 * np.pi * flostd, 2 * np.pi * fhistd sig2, sig1 = 1, lfhfratio rrmean = 60 / hrmean rrstd = 60 * hrstd / (hrmean * hrmean) df = sfrr / n w = np.arange(0, n) * 2 * np.pi * df Hw1 = sig1 * np.exp(-0.5 * ((w - w1) / c1) ** 2) / np.sqrt(2 * np.pi * c1**2) Hw2 = sig2 * np.exp(-0.5 * ((w - w2) / c2) ** 2) / np.sqrt(2 * np.pi * c2**2) Hw = Hw1 + Hw2 Sw = (sfrr / 2) * np.sqrt(Hw) # Use a single random draw, then reuse it in the conjugate half so the # spectrum is Hermitian-symmetric (real ifft) — the original code drew # two independent random vectors which broke symmetry. half = 2 * np.pi * rng.uniform(size=int(n / 2) - 1) if hasattr(rng, "uniform") \ else 2 * np.pi * rng.rand(int(n / 2) - 1) ph = np.concatenate([[0], half, [0], -np.flip(half)]) SwC = Sw * np.exp(1j * ph) x = (1 / n) * np.real(np.fft.ifft(SwC)) rr = rrmean + (rrstd / np.std(x)) * x return rr