Source code for dicom_csv.aggregation

"""Tools for grouping DICOM metadata into images."""
from typing import Callable, Sequence, Union

import pandas as pd
import numpy as np

from .spatial import get_voxel_spacing, get_orientation_matrix, get_image_position_patient, order_series
from .misc import stack_images
from .utils import Series

# Names exported by `from dicom_csv.aggregation import *`; anything not listed here
# (e.g. underscore-prefixed helpers) has to be imported explicitly.
__all__ = 'aggregate_images', 'normalize_identifiers', 'select'


def _remove_dots(x):
    """
    Convert a number-like value to the string of its integer part.

    Examples: ``'1.0' -> '1'``, ``3.0 -> '3'``, ``'007' -> '7'``.
    Values that cannot be interpreted as a finite number (e.g. ``'abc'``, nan, inf)
    are returned unchanged, so the result is not guaranteed to be a string.

    Raises
    ------
    TypeError
        if ``x`` has a type that ``float`` does not accept (e.g. None).
    """
    try:
        try:
            # exact conversion first: a round trip through `float` silently corrupts
            # integers above 2 ** 53, e.g. long numeric identifiers
            return str(int(x))
        except (TypeError, ValueError):
            # not directly convertible (e.g. the string '1.0'): truncate the float value
            return str(int(float(x)))
    except (ValueError, OverflowError):
        # ValueError: not a number or nan; OverflowError: infinity
        return x


def aggregate_images(metadata: pd.DataFrame, by: Union[str, Sequence[str]],
                     process_series: Callable = None) -> pd.DataFrame:
    """
    Groups DICOM ``metadata`` into images (series).

    Parameters
    ----------
    metadata
        a dataframe with metadata returned by `join_tree`. Must contain the ``FileName`` column.
    by
        a column name or a list of column names by which the grouping will be performed.
        The argument is required. The columns suggested for this purpose are:
        PatientID, SeriesInstanceUID, StudyInstanceUID, PathToFolder, PixelArrayShape, SequenceName.
        All the values in these columns must be strings.
    process_series
        a function that processes an aggregated series before it will be joined into a single entry

    Returns
    -------
    a dataframe with a single row for each group. If ``metadata`` has no rows,
    an empty copy of it is returned.

    Raises
    ------
    ValueError
        if any of the ``by`` columns contains non-string values, e.g. NaN.

    References
    ----------
    See the :doc:`tutorials/dicom` tutorial for more details.

    Notes
    -----
    The following columns are added:

    | SlicesCount: the number of files/slices in the image.
    | FileNames: a list of slash ("/") separated file names.
    | InstanceNumbers: (if InstanceNumber is in columns) a list of comma separated InstanceNumber values,
      or None if the values could not be joined.
    | SliceLocations, ImagePositionPatient0s, ImagePositionPatient1s, ImagePositionPatient2s, SOPInstanceUIDs:
      (if the corresponding column without the trailing "s" is present) a list of comma separated values.

    The following columns are removed:
        FileName (replaced by FileNames), any other columns that differ from file to file
        (which usually includes InstanceNumber, replaced by InstanceNumbers).
    """
    # per-file columns that are stored as comma separated lists under the name `{column}s`
    joined_columns = [
        'SliceLocation',
        'ImagePositionPatient0', 'ImagePositionPatient1', 'ImagePositionPatient2',
        'SOPInstanceUID',
    ]

    def get_unique_cols(df):
        # TODO: deal with float precision errors
        return [col for col in df.columns if len(df[col].dropna().unique()) == 1]

    def process_group(entry):
        if process_series is not None:
            entry = process_series(entry)

        # explicit copy: new columns are assigned below, which must not be done on a slice of `entry`
        res = entry.iloc[[0]][get_unique_cols(entry)].copy()
        res['FileNames'] = '/'.join(entry.FileName)
        res['SlicesCount'] = len(entry)

        # TODO: move the saved fields to arguments
        # InstanceNumber is optional, so a missing column must not raise an AttributeError
        if 'InstanceNumber' in entry:
            try:
                res['InstanceNumbers'] = ','.join(map(_remove_dots, entry.InstanceNumber))
            except (ValueError, TypeError):
                # best effort: e.g. NaN values cannot be joined
                res['InstanceNumbers'] = None

        for column in joined_columns:
            if column in entry:
                res[f'{column}s'] = ','.join(entry[column].astype(str))

        return res.drop(['FileName'], axis=1, errors='ignore')

    by = [by] if isinstance(by, str) else list(by)

    # plain iteration instead of the deprecated `DataFrame.applymap`
    not_strings = [
        column for column in by
        if not all(isinstance(value, str) for value in metadata[column])
    ]
    if not_strings:
        not_strings = ', '.join(not_strings)
        raise ValueError(f'The following columns do not contain only strings: {not_strings}. '
                         'You should probably check for NaN values.')

    # Iterating over the groups always yields the complete rows, including the `by` columns.
    # `groupby(...).apply` is deprecated for that use in recent pandas versions, and the `by`
    # columns would be lost here once it stops passing them to the callback.
    images = [process_group(group) for _, group in metadata.groupby(by)]
    if not images:
        # `pd.concat` raises on an empty list
        return metadata.iloc[:0].copy()

    return pd.concat(images, ignore_index=True)
def normalize_identifiers(metadata: pd.DataFrame) -> pd.DataFrame:
    """
    Normalizes number-like PatientID values to integer strings and fills nan values in SequenceName.

    Parameters
    ----------
    metadata
        a dataframe with the ``PatientID`` column and, optionally, the ``SequenceName`` column.

    Returns
    -------
    the same dataframe object that was passed as input.

    Notes
    -----
    The input dataframe will be mutated.
    PatientID values that are not number-like are left unchanged.
    """
    metadata['PatientID'] = metadata['PatientID'].apply(_remove_dots)
    if 'SequenceName' in metadata:
        # Assign the result back to the column: `metadata.SequenceName.fillna(..., inplace=True)`
        # modifies an intermediate Series, which does not update `metadata` under
        # pandas Copy-on-Write.
        metadata['SequenceName'] = metadata['SequenceName'].fillna('')
    return metadata
def select(dataframe: pd.DataFrame, query: str, **where: str) -> pd.DataFrame:
    """
    Selects the rows of ``dataframe`` that satisfy ``query`` and drops the empty columns and rows.

    Parameters
    ----------
    dataframe
        the dataframe to filter.
    query
        a template of an expression for ``DataFrame.query``. May span several lines,
        which are joined with spaces before evaluation.
    where
        the values substituted into ``query`` via ``str.format``.

    Returns
    -------
    the matching rows without the columns and the rows that consist only of missing values.
    """
    expression = query.format(**where)
    # `DataFrame.query` receives a single-line expression
    expression = ' '.join(expression.splitlines())

    matched = dataframe.query(expression)
    non_empty_columns = matched.dropna(axis=1, how='all')
    return non_empty_columns.dropna(axis=0, how='all')
def _get_nifti_header(shape: tuple):
    """Create a NIFTI-1 header for data of the given ``shape``: slice dimension 2, spatial units 'mm'."""
    # nibabel is imported lazily, so it is only required when the NIFTI helpers are called
    from nibabel import Nifti1Header

    nifti_header = Nifti1Header()
    nifti_header.set_data_shape(shape)
    nifti_header.set_dim_info(slice=2)
    nifti_header.set_xyzt_units('mm')
    return nifti_header


def _get_affine(om: np.ndarray, pos: list, voxel: list):
    """
    Build a 4x4 affine matrix: the upper-left 3x3 part is ``om`` with its columns
    scaled by ``voxel``, the last column holds ``pos``.
    """
    affine = np.eye(4)
    affine[:3, :3] = om @ np.diag(voxel)
    affine[:3, 3] = pos
    return affine


def get_nifti(series: Series, mask: np.ndarray = None):
    """
    Construct NIFTI image from list of DICOMs.

    Parameters
    ----------
    series
        the DICOMs of a single image. They are ordered with `order_series` before stacking.
    mask
        an optional array that is wrapped into a second NIFTI image with the same affine and header.
        NOTE(review): presumably it must have the same shape as the stacked image - confirm.

    Returns
    -------
    the NIFTI image if ``mask`` is None, otherwise the pair (image, mask image).
    """
    from nibabel import Nifti1Image

    ordered = order_series(series)
    data = stack_images(ordered)

    affine = _get_affine(
        get_orientation_matrix(ordered),
        # the position of the first slice of the ordered series
        list(get_image_position_patient(ordered)[0]),
        list(get_voxel_spacing(ordered)),
    )
    header = _get_nifti_header(data.shape)

    nifti_image = Nifti1Image(data, affine, header=header)
    if mask is None:
        return nifti_image

    return nifti_image, Nifti1Image(mask, affine, header=header)