Coverage for openhcs/runtime/zmq_execution_client.py: 53.3%

185 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1"""ZMQ Execution Client - location-transparent pipeline execution.""" 

2 

3import logging 

4import subprocess 

5import sys 

6import time 

7import threading 

8import json 

9import zmq 

10import pickle 

11from pathlib import Path 

12from openhcs.runtime.zmq_base import ZMQClient 

13from openhcs.core.config import TransportMode 

14from openhcs.constants.constants import DEFAULT_EXECUTION_SERVER_PORT 

15 

16logger = logging.getLogger(__name__) 

17 

18 

class ZMQExecutionClient(ZMQClient):
    """ZMQ client for OpenHCS pipeline execution with progress streaming.

    Control requests (execute/status/cancel/ping) go over a REQ socket to the
    server's control port; progress events arrive on ``self.data_socket``
    (owned by the ZMQClient base) and are dispatched to ``progress_callback``
    from a daemon listener thread.
    """

    def __init__(self, port=DEFAULT_EXECUTION_SERVER_PORT, host='localhost',
                 persistent=True, progress_callback=None, transport_mode=None):
        """Initialize the client.

        Args:
            port: Execution server port.
            host: Server hostname (default: localhost).
            persistent: Whether a spawned server should outlive this client.
            progress_callback: Optional callable invoked with each progress
                message dict (``{'type': 'progress', ...}``).
            transport_mode: Optional TransportMode; forwarded to ZMQClient.
        """
        super().__init__(port, host, persistent, transport_mode)
        self.progress_callback = progress_callback
        self._progress_thread = None
        self._progress_stop_event = threading.Event()

    def _start_progress_listener(self):
        """Start the background progress thread (idempotent, no-op without a callback)."""
        if self._progress_thread and self._progress_thread.is_alive():
            return
        if not self.progress_callback:
            return
        self._progress_stop_event.clear()
        self._progress_thread = threading.Thread(
            target=self._progress_listener_loop, daemon=True)
        self._progress_thread.start()

    def _stop_progress_listener(self):
        """Signal the progress thread to stop and join it briefly."""
        if not self._progress_thread:
            return
        self._progress_stop_event.set()
        if self._progress_thread.is_alive():
            self._progress_thread.join(timeout=2)
        self._progress_thread = None

    def _progress_listener_loop(self):
        """Poll the data socket for JSON progress messages until stopped.

        Best-effort by design: malformed messages and callback errors are
        logged at debug level and skipped so the listener keeps running.
        """
        try:
            while not self._progress_stop_event.is_set():
                if not self.data_socket:
                    time.sleep(0.1)
                    continue
                try:
                    message = self.data_socket.recv_string(zmq.NOBLOCK)
                    try:
                        data = json.loads(message)
                        if self.progress_callback and data.get('type') == 'progress':
                            try:
                                self.progress_callback(data)
                            except Exception:
                                # Callback bugs must not kill the listener.
                                logger.debug("Progress callback raised", exc_info=True)
                    except json.JSONDecodeError:
                        pass  # Non-JSON frame; ignore.
                except zmq.Again:
                    # No message pending; brief backoff before polling again.
                    time.sleep(0.05)
                except Exception:
                    logger.debug("Progress socket error", exc_info=True)
                    time.sleep(0.1)
        except Exception:
            logger.debug("Progress listener loop exited abnormally", exc_info=True)

    def submit_pipeline(self, plate_id, pipeline_steps, global_config,
                        pipeline_config=None, config_params=None):
        """
        Submit pipeline for execution and return immediately (non-blocking).

        Use this for UI applications where you want to submit and return immediately,
        then monitor progress via progress_callback or poll status manually.

        Args:
            plate_id: Plate identifier
            pipeline_steps: List of pipeline steps
            global_config: GlobalPipelineConfig instance
            pipeline_config: Optional PipelineConfig instance
            config_params: Optional config parameters dict

        Returns:
            dict with 'status' and 'execution_id' if accepted, or error info

        Raises:
            RuntimeError: If the client cannot connect to the execution server.
        """
        if not self._connected and not self.connect():
            raise RuntimeError("Failed to connect to execution server")
        if self.progress_callback:
            self._start_progress_listener()
        # Local imports avoid a heavy/circular dependency at module import time.
        from openhcs.debug.pickle_to_python import (
            generate_complete_pipeline_steps_code, generate_config_code)
        from openhcs.core.config import GlobalPipelineConfig, PipelineConfig
        logger.info("🔌 CLIENT: Generating pipeline code...")
        pipeline_code = generate_complete_pipeline_steps_code(pipeline_steps, clean_mode=True)
        request = {'type': 'execute', 'plate_id': str(plate_id), 'pipeline_code': pipeline_code}
        if config_params:
            # Caller supplied raw parameters; server builds the config itself.
            request['config_params'] = config_params
        else:
            request['config_code'] = generate_config_code(
                global_config, GlobalPipelineConfig, clean_mode=True)
            if pipeline_config:
                request['pipeline_config_code'] = generate_config_code(
                    pipeline_config, PipelineConfig, clean_mode=True)
        logger.info("🔌 CLIENT: Sending execute request...")
        response = self._send_control_request(request)
        logger.info(f"🔌 CLIENT: Got response: {response.get('status')}")
        return response

    def wait_for_completion(self, execution_id, poll_interval=0.5, max_consecutive_errors=5):
        """
        Block and wait for execution to complete (blocking).

        Use this for CLI/script applications where you want to wait for results.
        For UI applications, use submit_pipeline() and monitor via progress_callback.

        Args:
            execution_id: Execution ID from submit_pipeline()
            poll_interval: Seconds between status polls (default: 0.5)
            max_consecutive_errors: Max consecutive errors before giving up (default: 5)

        Returns:
            dict with 'status' ('complete', 'error', 'cancelled') and results/error info
        """
        logger.info(f"🔌 CLIENT: Waiting for execution {execution_id} to complete...")
        consecutive_errors = 0
        poll_count = 0

        while True:
            time.sleep(poll_interval)
            poll_count += 1

            try:
                logger.info(f"🔌 CLIENT: Status poll #{poll_count} for execution {execution_id}")
                status_response = self.get_status(execution_id)
                logger.info(f"🔌 CLIENT: Status response: {status_response}")
                consecutive_errors = 0  # Reset error counter on success

                if status_response.get('status') == 'ok':
                    execution = status_response.get('execution', {})
                    exec_status = execution.get('status')
                    logger.info(f"🔌 CLIENT: Execution status: {exec_status}")
                    if exec_status == 'complete':
                        logger.info("🔌 CLIENT: Execution complete, returning results")
                        return {'status': 'complete', 'execution_id': execution_id,
                                'results': execution.get('results_summary', {})}
                    elif exec_status == 'failed':
                        logger.info("🔌 CLIENT: Execution failed")
                        return {'status': 'error', 'execution_id': execution_id,
                                'message': execution.get('error')}
                    elif exec_status == 'cancelled':
                        logger.info("🔌 CLIENT: Execution cancelled")
                        return {'status': 'cancelled', 'execution_id': execution_id,
                                'message': 'Execution was cancelled'}
                    # Any other status (e.g. 'running') falls through and polls again.
                elif status_response.get('status') == 'error':
                    # Server returned error status
                    error_msg = status_response.get('message', 'Unknown error')
                    logger.warning(f"Status check returned error: {error_msg}")
                    return {'status': 'error', 'execution_id': execution_id, 'message': error_msg}

            except Exception as e:
                # Handle ZMQ errors, connection issues, etc.
                consecutive_errors += 1
                logger.warning(
                    f"Error checking execution status (attempt {consecutive_errors}/{max_consecutive_errors}): {e}")

                if consecutive_errors >= max_consecutive_errors:
                    # Too many consecutive errors - assume execution failed or was cancelled
                    logger.error(
                        f"Failed to get execution status after {max_consecutive_errors} attempts, assuming execution was cancelled")
                    return {'status': 'cancelled', 'execution_id': execution_id,
                            'message': 'Lost connection to server (workers may have been killed)'}

                # Wait a bit longer before retrying after error
                time.sleep(1.0)

    def execute_pipeline(self, plate_id, pipeline_steps, global_config,
                         pipeline_config=None, config_params=None):
        """
        Submit pipeline and wait for completion (blocking - for backward compatibility).

        This is the legacy blocking API. For UI applications, use submit_pipeline() instead.

        Args:
            plate_id: Plate identifier
            pipeline_steps: List of pipeline steps
            global_config: GlobalPipelineConfig instance
            pipeline_config: Optional PipelineConfig instance
            config_params: Optional config parameters dict

        Returns:
            dict with execution results (blocks until complete)
        """
        response = self.submit_pipeline(plate_id, pipeline_steps, global_config,
                                        pipeline_config, config_params)
        if response.get('status') == 'accepted':
            execution_id = response.get('execution_id')
            return self.wait_for_completion(execution_id)
        return response

    def get_status(self, execution_id=None):
        """Query server status; scoped to one execution when execution_id is given."""
        request = {'type': 'status'}
        if execution_id:
            request['execution_id'] = execution_id
        return self._send_control_request(request)

    def cancel_execution(self, execution_id):
        """Request cancellation of a running execution; returns the server's response dict."""
        return self._send_control_request({'type': 'cancel', 'execution_id': execution_id})

    def ping(self):
        """Return True if the server answers a ping and reports itself ready."""
        try:
            pong = self.get_server_info()
            return pong.get('type') == 'pong' and pong.get('ready')
        except Exception:
            return False

    def get_server_info(self):
        """Ping the server and return its info dict, or an error dict on failure.

        Routed through _send_control_request so the configured transport mode
        is honored (a previous version hard-coded a tcp:// URL, bypassing
        TransportMode and diverging from every other control request).
        """
        try:
            if not self._connected and not self.connect():
                return {'status': 'error', 'message': 'Not connected'}
            return self._send_control_request({'type': 'ping'}, timeout_ms=1000)
        except Exception:
            return {'status': 'error', 'message': 'Failed'}

    def _send_control_request(self, request, timeout_ms=5000):
        """
        Send a control request to the server.

        Args:
            request: Request dictionary to send
            timeout_ms: Timeout in milliseconds (default: 5000)

        Returns:
            Response dictionary from server

        Raises:
            TimeoutError: If the server does not respond within timeout_ms.
        """
        from openhcs.runtime.zmq_base import get_zmq_transport_url
        # A throwaway REQ socket per request keeps the strict REQ/REP
        # send-recv lockstep from wedging after a timeout.
        ctx = zmq.Context()
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)
        sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # Set receive timeout
        control_url = get_zmq_transport_url(self.control_port, self.transport_mode, self.host)
        sock.connect(control_url)
        try:
            # NOTE: pickle over the wire — both endpoints are trusted OpenHCS
            # processes; never point this client at an untrusted server.
            sock.send(pickle.dumps(request))
            response = sock.recv()
            return pickle.loads(response)
        except zmq.Again:
            # Timeout - server didn't respond
            raise TimeoutError(
                f"Server did not respond to {request.get('type')} request within {timeout_ms}ms")
        finally:
            sock.close()
            ctx.term()

    def _spawn_server_process(self):
        """Launch the execution server as a detached subprocess, logging to a unique file.

        Returns:
            subprocess.Popen: Handle to the spawned server process.
        """
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        # Microsecond timestamp keeps concurrent spawns from colliding.
        log_file_path = log_dir / f"openhcs_zmq_server_port_{self.port}_{int(time.time() * 1000000)}.log"
        cmd = [sys.executable, '-m', 'openhcs.runtime.zmq_execution_server_launcher',
               '--port', str(self.port)]
        if self.persistent:
            cmd.append('--persistent')
        cmd.extend(['--log-file-path', str(log_file_path)])
        cmd.extend(['--transport-mode', self.transport_mode.value])
        # Close the parent's copy of the log handle once the child has
        # inherited it (the old code leaked one fd per spawn).
        with open(log_file_path, 'w') as log_file:
            return subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT,
                                    start_new_session=self.persistent)

    def disconnect(self):
        """Stop the progress listener, then tear down base-class sockets."""
        self._stop_progress_listener()
        super().disconnect()

    def send_data(self, data):
        """No-op: the execution client only receives on the data channel."""
        pass

    def __enter__(self):
        """Context-manager entry: connect and return self."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: always disconnect."""
        self.disconnect()