Coverage for openhcs/core/utils.py: 23.9%

238 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Utility functions for the OpenHCS package. 

3""" 

4 

5import functools 

6import logging 

7import re 

8import threading 

9import time 

10from collections import defaultdict 

11from pathlib import Path 

12from typing import Any, Callable, Dict, List, Optional, Union 

13 

14logger = logging.getLogger(__name__) 

15 

class _ModulePlaceholder:
    """
    Placeholder for missing optional modules that allows attribute access
    for type annotations while still being falsy and failing on actual use.
    """
    def __init__(self, module_name: str):
        self._module_name = module_name

    def __bool__(self):
        return False

    def __getattr__(self, name):
        # Return another placeholder for chained attribute access.
        # This allows things like cp.ndarray in type annotations to work.
        return _ModulePlaceholder(f"{self._module_name}.{name}")

    def __call__(self, *args, **kwargs):
        # If someone tries to actually call a function, fail loudly.
        raise ImportError(f"Module '{self._module_name}' is not available. Please install the required dependency.")

    def __repr__(self):
        return f"<ModulePlaceholder for '{self._module_name}'>"

def optional_import(module_name: str) -> Optional[Any]:
    """
    Import a module if available, otherwise return a placeholder that handles
    attribute access gracefully for type annotations but fails on actual use.

    This function allows for graceful handling of optional dependencies.
    It can be used to import libraries that may not be installed,
    particularly GPU-related libraries like torch, tensorflow, and cupy.

    Args:
        module_name: Name of the module to import

    Returns:
        The imported module if available, a placeholder otherwise

    Example:
        ```python
        # Import torch if available
        torch = optional_import("torch")

        # Check if torch is available before using it
        if torch:
            # Use torch
            tensor = torch.tensor([1, 2, 3])
        else:
            # Handle the case where torch is not available
            raise ImportError("PyTorch is required for this function")
        ```
    """
    try:
        # Use importlib.import_module, which handles dotted names properly.
        import importlib
        return importlib.import_module(module_name)
    except (ImportError, ModuleNotFoundError, AttributeError):
        # Return a placeholder that handles attribute access gracefully.
        return _ModulePlaceholder(module_name)
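# Illustrative behavior of the placeholder path (a minimal sketch; "cupy" here
# stands in for any optional dependency that is not installed):
#
#     cp = optional_import("cupy")
#     bool(cp)            # False, so features can be gated on truthiness
#     cp.ndarray          # still usable in type annotations (nested placeholder)
#     cp.asarray([1, 2])  # raises ImportError with an install hint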

# Global thread activity tracking
thread_activity = defaultdict(list)
active_threads = set()
thread_lock = threading.Lock()

def get_thread_activity() -> Dict[int, List[Dict[str, Any]]]:
    """
    Get the current thread activity data.

    Returns:
        Dict mapping thread IDs to lists of activity records
    """
    return thread_activity

def get_active_threads() -> set:
    """
    Get the set of currently active thread IDs.

    Returns:
        Set of active thread IDs
    """
    return active_threads

def clear_thread_activity():
    """Clear all thread activity data."""
    with thread_lock:
        thread_activity.clear()
        active_threads.clear()
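# Typical inspection flow (a minimal sketch of how these module-level helpers
# fit together):
#
#     clear_thread_activity()           # reset tracking before a run
#     ...                               # run work decorated with track_thread_activity
#     activity = get_thread_activity()  # {thread_id: [event dicts]}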

def track_thread_activity(func: Optional[Callable] = None, *, log_level: str = "info"):
    """
    Decorator to track thread activity for a function.

    Args:
        func: The function to decorate
        log_level: Logging level to use ("debug", "info", "warning", "error")

    Returns:
        Decorated function that tracks thread activity
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # Get thread information
            thread_id = threading.get_ident()
            thread_name = threading.current_thread().name

            # Record thread start time
            start_time = time.time()

            # Extract function name and arguments for context
            func_name = f.__name__
            # Get the first argument if it's a method (self or cls)
            context = ""
            if args and hasattr(args[0], "__class__"):
                if hasattr(args[0].__class__, func_name):
                    # It's likely a method, so extract the class name
                    context = f"{args[0].__class__.__name__}."

            # Extract well information if present in kwargs or args
            well = kwargs.get('well', None)
            if well is None and len(args) > 1 and isinstance(args[1], str):
                # Assume the second argument might be the well, as in methods
                # like process_well(self, well, ...)
                well = args[1]

            # Add this thread to the active threads
            with thread_lock:
                active_threads.add(thread_id)
                # Record the number of active threads at this moment
                thread_activity[thread_id].append({
                    'well': well,
                    'thread_name': thread_name,
                    'time': time.time(),
                    'action': 'start',
                    'function': f"{context}{func_name}",
                    'active_threads': len(active_threads)
                })

            # Log the start of the function
            log_func = getattr(logger, log_level.lower())
            log_func(f"Thread {thread_name} (ID: {thread_id}) started {context}{func_name} for well {well}")
            log_func(f"Active threads: {len(active_threads)}")

            try:
                # Call the original function
                return f(*args, **kwargs)
            finally:
                # Record thread end time
                end_time = time.time()
                duration = end_time - start_time

                # Remove this thread from the active threads; discard (rather
                # than remove) avoids a KeyError when decorated functions nest
                # on the same thread.
                with thread_lock:
                    active_threads.discard(thread_id)
                    # Record the number of active threads at this moment
                    thread_activity[thread_id].append({
                        'well': well,
                        'thread_name': thread_name,
                        'time': time.time(),
                        'action': 'end',
                        'function': f"{context}{func_name}",
                        'duration': duration,
                        'active_threads': len(active_threads)
                    })

                log_func(f"Thread {thread_name} (ID: {thread_id}) finished {context}{func_name} for well {well} in {duration:.2f} seconds")
                log_func(f"Active threads: {len(active_threads)}")

        return wrapper

    # Handle both @track_thread_activity and @track_thread_activity(log_level="debug")
    if func is None:
        return decorator
    return decorator(func)
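# Illustrative usage (a minimal sketch; PlateProcessor and process_well are
# hypothetical names, not part of this module):
#
#     class PlateProcessor:
#         @track_thread_activity(log_level="debug")
#         def process_well(self, well: str) -> None:
#             ...  # per-well work; start/end events are recorded around it
#
#     @track_thread_activity
#     def stitch(well: str) -> None:
#         ...  # the bare decorator form works too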

def analyze_thread_activity():
    """
    Analyze thread activity data and return a report.

    Returns:
        Dict containing analysis results
    """
    max_concurrent = 0
    thread_starts = []
    thread_ends = []

    for thread_id, activities in thread_activity.items():
        for activity in activities:
            max_concurrent = max(max_concurrent, activity['active_threads'])
            if activity['action'] == 'start':
                thread_starts.append((
                    activity.get('well'),
                    activity['thread_name'],
                    activity['time'],
                    activity.get('function', '')
                ))
            else:  # 'end'
                thread_ends.append((
                    activity.get('well'),
                    activity['thread_name'],
                    activity['time'],
                    activity.get('duration', 0),
                    activity.get('function', '')
                ))

    # Sort by time
    thread_starts.sort(key=lambda x: x[2])
    thread_ends.sort(key=lambda x: x[2])

    # Find overlapping time periods
    overlaps = []
    for i, (well1, thread1, start1, func1) in enumerate(thread_starts):
        # Find the end time for this thread
        end1 = None
        for w, t, end, d, f in thread_ends:
            if t == thread1 and w == well1 and f == func1:
                end1 = end
                break

        if end1 is None:
            continue  # Skip if we can't find the end time

        # Check for overlaps with other threads; start past i so each pair
        # is counted once rather than twice.
        for j, (well2, thread2, start2, func2) in enumerate(thread_starts):
            if j <= i or thread1 == thread2:  # Skip same and already-paired threads
                continue

            # Find the end time for the other thread
            end2 = None
            for w, t, end, d, f in thread_ends:
                if t == thread2 and w == well2 and f == func2:
                    end2 = end
                    break

            if end2 is None:
                continue  # Skip if we can't find the end time

            # Check if there's an overlap
            if start1 < end2 and start2 < end1:
                overlap_start = max(start1, start2)
                overlap_end = min(end1, end2)
                overlap_duration = overlap_end - overlap_start

                if overlap_duration > 0:
                    overlaps.append({
                        'thread1': thread1,
                        'well1': well1,
                        'function1': func1,
                        'thread2': thread2,
                        'well2': well2,
                        'function2': func2,
                        'duration': overlap_duration
                    })

    return {
        'max_concurrent': max_concurrent,
        'thread_starts': thread_starts,
        'thread_ends': thread_ends,
        'overlaps': overlaps
    }
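# Reading the analysis result (sketch):
#
#     report = analyze_thread_activity()
#     report['max_concurrent']  # peak number of simultaneously active threads
#     report['overlaps']        # list of dicts: thread1/2, well1/2, function1/2, duration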

def print_thread_activity_report():
    """Print a detailed report of thread activity."""
    analysis = analyze_thread_activity()

    print("\n" + "=" * 80)
    print("Thread Activity Report")
    print("=" * 80)

    print("\nThread Start Events:")
    for well, thread_name, time_val, func in analysis['thread_starts']:
        print(f"Thread {thread_name} started {func} for well {well} at {time_val:.2f}")

    print("\nThread End Events:")
    for well, thread_name, time_val, duration, func in analysis['thread_ends']:
        print(f"Thread {thread_name} finished {func} for well {well} at {time_val:.2f} (duration: {duration:.2f}s)")

    print("\nOverlap Analysis:")
    for overlap in analysis['overlaps']:
        print(f"Threads {overlap['thread1']} and {overlap['thread2']} overlapped for {overlap['duration']:.2f}s")
        print(f"  {overlap['thread1']} was processing {overlap['function1']} for well {overlap['well1']}")
        print(f"  {overlap['thread2']} was processing {overlap['function2']} for well {overlap['well2']}")

    print(f"\nFound {len(analysis['overlaps'])} thread overlaps")
    print(f"Maximum concurrent threads: {analysis['max_concurrent']}")
    print("=" * 80)

    return analysis

# Natural sorting utilities
def natural_sort_key(text: Union[str, Path]) -> List[Union[str, int]]:
    """
    Generate a natural sorting key for a string or Path.

    This function converts a string into a list of strings and integers
    that can be used as a sorting key to achieve natural (human-friendly)
    sorting order.

    Args:
        text: String or Path to generate a sorting key for

    Returns:
        List of strings and integers for natural sorting

    Examples:
        >>> natural_sort_key("file10.txt")
        ['file', 10, '.txt']
        >>> natural_sort_key("A01_s001_w1_z001.tif")
        ['A', 1, '_s', 1, '_w', 1, '_z', 1, '.tif']
    """
    text = str(text)

    # Split on sequences of digits, keeping the digits
    parts = re.split(r'(\d+)', text)

    # Convert digit sequences to integers, leave other parts as strings
    return [int(part) if part.isdigit() else part for part in parts]

def natural_sort(items: List[Union[str, Path]]) -> List[Union[str, Path]]:
    """
    Sort a list of strings or Paths using natural sorting.

    Args:
        items: List of strings or Paths to sort

    Returns:
        New list sorted in natural order

    Examples:
        >>> natural_sort(["file1.txt", "file10.txt", "file2.txt"])
        ['file1.txt', 'file2.txt', 'file10.txt']
        >>> natural_sort(["A01_s001.tif", "A01_s010.tif", "A01_s002.tif"])
        ['A01_s001.tif', 'A01_s002.tif', 'A01_s010.tif']
    """
    return sorted(items, key=natural_sort_key)

def natural_sort_inplace(items: List[Union[str, Path]]) -> None:
    """
    Sort a list of strings or Paths using natural sorting in-place.

    Args:
        items: List of strings or Paths to sort in-place

    Examples:
        >>> files = ["file1.txt", "file10.txt", "file2.txt"]
        >>> natural_sort_inplace(files)
        >>> files
        ['file1.txt', 'file2.txt', 'file10.txt']
    """
    items.sort(key=natural_sort_key)
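# Path objects sort the same way, since the key function stringifies its
# input first (sketch):
#
#     from pathlib import Path
#     natural_sort([Path("z10.tif"), Path("z2.tif")])
#     # -> [PosixPath('z2.tif'), PosixPath('z10.tif')] on POSIX systems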

# === WELL FILTERING UTILITIES ===

from openhcs.core.config import WellFilterMode


class WellPatternConstants:
    """Centralized constants for well pattern parsing."""
    COMMA_SEPARATOR = ","
    RANGE_SEPARATOR = ":"
    ROW_PREFIX = "row:"
    COL_PREFIX = "col:"

class WellFilterProcessor:
    """
    Enhanced well filtering processor supporting both compilation-time and execution-time filtering.

    Maintains backward compatibility with the existing execution-time methods while adding
    compilation-time capabilities for the 5-phase compilation system.

    Follows systematic refactoring framework principles:
    - Fail-loud validation with clear error messages
    - Pythonic patterns and idioms
    - Leverages existing well filtering infrastructure
    - Eliminates magic strings through centralized constants
    """

    # === NEW COMPILATION-TIME METHOD ===

    @staticmethod
    def resolve_compilation_filter(
        well_filter: Union[List[str], str, int],
        available_wells: List[str]
    ) -> Set[str]:
        """
        Resolve a well filter to a concrete well set during compilation.

        Combines validation and resolution in a single method to avoid verbose
        helper methods. Supports all existing filter types while providing
        compilation-time optimization. Works with any well naming format
        (A01, R01C03, etc.) by using the available wells.

        Args:
            well_filter: Filter specification (list, string pattern, or max count)
            available_wells: Ordered list of wells from orchestrator.get_component_keys(MULTIPROCESSING_AXIS)

        Returns:
            Set of well IDs that match the filter

        Raises:
            ValueError: If wells don't exist, insufficient wells for count, or invalid patterns
        """
        if isinstance(well_filter, list):  # coverage: branch never taken in the measured run
            # Inline validation for specific wells
            available_set = set(available_wells)
            invalid_wells = [w for w in well_filter if w not in available_set]
            if invalid_wells:
                raise ValueError(
                    f"Invalid wells specified: {invalid_wells}. "
                    f"Available wells: {sorted(available_set)}"
                )
            return set(well_filter)

        elif isinstance(well_filter, int):  # coverage: branch always taken in the measured run
            # Inline validation for max count
            if well_filter <= 0:  # coverage: branch never taken in the measured run
                raise ValueError(f"Max count must be positive, got: {well_filter}")
            if well_filter > len(available_wells):  # coverage: branch never taken in the measured run
                raise ValueError(
                    f"Requested {well_filter} wells but only {len(available_wells)} available"
                )
            return set(available_wells[:well_filter])

        elif isinstance(well_filter, str):
            # Check whether the string is a numeric value (a common UI input issue)
            if well_filter.strip().isdigit():
                # Convert the numeric string to an integer and process it as a max count
                numeric_value = int(well_filter.strip())
                if numeric_value <= 0:
                    raise ValueError(f"Max count must be positive, got: {numeric_value}")
                if numeric_value > len(available_wells):
                    raise ValueError(
                        f"Requested {numeric_value} wells but only {len(available_wells)} available"
                    )
                return set(available_wells[:numeric_value])

            else:
                # Non-numeric string - pass to pattern parsing for format-agnostic support
                return WellFilterProcessor._parse_well_pattern(well_filter, available_wells)

        else:
            raise ValueError(f"Unsupported well filter type: {type(well_filter)}")
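    # Illustrative resolution across the supported filter types (a minimal
    # sketch; the well IDs are made up):
    #
    #     wells = ["A01", "A02", "A03", "B01"]
    #     WellFilterProcessor.resolve_compilation_filter(["A01", "B01"], wells)  # {"A01", "B01"}
    #     WellFilterProcessor.resolve_compilation_filter(2, wells)               # {"A01", "A02"}
    #     WellFilterProcessor.resolve_compilation_filter("2", wells)             # {"A01", "A02"} (numeric string)
    #     WellFilterProcessor.resolve_compilation_filter("row:A", wells)         # {"A01", "A02", "A03"}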

    # === EXISTING EXECUTION-TIME METHODS (MAINTAINED) ===

    @staticmethod
    def should_materialize_well(
        well_id: str,
        config,  # MaterializationPathConfig
        processed_wells: Set[str]
    ) -> bool:
        """
        EXISTING METHOD: Determine if a well should be materialized during execution.
        Maintained for backward compatibility and execution-time fallback.
        """
        if config.well_filter is None:
            return True  # No filter = materialize all wells

        # Expand the filter pattern to a well list
        target_wells = WellFilterProcessor.expand_well_filter(config.well_filter)

        # Apply the max wells limit if the filter is an integer
        if isinstance(config.well_filter, int):
            if len(processed_wells) >= config.well_filter:
                return False

        # Check if the well matches the filter
        well_in_filter = well_id in target_wells

        # Apply include/exclude mode
        if config.well_filter_mode == WellFilterMode.INCLUDE:
            return well_in_filter
        else:  # EXCLUDE mode
            return not well_in_filter
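    # Execution-time check in context (a minimal sketch; config is any object
    # exposing the well_filter and well_filter_mode fields used above, and
    # wells is a hypothetical iterable):
    #
    #     processed: Set[str] = set()
    #     for well in wells:
    #         if WellFilterProcessor.should_materialize_well(well, config, processed):
    #             processed.add(well)
    #             ...  # materialize this well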

    @staticmethod
    def expand_well_filter(
        well_filter: Union[List[str], str, int],
        available_wells: Optional[List[str]] = None
    ) -> Set[str]:
        """
        EXISTING METHOD: Expand a well filter pattern to a set of well IDs.
        Maintained for backward compatibility.
        """
        if isinstance(well_filter, list):
            return set(well_filter)

        if isinstance(well_filter, int):
            # For integer filters we can't pre-expand wells, since the result depends
            # on processing order. Return an empty set; the max-wells logic is handled
            # in should_materialize_well.
            return set()

        if isinstance(well_filter, str):
            # Pattern parsing needs the available wells; default to an empty list so
            # simple patterns (single wells, comma lists) still expand correctly.
            return WellFilterProcessor._parse_well_pattern(well_filter, available_wells or [])

        raise ValueError(f"Unsupported well filter type: {type(well_filter)}")

    @staticmethod
    def _parse_well_pattern(pattern: str, available_wells: List[str]) -> Set[str]:
        """Parse string well patterns into well ID sets using the available wells."""
        pattern = pattern.strip()

        # Comma-separated list
        if WellPatternConstants.COMMA_SEPARATOR in pattern:
            return set(w.strip() for w in pattern.split(WellPatternConstants.COMMA_SEPARATOR))

        # Row pattern: "row:A"
        if pattern.startswith(WellPatternConstants.ROW_PREFIX):
            row = pattern[len(WellPatternConstants.ROW_PREFIX):].strip()
            return WellFilterProcessor._expand_row_pattern(row, available_wells)

        # Column pattern: "col:01-06"
        if pattern.startswith(WellPatternConstants.COL_PREFIX):
            col_spec = pattern[len(WellPatternConstants.COL_PREFIX):].strip()
            return WellFilterProcessor._expand_col_pattern(col_spec, available_wells)

        # Range pattern: "A01:A12"
        if WellPatternConstants.RANGE_SEPARATOR in pattern:
            return WellFilterProcessor._expand_range_pattern(pattern, available_wells)

        # Single well
        return {pattern}
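    # Pattern forms accepted above, in the order they are checked (sketch):
    #
    #     "A01,B02"    -> {"A01", "B02"}             comma-separated list
    #     "row:A"      -> wells in row A             row prefix
    #     "col:01-06"  -> wells in columns 1 to 6    column prefix
    #     "A01:A12"    -> positional range over available_wells (inclusive)
    #     "A01"        -> {"A01"}                    single well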

    @staticmethod
    def _expand_row_pattern(row: str, available_wells: List[str]) -> Set[str]:
        """Expand a row pattern using the available wells (format-agnostic)."""
        # Direct prefix match (A01, B02, etc.)
        result = {well for well in available_wells if well.startswith(row)}

        # Opera Phenix format fallback (A -> R01C*, B -> R02C*)
        if not result and len(row) == 1 and row.isalpha():
            row_pattern = f"R{ord(row.upper()) - ord('A') + 1:02d}C"
            result = {well for well in available_wells if well.startswith(row_pattern)}

        return result
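    # Row expansion examples (sketch; well IDs are made up):
    #
    #     _expand_row_pattern("A", ["A01", "A02", "B01"])  # -> {"A01", "A02"}
    #     _expand_row_pattern("B", ["R01C01", "R02C01"])   # -> {"R02C01"} (Opera Phenix fallback)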

    @staticmethod
    def _expand_col_pattern(col_spec: str, available_wells: List[str]) -> Set[str]:
        """Expand a column pattern using the available wells (format-agnostic)."""
        # Parse the column range
        if "-" in col_spec:
            start_col, end_col = map(int, col_spec.split("-"))
            col_range = set(range(start_col, end_col + 1))
        else:
            col_range = {int(col_spec)}

        # Extract the numeric suffix and match (A01, B02, etc.)
        def get_numeric_suffix(well: str) -> int:
            digits = ''.join(char for char in reversed(well) if char.isdigit())
            return int(digits[::-1]) if digits else 0

        result = {well for well in available_wells if get_numeric_suffix(well) in col_range}

        # Opera Phenix format fallback (C01, C02, etc.)
        if not result:
            patterns = {f"C{col:02d}" for col in col_range}
            result = {well for well in available_wells
                      if any(pattern in well for pattern in patterns)}

        return result
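    # Column expansion examples (sketch; well IDs are made up):
    #
    #     _expand_col_pattern("01-02", ["A01", "A02", "A03"])  # -> {"A01", "A02"}
    #     _expand_col_pattern("3", ["R01C03", "R01C04"])       # -> {"R01C03"} (fallback match on "C03")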

    @staticmethod
    def _expand_range_pattern(pattern: str, available_wells: List[str]) -> Set[str]:
        """Expand a range pattern using the available wells (format-agnostic)."""
        start_well, end_well = map(str.strip, pattern.split(WellPatternConstants.RANGE_SEPARATOR))

        try:
            start_idx, end_idx = available_wells.index(start_well), available_wells.index(end_well)
        except ValueError as e:
            raise ValueError(f"Range pattern '{pattern}' contains wells not in available wells: {e}")

        # Ensure proper order and return the range (inclusive)
        start_idx, end_idx = sorted([start_idx, end_idx])
        return set(available_wells[start_idx:end_idx + 1])
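    # Range expansion is positional over the ordered available wells, not
    # alphabetical (sketch; well IDs are made up):
    #
    #     wells = ["A01", "A02", "A03", "B01"]
    #     WellFilterProcessor._expand_range_pattern("A02:B01", wells)
    #     # -> {"A02", "A03", "B01"}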