steinbock.export
data
logger
try_convert_to_anndata_from_disk(intensity_files, *data_file_lists, neighbors_files=None, panel=None, image_info=None)
Source code in steinbock/export/data.py
def _read_merged_data(data_files: Tuple[Path, ...]) -> Optional[pd.DataFrame]:
    """Read every file in *data_files* and inner-join the tables on their index.

    Returns ``None`` when *data_files* is empty.
    """
    if not data_files:
        return None
    merged = io.read_data(data_files[0])
    for data_file in data_files[1:]:
        merged = pd.merge(
            merged,
            io.read_data(data_file),
            left_index=True,
            right_index=True,
        )
    return merged


def _repeat_image_info(
    image_info: pd.DataFrame, img_file_name: str, object_index: pd.Index
) -> pd.DataFrame:
    """Return the *image_info* row of *img_file_name*, repeated once per object.

    The result is indexed by *object_index*; every column is prefixed with
    ``image_`` except the ``image`` column, which keeps its name.

    Raises:
        KeyError: if *img_file_name* is not in the index of *image_info*.
    """
    # Selecting with a one-element list yields a one-row frame, which keeps
    # the column dtypes and raises KeyError for an unknown image.
    image_row = image_info.loc[[img_file_name], :]
    # Positional repetition also works for an image with zero objects, where
    # concatenating an empty list of rows would raise ValueError.
    image_obs = image_row.iloc[[0] * len(object_index)].copy()
    image_obs.index = object_index
    image_obs.columns = "image_" + image_obs.columns
    image_obs.rename(columns={"image_image": "image"}, inplace=True)
    return image_obs


def _nullable_strings_to_object(frame: pd.DataFrame) -> None:
    """Convert nullable string columns of *frame* to ``str`` objects in place."""
    # convert nullable string dtype to generic object dtype
    # https://github.com/BodenmillerGroup/steinbock/issues/66
    for col, dtype in zip(frame.columns, frame.dtypes):
        if dtype == "string":
            frame[col] = frame[col].astype(str)


def _add_neighbor_matrices(
    adata: AnnData, neighbors_file: Path, object_index: pd.Index
) -> None:
    """Store the neighbors read from *neighbors_file* in ``adata.obsp``.

    ``adata.obsp["adj"]`` always receives the adjacency matrix;
    ``adata.obsp["dist"]`` is only written when at least one distance is
    not missing. Object ids are translated to matrix positions through
    *object_index*.
    """
    neighbors = io.read_neighbors(neighbors_file)
    row_ind = [object_index.get_loc(a) for a in neighbors["Object"]]
    col_ind = [object_index.get_loc(b) for b in neighbors["Neighbor"]]
    shape = (adata.n_obs, adata.n_obs)
    adata.obsp["adj"] = csr_matrix(
        ([True] * len(neighbors.index), (row_ind, col_ind)),
        shape=shape,
        dtype=np.uint8,
    )
    if neighbors["Distance"].notna().any():
        adata.obsp["dist"] = csr_matrix(
            (neighbors["Distance"].values, (row_ind, col_ind)),
            shape=shape,
            dtype=np.float32,
        )


def try_convert_to_anndata_from_disk(
    intensity_files: Sequence[Union[str, PathLike]],
    *data_file_lists,
    neighbors_files: Optional[Sequence[Union[str, PathLike]]] = None,
    panel: Optional[pd.DataFrame] = None,
    image_info: Optional[pd.DataFrame] = None,
) -> Generator[Tuple[str, Path, Tuple[Path, ...], Optional[Path], AnnData], None, None]:
    """Lazily build one AnnData object per intensity file.

    For the i-th intensity file, the i-th entry of every list in
    *data_file_lists* (and of *neighbors_files*, if given) is used. An image
    whose conversion raises is logged and skipped, so one bad image does not
    abort the whole export.

    Args:
        intensity_files: files read with ``io.read_data``; the values become
            ``X``, the row index the objects and the columns the variables.
        *data_file_lists: further per-image file lists whose tables are
            joined on their index and stored in ``obs``.
        neighbors_files: optional per-image neighbors files, stored in
            ``obsp["adj"]`` and, if distances are present, ``obsp["dist"]``.
        panel: optional table with a unique ``name`` column; the rows that
            match the intensity columns become ``var``.
        image_info: optional table with a unique ``image`` column; the row of
            the current image is repeated for every object and added to
            ``obs`` with ``image_``-prefixed column names.

    Yields:
        ``(img_file_name, intensity_file, data_files, neighbors_file, adata)``
        for every image that was converted successfully.

    Raises:
        ValueError: on first iteration, if ``panel["name"]`` or
            ``image_info["image"]`` contains duplicates.
    """
    if panel is not None:
        panel = panel.set_index("name", drop=False, verify_integrity=True)
    if image_info is not None:
        image_info = image_info.set_index("image", drop=False, verify_integrity=True)
    for i, intensity_file in enumerate(intensity_files):
        intensity_file = Path(intensity_file)
        data_files = tuple(Path(dfl[i]) for dfl in data_file_lists)
        neighbors_file = None
        if neighbors_files is not None:
            neighbors_file = Path(neighbors_files[i])
        img_file_name = io._as_path_with_suffix(intensity_file, ".tiff").name
        try:
            x = io.read_data(intensity_file)
            obs = _read_merged_data(data_files)
            if obs is not None:
                # align the object data with the row order of the intensities
                obs = obs.loc[x.index, :]
            if image_info is not None:
                image_obs = _repeat_image_info(image_info, img_file_name, x.index)
                if obs is not None:
                    obs = pd.merge(
                        obs,
                        image_obs,
                        how="inner",  # preserves order of left keys
                        left_index=True,
                        right_index=True,
                    )
                else:
                    obs = image_obs
            var = None
            if panel is not None:
                var = panel.loc[x.columns, :].copy()
            if obs is not None:
                obs.index = [f"Object {object_id}" for object_id in x.index]
                _nullable_strings_to_object(obs)
            if var is not None:
                var.index = x.columns.astype(str).tolist()
                _nullable_strings_to_object(var)
            adata = AnnData(X=x.values, obs=obs, var=var, dtype=np.float32)
            if neighbors_file is not None:
                _add_neighbor_matrices(adata, neighbors_file, x.index)
            yield (
                img_file_name,
                intensity_file,
                data_files,
                neighbors_file,
                adata,
            )
            # release the per-image data before the next image is loaded
            del x, obs, var, adata
        except Exception as e:
            logger.exception(
                f"Error creating AnnData object for image {img_file_name}: {e}; "
                "skipping image"
            )
try_convert_to_dataframe_from_disk(*data_file_lists)
Source code in steinbock/export/data.py
def try_convert_to_dataframe_from_disk(
    *data_file_lists,
) -> Generator[Tuple[str, Tuple[Path, ...], pd.DataFrame], None, None]:
    """Lazily join the per-image data files into one DataFrame per image.

    The lists in *data_file_lists* are walked in lockstep; the files at the
    same position are read with ``io.read_data`` and merged on their index.
    An image whose files cannot be read or merged is logged and skipped.

    Yields:
        ``(img_file_name, data_files, df)``, where ``img_file_name`` is the
        name of the first data file with a ``.tiff`` suffix.
    """
    for raw_files in zip(*data_file_lists):
        data_files = tuple(map(Path, raw_files))
        first_file, *other_files = data_files
        img_file_name = io._as_path_with_suffix(first_file, ".tiff").name
        try:
            table = io.read_data(first_file)
            for other_file in other_files:
                other_table = io.read_data(other_file)
                table = pd.merge(
                    table, other_table, left_index=True, right_index=True
                )
            yield img_file_name, data_files, table
            # drop the reference before the next image is loaded
            del table
        except Exception as e:
            logger.exception(
                f"Error creating DataFrame for image {img_file_name}: {e}; "
                "skipping image"
            )
graphs
logger
convert_to_networkx(neighbors, *data_list)
Source code in steinbock/export/graphs.py
def convert_to_networkx(neighbors: pd.DataFrame, *data_list) -> nx.Graph:
    """Build a networkx graph from a neighbors table.

    Args:
        neighbors: edge list with ``Object`` and ``Neighbor`` columns; all
            other columns are attached as edge attributes.
        *data_list: optional tables indexed by object id; they are merged on
            their index and each row is set as the attributes of the node
            with that id.

    Returns:
        An ``nx.Graph`` if every unordered object pair occurs exactly twice
        in *neighbors*, otherwise an ``nx.DiGraph``.
    """
    id_pairs = neighbors[["Object", "Neighbor"]].astype(int).values.tolist()
    pair_counts = Counter(tuple(sorted(pair)) for pair in id_pairs)
    # An edge list is treated as undirected only when each unordered pair is
    # listed exactly twice.
    symmetric = all(count == 2 for count in pair_counts.values())
    graph: nx.Graph = nx.from_pandas_edgelist(
        neighbors,
        source="Object",
        target="Neighbor",
        edge_attr=True,
        create_using=nx.Graph if symmetric else nx.DiGraph,
    )
    if not data_list:
        return graph
    object_data, *remaining = data_list
    for extra in remaining:
        object_data = pd.merge(
            object_data, extra, left_index=True, right_index=True
        )
    node_attributes = {}
    for object_id, row in object_data.iterrows():
        node_attributes[int(object_id)] = row.to_dict()
    nx.set_node_attributes(graph, node_attributes)
    return graph
try_convert_to_networkx_from_disk(neighbors_files, *data_file_lists)
Source code in steinbock/export/graphs.py
def try_convert_to_networkx_from_disk(
    neighbors_files: Sequence[Union[str, PathLike]], *data_file_lists
) -> Generator[Tuple[Path, Tuple[Path, ...], nx.Graph], None, None]:
    """Lazily convert neighbors files (plus optional data files) to graphs.

    *neighbors_files* and the lists in *data_file_lists* are walked in
    lockstep. A file set whose conversion raises is logged and skipped.

    Yields:
        ``(neighbors_file, data_files, graph)`` with the paths as ``Path``
        objects and the graph built by ``convert_to_networkx``.
    """
    for file_group in zip(neighbors_files, *data_file_lists):
        neighbors_file = file_group[0]
        data_files = tuple(Path(data_file) for data_file in file_group[1:])
        try:
            neighbors = io.read_neighbors(neighbors_file)
            data_list = []
            for data_file in data_files:
                data_list.append(io.read_data(data_file))
            graph = convert_to_networkx(neighbors, *data_list)
            yield Path(neighbors_file), data_files, graph
            # drop the references before the next file set is loaded
            del neighbors, data_list, graph
        except Exception as e:
            logger.exception(f"Error converting {neighbors_file} to networkx: {e}")