Coverage for openhcs/core/roi.py: 42.0%
145 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
"""
Generic ROI (Region of Interest) system for OpenHCS.

This module provides backend-agnostic ROI extraction and representation.
ROIs are extracted from labeled segmentation masks and can be materialized
to various backends (OMERO, disk, Napari, Fiji).

Doctrinal Clauses:
- Clause 66 — Immutability After Construction
- Clause 88 — No Inferred Capabilities
"""
13import logging
14import numpy as np
15from dataclasses import dataclass, field
16from typing import List, Dict, Any, Optional, Tuple
17from pathlib import Path
18from enum import Enum
20logger = logging.getLogger(__name__)
class ShapeType(Enum):
    """Enumeration of the ROI shape kinds known to this module.

    Each member's value is the lowercase string tag for that kind.
    """

    POLYGON = "polygon"  # outline given by a list of vertices
    MASK = "mask"  # binary pixel mask
    POINT = "point"  # single coordinate
    ELLIPSE = "ellipse"  # centre plus one radius per axis
@dataclass(frozen=True, eq=False)
class PolygonShape:
    """Polygon ROI shape defined by vertex coordinates.

    Equality and hashing are implemented by hand (``eq=False``) because the
    dataclass-generated versions do not work with a NumPy array field:
    comparing two instances that hold distinct arrays raises "truth value of
    an array is ambiguous", and hashing raises ``TypeError``.

    Attributes:
        coordinates: Nx2 array of (y, x) coordinates, with N >= 3. Array-like
            input (e.g. nested lists) is converted with ``np.asarray``.
        shape_type: Always ShapeType.POLYGON

    Raises:
        ValueError: If the coordinates are not an Nx2 array or contain fewer
            than 3 vertices.
    """

    coordinates: np.ndarray  # Nx2 array of (y, x) coordinates
    shape_type: ShapeType = field(default=ShapeType.POLYGON, init=False)

    def __post_init__(self):
        """Coerce array-like input and validate polygon coordinates."""
        if not isinstance(self.coordinates, np.ndarray):
            # The dataclass is frozen, so bypass the blocked __setattr__ to
            # store the coerced array. Real ndarrays (and subclasses) are
            # kept as the very same object.
            object.__setattr__(self, 'coordinates', np.asarray(self.coordinates))

        if self.coordinates.ndim != 2 or self.coordinates.shape[1] != 2:
            raise ValueError(f"Polygon coordinates must be Nx2 array, got shape {self.coordinates.shape}")
        if len(self.coordinates) < 3:
            raise ValueError(f"Polygon must have at least 3 vertices, got {len(self.coordinates)}")

    def __eq__(self, other):
        """Return True when both polygons hold element-wise equal vertices."""
        if other.__class__ is not self.__class__:
            return NotImplemented
        # The identity shortcut keeps "same array object" equal even when the
        # array contains NaN, which np.array_equal would report as unequal.
        return self.coordinates is other.coordinates or bool(
            np.array_equal(self.coordinates, other.coordinates)
        )

    def __hash__(self):
        """Hash on the array shape only; equal polygons always share a shape."""
        return hash((self.shape_type, self.coordinates.shape))
@dataclass(frozen=True, eq=False)
class MaskShape:
    """Binary mask ROI shape.

    Equality and hashing are implemented by hand (``eq=False``) because the
    dataclass-generated versions do not work with a NumPy array field:
    comparing two instances that hold distinct arrays raises "truth value of
    an array is ambiguous", and hashing raises ``TypeError``.

    Attributes:
        mask: 2D boolean array. Array-like input (e.g. nested lists of bools)
            is converted with ``np.asarray``.
        bbox: Bounding box (min_y, min_x, max_y, max_x)
        shape_type: Always ShapeType.MASK

    Raises:
        ValueError: If the mask is not 2D or its dtype is not bool.
    """

    mask: np.ndarray  # 2D boolean array
    bbox: Tuple[int, int, int, int]  # (min_y, min_x, max_y, max_x)
    shape_type: ShapeType = field(default=ShapeType.MASK, init=False)

    def __post_init__(self):
        """Coerce array-like input and validate the mask."""
        if not isinstance(self.mask, np.ndarray):
            # The dataclass is frozen, so bypass the blocked __setattr__ to
            # store the coerced array. Real ndarrays are kept as-is.
            object.__setattr__(self, 'mask', np.asarray(self.mask))

        if self.mask.ndim != 2:
            raise ValueError(f"Mask must be 2D array, got shape {self.mask.shape}")
        if self.mask.dtype != bool:
            raise ValueError(f"Mask must be boolean array, got dtype {self.mask.dtype}")

    def __eq__(self, other):
        """Return True when bounding boxes match and masks are element-wise equal."""
        if other.__class__ is not self.__class__:
            return NotImplemented
        if tuple(self.bbox) != tuple(other.bbox):
            return False
        return self.mask is other.mask or bool(np.array_equal(self.mask, other.mask))

    def __hash__(self):
        """Hash on mask shape and bbox; equal masks always share both."""
        return hash((self.shape_type, self.mask.shape, tuple(self.bbox)))
@dataclass(frozen=True)
class PointShape:
    """Single-point ROI shape.

    Attributes:
        y: Y coordinate of the point
        x: X coordinate of the point
        shape_type: Always ShapeType.POINT (not settable through the constructor)
    """

    y: float
    x: float
    # Fixed tag: excluded from __init__, so callers pass only y and x.
    shape_type: ShapeType = field(default=ShapeType.POINT, init=False)
@dataclass(frozen=True)
class EllipseShape:
    """Ellipse ROI shape described by its centre and one radius per axis.

    Attributes:
        center_y: Y coordinate of the centre
        center_x: X coordinate of the centre
        radius_y: Radius along the Y axis
        radius_x: Radius along the X axis
        shape_type: Always ShapeType.ELLIPSE (not settable through the constructor)
    """

    center_y: float
    center_x: float
    radius_y: float
    radius_x: float
    # Fixed tag: excluded from __init__, so callers pass only the four floats.
    shape_type: ShapeType = field(default=ShapeType.ELLIPSE, init=False)
@dataclass(frozen=True)
class ROI:
    """A region of interest: one or more shapes plus free-form metadata.

    Attributes:
        shapes: Non-empty list of shape objects (PolygonShape, MaskShape, etc.);
            every entry must expose a ``shape_type`` attribute.
        metadata: Dictionary of ROI metadata (label, area, perimeter, etc.);
            defaults to a fresh empty dict per instance.

    Raises:
        ValueError: If ``shapes`` is empty or contains an object without a
            ``shape_type`` attribute.
    """

    shapes: List[Any]  # List of shape objects
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Reject empty ROIs and shapes that lack ``shape_type``."""
        if not self.shapes:
            raise ValueError("ROI must have at least one shape")

        # Duck-typed check: anything carrying a shape_type attribute is accepted.
        for candidate in self.shapes:
            if hasattr(candidate, 'shape_type'):
                continue
            raise ValueError(f"Shape {candidate} must have shape_type attribute")
def extract_rois_from_labeled_mask(
    labeled_mask: np.ndarray,
    min_area: int = 10,
    extract_contours: bool = True
) -> List[ROI]:
    """Turn a labeled segmentation mask into a list of ROI objects.

    Each region reported by ``skimage.measure.regionprops`` whose area is at
    least ``min_area`` becomes one ROI. Its metadata holds ``label``,
    ``area``, ``perimeter``, ``centroid`` (y, x) and ``bbox``
    (min_y, min_x, max_y, max_x).

    Args:
        labeled_mask: 2D array where each unique value (except 0) represents a
            cell/object. Non-integer dtypes are cast to int32.
        min_area: Minimum area (in pixels) for an ROI to be included
        extract_contours: If True, each region is represented by polygon
            contours; if False, by a single full-image binary mask.

    Returns:
        List of ROI objects with shapes and metadata. Regions that yield no
        shape (no contour with at least 3 points) are omitted.

    Raises:
        ValueError: If ``labeled_mask`` is not 2D.
    """
    from skimage import measure
    from skimage.measure import regionprops

    if labeled_mask.ndim != 2:
        raise ValueError(f"Labeled mask must be 2D, got shape {labeled_mask.shape}")

    # regionprops requires integer labels, so cast anything else.
    if not np.issubdtype(labeled_mask.dtype, np.integer):
        labeled_mask = labeled_mask.astype(np.int32)

    rois = []
    for props in regionprops(labeled_mask):
        if props.area < min_area:
            continue

        info = {
            'label': int(props.label),
            'area': float(props.area),
            'perimeter': float(props.perimeter),
            'centroid': tuple(float(value) for value in props.centroid),  # (y, x)
            'bbox': tuple(int(value) for value in props.bbox),  # (min_y, min_x, max_y, max_x)
        }

        # Full-image boolean mask selecting only this label; both shape
        # representations are derived from it.
        object_mask = (labeled_mask == props.label)

        if extract_contours:
            outlines = measure.find_contours(object_mask.astype(float), level=0.5)
            # Contours come back in (y, x) order; those with fewer than
            # 3 points cannot form a polygon and are dropped.
            region_shapes = [
                PolygonShape(coordinates=outline)
                for outline in outlines
                if len(outline) >= 3
            ]
        else:
            region_shapes = [MaskShape(mask=object_mask, bbox=props.bbox)]

        if not region_shapes:
            continue
        rois.append(ROI(shapes=region_shapes, metadata=info))

    logger.info(f"Extracted {len(rois)} ROIs from labeled mask")
    return rois
def materialize_rois(
    rois: List[ROI],
    output_path: str,
    filemanager,
    backend: str
) -> str:
    """Materialize ROIs to backend-specific format.

    This is the generic materialization function that dispatches to the
    backend object's ``_save_rois`` method.

    Args:
        rois: List of ROI objects to materialize
        output_path: Output path (backend-specific format); wrapped in a
            ``Path`` before being handed to the backend
        filemanager: Object providing ``get_backend(name)`` and, optionally, a
            ``_materialization_context`` mapping from which ``images_dir``
            is read
        backend: Backend name (disk, omero_local, napari_stream, fiji_stream)

    Returns:
        Whatever the backend's ``_save_rois`` returns (the path where the
        ROIs were saved).

    Raises:
        NotImplementedError: If the backend has no ``_save_rois`` method.
    """
    # Get backend instance from filemanager
    backend_obj = filemanager.get_backend(backend)

    # images_dir is optional: a missing context, a None context, or a context
    # without the key all result in images_dir=None.
    context = getattr(filemanager, '_materialization_context', None)
    images_dir = context.get('images_dir') if context is not None else None

    # Dispatch to backend-specific method with explicit images_dir parameter
    save_rois = getattr(backend_obj, '_save_rois', None)
    if save_rois is None:
        raise NotImplementedError(f"Backend {backend} does not support ROI saving")
    return save_rois(rois, Path(output_path), images_dir=images_dir)
def _roi_shape_from_json(shape_dict: Dict[str, Any]) -> Optional[Any]:
    """Build one shape object from its JSON dict; return None for unknown types."""
    shape_type = shape_dict.get('type')

    if shape_type == 'polygon':
        return PolygonShape(coordinates=np.array(shape_dict['coordinates']))
    if shape_type == 'mask':
        return MaskShape(
            mask=np.array(shape_dict['mask'], dtype=bool),
            bbox=tuple(shape_dict['bbox']),
        )
    if shape_type == 'point':
        return PointShape(y=shape_dict['y'], x=shape_dict['x'])
    if shape_type == 'ellipse':
        return EllipseShape(
            center_y=shape_dict['center_y'],
            center_x=shape_dict['center_x'],
            radius_y=shape_dict['radius_y'],
            radius_x=shape_dict['radius_x'],
        )

    logger.warning(f"Unknown shape type: {shape_type}, skipping")
    return None


def load_rois_from_json(json_path: Path) -> List[ROI]:
    """Load ROIs from JSON file.

    Deserializes ROI JSON files created by the disk backend back into ROI objects.
    This allows ROIs to be loaded and re-streamed to viewers.

    The file must contain a JSON list. Each entry may carry a ``metadata``
    dict and a ``shapes`` list; each shape dict is dispatched on its ``type``
    key (``polygon``, ``mask``, ``point``, ``ellipse``). Shapes of unknown
    type are skipped with a warning, and entries left with no shapes are
    dropped.

    Args:
        json_path: Path to ROI JSON file (a ``Path``, or any str/PathLike
            accepted by ``Path``)

    Returns:
        List of ROI objects

    Raises:
        FileNotFoundError: If JSON file doesn't exist
        ValueError: If the file is not valid JSON, the top-level value is not
            a list, or a shape fails its own validation
        KeyError: If a shape dict lacks a field required for its type
    """
    import json

    # Accept plain strings as well as Path objects.
    json_path = Path(json_path)
    if not json_path.exists():
        raise FileNotFoundError(f"ROI JSON file not found: {json_path}")

    # Decode as UTF-8 explicitly instead of relying on the locale default.
    with open(json_path, 'r', encoding='utf-8') as f:
        rois_data = json.load(f)

    if not isinstance(rois_data, list):
        raise ValueError(f"Invalid ROI JSON format: expected list, got {type(rois_data)}")

    rois = []
    for roi_dict in rois_data:
        metadata = roi_dict.get('metadata', {})

        candidates = (_roi_shape_from_json(entry) for entry in roi_dict.get('shapes', []))
        shapes = [shape for shape in candidates if shape is not None]

        # Create ROI only if at least one shape was deserialized
        if shapes:
            rois.append(ROI(shapes=shapes, metadata=metadata))

    logger.info(f"Loaded {len(rois)} ROIs from {json_path}")
    return rois
def load_rois_from_zip(zip_path: Path) -> List[ROI]:
    """Load ROIs from .roi.zip archive (ImageJ standard format).

    Deserializes .roi.zip files created by the disk backend back into ROI objects.
    This allows ROIs to be loaded and re-streamed to viewers.

    Every archive member whose name ends in ``.roi`` is parsed with
    ``roifile.ImagejRoi`` and turned into a single-polygon ROI. The ROI's
    ``label`` metadata is the ImageJ ROI name, or the member name without its
    ``.roi`` suffix when the name is empty. Members that fail to parse or
    validate are logged at warning level and skipped.

    Args:
        zip_path: Path to .roi.zip archive (a ``Path``, or any str/PathLike
            accepted by ``Path``)

    Returns:
        List of ROI objects

    Raises:
        FileNotFoundError: If zip file doesn't exist
        ValueError: If the archive contains no valid ROIs
        ImportError: If roifile library is not available
    """
    import zipfile

    # Accept plain strings as well as Path objects.
    zip_path = Path(zip_path)
    if not zip_path.exists():
        raise FileNotFoundError(f"ROI zip file not found: {zip_path}")

    try:
        from roifile import ImagejRoi
    except ImportError as err:
        # Chain the original error so the underlying import failure stays visible.
        raise ImportError(
            "roifile library required for loading .roi.zip files. Install with: pip install roifile"
        ) from err

    roi_suffix = '.roi'
    rois = []

    with zipfile.ZipFile(zip_path, 'r') as zf:
        for filename in zf.namelist():
            if not filename.endswith(roi_suffix):
                continue
            # Best-effort per member: one broken ROI must not abort the load.
            try:
                ij_roi = ImagejRoi.frombytes(zf.read(filename))

                # Convert ImageJ ROI to OpenHCS ROI
                # ImageJ uses (x, y), OpenHCS uses (y, x)
                coords = ij_roi.coordinates()
                if coords is None or len(coords) == 0:
                    continue
                coords_yx = coords[:, [1, 0]]  # Swap to (y, x)

                # Strip only the trailing suffix; str.replace would also
                # remove '.roi' occurring elsewhere in the member name.
                label = ij_roi.name or filename[:-len(roi_suffix)]
                rois.append(ROI(
                    shapes=[PolygonShape(coordinates=coords_yx)],
                    metadata={'label': label},
                ))
            except Exception as e:
                logger.warning(f"Failed to load ROI from {filename}: {e}")
                continue

    if not rois:
        raise ValueError(f"No valid ROIs found in {zip_path}")

    logger.info(f"Loaded {len(rois)} ROIs from {zip_path}")
    return rois