Heartbeat detection

Source code in sleepecg/
def detect_heartbeats(ecg: np.ndarray, fs: float, backend: str = "c") -> np.ndarray:
    Detect heartbeats in an ECG signal.

    This is a modified version of the beat detection algorithm using adaptive thresholds
    described by Pan & Tompkins in 1985.

    Modifications/additions to the
    [original algorithm]( are:

    - Instead of a hardware filter adjusted to the sampling frequency of the MIT-BIH
      Arrhythmia Database, a second order bandpass with cutoff frequencies 5 and 30 Hz
      created via `scipy.signal.butter()` is used.
    - A bidirectional filter is used to remove filter delay.
    - The integration window is centered on the filtered signal, i.e. a peak in the filtered
      signal corresponds to a plateau in the integrated signal, not a saddle in the rising
      edge. This lets the adaptive threshold for the integrated signal remain at a higher
      level, which is less susceptible to noise.
    - Learning phase 1 is not described in detail in the original paper. This implementation
      uses maximum and mean values inside the first two seconds to initialize
      SPKI/SPKF/NPKI/NPKF. Details are provided in the `_thresholding` code.
    - In addition to the original searchback criterion, a searchback is also performed if no
      peak is found during the first second of the signal or no second peak is found 1.5 s
      after the first one. This ensures correct behaviour at signal start in case an
      unusually large peak during learning phase 1 messes up threshold initialization.
    - After an unsuccessful searchback, the procedure is repeated in the same interval with
      further reduced thresholds, up to 16 times.

    ecg : np.ndarray
        ECG signal. Note that the unit of the data does not matter (the algorithm will
        return similar results regardless of the scaling of the data).
    fs : float
        Sampling frequency in Hz. For best results, a sampling frequency of at least 100 Hz
        is recommended.
    backend : {'c', 'numba', 'python'}
        Which implementation of the squared moving integration and thresholding algorithm to
        use. If available, `'c'` is the fastest implementation, `'numba'` is about 25%
        slower, and `'python'` is about 20 times slower but provided as a fallback. By
        default `'c'`.

        Indices of detected heartbeats.

    Detect heartbeats in a short electrocardiogram:

    >>> from sleepecg import detect_heartbeats, get_toy_ecg
    >>> ecg, fs = get_toy_ecg()  # 5 min of ECG data at 360 Hz
    >>> heartbeats = detect_heartbeats(ecg, fs)
    >>> print(f"{len(heartbeats)} heartbeats detected")
    478 heartbeats detected
    if backend not in _all_backends:
        raise ValueError(
            f"Invalid backend for detect_heartbeats: {backend!r}. "
            f"Possible options are: {_all_backends}."
    if backend not in _available_backends:
        fallback = _available_backends[0]
        warnings.warn(f"Backend {backend!r} not available, using {fallback!r} instead.")
        backend = fallback

    # check for flat data at the beginning to avoid messing up detection thresholds
    if len(ecg) < 2:
        raise ValueError("ECG signal must have more than one sample.")
    if ecg[1] == ecg[0]:
        idx_nonflat = np.nonzero(ecg != ecg[0])[0]
        if not len(idx_nonflat):
            raise ValueError("ECG signal is flat. Please check your data.")
        first_nonflat = idx_nonflat[0]  # first non-flat index
        first_nonflat = 0

    # cache filter in global variable to speed up runtime (especially for short signals)
        sos = _sos_filters[fs]
    except KeyError:
        sos = scipy.signal.butter(
            Wn=(5, 30),
        _sos_filters[fs] = sos

    # filtering bidirectionally removes filter delay. Note that we start
    # filtering the signal at the first non-flat index.
    filtered_ecg = scipy.signal.sosfiltfilt(sos, ecg[first_nonflat:])

    # scipy.signal.sosfiltfilt returns an array with negative strides. Both `np.correlate`
    # and `_thresholding` require contiguity, so ensuring this here once reduces total
    # runtime.
    filtered_ecg = np.ascontiguousarray(filtered_ecg)

    # five-point derivative as described by Pan & Tompkins
    derivative = np.correlate(filtered_ecg, np.array([-1, -2, 0, 2, 1]), mode="same")

    moving_window_width = int(0.15 * fs)  # 150ms

    if backend == "c":
        integrated_ecg = _squared_moving_integration(derivative, moving_window_width)
        beat_mask = _thresholding(filtered_ecg, integrated_ecg, fs)
    elif backend == "numba":
        integrated_ecg = _squared_moving_integration_numba(derivative, moving_window_width)
        beat_mask = _thresholding_numba(filtered_ecg, integrated_ecg, fs)
    elif backend == "python":
        integrated_ecg = np.convolve(
        beat_mask = _thresholding_py(filtered_ecg, integrated_ecg, fs)

    heartbeats = np.where(beat_mask)[0]
    heartbeats += first_nonflat  # Add back the number of flat samples
    return heartbeats

sleepecg.rri_similarity(detection, annotation, fs_resample=4)

Calculate measures of similarity between RR intervals.

RR intervals are calculated from detected and annotated heartbeat indices. The RR time series is then resampled to frequency fs_resample in the timespan common to both detection and annotation. Pearson's and Spearman's correlation coefficients as well as the root mean square error are returned.


  • detection (ndarray) –

    Detected heartbeat indices.

  • annotation (ndarray) –

    Annotated heartbeat indices.


  • pearsonr( float ) –

    Pearson correlation coefficient between resampled RR time series.

  • spearmanr( float ) –

    Spearman correlation coefficient between resampled RR time series.

  • rmse( float ) –

    Root mean square error between resampled RR time series.

Source code in sleepecg/
def rri_similarity(
    detection: np.ndarray,
    annotation: np.ndarray,
    fs_resample: float = 4,
) -> _RRISimilarityResult:
    Calculate measures of similarity between RR intervals.

    RR intervals are calculated from detected and annotated heartbeat indices. The RR time
    series is then resampled to frequency `fs_resample` in the timespan common to both
    detection and annotation. Pearson's and Spearman's correlation coefficients as well as
    the root mean square error are returned.

    detection : np.ndarray
        Detected heartbeat indices.
    annotation : np.ndarray
        Annotated heartbeat indices.

    pearsonr : float
        Pearson correlation coefficient between resampled RR time series.
    spearmanr : float
        Spearman correlation coefficient between resampled RR time series.
    rmse : float
        Root mean square error between resampled RR time series.
    rr_ann = np.diff(annotation)
    rr_det = np.diff(detection)

    t_ann = annotation[1:]
    t_det = detection[1:]

    start = max(min(t_ann), min(t_det))
    end = min(max(t_ann), max(t_det))
    t_new = np.arange(start, end, 1 / fs_resample)

    interp_det = scipy.interpolate.interp1d(t_det, rr_det)
    interp_ann = scipy.interpolate.interp1d(t_ann, rr_ann)
    det_resampled = interp_det(t_new)
    ann_resampled = interp_ann(t_new)

    return _RRISimilarityResult(
        scipy.stats.pearsonr(det_resampled, ann_resampled)[0],
        scipy.stats.spearmanr(det_resampled, ann_resampled)[0],
        np.sqrt(np.mean((det_resampled - ann_resampled) ** 2)),


Load a 5 minute long electrocardigram sampled at 360 Hz.

Data taken from scipy.datasets.electrocardiogram:


  • ecg( ndarray ) –

    The electrocardiogram in millivolt (mV).

  • fs( int ) –

    The sampling frequency in Hz.

Source code in sleepecg/
def get_toy_ecg() -> tuple[np.ndarray, int]:
    Load a 5 minute long electrocardigram sampled at 360 Hz.

    Data taken from scipy.datasets.electrocardiogram:

    ecg : np.ndarray
        The electrocardiogram in millivolt (mV).
    fs : int
        The sampling frequency in Hz.
    ecg = np.load(Path(__file__).parent / "data" / "ecg.npz")["ecg"]
    fs = 360
    return ecg, fs