Coverage for openhcs/runtime/zmq_base.py: 33.0%

524 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1"""Generic ZMQ dual-channel pattern (data + control).""" 

2 

3import logging 

4import socket 

5import subprocess 

6import platform 

7import time 

8import threading 

9import os 

10from abc import ABC, abstractmethod 

11from pathlib import Path 

12from typing import Optional, Dict, Type 

13import pickle 

14from openhcs.runtime.zmq_messages import ControlMessageType, ResponseType, MessageFields, PongResponse, SocketType, ImageAck 

15from openhcs.constants.constants import ( 

16 CONTROL_PORT_OFFSET, IPC_SOCKET_DIR_NAME, IPC_SOCKET_PREFIX, IPC_SOCKET_EXTENSION 

17) 

18from openhcs.core.config import TransportMode 

19from openhcs.core.auto_register_meta import AutoRegisterMeta, LazyDiscoveryDict 

20 

21logger = logging.getLogger(__name__) 

22 

# Global shared acknowledgment port for all viewers
# All viewers send acks to this port via PUSH sockets
# Client listens on this port via PULL socket
# (the listener side is bound in _ack_listener_loop in this module)
SHARED_ACK_PORT: int = 7555

27 

28# ============================================================================ 

29# ZMQ Server Registry 

30# ============================================================================ 

31# Registry will be auto-created by AutoRegisterMeta on ZMQServer base class 

32# Access via: ZMQServer.__registry__ (created after class definition below) 

33 

34 

def get_default_transport_mode() -> TransportMode:
    """
    Pick the transport mode that suits the host operating system.

    IPC endpoints are not available on Windows, so TCP (with localhost) is
    chosen there. Every other platform gets IPC.

    Returns:
        TransportMode.TCP when platform.system() reports 'Windows',
        TransportMode.IPC otherwise.
    """
    if platform.system() == 'Windows':
        return TransportMode.TCP
    return TransportMode.IPC

46 

47 

48# ============================================================================ 

49# ZMQ Transport Utilities 

50# ============================================================================ 

51 

def _get_ipc_socket_path(port: int) -> Optional[Path]:
    """Build the filesystem path of the IPC socket used for a port.

    The path lives under ``~/.openhcs/<IPC_SOCKET_DIR_NAME>/`` and the file
    name is ``<IPC_SOCKET_PREFIX>-<port><IPC_SOCKET_EXTENSION>``. The
    directory is not created here.

    Args:
        port: Port number the socket file name is derived from

    Returns:
        Path to the IPC socket file, or None on Windows
    """
    if platform.system() == 'Windows':
        # Windows uses named pipes, not file paths
        return None

    file_name = f"{IPC_SOCKET_PREFIX}-{port}{IPC_SOCKET_EXTENSION}"
    return Path.home().joinpath(".openhcs", IPC_SOCKET_DIR_NAME, file_name)

67 

68 

def _remove_ipc_socket(port: int) -> bool:
    """Remove the IPC socket file for a given port, if there is one.

    The file is unlinked directly instead of checking ``exists()`` first, so
    a file that disappears between the check and the removal is no longer
    reported as a failure.

    Args:
        port: Port number whose socket file should be removed

    Returns:
        True if a socket file was removed, False if there was nothing to
        remove (including on Windows, where no path exists) or the removal
        failed
    """
    socket_path = _get_ipc_socket_path(port)
    if socket_path is None:
        return False

    try:
        socket_path.unlink()
    except FileNotFoundError:
        # Nothing to clean up - not an error
        return False
    except OSError as e:
        # Best-effort cleanup: report and carry on (e.g. permission denied)
        logger.warning(f"Failed to remove stale IPC socket {socket_path}: {e}")
        return False

    logger.info(f"🧹 Removed stale IPC socket: {socket_path}")
    return True

88 

89 

def get_zmq_transport_url(port: int, transport_mode: TransportMode, host: str = 'localhost') -> str:
    """Build the ZMQ endpoint URL for a port in the requested transport mode.

    For IPC the parent directory of the socket file is created as a side
    effect (errors from ``mkdir`` propagate to the caller).

    Args:
        port: Port number (used for both IPC and TCP)
        transport_mode: IPC or TCP
        host: Host for TCP mode (ignored for IPC)

    Returns:
        ``tcp://<host>:<port>`` for TCP, or ``ipc://<socket file path>`` for
        IPC, e.g. ``ipc:///home/user/.openhcs/ipc/openhcs-zmq-5555.sock``
        (the exact file name depends on the IPC_SOCKET_* constants).

    Raises:
        ValueError: If transport_mode is invalid or IPC is requested on Windows (fail-loud)
    """
    if transport_mode == TransportMode.TCP:
        return f"tcp://{host}:{port}"

    if transport_mode != TransportMode.IPC:
        # Fail-loud for invalid enum (should never happen with proper typing)
        raise ValueError(f"Invalid transport_mode: {transport_mode}")

    if platform.system() == 'Windows':
        # Windows doesn't support IPC (Unix domain sockets) - fail-loud
        raise ValueError(
            "IPC transport mode is not supported on Windows. "
            "Windows does not support Unix domain sockets. "
            "Use TransportMode.TCP instead, or use get_default_transport_mode() "
            "to automatically select the correct mode for the platform."
        )

    # Unix domain sockets: resolve the file path and make sure its directory exists
    ipc_path = _get_ipc_socket_path(port)
    ipc_path.parent.mkdir(parents=True, exist_ok=True)  # Fail-loud if permission denied
    return f"ipc://{ipc_path}"

130 

131 

# Global ack listener singleton
# Thread object running _ack_listener_loop; None until the listener is first started
_ack_listener_thread: Optional[threading.Thread] = None
# Held while the listener is being started or stopped
_ack_listener_lock = threading.Lock()
# Loop flag read by the listener thread on every iteration; clearing it ends the loop
_ack_listener_running: bool = False
# Transport mode the listener binds with; assigned when the listener is started
_ack_listener_transport_mode: Optional[TransportMode] = None

137 

138 

def start_global_ack_listener(transport_mode: Optional[TransportMode] = None):
    """Start the global ack listener thread (singleton).

    This thread listens on SHARED_ACK_PORT for acknowledgments from all viewers
    and routes them to the appropriate queue trackers.

    Safe to call multiple times - only one live listener is kept. If the
    previous listener thread has died (for example because binding the port
    failed), a new one is started instead of silently doing nothing.

    Args:
        transport_mode: Transport mode to use (IPC or TCP). When omitted, the
            platform default from get_default_transport_mode() is used.
    """
    global _ack_listener_thread, _ack_listener_running, _ack_listener_transport_mode

    with _ack_listener_lock:
        previous = _ack_listener_thread
        previous_alive = previous is not None and previous.is_alive()

        if _ack_listener_running and previous_alive:
            logger.debug("Ack listener already running")
            return

        if previous_alive:
            # A stop was requested but the old thread is still inside its
            # 1-second poll. Let it finish so it releases the endpoint;
            # re-raising the flag now would also revive the old loop.
            previous.join(timeout=2.0)
            if previous.is_alive():
                logger.warning("Previous ack listener thread did not exit in time")

        # Set transport mode (default to IPC on Unix, TCP on Windows)
        _ack_listener_transport_mode = transport_mode or get_default_transport_mode()

        logger.info(f"Starting global ack listener on port {SHARED_ACK_PORT} with {_ack_listener_transport_mode.value} transport")
        _ack_listener_running = True
        _ack_listener_thread = threading.Thread(
            target=_ack_listener_loop,
            daemon=True,
            name="AckListener"
        )
        _ack_listener_thread.start()

168 

169 

def _ack_listener_loop():
    """Main loop for ack listener thread.

    Binds a PULL socket on SHARED_ACK_PORT, then receives acks from all
    viewers and routes each one to the queue tracker registered for the
    ack's viewer port. The loop polls with a 1-second timeout so it can
    notice when ``_ack_listener_running`` is cleared.

    A payload that is not valid JSON is logged and skipped; it no longer
    terminates the listener thread.
    """
    import zmq
    from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry

    registry = GlobalQueueTrackerRegistry()
    context = zmq.Context()
    # Named ack_socket (not "socket") so the stdlib socket module is not shadowed
    ack_socket = None

    try:
        ack_socket = context.socket(zmq.PULL)
        ack_url = get_zmq_transport_url(SHARED_ACK_PORT, _ack_listener_transport_mode, '*')
        ack_socket.bind(ack_url)
        logger.info(f"✅ Ack listener bound to {ack_url}")

        while _ack_listener_running:
            # Receive ack message (with timeout to allow checking _ack_listener_running)
            try:
                if not ack_socket.poll(timeout=1000):  # 1 second timeout
                    continue
                ack_dict = ack_socket.recv_json()
            except zmq.ZMQError as e:
                if _ack_listener_running:
                    logger.error(f"ZMQ error in ack listener: {e}")
                    time.sleep(0.1)
                continue
            except ValueError as e:
                # Malformed JSON / undecodable bytes: drop this message only
                logger.error(f"Failed to parse ack message: {e}", exc_info=True)
                continue

            # Parse ack message and route it
            try:
                ack = ImageAck.from_dict(ack_dict)

                # Route to appropriate queue tracker
                tracker = registry.get_tracker(ack.viewer_port)
                if tracker:
                    tracker.mark_processed(ack.image_id)

                    # Trigger UI refresh to show updated progress immediately
                    try:
                        from openhcs.pyqt_gui.widgets.shared.zmq_server_manager import _trigger_manager_refresh_fast
                        _trigger_manager_refresh_fast()
                    except ImportError:
                        # PyQt not available (e.g., in TUI mode) - skip UI refresh
                        pass
                else:
                    logger.warning(f"Received ack for unknown viewer port {ack.viewer_port}: {ack.image_id}")

            except Exception as e:
                logger.error(f"Failed to parse ack message: {e}", exc_info=True)

    except Exception as e:
        # Reached for setup failures (e.g. bind) and anything unexpected above.
        # NOTE(review): _ack_listener_running is left as-is here, so the flag
        # can still read True after this thread has exited.
        logger.error(f"Fatal error in ack listener: {e}", exc_info=True)

    finally:
        if ack_socket is not None:
            try:
                ack_socket.close()
            except Exception as e:
                logger.debug(f"Error closing ack listener socket: {e}")
        try:
            context.term()
        except Exception as e:
            logger.debug(f"Error terminating ack listener context: {e}")
        logger.info("Ack listener stopped")

235 

236 

def stop_global_ack_listener():
    """Ask the global ack listener thread to stop.

    Only clears the running flag under the listener lock; the thread is not
    joined here. Does nothing when the flag is already cleared.
    """
    global _ack_listener_running

    with _ack_listener_lock:
        if _ack_listener_running:
            logger.info("Stopping global ack listener")
            _ack_listener_running = False

247 

248 

class ZMQServer(ABC, metaclass=AutoRegisterMeta):
    """
    ABC for ZMQ servers - dual-channel pattern with ping/pong handshake.

    A server owns a data socket (type chosen by the caller, PUB by default)
    bound to ``port`` and a REP control socket bound to
    ``port + CONTROL_PORT_OFFSET``.

    Registry auto-created and stored as ZMQServer.__registry__.
    Subclasses auto-register by setting _server_type class attribute.
    """
    __registry_key__ = '_server_type'

    _server_type: Optional[str] = None  # Override in subclasses for registration

    def __init__(self, port, host='*', log_file_path=None, data_socket_type=None, transport_mode=None):
        """Store configuration; no sockets are created until start().

        Args:
            port: Data port; the control port is derived from it
            host: Host to bind to in TCP mode
            log_file_path: Value reported back in pong/status info
            data_socket_type: ZMQ socket type constant for the data socket
                (defaults to zmq.PUB)
            transport_mode: TransportMode to use; platform default when None
        """
        import zmq
        self.port = port
        self.host = host
        self.control_port = port + CONTROL_PORT_OFFSET
        self.log_file_path = log_file_path
        self.data_socket_type = data_socket_type if data_socket_type is not None else zmq.PUB
        # Windows doesn't support IPC (POSIX named pipes), so use TCP with localhost
        self.transport_mode = transport_mode or get_default_transport_mode()
        self.zmq_context = None
        self.data_socket = None
        self.control_socket = None
        self._running = False
        self._ready = False
        self._lock = threading.Lock()

    def _close_zmq_resources(self):
        """Close both sockets and terminate the context, skipping whatever is unset."""
        if self.data_socket is not None:
            self.data_socket.close()
            self.data_socket = None
        if self.control_socket is not None:
            self.control_socket.close()
            self.control_socket = None
        if self.zmq_context is not None:
            self.zmq_context.term()
            self.zmq_context = None

    def start(self):
        """Create and bind the data and control sockets.

        Does nothing if the server is already running. If anything fails
        during setup (for example a bind error), the partially created
        sockets and context are released before the exception propagates.
        """
        import zmq
        with self._lock:
            if self._running:
                return

            # A previous run that ended via request_shutdown() without stop()
            # may still hold sockets; release them before binding again.
            self._close_zmq_resources()

            try:
                self.zmq_context = zmq.Context()
                self.data_socket = self.zmq_context.socket(self.data_socket_type)
                self.data_socket.setsockopt(zmq.LINGER, 0)

                # Set high water mark for SUB/PULL sockets to prevent message drops
                # when viewer is busy processing (e.g., creating shapes layers that take 2-3 seconds)
                # REP sockets don't need HWM since they're synchronous (one request at a time)
                if self.data_socket_type in (zmq.SUB, zmq.PULL):
                    self.data_socket.setsockopt(zmq.RCVHWM, 100000)  # Buffer up to 100k messages
                    socket_type_name = "SUB" if self.data_socket_type == zmq.SUB else "PULL"
                    logger.info(f"ZMQ {socket_type_name} socket RCVHWM set to 100000 to prevent drops during blocking operations")

                data_url = get_zmq_transport_url(self.port, self.transport_mode, self.host)
                control_url = get_zmq_transport_url(self.control_port, self.transport_mode, self.host)

                self.data_socket.bind(data_url)
                if self.data_socket_type == zmq.SUB:
                    self.data_socket.setsockopt(zmq.SUBSCRIBE, b"")
                self.control_socket = self.zmq_context.socket(zmq.REP)
                self.control_socket.setsockopt(zmq.LINGER, 0)
                self.control_socket.bind(control_url)
            except Exception:
                # Do not leave half-initialised sockets behind
                self._close_zmq_resources()
                raise

            self._running = True
            logger.info(f"ZMQ Server started on {data_url} ({SocketType.from_zmq_constant(self.data_socket_type).get_display_name()}), control {control_url}")

    def stop(self):
        """Stop the server and release its sockets and context.

        Cleanup also runs when the running flag was already cleared by
        request_shutdown(), as long as sockets or a context still exist.
        Calling stop() with nothing to release is a no-op.
        """
        with self._lock:
            has_resources = (
                self.data_socket is not None
                or self.control_socket is not None
                or self.zmq_context is not None
            )
            if not self._running and not has_resources:
                return
            self._running = False
            self._close_zmq_resources()
        logger.info("ZMQ Server stopped")

    def is_running(self):
        """Return the current value of the running flag."""
        return self._running

    def process_messages(self):
        """Handle at most one pending control message without blocking.

        A ping is answered with a pong (and marks the server ready on the
        first ping); any other message is passed to handle_control_message()
        and its return value is sent back pickled.

        If decoding or handling raises, an error reply
        ``{'type': 'error', 'error': <message>}`` is sent before the
        exception propagates: a REP socket must send a reply before it can
        receive again, so skipping the reply would wedge the control channel.
        """
        import zmq
        if not self._running:
            return
        try:
            raw = self.control_socket.recv(zmq.NOBLOCK)
            try:
                # NOTE(security): pickle.loads executes arbitrary code from the
                # payload. Anything able to reach the control socket can run
                # code in this process - only expose it to trusted peers.
                control_data = pickle.loads(raw)
                if control_data.get(MessageFields.TYPE) == ControlMessageType.PING.value:
                    if not self._ready:
                        self._ready = True
                        logger.info("Server ready")
                    response = self._create_pong_response()
                else:
                    response = self.handle_control_message(control_data)
                payload = pickle.dumps(response)
            except Exception as exc:
                try:
                    self.control_socket.send(pickle.dumps({'type': 'error', 'error': str(exc)}))
                except Exception:
                    logger.debug("Could not send error reply on control socket", exc_info=True)
                raise
            self.control_socket.send(payload)
        except zmq.Again:
            # No control message waiting
            pass

    def _create_pong_response(self):
        """Build the pong reply dict describing this server."""
        return PongResponse(port=self.port, control_port=self.control_port, ready=self._ready,
                            server=self.__class__.__name__, log_file_path=self.log_file_path).to_dict()

    def get_status_info(self):
        """Return a dict with ports, running/ready flags, class name and log file."""
        return {'port': self.port, 'control_port': self.control_port, 'running': self._running,
                'ready': self._ready, 'server_type': self.__class__.__name__, 'log_file': self.log_file_path}

    def request_shutdown(self):
        """Clear the running flag; sockets stay open until stop() is called."""
        self._running = False

    @staticmethod
    def kill_processes_on_port(port):
        """Best-effort kill of processes listening on a TCP port.

        Linux/macOS: PIDs come from ``lsof`` and are sent ``kill -9``.
        Windows: PIDs come from ``netstat -ano`` lines in LISTENING state
        whose local address ends in exactly ``:<port>``; they are sent
        ``taskkill`` without ``/F``.

        Args:
            port: TCP port number

        Returns:
            Number of kill commands that exited with status 0. Failures are
            logged at debug level and never raised.
        """
        killed = 0
        try:
            system = platform.system()
            if system in ("Linux", "Darwin"):
                result = subprocess.run(['lsof', '-ti', f'TCP:{port}', '-sTCP:LISTEN'], capture_output=True, text=True, timeout=2)
                pids = result.stdout.split() if result.returncode == 0 else []
                for pid in pids:
                    try:
                        if subprocess.run(['kill', '-9', pid], timeout=1).returncode == 0:
                            killed += 1
                    except (OSError, subprocess.SubprocessError) as e:
                        logger.debug(f"Failed to kill PID {pid} on port {port}: {e}")
            elif system == "Windows":
                result = subprocess.run(['netstat', '-ano'], capture_output=True, text=True, timeout=2)
                pids = set()
                for line in result.stdout.split('\n'):
                    fields = line.split()
                    # Expected columns: proto, local address, foreign address, state, PID
                    if len(fields) < 5 or 'LISTENING' not in fields:
                        continue
                    # Compare the port of the LOCAL address exactly, so 7555 does
                    # not match 75551 or a foreign address
                    if fields[1].rsplit(':', 1)[-1] == str(port):
                        pids.add(fields[-1])
                for pid in pids:
                    try:
                        # Graceful termination (no /F flag) - works without UAC for user processes
                        if subprocess.run(['taskkill', '/PID', pid], timeout=1).returncode == 0:
                            killed += 1
                    except (OSError, subprocess.SubprocessError) as e:
                        logger.debug(f"Failed to terminate PID {pid} on port {port}: {e}")
        except Exception as e:
            # Missing tool, timeout, etc. - this helper is best-effort by design
            logger.debug(f"kill_processes_on_port({port}) failed: {e}")
        return killed

    @staticmethod
    def load_images_from_shared_memory(images, error_callback=None):
        """Load images from shared memory and clean up.

        Each segment is copied into a private numpy array, then closed and
        unlinked - also when the copy itself fails, so the segment is not
        leaked. A malformed entry (e.g. missing shape) only fails that entry.

        Args:
            images: List of image info dicts with shm_name, shape, dtype, metadata, image_id
            error_callback: Optional callback(image_id, status, error) for errors

        Returns:
            List of dicts with 'data', 'metadata', 'image_id' keys, one per
            successfully read image
        """
        import numpy as np
        from multiprocessing import shared_memory

        image_data_list = []
        for image_info in images:
            shm_name = image_info.get('shm_name')
            metadata = image_info.get('metadata', {})
            image_id = image_info.get('image_id')

            try:
                shape = tuple(image_info.get('shape'))
                dtype = np.dtype(image_info.get('dtype'))

                shm = shared_memory.SharedMemory(name=shm_name)
                try:
                    # .copy() detaches the data from the segment before it is released
                    np_data = np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
                finally:
                    try:
                        shm.close()
                    finally:
                        shm.unlink()
            except Exception as e:
                logger.error(f"Failed to read shared memory {shm_name}: {e}")
                if error_callback and image_id:
                    error_callback(image_id, 'error', f"Failed to read shared memory: {e}")
                continue

            image_data_list.append({
                'data': np_data,
                'metadata': metadata,
                'image_id': image_id
            })

        return image_data_list

    @staticmethod
    def collect_dimension_values(images, components):
        """Collect unique dimension value tuples from images.

        Args:
            images: List of image data dicts with 'metadata' key
            components: List of component names to collect

        Returns:
            Sorted list of unique value tuples; ``[()]`` when components is empty

        Raises:
            KeyError: If an image's metadata lacks one of the components
        """
        if not components:
            return [()]

        # Fail loud if component missing from metadata
        values = {
            tuple(img_data['metadata'][comp] for comp in components)
            for img_data in images
        }
        return sorted(values)

    @staticmethod
    def organize_components_by_mode(component_order, component_modes, component_unique_values, always_include_window=True, skip_flat_dimensions=True):
        """Organize components by their display mode.

        Args:
            component_order: Ordered list of component names
            component_modes: Map of component name to mode ('window', 'channel', 'slice', 'frame')
            component_unique_values: Map of component name to set of unique values
            always_include_window: If True, include WINDOW components even if flat.
                If False, WINDOW components follow the same flat-dimension
                rule as every other mode.
            skip_flat_dimensions: If True, skip components with cardinality <= 1 for non-window dimensions

        Returns:
            Dict with keys 'window', 'channel', 'slice', 'frame' mapping to component lists

        Raises:
            KeyError: If a component has no entry in component_modes or its
                mode is not one of the four keys above
        """
        result = {
            'window': [],
            'channel': [],
            'slice': [],
            'frame': []
        }

        for comp_name in component_order:
            mode = component_modes[comp_name]
            is_flat = len(component_unique_values.get(comp_name, set())) <= 1

            if mode == 'window' and always_include_window:
                # Always include WINDOW components, even if flat
                result['window'].append(comp_name)
            elif skip_flat_dimensions and is_flat:
                # Skip flat dimensions for hyperstack axes (only if skip_flat_dimensions=True)
                continue
            else:
                result[mode].append(comp_name)

        return result

    @abstractmethod
    def handle_control_message(self, message):
        """Handle a non-ping control message and return the reply object."""
        pass

    @abstractmethod
    def handle_data_message(self, message):
        """Handle a message from the data channel (not called by this base class)."""
        pass

487 

488 

class ZMQClient(ABC):
    """ABC for ZMQ clients - dual-channel pattern with auto-spawning.

    connect() attaches to a responsive server on the configured port or,
    failing that, spawns one through the subclass hook
    _spawn_server_process() and waits for its ping/pong handshake.
    """

    def __init__(self, port, host='localhost', persistent=True, transport_mode=None):
        """Store configuration; nothing is connected until connect().

        Args:
            port: Data port of the server; the control port is derived from it
            host: Host for TCP mode
            persistent: If False, disconnect() terminates a server this client spawned
            transport_mode: TransportMode to use; platform default when None
        """
        self.port = port
        self.host = host
        self.control_port = port + CONTROL_PORT_OFFSET
        self.persistent = persistent
        # Windows doesn't support IPC (POSIX named pipes), so use TCP with localhost
        self.transport_mode = transport_mode or get_default_transport_mode()
        self.zmq_context = None
        self.data_socket = None
        self.control_socket = None
        self.server_process = None
        self._connected = False
        self._connected_to_existing = False
        self._lock = threading.Lock()

    def connect(self, timeout=10.0):
        """Connect to an existing server or spawn a new one.

        Args:
            timeout: Seconds passed to _wait_for_server_ready() for a spawned server

        Returns:
            True once connected, False if a spawned server did not become ready
        """
        with self._lock:
            if self._connected:
                return True
            if self._is_port_in_use(self.port):
                if self._try_connect_to_existing(self.port):
                    # NOTE(review): this path returns without calling
                    # _setup_client_sockets(), so data_socket stays None unless
                    # a subclass creates it - confirm that is intended.
                    self._connected = self._connected_to_existing = True
                    return True
                # Port is taken but nothing answered the ping: clear it out
                self._kill_processes_on_port(self.port)
                self._kill_processes_on_port(self.control_port)
                time.sleep(0.5)
            self.server_process = self._spawn_server_process()
            if not self._wait_for_server_ready(timeout):
                return False
            self._setup_client_sockets()
            self._connected = True
            return True

    def disconnect(self):
        """Close client sockets and, for non-persistent clients, stop the spawned server.

        The server process is only touched when this client spawned it
        (it was not an existing server) and ``persistent`` is False. It is
        terminated first and killed if it has not exited after 5 seconds.
        """
        with self._lock:
            if not self._connected:
                return
            self._cleanup_sockets()
            if not self._connected_to_existing and self.server_process and not self.persistent:
                if hasattr(self.server_process, 'is_alive'):
                    # multiprocessing.Process-style handle
                    if self.server_process.is_alive():
                        self.server_process.terminate()
                        self.server_process.join(timeout=5)
                        if self.server_process.is_alive():
                            self.server_process.kill()
                else:
                    # subprocess.Popen-style handle
                    if self.server_process.poll() is None:
                        self.server_process.terminate()
                        try:
                            self.server_process.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            self.server_process.kill()
            self._connected = False

    def is_connected(self):
        """Return the current value of the connected flag."""
        return self._connected

    def _setup_client_sockets(self):
        """Create the context and a SUB data socket subscribed to everything."""
        import zmq
        self.zmq_context = zmq.Context()
        data_url = get_zmq_transport_url(self.port, self.transport_mode, self.host)

        self.data_socket = self.zmq_context.socket(zmq.SUB)
        self.data_socket.setsockopt(zmq.LINGER, 0)
        self.data_socket.connect(data_url)
        self.data_socket.setsockopt(zmq.SUBSCRIBE, b"")
        time.sleep(0.1)

    def _cleanup_sockets(self):
        """Close whichever of data socket, control socket and context exist."""
        if self.data_socket:
            self.data_socket.close()
            self.data_socket = None
        if self.control_socket:
            self.control_socket.close()
            self.control_socket = None

        if self.zmq_context:
            self.zmq_context.term()
            self.zmq_context = None

    @staticmethod
    def _release_zmq(sock, ctx):
        """Close a temporary socket and terminate its context; either may be None."""
        try:
            if sock is not None:
                sock.close()
            if ctx is not None:
                ctx.term()
        except Exception as e:
            logger.debug(f"Error releasing temporary ZMQ socket: {e}")

    @staticmethod
    def _ping_control(control_url, timeout_ms):
        """Send one ping to a control endpoint over a throwaway REQ socket.

        Args:
            control_url: ZMQ URL of the control socket
            timeout_ms: Receive timeout in milliseconds

        Returns:
            The unpickled reply if it is a dict, otherwise None (no reply in
            time, connection error, or a reply of another type)
        """
        import zmq
        ctx = None
        sock = None
        try:
            ctx = zmq.Context()
            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.LINGER, 0)
            sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
            sock.connect(control_url)
            sock.send(pickle.dumps({'type': 'ping'}))
            # NOTE(security): pickle.loads runs code embedded in the reply;
            # only ping endpoints you trust.
            response = pickle.loads(sock.recv())
        except Exception:
            # A timeout here is the normal "nobody is listening" outcome
            return None
        finally:
            ZMQClient._release_zmq(sock, ctx)
        return response if isinstance(response, dict) else None

    def _try_connect_to_existing(self, port):
        """Return True if a server on ``port`` answers a ping with a ready pong."""
        try:
            control_url = get_zmq_transport_url(port + CONTROL_PORT_OFFSET, self.transport_mode, self.host)
        except Exception:
            return False
        response = self._ping_control(control_url, 500)
        return bool(response is not None and response.get('type') == 'pong' and response.get('ready'))

    def _wait_for_server_ready(self, timeout=10.0):
        """Wait for a spawned server to open its ports and answer a ping.

        Two phases, each limited to ``timeout`` seconds: first both the data
        and control ports must appear in use, then a ping must be answered
        with a ready pong.

        Returns:
            True when the server reported ready, False on timeout
        """
        start = time.time()
        while time.time() - start < timeout:
            if self._is_port_in_use(self.port) and self._is_port_in_use(self.control_port):
                break
            time.sleep(0.2)
        else:
            # Ports never showed up
            return False
        control_url = get_zmq_transport_url(self.control_port, self.transport_mode, self.host)

        start = time.time()
        while time.time() - start < timeout:
            response = self._ping_control(control_url, 1000)
            if response is not None and response.get('type') == 'pong' and response.get('ready'):
                return True
            time.sleep(0.5)

        return False

    def _is_port_in_use(self, port):
        """Report whether something appears to occupy ``port``.

        IPC mode: True when the port's socket file exists.
        TCP mode: True when binding ('localhost', port) fails with OSError.
        """
        if self.transport_mode == TransportMode.IPC:
            # Use helper function to check IPC socket existence
            socket_path = _get_ipc_socket_path(port)
            return socket_path.exists() if socket_path else False

        # TCP mode - check if port is bound; "with" closes the probe on every path
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.1)
            try:
                probe.bind(('localhost', port))
            except OSError:
                return True
            except Exception:
                # e.g. a port value outside the valid range
                return False
            return False

    def _find_free_port(self):
        """Return a TCP port number the OS currently reports as free."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def _kill_processes_on_port(self, port):
        """Best-effort removal of whatever occupies ``port``; never raises.

        IPC mode removes the stale socket file; TCP mode delegates to
        ZMQServer.kill_processes_on_port().
        """
        try:
            # For IPC mode, remove stale socket files using helper function
            if self.transport_mode == TransportMode.IPC:
                _remove_ipc_socket(port)
                return  # IPC mode doesn't use TCP ports, so skip process killing

            # TCP mode - kill processes using the port
            ZMQServer.kill_processes_on_port(port)
        except Exception as e:
            logger.debug(f"Failed to clear port {port}: {e}")

    @staticmethod
    def scan_servers(ports, host='localhost', timeout_ms=200, transport_mode=None):
        """Ping the control socket of each port and collect the pong replies.

        Args:
            ports: Iterable of data port numbers to probe
            host: Host for TCP mode (ignored for IPC)
            timeout_ms: Receive timeout per port in milliseconds
            transport_mode: TransportMode to use; platform default when None

        Returns:
            List of pong dicts, each with 'port' and 'control_port' set to
            the probed values. Ports that do not answer are skipped.
        """
        # Windows doesn't support IPC, so use TCP with localhost
        transport_mode = transport_mode or get_default_transport_mode()
        servers = []
        for port in ports:
            try:
                control_port = port + CONTROL_PORT_OFFSET
                control_url = get_zmq_transport_url(control_port, transport_mode, host)
            except Exception as e:
                logger.debug(f"Skipping port {port} during scan: {e}")
                continue

            pong = ZMQClient._ping_control(control_url, timeout_ms)
            if pong is not None and pong.get('type') == 'pong':
                pong['port'] = port
                pong['control_port'] = control_port
                servers.append(pong)
        return servers

    @staticmethod
    def _force_kill_server(port, control_port, transport_mode, log_prefix):
        """Clear a server's ports: remove IPC socket files or kill TCP listeners.

        Returns:
            True in IPC mode; in TCP mode True only if at least one process
            was killed.
        """
        if transport_mode == TransportMode.IPC:
            # IPC mode - remove socket files
            # NOTE(review): this only removes the files; no process is signalled here.
            _remove_ipc_socket(port)
            _remove_ipc_socket(control_port)
            logger.info(f"{log_prefix}Removed IPC socket files for ports {port} and {control_port}")
            return True

        # TCP mode - kill processes on ports
        killed = sum(ZMQServer.kill_processes_on_port(p) for p in [port, control_port])
        logger.info(f"{log_prefix}Force killed {killed} processes on ports {port} and {control_port}")
        return killed > 0

    @staticmethod
    def kill_server_on_port(port, graceful=True, timeout=5.0, transport_mode=None, host='localhost'):
        """
        Kill server on specified port.

        Args:
            port: Server port number
            graceful: If True, kills workers only (server stays alive).
                     If False, kills workers AND server (port becomes free).
            timeout: Timeout in seconds for server response
            transport_mode: TransportMode (IPC or TCP). If None, uses platform default.
            host: Host for TCP mode (ignored for IPC)

        Returns:
            bool: True if operation succeeded
        """
        import zmq
        transport_mode = transport_mode or get_default_transport_mode()
        msg_type = 'shutdown' if graceful else 'force_shutdown'
        # Computed before the try so the error path can always use it
        control_port = port + CONTROL_PORT_OFFSET

        ctx = None
        sock = None
        try:
            control_url = get_zmq_transport_url(control_port, transport_mode, host)

            ctx = zmq.Context()
            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect(control_url)

            if graceful:
                # Graceful shutdown: wait for ack (server stays alive)
                sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
                sock.send(pickle.dumps({'type': msg_type}))
                ack = pickle.loads(sock.recv())
                if ack.get('type') == 'shutdown_ack':
                    logger.info(f"✅ Server on port {port} acknowledged graceful shutdown (workers killed, server alive)")
                    return True
            else:
                # Force shutdown: send message and immediately kill - don't wait for ack
                # Server is dying, it can't reliably send ack anyway
                sock.setsockopt(zmq.SNDTIMEO, 1000)  # 1s timeout for send
                try:
                    sock.send(pickle.dumps({'type': msg_type}))
                    logger.info(f"🔥 Sent force shutdown message to port {port}")
                except Exception as send_error:
                    logger.warning(f"⚠️ Failed to send force shutdown message: {send_error}")

                # Don't wait for ack - immediately kill the server
                return ZMQClient._force_kill_server(port, control_port, transport_mode, "✅ ")

        except Exception as e:
            # Connection failed - server might not exist or wrong transport mode
            logger.warning(f"❌ Failed to connect to server on port {port} ({transport_mode.value} mode): {e}")

            if not graceful:
                # Force shutdown failed via ZMQ, try killing processes directly
                return ZMQClient._force_kill_server(port, control_port, transport_mode, "")

            logger.warning(f"❌ Failed to shutdown server on port {port} gracefully")
            return False
        finally:
            ZMQClient._release_zmq(sock, ctx)

        # Graceful shutdown failed - no ack received
        logger.warning(f"❌ Failed to shutdown server on port {port} gracefully")
        return False

    @abstractmethod
    def _spawn_server_process(self):
        """Start the server and return its process handle (Popen- or Process-like)."""
        pass

    @abstractmethod
    def send_data(self, data):
        """Send data to the server; transport details are up to the subclass."""
        pass

839 

840 

841# ============================================================================ 

842# Registry Export 

843# ============================================================================ 

# Auto-created registry from ZMQServer base class
# Exposed as a module-level name for the same object as ZMQServer.__registry__
ZMQ_SERVERS = ZMQServer.__registry__

846