Coverage for openhcs/runtime/napari_stream_visualizer.py: 4.5%

164 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Napari-based real-time visualization module for OpenHCS. 

3 

4This module provides the NapariStreamVisualizer class for real-time 

5visualization of tensors during pipeline execution. 

6 

7Doctrinal Clauses: 

8- Clause 65 — No Fallback Logic 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11- Clause 368 — Visualization Must Be Observer-Only 

12""" 

13 

14import logging 

15import queue 

16import threading 

17from typing import Any, Dict, Optional 

18 

19from openhcs.io.filemanager import FileManager 

20from openhcs.utils.import_utils import optional_import 

21 

22# Optional napari import - this module should only be imported if napari is available 

23napari = optional_import("napari") 

24if napari is None: 24 ↛ 30line 24 didn't jump to line 30 because the condition on line 24 was always true

25 raise ImportError( 

26 "napari is required for NapariStreamVisualizer. " 

27 "Install it with: pip install 'openhcs[viz]' or pip install napari" 

28 ) 

29 

30import numpy as np 

31 

32logger = logging.getLogger(__name__) 

33 

34# Sentinel object to signal the viewer thread to shut down 

35SHUTDOWN_SENTINEL = object() 

36 

37class NapariStreamVisualizer: 

38 """ 

39 Manages a Napari viewer instance for real-time visualization of tensors 

40 streamed from the OpenHCS pipeline. Runs in a separate thread. 

41 """ 

42 

43 def __init__(self, filemanager: FileManager, viewer_title: str = "OpenHCS Real-Time Visualization"): 

44 self.filemanager = filemanager # Added 

45 self.viewer_title = viewer_title 

46 self.viewer: Optional[napari.Viewer] = None 

47 self.layers: Dict[str, napari.layers.Image] = {} # Consider if layer type should be more generic 

48 self.data_queue = queue.Queue() 

49 self.viewer_thread: Optional[threading.Thread] = None 

50 self.is_running = False 

51 self._lock = threading.Lock() 

52 

53 # Clause 368: Visualization must be observer-only. 

54 # This class will only read data and display it. 

55 

56 def _initialize_viewer_in_thread(self): 

57 """ 

58 Initializes and runs the Napari viewer event loop. 

59 This method is intended to be run in a separate thread. 

60 """ 

61 try: 

62 logger.info("Napari viewer thread started.") 

63 # napari.gui_qt() ensures the Qt event loop is running if not already. 

64 # It's crucial for running napari in a non-blocking way from a script. 

65 with napari.gui_qt(): 

66 self.viewer = napari.Viewer(title=self.viewer_title, show=True) 

67 self.is_running = True 

68 logger.info("Napari viewer initialized and shown.") 

69 

70 while self.is_running: 

71 try: 

72 # Wait for data with a timeout to allow checking self.is_running 

73 item = self.data_queue.get(timeout=0.1) 

74 if item is SHUTDOWN_SENTINEL: 

75 logger.info("Shutdown sentinel received. Exiting viewer loop.") 

76 break 

77 

78 # New logic for path-based items: 

79 if isinstance(item, dict) and item.get('type') == 'data_path': 

80 step_id = item['step_id'] 

81 path = item['path'] 

82 backend = item['backend'] 

83 well_id = item.get('well_id') # Can be None 

84 

85 logger.debug(f"Processing path '{path}' for step '{step_id}' from queue.") 

86 try: 

87 # Load data using FileManager 

88 loaded_data = self.filemanager.load(path, backend) 

89 if loaded_data is not None: 

90 # Prepare data for display (includes GPU->CPU, slicing) 

91 display_data = self._prepare_data_for_display(loaded_data, step_id) 

92 

93 if display_data is not None: 

94 layer_name = f"{well_id}_{step_id}" if well_id else step_id 

95 # Metadata might come from step_plan or be fixed for now 

96 metadata = {'colormap': 'gray'} 

97 self._update_layer_in_thread(layer_name, display_data, metadata) 

98 # else: (logging already in _prepare_data_for_display) 

99 else: 

100 logger.warning(f"FileManager returned None for path '{path}', backend '{backend}' (step '{step_id}').") 

101 except Exception as e_load: 

102 logger.error(f"Error loading or preparing data for step '{step_id}', path '{path}': {e_load}", exc_info=True) 

103 else: 

104 logger.warning(f"Unknown item type in data queue: {type(item)}. Item: {item}") 

105 

106 self.data_queue.task_done() 

107 except queue.Empty: 

108 continue # Timeout, check self.is_running again 

109 except Exception as e: 

110 logger.error(f"Error processing item in viewer thread: {e}", exc_info=True) 

111 # Clause 65: Fail loudly (within the visualizer context) but don't crash pipeline. 

112 logger.info("Napari viewer event loop exited.") 

113 except Exception as e: 

114 logger.error(f"Fatal error in Napari viewer thread: {e}", exc_info=True) 

115 finally: 

116 self.is_running = False # Ensure flag is cleared 

117 if self.viewer: 

118 self.viewer.close() 

119 logger.info("Napari viewer thread finished.") 

120 

121 

122 def _update_layer_in_thread(self, layer_name: str, data: np.ndarray, metadata: Optional[Dict] = None): 

123 """ 

124 Updates or creates a layer in the Napari viewer. Must be called from the viewer thread. 

125 """ 

126 if not self.viewer: 

127 logger.warning("Viewer not initialized, cannot update layer.") 

128 return 

129 

130 try: 

131 if layer_name in self.layers: 

132 self.layers[layer_name].data = data 

133 logger.debug(f"Updated layer: {layer_name} with shape {data.shape}") 

134 else: 

135 self.layers[layer_name] = self.viewer.add_image(data, name=layer_name) 

136 logger.info(f"Added new layer: {layer_name} with shape {data.shape}") 

137 

138 if metadata and 'colormap' in metadata and self.layers[layer_name]: 

139 self.layers[layer_name].colormap = metadata['colormap'] 

140 if metadata and 'contrast_limits' in metadata and self.layers[layer_name]: 

141 self.layers[layer_name].contrast_limits = metadata['contrast_limits'] 

142 

143 except Exception as e: 

144 logger.error(f"Error updating Napari layer '{layer_name}': {e}", exc_info=True) 

145 

146 def start_viewer(self): 

147 """Starts the Napari viewer in a separate thread.""" 

148 with self._lock: 

149 if self.is_running or self.viewer_thread is not None: 

150 logger.warning("Napari viewer is already running or starting.") 

151 return 

152 

153 self.viewer_thread = threading.Thread(target=self._initialize_viewer_in_thread, daemon=True) 

154 self.viewer_thread.start() 

155 logger.info("NapariStreamVisualizer viewer thread initiated.") 

156 

157 def _prepare_data_for_display(self, data: Any, step_id_for_log: str) -> Optional[np.ndarray]: 

158 """Converts loaded data to a displayable NumPy array (e.g., 2D slice).""" 

159 cpu_tensor: Optional[np.ndarray] = None 

160 try: 

161 # GPU to CPU conversion logic 

162 if hasattr(data, 'is_cuda') and data.is_cuda: # PyTorch 

163 cpu_tensor = data.cpu().numpy() 

164 elif hasattr(data, 'device') and 'cuda' in str(data.device).lower(): # Check for device attribute 

165 if hasattr(data, 'get'): # CuPy 

166 cpu_tensor = data.get() 

167 elif hasattr(data, 'numpy'): # JAX on GPU might have .numpy() after host transfer 

168 cpu_tensor = np.asarray(data) # JAX arrays might need explicit conversion 

169 else: # Fallback for other GPU array types if possible 

170 logger.warning(f"Unknown GPU array type for step '{step_id_for_log}'. Attempting .numpy().") 

171 if hasattr(data, 'numpy'): 

172 cpu_tensor = data.numpy() 

173 else: 

174 logger.error(f"Cannot convert GPU tensor of type {type(data)} for step '{step_id_for_log}'.") 

175 return None 

176 elif isinstance(data, np.ndarray): 

177 cpu_tensor = data 

178 else: 

179 # Attempt to convert to numpy array if it's some other array-like structure 

180 try: 

181 cpu_tensor = np.asarray(data) 

182 logger.debug(f"Converted data of type {type(data)} to numpy array for step '{step_id_for_log}'.") 

183 except Exception as e_conv: 

184 logger.warning(f"Unsupported data type for step '{step_id_for_log}': {type(data)}. Error: {e_conv}") 

185 return None 

186 

187 if cpu_tensor is None: # Should not happen if logic above is correct 

188 return None 

189 

190 # Slicing logic 

191 display_slice: Optional[np.ndarray] = None 

192 if cpu_tensor.ndim == 3: # ZYX 

193 display_slice = cpu_tensor[cpu_tensor.shape[0] // 2, :, :] 

194 elif cpu_tensor.ndim == 2: # YX 

195 display_slice = cpu_tensor 

196 elif cpu_tensor.ndim > 3: # e.g. CZYX or TZYX 

197 logger.warning(f"Tensor for step '{step_id_for_log}' has ndim > 3 ({cpu_tensor.ndim}). Taking a default slice.") 

198 slicer = [0] * (cpu_tensor.ndim - 2) # Slice first channels/times 

199 slicer[-1] = cpu_tensor.shape[-3] // 2 # Middle Z 

200 try: 

201 display_slice = cpu_tensor[tuple(slicer)] 

202 except IndexError: # Handle cases where slicing might fail (e.g. very small dimensions) 

203 logger.error(f"Slicing failed for tensor with shape {cpu_tensor.shape} for step '{step_id_for_log}'.", exc_info=True) 

204 display_slice = None 

205 else: 

206 logger.warning(f"Tensor for step '{step_id_for_log}' has unsupported ndim for display: {cpu_tensor.ndim}.") 

207 return None 

208 

209 return display_slice.copy() if display_slice is not None else None 

210 

211 except Exception as e: 

212 logger.error(f"Error preparing data from step '{step_id_for_log}' for display: {e}", exc_info=True) 

213 return None 

214 

215 def visualize_path(self, step_id: str, path: str, backend: str, well_id: Optional[str] = None): 

216 """ 

217 Receives a VFS path, backend, and associated info, and queues it for display. 

218 """ 

219 if not self.is_running and self.viewer_thread is None: 

220 logger.info(f"Visualizer not running for step '{step_id}'. Starting Napari viewer.") 

221 self.start_viewer() 

222 

223 if not self.viewer_thread: # Check if thread actually started 

224 logger.warning(f"Visualizer thread not available. Cannot visualize path for step '{step_id}'.") 

225 return 

226 

227 try: 

228 item_to_queue = { 

229 'type': 'data_path', # To distinguish from other potential queue items 

230 'step_id': step_id, 

231 'path': path, 

232 'backend': backend, 

233 'well_id': well_id 

234 } 

235 self.data_queue.put(item_to_queue) 

236 logger.debug(f"Queued path '{path}' for step '{step_id}' (well: {well_id}).") 

237 except Exception as e: 

238 logger.error(f"Error queueing path for visualization: {e}", exc_info=True) 

239 

240 def stop_viewer(self): 

241 """Signals the viewer thread to shut down and waits for it to join.""" 

242 logger.info("Attempting to stop Napari viewer...") 

243 with self._lock: 

244 if not self.is_running and self.viewer_thread is None: 

245 logger.info("Napari viewer was not running.") 

246 return 

247 if self.is_running: 

248 self.is_running = False 

249 self.data_queue.put(SHUTDOWN_SENTINEL) 

250 

251 if self.viewer_thread and self.viewer_thread.is_alive(): 

252 logger.info("Waiting for Napari viewer thread to join...") 

253 self.viewer_thread.join(timeout=5.0) 

254 if self.viewer_thread.is_alive(): 

255 logger.warning("Napari viewer thread did not join in time.") 

256 else: 

257 logger.info("Napari viewer thread joined successfully.") 

258 self.viewer_thread = None 

259 logger.info("NapariStreamVisualizer stopped.")