Coverage for openhcs/runtime/zmq_execution_client.py: 53.3%
185 statements
« prev ^ index » coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""ZMQ Execution Client - location-transparent pipeline execution."""
3import logging
4import subprocess
5import sys
6import time
7import threading
8import json
9import zmq
10import pickle
11from pathlib import Path
12from openhcs.runtime.zmq_base import ZMQClient
13from openhcs.core.config import TransportMode
14from openhcs.constants.constants import DEFAULT_EXECUTION_SERVER_PORT
16logger = logging.getLogger(__name__)
class ZMQExecutionClient(ZMQClient):
    """ZMQ client for OpenHCS pipeline execution with progress streaming.

    Sends control requests (execute / status / cancel / ping) to an OpenHCS
    execution server over a REQ socket, and optionally streams JSON progress
    messages from the data socket to a user callback on a daemon thread.

    NOTE(review): control-channel payloads are exchanged via ``pickle`` —
    only connect to trusted servers; unpickling untrusted data can execute
    arbitrary code.
    """

    def __init__(self, port=DEFAULT_EXECUTION_SERVER_PORT, host='localhost', persistent=True, progress_callback=None, transport_mode=None):
        """
        Args:
            port: Server port (default: DEFAULT_EXECUTION_SERVER_PORT).
            host: Server hostname (default: 'localhost').
            persistent: If True, a spawned server process outlives this client.
            progress_callback: Optional callable invoked with each progress
                message dict (``{'type': 'progress', ...}``) from the server.
            transport_mode: Optional TransportMode passed through to the base.
        """
        super().__init__(port, host, persistent, transport_mode)
        self.progress_callback = progress_callback
        self._progress_thread = None
        self._progress_stop_event = threading.Event()

    def _start_progress_listener(self):
        """Start the background progress-listener thread (idempotent).

        No-op when a listener is already running or no callback is configured.
        """
        if self._progress_thread and self._progress_thread.is_alive():
            return
        if not self.progress_callback:
            return
        self._progress_stop_event.clear()
        self._progress_thread = threading.Thread(target=self._progress_listener_loop, daemon=True)
        self._progress_thread.start()

    def _stop_progress_listener(self):
        """Signal the listener thread to stop and join it with a bounded wait."""
        if not self._progress_thread:
            return
        self._progress_stop_event.set()
        if self._progress_thread.is_alive():
            self._progress_thread.join(timeout=2)
        self._progress_thread = None

    def _progress_listener_loop(self):
        """Poll the data socket for JSON progress messages until stopped.

        Best-effort by design: non-JSON payloads, callback errors, and
        transient socket errors are logged at debug level and skipped so the
        listener thread never dies from a single bad message.
        """
        try:
            while not self._progress_stop_event.is_set():
                if not self.data_socket:
                    # Data socket not connected yet — wait briefly and re-check.
                    time.sleep(0.1)
                    continue
                try:
                    message = self.data_socket.recv_string(zmq.NOBLOCK)
                except zmq.Again:
                    # Nothing pending — short sleep avoids busy-spinning.
                    time.sleep(0.05)
                    continue
                except Exception:
                    # Narrowed from bare except: still best-effort, but no
                    # longer swallows KeyboardInterrupt/SystemExit.
                    logger.debug("Progress listener socket error", exc_info=True)
                    time.sleep(0.1)
                    continue
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    # Ignore non-JSON traffic on the data socket.
                    continue
                if self.progress_callback and data.get('type') == 'progress':
                    try:
                        self.progress_callback(data)
                    except Exception:
                        # A buggy callback must not stop progress streaming.
                        logger.debug("Progress callback raised", exc_info=True)
        except Exception:
            logger.debug("Progress listener loop terminated unexpectedly", exc_info=True)

    def submit_pipeline(self, plate_id, pipeline_steps, global_config, pipeline_config=None, config_params=None):
        """
        Submit pipeline for execution and return immediately (non-blocking).

        Use this for UI applications where you want to submit and return immediately,
        then monitor progress via progress_callback or poll status manually.

        Args:
            plate_id: Plate identifier
            pipeline_steps: List of pipeline steps
            global_config: GlobalPipelineConfig instance
            pipeline_config: Optional PipelineConfig instance
            config_params: Optional config parameters dict

        Returns:
            dict with 'status' and 'execution_id' if accepted, or error info

        Raises:
            RuntimeError: If the client cannot connect to the execution server.
        """
        if not self._connected and not self.connect():
            raise RuntimeError("Failed to connect to execution server")
        if self.progress_callback:
            self._start_progress_listener()
        # Deferred imports: code generation is only needed on submission.
        from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code, generate_config_code
        from openhcs.core.config import GlobalPipelineConfig, PipelineConfig
        logger.info("🔌 CLIENT: Generating pipeline code...")
        pipeline_code = generate_complete_pipeline_steps_code(pipeline_steps, clean_mode=True)
        request = {'type': 'execute', 'plate_id': str(plate_id), 'pipeline_code': pipeline_code}
        if config_params:
            # Explicit parameter dict takes precedence over generated code.
            request['config_params'] = config_params
        else:
            request['config_code'] = generate_config_code(global_config, GlobalPipelineConfig, clean_mode=True)
        if pipeline_config:
            request['pipeline_config_code'] = generate_config_code(pipeline_config, PipelineConfig, clean_mode=True)
        logger.info("🔌 CLIENT: Sending execute request...")
        response = self._send_control_request(request)
        logger.info(f"🔌 CLIENT: Got response: {response.get('status')}")
        return response

    def wait_for_completion(self, execution_id, poll_interval=0.5, max_consecutive_errors=5):
        """
        Block and wait for execution to complete (blocking).

        Use this for CLI/script applications where you want to wait for results.
        For UI applications, use submit_pipeline() and monitor via progress_callback.

        Args:
            execution_id: Execution ID from submit_pipeline()
            poll_interval: Seconds between status polls (default: 0.5)
            max_consecutive_errors: Max consecutive errors before giving up (default: 5)

        Returns:
            dict with 'status' ('complete', 'error', 'cancelled') and results/error info
        """
        logger.info(f"🔌 CLIENT: Waiting for execution {execution_id} to complete...")
        consecutive_errors = 0
        poll_count = 0

        while True:
            time.sleep(poll_interval)
            poll_count += 1

            try:
                logger.info(f"🔌 CLIENT: Status poll #{poll_count} for execution {execution_id}")
                status_response = self.get_status(execution_id)
                logger.info(f"🔌 CLIENT: Status response: {status_response}")
                consecutive_errors = 0  # Reset error counter on success

                if status_response.get('status') == 'ok':
                    execution = status_response.get('execution', {})
                    exec_status = execution.get('status')
                    logger.info(f"🔌 CLIENT: Execution status: {exec_status}")
                    if exec_status == 'complete':
                        logger.info("🔌 CLIENT: Execution complete, returning results")
                        return {'status': 'complete', 'execution_id': execution_id, 'results': execution.get('results_summary', {})}
                    elif exec_status == 'failed':
                        logger.info("🔌 CLIENT: Execution failed")
                        return {'status': 'error', 'execution_id': execution_id, 'message': execution.get('error')}
                    elif exec_status == 'cancelled':
                        logger.info("🔌 CLIENT: Execution cancelled")
                        return {'status': 'cancelled', 'execution_id': execution_id, 'message': 'Execution was cancelled'}
                elif status_response.get('status') == 'error':
                    # Server returned error status
                    error_msg = status_response.get('message', 'Unknown error')
                    logger.warning(f"Status check returned error: {error_msg}")
                    return {'status': 'error', 'execution_id': execution_id, 'message': error_msg}

            except Exception as e:
                # Handle ZMQ errors, connection issues, etc.
                consecutive_errors += 1
                logger.warning(f"Error checking execution status (attempt {consecutive_errors}/{max_consecutive_errors}): {e}")

                if consecutive_errors >= max_consecutive_errors:
                    # Too many consecutive errors - assume execution failed or was cancelled
                    logger.error(f"Failed to get execution status after {max_consecutive_errors} attempts, assuming execution was cancelled")
                    return {'status': 'cancelled', 'execution_id': execution_id,
                            'message': 'Lost connection to server (workers may have been killed)'}

                # Wait a bit longer before retrying after error
                time.sleep(1.0)

    def execute_pipeline(self, plate_id, pipeline_steps, global_config, pipeline_config=None, config_params=None):
        """
        Submit pipeline and wait for completion (blocking - for backward compatibility).

        This is the legacy blocking API. For UI applications, use submit_pipeline() instead.

        Args:
            plate_id: Plate identifier
            pipeline_steps: List of pipeline steps
            global_config: GlobalPipelineConfig instance
            pipeline_config: Optional PipelineConfig instance
            config_params: Optional config parameters dict

        Returns:
            dict with execution results (blocks until complete)
        """
        response = self.submit_pipeline(plate_id, pipeline_steps, global_config, pipeline_config, config_params)
        if response.get('status') == 'accepted':
            execution_id = response.get('execution_id')
            return self.wait_for_completion(execution_id)
        return response

    def get_status(self, execution_id=None):
        """Query server status; scoped to one execution when an id is given."""
        request = {'type': 'status'}
        if execution_id:
            request['execution_id'] = execution_id
        return self._send_control_request(request)

    def cancel_execution(self, execution_id):
        """Ask the server to cancel the given execution; returns server response."""
        return self._send_control_request({'type': 'cancel', 'execution_id': execution_id})

    def ping(self):
        """Return True if the server answers a ping and reports itself ready."""
        try:
            pong = self.get_server_info()
            return pong.get('type') == 'pong' and pong.get('ready')
        except Exception:
            return False

    def get_server_info(self):
        """Ping the server and return its info dict, or an error dict.

        Delegates to _send_control_request so the configured transport mode is
        honored (the previous implementation hard-coded a tcp:// URL).
        """
        try:
            if not self._connected and not self.connect():
                return {'status': 'error', 'message': 'Not connected'}
            # Short 1s timeout: a ping should answer immediately.
            return self._send_control_request({'type': 'ping'}, timeout_ms=1000)
        except Exception:
            return {'status': 'error', 'message': 'Failed'}

    def _send_control_request(self, request, timeout_ms=5000):
        """
        Send a control request to the server.

        Args:
            request: Request dictionary to send
            timeout_ms: Timeout in milliseconds (default: 5000)

        Returns:
            Response dictionary from server

        Raises:
            TimeoutError: If the server does not respond within timeout_ms.
        """
        from openhcs.runtime.zmq_base import get_zmq_transport_url
        # Fresh REQ socket per request: REQ sockets enforce strict
        # send/recv alternation, so a timed-out socket cannot be reused.
        ctx = zmq.Context()
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)
        sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # Set receive timeout
        control_url = get_zmq_transport_url(self.control_port, self.transport_mode, self.host)
        sock.connect(control_url)
        try:
            sock.send(pickle.dumps(request))
            response = sock.recv()
            # SECURITY: pickle.loads on network data — trusted servers only.
            return pickle.loads(response)
        except zmq.Again:
            # Timeout - server didn't respond
            raise TimeoutError(f"Server did not respond to {request.get('type')} request within {timeout_ms}ms")
        finally:
            sock.close()
            ctx.term()

    def _spawn_server_process(self):
        """Launch a detached execution-server process, logging to a unique file.

        Returns:
            subprocess.Popen handle for the spawned server.
        """
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        # Microsecond timestamp keeps log filenames unique across rapid spawns.
        log_file_path = log_dir / f"openhcs_zmq_server_port_{self.port}_{int(time.time() * 1000000)}.log"
        cmd = [sys.executable, '-m', 'openhcs.runtime.zmq_execution_server_launcher', '--port', str(self.port)]
        if self.persistent:
            cmd.append('--persistent')
        cmd.extend(['--log-file-path', str(log_file_path)])
        cmd.extend(['--transport-mode', self.transport_mode.value])
        # Close the parent's copy of the log handle after Popen duplicates it,
        # fixing a file-descriptor leak in the original implementation.
        log_file = open(log_file_path, 'w')
        try:
            return subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT,
                                    start_new_session=self.persistent)
        finally:
            log_file.close()

    def disconnect(self):
        """Stop the progress listener, then tear down base-class sockets."""
        self._stop_progress_listener()
        super().disconnect()

    def send_data(self, data):
        """No-op: this client only receives on the data socket."""
        pass

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()