Coverage for openhcs/core/pipeline/path_planner.py: 78.9%

248 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Pipeline path planning - actually reduced duplication. 

3 

4This version ACTUALLY eliminates duplication instead of adding abstraction theater. 

5""" 

6 

7import logging 

8from dataclasses import dataclass 

9from pathlib import Path 

10from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple 

11 

12from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend 

13from openhcs.constants.input_source import InputSource 

14from openhcs.core.config import MaterializationBackend 

15from openhcs.core.context.processing_context import ProcessingContext 

16from openhcs.core.pipeline.pipeline_utils import get_core_callable 

17from openhcs.core.steps.abstract import AbstractStep 

18from openhcs.core.steps.function_step import FunctionStep 

19 

20logger = logging.getLogger(__name__) 

21 

22 

23# ===== PATTERN NORMALIZATION (ONE place) ===== 

24 

def normalize_pattern(pattern: Any) -> Iterator[Tuple[Callable, str, int]]:
    """THE single pattern normalizer - 15 lines, no duplication."""
    if isinstance(pattern, dict):
        for key, value in pattern.items():
            for pos, func in enumerate(value if isinstance(value, list) else [value]):
                if callable_func := get_core_callable(func):
                    yield (callable_func, key, pos)
    elif isinstance(pattern, list):
        for pos, func in enumerate(pattern):
            if callable_func := get_core_callable(func):
                yield (callable_func, "default", pos)
    elif callable_func := get_core_callable(pattern):
        yield (callable_func, "default", 0)

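# Illustrative sketch (not part of the module): normalize_pattern flattens
# every supported pattern shape into (callable, dict_key, position) triples.
# Assuming get_core_callable passes plain callables through unchanged:
#
#   {"1": [f, g]}  ->  (f, "1", 0), (g, "1", 1)       # dict pattern
#   [f, g]         ->  (f, "default", 0), (g, "default", 1)  # list pattern
#   f              ->  (f, "default", 0)               # bare callable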

def extract_attributes(pattern: Any) -> Dict[str, Any]:
    """Extract all function attributes in one pass - 10 lines."""
    outputs, inputs, mat_funcs = set(), {}, {}
    for func, _, _ in normalize_pattern(pattern):
        outputs.update(getattr(func, '__special_outputs__', set()))
        inputs.update(getattr(func, '__special_inputs__', {}))
        mat_funcs.update(getattr(func, '__materialization_functions__', {}))
    return {'outputs': outputs, 'inputs': inputs, 'mat_funcs': mat_funcs}

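# Sketch (assuming get_core_callable passes plain callables through): given
#
#   def f(img): ...
#   f.__special_outputs__ = {'cell_counts'}
#
# extract_attributes([f]) would aggregate across every callable in the
# pattern and return
#
#   {'outputs': {'cell_counts'}, 'inputs': {}, 'mat_funcs': {}}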

# ===== PATH PLANNING (NO duplication) =====

class PathPlanner:
    """Minimal path planner with zero duplication."""

    def __init__(self, context: ProcessingContext, pipeline_config):
        self.ctx = context
        # Access config directly from pipeline_config (lazy resolution happens via config_context)
        self.cfg = pipeline_config.path_planning_config
        self.vfs = pipeline_config.vfs_config
        self.plans = context.step_plans
        self.declared = {}  # Tracks special outputs

        # Initial input determination (once)
        self.initial_input = Path(context.input_dir)
        self.plate_path = Path(context.plate_path)

    def plan(self, pipeline: List[AbstractStep]) -> Dict:
        """Plan all paths with zero duplication."""
        for i, step in enumerate(pipeline):
            self._plan_step(step, i, pipeline)

        self._validate(pipeline)

        # Set output_plate_root and sub_dir for metadata writing
        if pipeline:
            self.ctx.output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)
            self.ctx.sub_dir = self.cfg.sub_dir

        return self.plans

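    # Illustrative usage sketch (objects invented): planning is normally
    # driven through the public wrapper defined at the bottom of this module:
    #
    #   plans = PipelinePathPlanner.prepare_pipeline_paths(ctx, steps, cfg)
    #   plans[0]['input_dir']                            # plate input dir
    #   plans[1]['input_dir'] == plans[0]['output_dir']  # steps chain by index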

    def _plan_step(self, step: AbstractStep, i: int, pipeline: List):
        """Plan one step - no duplicate logic."""
        sid = i  # Use step index instead of step_id

        # Get paths with unified logic
        input_dir = self._get_dir(step, i, pipeline, 'input')
        output_dir = self._get_dir(step, i, pipeline, 'output', input_dir)

        # Extract function data if FunctionStep
        attrs = extract_attributes(step.func) if isinstance(step, FunctionStep) else {
            'outputs': self._normalize_attr(getattr(step, 'special_outputs', set()), set),
            'inputs': self._normalize_attr(getattr(step, 'special_inputs', {}), dict),
            'mat_funcs': {}
        }

        # Process special I/O with unified logic
        special_outputs = self._process_special(attrs['outputs'], attrs['mat_funcs'], 'output', sid)
        special_inputs = self._process_special(attrs['inputs'], attrs['outputs'], 'input', sid)

        # Handle metadata injection
        if isinstance(step, FunctionStep) and any(k in METADATA_RESOLVERS for k in attrs['inputs']):
            step.func = self._inject_metadata(step.func, attrs['inputs'])

        # Generate funcplan (only if needed)
        funcplan = {}
        if isinstance(step, FunctionStep) and special_outputs:
            for func, dk, pos in normalize_pattern(step.func):
                saves = [k for k in special_outputs if k in getattr(func, '__special_outputs__', set())]
                if saves:
                    funcplan[f"{func.__name__}_{dk}_{pos}"] = saves

        # Handle optional materialization and input conversion
        # Read step_materialization_config directly from step object (not step plans, which aren't populated yet)
        materialized_output_dir = None
        if step.step_materialization_config and step.step_materialization_config.enabled:
            # Check if this step has well filters and if current well should be materialized
            step_axis_filters = getattr(self.ctx, 'step_axis_filters', {}).get(sid, {})
            materialization_filter = step_axis_filters.get('step_materialization_config')

            if materialization_filter:
                # Inline simple conditional logic for axis filtering
                from openhcs.core.config import WellFilterMode
                axis_in_filter = self.ctx.axis_id in materialization_filter['resolved_axis_values']
                should_materialize = (
                    axis_in_filter if materialization_filter['filter_mode'] == WellFilterMode.INCLUDE
                    else not axis_in_filter
                )

                if should_materialize:
                    materialized_output_dir = self._build_output_path(step.step_materialization_config)
                else:
                    logger.debug(f"Skipping materialization for step {step.name}, axis {self.ctx.axis_id} (filtered out)")
            else:
                # No axis filter - create materialization path as normal
                materialized_output_dir = self._build_output_path(step.step_materialization_config)

        # Check if input_conversion_dir is already set by compiler (direct path)
        # Otherwise try to calculate from input_conversion_config (legacy)
        if "input_conversion_dir" in self.plans[sid]:
            input_conversion_dir = Path(self.plans[sid]["input_conversion_dir"])
        else:
            input_conversion_dir = self._get_optional_path("input_conversion_config", sid)

        # Calculate main pipeline plate root for this step
        main_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

        # Calculate analysis results directory (sibling to output_dir with _results suffix)
        # This ensures results are saved alongside images at the same hierarchical level
        # Example: images/ -> images_results/, checkpoints_step3/ -> checkpoints_step3_results/
        output_dir_path = Path(output_dir)
        dir_name = output_dir_path.name
        analysis_results_dir = output_dir_path.parent / f"{dir_name}_results"

        # Single update
        self.plans[sid].update({
            'input_dir': str(input_dir),
            'output_dir': str(output_dir),
            'output_plate_root': str(main_plate_root),
            'sub_dir': self.cfg.sub_dir,  # Store resolved sub_dir for main pipeline
            'analysis_results_dir': str(analysis_results_dir),  # Pre-calculated results directory
            'pipeline_position': i,
            'input_source': self._get_input_source(step, i),
            'special_inputs': special_inputs,
            'special_outputs': special_outputs,
            'funcplan': funcplan,
        })

        # Add optional paths if configured
        if materialized_output_dir:
            # Per-step materialization uses its own config to determine plate root
            materialized_plate_root = self.build_output_plate_root(self.plate_path, step.step_materialization_config, is_per_step_materialization=False)

            # Calculate analysis results directory for materialized output
            materialized_dir_path = Path(materialized_output_dir)
            materialized_dir_name = materialized_dir_path.name
            materialized_analysis_results_dir = materialized_dir_path.parent / f"{materialized_dir_name}_results"

            self.plans[sid].update({
                'materialized_output_dir': str(materialized_output_dir),
                'materialized_plate_root': str(materialized_plate_root),
                'materialized_sub_dir': step.step_materialization_config.sub_dir,  # Store resolved sub_dir for materialization
                'materialized_analysis_results_dir': str(materialized_analysis_results_dir),  # Pre-calculated materialized results directory
                'materialized_backend': self.vfs.materialization_backend.value,
                'materialization_config': step.step_materialization_config  # Store config for well filtering (will be resolved by compiler)
            })
        if input_conversion_dir:
            self.plans[sid].update({
                'input_conversion_dir': str(input_conversion_dir),
                'input_conversion_backend': self.vfs.materialization_backend.value
            })

        # PIPELINE_START steps read from original input, not zarr conversion
        # (zarr conversion only applies to normal pipeline flow, not PIPELINE_START jumps)

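    # Hypothetical sketch of one populated plan entry, to show the key layout
    # _plan_step produces (all paths and names invented):
    #
    #   {
    #       'input_dir': '/data/plate001/images',
    #       'output_dir': '/data/plate001_processed/images',
    #       'output_plate_root': '/data/plate001_processed',
    #       'sub_dir': 'images',
    #       'analysis_results_dir': '/data/plate001_processed/images_results',
    #       'pipeline_position': 0,
    #       'input_source': 'PREVIOUS_STEP',
    #       'special_inputs': {},
    #       'special_outputs': {'cell_counts': {'path': '...', 'materialization_function': None}},
    #       'funcplan': {'count_cells_default_0': ['cell_counts']},
    #   }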

    def _get_dir(self, step: AbstractStep, i: int, pipeline: List,
                 dir_type: str, fallback: Path = None) -> Path:
        """Unified directory resolution - no duplication."""
        sid = i  # Use step index instead of step_id

        # Check overrides (same for input/output)
        if override := self.plans.get(sid, {}).get(f'{dir_type}_dir'):
            return Path(override)
        if override := getattr(step, f'__{dir_type}_dir__', None):
            return Path(override)

        # Type-specific logic
        if dir_type == 'input':
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self.initial_input
            prev_step_index = i - 1  # Use previous step index instead of step_id
            return Path(self.plans[prev_step_index]['output_dir'])
        else:  # output
            if i == 0 or getattr(step, 'input_source', None) == InputSource.PIPELINE_START:
                return self._build_output_path()
            return fallback  # Work in place

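    # Resolution order, as implemented above: an explicit plan override wins,
    # then a step-level __input_dir__/__output_dir__ attribute, then the
    # positional rule. For a three-step pipeline with no overrides:
    #
    #   step 0: input = plate input_dir, output = plate_root/sub_dir
    #   step 1: input = step 0 output,   output = same dir (works in place)
    #   step 2: input = step 1 output,   output = same dir (works in place)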

    @staticmethod
    def build_output_plate_root(plate_path: Path, path_config, is_per_step_materialization: bool = False) -> Path:
        """Build output plate root directory directly from configuration components.

        Formula: (global_output_folder OR plate_path.parent) + plate_name + output_dir_suffix

        Results (analysis outputs) should ALWAYS use the output plate path, never the input plate path.
        This ensures metadata coherence - ROIs and other analysis results are saved alongside the
        processed images they were created from, not with the original input images.

        Args:
            plate_path: Path to the original plate directory
            path_config: PathPlanningConfig with global_output_folder and output_dir_suffix
            is_per_step_materialization: Unused (kept for API compatibility)

        Returns:
            Path to plate root directory (e.g., "/data/results/plate001_processed")
        """
        # OMERO paths always use /omero as base, ignore global_output_folder
        if str(plate_path).startswith("/omero/"):
            base = plate_path.parent
        elif path_config.global_output_folder:
            base = Path(path_config.global_output_folder)
        else:
            base = plate_path.parent

        # Always append suffix to create output plate path
        # If suffix is None/empty, fail loud - this is a configuration error
        if not path_config.output_dir_suffix:
            raise ValueError(
                f"output_dir_suffix cannot be None or empty. "
                f"Results must always use output plate path, not input plate path. "
                f"Config: {path_config}"
            )

        result = base / f"{plate_path.name}{path_config.output_dir_suffix}"
        return result

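    # Worked example of the formula above (invented paths), with
    # output_dir_suffix = "_processed":
    #
    #   plate_path = /data/plates/plate001, no global_output_folder
    #       -> /data/plates/plate001_processed
    #   plate_path = /data/plates/plate001, global_output_folder = /data/results
    #       -> /data/results/plate001_processed
    #   plate_path = /omero/plate001 (base forced to plate parent)
    #       -> /omero/plate001_processed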

    def _build_output_path(self, path_config=None) -> Path:
        """Build complete output path: plate_root + sub_dir."""
        config = path_config or self.cfg

        # Use the config's own output_dir_suffix to determine plate root
        plate_root = self.build_output_plate_root(self.plate_path, config, is_per_step_materialization=False)
        return plate_root / config.sub_dir

    def _calculate_materialized_output_path(self, materialization_config) -> Path:
        """Calculate materialized output path using custom PathPlanningConfig."""
        return self._build_output_path(materialization_config)

    def _calculate_input_conversion_path(self, conversion_config) -> Path:
        """Calculate input conversion path using custom PathPlanningConfig."""
        return self._build_output_path(conversion_config)

    def _get_optional_path(self, config_key: str, step_index: int) -> Optional[Path]:
        """Get optional path if config exists."""
        if config_key in self.plans[step_index]:
            config = self.plans[step_index][config_key]
            return self._build_output_path(config)
        return None

    def _process_special(self, items: Any, extra: Any, io_type: str, sid: int) -> Dict:
        """Unified special I/O processing - no duplication."""
        result = {}

        if io_type == 'output' and items:  # Special outputs
            results_path = self._get_results_path()
            for key in sorted(items):
                # Include step index in filename to prevent collisions when multiple steps
                # produce the same special output (e.g., two crop_device steps both producing match_results)
                filename = PipelinePathPlanner._build_axis_filename(self.ctx.axis_id, key, step_index=sid)
                path = results_path / filename
                result[key] = {
                    'path': str(path),
                    'materialization_function': extra.get(key)  # extra is mat_funcs
                }
                self.declared[key] = str(path)

        elif io_type == 'input' and items:  # Special inputs
            for key in sorted(items.keys() if isinstance(items, dict) else items):
                if key in self.declared:
                    result[key] = {'path': self.declared[key], 'source_step_id': 'prev'}
                elif key in extra:  # extra is outputs (self-fulfilling)
                    result[key] = {'path': 'self', 'source_step_id': sid}
                elif key not in METADATA_RESOLVERS:
                    raise ValueError(f"Step {sid} needs '{key}' but it's not available")

        return result

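    # Sketch (invented values): a special output "match_results" declared by
    # step 3 for axis R02C02 would be recorded roughly as
    #
    #   result['match_results'] = {
    #       'path': '<output_plate_root>/<results>/R02C02_match_results_step3.pkl',
    #       'materialization_function': <callable or None>,
    #   }
    #
    # and a later step consuming it receives
    # {'path': <same path>, 'source_step_id': 'prev'}.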

    def _inject_metadata(self, pattern: Any, inputs: Dict) -> Any:
        """Inject metadata for special inputs."""
        for key in inputs:
            if key in METADATA_RESOLVERS and key not in self.declared:
                value = METADATA_RESOLVERS[key]["resolver"](self.ctx)
                pattern = self._inject_into_pattern(pattern, key, value)
        return pattern

    def _inject_into_pattern(self, pattern: Any, key: str, value: Any) -> Any:
        """Inject value into pattern - handles all cases in 6 lines."""
        if callable(pattern):
            return (pattern, {key: value})
        if isinstance(pattern, tuple) and len(pattern) == 2:
            return (pattern[0], {**pattern[1], key: value})
        if isinstance(pattern, list) and len(pattern) == 1:
            return [self._inject_into_pattern(pattern[0], key, value)]
        raise ValueError(f"Cannot inject into pattern type: {type(pattern)}")

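    # Illustrative transformations (hypothetical f, injecting
    # grid_dimensions = (3, 4)): injection wraps a bare callable into a
    # (func, kwargs) tuple and merges into existing kwargs:
    #
    #   f                  -> (f, {'grid_dimensions': (3, 4)})
    #   (f, {'sigma': 2})  -> (f, {'sigma': 2, 'grid_dimensions': (3, 4)})
    #   [f]                -> [(f, {'grid_dimensions': (3, 4)})]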

    def _normalize_attr(self, attr: Any, target_type: type) -> Any:
        """Normalize step attributes - 5 lines, no duplication."""
        if target_type == set:
            return {attr} if isinstance(attr, str) else set(attr) if isinstance(attr, (list, set)) else set()
        else:  # dict
            return {attr: True} if isinstance(attr, str) else {k: True for k in attr} if isinstance(attr, list) else attr if isinstance(attr, dict) else {}

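    # Assumed behavior, doctest-style, given any PathPlanner instance `planner`:
    #
    #   >>> planner._normalize_attr('mask', set)
    #   {'mask'}
    #   >>> planner._normalize_attr(['a', 'b'], dict)
    #   {'a': True, 'b': True}
    #   >>> planner._normalize_attr(None, dict)
    #   {}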

    def _get_input_source(self, step: AbstractStep, i: int) -> str:
        """Get input source string."""
        if step.input_source == InputSource.PIPELINE_START:
            return 'PIPELINE_START'
        return 'PREVIOUS_STEP'

    def _get_results_path(self) -> Path:
        """Get results path from global pipeline configuration.

        Results must always be stored in the OUTPUT plate, not the input plate.
        This ensures metadata coherence - analysis results are saved alongside the
        processed images they were created from.
        """
        try:
            # Access materialization_results_path from global config, not path planning config
            path = self.ctx.global_config.materialization_results_path

            # Build output plate root to ensure results go to output plate
            output_plate_root = self.build_output_plate_root(self.plate_path, self.cfg, is_per_step_materialization=False)

            return Path(path) if Path(path).is_absolute() else output_plate_root / path
        except AttributeError as e:
            # Fail loud with a clear error message if global config is unavailable
            raise RuntimeError(f"Cannot access global config for materialization_results_path: {e}") from e

    def _validate(self, pipeline: List):
        """Validate connectivity and materialization paths - no duplication."""
        # Connectivity validation
        for i in range(1, len(pipeline)):
            curr, prev = pipeline[i], pipeline[i - 1]
            if getattr(curr, 'input_source', None) == InputSource.PIPELINE_START:
                continue
            curr_in = self.plans[i]['input_dir']  # Use step index i
            prev_out = self.plans[i - 1]['output_dir']  # Use step index i-1
            if curr_in != prev_out:
                has_special = any(inp.get('source_step_id') in [i - 1, 'prev']  # Check both step index and 'prev'
                                  for inp in self.plans[i].get('special_inputs', {}).values())  # Use step index i
                if not has_special:
                    raise ValueError(f"Disconnect: {prev.name} -> {curr.name}")

        # Materialization path collision validation
        self._validate_materialization_paths(pipeline)

    def _validate_materialization_paths(self, pipeline: List[AbstractStep]) -> None:
        """Validate and resolve materialization path collisions with symmetric conflict resolution."""
        global_path = self._build_output_path(self.cfg)

        # Collect all materialization steps with their paths and positions
        mat_steps = [
            (step, self.plans.get(i, {}).get('pipeline_position', 0), self._build_output_path(step.step_materialization_config))
            for i, step in enumerate(pipeline) if step.step_materialization_config and step.step_materialization_config.enabled
        ]

        # Group by path for conflict detection
        from collections import defaultdict
        path_groups = defaultdict(list)
        for step, pos, path in mat_steps:
            if path == global_path:
                self._resolve_and_update_paths(step, pos, path, "main flow")
            else:
                path_groups[str(path)].append((step, pos, path))

        # Resolve materialization vs materialization conflicts
        for path_key, step_list in path_groups.items():
            if len(step_list) > 1:
                for step, pos, path in step_list:
                    self._resolve_and_update_paths(step, pos, path, f"pos {pos}")

    def _resolve_and_update_paths(self, step: AbstractStep, position: int, original_path: Path, conflict_type: str) -> None:
        """Resolve path conflict by updating sub_dir configuration directly."""
        # Lazy configs are already resolved via config_context() in the compiler
        # No need to call to_base_config() - that's legacy code
        materialization_config = step.step_materialization_config

        # Generate unique sub_dir name instead of calculating from paths
        original_sub_dir = materialization_config.sub_dir
        new_sub_dir = f"{original_sub_dir}_step{position}"

        # Update step materialization config with new sub_dir
        from dataclasses import replace
        step.step_materialization_config = replace(materialization_config, sub_dir=new_sub_dir)

        # Recalculate the resolved path using the updated config
        resolved_path = self._build_output_path(step.step_materialization_config)

        # Update step plans for metadata generation
        if step_plan := self.plans.get(position):  # Use position (step index) instead of step_id
            if 'materialized_output_dir' in step_plan:
                step_plan['materialized_output_dir'] = str(resolved_path)
                step_plan['materialized_sub_dir'] = new_sub_dir  # Update stored sub_dir

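    # Example of the resolution rule (invented names): two enabled per-step
    # materialization configs that both resolve to
    # .../plate001_processed/checkpoints are disambiguated by appending their
    # pipeline positions:
    #
    #   step 2: sub_dir 'checkpoints' -> 'checkpoints_step2'
    #   step 5: sub_dir 'checkpoints' -> 'checkpoints_step5'
    #
    # A per-step path that collides with the main-flow output path is renamed
    # the same way.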

# ===== PUBLIC API =====

class PipelinePathPlanner:
    """Public API matching original interface."""

    @staticmethod
    def prepare_pipeline_paths(context: ProcessingContext,
                               pipeline_definition: List[AbstractStep],
                               pipeline_config) -> Dict:
        """Prepare pipeline paths."""
        return PathPlanner(context, pipeline_config).plan(pipeline_definition)

    @staticmethod
    def _build_axis_filename(axis_id: str, key: str, extension: str = "pkl", step_index: Optional[int] = None) -> str:
        """Build standardized axis-based filename with optional step index.

        Args:
            axis_id: Well/axis identifier (e.g., "R02C02")
            key: Special output key (e.g., "match_results")
            extension: File extension (default: "pkl")
            step_index: Optional step index to prevent collisions when multiple steps
                produce the same special output

        Returns:
            Filename string (e.g., "R02C02_match_results_step3.pkl")
        """
        if step_index is not None:
            return f"{axis_id}_{key}_step{step_index}.{extension}"
        return f"{axis_id}_{key}.{extension}"

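    # Doctest-style check of the two filename shapes (matches the docstring
    # example above):
    #
    #   >>> PipelinePathPlanner._build_axis_filename("R02C02", "match_results", step_index=3)
    #   'R02C02_match_results_step3.pkl'
    #   >>> PipelinePathPlanner._build_axis_filename("R02C02", "match_results")
    #   'R02C02_match_results.pkl'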

    @staticmethod
    def build_dict_pattern_path(base_path: str, dict_key: str) -> str:
        """Build channel-specific path for dict patterns.

        Inserts _w{dict_key} after the well ID in the filename.
        Example: "dir/A01_rois_step7.pkl" + "1" -> "dir/A01_w1_rois_step7.pkl"

        Args:
            base_path: Base path without channel component
            dict_key: Dict pattern key (e.g., "1" for channel 1)

        Returns:
            Channel-specific path
        """
        # Use Path for cross-platform path handling (Windows uses backslashes)
        path = Path(base_path)
        dir_part = path.parent
        filename = path.name
        well_id, rest = filename.split('_', 1)
        return str(dir_part / f"{well_id}_w{dict_key}_{rest}")

# ===== METADATA =====

METADATA_RESOLVERS = {
    "grid_dimensions": {
        "resolver": lambda context: context.microscope_handler.get_grid_dimensions(context.plate_path),
        "description": "Grid dimensions (num_rows, num_cols) for position generation functions"
    },
}

def resolve_metadata(key: str, context) -> Any:
    """Resolve metadata value."""
    if key not in METADATA_RESOLVERS:
        raise ValueError(f"No resolver for '{key}'")
    return METADATA_RESOLVERS[key]["resolver"](context)


def register_metadata_resolver(key: str, resolver: Callable, description: str):
    """Register metadata resolver."""
    METADATA_RESOLVERS[key] = {"resolver": resolver, "description": description}


# ===== SCOPE PROMOTION (separate concern) =====

def _apply_scope_promotion_rules(dict_pattern, special_outputs, declared_outputs, step_index, position):
    """Scope promotion for single-key dict patterns - 15 lines."""
    if len(dict_pattern) != 1:
        return special_outputs, declared_outputs

    key_prefix = f"{list(dict_pattern.keys())[0]}_0_"
    promoted_out, promoted_decl = special_outputs.copy(), declared_outputs.copy()

    for out_key in list(special_outputs.keys()):
        if out_key.startswith(key_prefix):
            promoted_key = out_key[len(key_prefix):]
            if promoted_key in promoted_decl:
                raise ValueError(f"Collision: {promoted_key} already exists")
            promoted_out[promoted_key] = special_outputs[out_key]
            promoted_decl[promoted_key] = {
                "step_index": step_index, "position": position,
                "path": special_outputs[out_key]["path"]
            }

    return promoted_out, promoted_decl


526 return promoted_out, promoted_decl