steinbock.preprocessing
external
create_panel_from_image_files(ext_img_files)
Source code in steinbock/preprocessing/external.py
def create_panel_from_image_files(
    ext_img_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Create a default steinbock panel from external image files.

    The number of channels is taken from axis 0 of the first readable
    image; remaining files are not inspected. Channels are numbered
    1..N, all are kept, and all are assigned consecutive ilastik channel
    numbers; the name and deepcell columns are left empty.

    Args:
        ext_img_files: Paths of candidate image files.

    Returns:
        A panel DataFrame with channel/name/keep/ilastik/deepcell columns.

    Raises:
        IOError: If none of the given files can be read as an image.
    """
    num_channels = None
    for ext_img_file in ext_img_files:
        try:
            ext_img = io.read_image(
                ext_img_file,
                keep_suffix=True,
                use_imageio=True,
                native_dtype=True,
            )
        except Exception:
            # Best-effort probing: skip unreadable files, but log the
            # error and do not swallow KeyboardInterrupt/SystemExit the
            # way a bare `except:` would.
            _logger.exception(f"Error reading file {ext_img_file}")
            continue
        if ext_img is not None:
            num_channels = ext_img.shape[0]
            break
    if num_channels is None:
        raise IOError("No valid images found")
    panel = pd.DataFrame(
        data={
            "channel": range(1, num_channels + 1),
            "name": np.nan,
            "keep": True,
            "ilastik": range(1, num_channels + 1),
            "deepcell": np.nan,
        },
    )
    # Use pandas extension dtypes so missing values stay representable.
    panel["channel"] = panel["channel"].astype(pd.StringDtype())
    panel["name"] = panel["name"].astype(pd.StringDtype())
    panel["keep"] = panel["keep"].astype(pd.BooleanDtype())
    panel["ilastik"] = panel["ilastik"].astype(pd.UInt8Dtype())
    panel["deepcell"] = panel["deepcell"].astype(pd.UInt8Dtype())
    return panel
list_image_files(ext_img_dir)
Source code in steinbock/preprocessing/external.py
def list_image_files(ext_img_dir: Union[str, PathLike]) -> List[Path]:
    """Recursively collect all suffixed files below *ext_img_dir*, sorted by path."""
    img_dir_path = Path(ext_img_dir)
    return sorted(img_dir_path.rglob("*.*"))
try_preprocess_images_from_disk(ext_img_files, channel_indices=None)
Source code in steinbock/preprocessing/external.py
def try_preprocess_images_from_disk(
    ext_img_files: Sequence[Union[str, PathLike]],
    channel_indices: Optional[Sequence[int]] = None,
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Read external images from disk, optionally selecting channels.

    Files that cannot be read, contain unsupported data, or do not have
    enough channels are skipped with a log message instead of aborting
    the whole run.

    Args:
        ext_img_files: Paths of image files to read.
        channel_indices: Optional 0-based channel indices to select.

    Yields:
        Tuples of (image file path, image array).
    """
    for ext_img_file in ext_img_files:
        try:
            ext_img = io.read_image(
                ext_img_file,
                keep_suffix=True,
                use_imageio=True,
                native_dtype=True,
            )
            if ext_img is None:
                _logger.warning(
                    f"Unsupported image data in file {ext_img_file}"
                )
                continue
            if channel_indices is not None:
                # Indices are 0-based, so the largest valid index is
                # shape[0] - 1 (original used `>` and then indexed
                # anyway, turning the warning into an IndexError).
                if max(channel_indices) >= ext_img.shape[0]:
                    _logger.warning(
                        f"Channel indices out of bounds for file "
                        f"{ext_img_file} with {ext_img.shape[0]} channels"
                    )
                    continue
                ext_img = ext_img[channel_indices, :, :]
            yield ext_img_file, ext_img
            # Release the (potentially large) array before the next read.
            del ext_img
        except Exception:
            _logger.exception(f"Error reading file {ext_img_file}")
imc
imc_available
create_panel_from_imc_panel(imc_panel_file)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_imc_panel(
    imc_panel_file: Union[str, PathLike]
) -> pd.DataFrame:
    """Create a steinbock panel from an IMC panel file.

    The IMC panel is read as CSV (comma- or semicolon-separated), its
    columns are mapped to the steinbock panel format, and boolean
    "ilastik" flags are converted to consecutive 1-based channel numbers.

    Raises:
        ValueError: If a required column is missing or a non-nullable
            column contains missing values.
    """
    # sep is a regex (hence engine="python") so both comma- and
    # semicolon-delimited files are accepted; "1"/"0" parse as booleans.
    imc_panel = pd.read_csv(
        imc_panel_file,
        sep=",|;",
        dtype={
            _imc_panel_metal_col: pd.StringDtype(),
            _imc_panel_target_col: pd.StringDtype(),
            _imc_panel_keep_col: pd.BooleanDtype(),
            _imc_panel_ilastik_col: pd.BooleanDtype(),
            _imc_panel_deepcell_col: pd.UInt8Dtype(),
        },
        engine="python",
        true_values=["1"],
        false_values=["0"],
    )
    for required_col in (_imc_panel_metal_col, _imc_panel_target_col):
        if required_col not in imc_panel:
            raise ValueError(f"Missing '{required_col}' column in IMC panel")
    # These columns, when present, must not contain missing values.
    for notnan_col in (
        _imc_panel_metal_col,
        _imc_panel_keep_col,
        _imc_panel_ilastik_col,
        _imc_panel_deepcell_col,
    ):
        if notnan_col in imc_panel and imc_panel[notnan_col].isna().any():
            raise ValueError(f"Missing values for '{notnan_col}' in IMC panel")
    rename_columns = {
        _imc_panel_metal_col: "channel",
        _imc_panel_target_col: "name",
        _imc_panel_keep_col: "keep",
        _imc_panel_ilastik_col: "ilastik",
        _imc_panel_deepcell_col: "deepcell",
    }
    # Drop pre-existing columns that would collide with a renamed target
    # name (e.g. an existing "channel" column when the metal column is
    # named differently), so the rename below cannot create duplicates.
    drop_columns = [
        panel_col
        for imc_panel_col, panel_col in rename_columns.items()
        if panel_col in imc_panel.columns and panel_col != imc_panel_col
    ]
    panel = imc_panel.drop(columns=drop_columns).rename(columns=rename_columns)
    if "ilastik" in panel:
        # Turn boolean ilastik flags into consecutive channel numbers
        # (1, 2, ...) for flagged channels; unflagged channels become
        # missing values via the freshly reset UInt8 column.
        ilastik_mask = panel["ilastik"].astype(bool)
        panel["ilastik"] = pd.Series(dtype=pd.UInt8Dtype())
        panel.loc[ilastik_mask, "ilastik"] = range(1, ilastik_mask.sum() + 1)
    return _clean_panel(panel)
create_panel_from_mcd_files(mcd_files)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_mcd_files(
    mcd_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a combined, cleaned panel from all acquisitions in the given .mcd files."""
    acquisition_panels = []
    for mcd_file in mcd_files:
        with MCDFile(mcd_file) as f:
            for slide in f.slides:
                acquisition_panels.extend(
                    _create_panel_from_acquisition(acquisition)
                    for acquisition in slide.acquisitions
                )
    combined = pd.concat(acquisition_panels, ignore_index=True, copy=False)
    return _clean_panel(combined)
create_panel_from_txt_files(txt_files)
Source code in steinbock/preprocessing/imc.py
def create_panel_from_txt_files(
    txt_files: Sequence[Union[str, PathLike]]
) -> pd.DataFrame:
    """Build a combined, cleaned panel from the given IMC .txt files."""
    per_file_panels = []
    for txt_file in txt_files:
        with TXTFile(txt_file) as f:
            per_file_panels.append(_create_panel_from_acquisition(f))
    combined = pd.concat(per_file_panels, ignore_index=True, copy=False)
    return _clean_panel(combined)
filter_hot_pixels(img, thres)
Source code in steinbock/preprocessing/imc.py
def filter_hot_pixels(img: np.ndarray, thres: float) -> np.ndarray:
    """Replace hot pixels by their neighborhood maximum.

    A pixel is considered "hot" when it exceeds the maximum of its eight
    in-plane neighbors (channel axis untouched) by more than *thres*;
    such pixels are replaced by that neighborhood maximum.
    """
    neighborhood = np.ones((1, 3, 3), dtype=bool)
    neighborhood[0, 1, 1] = False  # exclude the pixel itself
    neighbor_max = maximum_filter(img, footprint=neighborhood, mode="mirror")
    is_hot = img - neighbor_max > thres
    return np.where(is_hot, neighbor_max, img)
list_mcd_files(mcd_dir)
Source code in steinbock/preprocessing/imc.py
def list_mcd_files(mcd_dir: Union[str, PathLike]) -> List[Path]:
    """Return all .mcd files below *mcd_dir* (recursive), in sorted order."""
    mcd_dir_path = Path(mcd_dir)
    return sorted(mcd_dir_path.rglob("*.mcd"))
list_txt_files(txt_dir)
Source code in steinbock/preprocessing/imc.py
def list_txt_files(txt_dir: Union[str, PathLike]) -> List[Path]:
    """Return all .txt files below *txt_dir* (recursive), in sorted order."""
    txt_dir_path = Path(txt_dir)
    return sorted(txt_dir_path.rglob("*.txt"))
preprocess_image(img, hpf=None)
Source code in steinbock/preprocessing/imc.py
def preprocess_image(
    img: np.ndarray, hpf: Optional[float] = None
) -> np.ndarray:
    """Convert an image to float32, optionally filter hot pixels, and cast to the steinbock image dtype."""
    result = img.astype(np.float32)
    if hpf is not None:
        result = filter_hot_pixels(result, hpf)
    return io._to_dtype(result, io.img_dtype)
try_preprocess_images_from_disk(mcd_files, txt_files, channel_names=None, hpf=None)
Source code in steinbock/preprocessing/imc.py
def try_preprocess_images_from_disk(
    mcd_files: Sequence[Union[str, PathLike]],
    txt_files: Sequence[Union[str, PathLike]],
    channel_names: Optional[Sequence[str]] = None,
    hpf: Optional[float] = None,
) -> Generator[
    Tuple[Path, Optional["Acquisition"], np.ndarray, Optional[Path], bool],
    None,
    None,
]:
    """Read and preprocess IMC acquisitions from .mcd and .txt files.

    For each acquisition in each .mcd file, a matching .txt file (if any)
    is consumed from *txt_files*; if reading the acquisition from the
    .mcd file fails, it is restored from that .txt file. Any .txt files
    left unmatched afterwards are processed as standalone acquisitions.
    Read errors are logged and the offending file/acquisition is skipped.

    Yields:
        Tuples of (source file path, acquisition or None for standalone
        .txt files, preprocessed image, matched .txt file path or None,
        flag indicating recovery from the matched .txt file).
    """
    unmatched_txt_files = list(txt_files)
    for mcd_file in mcd_files:
        try:
            with MCDFile(mcd_file) as f_mcd:
                for slide in f_mcd.slides:
                    for acquisition in slide.acquisitions:
                        matched_txt_file = _match_txt_file(
                            mcd_file, acquisition, unmatched_txt_files
                        )
                        if matched_txt_file is not None:
                            unmatched_txt_files.remove(matched_txt_file)
                        channel_ind = None
                        if channel_names is not None:
                            channel_ind = _get_channel_indices(
                                acquisition, channel_names
                            )
                            # A str return value is the name of a
                            # missing channel.
                            if isinstance(channel_ind, str):
                                _logger.warning(
                                    f"Channel {channel_ind} not found for "
                                    f"acquisition {acquisition.id} in file "
                                    # BUG FIX: this fragment was missing
                                    # its f-prefix, logging the literal
                                    # text "{mcd_file}".
                                    f"{mcd_file}; skipping acquisition"
                                )
                                continue
                        img = None
                        recovered = False
                        try:
                            img = f_mcd.read_acquisition(acquisition)
                        except IOError:
                            _logger.warning(
                                f"Error reading acquisition {acquisition.id} "
                                f"from file {mcd_file}"
                            )
                            if matched_txt_file is not None:
                                _logger.warning(
                                    f"Restoring from file {matched_txt_file}"
                                )
                                try:
                                    with TXTFile(matched_txt_file) as f_txt:
                                        img = f_txt.read_acquisition()
                                        if channel_names is not None:
                                            channel_ind = _get_channel_indices(
                                                f_txt, channel_names
                                            )
                                            if isinstance(channel_ind, str):
                                                _logger.warning(
                                                    f"Channel {channel_ind} "
                                                    "not found in file "
                                                    f"{matched_txt_file}; "
                                                    "skipping acquisition"
                                                )
                                                continue
                                        recovered = True
                                except IOError:
                                    _logger.exception(
                                        "Error reading file "
                                        f"{matched_txt_file}"
                                    )
                        if img is not None:  # exceptions ...
                            if channel_ind is not None:
                                img = img[channel_ind, :, :]
                            img = preprocess_image(img, hpf=hpf)
                            yield (
                                Path(mcd_file),
                                acquisition,
                                img,
                                Path(matched_txt_file)
                                if matched_txt_file is not None
                                else None,
                                recovered,
                            )
                            # Release the image before the next read.
                            del img
        except Exception:
            # Was a bare `except:`; narrowed so KeyboardInterrupt and
            # SystemExit still propagate.
            _logger.exception(f"Error reading file {mcd_file}")
    # Process .txt files that were not matched to any .mcd acquisition.
    while len(unmatched_txt_files) > 0:
        txt_file = unmatched_txt_files.pop(0)
        try:
            channel_ind = None
            with TXTFile(txt_file) as f:
                if channel_names is not None:
                    channel_ind = _get_channel_indices(f, channel_names)
                    if isinstance(channel_ind, str):
                        _logger.warning(
                            f"Channel {channel_ind} not found in file "
                            f"{txt_file}; skipping acquisition"
                        )
                        continue
                img = f.read_acquisition()
                if channel_ind is not None:
                    img = img[channel_ind, :, :]
                img = preprocess_image(img, hpf=hpf)
                yield Path(txt_file), None, img, None, False
                del img
        except Exception:
            _logger.exception(f"Error reading file {txt_file}")