Coverage for openhcs/core/utils.py: 24.7%

240 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Utility functions for the OpenHCS package. 

3""" 

4 

5import functools 

6import logging 

7import re 

8import threading 

9import time 

10from collections import defaultdict 

11from pathlib import Path 

12from typing import Any, Callable, Dict, List, Optional, Union 

13 

14logger = logging.getLogger(__name__) 

15 

16class _ModulePlaceholder: 

17 """ 

18 Placeholder for missing optional modules that allows attribute access 

19 for type annotations while still being falsy and failing on actual use. 

20 """ 

21 def __init__(self, module_name: str): 

22 self._module_name = module_name 

23 

24 def __bool__(self): 

25 return False 

26 

27 def __getattr__(self, name): 

28 # Return another placeholder for chained attribute access 

29 # This allows things like cp.ndarray in type annotations to work 

30 return _ModulePlaceholder(f"{self._module_name}.{name}") 

31 

32 def __call__(self, *args, **kwargs): 

33 # If someone tries to actually call a function, fail loudly 

34 raise ImportError(f"Module '{self._module_name}' is not available. Please install the required dependency.") 

35 

36 def __repr__(self): 

37 return f"<ModulePlaceholder for '{self._module_name}'>" 

38 

39 

40def optional_import(module_name: str) -> Optional[Any]: 

41 """ 

42 Import a module if available, otherwise return a placeholder that handles 

43 attribute access gracefully for type annotations but fails on actual use. 

44 

45 This function allows for graceful handling of optional dependencies. 

46 It can be used to import libraries that may not be installed, 

47 particularly GPU-related libraries like torch, tensorflow, and cupy. 

48 

49 Args: 

50 module_name: Name of the module to import 

51 

52 Returns: 

53 The imported module if available, a placeholder otherwise 

54 

55 Example: 

56 ```python 

57 # Import torch if available 

58 torch = optional_import("torch") 

59 

60 # Check if torch is available before using it 

61 if torch: 

62 # Use torch 

63 tensor = torch.tensor([1, 2, 3]) 

64 else: 

65 # Handle the case where torch is not available 

66 raise ImportError("PyTorch is required for this function") 

67 ``` 

68 """ 

69 try: 

70 # Use importlib.import_module which handles dotted names properly 

71 import importlib 

72 return importlib.import_module(module_name) 

73 except (ImportError, ModuleNotFoundError, AttributeError): 

74 # Return a placeholder that handles attribute access gracefully 

75 return _ModulePlaceholder(module_name) 

76 

77# Global thread activity tracking 

78thread_activity = defaultdict(list) 

79active_threads = set() 

80thread_lock = threading.Lock() 

81 

82def get_thread_activity() -> Dict[int, List[Dict[str, Any]]]: 

83 """ 

84 Get the current thread activity data. 

85 

86 Returns: 

87 Dict mapping thread IDs to lists of activity records 

88 """ 

89 return thread_activity 

90 

91def get_active_threads() -> set: 

92 """ 

93 Get the set of currently active thread IDs. 

94 

95 Returns: 

96 Set of active thread IDs 

97 """ 

98 return active_threads 

99 

100def clear_thread_activity(): 

101 """Clear all thread activity data.""" 

102 with thread_lock: 

103 thread_activity.clear() 

104 active_threads.clear() 

105 

106def track_thread_activity(func: Optional[Callable] = None, *, log_level: str = "info"): 

107 """ 

108 Decorator to track thread activity for a function. 

109 

110 Args: 

111 func: The function to decorate 

112 log_level: Logging level to use ("debug", "info", "warning", "error") 

113 

114 Returns: 

115 Decorated function that tracks thread activity 

116 """ 

117 def decorator(f): 

118 @functools.wraps(f) 

119 def wrapper(*args, **kwargs): 

120 # Get thread information 

121 thread_id = threading.get_ident() 

122 thread_name = threading.current_thread().name 

123 

124 # Record thread start time 

125 start_time = time.time() 

126 

127 # Extract function name and arguments for context 

128 func_name = f.__name__ 

129 # Get the first argument if it's a method (self or cls) 

130 context = "" 

131 if args and hasattr(args[0], "__class__"): 

132 if hasattr(args[0].__class__, func_name): 

133 # It's likely a method, extract class name 

134 context = f"{args[0].__class__.__name__}." 

135 

136 # Extract well information if present in kwargs or args 

137 well = kwargs.get('well', None) 

138 if well is None and len(args) > 1 and isinstance(args[1], str): 

139 # Assume second argument might be well in methods like process_well(self, well, ...) 

140 well = args[1] 

141 

142 # Add this thread to active threads 

143 with thread_lock: 

144 active_threads.add(thread_id) 

145 # Record the number of active threads at this moment 

146 thread_activity[thread_id].append({ 

147 'well': well, 

148 'thread_name': thread_name, 

149 'time': time.time(), 

150 'action': 'start', 

151 'function': f"{context}{func_name}", 

152 'active_threads': len(active_threads) 

153 }) 

154 

155 # Log the start of the function 

156 log_func = getattr(logger, log_level.lower()) 

157 log_func(f"Thread {thread_name} (ID: {thread_id}) started {context}{func_name} for well {well}") 

158 log_func(f"Active threads: {len(active_threads)}") 

159 

160 try: 

161 # Call the original function 

162 result = f(*args, **kwargs) 

163 return result 

164 finally: 

165 # Record thread end time 

166 end_time = time.time() 

167 duration = end_time - start_time 

168 

169 # Remove this thread from active threads 

170 with thread_lock: 

171 active_threads.remove(thread_id) 

172 # Record the number of active threads at this moment 

173 thread_activity[thread_id].append({ 

174 'well': well, 

175 'thread_name': thread_name, 

176 'time': time.time(), 

177 'action': 'end', 

178 'function': f"{context}{func_name}", 

179 'duration': duration, 

180 'active_threads': len(active_threads) 

181 }) 

182 

183 log_func(f"Thread {thread_name} (ID: {thread_id}) finished {context}{func_name} for well {well} in {duration:.2f} seconds") 

184 log_func(f"Active threads: {len(active_threads)}") 

185 

186 return wrapper 

187 

188 # Handle both @track_thread_activity and @track_thread_activity(log_level="debug") 

189 if func is None: 

190 return decorator 

191 return decorator(func) 

192 

193def analyze_thread_activity(): 

194 """ 

195 Analyze thread activity data and return a report. 

196 

197 Returns: 

198 Dict containing analysis results 

199 """ 

200 max_concurrent = 0 

201 thread_starts = [] 

202 thread_ends = [] 

203 

204 for thread_id, activities in thread_activity.items(): 

205 for activity in activities: 

206 max_concurrent = max(max_concurrent, activity['active_threads']) 

207 if activity['action'] == 'start': 

208 thread_starts.append(( 

209 activity.get('well'), 

210 activity['thread_name'], 

211 activity['time'], 

212 activity.get('function', '') 

213 )) 

214 else: # 'end' 

215 thread_ends.append(( 

216 activity.get('well'), 

217 activity['thread_name'], 

218 activity['time'], 

219 activity.get('duration', 0), 

220 activity.get('function', '') 

221 )) 

222 

223 # Sort by time 

224 thread_starts.sort(key=lambda x: x[2]) 

225 thread_ends.sort(key=lambda x: x[2]) 

226 

227 # Find overlapping time periods 

228 overlaps = [] 

229 for i, (well1, thread1, start1, func1) in enumerate(thread_starts): 

230 # Find the end time for this thread 

231 end1 = None 

232 for w, t, end, d, f in thread_ends: 

233 if t == thread1 and w == well1 and f == func1: 

234 end1 = end 

235 break 

236 

237 if end1 is None: 

238 continue # Skip if we can't find the end time 

239 

240 # Check for overlaps with other threads 

241 for j, (well2, thread2, start2, func2) in enumerate(thread_starts): 

242 if i == j or thread1 == thread2: # Skip same thread 

243 continue 

244 

245 # Find the end time for the other thread 

246 end2 = None 

247 for w, t, end, d, f in thread_ends: 

248 if t == thread2 and w == well2 and f == func2: 

249 end2 = end 

250 break 

251 

252 if end2 is None: 

253 continue # Skip if we can't find the end time 

254 

255 # Check if there's an overlap 

256 if start1 < end2 and start2 < end1: 

257 overlap_start = max(start1, start2) 

258 overlap_end = min(end1, end2) 

259 overlap_duration = overlap_end - overlap_start 

260 

261 if overlap_duration > 0: 

262 overlaps.append({ 

263 'thread1': thread1, 

264 'well1': well1, 

265 'function1': func1, 

266 'thread2': thread2, 

267 'well2': well2, 

268 'function2': func2, 

269 'duration': overlap_duration 

270 }) 

271 

272 return { 

273 'max_concurrent': max_concurrent, 

274 'thread_starts': thread_starts, 

275 'thread_ends': thread_ends, 

276 'overlaps': overlaps 

277 } 

278 

279def print_thread_activity_report(): 

280 """Print a detailed report of thread activity.""" 

281 analysis = analyze_thread_activity() 

282 

283 print("\n" + "=" * 80) 

284 print("Thread Activity Report") 

285 print("=" * 80) 

286 

287 print("\nThread Start Events:") 

288 for well, thread_name, time_val, func in analysis['thread_starts']: 

289 print(f"Thread {thread_name} started {func} for well {well} at {time_val:.2f}") 

290 

291 print("\nThread End Events:") 

292 for well, thread_name, time_val, duration, func in analysis['thread_ends']: 

293 print(f"Thread {thread_name} finished {func} for well {well} at {time_val:.2f} (duration: {duration:.2f}s)") 

294 

295 print("\nOverlap Analysis:") 

296 for overlap in analysis['overlaps']: 

297 print(f"Threads {overlap['thread1']} and {overlap['thread2']} overlapped for {overlap['duration']:.2f}s") 

298 print(f" {overlap['thread1']} was processing {overlap['function1']} for well {overlap['well1']}") 

299 print(f" {overlap['thread2']} was processing {overlap['function2']} for well {overlap['well2']}") 

300 

301 print(f"\nFound {len(analysis['overlaps'])} thread overlaps") 

302 print(f"Maximum concurrent threads: {analysis['max_concurrent']}") 

303 print("=" * 80) 

304 

305 return analysis 

306 

307 

308# Natural sorting utilities 

309def natural_sort_key(text: Union[str, Path]) -> List[Union[str, int]]: 

310 """ 

311 Generate a natural sorting key for a string or Path. 

312 

313 This function converts a string into a list of strings and integers 

314 that can be used as a sorting key to achieve natural (human-friendly) 

315 sorting order. 

316 

317 Args: 

318 text: String or Path to generate sorting key for 

319 

320 Returns: 

321 List of strings and integers for natural sorting 

322 

323 Examples: 

324 >>> natural_sort_key("file10.txt") 

325 ['file', 10, '.txt'] 

326 >>> natural_sort_key("A01_s001_w1_z001.tif") 

327 ['A', 1, '_s', 1, '_w', 1, '_z', 1, '.tif'] 

328 """ 

329 text = str(text) 

330 

331 # Split on sequences of digits, keeping the digits 

332 parts = re.split(r'(\d+)', text) 

333 

334 # Convert digit sequences to integers, leave other parts as strings 

335 result = [] 

336 for part in parts: 

337 if part.isdigit(): 

338 result.append(int(part)) 

339 else: 

340 result.append(part) 

341 

342 return result 

343 

344 

345def natural_sort(items: List[Union[str, Path]]) -> List[Union[str, Path]]: 

346 """ 

347 Sort a list of strings or Paths using natural sorting. 

348 

349 Args: 

350 items: List of strings or Paths to sort 

351 

352 Returns: 

353 New list sorted in natural order 

354 

355 Examples: 

356 >>> natural_sort(["file1.txt", "file10.txt", "file2.txt"]) 

357 ['file1.txt', 'file2.txt', 'file10.txt'] 

358 >>> natural_sort(["A01_s001.tif", "A01_s010.tif", "A01_s002.tif"]) 

359 ['A01_s001.tif', 'A01_s002.tif', 'A01_s010.tif'] 

360 """ 

361 return sorted(items, key=natural_sort_key) 

362 

363 

364def natural_sort_inplace(items: List[Union[str, Path]]) -> None: 

365 """ 

366 Sort a list of strings or Paths using natural sorting in-place. 

367 

368 Args: 

369 items: List of strings or Paths to sort in-place 

370 

371 Examples: 

372 >>> files = ["file1.txt", "file10.txt", "file2.txt"] 

373 >>> natural_sort_inplace(files) 

374 >>> files 

375 ['file1.txt', 'file2.txt', 'file10.txt'] 

376 """ 

377 items.sort(key=natural_sort_key) 

378 

379 

380# === WELL FILTERING UTILITIES === 

381 

382import re 

383import string 

384from typing import List, Set, Union 

385from openhcs.core.config import WellFilterMode 

386 

387 

388class WellPatternConstants: 

389 """Centralized constants for well pattern parsing.""" 

390 COMMA_SEPARATOR = "," 

391 RANGE_SEPARATOR = ":" 

392 ROW_PREFIX = "row:" 

393 COL_PREFIX = "col:" 

394 

395 

396class WellFilterProcessor: 

397 """ 

398 Enhanced well filtering processor supporting both compilation-time and execution-time filtering. 

399 

400 Maintains backward compatibility with existing execution-time methods while adding 

401 compilation-time capabilities for the 5-phase compilation system. 

402 

403 Follows systematic refactoring framework principles: 

404 - Fail-loud validation with clear error messages 

405 - Pythonic patterns and idioms 

406 - Leverages existing well filtering infrastructure 

407 - Eliminates magic strings through centralized constants 

408 """ 

409 

410 # === NEW COMPILATION-TIME METHOD === 

411 

412 @staticmethod 

413 def resolve_compilation_filter( 

414 well_filter: Union[List[str], str, int], 

415 available_wells: List[str] 

416 ) -> Set[str]: 

417 """ 

418 Resolve well filter to concrete well set during compilation. 

419 

420 Combines validation and resolution in single method to avoid verbose helper methods. 

421 Supports all existing filter types while providing compilation-time optimization. 

422 Works with any well naming format (A01, R01C03, etc.) by using available wells. 

423 

424 Args: 

425 well_filter: Filter specification (list, string pattern, or max count) 

426 available_wells: Ordered list of wells from orchestrator.get_component_keys(MULTIPROCESSING_AXIS) 

427 

428 Returns: 

429 Set of well IDs that match the filter 

430 

431 Raises: 

432 ValueError: If wells don't exist, insufficient wells for count, or invalid patterns 

433 """ 

434 if isinstance(well_filter, list): 434 ↛ 436line 434 didn't jump to line 436 because the condition on line 434 was never true

435 # Inline validation for specific wells 

436 available_set = set(available_wells) 

437 invalid_wells = [w for w in well_filter if w not in available_set] 

438 if invalid_wells: 

439 raise ValueError( 

440 f"Invalid wells specified: {invalid_wells}. " 

441 f"Available wells: {sorted(available_set)}" 

442 ) 

443 return set(well_filter) 

444 

445 elif isinstance(well_filter, int): 445 ↛ 455line 445 didn't jump to line 455 because the condition on line 445 was always true

446 # Inline validation for max count 

447 if well_filter <= 0: 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true

448 raise ValueError(f"Max count must be positive, got: {well_filter}") 

449 if well_filter > len(available_wells): 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true

450 raise ValueError( 

451 f"Requested {well_filter} wells but only {len(available_wells)} available" 

452 ) 

453 return set(available_wells[:well_filter]) 

454 

455 elif isinstance(well_filter, str): 

456 # Check if string is a numeric value (common UI input issue) 

457 if well_filter.strip().isdigit(): 

458 # Convert numeric string to integer and process as max count 

459 numeric_value = int(well_filter.strip()) 

460 if numeric_value <= 0: 

461 raise ValueError(f"Max count must be positive, got: {numeric_value}") 

462 if numeric_value > len(available_wells): 

463 raise ValueError( 

464 f"Requested {numeric_value} wells but only {len(available_wells)} available" 

465 ) 

466 return set(available_wells[:numeric_value]) 

467 

468 else: 

469 # Non-numeric string - pass to pattern parsing for format-agnostic support 

470 return WellFilterProcessor._parse_well_pattern(well_filter, available_wells) 

471 

472 else: 

473 raise ValueError(f"Unsupported well filter type: {type(well_filter)}") 

474 

475 # === EXISTING EXECUTION-TIME METHODS (MAINTAINED) === 

476 

477 @staticmethod 

478 def should_materialize_well( 

479 well_id: str, 

480 config, # MaterializationPathConfig 

481 processed_wells: Set[str] 

482 ) -> bool: 

483 """ 

484 EXISTING METHOD: Determine if a well should be materialized during execution. 

485 Maintained for backward compatibility and execution-time fallback. 

486 """ 

487 if config.well_filter is None: 

488 return True # No filter = materialize all wells 

489 

490 # Expand filter pattern to well list 

491 target_wells = WellFilterProcessor.expand_well_filter(config.well_filter) 

492 

493 # Apply max wells limit if filter is integer 

494 if isinstance(config.well_filter, int): 

495 if len(processed_wells) >= config.well_filter: 

496 return False 

497 

498 # Check if well matches filter 

499 well_in_filter = well_id in target_wells 

500 

501 # Apply include/exclude mode 

502 if config.well_filter_mode == WellFilterMode.INCLUDE: 

503 return well_in_filter 

504 else: # EXCLUDE mode 

505 return not well_in_filter 

506 

507 @staticmethod 

508 def expand_well_filter(well_filter: Union[List[str], str, int]) -> Set[str]: 

509 """ 

510 EXISTING METHOD: Expand well filter pattern to set of well IDs. 

511 Maintained for backward compatibility. 

512 """ 

513 if isinstance(well_filter, list): 

514 return set(well_filter) 

515 

516 if isinstance(well_filter, int): 

517 # For integer filters, we can't pre-expand wells since it depends on processing order 

518 # Return empty set - the max wells logic is handled in should_materialize_well 

519 return set() 

520 

521 if isinstance(well_filter, str): 

522 return WellFilterProcessor._parse_well_pattern(well_filter, available_wells) 

523 

524 raise ValueError(f"Unsupported well filter type: {type(well_filter)}") 

525 

526 @staticmethod 

527 def _parse_well_pattern(pattern: str, available_wells: List[str]) -> Set[str]: 

528 """Parse string well patterns into well ID sets using available wells.""" 

529 pattern = pattern.strip() 

530 

531 # Comma-separated list 

532 if WellPatternConstants.COMMA_SEPARATOR in pattern: 

533 return set(w.strip() for w in pattern.split(WellPatternConstants.COMMA_SEPARATOR)) 

534 

535 # Row pattern: "row:A" 

536 if pattern.startswith(WellPatternConstants.ROW_PREFIX): 

537 row = pattern[len(WellPatternConstants.ROW_PREFIX):].strip() 

538 return WellFilterProcessor._expand_row_pattern(row, available_wells) 

539 

540 # Column pattern: "col:01-06" 

541 if pattern.startswith(WellPatternConstants.COL_PREFIX): 

542 col_spec = pattern[len(WellPatternConstants.COL_PREFIX):].strip() 

543 return WellFilterProcessor._expand_col_pattern(col_spec, available_wells) 

544 

545 # Range pattern: "A01:A12" 

546 if WellPatternConstants.RANGE_SEPARATOR in pattern: 

547 return WellFilterProcessor._expand_range_pattern(pattern, available_wells) 

548 

549 # Single well 

550 return {pattern} 

551 

552 @staticmethod 

553 def _expand_row_pattern(row: str, available_wells: List[str]) -> Set[str]: 

554 """Expand row pattern using available wells (format-agnostic).""" 

555 # Direct prefix match (A01, B02, etc.) 

556 result = {well for well in available_wells if well.startswith(row)} 

557 

558 # Opera Phenix format fallback (A → R01C*, B → R02C*) 

559 if not result and len(row) == 1 and row.isalpha(): 

560 row_pattern = f"R{ord(row.upper()) - ord('A') + 1:02d}C" 

561 result = {well for well in available_wells if well.startswith(row_pattern)} 

562 

563 return result 

564 

565 @staticmethod 

566 def _expand_col_pattern(col_spec: str, available_wells: List[str]) -> Set[str]: 

567 """Expand column pattern using available wells (format-agnostic).""" 

568 # Parse column range 

569 if "-" in col_spec: 

570 start_col, end_col = map(int, col_spec.split("-")) 

571 col_range = set(range(start_col, end_col + 1)) 

572 else: 

573 col_range = {int(col_spec)} 

574 

575 # Extract numeric suffix and match (A01, B02, etc.) 

576 def get_numeric_suffix(well: str) -> int: 

577 digits = ''.join(char for char in reversed(well) if char.isdigit()) 

578 return int(digits[::-1]) if digits else 0 

579 

580 result = {well for well in available_wells if get_numeric_suffix(well) in col_range} 

581 

582 # Opera Phenix format fallback (C01, C02, etc.) 

583 if not result: 

584 patterns = {f"C{col:02d}" for col in col_range} 

585 result = {well for well in available_wells 

586 if any(pattern in well for pattern in patterns)} 

587 

588 return result 

589 

590 @staticmethod 

591 def _expand_range_pattern(pattern: str, available_wells: List[str]) -> Set[str]: 

592 """Expand range pattern using available wells (format-agnostic).""" 

593 start_well, end_well = map(str.strip, pattern.split(WellPatternConstants.RANGE_SEPARATOR)) 

594 

595 try: 

596 start_idx, end_idx = available_wells.index(start_well), available_wells.index(end_well) 

597 except ValueError as e: 

598 raise ValueError(f"Range pattern '{pattern}' contains wells not in available wells: {e}") 

599 

600 # Ensure proper order and return range (inclusive) 

601 start_idx, end_idx = sorted([start_idx, end_idx]) 

602 return set(available_wells[start_idx:end_idx + 1])