Commit 2a030b66 authored by pswain

docs for metadata

parent d43d5710
""" """
Anthology of interfaces fordispatch_metadata_parse different parsers and lack of them. Aliby decides on using different metadata parsers based on two elements:
1. The parameter given by PipelineParameters (either True/False or a string
ALIBY decides on using different metadata parsers based on two elements: pointing to the metadata file)
2. The available files in the root folder where images are found (either
1. The parameter given by PipelineParameters (Either True/False, or a string pointing to the metadata file) remote or locally).
2. The available files in the root folder where images are found (remote or locally)
If parameters is a string pointing to a metadata file, Aliby picks a parser
If parameters is a string pointing to a metadata file, ALIBY picks a parser based on the file format. based on the file format.
If parameters is True (as a boolean), ALIBY searches for any available file and uses the first valid one. If parameters is True, Aliby searches for any available file and uses the
If there are no metadata files, ALIBY requires indicating indices for tiler, segmentation and extraction. first valid one.
If there are no metadata files, Aliby requires indices in the tiff file names
for tiler, segmentation, and extraction.
WARNING: grammars depend on the directory structure of a local log-file_parser
repository.
""" """
import glob
import logging
@@ -27,28 +31,32 @@ from logfile_parser.swainlab_parser import parse_from_swainlab_grammar
class MetaData:
    """Metadata process that loads and parses log files."""

    def __init__(self, log_dir, store):
        """Initialise with log-file directory and h5 location to write."""
        self.log_dir = log_dir
        self.store = store
        self.metadata_writer = Writer(self.store)

    def __getitem__(self, item):
        """Load log and access item in resulting meta data dictionary."""
        return self.load_logs()[item]

    def load_logs(self):
        """Load log using a hierarchy of parsers."""
        parsed_flattened = dispatch_metadata_parser(self.log_dir)
        return parsed_flattened

    def run(self, overwrite=False):
        """Load and parse logs and write to h5 file."""
        metadata_dict = self.load_logs()
        self.metadata_writer.write(
            path="/", meta=metadata_dict, overwrite=overwrite
        )

    def add_field(self, field_name, field_value, **kwargs):
        """Write a field and its values to the h5 file."""
        self.metadata_writer.write(
            path="/",
            meta={field_name: field_value},
@@ -56,94 +64,87 @@ class MetaData:
        )

    def add_fields(self, fields_values: dict, **kwargs):
        """Write a dict of fields and values to the h5 file."""
        for field, value in fields_values.items():
            self.add_field(field, value)
# Paradigm: able to do something with all datatypes present in log files,
# then pare down on what specific information is really useful later.
def flatten_dict(nested_dict, separator="/"):
    """
    Flatten nested dictionary because h5 attributes cannot be dicts.

    If empty return as-is.
    """
    flattened = {}
    if nested_dict:
        df = pd.json_normalize(nested_dict, sep=separator)
        flattened = df.to_dict(orient="records")[0] or {}
    return flattened
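
# Illustrative sketch (hypothetical keys, not from the original file):
#     flatten_dict({"channels": {"GFP": {"exposure": 100}}})
# returns {"channels/GFP/exposure": 100}, which h5 attributes can store.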
def datetime_to_timestamp(time, locale="Europe/London"):
    """Convert datetime object to UNIX timestamp."""
    # h5 attributes do not support datetime objects;
    # localising to the time zone takes care of daylight saving
    return timezone(locale).localize(time).timestamp()
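
# Illustrative sketch (hypothetical date, not from the original file):
#     datetime_to_timestamp(datetime(2023, 7, 1, 12, 0))
# localises the naive datetime to Europe/London (BST, UTC+1) before taking
# the UNIX timestamp, giving 1688209200.0 rather than the naive-UTC value.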
def find_file(root_dir, regex):
    """Find files in a directory using regex."""
    # ignore aliby.log files
    file = [
        f
        for f in glob.glob(os.path.join(str(root_dir), regex))
        if Path(f).name != "aliby.log"
    ]
    if len(file) == 0:
        logging.getLogger("aliby").log(
            logging.WARNING, "Metadata: No valid swainlab .log found."
        )
        return None
    elif len(file) > 1:
        print(
            "Warning:Metadata: More than one log file found."
            " Defaulting to first option."
        )
        return sorted(file)[0]
    else:
        return file[0]
# TODO: re-write this as a class if appropriate
def parse_logfiles(
    root_dir,
    acq_grammar="multiDGUI_acq_format.json",
    log_grammar="multiDGUI_log_format.json",
):
    """
    Parse acq and log files using the grammar specified.

    Merge results into a single dict.
    """
    log_parser = Parser(log_grammar)
    acq_parser = Parser(acq_grammar)
    # an example log file is 'flavin_htb2_glucose_long_ramp_Delftlog.txt'
    log_file = find_file(root_dir, "*log.txt")
    # an example acq file is 'flavin_htb2_glucose_long_ramp_DelftAcq.txt'
    acq_file = find_file(root_dir, "*[Aa]cq.txt")
    # parse into a single dict
    parsed = {}
    if log_file and acq_file:
        with open(log_file, "r") as f:
            log_parsed = log_parser.parse(f)
        with open(acq_file, "r") as f:
            acq_parsed = acq_parser.parse(f)
        parsed = {**acq_parsed, **log_parsed}
    # convert datetime objects to time stamps
    for key, value in parsed.items():
        if isinstance(value, datetime):
            parsed[key] = datetime_to_timestamp(value)
    # flatten dict
    parsed_flattened = flatten_dict(parsed)
    for k, v in parsed_flattened.items():
        if isinstance(v, list):
            # replace None with 0
            parsed_flattened[k] = [0 if el is None else el for el in v]
    return parsed_flattened
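
# Illustrative sketch of the merged, flattened output (field names other than
# "channels/channel" are hypothetical):
#     {"channels/channel": ["Brightfield", "GFP"],
#      "time_settings/ntimepoints": 180,
#      "date": 1688209200.0}
# with datetime values already converted to UNIX timestamps and None entries
# in lists replaced by 0.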
@@ -151,49 +152,37 @@ def get_meta_swainlab(parsed_metadata: dict):
""" """
Convert raw parsing of Swainlab logfile to the metadata interface. Convert raw parsing of Swainlab logfile to the metadata interface.
Input: Parameters
-------- --------
parsed_metadata: Dict[str, str or int or DataFrame or Dict] parsed_metadata: dict[str, str or int or DataFrame or Dict]
default['general', 'image_config', 'device_properties', 'group_position', 'group_time', 'group_config'] default['general', 'image_config', 'device_properties',
'group_position', 'group_time', 'group_config']
Returns: Returns
-------- --------
Dictionary with metadata following the standard Dict with channels metadata
""" """
channels = parsed_metadata["image_config"]["Image config"].values.tolist() channels = parsed_metadata["image_config"]["Image config"].values.tolist()
# nframes = int(parsed_metadata["group_time"]["frames"].max())
# return {"channels": channels, "nframes": nframes}
return {"channels": channels} return {"channels": channels}
def get_meta_from_legacy(parsed_metadata: dict):
    """Fix naming convention for channels in legacy .txt log files."""
    result = parsed_metadata
    result["channels"] = result["channels/channel"]
    return result
def parse_swainlab_metadata(filedir: t.Union[str, Path]):
    """Parse new (.log) and old (.txt) log files in a directory into a dict."""
    filedir = Path(filedir)
    filepath = find_file(filedir, "*.log")
    if filepath:
        # new log files ending in .log
        raw_parse = parse_from_swainlab_grammar(filepath)
        minimal_meta = get_meta_swainlab(raw_parse)
    else:
        # old log files ending in .txt
        if filedir.is_file() or str(filedir).endswith(".zarr"):
            # log file is in parent directory
            filedir = filedir.parent
@@ -210,51 +199,51 @@ def dispatch_metadata_parser(filepath: t.Union[str, Path]):
    Currently only contains the swainlab log parsers.

    Parameters
    --------
    filepath: str
        File containing metadata or folder containing naming conventions.
    """
    parsed_meta = parse_swainlab_metadata(filepath)
    if parsed_meta is None:
        # try to deduce metadata from the naming convention of tiff files
        parsed_meta = dir_to_meta(Path(filepath))
    return parsed_meta
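
# Illustrative usage sketch (hypothetical folder, not from the original file):
#     meta = dispatch_metadata_parser("/data/experiment_01")
#     meta.get("channels")  # e.g. ["Brightfield", "GFP"] if a log was parsed
# If no log file can be parsed, meta instead holds the sizes deduced by
# dir_to_meta, e.g. {"size_t": 180, "size_c": 2, "size_z": 5}.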
def dir_to_meta(path: Path, suffix="tiff"):
    """Deduce metadata from the naming convention of tiff files."""
    filenames = list(path.glob(f"*.{suffix}"))
    try:
        # deduce dimension order from the first filename
        dim_order = "".join(
            map(lambda x: x[0], filenames[0].stem.split("_")[1:])
        )
        # get the index values of each dimension for every filename
        dim_value = list(
            map(
                lambda f: filename_to_dict_indices(f.stem),
                path.glob(f"*.{suffix}"),
            )
        )
        maxs = [max(map(lambda x: x[dim], dim_value)) for dim in dim_order]
        mins = [min(map(lambda x: x[dim], dim_value)) for dim in dim_order]
        dim_shapes = [
            max_val - min_val + 1 for max_val, min_val in zip(maxs, mins)
        ]
        meta = {
            "size_" + dim: shape for dim, shape in zip(dim_order, dim_shapes)
        }
    except Exception as e:
        print(
            "Warning:Metadata: Cannot extract dimensions from filenames."
            f" Empty meta set {e}"
        )
        meta = {}
    return meta
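
# Illustrative sketch (hypothetical file names): for tiffs named like
# "experiment_t001_c01_z005.tiff", filename_to_dict_indices of the stem gives
# {"t": 1, "c": 1, "z": 5}, so dir_to_meta returns, e.g.,
# {"size_t": 180, "size_c": 2, "size_z": 5} from the min/max index per
# dimension.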
def filename_to_dict_indices(stem: str):
    """Convert a file name into a dict by splitting."""
    return {
        dim_number[0]: int(dim_number[1:])
        for dim_number in stem.split("_")[1:]
...