Commit 29403e96 authored by pswain

"More on grouper"

parent 2b6cbbe1
@@ -107,7 +107,7 @@ class Signal(BridgeH5):
     @staticmethod
     def get_retained(df, cutoff):
-        """Return a percentage of the df without later time points."""
+        """Return a fraction of the df, one without later time points."""
         return df.loc[bn.nansum(df.notna(), axis=1) > df.shape[1] * cutoff]

     @property
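
For intuition, the cutoff keeps only cells observed in more than a given fraction of time points. A minimal, self-contained sketch of the same logic in plain pandas (the toy dataframe is illustrative, and `df.notna().sum` stands in for the `bn.nansum` call):

    import numpy as np
    import pandas as pd

    # toy signal: rows are cells, columns are time points
    df = pd.DataFrame(np.random.rand(3, 10))
    df.iloc[1, 5:] = np.nan  # this cell disappears halfway through

    cutoff = 0.8
    # keep rows with more than 80% of time points observed
    retained = df.loc[df.notna().sum(axis=1) > df.shape[1] * cutoff]
    print(retained.index.to_list())  # [0, 2]
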
@@ -118,7 +118,7 @@ class Signal(BridgeH5):
     @_first_arg_str_to_df
     def retained(self, signal, cutoff=0.8):
-        """Reduce a dataframe or a list of dataframes by a percentage, losing late time points."""
+        """Reduce a dataframe, or a list of dataframes, to a fraction of its former size, losing late time points."""
         if isinstance(signal, pd.DataFrame):
             return self.get_retained(signal, cutoff)
         elif isinstance(signal, list):
......
@@ -23,23 +23,21 @@ def add_index_levels(
 def drop_level(
-    df: pd.DataFrame, name: str = "mother_label", as_list: bool = True
+    df: pd.DataFrame,
+    name: str = "mother_label",
+    as_list: bool = True,
 ) -> t.Union[t.List[index_row], pd.Index]:
-    """Drop index level
+    """
+    Drop index level.

     Parameters
     ----------
     df : pd.DataFrame
-        dataframe whose multiindex we will drop
+        Dataframe whose multiindex we will drop
     name : str
-        name of index level to drop
+        Name of index level to drop
     as_list : bool
         Whether to return as a list instead of an index
-
-    Examples
-    --------
-    FIXME: Add docs.
     """
     short_index = df.index.droplevel(name)
     if as_list:
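
As context for the change, `df.index.droplevel` removes one level of a multi-index; a small self-contained sketch (the index names follow the defaults above, and the trap level is assumed for illustration):

    import pandas as pd

    index = pd.MultiIndex.from_tuples(
        [(1, 1, 0), (1, 2, 1)],
        names=["trap", "cell_label", "mother_label"],
    )
    df = pd.DataFrame({"t0": [0.1, 0.2]}, index=index)
    # drop_level(df, "mother_label") returns this as a list of tuples
    print(df.index.droplevel("mother_label").to_list())
    # [(1, 1), (1, 2)]
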
@@ -50,22 +48,17 @@ def drop_level(
 def intersection_matrix(
     index1: pd.MultiIndex, index2: pd.MultiIndex
 ) -> np.ndarray:
-    """
-    Use casting to obtain the boolean mask of the intersection of two multiindices
-    """
+    """Use casting to obtain the boolean mask of the intersection of two multi-indices."""
     indices = [index1, index2]
     for i in range(2):
         if hasattr(indices[i], "to_list"):
             indices[i]: t.List = indices[i].to_list()
         indices[i]: np.ndarray = np.array(indices[i])
     return (indices[0][..., None] == indices[1].T).all(axis=1)


 def get_mother_ilocs_from_daughters(df: pd.DataFrame) -> np.ndarray:
-    """
-    Fetch mother locations in the index of df for all daughters in df.
-    """
+    """Fetch mother locations in the index of df for all daughters in df."""
     daughter_ids = df.index[df.index.get_level_values("mother_label") > 0]
     mother_ilocs = intersection_matrix(
         daughter_ids.droplevel("cell_label"),
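
The casting trick in `intersection_matrix` is plain NumPy broadcasting: with both indices as (rows, levels) arrays, `indices[0][..., None]` has shape (n, k, 1) and `indices[1].T` has shape (k, m), so comparing and reducing over the level axis yields an (n, m) mask. A self-contained sketch:

    import numpy as np

    a = np.array([(1, 2), (3, 4), (5, 6)])  # three index rows, two levels
    b = np.array([(3, 4), (5, 6)])          # two index rows
    mask = (a[..., None] == b.T).all(axis=1)
    # mask[i, j] is True when row i of a equals row j of b
    print(mask)
    # [[False False]
    #  [ True False]
    #  [False  True]]
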
@@ -86,36 +79,41 @@ def get_mothers_from_another_df(whole_df: pd.DataFrame, da_df: pd.DataFrame):
 def bidirectional_retainment_filter(
-    df: pd.DataFrame, mothers_thresh: float = 0.8, daughters_thresh: int = 7
-):
+    df: pd.DataFrame,
+    mothers_thresh: float = 0.8,
+    daughters_thresh: int = 7,
+) -> pd.DataFrame:
     """
     Retrieve families where mothers are present for more than a fraction of the experiment, and daughters for longer than some number of time points.

     Parameters
     ----------
     df: pd.DataFrame
         Data, indexed with a mother_label level
     mothers_thresh: float
         Minimum fraction of the experiment's total duration for which mothers must be present.
     daughters_thresh: int
         Minimum number of time points for which daughters must be observed.
     """
     # daughters
     all_daughters = df.loc[df.index.get_level_values("mother_label") > 0]
-    # Filter daughters
+    # keep daughters observed sufficiently often
     retained_daughters = all_daughters.loc[
         all_daughters.notna().sum(axis=1) > daughters_thresh
     ]
-    # Fectch mother using existing daughters
+    # fetch mothers using existing daughters
     mothers = df.loc[get_mothers_from_another_df(df, retained_daughters)]
-    # Get mothers
+    # keep mothers present for at least a fraction of the experiment's duration
     retained_mothers = mothers.loc[
         mothers.notna().sum(axis=1) > mothers.shape[1] * mothers_thresh
     ]
-    # Filter-out daughters with no valid mothers
+    # drop daughters with no valid mothers
     final_da_mask = intersection_matrix(
         drop_level(retained_daughters, "cell_label", as_list=False),
         drop_level(retained_mothers, "mother_label", as_list=False),
     )
     final_daughters = retained_daughters.loc[final_da_mask.any(axis=1)]
-    # Join mothers and daughters and sort index
-    #
+    # join mothers and daughters and sort index
     return pd.concat((final_daughters, retained_mothers), axis=0).sort_index()
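
The two thresholds act differently: daughters must clear an absolute number of observed time points, mothers a fraction of the experiment's duration. A minimal sketch of both criteria on a toy frame whose columns are time points:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame(np.random.rand(4, 10))
    df.iloc[2, 3:] = np.nan  # this cell vanishes after three time points

    daughters_thresh = 7  # absolute number of time points
    mothers_thresh = 0.8  # fraction of the experiment
    ok_daughter = df.loc[df.notna().sum(axis=1) > daughters_thresh]
    ok_mother = df.loc[df.notna().sum(axis=1) > df.shape[1] * mothers_thresh]
    # row 2 fails both tests; rows 0, 1, and 3 pass both
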
......
@@ -16,10 +16,13 @@ from postprocessor.core.lineageprocess import LineageProcessParameters
 class Chainer(Signal):
     """
-    Extend Signal by applying postprocesses.
+    Extend Signal by applying post-processes and allowing composite signals that combine basic signals.

     Instead of reading processes previously applied, it executes
     them when called.
     """

+    # these no longer seem to be used
+    # process_types = ("multisignal", "processes", "reshapers")
+    # common_chains = {}
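
As a rough illustration of a composite signal, two basic signals combined element-wise, which is the kind of chain `common_chains` encodes (the frames here are stand-ins for real channels):

    import numpy as np
    import pandas as pd

    gfp = pd.DataFrame(np.random.rand(3, 5))      # stand-in for one channel
    mcherry = pd.DataFrame(np.random.rand(3, 5))  # stand-in for another
    ratio = gfp / mcherry  # a composite signal built from two basic ones
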
@@ -46,7 +49,7 @@ class Chainer(Signal):
                 f"extraction/{channel}/max/median",
             ),
         }

-        # Alan: can we change url to address?
+        # function to add bgsub to urls
         def replace_url(url: str, bgsub: str = ""):
             channel = url.split("/")[1]
@@ -79,6 +82,7 @@ class Chainer(Signal):
         retain: t.Optional[float] = None,
         **kwargs,
     ):
+        """Load data from an h5 file."""
         if dataset in self.common_chains:
             # get dataset for composite chains
             data = self.common_chains[dataset](**kwargs)
@@ -88,8 +92,9 @@ class Chainer(Signal):
         if chain:
             data = self.apply_chain(data, chain, **kwargs)
         if retain:
-            # keep only early time points (?)
-            data = data.loc[data.notna().sum(axis=1) > data.shape[1] * retain]
+            # keep data only from early time points
+            data = self.get_retained(data, retain)
+            # data = data.loc[data.notna().sum(axis=1) > data.shape[1] * retain]
         if (stages and "stage" not in data.columns.names):
             # return stages as additional column level
             stages_index = [
@@ -145,6 +150,5 @@ class Chainer(Signal):
             raise (NotImplementedError)
         merges = process.as_function(result, **params)
         result = self.apply_merges(result, merges)
-        self._intermediate_steps.append(result)
         return result
@@ -106,7 +106,7 @@ class Grouper(ABC):
         Examples
         --------
-        FIXME: Add docs.
+        >>> record = grouper.concat_signal("extraction/GFP/max/median")
         """
         if path.startswith("/"):
             path = path.strip("/")
@@ -125,9 +125,7 @@ class Grouper(ABC):
         )
         # check for errors
         errors = [
-            k
-            for kymo, k in zip(records, self.chainers.keys())
-            if kymo is None
+            k for kymo, k in zip(records, self.chainers.keys()) if kymo is None
         ]
         records = [record for record in records if record is not None]
         if len(errors):
@@ -156,7 +154,9 @@ class Grouper(ABC):
                 f"Grouper:Warning: {nchains_dif} chains do not contain"
                 f" channel {path}"
             )
-        assert len(good_chains), f"No valid dataset to use. Valid datasets are {self.available}"
+        assert len(
+            good_chains
+        ), f"No valid dataset to use. Valid datasets are {self.available}"
         return good_chains

     def pool_function(
@@ -170,7 +170,7 @@ class Grouper(ABC):
         """Enable different threads for independent chains, particularly useful when aggregating multiple elements."""
         if pool is None:
             # Alan: why is None changed to 8
-            #pool = 8
+            # pool = 8
             pass
         chainers = chainers or self.chainers
         if pool:
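
The pooling the docstring describes follows the standard multiprocessing pattern: map an independent per-position task across workers. A self-contained sketch with a stand-in task (the position names are hypothetical):

    from multiprocessing import Pool

    def task(item):
        """Stand-in for the per-chainer work."""
        name, value = item
        return name, value * 2

    if __name__ == "__main__":
        chainers = {"pos001": 1, "pos002": 2}
        with Pool(2) as pool:  # one worker per independent chain
            records = pool.map(task, list(chainers.items()))
        print(records)  # [('pos001', 2), ('pos002', 4)]
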
@@ -198,33 +198,33 @@
         ]
         return records

     @property
     def nmembers(self) -> t.Dict[str, int]:
-        # Return the number of positions belonging to each group
+        """Get the number of positions belonging to each group."""
         return Counter(self.positions_groups.values())

     @property
     def ntraps(self):
         """Get total number of traps per position (h5 file)."""
         for pos, s in self.chainers.items():
             with h5py.File(s.filename, "r") as f:
                 print(pos, f["/trap_info/trap_locations"].shape[0])

     @property
-    def ntraps_by_pos(self) -> t.Dict[str, int]:
-        # Return total number of traps grouped
+    def ntraps_by_group(self) -> t.Dict[str, int]:
+        """Get total number of traps per group."""
         ntraps = {}
         for pos, s in self.chainers.items():
             with h5py.File(s.filename, "r") as f:
                 ntraps[pos] = f["/trap_info/trap_locations"].shape[0]
-        ntraps_by_pos = {k: 0 for k in self.groups}
+        ntraps_by_group = {k: 0 for k in self.groups}
         for posname, vals in ntraps.items():
-            ntraps_by_pos[self.positions_groups[posname]] += vals
-        return ntraps_by_pos
+            ntraps_by_group[self.positions_groups[posname]] += vals
+        return ntraps_by_group

-    def traplocs(self):
+    @property
+    def traplocs(self) -> t.Dict[str, np.ndarray]:
+        """Get the locations of the traps for each position as a dictionary."""
         d = {}
         for pos, s in self.chainers.items():
             with h5py.File(s.filename, "r") as f:
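
`nmembers` is a direct `Counter` over the position-to-group mapping; for example (position and group names hypothetical):

    from collections import Counter

    positions_groups = {"pos001": "glu", "pos002": "glu", "pos003": "raf"}
    print(Counter(positions_groups.values()))
    # Counter({'glu': 2, 'raf': 1})
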
@@ -233,20 +233,21 @@ class Grouper(ABC):
     @property
     def groups(self) -> t.Tuple[str]:
-        # Return groups sorted alphabetically
+        """Get groups, sorted alphabetically."""
         return tuple(sorted(set(self.positions_groups.values())))

     @property
     def positions(self) -> t.Tuple[str]:
-        # Return positions sorted alphabetically
+        """Get positions, sorted alphabetically."""
         return tuple(sorted(set(self.positions_groups.keys())))

     def ncells(
-        self, path="extraction/general/None/area", mode="retained", **kwargs
+        self,
+        path="extraction/general/None/area",
+        mode="retained",
+        **kwargs,
     ) -> t.Dict[str, int]:
-        """
-        Returns number of cells retained per position in base channel
-        """
+        """Get number of cells retained per position in base channel as a dictionary."""
         return (
             self.concat_signal(path=path, mode=mode, **kwargs)
             .groupby("group")
@@ -256,11 +257,12 @@ class Grouper(ABC):
     @property
     def nretained(self) -> t.Dict[str, int]:
+        """Get number of cells retained per position in base channel as a dictionary."""
         return self.ncells()

     @property
     def channels(self):
+        """Get unique channels for all chains as a set."""
         return set(
             [
                 channel
@@ -271,20 +273,24 @@ class Grouper(ABC):
     @property
     def stages_span(self):
+        # FAILS on my example
         return self.fsignal.stages_span

     @property
     def max_span(self):
+        # FAILS on my example
         return self.fsignal.max_span

-    @property
-    def tinterval(self):
-        return self.fsignal.tinterval
-
     @property
     def stages(self):
+        # FAILS on my example
         return self.fsignal.stages

+    @property
+    def tinterval(self):
+        """Get interval between time points."""
+        return self.fsignal.tinterval


 class MetaGrouper(Grouper):
     """Group positions using metadata's 'group' number."""
@@ -293,22 +299,21 @@ class MetaGrouper(Grouper):
 class NameGrouper(Grouper):
-    """Group a set of positions using a subsection of the name."""
+    """Group a set of positions with a shorter version of the group's name."""

-    def __init__(self, dir, criteria=None):
+    def __init__(self, dir, name_inds=(0, -4)):
+        """Define the indices to slice names."""
         super().__init__(dir=dir)
-        # what does criteria do?
-        if criteria is None:
-            criteria = (0, -4)
-        self.criteria = criteria
+        self.name_inds = name_inds

     @property
     def positions_groups(self) -> t.Dict[str, str]:
         """Get a dictionary with the positions as keys and groups as items."""
         if not hasattr(self, "_positions_groups"):
             self._positions_groups = {}
             for name in self.chainers.keys():
                 self._positions_groups[name] = name[
-                    self.criteria[0] : self.criteria[1]
+                    self.name_inds[0] : self.name_inds[1]
                 ]
         return self._positions_groups
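
`name_inds` is simply a slice applied to each position name: the default (0, -4) strips a trailing position suffix, while phGrouper's (3, 7) below picks out the middle characters. A sketch with hypothetical position names:

    name_inds = (0, -4)
    for name in ["YST_1479_001", "YST_1479_002", "YST_1480_001"]:
        print(name[name_inds[0] : name_inds[1]])
    # YST_1479, YST_1479, YST_1480 -> the first two positions share a group
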
@@ -316,8 +321,9 @@ class NameGrouper(Grouper):
 class phGrouper(NameGrouper):
     """Grouper for pH calibration experiments where all surveyed media pH values are within a single experiment."""

-    def __init__(self, dir, criteria=(3, 7)):
-        super().__init__(dir=dir, criteria=criteria)
+    def __init__(self, dir, name_inds=(3, 7)):
+        """Initialise via NameGrouper."""
+        super().__init__(dir=dir, name_inds=name_inds)

     def get_ph(self) -> None:
         self.ph = {gn: self.ph_from_group(gn) for gn in self.positions_groups}
@@ -350,6 +356,7 @@ class phGrouper(NameGrouper):
         aggregated = pd.concat((aggregated, ph), axis=1)
         return aggregated


+# Alan: why are these separate functions?
 def concat_standard(
     path: str,
@@ -367,6 +374,7 @@ def concat_standard(
     )
     return combined


+# why _ind ?
 def concat_signal_ind(
     path: str,
......