Coverage for openhcs/core/pipeline/path_planner.py: 80.5%

237 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Pipeline path planning - actually reduced duplication. 

3 

4This version ACTUALLY eliminates duplication instead of adding abstraction theater. 

5""" 

6 

7import logging 

8from dataclasses import dataclass 

9from pathlib import Path 

10from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple 

11 

12from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend 

13from openhcs.constants.input_source import InputSource 

14from openhcs.core.config import MaterializationBackend 

15from openhcs.core.context.processing_context import ProcessingContext 

16from openhcs.core.pipeline.pipeline_utils import get_core_callable 

17from openhcs.core.steps.abstract import AbstractStep 

18from openhcs.core.steps.function_step import FunctionStep 

19 

20logger = logging.getLogger(__name__) 

21 

22 

23# ===== PATTERN NORMALIZATION (ONE place) ===== 

24 

def normalize_pattern(pattern: Any) -> Iterator[Tuple[Callable, str, int]]:
    """Yield ``(core_callable, dict_key, position)`` for every function in *pattern*.

    Accepts three pattern shapes: a dict mapping keys to a function or a list
    of functions, a bare list of functions (key ``"default"``), or a single
    function (key ``"default"``, position 0). Entries for which
    ``get_core_callable`` returns a falsy value are skipped.
    """
    # Reduce every accepted shape to a uniform list of (key, functions) groups,
    # then walk the groups with one shared loop.
    if isinstance(pattern, dict):
        groups = [
            (key, value if isinstance(value, list) else [value])
            for key, value in pattern.items()
        ]
    elif isinstance(pattern, list):
        groups = [("default", pattern)]
    else:
        groups = [("default", [pattern])]

    for key, funcs in groups:
        for pos, func in enumerate(funcs):
            core = get_core_callable(func)
            if core:
                yield (core, key, pos)

38 

39 

def extract_attributes(pattern: Any) -> Dict[str, Any]:
    """Collect special-output keys, special-input specs and materialization
    functions from every callable in *pattern* in a single pass.

    Returns a dict with keys ``'outputs'`` (set), ``'inputs'`` (dict) and
    ``'mat_funcs'`` (dict).
    """
    result = {'outputs': set(), 'inputs': {}, 'mat_funcs': {}}
    for func, _, _ in normalize_pattern(pattern):
        result['outputs'] |= getattr(func, '__special_outputs__', set())
        result['inputs'].update(getattr(func, '__special_inputs__', {}))
        result['mat_funcs'].update(getattr(func, '__materialization_functions__', {}))
    return result

48 

49 

50# ===== PATH PLANNING (NO duplication) ===== 

51 

class PathPlanner:
    """Minimal path planner with zero duplication.

    Computes input/output directories, special-I/O paths, and optional
    materialization/conversion paths for every step of a pipeline, writing
    the results into ``context.step_plans`` keyed by step index.
    """

    def __init__(self, context: ProcessingContext, pipeline_config):
        """Capture the context and the resolved config objects used during planning."""
        self.ctx = context
        # Access config directly from pipeline_config (lazy resolution happens via config_context)
        self.cfg = pipeline_config.path_planning_config
        self.vfs = pipeline_config.vfs_config
        self.plans = context.step_plans
        self.declared = {}  # special-output key -> declared path (str), filled while planning

        # Initial input determination (once)
        self.initial_input = Path(context.input_dir)
        self.plate_path = Path(context.plate_path)

    def plan(self, pipeline: List[AbstractStep]) -> Dict:
        """Plan all paths for *pipeline*, validate connectivity, and return step_plans."""
        for i, step in enumerate(pipeline):
            self._plan_step(step, i, pipeline)

        self._validate(pipeline)

        # Set output_plate_root and sub_dir on the context for metadata writing
        if pipeline:
            self.ctx.output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)
            self.ctx.sub_dir = self.cfg.sub_dir

        return self.plans

    def _plan_step(self, step: AbstractStep, i: int, pipeline: List) -> None:
        """Plan one step: directories, special I/O, funcplan, and optional paths."""
        sid = i  # Step plans are keyed by pipeline index, not a step id

        # Get paths with unified logic (output may fall back to working in place)
        input_dir = self._get_dir(step, i, pipeline, 'input')
        output_dir = self._get_dir(step, i, pipeline, 'output', input_dir)

        # Extract function data if FunctionStep; otherwise normalize the
        # step-level attributes into the same {'outputs','inputs','mat_funcs'} shape
        attrs = extract_attributes(step.func) if isinstance(step, FunctionStep) else {
            'outputs': self._normalize_attr(getattr(step, 'special_outputs', set()), set),
            'inputs': self._normalize_attr(getattr(step, 'special_inputs', {}), dict),
            'mat_funcs': {}
        }

        # Process special I/O with unified logic (outputs declared first so
        # later steps can consume them as inputs)
        special_outputs = self._process_special(attrs['outputs'], attrs['mat_funcs'], 'output', sid)
        special_inputs = self._process_special(attrs['inputs'], attrs['outputs'], 'input', sid)

        # Handle metadata injection for resolver-backed special inputs
        if isinstance(step, FunctionStep) and any(k in METADATA_RESOLVERS for k in attrs['inputs']):
            step.func = self._inject_metadata(step.func, attrs['inputs'])

        # Generate funcplan (only if needed): maps "funcname_key_pos" -> keys it saves
        funcplan = {}
        if isinstance(step, FunctionStep) and special_outputs:
            for func, dk, pos in normalize_pattern(step.func):
                saves = [k for k in special_outputs if k in getattr(func, '__special_outputs__', set())]
                if saves:
                    funcplan[f"{func.__name__}_{dk}_{pos}"] = saves

        # Handle optional materialization and input conversion.
        # Read step_materialization_config directly from the step object
        # (step plans aren't populated yet at this point).
        materialized_output_dir = None
        if step.step_materialization_config:
            # Check if this step has axis (well) filters and whether the
            # current axis value should be materialized
            step_axis_filters = getattr(self.ctx, 'step_axis_filters', {}).get(sid, {})
            materialization_filter = step_axis_filters.get('step_materialization_config')

            if materialization_filter:
                # Inline simple conditional logic for axis filtering
                from openhcs.core.config import WellFilterMode
                axis_in_filter = self.ctx.axis_id in materialization_filter['resolved_axis_values']
                should_materialize = (
                    axis_in_filter if materialization_filter['filter_mode'] == WellFilterMode.INCLUDE
                    else not axis_in_filter
                )

                if should_materialize:
                    materialized_output_dir = self._build_output_path(step.step_materialization_config)
                else:
                    logger.debug(f"Skipping materialization for step {step.name}, axis {self.ctx.axis_id} (filtered out)")
            else:
                # No axis filter - create materialization path as normal
                materialized_output_dir = self._build_output_path(step.step_materialization_config)

        input_conversion_dir = self._get_optional_path("input_conversion_config", sid)

        # Calculate main pipeline plate root for this step
        main_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

        # Single update with everything every step needs
        self.plans[sid].update({
            'input_dir': str(input_dir),
            'output_dir': str(output_dir),
            'output_plate_root': str(main_plate_root),
            'sub_dir': self.cfg.sub_dir,  # Store resolved sub_dir for main pipeline
            'pipeline_position': i,
            'input_source': self._get_input_source(step, i),
            'special_inputs': special_inputs,
            'special_outputs': special_outputs,
            'funcplan': funcplan,
        })

        # Add optional paths if configured
        if materialized_output_dir:
            # Per-step materialization uses its own config to determine plate root.
            # NOTE(review): is_per_step_materialization=False is passed even though
            # this IS per-step materialization — confirm intended vs. True.
            materialized_plate_root = self.build_output_plate_root(self.plate_path, step.step_materialization_config, is_per_step_materialization=False)
            self.plans[sid].update({
                'materialized_output_dir': str(materialized_output_dir),
                'materialized_plate_root': str(materialized_plate_root),
                'materialized_sub_dir': step.step_materialization_config.sub_dir,  # Store resolved sub_dir for materialization
                'materialized_backend': self.vfs.materialization_backend.value,
                'materialization_config': step.step_materialization_config  # Store config for well filtering (will be resolved by compiler)
            })
        if input_conversion_dir:
            self.plans[sid].update({
                'input_conversion_dir': str(input_conversion_dir),
                'input_conversion_backend': self.vfs.materialization_backend.value
            })

        # Steps reading from the pipeline start must read from the materialization backend
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            self.plans[sid][READ_BACKEND] = self.vfs.materialization_backend.value

        # If zarr conversion occurred, redirect input_dir to the zarr store
        if self.vfs.materialization_backend == MaterializationBackend.ZARR and pipeline:
            first_step_plan = self.plans.get(0, {})  # First step holds the conversion dir, if any
            if "input_conversion_dir" in first_step_plan:
                self.plans[sid]['input_dir'] = first_step_plan['input_conversion_dir']

    def _get_dir(self, step: AbstractStep, i: int, pipeline: List,
                 dir_type: str, fallback: Optional[Path] = None) -> Path:
        """Resolve the input or output directory for step *i*.

        Precedence: existing plan override, then a ``__{dir_type}_dir__``
        attribute on the step, then type-specific chaining (input follows the
        previous step's output; output works in place via *fallback* unless
        the step starts a new chain).
        """
        sid = i  # Step plans are keyed by pipeline index

        # Check overrides (same precedence for input and output)
        if override := self.plans.get(sid, {}).get(f'{dir_type}_dir'):
            return Path(override)
        if override := getattr(step, f'__{dir_type}_dir__', None):
            return Path(override)

        # Type-specific logic
        if dir_type == 'input':
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self.initial_input
            prev_step_index = i - 1  # Chain from the previous step's output
            return Path(self.plans[prev_step_index]['output_dir'])
        else:  # output
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self._build_output_path()
            return fallback  # Work in place

    @staticmethod
    def build_output_plate_root(plate_path: Path, path_config, is_per_step_materialization: bool = False) -> Path:
        """Build output plate root directory directly from configuration components.

        Formula:
        - If output_dir_suffix is empty and NOT per-step materialization: use main pipeline output directory
        - If output_dir_suffix is empty and IS per-step materialization: use plate_path directly
        - Otherwise: (global_output_folder OR plate_path.parent) + plate_name + output_dir_suffix

        Args:
            plate_path: Path to the original plate directory
            path_config: PathPlanningConfig with global_output_folder and output_dir_suffix
            is_per_step_materialization: True if this is per-step materialization (no auto suffix)

        Returns:
            Path to plate root directory (e.g., "/data/results/plate001_processed")
        """
        base = Path(path_config.global_output_folder) if path_config.global_output_folder else plate_path.parent

        # Handle empty suffix differently for per-step vs pipeline-level materialization.
        # NOTE(review): both branches currently return the same value, so the
        # is_per_step_materialization flag has no effect here — confirm whether
        # divergent behavior was intended.
        if not path_config.output_dir_suffix:
            if is_per_step_materialization:
                # Per-step materialization: use exact path without automatic suffix
                return base / plate_path.name
            else:
                # Pipeline-level materialization: trust lazy inheritance system
                return base / plate_path.name

        result = base / f"{plate_path.name}{path_config.output_dir_suffix}"
        return result

    def _build_output_path(self, path_config=None) -> Path:
        """Build a complete output path: plate_root + sub_dir (defaults to self.cfg)."""
        config = path_config or self.cfg

        # Use the config's own output_dir_suffix to determine plate root
        plate_root = self.build_output_plate_root(self.plate_path, config, is_per_step_materialization=False)
        return plate_root / config.sub_dir

    def _calculate_materialized_output_path(self, materialization_config) -> Path:
        """Calculate materialized output path using a custom PathPlanningConfig."""
        return self._build_output_path(materialization_config)

    def _calculate_input_conversion_path(self, conversion_config) -> Path:
        """Calculate input conversion path using a custom PathPlanningConfig."""
        return self._build_output_path(conversion_config)

    def _get_optional_path(self, config_key: str, step_index: int) -> Optional[Path]:
        """Return the output path for *config_key* if present in the step plan, else None."""
        if config_key in self.plans[step_index]:
            config = self.plans[step_index][config_key]
            return self._build_output_path(config)
        return None

    def _process_special(self, items: Any, extra: Any, io_type: str, sid: int) -> Dict:
        """Unified special I/O processing for one step.

        For ``io_type == 'output'``, *extra* is the materialization-function
        map; each output key is assigned a file path under the results dir and
        recorded in ``self.declared``. For ``io_type == 'input'``, *extra* is
        the step's own output keys (self-fulfilling inputs); unknown keys that
        no resolver can supply raise ValueError.
        """
        result = {}

        if io_type == 'output' and items:  # Special outputs
            results_path = self._get_results_path()
            for key in sorted(items):  # sorted for deterministic ordering
                filename = PipelinePathPlanner._build_axis_filename(self.ctx.axis_id, key)
                path = results_path / filename
                result[key] = {
                    'path': str(path),
                    'materialization_function': extra.get(key)  # extra is mat_funcs
                }
                self.declared[key] = str(path)

        elif io_type == 'input' and items:  # Special inputs
            for key in sorted(items.keys() if isinstance(items, dict) else items):
                if key in self.declared:
                    result[key] = {'path': self.declared[key], 'source_step_id': 'prev'}
                elif key in extra:  # extra is outputs (self-fulfilling)
                    result[key] = {'path': 'self', 'source_step_id': sid}
                elif key not in METADATA_RESOLVERS:
                    raise ValueError(f"Step {sid} needs '{key}' but it's not available")

        return result

    def _inject_metadata(self, pattern: Any, inputs: Dict) -> Any:
        """Inject resolver-provided metadata values for special inputs not produced upstream."""
        for key in inputs:
            if key in METADATA_RESOLVERS and key not in self.declared:
                value = METADATA_RESOLVERS[key]["resolver"](self.ctx)
                pattern = self._inject_into_pattern(pattern, key, value)
        return pattern

    def _inject_into_pattern(self, pattern: Any, key: str, value: Any) -> Any:
        """Inject *value* as kwarg *key* into a callable / (callable, kwargs) / 1-item list pattern."""
        if callable(pattern):
            return (pattern, {key: value})
        if isinstance(pattern, tuple) and len(pattern) == 2:
            return (pattern[0], {**pattern[1], key: value})
        if isinstance(pattern, list) and len(pattern) == 1:
            return [self._inject_into_pattern(pattern[0], key, value)]
        raise ValueError(f"Cannot inject into pattern type: {type(pattern)}")

    def _normalize_attr(self, attr: Any, target_type: type) -> Any:
        """Normalize a step attribute to *target_type* (set of keys or dict of key->True)."""
        if target_type == set:
            return {attr} if isinstance(attr, str) else set(attr) if isinstance(attr, (list, set)) else set()
        else:  # dict
            return {attr: True} if isinstance(attr, str) else {k: True for k in attr} if isinstance(attr, list) else attr if isinstance(attr, dict) else {}

    def _get_input_source(self, step: AbstractStep, i: int) -> str:
        """Return 'PIPELINE_START' or 'PREVIOUS_STEP' for the step plan."""
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            return 'PIPELINE_START'
        return 'PREVIOUS_STEP'

    def _get_results_path(self) -> Path:
        """Get the special-results path from global pipeline configuration.

        Relative configured paths are anchored at the plate path.

        Raises:
            RuntimeError: if the global config does not expose
                materialization_results_path.
        """
        try:
            # Access materialization_results_path from global config, not path planning config
            path = self.ctx.global_config.materialization_results_path
            return Path(path) if Path(path).is_absolute() else self.plate_path / path
        except AttributeError as e:
            # Fallback with clear error message if global config is unavailable
            raise RuntimeError(f"Cannot access global config for materialization_results_path: {e}") from e

    def _validate(self, pipeline: List) -> None:
        """Validate step connectivity, then materialization path collisions."""
        # Connectivity: each step's input must be the previous step's output,
        # unless the step restarts from pipeline start or is fed by a special input
        for i in range(1, len(pipeline)):
            curr, prev = pipeline[i], pipeline[i-1]
            if getattr(curr, 'input_source', None) == InputSource.PIPELINE_START:
                continue
            curr_in = self.plans[i]['input_dir']
            prev_out = self.plans[i-1]['output_dir']
            if curr_in != prev_out:
                has_special = any(inp.get('source_step_id') in [i-1, 'prev']  # Check both step index and 'prev'
                                  for inp in self.plans[i].get('special_inputs', {}).values())
                if not has_special:
                    raise ValueError(f"Disconnect: {prev.name} -> {curr.name}")

        # Materialization path collision validation
        self._validate_materialization_paths(pipeline)

    def _validate_materialization_paths(self, pipeline: List[AbstractStep]) -> None:
        """Validate and resolve materialization path collisions with symmetric conflict resolution."""
        global_path = self._build_output_path(self.cfg)

        # Collect all materialization steps with their paths and positions
        mat_steps = [
            (step, self.plans.get(i, {}).get('pipeline_position', 0), self._build_output_path(step.step_materialization_config))
            for i, step in enumerate(pipeline) if step.step_materialization_config
        ]

        # Group by path for conflict detection
        from collections import defaultdict
        path_groups = defaultdict(list)
        for step, pos, path in mat_steps:
            if path == global_path:
                # Collides with the main pipeline flow — resolve immediately
                self._resolve_and_update_paths(step, pos, path, "main flow")
            else:
                path_groups[str(path)].append((step, pos, path))

        # Resolve materialization vs materialization conflicts
        for path_key, step_list in path_groups.items():
            if len(step_list) > 1:
                print(f"⚠️ Materialization path collision detected for {len(step_list)} steps at: {path_key}")
                for step, pos, path in step_list:
                    self._resolve_and_update_paths(step, pos, path, f"pos {pos}")

    def _resolve_and_update_paths(self, step: AbstractStep, position: int, original_path: Path, conflict_type: str) -> None:
        """Resolve a path conflict by suffixing the step's sub_dir with its position."""
        # Lazy configs are already resolved via config_context() in the compiler,
        # so the config can be read and replaced directly
        materialization_config = step.step_materialization_config

        # Generate a unique sub_dir name instead of calculating from paths
        original_sub_dir = materialization_config.sub_dir
        print(f"🔍 PATH_COLLISION DEBUG: step '{step.name}' original_sub_dir = '{original_sub_dir}' (type: {type(materialization_config).__name__})")
        new_sub_dir = f"{original_sub_dir}_step{position}"

        # Update step materialization config with new sub_dir (dataclass copy-on-write)
        from dataclasses import replace
        step.step_materialization_config = replace(materialization_config, sub_dir=new_sub_dir)

        # Recalculate the resolved path using the updated config
        resolved_path = self._build_output_path(step.step_materialization_config)

        # Update step plans for metadata generation
        if step_plan := self.plans.get(position):  # Plans are keyed by step index (== position)
            if 'materialized_output_dir' in step_plan:
                step_plan['materialized_output_dir'] = str(resolved_path)
                step_plan['materialized_sub_dir'] = new_sub_dir  # Update stored sub_dir
        print(f"  - step '{step.name}' ({conflict_type}) → {resolved_path}")

399 

400 

401 

402# ===== PUBLIC API ===== 

403 

class PipelinePathPlanner:
    """Public facade preserving the original path-planning interface."""

    @staticmethod
    def prepare_pipeline_paths(context: ProcessingContext,
                               pipeline_definition: List[AbstractStep],
                               pipeline_config) -> Dict:
        """Plan all step paths for *pipeline_definition* and return the step plans."""
        planner = PathPlanner(context, pipeline_config)
        return planner.plan(pipeline_definition)

    @staticmethod
    def _build_axis_filename(axis_id: str, key: str, extension: str = "pkl") -> str:
        """Return the standardized '<axis_id>_<key>.<extension>' filename."""
        return f"{axis_id}_{key}.{extension}"

418 

419 

420 

421 

422# ===== METADATA ===== 

423 

# Registry of metadata resolvers: key -> {"resolver": callable(context) -> value,
# "description": human-readable explanation}.
METADATA_RESOLVERS = {
    "grid_dimensions": {
        "resolver": lambda context: context.microscope_handler.get_grid_dimensions(context.plate_path),
        "description": "Grid dimensions (num_rows, num_cols) for position generation functions"
    },
}


def resolve_metadata(key: str, context) -> Any:
    """Resolve the metadata value registered under *key* using *context*.

    Raises:
        ValueError: if no resolver is registered for *key*.
    """
    entry = METADATA_RESOLVERS.get(key)
    if entry is None:
        raise ValueError(f"No resolver for '{key}'")
    return entry["resolver"](context)


def register_metadata_resolver(key: str, resolver: Callable, description: str):
    """Register (or replace) the metadata resolver for *key*."""
    METADATA_RESOLVERS[key] = {"resolver": resolver, "description": description}

443 

444 

445# ===== SCOPE PROMOTION (separate concern) ===== 

446 

447def _apply_scope_promotion_rules(dict_pattern, special_outputs, declared_outputs, step_index, position): 

448 """Scope promotion for single-key dict patterns - 15 lines.""" 

449 if len(dict_pattern) != 1: 

450 return special_outputs, declared_outputs 

451 

452 key_prefix = f"{list(dict_pattern.keys())[0]}_0_" 

453 promoted_out, promoted_decl = special_outputs.copy(), declared_outputs.copy() 

454 

455 for out_key in list(special_outputs.keys()): 

456 if out_key.startswith(key_prefix): 

457 promoted_key = out_key[len(key_prefix):] 

458 if promoted_key in promoted_decl: 

459 raise ValueError(f"Collision: {promoted_key} already exists") 

460 promoted_out[promoted_key] = special_outputs[out_key] 

461 promoted_decl[promoted_key] = { 

462 "step_index": step_index, "position": position, 

463 "path": special_outputs[out_key]["path"] 

464 } 

465 

466 return promoted_out, promoted_decl