Skip to content

steinbock.preprocessing

external

create_panel_from_image_files(ext_img_files)

Source code in steinbock/preprocessing/external.py
def create_panel_from_image_files(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a default panel from the first readable external image.

    The files are tried in order; the first one that can be read determines
    the number of channels (size of the image's first axis). Files that
    cannot be read are skipped silently.

    Args:
        ext_img_files: candidate image files, tried in the given order.

    Returns:
        A panel with one row per channel and the columns ``channel``
        (1-based channel number, stored as string), ``name`` (all missing),
        ``keep`` (all ``True``), ``ilastik`` (1-based channel number) and
        ``deepcell`` (all missing).

    Raises:
        IOError: if none of the files could be read.
    """
    num_channels = None
    for ext_img_file in ext_img_files:
        try:
            ext_img = _read_external_image(ext_img_file)
            num_channels = ext_img.shape[0]
            break
        except Exception:
            # Unreadable/unsupported files are skipped on purpose. Only
            # ``Exception`` is caught so that KeyboardInterrupt/SystemExit
            # still abort the scan instead of being swallowed.
            continue
    if num_channels is None:
        raise IOError("No valid images found")
    panel = pd.DataFrame(
        data={
            "channel": range(1, num_channels + 1),
            "name": np.nan,
            "keep": True,
            "ilastik": range(1, num_channels + 1),
            "deepcell": np.nan,
        },
    )
    # Normalize every column to its nullable pandas extension dtype.
    panel["channel"] = panel["channel"].astype(pd.StringDtype())
    panel["name"] = panel["name"].astype(pd.StringDtype())
    panel["keep"] = panel["keep"].astype(pd.BooleanDtype())
    panel["ilastik"] = panel["ilastik"].astype(pd.UInt8Dtype())
    panel["deepcell"] = panel["deepcell"].astype(pd.UInt8Dtype())
    return panel

list_image_files(ext_img_dir)

Source code in steinbock/preprocessing/external.py
def list_image_files(ext_img_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively collect paths below ``ext_img_dir`` matching ``*.*``.

    Returns:
        The matching paths in sorted order.
    """
    matches = list(Path(ext_img_dir).rglob("*.*"))
    matches.sort()
    return matches

try_preprocess_images_from_disk(ext_img_files)

Source code in steinbock/preprocessing/external.py
def try_preprocess_images_from_disk(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Lazily read external images, skipping files that cannot be read.

    Args:
        ext_img_files: image files to read, processed in the given order.

    Yields:
        ``(path, image)`` for every file that could be read. The path is
        always a ``Path``, as declared by the return annotation, even when
        the caller passed a plain string.
    """
    for ext_img_file in ext_img_files:
        try:
            img = _read_external_image(ext_img_file)
        except Exception:
            # Best effort: report and move on. Only ``Exception`` is caught
            # so that KeyboardInterrupt/SystemExit are not swallowed.
            _logger.warning(f"Unsupported file format: {ext_img_file}")
            continue
        yield Path(ext_img_file), img
        # Drop the reference before reading the next image.
        del img

imc

imc_available

create_panel_from_imc_panel(imc_panel_file, imc_panel_channel_col='Metal Tag', imc_panel_name_col='Target', imc_panel_keep_col='full', imc_panel_ilastik_col='ilastik')

Source code in steinbock/preprocessing/imc.py
def create_panel_from_imc_panel(
    imc_panel_file: Union[str, PathLike],
    imc_panel_channel_col: str = "Metal Tag",
    imc_panel_name_col: str = "Target",
    imc_panel_keep_col: str = "full",
    imc_panel_ilastik_col: str = "ilastik",
) -> pd.DataFrame:
    """Convert an IMC panel CSV file into a panel data frame.

    Args:
        imc_panel_file: delimited text file; both ``,`` and ``;`` are
            accepted as separators.
        imc_panel_channel_col: source column renamed to ``channel``
            (required, must not contain missing values).
        imc_panel_name_col: source column renamed to ``name`` (required).
        imc_panel_keep_col: optional source column renamed to ``keep``;
            parsed as boolean with ``"1"``/``"0"`` as true/false values.
        imc_panel_ilastik_col: optional source column renamed to
            ``ilastik``; parsed like the keep column.

    Returns:
        The panel with one row per distinct channel, after passing through
        ``_clean_panel``. Channels flagged for ilastik are numbered
        consecutively starting at 1; all others have a missing value in
        the ``ilastik`` column.

    Raises:
        ValueError: if the channel or name column is absent, or if the
            channel, keep or ilastik column contains missing values.
    """
    # The regex separator requires the python parsing engine.
    imc_panel = pd.read_csv(
        imc_panel_file,
        sep=",|;",
        dtype={
            imc_panel_channel_col: pd.StringDtype(),
            imc_panel_name_col: pd.StringDtype(),
            imc_panel_keep_col: pd.BooleanDtype(),
            imc_panel_ilastik_col: pd.BooleanDtype(),
        },
        engine="python",
        true_values=["1"],
        false_values=["0"],
    )
    # Only the channel and name columns are mandatory.
    for required_col in (imc_panel_channel_col, imc_panel_name_col):
        if required_col not in imc_panel:
            raise ValueError(f"Missing '{required_col}' column in IMC panel")
    # The optional keep/ilastik columns are only validated when present.
    for notnan_col in (
        imc_panel_channel_col,
        imc_panel_keep_col,
        imc_panel_ilastik_col,
    ):
        if notnan_col in imc_panel and imc_panel[notnan_col].isna().any():
            raise ValueError(f"Missing values for '{notnan_col}' in IMC panel")
    rename_columns = {
        imc_panel_channel_col: "channel",
        imc_panel_name_col: "name",
        imc_panel_keep_col: "keep",
        imc_panel_ilastik_col: "ilastik",
    }
    # Columns that already carry a target name without being the configured
    # source column would collide with the renamed column, so drop them first.
    drop_columns = [
        panel_col
        for imc_panel_col, panel_col in rename_columns.items()
        if panel_col in imc_panel.columns and panel_col != imc_panel_col
    ]
    panel = imc_panel.drop(columns=drop_columns).rename(columns=rename_columns)
    # Merge rows sharing a channel: join their distinct non-missing names
    # with " / " and flag keep/ilastik if any row of the group is flagged.
    for _, g in panel.groupby("channel"):
        panel.loc[g.index, "name"] = " / ".join(g["name"].dropna().unique())
        if "keep" in panel:
            panel.loc[g.index, "keep"] = g["keep"].any()
        if "ilastik" in panel:
            panel.loc[g.index, "ilastik"] = g["ilastik"].any()
    # Collapse to one row per channel using the "first" aggregate; grouping
    # by the raw values keeps "channel" as a regular column.
    panel = panel.groupby(panel["channel"].values).aggregate("first")
    panel = _clean_panel(panel)  # ilastik column may be nullable uint8 now
    # Remember which channels were flagged, then replace the flags by
    # consecutive 1-based numbers (in row order) for the flagged channels.
    ilastik_mask = panel["ilastik"].fillna(False).astype(bool)
    # NOTE(review): assigning an empty Series presumably resets the column
    # to all-missing nullable UInt8 via index alignment - confirm.
    panel["ilastik"] = pd.Series(dtype=pd.UInt8Dtype())
    panel.loc[ilastik_mask, "ilastik"] = range(1, ilastik_mask.sum() + 1)
    return panel

create_panel_from_mcd_files(mcd_files)

Source code in steinbock/preprocessing/imc.py
def create_panel_from_mcd_files(
    mcd_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a panel from the channels of all acquisitions in MCD files.

    Every acquisition of every slide contributes its channel names
    (``channel``) and channel labels (``name``); duplicate rows are removed
    before the result is handed to ``_clean_panel``.
    """
    acquisition_panels = []
    for mcd_path in mcd_files:
        with MCDFile(mcd_path) as mcd:
            for slide in mcd.slides:
                for acq in slide.acquisitions:
                    channels = pd.Series(
                        data=acq.channel_names, dtype=pd.StringDtype()
                    )
                    labels = pd.Series(
                        data=acq.channel_labels, dtype=pd.StringDtype()
                    )
                    acquisition_panels.append(
                        pd.DataFrame(data={"channel": channels, "name": labels})
                    )
    combined = pd.concat(acquisition_panels, ignore_index=True, copy=False)
    combined = combined.drop_duplicates(ignore_index=True)
    return _clean_panel(combined)

create_panel_from_txt_files(txt_files)

Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_files(
    txt_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a panel from the channels found in IMC TXT files.

    Every file contributes its channel names (``channel``) and channel
    labels (``name``); duplicate rows are removed before the result is
    handed to ``_clean_panel``.
    """
    file_panels = []
    for txt_path in txt_files:
        with TXTFile(txt_path) as txt:
            channels = pd.Series(data=txt.channel_names, dtype=pd.StringDtype())
            labels = pd.Series(data=txt.channel_labels, dtype=pd.StringDtype())
            file_panels.append(
                pd.DataFrame(data={"channel": channels, "name": labels})
            )
    merged = pd.concat(file_panels, ignore_index=True, copy=False)
    merged = merged.drop_duplicates(ignore_index=True)
    return _clean_panel(merged)

filter_hot_pixels(img, thres)

Source code in steinbock/preprocessing/imc.py
def filter_hot_pixels(img: np.ndarray, thres: float) -> np.ndarray:
    """Replace hot pixels by the maximum of their in-plane neighbors.

    A pixel is considered hot when it exceeds the maximum of its 8
    neighbors (3x3 window without the center, within the same slice along
    the first axis) by more than ``thres``.

    Args:
        img: three-dimensional image; the first axis is never mixed with
            the other two by the filter.
        thres: minimum excess over the neighborhood maximum for a pixel to
            be replaced.

    Returns:
        An array of the same shape and dtype as ``img`` in which hot pixels
        hold the neighborhood maximum and all other pixels are unchanged.
    """
    kernel = np.ones((1, 3, 3), dtype=bool)
    kernel[0, 1, 1] = False  # exclude the pixel itself from its neighborhood
    max_neighbor_img = maximum_filter(img, footprint=kernel, mode="mirror")
    # Compute the difference in floating point: for unsigned integer images
    # ``img - max_neighbor_img`` would wrap around wherever a pixel is darker
    # than its neighbors and wrongly flag it as hot. Floating point inputs
    # keep their dtype, so their behavior is unchanged.
    diff_dtype = np.result_type(img.dtype, np.float32)
    excess = img.astype(diff_dtype, copy=False) - max_neighbor_img
    return np.where(excess > thres, max_neighbor_img, img)

list_mcd_files(mcd_dir)

Source code in steinbock/preprocessing/imc.py
def list_mcd_files(mcd_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively collect the ``*.mcd`` paths below ``mcd_dir``, sorted."""
    found = list(Path(mcd_dir).rglob("*.mcd"))
    found.sort()
    return found

list_txt_files(txt_dir)

Source code in steinbock/preprocessing/imc.py
def list_txt_files(txt_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively collect the ``*.txt`` paths below ``txt_dir``, sorted."""
    found = list(Path(txt_dir).rglob("*.txt"))
    found.sort()
    return found

preprocess_image(img, hpf=None)

Source code in steinbock/preprocessing/imc.py
def preprocess_image(
    img: np.ndarray, hpf: Optional[float] = None
) -> np.ndarray:
    """Convert an image to float32, optionally filter hot pixels, and cast.

    Args:
        img: image to preprocess.
        hpf: hot pixel filter threshold passed to ``filter_hot_pixels``;
            filtering is skipped when ``None``.

    Returns:
        The image converted to ``io.img_dtype`` by ``io._to_dtype``.
    """
    float_img = img.astype(np.float32)
    filtered = float_img if hpf is None else filter_hot_pixels(float_img, hpf)
    return io._to_dtype(filtered, io.img_dtype)

try_preprocess_images_from_disk(mcd_files, txt_files, channel_names=None, hpf=None)

Source code in steinbock/preprocessing/imc.py
def try_preprocess_images_from_disk(
    mcd_files: Sequence[Union[str, PathLike]],
    txt_files: Sequence[Union[str, PathLike]],
    channel_names: Optional[Sequence[str]] = None,
    hpf: Optional[float] = None,
) -> Generator[
    Tuple[Path, Optional["Acquisition"], np.ndarray, Optional[Path], bool],
    None,
    None,
]:
    """Lazily read and preprocess IMC acquisitions from MCD and TXT files.

    All acquisitions of all MCD files are processed first. When reading an
    acquisition from the MCD file fails with an ``IOError`` and a TXT file
    matching that acquisition exists, the image is restored from the TXT
    file instead. TXT files that were not matched to any acquisition are
    processed afterwards as stand-alone images. Files and acquisitions that
    cannot be processed are logged and skipped.

    Args:
        mcd_files: MCD files to read.
        txt_files: TXT files, used for recovery and as stand-alone images.
        channel_names: if given, only these channels are extracted;
            acquisitions lacking one of them are skipped with a warning.
        hpf: hot pixel filter threshold forwarded to ``preprocess_image``.

    Yields:
        ``(mcd_file, acquisition, img, matched_txt_file, recovered)`` for
        acquisitions from MCD files, where ``matched_txt_file`` is ``None``
        when no TXT file matched and ``recovered`` tells whether the image
        was restored from the TXT file; and
        ``(txt_file, None, img, None, False)`` for unmatched TXT files.
    """
    unmatched_txt_files = list(txt_files)
    for mcd_file in mcd_files:
        try:
            with MCDFile(mcd_file) as f_mcd:
                for slide in f_mcd.slides:
                    for acquisition in slide.acquisitions:
                        matched_txt_file = _match_txt_file(
                            mcd_file, acquisition, unmatched_txt_files
                        )
                        if matched_txt_file is not None:
                            # A TXT file belongs to at most one acquisition.
                            unmatched_txt_files.remove(matched_txt_file)
                        channel_ind = None
                        if channel_names is not None:
                            channel_ind = _get_channel_indices(
                                acquisition, channel_names
                            )
                            # A str result signals a channel that could not
                            # be found (it is used as such in the message).
                            if isinstance(channel_ind, str):
                                _logger.warning(
                                    f"Channel {channel_ind} not found for "
                                    f"acquisition {acquisition.id} in file "
                                    f"{mcd_file}; skipping acquisition"
                                )
                                continue
                        img = None
                        recovered = False
                        try:
                            img = f_mcd.read_acquisition(acquisition)
                        except IOError:
                            _logger.warning(
                                f"Error reading acquisition {acquisition.id} "
                                f"from file {mcd_file}"
                            )
                            if matched_txt_file is not None:
                                _logger.warning(
                                    f"Restoring from file {matched_txt_file}"
                                )
                                try:
                                    with TXTFile(matched_txt_file) as f_txt:
                                        img = f_txt.read_acquisition()
                                        if channel_names is not None:
                                            # Channel indices are recomputed
                                            # against the TXT file.
                                            channel_ind = _get_channel_indices(
                                                f_txt, channel_names
                                            )
                                            if isinstance(channel_ind, str):
                                                _logger.warning(
                                                    f"Channel {channel_ind} "
                                                    "not found in file "
                                                    f"{matched_txt_file}; "
                                                    "skipping acquisition"
                                                )
                                                continue
                                    recovered = True
                                except IOError:
                                    _logger.exception(
                                        "Error reading file "
                                        f"{matched_txt_file}"
                                    )
                        # img is still None when both reads failed.
                        if img is not None:
                            if channel_ind is not None:
                                img = img[channel_ind, :, :]
                            img = preprocess_image(img, hpf=hpf)
                            yield (
                                Path(mcd_file),
                                acquisition,
                                img,
                                Path(matched_txt_file)
                                if matched_txt_file is not None
                                else None,
                                recovered,
                            )
                            # Drop the reference before the next read.
                            del img
        except Exception:
            # Deliberately not a bare ``except``: this block contains a
            # ``yield``, so a bare clause would also catch the GeneratorExit
            # raised when the consumer closes the generator (leading to
            # "generator ignored GeneratorExit") as well as
            # KeyboardInterrupt/SystemExit.
            _logger.exception(f"Error reading file {mcd_file}")
    # Remaining TXT files are treated as stand-alone acquisitions.
    for txt_file in unmatched_txt_files:
        try:
            channel_ind = None
            with TXTFile(txt_file) as f:
                if channel_names is not None:
                    channel_ind = _get_channel_indices(f, channel_names)
                    if isinstance(channel_ind, str):
                        _logger.warning(
                            f"Channel {channel_ind} not found in file "
                            f"{txt_file}; skipping acquisition"
                        )
                        continue
                img = f.read_acquisition()
            if channel_ind is not None:
                img = img[channel_ind, :, :]
            img = preprocess_image(img, hpf=hpf)
            yield Path(txt_file), None, img, None, False
            del img
        except Exception:
            # See above: GeneratorExit and friends must propagate.
            _logger.exception(f"Error reading file {txt_file}")
Back to top