Coverage for openhcs/core/utils.py: 25.7% (233 statements)
coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Utility functions for the OpenHCS package.
3"""
5import functools
6import logging
7import re
8import threading
9import time
10from collections import defaultdict
11from pathlib import Path
12from typing import Any, Callable, Dict, List, Optional, Union
14logger = logging.getLogger(__name__)


class _ModulePlaceholder:
    """
    Placeholder for missing optional modules that allows attribute access
    for type annotations while still being falsy and failing on actual use.
    """

    def __init__(self, module_name: str):
        self._module_name = module_name

    def __bool__(self):
        return False

    def __getattr__(self, name):
        # Return another placeholder for chained attribute access.
        # This allows things like cp.ndarray in type annotations to work.
        return _ModulePlaceholder(f"{self._module_name}.{name}")

    def __call__(self, *args, **kwargs):
        # If someone tries to actually call a function, fail loudly
        raise ImportError(
            f"Module '{self._module_name}' is not available. "
            f"Please install the required dependency."
        )

    def __repr__(self):
        return f"<ModulePlaceholder for '{self._module_name}'>"


def optional_import(module_name: str) -> Optional[Any]:
    """
    Import a module if available, otherwise return a placeholder that handles
    attribute access gracefully for type annotations but fails on actual use.

    This function allows for graceful handling of optional dependencies.
    It can be used to import libraries that may not be installed,
    particularly GPU-related libraries like torch, tensorflow, and cupy.

    Args:
        module_name: Name of the module to import

    Returns:
        The imported module if available, a placeholder otherwise

    Example:
        ```python
        # Import torch if available
        torch = optional_import("torch")

        # Check if torch is available before using it
        if torch:
            # Use torch
            tensor = torch.tensor([1, 2, 3])
        else:
            # Handle the case where torch is not available
            raise ImportError("PyTorch is required for this function")
        ```
    """
    try:
        # Use importlib.import_module, which handles dotted names properly
        import importlib
        return importlib.import_module(module_name)
    except (ImportError, ModuleNotFoundError, AttributeError):
        # Return a placeholder that handles attribute access gracefully
        return _ModulePlaceholder(module_name)
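

# Illustrative usage sketch (not part of the module API): how optional_import
# behaves when a dependency such as cupy is missing. "cupy" here is only an
# example module name; the assertions document the placeholder's contract.
def _demo_optional_import():
    cp = optional_import("cupy")
    if cp:
        # Real module available: use it normally
        return cp.asarray([1, 2, 3])
    # Placeholder path: attribute access still works (so annotations like
    # cp.ndarray don't crash at import time), the placeholder is falsy,
    # and calling anything raises ImportError
    assert not cp
    assert repr(cp.ndarray).startswith("<ModulePlaceholder")
    return None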


# Global thread activity tracking
thread_activity = defaultdict(list)
active_threads = set()
thread_lock = threading.Lock()


def get_thread_activity() -> Dict[int, List[Dict[str, Any]]]:
    """
    Get the current thread activity data.

    Returns:
        Dict mapping thread IDs to lists of activity records
    """
    return thread_activity


def get_active_threads() -> set:
    """
    Get the set of currently active thread IDs.

    Returns:
        Set of active thread IDs
    """
    return active_threads


def clear_thread_activity():
    """Clear all thread activity data."""
    with thread_lock:
        thread_activity.clear()
        active_threads.clear()


def track_thread_activity(func: Optional[Callable] = None, *, log_level: str = "info"):
    """
    Decorator to track thread activity for a function.

    Args:
        func: The function to decorate
        log_level: Logging level to use ("debug", "info", "warning", "error")

    Returns:
        Decorated function that tracks thread activity
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # Get thread information
            thread_id = threading.get_ident()
            thread_name = threading.current_thread().name

            # Record thread start time
            start_time = time.time()

            # Extract function name and arguments for context
            func_name = f.__name__
            # Get the first argument if it's a method (self or cls)
            context = ""
            if args and hasattr(args[0], "__class__"):
                if hasattr(args[0].__class__, func_name):
                    # It's likely a method; extract the class name
                    context = f"{args[0].__class__.__name__}."

            # Extract well information if present in kwargs or args
            well = kwargs.get('well', None)
            if well is None and len(args) > 1 and isinstance(args[1], str):
                # Assume the second argument might be the well, as in
                # methods like process_well(self, well, ...)
                well = args[1]

            # Add this thread to active threads
            with thread_lock:
                active_threads.add(thread_id)
                # Record the number of active threads at this moment
                thread_activity[thread_id].append({
                    'well': well,
                    'thread_name': thread_name,
                    'time': time.time(),
                    'action': 'start',
                    'function': f"{context}{func_name}",
                    'active_threads': len(active_threads)
                })

            # Log the start of the function
            log_func = getattr(logger, log_level.lower())
            log_func(f"Thread {thread_name} (ID: {thread_id}) started {context}{func_name} for well {well}")
            log_func(f"Active threads: {len(active_threads)}")

            try:
                # Call the original function
                result = f(*args, **kwargs)
                return result
            finally:
                # Record thread end time
                end_time = time.time()
                duration = end_time - start_time

                # Remove this thread from active threads
                with thread_lock:
                    active_threads.remove(thread_id)
                    # Record the number of active threads at this moment
                    thread_activity[thread_id].append({
                        'well': well,
                        'thread_name': thread_name,
                        'time': time.time(),
                        'action': 'end',
                        'function': f"{context}{func_name}",
                        'duration': duration,
                        'active_threads': len(active_threads)
                    })

                log_func(f"Thread {thread_name} (ID: {thread_id}) finished {context}{func_name} for well {well} in {duration:.2f} seconds")
                log_func(f"Active threads: {len(active_threads)}")

        return wrapper

    # Handle both @track_thread_activity and @track_thread_activity(log_level="debug")
    if func is None:
        return decorator
    return decorator(func)
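

# Illustrative usage sketch (hypothetical names): decorating a worker with
# track_thread_activity and running it on two threads. Passing well as a
# keyword argument lets the wrapper record it in the activity log.
def _demo_track_thread_activity():
    @track_thread_activity(log_level="debug")
    def process_well(well):
        time.sleep(0.01)
        return well

    threads = [
        threading.Thread(target=process_well, kwargs={"well": w})
        for w in ("A01", "A02")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Each thread now has paired 'start'/'end' records in thread_activity,
    # which analyze_thread_activity() and print_thread_activity_report() consume
    return get_thread_activity()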


def analyze_thread_activity():
    """
    Analyze thread activity data and return a report.

    Returns:
        Dict containing analysis results
    """
    max_concurrent = 0
    thread_starts = []
    thread_ends = []

    for activities in thread_activity.values():
        for activity in activities:
            max_concurrent = max(max_concurrent, activity['active_threads'])
            if activity['action'] == 'start':
                thread_starts.append((
                    activity.get('well'),
                    activity['thread_name'],
                    activity['time'],
                    activity.get('function', '')
                ))
            else:  # 'end'
                thread_ends.append((
                    activity.get('well'),
                    activity['thread_name'],
                    activity['time'],
                    activity.get('duration', 0),
                    activity.get('function', '')
                ))

    # Sort by time
    thread_starts.sort(key=lambda x: x[2])
    thread_ends.sort(key=lambda x: x[2])

    # Find overlapping time periods
    overlaps = []
    for i, (well1, thread1, start1, func1) in enumerate(thread_starts):
        # Find the end time for this thread
        end1 = None
        for w, t, end, d, f in thread_ends:
            if t == thread1 and w == well1 and f == func1:
                end1 = end
                break

        if end1 is None:
            continue  # Skip if we can't find the end time

        # Check for overlaps with other threads
        for j, (well2, thread2, start2, func2) in enumerate(thread_starts):
            # Skip the same thread, and only consider j > i so that each
            # overlapping pair is recorded once rather than twice
            if j <= i or thread1 == thread2:
                continue

            # Find the end time for the other thread
            end2 = None
            for w, t, end, d, f in thread_ends:
                if t == thread2 and w == well2 and f == func2:
                    end2 = end
                    break

            if end2 is None:
                continue  # Skip if we can't find the end time

            # Check if there's an overlap
            if start1 < end2 and start2 < end1:
                overlap_start = max(start1, start2)
                overlap_end = min(end1, end2)
                overlap_duration = overlap_end - overlap_start

                if overlap_duration > 0:
                    overlaps.append({
                        'thread1': thread1,
                        'well1': well1,
                        'function1': func1,
                        'thread2': thread2,
                        'well2': well2,
                        'function2': func2,
                        'duration': overlap_duration
                    })

    return {
        'max_concurrent': max_concurrent,
        'thread_starts': thread_starts,
        'thread_ends': thread_ends,
        'overlaps': overlaps
    }


def print_thread_activity_report():
    """Print a detailed report of thread activity."""
    analysis = analyze_thread_activity()

    print("\n" + "=" * 80)
    print("Thread Activity Report")
    print("=" * 80)

    print("\nThread Start Events:")
    for well, thread_name, time_val, func in analysis['thread_starts']:
        print(f"Thread {thread_name} started {func} for well {well} at {time_val:.2f}")

    print("\nThread End Events:")
    for well, thread_name, time_val, duration, func in analysis['thread_ends']:
        print(f"Thread {thread_name} finished {func} for well {well} at {time_val:.2f} (duration: {duration:.2f}s)")

    print("\nOverlap Analysis:")
    for overlap in analysis['overlaps']:
        print(f"Threads {overlap['thread1']} and {overlap['thread2']} overlapped for {overlap['duration']:.2f}s")
        print(f"  {overlap['thread1']} was processing {overlap['function1']} for well {overlap['well1']}")
        print(f"  {overlap['thread2']} was processing {overlap['function2']} for well {overlap['well2']}")

    print(f"\nFound {len(analysis['overlaps'])} thread overlaps")
    print(f"Maximum concurrent threads: {analysis['max_concurrent']}")
    print("=" * 80)

    return analysis


# Natural sorting utilities
def natural_sort_key(text: Union[str, Path]) -> List[Union[str, int]]:
    """
    Generate a natural sorting key for a string or Path.

    This function converts a string into a list of strings and integers
    that can be used as a sorting key to achieve natural (human-friendly)
    sorting order.

    Args:
        text: String or Path to generate a sorting key for

    Returns:
        List of strings and integers for natural sorting

    Examples:
        >>> natural_sort_key("file10.txt")
        ['file', 10, '.txt']
        >>> natural_sort_key("A01_s001_w1_z001.tif")
        ['A', 1, '_s', 1, '_w', 1, '_z', 1, '.tif']
    """
    text = str(text)

    # Split on sequences of digits, keeping the digits
    parts = re.split(r'(\d+)', text)

    # Convert digit sequences to integers, leave other parts as strings
    result = []
    for part in parts:
        if part.isdigit():
            result.append(int(part))
        else:
            result.append(part)

    return result


def natural_sort(items: List[Union[str, Path]]) -> List[Union[str, Path]]:
    """
    Sort a list of strings or Paths using natural sorting.

    Args:
        items: List of strings or Paths to sort

    Returns:
        New list sorted in natural order

    Examples:
        >>> natural_sort(["file1.txt", "file10.txt", "file2.txt"])
        ['file1.txt', 'file2.txt', 'file10.txt']
        >>> natural_sort(["A01_s001.tif", "A01_s010.tif", "A01_s002.tif"])
        ['A01_s001.tif', 'A01_s002.tif', 'A01_s010.tif']
    """
    return sorted(items, key=natural_sort_key)


def natural_sort_inplace(items: List[Union[str, Path]]) -> None:
    """
    Sort a list of strings or Paths using natural sorting, in place.

    Args:
        items: List of strings or Paths to sort in place

    Examples:
        >>> files = ["file1.txt", "file10.txt", "file2.txt"]
        >>> natural_sort_inplace(files)
        >>> files
        ['file1.txt', 'file2.txt', 'file10.txt']
    """
    items.sort(key=natural_sort_key)
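

# Illustrative sketch: natural sorting also applies to Path objects, since
# natural_sort_key coerces its argument with str(). The file names below
# are hypothetical.
def _demo_natural_sort_paths():
    paths = [Path("A01_s010.tif"), Path("A01_s002.tif"), Path("A01_s001.tif")]
    return [p.name for p in natural_sort(paths)]
    # -> ['A01_s001.tif', 'A01_s002.tif', 'A01_s010.tif']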


# === WELL FILTERING UTILITIES ===

from typing import Set

from openhcs.core.config import WellFilterMode


class WellPatternConstants:
    """Centralized constants for well pattern parsing."""
    COMMA_SEPARATOR = ","
    RANGE_SEPARATOR = ":"
    ROW_PREFIX = "row:"
    COL_PREFIX = "col:"


class WellFilterProcessor:
    """
    Enhanced well filtering processor supporting both compilation-time and
    execution-time filtering.

    Maintains backward compatibility with existing execution-time methods while
    adding compilation-time capabilities for the 5-phase compilation system.

    Follows systematic refactoring framework principles:
    - Fail-loud validation with clear error messages
    - Pythonic patterns and idioms
    - Leverages existing well filtering infrastructure
    - Eliminates magic strings through centralized constants
    """

    # === NEW COMPILATION-TIME METHOD ===

    @staticmethod
    def resolve_compilation_filter(
        well_filter: Union[List[str], str, int],
        available_wells: List[str]
    ) -> Set[str]:
        """
        Resolve well filter to a concrete well set during compilation.

        Combines validation and resolution in a single method to avoid verbose
        helper methods. Supports all existing filter types while providing
        compilation-time optimization. Works with any well naming format
        (A01, R01C03, etc.) by using the available wells.

        Args:
            well_filter: Filter specification (list, string pattern, or max count)
            available_wells: Ordered list of wells from
                orchestrator.get_component_keys(GroupBy.WELL)

        Returns:
            Set of well IDs that match the filter

        Raises:
            ValueError: If wells don't exist, insufficient wells for count,
                or invalid patterns
        """
        if isinstance(well_filter, list):
            # Inline validation for specific wells
            available_set = set(available_wells)
            invalid_wells = [w for w in well_filter if w not in available_set]
            if invalid_wells:
                raise ValueError(
                    f"Invalid wells specified: {invalid_wells}. "
                    f"Available wells: {sorted(available_set)}"
                )
            return set(well_filter)

        elif isinstance(well_filter, int):
            # Inline validation for max count
            if well_filter <= 0:
                raise ValueError(f"Max count must be positive, got: {well_filter}")
            if well_filter > len(available_wells):
                raise ValueError(
                    f"Requested {well_filter} wells but only {len(available_wells)} available"
                )
            return set(available_wells[:well_filter])

        elif isinstance(well_filter, str):
            # Pass available wells to pattern parsing for format-agnostic support
            return WellFilterProcessor._parse_well_pattern(well_filter, available_wells)

        else:
            raise ValueError(f"Unsupported well filter type: {type(well_filter)}")

    # === EXISTING EXECUTION-TIME METHODS (MAINTAINED) ===

    @staticmethod
    def should_materialize_well(
        well_id: str,
        config,  # MaterializationPathConfig
        processed_wells: Set[str]
    ) -> bool:
        """
        EXISTING METHOD: Determine if a well should be materialized during execution.
        Maintained for backward compatibility and execution-time fallback.
        """
        if config.well_filter is None:
            return True  # No filter = materialize all wells

        # Expand filter pattern to well list
        target_wells = WellFilterProcessor.expand_well_filter(config.well_filter)

        # Apply max wells limit if filter is an integer
        if isinstance(config.well_filter, int):
            if len(processed_wells) >= config.well_filter:
                return False

        # Check if well matches filter
        well_in_filter = well_id in target_wells

        # Apply include/exclude mode
        if config.well_filter_mode == WellFilterMode.INCLUDE:
            return well_in_filter
        else:  # EXCLUDE mode
            return not well_in_filter

    @staticmethod
    def expand_well_filter(
        well_filter: Union[List[str], str, int],
        available_wells: Optional[List[str]] = None,
    ) -> Set[str]:
        """
        EXISTING METHOD: Expand well filter pattern to a set of well IDs.
        Maintained for backward compatibility.
        """
        if isinstance(well_filter, list):
            return set(well_filter)

        if isinstance(well_filter, int):
            # For integer filters, we can't pre-expand wells since it depends on
            # processing order. Return an empty set - the max wells logic is
            # handled in should_materialize_well.
            return set()

        if isinstance(well_filter, str):
            # Row/column/range patterns need the plate's well list; without it,
            # only comma lists and single wells resolve to non-empty sets
            return WellFilterProcessor._parse_well_pattern(well_filter, available_wells or [])

        raise ValueError(f"Unsupported well filter type: {type(well_filter)}")

    @staticmethod
    def _parse_well_pattern(pattern: str, available_wells: List[str]) -> Set[str]:
        """Parse string well patterns into well ID sets using available wells."""
        pattern = pattern.strip()

        # Comma-separated list
        if WellPatternConstants.COMMA_SEPARATOR in pattern:
            return set(w.strip() for w in pattern.split(WellPatternConstants.COMMA_SEPARATOR))

        # Row pattern: "row:A"
        if pattern.startswith(WellPatternConstants.ROW_PREFIX):
            row = pattern[len(WellPatternConstants.ROW_PREFIX):].strip()
            return WellFilterProcessor._expand_row_pattern(row, available_wells)

        # Column pattern: "col:01-06"
        if pattern.startswith(WellPatternConstants.COL_PREFIX):
            col_spec = pattern[len(WellPatternConstants.COL_PREFIX):].strip()
            return WellFilterProcessor._expand_col_pattern(col_spec, available_wells)

        # Range pattern: "A01:A12"
        if WellPatternConstants.RANGE_SEPARATOR in pattern:
            return WellFilterProcessor._expand_range_pattern(pattern, available_wells)

        # Single well
        return {pattern}
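
    # Pattern grammar handled above, with illustrative examples:
    #   "A01,B02"   -> explicit comma-separated list
    #   "row:A"     -> all available wells in row A
    #   "col:01-06" -> all available wells in columns 1 through 6
    #   "A01:A12"   -> inclusive range over the ordered available wells
    #   "A01"       -> a single well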

    @staticmethod
    def _expand_row_pattern(row: str, available_wells: List[str]) -> Set[str]:
        """Expand row pattern using available wells (format-agnostic)."""
        # Direct prefix match (A01, B02, etc.)
        result = {well for well in available_wells if well.startswith(row)}

        # Opera Phenix format fallback (A → R01C*, B → R02C*)
        if not result and len(row) == 1 and row.isalpha():
            row_pattern = f"R{ord(row.upper()) - ord('A') + 1:02d}C"
            result = {well for well in available_wells if well.startswith(row_pattern)}

        return result

    @staticmethod
    def _expand_col_pattern(col_spec: str, available_wells: List[str]) -> Set[str]:
        """Expand column pattern using available wells (format-agnostic)."""
        # Parse column range
        if "-" in col_spec:
            start_col, end_col = map(int, col_spec.split("-"))
            col_range = set(range(start_col, end_col + 1))
        else:
            col_range = {int(col_spec)}

        # Extract the numeric suffix and match (A01, B02, etc.)
        def get_numeric_suffix(well: str) -> int:
            digits = ''.join(char for char in reversed(well) if char.isdigit())
            return int(digits[::-1]) if digits else 0

        result = {well for well in available_wells if get_numeric_suffix(well) in col_range}

        # Opera Phenix format fallback (C01, C02, etc.)
        if not result:
            patterns = {f"C{col:02d}" for col in col_range}
            result = {well for well in available_wells
                      if any(pattern in well for pattern in patterns)}

        return result

    @staticmethod
    def _expand_range_pattern(pattern: str, available_wells: List[str]) -> Set[str]:
        """Expand range pattern using available wells (format-agnostic)."""
        start_well, end_well = map(str.strip, pattern.split(WellPatternConstants.RANGE_SEPARATOR))

        try:
            start_idx, end_idx = available_wells.index(start_well), available_wells.index(end_well)
        except ValueError as e:
            raise ValueError(
                f"Range pattern '{pattern}' contains wells not in available wells: {e}"
            ) from e

        # Ensure proper order and return the range (inclusive)
        start_idx, end_idx = sorted([start_idx, end_idx])
        return set(available_wells[start_idx:end_idx + 1])
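

# Illustrative sketch (assumed plate layout): resolving each filter form
# against a small hypothetical plate. The wells below are examples only.
def _demo_well_filtering():
    available = [f"{row}{col:02d}" for row in "AB" for col in range(1, 4)]
    # available == ['A01', 'A02', 'A03', 'B01', 'B02', 'B03']
    resolve = WellFilterProcessor.resolve_compilation_filter
    assert resolve(["A01", "B02"], available) == {"A01", "B02"}             # explicit list
    assert resolve(4, available) == {"A01", "A02", "A03", "B01"}            # max count
    assert resolve("row:A", available) == {"A01", "A02", "A03"}             # row pattern
    assert resolve("col:01-02", available) == {"A01", "A02", "B01", "B02"}  # column range
    assert resolve("A02:B01", available) == {"A02", "A03", "B01"}           # inclusive range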