Coverage for openhcs/runtime/napari_stream_visualizer.py: 1.3%
531 statements
1"""
2Napari-based real-time visualization module for OpenHCS.
4This module provides the NapariStreamVisualizer class for real-time
5visualization of tensors during pipeline execution.
7Doctrinal Clauses:
8- Clause 65 — No Fallback Logic
9- Clause 66 — Immutability After Construction
10- Clause 88 — No Inferred Capabilities
11- Clause 368 — Visualization Must Be Observer-Only
12"""

import logging
import multiprocessing
import os
import queue
import subprocess
import sys
import threading
import time
from typing import Any, Dict, Optional

import numpy as np
import zmq

from openhcs.constants.constants import DEFAULT_NAPARI_STREAM_PORT
from openhcs.io.filemanager import FileManager
from openhcs.utils.import_utils import optional_import

# Optional napari import - this module should only be imported if napari is available
napari = optional_import("napari")
if napari is None:
    raise ImportError(
        "napari is required for NapariStreamVisualizer. "
        "Install it with: pip install 'openhcs[viz]' or pip install napari"
    )

logger = logging.getLogger(__name__)


# Global process management for napari viewer
_global_viewer_process: Optional[multiprocessing.Process] = None
_global_viewer_port: Optional[int] = None
_global_process_lock = threading.Lock()


def _cleanup_global_viewer() -> None:
    """
    Clean up the global napari viewer process for test mode.

    This forcibly terminates the napari viewer process to allow pytest to exit.
    Should only be called in test mode.
    """
    global _global_viewer_process

    with _global_process_lock:
        if _global_viewer_process and _global_viewer_process.is_alive():
            logger.info("🔬 VISUALIZER: Terminating napari viewer for test cleanup")
            _global_viewer_process.terminate()
            _global_viewer_process.join(timeout=3)

            if _global_viewer_process.is_alive():
                logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                _global_viewer_process.kill()
                _global_viewer_process.join(timeout=1)

        _global_viewer_process = None
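

# Hedged usage sketch (illustrative, not part of the module): a pytest
# session fixture could call the cleanup helper so the viewer process never
# blocks interpreter exit. The fixture name is hypothetical.
#
#   import pytest
#
#   @pytest.fixture(scope="session", autouse=True)
#   def _napari_teardown():
#       yield
#       _cleanup_global_viewer()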


def _parse_component_info_from_path(path_str: str):
    """
    Fallback component parsing from path (used when component metadata unavailable).

    Args:
        path_str: Path string like 'step_name/A01/s1_c2_z3.tif'

    Returns:
        Dict with basic component info extracted from filename
    """
    try:
        import os
        import re
        filename = os.path.basename(path_str)

        # Basic regex for common patterns
        pattern = r'(?:s(\d+))?(?:_c(\d+))?(?:_z(\d+))?'
        match = re.search(pattern, filename)

        components = {}
        if match:
            site, channel, z_index = match.groups()
            if site:
                components['site'] = site
            if channel:
                components['channel'] = channel
            if z_index:
                components['z_index'] = z_index

        return components
    except Exception:
        return {}
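

# Worked example for the fallback parser above: a filename that follows the
# s#_c#_z# pattern yields string-valued components; anything the regex cannot
# match is simply omitted from the dict.
#
#   _parse_component_info_from_path("step_name/A01/s1_c2_z3.tif")
#   # -> {'site': '1', 'channel': '2', 'z_index': '3'}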


def _handle_component_aware_display(viewer, layers, component_groups, image_data, path,
                                    colormap, display_config, replace_layers, component_metadata=None):
    """
    Handle component-aware display following OpenHCS stacking patterns.

    Creates separate layers for each step and well, with proper component-based stacking.
    Each step gets its own layer. Each well gets its own layer. Components marked as
    SLICE create separate layers, components marked as STACK are stacked together.
    """
    try:
        # Use component metadata from ZMQ message - fail loud if not available
        if not component_metadata:
            raise ValueError(f"No component metadata available for path: {path}")
        component_info = component_metadata

        # Get step information from component metadata
        step_index = component_info.get('step_index', 0)
        step_name = component_info.get('step_name', 'unknown_step')

        # Create step prefix for layer naming
        step_prefix = f"step_{step_index:02d}_{step_name}"

        # Get well identifier (configurable: slice vs stack)
        well_id = component_info.get('well', 'unknown_well')

        # Build component_modes from config (dict or object); default to channel=slice, others=stack
        component_modes = None
        if isinstance(display_config, dict):
            cm = display_config.get('component_modes') or display_config.get('componentModes')
            if isinstance(cm, dict) and cm:
                component_modes = cm
        if component_modes is None:
            # Handle object-like config (NapariDisplayConfig)
            component_modes = {}
            for component in ['site', 'channel', 'z_index', 'well']:
                mode_field = f"{component}_mode"
                if hasattr(display_config, mode_field):
                    mode_value = getattr(display_config, mode_field)
                    component_modes[component] = getattr(mode_value, 'value', str(mode_value))
                else:
                    component_modes[component] = 'slice' if component == 'channel' else 'stack'

        # Create layer grouping key: step_prefix + optionally well + slice components.
        # Each unique combination gets its own layer.
        layer_key_parts = [step_prefix]
        if component_modes.get('well', 'stack') == 'slice':
            layer_key_parts.append(well_id)

        # Add slice components to the layer key (these create separate layers)
        for component_name, mode in component_modes.items():
            if mode == 'slice' and component_name in component_info and component_name != 'well':
                layer_key_parts.append(f"{component_name}_{component_info[component_name]}")

        layer_key = "_".join(layer_key_parts)
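
        # Illustrative example (all names hypothetical): with step_index=1,
        # step_name="gaussian_blur", well mode 'slice' for well "A01", and
        # channel mode 'slice' with channel "2", the key becomes
        # "step_01_gaussian_blur_A01_channel_2". Stacked components never
        # appear in the key; they vary within the layer instead.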

        # Reconcile cached layer/group state with the live napari viewer after possible manual deletions
        try:
            current_layer_names = {layer.name for layer in viewer.layers}
            if layer_key not in current_layer_names:
                # Drop any stale references so we will recreate the layer
                layers.pop(layer_key, None)
                component_groups.pop(layer_key, None)
                logger.debug(f"🔬 NAPARI PROCESS: Reconciling state — '{layer_key}' not in viewer; purged stale caches")
        except Exception:
            # Fail-loud elsewhere; reconciliation is best-effort and must not mask display
            pass

        # Initialize layer group if needed
        if layer_key not in component_groups:
            component_groups[layer_key] = []

        # Add image to layer group
        component_groups[layer_key].append({
            'data': image_data,
            'components': component_info,
            'path': str(path)
        })

        # Get all images for this layer
        layer_images = component_groups[layer_key]

        # Determine if we should stack or use a single image
        stack_components = [comp for comp, mode in component_modes.items()
                            if mode == 'stack' and comp in component_info]

        if len(layer_images) == 1:
            # Single image - add directly
            layer_name = layer_key

            # Check if the layer exists in the actual napari viewer
            existing_layer = None
            for layer in viewer.layers:
                if layer.name == layer_name:
                    existing_layer = layer
                    break

            if existing_layer is not None:
                # Update existing layer
                existing_layer.data = image_data
                layers[layer_name] = existing_layer
                logger.info(f"🔬 NAPARI PROCESS: Updated existing layer {layer_name}")
            else:
                # Create new layer - this should always work for new names
                logger.info(f"🔬 NAPARI PROCESS: Creating new layer {layer_name}, viewer has {len(viewer.layers)} existing layers")
                logger.info(f"🔬 NAPARI PROCESS: Existing layer names: {[layer.name for layer in viewer.layers]}")
                logger.info(f"🔬 NAPARI PROCESS: Image data shape: {image_data.shape}, dtype: {image_data.dtype}")
                logger.info(f"🔬 NAPARI PROCESS: Viewer type: {type(viewer)}")
                try:
                    new_layer = viewer.add_image(image_data, name=layer_name, colormap=colormap)
                    layers[layer_name] = new_layer
                    logger.info(f"🔬 NAPARI PROCESS: Successfully created new layer {layer_name}")
                    logger.info(f"🔬 NAPARI PROCESS: Viewer now has {len(viewer.layers)} layers")
                except Exception as e:
                    logger.error(f"🔬 NAPARI PROCESS: FAILED to create layer {layer_name}: {e}")
                    logger.error(f"🔬 NAPARI PROCESS: Exception type: {type(e)}")
                    import traceback
                    logger.error(f"🔬 NAPARI PROCESS: Traceback: {traceback.format_exc()}")
                    raise
        else:
            # Multiple images - create a multi-dimensional array for napari
            try:
                # Sort images by stack components for consistent ordering
                if stack_components:
                    def sort_key(img_info):
                        return tuple(img_info['components'].get(comp, 0) for comp in stack_components)
                    layer_images.sort(key=sort_key)

                # Group images by stack component values to create proper dimensions
                if len(stack_components) == 1:
                    # Single stack component - simple 3D stack
                    image_stack = [img['data'] for img in layer_images]
                    from openhcs.core.memory.stack_utils import stack_slices
                    stacked_data = stack_slices(image_stack, memory_type='numpy', gpu_id=0)
                else:
                    # Multiple stack components - create a multi-dimensional array.
                    # Get unique values for each stack component.
                    component_values = {}
                    for comp in stack_components:
                        values = sorted(set(img['components'].get(comp, 0) for img in layer_images))
                        component_values[comp] = values

                    # Create multi-dimensional array
                    # Shape: (comp1_size, comp2_size, ..., height, width)
                    shape_dims = [len(component_values[comp]) for comp in stack_components]
                    first_img = layer_images[0]['data']
                    full_shape = tuple(shape_dims + list(first_img.shape))
                    stacked_data = np.zeros(full_shape, dtype=first_img.dtype)

                    # Fill the multi-dimensional array
                    for img_info in layer_images:
                        # Get indices for this image
                        indices = []
                        for comp in stack_components:
                            comp_value = img_info['components'].get(comp, 0)
                            comp_index = component_values[comp].index(comp_value)
                            indices.append(comp_index)

                        # Place image in the correct position
                        stacked_data[tuple(indices)] = img_info['data']

                # Update or create the stack layer
                layer_name = layer_key

                # Check if the layer exists in the actual napari viewer
                existing_layer = None
                for layer in viewer.layers:
                    if layer.name == layer_name:
                        existing_layer = layer
                        break

                if existing_layer is not None:
                    # Update existing layer
                    existing_layer.data = stacked_data
                    layers[layer_name] = existing_layer
                    logger.info(f"🔬 NAPARI PROCESS: Updated existing stack layer {layer_name} (shape: {stacked_data.shape})")
                else:
                    # Create new layer - this should always work for new names
                    logger.info(f"🔬 NAPARI PROCESS: Creating new stack layer {layer_name}, viewer has {len(viewer.layers)} existing layers")
                    try:
                        new_layer = viewer.add_image(stacked_data, name=layer_name, colormap=colormap)
                        layers[layer_name] = new_layer
                        logger.info(f"🔬 NAPARI PROCESS: Successfully created new stack layer {layer_name} (shape: {stacked_data.shape})")
                    except Exception as e:
                        logger.error(f"🔬 NAPARI PROCESS: FAILED to create stack layer {layer_name}: {e}")
                        import traceback
                        logger.error(f"🔬 NAPARI PROCESS: Traceback: {traceback.format_exc()}")
                        raise

            except Exception as e:
                logger.error(f"🔬 NAPARI PROCESS: Failed to create stack for layer {layer_key}: {e}")
                import traceback
                logger.error(f"🔬 NAPARI PROCESS: Traceback: {traceback.format_exc()}")
                raise

    except Exception as e:
        import traceback
        logger.error(f"🔬 NAPARI PROCESS: Component-aware display failed for {path}: {e}")
        logger.error(f"🔬 NAPARI PROCESS: Component-aware display traceback: {traceback.format_exc()}")
        raise  # Fail loud - no fallback
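

# Hedged sketch (standalone, not called by this module): the multi-component
# stacking branch above amounts to filling a dense array indexed by each stack
# component's sorted unique values. For two hypothetical stack components
# ('site', 'z_index') and 2D frames, the result is (n_sites, n_z, H, W):
#
#   import numpy as np
#
#   frames = [
#       {'components': {'site': 1, 'z_index': 1}, 'data': np.zeros((4, 4))},
#       {'components': {'site': 1, 'z_index': 2}, 'data': np.ones((4, 4))},
#       {'components': {'site': 2, 'z_index': 1}, 'data': 2 * np.ones((4, 4))},
#   ]
#   comps = ['site', 'z_index']
#   values = {c: sorted({f['components'][c] for f in frames}) for c in comps}
#   shape = tuple(len(values[c]) for c in comps) + frames[0]['data'].shape
#   stacked = np.zeros(shape, dtype=frames[0]['data'].dtype)
#   for f in frames:
#       idx = tuple(values[c].index(f['components'][c]) for c in comps)
#       stacked[idx] = f['data']
#   # stacked.shape == (2, 2, 4, 4); the absent (site=2, z=2) slot stays zero.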


def _napari_viewer_process(port: int, viewer_title: str, replace_layers: bool = False):
    """
    Napari viewer process entry point. Runs in a separate process.
    Listens for ZeroMQ messages with image data to display.

    Args:
        port: ZMQ port to listen on
        viewer_title: Title for the napari viewer window
        replace_layers: If True, replace existing layers; if False, add new layers with unique names
    """
    try:
        import zmq
        import napari
        import numpy as np
        import pickle

        # Set up ZeroMQ communication
        context = zmq.Context()

        # Data channel: SUB socket for receiving images
        data_socket = context.socket(zmq.SUB)
        data_socket.bind(f"tcp://*:{port}")
        data_socket.setsockopt(zmq.SUBSCRIBE, b"")  # Subscribe to all messages

        # Control channel: REP socket for handshake
        control_port = port + 1000  # Use port+1000 for control
        control_socket = context.socket(zmq.REP)
        control_socket.bind(f"tcp://*:{control_port}")

        # Create napari viewer in this process (main thread)
        viewer = napari.Viewer(title=viewer_title, show=True)

        # Initialize layers dictionary with existing layers (for reconnection scenarios)
        layers = {}
        for layer in viewer.layers:
            layers[layer.name] = layer

        napari_ready = False  # Track readiness state

        # Component grouping for stacking (following OpenHCS pattern)
        component_groups = {}  # {layer_key: [images]}
        display_config = None  # Store display config for component mode decisions

        logger.info(f"🔬 NAPARI PROCESS: Viewer started on data port {port}, control port {control_port}")

        # Add cleanup handler for when the viewer is closed
        def cleanup_and_exit():
            logger.info("🔬 NAPARI PROCESS: Viewer closed, cleaning up and exiting...")
            try:
                data_socket.close()
                control_socket.close()
                context.term()
            except Exception:
                pass
            sys.exit(0)

        # Connect the viewer close event to cleanup
        viewer.window.qt_viewer.destroyed.connect(cleanup_and_exit)

        # Use proper Qt event loop integration
        from qtpy import QtWidgets, QtCore

        # Ensure the Qt platform is properly set for detached processes
        if 'QT_QPA_PLATFORM' not in os.environ:
            os.environ['QT_QPA_PLATFORM'] = 'xcb'

        # Disable shared memory for X11 (helps with display issues in detached processes)
        os.environ['QT_X11_NO_MITSHM'] = '1'

        # Get the Qt application
        app = QtWidgets.QApplication.instance()
        if app is None:
            app = QtWidgets.QApplication(sys.argv)

        # Ensure the application DOES quit when the napari window closes
        app.setQuitOnLastWindowClosed(True)

        # Set up a QTimer for message processing
        timer = QtCore.QTimer()

        def process_messages():
            nonlocal napari_ready

            # Handle control messages (handshake) first
            try:
                control_message = control_socket.recv(zmq.NOBLOCK)
                control_data = pickle.loads(control_message)

                if control_data.get('type') == 'ping':
                    # Client is checking if we're ready
                    if not napari_ready:
                        # Mark as ready after first ping (GUI should be loaded by now)
                        napari_ready = True
                        logger.info("🔬 NAPARI PROCESS: Marked as ready after ping")

                    response = {'type': 'pong', 'ready': napari_ready}
                    control_socket.send(pickle.dumps(response))
                    logger.debug(f"🔬 NAPARI PROCESS: Responded to ping with ready={napari_ready}")

            except zmq.Again:
                pass  # No control messages

            # Debug: print current layer count (only when layers change)
            # Removed continuous debug printing to avoid terminal spam

            # Process data messages only if ready
            if napari_ready:
                # Process multiple messages per timer tick for better throughput
                for _ in range(10):  # Process up to 10 messages per tick
                    try:
                        message = data_socket.recv(zmq.NOBLOCK)

                        # Try to parse as JSON first (from NapariStreamingBackend)
                        import json
                        data = json.loads(message.decode('utf-8'))

                        # Check if this is a batch message
                        if data.get('type') == 'batch':
                            # Handle a batch of images
                            images = data.get('images', [])
                            display_config_dict = data.get('display_config')

                            # Use display_config_dict directly - no need to store globally

                            # Process all images in the batch together
                            for image_info in images:
                                path = image_info.get('path', 'unknown')
                                shape = image_info.get('shape')
                                dtype = image_info.get('dtype')
                                shm_name = image_info.get('shm_name')
                                component_metadata = image_info.get('component_metadata', {})

                                # Add step information to component metadata
                                component_metadata['step_index'] = image_info.get('step_index', 0)
                                component_metadata['step_name'] = image_info.get('step_name', 'unknown_step')

                                # Load from shared memory
                                from multiprocessing import shared_memory
                                shm = shared_memory.SharedMemory(name=shm_name)
                                image_data = np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
                                shm.close()  # Close our reference

                                # Extract colormap from display config
                                colormap = 'viridis'  # Default
                                if display_config_dict and 'colormap' in display_config_dict:
                                    colormap = display_config_dict['colormap']

                                # Component-aware layer management (following OpenHCS pattern)
                                _handle_component_aware_display(
                                    viewer, layers, component_groups, image_data, path,
                                    colormap, display_config_dict or {}, replace_layers, component_metadata
                                )
                        else:
                            # Handle single-image format
                            path = data.get('path', 'unknown')
                            shape = data.get('shape')
                            dtype = data.get('dtype')
                            shm_name = data.get('shm_name')
                            direct_data = data.get('data')
                            display_config_dict = data.get('display_config')
                            component_metadata = data.get('component_metadata', {})

                            # Add step information to component metadata
                            component_metadata['step_index'] = data.get('step_index', 0)
                            component_metadata['step_name'] = data.get('step_name', 'unknown_step')

                            # Use display_config_dict directly - no need to store globally

                            if shm_name:
                                # Load from shared memory
                                from multiprocessing import shared_memory
                                shm = shared_memory.SharedMemory(name=shm_name)
                                image_data = np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
                                shm.close()  # Close our reference
                            elif direct_data:
                                # Load from direct data (fallback)
                                image_data = np.array(direct_data, dtype=dtype).reshape(shape)
                            else:
                                logger.warning("🔬 NAPARI PROCESS: No image data in message")
                                continue

                            # Extract colormap from display config
                            colormap = 'viridis'  # Default
                            if display_config_dict and 'colormap' in display_config_dict:
                                colormap = display_config_dict['colormap']

                            # Component-aware layer management (following OpenHCS pattern)
                            _handle_component_aware_display(
                                viewer, layers, component_groups, image_data, path,
                                colormap, display_config_dict or {}, replace_layers, component_metadata
                            )

                    except zmq.Again:
                        # No more messages available
                        break

        # Connect the timer to message processing
        timer.timeout.connect(process_messages)
        timer.start(50)  # Process messages every 50ms

        logger.info("🔬 NAPARI PROCESS: Starting Qt event loop")

        # Run the Qt event loop - this keeps napari responsive
        app.exec_()

    except Exception as e:
        logger.error(f"🔬 NAPARI PROCESS: Fatal error: {e}")
    finally:
        logger.info("🔬 NAPARI PROCESS: Shutting down")
        if 'data_socket' in locals():
            data_socket.close()
        if 'control_socket' in locals():
            control_socket.close()
        if 'context' in locals():
            context.term()
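

# Hedged producer sketch (assumed wire contract, mirroring what
# process_messages() reads above): a client publishes a single-image JSON
# message whose pixels travel through multiprocessing shared memory. The port
# and step names below are hypothetical.
#
#   import json
#   import numpy as np
#   import zmq
#   from multiprocessing import shared_memory
#
#   img = np.random.rand(256, 256).astype(np.float32)
#   shm = shared_memory.SharedMemory(create=True, size=img.nbytes)
#   np.ndarray(img.shape, dtype=img.dtype, buffer=shm.buf)[:] = img
#
#   ctx = zmq.Context()
#   sock = ctx.socket(zmq.PUB)
#   sock.connect("tcp://localhost:5555")  # hypothetical data port
#   sock.send(json.dumps({
#       'path': 'step_name/A01/s1_c1_z1.tif',
#       'shape': list(img.shape), 'dtype': str(img.dtype), 'shm_name': shm.name,
#       'step_index': 0, 'step_name': 'example_step',
#       'component_metadata': {'well': 'A01', 'site': '1', 'channel': '1'},
#       'display_config': {'colormap': 'viridis'},
#   }).encode('utf-8'))
#   # The viewer copies the buffer and closes its handle; the producer remains
#   # responsible for shm.close() and shm.unlink() once the frame is consumed.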


def _spawn_detached_napari_process(port: int, viewer_title: str, replace_layers: bool = False) -> subprocess.Popen:
    """
    Spawn a completely detached napari viewer process that survives parent termination.

    This creates a subprocess that runs independently and won't be terminated when
    the parent process exits, enabling true persistence across pipeline runs.
    """
    # Use a simpler approach: spawn python directly with the napari viewer module.
    # This avoids temporary file issues and import problems.

    # Create the command to run the napari viewer directly
    current_dir = os.getcwd()
    python_code = f'''
import sys
import os

# Detach from parent process group (Unix only)
if hasattr(os, "setsid"):
    try:
        os.setsid()
    except OSError:
        pass

# Add current working directory to Python path
sys.path.insert(0, "{current_dir}")

try:
    from openhcs.runtime.napari_stream_visualizer import _napari_viewer_process
    _napari_viewer_process({port}, "{viewer_title}", {replace_layers})
except Exception as e:
    import logging
    logger = logging.getLogger("openhcs.runtime.napari_detached")
    logger.error(f"Detached napari error: {{e}}")
    import traceback
    logger.error(traceback.format_exc())
    sys.exit(1)
'''

    try:
        # Use subprocess.Popen with detachment flags
        if sys.platform == "win32":
            # Windows: use CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS to detach but preserve the display environment
            env = os.environ.copy()  # Preserve environment variables
            process = subprocess.Popen(
                [sys.executable, "-c", python_code],
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS,
                env=env,
                cwd=os.getcwd()
                # Don't redirect stdout/stderr to allow the GUI to display properly
            )
        else:
            # Unix: preserve DISPLAY and other environment variables
            env = os.environ.copy()

            # Ensure the Qt platform is set for GUI display
            if 'QT_QPA_PLATFORM' not in env:
                env['QT_QPA_PLATFORM'] = 'xcb'  # Use the X11 backend

            # Disable shared memory for X11 (helps with some display issues)
            env['QT_X11_NO_MITSHM'] = '1'

            # Try without start_new_session to see if the GUI displays properly
            process = subprocess.Popen(
                [sys.executable, "-c", python_code],
                env=env,
                cwd=os.getcwd()
                # Don't redirect stdout/stderr to allow the GUI to display properly
            )

        logger.info(f"🔬 VISUALIZER: Detached napari process started (PID: {process.pid})")
        return process

    except Exception as e:
        logger.error(f"🔬 VISUALIZER: Failed to spawn detached napari process: {e}")
        raise


class NapariStreamVisualizer:
    """
    Manages a Napari viewer instance for real-time visualization of tensors
    streamed from the OpenHCS pipeline. Runs napari in a separate process
    for Qt compatibility and true persistence across pipeline runs.
    """

    def __init__(self, filemanager: FileManager, visualizer_config,
                 viewer_title: str = "OpenHCS Real-Time Visualization",
                 persistent: bool = True,
                 napari_port: int = DEFAULT_NAPARI_STREAM_PORT,
                 replace_layers: bool = False,
                 display_config=None):
        self.filemanager = filemanager
        self.viewer_title = viewer_title
        self.persistent = persistent  # If True, the viewer process stays alive after pipeline completion
        self.visualizer_config = visualizer_config
        self.napari_port = napari_port  # Port for napari streaming
        self.replace_layers = replace_layers  # If True, replace existing layers; if False, add new layers
        self.display_config = display_config  # Configuration for display behavior
        self.port: Optional[int] = None
        self.process: Optional[multiprocessing.Process] = None
        self.zmq_context: Optional[zmq.Context] = None
        self.zmq_socket: Optional[zmq.Socket] = None
        self.is_running = False
        self._lock = threading.Lock()

        # Clause 368: Visualization must be observer-only.
        # This class will only read data and display it.

    def _find_free_port(self) -> int:
        """Find a free port for ZeroMQ communication."""
        import socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def start_viewer(self):
        """Starts the Napari viewer in a separate process."""
        global _global_viewer_process, _global_viewer_port

        with self._lock:
            # Check if there's already a napari viewer running on the configured port
            port_in_use = self._is_port_in_use(self.napari_port)
            logger.info(f"🔬 VISUALIZER: Port {self.napari_port} in use: {port_in_use}")
            if port_in_use:
                logger.info(f"🔬 VISUALIZER: Reusing existing napari viewer on port {self.napari_port}")
                # Set the port and connect to the existing viewer
                self.port = self.napari_port

                # Check if we have a reference to the global process
                with _global_process_lock:
                    if _global_viewer_process and _global_viewer_port == self.napari_port:
                        self.process = _global_viewer_process
                        logger.info(f"🔬 VISUALIZER: Found global process reference (PID: {self.process.pid})")
                    else:
                        self.process = None  # External process we don't control
                        logger.info("🔬 VISUALIZER: No global process reference (external viewer)")

                self._setup_zmq_client()
                self.is_running = True
                return

            if self.is_running:
                logger.warning("Napari viewer is already running.")
                return

            # Use the configured port for napari streaming
            self.port = self.napari_port
            logger.info(f"🔬 VISUALIZER: Starting napari viewer process on port {self.port}")

            if self.persistent:
                # For persistent viewers, use a detached subprocess that truly survives parent termination
                logger.info("🔬 VISUALIZER: Creating detached persistent napari viewer")
                self.process = _spawn_detached_napari_process(self.port, self.viewer_title, self.replace_layers)
            else:
                # For non-persistent viewers, use multiprocessing.Process
                logger.info("🔬 VISUALIZER: Creating non-persistent napari viewer")
                self.process = multiprocessing.Process(
                    target=_napari_viewer_process,
                    args=(self.port, self.viewer_title, self.replace_layers),
                    daemon=False
                )
                self.process.start()

            # Update global references
            with _global_process_lock:
                _global_viewer_process = self.process
                _global_viewer_port = self.port

            # Wait for the napari viewer to be ready before setting up ZMQ
            self._wait_for_viewer_ready()

            # Set up the ZeroMQ client
            self._setup_zmq_client()

            # Check if the process is running (different methods for subprocess vs multiprocessing)
            if hasattr(self.process, 'is_alive'):
                # multiprocessing.Process
                process_alive = self.process.is_alive()
            else:
                # subprocess.Popen
                process_alive = self.process.poll() is None

            if process_alive:
                self.is_running = True
                logger.info(f"🔬 VISUALIZER: Napari viewer process started successfully (PID: {self.process.pid})")
            else:
                logger.error("🔬 VISUALIZER: Failed to start napari viewer process")
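
    # Hedged usage sketch (construction of FileManager and the config objects
    # is assumed, not shown here):
    #
    #   visualizer = NapariStreamVisualizer(filemanager, visualizer_config)
    #   visualizer.start_viewer()    # spawns a viewer, or reuses one on the port
    #   ...                          # images are streamed by the backend
    #   visualizer.stop_viewer()     # leaves the process alive when persistent=True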

    def _try_connect_to_existing_viewer(self, port: int) -> bool:
        """Try to connect to an existing napari viewer on the given port."""
        import socket

        # First check whether anything is listening on the port
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(0.1)  # 100ms timeout
        try:
            result = sock.connect_ex(('localhost', port))
            sock.close()

            if result == 0:  # Port is open
                # Set up the ZMQ connection
                self._setup_zmq_client()
                return True
            else:
                return False

        except Exception:
            return False

    def _is_port_in_use(self, port: int) -> bool:
        """Check if a port is already in use (indicating an existing napari viewer)."""
        import socket

        # Check if any process is listening on this port
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(0.1)
        try:
            # Try to bind to the port - if it fails, something is already using it
            sock.bind(('localhost', port))
            sock.close()
            return False  # Port is free
        except OSError:
            # Port is already in use
            sock.close()
            return True
        except Exception:
            return False

    def _wait_for_viewer_ready(self, timeout: float = 10.0) -> bool:
        """Wait for the napari viewer to be ready using the handshake protocol."""
        import zmq

        logger.info(f"🔬 VISUALIZER: Waiting for napari viewer to be ready on port {self.port}...")

        # First wait for the ports to be bound
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self._is_port_in_use(self.port) and self._is_port_in_use(self.port + 1000):
                break
            time.sleep(0.2)
        else:
            logger.warning("🔬 VISUALIZER: Timeout waiting for ports to be bound")
            return False

        # Now use the handshake protocol - create a fresh socket for each attempt
        start_time = time.time()
        while time.time() - start_time < timeout:
            control_context = zmq.Context()
            control_socket = control_context.socket(zmq.REQ)
            control_socket.setsockopt(zmq.LINGER, 0)
            control_socket.setsockopt(zmq.RCVTIMEO, 1000)  # 1 second timeout

            try:
                control_socket.connect(f"tcp://localhost:{self.port + 1000}")

                import pickle
                ping_message = {'type': 'ping'}
                control_socket.send(pickle.dumps(ping_message))

                response = control_socket.recv()
                response_data = pickle.loads(response)

                if response_data.get('type') == 'pong' and response_data.get('ready'):
                    logger.info(f"🔬 VISUALIZER: Napari viewer is ready on port {self.port}")
                    return True

            except zmq.Again:
                pass  # Timeout waiting for response
            except Exception as e:
                logger.debug(f"🔬 VISUALIZER: Handshake attempt failed: {e}")
            finally:
                control_socket.close()
                control_context.term()

            time.sleep(0.5)  # Wait before the next ping

        logger.warning("🔬 VISUALIZER: Timeout waiting for napari viewer handshake")
        return False
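
    # Handshake wire format (as implemented above and in _napari_viewer_process):
    # the client sends pickle.dumps({'type': 'ping'}) on a REQ socket connected
    # to port + 1000 and expects pickle.dumps({'type': 'pong', 'ready': True})
    # once the viewer's Qt timer has started servicing messages.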

    def _setup_zmq_client(self):
        """Set up the ZeroMQ client used to send data to the viewer process."""
        if self.port is None:
            raise RuntimeError("Port not set - call start_viewer() first")

        self.zmq_context = zmq.Context()
        self.zmq_socket = self.zmq_context.socket(zmq.PUB)
        self.zmq_socket.connect(f"tcp://localhost:{self.port}")

        # Brief delay for the ZMQ connection to establish
        time.sleep(0.1)
        logger.info(f"🔬 VISUALIZER: ZMQ client connected to port {self.port}")

    def send_image_data(self, step_id: str, image_data: np.ndarray, axis_id: str = "unknown"):
        """
        DISABLED: This method bypasses component-aware stacking.
        All visualization must go through the streaming backend.
        """
        raise RuntimeError(
            f"send_image_data() is disabled. Use the streaming backend for component-aware display. "
            f"step_id: {step_id}, axis_id: {axis_id}, shape: {image_data.shape}"
        )

    def stop_viewer(self):
        """Stop the napari viewer process (only if not persistent)."""
        with self._lock:
            if not self.persistent:
                logger.info("🔬 VISUALIZER: Stopping non-persistent napari viewer")
                self._cleanup_zmq()
                if self.process:
                    # Handle both subprocess and multiprocessing process types
                    if hasattr(self.process, 'is_alive'):
                        # multiprocessing.Process
                        if self.process.is_alive():
                            self.process.terminate()
                            self.process.join(timeout=5)
                            if self.process.is_alive():
                                logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                                self.process.kill()
                    else:
                        # subprocess.Popen
                        if self.process.poll() is None:  # Still running
                            self.process.terminate()
                            try:
                                self.process.wait(timeout=5)
                            except subprocess.TimeoutExpired:
                                logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                                self.process.kill()
                self.is_running = False
            else:
                logger.info("🔬 VISUALIZER: Keeping persistent napari viewer alive")
                # Just clean up our ZMQ connection and leave the process running
                self._cleanup_zmq()
                # DON'T set is_running = False for persistent viewers!
                # The process is still alive and should be reusable.

    def _cleanup_zmq(self):
        """Clean up ZeroMQ resources."""
        if self.zmq_socket:
            self.zmq_socket.close()
            self.zmq_socket = None
        if self.zmq_context:
            self.zmq_context.term()
            self.zmq_context = None

    def visualize_path(self, step_id: str, path: str, backend: str, axis_id: Optional[str] = None):
        """
        DISABLED: This method bypasses component-aware stacking.
        All visualization must go through the streaming backend.
        """
        raise RuntimeError(
            f"visualize_path() is disabled. Use the streaming backend for component-aware display. "
            f"Path: {path}, step_id: {step_id}, axis_id: {axis_id}"
        )

    def _prepare_data_for_display(self, data: Any, step_id_for_log: str, display_config=None) -> Optional[np.ndarray]:
        """Converts loaded data to a displayable NumPy array (slice or stack based on config)."""
        cpu_tensor: Optional[np.ndarray] = None
        try:
            # GPU to CPU conversion logic
            if hasattr(data, 'is_cuda') and data.is_cuda:  # PyTorch
                cpu_tensor = data.cpu().numpy()
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # Check for device attribute
                if hasattr(data, 'get'):  # CuPy
                    cpu_tensor = data.get()
                elif hasattr(data, 'numpy'):  # JAX on GPU may support .numpy() after host transfer
                    cpu_tensor = np.asarray(data)  # JAX arrays may need explicit conversion
                else:  # Fallback for other GPU array types if possible
                    logger.warning(f"Unknown GPU array type for step '{step_id_for_log}'. Attempting .numpy().")
                    if hasattr(data, 'numpy'):
                        cpu_tensor = data.numpy()
                    else:
                        logger.error(f"Cannot convert GPU tensor of type {type(data)} for step '{step_id_for_log}'.")
                        return None
            elif isinstance(data, np.ndarray):
                cpu_tensor = data
            else:
                # Attempt to convert any other array-like structure to a numpy array
                try:
                    cpu_tensor = np.asarray(data)
                    logger.debug(f"Converted data of type {type(data)} to numpy array for step '{step_id_for_log}'.")
                except Exception as e_conv:
                    logger.warning(f"Unsupported data type for step '{step_id_for_log}': {type(data)}. Error: {e_conv}")
                    return None

            if cpu_tensor is None:  # Should not happen if the logic above is correct
                return None

            # Determine the display mode based on configuration.
            # Default behavior: show as stack unless the config specifies otherwise.
            should_slice = False

            if display_config:
                # Check if any component mode is set to SLICE
                from openhcs.core.config import NapariDimensionMode
                from openhcs.constants import VariableComponents

                # Check individual component mode fields
                for component in VariableComponents:
                    field_name = f"{component.value}_mode"
                    if hasattr(display_config, field_name):
                        mode = getattr(display_config, field_name)
                        if mode == NapariDimensionMode.SLICE:
                            should_slice = True
                            break
            else:
                # Default: slice for backward compatibility
                should_slice = True

            # Slicing/stacking logic
            display_data: Optional[np.ndarray] = None

            if should_slice:
                # Original slicing behavior
                if cpu_tensor.ndim == 3:  # ZYX
                    display_data = cpu_tensor[cpu_tensor.shape[0] // 2, :, :]
                elif cpu_tensor.ndim == 2:  # YX
                    display_data = cpu_tensor
                elif cpu_tensor.ndim > 3:  # e.g. CZYX or TZYX
                    logger.debug(f"Tensor for step '{step_id_for_log}' has ndim > 3 ({cpu_tensor.ndim}). Taking a slice.")
                    slicer = [0] * (cpu_tensor.ndim - 2)  # Slice first channels/times
                    slicer[-1] = cpu_tensor.shape[-3] // 2  # Middle Z
                    try:
                        display_data = cpu_tensor[tuple(slicer)]
                    except IndexError:  # Handle cases where slicing might fail (e.g. very small dimensions)
                        logger.error(f"Slicing failed for tensor with shape {cpu_tensor.shape} for step '{step_id_for_log}'.", exc_info=True)
                        display_data = None
                else:
                    logger.warning(f"Tensor for step '{step_id_for_log}' has unsupported ndim for slicing: {cpu_tensor.ndim}.")
                    return None
            else:
                # Stack mode: send the full data to napari (napari can handle 3D+ data)
                if cpu_tensor.ndim >= 2:
                    display_data = cpu_tensor
                    logger.debug(f"Sending {cpu_tensor.ndim}D stack to napari for step '{step_id_for_log}' (shape: {cpu_tensor.shape})")
                else:
                    logger.warning(f"Tensor for step '{step_id_for_log}' has unsupported ndim for stacking: {cpu_tensor.ndim}.")
                    return None

            return display_data.copy() if display_data is not None else None

        except Exception as e:
            logger.error(f"Error preparing data from step '{step_id_for_log}' for display: {e}", exc_info=True)
            return None
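

# Hedged worked example for _prepare_data_for_display: in slice mode a
# (5, 512, 512) ZYX tensor yields its middle Z plane, cpu_tensor[5 // 2],
# i.e. index 2 with shape (512, 512); in stack mode the full (5, 512, 512)
# array is returned and napari renders it with a Z slider.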