Skip to content
Snippets Groups Projects
Commit 8bd5c0fe authored by Alán Muñoz's avatar Alán Muñoz
Browse files

bugfix multichannel operations in pipeline

parent 23cc129c
No related branches found
No related tags found
No related merge requests found
......@@ -3,22 +3,27 @@ Pipeline and chaining elements.
"""
import logging
import os
from abc import ABC, abstractmethod
# from abc import ABC, abstractmethod
from typing import List, Union
from pathlib import Path
import traceback
from copy import copy
from itertools import groupby
import re
import h5py
import yaml
# import yaml
from tqdm import tqdm
from p_tqdm import p_map
# from p_tqdm import p_map
from time import perf_counter
from pathos.multiprocessing import Pool
import numpy as np
import pandas as pd
# import pandas as pd
from scipy import ndimage
from aliby.experiment import MetaData
......@@ -142,8 +147,8 @@ class Pipeline(ProcessABC):
def __init__(self, parameters: PipelineParameters, store=None):
super().__init__(parameters)
if store is None:
store = self.parameters.general["directory"]
if store is not None:
store = Path(store)
self.store = store
@classmethod
......@@ -162,14 +167,17 @@ class Pipeline(ProcessABC):
Parameters
---------
fpath : str or Pathlib indicating the folder containing the files to process
dir_path : str or Pathlib indicating the folder containing the files to process
"""
files = Path(dir_path).rglob("*.h5")
fpath = list(files)[0]
dir_path = Path(dir_path)
files = list(dir_path.rglob("*.h5"))
assert len(files), "No valid files found in folder"
fpath = files[0]
with h5py.File(l, "r") as f:
# TODO add support for non-standard unique folder names
with h5py.File(fpath, "r") as f:
pipeline_parameters = PipelineParameters.from_yaml(f.attrs["parameters"])
pipeline_parameters.general["directory"] = Path(fpath).parent.parent
pipeline_parameters.general["directory"] = dir_path.parent
pipeline_parameters.general["filter"] = [fpath.stem for fpath in files]
return cls(pipeline_parameters)
......@@ -184,10 +192,11 @@ class Pipeline(ProcessABC):
"""
with h5py.File(fpath, "r") as f:
pipeline_parameters = PipelineParameters.from_yaml(f.attrs["parameters"])
pipeline_parameters.general["directory"] = Path(fpath).parent.parent
pipeline_parameters.general["filter"] = Path(fpath).stem
directory = Path(fpath).parent
pipeline_parameters.general["directory"] = directory
pipeline_parameters.general["filter"] = Path(fpath).stem
return cls(pipeline_parameters)
return cls(pipeline_parameters, store=directory)
def run(self):
# Config holds the general information, use in main
......@@ -198,17 +207,19 @@ class Pipeline(ProcessABC):
expt_id = config["general"]["id"]
distributed = config["general"]["distributed"]
pos_filter = config["general"]["filter"]
root_dir = config["general"]["directory"]
root_dir = Path(root_dir)
root_dir = Path(config["general"]["directory"])
print("Searching OMERO")
# Do all all initialisations
with Dataset(int(expt_id), **self.general["server_info"]) as conn:
image_ids = conn.get_images()
directory = root_dir / conn.unique_name
directory = self.store or root_dir / conn.unique_name
if not directory.exists():
directory.mkdir(parents=True)
# Download logs to use for metadata
# Download logs to use for metadata
conn.cache_logs(directory)
# Modify to the configuration
......@@ -402,6 +413,7 @@ class Pipeline(ProcessABC):
# Limit extraction parameters during run using the available channels in tiler
if process_from["extraction"] < tps:
# TODO Move this parameter validation into Extractor
av_channels = set((*steps["tiler"].channels, "general"))
config["extraction"]["tree"] = {
k: v
......@@ -412,20 +424,15 @@ class Pipeline(ProcessABC):
config["extraction"]["sub_bg"]
)
#
av_channels_wsub = av_channels.union(
[c + "_bgsub" for c in config["extraction"]["sub_bg"]]
)
for op in config["extraction"]["multichannel_ops"]:
config["extraction"]["multichannel_ops"][op] = [
x
for x in config["extraction"]["multichannel_ops"]
if len(x[0]) == len(av_channels_wsub.intersection(x[0]))
]
config["extraction"]["multichannel_ops"] = {
k: v
for k, v in config["extraction"]["multichannel_ops"].items()
if len(v)
}
for op, (input_ch, op_id, red_ext) in copy(
config["extraction"]["multichannel_ops"]
).items():
if set(input_ch).difference(av_channels_wsub):
del config["extraction"]["multichannel_ops"][op]
exparams = ExtractorParameters.from_dict(config["extraction"])
steps["extraction"] = Extractor.from_tiler(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment