@@ -5,9 +5,10 @@ GUI/@timelapseTraps/extractCellDataStacksParfor.m
Especially lines 342 to 399.
This part only replicates the method to get the nuc_est_conv values
"""
import typing as t
import numpy as np
import scipy
import skimage
from scipy import signal, stats
def matlab_style_gauss2D(shape=(3, 3), sigma=0.5):
@@ -25,14 +26,16 @@ def matlab_style_gauss2D(shape=(3, 3), sigma=0.5):
return h
def gauss3D(shape=(3, 3, 3), sigma=(0.5, 0.5, 0.5)):
def gauss3D(
shape: t.Tuple[int] = (3, 3, 3), sigma: t.Tuple[float] = (0.5, 0.5, 0.5)
):
"""3D gaussian mask - based on MATLAB's fspecial but made 3D."""
m, n, p = [(ss - 1.0) / 2.0 for ss in shape]
z, y, x = np.ogrid[-p : p + 1, -m : m + 1, -n : n + 1]
sigmax, sigmay, sigmaz = sigma
xx = (x ** 2) / (2 * sigmax)
yy = (y ** 2) / (2 * sigmay)
zz = (z ** 2) / (2 * sigmaz)
xx = (x**2) / (2 * sigmax)
yy = (y**2) / (2 * sigmay)
zz = (z**2) / (2 * sigmaz)
h = np.exp(-(xx + yy + zz))
h[h < np.finfo(h.dtype).eps * h.max()] = 0 # Truncate
sumh = h.sum()
@@ -41,7 +44,7 @@ def gauss3D(shape=(3, 3, 3), sigma=(0.5, 0.5, 0.5)):
return h
def small_peaks_conv(cell_mask, trap_image):
def small_peaks_conv(cell_mask: np.ndarray, trap_image: np.ndarray):
cell_fluo = trap_image[cell_mask]
# Get the number of pixels in the cell
num_cell_fluo = len(np.nonzero(cell_fluo)[0])
@@ -59,59 +62,91 @@ def small_peaks_conv(cell_mask, trap_image):
# expect the size of the nucleus to be.
# TODO directly get a disk of that size?
# new_shape = tuple(x * ratio_overlap / 5 for x in conv_matrix.shape)
# conv_matrix = scipy.misc.imresize(conv_matrix, new_shape)
# conv_matrix = misc.imresize(conv_matrix, new_shape)
conv_matrix = skimage.morphology.disk(3 * ratio_overlap / 5)
# Apply convolution to the image
# TODO maybe rename 'conv_matrix' to 'kernel'
fluo_peaks = scipy.signal.convolve(trap_image, conv_matrix, "same")
fluo_peaks = signal.convolve(trap_image, conv_matrix, "same")
fluo_peaks = fluo_peaks[cell_mask]
small_peak_conv = np.max(fluo_peaks)
return small_peak_conv
def nuc_est_conv(cell_mask, trap_image):
def nuc_est_conv(
cell_mask: np.ndarray,
trap_image: np.ndarray,
alpha: t.Optional[float] = 0.95,
object_radius_estimation: t.Optional[float] = 0.085,
gaussian_filter_shape: t.Optional[t.Union[int, t.Tuple[int]]] = None,
gaussian_sigma: t.Optional[float] = None,
):
"""
:param cell_mask: the segmentation mask of the cell (filled)
:param trap_image: the image for the trap in which the cell is (all
channels)
:param alpha: optional distribution alpha to get confidence intervals
    :param object_radius_estimation: optional estimate of the fraction of
        the cell's pixels occupied by the object, used to estimate the
        object radius.
:param gaussian_filter_shape: optional tuple to pass to matlab_style_gauss2D,
determines the kernel shape for convolutions.
    :param gaussian_sigma: optional sigma to pass to matlab_style_gauss2D
as sigma argument.
"""
if alpha is None:
alpha = 0.95
if object_radius_estimation is None:
object_radius_estimation = 0.085
cell_loc = cell_mask # np.where(cell_mask)[0]
cell_fluo = trap_image[cell_mask]
num_cell_fluo = len(np.nonzero(cell_fluo)[0])
chi2inv = stats.distributions.chi2.ppf(alpha, df=2)
approx_nuc_radius = np.sqrt(
object_radius_estimation * num_cell_fluo / np.pi
)
if gaussian_sigma is None:
gaussian_sigma = float(approx_nuc_radius / np.sqrt(chi2inv))
# Nuc Est Conv
alpha = 0.95
approx_nuc_radius = np.sqrt(0.085 * num_cell_fluo / np.pi)
chi2inv = scipy.stats.distributions.chi2.ppf(alpha, df=2)
sd_est = approx_nuc_radius / np.sqrt(chi2inv)
filter_size = int(np.ceil(2 * approx_nuc_radius))
gaussian_filter_shape = (2 * filter_size + 1,) * 2
nuc_filt_hw = np.ceil(2 * approx_nuc_radius)
nuc_filter = matlab_style_gauss2D((2 * nuc_filt_hw + 1,) * 2, sd_est)
nuc_filter = matlab_style_gauss2D(gaussian_filter_shape, gaussian_sigma)
cell_image = trap_image - np.median(cell_fluo)
cell_image[~cell_loc] = 0
nuc_conv = scipy.signal.convolve(cell_image, nuc_filter, "same")
nuc_conv = signal.convolve(cell_image, nuc_filter, "same")
nuc_est_conv = np.max(nuc_conv)
nuc_est_conv /= np.sum(nuc_filter ** 2) * alpha * np.pi * chi2inv * sd_est ** 2
nuc_est_conv /= (
np.sum(nuc_filter**2) * alpha * np.pi * chi2inv * gaussian_sigma**2
)
return nuc_est_conv
def nuc_conv_3d(cell_mask, trap_image, pixel_size=0.23, spacing=0.6):
cell_mask = np.dstack([cell_mask] * trap_image.shape[-1])
cell_mask = np.stack([cell_mask] * trap_image.shape[0])
ratio = spacing / pixel_size
cell_fluo = trap_image[cell_mask]
num_cell_fluo = len(np.nonzero(cell_fluo)[0])
# Nuc Est Conv
alpha = 0.95
approx_nuc_radius = np.sqrt(0.085 * num_cell_fluo / np.pi)
chi2inv = scipy.stats.distributions.chi2.ppf(alpha, df=2)
chi2inv = stats.distributions.chi2.ppf(alpha, df=2)
sd_est = approx_nuc_radius / np.sqrt(chi2inv)
nuc_filt_hw = np.ceil(2 * approx_nuc_radius)
nuc_filter = gauss3D((2 * nuc_filt_hw + 1,) * 3, (sd_est, sd_est, sd_est * ratio))
nuc_filter = gauss3D(
(2 * nuc_filt_hw + 1,) * 3, (sd_est, sd_est, sd_est * ratio)
)
cell_image = trap_image - np.median(cell_fluo)
cell_image[~cell_mask] = 0
nuc_conv = scipy.signal.convolve(cell_image, nuc_filter, "same")
nuc_conv = signal.convolve(cell_image, nuc_filter, "same")
nuc_est_conv = np.max(nuc_conv)
nuc_est_conv /= np.sum(nuc_filter ** 2) * alpha * np.pi * chi2inv * sd_est ** 2
nuc_est_conv /= (
np.sum(nuc_filter**2) * alpha * np.pi * chi2inv * sd_est**2
)
return nuc_est_conv
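For orientation, a minimal usage sketch of `nuc_est_conv` on synthetic data (not part of the diff; the import path follows the one used elsewhere in this changeset):

```python
# Synthetic example only: a bright blob inside a circular cell mask.
import numpy as np

from extraction.core.functions.custom.localisation import nuc_est_conv

rng = np.random.default_rng(0)
trap_image = rng.poisson(10, size=(64, 64)).astype(float)
yy, xx = np.ogrid[:64, :64]
cell_mask = (yy - 32) ** 2 + (xx - 32) ** 2 < 15**2  # boolean cell mask
trap_image[28:36, 28:36] += 50  # nucleus-like bright patch
print(nuc_est_conv(cell_mask, trap_image))  # defaults: alpha=0.95, ratio=0.085
```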
# File with defaults for ease of use
import re
import typing as t
from pathlib import Path
import h5py
# should we move these functions here?
from aliby.tile.tiler import find_channel_name
def exparams_from_meta(
meta: t.Union[dict, Path, str], extras: t.Collection[str] = ["ph"]
):
"""
Obtain parameters from metadata of the h5 file.
Compares a list of candidate channels using case-insensitive
REGEX to identify valid channels.
"""
meta = meta if isinstance(meta, dict) else load_metadata(meta)
base = {
"tree": {"general": {"None": ["area", "volume", "eccentricity"]}},
"multichannel_ops": {},
}
candidate_channels = {
"Citrine",
"GFP",
"GFPFast",
"mCherry",
"pHluorin405",
"pHluorin488",
"Flavin",
"Cy5",
"mKO2",
}
default_reductions = {"max"}
default_metrics = {
"mean",
"median",
"std",
"imBackground",
"max5px",
# "nuc_est_conv",
}
# define ratiometric combinations
# key is numerator and value is denominator
# add more to support additional channel names
ratiometric_combinations = {"phluorin405": ("phluorin488", "gfpfast")}
default_reduction_metrics = {
r: default_metrics for r in default_reductions
}
# default_rm["None"] = ["nuc_conv_3d"] # Uncomment this to add nuc_conv_3d (slow)
extant_fluorescence_ch = []
for av_channel in candidate_channels:
# find matching channels in metadata
found_channel = find_channel_name(meta.get("channels", []), av_channel)
if found_channel is not None:
extant_fluorescence_ch.append(found_channel)
for ch in extant_fluorescence_ch:
base["tree"][ch] = default_reduction_metrics
base["sub_bg"] = extant_fluorescence_ch
# additional extraction defaults if the channels are available
if "ph" in extras:
# SWAINLAB specific names
# find first valid combination of ratiometric fluorescence channels
numerator_channel, denominator_channel = (None, None)
for ch1, chs2 in ratiometric_combinations.items():
found_channel1 = find_channel_name(extant_fluorescence_ch, ch1)
if found_channel1 is not None:
numerator_channel = found_channel1
for ch2 in chs2:
found_channel2 = find_channel_name(
extant_fluorescence_ch, ch2
)
if found_channel2:
denominator_channel = found_channel2
break
# if two compatible ratiometric channels are available
if numerator_channel is not None and denominator_channel is not None:
sets = {
b + a: (x, y)
for a, x in zip(
["", "_bgsub"],
(
[numerator_channel, denominator_channel],
[
f"{numerator_channel}_bgsub",
f"{denominator_channel}_bgsub",
],
),
)
for b, y in zip(["em_ratio", "gsum"], ["div0", "add"])
}
for i, v in sets.items():
base["multichannel_ops"][i] = [
*v,
default_reduction_metrics,
]
return base
def load_metadata(file: t.Union[str, Path], group="/"):
"""Get meta data from an h5 file."""
with h5py.File(file, "r") as f:
meta = dict(f[group].attrs.items())
return meta
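A hypothetical call for illustration only (the channel names are examples; `exparams_from_meta` would normally receive the path to an h5 file):

```python
meta = {"channels": ["Brightfield", "GFP", "pHluorin405", "pHluorin488"]}
params = exparams_from_meta(meta, extras=["ph"])
# params["tree"] holds the per-channel reduction/metric defaults and
# params["multichannel_ops"] the ratiometric pHluorin combinations.
```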
import typing as t
import bottleneck as bn
import numpy as np
def trap_apply(cell_fun, cell_masks, *args, **kwargs):
"""
Apply a cell_function to a mask and a trap_image.
Parameters
----------
cell_fun: function
Function to apply to the cell (from extraction/cell.py)
cell_masks: 3d array
Segmentation masks for the cells. Note that cells are in the first dimension (N, Y,X)
*args: tuple
Trap_image and any other arguments to pass if needed to custom functions.
**kwargs: dict
Keyword arguments to pass if needed to custom functions.
"""
# apply cell_fun to each cell and return the results as a list
return [cell_fun(mask, *args, **kwargs) for mask in cell_masks]
def reduce_z(trap_image: np.ndarray, fun: t.Callable, axis: int = 0):
"""
Reduce the trap_image to 2d.
Parameters
----------
trap_image: array
Images for all the channels associated with a trap
fun: function
Function to execute the reduction
axis: int (default 0)
Axis in which we apply the reduction operation.
"""
# FUTURE replace with py3.10's match-case.
if (
hasattr(fun, "__module__") and fun.__module__[:10] == "bottleneck"
): # Bottleneck type
return getattr(bn.reduce, fun.__name__)(trap_image, axis=axis)
elif isinstance(fun, np.ufunc):
# optimise the reduction function if possible
return fun.reduce(trap_image, axis=axis)
else: # WARNING: Very slow, only use when no alternatives exist
return np.apply_along_axis(fun, axis, trap_image)
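A quick sketch of the three dispatch paths of `reduce_z` (not part of the diff; the import path matches the one used later in this changeset):

```python
import bottleneck as bn
import numpy as np

from extraction.core.functions.distributors import reduce_z

stack = np.random.rand(5, 64, 64)  # (z, y, x)
a = reduce_z(stack, bn.nanmax)  # bottleneck fast path
b = reduce_z(stack, np.maximum)  # numpy ufunc .reduce path
c = reduce_z(stack, lambda v: v.mean())  # generic, slow fallback
assert a.shape == b.shape == c.shape == (64, 64)
```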
import typing as t
from types import FunctionType
from inspect import getfullargspec, getmembers, isfunction, isbuiltin
import bottleneck as bn
from extraction.core.functions import cell, trap
from extraction.core.functions.custom import localisation
from extraction.core.functions.distributors import trap_apply
from extraction.core.functions.math_utils import div0
"""
Load functions for analysing cells and their background.
Note that inspect.getmembers returns a list of function names and functions,
and inspect.getfullargspec returns a function's arguments.
"""
def load_cellfuns_core():
"""Load functions from the cell module and return as a dict."""
return {
f[0]: f[1]
for f in getmembers(cell)
if isfunction(f[1])
and f[1].__module__.startswith("extraction.core.functions")
}
def load_custom_args() -> t.Tuple[
(t.Dict[str, t.Callable], t.Dict[str, t.List[str]])
]:
"""
Load custom functions from the localisation module.
Return the functions and any additional arguments other
than cell_mask and trap_image as dictionaries.
"""
# load functions from module
funs = {
f[0]: f[1]
for f in getmembers(localisation)
if isfunction(f[1])
and f[1].__module__.startswith("extraction.core.functions")
}
# load additional arguments if cell_mask and trap_image are arguments
args = {
k: getfullargspec(v).args[2:]
for k, v in funs.items()
if set(["cell_mask", "trap_image"]).intersection(
getfullargspec(v).args
)
}
# return dictionaries of functions and of arguments
return (
{k: funs[k] for k in args.keys()},
{k: v for k, v in args.items() if v},
)
def load_cellfuns():
"""
Create a dict of core functions for use on cell_masks.
The core functions only work on a single mask.
"""
# create dict of the core functions from cell.py - these functions apply to a single mask
cell_funs = load_cellfuns_core()
# create a dict of functions that apply the core functions to an array of cell_masks
CELLFUNS = {}
for f_name, f in cell_funs.items():
if isfunction(f):
def tmp(f):
args = getfullargspec(f).args
if len(args) == 1:
# function that applies f to m, an array of masks
return lambda m, _: trap_apply(f, m)
else:
# function that applies f to m and img, the trap_image
return lambda m, img: trap_apply(f, m, img)
CELLFUNS[f_name] = tmp(f)
return CELLFUNS
def load_trapfuns():
"""Load functions that are applied to an entire tile."""
TRAPFUNS = {
f[0]: f[1]
for f in getmembers(trap)
if isfunction(f[1])
and f[1].__module__.startswith("extraction.core.functions")
}
return TRAPFUNS
def load_funs():
"""Combine all automatically loaded functions."""
CELLFUNS = load_cellfuns()
TRAPFUNS = load_trapfuns()
# return dict of cell funs, dict of trap funs, and dict of both
return CELLFUNS, TRAPFUNS, {**TRAPFUNS, **CELLFUNS}
def load_redfuns(
additional_reducers: t.Optional[
t.Union[t.Dict[str, t.Callable], t.Callable]
] = None,
) -> t.Dict[str, t.Callable]:
"""
Load functions to reduce a multidimensional image by one dimension.
Parameters
----------
additional_reducers: function or a dict of functions (optional)
Functions to perform the reduction.
"""
RED_FUNS = {
"max": bn.nanmax,
"mean": bn.nanmean,
"median": bn.nanmedian,
"div0": div0,
"add": bn.nansum,
"None": None,
}
if additional_reducers is not None:
if isinstance(additional_reducers, FunctionType):
additional_reducers = [
(additional_reducers.__name__, additional_reducers)
]
RED_FUNS.update(additional_reducers)
return RED_FUNS
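As an illustration of the `additional_reducers` hook, a sketch that registers a custom reducer (the import path is an assumption):

```python
import numpy as np

# import path assumed; adjust to wherever load_redfuns lives
from extraction.core.functions.loaders import load_redfuns

def p90(stack, axis=0):
    # reduce the z-dimension to its 90th percentile
    return np.nanpercentile(stack, 90, axis=axis)

RED_FUNS = load_redfuns(p90)  # registered under its __name__, i.e. "p90"
```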
import numpy as np
def div0(array, fill=0, axis=-1):
"""
    Divide the values of an array at index 0 along `axis` by those at index 1.

    If the result is a scalar and infinite, return fill.
    If the result contains elements that are infinite, replace these elements with fill.

    Parameters
    ----------
    array: array
        Array of size two along `axis`, stacking the numerator and the denominator.
    fill: float
        Value used to replace infinite or undefined results.
    axis: int
        Axis along which the numerator and denominator are stacked (default -1).
"""
assert array.shape[axis] == 2, f"Array has the wrong shape in axis {axis}"
    # build two independent index lists (a single shared list would alias)
    slices_0 = [slice(None)] * array.ndim
    slices_1 = [slice(None)] * array.ndim
slices_0[axis] = 0
slices_1[axis] = 1
with np.errstate(divide="ignore", invalid="ignore"):
c = np.true_divide(
array[tuple(slices_0)],
array[tuple(slices_1)],
)
if np.isscalar(c):
return c if np.isfinite(c) else fill
else:
c[~np.isfinite(c)] = fill
return c
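A minimal sketch of the intended behaviour of `div0` (not part of the diff; the import path matches the one used elsewhere in this changeset):

```python
import numpy as np

from extraction.core.functions.math_utils import div0

pair = np.stack([[1.0, 2.0, 3.0], [2.0, 0.0, 3.0]], axis=-1)  # shape (3, 2)
print(div0(pair))  # [0.5, 0.0, 1.0]; the 2/0 entry is replaced by fill=0
```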
## Trap-wise calculations
import numpy as np
def imBackground(cell_masks, trap_image):
"""
Find the median background (pixels not comprising cells) from trap_image.
Parameters
----------
cell_masks: 3d array
Segmentation masks for cells
trap_image:
The image (all channels) for the tile containing the cell.
"""
if not len(cell_masks):
# create cell_masks if none are given
cell_masks = np.zeros_like(trap_image)
# find background pixels
# sum over all cells identified at a trap - one mask for each cell
background = ~cell_masks.sum(axis=2).astype(bool)
return np.median(trap_image[np.where(background)])
def background_max5(cell_masks, trap_image):
"""
    Find the mean of the five brightest background pixels.
Parameters
----------
cell_masks: 3d array
Segmentation masks for cells.
trap_image:
The image (all channels) for the tile containing the cell.
"""
if not len(cell_masks):
# create cell_masks if none are given
cell_masks = np.zeros_like(trap_image)
# find background pixels
# sum over all cells identified at a trap - one mask for each cell
background = ~cell_masks.sum(axis=2).astype(bool)
return np.mean(np.sort(trap_image[np.where(background)])[-5:])
import git
import pkg_resources
def get_sha():
# FIXME Unused, but *should* be used...
repo = git.Repo(search_parent_directories=True)
sha = repo.head.object.hexsha
return sha
def get_version(pkg="extraction"):
return pkg_resources.require(pkg)[0].version
@@ -9,16 +9,14 @@ The most basic functions were copied from Swain Lab's baby module,
specifically baby/io.py
"""
import os
from importlib_resources import files
import json
import re
from pathlib import Path
from itertools import groupby
from pathlib import Path
from typing import Callable
import numpy as np
import random
from imageio import imread
from extraction.core.functions.distributors import reduce_z
@@ -31,12 +29,13 @@ def load_tiled_image(filename):
nt = info.get("ntiles", 1)
nr, nc = info.get("layout", (1, 1))
nc_final_row = np.mod(nt, nc)
img = np.zeros((tw, th, nt), dtype=tImg.dtype)
img = np.zeros((nt, tw, th), dtype=tImg.dtype)
for i in range(nr):
i_nc = nc_final_row if i + 1 == nr and nc_final_row > 0 else nc
for j in range(i_nc):
ind = i * nc + j
img[:, :, ind] = tImg[i * tw : (i + 1) * tw, j * th : (j + 1) * th]
img[ind, :, :] = tImg[i * tw : (i + 1) * tw, j * th : (j + 1) * th]
return img, info
@@ -51,11 +50,14 @@ def load_paired_images(filenames, typeA="Brightfield", typeB="segoutlines"):
k: {m.group(2): f for m, f in v}
for k, v in groupby(valid, key=lambda m: m[0].group(1))
}
valid = [set(v.keys()).issuperset({typeA, typeB}) for v in grouped.values()]
valid = [
set(v.keys()).issuperset({typeA, typeB}) for v in grouped.values()
]
if not all(valid):
raise Exception
return {
l: {t: load_tiled_image(f) for t, f in g.items()} for l, g in grouped.items()
lbl: {t: load_tiled_image(f) for t, f in g.items()}
for lbl, g in grouped.items()
}
@@ -70,7 +72,13 @@ def load(path=None):
list of dictionaries containing GFP, Brightfield and segoutlines channel
"""
if path is None:
path = Path(os.path.dirname(os.path.realpath(__file__))) / Path("pairs_data")
path = (
files("aliby").parent.parent
/ "examples"
/ "extraction"
/ "pairs_data"
)
image_dir = Path(path)
channels = ["Brightfield", "GFP"]
......
# logfile\_parser
Simple log file parsing according to grammars specified in JSON
## Basic usage
This package comes with three built-in grammars: 'multiDGUI\_acq\_format',
'multiDGUI\_log\_format' and 'cExperiment\_log\_format'. As an example, the
'multiDGUI\_acq\_format' grammar can be used to parse the included example
using:
```python
>>> from logfile_parser import Parser
>>> acq_parser = Parser('multiDGUI_acq_format')
>>> with open('examples/example_multiDGUI_acq.txt', 'r') as f:
... parsed = acq_parser.parse(f)
>>> print(parsed)
```
The parsed output is a `dict` containing any fields satisfying the grammar.
## Defining new grammars
Custom grammars should be written in json as a dictionary with keys specifying
the information to extract from the log file.
The built-in grammars are useful examples or starting points for defining custom
grammars. They can be found in the `logfile_parser/grammars` directory.
Let's start with a basic example of a log file that we might want to parse:
```text
Date: 16 Apr 2020
Microscope: Batgirl
Experiment details:
My lengthy description of what will certainly be a great experiment.
This description takes multiple lines.
Tags:
User name, Project name, Experiment name
```
A basic grammar that just extracts the description of the experiment could be
defined using:
```json
{
"description": {
"trigger_startswith": "Experiment details:"
},
"stop": {
"trigger_startswith": "Tags:",
"type": "stop"
}
}
```
This tells the parser to fill the "description" field of the parsed result with
the text on lines *after* the one starting with "Experiment details:", and then
tells the parser to terminate parsing whenever it encounters a line that starts
with "Tags:". If you wanted the trigger line itself to be included, you would
specify `"skip": false` as an additional property of `"description"`.
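For instance, the `"description"` entry could then look like this (a sketch; the built-in grammars use the same boolean `skip` property):

```json
{
  "description": {
    "trigger_startswith": "Experiment details:",
    "skip": false
  }
}
```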
If we also wanted to fill a "tags" field with the comma separated tags, we would
just need to change the type to "list":
```json
{
"description": {
"trigger_startswith": "Experiment details:"
},
"tags": {
"trigger_startswith": "Tags:",
"type": "list"
}
}
```
To extract the microscope name, we can make use of the "regex" type:
```json
{
"microscope": {
"trigger_startswith": "Microscope:",
"type": "regex",
"regex": "^Microscope:\\s*(.*)$"
}
}
```
The expression found in the bracketed group will be stored in the "microscope"
field of the parsed result.
Finally, to extract a date, we combine a "regex" with a "map" to map the text
to a Python `datetime` object:
```json
{
"date": {
"trigger_startswith": "Date:",
"type": "regex",
"regex": "^.*(\\d{2} [A-Z][a-z]{2} \\d{4})$",
"map": "datetime:%d %b %Y"
}
}
```
Putting this all together gives us the following grammar:
```json
{
"date": {
"trigger_startswith": "Date:",
"type": "regex",
"regex": "^.*(\\d{2} [A-Z][a-z]{2} \\d{4})$",
"map": "datetime:%d %b %Y"
},
"microscope": {
"trigger_startswith": "Microscope:",
"type": "regex",
"regex": "^Microscope:\\s*(.*)$"
},
"description": {
"trigger_startswith": "Experiment details:"
},
"tags": {
"trigger_startswith": "Tags:",
"type": "list"
}
}
```
If this is saved to a file `newgrammar.json` we could parse the log file as
listed above (say it is in `logfile.txt`) using the following:
```python
>>> from logfile_parser import Parser
>>> parser = Parser('newgrammar.json')
>>> with open('logfile.txt', 'r') as f:
... parsed = parser.parse(f)
>>> print(parsed)
{'date': datetime.datetime(2020, 4, 16, 0, 0), 'microscope': 'Batgirl',
'description': 'My lengthy description of what will certainly be a great
experiment.\nThis description takes multiple lines.', 'tags': ['User name',
'Project name', 'Experiment name']}
```
# -*- coding: utf-8 -*-
from .logfile_parser import Parser
{
"@@CONFIG@@": {
"regex_preprocessing": ["^\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}\\s*(.*)$"]
},
"extractmethod": {
"trigger_startswith": "extractionParameters:",
"type": "regex",
"regex": "^extractFunction:\\s*(.*)$",
"use_unmatched": true
},
"segmethod": {
"trigger_re": "Start .* segmentation",
"type": "regex",
"regex": "^.*Start (.*) segmentation.*$"
},
"segcomplete": {
"trigger_re": "Successfully completed .* segmentation",
"type": "regex",
"regex": "^(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}).*$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"compiled": {
"trigger_startswith": "Successfully completed compiling cell information",
"type": "regex",
"regex": "^(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}).*$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
}
}
{
"channels": {
"trigger_startswith": "Channels:",
"type": "table",
"column_map": {
"Channel name": ["channel","str"],
"Exposure time": ["exposure","int"],
"Skip": ["skip","int"],
"Z sect.": ["zsect","int"],
"Start time": ["start_time","int"],
"Camera mode": ["camera_mode","int"],
"EM gain": ["em_gain","int"],
"Voltage": ["voltage","float"]
}
},
"zsectioning": {
"trigger_startswith": "Z_sectioning:",
"type": "table",
"column_map": {
"Sections": ["nsections","int"],
"Spacing": ["spacing","float"],
"PFSon?": ["pfson","bool"],
"AnyZ?": ["anyz","bool"],
"Drift": ["drift","int"],
"Method": ["zmethod","int"]
}
},
"time_settings": {
"trigger_startswith": "Time_settings",
"type": "table",
"has_header": false,
"column_map": [
["istimelapse","bool"],
["timeinterval","int"],
["ntimepoints","int"],
["totaltime","int"]
]
},
"positions": {
"trigger_startswith": "Points:",
"type": "table",
"column_map": {
"Position name": ["posname","str"],
"X position": ["xpos","float"],
"Y position": ["ypos","float"],
"Z position": ["zpos","float"],
"PFS offset": ["pfsoffset","float"],
"Group": ["group","int"]
},
"default_map": "int"
},
"npumps": {
"trigger_startswith": "Syringe pump details:",
"type": "regex",
"regex": "^.*:\\s*(\\d+)\\s*pumps\\.*$",
"map": "int"
},
"pumpinit": {
"trigger_startswith": "Pump states at beginning of experiment:",
"type": "table",
"column_map": {
"Pump port": ["pump_port","str"],
"Diameter": ["syringe_diameter","float"],
"Current rate": ["flowrate","float"],
"Direction": ["flowdirection","str"],
"Running": ["isrunning", "bool"],
"Contents": ["contents", "str"]
}
},
"nswitches": {
"trigger_startswith": "Number of pump changes:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchvol": {
"trigger_startswith": "Infuse/withdraw volumes:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchrate": {
"trigger_startswith": "Infuse/withdraw rates:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchtimes": {
"trigger_startswith": "Times:",
"type": "list",
"map": "int"
},
"switchtopump": {
"trigger_startswith": "Switched to:",
"type": "list",
"map": "int"
},
"switchfrompump": {
"trigger_startswith": "Switched from:",
"type": "list",
"map": "int"
},
"pumprate": {
"trigger_startswith": "Flow post switch:",
"type": "lists",
"map": "float"
}
}
{
"date": {
"trigger_re": "^\\d{2}-[A-Z][a-z]{2}-\\d{4}$",
"type": "regex",
"regex": "^\\d{2}-[A-Z][a-z]{2}-\\d{4}$",
"map": "datetime:%d-%b-%Y"
},
"multiDGUI_commit": {
"trigger_startswith": "Swain lab microscope control software",
"type": "regex",
"regex": "^.*commit number:([0-9a-z]+)$",
"next_section": "date"
},
"microscope": {
"trigger_startswith": "Microscope name is:",
"type": "regex",
"regex": "^Microscope name is:\\s+(.*)$"
},
"acqfile": {
"trigger_startswith": "Acquisition settings are saved in:",
"type": "regex",
"regex": "^(.*)$",
"skip": true
},
"details": {
"trigger_startswith": "Experiment details:"
},
"setup": {
"trigger_startswith": "Microscope setup for used channels:"
},
"omero_project": {
"trigger_startswith": "Omero project:",
"type": "regex",
"regex": "^(.*)$",
"skip": true
},
"omero_tags": {
"trigger_startswith": "Omero tags:",
"type": "list"
},
"omero_tags_stop": {
"trigger_startswith": "PFS is locked"
},
"omero_tag_descriptions": {
"trigger_startswith": "Omero tag descriptions:",
"type": "list"
},
"expt_start": {
"trigger_startswith": "Experiment started at:",
"type": "regex",
"regex": "^.*at:(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"first_capture": {
"trigger_startswith": "------Time point_1",
"type": "regex",
"regex": "^Channel:.*set at:(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"stop": {
"trigger_startswith": "------Time point_2",
"type": "stop"
}
}
#!/usr/bin/env jupyter
from importlib_resources import files
from logfile_parser import Parser
grammars_dir = files("logfile_parser") / "grammars"
def get_examples_dir():
return files("aliby").parent.parent / "examples" / "logfile_parser"
def get_logfile_grammars_dir():
return files("logfile_parser") / "grammars"
def get_legacy_log_example_interface() -> dict:
parsed = {}
examples_dir = get_examples_dir()
grammars_dir = get_logfile_grammars_dir()
for gram in ("acq", "log"):
for gram_file in grammars_dir.glob(f"multiDGUI_{gram}_format.json"):
parser = Parser(gram_file)
for file_to_parse in examples_dir.glob(f"*{gram}.txt"):
with open(file_to_parse, "r") as f:
parsed = {**parsed, **parser.parse(f)}
return parsed
def to_legacy(parsed_logfile: dict) -> dict:
"""
    Convert the output of the new logfile parser to a minimal working set of legacy metadata.

    The new, more complex metadata structure uses configuration profiles; the previous one instead held one configuration per channel.
    This is a temporary solution while we transition to a more general metadata structure that accounts for heterogeneous groups.
    We convert image configs to channels and add the general metadata to the root.
"""
name_translation = {
"Microscope name": "microscope",
}
channel_name_translation = {
"Image config": "channel",
"Channel": "channel_hardware",
"Exposure (ms)": "exposure",
"Z spacing (um)": "zsect",
}
# Translate general data
    general = {v: parsed_logfile["general"][k] for k, v in name_translation.items()}
# Translate and cast image configs
channels = {
v: list(map(_cast_type, parsed_logfile["image_config"][k]))
for k, v in channel_name_translation.items()
}
legacy_format = {"channels": channels, **general}
return legacy_format
# -*- coding: utf-8 -*-
import json
import pkgutil
import re
import typing as t
from datetime import datetime
from os.path import dirname, exists, join
CONFIG_KEY = "@@CONFIG@@"
DEFAULT_NOSKIP = {"regex", "regexs", "list", "lists"}
DEFAULT_NOT_USE_UNMATCHED = {"regex", "regexs"}
class GrammarNotFound(OSError):
pass
class ParseError(Exception):
pass
class Parser(object):
def __init__(self, grammar_filename):
"""Create a Parser object based on the grammar defined in a file
:param grammar_filename: path to json file specifying grammar for this
parser, or one of the default grammars included with the package
"""
if exists(grammar_filename):
with open(grammar_filename, "r") as f:
self.grammar = json.load(f)
else:
if not grammar_filename.endswith(".json"):
grammar_filename = grammar_filename + ".json"
try:
grammar_fd = pkgutil.get_data(
__package__, "grammars/" + grammar_filename
)
except FileNotFoundError as e:
raise GrammarNotFound(
"{}:specified grammar could not be found:".format(e)
)
self.grammar = json.loads(grammar_fd)
self._config = self.grammar.get(CONFIG_KEY, {})
if CONFIG_KEY in self.grammar:
del self.grammar[CONFIG_KEY]
# Preprocessing to be applied to each line before checking triggers
self._preprocessing = self._config.get("regex_preprocessing", [])
self._preprocessing = [re.compile(r) for r in self._preprocessing]
self._triggers = {
trigger_type: [
(k, v[f"trigger_{trigger_type}"])
if trigger_type != "re"
else (k, re.compile(v[f"trigger_{trigger_type}"]))
for k, v in self.grammar.items()
if f"trigger_{trigger_type}" in v
]
for trigger_type in ("startswith", "endswith", "contains", "re")
}
def _set_section(self, k=None):
if k in self.grammar:
self._active_section = self.grammar[k]
self._section_name = k
self._section_type = self._active_section.get("type")
else:
self._active_section = None
self._section_name = ""
self._section_type = None
def parse(self, filehandle):
"""Parse contents of file according to the loaded grammar
:param filehandle: a line generator, e.g., a valid file handle
"""
self._set_section()
table_header = []
column_types = []
output = {}
for line in filehandle:
line = line.strip()
if len(line) == 0:
# skip blank lines
continue
line_pp = [r.findall(line) for r in self._preprocessing]
line_pp = [m[0].strip() for m in line_pp if len(m) == 1]
line_unmatched = line_pp[0] if len(line_pp) == 1 else line
line_pp += [line]
trigger_check_methods = {
k: lam
for k, lam in zip(
self._triggers.keys(),
(
lambda x, t: x.startswith(t),
lambda x, t: x.endswith(t),
lambda x, t: x.find(t),
lambda x, re: re.findall(x),
),
)
}
matches = {
trigger: [
(k, trig_str)
for k, trig_str in self._triggers[trigger]
if any(
[
trigger_check_methods[trigger](line, trig_str)
for line in line_pp
]
)
]
for trigger, method in trigger_check_methods.items()
}
section_match = {
k
for trigger_matches in matches.values()
for k, _ in trigger_matches
}
# if len(section_match) > 1:
assert len(section_match) <= 1, ParseError(
"conflicting sections triggered"
)
if len(section_match) == 1:
# Update the active section
self._set_section(list(section_match)[0])
# Determine the unmatched part of the line
line_unmatched = self.determine_unmatched_part(
matches, line_pp
)
# Skip the matched line if requested
if self._active_section.get(
"skip", self._section_type not in DEFAULT_NOSKIP
):
continue
if self._active_section is None:
continue
active_section = self._active_section
section_type = self._section_type
section_name = self._section_name
if active_section.get(
"use_unmatched",
self._section_type not in DEFAULT_NOT_USE_UNMATCHED,
):
line = line_unmatched.strip()
if len(line) == 0:
continue
if section_type == "table":
sep = active_section.get("separator", ",")
row = line.split(sep)
if section_name not in output:
# Table needs initialisation
(
has_header,
row,
table_header,
column_types,
) = self._parse_table(active_section, row)
output[section_name] = {k: [] for k in table_header}
if active_section.get("has_header", True):
continue
if len(row) < len(table_header):
# skip lines that have fewer columns than expected
continue
# Merge extra columns into final column
row = self._table_merge_extra_columns(
table_header, sep, row, column_types
)
# Fill out current row
for val, colname, coltype in zip(
row, table_header, column_types
):
output[section_name][colname].append(
_map_to_type(val.strip(), coltype)
)
elif section_type in {"list", "lists"}:
sep = active_section.get("separator", ",")
output[section_name] = output.get(section_name, [])
map_type = active_section.get("map")
next_list = [
_map_to_type(el.strip(), map_type)
for el in line.split(sep)
]
list_to_append = (
[next_list] if section_type == "lists" else next_list
)
output[section_name] += list_to_append
elif section_type in {"regex", "regexs"}:
regex = active_section.get("regex", "^(.*)$")
map_type = active_section.get("map")
matches = re.findall(regex, line)
if len(matches) == 0:
continue
elif len(matches) == 1 and section_type == "regex":
output[section_name] = _map_to_type(matches[0], map_type)
else:
output[section_name] = output.get(section_name, [])
output[section_name] += [
_map_to_type(m, map_type) for m in matches
]
# Terminate after finding the first match
self._terminate_after_first_match(active_section, section_type)
elif section_type == "stop":
break
else:
# By default, just append additional lines as text
new_str = (
f"{output[section_name]}\n{line}"
if section_name in output
else line
)
output[section_name] = new_str
return output
@staticmethod
def determine_unmatched_part(
matches: t.Dict[str, t.List], line_pp: t.List[str]
):
if matches["startswith"]:
_, t = matches["startswith"][0]
line_unmatched = [
line[len(t) :] for line in line_pp if line.startswith(t)
][0]
elif matches["endswith"]:
_, t = matches["endwith"][0]
line_unmatched = [
line[: -(len(t) + 1)] for line in line_pp if line.endswith(t)
][0]
elif matches["contains"]:
_, t = matches["contains"][0]
lpp = [line for line in line_pp if line.find(t) >= 0][0]
i = lpp.find(t)
line_unmatched = lpp[:i] + lpp[(i + len(t)) :]
elif matches["re"]:
_, r = matches["re"][0]
line_unmatched = [
r.sub("", line) for line in line_pp if len(r.findall(line)) > 0
][0]
return line_unmatched
def _terminate_after_first_match(self, active_section, section_type):
# Terminate after finding the first match
if section_type == "regex":
next_section = active_section.get("next_section")
self._set_section(next_section)
return next_section
@staticmethod
def _parse_table(active_section, row):
has_header = active_section.get("has_header", True)
if has_header:
row = [col.strip() for col in row]
default_type = active_section.get("default_map", "str")
colmap = active_section.get("column_map", len(row) * [(None, None)])
if type(colmap) == list:
# Columns are defined in order
if has_header:
table_header = [mn or rn for rn, (mn, _) in zip(row, colmap)]
table_header += row[len(colmap) :]
column_types = [mt for _, mt in colmap]
column_types += (len(row) - len(colmap)) * [default_type]
else:
table_header = [
mn or "column{:02d}".format(i + 1)
for i, (mn, _) in enumerate(colmap)
]
column_types = [mt or default_type for _, mt in colmap]
elif type(colmap) == dict:
if not has_header:
raise ParseError("dict column maps must have a header")
# First row is a header
table_header = [colmap.get(rn, (rn, None))[0] for rn in row]
column_types = [
colmap.get(rn, (None, default_type))[1] for rn in row
]
else:
raise ParseError("badly formatted column map")
return has_header, row, table_header, column_types
@staticmethod
def _table_merge_extra_columns(table_header, sep, row, column_types):
# Merge extra columns into final column
ncol = len(table_header)
if len(row) > ncol:
row[ncol - 1] = sep.join(row[ncol - 1 :])
del row[ncol:]
assert len(row) == len(table_header) and len(row) == len(column_types)
return row
def _map_to_type(val, map_type):
if map_type and map_type.startswith("datetime"):
date_format = "%Y-%m-%dT%H:%M:%S.%fZ" # ISO 8601 format
if map_type.startswith("datetime:"):
date_format = map_type[9:]
try:
return datetime.strptime(val, date_format)
except ValueError:
return None
else:
try:
return {"str": str, "int": int, "float": float, "bool": bool}.get(
map_type, str
)(val)
        except (ValueError, TypeError):
return {"float": float("nan")}.get(map_type)
#!/usr/bin/env jupyter
# TODO should this be merged to the regular logfile_parser structure?
"""
Description of new logfile:
All three conditions are concatenated in a single file, in this order:
- Experiment basic information (URL in acquisition PC, project, user input)
- Acquisition settings
- Experiment start
The section separators are:
-----Acquisition settings-----
-----Experiment started-----
And for a successfully finished experiment we get:
YYYY-MM-DD HH:mm:ss,ms*3 Image acquisition complete WeekDay Mon Day HH:mm:ss,ms*3 YYYY
For example:
2022-09-30 05:40:59,765 Image acquisition complete Fri Sep 30 05:40:59 2022
Data to extract:
* Basic information
- Experiment details, which may indicate technical issues
- GIT commit
- (Not working as of 2022/10/03, but projects and tags)
* Basic information
-
New grammar
- Tables are assumed to end with an empty line.
"""
import logging
import typing as t
from pathlib import Path
import pandas as pd
from pyparsing import (
CharsNotIn,
Combine,
Group,
Keyword,
LineEnd,
LineStart,
Literal,
OneOrMore,
ParserElement,
Word,
printables,
)
atomic = t.Union[str, int, float, bool]
class HeaderEndNotFound(Exception):
def __init__(self, message, errors):
super().__init__(message)
self.errors = errors
def extract_header(filepath: Path):
# header_contents = ""
with open(filepath, "r") as f:
try:
header = ""
for _ in range(MAX_NLINES):
line = f.readline()
header += line
if HEADER_END in line:
break
except HeaderEndNotFound as e:
print(f"{MAX_NLINES} checked and no header found")
raise (e)
return header
def parse_table(
string: str,
start_trigger: t.Union[str, Keyword],
) -> pd.DataFrame:
"""Parse csv-like table
Parameters
----------
string : str
contents to parse
start_trigger : t.Union[str, t.Collection]
string or triggers that indicate section start.
Returns
-------
pd.Dataframe or dict of atomic values (int,str,bool,float)
DataFrame representing table.
Examples
--------
>>> table = parse_table()
"""
if isinstance(start_trigger, str):
start_trigger: Keyword = Keyword(start_trigger)
EOL = LineEnd().suppress()
field = OneOrMore(CharsNotIn(":,\n"))
line = LineStart() + Group(
OneOrMore(field + Literal(",").suppress()) + field + EOL
)
parser = (
start_trigger
+ EOL
+ Group(OneOrMore(line))
+ EOL # end_trigger.suppress()
)
parser_result = parser.search_string(string)
assert all(
[len(row) == len(parser_result[0]) for row in parser_result]
), f"Table {start_trigger} has unequal number of columns"
assert len(parser_result), f"Parsing is empty. {parser}"
return table_to_df(parser_result.as_list())
def parse_fields(
string: str, start_trigger, end_trigger=None
) -> t.Union[pd.DataFrame, t.Dict[str, atomic]]:
"""
Fields are parsed as key: value
By default the end is an empty newline.
For example
group: YST_1510 field: time
start: 0
interval: 300
frames: 180
"""
EOL = LineEnd().suppress()
if end_trigger is None:
end_trigger = EOL
elif isinstance(end_trigger, str):
end_trigger = Literal(end_trigger)
field = OneOrMore(CharsNotIn(":\n"))
line = (
LineStart()
+ Group(field + Combine(OneOrMore(Literal(":").suppress() + field)))
+ EOL
)
parser = (
start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress()
)
parser_result = parser.search_string(string)
results = parser_result.as_list()
assert len(results), "Parsing returned nothing"
return fields_to_dict_or_table(results)
# Grammar specification
grammar = {
"general": {
"start_trigger": Literal("Swain Lab microscope experiment log file"),
"type": "fields",
"end_trigger": "-----Acquisition settings-----",
},
"image_config": {
"start_trigger": "Image Configs:",
"type": "table",
},
"device_properties": {
"start_trigger": "Device properties:",
"type": "table",
},
"group": {
"position": {
"start_trigger": Group(
Group(Literal("group:") + Word(printables))
+ Group(Literal("field:") + "position")
),
"type": "table",
},
**{
key: {
"start_trigger": Group(
Group(Literal("group:") + Word(printables))
+ Group(Literal("field:") + key)
),
"type": "fields",
}
for key in ("time", "config")
},
},
}
ACQ_START = "-----Acquisition settings-----"
HEADER_END = "-----Experiment started-----"
MAX_NLINES = 2000 # In case of malformed logfile
# test_file = "/home/alan/Downloads/pH_med_to_low.log"
# test_file = "/home/alan/Documents/dev/skeletons/scripts/dev/C1_60x.log"
ParserElement.setDefaultWhitespaceChars(" \t")
# time_fields = parse_field(acq, start_trigger=grammar["group"]["time"]["start_trigger"])
# config_fields = parse_fields(
# acq, start_trigger=grammar["group"]["config"]["start_trigger"]
# )
# general_fields = parse_fields(basic, start_trigger=grammar["general"]["start_trigger"])
def parse_from_grammar(filepath: str, grammar: t.Dict):
header = extract_header(filepath)
d = {}
for key, values in grammar.items():
try:
if "type" in values:
d[key] = parse_x(header, **values)
else: # Use subkeys to parse groups
for subkey, subvalues in values.items():
subkey = "_".join((key, subkey))
d[subkey] = parse_x(header, **subvalues)
except Exception as e:
logging.getLogger("aliby").critical(
f"Parsing failed for key {key} and values {values}"
)
raise (e)
return d
def table_to_df(result: t.List[t.List]):
if len(result) > 1: # Multiple tables with ids to append
# Generate multiindex from "Name column"
# index = [row[1][0][1] for table in result for row in table]
# table[1][0].index("Name") # for automatic indexing
from itertools import product
group_name = [
product((table[0][0][1],), (row[0] for row in table[1][1:]))
for table in result
]
tmp = [pair for pairset in group_name for pair in pairset]
multiindices = pd.MultiIndex.from_tuples(tmp)
df = pd.DataFrame(
[row for pr in result for row in pr[1][1:]],
columns=result[0][1][0],
index=multiindices,
)
df.name = result[0][0][1][1]
else: # If it is a single table
df = pd.DataFrame(result[0][1][1:], columns=result[0][1][0])
return df
def fields_to_dict_or_table(result: t.List[t.List]):
if len(result) > 1:
formatted = pd.DataFrame(
[[row[1] for row in pr[1]] for pr in result],
columns=[x[0] for x in result[0][1]],
index=[x[0][0][1] for x in result],
)
formatted.name = result[0][0][1][1]
else: # If it is a single table
formatted = {k: _cast_type(v) for k, v in dict(result[0][1]).items()}
return formatted
def _cast_type(x: str) -> t.Union[str, int, float, bool]:
    # cast to int, float, or bool (as 0/1) where possible; otherwise keep the string
    x = x.strip()
    if x.isdigit():
        x = int(x)
    else:
        try:
            x = float(x)
        except ValueError:
            try:
                x = ("false", "true").index(x.lower())
            except ValueError:
                pass
return x
def parse_x(string: str, type: str, **kwargs):
# return eval(f"parse_{type}({string}, **{kwargs})")
return eval(f"parse_{type}(string, **kwargs)")
def parse_from_swainlab_grammar(filepath: t.Union[str, Path]):
return parse_from_grammar(filepath, grammar)
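Hypothetical usage (the path is illustrative only):

```python
parsed = parse_from_swainlab_grammar("/path/to/experiment.log")
# keys follow the grammar above: general, image_config, device_properties,
# group_position, group_time and group_config
print(parsed["image_config"])
```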
File moved
#!/usr/bin/env jupyter
import re
import typing as t
from copy import copy
import pandas as pd
from agora.io.signal import Signal
from agora.utils.kymograph import bidirectional_retainment_filter
from postprocessor.core.abc import get_process
class Chainer(Signal):
"""
Extend Signal by applying post-processes and allowing composite signals that combine basic signals.
It "chains" multiple processes upon fetching a dataset to produce the desired datasets.
Instead of reading processes previously applied, it executes
them when called.
"""
_synonyms = {
"m5m": ("extraction/GFP/max/max5px", "extraction/GFP/max/median")
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def replace_path(path: str, bgsub: bool = ""):
# function to add bgsub to paths
channel = path.split("/")[1]
suffix = "_bgsub" if bgsub else ""
path = re.sub(channel, f"{channel}{suffix}", path)
return path
# Add chain with and without bgsub for composite statistics
self.common_chains = {
alias
+ bgsub: lambda **kwargs: self.get(
replace_path(denominator, alias + bgsub), **kwargs
)
/ self.get(replace_path(numerator, alias + bgsub), **kwargs)
for alias, (denominator, numerator) in self._synonyms.items()
for bgsub in ("", "_bgsub")
}
def get(
self,
dataset: str,
chain: t.Collection[str] = ("standard", "interpolate", "savgol"),
in_minutes: bool = True,
stages: bool = True,
retain: t.Optional[float] = None,
**kwargs,
):
"""Load data from an h5 file."""
if dataset in self.common_chains:
# get dataset for composite chains
data = self.common_chains[dataset](**kwargs)
else:
# use Signal's get_raw
data = self.get_raw(dataset, in_minutes=in_minutes, lineage=True)
if chain:
data = self.apply_chain(data, chain, **kwargs)
if retain:
# keep data only from early time points
data = self.get_retained(data, retain)
if stages and "stage" not in data.columns.names:
# return stages as additional column level
stages_index = [
x
for i, (name, span) in enumerate(self.stages_span_tp)
for x in (f"{i} { name }",) * span
]
data.columns = pd.MultiIndex.from_tuples(
zip(stages_index, data.columns),
names=("stage", "time"),
)
return data
def apply_chain(
self, input_data: pd.DataFrame, chain: t.Tuple[str, ...], **kwargs
):
"""
Apply a series of processes to a data set.
Like postprocessing, Chainer consecutively applies processes.
Parameters can be passed as kwargs.
Chainer does not support applying the same process multiple times with different parameters.
Parameters
----------
input_data : pd.DataFrame
Input data to process.
chain : t.Tuple[str, ...]
Tuple of strings with the names of the processes
**kwargs : kwargs
Arguments passed on to Process.as_function() method to modify the parameters.
Examples
--------
FIXME: Add docs.
"""
result = copy(input_data)
self._intermediate_steps = []
for process in chain:
if process == "standard":
result = bidirectional_retainment_filter(result)
else:
params = kwargs.get(process, {})
process_cls = get_process(process)
result = process_cls.as_function(result, **params)
process_type = process_cls.__module__.split(".")[-2]
if process_type == "reshapers":
if process == "merger":
raise (NotImplementedError)
self._intermediate_steps.append(result)
return result
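Hypothetical usage (import location and h5 path are illustrative; this assumes that, like Signal, a Chainer is constructed from the path to a position's h5 file):

```python
from postprocessor.chainer import Chainer  # import path assumed

chain = Chainer("/path/to/position.h5")
# fetch a signal with the default chain ("standard", "interpolate", "savgol")
df = chain.get("extraction/GFP/max/median")
```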
"""
Script in development
"""
# /usr/bin/env python3
import re
import warnings
from abc import abstractmethod
from collections import Counter
from pathlib import Path
from typing import Dict, Iterable, Tuple, Union
import bottleneck as bn
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from agora.abc import ProcessABC
from matplotlib.backends.backend_pdf import PdfPages
from numpy import ndarray
from scipy.signal import find_peaks
from postprocessor.grouper import NameGrouper
sns.set_style("darkgrid")
# Main dataframe structure
# | position | group | ntraps |robustness index | initial_ncells | final_ncells
# dir = "/home/alan/Documents/dev/skeletons/data/2021_06_15_pypipeline_unit_test_00/2021_06_15_pypipeline_unit_test_00/"
# dir = "/home/alan/Documents/dev/libs/aliby/data/2021_08_24_2Raf_00/2021_08_24_2Raf_00/"
# dirs = [
# "16543_2019_07_16_aggregates_CTP_switch_2_0glu_0_0glu_URA7young_URA8young_URA8old_01",
# "16545_2019_07_16_aggregates_CTP_switch_2_0glu_0_0glu_URA7young_URA8young_URA8old_secondRun_01",
# "18069_2019_12_05_aggregates_updownshift_2_0_2_URA8_URA7H360A_URA7H360R_00",
# "18616_2020_02_20_protAgg_downUpShift_2_0_2_Ura8_Ura8HA_Ura8HR_01",
# "18617_2020_02_21_protAgg_downUpShift_2_0_2_pHluorin_Ura7HA_Ura7HR_00",
# "19129_2020_09_06_DownUpshift_2_0_2_glu_ura_mig1msn2_phluorin_00",
# "19144_2020_09_07_DownUpshift_2_0_2_glu_ura_mig1msn2_phluorin_secondRound_00",
# "19169_2020_09_09_downUpshift_2_0_2_glu_ura8_phl_mig1_phl_msn2_03",
# "19199_2020_09_29_downUpshift_2_0_2_glu_ura8_ura8h360a_ura8h360r_00",
# "19203_2020_09_30_downUpshift_twice_2_0_2_glu_ura8_ura8h360a_ura8h360r_00",
# "19207_2020_10_01_exp_00",
# "19232_2020_10_02_downUpshift_twice_2_0_2_glu_ura8_phluorinMsn2_phluorinMig1_01",
# "19307_2020_10_22_downUpshift_2_01_2_glucose_dual_pH__dot6_nrg1_tod6__00",
# "19310_2020_10_22_downUpshift_2_0_2_glu_dual_phluorin__glt1_psa1_ura7__thrice_00",
# "19311_2020_10_23_downUpshift_2_0_2_glu_dual_phluorin__glt1_psa1_ura7__twice__04",
# "19328_2020_10_31_downUpshift_four_2_0_2_glu_dual_phl__glt1_ura8_ura8__00",
# "19329_2020_11_01_exp_00",
# "19333_2020_11_02_downUpshift_2_0_2_glu_ura7_ura7ha_ura7hr_00",
# "19334_2020_11_02_downUpshift_2_0_2_glu_ura8_ura8ha_ura8hr_00",
# "19447_2020_11_18_downUpshift_2_0_2_glu_gcd2_gcd6_gcd7__02",
# "19810_2021_02_21_ToxicityTest_00",
# "19993_2021_06_15_pypipeline_unit_test_00",
# "19996_2021_06_27_ph_calibration_dual_phl_ura8_5_04_5_83_7_69_7_13_6_59__01",
# "20419_2021_11_02_dose_response_raf_05_075_2_glu_005_2_constantMedia_00",
# ]
# outdir = "/home/alan/Documents/dev/skeletons/data"
# dirs = Path(outdir).glob("*ph*")
# from abc import abstractclassmethod, abstractmethod
# group_pos_trap_ncells = (
# concat.dropna().groupby(["group", "position", "trap"]).apply(len)
# )
# group_pos_trapswcell = (
# group_pos_trap_ncells.dropna().groupby(["group", "position"]).apply(len)
# )
class Meta:
"""Convenience class to fetch data from hdf5 file."""
def __init__(self, filename):
self.filename = filename
@property
def ntimepoints(self):
with h5py.File(self.filename, "r") as f:
return f.attrs["time_settings/ntimepoints"][0]
class Compiler(ProcessABC):
# def __init__(self, parameters):
# super().__init__(parameters)
@abstractmethod
def load_data(self):
"""Abstract function that must be reimplemented."""
pass
@abstractmethod
def run():
pass
class ExperimentCompiler(Compiler):
def __init__(self, CompilerParameters, exp_path: Path):
super().__init__(CompilerParameters)
self.load_data(exp_path)
def run(self):
return {
method: getattr(self, "compile_" + method)()
for method in (
"slice",
"slices",
"delta_traps",
"pertrap_metric",
"ncells",
"last_valid_tp",
"stages_dmetric",
"fluorescence",
)
}
def load_data(self, path: Path):
self.grouper = NameGrouper(path)
self.meta = Meta(self.grouper.files[0])
@property
def ntraps(self) -> dict:
"""Get the number of traps in each position.

        Returns
        -------
        dict
            str -> int

        Examples
        --------
        FIXME: Add docs.
"""
return {
pos: coords.shape[0]
for pos, coords in self.grouper.traplocs().items()
}
def concat_signal(self, sigloc=None, mode=None, **kwargs) -> pd.DataFrame:
if sigloc is None:
sigloc = "extraction/general/None/volume"
self.sigloc = sigloc
if mode is None:
mode = "retained"
if not hasattr(self, "_concat") or self.sigloc != sigloc:
self._concat = self.grouper.concat_signal(
self.sigloc, mode=mode, **kwargs
)
return self._concat
def get_tp(self, sigloc=None, tp=None, mode=None, **kwargs) -> pd.Series:
if tp is None:
tp = 0
if mode is None:
mode = True
return self.concat_signal(sigloc=sigloc, mode=mode, **kwargs).iloc[
:, tp
]
def count_cells(
self,
signal="extraction/general/None/volume",
mode="raw",
**kwargs,
):
df = self.grouper.concat_signal(signal, mode=mode, **kwargs)
df = df.groupby(["group", "position", "trap"]).count()
df[df == 0] = np.nan
return df
def compile_dmetrics(self, stages=None):
"""Generate dataframe with dVol metrics without major cell picking."""
names_signals = {
"dvol": "postprocessing/dsignal/postprocessing_savgol_extraction_general_None_volume",
"bud_dvol": "postprocessing/bud_metric/postprocessing_dsignal_postprocessing_savgol_extraction_general_None_volume",
}
names_signals = {
"dvol": "postprocessing/dsignal/postprocessing_savgol_extraction_general_None_volume",
"bud_dvol": "postprocessing/bud_metric/postprocessing_dsignal_postprocessing_savgol_extraction_general_None_volume",
"buddings": "postprocessing/buddings/extraction_general_None_volume",
}
operations = {
"dvol": ("dvol", "max"),
"bud_dvol": ("bud_dvol", "max"),
"buddings": ("buddings", "sum"),
"buddings_mean": ("buddings", "mean"),
}
input_signals = {
k: self.grouper.concat_signal(v) for k, v in names_signals.items()
}
ids = input_signals["buddings"].index
for v in input_signals.values():
ids = ids.intersection(v.index)
if stages:
def process_dfs(dfs, rng):
return pd.DataFrame(
{
k: getattr(dfs[sig].loc(axis=1)[rng].loc[ids], op)(
axis=1
)
if isinstance(op, str)
else dfs[sig].loc[ids].apply(op, axis=1)
for k, (sig, op) in operations.items()
}
)
# Note that all input_signals columns must be the same
col_vals = list(input_signals.values())[0].columns
stages_dfs = {"Full": process_dfs(input_signals, col_vals)}
for k, rng in stages:
stage_df = process_dfs(input_signals, col_vals[rng])
stages_dfs[k] = stage_df
concat = pd.concat([x.reset_index() for x in stages_dfs.values()])
concat["stage"] = np.array(
[
np.repeat(x, len(concat) // len(stages_dfs))
for x in stages_dfs.keys()
]
).flatten()
return (
concat.set_index(["group", "position", "trap", "cell_label"])
.melt("stage", ignore_index=False, var_name="growth_metric")
.reset_index()
)
def compile_stages_dmetric(self):
stages = self.get_stages()
return self.compile_dmetrics(stages=stages)
def get_stages(self):
"""Use the metadata to give a prediction of the media being pumped at
each time point. Works for traditional metadata (pre-fluigent).

        Returns
        -------
        A list of tuples, where the first value in each tuple is the active
        pump's contents and the second is its associated range of time
        points.
"""
fpath = list(self.grouper.signals.values())[0].filename
with h5py.File(fpath, "r") as f:
tinterval = f.attrs.get("time_settings/timeinterval", None)[0]
tnorm = tinterval / 60
switch_times = f.attrs.get("switchtimes", None) / tnorm
last_tp = (
f.attrs.get("time_settings/totaltime", None)[0] / tinterval
)
pump_contents = f.attrs.get("pumpinit/contents", None)
init_frate = f.attrs.get("pumpinit/flowrate", None)
prate = f.attrs.get("pumprate", None)
main_pump = np.array((init_frate.argmax(), *prate.argmax(axis=0)))
intervals = np.array((0, *switch_times, last_tp), dtype=int)
extracted_tps = self.grouper.ntimepoints
stages = [ # Only add intervals with length larger than zero
(
": ".join((str(i + 1), pump_contents[p_id])),
range(intervals[i], min(intervals[i + 1], extracted_tps)),
)
for i, p_id in enumerate(main_pump)
if (intervals[i + 1] > intervals[i])
]
return stages
def compile_growth_metrics(
self,
min_nbuddings: int = 2,
):
"""Filter mothers with n number of buddings and get their metrics.
Select cells with at least two recorded buddings
"""
names_signals = {
"dvol": "postprocessing/dsignal/postprocessing_savgol_extraction_general_None_volume",
"bud_dvol": "postprocessing/bud_metric/postprocessing_dsignal_postprocessing_savgol_extraction_general_None_volume",
"buddings": "postprocessing/buddings/extraction_general_None_volume",
}
operations = {
"dvol": ("dvol", "max"),
"bud_dvol": ("bud_dvol", "max"),
"buddings": ("buddings", "sum"),
"cycle_length_mean": (
"buddings",
lambda x: bn.nanmean(np.diff(np.where(x)[0])),
),
"cycle_length_min": (
"buddings",
lambda x: bn.nanmin(np.diff(np.where(x)[0])),
),
"cycle_length_median": (
"buddings",
lambda x: np.nanmedian(np.diff(np.where(x)[0])),
),
}
input_signals = {
k: self.grouper.concat_signal(v) for k, v in names_signals.items()
}
ids = self.get_shared_ids(input_signals, min_nbuddings=min_nbuddings)
compiled_df = pd.DataFrame(
{
k: getattr(input_signals[sig].loc[ids], op)(axis=1)
if isinstance(op, str)
else input_signals[sig].loc[ids].apply(op, axis=1)
for k, (sig, op) in operations.items()
}
)
return compiled_df
def get_shared_ids(
self, input_signals: Dict[str, pd.DataFrame], min_nbuddings: int = None
):
"""Get the intersection id of multiple signals.
"buddings" must be one the keys in input_signals to use the
argument min_nbuddings.
"""
ids = list(input_signals.values())[0].index
if min_nbuddings is not None:
ids = (
input_signals["buddings"]
.loc[input_signals["buddings"].sum(axis=1) >= min_nbuddings]
.index
)
for v in input_signals.values():
ids = ids.intersection(v.index)
return ids
def compile_ncells(self):
df = self.count_cells()
df = df.melt(ignore_index=False)
df.columns = ["timepoint", "ncells_pertrap"]
return df
def compile_last_valid_tp(self) -> pd.Series:
"""Last valid timepoint per position."""
df = self.count_cells()
df = df.apply(lambda x: x.last_valid_index(), axis=1)
df = df.groupby(["group", "position"]).max()
return df
def compile_slices(self, nslices=2, **kwargs):
tps = [
min(
i * (self.grouper.ntimepoints // nslices),
self.grouper.ntimepoints - 1,
)
for i in range(nslices + 1)
]
slices = [self.compile_slice(tp=tp, **kwargs) for tp in tps]
slices_df = pd.concat(slices)
slices_df["timepoint"] = np.concatenate(
[np.repeat(tp, len(slice_df)) for tp, slice_df in zip(tps, slices)]
)
return slices_df
def compile_slice_end(self, **kwargs):
return self.compile_slice(tp=-1, **kwargs)
def guess_metrics(self, metrics: Dict[str, Tuple[str]] = None):
"""First approach at autoselecting certain signals for automated
analysis."""
if metrics is None:
metrics = {
"GFP": ("median", "max5"),
"mCherry": ("median", "max5"),
# "general": ("eccentricity",),
"Flavin": ("median",),
"postprocessing/savgol": ("volume",),
"dsignal/postprocessing_savgol": ("volume",),
"bud_metric.*dsignal.*savgol": ("volume",),
"ph_ratio": ("median",),
}
sigs = self.grouper.siglist
selection = {
".".join((ch, metric)): sig
for sig in sigs
for ch, metric_set in metrics.items()
for metric in metric_set
if re.search("(?!.*bgsub).*".join((ch, metric)) + "$", sig)
}
return selection
def compile_fluorescence(
self,
metrics: Dict[str, Tuple[str]] = None,
norm: tuple = None,
**kwargs,
):
"""Get a single signal per."""
if norm is None:
norm = (
"GFP",
"GFPFast",
"ph_ratio",
"Flavin",
"Citrine",
"mCherry",
)
selection = self.guess_metrics(metrics)
input_signals = {
k: self.grouper.concat_signal(v, **kwargs)
for k, v in selection.items()
}
# ids = self.get_shared_ids(input_signals)
to_concat = []
def format_df(df):
return df.melt(
ignore_index=False, var_name="timepoint"
).reset_index()
for k, v in input_signals.items():
tmp_formatted = format_df(v)
tmp_formatted["signal"] = k
to_concat.append(tmp_formatted)
if norm and k.split(".")[0] in norm:
norm_v = v.subtract(v.min(axis=1), axis=0).div(
v.max(axis=1) - v.min(axis=1), axis=0
)
formatted = format_df(norm_v)
formatted["signal"] = "norm_" + k
to_concat.append(formatted)
concated = pd.concat(to_concat, axis=0)
return concated
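    # The per-cell min-max normalisation used above, spelt out on a toy frame
    # (rows are cells, columns are timepoints):
    #
    #   v = pd.DataFrame([[1.0, 2.0, 3.0], [10.0, 10.0, 20.0]])
    #   norm_v = v.subtract(v.min(axis=1), axis=0).div(
    #       v.max(axis=1) - v.min(axis=1), axis=0
    #   )
    #   # norm_v.loc[0] -> 0.0, 0.5, 1.0; every row now spans [0, 1].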
def compile_slice(
self, sigloc=None, tp=None, metrics=None, mode=None, **kwargs
) -> pd.DataFrame:
        if sigloc is None:
            # Default to the general volume signal; record it and use it locally.
            self.sigloc = sigloc = "extraction/general/None/volume"
if tp is None:
tp = 0
if metrics is None:
metrics = ("max", "mean", "median", "count", "std", "sem")
if mode is None:
mode = True
df = pd.concat(
[
getattr(
self.get_tp(sigloc=sigloc, tp=tp, mode=mode, **kwargs)
.groupby(["group", "position", "trap"])
.max()
.groupby(["group", "position"]),
met,
)()
for met in metrics
],
axis=1,
)
df.columns = metrics
merged = self.add_column(df, self.ntraps, name="ntraps")
return merged
@staticmethod
def add_column(df: pd.DataFrame, new_values_d: dict, name="new_col"):
if name in df.columns:
warnings.warn(
"ExpCompiler: Replacing existing column in compilation"
)
df[name] = [
new_values_d[pos] for pos in df.index.get_level_values("position")
]
return df
@staticmethod
def traploc_diffs(traplocs: ndarray) -> list:
"""Obtain metrics for trap localisation.
Parameters ---------- traplocs : ndarray (x,2) 2-dimensional
array with the x,y coordinates of traps in each column
Examples -------- FIXME: Add docs.
"""
signal = np.zeros((traplocs.max(), 2))
for i in range(2):
counts = Counter(traplocs[:, i])
for j, v in counts.items():
signal[j - 1, i] = v
diffs = [
np.diff(x)
for x in np.apply_along_axis(find_peaks, 0, signal, distance=10)[0]
]
return diffs
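    # Sketch of the idea behind traploc_diffs, on a single axis with made-up
    # coordinates: counts of trap x-positions form a spiky 1D signal whose
    # peak-to-peak distances approximate the trap spacing.
    #
    #   xs = np.array([20, 60, 100, 20, 60])
    #   counts_signal = np.zeros(xs.max() + 2)
    #   for pos, n in Counter(xs).items():
    #       counts_signal[pos] = n
    #   peaks, _ = find_peaks(counts_signal, distance=10)
    #   # np.diff(peaks) -> array([40, 40]), i.e. traps are ~40 px apart in x.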
def compile_delta_traps(self):
group_names = self.grouper.group_names
tups = [
(group_names[pos], pos, axis, val)
for pos, coords in self.grouper.traplocs().items()
for axis, vals in zip(("x", "y"), self.traploc_diffs(coords))
for val in vals
]
return pd.DataFrame(
tups, columns=["group", "position", "axis", "value"]
)
    def compile_pertrap_metric(
        self,
        ranges: Iterable[Iterable[int]] = None,
        metric: str = "count",
    ):
        """Get the number of cells per trap present during the given ranges."""
        if ranges is None:
            # Avoid a mutable default argument; the ranges are edited in place.
            ranges = [[0, -1]]
        sig = self.concat_signal()
        for i, rngs in enumerate(ranges):
            for j, edge in enumerate(rngs):
                if edge < 0:
                    # Convert negative edges to absolute timepoint labels.
                    ranges[i][j] = sig.shape[1] + edge
df = pd.concat(
[
self.get_filled_trapcounts(
sig.loc(axis=1)[slice(*rng)], metric=metric
)
for rng in ranges
],
axis=1,
)
return df.astype(str)
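    # Worked example of the edge conversion above: with 120 timepoints,
    #
    #   ntps, edge = 120, -1
    #   edge = ntps + edge  # -> 119, the label of the last timepoint
    #
    # so the default range [0, -1] covers the whole experiment.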
def get_filled_trapcounts(
self, signal: pd.DataFrame, metric: str
) -> pd.Series:
        # Keep only cells detected from the first to the last timepoint of the slice.
        present = signal.apply(
            lambda x: (x.first_valid_index() == x.index[0])
            and (x.last_valid_index() == x.index[-1]),
            axis=1,
        )
results = getattr(
signal.loc[present]
.iloc[:, 0]
.groupby(["group", "position", "trap"]),
metric,
)()
filled = self.fill_trapcount(results)
return filled
def fill_trapcount(
self, srs: pd.Series, fill_value: Union[int, float] = 0
) -> pd.Series:
"""Fill the last level of a MultiIndex in a pd.Series.
Use self to get the max number of traps per position and use
this information to add rows with empty values (with plottings
of distributions in mind) Parameters ---------- srs : pd.Series
Series with a pd.MultiIndex index self : ExperimentSelf
class with 'ntraps' information that returns a dictionary with
position -> ntraps. fill_value : Union[int, float] Value
used to fill new rows. Returns ------- pd.Series Series
with no numbers skipped on the last level. Examples --------
FIXME: Add docs.
"""
all_sets = set(
[
(pos, i)
for pos, ntraps in self.ntraps.items()
for i in range(ntraps)
]
)
        dif = all_sets.difference(
            set(
                zip(
                    *[
                        srs.index.get_level_values(i)
                        for i in ("position", "trap")
                    ]
                )
            )
        )
new_indices = pd.MultiIndex.from_tuples(
[
(self.grouper.group_names[idx[0]], idx[0], np.uint(idx[1]))
for idx in dif
]
)
new_indices = new_indices.set_levels(
new_indices.levels[-1].astype(np.uint), level=-1
)
empty = pd.Series(fill_value, index=new_indices, name="ncells")
return pd.concat((srs, empty))
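    # Minimal pandas sketch of the gap-filling performed above, with invented
    # positions and traps:
    #
    #   srs = pd.Series(
    #       [3, 5],
    #       index=pd.MultiIndex.from_tuples(
    #           [("g1", "pos1", 0), ("g1", "pos1", 2)],
    #           names=["group", "position", "trap"],
    #       ),
    #       name="ncells",
    #   )
    #   missing = pd.MultiIndex.from_tuples(
    #       [("g1", "pos1", 1)], names=srs.index.names
    #   )
    #   pd.concat((srs, pd.Series(0, index=missing, name="ncells")))
    #   # -> one row per trap 0..2, with trap 1 filled with 0.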
class Reporter(object):
"""Manages Multiple pages to generate a report."""
def __init__(
self,
data: Dict[str, pd.DataFrame],
pages: dict = None,
path: str = None,
):
self.data = data
if pages is None:
pages = {
"qa": self.gen_page_qa(),
"growth": self.gen_page_growth(),
"fluorescence": self.gen_page_fluorescence(),
}
self.pages = pages
if path is not None:
self.path = path
self.porgs = {k: PageOrganiser(data, v) for k, v in pages.items()}
@property
def pdf(self):
return self._pdf
@pdf.setter
def pdf(self, path: str):
self._pdf = PdfPages(path)
def plot_report(self, path: str = None):
if path is None:
path = self.path
with PdfPages(path) as pdf:
for page_org in list(self.porgs.values())[::-1]:
page_org.plot_page()
pdf.savefig(page_org.fig)
plt.close()
@staticmethod
def gen_page_qa():
page_qc = (
{
"data": "slice",
"func": "barplot",
"args": ("ntraps", "position"),
"kwargs": {"hue": "group", "palette": "muted"},
"loc": (0, 0),
},
{
"data": "delta_traps",
"func": "barplot",
"args": ("axis", "value"),
"kwargs": {
"hue": "group",
},
"loc": (0, 1),
},
{
"data": "slices",
"func": "violinplot",
"args": ("group", "median"),
"kwargs": {
"hue": "timepoint",
},
"loc": (2, 1),
},
{
"data": "pertrap_metric",
"func": "histplot",
"args": (0, None),
"kwargs": {
"hue": "group",
"multiple": "dodge",
"discrete": True,
},
"loc": (2, 0),
},
{
"data": "ncells",
"func": "lineplot",
"args": ("timepoint", "ncells_pertrap"),
"kwargs": {
"hue": "group",
},
"loc": (1, 1),
},
{
"data": "last_valid_tp",
"func": "stripplot",
"args": (0, "position"),
"kwargs": {
"hue": "group",
},
"loc": (1, 0),
},
)
return page_qc
@staticmethod
def gen_page_fluorescence():
return (
{
"data": "fluorescence",
"func": "relplot",
"args": ("timepoint", "value"),
"kwargs": {
"col": "signal",
"col_wrap": 2,
"hue": "group",
"facet_kws": {"sharey": False, "sharex": True},
"kind": "line",
},
},
)
    @staticmethod
    def gen_page_cell_cell_corr():
        # Placeholder: cell-to-cell correlation page not implemented yet.
        pass
@staticmethod
def gen_page_growth():
return (
{
"data": "stages_dmetric",
"func": "catplot",
"args": ("stage", "value"),
"kwargs": {
"hue": "group",
"col": "growth_metric",
"col_wrap": 2,
"kind": "box",
"sharey": False,
},
},
)
def gen_all_instructions(self):
qa = self.gen_page_qa()
growth = self.gen_page_growth()
return (qa, growth)
class PageOrganiser(object):
"""Add multiple plots to a single page, wither using seaborn multiplots or
manual GridSpec."""
def __init__(
self,
data: Dict[str, pd.DataFrame],
instruction_set: Iterable = None,
grid_spec: tuple = None,
fig_kws: dict = None,
):
self.instruction_set = instruction_set
self.data = {k: df for k, df in data.items()}
self.single_fig = True
if len(instruction_set) > 1:
self.single_fig = False
if not self.single_fig: # Select grid_spec with location info
if grid_spec is None:
locs = np.array(
[x.get("loc", (0, 0)) for x in instruction_set]
)
grid_spec = locs.max(axis=0) + 1
if fig_kws is None:
self.fig = plt.figure(dpi=300)
self.fig.set_size_inches(8.27, 11.69, forward=True)
plt.figtext(0.02, 0.99, "", fontsize="small")
self.gs = plt.GridSpec(*grid_spec, wspace=0.3, hspace=0.3)
self.axes = {}
reset_index = (
lambda df: df.reset_index().sort_values("position")
if isinstance(df.index, pd.core.indexes.multi.MultiIndex)
else df.sort_values("position")
)
self.data = {k: reset_index(df) for k, df in self.data.items()}
def place_plot(self, func, xloc=None, yloc=None, **kwargs):
if xloc is None:
xloc = 0
if yloc is None:
yloc = 0
if (
self.single_fig
): # If plotting using a figure method using seaborn cols/rows
self.g = func(**kwargs)
self.axes = {
ax.title.get_text().split("=")[-1][1:]: ax
for ax in self.g.axes.flat
}
self.fig = self.g.fig
else:
self.axes[(xloc, yloc)] = self.fig.add_subplot(self.gs[xloc, yloc])
func(
ax=self.axes[(xloc, yloc)],
**kwargs,
)
# Eye candy
if np.any( # If there is a long label, rotate them all
[
len(lbl.get_text()) > 8
for ax in self.axes.values()
for lbl in ax.get_xticklabels()
]
) and hasattr(self, "g"):
for axes in self.g.axes.flat:
_ = axes.set_xticklabels(
axes.get_xticklabels(),
rotation=15,
horizontalalignment="right",
)
def plot_page(
self, instructions: Iterable[Dict[str, Union[str, Iterable]]] = None
):
if instructions is None:
instructions = self.instruction_set
        if isinstance(instructions, dict):
            instructions = (instructions,)
for how in instructions:
self.place_plot(
self.gen_sns_wrapper(how),
*how.get("loc", (None, None)),
)
def gen_sns_wrapper(self, how):
def sns_wrapper(ax=None):
kwargs = how.get("kwargs", {})
if ax:
kwargs["ax"] = ax
elif "height" not in kwargs:
ncols = kwargs.get("col_wrap", 1)
if "col" in kwargs:
nrows = np.ceil(
len(np.unique(self.data[how["data"]][kwargs["col"]]))
/ ncols
)
else:
nrows = len(
np.unique(self.data[how["data"]][kwargs["row"]])
)
kwargs["height"] = 11.7
# kwargs["aspect"] = 8.27 / (11.7 / kwargs["col_wrap"])
kwargs["aspect"] = (8.27 / ncols) / (kwargs["height"] / nrows)
return getattr(sns, how["func"])(
data=self.data[how["data"]],
x=how["args"][0],
y=how["args"][1],
**kwargs,
)
return sns_wrapper
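    # Rough geometry behind the height/aspect defaults above: the page is
    # about 8.27 x 11.7 inches (A4), so each facet gets width 8.27 / ncols and
    # height `height` / nrows, e.g.
    #
    #   ncols, nrows, height = 2, 3, 11.7
    #   aspect = (8.27 / ncols) / (height / nrows)  # ~1.06 (facet width / height)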
# fpath = "/home/alan/Documents/dev/skeletons/scripts/aggregates_exploration/18616_2020_02_20_protAgg_downUpShift_2_0_2_Ura8_Ura8HA_Ura8HR_01"
# # compiler = ExperimentCompiler(None, base_dir / dir)
# compiler = ExperimentCompiler(None, fpath)
# dfs = compiler.run()
# rep = Reporter(data=dfs, path=Path(fpath) / "report.pdf")
# rep.plot_report("./report.pdf")
# base_dir = Path("/home/alan/Documents/dev/skeletons/scripts/data/")
# for dir in dirs:
# try:
# compiler = ExperimentCompiler(None, base_dir / dir)
# dfs = compiler.run()
# rep = Reporter(data=dfs, path=base_dir / (dir + "/report.pdf"))
# from time import time
# rep.plot_report(base_dir / (dir + "/report.pdf"))
# except Exception as e:
# print("LOG:ERROR:", e)
# with open("errors.log", "a") as f:
# f.write(e)