Coverage for openhcs/runtime/napari_stream_visualizer.py: 4.5%
164 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Napari-based real-time visualization module for OpenHCS.
4This module provides the NapariStreamVisualizer class for real-time
5visualization of tensors during pipeline execution.
7Doctrinal Clauses:
8- Clause 65 — No Fallback Logic
9- Clause 66 — Immutability After Construction
10- Clause 88 — No Inferred Capabilities
11- Clause 368 — Visualization Must Be Observer-Only
12"""
14import logging
15import queue
16import threading
17from typing import Any, Dict, Optional
19from openhcs.io.filemanager import FileManager
20from openhcs.utils.import_utils import optional_import
# napari is an optional dependency of OpenHCS. This module is unusable without
# it, so importing the module fails immediately when napari cannot be resolved.
napari = optional_import("napari")
if napari is None:
    raise ImportError(
        "napari is required for NapariStreamVisualizer. Install it with: pip install 'openhcs[viz]' or pip install napari"
    )

import numpy as np

logger = logging.getLogger(__name__)

# Unique marker object placed on the data queue to tell the viewer thread to exit.
SHUTDOWN_SENTINEL = object()
class NapariStreamVisualizer:
    """
    Manages a Napari viewer instance for real-time visualization of tensors
    streamed from the OpenHCS pipeline. Runs in a separate thread.

    Producers call :meth:`visualize_path`, which places a small descriptor
    (step id, path, backend, optional well id) on ``data_queue``. The viewer
    thread pops descriptors, loads the data through the ``FileManager``,
    reduces it to a 2D NumPy slice and shows it as an image layer.

    Clause 368: visualization is observer-only. This class only reads data
    and displays it.
    """

    def __init__(self, filemanager: FileManager, viewer_title: str = "OpenHCS Real-Time Visualization"):
        """
        Store configuration and create the (idle) queue/thread state.

        Args:
            filemanager: Object whose ``load(path, backend)`` is used by the
                viewer thread to read queued data.
            viewer_title: Window title passed to ``napari.Viewer``.
        """
        self.filemanager = filemanager
        self.viewer_title = viewer_title
        # Created inside the viewer thread; None while no viewer exists.
        self.viewer: Optional[napari.Viewer] = None
        # Layer name -> layer object of the *current* viewer.
        self.layers: Dict[str, napari.layers.Image] = {}  # Consider if layer type should be more generic
        self.data_queue = queue.Queue()
        self.viewer_thread: Optional[threading.Thread] = None
        self.is_running = False
        # Serializes start_viewer() / stop_viewer().
        self._lock = threading.Lock()

    def _initialize_viewer_in_thread(self):
        """
        Create the Napari viewer and consume ``data_queue`` until shutdown.

        Intended to be the target of the viewer thread. The loop ends when
        ``SHUTDOWN_SENTINEL`` is dequeued or ``is_running`` is cleared. All
        exceptions are logged rather than propagated so a visualization
        failure never crashes the pipeline.
        """
        try:
            logger.info("Napari viewer thread started.")
            # NOTE(review): napari.gui_qt() appears to be deprecated in recent
            # napari releases, and Qt GUIs normally expect to live on the main
            # thread - confirm this works with the pinned napari version.
            with napari.gui_qt():
                self.viewer = napari.Viewer(title=self.viewer_title, show=True)
                self.is_running = True
                logger.info("Napari viewer initialized and shown.")

                while self.is_running:
                    try:
                        # Short timeout so the is_running flag is re-checked regularly.
                        item = self.data_queue.get(timeout=0.1)
                    except queue.Empty:
                        continue

                    try:
                        if item is SHUTDOWN_SENTINEL:
                            logger.info("Shutdown sentinel received. Exiting viewer loop.")
                            break
                        self._process_queue_item(item)
                    except Exception as e:
                        # Clause 65: Fail loudly (within the visualizer context) but don't crash pipeline.
                        logger.error(f"Error processing item in viewer thread: {e}", exc_info=True)
                    finally:
                        # Exactly one task_done() per successful get(), including
                        # the sentinel and failed items, so data_queue.join()
                        # cannot hang.
                        self.data_queue.task_done()
            logger.info("Napari viewer event loop exited.")
        except Exception as e:
            logger.error(f"Fatal error in Napari viewer thread: {e}", exc_info=True)
        finally:
            self.is_running = False  # Ensure flag is cleared
            # Forget the viewer and its layers so a later restart does not push
            # data into layers that belong to a closed viewer.
            viewer = self.viewer
            self.viewer = None
            self.layers.clear()
            if viewer is not None:
                try:
                    viewer.close()
                except Exception as e_close:
                    logger.error(f"Error closing Napari viewer: {e_close}", exc_info=True)
            logger.info("Napari viewer thread finished.")

    def _process_queue_item(self, item: Any) -> None:
        """Load, prepare and display one queued 'data_path' descriptor (viewer thread only)."""
        if not (isinstance(item, dict) and item.get('type') == 'data_path'):
            logger.warning(f"Unknown item type in data queue: {type(item)}. Item: {item}")
            return

        step_id = item['step_id']
        path = item['path']
        backend = item['backend']
        well_id = item.get('well_id')  # Can be None

        logger.debug(f"Processing path '{path}' for step '{step_id}' from queue.")
        try:
            loaded_data = self.filemanager.load(path, backend)
            if loaded_data is None:
                logger.warning(f"FileManager returned None for path '{path}', backend '{backend}' (step '{step_id}').")
                return

            # Includes GPU->CPU transfer and slicing; logs and returns None on failure.
            display_data = self._prepare_data_for_display(loaded_data, step_id)
            if display_data is None:
                return

            layer_name = f"{well_id}_{step_id}" if well_id else step_id
            # Metadata might come from step_plan or be fixed for now
            metadata = {'colormap': 'gray'}
            self._update_layer_in_thread(layer_name, display_data, metadata)
        except Exception as e_load:
            logger.error(f"Error loading or preparing data for step '{step_id}', path '{path}': {e_load}", exc_info=True)

    def _update_layer_in_thread(self, layer_name: str, data: np.ndarray, metadata: Optional[Dict] = None):
        """
        Updates or creates a layer in the Napari viewer. Must be called from the viewer thread.

        Args:
            layer_name: Key in ``self.layers`` and the name given to a new layer.
            data: Image data assigned to the layer.
            metadata: Optional dict; ``'colormap'`` and ``'contrast_limits'``
                entries, when present, are applied to the layer.
        """
        if not self.viewer:
            logger.warning("Viewer not initialized, cannot update layer.")
            return

        try:
            layer = self.layers.get(layer_name)
            if layer is not None:
                layer.data = data
                logger.debug(f"Updated layer: {layer_name} with shape {data.shape}")
            else:
                layer = self.viewer.add_image(data, name=layer_name)
                self.layers[layer_name] = layer
                logger.info(f"Added new layer: {layer_name} with shape {data.shape}")

            if metadata and layer is not None:
                if 'colormap' in metadata:
                    layer.colormap = metadata['colormap']
                if 'contrast_limits' in metadata:
                    layer.contrast_limits = metadata['contrast_limits']
        except Exception as e:
            logger.error(f"Error updating Napari layer '{layer_name}': {e}", exc_info=True)

    def start_viewer(self):
        """
        Starts the Napari viewer in a separate daemon thread.

        Does nothing (beyond a warning) while a viewer thread is alive. A
        thread that has already terminated, e.g. after a fatal error, no
        longer blocks a new start.
        """
        with self._lock:
            thread = self.viewer_thread
            if self.is_running or (thread is not None and thread.is_alive()):
                logger.warning("Napari viewer is already running or starting.")
                return

            self.viewer_thread = threading.Thread(target=self._initialize_viewer_in_thread, daemon=True)
            self.viewer_thread.start()
            logger.info("NapariStreamVisualizer viewer thread initiated.")

    def _prepare_data_for_display(self, data: Any, step_id_for_log: str) -> Optional[np.ndarray]:
        """
        Converts loaded data to a displayable 2D NumPy array.

        Args:
            data: Loaded object. Handled cases: objects with a true ``is_cuda``
                attribute (``detach().cpu().numpy()``), objects whose
                ``device`` string contains "cuda" (``.get()`` or
                ``np.asarray``), NumPy arrays, and anything ``np.asarray``
                accepts.
            step_id_for_log: Step identifier used only in log messages.

        Returns:
            A copy of the selected 2D slice: the array itself for 2D input, or
            the middle index of the third-from-last axis with index 0 on all
            earlier axes for 3D and higher. ``None`` when the data cannot be
            converted, has fewer than 2 dimensions, or slicing fails.
        """
        try:
            # --- GPU -> CPU / array conversion ---
            if hasattr(data, 'is_cuda') and data.is_cuda:  # PyTorch
                # detach() first: .numpy() refuses tensors that require grad.
                cpu_tensor = data.detach().cpu().numpy()
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():
                if hasattr(data, 'get'):  # CuPy
                    cpu_tensor = data.get()
                elif hasattr(data, 'numpy'):  # e.g. JAX arrays
                    cpu_tensor = np.asarray(data)
                else:
                    logger.error(f"Cannot convert GPU tensor of type {type(data)} for step '{step_id_for_log}'.")
                    return None
            elif isinstance(data, np.ndarray):
                cpu_tensor = data
            else:
                # Attempt to convert to numpy array if it's some other array-like structure
                try:
                    cpu_tensor = np.asarray(data)
                    logger.debug(f"Converted data of type {type(data)} to numpy array for step '{step_id_for_log}'.")
                except Exception as e_conv:
                    logger.warning(f"Unsupported data type for step '{step_id_for_log}': {type(data)}. Error: {e_conv}")
                    return None

            if cpu_tensor is None:  # Defensive: a converter returned None
                return None

            # --- Slicing down to 2D ---
            ndim = cpu_tensor.ndim
            if ndim == 2:  # YX
                return cpu_tensor.copy()
            if ndim < 2:
                logger.warning(f"Tensor for step '{step_id_for_log}' has unsupported ndim for display: {cpu_tensor.ndim}.")
                return None
            if ndim > 3:  # e.g. CZYX or TZYX
                logger.warning(f"Tensor for step '{step_id_for_log}' has ndim > 3 ({cpu_tensor.ndim}). Taking a default slice.")

            # First element of every leading axis, middle of the Z axis.
            index = (0,) * (ndim - 3) + (cpu_tensor.shape[-3] // 2,)
            try:
                return cpu_tensor[index].copy()
            except IndexError:  # e.g. an axis of length 0
                logger.error(f"Slicing failed for tensor with shape {cpu_tensor.shape} for step '{step_id_for_log}'.", exc_info=True)
                return None

        except Exception as e:
            logger.error(f"Error preparing data from step '{step_id_for_log}' for display: {e}", exc_info=True)
            return None

    def visualize_path(self, step_id: str, path: str, backend: str, well_id: Optional[str] = None):
        """
        Receives a VFS path, backend, and associated info, and queues it for display.

        Starts the viewer on first use. If no live viewer thread is available
        the request is dropped with a warning instead of being queued.

        Args:
            step_id: Identifier of the producing step; part of the layer name.
            path: Path handed to ``FileManager.load``.
            backend: Backend handed to ``FileManager.load``.
            well_id: Optional well identifier prefixed to the layer name.
        """
        if not self.is_running and self.viewer_thread is None:
            logger.info(f"Visualizer not running for step '{step_id}'. Starting Napari viewer.")
            self.start_viewer()

        # Snapshot the attribute: stop_viewer() may clear it concurrently.
        thread = self.viewer_thread
        if thread is None or not thread.is_alive():
            logger.warning(f"Visualizer thread not available. Cannot visualize path for step '{step_id}'.")
            return

        try:
            item_to_queue = {
                'type': 'data_path',  # To distinguish from other potential queue items
                'step_id': step_id,
                'path': path,
                'backend': backend,
                'well_id': well_id,
            }
            self.data_queue.put(item_to_queue)
            logger.debug(f"Queued path '{path}' for step '{step_id}' (well: {well_id}).")
        except Exception as e:
            logger.error(f"Error queueing path for visualization: {e}", exc_info=True)

    def _drain_queue(self) -> None:
        """Discard everything left on ``data_queue`` (stale items or an unconsumed sentinel)."""
        while True:
            try:
                self.data_queue.get_nowait()
            except queue.Empty:
                return
            self.data_queue.task_done()

    def stop_viewer(self):
        """
        Signals the viewer thread to shut down and waits for it to join.

        The sentinel is queued whenever the thread is alive, including while
        it is still initialising and ``is_running`` is not yet set. Once the
        thread has ended, leftovers on the queue are discarded so they cannot
        shut down or leak into a later viewer session.
        """
        logger.info("Attempting to stop Napari viewer...")
        with self._lock:
            thread = self.viewer_thread
            if not self.is_running and thread is None:
                logger.info("Napari viewer was not running.")
                return

            self.is_running = False
            if thread is not None and thread.is_alive():
                self.data_queue.put(SHUTDOWN_SENTINEL)
                logger.info("Waiting for Napari viewer thread to join...")
                thread.join(timeout=5.0)
                if thread.is_alive():
                    logger.warning("Napari viewer thread did not join in time.")
                else:
                    logger.info("Napari viewer thread joined successfully.")

            # Only safe to drain once nothing is consuming the queue any more.
            if thread is None or not thread.is_alive():
                self._drain_queue()
            self.viewer_thread = None
        logger.info("NapariStreamVisualizer stopped.")