from nibabel.cifti2.cifti2_axes import BrainModelAxis, Axis, ScalarAxis
from nibabel.cifti2 import Cifti2Image, Cifti2Header
from typing import Sequence
import numpy as np
import nibabel as nib
from nibabel.gifti import GiftiImage
from mcot.core.surface import cortical_mesh
import argparse
from fsl.utils.path import hasExt
from ._write_gifti import write_gifti
from . import cifti
from numpy.lib.stride_tricks import as_strided
from contextlib import contextmanager
import dask.array
import os
import shutil
[docs]class GreyOrdinates(object):
"""
Represents data on voxels or vertices
"""
[docs] def __init__(self, data, brain_model_axis: BrainModelAxis, other_axes: Sequence[Axis]=None, parent_file=None):
"""
Defines a new dataset in greyordinate space
:param data: (..., N) array for N greyordinates
:param brain_model_axis: CIFTI axis describing the greyordinate space
:param other_axes: sequence of CIFTI axes describing the other dimensions
:param parent_file: file in which the dataset has been stored
"""
self.data = data
if data.shape[-1] != len(brain_model_axis):
raise ValueError("Last axis of data does not match number of greyordinates")
self.brain_model_axis = brain_model_axis
if other_axes is not None:
if len(other_axes) != self.data.ndim - 1:
raise ValueError("Number of axis does not match dimensionality of the data")
if tuple(len(ax) for ax in other_axes) != self.data.shape[:-1]:
raise ValueError("Size of other axes does not match data size")
self.other_axes = None if other_axes is None else tuple(other_axes)
self.parent_file = parent_file
[docs] def volume(self, ):
"""
Get the volumetric data as a Nifti1Image
"""
if self.brain_model_axis.volume_mask.sum() == 0:
raise ValueError(f"Can not create volume without voxels in {self}")
data = np.full(self.brain_model_axis.volume_shape + self.data.shape[:-1], np.nan,
dtype=self.data.dtype)
voxels = self.brain_model_axis.voxel[self.brain_model_axis.volume_mask]
data[tuple(voxels.T)] = np.transpose(self.data, (-1, ) + tuple(range(self.data.ndim - 1)))[self.brain_model_axis.volume_mask]
return nib.Nifti1Image(data, affine=self.brain_model_axis.affine)
[docs] def surface(self, anatomy, fill=np.nan, partial=False):
"""
Gets a specific surface
:param anatomy: BrainStructure or string like 'CortexLeft' or 'CortexRight'
:param fill: which value to fill the array with if not all vertices are defined
:param partial: only return the part of the surface defined in the greyordinate file (ignores `fill` if set)
:return:
- if not partial: (..., n_vertices) array
- if partial: (N, ) int array with indices on the surface included in (..., N) array
"""
if isinstance(anatomy, str):
anatomy = cortical_mesh.BrainStructure.from_string(anatomy, issurface=True)
if anatomy.cifti not in self.brain_model_axis.name:
raise ValueError(f"No surface data for {anatomy.cifti} found")
slc, bm = None, None
arr = np.full(self.data.shape[:-1] + (self.brain_model_axis.nvertices[anatomy.cifti],), fill,
dtype=self.data.dtype)
for name, slc_try, bm_try in self.brain_model_axis.iter_structures():
if name == anatomy.cifti:
if partial:
if bm is not None:
raise ValueError(f"Surface {anatomy} does not form a contiguous block")
slc, bm = slc_try, bm_try
else:
arr[..., bm_try.vertex] = self.data[..., slc_try]
if not partial:
return arr
else:
return bm.vertex, self.data[..., slc]
[docs] def to_hdf5(self, group, compression='gzip'):
"""
Stores the image in the HDF5 group
"""
other_axes = (None, ) * (self.data.ndim - 1) if self.other_axes is None else self.other_axes
cifti.to_hdf5(group, self.data, other_axes + (self.brain_model_axis, ), compression=compression)
[docs] @classmethod
def from_hdf5(cls, group):
"""
Retrieves data from HDF5 group
"""
data, axes = cifti.from_hdf5(group)
return cls(data, axes[-1], axes[:-1])
[docs] @classmethod
@contextmanager
def empty_hdf5(cls, filename, axes, dtype=float):
"""
Creates an empty greyordinate object based on the axes
Data will be stored on disk in an HDF5 file
:param filename: filename to store the HDF5 file in or HDF5 group
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where the data is stored in the new HDF5 file
"""
import h5py
if isinstance(filename, str):
group = h5py.File(filename, 'w')
to_close = True
else:
group = filename
to_close = False
data = cifti.empty_hdf5(group, axes, dtype)
try:
yield cls(data, axes[-1], axes[:-1], parent_file=group)
except:
if to_close:
os.remove(filename)
raise
if to_close:
group.close()
[docs] @classmethod
@contextmanager
def empty_zarr(cls, filename, axes, dtype=float):
"""
Creates an empty greyordinate object based on the axes
Data will be stored on disk in an HDF5 file
:param filename: filename to store the HDF5 file in or HDF5 group
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where the data is stored in the new HDF5 file
"""
import zarr
if isinstance(filename, str):
group = zarr.group(filename, 'w')
else:
group = filename
data = cifti.empty_zarr(group, axes, dtype)
try:
yield cls(data, axes[-1], axes[:-1], parent_file=group)
except:
to_delete = filename if group is not filename else group.path
if os.path.isdir(to_delete):
shutil.rmtree(to_delete)
raise
[docs] def to_cifti(self, other_axes=None):
"""
Create a CIFTI image from the data
:param other_axes: defines the axes besides the greyordinate one
:return: nibabel CIFTI image
"""
if other_axes is None:
other_axes = self.other_axes
if other_axes is None:
if self.data.ndim != 1:
raise ValueError("Can not store to CIFTI without defining what is stored along the other dimensions")
other_axes = []
if other_axes is not None:
if len(other_axes) != self.data.ndim - 1:
raise ValueError("Number of axis does not match dimensionality of the data")
if tuple(len(ax) for ax in other_axes) != self.data.shape[:-1]:
raise ValueError("Size of other axes does not match data size")
data = self.data
if data.ndim == 1:
data = data[None, :]
other_axes = [ScalarAxis(['default'])]
return Cifti2Image(
data,
header=Cifti2Header.from_axes(list(other_axes) + [self.brain_model_axis])
)
[docs] @classmethod
def from_cifti(cls, filename, writable=False):
"""
Creates new greyordinate object from dense CIFTI file
:param filename: CIFTI filename
:param writable: if True, opens data array in writable mode
"""
if isinstance(filename, str):
img: Cifti2Image = nib.load(filename)
else:
img = filename
if writable:
data = np.memmap(filename, img.dataobj.dtype, mode='r+',
offset=img.dataobj.offset, shape=img.shape, order='F')
else:
data = np.asanyarray(img.dataobj)
axes = [img.header.get_axis(idx) for idx in range(data.ndim)]
if not isinstance(axes[-1], BrainModelAxis):
raise ValueError("Last axis of dense CIFTI file should be a BrainModelAxis")
return GreyOrdinates(data, axes[-1], axes[:-1])
[docs] @classmethod
@contextmanager
def empty_cifti(cls, filename, axes, dtype=float):
"""
Creates an empty greyordinate object based on the axes
Data will be stored on disk in CIFTI format
:param filename: filename to store the CIFTI file in
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where the data is stored in the new HDF5 file
"""
hdr = cifti.cifti2.Cifti2Header.from_axes(axes)
data = np.zeros(1, dtype=dtype)
shape = tuple(len(ax) for ax in axes)
data_shaped = as_strided(data, shape=shape, strides=tuple(0 for _ in axes),
writeable=False)
cifti.cifti2.Cifti2Image(data_shaped, header=hdr).to_filename(filename)
go = cls.from_cifti(filename, writable=True)
try:
yield go
except:
os.remove(filename)
raise
go.data.flush()
del go
[docs] @classmethod
def from_gifti(cls, filename, mask_values=(0, np.nan)):
"""
Creates a new greyordinate object from a GIFTI file
:param filename: GIFTI filename
:param mask_values: values to mask out
:return: greyordinate object representing the unmasked vertices
"""
if isinstance(filename, str):
img = nib.load(filename)
else:
img = filename
datasets = [darr.data for darr in img.darrays]
if len(datasets) == 1:
data = datasets[0]
else:
data = np.concatenate(
[np.atleast_2d(d) for d in datasets], axis=0
)
mask = np.ones(data.shape, dtype='bool')
for value in mask_values:
if value is np.nan:
mask &= ~np.isnan(data)
else:
mask &= ~(data == value)
while mask.ndim > 1:
mask = mask.any(0)
anatomy = cortical_mesh.get_brain_structure(img)
bm_axes = BrainModelAxis.from_mask(mask, name=anatomy.cifti)
return GreyOrdinates(data[..., mask], bm_axes)
[docs] @classmethod
def from_nifti(cls, filename, mask_values=(np.nan, 0)):
"""
Creates a new greyordinate object from a NIFTI file
:param filename: NIFTI filename
:param mask_values: which values to mask out
:return: greyordinate object representing the unmasked voxels
"""
if isinstance(filename, str):
img = nib.load(filename)
else:
img = filename
data = img.get_fdata()
mask = np.ones(data.shape, dtype='bool')
for value in mask_values:
if value is np.nan:
mask &= ~np.isnan(data)
else:
mask &= ~(data == value)
while mask.ndim > 3:
mask = mask.any(-1)
inverted_data = np.transpose(data[mask], tuple(range(1, data.ndim - 2)) + (0, ))
bm_axes = BrainModelAxis.from_mask(mask, affine=img.affine)
return GreyOrdinates(inverted_data, bm_axes)
def __add__(self, other):
"""
Adds the overlapping part of the arrays
Only voxels/vertices in common between the greyordinate spaces are added
"""
if not isinstance(other, GreyOrdinates):
return NotImplemented
if self.other_axes is not None:
if other.other_axes is not None:
if self.other_axes != other.other_axes:
raise ValueError("can not concatenate greyordinates when non-brain-model axes do not match")
other_axes = self.other_axes
else:
other_axes = other.other_axes
new_bm, slices = cifti.combine([self.brain_model_axis, other.brain_model_axis])
return GreyOrdinates(
data=self.data[..., slices[0]] + other.data[..., slices[1]],
brain_model_axis=new_bm,
other_axes=other_axes,
)
[docs] @classmethod
@contextmanager
def empty(cls, filename, axes, dtype=float):
"""
Creates an empty file to store the greyordinates with the type determined by the extension:
- .nii: CIFTI file
- .h5/hdf5/he2/he5: HDF5 file representing CIFTI data
- .zarr: zarr file representing CIFTI data
:param filename: target filename
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where CIFTI data can be stored
"""
formats = {
'zarr': ('.zarr',),
'hdf5': ('.hdf5', '.h5', '.he2', '.he5'),
'cifti': ('.nii',),
}
for format, exts in formats.items():
if hasExt(filename, exts):
break
else:
raise ValueError(f"Extension of {filename} not recognised as CIFTI, HDF5 or zarr file")
with getattr(cls, f'empty_{format}')(filename, axes, dtype) as go:
yield go
[docs] @classmethod
def from_filename(cls, filename, mask_values=(0, np.nan), writable=False):
"""
Reads greyordinate data from the given file
File can be:
- NIFTI mask
- GIFTI mask
- CIFTI file
- HDF5 file representing CIFTI data
- zarr file representing CIFTI data
:param filename: input filename
:param mask_values: which values are outside of the mask for NIFTI or GIFTI input
:param writable: allow to write to disk
:return: greyordinates object
"""
try:
import h5py
except ImportError:
h5py = False
try:
import zarr
except ImportError:
zarr = False
if isinstance(filename, str):
try:
if not h5py:
raise OSError()
img = h5py.File(filename, 'r+' if writable else 'r')
except OSError:
try:
if not zarr:
raise ValueError()
img = zarr.open(filename, mode='r+' if writable else 'r')
except ValueError:
img = nib.load(filename)
else:
img = filename
if h5py and isinstance(img, h5py.Group):
return cls.from_hdf5(img)
if zarr and isinstance(img, zarr.Group):
return cls.from_hdf5(img)
if isinstance(img, nib.Nifti1Image):
if writable:
raise ValueError("Can not open NIFTI file in writable mode")
return cls.from_nifti(filename, mask_values)
if isinstance(img, Cifti2Image):
return cls.from_cifti(filename, writable=writable)
if isinstance(img, GiftiImage):
if writable:
raise ValueError("Can not open GIFTI file in writable mode")
return cls.from_gifti(filename, mask_values)
raise ValueError(f"I do not know how to convert {type(img)} into greyordinates (from {filename})")
[docs] def to_filename(self, filename):
"""
Stores the greyordinate data to the given filename.
Type of storage is determined by the extension of the filename:
- .dscalar/dconn/dlabel.nii: CIFTI file
- .h5/hdf5/he2/he5: HDF5 file representing CIFTI data
- .zarr: zarr file representing CIFTI data
- .gii: GIFTI file (only stores surface data;
raises error if more that one surface is represented in the greyordinates)
- .nii: NIFTI file (only stores the volumetric data)
:param filename: target filename
"""
if hasExt(filename, ('.dscalar.nii', '.dconn.nii', '.dlabel.nii')):
self.to_cifti().to_filename(filename)
elif hasExt(filename, ('.h5', '.hdf5', '.he2', 'he5')):
import h5py
with h5py.File(filename, 'w') as f:
self.to_hdf5(f)
elif hasExt(filename, ('.zarr', )):
import zarr
f = zarr.group(filename)
self.to_hdf5(f)
elif hasExt(filename, ('.gii', )):
surfaces = np.unique(self.brain_model_axis.name[self.brain_model_axis.surface_mask])
if len(surfaces) > 1:
raise ValueError(f"Can not write to GIFTI file as more than one surface has been defined: {surfaces}")
if len(surfaces) == 0:
raise ValueError("Can not write to GIFTI file as no surface has been provided")
write_gifti(filename, [self.surface(surfaces[0])], surfaces[0])
elif hasExt(filename, ('.nii.gz', '.nii')):
self.volume().to_filename(filename)
else:
raise IOError(f"Extension of {filename} not recognized for NIFTI, GIFTI, or CIFTI file")
[docs] def as_dask(self, chunks='auto', name='greyordinates'):
"""
Returns the greyordinates as a dask array
:param chunks: shape of the chunks (defaults to chunks of the dataset)
:param name: name of the dask array
:return: dask array
"""
if chunks == 'auto':
chunks = getattr(self.data, 'chunks', 'auto')
if chunks != 'auto':
size_chunks = np.prod(chunks)
if size_chunks < 2e7:
mult_factor = int((2e7 / size_chunks) ** (1. / self.data.ndim))
chunks = tuple(c * mult_factor for c in chunks)
return dask.array.from_array(self.data, chunks, name)
[docs] def transpose(self, ):
"""
Transposes a dense connectome
"""
if self.data.ndim != 2:
raise ValueError("Can only transpose 2D datasets")
return GreyOrdinates(np.transpose(self.data), brain_model_axis=self.other_axes[0],
other_axes=(self.brain_model_axis, ), parent_file=self.parent_file)
[docs]def stack(greyordinates, axis=0):
"""
Stacks a sequene of greyordinates along the given axis
Resulting GreyOrdinate will only contain voxels/vertices in all GreyOrdinate arrays
:param greyordinates: individual greyordinates to be merged
:return: merged greyordinate object
"""
new_bm, slices = cifti.combine([go.brain_model_axis for go in greyordinates])
new_arr = np.stack([go.data[..., slc] for go, slc in zip(greyordinates, slices)], axis=axis)
ref_axes = set([go.other_axes for go in greyordinates if go.other_axes is not None])
if len(ref_axes) == 0:
other_axes = None
elif len(ref_axes) == 1:
other_axes = list(ref_axes[0])
other_axes.insert(axis, ScalarAxis([f'stacked_{idx + 1}' for idx in range(len(greyordinates))]))
else:
raise ValueError("Failed to merge greyordinates as their other axes did not match")
return GreyOrdinates(new_arr, new_bm, other_axes)
[docs]def concatenate(greyordinates, axis=0):
"""
Stacks a sequene of greyordinates along the given axis
Resulting GreyOrdinate will only contain voxels/vertices in all GreyOrdinate arrays
:param greyordinates: individual greyordinates to be merged
:return: merged greyordinate object
"""
if len(greyordinates) == 1:
return greyordinates[0]
ref_axes = [go.other_axes for go in greyordinates if go.other_axes is not None]
if len(ref_axes) == 0:
other_axes = None
elif any(ra != ref_axes[0] for ra in ref_axes[1:]):
raise ValueError("Failed to merge greyordinates as their other axes did not match")
else:
other_axes = ref_axes[0]
if axis == -1 or axis == greyordinates[0].ndim - 1:
new_bm = greyordinates[0].brain_model_axis
for go in greyordinates[1:]:
new_bm = new_bm + go.brain_model_axis
new_arr = np.concatenate([go.data for go in greyordinates], axis=axis)
return GreyOrdinates(new_arr, new_bm, other_axes)
else:
new_bm, slices = cifti.combine([go.brain_model_axis for go in greyordinates])
new_arr = np.concatenate([go.data[..., slc] for go, slc in zip(greyordinates, slices)], axis=axis)
return GreyOrdinates(new_arr, new_bm, other_axes)
[docs]def parse_greyordinate(filename):
"""
Parses a set of filenames as a single greyordinate object
:param filename: '@'-symbol separated files (NIFTI, GIFTI, and/or CIFTI)
:return: single Greyordinate object representing the full dataset
"""
try:
parts = [GreyOrdinates.from_filename(fn) for fn in filename.split('@')]
except (ValueError, IOError) as e:
raise argparse.ArgumentTypeError(*e.args)
if len(parts) == 0:
raise argparse.ArgumentParser("Can not parse empty string")
return concatenate(parts, axis=-1)