Coverage for openhcs/core/pipeline/compiler.py: 75.5%

468 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Pipeline module for OpenHCS. 

3 

4This module provides the core pipeline compilation components for OpenHCS. 

5The PipelineCompiler is responsible for preparing step_plans within a ProcessingContext. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 17-B — Path Format Discipline 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 101 — Memory Type Declaration 

14- Clause 245 — Path Declaration 

15- Clause 273 — Backend Authorization Doctrine 

16- Clause 281 — Context-Bound Identifiers 

17- Clause 293 — GPU Pre-Declaration Enforcement 

18- Clause 295 — GPU Scheduling Affinity 

19- Clause 504 — Pipeline Preparation Modifications 

20- Clause 524 — Step = Declaration = ID = Runtime Authority 

21""" 

22 

23import inspect 

24import logging 

25import dataclasses 

26from pathlib import Path 

27from typing import Any, Callable, Dict, List, Optional 

28from collections import OrderedDict # For special_outputs and special_inputs order (used by PathPlanner) 

29 

30from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES, READ_BACKEND, WRITE_BACKEND, Backend 

31from openhcs.core.context.processing_context import ProcessingContext 

32from openhcs.core.config import MaterializationBackend, PathPlanningConfig, WellFilterMode 

33from openhcs.core.pipeline.funcstep_contract_validator import \ 

34 FuncStepContractValidator 

35from openhcs.core.pipeline.materialization_flag_planner import \ 

36 MaterializationFlagPlanner 

37from openhcs.core.pipeline.path_planner import PipelinePathPlanner 

38from openhcs.core.pipeline.gpu_memory_validator import \ 

39 GPUMemoryTypeValidator 

40from openhcs.core.steps.abstract import AbstractStep 

41from openhcs.core.steps.function_step import FunctionStep # Used for isinstance check 

42from dataclasses import dataclass 

43logger = logging.getLogger(__name__) 

44 

45 

46@dataclass(frozen=True) 

47class FunctionReference: 

48 """ 

49 A picklable reference to a function in the registry. 

50 

51 This replaces raw function objects in compiled step definitions to ensure 

52 picklability while allowing workers to resolve functions from their registry. 

53 """ 

54 function_name: str 

55 registry_name: str 

56 memory_type: str # The memory type for get_function_by_name() (e.g., "numpy", "pyclesperanto") 

57 composite_key: str # The full registry key (e.g., "pyclesperanto:gaussian_blur") 

58 

59 def resolve(self) -> Callable: 

60 """Resolve this reference to the actual decorated function from registry.""" 

61 if self.registry_name == "openhcs": 61 ↛ 71line 61 didn't jump to line 71 because the condition on line 61 was always true

62 # For OpenHCS functions, use RegistryService directly with composite key 

63 from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

64 all_functions = RegistryService.get_all_functions_with_metadata() 

65 if self.composite_key in all_functions: 65 ↛ 68line 65 didn't jump to line 68 because the condition on line 65 was always true

66 return all_functions[self.composite_key].func 

67 else: 

68 raise RuntimeError(f"OpenHCS function {self.composite_key} not found in registry") 

69 else: 

70 # For external library functions, use the memory type for lookup 

71 from openhcs.processing.func_registry import get_function_by_name 

72 return get_function_by_name(self.function_name, self.memory_type) 

73 

74 
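A minimal usage sketch of the round trip (the gaussian_blur / pyclesperanto values mirror the field comments above and assume such a function is actually registered):

ref = FunctionReference(
    function_name="gaussian_blur",
    registry_name="pyclesperanto",
    memory_type="pyclesperanto",
    composite_key="pyclesperanto:gaussian_blur",
)
# The dataclass pickles cleanly into a step plan; a worker process later calls
# resolve(), which re-fetches the decorated callable from the registry in that process.
func = ref.resolve()
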

75def _refresh_function_objects_in_steps(pipeline_definition: List[AbstractStep]) -> None: 

76 """ 

77 Refresh all function objects in pipeline steps to ensure they're picklable. 

78 

79 This recreates function objects by importing them fresh from their original modules, 

80 similar to how code mode works, which avoids unpicklable closures from registry wrapping. 

81 """ 

82 for step in pipeline_definition: 

83 if hasattr(step, 'func') and step.func is not None: 83 ↛ 82line 83 didn't jump to line 82 because the condition on line 83 was always true

84 step.func = _refresh_function_object(step.func) 

85 

86 

87def _refresh_function_object(func_value): 

88 """Convert function objects to picklable FunctionReference objects. 

89 

90 Also filters out functions with enabled=False at compile time. 

91 """ 

92 try: 

93 if callable(func_value) and hasattr(func_value, '__module__'): 

94 # Single function → FunctionReference 

95 return _get_function_reference(func_value) 

96 

97 elif isinstance(func_value, tuple) and len(func_value) == 2: 

98 # Function with parameters tuple → (FunctionReference, params) 

99 func, params = func_value 

100 

101 # Check if function is disabled via enabled parameter 

102 if isinstance(params, dict) and params.get('enabled', True) is False: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true

103 import logging 

104 logger = logging.getLogger(__name__) 

105 func_name = getattr(func, '__name__', str(func)) 

106 logger.info(f"🔧 COMPILE-TIME FILTER: Removing disabled function '{func_name}' from pipeline") 

107 return None # Mark for removal 

108 

109 if callable(func): 

110 func_ref = _refresh_function_object(func) 

111 # Remove 'enabled' from params since it's not a real function parameter 

112 if isinstance(params, dict) and 'enabled' in params: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 params = {k: v for k, v in params.items() if k != 'enabled'} 

114 return (func_ref, params) 

115 

116 elif isinstance(func_value, list): 

117 # List of functions → List of FunctionReferences (filter out None) 

118 refreshed = [_refresh_function_object(item) for item in func_value] 

119 return [item for item in refreshed if item is not None] 

120 

121 elif isinstance(func_value, dict): 

122 # Dict of functions → Dict of FunctionReferences (filter out None values) 

123 refreshed = {key: _refresh_function_object(value) for key, value in func_value.items()} 

124 return {key: value for key, value in refreshed.items() if value is not None} 

125 

126 except Exception as e: 

127 import logging 

128 logger = logging.getLogger(__name__) 

129 logger.warning(f"Failed to create function reference for {func_value}: {e}") 

130 # If we can't create a reference, return original (may fail later) 

131 return func_value 

132 

133 return func_value 

134 

135 
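A rough illustration of the func shapes handled above; my_blur and my_threshold are hypothetical registered functions, and the dict keys are arbitrary labels:

step.func = my_blur                                       # single callable -> FunctionReference
step.func = (my_blur, {"sigma": 2.0, "enabled": False})   # disabled -> dropped at compile time
step.func = [my_blur, (my_threshold, {"level": 10})]      # list -> list of references
step.func = {"DAPI": my_blur, "GFP": my_threshold}        # dict -> per-key references
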

136def _get_function_reference(func): 

137 """Convert a function to a picklable FunctionReference.""" 

138 try: 

139 from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

140 

141 # Get all function metadata to find this function 

142 all_functions = RegistryService.get_all_functions_with_metadata() 

143 

144 # Find the metadata for this function by matching name and module 

145 for composite_key, metadata in all_functions.items(): 145 ↛ 163line 145 didn't jump to line 163 because the loop on line 145 didn't complete

146 if (metadata.func.__name__ == func.__name__ and 

147 metadata.func.__module__ == func.__module__): 

148 # Create a picklable reference instead of the function object 

149 return FunctionReference( 

150 function_name=func.__name__, 

151 registry_name=metadata.registry.library_name, 

152 memory_type=metadata.registry.MEMORY_TYPE, 

153 composite_key=composite_key 

154 ) 

155 

156 except Exception as e: 

157 import logging 

158 logger = logging.getLogger(__name__) 

159 logger.warning(f"Failed to create function reference for {func.__name__}: {e}") 

160 

161 # If we can't create a reference, this function isn't in the registry 

162 # This should not happen for properly registered functions 

163 raise RuntimeError(f"Function {func.__name__} not found in registry - cannot create reference") 

164 

165 

166def _normalize_step_attributes(pipeline_definition: List[AbstractStep]) -> None: 

167 """Backwards compatibility: Set missing step attributes to constructor defaults.""" 

168 sig = inspect.signature(AbstractStep.__init__) 

169 # Include ALL parameters with defaults, even None values 

170 defaults = {name: param.default for name, param in sig.parameters.items() 

171 if name != 'self' and param.default is not inspect.Parameter.empty} 

172 

173 # Add attributes that are set manually in AbstractStep.__init__ but not constructor parameters 

174 manual_attributes = { 

175 '__input_dir__': None, 

176 '__output_dir__': None, 

177 } 

178 

179 for i, step in enumerate(pipeline_definition): 

180 # Set missing constructor parameters 

181 for attr_name, default_value in defaults.items(): 

182 if not hasattr(step, attr_name): 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true

183 setattr(step, attr_name, default_value) 

184 

185 # Set missing manual attributes (for backwards compatibility with older serialized steps) 

186 for attr_name, default_value in manual_attributes.items(): 

187 if not hasattr(step, attr_name): 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true

188 setattr(step, attr_name, default_value) 

189 

190 

191class PipelineCompiler: 

192 """ 

193 Compiles a pipeline by populating step plans within a ProcessingContext. 

194 

195 This class provides static methods that are called sequentially by the 

196 PipelineOrchestrator for each well's ProcessingContext. Each method 

197 is responsible for a specific part of the compilation process, such as 

198 path planning, special I/O resolution, materialization flag setting, 

199 memory contract validation, and GPU resource assignment. 

200 """ 
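    # Compilation phase order, as driven by compile_pipelines below for each axis value:
    #   initialize_step_plans_for_context -> declare_zarr_stores_for_context
    #   -> plan_materialization_flags_for_context -> validate_memory_contracts_for_context
    #   -> assign_gpu_resources_for_context -> resolve_lazy_dataclasses_for_context -> freeze()
    # (apply_global_visualizer_override_for_context is slotted in when requested.)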

201 

202 @staticmethod 

203 def initialize_step_plans_for_context( 

204 context: ProcessingContext, 

205 steps_definition: List[AbstractStep], 

206 orchestrator, 

207 metadata_writer: bool = False, 

208 plate_path: Optional[Path] = None 

209 # base_input_dir and axis_id parameters removed, will use from context 

210 ) -> None: 

211 """ 

212 Initializes step_plans by calling PipelinePathPlanner.prepare_pipeline_paths, 

213 which handles primary paths, special I/O path planning and linking, and chainbreaker status. 

214 Then, this method supplements the plans with non-I/O FunctionStep-specific attributes. 

215 

216 Args: 

217 context: ProcessingContext to initialize step plans for 

218 steps_definition: List of AbstractStep objects defining the pipeline 

219 orchestrator: Orchestrator instance for well filter resolution 

220 metadata_writer: If True, this well is responsible for creating OpenHCS metadata files 

221 plate_path: Path to plate root for zarr conversion detection 

222 """ 

223 # NOTE: This method is called within config_context() wrapper in compile_pipelines() 

224 if context.is_frozen(): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 raise AttributeError("Cannot initialize step plans in a frozen ProcessingContext.") 

226 

227 if not hasattr(context, 'step_plans') or context.step_plans is None: 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true

228 context.step_plans = {} # Ensure step_plans dict exists 

229 

230 # === VISUALIZER CONFIG EXTRACTION === 

231 # visualizer_config is a legacy parameter that's passed to visualizers but never used 

232 # The actual display configuration comes from the display_config parameter 

233 # Set to None for backward compatibility with orchestrator code 

234 context.visualizer_config = None 

235 

236 # Note: _normalize_step_attributes is now called in compile_pipelines() before filtering 

237 # to ensure old pickled steps have the 'enabled' attribute before we check it 

238 

239 # Pre-initialize step_plans with basic entries for each step 

240 # Use step index as key instead of step_id for multiprocessing compatibility 

241 for step_index, step in enumerate(steps_definition): 

242 if step_index not in context.step_plans: 242 ↛ 241line 242 didn't jump to line 241 because the condition on line 242 was always true

243 context.step_plans[step_index] = { 

244 "step_name": step.name, 

245 "step_type": step.__class__.__name__, 

246 "axis_id": context.axis_id, 

247 } 

248 

249 # === INPUT CONVERSION DETECTION === 

250 # Check if first step needs zarr conversion 

251 if steps_definition and plate_path: 251 ↛ 290line 251 didn't jump to line 290 because the condition on line 251 was always true

252 first_step = steps_definition[0] 

253 # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context) 

254 vfs_config = orchestrator.pipeline_config.vfs_config 

255 

256 # Only convert if default materialization backend is ZARR 

257 wants_zarr_conversion = ( 

258 vfs_config.materialization_backend == MaterializationBackend.ZARR 

259 ) 

260 

261 if wants_zarr_conversion: 

262 # Check if input plate is already zarr format 

263 available_backends = context.microscope_handler.get_available_backends(plate_path) 

264 already_zarr = Backend.ZARR in available_backends 

265 

266 if not already_zarr: 

267 # Determine if input uses virtual workspace 

268 from openhcs.microscopes.openhcs import OpenHCSMetadataHandler 

269 from openhcs.io.metadata_writer import get_subdirectory_name 

270 

271 openhcs_metadata_handler = OpenHCSMetadataHandler(context.filemanager) 

272 metadata = openhcs_metadata_handler._load_metadata_dict(plate_path) 

273 subdirs = metadata["subdirectories"] 

274 

275 # Get actual subdirectory from input_dir 

276 original_subdir = get_subdirectory_name(context.input_dir, plate_path) 

277 uses_virtual_workspace = Backend.VIRTUAL_WORKSPACE.value in subdirs[original_subdir]["available_backends"] 

278 

279 zarr_subdir = "zarr" if uses_virtual_workspace else original_subdir 

280 conversion_dir = plate_path / zarr_subdir 

281 

282 context.step_plans[0]["input_conversion_dir"] = str(conversion_dir) 

283 context.step_plans[0]["input_conversion_backend"] = MaterializationBackend.ZARR.value 

284 context.step_plans[0]["input_conversion_uses_virtual_workspace"] = uses_virtual_workspace 

285 context.step_plans[0]["input_conversion_original_subdir"] = original_subdir 

286 logger.debug(f"Input conversion to zarr enabled for first step: {first_step.name}") 

287 

288 # The axis_id and base_input_dir are available from the context object. 

289 # Path planning now gets config directly from orchestrator.pipeline_config parameter 

290 PipelinePathPlanner.prepare_pipeline_paths( 

291 context, 

292 steps_definition, 

293 orchestrator.pipeline_config 

294 ) 

295 

296 # === FUNCTION OBJECT REFRESH === 

297 # CRITICAL FIX: Refresh all function objects to ensure they're picklable 

298 # This prevents multiprocessing pickling errors by ensuring clean function objects 

299 logger.debug("🔧 FUNCTION REFRESH: Refreshing all function objects for picklability...") 

300 _refresh_function_objects_in_steps(steps_definition) 

301 

302 # === LAZY CONFIG RESOLUTION === 

303 # Resolve each step's lazy configs with proper nested context 

304 # This ensures step-level configs inherit from pipeline-level configs 

305 # Architecture: GlobalPipelineConfig -> PipelineConfig -> Step (same as UI) 

306 logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs with nested step contexts...") 

307 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization 

308 from openhcs.config_framework.context_manager import config_context 

309 

310 # Resolve each step individually with nested context (pipeline -> step) 

311 # NOTE: The caller has already set up config_context(orchestrator.pipeline_config) 

312 # We add step-level context on top for each step 

313 resolved_steps = [] 

314 for step in steps_definition: 

315 with config_context(step): # Step-level context on top of pipeline context 

316 resolved_step = resolve_lazy_configurations_for_serialization(step) 

317 resolved_steps.append(resolved_step) 

318 steps_definition = resolved_steps 

319 

320 # Loop to supplement step_plans with non-I/O, non-path attributes 

321 # after PipelinePathPlanner has fully populated them with I/O info. 

322 for step_index, step in enumerate(steps_definition): 

323 if step_index not in context.step_plans: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true

324 logger.error( 

325 f"Critical error: Step {step.name} (index: {step_index}) " 

326 f"not found in step_plans after path planning phase. Clause 504." 

327 ) 

328 # Create a minimal error plan 

329 context.step_plans[step_index] = { 

330 "step_name": step.name, 

331 "step_type": step.__class__.__name__, 

332 "axis_id": context.axis_id, # Use context.axis_id 

333 "error": "Missing from path planning phase by PipelinePathPlanner", 

334 "create_openhcs_metadata": metadata_writer # Set metadata writer responsibility flag 

335 } 

336 continue 

337 

338 current_plan = context.step_plans[step_index] 

339 

340 # Ensure basic metadata (PathPlanner should set most of this) 

341 current_plan["step_name"] = step.name 

342 current_plan["step_type"] = step.__class__.__name__ 

343 current_plan["axis_id"] = context.axis_id # Use context.axis_id; PathPlanner should also use context.axis_id 

344 current_plan.setdefault("visualize", False) # Ensure visualize key exists 

345 current_plan["create_openhcs_metadata"] = metadata_writer # Set metadata writer responsibility flag 

346 

347 # The special_outputs and special_inputs are now fully handled by PipelinePathPlanner. 

348 # The block for planning special_outputs (lines 134-148 in original) is removed. 

349 # Ensure these keys exist as OrderedDicts if PathPlanner doesn't guarantee it 

350 # (PathPlanner currently creates them as dicts, OrderedDict might not be strictly needed here anymore) 

351 current_plan.setdefault("special_inputs", OrderedDict()) 

352 current_plan.setdefault("special_outputs", OrderedDict()) 

353 current_plan.setdefault("chainbreaker", False) # PathPlanner now sets this. 

354 

355 # Add step-specific attributes (non-I/O, non-path related) 

356 current_plan["variable_components"] = step.variable_components 

357 current_plan["group_by"] = step.group_by 

358 # Lazy configs were already resolved at the beginning of compilation 

359 resolved_step = step 

360 

361 # DEBUG: Check what the resolved napari config actually has 

362 if hasattr(resolved_step, 'napari_streaming_config') and resolved_step.napari_streaming_config: 

363 logger.debug(f"resolved_step.napari_streaming_config.well_filter = {resolved_step.napari_streaming_config.well_filter}") 

364 if hasattr(resolved_step, 'step_well_filter_config') and resolved_step.step_well_filter_config: 364 ↛ 366line 364 didn't jump to line 366 because the condition on line 364 was always true

365 logger.debug(f"resolved_step.step_well_filter_config.well_filter = {resolved_step.step_well_filter_config.well_filter}") 

366 if hasattr(resolved_step, 'step_materialization_config') and resolved_step.step_materialization_config: 366 ↛ 370line 366 didn't jump to line 370 because the condition on line 366 was always true

367 logger.debug(f"resolved_step.step_materialization_config.sub_dir = '{resolved_step.step_materialization_config.sub_dir}' (type: {type(resolved_step.step_materialization_config).__name__})") 

368 

369 # Store WellFilterConfig instances only if they match the current axis 

370 from openhcs.core.config import WellFilterConfig, StreamingConfig, WellFilterMode 

371 has_streaming = False 

372 required_visualizers = getattr(context, 'required_visualizers', []) 

373 

374 # CRITICAL FIX: Ensure required_visualizers is always set on context 

375 # This prevents AttributeError during execution phase 

376 if not hasattr(context, 'required_visualizers'): 

377 context.required_visualizers = [] 

378 

379 # Get step axis filters for this step 

380 step_axis_filters = getattr(context, 'step_axis_filters', {}).get(step_index, {}) 

381 

382 logger.debug(f"Processing step '{step.name}' with attributes: {[attr for attr in dir(resolved_step) if not attr.startswith('_') and 'config' in attr]}") 

383 if step.name == "Image Enhancement Processing": 

384 logger.debug(f"All attributes for {step.name}: {[attr for attr in dir(resolved_step) if not attr.startswith('_')]}") 

385 

386 for attr_name in dir(resolved_step): 

387 if not attr_name.startswith('_'): 

388 config = getattr(resolved_step, attr_name, None) 

389 # Configs were already resolved to base configs during the lazy config resolution above 

390 # No need to call to_base_config() again - that's legacy code 

391 

392 # Skip None configs 

393 if config is None: 

394 continue 

395 

396 # CRITICAL: Check enabled field first (fail-fast for disabled configs) 

397 if hasattr(config, 'enabled') and not config.enabled: 

398 continue 

399 

400 # Check well filter matching (only for WellFilterConfig instances) 

401 include_config = True 

402 if isinstance(config, WellFilterConfig) and config.well_filter is not None: 

403 config_filter = step_axis_filters.get(attr_name) 

404 if config_filter: 404 ↛ 413line 404 didn't jump to line 413 because the condition on line 404 was always true

405 # Apply axis filter logic 

406 axis_in_filter = context.axis_id in config_filter['resolved_axis_values'] 

407 include_config = ( 

408 axis_in_filter if config_filter['filter_mode'] == WellFilterMode.INCLUDE 

409 else not axis_in_filter 

410 ) 

411 

412 # Add config to plan if it passed all checks 

413 if include_config: 

414 current_plan[attr_name] = config 

415 

416 # Add streaming extras if this is a streaming config 

417 if isinstance(config, StreamingConfig): 

418 # Validate that the visualizer can actually be created 

419 try: 

420 # Only validate configs that actually have a backend (real streaming configs) 

421 if not hasattr(config, 'backend'): 421 ↛ 422line 421 didn't jump to line 422 because the condition on line 421 was never true

422 continue 

423 

424 # Test visualizer creation without actually creating it 

425 if hasattr(config, 'create_visualizer'): 425 ↛ 448line 425 didn't jump to line 448 because the condition on line 425 was always true

426 # For napari, check if napari is available and environment supports GUI 

427 if config.backend.name == 'NAPARI_STREAM': 

428 from openhcs.utils.import_utils import optional_import 

429 import os 

430 

431 # Check if running in headless/CI environment 

432 # CPU-only mode does NOT imply headless - you can run CPU mode with napari 

433 is_headless = ( 

434 os.getenv('CI', 'false').lower() == 'true' or 

435 os.getenv('OPENHCS_HEADLESS', 'false').lower() == 'true' or 

436 os.getenv('DISPLAY') is None 

437 ) 

438 

439 if is_headless: 439 ↛ 443line 439 didn't jump to line 443 because the condition on line 439 was always true

440 logger.info(f"Napari streaming disabled for step '{step.name}': running in headless environment (CI or no DISPLAY)") 

441 continue # Skip this streaming config 

442 

443 napari = optional_import("napari") 

444 if napari is None: 

445 logger.warning(f"Napari streaming disabled for step '{step.name}': napari not installed. Install with: pip install 'openhcs[viz]' or pip install napari") 

446 continue # Skip this streaming config 

447 

448 has_streaming = True 

449 # Collect visualizer info 

450 visualizer_info = { 

451 'backend': config.backend.name, 

452 'config': config 

453 } 

454 if visualizer_info not in required_visualizers: 454 ↛ 386line 454 didn't jump to line 386 because the condition on line 454 was always true

455 required_visualizers.append(visualizer_info) 

456 except Exception as e: 

457 logger.warning(f"Streaming disabled for step '{step.name}': {e}") 

458 continue # Skip this streaming config 

459 

460 # Set visualize flag for orchestrator if any streaming is enabled 

461 current_plan["visualize"] = has_streaming 

462 context.required_visualizers = required_visualizers 

463 

464 # Add FunctionStep specific attributes 

465 if isinstance(step, FunctionStep): 465 ↛ exitline 465 didn't return from function 'initialize_step_plans_for_context' because the condition on line 465 was always true

466 

467 # 🎯 SEMANTIC COHERENCE FIX: Prevent group_by/variable_components conflict 

468 # When variable_components contains the same value as group_by, 

469 # set group_by to None to avoid EZStitcher heritage rule violation 

470 if (step.variable_components and step.group_by and 470 ↛ 472line 470 didn't jump to line 472 because the condition on line 470 was never true

471 step.group_by in step.variable_components): 

472 logger.debug(f"Step {step.name}: Detected group_by='{step.group_by}' in variable_components={step.variable_components}. " 

473 f"Setting group_by=None to maintain semantic coherence.") 

474 current_plan["group_by"] = None 

475 

476 # func attribute is guaranteed in FunctionStep.__init__ 

477 current_plan["func_name"] = getattr(step.func, '__name__', str(step.func)) 

478 

479 # Memory type hints from step instance (set in FunctionStep.__init__ if provided) 

480 # These are initial hints; FuncStepContractValidator will set final types. 

481 if hasattr(step, 'input_memory_type_hint'): # From FunctionStep.__init__ 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 current_plan['input_memory_type_hint'] = step.input_memory_type_hint 

483 if hasattr(step, 'output_memory_type_hint'): # From FunctionStep.__init__ 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 current_plan['output_memory_type_hint'] = step.output_memory_type_hint 

485 

486 # The resolve_special_input_paths_for_context static method is DELETED (lines 181-238 of original) 

487 # as this functionality is now handled by PipelinePathPlanner.prepare_pipeline_paths. 

488 

489 # _prepare_materialization_flags is removed as MaterializationFlagPlanner.prepare_pipeline_flags 

490 # now modifies context.step_plans in-place and takes context directly. 

491 

492 @staticmethod 

493 def declare_zarr_stores_for_context( 

494 context: ProcessingContext, 

495 steps_definition: List[AbstractStep], 

496 orchestrator 

497 ) -> None: 

498 """ 

499 Declare zarr store creation functions for runtime execution. 

500 

501 This method runs after path planning but before materialization flag planning 

502 to declare which steps need zarr stores and provide the metadata needed 

503 for runtime store creation. 

504 

505 Args: 

506 context: ProcessingContext for current well 

507 steps_definition: List of AbstractStep objects 

508 orchestrator: Orchestrator instance for accessing all wells 

509 """ 

510 from openhcs.constants import MULTIPROCESSING_AXIS 

511 

512 all_wells = orchestrator.get_component_keys(MULTIPROCESSING_AXIS) 

513 

514 # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context) 

515 vfs_config = orchestrator.pipeline_config.vfs_config 

516 

517 for step_index, step in enumerate(steps_definition): 

518 step_plan = context.step_plans[step_index] 

519 

520 will_use_zarr = ( 

521 vfs_config.materialization_backend == MaterializationBackend.ZARR and 

522 step_index == len(steps_definition) - 1 

523 ) 

524 

525 if will_use_zarr: 

526 step_plan["zarr_config"] = { 

527 "all_wells": all_wells, 

528 "needs_initialization": True 

529 } 

530 logger.debug(f"Step '{step.name}' will use zarr backend for axis {context.axis_id}") 

531 else: 

532 step_plan["zarr_config"] = None 

533 

534 @staticmethod 

535 def plan_materialization_flags_for_context( 

536 context: ProcessingContext, 

537 steps_definition: List[AbstractStep], 

538 orchestrator 

539 ) -> None: 

540 """ 

541 Plans and injects materialization flags into context.step_plans 

542 by calling MaterializationFlagPlanner. 

543 """ 

544 if context.is_frozen(): 544 ↛ 545line 544 didn't jump to line 545 because the condition on line 544 was never true

545 raise AttributeError("Cannot plan materialization flags in a frozen ProcessingContext.") 

546 if not context.step_plans: 546 ↛ 547line 546 didn't jump to line 547 because the condition on line 546 was never true

547 logger.warning("step_plans is empty in context for materialization planning. This may be valid if pipeline is empty.") 

548 return 

549 

550 # MaterializationFlagPlanner.prepare_pipeline_flags now takes context and pipeline_definition 

551 # and modifies context.step_plans in-place. 

552 MaterializationFlagPlanner.prepare_pipeline_flags( 

553 context, 

554 steps_definition, 

555 orchestrator.plate_path, 

556 orchestrator.pipeline_config 

557 ) 

558 

559 # Post-check (optional, but good for ensuring contracts are met by the planner) 

560 for step_index, step in enumerate(steps_definition): 

561 if step_index not in context.step_plans: 561 ↛ 563line 561 didn't jump to line 563 because the condition on line 561 was never true

562 # This should not happen if prepare_pipeline_flags guarantees plans for all steps 

563 logger.error(f"Step {step.name} (index: {step_index}) missing from step_plans after materialization planning.") 

564 continue 

565 

566 plan = context.step_plans[step_index] 

567 # Check for keys that FunctionStep actually uses during execution 

568 required_keys = [READ_BACKEND, WRITE_BACKEND] 

569 if not all(k in plan for k in required_keys): 569 ↛ 570line 569 didn't jump to line 570 because the condition on line 569 was never true

570 missing_keys = [k for k in required_keys if k not in plan] 

571 logger.error( 

572 f"Materialization flag planning incomplete for step {step.name} (index: {step_index}). " 

573 f"Missing required keys: {missing_keys} (Clause 273)." 

574 ) 

575 

576 

577 @staticmethod 

578 def validate_memory_contracts_for_context( 

579 context: ProcessingContext, 

580 steps_definition: List[AbstractStep], 

581 orchestrator=None 

582 ) -> None: 

583 """ 

584 Validates FunctionStep memory contracts, dict patterns, and adds memory type info to context.step_plans. 

585 

586 Args: 

587 context: ProcessingContext to validate 

588 steps_definition: List of AbstractStep objects 

589 orchestrator: Optional orchestrator for dict pattern key validation 

590 """ 

591 if context.is_frozen(): 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true

592 raise AttributeError("Cannot validate memory contracts in a frozen ProcessingContext.") 

593 

594 # FuncStepContractValidator might need access to input/output_memory_type_hint from plan 

595 step_memory_types = FuncStepContractValidator.validate_pipeline( 

596 steps=steps_definition, 

597 pipeline_context=context, # Pass context so validator can access step plans for memory type overrides 

598 orchestrator=orchestrator # Pass orchestrator for dict pattern key validation 

599 ) 

600 

601 for step_index, memory_types in step_memory_types.items(): 

602 if "input_memory_type" not in memory_types or "output_memory_type" not in memory_types: 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true

603 step_name = context.step_plans[step_index]["step_name"] 

604 raise AssertionError( 

605 f"Memory type validation must set input/output_memory_type for FunctionStep {step_name} (index: {step_index}) (Clause 101)." 

606 ) 

607 if step_index in context.step_plans: 607 ↛ 610line 607 didn't jump to line 610 because the condition on line 607 was always true

608 context.step_plans[step_index].update(memory_types) 

609 else: 

610 logger.warning(f"Step index {step_index} found in memory_types but not in context.step_plans. Skipping.") 

611 

612 # Apply memory type override: Any step with disk output must use numpy for disk writing 

613 for step_index, step in enumerate(steps_definition): 

614 if isinstance(step, FunctionStep): 614 ↛ 613line 614 didn't jump to line 613 because the condition on line 614 was always true

615 if step_index in context.step_plans: 615 ↛ 613line 615 didn't jump to line 613 because the condition on line 615 was always true

616 step_plan = context.step_plans[step_index] 

617 is_last_step = (step_index == len(steps_definition) - 1) 

618 write_backend = step_plan['write_backend'] 

619 

620 if write_backend == 'disk': 

621 logger.debug(f"Step {step.name} has disk output, overriding output_memory_type to numpy") 

622 step_plan['output_memory_type'] = 'numpy' 

623 

624 

625 
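    # Illustrative shape of the validator's result merged into step_plans above
    # (step indices and memory type strings are examples only):
    #   {0: {"input_memory_type": "numpy", "output_memory_type": "pyclesperanto"},
    #    1: {"input_memory_type": "pyclesperanto", "output_memory_type": "numpy"}}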

626 @staticmethod 

627 def assign_gpu_resources_for_context( 

628 context: ProcessingContext 

629 ) -> None: 

630 """ 

631 Validates GPU memory types from context.step_plans and assigns GPU device IDs. 

632 (Unchanged from previous version) 

633 """ 

634 if context.is_frozen(): 634 ↛ 635line 634 didn't jump to line 635 because the condition on line 634 was never true

635 raise AttributeError("Cannot assign GPU resources in a frozen ProcessingContext.") 

636 

637 gpu_assignments = GPUMemoryTypeValidator.validate_step_plans(context.step_plans) 

638 

639 for step_index, step_plan_val in context.step_plans.items(): # Renamed step_plan to step_plan_val to avoid conflict 

640 is_gpu_step = False 

641 input_type = step_plan_val["input_memory_type"] 

642 if input_type in VALID_GPU_MEMORY_TYPES: 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true

643 is_gpu_step = True 

644 

645 output_type = step_plan_val["output_memory_type"] 

646 if output_type in VALID_GPU_MEMORY_TYPES: 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true

647 is_gpu_step = True 

648 

649 if is_gpu_step: 649 ↛ 652line 649 didn't jump to line 652 because the condition on line 649 was never true

650 # Ensure gpu_assignments has an entry for this step_index if it's a GPU step 

651 # And that entry contains a 'gpu_id' 

652 step_gpu_assignment = gpu_assignments[step_index] 

653 if "gpu_id" not in step_gpu_assignment: 

654 step_name = step_plan_val["step_name"] 

655 raise AssertionError( 

656 f"GPU validation must assign gpu_id for step {step_name} (index: {step_index}) " 

657 f"with GPU memory types (Clause 295)." 

658 ) 

659 

660 for step_index, gpu_assignment in gpu_assignments.items(): 660 ↛ 661line 660 didn't jump to line 661 because the loop on line 660 never started

661 if step_index in context.step_plans: 

662 context.step_plans[step_index].update(gpu_assignment) 

663 else: 

664 logger.warning(f"Step index {step_index} found in gpu_assignments but not in context.step_plans. Skipping.") 

665 

666 @staticmethod 

667 def apply_global_visualizer_override_for_context( 

668 context: ProcessingContext, 

669 global_enable_visualizer: bool 

670 ) -> None: 

671 """ 

672 Applies global visualizer override to all step_plans in the context. 

673 (Unchanged from previous version) 

674 """ 

675 if context.is_frozen(): 

676 raise AttributeError("Cannot apply visualizer override in a frozen ProcessingContext.") 

677 

678 if global_enable_visualizer: 

679 if not context.step_plans: return # Guard against empty step_plans 

680 for step_index, plan in context.step_plans.items(): 

681 plan["visualize"] = True 

682 logger.info(f"Global visualizer override: Step '{plan['step_name']}' marked for visualization.") 

683 

684 @staticmethod 

685 def resolve_lazy_dataclasses_for_context(context: ProcessingContext, orchestrator) -> None: 

686 """ 

687 Resolve all lazy dataclass instances in step plans to their base configurations. 

688 

689 This method should be called after all compilation phases but before context 

690 freezing to ensure step plans are safe for pickling in multiprocessing contexts. 

691 

692 NOTE: The caller MUST have already set up config_context(orchestrator.pipeline_config) 

693 before calling this method. We rely on that context for lazy resolution. 

694 

695 Args: 

696 context: ProcessingContext to process 

697 orchestrator: PipelineOrchestrator (unused - kept for API compatibility) 

698 """ 

699 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization 

700 

701 # Resolve the entire context recursively to catch all lazy dataclass instances 

702 # The caller has already set up config_context(), so lazy resolution happens automatically 

703 resolved_context_dict = resolve_lazy_configurations_for_serialization(vars(context)) 

704 

705 # Update context attributes with resolved values 

706 for attr_name, resolved_value in resolved_context_dict.items(): 

707 if not attr_name.startswith('_'): # Skip private attributes 

708 setattr(context, attr_name, resolved_value) 

709 

710 @staticmethod 

711 def validate_backend_compatibility(orchestrator) -> None: 

712 """ 

713 Validate and auto-correct the materialization backend for microscopes with a single compatible backend. 

714 

715 For microscopes with only one compatible backend (e.g., OMERO → OMERO_LOCAL), 

716 automatically corrects the backend if misconfigured. For microscopes with multiple 

717 compatible backends, the configured backend must be explicitly compatible. 

718 

719 Args: 

720 orchestrator: PipelineOrchestrator instance with initialized microscope_handler 

721 """ 

722 from openhcs.core.config import VFSConfig 

723 from dataclasses import replace 

724 

725 microscope_handler = orchestrator.microscope_handler 

726 required_backend = microscope_handler.get_required_backend() 

727 

728 if required_backend: 

729 # Microscope has single compatible backend - auto-correct if needed 

730 vfs_config = orchestrator.pipeline_config.vfs_config or VFSConfig() 

731 

732 if vfs_config.materialization_backend != required_backend: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true

733 logger.warning( 

734 f"{microscope_handler.microscope_type} requires {required_backend.value} backend. " 

735 f"Auto-correcting from {vfs_config.materialization_backend.value}." 

736 ) 

737 new_vfs_config = replace(vfs_config, materialization_backend=required_backend) 

738 orchestrator.pipeline_config = replace( 

739 orchestrator.pipeline_config, 

740 vfs_config=new_vfs_config 

741 ) 

742 

743 @staticmethod 

744 def ensure_analysis_materialization(pipeline_definition: List[AbstractStep]) -> None: 

745 """ 

746 Ensure intermediate steps with analysis outputs have step_materialization_config. 

747 

748 Analysis results (special outputs) must be saved alongside the images they were 

749 created from to maintain metadata coherence. For intermediate steps (not final), 

750 this requires materializing the images so analysis has matching image metadata. 

751 

752 Final steps don't need auto-creation because their images and analysis both 

753 go to the main output directory (no metadata mismatch). 

754 

755 Called once before per-well compilation loop. 

756 

757 Args: 

758 pipeline_definition: List of pipeline steps to check 

759 """ 

760 from openhcs.core.config import StepMaterializationConfig 

761 

762 for step_index, step in enumerate(pipeline_definition): 

763 # Only process FunctionSteps 

764 if not isinstance(step, FunctionStep): 764 ↛ 765line 764 didn't jump to line 765 because the condition on line 764 was never true

765 continue 

766 

767 # Check if step has special outputs (analysis results) 

768 has_special_outputs = hasattr(step.func, '__special_outputs__') and step.func.__special_outputs__ 

769 

770 # Only auto-create for intermediate steps (not final step) 

771 is_intermediate_step = step_index < len(pipeline_definition) - 1 

772 

773 # Normalize: no config = disabled config (eliminates dual code path) 

774 if not step.step_materialization_config: 

775 from openhcs.config_framework.lazy_factory import LazyStepMaterializationConfig 

776 step.step_materialization_config = LazyStepMaterializationConfig(enabled=False) 

777 

778 # Single code path: just check enabled 

779 if has_special_outputs and not step.step_materialization_config.enabled and is_intermediate_step: 

780 # Auto-enable materialization to preserve metadata coherence 

781 from openhcs.config_framework.lazy_factory import LazyStepMaterializationConfig 

782 step.step_materialization_config = LazyStepMaterializationConfig() 

783 

784 logger.warning( 

785 f"⚠️ Step '{step.name}' (index {step_index}) has analysis outputs but lacks " 

786 f"enabled materialization config. Auto-creating with defaults to preserve " 

787 f"metadata coherence (intermediate step analysis must be saved with matching images)." 

788 ) 

789 logger.info( 

790 f" → Images and analysis will be saved to: " 

791 f"{{plate_root}}/{step.step_materialization_config.sub_dir}/" 

792 ) 

793 

794 @staticmethod 

795 def compile_pipelines( 

796 orchestrator, 

797 pipeline_definition: List[AbstractStep], 

798 axis_filter: Optional[List[str]] = None, 

799 enable_visualizer_override: bool = False 

800 ) -> Dict[str, Any]: 

801 """ 

802 Compile-all phase: Prepares frozen ProcessingContexts for each axis value. 

803 

804 This method iterates through the specified axis values, creates a ProcessingContext 

805 for each, and invokes the various phases of the PipelineCompiler to populate 

806 the context's step_plans. After all compilation phases for an axis value are complete, 

807 its context is frozen. Finally, attributes are stripped from the pipeline_definition, 

808 making the step objects stateless for the execution phase. 

809 

810 Args: 

811 orchestrator: The PipelineOrchestrator instance to use for compilation 

812 pipeline_definition: The list of AbstractStep objects defining the pipeline. 

813 axis_filter: Optional list of axis values to process. If None, processes all found axis values. 

814 enable_visualizer_override: If True, all steps in all compiled contexts 

815 will have their 'visualize' flag set to True. 

816 

817 Returns: 

818 A dictionary with two keys: 'compiled_contexts', mapping axis values to their

819 compiled and frozen ProcessingContexts, and 'pipeline_definition', the input

820 list of step objects, which is modified in-place to become stateless. 

821 """ 

822 from openhcs.constants.constants import OrchestratorState 

823 from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper 

824 

825 if not orchestrator.is_initialized(): 825 ↛ 826line 825 didn't jump to line 826 because the condition on line 825 was never true

826 raise RuntimeError("PipelineOrchestrator must be explicitly initialized before calling compile_pipelines().") 

827 

828 if not pipeline_definition: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true

829 raise ValueError("A valid pipeline definition (List[AbstractStep]) must be provided.") 

830 

831 # === BACKWARDS COMPATIBILITY PREPROCESSING === 

832 # Normalize step attributes BEFORE filtering to ensure old pickled steps have 'enabled' attribute 

833 logger.debug("🔧 BACKWARDS COMPATIBILITY: Normalizing step attributes before filtering...") 

834 _normalize_step_attributes(pipeline_definition) 

835 

836 # Filter out disabled steps at compile time (before any compilation phases) 

837 original_count = len(pipeline_definition) 

838 enabled_steps = [] 

839 for step in pipeline_definition: 

840 if step.enabled: 840 ↛ 843line 840 didn't jump to line 843 because the condition on line 840 was always true

841 enabled_steps.append(step) 

842 else: 

843 logger.info(f"🔧 COMPILE-TIME FILTER: Removing disabled step '{step.name}' from pipeline") 

844 

845 # Update pipeline_definition in-place to contain only enabled steps 

846 pipeline_definition.clear() 

847 pipeline_definition.extend(enabled_steps) 

848 

849 if original_count != len(pipeline_definition): 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true

850 logger.info(f"🔧 COMPILE-TIME FILTER: Filtered {original_count - len(pipeline_definition)} disabled step(s), {len(pipeline_definition)} step(s) remaining") 

851 

852 if not pipeline_definition: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true

853 logger.warning("All steps were disabled. Pipeline is empty after filtering.") 

854 return { 

855 'pipeline_definition': pipeline_definition, 

856 'compiled_contexts': {} 

857 } 

858 

859 try: 

860 compiled_contexts: Dict[str, ProcessingContext] = {} 

861 # Get multiprocessing axis values dynamically from configuration 

862 from openhcs.constants import MULTIPROCESSING_AXIS 

863 

864 # CRITICAL: Resolve well_filter_config from pipeline_config if present 

865 # This allows global-level well filtering to work (e.g., well_filter_config.well_filter = 1) 

866 resolved_axis_filter = axis_filter 

867 if orchestrator.pipeline_config and hasattr(orchestrator.pipeline_config, 'well_filter_config'): 867 ↛ 885line 867 didn't jump to line 885 because the condition on line 867 was always true

868 well_filter_config = orchestrator.pipeline_config.well_filter_config 

869 if well_filter_config and hasattr(well_filter_config, 'well_filter') and well_filter_config.well_filter is not None: 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true

870 from openhcs.core.utils import WellFilterProcessor 

871 available_wells = orchestrator.get_component_keys(MULTIPROCESSING_AXIS) 

872 resolved_wells = list(WellFilterProcessor.resolve_compilation_filter( 

873 well_filter_config.well_filter, 

874 available_wells 

875 )) 

876 logger.info(f"Resolved well_filter_config.well_filter={well_filter_config.well_filter} to {len(resolved_wells)} wells: {resolved_wells}") 

877 

878 # If axis_filter was also provided, intersect them 

879 if axis_filter: 

880 resolved_axis_filter = [w for w in resolved_wells if w in axis_filter] 

881 logger.info(f"Intersected with axis_filter: {len(resolved_axis_filter)} wells remain") 

882 else: 

883 resolved_axis_filter = resolved_wells 

884 

885 axis_values_to_process = orchestrator.get_component_keys(MULTIPROCESSING_AXIS, resolved_axis_filter) 

886 

887 if not axis_values_to_process: 887 ↛ 888line 887 didn't jump to line 888 because the condition on line 887 was never true

888 logger.warning("No axis values found to process based on filter.") 

889 return { 

890 'pipeline_definition': pipeline_definition, 

891 'compiled_contexts': {} 

892 } 

893 

894 logger.info(f"Starting compilation for axis values: {', '.join(axis_values_to_process)}") 

895 

896 # === ANALYSIS MATERIALIZATION AUTO-INSTANTIATION === 

897 # Ensure intermediate steps with analysis outputs have step_materialization_config 

898 # This preserves metadata coherence (ROIs must match image structure they were created from) 

899 # CRITICAL: Must be inside config_context() for lazy resolution of .enabled field 

900 from openhcs.config_framework.context_manager import config_context 

901 with config_context(orchestrator.pipeline_config): 

902 PipelineCompiler.ensure_analysis_materialization(pipeline_definition) 

903 

904 # === BACKEND COMPATIBILITY VALIDATION === 

905 # Validate that configured backend is compatible with microscope 

906 # For microscopes with only one compatible backend (e.g., OMERO), auto-set it 

907 logger.debug("🔧 BACKEND VALIDATION: Validating backend compatibility with microscope...") 

908 PipelineCompiler.validate_backend_compatibility(orchestrator) 

909 

910 # === GLOBAL AXIS FILTER RESOLUTION === 

911 # Resolve axis filters once for all axis values to ensure step-level inheritance works 

912 logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs for axis filter resolution...") 

913 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization 

914 from openhcs.config_framework.context_manager import config_context 

915 

916 # Resolve each step with nested context (same as initialize_step_plans_for_context) 

917 # This ensures step-level configs inherit from pipeline-level configs 

918 resolved_steps_for_filters = [] 

919 with config_context(orchestrator.pipeline_config): 

920 for step in pipeline_definition: 

921 with config_context(step): # Step-level context on top of pipeline context 

922 resolved_step = resolve_lazy_configurations_for_serialization(step) 

923 resolved_steps_for_filters.append(resolved_step) 

924 

925 logger.debug("🎯 AXIS FILTER RESOLUTION: Resolving step axis filters...") 

926 # Create a temporary context to store the global axis filters 

927 temp_context = orchestrator.create_context("temp") 

928 

929 # Use orchestrator context during axis filter resolution 

930 # This ensures that lazy config resolution uses the orchestrator context 

931 from openhcs.config_framework.context_manager import config_context 

932 with config_context(orchestrator.pipeline_config): 

933 _resolve_step_axis_filters(resolved_steps_for_filters, temp_context, orchestrator) 

934 global_step_axis_filters = getattr(temp_context, 'step_axis_filters', {}) 

935 

936 # Determine responsible axis value for metadata creation (lexicographically first) 

937 responsible_axis_value = sorted(axis_values_to_process)[0] if axis_values_to_process else None 

938 logger.debug(f"Designated responsible axis value for metadata creation: {responsible_axis_value}") 

939 

940 for axis_id in axis_values_to_process: 

941 logger.debug(f"Compiling for axis value: {axis_id}") 

942 context = orchestrator.create_context(axis_id) 

943 

944 # Copy global axis filters to this context 

945 context.step_axis_filters = global_step_axis_filters 

946 

947 # Determine if this axis value is responsible for metadata creation 

948 is_responsible = (axis_id == responsible_axis_value) 

949 logger.debug(f"Axis {axis_id} metadata responsibility: {is_responsible}") 

950 

951 # CRITICAL: Wrap all compilation steps in config_context() for lazy resolution 

952 from openhcs.config_framework.context_manager import config_context 

953 with config_context(orchestrator.pipeline_config): 

954 PipelineCompiler.initialize_step_plans_for_context(context, pipeline_definition, orchestrator, metadata_writer=is_responsible, plate_path=orchestrator.plate_path) 

955 PipelineCompiler.declare_zarr_stores_for_context(context, pipeline_definition, orchestrator) 

956 PipelineCompiler.plan_materialization_flags_for_context(context, pipeline_definition, orchestrator) 

957 PipelineCompiler.validate_memory_contracts_for_context(context, pipeline_definition, orchestrator) 

958 PipelineCompiler.assign_gpu_resources_for_context(context) 

959 

960 if enable_visualizer_override: 960 ↛ 961line 960 didn't jump to line 961 because the condition on line 960 was never true

961 PipelineCompiler.apply_global_visualizer_override_for_context(context, True) 

962 

963 # Resolve all lazy dataclasses before freezing to ensure multiprocessing compatibility 

964 PipelineCompiler.resolve_lazy_dataclasses_for_context(context, orchestrator) 

965 

966 

967 

968 

969 

970 context.freeze() 

971 compiled_contexts[axis_id] = context 

972 logger.debug(f"Compilation finished for axis value: {axis_id}") 

973 

974 # Log path planning summary once per plate 

975 if compiled_contexts: 975 ↛ 992line 975 didn't jump to line 992 because the condition on line 975 was always true

976 first_context = next(iter(compiled_contexts.values())) 

977 logger.info("📁 PATH PLANNING SUMMARY:") 

978 logger.info(f" Main pipeline output: {first_context.output_plate_root}") 

979 

980 # Check for materialization steps in first context 

981 materialization_steps = [] 

982 for step_id, plan in first_context.step_plans.items(): 

983 if 'materialized_output_dir' in plan: 

984 step_name = plan.get('step_name', f'step_{step_id}') 

985 mat_path = plan['materialized_output_dir'] 

986 materialization_steps.append((step_name, mat_path)) 

987 

988 for step_name, mat_path in materialization_steps: 

989 logger.info(f" Materialization {step_name}: {mat_path}") 

990 

991 # After processing all wells, strip attributes and finalize 

992 logger.info("Stripping attributes from pipeline definition steps.") 

993 StepAttributeStripper.strip_step_attributes(pipeline_definition, {}) 

994 

995 orchestrator._state = OrchestratorState.COMPILED 

996 

997 # Log worker configuration for execution planning 

998 effective_config = orchestrator.get_effective_config() 

999 logger.info(f"⚙️ EXECUTION CONFIG: {effective_config.num_workers} workers configured for pipeline execution") 

1000 

1001 logger.info(f"🏁 COMPILATION COMPLETE: {len(compiled_contexts)} wells compiled successfully") 

1002 

1003 # Return expected structure with both pipeline_definition and compiled_contexts 

1004 return { 

1005 'pipeline_definition': pipeline_definition, 

1006 'compiled_contexts': compiled_contexts 

1007 } 

1008 except Exception as e: 

1009 orchestrator._state = OrchestratorState.COMPILE_FAILED 

1010 logger.error(f"Failed to compile pipelines: {e}") 

1011 raise 

1012 
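A minimal driver sketch, assuming an already-initialized PipelineOrchestrator bound to `orchestrator` and a pipeline list bound to `steps`:

result = PipelineCompiler.compile_pipelines(orchestrator, steps)
frozen_contexts = result['compiled_contexts']      # axis value -> frozen ProcessingContext
stateless_steps = result['pipeline_definition']    # same list, attributes stripped in-place
for axis_id, ctx in frozen_contexts.items():
    print(axis_id, list(ctx.step_plans))           # step plans are keyed by step index
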

1013 

1014 

1015# The monolithic compile() method is removed. 

1016# Orchestrator will call the static methods above in sequence. 

1017# _strip_step_attributes is also removed as StepAttributeStripper is called by Orchestrator. 

1018 

1019 

1020def _resolve_step_axis_filters(resolved_steps: List[AbstractStep], context, orchestrator): 

1021 """ 

1022 Resolve axis filters for steps with any WellFilterConfig instances. 

1023 

1024 This function handles step-level axis filtering by resolving patterns like 

1025 "row:A", ["A01", "B02"], or max counts against the available axis values for the plate. 

1026 It processes ALL WellFilterConfig instances (materialization, streaming, etc.) uniformly. 

1027 

1028 Args: 

1029 resolved_steps: List of pipeline steps with lazy configs already resolved 

1030 context: Processing context for the current axis value 

1031 orchestrator: Orchestrator instance with access to available axis values 

1032 """ 

1033 from openhcs.core.utils import WellFilterProcessor 

1034 from openhcs.core.config import WellFilterConfig 

1035 

1036 # Get available axis values from orchestrator using multiprocessing axis 

1037 from openhcs.constants import MULTIPROCESSING_AXIS 

1038 available_axis_values = orchestrator.get_component_keys(MULTIPROCESSING_AXIS) 

1039 if not available_axis_values: 1039 ↛ 1040line 1039 didn't jump to line 1040 because the condition on line 1039 was never true

1040 logger.warning("No available axis values found for axis filter resolution") 

1041 return 

1042 

1043 # Initialize step_axis_filters in context if not present 

1044 if not hasattr(context, 'step_axis_filters'): 1044 ↛ 1048line 1044 didn't jump to line 1048 because the condition on line 1044 was always true

1045 context.step_axis_filters = {} 

1046 

1047 # Process each step for ALL WellFilterConfig instances using the already resolved steps 

1048 for step_index, resolved_step in enumerate(resolved_steps): 

1049 step_filters = {} 

1050 

1051 # Check all attributes for WellFilterConfig instances on the RESOLVED step 

1052 for attr_name in dir(resolved_step): 

1053 if not attr_name.startswith('_'): 

1054 config = getattr(resolved_step, attr_name, None) 

1055 if config is not None and isinstance(config, WellFilterConfig) and config.well_filter is not None: 

1056 try: 

1057 # Resolve the axis filter pattern to concrete axis values 

1058 resolved_axis_values = WellFilterProcessor.resolve_compilation_filter( 

1059 config.well_filter, 

1060 available_axis_values 

1061 ) 

1062 

1063 # Store resolved axis values for this config 

1064 step_filters[attr_name] = { 

1065 'resolved_axis_values': sorted(resolved_axis_values), 

1066 'filter_mode': config.well_filter_mode, 

1067 'original_filter': config.well_filter 

1068 } 

1069 

1070 logger.debug(f"Step '{resolved_step.name}' {attr_name} filter '{config.well_filter}' " 

1071 f"resolved to {len(resolved_axis_values)} axis values: {sorted(resolved_axis_values)}") 


1074 

1075 except Exception as e: 

1076 logger.error(f"Failed to resolve axis filter for step '{resolved_step.name}' {attr_name}: {e}") 

1077 raise ValueError(f"Invalid axis filter '{config.well_filter}' " 

1078 f"for step '{resolved_step.name}' {attr_name}: {e}") 

1079 

1080 # Store step filters if any were found 

1081 if step_filters: 1081 ↛ 1048line 1081 didn't jump to line 1048 because the condition on line 1081 was always true

1082 context.step_axis_filters[step_index] = step_filters 

1083 

1084 total_filters = sum(len(filters) for filters in context.step_axis_filters.values()) 

1085 logger.debug(f"Axis filter resolution complete. {len(context.step_axis_filters)} steps have axis filters, {total_filters} total filters.") 

1086 

1087 

1088def _should_process_for_well(axis_id, well_filter_config): 

1089 """Unified well filtering logic for all WellFilterConfig systems.""" 

1090 if well_filter_config.well_filter is None: 

1091 return True 

1092 

1093 well_in_filter = axis_id in well_filter_config.well_filter 

1094 return well_in_filter if well_filter_config.well_filter_mode == WellFilterMode.INCLUDE else not well_in_filter
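
A quick sketch of the filter semantics, using SimpleNamespace as a stand-in for a real WellFilterConfig:

from types import SimpleNamespace

cfg = SimpleNamespace(well_filter=['A01', 'B02'], well_filter_mode=WellFilterMode.INCLUDE)
_should_process_for_well('A01', cfg)   # True: listed and mode is INCLUDE
_should_process_for_well('C03', cfg)   # False: not listed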