import bottleneck as bn
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class dsignalParameters(ParametersABC):
"""
:window: number of timepoints in the moving window.
:min_count: minimum number of non-NaN timepoints required within the window to compute a mean (otherwise NaN).
"""
_defaults = {"window": 10, "min_count": 5}
class dsignal(PostProcessABC):
"""
Calculate the change in a signal using the mean of a moving window.
"""
def __init__(self, parameters: dsignalParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
if signal.shape[1] > self.parameters.window:
matrix = np.diff(
bn.move_mean(
signal,
window=self.parameters.window,
min_count=self.parameters.min_count,
axis=1,
),
axis=1,
)
# Pad values to keep the same signal shape
matrix = np.pad(matrix, ((0, 0), (0, 1)), constant_values=np.nan)
else:
matrix = np.full_like(signal, np.nan)
return pd.DataFrame(matrix, index=signal.index, columns=signal.columns)
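# A minimal usage sketch of `dsignal` (not part of the original module). The toy
# data are mine, and I assume `.default()` builds parameters from `_defaults`, as
# it is used elsewhere in this codebase.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    # three cells, twenty time points of random values
    toy_signal = pd.DataFrame(np.random.rand(3, 20))
    dsig = dsignal(dsignalParameters.default()).run(toy_signal)
    # same shape as the input; the last column is padded with NaN
    print(dsig.shape)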
from collections import namedtuple
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from scipy.signal import periodogram
from postprocessor.core.abc import PostProcessABC
class fftParameters(ParametersABC):
"""
Parameters for the 'fft' process.
Attributes
----------
sampling_period: float
Sampling period of measurement values, in unit time.
oversampling_factor: float
Oversampling factor, which indicates how many times a signal is
sampled over the Nyquist rate. For example, if the oversampling
factor is 2, the signal is sampled at 2 times the Nyquist rate.
"""
_defaults = {
"sampling_period": 5,
"oversampling_factor": 1,
}
class fft(PostProcessABC):
"""
Process to estimate power spectral density (classical/Fourier).
Methods
-------
classical_periodogram(timeseries: array_like, sampling_period: float,
oversampling_factor: float)
Estimates the power spectral density using a periodogram.
run(signal: pd.DataFrame)
Estimates the power spectral density of a dataframe of time series.
"""
def __init__(self, parameters: fftParameters):
super().__init__(parameters)
def classical_periodogram(
self,
timeseries,
sampling_period,
oversampling_factor,
):
"""
Estimates the power spectral density using a periodogram.
This is based on a fast Fourier transform.
The power spectrum is normalised so that the area under the power
spectrum is constant across different time series, thus allowing users
to easily compare spectra across time series. See:
* Scargle (1982). Astrophysical Journal 263 p. 835-853
* Glynn et al (2006). Bioinformatics 22(3) 310-316
Parameters
----------
timeseries: array_like
Time series of measurement values.
sampling_period: float
Sampling period of measurement values, in unit time.
oversampling_factor: float
Oversampling factor, which indicates how many times a signal is
sampled over the Nyquist rate. For example, if the oversampling
factor is 2, the signal is sampled at 2 times the Nyquist rate.
Returns
-------
freqs: ndarray
Array of sample frequencies, unit time-1.
power: ndarray
Power spectral density or power spectrum of the time series,
arbitrary units.
"""
freqs, power = periodogram(
timeseries,
fs=1 / (oversampling_factor * sampling_period),
nfft=len(timeseries) * oversampling_factor,
detrend="constant",
return_onesided=True,
scaling="spectrum",
)
# Required to correct units; units will be expressed in min-1 (or any other
# unit)
freqs = oversampling_factor * freqs
# Normalisation (Scargle, 1982; Glynn et al., 2006)
power = power * (0.5 * len(timeseries))
# Normalisation by variance to allow comparing different time series
# (Scargle, 1982)
power = power / np.var(timeseries, ddof=1)
return freqs, power
def run(self, signal: pd.DataFrame):
"""
Estimates the power spectral density of a dataframe of time series.
This uses the classical periodogram.
All NaNs are dropped because the Fourier transform used cannot handle
missing time points or unevenly spaced time points. For time series with
missing values, the Lomb-Scargle periodogram (processes/lsp.py) is
recommended instead.
Parameters
----------
signal: pandas.DataFrame
Time series, with rows indicating individual time series (e.g. from
each cell), and columns indicating time points.
Returns
-------
freqs_df: pandas.DataFrame
Sample frequencies from each time series, with labels preserved from
'signal'.
power_df: pandas.DataFrame
Power spectrum from each time series, with labels preserved from
'signal'.
"""
FftAxes = namedtuple("FftAxes", ["freqs", "power"])
# Each element in this list is a named tuple: (freqs, power)
axes = [
FftAxes(
*self.classical_periodogram(
timeseries=signal.iloc[row_index, :].dropna().values,
sampling_period=self.sampling_period,
oversampling_factor=self.oversampling_factor,
)
)
for row_index in range(len(signal))
]
freqs_df = pd.DataFrame(
[element.freqs for element in axes], index=signal.index
)
power_df = pd.DataFrame(
[element.power for element in axes], index=signal.index
)
return freqs_df, power_df
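# A minimal usage sketch of `fft` (not part of the original module). The toy sine
# wave, sampled every 5 time units to match the default sampling_period, is mine.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    t = np.arange(0, 500, 5)  # 100 time points at the default sampling period of 5
    toy = pd.DataFrame([np.sin(2 * np.pi * t / 100), np.cos(2 * np.pi * t / 100)])
    freqs_df, power_df = fft(fftParameters.default()).run(toy)
    # the power of the first series should peak near 1/100 = 0.01 per unit time
    peak = freqs_df.iloc[0, power_df.iloc[0].values.argmax()]
    print(peak)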
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from scipy.signal import find_peaks
from postprocessor.core.abc import PostProcessABC
class findpeaksParameters(ParametersABC):
"""
Parameters for the 'findpeaks' process.
Attributes
----------
distance: number
Required minimal horizontal distance (>= 1) in samples between
neighbouring peaks. Smaller peaks are removed first until the condition
is fulfilled for all remaining peaks.
prominence: number or ndarray or sequence
Required prominence of peaks. Either a number, None, an array matching x
or a 2-element sequence of the former. The first element is always
interpreted as the minimal and the second, if supplied, as the maximal
required prominence.
"""
_defaults = {
"height": None,
"threshold": None,
"distance": 10,
"prominence": 0.035,
"width": None,
"wlen": None,
"rel_height": 0.5,
"plateau_size": None,
}
class findpeaks(PostProcessABC):
"""
Process to find peaks inside a signal.
Methods
-------
_find_peaks_mask(timeseries: sequence, distance: number, prominence: number
or ndarray or sequence)
Find peaks of a time series and returns a binary mask locating these peaks
run(signal: pd.DataFrame)
Find peaks in a dataframe of time series.
"""
def __init__(self, parameters: findpeaksParameters):
super().__init__(parameters)
def _find_peaks_mask(
self,
timeseries,
height,
threshold,
distance,
prominence,
width,
wlen,
rel_height,
plateau_size,
):
"""Find peaks of a time series and returns a binary mask locating these peaks
Parameters
----------
timeseries : sequence
Time series with peaks.
distance : number
Required minimal horizontal distance in samples between neighbouring
peaks.
prominence : number or ndarray or sequence
Required prominence of peaks.
"""
peak_indices = find_peaks(
timeseries,
height=height,
threshold=threshold,
distance=distance,
prominence=prominence,
width=width,
wlen=wlen,
rel_height=rel_height,
plateau_size=plateau_size,
)[0]
mask = np.zeros(len(timeseries), dtype=int)
mask[peak_indices] = 1
return mask
def run(self, signal: pd.DataFrame):
"""Find peaks in a dataframe of time series.
Find peaks of a dataframe of time series. This function is effectively a
wrapper for scipy.signal.find_peaks.
Parameters
----------
signal : pd.DataFrame
Time series, with rows indicating individual time series (e.g. from
each cell), and columns indicating time points.
"""
mask_df = signal.apply(
lambda x: self._find_peaks_mask(
x,
height=self.height,
threshold=self.threshold,
distance=self.distance,
prominence=self.prominence,
width=self.width,
wlen=self.wlen,
rel_height=self.rel_height,
plateau_size=self.plateau_size,
),
axis=1,
result_type="expand",
)
mask_df.columns = signal.columns
return mask_df
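# A minimal usage sketch of `findpeaks` (not part of the original module); the toy
# oscillating signal is mine.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    t = np.arange(100)
    toy = pd.DataFrame([np.sin(2 * np.pi * t / 25)])
    mask_df = findpeaks(findpeaksParameters.default()).run(toy)
    # columns of the binary mask flagged as peaks (roughly every 25 time points)
    print(mask_df.values.nonzero()[1])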
"""Gaussian process fit of a Signal."""
import logging
import gaussianprocessderivatives as gp
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
def estimate_gr(volume, dt, noruns, bounds, verbose):
"""
Parameters
----------
volume : pd.Series
The volume series of a given cell
dt : float
The time interval in hours
noruns : int
The number of runs for optimisation
bounds : dict
The hyperparameter bounds used for optimisation
verbose : bool
If True, prints results
Returns
-------
results: dict
    Dictionary of arrays, NaN-padded to the input length: the fitted times
    and volumes, the estimated growth rate, its time derivative, and their
    variances.
"""
volume = volume.values
n = len(volume)
idx = np.arange(n)
t = idx * dt
y = volume[volume > 0]
x = t[volume > 0]
idx = idx[volume > 0]
# Fit the GP
mg = gp.maternGP(bounds, x, y)
mg.findhyperparameters(noruns=noruns)
if verbose:
mg.results() # Prints out the hyperparameters
mg.predict(x, derivs=2) # Saves the predictions to object
# Save the predictions to a csv file so they can be reused
results = dict(
time=mg.x,
volume=mg.y,
fit_time=mg.xnew,
fit_volume=mg.f,
growth_rate=mg.df,
d_growth_rate=mg.ddf,
volume_var=mg.fvar,
growth_rate_var=mg.dfvar,
d_growth_rate_var=mg.ddfvar,
)
for name, value in results.items():
results[name] = np.full((n,), np.nan)
results[name][idx] = value
return results
# Give that to a writer: NOTE the writer needs to learn how to write the
# output of a process that returns multiple signals like this one does.
class gpsignalParameters(ParametersABC):
"""
Parameters
----------
dt : float
The time step between time points, in minutes
noruns : int
The number of times the optimisation is tried
bounds : dict
Hyperparameter bounds for the Matern Kernel
verbose : bool
Determines whether to print hyperparameter results
"""
_defaults = dict(
dt=5,
noruns=5,
bounds={0: (-2, 3), 1: (-2, 1), 2: (-4, -1)},
verbose=False,
)
class gpsignal(PostProcessABC):
"""Gaussian process fit of a Signal."""
def __init__(self, parameters: gpsignalParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
results = signal.apply(
lambda x: estimate_gr(x, **self.parameters.to_dict()),
result_type="expand",
axis=1,
)
multi_signal = {
name: pd.DataFrame(
np.vstack(results[name]),
index=signal.index,
columns=signal.columns,
)
for name in results.columns
}
return multi_signal
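# A minimal usage sketch of `gpsignal` (not part of the original module). The toy
# exponentially growing volume traces are mine; running this requires the
# gaussianprocessderivatives package imported above.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    t = np.arange(20)
    toy_volumes = pd.DataFrame(
        [50 * np.exp(0.01 * t), 40 * np.exp(0.02 * t)]
    )
    multi_signal = gpsignal(gpsignalParameters.default()).run(toy_volumes)
    # one DataFrame per output, e.g. the estimated growth rates
    print(multi_signal["growth_rate"].shape)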
#!/usr/bin/env jupyter
import pandas as pd
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class interpolateParameters(ParametersABC):
"""
Parameters
----------
limit_area: str
    Passed to pandas.DataFrame.interpolate; "inside" fills only NaNs
    surrounded by valid values.
"""
_defaults = {"limit_area": "inside"}
class interpolate(PostProcessABC):
"""
Interpolate process.
"""
def __init__(self, parameters: interpolateParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
if len(signal):
signal = signal.interpolate(limit_area=self.parameters.limit_area, axis=1)
return signal
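# A minimal usage sketch of `interpolate` (not part of the original module); the
# toy row with interior and edge NaNs is mine.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    toy = pd.DataFrame([[np.nan, 1.0, np.nan, 3.0, np.nan]])
    filled = interpolate(interpolateParameters.default()).run(toy)
    # only the interior NaN is filled: [nan, 1.0, 2.0, 3.0, nan]
    print(filled.values)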
#!/usr/bin/env python3
import igraph as ig
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from sklearn.metrics.pairwise import euclidean_distances
from postprocessor.core.abc import PostProcessABC
class knngraphParameters(ParametersABC):
"""
Parameters for the 'knngraph' process.
"""
_defaults = {
"n_neighbors": 10,
}
class knngraph(PostProcessABC):
"""
Process to get geometric graph from data/features, based on k-nearest
neighbours.
Methods
-------
run(signal: pd.DataFrame)
Get graph.
"""
def __init__(self, parameters: knngraphParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
"""Get graph.
Parameters
----------
signal : pd.DataFrame
Feature matrix.
"""
distance_matrix = euclidean_distances(signal)
distance_matrix_pruned = graph_prune(
distance_matrix, self.n_neighbors
)
graph = ig.Graph.Weighted_Adjacency(
distance_matrix_pruned.tolist(), mode="undirected"
)
graph.vs["strain"] = signal.index.get_level_values("strain")
return graph
def graph_prune(distanceMatrix, neighbours):
"""
Prune a complete graph (input distance matrix), keeping at least a
specified number of neighbours for each node.
Parameters
----------
distanceMatrix: 2D numpy array
neighbours: int
Returns
-------
Dij_pruned: 2D numpy array
    Distance matrix of the pruned graph.
"""
Dij_temp = distanceMatrix
Adj = np.zeros(distanceMatrix.shape)
for ii in range(distanceMatrix.shape[0]):
idx = np.argsort(Dij_temp[ii, :])
Adj[ii, idx[1]] = 1
Adj[idx[1], ii] = 1
for jj in range(neighbours):
Adj[ii, idx[jj]] = 1
Adj[idx[jj], ii] = 1
Dij_pruned = Dij_temp * Adj
return Dij_pruned
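# A minimal usage sketch of `graph_prune` (not part of the original module); the
# toy points and distance matrix are mine.
if __name__ == "__main__":
    import numpy as np

    rng = np.random.default_rng(0)
    points = rng.random((6, 2))
    dmat = np.linalg.norm(points[:, None] - points[None, :], axis=-1)
    pruned = graph_prune(dmat, neighbours=2)
    # non-zero entries per node: each node keeps at least its nearest
    # neighbours (symmetrised); all other edges are zeroed
    print((pruned > 0).sum(axis=1))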
from itertools import product
import igraph as ig
import leidenalg
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class leidenParameters(ParametersABC):
"""
Parameters
"""
_defaults = {}
class leiden(PostProcessABC):
"""
Leiden clustering algorithm applied to a dataframe of features.
"""
def __init__(self, parameters: leidenParameters):
super().__init__(parameters)
def run(self, features: pd.DataFrame):
# Generate euclidean distance matrix
distances = np.linalg.norm(
features.values - features.values[:, None], axis=2
)
ind = [
"_".join([str(y) for y in x[1:]])
for x in features.index.to_flat_index()
]
source, target = zip(*product(ind, ind))
df = pd.DataFrame(
{
"source": source,
"target": target,
"distance": distances.flatten(),
}
)
df = df.loc[df["source"] != df["target"]]
g = ig.Graph.DataFrame(df, directed=False)
return leidenalg.find_partition(g, leidenalg.ModularityVertexPartition)
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from scipy.signal import savgol_filter
from postprocessor.core.abc import PostProcessABC
class savgolParameters(ParametersABC):
"""
Parameters
----------
window : int (odd)
    Window length in data points. Must be odd and smaller than the size of x.
polynom : int
    The order of the polynomial used. Must be smaller than the window size.
"""
_defaults = {"window": 7, "polynom": 3}
class savgol(PostProcessABC):
"""
Apply Savitzky-Golay filter (works with NaNs, but it might return
NaN regions).
"""
def __init__(self, parameters: savgolParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
try:
post_savgol = pd.DataFrame(
savgol_filter(
signal, self.parameters.window, self.parameters.polynom
),
index=signal.index,
columns=signal.columns,
)
except Exception as e:
print(e)
def savgol_on_srs(x):
return non_uniform_savgol(
x.index,
x.values,
self.parameters.window,
self.parameters.polynom,
)
post_savgol = signal.apply(savgol_on_srs, 1).apply(pd.Series)
return post_savgol
def non_uniform_savgol(x, y, window: int, polynom: int):
"""
Applies a Savitzky-Golay filter to y with non-uniform spacing, as defined in x.
This is based on
https://dsp.stackexchange.com/questions/1676/savitzky-golay-smoothing-filter-for-not-equally-spaced-data
The borders are interpolated as scipy.signal.savgol_filter would do.
Source: https://dsp.stackexchange.com/a/64313
Parameters
----------
x : array_like
List of floats representing the x values of the data
y : array_like
List of floats representing the y values. Must have same length
as x
window : int (odd)
    Window length in data points. Must be odd and smaller than the size of x.
polynom : int
    The order of the polynomial used. Must be smaller than the window size.
Returns
-------
np.array of float
The smoothed y values
"""
_raiseif(
len(x) != len(y),
'"x" and "y" must be of the same size',
ValueError,
)
_raiseif(
len(x) < window,
"The data size must be larger than the window size",
ValueError,
)
_raiseif(
not isinstance(window, int),
'"window" must be an integer',
TypeError,
)
_raiseif(window % 2 == 0, 'The "window" must be an odd integer', ValueError)
_raiseif(
not isinstance(polynom, int),
'"polynom" must be an integer',
TypeError,
)
_raiseif(
polynom >= window,
'"polynom" must be less than "window"',
ValueError,
)
half_window = window // 2
polynom += 1
# Initialize variables
A = np.empty((window, polynom)) # Matrix
tA = np.empty((polynom, window)) # Transposed matrix
t = np.empty(window) # Local x variables
y_smoothed = np.full(len(y), np.nan)
# Start smoothing
for i in range(half_window, len(x) - half_window, 1):
# Center a window of x values on x[i]
for j in range(0, window, 1):
t[j] = x[i + j - half_window] - x[i]
# Create the initial matrix A and its transposed form tA
for j in range(0, window, 1):
r = 1.0
for k in range(0, polynom, 1):
A[j, k] = r
tA[k, j] = r
r *= t[j]
# Multiply the two matrices
tAA = np.matmul(tA, A)
# Invert the product of the matrices
tAA = np.linalg.inv(tAA)
# Calculate the pseudoinverse of the design matrix
coeffs = np.matmul(tAA, tA)
# Calculate c0 which is also the y value for y[i]
y_smoothed[i] = 0
for j in range(0, window, 1):
y_smoothed[i] += coeffs[0, j] * y[i + j - half_window]
# If at the end or beginning, store all coefficients for the polynom
if i == half_window:
first_coeffs = np.zeros(polynom)
for j in range(0, window, 1):
for k in range(polynom):
first_coeffs[k] += coeffs[k, j] * y[j]
elif i == len(x) - half_window - 1:
last_coeffs = np.zeros(polynom)
for j in range(0, window, 1):
for k in range(polynom):
last_coeffs[k] += coeffs[k, j] * y[len(y) - window + j]
# Interpolate the result at the left border
for i in range(0, half_window, 1):
y_smoothed[i] = 0
x_i = 1
for j in range(0, polynom, 1):
y_smoothed[i] += first_coeffs[j] * x_i
x_i *= x[i] - x[half_window]
# Interpolate the result at the right border
for i in range(len(x) - half_window, len(x), 1):
y_smoothed[i] = 0
x_i = 1
for j in range(0, polynom, 1):
y_smoothed[i] += last_coeffs[j] * x_i
x_i *= x[i] - x[-half_window - 1]
return y_smoothed
def _raiseif(cond, msg="", exc=AssertionError):
if cond:
raise exc(msg)
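# A minimal usage sketch of `non_uniform_savgol` (not part of the original module);
# the unevenly spaced toy samples are mine.
if __name__ == "__main__":
    import numpy as np

    rng = np.random.default_rng(0)
    x = np.sort(rng.uniform(0, 10, 50))          # unevenly spaced sample times
    y = np.sin(x) + rng.normal(0, 0.1, size=50)  # noisy signal
    y_smooth = non_uniform_savgol(x, y, window=7, polynom=3)
    print(y_smooth[:5])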
#!/usr/bin/env python
import pandas as pd
from sklearn.preprocessing import StandardScaler
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class standardscalerParameters(ParametersABC):
"""
Parameters for the 'standardscaler' process.
"""
_defaults = {}
class standardscaler(PostProcessABC):
"""
Process to scale a DataFrame of a signal using the standard scaler.
Methods
-------
run(signal: pd.DataFrame)
Scale values in a dataframe of time series.
"""
def __init__(self, parameters: standardscalerParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
"""Scale values in a dataframe of time series.
Scale values in a dataframe of time series. This function is effectively a
wrapper for sklearn.preprocessing.StandardScaler.
Parameters
----------
signal : pd.DataFrame
Time series, with rows indicating individual time series (e.g. from
each cell), and columns indicating time points.
"""
signal_array = signal.to_numpy()
scaler = StandardScaler().fit(signal_array.transpose())
signal_scaled_array = scaler.transform(signal_array.transpose())
signal_scaled_array = signal_scaled_array.transpose()
signal_scaled = pd.DataFrame(
signal_scaled_array, columns=signal.columns, index=signal.index
)
return signal_scaled
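# A minimal usage sketch of `standardscaler` (not part of the original module); the
# toy time series with different scales are mine.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    toy = pd.DataFrame(np.random.rand(3, 20) * np.array([[1.0], [10.0], [100.0]]))
    scaled = standardscaler(standardscalerParameters.default()).run(toy)
    # each time series (row) now has mean ~0 and unit variance
    print(scaled.mean(axis=1).round(6).tolist())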
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class TemplateParameters(ParametersABC):
"""
Parameters
"""
_defaults = {}
class Template(PostProcessABC):
"""
Template for process class.
"""
def __init__(self, parameters: TemplateParameters):
super().__init__(parameters)
def run(self):
pass
#!/usr/bin/env python3
import pandas as pd
import umap
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class umapembeddingParameters(ParametersABC):
"""
Parameters for the 'umapembedding' process.
"""
_defaults = {
"n_neighbors": None,
"min_dist": None,
"n_components": None,
}
class umapembedding(PostProcessABC):
"""
Process to get UMAP embeddings from data/features.
Methods
-------
run(signal: pd.DataFrame)
Get UMAP embeddings.
"""
def __init__(self, parameters: umapembeddingParameters):
super().__init__(parameters)
def run(self, signal: pd.DataFrame):
"""Get UMAP embeddings.
This function is effectively a wrapper for umap.UMAP.
Parameters
----------
signal : pd.DataFrame
Feature matrix.
"""
mapper = umap.UMAP(
n_neighbors=self.n_neighbors,
min_dist=self.min_dist,
n_components=self.n_components,
).fit(signal)
return mapper.embedding_
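# A minimal usage sketch of `umapembedding` (not part of the original module). The
# toy feature matrix and parameter values are mine: the defaults here are None, so
# concrete values are supplied via from_dict, which is used elsewhere in this
# codebase.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd

    features = pd.DataFrame(np.random.rand(100, 8))
    params = umapembeddingParameters.from_dict(
        {"n_neighbors": 15, "min_dist": 0.1, "n_components": 2}
    )
    embedding = umapembedding(params).run(features)  # ndarray of shape (100, 2)
    print(embedding.shape)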
import typing as t
from itertools import takewhile
import numpy as np
import pandas as pd
from tqdm import tqdm
from agora.abc import ParametersABC, ProcessABC
from agora.io.cells import Cells
from agora.io.signal import Signal
from agora.io.writer import Writer
from agora.utils.indexing import (
_3d_index_to_2d,
_assoc_indices_to_3d,
)
from agora.utils.merge import merge_association
from postprocessor.core.abc import get_parameters, get_process
from postprocessor.core.lineageprocess import (
LineageProcess,
LineageProcessParameters,
)
from postprocessor.core.reshapers.merger import Merger, MergerParameters
from postprocessor.core.reshapers.picker import Picker, PickerParameters
class PostProcessorParameters(ParametersABC):
"""
Anthology of parameters used for post-processing.
Supplies parameters for the merger, the picker, and the processes.
:merger: parameters for merger
:picker: parameters for picker
:processes: list of [process, objectives] pairs, where 'processes' are defined
in ./processes/ and objectives are relative or absolute paths to datasets. If
relative paths are given, the post-processed addresses are used. The order of
processes matters.
"""
def __init__(
self,
targets: t.Dict = {},
param_sets: t.Dict = {},
outpaths: t.Dict = {},
):
self.targets = targets
self.param_sets = param_sets
self.outpaths = outpaths
def __getitem__(self, item):
return getattr(self, item)
@classmethod
def default(cls, kind=[]):
"""
Include buddings and bud_metric and estimates of their time derivatives.
Parameters
----------
kind: list of str
If "ph_batman" included, add targets for experiments using pHlourin.
"""
# each subitem specifies the function to be called and the location
# on the h5 file to be written
targets = {
"prepost": {
"merger": "/extraction/general/None/area",
"picker": ["/extraction/general/None/area"],
},
"processes": [
[
"buddings",
["/extraction/general/None/volume"],
],
[
"dsignal",
[
"/extraction/general/None/volume",
],
],
[
"bud_metric",
[
"/extraction/general/None/volume",
],
],
[
"dsignal",
[
"/postprocessing/bud_metric/extraction_general_None_volume",
],
],
],
}
param_sets = {
"prepost": {
"merger": MergerParameters.default(),
"picker": PickerParameters.default(),
}
}
outpaths = {}
outpaths["aggregate"] = "/postprocessing/experiment_wide/aggregated/"
# pHluorin experiments are special
if "ph_batman" in kind:
targets["processes"].append(
[
"dsignal",
[
"/extraction/em_ratio/np_max/mean",
"/extraction/em_ratio/np_max/median",
"/extraction/em_ratio_bgsub/np_max/mean",
"/extraction/em_ratio_bgsub/np_max/median",
],
]
)
targets["processes"].append(
[
"aggregate",
[
[
"/extraction/em_ratio/np_max/mean",
"/extraction/em_ratio/np_max/median",
"/extraction/em_ratio_bgsub/np_max/mean",
"/extraction/em_ratio_bgsub/np_max/median",
"/extraction/gsum/np_max/median",
"/extraction/gsum/np_max/mean",
]
],
]
)
return cls(targets=targets, param_sets=param_sets, outpaths=outpaths)
class PostProcessor(ProcessABC):
def __init__(self, filename, parameters):
"""
Initialise PostProcessor
Parameters
----------
filename: str or PosixPath
Name of h5 file.
parameters: PostProcessorParameters object
An instance of PostProcessorParameters.
"""
super().__init__(parameters)
self._filename = filename
self._signal = Signal(filename)
self._writer = Writer(filename)
# parameters for merger and picker
dicted_params = {
i: parameters["param_sets"]["prepost"][i]
for i in ["merger", "picker"]
}
for k in dicted_params.keys():
if not isinstance(dicted_params[k], dict):
dicted_params[k] = dicted_params[k].to_dict()
# merger and picker
self.merger = Merger(
MergerParameters.from_dict(dicted_params["merger"])
)
self.picker = Picker(
PickerParameters.from_dict(dicted_params["picker"]),
cells=Cells.from_source(filename),
)
# processes, such as buddings
self.classfun = {
process: get_process(process)
for process, _ in parameters["targets"]["processes"]
}
# parameters for the process in classfun
self.parameters_classfun = {
process: get_parameters(process)
for process, _ in parameters["targets"]["processes"]
}
# locations to be written in the h5 file
self.targets = parameters["targets"]
def run_prepost(self):
"""Using picker, get and write lineages, returning mothers and daughters."""
"""Important processes run before normal post-processing ones"""
record = self._signal.get_raw(self.targets["prepost"]["merger"])
merges = np.array(self.merger.run(record), dtype=int)
self._writer.write(
"modifiers/merges", data=[np.array(x) for x in merges]
)
lineage = _assoc_indices_to_3d(self.picker.cells.mothers_daughters)
lineage_merged = []
if merges.any(): # Update lineages after merge events
merged_indices = merge_association(lineage, merges)
# Remove repeated labels post-merging
lineage_merged = np.unique(merged_indices, axis=0)
self.lineage = _3d_index_to_2d(
lineage_merged if len(lineage_merged) else lineage
)
self._writer.write(
"modifiers/lineage_merged", _3d_index_to_2d(lineage_merged)
)
picked_indices = self.picker.run(
self._signal[self.targets["prepost"]["picker"][0]]
)
if picked_indices.any():
self._writer.write(
"modifiers/picks",
data=pd.MultiIndex.from_arrays(
picked_indices.T,
# names=["trap", "cell_label", "mother_label"],
names=["trap", "cell_label"],
),
overwrite="overwrite",
)
@staticmethod
def pick_mother(a, b):
"""Update the mother id following this priorities:
The mother has a lower id
"""
x = max(a, b)
if min([a, b]):
x = [a, b][np.argmin([a, b])]
return x
def run(self):
"""
Write the results to the h5 file.
Processes include identifying buddings and finding bud metrics.
"""
# run merger, picker, and find lineages
self.run_prepost()
# run processes
for process, datasets in tqdm(self.targets["processes"]):
if process in self.parameters["param_sets"].get("processes", {}):
# parameters already assigned
parameters = self.parameters_classfun[process](
self.parameters[process]
)
else:
# assign parameters
parameters = self.parameters_classfun[process].default()
# load process
loaded_process = self.classfun[process](parameters)
if isinstance(parameters, LineageProcessParameters):
loaded_process.lineage = self.lineage
# apply process to each dataset
for dataset in datasets:
self.run_process(dataset, process, loaded_process)
def run_process(self, dataset, process, loaded_process):
"""Run process on a single dataset and write the result."""
# define signal
if isinstance(dataset, list):
# multisignal process
signal = [self._signal[d] for d in dataset]
elif isinstance(dataset, str):
signal = self._signal[dataset]
else:
raise ("Incorrect dataset")
# run process on signal
if len(signal) and (
not isinstance(loaded_process, LineageProcess)
or len(loaded_process.lineage)
):
result = loaded_process.run(signal)
else:
result = pd.DataFrame(
[], columns=signal.columns, index=signal.index
)
result.columns.names = ["timepoint"]
# define outpath, where result will be written
if process in self.parameters["outpaths"]:
outpath = self.parameters["outpaths"][process]
elif isinstance(dataset, list):
# no outpath is defined
# place the result in the minimum common branch of all signals
prefix = "".join(
c[0]
for c in takewhile(
lambda x: all(x[0] == y for y in x), zip(*dataset)
)
)
outpath = (
prefix
+ "_".join( # TODO check that it always finishes in '/'
[d[len(prefix) :].replace("/", "_") for d in dataset]
)
)
elif isinstance(dataset, str):
outpath = dataset[1:].replace("/", "_")
else:
raise ("Outpath not defined", type(dataset))
# add postprocessing to outpath when required
if process not in self.parameters["outpaths"]:
outpath = "/postprocessing/" + process + "/" + outpath
# write result
if isinstance(result, dict):
# multiple Signals as output
for k, v in result.items():
self.write_result(
outpath + f"/{k}",
v,
metadata={},
)
else:
# a single Signal as output
self.write_result(
outpath,
result,
metadata={},
)
def write_result(
self,
path: str,
result: t.Union[t.List, pd.DataFrame, np.ndarray],
metadata: t.Dict,
):
self._writer.write(path, result, meta=metadata, overwrite="overwrite")
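# A minimal usage sketch of the post-processing pipeline (not from the original
# source); "experiment.h5" is a placeholder for a real aliby h5 file.
if __name__ == "__main__":
    params = PostProcessorParameters.default()
    pp = PostProcessor("experiment.h5", params)
    pp.run()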
#!/usr/bin/env python3
"""
Automatic summarised report to condense the results of an experiment.
It should NOT contain text beyond labels and titles.
The most efficient way to communicate information is (intelligently) colour-coded figures and tables.
The structure page-wise is as follows:
Page:Contents
1: Technical summary
2-4: Summarised signal results
5-End:
Part 1: Extended results
Part 2: Extended technicalities
"""
from agora.abc import ParametersABC, ProcessABC
class Reporter(ProcessABC):
""" """
def __init__(self, parameters: ParametersABC):
super().__init__(parameters)
class ReporterParameters(ParametersABC):
""" """
def __init__(self, technical: dict, results: dict, grouper_kws: dict):
super().__init__()
from itertools import cycle
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
class aggregateParameters(ParametersABC):
"""
Parameters
----------
reductions: list of str
    Reductions (e.g. "mean", "median", "max") applied to the dataframe to collapse across columns.
axis: int
    Axis along which to aggregate.
ranges: list of (start, stop) pairs or None
    Used to label the columns when aggregating along axis 0.
"""
_defaults = {
"reductions": ["mean", "median", "max"],
"axis": 1,
"ranges": None,
}
class aggregate(PostProcessABC):
"""
Aggregate multiple datasets for cell-to-cell feature comparison.
"""
def __init__(self, parameters: aggregateParameters):
super().__init__(parameters)
def run(self, signals):
index = signals[0].index
for s in signals[1:]:
index = index.intersection(s.index)
tmp_signals = [s.loc[index] for s in signals]
for i, s in enumerate(signals):
tmp_signals[i].name = s.name
signals = tmp_signals
assert len(signals), "Signals is empty"
bad_words = {
"postprocessing",
"extraction",
"None",
"np_max",
"",
}
def get_keywords(df):
return [
ind
for item in df.name.split("/")
for ind in item.split("/")
if ind not in bad_words
]
colnames = [
"_".join(get_keywords(s) + [red])
for s in signals
for red in self.parameters.reductions
]
concat = pd.concat(
[
getattr(signal, red)(axis=self.parameters.axis)
for signal in signals
for red in self.parameters.reductions
],
names=signals[0].index.names,
axis=self.parameters.axis,
)
if self.parameters.axis:
concat.columns = colnames
else:
concat.columns = pd.MultiIndex.from_product(
(
colnames,
[
"_".join((str(start), str(stop)))
for x in self.parameters.ranges
for start, stop in x
],
)
)
return concat
import typing as t
from typing import Dict, Tuple
import numpy as np
import pandas as pd
from agora.utils.lineage import mb_array_to_dict
from postprocessor.core.lineageprocess import (
LineageProcess,
LineageProcessParameters,
)
class BudMetricParameters(LineageProcessParameters):
"""
Parameters
"""
_defaults = {"lineage_location": "postprocessing/lineage_merged"}
class BudMetric(LineageProcess):
"""
Requires mother-bud information to create a new dataframe where the indices are mother ids and
values are the daughters' values for a given signal.
"""
def __init__(self, parameters: BudMetricParameters):
super().__init__(parameters)
def run(
self,
signal: pd.DataFrame,
lineage: Dict[pd.Index, Tuple[pd.Index]] = None,
):
if lineage is None:
if hasattr(self, "lineage"):
lineage = self.lineage
else:
assert "mother_label" in signal.index.names
lineage = signal.index.to_list()
return self.get_bud_metric(signal, mb_array_to_dict(lineage))
@staticmethod
def get_bud_metric(
signal: pd.DataFrame, md: Dict[Tuple, Tuple[Tuple]] = None
):
"""
signal: Daughter-inclusive dataframe.
md: Mother-daughters dictionary where each key is a mother's index and each value is a list of daughter indices.
Algorithm:
- Get the fvi (first valid index) for all cells.
- Create an empty matrix.
- For every mother:
  - Get the daughters' sub-dataframe.
  - Sort daughters by cell label.
  - Get the series of fvis.
  - Concatenate the values of these ranges from the dataframe.
- Fill the empty matrix.
- Convert the matrix into a dataframe using the mother indices.
"""
mothers_mat = np.zeros((len(md), signal.shape[1]))
cells_were_dropped = 0 # Flag determines if mothers (1), daughters (2) or both were missing (3)
md_index = signal.index
if (
"mother_label" not in md_index.names
): # Generate mother label from md dict if unavailable
d = {v: k for k, values in md.items() for v in values}
signal["mother_label"] = list(
map(lambda x: d.get(x, [0])[-1], signal.index)
)
signal.set_index("mother_label", append=True, inplace=True)
related_items = set(
[*md.keys(), *[y for x in md.values() for y in x]]
)
md_index = md_index.intersection(related_items)
elif "mother_label" in md_index.names:
md_index = md_index.droplevel("mother_label")
else:
raise ("Unavailable relationship information")
if len(md_index) < len(signal):
print("Dropped cells before bud_metric") # TODO log
signal = (
signal.reset_index("mother_label")
.loc(axis=0)[md_index]
.set_index("mother_label", append=True)
)
names = list(signal.index.names)
del names[-2]
output_df = (
signal.loc[signal.index.get_level_values("mother_label") > 0]
.groupby(names)
.apply(lambda x: _combine_daughter_tracks(x))
)
output_df.columns = signal.columns
output_df["padding_level"] = 0
output_df.set_index("padding_level", append=True, inplace=True)
if len(output_df):
output_df.index.names = signal.index.names
return output_df
def _combine_daughter_tracks(tracks: t.Collection[pd.Series]):
"""
Combine multiple time series of cells into one, overwriting values
prioritising the most recent entity.
"""
sorted_da_ids = tracks.sort_index(level="cell_label")
sorted_da_ids.index = range(len(sorted_da_ids))
tp_fvt = sorted_da_ids.apply(lambda x: x.first_valid_index(), axis=0)
tp_fvt = sorted_da_ids.columns.get_indexer(tp_fvt)
tp_fvt[tp_fvt < 0] = len(sorted_da_ids) - 1
_metric = np.choose(tp_fvt, sorted_da_ids.values)
return pd.Series(_metric, index=tracks.columns)
#!/usr/bin/env python3
import typing as t
from itertools import product
import numpy as np
import pandas as pd
from postprocessor.core.lineageprocess import (
LineageProcess,
LineageProcessParameters,
)
class buddingsParameters(LineageProcessParameters):
"""Parameter class to obtain budding events.
Parameters
----------
LineageProcessParameters : lineage_location
Location of lineage matrix to be used for calculations.
Examples
--------
FIXME: Add docs.
"""
_defaults = {"lineage_location": "postprocessing/lineage_merged"}
class buddings(LineageProcess):
"""
Calculate budding events in a trap, assuming one mother per trap, and
return a pandas DataFrame of budding events.
We define a budding event as the moment in which a bud was identified for
the first time, even if the bud is not considered one until later
in the experiment.
"""
def __init__(self, parameters: buddingsParameters):
super().__init__(parameters)
def run(
self, signal: pd.DataFrame, lineage: np.ndarray = None
) -> pd.DataFrame:
if lineage is None:
lineage = self.lineage
# Get time of first appearance for all cells
fvi = signal.apply(lambda x: x.first_valid_index(), axis=1)
# Select mother cells in a given dataset
traps_mothers: t.Dict[tuple, list] = {
tuple(mo): [] for mo in lineage[:, :2] if tuple(mo) in signal.index
}
for trap, mother, daughter in lineage:
if (trap, mother) in traps_mothers.keys():
traps_mothers[(trap, mother)].append(daughter)
mothers = signal.loc[
set(signal.index).intersection(traps_mothers.keys())
]
# Create a new dataframe with dimensions (n_mother_cells * n_timepoints)
buddings = pd.DataFrame(
np.zeros((mothers.shape[0], signal.shape[1])).astype(bool),
index=mothers.index,
columns=signal.columns,
)
buddings.columns.names = ["timepoint"]
# Fill the budding events
for mother_id, daughters in traps_mothers.items():
daughters_idx = set(
fvi.loc[
fvi.index.intersection(
list(product((mother_id[0],), daughters))
)
].values
).difference({0})
buddings.loc[
mother_id,
daughters_idx,
] = True
return buddings
from agora.abc import ParametersABC
from postprocessor.core.abc import PostProcessABC
from postprocessor.core.functions.tracks import get_joinable
class MergerParameters(ParametersABC):
"""
Define the parameters for merger from a dict.
There are five parameters expected in the dict:
smooth: boolean
    Whether or not to smooth with a savgol_filter.
tolerance: float or int
    The threshold of average prediction error/std necessary to
    consider two tracks the same.
    If float, the threshold is the fraction of the first track;
    if int, the threshold is in absolute units.
window: int
    The size of the window of the savgol_filter.
degree: int
    The order of the polynomial used by the savgol_filter.
min_avg_delta: float
"""
_defaults = {
"smooth": False,
"tolerance": 0.2,
"window": 5,
"degree": 3,
"min_avg_delta": 0.5,
}
class Merger(PostProcessABC):
"""Combine rows of tracklet that are likely to be the same."""
def __init__(self, parameters):
super().__init__(parameters)
def run(self, signal):
joinable = []
if signal.shape[1] > 4:
joinable = get_joinable(
signal,
smooth=self.parameters.smooth,
tol=self.parameters.tolerance,
window=self.parameters.window,
degree=self.parameters.degree,
)
return joinable
import typing as t
import numpy as np
import pandas as pd
from agora.abc import ParametersABC
from agora.io.cells import Cells
from agora.utils.indexing import validate_association
from agora.utils.cast import _str_to_int
from agora.utils.kymograph import drop_mother_label
from postprocessor.core.lineageprocess import LineageProcess
class PickerParameters(ParametersABC):
_defaults = {
"sequence": [
["lineage", "families"],
["condition", "present", 7],
],
}
class Picker(LineageProcess):
"""
:cells: Cell object passed to the constructor.
:condition: Tuple with a condition and its associated parameter(s); conditions can be
"present", "nonstoply_present" or "quantile".
Determines the thresholds or fractions of signals to use.
:lineage: str, one of {"mothers", "daughters", "families" (mothers AND daughters), "orphans"}.
"mothers"/"daughters" pick cells with those tags, "families" picks the union of both,
and "orphans" the difference between the total and families.
"""
def __init__(
self,
parameters: PickerParameters,
cells: t.Optional[Cells] = None,
):
super().__init__(parameters=parameters)
self.cells = cells
def pick_by_lineage(
self,
signal: pd.DataFrame,
how: str,
mothers_daughters: t.Optional[np.ndarray] = None,
) -> pd.MultiIndex:
cells_present = drop_mother_label(signal.index)
mothers_daughters = self.get_lineage_information(signal)
valid_indices = slice(None)
if how == "mothers":
_, valid_indices = validate_association(
mothers_daughters, cells_present, match_column=0
)
elif how == "daughters":
_, valid_indices = validate_association(
mothers_daughters, cells_present, match_column=1
)
elif how == "families": # Mothers and daughters that are still present
_, valid_indices = validate_association(
mothers_daughters, cells_present
)
return signal.index[valid_indices]
def pick_by_condition(self, signal, condition, thresh):
idx = self.switch_case(signal, condition, thresh)
return idx
def run(self, signal):
self.orig_signal = signal
indices = set(signal.index)
lineage = self.get_lineage_information(signal)
if len(lineage):
self.mothers = lineage[:, :2]
self.daughters = lineage[:, [0, 2]]
for alg, *params in self.sequence:
new_indices = tuple()
if indices:
if alg == "lineage":
param1 = params[0]
new_indices = getattr(self, "pick_by_" + alg)(
signal.loc[list(indices)], param1
)
else:
param1, *param2 = params
new_indices = getattr(self, "pick_by_" + alg)(
signal.loc[list(indices)], param1, param2
)
new_indices = [tuple(x) for x in new_indices]
indices = indices.intersection(new_indices)
else:
self._log(f"No lineage assignment")
indices = np.array([])
return np.array([tuple(map(_str_to_int, x)) for x in indices])
def switch_case(
self,
signal: pd.DataFrame,
condition: str,
threshold: t.Union[float, int, list],
):
if len(threshold) == 1:
threshold = [_as_int(*threshold, signal.shape[1])]
case_mgr = {
"any_present": lambda s, thresh: any_present(s, thresh),
"present": lambda s, thresh: s.notna().sum(axis=1) > thresh,
"nonstoply_present": lambda s, thresh: s.apply(thresh, axis=1)
> thresh,
"growing": lambda s, thresh: s.diff(axis=1).sum(axis=1) > thresh,
}
return set(signal.index[case_mgr[condition](signal, *threshold)])
def _as_int(threshold: t.Union[float, int], ntps: int):
if type(threshold) is float:
threshold = ntps * threshold
return threshold
def any_present(signal, threshold):
"""
Return a mask for cells, True if there is a cell in that trap that was present for more than :threshold: timepoints.
"""
any_present = pd.Series(
np.sum(
[
np.isin([x[0] for x in signal.index], i) & v
for i, v in (signal.notna().sum(axis=1) > threshold)
.groupby("trap")
.any()
.items()
],
axis=0,
).astype(bool),
index=signal.index,
)
return any_present
#!/usr/bin/env python3
import re
import typing as t
from abc import ABC, abstractproperty
from collections import Counter
from functools import cached_property as property
from pathlib import Path
from typing import Dict, List, Union
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pathos.multiprocessing import Pool
from postprocessor.chainer import Chainer
class Grouper(ABC):
"""Base grouper class."""
def __init__(self, dir: Union[str, Path]):
"""Find h5 files and load a chain for each one."""
path = Path(dir)
assert path.exists(), f"{str(dir)} does not exist"
self.name = path.name
self.files = list(path.glob("*.h5"))
assert len(self.files), "No valid h5 files in dir"
self.load_chains()
def load_chains(self) -> None:
"""Load a chain for each position, or h5 file."""
self.chainers = {f.name[:-3]: Chainer(f) for f in self.files}
@property
def fsignal(self) -> Chainer:
"""Get first chain."""
return list(self.chainers.values())[0]
@property
def ntimepoints(self) -> int:
"""Find number of time points."""
return max([s.ntimepoints for s in self.chainers.values()])
@property
def tintervals(self) -> float:
"""Find the maximum time interval for all chains."""
tintervals = set([s.tinterval / 60 for s in self.chainers.values()])
assert (
len(tintervals) == 1
), "Not all chains have the same time interval"
return max(tintervals)
@property
def available(self) -> t.Collection[str]:
"""Generate list of available signals in the first chain."""
return self.fsignal.available
@property
def available_grouped(self) -> None:
"""Display available signals and the number of chains for each."""
if not hasattr(self, "_available_grouped"):
self._available_grouped = Counter(
[x for s in self.chainers.values() for x in s.available]
)
for s, n in self._available_grouped.items():
print(f"{s} - {n}")
@property
def datasets(self) -> None:
"""Print available data sets in the first chain."""
return self.fsignal.datasets
@abstractproperty
def positions_groups(self):
pass
def concat_signal(
self,
path: str,
pool: t.Optional[int] = None,
mode: str = "retained",
standard: t.Optional[bool] = False,
**kwargs,
):
"""
Concatenate data for one signal from different h5 files, with
one h5 file per position, into a dataframe.
Parameters
----------
path : str
Signal location within h5py file
pool : int
Number of threads used; if 0 or None only one core is used
mode: str
    One of "retained", "raw", "daughters", or "families"; how each position's signal is loaded.
standard: boolean
    If True, use concat_standard to load each position instead of concat_signal_ind.
**kwargs : key, value pairings
Named arguments for concat_ind_function
Examples
--------
>>> record = grouper.concat_signal("extraction/GFP/max/median")
"""
if path.startswith("/"):
path = path.strip("/")
good_chains = self.filter_chains(path)
if standard:
fn_pos = concat_standard
else:
fn_pos = concat_signal_ind
kwargs["mode"] = mode
records = self.pool_function(
path=path,
f=fn_pos,
pool=pool,
chainers=good_chains,
**kwargs,
)
# check for errors
errors = [
k for kymo, k in zip(records, self.chainers.keys()) if kymo is None
]
records = [record for record in records if record is not None]
if len(errors):
print("Warning: Positions contain errors {errors}")
assert len(records), "All data sets contain errors"
# combine into one dataframe
concat = pd.concat(records, axis=0)
if len(concat.index.names) > 4:
# reorder levels in the multi-index dataframe when mother_label is present
concat = concat.reorder_levels(
("group", "position", "trap", "cell_label", "mother_label")
)
concat_sorted = concat.sort_index()
return concat_sorted
def filter_chains(self, path: str) -> t.Dict[str, Chainer]:
"""Filter chains to those whose data is available in the h5 file."""
good_chains = {
k: v
for k, v in self.chainers.items()
if path in [*v.available, *v.common_chains]
}
nchains_dif = len(self.chainers) - len(good_chains)
if nchains_dif:
print(
f"Grouper:Warning: {nchains_dif} chains do not contain"
f" channel {path}"
)
assert len(
good_chains
), f"No valid dataset to use. Valid datasets are {self.available}"
return good_chains
def pool_function(
self,
path: str,
f: t.Callable,
pool: t.Optional[int] = None,
chainers: t.Dict[str, Chainer] = None,
**kwargs,
):
"""Enable different threads for independent chains, particularly useful when aggregating multiple elements."""
if pool is None:
pass
chainers = chainers or self.chainers
if pool:
with Pool(pool) as p:
records = p.map(
lambda x: f(
path=path,
chainer=x[1],
group=self.positions_groups[x[0]],
position=x[0],
**kwargs,
),
chainers.items(),
)
else:
records = [
f(
path=path,
chainer=chainer,
group=self.positions_groups[name],
position=name,
**kwargs,
)
for name, chainer in self.chainers.items()
]
return records
@property
def nmembers(self) -> t.Dict[str, int]:
"""Get the number of positions belonging to each group."""
return Counter(self.positions_groups.values())
@property
def ntiles(self):
"""Get total number of tiles per position (h5 file)."""
for pos, s in self.chainers.items():
with h5py.File(s.filename, "r") as f:
print(pos, f["/trap_info/trap_locations"].shape[0])
@property
def ntiles_by_group(self) -> t.Dict[str, int]:
"""Get total number of tiles per group."""
ntiles = {}
for pos, s in self.chainers.items():
with h5py.File(s.filename, "r") as f:
ntiles[pos] = f["/trap_info/trap_locations"].shape[0]
ntiles_by_group = {k: 0 for k in self.groups}
for posname, vals in ntiles.items():
ntiles_by_group[self.positions_groups[posname]] += vals
return ntiles_by_group
@property
def tilelocs(self) -> t.Dict[str, np.ndarray]:
"""Get the locations of the tiles for each position as a dictionary."""
d = {}
for pos, s in self.chainers.items():
with h5py.File(s.filename, "r") as f:
d[pos] = f["/trap_info/trap_locations"][()]
return d
@property
def groups(self) -> t.Tuple[str]:
"""Get groups, sorted alphabetically."""
return tuple(sorted(set(self.positions_groups.values())))
@property
def positions(self) -> t.Tuple[str]:
"""Get positions, sorted alphabetically."""
return tuple(sorted(set(self.positions_groups.keys())))
def ncells(
self,
path="extraction/general/None/area",
mode="retained",
**kwargs,
) -> t.Dict[str, int]:
"""Get number of cells retained per position in base channel as a dictionary."""
return (
self.concat_signal(path=path, mode=mode, **kwargs)
.groupby("group")
.apply(len)
.to_dict()
)
@property
def nretained(self) -> t.Dict[str, int]:
"""Get number of cells retained per position in base channel as a dictionary."""
return self.ncells()
@property
def channels(self):
"""Get unique channels for all chains as a set."""
return set(
[
channel
for chainer in self.chainers.values()
for channel in chainer.channels
]
)
@property
def stages_span(self):
# FAILS on my example
return self.fsignal.stages_span
@property
def max_span(self):
# FAILS on my example
return self.fsignal.max_span
@property
def stages(self):
# FAILS on my example
return self.fsignal.stages
@property
def tinterval(self):
"""Get interval between time points."""
return self.fsignal.tinterval
class MetaGrouper(Grouper):
"""Group positions using metadata's 'group' number."""
pass
class NameGrouper(Grouper):
"""Group a set of positions with a shorter version of the group's name."""
def __init__(self, dir, name_inds=(0, -4)):
"""Define the indices to slice names."""
super().__init__(dir=dir)
self.name_inds = name_inds
@property
def positions_groups(self) -> t.Dict[str, str]:
"""Get a dictionary with the positions as keys and groups as items."""
if not hasattr(self, "_positions_groups"):
self._positions_groups = {}
for name in self.chainers.keys():
self._positions_groups[name] = name[
self.name_inds[0] : self.name_inds[1]
]
return self._positions_groups
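# A minimal usage sketch of `NameGrouper` (not from the original source);
# "/path/to/experiment" is a placeholder for a directory of per-position h5 files.
if __name__ == "__main__":
    grouper = NameGrouper("/path/to/experiment")
    print(grouper.positions_groups)
    areas = grouper.concat_signal("extraction/general/None/area")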
class phGrouper(NameGrouper):
"""Grouper for pH calibration experiments where all surveyed media pH values are within a single experiment."""
def __init__(self, dir, name_inds=(3, 7)):
"""Initialise via NameGrouper."""
super().__init__(dir=dir, name_inds=name_inds)
def get_ph(self) -> None:
"""Find the pH from the group names and store as a dictionary."""
self.ph = {gn: self.ph_from_group(gn) for gn in self.positions_groups}
@staticmethod
def ph_from_group(group_name: str) -> float:
"""Find the pH from the name of a group."""
if group_name.startswith("ph_") or group_name.startswith("pH_"):
group_name = group_name[3:]
return float(group_name.replace("_", "."))
def aggregate_multichains(self, signals: list) -> pd.DataFrame:
"""Get data from a list of signals and combine into one multi-index dataframe with 'media-pH' included."""
aggregated = pd.concat(
[
self.concat_signal(signal, reduce_cols=np.nanmean)
for signal in signals
],
axis=1,
)
ph = pd.Series(
[
self.ph_from_group(
x[list(aggregated.index.names).index("group")]
)
for x in aggregated.index
],
index=aggregated.index,
name="media_pH",
)
aggregated = pd.concat((aggregated, ph), axis=1)
return aggregated
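# A minimal usage sketch of `phGrouper` (not from the original source);
# "/path/to/pH_calibration" is a placeholder and the group-name format follows
# ph_from_group above.
if __name__ == "__main__":
    print(phGrouper.ph_from_group("ph_5_5"))  # -> 5.5
    ph_grouper = phGrouper("/path/to/pH_calibration")
    ph_grouper.get_ph()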
def concat_standard(
path: str,
chainer: Chainer,
group: str,
position: t.Optional[str] = None,
**kwargs,
) -> pd.DataFrame:
combined = chainer.get(path, **kwargs).copy()
combined["position"] = position
combined["group"] = group
combined.set_index(["group", "position"], inplace=True, append=True)
combined.index = combined.index.reorder_levels(
("group", "position", "trap", "cell_label", "mother_label")
)
return combined
def concat_signal_ind(
path: str,
chainer: Chainer,
group: str,
mode: str = "retained",
position=None,
**kwargs,
) -> pd.DataFrame:
"""
Retrieve an individual signal.
Applies filtering if requested and adjusts indices.
"""
if position is None:
# name of h5 file
position = chainer.stem
if mode == "retained":
combined = chainer.retained(path, **kwargs)
elif mode == "raw":
combined = chainer.get_raw(path, **kwargs)
elif mode == "daughters":
combined = chainer.get_raw(path, **kwargs)
combined = combined.loc[
combined.index.get_level_values("mother_label") > 0
]
elif mode == "families":
combined = chainer[path]
else:
raise Exception(f"{mode} not recognised.")
if combined is not None:
# adjust indices
combined["position"] = position
combined["group"] = group
combined.set_index(["group", "position"], inplace=True, append=True)
combined.index = combined.index.swaplevel(-2, 0).swaplevel(-1, 1)
# should there be an error message if None is returned?
return combined
class MultiGrouper:
"""Wrap results from multiple experiments stored as folders inside a
folder."""
def __init__(self, source: Union[str, list]):
"""
Create NameGroupers for each experiment.
Parameters
----------
source: str or list of str
    Path to a directory of experiment folders, or a list of folders, one per
    experiment, each containing h5 files.
"""
if isinstance(source, str):
source = Path(source)
self.exp_dirs = list(source.glob("*"))
else:
self.exp_dirs = [Path(x) for x in source]
self.groupers = [NameGrouper(d) for d in self.exp_dirs]
for group in self.groupers:
group.load_chains()
@property
def available(self) -> None:
"""Print available signals and number of chains, one per position, for each Grouper."""
for gpr in self.groupers:
print(gpr.available_grouped)
@property
def sigtable(self) -> pd.DataFrame:
"""Generate a table showing the number of positions, or h5 files, available for each signal with one column per experiment."""
def regex_cleanup(x):
x = re.sub(r"extraction\/", "", x)
x = re.sub(r"postprocessing\/", "", x)
x = re.sub(r"\/max", "", x)
return x
if not hasattr(self, "_sigtable"):
raw_mat = [
[s.available for s in gpr.chainers.values()]
for gpr in self.groupers
]
available_grouped = [
Counter([x for y in grp for x in y]) for grp in raw_mat
]
nexps = len(available_grouped)
sigs_idx = list(
set([y for x in available_grouped for y in x.keys()])
)
sigs_idx = [regex_cleanup(x) for x in sigs_idx]
nsigs = len(sigs_idx)
sig_matrix = np.zeros((nsigs, nexps))
for i, c in enumerate(available_grouped):
for k, v in c.items():
sig_matrix[sigs_idx.index(regex_cleanup(k)), i] = v
sig_matrix[sig_matrix == 0] = np.nan
self._sigtable = pd.DataFrame(
sig_matrix,
index=sigs_idx,
columns=[x.name for x in self.exp_dirs],
)
return self._sigtable
def _sigtable_plot(self) -> None:
"""
Plot number of chains for all available experiments.
Examples
--------
FIXME: Add docs.
"""
ax = sns.heatmap(self.sigtable, cmap="viridis")
ax.set_xticklabels(
ax.get_xticklabels(),
rotation=10,
ha="right",
rotation_mode="anchor",
)
plt.show()
def aggregate_signal(
self,
path: Union[str, list],
**kwargs,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
Aggregate chains, one per position, from multiple Groupers, one per experiment.
Parameters
----------
path : Union[str, list]
String or list of strings indicating the signal(s) to fetch.
**kwargs :
Passed to Grouper.concat_signal.
Returns
-------
concatenated: Union[pd.DataFrame, Dict[str, pd.DataFrame]]
A multi-index dataFrame or a dictionary of multi-index dataframes, one per signal
Examples
--------
>>> mg = MultiGrouper(["pHCalibrate7_24", "pHCalibrate6_7"])
>>> p405 = mg.aggregate_signal("extraction/pHluorin405_0_4/max/median")
>>> p588 = mg.aggregate_signal("extraction/pHluorin488_0_4/max/median")
>>> ratio = p405 / p488
"""
if isinstance(path, str):
path = [path]
sigs = {s: [] for s in path}
for s in path:
for grp in self.groupers:
try:
sigset = grp.concat_signal(s, **kwargs)
new_idx = pd.MultiIndex.from_tuples(
[(grp.name, *x) for x in sigset.index],
names=("experiment", *sigset.index.names),
)
sigset.index = new_idx
sigs[s].append(sigset)
except Exception as e:
print("Grouper {} failed: {}".format(grp.name, e))
concatenated = {
name: pd.concat(multiexp_sig)
for name, multiexp_sig in sigs.items()
}
if len(concatenated) == 1:
concatenated = list(concatenated.values())[0]
return concatenated
"""Routines for analysing post-processed data that don't follow the parameters-processes structure.
Currently, these consist of plotting routines. There is one module for each
plotting routine. Each module consists of two components and is structured as
follows:
1. An internal class.
The class defines the parameters and defines additional class attributes to
help with plotting. The class also has one method (`plot`) that takes a
`matplotlib.Axes` object as an argument. This method draws the plot on the
`Axes` object.
2. A plotting function.
The user accesses this function. This function defines the default
parameters in its arguments. Within the function, a 'plotter' object is
defined using the internal class and then the function draws the plot on a
specified `matplotlib.Axes` object.
This structure follows that of plotting routines in `seaborn`
(https://github.com/mwaskom/seaborn), a Python visualisation library that is
based on `matplotlib`.
"""