Skip to content

Heartbeat detection

sleepecg.compare_heartbeats(detection, annotation, max_distance=0)

Determine correctness of detection results.

Determine true positives (TP), false positives (FP) and false negatives (FN) for an array of detected heartbeat indices based on an array of annotated heartbeat indices. Since neither annotations nor automated detectors usually hit the peak perfectly, detected peaks no further than max_distance in both directions from an annotated peak are considered true positives.

Parameters:

  • detection (ndarray) –

    Detected heartbeat indices.

  • annotation (ndarray) –

    Annotated heartbeat indices.

  • max_distance (int) –

    Maximum distance between indices to consider as the same peak, by default 0.

Returns:

  • TP( ndarray ) –

    True positives, i.e. actual heartbeats detected as heartbeats.

  • FP( ndarray ) –

    False positives, i.e. non-heartbeats detected as heartbeats.

  • FN( ndarray ) –

    False negatives, i.e. actual heartbeats not detected as heartbeats.

Source code in sleepecg/heartbeats.py
def compare_heartbeats(
    detection: np.ndarray,
    annotation: np.ndarray,
    max_distance: int = 0,
) -> _CompareHeartbeatsResult:
    """
    Determine correctness of detection results.

    Determine true positives (TP), false positives (FP) and false negatives (FN) for an
    array of detected heartbeat indices based on an array of annotated heartbeat indices.
    Since neither annotations nor automated detectors usually hit the peak perfectly,
    detected peaks no further than `max_distance` in both directions from an annotated peak
    are considered true positives.

    Parameters
    ----------
    detection : np.ndarray
        Detected heartbeat indices.
    annotation : np.ndarray
        Annotated heartbeat indices.
    max_distance : int, optional
        Maximum distance between indices to consider as the same peak, by default `0`.

    Returns
    -------
    TP : np.ndarray
        True positives, i.e. actual heartbeats detected as heartbeats.
    FP : np.ndarray
        False positives, i.e. non-heartbeats detected as heartbeats.
    FN : np.ndarray
        False negatives, i.e. actual heartbeats not detected as heartbeats.
    """
    if len(detection) == 0:
        return _CompareHeartbeatsResult(
            TP=np.array([]),
            FP=np.array([]),
            FN=annotation.copy(),
        )

    max_len = max(np.max(detection), np.max(annotation)) + 1
    detection_mask = np.zeros(max_len, dtype=bool)
    detection_mask[detection] = 1
    annotation_mask = np.zeros(max_len, dtype=bool)
    annotation_mask[annotation] = 1

    fuzzy_filter = np.ones(max_distance * 2 + 1, dtype=bool)
    detection_mask_fuzzy = np.convolve(detection_mask, fuzzy_filter, mode="same")
    annotation_mask_fuzzy = np.convolve(annotation_mask, fuzzy_filter, mode="same")

    return _CompareHeartbeatsResult(
        TP=np.where(detection_mask & annotation_mask_fuzzy)[0],
        FP=np.where(detection_mask & ~annotation_mask_fuzzy)[0],
        FN=np.where(annotation_mask & ~detection_mask_fuzzy)[0],
    )

sleepecg.detect_heartbeats(ecg, fs, backend='c')

Detect heartbeats in an ECG signal.

This is a modified version of the beat detection algorithm using adaptive thresholds described by Pan & Tompkins in 1985.

Modifications/additions to the original algorithm are:

  • Instead of a hardware filter adjusted to the sampling frequency of the MIT-BIH Arrhythmia Database, a second order bandpass with cutoff frequencies 5 and 30 Hz created via scipy.signal.butter() is used.
  • A bidirectional filter is used to remove filter delay.
  • The integration window is centered on the filtered signal, i.e. a peak in the filtered signal corresponds to a plateau in the integrated signal, not a saddle in the rising edge. This lets the adaptive threshold for the integrated signal remain at a higher level, which is less susceptible to noise.
  • Learning phase 1 is not described in detail in the original paper. This implementation uses maximum and mean values inside the first two seconds to initialize SPKI/SPKF/NPKI/NPKF. Details are provided in the _thresholding code.
  • In addition to the original searchback criterion, a searchback is also performed if no peak is found during the first second of the signal or no second peak is found 1.5 s after the first one. This ensures correct behaviour at signal start in case an unusually large peak during learning phase 1 messes up threshold initialization.
  • After an unsuccessful searchback, the procedure is repeated in the same interval with further reduced thresholds, up to 16 times.

Parameters:

  • ecg (ndarray) –

    ECG signal. Note that the unit of the data does not matter (the algorithm will return similar results regardless of the scaling of the data).

  • fs (float) –

    Sampling frequency in Hz. For best results, a sampling frequency of at least 100 Hz is recommended.

  • backend ((c, numba, python)) –

    Which implementation of the squared moving integration and thresholding algorithm to use. If available, 'c' is the fastest implementation, 'numba' is about 25% slower, and 'python' is about 20 times slower but provided as a fallback. By default 'c'.

Returns:

  • ndarray

    Indices of detected heartbeats.

Examples:

Detect heartbeats in a short electrocardiogram:

>>> from sleepecg import detect_heartbeats, get_toy_ecg
>>> ecg, fs = get_toy_ecg()  # 5 min of ECG data at 360 Hz
>>> heartbeats = detect_heartbeats(ecg, fs)
>>> print(f"{len(heartbeats)} heartbeats detected")
478 heartbeats detected
Source code in sleepecg/heartbeats.py
def detect_heartbeats(ecg: np.ndarray, fs: float, backend: str = "c") -> np.ndarray:
    """
    Detect heartbeats in an ECG signal.

    This is a modified version of the beat detection algorithm using adaptive thresholds
    described by Pan & Tompkins in 1985.

    Modifications/additions to the
    [original algorithm](https://doi.org/10.1109/TBME.1985.325532) are:

    - Instead of a hardware filter adjusted to the sampling frequency of the MIT-BIH
      Arrhythmia Database, a second order bandpass with cutoff frequencies 5 and 30 Hz
      created via `scipy.signal.butter()` is used.
    - A bidirectional filter is used to remove filter delay.
    - The integration window is centered on the filtered signal, i.e. a peak in the filtered
      signal corresponds to a plateau in the integrated signal, not a saddle in the rising
      edge. This lets the adaptive threshold for the integrated signal remain at a higher
      level, which is less susceptible to noise.
    - Learning phase 1 is not described in detail in the original paper. This implementation
      uses maximum and mean values inside the first two seconds to initialize
      SPKI/SPKF/NPKI/NPKF. Details are provided in the `_thresholding` code.
    - In addition to the original searchback criterion, a searchback is also performed if no
      peak is found during the first second of the signal or no second peak is found 1.5 s
      after the first one. This ensures correct behaviour at signal start in case an
      unusually large peak during learning phase 1 messes up threshold initialization.
    - After an unsuccessful searchback, the procedure is repeated in the same interval with
      further reduced thresholds, up to 16 times.

    Parameters
    ----------
    ecg : np.ndarray
        ECG signal. Note that the unit of the data does not matter (the algorithm will
        return similar results regardless of the scaling of the data).
    fs : float
        Sampling frequency in Hz. For best results, a sampling frequency of at least 100 Hz
        is recommended.
    backend : {'c', 'numba', 'python'}
        Which implementation of the squared moving integration and thresholding algorithm to
        use. If available, `'c'` is the fastest implementation, `'numba'` is about 25%
        slower, and `'python'` is about 20 times slower but provided as a fallback. By
        default `'c'`.

    Returns
    -------
    np.ndarray
        Indices of detected heartbeats.

    Examples
    --------
    Detect heartbeats in a short electrocardiogram:

    >>> from sleepecg import detect_heartbeats, get_toy_ecg
    >>> ecg, fs = get_toy_ecg()  # 5 min of ECG data at 360 Hz
    >>> heartbeats = detect_heartbeats(ecg, fs)
    >>> print(f"{len(heartbeats)} heartbeats detected")
    478 heartbeats detected
    """
    if backend not in _all_backends:
        raise ValueError(
            f"Invalid backend for detect_heartbeats: {backend!r}. "
            f"Possible options are: {_all_backends}."
        )
    if backend not in _available_backends:
        fallback = _available_backends[0]
        warnings.warn(f"Backend {backend!r} not available, using {fallback!r} instead.")
        backend = fallback

    # check for flat data at the beginning to avoid messing up detection thresholds
    if len(ecg) < 2:
        raise ValueError("ECG signal must have more than one sample.")
    if ecg[1] == ecg[0]:
        idx_nonflat = np.nonzero(ecg != ecg[0])[0]
        if not len(idx_nonflat):
            raise ValueError("ECG signal is flat. Please check your data.")
        first_nonflat = idx_nonflat[0]  # first non-flat index
    else:
        first_nonflat = 0

    # cache filter in global variable to speed up runtime (especially for short signals)
    try:
        sos = _sos_filters[fs]
    except KeyError:
        sos = scipy.signal.butter(
            N=2,
            Wn=(5, 30),
            btype="bandpass",
            output="sos",
            fs=fs,
        )
        _sos_filters[fs] = sos

    # filtering bidirectionally removes filter delay. Note that we start
    # filtering the signal at the first non-flat index.
    filtered_ecg = scipy.signal.sosfiltfilt(sos, ecg[first_nonflat:])

    # scipy.signal.sosfiltfilt returns an array with negative strides. Both `np.correlate`
    # and `_thresholding` require contiguity, so ensuring this here once reduces total
    # runtime.
    filtered_ecg = np.ascontiguousarray(filtered_ecg)

    # five-point derivative as described by Pan & Tompkins
    derivative = np.correlate(filtered_ecg, np.array([-1, -2, 0, 2, 1]), mode="same")

    moving_window_width = int(0.15 * fs)  # 150ms

    if backend == "c":
        integrated_ecg = _squared_moving_integration(derivative, moving_window_width)
        beat_mask = _thresholding(filtered_ecg, integrated_ecg, fs)
    elif backend == "numba":
        integrated_ecg = _squared_moving_integration_numba(derivative, moving_window_width)
        beat_mask = _thresholding_numba(filtered_ecg, integrated_ecg, fs)
    elif backend == "python":
        integrated_ecg = np.convolve(
            derivative**2,
            np.ones(moving_window_width),
            mode="same",
        )
        beat_mask = _thresholding_py(filtered_ecg, integrated_ecg, fs)

    heartbeats = np.where(beat_mask)[0]
    heartbeats += first_nonflat  # Add back the number of flat samples
    return heartbeats

sleepecg.rri_similarity(detection, annotation, fs_resample=4)

Calculate measures of similarity between RR intervals.

RR intervals are calculated from detected and annotated heartbeat indices. The RR time series is then resampled to frequency fs_resample in the timespan common to both detection and annotation. Pearson's and Spearman's correlation coefficients as well as the root mean square error are returned.

Parameters:

  • detection (ndarray) –

    Detected heartbeat indices.

  • annotation (ndarray) –

    Annotated heartbeat indices.

Returns:

  • pearsonr( float ) –

    Pearson correlation coefficient between resampled RR time series.

  • spearmanr( float ) –

    Spearman correlation coefficient between resampled RR time series.

  • rmse( float ) –

    Root mean square error between resampled RR time series.

Source code in sleepecg/heartbeats.py
def rri_similarity(
    detection: np.ndarray,
    annotation: np.ndarray,
    fs_resample: float = 4,
) -> _RRISimilarityResult:
    """
    Calculate measures of similarity between RR intervals.

    RR intervals are calculated from detected and annotated heartbeat indices. The RR time
    series is then resampled to frequency `fs_resample` in the timespan common to both
    detection and annotation. Pearson's and Spearman's correlation coefficients as well as
    the root mean square error are returned.

    Parameters
    ----------
    detection : np.ndarray
        Detected heartbeat indices.
    annotation : np.ndarray
        Annotated heartbeat indices.

    Returns
    -------
    pearsonr : float
        Pearson correlation coefficient between resampled RR time series.
    spearmanr : float
        Spearman correlation coefficient between resampled RR time series.
    rmse : float
        Root mean square error between resampled RR time series.
    """
    rr_ann = np.diff(annotation)
    rr_det = np.diff(detection)

    t_ann = annotation[1:]
    t_det = detection[1:]

    start = max(min(t_ann), min(t_det))
    end = min(max(t_ann), max(t_det))
    t_new = np.arange(start, end, 1 / fs_resample)

    interp_det = scipy.interpolate.interp1d(t_det, rr_det)
    interp_ann = scipy.interpolate.interp1d(t_ann, rr_ann)
    det_resampled = interp_det(t_new)
    ann_resampled = interp_ann(t_new)

    return _RRISimilarityResult(
        scipy.stats.pearsonr(det_resampled, ann_resampled)[0],
        scipy.stats.spearmanr(det_resampled, ann_resampled)[0],
        np.sqrt(np.mean((det_resampled - ann_resampled) ** 2)),
    )

sleepecg.get_toy_ecg()

Load a 5 minute long electrocardigram sampled at 360 Hz.

Data taken from scipy.datasets.electrocardiogram: https://docs.scipy.org/doc/scipy/reference/generated/scipy.datasets.electrocardiogram.html

Returns:

  • ecg( ndarray ) –

    The electrocardiogram in millivolt (mV).

  • fs( int ) –

    The sampling frequency in Hz.

Source code in sleepecg/utils.py
def get_toy_ecg() -> tuple[np.ndarray, int]:
    """
    Load a 5 minute long electrocardigram sampled at 360 Hz.

    Data taken from scipy.datasets.electrocardiogram:
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.datasets.electrocardiogram.html

    Returns
    -------
    ecg : np.ndarray
        The electrocardiogram in millivolt (mV).
    fs : int
        The sampling frequency in Hz.
    """
    ecg = np.load(Path(__file__).parent / "data" / "ecg.npz")["ecg"]
    fs = 360
    return ecg, fs