Commit 9e635cb8 authored by Alán Muñoz
add group and pandas hdf functions

Former-commit-id: 6ba6315c607baae1b606d576d8b26f179e359c88
parent a348bc3c
"""
Class to group multiple positions into one using one of several criteria.
"""
from pathlib import Path
import re
import h5py
import pandas as pd
from postprocessor.core.io.base import groupsort
from postprocessor.core.io.signal import Signal
from postprocessor.core.processes.base import ParametersABC, ProcessABC

class GroupParameters(ParametersABC):
    def __init__(self, by="name", processes=None, signals=None):
        # Avoid mutable default arguments
        self.by = by
        self.signals = signals if signals is not None else []
        self.processes = processes if processes is not None else []

    @classmethod
    def default(cls):
        return cls.from_dict({"by": "name", "signals": [], "processes": []})

class Group(ProcessABC):
    def __init__(self, parameters):
        super().__init__(parameters)
    def get_position_filenames(self, exp_root, poses):
        """
        Get filenames as a dictionary where the key is the position and the value the filename.
        """
        central_store = Path(exp_root) / "store.h5"
        if central_store.exists():
            with h5py.File(central_store, "r") as hdf:
                # Build a dict so downstream code can call .items() on it
                self.filenames = {
                    name: pos.attrs["filename"]
                    for name, pos in hdf["/positions/"].items()
                }
        else:  # If there is no central store, list position files in the experiment root folder
            fullfiles = list(Path(exp_root).glob("*store.h5"))
            files = [x.name for x in fullfiles]
            filenames = [False for _ in poses]
            for i, pos in enumerate(poses):
                matches = [bool(re.match(pos + ".*.h5", fname)) for fname in files]
                if any(matches):
                    assert sum(matches) == 1, "More than one match"
                    filenames[i] = (pos, fullfiles[matches.index(True)])
            self.filenames = {fname[0]: fname[1] for fname in filenames if fname}
        self.positions = list(self.filenames.keys())
        return self.filenames

    def get_signals(self):
        # hdf = h5py.File(central_store, "r")
        # keys_d = groupsort(keys)
        self.signals = {pos: {} for pos in self.filenames.keys()}
        for pos, fname in self.filenames.items():
            for signal in self.parameters.signals:
                self.signals[pos][signal] = pd.read_hdf(fname, signal)
        return self.signals

    def gen_groups(self):
        if self.by == "group":  # Use group names in metadata
            pass
        elif self.by == "name":  # Infer groups from position names
            # Positions that share a name after dropping the last three
            # characters (the replicate suffix) belong to the same group.
            groupnames = list(set(x[:-3] for x in self.positions))
            self.group_signal_tree = {group: [] for group in groupnames}
            self.poses_grouped = {group: [] for group in groupnames}
            for pos in self.positions:
                group = pos[:-3]
                self.group_signal_tree[group].append(self.signals[pos])
                self.poses_grouped[group].append(pos)
        elif isinstance(
            self.by, tuple
        ):  # Manually pass groups as a tuple or list of positions
            pass

    def concat_signals(self):
        self.concated_signals = {group: {} for group in self.group_signal_tree}
        for k, group in self.group_signal_tree.items():
            for signal in self.parameters.signals:
                self.concated_signals[k][signal] = pd.concat(
                    [g[signal] for g in group], keys=self.poses_grouped[k]
                )
        return self.concated_signals

    def process_signals(self, grouped_signals):
        pass

    def run(self, central_store, poses):
        self.get_position_filenames(central_store, poses)
        self.get_signals()
        self.gen_groups()
        self.concat_signals()
        # processed_signals = self.process_signals(grouped_signals)
        return self.concated_signals
        # return processed_signals
# "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01"
# simulate poses
poses = [
x.name.split("store")[0]
for x in Path(
"/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01"
).rglob("*")
if x.name != "images.h5"
]
gr = Group(
GroupParameters(
signals=["/extraction/general/None/area", "/extraction/mCherry/np_max/median"]
)
)
gr.run(
central_store="/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01",
poses=poses,
)
signal = Signal(
"/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01/ph_5_04_001store.h5"
)
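
The name-based grouping above hinges on one assumption: replicate positions share a prefix and differ only in a three-character suffix, so stripping the suffix recovers the group name. A minimal sketch of that heuristic in isolation, using hypothetical position names and a toy signal:

import pandas as pd

# Hypothetical position names: group prefix + three-character replicate suffix
positions = ["ph_5_04_001", "ph_5_04_002", "ph_7_69_001"]

# Strip the last three characters to recover the group, as gen_groups does
grouped = {}
for pos in positions:
    grouped.setdefault(pos[:-3], []).append(pos)
# grouped == {"ph_5_04_": ["ph_5_04_001", "ph_5_04_002"], "ph_7_69_": ["ph_7_69_001"]}

# Mirroring concat_signals: stack one DataFrame per position, keyed by position,
# so the result carries a MultiIndex whose first level is the position name.
frames = {pos: pd.DataFrame({"area": [1.0, 2.0]}) for pos in positions}
stacked = pd.concat(
    [frames[p] for p in grouped["ph_5_04_"]], keys=grouped["ph_5_04_"]
)
print(stacked.index)  # MultiIndex of (position, original row index)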

@@ -7,6 +7,11 @@ import h5py

 class BridgeH5:
+    """
+    Base class to interact with h5 data stores.
+
+    It also contains functions useful to predict how long segmentation should take.
+    """
     def __init__(self, file):
         self._hdf = h5py.File(file, "r")
...
@@ -10,21 +10,12 @@ class Signal(BridgeH5):
     def __init__(self, file):
         super().__init__(file)
+        self._hdf.close()  # Close the file to use pandas hdf functions
+        # hdf = pd.HDFStore(file)
+        # self.file = file

     def __getitem__(self, dataset):
-        dset = self._hdf[dataset][()]
-        attrs = self._hdf[dataset].attrs
-        first_dataset = dataset.split("/")[0] + "/"
-        timepoints = self._hdf[first_dataset].attrs["processed_timepoints"]
-        if "cell_label" in self._hdf[dataset].attrs:
-            ids = pd.MultiIndex.from_tuples(
-                zip(attrs["trap"], attrs["cell_label"]), names=["trap", "cell_label"]
-            )
-        else:
-            ids = pd.Index(attrs["trap"], names=["trap"])
-        return pd.DataFrame(dset, index=ids, columns=timepoints)
+        return pd.read_hdf(self.file, dataset)

     @staticmethod
     def _if_ext_or_post(name):
...
@@ -34,4 +25,4 @@ class Signal(BridgeH5):
     @property
     def datasets(self):
-        return signals._hdf.visit(self._if_ext_or_post)
+        return self._hdf.visit(self._if_ext_or_post)
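
The rewritten __getitem__ delegates to pandas' HDF functions, which expect keys written through PyTables (e.g. HDFStore.put or DataFrame.to_hdf) rather than raw h5py datasets. A minimal round-trip sketch, with a hypothetical filename and key:

import pandas as pd

df = pd.DataFrame({"trap": [1, 2], "median": [0.4, 0.6]})

# Write with pandas (requires the optional "tables" dependency) so the key
# can later be read back with pd.read_hdf, as Signal.__getitem__ now does.
with pd.HDFStore("example_store.h5") as store:  # hypothetical filename
    store.put("/extraction/mCherry/np_max/median", df)

same_df = pd.read_hdf("example_store.h5", "/extraction/mCherry/np_max/median")
print(same_df.equals(df))  # True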