Coverage for openhcs/core/pipeline/compiler.py: 73.7%
252 statements
coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Pipeline module for OpenHCS.
4This module provides the core pipeline compilation components for OpenHCS.
5The PipelineCompiler is responsible for preparing step_plans within a ProcessingContext.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 17-B — Path Format Discipline
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 101 — Memory Type Declaration
14- Clause 245 — Path Declaration
15- Clause 273 — Backend Authorization Doctrine
16- Clause 281 — Context-Bound Identifiers
17- Clause 293 — GPU Pre-Declaration Enforcement
18- Clause 295 — GPU Scheduling Affinity
19- Clause 504 — Pipeline Preparation Modifications
20- Clause 524 — Step = Declaration = ID = Runtime Authority
21"""
import inspect
import logging
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Union  # Callable removed
from collections import OrderedDict  # For special_outputs and special_inputs order (used by PathPlanner)

from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES, READ_BACKEND, WRITE_BACKEND, Backend
from openhcs.core.context.processing_context import ProcessingContext
from openhcs.core.config import MaterializationBackend, PathPlanningConfig
from openhcs.core.pipeline.funcstep_contract_validator import FuncStepContractValidator
from openhcs.core.pipeline.materialization_flag_planner import MaterializationFlagPlanner
from openhcs.core.pipeline.path_planner import PipelinePathPlanner
from openhcs.core.pipeline.gpu_memory_validator import GPUMemoryTypeValidator
from openhcs.core.steps.abstract import AbstractStep
from openhcs.core.steps.function_step import FunctionStep  # Used for isinstance check

logger = logging.getLogger(__name__)
def _normalize_step_attributes(pipeline_definition: List[AbstractStep]) -> None:
    """Backwards compatibility: set missing step attributes to their constructor defaults."""
    sig = inspect.signature(AbstractStep.__init__)
    defaults = {name: param.default for name, param in sig.parameters.items()
                if name != 'self' and param.default != inspect.Parameter.empty}

    for step in pipeline_definition:
        for attr_name, default_value in defaults.items():
            if not hasattr(step, attr_name):
                setattr(step, attr_name, default_value)
class PipelineCompiler:
    """
    Compiles a pipeline by populating step plans within a ProcessingContext.

    This class provides static methods that are called sequentially by the
    PipelineOrchestrator for each well's ProcessingContext. Each method
    is responsible for a specific part of the compilation process, such as
    path planning, special I/O resolution, materialization flag setting,
    memory contract validation, and GPU resource assignment.
    """
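    # A minimal per-well invocation sketch (``orchestrator``, ``steps``, and the well id are
    # placeholders); it mirrors the sequence compile_pipelines() performs below:
    #
    #   context = orchestrator.create_context("A01")
    #   PipelineCompiler.initialize_step_plans_for_context(context, steps, orchestrator)
    #   PipelineCompiler.declare_zarr_stores_for_context(context, steps, orchestrator)
    #   PipelineCompiler.plan_materialization_flags_for_context(context, steps, orchestrator)
    #   PipelineCompiler.validate_memory_contracts_for_context(context, steps, orchestrator)
    #   PipelineCompiler.assign_gpu_resources_for_context(context)
    #   PipelineCompiler.resolve_lazy_dataclasses_for_context(context)
    #   context.freeze()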
    @staticmethod
    def initialize_step_plans_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator,
        metadata_writer: bool = False,
        plate_path: Optional[Path] = None
        # base_input_dir and well_id parameters removed; their values are taken from the context
    ) -> None:
        """
        Initializes step_plans by calling PipelinePathPlanner.prepare_pipeline_paths,
        which handles primary paths, special I/O path planning and linking, and chainbreaker status.
        This method then supplements the plans with non-I/O, FunctionStep-specific attributes.

        Args:
            context: ProcessingContext to initialize step plans for
            steps_definition: List of AbstractStep objects defining the pipeline
            orchestrator: Orchestrator instance for well filter resolution
            metadata_writer: If True, this well is responsible for creating OpenHCS metadata files
            plate_path: Path to the plate root for zarr conversion detection
        """
        if context.is_frozen():
            raise AttributeError("Cannot initialize step plans in a frozen ProcessingContext.")

        if not hasattr(context, 'step_plans') or context.step_plans is None:
            context.step_plans = {}  # Ensure the step_plans dict exists

        # === BACKWARDS COMPATIBILITY PREPROCESSING ===
        # Ensure all steps have complete attribute sets based on the AbstractStep constructor.
        # This must happen before any other compilation logic to eliminate defensive programming.
        logger.debug("🔧 BACKWARDS COMPATIBILITY: Normalizing step attributes...")
        _normalize_step_attributes(steps_definition)

        # === WELL FILTER RESOLUTION ===
        # Resolve well filters for steps with materialization configs.
        # This must happen after normalization to ensure materialization_config exists.
        logger.debug("🎯 WELL FILTER RESOLUTION: Resolving step well filters...")
        _resolve_step_well_filters(steps_definition, context, orchestrator)

        # Pre-initialize step_plans with a basic entry for each step.
        # This ensures step_plans is not empty when the path planner checks it.
        for step in steps_definition:
            if step.step_id not in context.step_plans:
                context.step_plans[step.step_id] = {
                    "step_name": step.name,
                    "step_type": step.__class__.__name__,
                    "well_id": context.well_id,
                }

        # === INPUT CONVERSION DETECTION ===
        # Check whether the first step needs zarr conversion.
        if steps_definition and plate_path:
            first_step = steps_definition[0]
            vfs_config = context.get_vfs_config()

            # Only convert if the default materialization backend is ZARR
            wants_zarr_conversion = (
                vfs_config.materialization_backend == MaterializationBackend.ZARR
            )

            if wants_zarr_conversion:
                # Check if the input plate is already in zarr format
                available_backends = context.microscope_handler.get_available_backends(plate_path)
                already_zarr = Backend.ZARR in available_backends

                if not already_zarr:
                    # Inject an input conversion config using the existing PathPlanningConfig pattern
                    path_config = context.get_path_planning_config()
                    conversion_config = PathPlanningConfig(
                        output_dir_suffix="",  # No suffix - write to the plate root
                        global_output_folder=plate_path.parent,  # Parent of the plate
                        sub_dir=path_config.sub_dir  # Use the same sub_dir (e.g., "images")
                    )
                    context.step_plans[first_step.step_id]["input_conversion_config"] = conversion_config
                    logger.debug(f"Input conversion to zarr enabled for first step: {first_step.name}")

        # The well_id and base_input_dir are available from the context object.
        PipelinePathPlanner.prepare_pipeline_paths(
            context,
            steps_definition
        )
        # Loop to supplement step_plans with non-I/O, non-path attributes
        # after PipelinePathPlanner has fully populated them with I/O info.
        for step in steps_definition:
            step_id = step.step_id
            if step_id not in context.step_plans:
                logger.error(
                    f"Critical error: Step {step.name} (ID: {step_id}) "
                    f"not found in step_plans after path planning phase. Clause 504."
                )
                # Create a minimal error plan
                context.step_plans[step_id] = {
                    "step_name": step.name,
                    "step_type": step.__class__.__name__,
                    "well_id": context.well_id,  # Use context.well_id
                    "error": "Missing from path planning phase by PipelinePathPlanner",
                    "create_openhcs_metadata": metadata_writer  # Set metadata writer responsibility flag
                }
                continue

            current_plan = context.step_plans[step_id]

            # Ensure basic metadata (PathPlanner should set most of this)
            current_plan["step_name"] = step.name
            current_plan["step_type"] = step.__class__.__name__
            current_plan["well_id"] = context.well_id  # Use context.well_id; PathPlanner should also use context.well_id
            current_plan.setdefault("visualize", False)  # Ensure the visualize key exists
            current_plan["create_openhcs_metadata"] = metadata_writer  # Set metadata writer responsibility flag

            # special_outputs and special_inputs are now fully handled by PipelinePathPlanner.
            # The block for planning special_outputs (lines 134-148 in the original) was removed.
            # Ensure these keys exist as OrderedDicts in case PathPlanner does not guarantee it
            # (PathPlanner currently creates them as dicts; OrderedDict may no longer be strictly needed here).
            current_plan.setdefault("special_inputs", OrderedDict())
            current_plan.setdefault("special_outputs", OrderedDict())
            current_plan.setdefault("chainbreaker", False)  # PathPlanner now sets this.

            # Add step-specific attributes (non-I/O, non-path related)
            current_plan["variable_components"] = step.variable_components
            current_plan["group_by"] = step.group_by

            # Store materialization_config if present
            if step.materialization_config is not None:
                current_plan["materialization_config"] = step.materialization_config

            # Add FunctionStep-specific attributes
            if isinstance(step, FunctionStep):

                # 🎯 SEMANTIC COHERENCE FIX: Prevent group_by/variable_components conflicts.
                # When variable_components contains the same value as group_by,
                # set group_by to None to avoid an EZStitcher heritage rule violation.
                if (step.variable_components and step.group_by and
                        step.group_by in step.variable_components):
                    logger.debug(f"Step {step.name}: Detected group_by='{step.group_by}' in variable_components={step.variable_components}. "
                                 f"Setting group_by=None to maintain semantic coherence.")
                    current_plan["group_by"] = None

                # The func attribute is guaranteed by FunctionStep.__init__
                current_plan["func_name"] = getattr(step.func, '__name__', str(step.func))

                # Memory type hints from the step instance (set in FunctionStep.__init__ if provided).
                # These are initial hints; FuncStepContractValidator will set the final types.
                if hasattr(step, 'input_memory_type_hint'):
                    current_plan['input_memory_type_hint'] = step.input_memory_type_hint
                if hasattr(step, 'output_memory_type_hint'):
                    current_plan['output_memory_type_hint'] = step.output_memory_type_hint

    # The resolve_special_input_paths_for_context static method was removed (lines 181-238 of the original),
    # as that functionality is now handled by PipelinePathPlanner.prepare_pipeline_paths.

    # _prepare_materialization_flags was removed because MaterializationFlagPlanner.prepare_pipeline_flags
    # now modifies context.step_plans in place and takes the context directly.
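    # An illustrative (not exhaustive) step_plan after all compilation phases, assuming a single
    # FunctionStep; the concrete values and extra keys depend on the planners and validators above:
    #
    #   context.step_plans[step_id] == {
    #       "step_name": "stitch", "step_type": "FunctionStep", "well_id": "A01",
    #       "visualize": False, "create_openhcs_metadata": True,
    #       "special_inputs": OrderedDict(), "special_outputs": OrderedDict(),
    #       "chainbreaker": False, "variable_components": [...], "group_by": None,
    #       "func_name": "assemble_tiles",
    #       "read_backend": "disk", "write_backend": "zarr",               # MaterializationFlagPlanner
    #       "input_memory_type": "numpy", "output_memory_type": "numpy",   # FuncStepContractValidator
    #       # plus path keys set by PipelinePathPlanner and any gpu_id from GPU validation
    #   }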
    @staticmethod
    def declare_zarr_stores_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator
    ) -> None:
        """
        Declare zarr store creation functions for runtime execution.

        This method runs after path planning but before materialization flag planning,
        to declare which steps need zarr stores and to provide the metadata needed
        for runtime store creation.

        Args:
            context: ProcessingContext for the current well
            steps_definition: List of AbstractStep objects
            orchestrator: Orchestrator instance for accessing all wells
        """
        from openhcs.constants.constants import GroupBy, Backend

        all_wells = orchestrator.get_component_keys(GroupBy.WELL)

        vfs_config = context.get_vfs_config()

        for step in steps_definition:
            step_plan = context.step_plans[step.step_id]

            will_use_zarr = (
                vfs_config.materialization_backend == MaterializationBackend.ZARR and
                steps_definition.index(step) == len(steps_definition) - 1
            )

            if will_use_zarr:
                step_plan["zarr_config"] = {
                    "all_wells": all_wells,
                    "needs_initialization": True
                }
                logger.debug(f"Step '{step.name}' will use zarr backend for well {context.well_id}")
            else:
                step_plan["zarr_config"] = None
    @staticmethod
    def plan_materialization_flags_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator
    ) -> None:
        """
        Plans and injects materialization flags into context.step_plans
        by calling MaterializationFlagPlanner.
        """
        if context.is_frozen():
            raise AttributeError("Cannot plan materialization flags in a frozen ProcessingContext.")
        if not context.step_plans:
            logger.warning("step_plans is empty in context for materialization planning. This may be valid if the pipeline is empty.")
            return

        # MaterializationFlagPlanner.prepare_pipeline_flags now takes the context and pipeline definition
        # and modifies context.step_plans in-place.
        MaterializationFlagPlanner.prepare_pipeline_flags(
            context,
            steps_definition,
            orchestrator.plate_path
        )

        # Post-check (optional, but useful for ensuring the planner met its contracts)
        for step in steps_definition:
            step_id = step.step_id
            if step_id not in context.step_plans:
                # This should not happen if prepare_pipeline_flags guarantees plans for all steps
                logger.error(f"Step {step.name} (ID: {step_id}) missing from step_plans after materialization planning.")
                continue

            plan = context.step_plans[step_id]
            # Check for keys that FunctionStep actually uses during execution
            required_keys = [READ_BACKEND, WRITE_BACKEND]
            if not all(k in plan for k in required_keys):
                missing_keys = [k for k in required_keys if k not in plan]
                logger.error(
                    f"Materialization flag planning incomplete for step {step.name} (ID: {step_id}). "
                    f"Missing required keys: {missing_keys} (Clause 273)."
                )
    @staticmethod
    def validate_memory_contracts_for_context(
        context: ProcessingContext,
        steps_definition: List[AbstractStep],
        orchestrator=None
    ) -> None:
        """
        Validates FunctionStep memory contracts and dict patterns, and adds memory type info to context.step_plans.

        Args:
            context: ProcessingContext to validate
            steps_definition: List of AbstractStep objects
            orchestrator: Optional orchestrator for dict pattern key validation
        """
        if context.is_frozen():
            raise AttributeError("Cannot validate memory contracts in a frozen ProcessingContext.")

        # FuncStepContractValidator may need access to input/output_memory_type_hint from the plan
        step_memory_types = FuncStepContractValidator.validate_pipeline(
            steps=steps_definition,
            pipeline_context=context,  # Pass context so the validator can access step plans for memory type overrides
            orchestrator=orchestrator  # Pass orchestrator for dict pattern key validation
        )

        for step_id, memory_types in step_memory_types.items():
            if "input_memory_type" not in memory_types or "output_memory_type" not in memory_types:
                step_name = context.step_plans[step_id]["step_name"]
                raise AssertionError(
                    f"Memory type validation must set input/output_memory_type for FunctionStep {step_name} (ID: {step_id}) (Clause 101)."
                )
            if step_id in context.step_plans:
                context.step_plans[step_id].update(memory_types)
            else:
                logger.warning(f"Step ID {step_id} found in memory_types but not in context.step_plans. Skipping.")

        # Apply memory type override: any step with disk output must use numpy for disk writing
        for i, step in enumerate(steps_definition):
            if isinstance(step, FunctionStep):
                step_id = step.step_id
                if step_id in context.step_plans:
                    step_plan = context.step_plans[step_id]
                    is_last_step = (i == len(steps_definition) - 1)
                    write_backend = step_plan['write_backend']

                    if write_backend == 'disk':
                        logger.debug(f"Step {step.name} has disk output, overriding output_memory_type to numpy")
                        step_plan['output_memory_type'] = 'numpy'
    @staticmethod
    def assign_gpu_resources_for_context(
        context: ProcessingContext
    ) -> None:
        """
        Validates GPU memory types from context.step_plans and assigns GPU device IDs.
        (Unchanged from the previous version.)
        """
        if context.is_frozen():
            raise AttributeError("Cannot assign GPU resources in a frozen ProcessingContext.")

        gpu_assignments = GPUMemoryTypeValidator.validate_step_plans(context.step_plans)

        for step_id, step_plan_val in context.step_plans.items():  # Renamed step_plan to step_plan_val to avoid conflict
            is_gpu_step = False
            input_type = step_plan_val["input_memory_type"]
            if input_type in VALID_GPU_MEMORY_TYPES:
                is_gpu_step = True

            output_type = step_plan_val["output_memory_type"]
            if output_type in VALID_GPU_MEMORY_TYPES:
                is_gpu_step = True

            if is_gpu_step:
                # Ensure gpu_assignments has an entry for this step_id if it is a GPU step,
                # and that the entry contains a 'gpu_id'.
                step_gpu_assignment = gpu_assignments[step_id]
                if "gpu_id" not in step_gpu_assignment:
                    step_name = step_plan_val["step_name"]
                    raise AssertionError(
                        f"GPU validation must assign gpu_id for step {step_name} (ID: {step_id}) "
                        f"with GPU memory types (Clause 295)."
                    )

        for step_id, gpu_assignment in gpu_assignments.items():
            if step_id in context.step_plans:
                context.step_plans[step_id].update(gpu_assignment)
            else:
                logger.warning(f"Step ID {step_id} found in gpu_assignments but not in context.step_plans. Skipping.")
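    # Illustrative only: GPUMemoryTypeValidator returns per-step assignments, which for a step
    # with GPU-backed memory types would look roughly like {"gpu_id": 0}; the loop above merges
    # each such assignment into the corresponding step plan.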
    @staticmethod
    def apply_global_visualizer_override_for_context(
        context: ProcessingContext,
        global_enable_visualizer: bool
    ) -> None:
        """
        Applies the global visualizer override to all step_plans in the context.
        (Unchanged from the previous version.)
        """
        if context.is_frozen():
            raise AttributeError("Cannot apply visualizer override in a frozen ProcessingContext.")

        if global_enable_visualizer:
            if not context.step_plans:
                return  # Guard against empty step_plans
            for step_id, plan in context.step_plans.items():
                plan["visualize"] = True
                logger.info(f"Global visualizer override: Step '{plan['step_name']}' marked for visualization.")
    @staticmethod
    def resolve_lazy_dataclasses_for_context(context: ProcessingContext) -> None:
        """
        Resolve all lazy dataclass instances in step plans to their base configurations.

        This method should be called after all compilation phases but before context
        freezing, to ensure step plans are safe for pickling in multiprocessing contexts.

        Args:
            context: ProcessingContext to process
        """
        from openhcs.core.config import get_base_type_for_lazy

        def resolve_lazy_dataclass(obj: Any) -> Any:
            """Resolve a lazy dataclass to its base config if it is a lazy type; otherwise return it as-is."""
            obj_type = type(obj)
            if get_base_type_for_lazy(obj_type) is not None:
                # This is a lazy dataclass - resolve it to its base config
                return obj.to_base_config()
            else:
                # Not a lazy dataclass - return as-is
                return obj
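        # Illustrative only: if a plan entry such as step_plan["materialization_config"] holds a
        # lazy dataclass instance, the loop below replaces it with obj.to_base_config(), a plain
        # dataclass that pickles cleanly across process boundaries.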
        # Resolve all lazy dataclasses in step plans
        for step_id, step_plan in context.step_plans.items():
            for key, value in step_plan.items():
                step_plan[key] = resolve_lazy_dataclass(value)
    @staticmethod
    def compile_pipelines(
        orchestrator,
        pipeline_definition: List[AbstractStep],
        well_filter: Optional[List[str]] = None,
        enable_visualizer_override: bool = False
    ) -> Dict[str, ProcessingContext]:
        """
        Compile-all phase: prepares frozen ProcessingContexts for each well.

        This method iterates through the specified wells, creates a ProcessingContext
        for each, and invokes the various phases of the PipelineCompiler to populate
        the context's step_plans. After all compilation phases for a well are complete,
        its context is frozen. Finally, attributes are stripped from the pipeline_definition,
        making the step objects stateless for the execution phase.

        Args:
            orchestrator: The PipelineOrchestrator instance to use for compilation
            pipeline_definition: The list of AbstractStep objects defining the pipeline.
            well_filter: Optional list of well IDs to process. If None, processes all found wells.
            enable_visualizer_override: If True, all steps in all compiled contexts
                will have their 'visualize' flag set to True.

        Returns:
            A dictionary mapping well IDs to their compiled and frozen ProcessingContexts.
            The input `pipeline_definition` list (of step objects) is modified in-place
            to become stateless.
        """
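        # A minimal usage sketch (assumes an already-initialized orchestrator and a list of
        # steps; the execution call is hypothetical and not defined in this module):
        #
        #   contexts = PipelineCompiler.compile_pipelines(orchestrator, steps, well_filter=["A01", "B02"])
        #   for well_id, ctx in contexts.items():
        #       executor.run(ctx)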
        from openhcs.constants.constants import GroupBy, OrchestratorState
        from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper

        if not orchestrator.is_initialized():
            raise RuntimeError("PipelineOrchestrator must be explicitly initialized before calling compile_pipelines().")

        if not pipeline_definition:
            raise ValueError("A valid pipeline definition (List[AbstractStep]) must be provided.")

        try:
            compiled_contexts: Dict[str, ProcessingContext] = {}
            wells_to_process = orchestrator.get_component_keys(GroupBy.WELL, well_filter)

            if not wells_to_process:
                logger.warning("No wells found to process based on filter.")
                return {}

            logger.info(f"Starting compilation for wells: {', '.join(wells_to_process)}")

            # Determine the well responsible for metadata creation (lexicographically first)
            responsible_well = sorted(wells_to_process)[0] if wells_to_process else None
            logger.debug(f"Designated responsible well for metadata creation: {responsible_well}")

            for well_id in wells_to_process:
                logger.debug(f"Compiling for well: {well_id}")
                context = orchestrator.create_context(well_id)

                # Determine whether this well is responsible for metadata creation
                is_responsible = (well_id == responsible_well)
                logger.debug(f"Well {well_id} metadata responsibility: {is_responsible}")

                PipelineCompiler.initialize_step_plans_for_context(
                    context, pipeline_definition, orchestrator,
                    metadata_writer=is_responsible, plate_path=orchestrator.plate_path
                )
                PipelineCompiler.declare_zarr_stores_for_context(context, pipeline_definition, orchestrator)
                PipelineCompiler.plan_materialization_flags_for_context(context, pipeline_definition, orchestrator)
                PipelineCompiler.validate_memory_contracts_for_context(context, pipeline_definition, orchestrator)
                PipelineCompiler.assign_gpu_resources_for_context(context)

                if enable_visualizer_override:
                    PipelineCompiler.apply_global_visualizer_override_for_context(context, True)

                # Resolve all lazy dataclasses before freezing to ensure multiprocessing compatibility
                PipelineCompiler.resolve_lazy_dataclasses_for_context(context)

                context.freeze()
                compiled_contexts[well_id] = context
                logger.debug(f"Compilation finished for well: {well_id}")

            # After processing all wells, strip attributes and finalize
            logger.info("Stripping attributes from pipeline definition steps.")
            StepAttributeStripper.strip_step_attributes(pipeline_definition, {})

            orchestrator._state = OrchestratorState.COMPILED
            logger.info(f"Plate compilation finished for {len(compiled_contexts)} wells.")
            return compiled_contexts
        except Exception as e:
            orchestrator._state = OrchestratorState.COMPILE_FAILED
            logger.error(f"Failed to compile pipelines: {e}")
            raise
    @staticmethod
    def update_step_ids_for_multiprocessing(
        context: ProcessingContext,
        steps_definition: List[AbstractStep]
    ) -> None:
        """
        Updates step IDs in a frozen context after a multiprocessing pickle/unpickle.

        When contexts are pickled and unpickled for multiprocessing, step objects get
        new memory addresses, changing their IDs. This method remaps the step_plans
        from old IDs to new IDs while preserving all plan data.

        SPECIAL PRIVILEGE: This method can modify frozen contexts since it is part
        of the compilation process and maintains data integrity.

        Args:
            context: Frozen ProcessingContext with old step IDs
            steps_definition: Step objects with new IDs after pickle/unpickle
        """
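        # Illustrative only (the ids shown are made up): a plan keyed by a pre-pickle id such as
        # 140230010 is re-keyed to whatever id get_step_id() reports for the step at the same
        # position after unpickling, e.g. 139987654, so the plan data itself is unchanged.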
        if not context.is_frozen():
            logger.warning("update_step_ids_for_multiprocessing called on an unfrozen context - skipping")
            return

        # Create a mapping from old step positions to new step IDs
        if len(steps_definition) != len(context.step_plans):
            raise RuntimeError(
                f"Step count mismatch: {len(steps_definition)} steps vs {len(context.step_plans)} plans. "
                f"Cannot safely remap step IDs."
            )

        # Get the old step IDs in order (assuming the same order as steps_definition)
        old_step_ids = list(context.step_plans.keys())

        # Generate new step IDs using get_step_id (handles stripped step objects)
        from openhcs.core.steps.abstract import get_step_id
        new_step_ids = [get_step_id(step) for step in steps_definition]

        logger.debug("Remapping step IDs for multiprocessing:")
        for old_id, new_id in zip(old_step_ids, new_step_ids):
            logger.debug(f"  {old_id} → {new_id}")

        # Create a new step_plans dict with the updated IDs
        new_step_plans = {}
        for old_id, new_id in zip(old_step_ids, new_step_ids):
            new_step_plans[new_id] = context.step_plans[old_id].copy()

        # SPECIAL PRIVILEGE: Temporarily unfreeze to update step_plans, then refreeze
        object.__setattr__(context, '_is_frozen', False)
        try:
            context.step_plans = new_step_plans
            logger.info(f"Updated {len(new_step_plans)} step plans for multiprocessing compatibility")
        finally:
            object.__setattr__(context, '_is_frozen', True)
# The monolithic compile() method was removed.
# The Orchestrator will call the static methods above in sequence.
# _strip_step_attributes was also removed, as StepAttributeStripper is called by the Orchestrator.
def _resolve_step_well_filters(steps_definition: List[AbstractStep], context, orchestrator):
    """
    Resolve well filters for steps with materialization configs.

    This function handles step-level well filtering by resolving patterns like
    "row:A", ["A01", "B02"], or max counts against the available wells for the plate.

    Args:
        steps_definition: List of pipeline steps
        context: Processing context for the current well
        orchestrator: Orchestrator instance with access to available wells
    """
    from openhcs.core.utils import WellFilterProcessor

    # Get the available wells from the orchestrator using the correct method
    from openhcs.constants.constants import GroupBy
    available_wells = orchestrator.get_component_keys(GroupBy.WELL)
    if not available_wells:
        logger.warning("No available wells found for well filter resolution")
        return

    # Initialize step_well_filters in the context if not present
    if not hasattr(context, 'step_well_filters'):
        context.step_well_filters = {}

    # Process each step that has a materialization config with a well filter
    for step in steps_definition:
        if (hasattr(step, 'materialization_config') and
                step.materialization_config and
                step.materialization_config.well_filter is not None):

            try:
                # Resolve the well filter pattern to concrete well IDs
                resolved_wells = WellFilterProcessor.resolve_compilation_filter(
                    step.materialization_config.well_filter,
                    available_wells
                )

                # Store the resolved wells in the context for the path planner,
                # using the structure the path planner expects.
                context.step_well_filters[step.step_id] = {
                    'resolved_wells': sorted(resolved_wells),
                    'filter_mode': step.materialization_config.well_filter_mode,
                    'original_filter': step.materialization_config.well_filter
                }

                logger.debug(f"Step '{step.name}' well filter '{step.materialization_config.well_filter}' "
                             f"resolved to {len(resolved_wells)} wells: {sorted(resolved_wells)}")

            except Exception as e:
                logger.error(f"Failed to resolve well filter for step '{step.name}': {e}")
                raise ValueError(f"Invalid well filter '{step.materialization_config.well_filter}' "
                                 f"for step '{step.name}': {e}")

    logger.debug(f"Well filter resolution complete. {len(context.step_well_filters)} steps have well filters.")