diff --git a/io/cells.py b/io/cells.py index 1fde5978ed25e9413977cea33ca6baefebab9ebe..aee65562d914487786341c18419c58c5b30cafea 100644 --- a/io/cells.py +++ b/io/cells.py @@ -298,128 +298,88 @@ class CellsLinear(CellsHDF): rand = np.random.randint(mat.sum()) return (traps[rand], tps[rand]) + def mothers_in_trap(self, trap_id: int): + return self.mothers[trap_id] -# class CellsMat(Cells): -# def __init__(self, mat_object): -# super(CellsMat, self).__init__() -# # TODO add __contains__ to the matObject -# timelapse_traps = mat_object.get( -# "timelapseTrapsOmero", mat_object.get("timelapseTraps", None) -# ) -# if timelapse_traps is None: -# raise NotImplementedError( -# "Could not find a timelapseTraps or " -# "timelapseTrapsOmero object. Cells " -# "from cellResults not implemented" -# ) -# else: -# self.trap_info = timelapse_traps["cTimepoint"]["trapInfo"] - -# if isinstance(self.trap_info, list): -# self.trap_info = { -# k: list([res.get(k, []) for res in self.trap_info]) -# for k in self.trap_info[0].keys() -# } - -# def where(self, cell_id, trap_id): -# times, indices = zip( -# *[ -# (tp, np.where(cell_id == x)[0][0]) -# for tp, x in enumerate(self.trap_info["cellLabel"][:, trap_id].tolist()) -# if np.any(cell_id == x) -# ] -# ) -# return times, indices - -# def outline(self, cell_id, trap_id): -# times, indices = self.where(cell_id, trap_id) -# info = self.trap_info["cell"][times, trap_id] - -# def get_segmented(cell, index): -# if cell["segmented"].ndim == 0: -# return cell["segmented"][()].todense() -# else: -# return cell["segmented"][index].todense() - -# segmentation_outline = [ -# get_segmented(cell, idx) for idx, cell in zip(indices, info) -# ] -# return times, np.array(segmentation_outline) - -# def mask(self, cell_id, trap_id): -# times, outlines = self.outline(cell_id, trap_id) -# return times, np.array( -# [ndimage.morphology.binary_fill_holes(o) for o in outlines] -# ) - -# def at_time(self, timepoint, kind="outline"): - -# """Returns the segmentations for all the cells at a given timepoint. - -# FIXME: this is extremely hacky and accounts for differently saved -# results in the matlab object. Deprecate ASAP. -# """ -# # Case 1: only one cell per trap: trap_info['cell'][timepoint] is a -# # structured array -# if isinstance(self.trap_info["cell"][timepoint], dict): -# segmentations = [ -# self._astype(x, "outline") -# for x in self.trap_info["cell"][timepoint]["segmented"] -# ] -# # Case 2: Multiple cells per trap: it becomes a list of arrays or -# # dictionaries, one for each trap -# # Case 2.1 : it's a dictionary -# elif isinstance(self.trap_info["cell"][timepoint][0], dict): -# segmentations = [] -# for x in self.trap_info["cell"][timepoint]: -# seg = x["segmented"] -# if not isinstance(seg, np.ndarray): -# seg = [seg] -# segmentations.append([self._astype(y, "outline") for y in seg]) -# # Case 2.2 : it's an array -# else: -# segmentations = [ -# [self._astype(y, type) for y in x["segmented"]] if x.ndim != 0 else [] -# for x in self.trap_info["cell"][timepoint] -# ] -# # Return dict for compatibility with hdf5 output -# return {i: v for i, v in enumerate(segmentations)} - -# def labels_at_time(self, tp): -# labels = self.trap_info["cellLabel"] -# labels = [_aslist(x) for x in labels[tp]] -# labels = {i: [lbl for lbl in lblset] for i, lblset in enumerate(labels)} -# return labels - -# @property -# def ntraps(self): -# return len(self.trap_info["cellLabel"][0]) - -# @property -# def tile_size(self): -# pass - - -# class ExtractionRunner: -# """An object to run extraction of fluorescence, and general data out of -# segmented data. - -# Configure with what extraction we want to run. -# Cell selection criteria. -# Filtering criteria. -# """ - -# def __init__(self, tiler, cells): -# pass - -# def run(self, keys, store, **kwargs): -# pass - - -# def _aslist(x): -# if isinstance(x, Iterable): -# if hasattr(x, "tolist"): -# x = x.tolist() -# else: -# x = [x] -# return x + @property + def mothers(self): + """ + Return nested list with final prediction of mother id for each cell + """ + with h5py.File(self.filename, "r") as f: + + return self.mother_assign_from_dynamic( + self["mother_assign_dynamic"], + self["cell_label"], + self["trap"], + self.ntraps, + ) + + @property + def mothers_daughters(self): + nested_massign = self.mothers + + if sum([x for y in nested_massign for x in y]): + + idx = set( + [ + (tid, i + 1) + for tid, x in enumerate(nested_massign) + for i in range(len(x)) + ] + ) + mothers, daughters = zip( + *[ + ((tid, m), (tid, d)) + for tid, trapcells in enumerate(nested_massign) + for d, m in enumerate(trapcells, 1) + if m + ] + ) + else: + mothers, daughters = ([], []) + # print("Warning:Cells: No mother-daughters assigned") + + return mothers, daughters + + @staticmethod + def mother_assign_to_mb_matrix(ma: t.List[np.array]): + # Convert from list of lists to mother_bud sparse matrix + ncells = sum([len(t) for t in ma]) + mb_matrix = np.zeros((ncells, ncells), dtype=bool) + c = 0 + for cells in ma: + for d, m in enumerate(cells): + if m: + mb_matrix[c + d, c + m - 1] = True + + c += len(cells) + + return mb_matrix + + @staticmethod + def mother_assign_from_dynamic( + ma, cell_label: t.List[int], trap: t.List[int], ntraps: int + ): + """ + Interpolate the list of lists containing the associated mothers from the mother_assign_dynamic feature + """ + idlist = list(zip(trap, cell_label)) + cell_gid = np.unique(idlist, axis=0) + + last_lin_preds = [ + find_1st( + ((cell_label[::-1] == lbl) & (trap[::-1] == tr)), + True, + cmp_equal, + ) + for tr, lbl in cell_gid + ] + mother_assign_sorted = ma[::-1][last_lin_preds] + + traps = cell_gid[:, 0] + iterator = groupby(zip(traps, mother_assign_sorted), lambda x: x[0]) + d = {key: [x[1] for x in group] for key, group in iterator} + nested_massign = [d.get(i, []) for i in range(ntraps)] + + return nested_massign