Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add new ERP measures #12434

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions mne/stats/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ __all__ = [
"summarize_clusters_stc",
"ttest_1samp_no_p",
"ttest_ind_no_p",
"erp",
]
from . import erp
from ._adjacency import combine_adjacency
from .cluster_level import (
_st_mask_from_s_inds,
Expand Down
2 changes: 2 additions & 0 deletions mne/stats/erp/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# ruff: noqa
# flake8: noqa
317 changes: 317 additions & 0 deletions mne/stats/erp/_erp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
# ruff: noqa
# flake8: noqa

from scipy import integrate
import pandas as pd
import numpy as np
from mne._fiff.pick import _pick_data_channels, _picks_to_idx, pick_info
from mne.channels.layout import _merge_ch_data, _pair_grad_sensors
from mne.utils import (
_check_option,
_time_mask,
)


def _get_peak(
evoked, tmin=None, tmax=None, picks="all", mode="abs", average=False, strict=True
):
"""Helper function to get the peak amplitude and latency of an evoked response."""

data = evoked.get_data(picks=picks)
times = evoked.times
mask = _time_mask(times, tmin, tmax, evoked.info["sfreq"])
data_masked = data[:, mask]

if average:
data_masked = np.mean(data_masked, axis=0)

if mode == "abs":
data_masked = np.abs(data_masked)
elif mode == "neg":
if strict and not np.any(data_masked < 0):
raise ValueError(
"No negative values encountered. Cannot operate in neg mode."
)
data_masked = -data_masked
elif mode == "pos":
if strict and not np.any(data_masked > 0):
raise ValueError(
"No positive values encountered. Cannot operate in pos mode."
)

max_indices = np.argmax(data_masked, axis=1)
peak_amplitudes = data[np.arange(data.shape[0]), max_indices + np.where(mask)[0][0]]
peak_latencies = times[max_indices + np.where(mask)[0][0]]

return peak_latencies, peak_amplitudes, data_masked, mask, times


def get_peak(
evoked,
tmin=None,
tmax=None,
picks="all",
mode="abs",
average=False,
strict=True,
):
"""Get the peak amplitude and latency of an evoked response and return a
DataFrame.

Parameters
----------
evoked : instance of Evoked
The evoked response object.
%(erp_evoked_tmin_tmax)s
%(picks_all)s
mode : str
Specifies how the peak amplitude should be determined. Can be one of:
- 'abs' : The peak amplitude is the maximum absolute value.
- 'neg': The peak amplitude is the maximum negative value. If there are no
negative values and `strict` is True, a ValueError is raised.
- 'pos': The peak amplitude is the maximum positive value. If there are no
positive values and `strict` is True, a ValueError is raised.
Defaults to abs'.
average : bool
If True, the peak amplitude is computed by averaging the data across
channels before finding the peak. Defaults to False.
%(erp_strict)s

Returns
-------
peak_df : pd.DataFrame
A DataFrame with columns 'channels', 'latency', and 'amplitude'
containing the peak amplitude and latency for each channel.
(Will only contain one row 'with 'latency' and 'amplitude' if average=True)
"""

_check_option("mode", mode, ["abs", "neg", "pos", "intg"])
peak_latencies, peak_amplitudes, data_masked, mask, times = _get_peak(
evoked, tmin, tmax, picks, mode, average, strict
)

peak_df = pd.DataFrame(
{
"channels": evoked.ch_names,
"latency": peak_latencies,
"amplitude": peak_amplitudes,
}
)
if average:
peak_df = peak_df.iloc[0]

return peak_df


def get_area(
evoked,
tmin=None,
tmax=None,
picks="all",
mode="abs",
average=False,
):
"""
Get the area under the curve of an evoked response within a given time window.

Parameters
----------
evoked : instance of Evoked
The evoked response object.
%(erp_evoked_tmin_tmax)s
%(picks_all)s
mode : str
Specifies how the area should be computed. Can be one of:
- 'abs': The absolute value of the data is used.
- 'neg': Only negative values are considered.
- 'pos': Only positive values are considered.
- 'intg': The integral of the data is computed without rectification.
Defaults to abs'.
average : bool
If True, the area is computed by averaging the data across channels
before integration. Defaults to False.

Returns
-------
area_df : pd.DataFrame
A DataFrame with columns 'channels' and 'area' containing the area
under the curve for each channel. (Will only contain one row with
'area' if average=True)

"""
_check_option("mode", mode, ["abs", "neg", "pos", "intg"])
data = evoked.get_data(picks=picks)
times = evoked.times
mask = _time_mask(times, tmin, tmax, evoked.info["sfreq"])
data_masked = data[:, mask]

if average:
data_masked = np.mean(data_masked, axis=0)
if mode == "abs":
data_masked = np.abs(data_masked)
elif mode == "neg":
data_masked = np.clip(data_masked, None, 0)
elif mode == "pos":
data_masked = np.clip(data_masked, 0, None)

area = integrate.trapz(data_masked, times[mask], axis=1)

if average:
area = area[0]
area_df = pd.DataFrame({"channels": evoked.ch_names, "area": area})

return area_df


def get_frac_peak_latency(
evoked,
frac=0.5,
tmin=None,
tmax=None,
picks="all",
mode="abs",
average=False,
strict=False,
):
"""Get the latency at which the peak amplitude reaches a certain fraction of its
maximum value.

Parameters
----------
evoked : instance of Evoked
The evoked response object.
frac : float
The fraction of the peak amplitude at which to compute the latency.
Defaults to 0.5.
%(erp_evoked_tmin_tmax)s
%(picks_all)s
mode : str
Specifies how the peak amplitude should be determined. Can be one of:
- 'abs' : The peak amplitude is the maximum absolute value.
- 'neg': The peak amplitude is the maximum negative value. If there are no
negative values and `strict` is True, a ValueError is raised.
- 'pos': The peak amplitude is the maximum positive value. If there are no
positive values and `strict` is True, a ValueError is raised.
Defaults to abs'.
average : bool
If True, the fractional peak latency is computed by averaging the data
across channels before finding the latency. Defaults to False.
strict : bool
If True, raise an error if values are all positive when detecting
a minimum (mode='neg'), or all negative when detecting a maximum
(mode='pos'). Defaults to False.

Returns
-------
frac_peak_df : pd.DataFrame
A DataFrame with columns 'channels', 'fractional_peak_onset',
'fractional_peak_offset', and 'amplitude' containing the latency at which...

"""
_check_option("mode", mode, ["abs", "neg", "pos"])

peak_latencies, peak_amplitudes, data_masked, mask, times = _get_peak(
evoked, tmin, tmax, picks, mode, average, strict
)
frac_amplitudes = frac * peak_amplitudes[:, np.newaxis]

# Find the first time point before the peak where the signal reaches the fractional threshold
frac_peak_onset = np.argmax(data_masked >= frac_amplitudes, axis=1)
frac_peak_onset_latency = times[mask][frac_peak_onset]

# Find the first time point after the peak where the signal reaches the fractional threshold
peak_idx = np.argmax(data_masked, axis=1)
frac_peak_offset = np.array(
[
peak_idx[i] + np.argmin(data_masked[i, peak_idx[i] :] <= frac_amplitudes[i])
for i in range(data_masked.shape[0])
]
)
frac_peak_offset_latency = times[mask][frac_peak_offset]

frac_peak_df = pd.DataFrame(
{
"channels": evoked.ch_names,
"fractional_peak_onset": frac_peak_onset_latency,
"fractional_peak_offset": frac_peak_offset_latency,
"amplitude": peak_amplitudes,
}
)

if average:
frac_peak_df = frac_peak_df.iloc[0]

return frac_peak_df


def get_frac_area_latency(
evoked,
frac=0.5,
tmin=None,
tmax=None,
picks="all",
mode="abs",
average=False,
strict=False,
):
"""Get the latency at which the area under the curve reaches a certain fraction of its
maximum value.

Parameters
----------
evoked : instance of Evoked
The evoked response object.
frac : float
The fraction of the area at which to compute the latency. Defaults to 0.5.
%(erp_evoked_tmin_tmax)s
%(picks_all)s
mode : str
Specifies how the area should be computed. Can be one of:
- 'abs': The absolute value of the data is used.
- 'neg': Only negative values are considered.
- 'pos': Only positive values are considered.
- 'intg': The integral of the data is computed without rectification.
Defaults to abs'.
average : bool
If True, the fractional area latency is computed by averaging the data
across channels before finding the latency. Defaults to False.
%(erp_strict)s


Returns
-------
frac_area_df : pd.DataFrame
A DataFrame with columns 'channels', 'fractional_area_latency',
and 'area' containing the latency at which...
"""
_check_option("mode", mode, ["abs", "neg", "pos", "intg"])
data = evoked.get_data(picks=picks)
times = evoked.times
mask = _time_mask(times, tmin, tmax, evoked.info["sfreq"])
data_masked = data[:, mask]
times = times[mask]
if average:
data_masked = np.mean(data_masked, axis=0, keepdims=True)
if mode == "abs":
data_masked = np.abs(data_masked)
elif mode == "neg":
data_masked = np.clip(data_masked, None, 0)
elif mode == "pos":
data_masked = np.clip(data_masked, 0, None)
area = np.trapz(data_masked, times, axis=1)
frac_area = frac * area
frac_area_latency = np.full(len(evoked.ch_names), np.nan)
for ch in range(data_masked.shape[0]):
idx = np.where(np.cumsum(data_masked[ch]) >= frac_area[ch])[0]
if len(idx) > 0:
frac_area_latency[ch] = times[idx[0]]
frac_area_df = pd.DataFrame(
{
"channels": evoked.ch_names,
"fractional_area_latency": frac_area_latency,
"area": area,
}
)
if average:
frac_area_df = frac_area_df.iloc[0]
return frac_area_df
13 changes: 13 additions & 0 deletions mne/utils/docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,19 @@ def _reflow_param_docstring(docstring, has_first_line=True, width=75):
time are included. Defaults to ``-0.2`` and ``0.5``, respectively.
"""

docdict["erp_evoked_tmin_tmax"] = """
tmin, tmax : float
Start and end time of the ERP computation window in seconds. Defaults to
``None`` and ``None``, which corresponds to the entire Evoked object.
"""
docdict["erp_strict"] = """
strict : bool
If True, raise an error if values are all positive when detecting
a minimum (mode='neg'), or all negative when detecting a maximum
(mode='pos'). Defaults to True.
"""


docdict["estimate_plot_psd"] = """\
estimate : str, {'auto', 'power', 'amplitude'}
Can be "power" for power spectral density (PSD), "amplitude" for
Expand Down