Compare revisions
Changes are shown as if the source revision was being merged into the target revision.
Target project:
  • swain-lab/aliby/aliby-mirror
  • swain-lab/aliby/alibylite
Showing with 6636 additions and 3270 deletions
# File with defaults for ease of use
from typing import Union
from pathlib import PosixPath, Path
import json
import h5py
def exparams_from_meta(meta: Union[dict, PosixPath, str], extras=["ph"]):
"""
Obtain extraction parameters from the metadata of an hdf5 file.
"""
meta = meta if isinstance(meta, dict) else load_attributes(meta)
base = {
"tree": {"general": {"None": ["area", "volume"]}},
"multichannel_ops": {},
}
av_channels = {
"Citrine",
"GFP",
"GFPFast",
"mCherry",
"pHluorin405",
"Flavin",
"Cy5",
"mKO2",
}
default_reductions = {"np_max"}
default_metrics = {"mean", "median", "imBackground", "max2p5pc"}
default_rm = {r: default_metrics for r in default_reductions}
av_flch = av_channels.intersection(meta["channels/channel"]).difference(
{"Brightfield, DIC"}
)
for ch in av_flch:
base["tree"][ch] = default_rm
base["sub_bg"] = av_flch
# Additional extraction
if "ph" in extras and {"pHluorin405", "GFPFast"}.issubset(av_flch):
sets = {
b + a: (x, y)
for a, x in zip(
["", "_bgsub"],
(
["GFPFast", "pHluorin405"],
["GFPFast_bgsub", "pHluorin405_bgsub"],
),
)
for b, y in zip(["em_ratio", "gsum"], ["div0", "np_add"])
}
for i, v in sets.items():
base["multichannel_ops"][i] = [
*v,
default_rm,
]
return base
def load_attributes(file: str, group="/"):
with h5py.File(file, "r") as f:
meta = dict(f[group].attrs.items())
return meta
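# Illustrative sketch with a hypothetical metadata dict (no hdf5 file needed,
# since passing a dict bypasses load_attributes): build default extraction parameters.
meta = {"channels/channel": ["Brightfield", "GFPFast", "pHluorin405"]}
params = exparams_from_meta(meta)
# params["tree"] gains "GFPFast" and "pHluorin405" branches with the default
# reductions/metrics, and params["multichannel_ops"] gains the pH-ratio
# operations because both pHluorin405 and GFPFast are present.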
import numpy as np
def trap_apply(cell_fun, cell_masks, trap_image, **kwargs):
"""
Apply a cell_function to a mask, trap_image pair
:param cell_fun: function to apply to a cell (from extraction/cell.py)
:param cell_masks: (numpy 3d array) cells' segmentation mask
:param trap_image: the image for the trap in which the cell is (all
channels)
:param kwargs: extra parameters to pass if needed for custom functions
"""
return [
cell_fun(cell_masks[..., i], trap_image, **kwargs)
for i in range(cell_masks.shape[2])
]
def reduce_z(trap_image, fun):
# Optimise the reduction function if possible
if isinstance(fun, np.ufunc):
return fun.reduce(trap_image, axis=2)
else:
return np.apply_along_axis(fun, 2, trap_image)
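# Minimal sketch: reduce a (x, y, z) stack along z. A numpy ufunc such as
# np.maximum takes the fast ufunc.reduce path; a plain callable such as
# np.median falls back to np.apply_along_axis.
stack = np.random.rand(5, 5, 3)
max_proj = reduce_z(stack, np.maximum)    # shape (5, 5)
median_proj = reduce_z(stack, np.median)  # shape (5, 5)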
from yaml import load, dump
def dict_to_yaml(d, f):
with open(f, "w") as yaml_file:
dump(d, yaml_file)
def add_attrs(hdfile, path, files):
group = hdfile.create_group(path)
for k, v in files:
group.attrs[k] = v
import numpy as np
from inspect import getmembers, isfunction, getargspec
from extraction.core.functions import cell, trap
from extraction.core.functions.custom import localisation
from extraction.core.functions.math import div0
from extraction.core.functions.distributors import trap_apply
def load_cellfuns_core():
# Generate str -> cell_function dict from functions in core.cell
return {f[0]: f[1] for f in getmembers(cell) if isfunction(f[1])}
def load_custom_args():
"""
Load custom functions. If they have extra arguments also load these
"""
funs = {f[0]: f[1] for f in getmembers(localisation) if isfunction(f[1])}
args = {
k: getargspec(v).args[2:]
for k, v in funs.items()
if set(["cell_mask", "trap_image"]).intersection(getargspec(v).args)
}
return ({k: funs[k] for k in args.keys()}, {k: v for k, v in args.items() if v})
def load_cellfuns():
# Generate str -> function dict from core.cell functions, wrapped to apply across all cells in a trap
cell_funs = load_cellfuns_core()
CELLFUNS = {}
for k, f in cell_funs.items():
if isfunction(f):
def tmp(f):
return lambda m, img: trap_apply(f, m, img)
CELLFUNS[k] = tmp(f)
return CELLFUNS
def load_trapfuns():
TRAPFUNS = {f[0]: f[1] for f in getmembers(trap) if isfunction(f[1])}
return TRAPFUNS
def load_funs():
CELLFUNS = load_cellfuns()
TRAPFUNS = load_trapfuns()
return CELLFUNS, TRAPFUNS, {**TRAPFUNS, **CELLFUNS}
def load_redfuns(): # TODO make defining reduction functions more flexible
RED_FUNS = {
"np_max": np.maximum,
"np_mean": np.mean,
"np_median": np.median,
"None": None,
}
return RED_FUNS
def load_mergefuns():
MERGE_FUNS = {"div0": div0, "np_add": np.add}
return MERGE_FUNS
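# Illustrative sketch: look up reduction/merge functions by name and apply them,
# assuming reduce_z is also imported from extraction.core.functions.distributors.
RED_FUNS = load_redfuns()
MERGE_FUNS = load_mergefuns()
z_stack = np.random.rand(64, 64, 5)
projected = reduce_z(z_stack, RED_FUNS["np_max"])       # ufunc fast path
ratio = MERGE_FUNS["div0"](projected, projected + 1.0)  # element-wise, safe on zeros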
import numpy as np
def div0(a, b, fill=0):
"""a / b, divide by 0 -> `fill`
div0( [-1, 0, 1], 0, fill=np.nan) -> [nan nan nan]
div0( 1, 0, fill=np.inf ) -> inf
"""
with np.errstate(divide="ignore", invalid="ignore"):
c = np.true_divide(a, b)
if np.isscalar(c):
return c if np.isfinite(c) else fill
else:
c[~np.isfinite(c)] = fill
return c
## Trap-wise calculations
import numpy as np
def imBackground(cell_masks, trap_image):
"""
Compute the median background, i.e. the median of pixels not covered by any cell mask.
:param cell_masks: (numpy 3d array) cells' segmentation masks
:param trap_image: the image for the trap in which the cells are (all
channels)
"""
if not len(cell_masks):
cell_masks = np.zeros_like(trap_image)
background = ~cell_masks.sum(axis=2).astype(bool)
return (np.median(trap_image[np.where(background)]))
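# Minimal sketch with synthetic data: the background is every pixel not covered
# by any cell mask, and its median intensity is returned.
trap_image = np.full((10, 10), 100.0)
trap_image[:5, :5] = 5.0
cell_masks = np.zeros((10, 10, 2), dtype=bool)
cell_masks[:5, :5, 0] = True  # one cell covering the dim corner
bg = imBackground(cell_masks, trap_image)  # 100.0: the corner is masked out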
from collections import deque
def depth(d):
"""
Copied from https://stackoverflow.com/a/23499088
Used to determine the depth of our config trees and fill them
"""
queue = deque([(id(d), d, 1)])
memo = set()
while queue:
id_, o, level = queue.popleft()
if id_ in memo:
continue
memo.add(id_)
if isinstance(o, dict):
queue += ((id(v), v, level + 1) for v in o.values())
return level
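# Minimal sketch: depth counts nesting levels, with leaf values adding one level.
depth({"general": {"None": ["area", "volume"]}})  # -> 3
depth({"a": 1})                                   # -> 2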
from copy import copy
def reassign_mo_bud(mo_bud, trans):
"""
Update a mother-bud dictionary using another dict of joined tracks.
:param mo_bud: dict with mothers' ids as keys and lists of daughters' ids as values
:param trans: dict of joined tracks, mapping moved track -> static track
:return: mo_bud with updated cell ids
"""
val2lst = lambda x: [j for i in x.values() for j in i]
bud_inter = set(val2lst(mo_bud)).intersection(trans.keys())
# translate daughters
mo_bud = copy(mo_bud)
for k, das in mo_bud.items():
for da in bud_inter.intersection(das):
mo_bud[k][mo_bud[k].index(da)] = trans[da]
# translate mothers
mo_inter = set(mo_bud.keys()).intersection(trans.keys())
for k in mo_inter:
mo_bud[trans[k]] = mo_bud.get(trans[k], []) + mo_bud[k]
del mo_bud[k]
return mo_bud
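# Illustrative values: track 3 (a bud) was joined onto track 5 and track 1
# (a mother) onto track 10, so both values and keys are translated.
mo_bud = {1: [2, 3]}
trans = {3: 5, 1: 10}
reassign_mo_bud(mo_bud, trans)  # -> {10: [2, 5]}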
from tqdm import tqdm
from omero.gateway import BlitzGateway
# Helper funs
def connect_omero():
conn = BlitzGateway(*get_creds(), host='islay.bio.ed.ac.uk', port=4064)
conn.connect()
return conn
def get_creds():
return('upload',
'***REMOVED***', #OMERO Password
)
def download_file(f):
"""
Download file in chunks using FileWrapper object
"""
desc = 'Downloading ' + f.getFileName() + \
' (' + str(round(f.getFileSize()/1000**2, 2)) + 'Mb)'
down_file = bytearray()
for c in tqdm(f.getFileInChunks(), desc=desc):
down_file += c
return down_file
# Example of argo experiment explorer
from aliby.utils.argo import Argo
from extraction.core.extractor import Extractor
from extraction.core.parameters import Parameters
from extraction.core.functions.defaults import get_params
argo = Argo()
argo.load()
# argo.channels("GFP")
argo.tags(["Alan"])
argo.complete()
# argo.cExperiment()
# argo.tiler_cells()
# params = Parameters(**get_params("batman_ph_dual_fast"))
# def try_extract(d):
# try:
# params = Parameters(**get_params("batman_ph_dual_fast"))
# ext = Extractor(params, source=d.getId())
# ext.load_tiler_cells()
# ext.process_experiment()
# print(d.getId(), d.getName(), "Experiment processed")
# return True
# except:
# print(d.getId(), d.getName(), "Experiment not processed")
# return False
# from multiprocessing.dummy import Pool as ThreadPool
# pool = ThreadPool(4)
# results = pool.map(try_extract, argo.dsets)
# import pickle
# with open("results.pkl", "wb") as f:
# pickle.dump(results, f)
import numpy as np
from pathlib import Path
from extraction.core.extractor import Extractor
from extraction.core.parameters import Parameters
from extraction.core.functions.defaults import get_params
params = Parameters(**get_params("batman_ph_dual_fast"))
# ext = Extractor(params, source=19918) # 19831
ext = Extractor(params, source=19831)
ext.load_tiler()
self = ext
# s=self.extract_exp(tree={'general':{None:['area']}, 'GFPFast':{np.maximum:['median','mean']}},poses=self.expt.positions[:2], tps=[0,1], stg='df')
s = self.extract_exp()
# # import cProfile
# # profile = cProfile.Profile()
# # profile.enable()
# # ext.change_position(ext.expt.positions[1])
# # tracks = self.extract_pos(
# # tree={('general'):{None: # Other metrics can be used
# # [tidy_metric]}})#['general',None,'area']
# # profile.disable()
# # import pstats
# # ps = pstats.Stats(profile)
# # ps.sort_stats('cumulative')
# # ps.print_stats()
import matplotlib.pyplot as plt
from core.experiment import Experiment
from core.segment import Tiler
expt = Experiment.from_source(19310, #Experiment ID on OMERO
'upload', #OMERO Username
'***REMOVED***', #OMERO Password
'islay.bio.ed.ac.uk', #OMERO host
port=4064 #This is default
)
# Load whole position
img = expt[0, 0, :, :, 2]
plt.imshow(img[0, 0, ..., 0]); plt.show()
# Manually get template
tilesize = 117
x0 = 827
y0 = 632
trap_template = img[0, 0, x0:x0 + tilesize, y0:y0 + tilesize, 0]
plt.imshow(trap_template); plt.show()
tiler = Tiler(expt, template=trap_template)
# Load images (takes about 5 mins)
trap_tps = tiler.get_traps_timepoint(0, tile_size=117, z=[2])
# Plot found traps
nrows, ncols = (5, 5)
fig, axes = plt.subplots(nrows, ncols)
for i in range(nrows):
for j in range(ncols):
if i * ncols + j < trap_tps.shape[0]:
axes[i, j].imshow(trap_tps[i * ncols + j, 0, 0, ..., 0])
plt.show()
Source diff could not be displayed: it is too large.
[tool.poetry]
name = "aliby"
version = "0.1.64"
description = "Process and analyse live-cell imaging data"
authors = ["Alan Munoz <alan.munoz@ed.ac.uk>"]
packages = [
{ include = "aliby", from="src" },
{ include = "extraction", from="src" },
{ include = "agora", from="src" },
{ include = "postprocessor", from="src" },
{ include = "logfile_parser", from="src" },
]
readme = "README.md"
[tool.poetry.scripts]
aliby-run = "aliby.bin.run:run"
aliby-annotate = "aliby.bin.annotate:annotate"
aliby-visualise = "aliby.bin.visualise:napari_overlay"
[tool.poetry.dependencies]
python = ">=3.8, <3.11"
PyYAML = "^6.0"
flatten-dict = "^0.4.2"
gaussianprocessderivatives = "^0.1.5"
numpy = ">=1.21.6"
Bottleneck = "^1.3.5"
opencv-python = "^4.7.0.72"
pathos = "^0.2.8" # Lambda-friendly multithreading
p-tqdm = "^1.3.3"
pandas = ">=1.3.3"
py-find-1st = "^1.1.5" # Fast indexing
scikit-learn = ">=1.0.2" # Used for an extraction metric
scipy = ">=1.7.3"
# Pipeline + I/O
dask = "^2021.12.0"
imageio = "2.8.0" # For image-visualisation utilities
requests-toolbelt = "^0.9.1"
scikit-image = ">=0.18.1"
tqdm = "^4.62.3" # progress bars
xmltodict = "^0.13.0" # read ome-tiff metadata
zarr = "^2.14.0"
GitPython = "^3.1.27"
h5py = "2.10" # File I/O
# Networking
omero-py = { version = ">=5.6.2", optional = true } # contact omero server
# Baby segmentation
aliby-baby = {version = "^0.1.17", optional=true}
# Postprocessing
[tool.poetry.group.pp.dependencies]
leidenalg = "^0.8.8"
more-itertools = "^8.12.0"
pycatch22 = "^0.4.2"
[tool.poetry.group.pp]
optional = true
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
black = "^22.6.0"
mypy = "^0.930"
SQLAlchemy = "^1.4.29"
numpydoc = "^1.3.1"
isort = "^5.10.1"
jupyter = "^1.0.0"
flake8 = "^4.0.1"
pyright = "^1.1.258"
pre-commit = "^2.20.0"
seaborn = "^0.11.2"
debugpy = "^1.6.3"
coverage = "^7.0.4"
jupytext = "^1.14.4"
grid-strategy = "^0.0.1"
readchar = "^4.0.3"
ipdb = "^0.13.11"
[build-system]
requires = ["setuptools", "poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.group.docs]
optional = true
[tool.poetry.group.docs.dependencies]
Sphinx = "^5.2.0"
sphinx-rtd-theme = "^1.0.0"
sphinx-autodoc-typehints = "^1.19.2"
myst-parser = "^0.18.0"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^6.2.5"
[tool.poetry.group.utils]
optional = true
# Dependency groups can only be used by a poetry installation, not pip
[tool.poetry.group.utils.dependencies]
napari = {version = ">=0.4.16", optional=true}
Torch = {version = "^1.13.1", optional=true}
pytorch-lightning = {version = "^1.9.3", optional=true}
torchvision = {version = "^0.14.1", optional=true}
trio = {version = "^0.22.0", optional=true}
grid-strategy = {version = "^0.0.1", optional=true}
[tool.poetry.extras]
omero = ["omero-py"]
baby = ["aliby-baby"]
[tool.black]
line-length = 79
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
\.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
'''
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 79
include_trailing_comma = true
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = [
"tests",
]
from setuptools import setup, find_packages
print("find_packages outputs ", find_packages("aliby"))
setup(
name="aliby",
version="0.1.2",
packages=find_packages(),
# package_dir={"": "aliby"},
# packages=['aliby', 'aliby.io'],
# include_package_data=True,
url="",
license="",
author="Diane Adjavon",
author_email="diane.adjavon@ed.ac.uk",
description="Core utilities for microscopy pipeline",
python_requires=">=3.7",
install_requires=[
"numpy",
"dask",
"pathos",
"tqdm",
"pandas",
"scikit-image",
"opencv-python",
"requests_toolbelt",
"h5py==2.10",
"tables",
"imageio==2.8.0",
"omero-py>=5.6.2",
"pathos",
"zeroc-ice==3.6.5",
"tensorflow>=1.15,<=2.3",
"aliby-agora",
"aliby-argo",
"aliby-baby",
"aliby-extraction",
"aliby-post",
],
)
import logging
import typing as t
from abc import ABC, abstractmethod
from collections.abc import Iterable
from copy import copy
from pathlib import Path
from time import perf_counter
from typing import Union
from flatten_dict import flatten
from yaml import dump, safe_load
from agora.logging import timer
atomic = t.Union[int, float, str, bool]
class ParametersABC(ABC):
"""
Defines parameters as attributes and allows parameters to
be converted to either a dictionary or to yaml.
No attribute should be called "parameters"!
"""
def __init__(self, **kwargs):
"""
Defines parameters as attributes
"""
assert (
"parameters" not in kwargs
), "No attribute should be named parameters"
for k, v in kwargs.items():
setattr(self, k, v)
def to_dict(self, iterable="null") -> t.Dict:
"""
Recursive function to return a nested dictionary of the
attributes of the class instance.
"""
if isinstance(iterable, dict):
if any(
isinstance(x, Iterable) or hasattr(x, "to_dict")
for x in iterable.values()
):
return {
k: v.to_dict()
if hasattr(v, "to_dict")
else self.to_dict(v)
for k, v in iterable.items()
}
else:
return iterable
elif iterable == "null":
# use instance's built-in __dict__ dictionary of attributes
return self.to_dict(self.__dict__)
else:
return iterable
def to_yaml(self, path: Union[Path, str] = None):
"""
Return a yaml string of the attributes of the class instance.
If path is provided, the yaml string is also saved there.
Parameters
----------
path : Union[Path, str]
Output path.
"""
if path:
with open(Path(path), "w") as f:
dump(self.to_dict(), f)
return dump(self.to_dict())
@classmethod
def from_dict(cls, d: dict):
return cls(**d)
@classmethod
def from_yaml(cls, source: Union[Path, str]):
"""
Return an instance from a yaml filename or a yaml string.
"""
is_buffer = True
try:
if Path(source).exists():
is_buffer = False
except Exception as _:
assert isinstance(source, str), "Invalid source type."
if is_buffer:
params = safe_load(source)
else:
with open(source) as f:
params = safe_load(f)
return cls(**params)
@classmethod
def default(cls, **kwargs):
overriden_defaults = copy(cls._defaults)
for k, v in kwargs.items():
overriden_defaults[k] = v
return cls.from_dict(overriden_defaults)
def update(self, name: str, new_value):
"""
Update values recursively.
If name refers to a dictionary, replace existing data or add it if missing; a warning is printed on type changes.
If the existing structure under name is a dictionary, the first occurrence of the key is found and modified.
If the leaf node to be changed is a collection, the new elements are added to it.
"""
assert name not in (
"parameters",
"params",
), "Attribute can't be named params or parameters"
if name in self.__dict__:
if not check_type_recursive(getattr(self, name), new_value):
print("Warning: type changes are risky")
if isinstance(getattr(self, name), dict):
flattened = flatten(self.to_dict())
names_found = [k for k in flattened.keys() if name in k]
found_idx = [keys.index(name) for keys in names_found]
assert len(names_found), f"{name} not found as key."
keys = None
if len(names_found) > 1:
for idx, level_keys in zip(found_idx, names_found):
if idx == min(found_idx):
keys = level_keys
print(
f"Warning: {name} was found in multiple keys. Selected {keys}"
)
break
else:
keys = names_found.pop()
if keys:
current_val = flattened.get(keys, None)
# if isinstance(current_val, t.Collection):
elif isinstance(getattr(self, name), t.Collection):
add_to_collection(getattr(self, name), new_value)
elif isinstance(getattr(self, name), set):
pass # TODO implement
new_d = getattr(self, name)
new_d.update(new_value)
setattr(self, name, new_d)
else:
setattr(self, name, new_value)
def add_to_collection(
collection: t.Collection, value: t.Union[atomic, t.Collection]
):
# Adds element(s) in place.
if not isinstance(value, t.Collection):
value = [value]
if isinstance(collection, list):
collection += value
elif isinstance(collection, set):
collection.update(value)
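# Hypothetical ParametersABC subclass (illustrative only): parameters round-trip
# through to_dict/from_dict and to_yaml/from_yaml.
class DummyParameters(ParametersABC):
    _defaults = {"channels": ["GFP"], "tree": {"general": {"None": ["area"]}}}

params = DummyParameters.default(channels=["GFP", "mCherry"])
as_dict = params.to_dict()                           # nested dict of attributes
clone = DummyParameters.from_yaml(params.to_yaml())  # round-trip via a yaml string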
class ProcessABC(ABC):
"""
Base class for processes.
Defines parameters as attributes and requires run method to be defined.
"""
def __init__(self, parameters):
"""
Arguments
---------
parameters: instance of ParametersABC
"""
self._parameters = parameters
# convert parameters to dictionary
# and then define each parameter as an attribute
for k, v in parameters.to_dict().items():
setattr(self, k, v)
@property
def parameters(self):
return self._parameters
@abstractmethod
def run(self):
pass
def _log(self, message: str, level: str = "warning"):
# Log messages in the corresponding level
logger = logging.getLogger("aliby")
getattr(logger, level)(f"{self.__class__.__name__}: {message}")
def check_type_recursive(val1, val2):
same_types = True
if not isinstance(val1, type(val2)) and not all(
type(x) in (Path, str) for x in (val1, val2) # Ignore str->path
):
return False
if not isinstance(val1, t.Iterable) and not isinstance(val2, t.Iterable):
return isinstance(val1, type(val2))
elif isinstance(val1, (tuple, list)) and isinstance(val2, (tuple, list)):
return bool(
sum([check_type_recursive(v1, v2) for v1, v2 in zip(val1, val2)])
)
elif isinstance(val1, dict) and isinstance(val2, dict):
if not len(val1) or not len(val2):
return False
for k in val2.keys():
same_types = same_types and check_type_recursive(val1[k], val2[k])
return same_types
class StepABC(ProcessABC):
"""
Base class that expands on ProcessABC to include tools used by Aliby steps.
It adds a setup step, logging and benchmarking for time benchmarks.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@abstractmethod
def _run_tp(self):
pass
@timer
def run_tp(self, tp: int, **kwargs):
"""
Time and log the timing of a step.
"""
return self._run_tp(tp, **kwargs)
def run(self):
# Replace run with run_tp
raise Warning("Steps use run_tp instead of run")
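# Hypothetical minimal step (illustrative): _run_tp does the per-timepoint work,
# while run_tp wraps it with the timer decorator for logging and benchmarking.
class DummyStep(StepABC):
    def _run_tp(self, tp: int, **kwargs):
        return tp * 2

# step = DummyStep(DummyParameters.default())  # any ParametersABC instance works
# step.run_tp(3)                               # -> 6, with the call timed and logged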
"""
Tools to interact with h5 files and handle data consistently.
"""
import collections.abc
import logging
import typing as t
from itertools import chain, groupby, product
from typing import Union
import h5py
import numpy as np
import yaml
class BridgeH5:
"""
Base class to interact with h5 files.
It includes functions that predict how long segmentation will take.
"""
def __init__(self, filename, flag="r"):
"""Initialise with the name of the h5 file."""
self.filename = filename
if flag is not None:
self._hdf = h5py.File(filename, flag)
self._filecheck()
def _log(self, message: str, level: str = "warning"):
# Log messages in the corresponding level
logger = logging.getLogger("aliby")
getattr(logger, level)(f"{self.__class__.__name__}: {message}")
def _filecheck(self):
assert "cell_info" in self._hdf, "Invalid file. No 'cell_info' found."
def close(self):
"""Close the h5 file."""
self._hdf.close()
@property
def meta_h5(self) -> t.Dict[str, t.Any]:
"""Return metadata, defining it if necessary."""
if not hasattr(self, "_meta_h5"):
with h5py.File(self.filename, "r") as f:
self._meta_h5 = dict(f.attrs)
return self._meta_h5
@property
def cell_tree(self):
return self.get_info_tree()
@staticmethod
def get_consecutives(tree, nstepsback):
"""Receives a sorted tree and returns the keys of consecutive elements."""
# get tp level
vals = {k: np.array(list(v)) for k, v in tree.items()}
# get indices of consecutive elements
where_consec = [
{
k: np.where(np.subtract(v[n + 1 :], v[: -n - 1]) == n + 1)[0]
for k, v in vals.items()
}
for n in range(nstepsback)
]
return where_consec
def get_npairs(self, nstepsback=2, tree=None):
if tree is None:
tree = self.cell_tree
consecutive = self.get_consecutives(tree, nstepsback=nstepsback)
flat_tree = flatten(tree)
n_predictions = 0
for i, d in enumerate(consecutive, 1):
flat = list(chain(*[product([k], list(v)) for k, v in d.items()]))
pairs = [(f, (f[0], f[1] + i)) for f in flat]
for p in pairs:
n_predictions += len(flat_tree.get(p[0], [])) * len(
flat_tree.get(p[1], [])
)
return n_predictions
def get_npairs_over_time(self, nstepsback=2):
tree = self.cell_tree
npairs = []
for tp in self._hdf["cell_info"]["processed_timepoints"][()]:
tmp_tree = {
k: {k2: v2 for k2, v2 in v.items() if k2 <= tp}
for k, v in tree.items()
}
npairs.append(self.get_npairs(tree=tmp_tree))
return np.diff(npairs)
def get_info_tree(
self, fields: Union[tuple, list] = ("trap", "timepoint", "cell_label")
):
"""
Return traps, time points and labels for this position in the form of a tree in the hierarchy determined by the argument fields.
Note that it is compressed to non-empty elements and timepoints.
Default hierarchy is:
- trap
- time point
- cell label
This function currently produces trees of depth 3, but it can easily be extended for deeper trees if needed (e.g. considering groups, chambers and/or positions).
Parameters
----------
fields: list of strs
Fields to fetch from 'cell_info' inside the h5 file.
Returns
----------
Nested dictionary where keys (or branches) are the upper levels and the leaves are the last element of :fields:.
"""
zipped_info = (*zip(*[self._hdf["cell_info"][f][()] for f in fields]),)
return recursive_groupsort(zipped_info)
def groupsort(iterable: Union[tuple, list]):
"""Sorts iterable and returns a dictionary where the values are grouped by the first element."""
iterable = sorted(iterable, key=lambda x: x[0])
grouped = {
k: [x[1:] for x in v] for k, v in groupby(iterable, lambda x: x[0])
}
return grouped
def recursive_groupsort(iterable):
"""Recursive extension of groupsort."""
if len(iterable[0]) > 1:
return {
k: recursive_groupsort(v) for k, v in groupsort(iterable).items()
}
else:
# only one element left in each tuple
return [x[0] for x in iterable]
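# Minimal sketch: (trap, timepoint, cell_label) tuples become the nested
# {trap: {timepoint: [labels]}} tree returned by get_info_tree.
records = [(0, 1, 1), (0, 1, 2), (0, 2, 1), (1, 1, 3)]
recursive_groupsort(records)  # -> {0: {1: [1, 2], 2: [1]}, 1: {1: [3]}}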
def flatten(d, parent_key="", sep="_"):
"""Flatten nested dict. Adapted from https://stackoverflow.com/a/6027615."""
items = []
for k, v in d.items():
new_key = parent_key + (k,) if parent_key else (k,)
if isinstance(v, collections.abc.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
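# Minimal sketch: this flatten produces tuple keys, which get_npairs uses to
# index (trap, timepoint) branches of the cell tree.
flatten({0: {1: [1, 2], 2: [1]}})  # -> {(0, 1): [1, 2], (0, 2): [1]}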
def attrs_from_h5(fpath: str):
"""Return attributes as dict from an h5 file."""
with h5py.File(fpath, "r") as f:
return dict(f.attrs)
def image_creds_from_h5(fpath: str):
"""Return image id and server credentials from an h5."""
attrs = attrs_from_h5(fpath)
return (
attrs["image_id"],
{
k: yaml.safe_load(attrs["parameters"])["general"][k]
for k in ("username", "password", "host")
},
)