Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • swain-lab/aliby/aliby-mirror
  • swain-lab/aliby/alibylite
2 results
Show changes
# NOTE(review): removed a stray bare `pytest.mark.skip` expression that used
# to sit between the imports — it created a MarkDecorator and immediately
# discarded it, a no-op with no effect on any test.
import json
import time

import numpy as np
import pytest

# from aliby.experiment import ExperimentLocal
from aliby.baby_client import BabyClient
from aliby.tile.tiler import Tiler
@pytest.mark.skip(
    reason="No longer usable, requires local files. Kept until replaced."
)
def test_client():
    """End-to-end smoke test of the BABY segmentation client.

    Tiles a local experiment, queues five timepoints to a BABY session,
    polls until each segmentation is returned, and dumps the results to
    ``segmentations.json``.  Skipped: requires local data files (and a
    reachable BABY service) that are not available in CI.
    """
    root_dir = (
        "/Users/s1893247/PhD/pipeline-core/data/glclvl_0"
        ".1_mig1_msn2_maf1_sfp1_dot6_03"
    )
    # NOTE(review): the ExperimentLocal import is commented out at the top
    # of the file, so this line would raise NameError if the skip were ever
    # removed — restore the import before reviving this test.
    expt = ExperimentLocal(root_dir, finished=True)
    seg_expt = Tiler(expt, finished=True)
    print(seg_expt.positions)
    seg_expt.current_position = "pos007"
    config = {
        "camera": "evolve",
        "channel": "brightfield",
        "zoom": "60x",
        "n_stacks": "5z",
    }
    baby_client = BabyClient(expt, **config)
    print("The session is {}".format(baby_client.sessions["default"]))
    # Channel 0, 0, X,Y,Z all
    num_timepoints = 5
    traps_tps = [
        seg_expt.get_tiles_timepoint(
            tp, tile_size=81, channels=[0], z=[0, 1, 2, 3, 4]
        ).squeeze()
        for tp in range(num_timepoints)
    ]
    segmentations = []
    try:
        # fixed typo: loop variable was "timpoint"
        for i, timepoint in enumerate(traps_tps):
            print("Sending timepoint {};".format(i))
            # return value was bound to an unused `status` local before
            baby_client.queue_image(
                timepoint,
                baby_client.sessions["default"],
                assign_mothers=True,
                return_baprobs=True,
                with_edgemasks=True,
            )
            # Poll until the segmentation for this timepoint is ready.
            while True:
                try:
                    print("Loading.", end="")
                    result = baby_client.get_segmentation(
                        baby_client.sessions["default"]
                    )
                # narrowed from a bare `except:` so Ctrl-C still interrupts
                except Exception:
                    print(".", end="")
                    time.sleep(1)
                    continue
                break
            print("Received timepoint {}".format(i))
            segmentations.append(result)
    except Exception as e:
        print(segmentations)
        raise e
    with open("segmentations.json", "w") as fd:
        json.dump(segmentations, fd)
    print("Done.")
    # print(len(segmentations[0]))
    # for i in range(5):
    #     print("trap {}".format(i))
    #     for k, v in segmentations[0][i].items():
    #         print(k, v)
    #
    # import matplotlib.pyplot as plt
    # plt.imshow(np.squeeze(batches[0][0, ..., 0]))
    # plt.savefig('test_baby.pdf')
......@@ -7,14 +7,6 @@ import skimage.morphology as morph
from scipy import ndimage
from skimage import draw
# from aliby.post_processing import (
# circle_outline,
# conical,
# ellipse_perimeter,
# union_of_spheres,
# volume_of_sphere,
# )
@pytest.mark.skip(
reason="No longer usable, post_processing unused inside aliby. Kept temporarily"
......
import argparse
from aliby.io.image import ImageLocal
from aliby.io.image import ImageLocalOME
# from aliby.experiment import ExperimentLocal
from aliby.tile.tiler import Tiler, TilerParameters
......@@ -19,8 +19,14 @@ def define_parser():
return parser
def initialise_dummy():
    """Build and return a dummy Tiler from the default TilerParameters."""
    params = TilerParameters.default().to_dict()
    return Tiler.dummy(params)
def initialise_objects(data_path, template=None):
    """Create a Tiler for the image stored at *data_path*.

    Parameters
    ----------
    data_path : path-like
        Location of the image file to open.
    template : optional
        Accepted for interface compatibility; not used in this function.
    """
    # NOTE(review): previously the image was constructed twice in a row
    # (ImageLocal, then ImageLocalOME), discarding the first instance —
    # a dead store that may also have opened the file twice. Keep only
    # the OME reader actually used below.
    image = ImageLocalOME(data_path)
    tiler = Tiler.from_image(image, TilerParameters.default())
    return tiler
......@@ -53,6 +59,8 @@ if __name__ == "__main__":
parser = define_parser()
args = parser.parse_args()
dummy_tiler = initialise_dummy()
tiler = initialise_objects(args.root_dir, template=args.template)
if args.position is not None:
......
#!/usr/bin/env jupyter
def pytest_addoption(parser):
parser.addoption("--file", action="store", default="test_datasets")
def pytest_generate_tests(metafunc):
# This is called for every test. Only get/set command line arguments
# if the argument is specified in the list of test "fixturenames".
option_value = metafunc.config.option.file
if "file" in metafunc.fixturenames and option_value is not None:
metafunc.parametrize("file", [option_value])
#
#!/usr/bin/env python3
import numpy as np
import dask.array as da
import pytest
from aliby.io.image import ImageDummy
# Shared configuration for the tiler tests below.
tiler_parameters = {"tile_size": 117, "ref_channel": "Brightfield", "ref_z": 0}

# Minimal 2x2 sample image as a dask array.
sample_da = da.from_array(np.array([[1, 2], [3, 4]]))
# Make it 5-dimensional by prepending three singleton axes.
sample_da = sample_da.reshape((1, 1, 1) + sample_da.shape[-2:])
@pytest.mark.parametrize("sample_da", [sample_da])
@pytest.mark.parametrize("dim", [2])
@pytest.mark.parametrize("n_empty_slices", [4])
@pytest.mark.parametrize("image_position", [1])
def test_pad_array(sample_da, dim, n_empty_slices, image_position):
    """Test ImageDummy.pad_array() method.

    Checks that padding along `dim` places the original image at
    `image_position` and extends that axis to n_empty_slices + 1.
    """
    # create object
    imgdmy = ImageDummy(tiler_parameters)
    # pads array
    padded_da = imgdmy.pad_array(
        sample_da,
        dim=dim,
        n_empty_slices=n_empty_slices,
        image_position=image_position,
    )
    # materialise once instead of recomputing for every assertion
    padded = padded_da.compute()
    # Build an index selecting `image_position` along `dim` and everything
    # along the other axes. NOTE(review): this must be a *tuple* — indexing
    # a NumPy array with a list containing slices is an error in modern
    # NumPy. Also renamed the comprehension variable, which shadowed the
    # `dim` parameter.
    ix = tuple(
        image_position if axis == dim else slice(None)
        for axis in range(padded.ndim)
    )
    # Checks that original image array is there and is at the correct index
    assert np.array_equal(padded[ix], sample_da.compute()[0])
    # Checks that the additional axis is extended correctly
    assert padded.shape[dim] == n_empty_slices + 1
#!/usr/bin/env jupyter
from pathlib import Path
import pytest
from aliby.pipeline import Pipeline, PipelineParameters
def test_local_pipeline(file: str):
    """Run the full pipeline on a local dataset, if one was downloaded."""
    # Guard clause: nothing to do without the dataset on disk.
    if not Path(file).exists():
        print("Test dataset not downloaded")
        return
    params = PipelineParameters.default(
        general={
            "expt_id": file,
            "distributed": 0,
            "directory": "test_output/",
            "overwrite": True,
        },
        tiler={"ref_channel": 0},
    )
    pipeline = Pipeline(params)
    pipeline.run()
#!/usr/bin/env jupyter
from importlib_resources import files
from logfile_parser.legacy import get_legacy_log_example_interface
import pytest
# Example logfiles, resolved relative to the installed aliby package
# (assumes a source-checkout layout two levels up — TODO confirm for
# installed wheels).
examples_dir = files("aliby").parent.parent / "examples" / "logfile_parser"
# Grammar definitions shipped inside the logfile_parser package.
grammars_dir = files("logfile_parser") / "grammars"
@pytest.fixture(scope="module", autouse=True)
def legacy_log_interface() -> dict:
    """Module-scoped fixture: the legacy logfile example interface."""
    return get_legacy_log_example_interface()
@pytest.fixture(scope="module", autouse=True)
def swainlab_log_interface():
    """Module-scoped fixture: path to the example swainlab logfile header.

    NOTE(review): the previous ``-> dict`` annotation was wrong — the body
    returns a path-like object (a ``files()`` traversable), not a dict, so
    the misleading annotation was dropped.
    """
    return (
        files("aliby").parent.parent
        / "examples"
        / "parsers"
        / "swainlab_logfile_header_example.log"
    )
#!/usr/bin/env jupyter
"""
Output of legacy logfile parser:
channels: {'channel': ['Brightfield', 'GFPFast', 'mCherry'], 'exposure': [30, 30, 100], 'skip': [1, 1, 1], 'zsect': [1, 1, 1], 'start_time': [1, 1, 1], 'camera_mode': [2, 2, 2], 'em_gain': [270, 270, 270], 'voltage': [1.0, 3.5, 2.5]}
zsectioning: {'nsections': [3], 'spacing': [0.8], 'pfson': [True], 'anyz': [True], 'drift': [0], 'zmethod': [2]}
time_settings: {'istimelapse': [True], 'timeinterval': [120], 'ntimepoints': [660], 'totaltime': [79200]}
positions: {'posname': ['pos001', 'pos002', 'pos003', 'pos004', 'pos005', 'pos006', 'pos007', 'pos008', 'pos009'], 'xpos': [568.0, 1267.0, 1026.0, 540.0, 510.0, -187.0, -731.0, -1003.0, -568.0], 'ypos': [1302.0, 1302.0, 977.0, -347.0, -687.0, -470.0, 916.0, 1178.0, 1157.0], 'zpos': [1876.5, 1880.125, 1877.575, 1868.725, 1867.15, 1864.05, 1867.05, 1866.425, 1868.45], 'pfsoffset': [122.45, 119.95, 120.1, 121.2, 122.9, 119.6, 117.05, 121.7, 119.35], 'group': [1, 1, 1, 2, 2, 2, 3, 3, 3], 'Brightfield': [30, 30, 30, 30, 30, 30, 30, 30, 30], 'GFPFast': [30, 30, 30, 30, 30, 30, 30, 30, 30], 'mCherry': [100, 100, 100, 100, 100, 100, 100, 100, 100]}
npumps: 2
pumpinit: {'pump_port': ['COM7', 'COM8'], 'syringe_diameter': [14.43, 14.43], 'flowrate': [0.0, 4.0], 'flowdirection': ['INF', 'INF'], 'isrunning': [True, True], 'contents': ['2% glucose in SC', '0.1% glucose in SC']}
nswitches: 1
switchvol: 50
switchrate: 100
switchtimes: [0]
switchtopump: [2]
switchfrompump: [1]
pumprate: [[0.0], [4.0]]
multiDGUI_commit: 05903fb3769ccf612e7801b46e2248644ce7ca28
date: 2020-02-29 00:00:00
microscope: Batman
acqfile: C:\path\to\example_multiDGUI_log.txt
details: Aim: Strain: Comments:
setup: Brightfield:
White LED
->(Polariser + Prism + condenser)]
->Filter block:[Dual GFP/mCherry exciter (59022x),Dual dichroic (89021bs),No emission filter]
->Emission filter wheel:[No filter in emission wheel]
GFPFast:
470nm LED
->Combiner cube:[480/40 exciter, 515LP dichroic->(455LP dichroic)]
->Filter block:[Dual GFP/mCherry exciter (59022x),Dual dichroic (89021bs),No emission filter]
->Emission filter wheel:[520/40 emission filter]
mCherry:
White LED
->Combiner cube:[No exciter, No reflecting dichroic->(515LP and 455LP dichroics)]
->Filter block:[Dual GFP/mCherry exciter (59022x),Dual dichroic (89021bs),No emission filter]
->Emission filter wheel:[632/60 emission filter]
Micromanager config file:C:\path\to\config_file.cfg
omero_project: SteadystateGlucose
omero_tags: ['29-Feb-2020', 'Batman', '3 chamber', 'GFP', 'mCherry', '1106.Mig2-GFP Mig1-mCherry', '900.Mig1-GFP Msn2-mCherry', '898.Msn2-GFP Mig1-mCherry', '0.1% glucose', '2% glucose', '']
expt_start: 2020-02-29 01:16:51
first_capture: 2020-02-29 01:17:01
omero_tags_stop: Time to next time point:-104.2112
"""
def test_essential_meta_fields(legacy_log_interface: dict):
"""
We test the ability of the parser to find channel names and z-stacks
"""
assert "channels" in legacy_log_interface, "Channels not found at root"
assert len(
legacy_log_interface["channels"]
), "Channels present but names not found"
#!/usr/bin/env jupyter
from pathlib import Path
from logfile_parser.swainlab_parser import parse_from_swainlab_grammar
def test_swainlab_parser(swainlab_log_interface: Path):
    """Smoke-test the swainlab grammar parser on the example logfile."""
    parsed = parse_from_swainlab_grammar(swainlab_log_interface)
    return parsed
import numpy as np
import pandas as pd
import pytest
from postprocessor.core.multisignal.crosscorr import (
crosscorr,
crosscorrParameters,
)
def generate_sinusoids_df(
    time_axis,
    num_replicates,
):
    """Return a DataFrame of sinusoids with random phases.

    Each of the `num_replicates` rows is 3*sin(2*pi*t + phase) sampled
    on `time_axis`, with one uniform random phase per row.
    """
    n_points = len(time_axis)
    stacked_t = np.tile(time_axis, num_replicates).reshape(
        (num_replicates, n_points)
    )
    phases = 2 * np.pi * np.random.rand(num_replicates, 1)
    signals = 3 * np.sin(2 * np.pi * stacked_t + phases)
    return pd.DataFrame(signals)
@pytest.mark.parametrize("time_axis", [np.linspace(0, 4, 200)])
@pytest.mark.parametrize("num_replicates", [333])
def test_crosscorr(
    time_axis,
    num_replicates,
):
    """Tests crosscorr.

    Tests whether a crosscorr runner can be initialised with default
    parameters and runs without errors on two independent dummy signals.
    """
    dummy_signal1 = generate_sinusoids_df(time_axis, num_replicates)
    dummy_signal2 = generate_sinusoids_df(time_axis, num_replicates)
    crosscorr_runner = crosscorr(crosscorrParameters.default())
    _ = crosscorr_runner.run(dummy_signal1, dummy_signal2)
@pytest.mark.parametrize("time_axis", [np.linspace(0, 4, 200)])
@pytest.mark.parametrize("num_replicates", [333])
def test_autocorr(
    time_axis,
    num_replicates,
):
    """Tests autocorrelation via crosscorr.

    Tests whether a crosscorr runner can be initialised with default
    parameters and runs without errors when given the same signal for
    both inputs (i.e. performing autocorrelation).
    """
    dummy_signal1 = generate_sinusoids_df(time_axis, num_replicates)
    crosscorr_runner = crosscorr(crosscorrParameters.default())
    _ = crosscorr_runner.run(dummy_signal1, dummy_signal1)
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from postprocessor.core.processes.interpolate import (
interpolate,
interpolateParameters,
)
def dummy_signal_array(n_cells, n_tps):
    """Create a dummy signal array: each row an increasing 1->2 gradient."""
    gradient_row = np.linspace(1, 2, n_tps)
    return np.array([gradient_row] * n_cells)
def test_dummy_signal_array():
    """Sanity-check the dimensions produced by dummy_signal_array."""
    n_cells, n_tps = 5, 10
    ds = dummy_signal_array(n_cells, n_tps)
    # Check dimensions
    assert ds.shape[0] == n_cells
    assert ds.shape[1] == n_tps
def randomly_add_na(input_array, num_of_na):
    """Replace `num_of_na` randomly chosen entries of a 2d array with NaN.

    Mutates `input_array` in place and returns it.
    """
    flat_view = input_array.ravel()
    na_positions = np.random.choice(input_array.size, num_of_na, replace=False)
    flat_view[na_positions] = np.nan
    return input_array
def test_interpolate():
    """Check interpolate fills NaN holes with the original (linear) values."""
    dummy_array = dummy_signal_array(5, 10)
    # Poke holes so interpolate can fill.
    # NOTE(review): randomly_add_na mutates its argument in place; the
    # original code passed dummy_array directly, so the "pristine"
    # reference below was built from the already-holey array and the
    # comparison was corrupted. Work on a copy instead.
    holey_array = randomly_add_na(dummy_array.copy(), 15)
    dummy_signal = pd.DataFrame(dummy_array)
    holey_signal = pd.DataFrame(holey_array)
    interpolate_runner = interpolate(interpolateParameters.default())
    interpolated_signal = interpolate_runner.run(holey_signal)
    subtr = interpolated_signal - dummy_signal
    # Check that interpolated values are the ones that exist in the dummy
    assert np.nansum(subtr.to_numpy()) == 0
    # TODO: Check that if there are NaNs remaining after interpolation, they
    # are at the ends