Coverage for openhcs/runtime/napari_stream_visualizer.py: 1.3%

531 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Napari-based real-time visualization module for OpenHCS. 

3 

4This module provides the NapariStreamVisualizer class for real-time 

5visualization of tensors during pipeline execution. 

6 

7Doctrinal Clauses: 

8- Clause 65 — No Fallback Logic 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11- Clause 368 — Visualization Must Be Observer-Only 

12""" 

13 

14import logging 

15import multiprocessing 

16import os 

17import queue 

18import subprocess 

19import sys 

20import threading 

21import time 

22import zmq 

23import numpy as np 

24from typing import Any, Dict, Optional 

25 

26from openhcs.io.filemanager import FileManager 

27from openhcs.utils.import_utils import optional_import 

28from openhcs.constants.constants import DEFAULT_NAPARI_STREAM_PORT 

29 

30# Optional napari import - this module should only be imported if napari is available 

31napari = optional_import("napari") 

32if napari is None: 

33 raise ImportError( 

34 "napari is required for NapariStreamVisualizer. " 

35 "Install it with: pip install 'openhcs[viz]' or pip install napari" 

36 ) 

37 

38import numpy as np 

39 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Global process management for napari viewer.
# Handle of the viewer process most recently started by start_viewer(). Despite the
# annotation, start_viewer() stores either a multiprocessing.Process (non-persistent
# viewer) or a subprocess.Popen (persistent, detached viewer) here.
_global_viewer_process: Optional[multiprocessing.Process] = None
# Data port the process above was started on.
_global_viewer_port: Optional[int] = None
# Guards reads and writes of the two globals above.
_global_process_lock = threading.Lock()

46 

47 

def _cleanup_global_viewer() -> None:
    """
    Clean up global napari viewer process for test mode.

    This forcibly terminates the napari viewer process to allow pytest to exit.
    Should only be called in test mode.

    The global handle may be a ``multiprocessing.Process`` (non-persistent viewer)
    or a ``subprocess.Popen`` (persistent viewer); both are handled. The process is
    first asked to terminate and given 3 seconds, then killed and given 1 more
    second. The global process and port references are cleared afterwards.
    """
    global _global_viewer_process, _global_viewer_port

    with _global_process_lock:
        process = _global_viewer_process
        if process is not None:
            # Popen exposes poll()/wait(); multiprocessing.Process exposes is_alive()/join().
            is_popen = not hasattr(process, 'is_alive')
            alive = (process.poll() is None) if is_popen else process.is_alive()

            if alive:
                logger.info("🔬 VISUALIZER: Terminating napari viewer for test cleanup")
                process.terminate()

                if is_popen:
                    try:
                        process.wait(timeout=3)
                    except subprocess.TimeoutExpired:
                        logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                        process.kill()
                        try:
                            process.wait(timeout=1)
                        except subprocess.TimeoutExpired:
                            logger.warning("🔬 VISUALIZER: napari viewer process did not exit after kill")
                else:
                    process.join(timeout=3)
                    if process.is_alive():
                        logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                        process.kill()
                        process.join(timeout=1)

        _global_viewer_process = None
        _global_viewer_port = None

69 

70 

71def _parse_component_info_from_path(path_str: str): 

72 """ 

73 Fallback component parsing from path (used when component metadata unavailable). 

74 

75 Args: 

76 path_str: Path string like 'step_name/A01/s1_c2_z3.tif' 

77 

78 Returns: 

79 Dict with basic component info extracted from filename 

80 """ 

81 try: 

82 import os 

83 import re 

84 filename = os.path.basename(path_str) 

85 

86 # Basic regex for common patterns 

87 pattern = r'(?:s(\d+))?(?:_c(\d+))?(?:_z(\d+))?' 

88 match = re.search(pattern, filename) 

89 

90 components = {} 

91 if match: 

92 site, channel, z_index = match.groups() 

93 if site: 

94 components['site'] = site 

95 if channel: 

96 components['channel'] = channel 

97 if z_index: 

98 components['z_index'] = z_index 

99 

100 return components 

101 except Exception: 

102 return {} 

103 

104 

def _resolve_component_modes(display_config):
    """Return the component -> 'slice'/'stack' mapping described by display_config."""
    if isinstance(display_config, dict):
        configured = display_config.get('component_modes') or display_config.get('componentModes')
        if isinstance(configured, dict) and configured:
            return configured

    # Object-like config (attributes named '<component>_mode'), or nothing configured:
    # default is channel=slice, everything else=stack.
    modes = {}
    for component in ('site', 'channel', 'z_index', 'well'):
        mode_field = f"{component}_mode"
        if hasattr(display_config, mode_field):
            mode_value = getattr(display_config, mode_field)
            modes[component] = getattr(mode_value, 'value', str(mode_value))
        else:
            modes[component] = 'slice' if component == 'channel' else 'stack'
    return modes


def _build_layer_key(component_info, component_modes):
    """Build the layer name: step prefix, optional well, then every 'slice' component."""
    step_index = component_info.get('step_index', 0)
    step_name = component_info.get('step_name', 'unknown_step')
    key_parts = [f"step_{step_index:02d}_{step_name}"]

    if component_modes.get('well', 'stack') == 'slice':
        key_parts.append(component_info.get('well', 'unknown_well'))

    key_parts.extend(
        f"{name}_{component_info[name]}"
        for name, mode in component_modes.items()
        if mode == 'slice' and name != 'well' and name in component_info
    )
    return "_".join(key_parts)


def _stack_layer_images(layer_images, stack_components):
    """Combine all images of one layer into a single array ordered by stack components."""
    if stack_components:
        # Sort in place so the cached group keeps a consistent order between calls.
        layer_images.sort(
            key=lambda info: tuple(info['components'].get(comp, 0) for comp in stack_components)
        )

    if len(stack_components) == 1:
        # Single stack component - simple stack along one new leading axis.
        from openhcs.core.memory.stack_utils import stack_slices
        return stack_slices([info['data'] for info in layer_images], memory_type='numpy', gpu_id=0)

    # Zero or several stack components: one leading axis per component, sized by the
    # number of distinct values seen. Result shape: (comp1_size, ..., *image_shape).
    index_maps = {}
    for comp in stack_components:
        values = sorted({info['components'].get(comp, 0) for info in layer_images})
        index_maps[comp] = {value: position for position, value in enumerate(values)}

    first_img = layer_images[0]['data']
    full_shape = tuple(len(index_maps[comp]) for comp in stack_components) + tuple(first_img.shape)
    stacked_data = np.zeros(full_shape, dtype=first_img.dtype)

    for info in layer_images:
        position = tuple(
            index_maps[comp][info['components'].get(comp, 0)] for comp in stack_components
        )
        # Later entries overwrite earlier ones that map to the same position.
        stacked_data[position] = info['data']
    return stacked_data


def _update_or_create_layer(viewer, layers, layer_name, data, colormap, label):
    """Set `data` on the viewer layer called `layer_name`, creating the layer if missing."""
    existing_layer = next((layer for layer in viewer.layers if layer.name == layer_name), None)

    if existing_layer is not None:
        existing_layer.data = data
        layers[layer_name] = existing_layer
        logger.info(f"🔬 NAPARI PROCESS: Updated existing {label} {layer_name} (shape: {data.shape})")
        return

    logger.info(f"🔬 NAPARI PROCESS: Creating new {label} {layer_name}, viewer has {len(viewer.layers)} existing layers")
    logger.debug(f"🔬 NAPARI PROCESS: Existing layer names: {[layer.name for layer in viewer.layers]}")
    try:
        new_layer = viewer.add_image(data, name=layer_name, colormap=colormap)
    except Exception as e:
        logger.error(f"🔬 NAPARI PROCESS: FAILED to create {label} {layer_name}: {e}")
        raise
    layers[layer_name] = new_layer
    logger.info(f"🔬 NAPARI PROCESS: Successfully created new {label} {layer_name} (shape: {data.shape})")


def _handle_component_aware_display(viewer, layers, component_groups, image_data, path,
                                    colormap, display_config, replace_layers, component_metadata=None):
    """
    Handle component-aware display following OpenHCS stacking patterns.

    Creates separate layers for each step and well, with proper component-based stacking.
    Each step gets its own layer. Each well gets its own layer. Components marked as
    SLICE create separate layers, components marked as STACK are stacked together.

    Args:
        viewer: Viewer object; its ``layers`` are iterated and ``add_image`` is called.
        layers: Dict cache of layer name -> layer object; updated in place.
        component_groups: Dict of layer name -> list of image records received so far
            for that layer; updated in place.
        image_data: Array for the incoming image.
        path: Source path of the image (stored with the record, used in log messages).
        colormap: Colormap passed to ``viewer.add_image`` when a layer is created.
        display_config: Dict with 'component_modes'/'componentModes', or an object
            exposing '<component>_mode' attributes.
        replace_layers: Accepted for interface compatibility; not used here.
        component_metadata: Dict of component values for this image (required).

    Raises:
        ValueError: If ``component_metadata`` is missing or empty.
        Exception: Any stacking/viewer error is logged with its traceback and re-raised.
    """
    try:
        # Use component metadata from the message - fail loud if not available
        if not component_metadata:
            raise ValueError(f"No component metadata available for path: {path}")
        component_info = component_metadata

        component_modes = _resolve_component_modes(display_config)
        layer_key = _build_layer_key(component_info, component_modes)

        # Reconcile cached state with the live viewer: if the layer is gone (e.g. the
        # user deleted it), drop stale references so the layer is rebuilt from scratch.
        try:
            if layer_key not in {layer.name for layer in viewer.layers}:
                layers.pop(layer_key, None)
                component_groups.pop(layer_key, None)
                logger.debug(f"🔬 NAPARI PROCESS: Reconciling state — '{layer_key}' not in viewer; purged stale caches")
        except Exception as reconcile_error:
            # Reconciliation is best-effort and must not mask display.
            logger.debug(f"🔬 NAPARI PROCESS: Layer reconciliation skipped for '{layer_key}': {reconcile_error}")

        layer_images = component_groups.setdefault(layer_key, [])
        layer_images.append({
            'data': image_data,
            'components': component_info,
            'path': str(path)
        })

        if len(layer_images) == 1:
            # Single image - show it directly
            _update_or_create_layer(viewer, layers, layer_key, image_data, colormap, "layer")
            return

        # Multiple images - combine along the components configured as 'stack'
        stack_components = [comp for comp, mode in component_modes.items()
                            if mode == 'stack' and comp in component_info]
        try:
            stacked_data = _stack_layer_images(layer_images, stack_components)
            _update_or_create_layer(viewer, layers, layer_key, stacked_data, colormap, "stack layer")
        except Exception as e:
            logger.error(f"🔬 NAPARI PROCESS: Failed to create stack for layer {layer_key}: {e}")
            raise

    except Exception as e:
        import traceback
        logger.error(f"🔬 NAPARI PROCESS: Component-aware display failed for {path}: {e}")
        logger.error(f"🔬 NAPARI PROCESS: Component-aware display traceback: {traceback.format_exc()}")
        raise  # Fail loud - no fallback

304 

305 

def _napari_viewer_process(port: int, viewer_title: str, replace_layers: bool = False):
    """
    Napari viewer process entry point. Runs in a separate process.
    Listens for ZeroMQ messages with image data to display.

    Two sockets are bound: a SUB socket on ``port`` for JSON image messages and a
    REP socket on ``port + 1000`` for the pickle-encoded ping/pong handshake. A Qt
    timer polls both every 50 ms; image messages are only processed after the first
    ping has been answered. Every control request receives a reply, because a REP
    socket cannot receive again until it has sent one.

    Image messages are either a single image or ``{'type': 'batch', 'images': [...]}``.
    Each image entry is read from shared memory (``shm_name``) or, failing that, from
    inline ``data``; entries carrying neither are logged and skipped.

    Args:
        port: ZMQ port to listen on
        viewer_title: Title for the napari viewer window
        replace_layers: If True, replace existing layers; if False, add new layers with unique names
            (forwarded unchanged to _handle_component_aware_display)
    """
    # Standard-library imports are local because this function is the entry point
    # of a separately started process.
    import json
    import os
    import pickle
    import sys
    from multiprocessing import shared_memory

    context = None
    data_socket = None
    control_socket = None

    def close_zmq():
        """Close both sockets without lingering, then terminate the context (idempotent)."""
        for sock in (data_socket, control_socket):
            if sock is not None and not sock.closed:
                sock.close(linger=0)
        # term() blocks while sockets are open, so it must come after the closes.
        if context is not None and not context.closed:
            context.term()

    try:
        import zmq
        import napari
        import numpy as np

        # Set up ZeroMQ communication
        context = zmq.Context()

        # Data channel: SUB socket for receiving images
        data_socket = context.socket(zmq.SUB)
        data_socket.bind(f"tcp://*:{port}")
        data_socket.setsockopt(zmq.SUBSCRIBE, b"")  # Subscribe to all messages

        # Control channel: REP socket for handshake
        control_port = port + 1000  # Use port+1000 for control
        control_socket = context.socket(zmq.REP)
        control_socket.bind(f"tcp://*:{control_port}")

        # Create napari viewer in this process (main thread)
        viewer = napari.Viewer(title=viewer_title, show=True)

        # Seed the layer cache with whatever the viewer already contains
        layers = {layer.name: layer for layer in viewer.layers}

        napari_ready = False  # Becomes True once the first ping has been answered

        # Images received so far, grouped by layer name (used for stacking)
        component_groups = {}

        logger.info(f"🔬 NAPARI PROCESS: Viewer started on data port {port}, control port {control_port}")

        def cleanup_and_exit():
            """Release ZMQ resources and leave the process when the viewer is destroyed."""
            logger.info("🔬 NAPARI PROCESS: Viewer closed, cleaning up and exiting...")
            try:
                close_zmq()
            except Exception as cleanup_error:
                # Best-effort: the process is exiting anyway.
                logger.debug(f"🔬 NAPARI PROCESS: Cleanup error ignored: {cleanup_error}")
            sys.exit(0)

        # Connect the viewer close event to cleanup
        viewer.window.qt_viewer.destroyed.connect(cleanup_and_exit)

        # Use proper Qt event loop integration
        from qtpy import QtWidgets, QtCore

        # NOTE(review): the viewer (and with it the Qt application) already exists at
        # this point, so these variables probably no longer influence Qt's platform
        # selection in this process — confirm whether they should be set earlier.
        if 'QT_QPA_PLATFORM' not in os.environ:
            os.environ['QT_QPA_PLATFORM'] = 'xcb'
        # Disable shared memory for X11 (helps with display issues in detached processes)
        os.environ['QT_X11_NO_MITSHM'] = '1'

        # Get the Qt application
        app = QtWidgets.QApplication.instance()
        if app is None:
            app = QtWidgets.QApplication(sys.argv)

        # Ensure the application DOES quit when the napari window closes
        app.setQuitOnLastWindowClosed(True)

        def handle_control_message():
            """Answer at most one pending handshake request on the control socket."""
            nonlocal napari_ready
            try:
                raw_request = control_socket.recv(zmq.NOBLOCK)
            except zmq.Again:
                return  # No control messages

            request_type = None
            try:
                # SECURITY NOTE: pickle.loads executes arbitrary code from a crafted
                # payload, and this socket is bound on all interfaces (tcp://*).
                # Kept because the client side of the handshake speaks pickle.
                control_data = pickle.loads(raw_request)
                if isinstance(control_data, dict):
                    request_type = control_data.get('type')
            except Exception as decode_error:
                logger.warning(f"🔬 NAPARI PROCESS: Could not decode control message: {decode_error}")

            if request_type == 'ping':
                if not napari_ready:
                    # Mark as ready after first ping (GUI should be loaded by now)
                    napari_ready = True
                    logger.info(f"🔬 NAPARI PROCESS: Marked as ready after ping")
                response = {'type': 'pong', 'ready': napari_ready}
            else:
                response = {'type': 'error', 'ready': napari_ready,
                            'message': f"unsupported control request: {request_type!r}"}

            # Always reply: a REP socket that skips its reply cannot recv() again.
            control_socket.send(pickle.dumps(response))
            logger.debug(f"🔬 NAPARI PROCESS: Responded to {request_type!r} with ready={napari_ready}")

        def load_image(info):
            """Return the image array described by one message entry, or None if it has no data."""
            shape = info.get('shape')
            dtype = info.get('dtype')
            shm_name = info.get('shm_name')
            if shm_name:
                shm = shared_memory.SharedMemory(name=shm_name)
                try:
                    # Copy so the array stays valid after our handle is closed.
                    return np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
                finally:
                    shm.close()  # Close our reference
            direct_data = info.get('data')
            if direct_data:
                return np.array(direct_data, dtype=dtype).reshape(shape)
            return None

        def display_image(info, display_config_dict):
            """Load one image entry and hand it to the component-aware layer logic."""
            path = info.get('path', 'unknown')
            component_metadata = info.get('component_metadata') or {}

            # Add step information to component metadata
            component_metadata['step_index'] = info.get('step_index', 0)
            component_metadata['step_name'] = info.get('step_name', 'unknown_step')

            image_data = load_image(info)
            if image_data is None:
                logger.warning(f"🔬 NAPARI PROCESS: No image data in message")
                return

            colormap = 'viridis'  # Default
            if display_config_dict and 'colormap' in display_config_dict:
                colormap = display_config_dict['colormap']

            _handle_component_aware_display(
                viewer, layers, component_groups, image_data, path,
                colormap, display_config_dict or {}, replace_layers, component_metadata
            )

        def process_messages():
            """Timer callback: serve the handshake, then drain up to 10 image messages."""
            handle_control_message()

            # Process data messages only if ready
            if not napari_ready:
                return

            for _ in range(10):  # Bounded per tick so the GUI stays responsive
                try:
                    message = data_socket.recv(zmq.NOBLOCK)
                except zmq.Again:
                    break  # No more messages available

                data = json.loads(message.decode('utf-8'))
                display_config_dict = data.get('display_config')

                if data.get('type') == 'batch':
                    for image_info in data.get('images', []):
                        display_image(image_info, display_config_dict)
                else:
                    display_image(data, display_config_dict)

        # Set up a QTimer for message processing
        timer = QtCore.QTimer()
        timer.timeout.connect(process_messages)
        timer.start(50)  # Process messages every 50ms

        logger.info("🔬 NAPARI PROCESS: Starting Qt event loop")

        # Run the Qt event loop - this keeps napari responsive
        app.exec_()

    except Exception as e:
        logger.exception(f"🔬 NAPARI PROCESS: Fatal error: {e}")
    finally:
        logger.info("🔬 NAPARI PROCESS: Shutting down")
        close_zmq()

524 

525 

def _spawn_detached_napari_process(port: int, viewer_title: str, replace_layers: bool = False) -> subprocess.Popen:
    """
    Spawn a completely detached napari viewer process that survives parent termination.

    A fresh interpreter is started with ``python -c <code>``. The generated code calls
    ``os.setsid()`` where available, puts the current working directory on
    ``sys.path`` and then runs ``_napari_viewer_process``. All arguments are embedded
    as Python literals via ``repr``, so backslashes (Windows paths) and quotes in
    ``viewer_title`` or the working directory cannot break the generated source.

    Args:
        port: Data port the viewer should listen on.
        viewer_title: Window title for the viewer.
        replace_layers: Forwarded to ``_napari_viewer_process``.

    Returns:
        The ``subprocess.Popen`` handle of the started process.

    Raises:
        Exception: Whatever ``subprocess.Popen`` raises; it is logged and re-raised.
    """
    current_dir = os.getcwd()
    python_code = f'''
import sys
import os

# Detach from parent process group (Unix only)
if hasattr(os, "setsid"):
    try:
        os.setsid()
    except OSError:
        pass

# Add current working directory to Python path
sys.path.insert(0, {current_dir!r})

try:
    from openhcs.runtime.napari_stream_visualizer import _napari_viewer_process
    _napari_viewer_process({int(port)}, {str(viewer_title)!r}, {bool(replace_layers)})
except Exception as e:
    import logging
    logger = logging.getLogger("openhcs.runtime.napari_detached")
    logger.error(f"Detached napari error: {{e}}")
    import traceback
    logger.error(traceback.format_exc())
    sys.exit(1)
'''

    # Preserve DISPLAY and other environment variables. stdout/stderr are deliberately
    # not redirected.
    env = os.environ.copy()
    popen_kwargs = {'env': env, 'cwd': current_dir}

    if sys.platform == "win32":
        # Windows: new process group + detached console
        popen_kwargs['creationflags'] = (
            subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS
        )
    else:
        # Unix: default to the X11 backend unless the caller chose a platform. macOS
        # is excluded because Qt has no xcb platform plugin there.
        if sys.platform != "darwin":
            env.setdefault('QT_QPA_PLATFORM', 'xcb')
        # Disable shared memory for X11 (helps with some display issues)
        env['QT_X11_NO_MITSHM'] = '1'

    try:
        process = subprocess.Popen([sys.executable, "-c", python_code], **popen_kwargs)
    except Exception as e:
        logger.error(f"🔬 VISUALIZER: Failed to spawn detached napari process: {e}")
        raise

    logger.info(f"🔬 VISUALIZER: Detached napari process started (PID: {process.pid})")
    return process

601 

602 

603class NapariStreamVisualizer: 

604 """ 

605 Manages a Napari viewer instance for real-time visualization of tensors 

606 streamed from the OpenHCS pipeline. Runs napari in a separate process 

607 for Qt compatibility and true persistence across pipeline runs. 

608 """ 

609 

610 def __init__(self, filemanager: FileManager, visualizer_config, viewer_title: str = "OpenHCS Real-Time Visualization", persistent: bool = True, napari_port: int = DEFAULT_NAPARI_STREAM_PORT, replace_layers: bool = False, display_config=None): 

611 self.filemanager = filemanager 

612 self.viewer_title = viewer_title 

613 self.persistent = persistent # If True, viewer process stays alive after pipeline completion 

614 self.visualizer_config = visualizer_config 

615 self.napari_port = napari_port # Port for napari streaming 

616 self.replace_layers = replace_layers # If True, replace existing layers; if False, add new layers 

617 self.display_config = display_config # Configuration for display behavior 

618 self.port: Optional[int] = None 

619 self.process: Optional[multiprocessing.Process] = None 

620 self.zmq_context: Optional[zmq.Context] = None 

621 self.zmq_socket: Optional[zmq.Socket] = None 

622 self.is_running = False 

623 self._lock = threading.Lock() 

624 

625 # Clause 368: Visualization must be observer-only. 

626 # This class will only read data and display it. 

627 

628 def _find_free_port(self) -> int: 

629 """Find a free port for ZeroMQ communication.""" 

630 import socket 

631 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

632 s.bind(('', 0)) 

633 return s.getsockname()[1] 

634 

635 def start_viewer(self): 

636 """Starts the Napari viewer in a separate process.""" 

637 global _global_viewer_process, _global_viewer_port 

638 

639 with self._lock: 

640 # Check if there's already a napari viewer running on the configured port 

641 port_in_use = self._is_port_in_use(self.napari_port) 

642 logger.info(f"🔬 VISUALIZER: Port {self.napari_port} in use: {port_in_use}") 

643 if port_in_use: 

644 logger.info(f"🔬 VISUALIZER: Reusing existing napari viewer on port {self.napari_port}") 

645 # Set the port and connect to existing viewer 

646 self.port = self.napari_port 

647 

648 # Check if we have a reference to the global process 

649 with _global_process_lock: 

650 if _global_viewer_process and _global_viewer_port == self.napari_port: 

651 self.process = _global_viewer_process 

652 logger.info(f"🔬 VISUALIZER: Found global process reference (PID: {self.process.pid})") 

653 else: 

654 self.process = None # External process we don't control 

655 logger.info("🔬 VISUALIZER: No global process reference (external viewer)") 

656 

657 self._setup_zmq_client() 

658 self.is_running = True 

659 return 

660 

661 if self.is_running: 

662 logger.warning("Napari viewer is already running.") 

663 return 

664 

665 # Use configured port for napari streaming 

666 self.port = self.napari_port 

667 logger.info(f"🔬 VISUALIZER: Starting napari viewer process on port {self.port}") 

668 

669 if self.persistent: 

670 # For persistent viewers, use detached subprocess that truly survives parent termination 

671 logger.info("🔬 VISUALIZER: Creating detached persistent napari viewer") 

672 self.process = _spawn_detached_napari_process(self.port, self.viewer_title, self.replace_layers) 

673 else: 

674 # For non-persistent viewers, use multiprocessing.Process 

675 logger.info("🔬 VISUALIZER: Creating non-persistent napari viewer") 

676 self.process = multiprocessing.Process( 

677 target=_napari_viewer_process, 

678 args=(self.port, self.viewer_title, self.replace_layers), 

679 daemon=False 

680 ) 

681 self.process.start() 

682 

683 # Update global references 

684 with _global_process_lock: 

685 _global_viewer_process = self.process 

686 _global_viewer_port = self.port 

687 

688 # Wait for napari viewer to be ready before setting up ZMQ 

689 self._wait_for_viewer_ready() 

690 

691 # Set up ZeroMQ client 

692 self._setup_zmq_client() 

693 

694 # Check if process is running (different methods for subprocess vs multiprocessing) 

695 if hasattr(self.process, 'is_alive'): 

696 # multiprocessing.Process 

697 process_alive = self.process.is_alive() 

698 else: 

699 # subprocess.Popen 

700 process_alive = self.process.poll() is None 

701 

702 if process_alive: 

703 self.is_running = True 

704 logger.info(f"🔬 VISUALIZER: Napari viewer process started successfully (PID: {self.process.pid})") 

705 else: 

706 logger.error("🔬 VISUALIZER: Failed to start napari viewer process") 

707 

708 def _try_connect_to_existing_viewer(self, port: int) -> bool: 

709 """Try to connect to an existing napari viewer on the given port.""" 

710 import socket 

711 

712 # First check if anything is listening on the port 

713 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

714 sock.settimeout(0.1) # 100ms timeout 

715 try: 

716 result = sock.connect_ex(('localhost', port)) 

717 sock.close() 

718 

719 if result == 0: # Port is open 

720 # Set up ZMQ connection 

721 self._setup_zmq_client() 

722 return True 

723 else: 

724 return False 

725 

726 except Exception: 

727 return False 

728 

729 def _is_port_in_use(self, port: int) -> bool: 

730 """Check if a port is already in use (indicating existing napari viewer).""" 

731 import socket 

732 

733 # Check if any process is listening on this port 

734 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

735 sock.settimeout(0.1) 

736 try: 

737 # Try to bind to the port - if it fails, something is already using it 

738 sock.bind(('localhost', port)) 

739 sock.close() 

740 return False # Port is free 

741 except OSError: 

742 # Port is already in use 

743 sock.close() 

744 return True 

745 except Exception: 

746 return False 

747 

748 def _wait_for_viewer_ready(self, timeout: float = 10.0) -> bool: 

749 """Wait for the napari viewer to be ready using handshake protocol.""" 

750 import zmq 

751 

752 logger.info(f"🔬 VISUALIZER: Waiting for napari viewer to be ready on port {self.port}...") 

753 

754 # First wait for ports to be bound 

755 start_time = time.time() 

756 while time.time() - start_time < timeout: 

757 if self._is_port_in_use(self.port) and self._is_port_in_use(self.port + 1000): 

758 break 

759 time.sleep(0.2) 

760 else: 

761 logger.warning(f"🔬 VISUALIZER: Timeout waiting for ports to be bound") 

762 return False 

763 

764 # Now use handshake protocol - create fresh socket for each attempt 

765 start_time = time.time() 

766 while time.time() - start_time < timeout: 

767 control_context = zmq.Context() 

768 control_socket = control_context.socket(zmq.REQ) 

769 control_socket.setsockopt(zmq.LINGER, 0) 

770 control_socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout 

771 

772 try: 

773 control_socket.connect(f"tcp://localhost:{self.port + 1000}") 

774 

775 import pickle 

776 ping_message = {'type': 'ping'} 

777 control_socket.send(pickle.dumps(ping_message)) 

778 

779 response = control_socket.recv() 

780 response_data = pickle.loads(response) 

781 

782 if response_data.get('type') == 'pong' and response_data.get('ready'): 

783 logger.info(f"🔬 VISUALIZER: Napari viewer is ready on port {self.port}") 

784 return True 

785 

786 except zmq.Again: 

787 pass # Timeout waiting for response 

788 except Exception as e: 

789 logger.debug(f"🔬 VISUALIZER: Handshake attempt failed: {e}") 

790 finally: 

791 control_socket.close() 

792 control_context.term() 

793 

794 time.sleep(0.5) # Wait before next ping 

795 

796 logger.warning(f"🔬 VISUALIZER: Timeout waiting for napari viewer handshake") 

797 return False 

798 

799 def _setup_zmq_client(self): 

800 """Set up ZeroMQ client to send data to viewer process.""" 

801 if self.port is None: 

802 raise RuntimeError("Port not set - call start_viewer() first") 

803 

804 self.zmq_context = zmq.Context() 

805 self.zmq_socket = self.zmq_context.socket(zmq.PUB) 

806 self.zmq_socket.connect(f"tcp://localhost:{self.port}") 

807 

808 # Brief delay for ZMQ connection to establish 

809 time.sleep(0.1) 

810 logger.info(f"🔬 VISUALIZER: ZMQ client connected to port {self.port}") 

811 

812 def send_image_data(self, step_id: str, image_data: np.ndarray, axis_id: str = "unknown"): 

813 """ 

814 DISABLED: This method bypasses component-aware stacking. 

815 All visualization must go through the streaming backend. 

816 """ 

817 raise RuntimeError( 

818 f"send_image_data() is disabled. Use streaming backend for component-aware display. " 

819 f"step_id: {step_id}, axis_id: {axis_id}, shape: {image_data.shape}" 

820 ) 

821 

def stop_viewer(self):
    """Stop the napari viewer process (only if not persistent).

    The ZMQ client resources are released through ``_cleanup_zmq()`` in both
    cases. For a non-persistent viewer the process is terminated, given 5
    seconds to exit, killed if it is still running, and then reaped;
    ``is_running`` is set to ``False`` afterwards. For a persistent viewer
    the process and ``is_running`` are left untouched so it can be reused.
    """
    with self._lock:
        if not self.persistent:
            logger.info("🔬 VISUALIZER: Stopping non-persistent napari viewer")
            self._cleanup_zmq()
            process = self.process
            if process:
                # Handle both subprocess and multiprocessing process types
                if hasattr(process, 'is_alive'):
                    # multiprocessing.Process
                    if process.is_alive():
                        process.terminate()
                        process.join(timeout=5)
                        if process.is_alive():
                            logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                            process.kill()
                            # Reap the killed child so it does not linger
                            # as a zombie; bounded so we never hang here.
                            process.join(timeout=5)
                elif process.poll() is None:
                    # subprocess.Popen that is still running
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        logger.warning("🔬 VISUALIZER: Force killing napari viewer process")
                        process.kill()
                        # Collect the exit status after kill() (reaps the
                        # child); bounded so an unkillable process cannot
                        # block the caller forever.
                        try:
                            process.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            logger.warning("🔬 VISUALIZER: napari viewer process did not exit after kill")
            self.is_running = False
        else:
            logger.info("🔬 VISUALIZER: Keeping persistent napari viewer alive")
            # Just cleanup our ZMQ connection, leave process running
            self._cleanup_zmq()
            # DON'T set is_running = False for persistent viewers!
            # The process is still alive and should be reusable

854 

855 def _cleanup_zmq(self): 

856 """Clean up ZeroMQ resources.""" 

857 if self.zmq_socket: 

858 self.zmq_socket.close() 

859 self.zmq_socket = None 

860 if self.zmq_context: 

861 self.zmq_context.term() 

862 self.zmq_context = None 

863 

def visualize_path(self, step_id: str, path: str, backend: str, axis_id: Optional[str] = None):
    """
    DISABLED: This method bypasses component-aware stacking.
    All visualization must go through the streaming backend.

    Always raises ``RuntimeError``; the message echoes ``path``, ``step_id``
    and ``axis_id``. ``backend`` is accepted but not used.
    """
    reason = "visualize_path() is disabled. Use streaming backend for component-aware display. "
    details = f"Path: {path}, step_id: {step_id}, axis_id: {axis_id}"
    raise RuntimeError(reason + details)

873 

874 def _prepare_data_for_display(self, data: Any, step_id_for_log: str, display_config=None) -> Optional[np.ndarray]: 

875 """Converts loaded data to a displayable NumPy array (slice or stack based on config).""" 

876 cpu_tensor: Optional[np.ndarray] = None 

877 try: 

878 # GPU to CPU conversion logic 

879 if hasattr(data, 'is_cuda') and data.is_cuda: # PyTorch 

880 cpu_tensor = data.cpu().numpy() 

881 elif hasattr(data, 'device') and 'cuda' in str(data.device).lower(): # Check for device attribute 

882 if hasattr(data, 'get'): # CuPy 

883 cpu_tensor = data.get() 

884 elif hasattr(data, 'numpy'): # JAX on GPU might have .numpy() after host transfer 

885 cpu_tensor = np.asarray(data) # JAX arrays might need explicit conversion 

886 else: # Fallback for other GPU array types if possible 

887 logger.warning(f"Unknown GPU array type for step '{step_id_for_log}'. Attempting .numpy().") 

888 if hasattr(data, 'numpy'): 

889 cpu_tensor = data.numpy() 

890 else: 

891 logger.error(f"Cannot convert GPU tensor of type {type(data)} for step '{step_id_for_log}'.") 

892 return None 

893 elif isinstance(data, np.ndarray): 

894 cpu_tensor = data 

895 else: 

896 # Attempt to convert to numpy array if it's some other array-like structure 

897 try: 

898 cpu_tensor = np.asarray(data) 

899 logger.debug(f"Converted data of type {type(data)} to numpy array for step '{step_id_for_log}'.") 

900 except Exception as e_conv: 

901 logger.warning(f"Unsupported data type for step '{step_id_for_log}': {type(data)}. Error: {e_conv}") 

902 return None 

903 

904 if cpu_tensor is None: # Should not happen if logic above is correct 

905 return None 

906 

907 # Determine display mode based on configuration 

908 # Default behavior: show as stack unless config specifies otherwise 

909 should_slice = False 

910 

911 if display_config: 

912 # Check if any component mode is set to SLICE 

913 from openhcs.core.config import NapariDimensionMode 

914 from openhcs.constants import VariableComponents 

915 

916 # Check individual component mode fields 

917 for component in VariableComponents: 

918 field_name = f"{component.value}_mode" 

919 if hasattr(display_config, field_name): 

920 mode = getattr(display_config, field_name) 

921 if mode == NapariDimensionMode.SLICE: 

922 should_slice = True 

923 break 

924 else: 

925 # Default: slice for backward compatibility 

926 should_slice = True 

927 

928 # Slicing/stacking logic 

929 display_data: Optional[np.ndarray] = None 

930 

931 if should_slice: 

932 # Original slicing behavior 

933 if cpu_tensor.ndim == 3: # ZYX 

934 display_data = cpu_tensor[cpu_tensor.shape[0] // 2, :, :] 

935 elif cpu_tensor.ndim == 2: # YX 

936 display_data = cpu_tensor 

937 elif cpu_tensor.ndim > 3: # e.g. CZYX or TZYX 

938 logger.debug(f"Tensor for step '{step_id_for_log}' has ndim > 3 ({cpu_tensor.ndim}). Taking a slice.") 

939 slicer = [0] * (cpu_tensor.ndim - 2) # Slice first channels/times 

940 slicer[-1] = cpu_tensor.shape[-3] // 2 # Middle Z 

941 try: 

942 display_data = cpu_tensor[tuple(slicer)] 

943 except IndexError: # Handle cases where slicing might fail (e.g. very small dimensions) 

944 logger.error(f"Slicing failed for tensor with shape {cpu_tensor.shape} for step '{step_id_for_log}'.", exc_info=True) 

945 display_data = None 

946 else: 

947 logger.warning(f"Tensor for step '{step_id_for_log}' has unsupported ndim for slicing: {cpu_tensor.ndim}.") 

948 return None 

949 else: 

950 # Stack mode: send the full data to napari (napari can handle 3D+ data) 

951 if cpu_tensor.ndim >= 2: 

952 display_data = cpu_tensor 

953 logger.debug(f"Sending {cpu_tensor.ndim}D stack to napari for step '{step_id_for_log}' (shape: {cpu_tensor.shape})") 

954 else: 

955 logger.warning(f"Tensor for step '{step_id_for_log}' has unsupported ndim for stacking: {cpu_tensor.ndim}.") 

956 return None 

957 

958 return display_data.copy() if display_data is not None else None 

959 

960 except Exception as e: 

961 logger.error(f"Error preparing data from step '{step_id_for_log}' for display: {e}", exc_info=True) 

962 return None 

963