Coverage for openhcs/pyqt_gui/utils/process_tracker.py: 0.0%
83 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Process Tracker Utility
4Tracks which processes are currently alive for log file status indication.
5Used by log viewer to distinguish logs from running vs terminated processes.
6"""
8import logging
9import re
10from pathlib import Path
11from typing import Set, Optional
12from dataclasses import dataclass
14logger = logging.getLogger(__name__)
17@dataclass
18class ProcessInfo:
19 """Information about a tracked process."""
20 pid: int
21 name: str
22 status: str
23 create_time: float
26class ProcessTracker:
27 """
28 Tracks which processes are currently alive.
30 Used to determine if log files correspond to running or terminated processes.
31 """
33 def __init__(self):
34 """Initialize process tracker."""
35 self.alive_pids: Set[int] = set()
36 self.process_info: dict[int, ProcessInfo] = {}
37 self._psutil_available = False
39 # Check if psutil is available
40 try:
41 import psutil
42 self._psutil_available = True
43 logger.debug("ProcessTracker initialized with psutil support")
44 except ImportError:
45 logger.warning("psutil not available - process tracking disabled")
47 def update(self):
48 """Update list of alive PIDs and their info."""
49 if not self._psutil_available:
50 return
52 try:
53 import psutil
55 # Get all current PIDs
56 new_alive_pids = set()
57 new_process_info = {}
59 for proc in psutil.process_iter(['pid', 'name', 'status', 'create_time']):
60 try:
61 pid = proc.info['pid']
62 new_alive_pids.add(pid)
63 new_process_info[pid] = ProcessInfo(
64 pid=pid,
65 name=proc.info.get('name', 'unknown'),
66 status=proc.info.get('status', 'unknown'),
67 create_time=proc.info.get('create_time', 0)
68 )
69 except (psutil.NoSuchProcess, psutil.AccessDenied):
70 pass
72 self.alive_pids = new_alive_pids
73 self.process_info = new_process_info
75 logger.debug(f"Updated process tracker: {len(self.alive_pids)} alive processes")
77 except Exception as e:
78 logger.warning(f"Failed to update process tracker: {e}")
80 def is_alive(self, pid: Optional[int]) -> bool:
81 """
82 Check if a PID is currently alive.
84 Args:
85 pid: Process ID to check (None returns False)
87 Returns:
88 bool: True if process is alive, False otherwise
89 """
90 if pid is None or not self._psutil_available:
91 return False
92 return pid in self.alive_pids
94 def get_process_info(self, pid: int) -> Optional[ProcessInfo]:
95 """
96 Get information about a process.
98 Args:
99 pid: Process ID
101 Returns:
102 ProcessInfo if process is alive, None otherwise
103 """
104 return self.process_info.get(pid)
106 def get_status_icon(self, pid: Optional[int]) -> str:
107 """
108 Get status icon for a process.
110 Args:
111 pid: Process ID (None returns unknown icon)
113 Returns:
114 str: Status icon (🟢 alive, ⚫ dead, ❓ unknown)
115 """
116 if pid is None or not self._psutil_available:
117 return "❓"
118 return "🟢" if self.is_alive(pid) else "⚫"
120 def get_status_text(self, pid: Optional[int]) -> str:
121 """
122 Get human-readable status text for a process.
124 Args:
125 pid: Process ID (None returns "Unknown")
127 Returns:
128 str: Status text ("Running", "Terminated", "Unknown")
129 """
130 if pid is None or not self._psutil_available:
131 return "Unknown"
132 return "Running" if self.is_alive(pid) else "Terminated"
135def extract_pid_from_log_filename(log_path: Path) -> Optional[int]:
136 """
137 Extract PID from OpenHCS log filename.
139 Supports various log filename patterns:
140 - openhcs_unified_20251007_102845_worker_12345.log
141 - zmq_worker_exec_abc123_worker_12345.log
142 - openhcs_worker_12345.log
144 Args:
145 log_path: Path to log file
147 Returns:
148 int: Extracted PID, or None if no PID found
149 """
150 # Pattern to match worker PID in filename
151 match = re.search(r'worker_(\d+)', log_path.name)
152 if match:
153 return int(match.group(1))
155 # Pattern to match PID in other formats (e.g., process_12345.log)
156 match = re.search(r'process_(\d+)', log_path.name)
157 if match:
158 return int(match.group(1))
160 # Pattern to match PID at end of filename (e.g., server_12345.log)
161 match = re.search(r'_(\d+)\.log$', log_path.name)
162 if match:
163 pid_candidate = int(match.group(1))
164 # Only return if it looks like a PID (not a timestamp)
165 if pid_candidate < 1000000: # PIDs are typically < 1M
166 return pid_candidate
168 return None
171def get_log_display_name(log_path: Path, process_tracker: ProcessTracker) -> str:
172 """
173 Get display name for log file with process status indicator.
175 Args:
176 log_path: Path to log file
177 process_tracker: ProcessTracker instance
179 Returns:
180 str: Display name with status icon (e.g., "🟢 worker_12345.log")
181 """
182 pid = extract_pid_from_log_filename(log_path)
183 icon = process_tracker.get_status_icon(pid)
184 return f"{icon} {log_path.name}"
187def get_log_tooltip(log_path: Path, process_tracker: ProcessTracker) -> str:
188 """
189 Get tooltip text for log file with process information.
191 Args:
192 log_path: Path to log file
193 process_tracker: ProcessTracker instance
195 Returns:
196 str: Tooltip text with process status and info
197 """
198 pid = extract_pid_from_log_filename(log_path)
200 if pid is None:
201 return f"Log file: {log_path.name}\nProcess: Unknown"
203 status = process_tracker.get_status_text(pid)
204 tooltip = f"Log file: {log_path.name}\nPID: {pid}\nStatus: {status}"
206 # Add additional process info if available
207 proc_info = process_tracker.get_process_info(pid)
208 if proc_info:
209 tooltip += f"\nProcess: {proc_info.name}"
211 return tooltip