Commit 88742ce3 authored by Arin Wongprommoon

[WIP] fix!(agora): pipeline handles experiments with new logfile format

WHY IS THIS CHANGE NEEDED?:
- the aliby pipeline is not configured to handle the new logfile format
  produced by MultiDPy

HOW DOES THE CHANGE SOLVE THE PROBLEM?:
- this is a quick-and-dirty fix (a grafted, temporary workaround) so that
  the relevant experiment can be segmented before a deadline
- put the parser from
https://git.ecdf.ed.ac.uk/swain-lab/aliby/skeletons/-/blob/cdf3699d8c802ea908c18197fe84d7e25f2e41ce/scripts/dev/slim_newlogfile.py
  into a new module (logfile_parser.newlogfile_parser)
- wrote parse_newlogfiles() in agora.io.metadata, modelled on
  parse_logfiles(), to call the parser for the new logfile format
- redefined load_logs() in aliby.pipeline to use parse_newlogfiles()

WHAT SIDE EFFECTS DOES THIS CHANGE HAVE?:
- breaks: the pipeline is now unable to parse original-format logfiles
- tech debt: this grafts a solution written without consideration of the
  existing design of metadata; there is no coherent scheme for how the
  grammar & parsing functions are written.  ideally, we should have a set
  of grammar JSON files that the user can choose for a parser (see the
  sketch after this list)
- bkwds compat: user must specify tiler parameters when running
  segmentation, i.e.

    params = PipelineParameters.default(general={ ..... })
    params.tiler = {"tile_size": 117, "ref_channel": "brightfield1", "ref_z": 0}
    p = Pipeline(params)
    p.run()

  this is because the new logfile format uses a different name for the
  reference channel, which is usually 'Brightfield'

  overall, this is intended to be a temporary solution
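
  a rough sketch of the grammar-JSON idea (the grammars/ layout and the
  select_grammar() helper are hypothetical, not part of this commit):

    import json
    from pathlib import Path

    def select_grammar(grammar_dir: Path, name: str) -> dict:
        # load one of several grammar JSON files by name (hypothetical)
        with open(grammar_dir / f"{name}.json") as f:
            return json.load(f)

    # e.g. grammar = select_grammar(Path("grammars"), "multidpy")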

EVIDENCE THAT COMMIT WORKS:
- tested by running skeletons/scripts/essential/run.py on experiment
  staffa:470, which uses the new logfile format
- this commit doesn't solve all problems -- segmentation now aborts with a
  ValueError (see references below) -- but it should address the salient
  problems directly caused by the different logfile format

REFERENCES:
- issue #38
- the ValueError:
  aliby#38 (comment 113221)
parent 41c7bf0f
agora/io/metadata.py
@@ -7,6 +7,7 @@ from pytz import timezone
from agora.io.writer import Writer
from logfile_parser import Parser
from logfile_parser.newlogfile_parser import parse_from_grammar, grammar
class MetaData:
@@ -114,3 +115,32 @@ def parse_logfiles(
            parsed_flattened[k] = [0 if el is None else el for el in v]
    return parsed_flattened


def parse_newlogfiles(
    root_dir,
):
    """
    Parse log files using the new grammar, then merge into a single dict.
    """
    try:
        log_file = find_file(root_dir, "*log.txt")
        print(f"Log file (new format) is called {log_file}")
    except FileNotFoundError:
        raise ValueError("Experiment log file not found.")
    log_parsed = parse_from_grammar(log_file, grammar)
    parsed = {**log_parsed}
    for key, value in parsed.items():
        if isinstance(value, datetime):
            parsed[key] = datetime_to_timestamp(value)
    parsed_flattened = flatten_dict(parsed)
    for k, v in parsed_flattened.items():
        if isinstance(v, list):
            parsed_flattened[k] = [0 if el is None else el for el in v]
    return parsed_flattened
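
# Hypothetical usage sketch (not part of the diff; the path is illustrative):
# parse_newlogfiles() takes the directory that contains the "*log.txt" file and
# returns a flattened dict of the parsed sections (general fields, image_config,
# device_properties, group_*).
#
#     parsed = parse_newlogfiles("/path/to/experiment/")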
aliby/pipeline.py
@@ -19,7 +19,7 @@ from pathos.multiprocessing import Pool
from tqdm import tqdm
from agora.abc import ParametersABC, ProcessABC
from agora.io.metadata import MetaData, parse_logfiles, parse_newlogfiles
from agora.io.reader import StateReader
from agora.io.signal import Signal
from agora.io.writer import ( # BabyWriter,
@@ -148,7 +148,8 @@ class PipelineParameters(ParametersABC):
        return cls(**{k: v for k, v in defaults.items()})

    def load_logs(self):
        # parsed_flattened = parse_logfiles(self.log_dir)
        parsed_flattened = parse_newlogfiles(self.log_dir)
        return parsed_flattened
logfile_parser/newlogfile_parser.py (new file)

#!/usr/bin/env python3
"""
Description of new logfile:
All three conditions are concatenated in a single file, in this order:
- Experiment basic information (URL in acquisition PC, project, user input)
- Acquisition settings
- Experiment start
The section separators are:
-----Acquisition settings-----
-----Experiment started-----
And for a successfully finished experiment we get:
YYYY-MM-DD HH:mm:ss,ms*3 Image acquisition complete WeekDay Mon Day HH:mm:ss,ms*3 YYYY
For example:
2022-09-30 05:40:59,765 Image acquisition complete Fri Sep 30 05:40:59 2022
Data to extract:
* Basic information
- Experiment details, which may indicate technical issues
- GIT commit
- (Not working as of 2022/10/03, but projects and tags)
* Basic information
-
New grammar
- Tables are assumed to end with an empty line.
"""
import typing as t
from pathlib import PosixPath

import pandas as pd
from pyparsing import (
    CharsNotIn,
    Combine,
    Group,
    Keyword,
    LineEnd,
    LineStart,
    Literal,
    OneOrMore,
    ParserElement,
    Word,
    printables,
)

atomic = t.Union[str, int, float, bool]
class HeaderEndNotFound(Exception):
    def __init__(self, message, errors):
        super().__init__(message)
        self.errors = errors


def extract_header(filepath: PosixPath):
    """Extract the header: everything up to and including HEADER_END."""
    with open(filepath, "r") as f:
        try:
            header = ""
            for _ in range(MAX_NLINES):
                line = f.readline()
                header += line
                if HEADER_END in line:
                    break
            else:
                # Guard against malformed logfiles where HEADER_END never appears
                raise HeaderEndNotFound("header end not found", errors=[HEADER_END])
        except HeaderEndNotFound as e:
            print(f"{MAX_NLINES} checked and no header found")
            raise e
    return header
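
# Illustrative only (hypothetical path): read everything up to and including
# the "-----Experiment started-----" marker of a new-format logfile.
#
#     header = extract_header(PosixPath("/path/to/experiment_log.txt"))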
def parse_table(
    string: str,
    start_trigger: t.Union[str, Keyword],
) -> pd.DataFrame:
    """Parse a csv-like table.

    Parameters
    ----------
    string : str
        Contents to parse.
    start_trigger : t.Union[str, Keyword]
        String or parser element that indicates the section start.

    Returns
    -------
    pd.DataFrame
        DataFrame representing the table.
    """
    if isinstance(start_trigger, str):
        start_trigger: Keyword = Keyword(start_trigger)
    EOL = LineEnd().suppress()
    field = OneOrMore(CharsNotIn(":,\n"))
    line = LineStart() + Group(
        OneOrMore(field + Literal(",").suppress()) + field + EOL
    )
    parser = (
        start_trigger
        + EOL
        + Group(OneOrMore(line))
        + EOL  # end_trigger.suppress()
    )
    parser_result = parser.search_string(string)
    assert len(parser_result), f"Parsing is empty. {parser}"
    assert all(
        [len(row) == len(parser_result[0]) for row in parser_result]
    ), f"Table {start_trigger} has unequal number of columns"
    return table_to_df(parser_result.as_list())
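
# Hypothetical usage sketch (the section contents below are illustrative, not
# taken from a real logfile): parse a comma-separated section into a DataFrame.
#
#     sample = (
#         "Image Configs:\n"
#         "Name,Exposure,Z stack\n"
#         "brightfield1,30,3\n"
#         "GFP,100,3\n"
#         "\n"
#     )
#     df = parse_table(sample, start_trigger="Image Configs:")
#     # df has columns Name / Exposure / Z stack and one row per channel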
def parse_fields(
    string: str, start_trigger, end_trigger=None
) -> t.Union[pd.DataFrame, t.Dict[str, atomic]]:
    """
    Fields are parsed as key: value

    By default the end is an empty newline.

    For example
    group: YST_1510 field: time
    start: 0
    interval: 300
    frames: 180
    """
    EOL = LineEnd().suppress()
    if end_trigger is None:
        end_trigger = EOL
    elif isinstance(end_trigger, str):
        end_trigger = Literal(end_trigger)
    field = OneOrMore(CharsNotIn(":\n"))
    line = (
        LineStart()
        + Group(field + Combine(OneOrMore(Literal(":").suppress() + field)))
        + EOL
    )
    parser = (
        start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress()
    )
    parser_result = parser.search_string(string)
    return fields_to_dict_or_table(parser_result.as_list())
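
# Hypothetical usage sketch (values are illustrative): parse a "key: value"
# section such as the group/time block shown in the docstring above.
#
#     sample = (
#         "group: YST_1510 field: time\n"
#         "start: 0\n"
#         "interval: 300\n"
#         "frames: 180\n"
#         "\n"
#     )
#     trigger = Group(
#         Group(Literal("group:") + Word(printables))
#         + Group(Literal("field:") + "time")
#     )
#     fields = parse_fields(sample, start_trigger=trigger)
#     # with a single matching group this returns a dict:
#     # {"start": 0, "interval": 300, "frames": 180}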
# Grammar specification
grammar = {
    "general": {
        "start_trigger": Literal("Swain Lab microscope experiment log file"),
        "type": "fields",
        "end_trigger": "-----Acquisition settings-----",
    },
    "image_config": {
        "start_trigger": "Image Configs:",
        "type": "table",
    },
    "device_properties": {
        "start_trigger": "Device properties:",
        "type": "table",
    },
    "group": {
        "position": {
            "start_trigger": Group(
                Group(Literal("group:") + Word(printables))
                + Group(Literal("field:") + "position")
            ),
            "type": "table",
        },
        **{
            key: {
                "start_trigger": Group(
                    Group(Literal("group:") + Word(printables))
                    + Group(Literal("field:") + key)
                ),
                "type": "fields",
            }
            for key in ("time", "config")
        },
    },
}
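
# Each top-level grammar entry maps a section name to a start_trigger, a parse
# "type" ("fields" or "table") and, optionally, an end_trigger.  The "group"
# entry is nested; parse_from_grammar (below) flattens it into group_position,
# group_time and group_config results.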
ACQ_START = "-----Acquisition settings-----"
HEADER_END = "-----Experiment started-----"
MAX_NLINES = 2000  # In case of malformed logfile

# Newlines are significant: only spaces and tabs count as skippable whitespace
ParserElement.setDefaultWhitespaceChars(" \t")
def parse_from_grammar(filepath: str, grammar: t.Dict):
    """Parse the logfile header into a dict, one entry per grammar section."""
    header = extract_header(filepath)
    d = {}
    for key, values in grammar.items():
        try:
            if "type" in values:
                d[key] = parse_x(header, **values)
            else:  # Use subkeys to parse groups
                for subkey, subvalues in values.items():
                    subkey = "_".join((key, subkey))
                    d[subkey] = parse_x(header, **subvalues)
        except Exception as e:
            print(f"Parsing failed for key {key}")
            raise e
    return d
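
# Hypothetical usage sketch (the path is illustrative, not from this commit):
# parse every section declared in the grammar from a new-format logfile.
#
#     parsed = parse_from_grammar("/path/to/experiment_log.txt", grammar)
#     # parsed is keyed by section, e.g. parsed["general"],
#     # parsed["image_config"], parsed["group_time"]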
def table_to_df(result: t.List[t.List]):
    if len(result) > 1:  # Multiple tables with ids to append
        # Generate a multiindex from the group name and the first column
        from itertools import product

        group_name = [
            product((table[0][0][1],), (row[0] for row in table[1][1:]))
            for table in result
        ]
        tmp = [pair for pairset in group_name for pair in pairset]
        multiindices = pd.MultiIndex.from_tuples(tmp)
        df = pd.DataFrame(
            [row for pr in result for row in pr[1][1:]],
            columns=result[0][1][0],
            index=multiindices,
        )
        df.name = result[0][0][1][1]
    else:  # A single table
        df = pd.DataFrame(result[0][1][1:], columns=result[0][1][0])
    return df
def fields_to_dict_or_table(result: t.List[t.List]):
    if len(result) > 1:  # Multiple groups: combine into a table
        formatted = pd.DataFrame(
            [[row[1] for row in pr[1]] for pr in result],
            columns=[x[0] for x in result[0][1]],
            index=[x[0][0][1] for x in result],
        )
        formatted.name = result[0][0][1][1]
    else:  # A single section: return a dict of typed values
        formatted = {k: _cast_type(v) for k, v in dict(result[0][1]).items()}
    return formatted
def _cast_type(x: str) -> t.Union[str, int, float, bool]:
    # Convert to int, float or bool (as 0/1) where possible
    x = x.strip()
    if x.isdigit():
        x = int(x)
    else:
        try:
            x = float(x)
        except ValueError:
            try:
                x = ("false", "true").index(x.lower())
            except ValueError:
                pass
    return x
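
# For example (illustrative): _cast_type("300") -> 300, _cast_type("0.5") -> 0.5,
# _cast_type("True") -> 1 (index into ("false", "true")), _cast_type("GFP") -> "GFP"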
def parse_x(string: str, type: str, **kwargs):
    # Dispatch to parse_table or parse_fields according to the grammar "type"
    return eval(f"parse_{type}(string, **kwargs)")