Coverage for openhcs/runtime/napari_stream_visualizer.py: 0.9%
894 statements
1"""
2Napari-based real-time visualization module for OpenHCS.
4This module provides the NapariStreamVisualizer class for real-time
5visualization of tensors during pipeline execution.
7Doctrinal Clauses:
8- Clause 65 — No Fallback Logic
9- Clause 66 — Immutability After Construction
10- Clause 88 — No Inferred Capabilities
11- Clause 368 — Visualization Must Be Observer-Only
12"""
import logging
import multiprocessing
import os
import subprocess
import sys
import threading
import time
import zmq
import numpy as np
from typing import Any, Dict, Optional
from qtpy.QtCore import QTimer

from openhcs.io.filemanager import FileManager
from openhcs.utils.import_utils import optional_import
from openhcs.runtime.zmq_base import ZMQServer, SHARED_ACK_PORT, get_zmq_transport_url
from openhcs.runtime.zmq_messages import ImageAck
from openhcs.core.config import TransportMode, NapariStreamingConfig

# Optional napari import - this module should only be imported if napari is available
napari = optional_import("napari")
if napari is None:
    raise ImportError(
        "napari is required for NapariStreamVisualizer. "
        "Install it with: pip install 'openhcs[viz]' or pip install napari"
    )


logger = logging.getLogger(__name__)

# ZMQ connection delay (ms)
ZMQ_CONNECTION_DELAY_MS = 100  # Brief delay for ZMQ connection to establish

# Global process management for napari viewer
_global_viewer_process: Optional[multiprocessing.Process] = None
_global_viewer_port: Optional[int] = None
_global_process_lock = threading.Lock()

# Registry of data type handlers (will be populated after helper functions are defined)
_DATA_TYPE_HANDLERS = None


def _cleanup_global_viewer() -> None:
    """
    Clean up the global napari viewer process for test mode.

    This forcibly terminates the napari viewer process so pytest can exit.
    Should only be called in test mode.
    """
    global _global_viewer_process

    with _global_process_lock:
        if _global_viewer_process and _global_viewer_process.is_alive():
            logger.info("🔬 VISUALIZER: Terminating napari viewer for test cleanup")
            _global_viewer_process.terminate()
            _global_viewer_process.join(timeout=3)

            if _global_viewer_process.is_alive():
                logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                _global_viewer_process.kill()
                _global_viewer_process.join(timeout=1)

        _global_viewer_process = None


def _parse_component_info_from_path(path_str: str):
    """
    Fallback component parsing from path (used when component metadata is unavailable).

    Args:
        path_str: Path string like 'step_name/A01/s1_c2_z3.tif'

    Returns:
        Dict with basic component info extracted from the filename
    """
    try:
        import os
        import re

        filename = os.path.basename(path_str)

        # Basic regex for common patterns
        pattern = r"(?:s(\d+))?(?:_c(\d+))?(?:_z(\d+))?"
        match = re.search(pattern, filename)

        components = {}
        if match:
            site, channel, z_index = match.groups()
            if site:
                components["site"] = site
            if channel:
                components["channel"] = channel
            if z_index:
                components["z_index"] = z_index

        return components
    except Exception:
        return {}
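
# A minimal sketch of the fallback parser's behavior (hypothetical filenames;
# the expected outputs follow from the regex above, which captures optional
# s/_c/_z tokens):
#
#     _parse_component_info_from_path("step_name/A01/s1_c2_z3.tif")
#     # -> {"site": "1", "channel": "2", "z_index": "3"}
#
#     _parse_component_info_from_path("step_name/A01/overview.tif")
#     # -> {} (no tokens matched; all groups are None)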


def _build_nd_shapes(layer_items, stack_components):
    """
    Build nD shapes by prepending stack component indices to 2D shape coordinates.

    Args:
        layer_items: List of items with 'data' (shapes_data) and 'components'
        stack_components: List of component names to stack

    Returns:
        Tuple of (all_shapes_nd, all_shape_types, all_properties)
    """
    from openhcs.runtime.roi_converters import NapariROIConverter

    all_shapes_nd = []
    all_shape_types = []
    all_properties = {"label": [], "area": [], "centroid_y": [], "centroid_x": []}

    # Build component value to index mapping (same as _build_nd_image_array)
    component_values = {}
    for comp in stack_components:
        values = sorted(set(item["components"].get(comp, 0) for item in layer_items))
        component_values[comp] = values

    for item in layer_items:
        shapes_data = item["data"]  # List of shape dicts
        components = item["components"]

        # Get stack component INDICES to prepend (not values!)
        prepend_dims = [
            component_values[comp].index(components.get(comp, 0))
            for comp in stack_components
        ]

        # Convert each shape to nD
        for shape_dict in shapes_data:
            # Use the registry-based dimension handler
            nd_coords = NapariROIConverter.add_dimensions_to_shape(
                shape_dict, prepend_dims
            )
            all_shapes_nd.append(nd_coords)
            all_shape_types.append(shape_dict["type"])

            # Extract properties
            metadata = shape_dict.get("metadata", {})
            centroid = metadata.get("centroid", (0, 0))
            all_properties["label"].append(metadata.get("label", ""))
            all_properties["area"].append(metadata.get("area", 0))
            all_properties["centroid_y"].append(centroid[0])
            all_properties["centroid_x"].append(centroid[1])

    return all_shapes_nd, all_shape_types, all_properties
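
# Index-mapping sketch for the prepend step above, under hypothetical values:
# with stack_components=["well", "z_index"] and observed values
# {"well": ["A01", "B02"], "z_index": [1, 2, 3]}, an item whose components are
# {"well": "B02", "z_index": 3} yields
#
#     prepend_dims = [component_values[comp].index(components.get(comp, 0))
#                     for comp in stack_components]   # -> [1, 2]
#
# so each 2D vertex (y, x) of its shapes becomes the nD vertex (1, 2, y, x).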


def _build_nd_image_array(layer_items, stack_components):
    """
    Build an nD image array by stacking images along stack component dimensions.

    Args:
        layer_items: List of items with 'data' (image arrays) and 'components'
        stack_components: List of component names to stack

    Returns:
        np.ndarray: Stacked image array
    """
    if len(stack_components) == 1:
        # Single stack component - simple 3D stack
        image_stack = [img["data"] for img in layer_items]
        from openhcs.core.memory.stack_utils import stack_slices

        return stack_slices(image_stack, memory_type="numpy", gpu_id=0)
    else:
        # Multiple stack components - create a multi-dimensional array
        component_values = {}
        for comp in stack_components:
            values = sorted(set(img["components"].get(comp, 0) for img in layer_items))
            component_values[comp] = values

        # Log component values for debugging
        logger.info(
            f"🔬 NAPARI PROCESS: Building nD array with stack_components={stack_components}, component_values={component_values}"
        )

        # Create an empty array with shape (comp1_size, comp2_size, ..., y, x)
        first_img = layer_items[0]["data"]
        stack_shape = (
            tuple(len(component_values[comp]) for comp in stack_components)
            + first_img.shape
        )
        stacked_array = np.zeros(stack_shape, dtype=first_img.dtype)
        logger.info(
            f"🔬 NAPARI PROCESS: Created nD array with shape {stack_shape} from {len(layer_items)} items"
        )

        # Fill the array
        for img in layer_items:
            # Get the indices for this image
            indices = tuple(
                component_values[comp].index(img["components"].get(comp, 0))
                for comp in stack_components
            )
            logger.debug(
                f"🔬 NAPARI PROCESS: Placing image at indices {indices}, components={img['components']}"
            )
            stacked_array[indices] = img["data"]

        return stacked_array
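
# Shape-arithmetic sketch for the multi-component branch (hypothetical sizes):
# with stack_components=["channel", "z_index"], 2 channel values, 5 z values,
# and 512x512 slices, the allocation above is
#
#     stack_shape = (2, 5) + (512, 512)   # -> (2, 5, 512, 512)
#
# and each incoming slice lands at stacked_array[(c_idx, z_idx)], where the
# indices come from the sorted component_values lists, not the raw values.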


def _create_or_update_shapes_layer(
    viewer, layers, layer_name, shapes_data, shape_types, properties
):
    """
    Create or update a Napari shapes layer.

    Note: Shapes layers don't handle .data updates well when dimensions change.
    We remove and recreate the layer instead of updating in place.

    Args:
        viewer: Napari viewer
        layers: Dict of existing layers
        layer_name: Name for the layer
        shapes_data: List of shape coordinate arrays
        shape_types: List of shape type strings
        properties: Dict of properties

    Returns:
        The created or updated layer
    """
    # Check if the layer exists
    existing_layer = None
    for layer in viewer.layers:
        if layer.name == layer_name:
            existing_layer = layer
            break

    if existing_layer is not None:
        # For shapes, we need to remove and recreate because .data assignment
        # doesn't work when dimensions change. But we need to be careful:
        # removing the layer will trigger the reconciliation code to clear
        # component_groups on the NEXT data arrival.
        #
        # Solution: Remove the layer, but immediately recreate it so it exists
        # when reconciliation runs on the next well.
        viewer.layers.remove(existing_layer)
        layers.pop(layer_name, None)
        logger.info(
            f"🔬 NAPARI PROCESS: Removed existing shapes layer {layer_name} for recreation"
        )

    # Create a new layer (or recreate if we just removed it)
    new_layer = viewer.add_shapes(
        shapes_data,
        shape_type=shape_types,
        properties=properties,
        name=layer_name,
        edge_color="red",
        face_color="transparent",
        edge_width=2,
    )
    layers[layer_name] = new_layer
    logger.info(
        f"🔬 NAPARI PROCESS: Created shapes layer {layer_name} with {len(shapes_data)} shapes"
    )
    return new_layer


def _create_or_update_image_layer(
    viewer, layers, layer_name, image_data, colormap, axis_labels=None
):
    """
    Create or update a Napari image layer.

    Args:
        viewer: Napari viewer
        layers: Dict of existing layers
        layer_name: Name for the layer
        image_data: Image array
        colormap: Colormap name
        axis_labels: Optional tuple of axis label strings for dimension names

    Returns:
        The created or updated layer
    """
    # Check if the layer exists
    existing_layer = None
    for layer in viewer.layers:
        if layer.name == layer_name:
            existing_layer = layer
            break

    if existing_layer is not None:
        old_shape = existing_layer.data.shape
        new_shape = image_data.shape
        if old_shape != new_shape:
            logger.info(
                f"🔬 NAPARI PROCESS: Layer {layer_name} shape changing: {old_shape} → {new_shape}"
            )
        existing_layer.data = image_data
        if colormap:
            existing_layer.colormap = colormap
        layers[layer_name] = existing_layer
        logger.info(
            f"🔬 NAPARI PROCESS: Updated existing image layer {layer_name} (shape: {new_shape})"
        )
        return existing_layer
    else:
        new_layer = viewer.add_image(
            image_data, name=layer_name, colormap=colormap or "gray"
        )

        # Set axis labels on viewer.dims (the add_image axis_labels parameter doesn't work).
        # See: https://forum.image.sc/t/rename-napari-dimension-slider-labels/41974
        if axis_labels is not None:
            viewer.dims.axis_labels = axis_labels
            logger.info(f"🔬 NAPARI PROCESS: Set viewer.dims.axis_labels={axis_labels}")

        layers[layer_name] = new_layer
        logger.info(f"🔬 NAPARI PROCESS: Created new image layer {layer_name}")
        return new_layer


# Populate the registry now that the helper functions are defined
from openhcs.constants.streaming import StreamingDataType

_DATA_TYPE_HANDLERS = {
    StreamingDataType.IMAGE: {
        "build_nd_data": _build_nd_image_array,
        "create_layer": _create_or_update_image_layer,
    },
    StreamingDataType.SHAPES: {
        "build_nd_data": _build_nd_shapes,
        "create_layer": _create_or_update_shapes_layer,
    },
}
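
# A minimal dispatch sketch using the registry (an illustration; `layer_items`,
# `stack_components`, and `layer_key` are as in the helpers above):
#
#     handlers = _DATA_TYPE_HANDLERS[StreamingDataType.IMAGE]
#     nd_data = handlers["build_nd_data"](layer_items, stack_components)
#     handlers["create_layer"](viewer, layers, layer_key, nd_data, "gray")
#
# Note that the two create_layer functions take different trailing arguments
# (colormap/axis_labels vs. shape_types/properties), so callers must still know
# the data type; the registry only centralizes the lookup.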


def _handle_component_aware_display(
    viewer,
    layers,
    component_groups,
    data,
    path,
    colormap,
    display_config,
    replace_layers,
    component_metadata=None,
    data_type="image",
    server=None,
):
    """
    Handle component-aware display following OpenHCS stacking patterns.

    Components marked as SLICE create separate layers; components marked as
    STACK are stacked together. Layer naming follows the canonical component
    order from the display config.

    Args:
        data_type: 'image' for image data, 'shapes' for ROI/shapes data
            (string or StreamingDataType enum)
        server: NapariViewerServer instance (needed for debounced updates)
    """
    try:
        # Convert data_type to the enum if needed (for backwards compatibility)
        if isinstance(data_type, str):
            data_type = StreamingDataType(data_type)

        # Use component metadata from the ZMQ message - fail loud if not available
        if not component_metadata:
            raise ValueError(f"No component metadata available for path: {path}")
        component_info = component_metadata

        # Build component_modes and component_order from the config (dict or object)
        component_modes = None
        component_order = None

        if isinstance(display_config, dict):
            cm = display_config.get("component_modes") or display_config.get(
                "componentModes"
            )
            if isinstance(cm, dict) and cm:
                component_modes = cm
            component_order = display_config["component_order"]
        else:
            # Handle object-like config (NapariDisplayConfig)
            component_order = display_config.COMPONENT_ORDER
            component_modes = {}
            for component in component_order:
                mode_field = f"{component}_mode"
                if hasattr(display_config, mode_field):
                    mode_value = getattr(display_config, mode_field)
                    component_modes[component] = getattr(
                        mode_value, "value", str(mode_value)
                    )

        # Generic layer naming - iterate over components in canonical order.
        # Components in SLICE mode create separate layers;
        # components in STACK mode are combined into the same layer.
        layer_key_parts = []
        for component in component_order:
            mode = component_modes.get(component)
            if mode == "slice" and component in component_info:
                value = component_info[component]
                layer_key_parts.append(f"{component}_{value}")

        layer_key = "_".join(layer_key_parts) if layer_key_parts else "default_layer"

        # Log component modes for debugging
        logger.info(
            f"🔍 NAPARI PROCESS: component_modes={component_modes}, layer_key='{layer_key}'"
        )

        # Add a "_shapes" suffix for shapes layers to distinguish them from image
        # layers. This MUST happen BEFORE reconciliation so we check the correct
        # layer name.
        if data_type == StreamingDataType.SHAPES:
            layer_key = f"{layer_key}_shapes"

        # Log the layer key and component info for debugging
        logger.info(
            f"🔍 NAPARI PROCESS: layer_key='{layer_key}', component_info={component_info}"
        )

        # Reconcile cached layer/group state with the live napari viewer after
        # possible manual deletions.
        # CRITICAL: Only purge if the layer WAS in our cache but is now missing
        # from the viewer (the user manually deleted it). Do NOT purge if the
        # layer was never created yet (debounced update pending).
        try:
            current_layer_names = {l.name for l in viewer.layers}
            if layer_key not in current_layer_names and layer_key in layers:
                # Layer was in our cache but is now missing from the viewer -
                # the user deleted it. Drop stale references so we recreate it.
                num_items = len(component_groups.get(layer_key, []))
                layers.pop(layer_key, None)
                component_groups.pop(layer_key, None)
                logger.info(
                    f"🔬 NAPARI PROCESS: Reconciling state — '{layer_key}' was deleted from viewer; purged stale caches (had {num_items} items in component_groups)"
                )
        except Exception:
            # Fail-loud elsewhere; reconciliation is best-effort and must not mask display
            pass

        # Initialize the layer group if needed
        if layer_key not in component_groups:
            component_groups[layer_key] = []

        # Handle replace_layers mode: clear all items for this layer_key
        if replace_layers and component_groups[layer_key]:
            logger.info(
                f"🔬 NAPARI PROCESS: replace_layers=True, clearing {len(component_groups[layer_key])} existing items from layer '{layer_key}'"
            )
            component_groups[layer_key] = []

        # Check whether an item with the same component_info AND data_type already
        # exists. If so, replace it instead of appending (prevents accumulation
        # across runs).
        # CRITICAL: Must include 'well' in the comparison even if it's in STACK
        # mode, otherwise images from different wells with the same channel/z/field
        # would be treated as duplicates.
        # CRITICAL: Must also check data_type so images and ROIs are never treated
        # as duplicates of each other.
        existing_index = None
        for i, item in enumerate(component_groups[layer_key]):
            # Compare ALL components including well AND data_type
            if item["components"] == component_info and item["data_type"] == data_type:
                logger.info(
                    f"🔬 NAPARI PROCESS: Found duplicate - component_info: {component_info}, data_type: {data_type} at index {i}"
                )
                existing_index = i
                break

        new_item = {
            "data": data,
            "components": component_info,
            "path": str(path),
            "data_type": data_type,
        }

        if existing_index is not None:
            # Replace the existing item with the same components and data type
            old_data_type = component_groups[layer_key][existing_index]["data_type"]
            component_groups[layer_key][existing_index] = new_item
            logger.info(
                f"🔬 NAPARI PROCESS: Replaced {old_data_type} item in component_groups[{layer_key}] at index {existing_index}, total items: {len(component_groups[layer_key])}"
            )
        else:
            # Add a new item
            component_groups[layer_key].append(new_item)
            logger.info(
                f"🔬 NAPARI PROCESS: Added {data_type} to component_groups[{layer_key}], now has {len(component_groups[layer_key])} items"
            )

        # Schedule a debounced layer update instead of an immediate one.
        # This prevents race conditions when multiple items arrive rapidly.
        if server is None:
            raise ValueError("Server instance required for debounced updates")
        logger.info(
            f"🔬 NAPARI PROCESS: Scheduling debounced update for {layer_key} (data_type={data_type})"
        )
        server._schedule_layer_update(
            layer_key, data_type, component_modes, component_order
        )

    except Exception as e:
        import traceback

        logger.error(
            f"🔬 NAPARI PROCESS: Component-aware display failed for {path}: {e}"
        )
        logger.error(
            f"🔬 NAPARI PROCESS: Component-aware display traceback: {traceback.format_exc()}"
        )
        raise  # Fail loud - no fallback
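
# Layer-key derivation sketch (hypothetical config and values): with
# component_order=["well", "site", "channel", "z_index"] and
# component_modes={"well": "slice", "channel": "slice", "z_index": "stack"},
# an item whose component_info is {"well": "A01", "channel": 2, "z_index": 5}
# yields layer_key "well_A01_channel_2" (stacked components never appear in the
# key); shapes data then gets the suffix: "well_A01_channel_2_shapes".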


def _old_immediate_update_logic_removed():
    """
    Old immediate-update logic, removed in favor of debounced updates.
    Kept as a reference for the variable-size handling logic that still needs
    to be ported.
    """
    pass
    # Old code was here - removed to prevent race conditions.
    # Now using _schedule_layer_update -> _execute_layer_update ->
    # _update_image_layer / _update_shapes_layer


class NapariViewerServer(ZMQServer):
    """
    ZMQ server for the Napari viewer that receives images from clients.

    Inherits from the ZMQServer ABC to get ping/pong, port management, etc.
    Uses a SUB socket to receive images from pipeline clients.
    """

    _server_type = "napari"  # Registration key for AutoRegisterMeta

    def __init__(
        self,
        port: int,
        viewer_title: str,
        replace_layers: bool = False,
        log_file_path: str = None,
        transport_mode: TransportMode = TransportMode.IPC,
    ):
        """
        Initialize the Napari viewer server.

        Args:
            port: Data port for receiving images (control port will be port + 1000)
            viewer_title: Title for the napari viewer window
            replace_layers: If True, replace existing layers; if False, add new layers
            log_file_path: Path to log file (for client discovery)
            transport_mode: ZMQ transport mode (IPC or TCP)
        """
        import zmq

        # Initialize with a SUB socket for receiving images
        super().__init__(
            port,
            host="*",
            log_file_path=log_file_path,
            data_socket_type=zmq.SUB,
            transport_mode=transport_mode,
        )

        self.viewer_title = viewer_title
        self.replace_layers = replace_layers
        self.viewer = None
        self.layers = {}
        self.component_groups = {}

        # Debouncing + locking for layer updates to prevent race conditions
        import threading

        self.layer_update_lock = threading.Lock()  # Prevent concurrent updates
        self.pending_updates = {}  # layer_key -> QTimer (debounce)
        self.update_delay_ms = 1000  # Wait 1000ms for more items before rebuilding

        # Create a PUSH socket for sending acknowledgments to the shared ack port
        self.ack_socket = None
        self._setup_ack_socket()

    def _setup_ack_socket(self):
        """Set up the PUSH socket for sending acknowledgments."""
        import zmq

        try:
            ack_url = get_zmq_transport_url(
                SHARED_ACK_PORT, self.transport_mode, "localhost"
            )

            context = zmq.Context.instance()
            self.ack_socket = context.socket(zmq.PUSH)
            self.ack_socket.connect(ack_url)
            logger.info(f"🔬 NAPARI SERVER: Connected ack socket to {ack_url}")
        except Exception as e:
            logger.warning(f"🔬 NAPARI SERVER: Failed to setup ack socket: {e}")
            self.ack_socket = None

    def _schedule_layer_update(
        self, layer_key, data_type, component_modes, component_order
    ):
        """
        Schedule a debounced layer update.

        Cancels any pending update for this layer and schedules a new one.
        This prevents race conditions when multiple items arrive rapidly.
        """
        # Cancel the existing timer, if any
        if layer_key in self.pending_updates:
            self.pending_updates[layer_key].stop()
            logger.debug(f"🔬 NAPARI PROCESS: Cancelled pending update for {layer_key}")

        # Create a new timer
        timer = QTimer()
        timer.setSingleShot(True)
        timer.timeout.connect(
            lambda: self._execute_layer_update(
                layer_key, data_type, component_modes, component_order
            )
        )
        timer.start(self.update_delay_ms)
        self.pending_updates[layer_key] = timer
        logger.debug(
            f"🔬 NAPARI PROCESS: Scheduled update for {layer_key} in {self.update_delay_ms}ms"
        )
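
    # Debounce behavior sketch: three items for the same layer_key arriving at
    # t=0ms, t=300ms, and t=600ms each restart the single-shot timer, so with
    # update_delay_ms=1000 the single rebuild runs at t=1600ms and sees all
    # three items in component_groups. Items for different layer_keys keep
    # independent timers in self.pending_updates.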

    def _execute_layer_update(
        self, layer_key, data_type, component_modes, component_order
    ):
        """
        Execute the actual layer update after the debounce delay.

        Uses a lock to prevent concurrent updates to different layers.
        """
        # Remove the timer
        self.pending_updates.pop(layer_key, None)

        # Acquire the lock to prevent concurrent updates
        with self.layer_update_lock:
            logger.info(
                f"🔬 NAPARI PROCESS: Executing debounced update for {layer_key}"
            )

            # Get the current items for this layer
            layer_items = self.component_groups.get(layer_key, [])
            if not layer_items:
                logger.warning(
                    f"🔬 NAPARI PROCESS: No items found for {layer_key}, skipping update"
                )
                return

            # Log the layer composition
            wells_in_layer = set(
                item["components"].get("well", "unknown") for item in layer_items
            )
            logger.info(
                f"🔬 NAPARI PROCESS: layer_key='{layer_key}' has {len(layer_items)} items from wells: {sorted(wells_in_layer)}"
            )

            # Determine whether to stack or use a single item
            first_item = layer_items[0]
            component_info = first_item["components"]
            stack_components = [
                comp
                for comp, mode in component_modes.items()
                if mode == "stack" and comp in component_info
            ]

            # Build and update the layer based on the data type
            if data_type == StreamingDataType.IMAGE:
                self._update_image_layer(
                    layer_key, layer_items, stack_components, component_modes
                )
            elif data_type == StreamingDataType.SHAPES:
                self._update_shapes_layer(
                    layer_key, layer_items, stack_components, component_modes
                )
            else:
                logger.warning(
                    f"🔬 NAPARI PROCESS: Unknown data type {data_type} for {layer_key}"
                )

    def _update_image_layer(
        self, layer_key, layer_items, stack_components, component_modes
    ):
        """Update an image layer with the current items."""
        # Check if the images have different shapes and pad if needed
        shapes = [item["data"].shape for item in layer_items]
        if len(set(shapes)) > 1:
            logger.info(
                f"🔬 NAPARI PROCESS: Images in layer {layer_key} have different shapes - padding to max size"
            )

            # Find the max dimensions
            first_shape = shapes[0]
            max_shape = list(first_shape)
            for img_shape in shapes:
                for i, dim in enumerate(img_shape):
                    max_shape[i] = max(max_shape[i], dim)
            max_shape = tuple(max_shape)

            # Pad all images to the max shape
            for img_info in layer_items:
                img_data = img_info["data"]
                if img_data.shape != max_shape:
                    # Calculate the padding for each dimension (pad at the end only)
                    pad_width = []
                    for current_dim, max_dim in zip(img_data.shape, max_shape):
                        pad_before = 0
                        pad_after = max_dim - current_dim
                        pad_width.append((pad_before, pad_after))

                    # Pad with zeros
                    padded_data = np.pad(
                        img_data, pad_width, mode="constant", constant_values=0
                    )
                    img_info["data"] = padded_data
                    logger.debug(
                        f"🔬 NAPARI PROCESS: Padded image from {img_data.shape} to {padded_data.shape}"
                    )

        logger.info(
            f"🔬 NAPARI PROCESS: Building nD data for {layer_key} from {len(layer_items)} items"
        )
        stacked_data = _build_nd_image_array(layer_items, stack_components)

        # Determine the colormap
        colormap = None
        if "channel" in component_modes and component_modes["channel"] == "slice":
            first_item = layer_items[0]
            channel_value = first_item["components"].get("channel")
            if channel_value == 1:
                colormap = "green"
            elif channel_value == 2:
                colormap = "red"

        # Build axis labels for the stacked dimensions.
        # Format: (component1_name, component2_name, ..., 'y', 'x').
        # The stack components appear in the same order as in the
        # stack_components list. Must be a tuple for Napari.
        axis_labels = None
        if stack_components:
            axis_labels = tuple(list(stack_components) + ["y", "x"])
            logger.info(
                f"🔬 NAPARI PROCESS: Built axis_labels={axis_labels} for stack_components={stack_components}"
            )

        # Create or update the layer
        _create_or_update_image_layer(
            self.viewer, self.layers, layer_key, stacked_data, colormap, axis_labels
        )

    def _update_shapes_layer(
        self, layer_key, layer_items, stack_components, component_modes
    ):
        """Update a shapes layer - uses labels instead of shapes for efficiency."""
        logger.info(
            f"🔬 NAPARI PROCESS: Converting shapes to labels for {layer_key} from {len(layer_items)} items"
        )

        # Convert shapes to label masks (much faster than individual shapes).
        # This happens synchronously but is fast because we're just creating arrays.
        labels_data = self._shapes_to_labels(layer_items, stack_components)

        # Remove the existing layer if it exists
        if layer_key in self.layers:
            try:
                self.viewer.layers.remove(self.layers[layer_key])
                logger.info(
                    f"🔬 NAPARI PROCESS: Removed existing labels layer {layer_key} for recreation"
                )
            except Exception as e:
                logger.warning(
                    f"Failed to remove existing labels layer {layer_key}: {e}"
                )

        # Create a new labels layer
        new_layer = self.viewer.add_labels(labels_data, name=layer_key)
        self.layers[layer_key] = new_layer
        logger.info(
            f"🔬 NAPARI PROCESS: Created labels layer {layer_key} with shape {labels_data.shape}"
        )

    def _shapes_to_labels(self, layer_items, stack_components):
        """Convert shapes data to label masks."""
        from skimage import draw

        # Build the component value to index mapping
        component_values = {}
        for comp in stack_components:
            values = sorted(
                set(item["components"].get(comp, 0) for item in layer_items)
            )
            component_values[comp] = values

        # Determine the output shape.
        # Get the image shape from the first item's shapes data.
        first_shapes = layer_items[0]["data"]
        if not first_shapes:
            # No shapes, return an empty array
            return np.zeros((1, 1, 512, 512), dtype=np.uint16)

        # Estimate the image size from the shape coordinates
        max_y, max_x = 0, 0
        for shape_dict in first_shapes:
            if shape_dict["type"] == "polygon":
                coords = np.array(shape_dict["coordinates"])
                max_y = max(max_y, int(np.max(coords[:, 0])) + 1)
                max_x = max(max_x, int(np.max(coords[:, 1])) + 1)

        # Build the nD shape
        nd_shape = []
        for comp in stack_components:
            nd_shape.append(len(component_values[comp]))
        nd_shape.extend([max_y, max_x])

        # Create an empty label array
        labels_array = np.zeros(nd_shape, dtype=np.uint16)

        # Fill in the labels for each item
        label_id = 1
        for item in layer_items:
            # Get the indices for this item
            indices = []
            for comp in stack_components:
                comp_value = item["components"].get(comp, 0)
                idx = component_values[comp].index(comp_value)
                indices.append(idx)

            # Get the shapes data
            shapes_data = item["data"]

            # Draw each shape into the label mask
            for shape_dict in shapes_data:
                if shape_dict["type"] == "polygon":
                    coords = np.array(shape_dict["coordinates"])
                    rr, cc = draw.polygon(
                        coords[:, 0], coords[:, 1], shape=labels_array.shape[-2:]
                    )

                    # Set the label at the correct nD position
                    full_indices = tuple(indices) + (rr, cc)
                    labels_array[full_indices] = label_id
                    label_id += 1

        logger.info(
            f"🔬 NAPARI PROCESS: Created labels array with shape {labels_array.shape} and {label_id - 1} labels"
        )
        return labels_array
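
    # Indexing sketch for the polygon fill above (hypothetical values): with
    # stack_components=["z_index"] and component_values={"z_index": [1, 2]},
    # an item at z_index=2 gives indices=[1]; for a polygon rasterized to
    # pixel rows rr and columns cc, the assignment is
    #
    #     labels_array[(1, rr, cc)] = label_id
    #
    # i.e. NumPy advanced indexing broadcasts the scalar stack index across
    # the polygon's pixel coordinates.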

    def _send_ack(self, image_id: str, status: str = "success", error: str = None):
        """Send an acknowledgment that an image was processed.

        Args:
            image_id: UUID of the processed image
            status: 'success' or 'error'
            error: Error message if status='error'
        """
        if not self.ack_socket:
            return

        try:
            ack = ImageAck(
                image_id=image_id,
                viewer_port=self.port,
                viewer_type="napari",
                status=status,
                timestamp=time.time(),
                error=error,
            )
            self.ack_socket.send_json(ack.to_dict())
            logger.debug(f"🔬 NAPARI SERVER: Sent ack for image {image_id}")
        except Exception as e:
            logger.warning(f"🔬 NAPARI SERVER: Failed to send ack for {image_id}: {e}")

    def _create_pong_response(self) -> Dict[str, Any]:
        """Override to add Napari-specific fields and memory usage."""
        response = super()._create_pong_response()
        response["viewer"] = "napari"
        response["openhcs"] = True
        response["server"] = "NapariViewer"

        # Add memory usage
        try:
            import psutil
            import os

            process = psutil.Process(os.getpid())
            response["memory_mb"] = process.memory_info().rss / 1024 / 1024
            response["cpu_percent"] = process.cpu_percent(interval=0)
        except Exception:
            pass

        return response

    def handle_control_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle control messages beyond ping/pong.

        Supported message types:
        - shutdown: Graceful shutdown (closes the viewer)
        - force_shutdown: Force shutdown (same as shutdown for Napari)
        - clear_state: Clear accumulated component groups (for new pipeline runs)
        """
        msg_type = message.get("type")

        if msg_type in ("shutdown", "force_shutdown"):
            logger.info(f"🔬 NAPARI SERVER: {msg_type} requested, closing viewer")
            self.request_shutdown()

            # Schedule the viewer close on the Qt event loop to trigger application
            # exit. This must happen after sending the response, so we use
            # QTimer.singleShot.
            if self.viewer is not None:
                from qtpy import QtCore

                QtCore.QTimer.singleShot(100, self.viewer.close)

            return {
                "type": "shutdown_ack",
                "status": "success",
                "message": "Napari viewer shutting down",
            }

        elif msg_type == "clear_state":
            # Clear accumulated component groups to prevent shape accumulation across runs
            logger.info(
                f"🔬 NAPARI SERVER: Clearing component groups (had {len(self.component_groups)} groups)"
            )
            self.component_groups.clear()
            return {
                "type": "clear_state_ack",
                "status": "success",
                "message": "Component groups cleared",
            }

        # Unknown message type
        return {"status": "ok"}

    def handle_data_message(self, message: Dict[str, Any]):
        """Handle incoming image data - called by process_messages()."""
        # This will be called from the Qt timer
        pass

    def process_image_message(self, message: bytes):
        """
        Process an incoming image data message.

        Args:
            message: Raw ZMQ message containing image data
        """
        import json

        # Parse the JSON message
        data = json.loads(message.decode("utf-8"))

        msg_type = data.get("type")

        # Check the message type
        if msg_type == "batch":
            # Handle a batch of images/shapes
            images = data.get("images", [])
            display_config_dict = data.get("display_config")

            for image_info in images:
                self._process_single_image(image_info, display_config_dict)
        else:
            # Handle a single image (legacy)
            self._process_single_image(data, data.get("display_config"))

    def _process_single_image(
        self, image_info: Dict[str, Any], display_config_dict: Dict[str, Any]
    ):
        """Process a single image or shapes payload and display it in Napari."""
        import numpy as np

        path = image_info.get("path", "unknown")
        image_id = image_info.get("image_id")  # UUID for acknowledgment
        data_type = image_info.get("data_type", "image")  # 'image' or 'shapes'
        component_metadata = image_info.get("metadata", {})

        # Log incoming metadata to debug well filtering issues
        logger.info(
            f"🔍 NAPARI PROCESS: Received {data_type} with metadata: {component_metadata} (path: {path})"
        )

        try:
            # Check if this is shapes data
            if data_type == "shapes":
                # Handle shapes/ROIs - just pass the shapes data directly
                shapes_data = image_info.get("shapes", [])
                data = shapes_data
                colormap = None  # Shapes don't use a colormap
            else:
                # Handle image data - load from shared memory or direct data
                shape = image_info.get("shape")
                dtype = image_info.get("dtype")
                shm_name = image_info.get("shm_name")
                direct_data = image_info.get("data")

                # Load the image data
                if shm_name:
                    from multiprocessing import shared_memory

                    try:
                        shm = shared_memory.SharedMemory(name=shm_name)
                        data = np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
                        shm.close()
                        # Unlink shared memory after copying - the viewer is
                        # responsible for cleanup
                        try:
                            shm.unlink()
                        except FileNotFoundError:
                            # Already unlinked (race condition or duplicate message)
                            logger.debug(
                                f"🔬 NAPARI PROCESS: Shared memory {shm_name} already unlinked"
                            )
                        except Exception as e:
                            logger.warning(
                                f"🔬 NAPARI PROCESS: Failed to unlink shared memory {shm_name}: {e}"
                            )
                    except FileNotFoundError:
                        # Shared memory doesn't exist - likely already processed and unlinked
                        logger.error(
                            f"🔬 NAPARI PROCESS: Shared memory {shm_name} not found - may have been already processed"
                        )
                        if image_id:
                            self._send_ack(
                                image_id,
                                status="error",
                                error=f"Shared memory {shm_name} not found",
                            )
                        return
                    except Exception as e:
                        logger.error(
                            f"🔬 NAPARI PROCESS: Failed to open shared memory {shm_name}: {e}"
                        )
                        if image_id:
                            self._send_ack(
                                image_id,
                                status="error",
                                error=f"Failed to open shared memory: {e}",
                            )
                        raise
                elif direct_data:
                    data = np.array(direct_data, dtype=dtype).reshape(shape)
                else:
                    logger.warning("🔬 NAPARI PROCESS: No image data in message")
                    if image_id:
                        self._send_ack(
                            image_id, status="error", error="No image data in message"
                        )
                    return

                # Extract the colormap
                colormap = "viridis"
                if display_config_dict and "colormap" in display_config_dict:
                    colormap = display_config_dict["colormap"]

            # Component-aware layer management (handles both images and shapes)
            _handle_component_aware_display(
                self.viewer,
                self.layers,
                self.component_groups,
                data,
                path,
                colormap,
                display_config_dict or {},
                self.replace_layers,
                component_metadata,
                data_type,
                server=self,
            )

            # Send an acknowledgment that the data was successfully displayed
            if image_id:
                self._send_ack(image_id, status="success")

        except Exception as e:
            logger.error(
                f"🔬 NAPARI PROCESS: Failed to process {data_type} {path}: {e}"
            )
            if image_id:
                self._send_ack(image_id, status="error", error=str(e))
            raise  # Fail loud
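
    # Data-message sketch (an illustration built from the fields read above,
    # not a full protocol spec; the concrete values are hypothetical): a
    # shared-memory image message looks roughly like
    #
    #     {"type": "batch",
    #      "display_config": {...},
    #      "images": [{"image_id": "<uuid>",
    #                  "path": "step_name/A01/s1_c2_z3.tif",
    #                  "data_type": "image",
    #                  "metadata": {"well": "A01", ...},
    #                  "shape": [512, 512], "dtype": "uint16",
    #                  "shm_name": "psm_abc123"}]}
    #
    # while shapes messages carry "data_type": "shapes" and a "shapes" list
    # instead of shape/dtype/shm_name.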


def _napari_viewer_process(
    port: int,
    viewer_title: str,
    replace_layers: bool = False,
    log_file_path: str = None,
    transport_mode: TransportMode = TransportMode.IPC,
):
    """
    Napari viewer process entry point. Runs in a separate process and listens
    for ZeroMQ messages with image data to display.

    Args:
        port: ZMQ port to listen on
        viewer_title: Title for the napari viewer window
        replace_layers: If True, replace existing layers; if False, add new layers with unique names
        log_file_path: Path to log file (for client discovery via ping/pong)
        transport_mode: ZMQ transport mode (IPC or TCP)
    """
    try:
        import zmq
        import napari

        # Create the ZMQ server instance (inherits from the ZMQServer ABC)
        server = NapariViewerServer(
            port, viewer_title, replace_layers, log_file_path, transport_mode
        )

        # Start the server (binds sockets)
        server.start()

        # Create the napari viewer in this process (main thread)
        viewer = napari.Viewer(title=viewer_title, show=True)
        server.viewer = viewer

        # Initialize the layers dictionary with existing layers (for reconnection scenarios)
        for layer in viewer.layers:
            server.layers[layer.name] = layer

        logger.info(
            f"🔬 NAPARI PROCESS: Viewer started on data port {port}, control port {server.control_port}"
        )

        # Add a cleanup handler for when the viewer is closed
        def cleanup_and_exit():
            logger.info("🔬 NAPARI PROCESS: Viewer closed, cleaning up and exiting...")
            try:
                server.stop()
            except Exception:
                pass
            sys.exit(0)

        # Connect the viewer close event to cleanup
        viewer.window.qt_viewer.destroyed.connect(cleanup_and_exit)

        # Use proper Qt event loop integration
        import sys
        from qtpy import QtWidgets, QtCore

        # Ensure the Qt platform is properly set for detached processes
        import os
        import platform

        if "QT_QPA_PLATFORM" not in os.environ:
            if platform.system() == "Darwin":  # macOS
                os.environ["QT_QPA_PLATFORM"] = "cocoa"
            elif platform.system() == "Linux":
                os.environ["QT_QPA_PLATFORM"] = "xcb"
                os.environ["QT_X11_NO_MITSHM"] = "1"
            # Windows doesn't need QT_QPA_PLATFORM set
        elif platform.system() == "Linux":
            # Disable shared memory for X11 (helps with display issues in detached processes)
            os.environ["QT_X11_NO_MITSHM"] = "1"

        # Get the Qt application
        app = QtWidgets.QApplication.instance()
        if app is None:
            app = QtWidgets.QApplication(sys.argv)

        # Ensure the application DOES quit when the napari window closes
        app.setQuitOnLastWindowClosed(True)

        # Set up a QTimer for message processing
        timer = QtCore.QTimer()

        def process_messages():
            # Process control messages (ping/pong handled by the ABC)
            server.process_messages()

            # Process data messages (images) if ready
            if server._ready:
                # Process multiple messages per timer tick for better throughput
                for _ in range(10):  # Process up to 10 messages per tick
                    try:
                        message = server.data_socket.recv(zmq.NOBLOCK)
                        server.process_image_message(message)
                    except zmq.Again:
                        # No more messages available
                        break

        # Connect the timer to message processing
        timer.timeout.connect(process_messages)
        timer.start(50)  # Process messages every 50ms

        logger.info("🔬 NAPARI PROCESS: Starting Qt event loop")

        # Run the Qt event loop - this keeps napari responsive
        app.exec_()

    except Exception as e:
        logger.error(f"🔬 NAPARI PROCESS: Fatal error: {e}")
    finally:
        logger.info("🔬 NAPARI PROCESS: Shutting down")
        if "server" in locals():
            server.stop()


def _spawn_detached_napari_process(
    port: int,
    viewer_title: str,
    replace_layers: bool = False,
    transport_mode: TransportMode = TransportMode.IPC,
) -> subprocess.Popen:
    """
    Spawn a completely detached napari viewer process that survives parent termination.

    This creates a subprocess that runs independently and won't be terminated when
    the parent process exits, enabling true persistence across pipeline runs.

    Args:
        port: ZMQ port to listen on
        viewer_title: Title for the napari viewer window
        replace_layers: If True, replace existing layers; if False, add new layers
        transport_mode: ZMQ transport mode (IPC or TCP)
    """
    # Use a simpler approach: spawn python directly with the napari viewer module.
    # This avoids temporary file issues and import problems.

    # Create the command to run the napari viewer directly
    current_dir = os.getcwd()
    python_code = f"""
import sys
import os

# Detach from parent process group (Unix only)
if hasattr(os, "setsid"):
    try:
        os.setsid()
    except OSError:
        pass

# Add current working directory to Python path
sys.path.insert(0, {repr(current_dir)})

try:
    from openhcs.runtime.napari_stream_visualizer import _napari_viewer_process
    from openhcs.core.config import TransportMode
    transport_mode = TransportMode.{transport_mode.name}
    _napari_viewer_process({port}, {repr(viewer_title)}, {replace_layers}, {repr(current_dir + "/.napari_log_path_placeholder")}, transport_mode)
except Exception as e:
    import logging
    logger = logging.getLogger("openhcs.runtime.napari_detached")
    logger.error(f"Detached napari error: {{e}}")
    import traceback
    logger.error(traceback.format_exc())
    sys.exit(1)
"""

    try:
        # Create a log file for the detached process
        log_dir = os.path.expanduser("~/.local/share/openhcs/logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"napari_detached_port_{port}.log")

        # Replace the placeholder with the actual log file path in the python code
        python_code = python_code.replace(
            repr(current_dir + "/.napari_log_path_placeholder"), repr(log_file)
        )

        # Use subprocess.Popen with detachment flags
        if sys.platform == "win32":
            # Windows: Use CREATE_NEW_PROCESS_GROUP to detach but preserve the display environment
            env = os.environ.copy()  # Preserve environment variables
            with open(log_file, "w") as log_f:
                process = subprocess.Popen(
                    [sys.executable, "-c", python_code],
                    creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
                    | subprocess.DETACHED_PROCESS,
                    env=env,
                    cwd=os.getcwd(),
                    stdout=log_f,
                    stderr=subprocess.STDOUT,
                )
        else:
            # Unix: Use start_new_session to detach but preserve the display environment
            env = os.environ.copy()  # Preserve DISPLAY and other environment variables

            # Ensure the Qt platform is set for GUI display
            import platform

            if "QT_QPA_PLATFORM" not in env:
                if platform.system() == "Darwin":  # macOS
                    env["QT_QPA_PLATFORM"] = "cocoa"
                elif platform.system() == "Linux":
                    env["QT_QPA_PLATFORM"] = "xcb"
                    env["QT_X11_NO_MITSHM"] = "1"
                # Windows doesn't need QT_QPA_PLATFORM set
            elif platform.system() == "Linux":
                # Ensure Qt can find the display; disable shared memory for X11
                # (helps with some display issues)
                env["QT_X11_NO_MITSHM"] = "1"

            # Redirect stdout/stderr to the log file for debugging
            log_f = open(log_file, "w")
            process = subprocess.Popen(
                [sys.executable, "-c", python_code],
                env=env,
                cwd=os.getcwd(),
                stdout=log_f,
                stderr=subprocess.STDOUT,
                start_new_session=True,  # CRITICAL: Detach from parent process group
            )

        logger.info(
            f"🔬 VISUALIZER: Detached napari process started (PID: {process.pid}), logging to {log_file}"
        )
        return process

    except Exception as e:
        logger.error(f"🔬 VISUALIZER: Failed to spawn detached napari process: {e}")
        raise


class NapariStreamVisualizer:
    """
    Manages a Napari viewer instance for real-time visualization of tensors
    streamed from the OpenHCS pipeline. Runs napari in a separate process
    for Qt compatibility and true persistence across pipeline runs.
    """

    def __init__(
        self,
        filemanager: FileManager,
        visualizer_config,
        viewer_title: str = "OpenHCS Real-Time Visualization",
        persistent: bool = True,
        port: int = None,
        replace_layers: bool = False,
        display_config=None,
        transport_mode: TransportMode = TransportMode.IPC,
    ):
        self.filemanager = filemanager
        self.viewer_title = viewer_title
        # If True, the viewer process stays alive after pipeline completion
        self.persistent = persistent
        self.visualizer_config = visualizer_config
        # Use the config class default if not specified
        self.port = (
            port
            if port is not None
            else NapariStreamingConfig.__dataclass_fields__["port"].default
        )
        # If True, replace existing layers; if False, add new layers
        self.replace_layers = replace_layers
        self.display_config = display_config  # Configuration for display behavior
        self.transport_mode = transport_mode  # ZMQ transport mode (IPC or TCP)
        self.process: Optional[multiprocessing.Process] = None
        self.zmq_context: Optional[zmq.Context] = None
        self.zmq_socket: Optional[zmq.Socket] = None
        self._is_running = False  # Internal flag, use the is_running property instead
        # True if connected to a viewer we didn't create
        self._connected_to_existing = False
        self._lock = threading.Lock()

        # Clause 368: Visualization must be observer-only.
        # This class will only read data and display it.

    @property
    def is_running(self) -> bool:
        """
        Check whether the napari viewer is actually running.

        This property checks the actual process state, not just a cached flag.
        Returns True only if the process exists and is alive.
        """
        if not self._is_running:
            return False

        # If we connected to an existing viewer, verify it's still responsive
        if self._connected_to_existing:
            # Quick ping check to verify the viewer is still alive
            if not self._quick_ping_check():
                logger.debug(
                    f"🔬 VISUALIZER: Connected viewer on port {self.port} is no longer responsive"
                )
                self._is_running = False
                self._connected_to_existing = False
                return False
            return True

        if self.process is None:
            self._is_running = False
            return False

        # Check if the process is actually alive
        try:
            if hasattr(self.process, "is_alive"):
                # multiprocessing.Process
                alive = self.process.is_alive()
            else:
                # subprocess.Popen
                alive = self.process.poll() is None

            if not alive:
                logger.debug(
                    f"🔬 VISUALIZER: Napari process on port {self.port} is no longer alive"
                )
                self._is_running = False

            return alive
        except Exception as e:
            logger.warning(f"🔬 VISUALIZER: Error checking process status: {e}")
            self._is_running = False
            return False

    def _quick_ping_check(self) -> bool:
        """Quick ping check to verify the viewer is responsive (for connected viewers)."""
        import zmq
        import pickle
        from openhcs.constants.constants import CONTROL_PORT_OFFSET

        try:
            control_port = self.port + CONTROL_PORT_OFFSET
            control_url = get_zmq_transport_url(
                control_port, self.transport_mode, "localhost"
            )

            ctx = zmq.Context()
            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.LINGER, 0)
            sock.setsockopt(zmq.RCVTIMEO, 200)  # 200ms timeout for a quick check
            sock.connect(control_url)
            sock.send(pickle.dumps({"type": "ping"}))
            response = pickle.loads(sock.recv())
            sock.close()
            ctx.term()
            return response.get("type") == "pong"
        except Exception:
            return False

    def wait_for_ready(self, timeout: float = 10.0) -> bool:
        """
        Wait for the viewer to be ready to receive images.

        This method blocks until the viewer is responsive or the timeout expires.
        Should be called after start_viewer() when using async_mode=True.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if the viewer is ready, False on timeout
        """
        return self._wait_for_viewer_ready(timeout=timeout)
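
    # Typical lifecycle sketch (a hedged example; `filemanager` and `config`
    # are assumed to come from the caller's context, and the port value is
    # hypothetical):
    #
    #     viz = NapariStreamVisualizer(filemanager, config, port=5555)
    #     viz.start_viewer(async_mode=True)   # spawns or attaches to a detached viewer
    #     if viz.wait_for_ready(timeout=10.0):
    #         viz.clear_viewer_state()        # fresh component groups per run
    #     ...
    #     viz.stop_viewer()                   # only tears down non-persistent viewers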

    def _find_free_port(self) -> int:
        """Find a free port for ZeroMQ communication."""
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("", 0))
            return s.getsockname()[1]

    def start_viewer(self, async_mode: bool = True):
        """
        Start the Napari viewer in a separate process.

        Args:
            async_mode: If True, start the viewer asynchronously in a background
                thread. If False, wait for the viewer to be ready before
                returning (legacy behavior).
        """
        if async_mode:
            # Start the viewer asynchronously in a background thread
            thread = threading.Thread(target=self._start_viewer_sync, daemon=True)
            thread.start()
            logger.info(
                f"🔬 VISUALIZER: Starting napari viewer asynchronously on port {self.port}"
            )
        else:
            # Legacy synchronous mode
            self._start_viewer_sync()

    def _start_viewer_sync(self):
        """Internal synchronous viewer startup (called by start_viewer)."""
        global _global_viewer_process, _global_viewer_port

        with self._lock:
            # Check if there's already a napari viewer running on the configured port
            port_in_use = self._is_port_in_use(self.port)
            logger.info(f"🔬 VISUALIZER: Port {self.port} in use: {port_in_use}")

            if port_in_use:
                # Try to connect to the existing viewer first before killing it
                logger.info(
                    f"🔬 VISUALIZER: Port {self.port} is in use, attempting to connect to existing viewer..."
                )
                if self._try_connect_to_existing_viewer(self.port):
                    logger.info(
                        f"🔬 VISUALIZER: Successfully connected to existing viewer on port {self.port}"
                    )
                    self._is_running = True
                    # Mark that we connected to an existing viewer
                    self._connected_to_existing = True
                    return
                else:
                    # The existing viewer is unresponsive - kill it and start fresh
                    logger.info(
                        f"🔬 VISUALIZER: Existing viewer on port {self.port} is unresponsive, killing and restarting..."
                    )
                    # Use the shared method from the ZMQServer ABC
                    from openhcs.runtime.zmq_base import ZMQServer
                    from openhcs.constants.constants import CONTROL_PORT_OFFSET

                    ZMQServer.kill_processes_on_port(self.port)
                    ZMQServer.kill_processes_on_port(self.port + CONTROL_PORT_OFFSET)
                    # Wait a moment for the ports to be freed
                    import time

                    time.sleep(0.5)

            if self._is_running:
                logger.warning("Napari viewer is already running.")
                return

            # The port is already set in __init__
            logger.info(
                f"🔬 VISUALIZER: Starting napari viewer process on port {self.port}"
            )

            # ALL viewers (persistent and non-persistent) are detached subprocesses
            # so they don't block parent process exit. The difference is only
            # whether we terminate them during cleanup.
            logger.info(
                f"🔬 VISUALIZER: Creating {'persistent' if self.persistent else 'non-persistent'} napari viewer (detached)"
            )
            self.process = _spawn_detached_napari_process(
                self.port, self.viewer_title, self.replace_layers, self.transport_mode
            )

            # Only track non-persistent viewers in the global variable for test cleanup
            if not self.persistent:
                with _global_process_lock:
                    _global_viewer_process = self.process
                    _global_viewer_port = self.port

            # Wait for the napari viewer to be ready before setting up ZMQ
            self._wait_for_viewer_ready()

            # Set up the ZeroMQ client
            self._setup_zmq_client()

            # Check if the process is running (different methods for subprocess vs multiprocessing)
            if hasattr(self.process, "is_alive"):
                # multiprocessing.Process
                process_alive = self.process.is_alive()
            else:
                # subprocess.Popen
                process_alive = self.process.poll() is None

            if process_alive:
                self._is_running = True
                logger.info(
                    f"🔬 VISUALIZER: Napari viewer process started successfully (PID: {self.process.pid})"
                )
            else:
                logger.error("🔬 VISUALIZER: Failed to start napari viewer process")

    def _try_connect_to_existing_viewer(self, port: int) -> bool:
        """
        Try to connect to an existing napari viewer and verify it's responsive.

        Returns True only if we can successfully handshake with the viewer.
        """
        import zmq
        import pickle
        from openhcs.constants.constants import CONTROL_PORT_OFFSET

        # Try to ping the control port to verify the viewer is responsive
        control_port = port + CONTROL_PORT_OFFSET
        control_url = get_zmq_transport_url(
            control_port, self.transport_mode, "localhost"
        )
        control_context = None
        control_socket = None

        try:
            control_context = zmq.Context()
            control_socket = control_context.socket(zmq.REQ)
            control_socket.setsockopt(zmq.LINGER, 0)
            control_socket.setsockopt(zmq.RCVTIMEO, 500)  # 500ms timeout
            control_socket.connect(control_url)

            # Send ping
            ping_message = {"type": "ping"}
            control_socket.send(pickle.dumps(ping_message))

            # Wait for pong
            response = control_socket.recv()
            response_data = pickle.loads(response)

            if response_data.get("type") == "pong" and response_data.get("ready"):
                # The viewer is responsive! Set up our ZMQ client
                control_socket.close()
                control_context.term()
                self._setup_zmq_client()
                return True
            else:
                return False

        except Exception as e:
            logger.debug(f"Failed to connect to existing viewer on port {port}: {e}")
            return False
        finally:
            if control_socket:
                try:
                    control_socket.close()
                except Exception:
                    pass
            if control_context:
                try:
                    control_context.term()
                except Exception:
                    pass

    def _is_port_in_use(self, port: int) -> bool:
        """Check if a port/socket is already in use (indicating an existing napari viewer)."""
        if self.transport_mode == TransportMode.IPC:
            # For IPC mode, check whether the socket file exists
            import platform
            from pathlib import Path
            from openhcs.constants.constants import (
                IPC_SOCKET_DIR_NAME,
                IPC_SOCKET_PREFIX,
                IPC_SOCKET_EXTENSION,
            )

            if platform.system() == "Windows":
                # Windows named pipes - can't easily check existence, so always
                # return False (we rely on the ping/pong handshake instead)
                return False
            else:
                # Unix domain sockets - check whether the socket file exists
                ipc_dir = Path.home() / ".openhcs" / IPC_SOCKET_DIR_NAME
                socket_name = f"{IPC_SOCKET_PREFIX}-{port}{IPC_SOCKET_EXTENSION}"
                socket_path = ipc_dir / socket_name
                return socket_path.exists()
        else:
            # TCP mode - check whether the port is bound
            import socket

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(0.1)
            try:
                # Try to bind to the port - if it fails, something is already using it
                sock.bind(("localhost", port))
                sock.close()
                return False  # Port is free
            except OSError:
                # Port is already in use
                sock.close()
                return True
            except Exception:
                return False

    def _wait_for_viewer_ready(self, timeout: float = 10.0) -> bool:
        """Wait for the napari viewer to be ready using the handshake protocol."""
        import zmq
        from openhcs.constants.constants import CONTROL_PORT_OFFSET

        logger.info(
            f"🔬 VISUALIZER: Waiting for napari viewer to be ready on port {self.port}..."
        )

        control_port = self.port + CONTROL_PORT_OFFSET

        # First wait for the ports to be bound
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self._is_port_in_use(self.port) and self._is_port_in_use(control_port):
                break
            time.sleep(0.2)
        else:
            logger.warning("🔬 VISUALIZER: Timeout waiting for ports to be bound")
            return False

        # Now use the handshake protocol - create a fresh socket for each attempt
        control_url = get_zmq_transport_url(
            control_port, self.transport_mode, "localhost"
        )
        start_time = time.time()
        while time.time() - start_time < timeout:
            control_context = zmq.Context()
            control_socket = control_context.socket(zmq.REQ)
            control_socket.setsockopt(zmq.LINGER, 0)
            control_socket.setsockopt(zmq.RCVTIMEO, 1000)  # 1 second timeout

            try:
                control_socket.connect(control_url)

                import pickle

                ping_message = {"type": "ping"}
                control_socket.send(pickle.dumps(ping_message))

                response = control_socket.recv()
                response_data = pickle.loads(response)

                if response_data.get("type") == "pong" and response_data.get("ready"):
                    logger.info(
                        f"🔬 VISUALIZER: Napari viewer is ready on port {self.port}"
                    )
                    return True

            except zmq.Again:
                pass  # Timeout waiting for response
            except Exception as e:
                logger.debug(f"🔬 VISUALIZER: Handshake attempt failed: {e}")
            finally:
                control_socket.close()
                control_context.term()

            time.sleep(0.5)  # Wait before the next ping

        logger.warning("🔬 VISUALIZER: Timeout waiting for napari viewer handshake")
        return False
1733 def _setup_zmq_client(self):
1734 """Set up ZeroMQ client to send data to viewer process."""
1735 if self.port is None:
1736 raise RuntimeError("Port not set - call start_viewer() first")
1738 data_url = get_zmq_transport_url(self.port, self.transport_mode, "localhost")
1740 self.zmq_context = zmq.Context()
1741 self.zmq_socket = self.zmq_context.socket(zmq.PUB)
1742 self.zmq_socket.connect(data_url)
1744 # Brief delay for the PUB/SUB connection to establish (ZeroMQ slow-joiner: messages published before the join completes are silently dropped)
1745 time.sleep(ZMQ_CONNECTION_DELAY_MS / 1000.0)
1746 logger.info(f"🔬 VISUALIZER: ZMQ client connected to {data_url}")
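# --- Illustrative sketch (not part of the source file) ---
# Because the PUB socket above *connects*, the viewer side must *bind* a
# matching SUB socket and subscribe to all topics. A minimal receive loop
# under that assumption; the name `receive_image_messages` is hypothetical:
import zmq

def receive_image_messages(data_url: str) -> None:
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind(data_url)                  # PUB connects, so SUB binds
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # Empty prefix = accept every message
    try:
        while True:
            payload = socket.recv()  # One frame per published image message
            # ... deserialize payload and update the napari layers ...
    finally:
        socket.close()
        context.term()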
1748 def send_control_message(self, message_type: str, timeout: float = 2.0) -> bool:
1749 """
1750 Send a control message to the viewer.
1752 Args:
1753 message_type: Type of control message ('clear_state', 'shutdown', etc.)
1754 timeout: Timeout in seconds for waiting for response
1756 Returns:
1757 True if message was sent and acknowledged, False otherwise
1758 """
1759 if not self.is_running or self.port is None:
1760 logger.warning(
1761 f"🔬 VISUALIZER: Cannot send {message_type} - viewer not running"
1762 )
1763 return False
1765 import zmq
1766 import pickle
1767 from openhcs.constants.constants import CONTROL_PORT_OFFSET
1769 control_port = self.port + CONTROL_PORT_OFFSET
1770 control_url = get_zmq_transport_url(
1771 control_port, self.transport_mode, "localhost"
1772 )
1773 control_context = None
1774 control_socket = None
1776 try:
1777 control_context = zmq.Context()
1778 control_socket = control_context.socket(zmq.REQ)
1779 control_socket.setsockopt(zmq.LINGER, 0)
1780 control_socket.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
1781 control_socket.connect(control_url)
1783 # Send control message
1784 message = {"type": message_type}
1785 control_socket.send(pickle.dumps(message))
1787 # Wait for acknowledgment
1788 response = control_socket.recv()
1789 response_data = pickle.loads(response)
1791 if response_data.get("status") == "success":
1792 logger.info(f"🔬 VISUALIZER: {message_type} acknowledged by viewer")
1793 return True
1794 else:
1795 logger.warning(f"🔬 VISUALIZER: {message_type} failed: {response_data}")
1796 return False
1798 except zmq.Again:
1799 logger.warning(
1800 f"🔬 VISUALIZER: Timeout waiting for {message_type} acknowledgment"
1801 )
1802 return False
1803 except Exception as e:
1804 logger.warning(f"🔬 VISUALIZER: Failed to send {message_type}: {e}")
1805 return False
1806 finally:
1807 if control_socket:
1808 try:
1809 control_socket.close()
1810 except Exception as e:
1811 logger.debug(f"Failed to close control socket: {e}")
1812 if control_context:
1813 try:
1814 control_context.term()
1815 except Exception as e:
1816 logger.debug(f"Failed to terminate control context: {e}")
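# --- Illustrative usage (not part of the source file) ---
# The control channel above opens a fresh REQ socket per message, sends one
# pickled request, and waits up to `timeout` seconds for the viewer's reply.
# Hypothetical call sites:
#
#     visualizer.send_control_message("clear_state")          # reset layer state
#     visualizer.send_control_message("shutdown", timeout=5)  # ask viewer to exit
#
# A fresh socket per message sidesteps REQ/REP lockstep: a long-lived REQ
# socket that timed out mid-exchange could never send again.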
1818 def clear_viewer_state(self) -> bool:
1819 """
1820 Clear accumulated viewer state (component groups) for a new pipeline run.
1822 Returns:
1823 True if state was cleared successfully, False otherwise
1824 """
1825 return self.send_control_message("clear_state")
1827 def send_image_data(
1828 self, step_id: str, image_data: np.ndarray, axis_id: str = "unknown"
1829 ):
1830 """
1831 DISABLED: This method bypasses component-aware stacking.
1832 All visualization must go through the streaming backend.
1833 """
1834 raise RuntimeError(
1835 f"send_image_data() is disabled. Use streaming backend for component-aware display. "
1836 f"step_id: {step_id}, axis_id: {axis_id}, shape: {image_data.shape}"
1837 )
1839 def stop_viewer(self):
1840 """Stop the napari viewer process (only if not persistent)."""
1841 with self._lock:
1842 if not self.persistent:
1843 logger.info("🔬 VISUALIZER: Stopping non-persistent napari viewer")
1844 self._cleanup_zmq()
1845 if self.process:
1846 # Handle both subprocess and multiprocessing process types
1847 if hasattr(self.process, "is_alive"):
1848 # multiprocessing.Process
1849 if self.process.is_alive():
1850 self.process.terminate()
1851 self.process.join(timeout=5)
1852 if self.process.is_alive():
1853 logger.warning(
1854 "🔬 VISUALIZER: Force killing napari viewer process"
1855 )
1856 self.process.kill()
1857 else:
1858 # subprocess.Popen
1859 if self.process.poll() is None: # Still running
1860 self.process.terminate()
1861 try:
1862 self.process.wait(timeout=5)
1863 except subprocess.TimeoutExpired:
1864 logger.warning(
1865 "🔬 VISUALIZER: Force killing napari viewer process"
1866 )
1867 self.process.kill()
1868 self._is_running = False
1869 else:
1870 logger.info("🔬 VISUALIZER: Keeping persistent napari viewer alive")
1871 # Just cleanup our ZMQ connection, leave process running
1872 self._cleanup_zmq()
1873 # DON'T set is_running = False for persistent viewers!
1874 # The process is still alive and should be reusable
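# --- Illustrative sketch (not part of the source file) ---
# stop_viewer() duck-types two process APIs: multiprocessing.Process exposes
# is_alive()/join(), while subprocess.Popen exposes poll()/wait(). A condensed
# version of the same terminate-then-kill escalation; the name `stop_process`
# is hypothetical:
import subprocess

def stop_process(process, timeout: float = 5.0) -> None:
    if hasattr(process, "is_alive"):  # multiprocessing.Process
        if process.is_alive():
            process.terminate()
            process.join(timeout=timeout)
            if process.is_alive():
                process.kill()  # Escalate when terminate does not stick
    elif process.poll() is None:      # subprocess.Popen, still running
        process.terminate()
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()            # Escalate when terminate does not stick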
1876 def _cleanup_zmq(self):
1877 """Clean up ZeroMQ resources."""
1878 if self.zmq_socket:
1879 self.zmq_socket.close()
1880 self.zmq_socket = None
1881 if self.zmq_context:
1882 self.zmq_context.term()
1883 self.zmq_context = None
1885 def visualize_path(
1886 self, step_id: str, path: str, backend: str, axis_id: Optional[str] = None
1887 ):
1888 """
1889 DISABLED: This method bypasses component-aware stacking.
1890 All visualization must go through the streaming backend.
1891 """
1892 raise RuntimeError(
1893 f"visualize_path() is disabled. Use streaming backend for component-aware display. "
1894 f"Path: {path}, step_id: {step_id}, axis_id: {axis_id}"
1895 )
1897 def _prepare_data_for_display(
1898 self, data: Any, step_id_for_log: str, display_config=None
1899 ) -> Optional[np.ndarray]:
1900 """Converts loaded data to a displayable NumPy array (slice or stack based on config)."""
1901 cpu_tensor: Optional[np.ndarray] = None
1902 try:
1903 # GPU to CPU conversion logic
1904 if hasattr(data, "is_cuda") and data.is_cuda: # PyTorch
1905 cpu_tensor = data.cpu().numpy()
1906 elif (
1907 hasattr(data, "device") and "cuda" in str(data.device).lower()
1908 ): # CuPy/JAX-style arrays expose a device attribute
1909 if hasattr(data, "get"): # CuPy: .get() copies device to host
1910 cpu_tensor = data.get()
1911 elif hasattr(
1912 data, "numpy"
1913 ): # JAX on GPU can be materialized on the host
1914 cpu_tensor = np.asarray(
1915 data
1916 ) # np.asarray performs the device-to-host transfer for JAX
1917 else: # Fallback for other GPU array types if possible
1918 logger.warning(
1919 f"Unknown GPU array type for step '{step_id_for_log}'. Attempting .numpy()."
1920 )
1921 if hasattr(data, "numpy"):
1922 cpu_tensor = data.numpy()
1923 else:
1924 logger.error(
1925 f"Cannot convert GPU tensor of type {type(data)} for step '{step_id_for_log}'."
1926 )
1927 return None
1928 elif isinstance(data, np.ndarray):
1929 cpu_tensor = data
1930 else:
1931 # Attempt to convert to numpy array if it's some other array-like structure
1932 try:
1933 cpu_tensor = np.asarray(data)
1934 logger.debug(
1935 f"Converted data of type {type(data)} to numpy array for step '{step_id_for_log}'."
1936 )
1937 except Exception as e_conv:
1938 logger.warning(
1939 f"Unsupported data type for step '{step_id_for_log}': {type(data)}. Error: {e_conv}"
1940 )
1941 return None
1943 if cpu_tensor is None: # Should not happen if logic above is correct
1944 return None
1946 # Determine display mode based on configuration
1947 # Default behavior: show as stack unless config specifies otherwise
1948 should_slice = False
1950 if display_config:
1951 # Check if any component mode is set to SLICE
1952 from openhcs.core.config import NapariDimensionMode
1953 from openhcs.constants import AllComponents
1955 # Check individual component mode fields for all dimensions
1956 for component in AllComponents:
1957 field_name = f"{component.value}_mode"
1958 if hasattr(display_config, field_name):
1959 mode = getattr(display_config, field_name)
1960 if mode == NapariDimensionMode.SLICE:
1961 should_slice = True
1962 break
1963 else:
1964 # Default: slice for backward compatibility
1965 should_slice = True
1967 # Slicing/stacking logic
1968 display_data: Optional[np.ndarray] = None
1970 if should_slice:
1971 # Original slicing behavior
1972 if cpu_tensor.ndim == 3: # ZYX
1973 display_data = cpu_tensor[cpu_tensor.shape[0] // 2, :, :]
1974 elif cpu_tensor.ndim == 2: # YX
1975 display_data = cpu_tensor
1976 elif cpu_tensor.ndim > 3: # e.g. CZYX or TZYX
1977 logger.debug(
1978 f"Tensor for step '{step_id_for_log}' has ndim > 3 ({cpu_tensor.ndim}). Taking a slice."
1979 )
1980 slicer = [0] * (cpu_tensor.ndim - 2) # Fix leading axes (channel/time) at index 0
1981 slicer[-1] = cpu_tensor.shape[-3] // 2 # Take the middle Z plane
1982 try:
1983 display_data = cpu_tensor[tuple(slicer)]
1984 except IndexError: # Slicing can fail for degenerate (very small) dimensions
1987 logger.error(
1988 f"Slicing failed for tensor with shape {cpu_tensor.shape} for step '{step_id_for_log}'.",
1989 exc_info=True,
1990 )
1991 display_data = None
1992 else:
1993 logger.warning(
1994 f"Tensor for step '{step_id_for_log}' has unsupported ndim for slicing: {cpu_tensor.ndim}."
1995 )
1996 return None
1997 else:
1998 # Stack mode: send the full data to napari (napari can handle 3D+ data)
1999 if cpu_tensor.ndim >= 2:
2000 display_data = cpu_tensor
2001 logger.debug(
2002 f"Sending {cpu_tensor.ndim}D stack to napari for step '{step_id_for_log}' (shape: {cpu_tensor.shape})"
2003 )
2004 else:
2005 logger.warning(
2006 f"Tensor for step '{step_id_for_log}' has unsupported ndim for stacking: {cpu_tensor.ndim}."
2007 )
2008 return None
2010 return display_data.copy() if display_data is not None else None
2012 except Exception as e:
2013 logger.error(
2014 f"Error preparing data from step '{step_id_for_log}' for display: {e}",
2015 exc_info=True,
2016 )
2017 return None
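# --- Illustrative sketch (not part of the source file) ---
# For ndim > 3 in slice mode, the method above builds an index tuple that
# fixes every leading axis at 0 and takes the middle plane along the Z axis
# (axis -3), leaving a 2D YX image. A worked example with a CZYX array:
import numpy as np

volume = np.zeros((2, 9, 64, 64))   # CZYX: 2 channels, 9 Z planes, 64x64 YX
slicer = [0] * (volume.ndim - 2)    # [0, 0]: fix channel, placeholder for Z
slicer[-1] = volume.shape[-3] // 2  # Middle Z plane: index 9 // 2 == 4
plane = volume[tuple(slicer)]       # Equivalent to volume[0, 4]
assert plane.shape == (64, 64)      # A single YX slice ready for display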