Coverage for openhcs/core/context/processing_context.py: 55.6%

45 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Processing Context for OpenHCS. 

3 

4This module defines the ProcessingContext class, which maintains state during pipeline execution. 

5""" 

6 

7from typing import Any, Dict, Optional, Union 

8from pathlib import Path 

9 

10from openhcs.core.config import GlobalPipelineConfig, VFSConfig, PathPlanningConfig 

11 

12 

13class ProcessingContext: 

14 """ 

15 Maintains state during pipeline execution. 

16 

17 The ProcessingContext is the canonical owner of all state during pipeline execution. 

18 After compilation and freezing, it should be treated as immutable by processing steps. 

19 

20 OWNERSHIP: This class may ONLY be instantiated by PipelineOrchestrator. 

21 All other components must receive a context instance, never create one. 

22 

23 Attributes: 

24 step_plans: Dictionary mapping step IDs to execution plans. 

25 outputs: Dictionary for step outputs (usage may change with VFS-centric model). 

26 intermediates: Dictionary for intermediate results (usage may change). 

27 current_step: Current executing step ID (usage may change). 

28 axis_id: Identifier of the multiprocessing axis value being processed. 

29 filemanager: Instance of FileManager for VFS operations. 

30 global_config: GlobalPipelineConfig holding system-wide configurations. 

31 _is_frozen: Internal flag indicating if the context is immutable. 

32 """ 

33 

34 def __init__( 

35 self, 

36 global_config: GlobalPipelineConfig, # Made a required argument 

37 step_plans: Optional[Dict[str, Dict[str, Any]]] = None, 

38 axis_id: Optional[str] = None, 

39 **kwargs 

40 ): 

41 """ 

42 Initialize the processing context. 

43 

44 Args: 

45 global_config: The global pipeline configuration object. 

46 step_plans: Dictionary mapping step IDs to execution plans. 

47 axis_id: Identifier of the multiprocessing axis value being processed. 

48 **kwargs: Additional context attributes (e.g., filemanager, microscope_handler). 

49 """ 

50 # Initialize _is_frozen first to allow other attributes to be set by __setattr__ 

51 # This direct assignment bypasses the custom __setattr__ during initialization. 

52 object.__setattr__(self, '_is_frozen', False) 

53 

54 self.step_plans = step_plans or {} 

55 self.outputs = {} # Future use TBD, primary data flow via VFS 

56 self.intermediates = {} # Future use TBD, primary data flow via VFS 

57 self.current_step = None # Future use TBD 

58 self.axis_id = axis_id 

59 self.global_config = global_config # Store the global config 

60 self.filemanager = None # Expected to be set by Orchestrator via kwargs or direct assignment 

61 

62 # Add any additional attributes from kwargs 

63 # Note: 'filemanager' is often passed via kwargs by PipelineOrchestrator.create_context 

64 for key, value in kwargs.items(): 

65 setattr(self, key, value) # This will now go through our __setattr__ 

66 

67 def __setattr__(self, name: str, value: Any) -> None: 

68 """ 

69 Set an attribute, preventing modification if the context is frozen. 

70 """ 

71 if getattr(self, '_is_frozen', False) and name != '_is_frozen': 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 raise AttributeError(f"Cannot modify attribute '{name}' of a frozen ProcessingContext.") 

73 super().__setattr__(name, value) 

74 

75 def inject_plan(self, step_id: str, plan: Dict[str, Any]) -> None: 

76 """ 

77 Inject a step plan into the context. 

78 

79 This method is the canonical way to add step plans to the context during compilation. 

80 All step configuration must be injected into the context using this method. 

81 

82 Args: 

83 step_id: The unique identifier of the step 

84 plan: The step execution plan 

85 

86 Raises: 

87 AttributeError: If the context is frozen. 

88 """ 

89 if self._is_frozen: 

90 raise AttributeError("Cannot inject plan into a frozen ProcessingContext.") 

91 self.step_plans[step_id] = plan 

92 

93 def freeze(self) -> None: 

94 """ 

95 Freezes the context, making its attributes immutable. 

96 

97 This should be called after all compilation and plan injection is complete. 

98 Essential attributes like step_plans, filemanager, and axis_id must be set. 

99 

100 Raises: 

101 RuntimeError: If essential attributes are not set before freezing. 

102 """ 

103 if not self.axis_id: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 raise RuntimeError("Cannot freeze ProcessingContext: 'axis_id' is not set.") 

105 if not hasattr(self, 'filemanager') or self.filemanager is None: 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 raise RuntimeError("Cannot freeze ProcessingContext: 'filemanager' is not set.") 

107 # step_plans can be empty if the pipeline is empty, but it must exist. 

108 if not hasattr(self, 'step_plans'): 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 raise RuntimeError("Cannot freeze ProcessingContext: 'step_plans' attribute does not exist.") 

110 

111 self._is_frozen = True # This assignment is allowed by __setattr__ 

112 

113 def is_frozen(self) -> bool: 

114 """ 

115 Check if the context is frozen. 

116 

117 Returns: 

118 True if the context is frozen, False otherwise. 

119 """ 

120 return self._is_frozen 

121 

122 

123 

124 # update_from_step_result method is removed as per plan. 

125 

126 # --- Config Getters --- 

127 # NOTE: These are only used outside compilation (e.g., in workers after context is frozen) 

128 # During compilation, code should access orchestrator.pipeline_config directly 

129 

130 def get_vfs_config(self) -> VFSConfig: 

131 """Returns the VFSConfig part of the global configuration.""" 

132 if not hasattr(self, 'global_config') or self.global_config is None: 

133 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.") 

134 return self.global_config.vfs_config 

135 

136 def get_path_planning_config(self) -> PathPlanningConfig: 

137 """Returns the PathPlanningConfig part of the global configuration.""" 

138 if not hasattr(self, 'global_config') or self.global_config is None: 

139 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.") 

140 return self.global_config.path_planning_config 

141 

142 def get_num_workers(self) -> int: 

143 """Returns the number of workers from the global configuration.""" 

144 if not hasattr(self, 'global_config') or self.global_config is None: 

145 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.") 

146 return self.global_config.num_workers