Coverage for openhcs/io/fiji_stream.py: 18.1%

80 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Fiji streaming backend for OpenHCS. 

3 

4Streams image data to Fiji/ImageJ viewer using ZMQ for IPC. 

5Follows the same architecture as Napari streaming for consistency. 

6 

7SHARED MEMORY OWNERSHIP MODEL: 

8- Sender (Worker): Creates shared memory, sends reference via ZMQ, closes handle (does NOT unlink) 

9- Receiver (Fiji Server): Attaches to shared memory, copies data, closes handle, unlinks 

10- Only receiver calls unlink() to prevent FileNotFoundError 

11- REQ/REP socket pattern ensures receiver copies data before sender closes handle 

12""" 

13 

14import logging 

15import time 

16from pathlib import Path 

17from typing import Any, Union, List 

18import os 

19import numpy as np 

20 

21from openhcs.io.streaming import StreamingBackend 

22from openhcs.constants.constants import Backend 

23from openhcs.core.config import TransportMode 

24 

25logger = logging.getLogger(__name__) 

26 

27 

class FijiStreamingBackend(StreamingBackend):
    """Fiji streaming backend that sends image/ROI batches to a Fiji viewer over ZMQ.

    ``save_batch`` sends one JSON message per batch through the socket returned
    by the inherited ``_get_publisher`` and then blocks on ``recv_json`` until
    the viewer replies (request/reply handshake). Image items are handed over
    through ``_create_shared_memory``; ROI items are encoded and embedded
    directly in the message.

    Shared-memory ownership: this side only closes its own handle once the
    reply has arrived; it never unlinks the segment (the receiver does that).
    """

    _backend_type = Backend.FIJI_STREAM.value

    # Configure ABC attributes
    VIEWER_TYPE = 'fiji'
    SHM_PREFIX = 'fiji_'

    # __init__, _get_publisher, save, cleanup now inherited from ABC

    def _prepare_rois_data(self, data: Any, file_path: Union[str, Path]) -> dict:
        """
        Prepare ROIs data for transmission.

        Args:
            data: ROI list
            file_path: Path identifier

        Returns:
            Dict with the stringified path under ``'path'`` and the encoded
            ROIs under ``'rois'``.
        """
        from openhcs.runtime.roi_converters import FijiROIConverter

        # Convert ROI objects to bytes, then base64 encode for transmission
        roi_bytes_list = FijiROIConverter.rois_to_imagej_bytes(data)
        rois_encoded = FijiROIConverter.encode_rois_for_transmission(roi_bytes_list)

        return {
            'path': str(file_path),
            'rois': rois_encoded,
        }

    def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], **kwargs) -> None:
        """Stream batch of images or ROIs to Fiji via ZMQ.

        Args:
            data_list: Items to stream; each one is classified by the inherited
                ``_detect_data_type`` as either shapes (ROIs) or an image.
            file_paths: One path identifier per entry of ``data_list``.
            **kwargs: Streaming options:
                host: Viewer host (default ``'localhost'``).
                port: Viewer port (required).
                transport_mode: Defaults to ``TransportMode.IPC``.
                display_config: Display configuration object (required).
                microscope_handler: Passed to ``_parse_component_metadata``
                    (required).
                source: Pre-built source value (default ``'unknown_source'``).
                images_dir: Source image subdirectory for ROI mapping
                    (optional).

        Raises:
            ValueError: If ``data_list`` and ``file_paths`` differ in length.
            KeyError: If ``port``, ``display_config`` or ``microscope_handler``
                is missing from ``kwargs``.
        """
        # Function-scope imports, following the file's existing convention.
        import uuid
        from collections import Counter

        from openhcs.constants.streaming import StreamingDataType

        if len(data_list) != len(file_paths):
            raise ValueError("data_list and file_paths must have same length")

        logger.info(f"📦 FIJI BACKEND: save_batch called with {len(data_list)} items")

        # Extract kwargs using generic polymorphic names
        host = kwargs.get('host', 'localhost')
        port = kwargs['port']
        transport_mode = kwargs.get('transport_mode', TransportMode.IPC)
        publisher = self._get_publisher(host, port, transport_mode)
        display_config = kwargs['display_config']
        microscope_handler = kwargs['microscope_handler']
        source = kwargs.get('source', 'unknown_source')  # Pre-built source value
        images_dir = kwargs.get('images_dir')  # Source image subdirectory for ROI mapping

        # Prepare batch messages
        batch_images = []
        image_ids = []

        for data, file_path in zip(data_list, file_paths):
            # Generate unique ID
            image_id = str(uuid.uuid4())
            image_ids.append(image_id)

            # Detect data type using ABC helper
            data_type = self._detect_data_type(data)
            logger.info(f"🔍 FIJI BACKEND: Detected data type: {data_type} for path: {file_path}")

            # Parse component metadata using ABC helper (ONCE for all types)
            component_metadata = self._parse_component_metadata(
                file_path, microscope_handler, source
            )

            # Prepare data based on type
            if data_type == StreamingDataType.SHAPES:  # ROIs for Fiji
                logger.info(f"🔍 FIJI BACKEND: Preparing ROI data for {file_path}")
                item_data = self._prepare_rois_data(data, file_path)
                data_type_str = 'rois'  # Fiji uses 'rois' not 'shapes'
                logger.info(f"🔍 FIJI BACKEND: ROI data prepared: {len(item_data.get('rois', []))} ROIs")
            else:  # IMAGE
                logger.info(f"🔍 FIJI BACKEND: Preparing image data for {file_path}")
                item_data = self._create_shared_memory(data, file_path)
                data_type_str = 'image'

            # Build batch item
            batch_images.append({
                **item_data,
                'data_type': data_type_str,
                'metadata': component_metadata,
                'image_id': image_id,
            })
            logger.info(f"🔍 FIJI BACKEND: Added {data_type_str} item to batch")

        # Extract component modes for ALL components in component_order
        # (including virtual components); components without a
        # "<name>_mode" attribute on the display config are skipped.
        component_modes = {
            comp_name: getattr(display_config, f"{comp_name}_mode").value
            for comp_name in display_config.COMPONENT_ORDER
            if hasattr(display_config, f"{comp_name}_mode")
        }

        # Send batch message
        message = {
            'type': 'batch',
            'images': batch_images,
            'display_config': {
                'lut': display_config.get_lut_name(),
                'component_modes': component_modes,
                'component_order': display_config.COMPONENT_ORDER,
                # Auto contrast defaults to True when the config has no such attribute.
                'auto_contrast': getattr(display_config, 'auto_contrast', True),
            },
            'images_dir': images_dir,  # Source image subdirectory for ROI->image mapping
            'timestamp': time.time(),
        }

        # Log batch composition (single pass; keys appear in first-seen order,
        # so the logged dict is stable from run to run).
        type_counts = dict(Counter(item['data_type'] for item in batch_images))
        logger.info(f"📤 FIJI BACKEND: Sending batch message with {len(batch_images)} items to port {port}: {type_counts}")

        # Register sent images with queue tracker BEFORE sending
        # This prevents race condition with IPC mode where acks arrive before registration
        self._register_with_queue_tracker(port, image_ids)

        # Send with REQ socket (BLOCKING - worker waits for Fiji to acknowledge)
        # Worker blocks until Fiji receives, copies data from shared memory, and sends ack
        # This guarantees no messages are lost and shared memory is only closed after Fiji is done
        logger.info(f"📤 FIJI BACKEND: Sending batch of {len(batch_images)} images to Fiji on port {port} (REQ/REP - blocking until ack)")
        publisher.send_json(message)  # Blocking send

        # Wait for acknowledgment from Fiji (REP socket)
        # Fiji will only reply after it has copied all data from shared memory
        ack_response = publisher.recv_json()
        logger.info(f"✅ FIJI BACKEND: Received ack from Fiji: {ack_response.get('status', 'unknown')}")

        # Clean up publisher's handles after successful send
        # Receiver will unlink the shared memory after copying the data
        for item in batch_images:
            shm_name = item.get('shm_name')  # ROI items don't have shm_name
            if not shm_name:
                continue
            shm = self._shared_memory_blocks.pop(shm_name, None)
            if shm is None:
                continue
            try:
                shm.close()  # Close our handle, but don't unlink - receiver will do that
            except Exception as e:
                # Best effort: a handle that fails to close must not abort the
                # release of the remaining handles.
                logger.warning(f"Failed to close shared memory handle {shm_name}: {e}")

    # cleanup() now inherited from ABC

    def __del__(self):
        """Cleanup on deletion.

        Calls the inherited ``cleanup()``. A failure there is logged as a
        warning instead of escaping the finalizer, where Python would only
        report it as an ignored exception on stderr.
        """
        # NOTE(review): logging is re-imported and the logger re-resolved
        # locally, presumably so the finalizer does not rely on module globals
        # during interpreter shutdown - confirm before simplifying.
        import logging
        logger = logging.getLogger(__name__)
        logger.info("🔥 FIJI __del__ called, about to call cleanup()")
        try:
            self.cleanup()
        except Exception as e:
            logger.warning(f"FIJI __del__: cleanup() failed: {e}")
            return
        logger.info("🔥 FIJI __del__ cleanup() returned")