"""Main PAC class."""
import numpy as np
import logging
from tensorpac.spectral import spectral, hilbertm
from tensorpac.methods import (get_pac_fcn, pacstr, compute_surrogates,
erpac, ergcpac, _ergcpac_perm, preferred_phase,
normalize)
from tensorpac.gcmi import copnorm
from tensorpac.visu import _PacVisual, _PacPlt, _PolarPlt
from tensorpac.io import set_log_level
from tensorpac.config import CONFIG
logger = logging.getLogger('tensorpac')
class _PacObj(object):
"""Main class for relative PAC objects."""
def __init__(self, f_pha=[2, 4], f_amp=[60, 200], dcomplex='hilbert',
cycle=(3, 6), width=7):
# Frequency checking :
from tensorpac.utils import pac_vec
self._f_pha, self._f_amp = pac_vec(f_pha, f_amp)
self._xvec, self._yvec = self.f_pha.mean(1), self.f_amp.mean(1)
# Check spectral properties :
self._speccheck(dcomplex, cycle, width)
def __str__(self):
"""String representation."""
return self.method
def filter(self, sf, x, ftype='phase', keepfilt=False, edges=None,
n_jobs=-1):
"""Filt the data in the specified frequency bands.
Parameters
----------
sf : float
The sampling frequency.
x : array_like
Array of data of shape (n_epochs, n_times)
ftype : {'phase', 'amplitude'}
Specify if you want to extract phase ('phase') or the amplitude
('amplitude').
n_jobs : int | -1
Number of jobs to compute PAC in parallel. For very large data,
set this parameter to 1 in order to prevent large memory usage.
keepfilt : bool | False
Specify if you only want the filtered data (True). This parameter
is only available with dcomplex='hilbert' and not wavelet.
edges : int | None
Number of samples to discard to avoid edge effects due to filtering
Returns
-------
xfilt : array_like
The filtered data of shape (n_freqs, n_epochs, n_times)
"""
# ---------------------------------------------------------------------
# check inputs
assert isinstance(sf, (int, float)), ("The sampling frequency must be "
"a float number.")
# Compatibility between keepfilt and wavelet :
if (keepfilt is True) and (self._dcomplex is 'wavelet'):
raise ValueError("Using wavelet for the complex decomposition do "
"not allow to get filtered data only. Set the "
"keepfilt parameter to False or set dcomplex to "
"'hilbert'.")
assert ftype in ['phase', 'amplitude'], ("ftype must either be 'phase'"
" or 'amplitude.'")
mne_epochs_type = CONFIG['MNE_EPOCHS_TYPE']
if not isinstance(x, np.ndarray) and type(x) in mne_epochs_type:
x = x.get_data()
sf = x.info['sfreq']
if x.ndim == 1:
x = x[np.newaxis, :]
assert x.ndim == 2, ("x should be a 2d array like (n_epochs, n_times)")
# check edges
if not isinstance(edges, int):
edges = slice(None)
else:
logger.debug(f" Edges {edges} time samples ignored")
edges = slice(edges, -edges)
self._edges = edges
# ---------------------------------------------------------------------
# Switch between phase or amplitude :
if ftype is 'phase':
tosend = 'pha' if not keepfilt else None
xfilt = spectral(x, sf, self.f_pha, tosend, self._dcomplex,
self._cycle[0], self._width, n_jobs)
elif ftype is 'amplitude':
tosend = 'amp' if not keepfilt else None
xfilt = spectral(x, sf, self.f_amp, tosend, self._dcomplex,
self._cycle[1], self._width, n_jobs)
return xfilt[..., edges]
def _speccheck(self, dcomplex=None, cycle=None, width=None):
"""Check spectral parameters."""
# Check cycle :
if cycle is not None:
cycle = np.asarray(cycle)
if (len(cycle) is not 2) or not cycle.dtype == int:
raise ValueError("Cycle must be a tuple of two integers.")
else:
self._cycle = cycle
# Check complex decomposition :
if dcomplex is not None:
if dcomplex not in ['hilbert', 'wavelet']:
raise ValueError("dcomplex must either be 'hilbert' or "
"'wavelet'.")
else:
self._dcomplex = dcomplex
# Convert Morlet's width :
if width is not None:
self._width = int(width)
@staticmethod
def _phampcheck(pha, amp):
"""Check phase and amplitude values."""
assert pha.ndim == 3, ("`pha` should have a shape of (n_pha, n_epochs,"
" n_times)")
assert amp.ndim == 3, ("`amp` should have a shape of (n_pha, n_epochs,"
" n_times)")
assert pha.shape[1:] == amp.shape[1:], ("`pha` and `amp` must have the"
" same number of trials, "
"channels and time points")
assert np.ptp(pha) <= 2 * np.pi, ("Your phase is probably in degrees "
"and should be converted in radians "
"using either np.degrees or "
"np.deg2rad.")
# Force the phase to be in [-pi, pi] :
pha = (pha + np.pi) % (2. * np.pi) - np.pi
return pha, amp
def _infer_pvalues(self, effect, perm, p=.05, mcp='maxstat'):
"""Global function for statistical inferences.
In order to work this method requires :
* effect = array like of shape (n_dims...)
* perm = array like of shape (n_perm, n_dims...)
"""
assert all([isinstance(k, np.ndarray) for k in (effect, perm)])
n_perm = perm.shape[0]
# compute the minimum number of required permutations
n_perm_req = int(10. / p)
if n_perm < n_perm_req:
logger.warning(f"For inferences at p<{p}, it is recommended to per"
f"form at least n_perm={n_perm_req} permutations")
# ---------------------------------------------------------------------
logger.info(f" infer p-values at (p={p}, mcp={mcp})")
# computes the pvalues
if mcp is 'maxstat':
max_p = perm.reshape(n_perm, -1).max(1)[np.newaxis, ...]
nb_over = (effect[..., np.newaxis] <= max_p).sum(-1)
pvalues = nb_over / n_perm
# non-signi. p-values are set to 1. and min(pvalues) = 1 / n_perm
pvalues[pvalues >= p] = 1.
pvalues = np.maximum(1. / n_perm, pvalues)
elif mcp in ['fdr', 'bonferroni']:
from mne.stats import fdr_correction, bonferroni_correction
fcn = fdr_correction if mcp is 'fdr' else bonferroni_correction
# compute the p-values
pvalues = (effect[np.newaxis, ...] <= perm).sum(0) / n_perm
pvalues = np.maximum(1. / n_perm, pvalues)
# apply correction
is_signi, pvalues = fcn(pvalues, alpha=p)
pvalues[~is_signi] = 1.
return pvalues
@property
def f_pha(self):
"""Vector of phases of shape (n_pha, 2)."""
return self._f_pha
@property
def f_amp(self):
"""Vector of amplitudes of shape (n_amp, 2)."""
return self._f_amp
@property
def xvec(self):
"""Vector of phases of shape (n_pha,) use for plotting."""
return self._xvec
@property
def yvec(self):
"""Vector of amplitudes of shape (n_amp,) use for plotting."""
return self._yvec
# ----------- DCOMPLEX -----------
@property
def dcomplex(self):
"""Get the dcomplex value."""
return self._dcomplex
@dcomplex.setter
def dcomplex(self, value):
"""Set dcomplex value."""
self._speccheck(dcomplex=value)
# ----------- CYCLE -----------
@property
def cycle(self):
"""Get the cycle value."""
return self._cycle
@cycle.setter
def cycle(self, value):
"""Set cycle value."""
self._speccheck(cycle=value)
# ----------- WIDTH -----------
@property
def width(self):
"""Get the width value."""
return self._width
@width.setter
def width(self, value):
"""Set width value."""
self._width = value
[docs]class Pac(_PacObj, _PacPlt):
"""Compute Phase-Amplitude Coupling (PAC).
Computing PAC is assessed in three steps : compute the real PAC, compute
surrogates and finally, because PAC is very sensible to the noise, correct
the real PAC by the surrogates. This implementation is modular i.e. it lets
you choose among a large range of possible combinations.
Parameters
----------
idpac : tuple/list | (1, 1, 3)
Choose the combination of methods to use in order to extract PAC.
This tuple must be composed of three integers where each one them
refer
* First digit : refer to the pac method
- 1 : Mean Vector Length (MVL) :cite:`canolty2006high`
(see :func:`tensorpac.methods.mean_vector_length`)
- 2 : Modulation Index (MI) :cite:`tort2010measuring`
(see :func:`tensorpac.methods.modulation_index`)
- 3 : Heights Ratio (HR) :cite:`lakatos2005oscillatory`
(see :func:`tensorpac.methods.heights_ratio`)
- 4 : ndPAC :cite:`ozkurt2012statistically`
(see :func:`tensorpac.methods.norm_direct_pac`)
- 5 : Phase-Locking Value (PLV)
:cite:`penny2008testing,lachaux1999measuring`
(see :func:`tensorpac.methods.phase_locking_value`)
- 6 : Gaussian Copula PAC (GCPAC) :cite:`ince2017statistical`
(see :func:`tensorpac.methods.gauss_cop_pac`)
* Second digit : refer to the method for computing surrogates
- 0 : No surrogates
- 1 : Swap phase / amplitude across trials
:cite:`tort2010measuring`
(see :func:`tensorpac.methods.swap_pha_amp`)
- 2 : Swap amplitude time blocks
:cite:`bahramisharif2013propagating`
(see :func:`tensorpac.methods.swap_blocks`)
- 3 : Time lag :cite:`canolty2006high`
(see :func:`tensorpac.methods.time_lag`)
* Third digit : refer to the normalization method for correction
- 0 : No normalization
- 1 : Substract the mean of surrogates
- 2 : Divide by the mean of surrogates
- 3 : Substract then divide by the mean of surrogates
- 4 : Z-score
f_pha, f_amp : list/tuple/array | def: [2, 4] and [60, 200]
Frequency vector for the phase and amplitude. Here you can use
several forms to define those vectors :
* Basic list/tuple (ex: [2, 4] or [8, 12]...)
* List of frequency bands (ex: [[2, 4], [5, 7]]...)
* Dynamic definition : (start, stop, width, step)
* Range definition (ex : np.arange(3) => [[0, 1], [1, 2]])
* Using a string. `f_pha` and `f_amp` can be 'lres', 'mres', 'hres'
respectively for low, middle and high resolution vectors. In that
case, it uses the definition proposed by Bahramisharif et al.
2013 :cite:`bahramisharif2013propagating` i.e
f_pha = [f - f / 4, f + f / 4] and f_amp = [f - f / 8, f + f / 8]
dcomplex : {'wavelet', 'hilbert'}
Method for the complex definition. Use either 'hilbert' or
'wavelet'.
cycle : tuple | (3, 6)
Control the number of cycles for filtering (only if dcomplex is
'hilbert'). Should be a tuple of integers where the first one
refers to the number of cycles for the phase and the second for the
amplitude :cite:`bahramisharif2013propagating`.
width : int | 7
Width of the Morlet's wavelet.
n_bins : int | 18
Number of bins for the KLD and HR PAC method
:cite:`tort2010measuring,lakatos2005oscillatory`
"""
[docs] def __init__(self, idpac=(1, 2, 3), f_pha=[2, 4], f_amp=[60, 200],
dcomplex='hilbert', cycle=(3, 6), width=7, n_bins=18,
verbose=None):
"""Check and initialize."""
set_log_level(verbose)
self._idcheck(idpac)
_PacObj.__init__(self, f_pha=f_pha, f_amp=f_amp, dcomplex=dcomplex,
cycle=cycle, width=width)
_PacPlt.__init__(self)
self.n_bins = int(n_bins)
logger.info("Phase Amplitude Coupling object defined")
[docs] def fit(self, pha, amp, n_perm=200, p=.05, mcp='maxstat', n_jobs=-1,
random_state=None, verbose=None):
"""Compute PAC on filtered data.
Parameters
----------
pha : array_like
Array of phases of shape (n_pha, n_epochs, n_times).
Angles should be in rad.
amp : array_like
Array of amplitudes of shape (n_amp, n_epochs, n_times).
n_perm : int | 200
Number of surrogates to compute.
p : float | 0.05
Statistical threshold
mcp : {'fdr', 'bonferroni'}
Correct the p-values for multiple comparisons. Use either :
* 'maxstat' : maximum statistics
* 'fdr' : FDR correction (need MNE-Python)
* 'bonferroni' : Bonferroni correction (need MNE-Python)
n_jobs : int | -1
Number of jobs to compute PAC in parallel. For very large data,
set this parameter to 1 in order to prevent large memory usage.
random_state : int | None
Fix the random state of the machine for reproducible results.
Returns
-------
pac : array_like
Phase-Amplitude Coupling measure of shape (n_amp, n_pha, n_epochs)
Attributes
----------
pac : array_like
Unormalized Phase-Amplitude Coupling measure of shape (n_amp,
n_pha, n_epochs)
pvalues : array_like
Array of p-values of shape (n_amp, n_pha)
surrogates : array_like
Array of surrogates of shape (n_perm, n_amp, n_pha, n_epochs)
"""
set_log_level(verbose)
# ---------------------------------------------------------------------
# input checking
pha, amp = self._phampcheck(pha, amp)
self._pvalues, self._surrogates = None, None
# for the plv, extract the phase of the amplitude
if self._idpac[0] == 5:
amp = np.angle(hilbertm(amp))
# ---------------------------------------------------------------------
# check if permutations should be computed
if self._idpac[1] == 0:
n_perm = None
if not isinstance(n_perm, int) or not (n_perm > 0):
self._idpac = (self._idpac[0], 0, 0)
compute_surro = False
else:
compute_surro = True
# ---------------------------------------------------------------------
# copnorm if gaussian copula is used
if self._idpac[0] == 6:
logger.debug(f" copnorm the phase and the amplitude")
pha = copnorm(np.stack([np.sin(pha), np.cos(pha)], axis=-2))
amp = copnorm(amp[..., np.newaxis, :])
# ---------------------------------------------------------------------
# true pac estimation
logger.info(f' true PAC estimation using {self.method}')
fcn = get_pac_fcn(self.idpac[0], self.n_bins, p)
pac = fcn(pha, amp)
self._pac = pac.copy()
# ---------------------------------------------------------------------
# compute surrogates (if needed)
if compute_surro:
if random_state is None:
random_state = int(np.random.randint(0, 10000, size=1))
logger.info(f" compute surrogates ({self.str_surro}, {n_perm} "
f"permutations, random_state={random_state})")
surro = compute_surrogates(pha, amp, self.idpac[1], fcn, n_perm,
n_jobs, random_state)
self._surrogates = surro
# infer pvalues
self.infer_pvalues(p, mcp=mcp)
# ---------------------------------------------------------------------
# normalize (if needed)
if self._idpac[2] != 0:
# Get the mean / deviation of surrogates
logger.info(" normalize true PAC estimation by surrogates "
f"({self.str_norm})")
normalize(self.idpac[2], pac, surro)
return pac
[docs] def filterfit(self, sf, x_pha, x_amp=None, n_perm=200, p=.05,
mcp='maxstat', edges=None, n_jobs=-1, random_state=None,
verbose=None):
"""Filt the data then compute PAC on it.
Parameters
----------
sf : float
The sampling frequency.
x_pha, x_amp : array_like
Array of data for computing PAC. x_pha is the data used for
extracting phases and x_amp, amplitudes. Both arrays must have
the same shapes. If you want to compute PAC locally i.e. on the
same electrode, x=x_pha=x_amp. For distant coupling, x_pha and
x_amp could be different but still must to have the same shape.
n_perm : int | 200
Number of surrogates to compute.
p : float | 0.05
Statistical threshold
mcp : {'fdr', 'bonferroni'}
Correct the p-values for multiple comparisons. Use either :
* 'maxstat' : maximum statistics
* 'fdr' : FDR correction (need MNE-Python)
* 'bonferroni' : Bonferroni correction (need MNE-Python)
edges : int | None
Number of samples to discard to avoid edge effects due to filtering
n_jobs : int | -1
Number of jobs to compute PAC in parallel. For very large data,
set this parameter to 1 in order to prevent large memory usage.
random_state : int | None
Fix the random state of the machine for reproducible results.
Returns
-------
pac : array_like
Phase-Amplitude Coupling measure of shape (namp, npha, ...).
Attributes
----------
pac : array_like
Unormalized Phase-Amplitude Coupling measure of shape (n_amp,
n_pha, n_epochs)
pvalues : array_like
Array of p-values of shape (n_amp, n_pha)
surrogates : array_like
Array of surrogates of shape (n_perm, n_amp, n_pha, n_epochs)
"""
# Check if amp is None :
if x_amp is None:
x_amp = x_pha
# Shape checking :
assert x_pha.shape == x_amp.shape, ("Inputs `x_pha` and `x_amp` must "
"have the same shape.")
# Extract phase (npha, ...) and amplitude (namp, ...) :
logger.info(f" extract phases (n_pha={len(self.xvec)}) and "
f"amplitudes (n_amps={len(self.yvec)})")
kw = dict(keepfilt=False, edges=edges, n_jobs=1)
pha = self.filter(sf, x_pha, 'phase', **kw)
amp = self.filter(sf, x_amp, 'amplitude', **kw)
# Special cases :
if self._idpac[0] == 5:
amp = np.angle(hilbertm(amp))
# Compute pac :
return self.fit(pha, amp, p=p, mcp=mcp, n_perm=n_perm, n_jobs=n_jobs,
random_state=random_state, verbose=verbose)
[docs] def infer_pvalues(self, p=0.05, mcp='maxstat'):
"""Infer p-values based on surrogate distribution.
Parameters
----------
p : float | 0.05
Significiency threshold
Returns
-------
pvalues : array_like
Array of p-values of shape (n_amp, n_pha)
mcp : {'fdr', 'bonferroni'}
Correct the p-values for multiple comparisons. Use either :
* 'maxstat' : maximum statistics
* 'fdr' : FDR correction (need MNE-Python)
* 'bonferroni' : Bonferroni correction (need MNE-Python)
"""
# ---------------------------------------------------------------------
# check that pac and surrogates has already been computed
assert hasattr(self, 'pac'), ("You should compute PAC first. Use the "
"`fit` method")
assert hasattr(self, 'surrogates'), "No surrogates computed"
# mean pac and surrogates across trials
m_pac, m_surro = self.pac.mean(2), self.surrogates.mean(3)
self._pvalues = self._infer_pvalues(m_pac, m_surro, p=p, mcp=mcp)
return self._pvalues
def _idcheck(self, idpac):
"""Check the idpac parameter."""
idpac = np.atleast_1d(idpac)
if not all([isinstance(k, int) for k in idpac]) and (len(idpac) != 3):
raise ValueError("idpac must be a tuple/list of 3 integers.")
# Ozkurt PAC case (doesn't need surrogates and normalization)
if idpac[0] == 4:
idpac = np.array([4, 0, 0])
if (idpac[0] == 1) and (idpac[1] == 0) and (idpac[2] == 0):
logger.warning(
"MVL is amplitude dependent which means that if the amplitude "
"increases, MVL also increases. You should select a "
"normalization method for correcting this limitation "
"(e.g idpac=(1, 2, 4))")
if (idpac[2] != 0) and (idpac[1] == 0):
logger.warning("If you want to normalize the estimated PAC, you "
"should select a surrogate method (second digit of "
"`idpac`). Normalization ignored.")
idpac[2] = 0
self._idpac = idpac
# string representation
self.method, self.str_surro, self.str_norm = pacstr(idpac)
@property
def idpac(self):
"""Get the idpac value."""
return self._idpac
@idpac.setter
def idpac(self, value):
"""Set idpac value."""
self._idcheck(value)
@property
def pac(self):
"""Array of un-normalized PAC of shape (n_amp, n_pha, n_epochs)."""
return self._pac
@property
def surrogates(self):
"""Array of surrogates of shape (n_perm, n_amp, n_pha, n_epochs)."""
return self._surrogates
@property
def pvalues(self):
"""Array of p-values of shape (n_amp, n_pha)."""
return self._pvalues
[docs]class PreferredPhase(_PacObj, _PolarPlt):
"""Compute the Preferred Phase (PP).
The preferred phase is defined as the phase at which the amplitude is
maximum.
Parameters
----------
f_pha, f_amp : list/tuple/array | def: [2, 4] and [60, 200]
Frequency vector for the phase and amplitude. Here you can use
several forms to define those vectors :
* Basic list/tuple (ex: [2, 4] or [8, 12]...)
* List of frequency bands (ex: [[2, 4], [5, 7]]...)
* Dynamic definition : (start, stop, width, step)
* Range definition (ex : np.arange(3) => [[0, 1], [1, 2]])
dcomplex : {'wavelet', 'hilbert'}
Method for the complex definition. Use either 'hilbert' or
'wavelet'.
cycle : tuple | (3, 6)
Control the number of cycles for filtering (only if dcomplex is
'hilbert'). Should be a tuple of integers where the first one
refers to the number of cycles for the phase and the second for the
amplitude.
width : int | 7
Width of the Morlet's wavelet.
"""
[docs] def __init__(self, f_pha=[2, 4], f_amp=[60, 200], dcomplex='hilbert',
cycle=(3, 6), width=7, verbose=None):
"""Check and initialize."""
set_log_level(verbose)
_PacObj.__init__(self, f_pha=f_pha, f_amp=f_amp, dcomplex=dcomplex,
cycle=cycle, width=width)
_PacPlt.__init__(self)
logger.info("Preferred phase object defined")
self.method = 'Preferred-Phase (PP)'
[docs] def fit(self, pha, amp, n_bins=72):
"""Compute the preferred-phase.
Parameters
----------
pha, amp : array_like
Respectively the phase of slower oscillations of shape
(n_pha, n_epochs, n_times) and the amplitude of faster
oscillations of shape (n_pha, n_epochs, n_times).
n_bins : int | 72
Number of bins for bining the amplitude according to phase
slices.
Returns
-------
binned_amp : array_like
The binned amplitude according to the phase of shape
(n_bins, n_amp, n_pha, n_epochs)
pp : array_like
The prefered phase where the amplitude is maximum of shape
(namp, npha, n_epochs)
polarvec : array_like
The phase vector for the polar plot of shape (n_bins,)
"""
# Check phase and amplitude shapes :
pha, amp = self._phampcheck(pha, amp)
return preferred_phase(pha, amp, n_bins=n_bins)
[docs] def filterfit(self, sf, x_pha, x_amp=None, edges=None, n_bins=12,
verbose=None):
"""Extract phases, amplitudes and compute the preferred phase (PP).
Parameters
----------
sf : float
The sampling frequency.
x_pha, x_amp : array_like
Array of data for computing PP. x_pha is the data used for
extracting phases and x_amp, amplitudes. Both arrays must have
the same shapes (i.e n_epochs, n_times). If you want to compute
local PP i.e. on the same electrode, x=x_pha=x_amp. For distant
coupling, x_pha and x_amp could be different but still must to have
the same shape.
n_bins : int | 72
Number of bins for bining the amplitude according to phase
slices.
edges : int | None
Number of samples to discard to avoid edge effects due to filtering
Returns
-------
binned_amp : array_like
The binned amplitude according to the phase of shape
(n_bins, n_amp, n_pha, n_epochs)
pp : array_like
The prefered phase where the amplitude is maximum of shape
(namp, npha, n_epochs)
polarvec : array_like
The phase vector for the polar plot of shape (n_bins,)
"""
x_amp = x_pha if not isinstance(x_amp, np.ndarray) else x_amp
# extract phases and amplitudes
logger.info(f" Extract phases (n_pha={len(self.xvec)}) and "
f"amplitudes (n_amps={len(self.yvec)})")
kw = dict(keepfilt=False, edges=edges, n_jobs=1)
pha = self.filter(sf, x_pha, ftype='phase', **kw)
amp = self.filter(sf, x_amp, ftype='amplitude', **kw)
# compute pp
return self.fit(pha, amp, n_bins=n_bins)