Coverage for openhcs/textual_tui/widgets/pipeline_editor.py: 0.0%

430 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2PipelineEditorWidget for OpenHCS Textual TUI 

3 

4Pipeline editing widget with complete button set and reactive state management. 

5Matches the functionality from the current prompt-toolkit TUI. 

6""" 

7 

8import logging 

9from typing import Dict, List, Optional, Tuple 

10from pathlib import Path 

11 

12from textual.reactive import reactive 

13from textual.widgets import SelectionList 

14from .button_list_widget import ButtonListWidget, ButtonConfig 

15 

16from openhcs.core.config import GlobalPipelineConfig 

17from openhcs.io.filemanager import FileManager 

18from openhcs.core.steps.function_step import FunctionStep 

19from openhcs.constants.constants import OrchestratorState 

20 

21logger = logging.getLogger(__name__) 

22 

23 

24class PipelineEditorWidget(ButtonListWidget): 

25 """ 

26 Pipeline editing widget using Textual reactive state. 

27  

28 Features: 

29 - Complete button set: Add, Del, Edit, Load, Save 

30 - Reactive state management for automatic UI updates 

31 - Scrollable content area 

32 - Integration with plate selection from PlateManager 

33 """ 

34 

35 # Textual reactive state 

36 pipeline_steps = reactive([]) 

37 current_plate = reactive("") 

38 selected_step = reactive("") 

39 plate_pipelines = reactive({}) # {plate_path: List[FunctionStep]} - per-plate pipeline storage 

40 

41 def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig): 

42 """ 

43 Initialize the pipeline editor widget. 

44 

45 Args: 

46 filemanager: FileManager instance for file operations 

47 global_config: Global configuration 

48 """ 

49 # Define button configuration 

50 button_configs = [ 

51 ButtonConfig("Add", "add_step", disabled=True), 

52 ButtonConfig("Del", "del_step", disabled=True), 

53 ButtonConfig("Edit", "edit_step", disabled=True), 

54 ButtonConfig("Auto", "auto_load_pipeline", disabled=True), 

55 ButtonConfig("Code", "code_pipeline", disabled=True), 

56 ] 

57 

58 super().__init__( 

59 button_configs=button_configs, 

60 list_id="pipeline_content", 

61 container_id="pipeline_list", 

62 on_button_pressed=self._handle_button_press, 

63 on_selection_changed=self._handle_selection_change, 

64 on_item_moved=self._handle_item_moved 

65 ) 

66 

67 self.filemanager = filemanager 

68 # Note: We don't store global_config as it can become stale 

69 # Always use self.app.global_config to get the current config 

70 

71 # Reference to plate manager (set by MainContent) 

72 self.plate_manager = None 

73 

74 logger.debug("PipelineEditorWidget initialized") 

75 

76 def format_item_for_display(self, step: FunctionStep) -> Tuple[str, str]: 

77 """Format step for display in the list.""" 

78 step_name = getattr(step, 'name', 'Unknown Step') 

79 display_text = f"📋 {step_name}" 

80 return display_text, step_name 

81 

82 def _is_current_plate_initialized(self) -> bool: 

83 """Check if current plate has an initialized orchestrator.""" 

84 if not self.current_plate or not self.plate_manager: 

85 logger.debug(f"PipelineEditor: No current plate ({self.current_plate}) or plate_manager ({self.plate_manager})") 

86 return False 

87 

88 orchestrator = self.plate_manager.orchestrators.get(self.current_plate) 

89 if orchestrator is None: 

90 logger.debug(f"PipelineEditor: No orchestrator found for plate {self.current_plate}") 

91 return False 

92 

93 is_initialized = orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED, 

94 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED, 

95 OrchestratorState.EXEC_FAILED] 

96 logger.debug(f"PipelineEditor: Plate {self.current_plate} orchestrator state: {orchestrator.state}, initialized: {is_initialized}") 

97 return is_initialized 

98 

99 async def _handle_button_press(self, button_id: str) -> None: 

100 """Handle button presses from ButtonListWidget (supports async actions).""" 

101 

102 if button_id == "add_step": 

103 await self.action_add_step() 

104 elif button_id == "del_step": 

105 self.action_delete_step() 

106 elif button_id == "edit_step": 

107 await self.action_edit_step() 

108 elif button_id == "auto_load_pipeline": 

109 await self.action_auto_load_pipeline() 

110 elif button_id == "code_pipeline": 

111 await self.action_code_pipeline() 

112 

113 def _handle_selection_change(self, selected_values: List[str]) -> None: 

114 """Handle selection changes from ButtonListWidget.""" 

115 # Update selected_step - use first selected item if any 

116 if selected_values: 

117 self.selected_step = selected_values[0] # This is the step name 

118 else: 

119 self.selected_step = "" 

120 

121 def _handle_item_moved(self, from_index: int, to_index: int) -> None: 

122 """Handle item movement from ButtonListWidget.""" 

123 current_steps = list(self.pipeline_steps) 

124 

125 # Move the step 

126 step = current_steps.pop(from_index) 

127 current_steps.insert(to_index, step) 

128 

129 # Update pipeline steps 

130 self.pipeline_steps = current_steps 

131 

132 step_name = getattr(step, 'name', 'Unknown Step') 

133 direction = "up" if to_index < from_index else "down" 

134 self.app.current_status = f"Moved step '{step_name}' {direction}" 

135 

136 def on_selection_list_selected_changed(self, event: SelectionList.SelectedChanged) -> None: 

137 """Handle selection changes from SelectionList.""" 

138 selected_values = event.selection_list.selected 

139 

140 # Update selected_step - use first selected item if any 

141 if selected_values: 

142 self.selected_step = selected_values[0] # This is the step name/id 

143 else: 

144 self.selected_step = "" 

145 

146 # Update button states based on selection 

147 self._update_button_states_for_selection(selected_values) 

148 

149 def _update_button_states_for_selection(self, selected_values: List[str]) -> None: 

150 """Update button states based on current selection and mathematical constraints.""" 

151 try: 

152 has_plate = bool(self.current_plate) 

153 is_initialized = self._is_current_plate_initialized() 

154 has_steps = len(self.pipeline_steps) > 0 

155 has_selection = len(selected_values) > 0 

156 

157 # Mathematical constraints: 

158 # - Pipeline editing requires initialization 

159 # - Edit requires exactly one selection 

160 self.query_one("#add_step").disabled = not (has_plate and is_initialized) 

161 self.query_one("#del_step").disabled = not has_selection 

162 self.query_one("#edit_step").disabled = not (len(selected_values) == 1) # Edit requires exactly one selection 

163 self.query_one("#auto_load_pipeline").disabled = not (has_plate and is_initialized) 

164 self.query_one("#code_pipeline").disabled = not (has_plate and is_initialized) # Same as add button 

165 

166 except Exception: 

167 # Buttons might not be mounted yet 

168 pass 

169 

170 def get_selection_state(self) -> tuple[List[FunctionStep], str]: 

171 """Get current selection state from SelectionList.""" 

172 try: 

173 selection_list = self.query_one("#pipeline_content", SelectionList) 

174 selected_values = selection_list.selected 

175 

176 # Convert selected values back to step objects 

177 selected_items = [] 

178 for step in self.pipeline_steps: 

179 step_name = getattr(step, 'name', '') 

180 if step_name in selected_values: 

181 selected_items.append(step) 

182 

183 # Determine selection mode 

184 if not selected_items: 

185 selection_mode = "empty" 

186 elif len(selected_items) == len(self.pipeline_steps): 

187 selection_mode = "all" 

188 else: 

189 selection_mode = "checkbox" # SelectionList is always checkbox-based 

190 

191 return selected_items, selection_mode 

192 except Exception: 

193 # Fallback if widget not mounted 

194 return [], "empty" 

195 

196 def watch_current_plate(self, plate_path: str) -> None: 

197 """Automatically update UI when current_plate changes.""" 

198 logger.debug(f"Current plate changed: {plate_path}") 

199 

200 # Load pipeline for the new plate WITHOUT triggering save/invalidation 

201 if plate_path: 

202 # Get pipeline for this plate (or empty if none exists) 

203 plate_pipeline = self.plate_pipelines.get(plate_path, []) 

204 # Set pipeline_steps directly without triggering reactive save 

205 self._set_pipeline_steps_without_save(plate_pipeline) 

206 else: 

207 # No plate selected - clear steps 

208 self._set_pipeline_steps_without_save([]) 

209 

210 # Clear selection when plate changes 

211 self.selected_step = "" 

212 

213 # Update button states 

214 self._update_button_states() 

215 

216 def _set_pipeline_steps_without_save(self, steps: List[FunctionStep]) -> None: 

217 """Set pipeline steps without triggering save/invalidation (for loading existing data).""" 

218 # Temporarily disable the reactive watcher to prevent save cascade 

219 self._loading_existing_pipeline = True 

220 self.pipeline_steps = steps 

221 # Sync with ButtonListWidget's items property 

222 self.items = list(steps) 

223 self._loading_existing_pipeline = False 

224 

225 def watch_pipeline_steps(self, steps: List[FunctionStep]) -> None: 

226 """Automatically update UI when pipeline_steps changes.""" 

227 # Sync with ButtonListWidget's items property to trigger its reactive system 

228 self.items = list(steps) 

229 

230 logger.debug(f"Pipeline steps updated: {len(steps)} steps") 

231 

232 # Only save/invalidate if this is a real change, not loading existing data 

233 if not getattr(self, '_loading_existing_pipeline', False): 

234 # Save pipeline changes to plate storage 

235 self._save_pipeline_to_plate_storage() 

236 

237 def _save_pipeline_to_plate_storage(self) -> None: 

238 """Save current pipeline steps to plate storage and invalidate compilation.""" 

239 if self.current_plate: 

240 # Update plate pipelines storage 

241 current_pipelines = dict(self.plate_pipelines) 

242 current_pipelines[self.current_plate] = list(self.pipeline_steps) 

243 self.plate_pipelines = current_pipelines 

244 logger.debug(f"Saved {len(self.pipeline_steps)} steps for plate: {self.current_plate}") 

245 

246 # Invalidate compilation status when pipeline changes 

247 self._invalidate_compilation_status() 

248 

249 def get_pipeline_for_plate(self, plate_path: str) -> List[FunctionStep]: 

250 """Get pipeline for specific plate.""" 

251 return self.plate_pipelines.get(plate_path, []) 

252 

253 def save_pipeline_for_plate(self, plate_path: str, pipeline: List[FunctionStep]) -> None: 

254 """Save pipeline for specific plate.""" 

255 current_pipelines = dict(self.plate_pipelines) 

256 current_pipelines[plate_path] = pipeline 

257 self.plate_pipelines = current_pipelines 

258 

259 def clear_pipeline_for_plate(self, plate_path: str) -> None: 

260 """Clear pipeline for specific plate.""" 

261 current_pipelines = dict(self.plate_pipelines) 

262 if plate_path in current_pipelines: 

263 del current_pipelines[plate_path] 

264 self.plate_pipelines = current_pipelines 

265 

266 def _invalidate_compilation_status(self) -> None: 

267 """Reset compilation status when pipeline definition changes.""" 

268 if not self.plate_manager or not self.current_plate: 

269 return 

270 

271 # Clear compiled data from simple state 

272 if self.current_plate in self.plate_manager.plate_compiled_data: 

273 del self.plate_manager.plate_compiled_data[self.current_plate] 

274 

275 # Reset orchestrator state to READY (initialized) 

276 orchestrator = self.plate_manager.orchestrators.get(self.current_plate) 

277 if orchestrator and orchestrator.state == OrchestratorState.COMPILED: 

278 orchestrator._state = OrchestratorState.READY 

279 

280 # Trigger UI refresh after orchestrator state change 

281 if self.plate_manager: 

282 self.plate_manager._trigger_ui_refresh() 

283 self.plate_manager._update_button_states() 

284 

285 

286 

287 def watch_selected_step(self, step_id: str) -> None: 

288 """Automatically update UI when selected_step changes.""" 

289 self._update_button_states() 

290 logger.debug(f"Selected step: {step_id}") 

291 

292 def _update_button_states(self) -> None: 

293 """Update button enabled/disabled states based on mathematical constraints.""" 

294 try: 

295 has_plate = bool(self.current_plate) 

296 is_initialized = self._is_current_plate_initialized() 

297 has_steps = len(self.pipeline_steps) > 0 

298 has_valid_selection = bool(self.selected_step) and self._find_step_index_by_selection() is not None 

299 

300 logger.debug(f"PipelineEditor: Button state update - has_plate: {has_plate}, is_initialized: {is_initialized}, has_steps: {has_steps}") 

301 

302 # Mathematical constraints: 

303 # - Pipeline editing requires initialization 

304 # - Step operations require steps to exist 

305 # - Edit requires valid selection that maps to actual step 

306 add_enabled = has_plate and is_initialized 

307 load_enabled = has_plate and is_initialized 

308 code_enabled = has_plate and is_initialized # Same as add button - orchestrator init is sufficient 

309 

310 logger.debug(f"PipelineEditor: Setting add_step.disabled = {not add_enabled}, load_pipeline.disabled = {not load_enabled}") 

311 

312 self.query_one("#add_step").disabled = not add_enabled 

313 self.query_one("#del_step").disabled = not has_steps 

314 self.query_one("#edit_step").disabled = not (has_steps and has_valid_selection) 

315 self.query_one("#load_pipeline").disabled = not load_enabled 

316 self.query_one("#save_pipeline").disabled = not has_steps 

317 self.query_one("#code_pipeline").disabled = not code_enabled # Changed from has_steps to code_enabled 

318 except Exception: 

319 # Buttons might not be mounted yet 

320 pass 

321 

322 

323 

324 async def action_add_step(self) -> None: 

325 """Handle Add Step button - now triggers modal.""" 

326 

327 def handle_result(result: Optional[FunctionStep]) -> None: 

328 if result: # User saved new step 

329 # Store the actual FunctionStep object directly (preserves memory type decorators) 

330 new_steps = self.pipeline_steps + [result] 

331 self.pipeline_steps = new_steps 

332 self.app.current_status = f"Added step: {result.name}" 

333 else: 

334 self.app.current_status = "Add step cancelled" 

335 

336 # LAZY IMPORT to avoid circular import 

337 from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow 

338 from textual.css.query import NoMatches 

339 

340 # Use window-based dual editor (follows ConfigWindow pattern) 

341 try: 

342 window = self.app.query_one(DualEditorWindow) 

343 # Window exists, update it for new step and open 

344 window.editing_step = window.pattern_manager.create_new_step() 

345 window.is_new = True 

346 window.on_save_callback = handle_result 

347 window.original_step = window.pattern_manager.clone_pattern(window.editing_step) 

348 window.open_state = True 

349 except NoMatches: 

350 # Expected case: window doesn't exist yet, create new one 

351 window = DualEditorWindow(step_data=None, is_new=True, on_save_callback=handle_result) 

352 await self.app.mount(window) 

353 window.open_state = True 

354 

355 def action_delete_step(self) -> None: 

356 """Handle Delete Step button - delete selected steps.""" 

357 

358 # Get current selection state 

359 selected_items, selection_mode = self.get_selection_state() 

360 

361 if selection_mode == "empty": 

362 self.app.current_status = "No steps available for deletion" 

363 return 

364 

365 # Generate description and perform deletion 

366 count = len(selected_items) 

367 if selection_mode == "empty": 

368 desc = "No items available for deletion" 

369 elif selection_mode == "all": 

370 desc = f"Delete ALL {count} items" 

371 elif count == 1: 

372 item_name = getattr(selected_items[0], 'name', 'Unknown') 

373 desc = f"Delete selected item: {item_name}" 

374 else: 

375 desc = f"Delete {count} selected items" 

376 

377 # Remove selected steps 

378 current_steps = list(self.pipeline_steps) 

379 steps_to_remove = set(getattr(item, 'name', '') for item in selected_items) 

380 

381 # Filter out selected steps 

382 new_steps = [step for step in current_steps if getattr(step, 'name', '') not in steps_to_remove] 

383 

384 # Update pipeline steps (this will trigger save to plate storage) 

385 self.pipeline_steps = new_steps 

386 

387 deleted_count = len(current_steps) - len(new_steps) 

388 self.app.current_status = f"Deleted {deleted_count} steps" 

389 

390 def _dict_to_function_step(self, step_dict: Dict) -> FunctionStep: 

391 """Convert step dict to FunctionStep object with proper data preservation.""" 

392 # Extract function - handle both callable and registry lookup 

393 func = step_dict.get("func") 

394 if func is None: 

395 # Fallback to default function if missing 

396 from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

397 registry = RegistryService() 

398 func = registry.find_default_function() 

399 logger.warning(f"Step '{step_dict.get('name', 'Unknown')}' missing function, using default") 

400 

401 # Extract variable components - handle both list and string formats 

402 var_components = step_dict.get("variable_components", []) 

403 if isinstance(var_components, str): 

404 var_components = [var_components] 

405 elif not isinstance(var_components, list): 

406 var_components = [] 

407 

408 # Only pass variable_components if it's not empty, let FunctionStep use its default otherwise 

409 step_kwargs = { 

410 "func": func, 

411 "name": step_dict.get("name", "Unknown Step"), 

412 "group_by": step_dict.get("group_by", "") 

413 } 

414 if var_components: # Only add if not empty 

415 step_kwargs["variable_components"] = var_components 

416 

417 return FunctionStep(**step_kwargs) 

418 

419 def _function_step_to_dict(self, step: FunctionStep) -> Dict: 

420 """Convert FunctionStep object to dict with complete data preservation.""" 

421 return { 

422 "name": step.name, 

423 "type": "function", 

424 "func": step.func, 

425 "variable_components": step.variable_components, 

426 "group_by": step.group_by 

427 } 

428 

429 def _find_step_index_by_selection(self) -> Optional[int]: 

430 """Find the index of the currently selected step.""" 

431 if not self.selected_step: 

432 return None 

433 

434 # selected_step contains the step name/id 

435 for i, step in enumerate(self.pipeline_steps): 

436 # Now step is a FunctionStep object, not a dict 

437 step_name = getattr(step, 'name', f"Step {i+1}") 

438 if step_name == self.selected_step: 

439 return i 

440 return None 

441 

442 async def action_edit_step(self) -> None: 

443 """Handle Edit Step button with proper selection and data preservation.""" 

444 

445 if not self.pipeline_steps: 

446 self.app.current_status = "No steps to edit" 

447 return 

448 

449 # Find selected step index 

450 step_index = self._find_step_index_by_selection() 

451 if step_index is None: 

452 self.app.current_status = "No step selected for editing" 

453 return 

454 

455 step_to_edit = self.pipeline_steps[step_index] 

456 

457 def handle_result(result: Optional[FunctionStep]) -> None: 

458 if result: # User saved changes 

459 # Store the actual FunctionStep object directly (preserves memory type decorators) 

460 updated_steps = self.pipeline_steps.copy() 

461 updated_steps[step_index] = result 

462 self.pipeline_steps = updated_steps 

463 self.app.current_status = f"Updated step: {result.name}" 

464 else: 

465 self.app.current_status = "Edit step cancelled" 

466 

467 # Use the actual FunctionStep object directly (no conversion needed) 

468 edit_step = step_to_edit 

469 

470 # LAZY IMPORT to avoid circular import 

471 from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow 

472 from textual.css.query import NoMatches 

473 

474 # Use window-based dual editor (follows ConfigWindow pattern) 

475 try: 

476 window = self.app.query_one(DualEditorWindow) 

477 # Window exists, update it for editing existing step and open 

478 window.editing_step = edit_step 

479 window.is_new = False 

480 window.on_save_callback = handle_result 

481 window.original_step = window.pattern_manager.clone_pattern(window.editing_step) 

482 window.open_state = True 

483 except NoMatches: 

484 # Expected case: window doesn't exist yet, create new one 

485 window = DualEditorWindow(step_data=edit_step, is_new=False, on_save_callback=handle_result) 

486 await self.app.mount(window) 

487 window.open_state = True 

488 

489 async def action_auto_load_pipeline(self) -> None: 

490 """Handle Auto button - load basic_pipeline.py automatically.""" 

491 if not self.current_plate: 

492 self.app.current_status = "No plate selected" 

493 return 

494 

495 try: 

496 from pathlib import Path 

497 

498 # Find basic_pipeline.py relative to openhcs package 

499 import openhcs 

500 openhcs_root = Path(openhcs.__file__).parent 

501 pipeline_file = openhcs_root / "tests" / "basic_pipeline.py" 

502 

503 if not pipeline_file.exists(): 

504 self.app.current_status = f"Pipeline file not found: {pipeline_file}" 

505 return 

506 

507 # Read the file content 

508 python_code = pipeline_file.read_text() 

509 

510 # Execute the code to get pipeline_steps (same as code editor logic) 

511 namespace = {} 

512 exec(python_code, namespace) 

513 

514 # Get the pipeline_steps from the namespace 

515 if 'pipeline_steps' in namespace: 

516 new_pipeline_steps = namespace['pipeline_steps'] 

517 # Update the pipeline with new steps 

518 self.pipeline_steps = new_pipeline_steps 

519 self.update_step_list() 

520 self.app.current_status = f"Auto-loaded {len(new_pipeline_steps)} steps from basic_pipeline.py" 

521 else: 

522 raise ValueError("No 'pipeline_steps = [...]' assignment found in basic_pipeline.py") 

523 

524 except Exception as e: 

525 logger.error(f"Failed to auto-load basic_pipeline.py: {e}") 

526 self.app.current_status = f"Failed to auto-load pipeline: {str(e)}" 

527 

528 async def action_load_pipeline(self) -> None: 

529 """Handle Load Pipeline button - load pipeline from file.""" 

530 

531 if not self.current_plate: 

532 self.app.current_status = "No plate selected for loading pipeline" 

533 return 

534 

535 # Launch enhanced file browser for .pipeline files 

536 def handle_result(result): 

537 from pathlib import Path # Import at the top of the function 

538 

539 # Handle different result types from file browser 

540 paths_to_load = [] 

541 

542 if isinstance(result, Path): 

543 # Single Path object 

544 paths_to_load = [result] 

545 elif isinstance(result, list) and len(result) > 0: 

546 # List of paths - support multiple pipeline files 

547 for item in result: 

548 if isinstance(item, Path): 

549 paths_to_load.append(item) 

550 else: 

551 paths_to_load.append(Path(item)) 

552 elif isinstance(result, str): 

553 # String path 

554 paths_to_load = [Path(result)] 

555 

556 if paths_to_load: 

557 logger.debug(f"Loading {len(paths_to_load)} pipeline files") 

558 self._load_multiple_pipeline_files(paths_to_load) 

559 else: 

560 self.app.current_status = "Load pipeline cancelled" 

561 

562 # Create file browser window for .pipeline files 

563 from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode 

564 from openhcs.textual_tui.services.file_browser_service import SelectionMode 

565 from openhcs.constants.constants import Backend 

566 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey 

567 

568 await open_file_browser_window( 

569 app=self.app, 

570 file_manager=self.filemanager, 

571 initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES), 

572 backend=Backend.DISK, 

573 title="Load Pipeline (.pipeline)", 

574 mode=BrowserMode.LOAD, 

575 selection_mode=SelectionMode.FILES_ONLY, 

576 filter_extensions=['.pipeline'], 

577 cache_key=PathCacheKey.PIPELINE_FILES, 

578 on_result_callback=handle_result, 

579 caller_id="pipeline_editor" 

580 ) 

581 

582 def _load_multiple_pipeline_files(self, file_paths: List[Path]) -> None: 

583 """Load and concatenate steps from multiple pipeline files.""" 

584 all_steps = [] 

585 loaded_files = [] 

586 failed_files = [] 

587 

588 for file_path in file_paths: 

589 try: 

590 steps = self._load_single_pipeline_file(file_path) 

591 if steps: 

592 all_steps.extend(steps) 

593 loaded_files.append(file_path.name) 

594 logger.info(f"✅ Loaded {len(steps)} steps from {file_path.name}") 

595 else: 

596 failed_files.append(file_path.name) 

597 except Exception as e: 

598 logger.error(f"❌ Failed to load {file_path.name}: {e}") 

599 failed_files.append(file_path.name) 

600 

601 if all_steps: 

602 # Replace current pipeline with concatenated steps 

603 self.pipeline_steps = all_steps 

604 

605 # Apply to multiple orchestrators if they are selected 

606 self._apply_pipeline_to_selected_orchestrators(all_steps) 

607 

608 # Create status message 

609 if len(loaded_files) == 1: 

610 status = f"Loaded {len(all_steps)} steps from {loaded_files[0]}" 

611 else: 

612 status = f"Loaded {len(all_steps)} steps from {len(loaded_files)} files: {', '.join(loaded_files)}" 

613 

614 if failed_files: 

615 status += f" (Failed: {', '.join(failed_files)})" 

616 

617 self.app.current_status = status 

618 logger.info(f"🎯 Total pipeline: {len(all_steps)} steps from {len(loaded_files)} files") 

619 else: 

620 self.app.current_status = f"No valid pipeline steps loaded from {len(file_paths)} files" 

621 

622 def _load_single_pipeline_file(self, file_path: Path) -> List: 

623 """Load pipeline steps from a single .pipeline file.""" 

624 import dill as pickle 

625 try: 

626 with open(file_path, 'rb') as f: 

627 pattern = pickle.load(f) 

628 

629 if isinstance(pattern, list): 

630 return pattern 

631 else: 

632 logger.error(f"Invalid pipeline format in {file_path.name}: expected list, got {type(pattern)}") 

633 return [] 

634 except Exception as e: 

635 logger.error(f"Failed to load pipeline from {file_path.name}: {e}") 

636 raise 

637 

638 def _apply_pipeline_to_selected_orchestrators(self, pipeline_steps: List) -> None: 

639 """Apply loaded pipeline to all selected orchestrators.""" 

640 if not self.plate_manager: 

641 return 

642 

643 # Get selected orchestrators from plate manager 

644 selected_items, selection_mode = self.plate_manager.get_selection_state() 

645 

646 if selection_mode == "empty" or len(selected_items) <= 1: 

647 # Single or no selection - normal behavior 

648 return 

649 

650 # Multiple orchestrators selected - apply pipeline to all 

651 applied_count = 0 

652 for item in selected_items: 

653 plate_path = item['path'] 

654 if plate_path in self.plate_manager.orchestrators: 

655 orchestrator = self.plate_manager.orchestrators[plate_path] 

656 orchestrator.pipeline_definition = list(pipeline_steps) 

657 

658 # Also save to our plate pipelines storage 

659 self.save_pipeline_for_plate(plate_path, list(pipeline_steps)) 

660 applied_count += 1 

661 

662 if applied_count > 1: 

663 self.app.current_status += f" → Applied to {applied_count} orchestrators" 

664 logger.info(f"Applied pipeline to {applied_count} selected orchestrators") 

665 

666 def _load_pipeline_from_file(self, file_path: Path) -> None: 

667 """Load pipeline from .pipeline file (legacy single-file method).""" 

668 try: 

669 steps = self._load_single_pipeline_file(file_path) 

670 if steps: 

671 self.pipeline_steps = steps 

672 self.app.current_status = f"Loaded {len(steps)} steps from {file_path.name}" 

673 else: 

674 self.app.current_status = f"Invalid pipeline format in {file_path.name}" 

675 except Exception as e: 

676 self.app.current_status = f"Failed to load pipeline: {e}" 

677 

678 async def action_save_pipeline(self) -> None: 

679 """Handle Save Pipeline button - save pipeline to file.""" 

680 

681 if not self.current_plate: 

682 self.app.current_status = "No plate selected for saving pipeline" 

683 return 

684 

685 if not self.pipeline_steps: 

686 self.app.current_status = "No pipeline steps to save" 

687 return 

688 

689 # Launch enhanced file browser for saving pipeline 

690 def handle_result(result): 

691 if result and isinstance(result, Path): 

692 self._save_pipeline_to_file(result) 

693 else: 

694 self.app.current_status = "Save pipeline cancelled" 

695 

696 # Create file browser window for saving .pipeline files 

697 from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode 

698 from openhcs.textual_tui.services.file_browser_service import SelectionMode 

699 from openhcs.constants.constants import Backend 

700 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey 

701 

702 # Generate default filename from plate name 

703 plate_name = Path(self.current_plate).name if self.current_plate else "pipeline" 

704 default_filename = f"{plate_name}.pipeline" 

705 

706 await open_file_browser_window( 

707 app=self.app, 

708 file_manager=self.filemanager, 

709 initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES), 

710 backend=Backend.DISK, 

711 title="Save Pipeline (.pipeline)", 

712 mode=BrowserMode.SAVE, 

713 selection_mode=SelectionMode.FILES_ONLY, 

714 filter_extensions=['.pipeline'], 

715 default_filename=default_filename, 

716 cache_key=PathCacheKey.PIPELINE_FILES, 

717 on_result_callback=handle_result, 

718 caller_id="pipeline_editor" 

719 ) 

720 

721 def _save_pipeline_to_file(self, file_path: Path) -> None: 

722 """Save pipeline to .pipeline file.""" 

723 import dill as pickle 

724 try: 

725 with open(file_path, 'wb') as f: 

726 pickle.dump(list(self.pipeline_steps), f) 

727 self.app.current_status = f"Saved pipeline to {file_path.name}" 

728 except Exception as e: 

729 logger.error(f"Failed to save pipeline: {e}") 

730 self.app.current_status = f"Failed to save pipeline: {e}" 

731 

732 async def action_code_pipeline(self) -> None: 

733 """Edit pipeline as Python code in terminal window.""" 

734 logger.debug("Code button pressed - opening pipeline editor") 

735 

736 if not self.current_plate: 

737 self.app.current_status = "No plate selected" 

738 return 

739 

740 try: 

741 # Use complete pipeline steps code generation 

742 from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code 

743 from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher 

744 

745 # Generate complete pipeline steps code with imports 

746 python_code = generate_complete_pipeline_steps_code( 

747 pipeline_steps=list(self.pipeline_steps), 

748 clean_mode=True 

749 ) 

750 

751 # Create callback to handle edited code 

752 def handle_edited_code(edited_code: str): 

753 logger.debug("Pipeline code edited, processing changes...") 

754 try: 

755 # Execute the code (it has all necessary imports) 

756 namespace = {} 

757 exec(edited_code, namespace) 

758 

759 # Get the pipeline_steps from the namespace 

760 if 'pipeline_steps' in namespace: 

761 new_pipeline_steps = namespace['pipeline_steps'] 

762 # Update the pipeline with new steps 

763 self.pipeline_steps = new_pipeline_steps 

764 self.app.current_status = f"Pipeline updated with {len(new_pipeline_steps)} steps" 

765 else: 

766 self.app.show_error("Parse Error", "No 'pipeline_steps = [...]' assignment found in edited code") 

767 

768 except SyntaxError as e: 

769 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}") 

770 except Exception as e: 

771 logger.error(f"Failed to parse edited pipeline code: {e}") 

772 self.app.show_error("Edit Error", f"Failed to parse pipeline code: {str(e)}") 

773 

774 # Launch terminal editor 

775 launcher = TerminalLauncher(self.app) 

776 await launcher.launch_editor_for_file( 

777 file_content=python_code, 

778 file_extension='.py', 

779 on_save_callback=handle_edited_code 

780 ) 

781 

782 except Exception as e: 

783 logger.error(f"Failed to open pipeline code editor: {e}") 

784 self.app.current_status = f"Failed to open code editor: {e}"