Source code for nbiatoolkit.dicomsort.dicomsort

# import re, os, sys, shutil
# import pydicom
# import argparse


# from typing import Optional

# from tqdm import tqdm
from tqdm import tqdm
import pydicom
from pydicom.errors import InvalidDicomError
import os
import glob
import shutil
from .helper_functions import parseDICOMKeysFromFormat, sanitizeFileName, _truncateUID
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Union


[docs] def get_dicom_files(sourceDir) -> list[str]: return [ file for file in glob.glob(f"{sourceDir}/**/*.dcm", recursive=True) if os.path.isfile(file) ]
[docs] def read_in_dicom_file(filePath: str) -> pydicom.FileDataset: if not os.path.isfile(filePath): raise ValueError(f"The file {filePath} does not exist.") try: dataset: pydicom.FileDataset = pydicom.dcmread( filePath, stop_before_pixels=True ) return dataset except InvalidDicomError as e: raise InvalidDicomError(f"Error reading file {filePath}: {e}") except Exception as e: raise e
[docs] def generateFilePathFromDICOMAttributes( dataset: pydicom.Dataset, targetPattern: str, truncateUID: bool, sanitizeFilename: bool, ) -> str: """ Generate a file path for the DICOM file by formatting DICOM attributes. """ fmt: str keys: list[str] fmt, keys = parseDICOMKeysFromFormat(targetPattern=targetPattern) replacements: dict[str, str] = {} for key in keys: # Retrieve the attribute value if it exists or default to a placeholder string value = str(getattr(dataset, key, "Unknown" + key)) # if value is exactly "UnknownInstanceNumber", replace it with "1" if value == "UnknownInstanceNumber": value = "1" value = ( _truncateUID(uid=value, lastDigits=5) if key.endswith("UID") and truncateUID else value ) replacements[key] = sanitizeFileName(value) if sanitizeFilename else value # Build the target path by interpolating the replacement values return fmt % replacements
[docs] def sortSingleDICOMFile( filePath: str, shutil_option: str, overwrite: bool, destinationDir: str, targetPattern: str, truncateUID: bool, sanitizeFilename: bool, ) -> bool: assert shutil_option in [ "copy", "move", ], "Invalid shutil_option symlink not implemented yet" dataset: pydicom.FileDataset = read_in_dicom_file(filePath) if dataset is None: return False name = generateFilePathFromDICOMAttributes( dataset, targetPattern, truncateUID, sanitizeFilename ) assert name is not None and isinstance(name, str) targetFilename = os.path.join(destinationDir, name) if os.path.exists(targetFilename) and not overwrite: print(f"Source File: {filePath}\n") print(f"File {targetFilename} already exists. ") raise ValueError( "Pattern is probably not unique or overwrite is set to False. Exiting." ) os.makedirs(os.path.dirname(targetFilename), exist_ok=True) if shutil_option == "copy": shutil.copyfile(src=filePath, dst=targetFilename) elif shutil_option == "move": shutil.move(src=filePath, dst=targetFilename) else: raise ValueError(f"Invalid option: {shutil_option}") return True
[docs] class DICOMSorter: def __init__( self, sourceDir: str, destinationDir: str, targetPattern: str = "%PatientName/%SeriesNumber-%SeriesInstanceUID/%InstanceNumber.dcm", truncateUID: bool = True, sanitizeFilename: bool = True, ): self.targetPattern = targetPattern self.sourceDir = sourceDir self.destinationDir = destinationDir self.truncateUID = truncateUID self.sanitizeFilename = sanitizeFilename
[docs] def sortDICOMFiles( self, shutil_option: str = "copy", overwrite: bool = False, n_parallel: int = 1, progressbar=True, ) -> bool: # get the list of dicom files dicom_file_paths = get_dicom_files(self.sourceDir) if n_parallel == 1: for path in dicom_file_paths: if not sortSingleDICOMFile( filePath=path, shutil_option=shutil_option, overwrite=overwrite, destinationDir=self.destinationDir, targetPattern=self.targetPattern, truncateUID=self.truncateUID, sanitizeFilename=self.sanitizeFilename, ): return False # If any errors occurred during sorting, return False else: with ThreadPoolExecutor(max_workers=n_parallel) as executor: futures = [ executor.submit( sortSingleDICOMFile, filePath=path, shutil_option=shutil_option, overwrite=overwrite, destinationDir=self.destinationDir, targetPattern=self.targetPattern, truncateUID=self.truncateUID, sanitizeFilename=self.sanitizeFilename, ) for path in dicom_file_paths ] if progressbar: futures = tqdm( as_completed(futures), total=len(futures), ncols=100, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} Files Processed", ) for future in futures: if not future.result(): return ( False # If any errors occurred during sorting, return False ) return True # If no errors, return True
# Removed sortSingleDICOMFile and generateFilePathFromDICOMAttributes methods here. # class DICOMSorter: # def __init__( # self, # sourceDir: str, # destinationDir: str, # targetPattern: str = "%PatientName/%SeriesNumber-%SeriesInstanceUID/%InstanceNumber.dcm", # truncateUID: bool = True, # sanitizeFilename: bool = True, # ): # """ # Initialize the DICOMSorter with a target pattern. # """ # self.targetPattern = targetPattern # self.sourceDir = sourceDir # self.destinationDir = destinationDir # self.truncateUID = truncateUID # self.sanitizeFilename = sanitizeFilename # def generateFilePathFromDICOMAttributes(self, dataset: pydicom.FileDataset) -> str: # """ # Generate a file path for the DICOM file by formatting DICOM attributes. # """ # fmt, keys = parseDICOMKeysFromFormat(targetPattern=self.targetPattern) # replacements: dict[str, str] = {} # # Prepare the replacements dictionary with sanitized attribute values # for key in keys: # # Retrieve the attribute value if it exists or default to a placeholder string # value = str(getattr(dataset, key, "Unknown" + key)) # value = ( # truncateUID(value) # if key.endswith("UID") and self.truncateUID # else value # ) # replacements[key] = ( # sanitizeFileName(value) if self.sanitizeFilename else value # ) # # Build the target path by interpolating the replacement values # return fmt % replacements # def sortSingleDICOMFile( # self, filePath: str, option: str, overwrite: bool = False # ) -> bool: # assert option in ["copy", "move"], "Invalid option: symlink not implemented yet" # try: # dataset: pydicom.FileDataset = pydicom.dcmread( # filePath, stop_before_pixels=True # ) # except InvalidDicomError as e: # print(f"Error reading file {filePath}: {e}") # return False # except TypeError as e: # print( # f"Error reading file {filePath}: is ``None`` or of an unsupported type." # ) # return False # name = self.generateFilePathFromDICOMAttributes(dataset) # assert name is not None and isinstance(name, str) # targetFilename = os.path.join(self.destinationDir, name) # if os.path.exists(targetFilename) and not overwrite: # print(f"Source File: {filePath}\n") # print(f"File {targetFilename} already exists. ") # raise ValueError( # "Pattern is probably not unique or overwrite is set to False. Exiting." # ) # os.makedirs(os.path.dirname(targetFilename), exist_ok=True) # if option == "copy": # shutil.copyfile(src=filePath, dst=targetFilename) # elif option == "move": # shutil.move(src=filePath, dst=targetFilename) # else: # raise ValueError(f"Invalid option: {option}") # return True # def sortDICOMFiles( # self, option: str = "copy", overwrite: bool = False, nParallel: int = 1 # ) -> bool: # dicom_file_paths = _get_dicom_files(self.sourceDir) # result = [] # print("Running with {} parallel threads".format(nParallel)) # with ThreadPoolExecutor(max_workers=nParallel) as executor: # futures = [ # executor.submit(self.sortSingleDICOMFile, filePath, option, overwrite) # for filePath in dicom_file_paths # ] # for future in tqdm(as_completed(futures), total=len(futures)): # result.append(future.result()) # return all(result) # def _get_dicom_files(sourceDir) -> "list[str]": # dicom_file_paths = [] # # Iterate over all files in the source directory # for root, dirs, files in os.walk(sourceDir): # for f in files: # dicom_file_paths.append(os.path.join(root, f)) if f.endswith( # ".dcm" # ) else None # return dicom_file_paths # def _read_in_dicom_file(filePath: str) -> Optional[pydicom.FileDataset]: # try: # dataset: pydicom.FileDataset = pydicom.dcmread( # filePath, stop_before_pixels=True # ) # return dataset # except InvalidDicomError as e: # print(f"Error reading file {filePath}: {e}") # return None # except TypeError as e: # print(f"Error reading file {filePath}: is ``None`` or of an unsupported type.") # return None # sourceDir = "/Users/bhklab/Documents/GitHub/NBIA-toolkit/NBIA-Download" # files = _get_dicom_files(sourceDir=sourceDir) # dcm_list = [] # for file in files: # dcm = _read_in_dicom_file(file) # dcm_list.append(dcm)