-
Alán Muñoz authored
Former-commit-id: c4eacc82e2148aa500f8dea989a85c11d2c87cee
Alán Muñoz authoredFormer-commit-id: c4eacc82e2148aa500f8dea989a85c11d2c87cee
processor.py 6.27 KiB
import h5py
from typing import List, Dict, Union
from pydoc import locate
import numpy as np
import pandas as pd
from postprocessor.core.processes.base import ParametersABC
from postprocessor.core.processes.merger import mergerParameters, merger
from postprocessor.core.processes.picker import pickerParameters, picker
from core.io.writer import Writer
from core.io.signal import Signal
from core.cells import Cells
class PostProcessorParameters(ParametersABC):
"""
Anthology of parameters used for postprocessing
:merger:
:picker: parameters for picker
:processes: Dict processes:[objectives], 'processes' are defined in ./processes/
while objectives are relative or absolute paths to datasets. If relative paths the
post-processed addresses are used.
"""
def __init__(
self,
targets={},
parameters={},
outpaths={},
):
self.targets: Dict = targets
self.parameters: Dict = parameters
self.outpaths: Dict = outpaths
def __getitem__(self, item):
return getattr(self, item)
@classmethod
def default(cls, kind=None):
if kind == "defaults" or kind == None:
return cls(
targets={
"prepost": {
"merger": "/extraction/general/None/area",
"picker": ["/extraction/general/None/area"],
},
"processes": {
"dsignal": ["/extraction/general/None/area"],
# "savgol": ["/extraction/general/None/area"],
},
},
parameters={
"prepost": {
"merger": mergerParameters.default(),
"picker": pickerParameters.default(),
}
},
outpaths={},
)
def to_dict(self):
return {k: _if_dict(v) for k, v in self.__dict__.items()}
class PostProcessor:
def __init__(self, filename, parameters):
self.parameters = parameters
self._filename = filename
self._signal = Signal(filename)
self._writer = Writer(filename)
# self.outpaths = parameters["outpaths"]
self.merger = merger(parameters["parameters"]["prepost"]["merger"])
self.picker = picker(
parameters=parameters["parameters"]["prepost"]["picker"],
cells=Cells.from_source(filename),
)
self.classfun = {
process: self.get_process(process)
for process in parameters["targets"]["processes"]
}
self.parameters_classfun = {
process: self.get_parameters(process)
for process in parameters["targets"]["processes"]
}
self.targets = parameters["targets"]
@staticmethod
def get_process(process):
"""
Dynamically import a process class from the 'processes' folder.
Assumes process filename and class name are the same
"""
return locate("postprocessor.core.processes." + process + "." + process)
@staticmethod
def get_parameters(process):
"""
Dynamically import a process class from the 'processes' folder.
Assumes process filename and class name are the same
"""
return locate(
"postprocessor.core.processes." + process + "." + process + "Parameters"
)
def run_prepost(self):
"""Important processes run before normal post-processing ones"""
merge_events = self.merger.run(self._signal[self.targets["prepost"]["merger"]])
with h5py.File(self._filename, "r") as f:
prev_idchanges = self._signal.get_merges()
changes_history = list(prev_idchanges) + [np.array(x) for x in merge_events]
self._writer.write("modifiers/merges", data=changes_history)
picks = self.picker.run(self._signal[self.targets["prepost"]["picker"][0]])
self._writer.write("modifiers/picks", data=picks)
def run(self):
self.run_prepost()
for process, datasets in self.targets["processes"].items():
if process in self.parameters["parameters"].get(
"processes", {}
): # If we assigned parameters
parameters = self.parameters_classfun[process](self.parameters[process])
else:
parameters = self.parameters_classfun[process].default()
loaded_process = self.classfun[process](parameters)
for dataset in datasets:
if isinstance(dataset, list): # multisignal process
signal = [self._signal[d] for d in dataset]
elif isinstance(dataset, str):
signal = self._signal[dataset]
else:
raise ("Incorrect dataset")
result = loaded_process.run(signal)
if process in self.parameters.to_dict()["outpaths"]:
outpath = self.parameters.to_dict()["outpaths"][process]
elif isinstance(dataset, list):
# If no outpath defined, place the result in the minimum common
# branch of all signals used
prefix = "".join(
prefix + c[0]
for c in takewhile(
lambda x: all(x[0] == y for y in x), zip(*dataset)
)
)
outpath = (
prefix
+ "_".join( # TODO check that it always finishes in '/'
[d[len(prefix) :].replace("/", "_") for d in dataset]
)
)
elif isinstance(dataset, str):
outpath = dataset[1:].replace("/", "_")
else:
raise ("Outpath not defined", type(dataset))
self.write_result(
"/postprocessing/" + process + "/" + outpath, result, metadata={}
)
def write_result(
self, path: str, result: Union[List, pd.DataFrame, np.ndarray], metadata: Dict
):
self._writer.write(path, result, meta=metadata)
def _if_dict(item):
if hasattr(item, "to_dict"):
item = item.to_dict()
return item