diff --git a/core/functions/tracks.py b/core/functions/tracks.py
index 2895b6764aa8a72749c36e35251e3864c87df066..928fc2b177c63857d0ff4149771b5f04b581478c 100644
--- a/core/functions/tracks.py
+++ b/core/functions/tracks.py
@@ -9,6 +9,7 @@ from typing import Union, List
 
 import numpy as np
 import pandas as pd
+from utils_find_1st import find_1st, cmp_larger
 
 import more_itertools as mit
 from scipy.signal import savgol_filter
@@ -167,7 +168,7 @@ def rec_bottom(d, k):
         return rec_bottom(d, d[k])
 
 
-def join_tracks(tracks, joinable_pairs, drop=False) -> pd.DataFrame:
+def join_tracks(tracks, joinable_pairs, drop=True) -> pd.DataFrame:
     """
     Join pairs of tracks from later tps towards the start.
 
@@ -185,7 +186,7 @@ def join_tracks(tracks, joinable_pairs, drop=False) -> pd.DataFrame:
     tmp = copy(tracks)
     for target, source in joinable_pairs:
-        tmp.loc[target] = join_track_pairs(tmp.loc[target], tmp.loc[source])
+        tmp.loc[target] = join_track_pair(tmp.loc[target], tmp.loc[source])
 
         if drop:
             tmp = tmp.drop(source)
@@ -193,11 +194,11 @@ def join_tracks(tracks, joinable_pairs, drop=False) -> pd.DataFrame:
     return tmp
 
 
-def join_track_pairs(track1, track2):
-    tmp = copy(track1)
-    tmp.loc[track2.dropna().index] = track2.dropna().values
-
-    return tmp
+def join_track_pair(target, source):
+    tgt_copy = copy(target)
+    end = find_1st(target.values[::-1], 0, cmp_larger)
+    tgt_copy.iloc[-end:] = source.iloc[-end:].values
+    return tgt_copy
 
 
 def get_joinable(tracks, smooth=False, tol=0.1, window=5, degree=3) -> dict:
@@ -216,24 +217,25 @@ def get_joinable(tracks, smooth=False, tol=0.1, window=5, degree=3) -> dict:
 
     """
 
-    tracks.index.names = [
-        "trap",
-        "cell",
-    ]  # TODO remove this once it is integrated in the tracker
-    # contig=tracks.groupby(['pos','trap']).apply(tracks2contig)
-    clean = clean_tracks(tracks, min_len=window + 1, min_gr=0.9)  # get useful tracks
-    contig = clean.groupby(["trap"]).apply(get_contiguous_pairs)
-    contig = contig.loc[contig.apply(len) > 0]
+    # Commented because we are not smoothing in this step yet
     # candict = {k:v for d in contig.values for k,v in d.items()}
 
     # smooth all relevant tracks
-    linear = set([k for v in contig.values for i in v for j in i for k in j])
     if smooth:  # Apply savgol filter TODO fix nans affecting edge placing
+        clean = clean_tracks(
+            tracks, min_len=window + 1, min_gr=0.9
+        )  # get useful tracks
         savgol_on_srs = lambda x: non_uniform_savgol(x.index, x.values, window, degree)
-        smoothed_tracks = clean.loc[linear].apply(savgol_on_srs, 1)
+        contig = clean.groupby(["trap"]).apply(get_contiguous_pairs)
+        contig = contig.loc[contig.apply(len) > 0]
+        flat = set([k for v in contig.values for i in v for j in i for k in j])
+        smoothed_tracks = clean.loc[flat].apply(savgol_on_srs, 1)
     else:
-        smoothed_tracks = clean.loc[linear].apply(lambda x: np.array(x.values), axis=1)
+        contig = tracks.groupby(["trap"]).apply(get_contiguous_pairs)
+        contig = contig.loc[contig.apply(len) > 0]
+        flat = set([k for v in contig.values for i in v for j in i for k in j])
+        smoothed_tracks = tracks.loc[flat].apply(lambda x: np.array(x.values), axis=1)
 
     # fetch edges from ids TODO (IF necessary, here we can compare growth rates)
     idx_to_edge = lambda preposts: [
@@ -311,7 +313,7 @@ def get_closest_pairs(pre: List[float], post: List[float], tol: Union[float, int
         result = dMetric[ids] / norm
     ids = ids if len(pre) < len(post) else ids[::-1]
 
-    return [idx for idx, res in zip(zip(*ids), result) if res < tol]
+    return [idx for idx, res in zip(zip(*ids), result) if res <= tol]
 
 
 def solve_matrix(dMetric):
@@ -424,116 +426,3 @@ def get_contiguous_pairs(tracks: pd.DataFrame) -> list:
     #     for x,y in candidates:
     #         d[x].append(y)
     # return d
-
-
-def non_uniform_savgol(x, y, window, polynom):
-    """
-    Applies a Savitzky-Golay filter to y with non-uniform spacing
-    as defined in x
-
-    This is based on https://dsp.stackexchange.com/questions/1676/savitzky-golay-smoothing-filter-for-not-equally-spaced-data
-    The borders are interpolated like scipy.signal.savgol_filter would do
-
-    source: https://dsp.stackexchange.com/a/64313
-
-    Parameters
-    ----------
-    x : array_like
-        List of floats representing the x values of the data
-    y : array_like
-        List of floats representing the y values. Must have same length
-        as x
-    window : int (odd)
-        Window length of datapoints. Must be odd and smaller than x
-    polynom : int
-        The order of polynom used. Must be smaller than the window size
-
-    Returns
-    -------
-    np.array of float
-        The smoothed y values
-    """
-    if len(x) != len(y):
-        raise ValueError('"x" and "y" must be of the same size')
-
-    if len(x) < window:
-        raise ValueError("The data size must be larger than the window size")
-
-    if type(window) is not int:
-        raise TypeError('"window" must be an integer')
-
-    if window % 2 == 0:
-        raise ValueError('The "window" must be an odd integer')
-
-    if type(polynom) is not int:
-        raise TypeError('"polynom" must be an integer')
-
-    if polynom >= window:
-        raise ValueError('"polynom" must be less than "window"')
-
-    half_window = window // 2
-    polynom += 1
-
-    # Initialize variables
-    A = np.empty((window, polynom))  # Matrix
-    tA = np.empty((polynom, window))  # Transposed matrix
-    t = np.empty(window)  # Local x variables
-    y_smoothed = np.full(len(y), np.nan)
-
-    # Start smoothing
-    for i in range(half_window, len(x) - half_window, 1):
-        # Center a window of x values on x[i]
-        for j in range(0, window, 1):
-            t[j] = x[i + j - half_window] - x[i]
-
-        # Create the initial matrix A and its transposed form tA
-        for j in range(0, window, 1):
-            r = 1.0
-            for k in range(0, polynom, 1):
-                A[j, k] = r
-                tA[k, j] = r
-                r *= t[j]
-
-        # Multiply the two matrices
-        tAA = np.matmul(tA, A)
-
-        # Invert the product of the matrices
-        tAA = np.linalg.inv(tAA)
-
-        # Calculate the pseudoinverse of the design matrix
-        coeffs = np.matmul(tAA, tA)
-
-        # Calculate c0 which is also the y value for y[i]
-        y_smoothed[i] = 0
-        for j in range(0, window, 1):
-            y_smoothed[i] += coeffs[0, j] * y[i + j - half_window]
-
-        # If at the end or beginning, store all coefficients for the polynom
-        if i == half_window:
-            first_coeffs = np.zeros(polynom)
-            for j in range(0, window, 1):
-                for k in range(polynom):
-                    first_coeffs[k] += coeffs[k, j] * y[j]
-        elif i == len(x) - half_window - 1:
-            last_coeffs = np.zeros(polynom)
-            for j in range(0, window, 1):
-                for k in range(polynom):
-                    last_coeffs[k] += coeffs[k, j] * y[len(y) - window + j]
-
-    # Interpolate the result at the left border
-    for i in range(0, half_window, 1):
-        y_smoothed[i] = 0
-        x_i = 1
-        for j in range(0, polynom, 1):
-            y_smoothed[i] += first_coeffs[j] * x_i
-            x_i *= x[i] - x[half_window]
-
-    # Interpolate the result at the right border
-    for i in range(len(x) - half_window, len(x), 1):
-        y_smoothed[i] = 0
-        x_i = 1
-        for j in range(0, polynom, 1):
-            y_smoothed[i] += last_coeffs[j] * x_i
-            x_i *= x[i] - x[-half_window - 1]
-
-    return y_smoothed
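Not part of the patch: a minimal sketch of what the new `join_track_pair` does, using made-up track values. `find_1st(values[::-1], 0, cmp_larger)` counts the trailing timepoints after the target's last positive value, and the source track is spliced into exactly that tail.

# Illustrative sketch only (hypothetical values, not from the repository)
import numpy as np
import pandas as pd
from copy import copy
from utils_find_1st import find_1st, cmp_larger

target = pd.Series([2.0, 3.0, 4.0, 0.0, 0.0])
source = pd.Series([np.nan, np.nan, np.nan, 4.5, 5.0])

end = find_1st(target.values[::-1], 0, cmp_larger)  # trailing tps after last value > 0 -> 2
joined = copy(target)
joined.iloc[-end:] = source.iloc[-end:].values
print(joined.values)  # [2.  3.  4.  4.5 5. ]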
diff --git a/core/group.py b/core/group.py
new file mode 100644
index 0000000000000000000000000000000000000000..99cf00cf775f034d69be6f29fd7d9bb77dba5f54
--- /dev/null
+++ b/core/group.py
@@ -0,0 +1,130 @@
+"""
+Class to group multiple positions into one using one of several criteria.
+"""
+
+from pathlib import Path
+import re
+
+import h5py
+import pandas as pd
+
+from postprocessor.core.io.base import groupsort
+from postprocessor.core.io.signal import Signal
+
+from postprocessor.core.processes.base import ParametersABC, ProcessABC
+
+
+class GroupParameters(ParametersABC):
+    def __init__(self, by="name", processes=[], signals=[]):
+        self.by = by
+        self.signals = signals
+        self.processes = processes
+
+    @classmethod
+    def default(cls):
+        return cls.from_dict({"by": "name", "signals": [], "processes": []})
+
+
+class Group(ProcessABC):
+    def __init__(self, parameters):
+        super().__init__(parameters)
+
+    def get_position_filenames(self, exp_root, poses):
+        """
+        Get filenames as a dictionary in which the key is the position and the value is the filename.
+        """
+        central_store = Path(exp_root) / "store.h5"
+        if central_store.exists():
+            hdf = h5py.File(central_store, "r")
+            self.filenames = [pos.attrs["filename"] for pos in hdf["/positions/"]]
+            hdf.close()
+        else:  # If no central store just list position files in expt root folder
+            fullfiles = [x for x in Path(exp_root).glob("*store.h5")]
+            files = [x.name for x in Path(exp_root).glob("*store.h5")]
+            filenames = [False for _ in poses]
+            for i, pos in enumerate(poses):
+                matches = [
+                    True if re.match(pos + ".*.h5", fname) else False for fname in files
+                ]
+                if any(matches):
+                    assert sum(matches) == 1, "More than one match"
+                    filenames[i] = (pos, fullfiles[matches.index(True)])
+
+            self.filenames = {fname[0]: fname[1] for fname in filenames if fname}
+
+        self.positions = list(self.filenames.keys())
+        return self.filenames
+
+    def get_signals(self):
+        # hdf = h5py.File(central_store, "r")
+        # keys_d = groupsort(keys)
+        self.signals = {pos: {} for pos in self.filenames.keys()}
+        for pos, fname in self.filenames.items():
+            for signal in self.parameters.signals:
+                self.signals[pos][signal] = pd.read_hdf(fname, signal)
+
+        return self.signals
+
+    def gen_groups(self):
+        if self.by == "group":  # Use group names in metadata
+            pass
+        elif self.by == "name":  # Infer groups from signal concatenation
+            # Strip the last three characters (the position number) from each
+            # position name and group positions sharing the remaining prefix
+            groupnames = list(set([x[:-3] for x in self.positions]))
+            self.group_signal_tree = {group: [] for group in groupnames}
+            self.poses_grouped = {group: [] for group in groupnames}
+            for pos in self.positions:
+                group = groupnames[groupnames.index(pos[:-3])]
+                self.group_signal_tree[group].append(self.signals[pos])
+                self.poses_grouped[group].append(pos)
+
+        elif (
+            type(self.by) == tuple
+        ):  # Manually give groups as tuple or list of positions
+            pass
+
+    def concat_signals(self):
+        self.concated_signals = {group: {} for group in self.group_signal_tree}
+        for k, group in self.group_signal_tree.items():
+            for signal in self.parameters.signals:
+                self.concated_signals[k][signal] = pd.concat(
+                    [g[signal] for g in group], keys=self.poses_grouped[k]
+                )
+
+        return self.concated_signals
+
+    def process_signals(self, grouped_signals):
+        pass
+
+    def run(self, central_store, poses):
+
+        self.get_position_filenames(central_store, poses)
+        self.get_signals()
+        self.gen_groups()
+        self.concat_signals()
+        # processed_signals = self.process_signals(grouped_signals)
+
+        return self.concated_signals
+        # return processed_signals
+
+
+poses = [
+    x.name.split("store")[0]
+    for x in Path(
+        "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01"
+    ).rglob("*")
+    if x.name != "images.h5"
+]
+gr = Group(
+    GroupParameters(
+        signals=["/extraction/general/None/area", "/extraction/mCherry/np_max/median"]
+    )
+)
+gr.run(
+    central_store="/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01",
+    poses=poses,
+)
+signal = Signal(
+    "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01/ph_5_04_001store.h5"
+)
diff --git a/core/io/base.py b/core/io/base.py
deleted file mode 100644
index a757a96ed2fa0dcad60a2edbd808ec0e875edd4e..0000000000000000000000000000000000000000
--- a/core/io/base.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import h5py
-
-
-class BridgeH5:
-    def __init__(self, file):
-        self._hdf = h5py.File(file, "r")
-
-    def close(self):
-        self._hdf.close()
diff --git a/core/io/signal.py b/core/io/signal.py
deleted file mode 100644
index 91907bd5cccc1bedaf9eb9c51faa6ee17e7a322c..0000000000000000000000000000000000000000
--- a/core/io/signal.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import pandas as pd
-
-from postprocessor.core.io.base import BridgeH5
-
-
-class Signal(BridgeH5):
-    """
-    Class that fetches data from the hdf5 storage for post-processing
-    """
-
-    def __init__(self, file):
-        super().__init__(file)
-
-    def __getitem__(self, dataset):
-        dset = self._hdf[dataset][()]
-        attrs = self._hdf[dataset].attrs
-        first_dataset = dataset.split("/")[1] + "/"
-        timepoints = self._hdf[first_dataset].attrs["processed_timepoints"]
-
-        if "cell_label" in self._hdf[dataset].attrs:
-            ids = pd.MultiIndex.from_tuples(
-                zip(attrs["trap"], attrs["cell_label"]), names=["trap", "cell_label"]
-            )
-        else:
-            ids = pd.Index(attrs["trap"], names=["trap"])
-
-        return pd.DataFrame(dset, index=ids, columns=timepoints)
-
-    @staticmethod
-    def _if_ext_or_post(name):
-        if name.startswith("extraction") or name.startswith("postprocessing"):
-            if len(name.split("/")) > 3:
-                return name
-
-    @property
-    def datasets(self):
-        return signals._hdf.visit(self._if_ext_or_post)
diff --git a/core/io/writer.py b/core/io/writer.py
deleted file mode 100644
index 240331bb8a9008ae37a128f9d12a437e9937050d..0000000000000000000000000000000000000000
--- a/core/io/writer.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from itertools import accumulate
-
-import h5py
-import pandas as pd
-
-from postprocessor.core.io.base import BridgeH5
-
-
-def Writer(BridgeH5):
-    """
-    Class in charge of transforming data into compatible formats
-
-    Decoupling interface from implementation!
-
-    :hdfname: Name of file to write into
-    """
-
-    def __init__(self, hdfname):
-        self._hdf = h5py.Hdf(hdfname, "a")
-
-    def write(self, address, data):
-        self._hdf.add_group(address)
-        if type(data) is pd.DataFrame:
-            self.write_df(address, data)
-        elif type(data) is np.array:
-            self.write_np(address, data)
-
-    def write_np(self, address, array):
-        pass
-
-    def write_df(self, df, tps, path):
-        print("writing to ", path)
-        for item in accummulate(path.split("/")[:-2]):
-            if item not in self._hdf:
-                self._hdf.create_group(item)
-        pos_group = f[path.split("/")[1]]
-
-        if path not in pos_group:
-            pos_group.create_dataset(name=path, shape=df.shape, dtype=df.dtypes[0])
-        new_dset = f[path]
-        new_dset[()] = df.values
-        if len(df.index.names) > 1:
-            trap, cell_label = zip(*list(df.index.values))
-            new_dset.attrs["trap"] = trap
-            new_dset.attrs["cell_label"] = cell_label
-            new_dset.attrs["idnames"] = ["trap", "cell_label"]
-        else:
-            new_dset.attrs["trap"] = list(df.index.values)
-            new_dset.attrs["idnames"] = ["trap"]
-        pos_group.attrs["processed_timepoints"] = tps
diff --git a/core/processes/base.py b/core/processes/base.py
new file mode 100644
index 0000000000000000000000000000000000000000..142bd6c3fa2c5ef0fc578d3d09bfa47530482619
--- /dev/null
+++ b/core/processes/base.py
@@ -0,0 +1 @@
+from agora.base import ParametersABC, ProcessABC
\ No newline at end of file
diff --git a/core/processes/births.py b/core/processes/births.py
new file mode 100644
index 0000000000000000000000000000000000000000..e5a0d9b4834ec8f46d6e0d1256c6dcaad2e460fe
--- /dev/null
+++ b/core/processes/births.py
@@ -0,0 +1 @@
+#!/usr/bin/env python3
diff --git a/core/processes/dsignal.py b/core/processes/dsignal.py
new file mode 100644
index 0000000000000000000000000000000000000000..484ad59944c12cd12005c02b9470b84d8f2f19d7
--- /dev/null
+++ b/core/processes/dsignal.py
@@ -0,0 +1,28 @@
+import pandas as pd
+
+from postprocessor.core.processes.base import ParametersABC, ProcessABC
+
+
+class dsignalParameters(ParametersABC):
+    """
+    :window: Number of timepoints to consider for signal.
+    """
+
+    def __init__(self, window: int):
+        self.window = window
+
+    @classmethod
+    def default(cls):
+        return cls.from_dict({"window": 3})
+
+
+class dsignal(ProcessABC):
+    """
+    Calculate the change in a signal depending on a window
+    """
+
+    def __init__(self, parameters: dsignalParameters):
+        super().__init__(parameters)
+
+    def run(self, signal: pd.DataFrame):
+        return signal.rolling(window=self.parameters.window, axis=1).mean().diff(axis=1)
+ """ + def __init__(self, parameters: gpParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + results = signal.apply(lambda x: estimate_gr(x, + **self.parameters.to_dict()), + result_type='expand').T + multi_signal = {name: pd.DataFrame(np.vstack(results[name])) + for name in results.columns} + return multi_signal diff --git a/core/processes/merger.py b/core/processes/merger.py index b0f411b7f9baea94aadcbd47f62ea4171a179b7d..f7e90a5cb0b2fe50be7542c642ebd986d8f63152 100644 --- a/core/processes/merger.py +++ b/core/processes/merger.py @@ -1,8 +1,8 @@ from agora.base import ParametersABC, ProcessABC -from postprocessor.core.functions.tracks import clean_tracks, merge_tracks, join_tracks +from postprocessor.core.functions.tracks import get_joinable -class MergerParameters(ParametersABC): +class mergerParameters(ParametersABC): """ :param tol: float or int threshold of average (prediction error/std) necessary to consider two tracks the same. If float is fraction of first track, @@ -43,13 +43,18 @@ class MergerParameters(ParametersABC): ) -class Merger(ProcessABC): +class merger(ProcessABC): """ - TODO Integrate functions/tracks.py inside this class? + TODO check why it needs to be run a few times to complete the merging """ def __init__(self, parameters): super().__init__(parameters) def run(self, signal): - merged, joint_pairs = merge_tracks(signal) # , min_len=self.window + 1) + joinable = get_joinable(signal) + # merged, _ = merge_tracks(signal) # , min_len=self.window + 1) + # indices = (*zip(*merged.index.tolist()),) + # names = merged.index.names + # return {name: ids for name, ids in zip(names, indices)} + return joinable diff --git a/core/processes/peaks.py b/core/processes/peaks.py new file mode 100644 index 0000000000000000000000000000000000000000..15ddf950ea1ebb16a6a46177474dbec14489e1f3 --- /dev/null +++ b/core/processes/peaks.py @@ -0,0 +1,44 @@ +from scipy.signal import argrelmax, argrelmin +from postprocessor.core.processes.base import ParametersABC, ProcessABC + + +class PeaksParameters(ParametersABC): + """ + Parameters + type : str {minima, maxima, "all"}. Determines which type of peaks to identify + order : int Parameter to pass to scipy.signal.argrelextrema indicating + how many points to use for comparison. + """ + + def __init__(self, type): + self.type = type + + @classmethod + def default(cls): + return cls.from_dict({"type": "minima", "order": 3}) + + +class Peaks(ProcessABC): + """ + Identifies a signal sharply dropping. + """ + + def __init__(self, parameters: PeaksParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + """ + Returns a boolean dataframe with the same shape as the + original signal but with peaks as true values. 
+ """ + peaks_mat = np.zeros_like(signal, dtype=bool) + + comparator = np.less if self.parameters.type is "minima" else np.greater + peaks_ids = argrelextrema(new_df, comparator=comparator, order=order) + peaks_mat[peak_ids] = True + + return pd.DataFrame( + peaks_mat, + index=signal.index, + columns=signal.columns, + ) diff --git a/core/processes/picker.py b/core/processes/picker.py index 6b0ae14065c311a7c497e86ada74a5bb9da77d1f..3668e92291d9889cd7def4a04d8302c2296bfa98 100644 --- a/core/processes/picker.py +++ b/core/processes/picker.py @@ -10,7 +10,7 @@ from agora.base import ParametersABC, ProcessABC from postprocessor.core.functions.tracks import max_ntps, max_nonstop_ntps -class PickerParameters(ParametersABC): +class pickerParameters(ParametersABC): def __init__( self, condition: Tuple[str, Union[float, int]] = None, @@ -32,7 +32,7 @@ class PickerParameters(ParametersABC): ) -class Picker(ProcessABC): +class picker(ProcessABC): """ :cells: Cell object passed to the constructor :condition: Tuple with condition and associated parameter(s), conditions can be @@ -43,7 +43,7 @@ class Picker(ProcessABC): def __init__( self, - parameters: PickerParameters, + parameters: pickerParameters, cells: CellsHDF, ): super().__init__(parameters=parameters) @@ -93,6 +93,8 @@ class Picker(ProcessABC): def run(self, signals): for alg in self.sequence: + if alg == "condition": + pass self.signals = getattr(self, "pick_by_" + alg)(signals) return self.signals @@ -104,7 +106,7 @@ class Picker(ProcessABC): ): threshold_asint = _as_int(threshold, signals.shape[1]) case_mgr = { - "present": signals.apply(max_ntps, axis=1) > threshold_asint, + "present": signals.notna().sum(axis=1) > threshold_asint, "nonstoply_present": signals.apply(max_nonstop_ntps, axis=1) > threshold_asint, "quantile": [np.quantile(signals.values[signals.notna()], threshold)], @@ -114,5 +116,5 @@ class Picker(ProcessABC): def _as_int(threshold: Union[float, int], ntps: int): if type(threshold) is float: - threshold = threshold / ntps + threshold = ntps * threshold return threshold diff --git a/core/processes/savgol.py b/core/processes/savgol.py new file mode 100644 index 0000000000000000000000000000000000000000..d0256736bb83a26e253302017babe122545d8260 --- /dev/null +++ b/core/processes/savgol.py @@ -0,0 +1,152 @@ +import numpy as np +import pandas as pd + +from postprocessor.core.processes.base import ParametersABC, ProcessABC + + +class savgolParameters(ParametersABC): + """ + Parameters + + window : int (odd) + Window length of datapoints. Must be odd and smaller than x + polynom : int + The order of polynom used. Must be smaller than the window size + """ + + def __init__(self, window, polynom): + self.window = window + self.polynom = polynom + + @classmethod + def default(cls): + return cls.from_dict({"window": 3, "polynom": 2}) + + +class savgol(ProcessABC): + """ + Apply Savitzky-Golay filter (works with NaNs, but it might return + NaN regions). 
+ """ + + def __init__(self, parameters: savgolParameters): + super().__init__(parameters) + + def run(self, signal: pd.DataFrame): + savgol_on_srs = lambda x: self.non_uniform_savgol( + x.index, x.values, self.parameters.window, self.parameters.polynom + ) + return signal.apply(savgol_on_srs, 1) + + @staticmethod + def non_uniform_savgol(x, y, window: int, polynom: int): + """ + Applies a Savitzky-Golay filter to y with non-uniform spacing + as defined in x + + This is based on https://dsp.stackexchange.com/questions/1676/savitzky-golay-smoothing-filter-for-not-equally-spaced-data + The borders are interpolated like scipy.signal.savgol_filter would do + + source: https://dsp.stackexchange.com/a/64313 + + Parameters + ---------- + x : array_like + List of floats representing the x values of the data + y : array_like + List of floats representing the y values. Must have same length + as x + window : int (odd) + Window length of datapoints. Must be odd and smaller than x + polynom : int + The order of polynom used. Must be smaller than the window size + + Returns + ------- + np.array of float + The smoothed y values + """ + if len(x) != len(y): + raise ValueError('"x" and "y" must be of the same size') + + if len(x) < window: + raise ValueError("The data size must be larger than the window size") + + if type(window) is not int: + raise TypeError('"window" must be an integer') + + if window % 2 == 0: + raise ValueError('The "window" must be an odd integer') + + if type(polynom) is not int: + raise TypeError('"polynom" must be an integer') + + if polynom >= window: + raise ValueError('"polynom" must be less than "window"') + + half_window = window // 2 + polynom += 1 + + # Initialize variables + A = np.empty((window, polynom)) # Matrix + tA = np.empty((polynom, window)) # Transposed matrix + t = np.empty(window) # Local x variables + y_smoothed = np.full(len(y), np.nan) + + # Start smoothing + for i in range(half_window, len(x) - half_window, 1): + # Center a window of x values on x[i] + for j in range(0, window, 1): + t[j] = x[i + j - half_window] - x[i] + + # Create the initial matrix A and its transposed form tA + for j in range(0, window, 1): + r = 1.0 + for k in range(0, polynom, 1): + A[j, k] = r + tA[k, j] = r + r *= t[j] + + # Multiply the two matrices + tAA = np.matmul(tA, A) + + # Invert the product of the matrices + tAA = np.linalg.inv(tAA) + + # Calculate the pseudoinverse of the design matrix + coeffs = np.matmul(tAA, tA) + + # Calculate c0 which is also the y value for y[i] + y_smoothed[i] = 0 + for j in range(0, window, 1): + y_smoothed[i] += coeffs[0, j] * y[i + j - half_window] + + # If at the end or beginning, store all coefficients for the polynom + if i == half_window: + first_coeffs = np.zeros(polynom) + for j in range(0, window, 1): + for k in range(polynom): + first_coeffs[k] += coeffs[k, j] * y[j] + elif i == len(x) - half_window - 1: + last_coeffs = np.zeros(polynom) + for j in range(0, window, 1): + for k in range(polynom): + last_coeffs[k] += coeffs[k, j] * y[len(y) - window + j] + + # Interpolate the result at the left border + for i in range(0, half_window, 1): + y_smoothed[i] = 0 + x_i = 1 + for j in range(0, polynom, 1): + y_smoothed[i] += first_coeffs[j] * x_i + x_i *= x[i] - x[half_window] + + # Interpolate the result at the right border + for i in range(len(x) - half_window, len(x), 1): + y_smoothed[i] = 0 + x_i = 1 + for j in range(0, polynom, 1): + y_smoothed[i] += last_coeffs[j] * x_i + x_i *= x[i] - x[-half_window - 1] + + return y_smoothed diff --git 
diff --git a/core/processes/template.py b/core/processes/template.py
new file mode 100644
index 0000000000000000000000000000000000000000..075f21b2a91993958829a09636c20acb7ccce412
--- /dev/null
+++ b/core/processes/template.py
@@ -0,0 +1,29 @@
+from postprocessor.core.processes.base import ParametersABC, ProcessABC
+from postprocessor.core.functions.tracks import clean_tracks, merge_tracks, join_tracks
+
+
+class ParametersTemplate(ParametersABC):
+    """
+    Parameters
+    """
+
+    def __init__(
+        self,
+    ):
+        super().__init__()
+
+    @classmethod
+    def default(cls):
+        return cls.from_dict({})
+
+
+class ProcessTemplate(ProcessABC):
+    """
+    Template for process class.
+    """
+
+    def __init__(self, parameters: ParametersTemplate):
+        super().__init__(parameters)
+
+    def run(self):
+        pass
diff --git a/core/processor.py b/core/processor.py
index d51a29108eefa45325b9371e1664a0c2e73b6f3e..4ccea664032b12650b40983d67a212ce3d1f22cf 100644
--- a/core/processor.py
+++ b/core/processor.py
@@ -1,30 +1,37 @@
+import h5py
 from typing import List, Dict, Union
+from pydoc import locate
+from itertools import takewhile
 
+import numpy as np
 import pandas as pd
 
 from agora.base import ParametersABC
-from postprocessor.core.processes.merger import MergerParameters, Merger
-from postprocessor.core.processes.picker import PickerParameters, Picker
-from postprocessor.core.io.writer import Writer
-from postprocessor.core.io.signal import Signal
+from postprocessor.core.processes.merger import merger, mergerParameters
+from postprocessor.core.processes.picker import picker, pickerParameters
+from core.io.writer import Writer
+from core.io.signal import Signal
 from core.cells import Cells
 
 
-class PostProParameters(ParametersABC):
+class PostProcessorParameters(ParametersABC):
     """
     Anthology of parameters used for postprocessing
     :merger:
     :picker: parameters for picker
-    :processes: List of processes that can be found in ./processes
-    :datasets: Dictionary
-    """
+    :processes: Dict of {process: [objectives]}; 'processes' are defined in ./processes/,
+    while objectives are relative or absolute paths to datasets. If relative, the
+    post-processed addresses are used.
 
-    def __init__(self, merger=None, picker=None, processes=[], datasets=None):
-        self.merger: MergerParameters = merger
-        self.picker: PickerParameters = picker
-        self.processes: List = processes
+    """
 
-        self.datasets: Dict = datasets
+    def __init__(
+        self,
+        targets={},
+        parameters={},
+        outpaths={},
+    ):
+        self.targets: Dict = targets
+        self.parameters: Dict = parameters
+        self.outpaths: Dict = outpaths
 
     def __getitem__(self, item):
         return getattr(self, item)
@@ -33,13 +40,23 @@ class PostProParameters(ParametersABC):
     def default(cls, kind=None):
         if kind == "defaults" or kind == None:
             return cls(
-                merger=MergerParameters.default(),
-                picker=PickerParameters.default(),
-                datasets={
-                    "merger": "/extraction/general/None/area",
-                    "picker": "/extraction/general/None/area",
-                    "processes": [],
+                targets={
+                    "prepost": {
+                        "merger": "/extraction/general/None/area",
+                        "picker": ["/extraction/general/None/area"],
+                    },
+                    "processes": {
+                        "dsignal": ["/extraction/general/None/area"],
+                        # "savgol": ["/extraction/general/None/area"],
+                    },
                 },
+                parameters={
+                    "prepost": {
+                        "merger": mergerParameters.default(),
+                        "picker": pickerParameters.default(),
+                    }
+                },
+                outpaths={},
             )
 
     def to_dict(self):
@@ -49,24 +66,119 @@ class PostProcessor:
     def __init__(self, filename, parameters):
         self.parameters = parameters
-        self._signals = Signal(filename)
+        self._filename = filename
+        self._signal = Signal(filename)
         self._writer = Writer(filename)
 
-        self.datasets = parameters["datasets"]
-        self.merger = Merger(parameters["merger"])
-        self.picker = Picker(
-            parameters=parameters["picker"], cells=Cells.from_source(filename)
+        # self.outpaths = parameters["outpaths"]
+        self.merger = merger(parameters["parameters"]["prepost"]["merger"])
+
+        self.picker = picker(
+            parameters=parameters["parameters"]["prepost"]["picker"],
+            cells=Cells.from_source(filename),
         )
-        self.processes = [
-            self.get_process(process) for process in parameters["processes"]
-        ]
+        self.classfun = {
+            process: self.get_process(process)
+            for process in parameters["targets"]["processes"]
+        }
+        self.parameters_classfun = {
+            process: self.get_parameters(process)
+            for process in parameters["targets"]["processes"]
+        }
+        self.targets = parameters["targets"]
+
+    @staticmethod
+    def get_process(process):
+        """
+        Dynamically import a process class from the 'processes' folder.
+        Assumes process filename and class name are the same
+        """
+        return locate("postprocessor.core.processes." + process + "." + process)
+
+    @staticmethod
+    def get_parameters(process):
+        """
+        Dynamically import a process class from the 'processes' folder.
+        Assumes process filename and class name are the same
+        """
+        return locate(
+            "postprocessor.core.processes." + process + "." + process + "Parameters"
+        )
+
+    def run_prepost(self):
+        """Important processes run before normal post-processing ones"""
+        merge_events = self.merger.run(self._signal[self.targets["prepost"]["merger"]])
+
+        with h5py.File(self._filename, "r") as f:
+            prev_idchanges = self._signal.get_merges()
+
+        changes_history = list(prev_idchanges) + [np.array(x) for x in merge_events]
+        self._writer.write("modifiers/merges", data=changes_history)
+
+        picks = self.picker.run(self._signal[self.targets["prepost"]["picker"][0]])
+        self._writer.write("modifiers/picks", data=picks)
 
     def run(self):
-        self.merger.run(self._signals[self.datasets["merger"]])
-        self.picker.run(self._signals[self.datasets["picker"]])
-        for process, dataset in zip(self.processes, self.datasets["processes"]):
-            process_result = process.run(self._signals.get_dataset(dataset))
-            self.writer.write(process_result, dataset)
+        self.run_prepost()
+
+        for process, datasets in self.targets["processes"].items():
+            if process in self.parameters["parameters"].get(
+                "processes", {}
+            ):  # If we assigned parameters
+                parameters = self.parameters_classfun[process](
+                    self.parameters["parameters"]["processes"][process]
+                )
+
+            else:
+                parameters = self.parameters_classfun[process].default()
+
+            loaded_process = self.classfun[process](parameters)
+            for dataset in datasets:
+                if isinstance(dataset, list):  # multisignal process
+                    signal = [self._signal[d] for d in dataset]
+                elif isinstance(dataset, str):
+                    signal = self._signal[dataset]
+                else:
+                    raise TypeError("Incorrect dataset")
+
+                result = loaded_process.run(signal)
+
+                if process in self.parameters.to_dict()["outpaths"]:
+                    outpath = self.parameters.to_dict()["outpaths"][process]
+                elif isinstance(dataset, list):
+                    # If no outpath defined, place the result in the minimum common
+                    # branch of all signals used
+                    prefix = "".join(
+                        c[0]
+                        for c in takewhile(
+                            lambda x: all(x[0] == y for y in x), zip(*dataset)
+                        )
+                    )
+                    outpath = (
+                        prefix
+                        + "_".join(  # TODO check that it always finishes in '/'
+                            [d[len(prefix) :].replace("/", "_") for d in dataset]
+                        )
+                    )
+                elif isinstance(dataset, str):
+                    outpath = dataset[1:].replace("/", "_")
+                else:
+                    raise TypeError(f"Outpath not defined for {type(dataset)}")
+
+                if isinstance(result, dict):  # Multiple Signals as output
+                    for k, v in result.items():
+                        self.write_result(
+                            "/postprocessing/" + process + "/" + outpath + f"/{k}",
+                            v,
+                            metadata={},
+                        )
+                else:
+                    self.write_result(
+                        "/postprocessing/" + process + "/" + outpath, result, metadata={}
+                    )
+
+    def write_result(
+        self, path: str, result: Union[List, pd.DataFrame, np.ndarray], metadata: Dict
+    ):
+        self._writer.write(path, result, meta=metadata)
 
 
 def _if_dict(item):
diff --git a/examples/basic_processes.py b/examples/basic_processes.py
new file mode 100644
index 0000000000000000000000000000000000000000..c322fd4b68b2dd4f310a44bcd2a4ccbebb366f48
--- /dev/null
+++ b/examples/basic_processes.py
@@ -0,0 +1,15 @@
+from postprocessor.core.processor import PostProcessor, PostProcessorParameters
+
+params = PostProcessorParameters.default()
+pp = PostProcessor(
+    "/shared_libs/pipeline-core/scripts/pH_calibration_dual_phl__ura8__by4741__01/ph_5_29_025store.h5",
+    params,
+)
+tmp = pp.run()
+
+import h5py
+
+# f = h5py.File(
+#     "/shared_libs/pipeline-core/scripts/pH_calibration_dual_phl__ura8__by4741__01/ph_5_29_025store.h5",
+#     "a",
+# )
diff --git a/examples/group.py b/examples/group.py
new file mode 100644
index 0000000000000000000000000000000000000000..1e59978acca5cd34c7796cb65d93d95d1657773c
--- /dev/null
+++ b/examples/group.py
@@ -0,0 +1,21 @@
+from pathlib import Path
+
+from postprocessor.core.group import GroupParameters, Group
+
+poses = [
+    x.name.split("store")[0]
+    for x in Path(
+        "/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01"
+    ).rglob("*")
+    if x.name != "images.h5"
+]
+
+gr = Group(
+    GroupParameters(
+        signals=["/extraction/general/None/area", "/extraction/mCherry/np_max/median"]
+    )
+)
+gr.run(
+    central_store="/shared_libs/pipeline-core/scripts/data/ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01",
+    poses=poses,
+)
diff --git a/examples/testing.py b/examples/testing.py
index 5a0b65168e83192e5c6abe6ac7e9f54580dbe12c..d5420d8337710e6a41f47a7cec436ed2a765b5f0 100644
--- a/examples/testing.py
+++ b/examples/testing.py
@@ -12,6 +12,7 @@ import compress_pickle
 
 from postprocessor.core.postprocessor import PostProcessor
 from postprocessor.core.tracks import get_avg_grs, non_uniform_savgol
+from postprocessor.core.tracks import clean_tracks, merge_tracks, join_tracks
 from postprocessor.core.ph import *
 
 sns.set_context("talk", font_scale=1.8)
@@ -68,8 +69,6 @@ def get_clean_dfs(dfs=None):
         clean_dfs = compress_pickle.load("/home/alan/Documents/tmp/clean_dfs.gz")
 
     else:
-        from postprocessor.core.tracks import clean_tracks, merge_tracks, join_tracks
-
         # Clean timelapse
         clean = clean_tracks(new_dfs[("general", None, "area")])
         tra, joint = merge_tracks(clean)
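Not part of the patch: a worked example of how PostProcessor.run derives the output path for a multi-signal process (with the corrected prefix expression above), using the default dataset paths from this patch.

# Illustrative sketch only
from itertools import takewhile

dataset = ["/extraction/general/None/area", "/extraction/mCherry/np_max/median"]
prefix = "".join(
    c[0] for c in takewhile(lambda x: all(x[0] == y for y in x), zip(*dataset))
)
outpath = prefix + "_".join(d[len(prefix):].replace("/", "_") for d in dataset)
print(prefix)   # '/extraction/'
print(outpath)  # '/extraction/general_None_area_mCherry_np_max_median'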