From 4a547de72644318600971d815391388b6d94cb55 Mon Sep 17 00:00:00 2001
From: pswain <peter.swain@ed.ac.uk>
Date: Fri, 9 Jun 2023 19:33:33 +0100
Subject: [PATCH] mostly comments; changed image_ids to position_ids in
 pipeline

---
 src/agora/abc.py                    | 10 +++----
 src/agora/io/signal.py              |  7 ++---
 src/agora/utils/merge.py            |  2 +-
 src/aliby/pipeline.py               | 26 +++++++++--------
 src/extraction/core/extractor.py    | 15 +++++-----
 src/postprocessor/core/processor.py | 28 ++++++++++---------
 .../core/reshapers/bud_metric.py    |  5 ++--
 7 files changed, 47 insertions(+), 46 deletions(-)

diff --git a/src/agora/abc.py b/src/agora/abc.py
index bf54d8a2..19281c94 100644
--- a/src/agora/abc.py
+++ b/src/agora/abc.py
@@ -17,7 +17,7 @@ atomic = t.Union[int, float, str, bool]
 
 class ParametersABC(ABC):
     """
-    Defines parameters as attributes and allows parameters to
+    Define parameters as attributes and allow parameters to
     be converted to either a dictionary or to yaml.
 
     No attribute should be called "parameters"!
@@ -25,7 +25,7 @@ class ParametersABC(ABC):
 
     def __init__(self, **kwargs):
         """
-        Defines parameters as attributes
+        Define parameters as attributes.
         """
         assert (
             "parameters" not in kwargs
@@ -243,11 +243,9 @@ class StepABC(ProcessABC):
 
     @timer
     def run_tp(self, tp: int, **kwargs):
-        """
-        Time and log the timing of a step.
-        """
+        """Time and log the timing of a step."""
         return self._run_tp(tp, **kwargs)
 
     def run(self):
         # Replace run with run_tp
-        raise Warning("Steps use run_tp instead of run")
+        raise Warning("Steps use run_tp instead of run.")
diff --git a/src/agora/io/signal.py b/src/agora/io/signal.py
index 4611663f..392b6c7b 100644
--- a/src/agora/io/signal.py
+++ b/src/agora/io/signal.py
@@ -271,7 +271,7 @@ class Signal(BridgeH5):
         Parameters
         ----------
         dataset: str or list of strs
-            The name of the h5 file or a list of h5 file names
+            The name of the h5 file or a list of h5 file names.
         in_minutes: boolean
             If True,
         lineage: boolean
@@ -354,10 +354,7 @@ class Signal(BridgeH5):
         fullname: str,
         node: t.Union[h5py.Dataset, h5py.Group],
     ):
-        """
-        Store the name of a signal if it is a leaf node
-        (a group with no more groups inside) and if it starts with extraction.
-        """
+        """Store the name of a signal if it is a leaf node and if it starts with extraction."""
         if isinstance(node, h5py.Group) and np.all(
             [isinstance(x, h5py.Dataset) for x in node.values()]
         ):
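
A note on the Signal hunk above: the re-documented method is an h5py visitor, called once per node while walking the file. A minimal standalone sketch of the same leaf-node test follows; the function name store_signal_path, the file name position.h5, and the module-level list are assumptions standing in for the method and its attributes.

    import h5py
    import numpy as np

    signal_paths = []

    def store_signal_path(fullname: str, node):
        # a leaf group holds only datasets, with no further groups inside
        if isinstance(node, h5py.Group) and np.all(
            [isinstance(x, h5py.Dataset) for x in node.values()]
        ):
            # keep only signals produced by the extraction step
            if fullname.startswith("extraction"):
                signal_paths.append(fullname)

    # visititems calls the visitor with (name, object) for every node
    with h5py.File("position.h5", "r") as f:  # hypothetical file
        f.visititems(store_signal_path)
    print(signal_paths)
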
diff --git a/src/agora/utils/merge.py b/src/agora/utils/merge.py
index 93669912..edd65b31 100644
--- a/src/agora/utils/merge.py
+++ b/src/agora/utils/merge.py
@@ -43,7 +43,7 @@ def apply_merges(data: pd.DataFrame, merges: np.ndarray):
 
     # Implement the merges and drop source rows.
     # TODO Use matrices to perform merges in batch
-    # for ecficiency
+    # for efficiency
     if valid_merges.any():
         to_merge = data.loc[indices]
         targets, sources = zip(*merges[valid_merges])
diff --git a/src/aliby/pipeline.py b/src/aliby/pipeline.py
index dcb807cd..144b4fb8 100644
--- a/src/aliby/pipeline.py
+++ b/src/aliby/pipeline.py
@@ -320,7 +320,7 @@ class Pipeline(ProcessABC):
         )
         # get log files, either locally or via OMERO
         with dispatcher as conn:
-            image_ids = conn.get_images()
+            position_ids = conn.get_images()
             directory = self.store or root_dir / conn.unique_name
             if not directory.exists():
                 directory.mkdir(parents=True)
@@ -330,29 +330,29 @@ class Pipeline(ProcessABC):
             self.parameters.general["directory"] = str(directory)
             config["general"]["directory"] = directory
             self.setLogger(directory)
-        # pick particular images if desired
+        # pick particular positions if desired
         if pos_filter is not None:
             if isinstance(pos_filter, list):
-                image_ids = {
+                position_ids = {
                     k: v
                     for filt in pos_filter
-                    for k, v in self.apply_filter(image_ids, filt).items()
+                    for k, v in self.apply_filter(position_ids, filt).items()
                 }
             else:
-                image_ids = self.apply_filter(image_ids, pos_filter)
-        assert len(image_ids), "No images to segment"
+                position_ids = self.apply_filter(position_ids, pos_filter)
+        assert len(position_ids), "No images to segment"
         # create pipelines
         if distributed != 0:
             # multiple cores
             with Pool(distributed) as p:
                 results = p.map(
                     lambda x: self.run_one_position(*x),
-                    [(k, i) for i, k in enumerate(image_ids.items())],
+                    [(k, i) for i, k in enumerate(position_ids.items())],
                 )
         else:
             # single core
             results = []
-            for k, v in tqdm(image_ids.items()):
+            for k, v in tqdm(position_ids.items()):
                 r = self.run_one_position((k, v), 1)
                 results.append(r)
         return results
@@ -453,13 +453,14 @@ class Pipeline(ProcessABC):
             steps["extraction"] = Extractor.from_tiler(
                 exparams, store=filename, tiler=steps["tiler"]
             )
-            # set up progress meter
+            # set up progress bar
             pbar = tqdm(
                 range(min_process_from, tps),
                 desc=image.name,
                 initial=min_process_from,
                 total=tps,
             )
+            # run through time points
            for i in pbar:
                 if (
                     frac_clogged_traps
@@ -469,9 +470,12 @@
                     # run through steps
                     for step in self.pipeline_steps:
                         if i >= process_from[step]:
+                            # perform step
                             result = steps[step].run_tp(
                                 i, **run_kwargs.get(step, {})
                             )
+                            # write to h5 file using writers
+                            # extractor writes to h5 itself
                             if step in loaded_writers:
                                 loaded_writers[step].write(
                                     data=result,
@@ -481,7 +485,7 @@
                                     tp=i,
                                     meta={"last_processed": i},
                                 )
-                            # perform step
+                            # clean up
                             if (
                                 step == "tiler"
                                 and i == min_process_from
@@ -501,7 +505,7 @@
                                     tp=i,
                                 )
                             elif step == "extraction":
-                                # remove mask/label after extraction
+                                # remove masks and labels after extraction
                                 for k in ["masks", "labels"]:
                                     run_kwargs[step][k] = None
                             # check and report clogging
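
On the pos_filter handling above: when pos_filter is a list, the dict comprehension takes the union of each filter's matches over the positions. A runnable sketch of that pattern follows; the stand-in apply_filter (string-contains or index selection) and the sample position names are assumptions, since the real method's body is not part of this patch.

    import typing as t

    def apply_filter(
        position_ids: t.Dict[str, int], filt: t.Union[int, str]
    ) -> t.Dict[str, int]:
        # assumed semantics: a str keeps positions whose name contains it;
        # an int keeps the single position at that enumeration index
        if isinstance(filt, str):
            return {k: v for k, v in position_ids.items() if filt in k}
        return {
            k: v for i, (k, v) in enumerate(position_ids.items()) if i == filt
        }

    position_ids = {"pos001": 101, "pos002": 102, "background": 103}
    pos_filter = ["pos", 2]
    # same dict comprehension as in run(): the union of each filter's hits
    filtered = {
        k: v
        for filt in pos_filter
        for k, v in apply_filter(position_ids, filt).items()
    }
    print(filtered)  # {'pos001': 101, 'pos002': 102, 'background': 103}
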
diff --git a/src/extraction/core/extractor.py b/src/extraction/core/extractor.py
index 34ce783c..cebb0b2a 100644
--- a/src/extraction/core/extractor.py
+++ b/src/extraction/core/extractor.py
@@ -80,7 +80,7 @@ class Extractor(StepABC):
     Usually the metric is applied to only a tile's masked area, but
     some metrics depend on the whole tile.
 
-    Extraction follows a three-level tree structure. Channels, such as GFP, are the root level; the reduction algorithm, such as maximum projection, is the second level; the specific metric, or operation, to apply to the masks, such as mean, is the third level.
+    Extraction follows a three-level tree structure. Channels, such as GFP, are the root level; the reduction algorithm, such as maximum projection, is the second level; the specific metric, or operation, to apply to the masks, such as mean, is the third or leaf level.
     """
 
     # TODO Alan: Move this to a location with the SwainLab defaults
@@ -202,7 +202,7 @@ class Extractor(StepABC):
             self._custom_funs[k] = tmp(f)
 
     def load_funs(self):
-        """Define all functions, including custum ones."""
+        """Define all functions, including custom ones."""
         self.load_custom_funs()
         self._all_cell_funs = set(self._custom_funs.keys()).union(CELL_FUNS)
         # merge the two dicts
@@ -335,7 +335,7 @@ class Extractor(StepABC):
         **kwargs,
     ) -> t.Dict[str, t.Dict[reduction_method, t.Dict[str, pd.Series]]]:
         """
-        Wrapper to apply reduction and then extraction.
+        Wrapper to reduce to a 2D image and then extract.
 
         Parameters
         ----------
@@ -499,7 +499,6 @@ class Extractor(StepABC):
             # calculate metrics with subtracted bg
             ch_bs = ch + "_bgsub"
             # subtract median background
-
             self.img_bgsub[ch_bs] = np.moveaxis(
                 np.stack(
                     list(
@@ -579,7 +578,9 @@
         **kwargs,
     ) -> dict:
         """
-        Wrapper to add compatibility with other steps of the pipeline.
+        Run extraction for one position and for the specified time points.
+
+        Save the results to an h5 file.
 
         Parameters
         ----------
@@ -597,7 +598,7 @@
         Returns
         -------
         d: dict
-            A dict of the extracted data with a concatenated string of channel, reduction metric, and cell metric as keys and pd.Series of the extracted data as values.
+            A dict of the extracted data for one position with a concatenated string of channel, reduction metric, and cell metric as keys and pd.DataFrame of the extracted data for all time points as values.
         """
         if tree is None:
             tree = self.params.tree
@@ -633,7 +634,7 @@
 
     def save_to_hdf(self, dict_series, path=None):
         """
-        Save the extracted data to the h5 file.
+        Save the extracted data for one position to the h5 file.
 
         Parameters
         ----------
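
To make the Extractor's three-level tree concrete, here is a hypothetical tree in the shape the revised docstring describes, channel at the root, reduction second, cell metrics at the leaves. The channel, reduction, and metric names are illustrative, echoing the docstring's examples rather than the package defaults.

    # hypothetical extraction tree: channel -> reduction -> cell metrics
    tree = {
        "general": {"None": ["area"]},
        "GFP": {"max": ["mean", "median"]},
    }
    # extraction visits every root-to-leaf path; results are keyed by the
    # concatenated channel/reduction/metric string, as in run()'s docstring
    for channel, reductions in tree.items():
        for reduction, metrics in reductions.items():
            for metric in metrics:
                print(f"{channel}/{reduction}/{metric}")
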
""" # run merger, picker, and find lineages self.run_prepost() # run processes for process, datasets in tqdm(self.targets["processes"]): + # process is a str; datasets is a list of str if process in self.parameters["param_sets"].get("processes", {}): # parameters already assigned parameters = self.parameters_classfun[process]( @@ -233,7 +224,6 @@ class PostProcessor(ProcessABC): loaded_process = self.classfun[process](parameters) if isinstance(parameters, LineageProcessParameters): loaded_process.lineage = self.lineage - # apply process to each dataset for dataset in datasets: self.run_process(dataset, process, loaded_process) @@ -259,7 +249,7 @@ class PostProcessor(ProcessABC): [], columns=signal.columns, index=signal.index ) result.columns.names = ["timepoint"] - # define outpath, where result will be written + # define outpath to write result if process in self.parameters["outpaths"]: outpath = self.parameters["outpaths"][process] elif isinstance(dataset, list): @@ -308,3 +298,15 @@ class PostProcessor(ProcessABC): metadata: t.Dict, ): self._writer.write(path, result, meta=metadata, overwrite="overwrite") + + @staticmethod + def pick_mother(a, b): + """ + Update the mother id following this priorities: + + The mother has a lower id + """ + x = max(a, b) + if min([a, b]): + x = [a, b][np.argmin([a, b])] + return x diff --git a/src/postprocessor/core/reshapers/bud_metric.py b/src/postprocessor/core/reshapers/bud_metric.py index 677375aa..b8952288 100644 --- a/src/postprocessor/core/reshapers/bud_metric.py +++ b/src/postprocessor/core/reshapers/bud_metric.py @@ -1,5 +1,4 @@ import typing as t -from typing import Dict, Tuple import numpy as np import pandas as pd @@ -31,7 +30,7 @@ class BudMetric(LineageProcess): def run( self, signal: pd.DataFrame, - lineage: Dict[pd.Index, Tuple[pd.Index]] = None, + lineage: t.Dict[pd.Index, t.Tuple[pd.Index]] = None, ): if lineage is None: if hasattr(self, "lineage"): @@ -44,7 +43,7 @@ class BudMetric(LineageProcess): @staticmethod def get_bud_metric( - signal: pd.DataFrame, md: Dict[Tuple, Tuple[Tuple]] = None + signal: pd.DataFrame, md: t.Dict[t.Tuple, t.Tuple[t.Tuple]] = None ): """ -- GitLab