diff --git a/src/aliby/pipeline.py b/src/aliby/pipeline.py
index 73e00aa6592b0fe0c6cc7d3de0079f1532a62b98..1c443b9b3ac92fc2f64d6d5966281da093610ede 100644
--- a/src/aliby/pipeline.py
+++ b/src/aliby/pipeline.py
@@ -10,7 +10,8 @@ import baby
 import baby.errors
 import numpy as np
 import tensorflow as tf
-from pathos.multiprocessing import Pool
+from pathos.multiprocessing import ProcessingPool
+import multiprocessing
 from tqdm import tqdm
 
 try:
@@ -122,7 +123,7 @@ class PipelineParameters(ParametersABC):
         defaults = {
             "general": dict(
                 id=expt_id,
-                distributed=0,
+                distributed=False,
                 tps=tps,
                 directory=str(directory.parent),
                 filter="",
@@ -289,13 +290,14 @@ class Pipeline(ProcessABC):
            print("\t" + pos.split(".")[0])
        # create and run pipelines
        distributed = config["general"]["distributed"]
-       if distributed != 0:
+       if distributed:
            # multiple cores
-           with Pool(distributed) as p:
-               results = p.map(
-                   self.run_one_position,
-                   [position_id for position_id in position_ids.items()],
-               )
+           no_cores = multiprocessing.cpu_count()
+           pool = ProcessingPool(nodes=no_cores)
+           results = pool.map(
+               self.run_one_position,
+               [position_id for position_id in position_ids.items()],
+           )
        else:
            # single core
            results = [
@@ -311,7 +313,7 @@ class Pipeline(ProcessABC):
        out_dir = config["general"]["directory"]
        with dispatch_image(image_id)(image_id, **self.server_info) as image:
            out_file = Path(f"{out_dir}/{image.name}.h5")
-           # remove existing h5 file if overwriting
+           # remove existing h5 file
            if out_file.exists():
                os.remove(out_file)
            # generate h5 file using meta data from logs