Coverage for openhcs/io/omero_local.py: 7.4%

737 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1# openhcs/io/omero_local.py 

2""" 

3OMERO Local Storage Backend - Zero-copy server-side OMERO access. 

4 

5Reads directly from OMERO binary repository, saves results back to OMERO. 

6""" 

7 

8import logging 

9from dataclasses import dataclass 

10from pathlib import Path 

11from typing import Any, Dict, List, Optional, Set, Union, Tuple 

12from collections import defaultdict 

13from datetime import datetime 

14import threading 

15 

16# Cross-platform file locking 

17try: 

18 import fcntl 

19 FCNTL_AVAILABLE = True 

20except ImportError: 

21 import portalocker 

22 FCNTL_AVAILABLE = False 

23 

24import numpy as np 

25 

26from openhcs.io.base import VirtualBackend 

27from openhcs.constants.constants import FileFormat 

28 

29logger = logging.getLogger(__name__) 

30 

31 

32class OMEROFileFormatRegistry: 

33 """Registry for OMERO file format handlers (text files saved as FileAnnotations).""" 

34 

35 def __init__(self): 

36 self._text_extensions: Set[str] = set() 

37 self._mimetypes: Dict[str, str] = {} 

38 

39 def register_text_format(self, extensions: List[str], mimetype: str): 

40 """Register a text format that should be saved as FileAnnotation.""" 

41 for ext in extensions: 

42 ext = ext.lower() 

43 self._text_extensions.add(ext) 

44 self._mimetypes[ext] = mimetype 

45 

46 def is_text_format(self, ext: str) -> bool: 

47 """Check if extension is registered as text format.""" 

48 return ext.lower() in self._text_extensions 

49 

50 def get_mimetype(self, ext: str) -> str: 

51 """Get MIME type for extension.""" 

52 return self._mimetypes.get(ext.lower(), 'text/plain') 
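# Usage sketch for the registry above (illustrative, not an excerpt from this file):
# the formats registered here decide whether save() routes a string to an
# OMERO.table / FileAnnotation instead of an image. Extensions are shown dotted
# because lookups are done with Path.suffix.
#
#     registry = OMEROFileFormatRegistry()
#     registry.register_text_format(['.csv'], 'text/csv')
#     registry.is_text_format('.CSV')      # True (matching is case-insensitive)
#     registry.get_mimetype('.csv')        # 'text/csv'
#     registry.get_mimetype('.unknown')    # falls back to 'text/plain'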

53 

54 

55@dataclass 

56class ImageStructure: 

57 """Metadata for a single OMERO image.""" 

58 image_id: int 

59 sizeZ: int 

60 sizeC: int 

61 sizeT: int 

62 sizeY: int 

63 sizeX: int 

64 

65 

66@dataclass 

67class WellStructure: 

68 """Metadata for a single well.""" 

69 sites: Dict[int, ImageStructure] # site_idx → ImageStructure 

70 

71 

72@dataclass 

73class PlateStructure: 

74 """Lightweight metadata for entire plate.""" 

75 plate_id: int 

76 parser_name: str 

77 microscope_type: str 

78 wells: Dict[str, WellStructure] # well_id → WellStructure 

79 

80 # Cached for quick access 

81 all_well_ids: Set[str] 

82 max_sites: int 

83 max_z: int 

84 max_c: int 

85 max_t: int 

86 

87 

88class OMEROLocalBackend(VirtualBackend): 

89 """ 

90 Virtual backend for OMERO server-side execution. 

91 

92 Generates filenames on-demand from OMERO plate structure. 

93 No real filesystem operations - all paths are virtual. 

94 """ 

95 

96 _backend_type = 'omero_local' 

97 

98 # Class-level lock dictionary for thread-safe well creation 

99 _well_locks: Dict[str, threading.Lock] = {} 

100 _well_locks_lock = threading.Lock() # Lock for the lock dictionary itself 

101 

102 def __init__(self, omero_data_dir: Optional[Path] = None, omero_conn=None): 

103 try: 

104 from omero.gateway import BlitzGateway 

105 self._BlitzGateway = BlitzGateway 

106 except ImportError: 

107 raise ImportError("omero-py required: pip install omero-py") 

108 

109 if omero_data_dir: 

110 omero_data_dir = Path(omero_data_dir) 

111 if not omero_data_dir.exists(): 

112 raise ValueError(f"OMERO data directory not found: {omero_data_dir}") 

113 

114 self.omero_data_dir = omero_data_dir 

115 # DO NOT store omero_conn - it contains unpicklable IcePy.Communicator 

116 # Connection must be passed via kwargs or retrieved from global registry 

117 self._initial_conn = omero_conn # Store temporarily for registration 

118 

119 # Store connection parameters for reconnection in worker processes 

120 self._conn_params = None 

121 if omero_conn: 

122 try: 

123 self._conn_params = { 

124 'host': omero_conn.host, 

125 'port': omero_conn.port, 

126 'username': omero_conn.getUser().getName(), 

127 # Password not available from connection object 

128 } 

129 except Exception:

130 pass # Connection params not available 

131 

132 # Caches for virtual filesystem 

133 self._plate_metadata: Dict[int, PlateStructure] = {} 

134 self._parser_cache: Dict[int, Any] = {} # plate_id → parser instance 

135 self._plate_name_cache: Dict[str, int] = {} # plate_name → plate_id 

136 

137 # File format registry 

138 self.format_registry = OMEROFileFormatRegistry() 

139 self._register_formats() 

140 

141 def _register_formats(self): 

142 """Register supported text file formats for FileAnnotation storage.""" 

143 # JSON files 

144 self.format_registry.register_text_format( 

145 FileFormat.JSON.value, 

146 'application/json' 

147 ) 

148 

149 # CSV files 

150 self.format_registry.register_text_format( 

151 FileFormat.CSV.value, 

152 'text/csv' 

153 ) 

154 

155 # Text files 

156 self.format_registry.register_text_format( 

157 FileFormat.TEXT.value, 

158 'text/plain' 

159 ) 

160 

161 def __getstate__(self): 

162 """Exclude unpicklable connection from pickle.""" 

163 state = self.__dict__.copy() 

164 # Remove unpicklable connection 

165 state['_initial_conn'] = None 

166 return state 

167 

168 def __setstate__(self, state): 

169 """Restore state after unpickling.""" 

170 self.__dict__.update(state) 

171 # Connection will be retrieved from global registry in worker process 

172 

173 def _get_connection(self, **kwargs): 

174 """ 

175 Get OMERO connection from kwargs, instance, global registry, or create new one. 

176 

177 This method handles multiple scenarios: 

178 1. Connection passed via kwargs (highest priority) 

179 2. Connection stored in this instance 

180 3. Connection from global registry backend 

181 4. Create new connection using stored params (worker process) 

182 

183 This ensures the backend remains picklable for multiprocessing. 

184 """ 

185 conn = kwargs.get('omero_conn') 

186 if not conn and self._initial_conn: 

187 conn = self._initial_conn 

188 

189 if not conn: 

190 # Try to get from global registry 

191 # This handles the case where orchestrator copies the registry 

192 # but the copy's backend doesn't have the connection 

193 try: 

194 from openhcs.io.base import storage_registry 

195 backend = storage_registry.get('omero_local') 

196 if backend and backend is not self and hasattr(backend, '_initial_conn') and backend._initial_conn: 

197 conn = backend._initial_conn 

198 # Cache it in this instance too 

199 self._initial_conn = conn 

200 logger.debug("Retrieved OMERO connection from global registry backend") 

201 except Exception as e: 

202 logger.debug(f"Could not get connection from global registry: {e}") 

203 

204 if not conn and self._conn_params: 

205 # Worker process or fresh instance: create new connection using stored params 

206 logger.info(f"Creating new OMERO connection to {self._conn_params.get('host')}:{self._conn_params.get('port')}") 

207 try: 

208 # Get password from environment or use default 

209 import os 

210 password = os.getenv('OMERO_PASSWORD', 'openhcs') 

211 

212 conn = self._BlitzGateway( 

213 self._conn_params['username'], 

214 password, 

215 host=self._conn_params['host'], 

216 port=self._conn_params['port'] 

217 ) 

218 if not conn.connect(): 

219 raise ConnectionError(f"Failed to connect to OMERO at {self._conn_params['host']}:{self._conn_params['port']}") 

220 

221 # Python 3.11 compatibility: Ensure session is fully established 

222 # by explicitly calling keepAlive() after connection 

223 try: 

224 conn.c.sf.keepAlive(None) 

225 except Exception as e: 

226 logger.warning(f"keepAlive() call failed (non-fatal): {e}") 

227 

228 # Cache the connection 

229 self._initial_conn = conn 

230 logger.info("Successfully connected to OMERO") 

231 except Exception as e: 

232 logger.error(f"Failed to create OMERO connection: {e}") 

233 raise 

234 

235 if not conn: 

236 raise ValueError( 

237 "No OMERO connection available. " 

238 "Pass omero_conn via kwargs, set in instance, ensure global registry has connection, " 

239 "or provide connection params for auto-reconnection." 

240 ) 

241 return conn 

242 

243 def _ensure_connection(self, **kwargs): 

244 """Validate OMERO connection is available.""" 

245 self._get_connection(**kwargs) 

246 

247 def _get_parser_from_plate_metadata(self, plate) -> str: 

248 """Get parser name from OMERO plate metadata.""" 

249 for ann in plate.listAnnotations(): 

250 if hasattr(ann, 'getNs') and ann.getNs() == "openhcs.metadata": 

251 metadata = {kv[0]: kv[1] for kv in ann.getValue()} 

252 parser_name = metadata.get("openhcs.parser") 

253 if parser_name: 

254 return parser_name 

255 

256 raise ValueError(f"Plate {plate.getId()} missing openhcs.parser metadata") 

257 

258 def _get_microscope_type_from_plate_metadata(self, plate) -> str: 

259 """Get microscope type from OMERO plate metadata.""" 

260 for ann in plate.listAnnotations(): 

261 if hasattr(ann, 'getNs') and ann.getNs() == "openhcs.metadata": 

262 metadata = {kv[0]: kv[1] for kv in ann.getValue()} 

263 microscope_type = metadata.get("openhcs.microscope_type") 

264 if microscope_type: 

265 return microscope_type 

266 

267 raise ValueError(f"Plate {plate.getId()} missing openhcs.microscope_type metadata") 

268 

269 def _load_parser(self, parser_name: str): 

270 """Dynamically load parser class by name.""" 

271 # Import parser classes 

272 from openhcs.microscopes.imagexpress import ImageXpressFilenameParser 

273 from openhcs.microscopes.opera_phenix import OperaPhenixFilenameParser 

274 from openhcs.microscopes.omero import OMEROFilenameParser 

275 

276 parser_map = { 

277 'ImageXpressFilenameParser': ImageXpressFilenameParser, 

278 'OperaPhenixFilenameParser': OperaPhenixFilenameParser, 

279 'OMEROFilenameParser': OMEROFilenameParser, 

280 } 

281 

282 parser_class = parser_map.get(parser_name) 

283 if not parser_class: 

284 raise ValueError(f"Unknown parser: {parser_name}") 

285 

286 return parser_class() 

287 

288 def _load_plate_structure(self, plate_id: int, **kwargs) -> None: 

289 """ 

290 Query OMERO once to build lightweight plate structure. 

291 

292 Args: 

293 plate_id: OMERO Plate ID 

294 **kwargs: Must include omero_conn 

295 

296 Raises: 

297 ValueError: If plate not found or missing metadata 

298 """ 

299 conn = self._get_connection(**kwargs) 

300 

301 # Query OMERO for plate 

302 plate = conn.getObject("Plate", plate_id) 

303 if not plate: 

304 raise ValueError(f"OMERO Plate not found: {plate_id}") 

305 

306 # Get parser metadata 

307 parser_name = self._get_parser_from_plate_metadata(plate) 

308 microscope_type = self._get_microscope_type_from_plate_metadata(plate) 

309 

310 # Load parser (cache it) 

311 if plate_id not in self._parser_cache: 

312 self._parser_cache[plate_id] = self._load_parser(parser_name) 

313 

314 # Build structure 

315 wells = {} 

316 all_well_ids = set() 

317 max_sites = 0 

318 max_z = 0 

319 max_c = 0 

320 max_t = 0 

321 

322 for well in plate.listChildren(): 

323 # Handle both OMERO rtypes and plain Python types 

324 row = well.row.val if hasattr(well.row, 'val') else well.row 

325 col = well.column.val if hasattr(well.column, 'val') else well.column 

326 well_id = f"{chr(ord('A') + row)}{col + 1:02d}" 

327 all_well_ids.add(well_id) 

328 

329 sites = {} 

330 for site_idx_0based, wellsample in enumerate(well.listChildren()): 

331 image = wellsample.getImage() 

332 # Use enumeration order as site index (0-based) 

333 # Convert to 1-based for OpenHCS 

334 site_idx = site_idx_0based + 1 

335 

336 image_struct = ImageStructure( 

337 image_id=image.getId(), 

338 sizeZ=image.getSizeZ(), 

339 sizeC=image.getSizeC(), 

340 sizeT=image.getSizeT(), 

341 sizeY=image.getSizeY(), 

342 sizeX=image.getSizeX() 

343 ) 

344 sites[site_idx] = image_struct 

345 

346 # Track maximums 

347 max_sites = max(max_sites, site_idx) 

348 max_z = max(max_z, image.getSizeZ()) 

349 max_c = max(max_c, image.getSizeC()) 

350 max_t = max(max_t, image.getSizeT()) 

351 

352 wells[well_id] = WellStructure(sites=sites) 

353 

354 # Store structure 

355 self._plate_metadata[plate_id] = PlateStructure( 

356 plate_id=plate_id, 

357 parser_name=parser_name, 

358 microscope_type=microscope_type, 

359 wells=wells, 

360 all_well_ids=all_well_ids, 

361 max_sites=max_sites, 

362 max_z=max_z, 

363 max_c=max_c, 

364 max_t=max_t 

365 ) 

366 

367 logger.info(f"Loaded plate structure for {plate_id}: " 

368 f"{len(wells)} wells, {max_sites} sites, " 

369 f"{max_z}Z × {max_c}C × {max_t}T") 

370 

371 def load(self, file_path: Union[str, Path], **kwargs) -> np.ndarray: 

372 """ 

373 Load by parsing filename to extract coordinates, then lookup image. 

374 

375 Flow: path → extract plate_id → filename → parse → (well, site, z, c, t) → lookup structure → image_id → load plane 

376 

377 Args: 

378 file_path: Full path including plate_id (e.g., "/omero/plate_59/A01_s001_w1_z001_t001.tif") 

379 **kwargs: Additional backend-specific arguments (unused) 

380 

381 Returns: 

382 2D numpy array (single z-plane, single channel, single timepoint) 

383 """ 

384 # Extract plate_id from path using parent directory 

385 # Path format: /omero/plate_59/A01_s001_w1_z001_t001.tif 

386 path_obj = Path(file_path) 

387 plate_dir = path_obj.parent # /omero/plate_59 

388 

389 # Extract plate_id from plate directory name 

390 import re 

391 plate_dir_name = plate_dir.name # "plate_59" or "plate_59_outputs" 

392 match = re.match(r'plate_(\d+)', plate_dir_name) 

393 if not match: 

394 raise ValueError(f"Could not extract plate_id from path: {file_path}. Expected /omero/plate_<id>/filename format") 

395 

396 plate_id = int(match.group(1)) 

397 

398 # Extract filename 

399 filename = path_obj.name 

400 

401 # Ensure plate structure is loaded 

402 if plate_id not in self._plate_metadata: 

403 self._load_plate_structure(plate_id, **kwargs) 

404 

405 plate_struct = self._plate_metadata[plate_id] 

406 parser = self._parser_cache[plate_id] 

407 

408 # Parse filename to extract components 

409 parsed = parser.parse_filename(filename) 

410 if not parsed: 

411 raise ValueError(f"Cannot parse filename: {filename}") 

412 

413 well_id = parsed['well'] 

414 site_idx = parsed['site'] 

415 z_idx = parsed['z_index'] - 1 # Convert to 0-based 

416 c_idx = parsed['channel'] - 1 

417 t_idx = parsed['timepoint'] - 1 

418 

419 # Lookup image_id from structure 

420 if well_id not in plate_struct.wells: 

421 raise ValueError(f"Well {well_id} not found in plate {plate_id}") 

422 

423 well_struct = plate_struct.wells[well_id] 

424 if site_idx not in well_struct.sites: 

425 raise ValueError(f"Site {site_idx} not found in well {well_id}") 

426 

427 image_struct = well_struct.sites[site_idx] 

428 

429 # Validate coordinates 

430 if z_idx >= image_struct.sizeZ: 

431 raise ValueError(f"Z-index {z_idx} out of range (size: {image_struct.sizeZ})") 

432 if c_idx >= image_struct.sizeC: 

433 raise ValueError(f"Channel {c_idx} out of range (size: {image_struct.sizeC})") 

434 if t_idx >= image_struct.sizeT: 

435 raise ValueError(f"Timepoint {t_idx} out of range (size: {image_struct.sizeT})") 

436 

437 # Load plane from OMERO 

438 conn = self._get_connection(**kwargs) 

439 image = conn.getObject("Image", image_struct.image_id) 

440 if not image: 

441 raise ValueError(f"OMERO Image not found: {image_struct.image_id}") 

442 

443 pixels = image.getPrimaryPixels() 

444 plane = pixels.getPlane(z_idx, c_idx, t_idx) # Returns 2D numpy array 

445 

446 logger.debug(f"Loaded {filename} → image {image_struct.image_id}, " 

447 f"z={z_idx}, c={c_idx}, t={t_idx}, shape={plane.shape}") 

448 

449 return plane 

450 

451 def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None: 

452 """ 

453 Save data to OMERO. 

454 

455 For ROI data (List[ROI]): Creates OMERO ROI objects linked to images 

456 For image data (numpy arrays): Creates a new image in a dataset 

457 For tabular data (CSV/JSON/TXT): Attempts to parse and save as OMERO.table (queryable structured data) 

458 For other text data: Creates a FileAnnotation attached to plate/well/image 

459 

460 Args: 

461 data: Data to save 

462 output_path: Output path 

463 **kwargs: Additional arguments, including: 

464 - images_dir: Directory containing images (required for analysis results to link to correct plate) 

465 - dataset_id: Dataset ID for image data 

466 """ 

467 from openhcs.core.roi import ROI 

468 

469 output_path = Path(output_path) 

470 

471 # Explicit type dispatch - fail-loud 

472 if isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI): 

473 # ROI data - save as OMERO ROI objects 

474 images_dir = kwargs.pop('images_dir', None) 

475 self._save_rois(data, output_path, images_dir=images_dir, **kwargs) 

476 elif isinstance(data, str) and self.format_registry.is_text_format(output_path.suffix): 

477 # Try to parse as tabular data and save as OMERO.table 

478 # Extract images_dir from kwargs if present (passed via filemanager context) 

479 # Remove it from kwargs to avoid duplicate keyword argument error 

480 images_dir = kwargs.pop('images_dir', None) 

481 self._save_as_table_or_annotation(data, output_path, images_dir=images_dir, **kwargs) 

482 else: 

483 # Image data - save as OMERO image 

484 self._save_image(data, output_path, **kwargs) 

485 

486 def _save_as_table_or_annotation(self, text_content: str, output_path: Path, images_dir: str = None, **kwargs) -> None: 

487 """ 

488 Try to parse text content as tabular data and save as OMERO.table. 

489 If parsing fails, fall back to FileAnnotation. 

490 

491 Supports: 

492 - CSV files (direct parsing) 

493 - JSON files (if they contain tabular data) 

494 - TXT files (if they contain tabular data) 

495 

496 Args: 

497 text_content: Text content to save 

498 output_path: Output path 

499 images_dir: Directory containing images (required for analysis results to link to correct plate) 

500 **kwargs: Additional arguments 

501 """ 

502 import pandas as pd 

503 from io import StringIO 

504 import json 

505 

506 df = None 

507 

508 # Try to parse based on file extension 

509 suffix = output_path.suffix.lower() 

510 

511 try: 

512 if suffix == '.csv': 

513 # Parse CSV directly 

514 df = pd.read_csv(StringIO(text_content)) 

515 elif suffix == '.json': 

516 # Try to parse JSON as tabular data 

517 data = json.loads(text_content) 

518 # Check if it's a list of dicts (table-like) or a dict with list values 

519 if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): 

520 df = pd.DataFrame(data) 

521 elif isinstance(data, dict): 

522 # Try to convert dict to DataFrame 

523 df = pd.DataFrame(data) 

524 elif suffix == '.txt': 

525 # Try to parse as CSV (tab or comma separated) 

526 # First try tab-separated 

527 try: 

528 df = pd.read_csv(StringIO(text_content), sep='\t') 

529 # Check if it actually parsed into multiple columns 

530 if len(df.columns) == 1: 

531 # Try comma-separated 

532 df = pd.read_csv(StringIO(text_content)) 

533 except Exception:

534 # Try comma-separated 

535 try: 

536 df = pd.read_csv(StringIO(text_content)) 

537 except Exception:

538 # Try to parse as key-value pairs (e.g., "Key: Value" format) 

539 lines = text_content.strip().split('\n') 

540 data = {} 

541 for line in lines: 

542 if ':' in line: 

543 key, value = line.split(':', 1) 

544 data[key.strip()] = [value.strip()] 

545 if data: 

546 df = pd.DataFrame(data) 

547 except Exception: 

548 # Parsing failed, will fall back to FileAnnotation 

549 df = None 

550 

551 # If we successfully parsed tabular data, save as OMERO.table 

552 if df is not None and not df.empty and len(df.columns) > 0: 

553 # Convert back to CSV for the table creation method 

554 csv_content = df.to_csv(index=False) 

555 self._save_csv_as_table(csv_content, output_path, images_dir=images_dir, **kwargs) 

556 else: 

557 # Fall back to FileAnnotation 

558 self._save_text_annotation(text_content, output_path, images_dir=images_dir, **kwargs) 

559 

560 def _save_csv_as_table(self, csv_content: str, output_path: Path, images_dir: str = None, **kwargs) -> None: 

561 """ 

562 Save CSV content as an OMERO.table (queryable structured data). 

563 

564 Tables are linked to the appropriate OMERO object (Plate, Well, or Image). 

565 

566 Args: 

567 csv_content: CSV content to save 

568 output_path: Output path (used for table name) 

569 images_dir: Directory containing images (required to link table to correct plate) 

570 **kwargs: Additional arguments 

571 """ 

572 from omero.grid import LongColumn, DoubleColumn, StringColumn, BoolColumn 

573 from omero.model import FileAnnotationI 

574 from omero.rtypes import rstring 

575 import pandas as pd 

576 from io import StringIO 

577 

578 conn = self._get_connection(**kwargs) 

579 

580 # Parse CSV content into pandas DataFrame 

581 df = pd.read_csv(StringIO(csv_content)) 

582 

583 # Validate images_dir is provided 

584 if not images_dir: 

585 raise ValueError( 

586 f"images_dir is required for OMERO table linking. " 

587 f"This should be passed from the materialization context. " 

588 f"Output path: {output_path}" 

589 ) 

590 

591 # Parse the images directory path to get the plate name, then query OMERO for actual plate ID 

592 # Path format: /omero/plate_274_outputs/images/ 

593 # The path contains the INPUT plate ID (274), but we need the OUTPUT plate ID 

594 # We must parse the full plate name and query OMERO to get the actual ID 

595 images_dir = Path(images_dir) 

596 plate_name, base_id, is_derived = self._parse_omero_path(images_dir) 

597 

598 # Query OMERO for the actual plate ID by name 

599 plate_id = self._find_plate_by_name(plate_name, **kwargs) 

600 if not plate_id: 

601 raise ValueError(f"Plate '{plate_name}' not found in OMERO (images dir: {images_dir})") 

602 

603 # Determine table name from filename 

604 # Remove ALL extensions (e.g., "file.roi.zip.json" -> "file") 

605 # OMERO table names cannot contain dots except for the .h5 extension 

606 table_name = output_path.name.split('.')[0] 

607 

608 # Build column objects based on DataFrame dtypes 

609 columns = [] 

610 for col_name in df.columns: 

611 col_data = df[col_name] 

612 

613 # Determine column type from pandas dtype 

614 if pd.api.types.is_integer_dtype(col_data): 

615 col = LongColumn(col_name, '', []) 

616 col.values = col_data.astype(int).tolist() 

617 elif pd.api.types.is_float_dtype(col_data): 

618 col = DoubleColumn(col_name, '', []) 

619 col.values = col_data.astype(float).tolist() 

620 elif pd.api.types.is_bool_dtype(col_data): 

621 col = BoolColumn(col_name, '', []) 

622 col.values = col_data.astype(bool).tolist() 

623 else: 

624 # Default to string column 

625 # Calculate max string length (OMERO requires size > 0) 

626 str_values = col_data.astype(str).tolist() 

627 max_len = max([len(s) for s in str_values] + [1])  # guarantee size > 0 even for empty strings

628 col = StringColumn(col_name, '', max_len, []) 

629 col.values = str_values 

630 

631 columns.append(col) 

632 

633 # Create table via OMERO.tables service 

634 # Python 3.11 compatibility: In multiprocessing contexts, the sharedResources 

635 # may not be immediately available after connection. Retry with exponential backoff. 

636 import time 

637 

638 max_retries = 3 

639 retry_delay = 0.1 # Start with 100ms 

640 

641 for attempt in range(max_retries): 

642 try: 

643 resources = conn.c.sf.sharedResources() 

644 

645 # Get repository ID - fail-loud if no repositories available 

646 repositories = resources.repositories() 

647 if not repositories or not repositories.descriptions: 

648 if attempt < max_retries - 1: 

649 # Retry - repositories may not be ready yet 

650 time.sleep(retry_delay) 

651 retry_delay *= 2 # Exponential backoff 

652 continue 

653 else: 

654 raise RuntimeError( 

655 "No OMERO repositories available for table creation after retries. " 

656 "This may indicate an OMERO server configuration issue." 

657 ) 

658 

659 # Get repository ID with explicit None checks 

660 repo_desc = repositories.descriptions[0] 

661 repo_id_obj = repo_desc.getId() 

662 

663 if repo_id_obj is None: 

664 if attempt < max_retries - 1: 

665 time.sleep(retry_delay) 

666 retry_delay *= 2 

667 continue 

668 else: 

669 raise RuntimeError( 

670 f"Repository description exists but getId() returned None after retries. " 

671 f"This may be a Python 3.11/Ice compatibility issue." 

672 ) 

673 

674 repository_id = repo_id_obj.getValue() 

675 

676 if repository_id is None: 

677 if attempt < max_retries - 1: 

678 time.sleep(retry_delay) 

679 retry_delay *= 2 

680 continue 

681 else: 

682 raise RuntimeError( 

683 f"Repository ID object exists but getValue() returned None after retries. " 

684 f"This may be a Python 3.11/Ice compatibility issue." 

685 ) 

686 

687 # Successfully got repository_id, create table 

688 table = resources.newTable(repository_id, f"{table_name}.h5") 

689 break # Success, exit retry loop 

690 

691 except Exception as e: 

692 if attempt < max_retries - 1 and "null table" in str(e).lower(): 

693 # Retry on "null table" errors 

694 time.sleep(retry_delay) 

695 retry_delay *= 2 

696 continue 

697 else: 

698 # Re-raise on final attempt or non-retryable errors 

699 raise 

700 

701 if table is None: 

702 raise RuntimeError("Failed to create OMERO.table") 

703 

704 try: 

705 # Initialize table with columns 

706 table.initialize(columns) 

707 

708 # Add all rows 

709 table.addData(columns) 

710 

711 # Get the OriginalFile for the table 

712 orig_file = table.getOriginalFile() 

713 

714 # Create FileAnnotation to link the table 

715 file_ann = FileAnnotationI() 

716 file_ann.setFile(orig_file) 

717 file_ann.setNs(rstring('openhcs.analysis.results.table')) 

718 file_ann.setDescription(rstring(f'Analysis results table: {table_name}')) 

719 file_ann = conn.getUpdateService().saveAndReturnObject(file_ann) 

720 

721 # Link to plate 

722 plate = conn.getObject("Plate", plate_id) 

723 if not plate: 

724 raise ValueError(f"Plate {plate_id} not found") 

725 

726 # Get the annotation ID and fetch as gateway object 

727 ann_id = file_ann.getId().getValue() 

728 file_ann_wrapped = conn.getObject("Annotation", ann_id) 

729 plate.linkAnnotation(file_ann_wrapped) 

730 logger.info(f"Created OMERO.table '{table_name}' and linked to plate {plate_id}") 

731 

732 finally: 

733 table.close() 

734 

735 def _save_text_annotation(self, text_content: str, output_path: Path, images_dir: str = None, **kwargs) -> None: 

736 """Save text content as a FileAnnotation attached to OMERO object. 

737 

738 Args: 

739 text_content: Text content to save 

740 output_path: Output path (used for filename) 

741 images_dir: Directory containing images (required to link annotation to correct plate) 

742 **kwargs: Additional arguments 

743 """ 

744 conn = self._get_connection(**kwargs) 

745 

746 # Validate images_dir is provided 

747 if not images_dir: 

748 raise ValueError( 

749 f"images_dir is required for OMERO annotation linking. " 

750 f"This should be passed from the materialization context. " 

751 f"Output path: {output_path}" 

752 ) 

753 

754 # Parse the images directory path to get the plate name, then query OMERO for actual plate ID 

755 # Path format: /omero/plate_274_outputs/images/ 

756 # The path contains the INPUT plate ID (274), but we need the OUTPUT plate ID 

757 # We must parse the full plate name and query OMERO to get the actual ID 

758 images_dir = Path(images_dir) 

759 plate_name, base_id, is_derived = self._parse_omero_path(images_dir) 

760 

761 # Query OMERO for the actual plate ID by name 

762 plate_id = self._find_plate_by_name(plate_name, **kwargs) 

763 if not plate_id: 

764 raise ValueError(f"Plate '{plate_name}' not found in OMERO (images dir: {images_dir})") 

765 

766 # Create FileAnnotation 

767 import tempfile 

768 

769 # Write content to temporary file 

770 with tempfile.NamedTemporaryFile(mode='w', suffix=output_path.suffix, delete=False) as tmp: 

771 tmp.write(text_content) 

772 tmp_path = tmp.name 

773 

774 try: 

775 # Upload file to OMERO with the actual filename from output_path 

776 # Get MIME type from registry 

777 mimetype = self.format_registry.get_mimetype(output_path.suffix) 

778 

779 file_ann = conn.createFileAnnfromLocalFile( 

780 tmp_path, 

781 origFilePathAndName=output_path.name, # Use actual filename, not temp name 

782 mimetype=mimetype, 

783 ns='openhcs.analysis.results', 

784 desc=f'Analysis results: {output_path.name}' 

785 ) 

786 

787 # Attach to plate 

788 plate = conn.getObject("Plate", plate_id) 

789 if plate: 

790 plate.linkAnnotation(file_ann) 

791 logger.info(f"Attached {output_path.name} as FileAnnotation to plate {plate_id}") 

792 else: 

793 logger.warning(f"Plate {plate_id} not found, FileAnnotation created but not linked") 

794 finally: 

795 # Clean up temp file 

796 import os 

797 os.unlink(tmp_path) 

798 

799 def _save_image(self, data: Any, output_path: Path, **kwargs) -> None: 

800 """Save image data to OMERO as new image.""" 

801 conn = self._get_connection(**kwargs) 

802 

803 dataset_id = kwargs.get('dataset_id') 

804 if not dataset_id: 

805 raise ValueError("dataset_id required") 

806 

807 dataset = conn.getObject("Dataset", dataset_id) 

808 if not dataset: 

809 raise ValueError(f"Dataset not found: {dataset_id}") 

810 

811 image_name = output_path.stem 

812 

813 # Get dimensions 

814 if data.ndim == 3: 

815 sizeZ, sizeY, sizeX = data.shape 

816 sizeC, sizeT = 1, 1 

817 elif data.ndim == 4: 

818 sizeZ, sizeC, sizeY, sizeX = data.shape 

819 sizeT = 1 

820 else: 

821 raise ValueError(f"Data must be 3D or 4D, got {data.shape}") 

822 

823 # Plane generator 

824 def planes(): 

825 if data.ndim == 3: 

826 for z in range(sizeZ): 

827 yield data[z] 

828 else: 

829 for z in range(sizeZ): 

830 for c in range(sizeC): 

831 yield data[z, c] 

832 

833 # Create image 

834 new_image = conn.createImageFromNumpySeq( 

835 planes(), 

836 image_name, 

837 sizeZ=sizeZ, 

838 sizeC=sizeC, 

839 sizeT=sizeT, 

840 description=kwargs.get('description', 'Processed by OpenHCS'), 

841 dataset=dataset 

842 ) 

843 

844 logger.info(f"Created OMERO image {new_image.getId()}: {image_name}") 

845 

846 def exists(self, path: Union[str, Path], **kwargs) -> bool: 

847 """ 

848 Check if a file/annotation exists in OMERO. 

849 

850 For text files (JSON/CSV): Check if FileAnnotation exists 

851 For images: Check if image exists in plate 

852 """ 

853 path = Path(path) 

854 

855 # For text files, check FileAnnotations using registry 

856 if self.format_registry.is_text_format(path.suffix): 

857 # For now, return False to allow overwrite 

858 # TODO: Implement proper FileAnnotation lookup 

859 return False 

860 

861 # For images, check if image exists 

862 # TODO: Implement proper image lookup 

863 return False 

864 

865 def delete(self, path: Union[str, Path], **kwargs) -> bool: 

866 """ 

867 Delete a file/annotation from OMERO. 

868 

869 For text files (JSON/CSV): Delete FileAnnotation 

870 For images: Delete image (if allowed) 

871 """ 

872 path = Path(path) 

873 

874 # For text files, delete FileAnnotation using registry 

875 if self.format_registry.is_text_format(path.suffix): 

876 # For now, just log and return success 

877 # FileAnnotations will be overwritten on save 

878 logger.debug(f"Delete requested for {path} - will be overwritten on save") 

879 return True 

880 

881 # For images, deletion not supported 

882 logger.warning(f"Delete not supported for OMERO images: {path}") 

883 return False 

884 

885 def _parse_omero_path(self, path: Path) -> Tuple[str, int, bool]: 

886 """Extract (plate_name, base_id, is_derived) from path. 

887 

888 This method extracts the OMERO plate name from a path by combining the base plate directory 

889 with any subdirectories (but NOT the filename). 

890 

891 Examples: 

892 /omero/plate_289 -> ("plate_289", 289, False) 

893 /omero/plate_289_outputs -> ("plate_289_outputs", 289, True) 

894 /omero/plate_289_outputs/images -> ("plate_289_outputs_images", 289, True) 

895 /omero/plate_289_outputs/images/A01.tif -> ("plate_289_outputs_images", 289, True) 

896 /omero/plate_289_outputs/images_results -> ("plate_289_outputs_images_results", 289, True) 

897 /omero/plate_289_outputs/checkpoints_step0/A01.tif -> ("plate_289_outputs_checkpoints_step0", 289, True) 

898 """ 

899 parts = path.parts 

900 if len(parts) < 3 or parts[0] != "/" or parts[1] != "omero": 

901 raise ValueError(f"Not an OMERO path: {path}") 

902 

903 base_name = parts[2] # "plate_289_outputs" 

904 # Extract subdirectories (everything between base_name and filename) 

905 # For /omero/plate_289_outputs/images/A01.tif, subdirs should be ["images"] 

906 # parts[3:-1] excludes both the base_name (parts[2]) and the filename (parts[-1]) 

907 subdirs = list(parts[3:-1]) if len(parts) > 4 else (list(parts[3:]) if len(parts) == 4 else []) 

908 

909 if not base_name.startswith("plate_"): 

910 raise ValueError(f"OMERO path must use 'plate_{{id}}' format: {base_name}") 

911 

912 name_parts = base_name.split("_") 

913 if len(name_parts) < 2 or not name_parts[1].isdigit(): 

914 raise ValueError(f"Cannot extract plate ID from: {base_name}") 

915 

916 base_id = int(name_parts[1]) 

917 plate_name = "_".join([base_name] + subdirs) if subdirs else base_name 

918 is_derived = len(subdirs) > 0 or len(name_parts) > 2 

919 

920 return plate_name, base_id, is_derived 

921 

922 def _get_image_id(self, plate_id: int, well_id: str, site: int, **kwargs) -> int: 

923 """Get OMERO image ID for well and site.""" 

924 if plate_id not in self._plate_metadata: 

925 self._load_plate_structure(plate_id, **kwargs) 

926 

927 plate_struct = self._plate_metadata[plate_id] 

928 if well_id not in plate_struct.wells: 

929 raise ValueError(f"Well {well_id} not found in plate {plate_id}") 

930 if site not in plate_struct.wells[well_id].sites: 

931 raise ValueError(f"Site {site} not found in well {well_id}") 

932 

933 return plate_struct.wells[well_id].sites[site].image_id 

934 

935 def _find_plate_by_name(self, plate_name: str, **kwargs) -> Optional[int]: 

936 """Query OMERO for plate by name.""" 

937 conn = self._get_connection(**kwargs) 

938 plates = conn.getObjects("Plate", attributes={"name": plate_name}) 

939 for plate in plates: 

940 return plate.getId() 

941 return None 

942 

943 def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None: 

944 """Save multiple images to OMERO with plate creation and write support.""" 

945 if not identifiers: 

946 return 

947 

948 if len(data_list) != len(identifiers): 

949 raise ValueError(f"Length mismatch: {len(data_list)} vs {len(identifiers)}") 

950 

951 parser_name = kwargs.get('parser_name') 

952 if not parser_name: 

953 raise ValueError("parser_name required for OMERO save_batch") 

954 

955 microscope_type = kwargs.get('microscope_type') 

956 if not microscope_type: 

957 raise ValueError("microscope_type required for OMERO save_batch") 

958 

959 # Validate all paths are in same plate 

960 plate_names = set() 

961 for path in identifiers: 

962 plate_name, _, _ = self._parse_omero_path(Path(path)) 

963 plate_names.add(plate_name) 

964 

965 if len(plate_names) > 1: 

966 raise ValueError(f"Cannot save batch across multiple plates: {plate_names}") 

967 

968 parser = self._load_parser(parser_name) 

969 plate_name, base_id, is_derived = self._parse_omero_path(Path(identifiers[0])) 

970 

971 # Group data by image (well + site) 

972 images = defaultdict(lambda: {'planes': {}, 'max_z': 0, 'max_c': 0, 'max_t': 0}) 

973 for data, path in zip(data_list, identifiers): 

974 parsed = parser.parse_filename(Path(path).name) 

975 well_id, site = parsed['well'], parsed['site'] 

976 z, c, t = parsed.get('z_index', 1) - 1, parsed.get('channel', 1) - 1, parsed.get('timepoint', 1) - 1 

977 

978 image_key = (well_id, site) 

979 images[image_key]['planes'][(z, c, t)] = data 

980 images[image_key]['max_z'] = max(images[image_key]['max_z'], z + 1) 

981 images[image_key]['max_c'] = max(images[image_key]['max_c'], c + 1) 

982 images[image_key]['max_t'] = max(images[image_key]['max_t'], t + 1) 

983 

984 # Get or create plate with locking 

985 plate_id = self._plate_name_cache.get(plate_name) 

986 if plate_id is None: 

987 # Remove parser_name and microscope_type from kwargs to avoid duplicate argument error 

988 # (they're already passed as positional arguments) 

989 filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ('parser_name', 'microscope_type')} 

990 plate_id = self._get_or_create_plate_with_lock( 

991 plate_name, base_id, parser_name, microscope_type, images, **filtered_kwargs 

992 ) 

993 self._plate_name_cache[plate_name] = plate_id 

994 

995 # Write planes 

996 self._write_planes_to_plate(plate_id, images, **kwargs) 

997 

998 def _get_or_create_plate_with_lock(self, plate_name, base_id, parser_name, microscope_type, images, **kwargs): 

999 """Create plate with file locking (like zarr metadata).""" 

1000 lock_dir = Path.home() / '.openhcs' / 'omero_locks' 

1001 lock_dir.mkdir(parents=True, exist_ok=True) 

1002 lock_path = lock_dir / f"{plate_name}.lock" 

1003 

1004 try: 

1005 with open(lock_path, 'w') as lock_file: 

1006 if FCNTL_AVAILABLE: 

1007 fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) 

1008 else: 

1009 portalocker.lock(lock_file, portalocker.LOCK_EX) 

1010 

1011 existing_id = self._find_plate_by_name(plate_name, **kwargs) 

1012 if existing_id: 

1013 self._load_plate_structure(existing_id, **kwargs) 

1014 return existing_id 

1015 

1016 return self._create_derived_plate(plate_name, base_id, parser_name, microscope_type, images, **kwargs) 

1017 finally: 

1018 if lock_path.exists(): 

1019 try: 

1020 lock_path.unlink() 

1021 except Exception:

1022 pass 

1023 

1024 def _create_derived_plate(self, plate_name, base_id, parser_name, microscope_type, images, **kwargs): 

1025 """Create plate from grouped image data.""" 

1026 conn = self._get_connection(**kwargs) 

1027 

1028 # Import OMERO model classes 

1029 import omero.model 

1030 from omero.rtypes import rstring, rint 

1031 from omero.model import NamedValue 

1032 

1033 update_service = conn.getUpdateService() 

1034 

1035 # Extract structure with MAX dimensions (Napari pattern) 

1036 wells_structure = defaultdict(lambda: {'sites': {}}) 

1037 for (well_id, site), img_data in images.items(): 

1038 max_height = max_width = 0 

1039 dtype = None 

1040 for plane_data in img_data['planes'].values(): 

1041 h, w = plane_data.shape 

1042 max_height, max_width = max(max_height, h), max(max_width, w) 

1043 if dtype is None: 

1044 dtype = plane_data.dtype 

1045 

1046 wells_structure[well_id]['sites'][site] = { 

1047 'sizeZ': img_data['max_z'], 'sizeC': img_data['max_c'], 'sizeT': img_data['max_t'], 

1048 'height': max_height, 'width': max_width, 'dtype': dtype 

1049 } 

1050 

1051 # Create plate 

1052 plate = omero.model.PlateI() 

1053 plate.setName(rstring(plate_name)) 

1054 plate.setColumnNamingConvention(rstring("number")) 

1055 plate.setRowNamingConvention(rstring("letter")) 

1056 plate = update_service.saveAndReturnObject(plate) 

1057 plate_id = plate.getId().getValue() 

1058 

1059 # Attach metadata 

1060 metadata_ann = omero.model.MapAnnotationI() 

1061 metadata_ann.setNs(rstring("openhcs.metadata")) 

1062 metadata_ann.setMapValue([ 

1063 NamedValue("openhcs.parser", parser_name), 

1064 NamedValue("openhcs.microscope_type", microscope_type) 

1065 ]) 

1066 plate.linkAnnotation(metadata_ann) 

1067 

1068 prov_ann = omero.model.MapAnnotationI() 

1069 prov_ann.setNs(rstring("openhcs.provenance")) 

1070 prov_ann.setMapValue([ 

1071 NamedValue("source_plate_id", str(base_id)), 

1072 NamedValue("created_by", "OpenHCS"), 

1073 NamedValue("timestamp", datetime.now().isoformat()) 

1074 ]) 

1075 plate.linkAnnotation(prov_ann) 

1076 update_service.saveObject(plate) 

1077 

1078 # Create wells WITHOUT images 

1079 # Images will be created with actual data in _write_planes_to_plate 

1080 # This fixes the bug where placeholder zero images caused first well to be black 

1081 for well_id, well_data in wells_structure.items(): 

1082 row, col = ord(well_id[0]) - ord('A'), int(well_id[1:]) - 1 

1083 well = omero.model.WellI() 

1084 well.setPlate(plate) 

1085 well.setRow(rint(row)) 

1086 well.setColumn(rint(col)) 

1087 update_service.saveAndReturnObject(well) 

1088 

1089 # Don't load plate structure yet - it will be loaded after images are written 

1090 # Store parser for later use 

1091 self._parser_cache[plate_id] = self._load_parser(parser_name) 

1092 return plate_id 

1093 

1094 def _write_planes_to_plate(self, plate_id, images, **kwargs): 

1095 """Write planes by creating complete images with all data at once.""" 

1096 conn = self._get_connection(**kwargs) 

1097 import omero.model 

1098 from omero.rtypes import rint 

1099 

1100 for (well_id, site), img_data in images.items(): 

1101 # Check if well/site already exists 

1102 try: 

1103 image_id = self._get_image_id(plate_id, well_id, site) 

1104 # Image exists - skip it (already written) 

1105 logger.info(f"Image for {well_id} site {site} already exists in plate {plate_id}, skipping") 

1106 continue 

1107 except ValueError: 

1108 # Image doesn't exist - create it with all planes 

1109 pass 

1110 

1111 # Calculate max dimensions for padding 

1112 max_height = max_width = 0 

1113 dtype = None 

1114 for plane_data in img_data['planes'].values(): 

1115 h, w = plane_data.shape 

1116 max_height, max_width = max(max_height, h), max(max_width, w) 

1117 if dtype is None: 

1118 dtype = plane_data.dtype 

1119 

1120 sizeZ = img_data['max_z'] 

1121 sizeC = img_data['max_c'] 

1122 sizeT = img_data['max_t'] 

1123 

1124 # Generate all planes in ZCT order with padding 

1125 def plane_generator(): 

1126 for t in range(sizeT): 

1127 for c in range(sizeC): 

1128 for z in range(sizeZ): 

1129 key = (z, c, t) 

1130 if key in img_data['planes']: 

1131 data = img_data['planes'][key] 

1132 

1133 # Convert CuPy arrays to NumPy (OMERO requires NumPy) 

1134 if hasattr(data, 'get'): # CuPy array 

1135 data = data.get() 

1136 

1137 h, w = data.shape 

1138 

1139 # Pad if needed 

1140 if h < max_height or w < max_width: 

1141 padded = np.zeros((max_height, max_width), dtype=dtype) 

1142 padded[:h, :w] = data 

1143 yield padded 

1144 else: 

1145 yield data 

1146 else: 

1147 # Missing plane - yield zeros 

1148 yield np.zeros((max_height, max_width), dtype=dtype) 

1149 

1150 # Create complete image with all planes at once 

1151 image = conn.createImageFromNumpySeq( 

1152 zctPlanes=plane_generator(), 

1153 imageName=f"{well_id}_s{site:03d}", 

1154 sizeZ=sizeZ, 

1155 sizeC=sizeC, 

1156 sizeT=sizeT, 

1157 description=f"OpenHCS processed image for well {well_id}, site {site}" 

1158 ) 

1159 

1160 # Link image to well 

1161 row, col = ord(well_id[0]) - ord('A'), int(well_id[1:]) - 1 

1162 

1163 # Check if well exists, create if not 

1164 query_service = conn.getQueryService() 

1165 params = omero.sys.ParametersI() 

1166 params.addLong("pid", plate_id) 

1167 params.add("row", rint(row)) 

1168 params.add("col", rint(col)) 

1169 

1170 query = "select w from Well as w where w.plate.id = :pid and w.row = :row and w.column = :col" 

1171 

1172 # Get or create lock for this specific well 

1173 lock_key = f"plate_{plate_id}_well_{row}_{col}" 

1174 with self._well_locks_lock: 

1175 if lock_key not in self._well_locks: 

1176 self._well_locks[lock_key] = threading.Lock() 

1177 well_lock = self._well_locks[lock_key] 

1178 

1179 # Use threading lock for thread safety + file lock for process safety 

1180 with well_lock: 

1181 lock_dir = Path.home() / '.openhcs' / 'omero_locks' 

1182 lock_dir.mkdir(parents=True, exist_ok=True) 

1183 lock_path = lock_dir / f"{lock_key}.lock" 

1184 

1185 try: 

1186 with open(lock_path, 'w') as lock_file: 

1187 if FCNTL_AVAILABLE: 

1188 fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) 

1189 else: 

1190 portalocker.lock(lock_file, portalocker.LOCK_EX) 

1191 

1192 # Re-check if well exists after acquiring both locks 

1193 # Use findAllByQuery since findByQuery throws exception on null 

1194 wells = query_service.findAllByQuery(query, params) 

1195 well_obj = wells[0] if wells else None 

1196 

1197 if not well_obj: 

1198 # Create new well 

1199 update_service = conn.getUpdateService() 

1200 well = omero.model.WellI() 

1201 well.setPlate(omero.model.PlateI(plate_id, False)) 

1202 well.setRow(rint(row)) 

1203 well.setColumn(rint(col)) 

1204 well_obj = update_service.saveAndReturnObject(well) 

1205 finally: 

1206 if lock_path.exists(): 

1207 try: 

1208 lock_path.unlink() 

1209 except Exception:

1210 pass 

1211 

1212 # Link image to well 

1213 # Reload well with wellSamples collection loaded 

1214 well_obj_loaded = conn.getObject("Well", well_obj.getId().getValue()) 

1215 # Force load the wellSamples collection by accessing it 

1216 _ = list(well_obj_loaded.listChildren()) # This loads the collection 

1217 

1218 ws = omero.model.WellSampleI() 

1219 ws.setImage(omero.model.ImageI(image.getId(), False)) 

1220 ws.setWell(well_obj_loaded._obj) 

1221 well_obj_loaded._obj.addWellSample(ws) 

1222 conn.getUpdateService().saveObject(well_obj_loaded._obj) 

1223 

1224 # Reload plate structure to include new wells/images 

1225 self._load_plate_structure(plate_id) 

1226 

1227 def list_files(self, directory: Union[str, Path], pattern: str = "*", 

1228 extensions: Set[str] = None, recursive: bool = False, **kwargs) -> List[str]: 

1229 """ 

1230 Generate filenames on-demand from plate structure. 

1231 

1232 Args: 

1233 directory: Path containing plate ID (e.g., "/17/Images" or "17") 

1234 pattern: File pattern (currently ignored) 

1235 extensions: File extensions (currently ignored) 

1236 recursive: Recursion flag (currently ignored) 

1237 **kwargs: Additional backend-specific arguments (unused) 

1238 

1239 Returns: 

1240 List of filenames: ["A01_s001_w1_z001_t001.tif", ...] 

1241 """ 

1242 # Extract plate_id from path 

1243 # Path could be: "/omero/plate_55/Images" or "/17/Images" or "17/Images" or just "17" 

1244 path_parts = Path(directory).parts 

1245 

1246 # Find the numeric plate_id in the path 

1247 plate_id = None 

1248 for part in path_parts: 

1249 # Handle both "55" and "plate_55" formats 

1250 if part.isdigit(): 

1251 plate_id = int(part) 

1252 break 

1253 elif part.startswith("plate_"): 

1254 try: 

1255 plate_id = int(part.split("_")[1]) 

1256 break 

1257 except (IndexError, ValueError): 

1258 continue 

1259 

1260 if plate_id is None: 

1261 raise ValueError(f"Could not extract numeric plate_id from path: {directory}") 

1262 

1263 # Load plate structure if not cached 

1264 if plate_id not in self._plate_metadata: 

1265 self._load_plate_structure(plate_id) 

1266 

1267 plate_struct = self._plate_metadata[plate_id] 

1268 parser = self._parser_cache[plate_id] 

1269 

1270 # Generate filenames on-the-fly 

1271 filenames = [] 

1272 for well_id, well_struct in plate_struct.wells.items(): 

1273 for site_idx, image_struct in well_struct.sites.items(): 

1274 # Generate filename for each (z, c, t) combination 

1275 for t in range(image_struct.sizeT): 

1276 for z in range(image_struct.sizeZ): 

1277 for c in range(image_struct.sizeC): 

1278 filename = parser.construct_filename( 

1279 well=well_id, 

1280 site=site_idx, 

1281 channel=c + 1, 

1282 z_index=z + 1, 

1283 timepoint=t + 1, 

1284 extension='.tif' 

1285 ) 

1286 filenames.append(filename) 

1287 

1288 logger.debug(f"Generated {len(filenames)} filenames on-demand for plate {plate_id}") 

1289 return filenames 

1290 

1291 def exists(self, path: Union[str, Path]) -> bool: 

1292 """ 

1293 Check if a virtual OMERO path exists. 

1294 

1295 For OMERO virtual backend, paths always "exist" if they're valid OMERO paths. 

1296 This is because OMERO generates filenames on-demand from plate structure. 

1297 

1298 Args: 

1299 path: Virtual OMERO path to check 

1300 

1301 Returns: 

1302 True if path is a valid OMERO path format, False otherwise 

1303 """ 

1304 try: 

1305 # Check if path is valid OMERO format 

1306 path_str = str(path) 

1307 if not path_str.startswith("/omero/"): 

1308 return False 

1309 

1310 # Try to parse the path - if it parses, it's valid 

1311 self._parse_omero_path(Path(path)) 

1312 return True 

1313 except (ValueError, IndexError): 

1314 return False 

1315 

1316 def ensure_directory(self, directory: Union[str, Path]) -> None: 

1317 """ 

1318 Ensure directory exists (no-op for OMERO virtual backend). 

1319 

1320 OMERO is a virtual filesystem - directories don't exist as real entities. 

1321 Plates are created on-demand during save_batch operations. 

1322 This method exists to satisfy the backend interface but does nothing. 

1323 

1324 Args: 

1325 directory: Virtual directory path (ignored) 

1326 """ 

1327 # No-op for virtual backend - directories are implicit in OMERO 

1328 pass 

1329 

1330 def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]: 

1331 """Load multiple images from OMERO.""" 

1332 return [self.load(fp, **kwargs) for fp in file_paths] 

1333 

1334 def _save_rois(self, rois: List, output_path: Path, images_dir: str = None, **kwargs) -> str: 

1335 """Save ROIs to OMERO by linking to images in the materialized plate. 

1336 

1337 Args: 

1338 rois: List of ROI objects 

1339 output_path: Output path (e.g., /omero/plate_32_outputs/images_results/A01_rois_step7.json) 

1340 images_dir: Images directory path (required for OMERO to link ROIs to correct plate) 

1341 

1342 Returns: 

1343 String describing where ROIs were saved 

1344 """ 

1345 from openhcs.core.roi import PolygonShape, MaskShape, PointShape, EllipseShape 

1346 import omero.model 

1347 from omero.rtypes import rstring, rdouble, rint 

1348 

1349 conn = self._get_connection(**kwargs) 

1350 

1351 # Validate images_dir is provided 

1352 if not images_dir: 

1353 raise ValueError( 

1354 f"images_dir is required for OMERO ROI linking. " 

1355 f"This should be passed from the materialization context. " 

1356 f"Output path: {output_path}" 

1357 ) 

1358 

1359 images_dir = Path(images_dir) 

1360 

1361 # Parse the images directory path to get the plate name 

1362 plate_name, base_id, is_derived = self._parse_omero_path(images_dir) 

1363 

1364 # Query OMERO for the actual plate ID by name 

1365 plate_id = self._find_plate_by_name(plate_name, **kwargs) 

1366 if not plate_id: 

1367 raise ValueError(f"Plate '{plate_name}' not found in OMERO (images dir: {images_dir})") 

1368 

1369 # Extract well ID from filename (first component before underscore) 

1370 filename = output_path.name 

1371 well_id_from_filename = filename.split('_')[0] # "A01" or "A1" 

1372 

1373 # Query OMERO for images in this well of the materialized plate 

1374 plate = conn.getObject("Plate", plate_id) 

1375 if not plate: 

1376 raise ValueError(f"Plate {plate_id} not found in OMERO") 

1377 

1378 # Find well by label 

1379 # Note: getWellPos() returns format like "A1" (no zero-padding) 

1380 # but filenames might use "A01" (zero-padded), so we need to normalize 

1381 well = None 

1382 for w in plate.listChildren(): 

1383 well_pos = w.getWellPos() # e.g., "A1" 

1384 # Normalize both to compare: remove leading zeros from column number 

1385 # "A01" -> "A1", "A1" -> "A1" 

1386 normalized_filename_well = well_id_from_filename[0] + str(int(well_id_from_filename[1:])) 

1387 if well_pos == normalized_filename_well: 

1388 well = w 

1389 break 

1390 

1391 if not well: 

1392 raise ValueError(f"Well {well_id_from_filename} not found in plate {plate_id}") 

1393 

1394 # Get all images in this well 

1395 images = [] 

1396 for well_sample in well.listChildren(): 

1397 image = well_sample.getImage() 

1398 if image: 

1399 images.append(image) 

1400 

1401 if not images: 

1402 raise ValueError(f"No images found in well {well_id_from_filename} of plate {plate_id}") 

1403 

1404 # Link ROIs to ALL images in the well 

1405 # (ROIs were created from the full image stack at this step) 

1406 update_service = conn.getUpdateService() 

1407 roi_count = 0 

1408 

1409 for image in images: 

1410 for roi in rois: 

1411 # Create OMERO ROI object 

1412 omero_roi = omero.model.RoiI() 

1413 omero_roi.setImage(image._obj) 

1414 

1415 # Add shapes to ROI 

1416 for shape in roi.shapes: 

1417 if isinstance(shape, PolygonShape): 

1418 # Create OMERO polygon 

1419 polygon = omero.model.PolygonI() 

1420 

1421 # Convert coordinates to OMERO format (comma-separated string) 

1422 # OMERO expects "x1,y1 x2,y2 x3,y3 ..." 

1423 points_str = " ".join([f"{x},{y}" for y, x in shape.coordinates]) 

1424 polygon.setPoints(rstring(points_str)) 

1425 

1426 # Set metadata 

1427 if 'label' in roi.metadata: 

1428 polygon.setTextValue(rstring(str(roi.metadata['label']))) 

1429 

1430 omero_roi.addShape(polygon) 

1431 

1432 elif isinstance(shape, EllipseShape): 

1433 # Create OMERO ellipse 

1434 ellipse = omero.model.EllipseI() 

1435 ellipse.setX(rdouble(shape.center_x)) 

1436 ellipse.setY(rdouble(shape.center_y)) 

1437 ellipse.setRadiusX(rdouble(shape.radius_x)) 

1438 ellipse.setRadiusY(rdouble(shape.radius_y)) 

1439 

1440 if 'label' in roi.metadata: 

1441 ellipse.setTextValue(rstring(str(roi.metadata['label']))) 

1442 

1443 omero_roi.addShape(ellipse) 

1444 

1445 elif isinstance(shape, PointShape): 

1446 # Create OMERO point 

1447 point = omero.model.PointI() 

1448 point.setX(rdouble(shape.x)) 

1449 point.setY(rdouble(shape.y)) 

1450 

1451 if 'label' in roi.metadata: 

1452 point.setTextValue(rstring(str(roi.metadata['label']))) 

1453 

1454 omero_roi.addShape(point) 

1455 

1456 # Save ROI to OMERO 

1457 if omero_roi.sizeOfShapes() > 0: 

1458 update_service.saveAndReturnObject(omero_roi) 

1459 roi_count += 1 

1460 

1461 result_msg = f"Linked {len(rois)} ROIs to {len(images)} images in well {well_id_from_filename} (plate: {plate_name}, ID: {plate_id})" 

1462 logger.info(result_msg) 

1463 return result_msg