Coverage for openhcs/runtime/fiji_stream_visualizer.py: 33.3%

283 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Fiji stream visualizer for OpenHCS. 

3 

4Manages Fiji viewer instances for real-time visualization via ZMQ. 

5Uses FijiViewerServer (inherits from ZMQServer) for PyImageJ-based display. 

6Follows same architecture as NapariStreamVisualizer. 

7""" 

8 

9import logging 

10import multiprocessing 

11import subprocess 

12import threading 

13import time 

14from typing import Optional 

15from pathlib import Path 

16 

17from openhcs.io.filemanager import FileManager 

18from openhcs.core.config import TransportMode, FijiStreamingConfig 

19 

logger = logging.getLogger(__name__)

# Global process management for Fiji viewer.
# NOTE(review): FijiStreamVisualizer.start_viewer() stores the handle returned
# by _spawn_detached_fiji_process() here, and that function returns a
# subprocess.Popen - so the multiprocessing.Process annotation below is
# narrower than what this global holds in practice.
_global_fiji_process: Optional[multiprocessing.Process] = None
# Held around every read/write of _global_fiji_process in this module.
_global_fiji_lock = threading.Lock()

25 

26 

def _cleanup_global_fiji_viewer() -> None:
    """Terminate the globally tracked Fiji viewer process (test-mode cleanup).

    The global is annotated as ``multiprocessing.Process`` but
    ``FijiStreamVisualizer.start_viewer`` stores the ``subprocess.Popen``
    returned by ``_spawn_detached_fiji_process`` in it.  ``Popen`` has no
    ``is_alive()``/``join()``, so both handle flavours are supported here
    (duck-typed on ``is_alive``, the same way ``stop_viewer`` does it).

    The process is asked to terminate and given 3 seconds to exit; if it is
    still running it is killed and given 1 more second.  The global reference
    is cleared whenever one was set, including when the process had already
    exited on its own.
    """
    global _global_fiji_process

    with _global_fiji_lock:
        proc = _global_fiji_process
        if proc is None:
            return

        is_mp_process = hasattr(proc, "is_alive")

        def _alive() -> bool:
            # multiprocessing.Process exposes is_alive(); Popen exposes poll().
            return proc.is_alive() if is_mp_process else proc.poll() is None

        def _wait(seconds: float) -> None:
            # Block up to `seconds` for exit; never raise on timeout.
            if is_mp_process:
                proc.join(timeout=seconds)
                return
            try:
                proc.wait(timeout=seconds)
            except subprocess.TimeoutExpired:
                pass

        if _alive():
            logger.info("🔬 FIJI VISUALIZER: Terminating Fiji viewer for test cleanup")
            proc.terminate()
            _wait(3)

            if _alive():
                logger.warning("🔬 FIJI VISUALIZER: Force killing Fiji viewer process")
                proc.kill()
                _wait(1)

        _global_fiji_process = None

43 

44 

def _spawn_detached_fiji_process(
    port: int,
    viewer_title: str,
    display_config,
    transport_mode: TransportMode = TransportMode.IPC,
) -> subprocess.Popen:
    """
    Spawn a completely detached Fiji viewer process that survives parent termination.

    A fresh interpreter (``sys.executable -c ...``) is launched with a small
    generated launcher script that imports and calls
    ``openhcs.runtime.fiji_viewer_server._fiji_viewer_server_process``.  The
    child is detached from the parent (new session on Unix, new process group
    plus ``DETACHED_PROCESS`` on Windows) and its stdout/stderr are redirected
    to ``~/.local/share/openhcs/logs/fiji_detached_port_<port>.log``.

    Args:
        port: ZMQ port to listen on
        viewer_title: Title for the Fiji viewer window
        display_config: Display configuration.  Currently not forwarded: the
            generated launcher passes ``None`` in its place.
        transport_mode: ZMQ transport mode (IPC or TCP)

    Returns:
        The ``subprocess.Popen`` handle of the detached child.

    Raises:
        Exception: Any error while preparing the log file or spawning the
            child is logged and re-raised unchanged.
    """
    import os
    import sys

    current_dir = os.getcwd()

    try:
        # Create log file location for the detached process
        log_dir = os.path.expanduser("~/.local/share/openhcs/logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"fiji_detached_port_{port}.log")

        # Launcher executed by the child interpreter.  Values are embedded via
        # repr() so paths/titles containing quotes or backslashes stay valid.
        # The template must start at column 0 because it is passed to `-c`.
        python_code = f"""
import sys
import os

# Detach from parent process group (Unix only)
if hasattr(os, "setsid"):
    try:
        os.setsid()
    except OSError:
        pass

# Add current working directory to Python path
sys.path.insert(0, {current_dir!r})

try:
    from openhcs.runtime.fiji_viewer_server import _fiji_viewer_server_process
    from openhcs.core.config import TransportMode
    transport_mode = TransportMode.{transport_mode.name}
    _fiji_viewer_server_process({port}, {viewer_title!r}, None, {log_file!r}, transport_mode)
except Exception as e:
    import logging
    logger = logging.getLogger("openhcs.runtime.fiji_detached")
    logger.error(f"Detached Fiji error: {{e}}")
    import traceback
    logger.error(traceback.format_exc())
    sys.exit(1)
"""

        env = os.environ.copy()
        detach_kwargs = {}
        if sys.platform == "win32":
            detach_kwargs["creationflags"] = (
                subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS
            )
        else:
            # Default Qt to the xcb platform plugin unless the environment
            # already selects one; QT_X11_NO_MITSHM is always forced to "1".
            if "QT_QPA_PLATFORM" not in env:
                env["QT_QPA_PLATFORM"] = "xcb"
            env["QT_X11_NO_MITSHM"] = "1"
            # CRITICAL: new session detaches the child from the parent
            detach_kwargs["start_new_session"] = True

        # The child inherits its own copy of the log descriptor, so the
        # parent's handle is closed as soon as Popen returns (the Unix path
        # previously left it open for the lifetime of the parent).
        with open(log_file, "w") as log_f:
            process = subprocess.Popen(
                [sys.executable, "-c", python_code],
                env=env,
                cwd=current_dir,
                stdout=log_f,
                stderr=subprocess.STDOUT,
                **detach_kwargs,
            )

        logger.info(
            f"🔬 FIJI VISUALIZER: Detached Fiji process started (PID: {process.pid}), logging to {log_file}"
        )
        return process

    except Exception as e:
        logger.error(f"🔬 FIJI VISUALIZER: Failed to spawn detached Fiji process: {e}")
        raise

146 

147 

class FijiStreamVisualizer:
    """
    Manages Fiji viewer instance for real-time visualization via ZMQ.

    Uses FijiViewerServer (inherits from ZMQServer) for PyImageJ-based display.
    Follows same architecture as NapariStreamVisualizer.

    The viewer runs in a detached subprocess.  This class either spawns that
    subprocess or attaches to a viewer that already answers on the configured
    port, and talks to it over a REQ socket on ``port + CONTROL_PORT_OFFSET``.
    """

    def __init__(
        self,
        filemanager: FileManager,
        visualizer_config,
        viewer_title: str = "OpenHCS Fiji Visualization",
        persistent: bool = True,
        port: Optional[int] = None,
        display_config=None,
        transport_mode: TransportMode = TransportMode.IPC,
    ):
        """
        Args:
            filemanager: File manager instance (stored, not used in this class).
            visualizer_config: Visualizer configuration (stored as-is).
            viewer_title: Title for the Fiji viewer window.
            persistent: If True, ``stop_viewer`` leaves the viewer running.
            port: ZMQ data port; defaults to ``FijiStreamingConfig``'s ``port``
                field default when ``None``.
            display_config: Display configuration handed to the spawn helper.
            transport_mode: ZMQ transport mode (IPC or TCP).
        """
        self.filemanager = filemanager
        self.viewer_title = viewer_title
        self.persistent = persistent
        self.visualizer_config = visualizer_config
        # Use config class default if not specified
        self.port = (
            port
            if port is not None
            else FijiStreamingConfig.__dataclass_fields__["port"].default
        )
        self.display_config = display_config
        self.transport_mode = transport_mode  # ZMQ transport mode (IPC or TCP)
        # Handle of the viewer we spawned.  _spawn_detached_fiji_process returns
        # a subprocess.Popen; multiprocessing.Process-like handles (anything
        # with is_alive()) are still accepted by the liveness helpers.
        self.process: Optional[subprocess.Popen] = None
        self._is_running = False
        self._connected_to_existing = False
        # Read by _cleanup_zmq(); must exist even though nothing in this class
        # assigns a client, otherwise _cleanup_zmq() raises AttributeError.
        self._zmq_client = None
        self._lock = threading.Lock()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _process_alive(process) -> bool:
        """Return True if *process* is still running.

        Works for ``multiprocessing.Process`` (``is_alive``) and
        ``subprocess.Popen`` (``poll``) handles.
        """
        if hasattr(process, "is_alive"):
            return process.is_alive()
        return process.poll() is None

    @staticmethod
    def _terminate_process(process) -> None:
        """Terminate *process* (5s grace), then kill it (2s grace) if needed."""
        if hasattr(process, "is_alive"):
            # multiprocessing.Process
            if not process.is_alive():
                return
            process.terminate()
            process.join(timeout=5)
            if process.is_alive():
                logger.warning("🔬 FIJI VISUALIZER: Force killing Fiji viewer")
                process.kill()
                process.join(timeout=2)
            return

        # subprocess.Popen
        if process.poll() is not None:
            return
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logger.warning("🔬 FIJI VISUALIZER: Force killing Fiji viewer")
            process.kill()
            try:
                # Reap the child so it does not linger as a zombie.
                process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                logger.debug("🔬 FIJI VISUALIZER: Fiji viewer did not exit after kill")

    def _control_request(self, message: dict, timeout_ms: int):
        """Send one pickled request to the control port and return the reply.

        The socket and context are always closed, including on timeout, so
        repeated polling does not accumulate ZMQ contexts.

        Args:
            message: Picklable request payload.
            timeout_ms: Receive timeout in milliseconds.

        Returns:
            The unpickled reply object.

        Raises:
            zmq.Again: No reply arrived within ``timeout_ms``.
            Exception: Any other transport or unpickling error.
        """
        import pickle
        import zmq
        from openhcs.runtime.zmq_base import get_zmq_transport_url
        from openhcs.constants.constants import CONTROL_PORT_OFFSET

        control_url = get_zmq_transport_url(
            self.port + CONTROL_PORT_OFFSET, self.transport_mode, "localhost"
        )
        ctx = zmq.Context()
        try:
            sock = ctx.socket(zmq.REQ)
            try:
                # LINGER=0 so close()/term() never block on an unsent request.
                sock.setsockopt(zmq.LINGER, 0)
                sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
                sock.connect(control_url)
                sock.send(pickle.dumps(message))
                # NOTE(security): pickle.loads executes arbitrary code from the
                # peer.  Only acceptable while the control endpoint is a
                # trusted local viewer.
                return pickle.loads(sock.recv())
            finally:
                sock.close()
        finally:
            ctx.term()

    def _await_ready_and_flag(self) -> None:
        """Wait (up to 10s) for the server, then set the running flag and log."""
        if self._wait_for_server_ready(timeout=10.0):
            self._is_running = True
            logger.info(
                f"🔬 FIJI VISUALIZER: Fiji viewer server ready (PID: {getattr(self.process, 'pid', 'unknown')})"
            )
        else:
            logger.error(
                "🔬 FIJI VISUALIZER: Fiji viewer server failed to become ready"
            )

    # ------------------------------------------------------------------
    # State
    # ------------------------------------------------------------------

    @property
    def is_running(self) -> bool:
        """
        Check if the Fiji viewer is actually running.

        This property checks the actual state, not just a cached flag: for a
        viewer we attached to, it pings the control port; for a viewer we
        spawned, it checks that the process is alive.  The cached flag is
        cleared when the check fails.
        """
        if not self._is_running:
            return False

        # If we connected to an existing viewer, verify it's still responsive
        if self._connected_to_existing:
            if self._quick_ping_check():
                return True
            logger.debug(
                f"🔬 FIJI VISUALIZER: Connected viewer on port {self.port} is no longer responsive"
            )
            self._is_running = False
            self._connected_to_existing = False
            return False

        if self.process is None:
            self._is_running = False
            return False

        # Check if process is actually alive
        try:
            alive = self._process_alive(self.process)
        except Exception as e:
            logger.warning(f"🔬 FIJI VISUALIZER: Error checking process status: {e}")
            self._is_running = False
            return False

        if not alive:
            logger.debug(
                f"🔬 FIJI VISUALIZER: Fiji process on port {self.port} is no longer alive"
            )
            self._is_running = False
        return alive

    def _quick_ping_check(self) -> bool:
        """Quick ping check to verify viewer is responsive (for connected viewers)."""
        try:
            # 200ms timeout for quick check
            response = self._control_request({"type": "ping"}, 200)
            return response.get("type") == "pong"
        except Exception:
            return False

    def wait_for_ready(self, timeout: float = 10.0) -> bool:
        """
        Wait for the viewer to be ready to receive images.

        This method blocks until the viewer is responsive or the timeout expires.
        Should be called after start_viewer() when using async_mode=True.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if viewer is ready, False if timeout
        """
        return self._wait_for_server_ready(timeout=timeout)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start_viewer(self, async_mode: bool = False) -> None:
        """Start Fiji viewer server process.

        If something already occupies the configured port, a responsive viewer
        is reused; an unresponsive one is killed and replaced.

        Args:
            async_mode: If True, readiness is awaited in a daemon thread and
                this method returns immediately after spawning; otherwise it
                blocks (up to 10s) until the server answers a ping.
        """
        global _global_fiji_process

        with self._lock:
            # Check if there's already a viewer running on the configured port
            if self._is_port_in_use(self.port):
                # Try to connect to existing viewer first
                logger.info(
                    f"🔬 FIJI VISUALIZER: Port {self.port} is in use, attempting to connect to existing viewer..."
                )
                if self._try_connect_to_existing_viewer():
                    logger.info(
                        f"🔬 FIJI VISUALIZER: Successfully connected to existing viewer on port {self.port}"
                    )
                    self._is_running = True
                    self._connected_to_existing = True
                    return

                # Existing viewer is unresponsive - kill it and start fresh
                logger.info(
                    f"🔬 FIJI VISUALIZER: Existing viewer on port {self.port} is unresponsive, killing and restarting..."
                )
                from openhcs.runtime.zmq_base import ZMQServer
                from openhcs.constants.constants import CONTROL_PORT_OFFSET

                ZMQServer.kill_processes_on_port(self.port)
                # Control port: same offset constant used for every ping and
                # control message in this class (was a hard-coded + 1000).
                ZMQServer.kill_processes_on_port(self.port + CONTROL_PORT_OFFSET)
                # Whatever we believed was running has just been killed; a
                # stale flag would make the check below skip the restart.
                self._is_running = False
                self._connected_to_existing = False
                time.sleep(0.5)

            if self._is_running:
                logger.warning("Fiji viewer is already running.")
                return

            logger.info(
                f"🔬 FIJI VISUALIZER: Starting Fiji viewer server on port {self.port} (persistent={self.persistent})"
            )

            # ALL viewers (persistent and non-persistent) should be detached subprocess
            # so they don't block parent process exit. The difference is only whether
            # we terminate them during cleanup.
            logger.info(
                f"🔬 FIJI VISUALIZER: Creating {'persistent' if self.persistent else 'non-persistent'} Fiji viewer (detached)"
            )
            self.process = _spawn_detached_fiji_process(
                self.port, self.viewer_title, self.display_config, self.transport_mode
            )

            # Only track non-persistent viewers in global variable for test cleanup
            if not self.persistent:
                with _global_fiji_lock:
                    _global_fiji_process = self.process

            # Wait for server to be ready before setting is_running flag
            # This ensures the viewer is actually ready to receive messages
            if async_mode:
                # For async mode, wait in background thread
                threading.Thread(
                    target=self._await_ready_and_flag, daemon=True
                ).start()
            else:
                # For sync mode, wait immediately
                self._await_ready_and_flag()

    def _is_port_in_use(self, port: int) -> bool:
        """Check if a port/socket is in use (handles both IPC and TCP modes)."""
        from openhcs.runtime.zmq_base import _get_ipc_socket_path

        if self.transport_mode == TransportMode.IPC:
            # IPC mode - check if socket file exists
            socket_path = _get_ipc_socket_path(port)
            return socket_path.exists() if socket_path else False

        # TCP mode - the port counts as in use if we cannot bind it ourselves
        import socket

        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.settimeout(0.1)
            probe.bind(("localhost", port))
            return False
        except OSError:
            return True
        finally:
            probe.close()

    def _try_connect_to_existing_viewer(self) -> bool:
        """Try to connect to an existing Fiji viewer and verify it's responsive.

        Returns:
            True only if the viewer answers a ping with type ``"pong"`` and a
            truthy ``"ready"`` field within 500ms.
        """
        try:
            response = self._control_request({"type": "ping"}, 500)
            return response.get("type") == "pong" and bool(response.get("ready"))
        except Exception:
            return False

    def _wait_for_server_ready(self, timeout: float = 10.0) -> bool:
        """Wait for Fiji server to be ready via ping/pong.

        Pings the control port every 0.2s (500ms receive timeout per attempt)
        until a ``"pong"`` arrives or ``timeout`` seconds have elapsed.

        Returns:
            True if the server answered, False on timeout.
        """
        logger.info(
            f"🔬 FIJI VISUALIZER: Waiting for server on port {self.port} to be ready..."
        )

        # monotonic clock: immune to wall-clock adjustments during the wait
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                response = self._control_request({"type": "ping"}, 500)
                if response.get("type") == "pong":
                    logger.info(f"🔬 FIJI VISUALIZER: Server ready on port {self.port}")
                    return True
            except Exception as e:
                logger.debug(f"🔬 FIJI VISUALIZER: Ping failed: {e}")

            time.sleep(0.2)

        logger.warning(
            f"🔬 FIJI VISUALIZER: Timeout waiting for server on port {self.port}"
        )
        return False

    # ------------------------------------------------------------------
    # Control channel
    # ------------------------------------------------------------------

    def send_control_message(self, message_type: str, timeout: float = 2.0) -> bool:
        """
        Send a control message to the Fiji viewer.

        Args:
            message_type: Type of control message ('clear_state', 'shutdown', etc.)
            timeout: Timeout in seconds for waiting for response

        Returns:
            True if message was sent and acknowledged, False otherwise
        """
        if not self.is_running or self.port is None:
            logger.warning(
                f"🔬 FIJI VISUALIZER: Cannot send {message_type} - viewer not running"
            )
            return False

        import zmq

        try:
            response_data = self._control_request(
                {"type": message_type}, int(timeout * 1000)
            )
            if response_data.get("status") == "success":
                logger.info(
                    f"🔬 FIJI VISUALIZER: {message_type} acknowledged by viewer"
                )
                return True

            logger.warning(
                f"🔬 FIJI VISUALIZER: {message_type} failed: {response_data}"
            )
            return False
        except zmq.Again:
            logger.warning(
                f"🔬 FIJI VISUALIZER: Timeout waiting for {message_type} acknowledgment"
            )
            return False
        except Exception as e:
            logger.warning(f"🔬 FIJI VISUALIZER: Failed to send {message_type}: {e}")
            return False

    def clear_viewer_state(self) -> bool:
        """
        Clear accumulated viewer state (dimension values, hyperstack metadata) for a new pipeline run.

        Returns:
            True if state was cleared successfully, False otherwise
        """
        return self.send_control_message("clear_state")

    def stop_viewer(self) -> None:
        """Stop Fiji viewer server (only if not persistent)."""
        global _global_fiji_process

        with self._lock:
            if self.persistent:
                logger.info("🔬 FIJI VISUALIZER: Keeping persistent Fiji viewer alive")
                # DON'T set _is_running = False for persistent viewers!
                # The process is still alive and should be reusable
                return

            logger.info("🔬 FIJI VISUALIZER: Stopping non-persistent Fiji viewer")

            if self.process:
                # Handles both subprocess and multiprocessing process types
                self._terminate_process(self.process)

            # Clear global reference
            with _global_fiji_lock:
                if _global_fiji_process is self.process:
                    _global_fiji_process = None

            self._is_running = False

    def _cleanup_zmq(self) -> None:
        """Clean up ZMQ client connection (for persistent viewers)."""
        # getattr: tolerate instances created without running __init__.
        client = getattr(self, "_zmq_client", None)
        if client:
            try:
                client.close()
            except Exception as e:
                logger.warning(f"🔬 FIJI VISUALIZER: Failed to cleanup ZMQ client: {e}")
            self._zmq_client = None

    def is_viewer_running(self) -> bool:
        """Check if Fiji viewer process is running.

        Returns True only when ``is_running`` holds AND this instance owns a
        live process handle (so a viewer we merely attached to reports False,
        as before).  Works for ``subprocess.Popen`` handles, which have no
        ``is_alive()``.
        """
        return (
            self.is_running
            and self.process is not None
            and self._process_alive(self.process)
        )