Source code for eigenstrapping.datasets.base

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data fetchers
"""

import os.path as op
from pathlib import Path
import shutil

import numpy as np
from neuromaps import datasets, images
import nibabel as nib
from .utils import (get_data_dir, _groupby_match,
                    get_dataset_info, _match_files)
from eigenstrapping import dataio

try:  # Nilearn <=0.10 exposed a private ``fetch_single_file`` helper in
    # ``nilearn.datasets._utils``. This was removed in newer versions in
    # favour of ``_fetch_file`` in ``nilearn.datasets.utils``. Import from the
    # new location and fallback for older releases if needed.
    from nilearn.datasets.utils import _fetch_file
    fetch_single_file = _fetch_file  # backwards compat alias
except Exception:  # pragma: no cover - maintain previous import path
    from nilearn.datasets._utils import fetch_single_file  # type: ignore

try:  # pragma: no cover - optional dependency
    import gdown
except Exception:  # pragma: no cover
    gdown = None

[docs] def load_surface_examples(data_dir=None, with_surface=False): """ Downloads all surface files for fsaverage5 space Parameters ---------- data_dir : str, optional Path to use as data directory. If not specified, will check for environmental variable 'EIGEN_DATA'; if that is not set, will use `~/eigenstrapping-data` instead. Default: None with_surface : str, optional Flag whether to return the surfaces or not. Default False. Returns ------- tuple of ndarrays Maps, emodes, and evals for left and right hemispheres. """ space = 'fsaverage' version = 'fsaverage5' den = '10k' surfs = [None] * 2 data = [None] * 2 emodes = [None] * 2 evals = [None] * 2 masks = [None] * 2 for i, side in enumerate(['lh', 'rh']): if side == 'lh': hemi = 'L' else: hemi = 'R' surfs[i] = fetch_data(name='surfaces', space=space, den=den, hemi=side) data[i] = nib.load(datasets.fetch_annotation(source='abagen', hemi=hemi, return_single=True)).agg_data() emodes_file = fetch_data(name='eigenmodes', space=space, den=den, hemi=side, format='emodes') emodes[i] = np.loadtxt(emodes_file) evals_file = fetch_data(name='eigenmodes', space=space, den=den, hemi=side, format='evals') evals[i] = np.loadtxt(evals_file) masks[i] = nib.load(getattr(getattr(datasets.fetch_fsaverage(density='10k'), 'medial'), hemi)).agg_data() data[i][~masks[i]] = np.nan if with_surface: return surfs[0], surfs[1], data[0], data[1], emodes[0], emodes[1], evals[0], evals[1] return data[0], data[1], emodes[0], emodes[1], evals[0], evals[1]
[docs] def load_genepc(join=False): """ Download and return the Allen Human Brain Atlas gene PC1 in fsaverage5 space. Parameters ---------- join : bool, optional If True, return both hemispheres in one array. If False, return two hemispheres separately. Default is False. Returns ------- tuple of ndarrays of shape (10242,) (10242,) if join is True then returns ndarray of shape (20484,) Gene PC1 data in fsaverage5 space """ genepc = datasets.fetch_annotation(desc='genepc1', return_single=True) data = images.load_data(genepc) if join: return data data_lh, data_rh = data.reshape((2, 10242)) return data_lh, data_rh
[docs] def load_distmat(space='fsaverage', den='10k', data_dir=None, hemi='lh', sort=False): """ Downloads geodesic distance matrices. If the dense distance matrix is retrieved, there is an option to return the sorted distance matrix and index memmapped, ala variogram calculation in ``brainsmash`` in `sort=True`. If `sort=False`, the unsorted distance matrix (as per normal) is returned. Parameters ---------- space : str, optional Which space of the files to get. Default is 'fsaverage5' data_dir : str, optional Path to use as data directory. If not specified, will check for environmental variable 'E_DATA'; if that is not set, will use `~/e-data` instead. Default: None hemi : str, optional Which hemisphere to load data for. Default 'lh' parcellated : bool, optional If True, return parcellated format of distance matrix. If False, return sorted dense distance matrix. sort : bool, optional If True, return sorted dense distance matrix and index memory-mapped using `.utils.txt2memmap()` for variogram calculation. Does nothing if `parcellation` is passed. Returns ------- if parcellated: ndarray Parcellated distance matrix else: if sort: tuple of ndarrays Sorted distance matrix and index for hemisphere in 'lh' else: ndarray Dense geodesic distance matrix """ # rm parcellated? # if parcellated: # distmat = fetch_data(name='distmat', space=space, den=den, # hemi=hemi, format='parc') # return distmat distmat = fetch_data(name='distmat', space=space, den=den, hemi=hemi, format='dense') if sort: outfiles = txt2memmap(distmat, get_data_dir(data_dir)) D = load_memmap(outfiles['distmat']) index = load_memmap(outfiles['index']) return D, index else: return distmat
[docs] def load_subcort(structure='tha', data_dir=None): """ Loads all the files for the subcortical structures used in the paper - cortical-subcortical gradients and the subcortical ROIs from the 25% probability Harvard-Oxford atlas. Parameters ---------- structure : str, optional Which subcortical structure to return. Options are 'tha', 'hippo' or 'stri'. Default is 'tha'. Returns ------- data : nib.Nifti1Image Subcortical data of cortico-subcortical gradients in MNI152 space. mask : nib.Nifti1Image Mask image of subcortical structure in MNI152 space (i.e., 1 inside structure and 0 outside structure) """ path = op.dirname(__file__) data = op.join(path, 'HarvardOxford', f'hcp_{structure}-lh_gradient_1.txt') mask = op.join(path, 'HarvardOxford', f'hcp_{structure}-lh_thr25.nii.gz') return data, mask
def load_native_tutorial(data_dir=None): """ Download HCP native surface and data for demonstration purposes. Parameters ---------- data_dir : str, optional Path to use . The default is Npne. Returns ------- native : dict Dictionary of filenames """ path = op.dirname(__file__) native_surface = op.join(path, 'HCP', '102816.L.midthickness_MSMAll.164k_fs_LR.surf.gii') native_thickness = op.join(path, 'HCP', '102816.L.MyelinMap_BC.164k_fs_LR.func.gii') native_myelin = op.join(path, 'HCP', '102816.L.corrThickness.164k_fs_LR.shape.gii') return {'surface' : native_surface, 'thickness' : native_thickness, 'myelin': native_myelin}
[docs] def fetch_data(*, name=None, space=None, den=None, res=None, hemi=None, tags=None, format=None, data_dir=None, verbose=1): """ Downloads files for brain surfaces and eigenmodes matching requested variables Parameters ---------- name, space, den, res, hemi, tags, format : str or list-of-str Values on which to match surfaces. If not specified surfaces with any value for the relevant key will be matched. Default: None data_dir : str, optional Path to use as data directory. If not specified, will use `~/e-data` instead. Default: None verbose : int, optional Modifies verbosity of download, where higher numbers mean more updates. Default: 1 Returns ------- data : dict Dictionary of downloaded annotations where dictionary keys are tuples (space, den/res) and values are lists of corresponding filenames """ # check input parameters to ensure we're fetching _something_ supplied = False for val in (space, den, res, hemi, tags, format): if val is not None: supplied = True break if not supplied: raise ValueError('Must provide at least one parameters on which to ' 'match files. If you want to fetch all ' 'annotations set any of the parameters to "all".') # get info on datasets we need to fetch data_dir = get_data_dir(data_dir=data_dir) info = _match_files(get_dataset_info(name), space=space, den=den, res=res, hemi=hemi, tags=tags, format=format) if verbose > 1: print(f'Identified {len(info)} datsets matching specified parameters') # TODO: current work-around to handle that _fetch_files() does not support # session instances. hopefully a future version will and we can just use # that function to handle this instead of calling _fetch_file() directly data = [] for dset in info: fn = Path(data_dir) / dset['rel_path'] / dset['fname'] if not fn.exists(): fn.parent.mkdir(parents=True, exist_ok=True) url = dset.get('redir') or dset['url'] if isinstance(url, str) and 'drive.google.com' in url: if gdown is None: raise ImportError('Downloading from Google Drive requires `gdown`') gdown.download(url, str(fn), quiet=verbose == 0, fuzzy=True) else: dl_file = fetch_single_file(url, fn.parent, verbose=verbose, md5sum=dset['checksum']) shutil.move(str(dl_file), fn) data.append(str(fn)) if len(data) == 1: return data[0] return _groupby_match(data)
[docs] def txt2memmap(dist_file, output_dir, prefix=None, suffix=None, maskfile=None, delimiter=' '): """ Export distance matrix to memory-mapped array. Parameters ---------- dist_file : filename Path to `delimiter`-separated distance matrix file output_dir : filename Path to directory in which output files will be written prefix : str Prefix to add to start of filenames {'distmat.npy', 'index.npy'}, default None suffix : str Suffix to add to end of filenames {'distmat.npy', 'index.npy'}, default None maskfile : filename or np.ndarray or None, default None Path to a neuroimaging/txt file containing a mask, or a mask represented as a numpy array. Mask scalars are cast to boolean, and all elements not equal to zero will be masked. delimiter : str Delimiting character in `dist_file` Returns ------- dict Keys are 'D' and 'index'; values are absolute paths to the corresponding binary files on disk. Notes ----- Each row of the distance matrix is sorted before writing to file. Thus, a second mem-mapped array is necessary, the i-th row of which contains argsort(d[i]). If `maskfile` is not None, a binary mask.txt file will also be written to the output directory. Raises ------ IOError : `output_dir` doesn't exist ValueError : Mask image and distance matrix have inconsistent sizes """ nlines = _count_lines(dist_file) if not op.exists(output_dir): raise IOError("Output directory does not exist: {}".format(output_dir)) # Load mask if one was provided if maskfile is not None: mask = dataio(maskfile).astype(bool) if mask.size != nlines: e = "Incompatible input sizes\n" e += "{} rows in {}\n".format(nlines, dist_file) e += "{} elements in {}".format(mask.size, maskfile) raise ValueError(e) mask_fileout = op.join(output_dir, "mask.txt") np.savetxt( # Write to text file fname=mask_fileout, X=mask.astype(int), fmt="%i", delimiter=',') nv = int((~mask).sum()) # number of non-masked elements idx = np.arange(nlines)[~mask] # indices of non-masked elements else: nv = nlines idx = np.arange(nlines) # Build memory-mapped arrays with open(dist_file, 'r') as fp: if prefix is None: prefix = '' if suffix is None: suffix = '' npydfile = op.join(output_dir, f"{prefix}distmat{suffix}.npy") npyifile = op.join(output_dir, f"{prefix}index{suffix}.npy") fpd = np.lib.format.open_memmap( npydfile, mode='w+', dtype=np.float32, shape=(nv, nv)) fpi = np.lib.format.open_memmap( npyifile, mode='w+', dtype=np.int32, shape=(nv, nv)) ifp = 0 # Build memory-mapped arrays one row of distances at a time for il, l in enumerate(fp): # Loop over lines of file if il not in idx: # Keep only CIFTI vertices continue else: line = l.rstrip() if line: data = np.array(line.split(delimiter), dtype=np.float32) if data.size != nlines: raise RuntimeError( "Distance matrix is not square: {}".format( dist_file)) d = data[idx] sort_idx = np.argsort(d) fpd[ifp, :] = d[sort_idx] # sorted row of distances fpi[ifp, :] = sort_idx # sort indexes ifp += 1 del fpd # Flush memory changes to disk del fpi return {'distmat': npydfile, 'index': npyifile} # Return filenames
[docs] def load_memmap(filename): """ Load a memory-mapped array. Parameters ---------- filename : str path to memory-mapped array saved as npy file Returns ------- np.memmap """ return np.load(filename, mmap_mode='r')
def _count_lines(filename): """ Count number of lines in a file. Parameters ---------- filename : filename Returns ------- int number of lines in file """ with open(filename, 'rb') as f: lines = 0 buf_size = 1024 * 1024 read_f = f.raw.read buf = read_f(buf_size) while buf: lines += buf.count(b'\n') buf = read_f(buf_size) return lines