Skip to content
Snippets Groups Projects
Commit 3164db4e authored by Alán Muñoz's avatar Alán Muñoz
Browse files

refactor(picker/cells): Lineage methods to cells

parent a37cdfd6
No related branches found
No related tags found
No related merge requests found
......@@ -298,128 +298,88 @@ class CellsLinear(CellsHDF):
rand = np.random.randint(mat.sum())
return (traps[rand], tps[rand])
def mothers_in_trap(self, trap_id: int):
return self.mothers[trap_id]
# class CellsMat(Cells):
# def __init__(self, mat_object):
# super(CellsMat, self).__init__()
# # TODO add __contains__ to the matObject
# timelapse_traps = mat_object.get(
# "timelapseTrapsOmero", mat_object.get("timelapseTraps", None)
# )
# if timelapse_traps is None:
# raise NotImplementedError(
# "Could not find a timelapseTraps or "
# "timelapseTrapsOmero object. Cells "
# "from cellResults not implemented"
# )
# else:
# self.trap_info = timelapse_traps["cTimepoint"]["trapInfo"]
# if isinstance(self.trap_info, list):
# self.trap_info = {
# k: list([res.get(k, []) for res in self.trap_info])
# for k in self.trap_info[0].keys()
# }
# def where(self, cell_id, trap_id):
# times, indices = zip(
# *[
# (tp, np.where(cell_id == x)[0][0])
# for tp, x in enumerate(self.trap_info["cellLabel"][:, trap_id].tolist())
# if np.any(cell_id == x)
# ]
# )
# return times, indices
# def outline(self, cell_id, trap_id):
# times, indices = self.where(cell_id, trap_id)
# info = self.trap_info["cell"][times, trap_id]
# def get_segmented(cell, index):
# if cell["segmented"].ndim == 0:
# return cell["segmented"][()].todense()
# else:
# return cell["segmented"][index].todense()
# segmentation_outline = [
# get_segmented(cell, idx) for idx, cell in zip(indices, info)
# ]
# return times, np.array(segmentation_outline)
# def mask(self, cell_id, trap_id):
# times, outlines = self.outline(cell_id, trap_id)
# return times, np.array(
# [ndimage.morphology.binary_fill_holes(o) for o in outlines]
# )
# def at_time(self, timepoint, kind="outline"):
# """Returns the segmentations for all the cells at a given timepoint.
# FIXME: this is extremely hacky and accounts for differently saved
# results in the matlab object. Deprecate ASAP.
# """
# # Case 1: only one cell per trap: trap_info['cell'][timepoint] is a
# # structured array
# if isinstance(self.trap_info["cell"][timepoint], dict):
# segmentations = [
# self._astype(x, "outline")
# for x in self.trap_info["cell"][timepoint]["segmented"]
# ]
# # Case 2: Multiple cells per trap: it becomes a list of arrays or
# # dictionaries, one for each trap
# # Case 2.1 : it's a dictionary
# elif isinstance(self.trap_info["cell"][timepoint][0], dict):
# segmentations = []
# for x in self.trap_info["cell"][timepoint]:
# seg = x["segmented"]
# if not isinstance(seg, np.ndarray):
# seg = [seg]
# segmentations.append([self._astype(y, "outline") for y in seg])
# # Case 2.2 : it's an array
# else:
# segmentations = [
# [self._astype(y, type) for y in x["segmented"]] if x.ndim != 0 else []
# for x in self.trap_info["cell"][timepoint]
# ]
# # Return dict for compatibility with hdf5 output
# return {i: v for i, v in enumerate(segmentations)}
# def labels_at_time(self, tp):
# labels = self.trap_info["cellLabel"]
# labels = [_aslist(x) for x in labels[tp]]
# labels = {i: [lbl for lbl in lblset] for i, lblset in enumerate(labels)}
# return labels
# @property
# def ntraps(self):
# return len(self.trap_info["cellLabel"][0])
# @property
# def tile_size(self):
# pass
# class ExtractionRunner:
# """An object to run extraction of fluorescence, and general data out of
# segmented data.
# Configure with what extraction we want to run.
# Cell selection criteria.
# Filtering criteria.
# """
# def __init__(self, tiler, cells):
# pass
# def run(self, keys, store, **kwargs):
# pass
# def _aslist(x):
# if isinstance(x, Iterable):
# if hasattr(x, "tolist"):
# x = x.tolist()
# else:
# x = [x]
# return x
@property
def mothers(self):
"""
Return nested list with final prediction of mother id for each cell
"""
with h5py.File(self.filename, "r") as f:
return self.mother_assign_from_dynamic(
self["mother_assign_dynamic"],
self["cell_label"],
self["trap"],
self.ntraps,
)
@property
def mothers_daughters(self):
nested_massign = self.mothers
if sum([x for y in nested_massign for x in y]):
idx = set(
[
(tid, i + 1)
for tid, x in enumerate(nested_massign)
for i in range(len(x))
]
)
mothers, daughters = zip(
*[
((tid, m), (tid, d))
for tid, trapcells in enumerate(nested_massign)
for d, m in enumerate(trapcells, 1)
if m
]
)
else:
mothers, daughters = ([], [])
# print("Warning:Cells: No mother-daughters assigned")
return mothers, daughters
@staticmethod
def mother_assign_to_mb_matrix(ma: t.List[np.array]):
# Convert from list of lists to mother_bud sparse matrix
ncells = sum([len(t) for t in ma])
mb_matrix = np.zeros((ncells, ncells), dtype=bool)
c = 0
for cells in ma:
for d, m in enumerate(cells):
if m:
mb_matrix[c + d, c + m - 1] = True
c += len(cells)
return mb_matrix
@staticmethod
def mother_assign_from_dynamic(
ma, cell_label: t.List[int], trap: t.List[int], ntraps: int
):
"""
Interpolate the list of lists containing the associated mothers from the mother_assign_dynamic feature
"""
idlist = list(zip(trap, cell_label))
cell_gid = np.unique(idlist, axis=0)
last_lin_preds = [
find_1st(
((cell_label[::-1] == lbl) & (trap[::-1] == tr)),
True,
cmp_equal,
)
for tr, lbl in cell_gid
]
mother_assign_sorted = ma[::-1][last_lin_preds]
traps = cell_gid[:, 0]
iterator = groupby(zip(traps, mother_assign_sorted), lambda x: x[0])
d = {key: [x[1] for x in group] for key, group in iterator}
nested_massign = [d.get(i, []) for i in range(ntraps)]
return nested_massign
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment