Coverage for openhcs/io/napari_stream.py: 27.0%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Napari streaming backend for real-time visualization during processing. 

3 

4This module provides a storage backend that streams image data to a napari viewer 

5for real-time visualization during pipeline execution. Uses ZeroMQ for IPC 

6and shared memory for efficient data transfer. 

7""" 

8 

9import logging 

10import time 

11from pathlib import Path 

12from typing import Any, Dict, List, Optional, Union, Set 

13from os import PathLike 

14import os 

15 

16import numpy as np 

17 

18from openhcs.io.streaming import StreamingBackend 

19from openhcs.io.backend_registry import StorageBackendMeta 

20from openhcs.constants.constants import Backend 

21from openhcs.constants.constants import DEFAULT_NAPARI_STREAM_PORT 

22 

23logger = logging.getLogger(__name__) 

24 

25 

class NapariStreamingBackend(StreamingBackend, metaclass=StorageBackendMeta):
    """Napari streaming backend for real-time visualization.

    Streams image data to a napari viewer using ZeroMQ; pixel data is handed
    over through named shared-memory blocks whose names are sent in the
    message. Connects to an existing NapariStreamVisualizer process.
    Inherits from StreamingBackend - no file system operations.
    Registration happens automatically through the metaclass.
    """

    # Backend type from enum for registration
    _backend_type = Backend.NAPARI_STREAM.value

    def __init__(self):
        """Initialize the napari streaming backend."""
        self._publisher = None
        self._context = None
        # Maps shared-memory block name -> SharedMemory handle, so that
        # cleanup() can close and unlink every block this backend created.
        self._shared_memory_blocks = {}

    def _get_publisher(self, napari_port: int):
        """Return the ZeroMQ PUB socket, creating and connecting it on first use.

        Args:
            napari_port: TCP port on localhost to connect to. Only used when
                the socket is first created; later calls return the cached
                socket regardless of the port passed in.

        Returns:
            The cached ``zmq.PUB`` socket.

        Raises:
            RuntimeError: If the ``zmq`` package cannot be imported.
        """
        if self._publisher is not None:
            return self._publisher

        try:
            import zmq
        except ImportError as exc:
            logger.error("ZeroMQ not available - napari streaming disabled")
            raise RuntimeError("ZeroMQ required for napari streaming") from exc

        context = zmq.Context()
        try:
            publisher = context.socket(zmq.PUB)
            publisher.connect(f"tcp://localhost:{napari_port}")
        except Exception:
            # Never cache a half-initialised socket: release the context (and
            # any socket created on it) so that the next call starts fresh.
            context.destroy(linger=0)
            raise

        self._context = context
        self._publisher = publisher
        logger.info(f"Napari streaming publisher connected to viewer on port {napari_port}")

        # Small delay to ensure socket is ready
        time.sleep(0.1)

        return self._publisher

    @staticmethod
    def _to_numpy(data: Any) -> np.ndarray:
        """Convert one image object to a NumPy array.

        Objects exposing ``cpu()`` are converted via ``.cpu().numpy()``
        (presumably torch tensors - they are detached first when a
        ``detach()`` method exists, since ``numpy()`` is refused on tensors
        that require grad). Objects exposing ``get()`` are converted via
        ``.get()`` (presumably CuPy arrays). Anything else goes through
        ``np.asarray``.
        """
        if hasattr(data, 'cpu'):
            tensor = data.detach() if hasattr(data, 'detach') else data
            return np.asarray(tensor.cpu().numpy())
        if hasattr(data, 'get'):
            return np.asarray(data.get())
        return np.asarray(data)

    def _share_array(self, np_data: np.ndarray) -> str:
        """Copy ``np_data`` into a new shared-memory block and return its name.

        The block is recorded in ``self._shared_memory_blocks`` as soon as it
        exists, so ``cleanup()`` releases it even if the copy fails.
        """
        from multiprocessing import shared_memory
        import uuid

        # pid + random suffix: unique across worker processes and across
        # repeated sends of the same object, and short enough for platforms
        # with tight limits on shared-memory names.
        shm_name = f"napari_{os.getpid()}_{uuid.uuid4().hex[:12]}"

        # SharedMemory rejects size 0, so empty arrays get a 1-byte block.
        shm = shared_memory.SharedMemory(
            create=True, size=max(1, np_data.nbytes), name=shm_name
        )
        self._shared_memory_blocks[shm_name] = shm

        shm_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
        # Ellipsis assignment also works for 0-d arrays, unlike ``[:]``.
        shm_array[...] = np_data
        return shm_name

    def save(self, data: Any, file_path: Union[str, Path], **kwargs) -> None:
        """Stream single image to napari.

        Thin wrapper around ``save_batch`` with one-element lists; accepts
        and requires the same keyword arguments.
        """
        self.save_batch([data], [file_path], **kwargs)

    def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Stream multiple images to napari as a batch.

        Each image is copied into its own shared-memory block; a single JSON
        message describing all blocks is then published on the ZeroMQ socket.

        Args:
            data_list: List of image data
            file_paths: List of path identifiers
            **kwargs: Additional metadata. Required keys: ``napari_port``,
                ``display_config`` and ``microscope_handler``. Optional keys:
                ``step_index`` (default 0) and ``step_name``
                (default ``'unknown_step'``).

        Raises:
            ValueError: If ``data_list`` and ``file_paths`` differ in length.
            KeyError: If a required keyword argument is missing.
            RuntimeError: If ZeroMQ is not installed.
        """
        if len(data_list) != len(file_paths):
            raise ValueError("data_list and file_paths must have the same length")

        # Look up every required kwarg before opening a socket or allocating
        # shared memory, so a missing key fails without side effects.
        napari_port = kwargs['napari_port']
        display_config = kwargs['display_config']
        microscope_handler = kwargs['microscope_handler']
        step_index = kwargs.get('step_index', 0)
        step_name = kwargs.get('step_name', 'unknown_step')

        publisher = self._get_publisher(napari_port)

        # Prepare batch of images
        batch_images = []
        for data, file_path in zip(data_list, file_paths):
            np_data = self._to_numpy(data)
            shm_name = self._share_array(np_data)

            # Parse component metadata
            filename = os.path.basename(str(file_path))
            component_metadata = microscope_handler.parser.parse_filename(filename)

            batch_images.append({
                'path': str(file_path),
                'shape': np_data.shape,
                'dtype': str(np_data.dtype),
                'shm_name': shm_name,
                'component_metadata': component_metadata,
                'step_index': step_index,
                'step_name': step_name
            })

        # Build component modes
        from openhcs.constants import VariableComponents
        component_modes = {}
        for component in VariableComponents:
            field_name = f"{component.value}_mode"
            mode = getattr(display_config, field_name)
            component_modes[component.value] = mode.value

        # Include well if available on the display config (not always part of VariableComponents)
        if hasattr(display_config, 'well_mode'):
            component_modes['well'] = display_config.well_mode.value

        # Send batch message
        message = {
            'type': 'batch',
            'images': batch_images,
            'display_config': {
                'colormap': display_config.get_colormap_name(),
                'component_modes': component_modes
            },
            'timestamp': time.time()
        }

        publisher.send_json(message)

    # REMOVED: All file system methods (load, load_batch, exists, list_files, delete, etc.)
    # These are no longer inherited - clean interface!

    def cleanup_connections(self) -> None:
        """Clean up ZeroMQ connections without affecting shared memory or napari window."""
        # Close publisher and context
        if self._publisher is not None:
            self._publisher.close()
            self._publisher = None

        if self._context is not None:
            self._context.term()
            self._context = None

        logger.debug("Napari streaming connections cleaned up")

    def cleanup(self) -> None:
        """Clean up shared memory blocks and close publisher.

        Every block is closed and unlinked on a best-effort basis: a failure
        is logged as a warning and the remaining blocks are still processed.

        Note: This does NOT close the napari window - it should remain open
        for future test executions and user interaction.
        """
        # Clean up shared memory blocks
        for shm_name, shm in list(self._shared_memory_blocks.items()):
            try:
                try:
                    shm.close()
                finally:
                    # Always attempt the unlink, even when close() failed,
                    # so the segment is not left behind.
                    shm.unlink()
            except Exception as e:
                logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")

        self._shared_memory_blocks.clear()

        # Clean up connections
        self.cleanup_connections()

        logger.debug("Napari streaming backend cleaned up (napari window remains open)")

    def __del__(self):
        """Cleanup on deletion."""
        try:
            self.cleanup()
        except Exception:
            # A finalizer must not raise: __init__ may never have completed
            # (attributes missing) or the interpreter may be shutting down.
            pass

192 self.cleanup()