diff --git a/src/postprocessor/core/multisignal/crosscorr.py b/src/postprocessor/core/multisignal/crosscorr.py index 2b1c28969317f1c250768eafac8381248cd5472a..a4069829e9ada5681fd5b8ec85946e857390e1c2 100644 --- a/src/postprocessor/core/multisignal/crosscorr.py +++ b/src/postprocessor/core/multisignal/crosscorr.py @@ -13,6 +13,10 @@ class crosscorrParameters(ParametersABC): Attributes ---------- + stationary: boolean + If True, the underlying dynamic process is assumed to be + stationary with the mean a constant, estimated from all + data points. normalised: boolean (optional) If True, normalise the result for each replicate by the standard deviation over time for that replicate. @@ -20,7 +24,7 @@ class crosscorrParameters(ParametersABC): If True, return results only for positive lags. """ - _defaults = {"normalised": True, "only_pos": False} + _defaults = {"stationary": False, "normalised": True, "only_pos": False} class crosscorr(PostProcessABC): @@ -50,6 +54,10 @@ class crosscorr(PostProcessABC): trace_dfB: dataframe (required for cross-correlation only) An array of signal values, with each row a replicate measurement and each column a time point. + stationary: boolean + If True, the underlying dynamic process is assumed to be + stationary with the mean a constant, estimated from all + data points. normalised: boolean (optional) If True, normalise the result for each replicate by the standard deviation over time for that replicate. @@ -84,6 +92,12 @@ class crosscorr(PostProcessABC): >>> plt.figure() >>> plt.plot(ac.columns, ac.mean(axis=0, skipna=True)) >>> plt.show() + + Reference + --------- + Dunlop MJ, Cox RS, Levine JH, Murray RM, Elowitz MB (2008). Regulatory + activity revealed by dynamic correlations in gene expression noise. + Nat Genet, 40, 1493-1498. """ df = ( @@ -97,25 +111,15 @@ class crosscorr(PostProcessABC): n_replicates = trace_A.shape[0] # number of time points n_tps = trace_A.shape[1] - # deviation from the mean, where the mean is calculated over replicates - # at each time point, which allows for non-stationary behaviour - dmean_A = trace_A - np.nanmean(trace_A, axis=0).reshape((1, n_tps)) - # standard deviation over time for each replicate - stdA = np.sqrt( - np.nanmean(dmean_A**2, axis=1).reshape((n_replicates, 1)) - ) - if trace_dfB is not None: - trace_B = trace_dfB.to_numpy() - # cross correlation - dmean_B = trace_B - np.nanmean(trace_B, axis=0).reshape((1, n_tps)) - stdB = np.sqrt( - np.nanmean(dmean_B**2, axis=1).reshape((n_replicates, 1)) - ) + # autocorrelation if 2nd dataframe is not supplied + if trace_dfB is None: + trace_dfB = trace_dfA + trace_B = trace_A else: - # auto correlation - dmean_B = dmean_A - stdB = stdA - # calculate correlation + trace_B = trace_dfB.to_numpy() + # find deviation from the mean + dmean_A, stdA = _dev(trace_A, n_replicates, n_tps, self.stationary) + dmean_B, stdB = _dev(trace_B, n_replicates, n_tps, self.stationary) # lag r runs over positive lags pos_corr = np.nan * np.ones(trace_A.shape) for r in range(n_tps): @@ -148,3 +152,16 @@ class crosscorr(PostProcessABC): ) else: return pd.DataFrame(corr, index=df.index, columns=lags) + + +def _dev(y, nr, nt, stationary=False): + # calculate deviation from the mean + if stationary: + # mean calculated over time and over replicates + dy = y - np.nanmean(y) + else: + # mean calculated over replicates at each time point + dy = y - np.nanmean(y, axis=0).reshape((1, nt)) + # standard deviation calculated for each replicate + stdy = np.sqrt(np.nanmean(dy**2, axis=1).reshape((nr, 1))) + return dy, stdy diff --git a/tests/postprocessor/test_crosscorr.py b/tests/postprocessor/test_crosscorr.py new file mode 100644 index 0000000000000000000000000000000000000000..ea12692338da68b1c5d37539e5419a54ed7355e1 --- /dev/null +++ b/tests/postprocessor/test_crosscorr.py @@ -0,0 +1,54 @@ +import numpy as np +import pandas as pd +import pytest + +from postprocessor.core.multisignal.crosscorr import ( + crosscorr, + crosscorrParameters, +) + + +def generate_sinusoids_df( + time_axis, + num_replicates, +): + t = time_axis + ts = np.tile(t, num_replicates).reshape((num_replicates, len(t))) + s = 3 * np.sin( + 2 * np.pi * ts + 2 * np.pi * np.random.rand(num_replicates, 1) + ) + s_df = pd.DataFrame(s) + return s_df + + +@pytest.mark.parametrize("time_axis", [np.linspace(0, 4, 200)]) +@pytest.mark.parametrize("num_replicates", [333]) +def test_crosscorr( + time_axis, + num_replicates, +): + """Tests croscorr. + + Tests whether a crosscorr runner can be initialised with default + parameters and runs without errors. + """ + dummy_signal1 = generate_sinusoids_df(time_axis, num_replicates) + dummy_signal2 = generate_sinusoids_df(time_axis, num_replicates) + crosscorr_runner = crosscorr(crosscorrParameters.default()) + _ = crosscorr_runner.run(dummy_signal1, dummy_signal2) + + +@pytest.mark.parametrize("time_axis", [np.linspace(0, 4, 200)]) +@pytest.mark.parametrize("num_replicates", [333]) +def test_autocorr( + time_axis, + num_replicates, +): + """Tests croscorr. + + Tests whether a crosscorr runner can be initialised with default + parameters and runs without errors, when performing autocorrelation. + """ + dummy_signal1 = generate_sinusoids_df(time_axis, num_replicates) + crosscorr_runner = crosscorr(crosscorrParameters.default()) + _ = crosscorr_runner.run(dummy_signal1, dummy_signal1)