Commit 5e1c1544 authored by pswain

Docs for most of pipeline and dataset.

parent e770d48f
@@ -18,35 +18,40 @@ from aliby.io.image import ImageLocalOME
 def dispatch_dataset(expt_id: int or str, **kwargs):
     """
-    Choose a subtype of dataset based on the identifier.
-    Input:
-    --------
-    expt_id: int or string serving as dataset identifier.
-    Returns:
-    --------
-    Callable Dataset instance, either network-dependent or local.
+    Find paths to the data.
+
+    Connects to OMERO if data is remotely available.
+
+    Parameters
+    ----------
+    expt_id: int or str
+        To identify the data, either an OMERO ID or an OME-TIFF file or a local directory.
+
+    Returns
+    -------
+    A callable Dataset instance, either network-dependent or local.
     """
-    if isinstance(expt_id, int):  # Is an experiment online
+    if isinstance(expt_id, int):
+        # data available online
         from aliby.io.omero import Dataset
 
         return Dataset(expt_id, **kwargs)
-    elif isinstance(expt_id, str):  # Files or Dir
+    elif isinstance(expt_id, str):
+        # data available locally
         expt_path = Path(expt_id)
         if expt_path.is_dir():
+            # data in multiple folders
             return DatasetLocalDir(expt_path)
         else:
+            # data in one folder as OME-TIFF files
             return DatasetLocalOME(expt_path)
     else:
-        raise Warning("Invalid expt_id")
+        raise Warning(f"{expt_id} is an invalid expt_id")
 
 
 class DatasetLocalABC(ABC):
     """
-    Abstract Base class to fetch local files, either OME-XML or raw images.
+    Abstract Base class to find local files, either OME-XML or raw images.
     """
 
     _valid_suffixes = ("tiff", "png")
@@ -73,12 +78,9 @@ class DatasetLocalABC(ABC):
     def unique_name(self):
         return self.path.name
 
-    @abstractproperty
-    def date(self):
-        pass
-
     @property
     def files(self):
+        """Return a dictionary with any available metadata files."""
         if not hasattr(self, "_files"):
             self._files = {
                 f: f
@@ -91,34 +93,35 @@ class DatasetLocalABC(ABC):
         return self._files
 
     def cache_logs(self, root_dir):
-        # Copy metadata files to results folder
+        """Copy metadata files to results folder."""
         for name, annotation in self.files.items():
             shutil.copy(annotation, root_dir / name.name)
         return True
 
+    @abstractproperty
+    def date(self):
+        pass
+
     @abstractmethod
     def get_images(self):
-        # Return a dictionary with the name of images and their unique identifiers
         pass
 
 
 class DatasetLocalDir(DatasetLocalABC):
-    """
-    Organise an entire dataset, composed of multiple images, as a directory containing directories with individual files.
-    It relies on ImageDir to manage images.
-    """
+    """Find paths to a data set, comprising multiple images in different folders."""
 
     def __init__(self, dpath: t.Union[str, PosixPath], *args, **kwargs):
         super().__init__(dpath)
 
     @property
     def date(self):
-        # Use folder creation date, for cases where metadata is minimal
+        """Find date when a folder was created."""
         return time.strftime(
             "%Y%m%d", time.strptime(time.ctime(os.path.getmtime(self.path)))
         )
 
     def get_images(self):
+        """Return a dictionary of folder names and their paths."""
         return {
             folder.name: folder
             for folder in self.path.glob("*/")
@@ -131,13 +134,7 @@ class DatasetLocalDir(DatasetLocalABC):
 class DatasetLocalOME(DatasetLocalABC):
-    """Load a dataset from a folder
-
-    We use a given image of a dataset to obtain the metadata,
-    as we cannot expect folders to contain this information.
-    It uses the standard OME-TIFF file format.
-    """
+    """Find names of images in a folder, assuming images in OME-TIFF format."""
 
     def __init__(self, dpath: t.Union[str, PosixPath], *args, **kwargs):
         super().__init__(dpath)
@@ -145,11 +142,11 @@ class DatasetLocalOME(DatasetLocalABC):
     @property
     def date(self):
-        # Access the date from the metadata of the first position
+        """Get the date from the metadata of the first position."""
         return ImageLocalOME(list(self.get_images().values())[0]).date
 
     def get_images(self):
-        # Fetches all valid formats and overwrites if duplicates with different suffix
+        """Return a dictionary with the names of the image files."""
        return {
            f.name: str(f)
            for suffix in self._valid_suffixes
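A short, hedged sketch of how the local dataset classes above can be used as context managers, mirroring the way PipelineParameters.default uses the dispatched dataset later in this commit; the paths are placeholders.

    from pathlib import Path

    from aliby.io.dataset import DatasetLocalOME  # import path assumed

    with DatasetLocalOME("/data/experiment_folder") as conn:  # placeholder folder of OME-TIFF files
        print(conn.date)          # read from the metadata of the first position
        print(conn.get_images())  # {file name: full path} for each image
        conn.cache_logs(Path("/tmp/results"))  # copy any metadata files to an existing results folder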
@@ -36,23 +36,14 @@ from postprocessor.core.processor import PostProcessor, PostProcessorParameters
 class PipelineParameters(ParametersABC):
-    """
-    Parameters that host what is run and how. It takes a list of dictionaries, one for
-    general in collection:
-    pass dictionary for each step
-    --------------------
-    expt_id: int or str Experiment id (if integer) or local path (if string).
-    directory: str Directory into which results are dumped. Default is "../data"
-    Provides default parameters for the entire pipeline. This downloads the logfiles and sets the default
-    timepoints and extraction parameters from there.
-    """
+    """Define parameters for the different steps of the pipeline."""
 
     _pool_index = None
 
     def __init__(
         self, general, tiler, baby, extraction, postprocessing, reporting
     ):
+        """Initialise, but called by a class method not directly."""
         self.general = general
         self.tiler = tiler
         self.baby = baby
@@ -69,13 +60,34 @@ class PipelineParameters(ParametersABC):
         extraction={},
         postprocessing={},
     ):
+        """
+        Initialise parameters for steps of the pipeline.
+
+        Some parameters are extracted from the log files.
+
+        Parameters
+        ----------
+        general: dict
+            Parameters to set up the pipeline.
+        tiler: dict
+            Parameters for tiler.
+        baby: dict (optional)
+            Parameters for Baby.
+        extraction: dict (optional)
+            Parameters for extraction.
+        postprocessing: dict (optional)
+            Parameters for post-processing.
+        """
+        # Alan: should 19993 be updated?
         expt_id = general.get("expt_id", 19993)
         if isinstance(expt_id, PosixPath):
             expt_id = str(expt_id)
         general["expt_id"] = expt_id
+        # Alan: an error message rather than a default might be better
         directory = Path(general.get("directory", "../data"))
+        # get log files, either locally or via OMERO
         with dispatch_dataset(
             expt_id,
             **{k: general.get(k) for k in ("host", "username", "password")},
@@ -83,7 +95,7 @@ class PipelineParameters(ParametersABC):
             directory = directory / conn.unique_name
             if not directory.exists():
                 directory.mkdir(parents=True)
-            # Download logs to use for metadata
+            # download logs for metadata
             conn.cache_logs(directory)
         try:
             meta_d = MetaData(directory, None).load_logs()
@@ -95,9 +107,10 @@ class PipelineParameters(ParametersABC):
                 "channels": ["Brightfield"],
                 "ntps": [2000],
             }
-            # Set minimal metadata
+            # set minimal metadata
             meta_d = minimal_default_meta
 
+        # define default values for general parameters
         tps = meta_d.get("ntps", 2000)
         defaults = {
             "general": dict(
@@ -118,7 +131,8 @@ class PipelineParameters(ParametersABC):
             )
         }
 
-        for k, v in general.items():  # Overwrite general parameters
+        # update default values using inputs
+        for k, v in general.items():
             if k not in defaults["general"]:
                 defaults["general"][k] = v
             elif isinstance(v, dict):
@@ -127,15 +141,13 @@ class PipelineParameters(ParametersABC):
             else:
                 defaults["general"][k] = v
 
+        # define defaults and update with any inputs
         defaults["tiler"] = TilerParameters.default(**tiler).to_dict()
         defaults["baby"] = BabyParameters.default(**baby).to_dict()
         defaults["extraction"] = (
             exparams_from_meta(meta_d)
             or BabyParameters.default(**extraction).to_dict()
         )
-        defaults["postprocessing"] = {}
-        defaults["reporting"] = {}
-
         defaults["postprocessing"] = PostProcessorParameters.default(
             **postprocessing
         ).to_dict()
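To show how these defaults are consumed, a hedged sketch of building parameters and running the pipeline; the experiment ID, directory, and OMERO credentials are placeholders, and only keys visible in this diff are used.

    from aliby.pipeline import Pipeline, PipelineParameters  # import path assumed

    params = PipelineParameters.default(
        general={
            "expt_id": 19993,         # OMERO ID (int) or local path (str)
            "directory": "../data",   # where results and cached logs are written
            "distributed": 0,         # 0 means run positions sequentially
            "host": "omero.example.org",
            "username": "user",
            "password": "***",
        }
    )
    Pipeline(params).run()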
@@ -156,16 +168,15 @@ class Pipeline(ProcessABC):
     """
 
-    iterative_steps = ["tiler", "baby", "extraction"]
+    pipeline_steps = ["tiler", "baby", "extraction"]
     step_sequence = [
         "tiler",
         "baby",
         "extraction",
         "postprocessing",
     ]
 
     # Indicate step-writer groupings to perform special operations during step iteration
+    # specify the group in the h5 files written by each step (?)
     writer_groups = {
         "tiler": ["trap_info"],
         "baby": ["cell_info"],
@@ -178,8 +189,8 @@ class Pipeline(ProcessABC):
     }
 
     def __init__(self, parameters: PipelineParameters, store=None):
+        """Initialise - not usually called directly."""
         super().__init__(parameters)
-
         if store is not None:
             store = Path(store)
         self.store = store
@@ -188,20 +199,19 @@ class Pipeline(ProcessABC):
     def setLogger(
         folder, file_level: str = "INFO", stream_level: str = "WARNING"
     ):
+        """Initialise and format logger."""
         logger = logging.getLogger("aliby")
         logger.setLevel(getattr(logging, file_level))
         formatter = logging.Formatter(
             "%(asctime)s - %(levelname)s:%(message)s",
             datefmt="%Y-%m-%dT%H:%M:%S%z",
         )
+        # for streams - stdout, files, etc.
         ch = logging.StreamHandler()
         ch.setLevel(getattr(logging, stream_level))
         ch.setFormatter(formatter)
         logger.addHandler(ch)
-
-        # create file handler which logs even debug messages
+        # create file handler that logs even debug messages
         fh = logging.FileHandler(Path(folder) / "aliby.log", "w+")
         fh.setLevel(getattr(logging, file_level))
         fh.setFormatter(formatter)
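A small sketch of using the logger set up above; it assumes setLogger can be called as a static method (its signature takes no self) and that the target folder already exists. During run() the pipeline calls self.setLogger(directory) itself.

    import logging

    from aliby.pipeline import Pipeline  # import path assumed

    Pipeline.setLogger("/tmp/results", file_level="DEBUG", stream_level="WARNING")  # placeholder folder
    # written to /tmp/results/aliby.log by the file handler, filtered out by the stream handler
    logging.getLogger("aliby").info("pipeline logging configured")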
@@ -216,20 +226,20 @@ class Pipeline(ProcessABC):
     @classmethod
     def from_folder(cls, dir_path):
         """
-        Constructor to re-process all files in a given folder.
-        Assumes all files share the same parameters (even if they don't share
-        the same channel set).
+        Re-process all h5 files in a given folder.
+
+        All files must share the same parameters, even if they have different channels.
 
         Parameters
         ---------
-        dir_path : str or Pathlib indicating the folder containing the files to process
+        dir_path : str or Pathlib
+            Folder containing the files.
         """
+        # find h5 files
         dir_path = Path(dir_path)
         files = list(dir_path.rglob("*.h5"))
         assert len(files), "No valid files found in folder"
         fpath = files[0]
         # TODO add support for non-standard unique folder names
         with h5py.File(fpath, "r") as f:
             pipeline_parameters = PipelineParameters.from_yaml(
@@ -237,8 +247,7 @@ class Pipeline(ProcessABC):
             )
         pipeline_parameters.general["directory"] = dir_path.parent
         pipeline_parameters.general["filter"] = [fpath.stem for fpath in files]
-
-        # Fix legacy postprocessing parameters
+        # fix legacy post-processing parameters
         post_process_params = pipeline_parameters.postprocessing.get(
             "parameters", None
         )
@@ -247,16 +256,19 @@ class Pipeline(ProcessABC):
                 post_process_params
             )
             del pipeline_parameters.postprocessing["parameters"]
-
         return cls(pipeline_parameters)
 
     @classmethod
     def from_existing_h5(cls, fpath):
         """
-        Constructor to process an existing hdf5 file.
-        Notice that it forces a single file, not suitable for multiprocessing of certain positions.
-        It is also used as a base for a folder-wide reprocessing.
+        Re-process an existing h5 file.
+
+        Not suitable for more than one file.
+
+        Parameters
+        ----------
+        fpath: str
+            Name of file.
         """
         with h5py.File(fpath, "r") as f:
             pipeline_parameters = PipelineParameters.from_yaml(
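A hedged sketch of the two re-processing constructors documented above; the result paths are placeholders.

    from aliby.pipeline import Pipeline  # import path assumed

    # re-run every position found under a results folder (all h5 files share parameters)
    Pipeline.from_folder("/data/results/experiment_folder").run()

    # or re-run a single position from its h5 file
    Pipeline.from_existing_h5("/data/results/experiment_folder/pos001.h5").run()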
@@ -265,7 +277,6 @@ class Pipeline(ProcessABC):
         directory = Path(fpath).parent
         pipeline_parameters.general["directory"] = directory
         pipeline_parameters.general["filter"] = Path(fpath).stem
-
         post_process_params = pipeline_parameters.postprocessing.get(
             "parameters", None
         )
@@ -274,7 +285,6 @@ class Pipeline(ProcessABC):
             post_process_params
         )
         del pipeline_parameters.postprocessing["parameters"]
-
         return cls(pipeline_parameters, store=directory)
 
     @property
@@ -287,7 +297,6 @@ class Pipeline(ProcessABC):
         Steps: all holds general tasks
         steps: strain_name holds task for a given strain
         """
-
         config = self.parameters.to_dict()
         expt_id = config["general"]["id"]
         distributed = config["general"]["distributed"]
@@ -297,80 +306,70 @@ class Pipeline(ProcessABC):
             k: config["general"].get(k)
             for k in ("host", "username", "password")
         }
         dispatcher = dispatch_dataset(expt_id, **self.server_info)
         logging.getLogger("aliby").info(
             f"Fetching data using {dispatcher.__class__.__name__}"
         )
-        # Do all all initialisations
+        # get log files, either locally or via OMERO
         with dispatcher as conn:
             image_ids = conn.get_images()
             directory = self.store or root_dir / conn.unique_name
             if not directory.exists():
                 directory.mkdir(parents=True)
-            # Download logs to use for metadata
+            # download logs to use for metadata
             conn.cache_logs(directory)
-        # Modify to the configuration
+        # update configuration
         self.parameters.general["directory"] = str(directory)
         config["general"]["directory"] = directory
         self.setLogger(directory)
-        # Filter TODO integrate filter onto class and add regex
-        def filt_int(d: dict, filt: int):
-            return {k: v for i, (k, v) in enumerate(d.items()) if i == filt}
-
-        def filt_str(image_ids: dict, filt: str):
-            return {k: v for k, v in image_ids.items() if re.search(filt, k)}
-
-        def pick_filter(image_ids: dict, filt: int or str):
-            if isinstance(filt, str):
-                image_ids = filt_str(image_ids, filt)
-            elif isinstance(filt, int):
-                image_ids = filt_int(image_ids, filt)
-            return image_ids
-
+        # pick particular images if desired
         if isinstance(pos_filter, list):
             image_ids = {
                 k: v
                 for filt in pos_filter
-                for k, v in pick_filter(image_ids, filt).items()
+                for k, v in self.apply_filter(image_ids, filt).items()
             }
         else:
-            image_ids = pick_filter(image_ids, pos_filter)
+            image_ids = self.apply_filter(image_ids, pos_filter)
         assert len(image_ids), "No images to segment"
-        if distributed != 0:  # Gives the number of simultaneous processes
+        # create pipeline
+        if distributed != 0:
+            # multiple cores
             with Pool(distributed) as p:
                 results = p.map(
-                    lambda x: self.create_pipeline(*x),
+                    lambda x: self.run_one_pipeline(*x),
                     [(k, i) for i, k in enumerate(image_ids.items())],
-                    # num_cpus=distributed,
-                    # position=0,
                 )
-        else:  # Sequential
+        else:
+            # single core
             results = []
             for k, v in tqdm(image_ids.items()):
-                r = self.create_pipeline((k, v), 1)
+                r = self.run_one_pipeline((k, v), 1)
                 results.append(r)
         return results
 
-    def create_pipeline(
+    def apply_filter(self, image_ids: dict, filt: int or str):
+        """Select images by picking a particular one or by using a regular expression to parse their file names."""
+        if isinstance(filt, str):
+            # pick images using a regular expression
+            image_ids = {
+                k: v for k, v in image_ids.items() if re.search(filt, k)
+            }
+        elif isinstance(filt, int):
+            # pick the filt'th image
+            image_ids = {
+                k: v for i, (k, v) in enumerate(image_ids.items()) if i == filt
+            }
+        return image_ids
+
+    def run_one_pipeline(
         self,
-        image_id: t.Tuple[str, str or PosixPath or int],
+        name_image_id: t.Tuple[str, str or PosixPath or int],
         index: t.Optional[int] = None,
     ):
         """ """
         self._pool_index = index
-        name, image_id = image_id
+        name, image_id = name_image_id
         session = None
         filename = None
         run_kwargs = {"extraction": {"labels": None, "masks": None}}
@@ -386,7 +385,6 @@ class Pipeline(ProcessABC):
                 session,
                 trackers_state,
             ) = self._setup_pipeline(image_id)
-
             loaded_writers = {
                 name: writer(filename)
                 for k in self.step_sequence
@@ -398,20 +396,17 @@ class Pipeline(ProcessABC):
                 "baby": ["mother_assign"],
             }
 
-            # START PIPELINE
+            # START
             frac_clogged_traps = 0
             min_process_from = min(process_from.values())
             with get_image_class(image_id)(
                 image_id, **self.server_info
             ) as image:
-
-                # Initialise Steps
+                # initialise steps
                 if "tiler" not in steps:
                     steps["tiler"] = Tiler.from_image(
                         image, TilerParameters.from_dict(config["tiler"])
                     )
-
                 if process_from["baby"] < tps:
                     session = initialise_tf(2)
                     steps["baby"] = BabyRunner.from_tiler(
@@ -420,8 +415,7 @@ class Pipeline(ProcessABC):
                     )
                     if trackers_state:
                         steps["baby"].crawler.tracker_states = trackers_state
-
-                # Limit extraction parameters during run using the available channels in tiler
+                # limit extraction parameters using the available channels in tiler
                 if process_from["extraction"] < tps:
                     # TODO Move this parameter validation into Extractor
                     av_channels = set((*steps["tiler"].channels, "general"))
@@ -433,7 +427,6 @@ class Pipeline(ProcessABC):
                     config["extraction"]["sub_bg"] = av_channels.intersection(
                         config["extraction"]["sub_bg"]
                     )
-
                     av_channels_wsub = av_channels.union(
                         [c + "_bgsub" for c in config["extraction"]["sub_bg"]]
                     )
@@ -441,7 +434,6 @@ class Pipeline(ProcessABC):
                     for op, (input_ch, _, _) in tmp.items():
                         if not set(input_ch).issubset(av_channels_wsub):
                             del config["extraction"]["multichannel_ops"][op]
-
                     exparams = ExtractorParameters.from_dict(
                         config["extraction"]
                     )
@@ -456,14 +448,12 @@ class Pipeline(ProcessABC):
                     # position=index + 1,
                 )
                 for i in pbar:
-
                     if (
                         frac_clogged_traps
                         < earlystop["thresh_pos_clogged"]
                         or i < earlystop["min_tp"]
                     ):
-
-                        for step in self.iterative_steps:
+                        for step in self.pipeline_steps:
                             if i >= process_from[step]:
                                 result = steps[step].run_tp(
                                     i, **run_kwargs.get(step, {})
@@ -478,7 +468,7 @@ class Pipeline(ProcessABC):
                                     meta={"last_processed": i},
                                 )
-                                # Step-specific actions
+                                # step-specific actions
                                 if (
                                     step == "tiler"
                                     and i == min_process_from
@@ -486,9 +476,8 @@ class Pipeline(ProcessABC):
                                     logging.getLogger("aliby").info(
                                         f"Found {steps['tiler'].n_traps} traps in {image.name}"
                                     )
-                                elif (
-                                    step == "baby"
-                                ):  # Write state and pass info to ext
+                                elif step == "baby":
+                                    # write state and pass info to ext
                                     loaded_writers["state"].write(
                                         data=steps[
                                             step
@@ -513,7 +502,8 @@ class Pipeline(ProcessABC):
                         frac = np.round(frac_clogged_traps * 100)
                         pbar.set_postfix_str(f"{frac} Clogged")
-                    else:  # Stop if more than X% traps are clogged
+                    else:
+                        # stop if more than X% traps are clogged
                         self._log(
                             f"{name}:Analysis stopped early at time {i} with {frac_clogged_traps} clogged traps"
                         )
@@ -522,23 +512,22 @@ class Pipeline(ProcessABC):
                     meta.add_fields({"last_processed": i})
-
-                # Run post-processing
+                # run post-processing
                 meta.add_fields({"end_status": "Success"})
                 post_proc_params = PostProcessorParameters.from_dict(
                     config["postprocessing"]
                 )
                 PostProcessor(filename, post_proc_params).run()
 
                 self._log("Analysis finished successfully.", "info")
                 return 1
 
-        except Exception as e:  # Catch bugs during setup or runtime
+        except Exception as e:
+            # catch bugs during setup or run time
             logging.exception(
                 f"{name}: Exception caught.",
                 exc_info=True,
             )
-            # This prints the type, value, and stack trace of the
-            # current exception being handled.
+            # print the type, value, and stack trace of the exception
             traceback.print_exc()
             raise e
         finally:
@@ -607,17 +596,17 @@ class Pipeline(ProcessABC):
         t.List[np.ndarray],
     ]:
         """
-        Initialise pipeline components and if necessary use
-        exising file to continue existing experiments.
+        Initialise pipeline components.
+
+        If necessary use a file to continue existing experiments.
 
         Parameters
         ----------
-        image_id : int
-            identifier of image in OMERO server, or filename
+        image_id : int or str
+            Identifier of image in OMERO server, or filename
 
         Returns
-        ---------
+        -------
         filename: str
         meta:
         config:
@@ -639,7 +628,7 @@ class Pipeline(ProcessABC):
         general_config = config["general"]
         session = None
         earlystop = general_config.get("earlystop", None)
-        process_from = {k: 0 for k in self.iterative_steps}
+        process_from = {k: 0 for k in self.pipeline_steps}
         steps = {}
         ow = {k: 0 for k in self.step_sequence}