Coverage for openhcs/microscopes/imagexpress.py: 70.5%

285 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2ImageXpress microscope implementations for openhcs. 

3 

4This module provides concrete implementations of FilenameParser and MetadataHandler 

5for ImageXpress microscopes. 

6""" 

7 

8import logging 

9import os 

10import re 

11from pathlib import Path 

12from typing import Any, Dict, List, Optional, Tuple, Union, Type 

13 

14import tifffile 

15 

16from openhcs.constants.constants import Backend 

17from openhcs.io.exceptions import MetadataNotFoundError 

18from openhcs.io.filemanager import FileManager 

19from openhcs.io.metadata_writer import AtomicMetadataWriter 

20from openhcs.microscopes.microscope_base import MicroscopeHandler 

21from openhcs.microscopes.microscope_interfaces import (FilenameParser, 

22 MetadataHandler) 

23 

24logger = logging.getLogger(__name__) 

25 

class ImageXpressHandler(MicroscopeHandler):
    """
    MicroscopeHandler implementation for Molecular Devices ImageXpress systems.

    This handler binds the ImageXpress filename parser and metadata handler,
    enforcing semantic alignment between file layout parsing and metadata resolution.

    TimePoint_N / ZStep_M folders are never physically moved by the virtual
    mapping code; instead a ``virtual name -> real plate-relative path`` dict is
    built and persisted to ``openhcs_metadata.json`` at the plate root.
    """

    # Explicit microscope type for proper registration
    _microscope_type = 'imagexpress'

    # Class attribute for automatic metadata handler registration (set after class definition)
    _metadata_handler_class = None

    def __init__(self, filemanager: FileManager, pattern_format: Optional[str] = None):
        """
        Create the parser and metadata handler and hand both to the base class.

        Args:
            filemanager: FileManager instance passed to the parser and metadata handler
            pattern_format: Optional pattern format forwarded to the filename parser
        """
        # Initialize parser with filemanager, respecting its interface
        self.parser = ImageXpressFilenameParser(filemanager, pattern_format)
        self.metadata_handler = ImageXpressMetadataHandler(filemanager)
        super().__init__(parser=self.parser, metadata_handler=self.metadata_handler)

    @property
    def root_dir(self) -> str:
        """
        Root directory for ImageXpress virtual workspace preparation.

        Returns "." (plate root) because ImageXpress TimePoint/ZStep folders
        are flattened starting from the plate root, and virtual paths have no prefix.
        """
        return "."

    @property
    def microscope_type(self) -> str:
        """Microscope type identifier (for interface enforcement only)."""
        return 'imagexpress'

    @property
    def metadata_handler_class(self) -> Type[MetadataHandler]:
        """Metadata handler class (for interface enforcement only)."""
        return ImageXpressMetadataHandler

    @property
    def compatible_backends(self) -> List[Backend]:
        """
        ImageXpress is compatible with DISK backend only.

        Legacy microscope format with standard file operations.
        """
        return [Backend.DISK]

    # Uses default workspace initialization from base class

    def _build_virtual_mapping(self, plate_path: Path, filemanager: FileManager) -> Path:
        """
        Build ImageXpress virtual workspace mapping using plate-relative paths.

        Flattens TimePoint and Z-step folder structures virtually by building a
        mapping dict, then merges that dict into ``openhcs_metadata.json`` under
        the ``root_dir`` key.

        Args:
            plate_path: Path to plate directory
            filemanager: FileManager instance for file operations

        Returns:
            Path to image directory (the plate root itself)
        """
        plate_path = Path(plate_path)  # Ensure Path object

        logger.info(f"🔄 BUILDING VIRTUAL MAPPING: ImageXpress folder flattening for {plate_path}")

        # Initialize mapping dict (PLATE-RELATIVE paths)
        workspace_mapping = {}

        # Flatten TimePoint and ZStep folders virtually (starting from plate root)
        self._flatten_timepoints(plate_path, filemanager, workspace_mapping, plate_path)
        self._flatten_zsteps(plate_path, filemanager, workspace_mapping, plate_path)

        logger.info(f"Built {len(workspace_mapping)} virtual path mappings for ImageXpress")

        # Save virtual workspace mapping to metadata using root_dir as subdirectory key
        metadata_path = plate_path / "openhcs_metadata.json"
        writer = AtomicMetadataWriter()

        writer.merge_subdirectory_metadata(metadata_path, {
            self.root_dir: {
                "workspace_mapping": workspace_mapping,  # Plate-relative paths
                "available_backends": {"disk": True, "virtual_workspace": True}
            }
        })

        logger.info(f"✅ Saved virtual workspace mapping to {metadata_path}")

        # Return the image directory
        return plate_path

    @staticmethod
    def _list_subdirs(directory: Path, fm: FileManager) -> List[Path]:
        """Return the entries of ``directory`` (as listed by ``fm``) that are directories on disk."""
        base = Path(directory)
        entries = fm.list_dir(directory, Backend.DISK.value)
        return [base / entry for entry in entries if (base / entry).is_dir()]

    def _flatten_zsteps(self, directory: Path, fm: FileManager, mapping_dict: Dict[str, str],
                        plate_path: Path, extra_components: Optional[Dict[str, Any]] = None):
        """
        Process Z-step folders virtually by building plate-relative mapping dict.

        Args:
            directory: Path to directory that might contain Z-step folders
            fm: FileManager instance for file operations
            mapping_dict: Dict to populate with virtual → real mappings
            plate_path: Plate root path for computing relative paths
            extra_components: Optional additional component values (e.g. the
                timepoint of the enclosing TimePoint folder) applied to every
                file before the z_index is set
        """
        zstep_pattern = re.compile(r"ZStep[_-]?(\d+)", re.IGNORECASE)

        # Find and process Z-step folders
        self._flatten_indexed_folders(
            directory=directory,
            fm=fm,
            folder_pattern=zstep_pattern,
            component_name='z_index',
            folder_type="ZStep",
            mapping_dict=mapping_dict,
            plate_path=plate_path,
            extra_components=extra_components
        )

    def _process_files_in_directory(self, directory: Path, fm: FileManager):
        """
        Process files directly in a directory to ensure complete metadata.

        This handles files that are not in Z-step folders but may be missing
        channel or z-index information. Similar to how Z-step processing adds
        z_index, this adds default values for missing components.

        Unlike the virtual flattening helpers, this method renames files on
        disk through ``fm.move``.

        Args:
            directory: Path to directory containing image files
            fm: FileManager instance for file operations

        Raises:
            FileExistsError: If the rename target already exists (logged, then re-raised)
            Exception: Any other error raised by ``fm.move`` (logged, then re-raised)
        """
        # List all image files in the directory
        img_files = fm.list_files(directory, Backend.DISK.value)

        for img_file in img_files:
            # Skip if not a file
            if not fm.is_file(img_file, Backend.DISK.value):
                continue

            # Get the filename
            img_file_name = img_file.name if isinstance(img_file, Path) else os.path.basename(str(img_file))

            # Parse the original filename to extract components
            components = self.parser.parse_filename(img_file_name)

            if not components:
                continue

            # Check if we need to add missing metadata
            needs_rebuild = False

            # Add default channel if missing (like we do for z_index in Z-step processing)
            if components['channel'] is None:
                components['channel'] = 1
                needs_rebuild = True
                logger.debug("Added default channel=1 to file without channel info: %s", img_file_name)

            # Add default z_index if missing (for 2D images)
            if components['z_index'] is None:
                components['z_index'] = 1
                needs_rebuild = True
                logger.debug("Added default z_index=1 to file without z_index info: %s", img_file_name)

            # Only rebuild filename if we added missing components
            if not needs_rebuild:
                continue

            # Construct new filename with complete metadata
            new_name = self.parser.construct_filename(**components)

            # Only rename if the filename actually changed
            if new_name == img_file_name:
                continue

            new_path = directory / new_name

            try:
                # Pass the backend parameter as required by Clause 306
                # Use replace_symlinks=True to allow overwriting existing symlinks
                fm.move(img_file, new_path, Backend.DISK.value, replace_symlinks=True)
                logger.debug("Rebuilt filename with complete metadata: %s -> %s", img_file_name, new_name)
            except FileExistsError as e:
                logger.error("Cannot rename %s to %s: %s", img_file, new_path, e)
                raise
            except Exception as e:
                logger.error("Error renaming %s to %s: %s", img_file, new_path, e)
                raise

    def _flatten_timepoints(self, directory: Path, fm: FileManager, mapping_dict: Dict[str, str], plate_path: Path):
        """
        Process TimePoint folders virtually by building plate-relative mapping dict.

        Z-step folders nested inside a TimePoint folder are flattened first, and
        the enclosing folder's timepoint index is carried into their virtual
        filenames. Without that, files from different timepoints would all fall
        back to the default timepoint and overwrite each other in the mapping.

        Args:
            directory: Path to directory that might contain TimePoint folders
            fm: FileManager instance for file operations
            mapping_dict: Dict to populate with virtual → real mappings
            plate_path: Plate root path for computing relative paths
        """
        timepoint_pattern = re.compile(r"TimePoint[_-]?(\d+)", re.IGNORECASE)

        # First flatten Z-steps within each timepoint folder (if they exist)
        for subdir in self._list_subdirs(directory, fm):
            tp_match = timepoint_pattern.search(subdir.name)
            if tp_match:
                self._flatten_zsteps(
                    subdir, fm, mapping_dict, plate_path,
                    extra_components={'timepoint': int(tp_match.group(1))}
                )

        # Then flatten timepoint folders themselves
        self._flatten_indexed_folders(
            directory=directory,
            fm=fm,
            folder_pattern=timepoint_pattern,
            component_name='timepoint',
            folder_type="TimePoint",
            mapping_dict=mapping_dict,
            plate_path=plate_path
        )

    def _flatten_indexed_folders(self, directory: Path, fm: FileManager,
                                 folder_pattern: re.Pattern, component_name: str,
                                 folder_type: str, mapping_dict: Dict[str, str], plate_path: Path,
                                 extra_components: Optional[Dict[str, Any]] = None):
        """
        Generic helper to flatten indexed folders virtually (TimePoint_N, ZStep_M, etc.).

        Builds plate-relative mapping dict instead of moving files.

        Args:
            directory: Path to directory that might contain indexed folders
            fm: FileManager instance for file operations
            folder_pattern: Regex pattern to match folder names (must have one capture group for index)
            component_name: Component to update in metadata (e.g., 'z_index', 'timepoint')
            folder_type: Human-readable folder type name (for logging)
            mapping_dict: Dict to populate with virtual → real mappings
            plate_path: Plate root path for computing relative paths
            extra_components: Optional component values merged into each file's
                parsed metadata before ``component_name`` is set; the folder
                index always wins for ``component_name``
        """
        # Find indexed folders among the subdirectories
        indexed_folders = []
        for d in self._list_subdirs(directory, fm):
            match = folder_pattern.search(d.name)
            if match:
                indexed_folders.append((int(match.group(1)), d))

        if not indexed_folders:
            return

        # Sort by index
        indexed_folders.sort(key=lambda x: x[0])

        logger.info(f"Found {len(indexed_folders)} {folder_type} folders. Building virtual mapping...")

        # Process each folder
        for index, folder in indexed_folders:
            logger.debug(f"Processing {folder.name} ({folder_type}={index})")

            # List all files in the folder
            img_files = fm.list_files(str(folder), Backend.DISK.value)

            for img_file in img_files:
                if not fm.is_file(img_file, Backend.DISK.value):
                    continue

                filename = Path(img_file).name

                # Parse existing filename; unparseable files are left out of the mapping
                metadata = self.parser.parse_filename(filename)
                if not metadata:
                    continue

                # Apply inherited components first, then the folder's own index
                if extra_components:
                    metadata.update(extra_components)
                metadata[component_name] = index

                # Reconstruct filename
                new_filename = self.parser.construct_filename(**metadata)

                # Build PLATE-RELATIVE virtual flattened path (at plate root, not in subdirectory)
                # This makes images appear at plate root in virtual workspace
                virtual_relative = new_filename

                # Build PLATE-RELATIVE real path (in subfolder)
                real_relative = Path(img_file).relative_to(plate_path).as_posix()

                # Add to mapping (both plate-relative)
                mapping_dict[virtual_relative] = real_relative
                logger.debug(" Mapped: %s -> %s", virtual_relative, real_relative)

313 

314 

315 

316 

class ImageXpressFilenameParser(FilenameParser):
    """
    Filename parser for ImageXpress microscopes.

    Understands names built from a well ID followed by optional
    site/channel/z/timepoint tokens, for example:
    - A01_s001_w1.tif
    - A01_s1_w1_z1.tif
    """

    # Matches: optional prefix, well, then optional _s/_w/_z/_t tokens (digits or a
    # {placeholder}), an optional free-form suffix and an optional extension.
    # The suffix allows result files such as A01_s001_w1_z001_t001_cell_counts_step7.json
    _pattern = re.compile(
        r'(?:.*?_)?([A-Z]\d+)'
        r'(?:_s(\d+|\{[^\}]*\}))?'
        r'(?:_w(\d+|\{[^\}]*\}))?'
        r'(?:_z(\d+|\{[^\}]*\}))?'
        r'(?:_t(\d+|\{[^\}]*\}))?'
        r'(?:_.*?)?(\.\w+)?$'
    )

    def __init__(self, filemanager=None, pattern_format=None):
        """
        Set up the parser.

        Args:
            filemanager: FileManager instance (stored only; kept for interface compatibility)
            pattern_format: Optional pattern format (stored only; kept for interface compatibility)
        """
        super().__init__()  # Initialize the generic parser interface

        # Neither value is consulted by this parser; both are kept as attributes
        self.filemanager = filemanager
        self.pattern_format = pattern_format

    @classmethod
    def can_parse(cls, filename: Union[str, Any]) -> bool:
        """
        Report whether the filename matches the ImageXpress pattern.

        Args:
            filename: Filename to check (str or VirtualPath)

        Returns:
            bool: True when the basename matches, False otherwise
        """
        # 🔒 Clause 17 — VFS Boundary Method
        # Pure string operation: take the basename via Path.name, then match
        leaf = Path(str(filename)).name
        return bool(cls._pattern.match(leaf))

    # 🔒 Clause 17 — VFS Boundary Method
    # String-only operation (no file I/O) needed for filename parsing at runtime.
    def parse_filename(self, filename: Union[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Split an ImageXpress filename into its components, including extension.

        Args:
            filename: Filename to parse (str or VirtualPath)

        Returns:
            dict or None: Components keyed by 'well', 'site', 'channel',
            'z_index', 'timepoint' and 'extension', or None when the name
            does not match the pattern
        """
        leaf = Path(str(filename)).name
        found = self._pattern.match(leaf)

        if not found:
            logger.debug("Could not parse ImageXpress filename: %s", filename)
            return None

        def _as_int(token):
            # Missing tokens and {placeholder} tokens both become None
            if not token or '{' in token:
                return None
            return int(token)

        well, raw_site, raw_channel, raw_z, raw_t, ext = found.groups()

        return {
            'well': well,
            'site': _as_int(raw_site),
            'channel': _as_int(raw_channel),
            'z_index': _as_int(raw_z),
            'timepoint': _as_int(raw_t),
            'extension': ext or '.tif',  # Default when no extension was captured
        }

    def extract_component_coordinates(self, component_value: str) -> Tuple[str, str]:
        """
        Split a component identifier (typically a well) into row and column.

        Args:
            component_value (str): Component identifier (e.g., 'A01', 'C04')

        Returns:
            Tuple[str, str]: (row, column), e.g. ('A', '01') or ('C', '04')

        Raises:
            ValueError: If component format is invalid
        """
        if not component_value or len(component_value) < 2:
            raise ValueError(f"Invalid component format: {component_value}")

        # ImageXpress format: one row letter followed by the column digits
        row, col = component_value[0], component_value[1:]

        if not (row.isalpha() and col.isdigit()):
            raise ValueError(f"Invalid ImageXpress component format: {component_value}. Expected format like 'A01', 'C04'")

        return row, col

    def construct_filename(self, extension: str = '.tif', site_padding: int = 3, z_padding: int = 3, timepoint_padding: int = 3, **component_values) -> str:
        """
        Assemble an ImageXpress filename from component values.

        Components arrive as keyword arguments so the method fits the generic
        parser interface.

        Args:
            extension (str, optional): File extension (default: '.tif')
            site_padding (int, optional): Width to pad site numbers to (default: 3)
            z_padding (int, optional): Width to pad Z-index numbers to (default: 3)
            timepoint_padding (int, optional): Width to pad timepoint numbers to (default: 3)
            **component_values: Component values as keyword arguments.
                               Expected keys: well, site, channel, z_index, timepoint

        Returns:
            str: Constructed filename

        Raises:
            ValueError: If no well is supplied
        """
        well = component_values.get('well')
        if not well:
            raise ValueError("Well ID cannot be empty or None.")

        def _token(prefix, value, width):
            # Strings (e.g. placeholders) are emitted verbatim; numbers are zero-padded
            if isinstance(value, str):
                return f"_{prefix}{value}"
            return f"_{prefix}{value:0{width}d}"

        pieces = [well]

        site = component_values.get('site')
        if site is not None:
            pieces.append(_token('s', site, site_padding))

        channel = component_values.get('channel')
        if channel is not None:
            pieces.append(f"_w{channel}")

        z_index = component_values.get('z_index')
        if z_index is not None:
            pieces.append(_token('z', z_index, z_padding))

        # The timepoint token is always present; a missing value defaults to 1
        timepoint = component_values.get('timepoint')
        if timepoint is None:
            timepoint = 1
        pieces.append(_token('t', timepoint, timepoint_padding))

        stem = "".join(pieces)
        return f"{stem}{extension}"

489 

490 

class ImageXpressMetadataHandler(MetadataHandler):
    """
    Metadata handler for ImageXpress microscopes.

    Handles finding and parsing HTD files for ImageXpress microscopes.
    Inherits fallback values from MetadataHandler ABC.
    """

    # Encodings tried, in order, when reading an HTD file. 'latin-1' maps every
    # byte to a character, so it always succeeds and must stay last.
    _HTD_ENCODINGS = ('utf-8', 'windows-1252', 'latin-1')

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata handler.

        Args:
            filemanager: FileManager instance for file operations.
        """
        super().__init__()  # Call parent's __init__ without parameters
        self.filemanager = filemanager  # Store filemanager as an instance attribute

    def _read_htd_content(self, htd_file: Union[str, Path]) -> Optional[str]:
        """
        Read an HTD file as text, trying each encoding in ``_HTD_ENCODINGS``.

        Args:
            htd_file: Path to the HTD file

        Returns:
            The decoded file content, or None if every encoding failed to decode
        """
        for encoding in self._HTD_ENCODINGS:
            try:
                with open(htd_file, 'r', encoding=encoding) as f:
                    content = f.read()
            except UnicodeDecodeError:
                logger.debug("Failed to read HTD file with encoding: %s", encoding)
                continue
            logger.debug("Successfully read HTD file with encoding: %s", encoding)
            return content
        return None

    def find_metadata_file(self, plate_path: Union[str, Path],
                           context: Optional['ProcessingContext'] = None) -> Path:
        """
        Find the HTD file for an ImageXpress plate.

        A file whose name contains "plate" (case-insensitive) is preferred;
        otherwise the first HTD file listed is returned.

        Args:
            plate_path: Path to the plate folder
            context: Optional ProcessingContext (not used)

        Returns:
            Path to the HTD file

        Raises:
            MetadataNotFoundError: If no HTD file is found
            FileNotFoundError: If plate_path does not exist
            TypeError: If plate_path is not a valid path type
        """
        # Ensure plate_path is a Path object
        if isinstance(plate_path, str):
            plate_path = Path(plate_path)
        elif not isinstance(plate_path, Path):
            raise TypeError(f"Expected str or Path, got {type(plate_path).__name__}")

        # Ensure the path exists
        if not plate_path.exists():
            raise FileNotFoundError(f"Plate path does not exist: {plate_path}")

        # Use filemanager to list files
        # Pass the backend parameter as required by Clause 306 (Backend Positional Parameters)
        htd_files = self.filemanager.list_files(plate_path, Backend.DISK.value, pattern="*.HTD")
        if htd_files:
            # Entries may come back as strings; normalise those to Path
            candidates = [Path(f) if isinstance(f, str) else f for f in htd_files]

            for candidate in candidates:
                if 'plate' in candidate.name.lower():
                    return candidate

            # No "plate" file: return the first file
            return candidates[0]

        # 🔒 Clause 65 — No Fallback Logic
        # Fail loudly if no HTD file is found
        raise MetadataNotFoundError("No HTD or metadata file found. ImageXpressHandler requires declared metadata.")

    def get_grid_dimensions(self, plate_path: Union[str, Path],
                            context: Optional['ProcessingContext'] = None) -> Tuple[int, int]:
        """
        Get grid dimensions for stitching from HTD file.

        Args:
            plate_path: Path to the plate folder
            context: Optional ProcessingContext (not used)

        Returns:
            (grid_rows, grid_cols) - UPDATED: Now returns (rows, cols) for MIST compatibility

        Raises:
            MetadataNotFoundError: If no HTD file is found
            ValueError: If grid dimensions cannot be determined from metadata
                (any error while reading or parsing is re-raised as ValueError)
        """
        htd_file = self.find_metadata_file(plate_path, context)

        # Parse HTD file
        try:
            # HTD files are plain text, but may use different encodings
            htd_content = self._read_htd_content(htd_file)
            if htd_content is None:
                raise ValueError(
                    f"Could not read HTD file with any supported encoding: {list(self._HTD_ENCODINGS)}"
                )

            # Extract grid dimensions - try multiple formats
            # First try the new format with "XSites" and "YSites"
            cols_match = re.search(r'"XSites", (\d+)', htd_content)
            rows_match = re.search(r'"YSites", (\d+)', htd_content)

            # If not found, try the old format with SiteColumns and SiteRows
            if not (cols_match and rows_match):
                cols_match = re.search(r'SiteColumns=(\d+)', htd_content)
                rows_match = re.search(r'SiteRows=(\d+)', htd_content)

            if cols_match and rows_match:
                grid_size_x = int(cols_match.group(1))  # cols from metadata
                grid_size_y = int(rows_match.group(1))  # rows from metadata
                logger.info("Using grid dimensions from HTD file: %dx%d (cols x rows)", grid_size_x, grid_size_y)
                # FIXED: Return (rows, cols) for MIST compatibility instead of (cols, rows)
                return grid_size_y, grid_size_x

            # 🔒 Clause 65 — No Fallback Logic
            # Fail loudly if grid dimensions cannot be determined
            raise ValueError(f"Could not find grid dimensions in HTD file {htd_file}")
        except Exception as e:
            # 🔒 Clause 65 — No Fallback Logic
            # Fail loudly on any error; chain the cause so the original traceback is kept
            raise ValueError(f"Error parsing HTD file {htd_file}: {e}") from e

    def get_pixel_size(self, plate_path: Union[str, Path],
                       context: Optional['ProcessingContext'] = None) -> float:
        """
        Gets pixel size by reading TIFF tags from an image file via FileManager.

        The first TIFF found under the plate (searched recursively) is opened
        and its ImageDescription tag is scanned for a spatial calibration value.

        Args:
            plate_path: Path to the plate folder
            context: Optional ProcessingContext (not used)

        Returns:
            Pixel size in micrometers

        Raises:
            ValueError: If pixel size cannot be determined from metadata
                (any error along the way is re-raised as ValueError)
        """
        # This implementation requires:
        # 1. The backend used by filemanager supports listing image files.
        # 2. The backend allows direct reading of TIFF file tags.
        # 3. Images are in TIFF format.
        try:
            # Use filemanager to list potential image files
            # Pass the backend parameter as required by Clause 306 (Backend Positional Parameters)
            image_files = self.filemanager.list_image_files(plate_path, Backend.DISK.value, extensions={'.tif', '.tiff'}, recursive=True)
            if not image_files:
                # 🔒 Clause 65 — No Fallback Logic
                # Fail loudly if no image files are found
                raise ValueError(f"No TIFF images found in {plate_path} to read pixel size")

            # Attempt to read tags from the first found image
            first_image_path = image_files[0]

            # Convert to Path if it's a string
            if isinstance(first_image_path, str):
                first_image_path = Path(first_image_path)
            elif not isinstance(first_image_path, Path):
                raise TypeError(f"Expected str or Path, got {type(first_image_path).__name__}")

            # Calibration patterns, tried in order: the XML-style attribute first,
            # then the plain-text alternative used by some formats
            calibration_patterns = (
                r'id="spatial-calibration-x"[^>]*value="([0-9.]+)"',
                r'Spatial Calibration: ([0-9.]+) [uµ]m',
            )

            # Use the path with tifffile
            with tifffile.TiffFile(first_image_path) as tif:
                # Try to get ImageDescription tag
                if tif.pages[0].tags.get('ImageDescription'):
                    desc = tif.pages[0].tags['ImageDescription'].value
                    for pattern in calibration_patterns:
                        match = re.search(pattern, desc)
                        if match:
                            pixel_size = float(match.group(1))
                            logger.info("Found pixel size metadata %.3f in %s",
                                        pixel_size, first_image_path)
                            return pixel_size

            # 🔒 Clause 65 — No Fallback Logic
            # Fail loudly if pixel size cannot be determined
            raise ValueError(f"Could not find pixel size in image metadata for {plate_path}")

        except Exception as e:
            # 🔒 Clause 65 — No Fallback Logic
            # Fail loudly on any error; chain the cause so the original traceback is kept
            raise ValueError(f"Error getting pixel size from {plate_path}: {e}") from e

    def get_channel_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get channel key->name mapping from ImageXpress HTD file.

        This lookup is best-effort: any failure is logged at debug level and
        reported as None rather than raised.

        Args:
            plate_path: Path to the plate folder (str or Path)

        Returns:
            Dict mapping channel IDs to channel names from metadata, or None
            when no names could be extracted
            Example: {"1": "TL-20", "2": "DAPI", "3": "FITC", "4": "CY5"}
        """
        try:
            # Find and read the HTD file
            htd_file = self.find_metadata_file(plate_path)
            htd_content = self._read_htd_content(htd_file)

            if htd_content is None:
                logger.debug("Could not read HTD file with any supported encoding")
                return None

            # ImageXpress stores channel names as WaveName1, WaveName2, etc.
            wave_pattern = re.compile(r'"WaveName(\d+)", "([^"]*)"')

            # Only keep non-empty wave names
            channel_mapping = {
                wave_num: wave_name
                for wave_num, wave_name in wave_pattern.findall(htd_content)
                if wave_name
            }

            return channel_mapping if channel_mapping else None

        except Exception as e:
            logger.debug("Could not extract channel names from ImageXpress metadata: %s", e)
            return None

    def get_well_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get well key→name mapping from ImageXpress metadata.

        Args:
            plate_path: Path to the plate folder (str or Path)

        Returns:
            None - ImageXpress doesn't provide rich well names in metadata
        """
        return None

    def get_site_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get site key→name mapping from ImageXpress metadata.

        Args:
            plate_path: Path to the plate folder (str or Path)

        Returns:
            None - ImageXpress doesn't provide rich site names in metadata
        """
        return None

    def get_z_index_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get z_index key→name mapping from ImageXpress metadata.

        Args:
            plate_path: Path to the plate folder (str or Path)

        Returns:
            None - ImageXpress doesn't provide rich z_index names in metadata
        """
        return None

    def get_timepoint_values(self, plate_path: Union[str, Path]) -> Optional[Dict[str, Optional[str]]]:
        """
        Get timepoint key→name mapping from ImageXpress metadata.

        Args:
            plate_path: Path to the plate folder (str or Path)

        Returns:
            None - ImageXpress doesn't provide rich timepoint names in metadata
        """
        return None

    # Uses default get_image_files() implementation from MetadataHandler ABC

782 

783 

784 

785 

# Set metadata handler class after class definition for automatic registration.
# ImageXpressHandler declares _metadata_handler_class = None as a placeholder because
# ImageXpressMetadataHandler is defined further down the module; the real class is
# bound here, once both classes exist.
from openhcs.microscopes.microscope_base import register_metadata_handler
ImageXpressHandler._metadata_handler_class = ImageXpressMetadataHandler
# Runs at import time: registers the handler/metadata-handler pair with microscope_base.
register_metadata_handler(ImageXpressHandler, ImageXpressMetadataHandler)