Coverage for openhcs/microscopes/openhcs.py: 71.0%

392 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2OpenHCS microscope handler implementation for openhcs. 

3 

4This module provides the OpenHCSMicroscopeHandler, which reads plates 

5that have been pre-processed and standardized into the OpenHCS format. 

6The metadata for such plates is defined in an 'openhcs_metadata.json' file. 

7""" 

8 

9import json 

10import logging 

11from dataclasses import dataclass, asdict 

12from pathlib import Path 

13from typing import Any, Dict, List, Optional, Tuple, Union, Type 

14 

15from openhcs.constants.constants import Backend, GroupBy, AllComponents 

16from openhcs.io.exceptions import MetadataNotFoundError 

17from openhcs.io.filemanager import FileManager 

18from openhcs.io.metadata_writer import AtomicMetadataWriter, MetadataWriteError, get_metadata_path, METADATA_CONFIG 

19from openhcs.microscopes.microscope_interfaces import MetadataHandler 

20logger = logging.getLogger(__name__) 

21 

22 

23@dataclass(frozen=True) 

24class OpenHCSMetadataFields: 

25 """Centralized constants for OpenHCS metadata field names.""" 

26 # Core metadata structure - use centralized constants 

27 SUBDIRECTORIES: str = METADATA_CONFIG.SUBDIRECTORIES_KEY 

28 IMAGE_FILES: str = "image_files" 

29 AVAILABLE_BACKENDS: str = METADATA_CONFIG.AVAILABLE_BACKENDS_KEY 

30 

31 # Required metadata fields 

32 GRID_DIMENSIONS: str = "grid_dimensions" 

33 PIXEL_SIZE: str = "pixel_size" 

34 SOURCE_FILENAME_PARSER_NAME: str = "source_filename_parser_name" 

35 MICROSCOPE_HANDLER_NAME: str = "microscope_handler_name" 

36 

37 # Optional metadata fields 

38 CHANNELS: str = "channels" 

39 WELLS: str = "wells" 

40 SITES: str = "sites" 

41 Z_INDEXES: str = "z_indexes" 

42 TIMEPOINTS: str = "timepoints" 

43 OBJECTIVES: str = "objectives" 

44 ACQUISITION_DATETIME: str = "acquisition_datetime" 

45 PLATE_NAME: str = "plate_name" 

46 

47 # Default values 

48 DEFAULT_SUBDIRECTORY: str = "." 

49 DEFAULT_SUBDIRECTORY_LEGACY: str = "images" 

50 

51 # Microscope type identifier 

52 MICROSCOPE_TYPE: str = "openhcsdata" 

53 

54 

55# Global instance for easy access 

56FIELDS = OpenHCSMetadataFields() 
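
# Illustrative sketch (not part of the module): the JSON shape that the field
# names above describe, in the subdirectory-keyed format. The subdirectory
# name, file names, and display values below are hypothetical.
#
#   {
#     "subdirectories": {
#       "images": {
#         "main": true,
#         "microscope_handler_name": "imagexpress",
#         "source_filename_parser_name": "ImageXpressFilenameParser",
#         "grid_dimensions": [2, 2],
#         "pixel_size": 0.65,
#         "image_files": ["images/A01_s1_w1.tif"],
#         "channels": {"1": "DAPI"},
#         "available_backends": {"disk": true}
#       }
#     }
#   }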


def _get_available_filename_parsers():
    """
    Lazy import of filename parsers to avoid circular imports.

    Returns:
        Dict mapping parser class names to parser classes
    """
    # Import parsers only when needed to avoid circular imports
    from openhcs.microscopes.imagexpress import ImageXpressFilenameParser
    from openhcs.microscopes.opera_phenix import OperaPhenixFilenameParser

    return {
        "ImageXpressFilenameParser": ImageXpressFilenameParser,
        "OperaPhenixFilenameParser": OperaPhenixFilenameParser,
        # Add other parsers to this dictionary as they are implemented/imported.
        # Example: "MyOtherParser": MyOtherParser,
    }
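
# Example (illustrative): resolving a parser class from the registry by the
# name stored in metadata, then instantiating it. Constructor arguments vary
# by parser, which is why the handler further below tries several signatures.
#
#   parsers = _get_available_filename_parsers()
#   ParserClass = parsers["ImageXpressFilenameParser"]
#   parser = ParserClass()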


class OpenHCSMetadataHandler(MetadataHandler):
    """
    Metadata handler for the OpenHCS pre-processed format.

    This handler reads metadata from an 'openhcs_metadata.json' file
    located in the root of the plate folder.
    """
    METADATA_FILENAME = METADATA_CONFIG.METADATA_FILENAME

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata handler.

        Args:
            filemanager: FileManager instance for file operations.
        """
        super().__init__()
        self.filemanager = filemanager
        self.atomic_writer = AtomicMetadataWriter()
        self._metadata_cache: Optional[Dict[str, Any]] = None
        self._plate_path_cache: Optional[Path] = None

    def _load_metadata(self, plate_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Load the JSON metadata file, unless it is already cached for this plate_path.

        Args:
            plate_path: Path to the plate folder.

        Returns:
            A dictionary containing the parsed JSON metadata.

        Raises:
            MetadataNotFoundError: If the metadata file cannot be found or parsed.
            FileNotFoundError: If plate_path does not exist.
        """
        current_path = Path(plate_path)
        if self._metadata_cache is not None and self._plate_path_cache == current_path:
            return self._metadata_cache

        metadata_file_path = self.find_metadata_file(current_path)
        if metadata_file_path is None or not self.filemanager.exists(str(metadata_file_path), Backend.DISK.value):
            raise MetadataNotFoundError(f"Metadata file '{self.METADATA_FILENAME}' not found in {plate_path}")

        try:
            content = self.filemanager.load(str(metadata_file_path), Backend.DISK.value)
            # Backend may return an already-parsed dict (the disk backend auto-parses JSON);
            # otherwise parse the raw bytes/string.
            if isinstance(content, dict):
                metadata_dict = content
            else:
                metadata_dict = json.loads(content.decode('utf-8') if isinstance(content, bytes) else content)

            # Handle the subdirectory-keyed format
            subdirs = metadata_dict.get(FIELDS.SUBDIRECTORIES)
            if subdirs is None:
                # Legacy format not supported - use migration script
                raise MetadataNotFoundError(
                    f"Legacy metadata format detected in '{metadata_file_path}'. "
                    f"Please run the migration script: python scripts/migrate_legacy_metadata.py {current_path}"
                )
            if not subdirs:
                raise MetadataNotFoundError(f"Empty subdirectories in metadata file '{metadata_file_path}'")

            # Use the main subdirectory as base (marked with "main": true),
            # falling back to the first subdirectory if none is marked
            main_subdir = next((data for data in subdirs.values() if data.get("main")), None)
            if not main_subdir:
                main_subdir = next(iter(subdirs.values()))

            base_metadata = main_subdir.copy()
            base_metadata[FIELDS.IMAGE_FILES] = [
                file for subdir in subdirs.values()
                for file in subdir.get(FIELDS.IMAGE_FILES, [])
            ]
            self._metadata_cache = base_metadata

            self._plate_path_cache = current_path
            return self._metadata_cache

        except json.JSONDecodeError as e:
            raise MetadataNotFoundError(f"Error decoding JSON from '{metadata_file_path}': {e}") from e
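
    # Example (illustrative): for a plate with subdirectories "images" (marked
    # main) and "zarr", _load_metadata() returns the "images" entry as the base
    # dict, with "image_files" concatenated across all subdirectories, e.g.
    #
    #   {"main": True, ..., "image_files": ["images/A01_s1_w1.tif",
    #                                       "zarr/A01_s1_w1.tif"]}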

    def determine_main_subdirectory(self, plate_path: Union[str, Path]) -> str:
        """Determine the main input subdirectory from metadata."""
        metadata_dict = self._load_metadata_dict(plate_path)
        subdirs = metadata_dict.get(FIELDS.SUBDIRECTORIES)

        # Legacy format is not supported - fail loud if the subdirectories key is missing or empty
        if not subdirs:
            raise MetadataNotFoundError(f"No subdirectories found in metadata for {plate_path}")

        # Single subdirectory - use it
        if len(subdirs) == 1:
            return next(iter(subdirs.keys()))

        # Multiple subdirectories - find the one marked main
        main_subdir = next((name for name, data in subdirs.items() if data.get("main")), None)
        if main_subdir:
            return main_subdir

        # Fallback hierarchy: legacy default -> first available
        if FIELDS.DEFAULT_SUBDIRECTORY_LEGACY in subdirs:
            return FIELDS.DEFAULT_SUBDIRECTORY_LEGACY
        return next(iter(subdirs.keys()))
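
    # Example (illustrative): with subdirectories {"zarr": {...}, "images": {...}}
    # and no "main" flag set, determine_main_subdirectory() returns "images"
    # (the legacy default); if "images" were also absent, it would return
    # "zarr", the first available key.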

    def _load_metadata_dict(self, plate_path: Union[str, Path]) -> Dict[str, Any]:
        """Load and parse the metadata JSON, failing loud on errors."""
        metadata_file_path = self.find_metadata_file(plate_path)
        if metadata_file_path is None or not self.filemanager.exists(str(metadata_file_path), Backend.DISK.value):
            raise MetadataNotFoundError(f"Metadata file '{self.METADATA_FILENAME}' not found in {plate_path}")

        try:
            content = self.filemanager.load(str(metadata_file_path), Backend.DISK.value)
            # Backend may return an already-parsed dict (the disk backend auto-parses JSON);
            # otherwise parse the raw bytes/string.
            if isinstance(content, dict):
                return content
            return json.loads(content.decode('utf-8') if isinstance(content, bytes) else content)
        except json.JSONDecodeError as e:
            raise MetadataNotFoundError(f"Error decoding JSON from '{metadata_file_path}': {e}") from e

    def find_metadata_file(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Path]:
        """Find the OpenHCS JSON metadata file, or return None if it cannot be located."""
        plate_p = Path(plate_path)
        if not self.filemanager.is_dir(str(plate_p), Backend.DISK.value):
            return None

        expected_file = plate_p / self.METADATA_FILENAME
        if self.filemanager.exists(str(expected_file), Backend.DISK.value):
            return expected_file

        # Fallback: recursive search
        try:
            if found_files := self.filemanager.find_file_recursive(plate_p, self.METADATA_FILENAME, Backend.DISK.value):
                if isinstance(found_files, list):
                    # Prioritize the root location, then the first match found
                    return next((Path(f) for f in found_files if Path(f).parent == plate_p), Path(found_files[0]))
                return Path(found_files)
        except Exception as e:
            logger.error(f"Error searching for {self.METADATA_FILENAME} in {plate_path}: {e}")

        return None

    def get_grid_dimensions(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Tuple[int, int]:
        """Get grid dimensions from OpenHCS metadata."""
        dims = self._load_metadata(plate_path).get(FIELDS.GRID_DIMENSIONS)
        if not (isinstance(dims, list) and len(dims) == 2 and all(isinstance(d, int) for d in dims)):
            raise ValueError(f"'{FIELDS.GRID_DIMENSIONS}' must be a list of two integers in {self.METADATA_FILENAME}")
        return tuple(dims)

    def get_pixel_size(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> float:
        """Get pixel size from OpenHCS metadata."""
        pixel_size = self._load_metadata(plate_path).get(FIELDS.PIXEL_SIZE)
        if not isinstance(pixel_size, (float, int)):
            raise ValueError(f"'{FIELDS.PIXEL_SIZE}' must be a number in {self.METADATA_FILENAME}")
        return float(pixel_size)

    def get_source_filename_parser_name(self, plate_path: Union[str, Path]) -> str:
        """Get the source filename parser name from OpenHCS metadata."""
        parser_name = self._load_metadata(plate_path).get(FIELDS.SOURCE_FILENAME_PARSER_NAME)
        if not (isinstance(parser_name, str) and parser_name):
            raise ValueError(f"'{FIELDS.SOURCE_FILENAME_PARSER_NAME}' must be a non-empty string in {self.METADATA_FILENAME}")
        return parser_name

    # Uses the default get_image_files() implementation from the MetadataHandler ABC
    # (prefers workspace_mapping keys, falls back to the image_files list).

    # Optional metadata getters
    def _get_optional_metadata_dict(self, plate_path: Union[str, Path], key: str) -> Optional[Dict[str, str]]:
        """Helper to get an optional dictionary metadata field."""
        value = self._load_metadata(plate_path).get(key)
        return {str(k): str(v) for k, v in value.items()} if isinstance(value, dict) else None

    def get_channel_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.CHANNELS)

    def get_well_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.WELLS)

    def get_site_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.SITES)

    def get_z_index_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.Z_INDEXES)

    def get_timepoint_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.TIMEPOINTS)

    def get_objective_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Any]]:
        """Get objective lens information if available."""
        return self._get_optional_metadata_dict(plate_path, FIELDS.OBJECTIVES)

    def get_plate_acquisition_datetime(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[str]:
        """Get the plate acquisition datetime if available."""
        return self._get_optional_metadata_str(plate_path, FIELDS.ACQUISITION_DATETIME)

    def get_plate_name(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[str]:
        """Get the plate name if available."""
        return self._get_optional_metadata_str(plate_path, FIELDS.PLATE_NAME)

    def _get_optional_metadata_str(self, plate_path: Union[str, Path], field: str) -> Optional[str]:
        """Helper to get an optional string metadata field."""
        value = self._load_metadata(plate_path).get(field)
        return value if isinstance(value, str) and value else None

    def get_available_backends(self, input_dir: Union[str, Path]) -> Dict[str, bool]:
        """
        Get available storage backends for the input directory.

        This method resolves the plate root from the input directory,
        loads the OpenHCS metadata, and returns the available backends.

        Args:
            input_dir: Path to the input directory (may be the plate root or a subdirectory)

        Returns:
            Dictionary mapping backend names to availability (e.g., {"disk": True, "zarr": False})

        Raises:
            MetadataNotFoundError: If the metadata file cannot be found or parsed
        """
        # Resolve the plate root from the input directory
        plate_root = self._resolve_plate_root(input_dir)

        # Load metadata using the existing infrastructure
        metadata = self._load_metadata(plate_root)

        # Extract available backends, defaulting to an empty dict if not present
        available_backends = metadata.get(FIELDS.AVAILABLE_BACKENDS, {})

        if not isinstance(available_backends, dict):
            logger.warning(f"Invalid available_backends format in metadata: {available_backends}")
            return {}

        return available_backends

    def _resolve_plate_root(self, input_dir: Union[str, Path]) -> Path:
        """
        Resolve the plate root directory from an input directory.

        The input directory may be the plate root itself or a subdirectory.
        This method walks up the directory tree to find the directory containing
        the OpenHCS metadata file.

        Args:
            input_dir: Path to resolve

        Returns:
            Path to the plate root directory

        Raises:
            MetadataNotFoundError: If no metadata file is found
        """
        current_path = Path(input_dir)

        # Walk up the directory tree looking for the metadata file
        for path in [current_path] + list(current_path.parents):
            metadata_file = path / self.METADATA_FILENAME
            if self.filemanager.exists(str(metadata_file), Backend.DISK.value):
                return path

        # If not found, raise an error
        raise MetadataNotFoundError(
            f"Could not find {self.METADATA_FILENAME} in {input_dir} or any parent directory"
        )
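
    # Example (illustrative, hypothetical paths): starting from a subdirectory,
    # _resolve_plate_root() walks upward until it finds the metadata file:
    #
    #   _resolve_plate_root("/data/plate1/zarr")  ->  Path("/data/plate1")
    #
    # assuming /data/plate1/openhcs_metadata.json exists.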

    def update_available_backends(self, plate_path: Union[str, Path], available_backends: Dict[str, bool]) -> None:
        """Update the available storage backends in metadata and save to disk."""
        metadata_file_path = get_metadata_path(plate_path)

        try:
            self.atomic_writer.update_available_backends(metadata_file_path, available_backends)
            # Clear the cache to force a reload on next access
            self._metadata_cache = None
            self._plate_path_cache = None
            logger.info(f"Updated available backends to {available_backends} in {metadata_file_path}")
        except MetadataWriteError as e:
            raise ValueError(f"Failed to update available backends: {e}") from e


@dataclass(frozen=True)
class OpenHCSMetadata:
    """
    Declarative OpenHCS metadata structure.

    Fail-loud: the core fields are required with no fallbacks; only the
    trailing optional fields default to None.
    """
    microscope_handler_name: str
    source_filename_parser_name: str
    grid_dimensions: List[int]
    pixel_size: float
    image_files: List[str]
    channels: Optional[Dict[str, str]]
    wells: Optional[Dict[str, str]]
    sites: Optional[Dict[str, str]]
    z_indexes: Optional[Dict[str, str]]
    timepoints: Optional[Dict[str, str]]
    available_backends: Dict[str, bool]
    workspace_mapping: Optional[Dict[str, str]] = None  # Plate-relative virtual → real path mapping
    main: Optional[bool] = None  # Whether this subdirectory is the primary/input subdirectory
    results_dir: Optional[str] = None  # Sibling directory containing analysis results for this subdirectory


@dataclass(frozen=True)
class SubdirectoryKeyedMetadata:
    """
    Subdirectory-keyed metadata structure for OpenHCS.

    Organizes metadata by subdirectory to prevent conflicts when multiple
    steps write to the same plate folder with different subdirectories.

    Structure: {subdirectory_name: OpenHCSMetadata}
    """
    subdirectories: Dict[str, OpenHCSMetadata]

    def get_subdirectory_metadata(self, sub_dir: str) -> Optional[OpenHCSMetadata]:
        """Get metadata for a specific subdirectory."""
        return self.subdirectories.get(sub_dir)

    def add_subdirectory_metadata(self, sub_dir: str, metadata: OpenHCSMetadata) -> 'SubdirectoryKeyedMetadata':
        """Add or update metadata for a subdirectory (immutable operation)."""
        new_subdirs = {**self.subdirectories, sub_dir: metadata}
        return SubdirectoryKeyedMetadata(subdirectories=new_subdirs)

    @classmethod
    def from_single_metadata(cls, sub_dir: str, metadata: OpenHCSMetadata) -> 'SubdirectoryKeyedMetadata':
        """Create from a single OpenHCSMetadata (migration helper)."""
        return cls(subdirectories={sub_dir: metadata})

    @classmethod
    def from_legacy_dict(cls, legacy_dict: Dict[str, Any], default_sub_dir: str = FIELDS.DEFAULT_SUBDIRECTORY_LEGACY) -> 'SubdirectoryKeyedMetadata':
        """Create from a legacy single-subdirectory metadata dict."""
        return cls.from_single_metadata(default_sub_dir, OpenHCSMetadata(**legacy_dict))
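
# Example (illustrative, hypothetical values): building a subdirectory-keyed
# structure and adding a second subdirectory immutably.
#
#   meta = OpenHCSMetadata(
#       microscope_handler_name="openhcsdata",
#       source_filename_parser_name="ImageXpressFilenameParser",
#       grid_dimensions=[2, 2], pixel_size=0.65,
#       image_files=["images/A01_s1_w1.tif"],
#       channels={"1": "DAPI"}, wells=None, sites=None,
#       z_indexes=None, timepoints=None,
#       available_backends={"disk": True},
#   )
#   skm = SubdirectoryKeyedMetadata.from_single_metadata("images", meta)
#   skm = skm.add_subdirectory_metadata("zarr", meta)  # returns a new instance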


class OpenHCSMetadataGenerator:
    """
    Generator for OpenHCS metadata files.

    Handles creation of openhcs_metadata.json files for processed plates,
    extracting information from the processing context and output directories.

    Design principle: generate metadata that accurately reflects what exists on
    disk after processing, not what was originally intended or what the source
    contained.
    """

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata generator.

        Args:
            filemanager: FileManager instance for file operations
        """
        self.filemanager = filemanager
        self.atomic_writer = AtomicMetadataWriter()
        self.logger = logging.getLogger(__name__)

    def create_metadata(
        self,
        context: 'ProcessingContext',
        output_dir: str,
        write_backend: str,
        is_main: bool = False,
        plate_root: Optional[str] = None,
        sub_dir: Optional[str] = None,
        results_dir: Optional[str] = None,
        skip_if_complete: bool = False,
        allow_none_override: bool = False
    ) -> None:
        """Create or update the subdirectory-keyed OpenHCS metadata file.

        Args:
            skip_if_complete: If True, skip the update if metadata is already complete (has channels)
            allow_none_override: If True, None values override existing fields;
                if False (default), None values are filtered out to preserve existing fields
        """
        plate_root_path = Path(plate_root)
        metadata_path = get_metadata_path(plate_root_path)

        # Check whether metadata is already complete (if requested)
        if skip_if_complete and metadata_path.exists():
            with open(metadata_path, 'r') as f:
                existing = json.load(f)

            subdir_data = existing.get('subdirectories', {}).get(sub_dir, {})
            if subdir_data.get('channels'):
                self.logger.debug(f"Metadata for {sub_dir} already complete, skipping")
                return

        # Extract metadata from the current state
        current_metadata = self._extract_metadata_from_disk_state(context, output_dir, write_backend, is_main, sub_dir, results_dir)
        metadata_dict = asdict(current_metadata)

        # Filter out None values unless overriding with None is allowed
        if not allow_none_override:
            metadata_dict = {k: v for k, v in metadata_dict.items() if v is not None}

        self.atomic_writer.merge_subdirectory_metadata(metadata_path, {sub_dir: metadata_dict})
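
    # Example (illustrative): with allow_none_override=False (the default), a
    # metadata dict of {"channels": {"1": None}, "workspace_mapping": None} is
    # reduced to {"channels": {"1": None}} before merging, so any existing
    # workspace_mapping on disk is preserved rather than overwritten with None.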

    def _extract_metadata_from_disk_state(
        self,
        context: 'ProcessingContext',
        output_dir: str,
        write_backend: str,
        is_main: bool,
        sub_dir: str,
        results_dir: Optional[str] = None,
    ) -> OpenHCSMetadata:
        """Extract metadata reflecting the current disk state after processing.

        CRITICAL: Extracts component metadata (channels, wells, sites, z_indexes, timepoints)
        by parsing the actual filenames in output_dir, NOT from the original input metadata cache.
        This ensures the metadata accurately reflects what was actually written, not what was in the input.

        For example, if processing filters to only channels 1-2, the metadata will show only those channels.
        """
        handler = context.microscope_handler

        # metadata_cache is always set by create_context() - fail if not present
        if not hasattr(context, 'metadata_cache'):
            raise RuntimeError("ProcessingContext missing metadata_cache - must be created via create_context()")

        actual_files = self.filemanager.list_image_files(output_dir, write_backend)
        relative_files = [f"{sub_dir}/{Path(f).name}" for f in actual_files]

        # Calculate the results directory path relative to the plate root
        # Example: "images_results" for the images subdirectory
        relative_results_dir = None
        if results_dir:
            relative_results_dir = Path(results_dir).name  # Just the directory name, not the full path

        # Extract grid_dimensions and pixel_size from the input metadata
        grid_dimensions = handler.metadata_handler._get_with_fallback('get_grid_dimensions', context.input_dir)
        pixel_size = handler.metadata_handler._get_with_fallback('get_pixel_size', context.input_dir)

        # If grid_dimensions is None (fallback), try to get it from existing metadata
        if grid_dimensions is None:
            try:
                metadata_path = get_metadata_path(Path(context.plate_path))
                if metadata_path.exists():
                    with open(metadata_path, 'r') as f:
                        existing_metadata = json.load(f)
                    # Try to get grid_dimensions from any existing subdirectory
                    for existing_sub_dir, sub_metadata in existing_metadata.get('subdirectories', {}).items():
                        if sub_metadata.get('grid_dimensions'):
                            grid_dimensions = sub_metadata['grid_dimensions']
                            self.logger.debug(f"Preserved grid_dimensions from existing subdirectory {existing_sub_dir}: {grid_dimensions}")
                            break
            except Exception as e:
                self.logger.debug(f"Could not retrieve grid_dimensions from existing metadata: {e}")

        # CRITICAL: Extract component metadata from the actual output files by parsing filenames.
        # This ensures the metadata reflects what was actually written, not the original input.
        component_metadata = self._extract_component_metadata_from_files(actual_files, handler.parser)

        # Merge the extracted component keys with display names from the original metadata cache.
        # This preserves display names (e.g., "tl-20") while using the actual output components.
        merged_metadata = self._merge_component_metadata(component_metadata, context.metadata_cache)

        # CRITICAL: Use the AllComponents enum for cache lookups (the cache is keyed by AllComponents).
        # GroupBy and AllComponents have the same values but different hashes, so dict.get() fails with GroupBy.
        return OpenHCSMetadata(
            microscope_handler_name=handler.microscope_type,
            source_filename_parser_name=handler.parser.__class__.__name__,
            grid_dimensions=grid_dimensions,
            pixel_size=pixel_size,
            image_files=relative_files,
            channels=merged_metadata.get(AllComponents.CHANNEL),
            wells=merged_metadata.get(AllComponents.WELL),
            sites=merged_metadata.get(AllComponents.SITE),
            z_indexes=merged_metadata.get(AllComponents.Z_INDEX),
            timepoints=merged_metadata.get(AllComponents.TIMEPOINT),
            available_backends={write_backend: True},
            workspace_mapping=None,  # Preserve existing - filtered out by create_metadata()
            main=is_main if is_main else None,
            results_dir=relative_results_dir
        )

    def _extract_component_metadata_from_files(self, file_paths: list, parser) -> Dict[AllComponents, Optional[Dict[str, Optional[str]]]]:
        """
        Extract component metadata by parsing actual filenames.

        Filenames are architecturally guaranteed to be properly formed by the pipeline,
        so parser.parse_filename() is guaranteed to succeed and no defensive checks are made.

        Args:
            file_paths: List of image file paths (guaranteed properly formed)
            parser: FilenameParser instance

        Returns:
            Dict mapping AllComponents to component metadata dicts (key -> display_name)
        """
        result = {component: {} for component in AllComponents}

        for file_path in file_paths:
            filename = Path(file_path).name
            parsed = parser.parse_filename(filename)

            # Extract each component from the parsed filename
            for component in AllComponents:
                component_name = component.value
                if component_name in parsed:
                    component_value = str(parsed[component_name])
                    # Store with None as the display name (merged with original display names later)
                    if component_value not in result[component]:
                        result[component][component_value] = None

        # Convert empty dicts to None (no metadata for that component)
        return {component: metadata_dict if metadata_dict else None for component, metadata_dict in result.items()}
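
    # Example (illustrative, assuming an ImageXpress-style parser): parsing
    # ["A01_s1_w1.tif", "A01_s2_w1.tif"] might yield
    #
    #   {AllComponents.WELL: {"A01": None},
    #    AllComponents.SITE: {"1": None, "2": None},
    #    AllComponents.CHANNEL: {"1": None},
    #    AllComponents.Z_INDEX: None,
    #    AllComponents.TIMEPOINT: None}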

    def _merge_component_metadata(
        self,
        extracted: Dict[AllComponents, Optional[Dict[str, Optional[str]]]],
        cache: Dict[AllComponents, Optional[Dict[str, Optional[str]]]],
    ) -> Dict[AllComponents, Optional[Dict[str, Optional[str]]]]:
        """
        Merge extracted component keys with display names from the original metadata cache.

        For each component:
        - Use the extracted keys (what actually exists in the output)
        - Preserve display names from the cache (e.g., "tl-20" for channel "1")
        - If the cache has no display name, use None

        Args:
            extracted: Component metadata extracted from output filenames
            cache: Original metadata cache with display names

        Returns:
            Merged metadata with actual components and preserved display names
        """
        result = {}
        for component in AllComponents:
            extracted_dict = extracted.get(component)
            cache_dict = cache.get(component)

            if extracted_dict is None:
                result[component] = None
            else:
                # For each extracted key, take the display name from the cache if available
                merged = {}
                for key in extracted_dict.keys():
                    merged[key] = cache_dict.get(key) if cache_dict else None

                result[component] = merged if merged else None

        return result
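
    # Example (illustrative): extracted channels {"1": None, "2": None} merged
    # with a cache of {"1": "DAPI", "3": "GFP"} give {"1": "DAPI", "2": None};
    # channel "3" is dropped because it does not appear in the output files.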


from openhcs.microscopes.microscope_base import MicroscopeHandler
from openhcs.microscopes.microscope_interfaces import FilenameParser


class OpenHCSMicroscopeHandler(MicroscopeHandler):
    """
    MicroscopeHandler for the OpenHCS pre-processed format.

    This handler reads plates that have been standardized, with metadata
    provided in an 'openhcs_metadata.json' file. It dynamically loads the
    appropriate FilenameParser based on the metadata.
    """

    # Class attributes for automatic registration
    _microscope_type = FIELDS.MICROSCOPE_TYPE  # Override automatic naming
    _metadata_handler_class = None  # Set after the class definition

    def __init__(self, filemanager: FileManager, pattern_format: Optional[str] = None):
        """
        Initialize the OpenHCSMicroscopeHandler.

        Args:
            filemanager: FileManager instance for file operations.
            pattern_format: Optional pattern format string, passed to the dynamically loaded parser.
        """
        self.filemanager = filemanager
        self.metadata_handler = OpenHCSMetadataHandler(filemanager)
        self._parser: Optional[FilenameParser] = None
        self.plate_folder: Optional[Path] = None  # Set by the factory or post_workspace
        self.pattern_format = pattern_format  # Stored for parser instantiation

        # Initialize super with a None parser; the actual parser is loaded
        # dynamically, on demand, via the `parser` property.
        super().__init__(parser=None, metadata_handler=self.metadata_handler)

    def _load_and_get_parser(self) -> FilenameParser:
        """
        Ensure the dynamic filename parser is loaded, based on metadata from plate_folder.
        This method requires self.plate_folder to be set.
        """
        if self._parser is None:
            if self.plate_folder is None:
                raise RuntimeError(
                    "OpenHCSHandler: plate_folder not set. Cannot determine and load the source filename parser."
                )

            parser_name = self.metadata_handler.get_source_filename_parser_name(self.plate_folder)
            available_parsers = _get_available_filename_parsers()
            ParserClass = available_parsers.get(parser_name)

            if not ParserClass:
                raise ValueError(
                    f"Unknown or unsupported filename parser '{parser_name}' specified in "
                    f"{OpenHCSMetadataHandler.METADATA_FILENAME} for plate {self.plate_folder}. "
                    f"Available parsers: {list(available_parsers.keys())}"
                )

            try:
                # Attempt to instantiate with filemanager and pattern_format
                self._parser = ParserClass(filemanager=self.filemanager, pattern_format=self.pattern_format)
                logger.info(f"OpenHCSHandler for plate {self.plate_folder} loaded source filename parser {parser_name} with filemanager and pattern_format.")
            except TypeError:
                try:
                    # Attempt with filemanager only
                    self._parser = ParserClass(filemanager=self.filemanager)
                    logger.info(f"OpenHCSHandler for plate {self.plate_folder} loaded source filename parser {parser_name} with filemanager.")
                except TypeError:
                    # Fall back to the default constructor
                    self._parser = ParserClass()
                    logger.info(f"OpenHCSHandler for plate {self.plate_folder} loaded source filename parser {parser_name} with default constructor.")

        return self._parser

    @property
    def parser(self) -> FilenameParser:
        """
        Provide the dynamically loaded FilenameParser.
        The actual parser is determined from the 'openhcs_metadata.json' file.
        Requires `self.plate_folder` to be set prior to first access.
        """
        # If plate_folder is not set here, it was set neither by the factory
        # nor by a method like post_workspace before parser access.
        if self.plate_folder is None:
            raise RuntimeError("OpenHCSHandler: plate_folder must be set before accessing the parser property.")

        return self._load_and_get_parser()

    @parser.setter
    def parser(self, value: Optional[FilenameParser]):
        """
        Allow setting the parser instance. The base class __init__ calls this
        setter (with the None we pass), but dynamic loading remains the primary
        mechanism for obtaining a parser.
        """
        # When None is passed (from our super().__init__ call), _parser stays None
        # until it is dynamically loaded. An explicitly provided parser overrides
        # the dynamic loading logic.
        if value is not None:
            logger.debug(f"OpenHCSMicroscopeHandler.parser being explicitly set to: {type(value).__name__}")
            self._parser = value

    @property
    def root_dir(self) -> str:
        """
        Root directory for OpenHCS is determined from metadata.

        OpenHCS plates can have multiple subdirectories (e.g., "zarr", "images", ".").
        The root_dir is determined dynamically from the main subdirectory in metadata,
        so this property returns only a placeholder.
        """
        # Determined dynamically from metadata in initialize_workspace;
        # return an empty string as a placeholder (not used for virtual workspaces).
        return ""

    @property
    def microscope_type(self) -> str:
        """Microscope type identifier (for interface enforcement only)."""
        return FIELDS.MICROSCOPE_TYPE

    @property
    def metadata_handler_class(self) -> Type[MetadataHandler]:
        """Metadata handler class (for interface enforcement only)."""
        return OpenHCSMetadataHandler

    @property
    def compatible_backends(self) -> List[Backend]:
        """
        OpenHCS is compatible with the ZARR (preferred) and DISK (fallback) backends.

        ZARR: advanced chunked storage for large datasets (preferred)
        DISK: standard file operations for compatibility (fallback)
        """
        return [Backend.ZARR, Backend.DISK]

    def get_available_backends(self, plate_path: Union[str, Path]) -> List[Backend]:
        """
        Get the available storage backends for OpenHCS plates.

        OpenHCS plates can support multiple backends depending on what actually
        exists on disk; this method checks the metadata to see which backends
        are available.
        """
        try:
            # Get available backends from metadata as Dict[str, bool]
            available_backends_dict = self.metadata_handler.get_available_backends(plate_path)

            # Convert to List[Backend] by keeping the compatible backends that are available
            available_backends = []
            for backend_enum in self.compatible_backends:
                if available_backends_dict.get(backend_enum.value, False):
                    available_backends.append(backend_enum)

            # If no backends are available from metadata, fall back to the compatible backends.
            # This handles cases where the metadata lacks the available_backends field.
            if not available_backends:
                logger.warning(f"No available backends found in metadata for {plate_path}, using all compatible backends")
                return self.compatible_backends

            return available_backends

        except Exception as e:
            logger.warning(f"Failed to get available backends from metadata for {plate_path}: {e}")
            # Fall back to all compatible backends if metadata reading fails
            return self.compatible_backends

    def get_primary_backend(self, plate_path: Union[str, Path], filemanager: 'FileManager') -> str:
        """
        Get the primary backend name for OpenHCS plates.

        Uses metadata-based detection to determine the primary backend.
        Preference hierarchy: zarr > virtual_workspace > disk.
        Registers the virtual_workspace backend if needed.

        Args:
            plate_path: Input directory (may be a subdirectory like zarr/)
            filemanager: FileManager instance for backend registration
        """
        # plate_folder must be set before calling this method
        if self.plate_folder is None:
            raise RuntimeError(
                "OpenHCSHandler.get_primary_backend: plate_folder not set. "
                "Call determine_input_dir() or post_workspace() first."
            )

        available_backends_dict = self.metadata_handler.get_available_backends(self.plate_folder)

        # Preference hierarchy: zarr > virtual_workspace > disk
        # 1. Prefer zarr if available (best performance for large datasets)
        if available_backends_dict.get('zarr'):
            return 'zarr'

        # 2. Prefer virtual_workspace if available (for plates with workspace_mapping)
        if available_backends_dict.get('virtual_workspace'):
            # Register the virtual_workspace backend using the centralized helper
            self._register_virtual_workspace_backend(self.plate_folder, filemanager)
            return 'virtual_workspace'

        # 3. Fall back to the first available backend (usually disk)
        return next(iter(available_backends_dict.keys()))
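
    # Example (illustrative): with available backends {"disk": True, "zarr": True},
    # get_primary_backend() returns "zarr". With {"disk": True,
    # "virtual_workspace": True} it registers the virtual workspace backend and
    # returns "virtual_workspace"; with {"disk": True} alone it returns "disk".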

    def initialize_workspace(self, plate_path: Path, filemanager: FileManager) -> Path:
        """
        OpenHCS format needs no separate workspace; this determines the correct input subdirectory from metadata.

        Args:
            plate_path: Path to the original plate directory
            filemanager: FileManager instance for file operations

        Returns:
            Path to the main subdirectory containing the input images (e.g., plate_path/images)
        """
        logger.info(f"OpenHCS format: Determining input subdirectory from metadata in {plate_path}")

        # Set plate_folder for this handler
        self.plate_folder = plate_path
        logger.debug(f"OpenHCSHandler: plate_folder set to {self.plate_folder}")

        # Determine the main subdirectory from metadata - fail loud on errors
        main_subdir = self.metadata_handler.determine_main_subdirectory(plate_path)
        input_dir = plate_path / main_subdir

        # If workspace_mapping exists in the metadata, register the virtual workspace backend
        metadata_dict = self.metadata_handler._load_metadata_dict(plate_path)
        subdir_metadata = metadata_dict.get(FIELDS.SUBDIRECTORIES, {}).get(main_subdir, {})

        if subdir_metadata.get('workspace_mapping'):
            # Register the virtual_workspace backend using the centralized helper
            self._register_virtual_workspace_backend(plate_path, filemanager)

        # Verify the subdirectory exists - fail loud if missing
        if not filemanager.is_dir(str(input_dir), Backend.DISK.value):
            raise FileNotFoundError(
                f"Main subdirectory '{main_subdir}' does not exist at {input_dir}. "
                f"Expected directory structure: {plate_path}/{main_subdir}/"
            )

        logger.info(f"OpenHCS input directory determined: {input_dir} (subdirectory: {main_subdir})")
        return input_dir

    def _prepare_workspace(self, workspace_path: Path, filemanager: FileManager) -> Path:
        """
        OpenHCS format assumes the workspace is already prepared (e.g., flat structure),
        so this method is a no-op.

        Args:
            workspace_path: Path to the symlinked workspace.
            filemanager: FileManager instance for file operations.

        Returns:
            The original workspace_path.
        """
        logger.info(f"OpenHCSHandler._prepare_workspace: No preparation needed for {workspace_path}; it is pre-processed.")
        # Ensure plate_folder is set if this is the first relevant operation that knows the path
        if self.plate_folder is None:
            self.plate_folder = Path(workspace_path)
            logger.debug(f"OpenHCSHandler: plate_folder set to {self.plate_folder} during _prepare_workspace.")
        return workspace_path

    def post_workspace(self, plate_path: Union[str, Path], filemanager: FileManager, skip_preparation: bool = False) -> Path:
        """
        Hook called after virtual workspace mapping creation.
        For OpenHCS, this ensures plate_folder is set (if not already), which allows
        the parser to be loaded for this plate_path. It then calls the base
        implementation, which handles filename normalization using the loaded parser.
        """
        current_plate_folder = Path(plate_path)
        if self.plate_folder is None:
            logger.info(f"OpenHCSHandler.post_workspace: Setting plate_folder to {current_plate_folder}.")
            self.plate_folder = current_plate_folder
            self._parser = None  # Reset the parser when plate_folder is set for the first time
        elif self.plate_folder != current_plate_folder:
            logger.warning(
                f"OpenHCSHandler.post_workspace: plate_folder was {self.plate_folder}, "
                f"now processing {current_plate_folder}. Re-initializing parser."
            )
            self.plate_folder = current_plate_folder
            self._parser = None  # Force re-initialization for the new path

        # Accessing self.parser triggers _load_and_get_parser() if not already loaded
        _ = self.parser

        logger.info(f"OpenHCSHandler (plate: {self.plate_folder}): Files are expected to be pre-normalized. "
                    "Superclass post_workspace will run with the dynamically loaded parser.")
        return super().post_workspace(plate_path, filemanager, skip_preparation)

    # The following methods from MicroscopeHandler delegate to `self.parser`.
    # The `parser` property ensures the correct, dynamically loaded parser is used.
    # No explicit override is needed unless OpenHCS requires special behavior
    # beyond what the dynamically loaded original parser provides.
    # - parse_filename(self, filename: str)
    # - construct_filename(self, well: str, ...)
    # - auto_detect_patterns(self, folder_path: Union[str, Path], ...)
    # - path_list_from_pattern(self, directory: Union[str, Path], ...)

    # Metadata handling methods are delegated to `self.metadata_handler` by the base class:
    # - find_metadata_file(self, plate_path: Union[str, Path])
    # - get_grid_dimensions(self, plate_path: Union[str, Path])
    # - get_pixel_size(self, plate_path: Union[str, Path])
    # These will use our OpenHCSMetadataHandler correctly.


# Set the metadata handler class after the class definition for automatic registration
from openhcs.microscopes.microscope_base import register_metadata_handler

OpenHCSMicroscopeHandler._metadata_handler_class = OpenHCSMetadataHandler
register_metadata_handler(OpenHCSMicroscopeHandler, OpenHCSMetadataHandler)