Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing 7256 additions and 2744 deletions
import numpy as np
def trap_apply(cell_fun, cell_masks, *args, **kwargs):
"""
Apply a cell_function to a mask, trap_image pair
:param cell_fun: function to apply to a cell (from extraction/cell.py)
:param cell_masks: (numpy 3d array) cells' segmentation mask
:param trap_image: (Optional) the image for the trap in which the cell is (all
channels)
:**kwargs: parameters to pass if needed for custom functions
"""
cells_iter = (*range(cell_masks.shape[2]),)
return [cell_fun(cell_masks[..., i], *args, **kwargs) for i in cells_iter]
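# Illustrative sketch of how trap_apply maps a per-cell function over the last
# axis of a 3D mask stack; the (height, width, ncells) shape and np.sum as the
# metric are assumptions, not aliby defaults.
example_masks = np.zeros((5, 5, 2), dtype=bool)
example_masks[1:3, 1:3, 0] = True  # first cell: a 2x2 square
example_masks[3:4, 3:5, 1] = True  # second cell: a 1x2 rectangle
example_areas = trap_apply(np.sum, example_masks)  # -> [4, 2], one value per cell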
def reduce_z(trap_image, fun):
# Optimise the reduction function if possible
if isinstance(fun, np.ufunc):
return fun.reduce(trap_image, axis=2)
else:
return np.apply_along_axis(fun, 2, trap_image)
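# Illustrative sketch: a ufunc such as np.maximum takes the fast .reduce path,
# while a plain callable such as np.median falls back to np.apply_along_axis.
# The array shape is an arbitrary assumption.
example_stack = np.random.rand(4, 4, 5)             # (height, width, z-sections)
example_max = reduce_z(example_stack, np.maximum)    # ufunc: np.maximum.reduce(..., axis=2)
example_median = reduce_z(example_stack, np.median)  # callable: np.apply_along_axis(..., 2, ...)
assert example_max.shape == example_median.shape == (4, 4)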
from yaml import dump, load
def dict_to_yaml(d, path):
with open(path, "w") as f:
dump(d, f)
def add_attrs(hdfile, path, files):
group = hdfile.create_group(path)
for k, v in files:
group.attrs[k] = v
import numpy as np
def div0(a, b, fill=0):
"""a / b, divide by 0 -> `fill`
div0( [-1, 0, 1], 0, fill=np.nan) -> [nan nan nan]
div0( 1, 0, fill=np.inf ) -> inf
"""
with np.errstate(divide="ignore", invalid="ignore"):
c = np.true_divide(a, b)
if np.isscalar(c):
return c if np.isfinite(c) else fill
else:
c[~np.isfinite(c)] = fill
return c
from collections import deque
def depth(d):
"""
Copied from https://stackoverflow.com/a/23499088
Used to determine the depth of our config trees and fill them
"""
queue = deque([(id(d), d, 1)])
memo = set()
while queue:
id_, o, level = queue.popleft()
if id_ in memo:
continue
memo.add(id_)
if isinstance(o, dict):
queue += ((id(v), v, level + 1) for v in o.values())
return level
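# Illustrative sketch: depth() counts nesting levels, including the leaf value;
# the example dictionary is an assumption, not an aliby config.
assert depth({"a": {"b": 1}}) == 3  # dict -> nested dict -> leaf value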
from omero.gateway import BlitzGateway
from tqdm import tqdm
# Helper functions
def connect_omero():
conn = BlitzGateway(*get_creds(), host="islay.bio.ed.ac.uk", port=4064)
conn.connect()
return conn
def get_creds():
return (
"upload",
"***REMOVED***", # OMERO Password
)
def download_file(f):
"""
Download file in chunks using FileWrapper object
"""
desc = (
"Downloading "
+ f.getFileName()
+ " ("
+ str(round(f.getFileSize() / 1000**2, 2))
+ "Mb)"
)
down_file = bytearray()
for c in tqdm(f.getFileInChunks(), desc=desc):
down_file += c
return down_file
# Example of argo experiment explorer
from aliby.utils.argo import Argo
from extraction.core.extractor import Extractor
from extraction.core.functions.defaults import get_params
from extraction.core.parameters import Parameters
argo = Argo()
argo.load()
# argo.channels("GFP")
argo.tags(["Alan"])
argo.complete()
# argo.cExperiment()
# argo.tiler_cells()
# params = Parameters(**get_params("batman_ph_dual_fast"))
# def try_extract(d):
# try:
# params = Parameters(**get_params("batman_ph_dual_fast"))
# ext = Extractor(params, source=d.getId())
# ext.load_tiler_cells()
# ext.process_experiment()
# print(d.getId(), d.getName(), "Experiment processed")
# return True
# except:
# print(d.getId(), d.getName(), "Experiment not processed")
# return False
# from multiprocessing.dummy import Pool as ThreadPool
# pool = ThreadPool(4)
# results = pool.map(try_extract, argo.dsets)
# import pickle
# with open("results.pkl", "wb") as f:
# pickle.dump(results, f)
from pathlib import Path
import numpy as np
from extraction.core.extractor import Extractor
from extraction.core.functions.defaults import get_params
from extraction.core.parameters import Parameters
params = Parameters(**get_params("batman_ph_dual_fast"))
# ext = Extractor(params, source=19918) # 19831
ext = Extractor(params, source=19831)
ext.load_tiler()
self = ext
# s=self.extract_exp(tree={'general':{None:['area']}, 'GFPFast':{np.maximum:['median','mean']}},poses=self.expt.positions[:2], tps=[0,1], stg='df')
s = self.extract_exp()
# # import cProfile
# # profile = cProfile.Profile()
# # profile.enable()
# # ext.change_position(ext.expt.positions[1])
# # tracks = self.extract_pos(
# # tree={('general'):{None: # Other metrics can be used
# # [tidy_metric]}})#['general',None,'area']
# # profile.disable()
# # import pstats
# # ps = pstats.Stats(profile)
# # ps.sort_stats('cumulative')
# # ps.print_stats()
This diff is collapsed.
[tool.poetry]
name = "aliby"
version = "0.1.35"
version = "0.1.64"
description = "Process and analyse live-cell imaging data"
authors = ["Alan Munoz <alan.munoz@ed.ac.uk>"]
packages = [
{ include = "aliby" },
{ include = "extraction" },
{ include = "aliby", from="src" },
{ include = "extraction", from="src" },
{ include = "agora", from="src" },
{ include = "postprocessor", from="src" },
{ include = "logfile_parser", from="src" },
]
readme = "README.md"
[tool.poetry.scripts]
aliby-run = "aliby.bin.run:run"
aliby-annotate = "aliby.bin.annotate:annotate"
aliby-visualise = "aliby.bin.visualise:napari_overlay"
[build-system]
requires = ["setuptools", "poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.dependencies]
python = ">=3.7.1,<3.11"
numpy = "1.21.6" # Pinning numpy and pandas makes dep resolution much faster
pandas = "1.3.3"
python = ">=3.8, <3.11"
PyYAML = "^6.0"
flatten-dict = "^0.4.2"
gaussianprocessderivatives = "^0.1.5"
numpy = ">=1.21.6"
Bottleneck = "^1.3.5"
opencv-python = "^4.7.0.72"
pathos = "^0.2.8" # Lambda-friendly multithreading
p-tqdm = "^1.3.3"
pandas = ">=1.3.3"
py-find-1st = "^1.1.5" # Fast indexing
scikit-learn = ">=1.0.2" # Used for an extraction metric
scipy = ">=1.7.3"
# Pipeline + I/O
dask = "^2021.12.0"
pathos = "^0.2.8"
tqdm = "^4.62.3"
scikit-learn = "0.22.2.post1"
py-find-1st = "^1.1.5"
scikit-image = ">=0.18.1"
opencv-python = "*"
imageio = "2.8.0" # For image-visualisation utilities
requests-toolbelt = "^0.9.1"
h5py = "2.10" # I/O into files
imageio = "2.8.0"
omero-py = ">=5.6.2" # contact omero server
aliby-agora = "^0.2.30"
aliby-baby = "^0.1.13"
aliby-post = "^0.1.31"
p-tqdm = "^1.3.3" # Parallel progress bars
scikit-image = ">=0.18.1"
tqdm = "^4.62.3" # progress bars
xmltodict = "^0.13.0" # read ome-tiff metadata
zeroc-ice = {version="3.6.5"} # networking interface, slow to build
# zeroc-ice = {version="3.6.5", optional=true} # To be set as optional in the future
zarr = "^2.14.0"
GitPython = "^3.1.27"
h5py = "2.10" # File I/O
[tool.poetry.dev-dependencies]
black = "^22.3.0"
Sphinx = "^4.3.2"
pytest = "^6.2.5"
# Networking
omero-py = { version = ">=5.6.2", optional = true } # contact omero server
# Baby segmentation
aliby-baby = {version = "^0.1.17", optional=true}
# Postprocessing
[tool.poetry.group.pp.dependencies]
leidenalg = "^0.8.8"
more-itertools = "^8.12.0"
pycatch22 = "^0.4.2"
[tool.poetry.group.pp]
optional = true
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
black = "^22.6.0"
mypy = "^0.930"
numpydoc = "^1.3.1"
pyflakes = "^2.4.0"
isort = "^5.10.1"
jupyter = "^1.0.0"
flake8 = "^4.0.1"
pyright = "^1.1.258"
pre-commit = "^2.20.0"
seaborn = "^0.11.2"
debugpy = "^1.6.3"
coverage = "^7.0.4"
jupytext = "^1.14.4"
grid-strategy = "^0.0.1"
readchar = "^4.0.3"
ipdb = "^0.13.11"
[build-system]
requires = ["setuptools", "poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.group.docs]
optional = true
[tool.poetry.group.docs.dependencies]
Sphinx = "^5.2.0"
sphinx-rtd-theme = "^1.0.0"
sphinx-autodoc-typehints = "^1.19.2"
myst-parser = "^0.18.0"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^6.2.5"
[tool.poetry.group.utils]
optional = true
# Dependency groups can only be used by a poetry installation, not pip
[tool.poetry.group.utils.dependencies]
napari = {version = ">=0.4.16", optional=true}
Torch = {version = "^1.13.1", optional=true}
pytorch-lightning = {version = "^1.9.3", optional=true}
torchvision = {version = "^0.14.1", optional=true}
trio = {version = "^0.22.0", optional=true}
grid-strategy = {version = "^0.0.1", optional=true}
[tool.poetry.extras]
omero = ["omero-py"]
baby = ["aliby-baby"]
[tool.black]
line-length = 79
target-version = ['py37']
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
# A regex preceded with ^/ will apply only to files and directories
# in the root of the project.
^/foo.py # exclude a file named foo.py in the root of the project (in addition to the defaults)
/(
\.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
'''
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 79
include_trailing_comma = true
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = [
"tests",
]
from setuptools import setup, find_packages
print("find_packages outputs ", find_packages("aliby"))
setup(
name="aliby",
version="0.1.2",
packages=find_packages(),
# package_dir={"": "aliby"},
# packages=['aliby', 'aliby.io'],
# include_package_data=True,
url="",
license="",
author="Diane Adjavon",
author_email="diane.adjavon@ed.ac.uk",
description="Core utilities for microscopy pipeline",
python_requires=">=3.7",
install_requires=[
"numpy",
"dask",
"pathos",
"tqdm",
"pandas",
"scikit-image",
"opencv-python",
"requests_toolbelt",
"h5py==2.10",
"tables",
"imageio==2.8.0",
"omero-py>=5.6.2",
"pathos",
"zeroc-ice==3.6.5",
"tensorflow>=1.15,<=2.3",
"aliby-agora",
"aliby-argo",
"aliby-baby",
"aliby-extraction",
"aliby-post",
],
)
import logging
import typing as t
from abc import ABC, abstractmethod
from collections.abc import Iterable
from copy import copy
from pathlib import Path
from time import perf_counter
from typing import Union
from flatten_dict import flatten
from yaml import dump, safe_load
from agora.logging import timer
atomic = t.Union[int, float, str, bool]
class ParametersABC(ABC):
"""
Defines parameters as attributes and allows parameters to
be converted to either a dictionary or to yaml.
No attribute should be called "parameters"!
"""
def __init__(self, **kwargs):
"""
Defines parameters as attributes
"""
assert (
"parameters" not in kwargs
), "No attribute should be named parameters"
for k, v in kwargs.items():
setattr(self, k, v)
def to_dict(self, iterable="null") -> t.Dict:
"""
Recursive function to return a nested dictionary of the
attributes of the class instance.
"""
if isinstance(iterable, dict):
if any(
[
True
for x in iterable.values()
if isinstance(x, Iterable) or hasattr(x, "to_dict")
]
):
return {
k: v.to_dict()
if hasattr(v, "to_dict")
else self.to_dict(v)
for k, v in iterable.items()
}
else:
return iterable
elif iterable == "null":
# use instance's built-in __dict__ dictionary of attributes
return self.to_dict(self.__dict__)
else:
return iterable
def to_yaml(self, path: Union[Path, str] = None):
"""
Returns a yaml stream of the attributes of the class instance.
If path is provided, the yaml stream is saved there.
Parameters
----------
path : Union[Path, str]
Output path.
"""
if path:
with open(Path(path), "w") as f:
dump(self.to_dict(), f)
return dump(self.to_dict())
@classmethod
def from_dict(cls, d: dict):
return cls(**d)
@classmethod
def from_yaml(cls, source: Union[Path, str]):
"""
Return an instance from a yaml filename or a yaml string.
"""
is_buffer = True
try:
if Path(source).exists():
is_buffer = False
except Exception as _:
assert isinstance(source, str), "Invalid source type."
if is_buffer:
params = safe_load(source)
else:
with open(source) as f:
params = safe_load(f)
return cls(**params)
@classmethod
def default(cls, **kwargs):
overridden_defaults = copy(cls._defaults)
for k, v in kwargs.items():
overridden_defaults[k] = v
return cls.from_dict(overridden_defaults)
def update(self, name: str, new_value):
"""
Update values recursively
if name is a dictionary, replace data where existing found or add if not.
It warns against type changes.
If the existing structure under name is a dictionary,
it looks for the first occurrence and modifies it accordingly.
If a leaf node that is to be changed is a collection, it adds the new elements.
"""
assert name not in (
"parameters",
"params",
), "Attribute can't be named params or parameters"
if name in self.__dict__:
if not check_type_recursive(getattr(self, name), new_value):
print("Warning: changing the type of a parameter is risky.")
if isinstance(getattr(self, name), dict):
flattened = flatten(self.to_dict())
names_found = [k for k in flattened.keys() if name in k]
found_idx = [keys.index(name) for keys in names_found]
assert len(names_found), f"{name} not found as key."
keys = None
if len(names_found) > 1:
for level in zip(found_idx, names_found):
if level == min(found_idx):
keys = level
print(
f"Warning: {name} was found in multiple keys. Selected {keys}"
)
break
else:
keys = names_found.pop()
if keys:
current_val = flattened.get(keys, None)
# if isinstance(current_val, t.Collection):
elif isinstance(getattr(self, name), t.Collection):
add_to_collection(getattr(self, name), new_value)
elif isinstance(getattr(self, name), set):
pass # TODO implement
new_d = getattr(self, name)
new_d.update(new_value)
setattr(self, name, new_d)
else:
setattr(self, name, new_value)
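# Illustrative sketch: a minimal ParametersABC subclass showing the dict/yaml
# round trip. The class name and default values are assumptions, not aliby defaults.
class _ExampleParameters(ParametersABC):
    _defaults = {"tree": {"general": {"channel": "Brightfield"}}, "tps": 5}

example_params = _ExampleParameters.default(tps=10)  # override one default
example_dict = example_params.to_dict()              # {'tree': {...}, 'tps': 10}
example_copy = _ExampleParameters.from_yaml(example_params.to_yaml())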
def add_to_collection(
collection: t.Collection, value: t.Union[atomic, t.Collection]
):
# Adds element(s) in place.
if not isinstance(value, t.Collection):
value = [value]
if isinstance(collection, list):
collection += value
elif isinstance(collection, set):
collection.update(value)
class ProcessABC(ABC):
"""
Base class for processes.
Defines parameters as attributes and requires a `run` method to be defined.
"""
def __init__(self, parameters):
"""
Arguments
---------
parameters: instance of ParametersABC
"""
self._parameters = parameters
# convert parameters to dictionary
# and then define each parameter as an attribute
for k, v in parameters.to_dict().items():
setattr(self, k, v)
@property
def parameters(self):
return self._parameters
@abstractmethod
def run(self):
pass
def _log(self, message: str, level: str = "warning"):
# Log messages in the corresponding level
logger = logging.getLogger("aliby")
getattr(logger, level)(f"{self.__class__.__name__}: {message}")
def check_type_recursive(val1, val2):
same_types = True
if not isinstance(val1, type(val2)) and not all(
type(x) in (Path, str) for x in (val1, val2) # Ignore str->path
):
return False
if not isinstance(val1, t.Iterable) and not isinstance(val2, t.Iterable):
return isinstance(val1, type(val2))
elif isinstance(val1, (tuple, list)) and isinstance(val2, (tuple, list)):
return bool(
sum([check_type_recursive(v1, v2) for v1, v2 in zip(val1, val2)])
)
elif isinstance(val1, dict) and isinstance(val2, dict):
if not len(val1) or not len(val2):
return False
for k in val2.keys():
same_types = same_types and check_type_recursive(val1[k], val2[k])
return same_types
class StepABC(ProcessABC):
"""
Base class that expands on ProcessABC to include tools used by Aliby steps.
It adds a setup step plus logging and timing of each time-point step.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@abstractmethod
def _run_tp(self):
pass
@timer
def run_tp(self, tp: int, **kwargs):
"""
Time and log the timing of a step.
"""
return self._run_tp(tp, **kwargs)
def run(self):
# Replace run with run_tp
raise Warning("Steps use run_tp instead of run")
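# Illustrative sketch: a minimal StepABC subclass. The `scale` parameter and the
# per-time-point computation are assumptions made purely for illustration.
class _ExampleStep(StepABC):
    def _run_tp(self, tp: int, **kwargs):
        return tp * self.scale  # `scale` is set as an attribute by ProcessABC

example_step = _ExampleStep(ParametersABC(scale=2))
# example_step.run_tp(3) -> 6, with the call timed and logged by the @timer decorator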
#!/usr/bin/env python3
"""
Tools to interact with h5 files and handle data consistently.
"""
import collections
import logging
import typing as t
from itertools import chain, groupby, product
from typing import Union
import h5py
import numpy as np
import yaml
class BridgeH5:
"""
Base class to interact with h5 files.
It includes functions that predict how long segmentation will take.
"""
def __init__(self, filename, flag="r"):
"""Initialise with the name of the h5 file."""
self.filename = filename
if flag is not None:
self._hdf = h5py.File(filename, flag)
self._filecheck()
def _log(self, message: str, level: str = "warning"):
# Log messages in the corresponding level
logger = logging.getLogger("aliby")
getattr(logger, level)(f"{self.__class__.__name__}: {message}")
def _filecheck(self):
assert "cell_info" in self._hdf, "Invalid file. No 'cell_info' found."
def close(self):
"""Close the h5 file."""
self._hdf.close()
@property
def meta_h5(self) -> t.Dict[str, t.Any]:
"""Return metadata, defining it if necessary."""
if not hasattr(self, "_meta_h5"):
with h5py.File(self.filename, "r") as f:
self._meta_h5 = dict(f.attrs)
return self._meta_h5
@property
def cell_tree(self):
return self.get_info_tree()
@staticmethod
def get_consecutives(tree, nstepsback):
"""Receives a sorted tree and returns the keys of consecutive elements."""
# get tp level
vals = {k: np.array(list(v)) for k, v in tree.items()}
# get indices of consecutive elements
where_consec = [
{
k: np.where(np.subtract(v[n + 1 :], v[: -n - 1]) == n + 1)[0]
for k, v in vals.items()
}
for n in range(nstepsback)
]
return where_consec
def get_npairs(self, nstepsback=2, tree=None):
if tree is None:
tree = self.cell_tree
consecutive = self.get_consecutives(tree, nstepsback=nstepsback)
flat_tree = flatten(tree)
n_predictions = 0
for i, d in enumerate(consecutive, 1):
flat = list(chain(*[product([k], list(v)) for k, v in d.items()]))
pairs = [(f, (f[0], f[1] + i)) for f in flat]
for p in pairs:
n_predictions += len(flat_tree.get(p[0], [])) * len(
flat_tree.get(p[1], [])
)
return n_predictions
def get_npairs_over_time(self, nstepsback=2):
tree = self.cell_tree
npairs = []
for tp in self._hdf["cell_info"]["processed_timepoints"][()]:
tmp_tree = {
k: {k2: v2 for k2, v2 in v.items() if k2 <= tp}
for k, v in tree.items()
}
npairs.append(self.get_npairs(tree=tmp_tree))
return np.diff(npairs)
def get_info_tree(
self, fields: Union[tuple, list] = ("trap", "timepoint", "cell_label")
):
"""
Return traps, time points and labels for this position in the form of a tree in the hierarchy determined by the argument fields.
Note that it is compressed to non-empty elements and timepoints.
Default hierarchy is:
- trap
- time point
- cell label
This function currently produces trees of depth 3, but it can easily be extended for deeper trees if needed (e.g. considering groups, chambers and/or positions).
Parameters
----------
fields: list of strs
Fields to fetch from 'cell_info' inside the h5 file.
Returns
----------
Nested dictionary where keys (or branches) are the upper levels and the leaves are the last element of :fields:.
"""
zipped_info = (*zip(*[self._hdf["cell_info"][f][()] for f in fields]),)
return recursive_groupsort(zipped_info)
def groupsort(iterable: Union[tuple, list]):
"""Sorts iterable and returns a dictionary where the values are grouped by the first element."""
iterable = sorted(iterable, key=lambda x: x[0])
grouped = {
k: [x[1:] for x in v] for k, v in groupby(iterable, lambda x: x[0])
}
return grouped
def recursive_groupsort(iterable):
"""Recursive extension of groupsort."""
if len(iterable[0]) > 1:
return {
k: recursive_groupsort(v) for k, v in groupsort(iterable).items()
}
else:
# only the leaves remain: return them as a flat list
return [x[0] for x in iterable]
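# Illustrative sketch: recursive_groupsort repeatedly buckets tuples by their
# first element, which is how get_info_tree builds its trap -> time point ->
# cell label hierarchy. The records below are arbitrary assumptions.
example_records = [(0, 1, 1), (0, 1, 2), (0, 2, 1), (1, 1, 1)]
assert recursive_groupsort(example_records) == {0: {1: [1, 2], 2: [1]}, 1: {1: [1]}}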
def flatten(d, parent_key="", sep="_"):
"""Flatten nested dict. Adapted from https://stackoverflow.com/a/6027615."""
items = []
for k, v in d.items():
new_key = parent_key + (k,) if parent_key else (k,)
if isinstance(v, collections.abc.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def attrs_from_h5(fpath: str):
"""Return attributes as dict from an h5 file."""
with h5py.File(fpath, "r") as f:
return dict(f.attrs)
def image_creds_from_h5(fpath: str):
"""Return image id and server credentials from an h5."""
attrs = attrs_from_h5(fpath)
return (
attrs["image_id"],
{
k: yaml.safe_load(attrs["parameters"])["general"][k]
for k in ("username", "password", "host")
},
)
This diff is collapsed.
#!/usr/bin/env jupyter
"""
Convenience decorators to extend commonly-used methods or functions.
"""
import typing as t
from functools import wraps
def _first_arg_str_to_df(
fn: t.Callable,
):
"""Enable Signal-like classes to convert strings to data sets."""
@wraps(fn)
def format_input(*args, **kwargs):
cls = args[0]
data = args[1]
if isinstance(data, str):
# get data from h5 file
data = cls.get_raw(data)
# replace path in the undecorated function with data
return fn(cls, data, *args[2:], **kwargs)
return format_input
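# Illustrative sketch: the decorator lets a Signal-like method accept either a
# dataset or the h5 path of one. The _ExampleSignal class, its get_raw stand-in
# and the dataset path are assumptions for illustration only.
class _ExampleSignal:
    def get_raw(self, path: str):
        return {"path": path, "values": [1, 2, 3]}  # stand-in for an h5 read

    @_first_arg_str_to_df
    def mean(self, data):
        return sum(data["values"]) / len(data["values"])

# _ExampleSignal().mean("extraction/general/None/area") -> 2.0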
"""
Anthology of interfaces for the different metadata parsers, and for the case where none applies.
ALIBY decides which metadata parser to use based on two elements:
1. The parameter given by PipelineParameters (either True/False or a string pointing to the metadata file).
2. The files available in the root folder where the images are found (remote or local).
If the parameter is a string pointing to a metadata file, ALIBY picks a parser based on the file format.
If the parameter is True, ALIBY searches for any available file and uses the first valid one.
If there are no metadata files, ALIBY requires the indices for tiler, segmentation and extraction to be given explicitly.
"""
import glob
import logging
import os
import typing as t
from datetime import datetime
from pathlib import Path
import pandas as pd
from pytz import timezone
from agora.io.writer import Writer
from logfile_parser import Parser
from logfile_parser.swainlab_parser import parse_from_swainlab_grammar
class MetaData:
"""Small metadata Process that loads log."""
def __init__(self, log_dir, store):
self.log_dir = log_dir
self.store = store
self.metadata_writer = Writer(self.store)
def __getitem__(self, item):
return self.load_logs()[item]
def load_logs(self):
# parsed_flattened = parse_logfiles(self.log_dir)
parsed_flattened = dispatch_metadata_parser(self.log_dir)
return parsed_flattened
def run(self, overwrite=False):
metadata_dict = self.load_logs()
self.metadata_writer.write(
path="/", meta=metadata_dict, overwrite=overwrite
)
def add_field(self, field_name, field_value, **kwargs):
self.metadata_writer.write(
path="/",
meta={field_name: field_value},
**kwargs,
)
def add_fields(self, fields_values: dict, **kwargs):
for field, value in fields_values.items():
self.add_field(field, value)
# Paradigm: able to do something with all datatypes present in log files,
# then pare down on what specific information is really useful later.
# Needed because HDF5 attributes do not support dictionaries
def flatten_dict(nested_dict, separator="/"):
"""
Flatten a nested dictionary. If empty, return it unchanged.
"""
flattened = {}
if nested_dict:
df = pd.json_normalize(nested_dict, sep=separator)
flattened = df.to_dict(orient="records")[0] or {}
return flattened
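# Illustrative sketch: flatten_dict turns nested keys into "/"-joined ones so
# they can be stored as HDF5 attributes; the input dictionary is an assumption.
assert flatten_dict({"channels": {"GFP": {"exposure": 100}}}) == {
    "channels/GFP/exposure": 100
}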
# Needed because HDF5 attributes do not support datetime objects
# Takes care of time zones & daylight saving
def datetime_to_timestamp(time, locale="Europe/London"):
"""
Convert datetime object to UNIX timestamp
"""
return timezone(locale).localize(time).timestamp()
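# Illustrative sketch: a naive datetime is localised to Europe/London before
# conversion, so daylight saving is accounted for. The date is an arbitrary assumption.
example_time = datetime(2022, 6, 1, 12, 0)          # naive local time
example_unix = datetime_to_timestamp(example_time)  # seconds since the epoch, as a float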
def find_file(root_dir, regex):
file = [
f
for f in glob.glob(os.path.join(str(root_dir), regex))
if Path(f).name != "aliby.log" # Skip filename reserved for aliby
]
if len(file) > 1:
print(
"Warning:Metadata: More than one logfile found. Defaulting to first option."
)
file = [sorted(file)[0]]
if len(file) == 0:
logging.getLogger("aliby").log(
logging.WARNING, "Metadata: No valid swainlab .log found."
)
else:
return file[0]
return None
# TODO: re-write this as a class if appropriate
# WARNING: grammars depend on the directory structure of a locally installed
# logfile_parser repo
def parse_logfiles(
root_dir,
acq_grammar="multiDGUI_acq_format.json",
log_grammar="multiDGUI_log_format.json",
):
"""
Parse acq and log files depending on the grammar specified, then merge into
single dict.
"""
# Both acq and log files contain useful information.
# ACQ_FILE = 'flavin_htb2_glucose_long_ramp_DelftAcq.txt'
# LOG_FILE = 'flavin_htb2_glucose_long_ramp_Delftlog.txt'
log_parser = Parser(log_grammar)
acq_parser = Parser(acq_grammar)
log_file = find_file(root_dir, "*log.txt")
acq_file = find_file(root_dir, "*[Aa]cq.txt")
parsed = {}
if log_file and acq_file:
with open(log_file, "r") as f:
log_parsed = log_parser.parse(f)
with open(acq_file, "r") as f:
acq_parsed = acq_parser.parse(f)
parsed = {**acq_parsed, **log_parsed}
for key, value in parsed.items():
if isinstance(value, datetime):
parsed[key] = datetime_to_timestamp(value)
parsed_flattened = flatten_dict(parsed)
for k, v in parsed_flattened.items():
if isinstance(v, list):
parsed_flattened[k] = [0 if el is None else el for el in v]
return parsed_flattened
def get_meta_swainlab(parsed_metadata: dict):
"""
Convert raw parsing of Swainlab logfile to the metadata interface.
Input:
--------
parsed_metadata: Dict[str, str or int or DataFrame or Dict]
default['general', 'image_config', 'device_properties', 'group_position', 'group_time', 'group_config']
Returns:
--------
Dictionary with metadata following the standard
"""
channels = parsed_metadata["image_config"]["Image config"].values.tolist()
# nframes = int(parsed_metadata["group_time"]["frames"].max())
# return {"channels": channels, "nframes": nframes}
return {"channels": channels}
def get_meta_from_legacy(parsed_metadata: dict):
result = parsed_metadata
result["channels"] = result["channels/channel"]
return result
def parse_swainlab_metadata(filedir: t.Union[str, Path]):
"""
Dispatcher function that determines which parser to use based on the file ending.
Input:
--------
filedir: Directory where the logfile is located.
Returns:
--------
Dictionary with minimal metadata
"""
filedir = Path(filedir)
filepath = find_file(filedir, "*.log")
if filepath:
raw_parse = parse_from_swainlab_grammar(filepath)
minimal_meta = get_meta_swainlab(raw_parse)
else:
if filedir.is_file() or str(filedir).endswith(".zarr"):
filedir = filedir.parent
legacy_parse = parse_logfiles(filedir)
minimal_meta = (
get_meta_from_legacy(legacy_parse) if legacy_parse else {}
)
return minimal_meta
def dispatch_metadata_parser(filepath: t.Union[str, Path]):
"""
Function to dispatch different metadata parsers that convert logfiles into a
basic metadata dictionary. Currently only contains the swainlab log parsers.
Input:
--------
filepath: str existing file containing metadata, or folder containing naming conventions
"""
parsed_meta = parse_swainlab_metadata(filepath)
if parsed_meta is None:
parsed_meta = dir_to_meta(Path(filepath))
return parsed_meta
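# Illustrative usage sketch; the folder path and channel names below are
# assumptions, not outputs from a real experiment.
# meta = dispatch_metadata_parser("/data/experiment_001/")
# meta.get("channels")  # e.g. ["Brightfield", "GFP"] when a swainlab .log is present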
def dir_to_meta(path: Path, suffix="tiff"):
filenames = list(path.glob(f"*.{suffix}"))
try:
# Deduce the dimension order from the filenames
dimorder = "".join(
map(lambda x: x[0], filenames[0].stem.split("_")[1:])
)
dim_value = list(
map(
lambda f: filename_to_dict_indices(f.stem),
path.glob("*.tiff"),
)
)
maxes = [max(map(lambda x: x[dim], dim_value)) for dim in dimorder]
mins = [min(map(lambda x: x[dim], dim_value)) for dim in dimorder]
_dim_shapes = [
max_val - min_val + 1 for max_val, min_val in zip(maxes, mins)
]
meta = {
"size_" + dim: shape for dim, shape in zip(dimorder, _dim_shapes)
}
except Exception as e:
print(
f"Warning:Metadata: Cannot extract dimensions from filenames. Empty meta set {e}"
)
meta = {}
return meta
def filename_to_dict_indices(stem: str):
return {
dim_number[0]: int(dim_number[1:])
for dim_number in stem.split("_")[1:]
}
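# Illustrative sketch: the filename fallback assumes stems such as
# "experiment_t0001_c1_z03", one letter-prefixed index per dimension, which
# dir_to_meta then turns into "size_t", "size_c" and "size_z" entries.
# The naming pattern shown here is an assumption about the tiff export convention.
assert filename_to_dict_indices("experiment_t0001_c1_z03") == {"t": 1, "c": 1, "z": 3}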
#!/usr/bin/env python3
from pathlib import Path
import h5py
import numpy as np
from agora.io.bridge import groupsort
from agora.io.writer import load_attributes
class DynamicReader:
group = ""
def __init__(self, file: str):
self.file = file
self.metadata = load_attributes(file)
class StateReader(DynamicReader):
"""
Analogous to StateWriter:
Possible cases (and data shapes):
- max_lbl (ntraps, 1) -> One int per trap.
- tp_back, trap, cell_label -> One int per cell_label-timepoint
- prev_feats -> A fixed number of floats per cell_label-timepoint (default is 9)
- lifetime, p_was_bud, p_is_mother -> an (ncells, 2) matrix where the first column is the trap
and the row's index within that trap (+1) is the cell label.
- ba_cum -> a (2^n, 2^n, ntraps) 3d array holding the lineage scores for all traps,
with traps in the 3rd dimension; 2^n >= ncells and is kept as a power of two for efficiency.
"""
data_types = {}
datatypes = {
"max_lbl": ((None, 1), np.uint16),
"tp_back": ((None, 1), np.uint16),
"trap": ((None, 1), np.int16),
"cell_lbls": ((None, 1), np.uint16),
"prev_feats": ((None, None), np.float64),
"lifetime": ((None, 2), np.uint16),
"p_was_bud": ((None, 2), np.float64),
"p_is_mother": ((None, 2), np.float64),
"ba_cum": ((None, None), np.float64),
}
group = "last_state"
def __init__(self, file: str):
super().__init__(file)
def format_tps(self):
pass
def format_traps(self):
pass
def format_bacum(self):
pass
def read_raw(self, key, dtype):
with h5py.File(self.file, "r") as f:
raw = f[self.group + "/" + key][()].astype(dtype)
return raw
def read_all(self):
self.raw_data = {
key: self.read_raw(key, dtype)
for key, (_, dtype) in self.datatypes.items()
}
return self.raw_data
def reconstruct_states(self, data: dict):
ntps_back = max(data["tp_back"]) + 1
from copy import copy
tpback_as_idx = copy(data["tp_back"])
trap_as_idx = copy(data["trap"])
states = {k: {"max_lbl": v} for k, v in enumerate(data["max_lbl"])}
for val_name in ("cell_lbls", "prev_feats"):
for k in states.keys():
if val_name == "cell_lbls":
states[k][val_name] = [[] for _ in range(ntps_back)]
else:
states[k][val_name] = [
np.zeros(
(0, data[val_name].shape[1]), dtype=np.float64
)
for _ in range(ntps_back)
]
data[val_name] = list(
zip(trap_as_idx, tpback_as_idx, data[val_name])
)
for k, v in groupsort(data[val_name]).items():
states[k][val_name] = [
np.array([w[0] for w in val])
for val in groupsort(v).values()
]
for val_name in ("lifetime", "p_was_bud", "p_is_mother"):
for k in states.keys():
states[k][val_name] = np.array([])
# This contains no time points back
for k, v in groupsort(data[val_name]).items():
states[k][val_name] = np.array([val[0] for val in v])
for trap_id, ba_matrix in enumerate(data["ba_cum"]):
states[trap_id]["ba_cum"] = np.array(ba_matrix, dtype=np.float64)
return [val for val in states.values()]
def get_formatted_states(self):
return self.reconstruct_states(self.read_all())
This diff is collapsed.
@@ -4,15 +4,13 @@ Utility functions and classes
import itertools
import logging
import operator
from functools import partial, wraps
import typing as t
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Callable
import cv2
import h5py
import imageio
import numpy as np
def repr_obj(obj, indent=0):
@@ -35,29 +33,6 @@ def imread(path):
return cv2.imread(str(path), -1)
class ImageCache:
"""HDF5-based image cache for faster loading of the images once they've
been read.
"""
def __init__(self, file, name, shape, remote_fn):
self.store = h5py.File(file, "a")
# Create a dataset
self.dataset = self.store.create_dataset(
name, shape, dtype=np.float, fill_value=np.nan
)
self.remote_fn = remote_fn
def __getitem__(self, item):
cached = self.dataset[item]
if np.any(np.isnan(cached)):
full = self.remote_fn(item)
self.dataset[item] = full
return full
else:
return cached
class Cache:
"""
Fixed-length mapping to use as a cache.
@@ -92,9 +67,10 @@ class Cache:
self._queue.clear()
def accumulate(lst: list):
lst = sorted(lst)
it = itertools.groupby(lst, operator.itemgetter(0))
def accumulate(list_: list) -> t.Generator:
"""Accumulate list based on the first value"""
list_ = sorted(list_)
it = itertools.groupby(list_, operator.itemgetter(0))
for key, sub_iter in it:
yield key, [x[1] for x in sub_iter]
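# Illustrative sketch: accumulate sorts (key, value) pairs and yields one
# (key, [values]) group per key; the pairs below are arbitrary assumptions.
assert list(accumulate([(1, "a"), (0, "b"), (1, "c")])) == [(0, ["b"]), (1, ["a", "c"])]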