Coverage for openhcs/io/streaming.py: 17.2%
106 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
"""
Streaming backend interfaces for OpenHCS.

This module provides abstract base classes for streaming data destinations
that send data to external systems without persistent storage capabilities.
"""
import logging
import os
import time
from pathlib import Path
from typing import Any, List, Optional, Union

import numpy as np

from openhcs.core.config import TransportMode
from openhcs.io.base import DataSink
from openhcs.runtime.zmq_base import get_zmq_transport_url
19logger = logging.getLogger(__name__)
class StreamingBackend(DataSink):
    """
    Abstract base class for ZeroMQ-based streaming backends.

    Provides common ZeroMQ publisher management, shared memory handling,
    and component metadata parsing for all streaming backends.

    Subclasses must define abstract class attributes:
    - VIEWER_TYPE: str (e.g., 'napari', 'fiji')
    - SHM_PREFIX: str (e.g., 'napari_', 'fiji_')

    All streaming backends use generic 'host' and 'port' kwargs for polymorphism.

    Concrete implementations should use StorageBackendMeta for automatic registration.
    """

    # Abstract class attributes that subclasses must override.
    # Declared Optional because the base class leaves them unset (None).
    VIEWER_TYPE: Optional[str] = None
    SHM_PREFIX: Optional[str] = None

    def __init__(self):
        """Initialize ZeroMQ and shared memory infrastructure."""
        self._publishers = {}            # transport URL -> connected ZeroMQ socket
        self._context = None             # lazily created zmq.Context (see _get_publisher)
        self._shared_memory_blocks = {}  # shm name -> SharedMemory block awaiting cleanup

    def _get_publisher(self, host: str, port: int, transport_mode: TransportMode = TransportMode.IPC):
        """
        Lazy initialization of ZeroMQ publisher (common for all streaming backends).

        Uses REQ socket for Fiji (synchronous request/reply with blocking)
        and PUB socket for Napari (broadcast pattern).

        Args:
            host: Host to connect to (ignored for IPC mode)
            port: Port to connect to
            transport_mode: IPC or TCP transport

        Returns:
            ZeroMQ publisher socket

        Raises:
            RuntimeError: If the `zmq` package is not installed.
        """
        # Generate transport URL using centralized function
        url = get_zmq_transport_url(port, transport_mode, host)

        key = url  # Use URL as key instead of host:port
        if key not in self._publishers:
            try:
                # Imported lazily so backends without zmq installed fail only
                # when streaming is actually used.
                import zmq
                if self._context is None:
                    self._context = zmq.Context()

                # Use REQ socket for Fiji (synchronous request/reply - worker blocks until Fiji acks)
                # Use PUB socket for Napari (broadcast pattern)
                socket_type = zmq.REQ if self.VIEWER_TYPE == 'fiji' else zmq.PUB
                publisher = self._context.socket(socket_type)

                if socket_type == zmq.PUB:
                    publisher.setsockopt(zmq.SNDHWM, 100000)  # Only for PUB sockets

                publisher.connect(url)
                socket_name = "REQ" if socket_type == zmq.REQ else "PUB"
                logger.info(f"{self.VIEWER_TYPE} streaming {socket_name} socket connected to {url}")
                # Brief pause to mitigate the ZeroMQ "slow joiner" problem:
                # messages published immediately after connect can be dropped.
                time.sleep(0.1)
                self._publishers[key] = publisher

            except ImportError as e:
                logger.error("ZeroMQ not available - streaming disabled")
                # Chain the cause so the original ImportError is visible in tracebacks.
                raise RuntimeError("ZeroMQ required for streaming") from e

        return self._publishers[key]

    def _parse_component_metadata(self, file_path: Union[str, Path], microscope_handler,
                                  source: str) -> dict:
        """
        Parse component metadata from filename (common for all streaming backends).

        Args:
            file_path: Path to parse
            microscope_handler: Handler with parser
            source: Pre-built source value (step_name during execution, subdir when loading from disk)

        Returns:
            Component metadata dict with source added
        """
        filename = os.path.basename(str(file_path))
        component_metadata = microscope_handler.parser.parse_filename(filename)

        # Add pre-built source value directly
        component_metadata['source'] = source

        return component_metadata

    def _detect_data_type(self, data: Any):
        """
        Detect if data is ROI or image (common for all streaming backends).

        Args:
            data: Data to check

        Returns:
            StreamingDataType enum value
        """
        # Imported lazily to avoid import cycles with core/constants modules.
        from openhcs.core.roi import ROI
        from openhcs.constants.streaming import StreamingDataType

        # Only a non-empty list whose first element is an ROI counts as shapes data.
        is_roi = isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI)
        return StreamingDataType.SHAPES if is_roi else StreamingDataType.IMAGE

    def _create_shared_memory(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Create shared memory for image data (common for all streaming backends).

        Args:
            data: Image data to put in shared memory. May be a torch tensor
                (has `.cpu()`), a CuPy array (has `.get()`), or anything
                `np.asarray` accepts — TODO confirm callers pass only these.
            file_path: Path identifier

        Returns:
            Dict with shared memory metadata ('path', 'shape', 'dtype', 'shm_name')
        """
        # Convert to numpy (device -> host transfer for GPU-backed arrays)
        np_data = data.cpu().numpy() if hasattr(data, 'cpu') else \
                  data.get() if hasattr(data, 'get') else np.asarray(data)

        # Create shared memory with hash-based naming to avoid "File name too long" errors
        # Hash the timestamp and object ID to create a short, unique name
        from multiprocessing import shared_memory, resource_tracker
        import hashlib
        timestamp = time.time_ns()
        obj_id = id(data)
        hash_input = f"{obj_id}_{timestamp}"
        hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8]
        shm_name = f"{self.SHM_PREFIX}{hash_suffix}"
        shm = shared_memory.SharedMemory(create=True, size=np_data.nbytes, name=shm_name)

        # Unregister from resource tracker - we manage cleanup manually
        # This prevents resource tracker warnings when worker processes exit
        # before the viewer has unlinked the shared memory
        try:
            # NOTE: the private `shm._name` is intentional — it is the exact
            # string the SharedMemory constructor registered with the tracker
            # (on POSIX it carries a leading '/' that the public `shm.name`
            # property strips, so using `shm.name` would fail to unregister).
            resource_tracker.unregister(shm._name, "shared_memory")
        except Exception:
            pass  # Ignore errors if already unregistered

        # Copy the data into the shared memory buffer.
        shm_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
        shm_array[:] = np_data[:]
        # Track the block so cleanup() can close/unlink it later.
        self._shared_memory_blocks[shm_name] = shm

        return {
            'path': str(file_path),
            'shape': np_data.shape,
            'dtype': str(np_data.dtype),
            'shm_name': shm_name,
        }

    def _register_with_queue_tracker(self, port: int, image_ids: List[str]) -> None:
        """
        Register sent images with queue tracker (common for all streaming backends).

        Args:
            port: Port number for tracker lookup
            image_ids: List of image IDs to register
        """
        # Imported lazily to avoid import cycles with the runtime package.
        from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
        registry = GlobalQueueTrackerRegistry()
        tracker = registry.get_or_create_tracker(port, self.VIEWER_TYPE)
        for image_id in image_ids:
            tracker.register_sent(image_id)

    def save(self, data: Any, file_path: Union[str, Path], **kwargs) -> None:
        """
        Stream single item (common for all streaming backends).

        Delegates to save_batch() with single-element lists; text data is
        silently ignored (streaming viewers only consume images/ROIs).

        Args:
            data: Data to stream
            file_path: Path identifier
            **kwargs: Backend-specific arguments
        """
        if isinstance(data, str):
            return  # Ignore text data
        self.save_batch([data], [file_path], **kwargs)

    def cleanup(self) -> None:
        """
        Clean up shared memory and ZeroMQ resources (common for all streaming backends).

        Best-effort: each close/unlink/terminate failure is logged as a
        warning and does not abort the remaining cleanup steps.
        """
        logger.info(f"🔥 CLEANUP: Starting cleanup for {self.VIEWER_TYPE}")

        # Clean up shared memory blocks
        logger.info(f"🔥 CLEANUP: About to clean {len(self._shared_memory_blocks)} shared memory blocks")
        for shm_name, shm in self._shared_memory_blocks.items():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.warning(f"Failed to cleanup shared memory {shm_name}: {e}")
        self._shared_memory_blocks.clear()
        logger.info(f"🔥 CLEANUP: Shared memory cleanup complete")

        # Close publishers
        logger.info(f"🔥 CLEANUP: About to close {len(self._publishers)} publishers")
        for key, publisher in self._publishers.items():
            try:
                logger.info(f"🔥 CLEANUP: Closing publisher {key}")
                publisher.close()
                logger.info(f"🔥 CLEANUP: Publisher {key} closed")
            except Exception as e:
                logger.warning(f"Failed to close publisher {key}: {e}")
        self._publishers.clear()
        logger.info(f"🔥 CLEANUP: Publishers cleanup complete")

        # Terminate context (after all sockets are closed, so term() cannot hang)
        if self._context:
            try:
                logger.info(f"🔥 CLEANUP: About to terminate ZMQ context")
                self._context.term()
                logger.info(f"🔥 CLEANUP: ZMQ context terminated")
            except Exception as e:
                logger.warning(f"Failed to terminate ZMQ context: {e}")
            self._context = None

        logger.info(f"🔥 CLEANUP: {self.VIEWER_TYPE} streaming backend cleaned up")