Coverage for openhcs/runtime/zmq_base.py: 33.0%
524 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""Generic ZMQ dual-channel pattern (data + control)."""
3import logging
4import socket
5import subprocess
6import platform
7import time
8import threading
9import os
10from abc import ABC, abstractmethod
11from pathlib import Path
12from typing import Optional, Dict, Type
13import pickle
14from openhcs.runtime.zmq_messages import ControlMessageType, ResponseType, MessageFields, PongResponse, SocketType, ImageAck
15from openhcs.constants.constants import (
16 CONTROL_PORT_OFFSET, IPC_SOCKET_DIR_NAME, IPC_SOCKET_PREFIX, IPC_SOCKET_EXTENSION
17)
18from openhcs.core.config import TransportMode
19from openhcs.core.auto_register_meta import AutoRegisterMeta, LazyDiscoveryDict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Acknowledgment port shared by every viewer:
#   - viewers send their acks to it through PUSH sockets
#   - the client listens on it through a single PULL socket
SHARED_ACK_PORT = 7555
28# ============================================================================
29# ZMQ Server Registry
30# ============================================================================
31# Registry will be auto-created by AutoRegisterMeta on ZMQServer base class
32# Access via: ZMQServer.__registry__ (created after class definition below)
def get_default_transport_mode() -> TransportMode:
    """Return the transport mode used by default on the current platform.

    The IPC transport is not supported on Windows, so TCP (with localhost)
    is selected there; every other platform gets IPC.

    Returns:
        TransportMode.TCP when ``platform.system()`` reports ``'Windows'``,
        TransportMode.IPC otherwise.
    """
    if platform.system() == 'Windows':
        return TransportMode.TCP
    return TransportMode.IPC
48# ============================================================================
49# ZMQ Transport Utilities
50# ============================================================================
def _get_ipc_socket_path(port: int) -> Optional[Path]:
    """Build the filesystem path of the IPC socket that belongs to *port*.

    The path is only computed here; nothing is created on disk.

    Args:
        port: Port number the socket file name is derived from.

    Returns:
        ``~/.openhcs/<IPC_SOCKET_DIR_NAME>/<prefix>-<port><extension>``,
        or None when running on Windows (no socket file is used there).
    """
    running_on_windows = platform.system() == 'Windows'
    if running_on_windows:
        return None

    file_name = f"{IPC_SOCKET_PREFIX}-{port}{IPC_SOCKET_EXTENSION}"
    return Path.home() / ".openhcs" / IPC_SOCKET_DIR_NAME / file_name
def _remove_ipc_socket(port: int) -> bool:
    """Remove the (stale) IPC socket file for a given port.

    The file is unlinked directly instead of testing ``exists()`` first, so a
    socket that disappears between the check and the removal is not reported
    as a failure.

    Args:
        port: Port number whose socket file should be removed.

    Returns:
        True if a socket file was removed, False if there was nothing to
        remove (including on Windows) or the removal failed.
    """
    socket_path = _get_ipc_socket_path(port)
    if socket_path is None:
        # No socket file is used on this platform.
        return False

    try:
        socket_path.unlink()
    except FileNotFoundError:
        # Nothing to clean up.
        return False
    except OSError as e:
        # Best-effort cleanup: report and carry on.
        logger.warning(f"Failed to remove stale IPC socket {socket_path}: {e}")
        return False

    logger.info(f"🧹 Removed stale IPC socket: {socket_path}")
    return True
def get_zmq_transport_url(port: int, transport_mode: TransportMode, host: str = 'localhost') -> str:
    """Build the ZMQ endpoint URL for *port* under the given transport mode.

    For IPC the parent directory of the socket file is created as a side
    effect (errors such as a permission failure propagate).

    Args:
        port: Port number (used for both IPC and TCP)
        transport_mode: IPC or TCP
        host: Host for TCP mode (ignored for IPC)

    Returns:
        ``tcp://<host>:<port>`` for TCP, ``ipc://<socket path>`` for IPC.

    Raises:
        ValueError: If transport_mode is neither IPC nor TCP, or IPC is
            requested on Windows (fail-loud).

    Examples:
        >>> get_zmq_transport_url(5555, TransportMode.IPC)  # Unix/Mac
        'ipc:///home/user/.openhcs/ipc/openhcs-zmq-5555.sock'

        >>> get_zmq_transport_url(5555, TransportMode.TCP, 'localhost')
        'tcp://localhost:5555'
    """
    if transport_mode == TransportMode.TCP:
        return f"tcp://{host}:{port}"

    if transport_mode != TransportMode.IPC:
        # Fail-loud for invalid enum (should never happen with proper typing)
        raise ValueError(f"Invalid transport_mode: {transport_mode}")

    if platform.system() == 'Windows':
        # Windows doesn't support IPC (Unix domain sockets) - fail-loud
        raise ValueError(
            "IPC transport mode is not supported on Windows. "
            "Windows does not support Unix domain sockets. "
            "Use TransportMode.TCP instead, or use get_default_transport_mode() "
            "to automatically select the correct mode for the platform."
        )

    # Unix domain socket: resolve the file path and make sure its directory exists.
    ipc_path = _get_ipc_socket_path(port)
    ipc_path.parent.mkdir(parents=True, exist_ok=True)  # Fail-loud if permission denied
    return f"ipc://{ipc_path}"
# Global ack listener singleton state.
# The lock serializes start/stop; the running flag is polled by the listener loop.
_ack_listener_thread: Optional[threading.Thread] = None
_ack_listener_lock = threading.Lock()
_ack_listener_running: bool = False
_ack_listener_transport_mode = None
def start_global_ack_listener(transport_mode: Optional[TransportMode] = None):
    """Start the global ack listener thread (singleton).

    This thread listens on SHARED_ACK_PORT for acknowledgments from all viewers
    and routes them to the appropriate queue trackers.

    Safe to call multiple times - only starts once.

    If a previously stopped listener thread is still winding down, this call
    waits briefly for it to exit before starting a new one. Otherwise the old
    thread would see the running flag flip back to True and keep its socket
    bound while the new thread tries to bind the same endpoint.

    Args:
        transport_mode: Transport mode to use (IPC or TCP). When omitted, the
            platform default from get_default_transport_mode() is used.
    """
    global _ack_listener_thread, _ack_listener_running, _ack_listener_transport_mode

    with _ack_listener_lock:
        if _ack_listener_running:
            logger.debug("Ack listener already running")
            return

        previous = _ack_listener_thread
        if (
            previous is not None
            and previous is not threading.current_thread()
            and previous.is_alive()
        ):
            # The loop polls with a 1 second timeout, so this normally returns quickly.
            previous.join(timeout=2.0)
            if previous.is_alive():
                logger.warning("Previous ack listener thread did not exit in time; starting a new one anyway")

        # Set transport mode (platform default when not given)
        _ack_listener_transport_mode = transport_mode or get_default_transport_mode()

        logger.info(f"Starting global ack listener on port {SHARED_ACK_PORT} with {_ack_listener_transport_mode.value} transport")
        _ack_listener_running = True
        _ack_listener_thread = threading.Thread(
            target=_ack_listener_loop,
            daemon=True,
            name="AckListener"
        )
        _ack_listener_thread.start()
def _ack_listener_loop():
    """Main loop for ack listener thread.

    Binds a PULL socket on SHARED_ACK_PORT, then receives JSON acks from all
    viewers and routes each one to the queue tracker registered for the ack's
    viewer port. Runs until the module-level running flag is cleared.

    A malformed ack (invalid JSON or an unparsable payload) is logged and
    skipped; it does not terminate the listener.
    """
    import zmq
    from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry

    registry = GlobalQueueTrackerRegistry()
    context = zmq.Context()
    # Named ack_socket so the stdlib `socket` module is not shadowed.
    ack_socket = None

    try:
        ack_socket = context.socket(zmq.PULL)
        ack_url = get_zmq_transport_url(SHARED_ACK_PORT, _ack_listener_transport_mode, '*')
        ack_socket.bind(ack_url)
        logger.info(f"✅ Ack listener bound to {ack_url}")

        while _ack_listener_running:
            try:
                # Poll with a timeout so the running flag is re-checked every second.
                if not ack_socket.poll(timeout=1000):
                    continue

                try:
                    ack_dict = ack_socket.recv_json()
                except ValueError as e:
                    # Invalid JSON / encoding: drop this message, keep listening.
                    logger.error(f"Failed to parse ack message: {e}", exc_info=True)
                    continue

                try:
                    ack = ImageAck.from_dict(ack_dict)

                    # Route to appropriate queue tracker
                    tracker = registry.get_tracker(ack.viewer_port)
                    if tracker:
                        tracker.mark_processed(ack.image_id)

                        # Trigger UI refresh to show updated progress immediately
                        try:
                            from openhcs.pyqt_gui.widgets.shared.zmq_server_manager import _trigger_manager_refresh_fast
                            _trigger_manager_refresh_fast()
                        except ImportError:
                            # PyQt not available (e.g., in TUI mode) - skip UI refresh
                            pass
                    else:
                        logger.warning(f"Received ack for unknown viewer port {ack.viewer_port}: {ack.image_id}")

                except Exception as e:
                    logger.error(f"Failed to parse ack message: {e}", exc_info=True)

            except zmq.ZMQError as e:
                if _ack_listener_running:
                    logger.error(f"ZMQ error in ack listener: {e}")
                    time.sleep(0.1)

    except Exception as e:
        logger.error(f"Fatal error in ack listener: {e}", exc_info=True)

    finally:
        # Best-effort teardown; never let cleanup errors escape the thread.
        if ack_socket is not None:
            try:
                ack_socket.close()
            except Exception as e:
                logger.debug(f"Error closing ack listener socket: {e}")
        try:
            context.term()
        except Exception as e:
            logger.debug(f"Error terminating ack listener context: {e}")
        logger.info("Ack listener stopped")
def stop_global_ack_listener():
    """Ask the global ack listener thread to stop.

    Only clears the running flag under the listener lock; the thread is not
    joined here. Does nothing when the listener is not running.
    """
    global _ack_listener_running

    with _ack_listener_lock:
        if _ack_listener_running:
            logger.info("Stopping global ack listener")
            _ack_listener_running = False
class ZMQServer(ABC, metaclass=AutoRegisterMeta):
    """
    ABC for ZMQ servers - dual-channel pattern with ping/pong handshake.

    A data socket is bound on ``port`` and a REP control socket on
    ``port + CONTROL_PORT_OFFSET``. Control messages are pickled dicts; a
    ping is answered with a pong, everything else is passed to
    ``handle_control_message``.

    Registry auto-created and stored as ZMQServer.__registry__.
    Subclasses auto-register by setting _server_type class attribute.
    """
    __registry_key__ = '_server_type'

    _server_type: Optional[str] = None  # Override in subclasses for registration

    def __init__(self, port, host='*', log_file_path=None, data_socket_type=None, transport_mode=None):
        """Store configuration; no sockets are created until start().

        Args:
            port: Data port; the control port is derived from it.
            host: Bind host for TCP mode.
            log_file_path: Reported to clients in pong/status responses.
            data_socket_type: ZMQ socket type of the data channel (default zmq.PUB).
            transport_mode: IPC or TCP; platform default when None.
        """
        import zmq
        self.port = port
        self.host = host
        self.control_port = port + CONTROL_PORT_OFFSET
        self.log_file_path = log_file_path
        self.data_socket_type = data_socket_type if data_socket_type is not None else zmq.PUB
        # Falls back to the platform default (TCP on Windows, IPC elsewhere)
        self.transport_mode = transport_mode or get_default_transport_mode()
        self.zmq_context = None
        self.data_socket = None
        self.control_socket = None
        self._running = False
        self._ready = False
        self._lock = threading.Lock()

    def start(self):
        """Create and bind the data and control sockets.

        No-op when already running. If any step fails, everything created so
        far is released again before the exception propagates, so a failed
        start does not leak a context or half-bound sockets.
        """
        import zmq
        with self._lock:
            if self._running:
                return
            try:
                self.zmq_context = zmq.Context()
                self.data_socket = self.zmq_context.socket(self.data_socket_type)
                self.data_socket.setsockopt(zmq.LINGER, 0)

                # Set high water mark for SUB/PULL sockets to prevent message drops
                # when viewer is busy processing (e.g., creating shapes layers that take 2-3 seconds)
                # REP sockets don't need HWM since they're synchronous (one request at a time)
                if self.data_socket_type in (zmq.SUB, zmq.PULL):
                    self.data_socket.setsockopt(zmq.RCVHWM, 100000)  # Buffer up to 100k messages
                    socket_type_name = "SUB" if self.data_socket_type == zmq.SUB else "PULL"
                    logger.info(f"ZMQ {socket_type_name} socket RCVHWM set to 100000 to prevent drops during blocking operations")

                data_url = get_zmq_transport_url(self.port, self.transport_mode, self.host)
                control_url = get_zmq_transport_url(self.control_port, self.transport_mode, self.host)

                self.data_socket.bind(data_url)
                if self.data_socket_type == zmq.SUB:
                    self.data_socket.setsockopt(zmq.SUBSCRIBE, b"")
                self.control_socket = self.zmq_context.socket(zmq.REP)
                self.control_socket.setsockopt(zmq.LINGER, 0)
                self.control_socket.bind(control_url)
            except Exception:
                self._release_zmq_resources()
                raise
            self._running = True
            logger.info(f"ZMQ Server started on {data_url} ({SocketType.from_zmq_constant(self.data_socket_type).get_display_name()}), control {control_url}")

    def stop(self):
        """Close both sockets and terminate the context.

        Cleanup is driven by whether resources exist, not only by the running
        flag: request_shutdown() clears the flag without closing anything, and
        a later stop() must still release the sockets. No-op when the server
        was never started or is already fully stopped.
        """
        with self._lock:
            has_resources = (
                self.data_socket is not None
                or self.control_socket is not None
                or self.zmq_context is not None
            )
            if not self._running and not has_resources:
                return
            self._running = False
            self._release_zmq_resources()
            logger.info("ZMQ Server stopped")

    def _release_zmq_resources(self):
        """Close whichever of data socket, control socket and context exist and reset them to None."""
        if self.data_socket is not None:
            self.data_socket.close()
            self.data_socket = None
        if self.control_socket is not None:
            self.control_socket.close()
            self.control_socket = None
        if self.zmq_context is not None:
            self.zmq_context.term()
            self.zmq_context = None

    def is_running(self):
        """Return the running flag (False after stop() or request_shutdown())."""
        return self._running

    def process_messages(self):
        """Handle at most one pending control message without blocking.

        A ping marks the server ready and is answered with a pong; any other
        message is delegated to handle_control_message(). If decoding or
        handling fails, an error reply is sent before the exception is
        re-raised: a REP socket must answer every request before it can
        receive the next one, so skipping the reply would wedge the control
        channel.
        """
        import zmq
        if not self._running:
            return
        try:
            raw = self.control_socket.recv(zmq.NOBLOCK)
            try:
                # SECURITY NOTE: pickle.loads can execute arbitrary code; the control
                # channel must only be reachable by trusted peers.
                control_data = pickle.loads(raw)
                if control_data.get(MessageFields.TYPE) == ControlMessageType.PING.value:
                    if not self._ready:
                        self._ready = True
                        logger.info("Server ready")
                    response = self._create_pong_response()
                else:
                    response = self.handle_control_message(control_data)
                payload = pickle.dumps(response)
            except Exception as exc:
                self._send_error_reply(exc)
                raise
            self.control_socket.send(payload)
        except zmq.Again:
            # Nothing pending on the control channel.
            pass

    def _send_error_reply(self, exc):
        """Best-effort error reply so the REP socket returns to its receive state."""
        try:
            self.control_socket.send(pickle.dumps({'type': 'error', 'error': str(exc)}))
        except Exception as send_error:
            logger.warning(f"Failed to send error reply on control channel: {send_error}")

    def _create_pong_response(self):
        """Build the pong dict describing this server (ports, readiness, class name, log file)."""
        return PongResponse(port=self.port, control_port=self.control_port, ready=self._ready,
                            server=self.__class__.__name__, log_file_path=self.log_file_path).to_dict()

    def get_status_info(self):
        """Return a dict snapshot of ports, running/ready flags, class name and log file."""
        return {'port': self.port, 'control_port': self.control_port, 'running': self._running,
                'ready': self._ready, 'server_type': self.__class__.__name__, 'log_file': self.log_file_path}

    def request_shutdown(self):
        """Clear the running flag; sockets are released by a subsequent stop()."""
        self._running = False

    @staticmethod
    def _netstat_listener_pid(line, port):
        """Return the PID column of a ``netstat -ano`` row listening on exactly *port*, else None.

        The port is compared against the parsed local-address column; a plain
        substring test would also match longer ports (``:7555`` in ``:75550``)
        and the foreign-address column.
        """
        if 'LISTENING' not in line:
            return None
        fields = line.split()
        if len(fields) < 3:
            return None
        local_port = fields[1].rpartition(':')[2]
        return fields[-1] if local_port == str(port) else None

    @staticmethod
    def kill_processes_on_port(port):
        """Best-effort kill of processes listening on a TCP port.

        Uses ``lsof`` + ``kill -9`` on Linux/macOS and ``netstat`` +
        ``taskkill`` on Windows. Failures are logged at debug level and
        never raised.

        Args:
            port: TCP port number.

        Returns:
            Number of processes a kill command was issued for.
        """
        killed = 0
        try:
            system = platform.system()
            if system in ("Linux", "Darwin"):
                result = subprocess.run(['lsof', '-ti', f'TCP:{port}', '-sTCP:LISTEN'], capture_output=True, text=True, timeout=2)
                if result.returncode == 0 and result.stdout.strip():
                    for pid in result.stdout.split():
                        try:
                            subprocess.run(['kill', '-9', pid], timeout=1)
                            killed += 1
                        except Exception as e:
                            logger.debug(f"Failed to kill pid {pid} on port {port}: {e}")
            elif system == "Windows":
                result = subprocess.run(['netstat', '-ano'], capture_output=True, text=True, timeout=2)
                for line in result.stdout.split('\n'):
                    pid = ZMQServer._netstat_listener_pid(line, port)
                    if pid is None:
                        continue
                    try:
                        # Graceful termination (no /F flag) - works without UAC for user processes
                        subprocess.run(['taskkill', '/PID', pid], timeout=1)
                        killed += 1
                    except Exception as e:
                        logger.debug(f"Failed to kill pid {pid} on port {port}: {e}")
        except Exception as e:
            logger.debug(f"Could not kill processes on port {port}: {e}")
        return killed

    @staticmethod
    def load_images_from_shared_memory(images, error_callback=None):
        """Load images from shared memory and clean up.

        Each image is copied out of its shared-memory segment; the segment is
        then closed and, after a successful copy, unlinked. The handle is
        closed even when the copy fails.

        Args:
            images: List of image info dicts with shm_name, shape, dtype, metadata, image_id
            error_callback: Optional callback(image_id, status, error) for errors

        Returns:
            List of dicts with 'data', 'metadata', 'image_id' keys (failed images are omitted)
        """
        import numpy as np
        from multiprocessing import shared_memory

        image_data_list = []
        for image_info in images:
            shm_name = image_info.get('shm_name')
            shape = tuple(image_info.get('shape'))
            dtype = np.dtype(image_info.get('dtype'))
            metadata = image_info.get('metadata', {})
            image_id = image_info.get('image_id')

            try:
                shm = shared_memory.SharedMemory(name=shm_name)
                try:
                    # .copy() detaches the array from the shared buffer before it is closed.
                    np_data = np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
                finally:
                    shm.close()
                shm.unlink()

                image_data_list.append({
                    'data': np_data,
                    'metadata': metadata,
                    'image_id': image_id
                })
            except Exception as e:
                logger.error(f"Failed to read shared memory {shm_name}: {e}")
                if error_callback and image_id:
                    error_callback(image_id, 'error', f"Failed to read shared memory: {e}")
                continue

        return image_data_list

    @staticmethod
    def collect_dimension_values(images, components):
        """Collect unique dimension value tuples from images.

        Args:
            images: List of image data dicts with 'metadata' key
            components: List of component names to collect

        Returns:
            Sorted list of unique value tuples; ``[()]`` when no components are given.

        Raises:
            KeyError: If a component is missing from an image's metadata (fail loud).
        """
        if not components:
            return [()]

        values = {
            tuple(img_data['metadata'][comp] for comp in components)
            for img_data in images
        }
        return sorted(values)

    @staticmethod
    def organize_components_by_mode(component_order, component_modes, component_unique_values, always_include_window=True, skip_flat_dimensions=True):
        """Organize components by their display mode.

        Args:
            component_order: Ordered list of component names
            component_modes: Map of component name to mode ('window', 'channel', 'slice', 'frame')
            component_unique_values: Map of component name to set of unique values
            always_include_window: If True, include WINDOW components even if flat.
                If False, WINDOW components follow the same flat-skipping rule as the others.
            skip_flat_dimensions: If True, skip components with cardinality <= 1 for non-window dimensions

        Returns:
            Dict with keys 'window', 'channel', 'slice', 'frame' mapping to component lists
        """
        result = {
            'window': [],
            'channel': [],
            'slice': [],
            'frame': []
        }

        for comp_name in component_order:
            mode = component_modes[comp_name]
            is_flat = len(component_unique_values.get(comp_name, set())) <= 1

            if mode == 'window' and always_include_window:
                result['window'].append(comp_name)
            elif skip_flat_dimensions and is_flat:
                # Flat dimension: nothing to lay out along this axis.
                continue
            else:
                result[mode].append(comp_name)

        return result

    @abstractmethod
    def handle_control_message(self, message):
        """Handle a non-ping control message and return the response to send back."""
        pass

    @abstractmethod
    def handle_data_message(self, message):
        """Handle a message received on the data channel."""
        pass
class ZMQClient(ABC):
    """ABC for ZMQ clients - dual-channel pattern with auto-spawning.

    connect() reuses a ready server on the configured port if one answers a
    ping; otherwise it clears the port, spawns a server through
    _spawn_server_process() and waits for it to answer.
    """

    def __init__(self, port, host='localhost', persistent=True, transport_mode=None):
        """Store configuration; nothing is connected until connect().

        Args:
            port: Data port; the control port is derived from it.
            host: Server host for TCP mode.
            persistent: If False, a server spawned by this client is
                terminated on disconnect().
            transport_mode: IPC or TCP; platform default when None.
        """
        self.port = port
        self.host = host
        self.control_port = port + CONTROL_PORT_OFFSET
        self.persistent = persistent
        # Falls back to the platform default (TCP on Windows, IPC elsewhere)
        self.transport_mode = transport_mode or get_default_transport_mode()
        self.zmq_context = None
        self.data_socket = None
        self.control_socket = None
        self.server_process = None
        self._connected = False
        self._connected_to_existing = False
        self._lock = threading.Lock()

    def connect(self, timeout=10.0):
        """Connect to an existing server or spawn a new one.

        Args:
            timeout: Seconds to wait for a spawned server to become ready.

        Returns:
            True when connected, False if the spawned server never became ready.
        """
        with self._lock:
            if self._connected:
                return True
            if self._is_port_in_use(self.port):
                if self._try_connect_to_existing(self.port):
                    self._connected = self._connected_to_existing = True
                    return True
                # The port is held by something that is not a ready server: clear it.
                self._kill_processes_on_port(self.port)
                self._kill_processes_on_port(self.control_port)
                time.sleep(0.5)
            self.server_process = self._spawn_server_process()
            if not self._wait_for_server_ready(timeout):
                return False
            self._setup_client_sockets()
            self._connected = True
            return True

    def disconnect(self):
        """Close client sockets and, for a non-persistent server this client spawned, stop it."""
        with self._lock:
            if not self._connected:
                return
            self._cleanup_sockets()
            owns_server = not self._connected_to_existing and self.server_process and not self.persistent
            if owns_server:
                if hasattr(self.server_process, 'is_alive'):
                    # Process-like handle exposing is_alive()/join()
                    if self.server_process.is_alive():
                        self.server_process.terminate()
                        self.server_process.join(timeout=5)
                        if self.server_process.is_alive():
                            self.server_process.kill()
                elif self.server_process.poll() is None:
                    # Popen-like handle exposing poll()/wait()
                    self.server_process.terminate()
                    try:
                        self.server_process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        self.server_process.kill()
            self._connected = False

    def is_connected(self):
        """Return the connected flag."""
        return self._connected

    def _setup_client_sockets(self):
        """Create the context and a SUB data socket subscribed to everything."""
        import zmq
        self.zmq_context = zmq.Context()
        data_url = get_zmq_transport_url(self.port, self.transport_mode, self.host)

        self.data_socket = self.zmq_context.socket(zmq.SUB)
        self.data_socket.setsockopt(zmq.LINGER, 0)
        self.data_socket.connect(data_url)
        self.data_socket.setsockopt(zmq.SUBSCRIBE, b"")
        time.sleep(0.1)

    def _cleanup_sockets(self):
        """Close whichever sockets exist, terminate the context, and reset all three to None."""
        if self.data_socket is not None:
            self.data_socket.close()
            self.data_socket = None
        if self.control_socket is not None:
            self.control_socket.close()
            self.control_socket = None

        if self.zmq_context is not None:
            self.zmq_context.term()
            self.zmq_context = None

    @staticmethod
    def _ping_control_channel(control_url, timeout_ms):
        """Send one ping over a throwaway REQ socket and return the unpickled reply.

        The socket and context are always released. Any failure (including a
        receive timeout) propagates to the caller.
        """
        import zmq
        ctx = zmq.Context()
        sock = None
        try:
            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.LINGER, 0)
            sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
            sock.connect(control_url)
            sock.send(pickle.dumps({'type': 'ping'}))
            # SECURITY NOTE: pickle.loads on a network reply; only use with trusted servers.
            return pickle.loads(sock.recv())
        finally:
            if sock is not None:
                sock.close()
            ctx.term()

    def _try_connect_to_existing(self, port):
        """Return a truthy value if a ready server answers a ping on *port*'s control channel."""
        try:
            control_url = get_zmq_transport_url(port + CONTROL_PORT_OFFSET, self.transport_mode, self.host)
            response = self._ping_control_channel(control_url, 500)
            return response.get('type') == 'pong' and response.get('ready')
        except Exception:
            # Best-effort probe: any failure means "no usable server here".
            return False

    def _wait_for_server_ready(self, timeout=10.0):
        """Wait until both ports are in use and the server answers a ready pong.

        Each of the two phases gets its own *timeout* budget.

        Returns:
            True once the server reports ready, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while not (self._is_port_in_use(self.port) and self._is_port_in_use(self.control_port)):
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.2)

        control_url = get_zmq_transport_url(self.control_port, self.transport_mode, self.host)

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                response = self._ping_control_channel(control_url, 1000)
                if response.get('type') == 'pong' and response.get('ready'):
                    return True
            except Exception as e:
                # Server not answering yet; retry until the deadline.
                logger.debug(f"Server on port {self.port} not ready yet: {e}")
            time.sleep(0.5)

        return False

    def _is_port_in_use(self, port):
        """Return True if *port* looks taken.

        In IPC mode this means the socket file exists; in TCP mode it means
        binding localhost:port fails with OSError.
        """
        if self.transport_mode == TransportMode.IPC:
            socket_path = _get_ipc_socket_path(port)
            return socket_path.exists() if socket_path else False

        # TCP mode - probe by trying to bind; the socket is always closed.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.1)
            try:
                probe.bind(('localhost', port))
            except OSError:
                return True
            except Exception:
                # e.g. a port value outside the valid range: treated as not in use
                return False
            return False

    def _find_free_port(self):
        """Ask the OS for an unused TCP port and return its number."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def _kill_processes_on_port(self, port):
        """Best-effort: free *port* (remove the IPC socket file, or kill TCP listeners)."""
        try:
            if self.transport_mode == TransportMode.IPC:
                # IPC mode doesn't use TCP ports, so only the stale socket file is removed.
                _remove_ipc_socket(port)
                return
            # TCP mode - same lsof/netstat based kill the server class provides.
            ZMQServer.kill_processes_on_port(port)
        except Exception as e:
            logger.debug(f"Could not free port {port}: {e}")

    @staticmethod
    def scan_servers(ports, host='localhost', timeout_ms=200, transport_mode=None):
        """Ping each port's control channel and collect the servers that answer.

        Args:
            ports: Iterable of data port numbers to probe.
            host: Host for TCP mode (ignored for IPC).
            timeout_ms: Receive timeout per port in milliseconds.
            transport_mode: IPC or TCP; platform default when None.

        Returns:
            List of pong dicts, each with 'port' and 'control_port' set to the probed values.
        """
        transport_mode = transport_mode or get_default_transport_mode()
        servers = []
        for port in ports:
            try:
                control_port = port + CONTROL_PORT_OFFSET
                control_url = get_zmq_transport_url(control_port, transport_mode, host)
                pong = ZMQClient._ping_control_channel(control_url, timeout_ms)
                if pong.get('type') == 'pong':
                    pong['port'] = port
                    pong['control_port'] = control_port
                    servers.append(pong)
            except Exception:
                # No (usable) server on this port; keep scanning.
                continue
        return servers

    @staticmethod
    def _force_free_ports(port, control_port, transport_mode, log_prefix=""):
        """Free both ports without server cooperation.

        Returns:
            True for IPC once the socket files were handled; for TCP, True
            only if at least one process was killed.
        """
        if transport_mode == TransportMode.IPC:
            _remove_ipc_socket(port)
            _remove_ipc_socket(control_port)
            logger.info(f"{log_prefix}Removed IPC socket files for ports {port} and {control_port}")
            return True
        killed = sum(ZMQServer.kill_processes_on_port(p) for p in [port, control_port])
        logger.info(f"{log_prefix}Force killed {killed} processes on ports {port} and {control_port}")
        return killed > 0

    @staticmethod
    def kill_server_on_port(port, graceful=True, timeout=5.0, transport_mode=None, host='localhost'):
        """
        Kill server on specified port.

        Args:
            port: Server port number
            graceful: If True, kills workers only (server stays alive).
                      If False, kills workers AND server (port becomes free).
            timeout: Timeout in seconds for server response
            transport_mode: TransportMode (IPC or TCP). If None, uses platform default.
            host: Host for TCP mode (ignored for IPC)

        Returns:
            bool: True if operation succeeded
        """
        import zmq
        transport_mode = transport_mode or get_default_transport_mode()
        msg_type = 'shutdown' if graceful else 'force_shutdown'
        control_port = port + CONTROL_PORT_OFFSET

        ctx = None
        sock = None
        try:
            control_url = get_zmq_transport_url(control_port, transport_mode, host)

            ctx = zmq.Context()
            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect(control_url)

            if graceful:
                # Graceful shutdown: wait for ack (server stays alive)
                sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
                sock.send(pickle.dumps({'type': msg_type}))
                # SECURITY NOTE: pickle.loads on a network reply; only use with trusted servers.
                ack = pickle.loads(sock.recv())
                if ack.get('type') == 'shutdown_ack':
                    logger.info(f"✅ Server on port {port} acknowledged graceful shutdown (workers killed, server alive)")
                    return True
            else:
                # Force shutdown: send message and immediately kill - don't wait for ack
                # Server is dying, it can't reliably send ack anyway
                sock.setsockopt(zmq.SNDTIMEO, 1000)  # 1s timeout for send
                try:
                    sock.send(pickle.dumps({'type': msg_type}))
                    logger.info(f"🔥 Sent force shutdown message to port {port}")
                except Exception as send_error:
                    logger.warning(f"⚠️ Failed to send force shutdown message: {send_error}")

                return ZMQClient._force_free_ports(port, control_port, transport_mode, "✅ ")

        except Exception as e:
            # Connection failed - server might not exist or wrong transport mode
            logger.warning(f"❌ Failed to connect to server on port {port} ({transport_mode.value} mode): {e}")

            if not graceful:
                # Force shutdown failed via ZMQ, try freeing the ports directly
                return ZMQClient._force_free_ports(port, control_port, transport_mode)

            logger.warning(f"❌ Failed to shutdown server on port {port} gracefully")
            return False
        finally:
            # Best-effort release of the throwaway socket and context.
            try:
                if sock is not None:
                    sock.close()
                if ctx is not None:
                    ctx.term()
            except Exception as cleanup_error:
                logger.debug(f"Error releasing shutdown socket: {cleanup_error}")

        # Graceful shutdown failed - reply was not a shutdown ack
        logger.warning(f"❌ Failed to shutdown server on port {port} gracefully")
        return False

    @abstractmethod
    def _spawn_server_process(self):
        """Start the server and return its process handle."""
        pass

    @abstractmethod
    def send_data(self, data):
        """Send *data* to the server."""
        pass
# ============================================================================
# Registry Export
# ============================================================================
# Module-level alias for the registry stored on the ZMQServer base class.
ZMQ_SERVERS = ZMQServer.__registry__