Coverage for openhcs/io/fiji_stream.py: 18.1%
80 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Fiji streaming backend for OpenHCS.
4Streams image data to Fiji/ImageJ viewer using ZMQ for IPC.
5Follows same architecture as Napari streaming for consistency.
7SHARED MEMORY OWNERSHIP MODEL:
8- Sender (Worker): Creates shared memory, sends reference via ZMQ, closes handle (does NOT unlink)
9- Receiver (Fiji Server): Attaches to shared memory, copies data, closes handle, unlinks
10- Only receiver calls unlink() to prevent FileNotFoundError
11- REQ/REP socket pattern ensures receiver copies data before sender closes handle
12"""
14import logging
15import time
16from pathlib import Path
17from typing import Any, Union, List
18import os
19import numpy as np
21from openhcs.io.streaming import StreamingBackend
22from openhcs.constants.constants import Backend
23from openhcs.core.config import TransportMode
25logger = logging.getLogger(__name__)
28class FijiStreamingBackend(StreamingBackend):
29 """Fiji streaming backend with ZMQ publisher pattern (matches Napari architecture)."""
30 _backend_type = Backend.FIJI_STREAM.value
32 # Configure ABC attributes
33 VIEWER_TYPE = 'fiji'
34 SHM_PREFIX = 'fiji_'
36 # __init__, _get_publisher, save, cleanup now inherited from ABC
38 def _prepare_rois_data(self, data: Any, file_path: Union[str, Path]) -> dict:
39 """
40 Prepare ROIs data for transmission.
42 Args:
43 data: ROI list
44 file_path: Path identifier
46 Returns:
47 Dict with ROI data
48 """
49 from openhcs.runtime.roi_converters import FijiROIConverter
51 # Convert ROI objects to bytes, then base64 encode for transmission
52 roi_bytes_list = FijiROIConverter.rois_to_imagej_bytes(data)
53 rois_encoded = FijiROIConverter.encode_rois_for_transmission(roi_bytes_list)
55 return {
56 'path': str(file_path),
57 'rois': rois_encoded,
58 }
60 def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], **kwargs) -> None:
61 """Stream batch of images or ROIs to Fiji via ZMQ."""
62 from openhcs.constants.streaming import StreamingDataType
64 if len(data_list) != len(file_paths):
65 raise ValueError("data_list and file_paths must have same length")
67 logger.info(f"📦 FIJI BACKEND: save_batch called with {len(data_list)} items")
69 # Extract kwargs using generic polymorphic names
70 host = kwargs.get('host', 'localhost')
71 port = kwargs['port']
72 transport_mode = kwargs.get('transport_mode', TransportMode.IPC)
73 publisher = self._get_publisher(host, port, transport_mode)
74 display_config = kwargs['display_config']
75 microscope_handler = kwargs['microscope_handler']
76 source = kwargs.get('source', 'unknown_source') # Pre-built source value
77 images_dir = kwargs.get('images_dir') # Source image subdirectory for ROI mapping
79 # Prepare batch messages
80 batch_images = []
81 image_ids = []
83 for data, file_path in zip(data_list, file_paths):
84 # Generate unique ID
85 import uuid
86 image_id = str(uuid.uuid4())
87 image_ids.append(image_id)
89 # Detect data type using ABC helper
90 data_type = self._detect_data_type(data)
91 logger.info(f"🔍 FIJI BACKEND: Detected data type: {data_type} for path: {file_path}")
93 # Parse component metadata using ABC helper (ONCE for all types)
94 component_metadata = self._parse_component_metadata(
95 file_path, microscope_handler, source
96 )
98 # Prepare data based on type
99 if data_type == StreamingDataType.SHAPES: # ROIs for Fiji
100 logger.info(f"🔍 FIJI BACKEND: Preparing ROI data for {file_path}")
101 item_data = self._prepare_rois_data(data, file_path)
102 data_type_str = 'rois' # Fiji uses 'rois' not 'shapes'
103 logger.info(f"🔍 FIJI BACKEND: ROI data prepared: {len(item_data.get('rois', []))} ROIs")
104 else: # IMAGE
105 logger.info(f"🔍 FIJI BACKEND: Preparing image data for {file_path}")
106 item_data = self._create_shared_memory(data, file_path)
107 data_type_str = 'image'
109 # Build batch item
110 batch_images.append({
111 **item_data,
112 'data_type': data_type_str,
113 'metadata': component_metadata,
114 'image_id': image_id
115 })
116 logger.info(f"🔍 FIJI BACKEND: Added {data_type_str} item to batch")
118 # Extract component modes for ALL components in component_order (including virtual components)
119 component_modes = {}
120 for comp_name in display_config.COMPONENT_ORDER:
121 mode_field = f"{comp_name}_mode"
122 if hasattr(display_config, mode_field):
123 mode = getattr(display_config, mode_field)
124 component_modes[comp_name] = mode.value
126 # Send batch message
127 message = {
128 'type': 'batch',
129 'images': batch_images,
130 'display_config': {
131 'lut': display_config.get_lut_name(),
132 'component_modes': component_modes,
133 'component_order': display_config.COMPONENT_ORDER,
134 'auto_contrast': display_config.auto_contrast if hasattr(display_config, 'auto_contrast') else True
135 },
136 'images_dir': images_dir, # Source image subdirectory for ROI->image mapping
137 'timestamp': time.time()
138 }
140 # Log batch composition
141 data_types = [item['data_type'] for item in batch_images]
142 type_counts = {dt: data_types.count(dt) for dt in set(data_types)}
143 logger.info(f"📤 FIJI BACKEND: Sending batch message with {len(batch_images)} items to port {port}: {type_counts}")
145 # Register sent images with queue tracker BEFORE sending
146 # This prevents race condition with IPC mode where acks arrive before registration
147 self._register_with_queue_tracker(port, image_ids)
149 # Send with REQ socket (BLOCKING - worker waits for Fiji to acknowledge)
150 # Worker blocks until Fiji receives, copies data from shared memory, and sends ack
151 # This guarantees no messages are lost and shared memory is only closed after Fiji is done
152 logger.info(f"📤 FIJI BACKEND: Sending batch of {len(batch_images)} images to Fiji on port {port} (REQ/REP - blocking until ack)")
153 publisher.send_json(message) # Blocking send
155 # Wait for acknowledgment from Fiji (REP socket)
156 # Fiji will only reply after it has copied all data from shared memory
157 ack_response = publisher.recv_json()
158 logger.info(f"✅ FIJI BACKEND: Received ack from Fiji: {ack_response.get('status', 'unknown')}")
160 # Clean up publisher's handles after successful send
161 # Receiver will unlink the shared memory after copying the data
162 for img in batch_images:
163 shm_name = img.get('shm_name') # ROI items don't have shm_name
164 if shm_name and shm_name in self._shared_memory_blocks:
165 try:
166 shm = self._shared_memory_blocks.pop(shm_name)
167 shm.close() # Close our handle, but don't unlink - receiver will do that
168 except Exception as e:
169 logger.warning(f"Failed to close shared memory handle {shm_name}: {e}")
171 # cleanup() now inherited from ABC
173 def __del__(self):
174 """Cleanup on deletion."""
175 import logging
176 logger = logging.getLogger(__name__)
177 logger.info("🔥 FIJI __del__ called, about to call cleanup()")
178 self.cleanup()
179 logger.info("🔥 FIJI __del__ cleanup() returned")