steinbock.preprocessing
external
logger
SteinbockExternalPreprocessingException (SteinbockPreprocessingException)
Source code in steinbock/preprocessing/external.py
class SteinbockExternalPreprocessingException(SteinbockPreprocessingException):
    """Error raised by the preprocessing functions for external images."""
create_panel_from_image_files(ext_img_files)
Source code in steinbock/preprocessing/external.py
def create_panel_from_image_files(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Create a default panel from the first readable external image.

    The files are tried in order; the first one that can be read determines
    the number of channels (taken from the first axis of the image).

    Args:
        ext_img_files: Candidate image files.

    Returns:
        A panel with one row per channel and the columns ``channel``
        (``"1"``..``"n"`` as strings), ``name`` (missing), ``keep`` (True),
        ``ilastik`` (1..n) and ``deepcell`` (missing).

    Raises:
        SteinbockExternalPreprocessingException: If none of the files could
            be read.
    """
    num_channels = None
    for ext_img_file in ext_img_files:
        try:
            ext_img = _read_external_image(ext_img_file)
            num_channels = ext_img.shape[0]
            break
        except Exception:
            # Unreadable files are skipped intentionally. Only Exception is
            # caught so that KeyboardInterrupt/SystemExit still propagate.
            continue
    if num_channels is None:
        raise SteinbockExternalPreprocessingException("No valid images found")
    panel = pd.DataFrame(
        data={
            "channel": range(1, num_channels + 1),
            "name": np.nan,
            "keep": True,
            "ilastik": range(1, num_channels + 1),
            "deepcell": np.nan,
        },
    )
    # Use pandas' nullable extension dtypes for all panel columns.
    panel["channel"] = panel["channel"].astype(pd.StringDtype())
    panel["name"] = panel["name"].astype(pd.StringDtype())
    panel["keep"] = panel["keep"].astype(pd.BooleanDtype())
    panel["ilastik"] = panel["ilastik"].astype(pd.UInt8Dtype())
    panel["deepcell"] = panel["deepcell"].astype(pd.UInt8Dtype())
    return panel
list_image_files(ext_img_dir)
Source code in steinbock/preprocessing/external.py
def list_image_files(ext_img_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively list paths below a directory that have a file extension.

    Entries whose name starts with a dot are ignored. The result is sorted.
    """
    root_dir = Path(ext_img_dir)
    matches = list(root_dir.rglob("[!.]*.*"))
    matches.sort()
    return matches
try_preprocess_images_from_disk(ext_img_files)
Source code in steinbock/preprocessing/external.py
def try_preprocess_images_from_disk(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Lazily read external image files, skipping those that cannot be read.

    Args:
        ext_img_files: Image files to read, in order.

    Yields:
        Tuples of (image file path, image). Files that fail to load are
        reported with a warning and skipped.
    """
    for ext_img_file in ext_img_files:
        try:
            img = _read_external_image(ext_img_file)
        except Exception:
            # Only Exception is caught so that KeyboardInterrupt/SystemExit
            # are not mistaken for an unsupported file.
            logger.warning(f"Unsupported file format: {ext_img_file}")
            continue
        # Yield a Path, as promised by the return annotation.
        yield Path(ext_img_file), img
        del img  # release the image before the next one is loaded
imc
imc_available
logger
SteinbockIMCPreprocessingException (SteinbockPreprocessingException)
Source code in steinbock/preprocessing/imc.py
class SteinbockIMCPreprocessingException(SteinbockPreprocessingException):
    """Error raised by the IMC preprocessing functions."""
create_panel_from_imc_panel(imc_panel_file, imc_panel_channel_col='Metal Tag', imc_panel_name_col='Target', imc_panel_keep_col='full', imc_panel_ilastik_col='ilastik')
Source code in steinbock/preprocessing/imc.py
def create_panel_from_imc_panel(
    imc_panel_file: Union[str, PathLike],
    imc_panel_channel_col: str = "Metal Tag",
    imc_panel_name_col: str = "Target",
    imc_panel_keep_col: str = "full",
    imc_panel_ilastik_col: str = "ilastik",
) -> pd.DataFrame:
    """Convert an IMC panel file into a panel data frame.

    The file is read as delimited text with ``,`` or ``;`` as separator;
    ``1``/``0`` in the keep and ilastik columns are parsed as booleans. Rows
    that share a channel are merged into one: their names are joined with
    ``" / "`` and their keep/ilastik flags are combined with a logical OR.
    Finally, the ilastik flag is replaced by a running number (1, 2, ...)
    for the flagged channels and left missing for all others.

    Args:
        imc_panel_file: Path to the IMC panel file.
        imc_panel_channel_col: Column holding the channel identifier.
        imc_panel_name_col: Column holding the channel name.
        imc_panel_keep_col: Column holding the boolean keep flag.
        imc_panel_ilastik_col: Column holding the boolean ilastik flag.

    Returns:
        The panel after processing by ``_clean_panel``.

    Raises:
        SteinbockIMCPreprocessingException: If the channel or name column is
            absent, or if the channel, keep or ilastik column (when present)
            contains missing values.
    """
    column_dtypes = {
        imc_panel_channel_col: pd.StringDtype(),
        imc_panel_name_col: pd.StringDtype(),
        imc_panel_keep_col: pd.BooleanDtype(),
        imc_panel_ilastik_col: pd.BooleanDtype(),
    }
    imc_panel = pd.read_csv(
        imc_panel_file,
        sep=",|;",
        dtype=column_dtypes,
        engine="python",
        true_values=["1"],
        false_values=["0"],
    )

    # Validation: mandatory columns first, then completeness of the columns
    # that must not contain missing values.
    mandatory_cols = (imc_panel_channel_col, imc_panel_name_col)
    for col in mandatory_cols:
        if col not in imc_panel:
            raise SteinbockIMCPreprocessingException(
                f"Missing '{col}' column in IMC panel"
            )
    complete_cols = (
        imc_panel_channel_col,
        imc_panel_keep_col,
        imc_panel_ilastik_col,
    )
    for col in complete_cols:
        if col not in imc_panel:
            continue
        if imc_panel[col].isna().any():
            raise SteinbockIMCPreprocessingException(
                f"Missing values for '{col}' in IMC panel"
            )

    column_map = {
        imc_panel_channel_col: "channel",
        imc_panel_name_col: "name",
        imc_panel_keep_col: "keep",
        imc_panel_ilastik_col: "ilastik",
    }
    # Pre-existing columns that already carry a target name would collide
    # with the renamed ones, so they are dropped beforehand.
    colliding_cols = []
    for source_col, target_col in column_map.items():
        if target_col != source_col and target_col in imc_panel.columns:
            colliding_cols.append(target_col)
    panel = imc_panel.drop(columns=colliding_cols).rename(columns=column_map)

    # Propagate the merged name/flags to every row of a channel ...
    for _, channel_rows in panel.groupby("channel"):
        row_index = channel_rows.index
        merged_name = " / ".join(channel_rows["name"].dropna().unique())
        panel.loc[row_index, "name"] = merged_name
        if "keep" in panel:
            panel.loc[row_index, "keep"] = channel_rows["keep"].any()
        if "ilastik" in panel:
            panel.loc[row_index, "ilastik"] = channel_rows["ilastik"].any()
    # ... then collapse each channel to its first row.
    channel_keys = panel["channel"].values
    panel = panel.groupby(channel_keys).aggregate("first")

    panel = _clean_panel(panel)  # ilastik column may be nullable uint8 now
    ilastik_mask = panel["ilastik"].fillna(False).astype(bool)
    # Reset the column, then number the flagged channels consecutively.
    panel["ilastik"] = pd.Series(dtype=pd.UInt8Dtype())
    num_flagged = ilastik_mask.sum()
    panel.loc[ilastik_mask, "ilastik"] = range(1, num_flagged + 1)
    return panel
create_panel_from_mcd_files(mcd_files)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_mcd_files(
    mcd_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a panel from the channels of all acquisitions in MCD files.

    For every acquisition of every slide, the channel names and channel
    labels become the ``channel`` and ``name`` columns. The per-acquisition
    tables are concatenated, exact duplicate rows are removed, and the
    result is passed through ``_clean_panel``.
    """
    acquisition_panels = []
    for mcd_path in mcd_files:
        with MCDFile(mcd_path) as mcd:
            # The generator is consumed by extend() while the file is open.
            acquisition_panels.extend(
                pd.DataFrame(
                    data={
                        "channel": pd.Series(
                            data=acq.channel_names,
                            dtype=pd.StringDtype(),
                        ),
                        "name": pd.Series(
                            data=acq.channel_labels,
                            dtype=pd.StringDtype(),
                        ),
                    },
                )
                for slide in mcd.slides
                for acq in slide.acquisitions
            )
    combined = pd.concat(acquisition_panels, ignore_index=True, copy=False)
    combined = combined.drop_duplicates(ignore_index=True)
    return _clean_panel(combined)
create_panel_from_txt_files(txt_files)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_files(
    txt_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a panel from the channels found in TXT files.

    The channel names and channel labels of each file become the
    ``channel`` and ``name`` columns. The per-file tables are concatenated,
    exact duplicate rows are removed, and the result is passed through
    ``_clean_panel``.
    """

    def _read_channels(txt_path: Union[str, PathLike]) -> pd.DataFrame:
        # One table with the channel names/labels of a single TXT file.
        with TXTFile(txt_path) as txt:
            channels = pd.Series(data=txt.channel_names, dtype=pd.StringDtype())
            labels = pd.Series(data=txt.channel_labels, dtype=pd.StringDtype())
            return pd.DataFrame(data={"channel": channels, "name": labels})

    file_panels = [_read_channels(txt_path) for txt_path in txt_files]
    combined = pd.concat(file_panels, ignore_index=True, copy=False)
    combined = combined.drop_duplicates(ignore_index=True)
    return _clean_panel(combined)
filter_hot_pixels(img, thres)
Source code in steinbock/preprocessing/imc.py
def filter_hot_pixels(img: np.ndarray, thres: float) -> np.ndarray:
    """Replace hot pixels by the maximum of their in-plane neighbors.

    A pixel is considered hot when it exceeds the maximum of its eight
    neighbors along the last two axes by more than ``thres``. Borders are
    handled by mirroring.

    Args:
        img: Three-dimensional image; filtering is applied independently
            along the first axis (presumably channels).
        thres: Minimum excess over the neighbor maximum for a pixel to be
            replaced.

    Returns:
        An array of the same shape as ``img`` with hot pixels replaced.
    """
    # 3x3 in-plane neighborhood without the center pixel
    kernel = np.ones((1, 3, 3), dtype=bool)
    kernel[0, 1, 1] = False
    max_neighbor_img = maximum_filter(img, footprint=kernel, mode="mirror")
    if np.issubdtype(img.dtype, np.unsignedinteger):
        # Unsigned subtraction wraps around when a pixel is darker than its
        # neighbors, which would flag it as hot. Only pixels brighter than
        # the neighbor maximum can be hot, and for those the difference is
        # safe to compute.
        is_hot = (img > max_neighbor_img) & (img - max_neighbor_img > thres)
    else:
        is_hot = img - max_neighbor_img > thres
    return np.where(is_hot, max_neighbor_img, img)
list_mcd_files(mcd_dir)
Source code in steinbock/preprocessing/imc.py
def list_mcd_files(mcd_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively list the ``.mcd`` files below a directory, sorted.

    Entries whose name starts with a dot are ignored.
    """
    root_dir = Path(mcd_dir)
    matches = list(root_dir.rglob("[!.]*.mcd"))
    matches.sort()
    return matches
list_txt_files(txt_dir)
Source code in steinbock/preprocessing/imc.py
def list_txt_files(txt_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively list the ``.txt`` files below a directory, sorted.

    Entries whose name starts with a dot are ignored.
    """
    root_dir = Path(txt_dir)
    matches = list(root_dir.rglob("[!.]*.txt"))
    matches.sort()
    return matches
preprocess_image(img, hpf=None)
Source code in steinbock/preprocessing/imc.py
def preprocess_image(img: np.ndarray, hpf: Optional[float] = None) -> np.ndarray:
    """Convert an image to the steinbock image dtype, optionally filtered.

    Args:
        img: Image to preprocess; it is cast to float32 first.
        hpf: Hot pixel filtering threshold passed to ``filter_hot_pixels``;
            ``None`` disables hot pixel filtering.

    Returns:
        The image converted to ``io.img_dtype``.
    """
    float_img = img.astype(np.float32)
    if hpf is None:
        return io._to_dtype(float_img, io.img_dtype)
    filtered_img = filter_hot_pixels(float_img, hpf)
    return io._to_dtype(filtered_img, io.img_dtype)
try_preprocess_images_from_disk(mcd_files, txt_files, channel_names=None, hpf=None)
Source code in steinbock/preprocessing/imc.py
def try_preprocess_images_from_disk(
    mcd_files: Sequence[Union[str, PathLike]],
    txt_files: Sequence[Union[str, PathLike]],
    channel_names: Optional[Sequence[str]] = None,
    hpf: Optional[float] = None,
) -> Generator[
    Tuple[Path, Optional["Acquisition"], np.ndarray, Optional[Path], bool],
    None,
    None,
]:
    """Lazily read and preprocess images from MCD and TXT files.

    Every acquisition of every MCD file is read first. When reading an
    acquisition fails with an ``IOError`` and a matching TXT file exists, the
    image is restored from that TXT file instead. TXT files that were not
    matched to any acquisition are processed afterwards. Errors are logged
    and the offending file/acquisition is skipped.

    Args:
        mcd_files: MCD files to read.
        txt_files: TXT files to read or to use for recovery.
        channel_names: If given, the channels to extract; acquisitions
            lacking one of them are skipped with a warning.
        hpf: Hot pixel filtering threshold passed to ``preprocess_image``.

    Yields:
        Tuples of (source file path, acquisition or ``None`` for unmatched
        TXT files, preprocessed image, matched TXT file path or ``None``,
        whether the image was recovered from the TXT file).
    """
    unmatched_txt_files = list(txt_files)
    # process mcd files in descending order to avoid ambiguous txt file matching
    # see https://github.com/BodenmillerGroup/steinbock/issues/100
    for mcd_file in sorted(
        mcd_files, key=lambda mcd_file: Path(mcd_file).stem, reverse=True
    ):
        try:
            with MCDFile(mcd_file) as f_mcd:
                for slide in f_mcd.slides:
                    for acquisition in slide.acquisitions:
                        matched_txt_file = _match_txt_file(
                            mcd_file, acquisition, unmatched_txt_files
                        )
                        if matched_txt_file is not None:
                            unmatched_txt_files.remove(matched_txt_file)
                        channel_ind = None
                        if channel_names is not None:
                            channel_ind = _get_channel_indices(
                                acquisition, channel_names
                            )
                            # a str result names the channel that is missing
                            if isinstance(channel_ind, str):
                                logger.warning(
                                    f"Channel {channel_ind} not found for "
                                    f"acquisition {acquisition.id} in file "
                                    f"{mcd_file}; skipping acquisition"
                                )
                                continue
                        img = None
                        recovered = False
                        try:
                            img = f_mcd.read_acquisition(acquisition)
                        except IOError as e:
                            logger.warning(
                                f"Error reading acquisition {acquisition.id} "
                                f"from file {mcd_file}: {e}"
                            )
                            if matched_txt_file is not None:
                                logger.warning(
                                    f"Restoring from file {matched_txt_file}"
                                )
                                try:
                                    with TXTFile(matched_txt_file) as f_txt:
                                        img = f_txt.read_acquisition()
                                        if channel_names is not None:
                                            # channel order may differ in
                                            # the txt file: look it up again
                                            channel_ind = _get_channel_indices(
                                                f_txt, channel_names
                                            )
                                            if isinstance(channel_ind, str):
                                                logger.warning(
                                                    f"Channel {channel_ind} "
                                                    "not found in file "
                                                    f"{matched_txt_file}; "
                                                    "skipping acquisition"
                                                )
                                                continue
                                    recovered = True
                                except IOError as e2:
                                    logger.error(
                                        f"Error reading file {matched_txt_file}: {e2}"
                                    )
                        # img is still None if reading failed and the image
                        # could not be restored from a txt file
                        if img is not None:
                            if channel_ind is not None:
                                img = img[channel_ind, :, :]
                            img = preprocess_image(img, hpf=hpf)
                            yield (
                                Path(mcd_file),
                                acquisition,
                                img,
                                Path(matched_txt_file)
                                if matched_txt_file is not None
                                else None,
                                recovered,
                            )
                            del img
        except Exception:
            # Not a bare except: GeneratorExit (raised at the yield above
            # when the consumer closes the generator) and KeyboardInterrupt
            # must propagate instead of being logged as a read error.
            logger.exception(f"Error reading file {mcd_file}")
    # remaining txt files have no corresponding mcd acquisition
    for txt_file in unmatched_txt_files:
        try:
            channel_ind = None
            with TXTFile(txt_file) as f:
                if channel_names is not None:
                    channel_ind = _get_channel_indices(f, channel_names)
                    if isinstance(channel_ind, str):
                        logger.warning(
                            f"Channel {channel_ind} not found in file "
                            f"{txt_file}; skipping acquisition"
                        )
                        continue
                img = f.read_acquisition()
                if channel_ind is not None:
                    img = img[channel_ind, :, :]
            img = preprocess_image(img, hpf=hpf)
            yield Path(txt_file), None, img, None, False
            del img
        except Exception:
            # see above: let GeneratorExit/KeyboardInterrupt propagate
            logger.exception(f"Error reading file {txt_file}")