steinbock.io
img_dtype
mask_dtype
list_data_files(data_dir, base_files=None)
Source code in steinbock/io.py
def list_data_files(
    data_dir: Union[str, PathLike],
    base_files: Optional[Sequence[Union[str, PathLike]]] = None,
) -> List[Path]:
    """List the ``.csv`` data files belonging to ``data_dir``.

    Args:
        data_dir: Directory holding the data files.
        base_files: Optional reference files. When given, the lookup is
            delegated to ``_list_related_files`` with the ``.csv`` suffix.

    Returns:
        When ``base_files`` is ``None``, every ``*.csv`` path found
        recursively below ``data_dir``, in sorted order; otherwise whatever
        ``_list_related_files`` returns.
    """
    if base_files is None:
        # No reference files: collect everything below the directory.
        csv_files = list(Path(data_dir).rglob("*.csv"))
        csv_files.sort()
        return csv_files
    return _list_related_files(base_files, data_dir, ".csv")
list_image_files(img_dir, base_files=None)
Source code in steinbock/io.py
def list_image_files(
    img_dir: Union[str, PathLike],
    base_files: Optional[Sequence[Union[str, PathLike]]] = None,
) -> List[Path]:
    """List the ``.tiff`` image files belonging to ``img_dir``.

    Args:
        img_dir: Directory holding the image files.
        base_files: Optional reference files. When given, the lookup is
            delegated to ``_list_related_files`` with the ``.tiff`` suffix.

    Returns:
        When ``base_files`` is ``None``, every ``*.tiff`` path found
        recursively below ``img_dir``, in sorted order; otherwise whatever
        ``_list_related_files`` returns.
    """
    if base_files is None:
        # No reference files: collect everything below the directory.
        tiff_files = list(Path(img_dir).rglob("*.tiff"))
        tiff_files.sort()
        return tiff_files
    return _list_related_files(base_files, img_dir, ".tiff")
list_mask_files(mask_dir, base_files=None)
Source code in steinbock/io.py
def list_mask_files(
    mask_dir: Union[str, PathLike],
    base_files: Optional[Sequence[Union[str, PathLike]]] = None,
) -> List[Path]:
    """List the ``.tiff`` mask files belonging to ``mask_dir``.

    Args:
        mask_dir: Directory holding the mask files.
        base_files: Optional reference files. When given, the lookup is
            delegated to ``_list_related_files`` with the ``.tiff`` suffix.

    Returns:
        When ``base_files`` is ``None``, every ``*.tiff`` path found
        recursively below ``mask_dir``, in sorted order; otherwise whatever
        ``_list_related_files`` returns.
    """
    if base_files is None:
        # No reference files: collect everything below the directory.
        tiff_files = list(Path(mask_dir).rglob("*.tiff"))
        tiff_files.sort()
        return tiff_files
    return _list_related_files(base_files, mask_dir, ".tiff")
list_neighbors_files(neighbors_dir, base_files=None)
Source code in steinbock/io.py
def list_neighbors_files(
    neighbors_dir: Union[str, PathLike],
    base_files: Optional[Sequence[Union[str, PathLike]]] = None,
) -> List[Path]:
    """List the ``.csv`` neighbors files belonging to ``neighbors_dir``.

    Args:
        neighbors_dir: Directory holding the neighbors files.
        base_files: Optional reference files. When given, the lookup is
            delegated to ``_list_related_files`` with the ``.csv`` suffix.

    Returns:
        When ``base_files`` is ``None``, every ``*.csv`` path found
        recursively below ``neighbors_dir``, in sorted order; otherwise
        whatever ``_list_related_files`` returns.
    """
    if base_files is None:
        # No reference files: collect everything below the directory.
        csv_files = list(Path(neighbors_dir).rglob("*.csv"))
        csv_files.sort()
        return csv_files
    return _list_related_files(base_files, neighbors_dir, ".csv")
read_data(data_file)
Source code in steinbock/io.py
def read_data(data_file: Union[str, PathLike]) -> pd.DataFrame:
    """Read an object data table from a CSV file.

    Fields may be separated by either ``,`` or ``;``. The ``Object`` column
    becomes the index of the returned data frame.

    Args:
        data_file: Path of the CSV file to read.

    Returns:
        The parsed table, indexed by ``Object``.
    """
    # A regular-expression separator requires pandas' Python parser engine.
    csv_options = {"sep": ",|;", "index_col": "Object", "engine": "python"}
    return pd.read_csv(data_file, **csv_options)
read_image(img_file, use_imageio=False, native_dtype=False)
Source code in steinbock/io.py
def read_image(
    img_file: Union[str, PathLike],
    use_imageio: bool = False,
    native_dtype: bool = False,
) -> np.ndarray:
    """Read an image file into a three-dimensional array.

    Args:
        img_file: Path of the image file to read.
        use_imageio: If ``True``, load the file with ``imageio.volread``;
            otherwise load it with ``tifffile.imread(..., squeeze=False)``.
        native_dtype: If ``True``, keep the data type as loaded. Otherwise
            the array is passed through ``_to_dtype(img, img_dtype)``.

    Returns:
        A three-dimensional array. Two-dimensional inputs get a new leading
        axis of length one.

    Raises:
        ValueError: If the loaded array cannot be reduced to three
            dimensions. On the tifffile path, 5-D and 6-D arrays are treated
            as TZCYX and TZCYXS respectively and are accepted only when the
            T, Z (and S) axes have length one.
    """
    if use_imageio:
        img = imageio.volread(img_file)
        orig_img_shape = img.shape
        # Strip singleton axes from the front, then from the back, until at
        # most three dimensions remain.
        while img.ndim > 3 and img.shape[0] == 1:
            img = img.squeeze(axis=0)
        while img.ndim > 3 and img.shape[-1] == 1:
            img = img.squeeze(axis=-1)
        if img.ndim == 2:
            img = img[np.newaxis, :, :]
        elif img.ndim != 3:
            # Report the shape as loaded, not the partially squeezed one.
            raise ValueError(
                f"Unsupported shape {orig_img_shape} for image {img_file}"
            )
    else:
        img = tifffile.imread(img_file, squeeze=False)
        if img.ndim == 2:
            img = img[np.newaxis, :, :]
        elif img.ndim == 5:
            size_t, size_z, size_c, size_y, size_x = img.shape
            if size_t != 1 or size_z != 1:
                raise ValueError(
                    f"{img_file}: unsupported TZCYX shape {img.shape}"
                )
            img = img[0, 0, :, :, :]
        elif img.ndim == 6:
            size_t, size_z, size_c, size_y, size_x, size_s = img.shape
            if size_t != 1 or size_z != 1 or size_s != 1:
                raise ValueError(
                    f"{img_file}: unsupported TZCYXS shape {img.shape}"
                )
            img = img[0, 0, :, :, :, 0]
        elif img.ndim != 3:
            raise ValueError(
                f"{img_file}: unsupported number of dimensions ({img.ndim})"
            )
    if not native_dtype:
        img = _to_dtype(img, img_dtype)
    return img
read_image_info(image_info_file)
Source code in steinbock/io.py
def read_image_info(image_info_file: Union[str, PathLike]) -> pd.DataFrame:
    """Read and validate an image information table from a CSV file.

    Fields may be separated by either ``,`` or ``;``. The columns ``image``,
    ``width_px``, ``height_px`` and ``num_channels`` must all be present and
    free of missing values, and ``image`` values must be unique.

    Args:
        image_info_file: Path of the CSV file to read.

    Returns:
        The parsed table, with ``image`` as a string column and the other
        three columns as nullable unsigned integers.

    Raises:
        ValueError: If a required column is absent, contains missing values,
            or if ``image`` contains duplicates.
    """
    column_dtypes = {
        "image": pd.StringDtype(),
        "width_px": pd.UInt16Dtype(),
        "height_px": pd.UInt16Dtype(),
        "num_channels": pd.UInt8Dtype(),
    }
    image_info = pd.read_csv(
        image_info_file, sep=",|;", dtype=column_dtypes, engine="python"
    )
    checked_cols = ("image", "width_px", "height_px", "num_channels")

    # Presence is verified for all columns before any content check runs.
    absent = next((c for c in checked_cols if c not in image_info), None)
    if absent is not None:
        raise ValueError(f"Missing '{absent}' column in {image_info_file}")

    incomplete = next(
        (
            c
            for c in checked_cols
            if c in image_info and image_info[c].isna().any()
        ),
        None,
    )
    if incomplete is not None:
        raise ValueError(
            f"Missing values for '{incomplete}' in {image_info_file}"
        )

    repeated = next(
        (
            c
            for c in ("image",)
            if c in image_info and image_info[c].dropna().duplicated().any()
        ),
        None,
    )
    if repeated is not None:
        raise ValueError(
            f"Duplicated values for '{repeated}'" f" in {image_info_file}"
        )
    return image_info
read_mask(mask_file, native_dtype=False)
Source code in steinbock/io.py
def read_mask(
    mask_file: Union[str, PathLike], native_dtype: bool = False
) -> np.ndarray:
    """Read a mask file into a two-dimensional array.

    The file is loaded with ``tifffile.imread(..., squeeze=False)``. 5-D and
    6-D arrays are treated as TZCYX and TZCYXS respectively and are accepted
    only when every axis other than Y and X has length one.

    Args:
        mask_file: Path of the mask file to read.
        native_dtype: If ``True``, keep the data type as loaded. Otherwise
            the array is passed through ``_to_dtype(mask, mask_dtype)``.

    Returns:
        The two-dimensional mask.

    Raises:
        ValueError: If the loaded array is not 2-D and cannot be reduced to
            2-D as described above.
    """
    mask = tifffile.imread(mask_file, squeeze=False)
    num_dims = mask.ndim
    if num_dims == 5:
        # T, Z and C must all be singleton.
        if mask.shape[:3] != (1, 1, 1):
            raise ValueError(
                f"{mask_file}: unsupported TZCYX shape {mask.shape}"
            )
        mask = mask[0, 0, 0, :, :]
    elif num_dims == 6:
        # T, Z, C and the trailing S axis must all be singleton.
        if mask.shape[:3] + mask.shape[5:] != (1, 1, 1, 1):
            raise ValueError(
                f"{mask_file}: unsupported TZCYXS shape {mask.shape}"
            )
        mask = mask[0, 0, 0, :, :, 0]
    elif num_dims != 2:
        raise ValueError(
            f"{mask_file}: unsupported number of dimensions ({mask.ndim})"
        )
    if native_dtype:
        return mask
    return _to_dtype(mask, mask_dtype)
read_neighbors(neighbors_file)
Source code in steinbock/io.py
def read_neighbors(neighbors_file: Union[str, PathLike]) -> pd.DataFrame:
    """Read a neighbors table from a CSV file.

    Fields may be separated by either ``,`` or ``;``. Only the columns
    ``Object``, ``Neighbor`` and ``Distance`` are loaded.

    Args:
        neighbors_file: Path of the CSV file to read.

    Returns:
        The parsed table, with ``Object`` and ``Neighbor`` typed as
        ``mask_dtype`` and ``Distance`` as 32-bit float.
    """
    # The same mapping selects the columns and fixes their types.
    column_dtypes = {
        "Object": mask_dtype,
        "Neighbor": mask_dtype,
        "Distance": np.float32,
    }
    return pd.read_csv(
        neighbors_file,
        sep=",|;",
        usecols=list(column_dtypes),
        dtype=column_dtypes,
        engine="python",
    )
read_panel(panel_file, unfiltered=False)
Source code in steinbock/io.py
def read_panel(
    panel_file: Union[str, PathLike], unfiltered: bool = False
) -> pd.DataFrame:
    """Read and validate a panel table from a CSV file.

    Fields may be separated by either ``,`` or ``;``. The columns
    ``channel`` and ``name`` are required; ``keep`` is optional and is
    parsed as a boolean with ``1`` meaning true and ``0`` meaning false.

    Args:
        panel_file: Path of the CSV file to read.
        unfiltered: If ``False`` and a ``keep`` column exists, only rows
            whose ``keep`` value is true are returned.

    Returns:
        The parsed (and possibly filtered) panel.

    Raises:
        ValueError: If ``channel`` or ``name`` is absent, if ``channel`` or
            ``keep`` contains missing values, or if ``channel`` or ``name``
            contains duplicated non-missing values.
    """
    column_dtypes = {
        "channel": pd.StringDtype(),
        "name": pd.StringDtype(),
        "keep": pd.BooleanDtype(),
    }
    panel = pd.read_csv(
        panel_file,
        sep=",|;",
        dtype=column_dtypes,
        engine="python",
        true_values=["1"],
        false_values=["0"],
    )

    # Presence is verified before any content check runs.
    absent = next((c for c in ("channel", "name") if c not in panel), None)
    if absent is not None:
        raise ValueError(f"Missing '{absent}' column in {panel_file}")

    incomplete = next(
        (
            c
            for c in ("channel", "keep")
            if c in panel and panel[c].isna().any()
        ),
        None,
    )
    if incomplete is not None:
        raise ValueError(f"Missing values for '{incomplete}' in {panel_file}")

    # Missing names are allowed; only non-missing values must be unique.
    repeated = next(
        (
            c
            for c in ("channel", "name")
            if c in panel and panel[c].dropna().duplicated().any()
        ),
        None,
    )
    if repeated is not None:
        raise ValueError(f"Duplicated values for '{repeated}' in {panel_file}")

    if unfiltered or "keep" not in panel:
        return panel
    keep_mask = panel["keep"].astype(bool)
    return panel.loc[keep_mask, :]
write_data(data, data_file)
Source code in steinbock/io.py
def write_data(data: pd.DataFrame, data_file: Union[str, PathLike]) -> None:
    """Write an object data table to a CSV file.

    The index of ``data`` is turned into regular column(s) first, so it is
    written as part of the table rather than as a pandas index.

    Args:
        data: Table to write; the caller's frame is not modified.
        data_file: Destination of the CSV output.
    """
    data.reset_index().to_csv(data_file, index=False)
write_image(img, img_file, ignore_dtype=False)
Source code in steinbock/io.py
def write_image(
    img: np.ndarray,
    img_file: Union[str, PathLike],
    ignore_dtype: bool = False,
) -> None:
    """Write an image to a file using ``tifffile.imwrite`` in ImageJ mode.

    Args:
        img: Image array; it is indexed with three axes, so it is expected
            to be three-dimensional.
        img_file: Destination file.
        ignore_dtype: If ``True``, write the array with its current data
            type. Otherwise it is first passed through
            ``_to_dtype(img, img_dtype)``.
    """
    out = img if ignore_dtype else _to_dtype(img, img_dtype)
    # Pad with two leading and one trailing singleton axes before writing.
    padded = out[np.newaxis, np.newaxis, :, :, :, np.newaxis]
    tifffile.imwrite(img_file, data=padded, imagej=True)
write_image_info(image_info, image_info_file)
Source code in steinbock/io.py
def write_image_info(
    image_info: pd.DataFrame, image_info_file: Union[str, PathLike]
) -> None:
    """Write an image information table to a CSV file.

    Args:
        image_info: Table to write; its index is not written.
        image_info_file: Destination of the CSV output.
    """
    destination = image_info_file
    image_info.to_csv(destination, index=False)
write_mask(mask, mask_file)
Source code in steinbock/io.py
def write_mask(mask: np.ndarray, mask_file: Union[str, PathLike]) -> None:
    """Write a mask to a file using ``tifffile.imwrite`` in ImageJ mode.

    The mask is first passed through ``_to_dtype(mask, mask_dtype)``.

    Args:
        mask: Mask array; it is indexed with two axes, so it is expected to
            be two-dimensional.
        mask_file: Destination file.
    """
    converted = _to_dtype(mask, mask_dtype)
    # Pad with three leading and one trailing singleton axes before writing.
    padded = converted[np.newaxis, np.newaxis, np.newaxis, :, :, np.newaxis]
    tifffile.imwrite(mask_file, data=padded, imagej=True)
write_neighbors(neighbors, neighbors_file)
Source code in steinbock/io.py
def write_neighbors(
    neighbors: pd.DataFrame, neighbors_file: Union[str, PathLike]
) -> None:
    """Write a neighbors table to a CSV file.

    Only the columns ``Object``, ``Neighbor`` and ``Distance`` are written,
    in that order and without the index. ``Object`` and ``Neighbor`` are
    cast to ``mask_dtype`` and ``Distance`` to 32-bit float beforehand.

    Args:
        neighbors: Table containing at least the three columns above.
        neighbors_file: Destination of the CSV output.
    """
    # The same mapping selects the columns and fixes their types.
    column_dtypes = {
        "Object": mask_dtype,
        "Neighbor": mask_dtype,
        "Distance": np.float32,
    }
    selected = neighbors.loc[:, list(column_dtypes)]
    selected.astype(column_dtypes).to_csv(neighbors_file, index=False)
write_panel(panel, panel_file)
Source code in steinbock/io.py
def write_panel(panel: pd.DataFrame, panel_file: Union[str, PathLike]) -> None:
    """Write a panel table to a CSV file.

    Columns that pandas infers to be boolean are written as unsigned
    integers, so true and false appear as ``1`` and ``0`` in the file. The
    conversion is done on a copy; the caller's frame is left untouched.

    Args:
        panel: Panel table to write; its index is not written.
        panel_file: Destination of the CSV output.
    """
    boolean_dtype = pd.BooleanDtype()
    out = panel.copy()
    boolean_cols = [
        name
        for name in out.columns
        if out[name].convert_dtypes().dtype == boolean_dtype
    ]
    for name in boolean_cols:
        out[name] = out[name].astype(pd.UInt8Dtype())
    out.to_csv(panel_file, index=False)