steinbock.utils
expansion
expand_mask(mask, distance)
Source code in steinbock/utils/expansion.py
def expand_mask(mask: np.ndarray, distance: int) -> np.ndarray:
    """Expand the labelled objects of a mask by a given distance.

    Thin wrapper that forwards to ``expand_labels`` (presumably
    ``skimage.segmentation.expand_labels`` -- confirm against the module's
    imports).

    Args:
        mask: Label mask to expand.
        distance: Passed through as the ``distance`` keyword of
            ``expand_labels``.

    Returns:
        Whatever ``expand_labels`` returns for the given mask and distance.
    """
    return expand_labels(mask, distance=distance)
try_expand_masks_from_disk(mask_files, distance, mmap=False)
Source code in steinbock/utils/expansion.py
def try_expand_masks_from_disk(
    mask_files: Sequence[Union[str, PathLike]], distance: int, mmap: bool = False
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Lazily expand masks stored on disk, one file at a time.

    Like the other ``try_*_from_disk`` generators, a failure while processing
    one file is logged and that file is skipped, so a single broken mask does
    not abort the remaining files.

    Args:
        mask_files: Paths of the mask files to process, in order.
        distance: Expansion distance forwarded to ``expand_mask``.
        mmap: If true, masks are loaded with ``io.mmap_mask``; otherwise with
            ``io.read_mask(..., native_dtype=True)``.

    Yields:
        ``(mask_file_path, expanded_mask)`` for every file that could be
        processed.
    """
    # Imported locally so this block does not rely on a module-level logger.
    import logging

    log = logging.getLogger(__name__)
    for mask_file in mask_files:
        try:
            if mmap:
                mask = io.mmap_mask(mask_file)
            else:
                mask = io.read_mask(mask_file, native_dtype=True)
            expanded_mask = expand_mask(mask, distance=distance)
            # Drop the input before handing control to the consumer so only
            # the result is kept alive while the generator is suspended.
            del mask
            yield Path(mask_file), expanded_mask
            del expanded_mask
        except Exception as e:
            log.exception(f"Error expanding mask {mask_file}: {e}")
matching
logger
match_masks(mask1, mask2)
Source code in steinbock/utils/matching.py
def match_masks(mask1: np.ndarray, mask2: np.ndarray) -> pd.DataFrame:
    """Find all pairs of overlapping objects between two label masks.

    An object of ``mask1`` and an object of ``mask2`` are matched when at
    least one pixel position holds a non-zero value in both masks; zero is
    treated as background in both masks.

    Args:
        mask1: First label mask.
        mask2: Second label mask, compared position by position with
            ``mask1``.

    Returns:
        Data frame with the columns ``Object1`` and ``Object2`` holding one
        row per distinct overlapping pair, ordered by ``Object1`` and then by
        ``Object2``, with a fresh ``0..n-1`` index.
    """
    mask1 = np.asarray(mask1)
    mask2 = np.asarray(mask2)
    # Overlap is symmetric, so a single pass over the pixels that are
    # foreground in both masks finds every pair; no per-object scans needed.
    overlap = (mask1 != 0) & (mask2 != 0)
    pixel_pairs = pd.DataFrame(
        data={"Object1": mask1[overlap], "Object2": mask2[overlap]}
    )
    pairs = pixel_pairs.drop_duplicates().sort_values(by=["Object1", "Object2"])
    # Rebuild from plain Python scalars so the column dtypes are inferred from
    # the values (as with per-object lists) rather than taken from the masks.
    return pd.DataFrame(
        data={
            "Object1": pairs["Object1"].tolist(),
            "Object2": pairs["Object2"].tolist(),
        }
    )
try_match_masks_from_disk(mask_files1, mask_files2, mmap=False)
Source code in steinbock/utils/matching.py
def try_match_masks_from_disk(
    mask_files1: Sequence[Union[str, PathLike]],
    mask_files2: Sequence[Union[str, PathLike]],
    mmap: bool = False,
) -> Generator[Tuple[Path, Path, pd.DataFrame], None, None]:
    """Lazily match objects between pairs of masks stored on disk.

    The two file sequences are walked in lockstep with ``zip``, so iteration
    stops at the end of the shorter one. A failure for one pair is logged and
    that pair is skipped.

    Args:
        mask_files1: Paths of the first mask of each pair.
        mask_files2: Paths of the second mask of each pair.
        mmap: If true, masks are loaded with ``io.mmap_mask``; otherwise with
            ``io.read_mask``.

    Yields:
        ``(mask_file1_path, mask_file2_path, matches)`` where ``matches`` is
        the data frame returned by ``match_masks`` for the pair.
    """
    for mask_file1, mask_file2 in zip(mask_files1, mask_files2):
        try:
            load_mask = io.mmap_mask if mmap else io.read_mask
            first_mask = load_mask(mask_file1)
            second_mask = load_mask(mask_file2)
            matches = match_masks(first_mask, second_mask)
            # The masks are no longer needed once the table exists; release
            # them before suspending at the yield.
            del first_mask, second_mask
            yield Path(mask_file1), Path(mask_file2), matches
            del matches
        except Exception as e:
            logger.exception(f"Error matching masks {mask_file1, mask_file2}: {e}")
mosaics
logger
SteinbockMosaicsUtilsException (SteinbockUtilsException)
Source code in steinbock/utils/mosaics.py
class SteinbockMosaicsUtilsException(SteinbockUtilsException):
    """Error raised by the mosaics utilities, e.g. for a malformed tile file name."""
try_extract_tiles_from_disk_to_disk(img_files, tile_dir, tile_size, mmap=False)
Source code in steinbock/utils/mosaics.py
def try_extract_tiles_from_disk_to_disk(
    img_files: Sequence[Union[str, PathLike]],
    tile_dir: Union[str, PathLike],
    tile_size: int,
    mmap: bool = False,
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Cut images stored on disk into tiles and write each tile to disk.

    Every image is split along its last two axes into tiles of at most
    ``tile_size`` x ``tile_size``; tiles at the far edges may be smaller.
    Each tile is written to ``tile_dir`` as
    ``<image stem>_tx<x>_ty<y>_tw<width>_th<height>.tiff`` and then yielded.
    A failure for one image is logged and the remaining images are still
    processed.

    Args:
        img_files: Paths of the images to tile.
        tile_dir: Directory the tile files are written into.
        tile_size: Step and maximum extent of a tile along both tiled axes.
        mmap: If true, images are loaded with ``io.mmap_image``; otherwise
            with ``io.read_image(..., native_dtype=True)``.

    Yields:
        ``(tile_file, tile)`` for every tile written.
    """
    for img_file in img_files:
        try:
            img = (
                io.mmap_image(img_file)
                if mmap
                else io.read_image(img_file, native_dtype=True)
            )
            # A remainder of exactly one pixel produces edge tiles that are a
            # single pixel wide or high, which the warning flags.
            leaves_sliver = (
                img.shape[-1] % tile_size == 1 or img.shape[-2] % tile_size == 1
            )
            if leaves_sliver:
                logger.warning(
                    "Chosen tile size yields UNSTITCHABLE tiles of 1 pixel "
                    f"width or height for image {img_file}"
                )
            for tile_x in range(0, img.shape[-1], tile_size):
                columns = slice(tile_x, tile_x + tile_size)
                for tile_y in range(0, img.shape[-2], tile_size):
                    rows = slice(tile_y, tile_y + tile_size)
                    tile = img[:, rows, columns]
                    # Offsets and actual tile extent are encoded in the name.
                    tile_name = (
                        f"{Path(img_file).stem}_tx{tile_x}_ty{tile_y}"
                        f"_tw{tile.shape[-1]}_th{tile.shape[-2]}.tiff"
                    )
                    tile_file = Path(tile_dir) / tile_name
                    io.write_image(tile, tile_file, ignore_dtype=True)
                    yield tile_file, tile
                    del tile
            del img
        except Exception as e:
            logger.exception(f"Error extracting tiles: {img_file}: {e}")
try_stitch_tiles_from_disk_to_disk(tile_files, img_dir, relabel=False, mmap=False)
Source code in steinbock/utils/mosaics.py
def try_stitch_tiles_from_disk_to_disk(
    tile_files: Sequence[Union[str, PathLike]],
    img_dir: Union[str, PathLike],
    relabel: bool = False,
    mmap: bool = False,
) -> Generator[Tuple[Path, np.ndarray], None, None]:
    """Reassemble images from tile files and write each image to disk.

    Tile file stems must have the form
    ``<image stem>_tx<x>_ty<y>_tw<width>_th<height>``. Tiles are grouped by
    image stem, pasted at their encoded offsets into an image large enough to
    hold all of them, and the result is stored as ``<image stem>.tiff`` in
    ``img_dir``. A failure while stitching one image is logged and the
    remaining images are still processed.

    Args:
        tile_files: Paths of the tile files to stitch.
        img_dir: Directory the stitched images are written into.
        relabel: If true, the first channel of the stitched image is replaced
            by the result of ``measure.label`` applied to it.
        mmap: If true, the image is created through ``io.mmap_image`` and
            flushed in place; otherwise it is assembled in a NumPy array and
            written with ``io.write_image``.

    Yields:
        ``(img_file, img)`` for every stitched image.

    Raises:
        SteinbockMosaicsUtilsException: If a tile file stem does not follow
            the expected pattern. All names are parsed before any stitching,
            so this surfaces when iteration starts.
    """

    class TileInfo(NamedTuple):
        tile_file: Path
        x: int
        y: int
        width: int
        height: int

    tile_file_stem_pattern = re.compile(
        r"(?P<img_file_stem>.+)_tx(?P<x>\d+)_ty(?P<y>\d+)"
        r"_tw(?P<width>\d+)_th(?P<height>\d+)"
    )

    # Group the tiles by the image they were cut from, keeping input order.
    img_tile_infos: Dict[str, List[TileInfo]] = {}
    for tile_file in tile_files:
        parsed = tile_file_stem_pattern.fullmatch(Path(tile_file).stem)
        if parsed is None:
            raise SteinbockMosaicsUtilsException(
                f"Malformed tile file name: {tile_file}"
            )
        tile_info = TileInfo(
            tile_file=Path(tile_file),
            x=int(parsed.group("x")),
            y=int(parsed.group("y")),
            width=int(parsed.group("width")),
            height=int(parsed.group("height")),
        )
        img_tile_infos.setdefault(parsed.group("img_file_stem"), []).append(
            tile_info
        )

    for img_file_stem, tile_infos in img_tile_infos.items():
        img_file = Path(img_dir) / f"{img_file_stem}.tiff"
        try:
            # The first tile supplies the channel count and dtype of the image.
            tile = io.read_image(tile_infos[0].tile_file, native_dtype=True)
            num_channels = tile.shape[0]
            img_height = max(ti.y + ti.height for ti in tile_infos)
            img_width = max(ti.x + ti.width for ti in tile_infos)
            img_shape = (num_channels, img_height, img_width)
            if mmap:
                img = io.mmap_image(
                    img_file, mode="r+", shape=img_shape, dtype=tile.dtype
                )
            else:
                img = np.zeros(img_shape, dtype=tile.dtype)
            for tile_info in tile_infos:
                # The first tile is already loaded; later ones are read here.
                if tile is None:
                    tile = io.read_image(tile_info.tile_file, native_dtype=True)
                rows = slice(tile_info.y, tile_info.y + tile_info.height)
                columns = slice(tile_info.x, tile_info.x + tile_info.width)
                img[:, rows, columns] = tile
                if mmap:
                    img.flush()
                tile = None
            if relabel:
                img[0, :, :] = measure.label(img[0, :, :])
            if mmap:
                img.flush()
            else:
                io.write_image(img, img_file, ignore_dtype=True)
            yield img_file, img
            del img
        except Exception as e:
            logger.exception(f"Error stitching tiles: {img_file}: {e}")