Coverage for openhcs/runtime/fiji_stream_visualizer.py: 33.3%
283 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Fiji stream visualizer for OpenHCS.
4Manages Fiji viewer instances for real-time visualization via ZMQ.
5Uses FijiViewerServer (inherits from ZMQServer) for PyImageJ-based display.
6Follows same architecture as NapariStreamVisualizer.
7"""
9import logging
10import multiprocessing
11import subprocess
12import threading
13import time
14from typing import Optional
15from pathlib import Path
17from openhcs.io.filemanager import FileManager
18from openhcs.core.config import TransportMode, FijiStreamingConfig
logger = logging.getLogger(__name__)

# Global process management for Fiji viewer.
# The handle stored here is whatever FijiStreamVisualizer.start_viewer() spawned
# for a non-persistent viewer; that is the subprocess.Popen returned by
# _spawn_detached_fiji_process(), so the annotation reflects Popen rather than
# multiprocessing.Process.
_global_fiji_process: Optional[subprocess.Popen] = None
# Guards every read/write of _global_fiji_process.
_global_fiji_lock = threading.Lock()
def _cleanup_global_fiji_viewer() -> None:
    """Clean up global Fiji viewer process for test mode.

    Terminates the process referenced by ``_global_fiji_process`` (if it is
    still alive), escalating to ``kill()`` when it does not exit within the
    grace period, and then clears the global reference.

    The global may hold either a ``multiprocessing.Process`` or a
    ``subprocess.Popen`` (the latter is what ``start_viewer`` stores), so both
    handle types are supported here: ``is_alive()``/``join()`` for the former,
    ``poll()``/``wait()`` for the latter.
    """
    global _global_fiji_process

    with _global_fiji_lock:
        proc = _global_fiji_process
        if proc is not None:
            # multiprocessing.Process exposes is_alive(); subprocess.Popen does not.
            is_mp_process = hasattr(proc, "is_alive")

            def _alive() -> bool:
                if is_mp_process:
                    return proc.is_alive()
                return proc.poll() is None

            def _wait(timeout: float) -> None:
                if is_mp_process:
                    proc.join(timeout=timeout)
                    return
                try:
                    proc.wait(timeout=timeout)
                except subprocess.TimeoutExpired:
                    # Still running; the caller re-checks liveness.
                    pass

            if _alive():
                logger.info("🔬 FIJI VISUALIZER: Terminating Fiji viewer for test cleanup")
                proc.terminate()
                _wait(3)

                if _alive():
                    logger.warning("🔬 FIJI VISUALIZER: Force killing Fiji viewer process")
                    proc.kill()
                    _wait(1)

        _global_fiji_process = None
def _spawn_detached_fiji_process(
    port: int,
    viewer_title: str,
    display_config,
    transport_mode: TransportMode = TransportMode.IPC,
) -> subprocess.Popen:
    """
    Spawn a completely detached Fiji viewer process that survives parent termination.

    This creates a subprocess that runs independently and won't be terminated when
    the parent process exits, enabling true persistence across pipeline runs.

    The child is a fresh interpreter (``sys.executable -c <code>``) that imports
    and calls ``_fiji_viewer_server_process``. Its stdout/stderr are redirected
    to ``~/.local/share/openhcs/logs/fiji_detached_port_<port>.log``.

    Args:
        port: ZMQ port to listen on
        viewer_title: Title for the Fiji viewer window
        display_config: Display configuration. Accepted for interface
            compatibility; it is not forwarded to the child, which is invoked
            with ``None`` in that position.
        transport_mode: ZMQ transport mode (IPC or TCP)

    Returns:
        The ``subprocess.Popen`` handle of the detached child.

    Raises:
        Exception: Any error while preparing the log file or spawning the
            child is logged and re-raised unchanged.
    """
    import os
    import sys

    current_dir = os.getcwd()

    try:
        # Create log file for detached process
        log_dir = os.path.expanduser("~/.local/share/openhcs/logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"fiji_detached_port_{port}.log")

        # Source of the child program. Values are embedded via repr() so that
        # paths/titles containing quotes or backslashes stay valid literals.
        python_code = f"""
import sys
import os

# Detach from parent process group (Unix only)
if hasattr(os, "setsid"):
    try:
        os.setsid()
    except OSError:
        pass

# Add current working directory to Python path
sys.path.insert(0, {repr(current_dir)})

try:
    from openhcs.runtime.fiji_viewer_server import _fiji_viewer_server_process
    from openhcs.core.config import TransportMode
    transport_mode = TransportMode.{transport_mode.name}
    _fiji_viewer_server_process({port}, {repr(viewer_title)}, None, {repr(log_file)}, transport_mode)
except Exception as e:
    import logging
    logger = logging.getLogger("openhcs.runtime.fiji_detached")
    logger.error(f"Detached Fiji error: {{e}}")
    import traceback
    logger.error(traceback.format_exc())
    sys.exit(1)
"""

        env = os.environ.copy()
        if sys.platform == "win32":
            # These constants only exist on Windows, so they must be
            # evaluated inside this branch.
            detach_kwargs = {
                "creationflags": subprocess.CREATE_NEW_PROCESS_GROUP
                | subprocess.DETACHED_PROCESS
            }
        else:
            # Ensure display environment is preserved
            env.setdefault("QT_QPA_PLATFORM", "xcb")
            env["QT_X11_NO_MITSHM"] = "1"
            # CRITICAL: start_new_session detaches the child from the parent
            detach_kwargs = {"start_new_session": True}

        # The child receives its own duplicate of the log file descriptor, so
        # the parent's handle is closed as soon as the spawn returns instead of
        # being leaked for the lifetime of the parent.
        with open(log_file, "w") as log_f:
            process = subprocess.Popen(
                [sys.executable, "-c", python_code],
                env=env,
                cwd=current_dir,
                stdout=log_f,
                stderr=subprocess.STDOUT,
                **detach_kwargs,
            )

        logger.info(
            f"🔬 FIJI VISUALIZER: Detached Fiji process started (PID: {process.pid}), logging to {log_file}"
        )
        return process

    except Exception as e:
        logger.error(f"🔬 FIJI VISUALIZER: Failed to spawn detached Fiji process: {e}")
        raise
class FijiStreamVisualizer:
    """
    Manages Fiji viewer instance for real-time visualization via ZMQ.

    Uses FijiViewerServer (inherits from ZMQServer) for PyImageJ-based display.
    Follows same architecture as NapariStreamVisualizer.

    The viewer runs in a detached subprocess. Liveness and control requests go
    over a ZMQ REQ socket connected to ``port + CONTROL_PORT_OFFSET``; messages
    on that channel are pickled dicts such as ``{"type": "ping"}``.
    """

    def __init__(
        self,
        filemanager: FileManager,
        visualizer_config,
        viewer_title: str = "OpenHCS Fiji Visualization",
        persistent: bool = True,
        port: Optional[int] = None,
        display_config=None,
        transport_mode: TransportMode = TransportMode.IPC,
    ):
        """
        Store configuration; no process is started until start_viewer().

        Args:
            filemanager: File manager instance (stored as-is).
            visualizer_config: Visualizer configuration (stored as-is).
            viewer_title: Title for the Fiji viewer window.
            persistent: When True, stop_viewer() leaves the viewer running.
            port: ZMQ port; defaults to the ``port`` field default of
                FijiStreamingConfig when None.
            display_config: Display configuration handed to the spawn helper.
            transport_mode: ZMQ transport mode (IPC or TCP).
        """
        self.filemanager = filemanager
        self.viewer_title = viewer_title
        self.persistent = persistent
        self.visualizer_config = visualizer_config
        # Use config class default if not specified
        self.port = (
            port
            if port is not None
            else FijiStreamingConfig.__dataclass_fields__["port"].default
        )
        self.display_config = display_config
        self.transport_mode = transport_mode  # ZMQ transport mode (IPC or TCP)
        # Handle of the viewer process we spawned (None when we only attached
        # to a viewer that was already running).
        self.process: Optional[subprocess.Popen] = None
        self._is_running = False
        self._connected_to_existing = False
        self._lock = threading.Lock()
        # Read by _cleanup_zmq(); initialised here so that method never hits
        # an AttributeError.
        self._zmq_client = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _process_alive(process) -> bool:
        """Return True if *process* is still running.

        Supports both multiprocessing.Process (``is_alive()``) and
        subprocess.Popen (``poll() is None``).
        """
        if hasattr(process, "is_alive"):
            return process.is_alive()
        return process.poll() is None

    def _control_request(self, message: dict, timeout_ms: int) -> dict:
        """Send one pickled request to the viewer's control socket and return the reply.

        The socket and context are always released, including when the receive
        times out, so repeated polling does not accumulate ZMQ contexts.

        Args:
            message: Request payload; pickled before sending.
            timeout_ms: Receive timeout in milliseconds.

        Returns:
            The unpickled reply.

        Raises:
            zmq.Again: No reply arrived within *timeout_ms*.
            Exception: Any other ZMQ or unpickling error.
        """
        import pickle
        import zmq
        from openhcs.runtime.zmq_base import get_zmq_transport_url
        from openhcs.constants.constants import CONTROL_PORT_OFFSET

        ctx = zmq.Context()
        sock = None
        try:
            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.LINGER, 0)  # never block on close
            sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
            control_url = get_zmq_transport_url(
                self.port + CONTROL_PORT_OFFSET, self.transport_mode, "localhost"
            )
            sock.connect(control_url)
            sock.send(pickle.dumps(message))
            # NOTE(review): pickle.loads executes arbitrary code if the peer is
            # hostile. The peer here is addressed via "localhost"; confirm the
            # control channel can never be reached by untrusted processes.
            return pickle.loads(sock.recv())
        finally:
            if sock is not None:
                try:
                    sock.close()
                except Exception as e:
                    logger.debug(f"Failed to close control socket: {e}")
            try:
                ctx.term()
            except Exception as e:
                logger.debug(f"Failed to terminate control context: {e}")

    # ------------------------------------------------------------------
    # State inspection
    # ------------------------------------------------------------------

    @property
    def is_running(self) -> bool:
        """
        Check if the Fiji viewer is actually running.

        This property checks the actual process state, not just a cached flag.
        For a viewer we attached to, a quick ping decides; for a viewer we
        spawned, the process handle is polled. The cached flags are reset
        whenever the viewer turns out to be gone.
        """
        if not self._is_running:
            return False

        # If we connected to an existing viewer, verify it's still responsive
        if self._connected_to_existing:
            if self._quick_ping_check():
                return True
            logger.debug(
                f"🔬 FIJI VISUALIZER: Connected viewer on port {self.port} is no longer responsive"
            )
            self._is_running = False
            self._connected_to_existing = False
            return False

        if self.process is None:
            self._is_running = False
            return False

        # Check if process is actually alive
        try:
            alive = self._process_alive(self.process)
        except Exception as e:
            logger.warning(f"🔬 FIJI VISUALIZER: Error checking process status: {e}")
            self._is_running = False
            return False

        if not alive:
            logger.debug(
                f"🔬 FIJI VISUALIZER: Fiji process on port {self.port} is no longer alive"
            )
            self._is_running = False
        return alive

    def _quick_ping_check(self) -> bool:
        """Quick ping check to verify viewer is responsive (for connected viewers).

        Uses a 200ms receive timeout. Any failure (timeout, connection error,
        malformed reply) yields False.
        """
        try:
            response = self._control_request({"type": "ping"}, 200)
            return response.get("type") == "pong"
        except Exception:
            return False

    def wait_for_ready(self, timeout: float = 10.0) -> bool:
        """
        Wait for the viewer to be ready to receive images.

        This method blocks until the viewer is responsive or the timeout expires.
        Should be called after start_viewer() when using async_mode=True.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if viewer is ready, False if timeout
        """
        return self._wait_for_server_ready(timeout=timeout)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start_viewer(self, async_mode: bool = False) -> None:
        """Start Fiji viewer server process.

        If something already occupies the configured port, first try to attach
        to it; if it does not answer as a ready viewer, kill whatever holds the
        data and control ports and spawn a fresh detached viewer.

        Args:
            async_mode: When True, readiness is awaited in a daemon thread and
                this method returns right after spawning; when False, it
                blocks (up to 10 seconds) until the server answers a ping.
        """
        global _global_fiji_process

        with self._lock:
            # Check if there's already a viewer running on the configured port
            if self._is_port_in_use(self.port):
                # Try to connect to existing viewer first
                logger.info(
                    f"🔬 FIJI VISUALIZER: Port {self.port} is in use, attempting to connect to existing viewer..."
                )
                if self._try_connect_to_existing_viewer():
                    logger.info(
                        f"🔬 FIJI VISUALIZER: Successfully connected to existing viewer on port {self.port}"
                    )
                    self._is_running = True
                    self._connected_to_existing = True
                    return

                # Existing viewer is unresponsive - kill it and start fresh
                logger.info(
                    f"🔬 FIJI VISUALIZER: Existing viewer on port {self.port} is unresponsive, killing and restarting..."
                )
                from openhcs.runtime.zmq_base import ZMQServer
                from openhcs.constants.constants import CONTROL_PORT_OFFSET

                ZMQServer.kill_processes_on_port(self.port)
                # Same control-port offset every other method in this class uses.
                ZMQServer.kill_processes_on_port(self.port + CONTROL_PORT_OFFSET)
                time.sleep(0.5)

            # Use the verified state rather than the cached flag: after the
            # kill above (or after the viewer died on its own) the flag can be
            # stale, which would wrongly skip the respawn.
            if self.is_running:
                logger.warning("Fiji viewer is already running.")
                return

            logger.info(
                f"🔬 FIJI VISUALIZER: Starting Fiji viewer server on port {self.port} (persistent={self.persistent})"
            )

            # ALL viewers (persistent and non-persistent) should be detached subprocess
            # so they don't block parent process exit. The difference is only whether
            # we terminate them during cleanup.
            logger.info(
                f"🔬 FIJI VISUALIZER: Creating {'persistent' if self.persistent else 'non-persistent'} Fiji viewer (detached)"
            )
            self.process = _spawn_detached_fiji_process(
                self.port, self.viewer_title, self.display_config, self.transport_mode
            )
            # A freshly spawned viewer is owned by us, not merely attached to.
            self._connected_to_existing = False

            # Only track non-persistent viewers in global variable for test cleanup
            if not self.persistent:
                with _global_fiji_lock:
                    _global_fiji_process = self.process

            # Wait for server to be ready before setting is_running flag
            # This ensures the viewer is actually ready to receive messages
            def wait_and_set_ready() -> None:
                if self._wait_for_server_ready(timeout=10.0):
                    self._is_running = True
                    logger.info(
                        f"🔬 FIJI VISUALIZER: Fiji viewer server ready (PID: {getattr(self.process, 'pid', 'unknown')})"
                    )
                else:
                    logger.error(
                        "🔬 FIJI VISUALIZER: Fiji viewer server failed to become ready"
                    )

            if async_mode:
                # For async mode, wait in background thread
                threading.Thread(target=wait_and_set_ready, daemon=True).start()
            else:
                # For sync mode, wait immediately
                wait_and_set_ready()

    def _is_port_in_use(self, port: int) -> bool:
        """Check if a port/socket is in use (handles both IPC and TCP modes).

        IPC mode: True when the IPC socket path for *port* exists.
        TCP mode: True when binding ``("localhost", port)`` fails.
        """
        from openhcs.runtime.zmq_base import _get_ipc_socket_path

        if self.transport_mode == TransportMode.IPC:
            # IPC mode - check if socket file exists
            socket_path = _get_ipc_socket_path(port)
            return socket_path.exists() if socket_path else False

        # TCP mode - check if port is bound
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.1)
            try:
                probe.bind(("localhost", port))
            except OSError:
                return True
        return False

    def _try_connect_to_existing_viewer(self) -> bool:
        """Try to connect to an existing Fiji viewer and verify it's responsive.

        Returns True only when the viewer answers a ping within 500ms with a
        ``pong`` whose ``ready`` field is truthy; any failure yields False.
        """
        try:
            response = self._control_request({"type": "ping"}, 500)
            return response.get("type") == "pong" and bool(response.get("ready"))
        except Exception:
            return False

    def _wait_for_server_ready(self, timeout: float = 10.0) -> bool:
        """Wait for Fiji server to be ready via ping/pong.

        Pings the control socket (500ms receive timeout per attempt, 0.2s pause
        between attempts) until a ``pong`` arrives or *timeout* seconds elapse.

        Args:
            timeout: Maximum time to wait in seconds.

        Returns:
            True once a pong is received, False on timeout.
        """
        logger.info(
            f"🔬 FIJI VISUALIZER: Waiting for server on port {self.port} to be ready..."
        )

        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                # Simple ping/pong check
                response = self._control_request({"type": "ping"}, 500)
                if response.get("type") == "pong":
                    logger.info(f"🔬 FIJI VISUALIZER: Server ready on port {self.port}")
                    return True
            except Exception as e:
                logger.debug(f"🔬 FIJI VISUALIZER: Ping failed: {e}")

            time.sleep(0.2)

        logger.warning(
            f"🔬 FIJI VISUALIZER: Timeout waiting for server on port {self.port}"
        )
        return False

    # ------------------------------------------------------------------
    # Control channel
    # ------------------------------------------------------------------

    def send_control_message(self, message_type: str, timeout: float = 2.0) -> bool:
        """
        Send a control message to the Fiji viewer.

        Args:
            message_type: Type of control message ('clear_state', 'shutdown', etc.)
            timeout: Timeout in seconds for waiting for response

        Returns:
            True if message was sent and acknowledged, False otherwise
        """
        if not self.is_running or self.port is None:
            logger.warning(
                f"🔬 FIJI VISUALIZER: Cannot send {message_type} - viewer not running"
            )
            return False

        import zmq

        try:
            response_data = self._control_request(
                {"type": message_type}, int(timeout * 1000)
            )

            if response_data.get("status") == "success":
                logger.info(
                    f"🔬 FIJI VISUALIZER: {message_type} acknowledged by viewer"
                )
                return True

            logger.warning(
                f"🔬 FIJI VISUALIZER: {message_type} failed: {response_data}"
            )
            return False

        except zmq.Again:
            logger.warning(
                f"🔬 FIJI VISUALIZER: Timeout waiting for {message_type} acknowledgment"
            )
            return False
        except Exception as e:
            logger.warning(f"🔬 FIJI VISUALIZER: Failed to send {message_type}: {e}")
            return False

    def clear_viewer_state(self) -> bool:
        """
        Clear accumulated viewer state (dimension values, hyperstack metadata) for a new pipeline run.

        Returns:
            True if state was cleared successfully, False otherwise
        """
        return self.send_control_message("clear_state")

    def stop_viewer(self) -> None:
        """Stop Fiji viewer server (only if not persistent).

        Non-persistent viewers are terminated (then force-killed if they do not
        exit within 5 seconds) and the global test-cleanup reference is cleared
        if it points at this viewer. Persistent viewers are left untouched.
        """
        global _global_fiji_process

        with self._lock:
            if self.persistent:
                logger.info("🔬 FIJI VISUALIZER: Keeping persistent Fiji viewer alive")
                # DON'T set _is_running = False for persistent viewers!
                # The process is still alive and should be reusable
                return

            logger.info("🔬 FIJI VISUALIZER: Stopping non-persistent Fiji viewer")

            if self.process:
                # Handle both subprocess and multiprocessing process types
                if hasattr(self.process, "is_alive"):
                    # multiprocessing.Process
                    if self.process.is_alive():
                        self.process.terminate()
                        self.process.join(timeout=5)
                        if self.process.is_alive():
                            logger.warning(
                                "🔬 FIJI VISUALIZER: Force killing Fiji viewer"
                            )
                            self.process.kill()
                            self.process.join(timeout=2)
                elif self.process.poll() is None:
                    # subprocess.Popen
                    self.process.terminate()
                    try:
                        self.process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        logger.warning(
                            "🔬 FIJI VISUALIZER: Force killing Fiji viewer"
                        )
                        self.process.kill()
                        # Reap the killed child so it does not linger as a zombie.
                        try:
                            self.process.wait(timeout=2)
                        except subprocess.TimeoutExpired:
                            pass

                # Clear global reference
                with _global_fiji_lock:
                    if _global_fiji_process == self.process:
                        _global_fiji_process = None

            self._is_running = False

    def _cleanup_zmq(self) -> None:
        """Clean up ZMQ client connection (for persistent viewers).

        Closes ``self._zmq_client`` if one is set and resets it to None; a
        failure while closing is logged and otherwise ignored.
        """
        client = getattr(self, "_zmq_client", None)
        if client:
            try:
                client.close()
            except Exception as e:
                logger.warning(f"🔬 FIJI VISUALIZER: Failed to cleanup ZMQ client: {e}")
            self._zmq_client = None

    def is_viewer_running(self) -> bool:
        """Check if Fiji viewer process is running.

        True only when this instance owns a process handle and ``is_running``
        confirms it is alive. ``is_running`` already performs the liveness
        check for both multiprocessing.Process and subprocess.Popen handles,
        so no type-specific call is made here.
        """
        return self.is_running and self.process is not None