Coverage for openhcs/runtime/napari_stream_visualizer.py: 0.9%

894 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Napari-based real-time visualization module for OpenHCS. 

3 

4This module provides the NapariStreamVisualizer class for real-time 

5visualization of tensors during pipeline execution. 

6 

7Doctrinal Clauses: 

8- Clause 65 — No Fallback Logic 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11- Clause 368 — Visualization Must Be Observer-Only 

12""" 

13 

14import logging 

15import multiprocessing 

16import os 

17import subprocess 

18import sys 

19import threading 

20import time 

21import zmq 

22import numpy as np 

23from typing import Any, Dict, Optional 

24from qtpy.QtCore import QTimer 

25 

26from openhcs.io.filemanager import FileManager 

27from openhcs.utils.import_utils import optional_import 

28from openhcs.runtime.zmq_base import ZMQServer, SHARED_ACK_PORT, get_zmq_transport_url 

29from openhcs.runtime.zmq_messages import ImageAck 

30from openhcs.core.config import TransportMode, NapariStreamingConfig 

31 

32# Optional napari import - this module should only be imported if napari is available 

33napari = optional_import("napari") 

34if napari is None: 

35 raise ImportError( 

36 "napari is required for NapariStreamVisualizer. " 

37 "Install it with: pip install 'openhcs[viz]' or pip install napari" 

38 ) 

39 

40 

41logger = logging.getLogger(__name__) 

42 

43# ZMQ connection delay (ms) 

44ZMQ_CONNECTION_DELAY_MS = 100 # Brief delay for ZMQ connection to establish 

45 

46# Global process management for napari viewer 

47_global_viewer_process: Optional[multiprocessing.Process] = None 

48_global_viewer_port: Optional[int] = None 

49_global_process_lock = threading.Lock() 

50 

51# Registry of data type handlers (will be populated after helper functions are defined) 

52_DATA_TYPE_HANDLERS = None 

53 

54 

55def _cleanup_global_viewer() -> None: 

56 """ 

57 Clean up global napari viewer process for test mode. 

58 

59 This forcibly terminates the napari viewer process to allow pytest to exit. 

60 Should only be called in test mode. 

61 """ 

62 global _global_viewer_process 

63 

64 with _global_process_lock: 

65 if _global_viewer_process and _global_viewer_process.is_alive(): 

66 logger.info("🔬 VISUALIZER: Terminating napari viewer for test cleanup") 

67 _global_viewer_process.terminate() 

68 _global_viewer_process.join(timeout=3) 

69 

70 if _global_viewer_process.is_alive(): 

71 logger.warning("🔬 VISUALIZER: Force killing napari viewer process") 

72 _global_viewer_process.kill() 

73 _global_viewer_process.join(timeout=1) 

74 

75 _global_viewer_process = None 

76 

77 

78def _parse_component_info_from_path(path_str: str): 

79 """ 

80 Fallback component parsing from path (used when component metadata unavailable). 

81 

82 Args: 

83 path_str: Path string like 'step_name/A01/s1_c2_z3.tif' 

84 

85 Returns: 

86 Dict with basic component info extracted from filename 

87 """ 

88 try: 

89 import os 

90 import re 

91 

92 filename = os.path.basename(path_str) 

93 

94 # Basic regex for common patterns 

95 pattern = r"(?:s(\d+))?(?:_c(\d+))?(?:_z(\d+))?" 

96 match = re.search(pattern, filename) 

97 

98 components = {} 

99 if match: 

100 site, channel, z_index = match.groups() 

101 if site: 

102 components["site"] = site 

103 if channel: 

104 components["channel"] = channel 

105 if z_index: 

106 components["z_index"] = z_index 

107 

108 return components 

109 except Exception: 

110 return {} 

111 

112 

113def _build_nd_shapes(layer_items, stack_components): 

114 """ 

115 Build nD shapes by prepending stack component indices to 2D shape coordinates. 

116 

117 Args: 

118 layer_items: List of items with 'data' (shapes_data) and 'components' 

119 stack_components: List of component names to stack 

120 

121 Returns: 

122 Tuple of (all_shapes_nd, all_shape_types, all_properties) 

123 """ 

124 from openhcs.runtime.roi_converters import NapariROIConverter 

125 

126 all_shapes_nd = [] 

127 all_shape_types = [] 

128 all_properties = {"label": [], "area": [], "centroid_y": [], "centroid_x": []} 

129 

130 # Build component value to index mapping (same as _build_nd_image_array) 

131 component_values = {} 

132 for comp in stack_components: 

133 values = sorted(set(item["components"].get(comp, 0) for item in layer_items)) 

134 component_values[comp] = values 

135 

136 for item in layer_items: 

137 shapes_data = item["data"] # List of shape dicts 

138 components = item["components"] 

139 

140 # Get stack component INDICES to prepend (not values!) 

141 prepend_dims = [ 

142 component_values[comp].index(components.get(comp, 0)) 

143 for comp in stack_components 

144 ] 

145 

146 # Convert each shape to nD 

147 for shape_dict in shapes_data: 

148 # Use registry-based dimension handler 

149 nd_coords = NapariROIConverter.add_dimensions_to_shape( 

150 shape_dict, prepend_dims 

151 ) 

152 all_shapes_nd.append(nd_coords) 

153 all_shape_types.append(shape_dict["type"]) 

154 

155 # Extract properties 

156 metadata = shape_dict.get("metadata", {}) 

157 centroid = metadata.get("centroid", (0, 0)) 

158 all_properties["label"].append(metadata.get("label", "")) 

159 all_properties["area"].append(metadata.get("area", 0)) 

160 all_properties["centroid_y"].append(centroid[0]) 

161 all_properties["centroid_x"].append(centroid[1]) 

162 

163 return all_shapes_nd, all_shape_types, all_properties 

164 

165 

166def _build_nd_image_array(layer_items, stack_components): 

167 """ 

168 Build nD image array by stacking images along stack component dimensions. 

169 

170 Args: 

171 layer_items: List of items with 'data' (image arrays) and 'components' 

172 stack_components: List of component names to stack 

173 

174 Returns: 

175 np.ndarray: Stacked image array 

176 """ 

177 if len(stack_components) == 1: 

178 # Single stack component - simple 3D stack 

179 image_stack = [img["data"] for img in layer_items] 

180 from openhcs.core.memory.stack_utils import stack_slices 

181 

182 return stack_slices(image_stack, memory_type="numpy", gpu_id=0) 

183 else: 

184 # Multiple stack components - create multi-dimensional array 

185 component_values = {} 

186 for comp in stack_components: 

187 values = sorted(set(img["components"].get(comp, 0) for img in layer_items)) 

188 component_values[comp] = values 

189 

190 # Log component values for debugging 

191 logger.info( 

192 f"🔬 NAPARI PROCESS: Building nD array with stack_components={stack_components}, component_values={component_values}" 

193 ) 

194 

195 # Create empty array with shape (comp1_size, comp2_size, ..., y, x) 

196 first_img = layer_items[0]["data"] 

197 stack_shape = ( 

198 tuple(len(component_values[comp]) for comp in stack_components) 

199 + first_img.shape 

200 ) 

201 stacked_array = np.zeros(stack_shape, dtype=first_img.dtype) 

202 logger.info( 

203 f"🔬 NAPARI PROCESS: Created nD array with shape {stack_shape} from {len(layer_items)} items" 

204 ) 

205 

206 # Fill array 

207 for img in layer_items: 

208 # Get indices for this image 

209 indices = tuple( 

210 component_values[comp].index(img["components"].get(comp, 0)) 

211 for comp in stack_components 

212 ) 

213 logger.debug( 

214 f"🔬 NAPARI PROCESS: Placing image at indices {indices}, components={img['components']}" 

215 ) 

216 stacked_array[indices] = img["data"] 

217 

218 return stacked_array 

219 

220 

221def _create_or_update_shapes_layer( 

222 viewer, layers, layer_name, shapes_data, shape_types, properties 

223): 

224 """ 

225 Create or update a Napari shapes layer. 

226 

227 Note: Shapes layers don't handle .data updates well when dimensions change. 

228 We remove and recreate the layer instead of updating in place. 

229 

230 Args: 

231 viewer: Napari viewer 

232 layers: Dict of existing layers 

233 layer_name: Name for the layer 

234 shapes_data: List of shape coordinate arrays 

235 shape_types: List of shape type strings 

236 properties: Dict of properties 

237 

238 Returns: 

239 The created or updated layer 

240 """ 

241 # Check if layer exists 

242 existing_layer = None 

243 for layer in viewer.layers: 

244 if layer.name == layer_name: 

245 existing_layer = layer 

246 break 

247 

248 if existing_layer is not None: 

249 # For shapes, we need to remove and recreate because .data assignment doesn't work 

250 # when dimensions change. But we need to be careful: removing the layer will trigger 

251 # the reconciliation code to clear component_groups on the NEXT data arrival. 

252 # 

253 # Solution: Remove the layer, but immediately recreate it so it exists when 

254 # reconciliation runs on the next well. 

255 viewer.layers.remove(existing_layer) 

256 layers.pop(layer_name, None) 

257 logger.info( 

258 f"🔬 NAPARI PROCESS: Removed existing shapes layer {layer_name} for recreation" 

259 ) 

260 

261 # Create new layer (or recreate if we just removed it) 

262 new_layer = viewer.add_shapes( 

263 shapes_data, 

264 shape_type=shape_types, 

265 properties=properties, 

266 name=layer_name, 

267 edge_color="red", 

268 face_color="transparent", 

269 edge_width=2, 

270 ) 

271 layers[layer_name] = new_layer 

272 logger.info( 

273 f"🔬 NAPARI PROCESS: Created shapes layer {layer_name} with {len(shapes_data)} shapes" 

274 ) 

275 return new_layer 

276 

277 

278def _create_or_update_image_layer( 

279 viewer, layers, layer_name, image_data, colormap, axis_labels=None 

280): 

281 """ 

282 Create or update a Napari image layer. 

283 

284 Args: 

285 viewer: Napari viewer 

286 layers: Dict of existing layers 

287 layer_name: Name for the layer 

288 image_data: Image array 

289 colormap: Colormap name 

290 axis_labels: Optional tuple of axis label strings for dimension names 

291 

292 Returns: 

293 The created or updated layer 

294 """ 

295 # Check if layer exists 

296 existing_layer = None 

297 for layer in viewer.layers: 

298 if layer.name == layer_name: 

299 existing_layer = layer 

300 break 

301 

302 if existing_layer is not None: 

303 old_shape = existing_layer.data.shape 

304 new_shape = image_data.shape 

305 if old_shape != new_shape: 

306 logger.info( 

307 f"🔬 NAPARI PROCESS: Layer {layer_name} shape changing: {old_shape}{new_shape}" 

308 ) 

309 existing_layer.data = image_data 

310 if colormap: 

311 existing_layer.colormap = colormap 

312 layers[layer_name] = existing_layer 

313 logger.info( 

314 f"🔬 NAPARI PROCESS: Updated existing image layer {layer_name} (shape: {new_shape})" 

315 ) 

316 return existing_layer 

317 else: 

318 new_layer = viewer.add_image( 

319 image_data, name=layer_name, colormap=colormap or "gray" 

320 ) 

321 

322 # Set axis labels on viewer.dims (add_image axis_labels parameter doesn't work) 

323 # See: https://forum.image.sc/t/rename-napari-dimension-slider-labels/41974 

324 if axis_labels is not None: 

325 viewer.dims.axis_labels = axis_labels 

326 logger.info(f"🔬 NAPARI PROCESS: Set viewer.dims.axis_labels={axis_labels}") 

327 

328 layers[layer_name] = new_layer 

329 logger.info(f"🔬 NAPARI PROCESS: Created new image layer {layer_name}") 

330 return new_layer 

331 

332 

333# Populate registry now that helper functions are defined 

334from openhcs.constants.streaming import StreamingDataType 

335 

336_DATA_TYPE_HANDLERS = { 

337 StreamingDataType.IMAGE: { 

338 "build_nd_data": _build_nd_image_array, 

339 "create_layer": _create_or_update_image_layer, 

340 }, 

341 StreamingDataType.SHAPES: { 

342 "build_nd_data": _build_nd_shapes, 

343 "create_layer": _create_or_update_shapes_layer, 

344 }, 

345} 

346 

347 

348def _handle_component_aware_display( 

349 viewer, 

350 layers, 

351 component_groups, 

352 data, 

353 path, 

354 colormap, 

355 display_config, 

356 replace_layers, 

357 component_metadata=None, 

358 data_type="image", 

359 server=None, 

360): 

361 """ 

362 Handle component-aware display following OpenHCS stacking patterns. 

363 

364 Components marked as SLICE create separate layers, components marked as STACK are stacked together. 

365 Layer naming follows canonical component order from display config. 

366 

367 Args: 

368 data_type: 'image' for image data, 'shapes' for ROI/shapes data (string or StreamingDataType enum) 

369 server: NapariViewerServer instance (needed for debounced updates) 

370 """ 

371 try: 

372 # Convert data_type to enum if needed (for backwards compatibility) 

373 if isinstance(data_type, str): 

374 data_type = StreamingDataType(data_type) 

375 

376 # Use component metadata from ZMQ message - fail loud if not available 

377 if not component_metadata: 

378 raise ValueError(f"No component metadata available for path: {path}") 

379 component_info = component_metadata 

380 

381 # Build component_modes and component_order from config (dict or object) 

382 component_modes = None 

383 component_order = None 

384 

385 if isinstance(display_config, dict): 

386 cm = display_config.get("component_modes") or display_config.get( 

387 "componentModes" 

388 ) 

389 if isinstance(cm, dict) and cm: 

390 component_modes = cm 

391 component_order = display_config["component_order"] 

392 else: 

393 # Handle object-like config (NapariDisplayConfig) 

394 component_order = display_config.COMPONENT_ORDER 

395 component_modes = {} 

396 for component in component_order: 

397 mode_field = f"{component}_mode" 

398 if hasattr(display_config, mode_field): 

399 mode_value = getattr(display_config, mode_field) 

400 component_modes[component] = getattr( 

401 mode_value, "value", str(mode_value) 

402 ) 

403 

404 # Generic layer naming - iterate over components in canonical order 

405 # Components in SLICE mode create separate layers 

406 # Components in STACK mode are combined into the same layer 

407 

408 layer_key_parts = [] 

409 for component in component_order: 

410 mode = component_modes.get(component) 

411 if mode == "slice" and component in component_info: 

412 value = component_info[component] 

413 layer_key_parts.append(f"{component}_{value}") 

414 

415 layer_key = "_".join(layer_key_parts) if layer_key_parts else "default_layer" 

416 

417 # Log component modes for debugging 

418 logger.info( 

419 f"🔍 NAPARI PROCESS: component_modes={component_modes}, layer_key='{layer_key}'" 

420 ) 

421 

422 # Add "_shapes" suffix for shapes layers to distinguish from image layers 

423 # MUST happen BEFORE reconciliation so we check the correct layer name 

424 if data_type == StreamingDataType.SHAPES: 

425 layer_key = f"{layer_key}_shapes" 

426 

427 # Log layer key and component info for debugging 

428 logger.info( 

429 f"🔍 NAPARI PROCESS: layer_key='{layer_key}', component_info={component_info}" 

430 ) 

431 

432 # Reconcile cached layer/group state with live napari viewer after possible manual deletions 

433 # CRITICAL: Only purge if the layer WAS in our cache but is now missing from viewer 

434 # (user manually deleted it). Do NOT purge if layer was never created yet (debounced update pending). 

435 try: 

436 current_layer_names = {l.name for l in viewer.layers} 

437 if layer_key not in current_layer_names and layer_key in layers: 

438 # Layer was in our cache but is now missing from viewer - user deleted it 

439 # Drop stale references so we will recreate the layer 

440 num_items = len(component_groups.get(layer_key, [])) 

441 layers.pop(layer_key, None) 

442 component_groups.pop(layer_key, None) 

443 logger.info( 

444 f"🔬 NAPARI PROCESS: Reconciling state — '{layer_key}' was deleted from viewer; purged stale caches (had {num_items} items in component_groups)" 

445 ) 

446 except Exception: 

447 # Fail-loud elsewhere; reconciliation is best-effort and must not mask display 

448 pass 

449 

450 # Initialize layer group if needed 

451 if layer_key not in component_groups: 

452 component_groups[layer_key] = [] 

453 

454 # Handle replace_layers mode: clear all items for this layer_key 

455 if replace_layers and component_groups[layer_key]: 

456 logger.info( 

457 f"🔬 NAPARI PROCESS: replace_layers=True, clearing {len(component_groups[layer_key])} existing items from layer '{layer_key}'" 

458 ) 

459 component_groups[layer_key] = [] 

460 

461 # Check if an item with the same component_info AND data_type already exists 

462 # If so, replace it instead of appending (prevents accumulation across runs) 

463 # CRITICAL: Must include 'well' in comparison even if it's in STACK mode, 

464 # otherwise images from different wells with same channel/z/field will be treated as duplicates 

465 # CRITICAL: Must also check data_type to prevent images and ROIs from being treated as duplicates 

466 existing_index = None 

467 for i, item in enumerate(component_groups[layer_key]): 

468 # Compare ALL components including well AND data_type 

469 if item["components"] == component_info and item["data_type"] == data_type: 

470 logger.info( 

471 f"🔬 NAPARI PROCESS: Found duplicate - component_info: {component_info}, data_type: {data_type} at index {i}" 

472 ) 

473 existing_index = i 

474 break 

475 

476 new_item = { 

477 "data": data, 

478 "components": component_info, 

479 "path": str(path), 

480 "data_type": data_type, 

481 } 

482 

483 if existing_index is not None: 

484 # Replace existing item with same components and data type 

485 old_data_type = component_groups[layer_key][existing_index]["data_type"] 

486 component_groups[layer_key][existing_index] = new_item 

487 logger.info( 

488 f"🔬 NAPARI PROCESS: Replaced {old_data_type} item in component_groups[{layer_key}] at index {existing_index}, total items: {len(component_groups[layer_key])}" 

489 ) 

490 else: 

491 # Add new item 

492 component_groups[layer_key].append(new_item) 

493 logger.info( 

494 f"🔬 NAPARI PROCESS: Added {data_type} to component_groups[{layer_key}], now has {len(component_groups[layer_key])} items" 

495 ) 

496 

497 # Schedule debounced layer update instead of immediate update 

498 # This prevents race conditions when multiple items arrive rapidly 

499 if server is None: 

500 raise ValueError("Server instance required for debounced updates") 

501 logger.info( 

502 f"🔬 NAPARI PROCESS: Scheduling debounced update for {layer_key} (data_type={data_type})" 

503 ) 

504 server._schedule_layer_update( 

505 layer_key, data_type, component_modes, component_order 

506 ) 

507 

508 except Exception as e: 

509 import traceback 

510 

511 logger.error( 

512 f"🔬 NAPARI PROCESS: Component-aware display failed for {path}: {e}" 

513 ) 

514 logger.error( 

515 f"🔬 NAPARI PROCESS: Component-aware display traceback: {traceback.format_exc()}" 

516 ) 

517 raise # Fail loud - no fallback 

518 

519 

520def _old_immediate_update_logic_removed(): 

521 """ 

522 Old immediate update logic removed in favor of debounced updates. 

523 Kept as reference for the variable size handling logic that needs to be ported. 

524 """ 

525 pass 

526 # Old code was here - removed to prevent race conditions 

527 # Now using _schedule_layer_update -> _execute_layer_update -> _update_image_layer/_update_shapes_layer 

528 

529 

530class NapariViewerServer(ZMQServer): 

531 """ 

532 ZMQ server for Napari viewer that receives images from clients. 

533 

534 Inherits from ZMQServer ABC to get ping/pong, port management, etc. 

535 Uses SUB socket to receive images from pipeline clients. 

536 """ 

537 

538 _server_type = "napari" # Registration key for AutoRegisterMeta 

539 

540 def __init__( 

541 self, 

542 port: int, 

543 viewer_title: str, 

544 replace_layers: bool = False, 

545 log_file_path: str = None, 

546 transport_mode: TransportMode = TransportMode.IPC, 

547 ): 

548 """ 

549 Initialize Napari viewer server. 

550 

551 Args: 

552 port: Data port for receiving images (control port will be port + 1000) 

553 viewer_title: Title for the napari viewer window 

554 replace_layers: If True, replace existing layers; if False, add new layers 

555 log_file_path: Path to log file (for client discovery) 

556 transport_mode: ZMQ transport mode (IPC or TCP) 

557 """ 

558 import zmq 

559 

560 # Initialize with SUB socket for receiving images 

561 super().__init__( 

562 port, 

563 host="*", 

564 log_file_path=log_file_path, 

565 data_socket_type=zmq.SUB, 

566 transport_mode=transport_mode, 

567 ) 

568 

569 self.viewer_title = viewer_title 

570 self.replace_layers = replace_layers 

571 self.viewer = None 

572 self.layers = {} 

573 self.component_groups = {} 

574 

575 # Debouncing + locking for layer updates to prevent race conditions 

576 import threading 

577 

578 self.layer_update_lock = threading.Lock() # Prevent concurrent updates 

579 self.pending_updates = {} # layer_key -> QTimer (debounce) 

580 self.update_delay_ms = 1000 # Wait 200ms for more items before rebuilding 

581 

582 # Create PUSH socket for sending acknowledgments to shared ack port 

583 self.ack_socket = None 

584 self._setup_ack_socket() 

585 

586 def _setup_ack_socket(self): 

587 """Setup PUSH socket for sending acknowledgments.""" 

588 import zmq 

589 

590 try: 

591 ack_url = get_zmq_transport_url( 

592 SHARED_ACK_PORT, self.transport_mode, "localhost" 

593 ) 

594 

595 context = zmq.Context.instance() 

596 self.ack_socket = context.socket(zmq.PUSH) 

597 self.ack_socket.connect(ack_url) 

598 logger.info(f"🔬 NAPARI SERVER: Connected ack socket to {ack_url}") 

599 except Exception as e: 

600 logger.warning(f"🔬 NAPARI SERVER: Failed to setup ack socket: {e}") 

601 self.ack_socket = None 

602 

603 def _schedule_layer_update( 

604 self, layer_key, data_type, component_modes, component_order 

605 ): 

606 """ 

607 Schedule a debounced layer update. 

608 

609 Cancels any pending update for this layer and schedules a new one. 

610 This prevents race conditions when multiple items arrive rapidly. 

611 """ 

612 # Cancel existing timer if any 

613 if layer_key in self.pending_updates: 

614 self.pending_updates[layer_key].stop() 

615 logger.debug(f"🔬 NAPARI PROCESS: Cancelled pending update for {layer_key}") 

616 

617 # Create new timer 

618 timer = QTimer() 

619 timer.setSingleShot(True) 

620 timer.timeout.connect( 

621 lambda: self._execute_layer_update( 

622 layer_key, data_type, component_modes, component_order 

623 ) 

624 ) 

625 timer.start(self.update_delay_ms) 

626 self.pending_updates[layer_key] = timer 

627 logger.debug( 

628 f"🔬 NAPARI PROCESS: Scheduled update for {layer_key} in {self.update_delay_ms}ms" 

629 ) 

630 

631 def _execute_layer_update( 

632 self, layer_key, data_type, component_modes, component_order 

633 ): 

634 """ 

635 Execute the actual layer update after debounce delay. 

636 

637 Uses a lock to prevent concurrent updates to different layers. 

638 """ 

639 # Remove timer 

640 self.pending_updates.pop(layer_key, None) 

641 

642 # Acquire lock to prevent concurrent updates 

643 with self.layer_update_lock: 

644 logger.info( 

645 f"🔬 NAPARI PROCESS: Executing debounced update for {layer_key}" 

646 ) 

647 

648 # Get current items for this layer 

649 layer_items = self.component_groups.get(layer_key, []) 

650 if not layer_items: 

651 logger.warning( 

652 f"🔬 NAPARI PROCESS: No items found for {layer_key}, skipping update" 

653 ) 

654 return 

655 

656 # Log layer composition 

657 wells_in_layer = set( 

658 item["components"].get("well", "unknown") for item in layer_items 

659 ) 

660 logger.info( 

661 f"🔬 NAPARI PROCESS: layer_key='{layer_key}' has {len(layer_items)} items from wells: {sorted(wells_in_layer)}" 

662 ) 

663 

664 # Determine if we should stack or use single item 

665 first_item = layer_items[0] 

666 component_info = first_item["components"] 

667 stack_components = [ 

668 comp 

669 for comp, mode in component_modes.items() 

670 if mode == "stack" and comp in component_info 

671 ] 

672 

673 # Build and update the layer based on data type 

674 if data_type == StreamingDataType.IMAGE: 

675 self._update_image_layer( 

676 layer_key, layer_items, stack_components, component_modes 

677 ) 

678 elif data_type == StreamingDataType.SHAPES: 

679 self._update_shapes_layer( 

680 layer_key, layer_items, stack_components, component_modes 

681 ) 

682 else: 

683 logger.warning( 

684 f"🔬 NAPARI PROCESS: Unknown data type {data_type} for {layer_key}" 

685 ) 

686 

687 def _update_image_layer( 

688 self, layer_key, layer_items, stack_components, component_modes 

689 ): 

690 """Update an image layer with the current items.""" 

691 # Check if images have different shapes and pad if needed 

692 shapes = [item["data"].shape for item in layer_items] 

693 if len(set(shapes)) > 1: 

694 logger.info( 

695 f"🔬 NAPARI PROCESS: Images in layer {layer_key} have different shapes - padding to max size" 

696 ) 

697 

698 # Find max dimensions 

699 first_shape = shapes[0] 

700 max_shape = list(first_shape) 

701 for img_shape in shapes: 

702 for i, dim in enumerate(img_shape): 

703 max_shape[i] = max(max_shape[i], dim) 

704 max_shape = tuple(max_shape) 

705 

706 # Pad all images to max shape 

707 for img_info in layer_items: 

708 img_data = img_info["data"] 

709 if img_data.shape != max_shape: 

710 # Calculate padding for each dimension 

711 pad_width = [] 

712 for i, (current_dim, max_dim) in enumerate( 

713 zip(img_data.shape, max_shape) 

714 ): 

715 pad_before = 0 

716 pad_after = max_dim - current_dim 

717 pad_width.append((pad_before, pad_after)) 

718 

719 # Pad with zeros 

720 padded_data = np.pad( 

721 img_data, pad_width, mode="constant", constant_values=0 

722 ) 

723 img_info["data"] = padded_data 

724 logger.debug( 

725 f"🔬 NAPARI PROCESS: Padded image from {img_data.shape} to {padded_data.shape}" 

726 ) 

727 

728 logger.info( 

729 f"🔬 NAPARI PROCESS: Building nD data for {layer_key} from {len(layer_items)} items" 

730 ) 

731 stacked_data = _build_nd_image_array(layer_items, stack_components) 

732 

733 # Determine colormap 

734 colormap = None 

735 if "channel" in component_modes and component_modes["channel"] == "slice": 

736 first_item = layer_items[0] 

737 channel_value = first_item["components"].get("channel") 

738 if channel_value == 1: 

739 colormap = "green" 

740 elif channel_value == 2: 

741 colormap = "red" 

742 

743 # Build axis labels for stacked dimensions 

744 # Format: (component1_name, component2_name, ..., 'y', 'x') 

745 # The stack components appear in the same order as in stack_components list 

746 # Must be a tuple for Napari 

747 axis_labels = None 

748 if stack_components: 

749 axis_labels = tuple(list(stack_components) + ["y", "x"]) 

750 logger.info( 

751 f"🔬 NAPARI PROCESS: Built axis_labels={axis_labels} for stack_components={stack_components}" 

752 ) 

753 

754 # Create or update the layer 

755 _create_or_update_image_layer( 

756 self.viewer, self.layers, layer_key, stacked_data, colormap, axis_labels 

757 ) 

758 

759 def _update_shapes_layer( 

760 self, layer_key, layer_items, stack_components, component_modes 

761 ): 

762 """Update a shapes layer - use labels instead of shapes for efficiency.""" 

763 logger.info( 

764 f"🔬 NAPARI PROCESS: Converting shapes to labels for {layer_key} from {len(layer_items)} items" 

765 ) 

766 

767 # Convert shapes to label masks (much faster than individual shapes) 

768 # This happens synchronously but is fast because we're just creating arrays 

769 labels_data = self._shapes_to_labels(layer_items, stack_components) 

770 

771 # Remove existing layer if it exists 

772 if layer_key in self.layers: 

773 try: 

774 self.viewer.layers.remove(self.layers[layer_key]) 

775 logger.info( 

776 f"🔬 NAPARI PROCESS: Removed existing labels layer {layer_key} for recreation" 

777 ) 

778 except Exception as e: 

779 logger.warning( 

780 f"Failed to remove existing labels layer {layer_key}: {e}" 

781 ) 

782 

783 # Create new labels layer 

784 new_layer = self.viewer.add_labels(labels_data, name=layer_key) 

785 self.layers[layer_key] = new_layer 

786 logger.info( 

787 f"🔬 NAPARI PROCESS: Created labels layer {layer_key} with shape {labels_data.shape}" 

788 ) 

789 

790 def _shapes_to_labels(self, layer_items, stack_components): 

791 """Convert shapes data to label masks.""" 

792 from skimage import draw 

793 

794 # Build component value to index mapping 

795 component_values = {} 

796 for comp in stack_components: 

797 values = sorted( 

798 set(item["components"].get(comp, 0) for item in layer_items) 

799 ) 

800 component_values[comp] = values 

801 

802 # Determine output shape 

803 # Get image shape from first item's shapes data 

804 first_shapes = layer_items[0]["data"] 

805 if not first_shapes: 

806 # No shapes, return empty array 

807 return np.zeros((1, 1, 512, 512), dtype=np.uint16) 

808 

809 # Estimate image size from shape coordinates 

810 max_y, max_x = 0, 0 

811 for shape_dict in first_shapes: 

812 if shape_dict["type"] == "polygon": 

813 coords = np.array(shape_dict["coordinates"]) 

814 max_y = max(max_y, int(np.max(coords[:, 0])) + 1) 

815 max_x = max(max_x, int(np.max(coords[:, 1])) + 1) 

816 

817 # Build nD shape 

818 nd_shape = [] 

819 for comp in stack_components: 

820 nd_shape.append(len(component_values[comp])) 

821 nd_shape.extend([max_y, max_x]) 

822 

823 # Create empty label array 

824 labels_array = np.zeros(nd_shape, dtype=np.uint16) 

825 

826 # Fill in labels for each item 

827 label_id = 1 

828 for item in layer_items: 

829 # Get indices for this item 

830 indices = [] 

831 for comp in stack_components: 

832 comp_value = item["components"].get(comp, 0) 

833 idx = component_values[comp].index(comp_value) 

834 indices.append(idx) 

835 

836 # Get shapes data 

837 shapes_data = item["data"] 

838 

839 # Draw each shape into the label mask 

840 for shape_dict in shapes_data: 

841 if shape_dict["type"] == "polygon": 

842 coords = np.array(shape_dict["coordinates"]) 

843 rr, cc = draw.polygon( 

844 coords[:, 0], coords[:, 1], shape=labels_array.shape[-2:] 

845 ) 

846 

847 # Set label at the correct nD position 

848 full_indices = tuple(indices) + (rr, cc) 

849 labels_array[full_indices] = label_id 

850 label_id += 1 

851 

852 logger.info( 

853 f"🔬 NAPARI PROCESS: Created labels array with shape {labels_array.shape} and {label_id-1} labels" 

854 ) 

855 return labels_array 

856 

857 def _send_ack(self, image_id: str, status: str = "success", error: str = None): 

858 """Send acknowledgment that an image was processed. 

859 

860 Args: 

861 image_id: UUID of the processed image 

862 status: 'success' or 'error' 

863 error: Error message if status='error' 

864 """ 

865 if not self.ack_socket: 

866 return 

867 

868 try: 

869 ack = ImageAck( 

870 image_id=image_id, 

871 viewer_port=self.port, 

872 viewer_type="napari", 

873 status=status, 

874 timestamp=time.time(), 

875 error=error, 

876 ) 

877 self.ack_socket.send_json(ack.to_dict()) 

878 logger.debug(f"🔬 NAPARI SERVER: Sent ack for image {image_id}") 

879 except Exception as e: 

880 logger.warning(f"🔬 NAPARI SERVER: Failed to send ack for {image_id}: {e}") 

881 

882 def _create_pong_response(self) -> Dict[str, Any]: 

883 """Override to add Napari-specific fields and memory usage.""" 

884 response = super()._create_pong_response() 

885 response["viewer"] = "napari" 

886 response["openhcs"] = True 

887 response["server"] = "NapariViewer" 

888 

889 # Add memory usage 

890 try: 

891 import psutil 

892 import os 

893 

894 process = psutil.Process(os.getpid()) 

895 response["memory_mb"] = process.memory_info().rss / 1024 / 1024 

896 response["cpu_percent"] = process.cpu_percent(interval=0) 

897 except Exception: 

898 pass 

899 

900 return response 

901 

902 def handle_control_message(self, message: Dict[str, Any]) -> Dict[str, Any]: 

903 """ 

904 Handle control messages beyond ping/pong. 

905 

906 Supported message types: 

907 - shutdown: Graceful shutdown (closes viewer) 

908 - force_shutdown: Force shutdown (same as shutdown for Napari) 

909 - clear_state: Clear accumulated component groups (for new pipeline runs) 

910 """ 

911 msg_type = message.get("type") 

912 

913 if msg_type == "shutdown" or msg_type == "force_shutdown": 

914 logger.info(f"🔬 NAPARI SERVER: {msg_type} requested, closing viewer") 

915 self.request_shutdown() 

916 

917 # Schedule viewer close on Qt event loop to trigger application exit 

918 # This must be done after sending the response, so we use QTimer.singleShot 

919 if self.viewer is not None: 

920 from qtpy import QtCore 

921 

922 QtCore.QTimer.singleShot(100, self.viewer.close) 

923 

924 return { 

925 "type": "shutdown_ack", 

926 "status": "success", 

927 "message": "Napari viewer shutting down", 

928 } 

929 

930 elif msg_type == "clear_state": 

931 # Clear accumulated component groups to prevent shape accumulation across runs 

932 logger.info( 

933 f"🔬 NAPARI SERVER: Clearing component groups (had {len(self.component_groups)} groups)" 

934 ) 

935 self.component_groups.clear() 

936 return { 

937 "type": "clear_state_ack", 

938 "status": "success", 

939 "message": "Component groups cleared", 

940 } 

941 

942 # Unknown message type 

943 return {"status": "ok"} 

944 

945 def handle_data_message(self, message: Dict[str, Any]): 

946 """Handle incoming image data - called by process_messages().""" 

947 # This will be called from the Qt timer 

948 pass 

949 

950 def process_image_message(self, message: bytes): 

951 """ 

952 Process incoming image data message. 

953 

954 Args: 

955 message: Raw ZMQ message containing image data 

956 """ 

957 import json 

958 

959 # Parse JSON message 

960 data = json.loads(message.decode("utf-8")) 

961 

962 msg_type = data.get("type") 

963 

964 # Check message type 

965 if msg_type == "batch": 

966 # Handle batch of images/shapes 

967 images = data.get("images", []) 

968 display_config_dict = data.get("display_config") 

969 

970 for image_info in images: 

971 self._process_single_image(image_info, display_config_dict) 

972 

973 else: 

974 # Handle single image (legacy) 

975 self._process_single_image(data, data.get("display_config")) 

976 

977 def _process_single_image( 

978 self, image_info: Dict[str, Any], display_config_dict: Dict[str, Any] 

979 ): 

980 """Process a single image or shapes data and display in Napari.""" 

981 import numpy as np 

982 

983 path = image_info.get("path", "unknown") 

984 image_id = image_info.get("image_id") # UUID for acknowledgment 

985 data_type = image_info.get("data_type", "image") # 'image' or 'shapes' 

986 component_metadata = image_info.get("metadata", {}) 

987 

988 # Log incoming metadata to debug well filtering issues 

989 logger.info( 

990 f"🔍 NAPARI PROCESS: Received {data_type} with metadata: {component_metadata} (path: {path})" 

991 ) 

992 

993 try: 

994 # Check if this is shapes data 

995 if data_type == "shapes": 

996 # Handle shapes/ROIs - just pass the shapes data directly 

997 shapes_data = image_info.get("shapes", []) 

998 data = shapes_data 

999 colormap = None # Shapes don't use colormap 

1000 else: 

1001 # Handle image data - load from shared memory or direct data 

1002 shape = image_info.get("shape") 

1003 dtype = image_info.get("dtype") 

1004 shm_name = image_info.get("shm_name") 

1005 direct_data = image_info.get("data") 

1006 

1007 # Load image data 

1008 if shm_name: 

1009 from multiprocessing import shared_memory 

1010 

1011 try: 

1012 shm = shared_memory.SharedMemory(name=shm_name) 

1013 data = np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy() 

1014 shm.close() 

1015 # Unlink shared memory after copying - viewer is responsible for cleanup 

1016 try: 

1017 shm.unlink() 

1018 except FileNotFoundError: 

1019 # Already unlinked (race condition or duplicate message) 

1020 logger.debug( 

1021 f"🔬 NAPARI PROCESS: Shared memory {shm_name} already unlinked" 

1022 ) 

1023 except Exception as e: 

1024 logger.warning( 

1025 f"🔬 NAPARI PROCESS: Failed to unlink shared memory {shm_name}: {e}" 

1026 ) 

1027 except FileNotFoundError: 

1028 # Shared memory doesn't exist - likely already processed and unlinked 

1029 logger.error( 

1030 f"🔬 NAPARI PROCESS: Shared memory {shm_name} not found - may have been already processed" 

1031 ) 

1032 if image_id: 

1033 self._send_ack( 

1034 image_id, 

1035 status="error", 

1036 error=f"Shared memory {shm_name} not found", 

1037 ) 

1038 return 

1039 except Exception as e: 

1040 logger.error( 

1041 f"🔬 NAPARI PROCESS: Failed to open shared memory {shm_name}: {e}" 

1042 ) 

1043 if image_id: 

1044 self._send_ack( 

1045 image_id, 

1046 status="error", 

1047 error=f"Failed to open shared memory: {e}", 

1048 ) 

1049 raise 

1050 elif direct_data: 

1051 data = np.array(direct_data, dtype=dtype).reshape(shape) 

1052 else: 

1053 logger.warning("🔬 NAPARI PROCESS: No image data in message") 

1054 if image_id: 

1055 self._send_ack( 

1056 image_id, status="error", error="No image data in message" 

1057 ) 

1058 return 

1059 

1060 # Extract colormap 

1061 colormap = "viridis" 

1062 if display_config_dict and "colormap" in display_config_dict: 

1063 colormap = display_config_dict["colormap"] 

1064 

1065 # Component-aware layer management (handles both images and shapes) 

1066 _handle_component_aware_display( 

1067 self.viewer, 

1068 self.layers, 

1069 self.component_groups, 

1070 data, 

1071 path, 

1072 colormap, 

1073 display_config_dict or {}, 

1074 self.replace_layers, 

1075 component_metadata, 

1076 data_type, 

1077 server=self, 

1078 ) 

1079 

1080 # Send acknowledgment that data was successfully displayed 

1081 if image_id: 

1082 self._send_ack(image_id, status="success") 

1083 

1084 except Exception as e: 

1085 logger.error( 

1086 f"🔬 NAPARI PROCESS: Failed to process {data_type} {path}: {e}" 

1087 ) 

1088 if image_id: 

1089 self._send_ack(image_id, status="error", error=str(e)) 

1090 raise # Fail loud 

1091 

1092 

1093def _napari_viewer_process( 

1094 port: int, 

1095 viewer_title: str, 

1096 replace_layers: bool = False, 

1097 log_file_path: str = None, 

1098 transport_mode: TransportMode = TransportMode.IPC, 

1099): 

1100 """ 

1101 Napari viewer process entry point. Runs in a separate process. 

1102 Listens for ZeroMQ messages with image data to display. 

1103 

1104 Args: 

1105 port: ZMQ port to listen on 

1106 viewer_title: Title for the napari viewer window 

1107 replace_layers: If True, replace existing layers; if False, add new layers with unique names 

1108 log_file_path: Path to log file (for client discovery via ping/pong) 

1109 transport_mode: ZMQ transport mode (IPC or TCP) 

1110 """ 

1111 try: 

1112 import zmq 

1113 import napari 

1114 

1115 # Create ZMQ server instance (inherits from ZMQServer ABC) 

1116 server = NapariViewerServer( 

1117 port, viewer_title, replace_layers, log_file_path, transport_mode 

1118 ) 

1119 

1120 # Start the server (binds sockets) 

1121 server.start() 

1122 

1123 # Create napari viewer in this process (main thread) 

1124 viewer = napari.Viewer(title=viewer_title, show=True) 

1125 server.viewer = viewer 

1126 

1127 # Initialize layers dictionary with existing layers (for reconnection scenarios) 

1128 for layer in viewer.layers: 

1129 server.layers[layer.name] = layer 

1130 

1131 logger.info( 

1132 f"🔬 NAPARI PROCESS: Viewer started on data port {port}, control port {server.control_port}" 

1133 ) 

1134 

1135 # Add cleanup handler for when viewer is closed 

1136 def cleanup_and_exit(): 

1137 logger.info("🔬 NAPARI PROCESS: Viewer closed, cleaning up and exiting...") 

1138 try: 

1139 server.stop() 

1140 except: 

1141 pass 

1142 sys.exit(0) 

1143 

1144 # Connect the viewer close event to cleanup 

1145 viewer.window.qt_viewer.destroyed.connect(cleanup_and_exit) 

1146 

1147 # Use proper Qt event loop integration 

1148 import sys 

1149 from qtpy import QtWidgets, QtCore 

1150 

1151 # Ensure Qt platform is properly set for detached processes 

1152 import os 

1153 import platform 

1154 

1155 if "QT_QPA_PLATFORM" not in os.environ: 

1156 if platform.system() == "Darwin": # macOS 

1157 os.environ["QT_QPA_PLATFORM"] = "cocoa" 

1158 elif platform.system() == "Linux": 

1159 os.environ["QT_QPA_PLATFORM"] = "xcb" 

1160 os.environ["QT_X11_NO_MITSHM"] = "1" 

1161 # Windows doesn't need QT_QPA_PLATFORM set 

1162 elif platform.system() == "Linux": 

1163 # Disable shared memory for X11 (helps with display issues in detached processes) 

1164 os.environ["QT_X11_NO_MITSHM"] = "1" 

1165 

1166 # Get the Qt application 

1167 app = QtWidgets.QApplication.instance() 

1168 if app is None: 

1169 app = QtWidgets.QApplication(sys.argv) 

1170 

1171 # Ensure the application DOES quit when the napari window closes 

1172 app.setQuitOnLastWindowClosed(True) 

1173 

1174 # Set up a QTimer for message processing 

1175 timer = QtCore.QTimer() 

1176 

1177 def process_messages(): 

1178 # Process control messages (ping/pong handled by ABC) 

1179 server.process_messages() 

1180 

1181 # Process data messages (images) if ready 

1182 if server._ready: 

1183 # Process multiple messages per timer tick for better throughput 

1184 for _ in range(10): # Process up to 10 messages per tick 

1185 try: 

1186 message = server.data_socket.recv(zmq.NOBLOCK) 

1187 server.process_image_message(message) 

1188 except zmq.Again: 

1189 # No more messages available 

1190 break 

1191 

1192 # Connect timer to message processing 

1193 timer.timeout.connect(process_messages) 

1194 timer.start(50) # Process messages every 50ms 

1195 

1196 logger.info("🔬 NAPARI PROCESS: Starting Qt event loop") 

1197 

1198 # Run the Qt event loop - this keeps napari responsive 

1199 app.exec_() 

1200 

1201 except Exception as e: 

1202 logger.error(f"🔬 NAPARI PROCESS: Fatal error: {e}") 

1203 finally: 

1204 logger.info("🔬 NAPARI PROCESS: Shutting down") 

1205 if "server" in locals(): 

1206 server.stop() 

1207 

1208 

1209def _spawn_detached_napari_process( 

1210 port: int, 

1211 viewer_title: str, 

1212 replace_layers: bool = False, 

1213 transport_mode: TransportMode = TransportMode.IPC, 

1214) -> subprocess.Popen: 

1215 """ 

1216 Spawn a completely detached napari viewer process that survives parent termination. 

1217 

1218 This creates a subprocess that runs independently and won't be terminated when 

1219 the parent process exits, enabling true persistence across pipeline runs. 

1220 

1221 Args: 

1222 port: ZMQ port to listen on 

1223 viewer_title: Title for the napari viewer window 

1224 replace_layers: If True, replace existing layers; if False, add new layers 

1225 transport_mode: ZMQ transport mode (IPC or TCP) 

1226 """ 

1227 # Use a simpler approach: spawn python directly with the napari viewer module 

1228 # This avoids temporary file issues and import problems 

1229 

1230 # Create the command to run the napari viewer directly 

1231 current_dir = os.getcwd() 

1232 python_code = f""" 

1233import sys 

1234import os 

1235 

1236# Detach from parent process group (Unix only) 

1237if hasattr(os, "setsid"): 

1238 try: 

1239 os.setsid() 

1240 except OSError: 

1241 pass 

1242 

1243# Add current working directory to Python path 

1244sys.path.insert(0, {repr(current_dir)}) 

1245 

1246try: 

1247 from openhcs.runtime.napari_stream_visualizer import _napari_viewer_process 

1248 from openhcs.core.config import TransportMode 

1249 transport_mode = TransportMode.{transport_mode.name} 

1250 _napari_viewer_process({port}, {repr(viewer_title)}, {replace_layers}, {repr(current_dir + "/.napari_log_path_placeholder")}, transport_mode) 

1251except Exception as e: 

1252 import logging 

1253 logger = logging.getLogger("openhcs.runtime.napari_detached") 

1254 logger.error(f"Detached napari error: {{e}}") 

1255 import traceback 

1256 logger.error(traceback.format_exc()) 

1257 sys.exit(1) 

1258""" 

1259 

1260 try: 

1261 # Create log file for detached process 

1262 log_dir = os.path.expanduser("~/.local/share/openhcs/logs") 

1263 os.makedirs(log_dir, exist_ok=True) 

1264 log_file = os.path.join(log_dir, f"napari_detached_port_{port}.log") 

1265 

1266 # Replace placeholder with actual log file path in python code 

1267 python_code = python_code.replace( 

1268 repr(current_dir + "/.napari_log_path_placeholder"), repr(log_file) 

1269 ) 

1270 

1271 # Use subprocess.Popen with detachment flags 

1272 if sys.platform == "win32": 

1273 # Windows: Use CREATE_NEW_PROCESS_GROUP to detach but preserve display environment 

1274 env = os.environ.copy() # Preserve environment variables 

1275 with open(log_file, "w") as log_f: 

1276 process = subprocess.Popen( 

1277 [sys.executable, "-c", python_code], 

1278 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP 

1279 | subprocess.DETACHED_PROCESS, 

1280 env=env, 

1281 cwd=os.getcwd(), 

1282 stdout=log_f, 

1283 stderr=subprocess.STDOUT, 

1284 ) 

1285 else: 

1286 # Unix: Use start_new_session to detach but preserve display environment 

1287 env = os.environ.copy() # Preserve DISPLAY and other environment variables 

1288 

1289 # Ensure Qt platform is set for GUI display 

1290 import platform 

1291 

1292 if "QT_QPA_PLATFORM" not in env: 

1293 if platform.system() == "Darwin": # macOS 

1294 env["QT_QPA_PLATFORM"] = "cocoa" 

1295 elif platform.system() == "Linux": 

1296 env["QT_QPA_PLATFORM"] = "xcb" 

1297 env["QT_X11_NO_MITSHM"] = "1" 

1298 # Windows doesn't need QT_QPA_PLATFORM set 

1299 elif platform.system() == "Linux": 

1300 # Ensure Qt can find the display 

1301 env["QT_X11_NO_MITSHM"] = ( 

1302 "1" # Disable shared memory for X11 (helps with some display issues) 

1303 ) 

1304 

1305 # Redirect stdout/stderr to log file for debugging 

1306 log_f = open(log_file, "w") 

1307 process = subprocess.Popen( 

1308 [sys.executable, "-c", python_code], 

1309 env=env, 

1310 cwd=os.getcwd(), 

1311 stdout=log_f, 

1312 stderr=subprocess.STDOUT, 

1313 start_new_session=True, # CRITICAL: Detach from parent process group 

1314 ) 

1315 

1316 logger.info( 

1317 f"🔬 VISUALIZER: Detached napari process started (PID: {process.pid}), logging to {log_file}" 

1318 ) 

1319 return process 

1320 

1321 except Exception as e: 

1322 logger.error(f"🔬 VISUALIZER: Failed to spawn detached napari process: {e}") 

1323 raise e 

1324 

1325 

1326class NapariStreamVisualizer: 

1327 """ 

1328 Manages a Napari viewer instance for real-time visualization of tensors 

1329 streamed from the OpenHCS pipeline. Runs napari in a separate process 

1330 for Qt compatibility and true persistence across pipeline runs. 

1331 """ 

1332 

1333 def __init__( 

1334 self, 

1335 filemanager: FileManager, 

1336 visualizer_config, 

1337 viewer_title: str = "OpenHCS Real-Time Visualization", 

1338 persistent: bool = True, 

1339 port: int = None, 

1340 replace_layers: bool = False, 

1341 display_config=None, 

1342 transport_mode: TransportMode = TransportMode.IPC, 

1343 ): 

1344 self.filemanager = filemanager 

1345 self.viewer_title = viewer_title 

1346 self.persistent = ( 

1347 persistent # If True, viewer process stays alive after pipeline completion 

1348 ) 

1349 self.visualizer_config = visualizer_config 

1350 # Use config class default if not specified 

1351 self.port = ( 

1352 port 

1353 if port is not None 

1354 else NapariStreamingConfig.__dataclass_fields__["port"].default 

1355 ) 

1356 self.replace_layers = ( 

1357 replace_layers # If True, replace existing layers; if False, add new layers 

1358 ) 

1359 self.display_config = display_config # Configuration for display behavior 

1360 self.transport_mode = transport_mode # ZMQ transport mode (IPC or TCP) 

1361 self.process: Optional[multiprocessing.Process] = None 

1362 self.zmq_context: Optional[zmq.Context] = None 

1363 self.zmq_socket: Optional[zmq.Socket] = None 

1364 self._is_running = False # Internal flag, use is_running property instead 

1365 self._connected_to_existing = ( 

1366 False # True if connected to viewer we didn't create 

1367 ) 

1368 self._lock = threading.Lock() 

1369 

1370 # Clause 368: Visualization must be observer-only. 

1371 # This class will only read data and display it. 

1372 

1373 @property 

1374 def is_running(self) -> bool: 

1375 """ 

1376 Check if the napari viewer is actually running. 

1377 

1378 This property checks the actual process state, not just a cached flag. 

1379 Returns True only if the process exists and is alive. 

1380 """ 

1381 if not self._is_running: 

1382 return False 

1383 

1384 # If we connected to an existing viewer, verify it's still responsive 

1385 if self._connected_to_existing: 

1386 # Quick ping check to verify viewer is still alive 

1387 if not self._quick_ping_check(): 

1388 logger.debug( 

1389 f"🔬 VISUALIZER: Connected viewer on port {self.port} is no longer responsive" 

1390 ) 

1391 self._is_running = False 

1392 self._connected_to_existing = False 

1393 return False 

1394 return True 

1395 

1396 if self.process is None: 

1397 self._is_running = False 

1398 return False 

1399 

1400 # Check if process is actually alive 

1401 try: 

1402 if hasattr(self.process, "is_alive"): 

1403 # multiprocessing.Process 

1404 alive = self.process.is_alive() 

1405 else: 

1406 # subprocess.Popen 

1407 alive = self.process.poll() is None 

1408 

1409 if not alive: 

1410 logger.debug( 

1411 f"🔬 VISUALIZER: Napari process on port {self.port} is no longer alive" 

1412 ) 

1413 self._is_running = False 

1414 

1415 return alive 

1416 except Exception as e: 

1417 logger.warning(f"🔬 VISUALIZER: Error checking process status: {e}") 

1418 self._is_running = False 

1419 return False 

1420 

1421 def _quick_ping_check(self) -> bool: 

1422 """Quick ping check to verify viewer is responsive (for connected viewers).""" 

1423 import zmq 

1424 import pickle 

1425 from openhcs.constants.constants import CONTROL_PORT_OFFSET 

1426 

1427 try: 

1428 control_port = self.port + CONTROL_PORT_OFFSET 

1429 control_url = get_zmq_transport_url( 

1430 control_port, self.transport_mode, "localhost" 

1431 ) 

1432 

1433 ctx = zmq.Context() 

1434 sock = ctx.socket(zmq.REQ) 

1435 sock.setsockopt(zmq.LINGER, 0) 

1436 sock.setsockopt(zmq.RCVTIMEO, 200) # 200ms timeout for quick check 

1437 sock.connect(control_url) 

1438 sock.send(pickle.dumps({"type": "ping"})) 

1439 response = pickle.loads(sock.recv()) 

1440 sock.close() 

1441 ctx.term() 

1442 return response.get("type") == "pong" 

1443 except: 

1444 return False 

1445 

1446 def wait_for_ready(self, timeout: float = 10.0) -> bool: 

1447 """ 

1448 Wait for the viewer to be ready to receive images. 

1449 

1450 This method blocks until the viewer is responsive or the timeout expires. 

1451 Should be called after start_viewer() when using async_mode=True. 

1452 

1453 Args: 

1454 timeout: Maximum time to wait in seconds 

1455 

1456 Returns: 

1457 True if viewer is ready, False if timeout 

1458 """ 

1459 return self._wait_for_viewer_ready(timeout=timeout) 

1460 

1461 def _find_free_port(self) -> int: 

1462 """Find a free port for ZeroMQ communication.""" 

1463 import socket 

1464 

1465 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

1466 s.bind(("", 0)) 

1467 return s.getsockname()[1] 

1468 

1469 def start_viewer(self, async_mode: bool = True): 

1470 """ 

1471 Starts the Napari viewer in a separate process. 

1472 

1473 Args: 

1474 async_mode: If True, start viewer asynchronously in background thread. 

1475 If False, wait for viewer to be ready before returning (legacy behavior). 

1476 """ 

1477 if async_mode: 

1478 # Start viewer asynchronously in background thread 

1479 thread = threading.Thread(target=self._start_viewer_sync, daemon=True) 

1480 thread.start() 

1481 logger.info( 

1482 f"🔬 VISUALIZER: Starting napari viewer asynchronously on port {self.port}" 

1483 ) 

1484 else: 

1485 # Legacy synchronous mode 

1486 self._start_viewer_sync() 

1487 

1488 def _start_viewer_sync(self): 

1489 """Internal synchronous viewer startup (called by start_viewer).""" 

1490 global _global_viewer_process, _global_viewer_port 

1491 

1492 with self._lock: 

1493 # Check if there's already a napari viewer running on the configured port 

1494 port_in_use = self._is_port_in_use(self.port) 

1495 logger.info(f"🔬 VISUALIZER: Port {self.port} in use: {port_in_use}") 

1496 

1497 if port_in_use: 

1498 # Try to connect to existing viewer first before killing it 

1499 logger.info( 

1500 f"🔬 VISUALIZER: Port {self.port} is in use, attempting to connect to existing viewer..." 

1501 ) 

1502 if self._try_connect_to_existing_viewer(self.port): 

1503 logger.info( 

1504 f"🔬 VISUALIZER: Successfully connected to existing viewer on port {self.port}" 

1505 ) 

1506 self._is_running = True 

1507 self._connected_to_existing = ( 

1508 True # Mark that we connected to existing viewer 

1509 ) 

1510 return 

1511 else: 

1512 # Existing viewer is unresponsive - kill it and start fresh 

1513 logger.info( 

1514 f"🔬 VISUALIZER: Existing viewer on port {self.port} is unresponsive, killing and restarting..." 

1515 ) 

1516 # Use shared method from ZMQServer ABC 

1517 from openhcs.runtime.zmq_base import ZMQServer 

1518 from openhcs.constants.constants import CONTROL_PORT_OFFSET 

1519 

1520 ZMQServer.kill_processes_on_port(self.port) 

1521 ZMQServer.kill_processes_on_port(self.port + CONTROL_PORT_OFFSET) 

1522 # Wait a moment for ports to be freed 

1523 import time 

1524 

1525 time.sleep(0.5) 

1526 

1527 if self._is_running: 

1528 logger.warning("Napari viewer is already running.") 

1529 return 

1530 

1531 # Port is already set in __init__ 

1532 logger.info( 

1533 f"🔬 VISUALIZER: Starting napari viewer process on port {self.port}" 

1534 ) 

1535 

1536 # ALL viewers (persistent and non-persistent) should be detached subprocess 

1537 # so they don't block parent process exit. The difference is only whether 

1538 # we terminate them during cleanup. 

1539 logger.info( 

1540 f"🔬 VISUALIZER: Creating {'persistent' if self.persistent else 'non-persistent'} napari viewer (detached)" 

1541 ) 

1542 self.process = _spawn_detached_napari_process( 

1543 self.port, self.viewer_title, self.replace_layers, self.transport_mode 

1544 ) 

1545 

1546 # Only track non-persistent viewers in global variable for test cleanup 

1547 if not self.persistent: 

1548 with _global_process_lock: 

1549 _global_viewer_process = self.process 

1550 _global_viewer_port = self.port 

1551 

1552 # Wait for napari viewer to be ready before setting up ZMQ 

1553 self._wait_for_viewer_ready() 

1554 

1555 # Set up ZeroMQ client 

1556 self._setup_zmq_client() 

1557 

1558 # Check if process is running (different methods for subprocess vs multiprocessing) 

1559 if hasattr(self.process, "is_alive"): 

1560 # multiprocessing.Process 

1561 process_alive = self.process.is_alive() 

1562 else: 

1563 # subprocess.Popen 

1564 process_alive = self.process.poll() is None 

1565 

1566 if process_alive: 

1567 self._is_running = True 

1568 logger.info( 

1569 f"🔬 VISUALIZER: Napari viewer process started successfully (PID: {self.process.pid})" 

1570 ) 

1571 else: 

1572 logger.error("🔬 VISUALIZER: Failed to start napari viewer process") 

1573 

1574 def _try_connect_to_existing_viewer(self, port: int) -> bool: 

1575 """ 

1576 Try to connect to an existing napari viewer and verify it's responsive. 

1577 

1578 Returns True only if we can successfully handshake with the viewer. 

1579 """ 

1580 import zmq 

1581 import pickle 

1582 from openhcs.constants.constants import CONTROL_PORT_OFFSET 

1583 

1584 # Try to ping the control port to verify viewer is responsive 

1585 control_port = port + CONTROL_PORT_OFFSET 

1586 control_url = get_zmq_transport_url( 

1587 control_port, self.transport_mode, "localhost" 

1588 ) 

1589 control_context = None 

1590 control_socket = None 

1591 

1592 try: 

1593 control_context = zmq.Context() 

1594 control_socket = control_context.socket(zmq.REQ) 

1595 control_socket.setsockopt(zmq.LINGER, 0) 

1596 control_socket.setsockopt(zmq.RCVTIMEO, 500) # 500ms timeout 

1597 control_socket.connect(control_url) 

1598 

1599 # Send ping 

1600 ping_message = {"type": "ping"} 

1601 control_socket.send(pickle.dumps(ping_message)) 

1602 

1603 # Wait for pong 

1604 response = control_socket.recv() 

1605 response_data = pickle.loads(response) 

1606 

1607 if response_data.get("type") == "pong" and response_data.get("ready"): 

1608 # Viewer is responsive! Set up our ZMQ client 

1609 control_socket.close() 

1610 control_context.term() 

1611 self._setup_zmq_client() 

1612 return True 

1613 else: 

1614 return False 

1615 

1616 except Exception as e: 

1617 logger.debug(f"Failed to connect to existing viewer on port {port}: {e}") 

1618 return False 

1619 finally: 

1620 if control_socket: 

1621 try: 

1622 control_socket.close() 

1623 except: 

1624 pass 

1625 if control_context: 

1626 try: 

1627 control_context.term() 

1628 except: 

1629 pass 

1630 

1631 def _is_port_in_use(self, port: int) -> bool: 

1632 """Check if a port/socket is already in use (indicating existing napari viewer).""" 

1633 if self.transport_mode == TransportMode.IPC: 

1634 # For IPC mode, check if socket file exists 

1635 import platform 

1636 from pathlib import Path 

1637 from openhcs.constants.constants import ( 

1638 IPC_SOCKET_DIR_NAME, 

1639 IPC_SOCKET_PREFIX, 

1640 IPC_SOCKET_EXTENSION, 

1641 ) 

1642 

1643 if platform.system() == "Windows": 

1644 # Windows named pipes - can't easily check existence, so always return False 

1645 # (will rely on ping/pong handshake instead) 

1646 return False 

1647 else: 

1648 # Unix domain sockets - check if socket file exists 

1649 ipc_dir = Path.home() / ".openhcs" / IPC_SOCKET_DIR_NAME 

1650 socket_name = f"{IPC_SOCKET_PREFIX}-{port}{IPC_SOCKET_EXTENSION}" 

1651 socket_path = ipc_dir / socket_name 

1652 return socket_path.exists() 

1653 else: 

1654 # TCP mode - check if port is bound 

1655 import socket 

1656 

1657 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

1658 sock.settimeout(0.1) 

1659 try: 

1660 # Try to bind to the port - if it fails, something is already using it 

1661 sock.bind(("localhost", port)) 

1662 sock.close() 

1663 return False # Port is free 

1664 except OSError: 

1665 # Port is already in use 

1666 sock.close() 

1667 return True 

1668 except Exception: 

1669 return False 

1670 

1671 def _wait_for_viewer_ready(self, timeout: float = 10.0) -> bool: 

1672 """Wait for the napari viewer to be ready using handshake protocol.""" 

1673 import zmq 

1674 from openhcs.constants.constants import CONTROL_PORT_OFFSET 

1675 

1676 logger.info( 

1677 f"🔬 VISUALIZER: Waiting for napari viewer to be ready on port {self.port}..." 

1678 ) 

1679 

1680 control_port = self.port + CONTROL_PORT_OFFSET 

1681 

1682 # First wait for ports to be bound 

1683 start_time = time.time() 

1684 while time.time() - start_time < timeout: 

1685 if self._is_port_in_use(self.port) and self._is_port_in_use(control_port): 

1686 break 

1687 time.sleep(0.2) 

1688 else: 

1689 logger.warning("🔬 VISUALIZER: Timeout waiting for ports to be bound") 

1690 return False 

1691 

1692 # Now use handshake protocol - create fresh socket for each attempt 

1693 control_url = get_zmq_transport_url( 

1694 control_port, self.transport_mode, "localhost" 

1695 ) 

1696 start_time = time.time() 

1697 while time.time() - start_time < timeout: 

1698 control_context = zmq.Context() 

1699 control_socket = control_context.socket(zmq.REQ) 

1700 control_socket.setsockopt(zmq.LINGER, 0) 

1701 control_socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout 

1702 

1703 try: 

1704 control_socket.connect(control_url) 

1705 

1706 import pickle 

1707 

1708 ping_message = {"type": "ping"} 

1709 control_socket.send(pickle.dumps(ping_message)) 

1710 

1711 response = control_socket.recv() 

1712 response_data = pickle.loads(response) 

1713 

1714 if response_data.get("type") == "pong" and response_data.get("ready"): 

1715 logger.info( 

1716 f"🔬 VISUALIZER: Napari viewer is ready on port {self.port}" 

1717 ) 

1718 return True 

1719 

1720 except zmq.Again: 

1721 pass # Timeout waiting for response 

1722 except Exception as e: 

1723 logger.debug(f"🔬 VISUALIZER: Handshake attempt failed: {e}") 

1724 finally: 

1725 control_socket.close() 

1726 control_context.term() 

1727 

1728 time.sleep(0.5) # Wait before next ping 

1729 

1730 logger.warning("🔬 VISUALIZER: Timeout waiting for napari viewer handshake") 

1731 return False 

1732 

1733 def _setup_zmq_client(self): 

1734 """Set up ZeroMQ client to send data to viewer process.""" 

1735 if self.port is None: 

1736 raise RuntimeError("Port not set - call start_viewer() first") 

1737 

1738 data_url = get_zmq_transport_url(self.port, self.transport_mode, "localhost") 

1739 

1740 self.zmq_context = zmq.Context() 

1741 self.zmq_socket = self.zmq_context.socket(zmq.PUB) 

1742 self.zmq_socket.connect(data_url) 

1743 

1744 # Brief delay for ZMQ connection to establish 

1745 time.sleep(ZMQ_CONNECTION_DELAY_MS / 1000.0) 

1746 logger.info(f"🔬 VISUALIZER: ZMQ client connected to {data_url}") 

1747 

1748 def send_control_message(self, message_type: str, timeout: float = 2.0) -> bool: 

1749 """ 

1750 Send a control message to the viewer. 

1751 

1752 Args: 

1753 message_type: Type of control message ('clear_state', 'shutdown', etc.) 

1754 timeout: Timeout in seconds for waiting for response 

1755 

1756 Returns: 

1757 True if message was sent and acknowledged, False otherwise 

1758 """ 

1759 if not self.is_running or self.port is None: 

1760 logger.warning( 

1761 f"🔬 VISUALIZER: Cannot send {message_type} - viewer not running" 

1762 ) 

1763 return False 

1764 

1765 import zmq 

1766 import pickle 

1767 from openhcs.constants.constants import CONTROL_PORT_OFFSET 

1768 

1769 control_port = self.port + CONTROL_PORT_OFFSET 

1770 control_url = get_zmq_transport_url( 

1771 control_port, self.transport_mode, "localhost" 

1772 ) 

1773 control_context = None 

1774 control_socket = None 

1775 

1776 try: 

1777 control_context = zmq.Context() 

1778 control_socket = control_context.socket(zmq.REQ) 

1779 control_socket.setsockopt(zmq.LINGER, 0) 

1780 control_socket.setsockopt(zmq.RCVTIMEO, int(timeout * 1000)) 

1781 control_socket.connect(control_url) 

1782 

1783 # Send control message 

1784 message = {"type": message_type} 

1785 control_socket.send(pickle.dumps(message)) 

1786 

1787 # Wait for acknowledgment 

1788 response = control_socket.recv() 

1789 response_data = pickle.loads(response) 

1790 

1791 if response_data.get("status") == "success": 

1792 logger.info(f"🔬 VISUALIZER: {message_type} acknowledged by viewer") 

1793 return True 

1794 else: 

1795 logger.warning(f"🔬 VISUALIZER: {message_type} failed: {response_data}") 

1796 return False 

1797 

1798 except zmq.Again: 

1799 logger.warning( 

1800 f"🔬 VISUALIZER: Timeout waiting for {message_type} acknowledgment" 

1801 ) 

1802 return False 

1803 except Exception as e: 

1804 logger.warning(f"🔬 VISUALIZER: Failed to send {message_type}: {e}") 

1805 return False 

1806 finally: 

1807 if control_socket: 

1808 try: 

1809 control_socket.close() 

1810 except Exception as e: 

1811 logger.debug(f"Failed to close control socket: {e}") 

1812 if control_context: 

1813 try: 

1814 control_context.term() 

1815 except Exception as e: 

1816 logger.debug(f"Failed to terminate control context: {e}") 

1817 

1818 def clear_viewer_state(self) -> bool: 

1819 """ 

1820 Clear accumulated viewer state (component groups) for a new pipeline run. 

1821 

1822 Returns: 

1823 True if state was cleared successfully, False otherwise 

1824 """ 

1825 return self.send_control_message("clear_state") 

1826 

1827 def send_image_data( 

1828 self, step_id: str, image_data: np.ndarray, axis_id: str = "unknown" 

1829 ): 

1830 """ 

1831 DISABLED: This method bypasses component-aware stacking. 

1832 All visualization must go through the streaming backend. 

1833 """ 

1834 raise RuntimeError( 

1835 f"send_image_data() is disabled. Use streaming backend for component-aware display. " 

1836 f"step_id: {step_id}, axis_id: {axis_id}, shape: {image_data.shape}" 

1837 ) 

1838 

1839 def stop_viewer(self): 

1840 """Stop the napari viewer process (only if not persistent).""" 

1841 with self._lock: 

1842 if not self.persistent: 

1843 logger.info("🔬 VISUALIZER: Stopping non-persistent napari viewer") 

1844 self._cleanup_zmq() 

1845 if self.process: 

1846 # Handle both subprocess and multiprocessing process types 

1847 if hasattr(self.process, "is_alive"): 

1848 # multiprocessing.Process 

1849 if self.process.is_alive(): 

1850 self.process.terminate() 

1851 self.process.join(timeout=5) 

1852 if self.process.is_alive(): 

1853 logger.warning( 

1854 "🔬 VISUALIZER: Force killing napari viewer process" 

1855 ) 

1856 self.process.kill() 

1857 else: 

1858 # subprocess.Popen 

1859 if self.process.poll() is None: # Still running 

1860 self.process.terminate() 

1861 try: 

1862 self.process.wait(timeout=5) 

1863 except subprocess.TimeoutExpired: 

1864 logger.warning( 

1865 "🔬 VISUALIZER: Force killing napari viewer process" 

1866 ) 

1867 self.process.kill() 

1868 self._is_running = False 

1869 else: 

1870 logger.info("🔬 VISUALIZER: Keeping persistent napari viewer alive") 

1871 # Just cleanup our ZMQ connection, leave process running 

1872 self._cleanup_zmq() 

1873 # DON'T set is_running = False for persistent viewers! 

1874 # The process is still alive and should be reusable 

1875 

1876 def _cleanup_zmq(self): 

1877 """Clean up ZeroMQ resources.""" 

1878 if self.zmq_socket: 

1879 self.zmq_socket.close() 

1880 self.zmq_socket = None 

1881 if self.zmq_context: 

1882 self.zmq_context.term() 

1883 self.zmq_context = None 

1884 

1885 def visualize_path( 

1886 self, step_id: str, path: str, backend: str, axis_id: Optional[str] = None 

1887 ): 

1888 """ 

1889 DISABLED: This method bypasses component-aware stacking. 

1890 All visualization must go through the streaming backend. 

1891 """ 

1892 raise RuntimeError( 

1893 f"visualize_path() is disabled. Use streaming backend for component-aware display. " 

1894 f"Path: {path}, step_id: {step_id}, axis_id: {axis_id}" 

1895 ) 

1896 

1897 def _prepare_data_for_display( 

1898 self, data: Any, step_id_for_log: str, display_config=None 

1899 ) -> Optional[np.ndarray]: 

1900 """Converts loaded data to a displayable NumPy array (slice or stack based on config).""" 

1901 cpu_tensor: Optional[np.ndarray] = None 

1902 try: 

1903 # GPU to CPU conversion logic 

1904 if hasattr(data, "is_cuda") and data.is_cuda: # PyTorch 

1905 cpu_tensor = data.cpu().numpy() 

1906 elif ( 

1907 hasattr(data, "device") and "cuda" in str(data.device).lower() 

1908 ): # Check for device attribute 

1909 if hasattr(data, "get"): # CuPy 

1910 cpu_tensor = data.get() 

1911 elif hasattr( 

1912 data, "numpy" 

1913 ): # JAX on GPU might have .numpy() after host transfer 

1914 cpu_tensor = np.asarray( 

1915 data 

1916 ) # JAX arrays might need explicit conversion 

1917 else: # Fallback for other GPU array types if possible 

1918 logger.warning( 

1919 f"Unknown GPU array type for step '{step_id_for_log}'. Attempting .numpy()." 

1920 ) 

1921 if hasattr(data, "numpy"): 

1922 cpu_tensor = data.numpy() 

1923 else: 

1924 logger.error( 

1925 f"Cannot convert GPU tensor of type {type(data)} for step '{step_id_for_log}'." 

1926 ) 

1927 return None 

1928 elif isinstance(data, np.ndarray): 

1929 cpu_tensor = data 

1930 else: 

1931 # Attempt to convert to numpy array if it's some other array-like structure 

1932 try: 

1933 cpu_tensor = np.asarray(data) 

1934 logger.debug( 

1935 f"Converted data of type {type(data)} to numpy array for step '{step_id_for_log}'." 

1936 ) 

1937 except Exception as e_conv: 

1938 logger.warning( 

1939 f"Unsupported data type for step '{step_id_for_log}': {type(data)}. Error: {e_conv}" 

1940 ) 

1941 return None 

1942 

1943 if cpu_tensor is None: # Should not happen if logic above is correct 

1944 return None 

1945 

1946 # Determine display mode based on configuration 

1947 # Default behavior: show as stack unless config specifies otherwise 

1948 should_slice = False 

1949 

1950 if display_config: 

1951 # Check if any component mode is set to SLICE 

1952 from openhcs.core.config import NapariDimensionMode 

1953 from openhcs.constants import AllComponents 

1954 

1955 # Check individual component mode fields for all dimensions 

1956 for component in AllComponents: 

1957 field_name = f"{component.value}_mode" 

1958 if hasattr(display_config, field_name): 

1959 mode = getattr(display_config, field_name) 

1960 if mode == NapariDimensionMode.SLICE: 

1961 should_slice = True 

1962 break 

1963 else: 

1964 # Default: slice for backward compatibility 

1965 should_slice = True 

1966 

1967 # Slicing/stacking logic 

1968 display_data: Optional[np.ndarray] = None 

1969 

1970 if should_slice: 

1971 # Original slicing behavior 

1972 if cpu_tensor.ndim == 3: # ZYX 

1973 display_data = cpu_tensor[cpu_tensor.shape[0] // 2, :, :] 

1974 elif cpu_tensor.ndim == 2: # YX 

1975 display_data = cpu_tensor 

1976 elif cpu_tensor.ndim > 3: # e.g. CZYX or TZYX 

1977 logger.debug( 

1978 f"Tensor for step '{step_id_for_log}' has ndim > 3 ({cpu_tensor.ndim}). Taking a slice." 

1979 ) 

1980 slicer = [0] * (cpu_tensor.ndim - 2) # Slice first channels/times 

1981 slicer[-1] = cpu_tensor.shape[-3] // 2 # Middle Z 

1982 try: 

1983 display_data = cpu_tensor[tuple(slicer)] 

1984 except ( 

1985 IndexError 

1986 ): # Handle cases where slicing might fail (e.g. very small dimensions) 

1987 logger.error( 

1988 f"Slicing failed for tensor with shape {cpu_tensor.shape} for step '{step_id_for_log}'.", 

1989 exc_info=True, 

1990 ) 

1991 display_data = None 

1992 else: 

1993 logger.warning( 

1994 f"Tensor for step '{step_id_for_log}' has unsupported ndim for slicing: {cpu_tensor.ndim}." 

1995 ) 

1996 return None 

1997 else: 

1998 # Stack mode: send the full data to napari (napari can handle 3D+ data) 

1999 if cpu_tensor.ndim >= 2: 

2000 display_data = cpu_tensor 

2001 logger.debug( 

2002 f"Sending {cpu_tensor.ndim}D stack to napari for step '{step_id_for_log}' (shape: {cpu_tensor.shape})" 

2003 ) 

2004 else: 

2005 logger.warning( 

2006 f"Tensor for step '{step_id_for_log}' has unsupported ndim for stacking: {cpu_tensor.ndim}." 

2007 ) 

2008 return None 

2009 

2010 return display_data.copy() if display_data is not None else None 

2011 

2012 except Exception as e: 

2013 logger.error( 

2014 f"Error preparing data from step '{step_id_for_log}' for display: {e}", 

2015 exc_info=True, 

2016 ) 

2017 return None