Coverage for openhcs/textual_tui/widgets/pipeline_editor.py: 0.0%

415 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2PipelineEditorWidget for OpenHCS Textual TUI 

3 

4Pipeline editing widget with complete button set and reactive state management. 

5Matches the functionality from the current prompt-toolkit TUI. 

6""" 

7 

8import logging 

9from typing import Dict, List, Optional, Any, Callable, Tuple 

10from pathlib import Path 

11 

12from textual.app import ComposeResult 

13from textual.containers import Horizontal, ScrollableContainer, Vertical 

14from textual.reactive import reactive 

15from textual.widgets import Button, Static, SelectionList 

16from textual.widget import Widget 

17from .button_list_widget import ButtonListWidget, ButtonConfig 

18from textual import work 

19 

20from openhcs.core.config import GlobalPipelineConfig 

21from openhcs.io.filemanager import FileManager 

22from openhcs.core.steps.function_step import FunctionStep 

23from openhcs.constants.constants import OrchestratorState 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28class PipelineEditorWidget(ButtonListWidget): 

29 """ 

30 Pipeline editing widget using Textual reactive state. 

31  

32 Features: 

33 - Complete button set: Add, Del, Edit, Load, Save, Code 

34 - Reactive state management for automatic UI updates 

35 - Scrollable content area 

36 - Integration with plate selection from PlateManager 

37 """ 

38 

39 # Textual reactive state 

40 pipeline_steps = reactive([]) 

41 current_plate = reactive("") 

42 selected_step = reactive("") 

43 plate_pipelines = reactive({}) # {plate_path: List[FunctionStep]} - per-plate pipeline storage 

44 

def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
    """
    Build the pipeline editor and register its six action buttons.

    Args:
        filemanager: FileManager instance kept on the widget for file operations
        global_config: Accepted but deliberately not stored, because a stored
            copy can become stale; read self.app.global_config instead
    """
    # (label, button id) pairs; every button starts out disabled.
    button_specs = (
        ("Add", "add_step"),
        ("Del", "del_step"),
        ("Edit", "edit_step"),
        ("Load", "load_pipeline"),
        ("Save", "save_pipeline"),
        ("Code", "code_pipeline"),
    )
    button_configs = [
        ButtonConfig(label, button_id, disabled=True)
        for label, button_id in button_specs
    ]

    super().__init__(
        button_configs=button_configs,
        list_id="pipeline_content",
        container_id="pipeline_list",
        on_button_pressed=self._handle_button_press,
        on_selection_changed=self._handle_selection_change,
        on_item_moved=self._handle_item_moved
    )

    self.filemanager = filemanager

    # Filled in later by whoever owns this widget (MainContent, per the
    # original comment); None until then.
    self.plate_manager = None

    logger.debug("PipelineEditorWidget initialized")

80 

def format_item_for_display(self, step: FunctionStep) -> Tuple[str, str]:
    """Return ``(display_text, value)`` for one step row of the list.

    The value is the step's ``name`` attribute, or 'Unknown Step' when the
    step has no such attribute; the display text is that name behind an icon.
    """
    label = getattr(step, 'name', 'Unknown Step')
    return f"📋 {label}", label

86 

def _is_current_plate_initialized(self) -> bool:
    """Report whether the current plate's orchestrator is in an initialized state.

    Returns:
        False when no plate is selected, no plate manager is attached, or the
        plate manager holds no orchestrator for the plate. Otherwise True
        exactly when the orchestrator state is one of READY, COMPILED,
        COMPLETED, COMPILE_FAILED or EXEC_FAILED.
    """
    plate = self.current_plate
    manager = self.plate_manager
    if not (plate and manager):
        logger.debug(f"PipelineEditor: No current plate ({plate}) or plate_manager ({manager})")
        return False

    orchestrator = manager.orchestrators.get(plate)
    if orchestrator is None:
        logger.debug(f"PipelineEditor: No orchestrator found for plate {plate}")
        return False

    # States that count as "initialized" for pipeline editing purposes.
    initialized_states = (
        OrchestratorState.READY,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
    )
    is_initialized = orchestrator.state in initialized_states
    logger.debug(f"PipelineEditor: Plate {plate} orchestrator state: {orchestrator.state}, initialized: {is_initialized}")
    return is_initialized

103 

104 async def _handle_button_press(self, button_id: str) -> None: 

105 """Handle button presses from ButtonListWidget (supports async actions).""" 

106 import inspect 

107 

108 if button_id == "add_step": 

109 await self.action_add_step() 

110 elif button_id == "del_step": 

111 self.action_delete_step() 

112 elif button_id == "edit_step": 

113 await self.action_edit_step() 

114 elif button_id == "load_pipeline": 

115 await self.action_load_pipeline() 

116 elif button_id == "save_pipeline": 

117 await self.action_save_pipeline() 

118 elif button_id == "code_pipeline": 

119 await self.action_code_pipeline() 

120 

121 def _handle_selection_change(self, selected_values: List[str]) -> None: 

122 """Handle selection changes from ButtonListWidget.""" 

123 # Update selected_step - use first selected item if any 

124 if selected_values: 

125 self.selected_step = selected_values[0] # This is the step name 

126 else: 

127 self.selected_step = "" 

128 

129 def _handle_item_moved(self, from_index: int, to_index: int) -> None: 

130 """Handle item movement from ButtonListWidget.""" 

131 current_steps = list(self.pipeline_steps) 

132 

133 # Move the step 

134 step = current_steps.pop(from_index) 

135 current_steps.insert(to_index, step) 

136 

137 # Update pipeline steps 

138 self.pipeline_steps = current_steps 

139 

140 step_name = getattr(step, 'name', 'Unknown Step') 

141 direction = "up" if to_index < from_index else "down" 

142 self.app.current_status = f"Moved step '{step_name}' {direction}" 

143 

def on_selection_list_selected_changed(self, event: SelectionList.SelectedChanged) -> None:
    """React to a SelectionList selection change.

    Stores the first selected value (or "" when nothing is selected) in
    ``selected_step`` and then refreshes the buttons for the full selection.
    """
    chosen = event.selection_list.selected
    self.selected_step = chosen[0] if chosen else ""
    self._update_button_states_for_selection(chosen)

156 

157 def _update_button_states_for_selection(self, selected_values: List[str]) -> None: 

158 """Update button states based on current selection and mathematical constraints.""" 

159 try: 

160 has_plate = bool(self.current_plate) 

161 is_initialized = self._is_current_plate_initialized() 

162 has_steps = len(self.pipeline_steps) > 0 

163 has_selection = len(selected_values) > 0 

164 

165 # Mathematical constraints: 

166 # - Pipeline editing requires initialization 

167 # - Edit requires exactly one selection 

168 self.query_one("#add_step").disabled = not (has_plate and is_initialized) 

169 self.query_one("#del_step").disabled = not has_selection 

170 self.query_one("#edit_step").disabled = not (len(selected_values) == 1) # Edit requires exactly one selection 

171 self.query_one("#load_pipeline").disabled = not (has_plate and is_initialized) 

172 self.query_one("#save_pipeline").disabled = not has_steps 

173 

174 except Exception: 

175 # Buttons might not be mounted yet 

176 pass 

177 

def get_selection_state(self) -> tuple[List[FunctionStep], str]:
    """Return the selected step objects and a selection-mode label.

    Steps are matched to the SelectionList's selected values by their
    ``name`` attribute, so steps sharing a name are all reported together.

    Returns:
        ``(selected_steps, mode)`` where mode is "empty" (nothing matched),
        "all" (every pipeline step matched) or "checkbox" (a proper subset).
        Falls back to ``([], "empty")`` if anything raises, e.g. when the
        list widget cannot be queried.
    """
    try:
        chosen_values = self.query_one("#pipeline_content", SelectionList).selected

        # Map the selected values back onto the step objects.
        matching = [
            step for step in self.pipeline_steps
            if getattr(step, 'name', '') in chosen_values
        ]

        if not matching:
            mode = "empty"
        elif len(matching) == len(self.pipeline_steps):
            mode = "all"
        else:
            mode = "checkbox"  # SelectionList is always checkbox-based

        return matching, mode
    except Exception:
        # Fallback if widget not mounted
        return [], "empty"

203 

def watch_current_plate(self, plate_path: str) -> None:
    """Swap the displayed pipeline when ``current_plate`` changes.

    Loads the stored pipeline for the new plate (empty when none is stored or
    no plate is selected) without triggering a save, clears the step
    selection and refreshes the buttons.
    """
    logger.debug(f"Current plate changed: {plate_path}")

    # A falsy plate path means "no plate selected" and shows an empty pipeline.
    steps_for_plate = self.plate_pipelines.get(plate_path, []) if plate_path else []
    self._set_pipeline_steps_without_save(steps_for_plate)

    self.selected_step = ""
    self._update_button_states()

223 

224 def _set_pipeline_steps_without_save(self, steps: List[FunctionStep]) -> None: 

225 """Set pipeline steps without triggering save/invalidation (for loading existing data).""" 

226 # Temporarily disable the reactive watcher to prevent save cascade 

227 self._loading_existing_pipeline = True 

228 self.pipeline_steps = steps 

229 # Sync with ButtonListWidget's items property 

230 self.items = list(steps) 

231 self._loading_existing_pipeline = False 

232 

def watch_pipeline_steps(self, steps: List[FunctionStep]) -> None:
    """Mirror ``pipeline_steps`` into ``items`` and save real changes.

    The save to plate storage is skipped while ``_loading_existing_pipeline``
    is set, i.e. while existing data is only being loaded for display.
    """
    # Sync with ButtonListWidget's items property
    self.items = list(steps)

    logger.debug(f"Pipeline steps updated: {len(steps)} steps")

    if getattr(self, '_loading_existing_pipeline', False):
        return
    self._save_pipeline_to_plate_storage()

244 

245 def _save_pipeline_to_plate_storage(self) -> None: 

246 """Save current pipeline steps to plate storage and invalidate compilation.""" 

247 if self.current_plate: 

248 # Update plate pipelines storage 

249 current_pipelines = dict(self.plate_pipelines) 

250 current_pipelines[self.current_plate] = list(self.pipeline_steps) 

251 self.plate_pipelines = current_pipelines 

252 logger.debug(f"Saved {len(self.pipeline_steps)} steps for plate: {self.current_plate}") 

253 

254 # Invalidate compilation status when pipeline changes 

255 self._invalidate_compilation_status() 

256 

def get_pipeline_for_plate(self, plate_path: str) -> List[FunctionStep]:
    """Return the stored pipeline for ``plate_path``, or a new empty list.

    A stored pipeline is returned as-is (not copied).
    """
    no_pipeline: List[FunctionStep] = []
    return self.plate_pipelines.get(plate_path, no_pipeline)

260 

def save_pipeline_for_plate(self, plate_path: str, pipeline: List[FunctionStep]) -> None:
    """Store ``pipeline`` under ``plate_path``.

    ``plate_pipelines`` is replaced by a new dict; the previous dict object is
    left untouched and the given list is stored without copying.
    """
    self.plate_pipelines = {**self.plate_pipelines, plate_path: pipeline}

266 

def clear_pipeline_for_plate(self, plate_path: str) -> None:
    """Drop the stored pipeline for ``plate_path``, if there is one.

    When the plate has no stored pipeline, ``plate_pipelines`` is not
    reassigned at all.
    """
    remaining = dict(self.plate_pipelines)
    try:
        del remaining[plate_path]
    except KeyError:
        return
    self.plate_pipelines = remaining

273 

274 def _invalidate_compilation_status(self) -> None: 

275 """Reset compilation status when pipeline definition changes.""" 

276 if not self.plate_manager or not self.current_plate: 

277 return 

278 

279 # Clear compiled data from simple state 

280 if self.current_plate in self.plate_manager.plate_compiled_data: 

281 del self.plate_manager.plate_compiled_data[self.current_plate] 

282 

283 # Reset orchestrator state to READY (initialized) 

284 orchestrator = self.plate_manager.orchestrators.get(self.current_plate) 

285 if orchestrator and orchestrator.state == OrchestratorState.COMPILED: 

286 orchestrator._state = OrchestratorState.READY 

287 

288 # Trigger UI refresh after orchestrator state change 

289 if self.plate_manager: 

290 self.plate_manager._trigger_ui_refresh() 

291 self.plate_manager._update_button_states() 

292 

293 

294 

def watch_selected_step(self, step_id: str) -> None:
    """Refresh the buttons when ``selected_step`` changes, then log the new value."""
    self._update_button_states()
    logger.debug("Selected step: %s", step_id)

299 

def _update_button_states(self) -> None:
    """Enable or disable every button from the current widget state.

    Rules applied:
    - Add / Load need a current plate whose orchestrator is initialized.
    - Del / Save / Code need at least one pipeline step.
    - Edit needs steps and a ``selected_step`` that resolves to a step index.

    Any exception is swallowed, since the buttons might not be mounted yet.
    """
    try:
        has_plate = bool(self.current_plate)
        is_initialized = self._is_current_plate_initialized()
        has_steps = len(self.pipeline_steps) > 0
        has_valid_selection = bool(self.selected_step) and self._find_step_index_by_selection() is not None

        logger.debug(f"PipelineEditor: Button state update - has_plate: {has_plate}, is_initialized: {is_initialized}, has_steps: {has_steps}")

        add_enabled = has_plate and is_initialized
        load_enabled = has_plate and is_initialized

        logger.debug(f"PipelineEditor: Setting add_step.disabled = {not add_enabled}, load_pipeline.disabled = {not load_enabled}")

        # (selector, enabled) pairs, applied in this order.
        enabled_by_button = (
            ("#add_step", add_enabled),
            ("#del_step", has_steps),
            ("#edit_step", has_steps and has_valid_selection),
            ("#load_pipeline", load_enabled),
            ("#save_pipeline", has_steps),
            ("#code_pipeline", has_steps),
        )
        for selector, enabled in enabled_by_button:
            self.query_one(selector).disabled = not enabled
    except Exception:
        # Buttons might not be mounted yet
        pass

328 

329 

330 

async def action_add_step(self) -> None:
    """Open the dual editor window to create a new step.

    The save callback appends the returned step to the pipeline; a falsy
    result is reported as a cancelled add. An already mounted DualEditorWindow
    is reused, otherwise a new one is mounted on the app.
    """
    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Add step cancelled"
            return
        # The FunctionStep object itself is stored (no conversion to a dict).
        self.pipeline_steps = self.pipeline_steps + [result]
        self.app.current_status = f"Added step: {result.name}"

    try:
        editor = self.app.query_one(DualEditorWindow)
        # Window exists: point it at a fresh step and reopen it.
        editor.editing_step = editor.pattern_manager.create_new_step()
        editor.is_new = True
        editor.on_save_callback = handle_result
        editor.original_step = editor.pattern_manager.clone_pattern(editor.editing_step)
        editor.open_state = True
    except NoMatches:
        # Expected case: no window mounted yet, so create one.
        editor = DualEditorWindow(step_data=None, is_new=True, on_save_callback=handle_result)
        await self.app.mount(editor)
        editor.open_state = True

361 

def action_delete_step(self) -> None:
    """Delete the selected steps from the pipeline.

    Steps are removed by ``name``: every pipeline step whose name matches a
    selected step is dropped. With an empty selection only the status message
    is updated.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        self.app.current_status = "No steps available for deletion"
        return

    names_to_remove = {getattr(item, 'name', '') for item in selected_items}
    current_steps = list(self.pipeline_steps)
    remaining_steps = [
        step for step in current_steps
        if getattr(step, 'name', '') not in names_to_remove
    ]

    # Assigning pipeline_steps is what propagates the change to the widget.
    self.pipeline_steps = remaining_steps

    deleted_count = len(current_steps) - len(remaining_steps)
    self.app.current_status = f"Deleted {deleted_count} steps"

396 

def _dict_to_function_step(self, step_dict: Dict) -> FunctionStep:
    """Build a FunctionStep from a step dict.

    Keys read: "func", "name", "variable_components" and "group_by". A missing
    (None) "func" is replaced by the function registry's default function and a
    warning is logged.

    Args:
        step_dict: Dict describing the step.

    Returns:
        The constructed FunctionStep.
    """
    func = step_dict.get("func")
    if func is None:
        # Fallback to the registry default when the dict carries no function.
        from openhcs.textual_tui.services.function_registry_service import FunctionRegistryService
        func = FunctionRegistryService().find_default_function()
        logger.warning(f"Step '{step_dict.get('name', 'Unknown')}' missing function, using default")

    # Normalize variable_components: a bare string becomes a one-item list and
    # anything that is neither a string nor a list is discarded.
    raw_components = step_dict.get("variable_components", [])
    if isinstance(raw_components, str):
        components = [raw_components]
    elif isinstance(raw_components, list):
        components = raw_components
    else:
        components = []

    # An empty list is not passed on, so FunctionStep applies its own default.
    optional_kwargs = {"variable_components": components} if components else {}

    return FunctionStep(
        func=func,
        name=step_dict.get("name", "Unknown Step"),
        group_by=step_dict.get("group_by", ""),
        **optional_kwargs,
    )

425 

426 def _function_step_to_dict(self, step: FunctionStep) -> Dict: 

427 """Convert FunctionStep object to dict with complete data preservation.""" 

428 return { 

429 "name": step.name, 

430 "type": "function", 

431 "func": step.func, 

432 "variable_components": step.variable_components, 

433 "group_by": step.group_by 

434 } 

435 

436 def _find_step_index_by_selection(self) -> Optional[int]: 

437 """Find the index of the currently selected step.""" 

438 if not self.selected_step: 

439 return None 

440 

441 # selected_step contains the step name/id 

442 for i, step in enumerate(self.pipeline_steps): 

443 # Now step is a FunctionStep object, not a dict 

444 step_name = getattr(step, 'name', f"Step {i+1}") 

445 if step_name == self.selected_step: 

446 return i 

447 return None 

448 

async def action_edit_step(self) -> None:
    """Open the dual editor window for the currently selected step.

    Reports a status message and returns when the pipeline is empty or the
    selection does not resolve to a step. The save callback writes the
    returned step back at the index resolved when the editor was opened.
    """
    if not self.pipeline_steps:
        self.app.current_status = "No steps to edit"
        return

    step_index = self._find_step_index_by_selection()
    if step_index is None:
        self.app.current_status = "No step selected for editing"
        return

    # The FunctionStep object itself is handed to the editor (no conversion).
    edit_step = self.pipeline_steps[step_index]

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Edit step cancelled"
            return
        # Replace the step in a copy, then assign the copy back.
        replaced = self.pipeline_steps.copy()
        replaced[step_index] = result
        self.pipeline_steps = replaced
        self.app.current_status = f"Updated step: {result.name}"

    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    try:
        editor = self.app.query_one(DualEditorWindow)
        # Window exists: retarget it at the selected step and reopen it.
        editor.editing_step = edit_step
        editor.is_new = False
        editor.on_save_callback = handle_result
        editor.original_step = editor.pattern_manager.clone_pattern(editor.editing_step)
        editor.open_state = True
    except NoMatches:
        # Expected case: no window mounted yet, so create one.
        editor = DualEditorWindow(step_data=edit_step, is_new=False, on_save_callback=handle_result)
        await self.app.mount(editor)
        editor.open_state = True

495 

async def action_load_pipeline(self) -> None:
    """Open a file browser to pick one or more .pipeline files to load.

    Requires a current plate. The browser result may be a Path, a non-empty
    list of paths or strings, or a string; anything else is treated as a
    cancelled load.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for loading pipeline"
        return

    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        # Normalize whatever the browser returned into a list of Path objects.
        if isinstance(result, Path):
            paths_to_load = [result]
        elif isinstance(result, list) and len(result) > 0:
            paths_to_load = [
                item if isinstance(item, Path) else Path(item)
                for item in result
            ]
        elif isinstance(result, str):
            paths_to_load = [Path(result)]
        else:
            paths_to_load = []

        if not paths_to_load:
            self.app.current_status = "Load pipeline cancelled"
            return

        logger.debug(f"Loading {len(paths_to_load)} pipeline files")
        self._load_multiple_pipeline_files(paths_to_load)

    await open_file_browser_window(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
        backend=Backend.DISK,
        title="Load Pipeline (.pipeline)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor"
    )

549 

550 def _load_multiple_pipeline_files(self, file_paths: List[Path]) -> None: 

551 """Load and concatenate steps from multiple pipeline files.""" 

552 all_steps = [] 

553 loaded_files = [] 

554 failed_files = [] 

555 

556 for file_path in file_paths: 

557 try: 

558 steps = self._load_single_pipeline_file(file_path) 

559 if steps: 

560 all_steps.extend(steps) 

561 loaded_files.append(file_path.name) 

562 logger.info(f"✅ Loaded {len(steps)} steps from {file_path.name}") 

563 else: 

564 failed_files.append(file_path.name) 

565 except Exception as e: 

566 logger.error(f"❌ Failed to load {file_path.name}: {e}") 

567 failed_files.append(file_path.name) 

568 

569 if all_steps: 

570 # Replace current pipeline with concatenated steps 

571 self.pipeline_steps = all_steps 

572 

573 # Apply to multiple orchestrators if they are selected 

574 self._apply_pipeline_to_selected_orchestrators(all_steps) 

575 

576 # Create status message 

577 if len(loaded_files) == 1: 

578 status = f"Loaded {len(all_steps)} steps from {loaded_files[0]}" 

579 else: 

580 status = f"Loaded {len(all_steps)} steps from {len(loaded_files)} files: {', '.join(loaded_files)}" 

581 

582 if failed_files: 

583 status += f" (Failed: {', '.join(failed_files)})" 

584 

585 self.app.current_status = status 

586 logger.info(f"🎯 Total pipeline: {len(all_steps)} steps from {len(loaded_files)} files") 

587 else: 

588 self.app.current_status = f"No valid pipeline steps loaded from {len(file_paths)} files" 

589 

def _load_single_pipeline_file(self, file_path: Path) -> List:
    """Unpickle one .pipeline file and return its list of steps.

    SECURITY NOTE: the file is deserialized with dill (pickle-compatible),
    which can execute arbitrary code while loading. Only open pipeline files
    that come from a trusted source.

    Args:
        file_path: Path of the file to read.

    Returns:
        The unpickled list, or [] (after logging an error) when the payload
        is not a list.

    Raises:
        Exception: Whatever opening or unpickling raised, re-raised after
            being logged.
    """
    import dill as pickle
    try:
        with open(file_path, 'rb') as stream:
            payload = pickle.load(stream)

        if not isinstance(payload, list):
            logger.error(f"Invalid pipeline format in {file_path.name}: expected list, got {type(payload)}")
            return []
        return payload
    except Exception as e:
        logger.error(f"Failed to load pipeline from {file_path.name}: {e}")
        raise

605 

606 def _apply_pipeline_to_selected_orchestrators(self, pipeline_steps: List) -> None: 

607 """Apply loaded pipeline to all selected orchestrators.""" 

608 if not self.plate_manager: 

609 return 

610 

611 # Get selected orchestrators from plate manager 

612 selected_items, selection_mode = self.plate_manager.get_selection_state() 

613 

614 if selection_mode == "empty" or len(selected_items) <= 1: 

615 # Single or no selection - normal behavior 

616 return 

617 

618 # Multiple orchestrators selected - apply pipeline to all 

619 applied_count = 0 

620 for item in selected_items: 

621 plate_path = item['path'] 

622 if plate_path in self.plate_manager.orchestrators: 

623 orchestrator = self.plate_manager.orchestrators[plate_path] 

624 orchestrator.pipeline_definition = list(pipeline_steps) 

625 

626 # Also save to our plate pipelines storage 

627 self.save_pipeline_for_plate(plate_path, list(pipeline_steps)) 

628 applied_count += 1 

629 

630 if applied_count > 1: 

631 self.app.current_status += f" → Applied to {applied_count} orchestrators" 

632 logger.info(f"Applied pipeline to {applied_count} selected orchestrators") 

633 

634 def _load_pipeline_from_file(self, file_path: Path) -> None: 

635 """Load pipeline from .pipeline file (legacy single-file method).""" 

636 try: 

637 steps = self._load_single_pipeline_file(file_path) 

638 if steps: 

639 self.pipeline_steps = steps 

640 self.app.current_status = f"Loaded {len(steps)} steps from {file_path.name}" 

641 else: 

642 self.app.current_status = f"Invalid pipeline format in {file_path.name}" 

643 except Exception as e: 

644 self.app.current_status = f"Failed to load pipeline: {e}" 

645 

async def action_save_pipeline(self) -> None:
    """Open a file browser to choose where to save the pipeline.

    Requires a current plate and at least one step. The default filename is
    the last component of the current plate path plus ".pipeline". Only a
    Path result triggers a save; anything else is reported as cancelled.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for saving pipeline"
        return

    if not self.pipeline_steps:
        self.app.current_status = "No pipeline steps to save"
        return

    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        if not (result and isinstance(result, Path)):
            self.app.current_status = "Save pipeline cancelled"
            return
        self._save_pipeline_to_file(result)

    # Generate default filename from plate name
    if self.current_plate:
        plate_name = Path(self.current_plate).name
    else:
        plate_name = "pipeline"
    default_filename = f"{plate_name}.pipeline"

    await open_file_browser_window(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
        backend=Backend.DISK,
        title="Save Pipeline (.pipeline)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        default_filename=default_filename,
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor"
    )

688 

def _save_pipeline_to_file(self, file_path: Path) -> None:
    """Pickle a copy of the current steps into ``file_path`` using dill.

    Success and failure are both reported on the app status line; a failure
    is also logged and does not propagate.
    """
    import dill as pickle
    try:
        with open(file_path, 'wb') as stream:
            pickle.dump(list(self.pipeline_steps), stream)
        self.app.current_status = f"Saved pipeline to {file_path.name}"
    except Exception as exc:
        logger.error(f"Failed to save pipeline: {exc}")
        self.app.current_status = f"Failed to save pipeline: {exc}"

699 

async def action_code_pipeline(self) -> None:
    """Edit the pipeline as Python code in a terminal editor.

    Generates Python source for the current steps, opens it in the terminal
    launcher's editor, and on save executes the edited code and adopts the
    ``pipeline_steps`` value it defines. Requires at least one step and a
    current plate. Failures to open the editor are logged and reported on the
    status line.
    """
    logger.debug("Code button pressed - opening pipeline editor")

    if not self.pipeline_steps:
        self.app.current_status = "No pipeline steps to edit"
        return

    if not self.current_plate:
        self.app.current_status = "No plate selected"
        return

    try:
        from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Generate complete pipeline steps code with imports
        python_code = generate_complete_pipeline_steps_code(
            pipeline_steps=list(self.pipeline_steps),
            clean_mode=False
        )

        def handle_edited_code(edited_code: str):
            logger.debug("Pipeline code edited, processing changes...")
            try:
                # NOTE: exec runs whatever code was saved in the editor with
                # full interpreter privileges; it must only ever receive code
                # the local user wrote themselves.
                namespace = {}
                exec(edited_code, namespace)

                if 'pipeline_steps' not in namespace:
                    self.app.show_error("Parse Error", "No 'pipeline_steps = [...]' assignment found in edited code")
                    return

                new_pipeline_steps = namespace['pipeline_steps']
                # Validate before assigning: a non-sequence value would be
                # stored first and only fail afterwards, leaving the widget
                # holding a broken pipeline.
                if not isinstance(new_pipeline_steps, (list, tuple)):
                    self.app.show_error(
                        "Parse Error",
                        f"'pipeline_steps' must be a list of steps, got {type(new_pipeline_steps).__name__}"
                    )
                    return

                # Other methods concatenate and copy pipeline_steps as a
                # list, so a tuple is normalized here.
                new_pipeline_steps = list(new_pipeline_steps)
                self.pipeline_steps = new_pipeline_steps
                self.app.current_status = f"Pipeline updated with {len(new_pipeline_steps)} steps"

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                logger.error(f"Failed to parse edited pipeline code: {e}")
                self.app.show_error("Edit Error", f"Failed to parse pipeline code: {str(e)}")

        # Launch terminal editor
        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code
        )

    except Exception as e:
        logger.error(f"Failed to open pipeline code editor: {e}")
        self.app.current_status = f"Failed to open code editor: {e}"