Coverage for openhcs/core/context/processing_context.py: 65.1%
45 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Processing Context for OpenHCS.
4This module defines the ProcessingContext class, which maintains state during pipeline execution.
5"""
7from typing import Any, Dict, Optional, Union
8from pathlib import Path
10from openhcs.core.config import GlobalPipelineConfig, VFSConfig, PathPlanningConfig
13class ProcessingContext:
14 """
15 Maintains state during pipeline execution.
17 The ProcessingContext is the canonical owner of all state during pipeline execution.
18 After compilation and freezing, it should be treated as immutable by processing steps.
20 OWNERSHIP: This class may ONLY be instantiated by PipelineOrchestrator.
21 All other components must receive a context instance, never create one.
23 Attributes:
24 step_plans: Dictionary mapping step IDs to execution plans.
25 outputs: Dictionary for step outputs (usage may change with VFS-centric model).
26 intermediates: Dictionary for intermediate results (usage may change).
27 current_step: Current executing step ID (usage may change).
28 well_id: Identifier of the well being processed.
29 filemanager: Instance of FileManager for VFS operations.
30 global_config: GlobalPipelineConfig holding system-wide configurations.
31 _is_frozen: Internal flag indicating if the context is immutable.
32 """
34 def __init__(
35 self,
36 global_config: GlobalPipelineConfig, # Made a required argument
37 step_plans: Optional[Dict[str, Dict[str, Any]]] = None,
38 well_id: Optional[str] = None,
39 **kwargs
40 ):
41 """
42 Initialize the processing context.
44 Args:
45 global_config: The global pipeline configuration object.
46 step_plans: Dictionary mapping step IDs to execution plans.
47 well_id: Identifier of the well being processed.
48 **kwargs: Additional context attributes (e.g., filemanager, microscope_handler).
49 """
50 # Initialize _is_frozen first to allow other attributes to be set by __setattr__
51 # This direct assignment bypasses the custom __setattr__ during initialization.
52 object.__setattr__(self, '_is_frozen', False)
54 self.step_plans = step_plans or {}
55 self.outputs = {} # Future use TBD, primary data flow via VFS
56 self.intermediates = {} # Future use TBD, primary data flow via VFS
57 self.current_step = None # Future use TBD
58 self.well_id = well_id
59 self.global_config = global_config # Store the global config
60 self.filemanager = None # Expected to be set by Orchestrator via kwargs or direct assignment
62 # Add any additional attributes from kwargs
63 # Note: 'filemanager' is often passed via kwargs by PipelineOrchestrator.create_context
64 for key, value in kwargs.items():
65 setattr(self, key, value) # This will now go through our __setattr__
67 def __setattr__(self, name: str, value: Any) -> None:
68 """
69 Set an attribute, preventing modification if the context is frozen.
70 """
71 if getattr(self, '_is_frozen', False) and name != '_is_frozen': 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true
72 raise AttributeError(f"Cannot modify attribute '{name}' of a frozen ProcessingContext.")
73 super().__setattr__(name, value)
75 def inject_plan(self, step_id: str, plan: Dict[str, Any]) -> None:
76 """
77 Inject a step plan into the context.
79 This method is the canonical way to add step plans to the context during compilation.
80 All step configuration must be injected into the context using this method.
82 Args:
83 step_id: The unique identifier of the step
84 plan: The step execution plan
86 Raises:
87 AttributeError: If the context is frozen.
88 """
89 if self._is_frozen:
90 raise AttributeError("Cannot inject plan into a frozen ProcessingContext.")
91 self.step_plans[step_id] = plan
93 def freeze(self) -> None:
94 """
95 Freezes the context, making its attributes immutable.
97 This should be called after all compilation and plan injection is complete.
98 Essential attributes like step_plans, filemanager, and well_id must be set.
100 Raises:
101 RuntimeError: If essential attributes are not set before freezing.
102 """
103 if not self.well_id: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true
104 raise RuntimeError("Cannot freeze ProcessingContext: 'well_id' is not set.")
105 if not hasattr(self, 'filemanager') or self.filemanager is None: 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true
106 raise RuntimeError("Cannot freeze ProcessingContext: 'filemanager' is not set.")
107 # step_plans can be empty if the pipeline is empty, but it must exist.
108 if not hasattr(self, 'step_plans'): 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true
109 raise RuntimeError("Cannot freeze ProcessingContext: 'step_plans' attribute does not exist.")
111 self._is_frozen = True # This assignment is allowed by __setattr__
113 def is_frozen(self) -> bool:
114 """
115 Check if the context is frozen.
117 Returns:
118 True if the context is frozen, False otherwise.
119 """
120 return self._is_frozen
124 # update_from_step_result method is removed as per plan.
126 # --- Config Getters ---
128 def get_vfs_config(self) -> VFSConfig:
129 """Returns the VFSConfig part of the global configuration."""
130 if not hasattr(self, 'global_config') or self.global_config is None: 130 ↛ 132line 130 didn't jump to line 132 because the condition on line 130 was never true
131 # This case should ideally not happen if Orchestrator always sets it.
132 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.")
133 return self.global_config.vfs
135 def get_path_planning_config(self) -> PathPlanningConfig:
136 """Returns the PathPlanningConfig part of the global configuration."""
137 if not hasattr(self, 'global_config') or self.global_config is None: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.")
139 return self.global_config.path_planning
141 def get_num_workers(self) -> int:
142 """Returns the number of workers from the global configuration."""
143 if not hasattr(self, 'global_config') or self.global_config is None:
144 raise RuntimeError("GlobalPipelineConfig not set on ProcessingContext.")
145 return self.global_config.num_workers