Coverage for openhcs/microscopes/opera_phenix_xml_parser.py: 45.8%
258 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Opera Phenix XML parser for openhcs.
4This module provides a class for parsing Opera Phenix Index.xml files.
5"""
7import logging
8import re
9import xml.etree.ElementTree as ET
10from pathlib import Path
11from typing import Any, Dict, Optional, Tuple, Union
13import numpy as np
15logger = logging.getLogger(__name__)
class OperaPhenixXmlError(Exception):
    """Common ancestor of every error raised while handling Opera Phenix XML.

    Catch this type to handle any parser-specific failure in one place.
    """
class OperaPhenixXmlParseError(OperaPhenixXmlError):
    """Signals that the XML file could not be parsed, or has not been parsed yet."""
class OperaPhenixXmlContentError(OperaPhenixXmlError):
    """Signals that the parsed XML lacks required elements or holds unusable content."""
class OperaPhenixXmlParser:
    """Parser for Opera Phenix Index.xml files.

    The XML file is parsed once in the constructor. Every accessor then
    queries the in-memory tree through ``self.root`` and prefixes tag names
    with ``self.namespace`` (the braced namespace of the root element, or an
    empty string when the document has none).
    """

    def __init__(self, xml_path: Union[str, Path]):
        """
        Initialize the parser with the path to the Index.xml file.

        Args:
            xml_path: Path to the Index.xml file (string or Path object)

        Raises:
            FileNotFoundError: If the path does not exist
            OperaPhenixXmlParseError: If the file cannot be parsed as XML
        """
        # Convert to Path object for filesystem operations
        self.xml_path = Path(xml_path) if isinstance(xml_path, str) else xml_path

        # Fail early with a clear message rather than from inside ElementTree
        if not self.xml_path.exists():
            raise FileNotFoundError(f"XML file does not exist: {self.xml_path}")

        self.tree = None
        self.root = None
        self.namespace = ""
        self._parse_xml()

    def _parse_xml(self):
        """
        Parse the XML file and extract the namespace.

        On success ``self.tree``, ``self.root`` and ``self.namespace`` are set.
        Wrapped errors keep the original exception as ``__cause__``.

        Raises:
            FileNotFoundError: If the XML file doesn't exist
            PermissionError: If there's no permission to read the file
            OperaPhenixXmlParseError: If the XML is malformed, or an
                AttributeError/ValueError/regex error occurs while reading it
            TypeError: If the XML path has a type ElementTree cannot open
        """
        try:
            self.tree = ET.parse(self.xml_path)
            self.root = self.tree.getroot()

            # ElementTree spells qualified tags as "{namespace-uri}Tag"; keep the
            # braced prefix so it can be prepended to tag names in find() paths.
            match = re.match(r'{.*}', self.root.tag)
            self.namespace = match.group(0) if match else ""

            logger.info("Parsed Opera Phenix XML file: %s", self.xml_path)
            logger.debug("XML namespace: %s", self.namespace)
        except FileNotFoundError:
            logger.error("XML file not found: %s", self.xml_path)
            raise
        except PermissionError:
            logger.error("Permission denied when reading XML file: %s", self.xml_path)
            raise
        except ET.ParseError as e:
            logger.error("XML parse error in file %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(f"Failed to parse XML file {self.xml_path}: {e}") from e
        except re.error as e:
            logger.error("Regex error when extracting namespace from %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(
                f"Failed to extract namespace from XML file {self.xml_path}: {e}"
            ) from e
        except TypeError as e:
            logger.error("Type error when parsing XML file %s: %s", self.xml_path, e)
            raise TypeError(f"Invalid type for XML path: {e}") from e
        except AttributeError as e:
            logger.error("Attribute error when parsing XML file %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(f"Unexpected XML structure in file {self.xml_path}: {e}") from e
        except ValueError as e:
            logger.error("Value error when parsing XML file %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(f"Invalid value in XML file {self.xml_path}: {e}") from e

    def get_plate_info(self) -> Dict[str, Any]:
        """
        Extract plate information from the XML.

        Returns:
            Dict with the keys 'plate_id', 'measurement_id', 'plate_type'
            (text of the matching child elements, or None when absent),
            'rows' and 'columns' (ints) and 'wells' (list of the non-empty
            ``id`` attributes of the Plate's direct Well children)

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If Plate element is missing or required elements are missing
            ValueError: If PlateRows or PlateColumns is not an integer
        """
        if self.root is None:
            raise OperaPhenixXmlParseError("XML not parsed, cannot retrieve plate information")

        plate_elem = self.root.find(f".//{self.namespace}Plate")
        if plate_elem is None:
            raise OperaPhenixXmlContentError("No Plate element found in XML")

        plate_rows_text = self._get_element_text(plate_elem, 'PlateRows')
        plate_columns_text = self._get_element_text(plate_elem, 'PlateColumns')

        if plate_rows_text is None:
            raise OperaPhenixXmlContentError("PlateRows element missing or empty in XML")
        if plate_columns_text is None:
            raise OperaPhenixXmlContentError("PlateColumns element missing or empty in XML")

        plate_info = {
            'plate_id': self._get_element_text(plate_elem, 'PlateID'),
            'measurement_id': self._get_element_text(plate_elem, 'MeasurementID'),
            'plate_type': self._get_element_text(plate_elem, 'PlateTypeName'),
            'rows': int(plate_rows_text),
            'columns': int(plate_columns_text),
        }

        # Get well IDs (here the ID is an attribute of the Well element)
        well_elems = plate_elem.findall(f"{self.namespace}Well")
        plate_info['wells'] = [well.get('id') for well in well_elems if well.get('id')]

        logger.debug("Plate info: %s", plate_info)
        return plate_info

    def get_grid_size(self) -> Tuple[int, int]:
        """
        Determine the grid size (number of fields per well) by analyzing image positions.

        Images are grouped by well (Row+Col), channel and plane. The first
        group holding more than one image decides the result: the number of
        distinct X positions and the number of distinct Y positions. When
        every group holds a single image the grid is 1x1.

        Returns:
            Tuple of (grid_size_x, grid_size_y), i.e. (columns, rows).
            Callers that need (rows, cols) must swap the values themselves.

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If no Image elements are found or no
                image carries a usable well/channel/plane/position record
        """
        if self.root is None:
            raise OperaPhenixXmlParseError("XML not parsed, cannot determine grid size")

        image_elements = self.root.findall(f".//{self.namespace}Image")
        if not image_elements:
            raise OperaPhenixXmlContentError("No Image elements found in XML")

        # Group (x, y) positions by well, channel and plane. Images lacking any
        # of the identifying or positional elements are ignored.
        image_groups: Dict[str, list] = {}
        for image in image_elements:
            row_text = self._get_element_text(image, 'Row')
            col_text = self._get_element_text(image, 'Col')
            channel_text = self._get_element_text(image, 'ChannelID')
            plane_text = self._get_element_text(image, 'PlaneID')
            if not (row_text and col_text and channel_text and plane_text):
                continue

            group_key = f"R{row_text}C{col_text}_CH{channel_text}_P{plane_text}"

            pos_x_text = self._get_element_text(image, 'PositionX')
            pos_y_text = self._get_element_text(image, 'PositionY')
            field_text = self._get_element_text(image, 'FieldID')
            if not (pos_x_text and pos_y_text and field_text):
                continue

            try:
                x_value = float(pos_x_text)
                y_value = float(pos_y_text)
                # The field ID is not needed for counting positions, but an
                # image whose FieldID is not an integer is rejected like any
                # other malformed entry.
                int(field_text)
            except ValueError as e:
                logger.warning(
                    "Could not parse position values (invalid number format) for image in group %s: %s",
                    group_key, e,
                )
                continue

            image_groups.setdefault(group_key, []).append((x_value, y_value))

        # Snap positions to a 1e-10 lattice so that float noise does not
        # create spurious extra columns or rows.
        epsilon = 1e-10
        for group_key, group_positions in image_groups.items():
            if len(group_positions) < 2:
                continue

            logger.debug(
                "Using image group %s with %d fields to determine grid size",
                group_key, len(group_positions),
            )
            x_positions, y_positions = zip(*group_positions)
            unique_x = np.unique(np.round(np.array(x_positions) / epsilon) * epsilon)
            unique_y = np.unique(np.round(np.array(y_positions) / epsilon) * epsilon)

            num_x_positions = len(unique_x)
            num_y_positions = len(unique_y)
            logger.info("Determined grid size from positions: %dx%d", num_x_positions, num_y_positions)
            return (num_x_positions, num_y_positions)

        if image_groups:
            # Every group holds exactly one image: one field per well.
            logger.info("Determined grid size from positions: %dx%d", 1, 1)
            return (1, 1)

        # No image carried a complete, parsable record
        raise OperaPhenixXmlContentError("Could not determine grid size from XML data")

    def get_pixel_size(self) -> float:
        """
        Extract pixel size from the XML.

        The first ImageResolutionX element found anywhere in the document is
        used; ImageResolutionY is consulted only when X is absent, empty or
        not a number. The stored value is multiplied by 1e6 (meters to
        micrometers).

        Returns:
            Pixel size in micrometers (μm)

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If pixel size cannot be determined or parsed
        """
        if self.root is None:
            raise OperaPhenixXmlParseError("XML not parsed, cannot determine pixel size")

        for tag_name in ("ImageResolutionX", "ImageResolutionY"):
            resolution = self.root.find(f".//{self.namespace}{tag_name}")
            if resolution is None or not resolution.text:
                continue
            try:
                # Convert from meters to micrometers
                pixel_size = float(resolution.text) * 1e6
            except ValueError as e:
                logger.warning(
                    "Could not parse pixel size from %s (invalid number format): %s", tag_name, e
                )
                continue
            logger.info("Found pixel size from %s: %.4f μm", tag_name, pixel_size)
            return pixel_size

        # Neither element yielded a usable value
        raise OperaPhenixXmlContentError("Pixel size not found or could not be parsed in XML")

    def get_image_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Extract image information from the XML.

        Only Image elements carrying a ``Version`` attribute are considered,
        and those without an ``id`` child text are skipped.

        Returns:
            Dictionary mapping image IDs to dictionaries with the keys 'url',
            'row', 'col', 'field_id', 'plane_id', 'channel_id' (the five IDs
            as ints) and 'position_x', 'position_y', 'position_z' (raw text
            or None)

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If no Image elements are found or required elements are missing
            ValueError: If a required element does not hold an integer
        """
        if self.root is None:
            raise OperaPhenixXmlParseError("XML not parsed, cannot retrieve image information")

        image_elems = self.root.findall(f".//{self.namespace}Image[@Version]")
        if not image_elems:
            raise OperaPhenixXmlContentError("No Image elements with Version attribute found in XML")

        required_tags = ('Row', 'Col', 'FieldID', 'PlaneID', 'ChannelID')

        image_info = {}
        for image in image_elems:
            image_id = self._get_element_text(image, 'id')
            if not image_id:
                continue

            # Validate required fields, reporting the first missing one
            required = {}
            for tag_name in required_tags:
                text = self._get_element_text(image, tag_name)
                if text is None:
                    raise OperaPhenixXmlContentError(
                        f"{tag_name} element missing or empty for image {image_id}"
                    )
                required[tag_name] = text

            image_info[image_id] = {
                'url': self._get_element_text(image, 'URL'),
                'row': int(required['Row']),
                'col': int(required['Col']),
                'field_id': int(required['FieldID']),
                'plane_id': int(required['PlaneID']),
                'channel_id': int(required['ChannelID']),
                'position_x': self._get_element_text(image, 'PositionX'),
                'position_y': self._get_element_text(image, 'PositionY'),
                'position_z': self._get_element_text(image, 'PositionZ'),
            }

        logger.debug("Found %d images in XML", len(image_info))
        return image_info

    def get_well_positions(self) -> Dict[str, Tuple[int, int]]:
        """
        Extract well positions from the XML.

        Wells are read from ``Wells/Well`` elements; a well is skipped when
        its ``id``, ``Row`` or ``Col`` child is missing or empty.

        Returns:
            Dictionary mapping well IDs to (row, column) tuples

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If no Well elements are found
            ValueError: If Row or Col does not hold an integer
        """
        if self.root is None:
            raise OperaPhenixXmlParseError("XML not parsed, cannot retrieve well positions")

        well_elems = self.root.findall(f".//{self.namespace}Wells/{self.namespace}Well")
        if not well_elems:
            raise OperaPhenixXmlContentError("No Well elements found in XML")

        well_positions = {}
        for well in well_elems:
            well_id = self._get_element_text(well, 'id')
            row = self._get_element_text(well, 'Row')
            col = self._get_element_text(well, 'Col')

            if well_id and row and col:
                well_positions[well_id] = (int(row), int(col))

        logger.debug("Well positions: %s", well_positions)
        return well_positions

    def _get_element_text(self, parent_elem, tag_name: str) -> Optional[str]:
        """Return the text of the direct child ``tag_name`` (namespace applied), or None if absent."""
        elem = parent_elem.find(f"{self.namespace}{tag_name}")
        return elem.text if elem is not None else None

    def _get_element_attribute(self, parent_elem, tag_name: str, attr_name: str) -> Optional[str]:
        """Return attribute ``attr_name`` of the direct child ``tag_name``, or None if the child is absent."""
        elem = parent_elem.find(f"{self.namespace}{tag_name}")
        return elem.get(attr_name) if elem is not None else None

    def get_field_positions(self) -> Dict[int, Tuple[float, float]]:
        """
        Extract field IDs and their X,Y positions from the Index.xml file.

        The first position seen for a field ID wins; later images with the
        same field ID are ignored. Images with missing or unparsable values
        are skipped.

        Returns:
            dict: Mapping of field IDs to (x, y) position tuples

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
        """
        if self.root is None:
            raise OperaPhenixXmlParseError("XML not parsed, cannot extract field positions")

        field_positions = {}

        for image in self.root.findall(f".//{self.namespace}Image"):
            field_id_elem = image.find(f"{self.namespace}FieldID")
            pos_x_elem = image.find(f"{self.namespace}PositionX")
            pos_y_elem = image.find(f"{self.namespace}PositionY")

            if field_id_elem is None or pos_x_elem is None or pos_y_elem is None:
                continue

            try:
                field_id = int(field_id_elem.text)
                pos_x = float(pos_x_elem.text)
                pos_y = float(pos_y_elem.text)
            except ValueError as e:
                # Skip entries with invalid number format
                logger.debug("Skipping field with invalid number format: %s", e)
                continue
            except TypeError as e:
                # Raised when an element is present but empty (text is None)
                logger.debug("Skipping field with wrong type: %s", e)
                continue

            # Only add if we don't already have this field ID
            field_positions.setdefault(field_id, (pos_x, pos_y))

        return field_positions

    def sort_fields_by_position(self, positions: Dict[int, Tuple[float, float]]) -> list:
        """
        Sort fields based on their positions in a raster pattern starting from the top.
        All rows go left-to-right in a consistent raster scan pattern.

        Rows are ordered by descending Y and columns by ascending X. Positions
        are matched exactly; no tolerance is applied.

        Args:
            positions: Dictionary mapping field IDs to (x, y) position tuples

        Returns:
            list: Field IDs sorted in raster pattern order starting from the top
        """
        if not positions:
            return []

        # Distinct coordinates; Y is reversed so the largest Y is row 0
        x_coords = sorted({pos[0] for pos in positions.values()})
        y_coords = sorted({pos[1] for pos in positions.values()}, reverse=True)

        # Coordinate -> index tables avoid a linear list.index() scan per field
        x_index = {x: idx for idx, x in enumerate(x_coords)}
        y_index = {y: idx for idx, y in enumerate(y_coords)}

        # Place each field ID in its (column, row) cell
        grid = {}
        for field_id, (x, y) in positions.items():
            grid[(x_index[x], y_index[y])] = field_id

        n_cols = len(x_coords)
        n_rows = len(y_coords)

        # Debug output to help diagnose field mapping issues (0 marks an empty cell)
        logger.info("Field position grid:")
        for y_idx in range(n_rows):
            logger.info("".join(f"{grid.get((x_idx, y_idx), 0):3d} " for x_idx in range(n_cols)))

        # Raster order: top row first, each row left to right, empty cells skipped
        return [
            grid[(x_idx, y_idx)]
            for y_idx in range(n_rows)
            for x_idx in range(n_cols)
            if (x_idx, y_idx) in grid
        ]

    def get_field_id_mapping(self) -> Dict[int, int]:
        """
        Generate a mapping from original field IDs to new field IDs based on position data.

        New IDs start at 1 and follow the raster order produced by
        ``sort_fields_by_position``.

        Returns:
            dict: Mapping of original field IDs to new field IDs
        """
        field_positions = self.get_field_positions()
        sorted_field_ids = self.sort_fields_by_position(field_positions)
        return {field_id: i + 1 for i, field_id in enumerate(sorted_field_ids)}

    def remap_field_id(self, field_id: int, mapping: Optional[Dict[int, int]] = None) -> int:
        """
        Remap a field ID using the position-based mapping.

        Args:
            field_id: Original field ID
            mapping: Mapping to use. If None, generates a new mapping.

        Returns:
            int: New field ID

        Raises:
            OperaPhenixXmlContentError: If field_id is not found in the mapping
        """
        if mapping is None:
            mapping = self.get_field_id_mapping()

        if field_id not in mapping:
            raise OperaPhenixXmlContentError(f"Field ID {field_id} not found in remapping dictionary")
        return mapping[field_id]