Skip to content

steinbock.preprocessing

external

logger

SteinbockExternalPreprocessingException (SteinbockPreprocessingException)

Source code in steinbock/preprocessing/external.py
class SteinbockExternalPreprocessingException(SteinbockPreprocessingException):
    """Error raised by the external-image preprocessing functions."""

create_panel_from_image_files(ext_img_files)

Source code in steinbock/preprocessing/external.py
def create_panel_from_image_files(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Create a default panel sized after the first readable external image.

    The files are tried in order; the first one that can be read determines
    the number of channels (the length of the image's first axis). Every
    channel is numbered from 1, kept, and assigned an ilastik index, while
    the name, deepcell and cellpose columns are left missing.

    :param ext_img_files: candidate image files, tried in the given order
    :return: the panel, with nullable string/boolean/uint8 column dtypes
    :raises SteinbockExternalPreprocessingException: if no file could be read
    """
    channel_count = None
    for candidate_file in ext_img_files:
        try:
            channel_count = _read_external_image(candidate_file).shape[0]
        except Exception:
            continue  # unreadable files are skipped intentionally
        break
    if channel_count is None:
        raise SteinbockExternalPreprocessingException("No valid images found")
    channel_numbers = range(1, channel_count + 1)
    panel = pd.DataFrame(
        data={
            "channel": channel_numbers,
            "name": np.nan,
            "keep": True,
            "ilastik": channel_numbers,
            "deepcell": np.nan,
            "cellpose": np.nan,
        },
    )
    column_dtypes = {
        "channel": pd.StringDtype(),
        "name": pd.StringDtype(),
        "keep": pd.BooleanDtype(),
        "ilastik": pd.UInt8Dtype(),
        "deepcell": pd.UInt8Dtype(),
        "cellpose": pd.UInt8Dtype(),
    }
    return panel.astype(column_dtypes)

list_image_files(ext_img_dir)

Source code in steinbock/preprocessing/external.py
def list_image_files(ext_img_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively list candidate image files below a directory.

    A path qualifies when its name contains a dot and does not start with
    one (glob pattern ``[!.]*.*``), so hidden files and names without an
    extension are left out.

    :param ext_img_dir: directory to search recursively
    :return: the matching paths in sorted order
    """
    image_files = list(Path(ext_img_dir).rglob("[!.]*.*"))
    image_files.sort()
    return image_files

try_preprocess_images_from_disk(ext_img_files)

Source code in steinbock/preprocessing/external.py
def try_preprocess_images_from_disk(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Lazily read external image files, skipping those that fail to load.

    A file that cannot be read is reported with a warning and skipped;
    iteration continues with the next file.

    :param ext_img_files: image files to read, in order
    :return: a generator of (file path, image) pairs for readable files
    """
    for current_file in ext_img_files:
        try:
            img = _read_external_image(current_file)
        except Exception:
            logger.warning(f"Unsupported file format: {current_file}")
        else:
            yield Path(current_file), img
            # drop the local reference before the next image is loaded
            del img

imc

imc_available

logger

SteinbockIMCPreprocessingException (SteinbockPreprocessingException)

Source code in steinbock/preprocessing/imc.py
class SteinbockIMCPreprocessingException(SteinbockPreprocessingException):
    """Error raised by the IMC preprocessing functions."""

create_image_info(mcd_txt_file, acquisition, img, recovery_file, recovered, img_file)

Source code in steinbock/preprocessing/imc.py
def create_image_info(
    mcd_txt_file: Union[str, PathLike],
    acquisition: Optional["Acquisition"],
    img: np.ndarray,
    recovery_file: Union[str, PathLike, None],
    recovered: bool,
    img_file: Union[str, PathLike],
) -> Dict[str, Any]:
    """Assemble the metadata row describing one preprocessed image.

    The ``acquisition`` annotation is a forward reference (as in
    ``try_preprocess_images_from_disk``) so that defining this function does
    not require the ``Acquisition`` class to be resolvable.

    :param mcd_txt_file: source file of the image; only its file name is
        recorded
    :param acquisition: acquisition the image originates from, or ``None``;
        when given, its id, description, ROI corner coordinates and size are
        added to the row
    :param img: image array; axes 0, 1 and 2 are reported as the number of
        channels, the height and the width, respectively
    :param recovery_file: file used for recovery, or ``None``; only its file
        name is recorded
    :param recovered: recovery flag, stored unchanged
    :param img_file: file the image is saved to; only its file name is
        recorded
    :return: dictionary holding the image metadata
    """
    recovery_file_name = None
    if recovery_file is not None:
        recovery_file_name = Path(recovery_file).name
    image_info_row = {
        "image": Path(img_file).name,
        "width_px": img.shape[2],
        "height_px": img.shape[1],
        "num_channels": img.shape[0],
        "source_file": Path(mcd_txt_file).name,
        "recovery_file": recovery_file_name,
        "recovered": recovered,
    }
    if acquisition is not None:
        # ROI points 0 and 2 are reported as the start and end corners
        roi_start = acquisition.roi_points_um[0]
        roi_end = acquisition.roi_points_um[2]
        image_info_row.update(
            {
                "acquisition_id": acquisition.id,
                "acquisition_description": acquisition.description,
                "acquisition_start_x_um": roi_start[0],
                "acquisition_start_y_um": roi_start[1],
                "acquisition_end_x_um": roi_end[0],
                "acquisition_end_y_um": roi_end[1],
                "acquisition_width_um": acquisition.width_um,
                "acquisition_height_um": acquisition.height_um,
            }
        )
    return image_info_row

create_panel_from_imc_panel(imc_panel_file, imc_panel_channel_col='Metal Tag', imc_panel_name_col='Target', imc_panel_keep_col='full', imc_panel_ilastik_col='ilastik')

Source code in steinbock/preprocessing/imc.py
def create_panel_from_imc_panel(
    imc_panel_file: Union[str, PathLike],
    imc_panel_channel_col: str = "Metal Tag",
    imc_panel_name_col: str = "Target",
    imc_panel_keep_col: str = "full",
    imc_panel_ilastik_col: str = "ilastik",
) -> pd.DataFrame:
    """Convert an IMC panel CSV file into a steinbock panel.

    The file is read with comma or semicolon as the field separator. The
    configured columns are renamed to ``channel``, ``name``, ``keep`` and
    ``ilastik``; rows sharing the same channel are merged into one row; and
    the ``ilastik`` column is finally replaced by a running number (starting
    at 1) for the channels that were flagged.

    :param imc_panel_file: path to the IMC panel CSV file
    :param imc_panel_channel_col: name of the column holding the channel
        (required, must not contain missing values)
    :param imc_panel_name_col: name of the column holding the channel name
        (required)
    :param imc_panel_keep_col: name of the boolean "keep" column (optional,
        but must not contain missing values when present)
    :param imc_panel_ilastik_col: name of the boolean "ilastik" column
        (optional, but must not contain missing values when present)
    :return: the converted panel
    :raises SteinbockIMCPreprocessingException: if a required column is
        missing, or if the channel/keep/ilastik columns contain missing values
    """
    # a separator longer than one character is treated as a regular expression,
    # which requires the python parsing engine; "1"/"0" are read as booleans
    imc_panel = pd.read_csv(
        imc_panel_file,
        sep=",|;",
        dtype={
            imc_panel_channel_col: pd.StringDtype(),
            imc_panel_name_col: pd.StringDtype(),
            imc_panel_keep_col: pd.BooleanDtype(),
            imc_panel_ilastik_col: pd.BooleanDtype(),
        },
        engine="python",
        true_values=["1"],
        false_values=["0"],
    )
    # only the channel and name columns are mandatory
    for required_col in (imc_panel_channel_col, imc_panel_name_col):
        if required_col not in imc_panel:
            raise SteinbockIMCPreprocessingException(
                f"Missing '{required_col}' column in IMC panel"
            )
    # missing values are tolerated in the name column only
    for notnan_col in (
        imc_panel_channel_col,
        imc_panel_keep_col,
        imc_panel_ilastik_col,
    ):
        if notnan_col in imc_panel and imc_panel[notnan_col].isna().any():
            raise SteinbockIMCPreprocessingException(
                f"Missing values for '{notnan_col}' in IMC panel"
            )
    rename_columns = {
        imc_panel_channel_col: "channel",
        imc_panel_name_col: "name",
        imc_panel_keep_col: "keep",
        imc_panel_ilastik_col: "ilastik",
    }
    # drop pre-existing columns that already carry a target name but are not
    # the configured source column, so that renaming cannot create duplicates
    drop_columns = [
        panel_col
        for imc_panel_col, panel_col in rename_columns.items()
        if panel_col in imc_panel.columns and panel_col != imc_panel_col
    ]
    panel = imc_panel.drop(columns=drop_columns).rename(columns=rename_columns)
    # harmonize rows that share a channel: join their distinct non-missing
    # names with " / " and flag keep/ilastik if any of the rows is flagged
    for _, g in panel.groupby("channel"):
        panel.loc[g.index, "name"] = " / ".join(g["name"].dropna().unique())
        if "keep" in panel:
            panel.loc[g.index, "keep"] = g["keep"].any()
        if "ilastik" in panel:
            panel.loc[g.index, "ilastik"] = g["ilastik"].any()
    # collapse to a single row per distinct channel value
    panel = panel.groupby(panel["channel"].values).aggregate("first")
    # NOTE(review): presumably _clean_panel guarantees an "ilastik" column even
    # when the input file had none -- confirm against its implementation
    panel = _clean_panel(panel)  # ilastik column may be nullable uint8 now
    # remember which channels were flagged, reset the column to all-missing
    # uint8, then number the flagged channels consecutively starting at 1
    ilastik_mask = panel["ilastik"].fillna(False).astype(bool)
    panel["ilastik"] = pd.Series(dtype=pd.UInt8Dtype())
    panel.loc[ilastik_mask, "ilastik"] = range(1, ilastik_mask.sum() + 1)
    return panel

create_panel_from_mcd_files(mcd_files, unzip=False)

Source code in steinbock/preprocessing/imc.py
def create_panel_from_mcd_files(
    mcd_files: Sequence[Union[str, PathLike]], unzip: bool = False
) -> pd.DataFrame:
    """Create a single panel from the acquisitions of several MCD files.

    Files located inside a zip archive are only considered when ``unzip`` is
    set; they are extracted to a temporary directory for reading. The
    per-acquisition panels are concatenated, de-duplicated and cleaned.

    :param mcd_files: MCD files (plain paths or paths to zip archive members)
    :param unzip: whether to read MCD files stored inside zip archives
    :return: the combined panel
    """
    collected_panels = []
    for mcd_file in mcd_files:
        zip_location = _get_zip_file_member(mcd_file)
        if zip_location is None:
            collected_panels.extend(create_panels_from_mcd_file(mcd_file))
            continue
        if not unzip:
            continue
        zip_file, mcd_member = zip_location
        with ZipFile(zip_file) as fzip, TemporaryDirectory() as temp_dir:
            extracted_mcd_file = fzip.extract(mcd_member, path=temp_dir)
            collected_panels.extend(create_panels_from_mcd_file(extracted_mcd_file))
    combined = pd.concat(collected_panels, ignore_index=True, copy=False)
    combined = combined.drop_duplicates(ignore_index=True)
    return _clean_panel(combined)

create_panel_from_txt_file(txt_file)

Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_file(txt_file: Union[str, PathLike]) -> pd.DataFrame:
    """Create a two-column panel (channel, name) from a single TXT file.

    :param txt_file: path to the TXT file
    :return: panel with string-typed ``channel`` and ``name`` columns, filled
        from the file's channel names and channel labels
    """
    with TXTFile(txt_file) as f:
        channel_column = pd.Series(data=f.channel_names, dtype=pd.StringDtype())
        name_column = pd.Series(data=f.channel_labels, dtype=pd.StringDtype())
        return pd.DataFrame(data={"channel": channel_column, "name": name_column})

create_panel_from_txt_files(txt_files, unzip=False)

Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_files(
    txt_files: Sequence[Union[str, PathLike]], unzip: bool = False
) -> pd.DataFrame:
    """Create a single panel from several TXT files.

    Files located inside a zip archive are only considered when ``unzip`` is
    set; they are extracted to a temporary directory for reading. The
    per-file panels are concatenated, de-duplicated and cleaned.

    :param txt_files: TXT files (plain paths or paths to zip archive members)
    :param unzip: whether to read TXT files stored inside zip archives
    :return: the combined panel
    """
    collected_panels = []
    for txt_file in txt_files:
        zip_location = _get_zip_file_member(txt_file)
        if zip_location is None:
            collected_panels.append(create_panel_from_txt_file(txt_file))
            continue
        if not unzip:
            continue
        zip_file, txt_member = zip_location
        with ZipFile(zip_file) as fzip, TemporaryDirectory() as temp_dir:
            extracted_txt_file = fzip.extract(txt_member, path=temp_dir)
            collected_panels.append(create_panel_from_txt_file(extracted_txt_file))
    combined = pd.concat(collected_panels, ignore_index=True, copy=False)
    combined = combined.drop_duplicates(ignore_index=True)
    return _clean_panel(combined)

create_panels_from_mcd_file(mcd_file)

Source code in steinbock/preprocessing/imc.py
def create_panels_from_mcd_file(mcd_file: Union[str, PathLike]) -> List[pd.DataFrame]:
    """Create one two-column panel per acquisition of an MCD file.

    :param mcd_file: path to the MCD file
    :return: list of panels, in slide order and then acquisition order, each
        with string-typed ``channel`` and ``name`` columns filled from the
        acquisition's channel names and channel labels
    """
    with MCDFile(mcd_file) as f:
        return [
            pd.DataFrame(
                data={
                    "channel": pd.Series(
                        data=acquisition.channel_names,
                        dtype=pd.StringDtype(),
                    ),
                    "name": pd.Series(
                        data=acquisition.channel_labels,
                        dtype=pd.StringDtype(),
                    ),
                },
            )
            for slide in f.slides
            for acquisition in slide.acquisitions
        ]

filter_hot_pixels(img, thres)

Source code in steinbock/preprocessing/imc.py
def filter_hot_pixels(img: np.ndarray, thres: float) -> np.ndarray:
    """Replace hot pixels by the maximum of their in-plane neighbors.

    A pixel counts as hot when it exceeds the maximum of its eight
    neighbors (3x3 window along the last two axes, center excluded, borders
    mirrored) by more than ``thres``. The first axis is never mixed: the
    filter footprint has extent 1 along it.

    For unsigned integer images the difference to the neighborhood maximum
    is computed in floating point. A plain subtraction would wrap around for
    every pixel darker than its neighbors and wrongly mark it as hot.

    :param img: three-dimensional image array
    :param thres: hot pixel threshold
    :return: filtered image with the same shape and dtype as ``img``
    """
    img = np.asarray(img)
    kernel = np.ones((1, 3, 3), dtype=bool)
    kernel[0, 1, 1] = False  # exclude the pixel itself from its neighborhood
    max_neighbor_img = maximum_filter(img, footprint=kernel, mode="mirror")
    if img.dtype.kind == "u":
        # unsigned arithmetic cannot represent negative differences
        delta = np.subtract(img, max_neighbor_img, dtype=np.float64)
    else:
        delta = img - max_neighbor_img
    return np.where(delta > thres, max_neighbor_img, img)

list_mcd_files(mcd_dir, unzip=False)

Source code in steinbock/preprocessing/imc.py
def list_mcd_files(mcd_dir: Union[str, PathLike], unzip: bool = False) -> List[Path]:
    """Recursively list MCD files below a directory.

    Files whose name starts with a dot are skipped. With ``unzip`` set,
    ``.mcd`` members of non-hidden zip archives found below the directory
    are appended after the on-disk files, each as ``<zip file>/<member>``.
    The same hidden-file rule applies to archive members, so that e.g.
    ``__MACOSX/._sample.mcd`` resource-fork entries are not listed.

    :param mcd_dir: directory to search recursively
    :param unzip: whether to also list MCD files stored in zip archives
    :return: sorted on-disk MCD files, followed by the archive members
        (archives in sorted order, members sorted by name within an archive)
    """
    mcd_files = sorted(Path(mcd_dir).rglob("[!.]*.mcd"))
    if not unzip:
        return mcd_files
    for zip_file in sorted(Path(mcd_dir).rglob("[!.]*.zip")):
        with ZipFile(zip_file) as fzip:
            member_names = sorted(
                zip_info.filename
                for zip_info in fzip.infolist()
                if not zip_info.is_dir()
            )
        for member_name in member_names:
            # zip member names always use forward slashes
            base_name = member_name.rsplit("/", 1)[-1]
            if member_name.endswith(".mcd") and not base_name.startswith("."):
                mcd_files.append(zip_file / member_name)
    return mcd_files

list_txt_files(txt_dir, unzip=False)

Source code in steinbock/preprocessing/imc.py
def list_txt_files(txt_dir: Union[str, PathLike], unzip: bool = False) -> List[Path]:
    """Recursively list TXT files below a directory.

    Files whose name starts with a dot are skipped. With ``unzip`` set,
    ``.txt`` members of non-hidden zip archives found below the directory
    are appended after the on-disk files, each as ``<zip file>/<member>``.
    The same hidden-file rule applies to archive members, so that e.g.
    ``__MACOSX/._sample.txt`` resource-fork entries are not listed.

    :param txt_dir: directory to search recursively
    :param unzip: whether to also list TXT files stored in zip archives
    :return: sorted on-disk TXT files, followed by the archive members
        (archives in sorted order, members sorted by name within an archive)
    """
    txt_files = sorted(Path(txt_dir).rglob("[!.]*.txt"))
    if not unzip:
        return txt_files
    for zip_file in sorted(Path(txt_dir).rglob("[!.]*.zip")):
        with ZipFile(zip_file) as fzip:
            member_names = sorted(
                zip_info.filename
                for zip_info in fzip.infolist()
                if not zip_info.is_dir()
            )
        for member_name in member_names:
            # zip member names always use forward slashes
            base_name = member_name.rsplit("/", 1)[-1]
            if member_name.endswith(".txt") and not base_name.startswith("."):
                txt_files.append(zip_file / member_name)
    return txt_files

preprocess_image(img, hpf=None)

Source code in steinbock/preprocessing/imc.py
def preprocess_image(img: np.ndarray, hpf: Optional[float] = None) -> np.ndarray:
    """Convert an image to float32, optionally filter hot pixels, and cast it.

    :param img: image to preprocess
    :param hpf: hot pixel filter threshold; filtering is skipped when ``None``
    :return: the image converted to ``io.img_dtype`` via ``io._to_dtype``
    """
    float_img = img.astype(np.float32)
    filtered_img = float_img if hpf is None else filter_hot_pixels(float_img, hpf)
    return io._to_dtype(filtered_img, io.img_dtype)

try_preprocess_images_from_disk(mcd_files, txt_files, channel_names=None, hpf=None, unzip=False, strict=False)

Source code in steinbock/preprocessing/imc.py
def try_preprocess_images_from_disk(
    mcd_files: Sequence[Union[str, PathLike]],
    txt_files: Sequence[Union[str, PathLike]],
    channel_names: Optional[Sequence[str]] = None,
    hpf: Optional[float] = None,
    unzip: bool = False,
    strict: bool = False,
) -> Generator[
    Tuple[Path, Optional["Acquisition"], np.ndarray, Optional[Path], bool],
    None,
    None,
]:
    """Lazily preprocess the images of MCD files, then of standalone TXT files.

    MCD files are handled first, each together with the list of candidate TXT
    files; afterwards, every TXT file still in that list is preprocessed on
    its own. Files located inside a zip archive are only considered when
    ``unzip`` is set; they are extracted to a temporary directory that lives
    until the corresponding images have been consumed.

    NOTE(review): presumably the MCD helper removes the TXT files it matches
    to acquisitions from the candidate list -- confirm against its
    implementation.

    :param mcd_files: MCD files (plain paths or paths to zip archive members)
    :param txt_files: TXT files (plain paths or paths to zip archive members)
    :param channel_names: forwarded to the MCD/TXT preprocessing helpers
    :param hpf: hot pixel filter threshold, forwarded to the helpers
    :param unzip: whether to process files stored inside zip archives
    :param strict: forwarded to the MCD preprocessing helper
    :return: a generator of (source file, acquisition, image, recovery TXT
        file, recovered flag) tuples; for standalone TXT files the acquisition
        and recovery file are ``None`` and the flag is ``False``
    """
    remaining_txt_files = list(txt_files)
    mcd_options = {
        "channel_names": channel_names,
        "hpf": hpf,
        "unzip": unzip,
        "strict": strict,
    }
    txt_options = {"channel_names": channel_names, "hpf": hpf}
    # process mcd files in reverse order to avoid ambiguous txt file matching
    # see https://github.com/BodenmillerGroup/steinbock/issues/100
    mcd_files_by_stem_desc = sorted(
        mcd_files, key=lambda candidate: Path(candidate).stem, reverse=True
    )
    for mcd_file in mcd_files_by_stem_desc:
        mcd_zip_location = _get_zip_file_member(mcd_file)
        if mcd_zip_location is None:
            for (
                acquisition,
                img,
                recovery_txt_file,
                recovered,
            ) in _try_preprocess_mcd_images_from_disk(
                mcd_file, remaining_txt_files, **mcd_options
            ):
                yield Path(mcd_file), acquisition, img, recovery_txt_file, recovered
                del img
            continue
        if not unzip:
            continue
        zip_file, mcd_member = mcd_zip_location
        with ZipFile(zip_file) as fzip, TemporaryDirectory() as temp_dir:
            extracted_mcd_file = fzip.extract(mcd_member, path=temp_dir)
            for (
                acquisition,
                img,
                recovery_txt_file,
                recovered,
            ) in _try_preprocess_mcd_images_from_disk(
                extracted_mcd_file, remaining_txt_files, **mcd_options
            ):
                yield Path(mcd_file), acquisition, img, recovery_txt_file, recovered
                del img
    for txt_file in remaining_txt_files:
        txt_zip_location = _get_zip_file_member(txt_file)
        if txt_zip_location is None:
            img = _try_preprocess_txt_image_from_disk(txt_file, **txt_options)
            if img is not None:
                yield Path(txt_file), None, img, None, False
                del img
            continue
        if not unzip:
            continue
        zip_file, txt_member = txt_zip_location
        with ZipFile(zip_file) as fzip, TemporaryDirectory() as temp_dir:
            extracted_txt_file = fzip.extract(txt_member, path=temp_dir)
            img = _try_preprocess_txt_image_from_disk(
                extracted_txt_file, **txt_options
            )
            if img is not None:
                yield Path(txt_file), None, img, None, False
                del img