From 7915260edf05ad586c0cee80942c2a4700a71710 Mon Sep 17 00:00:00 2001 From: Diane Adjavon <diane.adjavon@ed.ac.uk> Date: Sat, 11 Sep 2021 15:11:54 +0100 Subject: [PATCH] Updated from master and add Gaussian Process. Gaussian Process currently fixed to growth rate, could be used for any signal in theory so the names might be changed later. Also edited the processor so it can deal with a process that returns multiple signals. Former-commit-id: 7931875480d159d5ca40763e72d16f8c07f2b61a --- core/functions/tracks.py | 153 +++++-------------------------- core/group.py | 130 ++++++++++++++++++++++++++ core/io/base.py | 9 -- core/io/signal.py | 37 -------- core/io/writer.py | 50 ---------- core/processes/base.py | 1 + core/processes/births.py | 1 + core/processes/dsignal.py | 28 ++++++ core/processes/gpsignal.py | 97 ++++++++++++++++++++ core/processes/merger.py | 15 ++- core/processes/peaks.py | 44 +++++++++ core/processes/picker.py | 12 ++- core/processes/savgol.py | 152 +++++++++++++++++++++++++++++++ core/processes/template.py | 29 ++++++ core/processor.py | 176 +++++++++++++++++++++++++++++------- examples/basic_processes.py | 15 +++ examples/group.py | 21 +++++ examples/testing.py | 3 +- 18 files changed, 701 insertions(+), 272 deletions(-) create mode 100644 core/group.py delete mode 100644 core/io/base.py delete mode 100644 core/io/signal.py delete mode 100644 core/io/writer.py create mode 100644 core/processes/base.py create mode 100644 core/processes/births.py create mode 100644 core/processes/dsignal.py create mode 100644 core/processes/gpsignal.py create mode 100644 core/processes/peaks.py create mode 100644 core/processes/savgol.py create mode 100644 core/processes/template.py create mode 100644 examples/basic_processes.py create mode 100644 examples/group.py diff --git a/core/functions/tracks.py b/core/functions/tracks.py index 2895b676..928fc2b1 100644 --- a/core/functions/tracks.py +++ b/core/functions/tracks.py @@ -9,6 +9,7 @@ from typing import Union, List import numpy as np import pandas as pd +from utils_find_1st import find_1st, cmp_larger import more_itertools as mit from scipy.signal import savgol_filter @@ -167,7 +168,7 @@ def rec_bottom(d, k): return rec_bottom(d, d[k]) -def join_tracks(tracks, joinable_pairs, drop=False) -> pd.DataFrame: +def join_tracks(tracks, joinable_pairs, drop=True) -> pd.DataFrame: """ Join pairs of tracks from later tps towards the start. 
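For context, the new join logic introduced in this file splices a later track onto the trailing gap of an earlier one using utils_find_1st (imported above). A minimal sketch with made-up values: find_1st(values[::-1], 0, cmp_larger) counts the trailing entries that are NaN or non-positive, mirroring the join_track_pair helper added in the next hunk.

import numpy as np
import pandas as pd
from utils_find_1st import find_1st, cmp_larger

target = pd.Series([5.1, 5.4, 5.9, np.nan, np.nan])     # earlier track, trailing gap
source = pd.Series([np.nan, np.nan, np.nan, 6.1, 6.5])  # later track

end = find_1st(target.values[::-1], 0, cmp_larger)  # 2: length of the trailing gap
joined = target.copy()
joined.iloc[-end:] = source.iloc[-end:].values      # -> [5.1, 5.4, 5.9, 6.1, 6.5]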
@@ -185,7 +186,7 @@ def join_tracks(tracks, joinable_pairs, drop=False) -> pd.DataFrame: tmp = copy(tracks) for target, source in joinable_pairs: - tmp.loc[target] = join_track_pairs(tmp.loc[target], tmp.loc[source]) + tmp.loc[target] = join_track_pair(tmp.loc[target], tmp.loc[source]) if drop: tmp = tmp.drop(source) @@ -193,11 +194,11 @@ def join_tracks(tracks, joinable_pairs, drop=False) -> pd.DataFrame: return tmp -def join_track_pairs(track1, track2): - tmp = copy(track1) - tmp.loc[track2.dropna().index] = track2.dropna().values - - return tmp +def join_track_pair(target, source): + tgt_copy = copy(target) + end = find_1st(target.values[::-1], 0, cmp_larger) + tgt_copy.iloc[-end:] = source.iloc[-end:].values + return tgt_copy def get_joinable(tracks, smooth=False, tol=0.1, window=5, degree=3) -> dict: @@ -216,24 +217,25 @@ def get_joinable(tracks, smooth=False, tol=0.1, window=5, degree=3) -> dict: """ - tracks.index.names = [ - "trap", - "cell", - ] # TODO remove this once it is integrated in the tracker - # contig=tracks.groupby(['pos','trap']).apply(tracks2contig) - clean = clean_tracks(tracks, min_len=window + 1, min_gr=0.9) # get useful tracks - contig = clean.groupby(["trap"]).apply(get_contiguous_pairs) - contig = contig.loc[contig.apply(len) > 0] + # Commented because we are not smoothing in this step yet # candict = {k:v for d in contig.values for k,v in d.items()} # smooth all relevant tracks - linear = set([k for v in contig.values for i in v for j in i for k in j]) if smooth: # Apply savgol filter TODO fix nans affecting edge placing + clean = clean_tracks( + tracks, min_len=window + 1, min_gr=0.9 + ) # get useful tracks savgol_on_srs = lambda x: non_uniform_savgol(x.index, x.values, window, degree) - smoothed_tracks = clean.loc[linear].apply(savgol_on_srs, 1) + contig = clean.groupby(["trap"]).apply(get_contiguous_pairs) + contig = contig.loc[contig.apply(len) > 0] + flat = set([k for v in contig.values for i in v for j in i for k in j]) + smoothed_tracks = clean.loc[flat].apply(savgol_on_srs, 1) else: - smoothed_tracks = clean.loc[linear].apply(lambda x: np.array(x.values), axis=1) + contig = tracks.groupby(["trap"]).apply(get_contiguous_pairs) + contig = contig.loc[contig.apply(len) > 0] + flat = set([k for v in contig.values for i in v for j in i for k in j]) + smoothed_tracks = tracks.loc[flat].apply(lambda x: np.array(x.values), axis=1) # fetch edges from ids TODO (IF necessary, here we can compare growth rates) idx_to_edge = lambda preposts: [ @@ -311,7 +313,7 @@ def get_closest_pairs(pre: List[float], post: List[float], tol: Union[float, int result = dMetric[ids] / norm ids = ids if len(pre) < len(post) else ids[::-1] - return [idx for idx, res in zip(zip(*ids), result) if res < tol] + return [idx for idx, res in zip(zip(*ids), result) if res <= tol] def solve_matrix(dMetric): @@ -424,116 +426,3 @@ def get_contiguous_pairs(tracks: pd.DataFrame) -> list: # for x,y in candidates: # d[x].append(y) # return d - - -def non_uniform_savgol(x, y, window, polynom): - """ - Applies a Savitzky-Golay filter to y with non-uniform spacing - as defined in x - - This is based on https://dsp.stackexchange.com/questions/1676/savitzky-golay-smoothing-filter-for-not-equally-spaced-data - The borders are interpolated like scipy.signal.savgol_filter would do - - source: https://dsp.stackexchange.com/a/64313 - - Parameters - ---------- - x : array_like - List of floats representing the x values of the data - y : array_like - List of floats representing the y values. 
Must have same length - as x - window : int (odd) - Window length of datapoints. Must be odd and smaller than x - polynom : int - The order of polynom used. Must be smaller than the window size - - Returns - ------- - np.array of float - The smoothed y values - """ - if len(x) != len(y): - raise ValueError('"x" and "y" must be of the same size') - - if len(x) < window: - raise ValueError("The data size must be larger than the window size") - - if type(window) is not int: - raise TypeError('"window" must be an integer') - - if window % 2 == 0: - raise ValueError('The "window" must be an odd integer') - - if type(polynom) is not int: - raise TypeError('"polynom" must be an integer') - - if polynom >= window: - raise ValueError('"polynom" must be less than "window"') - - half_window = window // 2 - polynom += 1 - - # Initialize variables - A = np.empty((window, polynom)) # Matrix - tA = np.empty((polynom, window)) # Transposed matrix - t = np.empty(window) # Local x variables - y_smoothed = np.full(len(y), np.nan) - - # Start smoothing - for i in range(half_window, len(x) - half_window, 1): - # Center a window of x values on x[i] - for j in range(0, window, 1): - t[j] = x[i + j - half_window] - x[i] - - # Create the initial matrix A and its transposed form tA - for j in range(0, window, 1): - r = 1.0 - for k in range(0, polynom, 1): - A[j, k] = r - tA[k, j] = r - r *= t[j] - - # Multiply the two matrices - tAA = np.matmul(tA, A) - - # Invert the product of the matrices - tAA = np.linalg.inv(tAA) - - # Calculate the pseudoinverse of the design matrix - coeffs = np.matmul(tAA, tA) - - # Calculate c0 which is also the y value for y[i] - y_smoothed[i] = 0 - for j in range(0, window, 1): - y_smoothed[i] += coeffs[0, j] * y[i + j - half_window] - - # If at the end or beginning, store all coefficients for the polynom - if i == half_window: - first_coeffs = np.zeros(polynom) - for j in range(0, window, 1): - for k in range(polynom): - first_coeffs[k] += coeffs[k, j] * y[j] - elif i == len(x) - half_window - 1: - last_coeffs = np.zeros(polynom) - for j in range(0, window, 1): - for k in range(polynom): - last_coeffs[k] += coeffs[k, j] * y[len(y) - window + j] - - # Interpolate the result at the left border - for i in range(0, half_window, 1): - y_smoothed[i] = 0 - x_i = 1 - for j in range(0, polynom, 1): - y_smoothed[i] += first_coeffs[j] * x_i - x_i *= x[i] - x[half_window] - - # Interpolate the result at the right border - for i in range(len(x) - half_window, len(x), 1): - y_smoothed[i] = 0 - x_i = 1 - for j in range(0, polynom, 1): - y_smoothed[i] += last_coeffs[j] * x_i - x_i *= x[i] - x[-half_window - 1] - - return y_smoothed diff --git a/core/group.py b/core/group.py new file mode 100644 index 00000000..99cf00cf --- /dev/null +++ b/core/group.py @@ -0,0 +1,130 @@ +""" +Class to group multiple positions into one using one of several criteria. 
+""" + +from pathlib import Path +import re + +import h5py +import pandas as pd + +from postprocessor.core.io.base import groupsort +from postprocessor.core.io.signal import Signal + +from postprocessor.core.processes.base import ParametersABC, ProcessABC + + +class GroupParameters(ParametersABC): + def __init__(self, by="name", processes=[], signals=[]): + self.by = by + self.signals = signals + self.processes = processes + + @classmethod + def default(cls): + return cls.from_dict({"by": "name", "signals": [], "processes": []}) + + +class Group(ProcessABC): + def __init__(self, parameters): + super().__init__(parameters) + + def get_position_filenames(self, exp_root, poses): + """ + Get filenames as a dictionary where the key is the position and value the filename. + """ + central_store = Path(exp_root) / "store.h5" + if central_store.exists(): + hdf = h5py.File(central_store, "r") + self.filenames = [pos.attrs["filename"] for pos in hdf["/positions/"]] + hdf.close() + else: # If no central store just list position files in expt root folder + fullfiles = [x for x in Path(exp_root).glob("*store.h5")] + files = [x.name for x in Path(exp_root).glob("*store.h5")] + filenames = [False for _ in poses] + for i, pos in enumerate(poses): + matches = [ + True if re.match(pos + ".*.h5", fname) else False for fname in files + ] + if any(matches): + assert sum(matches) == 1, "More than one match" + filenames[i] = (pos, fullfiles[matches.index(True)]) + + self.filenames = {fname[0]: fname[1] for fname in filenames if fname} + + self.positions = list(self.filenames.keys()) + return self.filenames + + def get_signals(self): + # hdf = h5py.File(central_store, "r") + # keys_d = groupsort(keys) + self.signals = {pos: {} for pos in self.filenames.keys()} + for pos, fname in self.filenames.items(): + for signal in self.parameters.signals: + self.signals[pos][signal] = pd.read_hdf(fname, signal) + + return self.signals + + def gen_groups(self): + if self.by == "group": # Use group names in metadata + pass + elif self.by == "name": # Infer groups from signal concatenation + # Remove last four characters and find commonalities larger than 4 + # characters between posnames and group them that way. 
+ groupnames = list(set([x[:-3] for x in self.positions])) + self.group_signal_tree = {group: [] for group in groupnames} + self.poses_grouped = {group: [] for group in groupnames} + for pos in self.positions: + group = groupnames[groupnames.index(pos[:-3])] + self.group_signal_tree[group].append(self.signals[pos]) + self.poses_grouped[group].append(pos) + + elif ( + type(self.by) == tuple + ): # Manually give groups as tuple or list of positions + pass + + def concat_signals(self): + self.concated_signals = {group: {} for group in self.group_signal_tree} + for k, group in self.group_signal_tree.items(): + for signal in self.parameters.signals: + self.concated_signals[k][signal] = pd.concat( + [g[signal] for g in group], keys=self.poses_grouped[k] + ) + + return self.concated_signals + + def process_signals(self, grouped_signals): + pass + + def run(self, central_store, poses): + + self.get_position_filenames(central_store, poses) + self.get_signals() + self.gen_groups() + self.concat_signals() + # processed_signals = self.process_signals(grouped_signals) + + return self.concated_signals + # return processed_signals + + +poses = [ + x.name.split("store")[0] + for x in Path( + "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01" + ).rglob("*") + if x.name != "images.h5" +] +gr = Group( + GroupParameters( + signals=["/extraction/general/None/area", "/extraction/mCherry/np_max/median"] + ) +) +gr.run( + central_store="/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01", + poses=poses, +) +signal = Signal( + "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01/ph_5_04_001store.h5" +) diff --git a/core/io/base.py b/core/io/base.py deleted file mode 100644 index a757a96e..00000000 --- a/core/io/base.py +++ /dev/null @@ -1,9 +0,0 @@ -import h5py - - -class BridgeH5: - def __init__(self, file): - self._hdf = h5py.File(file, "r") - - def close(self): - self._hdf.close() diff --git a/core/io/signal.py b/core/io/signal.py deleted file mode 100644 index 91907bd5..00000000 --- a/core/io/signal.py +++ /dev/null @@ -1,37 +0,0 @@ -import pandas as pd - -from postprocessor.core.io.base import BridgeH5 - - -class Signal(BridgeH5): - """ - Class that fetches data from the hdf5 storage for post-processing - """ - - def __init__(self, file): - super().__init__(file) - - def __getitem__(self, dataset): - dset = self._hdf[dataset][()] - attrs = self._hdf[dataset].attrs - first_dataset = dataset.split("/")[1] + "/" - timepoints = self._hdf[first_dataset].attrs["processed_timepoints"] - - if "cell_label" in self._hdf[dataset].attrs: - ids = pd.MultiIndex.from_tuples( - zip(attrs["trap"], attrs["cell_label"]), names=["trap", "cell_label"] - ) - else: - ids = pd.Index(attrs["trap"], names=["trap"]) - - return pd.DataFrame(dset, index=ids, columns=timepoints) - - @staticmethod - def _if_ext_or_post(name): - if name.startswith("extraction") or name.startswith("postprocessing"): - if len(name.split("/")) > 3: - return name - - @property - def datasets(self): - return signals._hdf.visit(self._if_ext_or_post) diff --git a/core/io/writer.py b/core/io/writer.py deleted file mode 100644 index 240331bb..00000000 --- a/core/io/writer.py +++ /dev/null @@ -1,50 +0,0 @@ -from itertools import accumulate - -import h5py -import pandas as pd - -from postprocessor.core.io.base import BridgeH5 - - -def Writer(BridgeH5): - """ - Class in charge of transforming data into compatible formats - - 
Decoupling interface from implementation! - - :hdfname: Name of file to write into - """ - - def __init__(self, hdfname): - self._hdf = h5py.Hdf(hdfname, "a") - - def write(self, address, data): - self._hdf.add_group(address) - if type(data) is pd.DataFrame: - self.write_df(address, data) - elif type(data) is np.array: - self.write_np(address, data) - - def write_np(self, address, array): - pass - - def write_df(self, df, tps, path): - print("writing to ", path) - for item in accummulate(path.split("/")[:-2]): - if item not in self._hdf: - self._hdf.create_group(item) - pos_group = f[path.split("/")[1]] - - if path not in pos_group: - pos_group.create_dataset(name=path, shape=df.shape, dtype=df.dtypes[0]) - new_dset = f[path] - new_dset[()] = df.values - if len(df.index.names) > 1: - trap, cell_label = zip(*list(df.index.values)) - new_dset.attrs["trap"] = trap - new_dset.attrs["cell_label"] = cell_label - new_dset.attrs["idnames"] = ["trap", "cell_label"] - else: - new_dset.attrs["trap"] = list(df.index.values) - new_dset.attrs["idnames"] = ["trap"] - pos_group.attrs["processed_timepoints"] = tps diff --git a/core/processes/base.py b/core/processes/base.py new file mode 100644 index 00000000..142bd6c3 --- /dev/null +++ b/core/processes/base.py @@ -0,0 +1 @@ +from agora.base import ParametersABC, ProcessABC \ No newline at end of file diff --git a/core/processes/births.py b/core/processes/births.py new file mode 100644 index 00000000..e5a0d9b4 --- /dev/null +++ b/core/processes/births.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/core/processes/dsignal.py b/core/processes/dsignal.py new file mode 100644 index 00000000..484ad599 --- /dev/null +++ b/core/processes/dsignal.py @@ -0,0 +1,28 @@ +import pandas as pd + +from postprocessor.core.processes.base import ParametersABC, ProcessABC + + +class dsignalParameters(ParametersABC): + """ + :window: Number of timepoints to consider for signal. 
+ """ + + def __init__(self, window: int): + self.window = window + + @classmethod + def default(cls): + return cls.from_dict({"window": 3}) + + +class dsignal(ProcessABC): + """ + Calculate the change in a signal depending on a window + """ + + def __init__(self, parameters: dsignalParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + return signal.rolling(window=self.parameters.window, axis=1).mean().diff(axis=1) diff --git a/core/processes/gpsignal.py b/core/processes/gpsignal.py new file mode 100644 index 00000000..f6cdb45f --- /dev/null +++ b/core/processes/gpsignal.py @@ -0,0 +1,97 @@ +"""Gaussian process fit of a Signal.""" +import logging + +from postprocessor.core.processes.base import ParametersABC, ProcessABC +import numpy as np +import pandas as pd + +import gaussian_processes.gaussianprocess as gp + +def estimate_gr(volume, dt, noruns, bounds, verbose): + """ + Parameters + ---------- + + volume : pd.Series + The volume series of a given cell + dt : float + The time interval in hours + noruns : int + The number of runs for optimisation + bounds : dict + The hyperparameter bounds used for optimisation + verbose : bool + If True, prints results + + Returns + ------- + """ + volume = volume.values + n = len(volume) + idx = np.arange(n) + t = idx * dt + y = volume[volume > 0] + x = t[volume > 0] + idx = idx[volume > 0] + # Fit the GP + mg = gp.maternGP(bounds, x, y) + mg.findhyperparameters(noruns=noruns) + if verbose: + mg.results() # Prints out the hyperparameters + mg.predict(x, derivs=2) # Saves the predictions to object + # Save the predictions to a csv file so they can be reused + results = dict(time=mg.x, volume=mg.y, fit_time=mg.xnew, fit_volume=mg.f, + growth_rate=mg.df, d_growth_rate=mg.ddf, + volume_var=mg.fvar, growth_rate_var=mg.dfvar, + d_growth_rate_var=mg.ddfvar) + for name, value in results.items(): + results[name] = np.full((n, ), np.nan) + results[name][idx] = value + return results + +# Give that to a writer: NOTE the writer needs to learn how to write the +# output of a process that returns multiple signals like this one does. + +class gpParameters(ParametersABC): + default_dict = dict(dt=5, + noruns=5, + bounds={0: (-2, 3), + 1: (-2, 1), + 2: (-4, -1)}, + verbose=False) + def __init__(self, dt, noruns, bounds, verbose): + """ + Parameters + ---------- + dt : float + The time step between time points, in minutes + noruns : int + The number of times the optimisation is tried + bounds : dict + Hyperparameter bounds for the Matern Kernel + verbose : bool + Determines whether to print hyperparameter results + """ + self.dt = dt + self.noruns = noruns + self.bounds = bounds + self.verbose = verbose + + @classmethod + def default(cls): + return cls.from_dict(cls.default_dict) + + +class GPSignal(ProcessABC): + """Gaussian process fit of a Signal. 
+ """ + def __init__(self, parameters: gpParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + results = signal.apply(lambda x: estimate_gr(x, + **self.parameters.to_dict()), + result_type='expand').T + multi_signal = {name: pd.DataFrame(np.vstack(results[name])) + for name in results.columns} + return multi_signal diff --git a/core/processes/merger.py b/core/processes/merger.py index b0f411b7..f7e90a5c 100644 --- a/core/processes/merger.py +++ b/core/processes/merger.py @@ -1,8 +1,8 @@ from agora.base import ParametersABC, ProcessABC -from postprocessor.core.functions.tracks import clean_tracks, merge_tracks, join_tracks +from postprocessor.core.functions.tracks import get_joinable -class MergerParameters(ParametersABC): +class mergerParameters(ParametersABC): """ :param tol: float or int threshold of average (prediction error/std) necessary to consider two tracks the same. If float is fraction of first track, @@ -43,13 +43,18 @@ class MergerParameters(ParametersABC): ) -class Merger(ProcessABC): +class merger(ProcessABC): """ - TODO Integrate functions/tracks.py inside this class? + TODO check why it needs to be run a few times to complete the merging """ def __init__(self, parameters): super().__init__(parameters) def run(self, signal): - merged, joint_pairs = merge_tracks(signal) # , min_len=self.window + 1) + joinable = get_joinable(signal) + # merged, _ = merge_tracks(signal) # , min_len=self.window + 1) + # indices = (*zip(*merged.index.tolist()),) + # names = merged.index.names + # return {name: ids for name, ids in zip(names, indices)} + return joinable diff --git a/core/processes/peaks.py b/core/processes/peaks.py new file mode 100644 index 00000000..15ddf950 --- /dev/null +++ b/core/processes/peaks.py @@ -0,0 +1,44 @@ +from scipy.signal import argrelmax, argrelmin +from postprocessor.core.processes.base import ParametersABC, ProcessABC + + +class PeaksParameters(ParametersABC): + """ + Parameters + type : str {minima, maxima, "all"}. Determines which type of peaks to identify + order : int Parameter to pass to scipy.signal.argrelextrema indicating + how many points to use for comparison. + """ + + def __init__(self, type): + self.type = type + + @classmethod + def default(cls): + return cls.from_dict({"type": "minima", "order": 3}) + + +class Peaks(ProcessABC): + """ + Identifies a signal sharply dropping. + """ + + def __init__(self, parameters: PeaksParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + """ + Returns a boolean dataframe with the same shape as the + original signal but with peaks as true values. 
+ """ + peaks_mat = np.zeros_like(signal, dtype=bool) + + comparator = np.less if self.parameters.type is "minima" else np.greater + peaks_ids = argrelextrema(new_df, comparator=comparator, order=order) + peaks_mat[peak_ids] = True + + return pd.DataFrame( + peaks_mat, + index=signal.index, + columns=signal.columns, + ) diff --git a/core/processes/picker.py b/core/processes/picker.py index 6b0ae140..3668e922 100644 --- a/core/processes/picker.py +++ b/core/processes/picker.py @@ -10,7 +10,7 @@ from agora.base import ParametersABC, ProcessABC from postprocessor.core.functions.tracks import max_ntps, max_nonstop_ntps -class PickerParameters(ParametersABC): +class pickerParameters(ParametersABC): def __init__( self, condition: Tuple[str, Union[float, int]] = None, @@ -32,7 +32,7 @@ class PickerParameters(ParametersABC): ) -class Picker(ProcessABC): +class picker(ProcessABC): """ :cells: Cell object passed to the constructor :condition: Tuple with condition and associated parameter(s), conditions can be @@ -43,7 +43,7 @@ class Picker(ProcessABC): def __init__( self, - parameters: PickerParameters, + parameters: pickerParameters, cells: CellsHDF, ): super().__init__(parameters=parameters) @@ -93,6 +93,8 @@ class Picker(ProcessABC): def run(self, signals): for alg in self.sequence: + if alg == "condition": + pass self.signals = getattr(self, "pick_by_" + alg)(signals) return self.signals @@ -104,7 +106,7 @@ class Picker(ProcessABC): ): threshold_asint = _as_int(threshold, signals.shape[1]) case_mgr = { - "present": signals.apply(max_ntps, axis=1) > threshold_asint, + "present": signals.notna().sum(axis=1) > threshold_asint, "nonstoply_present": signals.apply(max_nonstop_ntps, axis=1) > threshold_asint, "quantile": [np.quantile(signals.values[signals.notna()], threshold)], @@ -114,5 +116,5 @@ class Picker(ProcessABC): def _as_int(threshold: Union[float, int], ntps: int): if type(threshold) is float: - threshold = threshold / ntps + threshold = ntps * threshold return threshold diff --git a/core/processes/savgol.py b/core/processes/savgol.py new file mode 100644 index 00000000..d0256736 --- /dev/null +++ b/core/processes/savgol.py @@ -0,0 +1,152 @@ +import numpy as np +import pandas as pd + +from postprocessor.core.processes.base import ParametersABC, ProcessABC + + +class savgolParameters(ParametersABC): + """ + Parameters + + window : int (odd) + Window length of datapoints. Must be odd and smaller than x + polynom : int + The order of polynom used. Must be smaller than the window size + """ + + def __init__(self, window, polynom): + self.window = window + self.polynom = polynom + + @classmethod + def default(cls): + return cls.from_dict({"window": 3, "polynom": 2}) + + +class savgol(ProcessABC): + """ + Apply Savitzky-Golay filter (works with NaNs, but it might return + NaN regions). 
+ """ + + def __init__(self, parameters: savgolParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + savgol_on_srs = lambda x: self.non_uniform_savgol( + x.index, x.values, self.parameters.window, self.parameters.polynom + ) + return signal.apply(savgol_on_srs, 1) + + @staticmethod + def non_uniform_savgol(x, y, window: int, polynom: int): + """ + Applies a Savitzky-Golay filter to y with non-uniform spacing + as defined in x + + This is based on https://dsp.stackexchange.com/questions/1676/savitzky-golay-smoothing-filter-for-not-equally-spaced-data + The borders are interpolated like scipy.signal.savgol_filter would do + + source: https://dsp.stackexchange.com/a/64313 + + Parameters + ---------- + x : array_like + List of floats representing the x values of the data + y : array_like + List of floats representing the y values. Must have same length + as x + window : int (odd) + Window length of datapoints. Must be odd and smaller than x + polynom : int + The order of polynom used. Must be smaller than the window size + + Returns + ------- + np.array of float + The smoothed y values + """ + if len(x) != len(y): + raise ValueError('"x" and "y" must be of the same size') + + if len(x) < window: + raise ValueError("The data size must be larger than the window size") + + if type(window) is not int: + raise TypeError('"window" must be an integer') + + if window % 2 == 0: + raise ValueError('The "window" must be an odd integer') + + if type(polynom) is not int: + raise TypeError('"polynom" must be an integer') + + if polynom >= window: + raise ValueError('"polynom" must be less than "window"') + + half_window = window // 2 + polynom += 1 + + # Initialize variables + A = np.empty((window, polynom)) # Matrix + tA = np.empty((polynom, window)) # Transposed matrix + t = np.empty(window) # Local x variables + y_smoothed = np.full(len(y), np.nan) + + # Start smoothing + for i in range(half_window, len(x) - half_window, 1): + # Center a window of x values on x[i] + for j in range(0, window, 1): + t[j] = x[i + j - half_window] - x[i] + + # Create the initial matrix A and its transposed form tA + for j in range(0, window, 1): + r = 1.0 + for k in range(0, polynom, 1): + A[j, k] = r + tA[k, j] = r + r *= t[j] + + # Multiply the two matrices + tAA = np.matmul(tA, A) + + # Invert the product of the matrices + tAA = np.linalg.inv(tAA) + + # Calculate the pseudoinverse of the design matrix + coeffs = np.matmul(tAA, tA) + + # Calculate c0 which is also the y value for y[i] + y_smoothed[i] = 0 + for j in range(0, window, 1): + y_smoothed[i] += coeffs[0, j] * y[i + j - half_window] + + # If at the end or beginning, store all coefficients for the polynom + if i == half_window: + first_coeffs = np.zeros(polynom) + for j in range(0, window, 1): + for k in range(polynom): + first_coeffs[k] += coeffs[k, j] * y[j] + elif i == len(x) - half_window - 1: + last_coeffs = np.zeros(polynom) + for j in range(0, window, 1): + for k in range(polynom): + last_coeffs[k] += coeffs[k, j] * y[len(y) - window + j] + + # Interpolate the result at the left border + for i in range(0, half_window, 1): + y_smoothed[i] = 0 + x_i = 1 + for j in range(0, polynom, 1): + y_smoothed[i] += first_coeffs[j] * x_i + x_i *= x[i] - x[half_window] + + # Interpolate the result at the right border + for i in range(len(x) - half_window, len(x), 1): + y_smoothed[i] = 0 + x_i = 1 + for j in range(0, polynom, 1): + y_smoothed[i] += last_coeffs[j] * x_i + x_i *= x[i] - x[-half_window - 1] + + return y_smoothed diff --git 
a/core/processes/template.py b/core/processes/template.py new file mode 100644 index 00000000..075f21b2 --- /dev/null +++ b/core/processes/template.py @@ -0,0 +1,29 @@ +from postprocessor.core.processes.base import ParametersABC, ProcessABC +from postprocessor.core.functions.tracks import clean_tracks, merge_tracks, join_tracks + + +class ParametersTemplate(ParametersABC): + """ + Parameters + """ + + def __init__( + self, + ): + super().__init__() + + @classmethod + def default(cls): + return cls.from_dict({}) + + +class ProcessTemplate(ProcessABC): + """ + Template for process class. + """ + + def __init__(self, parameters: ParametersTemplate): + super().__init__(parameters) + + def run(self): + pass diff --git a/core/processor.py b/core/processor.py index d51a2910..4ccea664 100644 --- a/core/processor.py +++ b/core/processor.py @@ -1,30 +1,37 @@ +import h5py from typing import List, Dict, Union +from pydoc import locate + +import numpy as np import pandas as pd from agora.base import ParametersABC -from postprocessor.core.processes.merger import MergerParameters, Merger -from postprocessor.core.processes.picker import PickerParameters, Picker -from postprocessor.core.io.writer import Writer -from postprocessor.core.io.signal import Signal +from core.io.writer import Writer +from core.io.signal import Signal from core.cells import Cells -class PostProParameters(ParametersABC): +class PostProcessorParameters(ParametersABC): """ Anthology of parameters used for postprocessing :merger: :picker: parameters for picker - :processes: List of processes that can be found in ./processes - :datasets: Dictionary - """ + :processes: Dict processes:[objectives], 'processes' are defined in ./processes/ + while objectives are relative or absolute paths to datasets. If relative paths the + post-processed addresses are used. 
- def __init__(self, merger=None, picker=None, processes=[], datasets=None): - self.merger: MergerParameters = merger - self.picker: PickerParameters = picker - self.processes: List = processes + """ - self.datasets: Dict = datasets + def __init__( + self, + targets={}, + parameters={}, + outpaths={}, + ): + self.targets: Dict = targets + self.parameters: Dict = parameters + self.outpaths: Dict = outpaths def __getitem__(self, item): return getattr(self, item) @@ -33,13 +40,23 @@ class PostProParameters(ParametersABC): def default(cls, kind=None): if kind == "defaults" or kind == None: return cls( - merger=MergerParameters.default(), - picker=PickerParameters.default(), - datasets={ - "merger": "/extraction/general/None/area", - "picker": "/extraction/general/None/area", - "processes": [], + targets={ + "prepost": { + "merger": "/extraction/general/None/area", + "picker": ["/extraction/general/None/area"], + }, + "processes": { + "dsignal": ["/extraction/general/None/area"], + # "savgol": ["/extraction/general/None/area"], + }, }, + parameters={ + "prepost": { + "merger": mergerParameters.default(), + "picker": pickerParameters.default(), + } + }, + outpaths={}, ) def to_dict(self): @@ -49,24 +66,119 @@ class PostProParameters(ParametersABC): class PostProcessor: def __init__(self, filename, parameters): self.parameters = parameters - self._signals = Signal(filename) + self._filename = filename + self._signal = Signal(filename) self._writer = Writer(filename) - self.datasets = parameters["datasets"] - self.merger = Merger(parameters["merger"]) - self.picker = Picker( - parameters=parameters["picker"], cells=Cells.from_source(filename) + # self.outpaths = parameters["outpaths"] + self.merger = merger(parameters["parameters"]["prepost"]["merger"]) + + self.picker = picker( + parameters=parameters["parameters"]["prepost"]["picker"], + cells=Cells.from_source(filename), ) - self.processes = [ - self.get_process(process) for process in parameters["processes"] - ] + self.classfun = { + process: self.get_process(process) + for process in parameters["targets"]["processes"] + } + self.parameters_classfun = { + process: self.get_parameters(process) + for process in parameters["targets"]["processes"] + } + self.targets = parameters["targets"] + + @staticmethod + def get_process(process): + """ + Dynamically import a process class from the 'processes' folder. + Assumes process filename and class name are the same + """ + return locate("postprocessor.core.processes." + process + "." + process) + + @staticmethod + def get_parameters(process): + """ + Dynamically import a process class from the 'processes' folder. + Assumes process filename and class name are the same + """ + return locate( + "postprocessor.core.processes." + process + "." 
+ process + "Parameters" + ) + + def run_prepost(self): + """Important processes run before normal post-processing ones""" + merge_events = self.merger.run(self._signal[self.targets["prepost"]["merger"]]) + + with h5py.File(self._filename, "r") as f: + prev_idchanges = self._signal.get_merges() + + changes_history = list(prev_idchanges) + [np.array(x) for x in merge_events] + self._writer.write("modifiers/merges", data=changes_history) + + picks = self.picker.run(self._signal[self.targets["prepost"]["picker"][0]]) + self._writer.write("modifiers/picks", data=picks) def run(self): - self.merger.run(self._signals[self.datasets["merger"]]) - self.picker.run(self._signals[self.datasets["picker"]]) - for process, dataset in zip(self.processes, self.datasets["processes"]): - process_result = process.run(self._signals.get_dataset(dataset)) - self.writer.write(process_result, dataset) + self.run_prepost() + + for process, datasets in self.targets["processes"].items(): + if process in self.parameters["parameters"].get( + "processes", {} + ): # If we assigned parameters + parameters = self.parameters_classfun[process](self.parameters[process]) + + else: + parameters = self.parameters_classfun[process].default() + + loaded_process = self.classfun[process](parameters) + for dataset in datasets: + if isinstance(dataset, list): # multisignal process + signal = [self._signal[d] for d in dataset] + elif isinstance(dataset, str): + signal = self._signal[dataset] + else: + raise ("Incorrect dataset") + + result = loaded_process.run(signal) + + if process in self.parameters.to_dict()["outpaths"]: + outpath = self.parameters.to_dict()["outpaths"][process] + elif isinstance(dataset, list): + # If no outpath defined, place the result in the minimum common + # branch of all signals used + prefix = "".join( + prefix + c[0] + for c in takewhile( + lambda x: all(x[0] == y for y in x), zip(*dataset) + ) + ) + outpath = ( + prefix + + "_".join( # TODO check that it always finishes in '/' + [d[len(prefix) :].replace("/", "_") for d in dataset] + ) + ) + elif isinstance(dataset, str): + outpath = dataset[1:].replace("/", "_") + else: + raise ("Outpath not defined", type(dataset)) + + if isinstance(result, dict): # Multiple Signals as output + for k, v in result: + self.write_result( + "/postprocessing/" + process + "/" + outpath + + f'/{k}', + v, metadata={} + ) + else: + self.write_result( + "/postprocessing/" + process + "/" + outpath, result, metadata={} + ) + + def write_result( + self, path: str, result: Union[List, pd.DataFrame, np.ndarray], metadata: Dict + ): + self._writer.write(path, result, meta=metadata) def _if_dict(item): diff --git a/examples/basic_processes.py b/examples/basic_processes.py new file mode 100644 index 00000000..c322fd4b --- /dev/null +++ b/examples/basic_processes.py @@ -0,0 +1,15 @@ +from postprocessor.core.processor import PostProcessor, PostProcessorParameters + +params = PostProcessorParameters.default() +pp = PostProcessor( + "/shared_libs/pipeline-core/scripts/pH_calibration_dual_phl__ura8__by4741__01/ph_5_29_025store.h5", + params, +) +tmp = pp.run() + +import h5py + +# f = h5py.File( +# "/shared_libs/pipeline-core/scripts/pH_calibration_dual_phl__ura8__by4741__01/ph_5_29_025store.h5", +# "a", +# ) diff --git a/examples/group.py b/examples/group.py new file mode 100644 index 00000000..1e59978a --- /dev/null +++ b/examples/group.py @@ -0,0 +1,21 @@ +from pathlib import Path + +from postprocessor.core.group import GroupParameters, Group + +poses = [ + x.name.split("store")[0] + for x 
in Path( + "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01" + ).rglob("*") + if x.name != "images.h5" +] + +gr = Group( + GroupParameters( + signals=["/extraction/general/None/area", "/extraction/mCherry/np_max/median"] + ) +) +gr.run( + central_store="/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01", + poses=poses, +) diff --git a/examples/testing.py b/examples/testing.py index 5a0b6516..d5420d83 100644 --- a/examples/testing.py +++ b/examples/testing.py @@ -12,6 +12,7 @@ import compress_pickle from postprocessor.core.postprocessor import PostProcessor from postprocessor.core.tracks import get_avg_grs, non_uniform_savgol +from postprocessor.core.tracks import clean_tracks, merge_tracks, join_tracks from postprocessor.core.ph import * sns.set_context("talk", font_scale=1.8) @@ -68,8 +69,6 @@ def get_clean_dfs(dfs=None): clean_dfs = compress_pickle.load("/home/alan/Documents/tmp/clean_dfs.gz") else: - from postprocessor.core.tracks import clean_tracks, merge_tracks, join_tracks - # Clean timelapse clean = clean_tracks(new_dfs[("general", None, "area")]) tra, joint = merge_tracks(clean) -- GitLab
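As a quick illustration of the dsignal process added in this patch (a rolling mean over window timepoints followed by a per-timepoint difference), here is a minimal standalone sketch; the toy values are made up, and the import path assumes the package is importable as postprocessor.

import pandas as pd
from postprocessor.core.processes.dsignal import dsignal, dsignalParameters

# Toy signal: two cells (rows) over six timepoints (columns).
signal = pd.DataFrame(
    [[1.0, 2.0, 4.0, 7.0, 11.0, 16.0],
     [2.0, 2.5, 3.0, 3.5, 4.0, 4.5]]
)

proc = dsignal(dsignalParameters.default())  # window = 3
dsig = proc.run(signal)
# The first `window` columns come out as NaN (rolling-mean warm-up plus the diff);
# the rest hold the smoothed change between consecutive timepoints,
# e.g. row 0 becomes [NaN, NaN, NaN, 2.0, 3.0, 4.0].
print(dsig)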