Commit c92f6cd9 authored by Alán Muñoz's avatar Alán Muñoz
first commit post-restructure

abc.py
#!/usr/bin/env python3
from abc import ABC, abstractmethod
from collections.abc import Iterable
from pathlib import Path, PosixPath
from typing import Union
from yaml import dump, safe_load
class ParametersABC(ABC):
"""
Base class to add yaml functionality to parameters
"""
def to_dict(self, iterable="null"):
"""
Recursive function that converts class to nested dictionary.
"""
if isinstance(iterable, dict):
            if any(
                isinstance(x, Iterable) or hasattr(x, "to_dict")
                for x in iterable.values()
            ):
return {
k: v.to_dict() if hasattr(v, "to_dict") else self.to_dict(v)
for k, v in iterable.items()
}
return iterable
elif iterable == "null":
return self.to_dict(self.__dict__)
else:
return iterable
@classmethod
def from_dict(cls, d: dict):
return cls(**d)
def to_yaml(self, path: Union[PosixPath, str] = None):
"""Return instance as yaml stream and optionally export to file.
Returns the yaml version of the class instance. If path is provided, it
is saved there as well.
Parameters
----------
path : Union[PosixPath, str]
Output path.
"""
if path:
with open(Path(path), "w") as f:
dump(self.to_dict(), f)
return dump(self.to_dict())
@classmethod
def from_yaml(cls, path: Union[PosixPath, str]):
with open(Path(path)) as f:
params = safe_load(f)
return cls(**params)
@classmethod
@abstractmethod
def default(cls):
pass
class ProcessABC(ABC):
"""Base class for processes"""
def __init__(self, parameters):
self._parameters = parameters
for k, v in parameters.to_dict().items(): # access parameters directly
setattr(self, k, v)
@property
def parameters(self):
return self._parameters
@abstractmethod
def run(self):
pass
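The two base classes above are meant to be used together: a parameters subclass implements `default` and inherits dict/yaml round-tripping, while a process subclass receives those parameters as attributes. A minimal sketch (the names below are hypothetical, not part of the codebase):

```python
# hypothetical example subclasses, for illustration only
class ThresholdParameters(ParametersABC):
    def __init__(self, threshold: float = 0.5):
        self.threshold = threshold

    @classmethod
    def default(cls):
        return cls()


class ThresholdProcess(ProcessABC):
    def run(self, values):
        # self.threshold was set from the parameters by ProcessABC.__init__
        return [v for v in values if v > self.threshold]


params = ThresholdParameters.default()
print(params.to_yaml())  # "threshold: 0.5\n"; pass a path to also save it
print(ThresholdProcess(params).run([0.2, 0.7, 0.9]))  # [0.7, 0.9]
```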
from typing import Union
import collections.abc
from itertools import groupby, chain, product
import numpy as np
import h5py
class BridgeH5:
"""
Base class to interact with h5 data stores.
It also contains functions useful to predict how long should segmentation take.
"""
def __init__(self, filename, flag="r"):
self.filename = filename
if flag is not None:
self._hdf = h5py.File(filename, flag)
            self._filecheck()
def _filecheck(self):
assert "cell_info" in self._hdf, "Invalid file. No 'cell_info' found."
def close(self):
self._hdf.close()
def max_ncellpairs(self, nstepsback):
"""
Get maximum number of cell pairs to be calculated
"""
dset = self._hdf["cell_info"][()]
# attrs = self._hdf[dataset].attrs
pass
@property
def cell_tree(self):
return self.get_info_tree()
def get_n_cellpairs(self, nstepsback=2):
cell_tree = self.cell_tree
# get pair of consecutive trap-time points
pass
@staticmethod
def get_consecutives(tree, nstepsback):
# Receives a sorted tree and returns the keys of consecutive elements
vals = {k: np.array(list(v)) for k, v in tree.items()} # get tp level
where_consec = [
{
k: np.where(np.subtract(v[n + 1 :], v[: -n - 1]) == n + 1)[0]
for k, v in vals.items()
}
for n in range(nstepsback)
] # get indices of consecutive elements
return where_consec
def get_npairs(self, nstepsback=2, tree=None):
if tree is None:
tree = self.cell_tree
consecutive = self.get_consecutives(tree, nstepsback=nstepsback)
flat_tree = flatten(tree)
n_predictions = 0
for i, d in enumerate(consecutive, 1):
flat = list(chain(*[product([k], list(v)) for k, v in d.items()]))
pairs = [(f, (f[0], f[1] + i)) for f in flat]
for p in pairs:
n_predictions += len(flat_tree.get(p[0], [])) * len(
flat_tree.get(p[1], [])
)
return n_predictions
def get_npairs_over_time(self, nstepsback=2):
tree = self.cell_tree
npairs = []
for t in self._hdf["cell_info"]["processed_timepoints"][()]:
tmp_tree = {
k: {k2: v2 for k2, v2 in v.items() if k2 <= t} for k, v in tree.items()
}
npairs.append(self.get_npairs(tree=tmp_tree))
return np.diff(npairs)
def get_info_tree(
self, fields: Union[tuple, list] = ("trap", "timepoint", "cell_label")
):
"""
        Return traps, time points and labels for this position in the form of a tree,
        with the hierarchy determined by the argument fields. Note that the tree is
        compressed to non-empty elements and time points.
Default hierarchy is:
- trap
- time point
- cell label
This function currently produces trees of depth 3, but it can easily be
extended for deeper trees if needed (e.g. considering groups,
chambers and/or positions).
input
:fields: Fields to fetch from 'cell_info' inside the hdf5 storage
returns
:tree: Nested dictionary where keys (or branches) are the upper levels
and the leaves are the last element of :fields:.
"""
zipped_info = (*zip(*[self._hdf["cell_info"][f][()] for f in fields]),)
return recursive_groupsort(zipped_info)
def groupsort(iterable: Union[tuple, list]):
# Sorts iterable and returns a dictionary where the values are grouped by the first element.
iterable = sorted(iterable, key=lambda x: x[0])
grouped = {k: [x[1:] for x in v] for k, v in groupby(iterable, lambda x: x[0])}
return grouped
def recursive_groupsort(iterable):
# Recursive extension of groupsort
if len(iterable[0]) > 1:
return {k: recursive_groupsort(v) for k, v in groupsort(iterable).items()}
    else:  # leaf level: unwrap the remaining single-element tuples
return [x[0] for x in iterable]
def flatten(d, parent_key="", sep="_"):
"""Flatten nested dict. Adapted from https://stackoverflow.com/a/6027615"""
items = []
for k, v in d.items():
new_key = parent_key + (k,) if parent_key else (k,)
        if isinstance(v, collections.abc.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
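As a quick illustration of the helpers above (a sketch with made-up values): given (trap, timepoint, cell_label) tuples such as those zipped together by `get_info_tree`, `recursive_groupsort` builds the nested tree and `flatten` turns it back into a flat dict keyed by tuples:

```python
# made-up (trap, timepoint, cell_label) rows
rows = [(0, 1, 1), (0, 1, 2), (0, 2, 1), (1, 1, 3)]
tree = recursive_groupsort(rows)
# {0: {1: [1, 2], 2: [1]}, 1: {1: [3]}}
flat = flatten(tree)
# {(0, 1): [1, 2], (0, 2): [1], (1, 1): [3]}
```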
# logfile_parser
Simple log file parsing according to grammars specified in JSON
## Basic usage
This package comes with three built-in grammars: 'multiDGUI_acq_format',
'multiDGUI_log_format' and 'cExperiment_log_format'. As an example, the
'multiDGUI_acq_format' grammar can be used to parse the included example
using:
```python
>>> from logfile_parser import Parser
>>> acq_parser = Parser('multiDGUI_acq_format')
>>> with open('examples/example_multiDGUI_acq.txt', 'r') as f:
... parsed = acq_parser.parse(f)
>>> print(parsed)
```
The parsed output is a `dict` containing any fields satisfying the grammar.
## Defining new grammars
Custom grammars are written in JSON as a dictionary whose keys specify
the pieces of information to extract from the log file.
The built-in grammars are useful examples or starting points for defining custom
grammars. They can be found in the `logfile_parser/grammars` directory.
Let's start with a basic example of a log file that we might want to parse:
```text
Date: 16 Apr 2020
Microscope: Batgirl
Experiment details:
My lengthy description of what will certainly be a great experiment.
This description takes multiple lines.
Tags:
User name, Project name, Experiment name
```
A basic grammar that just extracts the description of the experiment could be
defined using:
```json
{
"description": {
"trigger_startswith": "Experiment details:"
},
"stop": {
"trigger_startswith": "Tags:",
"type": "stop"
}
}
```
This tells the parser to fill the "description" field of the parsed result with
the text on the lines *after* the one starting with "Experiment details:", and
then to terminate parsing whenever it encounters a line that starts with
"Tags:". If you wanted the trigger line itself to be included, you would add
`"skip": false` as an additional property of `"description"`.
If we also wanted to fill a "tags" field with the comma-separated tags, we would
replace the "stop" section with a "tags" section of type "list":
```json
{
"description": {
"trigger_startswith": "Experiment details:"
},
"tags": {
"trigger_startswith": "Tags:",
"type": "list"
}
}
```
To extract the microscope name, we can make use of the "regex" type:
```json
{
"microscope": {
"trigger_startswith": "Microscope:",
"type": "regex",
"regex": "^Microscope:\\s*(.*)$"
}
}
```
The expression found in the bracketed group will be stored in the "microscope"
field of the parsed result.
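For instance, against the example log file above, the bracketed group is exactly what the regular expression captures (a quick check using the standard library directly):

```python
>>> import re
>>> re.findall(r"^Microscope:\s*(.*)$", "Microscope: Batgirl")
['Batgirl']
```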
Finally, to extract a date, we combine a "regex" with a "map" to map the text
to a Python `datetime` object:
```json
{
"date": {
"trigger_startswith": "Date:",
"type": "regex",
"regex": "^.*(\\d{2} [A-Z][a-z]{2} \\d{4})$",
"map": "datetime:%d %b %Y"
}
}
```
Putting this all together gives us the following grammar:
```json
{
"date": {
"trigger_startswith": "Date:",
"type": "regex",
"regex": "^.*(\\d{2} [A-Z][a-z]{2} \\d{4})$",
"map": "datetime:%d %b %Y"
},
"microscope": {
"trigger_startswith": "Microscope:",
"type": "regex",
"regex": "^Microscope:\\s*(.*)$"
},
"description": {
"trigger_startswith": "Experiment details:"
},
"tags": {
"trigger_startswith": "Tags:",
"type": "list"
}
}
```
If this is saved to a file `newgrammar.json` we could parse the log file as
listed above (say it is in `logfile.txt`) using the following:
```python
>>> from logfile_parser import Parser
>>> parser = Parser('newgrammar.json')
>>> with open('logfile.txt', 'r') as f:
... parsed = parser.parse(f)
>>> print(parsed)
{'date': datetime.datetime(2020, 4, 16, 0, 0), 'microscope': 'Batgirl',
'description': 'My lengthy description of what will certainly be a great
experiment.\nThis description takes multiple lines.', 'tags': ['User name',
'Project name', 'Experiment name']}
```
# -*- coding: utf-8 -*-
"""
logfile_parser
~~~~~~~~~~~~
Simple log file parsing according to grammars specified in JSON
:copyright: (c) 2020 by Julian Pietsch.
:license: LGPL
"""
from .logfile_parser import Parser
{
"@@CONFIG@@": {
"regex_preprocessing": ["^\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}\\s*(.*)$"]
},
"extractmethod": {
"trigger_startswith": "extractionParameters:",
"type": "regex",
"regex": "^extractFunction:\\s*(.*)$",
"use_unmatched": true
},
"segmethod": {
"trigger_re": "Start .* segmentation",
"type": "regex",
"regex": "^.*Start (.*) segmentation.*$"
},
"segcomplete": {
"trigger_re": "Successfully completed .* segmentation",
"type": "regex",
"regex": "^(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}).*$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"compiled": {
"trigger_startswith": "Successfully completed compiling cell information",
"type": "regex",
"regex": "^(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}).*$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
}
}
{
"channels": {
"trigger_startswith": "Channels:",
"type": "table",
"column_map": {
"Channel name": ["channel","str"],
"Exposure time": ["exposure","int"],
"Skip": ["skip","int"],
"Z sect.": ["zsect","int"],
"Start time": ["start_time","int"],
"Camera mode": ["camera_mode","int"],
"EM gain": ["em_gain","int"],
"Voltage": ["voltage","float"]
}
},
"zsectioning": {
"trigger_startswith": "Z_sectioning:",
"type": "table",
"column_map": {
"Sections": ["nsections","int"],
"Spacing": ["spacing","float"],
"PFSon?": ["pfson","bool"],
"AnyZ?": ["anyz","bool"],
"Drift": ["drift","int"],
"Method": ["zmethod","int"]
}
},
"time_settings": {
"trigger_startswith": "Time_settings",
"type": "table",
"has_header": false,
"column_map": [
["istimelapse","bool"],
["timeinterval","int"],
["ntimepoints","int"],
["totaltime","int"]
]
},
"positions": {
"trigger_startswith": "Points:",
"type": "table",
"column_map": {
"Position name": ["posname","str"],
"X position": ["xpos","float"],
"Y position": ["ypos","float"],
"Z position": ["zpos","float"],
"PFS offset": ["pfsoffset","float"],
"Group": ["group","int"]
},
"default_map": "int"
},
"npumps": {
"trigger_startswith": "Syringe pump details:",
"type": "regex",
"regex": "^.*:\\s*(\\d+)\\s*pumps\\.*$",
"map": "int"
},
"pumpinit": {
"trigger_startswith": "Pump states at beginning of experiment:",
"type": "table",
"column_map": {
"Pump port": ["pump_port","str"],
"Diameter": ["syringe_diameter","float"],
"Current rate": ["flowrate","float"],
"Direction": ["flowdirection","str"],
"Running": ["isrunning", "bool"],
"Contents": ["contents", "str"]
}
},
"nswitches": {
"trigger_startswith": "Number of pump changes:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchvol": {
"trigger_startswith": "Infuse/withdraw volumes:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchrate": {
"trigger_startswith": "Infuse/withdraw rates:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchtimes": {
"trigger_startswith": "Times:",
"type": "list",
"map": "int"
},
"switchtopump": {
"trigger_startswith": "Switched to:",
"type": "list",
"map": "int"
},
"switchfrompump": {
"trigger_startswith": "Switched from:",
"type": "list",
"map": "int"
},
"pumprate": {
"trigger_startswith": "Flow post switch:",
"type": "lists",
"map": "float"
}
}
{
"date": {
"trigger_re": "^\\d{2}-[A-Z][a-z]{2}-\\d{4}$",
"type": "regex",
"regex": "^\\d{2}-[A-Z][a-z]{2}-\\d{4}$",
"map": "datetime:%d-%b-%Y"
},
"multiDGUI_commit": {
"trigger_startswith": "Swain lab microscope control software",
"type": "regex",
"regex": "^.*commit number:([0-9a-z]+)$",
"next_section": "date"
},
"microscope": {
"trigger_startswith": "Microscope name is:",
"type": "regex",
"regex": "^Microscope name is:\\s+(.*)$"
},
"acqfile": {
"trigger_startswith": "Acquisition settings are saved in:",
"type": "regex",
"regex": "^(.*)$",
"skip": true
},
"details": {
"trigger_startswith": "Experiment details:"
},
"setup": {
"trigger_startswith": "Microscope setup for used channels:"
},
"omero_project": {
"trigger_startswith": "Omero project:",
"type": "regex",
"regex": "^(.*)$",
"skip": true
},
"omero_tags": {
"trigger_startswith": "Omero tags:",
"type": "list"
},
"omero_tags_stop": {
"trigger_startswith": "PFS is locked"
},
"omero_tag_descriptions": {
"trigger_startswith": "Omero tag descriptions:",
"type": "list"
},
"expt_start": {
"trigger_startswith": "Experiment started at:",
"type": "regex",
"regex": "^.*at:(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"first_capture": {
"trigger_startswith": "------Time point_1",
"type": "regex",
"regex": "^Channel:.*set at:(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"stop": {
"trigger_startswith": "------Time point_2",
"type": "stop"
}
}
# -*- coding: utf-8 -*-
"""
logfile_parser
~~~~~~~~~~~~
Simple log file parsing according to grammars specified in JSON
:copyright: (c) 2020 by Julian Pietsch.
:license: LGPL
"""
from os.path import exists, join, dirname
import json
import re
from datetime import datetime
import pkgutil
CONFIG_KEY = '@@CONFIG@@'
DEFAULT_NOSKIP = {'regex', 'regexs', 'list', 'lists'}
DEFAULT_NOT_USE_UNMATCHED = {'regex', 'regexs'}
class GrammarNotFound(OSError):
pass
class ParseError(Exception):
pass
class Parser(object):
def __init__(self, grammar_filename):
"""Create a Parser object based on the grammar defined in a file
:param grammar_filename: path to json file specifying grammar for this
parser, or one of the default grammars included with the package
"""
if exists(grammar_filename):
with open(grammar_filename, 'r') as f:
self.grammar = json.load(f)
else:
if not grammar_filename.endswith('.json'):
grammar_filename = grammar_filename + '.json'
try:
grammar_fd = pkgutil.get_data(__package__,
'grammars/' + grammar_filename)
            except FileNotFoundError as e:
                raise GrammarNotFound(
                    'specified grammar could not be found') from e
self.grammar = json.loads(grammar_fd)
self._config = self.grammar.get(CONFIG_KEY, {})
if CONFIG_KEY in self.grammar:
del self.grammar[CONFIG_KEY]
# Preprocessing to be applied to each line before checking triggers
self._preprocessing = self._config.get('regex_preprocessing', [])
self._preprocessing = [re.compile(r) for r in self._preprocessing]
self._trigger_startswith = [(k, v['trigger_startswith'])
for k, v in self.grammar.items()
if 'trigger_startswith' in v]
self._trigger_endswith = [(k, v['trigger_endswith'])
for k, v in self.grammar.items()
if 'trigger_endswith' in v]
self._trigger_contains = [(k, v['trigger_contains'])
for k, v in self.grammar.items()
if 'trigger_contains' in v]
self._trigger_re = [(k, re.compile(v['trigger_re']))
for k, v in self.grammar.items()
if 'trigger_re' in v]
def _set_section(self, k=None):
if k in self.grammar:
self._active_section = self.grammar[k]
self._section_name = k
self._section_type = self._active_section.get('type')
else:
self._active_section = None
self._section_name = ''
self._section_type = None
def parse(self, filehandle):
"""Parse contents of file according to the loaded grammar
:param filehandle: a line generator, e.g., a valid file handle
"""
self._set_section()
table_header = []
column_types = []
output = {}
for line in filehandle:
line = line.strip()
if len(line) == 0:
# skip blank lines
continue
line_unmatched = line
line_pp = [r.findall(line) for r in self._preprocessing]
line_pp = [m[0].strip() for m in line_pp if len(m) == 1]
if len(line_pp) == 1:
line_unmatched = line_pp[0]
line_pp += [line]
sw_matches = [(k, t) for k, t in self._trigger_startswith
if any([l.startswith(t) for l in line_pp])]
ew_matches = [(k, t) for k, t in self._trigger_endswith
if any([l.endswith(t) for l in line_pp])]
co_matches = [(k, t) for k, t in self._trigger_contains
if any([l.find(t) >= 0 for l in line_pp])]
re_matches = [(k, r) for k, r in self._trigger_re
if any([len(r.findall(l)) > 0 for l in line_pp])]
section_match = {k for k, _ in (sw_matches + ew_matches +
co_matches + re_matches)}
if len(section_match) > 1:
raise ParseError('conflicting sections triggered')
if len(section_match) == 1:
# Update the active section
self._set_section(list(section_match)[0])
# Determine the unmatched part of the line
if len(sw_matches) > 0:
_, t = sw_matches[0]
line_unmatched = [l[len(t):] for l in line_pp
if l.startswith(t)][0]
elif len(ew_matches) > 0:
_, t = ew_matches[0]
line_unmatched = [l[:-(len(t)+1)] for l in line_pp
if l.endswith(t)][0]
elif len(co_matches) > 0:
_, t = co_matches[0]
lpp = [l for l in line_pp if l.find(t) >= 0][0]
i = lpp.find(t)
line_unmatched = lpp[:i] + lpp[(i + len(t)):]
elif len(re_matches) > 0:
_, r = re_matches[0]
line_unmatched = [r.sub('', l) for l in line_pp
if len(r.findall(l)) > 0][0]
# Skip the matched line if requested
if self._active_section.get('skip', self._section_type not in
DEFAULT_NOSKIP):
continue
if self._active_section is None:
continue
active_section = self._active_section
section_type = self._section_type
section_name = self._section_name
if active_section.get('use_unmatched', self._section_type not in
DEFAULT_NOT_USE_UNMATCHED):
line = line_unmatched.strip()
if len(line) == 0:
continue
if section_type == 'table':
sep = active_section.get('separator', ',')
row = line.split(sep)
if section_name not in output:
# Table needs initialisation
has_header = active_section.get('has_header', True)
if has_header:
row = [col.strip() for col in row]
default_type = active_section.get('default_map', 'str')
colmap = active_section.get(
'column_map', len(row)*[(None, None)]
)
if type(colmap) == list:
# Columns are defined in order
if has_header:
table_header = [mn or rn for rn, (mn, _) in
zip(row, colmap)]
table_header += row[len(colmap):]
column_types = [mt for _, mt in colmap]
column_types += (len(row) - len(colmap)) * [default_type]
else:
table_header = [mn or 'column{:02d}'.format(i+1)
for i, (mn, _) in enumerate(colmap)]
column_types = [mt or default_type for _, mt in colmap]
elif type(colmap) == dict:
if not has_header:
raise ParseError('dict column maps must have a header')
# First row is a header
table_header = [colmap.get(rn, (rn, None))[0] for rn in row]
column_types = [colmap.get(rn, (None, default_type))[1]
for rn in row]
else:
raise ParseError('badly formatted column map')
output[section_name] = {k: [] for k in table_header}
if has_header:
continue
if len(row) < len(table_header):
# skip lines that have fewer columns than expected
continue
# Merge extra columns into final column
ncol = len(table_header)
if len(row) > ncol:
row[ncol - 1] = sep.join(row[ncol - 1:])
del row[ncol:]
assert len(row) == len(table_header) and len(row) == len(column_types)
# Fill out current row
for val, colname, coltype in zip(row, table_header, column_types):
output[section_name][colname].append(
_map_to_type(val.strip(), coltype)
)
elif section_type in {'list', 'lists'}:
sep = active_section.get('separator', ',')
if section_name not in output:
output[section_name] = []
map_type = active_section.get('map')
next_list = [_map_to_type(el.strip(), map_type)
for el in line.split(sep)]
if section_type == 'lists':
output[section_name].append(next_list)
else:
output[section_name] += next_list
elif section_type in {'regex', 'regexs'}:
regex = active_section.get('regex', '^(.*)$')
map_type = active_section.get('map')
matches = re.findall(regex, line)
if len(matches) == 0:
continue
elif len(matches) == 1 and section_type == 'regex':
output[section_name] = _map_to_type(matches[0], map_type)
else:
if section_name not in output:
output[section_name] = []
output[section_name] += [_map_to_type(m, map_type)
for m in matches]
# Terminate after finding the first match
if section_type == 'regex':
next_section = active_section.get('next_section')
self._set_section(next_section)
elif section_type == 'stop':
break
else:
# By default, just append additional lines as text
if section_name in output:
output[section_name] += '\n' + line
else:
output[section_name] = line
return output
def _map_to_type(val, map_type):
if map_type and map_type.startswith('datetime'):
date_format = '%Y-%m-%dT%H:%M:%S.%fZ' # ISO 8601 format
if map_type.startswith('datetime:'):
date_format = map_type[9:]
try:
return datetime.strptime(val, date_format)
except ValueError:
return None
else:
try:
return {
'str': str, 'int': int, 'float': float, 'bool': bool
}.get(map_type, str)(val)
        except (ValueError, TypeError):
return {'float': float('nan')}.get(map_type)
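As a rough sketch of how the internal `_map_to_type` helper resolves the "map" option (the input values here are made up):

```python
_map_to_type("16 Apr 2020", "datetime:%d %b %Y")  # datetime.datetime(2020, 4, 16, 0, 0)
_map_to_type("3", "int")                          # 3
_map_to_type("oops", "float")                     # nan (fallback when conversion fails)
_map_to_type("anything", None)                    # 'anything' (defaults to str)
```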
"""
Parse microscopy log files according to specified JSON grammars.
Produces dictionary to include in HDF5
"""
import glob
import os
import numpy as np
import pandas as pd
from datetime import datetime
from pytz import timezone
from logfile_parser import Parser
# Paradigm: first capture everything present in the log files,
# then pare down to the specific information that is actually useful later.
# Needed because HDF5 attributes do not support dictionaries
def flatten_dict(nested_dict, separator="/"):
"""
Flattens nested dictionary
"""
df = pd.json_normalize(nested_dict, sep=separator)
return df.to_dict(orient="records")[0]
# Needed because HDF5 attributes do not support datetime objects
# Takes care of time zones & daylight saving
def datetime_to_timestamp(time, locale="Europe/London"):
"""
Convert datetime object to UNIX timestamp
"""
return timezone(locale).localize(time).timestamp()
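# A quick sketch of the two helpers above (values are made up; flattened keys use
# the "/" separator so they remain valid HDF5 attribute names):
#   flatten_dict({"pumpinit": {"flowrate": [0.5]}})
#       -> {'pumpinit/flowrate': [0.5]}
#   datetime_to_timestamp(datetime(2020, 4, 16, 13, 0, 0))
#       -> 1587038400.0  (13:00 BST on 16 Apr 2020, as UNIX seconds)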
def find_file(root_dir, regex):
file = glob.glob(os.path.join(str(root_dir), regex))
if len(file) != 1:
return None
else:
return file[0]
# TODO: re-write this as a class if appropriate
# WARNING: grammars depend on the directory structure of a locally installed
# logfile_parser repo
def parse_logfiles(
root_dir,
acq_grammar="multiDGUI_acq_format.json",
log_grammar="multiDGUI_log_format.json",
):
"""
    Parse the acq and log files according to the specified grammars, then merge
    them into a single dict.
"""
# Both acq and log files contain useful information.
# ACQ_FILE = 'flavin_htb2_glucose_long_ramp_DelftAcq.txt'
# LOG_FILE = 'flavin_htb2_glucose_long_ramp_Delftlog.txt'
    log_parser = Parser(log_grammar)
    log_file = find_file(root_dir, "*log.txt")
    if log_file is None:
        # find_file returns None rather than raising when no unique match is found
        raise ValueError("Experiment log file not found.")
    with open(log_file, "r") as f:
        log_parsed = log_parser.parse(f)
    acq_parser = Parser(acq_grammar)
    acq_file = find_file(root_dir, "*[Aa]cq.txt")
    if acq_file is None:
        raise ValueError("Experiment acq file not found.")
    with open(acq_file, "r") as f:
        acq_parsed = acq_parser.parse(f)
parsed = {**acq_parsed, **log_parsed}
for key, value in parsed.items():
if isinstance(value, datetime):
parsed[key] = datetime_to_timestamp(value)
parsed_flattened = flatten_dict(parsed)
for k, v in parsed_flattened.items():
if isinstance(v, list):
parsed_flattened[k] = [0 if el is None else el for el in v]
return parsed_flattened
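A hedged usage sketch of `parse_logfiles` (the directory below is hypothetical; it only needs to contain files matching `*log.txt` and `*[Aa]cq.txt`):

```python
meta = parse_logfiles("/path/to/experiment_dir")  # hypothetical path
# meta is a flat dict (e.g. keys such as 'microscope' or 'channels/channel'),
# with datetimes converted to UNIX timestamps, ready to attach as HDF5 attributes
```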
import numpy as np
from copy import copy
from itertools import accumulate
from numpy import ndarray
# from more_itertools import first_true
import h5py
import pandas as pd
from utils_find_1st import find_1st, cmp_larger
from agora.io.bridge import BridgeH5
class Signal(BridgeH5):
"""
Class that fetches data from the hdf5 storage for post-processing
"""
def __init__(self, file):
super().__init__(file, flag=None)
self.names = ["experiment", "position", "trap"]
@staticmethod
def add_name(df, name):
df.name = name
return df
def mothers(self, signal, cutoff=0.8):
df = self[signal]
get_mothers = lambda df: df.loc[df.notna().sum(axis=1) > df.shape[1] * cutoff]
if isinstance(df, pd.DataFrame):
return get_mothers(df)
elif isinstance(df, list):
return [get_mothers(d) for d in df]
def __getitem__(self, dsets):
if isinstance(dsets, str) and (
dsets.startswith("postprocessing")
or dsets.startswith("/postprocessing")
or dsets.endswith("imBackground")
):
df = self.get_raw(dsets)
elif isinstance(dsets, str):
df = self.apply_prepost(dsets)
elif isinstance(dsets, list):
is_bgd = [dset.endswith("imBackground") for dset in dsets]
assert sum(is_bgd) == 0 or sum(is_bgd) == len(
dsets
), "Trap data and cell data can't be mixed"
with h5py.File(self.filename, "r") as f:
return [self.add_name(self.apply_prepost(dset), dset) for dset in dsets]
return self.add_name(df, dsets)
def apply_prepost(self, dataset: str):
merges = self.get_merges()
with h5py.File(self.filename, "r") as f:
df = self.dset_to_df(f, dataset)
merged = df
if merges.any():
# Split in two dfs, one with rows relevant for merging and one without them
mergable_ids = pd.MultiIndex.from_arrays(
np.unique(merges.reshape(-1, 2), axis=0).T,
names=df.index.names,
)
merged = self.apply_merge(df.loc[mergable_ids], merges)
nonmergable_ids = df.index.difference(mergable_ids)
merged = pd.concat(
(merged, df.loc[nonmergable_ids]), names=df.index.names
)
search = lambda a, b: np.where(
np.in1d(
np.ravel_multi_index(a.T, a.max(0) + 1),
np.ravel_multi_index(b.T, a.max(0) + 1),
)
)
if "modifiers/picks" in f:
picks = self.get_picks(names=merged.index.names)
missing_cells = [i for i in picks if tuple(i) not in set(merged.index)]
if picks:
# return merged.loc[
# set(picks).intersection([tuple(x) for x in merged.index])
# ]
return merged.loc[picks]
else:
if isinstance(merged.index, pd.MultiIndex):
empty_lvls = [[] for i in merged.index.names]
index = pd.MultiIndex(
levels=empty_lvls,
codes=empty_lvls,
names=merged.index.names,
)
else:
index = pd.Index([], name=merged.index.name)
merged = pd.DataFrame([], index=index)
return merged
@property
def datasets(self):
with h5py.File(self.filename, "r") as f:
dsets = f.visititems(self._if_ext_or_post)
return dsets
def get_merged(self, dataset):
return self.apply_prepost(dataset, skip_pick=True)
@property
def merges(self):
with h5py.File(self.filename, "r") as f:
dsets = f.visititems(self._if_merges)
return dsets
@property
def n_merges(self):
print("{} merge events".format(len(self.merges)))
@property
def picks(self):
with h5py.File(self.filename, "r") as f:
dsets = f.visititems(self._if_picks)
return dsets
def apply_merge(self, df, changes):
if len(changes):
for target, source in changes:
df.loc[tuple(target)] = self.join_tracks_pair(
df.loc[tuple(target)], df.loc[tuple(source)]
)
df.drop(tuple(source), inplace=True)
return df
def get_raw(self, dataset):
if isinstance(dataset, str):
with h5py.File(self.filename, "r") as f:
return self.dset_to_df(f, dataset)
elif isinstance(dataset, list):
return [self.get_raw(dset) for dset in dataset]
def get_merges(self):
# fetch merge events going up to the first level
with h5py.File(self.filename, "r") as f:
merges = f.get("modifiers/merges", np.array([]))
if not isinstance(merges, np.ndarray):
merges = merges[()]
return merges
# def get_picks(self, levels):
def get_picks(self, names, path="modifiers/picks/"):
with h5py.File(self.filename, "r") as f:
if path in f:
return list(zip(*[f[path + name] for name in names]))
# return f["modifiers/picks"]
else:
return None
def dset_to_df(self, f, dataset):
dset = f[dataset]
names = copy(self.names)
if not dataset.endswith("imBackground"):
names.append("cell_label")
lbls = {lbl: dset[lbl][()] for lbl in names if lbl in dset.keys()}
index = pd.MultiIndex.from_arrays(
list(lbls.values()), names=names[-len(lbls) :]
)
columns = (
dset["timepoint"][()] if "timepoint" in dset else dset.attrs["columns"]
)
df = pd.DataFrame(dset[("values")][()], index=index, columns=columns)
return df
@staticmethod
def dataset_to_df(f: h5py.File, path: str, mode: str = "h5py"):
if mode is "h5py":
all_indices = ["experiment", "position", "trap", "cell_label"]
indices = {k: f[path][k][()] for k in all_indices if k in f[path].keys()}
return pd.DataFrame(
f[path + "/values"][()],
index=pd.MultiIndex.from_arrays(
list(indices.values()), names=indices.keys()
),
columns=f[path + "/timepoint"][()],
)
@staticmethod
def _if_ext_or_post(name, *args):
flag = False
if name.startswith("extraction") and len(name.split("/")) == 4:
flag = True
elif name.startswith("postprocessing") and len(name.split("/")) == 3:
flag = True
if flag:
print(name)
@staticmethod
def _if_merges(name: str, obj):
if isinstance(obj, h5py.Dataset) and name.startswith("modifiers/merges"):
return obj[()]
@staticmethod
def _if_picks(name: str, obj):
if isinstance(obj, h5py.Group) and name.endswith("picks"):
return obj[()]
@staticmethod
def join_tracks_pair(target, source):
tgt_copy = copy(target)
end = find_1st(target.values[::-1], 0, cmp_larger)
tgt_copy.iloc[-end:] = source.iloc[-end:].values
return tgt_copy
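A hedged usage sketch of `Signal` (the file name and dataset path below are hypothetical and depend on what the pipeline wrote into the HDF5 file):

```python
s = Signal("position.h5")  # hypothetical file
df = s["extraction/general/None/area"]  # hypothetical path; cells x timepoints, merges and picks applied
raw = s.get_raw("extraction/general/None/area")  # the same dataset without merges or picks
mothers = s.mothers("extraction/general/None/area", cutoff=0.8)
```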
"""This is an example module to show the structure."""
from typing import Union
class ExampleClass:
"""This is an example class to show the structure."""
def __init__(self, parameter: int):
"""This class takes one parameter and is used to add one to that
parameter.
:param parameter: The parameter for this class
"""
self.parameter = parameter
def add_one(self):
"""Takes the parameter and adds one.
>>> x = ExampleClass(1)
>>> x.add_one()
2
:return: the parameter + 1
"""
return self.parameter + 1
def add_n(self, n: int):
"""Adds n to the class instance's parameter.
For instance
>>> x = ExampleClass(1)
>>> x.add_n(10)
11
:param n: The number to add
:return: the parameter + n
"""
return self.parameter + n
def example_function(parameter: Union[int, str]):
"""This is a factory function for an ExampleClass.
:param parameter: the parameter to give to the example class
:return: An example class
"""
try:
return ExampleClass(int(parameter))
except ValueError as e:
raise ValueError(
f"The parameter {parameter} could not be turned " f"into an integer."
) from e
"""This is an example module to show the structure."""
from typing import Union
import h5py
import numpy as np
from PIL import Image
class localImageViewer:
"""
This class is used to quickly access position images without tiling
from image.h5 objects.
"""
def __init__(self, h5file):
"""This class takes one parameter and is used to add one to that
parameter.
:param parameter: The parameter for this class
"""
self._hdf = h5py.File(h5file)
self.positions = list(self._hdf.keys())
self.current_position = self.positions[0]
self.parameter = parameter
    def plot_position(self, channel=0, tp=0, z=0, stretch=True):
        pixvals = self._hdf[self.current_position][channel, tp, ..., z]
        if stretch:
            # clip to the 0.5-99.5 percentile range and rescale to 8 bits
            minval = np.percentile(pixvals, 0.5)
            maxval = np.percentile(pixvals, 99.5)
            pixvals = np.clip(pixvals, minval, maxval)
            pixvals = ((pixvals - minval) / (maxval - minval)) * 255
        return Image.fromarray(pixvals.astype(np.uint8))
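A hedged usage sketch (the file name and indices are hypothetical):

```python
viewer = localImageViewer("image.h5")  # hypothetical file name
img = viewer.plot_position(channel=0, tp=0, z=0, stretch=True)
img.show()
```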