Coverage for openhcs/io/streaming.py: 17.2%

106 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Streaming backend interfaces for OpenHCS. 

3 

4This module provides abstract base classes for streaming data destinations 

5that send data to external systems without persistent storage capabilities. 

6""" 

7 

import logging
import os
import time
from pathlib import Path
from typing import Any, List, Optional, Union

import numpy as np

from openhcs.io.base import DataSink
from openhcs.runtime.zmq_base import get_zmq_transport_url
from openhcs.core.config import TransportMode

18 

19logger = logging.getLogger(__name__) 

20 

21 

class StreamingBackend(DataSink):
    """
    Abstract base class for ZeroMQ-based streaming backends.

    Provides common ZeroMQ publisher management, shared memory handling,
    and component metadata parsing for all streaming backends.

    Subclasses must define abstract class attributes:
    - VIEWER_TYPE: str (e.g., 'napari', 'fiji')
    - SHM_PREFIX: str (e.g., 'napari_', 'fiji_')

    All streaming backends use generic 'host' and 'port' kwargs for polymorphism.

    Concrete implementations should use StorageBackendMeta for automatic registration.
    """

    # Abstract class attributes that subclasses must define.
    # Annotated Optional[str] (not str) because the base class leaves them as None.
    VIEWER_TYPE: Optional[str] = None
    SHM_PREFIX: Optional[str] = None

    def __init__(self):
        """Initialize ZeroMQ and shared memory infrastructure."""
        self._publishers = {}  # transport URL -> connected ZMQ socket
        self._context = None  # lazily created zmq.Context (see _get_publisher)
        self._shared_memory_blocks = {}  # shm name -> SharedMemory, released in cleanup()

    def _get_publisher(self, host: str, port: int, transport_mode: TransportMode = TransportMode.IPC):
        """
        Lazy initialization of ZeroMQ publisher (common for all streaming backends).

        Uses REQ socket for Fiji (synchronous request/reply with blocking)
        and PUB socket for Napari (broadcast pattern).

        Args:
            host: Host to connect to (ignored for IPC mode)
            port: Port to connect to
            transport_mode: IPC or TCP transport

        Returns:
            ZeroMQ publisher socket

        Raises:
            RuntimeError: If pyzmq is not installed (chained from the ImportError).
        """
        # Generate transport URL using centralized function
        url = get_zmq_transport_url(port, transport_mode, host)

        key = url  # Use URL as key instead of host:port
        if key not in self._publishers:
            # Only the import can raise ImportError; keep the try body minimal
            # so socket/connect failures surface with their real traceback.
            try:
                import zmq
            except ImportError as e:
                logger.error("ZeroMQ not available - streaming disabled")
                raise RuntimeError("ZeroMQ required for streaming") from e

            if self._context is None:
                self._context = zmq.Context()

            # Use REQ socket for Fiji (synchronous request/reply - worker blocks until Fiji acks)
            # Use PUB socket for Napari (broadcast pattern)
            socket_type = zmq.REQ if self.VIEWER_TYPE == 'fiji' else zmq.PUB
            publisher = self._context.socket(socket_type)

            if socket_type == zmq.PUB:
                publisher.setsockopt(zmq.SNDHWM, 100000)  # Only for PUB sockets

            publisher.connect(url)
            socket_name = "REQ" if socket_type == zmq.REQ else "PUB"
            logger.info("%s streaming %s socket connected to %s", self.VIEWER_TYPE, socket_name, url)
            # Brief pause so late-joining subscribers don't miss the first messages
            time.sleep(0.1)
            self._publishers[key] = publisher

        return self._publishers[key]

    def _parse_component_metadata(self, file_path: Union[str, Path], microscope_handler,
                                  source: str) -> dict:
        """
        Parse component metadata from filename (common for all streaming backends).

        Args:
            file_path: Path to parse
            microscope_handler: Handler with parser
            source: Pre-built source value (step_name during execution, subdir when loading from disk)

        Returns:
            Component metadata dict with source added
        """
        filename = os.path.basename(str(file_path))
        component_metadata = microscope_handler.parser.parse_filename(filename)

        # Add pre-built source value directly
        component_metadata['source'] = source

        return component_metadata

    def _detect_data_type(self, data: Any):
        """
        Detect if data is ROI or image (common for all streaming backends).

        Args:
            data: Data to check

        Returns:
            StreamingDataType enum value (SHAPES for a non-empty list of ROI,
            IMAGE for everything else)
        """
        # Deferred imports keep module import cheap and avoid cycles.
        from openhcs.core.roi import ROI
        from openhcs.constants.streaming import StreamingDataType

        is_roi = isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI)
        return StreamingDataType.SHAPES if is_roi else StreamingDataType.IMAGE

    def _create_shared_memory(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Create shared memory for image data (common for all streaming backends).

        Args:
            data: Image data to put in shared memory (torch/cupy/numpy-like;
                  converted to numpy via .cpu()/.get()/np.asarray)
            file_path: Path identifier

        Returns:
            Dict with shared memory metadata ('path', 'shape', 'dtype', 'shm_name')
        """
        # Convert to numpy: torch tensors expose .cpu(), cupy arrays expose .get()
        np_data = data.cpu().numpy() if hasattr(data, 'cpu') else \
                  data.get() if hasattr(data, 'get') else np.asarray(data)

        # Create shared memory with hash-based naming to avoid "File name too long" errors
        # Hash the timestamp and object ID to create a short, unique name
        from multiprocessing import shared_memory, resource_tracker
        import hashlib
        timestamp = time.time_ns()
        obj_id = id(data)
        hash_input = f"{obj_id}_{timestamp}"
        hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8]
        shm_name = f"{self.SHM_PREFIX}{hash_suffix}"
        shm = shared_memory.SharedMemory(create=True, size=np_data.nbytes, name=shm_name)

        # Unregister from resource tracker - we manage cleanup manually
        # This prevents resource tracker warnings when worker processes exit
        # before the viewer has unlinked the shared memory
        try:
            # NOTE: must use the private _name (the raw registered name, which on
            # POSIX carries a leading slash) so it matches what register() recorded;
            # the public .name property strips the slash and would fail to match.
            resource_tracker.unregister(shm._name, "shared_memory")
        except Exception:
            pass  # Ignore errors if already unregistered

        shm_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
        shm_array[:] = np_data[:]
        self._shared_memory_blocks[shm_name] = shm

        return {
            'path': str(file_path),
            'shape': np_data.shape,
            'dtype': str(np_data.dtype),
            'shm_name': shm_name,
        }

    def _register_with_queue_tracker(self, port: int, image_ids: List[str]) -> None:
        """
        Register sent images with queue tracker (common for all streaming backends).

        Args:
            port: Port number for tracker lookup
            image_ids: List of image IDs to register
        """
        from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
        registry = GlobalQueueTrackerRegistry()
        tracker = registry.get_or_create_tracker(port, self.VIEWER_TYPE)
        for image_id in image_ids:
            tracker.register_sent(image_id)

    def save(self, data: Any, file_path: Union[str, Path], **kwargs) -> None:
        """
        Stream single item (common for all streaming backends).

        Delegates to save_batch with single-element lists; string data is
        silently ignored (text is not streamable).

        Args:
            data: Data to stream
            file_path: Path identifier
            **kwargs: Backend-specific arguments
        """
        if isinstance(data, str):
            return  # Ignore text data
        self.save_batch([data], [file_path], **kwargs)

    def cleanup(self) -> None:
        """
        Clean up shared memory and ZeroMQ resources (common for all streaming backends).

        Best-effort: each resource is released independently; failures are
        logged as warnings and do not abort the rest of the cleanup.
        """
        logger.info("🔥 CLEANUP: Starting cleanup for %s", self.VIEWER_TYPE)

        # Clean up shared memory blocks
        logger.info("🔥 CLEANUP: About to clean %d shared memory blocks", len(self._shared_memory_blocks))
        for shm_name, shm in self._shared_memory_blocks.items():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.warning("Failed to cleanup shared memory %s: %s", shm_name, e)
        self._shared_memory_blocks.clear()
        logger.info("🔥 CLEANUP: Shared memory cleanup complete")

        # Close publishers
        logger.info("🔥 CLEANUP: About to close %d publishers", len(self._publishers))
        for key, publisher in self._publishers.items():
            try:
                logger.info("🔥 CLEANUP: Closing publisher %s", key)
                publisher.close()
                logger.info("🔥 CLEANUP: Publisher %s closed", key)
            except Exception as e:
                logger.warning("Failed to close publisher %s: %s", key, e)
        self._publishers.clear()
        logger.info("🔥 CLEANUP: Publishers cleanup complete")

        # Terminate context
        if self._context:
            try:
                logger.info("🔥 CLEANUP: About to terminate ZMQ context")
                self._context.term()
                logger.info("🔥 CLEANUP: ZMQ context terminated")
            except Exception as e:
                logger.warning("Failed to terminate ZMQ context: %s", e)
            self._context = None

        logger.info("🔥 CLEANUP: %s streaming backend cleaned up", self.VIEWER_TYPE)