Coverage for openhcs/io/omero_local.py: 7.4%
737 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1# openhcs/io/omero_local.py
2"""
3OMERO Local Storage Backend - Zero-copy server-side OMERO access.
5Reads directly from OMERO binary repository, saves results back to OMERO.
6"""
8import logging
9from dataclasses import dataclass
10from pathlib import Path
11from typing import Any, Dict, List, Optional, Set, Union, Tuple
12from collections import defaultdict
13from datetime import datetime
14import threading
16# Cross-platform file locking
17try:
18 import fcntl
19 FCNTL_AVAILABLE = True
20except ImportError:
21 import portalocker
22 FCNTL_AVAILABLE = False
24import numpy as np
26from openhcs.io.base import VirtualBackend
27from openhcs.constants.constants import FileFormat
29logger = logging.getLogger(__name__)
class OMEROFileFormatRegistry:
    """Registry of text file extensions that are saved to OMERO as FileAnnotations.

    Extensions are matched case-insensitively and with or without a leading
    dot, so ``"json"``, ``".json"`` and ``".JSON"`` all name the same format.
    This matters because lookups are typically done with ``Path.suffix``,
    which always includes the dot.
    """

    def __init__(self):
        # Normalised extensions (lower-case, leading dot) registered as text.
        self._text_extensions: Set[str] = set()
        # Normalised extension -> MIME type used when uploading the file.
        self._mimetypes: Dict[str, str] = {}

    @staticmethod
    def _normalize(ext: str) -> str:
        """Return *ext* lower-cased with exactly one leading dot ('' stays '')."""
        ext = ext.strip().lower()
        if ext and not ext.startswith('.'):
            ext = '.' + ext
        return ext

    def register_text_format(self, extensions: Union[str, List[str]], mimetype: str):
        """Register one or more extensions as a text format saved as FileAnnotation.

        Args:
            extensions: A single extension string or a list/iterable of them.
                A bare string is treated as ONE extension instead of being
                iterated character by character.
            mimetype: MIME type returned by :meth:`get_mimetype` for these extensions.
        """
        if isinstance(extensions, str):
            extensions = [extensions]
        for ext in extensions:
            ext = self._normalize(ext)
            if not ext:
                # An empty extension would match every suffix-less path.
                continue
            self._text_extensions.add(ext)
            self._mimetypes[ext] = mimetype

    def is_text_format(self, ext: str) -> bool:
        """Return True if *ext* (e.g. ``Path.suffix``) is a registered text format."""
        return self._normalize(ext) in self._text_extensions

    def get_mimetype(self, ext: str) -> str:
        """Return the MIME type registered for *ext*, defaulting to ``text/plain``."""
        return self._mimetypes.get(self._normalize(ext), 'text/plain')
@dataclass
class ImageStructure:
    """Dimensions of one OMERO Image (one well site), cached per plate.

    Values are read from the Image wrapper when the plate structure is built.
    """

    image_id: int  # OMERO Image ID
    sizeZ: int  # number of Z planes (getSizeZ)
    sizeC: int  # number of channels (getSizeC)
    sizeT: int  # number of timepoints (getSizeT)
    sizeY: int  # Y size as reported by getSizeY
    sizeX: int  # X size as reported by getSizeX
@dataclass
class WellStructure:
    """The imaged sites of a single well.

    ``sites`` maps a site index (1-based as built by the plate loader) to the
    ``ImageStructure`` describing that site's image.
    """

    sites: Dict[int, "ImageStructure"]
@dataclass
class PlateStructure:
    """Lightweight, cached description of a whole OMERO plate."""

    plate_id: int  # OMERO Plate ID
    parser_name: str  # value of the plate's openhcs.parser metadata
    microscope_type: str  # value of the plate's openhcs.microscope_type metadata
    wells: Dict[str, "WellStructure"]  # well_id (e.g. "A01") -> WellStructure

    # Pre-computed summaries for quick access
    all_well_ids: Set[str]  # every key of ``wells``
    max_sites: int  # largest site index found in any well
    max_z: int  # largest sizeZ over all images
    max_c: int  # largest sizeC over all images
    max_t: int  # largest sizeT over all images
88class OMEROLocalBackend(VirtualBackend):
89 """
90 Virtual backend for OMERO server-side execution.
92 Generates filenames on-demand from OMERO plate structure.
93 No real filesystem operations - all paths are virtual.
94 """
96 _backend_type = 'omero_local'
98 # Class-level lock dictionary for thread-safe well creation
99 _well_locks: Dict[str, threading.Lock] = {}
100 _well_locks_lock = threading.Lock() # Lock for the lock dictionary itself
102 def __init__(self, omero_data_dir: Optional[Path] = None, omero_conn=None):
103 try:
104 from omero.gateway import BlitzGateway
105 self._BlitzGateway = BlitzGateway
106 except ImportError:
107 raise ImportError("omero-py required: pip install omero-py")
109 if omero_data_dir:
110 omero_data_dir = Path(omero_data_dir)
111 if not omero_data_dir.exists():
112 raise ValueError(f"OMERO data directory not found: {omero_data_dir}")
114 self.omero_data_dir = omero_data_dir
115 # DO NOT store omero_conn - it contains unpicklable IcePy.Communicator
116 # Connection must be passed via kwargs or retrieved from global registry
117 self._initial_conn = omero_conn # Store temporarily for registration
119 # Store connection parameters for reconnection in worker processes
120 self._conn_params = None
121 if omero_conn:
122 try:
123 self._conn_params = {
124 'host': omero_conn.host,
125 'port': omero_conn.port,
126 'username': omero_conn.getUser().getName(),
127 # Password not available from connection object
128 }
129 except:
130 pass # Connection params not available
132 # Caches for virtual filesystem
133 self._plate_metadata: Dict[int, PlateStructure] = {}
134 self._parser_cache: Dict[int, Any] = {} # plate_id → parser instance
135 self._plate_name_cache: Dict[str, int] = {} # plate_name → plate_id
137 # File format registry
138 self.format_registry = OMEROFileFormatRegistry()
139 self._register_formats()
def _register_formats(self):
    """Populate ``self.format_registry`` with the text formats stored as FileAnnotations.

    JSON, CSV and plain text are registered, in that order, each with its MIME type.
    """
    text_formats = (
        (FileFormat.JSON, 'application/json'),
        (FileFormat.CSV, 'text/csv'),
        (FileFormat.TEXT, 'text/plain'),
    )
    for file_format, mime in text_formats:
        self.format_registry.register_text_format(file_format.value, mime)
161 def __getstate__(self):
162 """Exclude unpicklable connection from pickle."""
163 state = self.__dict__.copy()
164 # Remove unpicklable connection
165 state['_initial_conn'] = None
166 return state
168 def __setstate__(self, state):
169 """Restore state after unpickling."""
170 self.__dict__.update(state)
171 # Connection will be retrieved from global registry in worker process
173 def _get_connection(self, **kwargs):
174 """
175 Get OMERO connection from kwargs, instance, global registry, or create new one.
177 This method handles multiple scenarios:
178 1. Connection passed via kwargs (highest priority)
179 2. Connection stored in this instance
180 3. Connection from global registry backend
181 4. Create new connection using stored params (worker process)
183 This ensures the backend remains picklable for multiprocessing.
184 """
185 conn = kwargs.get('omero_conn')
186 if not conn and self._initial_conn:
187 conn = self._initial_conn
189 if not conn:
190 # Try to get from global registry
191 # This handles the case where orchestrator copies the registry
192 # but the copy's backend doesn't have the connection
193 try:
194 from openhcs.io.base import storage_registry
195 backend = storage_registry.get('omero_local')
196 if backend and backend is not self and hasattr(backend, '_initial_conn') and backend._initial_conn:
197 conn = backend._initial_conn
198 # Cache it in this instance too
199 self._initial_conn = conn
200 logger.debug("Retrieved OMERO connection from global registry backend")
201 except Exception as e:
202 logger.debug(f"Could not get connection from global registry: {e}")
204 if not conn and self._conn_params:
205 # Worker process or fresh instance: create new connection using stored params
206 logger.info(f"Creating new OMERO connection to {self._conn_params.get('host')}:{self._conn_params.get('port')}")
207 try:
208 # Get password from environment or use default
209 import os
210 password = os.getenv('OMERO_PASSWORD', 'openhcs')
212 conn = self._BlitzGateway(
213 self._conn_params['username'],
214 password,
215 host=self._conn_params['host'],
216 port=self._conn_params['port']
217 )
218 if not conn.connect():
219 raise ConnectionError(f"Failed to connect to OMERO at {self._conn_params['host']}:{self._conn_params['port']}")
221 # Python 3.11 compatibility: Ensure session is fully established
222 # by explicitly calling keepAlive() after connection
223 try:
224 conn.c.sf.keepAlive(None)
225 except Exception as e:
226 logger.warning(f"keepAlive() call failed (non-fatal): {e}")
228 # Cache the connection
229 self._initial_conn = conn
230 logger.info("Successfully connected to OMERO")
231 except Exception as e:
232 logger.error(f"Failed to create OMERO connection: {e}")
233 raise
235 if not conn:
236 raise ValueError(
237 "No OMERO connection available. "
238 "Pass omero_conn via kwargs, set in instance, ensure global registry has connection, "
239 "or provide connection params for auto-reconnection."
240 )
241 return conn
243 def _ensure_connection(self, **kwargs):
244 """Validate OMERO connection is available."""
245 self._get_connection(**kwargs)
247 def _get_parser_from_plate_metadata(self, plate) -> str:
248 """Get parser name from OMERO plate metadata."""
249 for ann in plate.listAnnotations():
250 if hasattr(ann, 'getNs') and ann.getNs() == "openhcs.metadata":
251 metadata = {kv[0]: kv[1] for kv in ann.getValue()}
252 parser_name = metadata.get("openhcs.parser")
253 if parser_name:
254 return parser_name
256 raise ValueError(f"Plate {plate.getId()} missing openhcs.parser metadata")
258 def _get_microscope_type_from_plate_metadata(self, plate) -> str:
259 """Get microscope type from OMERO plate metadata."""
260 for ann in plate.listAnnotations():
261 if hasattr(ann, 'getNs') and ann.getNs() == "openhcs.metadata":
262 metadata = {kv[0]: kv[1] for kv in ann.getValue()}
263 microscope_type = metadata.get("openhcs.microscope_type")
264 if microscope_type:
265 return microscope_type
267 raise ValueError(f"Plate {plate.getId()} missing openhcs.microscope_type metadata")
269 def _load_parser(self, parser_name: str):
270 """Dynamically load parser class by name."""
271 # Import parser classes
272 from openhcs.microscopes.imagexpress import ImageXpressFilenameParser
273 from openhcs.microscopes.opera_phenix import OperaPhenixFilenameParser
274 from openhcs.microscopes.omero import OMEROFilenameParser
276 parser_map = {
277 'ImageXpressFilenameParser': ImageXpressFilenameParser,
278 'OperaPhenixFilenameParser': OperaPhenixFilenameParser,
279 'OMEROFilenameParser': OMEROFilenameParser,
280 }
282 parser_class = parser_map.get(parser_name)
283 if not parser_class:
284 raise ValueError(f"Unknown parser: {parser_name}")
286 return parser_class()
def _load_plate_structure(self, plate_id: int, **kwargs) -> None:
    """Query OMERO once and cache a lightweight ``PlateStructure`` for *plate_id*.

    Reads the plate's parser and microscope metadata, caches the parser
    instance, and records each site's image dimensions together with the
    plate-wide maxima. The result is stored in ``self._plate_metadata[plate_id]``.

    Args:
        plate_id: OMERO Plate ID
        **kwargs: Passed to ``_get_connection`` (e.g. ``omero_conn``)

    Raises:
        ValueError: If plate not found or missing metadata
    """
    def row_label(row_index: int) -> str:
        # 0 -> "A", 25 -> "Z", 26 -> "AA", 31 -> "AF". A 1536-well plate has
        # 32 rows, and chr(ord('A') + row) would yield non-letters such as '['
        # for rows past Z.
        label = ""
        remaining = row_index + 1
        while remaining > 0:
            remaining, offset = divmod(remaining - 1, 26)
            label = chr(ord('A') + offset) + label
        return label

    conn = self._get_connection(**kwargs)

    plate = conn.getObject("Plate", plate_id)
    if not plate:
        raise ValueError(f"OMERO Plate not found: {plate_id}")

    parser_name = self._get_parser_from_plate_metadata(plate)
    microscope_type = self._get_microscope_type_from_plate_metadata(plate)

    if plate_id not in self._parser_cache:
        self._parser_cache[plate_id] = self._load_parser(parser_name)

    wells = {}
    max_sites = max_z = max_c = max_t = 0

    for well in plate.listChildren():
        # Handle both OMERO rtypes and plain Python types
        row = well.row.val if hasattr(well.row, 'val') else well.row
        col = well.column.val if hasattr(well.column, 'val') else well.column
        well_id = f"{row_label(row)}{col + 1:02d}"

        sites = {}
        # Site index is the enumeration order of the well samples, 1-based for OpenHCS.
        for site_idx, wellsample in enumerate(well.listChildren(), start=1):
            image = wellsample.getImage()
            image_struct = ImageStructure(
                image_id=image.getId(),
                sizeZ=image.getSizeZ(),
                sizeC=image.getSizeC(),
                sizeT=image.getSizeT(),
                sizeY=image.getSizeY(),
                sizeX=image.getSizeX(),
            )
            sites[site_idx] = image_struct

            max_sites = max(max_sites, site_idx)
            max_z = max(max_z, image_struct.sizeZ)
            max_c = max(max_c, image_struct.sizeC)
            max_t = max(max_t, image_struct.sizeT)

        wells[well_id] = WellStructure(sites=sites)

    self._plate_metadata[plate_id] = PlateStructure(
        plate_id=plate_id,
        parser_name=parser_name,
        microscope_type=microscope_type,
        wells=wells,
        all_well_ids=set(wells),
        max_sites=max_sites,
        max_z=max_z,
        max_c=max_c,
        max_t=max_t,
    )

    logger.info(f"Loaded plate structure for {plate_id}: "
                f"{len(wells)} wells, {max_sites} sites, "
                f"{max_z}Z × {max_c}C × {max_t}T")
def load(self, file_path: Union[str, Path], **kwargs) -> np.ndarray:
    """
    Load one plane by parsing the virtual filename into plate coordinates.

    Flow: path -> plate_id (from the parent directory name) -> filename -> parse
    -> (well, site, z, c, t) -> cached plate structure -> image_id -> load plane

    Args:
        file_path: Full path including plate_id (e.g., "/omero/plate_59/A01_s001_w1_z001_t001.tif").
            The parent directory name must start with ``plate_<id>``. A suffix such
            as ``plate_59_outputs`` still resolves to plate 59.
        **kwargs: Passed to ``_load_plate_structure`` / ``_get_connection``
            (e.g. ``omero_conn``).

    Returns:
        The plane returned by ``pixels.getPlane`` (a 2D numpy array: single
        z-plane, single channel, single timepoint).

    Raises:
        ValueError: If the path, filename, well, site or any z/c/t index is
            invalid, or the OMERO image no longer exists.
    """
    import re

    # Path format: /omero/plate_59/A01_s001_w1_z001_t001.tif
    path_obj = Path(file_path)
    plate_dir_name = path_obj.parent.name  # "plate_59" or "plate_59_outputs"
    match = re.match(r'plate_(\d+)', plate_dir_name)
    if not match:
        raise ValueError(f"Could not extract plate_id from path: {file_path}. Expected /omero/plate_<id>/filename format")
    plate_id = int(match.group(1))
    filename = path_obj.name

    if plate_id not in self._plate_metadata:
        self._load_plate_structure(plate_id, **kwargs)

    plate_struct = self._plate_metadata[plate_id]
    parser = self._parser_cache[plate_id]

    parsed = parser.parse_filename(filename)
    if not parsed:
        raise ValueError(f"Cannot parse filename: {filename}")

    def to_zero_based(key: str) -> int:
        # Filenames carry 1-based indices. A missing/None component means the
        # first plane instead of crashing on `None - 1`.
        value = parsed.get(key)
        return (1 if value is None else value) - 1

    well_id = parsed['well']
    site_idx = parsed['site']
    z_idx = to_zero_based('z_index')
    c_idx = to_zero_based('channel')
    t_idx = to_zero_based('timepoint')

    if well_id not in plate_struct.wells:
        raise ValueError(f"Well {well_id} not found in plate {plate_id}")

    well_struct = plate_struct.wells[well_id]
    if site_idx not in well_struct.sites:
        raise ValueError(f"Site {site_idx} not found in well {well_id}")

    image_struct = well_struct.sites[site_idx]

    # Reject negatives too: a 0 in a 1-based filename would otherwise become -1
    # and silently select the last plane (or fail inside OMERO).
    for label, index, size in (
        ("Z-index", z_idx, image_struct.sizeZ),
        ("Channel", c_idx, image_struct.sizeC),
        ("Timepoint", t_idx, image_struct.sizeT),
    ):
        if not 0 <= index < size:
            raise ValueError(f"{label} {index} out of range (max: {size})")

    conn = self._get_connection(**kwargs)
    image = conn.getObject("Image", image_struct.image_id)
    if not image:
        raise ValueError(f"OMERO Image not found: {image_struct.image_id}")

    pixels = image.getPrimaryPixels()
    plane = pixels.getPlane(z_idx, c_idx, t_idx)  # Returns 2D numpy array

    logger.debug(f"Loaded {filename} → image {image_struct.image_id}, "
                 f"z={z_idx}, c={c_idx}, t={t_idx}, shape={plane.shape}")

    return plane
def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
    """Dispatch *data* to the matching OMERO save routine based on its type.

    - Non-empty ``list`` whose first element is an ``ROI``: ``_save_rois``.
    - ``str`` whose suffix is a registered text format:
      ``_save_as_table_or_annotation`` (an OMERO.table if the text parses as
      tabular data, otherwise a FileAnnotation).
    - Anything else: ``_save_image``.

    Args:
        data: Data to save
        output_path: Output path
        **kwargs: Forwarded to the chosen routine. ``images_dir`` (needed to
            link analysis results to the right plate) is popped and passed
            explicitly to the ROI and text routines. ``dataset_id`` is read by
            ``_save_image``.
    """
    from openhcs.core.roi import ROI

    target = Path(output_path)

    # Explicit type dispatch - fail-loud
    holds_rois = isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI)
    if holds_rois:
        images_dir = kwargs.pop('images_dir', None)
        self._save_rois(data, target, images_dir=images_dir, **kwargs)
        return

    if isinstance(data, str) and self.format_registry.is_text_format(target.suffix):
        # Pop images_dir so it is not passed twice (explicitly and through **kwargs).
        images_dir = kwargs.pop('images_dir', None)
        self._save_as_table_or_annotation(data, target, images_dir=images_dir, **kwargs)
        return

    # Image data - save as OMERO image
    self._save_image(data, target, **kwargs)
486 def _save_as_table_or_annotation(self, text_content: str, output_path: Path, images_dir: str = None, **kwargs) -> None:
487 """
488 Try to parse text content as tabular data and save as OMERO.table.
489 If parsing fails, fall back to FileAnnotation.
491 Supports:
492 - CSV files (direct parsing)
493 - JSON files (if they contain tabular data)
494 - TXT files (if they contain tabular data)
496 Args:
497 text_content: Text content to save
498 output_path: Output path
499 images_dir: Directory containing images (required for analysis results to link to correct plate)
500 **kwargs: Additional arguments
501 """
502 import pandas as pd
503 from io import StringIO
504 import json
506 df = None
508 # Try to parse based on file extension
509 suffix = output_path.suffix.lower()
511 try:
512 if suffix == '.csv':
513 # Parse CSV directly
514 df = pd.read_csv(StringIO(text_content))
515 elif suffix == '.json':
516 # Try to parse JSON as tabular data
517 data = json.loads(text_content)
518 # Check if it's a list of dicts (table-like) or a dict with list values
519 if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
520 df = pd.DataFrame(data)
521 elif isinstance(data, dict):
522 # Try to convert dict to DataFrame
523 df = pd.DataFrame(data)
524 elif suffix == '.txt':
525 # Try to parse as CSV (tab or comma separated)
526 # First try tab-separated
527 try:
528 df = pd.read_csv(StringIO(text_content), sep='\t')
529 # Check if it actually parsed into multiple columns
530 if len(df.columns) == 1:
531 # Try comma-separated
532 df = pd.read_csv(StringIO(text_content))
533 except:
534 # Try comma-separated
535 try:
536 df = pd.read_csv(StringIO(text_content))
537 except:
538 # Try to parse as key-value pairs (e.g., "Key: Value" format)
539 lines = text_content.strip().split('\n')
540 data = {}
541 for line in lines:
542 if ':' in line:
543 key, value = line.split(':', 1)
544 data[key.strip()] = [value.strip()]
545 if data:
546 df = pd.DataFrame(data)
547 except Exception:
548 # Parsing failed, will fall back to FileAnnotation
549 df = None
551 # If we successfully parsed tabular data, save as OMERO.table
552 if df is not None and not df.empty and len(df.columns) > 0:
553 # Convert back to CSV for the table creation method
554 csv_content = df.to_csv(index=False)
555 self._save_csv_as_table(csv_content, output_path, images_dir=images_dir, **kwargs)
556 else:
557 # Fall back to FileAnnotation
558 self._save_text_annotation(text_content, output_path, images_dir=images_dir, **kwargs)
def _save_csv_as_table(self, csv_content: str, output_path: Path, images_dir: str = None, **kwargs) -> None:
    """Save CSV content as an OMERO.table (queryable structured data) linked to a plate.

    The target plate is found by name from *images_dir* (see ``_parse_omero_path``).
    The table is created in the first repository reported by ``sharedResources()``
    and attached to the plate through a FileAnnotation with namespace
    ``openhcs.analysis.results.table``.

    Args:
        csv_content: CSV content to save
        output_path: Output path. The part of its name before the first dot
            becomes the table name.
        images_dir: Directory containing images (required to link table to correct plate)
        **kwargs: Passed to ``_get_connection`` / ``_find_plate_by_name``

    Raises:
        ValueError: If *images_dir* is missing or the plate cannot be found.
        RuntimeError: If no repository or table could be obtained after retries.
    """
    import time
    from io import StringIO

    import pandas as pd
    from omero.grid import LongColumn, DoubleColumn, StringColumn, BoolColumn
    from omero.model import FileAnnotationI
    from omero.rtypes import rstring

    conn = self._get_connection(**kwargs)

    df = pd.read_csv(StringIO(csv_content))

    if not images_dir:
        raise ValueError(
            f"images_dir is required for OMERO table linking. "
            f"This should be passed from the materialization context. "
            f"Output path: {output_path}"
        )

    # Path format: /omero/plate_274_outputs/images/
    # The path contains the INPUT plate ID (274), but results belong to the
    # OUTPUT plate, so resolve the full plate name and query OMERO for its ID.
    images_dir = Path(images_dir)
    plate_name, _base_id, _is_derived = self._parse_omero_path(images_dir)

    plate_id = self._find_plate_by_name(plate_name, **kwargs)
    if not plate_id:
        raise ValueError(f"Plate '{plate_name}' not found in OMERO (images dir: {images_dir})")

    # Resolve the plate before creating the table, so that a missing plate
    # does not leave an orphaned, unlinked table file in the repository.
    plate = conn.getObject("Plate", plate_id)
    if not plate:
        raise ValueError(f"Plate {plate_id} not found")

    # OMERO table names cannot contain dots except for the .h5 extension,
    # so drop ALL extensions ("file.roi.zip.json" -> "file").
    table_name = output_path.name.split('.')[0] or 'results'

    columns = []
    for col_name in df.columns:
        col_data = df[col_name]
        name = str(col_name)  # OMERO column names must be strings

        if pd.api.types.is_integer_dtype(col_data):
            col = LongColumn(name, '', [])
            col.values = col_data.astype(int).tolist()
        elif pd.api.types.is_float_dtype(col_data):
            col = DoubleColumn(name, '', [])
            col.values = col_data.astype(float).tolist()
        elif pd.api.types.is_bool_dtype(col_data):
            col = BoolColumn(name, '', [])
            col.values = col_data.astype(bool).tolist()
        else:
            str_values = col_data.astype(str).tolist()
            # Size must be > 0. Measure UTF-8 bytes rather than characters so
            # values with non-ASCII characters (several bytes each) still fit.
            max_len = max([1] + [len(s.encode('utf-8')) for s in str_values])
            col = StringColumn(name, '', max_len, [])
            col.values = str_values

        columns.append(col)

    # Python 3.11 compatibility: in multiprocessing contexts sharedResources may
    # not be ready immediately after connecting, so retry with exponential backoff.
    table = None
    max_retries = 3
    retry_delay = 0.1  # seconds; doubled after each retry
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            resources = conn.c.sf.sharedResources()

            repositories = resources.repositories()
            if not repositories or not repositories.descriptions:
                if not is_last_attempt:
                    # Repositories may not be ready yet
                    time.sleep(retry_delay)
                    retry_delay *= 2
                    continue
                raise RuntimeError(
                    "No OMERO repositories available for table creation after retries. "
                    "This may indicate an OMERO server configuration issue."
                )

            repo_id_obj = repositories.descriptions[0].getId()
            if repo_id_obj is None:
                if not is_last_attempt:
                    time.sleep(retry_delay)
                    retry_delay *= 2
                    continue
                raise RuntimeError(
                    "Repository description exists but getId() returned None after retries. "
                    "This may be a Python 3.11/Ice compatibility issue."
                )

            repository_id = repo_id_obj.getValue()
            if repository_id is None:
                if not is_last_attempt:
                    time.sleep(retry_delay)
                    retry_delay *= 2
                    continue
                raise RuntimeError(
                    "Repository ID object exists but getValue() returned None after retries. "
                    "This may be a Python 3.11/Ice compatibility issue."
                )

            table = resources.newTable(repository_id, f"{table_name}.h5")
            break
        except Exception as e:
            # Only "null table" errors are retried; everything else propagates.
            if not is_last_attempt and "null table" in str(e).lower():
                time.sleep(retry_delay)
                retry_delay *= 2
                continue
            raise

    if table is None:
        raise RuntimeError("Failed to create OMERO.table")

    try:
        table.initialize(columns)
        table.addData(columns)

        # Wrap the table's OriginalFile in a FileAnnotation so it can be linked
        orig_file = table.getOriginalFile()
        file_ann = FileAnnotationI()
        file_ann.setFile(orig_file)
        file_ann.setNs(rstring('openhcs.analysis.results.table'))
        file_ann.setDescription(rstring(f'Analysis results table: {table_name}'))
        file_ann = conn.getUpdateService().saveAndReturnObject(file_ann)

        # linkAnnotation expects a gateway wrapper, so re-fetch by ID
        ann_id = file_ann.getId().getValue()
        file_ann_wrapped = conn.getObject("Annotation", ann_id)
        plate.linkAnnotation(file_ann_wrapped)
        logger.info(f"Created OMERO.table '{table_name}' and linked to plate {plate_id}")
    finally:
        table.close()
def _save_text_annotation(self, text_content: str, output_path: Path, images_dir: str = None, **kwargs) -> None:
    """Upload text content as a FileAnnotation and link it to the output plate.

    Args:
        text_content: Text content to save (written as UTF-8)
        output_path: Output path. Its name is used as the uploaded filename and
            its suffix selects the MIME type.
        images_dir: Directory containing images (required to link annotation to correct plate)
        **kwargs: Passed to ``_get_connection`` / ``_find_plate_by_name``

    Raises:
        ValueError: If *images_dir* is missing or the plate cannot be found by name.
    """
    import os
    import tempfile

    conn = self._get_connection(**kwargs)

    if not images_dir:
        raise ValueError(
            f"images_dir is required for OMERO annotation linking. "
            f"This should be passed from the materialization context. "
            f"Output path: {output_path}"
        )

    # Path format: /omero/plate_274_outputs/images/
    # The path contains the INPUT plate ID (274), but results belong to the
    # OUTPUT plate, so resolve the full plate name and query OMERO for its ID.
    images_dir = Path(images_dir)
    plate_name, _base_id, _is_derived = self._parse_omero_path(images_dir)

    plate_id = self._find_plate_by_name(plate_name, **kwargs)
    if not plate_id:
        raise ValueError(f"Plate '{plate_name}' not found in OMERO (images dir: {images_dir})")

    fd, tmp_path = tempfile.mkstemp(suffix=output_path.suffix)
    try:
        # Write UTF-8 explicitly: the platform default encoding (e.g. cp1252)
        # can fail on non-ASCII analysis output. The temp file is removed in
        # `finally` even if writing or uploading fails.
        with open(fd, 'w', encoding='utf-8') as tmp:
            tmp.write(text_content)

        mimetype = self.format_registry.get_mimetype(output_path.suffix)
        file_ann = conn.createFileAnnfromLocalFile(
            tmp_path,
            origFilePathAndName=output_path.name,  # Use actual filename, not temp name
            mimetype=mimetype,
            ns='openhcs.analysis.results',
            desc=f'Analysis results: {output_path.name}'
        )

        plate = conn.getObject("Plate", plate_id)
        if plate:
            plate.linkAnnotation(file_ann)
            logger.info(f"Attached {output_path.name} as FileAnnotation to plate {plate_id}")
        else:
            logger.warning(f"Plate {plate_id} not found, FileAnnotation created but not linked")
    finally:
        os.unlink(tmp_path)
799 def _save_image(self, data: Any, output_path: Path, **kwargs) -> None:
800 """Save image data to OMERO as new image."""
801 conn = self._get_connection(**kwargs)
803 dataset_id = kwargs.get('dataset_id')
804 if not dataset_id:
805 raise ValueError("dataset_id required")
807 dataset = conn.getObject("Dataset", dataset_id)
808 if not dataset:
809 raise ValueError(f"Dataset not found: {dataset_id}")
811 image_name = output_path.stem
813 # Get dimensions
814 if data.ndim == 3:
815 sizeZ, sizeY, sizeX = data.shape
816 sizeC, sizeT = 1, 1
817 elif data.ndim == 4:
818 sizeZ, sizeC, sizeY, sizeX = data.shape
819 sizeT = 1
820 else:
821 raise ValueError(f"Data must be 3D or 4D, got {data.shape}")
823 # Plane generator
824 def planes():
825 if data.ndim == 3:
826 for z in range(sizeZ):
827 yield data[z]
828 else:
829 for z in range(sizeZ):
830 for c in range(sizeC):
831 yield data[z, c]
833 # Create image
834 new_image = conn.createImageFromNumpySeq(
835 planes(),
836 image_name,
837 sizeZ=sizeZ,
838 sizeC=sizeC,
839 sizeT=sizeT,
840 description=kwargs.get('description', 'Processed by OpenHCS'),
841 dataset=dataset
842 )
844 logger.info(f"Created OMERO image {new_image.getId()}: {image_name}")
def exists(self, path: Union[str, Path], **kwargs) -> bool:
    """
    Report whether a file/annotation exists in OMERO - currently always False.

    Intended behaviour once implemented:
    - text formats (JSON/CSV/...): look up the FileAnnotation
    - images: look up the image in the plate
    Returning False for text formats lets callers overwrite freely.
    """
    suffix = Path(path).suffix

    if self.format_registry.is_text_format(suffix):
        # TODO: Implement proper FileAnnotation lookup
        return False

    # TODO: Implement proper image lookup
    return False
def delete(self, path: Union[str, Path], **kwargs) -> bool:
    """
    Handle a delete request. Nothing is actually removed from OMERO.

    - Registered text formats: logged at debug level, reported as success (True).
    - Anything else (images): logged as unsupported, reported as failure (False).

    NOTE(review): the text saves in this backend create a new FileAnnotation
    on every save rather than replacing an existing one, so earlier results
    stay attached despite the "overwritten" wording in the log message.
    """
    target = Path(path)

    if not self.format_registry.is_text_format(target.suffix):
        logger.warning(f"Delete not supported for OMERO images: {target}")
        return False

    logger.debug(f"Delete requested for {target} - will be overwritten on save")
    return True
885 def _parse_omero_path(self, path: Path) -> Tuple[str, int, bool]:
886 """Extract (plate_name, base_id, is_derived) from path.
888 This method extracts the OMERO plate name from a path by combining the base plate directory
889 with any subdirectories (but NOT the filename).
891 Examples:
892 /omero/plate_289 -> ("plate_289", 289, False)
893 /omero/plate_289_outputs -> ("plate_289_outputs", 289, True)
894 /omero/plate_289_outputs/images -> ("plate_289_outputs_images", 289, True)
895 /omero/plate_289_outputs/images/A01.tif -> ("plate_289_outputs_images", 289, True)
896 /omero/plate_289_outputs/images_results -> ("plate_289_outputs_images_results", 289, True)
897 /omero/plate_289_outputs/checkpoints_step0/A01.tif -> ("plate_294_outputs_checkpoints_step0", 294, True)
898 """
899 parts = path.parts
900 if len(parts) < 2 or parts[0] != "/" or parts[1] != "omero":
901 raise ValueError(f"Not an OMERO path: {path}")
903 base_name = parts[2] # "plate_289_outputs"
904 # Extract subdirectories (everything between base_name and filename)
905 # For /omero/plate_289_outputs/images/A01.tif, subdirs should be ["images"]
906 # parts[3:-1] excludes both the base_name (parts[2]) and the filename (parts[-1])
907 subdirs = list(parts[3:-1]) if len(parts) > 4 else (list(parts[3:]) if len(parts) == 4 else [])
909 if not base_name.startswith("plate_"):
910 raise ValueError(f"OMERO path must use 'plate_{{id}}' format: {base_name}")
912 name_parts = base_name.split("_")
913 if len(name_parts) < 2 or not name_parts[1].isdigit():
914 raise ValueError(f"Cannot extract plate ID from: {base_name}")
916 base_id = int(name_parts[1])
917 plate_name = "_".join([base_name] + subdirs) if subdirs else base_name
918 is_derived = len(subdirs) > 0 or len(name_parts) > 2
920 return plate_name, base_id, is_derived
922 def _get_image_id(self, plate_id: int, well_id: str, site: int, **kwargs) -> int:
923 """Get OMERO image ID for well and site."""
924 if plate_id not in self._plate_metadata:
925 self._load_plate_structure(plate_id, **kwargs)
927 plate_struct = self._plate_metadata[plate_id]
928 if well_id not in plate_struct.wells:
929 raise ValueError(f"Well {well_id} not found in plate {plate_id}")
930 if site not in plate_struct.wells[well_id].sites:
931 raise ValueError(f"Site {site} not found in well {well_id}")
933 return plate_struct.wells[well_id].sites[site].image_id
935 def _find_plate_by_name(self, plate_name: str, **kwargs) -> Optional[int]:
936 """Query OMERO for plate by name."""
937 conn = self._get_connection(**kwargs)
938 plates = conn.getObjects("Plate", attributes={"name": plate_name})
939 for plate in plates:
940 return plate.getId()
941 return None
def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None:
    """Save a batch of 2-D planes into a single OMERO plate.

    Every identifier must be an OMERO virtual path (see ``_parse_omero_path``)
    and all of them must resolve to the same plate name. Each file name is
    parsed with the filename parser named by ``parser_name`` to obtain
    well, site, channel, z_index and timepoint. Planes are grouped per
    (well, site) into one image keyed by 0-based (z, c, t). A channel,
    z_index or timepoint that is missing or ``None`` is treated as 1.

    The target plate is looked up (or created) once per plate name and the
    resulting ID is cached in ``self._plate_name_cache``.

    Args:
        data_list: Plane arrays, parallel to *identifiers*.
        identifiers: OMERO virtual file paths.
        **kwargs: Must include ``parser_name`` and ``microscope_type``. The
            remaining kwargs are forwarded to the plate-creation and
            plane-writing helpers.

    Raises:
        ValueError: on a length mismatch, missing ``parser_name`` or
            ``microscope_type``, paths spanning several plates, or a file
            name the parser cannot parse.
    """
    if not identifiers:
        return

    if len(data_list) != len(identifiers):
        raise ValueError(f"Length mismatch: {len(data_list)} vs {len(identifiers)}")

    parser_name = kwargs.get('parser_name')
    if not parser_name:
        raise ValueError("parser_name required for OMERO save_batch")

    microscope_type = kwargs.get('microscope_type')
    if not microscope_type:
        raise ValueError("microscope_type required for OMERO save_batch")

    # Parse every path once: this validates the OMERO format and that all
    # paths target the same plate.
    paths = [Path(identifier) for identifier in identifiers]
    parsed_paths = [self._parse_omero_path(path) for path in paths]
    plate_names = {name for name, _, _ in parsed_paths}
    if len(plate_names) > 1:
        raise ValueError(f"Cannot save batch across multiple plates: {plate_names}")
    plate_name, base_id, _is_derived = parsed_paths[0]

    parser = self._load_parser(parser_name)

    def zero_based(parsed_name, key):
        # A parser may omit a component or report it as None; both mean index 1.
        value = parsed_name.get(key)
        return (1 if value is None else int(value)) - 1

    # Group data by image (well + site).
    images = defaultdict(lambda: {'planes': {}, 'max_z': 0, 'max_c': 0, 'max_t': 0})
    for data, path in zip(data_list, paths):
        parsed = parser.parse_filename(path.name)
        if not parsed:
            raise ValueError(f"Could not parse filename '{path.name}' with parser '{parser_name}'")
        well_id, site = parsed['well'], parsed['site']
        z = zero_based(parsed, 'z_index')
        c = zero_based(parsed, 'channel')
        t = zero_based(parsed, 'timepoint')

        image = images[(well_id, site)]
        image['planes'][(z, c, t)] = data
        image['max_z'] = max(image['max_z'], z + 1)
        image['max_c'] = max(image['max_c'], c + 1)
        image['max_t'] = max(image['max_t'], t + 1)

    # Get or create the plate (file-locked) once per plate name.
    plate_id = self._plate_name_cache.get(plate_name)
    if plate_id is None:
        # parser_name and microscope_type are passed positionally; drop them
        # from kwargs to avoid a duplicate-argument TypeError.
        filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ('parser_name', 'microscope_type')}
        plate_id = self._get_or_create_plate_with_lock(
            plate_name, base_id, parser_name, microscope_type, images, **filtered_kwargs
        )
        self._plate_name_cache[plate_name] = plate_id

    self._write_planes_to_plate(plate_id, images, **kwargs)
def _get_or_create_plate_with_lock(self, plate_name, base_id, parser_name, microscope_type, images, **kwargs):
    """Return the ID of plate *plate_name*, creating the plate if it does not exist.

    The look-up-then-create sequence runs while holding an exclusive file
    lock on ``~/.openhcs/omero_locks/<plate_name>.lock`` (``fcntl.flock``
    when available, otherwise ``portalocker``). This prevents processes that
    share this home directory from creating the same plate twice.

    Args:
        plate_name: OMERO plate name to find or create.
        base_id: Source plate ID (passed to ``_create_derived_plate``).
        parser_name: Filename parser name (passed to ``_create_derived_plate``).
        microscope_type: Microscope type (passed to ``_create_derived_plate``).
        images: Grouped image data (passed to ``_create_derived_plate``).
        **kwargs: Forwarded to the OMERO look-up / load / create helpers.

    Returns:
        The existing or newly created plate ID.
    """
    lock_dir = Path.home() / '.openhcs' / 'omero_locks'
    lock_dir.mkdir(parents=True, exist_ok=True)
    lock_path = lock_dir / f"{plate_name}.lock"

    # The lock file is deliberately NOT deleted afterwards. If it were unlinked,
    # a process already blocked in flock() on the old file and a process that
    # opens a fresh file at the same path could both acquire "the lock" at the
    # same time, defeating the mutual exclusion. Closing the file releases it.
    with open(lock_path, 'w') as lock_file:
        if FCNTL_AVAILABLE:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
        else:
            portalocker.lock(lock_file, portalocker.LOCK_EX)

        # Re-check under the lock: another process may have created the plate.
        existing_id = self._find_plate_by_name(plate_name, **kwargs)
        if existing_id is not None:
            self._load_plate_structure(existing_id, **kwargs)
            return existing_id

        return self._create_derived_plate(plate_name, base_id, parser_name, microscope_type, images, **kwargs)
def _create_derived_plate(self, plate_name, base_id, parser_name, microscope_type, images, **kwargs):
    """Create a new OMERO plate with one empty well per distinct well ID in *images*.

    The plate gets two map annotations: ``openhcs.metadata`` (parser name
    and microscope type) and ``openhcs.provenance`` (source plate ID,
    creator, creation timestamp). Wells are created WITHOUT images.
    ``_write_planes_to_plate`` later creates each image with its real pixel
    data (earlier placeholder zero images made the first well appear black).

    Args:
        plate_name: Name of the new plate.
        base_id: ID of the source plate, recorded in the provenance annotation.
        parser_name: Filename parser name. It is recorded in the metadata
            annotation and its parser is cached in ``self._parser_cache``
            for the new plate.
        microscope_type: Recorded in the metadata annotation.
        images: ``(well_id, site) -> image data`` mapping. Only the well IDs
            are used here.
        **kwargs: Forwarded to ``_get_connection``.

    Returns:
        The new plate's ID. The plate structure is not loaded here; it is
        loaded after images are written.
    """
    conn = self._get_connection(**kwargs)

    # Import OMERO model classes
    import omero.model
    from omero.rtypes import rstring, rint
    from omero.model import NamedValue

    update_service = conn.getUpdateService()

    # Create plate
    plate = omero.model.PlateI()
    plate.setName(rstring(plate_name))
    plate.setColumnNamingConvention(rstring("number"))
    plate.setRowNamingConvention(rstring("letter"))
    plate = update_service.saveAndReturnObject(plate)
    plate_id = plate.getId().getValue()

    # Attach metadata
    metadata_ann = omero.model.MapAnnotationI()
    metadata_ann.setNs(rstring("openhcs.metadata"))
    metadata_ann.setMapValue([
        NamedValue("openhcs.parser", parser_name),
        NamedValue("openhcs.microscope_type", microscope_type)
    ])
    plate.linkAnnotation(metadata_ann)

    prov_ann = omero.model.MapAnnotationI()
    prov_ann.setNs(rstring("openhcs.provenance"))
    prov_ann.setMapValue([
        NamedValue("source_plate_id", str(base_id)),
        NamedValue("created_by", "OpenHCS"),
        NamedValue("timestamp", datetime.now().isoformat())
    ])
    plate.linkAnnotation(prov_ann)
    update_service.saveObject(plate)

    # One well per distinct well ID, in first-seen order.
    well_ids = list(dict.fromkeys(well_id for well_id, _site in images))
    for well_id in well_ids:
        # Assumes single-letter row + numeric column IDs such as "A01".
        row, col = ord(well_id[0]) - ord('A'), int(well_id[1:]) - 1
        well = omero.model.WellI()
        # Reference the plate via an unloaded proxy. The local `plate` object
        # still carries the annotation links added above (saveObject does not
        # update it), and sending that graph again with each well is unnecessary.
        well.setPlate(omero.model.PlateI(plate_id, False))
        well.setRow(rint(row))
        well.setColumn(rint(col))
        update_service.saveAndReturnObject(well)

    self._parser_cache[plate_id] = self._load_parser(parser_name)
    return plate_id
def _write_planes_to_plate(self, plate_id, images, **kwargs):
    """Create one OMERO image per (well, site) in *images* and link it into plate *plate_id*.

    Args:
        plate_id: ID of the OMERO plate to write into.
        images: Mapping ``(well_id, site) -> {'planes': {(z, c, t): array},
            'max_z': int, 'max_c': int, 'max_t': int}``. This is the structure
            built by ``save_batch``: plane keys are 0-based and the ``max_*``
            values are sizes.
        **kwargs: Forwarded to ``_get_connection``.

    A (well, site) for which ``_get_image_id`` already finds an image is
    skipped, never overwritten. Each new image is created in one call with
    all of its planes, then attached to its well (created on demand). The
    plate structure is reloaded at the end so it includes the new images.
    """
    conn = self._get_connection(**kwargs)

    for (well_id, site), img_data in images.items():
        try:
            self._get_image_id(plate_id, well_id, site)
        except ValueError:
            pass  # No image yet for this well/site: create it below.
        else:
            logger.info(f"Image for {well_id} site {site} already exists in plate {plate_id}, skipping")
            continue

        size_z, size_c, size_t = img_data['max_z'], img_data['max_c'], img_data['max_t']
        image = conn.createImageFromNumpySeq(
            zctPlanes=self._iter_zct_planes(img_data['planes'], size_z, size_c, size_t),
            imageName=f"{well_id}_s{site:03d}",
            sizeZ=size_z,
            sizeC=size_c,
            sizeT=size_t,
            description=f"OpenHCS processed image for well {well_id}, site {site}"
        )
        self._link_image_to_well(conn, plate_id, well_id, image.getId())

    # Reload plate structure to include new wells/images
    self._load_plate_structure(plate_id)

@staticmethod
def _iter_zct_planes(planes, size_z, size_c, size_t):
    """Return an iterator of 2-D NumPy planes in ``createImageFromNumpySeq`` order.

    ``BlitzGateway.createImageFromNumpySeq`` assigns incoming planes with Z
    outermost, then C, then T innermost ("iterate through T first, then C
    then Z"), so planes are produced in exactly that order.

    Every plane is zero-padded at the bottom/right to the largest height and
    width in *planes*. All planes are cast to one common dtype (NumPy type
    promotion over all plane dtypes), and (z, c, t) positions without data
    become zero planes. Shapes and dtype are checked eagerly, before any plane
    is produced, so bad input fails before OMERO creates an image.

    Raises:
        ValueError: if *planes* is empty or any plane is not 2-D.
    """
    if not planes:
        raise ValueError("Cannot build an OMERO image without any planes")
    for key, plane in planes.items():
        if len(plane.shape) != 2:
            raise ValueError(f"Plane {key} must be 2-D, got shape {plane.shape}")

    height = max(plane.shape[0] for plane in planes.values())
    width = max(plane.shape[1] for plane in planes.values())
    # Deduplicate first: np.result_type accepts only a limited number of arguments.
    dtype = np.result_type(*{plane.dtype for plane in planes.values()})

    def generate():
        for z in range(size_z):
            for c in range(size_c):
                for t in range(size_t):
                    data = planes.get((z, c, t))
                    if data is None:
                        # Missing plane - zeros
                        yield np.zeros((height, width), dtype=dtype)
                        continue
                    # Convert CuPy arrays to NumPy (OMERO requires NumPy)
                    if hasattr(data, 'get'):
                        data = data.get()
                    h, w = data.shape
                    if h == height and w == width:
                        yield np.asarray(data, dtype=dtype)
                    else:
                        padded = np.zeros((height, width), dtype=dtype)
                        padded[:h, :w] = data
                        yield padded

    return generate()

def _link_image_to_well(self, conn, plate_id, well_id, image_id):
    """Attach image *image_id* to well *well_id* of plate *plate_id*, creating the well if needed.

    The well lookup, optional well creation and WellSample append all run
    while holding two locks: a per-well ``threading.Lock`` (threads of this
    process) and a per-well exclusive file lock under ``~/.openhcs/omero_locks``
    (processes on the same host). Concurrent writers therefore neither create
    duplicate wells nor race on the well's WellSample collection.
    """
    import omero.model
    import omero.sys
    from omero.rtypes import rint

    # Assumes single-letter row + numeric column IDs such as "A01".
    row, col = ord(well_id[0]) - ord('A'), int(well_id[1:]) - 1

    # Get or create the in-process lock for this specific well.
    lock_key = f"plate_{plate_id}_well_{row}_{col}"
    with self._well_locks_lock:
        if lock_key not in self._well_locks:
            self._well_locks[lock_key] = threading.Lock()
        well_lock = self._well_locks[lock_key]

    lock_dir = Path.home() / '.openhcs' / 'omero_locks'
    lock_dir.mkdir(parents=True, exist_ok=True)
    # The lock file is deliberately never deleted. Unlinking it would let a
    # process blocked on the old file and a process using a newly created file
    # hold the lock at the same time.
    lock_path = lock_dir / f"{lock_key}.lock"

    params = omero.sys.ParametersI()
    params.addLong("pid", plate_id)
    params.add("row", rint(row))
    params.add("col", rint(col))
    query = "select w from Well as w where w.plate.id = :pid and w.row = :row and w.column = :col"

    with well_lock, open(lock_path, 'w') as lock_file:
        if FCNTL_AVAILABLE:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
        else:
            portalocker.lock(lock_file, portalocker.LOCK_EX)

        # Re-check under both locks. findAllByQuery is used because
        # findByQuery throws an exception on null.
        wells = conn.getQueryService().findAllByQuery(query, params)
        if wells:
            well_obj = wells[0]
        else:
            well = omero.model.WellI()
            well.setPlate(omero.model.PlateI(plate_id, False))
            well.setRow(rint(row))
            well.setColumn(rint(col))
            well_obj = conn.getUpdateService().saveAndReturnObject(well)

        # Reload the well and force-load its wellSamples collection before appending.
        well_wrapper = conn.getObject("Well", well_obj.getId().getValue())
        list(well_wrapper.listChildren())

        ws = omero.model.WellSampleI()
        ws.setImage(omero.model.ImageI(image_id, False))
        ws.setWell(well_wrapper._obj)
        well_wrapper._obj.addWellSample(ws)
        conn.getUpdateService().saveObject(well_wrapper._obj)
def list_files(self, directory: Union[str, Path], pattern: str = "*",
               extensions: Set[str] = None, recursive: bool = False, **kwargs) -> List[str]:
    """
    Build the virtual file listing of an OMERO plate from its plate structure.

    The plate ID is the first path component that is either all digits
    ("17") or of the form "plate_<digits>[_...]" ("plate_55",
    "plate_55_outputs"). Filenames are produced by the plate's cached parser
    for every well, site, timepoint, z-index and channel (channel varies
    fastest), always with a '.tif' extension.

    Args:
        directory: Path containing the plate ID (e.g. "/omero/plate_55/Images",
            "/17/Images", "17").
        pattern: Accepted for interface compatibility; not used.
        extensions: Accepted for interface compatibility; not used.
        recursive: Accepted for interface compatibility; not used.
        **kwargs: Accepted for interface compatibility; not used.

    Returns:
        Generated filenames, e.g. ["A01_s001_w1_z001_t001.tif", ...].

    Raises:
        ValueError: if no plate ID can be extracted from *directory*.
    """
    def plate_id_from(component):
        if component.isdigit():
            return int(component)
        if component.startswith("plate_"):
            try:
                return int(component.split("_")[1])
            except (IndexError, ValueError):
                return None
        return None

    plate_id = next(
        (pid for pid in map(plate_id_from, Path(directory).parts) if pid is not None),
        None,
    )
    if plate_id is None:
        raise ValueError(f"Could not extract numeric plate_id from path: {directory}")

    # Load the plate structure on first use; it is cached afterwards.
    if plate_id not in self._plate_metadata:
        self._load_plate_structure(plate_id)

    plate_struct = self._plate_metadata[plate_id]
    parser = self._parser_cache[plate_id]

    filenames = [
        parser.construct_filename(
            well=well_id,
            site=site_idx,
            channel=c + 1,
            z_index=z + 1,
            timepoint=t + 1,
            extension='.tif'
        )
        for well_id, well_struct in plate_struct.wells.items()
        for site_idx, image_struct in well_struct.sites.items()
        for t in range(image_struct.sizeT)
        for z in range(image_struct.sizeZ)
        for c in range(image_struct.sizeC)
    ]

    logger.debug(f"Generated {len(filenames)} filenames on-demand for plate {plate_id}")
    return filenames
def exists(self, path: Union[str, Path]) -> bool:
    """
    Report whether *path* is a well-formed OMERO virtual path.

    OMERO files are generated on demand from the plate structure, so no
    existence lookup is performed. A path "exists" when its string form
    starts with "/omero/" and ``_parse_omero_path`` accepts it.

    Args:
        path: Virtual OMERO path to check.

    Returns:
        True for a parseable OMERO path. False otherwise, including when
        parsing raises ValueError or IndexError.
    """
    try:
        if str(path).startswith("/omero/"):
            # A successful parse is the only criterion.
            self._parse_omero_path(Path(path))
            return True
        return False
    except (ValueError, IndexError):
        return False
def ensure_directory(self, directory: Union[str, Path]) -> None:
    """
    Do nothing: the OMERO virtual backend has no real directories.

    Plates are created on demand by ``save_batch``. This method only exists
    to satisfy the backend interface.

    Args:
        directory: Virtual directory path (ignored).
    """
    return None
def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
    """Load each path in *file_paths* via ``self.load`` and return the results in input order."""
    loaded = []
    for file_path in file_paths:
        loaded.append(self.load(file_path, **kwargs))
    return loaded
def _save_rois(self, rois: List, output_path: Path, images_dir: str = None, **kwargs) -> str:
    """Save ROIs to OMERO by attaching them to every image of one well in the materialized plate.

    The target plate is the OMERO plate whose name ``_parse_omero_path``
    derives from *images_dir*. The target well is the first ``_``-separated
    token of ``output_path``'s file name (``A01`` in ``A01_rois_step7.json``).
    Every ROI is copied onto every image (site) of that well, because the
    ROIs were created from the full image stack at this step.

    Polygon, ellipse and point shapes are converted to OMERO shapes. Any
    other shape type (e.g. ``MaskShape``) is skipped and reported with a
    warning, and an ROI left with no convertible shape is not saved.

    Args:
        rois: ROI objects exposing ``.shapes`` and a ``.metadata`` dict. An
            optional ``'label'`` entry becomes each shape's text value.
        output_path: Output path (e.g.
            /omero/plate_32_outputs/images_results/A01_rois_step7.json).
            Only its file name is used.
        images_dir: Images directory path (required for OMERO to link ROIs
            to the correct plate).
        **kwargs: Forwarded to ``_get_connection`` and ``_find_plate_by_name``.

    Returns:
        A message describing how many ROIs were linked and where.

    Raises:
        ValueError: if *images_dir* is missing, no well ID can be derived from
            the file name, or the plate, the well, or its images are not found.
    """
    from openhcs.core.roi import PolygonShape, PointShape, EllipseShape
    import omero.model
    from omero.rtypes import rstring, rdouble

    conn = self._get_connection(**kwargs)

    if not images_dir:
        raise ValueError(
            f"images_dir is required for OMERO ROI linking. "
            f"This should be passed from the materialization context. "
            f"Output path: {output_path}"
        )

    images_dir = Path(images_dir)
    output_path = Path(output_path)

    plate_name, _base_id, _is_derived = self._parse_omero_path(images_dir)
    plate_id = self._find_plate_by_name(plate_name, **kwargs)
    if not plate_id:
        raise ValueError(f"Plate '{plate_name}' not found in OMERO (images dir: {images_dir})")

    filename = output_path.name
    well_id_from_filename = filename.split('_')[0]  # "A01" or "A1"
    # getWellPos() returns e.g. "A1" (no zero padding) while filenames may use
    # "A01", so compare on the unpadded form: "A01" -> "A1", "A1" -> "A1".
    try:
        target_well_pos = well_id_from_filename[0] + str(int(well_id_from_filename[1:]))
    except (IndexError, ValueError) as err:
        raise ValueError(f"Cannot derive a well ID from ROI filename '{filename}'") from err

    plate = conn.getObject("Plate", plate_id)
    if not plate:
        raise ValueError(f"Plate {plate_id} not found in OMERO")

    well = next((w for w in plate.listChildren() if w.getWellPos() == target_well_pos), None)
    if well is None:
        raise ValueError(f"Well {well_id_from_filename} not found in plate {plate_id}")

    images = [well_sample.getImage() for well_sample in well.listChildren()]
    images = [image for image in images if image]
    if not images:
        raise ValueError(f"No images found in well {well_id_from_filename} of plate {plate_id}")

    def to_omero_shape(shape, roi_metadata):
        """Build a new OMERO shape for *shape*, or return None if its type is unsupported."""
        if isinstance(shape, PolygonShape):
            omero_shape = omero.model.PolygonI()
            # shape.coordinates holds (y, x) pairs; OMERO expects "x1,y1 x2,y2 ...".
            omero_shape.setPoints(rstring(" ".join(f"{x},{y}" for y, x in shape.coordinates)))
        elif isinstance(shape, EllipseShape):
            omero_shape = omero.model.EllipseI()
            omero_shape.setX(rdouble(shape.center_x))
            omero_shape.setY(rdouble(shape.center_y))
            omero_shape.setRadiusX(rdouble(shape.radius_x))
            omero_shape.setRadiusY(rdouble(shape.radius_y))
        elif isinstance(shape, PointShape):
            omero_shape = omero.model.PointI()
            omero_shape.setX(rdouble(shape.x))
            omero_shape.setY(rdouble(shape.y))
        else:
            return None
        if 'label' in roi_metadata:
            omero_shape.setTextValue(rstring(str(roi_metadata['label'])))
        return omero_shape

    update_service = conn.getUpdateService()
    saved_total = 0
    skipped_shape_types = set()

    for image in images:
        for roi in rois:
            omero_roi = omero.model.RoiI()
            omero_roi.setImage(image._obj)
            for shape in roi.shapes:
                omero_shape = to_omero_shape(shape, roi.metadata)
                if omero_shape is None:
                    skipped_shape_types.add(type(shape).__name__)
                else:
                    omero_roi.addShape(omero_shape)

            # Only persist ROIs that ended up with at least one shape.
            if omero_roi.sizeOfShapes() > 0:
                update_service.saveAndReturnObject(omero_roi)
                saved_total += 1

    if skipped_shape_types:
        logger.warning(
            f"Skipped unsupported ROI shape types {sorted(skipped_shape_types)} "
            f"for well {well_id_from_filename} (plate: {plate_name})"
        )

    # Every image receives the same set of ROIs, so this per-image count is exact.
    rois_per_image = saved_total // len(images)
    result_msg = f"Linked {rois_per_image} ROIs to {len(images)} images in well {well_id_from_filename} (plate: {plate_name}, ID: {plate_id})"
    logger.info(result_msg)
    return result_msg