Coverage for src/polystore/streaming.py: 13%
115 statements
coverage.py v7.11.0, created at 2025-11-03 06:58 +0000
1"""
2Streaming backend interfaces for OpenHCS.
4This module provides abstract base classes for streaming data destinations
5that send data to external systems without persistent storage capabilities.
7Note: This module requires the openhcs package. It is optional for polystore.
8"""
import logging
import time
import os
from pathlib import Path
from typing import Any, List, Union, Optional
import numpy as np
# Lazy imports - streaming is optional and requires OpenHCS
try:
    from openhcs.io.base import DataSink
    from openhcs.runtime.zmq_base import get_zmq_transport_url
    from openhcs.core.config import TransportMode
    OPENHCS_AVAILABLE = True
except ImportError:
    OPENHCS_AVAILABLE = False
    DataSink = object  # Use object as stub base class
    TransportMode = None
    get_zmq_transport_url = None
# Streaming requires OpenHCS - fail fast at import time if it is missing
if not OPENHCS_AVAILABLE:
    raise ImportError("Streaming backend requires OpenHCS. Install with: pip install openhcs")

logger = logging.getLogger(__name__)
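
# Example (editorial sketch, not part of this module): because importing this module
# raises ImportError when OpenHCS is absent, downstream polystore code would
# typically guard the import instead of assuming it succeeds. The flag name below
# is hypothetical.
#
#     try:
#         from polystore.streaming import StreamingBackend
#         STREAMING_AVAILABLE = True
#     except ImportError:
#         StreamingBackend = None
#         STREAMING_AVAILABLE = False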
class StreamingBackend(DataSink):
    """
    Abstract base class for ZeroMQ-based streaming backends.

    Provides common ZeroMQ publisher management, shared memory handling,
    and component metadata parsing for all streaming backends.

    Subclasses must define abstract class attributes:
    - VIEWER_TYPE: str (e.g., 'napari', 'fiji')
    - SHM_PREFIX: str (e.g., 'napari_', 'fiji_')

    All streaming backends use generic 'host' and 'port' kwargs for polymorphism.

    Concrete implementations should use StorageBackendMeta for automatic registration.
    """

    # Abstract class attributes that subclasses must define
    VIEWER_TYPE: str = None
    SHM_PREFIX: str = None
    def __init__(self):
        """Initialize ZeroMQ and shared memory infrastructure."""
        self._publishers = {}            # ZeroMQ sockets keyed by transport URL
        self._context = None             # lazily created zmq.Context
        self._shared_memory_blocks = {}  # SharedMemory blocks keyed by block name
    def _get_publisher(self, host: str, port: int, transport_mode: TransportMode = TransportMode.IPC):
        """
        Lazy initialization of ZeroMQ publisher (common for all streaming backends).

        Uses REQ socket for Fiji (synchronous request/reply with blocking)
        and PUB socket for Napari (broadcast pattern).

        Args:
            host: Host to connect to (ignored for IPC mode)
            port: Port to connect to
            transport_mode: IPC or TCP transport

        Returns:
            ZeroMQ publisher socket
        """
        # Generate transport URL using centralized function
        url = get_zmq_transport_url(port, transport_mode, host)

        key = url  # Use URL as key instead of host:port
        if key not in self._publishers:
            try:
                import zmq
                if self._context is None:
                    self._context = zmq.Context()

                # Use REQ socket for Fiji (synchronous request/reply - worker blocks until Fiji acks)
                # Use PUB socket for Napari (broadcast pattern)
                socket_type = zmq.REQ if self.VIEWER_TYPE == 'fiji' else zmq.PUB
                publisher = self._context.socket(socket_type)

                if socket_type == zmq.PUB:
                    publisher.setsockopt(zmq.SNDHWM, 100000)  # Only for PUB sockets

                publisher.connect(url)
                socket_name = "REQ" if socket_type == zmq.REQ else "PUB"
                logger.info(f"{self.VIEWER_TYPE} streaming {socket_name} socket connected to {url}")
                time.sleep(0.1)
                self._publishers[key] = publisher

            except ImportError:
                logger.error("ZeroMQ not available - streaming disabled")
                raise RuntimeError("ZeroMQ required for streaming")

        return self._publishers[key]
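
    # Example (editorial sketch, not part of this module): the Napari-style PUB socket
    # above connect()s to the URL, so the viewer side is the peer that bind()s. A
    # minimal pyzmq receiver could look like this; the endpoint is hard-coded only for
    # illustration (real code derives it from get_zmq_transport_url), and the JSON
    # message shape is an assumption.
    #
    #     import zmq
    #
    #     ctx = zmq.Context()
    #     sub = ctx.socket(zmq.SUB)
    #     sub.setsockopt(zmq.SUBSCRIBE, b"")   # subscribe to every message
    #     sub.bind("tcp://127.0.0.1:5555")     # hypothetical TCP endpoint
    #     metadata = sub.recv_json()           # assumed: backends publish JSON metadata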
    def _parse_component_metadata(self, file_path: Union[str, Path], microscope_handler,
                                  source: str) -> dict:
        """
        Parse component metadata from filename (common for all streaming backends).

        Args:
            file_path: Path to parse
            microscope_handler: Handler with parser
            source: Pre-built source value (step_name during execution, subdir when loading from disk)

        Returns:
            Component metadata dict with source added
        """
        filename = os.path.basename(str(file_path))
        component_metadata = microscope_handler.parser.parse_filename(filename)

        # Add pre-built source value directly
        component_metadata['source'] = source

        return component_metadata
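
    # Example (editorial sketch, not part of this module): the only interface this
    # helper assumes is `microscope_handler.parser.parse_filename(filename) -> dict`.
    # A minimal stand-in for tests could therefore look like this (all names below
    # are hypothetical):
    #
    #     class _FakeParser:
    #         def parse_filename(self, filename: str) -> dict:
    #             # "A01_s1_w2.tif" -> {'well': 'A01', 'site': 1, 'channel': 2}
    #             well, site, channel = filename.split('.')[0].split('_')
    #             return {'well': well, 'site': int(site[1:]), 'channel': int(channel[1:])}
    #
    #     class _FakeHandler:
    #         parser = _FakeParser()
    #
    #     backend._parse_component_metadata("A01_s1_w2.tif", _FakeHandler(), source="step_1")
    #     # -> {'well': 'A01', 'site': 1, 'channel': 2, 'source': 'step_1'}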
    def _detect_data_type(self, data: Any):
        """
        Detect if data is ROI or image (common for all streaming backends).

        Args:
            data: Data to check

        Returns:
            StreamingDataType enum value
        """
        from openhcs.core.roi import ROI
        from openhcs.constants.streaming import StreamingDataType

        is_roi = isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI)
        return StreamingDataType.SHAPES if is_roi else StreamingDataType.IMAGE
    def _create_shared_memory(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Create shared memory for image data (common for all streaming backends).

        Args:
            data: Image data to put in shared memory
            file_path: Path identifier

        Returns:
            Dict with shared memory metadata
        """
        # Convert to numpy (.cpu() covers torch-style tensors, .get() covers cupy-style arrays)
        np_data = data.cpu().numpy() if hasattr(data, 'cpu') else \
            data.get() if hasattr(data, 'get') else np.asarray(data)

        # Create shared memory with hash-based naming to avoid "File name too long" errors.
        # Hash the timestamp and object ID to create a short, unique name.
        from multiprocessing import shared_memory, resource_tracker
        import hashlib
        timestamp = time.time_ns()
        obj_id = id(data)
        hash_input = f"{obj_id}_{timestamp}"
        hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8]
        shm_name = f"{self.SHM_PREFIX}{hash_suffix}"
        shm = shared_memory.SharedMemory(create=True, size=np_data.nbytes, name=shm_name)

        # Unregister from resource tracker - we manage cleanup manually.
        # This prevents resource tracker warnings when worker processes exit
        # before the viewer has unlinked the shared memory.
        try:
            resource_tracker.unregister(shm._name, "shared_memory")
        except Exception:
            pass  # Ignore errors if already unregistered

        shm_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
        shm_array[:] = np_data[:]
        self._shared_memory_blocks[shm_name] = shm

        return {
            'path': str(file_path),
            'shape': np_data.shape,
            'dtype': str(np_data.dtype),
            'shm_name': shm_name,
        }
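
    # Example (editorial sketch, not part of this module): a viewer process that
    # receives the metadata dict returned above can reattach to the block by name,
    # copy the pixels out, and then release it. Only the standard library and numpy
    # are needed on that side; the function name is hypothetical.
    #
    #     import numpy as np
    #     from multiprocessing import shared_memory
    #
    #     def read_streamed_image(meta: dict) -> np.ndarray:
    #         shm = shared_memory.SharedMemory(name=meta['shm_name'])
    #         try:
    #             view = np.ndarray(meta['shape'], dtype=meta['dtype'], buffer=shm.buf)
    #             return view.copy()   # copy out before the block is released
    #         finally:
    #             shm.close()
    #             shm.unlink()         # viewer-side unlink; cleanup() also unlinks defensively and only warns on failure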
    def _register_with_queue_tracker(self, port: int, image_ids: List[str]) -> None:
        """
        Register sent images with queue tracker (common for all streaming backends).

        Args:
            port: Port number for tracker lookup
            image_ids: List of image IDs to register
        """
        from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
        registry = GlobalQueueTrackerRegistry()
        tracker = registry.get_or_create_tracker(port, self.VIEWER_TYPE)
        for image_id in image_ids:
            tracker.register_sent(image_id)
    def save(self, data: Any, file_path: Union[str, Path], **kwargs) -> None:
        """
        Stream single item (common for all streaming backends).

        Args:
            data: Data to stream
            file_path: Path identifier
            **kwargs: Backend-specific arguments
        """
        if isinstance(data, str):
            return  # Ignore text data
        self.save_batch([data], [file_path], **kwargs)
    def cleanup(self) -> None:
        """
        Clean up shared memory and ZeroMQ resources (common for all streaming backends).
        """
        logger.info(f"🔥 CLEANUP: Starting cleanup for {self.VIEWER_TYPE}")

        # Clean up shared memory blocks
        logger.info(f"🔥 CLEANUP: About to clean {len(self._shared_memory_blocks)} shared memory blocks")
        for shm_name, shm in self._shared_memory_blocks.items():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")
        self._shared_memory_blocks.clear()
        logger.info("🔥 CLEANUP: Shared memory cleanup complete")

        # Close publishers
        logger.info(f"🔥 CLEANUP: About to close {len(self._publishers)} publishers")
        for key, publisher in self._publishers.items():
            try:
                logger.info(f"🔥 CLEANUP: Closing publisher {key}")
                publisher.close()
                logger.info(f"🔥 CLEANUP: Publisher {key} closed")
            except Exception as e:
                logger.warning(f"Failed to close publisher {key}: {e}")
        self._publishers.clear()
        logger.info("🔥 CLEANUP: Publishers cleanup complete")

        # Terminate context
        if self._context:
            try:
                logger.info("🔥 CLEANUP: About to terminate ZMQ context")
                self._context.term()
                logger.info("🔥 CLEANUP: ZMQ context terminated")
            except Exception as e:
                logger.warning(f"Failed to terminate ZMQ context: {e}")
            self._context = None

        logger.info(f"🔥 CLEANUP: {self.VIEWER_TYPE} streaming backend cleaned up")
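
# Example (editorial sketch, not part of this module): a concrete backend only needs
# the two class attributes plus a save_batch() implementation that wires the shared
# helpers together. Everything below - the class name, the default port, the message
# shape, and the choice of paths as image ids - is hypothetical; the real Napari/Fiji
# backends live in OpenHCS.
#
#     class ExampleNapariBackend(StreamingBackend):
#         VIEWER_TYPE = 'napari'
#         SHM_PREFIX = 'napari_'
#
#         def save_batch(self, data_list, file_paths, host='localhost', port=5555, **kwargs):
#             publisher = self._get_publisher(host, port)
#             payload = [self._create_shared_memory(data, path)
#                        for data, path in zip(data_list, file_paths)]
#             publisher.send_json({'images': payload})
#             self._register_with_queue_tracker(port, [item['path'] for item in payload])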