Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • swain-lab/aliby/aliby-mirror
  • swain-lab/aliby/alibylite
2 results
Show changes
# Removed the stray bare `pytest.mark.skip` expression that followed the
# pytest import: as a statement it evaluated the mark object and discarded
# it, skipping nothing. The real skip is applied as a decorator below.
import json
import time

import numpy as np
import pytest

from aliby.experiment import ExperimentLocal
from aliby.baby_client import BabyClient
from aliby.tile.tiler import Tiler
@pytest.mark.skip(reason="No longer usable, requires local files. Kept until replaced.")
def test_client():
    """End-to-end exercise of the BABY client.

    Tiles a local experiment, queues five timepoints for segmentation on a
    BABY server, polls for each result and dumps everything to
    ``segmentations.json``.  Needs a local dataset and a running server,
    hence the module-level skip.
    """
    root_dir = (
        "/Users/s1893247/PhD/pipeline-core/data/glclvl_0"
        ".1_mig1_msn2_maf1_sfp1_dot6_03"
    )
    expt = ExperimentLocal(root_dir, finished=True)
    seg_expt = Tiler(expt, finished=True)
    print(seg_expt.positions)
    seg_expt.current_position = "pos007"
    config = {
        "camera": "evolve",
        "channel": "brightfield",
        "zoom": "60x",
        "n_stacks": "5z",
    }
    baby_client = BabyClient(expt, **config)
    print("The session is {}".format(baby_client.sessions["default"]))
    # Channel 0, full X/Y, all five z-sections for each trap.
    num_timepoints = 5
    traps_tps = [
        seg_expt.get_traps_timepoint(
            tp, tile_size=81, channels=[0], z=[0, 1, 2, 3, 4]
        ).squeeze()
        for tp in range(num_timepoints)
    ]
    segmentations = []
    try:
        # Fixed typo: loop variable was `timpoint`.
        for i, timepoint in enumerate(traps_tps):
            print("Sending timepoint {};".format(i))
            status = baby_client.queue_image(
                timepoint,
                baby_client.sessions["default"],
                assign_mothers=True,
                return_baprobs=True,
                with_edgemasks=True,
            )
            # Poll until the server has a result.  The handler is narrowed
            # from a bare `except:` so Ctrl-C can still break the loop.
            while True:
                try:
                    print("Loading.", end="")
                    result = baby_client.get_segmentation(
                        baby_client.sessions["default"]
                    )
                except Exception:
                    print(".", end="")
                    time.sleep(1)
                    continue
                break
            print("Received timepoint {}".format(i))
            segmentations.append(result)
    except Exception as e:
        # Dump whatever was collected before re-raising, to aid debugging.
        print(segmentations)
        raise e
    with open("segmentations.json", "w") as fd:
        json.dump(segmentations, fd)
    print("Done.")
import pytest
import os
from pathlib import Path
# from copy import copy
import pickle
from extraction.core.tracks import get_joinable, get_joint_ids, merge_tracks
from extraction.core.extractor import Extractor, ExtractorParameters
from extraction.core.lineage import reassign_mo_bud
DATA_DIR = Path(os.path.dirname(os.path.realpath(__file__))) / Path("data")
@pytest.mark.skip(reason="reassign_mo_bud no longer in use")
def test_mobud_translation(tracks_pkl=None, mo_bud_pkl=None):
    """Check that merging tracks changes the mother/bud assignment.

    Loads pickled track and mother/bud fixtures from DATA_DIR, derives the
    cell-id translation from the joinable tracks, and verifies that applying
    it to the mother/bud mapping actually modifies it.
    """
    tracks_path = Path(tracks_pkl if tracks_pkl is not None else "tracks.pkl")
    mo_bud_path = Path(mo_bud_pkl if mo_bud_pkl is not None else "mo_bud.pkl")
    with open(DATA_DIR / tracks_path, "rb") as f:
        tracks = pickle.load(f)
    with open(DATA_DIR / mo_bud_path, "rb") as f:
        mo_bud = pickle.load(f)
    ext = Extractor(
        ExtractorParameters.from_meta({"channels/channel": ["Brightfield"]})
    )
    translation = get_joint_ids(get_joinable(tracks, merge_tracks))
    # Applying the translation must reassign at least one cell label.
    mo_bud_translated = reassign_mo_bud(mo_bud, translation)
    assert mo_bud != mo_bud_translated
from extraction.core.tracks import load_test_dset, clean_tracks, merge_tracks
def test_clean_tracks():
    """Cleaning with a minimum length must remove short tracks."""
    tracks = load_test_dset()
    clean = clean_tracks(tracks, min_len=3)
    # The test dataset contains tracks shorter than 3, so some must go.
    assert len(clean) < len(tracks)
    # (removed dead trailing `pass`)
def test_merge_tracks_drop():
    """Merging with drop=True must reduce the number of tracks."""
    tracks = load_test_dset()
    joint_tracks = merge_tracks(tracks, window=3, degree=2, drop=True, tol=1)
    assert len(joint_tracks) < len(tracks), "Error when merging"
    # (removed dead trailing `pass`)
def test_merge_tracks_nodrop():
    """Merging with drop=False keeps every track and reports joint ids."""
    tracks = load_test_dset()
    joint_tracks, joint_ids = merge_tracks(
        tracks, window=3, degree=2, drop=False, tol=1
    )
    assert len(joint_tracks) == len(tracks), "Error when merging"
    assert len(joint_ids), "No joint ids found"
    # (removed dead trailing `pass`)
# Deduplicated: the diff paste left `numpy`, `disk`/`erosion` and the
# extraction.cell imports repeated (old and new hunk lines both present).
import numpy as np
import pytest
from skimage import draw
from skimage.morphology import disk, erosion

from extraction.core.functions.cell import (
    eccentricity,
    min_maj_approximation,
    volume,
)
# Relative tolerance shared by the geometry assertions below.
threshold = 0.01
# Parameter grids swept by the parametrized tests.  The duplicate
# `circularities` assignment (old `1.` vs new `1.0` hunk lines) is removed.
radii = list(range(10, 100, 10))
circularities = np.arange(0.4, 1.0, 0.1)
eccentricities = np.arange(0, 0.9, 0.1)
rotations = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# (diff hunk context) @@ -23,23 +25,23 @@ def ellipse(x, y, rotate=0):
def maj_from_min(min_ax, ecc):
    """Return the major semi-axis of an ellipse with minor semi-axis
    ``min_ax`` and eccentricity ``ecc``, rounded to the nearest integer.

    The duplicate assignment left by the diff paste (`** 2` vs `**2`
    spellings of the same line) is collapsed to one.
    """
    maj_ax = np.sqrt(min_ax**2 / (1 - ecc**2))
    return np.round(maj_ax).astype(int)
@pytest.mark.parametrize("r", radii)
def test_volume_circular(r):
    """volume() of a filled disk must match the sphere volume 4/3*pi*r^3."""
    im = disk(r)
    v = volume(im)
    real_v = (4 * np.pi * r**3) / 3
    # Relative comparison with rtol=threshold.  The diff's
    # `rtol=threshold * real_v` scaled the tolerance by real_v a second
    # time, making the check nearly vacuous for large radii; this restores
    # the original `|v - real_v| / real_v < threshold` semantics.
    assert np.isclose(v, real_v, rtol=threshold)
@pytest.mark.parametrize('x', radii)
@pytest.mark.parametrize('ecc', eccentricities)
@pytest.mark.parametrize('rotation', rotations)
@pytest.mark.parametrize("x", radii)
@pytest.mark.parametrize("ecc", eccentricities)
@pytest.mark.parametrize("rotation", rotations)
def test_volume_ellipsoid(x, ecc, rotation):
y = maj_from_min(x, ecc)
im = ellipse(x, y, rotation)
# (diff hunk context) @@ -51,24 +53,24 @@ def test_volume_ellipsoid(x, ecc, rotation):
return v, real_v
@pytest.mark.parametrize("x", radii)
@pytest.mark.parametrize("ecc", eccentricities)
@pytest.mark.parametrize("rotation", rotations)
def test_approximation(x, ecc, rotation):
    """min_maj_approximation must recover the generating semi-axes.

    Diff-paste duplicates (single- vs double-quoted decorators, and both
    formattings of the assert) are collapsed to one copy each.
    """
    y = maj_from_min(x, ecc)
    im = ellipse(x, y, rotation)
    min_ax, maj_ax = min_maj_approximation(im)
    assert np.allclose(
        [min_ax, maj_ax], [x, y], rtol=threshold * min(np.array([x, y]))
    )
@pytest.mark.parametrize("x", radii)
@pytest.mark.parametrize("ecc", eccentricities)
@pytest.mark.parametrize("rotation", rotations)
def test_roundness(x, ecc, rotation):
    """eccentricity() must recover the eccentricity used to draw the ellipse.

    Diff-paste duplicates (quoting style of the decorators, `** 2` vs `**2`
    spelling of real_ecc) are collapsed to one copy each.
    """
    y = maj_from_min(x, ecc)
    real_ecc = np.sqrt(y**2 - x**2) / y
    im = ellipse(x, y, rotation)
    e = eccentricity(im)
    assert np.isclose(real_ecc, e, rtol=threshold * real_ecc)
#!/usr/bin/env jupyter
from importlib_resources import files
from logfile_parser.legacy import get_legacy_log_example_interface
import pytest
# Example logfiles shipped with the repository, located relative to the
# installed aliby package root.
examples_dir = files("aliby").parent.parent / "examples" / "logfile_parser"
# Grammar definitions bundled inside the logfile_parser package.
grammars_dir = files("logfile_parser") / "grammars"
@pytest.fixture(scope="module", autouse=True)
def legacy_log_interface() -> dict:
    """Module-scoped fixture: parsed output of the bundled legacy-log example."""
    return get_legacy_log_example_interface()
@pytest.fixture(scope="module", autouse=True)
def swainlab_log_interface() -> dict:
    # NOTE(review): despite the `dict` annotation, this fixture returns a
    # path-like object pointing at the example log file — confirm and fix
    # the annotation (Path is not imported in this section).
    return (
        files("aliby").parent.parent
        / "examples"
        / "parsers"
        / "swainlab_logfile_header_example.log"
    )
#!/usr/bin/env jupyter
"""
Output of legacy logfile parser:
channels: {'channel': ['Brightfield', 'GFPFast', 'mCherry'], 'exposure': [30, 30, 100], 'skip': [1, 1, 1], 'zsect': [1, 1, 1], 'start_time': [1, 1, 1], 'camera_mode': [2, 2, 2], 'em_gain': [270, 270, 270], 'voltage': [1.0, 3.5, 2.5]}
zsectioning: {'nsections': [3], 'spacing': [0.8], 'pfson': [True], 'anyz': [True], 'drift': [0], 'zmethod': [2]}
time_settings: {'istimelapse': [True], 'timeinterval': [120], 'ntimepoints': [660], 'totaltime': [79200]}
positions: {'posname': ['pos001', 'pos002', 'pos003', 'pos004', 'pos005', 'pos006', 'pos007', 'pos008', 'pos009'], 'xpos': [568.0, 1267.0, 1026.0, 540.0, 510.0, -187.0, -731.0, -1003.0, -568.0], 'ypos': [1302.0, 1302.0, 977.0, -347.0, -687.0, -470.0, 916.0, 1178.0, 1157.0], 'zpos': [1876.5, 1880.125, 1877.575, 1868.725, 1867.15, 1864.05, 1867.05, 1866.425, 1868.45], 'pfsoffset': [122.45, 119.95, 120.1, 121.2, 122.9, 119.6, 117.05, 121.7, 119.35], 'group': [1, 1, 1, 2, 2, 2, 3, 3, 3], 'Brightfield': [30, 30, 30, 30, 30, 30, 30, 30, 30], 'GFPFast': [30, 30, 30, 30, 30, 30, 30, 30, 30], 'mCherry': [100, 100, 100, 100, 100, 100, 100, 100, 100]}
npumps: 2
pumpinit: {'pump_port': ['COM7', 'COM8'], 'syringe_diameter': [14.43, 14.43], 'flowrate': [0.0, 4.0], 'flowdirection': ['INF', 'INF'], 'isrunning': [True, True], 'contents': ['2% glucose in SC', '0.1% glucose in SC']}
nswitches: 1
switchvol: 50
switchrate: 100
switchtimes: [0]
switchtopump: [2]
switchfrompump: [1]
pumprate: [[0.0], [4.0]]
multiDGUI_commit: 05903fb3769ccf612e7801b46e2248644ce7ca28
date: 2020-02-29 00:00:00
microscope: Batman
acqfile: C:\path\to\example_multiDGUI_log.txt
details: Aim: Strain: Comments:
setup: Brightfield:
White LED
->(Polariser + Prism + condenser)]
->Filter block:[Dual GFP/mCherry exciter (59022x),Dual dichroic (89021bs),No emission filter]
->Emission filter wheel:[No filter in emission wheel]
GFPFast:
470nm LED
->Combiner cube:[480/40 exciter, 515LP dichroic->(455LP dichroic)]
->Filter block:[Dual GFP/mCherry exciter (59022x),Dual dichroic (89021bs),No emission filter]
->Emission filter wheel:[520/40 emission filter]
mCherry:
White LED
->Combiner cube:[No exciter, No reflecting dichroic->(515LP and 455LP dichroics)]
->Filter block:[Dual GFP/mCherry exciter (59022x),Dual dichroic (89021bs),No emission filter]
->Emission filter wheel:[632/60 emission filter]
Micromanager config file:C:\path\to\config_file.cfg
omero_project: SteadystateGlucose
omero_tags: ['29-Feb-2020', 'Batman', '3 chamber', 'GFP', 'mCherry', '1106.Mig2-GFP Mig1-mCherry', '900.Mig1-GFP Msn2-mCherry', '898.Msn2-GFP Mig1-mCherry', '0.1% glucose', '2% glucose', '']
expt_start: 2020-02-29 01:16:51
first_capture: 2020-02-29 01:17:01
omero_tags_stop: Time to next time point:-104.2112
"""
def test_essential_meta_fields(legacy_log_interface: dict):
"""
We test the ability of the parser to find channel names and z-stacks
"""
assert "channels" in legacy_log_interface, "Channels not found at root"
assert len(
legacy_log_interface["channels"]
), "Channels present but names not found"
#!/usr/bin/env jupyter
from pathlib import Path
from logfile_parser.swainlab_parser import parse_from_swainlab_grammar
def test_swainlab_parser(swainlab_log_interface: Path):
    """Smoke test: the swainlab grammar parses the example log without raising."""
    return parse_from_swainlab_grammar(swainlab_log_interface)
import numpy as np
import pandas as pd
import pytest
from postprocessor.core.processes.autoreg import autoreg, autoregParameters
def generate_sinusoids_df(
    time_axis,
    list_freqs,
):
    """Build a dataframe with one sinusoid per row.

    Parameters
    ----------
    time_axis : array_like
        Time axis shared by all rows.
    list_freqs : list
        Frequencies, one per generated sinusoid.

    Examples
    --------
    generate_sinusoids_df([0, 1, 2, 3, 4], [1, 2, 3]) produces a dataframe
    with three rows: sinusoids of frequency 1, 2 and 3, each sampled at
    times 0 through 4.
    """
    rows = [np.sin(2 * np.pi * freq * time_axis) for freq in list_freqs]
    return pd.DataFrame(np.array(rows))
@pytest.mark.parametrize("time_axis", [np.arange(0, 10, 0.01)])
@pytest.mark.parametrize("list_freqs", [[1, 2, 3]])
def test_autoreg(
    time_axis,
    list_freqs,
):
    """Smoke-test autoreg: default parameters, synthetic sinusoid input."""
    runner = autoreg(autoregParameters.default())
    signal = generate_sinusoids_df(time_axis, list_freqs)
    # Only completion matters here; the three outputs are discarded.
    _, _, _ = runner.run(signal)
import numpy as np
import pandas as pd
import pytest
from postprocessor.core.multisignal.crosscorr import (
crosscorr,
crosscorrParameters,
)
def generate_sinusoids_df(
    time_axis,
    num_replicates,
):
    """Return ``num_replicates`` amplitude-3, unit-frequency sinusoids
    sampled on ``time_axis``, each with an independent random phase."""
    stacked = np.tile(time_axis, num_replicates).reshape(
        (num_replicates, len(time_axis))
    )
    phases = 2 * np.pi * np.random.rand(num_replicates, 1)
    return pd.DataFrame(3 * np.sin(2 * np.pi * stacked + phases))
@pytest.mark.parametrize("time_axis", [np.linspace(0, 4, 200)])
@pytest.mark.parametrize("num_replicates", [333])
def test_crosscorr(
    time_axis,
    num_replicates,
):
    """Check a crosscorr runner initialises with defaults and runs on two
    independent synthetic signals."""
    signal_a = generate_sinusoids_df(time_axis, num_replicates)
    signal_b = generate_sinusoids_df(time_axis, num_replicates)
    runner = crosscorr(crosscorrParameters.default())
    _ = runner.run(signal_a, signal_b)
@pytest.mark.parametrize("time_axis", [np.linspace(0, 4, 200)])
@pytest.mark.parametrize("num_replicates", [333])
def test_autocorr(
    time_axis,
    num_replicates,
):
    """Tests crosscorr in autocorrelation mode.

    Tests whether a crosscorr runner can be initialised with default
    parameters and runs without errors, when performing autocorrelation
    (same signal passed as both arguments).
    """
    dummy_signal1 = generate_sinusoids_df(time_axis, num_replicates)
    crosscorr_runner = crosscorr(crosscorrParameters.default())
    _ = crosscorr_runner.run(dummy_signal1, dummy_signal1)
import numpy as np
import pandas as pd
import pytest
from postprocessor.core.processes.gpsignal import (
estimate_gr,
gpsignal,
gpsignalParameters,
)
def dummy_signal(n_cells, n_tps, noise_level):
    """Return a dataframe of ``n_cells`` linear ramps from 1 to 2 over
    ``n_tps`` points, with additive Gaussian noise of scale ``noise_level``."""
    ramp = np.linspace(1, 2, n_tps)
    clean = np.array([ramp for _ in range(n_cells)])
    jitter = np.random.normal(scale=noise_level, size=clean.shape)
    return pd.DataFrame(clean + jitter)
def test_dummy_signal():
    """Sanity-check the dummy-signal generator's dimensions."""
    frame = dummy_signal(5, 10, 0.001)
    assert frame.shape == (5, 10)
def default_values():
    """Default keyword arguments passed to estimate_gr."""
    bounds = {0: (0, 2), 1: (1, 3), 2: (-8, 0)}
    return {"dt": 1, "noruns": 5, "bounds": bounds, "verbose": True}
# TODO: the tolerance threshold still needs to be tuned to expectations
# Relative tolerance for growth-rate recovery in test_estimate_gr.
thresh = 0.1
# Fix the global RNG so dummy_signal noise is reproducible across runs.
np.random.seed(42)
@pytest.mark.parametrize("n_cells", [10])
@pytest.mark.parametrize("n_tps", [50])
@pytest.mark.parametrize("noise_level", [0.01])
@pytest.mark.xfail(reason="Cell 6 is failing since unification")  # TODO FIX
def test_estimate_gr(n_cells, n_tps, noise_level):
    """estimate_gr should recover the slope of a noisy linear ramp."""
    frame = dummy_signal(n_cells, n_tps, noise_level)
    # The ramp rises by 1 over n_tps points, so the slope is 1/n_tps.
    expected_gr = 1 / n_tps
    for i, cell_volume in frame.iterrows():
        fit = estimate_gr(cell_volume, **default_values())
        est_gr = fit["growth_rate"]
        assert np.allclose(est_gr, expected_gr, rtol=thresh), f"Failed for cell {i}"
def test_gpsignal():
    """A gpsignal runner with defaults must emit a fitted-volume signal."""
    runner = gpsignal(gpsignalParameters.default())
    output = runner.run(dummy_signal(5, 10, 0.001))
    assert "fit_volume" in output
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from postprocessor.core.processes.interpolate import (
interpolate,
interpolateParameters,
)
def dummy_signal_array(n_cells, n_tps):
    """Create a dummy signal array: each row is the same increasing ramp
    from 1 to 2 sampled at ``n_tps`` points."""
    ramp = np.linspace(1, 2, n_tps)
    return np.array([ramp for _ in range(n_cells)])
def test_dummy_signal_array():
    """Verify the generated array has the requested dimensions."""
    arr = dummy_signal_array(5, 10)
    assert arr.shape[0] == 5
    assert arr.shape[1] == 10
def randomly_add_na(input_array, num_of_na):
    """Return a copy of a 2d array with ``num_of_na`` entries set to NaN.

    Positions are drawn uniformly without replacement.  The input array is
    left untouched: the previous version mutated ``input_array`` in place
    and returned it, silently corrupting any reference copy the caller kept
    of the original data.
    """
    holey = input_array.copy()
    holey.ravel()[
        np.random.choice(holey.size, num_of_na, replace=False)
    ] = np.nan
    return holey
def test_interpolate():
    """Interpolation must exactly restore values poked out of a linear ramp."""
    dummy_array = dummy_signal_array(5, 10)
    # Snapshot the pristine reference BEFORE poking holes: randomly_add_na
    # may mutate its argument, and the previous ordering built the reference
    # DataFrame from the already-holey array, making the final comparison
    # vacuous (NaN positions matched on both sides).
    dummy_signal = pd.DataFrame(dummy_array.copy())
    holey_array = randomly_add_na(dummy_array, 15)
    holey_signal = pd.DataFrame(holey_array)
    interpolate_runner = interpolate(interpolateParameters.default())
    interpolated_signal = interpolate_runner.run(holey_signal)
    subtr = interpolated_signal - dummy_signal
    # Interpolated values must coincide with the original ramp; positions
    # that remain NaN (non-interpolable edges) are ignored by nansum.
    assert np.nansum(subtr.to_numpy()) == 0
    # TODO: Check that if there are NaNs remaining after interpolation, they
    # are at the ends
"""
Mutual information test
"""
# import pytest
import numpy as np
import pandas as pd
from postprocessor.core.multisignal.mi import mi, miParameters
# Sample data: cropped from replicate 1 in
# https://github.com/swainlab/mi-by-decoding/blob/master/matlab/ExampleScript/fig1_sfp1_replicates.json
# 6 cells x 6 timepoints of sample signal for the "rich" condition
# (see the provenance comment above).
SIGNAL_RICH = np.array(
    [
        [
            -1.39512436686014,
            -1.44046750531481,
            -1.67185664648421,
            -0.500474684796053,
            -0.97255340345062,
            -1.16250137971723,
        ],
        [
            -0.407742899002175,
            -0.619583901332133,
            -0.466156867298538,
            -0.69093319800047,
            -0.186360950573155,
            -0.791411909242518,
        ],
        [
            0.350152741857363,
            0.913407870351929,
            0.524645770050427,
            0.441917565610652,
            1.5228639153911,
            2.67310873357743,
        ],
        [
            0.524953681848925,
            0.653076029386848,
            1.24582647626295,
            0.776211754098582,
            -0.355200015764816,
            -0.0871128171616209,
        ],
        [
            -1.68461323842732,
            -1.43594025257403,
            -1.3114696359734,
            -0.956125193215477,
            -1.2863334639258,
            -0.963653392884438,
        ],
        [
            0.657671105178289,
            1.20192526734078,
            1.41272977531711,
            1.10313719899755,
            1.21218191767352,
            1.25148540716015,
        ],
    ]
)
# 6 cells x 6 timepoints of sample signal for the "stress" condition
# (same provenance as SIGNAL_RICH).
SIGNAL_STRESS = np.array(
    [
        [
            0.360683309928096,
            0.653056477804747,
            0.609421809463519,
            0.26028011016996,
            -0.163807667201703,
            -0.725067314828773,
        ],
        [
            -1.7884489956977,
            -1.77274508168164,
            -1.38542947325363,
            -1.11368924913116,
            -1.54227678929895,
            -1.67197618502403,
        ],
        [
            0.246852644985541,
            0.961545692641162,
            -0.159373062144918,
            0.0990542384881887,
            -0.766446517169187,
            -1.20071098737371,
        ],
        [
            0.393236272245847,
            0.441250356135278,
            1.05344010654052,
            1.06399045807211,
            -0.305342136100235,
            -1.49833740305382,
        ],
        [
            -1.45454923818794,
            -1.07292739455483,
            -1.2991307659611,
            -1.15322537661844,
            -1.29894837314545,
            -1.8884055407594,
        ],
        [
            0.102130222571265,
            -1.07499178276524,
            -1.3148530608215,
            -0.765688535324232,
            -0.645377669611553,
            -0.937035540900562,
        ],
    ]
)
# Expected output: produced by running a modified estimateMI() from
# https://git.ecdf.ed.ac.uk/pswain/mutual-information/-/blob/master/MIdecoding.py
# on the sample input data. The modification was adding the few lines that
# forced a random state for each bootstrap as the 'i' variable, so that
# the output is consistent and therefore useful for pytest.
# Reference output that test_mi compares against with np.allclose.
# Presumably each row holds the three IQR statistics of the bootstrapped
# MI estimate — TODO confirm column semantics against estimateMI().
MI_IQR = np.array(
    [
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        [0.311278124459133, 0, 0.311278124459133],
        [0.311278124459133, 0.311278124459133, 0.311278124459133],
    ]
)
def convert_to_df(array, strain_name):
    """Wrap a 2d array in a dataframe indexed by (strain, cellID)."""
    n_cells = len(array)
    index = pd.MultiIndex.from_arrays(
        [[strain_name] * n_cells, list(range(n_cells))],
        names=("strain", "cellID"),
    )
    return pd.DataFrame(array, index)
def test_mi():
    """Tests mi.

    An mi runner initialised with default parameters (plus deterministic
    train/test splits) must reproduce the reference output on sample data.
    """
    signals = [
        convert_to_df(SIGNAL_RICH, "rich"),
        convert_to_df(SIGNAL_STRESS, "stress"),
    ]
    params = miParameters.default()
    # Seed the train/test splits so the bootstrapped output is reproducible.
    params.train_test_split_seeding = True
    runner = mi(params)
    res = runner.run(signals)
    assert np.allclose(res, MI_IQR)