Coverage for src/polystore/streaming.py: 13%

115 statements  

coverage.py v7.11.0, created at 2025-11-03 06:58 +0000

1""" 

2Streaming backend interfaces for OpenHCS. 

3 

4This module provides abstract base classes for streaming data destinations 

5that send data to external systems without persistent storage capabilities. 

6 

7Note: This module requires the openhcs package. It is optional for polystore. 

8""" 

9 

10import logging 

11import time 

12import os 

13from pathlib import Path 

14from typing import Any, List, Union, Optional 

15import numpy as np 

16 

17# Lazy imports - streaming is optional and requires OpenHCS 

18try: 

19 from openhcs.io.base import DataSink 

20 from openhcs.runtime.zmq_base import get_zmq_transport_url 

21 from openhcs.core.config import TransportMode 

22 OPENHCS_AVAILABLE = True 

23except ImportError: 

24 OPENHCS_AVAILABLE = False 

25 DataSink = object # Use object as stub base class 

26 TransportMode = None 

27 get_zmq_transport_url = None 

28 

29 

30# Only define StreamingBackend if OpenHCS is available 

31if not OPENHCS_AVAILABLE: 

32 raise ImportError("Streaming backend requires OpenHCS. Install with: pip install openhcs") 

33 

34logger = logging.getLogger(__name__) 



class StreamingBackend(DataSink):
    """
    Abstract base class for ZeroMQ-based streaming backends.

    Provides common ZeroMQ publisher management, shared memory handling,
    and component metadata parsing for all streaming backends.

    Subclasses must define abstract class attributes:
    - VIEWER_TYPE: str (e.g., 'napari', 'fiji')
    - SHM_PREFIX: str (e.g., 'napari_', 'fiji_')

    All streaming backends use generic 'host' and 'port' kwargs for polymorphism.

    Concrete implementations should use StorageBackendMeta for automatic registration.
    """

    # Abstract class attributes that subclasses must define
    VIEWER_TYPE: Optional[str] = None
    SHM_PREFIX: Optional[str] = None

    def __init__(self):
        """Initialize ZeroMQ and shared memory infrastructure."""
        self._publishers = {}
        self._context = None
        self._shared_memory_blocks = {}


    def _get_publisher(self, host: str, port: int, transport_mode: TransportMode = TransportMode.IPC):
        """
        Lazy initialization of ZeroMQ publisher (common for all streaming backends).

        Uses REQ socket for Fiji (synchronous request/reply with blocking)
        and PUB socket for Napari (broadcast pattern).

        Args:
            host: Host to connect to (ignored for IPC mode)
            port: Port to connect to
            transport_mode: IPC or TCP transport

        Returns:
            ZeroMQ publisher socket
        """
        # Generate transport URL using centralized function
        url = get_zmq_transport_url(port, transport_mode, host)

        key = url  # Use URL as key instead of host:port
        if key not in self._publishers:
            try:
                import zmq
                if self._context is None:
                    self._context = zmq.Context()

                # Use REQ socket for Fiji (synchronous request/reply - worker blocks until Fiji acks)
                # Use PUB socket for Napari (broadcast pattern)
                socket_type = zmq.REQ if self.VIEWER_TYPE == 'fiji' else zmq.PUB
                publisher = self._context.socket(socket_type)

                if socket_type == zmq.PUB:
                    publisher.setsockopt(zmq.SNDHWM, 100000)  # Only for PUB sockets

                publisher.connect(url)
                socket_name = "REQ" if socket_type == zmq.REQ else "PUB"
                logger.info(f"{self.VIEWER_TYPE} streaming {socket_name} socket connected to {url}")
                time.sleep(0.1)
                self._publishers[key] = publisher

            except ImportError:
                logger.error("ZeroMQ not available - streaming disabled")
                raise RuntimeError("ZeroMQ required for streaming")

        return self._publishers[key]
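    # Note (hedged sketch, not part of the OpenHCS API): because the backend
    # *connects* its PUB/REQ socket, the viewer process is assumed to *bind*
    # the matching SUB (or REP, for the Fiji path) socket on the same
    # transport URL, roughly:
    #
    #     import zmq
    #     ctx = zmq.Context()
    #     sub = ctx.socket(zmq.SUB)
    #     sub.bind(url)  # same URL produced by get_zmq_transport_url
    #     sub.setsockopt_string(zmq.SUBSCRIBE, "")
    #     message = sub.recv_json()
    #
    # The variable names and the JSON payload shape are illustrative only.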

    def _parse_component_metadata(self, file_path: Union[str, Path], microscope_handler,
                                  source: str) -> dict:
        """
        Parse component metadata from filename (common for all streaming backends).

        Args:
            file_path: Path to parse
            microscope_handler: Handler with parser
            source: Pre-built source value (step_name during execution, subdir when loading from disk)

        Returns:
            Component metadata dict with source added
        """
        filename = os.path.basename(str(file_path))
        component_metadata = microscope_handler.parser.parse_filename(filename)

        # Add pre-built source value directly
        component_metadata['source'] = source

        return component_metadata
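    # Note (illustrative, not a guarantee of parser behaviour): for a filename
    # such as 'A01_s1_w2.tif', the microscope handler's parser is expected to
    # return a dict of filename components (well/site/channel-style keys whose
    # exact names depend on the parser), to which 'source' is then added:
    #
    #     meta = backend._parse_component_metadata('A01_s1_w2.tif', handler, source='step_3')
    #     # meta == {<parser-specific keys>, 'source': 'step_3'}
    #
    # 'backend' and 'handler' above are hypothetical names.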

    def _detect_data_type(self, data: Any):
        """
        Detect if data is ROI or image (common for all streaming backends).

        Args:
            data: Data to check

        Returns:
            StreamingDataType enum value
        """
        from openhcs.core.roi import ROI
        from openhcs.constants.streaming import StreamingDataType

        is_roi = isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI)
        return StreamingDataType.SHAPES if is_roi else StreamingDataType.IMAGE

    def _create_shared_memory(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Create shared memory for image data (common for all streaming backends).

        Args:
            data: Image data to put in shared memory
            file_path: Path identifier

        Returns:
            Dict with shared memory metadata
        """
        # Convert to numpy (torch tensors expose .cpu(), CuPy arrays expose .get())
        np_data = data.cpu().numpy() if hasattr(data, 'cpu') else \
            data.get() if hasattr(data, 'get') else np.asarray(data)

        # Create shared memory with hash-based naming to avoid "File name too long" errors.
        # Hash the timestamp and object ID to create a short, unique name.
        from multiprocessing import shared_memory, resource_tracker
        import hashlib
        timestamp = time.time_ns()
        obj_id = id(data)
        hash_input = f"{obj_id}_{timestamp}"
        hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8]
        shm_name = f"{self.SHM_PREFIX}{hash_suffix}"
        shm = shared_memory.SharedMemory(create=True, size=np_data.nbytes, name=shm_name)

        # Unregister from resource tracker - we manage cleanup manually.
        # This prevents resource tracker warnings when worker processes exit
        # before the viewer has unlinked the shared memory.
        try:
            resource_tracker.unregister(shm._name, "shared_memory")
        except Exception:
            pass  # Ignore errors if already unregistered

        shm_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
        shm_array[:] = np_data[:]
        self._shared_memory_blocks[shm_name] = shm

        return {
            'path': str(file_path),
            'shape': np_data.shape,
            'dtype': str(np_data.dtype),
            'shm_name': shm_name,
        }
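    # Note (hedged sketch): a viewer process receiving the metadata dict above
    # is assumed to attach to the block by name and rebuild the array, roughly:
    #
    #     from multiprocessing import shared_memory
    #     import numpy as np
    #     shm = shared_memory.SharedMemory(name=meta['shm_name'])
    #     img = np.ndarray(meta['shape'], dtype=np.dtype(meta['dtype']), buffer=shm.buf).copy()
    #     shm.close()
    #     shm.unlink()  # only once the data has been copied out
    #
    # 'meta' is the dict returned by _create_shared_memory; the copy/unlink
    # policy on the viewer side is an assumption, not part of this module.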

    def _register_with_queue_tracker(self, port: int, image_ids: List[str]) -> None:
        """
        Register sent images with queue tracker (common for all streaming backends).

        Args:
            port: Port number for tracker lookup
            image_ids: List of image IDs to register
        """
        from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
        registry = GlobalQueueTrackerRegistry()
        tracker = registry.get_or_create_tracker(port, self.VIEWER_TYPE)
        for image_id in image_ids:
            tracker.register_sent(image_id)

    def save(self, data: Any, file_path: Union[str, Path], **kwargs) -> None:
        """
        Stream single item (common for all streaming backends).

        Args:
            data: Data to stream
            file_path: Path identifier
            **kwargs: Backend-specific arguments
        """
        if isinstance(data, str):
            return  # Ignore text data
        self.save_batch([data], [file_path], **kwargs)

    def cleanup(self) -> None:
        """
        Clean up shared memory and ZeroMQ resources (common for all streaming backends).
        """
        logger.info(f"🔥 CLEANUP: Starting cleanup for {self.VIEWER_TYPE}")

        # Clean up shared memory blocks
        logger.info(f"🔥 CLEANUP: About to clean {len(self._shared_memory_blocks)} shared memory blocks")
        for shm_name, shm in self._shared_memory_blocks.items():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")
        self._shared_memory_blocks.clear()
        logger.info("🔥 CLEANUP: Shared memory cleanup complete")

        # Close publishers
        logger.info(f"🔥 CLEANUP: About to close {len(self._publishers)} publishers")
        for key, publisher in self._publishers.items():
            try:
                logger.info(f"🔥 CLEANUP: Closing publisher {key}")
                publisher.close()
                logger.info(f"🔥 CLEANUP: Publisher {key} closed")
            except Exception as e:
                logger.warning(f"Failed to close publisher {key}: {e}")
        self._publishers.clear()
        logger.info("🔥 CLEANUP: Publishers cleanup complete")

        # Terminate context
        if self._context:
            try:
                logger.info("🔥 CLEANUP: About to terminate ZMQ context")
                self._context.term()
                logger.info("🔥 CLEANUP: ZMQ context terminated")
            except Exception as e:
                logger.warning(f"Failed to terminate ZMQ context: {e}")
            self._context = None

        logger.info(f"🔥 CLEANUP: {self.VIEWER_TYPE} streaming backend cleaned up")
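
For orientation, a minimal concrete subclass might look like the sketch below. The class name NapariStreamingBackend, the save_batch body, the message format, and the port value are hypothetical illustrations, not part of this module; only VIEWER_TYPE, SHM_PREFIX, and the inherited helpers (_get_publisher, _create_shared_memory, _register_with_queue_tracker, save, cleanup) come from the code above, and the real OpenHCS backends may use a different protocol.

import numpy as np

class NapariStreamingBackend(StreamingBackend):
    """Hypothetical concrete backend, for illustration only."""
    VIEWER_TYPE = 'napari'
    SHM_PREFIX = 'napari_'

    def save_batch(self, data_list, file_paths, **kwargs):
        # Illustrative: one JSON payload per item, carrying the shared-memory
        # metadata produced by the base class helper.
        publisher = self._get_publisher(kwargs.get('host', 'localhost'), kwargs['port'])
        for data, path in zip(data_list, file_paths):
            meta = self._create_shared_memory(data, path)
            publisher.send_json(meta)
        self._register_with_queue_tracker(kwargs['port'], [str(p) for p in file_paths])

backend = NapariStreamingBackend()
backend.save(np.zeros((256, 256), dtype=np.uint16), 'A01_s1_w1.tif', port=5555)
backend.cleanup()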