Coverage for openhcs/microscopes/opera_phenix.py: 63.6%
317 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Opera Phenix microscope implementations for openhcs.
4This module provides concrete implementations of FilenameParser and MetadataHandler
5for Opera Phenix microscopes.
6"""
8import logging
9import os
10import re
11from pathlib import Path
12from typing import Any, Dict, List, Optional, Union, Type, Tuple
14from openhcs.constants.constants import Backend
15from openhcs.microscopes.opera_phenix_xml_parser import OperaPhenixXmlParser
16from openhcs.io.filemanager import FileManager
17from openhcs.microscopes.microscope_base import MicroscopeHandler
18from openhcs.microscopes.microscope_interfaces import (FilenameParser,
19 MetadataHandler)
21logger = logging.getLogger(__name__)
class OperaPhenixHandler(MicroscopeHandler):
    """
    MicroscopeHandler implementation for Opera Phenix systems.

    Pairs the Opera Phenix filename parser with its corresponding metadata
    handler so plate-structure parsing, metadata extraction, and optional
    post-workspace-setup normalization all follow the same conventions.
    """

    # Explicit microscope type for proper registration
    _microscope_type = 'opera_phenix'

    # Class attribute for automatic metadata handler registration (set after class definition)
    _metadata_handler_class = None

    def __init__(self, filemanager: FileManager, pattern_format: Optional[str] = None):
        self.parser = OperaPhenixFilenameParser(filemanager, pattern_format=pattern_format)
        self.metadata_handler = OperaPhenixMetadataHandler(filemanager)
        super().__init__(parser=self.parser, metadata_handler=self.metadata_handler)

    @property
    def common_dirs(self) -> List[str]:
        """Subdirectory names commonly used by Opera Phenix."""
        return ['Images']

    @property
    def microscope_type(self) -> str:
        """Microscope type identifier (for interface enforcement only)."""
        return 'opera_phenix'

    @property
    def metadata_handler_class(self) -> Type[MetadataHandler]:
        """Metadata handler class (for interface enforcement only)."""
        return OperaPhenixMetadataHandler

    @property
    def compatible_backends(self) -> List[Backend]:
        """
        Opera Phenix is compatible with the DISK backend only.

        Legacy microscope format with standard file operations.
        """
        return [Backend.DISK]

    # Uses default workspace initialization from base class

    def _prepare_workspace(self, workspace_path: Path, filemanager: FileManager):
        """
        Rename Opera Phenix images to follow a consistent field order based
        on the spatial layout extracted from Index.xml, replacing the
        workspace symlinks in place with uniformly padded filenames.

        This method performs preparation but does not determine the final
        image directory.

        Args:
            workspace_path: Path to the symlinked workspace.
            filemanager: FileManager instance for file operations.

        Returns:
            Path to the normalized image directory.
        """
        temp_dir_name = "__opera_phenix_temp"

        # A surviving temp directory marks a workspace that was already
        # processed; skip the whole remapping in that case.
        for entry in filemanager.list_dir(workspace_path, Backend.DISK.value):
            candidate = Path(workspace_path) / entry
            if candidate.is_dir() and candidate.name == temp_dir_name:
                logger.info(f"📁 WORKSPACE ALREADY PROCESSED: Found {temp_dir_name} - skipping Opera Phenix preparation")
                return workspace_path

        logger.info(f"🔄 PROCESSING WORKSPACE: Applying Opera Phenix name remapping to {workspace_path}")

        # Clause 245: Workspace operations are disk-only by design; every
        # filemanager call below is structurally hardcoded to the "disk" backend.
        workspace_entries = filemanager.list_dir(workspace_path, Backend.DISK.value)

        # Locate the image directory by matching entries against common_dirs.
        image_dir = workspace_path
        for entry in workspace_entries:
            lowered = entry.lower()
            if any(name.lower() in lowered for name in self.common_dirs):
                image_dir = Path(workspace_path) / entry if isinstance(workspace_path, (str, Path)) else workspace_path / entry
                logger.info("Found directory matching common_dirs pattern: %s", image_dir)
                break

        # Field remapping defaults to identity when Index.xml is unavailable.
        field_mapping = {}
        try:
            index_xml = filemanager.find_file_recursive(workspace_path, "Index.xml", Backend.DISK.value)
            if index_xml:
                field_mapping = OperaPhenixXmlParser(index_xml).get_field_id_mapping()
                logger.debug("Loaded field mapping from Index.xml: %s", field_mapping)
            else:
                logger.debug("Index.xml not found. Using default field mapping.")
        except Exception as e:
            logger.error("Error loading Index.xml: %s", e)
            logger.debug("Using default field mapping due to error.")

        # Snapshot the image list BEFORE creating the temp directory so the
        # temp directory itself is never mirrored recursively.
        image_files = filemanager.list_image_files(image_dir, Backend.DISK.value)

        # Uniquely named temp directory, clearly identifiable as ours.
        temp_dir = os.path.join(image_dir, temp_dir_name) if isinstance(image_dir, str) else image_dir / temp_dir_name

        # SAFETY CHECK: the temp directory must live inside the workspace.
        if not str(temp_dir).startswith(str(workspace_path)):
            logger.error("SAFETY VIOLATION: Temp directory would be created outside workspace: %s", temp_dir)
            raise RuntimeError(f"Temp directory would be created outside workspace: {temp_dir}")

        filemanager.ensure_directory(temp_dir, Backend.DISK.value)
        logger.debug("Created temporary directory for Opera Phenix workspace preparation: %s", temp_dir)

        # Stage renamed symlinks inside the temp directory.
        for file_path in image_files:
            # FileManager should return strings, but tolerate Path objects.
            if isinstance(file_path, str):
                file_name = os.path.basename(file_path)
                link_path = Path(file_path)
            elif isinstance(file_path, Path):
                file_name = file_path.name
                link_path = file_path
            else:
                logger.warning("Unexpected file path type: %s", type(file_path).__name__)
                continue

            if not link_path.is_symlink():
                # A properly mirrored workspace contains only symlinks.
                logger.error("SAFETY VIOLATION: Found real file in workspace (should be symlink): %s", file_path)
                raise RuntimeError(f"Workspace contains real file instead of symlink: {file_path}")

            try:
                real_file_path = link_path.resolve()
                if not real_file_path.exists():
                    logger.warning("Broken symlink detected: %s -> %s", file_path, real_file_path)
                    continue
                source_path = str(real_file_path)
            except Exception as e:
                logger.warning("Failed to resolve symlink %s: %s", file_path, e)
                continue

            metadata = self.parser.parse_filename(file_name)
            if not metadata or 'site' not in metadata or metadata['site'] is None:
                continue

            # Remap the field ID via the spatial layout and rebuild the name
            # with uniform zero padding.
            original_field_id = metadata['site']
            remapped_site = field_mapping.get(original_field_id, original_field_id)
            remapped_name = self.parser.construct_filename(
                well=metadata['well'],
                site=remapped_site,
                channel=metadata['channel'],
                z_index=metadata['z_index'],
                extension=metadata['extension'],
                site_padding=3,
                z_padding=3,
            )

            staged_path = os.path.join(temp_dir, remapped_name) if isinstance(temp_dir, str) else temp_dir / remapped_name

            try:
                if filemanager.exists(staged_path, Backend.DISK.value):
                    # Inside the temp directory any pre-existing file is ours to replace.
                    logger.debug("File exists in temp directory, removing before copy: %s", staged_path)
                    filemanager.delete(staged_path, Backend.DISK.value)

                # Symlink in the temp directory points at the original file.
                filemanager.create_symlink(source_path, staged_path, Backend.DISK.value)
                logger.debug("Created symlink in temp directory: %s -> %s", staged_path, source_path)
            except Exception as e:
                logger.error("Failed to copy file to temp directory: %s -> %s: %s",
                             source_path, staged_path, e)
                raise RuntimeError(f"Failed to copy file to temp directory: {e}") from e

        # Remove the original workspace entries — ONLY symlinks, NEVER real files.
        for file_path in image_files:
            path_obj = Path(file_path) if isinstance(file_path, str) else file_path

            # SAFETY CHECK: never delete anything outside the workspace.
            if not str(path_obj).startswith(str(workspace_path)):
                logger.error("SAFETY VIOLATION: Attempted to delete file outside workspace: %s", file_path)
                raise RuntimeError(f"Workspace preparation tried to delete file outside workspace: {file_path}")

            if path_obj.is_symlink():
                logger.debug("Deleting symlink in workspace: %s", file_path)
                filemanager.delete(file_path, Backend.DISK.value)
            elif path_obj.is_file():
                # Should never happen in a properly mirrored workspace.
                logger.error("SAFETY VIOLATION: Found real file in workspace (should be symlink): %s", file_path)
                raise RuntimeError(f"Workspace contains real file instead of symlink: {file_path}")
            else:
                logger.warning("File not found or not accessible: %s", file_path)

        # Promote the staged symlinks from the temp directory into image_dir.
        for temp_file in filemanager.list_files(temp_dir, Backend.DISK.value):
            if isinstance(temp_file, str):
                temp_file_name = os.path.basename(temp_file)
            elif isinstance(temp_file, Path):
                temp_file_name = temp_file.name
            else:
                logger.warning("Unexpected file path type: %s", type(temp_file).__name__)
                continue

            dest_path = os.path.join(image_dir, temp_file_name) if isinstance(image_dir, str) else image_dir / temp_file_name

            try:
                if filemanager.exists(dest_path, Backend.DISK.value):
                    if filemanager.is_symlink(dest_path, Backend.DISK.value):
                        # Symlink destinations are safe to replace.
                        logger.debug("Destination is a symlink, removing before copy: %s", dest_path)
                        filemanager.delete(dest_path, Backend.DISK.value)
                    else:
                        logger.error("SAFETY VIOLATION: Destination exists and is not a symlink: %s", dest_path)
                        raise FileExistsError(f"Destination exists and is not a symlink: {dest_path}")

                staged_obj = Path(temp_file) if isinstance(temp_file, str) else temp_file
                if staged_obj.is_symlink():
                    try:
                        # Re-point a fresh symlink at the original file.
                        real_target_path = str(staged_obj.resolve())
                        filemanager.create_symlink(real_target_path, dest_path, Backend.DISK.value)
                        logger.debug("Created symlink in image directory: %s -> %s", dest_path, real_target_path)
                    except Exception as e:
                        logger.error("Failed to resolve symlink in temp directory: %s: %s", temp_file, e)
                        raise RuntimeError(f"Failed to resolve symlink: {e}") from e
                else:
                    # Should not happen when staging used symlinks; fall back to copying.
                    logger.warning("Temp file is not a symlink: %s", temp_file)
                    filemanager.copy(temp_file, dest_path, Backend.DISK.value)
                    logger.debug("Copied file (not symlink) to image directory: %s -> %s", temp_file, dest_path)

                # Remove the staged entry now that it has been promoted.
                filemanager.delete(temp_file, Backend.DISK.value)

            except FileExistsError as e:
                # Re-raise with clear message
                logger.error("Cannot copy to destination: %s", e)
                raise
            except Exception as e:
                logger.error("Error copying from temp to destination: %s -> %s: %s",
                             temp_file, dest_path, e)
                raise RuntimeError(f"Failed to process file from temp directory: {e}") from e

        # SAFETY CHECK: only ever delete a temp directory inside the workspace.
        if not str(temp_dir).startswith(str(workspace_path)):
            logger.error("SAFETY VIOLATION: Attempted to delete temp directory outside workspace: %s", temp_dir)
            raise RuntimeError(f"Attempted to delete temp directory outside workspace: {temp_dir}")

        if "__opera_phenix_temp" not in str(temp_dir):
            logger.error("SAFETY VIOLATION: Attempted to delete non-temp directory: %s", temp_dir)
            raise RuntimeError(f"Attempted to delete non-temp directory: {temp_dir}")

        try:
            filemanager.delete(temp_dir, Backend.DISK.value)
            logger.debug("Successfully removed temporary directory: %s", temp_dir)
        except Exception as e:
            # Non-fatal error, just log it
            logger.warning("Failed to remove temporary directory %s: %s", temp_dir, e)

        return image_dir
class OperaPhenixFilenameParser(FilenameParser):
    """Parser for Opera Phenix microscope filenames.

    Handles Opera Phenix format filenames like:
    - r01c01f001p01-ch1sk1fk1fl1.tiff
    - r01c01f001p01-ch1.tiff
    """

    # Regular expression pattern for Opera Phenix filenames
    _pattern = re.compile(r"r(\d{1,2})c(\d{1,2})f(\d+|\{[^\}]*\})p(\d+|\{[^\}]*\})-ch(\d+|\{[^\}]*\})(?:sk\d+)?(?:fk\d+)?(?:fl\d+)?(\.\w+)$", re.I)

    # Pattern for extracting row and column from Opera Phenix well format
    _well_pattern = re.compile(r"R(\d{2})C(\d{2})", re.I)

    def __init__(self, filemanager=None, pattern_format=None):
        """
        Initialize the parser.

        Both arguments exist only for interface compatibility with other
        parsers; this parser uses neither of them.

        Args:
            filemanager: FileManager instance (unused).
            pattern_format: Optional pattern format (unused).
        """
        self.filemanager = filemanager
        self.pattern_format = pattern_format

    @classmethod
    def can_parse(cls, filename: str) -> bool:
        """
        Return True when *filename* matches the Opera Phenix pattern.

        🔒 Clause 17 — VFS Boundary Method: this is a pure string
        operation and performs no file I/O.
        """
        return bool(cls._pattern.match(os.path.basename(filename)))

    def parse_filename(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Extract well/site/channel/z components from an Opera Phenix filename.

        Placeholder fields such as ``{iii}`` yield None for that component.

        🔒 Clause 17 — VFS Boundary Method: pure string operation, no I/O.

        Args:
            filename: Filename to parse.

        Returns:
            Dict with extracted components, or None if parsing fails.
        """
        basename = os.path.basename(filename)
        logger.debug("OperaPhenixFilenameParser attempting to parse basename: '%s'", basename)

        match = self._pattern.match(basename)
        if match is None:
            logger.warning("Regex match failed for basename: '%s'", basename)
            return None

        logger.debug("Regex match successful for '%s'", basename)
        row, col, site_str, z_str, channel_str, ext = match.groups()

        def to_int(token):
            """int value, or None for empty/placeholder tokens."""
            return None if not token or '{' in token else int(token)

        channel = to_int(channel_str)
        return {
            'well': f"R{int(row):02d}C{int(col):02d}",
            'site': to_int(site_str),
            'channel': channel,
            'wavelength': channel,  # For backward compatibility
            'z_index': to_int(z_str),
            'extension': ext if ext else '.tif',
        }

    def construct_filename(self, well: str, site: Optional[Union[int, str]] = None, channel: Optional[int] = None,
                           z_index: Optional[Union[int, str]] = None, extension: str = '.tiff',
                           site_padding: int = 3, z_padding: int = 3) -> str:
        """
        Build an Opera Phenix filename from its components.

        Args:
            well: Well ID in Opera Phenix form, e.g. 'R03C04'.
            site: Site/field number (int) or placeholder string.
            channel: Channel number (defaults to 1 when None).
            z_index: Z-plane (int) or placeholder string (defaults to 1 when None).
            extension: File extension.
            site_padding: Zero-pad width for integer site numbers (default 3).
            z_padding: Zero-pad width for integer z-indices (default 3).

        Returns:
            str: The constructed filename.

        Raises:
            ValueError: If *well* is not in 'R01C03' form.
        """
        match = self._well_pattern.match(well)
        if not match:
            raise ValueError(f"Invalid well format: {well}. Expected format: 'R01C03'")
        row, col = int(match.group(1)), int(match.group(2))

        # Default missing Z-index and channel to 1.
        z_index = 1 if z_index is None else z_index
        channel = 1 if channel is None else channel

        # Placeholder strings pass through verbatim; ints get zero padding.
        site_part = f"f{site}" if isinstance(site, str) else f"f{site:0{site_padding}d}"
        z_part = f"p{z_index}" if isinstance(z_index, str) else f"p{z_index:0{z_padding}d}"

        return f"r{row:02d}c{col:02d}{site_part}{z_part}-ch{channel}sk1fk1fl1{extension}"

    def remap_field_in_filename(self, filename: str, xml_parser: Optional[OperaPhenixXmlParser] = None) -> str:
        """
        Remap the field ID in a filename to follow a top-left to
        bottom-right pattern derived from the XML layout.

        Args:
            filename: Original filename.
            xml_parser: Parser with XML data.

        Returns:
            str: New filename with remapped field ID; the input filename
            unchanged when no parser is given or the name cannot be parsed.
        """
        if xml_parser is None:
            return filename

        metadata = self.parse_filename(filename)
        if not metadata or metadata.get('site') is None:
            return filename

        mapping = xml_parser.get_field_id_mapping()
        new_field_id = xml_parser.remap_field_id(metadata['site'], mapping)

        # Rebuild even when the field ID is unchanged so every name ends up
        # with the same padded format.
        return self.construct_filename(
            well=metadata['well'],
            site=new_field_id,
            channel=metadata['channel'],
            z_index=metadata['z_index'],
            extension=metadata['extension'],
            site_padding=3,
            z_padding=3,
        )

    def extract_row_column(self, well: str) -> Tuple[str, str]:
        """
        Split a well identifier into (row-letter, zero-padded column).

        Accepts Opera Phenix form ('R03C04' -> ('C', '04')) as well as the
        simple letter-number form ('A01' -> ('A', '01')).

        Args:
            well: Well identifier.

        Returns:
            Tuple[str, str]: (row, column), e.g. ('A', '04').

        Raises:
            ValueError: If the well identifier is empty or malformed.
        """
        if not well:
            raise ValueError(f"Invalid well format: {well}")

        match = self._well_pattern.match(well)
        if match:
            row_num, col_num = int(match.group(1)), int(match.group(2))
            # R01 -> 'A', R02 -> 'B', ...; column keeps 2-digit padding.
            return chr(ord('A') + row_num - 1), f"{col_num:02d}"

        # Fall back to the simple 'A01' style.
        if len(well) < 2:
            raise ValueError(f"Invalid well format: {well}")
        row, col = well[0], well[1:]
        if not row.isalpha() or not col.isdigit():
            raise ValueError(f"Invalid Opera Phenix well format: {well}. Expected 'R01C03' or 'A01' format")
        return row, col
class OperaPhenixMetadataHandler(MetadataHandler):
    """
    Metadata handler for Opera Phenix microscopes.

    Handles finding and parsing Index.xml files for Opera Phenix microscopes.
    """

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata handler.

        Args:
            filemanager: FileManager instance for file operations.
        """
        super().__init__()
        self.filemanager = filemanager

    def find_metadata_file(self, plate_path: Union[str, Path]) -> Path:
        """
        Find the Index.xml file in the plate directory.

        Only the plate root and its ``Images`` subdirectory are checked;
        no recursive search is performed.

        Args:
            plate_path: Path to the plate directory.

        Returns:
            Path to the Index.xml file.

        Raises:
            FileNotFoundError: If the plate path does not exist or no
                Index.xml file is found.
        """
        if isinstance(plate_path, str):
            plate_path = Path(plate_path)

        if not plate_path.exists():
            raise FileNotFoundError(f"Plate path does not exist: {plate_path}")

        # Check the plate root first, then the Images subdirectory.
        # (A candidate whose parent directory is missing simply fails exists().)
        for candidate in (plate_path / "Index.xml", plate_path / "Images" / "Index.xml"):
            if candidate.exists():
                return candidate

        # FIX: removed unreachable dead code that previously followed this
        # raise and referenced an undefined name `result`.
        raise FileNotFoundError(
            f"Index.xml not found in {plate_path} or {plate_path}/Images. "
            "Opera Phenix metadata requires Index.xml file."
        )

    def get_grid_dimensions(self, plate_path: Union[str, Path]):
        """
        Get grid dimensions for stitching from the Index.xml file.

        Args:
            plate_path: Path to the plate folder.

        Returns:
            Tuple of (grid_rows, grid_cols) — (rows, cols) order for MIST
            compatibility, swapped from the parser's (cols, rows).

        Raises:
            FileNotFoundError: If the plate path or Index.xml is missing.
            ValueError: If the parsed grid dimensions are not positive.
            OperaPhenixXmlParseError: If the XML cannot be parsed.
            OperaPhenixXmlContentError: If grid dimensions cannot be determined.
        """
        if isinstance(plate_path, str):
            plate_path = Path(plate_path)

        if not plate_path.exists():
            raise FileNotFoundError(f"Plate path does not exist: {plate_path}")

        # find_metadata_file raises FileNotFoundError if Index.xml is absent.
        index_xml = self.find_metadata_file(plate_path)
        grid_size = self.create_xml_parser(index_xml).get_grid_size()

        if grid_size[0] <= 0 or grid_size[1] <= 0:
            raise ValueError(
                f"Invalid grid dimensions: {grid_size[0]}x{grid_size[1]}. "
                "Grid dimensions must be positive integers."
            )

        logger.info("Grid size from Index.xml: %dx%d (cols x rows)", grid_size[0], grid_size[1])
        # Return (rows, cols) for MIST compatibility; the parser yields (cols, rows).
        return (grid_size[1], grid_size[0])

    def get_pixel_size(self, plate_path: Union[str, Path]):
        """
        Get the pixel size from the Index.xml file.

        Args:
            plate_path: Path to the plate folder.

        Returns:
            Pixel size in micrometers.

        Raises:
            FileNotFoundError: If the plate path or Index.xml is missing.
            ValueError: If the parsed pixel size is not positive.
            OperaPhenixXmlParseError: If the XML cannot be parsed.
            OperaPhenixXmlContentError: If pixel size cannot be determined.
        """
        if isinstance(plate_path, str):
            plate_path = Path(plate_path)

        if not plate_path.exists():
            raise FileNotFoundError(f"Plate path does not exist: {plate_path}")

        index_xml = self.find_metadata_file(plate_path)
        pixel_size = self.create_xml_parser(index_xml).get_pixel_size()

        if pixel_size <= 0:
            raise ValueError(
                f"Invalid pixel size: {pixel_size}. "
                "Pixel size must be a positive number."
            )

        logger.info("Pixel size from Index.xml: %.4f μm", pixel_size)
        return pixel_size

    def get_channel_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get channel key→name mapping from Opera Phenix Index.xml.

        Args:
            plate_path: Path to the plate folder (str or Path).

        Returns:
            Dict mapping channel IDs to channel names from metadata, or None
            when no channel entries are found or extraction fails.
            Example: {"1": "HOECHST 33342", "2": "Calcein", "3": "Alexa 647"}
        """
        try:
            if isinstance(plate_path, str):
                plate_path = Path(plate_path)

            index_xml = self.find_metadata_file(plate_path)
            xml_parser = self.create_xml_parser(index_xml)

            channel_mapping = {}

            # Opera Phenix stores channel info in multiple places; Entry
            # elements carrying a ChannelID attribute are the most common.
            root = xml_parser.root
            namespace = xml_parser.namespace

            channel_entries = root.findall(f".//{namespace}Entry[@ChannelID]")
            for entry in channel_entries:
                channel_id = entry.get('ChannelID')
                channel_name_elem = entry.find(f"{namespace}ChannelName")

                if channel_id and channel_name_elem is not None:
                    channel_name = channel_name_elem.text
                    if channel_name:
                        channel_mapping[channel_id] = channel_name

            return channel_mapping if channel_mapping else None

        except Exception as e:
            # Best-effort: channel names are optional enrichment, so any
            # failure degrades to None rather than raising.
            logger.debug(f"Could not extract channel names from Opera Phenix metadata: {e}")
            return None

    def get_well_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get well key→name mapping from Opera Phenix metadata.

        Returns:
            None - Opera Phenix doesn't provide rich well names in metadata.
        """
        return None

    def get_site_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get site key→name mapping from Opera Phenix metadata.

        Returns:
            None - Opera Phenix doesn't provide rich site names in metadata.
        """
        return None

    def get_z_index_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get z_index key→name mapping from Opera Phenix metadata.

        Returns:
            None - Opera Phenix doesn't provide rich z_index names in metadata.
        """
        return None

    def create_xml_parser(self, xml_path: Union[str, Path]):
        """
        Create an OperaPhenixXmlParser for the given XML file.

        Args:
            xml_path: Path to the XML file.

        Returns:
            OperaPhenixXmlParser: Parser for the XML file.

        Raises:
            FileNotFoundError: If the XML file does not exist.
        """
        if isinstance(xml_path, str):
            xml_path = Path(xml_path)

        if not xml_path.exists():
            raise FileNotFoundError(f"XML file does not exist: {xml_path}")

        return OperaPhenixXmlParser(xml_path)
# Set metadata handler class after class definition for automatic registration.
# Deferred import: register_metadata_handler is pulled in here (not at the top
# of the file) and the class attribute is assigned only once both classes exist.
from openhcs.microscopes.microscope_base import register_metadata_handler
OperaPhenixHandler._metadata_handler_class = OperaPhenixMetadataHandler
register_metadata_handler(OperaPhenixHandler, OperaPhenixMetadataHandler)