Source code for eigenstrapping.geometry

from lapy import TetMesh, TriaMesh, Solver
from lapy.diffgeo import compute_geodesic_f

import warnings
import subprocess
import tempfile
import os

import nibabel as nib
import numpy as np
from scipy.spatial import Delaunay, KDTree
from scipy.spatial.distance import squareform, pdist
from brainspace import mesh as me
from joblib import Parallel, delayed
from scipy.sparse.linalg import splu
import scipy.optimize as optimize

from brainspace.vtk_interface import wrap_vtk, serial_connect
from vtk import (vtkThreshold, vtkDataObject, vtkGeometryFilter)
from brainspace.utils.parcellation import relabel_consecutive
from brainspace.mesh.mesh_creation import build_polydata
from eigenstrapping.utils import (_suppress, _print, is_string_like,
                    enablePrint, blockPrint, eigen_decomposition)

from eigenstrapping.dataio import dataio
from sklearn.utils.validation import check_random_state
from eigenstrapping.rotations import indirect_method
from eigenstrapping.utils import _get_eigengroups
from scipy.spatial import distance_matrix
from sklearn.neighbors import BallTree
from scipy.optimize import linear_sum_assignment
import copy
from parspin import utils as putils

from tqdm import tqdm

ASSOC_CELLS = vtkDataObject.FIELD_ASSOCIATION_CELLS
ASSOC_POINTS = vtkDataObject.FIELD_ASSOCIATION_POINTS

#blockPrint()

"""
Helper utilities for geometry and registration

    - Runs mri_mc from Freesurfer to create 2d surface
    - Projects 2d surface using gmsh
    - Writes out geometry in tetrahedral format, triangular format, or in Freesurfer binary
    - Writes out label files
    - Registers from native space to MNI152 and fsaverage space

read_geometry() was taken from nibabel.freesurfer package (https://github.com/nipy/nibabel/blob/master/nibabel/freesurfer/io.py).
get_tkrvox2ras(), make_tetra_file(), normalize_vtk(), and calc_volume() taken
from <https://github.com/BMHLab/BrainEigenmodes>, authors J. Pang and K. Aquino.

"""

# mni152_2mm = np.asarray(
#                  [[  2.,    0.,   0.,    -90,],
#                   [ -0.,    2.,   0.,   -126,],
#                   [ -0.,    0.,   2.,    -72,],
#                   [  0.,    0.,   0.,      1.]])

# mni152_1mm = np.asarray(
#                 [[  -1.,    0.,    0.,   90.],
#                  [   0.,    1.,    0., -126.],
#                  [   0.,    0.,    1.,  -72.],
#                  [   0.,    0.,    0.,    1.]])

# def _check_mni(in_file):
#     """
#     Checks if input image is in MNI152 space
#     """
    
#     img = image.load_img(in_file)
    
#     if img.affine != mni152_2mm:
#         if img.affine != mni152_1mm:
#             return False
        
#     else:
#         return True
    
# def native_to_mni152(in_file, nonlinear=True):
#     """
#     Linear or nonlinear registration of native volumetric image to MNI152 space
#     Uses ANTsPy
#     """
    
#     img = image_read(in_file)
    
#     # get template image
#     mni_file = fetch_mni152(density='1mm').get('2009cAsym_T1w')
#     mni = image_read(mni_file)

#     if nonlinear is True:
#         transform_type='SyN'
        
#     else:
#         transform_type='Affine'
        
#     # do transform
#     fixed_image = mni
#     moving_image = img
    
#     mytx = registration(fixed=fixed_image, moving=moving_image, type_of_transform=transform_type)
    
#     warped_moving_image = apply_transforms(fixed=fixed_image, moving=moving_image,
#                                            transformlist=mytx['fwdtransforms'])
    
#     # rebuild as nib.Nifti1Image
#     transformed_image = warped_moving_image.to_nibabel()
    
#     return transformed_image

[docs] def geodesic_distmat(surface, mask_input=None, use_cholmod=False, n_jobs=1, m=1.0): """ Compute geodesic distance using the heat diffusion method built into LaPy Based on: doi/10.1145/2516971.2516977 Crane et al., "Geodesics in heat: A new approach to computing distance based on heat flow" Parameters ---------- surf : str Path to input surface mask_input : array_like or str Mask to remove from `surf` (i.e. medial wall) during the geodesic distance calculations use_cholmod : bool, optional Specify whether to use ``scikit-sparse`` cholmod libraries (much faster), if False then uses ``scipy.sparse`` libraries instead (much slower). Default False. n_jobs : int, optional Number of workers to use for parallel calls to ._thread_method(), default is 1. m : float, optional Scaling factor for time in the heat kernel calculation. As `m` decreases, distances increase in accuracy (if you care) but increases computation time. Best put between 1.0 and 0.1. Default 1.0. Returns ------- D : (N,N) np.ndarray Distance matrix of every vertex to every other vertex """ # load surface (as a brainspace object) surface_orig = me.mesh_io.read_surface(surface) # load mask # can be any ROI (even whole cortex) if isinstance(mask_input, np.ndarray): mask = mask_input elif mask_input is None: mask = np.ones(len(surface_orig.GetPoints().astype('float32'))) else: mask_input_file_main, mask_input_file_ext = os.path.splitext(mask_input) if mask_input_file_ext == '.txt': mask = np.loadtxt(mask_input) elif mask_input_file_ext == '.gii': mask = nib.load(mask_input).darrays[0].data else: mask = mask_input # create temporary suface based on mask surface_cut = _surface_mask(surface_orig, mask) # load surface (as a lapy object) vertices = surface_cut.GetPoints().astype('float32') faces = me.mesh_elements.get_cells(surface_cut).astype('int32') tria = TriaMesh(vertices, faces) fem = Solver(tria, lump=True, use_cholmod=use_cholmod) # time of heat evolution: t = m * tria.avg_edge_length() ** 2 # backward Euler matrix: factor = fem.mass + t * fem.stiffness # keep factor, dummy. the slowest part is the cholmod factorization :grug: D = __distance__(tria, factor, n_jobs=n_jobs) # if parc: # D = np.row_stack([ # D[parc == lab].mean(axis=0) for lab in np.unique(parc) # ]) # D[np.diag_indices_from(D)] = 0 # D = D[1:, 1:] return D
def __distance__(tria, factor, n_jobs=1, use_cholmod=False): D = np.column_stack( Parallel(n_jobs=n_jobs, prefer='processes')( delayed(_distance_method)(tria, factor, use_cholmod=use_cholmod, v=v) for v in tqdm(range(tria.v.shape[0])) ) ) return np.asarray(D.squeeze()) def _distance_method(tria, factor, use_cholmod=False, v=0): b0 = np.zeros((tria.v.shape[0])) b0[np.array(v)] = 1.0 if use_cholmod: sksparse = import_optional_dependency("sksparse", raise_error=True) importlib.import_module(".cholmod", sksparse.__name__) #print("Solver: Cholesky decomposition from scikit-sparse cholmod ...") factor = sksparse.cholmod.cholesky(factor) u = factor(b0) else: sksparse=None from scipy.sparse.linalg import splu #print("Solver: spsolve (LU decomposition) ...") factor = splu(factor) u = factor.solve(b0) d = compute_geodesic_f(tria, u) return d def gaussian(x, amplitude, mean, stddev): return amplitude * np.exp(- ((x - mean) ** 2) / (2 * (stddev ** 2))) def estimate_fwhm(image): # Find the maximum value and its position in the image max_val = np.max(image) max_pos = np.argmax(image) max_row, max_col = np.unravel_index(max_pos, image.shape) # Create a 1d profile along the maximum row and column row_profile = image[max_row, :] col_profile = image[:, max_col] options = {'maxfev' : np.prod(image.shape)} # Fit a gaussian curve to the row profile row_x = np.arange(len(row_profile)) row_params, _ = optimize.curve_fit(gaussian, row_x, row_profile, p0=[max_val, max_col, 1], **options) # Fit a gaussian curve to the column profile col_x = np.arange(len(col_profile)) col_params, _ = optimize.curve_fit(gaussian, col_x, col_profile, p0=[max_val, max_row, 1], **options) # Calculate the FWHM as 2.355 times the std dev of the Gaussian row_fwhm = 2.355 * row_params[2] col_fwhm = 2.355 * col_params[2] # take the average fwhm = (row_fwhm + col_fwhm) / 2 return fwhm def resel_count(image, fwhm): # returns resel count of image given FWHM return np.prod(image.shape / fwhm)
[docs] def get_tkrvox2ras(voldim, voxres): """Generate transformation matrix to switch between tetrahedral and volume space. Parameters ---------- voldim : array (1x3) Dimension of the volume (number of voxels in each of the 3 dimensions) voxres : array (!x3) Voxel resolution (resolution in each of the 3 dimensions) Returns ------ T : array (4x4) Transformation matrix """ T = np.zeros([4,4]); T[3,3] = 1; T[0,0] = -voxres[0]; T[0,3] = voxres[0]*voldim[0]/2; T[1,2] = voxres[2]; T[1,3] = -voxres[2]*voldim[2]/2; T[2,1] = -voxres[1]; T[2,3] = voxres[1]*voldim[1]/2; return T
def make_tetra(volume, label=None, aseg=False, norm=None, verbose=True): """ Generate tetrahedral version of the ROI in the nifti file. Can specify label value. Parameters ---------- volume : str Filename of input volume where the relevant ROI has voxel values = 1 or ``label`` label : int or list of ints, optional Label value(s) of input volume. Extracts surface from voxels that have this(ese) intensity value(s). If None, defaults to 1 aseg : bool, optional Specify whether input volume and label is from FreeSurfer aseg.nii.gz (post-FS preprocessing). Returns ------- tetra_file : str Filename of output tetrahedral vtk file Raises ------ RuntimeError Multiple labels given but aseg not passed. """ if verbose: enablePrint() func = _print else: blockPrint() func = _suppress voldir = os.path.dirname(volume) tmpf = tempfile.NamedTemporaryFile(suffix='.mgz') tmpf = tmpf.name multiple = False if label is None: label = ' '.join('1') elif type(label) == list: if len(label) > 1: multiple = True tmp_label = ' ' for idx in range(len(label)): tmp_label.join(str(label[idx])) label = tmp_label else: label = ' '.join(str(label)) if aseg is False: if multiple is True: raise RuntimeError('Multiple label values given, aseg must be passed') # binarize first cmd = 'mri_binarize --i ' + volume + ' --match ' + label + ' --o ' + tmpf output = func(cmd, shell="True") # pass norm for pretess if aseg is True and norm is not None: cmd = 'mri_pretess ' + tmpf + ' 1 ' + norm + ' ' + tmpf output = func(cmd, shell="True") # run marching cubes cmd = 'mri_mc ' + tmpf + ' 1 ' + voldir + '/tmp_surface.vtk' output = func(cmd, shell="True") geo_file = volume + '.geo' tria_file = volume + '.vtk' tetra_file = volume + '.tetra.vtk' cmd = 'mv -f ' + voldir + '/tmp_surface.vtk ' + tria_file output = func(cmd, shell='True') file = tria_file.rsplit('/') inputGeo = file[len(file)-1] with open(geo_file, 'w') as writer: writer.write('Mesh.Algorithm3D=4;\n') writer.write('Mesh.Optimize=1;\n') writer.write('Mesh.OptimizeNetgen=1;\n') writer.write('Merge "'+inputGeo+'";\n') writer.write('Surface Loop(1) = {1};\n') writer.write('Volume(1) = {1};\n') writer.write('Physical Volume(1) = {1};\n') cmd = 'gmsh -3 -o ' + tetra_file + ' ' + geo_file output = func(cmd, shell="True") cmd = "sed 's/double/float/g;s/UNSTRUCTURED_GRID/POLYDATA/g;s/CELLS/POLYGONS/g;/CELL_TYPES/,$d' " + tetra_file + " > " + tetra_file + "'_fixed'" output = func(cmd, shell="True") cmd = 'mv -f ' + tetra_file + '_fixed ' + tetra_file output = func(cmd, shell="True") # remove auxiliary files os.remove(geo_file) os.remove(tria_file) return tetra_file def remesh(m, outfile=None, oformat='vtk'): """ Generate (re)meshed version of the volume/mesh in `m`. Goes from volume to triangular mesh, or goes from triangular to tetrahedral mesh (and vice-versa). Function does not go back from triangular or tetrahedral mesh to volume. Parameters ---------- m : str The filename of the volume/mesh. outfile : str, optional The filename of the output mesh, default None. oformat : str, optional Output format of mesh, default "vtk" Returns ------- file : str Filename of output tetra or triangular mesh. """ _, ext = os.path.splitext(m) if ext == '.nii': return vol_to_tria(m, outfile=outfile, oformat=oformat) mesh = me.mesh_io.read_surface(m) if mesh.GetCells2D().shape[-1] == 4: func = to_tria elif mesh.GetCells2D().shape[-1] == 3: func = to_tetra else: raise RuntimeError('Unknown file structure, check input') file = func(m, outfile=outfile, oformat=oformat) return file def vol_to_tria(volume, outfile=None, oformat='vtk', verbose=False): """ Converts the volume in `volume` to a triangular surface using FreeSurfer `mri_mc` and other libraries. Parameters ---------- volume : str Path to file to convert to triangular mesh. outfile : str, optional The filename of the output mesh, default None. oformat : str, optional Output format of mesh, default "vtk" verbose : bool, optional Print output messages, default False Returns ------- tria_file : str Filename of saved triangular mesh. """ tmpf = tempfile.NamedTemporaryFile(suffix='.mgz') tmpf = tmpf.name voldir = os.path.dirname(volume) volname = os.splitext(volume)[0] if outfile is None: outfile = volname + '.tria.' + oformat if verbose is True: enablePrint() func = _print else: blockPrint() func = _suppress cmd = 'mri_binarize --i ' + volume + ' --match 1' + ' --o ' + tmpf output = func(cmd, shell="True") # run marching cubes tmp_tria = voldir + '/tmp_surface.' + oformat cmd = 'mri_mc ' + tmpf + ' 1 ' + tmp_tria output = func(cmd, shell="True") cmd = 'mv -f ' + voldir + '/tmp_surface.' + oformat + ' ' + outfile output = func(cmd, shell='True') os.unlink(tmpf) os.unlink(tmp_tria) return outfile def to_tetra(tria_file, outfile=None, oformat='vtk'): """ Converts the triangular surface contained in `tria_file` into a tetrahedral mesh by using `gmsh` routines. Parameters ---------- tria_file : str Filename of triangular surface. outfile : str, optional Filename of output tetrahedral mesh. If None, adds the suffix .tetra.<oformat> to the `tria_file` filename. oformat : str, optional Format of output mesh, accepts `vtk`, `gii`, `ply`, `obj` Returns ------- outfile : str Filename of output triangular mesh. """ if oformat not in ['vtk', 'gii', 'ply', 'obj']: raise ValueError('Output format not valid: {}'.format(str(oformat))) if outfile is None: outfile = os.path.splitext(tria_file)[0] + '.tetra.' + oformat geo_file = tria_file + '.geo' tetra_file = outfile inputGeo = os.path.split(tria_file)[-1] print(inputGeo) with open(geo_file, 'w') as writer: writer.write('Mesh.Algorithm3D=4;\n') writer.write('Mesh.Optimize=1;\n') writer.write('Mesh.OptimizeNetgen=1;\n') writer.write('Merge "'+inputGeo+'";\n') writer.write('Surface Loop(1) = {1};\n') writer.write('Volume(1) = {1};\n') writer.write('Physical Volume(1) = {1};\n') cmd = 'gmsh -3 -o ' + tetra_file + ' ' + geo_file output = _print(cmd, shell="True") cmd = "sed 's/double/float/g;s/UNSTRUCTURED_GRID/POLYDATA/g;s/CELLS/POLYGONS/g;/CELL_TYPES/,$d' " + tetra_file + " > " + tetra_file + "'_fixed'" output = _print(cmd, shell="True") cmd = 'mv -f ' + tetra_file + '_fixed ' + tetra_file output = _print(cmd, shell="True") # remove auxiliary files os.remove(geo_file) return outfile def to_tria(tetra_file, outfile=None, oformat='vtk', verbose=True): """ Converts the tetrahedral mesh contained in `tetra_file` into a triangular surface by bounding the convex hull of the tetrahedra. Parameters ---------- tetra_file : str Filename of tetrahedral mesh. outfile : str, optional Filename of output triangular mesh. If None, adds the suffix .tria.<oformat> to the `tetra_file` filename. oformat : str, optional Format of output mesh, accepts `vtk`, `gii`, `ply`, `obj` Returns ------- outfile : str Filename of output triangular mesh. """ if oformat not in ['vtk', 'gii', 'ply', 'obj']: raise ValueError('Output format not valid: {}'.format(str(oformat))) if outfile is None: outfile = os.path.splitext(tetra_file)[0] + '.tria.' + oformat m = me.mesh_io.read_surface(tetra_file) tetra = TetMesh(m.GetPoints(), m.GetCells2D()) # get all triangles allt = np.vstack( ( tetra.t[:, np.array([3, 1, 2])], tetra.t[:, np.array([2, 0, 3])], tetra.t[:, np.array([1, 3, 0])], tetra.t[:, np.array([0, 2, 1])], ) ) # sort rows so that faces are reorder in ascending order of indices allts = np.sort(allt, axis=1) # find unique trias without a neighbor tria, indices, count = np.unique( allts, axis=0, return_index=True, return_counts=True ) tria = allt[indices[count == 1]] blockPrint() if verbose: enablePrint() print("Found " + str(np.size(tria, 0)) + " triangles on boundary.") # if we have tetra function, map these to the boundary triangles surf = TriaMesh(tetra.v, tria) surf.write_vtk(outfile) return outfile
[docs] def make_tria_file(nifti_input_filename): """ Generate triangular mesh of the ROI in the nifti file. Parameters ---------- nifti_input_filename : str Filename of input volume where the relevant ROI have voxel values = 1 Returns ------- tria_file : str Filename of output triangular vtk file """ nifti_input_file_head, nifti_input_file_tail = os.path.split(nifti_input_filename) nifti_input_file_main, nifti_input_file_ext = os.path.splitext(nifti_input_file_tail) os.system('mri_mc ' + nifti_input_filename + ' 1 ' + nifti_input_file_head + '/rh.tmp_surface.vtk') os.system('mv -f ' + nifti_input_file_head + '/rh.tmp_surface.vtk ' + nifti_input_filename + '.vtk') geo_file = nifti_input_filename + '.geo' tria_file = nifti_input_filename + '.vtk' file = tria_file.rsplit('/') inputGeo = file[len(file)-1] with open(geo_file, 'w') as writer: writer.write('Mesh.Algorithm3D=4;\n') writer.write('Mesh.Optimize=1;\n') writer.write('Mesh.OptimizeNetgen=1;\n') writer.write('Merge "'+inputGeo+'";\n') writer.write('Surface Loop(1) = {1};\n') writer.write('Volume(1) = {1};\n') writer.write('Physical Volume(1) = {1};\n') cmd = 'gmsh -2 -o ' + tria_file + ' ' + geo_file output = subprocess.check_output(cmd,shell="True") output = output.splitlines() cmd = "sed 's/double/float/g;s/UNSTRUCTURED_GRID/POLYDATA/g;s/CELLS/POLYGONS/g;/CELL_TYPES/,$d' " + tria_file + " > " + tria_file + "'_fixed'" os.system(cmd) os.system('mv -f ' + tria_file + '_fixed ' + tria_file) return tria_file
[docs] def load_mesh(mesh): """ Mesh loader function. Parameters ---------- mesh : str or array-like Filename 'vtk'-like, or tuple of surface (expecting two arrays of `vertices` and `faces`) Returns ------- ``lapy`` compatible object : `TetMesh` or `TriaMesh` Loaded mesh in ``lapy`` format """ if isinstance(mesh, tuple) or isinstance(mesh, list): m = build_polydata(mesh[0], mesh[1]) elif is_string_like(mesh): m = me.mesh_io.read_surface(mesh) else: raise ValueError('Unknown file type, check input') if m.GetCells2D().shape[-1] == 4: func = TetMesh elif m.GetCells2D().shape[-1] == 3: func = TriaMesh else: raise RuntimeError('Unknown file structure, check input') return func(m.GetPoints(), m.GetCells2D())
[docs] def create_temp_surface(surface_input, surface_output_filename): """Write surface to a new vtk file. Parameters ---------- surface_input : brainspace compatible object Loaded vtk object corresponding to a surface triangular mesh surface_output_filename : str Filename of surface to be saved """ f = open(surface_output_filename, 'w') f.write('# vtk DataFile Version 2.0\n') f.write(surface_output_filename + '\n') f.write('ASCII\n') f.write('DATASET POLYDATA\n') f.write('POINTS ' + str(np.shape(surface_input.Points)[0]) + ' float\n') for i in range(np.shape(surface_input.Points)[0]): f.write(' '.join(map(str, np.array(surface_input.Points[i, :])))) f.write('\n') f.write('\n') f.write('POLYGONS ' + str(np.shape(surface_input.polys2D)[0]) + ' ' + str(4* np.shape(surface_input.polys2D)[0]) + '\n') for i in range(np.shape(surface_input.polys2D)[0]): f.write(' '.join(map(str, np.append(3, np.array(surface_input.polys2D[i, :]))))) f.write('\n') f.close()
def get_indices(surface_original, surface_new): """Extract indices of vertices of the two surfaces that match. Parameters ---------- surface_original : brainspace compatible object Loaded vtk object corresponding to a surface triangular mesh surface_new : brainspace compatible object Loaded vtk object corresponding to a surface triangular mesh Returns ------ indices : array indices of vertices """ indices = np.zeros([np.shape(surface_new.Points)[0],1]) for i in range(np.shape(surface_new.Points)[0]): indices[i] = np.where(np.all(np.equal(surface_new.Points[i,:],surface_original.Points), axis=1))[0][0] indices = indices.astype(int) # potential target for improved runtime # takes several minutes for a big mesh return indices
[docs] def calc_eig(mesh, num_modes, use_cholmod=False, remove_zero=False): """Calculate the eigenvalues and eigenmodes of a surface. Parameters ---------- mesh : lapy compatible object Loaded vtk object corresponding to a surface mesh num_modes : int Number of eigenmodes to be calculated Returns ------ evals : array (num_modes x 1) Eigenvalues emodes : array (number of surface points x num_modes) Eigenmodes """ fem = Solver(mesh, use_cholmod=use_cholmod) evals, emodes = fem.eigs(k=num_modes) if remove_zero: return evals[1:], emodes[:, 1:] return evals[:-1], emodes[:, :-1]
[docs] def calc_surface_eigenmodes(surface_input_filename, mask_input, save_cut=False, num_modes=200, use_cholmod=False, remove_zero=True): """Main function to calculate the eigenmodes of a cortical surface with application of a mask (e.g., to remove the medial wall). Parameters ---------- surface_input_filename : str Filename of input surface with ``n_vertices`` mask_input : str or 1D ndarray Filename of mask to be applied on the surface (e.g., cortex without medial wall, values = 1 for mask and 0 elsewhere) save_cut : boolean Boolean to decide if the new surface with mask applied will be saved to a new surface file num_modes : int Number of eigenmodes to be calculated use_cholmod : bool, optional Spec to use cholesky libraries of ``scikit-sparse``. Requires installation of ``scikit-sparse`` and ``libsuitesparse-dev``. Returns ------- np.ndarray of shape=(num_modes,) Eigenvalues computed on masked surface. np.ndarray of shape=(n_vertices, num_modes) Eigenmodes corresponding to eigenvalues computed on masked surface padded with zeros to match size of original surface (``n_vertices``) """ # load surface (as a brainspace object) surface_orig = me.mesh_io.read_surface(surface_input_filename) # load mask # can be any ROI (even whole cortex) if isinstance(mask_input, np.ndarray): mask = mask_input else: mask_input_file_main, mask_input_file_ext = os.path.splitext(mask_input) if mask_input_file_ext == '.txt': mask = np.loadtxt(mask_input) elif mask_input_file_ext == '.gii': mask = nib.load(mask_input).darrays[0].data else: mask = mask_input # create temporary suface based on mask surface_cut = _surface_mask(surface_orig, mask) # load surface (as a lapy object) vertices = surface_cut.GetPoints().astype('float32') faces = me.mesh_elements.get_cells(surface_cut).astype('int32') tria = TriaMesh(vertices, faces) # calculate eigenvalues and eigenmodes evals, emodes = calc_eig(tria, num_modes+1, use_cholmod, remove_zero=remove_zero) # get indices of vertices of surface_orig that match surface_cut indices = get_indices(surface_orig, surface_cut) # reshape emodes to match vertices of original surface emodes_reshaped = np.zeros([surface_orig.n_points,np.shape(emodes)[1]]) for mode in range(np.shape(emodes)[1]): emodes_reshaped[indices,mode] = np.expand_dims(emodes[:,mode], axis=1) return evals, emodes_reshaped
def _surface_mask(surf, mask, use_cell=False): """Selection fo points or cells meeting some criteria. Parameters ---------- surf : vtkPolyData or BSPolyData Input surface. mask : str or ndarray Binary boolean or integer array. Zero or False elements are discarded. use_cell : bool, optional If True, apply selection to cells. Otherwise, use points. Default is False. Returns ------- surf_masked : BSPolyData PolyData after masking. """ if isinstance(mask, np.ndarray): if np.issubdtype(mask.dtype, np.bool_): mask = mask.astype(np.uint8) else: mask = surf.get_array(name=mask, at='c' if use_cell else 'p') if np.any(np.unique(mask) > 1): raise ValueError('Cannot work with non-binary mask.') return _surface_selection(surf, mask, low=1, upp=1, use_cell=use_cell) def _surface_selection(surf, array, low=-np.inf, upp=np.inf, use_cell=False): """Selection of points or cells meeting some thresholding criteria. Parameters ---------- surf : vtkPolyData or BSPolyData Input surface. array : str or ndarray Array used to perform selection. low : float or -np.inf Lower threshold. Default is -np.inf. upp : float or np.inf Upper threshold. Default is +np.inf. use_cell : bool, optional If True, apply selection to cells. Otherwise, use points. Default is False. Returns ------- surf_selected : BSPolyData Surface after thresholding. """ if low > upp: raise ValueError('Threshold not valid: [{},{}]'.format(low, upp)) at = 'c' if use_cell else 'p' if isinstance(array, np.ndarray): drop_array = True array_name = surf.append_array(array, at=at) else: drop_array = False array_name = array array = surf.get_array(name=array, at=at, return_name=False) if array.ndim > 1: raise ValueError('Arrays has more than one dimension.') if not use_cell: order_name = surf.append_array(np.arange(surf.n_points), at='p') if low == -np.inf: low = array.min() if upp == np.inf: upp = array.max() tf = wrap_vtk(vtkThreshold, allScalars=True) tf.SetUpperThreshold(upp) tf.SetLowerThreshold(low) if use_cell: tf.SetInputArrayToProcess(0, 0, 0, ASSOC_CELLS, array_name) else: tf.SetInputArrayToProcess(0, 0, 0, ASSOC_POINTS, array_name) gf = wrap_vtk(vtkGeometryFilter(), merging=False) surf_sel = serial_connect(surf, tf, gf) # Check results n_exp = np.logical_and(array >= low, array <= upp).sum() n_sel = surf_sel.n_cells if use_cell else surf_sel.n_points if n_exp != n_sel: element = 'cells' if use_cell else 'points' warnings.warn('Number of selected {}={}. Expected {}.' 'This may be due to the topology after selection.'. format(element, n_exp, n_sel)) if drop_array: surf.remove_array(name=array_name, at=at) surf_sel.remove_array(name=array_name, at=at) if not use_cell: surf_sel = _sort_polydata_points(surf_sel, order_name) surf_sel.remove_array(name=order_name, at='p') surf.remove_array(name=order_name, at='p') return surf_sel def _sort_polydata_points(surf, labeling, append_data=True): if isinstance(labeling, str): labeling = surf.get_array(labeling, at='p') lab_con = relabel_consecutive(labeling) idx_sorted = np.argsort(lab_con) new_pts = surf.Points[idx_sorted] # new_cells = relabel(surf.GetCells2D().ravel(), lab_con).reshape(-1, 3) new_cells = lab_con[surf.GetCells2D()] s = build_polydata(new_pts, cells=new_cells) if append_data is None or append_data is False: return s if append_data is True: append_data = {'p', 'c', 'f'} elif isinstance(append_data, str): append_data = {append_data} for at in append_data: for v, k in zip(*surf.get_array(at=at, return_name=True)): if at in {'p', 'point'}: v = v[idx_sorted] s.append_array(v, name=k, at=at) return s def find_wavelength(surface, data, roi=None, column=None, demean=True): """ Compute the characteristic wavelength of the data in `data` based on the full-width half-maximum (FWHM). This is equal to 2*FWHM. This function simply wraps ``wb_command -metric-estimate-fwhm``, converting stdout to a Python float. Can be used to estimate the number of eigengroups to include in the reconstruction of surrogates in :class:`eigenstrapping.SurfaceEigenstrapping.generate()` based on the relationship of eigenvalue to spatial scale. Parameters ---------- surface : str to filepath Surface for `data`. data : np.ndarray or str to filepath (N,) Data on `surface` to find FWHM roi : str to filepath, optional Region-of-interest mask. Default None column Returns ------- cost : float If `return_cost`, returns maximum cost of kde fit. wavelength : float Value for full-width half-maximum of `data`. Notes ----- This function uses Euclidean distance to measure nearest neighbors as an approximation, and assumes a Gaussianity to the smoothness in `data`. It may have unpredictable effects if the data is highly non-Gaussian. Use with caution. If the fit is bad (cost is high) then we recommend the variogram calculation in :func:`eigenstrapping.fit.surface_fit()` """ data = dataio(data) tmpf = tempfile.NamedTemporaryFile(suffix='.func.gii').name metric = nib.GiftiImage() metric.add_gifti_data_array(nib.gifti.gifti.GiftiDataArray(data.astype(np.float32))) nib.save(metric, tmpf) if roi is not None: roistr = '-roi {roi}' else: roistr = '' if column is not None: colstr = '-column {column}' wholestr = '' else: colstr = '' wholestr = '-whole-file' if demean is True: destr = '-demean' else: destr = '' cmd = f'wb_command -metric-estimate-fwhm {surface} {tmpf} {roistr} {colstr} {wholestr} {destr}' output = subprocess.check_output(cmd, shell="True") output = output.splitlines() os.unlink(tmpf) return 2*np.pi*float(output[0].decode('ascii').split(' ')[1]) def truncate_emodes(surface, data, emodes, evals, mask=None, ret_fwhm=False, find_group_only=False, per_spectrum=None): """ Uses the estimation of full-width half-maximum to truncate the number of eigenmodes to use for surrogate generation. Provides a heuristic to assist the end-user with number of mode selection. How it works is it finds the first eigenvalue such that: 2 * pi argmin(--------) <= 2*fwhm lam Actually truncates at the number of *whole* groups that include this eigenvalue or smaller, corresponding to larger wavelengths. Either "rounds" up or down, depending on whether the eigenvalue index of `lam` is greater or lesser than the total number of eigenvalues in the group G, where G is the group number. In other words, floor(G(lam)) if ||lam|| < np.sum(||lam_G||), ceil(G(lam)) if ||lam|| >= np.sum(||lam_G||) Also has the ability to truncate modes that account for `per_spectrum` percentage of the power spectra in `data`, which is quicker and may suit your end-goals better, particularly if small-scale spatial structure doesn't matter to your map. For example, per_spectra=0.9 would truncate the eigenmodes and eigenvalues to the total number of groups that contain 90% or more of the modal power spectrum in `data`. Parameters ---------- data : np.ndarray or str to filepath (N,) Surface data to find FWHM surface : nib.GiftiImage like or filepath Surface for `data`. Must have coordinates of shape (N, 3) mask : np.ndarray or str to filepath (N,), optional Mask for values in `data` to include e.g. non-medial wall vertices. Default None ns : int, optional Number of samples for random sampling. Default 100 knn : int, optional Number of neighbors to calculate FWHM. Default 100 seed : None or int or np.random.RandomState, optional Seed for random number generation. Default None ret_fwhm : bool, optional If True, returns estimated `fwhm`. Default False find_group_only : bool, optional If True, this function does not truncate `emodes` and `evals`, and only returns the group number that contains the eigenvalue where the above equation is true. per_spectra : None or float, optional If True, truncates `emodes` and `evals` at whole number of groups with `per_spectrum` Returns ------- emodes_truncated, evals_truncated : np.ndarrays Eigenmodes and corresponding eigenvalues as whole groups with higher or equal wavelengths to the smallest `fwhm` in `data`. Group number to truncate `emodes` and `evals` at is rounded to nearest group with full membership. fwhm : float Value for full-width half-maximum of `data`. Only returned if `ret_fwhm` is True. Exclusive with `per_spectra`. group : int Eigengroup number (zero-indexed) Notes ----- This function assumes a Gaussianity to the smoothness in `data`. It may have unpredictable effects if the data is highly non-Gaussian. Use with caution. If the data is highly non-Gaussian then we recommend the variogram calculation in :func:`eigenstrapping.fit.surface_fit()` """ groups = _get_eigengroups(emodes) if per_spectrum is not None: coeffs = eigen_decomposition(data[mask], emodes[mask]) power = np.abs(coeffs)**2 normed_power = power/np.sum(power) total_power = 0.0 j = 0 while total_power <= per_spectrum*np.sum(normed_power): total_power += np.sum(normed_power[groups[j]]) j += 1 if find_group_only: return j else: emodes_truncated, evals_truncated = (emodes[:, :groups[j][-1] + 1], evals[:groups[j][-1] + 1]) return evals_truncated, emodes_truncated fwhm = find_wavelength(surface, data, demean=False) j = 0 while j <= len(groups): for k in range(j): l = 2*np.pi/np.sqrt(evals[groups[j]][k]) if l <= fwhm: if find_group_only: if ret_fwhm: return fwhm, j if (groups[j][k] - groups[j][0]) >= (groups[j][-1] - groups[j][0]) else j-1 return j if (groups[j][k] - groups[j][0]) >= (groups[j][-1] - groups[j][0]) else j-1 emodes_truncated, evals_truncated = (emodes[:, :groups[j][-1] + 1], evals[:groups[j][-1] + 1]) if (groups[j][k] - groups[j][0]) >= (groups[j][-1] - groups[j][0]) else (emodes[:, :groups[j-1][-1] + 1], evals[:groups[j-1][-1] + 1]) if ret_fwhm: return fwhm, evals_truncated, emodes_truncated else: return evals_truncated, emodes_truncated j += 1 raise RuntimeError('Could not find eigenvalue for fwhm\n' 'consider using more modes.') def gen_eigensamples(emodes, evals, mask=None, n_rotate=1000, num_modes=100, check_duplicates=True, method='original', seed=None, n_jobs=1): """ compute a permutation matrix for a set of eigenmodes, based on the concept of rotating the eigenmodes in the eigenspace per group. Produces a sparse permutation matrix of (`n_rotate`, N, M). Parameters ---------- emodes : np.ndarray of shape (N, M) eigenmodes to resample evals : np.ndarray of shape (M,) eigenvalues corresponding to `emodes` n_rotate : int, optional number of permutations to compute. Default 1000 check_duplicates: bool, optional Whether to check for and attempt to avoid duplicate resamplings. A warnings will be raised if duplicates cannot be avoided. Setting to True may increase the runtime of this function! Default: True seed : None or int or np.random.RandomState, optional seed for random number generation. Default None method : {'original', 'vasa', 'hungarian'}, optional Method by which to match non- and rotated coordinates. Default: 'original' verbose : bool, optional Whether to print occasional status messages. Default: False return_cost : bool, optional Whether to return cost array (specified as Euclidean distance) for each coordinate for each rotation Default: True Returns ------- samples : np.ndarray of ints of shape (`n_rotate`, N, M) nearest neighbor remapping matrix """ from multiprocessing import Pool seed = check_random_state(seed) # mask emodes emodes_masked = copy.deepcopy(emodes[:, :num_modes]) emodes_masked = emodes_masked[mask] evals = copy.deepcopy(evals) evals = evals[:num_modes] groups = _get_eigengroups(emodes_masked) # TODO implement costs #cost = np.zeros((n_rotate, len(emodes_masked), len(groups))) inds = np.arange(len(emodes_masked), dtype='int32') # empty array to store resampling indices #eigen_spins = np.zeros((n_rotate, len(emodes_masked), len(evals)), dtype='int32') # generate resampled modes and fill indexing array def _gen_method(emodes_masked, evals, groups, method='original', seed=0): resampled = np.zeros((len(emodes_masked), len(groups)), dtype='int32') for j in range(len(groups)): group = emodes_masked[:, groups[j]] / np.sqrt(evals[groups[j]]) rot = indirect_method(len(groups[j]), seed=seed) new_group = group @ rot * np.sqrt(evals[groups[j]]) if method == 'original': dist, col = BallTree(new_group).query(emodes_masked[:, groups[j]], 1) col = col.reshape(-1,) #cols = np.stack(cols, axis=-1) #cost[:, j] = dist elif method == 'hungarian': dist = distance_matrix(group, new_group) row, col = optimize.linear_sum_assignment(dist) #cost[hinds, n] = dist[row, col] elif method == 'vasa': dist = distance_matrix(group, new_group) # min of max a la Vasa et al., 2018 col = np.zeros(len(group), dtype='int32') for _ in range(len(dist)): # find parcel whose closest neighbor is farthest away # overall; assign to that row = dist.min(axis=1).argmax() col[row] = dist[row].argmin() #cost[inds[hinds][row], n] = dist[row, col[row]] # set to -inf and inf so they can't be assigned again dist[row] = -np.inf dist[:, col[row]] = np.inf else: raise ValueError('unrecognized method {}\n' 'choose from "original", "hungarian" or "vasa"'.format(method)) resampled[:, j] = inds[col] return resampled eigen_spins = Parallel(n_jobs=n_jobs)( delayed(_gen_method)( emodes_masked, evals, groups, method, seed=n) for n in putils.trange(n_rotate) ) return np.asarray(eigen_spins).squeeze() def spin_modes(emodes, evals, mask=None, n_rotate=1000, eigen_spins=None, num_modes=100, **kwargs): """ compute a permutation matrix for a set of eigenmodes, based on the concept of rotating the eigenmodes in the eigenspace per group. Produces a sparse permutation matrix of (`n_rotate`, N, M). Parameters ---------- emodes : np.ndarray of shape (N, M) eigenmodes to resample evals : np.ndarray of shape (M,) eigenvalues corresponding to `emodes` n_rotate : int, optional number of permutations to compute. Default 1000 check_duplicates: bool, optional Whether to check for and attempt to avoid duplicate resamplings. A warnings will be raised if duplicates cannot be avoided. Setting to True may increase the runtime of this function! Default: True seed : None or int or np.random.RandomState, optional seed for random number generation. Default None method : {'original', 'vasa', 'hungarian'}, optional Method by which to match non- and rotated coordinates. Default: 'original' verbose : bool, optional Whether to print occasional status messages. Default: False return_cost : bool, optional Whether to return cost array (specified as Euclidean distance) for each coordinate for each rotation Default: True Returns ------- resampled_emodes : np.ndarray of shape (`n_rotate`, N, M) new eigenmodes matrix after resampling procedure Notes ----- While applicable in most circumstances, resampled modes, especially with ``method='original'`` set, may result in some inaccuracies of reconstructions of surrogates, particularly in the presence of near-white maps or """ if eigen_spins is None: eigen_spins = gen_eigensamples(emodes, evals, mask=mask, n_rotate=n_rotate, num_modes=num_modes, **kwargs) emodes_masked = copy.deepcopy(emodes[:, :num_modes]) emodes_masked = emodes_masked[mask] evals = copy.deepcopy(evals[:num_modes]) resampled_emodes = np.zeros((n_rotate, len(emodes), len(evals)), dtype=emodes.dtype) groups = _get_eigengroups(emodes_masked) for n in range(n_rotate): resampled_masked = np.zeros((len(emodes_masked), len(evals)), dtype=emodes.dtype) for j in range(len(groups)): resampled_masked[:, groups[j]] = emodes_masked[:, groups[j]][eigen_spins[n, :, j]] resampled_emodes[n, mask] = resampled_masked return resampled_emodes def spin_single(emodes, evals, mask=None, eigen_spins=None, num_modes=100, return_masked=False, **kwargs): if eigen_spins is None: eigen_spins = gen_eigensamples(emodes, evals, mask=mask, n_rotate=1, num_modes=num_modes, **kwargs)[0] emodes_masked = copy.deepcopy(emodes[:, :num_modes]) emodes_masked = emodes_masked[mask] evals = copy.deepcopy(evals[:num_modes]) resampled_emodes = np.zeros((len(emodes), len(evals)), dtype=emodes.dtype) groups = _get_eigengroups(emodes_masked) resampled_masked = np.zeros((len(emodes_masked), len(evals)), dtype=emodes.dtype) for j in range(len(groups)): resampled_masked[:, groups[j]] = emodes_masked[:, groups[j]][eigen_spins[:, j]] if return_masked is True: return resampled_masked resampled_emodes[mask] = resampled_masked return resampled_emodes
[docs] def compute_normals(coords, faces): """ Compute the normal for each vertex in the mesh. """ normals = np.zeros(coords.shape, dtype=coords.dtype) for i in range(faces.shape[0]): fn = np.cross(coords[faces[i, 1]] - coords[faces[i, 0]], coords[faces[i, 2]] - coords[faces[i, 0]]) normals[faces[i, 0]] += fn normals[faces[i, 1]] += fn normals[faces[i, 2]] += fn norms = np.sqrt((normals ** 2).sum(axis=1)) normals /= norms[:, np.newaxis] return normals
[docs] def inflate_mesh(*, vertices=None, faces=None, iterations=10, smoothing_factor=1., inflation_factor=0.1, normalize=False): """ Inflate the mesh using Connectome Workbench `surface-inflation` Parameters ---------- (vertices, faces) : mesh-like tuple or str Surface to inflate. Can also be a filename to a surface. iterations : int, optional Number of smoothing iterations, default 10. smoothing_factor : float, optional Sigma of Gaussian smoothing kernel, default 1. inflation_factor : float, optional How much to inflate the surface, used for soft correction of surface area distortions, default 0.1. normalize : bool, optional Normalize surface area of inflated surface to original surface. Hard correction of surface area distortions (forces surface area of inflated surface to be the same as the original - may induce distortions of its own, needs more testing). Default False. """ new_vertices = vertices for _ in range(iterations): normals = compute_normals(vertices, faces) new_vertices += normals * inflation_factor return new_vertices
[docs] def nearest_neighbor(P, X, radius=None): """ Find the one-nearest neighbors of vertices in points `P` on another surface `X` using Delaunay triangulation and KDTree query. Parameters ---------- P : np.ndarray of shape (N,3) Points to search for within the coordinate set of `X`. `P` can be a single point X : np.ndarray of shape (M,3) Vertices of the surface to search within radius : float Radius to search for nearest neighbors within Returns ------- nearest_indexes : int Indexes of one-nearest neighbors of vertices in `P`. Note that if two vertices in `X` are the same distance away from a point in `P`, function returns only the first one. """ # Create Delaunay triangulation for first surface tri = Delaunay(X) # Create tree of vertices to query on kdtree = KDTree(X) indices = np.empty(P.shape[0], dtype=int) for i, p in enumerate(P): simplex_index = tri.find_simplex(p) if simplex_index == -1 or (radius is not None and not _is_point_within_radius(p, X[tri.simplices[simplex_index]], radius)): _, nearest_neighbor_index = kdtree.query(p) else: simplex_vertices = X[tri.simplices[simplex_index]] dist = np.linalg.norm(simplex_vertices - p, axis=1) if radius is not None: valid_indices = np.where(dist <= radius)[0] if valid_indices.size == 0: _, nearest_neighbor_index = kdtree.query(p) else: nearest_neighbor_index = tri.simplices[simplex_index][valid_indices[np.argmin(dist[valid_indices])]] else: nearest_neighbor_index = tri.simplices[simplex_index][np.argmin(dist)] indices[i] = nearest_neighbor_index return indices
def _is_point_within_radius(p, vertices, radius): """ Check if a point is within a given radius of any vertex in a set of vertices. """ return np.any(np.linalg.norm(vertices - p, axis=1) <= radius)
[docs] def calc_volume(nifti_input_filename): """Calculate the physical volume of the ROI in the nifti file. Parameters ---------- nifti_input_filename : str Filename of input volume where the relevant ROI have voxel values = 1 Returns ------ ROI_number : int Total number of non-zero voxels ROI_volume : float Total volume of non-zero voxels in physical dimensions """ # Load data ROI_data = nib.load(nifti_input_filename) roi_data = ROI_data.get_fdata() # Get voxel dimensions in mm voxel_dims = (ROI_data.header["pixdim"])[1:4] voxel_vol = np.prod(voxel_dims) # Compute volume ROI_number = np.count_nonzero(roi_data) ROI_volume = ROI_number * voxel_vol # print("Number of non-zero voxels = {}".format(ROI_number)) # print("Volume of non-zero voxels = {} mm^3".format(ROI_volume)) return ROI_number, ROI_volume
[docs] def normalize_vtk(tet, nifti_input_filename, normalization_type='none', normalization_factor=1): """Normalize tetrahedral surface. Parameters ---------- tet : lapy compatible object Loaded vtk object corresponding to a surface tetrahedral mesh nifti_input_filename : str Filename of input volume where the relevant ROI have voxel values = 1 normalization_type : str (default: 'none') Type of normalization number - normalization with respect to the total number of non-zero voxels volume - normalization with respect to the total volume of non-zero voxels in physical dimensions constant - normalization with respect to a chosen constant others - no normalization normalization_factor : float (default: 1) Factor to be used in a constant normalization Returns ------ tet_norm : lapy compatible object Loaded vtk object corresponding to the normalized surface tetrahedral mesh """ nifti_input_file_head, nifti_input_file_tail = os.path.split(nifti_input_filename) nifti_input_file_main, nifti_input_file_ext = os.path.splitext(nifti_input_file_tail) ROI_number, ROI_volume = calc_volume(nifti_input_filename) # normalization process tet_norm = tet if normalization_type == 'number': tet_norm.v = tet.v/(ROI_number**(1/3)) elif normalization_type == 'volume': tet_norm.v = tet.v/(ROI_volume**(1/3)) elif normalization_type == 'constant': tet_norm.v = tet.v/(normalization_factor**(1/3)) else: pass # writing normalized surface to a vtk file if normalization_type == 'number' or normalization_type == 'volume' or normalization_type == 'constant': surface_output_filename = nifti_input_filename + '_norm=' + normalization_type + '.tetra.vtk' f = open(surface_output_filename, 'w') f.write('# vtk DataFile Version 2.0\n') f.write(nifti_input_file_tail + '\n') f.write('ASCII\n') f.write('DATASET POLYDATA\n') f.write('POINTS ' + str(np.shape(tet.v)[0]) + ' float\n') for i in range(np.shape(tet.v)[0]): f.write(' '.join(map(str, tet_norm.v[i, :]))) f.write('\n') f.write('\n') f.write('POLYGONS ' + str(np.shape(tet.t)[0]) + ' ' + str(5 * np.shape(tet.t)[0]) + '\n') for i in range(np.shape(tet.t)[0]): f.write(' '.join(map(str, np.append(4, tet.t[i, :])))) f.write('\n') f.close() return tet_norm
[docs] def euclidean_distmat(mesh): """ Calculates Euclidean distance matrix of the mesh in `mesh` Parameters ---------- mesh : lapy compatible object TriaMesh or TetMesh Returns ------- ndarray of shape (N,N) Euclidean distance matrix of mesh with `N` vertices. """ verts = mesh.v return squareform(pdist(verts))
"""Handle optional dependency imports. Inspired from pandas: https://pandas.pydata.org/ """ import importlib # A mapping from import name to package name (on PyPI) when the package name # is different. INSTALL_MAPPING = { "sksparse": "scikit-sparse", } def import_optional_dependency( name: str, extra: str = "", raise_error: bool = True, ): """Import an optional dependency. By default, if a dependency is missing an ImportError with a nice message will be raised. Parameters ---------- name : str The module name. extra : str, default="" Additional text to include in the ImportError message. raise_error : bool, default=True What to do when a dependency is not found. * True : Raise an ImportError. * False: Return None. Returns ------- module : Optional[ModuleType] The imported module when found. None is returned when the package is not found and raise_error is False. Raises ------- ImportError dependency not found; see raise_error """ package_name = INSTALL_MAPPING.get(name) install_name = package_name if package_name is not None else name try: module = importlib.import_module(name) except ImportError: if raise_error: raise ImportError( f"Missing optional dependency '{install_name}'. {extra} " f"Use pip or conda to install {install_name}." ) else: return None return module