Coverage for openhcs/io/napari_stream.py: 20.2%

73 statements

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

"""
Napari streaming backend for real-time visualization during processing.

This module provides a storage backend that streams image data to a napari viewer
for real-time visualization during pipeline execution. Uses ZeroMQ for IPC
and shared memory for efficient data transfer.

SHARED MEMORY OWNERSHIP MODEL:
- Sender (Worker): Creates shared memory, sends reference via ZMQ, closes handle (does NOT unlink)
- Receiver (Napari Server): Attaches to shared memory, copies data, closes handle, unlinks
- Only the receiver calls unlink() to prevent FileNotFoundError
- PUB/SUB socket pattern is non-blocking; receiver must copy data before sender closes handle
"""

14 

import logging
import os
import time
import uuid
from pathlib import Path
from typing import Any, List, Union

import numpy as np

from openhcs.constants.constants import Backend
from openhcs.core.config import TransportMode
from openhcs.io.streaming import StreamingBackend

27 

# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)

29 

30 

class NapariStreamingBackend(StreamingBackend):
    """Napari streaming backend with automatic registration.

    Streams image data (via shared memory) and ROI shapes (inline JSON) to a
    napari viewer over ZeroMQ. See the module docstring for the shared-memory
    ownership model: this sender closes its handle after a successful send and
    only unlinks when the send failed (the viewer never saw the block).
    """

    # Registration key used by the backend registry.
    _backend_type = Backend.NAPARI_STREAM.value

    # Configure ABC attributes
    VIEWER_TYPE = 'napari'
    SHM_PREFIX = 'napari_'

    # __init__, _get_publisher, save, cleanup now inherited from ABC

    def _prepare_shapes_data(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Prepare shapes data for transmission.

        Args:
            data: ROI list
            file_path: Path identifier

        Returns:
            Dict with 'path' and 'shapes' keys ready for JSON serialization.
        """
        # Deferred import keeps the ROI converter dependency out of module import.
        from openhcs.runtime.roi_converters import NapariROIConverter
        shapes_data = NapariROIConverter.rois_to_shapes(data)

        return {
            'path': str(file_path),
            'shapes': shapes_data,
        }

    def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Stream multiple images or ROIs to napari as a batch.

        Args:
            data_list: List of image data or ROI lists
            file_paths: List of path identifiers
            **kwargs: Additional metadata. Required keys: 'port',
                'display_config', 'microscope_handler'. Optional keys:
                'host' (default 'localhost'), 'transport_mode' (default
                TransportMode.IPC), 'source' (default 'unknown_source').

        Raises:
            ValueError: If data_list and file_paths differ in length.
            KeyError: If a required kwarg is missing.
        """
        # Deferred imports hoisted to the top of the method (the original
        # re-ran `import uuid` on every loop iteration and imported zmq
        # mid-function); zmq stays function-scoped so merely importing this
        # module does not require pyzmq.
        import zmq
        from openhcs.constants.streaming import StreamingDataType

        if len(data_list) != len(file_paths):
            raise ValueError("data_list and file_paths must have the same length")

        # Extract kwargs using generic polymorphic names
        host = kwargs.get('host', 'localhost')
        port = kwargs['port']
        transport_mode = kwargs.get('transport_mode', TransportMode.IPC)
        publisher = self._get_publisher(host, port, transport_mode)
        display_config = kwargs['display_config']
        microscope_handler = kwargs['microscope_handler']
        source = kwargs.get('source', 'unknown_source')  # Pre-built source value

        # Prepare batch of images/ROIs
        batch_images = []
        image_ids = []

        for data, file_path in zip(data_list, file_paths):
            # Unique ID lets the viewer acknowledge each item individually.
            image_id = str(uuid.uuid4())
            image_ids.append(image_id)

            # Detect data type using ABC helper
            data_type = self._detect_data_type(data)

            # Parse component metadata using ABC helper (once for all types)
            component_metadata = self._parse_component_metadata(
                file_path, microscope_handler, source
            )

            # Shapes travel inline; images go through shared memory.
            if data_type == StreamingDataType.SHAPES:
                item_data = self._prepare_shapes_data(data, file_path)
            else:  # IMAGE
                item_data = self._create_shared_memory(data, file_path)

            batch_images.append({
                **item_data,
                'data_type': data_type.value,
                'metadata': component_metadata,
                'image_id': image_id,
            })

        # Build component modes for ALL components in component_order
        # (including virtual components that exist only on the config object).
        component_modes = {}
        for comp_name in display_config.COMPONENT_ORDER:
            mode_field = f"{comp_name}_mode"
            if hasattr(display_config, mode_field):
                component_modes[comp_name] = getattr(display_config, mode_field).value

        # Send batch message
        message = {
            'type': 'batch',
            'images': batch_images,
            'display_config': {
                'colormap': display_config.get_colormap_name(),
                'component_modes': component_modes,
                'component_order': display_config.COMPONENT_ORDER,
                # None when the config lacks the attribute or it is falsy.
                'variable_size_handling': (
                    display_config.variable_size_handling.value
                    if getattr(display_config, 'variable_size_handling', None)
                    else None
                ),
            },
            'timestamp': time.time(),
        }

        # Register sent images with the queue tracker BEFORE sending.
        # This prevents a race condition with IPC mode where acks arrive
        # before registration.
        self._register_with_queue_tracker(port, image_ids)

        # Send non-blocking to prevent hanging if Napari is slow to process
        # (matches the Fiji backend's pattern).
        send_succeeded = False
        try:
            publisher.send_json(message, flags=zmq.NOBLOCK)
            send_succeeded = True
        except zmq.Again:
            # NOTE(review): the dropped batch's image_ids remain registered
            # with the queue tracker — confirm the tracker tolerates ids that
            # never get acked.
            logger.warning(f"Napari viewer busy, dropped batch of {len(batch_images)} images (port {port})")
        except Exception as e:
            logger.error(f"Failed to send batch to Napari on port {port}: {e}", exc_info=True)
            raise  # Re-raise the exception so the pipeline knows it failed
        finally:
            # Unified cleanup: close our handle after successful send,
            # close+unlink after failure (the viewer never saw the block).
            self._cleanup_shared_memory(batch_images, unlink=not send_succeeded)

    def _cleanup_shared_memory(self, batch_images, unlink: bool = False) -> None:
        """Clean up shared memory blocks for a batch of images.

        Args:
            batch_images: List of image dictionaries with optional 'shm_name' keys
            unlink: If True, both close and unlink. If False, only close
                (the receiver is responsible for unlinking — see module docstring).
        """
        for img in batch_images:
            shm_name = img.get('shm_name')  # ROI items don't have shm_name
            if shm_name and shm_name in self._shared_memory_blocks:
                try:
                    shm = self._shared_memory_blocks.pop(shm_name)
                    shm.close()
                    if unlink:
                        shm.unlink()
                except Exception as e:
                    # Best-effort: a failed cleanup must not kill the pipeline.
                    logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")

    # cleanup() now inherited from ABC

    def __del__(self):
        """Best-effort cleanup on garbage collection.

        Guarded because __del__ may run during interpreter shutdown (when
        module globals are already torn down) or on a partially constructed
        instance; an exception here would otherwise be printed to stderr.
        """
        try:
            self.cleanup()
        except Exception:
            pass