Coverage for openhcs/microscopes/opera_phenix_xml_parser.py: 45.8%

258 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Opera Phenix XML parser for openhcs. 

3 

4This module provides a class for parsing Opera Phenix Index.xml files. 

5""" 

6 

7import logging 

8import re 

9import xml.etree.ElementTree as ET 

10from pathlib import Path 

11from typing import Any, Dict, Optional, Tuple, Union 

12 

13import numpy as np 

14 

15logger = logging.getLogger(__name__) 

16 

17 

class OperaPhenixXmlError(Exception):
    """Root of the Opera Phenix XML exception hierarchy.

    The more specific parse and content errors in this module derive from
    this class, so catching it covers all of them.
    """

21 

22 

class OperaPhenixXmlParseError(OperaPhenixXmlError):
    """Signals that the XML file could not be parsed.

    Also raised by the parser's query methods when they are called before a
    document root is available.
    """

26 

27 

class OperaPhenixXmlContentError(OperaPhenixXmlError):
    """Signals XML content that is invalid or lacks required elements."""

31 

32 

class OperaPhenixXmlParser:
    """Parser for Opera Phenix Index.xml files.

    The file is parsed once at construction time; the ``get_*`` methods then
    query the in-memory element tree. Every element lookup is prefixed with
    the namespace extracted from the root tag (``"{uri}"`` or ``""``).
    """

    # Child elements every Image entry must provide, in validation order.
    _REQUIRED_IMAGE_TAGS = ('Row', 'Col', 'FieldID', 'PlaneID', 'ChannelID')

    def __init__(self, xml_path: Union[str, Path]):
        """
        Initialize the parser with the path to the Index.xml file.

        Args:
            xml_path: Path to the Index.xml file. ``Path`` instances are used
                as-is; anything else (e.g. a string) is converted with
                ``Path(...)``.

        Raises:
            FileNotFoundError: If the path does not exist
            OperaPhenixXmlParseError: If the file cannot be parsed
        """
        # Keep caller-supplied Path objects untouched; convert everything else
        self.xml_path = xml_path if isinstance(xml_path, Path) else Path(xml_path)

        # Fail early with a clear message instead of deep inside the XML parser
        if not self.xml_path.exists():
            raise FileNotFoundError(f"XML file does not exist: {self.xml_path}")

        self.tree = None
        self.root = None
        self.namespace = ""
        self._parse_xml()

    def _require_root(self, action: str):
        """
        Return the parsed root element or fail if no document is loaded.

        Args:
            action: Short description of the attempted operation; it is
                appended to the error message ("XML not parsed, cannot ...").

        Returns:
            The root element of the parsed document

        Raises:
            OperaPhenixXmlParseError: If ``self.root`` is None
        """
        if self.root is None:
            raise OperaPhenixXmlParseError(f"XML not parsed, cannot {action}")
        return self.root

    def _parse_xml(self):
        """
        Parse the XML file and extract the namespace.

        Sets ``self.tree``, ``self.root`` and ``self.namespace``.

        Raises:
            FileNotFoundError: If the XML file doesn't exist
            PermissionError: If there's no permission to read the file
            TypeError: If the XML path has an unsupported type
            OperaPhenixXmlParseError: If the XML is malformed, or if an
                AttributeError / ValueError / regex error occurs while
                reading it (the original error is attached as the cause)
        """
        try:
            self.tree = ET.parse(self.xml_path)
            self.root = self.tree.getroot()

            # A namespaced root tag looks like "{uri}Tag"; keep the "{uri}" part
            match = re.match(r'{.*}', self.root.tag)
            self.namespace = match.group(0) if match else ""

            logger.info("Parsed Opera Phenix XML file: %s", self.xml_path)
            logger.debug("XML namespace: %s", self.namespace)
        except FileNotFoundError:
            logger.error("XML file not found: %s", self.xml_path)
            raise
        except PermissionError:
            logger.error("Permission denied when reading XML file: %s", self.xml_path)
            raise
        except ET.ParseError as e:
            logger.error("XML parse error in file %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(f"Failed to parse XML file {self.xml_path}: {e}") from e
        except re.error as e:
            logger.error("Regex error when extracting namespace from %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(
                f"Failed to extract namespace from XML file {self.xml_path}: {e}"
            ) from e
        except TypeError as e:
            logger.error("Type error when parsing XML file %s: %s", self.xml_path, e)
            raise TypeError(f"Invalid type for XML path: {e}") from e
        except AttributeError as e:
            logger.error("Attribute error when parsing XML file %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(f"Unexpected XML structure in file {self.xml_path}: {e}") from e
        except ValueError as e:
            logger.error("Value error when parsing XML file %s: %s", self.xml_path, e)
            raise OperaPhenixXmlParseError(f"Invalid value in XML file {self.xml_path}: {e}") from e

    def get_plate_info(self) -> Dict[str, Any]:
        """
        Extract plate information from the XML.

        Returns:
            Dict with keys 'plate_id', 'measurement_id', 'plate_type',
            'rows', 'columns' and 'wells' (list of the non-empty ``id``
            attributes of the Plate's direct Well children)

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If Plate element is missing or required elements are missing
            ValueError: If PlateRows / PlateColumns are not integers
        """
        root = self._require_root("retrieve plate information")

        plate_elem = root.find(f".//{self.namespace}Plate")
        if plate_elem is None:
            raise OperaPhenixXmlContentError("No Plate element found in XML")

        plate_rows_text = self._get_element_text(plate_elem, 'PlateRows')
        plate_columns_text = self._get_element_text(plate_elem, 'PlateColumns')

        if plate_rows_text is None:
            raise OperaPhenixXmlContentError("PlateRows element missing or empty in XML")
        if plate_columns_text is None:
            raise OperaPhenixXmlContentError("PlateColumns element missing or empty in XML")

        well_elems = plate_elem.findall(f"{self.namespace}Well")
        plate_info = {
            'plate_id': self._get_element_text(plate_elem, 'PlateID'),
            'measurement_id': self._get_element_text(plate_elem, 'MeasurementID'),
            'plate_type': self._get_element_text(plate_elem, 'PlateTypeName'),
            'rows': int(plate_rows_text),
            'columns': int(plate_columns_text),
            'wells': [well.get('id') for well in well_elems if well.get('id')],
        }

        logger.debug("Plate info: %s", plate_info)
        return plate_info

    def get_grid_size(self) -> Tuple[int, int]:
        """
        Determine the grid size (number of fields per well) by analyzing image positions.

        Images are grouped by well (Row+Col), channel and plane. The first
        group containing more than one image with parseable FieldID /
        PositionX / PositionY is used: the grid dimensions are the number of
        distinct X and distinct Y positions in that group.

        Returns:
            Tuple of (grid_size_x, grid_size_y), i.e. (cols, rows) order.
            Callers that need (rows, cols) must swap the values themselves.

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If no Image elements are found or no
                group with more than one usable image exists
        """
        root = self._require_root("determine grid size")

        image_elements = root.findall(f".//{self.namespace}Image")
        if not image_elements:
            raise OperaPhenixXmlContentError("No Image elements found in XML")

        # group key -> list of (x, y) positions, in document order
        image_groups: Dict[str, list] = {}

        for image in image_elements:
            row_text = self._get_element_text(image, 'Row')
            col_text = self._get_element_text(image, 'Col')
            channel_text = self._get_element_text(image, 'ChannelID')
            plane_text = self._get_element_text(image, 'PlaneID')
            if not (row_text and col_text and channel_text and plane_text):
                continue

            group_key = f"R{row_text}C{col_text}_CH{channel_text}_P{plane_text}"

            pos_x_text = self._get_element_text(image, 'PositionX')
            pos_y_text = self._get_element_text(image, 'PositionY')
            field_text = self._get_element_text(image, 'FieldID')
            if not (pos_x_text and pos_y_text and field_text):
                continue

            try:
                x_value = float(pos_x_text)
                y_value = float(pos_y_text)
                # Parsed only for validation: images with a bad FieldID are skipped
                int(field_text)
            except ValueError as e:
                logger.warning(
                    "Could not parse position values (invalid number format) for image in group %s: %s",
                    group_key, e,
                )
                continue

            image_groups.setdefault(group_key, []).append((x_value, y_value))

        # Snap positions to a fine lattice so float noise does not create
        # spurious "distinct" coordinates
        epsilon = 1e-10

        for group_key, positions in image_groups.items():
            if len(positions) < 2:
                continue

            logger.debug("Using image group %s with %d fields to determine grid size", group_key, len(positions))

            x_positions, y_positions = zip(*positions)
            num_x_positions = len(np.unique(np.round(np.array(x_positions) / epsilon) * epsilon))
            num_y_positions = len(np.unique(np.round(np.array(y_positions) / epsilon) * epsilon))

            # A group with >= 2 positions always yields >= 1 unique value per
            # axis, so the first multi-field group decides the result.
            logger.info("Determined grid size from positions: %dx%d", num_x_positions, num_y_positions)
            return (num_x_positions, num_y_positions)

        raise OperaPhenixXmlContentError("Could not determine grid size from XML data")

    def get_pixel_size(self) -> float:
        """
        Extract pixel size from the XML.

        The first ImageResolutionX element is used; if it is absent, empty or
        not a number, the first ImageResolutionY element is tried. The stored
        value is multiplied by 1e6 (meters to micrometers).

        Returns:
            Pixel size in micrometers (μm)

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If pixel size cannot be determined or parsed
        """
        root = self._require_root("determine pixel size")

        for tag in ('ImageResolutionX', 'ImageResolutionY'):
            resolution = root.find(f".//{self.namespace}{tag}")
            if resolution is None or not resolution.text:
                continue
            try:
                pixel_size = float(resolution.text) * 1e6
            except ValueError as e:
                logger.warning("Could not parse pixel size from %s (invalid number format): %s", tag, e)
                continue
            logger.info("Found pixel size from %s: %.4f μm", tag, pixel_size)
            return pixel_size

        raise OperaPhenixXmlContentError("Pixel size not found or could not be parsed in XML")

    def get_image_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Extract image information from the XML.

        Only Image elements carrying a ``Version`` attribute are considered;
        images without a non-empty ``id`` child are skipped.

        Returns:
            Dictionary mapping image IDs to dictionaries with keys 'url',
            'row', 'col', 'field_id', 'plane_id', 'channel_id' (ints) and
            'position_x', 'position_y', 'position_z' (raw text or None)

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If no Image elements are found or required elements are missing
            ValueError: If a required numeric element is not an integer
        """
        root = self._require_root("retrieve image information")

        image_elems = root.findall(f".//{self.namespace}Image[@Version]")
        if not image_elems:
            raise OperaPhenixXmlContentError("No Image elements with Version attribute found in XML")

        image_info = {}
        for image in image_elems:
            image_id = self._get_element_text(image, 'id')
            if not image_id:
                continue

            required = {
                tag: self._get_element_text(image, tag)
                for tag in self._REQUIRED_IMAGE_TAGS
            }
            for tag, text in required.items():
                if text is None:
                    raise OperaPhenixXmlContentError(f"{tag} element missing or empty for image {image_id}")

            image_info[image_id] = {
                'url': self._get_element_text(image, 'URL'),
                'row': int(required['Row']),
                'col': int(required['Col']),
                'field_id': int(required['FieldID']),
                'plane_id': int(required['PlaneID']),
                'channel_id': int(required['ChannelID']),
                'position_x': self._get_element_text(image, 'PositionX'),
                'position_y': self._get_element_text(image, 'PositionY'),
                'position_z': self._get_element_text(image, 'PositionZ'),
            }

        logger.debug("Found %d images in XML", len(image_info))
        return image_info

    def get_well_positions(self) -> Dict[str, Tuple[int, int]]:
        """
        Extract well positions from the XML.

        Wells lacking a non-empty ``id``, ``Row`` or ``Col`` child are skipped.

        Returns:
            Dictionary mapping well IDs to (row, column) tuples

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
            OperaPhenixXmlContentError: If no Well elements are found
            ValueError: If Row / Col are not integers
        """
        root = self._require_root("retrieve well positions")

        well_elems = root.findall(f".//{self.namespace}Wells/{self.namespace}Well")
        if not well_elems:
            raise OperaPhenixXmlContentError("No Well elements found in XML")

        well_positions = {}
        for well in well_elems:
            well_id = self._get_element_text(well, 'id')
            row = self._get_element_text(well, 'Row')
            col = self._get_element_text(well, 'Col')

            if well_id and row and col:
                well_positions[well_id] = (int(row), int(col))

        logger.debug("Well positions: %s", well_positions)
        return well_positions

    def _get_element_text(self, parent_elem, tag_name: str) -> Optional[str]:
        """Return the text of the direct child ``tag_name`` (namespace applied), or None if absent."""
        elem = parent_elem.find(f"{self.namespace}{tag_name}")
        return elem.text if elem is not None else None

    def _get_element_attribute(self, parent_elem, tag_name: str, attr_name: str) -> Optional[str]:
        """Return attribute ``attr_name`` of the direct child ``tag_name`` (namespace applied), or None."""
        elem = parent_elem.find(f"{self.namespace}{tag_name}")
        return elem.get(attr_name) if elem is not None else None

    def get_field_positions(self) -> Dict[int, Tuple[float, float]]:
        """
        Extract field IDs and their X,Y positions from the Index.xml file.

        The first Image seen for a given field ID determines its position;
        images whose values cannot be converted are skipped.

        Returns:
            dict: Mapping of field IDs to (x, y) position tuples

        Raises:
            OperaPhenixXmlParseError: If XML is not parsed
        """
        root = self._require_root("extract field positions")

        field_positions = {}

        for image in root.findall(f".//{self.namespace}Image"):
            field_id_elem = image.find(f"{self.namespace}FieldID")
            pos_x_elem = image.find(f"{self.namespace}PositionX")
            pos_y_elem = image.find(f"{self.namespace}PositionY")

            if field_id_elem is None or pos_x_elem is None or pos_y_elem is None:
                continue

            try:
                field_id = int(field_id_elem.text)
                pos_x = float(pos_x_elem.text)
                pos_y = float(pos_y_elem.text)
            except ValueError as e:
                logger.debug("Skipping field with invalid number format: %s", e)
                continue
            except TypeError as e:
                # Element exists but has no text (text is None)
                logger.debug("Skipping field with wrong type: %s", e)
                continue

            # First occurrence wins
            field_positions.setdefault(field_id, (pos_x, pos_y))

        return field_positions

    def sort_fields_by_position(self, positions: Dict[int, Tuple[float, float]]) -> list:
        """
        Sort fields based on their positions in a raster pattern starting from the top.
        All rows go left-to-right in a consistent raster scan pattern.

        Rows are ordered by descending Y (largest Y first) and columns by
        ascending X. Coordinates are matched exactly, not approximately.

        Args:
            positions: Dictionary mapping field IDs to (x, y) position tuples

        Returns:
            list: Field IDs sorted in raster pattern order starting from the top
        """
        if not positions:
            return []

        x_coords = sorted(set(pos[0] for pos in positions.values()))
        y_coords = sorted(set(pos[1] for pos in positions.values()), reverse=True)  # top row first

        # Coordinate -> grid index lookup tables (avoids a linear list.index()
        # scan for every field)
        x_index = {x: i for i, x in enumerate(x_coords)}
        y_index = {y: i for i, y in enumerate(y_coords)}

        # (x_idx, y_idx) -> field ID; a later field at the same spot overwrites
        grid = {
            (x_index[x], y_index[y]): field_id
            for field_id, (x, y) in positions.items()
        }

        num_cols = len(x_coords)
        num_rows = len(y_coords)

        # Log the layout to help diagnose field mapping issues (0 = empty cell)
        logger.info("Field position grid:")
        for y_idx in range(num_rows):
            logger.info("".join(f"{grid.get((x_idx, y_idx), 0):3d} " for x_idx in range(num_cols)))

        # Row-major traversal, skipping empty cells
        return [
            grid[(x_idx, y_idx)]
            for y_idx in range(num_rows)
            for x_idx in range(num_cols)
            if (x_idx, y_idx) in grid
        ]

    def get_field_id_mapping(self) -> Dict[int, int]:
        """
        Generate a mapping from original field IDs to new field IDs based on position data.

        New IDs start at 1 and follow the order produced by
        ``sort_fields_by_position``.

        Returns:
            dict: Mapping of original field IDs to new field IDs
        """
        sorted_field_ids = self.sort_fields_by_position(self.get_field_positions())
        return {field_id: new_id for new_id, field_id in enumerate(sorted_field_ids, start=1)}

    def remap_field_id(self, field_id: int, mapping: Optional[Dict[int, int]] = None) -> int:
        """
        Remap a field ID using the position-based mapping.

        Args:
            field_id: Original field ID
            mapping: Mapping to use. If None, a new mapping is generated on
                every call, so pass a precomputed mapping when remapping many
                IDs.

        Returns:
            int: New field ID

        Raises:
            OperaPhenixXmlContentError: If field_id is not found in the mapping
        """
        if mapping is None:
            mapping = self.get_field_id_mapping()

        if field_id not in mapping:
            raise OperaPhenixXmlContentError(f"Field ID {field_id} not found in remapping dictionary")
        return mapping[field_id]

542 if field_id not in mapping: 

543 raise OperaPhenixXmlContentError(f"Field ID {field_id} not found in remapping dictionary") 

544 return mapping[field_id]