Coverage for openhcs/io/napari_stream.py: 20.2%
73 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Napari streaming backend for real-time visualization during processing.
4This module provides a storage backend that streams image data to a napari viewer
5for real-time visualization during pipeline execution. Uses ZeroMQ for IPC
6and shared memory for efficient data transfer.
8SHARED MEMORY OWNERSHIP MODEL:
9- Sender (Worker): Creates shared memory, sends reference via ZMQ, closes handle (does NOT unlink)
10- Receiver (Napari Server): Attaches to shared memory, copies data, closes handle, unlinks
11- Only receiver calls unlink() to prevent FileNotFoundError
12- PUB/SUB socket pattern is non-blocking; receiver must copy data before sender closes handle
13"""
15import logging
16import time
17from pathlib import Path
18from typing import Any, List, Union
19import os
21from openhcs.core.config import TransportMode
23import numpy as np
25from openhcs.io.streaming import StreamingBackend
26from openhcs.constants.constants import Backend
28logger = logging.getLogger(__name__)
class NapariStreamingBackend(StreamingBackend):
    """Napari streaming backend with automatic registration.

    Streams images and ROI shapes to a napari viewer as batch messages.
    Image payloads are handed over via shared memory; only the sender-side
    handle is closed here after a successful send (the viewer is expected to
    unlink), while a failed or dropped batch is closed *and* unlinked locally.

    The helpers ``_get_publisher``, ``_detect_data_type``,
    ``_parse_component_metadata``, ``_create_shared_memory``,
    ``_register_with_queue_tracker``, ``cleanup`` and the
    ``_shared_memory_blocks`` mapping are not defined in this class; they are
    presumably provided by ``StreamingBackend`` (verify against the ABC).
    """

    _backend_type = Backend.NAPARI_STREAM.value

    # Configure ABC attributes
    VIEWER_TYPE = 'napari'
    SHM_PREFIX = 'napari_'

    # __init__, _get_publisher, save, cleanup now inherited from ABC

    def _prepare_shapes_data(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Prepare shapes data for transmission.

        Args:
            data: ROI list
            file_path: Path identifier

        Returns:
            Dict with the stringified path under 'path' and the converted
            ROIs under 'shapes'.
        """
        # Imported lazily, as in the rest of this module.
        from openhcs.runtime.roi_converters import NapariROIConverter

        return {
            'path': str(file_path),
            'shapes': NapariROIConverter.rois_to_shapes(data),
        }

    def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Stream multiple images or ROIs to napari as a batch.

        Args:
            data_list: List of image data or ROI lists
            file_paths: List of path identifiers (same length as data_list)
            **kwargs: Streaming options. Required: 'port', 'display_config',
                'microscope_handler'. Optional: 'host' (default 'localhost'),
                'transport_mode' (default TransportMode.IPC),
                'source' (default 'unknown_source').

        Raises:
            ValueError: If data_list and file_paths differ in length.
            KeyError: If a required keyword argument is missing.
            Exception: Any error raised while preparing or sending the batch,
                other than ``zmq.Again``, is propagated to the caller. A busy
                viewer (``zmq.Again``) only logs a warning and drops the batch.
        """
        import uuid

        import zmq

        from openhcs.constants.streaming import StreamingDataType

        if len(data_list) != len(file_paths):
            raise ValueError("data_list and file_paths must have the same length")

        # Extract kwargs using generic polymorphic names
        host = kwargs.get('host', 'localhost')
        port = kwargs['port']
        transport_mode = kwargs.get('transport_mode', TransportMode.IPC)
        publisher = self._get_publisher(host, port, transport_mode)
        display_config = kwargs['display_config']
        microscope_handler = kwargs['microscope_handler']
        source = kwargs.get('source', 'unknown_source')  # Pre-built source value

        batch_images = []
        image_ids = []
        send_succeeded = False

        # Everything from the first shared-memory allocation through the send
        # is guarded by one try/finally, so a failure part-way through the
        # batch (metadata parsing, allocation, tracker registration, send)
        # never leaves already-created segments behind.
        try:
            for data, file_path in zip(data_list, file_paths):
                # Generate unique ID
                image_id = str(uuid.uuid4())
                image_ids.append(image_id)

                # Detect data type using ABC helper
                data_type = self._detect_data_type(data)

                # Parse component metadata using ABC helper (ONCE for all types)
                component_metadata = self._parse_component_metadata(
                    file_path, microscope_handler, source
                )

                # Prepare data based on type
                if data_type == StreamingDataType.SHAPES:
                    item_data = self._prepare_shapes_data(data, file_path)
                else:  # IMAGE
                    item_data = self._create_shared_memory(data, file_path)

                # Build batch item
                batch_images.append({
                    **item_data,
                    'data_type': data_type.value,
                    'metadata': component_metadata,
                    'image_id': image_id,
                })

            # Build component modes for ALL components in component_order
            # (including virtual components)
            component_modes = {}
            for comp_name in display_config.COMPONENT_ORDER:
                mode_field = f"{comp_name}_mode"
                if hasattr(display_config, mode_field):
                    component_modes[comp_name] = getattr(display_config, mode_field).value

            # Missing or falsy variable_size_handling is transmitted as None.
            size_handling = getattr(display_config, 'variable_size_handling', None)

            message = {
                'type': 'batch',
                'images': batch_images,
                'display_config': {
                    'colormap': display_config.get_colormap_name(),
                    'component_modes': component_modes,
                    'component_order': display_config.COMPONENT_ORDER,
                    'variable_size_handling': size_handling.value if size_handling else None,
                },
                'timestamp': time.time(),
            }

            # Register sent images with queue tracker BEFORE sending
            # This prevents race condition with IPC mode where acks arrive before registration
            self._register_with_queue_tracker(port, image_ids)

            # Send non-blocking to prevent hanging if Napari is slow to process (matches Fiji pattern)
            try:
                publisher.send_json(message, flags=zmq.NOBLOCK)
                send_succeeded = True
            except zmq.Again:
                # NOTE(review): the ids above were already registered with the
                # queue tracker; confirm the tracker expires entries for
                # dropped batches.
                logger.warning(f"Napari viewer busy, dropped batch of {len(batch_images)} images (port {port})")
            except Exception as e:
                logger.error(f"Failed to send batch to Napari on port {port}: {e}", exc_info=True)
                raise  # Re-raise the exception so the pipeline knows it failed
        finally:
            # Unified cleanup: close our handle after successful send, close+unlink after failure
            self._cleanup_shared_memory(batch_images, unlink=not send_succeeded)

    def _cleanup_shared_memory(self, batch_images, unlink=False):
        """Clean up shared memory blocks for a batch of images.

        Errors are logged and never raised; close and unlink are attempted
        independently so a failing close() cannot prevent the unlink.

        Args:
            batch_images: List of image dictionaries with optional 'shm_name' keys
            unlink: If True, both close and unlink. If False, only close (viewer will unlink)
        """
        for img in batch_images:
            shm_name = img.get('shm_name')  # ROI items don't have shm_name
            if not shm_name:
                continue

            # Single lookup; blocks that are not tracked (any more) are skipped.
            shm = self._shared_memory_blocks.pop(shm_name, None)
            if shm is None:
                continue

            try:
                shm.close()
            except Exception as e:
                logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")

            if unlink:
                try:
                    shm.unlink()
                except Exception as e:
                    logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")

    # cleanup() now inherited from ABC

    def __del__(self):
        """Cleanup on deletion (best effort)."""
        try:
            self.cleanup()
        except Exception:
            # A finalizer may run on a partially constructed instance or
            # during interpreter shutdown, when attributes/modules are already
            # gone. Python would only print "Exception ignored in __del__",
            # so swallow deliberately instead of emitting that noise.
            pass