Commit 4ad4e2a8 authored by pswain
feat: added negative lags to autocrosscorr to enable cross-correlation

parent cf8d2661
import numpy as np
def autocrosscorr(
yA,
yB=None,
normalised=True,
only_pos=False,
):
"""
Calculates normalised auto- or cross-correlations as a function of lag.
Lag is given in multiples of the unknown time interval between data points.
Normalisation is by the product of the standard deviation over time for
each replicate for each variable.
    For the cross-correlation between yA and yB, the peak closest to zero lag
    should be at negative lags if yA is delayed compared to yB and at positive
    lags if yA is advanced compared to yB.
Parameters
----------
yA: array
An array of signal values, with each row a replicate measurement
and each column a time point.
yB: array (required for cross-correlation only)
An array of signal values, with each row a replicate measurement
and each column a time point.
normalised: boolean (optional)
If True, normalise the result for each replicate by the standard
deviation over time for that replicate.
only_pos: boolean (optional)
        If True, return results only for non-negative lags.
Returns
-------
corr: array
        An array of the correlations with each row the result for the
        corresponding replicate and each column a lag
lags: array
A 1D array of the lags in multiples of the unknown time interval
Example
-------
>>> import matplotlib.pyplot as plt
>>> import numpy as np
Define a sine signal with 200 time points and 333 replicates
>>> t = np.linspace(0, 4, 200)
>>> ts = np.tile(t, 333).reshape((333, 200))
>>> s = 3*np.sin(2*np.pi*ts + 2*np.pi*np.random.rand(333, 1))
    Find and plot the autocorrelation
>>> ac, lags = autocrosscorr(s)
>>> plt.figure()
>>> plt.plot(lags, np.nanmean(ac, axis=0))
>>> plt.show()
"""
# number of replicates
nr = yA.shape[0]
# number of time points
nt = yA.shape[1]
# deviation from the mean, where the mean is calculated over replicates
# at each time point, which allows for non-stationary behaviour
dyA = yA - np.nanmean(yA, axis=0).reshape((1, nt))
# standard deviation over time for each replicate
stdA = np.sqrt(np.nanmean(dyA**2, axis=1).reshape((nr, 1)))
    if yB is not None:
# cross correlation
dyB = yB - np.nanmean(yB, axis=0).reshape((1, nt))
stdB = np.sqrt(np.nanmean(dyB**2, axis=1).reshape((nr, 1)))
else:
# auto correlation
dyB = dyA
stdB = stdA
# calculate correlation
# lag r runs over positive lags
pos_corr = np.nan * np.ones(yA.shape)
for r in range(nt):
prods = [dyA[:, t] * dyB[:, t + r] for t in range(nt - r)]
pos_corr[:, r] = np.nanmean(prods, axis=0)
# lag r runs over negative lags
# use corr_AB(-k) = corr_BA(k)
neg_corr = np.nan * np.ones(yA.shape)
for r in range(nt):
prods = [dyB[:, t] * dyA[:, t + r] for t in range(nt - r)]
neg_corr[:, r] = np.nanmean(prods, axis=0)
if normalised:
# normalise by standard deviation
pos_corr = pos_corr / stdA / stdB
neg_corr = neg_corr / stdA / stdB
# combine lags
lags = np.arange(-nt + 1, nt)
corr = np.hstack((np.flip(neg_corr[:, 1:], axis=1), pos_corr))
if only_pos:
return corr[:, int(lags.size/2):], lags[int(lags.size/2):]
else:
return corr, lags
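# A minimal sketch of the lag convention, assuming only numpy and the function
# above; the helper name and the chosen delay are illustrative and not part of
# the module.
def _example_crosscorr_lag():
    """Check that a delayed yA gives a cross-correlation peak at negative lags."""
    rng = np.random.default_rng(1)
    t = np.linspace(0, 4, 200)
    ts = np.tile(t, 333).reshape((333, 200))
    phases = 2 * np.pi * rng.random((333, 1))
    # yA is yB delayed by 0.25 time units
    yB = 3 * np.sin(2 * np.pi * ts + phases)
    yA = 3 * np.sin(2 * np.pi * (ts - 0.25) + phases)
    cc, lags = autocrosscorr(yA, yB)
    mean_cc = np.nanmean(cc, axis=0)
    # restrict to lags within half a period of zero and find the peak
    dt = t[1] - t[0]
    near_zero = np.abs(lags) * dt < 0.5
    peak_lag = lags[near_zero][np.argmax(mean_cc[near_zero])]
    # peak_lag should be close to -0.25 / dt, i.e. about -12
    return peak_lag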
###
def hl_envelopes_idx(s, dmin=1, dmax=1, split=False):
"""
Finds the envelope of a signal.
From https://stackoverflow.com/questions/34235530
Parameters
----------
s: 1d-array
Data signal from which to extract high and low envelopes
    dmin, dmax: int (optional)
        Size of the chunks used to subsample the local minima and maxima.
        Only necessary if the input signal is very large.
split: boolean (optional)
If True, split the signal in half along its mean, which might help to
generate the envelope.
Returns
-------
lmin, lmax : array of integers
Low and high envelope indices of the input signal.
"""
    # indices of local minima
    lmin = (np.diff(np.sign(np.diff(s))) > 0).nonzero()[0] + 1
    # indices of local maxima
    lmax = (np.diff(np.sign(np.diff(s))) < 0).nonzero()[0] + 1
if split:
s_mid = np.mean(s)
        # keep only local minima below the mean
        lmin = lmin[s[lmin] < s_mid]
        # keep only local maxima above the mean
        lmax = lmax[s[lmax] > s_mid]
    # keep the minimum of each dmin-sized chunk of local minima
lmin = lmin[
[
i + np.argmin(s[lmin[i: i + dmin]])
for i in range(0, len(lmin), dmin)
]
]
    # keep the maximum of each dmax-sized chunk of local maxima
lmax = lmax[
[
i + np.argmax(s[lmax[i: i + dmax]])
for i in range(0, len(lmax), dmax)
]
]
return lmin, lmax
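# A short illustrative sketch, assuming matplotlib is available; the signal and
# the helper name below are made up for demonstration only.
def _example_envelopes():
    """Plot the high and low envelopes of a decaying oscillation."""
    import matplotlib.pyplot as plt

    t = np.linspace(0, 10, 500)
    s = np.exp(-0.2 * t) * np.sin(2 * np.pi * t)
    lmin, lmax = hl_envelopes_idx(s)
    plt.figure()
    plt.plot(t, s, label="signal")
    plt.plot(t[lmin], s[lmin], "o-", label="low envelope")
    plt.plot(t[lmax], s[lmax], "o-", label="high envelope")
    plt.legend()
    plt.show()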
# pip install --upgrade aliby-post
import numpy as np
import pandas as pd
from pathlib import Path
import pprint
try:
from postprocessor.grouper import NameGrouper
except ModuleNotFoundError:
import sys
sys.path.append("/Users/pswain/wip/postprocessor")
sys.path.append("/Users/pswain/wip/agora")
from postprocessor.grouper import NameGrouper
class dataloader:
"""
Compiles aliby datasets into a single, extensible data frame
Parameters
----------
    indir: string
        The directory where data are stored as h5 files, with one
        directory per experiment
outdir: string
The working directory where .tsv files will be stored
Example
-------
>>> from dataloader import dataloader
>>> dl = dataloader(indir="experiments", outdir=".")
The names of experiments and datasets are stored in dictionaries
to avoid cutting and pasting of names.
To load an experiment from the data directory, first use,
for example,
>>> dl.load(dl.experiments[0])
or give the name of the directory explicitly
>>> dl.load("experiment_directory")
which will load the data from h5 files.
    Once loading is complete, the data can be saved as a tab-separated
    values (.tsv) file in the working directory.
    To load this file in the future, use, for example,
>>> dl.load(dl.datasets[0])
or give the name of the file explicitly
>>> dl.load("data_set.tsv")
To see the data, use
>>> dl.df
which can be plotted directly with seaborn
>>> import seaborn as sns
>>> sns.relplot(x="time", y="median_GFP", hue="group",
kind="line", data=dl.df)
"""
def __init__(self, indir=".", outdir="."):
        # from grouper.siglist to abbreviations
self.g2a_dict = {
"extraction/GFP/np_max/median": "median_GFP",
"extraction/GFP/np_max/mean": "mean_GFP",
"extraction/general/None/area": "area",
"extraction/general/None/volume": "volume",
"extraction/GFP/np_max/max5px_med": "max5px_med",
"postprocessing/births/extraction_general_None_volume": "births",
"postprocessing/bud_metric/extraction_general_None_volume":
"bud_volume",
"extraction/Flavin_bgsub/np_max/mean": "flavin",
}
# from abbreviations to grouper.siglist
self.a2g_dict = {v: k for (k, v) in self.g2a_dict.items()}
# establish paths
self.outdirpath = Path(outdir)
self.indirpath = Path(indir)
self.ls
###
@property
def ls(self):
"""
Lists the experiments available in indir and the datasets
available in outdir.
The names of all directories in indir are stored in the
.experiments dictionary.
The names of all datasets in outdir are stored in the
.datasets dictionary.
"""
pp = pprint.PrettyPrinter()
# find raw data
print("\nData directory is", str(self.indirpath.resolve()))
print("Experiments available:")
dirs = [f.name for f in self.indirpath.glob("*") if f.is_dir()]
# directories of data are stored in experiments
self.experiments = {i: name for i, name in enumerate(dirs)}
pp.pprint(self.experiments)
# find processed data
print("\nWorking directory is", str(self.outdirpath.resolve()))
print("Datasets available:")
files = [f.name for f in self.outdirpath.glob("*.tsv") if f.is_file()]
# tsv files are stored in datasets
self.datasets = {i: name for i, name in enumerate(files)}
pp.pprint(self.datasets)
###
def get_grouper(self, dataname):
"""
Returns an instance of NameGrouper.
Arguments
---------
dataname: string
The name of the dataset
Returns
-------
grouper: instance of NameGrouper
"""
if dataname[-4:] == ".tsv":
dataname = dataname[:-4]
grouper = NameGrouper(self.indirpath / dataname)
return grouper
###
def load(
self,
dataname,
g2a_dict=None,
pxsize=0.182,
save=False,
use_tsv=False,
):
"""
Loads either an experiment or a dataset into a long data
frame - with 'time' as a column.
New data is added to the existing dataframe.
Data is stored in the .df attribute.
Parameters
----------
dataname: string
Either the name of the directory for an experiment or a
file name for a dataset
g2a_dict: dictionary (optional)
A dictionary relating the aliby name to an abbreviation
eg {"extraction/general/None/volume": "volume"}
pxsize: float (default 0.182)
Pixel size in microns, which is used to convert volumes.
        save: boolean (default: False)
If True, save dataframe generated as a tsv file
use_tsv: boolean (default: False)
If True, always load the data from a tsv file.
Returns
-------
grouper: aliby grouper object
grouper for an experiment if one is loaded
"""
if dataname[-4:] == ".tsv":
dataname = dataname[:-4]
use_tsv = True
# root name
self.dataname = dataname
# conversion dictionary
if not g2a_dict:
g2a_dict = self.g2a_dict
# load
if use_tsv:
# load data from tsv files
print("loading", dataname)
r_df = pd.read_csv(
str(self.outdirpath / (dataname + ".tsv")), sep="\t"
)
if hasattr(self, "r"):
# merge new data to current dataframe
self.df = pd.merge(self.df, r_df, how="left")
if save:
self.save(dataname)
else:
self.df = r_df
# store time interval
self.dt = np.min(np.diff(np.sort(self.df.time.unique())))
else:
# create instance of grouper
grouper = self.get_grouper(dataname)
# find time interval between images
self.dt = grouper.tintervals
# load data from h5 files
print(dataname + "\n---")
print("signals available:")
signals = grouper.siglist
for signal in signals:
print(" ", signal[1:])
print("\nloading...")
first = True
            for char in g2a_dict:
if "/" + char in signals:
print(" " + char)
hdf = grouper.concat_signal(char)
# convert to long dataframe
tdf = self._long_df(hdf, g2a_dict[char])
# drop redundant columns
tdf = tdf.drop(["position", "trap", "cell_label"], axis=1)
if first:
r_df = tdf.copy()
first = False
else:
r_df = pd.merge(r_df, tdf, how="left")
else:
print(" Warning: " + char, "not found")
if pxsize:
# volumes to micron^3
for signal in r_df.columns:
if "volume" in signal:
r_df[signal] *= pxsize ** 3
if hasattr(self, "r"):
# merge new data to current dataframe
self.df = pd.merge(self.df, r_df, how="left")
else:
# create new dataframe
self.df = r_df
# save dataframe
if save:
self.save(dataname)
# define ids
self.ids = list(self.df.id.unique())
if not use_tsv:
# return grouper
return grouper
###
def save(self, dataname=None):
"""
        Saves the .df dataframe to a tab-separated values (tsv) file
Parameters
----------
dataname: string (optional)
Name of the file
"""
if dataname is None:
dataname = self.dataname
self.df.to_csv(
str(self.outdirpath / (dataname + ".tsv")), sep="\t", index=False
)
print("Saved", dataname)
###
def _long_df(self, df, char_name):
"""
Internal function: converts an aliby multi-index dataframe into a
long dataframe
"""
# flatten multi-index dataframe
df = df.copy()
hnames = df.index.names
df.index = df.index.set_names(hnames)
df.reset_index(inplace=True)
# melt to create time column
df = df.melt(id_vars=hnames, var_name="time", value_name=char_name)
# add unique id for each cell
if (
"position" in df.columns
and "cell_label" in df.columns
and "trap" in df.columns
):
df.insert(
0,
"id",
df["position"]
+ ";"
+ df["cell_label"].astype(str)
+ ";"
+ df["trap"].astype(str),
)
return df
###
@property
def revert_df(self):
"""
Converts the 'id' column into the standard three columns used by
aliby - 'position', 'trap', and 'cell_label' - and vice versa,
either adding three columns to the .df dataframe or removing
three columns
"""
if (
"position" not in self.df.columns
and "trap" not in self.df.columns
and "cell_label" not in self.df.columns
):
# add back dropped columns by expanding id column
self.df[["position", "trap", "cell_label"]] = self.df.id.str.split(
";", expand=True
)
elif (
"position" in self.df.columns
and "trap" in self.df.columns
and "cell_label" in self.df.columns
):
# drop columns redundant with "id"
self.df = self.df.drop(["position", "trap", "cell_label"], axis=1)
else:
print("Cannot revert")
###
def wide_df(self, signal, x="time", y="id"):
"""
Pivots the .df dataframe to return a standard aliby dataframe
Parameters
----------
signal: string
The signal of interest
x: string (default: 'time')
The x value of interest
y: string (default: 'id')
The y value of interest
Example
-------
>>> dl.wide_df("median_GFP")
"""
        return self.df.pivot(index=y, columns=x, values=signal)
###
def get_time_series(self, signal, group=None):
"""
Extract all the data for a particular signal as
a 2D array with each row a time series
        Parameters
        ----------
        signal: string
            The signal to be extracted
        group: string (optional)
            If given, select data only for this group
        Returns
        -------
        t: 1D array
            Time values
        values: 2D array
            The values of the signal with each row a separate time
            series
if group is None:
wdf = self.wide_df(signal, x="time", y="id")
elif group in self.df.group.values:
            wdf = self.df[self.df.group == group].pivot(
                index="id", columns="time", values=signal
            )
else:
print(group, "not recognised")
return None, None
time = wdf.columns.to_numpy()
        # each row is a time series
values = wdf.to_numpy()
return time, values
def put_time_series(self, values, signal):
"""
        Insert a 2D array of data with each row a time series
into the dataframe
Parameters
----------
values: array
The data to be inserted
signal: string
The name of the signal
"""
        # find a suitable column in .df to generate a pivot
cols = list(self.df.columns)
for col in ["id", "time"]:
cols.remove(col)
# use cols[0] to find index and columns of a pivot
wdf = self.wide_df(cols[0])
# create a dataframe using the structure of a pivot
df = pd.DataFrame(values, index=wdf.index, columns=wdf.columns)
# convert to long dataframe
tdf = self._long_df(df, signal)
        # add the new signal to .df
self.df = pd.merge(self.df, tdf, how="left")
###
def sort_df(
self, signal, functionstr="sum", tmin=None, tmax=None, strain=None
):
"""
Returns a sorted dataframe.
Parameters
----------
signal: string
The signal to sort by.
functionstr: string
A function to apply to the signal before sorting.
Default value is "sum" so that all the data for each cell
is summed before sorting.
Other options include "max", "mean", "median", etc.
tmin: float (optional)
Only include data for times greater than tmin
tmax: float (optional)
Only include data for times less than tmax
strain: string (optional)
Only include data for this strain
Examples
--------
        To sort URA7 cells by their total number of births up to 10 hours
>>> df = dl.sort_df("births", tmax=10, strain="URA7")
To sort cells by their median fluorescence
>>> df = dl.sort_df("median_GFP")
"""
if strain:
df = self.df[self.df.group == strain].copy()
else:
df = self.df.copy()
if tmin and tmax:
sdf = df[(df.time >= tmin) & (df.time <= tmax)]
elif tmin:
sdf = df[df.time >= tmin]
elif tmax:
sdf = df[df.time <= tmax]
else:
sdf = df
sdf = sdf.groupby(["id"])
# apply function to data grouped by ID
        if functionstr in dir(sdf):
            sdf = getattr(sdf, functionstr)()
# sort
sdf = sdf.sort_values(by=signal)
sort_order = list(sdf.index)
# sort original dataframe in the same way
df.id = pd.Categorical(df.id, categories=sort_order)
df = df.sort_values(by="id")
# return sorted dataframe
return df
####
@property
def strains(self):
"""
List all strains in df.group
"""
for strain in list(self.df.group.unique()):
print(" " + strain)
####
def get_lineage_data(self, idx, signals):
"""
        Returns signals for a single lineage specified by idx, an
        element of self.ids.
Arguments
---------
        idx: string
            An element of self.ids identifying one cell
signals: list of strings
Signals to be returned
"""
if isinstance(signals, str):
signals = [signals]
res = [
self.df[self.df.id == idx][signal].to_numpy() for signal in signals
]
return res
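# A hedged end-to-end sketch; the directory name and the signal names below are
# hypothetical and depend on what was extracted for a given experiment, and the
# helper itself is only illustrative.
def _example_dataloader_workflow():
    """Load an experiment, extract one signal as time series, and sort cells."""
    dl = dataloader(indir="experiments", outdir=".")
    # load from h5 files and save a .tsv copy for faster re-loading
    dl.load(dl.experiments[0], save=True)
    # a 1D array of times and a 2D array with each row one cell's time series
    t, gfp = dl.get_time_series("median_GFP")
    # dataframe sorted by the total number of births per cell
    sorted_df = dl.sort_df("births")
    return t, gfp, sorted_df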
import numpy as np
def autocrosscorr(
yA,
yB=None,
normalised=True,
):
"""
    Calculates normalised auto- or cross-correlations as a function of lag.
    Lag is given in multiples of the unknown time interval between data points.
    Normalisation is by the product of the standard deviation over time for
    each replicate for each variable.
    With zero lag, the normalised autocorrelation should be one.
Parameters
----------
yA: array
An array of signal values, with each row a replicate measurement
and each column a time point.
yB: array (required for cross-correlation only)
An array of signal values, with each row a replicate measurement
and each column a time point.
    normalised: boolean
        If True, normalise the result for each replicate by the standard
        deviation over time for that replicate.
Returns
-------
corr: array
        An array of the correlations with each row the result for the
        corresponding replicate and each column a lag
"""
# number of replicates
nr = yA.shape[0]
# number of time points
nt = yA.shape[1]
# deviation from the mean, where the mean is calculated over replicates
# at each time point, which allows for non-stationary behaviour
dyA = yA - np.nanmean(yA, axis=0).reshape((1, nt))
# standard deviation over time for each replicate
stdA = np.sqrt(np.nanmean(dyA**2, axis=1).reshape((nr, 1)))
    if yB is not None:
# cross correlation
        dyB = yB - np.nanmean(yB, axis=0).reshape((1, nt))
stdB = np.sqrt(np.nanmean(dyB**2, axis=1).reshape((nr, 1)))
else:
# auto correlation
yB = yA
dyB = dyA
stdB = stdA
# calculate correlation
corr = np.nan * np.ones(yA.shape)
    # lag r runs over non-negative lags
for r in np.arange(0, nt):
prods = [dyA[:, t] * dyB[:, t + r] for t in range(nt - r)]
corr[:, r] = np.nansum(prods, axis=0) / (nt - r)
if normalised:
# normalise by standard deviation
corr = corr / stdA / stdB
return corr
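# A small sanity check (a sketch, assuming only numpy and the function above;
# the helper name is illustrative): for the normalised autocorrelation, the
# zero-lag column should be one for every replicate, as stated in the docstring.
def _example_zero_lag():
    """Verify that the normalised autocorrelation is one at zero lag."""
    rng = np.random.default_rng(0)
    y = rng.normal(size=(50, 100))
    corr = autocrosscorr(y)
    # corr[:, 0] is the zero-lag value for each replicate
    return np.allclose(corr[:, 0], 1.0)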
###