Coverage for openhcs/core/pipeline/path_planner.py: 80.5%
237 statements
coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2Pipeline path planning - actually reduced duplication.
4This version ACTUALLY eliminates duplication instead of adding abstraction theater.
5"""

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple

from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend
from openhcs.constants.input_source import InputSource
from openhcs.core.config import MaterializationBackend
from openhcs.core.context.processing_context import ProcessingContext
from openhcs.core.pipeline.pipeline_utils import get_core_callable
from openhcs.core.steps.abstract import AbstractStep
from openhcs.core.steps.function_step import FunctionStep

logger = logging.getLogger(__name__)

# ===== PATTERN NORMALIZATION (ONE place) =====

def normalize_pattern(pattern: Any) -> Iterator[Tuple[Callable, str, int]]:
    """THE single pattern normalizer - 15 lines, no duplication."""
    if isinstance(pattern, dict):
        for key, value in pattern.items():
            for pos, func in enumerate(value if isinstance(value, list) else [value]):
                if callable_func := get_core_callable(func):
                    yield (callable_func, key, pos)
    elif isinstance(pattern, list):
        for pos, func in enumerate(pattern):
            if callable_func := get_core_callable(func):
                yield (callable_func, "default", pos)
    elif callable_func := get_core_callable(pattern):
        yield (callable_func, "default", 0)
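
# Illustrative sketch of the three accepted pattern shapes (fn_a and fn_b are
# hypothetical callables that get_core_callable would return unchanged):
#
#   normalize_pattern(fn_a)                  -> (fn_a, "default", 0)
#   normalize_pattern([fn_a, fn_b])          -> (fn_a, "default", 0), (fn_b, "default", 1)
#   normalize_pattern({"ch1": [fn_a, fn_b]}) -> (fn_a, "ch1", 0), (fn_b, "ch1", 1)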

def extract_attributes(pattern: Any) -> Dict[str, Any]:
    """Extract all function attributes in one pass - 10 lines."""
    outputs, inputs, mat_funcs = set(), {}, {}
    for func, _, _ in normalize_pattern(pattern):
        outputs.update(getattr(func, '__special_outputs__', set()))
        inputs.update(getattr(func, '__special_inputs__', {}))
        mat_funcs.update(getattr(func, '__materialization_functions__', {}))
    return {'outputs': outputs, 'inputs': inputs, 'mat_funcs': mat_funcs}
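
# A minimal sketch of the contract this relies on, assuming a function tagged
# with the dunder attributes read above (the decoration shown is hypothetical):
#
#   def segment(img): ...
#   segment.__special_outputs__ = {"masks"}
#   segment.__special_inputs__ = {"grid_dimensions": True}
#
#   extract_attributes(segment)
#   # -> {'outputs': {'masks'}, 'inputs': {'grid_dimensions': True}, 'mat_funcs': {}}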

# ===== PATH PLANNING (NO duplication) =====

class PathPlanner:
    """Minimal path planner with zero duplication."""

    def __init__(self, context: ProcessingContext, pipeline_config):
        self.ctx = context
        # Access config directly from pipeline_config (lazy resolution happens via config_context)
        self.cfg = pipeline_config.path_planning_config
        self.vfs = pipeline_config.vfs_config
        self.plans = context.step_plans
        self.declared = {}  # Tracks special outputs

        # Initial input determination (once)
        self.initial_input = Path(context.input_dir)
        self.plate_path = Path(context.plate_path)

    def plan(self, pipeline: List[AbstractStep]) -> Dict:
        """Plan all paths with zero duplication."""
        for i, step in enumerate(pipeline):
            self._plan_step(step, i, pipeline)

        self._validate(pipeline)

        # Set output_plate_root and sub_dir for metadata writing
        if pipeline:
            self.ctx.output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)
            self.ctx.sub_dir = self.cfg.sub_dir

        return self.plans

    def _plan_step(self, step: AbstractStep, i: int, pipeline: List):
        """Plan one step - no duplicate logic."""
        sid = i  # Use step index instead of step_id

        # Get paths with unified logic
        input_dir = self._get_dir(step, i, pipeline, 'input')
        output_dir = self._get_dir(step, i, pipeline, 'output', input_dir)

        # Extract function data if FunctionStep
        attrs = extract_attributes(step.func) if isinstance(step, FunctionStep) else {
            'outputs': self._normalize_attr(getattr(step, 'special_outputs', set()), set),
            'inputs': self._normalize_attr(getattr(step, 'special_inputs', {}), dict),
            'mat_funcs': {}
        }

        # Process special I/O with unified logic
        special_outputs = self._process_special(attrs['outputs'], attrs['mat_funcs'], 'output', sid)
        special_inputs = self._process_special(attrs['inputs'], attrs['outputs'], 'input', sid)

        # Handle metadata injection
        if isinstance(step, FunctionStep) and any(k in METADATA_RESOLVERS for k in attrs['inputs']):
            step.func = self._inject_metadata(step.func, attrs['inputs'])

        # Generate funcplan (only if needed)
        funcplan = {}
        if isinstance(step, FunctionStep) and special_outputs:
            for func, dk, pos in normalize_pattern(step.func):
                saves = [k for k in special_outputs if k in getattr(func, '__special_outputs__', set())]
                if saves:
                    funcplan[f"{func.__name__}_{dk}_{pos}"] = saves

        # Handle optional materialization and input conversion.
        # Read step_materialization_config directly from the step object
        # (not step plans, which aren't populated yet).
        materialized_output_dir = None
        if step.step_materialization_config:
            # Check if this step has well filters and if the current well should be materialized
            step_axis_filters = getattr(self.ctx, 'step_axis_filters', {}).get(sid, {})
            materialization_filter = step_axis_filters.get('step_materialization_config')

            if materialization_filter:
                # Inline simple conditional logic for axis filtering
                from openhcs.core.config import WellFilterMode
                axis_in_filter = self.ctx.axis_id in materialization_filter['resolved_axis_values']
                should_materialize = (
                    axis_in_filter if materialization_filter['filter_mode'] == WellFilterMode.INCLUDE
                    else not axis_in_filter
                )

                if should_materialize:
                    materialized_output_dir = self._build_output_path(step.step_materialization_config)
                else:
                    logger.debug(f"Skipping materialization for step {step.name}, axis {self.ctx.axis_id} (filtered out)")
            else:
                # No axis filter - create materialization path as normal
                materialized_output_dir = self._build_output_path(step.step_materialization_config)

        input_conversion_dir = self._get_optional_path("input_conversion_config", sid)

        # Calculate main pipeline plate root for this step
        main_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

        # Single update
        self.plans[sid].update({
            'input_dir': str(input_dir),
            'output_dir': str(output_dir),
            'output_plate_root': str(main_plate_root),
            'sub_dir': self.cfg.sub_dir,  # Store resolved sub_dir for main pipeline
            'pipeline_position': i,
            'input_source': self._get_input_source(step, i),
            'special_inputs': special_inputs,
            'special_outputs': special_outputs,
            'funcplan': funcplan,
        })

        # Add optional paths if configured
        if materialized_output_dir:
            # Per-step materialization uses its own config to determine plate root
            materialized_plate_root = self.build_output_plate_root(self.plate_path, step.step_materialization_config, is_per_step_materialization=False)
            self.plans[sid].update({
                'materialized_output_dir': str(materialized_output_dir),
                'materialized_plate_root': str(materialized_plate_root),
                'materialized_sub_dir': step.step_materialization_config.sub_dir,  # Store resolved sub_dir for materialization
                'materialized_backend': self.vfs.materialization_backend.value,
                'materialization_config': step.step_materialization_config  # Store config for well filtering (resolved by the compiler)
            })
        if input_conversion_dir:
            self.plans[sid].update({
                'input_conversion_dir': str(input_conversion_dir),
                'input_conversion_backend': self.vfs.materialization_backend.value
            })

        # Set backend if needed
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            self.plans[sid][READ_BACKEND] = self.vfs.materialization_backend.value

        # If zarr conversion occurred, redirect input_dir to the zarr store
        if self.vfs.materialization_backend == MaterializationBackend.ZARR and pipeline:
            first_step_plan = self.plans.get(0, {})  # Use step index 0 instead of step_id
            if "input_conversion_dir" in first_step_plan:
                self.plans[sid]['input_dir'] = first_step_plan['input_conversion_dir']
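
    # Shape of a fully planned step entry, with illustrative values (the paths
    # and sub_dir names here are hypothetical, not defaults):
    #
    #   self.plans[0] == {
    #       'input_dir': '/data/plates/plate001',
    #       'output_dir': '/data/results/plate001_out/images',
    #       'output_plate_root': '/data/results/plate001_out',
    #       'sub_dir': 'images',
    #       'pipeline_position': 0,
    #       'input_source': 'PREVIOUS_STEP',
    #       'special_inputs': {}, 'special_outputs': {}, 'funcplan': {},
    #       # plus materialized_*/input_conversion_* keys when configured
    #   }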

    def _get_dir(self, step: AbstractStep, i: int, pipeline: List,
                 dir_type: str, fallback: Path = None) -> Path:
        """Unified directory resolution - no duplication."""
        sid = i  # Use step index instead of step_id

        # Check overrides (same for input/output)
        if override := self.plans.get(sid, {}).get(f'{dir_type}_dir'):
            return Path(override)
        if override := getattr(step, f'__{dir_type}_dir__', None):
            return Path(override)

        # Type-specific logic
        if dir_type == 'input':
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self.initial_input
            prev_step_index = i - 1  # Use previous step index instead of step_id
            return Path(self.plans[prev_step_index]['output_dir'])
        else:  # output
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self._build_output_path()
            return fallback  # Work in place

    @staticmethod
    def build_output_plate_root(plate_path: Path, path_config, is_per_step_materialization: bool = False) -> Path:
        """Build output plate root directory directly from configuration components.

        Formula:
        - If output_dir_suffix is empty and NOT per-step materialization: use main pipeline output directory
        - If output_dir_suffix is empty and IS per-step materialization: use plate_path directly
        - Otherwise: (global_output_folder OR plate_path.parent) + plate_name + output_dir_suffix

        Args:
            plate_path: Path to the original plate directory
            path_config: PathPlanningConfig with global_output_folder and output_dir_suffix
            is_per_step_materialization: True if this is per-step materialization (no auto suffix)

        Returns:
            Path to plate root directory (e.g., "/data/results/plate001_processed")
        """
        base = Path(path_config.global_output_folder) if path_config.global_output_folder else plate_path.parent

        # Handle empty suffix differently for per-step vs pipeline-level materialization
        if not path_config.output_dir_suffix:
            if is_per_step_materialization:
                # Per-step materialization: use exact path without automatic suffix
                return base / plate_path.name
            else:
                # Pipeline-level materialization: trust lazy inheritance system
                return base / plate_path.name

        return base / f"{plate_path.name}{path_config.output_dir_suffix}"
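
    # Worked example of the formula above (hypothetical values): with
    # plate_path = Path("/data/plates/plate001"),
    # global_output_folder = "/data/results", and output_dir_suffix = "_processed",
    # the plate root resolves to:
    #
    #   Path("/data/results") / "plate001_processed"  # -> /data/results/plate001_processed
    #
    # With an empty suffix and no global_output_folder, it falls back to
    # plate_path.parent / plate_path.name, i.e. the original plate directory.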

    def _build_output_path(self, path_config=None) -> Path:
        """Build complete output path: plate_root + sub_dir."""
        config = path_config or self.cfg

        # Use the config's own output_dir_suffix to determine plate root
        plate_root = self.build_output_plate_root(self.plate_path, config, is_per_step_materialization=False)
        return plate_root / config.sub_dir

    def _calculate_materialized_output_path(self, materialization_config) -> Path:
        """Calculate materialized output path using custom PathPlanningConfig."""
        return self._build_output_path(materialization_config)

    def _calculate_input_conversion_path(self, conversion_config) -> Path:
        """Calculate input conversion path using custom PathPlanningConfig."""
        return self._build_output_path(conversion_config)

    def _get_optional_path(self, config_key: str, step_index: int) -> Optional[Path]:
        """Get optional path if config exists."""
        if config_key in self.plans[step_index]:
            config = self.plans[step_index][config_key]
            return self._build_output_path(config)
        return None

    def _process_special(self, items: Any, extra: Any, io_type: str, sid: int) -> Dict:
        """Unified special I/O processing - no duplication."""
        result = {}

        if io_type == 'output' and items:  # Special outputs
            results_path = self._get_results_path()
            for key in sorted(items):
                filename = PipelinePathPlanner._build_axis_filename(self.ctx.axis_id, key)
                path = results_path / filename
                result[key] = {
                    'path': str(path),
                    'materialization_function': extra.get(key)  # extra is mat_funcs
                }
                self.declared[key] = str(path)

        elif io_type == 'input' and items:  # Special inputs
            for key in sorted(items.keys() if isinstance(items, dict) else items):
                if key in self.declared:
                    result[key] = {'path': self.declared[key], 'source_step_id': 'prev'}
                elif key in extra:  # extra is outputs (self-fulfilling)
                    result[key] = {'path': 'self', 'source_step_id': sid}
                elif key not in METADATA_RESOLVERS:
                    raise ValueError(f"Step {sid} needs '{key}' but it's not available")

        return result

    def _inject_metadata(self, pattern: Any, inputs: Dict) -> Any:
        """Inject metadata for special inputs."""
        for key in inputs:
            if key in METADATA_RESOLVERS and key not in self.declared:
                value = METADATA_RESOLVERS[key]["resolver"](self.ctx)
                pattern = self._inject_into_pattern(pattern, key, value)
        return pattern

    def _inject_into_pattern(self, pattern: Any, key: str, value: Any) -> Any:
        """Inject value into pattern - handles all cases in 6 lines."""
        if callable(pattern):
            return (pattern, {key: value})
        if isinstance(pattern, tuple) and len(pattern) == 2:
            return (pattern[0], {**pattern[1], key: value})
        if isinstance(pattern, list) and len(pattern) == 1:
            return [self._inject_into_pattern(pattern[0], key, value)]
        raise ValueError(f"Cannot inject into pattern type: {type(pattern)}")
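
    # Sketch of the three injection cases (fn is a hypothetical callable):
    #
    #   _inject_into_pattern(fn, "grid_dimensions", (8, 12))
    #   # -> (fn, {"grid_dimensions": (8, 12)})
    #
    #   _inject_into_pattern((fn, {"sigma": 2}), "grid_dimensions", (8, 12))
    #   # -> (fn, {"sigma": 2, "grid_dimensions": (8, 12)})
    #
    #   _inject_into_pattern([fn], "grid_dimensions", (8, 12))
    #   # -> [(fn, {"grid_dimensions": (8, 12)})]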

    def _normalize_attr(self, attr: Any, target_type: type) -> Any:
        """Normalize step attributes - 5 lines, no duplication."""
        if target_type == set:
            return {attr} if isinstance(attr, str) else set(attr) if isinstance(attr, (list, set)) else set()
        else:  # dict
            return {attr: True} if isinstance(attr, str) else {k: True for k in attr} if isinstance(attr, list) else attr if isinstance(attr, dict) else {}
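
    # Normalization outcomes at a glance (illustrative inputs):
    #
    #   _normalize_attr("masks", set)      -> {"masks"}
    #   _normalize_attr(["a", "b"], set)   -> {"a", "b"}
    #   _normalize_attr(None, set)         -> set()
    #   _normalize_attr("masks", dict)     -> {"masks": True}
    #   _normalize_attr(["a", "b"], dict)  -> {"a": True, "b": True}
    #   _normalize_attr(None, dict)        -> {}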

    def _get_input_source(self, step: AbstractStep, i: int) -> str:
        """Get input source string."""
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            return 'PIPELINE_START'
        return 'PREVIOUS_STEP'

    def _get_results_path(self) -> Path:
        """Get results path from global pipeline configuration."""
        try:
            # Access materialization_results_path from the global config, not the path planning config
            path = self.ctx.global_config.materialization_results_path
            return Path(path) if Path(path).is_absolute() else self.plate_path / path
        except AttributeError as e:
            # Raise a clear error if the global config is unavailable
            raise RuntimeError(f"Cannot access global config for materialization_results_path: {e}") from e

    def _validate(self, pipeline: List):
        """Validate connectivity and materialization paths - no duplication."""
        # Connectivity validation
        for i in range(1, len(pipeline)):
            curr, prev = pipeline[i], pipeline[i - 1]
            if getattr(curr, 'input_source', None) == InputSource.PIPELINE_START:
                continue
            curr_in = self.plans[i]['input_dir']  # Use step index i
            prev_out = self.plans[i - 1]['output_dir']  # Use step index i-1
            if curr_in != prev_out:
                has_special = any(inp.get('source_step_id') in [i - 1, 'prev']  # Check both step index and 'prev'
                                  for inp in self.plans[i].get('special_inputs', {}).values())  # Use step index i
                if not has_special:
                    raise ValueError(f"Disconnect: {prev.name} -> {curr.name}")

        # Materialization path collision validation
        self._validate_materialization_paths(pipeline)

    def _validate_materialization_paths(self, pipeline: List[AbstractStep]) -> None:
        """Validate and resolve materialization path collisions with symmetric conflict resolution."""
        global_path = self._build_output_path(self.cfg)

        # Collect all materialization steps with their paths and positions
        mat_steps = [
            (step, self.plans.get(i, {}).get('pipeline_position', 0), self._build_output_path(step.step_materialization_config))
            for i, step in enumerate(pipeline) if step.step_materialization_config
        ]

        # Group by path for conflict detection
        from collections import defaultdict
        path_groups = defaultdict(list)
        for step, pos, path in mat_steps:
            if path == global_path:
                self._resolve_and_update_paths(step, pos, path, "main flow")
            else:
                path_groups[str(path)].append((step, pos, path))

        # Resolve materialization vs materialization conflicts
        for path_key, step_list in path_groups.items():
            if len(step_list) > 1:
                logger.warning(f"Materialization path collision detected for {len(step_list)} steps at: {path_key}")
                for step, pos, path in step_list:
                    self._resolve_and_update_paths(step, pos, path, f"pos {pos}")

    def _resolve_and_update_paths(self, step: AbstractStep, position: int, original_path: Path, conflict_type: str) -> None:
        """Resolve a path conflict by updating the sub_dir configuration directly."""
        # Lazy configs are already resolved via config_context() in the compiler,
        # so there is no need to call to_base_config() here.
        materialization_config = step.step_materialization_config

        # Generate a unique sub_dir name instead of calculating it from paths
        original_sub_dir = materialization_config.sub_dir
        logger.debug(f"Path collision: step '{step.name}' original_sub_dir = '{original_sub_dir}' (type: {type(materialization_config).__name__})")
        new_sub_dir = f"{original_sub_dir}_step{position}"

        # Update the step materialization config with the new sub_dir
        from dataclasses import replace
        step.step_materialization_config = replace(materialization_config, sub_dir=new_sub_dir)

        # Recalculate the resolved path using the updated config
        resolved_path = self._build_output_path(step.step_materialization_config)

        # Update step plans for metadata generation
        if step_plan := self.plans.get(position):  # Use position (step index) instead of step_id
            if 'materialized_output_dir' in step_plan:
                step_plan['materialized_output_dir'] = str(resolved_path)
                step_plan['materialized_sub_dir'] = new_sub_dir  # Update stored sub_dir

        logger.debug(f"Resolved: step '{step.name}' ({conflict_type}) -> {resolved_path}")

# ===== PUBLIC API =====

class PipelinePathPlanner:
    """Public API matching the original interface."""

    @staticmethod
    def prepare_pipeline_paths(context: ProcessingContext,
                               pipeline_definition: List[AbstractStep],
                               pipeline_config) -> Dict:
        """Prepare pipeline paths."""
        return PathPlanner(context, pipeline_config).plan(pipeline_definition)

    @staticmethod
    def _build_axis_filename(axis_id: str, key: str, extension: str = "pkl") -> str:
        """Build standardized axis-based filename."""
        return f"{axis_id}_{key}.{extension}"

# ===== METADATA =====

METADATA_RESOLVERS = {
    "grid_dimensions": {
        "resolver": lambda context: context.microscope_handler.get_grid_dimensions(context.plate_path),
        "description": "Grid dimensions (num_rows, num_cols) for position generation functions"
    },
}

def resolve_metadata(key: str, context) -> Any:
    """Resolve metadata value."""
    if key not in METADATA_RESOLVERS:
        raise ValueError(f"No resolver for '{key}'")
    return METADATA_RESOLVERS[key]["resolver"](context)

def register_metadata_resolver(key: str, resolver: Callable, description: str):
    """Register metadata resolver."""
    METADATA_RESOLVERS[key] = {"resolver": resolver, "description": description}
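
# Registering an additional resolver is a one-liner. A hypothetical example
# (the "pixel_size_um" key and its accessor are illustrative, not shipped):
#
#   register_metadata_resolver(
#       "pixel_size_um",
#       lambda context: context.microscope_handler.get_pixel_size(context.plate_path),
#       "Physical pixel size in microns for scaling functions",
#   )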

# ===== SCOPE PROMOTION (separate concern) =====

def _apply_scope_promotion_rules(dict_pattern, special_outputs, declared_outputs, step_index, position):
    """Scope promotion for single-key dict patterns - 15 lines."""
    if len(dict_pattern) != 1:
        return special_outputs, declared_outputs

    key_prefix = f"{list(dict_pattern.keys())[0]}_0_"
    promoted_out, promoted_decl = special_outputs.copy(), declared_outputs.copy()

    for out_key in list(special_outputs.keys()):
        if out_key.startswith(key_prefix):
            promoted_key = out_key[len(key_prefix):]
            if promoted_key in promoted_decl:
                raise ValueError(f"Collision: {promoted_key} already exists")
            promoted_out[promoted_key] = special_outputs[out_key]
            promoted_decl[promoted_key] = {
                "step_index": step_index, "position": position,
                "path": special_outputs[out_key]["path"]
            }

    return promoted_out, promoted_decl
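
# Promotion in action, as a sketch with made-up keys: a single-key dict
# pattern like {"ch1": [...]} yields namespaced outputs such as "ch1_0_masks";
# promotion also exposes them under the bare key.
#
#   special = {"ch1_0_masks": {"path": "/results/A01_masks.pkl"}}
#   out, decl = _apply_scope_promotion_rules({"ch1": []}, special, {}, step_index=2, position=2)
#   # out now also maps "masks" -> {"path": "/results/A01_masks.pkl"}
#   # decl["masks"] records the step_index, position, and path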