# Coverage for openhcs/microscopes/openhcs.py: 71.0% (392 statements)
1"""
2OpenHCS microscope handler implementation for openhcs.
4This module provides the OpenHCSMicroscopeHandler, which reads plates
5that have been pre-processed and standardized into the OpenHCS format.
6The metadata for such plates is defined in an 'openhcs_metadata.json' file.
7"""
9import json
10import logging
11from dataclasses import dataclass, asdict
12from pathlib import Path
13from typing import Any, Dict, List, Optional, Tuple, Union, Type
15from openhcs.constants.constants import Backend, GroupBy, AllComponents
16from openhcs.io.exceptions import MetadataNotFoundError
17from openhcs.io.filemanager import FileManager
18from openhcs.io.metadata_writer import AtomicMetadataWriter, MetadataWriteError, get_metadata_path, METADATA_CONFIG
19from openhcs.microscopes.microscope_interfaces import MetadataHandler
20logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class OpenHCSMetadataFields:
    """Centralized constants for OpenHCS metadata field names."""

    # Core metadata structure - use centralized constants
    SUBDIRECTORIES: str = METADATA_CONFIG.SUBDIRECTORIES_KEY
    IMAGE_FILES: str = "image_files"
    AVAILABLE_BACKENDS: str = METADATA_CONFIG.AVAILABLE_BACKENDS_KEY

    # Required metadata fields
    GRID_DIMENSIONS: str = "grid_dimensions"
    PIXEL_SIZE: str = "pixel_size"
    SOURCE_FILENAME_PARSER_NAME: str = "source_filename_parser_name"
    MICROSCOPE_HANDLER_NAME: str = "microscope_handler_name"

    # Optional metadata fields
    CHANNELS: str = "channels"
    WELLS: str = "wells"
    SITES: str = "sites"
    Z_INDEXES: str = "z_indexes"
    TIMEPOINTS: str = "timepoints"
    OBJECTIVES: str = "objectives"
    ACQUISITION_DATETIME: str = "acquisition_datetime"
    PLATE_NAME: str = "plate_name"

    # Default values
    DEFAULT_SUBDIRECTORY: str = "."
    DEFAULT_SUBDIRECTORY_LEGACY: str = "images"

    # Microscope type identifier
    MICROSCOPE_TYPE: str = "openhcsdata"


# Global instance for easy access
FIELDS = OpenHCSMetadataFields()
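
# Illustrative sketch of the openhcs_metadata.json layout these field names
# describe (the subdirectory name and all values below are hypothetical):
#
# {
#   "subdirectories": {
#     "images": {
#       "main": true,
#       "microscope_handler_name": "imagexpress",
#       "source_filename_parser_name": "ImageXpressFilenameParser",
#       "grid_dimensions": [3, 3],
#       "pixel_size": 0.65,
#       "image_files": ["images/A01_s1_w1.tif"],
#       "channels": {"1": "DAPI"},
#       "available_backends": {"disk": true}
#     }
#   }
# }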


def _get_available_filename_parsers():
    """
    Lazy import of filename parsers to avoid circular imports.

    Returns:
        Dict mapping parser class names to parser classes
    """
    # Import parsers only when needed to avoid circular imports
    from openhcs.microscopes.imagexpress import ImageXpressFilenameParser
    from openhcs.microscopes.opera_phenix import OperaPhenixFilenameParser

    return {
        "ImageXpressFilenameParser": ImageXpressFilenameParser,
        "OperaPhenixFilenameParser": OperaPhenixFilenameParser,
        # Add other parsers to this dictionary as they are implemented/imported.
        # Example: "MyOtherParser": MyOtherParser,
    }
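
# Example (illustrative) of resolving a parser class from the name stored in
# metadata; "ImageXpressFilenameParser" here stands in for whatever name the
# plate's metadata actually records:
#
#   parsers = _get_available_filename_parsers()
#   ParserClass = parsers["ImageXpressFilenameParser"]
#   parser = ParserClass()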


class OpenHCSMetadataHandler(MetadataHandler):
    """
    Metadata handler for the OpenHCS pre-processed format.

    This handler reads metadata from an 'openhcs_metadata.json' file
    located in the root of the plate folder.
    """

    METADATA_FILENAME = METADATA_CONFIG.METADATA_FILENAME

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata handler.

        Args:
            filemanager: FileManager instance for file operations.
        """
        super().__init__()
        self.filemanager = filemanager
        self.atomic_writer = AtomicMetadataWriter()
        self._metadata_cache: Optional[Dict[str, Any]] = None
        self._plate_path_cache: Optional[Path] = None

    def _load_metadata(self, plate_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Loads the JSON metadata file if not already cached or if plate_path changed.

        Args:
            plate_path: Path to the plate folder.

        Returns:
            A dictionary containing the parsed JSON metadata.

        Raises:
            MetadataNotFoundError: If the metadata file cannot be found or parsed.
            FileNotFoundError: If plate_path does not exist.
        """
        current_path = Path(plate_path)
        if self._metadata_cache is not None and self._plate_path_cache == current_path:
            return self._metadata_cache

        metadata_file_path = self.find_metadata_file(current_path)
        if not self.filemanager.exists(str(metadata_file_path), Backend.DISK.value):
            raise MetadataNotFoundError(f"Metadata file '{self.METADATA_FILENAME}' not found in {plate_path}")

        try:
            content = self.filemanager.load(str(metadata_file_path), Backend.DISK.value)
            # Backend may return an already-parsed dict (the disk backend auto-parses JSON)
            if isinstance(content, dict):
                metadata_dict = content
            else:
                # Otherwise parse raw bytes/string
                metadata_dict = json.loads(content.decode('utf-8') if isinstance(content, bytes) else content)

            # Handle the subdirectory-keyed format
            if subdirs := metadata_dict.get(FIELDS.SUBDIRECTORIES):
                # Use the main subdirectory as base (marked with "main": true)
                main_subdir = next((data for data in subdirs.values() if data.get("main")), None)
                if not main_subdir:
                    # Fall back to the first subdirectory if none is marked as main
                    main_subdir = next(iter(subdirs.values()))

                base_metadata = main_subdir.copy()
                base_metadata[FIELDS.IMAGE_FILES] = [
                    file for subdir in subdirs.values()
                    for file in subdir.get(FIELDS.IMAGE_FILES, [])
                ]
                self._metadata_cache = base_metadata
            else:
                # Legacy format is not supported - point the user at the migration script
                raise MetadataNotFoundError(
                    f"Legacy metadata format detected in '{metadata_file_path}'. "
                    f"Please run the migration script: python scripts/migrate_legacy_metadata.py {current_path}"
                )

            self._plate_path_cache = current_path
            return self._metadata_cache

        except json.JSONDecodeError as e:
            raise MetadataNotFoundError(f"Error decoding JSON from '{metadata_file_path}': {e}") from e

    def determine_main_subdirectory(self, plate_path: Union[str, Path]) -> str:
        """Determine the main input subdirectory from metadata."""
        metadata_dict = self._load_metadata_dict(plate_path)
        subdirs = metadata_dict.get(FIELDS.SUBDIRECTORIES)

        # Legacy format is not supported - should have been caught by _load_metadata_dict
        if not subdirs:
            raise MetadataNotFoundError(f"No subdirectories found in metadata for {plate_path}")

        # Single subdirectory - use it
        if len(subdirs) == 1:
            return next(iter(subdirs.keys()))

        # Multiple subdirectories - find the one marked as main
        main_subdir = next((name for name, data in subdirs.items() if data.get("main")), None)
        if main_subdir:
            return main_subdir

        # Fallback hierarchy: legacy default name, then first available
        if FIELDS.DEFAULT_SUBDIRECTORY_LEGACY in subdirs:
            return FIELDS.DEFAULT_SUBDIRECTORY_LEGACY
        return next(iter(subdirs.keys()))
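
    # Example (illustrative) of the selection order above, with hypothetical
    # subdirectory names:
    #
    #   {"zarr": {}, "images": {"main": True}}  -> "images"  (explicit main)
    #   {"zarr": {}, "images": {}}              -> "images"  (legacy default name)
    #   {"out_a": {}, "out_b": {}}              -> "out_a"   (first available)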

    def _load_metadata_dict(self, plate_path: Union[str, Path]) -> Dict[str, Any]:
        """Load and parse the metadata JSON, failing loud on errors."""
        metadata_file_path = self.find_metadata_file(plate_path)
        if not self.filemanager.exists(str(metadata_file_path), Backend.DISK.value):
            raise MetadataNotFoundError(f"Metadata file '{self.METADATA_FILENAME}' not found in {plate_path}")

        try:
            content = self.filemanager.load(str(metadata_file_path), Backend.DISK.value)
            # Backend may return an already-parsed dict (the disk backend auto-parses JSON)
            if isinstance(content, dict):
                return content
            # Otherwise parse raw bytes/string
            return json.loads(content.decode('utf-8') if isinstance(content, bytes) else content)
        except json.JSONDecodeError as e:
            raise MetadataNotFoundError(f"Error decoding JSON from '{metadata_file_path}': {e}") from e

    def find_metadata_file(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Path]:
        """Find the OpenHCS JSON metadata file."""
        plate_p = Path(plate_path)
        if not self.filemanager.is_dir(str(plate_p), Backend.DISK.value):
            return None

        expected_file = plate_p / self.METADATA_FILENAME
        if self.filemanager.exists(str(expected_file), Backend.DISK.value):
            return expected_file

        # Fallback: recursive search
        try:
            if found_files := self.filemanager.find_file_recursive(plate_p, self.METADATA_FILENAME, Backend.DISK.value):
                if isinstance(found_files, list):
                    # Prioritize the root location, then the first match
                    return next((Path(f) for f in found_files if Path(f).parent == plate_p), Path(found_files[0]))
                return Path(found_files)
        except Exception as e:
            logger.error(f"Error searching for {self.METADATA_FILENAME} in {plate_path}: {e}")

        return None

    def get_grid_dimensions(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Tuple[int, int]:
        """Get grid dimensions from OpenHCS metadata."""
        dims = self._load_metadata(plate_path).get(FIELDS.GRID_DIMENSIONS)
        if not (isinstance(dims, list) and len(dims) == 2 and all(isinstance(d, int) for d in dims)):
            raise ValueError(f"'{FIELDS.GRID_DIMENSIONS}' must be a list of two integers in {self.METADATA_FILENAME}")
        return tuple(dims)

    def get_pixel_size(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> float:
        """Get pixel size from OpenHCS metadata."""
        pixel_size = self._load_metadata(plate_path).get(FIELDS.PIXEL_SIZE)
        if not isinstance(pixel_size, (float, int)):
            raise ValueError(f"'{FIELDS.PIXEL_SIZE}' must be a number in {self.METADATA_FILENAME}")
        return float(pixel_size)

    def get_source_filename_parser_name(self, plate_path: Union[str, Path]) -> str:
        """Get the source filename parser name from OpenHCS metadata."""
        parser_name = self._load_metadata(plate_path).get(FIELDS.SOURCE_FILENAME_PARSER_NAME)
        if not (isinstance(parser_name, str) and parser_name):
            raise ValueError(f"'{FIELDS.SOURCE_FILENAME_PARSER_NAME}' must be a non-empty string in {self.METADATA_FILENAME}")
        return parser_name

    # Uses the default get_image_files() implementation from the MetadataHandler ABC
    # (prefers workspace_mapping keys, falls back to the image_files list)

    # Optional metadata getters
    def _get_optional_metadata_dict(self, plate_path: Union[str, Path], key: str) -> Optional[Dict[str, str]]:
        """Helper to get optional dictionary metadata."""
        value = self._load_metadata(plate_path).get(key)
        return {str(k): str(v) for k, v in value.items()} if isinstance(value, dict) else None

    def get_channel_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.CHANNELS)

    def get_well_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.WELLS)

    def get_site_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.SITES)

    def get_z_index_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.Z_INDEXES)

    def get_timepoint_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Optional[str]]]:
        return self._get_optional_metadata_dict(plate_path, FIELDS.TIMEPOINTS)

    def get_objective_values(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[Dict[str, Any]]:
        """Get objective lens information if available."""
        return self._get_optional_metadata_dict(plate_path, FIELDS.OBJECTIVES)

    def get_plate_acquisition_datetime(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[str]:
        """Get the plate acquisition datetime if available."""
        return self._get_optional_metadata_str(plate_path, FIELDS.ACQUISITION_DATETIME)

    def get_plate_name(self, plate_path: Union[str, Path], context: Optional[Any] = None) -> Optional[str]:
        """Get the plate name if available."""
        return self._get_optional_metadata_str(plate_path, FIELDS.PLATE_NAME)

    def _get_optional_metadata_str(self, plate_path: Union[str, Path], field: str) -> Optional[str]:
        """Helper to get an optional string metadata field."""
        value = self._load_metadata(plate_path).get(field)
        return value if isinstance(value, str) and value else None

    def get_available_backends(self, input_dir: Union[str, Path]) -> Dict[str, bool]:
        """
        Get available storage backends for the input directory.

        This method resolves the plate root from the input directory,
        loads the OpenHCS metadata, and returns the available backends.

        Args:
            input_dir: Path to the input directory (may be the plate root or a subdirectory)

        Returns:
            Dictionary mapping backend names to availability (e.g., {"disk": True, "zarr": False})

        Raises:
            MetadataNotFoundError: If the metadata file cannot be found or parsed
        """
        # Resolve the plate root from the input directory
        plate_root = self._resolve_plate_root(input_dir)

        # Load metadata using the existing infrastructure
        metadata = self._load_metadata(plate_root)

        # Extract available backends, defaulting to an empty dict if not present
        available_backends = metadata.get(FIELDS.AVAILABLE_BACKENDS, {})

        if not isinstance(available_backends, dict):
            logger.warning(f"Invalid available_backends format in metadata: {available_backends}")
            return {}

        return available_backends
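
    # Example (illustrative), with a hypothetical plate layout where only the
    # disk backend has been written:
    #
    #   handler = OpenHCSMetadataHandler(filemanager)
    #   handler.get_available_backends("/plates/plate1/images")
    #   # -> {"disk": True, "zarr": False}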

    def _resolve_plate_root(self, input_dir: Union[str, Path]) -> Path:
        """
        Resolve the plate root directory from an input directory.

        The input directory may be the plate root itself or a subdirectory.
        This method walks up the directory tree to find the directory containing
        the OpenHCS metadata file.

        Args:
            input_dir: Path to resolve

        Returns:
            Path to the plate root directory

        Raises:
            MetadataNotFoundError: If no metadata file is found
        """
        current_path = Path(input_dir)

        # Walk up the directory tree looking for the metadata file
        for path in [current_path] + list(current_path.parents):
            metadata_file = path / self.METADATA_FILENAME
            if self.filemanager.exists(str(metadata_file), Backend.DISK.value):
                return path

        # If not found, raise an error
        raise MetadataNotFoundError(
            f"Could not find {self.METADATA_FILENAME} in {input_dir} or any parent directory"
        )
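
    # Example (illustrative): with the metadata file at
    # /plates/plate1/openhcs_metadata.json, resolving "/plates/plate1/images"
    # checks /plates/plate1/images first, then /plates/plate1 (match), and
    # returns Path("/plates/plate1").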

    def update_available_backends(self, plate_path: Union[str, Path], available_backends: Dict[str, bool]) -> None:
        """Update available storage backends in metadata and save to disk."""
        metadata_file_path = get_metadata_path(plate_path)

        try:
            self.atomic_writer.update_available_backends(metadata_file_path, available_backends)
            # Clear the cache to force a reload on next access
            self._metadata_cache = None
            self._plate_path_cache = None
            logger.info(f"Updated available backends to {available_backends} in {metadata_file_path}")
        except MetadataWriteError as e:
            raise ValueError(f"Failed to update available backends: {e}") from e


@dataclass(frozen=True)
class OpenHCSMetadata:
    """
    Declarative OpenHCS metadata structure.

    Fail-loud: all fields without defaults are required; no fallbacks.
    """

    microscope_handler_name: str
    source_filename_parser_name: str
    grid_dimensions: List[int]
    pixel_size: float
    image_files: List[str]
    channels: Optional[Dict[str, str]]
    wells: Optional[Dict[str, str]]
    sites: Optional[Dict[str, str]]
    z_indexes: Optional[Dict[str, str]]
    timepoints: Optional[Dict[str, str]]
    available_backends: Dict[str, bool]
    workspace_mapping: Optional[Dict[str, str]] = None  # Plate-relative virtual → real path mapping
    main: Optional[bool] = None  # Whether this subdirectory is the primary/input subdirectory
    results_dir: Optional[str] = None  # Sibling directory containing analysis results for this subdirectory
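
# Illustrative construction (all values hypothetical). frozen=True makes
# instances immutable, so updated copies go through dataclasses.replace():
#
#   meta = OpenHCSMetadata(
#       microscope_handler_name="imagexpress",
#       source_filename_parser_name="ImageXpressFilenameParser",
#       grid_dimensions=[3, 3],
#       pixel_size=0.65,
#       image_files=["images/A01_s1_w1.tif"],
#       channels={"1": "DAPI"}, wells=None, sites=None,
#       z_indexes=None, timepoints=None,
#       available_backends={"disk": True},
#   )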


@dataclass(frozen=True)
class SubdirectoryKeyedMetadata:
    """
    Subdirectory-keyed metadata structure for OpenHCS.

    Organizes metadata by subdirectory to prevent conflicts when multiple
    steps write to the same plate folder with different subdirectories.

    Structure: {subdirectory_name: OpenHCSMetadata}
    """

    subdirectories: Dict[str, OpenHCSMetadata]

    def get_subdirectory_metadata(self, sub_dir: str) -> Optional[OpenHCSMetadata]:
        """Get metadata for a specific subdirectory."""
        return self.subdirectories.get(sub_dir)

    def add_subdirectory_metadata(self, sub_dir: str, metadata: OpenHCSMetadata) -> 'SubdirectoryKeyedMetadata':
        """Add or update metadata for a subdirectory (immutable operation)."""
        new_subdirs = {**self.subdirectories, sub_dir: metadata}
        return SubdirectoryKeyedMetadata(subdirectories=new_subdirs)

    @classmethod
    def from_single_metadata(cls, sub_dir: str, metadata: OpenHCSMetadata) -> 'SubdirectoryKeyedMetadata':
        """Create from a single OpenHCSMetadata (migration helper)."""
        return cls(subdirectories={sub_dir: metadata})

    @classmethod
    def from_legacy_dict(cls, legacy_dict: Dict[str, Any], default_sub_dir: str = FIELDS.DEFAULT_SUBDIRECTORY_LEGACY) -> 'SubdirectoryKeyedMetadata':
        """Create from a legacy single-subdirectory metadata dict."""
        return cls.from_single_metadata(default_sub_dir, OpenHCSMetadata(**legacy_dict))
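
# Example (illustrative) of the immutable update pattern, reusing the
# hypothetical `meta` instance sketched above:
#
#   skm = SubdirectoryKeyedMetadata.from_single_metadata("images", meta)
#   skm2 = skm.add_subdirectory_metadata("zarr", meta)  # skm itself is unchanged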


class OpenHCSMetadataGenerator:
    """
    Generator for OpenHCS metadata files.

    Handles creation of openhcs_metadata.json files for processed plates,
    extracting information from the processing context and output directories.

    Design principle: generate metadata that accurately reflects what exists on
    disk after processing, not what was originally intended or what the source
    contained.
    """

    def __init__(self, filemanager: FileManager):
        """
        Initialize the metadata generator.

        Args:
            filemanager: FileManager instance for file operations
        """
        self.filemanager = filemanager
        self.atomic_writer = AtomicMetadataWriter()
        self.logger = logging.getLogger(__name__)

    def create_metadata(
        self,
        context: 'ProcessingContext',
        output_dir: str,
        write_backend: str,
        is_main: bool = False,
        plate_root: Optional[str] = None,
        sub_dir: Optional[str] = None,
        results_dir: Optional[str] = None,
        skip_if_complete: bool = False,
        allow_none_override: bool = False
    ) -> None:
        """Create or update the subdirectory-keyed OpenHCS metadata file.

        Args:
            context: ProcessingContext for the current pipeline run
            output_dir: Directory the step wrote its images to
            write_backend: Backend the images were written with (e.g., "disk")
            is_main: Whether this subdirectory is the primary/input subdirectory
            plate_root: Path to the plate root directory
            sub_dir: Subdirectory key under which to store the metadata
            results_dir: Sibling directory containing analysis results, if any
            skip_if_complete: If True, skip the update if metadata is already complete (has channels)
            allow_none_override: If True, None values override existing fields;
                if False (default), None values are filtered out to preserve existing fields
        """
        plate_root_path = Path(plate_root)
        metadata_path = get_metadata_path(plate_root_path)

        # Check whether metadata is already complete (if requested)
        if skip_if_complete and metadata_path.exists():
            with open(metadata_path, 'r') as f:
                existing = json.load(f)

            subdir_data = existing.get('subdirectories', {}).get(sub_dir, {})
            if subdir_data.get('channels'):
                self.logger.debug(f"Metadata for {sub_dir} already complete, skipping")
                return

        # Extract metadata from the current state
        current_metadata = self._extract_metadata_from_disk_state(context, output_dir, write_backend, is_main, sub_dir, results_dir)
        metadata_dict = asdict(current_metadata)

        # Filter out None values unless overriding with None is allowed
        if not allow_none_override:
            metadata_dict = {k: v for k, v in metadata_dict.items() if v is not None}

        self.atomic_writer.merge_subdirectory_metadata(metadata_path, {sub_dir: metadata_dict})
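
    # Example (illustrative) call after a step writes images; all paths and
    # names here are hypothetical:
    #
    #   generator.create_metadata(
    #       context, output_dir="/plates/plate1/images", write_backend="disk",
    #       is_main=True, plate_root="/plates/plate1", sub_dir="images",
    #   )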

    def _extract_metadata_from_disk_state(self, context: 'ProcessingContext', output_dir: str, write_backend: str, is_main: bool, sub_dir: str, results_dir: Optional[str] = None) -> OpenHCSMetadata:
        """Extract metadata reflecting the current disk state after processing.

        CRITICAL: Extracts component metadata (channels, wells, sites, z_indexes, timepoints)
        by parsing actual filenames in output_dir, NOT from the original input metadata cache.
        This ensures the metadata accurately reflects what was actually written, not what was in the input.

        For example, if processing filters to only channels 1-2, the metadata will show only those channels.
        """
        handler = context.microscope_handler

        # metadata_cache is always set by create_context() - fail if not present
        if not hasattr(context, 'metadata_cache'):
            raise RuntimeError("ProcessingContext missing metadata_cache - must be created via create_context()")

        actual_files = self.filemanager.list_image_files(output_dir, write_backend)
        relative_files = [f"{sub_dir}/{Path(f).name}" for f in actual_files]

        # Calculate the relative results directory path (relative to the plate root)
        # Example: "images_results" for the images subdirectory
        relative_results_dir = None
        if results_dir:
            results_path = Path(results_dir)
            relative_results_dir = results_path.name  # Just the directory name, not the full path

        # Extract grid_dimensions and pixel_size from the input metadata
        grid_dimensions = handler.metadata_handler._get_with_fallback('get_grid_dimensions', context.input_dir)
        pixel_size = handler.metadata_handler._get_with_fallback('get_pixel_size', context.input_dir)

        # If grid_dimensions is None (fallback), try to get it from existing metadata
        if grid_dimensions is None:
            try:
                plate_root = Path(context.plate_path)
                metadata_path = get_metadata_path(plate_root)
                if metadata_path.exists():
                    with open(metadata_path, 'r') as f:
                        existing_metadata = json.load(f)
                    # Try to get grid_dimensions from any existing subdirectory
                    for existing_sub_dir, sub_metadata in existing_metadata.get('subdirectories', {}).items():
                        if sub_metadata.get('grid_dimensions'):
                            grid_dimensions = sub_metadata['grid_dimensions']
                            self.logger.debug(f"Preserved grid_dimensions from existing subdirectory {existing_sub_dir}: {grid_dimensions}")
                            break
            except Exception as e:
                self.logger.debug(f"Could not retrieve grid_dimensions from existing metadata: {e}")

        # CRITICAL: Extract component metadata from the actual output files by parsing filenames.
        # This ensures the metadata reflects what was actually written, not the original input.
        component_metadata = self._extract_component_metadata_from_files(actual_files, handler.parser)

        # Merge extracted component keys with display names from the original metadata cache.
        # This preserves display names (e.g., "tl-20") while using the actual output components.
        merged_metadata = self._merge_component_metadata(component_metadata, context.metadata_cache)

        # CRITICAL: Use the AllComponents enum for cache lookups (the cache is keyed by AllComponents).
        # GroupBy and AllComponents have the same values but different hashes, so dict.get() fails with GroupBy.
        return OpenHCSMetadata(
            microscope_handler_name=handler.microscope_type,
            source_filename_parser_name=handler.parser.__class__.__name__,
            grid_dimensions=grid_dimensions,
            pixel_size=pixel_size,
            image_files=relative_files,
            channels=merged_metadata.get(AllComponents.CHANNEL),
            wells=merged_metadata.get(AllComponents.WELL),
            sites=merged_metadata.get(AllComponents.SITE),
            z_indexes=merged_metadata.get(AllComponents.Z_INDEX),
            timepoints=merged_metadata.get(AllComponents.TIMEPOINT),
            available_backends={write_backend: True},
            workspace_mapping=None,  # Preserve existing - filtered out by create_metadata()
            main=is_main if is_main else None,
            results_dir=relative_results_dir
        )

    def _extract_component_metadata_from_files(self, file_paths: List[str], parser) -> Dict[AllComponents, Optional[Dict[str, Optional[str]]]]:
        """
        Extract component metadata by parsing actual filenames.

        Filenames are architecturally guaranteed to be properly formed by the
        pipeline, so parser.parse_filename() is guaranteed to succeed. No
        defensive checks.

        Args:
            file_paths: List of image file paths (guaranteed properly formed)
            parser: FilenameParser instance

        Returns:
            Dict mapping AllComponents to component metadata dicts (key -> display_name)
        """
        result = {component: {} for component in AllComponents}

        for file_path in file_paths:
            filename = Path(file_path).name
            parsed = parser.parse_filename(filename)

            # Extract each component from the parsed filename
            for component in AllComponents:
                component_name = component.value
                if component_name in parsed:
                    component_value = str(parsed[component_name])
                    # Store with None as the display name (merged with the original
                    # metadata display names later)
                    if component_value not in result[component]:
                        result[component][component_value] = None

        # Convert empty dicts to None (no metadata for that component)
        return {component: metadata_dict if metadata_dict else None for component, metadata_dict in result.items()}
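
    # Worked example (illustrative, ImageXpress-style names): parsing
    # ["A01_s1_w1.tif", "A01_s2_w1.tif"] would yield roughly
    #   {AllComponents.WELL: {"A01": None},
    #    AllComponents.SITE: {"1": None, "2": None},
    #    AllComponents.CHANNEL: {"1": None}, ...}
    # with components absent from every filename mapped to None.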

    def _merge_component_metadata(self, extracted: Dict[AllComponents, Optional[Dict[str, Optional[str]]]], cache: Dict[AllComponents, Optional[Dict[str, Optional[str]]]]) -> Dict[AllComponents, Optional[Dict[str, Optional[str]]]]:
        """
        Merge extracted component keys with display names from the original metadata cache.

        For each component:
        - Use the extracted keys (what actually exists in the output)
        - Preserve display names from the cache (e.g., "tl-20" for channel "1")
        - If no display name is in the cache, use None

        Args:
            extracted: Component metadata extracted from output filenames
            cache: Original metadata cache with display names

        Returns:
            Merged metadata with actual components and preserved display names
        """
        result = {}
        for component in AllComponents:
            extracted_dict = extracted.get(component)
            cache_dict = cache.get(component)

            if extracted_dict is None:
                result[component] = None
            else:
                # For each extracted key, take the display name from the cache if available
                merged = {}
                for key in extracted_dict.keys():
                    display_name = cache_dict.get(key) if cache_dict else None
                    merged[key] = display_name

                result[component] = merged if merged else None

        return result
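
    # Example (illustrative): extracted channels {"1": None, "2": None} merged
    # with a cache of {"1": "DAPI", "3": "GFP"} yields {"1": "DAPI", "2": None}.
    # Only components present in the output survive, keeping display names
    # wherever the cache has them.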


from openhcs.microscopes.microscope_base import MicroscopeHandler
from openhcs.microscopes.microscope_interfaces import FilenameParser


class OpenHCSMicroscopeHandler(MicroscopeHandler):
    """
    MicroscopeHandler for the OpenHCS pre-processed format.

    This handler reads plates that have been standardized, with metadata
    provided in an 'openhcs_metadata.json' file. It dynamically loads the
    appropriate FilenameParser based on the metadata.
    """

    # Class attributes for automatic registration
    _microscope_type = FIELDS.MICROSCOPE_TYPE  # Override automatic naming
    _metadata_handler_class = None  # Set after the class definition

    def __init__(self, filemanager: FileManager, pattern_format: Optional[str] = None):
        """
        Initialize the OpenHCSMicroscopeHandler.

        Args:
            filemanager: FileManager instance for file operations.
            pattern_format: Optional pattern format string, passed to the dynamically loaded parser.
        """
        self.filemanager = filemanager
        self.metadata_handler = OpenHCSMetadataHandler(filemanager)
        self._parser: Optional[FilenameParser] = None
        self.plate_folder: Optional[Path] = None  # Will be set by the factory or post_workspace
        self.pattern_format = pattern_format  # Stored for parser instantiation

        # Initialize super with a None parser; the actual parser is loaded dynamically.
        # The `parser` property handles on-demand loading.
        super().__init__(parser=None, metadata_handler=self.metadata_handler)

    def _load_and_get_parser(self) -> FilenameParser:
        """
        Ensure the dynamic filename parser is loaded based on metadata from plate_folder.

        This method requires self.plate_folder to be set.
        """
        if self._parser is None:
            if self.plate_folder is None:
                raise RuntimeError(
                    "OpenHCSHandler: plate_folder not set. Cannot determine and load the source filename parser."
                )

            parser_name = self.metadata_handler.get_source_filename_parser_name(self.plate_folder)
            available_parsers = _get_available_filename_parsers()
            ParserClass = available_parsers.get(parser_name)

            if not ParserClass:
                raise ValueError(
                    f"Unknown or unsupported filename parser '{parser_name}' specified in "
                    f"{OpenHCSMetadataHandler.METADATA_FILENAME} for plate {self.plate_folder}. "
                    f"Available parsers: {list(available_parsers.keys())}"
                )

            try:
                # Attempt to instantiate with filemanager and pattern_format
                self._parser = ParserClass(filemanager=self.filemanager, pattern_format=self.pattern_format)
                logger.info(f"OpenHCSHandler for plate {self.plate_folder} loaded source filename parser {parser_name} with filemanager and pattern_format.")
            except TypeError:
                try:
                    # Attempt with filemanager only
                    self._parser = ParserClass(filemanager=self.filemanager)
                    logger.info(f"OpenHCSHandler for plate {self.plate_folder} loaded source filename parser {parser_name} with filemanager.")
                except TypeError:
                    # Fall back to the default constructor
                    self._parser = ParserClass()
                    logger.info(f"OpenHCSHandler for plate {self.plate_folder} loaded source filename parser {parser_name} with the default constructor.")

        return self._parser

    @property
    def parser(self) -> FilenameParser:
        """
        Provide the dynamically loaded FilenameParser.

        The actual parser is determined from the 'openhcs_metadata.json' file.
        Requires `self.plate_folder` to be set prior to first access.
        """
        # If plate_folder is not set here, it was set neither by the factory
        # nor by a method like post_workspace before parser access.
        if self.plate_folder is None:
            raise RuntimeError("OpenHCSHandler: plate_folder must be set before accessing the parser property.")

        return self._load_and_get_parser()

    @parser.setter
    def parser(self, value: Optional[FilenameParser]):
        """
        Allow setting the parser instance.

        The base class __init__ calls this setter with the None we pass it, in
        which case _parser stays None until dynamically loaded. If an actual
        parser is passed, it is set directly and overrides the dynamic loading.
        """
        if value is not None:
            logger.debug(f"OpenHCSMicroscopeHandler.parser being explicitly set to: {type(value).__name__}")
        self._parser = value

    @property
    def root_dir(self) -> str:
        """
        Root directory for OpenHCS is determined from metadata.

        OpenHCS plates can have multiple subdirectories (e.g., "zarr", "images", ".").
        The root_dir is determined dynamically from the main subdirectory in metadata,
        so this property returns a placeholder; the actual root_dir is determined at runtime.
        """
        # Determined dynamically from metadata in initialize_workspace.
        # Return an empty string as a placeholder (not used for the virtual workspace).
        return ""

    @property
    def microscope_type(self) -> str:
        """Microscope type identifier (for interface enforcement only)."""
        return FIELDS.MICROSCOPE_TYPE

    @property
    def metadata_handler_class(self) -> Type[MetadataHandler]:
        """Metadata handler class (for interface enforcement only)."""
        return OpenHCSMetadataHandler

    @property
    def compatible_backends(self) -> List[Backend]:
        """
        OpenHCS is compatible with the ZARR (preferred) and DISK (fallback) backends.

        ZARR: advanced chunked storage for large datasets (preferred)
        DISK: standard file operations for compatibility (fallback)
        """
        return [Backend.ZARR, Backend.DISK]

    def get_available_backends(self, plate_path: Union[str, Path]) -> List[Backend]:
        """
        Get available storage backends for OpenHCS plates.

        OpenHCS plates can support multiple backends based on what actually exists
        on disk. This method checks the metadata to see which backends are
        actually available.
        """
        try:
            # Get available backends from metadata as Dict[str, bool]
            available_backends_dict = self.metadata_handler.get_available_backends(plate_path)

            # Convert to List[Backend] by filtering compatible backends that are available
            available_backends = []
            for backend_enum in self.compatible_backends:
                backend_name = backend_enum.value
                if available_backends_dict.get(backend_name, False):
                    available_backends.append(backend_enum)

            # If no backends are available from metadata, fall back to the compatible backends.
            # This handles cases where the metadata might not have the available_backends field.
            if not available_backends:
                logger.warning(f"No available backends found in metadata for {plate_path}, using all compatible backends")
                return self.compatible_backends

            return available_backends

        except Exception as e:
            logger.warning(f"Failed to get available backends from metadata for {plate_path}: {e}")
            # Fall back to all compatible backends if metadata reading fails
            return self.compatible_backends

    def get_primary_backend(self, plate_path: Union[str, Path], filemanager: 'FileManager') -> str:
        """
        Get the primary backend name for OpenHCS plates.

        Uses metadata-based detection to determine the primary backend.
        Preference hierarchy: zarr > virtual_workspace > disk.
        Registers the virtual_workspace backend if needed.

        Args:
            plate_path: Input directory (may be a subdirectory like zarr/)
            filemanager: FileManager instance for backend registration
        """
        # plate_folder must be set before calling this method
        if self.plate_folder is None:
            raise RuntimeError(
                "OpenHCSHandler.get_primary_backend: plate_folder not set. "
                "Call determine_input_dir() or post_workspace() first."
            )

        available_backends_dict = self.metadata_handler.get_available_backends(self.plate_folder)

        # Preference hierarchy: zarr > virtual_workspace > disk
        # 1. Prefer zarr if available (best performance for large datasets)
        if available_backends_dict.get('zarr'):
            return 'zarr'

        # 2. Prefer virtual_workspace if available (for plates with workspace_mapping)
        if available_backends_dict.get('virtual_workspace'):
            # Register the virtual_workspace backend using the centralized helper
            self._register_virtual_workspace_backend(self.plate_folder, filemanager)
            return 'virtual_workspace'

        # 3. Fall back to the first available backend (usually disk)
        return next(iter(available_backends_dict.keys()))
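
    # Example (illustrative) of the preference hierarchy above:
    #
    #   {"zarr": True, "disk": True}               -> "zarr"
    #   {"virtual_workspace": True, "disk": True}  -> "virtual_workspace"
    #   {"disk": True}                             -> "disk"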

    def initialize_workspace(self, plate_path: Path, filemanager: FileManager) -> Path:
        """
        OpenHCS format doesn't need a workspace; this determines the correct input subdirectory from metadata.

        Args:
            plate_path: Path to the original plate directory
            filemanager: FileManager instance for file operations

        Returns:
            Path to the main subdirectory containing input images (e.g., plate_path/images)
        """
        logger.info(f"OpenHCS format: Determining input subdirectory from metadata in {plate_path}")

        # Set plate_folder for this handler
        self.plate_folder = plate_path
        logger.debug(f"OpenHCSHandler: plate_folder set to {self.plate_folder}")

        # Determine the main subdirectory from metadata - fail-loud on errors
        main_subdir = self.metadata_handler.determine_main_subdirectory(plate_path)
        input_dir = plate_path / main_subdir

        # If workspace_mapping exists in metadata, register the virtual workspace backend
        metadata_dict = self.metadata_handler._load_metadata_dict(plate_path)
        subdir_metadata = metadata_dict.get(FIELDS.SUBDIRECTORIES, {}).get(main_subdir, {})

        if subdir_metadata.get('workspace_mapping'):
            # Register the virtual_workspace backend using the centralized helper
            self._register_virtual_workspace_backend(plate_path, filemanager)

        # Verify the subdirectory exists - fail loud if missing
        if not filemanager.is_dir(str(input_dir), Backend.DISK.value):
            raise FileNotFoundError(
                f"Main subdirectory '{main_subdir}' does not exist at {input_dir}. "
                f"Expected directory structure: {plate_path}/{main_subdir}/"
            )

        logger.info(f"OpenHCS input directory determined: {input_dir} (subdirectory: {main_subdir})")
        return input_dir

    def _prepare_workspace(self, workspace_path: Path, filemanager: FileManager) -> Path:
        """
        OpenHCS format assumes the workspace is already prepared (e.g., flat structure),
        so this method is a no-op.

        Args:
            workspace_path: Path to the symlinked workspace.
            filemanager: FileManager instance for file operations.

        Returns:
            The original workspace_path.
        """
        logger.info(f"OpenHCSHandler._prepare_workspace: No preparation needed for {workspace_path} as it's pre-processed.")
        # Ensure plate_folder is set if this is the first relevant operation knowing the path
        if self.plate_folder is None:
            self.plate_folder = Path(workspace_path)
            logger.debug(f"OpenHCSHandler: plate_folder set to {self.plate_folder} during _prepare_workspace.")
        return workspace_path

    def post_workspace(self, plate_path: Union[str, Path], filemanager: FileManager, skip_preparation: bool = False) -> Path:
        """
        Hook called after virtual workspace mapping creation.

        For OpenHCS, this ensures plate_folder is set (if not already), which allows
        the parser to be loaded for this plate_path. It then calls the base
        implementation, which handles filename normalization using the loaded parser.
        """
        current_plate_folder = Path(plate_path)
        if self.plate_folder is None:
            logger.info(f"OpenHCSHandler.post_workspace: Setting plate_folder to {current_plate_folder}.")
            self.plate_folder = current_plate_folder
            self._parser = None  # Reset the parser if plate_folder changes or is set for the first time
        elif self.plate_folder != current_plate_folder:
            logger.warning(
                f"OpenHCSHandler.post_workspace: plate_folder was {self.plate_folder}, "
                f"now processing {current_plate_folder}. Re-initializing parser."
            )
            self.plate_folder = current_plate_folder
            self._parser = None  # Force re-initialization for the new path

        # Accessing self.parser triggers _load_and_get_parser() if not already loaded
        _ = self.parser

        logger.info(f"OpenHCSHandler (plate: {self.plate_folder}): Files are expected to be pre-normalized. "
                    "Superclass post_workspace will run with the dynamically loaded parser.")
        return super().post_workspace(plate_path, filemanager, skip_preparation)

    # The following methods from MicroscopeHandler delegate to `self.parser`.
    # The `parser` property ensures the correct, dynamically loaded parser is used.
    # No explicit override is needed for them unless OpenHCS requires behavior
    # beyond what the dynamically loaded original parser provides:
    # - parse_filename(self, filename: str)
    # - construct_filename(self, well: str, ...)
    # - auto_detect_patterns(self, folder_path: Union[str, Path], ...)
    # - path_list_from_pattern(self, directory: Union[str, Path], ...)

    # Metadata handling methods are delegated to `self.metadata_handler` by the base class:
    # - find_metadata_file(self, plate_path: Union[str, Path])
    # - get_grid_dimensions(self, plate_path: Union[str, Path])
    # - get_pixel_size(self, plate_path: Union[str, Path])
    # These will use our OpenHCSMetadataHandler correctly.


# Set the metadata handler class after the class definition for automatic registration
from openhcs.microscopes.microscope_base import register_metadata_handler

OpenHCSMicroscopeHandler._metadata_handler_class = OpenHCSMetadataHandler
register_metadata_handler(OpenHCSMicroscopeHandler, OpenHCSMetadataHandler)
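
# Example (illustrative) end-to-end usage with a hypothetical plate path;
# the FileManager construction is sketched, not its real signature:
#
#   fm = FileManager(...)  # configured elsewhere
#   handler = OpenHCSMicroscopeHandler(fm)
#   input_dir = handler.initialize_workspace(Path("/plates/plate1"), fm)
#   backend = handler.get_primary_backend(input_dir, fm)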