Coverage for openhcs/microscopes/opera_phenix.py: 63.6%

317 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Opera Phenix microscope implementations for openhcs. 

3 

4This module provides concrete implementations of FilenameParser and MetadataHandler 

5for Opera Phenix microscopes. 

6""" 

7 

8import logging 

9import os 

10import re 

11from pathlib import Path 

12from typing import Any, Dict, List, Optional, Union, Type, Tuple 

13 

14from openhcs.constants.constants import Backend 

15from openhcs.microscopes.opera_phenix_xml_parser import OperaPhenixXmlParser 

16from openhcs.io.filemanager import FileManager 

17from openhcs.microscopes.microscope_base import MicroscopeHandler 

18from openhcs.microscopes.microscope_interfaces import (FilenameParser, 

19 MetadataHandler) 

20 

21logger = logging.getLogger(__name__) 

22 

23 

24 

class OperaPhenixHandler(MicroscopeHandler):
    """
    MicroscopeHandler implementation for Opera Phenix systems.

    This handler combines the OperaPhenix filename parser with its
    corresponding metadata handler. It guarantees aligned behavior
    for plate structure parsing, metadata extraction, and any optional
    post-processing steps required after workspace setup.
    """

    # Explicit microscope type for proper registration
    _microscope_type = 'opera_phenix'

    # Class attribute for automatic metadata handler registration (set after class definition)
    _metadata_handler_class = None

    def __init__(self, filemanager: FileManager, pattern_format: Optional[str] = None):
        self.parser = OperaPhenixFilenameParser(filemanager, pattern_format=pattern_format)
        self.metadata_handler = OperaPhenixMetadataHandler(filemanager)
        super().__init__(parser=self.parser, metadata_handler=self.metadata_handler)

    @property
    def common_dirs(self) -> List[str]:
        """Subdirectory names commonly used by Opera Phenix."""
        return ['Images']

    @property
    def microscope_type(self) -> str:
        """Microscope type identifier (for interface enforcement only)."""
        return 'opera_phenix'

    @property
    def metadata_handler_class(self) -> Type[MetadataHandler]:
        """Metadata handler class (for interface enforcement only)."""
        return OperaPhenixMetadataHandler

    @property
    def compatible_backends(self) -> List[Backend]:
        """
        Opera Phenix is compatible with DISK backend only.

        Legacy microscope format with standard file operations.
        """
        return [Backend.DISK]

    # Uses default workspace initialization from base class

    def _prepare_workspace(self, workspace_path: Path, filemanager: FileManager):
        """
        Renames Opera Phenix images to follow a consistent field order
        based on spatial layout extracted from Index.xml. Uses remapped
        filenames and replaces the directory in-place.

        This method performs preparation but does not determine the final image directory.

        Args:
            workspace_path: Path to the symlinked workspace
            filemanager: FileManager instance for file operations

        Returns:
            Path to the normalized image directory.

        Raises:
            RuntimeError: On safety violations (real files in the workspace,
                paths escaping the workspace) or unrecoverable copy failures.
        """
        # Single source of truth for the temp directory name; its presence
        # also marks a workspace that was already processed.
        temp_dir_name = "__opera_phenix_temp"

        # Check if workspace has already been processed by looking for temp directory.
        # If the temp directory exists, the workspace was already processed - skip.
        for entry in filemanager.list_dir(workspace_path, Backend.DISK.value):
            entry_path = Path(workspace_path) / entry
            if entry_path.is_dir() and entry_path.name == temp_dir_name:
                logger.info(f"📁 WORKSPACE ALREADY PROCESSED: Found {temp_dir_name} - skipping Opera Phenix preparation")
                return workspace_path

        logger.info(f"🔄 PROCESSING WORKSPACE: Applying Opera Phenix name remapping to {workspace_path}")
        # Find the image directory using the common_dirs property.
        # Clause 245: Workspace operations are disk-only by design;
        # this call is structurally hardcoded to use the "disk" backend.
        entries = filemanager.list_dir(workspace_path, Backend.DISK.value)

        # Look for a directory matching any of the common_dirs patterns
        image_dir = workspace_path
        for entry in entries:
            entry_lower = entry.lower()
            if any(common_dir.lower() in entry_lower for common_dir in self.common_dirs):
                # Found a matching directory
                image_dir = Path(workspace_path) / entry
                logger.info("Found directory matching common_dirs pattern: %s", image_dir)
                break

        # Default to empty field mapping (no remapping)
        field_mapping = {}

        # Try to load field mapping from Index.xml if available
        try:
            # Clause 245: disk-only by design
            index_xml = filemanager.find_file_recursive(workspace_path, "Index.xml", Backend.DISK.value)
            if index_xml:
                xml_parser = OperaPhenixXmlParser(index_xml)
                field_mapping = xml_parser.get_field_id_mapping()
                logger.debug("Loaded field mapping from Index.xml: %s", field_mapping)
            else:
                logger.debug("Index.xml not found. Using default field mapping.")
        except Exception as e:
            # Best-effort: fall back to identity mapping rather than failing
            logger.error("Error loading Index.xml: %s", e)
            logger.debug("Using default field mapping due to error.")

        # Get all image files in the directory BEFORE creating the temp directory.
        # This prevents recursive mirroring of the temp directory.
        image_files = filemanager.list_image_files(image_dir, Backend.DISK.value)

        # Create a uniquely named temporary directory for renamed files
        if isinstance(image_dir, str):
            temp_dir = os.path.join(image_dir, temp_dir_name)
        else:  # Path object
            temp_dir = image_dir / temp_dir_name

        # SAFETY CHECK: Ensure temp directory is within workspace
        if not str(temp_dir).startswith(str(workspace_path)):
            logger.error("SAFETY VIOLATION: Temp directory would be created outside workspace: %s", temp_dir)
            raise RuntimeError(f"Temp directory would be created outside workspace: {temp_dir}")

        # Clause 245: disk-only by design
        filemanager.ensure_directory(temp_dir, Backend.DISK.value)
        logger.debug("Created temporary directory for Opera Phenix workspace preparation: %s", temp_dir)

        # Process each file: resolve its symlink target, compute the remapped
        # name, and create a symlink (to the original target) in the temp dir.
        for file_path in image_files:
            # FileManager should return strings, but handle Path objects too
            if isinstance(file_path, str):
                file_name = os.path.basename(file_path)
                file_path_obj = Path(file_path)
            elif isinstance(file_path, Path):
                file_name = file_path.name
                file_path_obj = file_path
            else:
                # Skip any unexpected types
                logger.warning("Unexpected file path type: %s", type(file_path).__name__)
                continue

            # Workspaces are mirrored as symlinks; a real file here is a bug.
            if file_path_obj.is_symlink():
                try:
                    # Get the target of the symlink (what it points to)
                    real_file_path = file_path_obj.resolve()
                    if not real_file_path.exists():
                        logger.warning("Broken symlink detected: %s -> %s", file_path, real_file_path)
                        continue
                    source_path = str(real_file_path)
                except Exception as e:
                    logger.warning("Failed to resolve symlink %s: %s", file_path, e)
                    continue
            else:
                logger.error("SAFETY VIOLATION: Found real file in workspace (should be symlink): %s", file_path)
                raise RuntimeError(f"Workspace contains real file instead of symlink: {file_path}")

            # Parse file metadata; skip files without a usable site/field id
            metadata = self.parser.parse_filename(file_name)
            if not metadata or 'site' not in metadata or metadata['site'] is None:
                continue

            # Remap the field ID using the spatial layout
            original_field_id = metadata['site']
            new_field_id = field_mapping.get(original_field_id, original_field_id)

            # Construct the new filename with consistent zero padding
            new_name = self.parser.construct_filename(
                well=metadata['well'],
                site=new_field_id,
                channel=metadata['channel'],
                z_index=metadata['z_index'],
                extension=metadata['extension'],
                site_padding=3,
                z_padding=3
            )

            # Create the new path in the temporary directory
            if isinstance(temp_dir, str):
                new_path = os.path.join(temp_dir, new_name)
            else:  # Path object
                new_path = temp_dir / new_name

            try:
                # In the temp directory we own everything, so an existing
                # entry can simply be removed and replaced.
                if filemanager.exists(new_path, Backend.DISK.value):
                    logger.debug("File exists in temp directory, removing before copy: %s", new_path)
                    filemanager.delete(new_path, Backend.DISK.value)

                # The new symlink points at the ORIGINAL file, not the
                # workspace symlink, so deleting workspace links later is safe.
                filemanager.create_symlink(source_path, new_path, Backend.DISK.value)
                logger.debug("Created symlink in temp directory: %s -> %s", new_path, source_path)
            except Exception as e:
                logger.error("Failed to copy file to temp directory: %s -> %s: %s",
                             source_path, new_path, e)
                raise RuntimeError(f"Failed to copy file to temp directory: {e}") from e

        # Clean up and replace old files - ONLY delete symlinks in workspace, NEVER original files
        for file_path in image_files:
            # Convert to Path object for symlink checking
            file_path_obj = Path(file_path) if isinstance(file_path, str) else file_path

            # SAFETY CHECK: Only delete if it's within the workspace directory
            if not str(file_path_obj).startswith(str(workspace_path)):
                logger.error("SAFETY VIOLATION: Attempted to delete file outside workspace: %s", file_path)
                raise RuntimeError(f"Workspace preparation tried to delete file outside workspace: {file_path}")

            # SAFETY CHECK: In workspace, only delete symlinks, never real files
            if file_path_obj.is_symlink():
                # Safe to delete - it's a symlink in the workspace
                logger.debug("Deleting symlink in workspace: %s", file_path)
                filemanager.delete(file_path, Backend.DISK.value)
            elif file_path_obj.is_file():
                # This should never happen in a properly mirrored workspace
                logger.error("SAFETY VIOLATION: Found real file in workspace (should be symlink): %s", file_path)
                raise RuntimeError(f"Workspace contains real file instead of symlink: {file_path}")
            else:
                logger.warning("File not found or not accessible: %s", file_path)

        # Move the remapped entries from the temporary directory to image_dir
        temp_files = filemanager.list_files(temp_dir, Backend.DISK.value)
        for temp_file in temp_files:
            # FileManager should return strings, but handle Path objects too
            if isinstance(temp_file, str):
                temp_file_name = os.path.basename(temp_file)
            elif isinstance(temp_file, Path):
                temp_file_name = temp_file.name
            else:
                # Skip any unexpected types
                logger.warning("Unexpected file path type: %s", type(temp_file).__name__)
                continue
            if isinstance(image_dir, str):
                dest_path = os.path.join(image_dir, temp_file_name)
            else:  # Path object
                dest_path = image_dir / temp_file_name

            try:
                # A pre-existing symlink at the destination may be replaced;
                # a pre-existing real file may not.
                if filemanager.exists(dest_path, Backend.DISK.value):
                    if filemanager.is_symlink(dest_path, Backend.DISK.value):
                        logger.debug("Destination is a symlink, removing before copy: %s", dest_path)
                        filemanager.delete(dest_path, Backend.DISK.value)
                    else:
                        logger.error("SAFETY VIOLATION: Destination exists and is not a symlink: %s", dest_path)
                        raise FileExistsError(f"Destination exists and is not a symlink: {dest_path}")

                # If the temp file is a symlink, re-link its target into image_dir
                temp_file_obj = Path(temp_file) if isinstance(temp_file, str) else temp_file
                if temp_file_obj.is_symlink():
                    try:
                        real_target = temp_file_obj.resolve()
                        real_target_path = str(real_target)
                        filemanager.create_symlink(real_target_path, dest_path, Backend.DISK.value)
                        logger.debug("Created symlink in image directory: %s -> %s", dest_path, real_target_path)
                    except Exception as e:
                        logger.error("Failed to resolve symlink in temp directory: %s: %s", temp_file, e)
                        raise RuntimeError(f"Failed to resolve symlink: {e}") from e
                else:
                    # Should never happen when symlinks are used consistently;
                    # fall back to copying the file.
                    logger.warning("Temp file is not a symlink: %s", temp_file)
                    filemanager.copy(temp_file, dest_path, Backend.DISK.value)
                    logger.debug("Copied file (not symlink) to image directory: %s -> %s", temp_file, dest_path)

                # Remove the processed entry from the temporary directory
                filemanager.delete(temp_file, Backend.DISK.value)

            except FileExistsError as e:
                # Re-raise with clear message
                logger.error("Cannot copy to destination: %s", e)
                raise
            except Exception as e:
                logger.error("Error copying from temp to destination: %s -> %s: %s",
                             temp_file, dest_path, e)
                raise RuntimeError(f"Failed to process file from temp directory: {e}") from e

        # SAFETY CHECK: Validate temp directory before deletion
        if not str(temp_dir).startswith(str(workspace_path)):
            logger.error("SAFETY VIOLATION: Attempted to delete temp directory outside workspace: %s", temp_dir)
            raise RuntimeError(f"Attempted to delete temp directory outside workspace: {temp_dir}")

        if temp_dir_name not in str(temp_dir):
            logger.error("SAFETY VIOLATION: Attempted to delete non-temp directory: %s", temp_dir)
            raise RuntimeError(f"Attempted to delete non-temp directory: {temp_dir}")

        # Remove the temporary directory (best-effort; failure is non-fatal)
        try:
            filemanager.delete(temp_dir, Backend.DISK.value)
            logger.debug("Successfully removed temporary directory: %s", temp_dir)
        except Exception as e:
            logger.warning("Failed to remove temporary directory %s: %s", temp_dir, e)

        return image_dir

355 

356 

class OperaPhenixFilenameParser(FilenameParser):
    """Parser for Opera Phenix microscope filenames.

    Handles Opera Phenix format filenames like:
    - r01c01f001p01-ch1sk1fk1fl1.tiff
    - r01c01f001p01-ch1.tiff
    """

    # Regular expression pattern for Opera Phenix filenames.
    # Groups: row, col, field/site, plane/z, channel, extension.
    # Numeric groups also accept '{placeholder}' tokens.
    _pattern = re.compile(r"r(\d{1,2})c(\d{1,2})f(\d+|\{[^\}]*\})p(\d+|\{[^\}]*\})-ch(\d+|\{[^\}]*\})(?:sk\d+)?(?:fk\d+)?(?:fl\d+)?(\.\w+)$", re.I)

    # Pattern for extracting row and column from Opera Phenix well format
    _well_pattern = re.compile(r"R(\d{2})C(\d{2})", re.I)

    def __init__(self, filemanager=None, pattern_format=None):
        """
        Initialize the parser.

        Args:
            filemanager: FileManager instance (not used, but required for interface compatibility)
            pattern_format: Optional pattern format (not used, but required for interface compatibility)
        """
        # These parameters are not used by this parser, but are required for interface compatibility
        self.filemanager = filemanager
        self.pattern_format = pattern_format

    @classmethod
    def can_parse(cls, filename: str) -> bool:
        """
        Check if this parser can parse the given filename.

        Args:
            filename (str): Filename to check

        Returns:
            bool: True if this parser can parse the filename, False otherwise
        """
        # 🔒 Clause 17 — VFS Boundary Method
        # This is a string operation that doesn't perform actual file I/O
        basename = os.path.basename(filename)
        # Check if the filename matches the Opera Phenix pattern
        return bool(cls._pattern.match(basename))

    def parse_filename(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Parse an Opera Phenix filename to extract all components.
        Supports placeholders like {iii} which will return None for that field.

        Args:
            filename (str): Filename to parse

        Returns:
            dict or None: Dictionary with extracted components or None if parsing fails.
        """
        # 🔒 Clause 17 — VFS Boundary Method
        # This is a string operation that doesn't perform actual file I/O
        basename = os.path.basename(filename)
        logger.debug("OperaPhenixFilenameParser attempting to parse basename: '%s'", basename)

        # Try parsing using the Opera Phenix pattern
        match = self._pattern.match(basename)
        if match:
            logger.debug("Regex match successful for '%s'", basename)
            row, col, site_str, z_str, channel_str, ext = match.groups()

            def parse_comp(s):
                """Parse component string to int, or None if it's a placeholder."""
                if not s or '{' in s:
                    return None
                return int(s)

            # Create well ID from row and column (e.g. 'R01C03')
            well = f"R{int(row):02d}C{int(col):02d}"

            # Parse components
            site = parse_comp(site_str)
            channel = parse_comp(channel_str)
            z_index = parse_comp(z_str)

            result = {
                'well': well,
                'site': site,
                'channel': channel,
                'wavelength': channel,  # For backward compatibility
                'z_index': z_index,
                'extension': ext if ext else '.tif'
            }
            return result

        logger.warning("Regex match failed for basename: '%s'", basename)
        return None

    def construct_filename(self, well: str, site: Optional[Union[int, str]] = None, channel: Optional[int] = None,
                           z_index: Optional[Union[int, str]] = None, extension: str = '.tiff',
                           site_padding: int = 3, z_padding: int = 3) -> str:
        """
        Construct an Opera Phenix filename from components.

        Args:
            well (str): Well ID in Opera Phenix format (e.g., 'R03C04').
                Simple formats like 'A01' are NOT accepted here.
            site: Site/field number (int) or placeholder string
            channel (int): Channel number (defaults to 1 if None)
            z_index: Z-index/plane (int) or placeholder string (defaults to 1 if None)
            extension (str, optional): File extension
            site_padding (int, optional): Width to pad site numbers to (default: 3)
            z_padding (int, optional): Width to pad Z-index numbers to (default: 3)

        Returns:
            str: Constructed filename

        Raises:
            ValueError: If well is not in Opera Phenix 'R01C03' format
        """
        # Extract row and column from well name; only the Opera Phenix
        # 'R01C03' format is accepted by this constructor.
        match = self._well_pattern.match(well)
        if match:
            row = int(match.group(1))
            col = int(match.group(2))
        else:
            raise ValueError(f"Invalid well format: {well}. Expected format: 'R01C03'")

        # Default Z-index and channel to 1 if not provided
        z_index = 1 if z_index is None else z_index
        channel = 1 if channel is None else channel

        # Construct filename in Opera Phenix format
        if isinstance(site, str):
            # If site is a string (e.g., '{iii}'), use it directly
            site_part = f"f{site}"
        else:
            # Otherwise, format it as a padded integer
            site_part = f"f{site:0{site_padding}d}"

        if isinstance(z_index, str):
            # If z_index is a string (e.g., '{zzz}'), use it directly
            z_part = f"p{z_index}"
        else:
            # Otherwise, format it as a padded integer
            z_part = f"p{z_index:0{z_padding}d}"

        return f"r{row:02d}c{col:02d}{site_part}{z_part}-ch{channel}sk1fk1fl1{extension}"

    def remap_field_in_filename(self, filename: str, xml_parser: Optional[OperaPhenixXmlParser] = None) -> str:
        """
        Remap the field ID in a filename to follow a top-left to bottom-right pattern.

        Args:
            filename: Original filename
            xml_parser: Parser with XML data

        Returns:
            str: New filename with remapped field ID (unchanged if no parser
            is given or the filename cannot be parsed)
        """
        if xml_parser is None:
            return filename

        # Parse the filename
        metadata = self.parse_filename(filename)
        if not metadata or 'site' not in metadata or metadata['site'] is None:
            return filename

        # Get the mapping and remap the field ID
        mapping = xml_parser.get_field_id_mapping()
        new_field_id = xml_parser.remap_field_id(metadata['site'], mapping)

        # Always create a new filename with the remapped field ID and consistent padding.
        # This ensures all filenames have the same format, even if the field ID didn't change.
        return self.construct_filename(
            well=metadata['well'],
            site=new_field_id,
            channel=metadata['channel'],
            z_index=metadata['z_index'],
            extension=metadata['extension'],
            site_padding=3,
            z_padding=3
        )

    def extract_row_column(self, well: str) -> Tuple[str, str]:
        """
        Extract row and column from Opera Phenix well identifier.

        Args:
            well (str): Well identifier (e.g., 'R03C04' or 'A01')

        Returns:
            Tuple[str, str]: (row, column) where row is like 'A', 'B' and column is like '01', '04'

        Raises:
            ValueError: If well format is invalid
        """
        if not well:
            raise ValueError(f"Invalid well format: {well}")

        # Check if well is in Opera Phenix format (e.g., 'R01C03')
        match = self._well_pattern.match(well)
        if match:
            row_num = int(match.group(1))
            col_num = int(match.group(2))
            # Convert to letter-number format: R01C03 -> A, 03
            row = chr(ord('A') + row_num - 1)  # R01 -> A, R02 -> B, etc.
            col = f"{col_num:02d}"  # Ensure 2-digit padding
            return row, col
        else:
            # Assume simple format like 'A01', 'C04'
            if len(well) < 2:
                raise ValueError(f"Invalid well format: {well}")
            row = well[0]
            col = well[1:]
            if not row.isalpha() or not col.isdigit():
                raise ValueError(f"Invalid Opera Phenix well format: {well}. Expected 'R01C03' or 'A01' format")
            return row, col

570 

571 

class OperaPhenixMetadataHandler(MetadataHandler):
    """
    Metadata handler for Opera Phenix microscopes.

    Handles finding and parsing Index.xml files for Opera Phenix microscopes.
    """

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata handler.

        Args:
            filemanager: FileManager instance for file operations.
        """
        super().__init__()
        self.filemanager = filemanager

    # Legacy mode has been completely purged

590 

591 def find_metadata_file(self, plate_path: Union[str, Path]): 

592 """ 

593 Find the Index.xml file in the plate directory. 

594 

595 Args: 

596 plate_path: Path to the plate directory 

597 

598 Returns: 

599 Path to the Index.xml file 

600 

601 Raises: 

602 FileNotFoundError: If no Index.xml file is found 

603 """ 

604 # Ensure plate_path is a Path object 

605 if isinstance(plate_path, str): 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true

606 plate_path = Path(plate_path) 

607 

608 # Ensure the path exists 

609 if not plate_path.exists(): 609 ↛ 610line 609 didn't jump to line 610 because the condition on line 609 was never true

610 raise FileNotFoundError(f"Plate path does not exist: {plate_path}") 

611 

612 # Check for Index.xml in the plate directory 

613 index_xml = plate_path / "Index.xml" 

614 if index_xml.exists(): 

615 return index_xml 

616 

617 # Check for Index.xml in the Images directory 

618 images_dir = plate_path / "Images" 

619 if images_dir.exists(): 619 ↛ 625line 619 didn't jump to line 625 because the condition on line 619 was always true

620 index_xml = images_dir / "Index.xml" 

621 if index_xml.exists(): 621 ↛ 625line 621 didn't jump to line 625 because the condition on line 621 was always true

622 return index_xml 

623 

624 # No recursive search - only check root and Images directories 

625 raise FileNotFoundError( 

626 f"Index.xml not found in {plate_path} or {plate_path}/Images. " 

627 "Opera Phenix metadata requires Index.xml file." 

628 ) 

629 

630 # Ensure result is a Path object 

631 if isinstance(result, str): 

632 return Path(result) 

633 if isinstance(result, Path): 

634 return result 

635 # This should not happen if FileManager is properly implemented 

636 logger.warning("Unexpected result type from find_file_recursive: %s", type(result).__name__) 

637 return Path(str(result)) 

638 

639 def get_grid_dimensions(self, plate_path: Union[str, Path]): 

640 """ 

641 Get grid dimensions for stitching from Index.xml file. 

642 

643 Args: 

644 plate_path: Path to the plate folder 

645 

646 Returns: 

647 Tuple of (grid_rows, grid_cols) - UPDATED: Now returns (rows, cols) for MIST compatibility 

648 

649 Raises: 

650 FileNotFoundError: If no Index.xml file is found 

651 OperaPhenixXmlParseError: If the XML cannot be parsed 

652 OperaPhenixXmlContentError: If grid dimensions cannot be determined 

653 """ 

654 # Ensure plate_path is a Path object 

655 if isinstance(plate_path, str): 655 ↛ 656line 655 didn't jump to line 656 because the condition on line 655 was never true

656 plate_path = Path(plate_path) 

657 

658 # Ensure the path exists 

659 if not plate_path.exists(): 659 ↛ 660line 659 didn't jump to line 660 because the condition on line 659 was never true

660 raise FileNotFoundError(f"Plate path does not exist: {plate_path}") 

661 

662 # Find the Index.xml file - this will raise FileNotFoundError if not found 

663 index_xml = self.find_metadata_file(plate_path) 

664 

665 # Use the OperaPhenixXmlParser to get the grid size 

666 # This will raise appropriate exceptions if parsing fails 

667 xml_parser = self.create_xml_parser(index_xml) 

668 grid_size = xml_parser.get_grid_size() 

669 

670 # Validate the grid size 

671 if grid_size[0] <= 0 or grid_size[1] <= 0: 671 ↛ 672line 671 didn't jump to line 672 because the condition on line 671 was never true

672 raise ValueError( 

673 f"Invalid grid dimensions: {grid_size[0]}x{grid_size[1]}. " 

674 "Grid dimensions must be positive integers." 

675 ) 

676 

677 logger.info("Grid size from Index.xml: %dx%d (cols x rows)", grid_size[0], grid_size[1]) 

678 # FIXED: Return (rows, cols) for MIST compatibility instead of (cols, rows) 

679 return (grid_size[1], grid_size[0]) 

680 

def get_pixel_size(self, plate_path: Union[str, Path]):
    """
    Get the pixel size from Index.xml file.

    Args:
        plate_path: Path to the plate folder

    Returns:
        Pixel size in micrometers

    Raises:
        FileNotFoundError: If no Index.xml file is found
        OperaPhenixXmlParseError: If the XML cannot be parsed
        OperaPhenixXmlContentError: If pixel size cannot be determined
    """
    # Normalize str input to a Path object.
    plate_path = Path(plate_path) if isinstance(plate_path, str) else plate_path

    # Guard clause: the plate folder itself must exist.
    if not plate_path.exists():
        raise FileNotFoundError(f"Plate path does not exist: {plate_path}")

    # Locate Index.xml (raises FileNotFoundError when absent), then parse it.
    metadata_file = self.find_metadata_file(plate_path)
    parser = self.create_xml_parser(metadata_file)
    pixel_size = parser.get_pixel_size()

    # A non-positive value indicates corrupt or missing calibration metadata.
    if pixel_size <= 0:
        raise ValueError(
            f"Invalid pixel size: {pixel_size}. "
            "Pixel size must be a positive number."
        )

    logger.info("Pixel size from Index.xml: %.4f μm", pixel_size)
    return pixel_size

721 

def get_channel_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
    """
    Get channel key→name mapping from Opera Phenix Index.xml.

    Args:
        plate_path: Path to the plate folder (str or Path)

    Returns:
        Dict mapping channel IDs to channel names from metadata
        Example: {"1": "HOECHST 33342", "2": "Calcein", "3": "Alexa 647"}
    """
    try:
        if isinstance(plate_path, str):
            plate_path = Path(plate_path)

        # Locate and parse Index.xml, then read entries straight off the parser.
        parser = self.create_xml_parser(self.find_metadata_file(plate_path))
        root = parser.root
        ns = parser.namespace

        # Channel info lives in Entry elements carrying a ChannelID attribute;
        # pair each ID with the text of its ChannelName child, when present.
        mapping: Dict[str, Optional[str]] = {}
        for entry in root.findall(f".//{ns}Entry[@ChannelID]"):
            cid = entry.get('ChannelID')
            name_elem = entry.find(f"{ns}ChannelName")
            if not cid or name_elem is None:
                continue
            if name_elem.text:
                mapping[cid] = name_elem.text

        # An empty mapping means no usable channel metadata was found.
        return mapping or None

    except Exception as e:
        logger.debug(f"Could not extract channel names from Opera Phenix metadata: {e}")
        return None

766 

def get_well_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
    """
    Get well key→name mapping from Opera Phenix metadata.

    Args:
        plate_path: Path to the plate folder (str or Path)

    Returns:
        None - Opera Phenix doesn't provide rich well names in metadata
    """
    # Opera Phenix metadata carries no descriptive well names, so there is
    # nothing to map; callers treat None as "no extra naming available".
    return None

778 

def get_site_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
    """
    Get site key→name mapping from Opera Phenix metadata.

    Args:
        plate_path: Path to the plate folder (str or Path)

    Returns:
        None - Opera Phenix doesn't provide rich site names in metadata
    """
    # No descriptive site names exist in Opera Phenix metadata; returning
    # None signals the caller to fall back to raw site keys.
    return None

790 

def get_z_index_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
    """
    Get z_index key→name mapping from Opera Phenix metadata.

    Args:
        plate_path: Path to the plate folder (str or Path)

    Returns:
        None - Opera Phenix doesn't provide rich z_index names in metadata
    """
    # Z planes are purely numeric in Opera Phenix metadata — no friendly
    # names to surface, so this always yields None.
    return None

802 

803 

804 

def create_xml_parser(self, xml_path: Union[str, Path]):
    """
    Create an OperaPhenixXmlParser for the given XML file.

    Args:
        xml_path: Path to the XML file

    Returns:
        OperaPhenixXmlParser: Parser for the XML file

    Raises:
        FileNotFoundError: If the XML file does not exist
    """
    # Accept either str or Path; normalize to Path first.
    xml_path = Path(xml_path) if isinstance(xml_path, str) else xml_path

    # Fail fast with a clear error rather than letting the parser choke on
    # a missing file.
    if not xml_path.exists():
        raise FileNotFoundError(f"XML file does not exist: {xml_path}")

    return OperaPhenixXmlParser(xml_path)

828 

829 

# Set metadata handler class after class definition for automatic registration
# NOTE(review): this import sits at the bottom of the module rather than the
# top — presumably to guarantee the handler classes are fully defined before
# registration runs; confirm before hoisting it into the top-level imports.
from openhcs.microscopes.microscope_base import register_metadata_handler
# Wire the handler to its metadata handler class, then register the pair so
# the Opera Phenix microscope type participates in automatic handler lookup.
OperaPhenixHandler._metadata_handler_class = OperaPhenixMetadataHandler
register_metadata_handler(OperaPhenixHandler, OperaPhenixMetadataHandler)