From 6a588492ed4f19937170e21fafc8fd8dc7e95d32 Mon Sep 17 00:00:00 2001
From: pswain <peter.swain@ed.ac.uk>
Date: Thu, 18 Apr 2024 18:06:06 +0100
Subject: [PATCH] new branch

---
 src/aliby/pipeline.py                 |   2 +-
 src/logfile_parser/simple_parser.py   | 286 ++++++++++++++++++++++++++
 src/logfile_parser/swainlab_parser.py |   4 +-
 3 files changed, 289 insertions(+), 3 deletions(-)
 create mode 100644 src/logfile_parser/simple_parser.py

diff --git a/src/aliby/pipeline.py b/src/aliby/pipeline.py
index d54c269..ef1e8a8 100644
--- a/src/aliby/pipeline.py
+++ b/src/aliby/pipeline.py
@@ -204,7 +204,7 @@ class Pipeline(ProcessABC):
         logger.addHandler(fh)
 
     def setup(self):
-        """Get meta data and identify each position."""
+        """Copy logfile and identify each position."""
         config = self.parameters.to_dict()
         # print configuration
         self.log("Using alibylite.", "info")
diff --git a/src/logfile_parser/simple_parser.py b/src/logfile_parser/simple_parser.py
new file mode 100644
index 0000000..b2f29b3
--- /dev/null
+++ b/src/logfile_parser/simple_parser.py
@@ -0,0 +1,286 @@
+#!/usr/bin/env jupyter
+# TODO should this be merged to the regular logfile_parser structure?
+"""
+Description of the new logfile:
+
+All three sections are concatenated in a single file, in this order:
+ - Experiment basic information (URL on the acquisition PC, project, user input)
+ - Acquisition settings
+ - Experiment start
+
+The section separators are:
+-----Acquisition settings-----
+-----Experiment started-----
+
+And for a successfully finished experiment we get a line of the form:
+
+YYYY-MM-DD HH:mm:ss,ms*3 Image acquisition complete WeekDay Mon Day HH:mm:ss YYYY
+
+For example:
+2022-09-30 05:40:59,765 Image acquisition complete Fri Sep 30 05:40:59 2022
+
+Data to extract:
+* Basic information
+ - Experiment details, which may indicate technical issues
+ - GIT commit
+ - Projects and tags (not working as of 2022/10/03)
+
+An example of the metadata extracted:
+
+{'channels_by_group': {'PDR5_GFP': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
+'Ilv3_mCherry': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
+'Yor1_GFP': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
+'Snq2_GFP': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
+'Pdr5_mCherry_pdr1_pdr3': ['Brightfield', 'GFP', 'cy5', 'mCherry']},
+'channels': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
+'time_settings/ntimepoints': 240,
+'time_settings/timeinterval': 300}
+
+
+New grammar
+
+- Tables are assumed to end with an empty line.
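+- Fields sections are lines of colon-separated key-value pairs.
+
+As an illustration, the two kinds of section look like the sketches
+below; the time fields follow the example used in parse_fields, while
+the position-table column names are hypothetical, not taken from a
+real logfile:
+
+group: YST_1510 field: time
+start: 0
+interval: 300
+frames: 180
+
+group: YST_1510 field: position
+Name,X,Y
+pos001,576,1234
+pos002,576,2034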
+""" + +import logging +import typing as t +from pathlib import Path + +import pandas as pd +from pyparsing import ( + CharsNotIn, + Combine, + Group, + Keyword, + LineEnd, + LineStart, + Literal, + OneOrMore, + ParserElement, + Word, + printables, +) + +atomic = t.Union[str, int, float, bool] + +# specify grammar for the Swain lab +sl_grammar = { + "group": { + "position": { + "start_trigger": Group( + Group(Literal("group:") + Word(printables)) + + Group(Literal("field:") + "position") + ), + "data_type": "table", + }, + **{ + key: { + "start_trigger": Group( + Group(Literal("group:") + Word(printables)) + + Group(Literal("field:") + key) + ), + "data_type": "fields", + } + for key in ("time", "config") + }, + }, +} + +ACQ_START = "-----Acquisition settings-----" +HEADER_END = "-----Experiment started-----" +MAX_NLINES = 2000 # In case of malformed logfile + +ParserElement.setDefaultWhitespaceChars(" \t") + + +class HeaderEndNotFound(Exception): + def __init__(self, message, errors): + super().__init__(message) + self.errors = errors + + +def extract_header(filepath: Path): + """Extract content of log file before the experiment starts.""" + with open(filepath, "r", errors="ignore", encoding="unicode_escape") as f: + try: + header = "" + for _ in range(MAX_NLINES): + line = f.readline() + if ":" in line: + header += line + if HEADER_END in line: + break + except HeaderEndNotFound as e: + print(f"{MAX_NLINES} checked and no header found.") + raise (e) + return header + + +def parse_from_swainlab_grammar(filepath: t.Union[str, Path]): + """Parse using a grammar for the Swain lab.""" + return parse_from_grammar(filepath, sl_grammar) + + +def parse_from_grammar(filepath: str, grammar: t.Dict): + """Parse a file using the specified grammar.""" + header = extract_header(filepath) + d = {} + for key, values in grammar.items(): + try: + if "data_type" in values: + # data_type for parse_x defined in values + d[key] = parse_x(header, **values) + else: + # use sub keys to parse groups + for subkey, subvalues in values.items(): + subkey = "_".join((key, subkey)) + d[subkey] = parse_x(header, **subvalues) + except Exception as e: + logging.getLogger("aliby").critical( + f"Parsing failed for key {key} and values {values}." + ) + raise (e) + return d + + +def parse_x(string, data_type, **kwargs): + """Parse a string using a function specifed by data_type.""" + res_dict = eval(f"parse_{data_type}(string, **kwargs)") + return res_dict + + +def parse_fields( + string: str, start_trigger, end_trigger=None +) -> t.Union[pd.DataFrame, t.Dict[str, atomic]]: + """ + Parse fields as key-value pairs. + + By default the end is an empty newline. + + For example + + group: YST_1510 field: time + start: 0 + interval: 300 + frames: 180 + """ + EOL = LineEnd().suppress() + if end_trigger is None: + end_trigger = EOL + elif isinstance(end_trigger, str): + end_trigger = Literal(end_trigger) + field = OneOrMore(CharsNotIn(":\n")) + line = ( + LineStart() + + Group(field + Combine(OneOrMore(Literal(":").suppress() + field))) + + EOL + ) + parser = ( + start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress() + ) + parser_result = parser.search_string(string) + breakpoint() + results = parser_result.as_list() + assert len(results), "Parsing returned nothing" + return fields_to_dict_or_table(results) + + +def parse_table( + string: str, + start_trigger: t.Union[str, Keyword], +) -> pd.DataFrame: + """ + Parse csv-like table. 
+
+    Parameters
+    ----------
+    string : str
+        Contents to parse.
+    start_trigger : t.Union[str, Keyword]
+        String or pyparsing element that indicates the section start.
+
+    Returns
+    -------
+    pd.DataFrame
+        DataFrame representing the table.
+    """
+    if isinstance(start_trigger, str):
+        start_trigger: Keyword = Keyword(start_trigger)
+    EOL = LineEnd().suppress()
+    field = OneOrMore(CharsNotIn(":,\n"))
+    line = LineStart() + Group(
+        OneOrMore(field + Literal(",").suppress()) + field + EOL
+    )
+    # tables are assumed to end at an empty line
+    parser = start_trigger + EOL + Group(OneOrMore(line)) + EOL
+    parser_result = parser.search_string(string)
+    assert len(parser_result), f"Parsing is empty. {parser}"
+    assert all(
+        len(row) == len(parser_result[0]) for row in parser_result
+    ), f"Table {start_trigger} has an unequal number of columns"
+    return table_to_df(parser_result.as_list())
+
+
+def table_to_df(result: t.List[t.List]):
+    """Convert one or more parsed tables into a DataFrame."""
+    if len(result) > 1:
+        # multiple tables: combine them under a MultiIndex built from
+        # each table's group name and the row names in its first column
+        group_name = [
+            product((table[0][0][1],), (row[0] for row in table[1][1:]))
+            for table in result
+        ]
+        tmp = [pair for pairset in group_name for pair in pairset]
+        multiindices = pd.MultiIndex.from_tuples(tmp)
+        df = pd.DataFrame(
+            [row for pr in result for row in pr[1][1:]],
+            columns=result[0][1][0],
+            index=multiindices,
+        )
+        df.name = result[0][0][1][1]
+    else:
+        # a single table
+        df = pd.DataFrame(result[0][1][1:], columns=result[0][1][0])
+    return df
+
+
+def fields_to_dict_or_table(result: t.List[t.List]):
+    """Convert parsed fields to a DataFrame or a dict of cast values."""
+    if len(result) > 1:
+        # multiple sections: one row per section, indexed by group name
+        formatted = pd.DataFrame(
+            [[row[1] for row in pr[1]] for pr in result],
+            columns=[x[0] for x in result[0][1]],
+            index=[x[0][0][1] for x in result],
+        )
+        formatted.name = result[0][0][1][1]
+    else:
+        # a single section: return its key-value pairs as a dict
+        formatted = {k: _cast_type(v) for k, v in dict(result[0][1]).items()}
+    return formatted
+
+
+def _cast_type(x: str) -> t.Union[str, int, float, bool]:
+    """Cast a string to an int, float, or bool when possible."""
+    x = x.strip()
+    if x.isdigit():
+        return int(x)
+    try:
+        return float(x)
+    except ValueError:
+        pass
+    if x.lower() in ("false", "true"):
+        return x.lower() == "true"
+    return x
diff --git a/src/logfile_parser/swainlab_parser.py b/src/logfile_parser/swainlab_parser.py
index f82345a..67e3433 100644
--- a/src/logfile_parser/swainlab_parser.py
+++ b/src/logfile_parser/swainlab_parser.py
@@ -178,8 +178,8 @@ def parse_fields(
         + EOL
     )
     parser = (
-        start_trigger + EOL + Group(OneOrMore(line))
-    )  # + end_trigger.suppress()
+        start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress()
+    )
     parser_result = parser.search_string(string)
     results = parser_result.as_list()
     assert len(results), "Parsing returned nothing"
-- 
GitLab