Coverage for openhcs/core/pipeline/path_planner.py: 77.5%
234 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
"""
Pipeline path planning - actually reduced duplication.

This version ACTUALLY eliminates duplication instead of adding abstraction theater.
"""
7import logging
8from dataclasses import dataclass
9from pathlib import Path
10from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple
12from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend
13from openhcs.constants.input_source import InputSource
14from openhcs.core.config import MaterializationBackend
15from openhcs.core.context.processing_context import ProcessingContext
16from openhcs.core.pipeline.pipeline_utils import get_core_callable
17from openhcs.core.steps.abstract import AbstractStep
18from openhcs.core.steps.function_step import FunctionStep
20logger = logging.getLogger(__name__)
23# ===== PATTERN NORMALIZATION (ONE place) =====
def normalize_pattern(pattern: Any) -> Iterator[Tuple[Callable, str, int]]:
    """Yield (callable, dict_key, position) for every resolvable function in *pattern*.

    Handles the three pattern shapes uniformly: a dict of key -> func(s), a flat
    list of funcs (key "default"), or a single bare func (key "default", pos 0).
    Entries that ``get_core_callable`` cannot resolve are silently skipped.
    """
    # Normalize every shape into (key, [candidates]) groups, then walk once.
    if isinstance(pattern, dict):
        groups = ((key, val if isinstance(val, list) else [val])
                  for key, val in pattern.items())
    elif isinstance(pattern, list):
        groups = (("default", pattern),)
    else:
        groups = (("default", [pattern]),)

    for key, candidates in groups:
        for pos, candidate in enumerate(candidates):
            resolved = get_core_callable(candidate)
            if resolved:
                yield (resolved, key, pos)
def extract_attributes(pattern: Any) -> Dict[str, Any]:
    """Collect special outputs/inputs and materialization functions in one pass.

    Returns a dict with keys 'outputs' (set), 'inputs' (dict), 'mat_funcs' (dict),
    aggregated across every callable yielded by ``normalize_pattern``.
    """
    collected = {'outputs': set(), 'inputs': {}, 'mat_funcs': {}}
    for func, _key, _pos in normalize_pattern(pattern):
        collected['outputs'] |= getattr(func, '__special_outputs__', set())
        collected['inputs'].update(getattr(func, '__special_inputs__', {}))
        collected['mat_funcs'].update(getattr(func, '__materialization_functions__', {}))
    return collected
50# ===== PATH PLANNING (NO duplication) =====
class PathPlanner:
    """Minimal path planner with zero duplication.

    Resolves input/output directories, special I/O paths, per-step
    materialization targets and input-conversion paths for every step in a
    pipeline, writing the results into ``context.step_plans``.
    """

    def __init__(self, context: ProcessingContext):
        self.ctx = context
        self.cfg = context.get_path_planning_config()
        self.vfs = context.get_vfs_config()
        self.plans = context.step_plans
        self.declared = {}  # Tracks special outputs: key -> declared path string

        # Initial input determination (once)
        self.initial_input = Path(context.input_dir)
        self.plate_path = Path(context.plate_path)

    def plan(self, pipeline: List[AbstractStep]) -> Dict:
        """Plan all paths with zero duplication.

        Plans each step in order, validates connectivity/collisions, then
        records the plate root and sub_dir on the context for metadata writing.
        Returns the (mutated) ``step_plans`` mapping.
        """
        for i, step in enumerate(pipeline):
            self._plan_step(step, i, pipeline)

        self._validate(pipeline)

        # Set output_plate_root and sub_dir for metadata writing
        if pipeline:
            self.ctx.output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)
            self.ctx.sub_dir = self.cfg.sub_dir

        return self.plans

    def _plan_step(self, step: AbstractStep, i: int, pipeline: List):
        """Plan one step - no duplicate logic."""
        sid = step.step_id

        # Get paths with unified logic
        input_dir = self._get_dir(step, i, pipeline, 'input')
        output_dir = self._get_dir(step, i, pipeline, 'output', input_dir)

        # Extract function data if FunctionStep; otherwise normalize the
        # step-level attributes into the same {'outputs','inputs','mat_funcs'} shape.
        attrs = extract_attributes(step.func) if isinstance(step, FunctionStep) else {
            'outputs': self._normalize_attr(getattr(step, 'special_outputs', set()), set),
            'inputs': self._normalize_attr(getattr(step, 'special_inputs', {}), dict),
            'mat_funcs': {}
        }

        # Process special I/O with unified logic
        special_outputs = self._process_special(attrs['outputs'], attrs['mat_funcs'], 'output', sid)
        special_inputs = self._process_special(attrs['inputs'], attrs['outputs'], 'input', sid)

        # Handle metadata injection (resolvable special inputs get injected
        # into the function pattern as keyword values)
        if isinstance(step, FunctionStep) and any(k in METADATA_RESOLVERS for k in attrs['inputs']):
            step.func = self._inject_metadata(step.func, attrs['inputs'])

        # Generate funcplan (only if needed): maps "<func>_<dictkey>_<pos>"
        # to the special-output keys that particular callable saves.
        funcplan = {}
        if isinstance(step, FunctionStep) and special_outputs:
            for func, dk, pos in normalize_pattern(step.func):
                saves = [k for k in special_outputs if k in getattr(func, '__special_outputs__', set())]
                if saves:
                    funcplan[f"{func.__name__}_{dk}_{pos}"] = saves

        # Handle optional materialization and input conversion
        # Read materialization_config directly from step object (not step plans, which aren't populated yet)
        materialized_output_dir = None
        if step.materialization_config:
            # Check if this step has well filters and if current well should be materialized
            step_well_filter = getattr(self.ctx, 'step_well_filters', {}).get(sid)

            if step_well_filter:
                # Inline simple conditional logic for well filtering
                from openhcs.core.config import WellFilterMode
                well_in_filter = self.ctx.well_id in step_well_filter['resolved_wells']
                should_materialize = (
                    well_in_filter if step_well_filter['filter_mode'] == WellFilterMode.INCLUDE
                    else not well_in_filter
                )

                if should_materialize:
                    materialized_output_dir = self._build_output_path(step.materialization_config)
                else:
                    logger.debug(f"Skipping materialization for step {step.name}, well {self.ctx.well_id} (filtered out)")
            else:
                # No well filter - create materialization path as normal
                materialized_output_dir = self._build_output_path(step.materialization_config)

        input_conversion_dir = self._get_optional_path("input_conversion_config", sid)

        # Calculate main pipeline plate root for this step
        main_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

        # Single update
        self.plans[sid].update({
            'input_dir': str(input_dir),
            'output_dir': str(output_dir),
            'output_plate_root': str(main_plate_root),
            'sub_dir': self.cfg.sub_dir,  # Store resolved sub_dir for main pipeline
            'pipeline_position': i,
            'input_source': self._get_input_source(step, i),
            'special_inputs': special_inputs,
            'special_outputs': special_outputs,
            'funcplan': funcplan,
        })

        # Add optional paths if configured
        if materialized_output_dir:
            # Per-step materialization uses its own config to determine plate root
            # NOTE(review): is_per_step_materialization=False here even though this
            # IS the per-step branch — confirm whether True was intended.
            materialized_plate_root = self.build_output_plate_root(self.plate_path, step.materialization_config, is_per_step_materialization=False)
            self.plans[sid].update({
                'materialized_output_dir': str(materialized_output_dir),
                'materialized_plate_root': str(materialized_plate_root),
                'materialized_sub_dir': step.materialization_config.sub_dir,  # Store resolved sub_dir for materialization
                'materialized_backend': self.vfs.materialization_backend.value,
                'materialization_config': step.materialization_config  # Store config for well filtering (will be resolved by compiler)
            })
        if input_conversion_dir:
            self.plans[sid].update({
                'input_conversion_dir': str(input_conversion_dir),
                'input_conversion_backend': self.vfs.materialization_backend.value
            })

        # Set backend if needed
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            self.plans[sid][READ_BACKEND] = self.vfs.materialization_backend.value

        # If zarr conversion occurred, redirect input_dir to zarr store
        if self.vfs.materialization_backend == MaterializationBackend.ZARR and pipeline:
            first_step_plan = self.plans.get(pipeline[0].step_id, {})
            if "input_conversion_dir" in first_step_plan:
                self.plans[sid]['input_dir'] = first_step_plan['input_conversion_dir']

    def _get_dir(self, step: AbstractStep, i: int, pipeline: List,
                 dir_type: str, fallback: Path = None) -> Path:
        """Unified directory resolution - no duplication.

        dir_type is 'input' or 'output'; for 'output', *fallback* is the
        already-resolved input dir (work-in-place for non-first steps).
        """
        sid = step.step_id

        # Check overrides (same for input/output)
        if override := self.plans.get(sid, {}).get(f'{dir_type}_dir'):
            return Path(override)
        if override := getattr(step, f'__{dir_type}_dir__', None):
            return Path(override)

        # Type-specific logic
        if dir_type == 'input':
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self.initial_input
            prev_sid = pipeline[i-1].step_id
            return Path(self.plans[prev_sid]['output_dir'])
        else:  # output
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self._build_output_path()
            return fallback  # Work in place

    @staticmethod
    def build_output_plate_root(plate_path: Path, path_config, is_per_step_materialization: bool = False) -> Path:
        """Build output plate root directory directly from configuration components.

        Formula:
        - If output_dir_suffix is empty and NOT per-step materialization: use main pipeline output directory
        - If output_dir_suffix is empty and IS per-step materialization: use plate_path directly
        - Otherwise: (global_output_folder OR plate_path.parent) + plate_name + output_dir_suffix

        Args:
            plate_path: Path to the original plate directory
            path_config: PathPlanningConfig with global_output_folder and output_dir_suffix
            is_per_step_materialization: True if this is per-step materialization (no auto suffix)

        Returns:
            Path to plate root directory (e.g., "/data/results/plate001_processed")
        """
        base = Path(path_config.global_output_folder) if path_config.global_output_folder else plate_path.parent

        # Handle empty suffix differently for per-step vs pipeline-level materialization
        if not path_config.output_dir_suffix:
            if is_per_step_materialization:
                # Per-step materialization: use exact path without automatic suffix
                return base / plate_path.name
            else:
                # Pipeline-level materialization: use main pipeline output directory
                main_output_path = base / f"{plate_path.name}_outputs"
                return main_output_path

        return base / f"{plate_path.name}{path_config.output_dir_suffix}"

    def _build_output_path(self, path_config=None) -> Path:
        """Build complete output path: plate_root + sub_dir (defaults to self.cfg)."""
        config = path_config or self.cfg

        # Use the config's own output_dir_suffix to determine plate root
        plate_root = self.build_output_plate_root(self.plate_path, config, is_per_step_materialization=False)
        return plate_root / config.sub_dir

    def _calculate_materialized_output_path(self, materialization_config) -> Path:
        """Calculate materialized output path using custom PathPlanningConfig."""
        return self._build_output_path(materialization_config)

    def _calculate_input_conversion_path(self, conversion_config) -> Path:
        """Calculate input conversion path using custom PathPlanningConfig."""
        return self._build_output_path(conversion_config)

    def _get_optional_path(self, config_key: str, step_id: str) -> Optional[Path]:
        """Get optional path if the named config exists in this step's plan."""
        if config_key in self.plans[step_id]:
            config = self.plans[step_id][config_key]
            return self._build_output_path(config)
        return None

    def _process_special(self, items: Any, extra: Any, io_type: str, sid: str) -> Dict:
        """Unified special I/O processing - no duplication.

        For io_type 'output': *extra* is the mat_funcs mapping; each output key
        is assigned a well-scoped results path and recorded in self.declared.
        For io_type 'input': *extra* is this step's own outputs (self-fulfilling);
        unresolvable, non-metadata keys raise ValueError.
        """
        result = {}

        if io_type == 'output' and items:  # Special outputs
            results_path = self._get_results_path()
            for key in sorted(items):
                filename = PipelinePathPlanner._build_well_filename(self.ctx.well_id, key)
                path = results_path / filename
                result[key] = {
                    'path': str(path),
                    'materialization_function': extra.get(key)  # extra is mat_funcs
                }
                self.declared[key] = str(path)

        elif io_type == 'input' and items:  # Special inputs
            for key in sorted(items.keys() if isinstance(items, dict) else items):
                if key in self.declared:
                    # 'prev' is a literal placeholder, not a real step id
                    result[key] = {'path': self.declared[key], 'source_step_id': 'prev'}
                elif key in extra:  # extra is outputs (self-fulfilling)
                    result[key] = {'path': 'self', 'source_step_id': sid}
                elif key not in METADATA_RESOLVERS:
                    raise ValueError(f"Step {sid} needs '{key}' but it's not available")

        return result

    def _inject_metadata(self, pattern: Any, inputs: Dict) -> Any:
        """Inject resolved metadata values for special inputs not supplied upstream."""
        for key in inputs:
            if key in METADATA_RESOLVERS and key not in self.declared:
                value = METADATA_RESOLVERS[key]["resolver"](self.ctx)
                pattern = self._inject_into_pattern(pattern, key, value)
        return pattern

    def _inject_into_pattern(self, pattern: Any, key: str, value: Any) -> Any:
        """Inject value into pattern - handles all cases in 6 lines."""
        if callable(pattern):
            return (pattern, {key: value})
        if isinstance(pattern, tuple) and len(pattern) == 2:
            return (pattern[0], {**pattern[1], key: value})
        if isinstance(pattern, list) and len(pattern) == 1:
            return [self._inject_into_pattern(pattern[0], key, value)]
        raise ValueError(f"Cannot inject into pattern type: {type(pattern)}")

    def _normalize_attr(self, attr: Any, target_type: type) -> Any:
        """Normalize step attributes to a set (outputs) or dict (inputs)."""
        if target_type == set:
            return {attr} if isinstance(attr, str) else set(attr) if isinstance(attr, (list, set)) else set()
        else:  # dict
            return {attr: True} if isinstance(attr, str) else {k: True for k in attr} if isinstance(attr, list) else attr if isinstance(attr, dict) else {}

    def _get_input_source(self, step: AbstractStep, i: int) -> str:
        """Get input source string ('PIPELINE_START' or 'PREVIOUS_STEP')."""
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            return 'PIPELINE_START'
        return 'PREVIOUS_STEP'

    def _get_results_path(self) -> Path:
        """Get results path from global pipeline configuration.

        Relative paths are anchored at the plate path; absolute paths are used as-is.
        """
        try:
            # Access materialization_results_path from global config, not path planning config
            path = self.ctx.global_config.materialization_results_path
            return Path(path) if Path(path).is_absolute() else self.plate_path / path
        except AttributeError as e:
            # Fallback with clear error message if global config is unavailable
            raise RuntimeError(f"Cannot access global config for materialization_results_path: {e}") from e

    def _validate(self, pipeline: List):
        """Validate connectivity and materialization paths - no duplication."""
        # Existing connectivity validation: each step's input must be the
        # previous step's output unless linked by a special input.
        for i in range(1, len(pipeline)):
            curr, prev = pipeline[i], pipeline[i-1]
            if getattr(curr, 'input_source', None) == InputSource.PIPELINE_START:
                continue
            curr_in = self.plans[curr.step_id]['input_dir']
            prev_out = self.plans[prev.step_id]['output_dir']
            if curr_in != prev_out:
                has_special = any(inp.get('source_step_id') == prev.step_id
                                  for inp in self.plans[curr.step_id].get('special_inputs', {}).values())
                if not has_special:
                    raise ValueError(f"Disconnect: {prev.name} -> {curr.name}")

        # NEW: Materialization path collision validation
        self._validate_materialization_paths(pipeline)

    def _validate_materialization_paths(self, pipeline: List[AbstractStep]) -> None:
        """Validate and resolve materialization path collisions with symmetric conflict resolution."""
        global_path = self._build_output_path(self.cfg)

        # Collect all materialization steps with their paths and positions
        mat_steps = [
            (step, self.plans.get(step.step_id, {}).get('pipeline_position', 0), self._build_output_path(step.materialization_config))
            for step in pipeline if step.materialization_config
        ]

        # Group by path for conflict detection
        from collections import defaultdict
        path_groups = defaultdict(list)
        for step, pos, path in mat_steps:
            if path == global_path:
                # Collides with the main pipeline output - resolve immediately
                self._resolve_and_update_paths(step, pos, path, "main flow")
            else:
                path_groups[str(path)].append((step, pos, path))

        # Resolve materialization vs materialization conflicts
        for path_key, step_list in path_groups.items():
            if len(step_list) > 1:
                print(f"⚠️ Materialization path collision detected for {len(step_list)} steps at: {path_key}")
                for step, pos, path in step_list:
                    self._resolve_and_update_paths(step, pos, path, f"pos {pos}")

    def _resolve_and_update_paths(self, step: AbstractStep, position: int, original_path: Path, conflict_type: str) -> None:
        """Resolve path conflict by updating sub_dir configuration directly."""
        # Generate unique sub_dir name instead of calculating from paths
        original_sub_dir = step.materialization_config.sub_dir
        new_sub_dir = f"{original_sub_dir}_step{position}"

        # Update step materialization config with new sub_dir
        # NOTE(review): assumes the config type accepts its own __dict__ as kwargs
        # (dataclass-like) — confirm for all config classes in use.
        config_class = type(step.materialization_config)
        step.materialization_config = config_class(**{**step.materialization_config.__dict__, 'sub_dir': new_sub_dir})

        # Recalculate the resolved path using the new sub_dir
        resolved_path = self._build_output_path(step.materialization_config)

        # Update step plans for metadata generation
        if step_plan := self.plans.get(step.step_id):
            if 'materialized_output_dir' in step_plan:
                step_plan['materialized_output_dir'] = str(resolved_path)
                step_plan['materialized_sub_dir'] = new_sub_dir  # Update stored sub_dir

        print(f"  - step '{step.name}' ({conflict_type}) → {resolved_path}")
391# ===== PUBLIC API =====
class PipelinePathPlanner:
    """Public API matching original interface.

    Thin static facade over :class:`PathPlanner`.
    """

    @staticmethod
    def prepare_pipeline_paths(context: ProcessingContext,
                               pipeline_definition: List[AbstractStep]) -> Dict:
        """Plan all step paths for *pipeline_definition* and return the step plans."""
        planner = PathPlanner(context)
        return planner.plan(pipeline_definition)

    @staticmethod
    def _build_well_filename(well_id: str, key: str, extension: str = "pkl") -> str:
        """Build standardized well-based filename: '<well>_<key>.<ext>'."""
        stem = f"{well_id}_{key}"
        return ".".join((stem, extension))
410# ===== METADATA =====
# Registry of metadata keys that can be auto-resolved from the processing
# context and injected into function patterns as keyword arguments.
# Each entry maps key -> {"resolver": callable(context) -> value, "description": str}.
METADATA_RESOLVERS = {
    "grid_dimensions": {
        "resolver": lambda context: context.microscope_handler.get_grid_dimensions(context.plate_path),
        "description": "Grid dimensions (num_rows, num_cols) for position generation functions"
    },
}
def resolve_metadata(key: str, context) -> Any:
    """Look up the registered resolver for *key* and invoke it with *context*.

    Raises:
        ValueError: if no resolver is registered under *key*.
    """
    entry = METADATA_RESOLVERS.get(key)
    if entry is None:
        raise ValueError(f"No resolver for '{key}'")
    return entry["resolver"](context)
def register_metadata_resolver(key: str, resolver: Callable, description: str):
    """Register (or replace) a metadata resolver under *key*."""
    entry = {"resolver": resolver, "description": description}
    METADATA_RESOLVERS[key] = entry
433# ===== SCOPE PROMOTION (separate concern) =====
435def _apply_scope_promotion_rules(dict_pattern, special_outputs, declared_outputs, step_id, position):
436 """Scope promotion for single-key dict patterns - 15 lines."""
437 if len(dict_pattern) != 1:
438 return special_outputs, declared_outputs
440 key_prefix = f"{list(dict_pattern.keys())[0]}_0_"
441 promoted_out, promoted_decl = special_outputs.copy(), declared_outputs.copy()
443 for out_key in list(special_outputs.keys()):
444 if out_key.startswith(key_prefix):
445 promoted_key = out_key[len(key_prefix):]
446 if promoted_key in promoted_decl:
447 raise ValueError(f"Collision: {promoted_key} already exists")
448 promoted_out[promoted_key] = special_outputs[out_key]
449 promoted_decl[promoted_key] = {
450 "step_id": step_id, "position": position,
451 "path": special_outputs[out_key]["path"]
452 }
454 return promoted_out, promoted_decl