Coverage for openhcs/core/utils.py: 23.9%
238 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Utility functions for the OpenHCS package.
3"""
5import functools
6import logging
7import re
8import threading
9import time
10from collections import defaultdict
11from pathlib import Path
12from typing import Any, Callable, Dict, List, Optional, Union
14logger = logging.getLogger(__name__)
16class _ModulePlaceholder:
17 """
18 Placeholder for missing optional modules that allows attribute access
19 for type annotations while still being falsy and failing on actual use.
20 """
21 def __init__(self, module_name: str):
22 self._module_name = module_name
24 def __bool__(self):
25 return False
27 def __getattr__(self, name):
28 # Return another placeholder for chained attribute access
29 # This allows things like cp.ndarray in type annotations to work
30 return _ModulePlaceholder(f"{self._module_name}.{name}")
32 def __call__(self, *args, **kwargs):
33 # If someone tries to actually call a function, fail loudly
34 raise ImportError(f"Module '{self._module_name}' is not available. Please install the required dependency.")
36 def __repr__(self):
37 return f"<ModulePlaceholder for '{self._module_name}'>"
40def optional_import(module_name: str) -> Optional[Any]:
41 """
42 Import a module if available, otherwise return a placeholder that handles
43 attribute access gracefully for type annotations but fails on actual use.
45 This function allows for graceful handling of optional dependencies.
46 It can be used to import libraries that may not be installed,
47 particularly GPU-related libraries like torch, tensorflow, and cupy.
49 Args:
50 module_name: Name of the module to import
52 Returns:
53 The imported module if available, a placeholder otherwise
55 Example:
56 ```python
57 # Import torch if available
58 torch = optional_import("torch")
60 # Check if torch is available before using it
61 if torch:
62 # Use torch
63 tensor = torch.tensor([1, 2, 3])
64 else:
65 # Handle the case where torch is not available
66 raise ImportError("PyTorch is required for this function")
67 ```
68 """
69 try:
70 # Use importlib.import_module which handles dotted names properly
71 import importlib
72 return importlib.import_module(module_name)
73 except (ImportError, ModuleNotFoundError, AttributeError):
74 # Return a placeholder that handles attribute access gracefully
75 return _ModulePlaceholder(module_name)
77# Global thread activity tracking
78thread_activity = defaultdict(list)
79active_threads = set()
80thread_lock = threading.Lock()
82def get_thread_activity() -> Dict[int, List[Dict[str, Any]]]:
83 """
84 Get the current thread activity data.
86 Returns:
87 Dict mapping thread IDs to lists of activity records
88 """
89 return thread_activity
91def get_active_threads() -> set:
92 """
93 Get the set of currently active thread IDs.
95 Returns:
96 Set of active thread IDs
97 """
98 return active_threads
100def clear_thread_activity():
101 """Clear all thread activity data."""
102 with thread_lock:
103 thread_activity.clear()
104 active_threads.clear()
106def track_thread_activity(func: Optional[Callable] = None, *, log_level: str = "info"):
107 """
108 Decorator to track thread activity for a function.
110 Args:
111 func: The function to decorate
112 log_level: Logging level to use ("debug", "info", "warning", "error")
114 Returns:
115 Decorated function that tracks thread activity
116 """
117 def decorator(f):
118 @functools.wraps(f)
119 def wrapper(*args, **kwargs):
120 # Get thread information
121 thread_id = threading.get_ident()
122 thread_name = threading.current_thread().name
124 # Record thread start time
125 start_time = time.time()
127 # Extract function name and arguments for context
128 func_name = f.__name__
129 # Get the first argument if it's a method (self or cls)
130 context = ""
131 if args and hasattr(args[0], "__class__"):
132 if hasattr(args[0].__class__, func_name):
133 # It's likely a method, extract class name
134 context = f"{args[0].__class__.__name__}."
136 # Extract well information if present in kwargs or args
137 well = kwargs.get('well', None)
138 if well is None and len(args) > 1 and isinstance(args[1], str):
139 # Assume second argument might be well in methods like process_well(self, well, ...)
140 well = args[1]
142 # Add this thread to active threads
143 with thread_lock:
144 active_threads.add(thread_id)
145 # Record the number of active threads at this moment
146 thread_activity[thread_id].append({
147 'well': well,
148 'thread_name': thread_name,
149 'time': time.time(),
150 'action': 'start',
151 'function': f"{context}{func_name}",
152 'active_threads': len(active_threads)
153 })
155 # Log the start of the function
156 log_func = getattr(logger, log_level.lower())
157 log_func(f"Thread {thread_name} (ID: {thread_id}) started {context}{func_name} for well {well}")
158 log_func(f"Active threads: {len(active_threads)}")
160 try:
161 # Call the original function
162 result = f(*args, **kwargs)
163 return result
164 finally:
165 # Record thread end time
166 end_time = time.time()
167 duration = end_time - start_time
169 # Remove this thread from active threads
170 with thread_lock:
171 active_threads.remove(thread_id)
172 # Record the number of active threads at this moment
173 thread_activity[thread_id].append({
174 'well': well,
175 'thread_name': thread_name,
176 'time': time.time(),
177 'action': 'end',
178 'function': f"{context}{func_name}",
179 'duration': duration,
180 'active_threads': len(active_threads)
181 })
183 log_func(f"Thread {thread_name} (ID: {thread_id}) finished {context}{func_name} for well {well} in {duration:.2f} seconds")
184 log_func(f"Active threads: {len(active_threads)}")
186 return wrapper
188 # Handle both @track_thread_activity and @track_thread_activity(log_level="debug")
189 if func is None:
190 return decorator
191 return decorator(func)
193def analyze_thread_activity():
194 """
195 Analyze thread activity data and return a report.
197 Returns:
198 Dict containing analysis results
199 """
200 max_concurrent = 0
201 thread_starts = []
202 thread_ends = []
204 for thread_id, activities in thread_activity.items():
205 for activity in activities:
206 max_concurrent = max(max_concurrent, activity['active_threads'])
207 if activity['action'] == 'start':
208 thread_starts.append((
209 activity.get('well'),
210 activity['thread_name'],
211 activity['time'],
212 activity.get('function', '')
213 ))
214 else: # 'end'
215 thread_ends.append((
216 activity.get('well'),
217 activity['thread_name'],
218 activity['time'],
219 activity.get('duration', 0),
220 activity.get('function', '')
221 ))
223 # Sort by time
224 thread_starts.sort(key=lambda x: x[2])
225 thread_ends.sort(key=lambda x: x[2])
227 # Find overlapping time periods
228 overlaps = []
229 for i, (well1, thread1, start1, func1) in enumerate(thread_starts):
230 # Find the end time for this thread
231 end1 = None
232 for w, t, end, d, f in thread_ends:
233 if t == thread1 and w == well1 and f == func1:
234 end1 = end
235 break
237 if end1 is None:
238 continue # Skip if we can't find the end time
240 # Check for overlaps with other threads
241 for j, (well2, thread2, start2, func2) in enumerate(thread_starts):
242 if i == j or thread1 == thread2: # Skip same thread
243 continue
245 # Find the end time for the other thread
246 end2 = None
247 for w, t, end, d, f in thread_ends:
248 if t == thread2 and w == well2 and f == func2:
249 end2 = end
250 break
252 if end2 is None:
253 continue # Skip if we can't find the end time
255 # Check if there's an overlap
256 if start1 < end2 and start2 < end1:
257 overlap_start = max(start1, start2)
258 overlap_end = min(end1, end2)
259 overlap_duration = overlap_end - overlap_start
261 if overlap_duration > 0:
262 overlaps.append({
263 'thread1': thread1,
264 'well1': well1,
265 'function1': func1,
266 'thread2': thread2,
267 'well2': well2,
268 'function2': func2,
269 'duration': overlap_duration
270 })
272 return {
273 'max_concurrent': max_concurrent,
274 'thread_starts': thread_starts,
275 'thread_ends': thread_ends,
276 'overlaps': overlaps
277 }
279def print_thread_activity_report():
280 """Print a detailed report of thread activity."""
281 analysis = analyze_thread_activity()
283 print("\n" + "=" * 80)
284 print("Thread Activity Report")
285 print("=" * 80)
287 print("\nThread Start Events:")
288 for well, thread_name, time_val, func in analysis['thread_starts']:
289 print(f"Thread {thread_name} started {func} for well {well} at {time_val:.2f}")
291 print("\nThread End Events:")
292 for well, thread_name, time_val, duration, func in analysis['thread_ends']:
293 print(f"Thread {thread_name} finished {func} for well {well} at {time_val:.2f} (duration: {duration:.2f}s)")
295 print("\nOverlap Analysis:")
296 for overlap in analysis['overlaps']:
297 print(f"Threads {overlap['thread1']} and {overlap['thread2']} overlapped for {overlap['duration']:.2f}s")
298 print(f" {overlap['thread1']} was processing {overlap['function1']} for well {overlap['well1']}")
299 print(f" {overlap['thread2']} was processing {overlap['function2']} for well {overlap['well2']}")
301 print(f"\nFound {len(analysis['overlaps'])} thread overlaps")
302 print(f"Maximum concurrent threads: {analysis['max_concurrent']}")
303 print("=" * 80)
305 return analysis
308# Natural sorting utilities
309def natural_sort_key(text: Union[str, Path]) -> List[Union[str, int]]:
310 """
311 Generate a natural sorting key for a string or Path.
313 This function converts a string into a list of strings and integers
314 that can be used as a sorting key to achieve natural (human-friendly)
315 sorting order.
317 Args:
318 text: String or Path to generate sorting key for
320 Returns:
321 List of strings and integers for natural sorting
323 Examples:
324 >>> natural_sort_key("file10.txt")
325 ['file', 10, '.txt']
326 >>> natural_sort_key("A01_s001_w1_z001.tif")
327 ['A', 1, '_s', 1, '_w', 1, '_z', 1, '.tif']
328 """
329 text = str(text)
331 # Split on sequences of digits, keeping the digits
332 parts = re.split(r'(\d+)', text)
334 # Convert digit sequences to integers, leave other parts as strings
335 result = []
336 for part in parts:
337 if part.isdigit():
338 result.append(int(part))
339 else:
340 result.append(part)
342 return result
345def natural_sort(items: List[Union[str, Path]]) -> List[Union[str, Path]]:
346 """
347 Sort a list of strings or Paths using natural sorting.
349 Args:
350 items: List of strings or Paths to sort
352 Returns:
353 New list sorted in natural order
355 Examples:
356 >>> natural_sort(["file1.txt", "file10.txt", "file2.txt"])
357 ['file1.txt', 'file2.txt', 'file10.txt']
358 >>> natural_sort(["A01_s001.tif", "A01_s010.tif", "A01_s002.tif"])
359 ['A01_s001.tif', 'A01_s002.tif', 'A01_s010.tif']
360 """
361 return sorted(items, key=natural_sort_key)
364def natural_sort_inplace(items: List[Union[str, Path]]) -> None:
365 """
366 Sort a list of strings or Paths using natural sorting in-place.
368 Args:
369 items: List of strings or Paths to sort in-place
371 Examples:
372 >>> files = ["file1.txt", "file10.txt", "file2.txt"]
373 >>> natural_sort_inplace(files)
374 >>> files
375 ['file1.txt', 'file2.txt', 'file10.txt']
376 """
377 items.sort(key=natural_sort_key)
380# === WELL FILTERING UTILITIES ===
382from typing import List, Set, Union
383from openhcs.core.config import WellFilterMode
386class WellPatternConstants:
387 """Centralized constants for well pattern parsing."""
388 COMMA_SEPARATOR = ","
389 RANGE_SEPARATOR = ":"
390 ROW_PREFIX = "row:"
391 COL_PREFIX = "col:"
394class WellFilterProcessor:
395 """
396 Enhanced well filtering processor supporting both compilation-time and execution-time filtering.
398 Maintains backward compatibility with existing execution-time methods while adding
399 compilation-time capabilities for the 5-phase compilation system.
401 Follows systematic refactoring framework principles:
402 - Fail-loud validation with clear error messages
403 - Pythonic patterns and idioms
404 - Leverages existing well filtering infrastructure
405 - Eliminates magic strings through centralized constants
406 """
408 # === NEW COMPILATION-TIME METHOD ===
410 @staticmethod
411 def resolve_compilation_filter(
412 well_filter: Union[List[str], str, int],
413 available_wells: List[str]
414 ) -> Set[str]:
415 """
416 Resolve well filter to concrete well set during compilation.
418 Combines validation and resolution in single method to avoid verbose helper methods.
419 Supports all existing filter types while providing compilation-time optimization.
420 Works with any well naming format (A01, R01C03, etc.) by using available wells.
422 Args:
423 well_filter: Filter specification (list, string pattern, or max count)
424 available_wells: Ordered list of wells from orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
426 Returns:
427 Set of well IDs that match the filter
429 Raises:
430 ValueError: If wells don't exist, insufficient wells for count, or invalid patterns
431 """
432 if isinstance(well_filter, list): 432 ↛ 434line 432 didn't jump to line 434 because the condition on line 432 was never true
433 # Inline validation for specific wells
434 available_set = set(available_wells)
435 invalid_wells = [w for w in well_filter if w not in available_set]
436 if invalid_wells:
437 raise ValueError(
438 f"Invalid wells specified: {invalid_wells}. "
439 f"Available wells: {sorted(available_set)}"
440 )
441 return set(well_filter)
443 elif isinstance(well_filter, int): 443 ↛ 453line 443 didn't jump to line 453 because the condition on line 443 was always true
444 # Inline validation for max count
445 if well_filter <= 0: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true
446 raise ValueError(f"Max count must be positive, got: {well_filter}")
447 if well_filter > len(available_wells): 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true
448 raise ValueError(
449 f"Requested {well_filter} wells but only {len(available_wells)} available"
450 )
451 return set(available_wells[:well_filter])
453 elif isinstance(well_filter, str):
454 # Check if string is a numeric value (common UI input issue)
455 if well_filter.strip().isdigit():
456 # Convert numeric string to integer and process as max count
457 numeric_value = int(well_filter.strip())
458 if numeric_value <= 0:
459 raise ValueError(f"Max count must be positive, got: {numeric_value}")
460 if numeric_value > len(available_wells):
461 raise ValueError(
462 f"Requested {numeric_value} wells but only {len(available_wells)} available"
463 )
464 return set(available_wells[:numeric_value])
466 else:
467 # Non-numeric string - pass to pattern parsing for format-agnostic support
468 return WellFilterProcessor._parse_well_pattern(well_filter, available_wells)
470 else:
471 raise ValueError(f"Unsupported well filter type: {type(well_filter)}")
473 # === EXISTING EXECUTION-TIME METHODS (MAINTAINED) ===
475 @staticmethod
476 def should_materialize_well(
477 well_id: str,
478 config, # MaterializationPathConfig
479 processed_wells: Set[str]
480 ) -> bool:
481 """
482 EXISTING METHOD: Determine if a well should be materialized during execution.
483 Maintained for backward compatibility and execution-time fallback.
484 """
485 if config.well_filter is None:
486 return True # No filter = materialize all wells
488 # Expand filter pattern to well list
489 target_wells = WellFilterProcessor.expand_well_filter(config.well_filter)
491 # Apply max wells limit if filter is integer
492 if isinstance(config.well_filter, int):
493 if len(processed_wells) >= config.well_filter:
494 return False
496 # Check if well matches filter
497 well_in_filter = well_id in target_wells
499 # Apply include/exclude mode
500 if config.well_filter_mode == WellFilterMode.INCLUDE:
501 return well_in_filter
502 else: # EXCLUDE mode
503 return not well_in_filter
505 @staticmethod
506 def expand_well_filter(well_filter: Union[List[str], str, int]) -> Set[str]:
507 """
508 EXISTING METHOD: Expand well filter pattern to set of well IDs.
509 Maintained for backward compatibility.
510 """
511 if isinstance(well_filter, list):
512 return set(well_filter)
514 if isinstance(well_filter, int):
515 # For integer filters, we can't pre-expand wells since it depends on processing order
516 # Return empty set - the max wells logic is handled in should_materialize_well
517 return set()
519 if isinstance(well_filter, str):
520 return WellFilterProcessor._parse_well_pattern(well_filter, available_wells)
522 raise ValueError(f"Unsupported well filter type: {type(well_filter)}")
524 @staticmethod
525 def _parse_well_pattern(pattern: str, available_wells: List[str]) -> Set[str]:
526 """Parse string well patterns into well ID sets using available wells."""
527 pattern = pattern.strip()
529 # Comma-separated list
530 if WellPatternConstants.COMMA_SEPARATOR in pattern:
531 return set(w.strip() for w in pattern.split(WellPatternConstants.COMMA_SEPARATOR))
533 # Row pattern: "row:A"
534 if pattern.startswith(WellPatternConstants.ROW_PREFIX):
535 row = pattern[len(WellPatternConstants.ROW_PREFIX):].strip()
536 return WellFilterProcessor._expand_row_pattern(row, available_wells)
538 # Column pattern: "col:01-06"
539 if pattern.startswith(WellPatternConstants.COL_PREFIX):
540 col_spec = pattern[len(WellPatternConstants.COL_PREFIX):].strip()
541 return WellFilterProcessor._expand_col_pattern(col_spec, available_wells)
543 # Range pattern: "A01:A12"
544 if WellPatternConstants.RANGE_SEPARATOR in pattern:
545 return WellFilterProcessor._expand_range_pattern(pattern, available_wells)
547 # Single well
548 return {pattern}
550 @staticmethod
551 def _expand_row_pattern(row: str, available_wells: List[str]) -> Set[str]:
552 """Expand row pattern using available wells (format-agnostic)."""
553 # Direct prefix match (A01, B02, etc.)
554 result = {well for well in available_wells if well.startswith(row)}
556 # Opera Phenix format fallback (A → R01C*, B → R02C*)
557 if not result and len(row) == 1 and row.isalpha():
558 row_pattern = f"R{ord(row.upper()) - ord('A') + 1:02d}C"
559 result = {well for well in available_wells if well.startswith(row_pattern)}
561 return result
563 @staticmethod
564 def _expand_col_pattern(col_spec: str, available_wells: List[str]) -> Set[str]:
565 """Expand column pattern using available wells (format-agnostic)."""
566 # Parse column range
567 if "-" in col_spec:
568 start_col, end_col = map(int, col_spec.split("-"))
569 col_range = set(range(start_col, end_col + 1))
570 else:
571 col_range = {int(col_spec)}
573 # Extract numeric suffix and match (A01, B02, etc.)
574 def get_numeric_suffix(well: str) -> int:
575 digits = ''.join(char for char in reversed(well) if char.isdigit())
576 return int(digits[::-1]) if digits else 0
578 result = {well for well in available_wells if get_numeric_suffix(well) in col_range}
580 # Opera Phenix format fallback (C01, C02, etc.)
581 if not result:
582 patterns = {f"C{col:02d}" for col in col_range}
583 result = {well for well in available_wells
584 if any(pattern in well for pattern in patterns)}
586 return result
588 @staticmethod
589 def _expand_range_pattern(pattern: str, available_wells: List[str]) -> Set[str]:
590 """Expand range pattern using available wells (format-agnostic)."""
591 start_well, end_well = map(str.strip, pattern.split(WellPatternConstants.RANGE_SEPARATOR))
593 try:
594 start_idx, end_idx = available_wells.index(start_well), available_wells.index(end_well)
595 except ValueError as e:
596 raise ValueError(f"Range pattern '{pattern}' contains wells not in available wells: {e}")
598 # Ensure proper order and return range (inclusive)
599 start_idx, end_idx = sorted([start_idx, end_idx])
600 return set(available_wells[start_idx:end_idx + 1])