Skip to content

steinbock.export

data

logger

try_convert_to_anndata_from_disk(intensity_files, *data_file_lists, neighbors_files=None, panel=None, image_info=None)

Source code in steinbock/export/data.py
def try_convert_to_anndata_from_disk(
    intensity_files: Sequence[Union[str, PathLike]],
    *data_file_lists,
    neighbors_files: Optional[Sequence[Union[str, PathLike]]] = None,
    panel: Optional[pd.DataFrame] = None,
    image_info: Optional[pd.DataFrame] = None,
) -> Generator[Tuple[str, Path, Tuple[Path, ...], Optional[Path], AnnData], None, None]:
    """Lazily build one AnnData object per intensity file.

    Args:
        intensity_files: Paths of the intensity tables, read with
            ``io.read_data``; their values become ``X``.
        *data_file_lists: Zero or more sequences of additional data files.
            For the i-th intensity file, the i-th entry of every sequence
            is read and merged on the index to form ``obs``.
        neighbors_files: Optional sequence of neighbors files, indexed in
            parallel with ``intensity_files`` and read with
            ``io.read_neighbors``.
        panel: Optional table with a ``name`` column; the rows selected by
            the intensity columns become ``var``.
        image_info: Optional table with an ``image`` column; the row for the
            current image is repeated for every object and added to ``obs``
            with ``image_``-prefixed column names.

    Yields:
        Tuples ``(img_file_name, intensity_file, data_files, neighbors_file,
        adata)``, one per successfully converted image.

    Any exception raised while converting a single image is logged and that
    image is skipped; the generator then continues with the next file.
    """
    # Index the lookup tables once up front (set_index is called with
    # verify_integrity=True).
    if panel is not None:
        panel = panel.set_index("name", drop=False, verify_integrity=True)
    if image_info is not None:
        image_info = image_info.set_index("image", drop=False, verify_integrity=True)
    for file_idx, intensity_file in enumerate(intensity_files):
        intensity_file = Path(intensity_file)
        data_files = tuple(Path(file_list[file_idx]) for file_list in data_file_lists)
        neighbors_file = (
            Path(neighbors_files[file_idx]) if neighbors_files is not None else None
        )
        img_file_name = io._as_path_with_suffix(intensity_file, ".tiff").name
        try:
            intensities = io.read_data(intensity_file)
            object_ids = intensities.index

            # Per-object annotations: merge all extra data files on the index
            # and bring them into the row order of the intensity table.
            obs = None
            if data_files:
                first_file, *other_files = data_files
                obs = io.read_data(first_file)
                for data_file in other_files:
                    obs = pd.merge(
                        obs,
                        io.read_data(data_file),
                        left_index=True,
                        right_index=True,
                    )
                obs = obs.loc[object_ids, :]

            # Per-image annotations: repeat this image's row once per object.
            if image_info is not None:
                image_row = image_info.loc[img_file_name, :]
                image_obs = pd.concat([image_row] * len(object_ids), axis=1)
                image_obs = image_obs.transpose()
                # restore the column dtypes of the image_info table
                image_obs = image_obs.astype(image_info.dtypes.to_dict())
                image_obs.index = object_ids
                image_obs.columns = "image_" + image_obs.columns
                image_obs.rename(columns={"image_image": "image"}, inplace=True)
                if obs is None:
                    obs = image_obs
                else:
                    obs = pd.merge(
                        obs,
                        image_obs,
                        how="inner",  # preserves order of left keys
                        left_index=True,
                        right_index=True,
                    )

            var = None
            if panel is not None:
                var = panel.loc[intensities.columns, :].copy()

            if obs is not None:
                obs.index = [f"Object {object_id}" for object_id in object_ids]
            if var is not None:
                var.index = intensities.columns.astype(str).tolist()

            # convert nullable string dtype to generic object dtype
            # https://github.com/BodenmillerGroup/steinbock/issues/66
            for frame in (obs, var):
                if frame is None:
                    continue
                for col, dtype in zip(frame.columns, frame.dtypes):
                    if dtype == "string":
                        frame[col] = frame[col].astype(str)

            adata = AnnData(X=intensities.values, obs=obs, var=var, dtype=np.float32)

            if neighbors_file is not None:
                neighbors = io.read_neighbors(neighbors_file)
                # translate object IDs into positions within the intensity table
                row_ind = [object_ids.get_loc(a) for a in neighbors["Object"]]
                col_ind = [object_ids.get_loc(b) for b in neighbors["Neighbor"]]
                pairwise_shape = (adata.n_obs, adata.n_obs)
                adata.obsp["adj"] = csr_matrix(
                    ([True] * len(neighbors.index), (row_ind, col_ind)),
                    shape=pairwise_shape,
                    dtype=np.uint8,
                )
                # distances are stored only if at least one is present
                if neighbors["Distance"].notna().any():
                    adata.obsp["dist"] = csr_matrix(
                        (neighbors["Distance"].values, (row_ind, col_ind)),
                        shape=pairwise_shape,
                        dtype=np.float32,
                    )
                del neighbors

            yield (
                img_file_name,
                intensity_file,
                data_files,
                neighbors_file,
                adata,
            )
            # drop references before moving on to the next image
            del intensities, object_ids, obs, var, adata
        except Exception as e:
            logger.exception(
                f"Error creating AnnData object for image {img_file_name}: {e}; "
                "skipping image"
            )

try_convert_to_dataframe_from_disk(*data_file_lists)

Source code in steinbock/export/data.py
def try_convert_to_dataframe_from_disk(
    *data_file_lists,
) -> Generator[Tuple[str, Tuple[Path, ...], pd.DataFrame], None, None]:
    """Lazily merge the per-image data files into one DataFrame per image.

    Args:
        *data_file_lists: Sequences of data files that are iterated in
            lockstep with ``zip``; the files at the same position are read
            with ``io.read_data`` and merged on their index.

    Yields:
        Tuples ``(img_file_name, data_files, df)``, where ``img_file_name``
        is derived from the first data file of the group.

    Any exception raised while reading or merging a group of files is logged
    and that image is skipped.
    """
    for file_group in zip(*data_file_lists):
        data_files = tuple(map(Path, file_group))
        first_file, *other_files = data_files
        img_file_name = io._as_path_with_suffix(first_file, ".tiff").name
        try:
            df = io.read_data(first_file)
            for data_file in other_files:
                df = pd.merge(
                    df,
                    io.read_data(data_file),
                    left_index=True,
                    right_index=True,
                )
            yield img_file_name, data_files, df
            # drop the reference before moving on to the next image
            del df
        except Exception as e:
            logger.exception(
                f"Error creating DataFrame for image {img_file_name}: {e}; "
                "skipping image"
            )

graphs

logger

convert_to_networkx(neighbors, *data_list)

Source code in steinbock/export/graphs.py
def convert_to_networkx(neighbors: pd.DataFrame, *data_list) -> nx.Graph:
    """Build a networkx graph from a neighbors table.

    Args:
        neighbors: Edge list with ``Object`` and ``Neighbor`` columns; all
            columns are passed on as edge attributes (``edge_attr=True``).
        *data_list: Optional per-object tables that are merged on their index
            and attached to the graph via ``nx.set_node_attributes``, keyed
            by the integer-converted index values.

    Returns:
        An ``nx.Graph`` if every unordered (Object, Neighbor) pair occurs
        exactly twice in ``neighbors``, otherwise an ``nx.DiGraph``.
    """
    object_pairs = neighbors[["Object", "Neighbor"]].astype(int).values.tolist()
    pair_counts = Counter(tuple(sorted(pair)) for pair in object_pairs)
    # the edge list counts as undirected only when each unordered pair is
    # listed exactly twice
    is_directed = any(count != 2 for count in pair_counts.values())
    graph_type = nx.DiGraph if is_directed else nx.Graph
    graph: nx.Graph = nx.from_pandas_edgelist(
        neighbors,
        source="Object",
        target="Neighbor",
        edge_attr=True,
        create_using=graph_type,
    )
    if data_list:
        first_table, *other_tables = data_list
        merged_data = first_table
        for table in other_tables:
            merged_data = pd.merge(
                merged_data, table, left_index=True, right_index=True
            )
        node_attributes = {}
        for object_id, object_data in merged_data.iterrows():
            node_attributes[int(object_id)] = object_data.to_dict()
        nx.set_node_attributes(graph, node_attributes)
    return graph

try_convert_to_networkx_from_disk(neighbors_files, *data_file_lists)

Source code in steinbock/export/graphs.py
def try_convert_to_networkx_from_disk(
    neighbors_files: Sequence[Union[str, PathLike]], *data_file_lists
) -> Generator[Tuple[Path, Tuple[Path, ...], nx.Graph], None, None]:
    """Lazily convert neighbors files (plus optional data files) to graphs.

    Args:
        neighbors_files: Neighbors files, read with ``io.read_neighbors``.
        *data_file_lists: Sequences of data files iterated in lockstep with
            ``neighbors_files``; the files at the same position are read with
            ``io.read_data`` and passed to ``convert_to_networkx``.

    Yields:
        Tuples ``(neighbors_file, data_files, graph)`` with paths converted
        to ``Path`` objects.

    Any exception raised while reading or converting a group of files is
    logged and that group is skipped.
    """
    for file_group in zip(neighbors_files, *data_file_lists):
        neighbors_file = file_group[0]
        data_files = tuple(map(Path, file_group[1:]))
        try:
            neighbors = io.read_neighbors(neighbors_file)
            data_list = [io.read_data(data_file) for data_file in data_files]
            graph = convert_to_networkx(neighbors, *data_list)
            yield Path(neighbors_file), data_files, graph
            # drop references before moving on to the next file
            del neighbors, data_list, graph
        except Exception as e:
            logger.exception(f"Error converting {neighbors_file} to networkx: {e}")