From 895d658aafbeaa25c4275195ef8cde4b70757ad2 Mon Sep 17 00:00:00 2001
From: pswain <peter.swain@ed.ac.uk>
Date: Thu, 18 Apr 2024 19:02:35 +0100
Subject: [PATCH] change(swainlab_parser): conditions on reading header
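
Merge dispatch_metadata_parser and parse_swainlab_metadata into a single
parse_metadata function, rename parse_logfiles to parse_legacy_logfiles,
and remove the dir_to_meta helpers. Delete logfile_parser/simple_parser.py,
which duplicated swainlab_parser.py. In swainlab_parser, read the log
header with latin1 encoding and rename the grammar's data_type key to
type. The pipeline now reads the number of time points from
time_settings/ntimepoints instead of ntps.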

---
 src/agora/io/metadata.py              | 234 +++++++++------------
 src/aliby/pipeline.py                 |   5 +-
 src/logfile_parser/simple_parser.py   | 286 --------------------------
 src/logfile_parser/swainlab_parser.py | 130 ++++++------
 4 files changed, 156 insertions(+), 499 deletions(-)
 delete mode 100644 src/logfile_parser/simple_parser.py

diff --git a/src/agora/io/metadata.py b/src/agora/io/metadata.py
index bed04c4..8f152b2 100644
--- a/src/agora/io/metadata.py
+++ b/src/agora/io/metadata.py
@@ -46,7 +46,7 @@ class MetaData:
 
     def load_logs(self):
         """Load log using a hierarchy of parsers."""
-        parsed_flattened = dispatch_metadata_parser(self.log_dir)
+        parsed_flattened = parse_metadata(self.log_dir)
         return parsed_flattened
 
     def run(self, overwrite=False):
@@ -70,23 +70,36 @@ class MetaData:
             self.add_field(field, value)
 
 
-def flatten_dict(nested_dict, separator="/"):
-    """
-    Flatten nested dictionary because h5 attributes cannot be dicts.
-
-    If empty return as-is.
+def parse_metadata(filedir: t.Union[str, Path]):
     """
-    flattened = {}
-    if nested_dict:
-        df = pd.json_normalize(nested_dict, sep=separator)
-        flattened = df.to_dict(orient="records")[0] or {}
-    return flattened
+    Dispatch different metadata parsers that convert logfiles into a dictionary.
 
+    Currently supports only the Swain lab log parsers.
 
-def datetime_to_timestamp(time, locale="Europe/London"):
-    """Convert datetime object to UNIX timestamp."""
-    # h5 attributes do not support datetime objects
-    return timezone(locale).localize(time).timestamp()
+    Parameters
+    --------
+    filedir: str or Path
+        File containing metadata, or a directory containing the log files.
+    """
+    filedir = Path(filedir)
+    if filedir.is_file() or str(filedir).endswith(".zarr"):
+        # log file is in parent directory
+        filedir = filedir.parent
+    filepath = find_file(filedir, "*.log")
+    if filepath:
+        # new log files ending in .log
+        raw_parse = parse_from_swainlab_grammar(filepath)
+        minimal_meta = get_minimal_meta_swainlab(raw_parse)
+    else:
+        # legacy log files ending in .txt
+        legacy_parse = parse_legacy_logfiles(filedir)
+        minimal_meta = (
+            get_meta_from_legacy(legacy_parse) if legacy_parse else {}
+        )
+    if minimal_meta is None:
+        raise Exception("No metadata found.")
+    else:
+        return minimal_meta
 
 
 def find_file(root_dir, regex):
@@ -109,39 +122,31 @@ def find_file(root_dir, regex):
         return file[0]
 
 
-def parse_logfiles(
-    root_dir,
-    acq_grammar="multiDGUI_acq_format.json",
-    log_grammar="multiDGUI_log_format.json",
-):
+def get_minimal_meta_swainlab(parsed_metadata: dict):
     """
-    Parse acq and log files using the grammar specified.
+    Extract channels and time settings from parsed metadata.
 
-    Merge results into a single dict.
+    Parameters
+    --------
+    parsed_metadata: dict[str, str or int or DataFrame or Dict]
+        default['general', 'image_config', 'device_properties',
+                'group_position', 'group_time', 'group_config']
+
+    Returns
+    --------
+    Dict with channel and time-settings metadata.
     """
-    log_parser = Parser(log_grammar)
-    acq_parser = Parser(acq_grammar)
-    log_file = find_file(root_dir, "*log.txt")
-    acq_file = find_file(root_dir, "*[Aa]cq.txt")
-    # parse into a single dict
-    parsed = {}
-    if log_file and acq_file:
-        with open(log_file, "r") as f:
-            log_parsed = log_parser.parse(f)
-        with open(acq_file, "r") as f:
-            acq_parsed = acq_parser.parse(f)
-        parsed = {**acq_parsed, **log_parsed}
-    # convert data to having time stamps
-    for key, value in parsed.items():
-        if isinstance(value, datetime):
-            parsed[key] = datetime_to_timestamp(value)
-    # flatten dict
-    parsed_flattened = flatten_dict(parsed)
-    for k, v in parsed_flattened.items():
-        if isinstance(v, list):
-            # replace None with 0
-            parsed_flattened[k] = [0 if el is None else el for el in v]
-    return parsed_flattened
+    channels_dict = find_channels_by_position(parsed_metadata["group_config"])
+    channels = parsed_metadata["image_config"]["Image config"].values.tolist()
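+    # take the largest frame count and the smallest imaging interval across groups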
+    ntps = parsed_metadata["group_time"]["frames"].max()
+    timeinterval = parsed_metadata["group_time"]["interval"].min()
+    minimal_meta = {
+        "channels_by_group": channels_dict,
+        "channels": channels,
+        "time_settings/ntimepoints": int(ntps),
+        "time_settings/timeinterval": int(timeinterval),
+    }
+    return minimal_meta
 
 
 def find_channels_by_position(meta):
@@ -157,7 +162,7 @@ def find_channels_by_position(meta):
             for channel in imaging_channels:
                 if meta.loc[group, channel] is not None:
                     channels_dict[group].append(channel)
-    elif isinstance(meta, dict):
+    elif isinstance(meta, dict) and "positions/posname" in meta:
         channels_dict = {
             position_name: [] for position_name in meta["positions/posname"]
         }
@@ -174,31 +179,42 @@ def find_channels_by_position(meta):
     return channels_dict
 
 
-def get_minimal_meta_swainlab(parsed_metadata: dict):
-    """
-    Extract channels from parsed metadata.
+### legacy code for acq and log files ###
 
-    Parameters
-    --------
-    parsed_metadata: dict[str, str or int or DataFrame or Dict]
-        default['general', 'image_config', 'device_properties',
-                'group_position', 'group_time', 'group_config']
 
-    Returns
-    --------
-    Dict with channels metadata
+def parse_legacy_logfiles(
+    root_dir,
+    acq_grammar="multiDGUI_acq_format.json",
+    log_grammar="multiDGUI_log_format.json",
+):
     """
-    channels_dict = find_channels_by_position(parsed_metadata["group_config"])
-    channels = parsed_metadata["image_config"]["Image config"].values.tolist()
-    ntps = parsed_metadata["group_time"]["frames"].max()
-    timeinterval = parsed_metadata["group_time"]["interval"].min()
-    minimal_meta = {
-        "channels_by_group": channels_dict,
-        "channels": channels,
-        "time_settings/ntimepoints": int(ntps),
-        "time_settings/timeinterval": int(timeinterval),
-    }
-    return minimal_meta
+    Parse acq and log files using the grammar specified.
+
+    Merge results into a single dict.
+    """
+    log_parser = Parser(log_grammar)
+    acq_parser = Parser(acq_grammar)
+    log_file = find_file(root_dir, "*log.txt")
+    acq_file = find_file(root_dir, "*[Aa]cq.txt")
+    # parse into a single dict
+    parsed = {}
+    if log_file and acq_file:
+        with open(log_file, "r") as f:
+            log_parsed = log_parser.parse(f)
+        with open(acq_file, "r") as f:
+            acq_parsed = acq_parser.parse(f)
+        parsed = {**acq_parsed, **log_parsed}
+    # convert datetime objects to UNIX timestamps
+    for key, value in parsed.items():
+        if isinstance(value, datetime):
+            parsed[key] = datetime_to_timestamp(value)
+    # flatten dict
+    parsed_flattened = flatten_dict(parsed)
+    for k, v in parsed_flattened.items():
+        if isinstance(v, list):
+            # replace None with 0
+            parsed_flattened[k] = [0 if el is None else el for el in v]
+    return parsed_flattened
 
 
 def get_meta_from_legacy(parsed_metadata: dict):
@@ -208,78 +224,20 @@ def get_meta_from_legacy(parsed_metadata: dict):
     return result
 
 
-def parse_swainlab_metadata(filedir: t.Union[str, Path]):
-    """Parse new, .log, and old, .txt, files in a directory into a dict."""
-    filedir = Path(filedir)
-    if filedir.is_file() or str(filedir).endswith(".zarr"):
-        # log file is in parent directory
-        filedir = filedir.parent
-    filepath = find_file(filedir, "*.log")
-    if filepath:
-        # new log files ending in .log
-        raw_parse = parse_from_swainlab_grammar(filepath)
-        minimal_meta = get_minimal_meta_swainlab(raw_parse)
-    else:
-        # old log files ending in .txt
-        legacy_parse = parse_logfiles(filedir)
-        minimal_meta = (
-            get_meta_from_legacy(legacy_parse) if legacy_parse else {}
-        )
-    return minimal_meta
-
-
-def dispatch_metadata_parser(filepath: t.Union[str, Path]):
+def flatten_dict(nested_dict, separator="/"):
     """
-    Dispatch different metadata parsers that convert logfiles into a dictionary.
-
-    Currently only contains the swainlab log parsers.
+    Flatten nested dictionary because h5 attributes cannot be dicts.
 
-    Parameters
-    --------
-    filepath: str
-        File containing metadata or folder containing naming conventions.
+    If empty return as-is.
     """
-    parsed_meta = parse_swainlab_metadata(filepath)
-    if parsed_meta is None:
-        # try to deduce metadata
-        parsed_meta = dir_to_meta
-    return parsed_meta
-
-
-def dir_to_meta(path: Path, suffix="tiff"):
-    """Deduce meta data from the naming convention of tiff files."""
-    filenames = list(path.glob(f"*.{suffix}"))
-    try:
-        # deduce order from filenames
-        dim_order = "".join(
-            map(lambda x: x[0], filenames[0].stem.split("_")[1:])
-        )
-        dim_value = list(
-            map(
-                lambda f: filename_to_dict_indices(f.stem),
-                path.glob("*.tiff"),
-            )
-        )
-        maxs = [max(map(lambda x: x[dim], dim_value)) for dim in dim_order]
-        mins = [min(map(lambda x: x[dim], dim_value)) for dim in dim_order]
-        dim_shapes = [
-            max_val - min_val + 1 for max_val, min_val in zip(maxs, mins)
-        ]
-        meta = {
-            "size_" + dim: shape for dim, shape in zip(dim_order, dim_shapes)
-        }
-    except Exception as e:
-        print(
-            "Warning:Metadata: Cannot extract dimensions from filenames."
-            f" Empty meta set {e}"
-        )
-        meta = {}
-    return meta
+    flattened = {}
+    if nested_dict:
+        df = pd.json_normalize(nested_dict, sep=separator)
+        flattened = df.to_dict(orient="records")[0] or {}
+    return flattened
 
 
-def filename_to_dict_indices(stem: str):
-    """Convert a file name into a dict by splitting."""
-    return {
-        dim_number[0]: int(dim_number[1:])
-        for dim_number in stem.split("_")[1:]
-    }
+def datetime_to_timestamp(time, locale="Europe/London"):
+    """Convert datetime object to UNIX timestamp."""
+    # h5 attributes do not support datetime objects
+    return timezone(locale).localize(time).timestamp()
diff --git a/src/aliby/pipeline.py b/src/aliby/pipeline.py
index ef1e8a8..e185bec 100644
--- a/src/aliby/pipeline.py
+++ b/src/aliby/pipeline.py
@@ -115,17 +115,16 @@ class PipelineParameters(ParametersABC):
             )
             minimal_default_meta = {
                 "channels": ["Brightfield"],
-                "ntps": [2000],
+                "time_settings/ntimepoints": [2000],
             }
             # set minimal metadata
             meta_d = minimal_default_meta
         # define default values for general parameters
-        tps = meta_d.get("ntps", 2000)
         defaults = {
             "general": dict(
                 id=expt_id,
                 distributed=0,
-                tps=tps,
+                tps=meta_d["time_settings/ntimepoints"],
                 directory=str(directory.parent),
                 filter="",
                 earlystop=global_parameters.earlystop,
diff --git a/src/logfile_parser/simple_parser.py b/src/logfile_parser/simple_parser.py
deleted file mode 100644
index b2f29b3..0000000
--- a/src/logfile_parser/simple_parser.py
+++ /dev/null
@@ -1,286 +0,0 @@
-#!/usr/bin/env jupyter
-# TODO should this be merged to the regular logfile_parser structure?
-"""
-Description of new logfile:
-
-All three conditions are concatenated in a single file, in this order:
- - Experiment basic information  (URL in acquisition PC, project, user input)
- - Acquisition settings
- - Experiment start
-
-The section separators are:
------Acquisition settings-----
------Experiment started-----
-
-And for a successfully finished experiment we get:
-
-YYYY-MM-DD HH:mm:ss,ms*3 Image acquisition complete WeekDay Mon Day  HH:mm:ss,ms*3 YYYY
-
-For example:
-2022-09-30 05:40:59,765 Image acquisition complete Fri Sep 30 05:40:59 2022
-
-Data to extract:
-* Basic information
- - Experiment details, which may indicate technical issues
- -  GIT commit
- - (Not working as of 2022/10/03, but projects and tags)
-* Basic information
- -
-
-{'channels_by_group': {'PDR5_GFP': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
-'Ilv3_mCherry': ['Brightfield', 'GFP', 'cy5', 'mCherry'], '
-Yor1_GFP': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
-'Snq2_GFP': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
-'Pdr5_mCherry_pdr1_pdr3': ['Brightfield', 'GFP', 'cy5', 'mCherry']},
-'channels': ['Brightfield', 'GFP', 'cy5', 'mCherry'],
-'time_settings/ntimepoints': 240,
-'time_settings/timeinterval': 300}
-
-
-New grammar
-
-- Tables are assumed to end with an empty line.
-"""
-
-import logging
-import typing as t
-from pathlib import Path
-
-import pandas as pd
-from pyparsing import (
-    CharsNotIn,
-    Combine,
-    Group,
-    Keyword,
-    LineEnd,
-    LineStart,
-    Literal,
-    OneOrMore,
-    ParserElement,
-    Word,
-    printables,
-)
-
-atomic = t.Union[str, int, float, bool]
-
-# specify grammar for the Swain lab
-sl_grammar = {
-    "group": {
-        "position": {
-            "start_trigger": Group(
-                Group(Literal("group:") + Word(printables))
-                + Group(Literal("field:") + "position")
-            ),
-            "data_type": "table",
-        },
-        **{
-            key: {
-                "start_trigger": Group(
-                    Group(Literal("group:") + Word(printables))
-                    + Group(Literal("field:") + key)
-                ),
-                "data_type": "fields",
-            }
-            for key in ("time", "config")
-        },
-    },
-}
-
-ACQ_START = "-----Acquisition settings-----"
-HEADER_END = "-----Experiment started-----"
-MAX_NLINES = 2000  # In case of malformed logfile
-
-ParserElement.setDefaultWhitespaceChars(" \t")
-
-
-class HeaderEndNotFound(Exception):
-    def __init__(self, message, errors):
-        super().__init__(message)
-        self.errors = errors
-
-
-def extract_header(filepath: Path):
-    """Extract content of log file before the experiment starts."""
-    with open(filepath, "r", errors="ignore", encoding="unicode_escape") as f:
-        try:
-            header = ""
-            for _ in range(MAX_NLINES):
-                line = f.readline()
-                if ":" in line:
-                    header += line
-                if HEADER_END in line:
-                    break
-        except HeaderEndNotFound as e:
-            print(f"{MAX_NLINES} checked and no header found.")
-            raise (e)
-        return header
-
-
-def parse_from_swainlab_grammar(filepath: t.Union[str, Path]):
-    """Parse using a grammar for the Swain lab."""
-    return parse_from_grammar(filepath, sl_grammar)
-
-
-def parse_from_grammar(filepath: str, grammar: t.Dict):
-    """Parse a file using the specified grammar."""
-    header = extract_header(filepath)
-    d = {}
-    for key, values in grammar.items():
-        try:
-            if "data_type" in values:
-                # data_type for parse_x defined in values
-                d[key] = parse_x(header, **values)
-            else:
-                # use sub keys to parse groups
-                for subkey, subvalues in values.items():
-                    subkey = "_".join((key, subkey))
-                    d[subkey] = parse_x(header, **subvalues)
-        except Exception as e:
-            logging.getLogger("aliby").critical(
-                f"Parsing failed for key {key} and values {values}."
-            )
-            raise (e)
-    return d
-
-
-def parse_x(string, data_type, **kwargs):
-    """Parse a string using a function specifed by data_type."""
-    res_dict = eval(f"parse_{data_type}(string, **kwargs)")
-    return res_dict
-
-
-def parse_fields(
-    string: str, start_trigger, end_trigger=None
-) -> t.Union[pd.DataFrame, t.Dict[str, atomic]]:
-    """
-    Parse fields as key-value pairs.
-
-    By default the end is an empty newline.
-
-    For example
-
-    group: YST_1510 field: time
-    start: 0
-    interval: 300
-    frames: 180
-    """
-    EOL = LineEnd().suppress()
-    if end_trigger is None:
-        end_trigger = EOL
-    elif isinstance(end_trigger, str):
-        end_trigger = Literal(end_trigger)
-    field = OneOrMore(CharsNotIn(":\n"))
-    line = (
-        LineStart()
-        + Group(field + Combine(OneOrMore(Literal(":").suppress() + field)))
-        + EOL
-    )
-    parser = (
-        start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress()
-    )
-    parser_result = parser.search_string(string)
-    breakpoint()
-    results = parser_result.as_list()
-    assert len(results), "Parsing returned nothing"
-    return fields_to_dict_or_table(results)
-
-
-def parse_table(
-    string: str,
-    start_trigger: t.Union[str, Keyword],
-) -> pd.DataFrame:
-    """
-    Parse csv-like table.
-
-    Parameters
-    ----------
-    string : str
-        contents to parse
-    start_trigger : t.Union[str, t.Collection]
-        string or triggers that indicate section start.
-
-    Returns
-    -------
-    pd.Dataframe or dict of atomic values (int,str,bool,float)
-        DataFrame representing table.
-
-    Examples
-    --------
-    >>> table = parse_table()
-    """
-    if isinstance(start_trigger, str):
-        start_trigger: Keyword = Keyword(start_trigger)
-    EOL = LineEnd().suppress()
-    field = OneOrMore(CharsNotIn(":,\n"))
-    line = LineStart() + Group(
-        OneOrMore(field + Literal(",").suppress()) + field + EOL
-    )
-    parser = (
-        start_trigger
-        + EOL
-        + Group(OneOrMore(line))
-        + EOL  # end_trigger.suppress()
-    )
-    parser_result = parser.search_string(string)
-    assert all(
-        [len(row) == len(parser_result[0]) for row in parser_result]
-    ), f"Table {start_trigger} has unequal number of columns"
-    assert len(parser_result), f"Parsing is empty. {parser}"
-    return table_to_df(parser_result.as_list())
-
-
-def table_to_df(result: t.List[t.List]):
-    if len(result) > 1:  # Multiple tables with ids to append
-        # Generate multiindex from "Name column"
-        # index = [row[1][0][1] for table in result for row in table]
-        # table[1][0].index("Name") # for automatic indexing
-        from itertools import product
-
-        group_name = [
-            product((table[0][0][1],), (row[0] for row in table[1][1:]))
-            for table in result
-        ]
-        tmp = [pair for pairset in group_name for pair in pairset]
-        multiindices = pd.MultiIndex.from_tuples(tmp)
-        df = pd.DataFrame(
-            [row for pr in result for row in pr[1][1:]],
-            columns=result[0][1][0],
-            index=multiindices,
-        )
-        df.name = result[0][0][1][1]
-    else:  # If it is a single table
-        df = pd.DataFrame(result[0][1][1:], columns=result[0][1][0])
-
-    return df
-
-
-def fields_to_dict_or_table(result: t.List[t.List]):
-    if len(result) > 1:
-        formatted = pd.DataFrame(
-            [[row[1] for row in pr[1]] for pr in result],
-            columns=[x[0] for x in result[0][1]],
-            index=[x[0][0][1] for x in result],
-        )
-
-        formatted.name = result[0][0][1][1]
-
-    else:  # If it is a single table
-        formatted = {k: _cast_type(v) for k, v in dict(result[0][1]).items()}
-
-    return formatted
-
-
-def _cast_type(x: str) -> t.Union[str, int, float, bool]:
-    # Convert to any possible when possible
-    x = x.strip()
-    if x.isdigit():
-        x = int(x)
-    else:
-        try:
-            x = float(x)
-        except:
-            try:
-                x = ("false", "true").index(x.lower())
-            except:
-                pass
-    return x
diff --git a/src/logfile_parser/swainlab_parser.py b/src/logfile_parser/swainlab_parser.py
index 67e3433..a461bb3 100644
--- a/src/logfile_parser/swainlab_parser.py
+++ b/src/logfile_parser/swainlab_parser.py
@@ -34,7 +34,7 @@ New grammar
 
 import logging
 import typing as t
-from pathlib import Path
+from pathlib import PosixPath
 
 import pandas as pd
 from pyparsing import (
@@ -53,20 +53,20 @@ from pyparsing import (
 
 atomic = t.Union[str, int, float, bool]
 
-# specify grammar for the Swain lab
-sl_grammar = {
+# Grammar specification
+grammar = {
     "general": {
         "start_trigger": Literal("Swain Lab microscope experiment log file"),
-        "data_type": "fields",
+        "type": "fields",
         "end_trigger": "-----Acquisition settings-----",
     },
     "image_config": {
         "start_trigger": "Image Configs:",
-        "data_type": "table",
+        "type": "table",
     },
     "device_properties": {
         "start_trigger": "Device properties:",
-        "data_type": "table",
+        "type": "table",
     },
     "group": {
         "position": {
@@ -74,7 +74,7 @@ sl_grammar = {
                 Group(Literal("group:") + Word(printables))
                 + Group(Literal("field:") + "position")
             ),
-            "data_type": "table",
+            "type": "table",
         },
         **{
             key: {
@@ -82,29 +82,24 @@ sl_grammar = {
                     Group(Literal("group:") + Word(printables))
                     + Group(Literal("field:") + key)
                 ),
-                "data_type": "fields",
+                "type": "fields",
             }
             for key in ("time", "config")
         },
     },
 }
 
-ACQ_START = "-----Acquisition settings-----"
+
 HEADER_END = "-----Experiment started-----"
 MAX_NLINES = 2000  # In case of malformed logfile
 
 ParserElement.setDefaultWhitespaceChars(" \t")
 
 
-class HeaderEndNotFound(Exception):
-    def __init__(self, message, errors):
-        super().__init__(message)
-        self.errors = errors
-
-
-def extract_header(filepath: Path):
-    """Extract content of log file before the experiment starts."""
-    with open(filepath, "r", errors="ignore", encoding="unicode_escape") as f:
+def extract_header(filepath: PosixPath):
+    """Extract content of log file upto HEADER_END."""
+    with open(filepath, "r", encoding="latin1") as f:
+        # with open(filepath, "r", errors="ignore", encoding="unicode_escape") as f:
         try:
             header = ""
             for _ in range(MAX_NLINES):
@@ -112,78 +107,39 @@ def extract_header(filepath: Path):
                 header += line
                 if HEADER_END in line:
                     break
-        except HeaderEndNotFound as e:
-            print(f"{MAX_NLINES} checked and no header found.")
-            raise (e)
+        except Exception as e:
+            raise Exception(f"{MAX_NLINES} lines checked and no header found.") from e
         return header
 
 
-def parse_from_swainlab_grammar(filepath: t.Union[str, Path]):
-    """Parse using a grammar for the Swain lab."""
-    return parse_from_grammar(filepath, sl_grammar)
-
-
 def parse_from_grammar(filepath: str, grammar: t.Dict):
     """Parse a file using the specified grammar."""
     header = extract_header(filepath)
     d = {}
     for key, values in grammar.items():
         try:
-            if "data_type" in values:
-                # data_type for parse_x defined in values
+            if "type" in values:
                 d[key] = parse_x(header, **values)
-            else:
-                # use sub keys to parse groups
+            else:  # Use subkeys to parse groups
                 for subkey, subvalues in values.items():
                     subkey = "_".join((key, subkey))
                     d[subkey] = parse_x(header, **subvalues)
         except Exception as e:
             logging.getLogger("aliby").critical(
-                f"Parsing failed for key {key} and values {values}."
+                f"Parsing failed for key {key} and values {values}"
             )
             raise (e)
     return d
 
 
-def parse_x(string, data_type, **kwargs):
+def parse_x(string: str, type: str, **kwargs):
     """Parse a string using a function specifed by data_type."""
-    res_dict = eval(f"parse_{data_type}(string, **kwargs)")
-    return res_dict
-
-
-def parse_fields(
-    string: str, start_trigger, end_trigger=None
-) -> t.Union[pd.DataFrame, t.Dict[str, atomic]]:
-    """
-    Parse fields as key-value pairs.
-
-    By default the end is an empty newline.
+    return eval(f"parse_{type}(string, **kwargs)")
 
-    For example
 
-    group: YST_1510 field: time
-    start: 0
-    interval: 300
-    frames: 180
-    """
-    EOL = LineEnd().suppress()
-    if end_trigger is None:
-        end_trigger = EOL
-    elif isinstance(end_trigger, str):
-        end_trigger = Literal(end_trigger)
-    field = OneOrMore(CharsNotIn(":\n"))
-    line = (
-        LineStart()
-        + Group(field + Combine(OneOrMore(Literal(":").suppress() + field)))
-        + EOL
-    )
-    parser = (
-        start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress()
-    )
-    parser_result = parser.search_string(string)
-    results = parser_result.as_list()
-    assert len(results), "Parsing returned nothing"
-    return fields_to_dict_or_table(results)
+def parse_from_swainlab_grammar(filepath: t.Union[str, PosixPath]):
+    """Parse using a grammar for the Swain lab."""
+    return parse_from_grammar(filepath, grammar)
 
 
 def parse_table(
@@ -230,11 +186,43 @@ def parse_table(
     return table_to_df(parser_result.as_list())
 
 
+def parse_fields(
+    string: str, start_trigger, end_trigger=None
+) -> t.Union[pd.DataFrame, t.Dict[str, atomic]]:
+    """
+    Parse fields as key-value pairs.
+
+    By default the end is an empty newline.
+
+    For example
+
+    group: YST_1510 field: time
+    start: 0
+    interval: 300
+    frames: 180
+    """
+    EOL = LineEnd().suppress()
+    if end_trigger is None:
+        end_trigger = EOL
+    elif isinstance(end_trigger, str):
+        end_trigger = Literal(end_trigger)
+    field = OneOrMore(CharsNotIn(":\n"))
+    line = (
+        LineStart()
+        + Group(field + Combine(OneOrMore(Literal(":").suppress() + field)))
+        + EOL
+    )
+    parser = (
+        start_trigger + EOL + Group(OneOrMore(line)) + end_trigger.suppress()
+    )
+    parser_result = parser.search_string(string)
+    results = parser_result.as_list()
+    assert len(results), "Parsing returned nothing"
+    return fields_to_dict_or_table(results)
+
+
 def table_to_df(result: t.List[t.List]):
     if len(result) > 1:  # Multiple tables with ids to append
-        # Generate multiindex from "Name column"
-        # index = [row[1][0][1] for table in result for row in table]
-        # table[1][0].index("Name") # for automatic indexing
         from itertools import product
 
         group_name = [
@@ -251,7 +239,6 @@ def table_to_df(result: t.List[t.List]):
         df.name = result[0][0][1][1]
     else:  # If it is a single table
         df = pd.DataFrame(result[0][1][1:], columns=result[0][1][0])
-
     return df
 
 
@@ -264,7 +251,6 @@ def fields_to_dict_or_table(result: t.List[t.List]):
         )
 
         formatted.name = result[0][0][1][1]
-
     else:  # If it is a single table
         formatted = {k: _cast_type(v) for k, v in dict(result[0][1]).items()}
 
-- 
GitLab