diff --git a/aliby/pipeline.py b/aliby/pipeline.py index 58ed0ef712b618120a65135eef4cc9a79b196c2d..63da24fbb1576ef4b2f4cd7a0e7e7af25b456ca0 100644 --- a/aliby/pipeline.py +++ b/aliby/pipeline.py @@ -3,22 +3,27 @@ Pipeline and chaining elements. """ import logging import os -from abc import ABC, abstractmethod + +# from abc import ABC, abstractmethod from typing import List, Union from pathlib import Path import traceback +from copy import copy from itertools import groupby import re import h5py -import yaml + +# import yaml from tqdm import tqdm -from p_tqdm import p_map + +# from p_tqdm import p_map from time import perf_counter from pathos.multiprocessing import Pool import numpy as np -import pandas as pd + +# import pandas as pd from scipy import ndimage from aliby.experiment import MetaData @@ -142,8 +147,8 @@ class Pipeline(ProcessABC): def __init__(self, parameters: PipelineParameters, store=None): super().__init__(parameters) - if store is None: - store = self.parameters.general["directory"] + if store is not None: + store = Path(store) self.store = store @classmethod @@ -162,14 +167,17 @@ class Pipeline(ProcessABC): Parameters --------- - fpath : str or Pathlib indicating the folder containing the files to process + dir_path : str or Pathlib indicating the folder containing the files to process """ - files = Path(dir_path).rglob("*.h5") - fpath = list(files)[0] + dir_path = Path(dir_path) + files = list(dir_path.rglob("*.h5")) + assert len(files), "No valid files found in folder" + fpath = files[0] - with h5py.File(l, "r") as f: + # TODO add support for non-standard unique folder names + with h5py.File(fpath, "r") as f: pipeline_parameters = PipelineParameters.from_yaml(f.attrs["parameters"]) - pipeline_parameters.general["directory"] = Path(fpath).parent.parent + pipeline_parameters.general["directory"] = dir_path.parent pipeline_parameters.general["filter"] = [fpath.stem for fpath in files] return cls(pipeline_parameters) @@ -184,10 +192,11 @@ class Pipeline(ProcessABC): """ with h5py.File(fpath, "r") as f: pipeline_parameters = PipelineParameters.from_yaml(f.attrs["parameters"]) - pipeline_parameters.general["directory"] = Path(fpath).parent.parent - pipeline_parameters.general["filter"] = Path(fpath).stem + directory = Path(fpath).parent + pipeline_parameters.general["directory"] = directory + pipeline_parameters.general["filter"] = Path(fpath).stem - return cls(pipeline_parameters) + return cls(pipeline_parameters, store=directory) def run(self): # Config holds the general information, use in main @@ -198,17 +207,19 @@ class Pipeline(ProcessABC): expt_id = config["general"]["id"] distributed = config["general"]["distributed"] pos_filter = config["general"]["filter"] - root_dir = config["general"]["directory"] - root_dir = Path(root_dir) + root_dir = Path(config["general"]["directory"]) print("Searching OMERO") # Do all all initialisations with Dataset(int(expt_id), **self.general["server_info"]) as conn: image_ids = conn.get_images() - directory = root_dir / conn.unique_name + + directory = self.store or root_dir / conn.unique_name + if not directory.exists(): directory.mkdir(parents=True) - # Download logs to use for metadata + + # Download logs to use for metadata conn.cache_logs(directory) # Modify to the configuration @@ -402,6 +413,7 @@ class Pipeline(ProcessABC): # Limit extraction parameters during run using the available channels in tiler if process_from["extraction"] < tps: + # TODO Move this parameter validation into Extractor av_channels = set((*steps["tiler"].channels, "general")) config["extraction"]["tree"] = { k: v @@ -412,20 +424,15 @@ class Pipeline(ProcessABC): config["extraction"]["sub_bg"] ) + # av_channels_wsub = av_channels.union( [c + "_bgsub" for c in config["extraction"]["sub_bg"]] ) - for op in config["extraction"]["multichannel_ops"]: - config["extraction"]["multichannel_ops"][op] = [ - x - for x in config["extraction"]["multichannel_ops"] - if len(x[0]) == len(av_channels_wsub.intersection(x[0])) - ] - config["extraction"]["multichannel_ops"] = { - k: v - for k, v in config["extraction"]["multichannel_ops"].items() - if len(v) - } + for op, (input_ch, op_id, red_ext) in copy( + config["extraction"]["multichannel_ops"] + ).items(): + if set(input_ch).difference(av_channels_wsub): + del config["extraction"]["multichannel_ops"][op] exparams = ExtractorParameters.from_dict(config["extraction"]) steps["extraction"] = Extractor.from_tiler(