Coverage for openhcs/core/pipeline/pipeline_utils.py: 34.6%

14 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1# openhcs/core/pipeline/pipeline_utils.py 

2"""Utility functions for the OpenHCS pipeline system.""" 

3from typing import Any, Callable, List, Optional, Tuple 

4 

def get_core_callable(func_pattern: Any) -> Optional[Callable[..., Any]]:
    """
    Extract the first effective Python callable from a func_pattern.

    Supported pattern shapes, checked in this order:

    * a direct callable that is not a class -> returned as is;
    * a non-empty tuple such as ``(callable, kwargs)`` whose first element
      is a callable that is not a class -> that first element;
    * a non-empty list (chain) -> the result of resolving its first item
      only (later items are never inspected);
    * a dict -> the first value, in iteration order, that resolves to a
      callable.

    Args:
        func_pattern: The pattern to inspect.

    Returns:
        The resolved callable, or ``None`` when the pattern is empty, has an
        unsupported shape, or contains no resolvable callable.
    """
    # Classes are callable too, but an uninstantiated class is not accepted
    # as a function here.
    if callable(func_pattern) and not isinstance(func_pattern, type):
        return func_pattern

    # (callable, kwargs) tuple: only the first element is considered, and it
    # must itself be a non-class callable (no recursion into the tuple).
    if (
        isinstance(func_pattern, tuple)
        and func_pattern
        and callable(func_pattern[0])
        and not isinstance(func_pattern[0], type)
    ):
        return func_pattern[0]

    # Chain: resolve the first item recursively.
    if isinstance(func_pattern, list) and func_pattern:
        return get_core_callable(func_pattern[0])

    # Dict pattern: return the first value that resolves to a callable.
    if isinstance(func_pattern, dict):
        for value in func_pattern.values():
            resolved = get_core_callable(value)
            # Compare against None rather than relying on truthiness: a
            # callable object may define __bool__/__len__ and evaluate falsy,
            # and must not be skipped because of that.
            if resolved is not None:
                return resolved

    return None

31