Coverage for openhcs/core/pipeline/path_planner.py: 78.9%
248 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Pipeline path planning - actually reduced duplication.
4This version ACTUALLY eliminates duplication instead of adding abstraction theater.
5"""
import logging
from collections import defaultdict
from dataclasses import replace
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

from openhcs.constants.input_source import InputSource
from openhcs.core.config import WellFilterMode
from openhcs.core.context.processing_context import ProcessingContext
from openhcs.core.pipeline.pipeline_utils import get_core_callable
from openhcs.core.steps.abstract import AbstractStep
from openhcs.core.steps.function_step import FunctionStep

logger = logging.getLogger(__name__)
# ===== PATTERN NORMALIZATION (ONE place) =====

def normalize_pattern(pattern: Any) -> Iterator[Tuple[Callable, str, int]]:
    """THE single pattern normalizer - 15 lines, no duplication."""
    if isinstance(pattern, dict):
        for key, value in pattern.items():
            for pos, func in enumerate(value if isinstance(value, list) else [value]):
                if callable_func := get_core_callable(func):
                    yield (callable_func, key, pos)
    elif isinstance(pattern, list):
        for pos, func in enumerate(pattern):
            if callable_func := get_core_callable(func):
                yield (callable_func, "default", pos)
    elif callable_func := get_core_callable(pattern):
        yield (callable_func, "default", 0)
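
# Illustrative sketch of the three accepted pattern shapes. This assumes
# get_core_callable returns the underlying callable for a plain function
# (an assumption about pipeline_utils, not verified here):
#
#   def blur(img): ...
#   def sharpen(img): ...
#
#   list(normalize_pattern(blur))             # [(blur, "default", 0)]
#   list(normalize_pattern([blur, sharpen]))  # [(blur, "default", 0),
#                                             #  (sharpen, "default", 1)]
#   list(normalize_pattern({"1": [blur]}))    # [(blur, "1", 0)]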

def extract_attributes(pattern: Any) -> Dict[str, Any]:
    """Extract all function attributes in one pass - 10 lines."""
    outputs, inputs, mat_funcs = set(), {}, {}
    for func, _, _ in normalize_pattern(pattern):
        outputs.update(getattr(func, '__special_outputs__', set()))
        inputs.update(getattr(func, '__special_inputs__', {}))
        mat_funcs.update(getattr(func, '__materialization_functions__', {}))
    return {'outputs': outputs, 'inputs': inputs, 'mat_funcs': mat_funcs}
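
# Hedged example: functions decorated by OpenHCS's special-I/O machinery are
# assumed to carry the dunder attributes read above (how they get set lives
# outside this module; count_cells and save_csv are hypothetical):
#
#   def count_cells(img): ...
#   count_cells.__special_outputs__ = {"cell_counts"}
#   count_cells.__special_inputs__ = {"grid_dimensions": True}
#   count_cells.__materialization_functions__ = {"cell_counts": save_csv}
#
#   extract_attributes(count_cells)
#   # -> {'outputs': {'cell_counts'},
#   #     'inputs': {'grid_dimensions': True},
#   #     'mat_funcs': {'cell_counts': save_csv}}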

# ===== PATH PLANNING (NO duplication) =====

class PathPlanner:
    """Minimal path planner with zero duplication."""

    def __init__(self, context: ProcessingContext, pipeline_config):
        self.ctx = context
        # Access config directly from pipeline_config (lazy resolution happens via config_context)
        self.cfg = pipeline_config.path_planning_config
        self.vfs = pipeline_config.vfs_config
        self.plans = context.step_plans
        self.declared = {}  # Tracks special outputs

        # Initial input determination (once)
        self.initial_input = Path(context.input_dir)
        self.plate_path = Path(context.plate_path)
    def plan(self, pipeline: List[AbstractStep]) -> Dict:
        """Plan all paths with zero duplication."""
        for i, step in enumerate(pipeline):
            self._plan_step(step, i, pipeline)

        self._validate(pipeline)

        # Set output_plate_root and sub_dir for metadata writing
        if pipeline:
            self.ctx.output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)
            self.ctx.sub_dir = self.cfg.sub_dir

        return self.plans
    def _plan_step(self, step: AbstractStep, i: int, pipeline: List):
        """Plan one step - no duplicate logic."""
        sid = i  # Use step index instead of step_id

        # Get paths with unified logic
        input_dir = self._get_dir(step, i, pipeline, 'input')
        output_dir = self._get_dir(step, i, pipeline, 'output', input_dir)

        # Extract function data if FunctionStep
        attrs = extract_attributes(step.func) if isinstance(step, FunctionStep) else {
            'outputs': self._normalize_attr(getattr(step, 'special_outputs', set()), set),
            'inputs': self._normalize_attr(getattr(step, 'special_inputs', {}), dict),
            'mat_funcs': {}
        }

        # Process special I/O with unified logic
        special_outputs = self._process_special(attrs['outputs'], attrs['mat_funcs'], 'output', sid)
        special_inputs = self._process_special(attrs['inputs'], attrs['outputs'], 'input', sid)

        # Handle metadata injection
        if isinstance(step, FunctionStep) and any(k in METADATA_RESOLVERS for k in attrs['inputs']):
            step.func = self._inject_metadata(step.func, attrs['inputs'])

        # Generate funcplan (only if needed)
        funcplan = {}
        if isinstance(step, FunctionStep) and special_outputs:
            for func, dk, pos in normalize_pattern(step.func):
                saves = [k for k in special_outputs if k in getattr(func, '__special_outputs__', set())]
                if saves:
                    funcplan[f"{func.__name__}_{dk}_{pos}"] = saves
        # Handle optional materialization and input conversion.
        # Read step_materialization_config directly from the step object
        # (not step plans, which aren't fully populated yet).
        materialized_output_dir = None
        if step.step_materialization_config and step.step_materialization_config.enabled:
            # Check if this step has axis filters and if the current axis should be materialized
            step_axis_filters = getattr(self.ctx, 'step_axis_filters', {}).get(sid, {})
            materialization_filter = step_axis_filters.get('step_materialization_config')

            if materialization_filter:
                # Inline simple conditional logic for axis filtering
                axis_in_filter = self.ctx.axis_id in materialization_filter['resolved_axis_values']
                should_materialize = (
                    axis_in_filter if materialization_filter['filter_mode'] == WellFilterMode.INCLUDE
                    else not axis_in_filter
                )

                if should_materialize:
                    materialized_output_dir = self._build_output_path(step.step_materialization_config)
                else:
                    logger.debug(f"Skipping materialization for step {step.name}, axis {self.ctx.axis_id} (filtered out)")
            else:
                # No axis filter - create materialization path as normal
                materialized_output_dir = self._build_output_path(step.step_materialization_config)

        # Check if input_conversion_dir is already set by the compiler (direct path);
        # otherwise try to calculate it from input_conversion_config (legacy).
        if "input_conversion_dir" in self.plans[sid]:
            input_conversion_dir = Path(self.plans[sid]["input_conversion_dir"])
        else:
            input_conversion_dir = self._get_optional_path("input_conversion_config", sid)
        # Calculate main pipeline plate root for this step
        main_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

        # Calculate analysis results directory (sibling to output_dir with _results suffix).
        # This ensures results are saved alongside images at the same hierarchical level.
        # Example: images/ -> images_results/, checkpoints_step3/ -> checkpoints_step3_results/
        output_dir_path = Path(output_dir)
        dir_name = output_dir_path.name
        analysis_results_dir = output_dir_path.parent / f"{dir_name}_results"

        # Single update
        self.plans[sid].update({
            'input_dir': str(input_dir),
            'output_dir': str(output_dir),
            'output_plate_root': str(main_plate_root),
            'sub_dir': self.cfg.sub_dir,  # Store resolved sub_dir for main pipeline
            'analysis_results_dir': str(analysis_results_dir),  # Pre-calculated results directory
            'pipeline_position': i,
            'input_source': self._get_input_source(step, i),
            'special_inputs': special_inputs,
            'special_outputs': special_outputs,
            'funcplan': funcplan,
        })
        # Add optional paths if configured
        if materialized_output_dir:
            # Per-step materialization uses its own config to determine plate root
            materialized_plate_root = self.build_output_plate_root(self.plate_path, step.step_materialization_config, is_per_step_materialization=False)

            # Calculate analysis results directory for materialized output
            materialized_dir_path = Path(materialized_output_dir)
            materialized_dir_name = materialized_dir_path.name
            materialized_analysis_results_dir = materialized_dir_path.parent / f"{materialized_dir_name}_results"

            self.plans[sid].update({
                'materialized_output_dir': str(materialized_output_dir),
                'materialized_plate_root': str(materialized_plate_root),
                'materialized_sub_dir': step.step_materialization_config.sub_dir,  # Resolved sub_dir for materialization
                'materialized_analysis_results_dir': str(materialized_analysis_results_dir),  # Pre-calculated results dir
                'materialized_backend': self.vfs.materialization_backend.value,
                'materialization_config': step.step_materialization_config  # Stored for axis filtering (resolved by compiler)
            })
        if input_conversion_dir:
            self.plans[sid].update({
                'input_conversion_dir': str(input_conversion_dir),
                'input_conversion_backend': self.vfs.materialization_backend.value
            })

        # PIPELINE_START steps read from the original input, not the zarr conversion
        # (zarr conversion only applies to normal pipeline flow, not PIPELINE_START jumps).
    def _get_dir(self, step: AbstractStep, i: int, pipeline: List,
                 dir_type: str, fallback: Optional[Path] = None) -> Path:
        """Unified directory resolution - no duplication."""
        sid = i  # Use step index instead of step_id

        # Check overrides (same for input/output)
        if override := self.plans.get(sid, {}).get(f'{dir_type}_dir'):
            return Path(override)
        if override := getattr(step, f'__{dir_type}_dir__', None):
            return Path(override)

        # Type-specific logic
        if dir_type == 'input':
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self.initial_input
            prev_step_index = i - 1  # Use previous step index instead of step_id
            return Path(self.plans[prev_step_index]['output_dir'])
        else:  # output
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self._build_output_path()
            return fallback  # Work in place
    @staticmethod
    def build_output_plate_root(plate_path: Path, path_config, is_per_step_materialization: bool = False) -> Path:
        """Build output plate root directory directly from configuration components.

        Formula: (global_output_folder OR plate_path.parent) + plate_name + output_dir_suffix

        Results (analysis outputs) should ALWAYS use the output plate path, never the input plate path.
        This ensures metadata coherence - ROIs and other analysis results are saved alongside the
        processed images they were created from, not with the original input images.

        Args:
            plate_path: Path to the original plate directory
            path_config: PathPlanningConfig with global_output_folder and output_dir_suffix
            is_per_step_materialization: Unused (kept for API compatibility)

        Returns:
            Path to plate root directory (e.g., "/data/results/plate001_processed")
        """
        # OMERO paths always use /omero as base and ignore global_output_folder
        if str(plate_path).startswith("/omero/"):
            base = plate_path.parent
        elif path_config.global_output_folder:
            base = Path(path_config.global_output_folder)
        else:
            base = plate_path.parent

        # Always append the suffix to create the output plate path.
        # If the suffix is None/empty, fail loud - this is a configuration error.
        if not path_config.output_dir_suffix:
            raise ValueError(
                f"output_dir_suffix cannot be None or empty. "
                f"Results must always use output plate path, not input plate path. "
                f"Config: {path_config}"
            )

        return base / f"{plate_path.name}{path_config.output_dir_suffix}"
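
    # Worked example of the formula above (paths and the PathPlanningConfig
    # constructor arguments are illustrative assumptions):
    #
    #   plate_path = Path("/data/plates/plate001")
    #   cfg = PathPlanningConfig(global_output_folder="/data/results",
    #                            output_dir_suffix="_processed")
    #   PathPlanner.build_output_plate_root(plate_path, cfg)
    #   # -> Path("/data/results/plate001_processed")
    #
    #   # With global_output_folder unset, base falls back to the plate's
    #   # parent: Path("/data/plates/plate001_processed")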
    def _build_output_path(self, path_config=None) -> Path:
        """Build complete output path: plate_root + sub_dir."""
        config = path_config or self.cfg

        # Use the config's own output_dir_suffix to determine the plate root
        plate_root = self.build_output_plate_root(self.plate_path, config, is_per_step_materialization=False)
        return plate_root / config.sub_dir

    def _calculate_materialized_output_path(self, materialization_config) -> Path:
        """Calculate materialized output path using a custom PathPlanningConfig."""
        return self._build_output_path(materialization_config)

    def _calculate_input_conversion_path(self, conversion_config) -> Path:
        """Calculate input conversion path using a custom PathPlanningConfig."""
        return self._build_output_path(conversion_config)
    def _get_optional_path(self, config_key: str, step_index: int) -> Optional[Path]:
        """Get optional path if config exists."""
        if config_key in self.plans[step_index]:
            config = self.plans[step_index][config_key]
            return self._build_output_path(config)
        return None
    def _process_special(self, items: Any, extra: Any, io_type: str, sid: int) -> Dict:
        """Unified special I/O processing - no duplication."""
        result = {}

        if io_type == 'output' and items:  # Special outputs
            results_path = self._get_results_path()
            for key in sorted(items):
                # Include the step index in the filename to prevent collisions when
                # multiple steps produce the same special output (e.g., two
                # crop_device steps both producing match_results).
                filename = PipelinePathPlanner._build_axis_filename(self.ctx.axis_id, key, step_index=sid)
                path = results_path / filename
                result[key] = {
                    'path': str(path),
                    'materialization_function': extra.get(key)  # extra is mat_funcs
                }
                self.declared[key] = str(path)

        elif io_type == 'input' and items:  # Special inputs
            for key in sorted(items.keys() if isinstance(items, dict) else items):
                if key in self.declared:
                    result[key] = {'path': self.declared[key], 'source_step_id': 'prev'}
                elif key in extra:  # extra is outputs (self-fulfilling)
                    result[key] = {'path': 'self', 'source_step_id': sid}
                elif key not in METADATA_RESOLVERS:
                    raise ValueError(f"Step {sid} needs '{key}' but it's not available")

        return result
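
    # Sketch of the structures _process_special produces (keys and paths are
    # illustrative; the results directory comes from _get_results_path):
    #
    #   special_outputs == {
    #       'match_results': {
    #           'path': '<results_path>/A01_match_results_step3.pkl',
    #           'materialization_function': <callable or None>,
    #       },
    #   }
    #   special_inputs == {
    #       'match_results': {'path': '<declared path>', 'source_step_id': 'prev'},
    #   }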
    def _inject_metadata(self, pattern: Any, inputs: Dict) -> Any:
        """Inject metadata for special inputs."""
        for key in inputs:
            if key in METADATA_RESOLVERS and key not in self.declared:
                value = METADATA_RESOLVERS[key]["resolver"](self.ctx)
                pattern = self._inject_into_pattern(pattern, key, value)
        return pattern
    def _inject_into_pattern(self, pattern: Any, key: str, value: Any) -> Any:
        """Inject value into pattern - handles all cases in 6 lines."""
        if callable(pattern):
            return (pattern, {key: value})
        if isinstance(pattern, tuple) and len(pattern) == 2:
            return (pattern[0], {**pattern[1], key: value})
        if isinstance(pattern, list) and len(pattern) == 1:
            return [self._inject_into_pattern(pattern[0], key, value)]
        raise ValueError(f"Cannot inject into pattern type: {type(pattern)}")
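
    # Hedged examples of the three injection cases handled above (func is any
    # callable; the injected values come from METADATA_RESOLVERS):
    #
    #   _inject_into_pattern(func, "grid_dimensions", (8, 12))
    #   # -> (func, {"grid_dimensions": (8, 12)})
    #   _inject_into_pattern((func, {"sigma": 2}), "grid_dimensions", (8, 12))
    #   # -> (func, {"sigma": 2, "grid_dimensions": (8, 12)})
    #   _inject_into_pattern([func], "grid_dimensions", (8, 12))
    #   # -> [(func, {"grid_dimensions": (8, 12)})]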
    def _normalize_attr(self, attr: Any, target_type: type) -> Any:
        """Normalize step attributes - 5 lines, no duplication."""
        if target_type == set:
            return {attr} if isinstance(attr, str) else set(attr) if isinstance(attr, (list, set)) else set()
        else:  # dict
            return ({attr: True} if isinstance(attr, str)
                    else {k: True for k in attr} if isinstance(attr, list)
                    else attr if isinstance(attr, dict) else {})
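
    # Behavior sketch for both target types:
    #
    #   _normalize_attr("rois", set)        # -> {"rois"}
    #   _normalize_attr(["a", "b"], set)    # -> {"a", "b"}
    #   _normalize_attr(None, set)          # -> set()
    #   _normalize_attr("rois", dict)       # -> {"rois": True}
    #   _normalize_attr(["a", "b"], dict)   # -> {"a": True, "b": True}
    #   _normalize_attr(None, dict)         # -> {}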
    def _get_input_source(self, step: AbstractStep, i: int) -> str:
        """Get input source string."""
        if step.input_source == InputSource.PIPELINE_START:
            return 'PIPELINE_START'
        return 'PREVIOUS_STEP'
    def _get_results_path(self) -> Path:
        """Get results path from the global pipeline configuration.

        Results must always be stored in the OUTPUT plate, not the input plate.
        This ensures metadata coherence - analysis results are saved alongside the
        processed images they were created from.
        """
        try:
            # Access materialization_results_path from the global config, not the path planning config
            path = self.ctx.global_config.materialization_results_path

            # Build the output plate root to ensure results go to the output plate
            output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

            return Path(path) if Path(path).is_absolute() else output_plate_root / path
        except AttributeError as e:
            # Fail with a clear error message if the global config is unavailable
            raise RuntimeError(f"Cannot access global config for materialization_results_path: {e}") from e
    def _validate(self, pipeline: List):
        """Validate connectivity and materialization paths - no duplication."""
        # Connectivity validation
        for i in range(1, len(pipeline)):
            curr, prev = pipeline[i], pipeline[i - 1]
            if getattr(curr, 'input_source', None) == InputSource.PIPELINE_START:
                continue
            curr_in = self.plans[i]['input_dir']
            prev_out = self.plans[i - 1]['output_dir']
            if curr_in != prev_out:
                # A mismatch is allowed only when the step consumes a special input
                # from the previous step (check both the step index and 'prev').
                has_special = any(inp.get('source_step_id') in [i - 1, 'prev']
                                  for inp in self.plans[i].get('special_inputs', {}).values())
                if not has_special:
                    raise ValueError(f"Disconnect: {prev.name} -> {curr.name}")

        # Materialization path collision validation
        self._validate_materialization_paths(pipeline)
    def _validate_materialization_paths(self, pipeline: List[AbstractStep]) -> None:
        """Validate and resolve materialization path collisions with symmetric conflict resolution."""
        global_path = self._build_output_path(self.cfg)

        # Collect all materialization steps with their paths and positions
        mat_steps = [
            (step, self.plans.get(i, {}).get('pipeline_position', 0),
             self._build_output_path(step.step_materialization_config))
            for i, step in enumerate(pipeline)
            if step.step_materialization_config and step.step_materialization_config.enabled
        ]

        # Group by path for conflict detection
        path_groups = defaultdict(list)
        for step, pos, path in mat_steps:
            if path == global_path:
                self._resolve_and_update_paths(step, pos, path, "main flow")
            else:
                path_groups[str(path)].append((step, pos, path))

        # Resolve materialization vs materialization conflicts
        for path_key, step_list in path_groups.items():
            if len(step_list) > 1:
                for step, pos, path in step_list:
                    self._resolve_and_update_paths(step, pos, path, f"pos {pos}")
    def _resolve_and_update_paths(self, step: AbstractStep, position: int, original_path: Path, conflict_type: str) -> None:
        """Resolve a path conflict by updating the sub_dir configuration directly."""
        # Lazy configs are already resolved via config_context() in the compiler,
        # so there is no need to call to_base_config() here.
        materialization_config = step.step_materialization_config

        # Generate a unique sub_dir name instead of calculating it from paths
        original_sub_dir = materialization_config.sub_dir
        new_sub_dir = f"{original_sub_dir}_step{position}"

        # Update the step materialization config with the new sub_dir
        step.step_materialization_config = replace(materialization_config, sub_dir=new_sub_dir)

        # Recalculate the resolved path using the updated config
        resolved_path = self._build_output_path(step.step_materialization_config)

        # Update step plans for metadata generation
        if step_plan := self.plans.get(position):  # position is the step index
            if 'materialized_output_dir' in step_plan:
                step_plan['materialized_output_dir'] = str(resolved_path)
                step_plan['materialized_sub_dir'] = new_sub_dir  # Update stored sub_dir
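
    # Example of the resolution above (sub_dir names illustrative): if two
    # enabled materialization steps both resolve sub_dir "checkpoints", each
    # conflicting step is rewritten in place, so the step at pipeline position 3
    # ends up with sub_dir "checkpoints_step3" and its plan's
    # 'materialized_output_dir' points at .../checkpoints_step3.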

# ===== PUBLIC API =====

class PipelinePathPlanner:
    """Public API matching the original interface."""

    @staticmethod
    def prepare_pipeline_paths(context: ProcessingContext,
                               pipeline_definition: List[AbstractStep],
                               pipeline_config) -> Dict:
        """Prepare pipeline paths."""
        return PathPlanner(context, pipeline_config).plan(pipeline_definition)
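
    # Minimal usage sketch (hedged: a real ProcessingContext is built by the
    # compiler and already carries input_dir, plate_path, axis_id, step_plans,
    # and global_config; `steps` and `cfg` are assumed to exist):
    #
    #   step_plans = PipelinePathPlanner.prepare_pipeline_paths(
    #       context, pipeline_definition=steps, pipeline_config=cfg)
    #   step_plans[0]['input_dir']    # original plate input
    #   step_plans[0]['output_dir']   # first step's resolved output directory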
    @staticmethod
    def _build_axis_filename(axis_id: str, key: str, extension: str = "pkl", step_index: Optional[int] = None) -> str:
        """Build a standardized axis-based filename with an optional step index.

        Args:
            axis_id: Well/axis identifier (e.g., "R02C02")
            key: Special output key (e.g., "match_results")
            extension: File extension (default: "pkl")
            step_index: Optional step index to prevent collisions when multiple steps
                produce the same special output

        Returns:
            Filename string (e.g., "R02C02_match_results_step3.pkl")
        """
        if step_index is not None:
            return f"{axis_id}_{key}_step{step_index}.{extension}"
        return f"{axis_id}_{key}.{extension}"
    @staticmethod
    def build_dict_pattern_path(base_path: str, dict_key: str) -> str:
        """Build a channel-specific path for dict patterns.

        Inserts _w{dict_key} after the well ID in the filename.
        Example: "dir/A01_rois_step7.pkl" + "1" -> "dir/A01_w1_rois_step7.pkl"

        Args:
            base_path: Base path without the channel component
            dict_key: Dict pattern key (e.g., "1" for channel 1)

        Returns:
            Channel-specific path
        """
        # Use Path for cross-platform path handling (Windows uses backslashes)
        path = Path(base_path)
        dir_part = path.parent
        filename = path.name
        well_id, rest = filename.split('_', 1)
        return str(dir_part / f"{well_id}_w{dict_key}_{rest}")

# ===== METADATA =====

METADATA_RESOLVERS = {
    "grid_dimensions": {
        "resolver": lambda context: context.microscope_handler.get_grid_dimensions(context.plate_path),
        "description": "Grid dimensions (num_rows, num_cols) for position generation functions"
    },
}


def resolve_metadata(key: str, context) -> Any:
    """Resolve a metadata value."""
    if key not in METADATA_RESOLVERS:
        raise ValueError(f"No resolver for '{key}'")
    return METADATA_RESOLVERS[key]["resolver"](context)


def register_metadata_resolver(key: str, resolver: Callable, description: str):
    """Register a metadata resolver."""
    METADATA_RESOLVERS[key] = {"resolver": resolver, "description": description}
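
# Example registration (hypothetical: "pixel_size_um" is not a built-in key,
# and get_pixel_size is not a confirmed microscope_handler method):
#
#   register_metadata_resolver(
#       "pixel_size_um",
#       resolver=lambda ctx: ctx.microscope_handler.get_pixel_size(ctx.plate_path),
#       description="Physical pixel size for scale-aware functions",
#   )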

# ===== SCOPE PROMOTION (separate concern) =====

def _apply_scope_promotion_rules(dict_pattern, special_outputs, declared_outputs, step_index, position):
    """Scope promotion for single-key dict patterns - 15 lines."""
    if len(dict_pattern) != 1:
        return special_outputs, declared_outputs

    key_prefix = f"{list(dict_pattern.keys())[0]}_0_"
    promoted_out, promoted_decl = special_outputs.copy(), declared_outputs.copy()

    for out_key in list(special_outputs.keys()):
        if out_key.startswith(key_prefix):
            promoted_key = out_key[len(key_prefix):]
            if promoted_key in promoted_decl:
                raise ValueError(f"Collision: {promoted_key} already exists")
            promoted_out[promoted_key] = special_outputs[out_key]
            promoted_decl[promoted_key] = {
                "step_index": step_index, "position": position,
                "path": special_outputs[out_key]["path"]
            }

    return promoted_out, promoted_decl
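
# Promotion example for a single-key dict pattern {"1": [func]}: an output
# recorded as "1_0_rois" is promoted to a bare "rois" key so downstream steps
# can consume it without knowing the dict key/position it came from
# (values illustrative):
#
#   outs = {"1_0_rois": {"path": "/tmp/A01_rois.pkl"}}
#   promoted_out, promoted_decl = _apply_scope_promotion_rules(
#       {"1": [func]}, outs, {}, step_index=2, position=2)
#   # promoted_out["rois"] is outs["1_0_rois"], and
#   # promoted_decl["rois"]["path"] == "/tmp/A01_rois.pkl"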