Source code for mcot.core._scripts.split.merge

#!/usr/bin/env python
"""Merge result from individual runs"""
from .run import get_markers
from loguru import logger
import os.path as op
import nibabel as nib
import numpy as np
from subprocess import run
import shutil
import os
import glob
import filecmp


[docs]def merge_files(paths, out_path, clean=True): """ Merges the given files into a single file :param paths: NIFTI filenames :param out_path: output NIFTI file :param clean: whether to clean the input files after merging """ if all(filecmp.cmp(paths[0], p) for p in paths[1:]): if clean: shutil.move(paths[0], out_path) for path in paths[1:]: os.remove(path) else: shutil.copy(paths[0], out_path) return imgs = [nib.load(path) for path in paths] outside_mask = imgs[-1].dataobj[(0, ) * len(imgs[-1].shape)] res = np.full(imgs[0].shape, outside_mask) if not np.isfinite(outside_mask): test_run = np.isfinite else: test_run = lambda arr: arr != outside_mask for img in imgs: data = img.get_data() use = test_run(data) res[use] = data[use] nib.Nifti1Image(res, affine=None, header=imgs[0].header).to_filename(out_path) if clean: for path in paths: os.remove(path)
[docs]def merge_basenames(basenames, out_basename, clean=True): """ Merges all files starting with given basenames into files starting with out_basename :param basenames: input basenames :param out_basename: output basename :param clean: whether to clean the input files after merging """ poss_fn = glob.glob(basenames[0] + '*') for first_fn in poss_fn: all_fn = [first_fn.replace(basenames[0], name) for name in basenames] out_fn = first_fn.replace(basenames[0], out_basename) if all(op.isfile(fn) for fn in all_fn): merge_files(all_fn, out_fn, clean=clean) elif all(op.isdir(fn) for fn in all_fn): merge_directories(all_fn, out_fn, clean=clean) else: logger.warning(f'No merging strategy found for {out_fn}')
[docs]def merge_directories(directories, out_directory, clean=True): """ Merges all files in the input directories into out_directory :param directories: input directories :param out_directory: output directory :param clean: whether to clean the input files after merging """ merge_basenames( [op.join(direc, '') for direc in directories], op.join(out_directory, ''), clean=clean, )
[docs]def run(names, njobs, clean=True): """ Access to the script for other python programs. """ for name in names: if 'JOBID' not in name: raise ValueError("Expects JOBID to be present in every name") paths = [name.replace('JOBID', marker) for marker in get_markers(njobs)] if all(op.isdir(path) for path in paths): merge_directories(paths, name.replace('JOBID', ''), clean=clean) elif all(op.isfile(path) for path in paths): merge_files(paths, name.replace('JOBID', ''), clean=clean) elif all(not op.exists(path) for path in paths): merge_basenames(paths, name.replace('JOBID', ''), clean=clean) else: raise IOError(f"Could not consistently identify {name} as a directory, filename, or basename") pass
[docs]def run_from_args(args): """ Runs the script based on a Namespace containing the command line arguments """ run( args.names, args.njobs, clean=not args.no_clean )
[docs]def add_to_parser(parser): """ Creates the parser of the command line arguments """ parser.add_argument('njobs', type=int, help='number of jobs to merge') parser.add_argument('names', nargs='+', help='names of files/basenames/directories to merge') parser.add_argument('-nc', '--no_clean', action='store_true', help='skip cleaning the data prior to merging')