Coverage for openhcs/core/pipeline/compiler.py: 72.7%

395 statements  

coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Pipeline module for OpenHCS. 

3 

4This module provides the core pipeline compilation components for OpenHCS. 

5The PipelineCompiler is responsible for preparing step_plans within a ProcessingContext. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 17-B — Path Format Discipline 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 101 — Memory Type Declaration 

14- Clause 245 — Path Declaration 

15- Clause 273 — Backend Authorization Doctrine 

16- Clause 281 — Context-Bound Identifiers 

17- Clause 293 — GPU Pre-Declaration Enforcement 

18- Clause 295 — GPU Scheduling Affinity 

19- Clause 504 — Pipeline Preparation Modifications 

20- Clause 524 — Step = Declaration = ID = Runtime Authority 

21""" 

import inspect
import logging
import json
import dataclasses
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from collections import OrderedDict  # For special_outputs and special_inputs order (used by PathPlanner)

from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES, READ_BACKEND, WRITE_BACKEND, Backend
from openhcs.core.context.processing_context import ProcessingContext
from openhcs.core.config import MaterializationBackend, PathPlanningConfig, WellFilterMode
from openhcs.core.pipeline.funcstep_contract_validator import \
    FuncStepContractValidator
from openhcs.core.pipeline.materialization_flag_planner import \
    MaterializationFlagPlanner
from openhcs.core.pipeline.path_planner import PipelinePathPlanner
from openhcs.core.pipeline.gpu_memory_validator import \
    GPUMemoryTypeValidator
from openhcs.core.steps.abstract import AbstractStep
from openhcs.core.steps.function_step import FunctionStep  # Used for isinstance check
from dataclasses import dataclass
from typing import Callable

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class FunctionReference:
    """
    A picklable reference to a function in the registry.

    This replaces raw function objects in compiled step definitions to ensure
    picklability while allowing workers to resolve functions from their registry.
    """
    function_name: str
    registry_name: str
    memory_type: str    # The memory type for get_function_by_name() (e.g., "numpy", "pyclesperanto")
    composite_key: str  # The full registry key (e.g., "pyclesperanto:gaussian_blur")

    def resolve(self) -> Callable:
        """Resolve this reference to the actual decorated function from registry."""
        if self.registry_name == "openhcs":  # coverage 63 ↛ 73: condition on line 63 was always true
            # For OpenHCS functions, use RegistryService directly with composite key
            from openhcs.processing.backends.lib_registry.registry_service import RegistryService
            all_functions = RegistryService.get_all_functions_with_metadata()
            if self.composite_key in all_functions:  # coverage 67 ↛ 70: condition on line 67 was always true
                return all_functions[self.composite_key].func
            else:
                raise RuntimeError(f"OpenHCS function {self.composite_key} not found in registry")
        else:
            # For external library functions, use the memory type for lookup
            from openhcs.processing.func_registry import get_function_by_name
            return get_function_by_name(self.function_name, self.memory_type)

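# Usage sketch (illustrative only; the "pyclesperanto:gaussian_blur" key echoes the
# field comments above and is assumed, not guaranteed, to be present in the registry):
def _example_function_reference_roundtrip():
    """Build, pickle, and resolve a FunctionReference as a worker would."""
    import pickle

    ref = FunctionReference(
        function_name="gaussian_blur",
        registry_name="pyclesperanto",
        memory_type="pyclesperanto",
        composite_key="pyclesperanto:gaussian_blur",
    )
    restored = pickle.loads(pickle.dumps(ref))  # survives the process boundary
    return restored.resolve()                   # worker-side lookup of the decorated function
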

def _refresh_function_objects_in_steps(pipeline_definition: List[AbstractStep]) -> None:
    """
    Refresh all function objects in pipeline steps to ensure they're picklable.

    This recreates function objects by importing them fresh from their original modules,
    similar to how code mode works, which avoids unpicklable closures from registry wrapping.
    """
    for step in pipeline_definition:
        if hasattr(step, 'func') and step.func is not None:  # coverage 85 ↛ 84: condition on line 85 was always true
            step.func = _refresh_function_object(step.func)


def _refresh_function_object(func_value):
    """Convert function objects to picklable FunctionReference objects."""
    try:
        if callable(func_value) and hasattr(func_value, '__module__'):
            # Single function → FunctionReference
            return _get_function_reference(func_value)

        elif isinstance(func_value, tuple) and len(func_value) == 2:
            # Function with parameters tuple → (FunctionReference, params)
            func, params = func_value
            if callable(func):
                func_ref = _refresh_function_object(func)
                return (func_ref, params)

        elif isinstance(func_value, list):
            # List of functions → List of FunctionReferences
            return [_refresh_function_object(item) for item in func_value]

        elif isinstance(func_value, dict):
            # Dict of functions → Dict of FunctionReferences
            return {key: _refresh_function_object(value) for key, value in func_value.items()}

    except Exception as e:
        import logging
        logger = logging.getLogger(__name__)
        logger.warning(f"Failed to create function reference for {func_value}: {e}")
        # If we can't create a reference, return original (may fail later)
        return func_value

    return func_value

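# Shape sketch for the patterns handled above (the callables here are local stand-ins;
# only registry-registered functions actually become FunctionReference objects, the
# rest fall back to the original value with a warning):
def _example_refresh_patterns():
    """Exercise the single / tuple / list / dict func patterns."""
    def blur(image):
        return image

    single = _refresh_function_object(blur)
    with_params = _refresh_function_object((blur, {"sigma": 2.0}))
    chain = _refresh_function_object([blur, blur])
    per_component = _refresh_function_object({"DAPI": blur, "GFP": blur})
    return single, with_params, chain, per_component
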

def _get_function_reference(func):
    """Convert a function to a picklable FunctionReference."""
    try:
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService

        # Get all function metadata to find this function
        all_functions = RegistryService.get_all_functions_with_metadata()

        # Find the metadata for this function by matching name and module
        for composite_key, metadata in all_functions.items():  # coverage 130 ↛ 148: the loop on line 130 didn't complete
            if (metadata.func.__name__ == func.__name__ and
                    metadata.func.__module__ == func.__module__):
                # Create a picklable reference instead of the function object
                return FunctionReference(
                    function_name=func.__name__,
                    registry_name=metadata.registry.library_name,
                    memory_type=metadata.registry.MEMORY_TYPE,
                    composite_key=composite_key
                )

    except Exception as e:
        import logging
        logger = logging.getLogger(__name__)
        logger.warning(f"Failed to create function reference for {func.__name__}: {e}")

    # If we can't create a reference, this function isn't in the registry
    # This should not happen for properly registered functions
    raise RuntimeError(f"Function {func.__name__} not found in registry - cannot create reference")


def _normalize_step_attributes(pipeline_definition: List[AbstractStep]) -> None:
    """Backwards compatibility: Set missing step attributes to constructor defaults."""
    sig = inspect.signature(AbstractStep.__init__)
    # Include ALL parameters with defaults, even None values
    defaults = {name: param.default for name, param in sig.parameters.items()
                if name != 'self' and param.default is not inspect.Parameter.empty}

    # Add attributes that are set manually in AbstractStep.__init__ but not constructor parameters
    manual_attributes = {
        '__input_dir__': None,
        '__output_dir__': None,
    }

    for i, step in enumerate(pipeline_definition):
        # Set missing constructor parameters
        for attr_name, default_value in defaults.items():
            if not hasattr(step, attr_name):  # coverage 167 ↛ 168: condition on line 167 was never true
                setattr(step, attr_name, default_value)

        # Set missing manual attributes (for backwards compatibility with older serialized steps)
        for attr_name, default_value in manual_attributes.items():
            if not hasattr(step, attr_name):  # coverage 172 ↛ 173: condition on line 172 was never true
                setattr(step, attr_name, default_value)

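# The defaults harvesting above relies on a small inspect idiom; a standalone sketch
# of that idiom (on a hypothetical class, not AbstractStep itself) looks like this:
def _example_constructor_defaults():
    """Collect constructor defaults the same way _normalize_step_attributes does."""
    class _ExampleStep:
        def __init__(self, name, group_by=None, visualize=False):
            self.name = name

    sig = inspect.signature(_ExampleStep.__init__)
    return {name: param.default for name, param in sig.parameters.items()
            if name != 'self' and param.default is not inspect.Parameter.empty}
    # -> {'group_by': None, 'visualize': False}
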

class PipelineCompiler:
    """
    Compiles a pipeline by populating step plans within a ProcessingContext.

    This class provides static methods that are called sequentially by the
    PipelineOrchestrator for each well's ProcessingContext. Each method
    is responsible for a specific part of the compilation process, such as
    path planning, special I/O resolution, materialization flag setting,
    memory contract validation, and GPU resource assignment.
    """


    @staticmethod
    def initialize_step_plans_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator,
        metadata_writer: bool = False,
        plate_path: Optional[Path] = None
        # base_input_dir and axis_id parameters removed, will use from context
    ) -> None:
        """
        Initializes step_plans by calling PipelinePathPlanner.prepare_pipeline_paths,
        which handles primary paths, special I/O path planning and linking, and chainbreaker status.
        Then, this method supplements the plans with non-I/O FunctionStep-specific attributes.

        Args:
            context: ProcessingContext to initialize step plans for
            steps_definition: List of AbstractStep objects defining the pipeline
            orchestrator: Orchestrator instance for well filter resolution
            metadata_writer: If True, this well is responsible for creating OpenHCS metadata files
            plate_path: Path to plate root for zarr conversion detection
        """
        # NOTE: This method is called within config_context() wrapper in compile_pipelines()
        if context.is_frozen():  # coverage 209 ↛ 210: condition on line 209 was never true
            raise AttributeError("Cannot initialize step plans in a frozen ProcessingContext.")

        if not hasattr(context, 'step_plans') or context.step_plans is None:  # coverage 212 ↛ 213: condition on line 212 was never true
            context.step_plans = {}  # Ensure step_plans dict exists

        # === VISUALIZER CONFIG EXTRACTION ===
        # Extract visualizer config from orchestrator.pipeline_config
        # The caller has already set up config_context(orchestrator.pipeline_config)
        # so we can just access the field directly - lazy resolution happens automatically
        context.visualizer_config = orchestrator.pipeline_config.visualizer_config

        # === BACKWARDS COMPATIBILITY PREPROCESSING ===
        # Ensure all steps have complete attribute sets based on AbstractStep constructor
        # This must happen before any other compilation logic to eliminate defensive programming
        logger.debug("🔧 BACKWARDS COMPATIBILITY: Normalizing step attributes...")
        _normalize_step_attributes(steps_definition)

        # Pre-initialize step_plans with basic entries for each step
        # Use step index as key instead of step_id for multiprocessing compatibility
        for step_index, step in enumerate(steps_definition):
            if step_index not in context.step_plans:  # coverage 232 ↛ 231: condition on line 232 was always true
                context.step_plans[step_index] = {
                    "step_name": step.name,
                    "step_type": step.__class__.__name__,
                    "axis_id": context.axis_id,
                }


        # === INPUT CONVERSION DETECTION ===
        # Check if first step needs zarr conversion
        if steps_definition and plate_path:  # coverage 241 ↛ 269: condition on line 241 was always true
            first_step = steps_definition[0]
            # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context)
            vfs_config = orchestrator.pipeline_config.vfs_config

            # Only convert if default materialization backend is ZARR
            wants_zarr_conversion = (
                vfs_config.materialization_backend == MaterializationBackend.ZARR
            )

            if wants_zarr_conversion:
                # Check if input plate is already zarr format
                available_backends = context.microscope_handler.get_available_backends(plate_path)
                already_zarr = Backend.ZARR in available_backends

                if not already_zarr:  # coverage 256 ↛ 269: condition on line 256 was always true
                    # Inject input conversion config using existing PathPlanningConfig pattern
                    path_config = orchestrator.pipeline_config.path_planning_config
                    conversion_config = PathPlanningConfig(
                        output_dir_suffix="",  # No suffix - write to plate root
                        global_output_folder=plate_path.parent,  # Parent of plate
                        sub_dir=path_config.sub_dir  # Use same sub_dir (e.g., "images")
                    )
                    context.step_plans[0]["input_conversion_config"] = conversion_config
                    logger.debug(f"Input conversion to zarr enabled for first step: {first_step.name}")

        # The axis_id and base_input_dir are available from the context object.
        # Path planning now gets config directly from orchestrator.pipeline_config parameter
        PipelinePathPlanner.prepare_pipeline_paths(
            context,
            steps_definition,
            orchestrator.pipeline_config
        )


        # === FUNCTION OBJECT REFRESH ===
        # CRITICAL FIX: Refresh all function objects to ensure they're picklable
        # This prevents multiprocessing pickling errors by ensuring clean function objects
        logger.debug("🔧 FUNCTION REFRESH: Refreshing all function objects for picklability...")
        _refresh_function_objects_in_steps(steps_definition)

        # === LAZY CONFIG RESOLUTION ===
        # Resolve each step's lazy configs with proper nested context
        # This ensures step-level configs inherit from pipeline-level configs
        # Architecture: GlobalPipelineConfig -> PipelineConfig -> Step (same as UI)
        logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs with nested step contexts...")
        from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
        from openhcs.config_framework.context_manager import config_context

        # Resolve each step individually with nested context (pipeline -> step)
        # NOTE: The caller has already set up config_context(orchestrator.pipeline_config)
        # We add step-level context on top for each step
        resolved_steps = []
        for step in steps_definition:
            with config_context(step):  # Step-level context on top of pipeline context
                resolved_step = resolve_lazy_configurations_for_serialization(step)
                resolved_steps.append(resolved_step)
        steps_definition = resolved_steps


        # Loop to supplement step_plans with non-I/O, non-path attributes
        # after PipelinePathPlanner has fully populated them with I/O info.
        for step_index, step in enumerate(steps_definition):
            if step_index not in context.step_plans:  # coverage 302 ↛ 303: condition on line 302 was never true
                logger.error(
                    f"Critical error: Step {step.name} (index: {step_index}) "
                    f"not found in step_plans after path planning phase. Clause 504."
                )
                # Create a minimal error plan
                context.step_plans[step_index] = {
                    "step_name": step.name,
                    "step_type": step.__class__.__name__,
                    "axis_id": context.axis_id,  # Use context.axis_id
                    "error": "Missing from path planning phase by PipelinePathPlanner",
                    "create_openhcs_metadata": metadata_writer  # Set metadata writer responsibility flag
                }
                continue

            current_plan = context.step_plans[step_index]

            # Ensure basic metadata (PathPlanner should set most of this)
            current_plan["step_name"] = step.name
            current_plan["step_type"] = step.__class__.__name__
            current_plan["axis_id"] = context.axis_id  # Use context.axis_id; PathPlanner should also use context.axis_id
            current_plan.setdefault("visualize", False)  # Ensure visualize key exists
            current_plan["create_openhcs_metadata"] = metadata_writer  # Set metadata writer responsibility flag

            # The special_outputs and special_inputs are now fully handled by PipelinePathPlanner.
            # The block for planning special_outputs (lines 134-148 in original) is removed.
            # Ensure these keys exist as OrderedDicts if PathPlanner doesn't guarantee it
            # (PathPlanner currently creates them as dicts, OrderedDict might not be strictly needed here anymore)
            current_plan.setdefault("special_inputs", OrderedDict())
            current_plan.setdefault("special_outputs", OrderedDict())
            current_plan.setdefault("chainbreaker", False)  # PathPlanner now sets this.

            # Add step-specific attributes (non-I/O, non-path related)
            current_plan["variable_components"] = step.variable_components
            current_plan["group_by"] = step.group_by
            # Lazy configs were already resolved at the beginning of compilation
            resolved_step = step

            # DEBUG: Check what the resolved napari config actually has
            if hasattr(resolved_step, 'napari_streaming_config') and resolved_step.napari_streaming_config:  # coverage 341 ↛ 342: condition on line 341 was never true
                logger.debug(f"resolved_step.napari_streaming_config.well_filter = {resolved_step.napari_streaming_config.well_filter}")
            if hasattr(resolved_step, 'step_well_filter_config') and resolved_step.step_well_filter_config:  # coverage 343 ↛ 345: condition on line 343 was always true
                logger.debug(f"resolved_step.step_well_filter_config.well_filter = {resolved_step.step_well_filter_config.well_filter}")
            if hasattr(resolved_step, 'step_materialization_config') and resolved_step.step_materialization_config:
                logger.debug(f"resolved_step.step_materialization_config.sub_dir = '{resolved_step.step_materialization_config.sub_dir}' (type: {type(resolved_step.step_materialization_config).__name__})")


            # Store WellFilterConfig instances only if they match the current axis
            from openhcs.core.config import WellFilterConfig, StreamingConfig, WellFilterMode
            has_streaming = False
            required_visualizers = getattr(context, 'required_visualizers', [])

            # CRITICAL FIX: Ensure required_visualizers is always set on context
            # This prevents AttributeError during execution phase
            if not hasattr(context, 'required_visualizers'):
                context.required_visualizers = []

            # Get step axis filters for this step
            step_axis_filters = getattr(context, 'step_axis_filters', {}).get(step_index, {})

            logger.debug(f"Processing step '{step.name}' with attributes: {[attr for attr in dir(resolved_step) if not attr.startswith('_') and 'config' in attr]}")
            if step.name == "Image Enhancement Processing":
                logger.debug(f"All attributes for {step.name}: {[attr for attr in dir(resolved_step) if not attr.startswith('_')]}")


            for attr_name in dir(resolved_step):
                if not attr_name.startswith('_'):
                    config = getattr(resolved_step, attr_name, None)
                    # Configs are already resolved to base configs at line 277
                    # No need to call to_base_config() again - that's legacy code

                    # Unified handling: compute inclusion for any WellFilterConfig (StreamingConfig subclasses it)
                    is_streaming = config is not None and isinstance(config, StreamingConfig)
                    is_wellfilter = config is not None and isinstance(config, WellFilterConfig)

                    include_config = False
                    if is_wellfilter:
                        # Check if this config has a well filter and if current axis matches
                        should_include_config = True
                        if config.well_filter is not None:  # coverage 379 ↛ 388: condition on line 379 was always true
                            config_filter = step_axis_filters.get(attr_name)
                            if config_filter:  # coverage 381 ↛ 388: condition on line 381 was always true
                                # Apply axis filter logic
                                axis_in_filter = context.axis_id in config_filter['resolved_axis_values']
                                should_include_config = (
                                    axis_in_filter if config_filter['filter_mode'] == WellFilterMode.INCLUDE
                                    else not axis_in_filter
                                )
                        if should_include_config:
                            include_config = True

                    # Only add the config to the plan if it's included (or not a WellFilterConfig)
                    if include_config or (not is_wellfilter and config is not None):
                        current_plan[attr_name] = config

                    # Streaming extras only apply if the streaming config is included
                    if is_streaming and include_config:  # coverage 396 ↛ 398: condition on line 396 was never true
                        # Validate that the visualizer can actually be created
                        try:
                            # Only validate configs that actually have a backend (real streaming configs)
                            if not hasattr(config, 'backend'):
                                continue

                            # Test visualizer creation without actually creating it
                            if hasattr(config, 'create_visualizer'):
                                # For napari, check if napari is available and environment supports GUI
                                if config.backend.name == 'NAPARI_STREAM':
                                    from openhcs.utils.import_utils import optional_import
                                    import os

                                    # Check if running in headless/CI environment
                                    is_headless = (
                                        os.getenv('OPENHCS_CPU_ONLY', 'false').lower() == 'true' or
                                        os.getenv('CI', 'false').lower() == 'true' or
                                        os.getenv('DISPLAY') is None
                                    )

                                    if is_headless:
                                        logger.info(f"Napari streaming disabled for step '{step.name}': running in headless environment (CI/CPU_ONLY mode)")
                                        continue  # Skip this streaming config

                                    napari = optional_import("napari")
                                    if napari is None:
                                        logger.warning(f"Napari streaming disabled for step '{step.name}': napari not installed. Install with: pip install 'openhcs[viz]' or pip install napari")
                                        continue  # Skip this streaming config

                            has_streaming = True
                            # Collect visualizer info
                            visualizer_info = {
                                'backend': config.backend.name,
                                'config': config
                            }
                            if visualizer_info not in required_visualizers:
                                required_visualizers.append(visualizer_info)
                        except Exception as e:
                            logger.warning(f"Streaming disabled for step '{step.name}': {e}")
                            continue  # Skip this streaming config

            # Set visualize flag for orchestrator if any streaming is enabled
            current_plan["visualize"] = has_streaming
            context.required_visualizers = required_visualizers


            # Add FunctionStep specific attributes
            if isinstance(step, FunctionStep):  # coverage 443 ↛ exit: condition on line 443 was always true

                # 🎯 SEMANTIC COHERENCE FIX: Prevent group_by/variable_components conflict
                # When variable_components contains the same value as group_by,
                # set group_by to None to avoid EZStitcher heritage rule violation
                if (step.variable_components and step.group_by and  # coverage 448 ↛ 450: condition on line 448 was never true
                        step.group_by in step.variable_components):
                    logger.debug(f"Step {step.name}: Detected group_by='{step.group_by}' in variable_components={step.variable_components}. "
                                 f"Setting group_by=None to maintain semantic coherence.")
                    current_plan["group_by"] = None

                # func attribute is guaranteed in FunctionStep.__init__
                current_plan["func_name"] = getattr(step.func, '__name__', str(step.func))

                # Memory type hints from step instance (set in FunctionStep.__init__ if provided)
                # These are initial hints; FuncStepContractValidator will set final types.
                if hasattr(step, 'input_memory_type_hint'):  # From FunctionStep.__init__  # coverage 459 ↛ 460: condition never true
                    current_plan['input_memory_type_hint'] = step.input_memory_type_hint
                if hasattr(step, 'output_memory_type_hint'):  # From FunctionStep.__init__  # coverage 461 ↛ 462: condition never true
                    current_plan['output_memory_type_hint'] = step.output_memory_type_hint

    # The resolve_special_input_paths_for_context static method is DELETED (lines 181-238 of original)
    # as this functionality is now handled by PipelinePathPlanner.prepare_pipeline_paths.

    # _prepare_materialization_flags is removed as MaterializationFlagPlanner.prepare_pipeline_flags
    # now modifies context.step_plans in-place and takes context directly.

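    # For orientation, one compiled step_plan entry ends up shaped roughly like the
    # dict below (illustrative values only; later phases add read/write backends,
    # final memory types, and GPU ids on top of what initialize_step_plans writes):
    _EXAMPLE_STEP_PLAN = {
        "step_name": "Image Enhancement Processing",
        "step_type": "FunctionStep",
        "axis_id": "A01",
        "visualize": False,
        "create_openhcs_metadata": True,
        "special_inputs": OrderedDict(),
        "special_outputs": OrderedDict(),
        "chainbreaker": False,
        "group_by": None,
        "func_name": "gaussian_blur",
    }
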

    @staticmethod
    def declare_zarr_stores_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator
    ) -> None:
        """
        Declare zarr store creation functions for runtime execution.

        This method runs after path planning but before materialization flag planning
        to declare which steps need zarr stores and provide the metadata needed
        for runtime store creation.

        Args:
            context: ProcessingContext for current well
            steps_definition: List of AbstractStep objects
            orchestrator: Orchestrator instance for accessing all wells
        """
        from openhcs.constants.constants import Backend
        from openhcs.constants import MULTIPROCESSING_AXIS

        all_wells = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)

        # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context)
        vfs_config = orchestrator.pipeline_config.vfs_config

        for step_index, step in enumerate(steps_definition):
            step_plan = context.step_plans[step_index]

            will_use_zarr = (
                vfs_config.materialization_backend == MaterializationBackend.ZARR and
                step_index == len(steps_definition) - 1
            )

            if will_use_zarr:
                step_plan["zarr_config"] = {
                    "all_wells": all_wells,
                    "needs_initialization": True
                }
                logger.debug(f"Step '{step.name}' will use zarr backend for axis {context.axis_id}")
            else:
                step_plan["zarr_config"] = None


    @staticmethod
    def plan_materialization_flags_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator
    ) -> None:
        """
        Plans and injects materialization flags into context.step_plans
        by calling MaterializationFlagPlanner.
        """
        if context.is_frozen():  # coverage 523 ↛ 524: condition on line 523 was never true
            raise AttributeError("Cannot plan materialization flags in a frozen ProcessingContext.")
        if not context.step_plans:  # coverage 525 ↛ 526: condition on line 525 was never true
            logger.warning("step_plans is empty in context for materialization planning. This may be valid if pipeline is empty.")
            return

        # MaterializationFlagPlanner.prepare_pipeline_flags now takes context and pipeline_definition
        # and modifies context.step_plans in-place.
        MaterializationFlagPlanner.prepare_pipeline_flags(
            context,
            steps_definition,
            orchestrator.plate_path,
            orchestrator.pipeline_config
        )

        # Post-check (optional, but good for ensuring contracts are met by the planner)
        for step_index, step in enumerate(steps_definition):
            if step_index not in context.step_plans:  # coverage 540 ↛ 542: condition on line 540 was never true
                # This should not happen if prepare_pipeline_flags guarantees plans for all steps
                logger.error(f"Step {step.name} (index: {step_index}) missing from step_plans after materialization planning.")
                continue

            plan = context.step_plans[step_index]
            # Check for keys that FunctionStep actually uses during execution
            required_keys = [READ_BACKEND, WRITE_BACKEND]
            if not all(k in plan for k in required_keys):  # coverage 548 ↛ 549: condition on line 548 was never true
                missing_keys = [k for k in required_keys if k not in plan]
                logger.error(
                    f"Materialization flag planning incomplete for step {step.name} (index: {step_index}). "
                    f"Missing required keys: {missing_keys} (Clause 273)."
                )


    @staticmethod
    def validate_memory_contracts_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator=None
    ) -> None:
        """
        Validates FunctionStep memory contracts, dict patterns, and adds memory type info to context.step_plans.

        Args:
            context: ProcessingContext to validate
            steps_definition: List of AbstractStep objects
            orchestrator: Optional orchestrator for dict pattern key validation
        """
        if context.is_frozen():  # coverage 570 ↛ 571: condition on line 570 was never true
            raise AttributeError("Cannot validate memory contracts in a frozen ProcessingContext.")

        # FuncStepContractValidator might need access to input/output_memory_type_hint from plan
        step_memory_types = FuncStepContractValidator.validate_pipeline(
            steps=steps_definition,
            pipeline_context=context,  # Pass context so validator can access step plans for memory type overrides
            orchestrator=orchestrator  # Pass orchestrator for dict pattern key validation
        )

        for step_index, memory_types in step_memory_types.items():
            if "input_memory_type" not in memory_types or "output_memory_type" not in memory_types:  # coverage 581 ↛ 582: condition on line 581 was never true
                step_name = context.step_plans[step_index]["step_name"]
                raise AssertionError(
                    f"Memory type validation must set input/output_memory_type for FunctionStep {step_name} (index: {step_index}) (Clause 101)."
                )
            if step_index in context.step_plans:  # coverage 586 ↛ 589: condition on line 586 was always true
                context.step_plans[step_index].update(memory_types)
            else:
                logger.warning(f"Step index {step_index} found in memory_types but not in context.step_plans. Skipping.")

        # Apply memory type override: Any step with disk output must use numpy for disk writing
        for step_index, step in enumerate(steps_definition):
            if isinstance(step, FunctionStep):  # coverage 593 ↛ 592: condition on line 593 was always true
                if step_index in context.step_plans:  # coverage 594 ↛ 592: condition on line 594 was always true
                    step_plan = context.step_plans[step_index]
                    is_last_step = (step_index == len(steps_definition) - 1)
                    write_backend = step_plan['write_backend']

                    if write_backend == 'disk':
                        logger.debug(f"Step {step.name} has disk output, overriding output_memory_type to numpy")
                        step_plan['output_memory_type'] = 'numpy'


    @staticmethod
    def assign_gpu_resources_for_context(
        context: ProcessingContext
    ) -> None:
        """
        Validates GPU memory types from context.step_plans and assigns GPU device IDs.
        (Unchanged from previous version)
        """
        if context.is_frozen():  # coverage 613 ↛ 614: condition on line 613 was never true
            raise AttributeError("Cannot assign GPU resources in a frozen ProcessingContext.")

        gpu_assignments = GPUMemoryTypeValidator.validate_step_plans(context.step_plans)

        for step_index, step_plan_val in context.step_plans.items():  # Renamed step_plan to step_plan_val to avoid conflict
            is_gpu_step = False
            input_type = step_plan_val["input_memory_type"]
            if input_type in VALID_GPU_MEMORY_TYPES:  # coverage 621 ↛ 622: condition on line 621 was never true
                is_gpu_step = True

            output_type = step_plan_val["output_memory_type"]
            if output_type in VALID_GPU_MEMORY_TYPES:  # coverage 625 ↛ 626: condition on line 625 was never true
                is_gpu_step = True

            if is_gpu_step:  # coverage 628 ↛ 631: condition on line 628 was never true
                # Ensure gpu_assignments has an entry for this step_index if it's a GPU step
                # And that entry contains a 'gpu_id'
                step_gpu_assignment = gpu_assignments[step_index]
                if "gpu_id" not in step_gpu_assignment:
                    step_name = step_plan_val["step_name"]
                    raise AssertionError(
                        f"GPU validation must assign gpu_id for step {step_name} (index: {step_index}) "
                        f"with GPU memory types (Clause 295)."
                    )

        for step_index, gpu_assignment in gpu_assignments.items():  # coverage 639 ↛ 640: the loop on line 639 never started
            if step_index in context.step_plans:
                context.step_plans[step_index].update(gpu_assignment)
            else:
                logger.warning(f"Step index {step_index} found in gpu_assignments but not in context.step_plans. Skipping.")


    @staticmethod
    def apply_global_visualizer_override_for_context(
        context: ProcessingContext,
        global_enable_visualizer: bool
    ) -> None:
        """
        Applies global visualizer override to all step_plans in the context.
        (Unchanged from previous version)
        """
        if context.is_frozen():
            raise AttributeError("Cannot apply visualizer override in a frozen ProcessingContext.")

        if global_enable_visualizer:
            if not context.step_plans: return  # Guard against empty step_plans
            for step_index, plan in context.step_plans.items():
                plan["visualize"] = True
                logger.info(f"Global visualizer override: Step '{plan['step_name']}' marked for visualization.")


    @staticmethod
    def resolve_lazy_dataclasses_for_context(context: ProcessingContext, orchestrator) -> None:
        """
        Resolve all lazy dataclass instances in step plans to their base configurations.

        This method should be called after all compilation phases but before context
        freezing to ensure step plans are safe for pickling in multiprocessing contexts.

        NOTE: The caller MUST have already set up config_context(orchestrator.pipeline_config)
        before calling this method. We rely on that context for lazy resolution.

        Args:
            context: ProcessingContext to process
            orchestrator: PipelineOrchestrator (unused - kept for API compatibility)
        """
        from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization

        # Resolve the entire context recursively to catch all lazy dataclass instances
        # The caller has already set up config_context(), so lazy resolution happens automatically
        resolved_context_dict = resolve_lazy_configurations_for_serialization(vars(context))

        # Update context attributes with resolved values
        for attr_name, resolved_value in resolved_context_dict.items():
            if not attr_name.startswith('_'):  # Skip private attributes
                setattr(context, attr_name, resolved_value)


    @staticmethod
    def compile_pipelines(
        orchestrator,
        pipeline_definition: List[AbstractStep],
        axis_filter: Optional[List[str]] = None,
        enable_visualizer_override: bool = False
    ) -> Dict[str, Any]:
        """
        Compile-all phase: Prepares frozen ProcessingContexts for each axis value.

        This method iterates through the specified axis values, creates a ProcessingContext
        for each, and invokes the various phases of the PipelineCompiler to populate
        the context's step_plans. After all compilation phases for an axis value are complete,
        its context is frozen. Finally, attributes are stripped from the pipeline_definition,
        making the step objects stateless for the execution phase.

        Args:
            orchestrator: The PipelineOrchestrator instance to use for compilation
            pipeline_definition: The list of AbstractStep objects defining the pipeline.
            axis_filter: Optional list of axis values to process. If None, processes all found axis values.
            enable_visualizer_override: If True, all steps in all compiled contexts
                                        will have their 'visualize' flag set to True.

        Returns:
            A dictionary with 'pipeline_definition' (the now-stateless step list) and
            'compiled_contexts' (a mapping of axis values to their compiled and frozen
            ProcessingContexts). The input `pipeline_definition` list (of step objects)
            is modified in-place to become stateless.
        """
        from openhcs.constants.constants import VariableComponents, OrchestratorState
        from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper

        if not orchestrator.is_initialized():  # coverage 720 ↛ 721: condition on line 720 was never true
            raise RuntimeError("PipelineOrchestrator must be explicitly initialized before calling compile_pipelines().")

        if not pipeline_definition:  # coverage 723 ↛ 724: condition on line 723 was never true
            raise ValueError("A valid pipeline definition (List[AbstractStep]) must be provided.")


        try:
            compiled_contexts: Dict[str, ProcessingContext] = {}
            # Get multiprocessing axis values dynamically from configuration
            from openhcs.constants import MULTIPROCESSING_AXIS
            axis_values_to_process = orchestrator.get_component_keys(MULTIPROCESSING_AXIS, axis_filter)

            if not axis_values_to_process:  # coverage 732 ↛ 733: condition on line 732 was never true
                logger.warning("No axis values found to process based on filter.")
                return {
                    'pipeline_definition': pipeline_definition,
                    'compiled_contexts': {}
                }

            logger.info(f"Starting compilation for axis values: {', '.join(axis_values_to_process)}")

            # === GLOBAL AXIS FILTER RESOLUTION ===
            # Resolve axis filters once for all axis values to ensure step-level inheritance works
            logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs for axis filter resolution...")
            from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
            from openhcs.config_framework.context_manager import config_context

            # Resolve each step with nested context (same as initialize_step_plans_for_context)
            # This ensures step-level configs inherit from pipeline-level configs
            resolved_steps_for_filters = []
            with config_context(orchestrator.pipeline_config):
                for step in pipeline_definition:
                    with config_context(step):  # Step-level context on top of pipeline context
                        resolved_step = resolve_lazy_configurations_for_serialization(step)
                        resolved_steps_for_filters.append(resolved_step)

            logger.debug("🎯 AXIS FILTER RESOLUTION: Resolving step axis filters...")
            # Create a temporary context to store the global axis filters
            temp_context = orchestrator.create_context("temp")

            # Use orchestrator context during axis filter resolution
            # This ensures that lazy config resolution uses the orchestrator context
            from openhcs.config_framework.context_manager import config_context
            with config_context(orchestrator.pipeline_config):
                _resolve_step_axis_filters(resolved_steps_for_filters, temp_context, orchestrator)
                global_step_axis_filters = getattr(temp_context, 'step_axis_filters', {})

            # Determine responsible axis value for metadata creation (lexicographically first)
            responsible_axis_value = sorted(axis_values_to_process)[0] if axis_values_to_process else None
            logger.debug(f"Designated responsible axis value for metadata creation: {responsible_axis_value}")


            for axis_id in axis_values_to_process:
                logger.debug(f"Compiling for axis value: {axis_id}")
                context = orchestrator.create_context(axis_id)

                # Copy global axis filters to this context
                context.step_axis_filters = global_step_axis_filters

                # Determine if this axis value is responsible for metadata creation
                is_responsible = (axis_id == responsible_axis_value)
                logger.debug(f"Axis {axis_id} metadata responsibility: {is_responsible}")

                # CRITICAL: Wrap all compilation steps in config_context() for lazy resolution
                from openhcs.config_framework.context_manager import config_context
                with config_context(orchestrator.pipeline_config):
                    PipelineCompiler.initialize_step_plans_for_context(context, pipeline_definition, orchestrator, metadata_writer=is_responsible, plate_path=orchestrator.plate_path)
                    PipelineCompiler.declare_zarr_stores_for_context(context, pipeline_definition, orchestrator)
                    PipelineCompiler.plan_materialization_flags_for_context(context, pipeline_definition, orchestrator)
                    PipelineCompiler.validate_memory_contracts_for_context(context, pipeline_definition, orchestrator)
                    PipelineCompiler.assign_gpu_resources_for_context(context)

                    if enable_visualizer_override:  # coverage 791 ↛ 792: condition on line 791 was never true
                        PipelineCompiler.apply_global_visualizer_override_for_context(context, True)

                    # Resolve all lazy dataclasses before freezing to ensure multiprocessing compatibility
                    PipelineCompiler.resolve_lazy_dataclasses_for_context(context, orchestrator)

                context.freeze()
                compiled_contexts[axis_id] = context
                logger.debug(f"Compilation finished for axis value: {axis_id}")

            # Log path planning summary once per plate
            if compiled_contexts:  # coverage 806 ↛ 823: condition on line 806 was always true

                first_context = next(iter(compiled_contexts.values()))
                logger.info(f"📁 PATH PLANNING SUMMARY:")
                logger.info(f"   Main pipeline output: {first_context.output_plate_root}")

                # Check for materialization steps in first context
                materialization_steps = []
                for step_id, plan in first_context.step_plans.items():
                    if 'materialized_output_dir' in plan:
                        step_name = plan.get('step_name', f'step_{step_id}')
                        mat_path = plan['materialized_output_dir']
                        materialization_steps.append((step_name, mat_path))

                for step_name, mat_path in materialization_steps:
                    logger.info(f"   Materialization {step_name}: {mat_path}")

            # After processing all wells, strip attributes and finalize
            logger.info("Stripping attributes from pipeline definition steps.")
            StepAttributeStripper.strip_step_attributes(pipeline_definition, {})

            orchestrator._state = OrchestratorState.COMPILED

            # Log worker configuration for execution planning
            effective_config = orchestrator.get_effective_config()
            logger.info(f"⚙️ EXECUTION CONFIG: {effective_config.num_workers} workers configured for pipeline execution")

            logger.info(f"🏁 COMPILATION COMPLETE: {len(compiled_contexts)} wells compiled successfully")

            # Return expected structure with both pipeline_definition and compiled_contexts
            return {
                'pipeline_definition': pipeline_definition,
                'compiled_contexts': compiled_contexts
            }
        except Exception as e:
            orchestrator._state = OrchestratorState.COMPILE_FAILED
            logger.error(f"Failed to compile pipelines: {e}")
            raise


# The monolithic compile() method is removed.
# Orchestrator will call the static methods above in sequence.
# _strip_step_attributes is also removed as StepAttributeStripper is called by Orchestrator.

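# Typical call pattern (sketch; assumes an orchestrator that has already been
# initialized and a pipeline built elsewhere, neither of which is constructed here):
def _example_compile_call(orchestrator, pipeline_definition):
    """Compile every axis value, then hand the frozen contexts to execution."""
    result = PipelineCompiler.compile_pipelines(orchestrator, pipeline_definition)
    for axis_id, frozen_context in result['compiled_contexts'].items():
        logger.debug("Axis %s compiled with %d step plans",
                     axis_id, len(frozen_context.step_plans))
    return result['pipeline_definition'], result['compiled_contexts']
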

def _resolve_step_axis_filters(resolved_steps: List[AbstractStep], context, orchestrator):
    """
    Resolve axis filters for steps with any WellFilterConfig instances.

    This function handles step-level axis filtering by resolving patterns like
    "row:A", ["A01", "B02"], or max counts against the available axis values for the plate.
    It processes ALL WellFilterConfig instances (materialization, streaming, etc.) uniformly.

    Args:
        resolved_steps: List of pipeline steps with lazy configs already resolved
        context: Processing context for the current axis value
        orchestrator: Orchestrator instance with access to available axis values
    """
    from openhcs.core.utils import WellFilterProcessor
    from openhcs.core.config import WellFilterConfig

    # Get available axis values from orchestrator using multiprocessing axis
    from openhcs.constants import MULTIPROCESSING_AXIS
    available_axis_values = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
    if not available_axis_values:  # coverage 870 ↛ 871: condition on line 870 was never true
        logger.warning("No available axis values found for axis filter resolution")
        return

    # Initialize step_axis_filters in context if not present
    if not hasattr(context, 'step_axis_filters'):  # coverage 875 ↛ 879: condition on line 875 was always true
        context.step_axis_filters = {}

    # Process each step for ALL WellFilterConfig instances using the already resolved steps
    for step_index, resolved_step in enumerate(resolved_steps):
        step_filters = {}

        # Check all attributes for WellFilterConfig instances on the RESOLVED step
        for attr_name in dir(resolved_step):
            if not attr_name.startswith('_'):
                config = getattr(resolved_step, attr_name, None)
                if config is not None and isinstance(config, WellFilterConfig) and config.well_filter is not None:
                    try:
                        # Resolve the axis filter pattern to concrete axis values
                        resolved_axis_values = WellFilterProcessor.resolve_compilation_filter(
                            config.well_filter,
                            available_axis_values
                        )

                        # Store resolved axis values for this config
                        step_filters[attr_name] = {
                            'resolved_axis_values': sorted(resolved_axis_values),
                            'filter_mode': config.well_filter_mode,
                            'original_filter': config.well_filter
                        }

                        logger.debug(f"Step '{resolved_step.name}' {attr_name} filter '{config.well_filter}' "
                                     f"resolved to {len(resolved_axis_values)} axis values: {sorted(resolved_axis_values)}")


                    except Exception as e:
                        logger.error(f"Failed to resolve axis filter for step '{resolved_step.name}' {attr_name}: {e}")
                        raise ValueError(f"Invalid axis filter '{config.well_filter}' "
                                         f"for step '{resolved_step.name}' {attr_name}: {e}")

        # Store step filters if any were found
        if step_filters:  # coverage 912 ↛ 879: condition on line 912 was always true
            context.step_axis_filters[step_index] = step_filters

    total_filters = sum(len(filters) for filters in context.step_axis_filters.values())
    logger.debug(f"Axis filter resolution complete. {len(context.step_axis_filters)} steps have axis filters, {total_filters} total filters.")

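# The structure stored on context.step_axis_filters by the function above looks
# roughly like this (illustrative axis values; "row:A" is the docstring example):
_EXAMPLE_STEP_AXIS_FILTERS = {
    0: {                                      # step index within the pipeline
        "step_materialization_config": {
            "resolved_axis_values": ["A01", "A02"],
            "filter_mode": WellFilterMode.INCLUDE,
            "original_filter": "row:A",
        },
    },
}
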

def _should_process_for_well(axis_id, well_filter_config):
    """Unified well filtering logic for all WellFilterConfig systems."""
    if well_filter_config.well_filter is None:
        return True

    well_in_filter = axis_id in well_filter_config.well_filter
    return well_in_filter if well_filter_config.well_filter_mode == WellFilterMode.INCLUDE else not well_in_filter
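
# A quick sketch of the include/exclude semantics above, using a minimal stand-in
# for WellFilterConfig (only the two attributes read by the function are modeled):
def _example_should_process():
    """Show _should_process_for_well for include, exclude, and no-filter configs."""
    @dataclass
    class _Filter:
        well_filter: object
        well_filter_mode: WellFilterMode

    assert _should_process_for_well("A01", _Filter(["A01", "B02"], WellFilterMode.INCLUDE))
    assert not _should_process_for_well("A01", _Filter(["A01"], WellFilterMode.EXCLUDE))
    assert _should_process_for_well("C03", _Filter(None, WellFilterMode.INCLUDE))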