"""Signal quality indexes based on dynamic template matching
"""
import numpy as np
from vital_sqi.common.generate_template import (
ppg_absolute_dual_skewness_template,
ppg_dual_double_frequency_template,
ppg_nonlinear_dynamic_system_template,
ecg_dynamic_template,
)
from vital_sqi.common.utils import check_valid_signal
from scipy.spatial.distance import euclidean
from scipy.signal import resample
# from librosa.sequence import dtw
from sklearn.preprocessing import MinMaxScaler
from scipy.spatial.distance import cdist
# Template cache: (template_type, template_size) -> scaled reference array
_template_cache: dict = {}
[docs]
def dtw_distance(seq1, seq2):
"""
Compute the Dynamic Time Warping (DTW) distance between two sequences in a vectorized manner.
Parameters
----------
seq1 : array_like
The first sequence (e.g., the signal)
seq2 : array_like
The second sequence (e.g., the template)
Returns
-------
float
The DTW distance between the sequences
"""
n, m = len(seq1), len(seq2)
# Calculate cost matrix in a vectorized manner
cost_matrix = cdist(seq1.reshape(-1, 1), seq2.reshape(-1, 1), metric="euclidean")
# Initialize the DTW matrix with infinity
dtw_matrix = np.full((n + 1, m + 1), np.inf)
dtw_matrix[0, 0] = 0
# Fill the DTW matrix row by row. The "insert" transition dtw[i, j-1]
# must use the already-updated value from the current row, so a cell-by-
# cell loop is required for correctness.
for i in range(1, n + 1):
for j in range(1, m + 1):
dtw_matrix[i, j] = cost_matrix[i - 1, j - 1] + min(
dtw_matrix[i - 1, j], # deletion
dtw_matrix[i, j - 1], # insertion (updated value)
dtw_matrix[i - 1, j - 1], # match
)
return dtw_matrix[n, m]
[docs]
def dtw_sqi(s, template_type, template_size=100, simple_mode=False):
"""
Euclidean distance between signal and its template using DTW
Parameters
----------
s : array_like
Signal containing int or float values.
template_type : int
Template type identifier (0-3 for different template types)
template_size : int, optional
Size of the template to resample the signal and template (default is 100)
simple_mode : bool, optional
If True, uses a simpler Euclidean distance instead of DTW
Returns
-------
float
Calculated DTW or Euclidean distance between the signal and the template.
"""
check_valid_signal(s)
if template_size <= 0:
raise ValueError("Number of samples must be greater than zero.")
s = resample(s, template_size).reshape(-1)
if not isinstance(template_type, int) or not (0 <= template_type <= 3):
raise ValueError("Invalid template type")
cache_key = (template_type, template_size)
if cache_key not in _template_cache:
if template_type == 0:
ref_raw = ppg_nonlinear_dynamic_system_template(template_size).reshape(-1)
elif template_type == 1:
ref_raw = ppg_dual_double_frequency_template(template_size)
elif template_type == 2:
ref_raw = ppg_absolute_dual_skewness_template(template_size)
elif template_type == 3:
ref_raw = np.array(ecg_dynamic_template(template_size)).reshape(-1)
scaler = MinMaxScaler(feature_range=(0, 1))
_template_cache[cache_key] = scaler.fit_transform(
ref_raw.reshape(-1, 1)
).reshape(-1)
reference = _template_cache[cache_key]
scaler = MinMaxScaler(feature_range=(0, 1))
s = scaler.fit_transform(s.reshape(-1, 1)).reshape(-1)
if simple_mode:
return np.mean(
[euclidean([s[i]], [reference[i]]) for i in range(template_size)]
)
else:
return dtw_distance(s, reference)
# if simple_mode:
# cost = 0
# for i in range(template_size):
# cost += euclidean([s[i]], [reference[i]])
# dtw_cost = cost / template_size
# else:
# beat = resample(s, template_size)
# scaler = MinMaxScaler(feature_range=(0, 1))
# beat = scaler.fit_transform(beat.reshape(-1, 1)).reshape(-1)
# reference = resample(reference, template_size)
# scaler = MinMaxScaler(feature_range=(0, 1))
# reference = scaler.fit_transform(reference.reshape(-1, 1)).reshape(-1)
# # Use custom DTW function instead of librosa
# dtw_cost = dtw_distance(beat, reference)
# return dtw_cost