Coverage for openhcs/core/context/processing_context.py: 59.7%

44 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Processing Context for OpenHCS. 

3 

4This module defines the ProcessingContext class, which maintains state during pipeline execution. 

5""" 

6 

7from typing import Any, Dict, Optional 

8 

9from openhcs.core.config import GlobalPipelineConfig, VFSConfig, PathPlanningConfig 

10 

11 

12class ProcessingContext: 

13 """ 

14 Maintains state during pipeline execution. 

15 

16 The ProcessingContext is the canonical owner of all state during pipeline execution. 

17 After compilation and freezing, it should be treated as immutable by processing steps. 

18 

19 OWNERSHIP: This class may ONLY be instantiated by PipelineOrchestrator. 

20 All other components must receive a context instance, never create one. 

21 

22 Attributes: 

23 step_plans: Dictionary mapping step IDs to execution plans. 

24 outputs: Dictionary for step outputs (usage may change with VFS-centric model). 

25 intermediates: Dictionary for intermediate results (usage may change). 

26 current_step: Current executing step ID (usage may change). 

27 axis_id: Identifier of the multiprocessing axis value being processed. 

28 filemanager: Instance of FileManager for VFS operations. 

29 global_config: GlobalPipelineConfig holding system-wide configurations. 

30 _is_frozen: Internal flag indicating if the context is immutable. 

31 """ 

32 

33 def __init__( 

34 self, 

35 global_config: GlobalPipelineConfig, # Made a required argument 

36 step_plans: Optional[Dict[str, Dict[str, Any]]] = None, 

37 axis_id: Optional[str] = None, 

38 **kwargs 

39 ): 

40 """ 

41 Initialize the processing context. 

42 

43 Args: 

44 global_config: The global pipeline configuration object. 

45 step_plans: Dictionary mapping step IDs to execution plans. 

46 axis_id: Identifier of the multiprocessing axis value being processed. 

47 **kwargs: Additional context attributes (e.g., filemanager, microscope_handler). 

48 """ 

49 # Initialize _is_frozen first to allow other attributes to be set by __setattr__ 

50 # This direct assignment bypasses the custom __setattr__ during initialization. 

51 object.__setattr__(self, '_is_frozen', False) 

52 

53 self.step_plans = step_plans or {} 

54 self.outputs = {} # Future use TBD, primary data flow via VFS 

55 self.intermediates = {} # Future use TBD, primary data flow via VFS 

56 self.current_step = None # Future use TBD 

57 self.axis_id = axis_id 

58 self.global_config = global_config # Store the global config 

59 self.filemanager = None # Expected to be set by Orchestrator via kwargs or direct assignment 

60 

61 # Add any additional attributes from kwargs 

62 # Note: 'filemanager' is often passed via kwargs by PipelineOrchestrator.create_context 

63 for key, value in kwargs.items(): 

64 setattr(self, key, value) # This will now go through our __setattr__ 

65 

66 def __setattr__(self, name: str, value: Any) -> None: 

67 """ 

68 Set an attribute, preventing modification if the context is frozen. 

69 """ 

70 if getattr(self, '_is_frozen', False) and name != '_is_frozen': 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true

71 raise AttributeError(f"Cannot modify attribute '{name}' of a frozen ProcessingContext.") 

72 super().__setattr__(name, value) 

73 

74 def inject_plan(self, step_id: str, plan: Dict[str, Any]) -> None: 

75 """ 

76 Inject a step plan into the context. 

77 

78 This method is the canonical way to add step plans to the context during compilation. 

79 All step configuration must be injected into the context using this method. 

80 

81 Args: 

82 step_id: The unique identifier of the step 

83 plan: The step execution plan 

84 

85 Raises: 

86 AttributeError: If the context is frozen. 

87 """ 

88 if self._is_frozen: 

89 raise AttributeError("Cannot inject plan into a frozen ProcessingContext.") 

90 self.step_plans[step_id] = plan 

91 

92 def freeze(self) -> None: 

93 """ 

94 Freezes the context, making its attributes immutable. 

95 

96 This should be called after all compilation and plan injection is complete. 

97 Essential attributes like step_plans, filemanager, and axis_id must be set. 

98 

99 Raises: 

100 RuntimeError: If essential attributes are not set before freezing. 

101 """ 

102 if not self.axis_id: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true

103 raise RuntimeError("Cannot freeze ProcessingContext: 'axis_id' is not set.") 

104 if not hasattr(self, 'filemanager') or self.filemanager is None: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true

105 raise RuntimeError("Cannot freeze ProcessingContext: 'filemanager' is not set.") 

106 # step_plans can be empty if the pipeline is empty, but it must exist. 

107 if not hasattr(self, 'step_plans'): 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true

108 raise RuntimeError("Cannot freeze ProcessingContext: 'step_plans' attribute does not exist.") 

109 

110 self._is_frozen = True # This assignment is allowed by __setattr__ 

111 

112 def is_frozen(self) -> bool: 

113 """ 

114 Check if the context is frozen. 

115 

116 Returns: 

117 True if the context is frozen, False otherwise. 

118 """ 

119 return self._is_frozen 

120 

121 

122 

123 # update_from_step_result method is removed as per plan. 

124 

125 # --- Config Getters --- 

126 # NOTE: These are only used outside compilation (e.g., in workers after context is frozen) 

127 # During compilation, code should access orchestrator.pipeline_config directly 

128 

129 def get_vfs_config(self) -> VFSConfig: 

130 """Returns the VFSConfig part of the global configuration.""" 

131 if not hasattr(self, 'global_config') or self.global_config is None: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.") 

133 return self.global_config.vfs_config 

134 

135 def get_path_planning_config(self) -> PathPlanningConfig: 

136 """Returns the PathPlanningConfig part of the global configuration.""" 

137 if not hasattr(self, 'global_config') or self.global_config is None: 

138 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.") 

139 return self.global_config.path_planning_config 

140 

141 def get_num_workers(self) -> int: 

142 """Returns the number of workers from the global configuration.""" 

143 if not hasattr(self, 'global_config') or self.global_config is None: 

144 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.") 

145 return self.global_config.num_workers