Coverage for openhcs/runtime/queue_tracker.py: 15.8%

109 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1"""Queue tracker for monitoring image processing progress in viewers. 

2 

3Tracks sent images by ID and decrements count when acknowledgments are received. 

4Used to show real-time progress like '3/10 images processed' in the UI. 

5""" 

6 

7import logging 

8import threading 

9import time 

10from typing import Dict, Tuple, Optional, Set 

11 

12logger = logging.getLogger(__name__) 

13 

14 

class QueueTracker:
    """Tracks pending images for a single viewer.

    All state is guarded by one ``threading.Lock``. The tracker maintains:
    - A mapping of pending image IDs to the time they were sent
    - A set of image IDs for which an ack has been received
    - Running totals of sent and processed images for progress display
    """

    def __init__(self, viewer_port: int, viewer_type: str, timeout_seconds: float = 30.0):
        """Initialize queue tracker.

        Args:
            viewer_port: Port of the viewer being tracked
            viewer_type: 'napari' or 'fiji'
            timeout_seconds: How long to wait for ack before marking as stuck
        """
        self.viewer_port = viewer_port
        self.viewer_type = viewer_type
        self.timeout_seconds = timeout_seconds

        self._lock = threading.Lock()
        self._pending: Dict[str, float] = {}  # {image_id: timestamp_sent}
        self._processed: Set[str] = set()  # {image_id}
        # IDs whose ack arrived before any register_sent() call; they were
        # already counted as sent+processed and must not be queued again.
        self._acked_before_sent: Set[str] = set()
        self._total_sent = 0
        self._total_processed = 0

    def register_sent(self, image_id: str):
        """Register that an image was sent to the viewer.

        Registering an ID that is still pending only refreshes its timestamp;
        the sent total is not incremented again. Registering an ID whose ack
        was already counted retroactively by ``mark_processed`` is a no-op,
        because that image is already included in both totals.

        Args:
            image_id: UUID of the sent image
        """
        with self._lock:
            if image_id in self._acked_before_sent:
                # The ack overtook the registration. Consume the marker so a
                # later, genuine re-send of this ID is tracked normally.
                self._acked_before_sent.discard(image_id)
                logger.debug(
                    "[%s:%s] Image %s was acknowledged before registration; not re-queued",
                    self.viewer_type, self.viewer_port, image_id,
                )
                return

            # Count each pending ID once, otherwise the total can never be reached.
            if image_id not in self._pending:
                self._total_sent += 1
            self._pending[image_id] = time.time()
            logger.debug(
                "[%s:%s] Registered sent image %s (pending: %d)",
                self.viewer_type, self.viewer_port, image_id, len(self._pending),
            )

    def mark_processed(self, image_id: str):
        """Mark an image as processed (ack received).

        An ack for an ID that was never registered is counted retroactively as
        both sent and processed. A repeated ack for an already processed ID is
        ignored.

        Args:
            image_id: UUID of the processed image
        """
        with self._lock:
            sent_time = self._pending.pop(image_id, None)
            if sent_time is not None:
                elapsed = time.time() - sent_time
                self._processed.add(image_id)
                self._total_processed += 1
                logger.debug(
                    "[%s:%s] Marked processed %s (took %.2fs, pending: %d)",
                    self.viewer_type, self.viewer_port, image_id, elapsed, len(self._pending),
                )

                # Log when all images are processed (but don't auto-clear).
                # The UI needs to read the final progress before the tracker is cleared.
                if not self._pending and self._total_sent > 0:
                    logger.info(
                        "[%s:%s] All %d images processed",
                        self.viewer_type, self.viewer_port, self._total_sent,
                    )
            elif image_id not in self._processed:
                # Image was not registered (likely sent from worker process with
                # separate registry). Still count it so the UI can track progress.
                self._processed.add(image_id)
                self._acked_before_sent.add(image_id)
                self._total_processed += 1
                self._total_sent += 1  # Retroactively count as sent
                logger.debug(
                    "[%s:%s] Received ack for unregistered image %s, "
                    "counted retroactively (processed: %d/%d)",
                    self.viewer_type, self.viewer_port, image_id,
                    self._total_processed, self._total_sent,
                )

    def get_progress(self) -> Tuple[int, int]:
        """Get current progress.

        Returns:
            (processed_count, total_sent_count)
        """
        with self._lock:
            return (self._total_processed, self._total_sent)

    def get_pending_count(self) -> int:
        """Get number of pending images (sent but not acked).

        Returns:
            Number of pending images
        """
        with self._lock:
            return len(self._pending)

    def has_stuck_images(self) -> bool:
        """Check if any images have been pending longer than timeout.

        Returns:
            True if any images are stuck (no ack within timeout)
        """
        with self._lock:
            now = time.time()
            return any(
                now - sent_time > self.timeout_seconds
                for sent_time in self._pending.values()
            )

    def get_stuck_images(self) -> list:
        """Get list of stuck image IDs (pending longer than timeout).

        Returns:
            List of (image_id, elapsed_seconds) tuples
        """
        with self._lock:
            now = time.time()
            return [
                (image_id, now - sent_time)
                for image_id, sent_time in self._pending.items()
                if now - sent_time > self.timeout_seconds
            ]

    def _reset_locked(self):
        """Drop all tracked IDs and zero the counters. Caller must hold ``_lock``."""
        self._pending.clear()
        self._processed.clear()
        self._acked_before_sent.clear()
        self._total_sent = 0
        self._total_processed = 0

    def clear(self):
        """Clear all tracking data (e.g., when viewer is closed)."""
        with self._lock:
            self._reset_locked()
            logger.debug("[%s:%s] Cleared queue tracker", self.viewer_type, self.viewer_port)

    def reset_for_new_batch(self):
        """Reset tracker for a new batch of images (e.g., new pipeline execution).

        Clears pending and processed sets and zeroes the counters; the tracker
        object itself stays usable.
        """
        with self._lock:
            self._reset_locked()
            logger.debug(
                "[%s:%s] Reset queue tracker for new batch",
                self.viewer_type, self.viewer_port,
            )

    def __repr__(self):
        with self._lock:
            return (
                f"QueueTracker({self.viewer_type}:{self.viewer_port}, "
                f"processed={self._total_processed}/{self._total_sent}, "
                f"pending={len(self._pending)})"
            )

150 

151 

class GlobalQueueTrackerRegistry:
    """Global registry of queue trackers for all viewers.

    Singleton that maintains queue trackers for each active viewer.
    Used by the ack listener to route acks to the correct tracker.
    """

    _instance = None
    _lock = threading.Lock()  # guards singleton creation and one-time initialization

    def __new__(cls):
        # Unlocked fast path; the check is repeated under the lock so only one
        # instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every GlobalQueueTrackerRegistry() call, so it must
        # not rebuild state once the singleton has been set up.
        if self._initialized:
            return
        with type(self)._lock:
            if self._initialized:
                return
            self._trackers: Dict[int, "QueueTracker"] = {}  # {viewer_port: QueueTracker}
            self._registry_lock = threading.Lock()
            # Set the flag last so the unlocked check above never reports an
            # instance whose attributes are not yet in place.
            self._initialized = True
        logger.info("Initialized GlobalQueueTrackerRegistry")

    def get_or_create_tracker(self, viewer_port: int, viewer_type: str) -> "QueueTracker":
        """Get existing tracker or create new one for a viewer.

        If a tracker already exists for ``viewer_port`` it is returned as-is;
        ``viewer_type`` is only used when a new tracker is created.

        Args:
            viewer_port: Port of the viewer
            viewer_type: 'napari' or 'fiji'

        Returns:
            QueueTracker for this viewer
        """
        with self._registry_lock:
            tracker = self._trackers.get(viewer_port)
            if tracker is None:
                tracker = QueueTracker(viewer_port, viewer_type)
                self._trackers[viewer_port] = tracker
                logger.info(
                    "Created queue tracker for %s viewer on port %s",
                    viewer_type, viewer_port,
                )
            return tracker

    def get_tracker(self, viewer_port: int) -> Optional["QueueTracker"]:
        """Get tracker for a viewer port.

        Args:
            viewer_port: Port of the viewer

        Returns:
            QueueTracker if exists, None otherwise
        """
        with self._registry_lock:
            return self._trackers.get(viewer_port)

    def remove_tracker(self, viewer_port: int):
        """Remove tracker for a viewer (e.g., when viewer is closed).

        Does nothing if no tracker is registered for the port.

        Args:
            viewer_port: Port of the viewer
        """
        with self._registry_lock:
            if self._trackers.pop(viewer_port, None) is not None:
                logger.info("Removed queue tracker for viewer on port %s", viewer_port)

    def get_all_trackers(self) -> Dict[int, "QueueTracker"]:
        """Get all active trackers.

        Returns:
            A shallow copy of the {viewer_port: QueueTracker} mapping
        """
        with self._registry_lock:
            return dict(self._trackers)

    def clear_all(self):
        """Clear all trackers (e.g., on shutdown)."""
        with self._registry_lock:
            self._trackers.clear()
            logger.info("Cleared all queue trackers")

231