diff --git a/src/aliby/pipeline.py b/src/aliby/pipeline.py
index 8a0d39f69b8e9ca30535ee08fade1afc5dc94d74..6c70c575721d2ed0a00705599bbeafed07cd2605 100644
--- a/src/aliby/pipeline.py
+++ b/src/aliby/pipeline.py
@@ -1,6 +1,4 @@
-"""
-Pipeline and chaining elements.
-"""
+"""Set up and run pipelines: tiling, segmentation, extraction, and then post-processing."""
 import logging
 import os
 import re
@@ -36,12 +34,14 @@ from postprocessor.core.processor import PostProcessor, PostProcessorParameters
 
 
 class PipelineParameters(ParametersABC):
-    """Define parameters for the different steps of the pipeline."""
+    """Define parameters for the steps of the pipeline."""
 
     _pool_index = None
 
-    def __init__(self, general, tiler, baby, extraction, postprocessing, reporting):
-        """Initialise, but called by a class method not directly."""
+    def __init__(
+        self, general, tiler, baby, extraction, postprocessing, reporting
+    ):
+        """Initialise, but called by a class method - not directly."""
         self.general = general
         self.tiler = tiler
         self.baby = baby
@@ -143,7 +143,8 @@ class PipelineParameters(ParametersABC):
         defaults["tiler"] = TilerParameters.default(**tiler).to_dict()
         defaults["baby"] = BabyParameters.default(**baby).to_dict()
         defaults["extraction"] = (
-            exparams_from_meta(meta_d) or BabyParameters.default(**extraction).to_dict()
+            exparams_from_meta(meta_d)
+            or BabyParameters.default(**extraction).to_dict()
         )
         defaults["postprocessing"] = PostProcessorParameters.default(
             **postprocessing
@@ -159,10 +160,11 @@ class PipelineParameters(ParametersABC):
 
 
 class Pipeline(ProcessABC):
     """
-    A chained set of Pipeline elements connected through pipes.
-    Tiling, Segmentation,Extraction and Postprocessing should use their own default parameters.
-    These can be overriden passing the key:value of parameters to override to a PipelineParameters class
+    Initialise and run tiling, segmentation, extraction and post-processing.
+
+    Each step feeds the next one.
+    To customise parameters for any step use the PipelineParameters class.
     """
 
     pipeline_steps = ["tiler", "baby", "extraction"]
@@ -173,7 +175,7 @@ class Pipeline(ProcessABC):
         "postprocessing",
     ]
     # Indicate step-writer groupings to perform special operations during step iteration
-    # specify the group in the h5 files written by each step (?)
+    # Alan: replace with - specify the group in the h5 files written by each step (?)
     writer_groups = {
         "tiler": ["trap_info"],
         "baby": ["cell_info"],
@@ -193,7 +195,9 @@ class Pipeline(ProcessABC):
         self.store = store
 
     @staticmethod
-    def setLogger(folder, file_level: str = "INFO", stream_level: str = "WARNING"):
+    def setLogger(
+        folder, file_level: str = "INFO", stream_level: str = "WARNING"
+    ):
         """Initialise and format logger."""
         logger = logging.getLogger("aliby")
         logger.setLevel(getattr(logging, file_level))
@@ -237,13 +241,19 @@ class Pipeline(ProcessABC):
         fpath = files[0]
         # TODO add support for non-standard unique folder names
         with h5py.File(fpath, "r") as f:
-            pipeline_parameters = PipelineParameters.from_yaml(f.attrs["parameters"])
+            pipeline_parameters = PipelineParameters.from_yaml(
+                f.attrs["parameters"]
+            )
         pipeline_parameters.general["directory"] = dir_path.parent
         pipeline_parameters.general["filter"] = [fpath.stem for fpath in files]
         # fix legacy post-processing parameters
-        post_process_params = pipeline_parameters.postprocessing.get("parameters", None)
+        post_process_params = pipeline_parameters.postprocessing.get(
+            "parameters", None
+        )
         if post_process_params:
-            pipeline_parameters.postprocessing["param_sets"] = copy(post_process_params)
+            pipeline_parameters.postprocessing["param_sets"] = copy(
+                post_process_params
+            )
             del pipeline_parameters.postprocessing["parameters"]
 
         return cls(pipeline_parameters)
@@ -260,13 +270,19 @@ class Pipeline(ProcessABC):
             Name of file.
         """
         with h5py.File(fpath, "r") as f:
-            pipeline_parameters = PipelineParameters.from_yaml(f.attrs["parameters"])
+            pipeline_parameters = PipelineParameters.from_yaml(
+                f.attrs["parameters"]
+            )
         directory = Path(fpath).parent
         pipeline_parameters.general["directory"] = directory
         pipeline_parameters.general["filter"] = Path(fpath).stem
-        post_process_params = pipeline_parameters.postprocessing.get("parameters", None)
+        post_process_params = pipeline_parameters.postprocessing.get(
+            "parameters", None
+        )
         if post_process_params:
-            pipeline_parameters.postprocessing["param_sets"] = copy(post_process_params)
+            pipeline_parameters.postprocessing["param_sets"] = copy(
+                post_process_params
+            )
             del pipeline_parameters.postprocessing["parameters"]
 
         return cls(pipeline_parameters, store=directory)
@@ -275,18 +291,16 @@ class Pipeline(ProcessABC):
         return logging.getLogger("aliby")
 
     def run(self):
-        """
-        Config holds the general information, use in main
-        Steps: all holds general tasks
-        steps: strain_name holds task for a given strain
-        """
+        """Run separate pipelines for all positions in an experiment."""
+        # general information in config
         config = self.parameters.to_dict()
         expt_id = config["general"]["id"]
         distributed = config["general"]["distributed"]
         pos_filter = config["general"]["filter"]
         root_dir = Path(config["general"]["directory"])
         self.server_info = {
-            k: config["general"].get(k) for k in ("host", "username", "password")
+            k: config["general"].get(k)
+            for k in ("host", "username", "password")
         }
         dispatcher = dispatch_dataset(expt_id, **self.server_info)
         logging.getLogger("aliby").info(
@@ -305,28 +319,29 @@ class Pipeline(ProcessABC):
             config["general"]["directory"] = directory
             self.setLogger(directory)
             # pick particular images if desired
-            if isinstance(pos_filter, list):
-                image_ids = {
-                    k: v
-                    for filt in pos_filter
-                    for k, v in self.apply_filter(image_ids, filt).items()
-                }
-            else:
-                image_ids = self.apply_filter(image_ids, pos_filter)
+            if pos_filter:
+                if isinstance(pos_filter, list):
+                    image_ids = {
+                        k: v
+                        for filt in pos_filter
+                        for k, v in self.apply_filter(image_ids, filt).items()
+                    }
+                else:
+                    image_ids = self.apply_filter(image_ids, pos_filter)
             assert len(image_ids), "No images to segment"
-            # create pipeline
+            # create pipelines
             if distributed != 0:
                 # multiple cores
                 with Pool(distributed) as p:
                     results = p.map(
-                        lambda x: self.run_one_pipeline(*x),
+                        lambda x: self.run_one_position(*x),
                         [(k, i) for i, k in enumerate(image_ids.items())],
                     )
             else:
                 # single core
                 results = []
                 for k, v in tqdm(image_ids.items()):
-                    r = self.run_one_pipeline((k, v), 1)
+                    r = self.run_one_position((k, v), 1)
                     results.append(r)
 
         return results
@@ -334,7 +349,9 @@ class Pipeline(ProcessABC):
         """Select images by picking a particular one or by using a regular expression to parse their file names."""
         if isinstance(filt, str):
             # pick images using a regular expression
-            image_ids = {k: v for k, v in image_ids.items() if re.search(filt, k)}
+            image_ids = {
+                k: v for k, v in image_ids.items() if re.search(filt, k)
+            }
         elif isinstance(filt, int):
             # pick the filt'th image
             image_ids = {
@@ -342,16 +359,19 @@ class Pipeline(ProcessABC):
             }
         return image_ids
 
-    def run_one_pipeline(
+    def run_one_position(
         self,
         name_image_id: t.Tuple[str, str or PosixPath or int],
         index: t.Optional[int] = None,
     ):
-        """ """
+        """Set up and run a pipeline for one position."""
         self._pool_index = index
         name, image_id = name_image_id
+        # session and filename are defined by calling setup_pipeline.
+        # can they be deleted here?
         session = None
         filename = None
+        # run_kwargs = {"extraction": {"labels": None, "masks": None}}
 
         try:
             (
@@ -379,7 +399,9 @@ class Pipeline(ProcessABC):
             # START
             frac_clogged_traps = 0
             min_process_from = min(process_from.values())
-            with get_image_class(image_id)(image_id, **self.server_info) as image:
+            with get_image_class(image_id)(
+                image_id, **self.server_info
+            ) as image:
                 # initialise steps
                 if "tiler" not in steps:
                     steps["tiler"] = Tiler.from_image(
@@ -412,22 +434,26 @@ class Pipeline(ProcessABC):
                     for op, (input_ch, _, _) in tmp.items():
                         if not set(input_ch).issubset(av_channels_wsub):
                             del config["extraction"]["multichannel_ops"][op]
-                    exparams = ExtractorParameters.from_dict(config["extraction"])
+                    exparams = ExtractorParameters.from_dict(
+                        config["extraction"]
+                    )
                     steps["extraction"] = Extractor.from_tiler(
                         exparams, store=filename, tiler=steps["tiler"]
                     )
+                # set up progress meter
                 pbar = tqdm(
                     range(min_process_from, tps),
                     desc=image.name,
                     initial=min_process_from,
                     total=tps,
-                    # position=index + 1,
                 )
                 for i in pbar:
                     if (
-                        frac_clogged_traps < earlystop["thresh_pos_clogged"]
+                        frac_clogged_traps
+                        < earlystop["thresh_pos_clogged"]
                         or i < earlystop["min_tp"]
                     ):
+                        # run through steps
                        for step in self.pipeline_steps:
                            if i >= process_from[step]:
                                result = steps[step].run_tp(
@@ -436,48 +462,52 @@ class Pipeline(ProcessABC):
                                if step in loaded_writers:
                                    loaded_writers[step].write(
                                        data=result,
-                                        overwrite=writer_ow_kwargs.get(step, []),
+                                        overwrite=writer_ow_kwargs.get(
+                                            step, []
+                                        ),
                                        tp=i,
                                        meta={"last_processed": i},
                                    )
-
-                                # step-specific actions
-                                if step == "tiler" and i == min_process_from:
+                                # perform step
+                                if (
+                                    step == "tiler"
+                                    and i == min_process_from
+                                ):
                                    logging.getLogger("aliby").info(
                                        f"Found {steps['tiler'].n_traps} traps in {image.name}"
                                    )
                                elif step == "baby":
-                                    # write state and pass info to ext
+                                    # write state and pass info to ext (Alan: what's ext?)
                                    loaded_writers["state"].write(
-                                        data=steps[step].crawler.tracker_states,
+                                        data=steps[
+                                            step
+                                        ].crawler.tracker_states,
                                        overwrite=loaded_writers[
                                            "state"
                                        ].datatypes.keys(),
                                        tp=i,
                                    )
-                                elif (
-                                    step == "extraction"
-                                ):  # Remove mask/label after ext
+                                elif step == "extraction":
+                                    # remove mask/label after extraction
                                    for k in ["masks", "labels"]:
                                        run_kwargs[step][k] = None
-
+                        # check and report clogging
                        frac_clogged_traps = self.check_earlystop(
                            filename, earlystop, steps["tiler"].tile_size
                        )
-                        self._log(f"{name}:Clogged_traps:{frac_clogged_traps}")
-
+                        self._log(
+                            f"{name}:Clogged_traps:{frac_clogged_traps}"
+                        )
                        frac = np.round(frac_clogged_traps * 100)
                        pbar.set_postfix_str(f"{frac} Clogged")
                    else:
-                        # stop if more than X% traps are clogged
+                        # stop if too many traps are clogged
                        self._log(
-                            f"{name}:Analysis stopped early at time {i} with {frac_clogged_traps} clogged traps"
+                            f"{name}:Stopped early at time {i} with {frac_clogged_traps} clogged traps"
                        )
                        meta.add_fields({"end_status": "Clogged"})
                        break
-
                    meta.add_fields({"last_processed": i})
-
                # run post-processing
                meta.add_fields({"end_status": "Success"})
                post_proc_params = PostProcessorParameters.from_dict(
@@ -501,22 +531,48 @@ class Pipeline(ProcessABC):
     @staticmethod
     def check_earlystop(filename: str, es_parameters: dict, tile_size: int):
+        """
+        Check recent time points for tiles with too many cells.
+
+        Returns the fraction of clogged tiles, where clogged tiles have
+        too many cells or too much of their area covered by cells.
+
+        Parameters
+        ----------
+        filename: str
+            Name of h5 file.
+        es_parameters: dict
+            Parameters defining when early stopping should happen.
+            For example:
+                {'min_tp': 100,
+                'thresh_pos_clogged': 0.4,
+                'thresh_trap_ncells': 8,
+                'thresh_trap_area': 0.9,
+                'ntps_to_eval': 5}
+        tile_size: int
+            Size of tile.
+        """
+        # get the area of the cells organised by trap and cell number
         s = Signal(filename)
         df = s["/extraction/general/None/area"]
-        cells_used = df[df.columns[-1 - es_parameters["ntps_to_eval"] : -1]].dropna(
-            how="all"
-        )
+        # check the latest time points only
+        cells_used = df[
+            df.columns[-1 - es_parameters["ntps_to_eval"] : -1]
+        ].dropna(how="all")
+        # find tiles with too many cells
         traps_above_nthresh = (
             cells_used.groupby("trap").count().apply(np.mean, axis=1)
             > es_parameters["thresh_trap_ncells"]
         )
 
+        # find tiles with cells covering too great a fraction of the tiles' area
         traps_above_athresh = (
-            cells_used.groupby("trap").sum().apply(np.mean, axis=1) / tile_size**2
+            cells_used.groupby("trap").sum().apply(np.mean, axis=1)
+            / tile_size**2
             > es_parameters["thresh_trap_area"]
         )
-
         return (traps_above_nthresh & traps_above_athresh).mean()
 
+    # Alan: can both this method and the next be deleted?
     def _load_config_from_file(
         self,
         filename: PosixPath,
@@ -542,7 +598,9 @@ class Pipeline(ProcessABC):
         switch_case = {
             "tiler": lambda f: f["trap_info/drifts"].shape[0] - 1,
             "baby": lambda f: f["cell_info/timepoint"][-1],
-            "extraction": lambda f: f["extraction/general/None/area/timepoint"][-1],
+            "extraction": lambda f: f[
+                "extraction/general/None/area/timepoint"
+            ][-1],
         }
         return switch_case[step]
 
@@ -559,72 +617,66 @@ class Pipeline(ProcessABC):
             t.List[np.ndarray],
         ]:
         """
-        Initialise pipeline components.
+        Initialise steps in a pipeline.
 
-        If necessary use a file to continue existing experiments.
+        If necessary use a file to re-start experiments already partly run.
 
         Parameters
         ----------
         image_id : int or str
-            Identifier of image in OMERO server, or filename
+            Identifier of a data set in an OMERO server or a filename.
 
         Returns
         -------
         filename: str
-        meta:
-        config:
-        process_from:
-        tps:
-        steps:
-        earlystop:
-        session:
-        trackers_state:
-
-        Examples
-        --------
-        FIXME: Add docs.
-
+            Path to a h5 file to write to.
+        meta: object
+            agora.io.metadata.MetaData object
+        config: dict
+            Configuration parameters.
+        process_from: dict
+            Gives from which time point each step of the pipeline should start.
+        tps: int
+            Number of time points.
+        steps: dict
+        earlystop: dict
+            Parameters to check whether the pipeline should be stopped.
+        session: None
+        trackers_state: list
+            States of any trackers from earlier runs.
         """
 
         config = self.parameters.to_dict()
-        pparams = config
-        image_id = image_id
-        general_config = config["general"]
+        # Alan: session is never changed
         session = None
-        earlystop = general_config.get("earlystop", None)
+        earlystop = config["general"].get("earlystop", None)
         process_from = {k: 0 for k in self.pipeline_steps}
         steps = {}
-        ow = {k: 0 for k in self.step_sequence}
-        # check overwriting
-        ow_id = general_config.get("overwrite", 0)
+        ow_id = config["general"].get("overwrite", 0)
         ow = {step: True for step in self.step_sequence}
         if ow_id and ow_id is not True:
             ow = {
                 step: self.step_sequence.index(ow_id) < i
                 for i, step in enumerate(self.step_sequence, 1)
             }
 
-
-        # Set up
-        directory = general_config["directory"]
-
-        trackers_state: t.List[np.ndarray] = []
+        # set up
+        directory = config["general"]["directory"]
+        trackers_state = []
         with get_image_class(image_id)(image_id, **self.server_info) as image:
             filename = Path(f"{directory}/{image.name}.h5")
             meta = MetaData(directory, filename)
-            from_start = True if np.any(ow.values()) else False
-
-            # New experiment or overwriting
+            # remove existing file if overwriting
             if (
                 from_start
                 and (
-                    config.get("overwrite", False) == True or np.all(list(ow.values()))
+                    config["general"].get("overwrite", False)
+                    or np.all(list(ow.values()))
                 )
                 and filename.exists()
             ):
                 os.remove(filename)
-
-            # If no previous segmentation and keep tiler
+            # if the file exists with no previous segmentation use its tiler
             if filename.exists():
                 self._log("Result file exists.", "info")
                 if not ow["tiler"]:
@@ -643,15 +695,14 @@ class Pipeline(ProcessABC):
                        if ow["baby"]
                        else StateReader(filename).get_formatted_states()
                    )
-
                    config["tiler"] = steps["tiler"].parameters.to_dict()
                except Exception:
+                    # Alan: a warning or log here?
                    pass
-
        if config["general"]["use_explog"]:
            meta.run()
-
-        meta.add_fields(  # Add non-logfile metadata
+        # add metadata not in the log file
+        meta.add_fields(
            {
                "aliby_version": version("aliby"),
                "baby_version": version("aliby-baby"),
@@ -659,12 +710,12 @@ class Pipeline(ProcessABC):
                "image_id": image_id
                if isinstance(image_id, int)
                else str(image_id),
-                "parameters": PipelineParameters.from_dict(pparams).to_yaml(),
+                "parameters": PipelineParameters.from_dict(
+                    config
+                ).to_yaml(),
            }
        )
-
-        tps = min(general_config["tps"], image.data.shape[0])
-
+        tps = min(config["general"]["tps"], image.data.shape[0])
        return (
            filename,
            meta,
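# ----------------------------------------------------------------------
# Reviewer note: a minimal usage sketch of the entry points touched by
# this diff (PipelineParameters -> Pipeline -> run), assuming the aliby
# package is installed. The "general" keys shown ("id", "distributed",
# "directory", "filter", "tps") are the ones read by Pipeline.run and
# setup_pipeline above; the exact signature of PipelineParameters.default
# is not shown in this diff, so treat the call below as illustrative.
from aliby.pipeline import Pipeline, PipelineParameters

# build default parameters for every step, overriding only "general"
params = PipelineParameters.default(
    general={
        "id": 19993,  # hypothetical OMERO experiment id
        "distributed": 2,  # number of worker processes; 0 means single core
        "directory": "./output",  # where h5 results and logs are written
        "filter": None,  # optionally a regex, an int, or a list of these
        "tps": 100,  # number of time points to process
    }
)

# run tiling, segmentation (BABY), extraction and then post-processing
# for every position in the experiment; returns one result per position
pipeline = Pipeline(params)
results = pipeline.run()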