# Source code for vital_sqi.data.signal_io

import logging
from pyedflib import highlevel
from wfdb import rdsamp, wrsamp
import numpy as np
import pandas as pd
import datetime as dt
import os
import glob
from vital_sqi.common import utils
from vital_sqi.common.utils import generate_timestamp
from vital_sqi.data.signal_sqi_class import SignalSQI

# Root-logger setup executed when this module is imported: INFO threshold,
# records rendered as "<time> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)


def ECG_reader(
    file_name,
    file_type,
    channel_num=None,
    channel_name=None,
    sampling_rate=None,
    start_datetime=None,
):
    """
    Read ECG data from an EDF, MIT or CSV file into a SignalSQI object.

    Parameters
    ----------
    file_name : str
        Path to the ECG file. For 'mit' this is the record path without
        extension (existence is checked with the glob ``file_name + ".*"``).
    file_type : str
        One of 'edf', 'mit' or 'csv'.
    channel_num : list, optional
        Channel ids to read, starting from 0. Mutually exclusive with
        `channel_name`.
    channel_name : list, optional
        Channel names to read. Mutually exclusive with `channel_num`.
    sampling_rate : int or float, optional
        Sampling rate of the signal; a float is rounded to the nearest
        integer. When omitted it is taken from the EDF signal header
        ('sample_frequency' or 'sample_rate') or the MIT header ('fs').
        It is required for 'csv'.
    start_datetime : str, optional
        Start datetime in '%Y-%m-%d %H:%M:%S.%f' format, parsed with
        ``utils.parse_datetime``. When omitted, the EDF 'startdate' or the
        MIT 'base_date'/'base_time' header fields are used; for 'csv' the
        first column of the file is converted to timestamps instead.

    Returns
    -------
    SignalSQI
        Object holding the signals (with a "timestamps" column), the wave
        type "ECG", the sampling rate, the start datetime and, for 'edf' and
        'mit', the file header information.

    Raises
    ------
    AssertionError
        If the file is missing or an argument has an unsupported type/value.
    ValueError
        If no sampling rate is given and none can be taken from the file.
    """

    def _require(condition, message):
        # Explicit raise instead of `assert`: assertions are stripped under
        # `python -O`, which would silently disable all input validation.
        # AssertionError is kept so existing callers see the same exception.
        if not condition:
            raise AssertionError(message)

    try:
        if file_type == "mit":
            _require(glob.glob(file_name + ".*"), "Files not found")
        else:
            _require(os.path.isfile(file_name), "File not found")
        _require(
            file_type in ("edf", "mit", "csv"),
            "Only edf, mit, and csv are supported.",
        )
        _require(
            isinstance(channel_num, list) or channel_num is None,
            "Channel num must be a list or None",
        )
        _require(
            isinstance(channel_name, list) or channel_name is None,
            "Channel name must be a list or None",
        )
        _require(
            not (channel_name and channel_num),
            "Specify either channel name or channel index(s) or None",
        )
        _require(
            isinstance(start_datetime, str) or start_datetime is None,
            "Start datetime must be None or a string",
        )
        _require(
            isinstance(sampling_rate, (int, float)) or sampling_rate is None,
            "Sampling rate must be a number or None",
        )

        if isinstance(sampling_rate, float):
            sampling_rate = round(sampling_rate)
        if start_datetime:
            start_datetime = utils.parse_datetime(start_datetime)

        if file_type == "edf":
            signals, signal_headers, header = highlevel.read_edf(
                edf_file=file_name, ch_nrs=channel_num, ch_names=channel_name
            )
            # Fall back to the first channel's header when no rate was given.
            sampling_rate = (
                sampling_rate
                or signal_headers[0].get("sample_frequency")
                or signal_headers[0].get("sample_rate")
            )
            if sampling_rate is None:
                raise ValueError("Sampling rate could not be inferred.")
            start_datetime = start_datetime or header.get("startdate")
            # read_edf returns one row per channel; transpose to one column
            # per channel.
            signals = pd.DataFrame(signals.T)
            timestamps = generate_timestamp(
                start_datetime, sampling_rate, len(signals)
            )
            signals.insert(0, "timestamps", timestamps)
            info = [header, signal_headers]
            return SignalSQI(
                signals=signals,
                wave_type="ECG",
                start_datetime=start_datetime,
                sampling_rate=sampling_rate,
                info=info,
            )

        if file_type == "mit":
            signals, info = rdsamp(
                file_name,
                channels=channel_num,
                channel_names=channel_name,
                warn_empty=True,
            )
            sampling_rate = sampling_rate or info.get("fs")
            if sampling_rate is None:
                raise ValueError("Sampling rate could not be inferred.")
            if not start_datetime:
                date, time = info.get("base_date"), info.get("base_time")
                start_datetime = (
                    dt.datetime.combine(date, time) if date and time else None
                )
            timestamps = generate_timestamp(
                start_datetime, sampling_rate, len(signals)
            )
            signals = pd.DataFrame(signals)
            # NOTE(review): here the timestamps column is appended last,
            # whereas the edf branch inserts it first - confirm which layout
            # downstream consumers expect before unifying.
            signals["timestamps"] = timestamps
            # start_datetime is forwarded like in the other branches;
            # previously it was computed here and then dropped.
            return SignalSQI(
                signals=signals,
                wave_type="ECG",
                start_datetime=start_datetime,
                sampling_rate=sampling_rate,
                info=info,
            )

        if file_type == "csv":
            signals = pd.read_csv(file_name, usecols=channel_name or channel_num)
            # Check before generating timestamps so a missing rate surfaces as
            # the intended ValueError rather than an error inside
            # generate_timestamp.
            if sampling_rate is None:
                raise ValueError("Sampling rate could not be inferred.")
            if start_datetime:
                timestamps = generate_timestamp(
                    start_datetime, sampling_rate, len(signals)
                )
            else:
                # Without an explicit start, the first CSV column is taken as
                # the timestamps.
                timestamps = signals.iloc[:, 0].apply(pd.Timestamp)
            signals["timestamps"] = timestamps
            return SignalSQI(
                signals=signals,
                wave_type="ECG",
                start_datetime=start_datetime,
                sampling_rate=sampling_rate,
            )
    except Exception as e:
        logging.error(f"Failed to read ECG file: {e}")
        raise
def ECG_writer(signal_sqi, file_name, file_type, info=None):
    """
    Write the signals of a SignalSQI object to an EDF, MIT or CSV file.

    Parameters
    ----------
    signal_sqi : SignalSQI
        Object providing `signals` (a DataFrame with a "timestamps" column),
        `sampling_rate` and `start_datetime`.
    file_name : str
        Output path. For 'mit' the last path component is used as the record
        name and the leading part as the output directory.
    file_type : str
        Type of file to write: 'edf', 'mit' or 'csv'.
    info : list or dict, optional
        Header information. For 'edf' it is indexed as ``info[0]`` (file
        header) and ``info[1]`` (signal headers); without it a quick EDF
        write using only the sampling rate is performed. For 'mit' it is
        required and must provide the keys 'units', 'sig_name', 'base_date',
        'base_time' and 'comments'.

    Returns
    -------
    bool
        True once the file has been written, False if `file_type` is not
        supported (nothing is written in that case).

    Raises
    ------
    ValueError
        If `file_type` is 'mit' and no `info` is supplied.
    """
    try:
        # The timestamps column is not signal data; it is regenerated for CSV.
        signals = signal_sqi.signals.drop(columns="timestamps").to_numpy()
        sampling_rate = signal_sqi.sampling_rate
        start_datetime = signal_sqi.start_datetime

        if file_type == "edf":
            if info:
                signal_headers, header = info[1], info[0]
                highlevel.write_edf(
                    file_name, signals.T, signal_headers, header, file_type=-1
                )
            else:
                highlevel.write_edf_quick(file_name, signals.T, sampling_rate)
        elif file_type == "mit":
            if not info:
                raise ValueError("Header dict needed for MIT format")
            # os.path.split keeps the root directory intact (e.g. "/rec")
            # and honours the platform's path separators, unlike a manual
            # split on "/".
            write_dir, record_name = os.path.split(file_name)
            wrsamp(
                record_name=record_name,
                fs=sampling_rate,
                units=info["units"],
                sig_name=info["sig_name"],
                p_signal=signals,
                base_date=info["base_date"],
                base_time=info["base_time"],
                comments=info["comments"],
                write_dir=write_dir,
            )
        elif file_type == "csv":
            timestamps = generate_timestamp(
                start_datetime, sampling_rate, len(signals)
            )
            signals = pd.DataFrame(
                np.hstack((np.array(timestamps).reshape(-1, 1), signals))
            )
            signals.to_csv(file_name, index=False)
        else:
            # Previously an unknown type fell through and reported success
            # without writing anything; report failure like PPG_writer does.
            logging.error(f"Unsupported file type: {file_type}")
            return False
        return True
    except Exception as e:
        logging.error(f"Failed to write ECG file: {e}")
        raise
def PPG_reader(
    file_name,
    signal_idx,
    timestamp_idx,
    info_idx=None,
    timestamp_unit="ms",
    sampling_rate=None,
    start_datetime=None,
):
    """
    Read PPG data from a CSV file and return a SignalSQI object.

    The timestamp column is converted with ``pd.to_datetime`` using
    `timestamp_unit`. If `start_datetime` is given, all timestamps are shifted
    so that the first sample falls on it. If `sampling_rate` is not given, it
    is inferred as the inverse of the median time step between samples.

    Parameters
    ----------
    file_name : str
        Path to the PPG file (CSV format).
    signal_idx : list, int or str
        Indices or names of the columns holding PPG signal data. A single
        value is wrapped in a list.
    timestamp_idx : list, int or str
        Index or name of the column holding timestamp data. A single value is
        wrapped in a list.
    info_idx : list, optional
        Indices or names of columns with additional information.
        Default is None (no additional columns).
    timestamp_unit : str, optional
        Unit of the timestamps in the file: "ms" (milliseconds) or
        "s" (seconds). Default is "ms".
    sampling_rate : int or float, optional
        Sampling rate of the PPG signal. If None, it is inferred from the
        timestamps. Default is None.
    start_datetime : str, optional
        Datetime to assign to the first sample (anything ``pd.Timestamp``
        accepts). If None, the timestamps are used as converted from the file.

    Returns
    -------
    SignalSQI
        Object built from a DataFrame laid out as timestamp column, signal
        columns, then additional info columns, with wave type "PPG".

    Raises
    ------
    ValueError
        If `timestamp_unit` is not supported, or the sampling rate has to be
        inferred but the timestamps do not yield a finite, positive rate.
    Exception
        Any error raised while reading or converting the file is logged and
        re-raised.
    """

    def _as_list(value):
        # Accept a single column selector, a list/tuple of them, or None.
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    try:
        if timestamp_unit not in ("ms", "s"):
            raise ValueError(
                "Timestamp unit must be either 'ms' (milliseconds) or 's' (seconds)."
            )

        timestamp_idx = _as_list(timestamp_idx)
        signal_idx = _as_list(signal_idx)
        info_idx = _as_list(info_idx)

        cols = timestamp_idx + signal_idx + info_idx
        tmp = pd.read_csv(
            file_name, usecols=cols, skipinitialspace=True, skip_blank_lines=True
        )

        # read_csv ignores the order of `usecols` and returns the columns in
        # file order, so re-order them to timestamp / signal / info before
        # slicing by position. Positional selectors are mapped to labels via
        # their rank among the requested positions.
        if all(isinstance(col, (int, np.integer)) for col in cols):
            file_order = sorted(set(cols))
            labels = [tmp.columns[file_order.index(col)] for col in cols]
        else:
            labels = cols
        tmp = tmp[labels]

        # Validation above guarantees the unit is one pd.to_datetime accepts.
        timestamps = pd.to_datetime(tmp.iloc[:, 0], unit=timestamp_unit)

        # Shift the whole series so the first sample lands on start_datetime.
        if start_datetime:
            start_datetime = pd.Timestamp(start_datetime)
            timestamps = timestamps + (start_datetime - timestamps.iloc[0])

        if sampling_rate is None:
            median_step = timestamps.diff().dt.total_seconds().median()
            # A single row (NaN step) or repeated timestamps (zero step)
            # cannot yield a usable rate; fail explicitly instead of passing
            # NaN/inf on.
            if not np.isfinite(median_step) or median_step <= 0:
                raise ValueError(
                    "Sampling rate could not be inferred from timestamps."
                )
            sampling_rate = 1 / median_step

        signal_end = 1 + len(signal_idx)
        signal_data = tmp.iloc[:, 1:signal_end]
        additional_info = tmp.iloc[:, signal_end:]
        signal_df = pd.concat([timestamps, signal_data, additional_info], axis=1)

        return SignalSQI(
            signals=signal_df,
            wave_type="PPG",
            info=additional_info,
            sampling_rate=sampling_rate,
        )
    except Exception as e:
        logging.error(f"Failed to read PPG file: {e}")
        raise
def PPG_writer(signal_sqi, file_name, file_type="csv"):
    """
    Write PPG SignalSQI data to a CSV or Excel file.

    The output has two columns: "time", taken from the first column of
    ``signal_sqi.signals``, and "pleth", taken from the second column. Any
    further columns are not written.

    Parameters
    ----------
    signal_sqi : SignalSQI
        Object whose `signals` DataFrame holds the timestamps in its first
        column and the PPG signal in its second column.
    file_name : str
        Path of the output file, including the file extension.
    file_type : str, optional
        Type of file to write: "csv" (default) or "xlsx" for Excel format.

    Returns
    -------
    bool
        True if the output file exists after writing, False if `file_type`
        is not supported or the file is not present afterwards.

    Raises
    ------
    Exception
        Any error raised while building or writing the table is logged and
        re-raised.
    """
    try:
        # Reject unknown formats up front, before touching the data.
        if file_type not in ("csv", "xlsx"):
            logging.error(f"Unsupported file type: {file_type}")
            return False

        frame = signal_sqi.signals
        table = pd.DataFrame(
            {
                "time": np.array(frame.iloc[:, 0]),
                "pleth": np.array(frame.iloc[:, 1]),
            }
        )

        if file_type == "csv":
            table.to_csv(file_name, index=False)
        else:
            table.to_excel(file_name, index=False)

        return os.path.isfile(file_name)
    except Exception as e:
        logging.error(f"Failed to write PPG file: {e}")
        raise