diff --git a/src/aliby/io/dataset.py b/src/aliby/io/dataset.py
index 83f4401631fd82d6f5f4a2a198405771e6b8ae4e..f26baf7f93b361cc0b9e734ca2155e20ea70605f 100644
--- a/src/aliby/io/dataset.py
+++ b/src/aliby/io/dataset.py
@@ -18,35 +18,40 @@ from aliby.io.image import ImageLocalOME
 
 
 def dispatch_dataset(expt_id: int or str, **kwargs):
     """
-    Choose a subtype of dataset based on the identifier.
+    Find paths to the data.
 
-    Input:
-    --------
-    expt_id: int or string serving as dataset identifier.
+    Connects to OMERO if data is remotely available.
 
-    Returns:
-    --------
-    Callable Dataset instance, either network-dependent or local.
-    """
-    if isinstance(expt_id, int):  # Is an experiment online
+    Parameters
+    ----------
+    expt_id: int or str
+        Identifier of the data: an OMERO ID, an OME-TIFF file, or a local directory.
+    Returns
+    -------
+    A callable Dataset instance, either network-dependent or local.
+    """
+    if isinstance(expt_id, int):
+        # data available online
         from aliby.io.omero import Dataset
 
         return Dataset(expt_id, **kwargs)
-
-    elif isinstance(expt_id, str):  # Files or Dir
+    elif isinstance(expt_id, str):
+        # data available locally
         expt_path = Path(expt_id)
         if expt_path.is_dir():
+            # data in multiple folders
             return DatasetLocalDir(expt_path)
         else:
+            # data in one folder as OME-TIFF files
             return DatasetLocalOME(expt_path)
     else:
-        raise Warning("Invalid expt_id")
+        raise Warning(f"{expt_id} is an invalid expt_id")
 
 
 class DatasetLocalABC(ABC):
     """
-    Abstract Base class to fetch local files, either OME-XML or raw images.
+    Abstract Base class to find local files, either OME-XML or raw images.
     """
 
     _valid_suffixes = ("tiff", "png")
@@ -73,12 +78,9 @@ class DatasetLocalABC(ABC):
     def unique_name(self):
         return self.path.name
 
-    @abstractproperty
-    def date(self):
-        pass
-
     @property
     def files(self):
+        """Return a dictionary with any available metadata files."""
         if not hasattr(self, "_files"):
             self._files = {
                 f: f
@@ -91,34 +93,35 @@ class DatasetLocalABC(ABC):
         return self._files
 
     def cache_logs(self, root_dir):
-        # Copy metadata files to results folder
+        """Copy metadata files to results folder."""
        for name, annotation in self.files.items():
             shutil.copy(annotation, root_dir / name.name)
         return True
 
+    @abstractproperty
+    def date(self):
+        pass
+
     @abstractmethod
     def get_images(self):
-        # Return a dictionary with the name of images and their unique identifiers
         pass
 
 
 class DatasetLocalDir(DatasetLocalABC):
-    """
-    Organise an entire dataset, composed of multiple images, as a directory containing directories with individual files.
-    It relies on ImageDir to manage images.
-    """
+    """Find paths to a data set, comprising multiple images in different folders."""
 
     def __init__(self, dpath: t.Union[str, PosixPath], *args, **kwargs):
         super().__init__(dpath)
 
     @property
     def date(self):
-        # Use folder creation date, for cases where metadata is minimal
+        """Return the folder's last-modified date, used when metadata is minimal."""
         return time.strftime(
             "%Y%m%d", time.strptime(time.ctime(os.path.getmtime(self.path)))
         )
 
     def get_images(self):
+        """Return a dictionary of folder names and their paths."""
         return {
             folder.name: folder
             for folder in self.path.glob("*/")
@@ -131,13 +134,7 @@ class DatasetLocalDir(DatasetLocalABC):
 
 
 class DatasetLocalOME(DatasetLocalABC):
-    """Load a dataset from a folder
-
-    We use a given image of a dataset to obtain the metadata,
-    as we cannot expect folders to contain this information.
-
-    It uses the standard OME-TIFF file format.
-    """
+    """Find names of images in a folder, assuming images in OME-TIFF format."""
 
     def __init__(self, dpath: t.Union[str, PosixPath], *args, **kwargs):
         super().__init__(dpath)
@@ -145,11 +142,11 @@ class DatasetLocalOME(DatasetLocalABC):
 
     @property
     def date(self):
-        # Access the date from the metadata of the first position
+        """Get the date from the metadata of the first position."""
         return ImageLocalOME(list(self.get_images().values())[0]).date
 
     def get_images(self):
-        # Fetches all valid formats and overwrites if duplicates with different suffix
+        """Return a dictionary with the names of the image files."""
         return {
             f.name: str(f)
             for suffix in self._valid_suffixes
diff --git a/src/aliby/pipeline.py b/src/aliby/pipeline.py
index 2977cada71b37a131f30e25103d6d668853878d0..d401d3ae037e4d95fce183d83514b0c6d95163c4 100644
--- a/src/aliby/pipeline.py
+++ b/src/aliby/pipeline.py
@@ -36,23 +36,14 @@ from postprocessor.core.processor import PostProcessor, PostProcessorParameters
 
 
 class PipelineParameters(ParametersABC):
-    """
-    Parameters that host what is run and how. It takes a list of dictionaries, one for
-    general in collection:
-    pass dictionary for each step
-    --------------------
-    expt_id: int or str Experiment id (if integer) or local path (if string).
-    directory: str Directory into which results are dumped. Default is "../data"
-
-    Provides default parameters for the entire pipeline. This downloads the logfiles and sets the default
-    timepoints and extraction parameters from there.
-    """
+    """Define parameters for the different steps of the pipeline."""
 
     _pool_index = None
 
     def __init__(
         self, general, tiler, baby, extraction, postprocessing, reporting
     ):
+        """Initialise; usually called via a class method rather than directly."""
         self.general = general
         self.tiler = tiler
         self.baby = baby
@@ -69,13 +60,34 @@ class PipelineParameters(ParametersABC):
         extraction={},
         postprocessing={},
     ):
+        """
+        Initialise parameters for steps of the pipeline.
+
+        Some parameters are extracted from the log files.
+
+        Parameters
+        ----------
+        general: dict
+            Parameters to set up the pipeline.
+        tiler: dict
+            Parameters for the tiler.
+        baby: dict (optional)
+            Parameters for Baby.
+        extraction: dict (optional)
+            Parameters for extraction.
+        postprocessing: dict (optional)
+            Parameters for post-processing.
+        """
+        # Alan: should 19993 be updated?
         expt_id = general.get("expt_id", 19993)
         if isinstance(expt_id, PosixPath):
             expt_id = str(expt_id)
             general["expt_id"] = expt_id
 
+        # Alan: an error message rather than a default might be better
         directory = Path(general.get("directory", "../data"))
+
+        # get log files, either locally or via OMERO
         with dispatch_dataset(
             expt_id,
             **{k: general.get(k) for k in ("host", "username", "password")},
@@ -83,7 +95,7 @@ class PipelineParameters(ParametersABC):
             directory = directory / conn.unique_name
             if not directory.exists():
                 directory.mkdir(parents=True)
-            # Download logs to use for metadata
+            # download logs for metadata
             conn.cache_logs(directory)
         try:
             meta_d = MetaData(directory, None).load_logs()
@@ -95,9 +107,10 @@ class PipelineParameters(ParametersABC):
                 "channels": ["Brightfield"],
                 "ntps": [2000],
             }
-            # Set minimal metadata
+            # set minimal metadata
             meta_d = minimal_default_meta
 
+        # define default values for general parameters
         tps = meta_d.get("ntps", 2000)
         defaults = {
             "general": dict(
@@ -118,7 +131,8 @@ class PipelineParameters(ParametersABC):
             )
         }
 
-        for k, v in general.items():  # Overwrite general parameters
+        # update default values using inputs
+        for k, v in general.items():
             if k not in defaults["general"]:
                 defaults["general"][k] = v
             elif isinstance(v, dict):
@@ -127,15 +141,13 @@ class PipelineParameters(ParametersABC):
             else:
                 defaults["general"][k] = v
 
+        # define defaults and update with any inputs
         defaults["tiler"] = TilerParameters.default(**tiler).to_dict()
         defaults["baby"] = BabyParameters.default(**baby).to_dict()
         defaults["extraction"] = (
             exparams_from_meta(meta_d)
             or BabyParameters.default(**extraction).to_dict()
         )
-        defaults["postprocessing"] = {}
-        defaults["reporting"] = {}
-
         defaults["postprocessing"] = PostProcessorParameters.default(
             **postprocessing
         ).to_dict()
@@ -156,16 +168,15 @@ class Pipeline(ProcessABC):
     """
 
-    iterative_steps = ["tiler", "baby", "extraction"]
-
+    pipeline_steps = ["tiler", "baby", "extraction"]
     step_sequence = [
         "tiler",
         "baby",
         "extraction",
         "postprocessing",
     ]
-    # Indicate step-writer groupings to perform special operations during step iteration
+    # specify the group in the h5 files written by each step (?)
     writer_groups = {
         "tiler": ["trap_info"],
         "baby": ["cell_info"],
@@ -178,8 +189,8 @@ class Pipeline(ProcessABC):
     }
 
     def __init__(self, parameters: PipelineParameters, store=None):
+        """Initialise - not usually called directly."""
         super().__init__(parameters)
-
         if store is not None:
             store = Path(store)
         self.store = store
@@ -188,20 +199,19 @@ class Pipeline(ProcessABC):
     def setLogger(
         folder, file_level: str = "INFO", stream_level: str = "WARNING"
     ):
-
+        """Initialise and format logger."""
         logger = logging.getLogger("aliby")
         logger.setLevel(getattr(logging, file_level))
         formatter = logging.Formatter(
             "%(asctime)s - %(levelname)s:%(message)s",
             datefmt="%Y-%m-%dT%H:%M:%S%z",
         )
-
+        # for streams - stdout, files, etc.
         ch = logging.StreamHandler()
         ch.setLevel(getattr(logging, stream_level))
         ch.setFormatter(formatter)
         logger.addHandler(ch)
-
-        # create file handler which logs even debug messages
+        # create file handler that logs even debug messages
         fh = logging.FileHandler(Path(folder) / "aliby.log", "w+")
         fh.setLevel(getattr(logging, file_level))
         fh.setFormatter(formatter)
@@ -216,20 +226,20 @@ class Pipeline(ProcessABC):
     @classmethod
     def from_folder(cls, dir_path):
         """
-        Constructor to re-process all files in a given folder.
+        Re-process all h5 files in a given folder.
 
-        Assumes all files share the same parameters (even if they don't share
-        the same channel set).
+        All files must share the same parameters, even if they have different channels.
 
         Parameters
         ---------
-        dir_path : str or Pathlib indicating the folder containing the files to process
+        dir_path : str or pathlib.Path
+            Folder containing the files.
         """
+        # find h5 files
         dir_path = Path(dir_path)
         files = list(dir_path.rglob("*.h5"))
         assert len(files), "No valid files found in folder"
         fpath = files[0]
-
         # TODO add support for non-standard unique folder names
         with h5py.File(fpath, "r") as f:
             pipeline_parameters = PipelineParameters.from_yaml(
@@ -237,8 +247,7 @@ class Pipeline(ProcessABC):
             )
         pipeline_parameters.general["directory"] = dir_path.parent
         pipeline_parameters.general["filter"] = [fpath.stem for fpath in files]
-
-        # Fix legacy postprocessing parameters
+        # fix legacy post-processing parameters
         post_process_params = pipeline_parameters.postprocessing.get(
             "parameters", None
         )
@@ -247,16 +256,19 @@ class Pipeline(ProcessABC):
                 post_process_params
             )
             del pipeline_parameters.postprocessing["parameters"]
-
         return cls(pipeline_parameters)
 
     @classmethod
     def from_existing_h5(cls, fpath):
         """
-        Constructor to process an existing hdf5 file.
-        Notice that it forces a single file, not suitable for multiprocessing of certain positions.
+        Re-process an existing h5 file.
 
-        It i s also used as a base for a folder-wide reprocessing.
+        Not suitable for more than one file.
+
+        Parameters
+        ----------
+        fpath: str
+            Name of the h5 file.
         """
         with h5py.File(fpath, "r") as f:
             pipeline_parameters = PipelineParameters.from_yaml(
@@ -265,7 +277,6 @@ class Pipeline(ProcessABC):
         directory = Path(fpath).parent
         pipeline_parameters.general["directory"] = directory
         pipeline_parameters.general["filter"] = Path(fpath).stem
-
         post_process_params = pipeline_parameters.postprocessing.get(
             "parameters", None
         )
@@ -274,7 +285,6 @@ class Pipeline(ProcessABC):
                 post_process_params
             )
             del pipeline_parameters.postprocessing["parameters"]
-
         return cls(pipeline_parameters, store=directory)
 
     @property
@@ -287,7 +297,6 @@ class Pipeline(ProcessABC):
         Steps: all holds general tasks
         steps: strain_name holds task for a given strain
         """
-
         config = self.parameters.to_dict()
         expt_id = config["general"]["id"]
         distributed = config["general"]["distributed"]
@@ -297,80 +306,70 @@ class Pipeline(ProcessABC):
             k: config["general"].get(k)
             for k in ("host", "username", "password")
         }
-
         dispatcher = dispatch_dataset(expt_id, **self.server_info)
         logging.getLogger("aliby").info(
             f"Fetching data using {dispatcher.__class__.__name__}"
         )
-        # Do all all initialisations
-
+        # get log files, either locally or via OMERO
         with dispatcher as conn:
             image_ids = conn.get_images()
-
             directory = self.store or root_dir / conn.unique_name
-
             if not directory.exists():
                 directory.mkdir(parents=True)
-
-            # Download logs to use for metadata
+            # download logs to use for metadata
             conn.cache_logs(directory)
-
-        # Modify to the configuration
+        # update configuration
         self.parameters.general["directory"] = str(directory)
         config["general"]["directory"] = directory
-
         self.setLogger(directory)
-
-        # Filter TODO integrate filter onto class and add regex
-        def filt_int(d: dict, filt: int):
-            return {k: v for i, (k, v) in enumerate(d.items()) if i == filt}
-
-        def filt_str(image_ids: dict, filt: str):
-            return {k: v for k, v in image_ids.items() if re.search(filt, k)}
-
-        def pick_filter(image_ids: dict, filt: int or str):
-            if isinstance(filt, str):
-                image_ids = filt_str(image_ids, filt)
-            elif isinstance(filt, int):
-                image_ids = filt_int(image_ids, filt)
-            return image_ids
-
+        # pick particular images if desired
         if isinstance(pos_filter, list):
             image_ids = {
                 k: v
                 for filt in pos_filter
-                for k, v in pick_filter(image_ids, filt).items()
+                for k, v in self.apply_filter(image_ids, filt).items()
             }
         else:
-            image_ids = pick_filter(image_ids, pos_filter)
-
+            image_ids = self.apply_filter(image_ids, pos_filter)
         assert len(image_ids), "No images to segment"
-
-        if distributed != 0:  # Gives the number of simultaneous processes
+        # create pipeline
+        if distributed != 0:
+            # multiple cores
             with Pool(distributed) as p:
                 results = p.map(
-                    lambda x: self.create_pipeline(*x),
+                    lambda x: self.run_one_pipeline(*x),
                     [(k, i) for i, k in enumerate(image_ids.items())],
-                    # num_cpus=distributed,
-                    # position=0,
                 )
-
-        else:  # Sequential
+        else:
+            # single core
            results = []
             for k, v in tqdm(image_ids.items()):
-                r = self.create_pipeline((k, v), 1)
+                r = self.run_one_pipeline((k, v), 1)
                 results.append(r)
-
         return results
 
-    def create_pipeline(
+    def apply_filter(self, image_ids: dict, filt: int or str):
+        """Select images by index or by matching their names against a regular expression."""
+        if isinstance(filt, str):
+            # pick images using a regular expression
+            image_ids = {
+                k: v for k, v in image_ids.items() if re.search(filt, k)
+            }
+        elif isinstance(filt, int):
+            # pick the filt'th image
+            image_ids = {
+                k: v for i, (k, v) in enumerate(image_ids.items()) if i == filt
+            }
+        return image_ids
+
+    def run_one_pipeline(
         self,
-        image_id: t.Tuple[str, str or PosixPath or int],
+        name_image_id: t.Tuple[str, str or PosixPath or int],
         index: t.Optional[int] = None,
     ):
         """ """
         self._pool_index = index
-        name, image_id = image_id
+        name, image_id = name_image_id
         session = None
         filename = None
         run_kwargs = {"extraction": {"labels": None, "masks": None}}
@@ -386,7 +385,6 @@ class Pipeline(ProcessABC):
                 session,
                 trackers_state,
             ) = self._setup_pipeline(image_id)
-
             loaded_writers = {
                 name: writer(filename)
                 for k in self.step_sequence
@@ -398,20 +396,17 @@ class Pipeline(ProcessABC):
                 "baby": ["mother_assign"],
             }
 
-            # START PIPELINE
+            # START
             frac_clogged_traps = 0
             min_process_from = min(process_from.values())
-
             with get_image_class(image_id)(
                 image_id, **self.server_info
             ) as image:
-
-                # Initialise Steps
+                # initialise steps
                 if "tiler" not in steps:
                     steps["tiler"] = Tiler.from_image(
                         image, TilerParameters.from_dict(config["tiler"])
                     )
-
                 if process_from["baby"] < tps:
                     session = initialise_tf(2)
                     steps["baby"] = BabyRunner.from_tiler(
@@ -420,8 +415,7 @@ class Pipeline(ProcessABC):
                     )
                     if trackers_state:
                         steps["baby"].crawler.tracker_states = trackers_state
-
-                # Limit extraction parameters during run using the available channels in tiler
+                # limit extraction parameters using the available channels in tiler
                 if process_from["extraction"] < tps:
                     # TODO Move this parameter validation into Extractor
                     av_channels = set((*steps["tiler"].channels, "general"))
@@ -433,7 +427,6 @@ class Pipeline(ProcessABC):
                     config["extraction"]["sub_bg"] = av_channels.intersection(
                         config["extraction"]["sub_bg"]
                     )
-
                     av_channels_wsub = av_channels.union(
                         [c + "_bgsub" for c in config["extraction"]["sub_bg"]]
                     )
@@ -441,7 +434,6 @@ class Pipeline(ProcessABC):
                     for op, (input_ch, _, _) in tmp.items():
                         if not set(input_ch).issubset(av_channels_wsub):
                             del config["extraction"]["multichannel_ops"][op]
-
                     exparams = ExtractorParameters.from_dict(
                         config["extraction"]
                     )
@@ -456,14 +448,12 @@ class Pipeline(ProcessABC):
                     # position=index + 1,
                 )
                 for i in pbar:
-
                     if (
                         frac_clogged_traps
                         < earlystop["thresh_pos_clogged"]
                         or i < earlystop["min_tp"]
                     ):
-
-                        for step in self.iterative_steps:
+                        for step in self.pipeline_steps:
                             if i >= process_from[step]:
                                 result = steps[step].run_tp(
                                     i, **run_kwargs.get(step, {})
@@ -478,7 +468,7 @@ class Pipeline(ProcessABC):
                                         meta={"last_processed": i},
                                     )
 
-                                # Step-specific actions
+                                # step-specific actions
                                 if (
                                     step == "tiler"
                                     and i == min_process_from
@@ -486,9 +476,8 @@ class Pipeline(ProcessABC):
                                     logging.getLogger("aliby").info(
                                         f"Found {steps['tiler'].n_traps} traps in {image.name}"
                                     )
-                                elif (
-                                    step == "baby"
-                                ):  # Write state and pass info to ext
+                                elif step == "baby":
+                                    # write state and pass info to ext
                                     loaded_writers["state"].write(
                                         data=steps[
                                             step
@@ -513,7 +502,8 @@ class Pipeline(ProcessABC):
                             frac = np.round(frac_clogged_traps * 100)
                             pbar.set_postfix_str(f"{frac} Clogged")
 
-                    else:  # Stop if more than X% traps are clogged
+                    else:
+                        # stop if more than X% traps are clogged
                         self._log(
                             f"{name}:Analysis stopped early at time {i} with {frac_clogged_traps} clogged traps"
                         )
@@ -522,23 +512,22 @@ class Pipeline(ProcessABC):
 
                 meta.add_fields({"last_processed": i})
 
-                # Run post-processing
+                # run post-processing
                 meta.add_fields({"end_status": "Success"})
                 post_proc_params = PostProcessorParameters.from_dict(
                     config["postprocessing"]
                 )
                 PostProcessor(filename, post_proc_params).run()
-
                 self._log("Analysis finished successfully.", "info")
                 return 1
 
-        except Exception as e:  # Catch bugs during setup or runtime
+        except Exception as e:
+            # catch bugs during setup or run time
             logging.exception(
                 f"{name}: Exception caught.",
                 exc_info=True,
             )
-            # This prints the type, value, and stack trace of the
-            # current exception being handled.
+            # print the type, value, and stack trace of the exception
             traceback.print_exc()
             raise e
         finally:
@@ -607,17 +596,17 @@ class Pipeline(ProcessABC):
         t.List[np.ndarray],
     ]:
         """
-        Initialise pipeline components and if necessary use
-        exising file to continue existing experiments.
+        Initialise pipeline components.
+
+        If necessary use a file to continue existing experiments.
 
         Parameters
         ----------
-        image_id : int
-            identifier of image in OMERO server, or filename
+        image_id : int or str
+            Identifier of image in OMERO server, or filename
 
         Returns
-        ---------
+        -------
         filename: str
         meta:
         config:
@@ -639,7 +628,7 @@ class Pipeline(ProcessABC):
         general_config = config["general"]
         session = None
         earlystop = general_config.get("earlystop", None)
-        process_from = {k: 0 for k in self.iterative_steps}
+        process_from = {k: 0 for k in self.pipeline_steps}
         steps = {}
         ow = {k: 0 for k in self.step_sequence}
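
Usage note (not part of the diff): a minimal sketch of how the refactored entry points fit together, assuming the public names shown above. The run() call is assumed to be the ProcessABC entry point that drives tiler, baby, extraction and post-processing in sequence; paths and values below are placeholders.

    from aliby.pipeline import Pipeline, PipelineParameters

    # expt_id may be an OMERO ID (int) or a local path (str);
    # dispatch_dataset picks the matching Dataset class for either case.
    params = PipelineParameters.default(
        general={
            "expt_id": "/path/to/experiment",  # placeholder path
            "distributed": 0,  # 0 = process positions sequentially
        }
    )
    Pipeline(params).run()

    # Re-processing existing results uses the class-method constructors:
    # Pipeline.from_existing_h5("results/position_001.h5")  # placeholder file
    # Pipeline.from_folder("results/")  # placeholder folder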