Coverage for openhcs/io/napari_stream.py: 27.0% (91 statements)
1"""
2Napari streaming backend for real-time visualization during processing.
4This module provides a storage backend that streams image data to a napari viewer
5for real-time visualization during pipeline execution. Uses ZeroMQ for IPC
6and shared memory for efficient data transfer.
7"""
import logging
import os
import time
from pathlib import Path
from typing import Any, List, Union

import numpy as np

from openhcs.io.streaming import StreamingBackend
from openhcs.io.backend_registry import StorageBackendMeta
from openhcs.constants.constants import Backend, DEFAULT_NAPARI_STREAM_PORT

logger = logging.getLogger(__name__)
class NapariStreamingBackend(StreamingBackend, metaclass=StorageBackendMeta):
    """
    Napari streaming backend for real-time visualization.

    Streams image data to a napari viewer using ZeroMQ, connecting to an
    existing NapariStreamVisualizer process. Registered automatically via
    the StorageBackendMeta metaclass. Inherits from StreamingBackend, so
    no file system operations are exposed.
    """

    # Backend type from enum for registration
    _backend_type = Backend.NAPARI_STREAM.value
    def __init__(self):
        """Initialize the napari streaming backend."""
        self._publisher = None
        self._context = None
        self._shared_memory_blocks = {}
    def _get_publisher(self, napari_port: int):
        """Lazily initialize the ZeroMQ publisher."""
        if self._publisher is None:
            try:
                import zmq
                self._context = zmq.Context()
                self._publisher = self._context.socket(zmq.PUB)
                self._publisher.connect(f"tcp://localhost:{napari_port}")
                logger.info(f"Napari streaming publisher connected to viewer on port {napari_port}")

                # Small delay to ensure the socket is ready before publishing
                time.sleep(0.1)

            except ImportError as exc:
                logger.error("ZeroMQ not available - napari streaming disabled")
                raise RuntimeError("ZeroMQ required for napari streaming") from exc

        return self._publisher
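
    # For reference, a minimal sketch of the subscriber this publisher
    # pairs with (assumed to run inside the NapariStreamVisualizer
    # process; illustrative, not the actual viewer implementation):
    #
    #   import zmq
    #   context = zmq.Context()
    #   subscriber = context.socket(zmq.SUB)
    #   subscriber.bind(f"tcp://*:{napari_port}")
    #   subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
    #   message = subscriber.recv_json()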
    def save(self, data: Any, file_path: Union[str, Path], **kwargs) -> None:
        """Stream a single image to napari."""
        self.save_batch([data], [file_path], **kwargs)
    def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Stream multiple images to napari as a batch.

        Args:
            data_list: List of image data
            file_paths: List of path identifiers
            **kwargs: Additional metadata (requires napari_port, display_config,
                and microscope_handler; step_index and step_name are optional)
        """
        if len(data_list) != len(file_paths):
            raise ValueError("data_list and file_paths must have the same length")
        try:
            publisher = self._get_publisher(kwargs['napari_port'])
            display_config = kwargs['display_config']
            microscope_handler = kwargs['microscope_handler']
            step_index = kwargs.get('step_index', 0)
            step_name = kwargs.get('step_name', 'unknown_step')
        except KeyError as e:
            logger.error(f"Napari streaming missing required kwarg: {e}")
            raise
        # Prepare batch of images
        from multiprocessing import shared_memory

        batch_images = []
        for data, file_path in zip(data_list, file_paths):
            # Convert to numpy (handles torch tensors via .cpu() and cupy arrays via .get())
            if hasattr(data, 'cpu'):
                np_data = data.cpu().numpy()
            elif hasattr(data, 'get'):
                np_data = data.get()
            else:
                np_data = np.asarray(data)

            # Copy the image into a uniquely named shared memory block
            shm_name = f"napari_{id(data)}_{time.time_ns()}"
            shm = shared_memory.SharedMemory(create=True, size=np_data.nbytes, name=shm_name)
            shm_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
            shm_array[:] = np_data[:]
            self._shared_memory_blocks[shm_name] = shm

            # Parse component metadata from the filename
            filename = os.path.basename(str(file_path))
            component_metadata = microscope_handler.parser.parse_filename(filename)

            batch_images.append({
                'path': str(file_path),
                'shape': np_data.shape,
                'dtype': str(np_data.dtype),
                'shm_name': shm_name,
                'component_metadata': component_metadata,
                'step_index': step_index,
                'step_name': step_name
            })
        # Build component modes from the display config
        from openhcs.constants import VariableComponents
        component_modes = {}
        for component in VariableComponents:
            field_name = f"{component.value}_mode"
            mode = getattr(display_config, field_name)
            component_modes[component.value] = mode.value

        # Include well if available on the display config (not always part of VariableComponents)
        if hasattr(display_config, 'well_mode'):
            component_modes['well'] = display_config.well_mode.value

        # Send batch message
        message = {
            'type': 'batch',
            'images': batch_images,
            'display_config': {
                'colormap': display_config.get_colormap_name(),
                'component_modes': component_modes
            },
            'timestamp': time.time()
        }

        publisher.send_json(message)
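
        # A hedged sketch of how a consumer could reconstruct each image
        # from this message (the real viewer logic is not part of this
        # module); copying before close() detaches from shared memory:
        #
        #   from multiprocessing import shared_memory
        #   for entry in message['images']:
        #       shm = shared_memory.SharedMemory(name=entry['shm_name'])
        #       img = np.ndarray(tuple(entry['shape']),
        #                        dtype=np.dtype(entry['dtype']),
        #                        buffer=shm.buf).copy()
        #       shm.close()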
    # REMOVED: All file system methods (load, load_batch, exists, list_files, delete, etc.)
    # These are no longer inherited - clean interface!
    def cleanup_connections(self) -> None:
        """Clean up ZeroMQ connections without affecting shared memory or the napari window."""
        # Close publisher and context
        if self._publisher is not None:
            self._publisher.close()
            self._publisher = None

        if self._context is not None:
            self._context.term()
            self._context = None

        logger.debug("Napari streaming connections cleaned up")
    def cleanup(self) -> None:
        """Clean up shared memory blocks and close the publisher.

        Note: This does NOT close the napari window - it should remain open
        for future test executions and user interaction.
        """
        # Clean up shared memory blocks
        for shm_name, shm in self._shared_memory_blocks.items():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")

        self._shared_memory_blocks.clear()

        # Clean up connections
        self.cleanup_connections()

        logger.debug("Napari streaming backend cleaned up (napari window remains open)")
    def __del__(self):
        """Cleanup on deletion."""
        self.cleanup()