Coverage for openhcs/core/pipeline/compiler.py: 73.7%

252 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Pipeline module for OpenHCS. 

3 

4This module provides the core pipeline compilation components for OpenHCS. 

5The PipelineCompiler is responsible for preparing step_plans within a ProcessingContext. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 17-B — Path Format Discipline 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 101 — Memory Type Declaration 

14- Clause 245 — Path Declaration 

15- Clause 273 — Backend Authorization Doctrine 

16- Clause 281 — Context-Bound Identifiers 

17- Clause 293 — GPU Pre-Declaration Enforcement 

18- Clause 295 — GPU Scheduling Affinity 

19- Clause 504 — Pipeline Preparation Modifications 

20- Clause 524 — Step = Declaration = ID = Runtime Authority 

21""" 

22 

23import inspect 

24import logging 

25import json 

26from pathlib import Path 

27from typing import Any, Dict, List, Optional, Union # Callable removed 

28from collections import OrderedDict # For special_outputs and special_inputs order (used by PathPlanner) 

29 

30from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES, READ_BACKEND, WRITE_BACKEND, Backend 

31from openhcs.core.context.processing_context import ProcessingContext 

32from openhcs.core.config import MaterializationBackend, PathPlanningConfig 

33from openhcs.core.pipeline.funcstep_contract_validator import \ 

34 FuncStepContractValidator 

35from openhcs.core.pipeline.materialization_flag_planner import \ 

36 MaterializationFlagPlanner 

37from openhcs.core.pipeline.path_planner import PipelinePathPlanner 

38from openhcs.core.pipeline.gpu_memory_validator import \ 

39 GPUMemoryTypeValidator 

40from openhcs.core.steps.abstract import AbstractStep 

41from openhcs.core.steps.function_step import FunctionStep # Used for isinstance check 

42 

43logger = logging.getLogger(__name__) 

44 

45 

46def _normalize_step_attributes(pipeline_definition: List[AbstractStep]) -> None: 

47 """Backwards compatibility: Set missing step attributes to constructor defaults.""" 

48 sig = inspect.signature(AbstractStep.__init__) 

49 defaults = {name: param.default for name, param in sig.parameters.items() 

50 if name != 'self' and param.default != inspect.Parameter.empty} 

51 

52 for step in pipeline_definition: 

53 for attr_name, default_value in defaults.items(): 

54 if not hasattr(step, attr_name): 54 ↛ 55 (line 54 didn't jump to line 55 because the condition on line 54 was never true)

55 setattr(step, attr_name, default_value) 

56 
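Editor's sketch (not part of compiler.py): the normalization above harvests constructor defaults with inspect.signature and backfills them onto step objects that predate newer attributes. A minimal, self-contained version using only the standard library; ToyStep is a hypothetical stand-in for AbstractStep.

import inspect

class ToyStep:
    def __init__(self, name, group_by=None, visualize=False):
        self.name = name  # deliberately omit group_by/visualize to mimic an older pickle

sig = inspect.signature(ToyStep.__init__)
defaults = {n: p.default for n, p in sig.parameters.items()
            if n != 'self' and p.default is not inspect.Parameter.empty}
# defaults == {'group_by': None, 'visualize': False}

step = ToyStep("blur")
for attr, value in defaults.items():
    if not hasattr(step, attr):
        setattr(step, attr, value)  # backfills the missing attributes in place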

57 

58class PipelineCompiler: 

59 """ 

60 Compiles a pipeline by populating step plans within a ProcessingContext. 

61 

62 This class provides static methods that are called sequentially by the 

63 PipelineOrchestrator for each well's ProcessingContext. Each method 

64 is responsible for a specific part of the compilation process, such as 

65 path planning, special I/O resolution, materialization flag setting, 

66 memory contract validation, and GPU resource assignment. 

67 """ 

68 

69 @staticmethod 

70 def initialize_step_plans_for_context( 

71 context: ProcessingContext, 

72 steps_definition: List[AbstractStep], 

73 orchestrator, 

74 metadata_writer: bool = False, 

75 plate_path: Optional[Path] = None 

76 # base_input_dir and well_id parameters removed; these values are read from the context

77 ) -> None: 

78 """ 

79 Initializes step_plans by calling PipelinePathPlanner.prepare_pipeline_paths, 

80 which handles primary paths, special I/O path planning and linking, and chainbreaker status. 

81 Then, this method supplements the plans with non-I/O FunctionStep-specific attributes. 

82 

83 Args: 

84 context: ProcessingContext to initialize step plans for 

85 steps_definition: List of AbstractStep objects defining the pipeline 

86 orchestrator: Orchestrator instance for well filter resolution 

87 metadata_writer: If True, this well is responsible for creating OpenHCS metadata files 

88 plate_path: Path to plate root for zarr conversion detection 

89 """ 

90 if context.is_frozen(): 90 ↛ 91 (line 90 didn't jump to line 91 because the condition on line 90 was never true)

91 raise AttributeError("Cannot initialize step plans in a frozen ProcessingContext.") 

92 

93 if not hasattr(context, 'step_plans') or context.step_plans is None: 93 ↛ 94 (line 93 didn't jump to line 94 because the condition on line 93 was never true)

94 context.step_plans = {} # Ensure step_plans dict exists 

95 

96 # === BACKWARDS COMPATIBILITY PREPROCESSING === 

97 # Ensure all steps have complete attribute sets based on AbstractStep constructor 

98 # This must happen before any other compilation logic to eliminate defensive programming 

99 logger.debug("🔧 BACKWARDS COMPATIBILITY: Normalizing step attributes...") 

100 _normalize_step_attributes(steps_definition) 

101 

102 # === WELL FILTER RESOLUTION === 

103 # Resolve well filters for steps with materialization configs 

104 # This must happen after normalization to ensure materialization_config exists 

105 logger.debug("🎯 WELL FILTER RESOLUTION: Resolving step well filters...") 

106 _resolve_step_well_filters(steps_definition, context, orchestrator) 

107 

108 # Pre-initialize step_plans with basic entries for each step 

109 # This ensures step_plans is not empty when path planner checks it 

110 for step in steps_definition: 

111 if step.step_id not in context.step_plans: 111 ↛ 110 (line 111 didn't jump to line 110 because the condition on line 111 was always true)

112 context.step_plans[step.step_id] = { 

113 "step_name": step.name, 

114 "step_type": step.__class__.__name__, 

115 "well_id": context.well_id, 

116 } 

117 

118 # === INPUT CONVERSION DETECTION === 

119 # Check if first step needs zarr conversion 

120 if steps_definition and plate_path: 120 ↛ 146 (line 120 didn't jump to line 146 because the condition on line 120 was always true)

121 first_step = steps_definition[0] 

122 vfs_config = context.get_vfs_config() 

123 

124 # Only convert if default materialization backend is ZARR 

125 wants_zarr_conversion = ( 

126 vfs_config.materialization_backend == MaterializationBackend.ZARR 

127 ) 

128 

129 if wants_zarr_conversion: 

130 # Check if input plate is already zarr format 

131 available_backends = context.microscope_handler.get_available_backends(plate_path) 

132 already_zarr = Backend.ZARR in available_backends 

133 

134 if not already_zarr: 134 ↛ 146 (line 134 didn't jump to line 146 because the condition on line 134 was always true)

135 # Inject input conversion config using existing PathPlanningConfig pattern 

136 path_config = context.get_path_planning_config() 

137 conversion_config = PathPlanningConfig( 

138 output_dir_suffix="", # No suffix - write to plate root 

139 global_output_folder=plate_path.parent, # Parent of plate 

140 sub_dir=path_config.sub_dir # Use same sub_dir (e.g., "images") 

141 ) 

142 context.step_plans[first_step.step_id]["input_conversion_config"] = conversion_config 

143 logger.debug(f"Input conversion to zarr enabled for first step: {first_step.name}") 

144 

145 # The well_id and base_input_dir are available from the context object. 

146 PipelinePathPlanner.prepare_pipeline_paths( 

147 context, 

148 steps_definition 

149 ) 

150 

151 # Loop to supplement step_plans with non-I/O, non-path attributes 

152 # after PipelinePathPlanner has fully populated them with I/O info. 

153 for step in steps_definition: 

154 step_id = step.step_id 

155 if step_id not in context.step_plans: 155 ↛ 156 (line 155 didn't jump to line 156 because the condition on line 155 was never true)

156 logger.error( 

157 f"Critical error: Step {step.name} (ID: {step_id}) " 

158 f"not found in step_plans after path planning phase. Clause 504." 

159 ) 

160 # Create a minimal error plan 

161 context.step_plans[step_id] = { 

162 "step_name": step.name, 

163 "step_type": step.__class__.__name__, 

164 "well_id": context.well_id, # Use context.well_id 

165 "error": "Missing from path planning phase by PipelinePathPlanner", 

166 "create_openhcs_metadata": metadata_writer # Set metadata writer responsibility flag 

167 } 

168 continue 

169 

170 current_plan = context.step_plans[step_id] 

171 

172 # Ensure basic metadata (PathPlanner should set most of this) 

173 current_plan["step_name"] = step.name 

174 current_plan["step_type"] = step.__class__.__name__ 

175 current_plan["well_id"] = context.well_id # Use context.well_id; PathPlanner should also use context.well_id 

176 current_plan.setdefault("visualize", False) # Ensure visualize key exists 

177 current_plan["create_openhcs_metadata"] = metadata_writer # Set metadata writer responsibility flag 

178 

179 # The special_outputs and special_inputs are now fully handled by PipelinePathPlanner. 

180 # The block for planning special_outputs (lines 134-148 in original) is removed. 

181 # Ensure these keys exist as OrderedDicts if PathPlanner doesn't guarantee it 

182 # (PathPlanner currently creates them as plain dicts; OrderedDict may no longer be strictly needed here)

183 current_plan.setdefault("special_inputs", OrderedDict()) 

184 current_plan.setdefault("special_outputs", OrderedDict()) 

185 current_plan.setdefault("chainbreaker", False) # PathPlanner now sets this. 

186 

187 # Add step-specific attributes (non-I/O, non-path related) 

188 current_plan["variable_components"] = step.variable_components 

189 current_plan["group_by"] = step.group_by 

190 

191 # Store materialization_config if present 

192 if step.materialization_config is not None: 

193 current_plan["materialization_config"] = step.materialization_config 

194 

195 # Add FunctionStep specific attributes 

196 if isinstance(step, FunctionStep): 196 ↛ 153 (line 196 didn't jump to line 153 because the condition on line 196 was always true)

197 

198 # 🎯 SEMANTIC COHERENCE FIX: Prevent group_by/variable_components conflict 

199 # When variable_components contains the same value as group_by, 

200 # set group_by to None to avoid EZStitcher heritage rule violation 

201 if (step.variable_components and step.group_by and 201 ↛ 203 (line 201 didn't jump to line 203 because the condition on line 201 was never true)

202 step.group_by in step.variable_components): 

203 logger.debug(f"Step {step.name}: Detected group_by='{step.group_by}' in variable_components={step.variable_components}. " 

204 f"Setting group_by=None to maintain semantic coherence.") 

205 current_plan["group_by"] = None 

206 

207 # func attribute is guaranteed in FunctionStep.__init__ 

208 current_plan["func_name"] = getattr(step.func, '__name__', str(step.func)) 

209 

210 # Memory type hints from step instance (set in FunctionStep.__init__ if provided) 

211 # These are initial hints; FuncStepContractValidator will set final types. 

212 if hasattr(step, 'input_memory_type_hint'): # From FunctionStep.__init__ 212 ↛ 213 (line 212 didn't jump to line 213 because the condition on line 212 was never true)

213 current_plan['input_memory_type_hint'] = step.input_memory_type_hint 

214 if hasattr(step, 'output_memory_type_hint'): # From FunctionStep.__init__ 214 ↛ 215 (line 214 didn't jump to line 215 because the condition on line 214 was never true)

215 current_plan['output_memory_type_hint'] = step.output_memory_type_hint 

216 

217 # The resolve_special_input_paths_for_context static method is DELETED (lines 181-238 of original) 

218 # as this functionality is now handled by PipelinePathPlanner.prepare_pipeline_paths. 

219 

220 # _prepare_materialization_flags is removed as MaterializationFlagPlanner.prepare_pipeline_flags 

221 # now modifies context.step_plans in-place and takes context directly. 

222 
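For orientation, an editor's sketch of what a step plan looks like after this initialization phase: a dict keyed by step_id whose keys are the ones set above (OrderedDict for the special I/O entries). The values below are made-up placeholders, and the path and backend keys added later by the path and materialization planners are omitted.

example_step_plan = {
    "step_name": "blur",                 # hypothetical step name
    "step_type": "FunctionStep",
    "well_id": "A01",
    "visualize": False,
    "create_openhcs_metadata": True,     # only for the designated metadata-writer well
    "special_inputs": {},                # populated by PipelinePathPlanner
    "special_outputs": {},
    "chainbreaker": False,
    "variable_components": ["site"],     # hypothetical
    "group_by": "channel",               # hypothetical
    "func_name": "gaussian_blur",        # hypothetical
}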

223 @staticmethod 

224 def declare_zarr_stores_for_context( 

225 context: ProcessingContext, 

226 steps_definition: List[AbstractStep], 

227 orchestrator 

228 ) -> None: 

229 """ 

230 Declare zarr store creation functions for runtime execution. 

231 

232 This method runs after path planning but before materialization flag planning 

233 to declare which steps need zarr stores and provide the metadata needed 

234 for runtime store creation. 

235 

236 Args: 

237 context: ProcessingContext for current well 

238 steps_definition: List of AbstractStep objects 

239 orchestrator: Orchestrator instance for accessing all wells 

240 """ 

241 from openhcs.constants.constants import GroupBy, Backend 

242 

243 all_wells = orchestrator.get_component_keys(GroupBy.WELL) 

244 

245 vfs_config = context.get_vfs_config() 

246 

247 for step in steps_definition: 

248 step_plan = context.step_plans[step.step_id] 

249 

250 will_use_zarr = ( 

251 vfs_config.materialization_backend == MaterializationBackend.ZARR and 

252 steps_definition.index(step) == len(steps_definition) - 1 

253 ) 

254 

255 if will_use_zarr: 

256 step_plan["zarr_config"] = { 

257 "all_wells": all_wells, 

258 "needs_initialization": True 

259 } 

260 logger.debug(f"Step '{step.name}' will use zarr backend for well {context.well_id}") 

261 else: 

262 step_plan["zarr_config"] = None 

263 
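Editor's sketch of the decision above, with toy data: when the materialization backend is zarr, only the final step receives a zarr_config; every other step gets None.

steps = ["step_a", "step_b", "step_c"]        # stand-ins for AbstractStep objects
all_wells = ["A01", "A02", "B01"]             # as returned for GroupBy.WELL
backend_is_zarr = True                        # vfs_config.materialization_backend == ZARR

zarr_configs = {}
for i, step in enumerate(steps):
    is_last = (i == len(steps) - 1)
    zarr_configs[step] = (
        {"all_wells": all_wells, "needs_initialization": True}
        if backend_is_zarr and is_last else None
    )
# zarr_configs["step_c"] gets the config dict; "step_a" and "step_b" are None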

264 @staticmethod 

265 def plan_materialization_flags_for_context( 

266 context: ProcessingContext, 

267 steps_definition: List[AbstractStep], 

268 orchestrator 

269 ) -> None: 

270 """ 

271 Plans and injects materialization flags into context.step_plans 

272 by calling MaterializationFlagPlanner. 

273 """ 

274 if context.is_frozen(): 274 ↛ 275 (line 274 didn't jump to line 275 because the condition on line 274 was never true)

275 raise AttributeError("Cannot plan materialization flags in a frozen ProcessingContext.") 

276 if not context.step_plans: 276 ↛ 277 (line 276 didn't jump to line 277 because the condition on line 276 was never true)

277 logger.warning("step_plans is empty in context for materialization planning. This may be valid if pipeline is empty.") 

278 return 

279 

280 # MaterializationFlagPlanner.prepare_pipeline_flags now takes context and pipeline_definition 

281 # and modifies context.step_plans in-place. 

282 MaterializationFlagPlanner.prepare_pipeline_flags( 

283 context, 

284 steps_definition, 

285 orchestrator.plate_path 

286 ) 

287 

288 # Post-check (optional, but good for ensuring contracts are met by the planner) 

289 for step in steps_definition: 

290 step_id = step.step_id 

291 if step_id not in context.step_plans: 291 ↛ 293 (line 291 didn't jump to line 293 because the condition on line 291 was never true)

292 # This should not happen if prepare_pipeline_flags guarantees plans for all steps 

293 logger.error(f"Step {step.name} (ID: {step_id}) missing from step_plans after materialization planning.") 

294 continue 

295 

296 plan = context.step_plans[step_id] 

297 # Check for keys that FunctionStep actually uses during execution 

298 required_keys = [READ_BACKEND, WRITE_BACKEND] 

299 if not all(k in plan for k in required_keys): 299 ↛ 300 (line 299 didn't jump to line 300 because the condition on line 299 was never true)

300 missing_keys = [k for k in required_keys if k not in plan] 

301 logger.error( 

302 f"Materialization flag planning incomplete for step {step.name} (ID: {step_id}). " 

303 f"Missing required keys: {missing_keys} (Clause 273)." 

304 ) 

305 

306 

307 @staticmethod 

308 def validate_memory_contracts_for_context( 

309 context: ProcessingContext, 

310 steps_definition: List[AbstractStep], 

311 orchestrator=None 

312 ) -> None: 

313 """ 

314 Validates FunctionStep memory contracts, dict patterns, and adds memory type info to context.step_plans. 

315 

316 Args: 

317 context: ProcessingContext to validate 

318 steps_definition: List of AbstractStep objects 

319 orchestrator: Optional orchestrator for dict pattern key validation 

320 """ 

321 if context.is_frozen(): 321 ↛ 322 (line 321 didn't jump to line 322 because the condition on line 321 was never true)

322 raise AttributeError("Cannot validate memory contracts in a frozen ProcessingContext.") 

323 

324 # FuncStepContractValidator might need access to input/output_memory_type_hint from plan 

325 step_memory_types = FuncStepContractValidator.validate_pipeline( 

326 steps=steps_definition, 

327 pipeline_context=context, # Pass context so validator can access step plans for memory type overrides 

328 orchestrator=orchestrator # Pass orchestrator for dict pattern key validation 

329 ) 

330 

331 for step_id, memory_types in step_memory_types.items(): 

332 if "input_memory_type" not in memory_types or "output_memory_type" not in memory_types: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 step_name = context.step_plans[step_id]["step_name"] 

334 raise AssertionError( 

335 f"Memory type validation must set input/output_memory_type for FunctionStep {step_name} (ID: {step_id}) (Clause 101)." 

336 ) 

337 if step_id in context.step_plans: 337 ↛ 340 (line 337 didn't jump to line 340 because the condition on line 337 was always true)

338 context.step_plans[step_id].update(memory_types) 

339 else: 

340 logger.warning(f"Step ID {step_id} found in memory_types but not in context.step_plans. Skipping.") 

341 

342 # Apply memory type override: Any step with disk output must use numpy for disk writing 

343 for i, step in enumerate(steps_definition): 

344 if isinstance(step, FunctionStep): 344 ↛ 343 (line 344 didn't jump to line 343 because the condition on line 344 was always true)

345 step_id = step.step_id 

346 if step_id in context.step_plans: 346 ↛ 343 (line 346 didn't jump to line 343 because the condition on line 346 was always true)

347 step_plan = context.step_plans[step_id] 

348 is_last_step = (i == len(steps_definition) - 1) 

349 write_backend = step_plan['write_backend'] 

350 

351 if write_backend == 'disk': 

352 logger.debug(f"Step {step.name} has disk output, overriding output_memory_type to numpy") 

353 step_plan['output_memory_type'] = 'numpy' 

354 

355 

356 
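Editor's sketch of the two-stage update above, with toy data: the validator returns a per-step mapping of declared memory types, which is merged into the step plans; a disk write backend then forces numpy output.

step_memory_types = {"step_1": {"input_memory_type": "torch",
                                "output_memory_type": "torch"}}   # shape of the validator's result
step_plans = {"step_1": {"step_name": "blur", "write_backend": "disk"}}

for step_id, memory_types in step_memory_types.items():
    step_plans[step_id].update(memory_types)

for step_id, plan in step_plans.items():
    if plan["write_backend"] == "disk":
        plan["output_memory_type"] = "numpy"   # disk materialization happens CPU-side
# step_plans["step_1"]["output_memory_type"] == "numpy"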

357 @staticmethod 

358 def assign_gpu_resources_for_context( 

359 context: ProcessingContext 

360 ) -> None: 

361 """ 

362 Validates GPU memory types from context.step_plans and assigns GPU device IDs. 

363 (Unchanged from previous version) 

364 """ 

365 if context.is_frozen(): 365 ↛ 366 (line 365 didn't jump to line 366 because the condition on line 365 was never true)

366 raise AttributeError("Cannot assign GPU resources in a frozen ProcessingContext.") 

367 

368 gpu_assignments = GPUMemoryTypeValidator.validate_step_plans(context.step_plans) 

369 

370 for step_id, step_plan_val in context.step_plans.items(): # Renamed step_plan to step_plan_val to avoid conflict 

371 is_gpu_step = False 

372 input_type = step_plan_val["input_memory_type"] 

373 if input_type in VALID_GPU_MEMORY_TYPES: 373 ↛ 374 (line 373 didn't jump to line 374 because the condition on line 373 was never true)

374 is_gpu_step = True 

375 

376 output_type = step_plan_val["output_memory_type"] 

377 if output_type in VALID_GPU_MEMORY_TYPES: 377 ↛ 378 (line 377 didn't jump to line 378 because the condition on line 377 was never true)

378 is_gpu_step = True 

379 

380 if is_gpu_step: 380 ↛ 383 (line 380 didn't jump to line 383 because the condition on line 380 was never true)

381 # Ensure gpu_assignments has an entry for this step_id if it's a GPU step 

382 # And that entry contains a 'gpu_id' 

383 step_gpu_assignment = gpu_assignments[step_id] 

384 if "gpu_id" not in step_gpu_assignment: 

385 step_name = step_plan_val["step_name"] 

386 raise AssertionError( 

387 f"GPU validation must assign gpu_id for step {step_name} (ID: {step_id}) " 

388 f"with GPU memory types (Clause 295)." 

389 ) 

390 

391 for step_id, gpu_assignment in gpu_assignments.items(): 391 ↛ 392 (line 391 didn't jump to line 392 because the loop on line 391 never started)

392 if step_id in context.step_plans: 

393 context.step_plans[step_id].update(gpu_assignment) 

394 else: 

395 logger.warning(f"Step ID {step_id} found in gpu_assignments but not in context.step_plans. Skipping.") 

396 
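Editor's sketch of the merge performed above, with toy values (the real GPUMemoryTypeValidator decides the ids): per-step GPU assignments are folded into the matching step plans.

gpu_assignments = {"step_1": {"gpu_id": 0}}          # hypothetical validator output
step_plans = {"step_1": {"input_memory_type": "cupy"},
              "step_2": {"input_memory_type": "numpy"}}

for step_id, assignment in gpu_assignments.items():
    if step_id in step_plans:
        step_plans[step_id].update(assignment)        # step_1 gains gpu_id=0; step_2 is untouched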

397 @staticmethod 

398 def apply_global_visualizer_override_for_context( 

399 context: ProcessingContext, 

400 global_enable_visualizer: bool 

401 ) -> None: 

402 """ 

403 Applies global visualizer override to all step_plans in the context. 

404 (Unchanged from previous version) 

405 """ 

406 if context.is_frozen(): 

407 raise AttributeError("Cannot apply visualizer override in a frozen ProcessingContext.") 

408 

409 if global_enable_visualizer: 

410 if not context.step_plans: return # Guard against empty step_plans 

411 for step_id, plan in context.step_plans.items(): 

412 plan["visualize"] = True 

413 logger.info(f"Global visualizer override: Step '{plan['step_name']}' marked for visualization.") 

414 

415 @staticmethod 

416 def resolve_lazy_dataclasses_for_context(context: ProcessingContext) -> None: 

417 """ 

418 Resolve all lazy dataclass instances in step plans to their base configurations. 

419 

420 This method should be called after all compilation phases but before context 

421 freezing to ensure step plans are safe for pickling in multiprocessing contexts. 

422 

423 Args: 

424 context: ProcessingContext to process 

425 """ 

426 from openhcs.core.config import get_base_type_for_lazy 

427 

428 def resolve_lazy_dataclass(obj: Any) -> Any: 

429 """Resolve lazy dataclass to base config if it's a lazy type, otherwise return as-is.""" 

430 obj_type = type(obj) 

431 if get_base_type_for_lazy(obj_type) is not None: 

432 # This is a lazy dataclass - resolve it to base config 

433 return obj.to_base_config() 

434 else: 

435 # Not a lazy dataclass - return as-is 

436 return obj 

437 

438 # Resolve all lazy dataclasses in step plans 

439 for step_id, step_plan in context.step_plans.items(): 

440 for key, value in step_plan.items(): 

441 step_plan[key] = resolve_lazy_dataclass(value) 

442 
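Editor's sketch of the idea, using a toy registry in place of get_base_type_for_lazy: lazy config objects are swapped for their picklable base configs, everything else passes through unchanged. LazyConfig and BaseConfig are hypothetical stand-ins.

from dataclasses import dataclass

@dataclass
class BaseConfig:
    sub_dir: str

class LazyConfig:                         # hypothetical lazy wrapper
    def __init__(self, sub_dir):
        self.sub_dir = sub_dir
    def to_base_config(self):
        return BaseConfig(self.sub_dir)

LAZY_TO_BASE = {LazyConfig: BaseConfig}   # stand-in for the lazy-to-base registry

def resolve(obj):
    return obj.to_base_config() if type(obj) in LAZY_TO_BASE else obj

plan = {"materialization_config": LazyConfig("images"), "visualize": False}
plan = {k: resolve(v) for k, v in plan.items()}
# plan["materialization_config"] is now a plain BaseConfig, safe to pickle for multiprocessing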

443 @staticmethod 

444 def compile_pipelines( 

445 orchestrator, 

446 pipeline_definition: List[AbstractStep], 

447 well_filter: Optional[List[str]] = None, 

448 enable_visualizer_override: bool = False 

449 ) -> Dict[str, ProcessingContext]: 

450 """ 

451 Compile-all phase: Prepares frozen ProcessingContexts for each well. 

452 

453 This method iterates through the specified wells, creates a ProcessingContext 

454 for each, and invokes the various phases of the PipelineCompiler to populate 

455 the context's step_plans. After all compilation phases for a well are complete, 

456 its context is frozen. Finally, attributes are stripped from the pipeline_definition, 

457 making the step objects stateless for the execution phase. 

458 

459 Args: 

460 orchestrator: The PipelineOrchestrator instance to use for compilation 

461 pipeline_definition: The list of AbstractStep objects defining the pipeline. 

462 well_filter: Optional list of well IDs to process. If None, processes all found wells. 

463 enable_visualizer_override: If True, all steps in all compiled contexts 

464 will have their 'visualize' flag set to True. 

465 

466 Returns: 

467 A dictionary mapping well IDs to their compiled and frozen ProcessingContexts. 

468 The input `pipeline_definition` list (of step objects) is modified in-place 

469 to become stateless. 

470 """ 

471 from openhcs.constants.constants import GroupBy, OrchestratorState 

472 from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper 

473 

474 if not orchestrator.is_initialized(): 474 ↛ 475 (line 474 didn't jump to line 475 because the condition on line 474 was never true)

475 raise RuntimeError("PipelineOrchestrator must be explicitly initialized before calling compile_pipelines().") 

476 

477 if not pipeline_definition: 477 ↛ 478 (line 477 didn't jump to line 478 because the condition on line 477 was never true)

478 raise ValueError("A valid pipeline definition (List[AbstractStep]) must be provided.") 

479 

480 try: 

481 compiled_contexts: Dict[str, ProcessingContext] = {} 

482 wells_to_process = orchestrator.get_component_keys(GroupBy.WELL, well_filter) 

483 

484 if not wells_to_process: 484 ↛ 485 (line 484 didn't jump to line 485 because the condition on line 484 was never true)

485 logger.warning("No wells found to process based on filter.") 

486 return {} 

487 

488 logger.info(f"Starting compilation for wells: {', '.join(wells_to_process)}") 

489 

490 # Determine responsible well for metadata creation (lexicographically first) 

491 responsible_well = sorted(wells_to_process)[0] if wells_to_process else None 

492 logger.debug(f"Designated responsible well for metadata creation: {responsible_well}") 

493 

494 for well_id in wells_to_process: 

495 logger.debug(f"Compiling for well: {well_id}") 

496 context = orchestrator.create_context(well_id) 

497 

498 # Determine if this well is responsible for metadata creation 

499 is_responsible = (well_id == responsible_well) 

500 logger.debug(f"Well {well_id} metadata responsibility: {is_responsible}") 

501 

502 PipelineCompiler.initialize_step_plans_for_context(context, pipeline_definition, orchestrator, metadata_writer=is_responsible, plate_path=orchestrator.plate_path) 

503 PipelineCompiler.declare_zarr_stores_for_context(context, pipeline_definition, orchestrator) 

504 PipelineCompiler.plan_materialization_flags_for_context(context, pipeline_definition, orchestrator) 

505 PipelineCompiler.validate_memory_contracts_for_context(context, pipeline_definition, orchestrator) 

506 PipelineCompiler.assign_gpu_resources_for_context(context) 

507 

508 if enable_visualizer_override: 508 ↛ 509 (line 508 didn't jump to line 509 because the condition on line 508 was never true)

509 PipelineCompiler.apply_global_visualizer_override_for_context(context, True) 

510 

511 # Resolve all lazy dataclasses before freezing to ensure multiprocessing compatibility 

512 PipelineCompiler.resolve_lazy_dataclasses_for_context(context) 

513 

514 context.freeze() 

515 compiled_contexts[well_id] = context 

516 logger.debug(f"Compilation finished for well: {well_id}") 

517 

518 # After processing all wells, strip attributes and finalize 

519 logger.info("Stripping attributes from pipeline definition steps.") 

520 StepAttributeStripper.strip_step_attributes(pipeline_definition, {}) 

521 

522 orchestrator._state = OrchestratorState.COMPILED 

523 logger.info(f"Plate compilation finished for {len(compiled_contexts)} wells.") 

524 return compiled_contexts 

525 except Exception as e: 

526 orchestrator._state = OrchestratorState.COMPILE_FAILED 

527 logger.error(f"Failed to compile pipelines: {e}") 

528 raise 

529 
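Editor's sketch of how a caller might use this entry point. It assumes an already-initialized PipelineOrchestrator (`orchestrator`) and a step list (`pipeline`), both hypothetical placeholders here, and relies only on the signature shown above.

# Assumes: `orchestrator` is an initialized PipelineOrchestrator and
# `pipeline` is a List[AbstractStep]; both are hypothetical placeholders.
compiled = PipelineCompiler.compile_pipelines(
    orchestrator,
    pipeline,
    well_filter=["A01", "B02"],          # restrict compilation to two wells
    enable_visualizer_override=False,
)
for well_id, ctx in compiled.items():
    assert ctx.is_frozen()               # contexts come back frozen, ready for execution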

530 @staticmethod 

531 def update_step_ids_for_multiprocessing( 

532 context: ProcessingContext, 

533 steps_definition: List[AbstractStep] 

534 ) -> None: 

535 """ 

536 Updates step IDs in a frozen context after multiprocessing pickle/unpickle. 

537 

538 When contexts are pickled/unpickled for multiprocessing, step objects get 

539 new memory addresses, changing their IDs. This method remaps the step_plans 

540 from old IDs to new IDs while preserving all plan data. 

541 

542 SPECIAL PRIVILEGE: This method can modify frozen contexts since it's part 

543 of the compilation process and maintains data integrity. 

544  

545 Args: 

546 context: Frozen ProcessingContext with old step IDs 

547 steps_definition: Step objects with new IDs after pickle/unpickle 

548 """ 

549 if not context.is_frozen(): 549 ↛ 550 (line 549 didn't jump to line 550 because the condition on line 549 was never true)

550 logger.warning("update_step_ids_for_multiprocessing called on unfrozen context - skipping") 

551 return 

552 

553 # Create mapping from old step positions to new step IDs 

554 if len(steps_definition) != len(context.step_plans): 554 ↛ 555 (line 554 didn't jump to line 555 because the condition on line 554 was never true)

555 raise RuntimeError( 

556 f"Step count mismatch: {len(steps_definition)} steps vs {len(context.step_plans)} plans. " 

557 f"Cannot safely remap step IDs." 

558 ) 

559 

560 # Get old step IDs in order (assuming same order as steps_definition) 

561 old_step_ids = list(context.step_plans.keys()) 

562 

563 # Generate new step IDs using get_step_id (handles stripped step objects) 

564 from openhcs.core.steps.abstract import get_step_id 

565 new_step_ids = [get_step_id(step) for step in steps_definition] 

566 

567 logger.debug("Remapping step IDs for multiprocessing:")

568 for old_id, new_id in zip(old_step_ids, new_step_ids): 

569 logger.debug(f" {old_id}{new_id}") 

570 

571 # Create new step_plans dict with updated IDs 

572 new_step_plans = {} 

573 for old_id, new_id in zip(old_step_ids, new_step_ids): 

574 new_step_plans[new_id] = context.step_plans[old_id].copy() 

575 

576 # SPECIAL PRIVILEGE: Temporarily unfreeze to update step_plans, then refreeze 

577 object.__setattr__(context, '_is_frozen', False) 

578 try: 

579 context.step_plans = new_step_plans 

580 logger.info(f"Updated {len(new_step_plans)} step plans for multiprocessing compatibility") 

581 finally: 

582 object.__setattr__(context, '_is_frozen', True) 

583 
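Editor's sketch of the positional remap above, with toy ids: old plan keys are paired with freshly computed ids in order, and the plans are re-keyed without touching their contents.

old_plans = {"id_140230011": {"step_name": "blur"},
             "id_140230022": {"step_name": "stitch"}}   # keys from before pickling
new_ids = ["id_139980001", "id_139980002"]              # ids recomputed after unpickling

new_plans = {new: old_plans[old].copy()
             for old, new in zip(list(old_plans.keys()), new_ids)}
# plan data is preserved; only the keys change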

584# The monolithic compile() method is removed. 

585# Orchestrator will call the static methods above in sequence. 

586# _strip_step_attributes is also removed as StepAttributeStripper is called by Orchestrator. 

587 

588 

589def _resolve_step_well_filters(steps_definition: List[AbstractStep], context, orchestrator): 

590 """ 

591 Resolve well filters for steps with materialization configs. 

592 

593 This function handles step-level well filtering by resolving patterns like 

594 "row:A", ["A01", "B02"], or max counts against the available wells for the plate. 

595 

596 Args: 

597 steps_definition: List of pipeline steps 

598 context: Processing context for the current well 

599 orchestrator: Orchestrator instance with access to available wells 

600 """ 

601 from openhcs.core.utils import WellFilterProcessor 

602 

603 # Get available wells from orchestrator using correct method 

604 from openhcs.constants.constants import GroupBy 

605 available_wells = orchestrator.get_component_keys(GroupBy.WELL) 

606 if not available_wells: 606 ↛ 607 (line 606 didn't jump to line 607 because the condition on line 606 was never true)

607 logger.warning("No available wells found for well filter resolution") 

608 return 

609 

610 # Initialize step_well_filters in context if not present 

611 if not hasattr(context, 'step_well_filters'): 611 ↛ 615 (line 611 didn't jump to line 615 because the condition on line 611 was always true)

612 context.step_well_filters = {} 

613 

614 # Process each step that has materialization config with well filter 

615 for step in steps_definition: 

616 if (hasattr(step, 'materialization_config') and 

617 step.materialization_config and 

618 step.materialization_config.well_filter is not None): 

619 

620 try: 

621 # Resolve the well filter pattern to concrete well IDs 

622 resolved_wells = WellFilterProcessor.resolve_compilation_filter( 

623 step.materialization_config.well_filter, 

624 available_wells 

625 ) 

626 

627 # Store resolved wells in context for path planner 

628 # Use structure expected by path planner 

629 context.step_well_filters[step.step_id] = { 

630 'resolved_wells': sorted(resolved_wells), 

631 'filter_mode': step.materialization_config.well_filter_mode, 

632 'original_filter': step.materialization_config.well_filter 

633 } 

634 

635 logger.debug(f"Step '{step.name}' well filter '{step.materialization_config.well_filter}' " 

636 f"resolved to {len(resolved_wells)} wells: {sorted(resolved_wells)}") 

637 

638 except Exception as e: 

639 logger.error(f"Failed to resolve well filter for step '{step.name}': {e}") 

640 raise ValueError(f"Invalid well filter '{step.materialization_config.well_filter}' " 

641 f"for step '{step.name}': {e}") 

642 

643 logger.debug(f"Well filter resolution complete. {len(context.step_well_filters)} steps have well filters.")
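
Editor's sketch of the filter semantics described above ("row:A", an explicit well list, or a maximum count). The toy resolver below only illustrates the shapes involved; the real WellFilterProcessor.resolve_compilation_filter may behave differently.

def toy_resolve(well_filter, available_wells):
    """Hypothetical resolver mirroring the documented patterns; not the OpenHCS implementation."""
    if isinstance(well_filter, int):                          # max count
        return available_wells[:well_filter]
    if isinstance(well_filter, str) and well_filter.startswith("row:"):
        row = well_filter.split(":", 1)[1]
        return [w for w in available_wells if w.startswith(row)]
    return [w for w in well_filter if w in available_wells]   # explicit list

wells = ["A01", "A02", "B01", "B02"]
assert toy_resolve("row:A", wells) == ["A01", "A02"]
assert toy_resolve(["A01", "B02"], wells) == ["A01", "B02"]
assert toy_resolve(2, wells) == ["A01", "A02"]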