"""This script is used to generate Current Source Density Estimates, using the
kCSD method Jan et.al (2012).
"""
import numpy as np
from numpy.linalg import LinAlgError, svd
from scipy import special, integrate, interpolate
from scipy.spatial import distance
from . import utility_functions as utils
from . import basis_functions as basis
[docs]
class CSD(object):
"""CSD - The base class for CSD methods."""
[docs]
def __init__(self, ele_pos, pots):
self.validate(ele_pos, pots)
self.ele_pos = ele_pos
self.pots = pots
self.n_ele = self.ele_pos.shape[0]
self.n_time = self.pots.shape[1]
self.dim = self.ele_pos.shape[1]
self.cv_error = None
[docs]
def validate(self, ele_pos, pots):
"""Basic checks to see if inputs are okay
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
"""
if ele_pos.shape[0] != pots.shape[0]:
raise Exception("Number of measured potentials is not equal "
"to electrode number!")
if ele_pos.shape[0] < 1+ele_pos.shape[1]: # Dim+1
raise Exception("Number of electrodes must be at least :",
1+ele_pos.shape[1])
if utils.check_for_duplicated_electrodes(ele_pos) is False:
raise Exception("Error! Duplicated electrode!")
[docs]
def sanity(self, true_csd, pos_csd):
"""Useful for comparing TrueCSD with reconstructed CSD.
Computes, the RMS error between the true_csd and the reconstructed csd at
pos_csd using the method defined.
Parameters
----------
true_csd : csd values used to generate potentials
pos_csd : csd estimatation from the method
Returns
-------
RMSE : root mean squared difference
"""
csd = self.values(pos_csd)
RMSE = np.sqrt(np.mean(np.square(true_csd - csd)))
return RMSE
[docs]
class KCSD(CSD):
"""KCSD - The base class for all the KCSD variants.
This estimates the Current Source Density, for a given configuration of
electrod positions and recorded potentials, electrodes.
The method implented here is based on the original paper
by Jan Potworowski et.al. 2012.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
super(KCSD, self).__init__(ele_pos, pots)
self.parameters(**kwargs)
self.estimate_at()
self.place_basis()
self.create_src_dist_tables()
self.method()
[docs]
def parameters(self, **kwargs):
"""Defining the default values of the method passed as kwargs
Parameters
----------
**kwargs
Same as those passed to initialize the Class
Raises
------
TypeError
If invalid keyword arguments inserted into **kwargs.
"""
self.src_type = kwargs.pop('src_type', 'gauss')
self.sigma = kwargs.pop('sigma', 1.0)
self.h = kwargs.pop('h', 1.0)
self.n_src_init = kwargs.pop('n_src_init', 1000)
self.lambd = kwargs.pop('lambd', 0.0)
self.R_init = kwargs.pop('R_init', 0.23)
self.ext_x = kwargs.pop('ext_x', 0.0)
self.xmin = kwargs.pop('xmin', np.min(self.ele_pos[:, 0]))
self.xmax = kwargs.pop('xmax', np.max(self.ele_pos[:, 0]))
self.gdx = kwargs.pop('gdx', 0.01*(self.xmax - self.xmin))
self.dist_table_density = kwargs.pop('dist_table_density', 20)
if self.dim >= 2:
self.ext_y = kwargs.pop('ext_y', 0.0)
self.ymin = kwargs.pop('ymin', np.min(self.ele_pos[:, 1]))
self.ymax = kwargs.pop('ymax', np.max(self.ele_pos[:, 1]))
self.gdy = kwargs.pop('gdy', 0.01*(self.ymax - self.ymin))
if self.dim == 3:
self.ext_z = kwargs.pop('ext_z', 0.0)
self.zmin = kwargs.pop('zmin', np.min(self.ele_pos[:, 2]))
self.zmax = kwargs.pop('zmax', np.max(self.ele_pos[:, 2]))
self.gdz = kwargs.pop('gdz', 0.01*(self.zmax - self.zmin))
if kwargs:
raise TypeError('Invalid keyword arguments:', kwargs.keys())
[docs]
def method(self):
"""Actual sequence of methods called for KCSD
Defines:
self.k_pot and self.k_interp_cross matrices
"""
self.create_lookup() # Look up table
self.update_b_pot() # update kernel
self.update_b_src() # update crskernel
self.update_b_interp_pot() # update pot interp
[docs]
def create_lookup(self):
"""Creates a table for easy potential estimation from CSD.
Updates and Returns the potentials due to a
given basis source like a lookup
table whose shape=(dist_table_density,)
Parameters
----------
dist_table_density : int
number of distance points at which potentials are computed.
Default 100
"""
xs = np.logspace(0., np.log10(self.dist_max+1.), self.dist_table_density)
xs = xs - 1.0 # starting from 0
dist_table = np.zeros(len(xs))
for i, pos in enumerate(xs):
dist_table[i] = self.forward_model(pos,
self.R,
self.h,
self.sigma,
self.basis)
self.interpolate_pot_at = interpolate.interp1d(xs, dist_table,
kind='cubic')
[docs]
def update_b_pot(self):
"""Updates the b_pot - array is (#_basis_sources, #_electrodes)
Updates the k_pot - array is (#_electrodes, #_electrodes) K(x,x')
Eq9,Jan2012
Calculates b_pot - matrix containing the values of all
the potential basis functions in all the electrode positions
(essential for calculating the cross_matrix).
"""
self.b_pot = self.interpolate_pot_at(self.src_ele_dists)
self.k_pot = np.dot(self.b_pot.T, self.b_pot) # K(x,x') Eq9,Jan2012
self.k_pot /= self.n_src
[docs]
def update_b_src(self):
"""Updates the b_src in the shape of (#_est_pts, #_basis_sources)
Updates the k_interp_cross - K_t(x,y) Eq17
Calculate b_src - matrix containing containing the values of
all the source basis functions in all the points at which we want to
calculate the solution (essential for calculating the cross_matrix)
"""
self.b_src = self.basis(self.src_estm_dists, self.R).T
self.k_interp_cross = np.dot(self.b_src, self.b_pot) # K_t(x,y) Eq17
self.k_interp_cross /= self.n_src
[docs]
def update_b_interp_pot(self):
"""Compute the matrix of potentials generated by every source
basis function at every position in the interpolated space.
Updates b_interp_pot
Updates k_interp_pot
Parameters
----------
None
"""
self.b_interp_pot = self.interpolate_pot_at(self.src_estm_dists).T
self.k_interp_pot = np.dot(self.b_interp_pot, self.b_pot)
self.k_interp_pot /= self.n_src
[docs]
def values(self, estimate='CSD'):
"""Computes the values of the quantity of interest
Parameters
----------
estimate : 'CSD' or 'POT'
What quantity is to be estimated
Defaults to 'CSD'
Returns
-------
estimation : np.array
estimated quantity of shape (ngx, ngy, ngz, nt)
"""
if estimate == 'CSD': # Maybe used for estimating the potentials also.
estimation_table = self.k_interp_cross
elif estimate == 'POT':
estimation_table = self.k_interp_pot
else:
print('Invalid quantity to be measured, pass either CSD or POT')
kernel = self.k_pot + self.lambd * np.identity(self.k_pot.shape[0])
beta_new = np.dot(np.linalg.inv(kernel), self.pots)
estimation = np.dot(estimation_table, beta_new)
return self.process_estimate(estimation)
[docs]
def process_estimate(self, estimation):
"""Function used to rearrange estimation according to dimension, to be
used by the fuctions values
Parameters
----------
estimation : np.array
Returns
-------
estimation : np.array
estimated quantity of shape (ngx, ngy, ngz, nt)
"""
if self.dim == 1:
estimation = estimation.reshape(self.ngx, self.n_time)
elif self.dim == 2:
estimation = estimation.reshape(self.ngx, self.ngy, self.n_time)
elif self.dim == 3:
estimation = estimation.reshape(self.ngx, self.ngy, self.ngz,
self.n_time)
try:
self.own_src
return estimation
except: pass
return estimation
[docs]
def update_R(self, R):
"""Update the width of the basis fuction - Used in Cross validation
Parameters
----------
R : float
"""
if self.R == R:
return
self.R = R
self.dist_max = max(np.max(self.src_ele_dists),
np.max(self.src_estm_dists)) + self.R
self.method()
[docs]
def update_lambda(self, lambd):
"""Update the lambda parameter of regularization, Used in Cross validation
Parameters
----------
lambd : float
"""
self.lambd = lambd
[docs]
def cross_validate(self, lambdas=None, Rs=None, regression_test=False):
"""Method defines the cross validation.
By default only cross_validates over lambda,
When no argument is passed, it takes
lambdas = np.logspace(-2,-25,25,base=10.)
and Rs = np.array(self.R).flatten()
otherwise pass necessary numpy arrays
Parameters
----------
lambdas : numpy array
Rs : numpy array
regression_test : bool
Returns
-------
R : post cross validation
Lambda : post cross validation
"""
if lambdas is None: # when None
print('No lambda given, using defaults')
lambdas = np.logspace(-2,-25,25,base=10.) # Default multiple lambda
lambdas = np.hstack((lambdas, np.array((0.0))))
elif lambdas.size == 1: # resize when one entry
lambdas = lambdas.flatten()
if Rs is None: # when None
Rs = np.array((self.R)).flatten() # Default over one R value
self.errs = np.zeros((Rs.size, lambdas.size))
if regression_test:
index_generator = self._get_index_generator()
self.errs_regression = self.errs.copy()
for R_idx, R in enumerate(Rs): # Iterate over R
self.update_R(R)
print('Cross validating R (all lambda) :', R)
for lambd_idx, lambd in enumerate(lambdas): # Iterate over lambdas
self.errs[R_idx, lambd_idx] = self._compute_cverr(lambd)
if regression_test:
self.errs_regression[R_idx, lambd_idx] = self.compute_cverror(lambd,
index_generator)
err_idx = np.where(self.errs == np.min(self.errs)) # Index of the least error
cv_R = Rs[err_idx[0]][0] # First occurance of the least error's
cv_lambda = lambdas[err_idx[1]][0]
self.cv_error = np.min(self.errs) # otherwise is None
self.update_R(cv_R) # Update solver
self.update_lambda(cv_lambda)
print('R, lambda :', cv_R, cv_lambda)
if regression_test:
err_idx = np.where(self.errs_regression == np.min(self.errs_regression))
print("R, lambda :", Rs[err_idx[0]][0], lambdas[err_idx[1]][0], "OLD")
print("max diff :", abs(self.errs - self.errs_regression).max())
print("new min, old min :", self.cv_error, np.min(self.errs_regression))
return cv_R, cv_lambda
def _get_index_generator(self):
index_generator = []
for ii in range(self.n_ele):
idx_test = [ii]
idx_train = list(range(self.n_ele))
idx_train.remove(ii) # Leave one out
index_generator.append((idx_train, idx_test))
return index_generator
def _compute_cverr(self, lambd):
n = self.k_pot.shape[0] # self.n_ele
K = self.k_pot + np.identity(n) * lambd
IDX_N = np.arange(n)
return sum(map(np.linalg.norm,
[self._leave_one_out_estimate(K, i, IDX_N != i) - ROW
for i, ROW in enumerate(self.pots)]))
def _leave_one_out_estimate(self, KERNEL, i, IDX):
return np.matmul(self.k_pot[np.ix_([i], IDX)],
np.linalg.solve(KERNEL[np.ix_(IDX, IDX)],
self.pots[IDX]))[0]
[docs]
def compute_cverror(self, lambd, index_generator):
"""Useful for Cross validation error calculations
Parameters
----------
lambd : float
index_generator : list
Returns
-------
err : float
the sum of the error computed.
Raises
------
LinAlgError
If the matrix is not numerically invertible.
"""
err = 0
for idx_train, idx_test in index_generator:
B_train = self.k_pot[np.ix_(idx_train, idx_train)]
V_train = self.pots[idx_train]
V_test = self.pots[idx_test]
I_matrix = np.identity(len(idx_train))
B_new = np.matrix(B_train) + (lambd*I_matrix)
try:
beta_new = np.dot(np.matrix(B_new).I, np.matrix(V_train))
B_test = self.k_pot[np.ix_(idx_test, idx_train)]
V_est = np.zeros((len(idx_test), self.pots.shape[1]))
for ii in range(len(idx_train)):
for tt in range(self.pots.shape[1]):
V_est[:, tt] += beta_new[ii, tt] * B_test[:, ii]
err += np.linalg.norm(V_est-V_test)
except LinAlgError:
raise LinAlgError('Encoutered Singular Matrix Error:'
'try changing ele_pos slightly')
return err
[docs]
def suggest_lambda(self):
"""Computes the lambda parameter range for regularization,
Used in Cross validation and L-curve
Returns
-------
Lambdas : list
"""
u, s, v = svd(self.k_pot)
print('min lambda', 10**np.round(np.log10(s[-1]), decimals=0))
print('max lambda', str.format('{0:.4f}', np.std(np.diag(self.k_pot))))
return np.logspace(np.log10(s[-1]), np.log10(np.std(np.diag(self.k_pot))), 20)
[docs]
def L_curve(self, lambdas=None, Rs=None, n_jobs=1):
"""Method defines the L-curve.
By default calculates L-curve over lambda,
When no argument is passed, it takes
lambdas = np.logspace(-10,-1,100,base=10)
and Rs = np.array(self.R).flatten()
otherwise pass necessary numpy arrays
Parameters
----------
L-curve plotting: default True
lambdas : numpy array
Rs : numpy array
Returns
-------
curve_surf : post cross validation
"""
if lambdas is None:
print('No lambda given, using defaults')
lambdas = self.suggest_lambda()
else:
lambdas = lambdas.flatten()
if Rs is None:
Rs = np.array((self.R)).flatten()
else:
Rs = np.array((Rs)).flatten()
self.lcurve_axis = np.zeros((2, len(Rs), len(lambdas)))
self.curve_surf = np.zeros((len(Rs), len(lambdas)))
for R_idx, R in enumerate(Rs):
self.update_R(R)
self.suggest_lambda()
print('l-curve (all lambda): ', np.round(R, decimals=3))
modelnormseq, residualseq = utils.parallel_search(self.k_pot, self.pots, lambdas,
n_jobs=n_jobs)
norm_log = np.log(modelnormseq + np.finfo(np.float64).eps)
res_log = np.log(residualseq + np.finfo(np.float64).eps)
curveseq = res_log[0] * (norm_log - norm_log[-1]) + res_log * (norm_log[-1] - norm_log[0]) \
+ res_log[-1] * (norm_log[0] - norm_log)
self.curve_surf[R_idx] = curveseq
self.lcurve_axis[0,R_idx] = modelnormseq#norm_log
self.lcurve_axis[1,R_idx] = residualseq#res_log
best_R_ind = np.argmax(np.max(self.curve_surf, axis=1))
self.m_norm = self.lcurve_axis[0, best_R_ind]
self.m_resi = self.lcurve_axis[1, best_R_ind]
self.update_R(Rs[best_R_ind])
self.update_lambda(lambdas[np.argmax(self.curve_surf, axis=1)[best_R_ind]])
print("Best lambda and R = ", self.lambd, ', ',
np.round(self.R, decimals=3))
[docs]
class KCSD1D(KCSD):
"""The 1D variant for the Kernel Current Source Density method.
This estimates the Current Source Density, for a given configuration of
electrod positions and recorded potentials, in the case of 1D recording
electrodes (laminar probes). The method implented here is based on the
original paper by Jan Potworowski et.al. 2012.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize KCSD1D Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
n_src_init : int
requested number of sources
Defaults to 300
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed cylindrical slice
Defaults to 1.
xmin, xmax : floats
boundaries for CSD estimation space
Defaults to min(ele_pos(x)), and max(ele_pos(x))
ext_x : float
length of space extension: x_min-ext_x ... x_max+ext_x
Defaults to 0.
gdx : float
space increments in the estimation space
Defaults to 0.01(xmax-xmin)
lambd : float
regularization parameter for ridge regression
Defaults to 0.
dist_table_density : int
size of the potential interpolation table
Defaults to 20
Raises
------
LinAlgError
If the matrix is not numerically invertible.
KeyError
Basis function (src_type) not implemented. See basis_functions.py for available
"""
super(KCSD1D, self).__init__(ele_pos, pots, **kwargs)
[docs]
def estimate_at(self):
"""Defines locations where the estimation is wanted
Defines:
self.n_estm = self.estm_x.size
self.ngx = self.estm_x.shape
self.estm_x : Locations at which CSD is requested.
"""
self.ngx = int(np.rint((self.xmax - self.xmin)/self.gdx)) + 1
self.estm_x = np.linspace(self.xmin, self.xmax, self.ngx)
self.n_estm = self.estm_x.size
self.estm_pos = self.estm_x.reshape(self.n_estm, 1)
[docs]
def place_basis(self):
"""Places basis sources of the defined type.
Checks if a given source_type is defined, if so then defines it
self.basis, This function gives locations of the basis sources,
Defines
source_type : basis_fuctions.basis_1D.keys()
self.R based on R_init
self.dist_max as maximum distance between electrode and basis
self.nsx = self.src_x.shape
self.src_x : Locations at which basis sources are placed.
"""
source_type = self.src_type
try:
self.basis = basis.basis_1D[source_type]
except KeyError:
raise KeyError('Invalid source_type for basis! available are:',
basis.basis_1D.keys())
(self.src_x, self.R) = utils.distribute_srcs_1D(self.estm_x,
self.n_src_init,
self.ext_x,
self.R_init)
self.n_src = self.src_x.size
self.nsx = self.src_x.shape
[docs]
def create_src_dist_tables(self):
"""Creates distance tables between sources, electrode and estm points
"""
src_loc = np.array((self.src_x.ravel()))
src_loc = src_loc.reshape((len(src_loc), 1))
est_loc = np.array((self.estm_x.ravel()))
est_loc = est_loc.reshape((len(est_loc), 1))
self.src_ele_dists = distance.cdist(src_loc, self.ele_pos, 'euclidean')
self.src_estm_dists = distance.cdist(src_loc, est_loc, 'euclidean')
self.dist_max = max(np.max(self.src_ele_dists),
np.max(self.src_estm_dists)) + self.R
[docs]
def forward_model(self, x, R, h, sigma, src_type):
"""Forward model functions
Evaluates potential at point (x,0) by a basis source located at (0,0)
Eq 26 kCSD by Jan,2012
Parameters
----------
x : float
R : float
h : float
sigma : float
src_type : basis_1D.key
Returns
-------
pot : float
value of potential at specified distance from the source
"""
pot, err = integrate.quad(self.int_pot_1D,
-R, R,
args=(x, R, h, src_type))
pot *= 1./(2.0*sigma)
return pot
[docs]
def int_pot_1D(self, xp, x, R, h, basis_func):
"""Forward model function for 1D.
Returns contribution of a point xp,yp, belonging to a basis source
support centered at (0,0) to the potential measured at (x,0),
integrated over xp,yp gives the potential generated by a
basis source element centered at (0,0) at point (x,0)
Eq 26 kCSD by Jan,2012
Parameters
----------
xp : floats or np.arrays
point or set of points where function should be calculated
x : float
position at which potential is being measured
R : float
The size of the basis function
h : float
thickness of slice
basis_func : method
Fuction of the basis source
Returns
-------
pot : float
"""
m = np.sqrt((x-xp)**2 + h**2) - abs(x-xp)
m *= basis_func(abs(xp), R) # xp is the distance
return m
[docs]
class KCSD2D(KCSD):
"""The 2D variant for the Kernel Current Source Density method.
This estimates the Current Source Density, for a given configuration of
electrod positions and recorded potentials, in the case of 2D recording
electrodes. The method implented here is based on the original paper
by Jan Potworowski et.al. 2012.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize KCSD2D Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
n_src_init : int
requested number of sources
Defaults to 1000
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed tissue slice
Defaults to 1.
xmin, xmax, ymin, ymax : floats
boundaries for CSD estimation space
Defaults to min(ele_pos(x)), and max(ele_pos(x))
Defaults to min(ele_pos(y)), and max(ele_pos(y))
ext_x, ext_y : float
length of space extension: x_min-ext_x ... x_max+ext_x
length of space extension: y_min-ext_y ... y_max+ext_y
Defaults to 0.
gdx, gdy : float
space increments in the estimation space
Defaults to 0.01(xmax-xmin)
Defaults to 0.01(ymax-ymin)
lambd : float
regularization parameter for ridge regression
Defaults to 0.
Raises
------
LinAlgError
Could not invert the matrix, try changing the ele_pos slightly
KeyError
Basis function (src_type) not implemented. See basis_functions.py for available
"""
super(KCSD2D, self).__init__(ele_pos, pots, **kwargs)
[docs]
def estimate_at(self):
"""Defines locations where the estimation is wanted
Defines:
self.n_estm = self.estm_x.size
self.ngx, self.ngy = self.estm_x.shape
self.estm_x, self.estm_y : Locations at which CSD is requested.
"""
self.ngx = int(np.rint((self.xmax - self.xmin)/self.gdx)) + 1
self.ngy = int(np.rint((self.ymax - self.ymin)/self.gdy)) + 1
self.estm_pos = np.meshgrid(np.linspace(self.xmin, self.xmax, self.ngx),
np.linspace(self.ymin, self.ymax, self.ngy),
indexing='ij')
self.estm_x, self.estm_y = self.estm_pos
self.n_estm = self.estm_x.size
[docs]
def place_basis(self):
"""Places basis sources of the defined type.
Checks if a given source_type is defined, if so then defines it
self.basis, This function gives locations of the basis sources,
Defines
source_type : basis_fuctions.basis_2D.keys()
self.R based on R_init
self.dist_max as maximum distance between electrode and basis
self.nsx, self.nsy = self.src_x.shape
self.src_x, self.src_y : Locations at which basis sources are placed.
"""
source_type = self.src_type
try:
self.basis = basis.basis_2D[source_type]
except KeyError:
raise KeyError('Invalid source_type for basis! available are:',
basis.basis_2D.keys())
(self.src_x, self.src_y, self.R) = utils.distribute_srcs_2D(self.estm_x,
self.estm_y,
self.n_src_init,
self.ext_x,
self.ext_y,
self.R_init)
self.n_src = self.src_x.size
self.nsx, self.nsy = self.src_x.shape
[docs]
def create_src_dist_tables(self):
"""Creates distance tables between sources, electrode and estm points
"""
src_loc = np.array((self.src_x.ravel(), self.src_y.ravel()))
est_loc = np.array((self.estm_x.ravel(), self.estm_y.ravel()))
self.src_ele_dists = distance.cdist(src_loc.T, self.ele_pos, 'euclidean')
self.src_estm_dists = distance.cdist(src_loc.T, est_loc.T, 'euclidean')
self.dist_max = max(np.max(self.src_ele_dists), np.max(self.src_estm_dists)) + self.R
[docs]
def forward_model(self, x, R, h, sigma, src_type):
"""FWD model functions
Evaluates potential at point (x,0) by a basis source located at (0,0)
Eq 22 kCSD by Jan,2012
Parameters
----------
x : float
R : float
h : float
sigma : float
src_type : basis_2D.key
Returns
-------
pot : float
value of potential at specified distance from the source
"""
pot, err = integrate.dblquad(self.int_pot_2D,
-R, R,
lambda x: -R,
lambda x: R,
args=(x, R, h, src_type))
pot *= 1./(2.0*np.pi*sigma) # Potential basis functions bi_x_y
return pot
[docs]
def int_pot_2D(self, xp, yp, x, R, h, basis_func):
"""FWD model function.
Returns contribution of a point xp,yp, belonging to a basis source
support centered at (0,0) to the potential measured at (x,0),
integrated over xp,yp gives the potential generated by a
basis source element centered at (0,0) at point (x,0)
Parameters
----------
xp, yp : floats or np.arrays
point or set of points where function should be calculated
x : float
position at which potential is being measured
R : float
The size of the basis function
h : float
thickness of slice
basis_func : method
Fuction of the basis source
Returns
-------
pot : float
"""
y = ((x-xp)**2 + yp**2)**(0.5)
if y < 0.00001:
y = 0.00001
dist = np.sqrt(xp**2 + yp**2)
pot = np.arcsinh(h/y)*basis_func(dist, R)
return pot
[docs]
class MoIKCSD(KCSD2D):
"""MoIKCSD - CSD while including the forward modeling effects of saline.
This estimates the Current Source Density, for a given configuration of
electrod positions and recorded potentials, in the case of 2D recording
electrodes from an MEA electrode plane using the Method of Images.
The method implented here is based on kCSD method by Jan Potworowski
et.al. 2012, which was extended in Ness, Chintaluri 2015 for MEA.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize MoIKCSD Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
sigma_S : float
conductance of the saline (medium) in S/m
Default is 5 S/m (5 times more conductive)
n_src_init : int
requested number of sources
Defaults to 1000
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed tissue slice
Defaults to 1.
xmin, xmax, ymin, ymax : floats
boundaries for CSD estimation space
Defaults to min(ele_pos(x)), and max(ele_pos(x))
Defaults to min(ele_pos(y)), and max(ele_pos(y))
ext_x, ext_y : float
length of space extension: x_min-ext_x ... x_max+ext_x
length of space extension: y_min-ext_y ... y_max+ext_y
Defaults to 0.
gdx, gdy : float
space increments in the estimation space
Defaults to 0.01(xmax-xmin)
Defaults to 0.01(ymax-ymin)
lambd : float
regularization parameter for ridge regression
Defaults to 0.
MoI_iters : int
Number of interations in method of images.
Default is 20
"""
self.MoI_iters = kwargs.pop('MoI_iters', 20)
self.sigma_S = kwargs.pop('sigma_S', 5.0)
self.sigma = kwargs.pop('sigma', 1.0)
W_TS = (self.sigma - self.sigma_S) / (self.sigma + self.sigma_S)
self.iters = np.arange(self.MoI_iters) + 1 # Eq 6, Ness (2015)
self.iter_factor = W_TS**self.iters
super(MoIKCSD, self).__init__(ele_pos, pots, **kwargs)
[docs]
def forward_model(self, x, R, h, sigma, src_type):
"""FWD model functions
Evaluates potential at point (x,0) by a basis source located at (0,0)
Eq 22 kCSD by Jan,2012
Parameters
----------
x : float
R : float
h : float
sigma : float
src_type : basis_2D.key
Returns
-------
pot : float
value of potential at specified distance from the source
"""
pot, err = integrate.dblquad(self.int_pot_2D_moi, -R, R,
lambda x: -R,
lambda x: R,
args=(x, R, h, src_type))
pot *= 1./(2.0*np.pi*sigma)
return pot
[docs]
def int_pot_2D_moi(self, xp, yp, x, R, h, basis_func):
"""FWD model function. Incorporates the Method of Images.
Returns contribution of a point xp,yp, belonging to a basis source
support centered at (0,0) to the potential measured at (x,0),
integrated over xp,yp gives the potential generated by a
basis source element centered at (0,0) at point (x,0)
#Eq 20, Ness(2015)
Parameters
----------
xp, yp : floats or np.arrays
point or set of points where function should be calculated
x : float
position at which potential is being measured
R : float
The size of the basis function
h : float
thickness of slice
basis_func : method
Fuction of the basis source
Returns
-------
pot : float
"""
L = ((x-xp)**2 + yp**2)**(0.5)
if L < 0.00001:
L = 0.00001
correction = np.arcsinh((h-(2*h*self.iters))/L) + np.arcsinh((h+(2*h*self.iters))/L)
pot = np.arcsinh(h/L) + np.sum(self.iter_factor*correction)
dist = np.sqrt(xp**2 + yp**2)
pot *= basis_func(dist, R) # Eq 20, Ness et.al.
return pot
[docs]
class KCSD3D(KCSD):
"""KCSD3D - The 3D variant for the Kernel Current Source Density method.
This estimates the Current Source Density, for a given configuration of
electrod positions and recorded potentials, in the case of 2D recording
electrodes. The method implented here is based on the original paper
by Jan Potworowski et.al. 2012.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize KCSD3D Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
n_src_init : int
requested number of sources
Defaults to 1000
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed tissue slice
Defaults to 1.
xmin, xmax, ymin, ymax, zmin, zmax : floats
boundaries for CSD estimation space
Defaults to min(ele_pos(x)), and max(ele_pos(x))
Defaults to min(ele_pos(y)), and max(ele_pos(y))
Defaults to min(ele_pos(z)), and max(ele_pos(z))
ext_x, ext_y, ext_z : float
length of space extension: xmin-ext_x ... xmax+ext_x
length of space extension: ymin-ext_y ... ymax+ext_y
length of space extension: zmin-ext_z ... zmax+ext_z
Defaults to 0.
gdx, gdy, gdz : float
space increments in the estimation space
Defaults to 0.01(xmax-xmin)
Defaults to 0.01(ymax-ymin)
Defaults to 0.01(zmax-zmin)
lambd : float
regularization parameter for ridge regression
Defaults to 0.
Raises
------
LinAlgError
Could not invert the matrix, try changing the ele_pos slightly
KeyError
Basis function (src_type) not implemented.
See basis_functions.py for available
"""
super(KCSD3D, self).__init__(ele_pos, pots, **kwargs)
[docs]
def estimate_at(self):
"""Defines locations where the estimation is wanted
Defines:
self.n_estm = self.estm_x.size
self.ngx, self.ngy, self.ngz = self.estm_x.shape
self.estm_x, self.estm_y, self.estm_z : Pts. at which CSD is requested
"""
self.ngx = int(np.rint((self.xmax - self.xmin)/self.gdx)) + 1
self.ngy = int(np.rint((self.ymax - self.ymin)/self.gdy)) + 1
self.ngz = int(np.rint((self.zmax - self.zmin)/self.gdz)) + 1
self.estm_pos = np.meshgrid(np.linspace(self.xmin, self.xmax, self.ngx),
np.linspace(self.ymin, self.ymax, self.ngy),
np.linspace(self.zmin, self.zmax, self.ngz),
indexing='ij')
self.estm_x, self.estm_y, self.estm_z = self.estm_pos
self.n_estm = self.estm_x.size
[docs]
def place_basis(self):
"""Places basis sources of the defined type.
Checks if a given source_type is defined, if so then defines it
self.basis, This function gives locations of the basis sources,
Defines
source_type : basis_fuctions.basis_2D.keys()
self.R based on R_init
self.dist_max as maximum distance between electrode and basis
self.nsx, self.nsy, self.nsz = self.src_x.shape
self.src_x, self.src_y, self.src_z : Locations at which basis sources are placed.
"""
source_type = self.src_type
try:
self.basis = basis.basis_3D[source_type]
except KeyError:
raise KeyError('Invalid source_type for basis! available are:',
basis.basis_3D.keys())
(self.src_x, self.src_y, self.src_z, self.R) = utils.distribute_srcs_3D(self.estm_x,
self.estm_y,
self.estm_z,
self.n_src_init,
self.ext_x,
self.ext_y,
self.ext_z,
self.R_init)
self.n_src = self.src_x.size
self.nsx, self.nsy, self.nsz = self.src_x.shape
[docs]
def create_src_dist_tables(self):
"""Creates distance tables between sources, electrode and estm points
"""
src_loc = np.array((self.src_x.ravel(),
self.src_y.ravel(),
self.src_z.ravel()))
est_loc = np.array((self.estm_x.ravel(),
self.estm_y.ravel(),
self.estm_z.ravel()))
self.src_ele_dists = distance.cdist(src_loc.T, self.ele_pos, 'euclidean')
self.src_estm_dists = distance.cdist(src_loc.T, est_loc.T, 'euclidean')
self.dist_max = max(np.max(self.src_ele_dists),
np.max(self.src_estm_dists)) + self.R
[docs]
def forward_model(self, x, R, h, sigma, src_type):
"""FWD model functions
Evaluates potential at point (x,0) by a basis source located at (0,0)
Utlizies sk monaco monte carlo method if available, otherwise defaults
to scipy integrate
Parameters
----------
x : float
R : float
h : float
sigma : float
src_type : basis_3D.key
Returns
-------
pot : float
value of potential at specified distance from the source
"""
if src_type.__name__ == "gauss_3D":
if x == 0: x=0.0001
pot = special.erf(x/(np.sqrt(2)*R/3.0)) / x
elif src_type.__name__ == "gauss_lim_3D":
if x == 0: x=0.0001
d = R/3.
if x < R:
e = np.exp(-(x / (np.sqrt(2)*d))**2)
erf = special.erf(x / (np.sqrt(2)*d))
pot = 4*np.pi * ((d**2)*(e - np.exp(-4.5)) +
(1/x)*((np.sqrt(np.pi/2)*(d**3)*erf) - x*(d**2)*e))
else:
pot = 15.28828*(d)**3 / x
pot /= (np.sqrt(2*np.pi)*d)**3
elif src_type.__name__ == "step_3D":
Q = 4.*np.pi*(R**3)/3.
if x < R:
pot = (Q * (3 - (x/R)**2)) / (2.*R)
else:
pot = Q / x
pot *= 3/(4*np.pi*R**3)
else:
pot, err = integrate.tplquad(self.int_pot_3D,
-R,
R,
lambda x: -R,
lambda x: R,
lambda x, y: -R,
lambda x, y: R,
args=(x, R, h, src_type))
pot *= 1./(4.0*np.pi*sigma)
return pot
[docs]
def int_pot_3D(self, xp, yp, zp, x, R, h, basis_func):
"""FWD model function.
Returns contribution of a point xp,yp, belonging to a basis source
support centered at (0,0) to the potential measured at (x,0),
integrated over xp,yp gives the potential generated by a
basis source element centered at (0,0) at point (x,0)
Parameters
----------
xp, yp, zp : floats or np.arrays
point or set of points where function should be calculated
x : float
position at which potential is being measured
R : float
The size of the basis function
h : float
thickness of slice
basis_func : method
Fuction of the basis source
Returns
-------
pot : float
"""
y = ((x-xp)**2 + yp**2 + zp**2)**0.5
if y < 0.00001:
y = 0.00001
dist = np.sqrt(xp**2 + yp**2 + zp**2)
pot = 1.0/y
pot *= basis_func(dist, R)
return pot
[docs]
class oKCSD1D(KCSD1D):
"""oKCSD - The variant for the Kernel Current Source Density method that
allows to reconstruct potential and CSD in given 1D space points.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize oKCSD1D Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
n_src_init : int
requested number of sources
Defaults to 1000
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed tissue slice
Defaults to 1.
own_est: numpy array
points coordinates of estimation places. If not given
estimation places will be taken from own_src
own_src: numpy array
points coordinates of basis source centers
lambd : float
regularization parameter for ridge regression
Defaults to 0.
Raises
------
LinAlgError
Could not invert the matrix, try changing the ele_pos slightly
KeyError
Basis function (src_type) not implemented.
See basis_functions.py for available
"""
self.own_src = kwargs.pop('own_src', np.array([]))
self.own_est = kwargs.pop('own_est', np.array([]))
if not self.own_est.any(): self.own_est = self.own_src
if not self.own_est.any() and not self.own_src.any():
raise KeyError('"own_src" is required argument to use oKCSD1D.' +
'If you would like to reconstruct in default ' +
'region of interest please use KCSD1D')
super(oKCSD1D, self).__init__(ele_pos, pots, **kwargs)
self.dim = 'own'
[docs]
def estimate_at(self):
"""Redefines locations where the estimation is wanted
Defines:
self.n_estm = self.estm_x.size
self.estm_x : Locations at which CSD is requested.
"""
self.estm_x = self.own_est
self.n_estm = self.estm_x.size
[docs]
def place_basis(self):
"""Places basis sources of the defined type.
Checks if a given source_type is defined, if so then defines it
self.basis, This function gives locations of the basis sources,
Defines
source_type : basis_fuctions.basis_1D.keys()
self.R based on R_init
self.dist_max as maximum distance between electrode and basis
self.src_x : Locations at which basis sources are placed.
"""
source_type = self.src_type
try:
self.basis = basis.basis_1D[source_type]
except KeyError:
raise KeyError('Invalid source_type for basis! available are:',
basis.basis_1D.keys())
self.R = self.R_init
self.src_x = self.own_src
self.n_src = self.src_x.size
[docs]
class oKCSD2D(KCSD2D):
"""oKCSD - The variant for the Kernel Current Source Density method that
allows to reconstruct potential and CSD in given 2D space points.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize oKCSD2D Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
n_src_init : int
requested number of sources
Defaults to 1000
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed tissue slice
Defaults to 1.
own_est: numpy array
points coordinates of estimation places. If not given
estimation places will be taken from own_src
own_src: numpy array
points coordinates of basis source centers
lambd : float
regularization parameter for ridge regression
Defaults to 0.
Raises
------
LinAlgError
Could not invert the matrix, try changing the ele_pos slightly
KeyError
Basis function (src_type) not implemented.
See basis_functions.py for available
"""
self.own_src = kwargs.pop('own_src', np.array([]))
self.own_est = kwargs.pop('own_est', np.array([]))
if not self.own_est.any(): self.own_est = self.own_src
if not self.own_est.any() and not self.own_src.any():
raise KeyError('"own_src" is required argument to use oKCSD2D.' +
'If you would like to reconstruct in default ' +
'region of interest please use KCSD2D')
super(oKCSD2D, self).__init__(ele_pos, pots, **kwargs)
self.dim = 'own'
[docs]
def estimate_at(self):
"""Redefines locations where the estimation is wanted
Defines:
self.n_estm = self.estm_x.size
self.estm_x, self.estm_y : Locations at which CSD is requested.
"""
self.estm_x, self.estm_y = self.own_est
self.n_estm = self.estm_x.size
[docs]
def place_basis(self):
"""Places basis sources of the defined type.
Checks if a given source_type is defined, if so then defines it
self.basis, This function gives locations of the basis sources,
Defines
source_type : basis_fuctions.basis_2D.keys()
self.R based on R_init
self.src_x, self.src_y : Locations at which basis sources are placed.
"""
source_type = self.src_type
try:
self.basis = basis.basis_2D[source_type]
except KeyError:
raise KeyError('Invalid source_type for basis! available are:',
basis.basis_2D.keys())
self.R = self.R_init
self.src_x, self.src_y = self.own_src
self.n_src = self.src_x.size
[docs]
class oKCSD3D(KCSD3D):
"""oKCSD - The variant for the Kernel Current Source Density method that
allows to reconstruct potential and CSD in given 3D space points.
"""
[docs]
def __init__(self, ele_pos, pots, **kwargs):
"""Initialize oKCSD3D Class.
Parameters
----------
ele_pos : numpy array
positions of electrodes
pots : numpy array
potentials measured by electrodes
**kwargs
configuration parameters, that may contain the following keys:
src_type : str
basis function type ('gauss', 'step', 'gauss_lim')
Defaults to 'gauss'
sigma : float
space conductance of the tissue in S/m
Defaults to 1 S/m
n_src_init : int
requested number of sources
Defaults to 1000
R_init : float
demanded thickness of the basis element
Defaults to 0.23
h : float
thickness of analyzed tissue slice
Defaults to 1.
own_est: numpy array
points coordinates of estimation places. If not given
estimation places will be taken from own_src
own_src: numpy array
points coordinates of basis source centers
lambd : float
regularization parameter for ridge regression
Defaults to 0.
Raises
------
LinAlgError
Could not invert the matrix, try changing the ele_pos slightly
KeyError
Basis function (src_type) not implemented.
See basis_functions.py for available
"""
self.own_src = kwargs.pop('own_src', np.array([]))
self.own_est = kwargs.pop('own_est', np.array([]))
if not self.own_est.any(): self.own_est = self.own_src
if not self.own_est.any() and not self.own_src.any():
raise KeyError('"own_src" is required argument to use oKCSD3D.' +
'If you would like to reconstruct in default ' +
'region of interest please use KCSD3D')
super(oKCSD3D, self).__init__(ele_pos, pots, **kwargs)
self.dim = 'own'
[docs]
def estimate_at(self):
"""Redefines locations where the estimation is wanted
Defines:
self.n_estm = self.estm_x.size
self.estm_x, self.estm_y, self.estm_z : Locations at which CSD is
requested.
"""
self.estm_x, self.estm_y, self.estm_z = self.own_est
self.n_estm = self.estm_x.size
[docs]
def place_basis(self):
"""Places basis sources of the defined type.
Checks if a given source_type is defined, if so then defines it
self.basis, This function gives locations of the basis sources,
Defines
source_type : basis_fuctions.basis_3D.keys()
self.R based on R_init
self.src_x, self.src_y, self.src_z : Locations at which basis sources are placed.
"""
source_type = self.src_type
try:
self.basis = basis.basis_3D[source_type]
except KeyError:
raise KeyError('Invalid source_type for basis! available are:',
basis.basis_3D.keys())
self.R = self.R_init
self.src_x, self.src_y, self.src_z = self.own_src
self.n_src = self.src_x.size
if __name__ == '__main__':
print('Checking 1D')
ele_pos = np.array(([-0.1], [0], [0.5], [1.], [1.4], [2.], [2.3]))
pots = np.array([[-1], [-1], [-1], [0], [0], [1], [-1.5]])
k = KCSD1D(ele_pos, pots,
gdx=0.01, n_src_init=300,
ext_x=0.0, src_type='gauss')
k.cross_validate()
print(k.values())
print('Checking 2D')
ele_pos = np.array([[-0.2, -0.2], [0, 0], [0, 1], [1, 0], [1, 1],
[0.5, 0.5], [1.2, 1.2]])
pots = np.array([[-1], [-1], [-1], [0], [0], [1], [-1.5]])
k = KCSD2D(ele_pos, pots,
gdx=0.05, gdy=0.05,
xmin=-2.0, xmax=2.0,
ymin=-2.0, ymax=2.0,
src_type='gauss')
k.cross_validate()
print(k.values())
print('Checking MoIKCSD')
k = MoIKCSD(ele_pos, pots,
gdx=0.05, gdy=0.05,
xmin=-2.0, xmax=2.0,
ymin=-2.0, ymax=2.0)
k.cross_validate()
print('Checking KCSD3D')
ele_pos = np.array([(0, 0, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0),
(0, 1, 1), (1, 1, 0), (1, 0, 1), (1, 1, 1),
(0.5, 0.5, 0.5)])
pots = np.array([[-0.5], [0], [-0.5], [0], [0], [0.2], [0], [0], [1]])
k = KCSD3D(ele_pos, pots,
gdx=0.02, gdy=0.02, gdz=0.02,
n_src_init=1000, src_type='gauss_lim')
k.cross_validate()
print(k.values())