Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Pupil Deconvolution (port from pyeparse) #12505

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions doc/api/preprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ Projections:
convert_units
get_screen_visual_angle
interpolate_blinks
deconvolve
pupil_zscores
pupil_kernel

EEG referencing:

Expand Down
1 change: 1 addition & 0 deletions doc/changes/devel/12505.newfeature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added functions for deconvolving eyetracking pupil size data: :func:`mne.preprocessing.eyetracking.deconvolve` :func:`mne.preprocessing.eyetracking.pupil_zscores`, and :func:`mne.preprocessing.eyetracking.pupil_kernel`. by `Scott Huberty`_.
1 change: 1 addition & 0 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@
"n_signals",
"n_step",
"n_freqs",
"n_fit_times",
"wsize",
"Tx",
"M",
Expand Down
24 changes: 24 additions & 0 deletions doc/references.bib
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,17 @@ @article{HippEtAl2012
year = {2012}
}

@article{Hoeks1993,
author = {Hoeks, Bert and Levelt, Willem J M},
doi = {10.3758/BF03204445},
journal = {Behavior Research Methods, Instruments, & Computers},
number = {1},
pages = {16--26},
title = {Pupillary dilation as a measure of attention: a quantitative system analysis},
volume = {25},
year = {1993}
}

@article{HoldgrafEtAl2016,
author = {Holdgraf, Christopher R. and {de Heer}, Wendy and Pasley, Brian and Rieger, Jochem and Crone, Nathan and Lin, Jack J. and Knight, Robert T. and Theunissen, Frédéric E.},
doi = {10.1038/ncomms13654},
Expand Down Expand Up @@ -2153,6 +2164,19 @@ @inproceedings{StrohmeierEtAl2015
pages = {21--24}
}

@article{Wierda2012,
title = "Pupil dilation deconvolution reveals the dynamics of attention
at high temporal resolution",
doi = {10.1073/pnas.1201858109},
author = "Wierda, Stefan M and van Rijn, Hedderik and Taatgen, Niels A and
Martens, Sander",
journal = "Proceedings of the National Academy of Sciences",
volume = 109,
number = 22,
pages = "8456--8460",
year = 2012,
}

@misc{WikipediaSI,
author = "{Wikipedia contributors}",
title = "International System of Units — {Wikipedia}{,} The Free Encyclopedia",
Expand Down
1 change: 1 addition & 0 deletions ignore_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ pres
aas
vor
connec
hoeks
2 changes: 1 addition & 1 deletion mne/preprocessing/eyetracking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@

from .eyetracking import set_channel_types_eyetrack, convert_units
from .calibration import Calibration, read_eyelink_calibration
from ._pupillometry import interpolate_blinks
from ._pupillometry import interpolate_blinks, deconvolve, pupil_kernel, pupil_zscores
from .utils import get_screen_visual_angle
267 changes: 266 additions & 1 deletion mne/preprocessing/eyetracking/_pupillometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@

import numpy as np

from mne._fiff.pick import _picks_to_idx
from mne.parallel import parallel_func

from ..._fiff.constants import FIFF
from ...io import BaseRaw
from ...utils import _check_preload, _validate_type, logger, warn
from ...utils import (
_check_option,
_check_preload,
_validate_type,
fill_doc,
logger,
warn,
)


def interpolate_blinks(raw, buffer=0.05, match="BAD_blink", interpolate_gaze=False):
Expand Down Expand Up @@ -115,3 +125,258 @@ def _interpolate_blinks(raw, buffer, blink_annots, interpolate_gaze):
)
else:
warn("No channels were interpolated.")


@fill_doc
def pupil_zscores(epochs, baseline=(None, 0)):
"""Get normalized pupil data.

This function normalizes pupil responses within each epoch by subtracting
the mean pupil response during a specified baseline period and then dividing
by the standard deviation of all data (across time). This may help to compare
pupil responses across epochs or participants.

Parameters
----------
epochs : instance of Epochs
The epochs with pupil channels.
%(pupil_baseline)s

Returns
-------
pupil_data : array
An array of pupil size data, shape (n_epochs, n_channels, n_times).
"""
from mne import BaseEpochs

# Code ported from https://github.com/pyeparse/pyeparse
_check_preload(epochs, "Z-score normalization")
_validate_type(epochs, BaseEpochs, "epochs")
_validate_type(baseline, (tuple, list, np.ndarray), "baseline")

pupil_picks = _picks_to_idx(epochs.info, "pupil", allow_empty=False)
if len(baseline) != 2:
raise RuntimeError("baseline must be a 2-element list")
baseline = np.array(baseline)
if baseline[0] is None:
baseline[0] = epochs.times[0]
if baseline[1] is None:
baseline[1] = epochs.times[-1]
baseline = epochs.time_as_index(baseline)
zs = epochs.get_data(pupil_picks)
std = np.nanstd(zs.flat)
bl = np.nanmean(zs[..., baseline[0] : baseline[1] + 1], axis=-1)
zs -= bl[:, np.newaxis, :]
zs /= std
return zs


@fill_doc
def deconvolve(
epochs,
spacing=0.1,
baseline=(None, 0),
bounds=None,
max_iter=500,
kernel=None,
n_jobs=1,
acc=1e-6,
method="minimize",
reg=100,
):
r"""Deconvolve pupillary responses.

Parameters
----------
epochs : instance of Epochs
The epochs with pupil data to deconvolve.
spacing : float | array
Spacing of time points to use for deconvolution. Can also
be an array to directly specify time points to use.
%(pupil_baseline)s
This is passed to :func:`~mne.preprocessing.eyetracking.pupil_zscores`.
bounds : array of shape (2,) | None
Limits for deconvolution values. Can be, e.g. ``(0, np.inf)`` to
constrain to positive values. If ``None``, no bounds are used. Default is
``None``.
max_iter : int
Maximum number of iterations of minimization algorithm. Default is ``500``.
kernel : array | None
Kernel to assume when doing deconvolution. If ``None``, the
Hoeks and Levelt (1993)\ :footcite:p:`Hoeks1993` kernel will be used.
%(n_jobs)s
acc : float
The requested accuracy. Lower accuracy generally means smoother
fits.
method : ``"minimize"`` | ``"inverse"``
Can be ``"minimize"`` to use SLSQP or ``"inverse"`` to use
Tikhonov-regularized pseudoinverse. Default is ``"minimize"``.
reg : float
Regularization factor for pseudoinverse calculation. Only used if method is
``"inverse"``. Default is 100.

Returns
-------
fit : array, shape (n_epochs, n_channels, n_fit_times)
Array of fits.
times : array, shape (n_fit_times,)
The array of times at which points were fit.

Notes
-----
This method is adapted from Wierda et al., 2012, "Pupil dilation
deconvolution reveals the dynamics of attention at high temporal
resolution."\ :footcite:p:`Wierda2012`

Our implementation does not, by default, force all weights to be
greater than zero. It also does not do first-order detrending,
which the Wierda paper discusses implementing.

References
----------
.. footbibliography::
"""
from scipy import linalg

# Code ported from https://github.com/pyeparse/pyeparse
_validate_type(spacing, (float, np.ndarray, tuple, list), "spacing")
_validate_type(bounds, (type(None), tuple, list, np.ndarray), "bounds")
_validate_type(max_iter, int, "max_iter")
_validate_type(kernel, (np.ndarray, type(None)), "kernel")
_validate_type(n_jobs, int, "n_jobs")
_validate_type(acc, float, "acc")
_validate_type(method, str, "method")
_check_option("method", method, ["minimize", "inverse"])
_validate_type(reg, (int, float), "reg")

if bounds is not None:
bounds = np.array(bounds)
if bounds.ndim != 1 or bounds.size != 2:
raise RuntimeError("bounds must be 2-element array or None")
if kernel is None:
kernel = pupil_kernel(epochs.info["sfreq"])
else:
kernel = np.array(kernel, np.float64)
if kernel.ndim != 1:
raise TypeError("kernel must be 1D")

# get the data (and make sure it exists)
pupil_data = pupil_zscores(epochs, baseline=baseline)

# set up parallel function (and check n_jobs)
parallel, p_fun, n_jobs = parallel_func(_do_deconv, n_jobs)

# figure out where the samples go
n_samp = len(epochs.times)
if not isinstance(spacing, (np.ndarray, tuple, list)):
times = np.arange(epochs.times[0], epochs.times[-1], spacing)
times = np.unique(times)
else:
times = np.asanyarray(spacing)
samples = epochs.time_as_index(times)
if len(samples) == 0:
warn("No usable samples")
return np.array([]), np.array([])

# convert bounds to slsqp representation
if bounds is not None:
bounds = np.array([bounds for _ in range(len(samples))])
else:
bounds = [] # compatible with old version of scipy

# Build the convolution matrix
conv_mat = np.zeros((n_samp, len(samples)))
for li, loc in enumerate(samples):
eidx = min(loc + len(kernel), n_samp)
conv_mat[loc:eidx, li] = kernel[: eidx - loc]

# do the fitting
if method == "inverse":
u, s, v = linalg.svd(conv_mat, full_matrices=False)
# Threshold small singular values
s[s < 1e-7 * s[0]] = 0
# Regularize non-zero singular values
s[s > 0] /= s[s > 0] ** 2 + reg
inv_conv_mat = np.dot(v.T, s[:, np.newaxis] * u.T)
fit = np.dot(pupil_data, inv_conv_mat.T)
else: # minimize
fit_fails = parallel(
p_fun(data, conv_mat, bounds, max_iter, acc)
for data in np.array_split(pupil_data, n_jobs)
)
fit = np.concatenate([f[0] for f in fit_fails])
fails = np.concatenate([f[1] for f in fit_fails])
if np.any(fails):
reasons = ", ".join(str(r) for r in np.setdiff1d(np.unique(fails), [0]))
warn(
f"{np.sum(fails != 0)} out of {len(fails)} fits "
f"did not converge (reasons: {reasons})"
)
return fit, times


def _do_deconv(pupil_data, conv_mat, bounds, max_iter, acc):
"""Parallelize deconvolution helper function."""
# Code ported from https://github.com/pyeparse/pyeparse
from scipy.optimize import fmin_slsqp

x0 = np.zeros(conv_mat.shape[1])
fit = np.empty((pupil_data.shape[0], pupil_data.shape[1], conv_mat.shape[1]))
failed = np.empty(fit.shape)
for ei, data in enumerate(pupil_data):
out = fmin_slsqp(
_score,
x0,
args=(data, conv_mat),
epsilon=1e-4,
bounds=bounds,
disp=False,
full_output=True,
iter=max_iter,
acc=acc,
)
fit[ei, :, :] = out[0]
failed[ei, :, :] = out[3]
return fit, failed


def _score(vals, x_0, conv_mat):
return np.mean((x_0 - conv_mat.dot(vals)) ** 2)


def pupil_kernel(sfreq, dur=4.0, t_max=0.930, n=10.1, s=1.0):
r"""Generate pupil response kernel modeled as an Erlang gamma function.

Parameters
----------
sfreq : int
Sampling frequency (samples/second) to use in generating the kernel.
dur : float
Length (in seconds) of the generated kernel. Default is ``4.0`` seconds.
t_max : float
Time (in seconds) where the response maximum is stipulated to occur. Default is
``0.930`` seconds, as in Hoeks and Levelt (1993)\ :footcite:p:`Hoeks1993`.
n : float
Number of negative-exponential layers in the cascade defining the
gamma function. Default is ``10.1``, as in Hoeks and Levelt (1993)\
:footcite:p:`Hoeks1993`.
s : float | None
Desired value for the area under the kernel. If ``None``, no scaling is
performed. Default is ``1.0``.

Returns
-------
h : array
The generated kernel.

References
----------
.. footbibliography::
"""
# Code ported from https://github.com/pyeparse/pyeparse
n_samp = int(np.round(sfreq * dur))
t = np.arange(n_samp, dtype=float) / sfreq
h = (t**n) * np.exp(-n * t / t_max)
scal = 1.0 if s is None else float(s) / (np.sum(h) * (t[1] - t[0]))
h = scal * h
return h