Commit c952e727 authored by Alán Muñoz
fix(logfile_parser): add logfile_parser to unifier

parent 1b8c7f23
# logfile\_parser
Simple log file parsing according to grammars specified in JSON
## Basic usage
This package comes with three built-in grammars: 'multiDGUI\_acq\_format',
'multiDGUI\_log\_format' and 'cExperiment\_log\_format'. As an example, the
'multiDGUI\_acq\_format' grammar can be used to parse the included example
using:
```python
>>> from logfile_parser import Parser
>>> acq_parser = Parser('multiDGUI_acq_format')
>>> with open('examples/example_multiDGUI_acq.txt', 'r') as f:
...     parsed = acq_parser.parse(f)
>>> print(parsed)
```
The parsed output is a `dict` containing any fields matched by the grammar.
## Defining new grammars
Custom grammars should be written in JSON as a dictionary whose keys specify
the information to extract from the log file.
The built-in grammars are useful examples or starting points for defining custom
grammars. They can be found in the `logfile_parser/grammars` directory.
Let's start with a basic example of a log file that we might want to parse:
```text
Date: 16 Apr 2020
Microscope: Batgirl
Experiment details:
My lengthy description of what will certainly be a great experiment.
This description takes multiple lines.
Tags:
User name, Project name, Experiment name
```
A basic grammar that just extracts the description of the experiment could be
defined using:
```json
{
"description": {
"trigger_startswith": "Experiment details:"
},
"stop": {
"trigger_startswith": "Tags:",
"type": "stop"
}
}
```
This tells the parser to fill the "description" field of the parsed result with
the text on the lines *after* the one starting with "Experiment details:", and
then to terminate parsing whenever it encounters a line that starts with
"Tags:". If you wanted the description to include the trigger line itself, you
would specify `"skip": false` (a JSON boolean, not the string `"false"`) as an
additional property of `"description"`.
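A minimal sketch of that variant, assuming the boolean is read as the parser
source further below suggests:
```json
{
    "description": {
        "trigger_startswith": "Experiment details:",
        "skip": false
    }
}
```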
If we also wanted to fill a "tags" field with the comma-separated tags, we
would replace the "stop" section with one of type "list":
```json
{
"description": {
"trigger_startswith": "Experiment details:"
},
"tags": {
"trigger_startswith": "Tags:",
"type": "list"
}
}
```
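As an aside, judging from the parser source included below, "list" (and
"table") sections split on a configurable "separator" that defaults to a
comma. A sketch for semicolon-separated tags:
```json
{
    "tags": {
        "trigger_startswith": "Tags:",
        "type": "list",
        "separator": ";"
    }
}
```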
To extract the microscope name, we can make use of the "regex" type:
```json
{
"microscope": {
"trigger_startswith": "Microscope:",
"type": "regex",
"regex": "^Microscope:\\s*(.*)$"
}
}
```
The text captured by the parenthesised group will be stored in the "microscope"
field of the parsed result.
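The capture behaves like Python's `re.findall`, which is what the parser uses
internally:
```python
>>> import re
>>> re.findall(r"^Microscope:\s*(.*)$", "Microscope: Batgirl")
['Batgirl']
```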
Finally, to extract a date, we combine a "regex" with a "map" to map the text
to a Python `datetime` object:
```json
{
"date": {
"trigger_startswith": "Date:",
"type": "regex",
"regex": "^.*(\\d{2} [A-Z][a-z]{2} \\d{4})$",
"map": "datetime:%d %b %Y"
}
}
```
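The `datetime:<format>` map hands the captured text to `datetime.strptime`
with the given format string, so the date above would be parsed as:
```python
>>> from datetime import datetime
>>> datetime.strptime("16 Apr 2020", "%d %b %Y")
datetime.datetime(2020, 4, 16, 0, 0)
```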
Putting this all together gives us the following grammar:
```json
{
"date": {
"trigger_startswith": "Date:",
"type": "regex",
"regex": "^.*(\\d{2} [A-Z][a-z]{2} \\d{4})$",
"map": "datetime:%d %b %Y"
},
"microscope": {
"trigger_startswith": "Microscope:",
"type": "regex",
"regex": "^Microscope:\\s*(.*)$"
},
"description": {
"trigger_startswith": "Experiment details:"
},
"tags": {
"trigger_startswith": "Tags:",
"type": "list"
}
}
```
If this grammar is saved to a file `newgrammar.json`, we can parse the log file
shown above (saved as `logfile.txt`) as follows:
```python
>>> from logfile_parser import Parser
>>> parser = Parser('newgrammar.json')
>>> with open('logfile.txt', 'r') as f:
...     parsed = parser.parse(f)
>>> print(parsed)
{'date': datetime.datetime(2020, 4, 16, 0, 0), 'microscope': 'Batgirl',
'description': 'My lengthy description of what will certainly be a great
experiment.\nThis description takes multiple lines.', 'tags': ['User name',
'Project name', 'Experiment name']}
```
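The commit page then lists the package sources. First, what appears to be
`logfile_parser/__init__.py`, which simply re-exports the `Parser` class: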
# -*- coding: utf-8 -*-
"""
logfile_parser
~~~~~~~~~~~~
Simple log file parsing according to grammars specified in JSON
:copyright: (c) 2020 by Julian Pietsch.
:license: LGPL
"""
from .logfile_parser import Parser
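Next come the bundled grammar files from `logfile_parser/grammars/`. This first
one, judging by its segmentation and extraction fields, is presumably
`cExperiment_log_format.json`. Its special `@@CONFIG@@` entry supplies
`regex_preprocessing` patterns that are applied to each line before trigger
matching; here the pattern strips a leading `dd-Mon-yyyy hh:mm:ss` timestamp
(see `Parser.parse` below).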
{
"@@CONFIG@@": {
"regex_preprocessing": ["^\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}\\s*(.*)$"]
},
"extractmethod": {
"trigger_startswith": "extractionParameters:",
"type": "regex",
"regex": "^extractFunction:\\s*(.*)$",
"use_unmatched": true
},
"segmethod": {
"trigger_re": "Start .* segmentation",
"type": "regex",
"regex": "^.*Start (.*) segmentation.*$"
},
"segcomplete": {
"trigger_re": "Successfully completed .* segmentation",
"type": "regex",
"regex": "^(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}).*$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"compiled": {
"trigger_startswith": "Successfully completed compiling cell information",
"type": "regex",
"regex": "^(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}).*$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
}
}
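The second grammar, presumably `multiDGUI_acq_format.json`, relies heavily on
the `table` type, which the README above does not document. Judging from
`Parser._parse_table` below: a table section reads separator-delimited rows
that follow the trigger line; `column_map` is either a dict keyed by header
name or, for headerless tables (`"has_header": false`), a list of
`[name, type]` pairs in column order; `default_map` sets the type of any
unmapped column. The result is a dict of columns, each a list of mapped
values. A hypothetical log fragment for the "channels" section:
```text
Channels:
Channel name, Exposure time
Brightfield, 30
GFP, 100
```
would be expected to parse to
`{'channels': {'channel': ['Brightfield', 'GFP'], 'exposure': [30, 100]}}`.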
{
"channels": {
"trigger_startswith": "Channels:",
"type": "table",
"column_map": {
"Channel name": ["channel","str"],
"Exposure time": ["exposure","int"],
"Skip": ["skip","int"],
"Z sect.": ["zsect","int"],
"Start time": ["start_time","int"],
"Camera mode": ["camera_mode","int"],
"EM gain": ["em_gain","int"],
"Voltage": ["voltage","float"]
}
},
"zsectioning": {
"trigger_startswith": "Z_sectioning:",
"type": "table",
"column_map": {
"Sections": ["nsections","int"],
"Spacing": ["spacing","float"],
"PFSon?": ["pfson","bool"],
"AnyZ?": ["anyz","bool"],
"Drift": ["drift","int"],
"Method": ["zmethod","int"]
}
},
"time_settings": {
"trigger_startswith": "Time_settings",
"type": "table",
"has_header": false,
"column_map": [
["istimelapse","bool"],
["timeinterval","int"],
["ntimepoints","int"],
["totaltime","int"]
]
},
"positions": {
"trigger_startswith": "Points:",
"type": "table",
"column_map": {
"Position name": ["posname","str"],
"X position": ["xpos","float"],
"Y position": ["ypos","float"],
"Z position": ["zpos","float"],
"PFS offset": ["pfsoffset","float"],
"Group": ["group","int"]
},
"default_map": "int"
},
"npumps": {
"trigger_startswith": "Syringe pump details:",
"type": "regex",
"regex": "^.*:\\s*(\\d+)\\s*pumps\\.*$",
"map": "int"
},
"pumpinit": {
"trigger_startswith": "Pump states at beginning of experiment:",
"type": "table",
"column_map": {
"Pump port": ["pump_port","str"],
"Diameter": ["syringe_diameter","float"],
"Current rate": ["flowrate","float"],
"Direction": ["flowdirection","str"],
"Running": ["isrunning", "bool"],
"Contents": ["contents", "str"]
}
},
"nswitches": {
"trigger_startswith": "Number of pump changes:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchvol": {
"trigger_startswith": "Infuse/withdraw volumes:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchrate": {
"trigger_startswith": "Infuse/withdraw rates:",
"type": "regex",
"regex": "(\\d+)",
"map": "int"
},
"switchtimes": {
"trigger_startswith": "Times:",
"type": "list",
"map": "int"
},
"switchtopump": {
"trigger_startswith": "Switched to:",
"type": "list",
"map": "int"
},
"switchfrompump": {
"trigger_startswith": "Switched from:",
"type": "list",
"map": "int"
},
"pumprate": {
"trigger_startswith": "Flow post switch:",
"type": "lists",
"map": "float"
}
}
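The third grammar, presumably `multiDGUI_log_format.json`, targets the
acquisition log itself. Two features here go beyond the README:
`trigger_re` fires a section on a regular-expression match rather than a fixed
prefix, and `next_section` (handled by `_terminate_after_first_match` below)
makes the named section active once a `regex` section has captured its match.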
{
"date": {
"trigger_re": "^\\d{2}-[A-Z][a-z]{2}-\\d{4}$",
"type": "regex",
"regex": "^\\d{2}-[A-Z][a-z]{2}-\\d{4}$",
"map": "datetime:%d-%b-%Y"
},
"multiDGUI_commit": {
"trigger_startswith": "Swain lab microscope control software",
"type": "regex",
"regex": "^.*commit number:([0-9a-z]+)$",
"next_section": "date"
},
"microscope": {
"trigger_startswith": "Microscope name is:",
"type": "regex",
"regex": "^Microscope name is:\\s+(.*)$"
},
"acqfile": {
"trigger_startswith": "Acquisition settings are saved in:",
"type": "regex",
"regex": "^(.*)$",
"skip": true
},
"details": {
"trigger_startswith": "Experiment details:"
},
"setup": {
"trigger_startswith": "Microscope setup for used channels:"
},
"omero_project": {
"trigger_startswith": "Omero project:",
"type": "regex",
"regex": "^(.*)$",
"skip": true
},
"omero_tags": {
"trigger_startswith": "Omero tags:",
"type": "list"
},
"omero_tags_stop": {
"trigger_startswith": "PFS is locked"
},
"omero_tag_descriptions": {
"trigger_startswith": "Omero tag descriptions:",
"type": "list"
},
"expt_start": {
"trigger_startswith": "Experiment started at:",
"type": "regex",
"regex": "^.*at:(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"first_capture": {
"trigger_startswith": "------Time point_1",
"type": "regex",
"regex": "^Channel:.*set at:(\\d{2}-[A-Z][a-z]{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$",
"map": "datetime:%d-%b-%Y %H:%M:%S"
},
"stop": {
"trigger_startswith": "------Time point_2",
"type": "stop"
}
}
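Finally, the parser implementation, apparently `logfile_parser/logfile_parser.py`: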
# -*- coding: utf-8 -*-
"""
logfile_parser
~~~~~~~~~~~~
Simple log file parsing according to grammars specified in JSON
:copyright: (c) 2020 by Julian Pietsch.
:license: LGPL
"""
import json
import pkgutil
import re
import typing as t
from datetime import datetime
from os.path import dirname, exists, join
CONFIG_KEY = "@@CONFIG@@"
DEFAULT_NOSKIP = {"regex", "regexs", "list", "lists"}
DEFAULT_NOT_USE_UNMATCHED = {"regex", "regexs"}
class GrammarNotFound(OSError):
pass
class ParseError(Exception):
pass
class Parser(object):
def __init__(self, grammar_filename):
"""Create a Parser object based on the grammar defined in a file
:param grammar_filename: path to json file specifying grammar for this
parser, or one of the default grammars included with the package
"""
if exists(grammar_filename):
with open(grammar_filename, "r") as f:
self.grammar = json.load(f)
else:
if not grammar_filename.endswith(".json"):
grammar_filename = grammar_filename + ".json"
try:
grammar_fd = pkgutil.get_data(
__package__, "grammars/" + grammar_filename
)
            except FileNotFoundError as e:
                raise GrammarNotFound(
                    "specified grammar could not be found: {}".format(e)
                ) from e
self.grammar = json.loads(grammar_fd)
self._config = self.grammar.get(CONFIG_KEY, {})
if CONFIG_KEY in self.grammar:
del self.grammar[CONFIG_KEY]
# Preprocessing to be applied to each line before checking triggers
self._preprocessing = self._config.get("regex_preprocessing", [])
self._preprocessing = [re.compile(r) for r in self._preprocessing]
self._triggers = {
trigger_type: [
(k, v[f"trigger_{trigger_type}"])
if trigger_type != "re"
else (k, re.compile(v[f"trigger_{trigger_type}"]))
for k, v in self.grammar.items()
if f"trigger_{trigger_type}" in v
]
for trigger_type in ("startswith", "endswith", "contains", "re")
}
def _set_section(self, k=None):
if k in self.grammar:
self._active_section = self.grammar[k]
self._section_name = k
self._section_type = self._active_section.get("type")
else:
self._active_section = None
self._section_name = ""
self._section_type = None
def parse(self, filehandle):
"""Parse contents of file according to the loaded grammar
:param filehandle: a line generator, e.g., a valid file handle
"""
self._set_section()
table_header = []
column_types = []
output = {}
for line in filehandle:
line = line.strip()
if len(line) == 0:
# skip blank lines
continue
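            # Build preprocessed variants of the line (one per matching
            # preprocessing regex), keeping the raw line as a fallback.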
line_pp = [r.findall(line) for r in self._preprocessing]
line_pp = [m[0].strip() for m in line_pp if len(m) == 1]
line_unmatched = line_pp[0] if len(line_pp) == 1 else line
line_pp += [line]
            # Map each trigger type to a check; note that bare str.find
            # would return -1 (truthy) on a miss, so membership is tested
            # with `in` instead.
            trigger_check_methods = {
                k: lam
                for k, lam in zip(
                    self._triggers.keys(),
                    (
                        lambda x, t: x.startswith(t),
                        lambda x, t: x.endswith(t),
                        lambda x, t: t in x,
                        lambda x, pattern: pattern.findall(x),
                    ),
                )
            }
matches = {
trigger: [
(k, trig_str)
for k, trig_str in self._triggers[trigger]
if any(
[
trigger_check_methods[trigger](line, trig_str)
for line in line_pp
]
)
]
for trigger, method in trigger_check_methods.items()
}
section_match = {
k
for trigger_matches in matches.values()
for k, _ in trigger_matches
}
            if len(section_match) > 1:
                raise ParseError("conflicting sections triggered")
if len(section_match) == 1:
# Update the active section
self._set_section(list(section_match)[0])
# Determine the unmatched part of the line
line_unmatched = self.determine_unmatched_part(
matches, line_pp
)
# Skip the matched line if requested
if self._active_section.get(
"skip", self._section_type not in DEFAULT_NOSKIP
):
continue
if self._active_section is None:
continue
active_section = self._active_section
section_type = self._section_type
section_name = self._section_name
if active_section.get(
"use_unmatched",
self._section_type not in DEFAULT_NOT_USE_UNMATCHED,
):
line = line_unmatched.strip()
if len(line) == 0:
continue
if section_type == "table":
sep = active_section.get("separator", ",")
row = line.split(sep)
if section_name not in output:
# Table needs initialisation
(
has_header,
row,
table_header,
column_types,
) = self._parse_table(active_section, row)
output[section_name] = {k: [] for k in table_header}
if active_section.get("has_header", True):
continue
if len(row) < len(table_header):
# skip lines that have fewer columns than expected
continue
# Merge extra columns into final column
row = self._table_merge_extra_columns(
table_header, sep, row, column_types
)
# Fill out current row
for val, colname, coltype in zip(
row, table_header, column_types
):
output[section_name][colname].append(
_map_to_type(val.strip(), coltype)
)
elif section_type in {"list", "lists"}:
sep = active_section.get("separator", ",")
output[section_name] = output.get(section_name, [])
map_type = active_section.get("map")
next_list = [
_map_to_type(el.strip(), map_type)
for el in line.split(sep)
]
list_to_append = (
[next_list] if section_type == "lists" else next_list
)
output[section_name] += list_to_append
elif section_type in {"regex", "regexs"}:
regex = active_section.get("regex", "^(.*)$")
map_type = active_section.get("map")
matches = re.findall(regex, line)
if len(matches) == 0:
continue
elif len(matches) == 1 and section_type == "regex":
output[section_name] = _map_to_type(matches[0], map_type)
else:
output[section_name] = output.get(section_name, [])
output[section_name] += [
_map_to_type(m, map_type) for m in matches
]
# Terminate after finding the first match
self._terminate_after_first_match(active_section, section_type)
elif section_type == "stop":
break
else:
# By default, just append additional lines as text
new_str = (
f"{output[section_name]}\n{line}"
if section_name in output
else line
)
output[section_name] = new_str
return output
@staticmethod
def determine_unmatched_part(
matches: t.Dict[str, t.List], line_pp: t.List[str]
):
if matches["startswith"]:
_, t = matches["startswith"][0]
line_unmatched = [
line[len(t) :] for line in line_pp if line.startswith(t)
][0]
elif matches["endswith"]:
_, t = matches["endwith"][0]
line_unmatched = [
line[: -(len(t) + 1)] for line in line_pp if line.endswith(t)
][0]
elif matches["contains"]:
_, t = matches["contains"][0]
lpp = [line for line in line_pp if line.find(t) >= 0][0]
i = lpp.find(t)
line_unmatched = lpp[:i] + lpp[(i + len(t)) :]
elif matches["re"]:
_, r = matches["re"][0]
line_unmatched = [
r.sub("", line) for line in line_pp if len(r.findall(line)) > 0
][0]
return line_unmatched
def _terminate_after_first_match(self, active_section, section_type):
# Terminate after finding the first match
if section_type == "regex":
next_section = active_section.get("next_section")
self._set_section(next_section)
return next_section
@staticmethod
def _parse_table(active_section, row):
has_header = active_section.get("has_header", True)
if has_header:
row = [col.strip() for col in row]
default_type = active_section.get("default_map", "str")
colmap = active_section.get("column_map", len(row) * [(None, None)])
        if isinstance(colmap, list):
# Columns are defined in order
if has_header:
table_header = [mn or rn for rn, (mn, _) in zip(row, colmap)]
table_header += row[len(colmap) :]
column_types = [mt for _, mt in colmap]
column_types += (len(row) - len(colmap)) * [default_type]
else:
table_header = [
mn or "column{:02d}".format(i + 1)
for i, (mn, _) in enumerate(colmap)
]
column_types = [mt or default_type for _, mt in colmap]
        elif isinstance(colmap, dict):
if not has_header:
raise ParseError("dict column maps must have a header")
# First row is a header
table_header = [colmap.get(rn, (rn, None))[0] for rn in row]
column_types = [
colmap.get(rn, (None, default_type))[1] for rn in row
]
else:
raise ParseError("badly formatted column map")
return has_header, row, table_header, column_types
@staticmethod
def _table_merge_extra_columns(table_header, sep, row, column_types):
# Merge extra columns into final column
ncol = len(table_header)
if len(row) > ncol:
row[ncol - 1] = sep.join(row[ncol - 1 :])
del row[ncol:]
assert len(row) == len(table_header) and len(row) == len(column_types)
return row
def _map_to_type(val, map_type):
if map_type and map_type.startswith("datetime"):
date_format = "%Y-%m-%dT%H:%M:%S.%fZ" # ISO 8601 format
if map_type.startswith("datetime:"):
date_format = map_type[9:]
try:
return datetime.strptime(val, date_format)
except ValueError:
return None
else:
try:
return {"str": str, "int": int, "float": float, "bool": bool}.get(
map_type, str
)(val)
        except (ValueError, TypeError):
return {"float": float("nan")}.get(map_type)
@@ -8,6 +8,7 @@ packages = [
 { include = "extraction" },
 { include = "agora" },
 { include = "postprocessor" },
+{ include = "logfile_parser" },
 ]
 readme = "README.md"
...