Coverage for openhcs/pyqt_gui/widgets/shared/zmq_server_manager.py: 0.0%

392 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Generic ZMQ Server Manager Widget for PyQt6. 

3 

4Provides a reusable UI component for managing any ZMQ server (execution servers, 

5Napari viewers, future servers) using the ZMQServer/ZMQClient ABC interface. 

6 

7Features: 

8- Auto-discovery of running servers via port scanning 

9- Display server info (port, type, status, log file) 

10- Graceful shutdown and force kill 

11- Double-click to open log files 

12- Works with ANY ZMQServer subclass 

13- Tracks launching viewers with queued image counts 

14""" 

15 

16import logging 

17from pathlib import Path 

18from typing import List, Dict, Any, Optional 

19from PyQt6.QtWidgets import ( 

20 QWidget, QVBoxLayout, QHBoxLayout, QTreeWidget, QTreeWidgetItem, 

21 QPushButton, QGroupBox, QMessageBox, QAbstractItemView 

22) 

23from PyQt6.QtCore import Qt, pyqtSignal, pyqtSlot, QTimer 

24from openhcs.pyqt_gui.shared.style_generator import StyleSheetGenerator 

25import threading 

26 

logger = logging.getLogger(__name__)


# Global registry for launching viewers (viewers that were started but have
# not yet been unregistered as "ready").
# Format: {port: {'type': 'napari'|'fiji', 'queued_images': int, 'start_time': float}}
# Every read and write of the registry must hold _launching_viewers_lock,
# because the register/update/unregister helpers may be called from any thread.
_launching_viewers_lock = threading.Lock()
_launching_viewers: Dict[int, Dict[str, Any]] = {}


# Global reference to active ZMQ server manager widgets (for triggering refreshes).
# The list holds strong references: a widget appended here stays alive (and
# keeps receiving refresh requests) until its cleanup() removes it again.
_active_managers_lock = threading.Lock()
_active_managers: List['ZMQServerManagerWidget'] = []

39 

40 

def register_launching_viewer(port: int, viewer_type: str, queued_images: int = 0):
    """Record that a viewer is starting on ``port`` and refresh the UI.

    A port that is already registered keeps its original entry (type and
    start time); only its queued-image counter grows by ``queued_images``.

    Args:
        port: Port the viewer is being launched on.
        viewer_type: Viewer kind stored under ``'type'`` (e.g. 'napari', 'fiji').
        queued_images: Number of images to add to the pending count.
    """
    import time

    with _launching_viewers_lock:
        entry = _launching_viewers.get(port)
        if entry is None:
            # First registration for this port: create a fresh entry.
            _launching_viewers[port] = {
                'type': viewer_type,
                'queued_images': queued_images,
                'start_time': time.time(),
            }
            logger.info(f"Registered launching {viewer_type} viewer on port {port} with {queued_images} queued images")
        else:
            # Launch already in progress: accumulate instead of replacing.
            entry['queued_images'] += queued_images
            logger.info(f"Updated launching {viewer_type} viewer on port {port}: added {queued_images} images (total: {entry['queued_images']})")

    # Lock released: ask every active manager for a fast refresh (no port scan).
    _trigger_manager_refresh_fast()

63 

64 

def update_launching_viewer_queue(port: int, queued_images: int):
    """Overwrite the queued-image count of a launching viewer and refresh the UI.

    Unlike register_launching_viewer(), this replaces the stored count rather
    than adding to it. Ports that are not registered are left untouched, but
    the UI refresh is requested either way.

    Args:
        port: Port of the launching viewer.
        queued_images: New absolute number of queued images.
    """
    with _launching_viewers_lock:
        entry = _launching_viewers.get(port)
        if entry is not None:
            entry['queued_images'] = queued_images
            logger.debug(f"Updated launching viewer on port {port}: {queued_images} queued images")

    # Lock released: ask every active manager for a fast refresh (no port scan).
    _trigger_manager_refresh_fast()

77 

78 

def unregister_launching_viewer(port: int):
    """Drop ``port`` from the launching registry (the viewer is ready) and refresh the UI.

    Unknown ports are ignored; a full refresh is requested in every case.

    Args:
        port: Port of the viewer that finished launching.
    """
    with _launching_viewers_lock:
        removed = _launching_viewers.pop(port, None)
        if removed is not None:
            logger.info(f"Unregistered launching viewer on port {port} (now ready)")

    # A full refresh (with port scan) is needed so the now-ready viewer shows up
    # as a regular server entry.
    _trigger_manager_refresh_full()

88 

89 

90def _trigger_manager_refresh_fast(): 

91 """Trigger fast refresh (launching viewers only, no port scan) on all active managers.""" 

92 with _active_managers_lock: 

93 for manager in _active_managers: 

94 try: 

95 # Use QMetaObject to safely call from any thread 

96 from PyQt6.QtCore import QMetaObject, Qt 

97 QMetaObject.invokeMethod( 

98 manager, 

99 "_refresh_launching_viewers_only", 

100 Qt.ConnectionType.QueuedConnection 

101 ) 

102 except Exception as e: 

103 logger.debug(f"Failed to trigger fast refresh on manager: {e}") 

104 

105 

106def _trigger_manager_refresh_full(): 

107 """Trigger full refresh (port scan + launching viewers) on all active managers.""" 

108 with _active_managers_lock: 

109 for manager in _active_managers: 

110 try: 

111 # Use QMetaObject to safely call from any thread 

112 from PyQt6.QtCore import QMetaObject, Qt 

113 QMetaObject.invokeMethod( 

114 manager, 

115 "refresh_servers", 

116 Qt.ConnectionType.QueuedConnection 

117 ) 

118 except Exception as e: 

119 logger.debug(f"Failed to trigger full refresh on manager: {e}") 

120 

121 

def get_launching_viewers() -> Dict[int, Dict[str, Any]]:
    """Return an independent snapshot of the launching viewers registry.

    Both the outer mapping and each per-port entry are copied while the
    registry lock is held. A plain ``dict(...)`` copy would hand out the live
    entry dicts, which other threads update in place, so callers could observe
    changing values or accidentally modify the registry.

    Returns:
        ``{port: {'type': ..., 'queued_images': ..., 'start_time': ...}}``;
        mutating the result never affects the registry.
    """
    with _launching_viewers_lock:
        return {port: dict(info) for port, info in _launching_viewers.items()}

126 

127 

class ZMQServerManagerWidget(QWidget):
    """
    Generic ZMQ server manager widget.

    Works with any ZMQServer subclass via the ABC interface.
    Displays running servers and provides management controls.

    Lifetime: every instance registers itself in the module-level
    ``_active_managers`` list, which holds a strong reference. ``cleanup()``
    must therefore be called explicitly when the widget is no longer needed;
    ``__del__`` cannot run while the widget is still registered.
    """

    # Signals
    server_killed = pyqtSignal(int)  # Emitted when server is killed (port)
    log_file_opened = pyqtSignal(str)  # Emitted when log file is opened (path)
    _scan_complete = pyqtSignal(list)  # Internal signal for async scan completion
    _kill_complete = pyqtSignal(bool, str)  # Internal signal for async kill completion (success, message)

    # Tunables (previously hard-coded inline).
    REFRESH_INTERVAL_MS = 1000  # Auto-refresh period while the widget is visible
    POST_KILL_REFRESH_DELAY_MS = 200  # Delay before re-scanning after a kill
    PING_TIMEOUT_MS = 300  # Receive timeout per pinged port
    MAX_SCAN_WORKERS = 10  # Upper bound of parallel pings per scan

    def __init__(
        self,
        ports_to_scan: List[int],
        title: str = "ZMQ Servers",
        style_generator: Optional[StyleSheetGenerator] = None,
        parent: Optional[QWidget] = None
    ):
        """
        Initialize ZMQ server manager widget.

        Args:
            ports_to_scan: List of ports to scan for servers
            title: Title for the group box
            style_generator: Style generator for consistent styling
            parent: Parent widget
        """
        super().__init__(parent)

        # Cleanup flag to prevent operations after cleanup. Set first so that
        # cleanup()/__del__ and every guard can rely on it even when a later
        # construction step fails.
        self._is_cleaning_up = False

        self.ports_to_scan = ports_to_scan
        self.title = title
        self.style_generator = style_generator

        # Server tracking (last scan result, reused by the fast refresh path)
        self.servers: List[Dict[str, Any]] = []

        # Auto-refresh timer (async scanning won't block UI). Parented to the
        # widget so Qt ties its lifetime to this object.
        self.refresh_timer = QTimer(self)
        self.refresh_timer.timeout.connect(self.refresh_servers)

        # setup_ui() builds the widgets AND wires _scan_complete/_kill_complete.
        # The scan signal must not be connected here as well: a second
        # connection makes every scan rebuild the tree twice.
        self.setup_ui()

        # Register for launching-viewer updates only once the widget is fully
        # built, so a failed construction never leaves a stale registry entry.
        with _active_managers_lock:
            _active_managers.append(self)

    def cleanup(self):
        """Stop the timer and unregister the widget. Safe to call repeatedly."""
        # A missing flag means __init__ never got far enough to need cleanup.
        if getattr(self, '_is_cleaning_up', True):
            return

        self._is_cleaning_up = True

        # Stop refresh timer first to prevent new refresh calls
        timer = getattr(self, 'refresh_timer', None)
        if timer is not None:
            try:
                timer.stop()
                timer.deleteLater()
            except RuntimeError:
                # The underlying Qt object was already destroyed.
                pass
            self.refresh_timer = None

        # Unregister this manager from global list
        with _active_managers_lock:
            if self in _active_managers:
                _active_managers.remove(self)

        logger.debug("ZMQServerManagerWidget cleanup completed")

    def __del__(self):
        """Best-effort cleanup when the Python object is finalized."""
        try:
            self.cleanup()
        except Exception:
            # A finalizer must never raise (Qt objects may already be gone and
            # logging may be unavailable during interpreter shutdown).
            pass

    def showEvent(self, event):
        """Auto-scan for servers when widget is shown."""
        super().showEvent(event)
        if self._is_cleaning_up:
            return

        # Scan immediately, then keep refreshing while visible.
        self.refresh_servers()
        if self.refresh_timer:
            self.refresh_timer.start(self.REFRESH_INTERVAL_MS)

    def hideEvent(self, event):
        """Stop auto-refresh when widget is hidden."""
        super().hideEvent(event)
        # Stop timer to prevent unnecessary background work
        timer = getattr(self, 'refresh_timer', None)
        if timer is not None:
            timer.stop()

    def setup_ui(self):
        """Build the tree, the buttons and the styling, then wire the internal signals."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)

        # Group box
        group_box = QGroupBox(self.title)
        group_layout = QVBoxLayout(group_box)
        group_layout.setContentsMargins(5, 5, 5, 5)

        # Server tree (hierarchical display with workers as children)
        self.server_tree = QTreeWidget()
        self.server_tree.setHeaderLabels(["Server / Worker", "Status", "Info"])
        self.server_tree.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
        self.server_tree.itemDoubleClicked.connect(self._on_item_double_clicked)
        self.server_tree.setColumnWidth(0, 250)
        self.server_tree.setColumnWidth(1, 100)
        group_layout.addWidget(self.server_tree)

        # Buttons
        button_layout = QHBoxLayout()

        self.refresh_btn = QPushButton("Refresh")
        self.refresh_btn.clicked.connect(self.refresh_servers)
        button_layout.addWidget(self.refresh_btn)

        self.quit_btn = QPushButton("Quit")
        self.quit_btn.clicked.connect(self.quit_selected_servers)
        button_layout.addWidget(self.quit_btn)

        self.force_kill_btn = QPushButton("Force Kill")
        self.force_kill_btn.clicked.connect(self.force_kill_selected_servers)
        button_layout.addWidget(self.force_kill_btn)

        group_layout.addLayout(button_layout)

        layout.addWidget(group_box)

        # Apply styling
        if self.style_generator:
            # All three buttons share one style; generate it once.
            button_style = self.style_generator.generate_button_style()
            for button in (self.refresh_btn, self.quit_btn, self.force_kill_btn):
                button.setStyleSheet(button_style)

            # Apply tree widget style (uses existing method)
            self.server_tree.setStyleSheet(self.style_generator.generate_tree_widget_style())

            # Apply group box style
            cs = self.style_generator.color_scheme
            group_box.setStyleSheet(f"""
                QGroupBox {{
                    background-color: {cs.to_hex(cs.panel_bg)};
                    border: 1px solid {cs.to_hex(cs.border_color)};
                    border-radius: 4px;
                    margin-top: 8px;
                    padding-top: 8px;
                    font-weight: bold;
                    color: {cs.to_hex(cs.text_accent)};
                }}
                QGroupBox::title {{
                    subcontrol-origin: margin;
                    subcontrol-position: top left;
                    padding: 2px 5px;
                    color: {cs.to_hex(cs.text_accent)};
                }}
            """)

        # Connect internal signals (the only place they are wired)
        self._scan_complete.connect(self._update_server_list)
        self._kill_complete.connect(self._on_kill_complete)

    @staticmethod
    def _port_sort_key(server: Dict[str, Any]):
        """Sort key ordering servers by port; non-integer ports sort last."""
        port = server.get('port')
        if isinstance(port, int):
            return (0, port, "")
        return (1, 0, str(port))

    # Registered as a slot because _trigger_manager_refresh_full() invokes it
    # by name through QMetaObject.invokeMethod, which only resolves methods
    # known to the Qt meta-object.
    @pyqtSlot()
    def refresh_servers(self):
        """Scan ports and refresh server list (async in background)."""
        # Guard against calls after cleanup
        if self._is_cleaning_up:
            return

        # Snapshot so the background thread never iterates a list the caller
        # may be modifying.
        ports = list(self.ports_to_scan)

        def scan_and_update():
            """Background thread to scan ports without blocking UI."""
            import concurrent.futures

            servers = []

            if ports:
                # ThreadPoolExecutor rejects max_workers=0, hence the guard above.
                worker_count = min(self.MAX_SCAN_WORKERS, len(ports))
                with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
                    future_to_port = {
                        executor.submit(self._ping_server, port): port
                        for port in ports
                    }

                    # Collect results as they complete
                    for future in concurrent.futures.as_completed(future_to_port):
                        port = future_to_port[future]
                        try:
                            server_info = future.result()
                        except Exception as e:
                            logger.debug(f"Error scanning port {port}: {e}")
                            continue
                        if server_info:
                            servers.append(server_info)

            # Completion order is nondeterministic; sort so rows keep a stable
            # position between refreshes.
            servers.sort(key=self._port_sort_key)

            # The widget may have been cleaned up or destroyed while scanning.
            if self._is_cleaning_up:
                return
            try:
                # Update UI on main thread via signal
                self._scan_complete.emit(servers)
            except RuntimeError as e:
                logger.debug(f"Scan result dropped, widget no longer available: {e}")

        # Start scan in background thread
        threading.Thread(target=scan_and_update, daemon=True).start()

    def _ping_server(self, port: int) -> Optional[Dict[str, Any]]:
        """
        Ping a server on the given port and return its info.

        The ping is sent to the control channel (``port + CONTROL_PORT_OFFSET``).

        Args:
            port: Port from ``ports_to_scan`` to probe.

        Returns:
            The pong dict if the server answered with ``{'type': 'pong', ...}``
            (with ``'port'`` filled in from the scanned port when the server
            did not report one), None otherwise.
        """
        import zmq
        import pickle
        from openhcs.constants.constants import CONTROL_PORT_OFFSET
        from openhcs.runtime.zmq_base import get_zmq_transport_url, get_default_transport_mode

        control_port = port + CONTROL_PORT_OFFSET
        control_context = None
        control_socket = None

        try:
            control_context = zmq.Context()
            control_socket = control_context.socket(zmq.REQ)
            # LINGER 0: closing must not wait for an undelivered ping.
            control_socket.setsockopt(zmq.LINGER, 0)
            # Short receive timeout for fast scanning
            control_socket.setsockopt(zmq.RCVTIMEO, self.PING_TIMEOUT_MS)

            # Use transport mode-aware URL (IPC or TCP)
            transport_mode = get_default_transport_mode()
            control_url = get_zmq_transport_url(control_port, transport_mode, 'localhost')
            control_socket.connect(control_url)

            # Send ping
            control_socket.send(pickle.dumps({'type': 'ping'}))

            # Wait for pong
            # SECURITY NOTE: pickle.loads can execute arbitrary code. This is
            # only acceptable while every scanned endpoint is a trusted local
            # server; do not point ports_to_scan at untrusted peers.
            response_data = pickle.loads(control_socket.recv())

            # Return server info if valid pong
            if isinstance(response_data, dict) and response_data.get('type') == 'pong':
                # Make sure the entry can always be selected/killed by port.
                response_data.setdefault('port', port)
                return response_data

            return None

        except Exception:
            # Deliberate best-effort: an unanswered ping (e.g. receive timeout)
            # is the normal case for ports without a server. Not logged, since
            # this runs for every port on every scan.
            return None
        finally:
            if control_socket is not None:
                try:
                    control_socket.close()
                except Exception as e:
                    logger.debug(f"Error closing ping socket for port {port}: {e}")
            if control_context is not None:
                try:
                    control_context.term()
                except Exception as e:
                    logger.debug(f"Error terminating ping context for port {port}: {e}")

    @pyqtSlot()
    def _refresh_launching_viewers_only(self):
        """Fast refresh: Update UI with launching viewers only (no port scan).

        This is called when launching viewer state changes and provides instant feedback.
        """
        # Guard against calls after cleanup
        if self._is_cleaning_up:
            return

        # Keep existing scanned servers, just update the tree display
        self._update_server_list(self.servers)

    @pyqtSlot(list)
    def _update_server_list(self, servers: List[Dict[str, Any]]):
        """Rebuild the server tree on the UI thread (called via signal).

        Rows are added in this order: launching viewers, busy viewers known
        only through their queue tracker, then the scanned servers. The
        selection (by port) is preserved across the rebuild.

        Args:
            servers: Server info dicts returned by the last port scan.
        """
        from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry

        # A scan may finish after cleanup; the tree must not be touched then.
        if self._is_cleaning_up:
            return

        self.servers = servers

        # Save current selection (by port) before clearing
        selected_ports = set()
        for item in self.server_tree.selectedItems():
            server_data = item.data(0, Qt.ItemDataRole.UserRole)
            if server_data and 'port' in server_data:
                selected_ports.add(server_data['port'])

        self.server_tree.clear()

        # Get queue tracker registry for progress info
        registry = GlobalQueueTrackerRegistry()
        launching_viewers = get_launching_viewers()

        self._add_launching_items(launching_viewers)
        self._add_busy_tracker_items(registry, servers, launching_viewers)

        # Then add running servers
        for server in servers:
            # Skip if this port is in launching registry (shouldn't happen, but be safe)
            if server.get('port', 'unknown') in launching_viewers:
                continue

            if server.get('server', 'Unknown') == 'ZMQExecutionServer':
                self._add_execution_server_item(server)
            else:
                self._add_viewer_server_item(server, registry)

        # Restore selection after refresh
        if selected_ports:
            for i in range(self.server_tree.topLevelItemCount()):
                item = self.server_tree.topLevelItem(i)
                server_data = item.data(0, Qt.ItemDataRole.UserRole)
                if server_data and server_data.get('port') in selected_ports:
                    item.setSelected(True)

        logger.debug(f"Found {len(servers)} ZMQ servers")

    def _add_launching_items(self, launching_viewers: Dict[int, Dict[str, Any]]):
        """Add one top-level row per viewer that is still launching."""
        for port, info in launching_viewers.items():
            viewer_type = info['type'].capitalize()
            queued_images = info['queued_images']

            display_text = f"Port {port} - {viewer_type} Viewer"
            status_text = "🚀 Launching"
            info_text = f"{queued_images} images queued" if queued_images > 0 else "Starting..."

            item = QTreeWidgetItem([display_text, status_text, info_text])
            item.setData(0, Qt.ItemDataRole.UserRole, {'port': port, 'launching': True})
            self.server_tree.addTopLevelItem(item)

    @staticmethod
    def _processing_summary(tracker):
        """Return ``(info_text, is_stuck)`` describing a tracker that has pending images."""
        processed, total = tracker.get_progress()
        info_text = f"Processing: {processed}/{total} images"
        is_stuck = tracker.has_stuck_images()
        if is_stuck:
            info_text += f" (⚠️ {len(tracker.get_stuck_images())} stuck)"
        return info_text, is_stuck

    def _add_busy_tracker_items(self, registry, servers, launching_viewers):
        """Add rows for viewers that missed the ping but still have pending images.

        This prevents busy servers from disappearing during image processing.
        """
        scanned_ports = {server.get('port') for server in servers}
        for tracker_port, tracker in registry.get_all_trackers().items():
            if tracker_port in scanned_ports or tracker_port in launching_viewers:
                continue  # Already in the list

            # Only trackers with pending images count as "busy".
            if tracker.get_pending_count() <= 0:
                continue

            viewer_type = tracker.viewer_type.capitalize()
            info_text, is_stuck = self._processing_summary(tracker)
            status_text = "⚠️" if is_stuck else "⚙️"  # Warning icon if stuck, busy icon otherwise

            # Create pseudo-server dict for consistency with scanned entries
            pseudo_server = {
                'port': tracker_port,
                'server': f'{viewer_type}ViewerServer',
                'ready': True,
                'busy': True  # Mark as busy
            }

            item = QTreeWidgetItem([
                f"Port {tracker_port} - {viewer_type}ViewerServer",
                status_text,
                info_text,
            ])
            item.setData(0, Qt.ItemDataRole.UserRole, pseudo_server)
            self.server_tree.addTopLevelItem(item)

    def _add_execution_server_item(self, server: Dict[str, Any]):
        """Add an execution server row with its worker processes as children."""
        port = server.get('port', 'unknown')
        status_icon = "✅" if server.get('ready', False) else "🚀"
        running_executions = server.get('running_executions', [])
        workers = server.get('workers', [])

        server_text = f"Port {port} - Execution Server"
        if running_executions:
            status_text = f"{status_icon} {len(running_executions)} exec"
            info_text = f"{len(workers)} workers"
        else:
            status_text = f"{status_icon} Idle"
            info_text = f"{len(workers)} workers" if workers else ""

        server_item = QTreeWidgetItem([server_text, status_text, info_text])
        server_item.setData(0, Qt.ItemDataRole.UserRole, server)
        self.server_tree.addTopLevelItem(server_item)

        # Add worker processes as children
        for worker in workers:
            pid = worker.get('pid', 'unknown')
            status = worker.get('status', 'unknown')
            # "or 0" also covers keys that are present but None, which would
            # otherwise break the numeric format specs below.
            cpu = worker.get('cpu_percent') or 0
            mem_mb = worker.get('memory_mb') or 0

            worker_text = f" Worker PID {pid}"
            worker_status = f"⚙️ {status}"
            worker_info = f"CPU: {cpu:.1f}% | Mem: {mem_mb:.0f}MB"

            worker_item = QTreeWidgetItem([worker_text, worker_status, worker_info])
            worker_item.setData(0, Qt.ItemDataRole.UserRole, {'type': 'worker', 'pid': pid, 'server': server})
            server_item.addChild(worker_item)

        # Expand server item if it has workers
        if workers:
            server_item.setExpanded(True)

    def _add_viewer_server_item(self, server: Dict[str, Any], registry):
        """Add a row for a non-execution server (Napari, Fiji viewers), with progress if available."""
        port = server.get('port', 'unknown')
        server_type = server.get('server', 'Unknown')

        display_text = f"Port {port} - {server_type}"
        status_text = "✅" if server.get('ready', False) else "🚀"
        info_text = ""

        # Check if this is a viewer with pending images
        tracker = registry.get_tracker(port)
        if tracker:
            if tracker.get_pending_count() > 0:
                # Still processing images
                info_text, is_stuck = self._processing_summary(tracker)
                if is_stuck:
                    status_text = "⚠️"  # Warning icon for stuck
            else:
                _processed, total = tracker.get_progress()
                if total > 0:
                    # All images processed
                    info_text = f"✅ Processed {total} images"

        # If no processing info, show memory usage
        if not info_text:
            mem_mb = server.get('memory_mb')
            cpu_percent = server.get('cpu_percent')
            if mem_mb is not None:
                info_text = f"Mem: {mem_mb:.0f}MB"
                if cpu_percent is not None:
                    info_text += f" | CPU: {cpu_percent:.1f}%"

        item = QTreeWidgetItem([display_text, status_text, info_text])
        item.setData(0, Qt.ItemDataRole.UserRole, server)
        self.server_tree.addTopLevelItem(item)

    @pyqtSlot(bool, str)
    def _on_kill_complete(self, success: bool, message: str):
        """Handle kill operation completion on UI thread."""
        if not success:
            QMessageBox.warning(self, "Kill Failed", message)
        # Refresh list after kill (quick refresh for better UX)
        QTimer.singleShot(self.POST_KILL_REFRESH_DELAY_MS, self.refresh_servers)

    def _collect_selected_server_ports(self) -> List[int]:
        """Return the ports of the selected server rows, without duplicates.

        Worker rows are skipped (they don't have ports), as are rows without
        a usable port.
        """
        ports: List[int] = []
        for item in self.server_tree.selectedItems():
            data = item.data(0, Qt.ItemDataRole.UserRole)
            if not data or data.get('type') == 'worker':
                continue
            port = data.get('port')
            if port and port not in ports:
                ports.append(port)
        return ports

    def quit_selected_servers(self):
        """Gracefully quit selected servers (async to avoid blocking UI)."""
        if not self.server_tree.selectedItems():
            QMessageBox.warning(self, "No Selection", "Please select servers to quit.")
            return

        # Collect ports to kill BEFORE showing dialog (items may be deleted by auto-refresh)
        ports_to_kill = self._collect_selected_server_ports()
        if not ports_to_kill:
            QMessageBox.warning(self, "No Servers", "No servers selected (only workers selected).")
            return

        # Confirm with user
        reply = QMessageBox.question(
            self,
            "Quit Confirmation",
            f"Gracefully quit {len(ports_to_kill)} server(s)?\n\n"
            "For execution servers: kills workers only, server stays alive.",
            QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
            QMessageBox.StandardButton.Yes
        )

        if reply != QMessageBox.StandardButton.Yes:
            return

        def kill_servers():
            """Background worker: quit each server and report the overall result."""
            from openhcs.runtime.zmq_base import ZMQClient
            from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
            failed_ports = []
            registry = GlobalQueueTrackerRegistry()

            for port in ports_to_kill:
                try:
                    logger.info(f"Attempting to quit server on port {port}...")
                    success = ZMQClient.kill_server_on_port(port, graceful=True)
                    if success:
                        logger.info(f"✅ Successfully quit server on port {port}")
                        # Clear queue tracker for this viewer
                        registry.remove_tracker(port)
                        self.server_killed.emit(port)
                    else:
                        failed_ports.append(port)
                        logger.warning(f"❌ Failed to quit server on port {port} (kill_server_on_port returned False)")
                except Exception as e:
                    failed_ports.append(port)
                    logger.error(f"❌ Error quitting server on port {port}: {e}")

            # Emit completion signal
            if failed_ports:
                self._kill_complete.emit(False, f"Failed to quit servers on ports: {failed_ports}")
            else:
                self._kill_complete.emit(True, "All servers quit successfully")

        # Kill in background thread to avoid blocking UI
        threading.Thread(target=kill_servers, daemon=True).start()

    def force_kill_selected_servers(self):
        """Force kill selected servers (async to avoid blocking UI)."""
        if not self.server_tree.selectedItems():
            QMessageBox.warning(self, "No Selection", "Please select servers to force kill.")
            return

        # Collect ports to kill BEFORE showing dialog (items may be deleted by auto-refresh)
        ports_to_kill = self._collect_selected_server_ports()
        if not ports_to_kill:
            QMessageBox.warning(self, "No Servers", "No servers selected (only workers selected).")
            return

        # Confirm with user (default button is No: this is the destructive path)
        reply = QMessageBox.question(
            self,
            "Force Kill Confirmation",
            f"Force kill {len(ports_to_kill)} server(s)?\n\n"
            "For execution servers: kills workers AND server.\n"
            "For Napari viewers: kills the viewer process.",
            QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
            QMessageBox.StandardButton.No
        )

        if reply != QMessageBox.StandardButton.Yes:
            return

        def kill_servers():
            """Background worker: force kill each server, always cleaning up afterwards."""
            from openhcs.runtime.zmq_base import ZMQClient
            from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
            registry = GlobalQueueTrackerRegistry()

            for port in ports_to_kill:
                try:
                    logger.info(f"🔥 FORCE KILL: Force killing server on port {port} (kills workers AND server)")
                    # Use kill_server_on_port with graceful=False
                    # This handles both IPC and TCP modes correctly
                    success = ZMQClient.kill_server_on_port(port, graceful=False)

                    if success:
                        logger.info(f"✅ Successfully force killed server on port {port}")
                    else:
                        logger.warning(f"⚠️ Force kill returned False for port {port}, but continuing cleanup")
                except Exception as e:
                    logger.error(f"❌ Error force killing server on port {port}: {e}")
                finally:
                    # Clear queue tracker and notify listeners always, regardless
                    # of the kill result. Guarded separately so a cleanup failure
                    # cannot abort the loop or suppress the completion signal.
                    try:
                        registry.remove_tracker(port)
                    except Exception as e:
                        logger.debug(f"Could not remove queue tracker for port {port}: {e}")
                    self.server_killed.emit(port)

            # Always emit success - we've done our best to kill the processes
            # The refresh will remove any dead entries from the list
            self._kill_complete.emit(True, "Force kill operation completed (list will refresh)")

        # Kill in background thread to avoid blocking UI
        threading.Thread(target=kill_servers, daemon=True).start()

    def _on_item_double_clicked(self, item: QTreeWidgetItem):
        """Handle double-click on tree item - open log file.

        Emits ``log_file_opened`` with the path when the item's server data
        names an existing log file; otherwise informs the user.
        """
        data = item.data(0, Qt.ItemDataRole.UserRole)

        # For worker items, use the data of the server they belong to
        if data and data.get('type') == 'worker':
            data = data.get('server', {})

        log_file = data.get('log_file_path') if data else None

        if log_file and Path(log_file).exists():
            # Emit signal for parent to handle (e.g., open in log viewer)
            self.log_file_opened.emit(log_file)
            logger.info(f"Opened log file: {log_file}")
        else:
            QMessageBox.information(
                self,
                "No Log File",
                f"No log file available for this item.\n\nPort: {data.get('port', 'unknown') if data else 'unknown'}"
            )

744