Coverage for openhcs/core/pipeline/path_planner.py: 77.5%

234 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Pipeline path planning - actually reduced duplication. 

3 

4This version ACTUALLY eliminates duplication instead of adding abstraction theater. 

5""" 

6 

7import logging 

8from dataclasses import dataclass 

9from pathlib import Path 

10from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple 

11 

12from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend 

13from openhcs.constants.input_source import InputSource 

14from openhcs.core.config import MaterializationBackend 

15from openhcs.core.context.processing_context import ProcessingContext 

16from openhcs.core.pipeline.pipeline_utils import get_core_callable 

17from openhcs.core.steps.abstract import AbstractStep 

18from openhcs.core.steps.function_step import FunctionStep 

19 

20logger = logging.getLogger(__name__) 



# ===== PATTERN NORMALIZATION (ONE place) =====

def normalize_pattern(pattern: Any) -> Iterator[Tuple[Callable, str, int]]:
    """THE single pattern normalizer - 15 lines, no duplication."""
    if isinstance(pattern, dict):  # coverage: condition never true in tests
        for key, value in pattern.items():
            for pos, func in enumerate(value if isinstance(value, list) else [value]):
                if callable_func := get_core_callable(func):
                    yield (callable_func, key, pos)
    elif isinstance(pattern, list):
        for pos, func in enumerate(pattern):
            if callable_func := get_core_callable(func):  # coverage: condition always true in tests
                yield (callable_func, "default", pos)
    elif callable_func := get_core_callable(pattern):  # coverage: condition always true in tests
        yield (callable_func, "default", 0)
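
# Illustrative example (hypothetical functions; assumes get_core_callable
# returns plain callables unchanged):
#
#     pattern = {"dapi": [blur, segment], "gfp": measure}
#     list(normalize_pattern(pattern))
#     # -> [(blur, "dapi", 0), (segment, "dapi", 1), (measure, "gfp", 0)]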



def extract_attributes(pattern: Any) -> Dict[str, Any]:
    """Extract all function attributes in one pass - 10 lines."""
    outputs, inputs, mat_funcs = set(), {}, {}
    for func, _, _ in normalize_pattern(pattern):
        outputs.update(getattr(func, '__special_outputs__', set()))
        inputs.update(getattr(func, '__special_inputs__', {}))
        mat_funcs.update(getattr(func, '__materialization_functions__', {}))
    return {'outputs': outputs, 'inputs': inputs, 'mat_funcs': mat_funcs}
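
# Illustrative example (hypothetical function; the dunder attributes are
# normally set by OpenHCS decorators, and get_core_callable is assumed to
# return plain functions unchanged):
#
#     def count_cells(stack): ...
#     count_cells.__special_outputs__ = {"cell_counts"}
#     extract_attributes([count_cells])
#     # -> {'outputs': {'cell_counts'}, 'inputs': {}, 'mat_funcs': {}}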



# ===== PATH PLANNING (NO duplication) =====

class PathPlanner:
    """Minimal path planner with zero duplication."""

    def __init__(self, context: ProcessingContext):
        self.ctx = context
        self.cfg = context.get_path_planning_config()
        self.vfs = context.get_vfs_config()
        self.plans = context.step_plans
        self.declared = {}  # Tracks special outputs

        # Initial input determination (once)
        self.initial_input = Path(context.input_dir)
        self.plate_path = Path(context.plate_path)

    def plan(self, pipeline: List[AbstractStep]) -> Dict:
        """Plan all paths with zero duplication."""

        for i, step in enumerate(pipeline):
            self._plan_step(step, i, pipeline)

        self._validate(pipeline)

        # Set output_plate_root and sub_dir for metadata writing
        if pipeline:  # coverage: condition always true in tests
            self.ctx.output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)
            self.ctx.sub_dir = self.cfg.sub_dir

        return self.plans

    def _plan_step(self, step: AbstractStep, i: int, pipeline: List):
        """Plan one step - no duplicate logic."""
        sid = step.step_id

        # Get paths with unified logic
        input_dir = self._get_dir(step, i, pipeline, 'input')
        output_dir = self._get_dir(step, i, pipeline, 'output', input_dir)

        # Extract function data if FunctionStep
        attrs = extract_attributes(step.func) if isinstance(step, FunctionStep) else {
            'outputs': self._normalize_attr(getattr(step, 'special_outputs', set()), set),
            'inputs': self._normalize_attr(getattr(step, 'special_inputs', {}), dict),
            'mat_funcs': {}
        }

        # Process special I/O with unified logic
        special_outputs = self._process_special(attrs['outputs'], attrs['mat_funcs'], 'output', sid)
        special_inputs = self._process_special(attrs['inputs'], attrs['outputs'], 'input', sid)

        # Handle metadata injection
        if isinstance(step, FunctionStep) and any(k in METADATA_RESOLVERS for k in attrs['inputs']):
            step.func = self._inject_metadata(step.func, attrs['inputs'])

        # Generate funcplan (only if needed)
        funcplan = {}
        if isinstance(step, FunctionStep) and special_outputs:
            for func, dk, pos in normalize_pattern(step.func):
                saves = [k for k in special_outputs if k in getattr(func, '__special_outputs__', set())]
                if saves:  # coverage: condition always true in tests
                    funcplan[f"{func.__name__}_{dk}_{pos}"] = saves
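
        # funcplan keys look like "<func_name>_<dict_key>_<position>", matching
        # the (callable, key, pos) triples produced by normalize_pattern.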


        # Handle optional materialization and input conversion
        # Read materialization_config directly from step object (not step plans, which aren't populated yet)
        materialized_output_dir = None
        if step.materialization_config:
            # Check if this step has well filters and if current well should be materialized
            step_well_filter = getattr(self.ctx, 'step_well_filters', {}).get(sid)

            if step_well_filter:  # coverage: condition always true in tests
                # Inline simple conditional logic for well filtering
                from openhcs.core.config import WellFilterMode
                well_in_filter = self.ctx.well_id in step_well_filter['resolved_wells']
                should_materialize = (
                    well_in_filter if step_well_filter['filter_mode'] == WellFilterMode.INCLUDE
                    else not well_in_filter
                )
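                # INCLUDE mode materializes wells inside the filter; EXCLUDE
                # mode materializes the complement.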


                if should_materialize:
                    materialized_output_dir = self._build_output_path(step.materialization_config)
                else:
                    logger.debug(f"Skipping materialization for step {step.name}, well {self.ctx.well_id} (filtered out)")
            else:
                # No well filter - create materialization path as normal
                materialized_output_dir = self._build_output_path(step.materialization_config)

        input_conversion_dir = self._get_optional_path("input_conversion_config", sid)

        # Calculate main pipeline plate root for this step
        main_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

        # Single update
        self.plans[sid].update({
            'input_dir': str(input_dir),
            'output_dir': str(output_dir),
            'output_plate_root': str(main_plate_root),
            'sub_dir': self.cfg.sub_dir,  # Store resolved sub_dir for main pipeline
            'pipeline_position': i,
            'input_source': self._get_input_source(step, i),
            'special_inputs': special_inputs,
            'special_outputs': special_outputs,
            'funcplan': funcplan,
        })

        # Add optional paths if configured
        if materialized_output_dir:
            # Per-step materialization uses its own config to determine plate root
            materialized_plate_root = self.build_output_plate_root(self.plate_path, step.materialization_config, is_per_step_materialization=False)
            self.plans[sid].update({
                'materialized_output_dir': str(materialized_output_dir),
                'materialized_plate_root': str(materialized_plate_root),
                'materialized_sub_dir': step.materialization_config.sub_dir,  # Store resolved sub_dir for materialization
                'materialized_backend': self.vfs.materialization_backend.value,
                'materialization_config': step.materialization_config  # Store config for well filtering (will be resolved by compiler)
            })
        if input_conversion_dir:
            self.plans[sid].update({
                'input_conversion_dir': str(input_conversion_dir),
                'input_conversion_backend': self.vfs.materialization_backend.value
            })

        # Set backend if needed
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            self.plans[sid][READ_BACKEND] = self.vfs.materialization_backend.value

        # If zarr conversion occurred, redirect input_dir to zarr store
        if self.vfs.materialization_backend == MaterializationBackend.ZARR and pipeline:
            first_step_plan = self.plans.get(pipeline[0].step_id, {})
            if "input_conversion_dir" in first_step_plan:  # coverage: condition always true in tests
                self.plans[sid]['input_dir'] = first_step_plan['input_conversion_dir']

    def _get_dir(self, step: AbstractStep, i: int, pipeline: List,
                 dir_type: str, fallback: Path = None) -> Path:
        """Unified directory resolution - no duplication."""

        sid = step.step_id

        # Check overrides (same for input/output)
        if override := self.plans.get(sid, {}).get(f'{dir_type}_dir'):  # coverage: condition never true in tests
            return Path(override)
        if override := getattr(step, f'__{dir_type}_dir__', None):  # coverage: condition never true in tests
            return Path(override)

        # Type-specific logic
        if dir_type == 'input':
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self.initial_input
            prev_sid = pipeline[i-1].step_id
            return Path(self.plans[prev_sid]['output_dir'])
        else:  # output
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self._build_output_path()
            return fallback  # Work in place


    @staticmethod
    def build_output_plate_root(plate_path: Path, path_config, is_per_step_materialization: bool = False) -> Path:
        """Build output plate root directory directly from configuration components.

        Formula:
        - If output_dir_suffix is empty and NOT per-step materialization: use main pipeline output directory
        - If output_dir_suffix is empty and IS per-step materialization: use plate_path directly
        - Otherwise: (global_output_folder OR plate_path.parent) + plate_name + output_dir_suffix

        Args:
            plate_path: Path to the original plate directory
            path_config: PathPlanningConfig with global_output_folder and output_dir_suffix
            is_per_step_materialization: True if this is per-step materialization (no auto suffix)

        Returns:
            Path to plate root directory (e.g., "/data/results/plate001_processed")
        """
        base = Path(path_config.global_output_folder) if path_config.global_output_folder else plate_path.parent

        # Handle empty suffix differently for per-step vs pipeline-level materialization
        if not path_config.output_dir_suffix:
            if is_per_step_materialization:  # coverage: condition never true in tests
                # Per-step materialization: use exact path without automatic suffix
                return base / plate_path.name
            else:
                # Pipeline-level materialization: use main pipeline output directory
                main_output_path = base / f"{plate_path.name}_outputs"
                return main_output_path

        return base / f"{plate_path.name}{path_config.output_dir_suffix}"
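
    # Worked example (hypothetical paths): plate_path=/data/plate001 with
    # global_output_folder=/data/results and suffix "_processed" yields
    # /data/results/plate001_processed; with an empty suffix the
    # pipeline-level root becomes /data/results/plate001_outputs.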


    def _build_output_path(self, path_config=None) -> Path:
        """Build complete output path: plate_root + sub_dir"""
        config = path_config or self.cfg

        # Use the config's own output_dir_suffix to determine plate root
        plate_root = self.build_output_plate_root(self.plate_path, config, is_per_step_materialization=False)
        return plate_root / config.sub_dir

    def _calculate_materialized_output_path(self, materialization_config) -> Path:
        """Calculate materialized output path using custom PathPlanningConfig."""
        return self._build_output_path(materialization_config)

    def _calculate_input_conversion_path(self, conversion_config) -> Path:
        """Calculate input conversion path using custom PathPlanningConfig."""
        return self._build_output_path(conversion_config)

    def _get_optional_path(self, config_key: str, step_id: str) -> Optional[Path]:
        """Get optional path if config exists."""
        if config_key in self.plans[step_id]:
            config = self.plans[step_id][config_key]
            return self._build_output_path(config)
        return None

    def _process_special(self, items: Any, extra: Any, io_type: str, sid: str) -> Dict:
        """Unified special I/O processing - no duplication."""
        result = {}

        if io_type == 'output' and items:  # Special outputs
            results_path = self._get_results_path()
            for key in sorted(items):
                filename = PipelinePathPlanner._build_well_filename(self.ctx.well_id, key)
                path = results_path / filename
                result[key] = {
                    'path': str(path),
                    'materialization_function': extra.get(key)  # extra is mat_funcs
                }
                self.declared[key] = str(path)

        elif io_type == 'input' and items:  # Special inputs
            for key in sorted(items.keys() if isinstance(items, dict) else items):
                if key in self.declared:
                    result[key] = {'path': self.declared[key], 'source_step_id': 'prev'}
                elif key in extra:  # extra is outputs (self-fulfilling); coverage: never true in tests
                    result[key] = {'path': 'self', 'source_step_id': sid}
                elif key not in METADATA_RESOLVERS:  # coverage: condition never true in tests
                    raise ValueError(f"Step {sid} needs '{key}' but it's not available")

        return result
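
    # Result shape (illustrative, for an output key "cell_counts" of well "A01";
    # the default filename extension is "pkl" per _build_well_filename):
    #     {'cell_counts': {'path': '<results_path>/A01_cell_counts.pkl',
    #                      'materialization_function': <callable or None>}}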


    def _inject_metadata(self, pattern: Any, inputs: Dict) -> Any:
        """Inject metadata for special inputs."""
        for key in inputs:
            if key in METADATA_RESOLVERS and key not in self.declared:  # coverage: condition always true in tests
                value = METADATA_RESOLVERS[key]["resolver"](self.ctx)
                pattern = self._inject_into_pattern(pattern, key, value)
        return pattern

    def _inject_into_pattern(self, pattern: Any, key: str, value: Any) -> Any:
        """Inject value into pattern - handles all cases in 6 lines."""
        if callable(pattern):
            return (pattern, {key: value})
        if isinstance(pattern, tuple) and len(pattern) == 2:  # coverage: condition always true in tests
            return (pattern[0], {**pattern[1], key: value})
        if isinstance(pattern, list) and len(pattern) == 1:
            return [self._inject_into_pattern(pattern[0], key, value)]
        raise ValueError(f"Cannot inject into pattern type: {type(pattern)}")
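
    # e.g. _inject_into_pattern(func, "grid_dimensions", (8, 12)) returns
    # (func, {"grid_dimensions": (8, 12)}); an existing (func, kwargs) tuple
    # gains the key in its kwargs dict.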


    def _normalize_attr(self, attr: Any, target_type: type) -> Any:
        """Normalize step attributes - 5 lines, no duplication."""
        if target_type == set:
            return {attr} if isinstance(attr, str) else set(attr) if isinstance(attr, (list, set)) else set()
        else:  # dict
            return {attr: True} if isinstance(attr, str) else {k: True for k in attr} if isinstance(attr, list) else attr if isinstance(attr, dict) else {}
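
    # e.g. _normalize_attr("mask", set) -> {"mask"};
    #      _normalize_attr(["a", "b"], dict) -> {"a": True, "b": True}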


    def _get_input_source(self, step: AbstractStep, i: int) -> str:
        """Get input source string."""
        if getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
            return 'PIPELINE_START'
        return 'PREVIOUS_STEP'

    def _get_results_path(self) -> Path:
        """Get results path from global pipeline configuration."""
        try:
            # Access materialization_results_path from global config, not path planning config
            path = self.ctx.global_config.materialization_results_path
            return Path(path) if Path(path).is_absolute() else self.plate_path / path
        except AttributeError as e:
            # Fail with a clear error message if global config is unavailable
            raise RuntimeError(f"Cannot access global config for materialization_results_path: {e}") from e

    def _validate(self, pipeline: List):
        """Validate connectivity and materialization paths - no duplication."""
        # Connectivity validation
        for i in range(1, len(pipeline)):
            curr, prev = pipeline[i], pipeline[i-1]
            if getattr(curr, 'input_source', None) == InputSource.PIPELINE_START:
                continue
            curr_in = self.plans[curr.step_id]['input_dir']
            prev_out = self.plans[prev.step_id]['output_dir']
            if curr_in != prev_out:  # coverage: condition never true in tests
                has_special = any(inp.get('source_step_id') == prev.step_id
                                  for inp in self.plans[curr.step_id].get('special_inputs', {}).values())
                if not has_special:
                    raise ValueError(f"Disconnect: {prev.name} -> {curr.name}")

        # Materialization path collision validation
        self._validate_materialization_paths(pipeline)


    def _validate_materialization_paths(self, pipeline: List[AbstractStep]) -> None:
        """Validate and resolve materialization path collisions with symmetric conflict resolution."""
        global_path = self._build_output_path(self.cfg)

        # Collect all materialization steps with their paths and positions
        mat_steps = [
            (step, self.plans.get(step.step_id, {}).get('pipeline_position', 0), self._build_output_path(step.materialization_config))
            for step in pipeline if step.materialization_config
        ]

        # Group by path for conflict detection
        from collections import defaultdict
        path_groups = defaultdict(list)
        for step, pos, path in mat_steps:
            if path == global_path:  # coverage: condition never true in tests
                self._resolve_and_update_paths(step, pos, path, "main flow")
            else:
                path_groups[str(path)].append((step, pos, path))

        # Resolve materialization vs materialization conflicts
        for path_key, step_list in path_groups.items():
            if len(step_list) > 1:
                logger.warning(f"Materialization path collision detected for {len(step_list)} steps at: {path_key}")
                for step, pos, path in step_list:
                    self._resolve_and_update_paths(step, pos, path, f"pos {pos}")

    def _resolve_and_update_paths(self, step: AbstractStep, position: int, original_path: Path, conflict_type: str) -> None:
        """Resolve path conflict by updating sub_dir configuration directly."""
        # Generate unique sub_dir name instead of calculating from paths
        original_sub_dir = step.materialization_config.sub_dir
        new_sub_dir = f"{original_sub_dir}_step{position}"
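        # e.g. sub_dir "checkpoints" at pipeline position 3 -> "checkpoints_step3"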


        # Update step materialization config with new sub_dir
        config_class = type(step.materialization_config)
        step.materialization_config = config_class(**{**step.materialization_config.__dict__, 'sub_dir': new_sub_dir})

        # Recalculate the resolved path using the new sub_dir
        resolved_path = self._build_output_path(step.materialization_config)

        # Update step plans for metadata generation
        if step_plan := self.plans.get(step.step_id):  # coverage: condition always true in tests
            if 'materialized_output_dir' in step_plan:  # coverage: condition always true in tests
                step_plan['materialized_output_dir'] = str(resolved_path)
                step_plan['materialized_sub_dir'] = new_sub_dir  # Update stored sub_dir

        logger.info(f" - step '{step.name}' ({conflict_type}) → {resolved_path}")



# ===== PUBLIC API =====

class PipelinePathPlanner:
    """Public API matching original interface."""

    @staticmethod
    def prepare_pipeline_paths(context: ProcessingContext,
                               pipeline_definition: List[AbstractStep]) -> Dict:
        """Prepare pipeline paths."""
        return PathPlanner(context).plan(pipeline_definition)
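
    # Usage sketch (assuming a prepared ProcessingContext `context` and a list
    # of AbstractStep instances `steps`):
    #
    #     step_plans = PipelinePathPlanner.prepare_pipeline_paths(context, steps)
    #     step_plans[steps[0].step_id]['output_dir']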


    @staticmethod
    def _build_well_filename(well_id: str, key: str, extension: str = "pkl") -> str:
        """Build standardized well-based filename."""
        return f"{well_id}_{key}.{extension}"


# ===== METADATA =====

METADATA_RESOLVERS = {
    "grid_dimensions": {
        "resolver": lambda context: context.microscope_handler.get_grid_dimensions(context.plate_path),
        "description": "Grid dimensions (num_rows, num_cols) for position generation functions"
    },
}


def resolve_metadata(key: str, context) -> Any:
    """Resolve metadata value."""
    if key not in METADATA_RESOLVERS:
        raise ValueError(f"No resolver for '{key}'")
    return METADATA_RESOLVERS[key]["resolver"](context)


def register_metadata_resolver(key: str, resolver: Callable, description: str):
    """Register metadata resolver."""
    METADATA_RESOLVERS[key] = {"resolver": resolver, "description": description}
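
# Illustrative registration (hypothetical key and resolver; the shipped
# registry contains only "grid_dimensions"):
#
#     register_metadata_resolver(
#         "pixel_size",
#         lambda context: context.microscope_handler.get_pixel_size(context.plate_path),
#         "Physical pixel size for scaling functions",
#     )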



# ===== SCOPE PROMOTION (separate concern) =====

def _apply_scope_promotion_rules(dict_pattern, special_outputs, declared_outputs, step_id, position):
    """Scope promotion for single-key dict patterns - 15 lines."""
    if len(dict_pattern) != 1:
        return special_outputs, declared_outputs

    key_prefix = f"{list(dict_pattern.keys())[0]}_0_"
    promoted_out, promoted_decl = special_outputs.copy(), declared_outputs.copy()

    for out_key in list(special_outputs.keys()):
        if out_key.startswith(key_prefix):
            promoted_key = out_key[len(key_prefix):]
            if promoted_key in promoted_decl:
                raise ValueError(f"Collision: {promoted_key} already exists")
            promoted_out[promoted_key] = special_outputs[out_key]
            promoted_decl[promoted_key] = {
                "step_id": step_id, "position": position,
                "path": special_outputs[out_key]["path"]
            }

    return promoted_out, promoted_decl
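
# e.g. a single-key dict pattern {"dapi": [...]} promotes an output declared
# as "dapi_0_counts" to plain "counts" in both special_outputs and
# declared_outputs, raising on collision with an existing "counts".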