Coverage for openhcs/runtime/queue_tracker.py: 15.8%
109 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""Queue tracker for monitoring image processing progress in viewers.
3Tracks sent images by ID and decrements count when acknowledgments are received.
4Used to show real-time progress like '3/10 images processed' in the UI.
5"""
7import logging
8import threading
9import time
10from typing import Dict, Tuple, Optional, Set
# Module-level logger, named after this module, used by the classes below.
logger = logging.getLogger(__name__)
class QueueTracker:
    """Tracks pending images for a single viewer.

    Thread-safe tracker that maintains:
    - Pending image IDs mapped to the time they were sent (for timeout detection)
    - Set of processed image IDs (received acks)
    - Running sent/processed totals used to report progress

    All mutable state is guarded by one non-reentrant ``threading.Lock``, so
    public methods must not be called from code that already holds it.
    Send times are taken from ``time.monotonic()`` so elapsed/timeout maths
    cannot be skewed by wall-clock adjustments.
    """

    def __init__(self, viewer_port: int, viewer_type: str, timeout_seconds: float = 30.0):
        """Initialize queue tracker.

        Args:
            viewer_port: Port of the viewer being tracked
            viewer_type: 'napari' or 'fiji'
            timeout_seconds: How long to wait for ack before marking as stuck
        """
        self.viewer_port = viewer_port
        self.viewer_type = viewer_type
        self.timeout_seconds = timeout_seconds

        self._lock = threading.Lock()
        self._pending: Dict[str, float] = {}  # {image_id: monotonic send time}
        self._processed: Set[str] = set()  # {image_id}
        self._total_sent = 0
        self._total_processed = 0

    def register_sent(self, image_id: str) -> None:
        """Register that an image was sent to the viewer.

        Each image ID is counted at most once:
        - If the ack for this ID was already recorded (it arrived before the
          registration and was counted retroactively), the call is a no-op.
        - If the ID is already pending, only its send time is refreshed.

        Args:
            image_id: UUID of the sent image
        """
        with self._lock:
            if image_id in self._processed:
                # The ack won the race and mark_processed() already counted this
                # image as sent. Adding it to pending now would leave an entry
                # that can never be acked and inflate the total.
                logger.debug(f"[{self.viewer_type}:{self.viewer_port}] Ignoring registration of already processed image {image_id}")
                return

            if image_id not in self._pending:
                self._total_sent += 1
            self._pending[image_id] = time.monotonic()
            logger.debug(f"[{self.viewer_type}:{self.viewer_port}] Registered sent image {image_id} (pending: {len(self._pending)})")

    def mark_processed(self, image_id: str) -> None:
        """Mark an image as processed (ack received).

        Acks for IDs that were never registered are still counted (as both
        sent and processed) the first time they are seen; repeated acks for
        the same ID are ignored.

        Args:
            image_id: UUID of the processed image
        """
        with self._lock:
            sent_at = self._pending.pop(image_id, None)

            if sent_at is not None:
                elapsed = time.monotonic() - sent_at
                self._processed.add(image_id)
                self._total_processed += 1
                logger.debug(f"[{self.viewer_type}:{self.viewer_port}] Marked processed {image_id} (took {elapsed:.2f}s, pending: {len(self._pending)})")

                # Log when all images are processed (but don't auto-clear):
                # the UI needs to read the final progress before the tracker is cleared.
                if not self._pending and self._total_sent > 0:
                    logger.info(f"[{self.viewer_type}:{self.viewer_port}] All {self._total_sent} images processed")
                return

            # Image was not registered (likely sent from worker process with separate registry).
            # Still count it as processed so UI can track progress.
            if image_id not in self._processed:
                self._processed.add(image_id)
                self._total_processed += 1
                self._total_sent += 1  # Retroactively count as sent
                logger.debug(f"[{self.viewer_type}:{self.viewer_port}] Received ack for unregistered image {image_id}, counted retroactively (processed: {self._total_processed}/{self._total_sent})")

    def get_progress(self) -> Tuple[int, int]:
        """Get current progress.

        Returns:
            (processed_count, total_sent_count)
        """
        with self._lock:
            return (self._total_processed, self._total_sent)

    def get_pending_count(self) -> int:
        """Get number of pending images (sent but not acked).

        Returns:
            Number of pending images
        """
        with self._lock:
            return len(self._pending)

    def has_stuck_images(self) -> bool:
        """Check if any images have been pending longer than timeout.

        Returns:
            True if any images are stuck (no ack within timeout)
        """
        with self._lock:
            now = time.monotonic()
            return any(
                now - sent_at > self.timeout_seconds
                for sent_at in self._pending.values()
            )

    def get_stuck_images(self) -> list:
        """Get list of stuck image IDs (pending longer than timeout).

        Returns:
            List of (image_id, elapsed_seconds) tuples
        """
        with self._lock:
            now = time.monotonic()
            return [
                (image_id, now - sent_at)
                for image_id, sent_at in self._pending.items()
                if now - sent_at > self.timeout_seconds
            ]

    def _reset_locked(self) -> None:
        """Drop all tracked IDs and zero the counters; caller must hold ``_lock``."""
        self._pending.clear()
        self._processed.clear()
        self._total_sent = 0
        self._total_processed = 0

    def clear(self) -> None:
        """Clear all tracking data (e.g., when viewer is closed)."""
        with self._lock:
            self._reset_locked()
            logger.debug(f"[{self.viewer_type}:{self.viewer_port}] Cleared queue tracker")

    def reset_for_new_batch(self) -> None:
        """Reset tracker for a new batch of images (e.g., new pipeline execution).

        Clears pending and processed sets but preserves the tracker for reuse.
        """
        with self._lock:
            self._reset_locked()
            logger.debug(f"[{self.viewer_type}:{self.viewer_port}] Reset queue tracker for new batch")

    def __repr__(self):
        # Takes the lock so the three counters come from one consistent snapshot.
        with self._lock:
            return f"QueueTracker({self.viewer_type}:{self.viewer_port}, processed={self._total_processed}/{self._total_sent}, pending={len(self._pending)})"
class GlobalQueueTrackerRegistry:
    """Global registry of queue trackers for all viewers.

    Singleton that maintains queue trackers for each active viewer.
    Used by the ack listener to route acks to the correct tracker.

    The single instance is fully built inside ``__new__`` while the class-level
    lock is held and only then stored in ``_instance``, so a concurrent caller
    can never observe a half-initialised registry or re-run initialisation.
    """

    _instance = None
    _lock = threading.Lock()  # guards creation of the singleton only

    def __new__(cls):
        instance = cls._instance
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created the
                # instance between the unlocked check and acquiring the lock.
                instance = cls._instance
                if instance is None:
                    instance = super().__new__(cls)
                    instance._trackers: Dict[int, "QueueTracker"] = {}  # {viewer_port: QueueTracker}
                    instance._registry_lock = threading.Lock()
                    instance._initialized = True
                    logger.info("Initialized GlobalQueueTrackerRegistry")
                    # Publish last so other threads only ever see a complete object.
                    cls._instance = instance
        return instance

    def __init__(self):
        # Intentionally empty: Python calls __init__ on every
        # GlobalQueueTrackerRegistry() expression, and all state is already
        # set up exactly once in __new__.
        pass

    def get_or_create_tracker(self, viewer_port: int, viewer_type: str) -> "QueueTracker":
        """Get existing tracker or create new one for a viewer.

        If a tracker already exists for ``viewer_port`` it is returned as is;
        ``viewer_type`` is only used when a new tracker is created.

        Args:
            viewer_port: Port of the viewer
            viewer_type: 'napari' or 'fiji'

        Returns:
            QueueTracker for this viewer
        """
        with self._registry_lock:
            tracker = self._trackers.get(viewer_port)
            if tracker is None:
                tracker = QueueTracker(viewer_port, viewer_type)
                self._trackers[viewer_port] = tracker
                logger.info(f"Created queue tracker for {viewer_type} viewer on port {viewer_port}")
            return tracker

    def get_tracker(self, viewer_port: int) -> Optional["QueueTracker"]:
        """Get tracker for a viewer port.

        Args:
            viewer_port: Port of the viewer

        Returns:
            QueueTracker if exists, None otherwise
        """
        with self._registry_lock:
            return self._trackers.get(viewer_port)

    def remove_tracker(self, viewer_port: int) -> None:
        """Remove tracker for a viewer (e.g., when viewer is closed).

        Does nothing if no tracker is registered for the port.

        Args:
            viewer_port: Port of the viewer
        """
        with self._registry_lock:
            if self._trackers.pop(viewer_port, None) is not None:
                logger.info(f"Removed queue tracker for viewer on port {viewer_port}")

    def get_all_trackers(self) -> Dict[int, "QueueTracker"]:
        """Get all active trackers.

        Returns:
            Dict of {viewer_port: QueueTracker}; a shallow copy, so mutating
            it does not affect the registry.
        """
        with self._registry_lock:
            return dict(self._trackers)

    def clear_all(self) -> None:
        """Clear all trackers (e.g., on shutdown)."""
        with self._registry_lock:
            self._trackers.clear()
            logger.info("Cleared all queue trackers")