Coverage for openhcs/core/roi.py: 42.0%

145 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Generic ROI (Region of Interest) system for OpenHCS. 

3 

4This module provides backend-agnostic ROI extraction and representation. 

5ROIs are extracted from labeled segmentation masks and can be materialized 

6to various backends (OMERO, disk, Napari, Fiji). 

7 

8Doctrinal Clauses: 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11""" 

12 

13import logging 

14import numpy as np 

15from dataclasses import dataclass, field 

16from typing import List, Dict, Any, Optional, Tuple 

17from pathlib import Path 

18from enum import Enum 

19 

20logger = logging.getLogger(__name__) 

21 

22 

class ShapeType(Enum):
    """Kinds of geometry an ROI shape can have.

    Each member's value is the lowercase string tag for that kind.
    """

    POLYGON = "polygon"  # outline given by vertex coordinates
    MASK = "mask"  # boolean pixel mask
    POINT = "point"  # single (y, x) location
    ELLIPSE = "ellipse"  # centre plus one radius per axis

29 

30 

@dataclass(frozen=True)
class PolygonShape:
    """Polygon ROI shape described by its vertices.

    Attributes:
        coordinates: Array of shape (N, 2); each row is one (y, x) vertex
        shape_type: Fixed to ShapeType.POLYGON (not a constructor argument)
    """

    coordinates: np.ndarray  # (N, 2) rows of (y, x)
    shape_type: ShapeType = field(default=ShapeType.POLYGON, init=False)

    def __post_init__(self):
        """Reject coordinate arrays that are not Nx2 or have fewer than 3 rows."""
        vertices = self.coordinates

        # `and` short-circuits, so shape[1] is only read for 2-D arrays.
        is_n_by_2 = vertices.ndim == 2 and vertices.shape[1] == 2
        if not is_n_by_2:
            raise ValueError(f"Polygon coordinates must be Nx2 array, got shape {vertices.shape}")

        vertex_count = len(vertices)
        if vertex_count < 3:
            raise ValueError(f"Polygon must have at least 3 vertices, got {vertex_count}")

48 

49 

@dataclass(frozen=True)
class MaskShape:
    """ROI shape stored as a boolean pixel mask.

    Attributes:
        mask: 2D array with dtype bool
        bbox: Bounding box as (min_y, min_x, max_y, max_x); not validated here
        shape_type: Fixed to ShapeType.MASK (not a constructor argument)
    """

    mask: np.ndarray  # 2D, dtype bool
    bbox: Tuple[int, int, int, int]  # (min_y, min_x, max_y, max_x)
    shape_type: ShapeType = field(default=ShapeType.MASK, init=False)

    def __post_init__(self):
        """Reject masks that are not two-dimensional boolean arrays."""
        pixels = self.mask

        # Dimensionality is checked first, then dtype.
        if pixels.ndim != 2:
            raise ValueError(f"Mask must be 2D array, got shape {pixels.shape}")

        if pixels.dtype != bool:
            raise ValueError(f"Mask must be boolean array, got dtype {pixels.dtype}")

69 

70 

@dataclass(frozen=True)
class PointShape:
    """ROI shape consisting of a single location.

    Attributes:
        y: Y coordinate of the point
        x: X coordinate of the point
        shape_type: Fixed to ShapeType.POINT (not a constructor argument)
    """

    # Field order is (y, x).
    y: float
    x: float
    shape_type: ShapeType = field(default=ShapeType.POINT, init=False)

83 

84 

@dataclass(frozen=True)
class EllipseShape:
    """ROI shape described by a centre and one radius per axis.

    Attributes:
        center_y: Y coordinate of the ellipse centre
        center_x: X coordinate of the ellipse centre
        radius_y: Radius measured along the Y axis
        radius_x: Radius measured along the X axis
        shape_type: Fixed to ShapeType.ELLIPSE (not a constructor argument)
    """

    # Centre, in (y, x) field order.
    center_y: float
    center_x: float
    # Radii, in (y, x) field order.
    radius_y: float
    radius_x: float
    shape_type: ShapeType = field(default=ShapeType.ELLIPSE, init=False)

101 

102 

@dataclass(frozen=True)
class ROI:
    """A region of interest: one or more shapes plus free-form metadata.

    Attributes:
        shapes: Non-empty list of shape objects; each must expose a
            ``shape_type`` attribute (PolygonShape, MaskShape, etc.)
        metadata: Dictionary of ROI metadata (label, area, perimeter, etc.);
            defaults to a new empty dict
    """

    shapes: List[Any]  # shape objects carrying a shape_type attribute
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Check that shapes is non-empty and every entry has shape_type."""
        if not self.shapes:
            raise ValueError("ROI must have at least one shape")

        # Duck-typed check: only the attribute's presence is required,
        # not membership in any particular class.
        for candidate in self.shapes:
            if hasattr(candidate, 'shape_type'):
                continue
            raise ValueError(f"Shape {candidate} must have shape_type attribute")

123 

124 

def extract_rois_from_labeled_mask(
    labeled_mask: np.ndarray,
    min_area: int = 10,
    extract_contours: bool = True
) -> List[ROI]:
    """Build one ROI per labeled region of a segmentation mask.

    Regions are enumerated with ``skimage.measure.regionprops``. Each ROI
    carries ``label``, ``area``, ``perimeter``, ``centroid`` and ``bbox``
    metadata taken from the region properties.

    Args:
        labeled_mask: 2D array where each unique value (except 0) represents
            a cell/object; non-integer arrays are cast to int32 first
        min_area: Regions whose area is below this many pixels are skipped
        extract_contours: If True, each region becomes one PolygonShape per
            contour found at level 0.5; if False, a single MaskShape holding
            the full-image boolean mask of the region

    Returns:
        List of ROI objects; regions that yield no shape are omitted

    Raises:
        ValueError: If labeled_mask is not 2D
    """
    from skimage import measure
    from skimage.measure import regionprops

    if labeled_mask.ndim != 2:
        raise ValueError(f"Labeled mask must be 2D, got shape {labeled_mask.shape}")

    # regionprops needs integer labels, so anything else is cast up front.
    if not np.issubdtype(labeled_mask.dtype, np.integer):
        labeled_mask = labeled_mask.astype(np.int32)

    rois = []
    for region in regionprops(labeled_mask):
        if region.area < min_area:
            continue

        metadata = {
            'label': int(region.label),
            'area': float(region.area),
            'perimeter': float(region.perimeter),
            'centroid': tuple(map(float, region.centroid)),  # (y, x)
            'bbox': tuple(map(int, region.bbox)),  # (min_y, min_x, max_y, max_x)
        }

        # Full-image boolean mask selecting only this region's pixels.
        region_mask = labeled_mask == region.label

        if extract_contours:
            outlines = measure.find_contours(region_mask.astype(float), level=0.5)
            # Contours are already in (y, x) order; fewer than 3 points
            # cannot form a polygon, so those are dropped.
            shapes = [
                PolygonShape(coordinates=outline)
                for outline in outlines
                if len(outline) >= 3
            ]
        else:
            shapes = [MaskShape(mask=region_mask, bbox=region.bbox)]

        if not shapes:
            continue
        rois.append(ROI(shapes=shapes, metadata=metadata))

    logger.info(f"Extracted {len(rois)} ROIs from labeled mask")
    return rois

195 

196 

def materialize_rois(
    rois: List[ROI],
    output_path: str,
    filemanager,
    backend: str
) -> str:
    """Materialize ROIs to backend-specific format.

    This is the generic materialization function: it looks up the backend
    object on the filemanager and delegates to that object's ``_save_rois``
    method.

    Args:
        rois: List of ROI objects to materialize
        output_path: Output path (backend-specific format); wrapped in a
            ``Path`` before being handed to the backend
        filemanager: Object providing ``get_backend(name)``. If it has a
            ``_materialization_context`` mapping, its ``images_dir`` entry is
            forwarded to the backend; otherwise ``images_dir`` is None
        backend: Backend name (disk, omero_local, napari_stream, fiji_stream)

    Returns:
        Whatever the backend's ``_save_rois`` returns (the path where ROIs
        were saved)

    Raises:
        NotImplementedError: If the backend object has no ``_save_rois``
    """
    backend_obj = filemanager.get_backend(backend)

    # Capability check comes first so an unsupported backend is reported
    # before any other work is done.
    save_rois = getattr(backend_obj, '_save_rois', None)
    if save_rois is None:
        raise NotImplementedError(f"Backend {backend} does not support ROI saving")

    # The context attribute may be absent or explicitly None; both mean
    # "no images_dir available".
    context = getattr(filemanager, '_materialization_context', None)
    images_dir = context.get('images_dir') if context else None

    return save_rois(rois, Path(output_path), images_dir=images_dir)

232 

233 

def load_rois_from_json(json_path: Path) -> List[ROI]:
    """Load ROIs from JSON file.

    The file must contain a JSON list. Each entry may have a ``metadata``
    dict and a ``shapes`` list; every shape is rebuilt from its ``type`` tag
    (``polygon``, ``mask``, ``point`` or ``ellipse``). Shapes with any other
    tag are logged at warning level and skipped, and entries that end up
    with no shapes produce no ROI.

    Args:
        json_path: Path to ROI JSON file (a plain string is also accepted)

    Returns:
        List of ROI objects

    Raises:
        FileNotFoundError: If JSON file doesn't exist
        ValueError: If the file is not valid JSON, its top level is not a
            list, or a shape fails its own validation
        KeyError: If a shape entry lacks a field required by its type
    """
    import json

    # Callers sometimes hold paths as strings; normalise so .exists() works.
    json_path = Path(json_path)
    if not json_path.exists():
        raise FileNotFoundError(f"ROI JSON file not found: {json_path}")

    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(json_path, 'r', encoding='utf-8') as f:
        rois_data = json.load(f)

    if not isinstance(rois_data, list):
        raise ValueError(f"Invalid ROI JSON format: expected list, got {type(rois_data)}")

    rois = []
    for roi_dict in rois_data:
        metadata = roi_dict.get('metadata', {})

        shapes = []
        for shape_dict in roi_dict.get('shapes', []):
            shape_type = shape_dict.get('type')

            if shape_type == 'polygon':
                coordinates = np.array(shape_dict['coordinates'])
                shapes.append(PolygonShape(coordinates=coordinates))
            elif shape_type == 'mask':
                # JSON has no boolean-array type, so force the dtype back.
                mask = np.array(shape_dict['mask'], dtype=bool)
                bbox = tuple(shape_dict['bbox'])
                shapes.append(MaskShape(mask=mask, bbox=bbox))
            elif shape_type == 'point':
                shapes.append(PointShape(y=shape_dict['y'], x=shape_dict['x']))
            elif shape_type == 'ellipse':
                shapes.append(EllipseShape(
                    center_y=shape_dict['center_y'],
                    center_x=shape_dict['center_x'],
                    radius_y=shape_dict['radius_y'],
                    radius_x=shape_dict['radius_x'],
                ))
            else:
                logger.warning(f"Unknown shape type: {shape_type}, skipping")

        # ROI refuses an empty shape list, so shapeless entries are dropped.
        if shapes:
            rois.append(ROI(shapes=shapes, metadata=metadata))

    logger.info(f"Loaded {len(rois)} ROIs from {json_path}")
    return rois

299 

300 

def load_rois_from_zip(zip_path: Path) -> List[ROI]:
    """Load ROIs from .roi.zip archive (ImageJ standard format).

    Every archive member whose name ends in ``.roi`` is parsed with
    ``roifile.ImagejRoi`` and converted to an ROI holding one PolygonShape.
    The ROI's ``label`` metadata is the ImageJ ROI name, or the member name
    without its ``.roi`` suffix when the name is empty. Members that fail to
    parse or convert are logged at warning level and skipped.

    Args:
        zip_path: Path to .roi.zip archive (a plain string is also accepted)

    Returns:
        List of ROI objects

    Raises:
        FileNotFoundError: If zip file doesn't exist
        ImportError: If roifile library is not available
        ValueError: If the archive contains no valid ROIs
        zipfile.BadZipFile: If the file is not a readable zip archive
    """
    import zipfile

    # Callers sometimes hold paths as strings; normalise so .exists() works.
    zip_path = Path(zip_path)
    if not zip_path.exists():
        raise FileNotFoundError(f"ROI zip file not found: {zip_path}")

    try:
        from roifile import ImagejRoi
    except ImportError as err:
        raise ImportError(
            "roifile library required for loading .roi.zip files. Install with: pip install roifile"
        ) from err

    roi_suffix = '.roi'
    rois = []

    with zipfile.ZipFile(zip_path, 'r') as zf:
        for filename in zf.namelist():
            if not filename.endswith(roi_suffix):
                continue

            # Best effort per member: one unreadable ROI must not abort the
            # whole archive, so failures are logged and skipped.
            try:
                ij_roi = ImagejRoi.frombytes(zf.read(filename))

                coords = ij_roi.coordinates()
                if coords is not None and len(coords) > 0:
                    # ImageJ uses (x, y), OpenHCS uses (y, x)
                    coords_yx = coords[:, [1, 0]]

                    # Strip only the trailing suffix; '.roi' appearing
                    # earlier in the member name is part of the label.
                    fallback_label = filename[:-len(roi_suffix)]
                    rois.append(ROI(
                        shapes=[PolygonShape(coordinates=coords_yx)],
                        metadata={'label': ij_roi.name or fallback_label},
                    ))
            except Exception as e:
                logger.warning(f"Failed to load ROI from {filename}: {e}")

    if not rois:
        raise ValueError(f"No valid ROIs found in {zip_path}")

    logger.info(f"Loaded {len(rois)} ROIs from {zip_path}")
    return rois

358