steinbock.preprocessing
external
logger
SteinbockExternalPreprocessingException (SteinbockPreprocessingException)
Source code in steinbock/preprocessing/external.py
class SteinbockExternalPreprocessingException(SteinbockPreprocessingException):
    """Error raised by the external-image preprocessing routines."""
create_panel_from_image_files(ext_img_files)
Source code in steinbock/preprocessing/external.py
def create_panel_from_image_files(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a default panel from the first readable external image.

    The files are tried in order; the channel count is taken from the first
    axis of the first image that can be read.

    Args:
        ext_img_files: candidate image files.

    Returns:
        A panel with one row per channel and the columns ``channel``,
        ``name``, ``keep``, ``ilastik``, ``deepcell`` and ``cellpose``.

    Raises:
        SteinbockExternalPreprocessingException: if none of the files could
            be read.
    """
    num_channels = None
    for ext_img_file in ext_img_files:
        try:
            num_channels = _read_external_image(ext_img_file).shape[0]
        except Exception:
            continue  # unreadable files are skipped intentionally
        break
    if num_channels is None:
        raise SteinbockExternalPreprocessingException("No valid images found")

    channel_numbers = range(1, num_channels + 1)
    panel = pd.DataFrame(
        data={
            "channel": channel_numbers,
            "name": np.nan,
            "keep": True,
            "ilastik": channel_numbers,
            "deepcell": np.nan,
            "cellpose": np.nan,
        },
    )
    # Cast every column to its nullable extension dtype, one column at a time.
    column_dtypes = {
        "channel": pd.StringDtype(),
        "name": pd.StringDtype(),
        "keep": pd.BooleanDtype(),
        "ilastik": pd.UInt8Dtype(),
        "deepcell": pd.UInt8Dtype(),
        "cellpose": pd.UInt8Dtype(),
    }
    for column, dtype in column_dtypes.items():
        panel[column] = panel[column].astype(dtype)
    return panel
list_image_files(ext_img_dir)
Source code in steinbock/preprocessing/external.py
def list_image_files(ext_img_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively list entries below ``ext_img_dir`` that look like image files.

    An entry matches when its name does not start with a dot and contains at
    least one dot (glob pattern ``[!.]*.*``).

    Args:
        ext_img_dir: directory to search.

    Returns:
        The matching paths in sorted order.
    """
    matches = list(Path(ext_img_dir).rglob("[!.]*.*"))
    matches.sort()
    return matches
try_preprocess_images_from_disk(ext_img_files)
Source code in steinbock/preprocessing/external.py
def try_preprocess_images_from_disk(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Lazily read external images, skipping files that cannot be read.

    Args:
        ext_img_files: image files to read, in order.

    Yields:
        ``(path, image)`` for every file that was read successfully. A
        warning is logged for each file whose reading raised an exception.
    """
    for ext_img_file in ext_img_files:
        try:
            img = _read_external_image(ext_img_file)
        except Exception:
            logger.warning(f"Unsupported file format: {ext_img_file}")
        else:
            yield Path(ext_img_file), img
            # drop the local reference once the consumer resumes the generator
            del img
imc
imc_available
logger
SteinbockIMCPreprocessingException (SteinbockPreprocessingException)
Source code in steinbock/preprocessing/imc.py
class SteinbockIMCPreprocessingException(SteinbockPreprocessingException):
    """Error raised by the IMC preprocessing routines."""
create_image_info(mcd_txt_file, acquisition, img, recovery_file, recovered, img_file)
Source code in steinbock/preprocessing/imc.py
def create_image_info(
    mcd_txt_file: Union[str, PathLike],
    acquisition: Optional["Acquisition"],
    img: np.ndarray,
    recovery_file: Union[str, PathLike, None],
    recovered: bool,
    img_file: Union[str, PathLike],
) -> Dict[str, Any]:
    """Assemble one row of image metadata.

    Args:
        mcd_txt_file: source file of the image; only its file name is stored.
        acquisition: acquisition the image belongs to, or ``None`` if no
            acquisition metadata is available.
        img: image array; axis 0 is reported as the number of channels,
            axis 1 as the height and axis 2 as the width.
        recovery_file: file used for recovery, or ``None``; only its file
            name is stored.
        recovered: stored as-is in the ``recovered`` field.
        img_file: image file; only its file name is stored.

    Returns:
        A dictionary with the image fields and, when ``acquisition`` is
        given, the additional ``acquisition_*`` fields.
    """
    # The annotation for ``acquisition`` is a forward reference (string) so
    # that defining this function never requires the name to be resolvable.
    recovery_file_name = None
    if recovery_file is not None:
        recovery_file_name = Path(recovery_file).name
    image_info_row = {
        "image": Path(img_file).name,
        "width_px": img.shape[2],
        "height_px": img.shape[1],
        "num_channels": img.shape[0],
        "source_file": Path(mcd_txt_file).name,
        "recovery_file": recovery_file_name,
        "recovered": recovered,
    }
    if acquisition is not None:
        # ROI point 0 is used as the start and ROI point 2 as the end.
        start_point = acquisition.roi_points_um[0]
        end_point = acquisition.roi_points_um[2]
        image_info_row.update(
            {
                "acquisition_id": acquisition.id,
                "acquisition_description": acquisition.description,
                "acquisition_start_x_um": start_point[0],
                "acquisition_start_y_um": start_point[1],
                "acquisition_end_x_um": end_point[0],
                "acquisition_end_y_um": end_point[1],
                "acquisition_width_um": acquisition.width_um,
                "acquisition_height_um": acquisition.height_um,
            }
        )
    return image_info_row
create_panel_from_imc_panel(imc_panel_file, imc_panel_channel_col='Metal Tag', imc_panel_name_col='Target', imc_panel_keep_col='full', imc_panel_ilastik_col='ilastik')
Source code in steinbock/preprocessing/imc.py
def create_panel_from_imc_panel(
    imc_panel_file: Union[str, PathLike],
    imc_panel_channel_col: str = "Metal Tag",
    imc_panel_name_col: str = "Target",
    imc_panel_keep_col: str = "full",
    imc_panel_ilastik_col: str = "ilastik",
) -> pd.DataFrame:
    """Convert an IMC panel file into a panel data frame.

    The file is read as delimited text with ``,`` or ``;`` as separator. The
    channel and name columns are mandatory; the keep and ilastik columns are
    optional and are read as booleans (``1``/``0``). Rows that share a
    channel are merged: their names are joined with `` / `` and their
    keep/ilastik flags are OR-ed.

    Args:
        imc_panel_file: path to the IMC panel file.
        imc_panel_channel_col: name of the column holding the channel.
        imc_panel_name_col: name of the column holding the channel name.
        imc_panel_keep_col: name of the column holding the keep flag.
        imc_panel_ilastik_col: name of the column holding the ilastik flag.

    Returns:
        The cleaned panel, with the flagged ``ilastik`` rows numbered
        consecutively starting at 1.

    Raises:
        SteinbockIMCPreprocessingException: if a mandatory column is absent
            or if the channel, keep or ilastik column contains missing
            values.
    """
    column_dtypes = {
        imc_panel_channel_col: pd.StringDtype(),
        imc_panel_name_col: pd.StringDtype(),
        imc_panel_keep_col: pd.BooleanDtype(),
        imc_panel_ilastik_col: pd.BooleanDtype(),
    }
    imc_panel = pd.read_csv(
        imc_panel_file,
        sep=",|;",
        dtype=column_dtypes,
        engine="python",
        true_values=["1"],
        false_values=["0"],
    )

    # Validation: mandatory columns first, then columns that must be complete.
    for required_col in (imc_panel_channel_col, imc_panel_name_col):
        if required_col not in imc_panel:
            raise SteinbockIMCPreprocessingException(
                f"Missing '{required_col}' column in IMC panel"
            )
    for notnan_col in (
        imc_panel_channel_col,
        imc_panel_keep_col,
        imc_panel_ilastik_col,
    ):
        if notnan_col not in imc_panel:
            continue
        if imc_panel[notnan_col].isna().any():
            raise SteinbockIMCPreprocessingException(
                f"Missing values for '{notnan_col}' in IMC panel"
            )

    rename_columns = {
        imc_panel_channel_col: "channel",
        imc_panel_name_col: "name",
        imc_panel_keep_col: "keep",
        imc_panel_ilastik_col: "ilastik",
    }
    # Pre-existing columns that would clash with a renamed column are dropped.
    clashing_columns = [
        target_col
        for source_col, target_col in rename_columns.items()
        if target_col != source_col and target_col in imc_panel.columns
    ]
    panel = imc_panel.drop(columns=clashing_columns).rename(
        columns=rename_columns
    )

    # Harmonize all rows of a channel so that keeping the first row per
    # channel below loses no information.
    for _, channel_rows in panel.groupby("channel"):
        row_index = channel_rows.index
        panel.loc[row_index, "name"] = " / ".join(
            channel_rows["name"].dropna().unique()
        )
        for flag_col in ("keep", "ilastik"):
            if flag_col in panel:
                panel.loc[row_index, flag_col] = channel_rows[flag_col].any()
    panel = panel.groupby(panel["channel"].values).aggregate("first")

    panel = _clean_panel(panel)  # ilastik column may be nullable uint8 now
    ilastik_mask = panel["ilastik"].fillna(False).astype(bool)
    panel["ilastik"] = pd.Series(dtype=pd.UInt8Dtype())
    panel.loc[ilastik_mask, "ilastik"] = range(1, ilastik_mask.sum() + 1)
    return panel
create_panel_from_mcd_files(mcd_files, unzip=False)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_mcd_files(
    mcd_files: Sequence[Union[str, PathLike]], unzip: bool = False
) -> pd.DataFrame:
    """Create a single panel from the channel information of MCD files.

    Args:
        mcd_files: MCD files; entries that refer to a member of a zip
            archive are only considered when ``unzip`` is true.
        unzip: whether to extract zipped MCD files to a temporary directory
            and include them.

    Returns:
        The concatenated, de-duplicated and cleaned panel.
    """
    collected_panels = []
    for mcd_file in mcd_files:
        zip_file_mcd_member = _get_zip_file_member(mcd_file)
        if zip_file_mcd_member is None:
            collected_panels.extend(create_panels_from_mcd_file(mcd_file))
            continue
        if not unzip:
            continue  # zipped files are ignored unless unzipping is requested
        archive_path, member_name = zip_file_mcd_member
        with ZipFile(archive_path) as archive, TemporaryDirectory() as scratch_dir:
            extracted_mcd_file = archive.extract(member_name, path=scratch_dir)
            collected_panels.extend(
                create_panels_from_mcd_file(extracted_mcd_file)
            )
    merged = pd.concat(collected_panels, ignore_index=True, copy=False)
    merged = merged.drop_duplicates(ignore_index=True)
    return _clean_panel(merged)
create_panel_from_txt_file(txt_file)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_file(txt_file: Union[str, PathLike]) -> pd.DataFrame:
    """Create a two-column panel from the channels of one TXT file.

    Args:
        txt_file: path to the TXT file.

    Returns:
        A data frame with the string columns ``channel`` (channel names) and
        ``name`` (channel labels).
    """
    with TXTFile(txt_file) as f:
        channel_column = pd.Series(data=f.channel_names, dtype=pd.StringDtype())
        name_column = pd.Series(data=f.channel_labels, dtype=pd.StringDtype())
        return pd.DataFrame(data={"channel": channel_column, "name": name_column})
create_panel_from_txt_files(txt_files, unzip=False)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_files(
    txt_files: Sequence[Union[str, PathLike]], unzip: bool = False
) -> pd.DataFrame:
    """Create a single panel from the channel information of TXT files.

    Args:
        txt_files: TXT files; entries that refer to a member of a zip
            archive are only considered when ``unzip`` is true.
        unzip: whether to extract zipped TXT files to a temporary directory
            and include them.

    Returns:
        The concatenated, de-duplicated and cleaned panel.
    """
    collected_panels = []
    for txt_file in txt_files:
        zip_file_txt_member = _get_zip_file_member(txt_file)
        if zip_file_txt_member is None:
            collected_panels.append(create_panel_from_txt_file(txt_file))
            continue
        if not unzip:
            continue  # zipped files are ignored unless unzipping is requested
        archive_path, member_name = zip_file_txt_member
        with ZipFile(archive_path) as archive, TemporaryDirectory() as scratch_dir:
            extracted_txt_file = archive.extract(member_name, path=scratch_dir)
            collected_panels.append(
                create_panel_from_txt_file(extracted_txt_file)
            )
    merged = pd.concat(collected_panels, ignore_index=True, copy=False)
    merged = merged.drop_duplicates(ignore_index=True)
    return _clean_panel(merged)
create_panels_from_mcd_file(mcd_file)
Source code in steinbock/preprocessing/imc.py
def create_panels_from_mcd_file(mcd_file: Union[str, PathLike]) -> List[pd.DataFrame]:
    """Create one panel per acquisition contained in an MCD file.

    Args:
        mcd_file: path to the MCD file.

    Returns:
        A list with one data frame per acquisition (over all slides, in file
        order), each holding the string columns ``channel`` (channel names)
        and ``name`` (channel labels).
    """
    with MCDFile(mcd_file) as f:
        return [
            pd.DataFrame(
                data={
                    "channel": pd.Series(
                        data=acquisition.channel_names,
                        dtype=pd.StringDtype(),
                    ),
                    "name": pd.Series(
                        data=acquisition.channel_labels,
                        dtype=pd.StringDtype(),
                    ),
                },
            )
            for slide in f.slides
            for acquisition in slide.acquisitions
        ]
filter_hot_pixels(img, thres)
Source code in steinbock/preprocessing/imc.py
def filter_hot_pixels(img: np.ndarray, thres: float) -> np.ndarray:
    """Replace hot pixels by the maximum of their neighboring pixels.

    A pixel is treated as hot when it exceeds the maximum of its eight
    neighbors within the same plane (3x3 window, borders mirrored) by more
    than ``thres``.

    Args:
        img: image stack whose last two axes are filtered plane by plane
            (the footprint has shape ``(1, 3, 3)``).
        thres: minimum difference between a pixel and its largest neighbor
            for the pixel to be replaced.

    Returns:
        An array in which hot pixels are replaced by their largest neighbor
        and all other pixels are unchanged.
    """
    kernel = np.ones((1, 3, 3), dtype=bool)
    kernel[0, 1, 1] = False  # exclude the pixel itself from its neighborhood
    max_neighbor_img = maximum_filter(img, footprint=kernel, mode="mirror")
    if np.issubdtype(max_neighbor_img.dtype, np.integer):
        # Integer subtraction wraps around (e.g. uint16: 10 - 1000 == 64546),
        # which would flag pixels darker than a neighbor as hot. Compute the
        # difference in floating point instead.
        difference = np.subtract(img, max_neighbor_img, dtype=np.float64)
    else:
        difference = img - max_neighbor_img
    return np.where(difference > thres, max_neighbor_img, img)
list_mcd_files(mcd_dir, unzip=False)
Source code in steinbock/preprocessing/imc.py
def list_mcd_files(mcd_dir: Union[str, PathLike], unzip: bool = False) -> List[Path]:
    """Recursively list MCD files below ``mcd_dir``.

    Files whose name starts with a dot are ignored, both on disk and inside
    zip archives (e.g. ``__MACOSX/._sample.mcd`` entries).

    Args:
        mcd_dir: directory to search.
        unzip: whether to also list ``.mcd`` members of zip archives found
            below ``mcd_dir``.

    Returns:
        The sorted on-disk MCD files, followed (if ``unzip``) by one entry
        ``<zip file>/<member name>`` per zipped MCD file, ordered by archive
        and then by member name.
    """
    mcd_files = sorted(Path(mcd_dir).rglob("[!.]*.mcd"))
    if unzip:
        for zip_file in sorted(Path(mcd_dir).rglob("[!.]*.zip")):
            with ZipFile(zip_file) as fzip:
                member_names = sorted(
                    zip_info.filename
                    for zip_info in fzip.infolist()
                    if not zip_info.is_dir()
                )
            for member_name in member_names:
                # zip member names always use "/" as separator
                base_name = member_name.rsplit("/", 1)[-1]
                # apply the same hidden-file rule as the on-disk glob above
                if member_name.endswith(".mcd") and not base_name.startswith("."):
                    mcd_files.append(zip_file / member_name)
    return mcd_files
list_txt_files(txt_dir, unzip=False)
Source code in steinbock/preprocessing/imc.py
def list_txt_files(txt_dir: Union[str, PathLike], unzip: bool = False) -> List[Path]:
    """Recursively list TXT files below ``txt_dir``.

    Files whose name starts with a dot are ignored, both on disk and inside
    zip archives (e.g. ``__MACOSX/._sample.txt`` entries).

    Args:
        txt_dir: directory to search.
        unzip: whether to also list ``.txt`` members of zip archives found
            below ``txt_dir``.

    Returns:
        The sorted on-disk TXT files, followed (if ``unzip``) by one entry
        ``<zip file>/<member name>`` per zipped TXT file, ordered by archive
        and then by member name.
    """
    txt_files = sorted(Path(txt_dir).rglob("[!.]*.txt"))
    if unzip:
        for zip_file in sorted(Path(txt_dir).rglob("[!.]*.zip")):
            with ZipFile(zip_file) as fzip:
                member_names = sorted(
                    zip_info.filename
                    for zip_info in fzip.infolist()
                    if not zip_info.is_dir()
                )
            for member_name in member_names:
                # zip member names always use "/" as separator
                base_name = member_name.rsplit("/", 1)[-1]
                # apply the same hidden-file rule as the on-disk glob above
                if member_name.endswith(".txt") and not base_name.startswith("."):
                    txt_files.append(zip_file / member_name)
    return txt_files
preprocess_image(img, hpf=None)
Source code in steinbock/preprocessing/imc.py
def preprocess_image(img: np.ndarray, hpf: Optional[float] = None) -> np.ndarray:
    """Convert an image to the package image dtype, optionally filtering it.

    Args:
        img: image to preprocess; it is converted to ``float32`` first.
        hpf: hot pixel filtering threshold; filtering is skipped when
            ``None``.

    Returns:
        The (optionally hot-pixel-filtered) image converted to
        ``io.img_dtype``.
    """
    float_img = img.astype(np.float32)
    if hpf is None:
        return io._to_dtype(float_img, io.img_dtype)
    return io._to_dtype(filter_hot_pixels(float_img, hpf), io.img_dtype)
try_preprocess_images_from_disk(mcd_files, txt_files, channel_names=None, hpf=None, unzip=False, strict=False)
Source code in steinbock/preprocessing/imc.py
def try_preprocess_images_from_disk(
    mcd_files: Sequence[Union[str, PathLike]],
    txt_files: Sequence[Union[str, PathLike]],
    channel_names: Optional[Sequence[str]] = None,
    hpf: Optional[float] = None,
    unzip: bool = False,
    strict: bool = False,
) -> Generator[
    Tuple[Path, Optional["Acquisition"], np.ndarray, Optional[Path], bool],
    None,
    None,
]:
    """Lazily preprocess images from MCD files, then from TXT files.

    MCD files are processed first, ordered by file stem in reverse. The TXT
    files left in the candidate list are processed afterwards. Entries that
    refer to a member of a zip archive are extracted to a temporary
    directory when ``unzip`` is true and are skipped otherwise.

    Args:
        mcd_files: MCD files to process.
        txt_files: TXT files; a copy of this sequence is handed to the MCD
            helper as the list of candidate TXT files.
        channel_names: forwarded to the MCD and TXT helpers.
        hpf: forwarded to the MCD and TXT helpers.
        unzip: whether zipped MCD/TXT files are extracted and processed.
        strict: forwarded to the MCD helper.

    Yields:
        Tuples ``(source file, acquisition, image, recovery TXT file,
        recovered)``. For images read from TXT files, the acquisition and
        the recovery file are ``None`` and ``recovered`` is ``False``. For
        zipped files, the source file is the path passed in, not the
        temporary extraction path.
    """
    # Work on a copy so the caller's sequence is never modified.
    # NOTE(review): presumably _try_preprocess_mcd_images_from_disk removes
    # the TXT files it consumes from this list, so that only unmatched TXT
    # files are processed on their own below - confirm against the helper.
    candidate_txt_files = list(txt_files)
    # process mcd files in reverse order to avoid ambiguous txt file matching
    # see https://github.com/BodenmillerGroup/steinbock/issues/100
    for mcd_file in sorted(
        mcd_files, key=lambda mcd_file: Path(mcd_file).stem, reverse=True
    ):
        zip_file_mcd_member = _get_zip_file_member(mcd_file)
        if zip_file_mcd_member is None:
            # regular MCD file on disk
            for (
                acquisition,
                img,
                recovery_txt_file,
                recovered,
            ) in _try_preprocess_mcd_images_from_disk(
                mcd_file,
                candidate_txt_files,
                channel_names=channel_names,
                hpf=hpf,
                unzip=unzip,
                strict=strict,
            ):
                yield Path(mcd_file), acquisition, img, recovery_txt_file, recovered
                # drop the local reference once the consumer resumes us
                del img
        elif unzip:
            # zipped MCD file: the extracted copy has to stay on disk while
            # the helper is being iterated, hence the yield inside the "with"
            zip_file, mcd_member = zip_file_mcd_member
            with ZipFile(zip_file) as fzip:
                with TemporaryDirectory() as temp_dir:
                    extracted_mcd_file = fzip.extract(mcd_member, path=temp_dir)
                    for (
                        acquisition,
                        img,
                        recovery_txt_file,
                        recovered,
                    ) in _try_preprocess_mcd_images_from_disk(
                        extracted_mcd_file,
                        candidate_txt_files,
                        channel_names=channel_names,
                        hpf=hpf,
                        unzip=unzip,
                        strict=strict,
                    ):
                        yield (
                            Path(mcd_file),
                            acquisition,
                            img,
                            recovery_txt_file,
                            recovered,
                        )
                        del img
    # TXT files still in the candidate list are processed individually
    for txt_file in candidate_txt_files:
        zip_file_txt_member = _get_zip_file_member(txt_file)
        if zip_file_txt_member is None:
            img = _try_preprocess_txt_image_from_disk(
                txt_file, channel_names=channel_names, hpf=hpf
            )
            # a None result from the helper means there is nothing to yield
            if img is not None:
                yield Path(txt_file), None, img, None, False
                del img
        elif unzip:
            zip_file, txt_member = zip_file_txt_member
            with ZipFile(zip_file) as fzip:
                with TemporaryDirectory() as temp_dir:
                    extracted_txt_file = fzip.extract(txt_member, path=temp_dir)
                    img = _try_preprocess_txt_image_from_disk(
                        extracted_txt_file, channel_names=channel_names, hpf=hpf
                    )
            if img is not None:
                yield Path(txt_file), None, img, None, False
                del img