Coverage for openhcs/core/utils.py: 25.7%

233 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Utility functions for the OpenHCS package. 

3""" 

4 

5import functools 

6import logging 

7import re 

8import threading 

9import time 

10from collections import defaultdict 

11from pathlib import Path 

12from typing import Any, Callable, Dict, List, Optional, Union 

13 

14logger = logging.getLogger(__name__) 

15 

16class _ModulePlaceholder: 

17 """ 

18 Placeholder for missing optional modules that allows attribute access 

19 for type annotations while still being falsy and failing on actual use. 

20 """ 

21 def __init__(self, module_name: str): 

22 self._module_name = module_name 

23 

24 def __bool__(self): 

25 return False 

26 

27 def __getattr__(self, name): 

28 # Return another placeholder for chained attribute access 

29 # This allows things like cp.ndarray in type annotations to work 

30 return _ModulePlaceholder(f"{self._module_name}.{name}") 

31 

32 def __call__(self, *args, **kwargs): 

33 # If someone tries to actually call a function, fail loudly 

34 raise ImportError(f"Module '{self._module_name}' is not available. Please install the required dependency.") 

35 

36 def __repr__(self): 

37 return f"<ModulePlaceholder for '{self._module_name}'>" 

38 

39 

40def optional_import(module_name: str) -> Optional[Any]: 

41 """ 

42 Import a module if available, otherwise return a placeholder that handles 

43 attribute access gracefully for type annotations but fails on actual use. 

44 

45 This function allows for graceful handling of optional dependencies. 

46 It can be used to import libraries that may not be installed, 

47 particularly GPU-related libraries like torch, tensorflow, and cupy. 

48 

49 Args: 

50 module_name: Name of the module to import 

51 

52 Returns: 

53 The imported module if available, a placeholder otherwise 

54 

55 Example: 

56 ```python 

57 # Import torch if available 

58 torch = optional_import("torch") 

59 

60 # Check if torch is available before using it 

61 if torch: 

62 # Use torch 

63 tensor = torch.tensor([1, 2, 3]) 

64 else: 

65 # Handle the case where torch is not available 

66 raise ImportError("PyTorch is required for this function") 

67 ``` 

68 """ 

69 try: 

70 # Use importlib.import_module which handles dotted names properly 

71 import importlib 

72 return importlib.import_module(module_name) 

73 except (ImportError, ModuleNotFoundError, AttributeError): 

74 # Return a placeholder that handles attribute access gracefully 

75 return _ModulePlaceholder(module_name) 

76 

77# Global thread activity tracking 

78thread_activity = defaultdict(list) 

79active_threads = set() 

80thread_lock = threading.Lock() 

81 

82def get_thread_activity() -> Dict[int, List[Dict[str, Any]]]: 

83 """ 

84 Get the current thread activity data. 

85 

86 Returns: 

87 Dict mapping thread IDs to lists of activity records 

88 """ 

89 return thread_activity 

90 

91def get_active_threads() -> set: 

92 """ 

93 Get the set of currently active thread IDs. 

94 

95 Returns: 

96 Set of active thread IDs 

97 """ 

98 return active_threads 

99 

100def clear_thread_activity(): 

101 """Clear all thread activity data.""" 

102 with thread_lock: 

103 thread_activity.clear() 

104 active_threads.clear() 

105 

106def track_thread_activity(func: Optional[Callable] = None, *, log_level: str = "info"): 

107 """ 

108 Decorator to track thread activity for a function. 

109 

110 Args: 

111 func: The function to decorate 

112 log_level: Logging level to use ("debug", "info", "warning", "error") 

113 

114 Returns: 

115 Decorated function that tracks thread activity 

116 """ 

117 def decorator(f): 

118 @functools.wraps(f) 

119 def wrapper(*args, **kwargs): 

120 # Get thread information 

121 thread_id = threading.get_ident() 

122 thread_name = threading.current_thread().name 

123 

124 # Record thread start time 

125 start_time = time.time() 

126 

127 # Extract function name and arguments for context 

128 func_name = f.__name__ 

129 # Get the first argument if it's a method (self or cls) 

130 context = "" 

131 if args and hasattr(args[0], "__class__"): 

132 if hasattr(args[0].__class__, func_name): 

133 # It's likely a method, extract class name 

134 context = f"{args[0].__class__.__name__}." 

135 

136 # Extract well information if present in kwargs or args 

137 well = kwargs.get('well', None) 

138 if well is None and len(args) > 1 and isinstance(args[1], str): 

139 # Assume second argument might be well in methods like process_well(self, well, ...) 

140 well = args[1] 

141 

142 # Add this thread to active threads 

143 with thread_lock: 

144 active_threads.add(thread_id) 

145 # Record the number of active threads at this moment 

146 thread_activity[thread_id].append({ 

147 'well': well, 

148 'thread_name': thread_name, 

149 'time': time.time(), 

150 'action': 'start', 

151 'function': f"{context}{func_name}", 

152 'active_threads': len(active_threads) 

153 }) 

154 

155 # Log the start of the function 

156 log_func = getattr(logger, log_level.lower()) 

157 log_func(f"Thread {thread_name} (ID: {thread_id}) started {context}{func_name} for well {well}") 

158 log_func(f"Active threads: {len(active_threads)}") 

159 

160 try: 

161 # Call the original function 

162 result = f(*args, **kwargs) 

163 return result 

164 finally: 

165 # Record thread end time 

166 end_time = time.time() 

167 duration = end_time - start_time 

168 

169 # Remove this thread from active threads 

170 with thread_lock: 

171 active_threads.remove(thread_id) 

172 # Record the number of active threads at this moment 

173 thread_activity[thread_id].append({ 

174 'well': well, 

175 'thread_name': thread_name, 

176 'time': time.time(), 

177 'action': 'end', 

178 'function': f"{context}{func_name}", 

179 'duration': duration, 

180 'active_threads': len(active_threads) 

181 }) 

182 

183 log_func(f"Thread {thread_name} (ID: {thread_id}) finished {context}{func_name} for well {well} in {duration:.2f} seconds") 

184 log_func(f"Active threads: {len(active_threads)}") 

185 

186 return wrapper 

187 

188 # Handle both @track_thread_activity and @track_thread_activity(log_level="debug") 

189 if func is None: 

190 return decorator 

191 return decorator(func) 

192 

193def analyze_thread_activity(): 

194 """ 

195 Analyze thread activity data and return a report. 

196 

197 Returns: 

198 Dict containing analysis results 

199 """ 

200 max_concurrent = 0 

201 thread_starts = [] 

202 thread_ends = [] 

203 

204 for thread_id, activities in thread_activity.items(): 

205 for activity in activities: 

206 max_concurrent = max(max_concurrent, activity['active_threads']) 

207 if activity['action'] == 'start': 

208 thread_starts.append(( 

209 activity.get('well'), 

210 activity['thread_name'], 

211 activity['time'], 

212 activity.get('function', '') 

213 )) 

214 else: # 'end' 

215 thread_ends.append(( 

216 activity.get('well'), 

217 activity['thread_name'], 

218 activity['time'], 

219 activity.get('duration', 0), 

220 activity.get('function', '') 

221 )) 

222 

223 # Sort by time 

224 thread_starts.sort(key=lambda x: x[2]) 

225 thread_ends.sort(key=lambda x: x[2]) 

226 

227 # Find overlapping time periods 

228 overlaps = [] 

229 for i, (well1, thread1, start1, func1) in enumerate(thread_starts): 

230 # Find the end time for this thread 

231 end1 = None 

232 for w, t, end, d, f in thread_ends: 

233 if t == thread1 and w == well1 and f == func1: 

234 end1 = end 

235 break 

236 

237 if end1 is None: 

238 continue # Skip if we can't find the end time 

239 

240 # Check for overlaps with other threads 

241 for j, (well2, thread2, start2, func2) in enumerate(thread_starts): 

242 if i == j or thread1 == thread2: # Skip same thread 

243 continue 

244 

245 # Find the end time for the other thread 

246 end2 = None 

247 for w, t, end, d, f in thread_ends: 

248 if t == thread2 and w == well2 and f == func2: 

249 end2 = end 

250 break 

251 

252 if end2 is None: 

253 continue # Skip if we can't find the end time 

254 

255 # Check if there's an overlap 

256 if start1 < end2 and start2 < end1: 

257 overlap_start = max(start1, start2) 

258 overlap_end = min(end1, end2) 

259 overlap_duration = overlap_end - overlap_start 

260 

261 if overlap_duration > 0: 

262 overlaps.append({ 

263 'thread1': thread1, 

264 'well1': well1, 

265 'function1': func1, 

266 'thread2': thread2, 

267 'well2': well2, 

268 'function2': func2, 

269 'duration': overlap_duration 

270 }) 

271 

272 return { 

273 'max_concurrent': max_concurrent, 

274 'thread_starts': thread_starts, 

275 'thread_ends': thread_ends, 

276 'overlaps': overlaps 

277 } 

278 

279def print_thread_activity_report(): 

280 """Print a detailed report of thread activity.""" 

281 analysis = analyze_thread_activity() 

282 

283 print("\n" + "=" * 80) 

284 print("Thread Activity Report") 

285 print("=" * 80) 

286 

287 print("\nThread Start Events:") 

288 for well, thread_name, time_val, func in analysis['thread_starts']: 

289 print(f"Thread {thread_name} started {func} for well {well} at {time_val:.2f}") 

290 

291 print("\nThread End Events:") 

292 for well, thread_name, time_val, duration, func in analysis['thread_ends']: 

293 print(f"Thread {thread_name} finished {func} for well {well} at {time_val:.2f} (duration: {duration:.2f}s)") 

294 

295 print("\nOverlap Analysis:") 

296 for overlap in analysis['overlaps']: 

297 print(f"Threads {overlap['thread1']} and {overlap['thread2']} overlapped for {overlap['duration']:.2f}s") 

298 print(f" {overlap['thread1']} was processing {overlap['function1']} for well {overlap['well1']}") 

299 print(f" {overlap['thread2']} was processing {overlap['function2']} for well {overlap['well2']}") 

300 

301 print(f"\nFound {len(analysis['overlaps'])} thread overlaps") 

302 print(f"Maximum concurrent threads: {analysis['max_concurrent']}") 

303 print("=" * 80) 

304 

305 return analysis 

306 

307 

308# Natural sorting utilities 

309def natural_sort_key(text: Union[str, Path]) -> List[Union[str, int]]: 

310 """ 

311 Generate a natural sorting key for a string or Path. 

312 

313 This function converts a string into a list of strings and integers 

314 that can be used as a sorting key to achieve natural (human-friendly) 

315 sorting order. 

316 

317 Args: 

318 text: String or Path to generate sorting key for 

319 

320 Returns: 

321 List of strings and integers for natural sorting 

322 

323 Examples: 

324 >>> natural_sort_key("file10.txt") 

325 ['file', 10, '.txt'] 

326 >>> natural_sort_key("A01_s001_w1_z001.tif") 

327 ['A', 1, '_s', 1, '_w', 1, '_z', 1, '.tif'] 

328 """ 

329 text = str(text) 

330 

331 # Split on sequences of digits, keeping the digits 

332 parts = re.split(r'(\d+)', text) 

333 

334 # Convert digit sequences to integers, leave other parts as strings 

335 result = [] 

336 for part in parts: 

337 if part.isdigit(): 

338 result.append(int(part)) 

339 else: 

340 result.append(part) 

341 

342 return result 

343 

344 

345def natural_sort(items: List[Union[str, Path]]) -> List[Union[str, Path]]: 

346 """ 

347 Sort a list of strings or Paths using natural sorting. 

348 

349 Args: 

350 items: List of strings or Paths to sort 

351 

352 Returns: 

353 New list sorted in natural order 

354 

355 Examples: 

356 >>> natural_sort(["file1.txt", "file10.txt", "file2.txt"]) 

357 ['file1.txt', 'file2.txt', 'file10.txt'] 

358 >>> natural_sort(["A01_s001.tif", "A01_s010.tif", "A01_s002.tif"]) 

359 ['A01_s001.tif', 'A01_s002.tif', 'A01_s010.tif'] 

360 """ 

361 return sorted(items, key=natural_sort_key) 

362 

363 

364def natural_sort_inplace(items: List[Union[str, Path]]) -> None: 

365 """ 

366 Sort a list of strings or Paths using natural sorting in-place. 

367 

368 Args: 

369 items: List of strings or Paths to sort in-place 

370 

371 Examples: 

372 >>> files = ["file1.txt", "file10.txt", "file2.txt"] 

373 >>> natural_sort_inplace(files) 

374 >>> files 

375 ['file1.txt', 'file2.txt', 'file10.txt'] 

376 """ 

377 items.sort(key=natural_sort_key) 

378 

379 

380# === WELL FILTERING UTILITIES === 

381 

382import re 

383import string 

384from typing import List, Set, Union 

385from openhcs.core.config import WellFilterMode 

386 

387 

388class WellPatternConstants: 

389 """Centralized constants for well pattern parsing.""" 

390 COMMA_SEPARATOR = "," 

391 RANGE_SEPARATOR = ":" 

392 ROW_PREFIX = "row:" 

393 COL_PREFIX = "col:" 

394 

395 

396class WellFilterProcessor: 

397 """ 

398 Enhanced well filtering processor supporting both compilation-time and execution-time filtering. 

399 

400 Maintains backward compatibility with existing execution-time methods while adding 

401 compilation-time capabilities for the 5-phase compilation system. 

402 

403 Follows systematic refactoring framework principles: 

404 - Fail-loud validation with clear error messages 

405 - Pythonic patterns and idioms 

406 - Leverages existing well filtering infrastructure 

407 - Eliminates magic strings through centralized constants 

408 """ 

409 

410 # === NEW COMPILATION-TIME METHOD === 

411 

412 @staticmethod 

413 def resolve_compilation_filter( 

414 well_filter: Union[List[str], str, int], 

415 available_wells: List[str] 

416 ) -> Set[str]: 

417 """ 

418 Resolve well filter to concrete well set during compilation. 

419 

420 Combines validation and resolution in single method to avoid verbose helper methods. 

421 Supports all existing filter types while providing compilation-time optimization. 

422 Works with any well naming format (A01, R01C03, etc.) by using available wells. 

423 

424 Args: 

425 well_filter: Filter specification (list, string pattern, or max count) 

426 available_wells: Ordered list of wells from orchestrator.get_component_keys(GroupBy.WELL) 

427 

428 Returns: 

429 Set of well IDs that match the filter 

430 

431 Raises: 

432 ValueError: If wells don't exist, insufficient wells for count, or invalid patterns 

433 """ 

434 if isinstance(well_filter, list): 434 ↛ 436line 434 didn't jump to line 436 because the condition on line 434 was never true

435 # Inline validation for specific wells 

436 available_set = set(available_wells) 

437 invalid_wells = [w for w in well_filter if w not in available_set] 

438 if invalid_wells: 

439 raise ValueError( 

440 f"Invalid wells specified: {invalid_wells}. " 

441 f"Available wells: {sorted(available_set)}" 

442 ) 

443 return set(well_filter) 

444 

445 elif isinstance(well_filter, int): 445 ↛ 455line 445 didn't jump to line 455 because the condition on line 445 was always true

446 # Inline validation for max count 

447 if well_filter <= 0: 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true

448 raise ValueError(f"Max count must be positive, got: {well_filter}") 

449 if well_filter > len(available_wells): 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true

450 raise ValueError( 

451 f"Requested {well_filter} wells but only {len(available_wells)} available" 

452 ) 

453 return set(available_wells[:well_filter]) 

454 

455 elif isinstance(well_filter, str): 

456 # Pass available wells to pattern parsing for format-agnostic support 

457 return WellFilterProcessor._parse_well_pattern(well_filter, available_wells) 

458 

459 else: 

460 raise ValueError(f"Unsupported well filter type: {type(well_filter)}") 

461 

462 # === EXISTING EXECUTION-TIME METHODS (MAINTAINED) === 

463 

464 @staticmethod 

465 def should_materialize_well( 

466 well_id: str, 

467 config, # MaterializationPathConfig 

468 processed_wells: Set[str] 

469 ) -> bool: 

470 """ 

471 EXISTING METHOD: Determine if a well should be materialized during execution. 

472 Maintained for backward compatibility and execution-time fallback. 

473 """ 

474 if config.well_filter is None: 

475 return True # No filter = materialize all wells 

476 

477 # Expand filter pattern to well list 

478 target_wells = WellFilterProcessor.expand_well_filter(config.well_filter) 

479 

480 # Apply max wells limit if filter is integer 

481 if isinstance(config.well_filter, int): 

482 if len(processed_wells) >= config.well_filter: 

483 return False 

484 

485 # Check if well matches filter 

486 well_in_filter = well_id in target_wells 

487 

488 # Apply include/exclude mode 

489 if config.well_filter_mode == WellFilterMode.INCLUDE: 

490 return well_in_filter 

491 else: # EXCLUDE mode 

492 return not well_in_filter 

493 

494 @staticmethod 

495 def expand_well_filter(well_filter: Union[List[str], str, int]) -> Set[str]: 

496 """ 

497 EXISTING METHOD: Expand well filter pattern to set of well IDs. 

498 Maintained for backward compatibility. 

499 """ 

500 if isinstance(well_filter, list): 

501 return set(well_filter) 

502 

503 if isinstance(well_filter, int): 

504 # For integer filters, we can't pre-expand wells since it depends on processing order 

505 # Return empty set - the max wells logic is handled in should_materialize_well 

506 return set() 

507 

508 if isinstance(well_filter, str): 

509 return WellFilterProcessor._parse_well_pattern(well_filter, available_wells) 

510 

511 raise ValueError(f"Unsupported well filter type: {type(well_filter)}") 

512 

513 @staticmethod 

514 def _parse_well_pattern(pattern: str, available_wells: List[str]) -> Set[str]: 

515 """Parse string well patterns into well ID sets using available wells.""" 

516 pattern = pattern.strip() 

517 

518 # Comma-separated list 

519 if WellPatternConstants.COMMA_SEPARATOR in pattern: 

520 return set(w.strip() for w in pattern.split(WellPatternConstants.COMMA_SEPARATOR)) 

521 

522 # Row pattern: "row:A" 

523 if pattern.startswith(WellPatternConstants.ROW_PREFIX): 

524 row = pattern[len(WellPatternConstants.ROW_PREFIX):].strip() 

525 return WellFilterProcessor._expand_row_pattern(row, available_wells) 

526 

527 # Column pattern: "col:01-06" 

528 if pattern.startswith(WellPatternConstants.COL_PREFIX): 

529 col_spec = pattern[len(WellPatternConstants.COL_PREFIX):].strip() 

530 return WellFilterProcessor._expand_col_pattern(col_spec, available_wells) 

531 

532 # Range pattern: "A01:A12" 

533 if WellPatternConstants.RANGE_SEPARATOR in pattern: 

534 return WellFilterProcessor._expand_range_pattern(pattern, available_wells) 

535 

536 # Single well 

537 return {pattern} 

538 

539 @staticmethod 

540 def _expand_row_pattern(row: str, available_wells: List[str]) -> Set[str]: 

541 """Expand row pattern using available wells (format-agnostic).""" 

542 # Direct prefix match (A01, B02, etc.) 

543 result = {well for well in available_wells if well.startswith(row)} 

544 

545 # Opera Phenix format fallback (A → R01C*, B → R02C*) 

546 if not result and len(row) == 1 and row.isalpha(): 

547 row_pattern = f"R{ord(row.upper()) - ord('A') + 1:02d}C" 

548 result = {well for well in available_wells if well.startswith(row_pattern)} 

549 

550 return result 

551 

552 @staticmethod 

553 def _expand_col_pattern(col_spec: str, available_wells: List[str]) -> Set[str]: 

554 """Expand column pattern using available wells (format-agnostic).""" 

555 # Parse column range 

556 if "-" in col_spec: 

557 start_col, end_col = map(int, col_spec.split("-")) 

558 col_range = set(range(start_col, end_col + 1)) 

559 else: 

560 col_range = {int(col_spec)} 

561 

562 # Extract numeric suffix and match (A01, B02, etc.) 

563 def get_numeric_suffix(well: str) -> int: 

564 digits = ''.join(char for char in reversed(well) if char.isdigit()) 

565 return int(digits[::-1]) if digits else 0 

566 

567 result = {well for well in available_wells if get_numeric_suffix(well) in col_range} 

568 

569 # Opera Phenix format fallback (C01, C02, etc.) 

570 if not result: 

571 patterns = {f"C{col:02d}" for col in col_range} 

572 result = {well for well in available_wells 

573 if any(pattern in well for pattern in patterns)} 

574 

575 return result 

576 

577 @staticmethod 

578 def _expand_range_pattern(pattern: str, available_wells: List[str]) -> Set[str]: 

579 """Expand range pattern using available wells (format-agnostic).""" 

580 start_well, end_well = map(str.strip, pattern.split(WellPatternConstants.RANGE_SEPARATOR)) 

581 

582 try: 

583 start_idx, end_idx = available_wells.index(start_well), available_wells.index(end_well) 

584 except ValueError as e: 

585 raise ValueError(f"Range pattern '{pattern}' contains wells not in available wells: {e}") 

586 

587 # Ensure proper order and return range (inclusive) 

588 start_idx, end_idx = sorted([start_idx, end_idx]) 

589 return set(available_wells[start_idx:end_idx + 1])