Coverage for openhcs/textual_tui/widgets/pipeline_editor.py: 0.0%

414 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2PipelineEditorWidget for OpenHCS Textual TUI 

3 

4Pipeline editing widget with complete button set and reactive state management. 

5Matches the functionality from the current prompt-toolkit TUI. 

6""" 

7 

8import logging 

9from typing import Dict, List, Optional, Any, Callable, Tuple 

10from pathlib import Path 

11 

12from textual.app import ComposeResult 

13from textual.containers import Horizontal, ScrollableContainer, Vertical 

14from textual.reactive import reactive 

15from textual.widgets import Button, Static, SelectionList 

16from textual.widget import Widget 

17from .button_list_widget import ButtonListWidget, ButtonConfig 

18from textual import work 

19 

20from openhcs.core.config import GlobalPipelineConfig 

21from openhcs.io.filemanager import FileManager 

22from openhcs.core.steps.function_step import FunctionStep 

23from openhcs.constants.constants import OrchestratorState 

24 

25logger = logging.getLogger(__name__) 

26 

27 

class PipelineEditorWidget(ButtonListWidget):
    """
    Pipeline editing widget using Textual reactive state.

    Features:
    - Complete button set: Add, Del, Edit, Load, Save, Code
    - Reactive state management for automatic UI updates
    - Scrollable content area
    - Integration with plate selection from PlateManager
    """

    # Textual reactive state
    pipeline_steps = reactive([])  # steps of the pipeline shown for the current plate
    current_plate = reactive("")  # path of the active plate; "" when no plate is selected
    selected_step = reactive("")  # name of the selected step; "" when nothing is selected
    plate_pipelines = reactive({})  # {plate_path: List[FunctionStep]} - per-plate pipeline storage

44 

def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
    """
    Build the pipeline editor and its six toolbar buttons.

    Args:
        filemanager: FileManager instance kept on the widget for file operations.
        global_config: Accepted for interface compatibility but deliberately
            not stored (see the note in the body).
    """
    # (label, button id) pairs; every button starts out disabled.
    button_specs = (
        ("Add", "add_step"),
        ("Del", "del_step"),
        ("Edit", "edit_step"),
        ("Load", "load_pipeline"),
        ("Save", "save_pipeline"),
        ("Code", "code_pipeline"),
    )

    super().__init__(
        button_configs=[
            ButtonConfig(label, button_id, disabled=True)
            for label, button_id in button_specs
        ],
        list_id="pipeline_content",
        container_id="pipeline_list",
        on_button_pressed=self._handle_button_press,
        on_selection_changed=self._handle_selection_change,
        on_item_moved=self._handle_item_moved
    )

    # Reference to plate manager (set by MainContent)
    self.plate_manager = None
    self.filemanager = filemanager

    # Note: global_config is not stored because a stored copy can become stale.
    # Always use self.app.global_config to get the current config.

    logger.debug("PipelineEditorWidget initialized")

80 

def format_item_for_display(self, step: FunctionStep) -> Tuple[str, str]:
    """Return the ``(display text, value)`` pair used to list *step*.

    The value is the step's ``name`` attribute, or ``'Unknown Step'`` when
    the step has no such attribute; the display text is that name prefixed
    with a clipboard icon.
    """
    label = getattr(step, 'name', 'Unknown Step')
    return f"📋 {label}", label

86 

def _is_current_plate_initialized(self) -> bool:
    """Report whether the current plate's orchestrator is in an initialized state.

    Returns:
        False when no plate is selected, no plate manager is attached, or the
        plate manager holds no orchestrator for the current plate. Otherwise
        True exactly when the orchestrator state is READY, COMPILED,
        COMPLETED, COMPILE_FAILED or EXEC_FAILED.
    """
    if not (self.current_plate and self.plate_manager):
        logger.debug(f"PipelineEditor: No current plate ({self.current_plate}) or plate_manager ({self.plate_manager})")
        return False

    orchestrator = self.plate_manager.orchestrators.get(self.current_plate)
    if orchestrator is None:
        logger.debug(f"PipelineEditor: No orchestrator found for plate {self.current_plate}")
        return False

    # States treated as "initialized"; failed states count as well.
    initialized_states = (
        OrchestratorState.READY,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
    )
    is_initialized = orchestrator.state in initialized_states
    logger.debug(f"PipelineEditor: Plate {self.current_plate} orchestrator state: {orchestrator.state}, initialized: {is_initialized}")
    return is_initialized

103 

104 async def _handle_button_press(self, button_id: str) -> None: 

105 """Handle button presses from ButtonListWidget (supports async actions).""" 

106 import inspect 

107 

108 if button_id == "add_step": 

109 await self.action_add_step() 

110 elif button_id == "del_step": 

111 self.action_delete_step() 

112 elif button_id == "edit_step": 

113 await self.action_edit_step() 

114 elif button_id == "load_pipeline": 

115 await self.action_load_pipeline() 

116 elif button_id == "save_pipeline": 

117 await self.action_save_pipeline() 

118 elif button_id == "code_pipeline": 

119 await self.action_code_pipeline() 

120 

121 def _handle_selection_change(self, selected_values: List[str]) -> None: 

122 """Handle selection changes from ButtonListWidget.""" 

123 # Update selected_step - use first selected item if any 

124 if selected_values: 

125 self.selected_step = selected_values[0] # This is the step name 

126 else: 

127 self.selected_step = "" 

128 

129 def _handle_item_moved(self, from_index: int, to_index: int) -> None: 

130 """Handle item movement from ButtonListWidget.""" 

131 current_steps = list(self.pipeline_steps) 

132 

133 # Move the step 

134 step = current_steps.pop(from_index) 

135 current_steps.insert(to_index, step) 

136 

137 # Update pipeline steps 

138 self.pipeline_steps = current_steps 

139 

140 step_name = getattr(step, 'name', 'Unknown Step') 

141 direction = "up" if to_index < from_index else "down" 

142 self.app.current_status = f"Moved step '{step_name}' {direction}" 

143 

def on_selection_list_selected_changed(self, event: SelectionList.SelectedChanged) -> None:
    """React to a SelectionList selection change.

    Stores the first selected value (the step name/id) in ``selected_step``,
    or ``""`` when nothing is selected, then refreshes the button states for
    the new selection.
    """
    chosen = event.selection_list.selected
    self.selected_step = chosen[0] if chosen else ""
    self._update_button_states_for_selection(chosen)

156 

157 def _update_button_states_for_selection(self, selected_values: List[str]) -> None: 

158 """Update button states based on current selection and mathematical constraints.""" 

159 try: 

160 has_plate = bool(self.current_plate) 

161 is_initialized = self._is_current_plate_initialized() 

162 has_steps = len(self.pipeline_steps) > 0 

163 has_selection = len(selected_values) > 0 

164 

165 # Mathematical constraints: 

166 # - Pipeline editing requires initialization 

167 # - Edit requires exactly one selection 

168 self.query_one("#add_step").disabled = not (has_plate and is_initialized) 

169 self.query_one("#del_step").disabled = not has_selection 

170 self.query_one("#edit_step").disabled = not (len(selected_values) == 1) # Edit requires exactly one selection 

171 self.query_one("#load_pipeline").disabled = not (has_plate and is_initialized) 

172 self.query_one("#save_pipeline").disabled = not has_steps 

173 self.query_one("#code_pipeline").disabled = not (has_plate and is_initialized) # Same as add button 

174 

175 except Exception: 

176 # Buttons might not be mounted yet 

177 pass 

178 

def get_selection_state(self) -> tuple[List[FunctionStep], str]:
    """Return the selected step objects together with a selection mode.

    Returns:
        ``(selected_steps, mode)`` where ``mode`` is ``"empty"`` when no step
        matches the selection, ``"all"`` when every pipeline step is
        selected, and ``"checkbox"`` otherwise. Steps are matched by their
        ``name`` attribute. If anything raises (for example, the list widget
        is not mounted), ``([], "empty")`` is returned.
    """
    try:
        chosen_values = self.query_one("#pipeline_content", SelectionList).selected

        # Map the selected values back to the step objects, in pipeline order.
        chosen_steps = [
            step for step in self.pipeline_steps
            if getattr(step, 'name', '') in chosen_values
        ]

        if not chosen_steps:
            return chosen_steps, "empty"
        if len(chosen_steps) == len(self.pipeline_steps):
            return chosen_steps, "all"
        # SelectionList is always checkbox-based
        return chosen_steps, "checkbox"
    except Exception:
        # Fallback if widget not mounted
        return [], "empty"

204 

def watch_current_plate(self, plate_path: str) -> None:
    """Swap in the pipeline stored for the newly selected plate.

    The stored steps (or an empty list when the plate has none, or when no
    plate is selected) are installed without triggering a save, the step
    selection is cleared, and the button states are refreshed.
    """
    logger.debug(f"Current plate changed: {plate_path}")

    # Loading existing data must not trigger save/invalidation.
    stored_steps = self.plate_pipelines.get(plate_path, []) if plate_path else []
    self._set_pipeline_steps_without_save(stored_steps)

    # A selection from the previous plate is meaningless for the new one.
    self.selected_step = ""

    self._update_button_states()

224 

225 def _set_pipeline_steps_without_save(self, steps: List[FunctionStep]) -> None: 

226 """Set pipeline steps without triggering save/invalidation (for loading existing data).""" 

227 # Temporarily disable the reactive watcher to prevent save cascade 

228 self._loading_existing_pipeline = True 

229 self.pipeline_steps = steps 

230 # Sync with ButtonListWidget's items property 

231 self.items = list(steps) 

232 self._loading_existing_pipeline = False 

233 

def watch_pipeline_steps(self, steps: List[FunctionStep]) -> None:
    """Mirror the new steps into ``items`` and persist genuine changes.

    The save to plate storage is skipped while the
    ``_loading_existing_pipeline`` flag is set, i.e. while existing data is
    being loaded rather than edited.
    """
    # Keep ButtonListWidget's items property in sync so its own reactive
    # handling runs.
    self.items = list(steps)

    logger.debug(f"Pipeline steps updated: {len(steps)} steps")

    if getattr(self, '_loading_existing_pipeline', False):
        return
    self._save_pipeline_to_plate_storage()

245 

def _save_pipeline_to_plate_storage(self) -> None:
    """Store the current steps under the current plate and invalidate compilation.

    Does nothing when no plate is selected.
    """
    if not self.current_plate:
        return

    # Build a new mapping instead of mutating the stored one in place.
    self.plate_pipelines = {
        **self.plate_pipelines,
        self.current_plate: list(self.pipeline_steps),
    }
    logger.debug(f"Saved {len(self.pipeline_steps)} steps for plate: {self.current_plate}")

    # Invalidate compilation status when pipeline changes
    self._invalidate_compilation_status()

257 

def get_pipeline_for_plate(self, plate_path: str) -> List[FunctionStep]:
    """Return the steps stored for *plate_path*, or a new empty list if none are stored."""
    stored = self.plate_pipelines
    return stored.get(plate_path, [])

261 

def save_pipeline_for_plate(self, plate_path: str, pipeline: List[FunctionStep]) -> None:
    """Store *pipeline* under *plate_path* in the per-plate storage.

    The storage mapping is replaced by a new dict rather than mutated; the
    *pipeline* list itself is stored as given (not copied).
    """
    self.plate_pipelines = {**self.plate_pipelines, plate_path: pipeline}

267 

def clear_pipeline_for_plate(self, plate_path: str) -> None:
    """Remove the stored pipeline for *plate_path*, if there is one.

    The storage mapping is always replaced by a new dict, even when the
    plate had no stored pipeline.
    """
    remaining = dict(self.plate_pipelines)
    remaining.pop(plate_path, None)
    self.plate_pipelines = remaining

274 

275 def _invalidate_compilation_status(self) -> None: 

276 """Reset compilation status when pipeline definition changes.""" 

277 if not self.plate_manager or not self.current_plate: 

278 return 

279 

280 # Clear compiled data from simple state 

281 if self.current_plate in self.plate_manager.plate_compiled_data: 

282 del self.plate_manager.plate_compiled_data[self.current_plate] 

283 

284 # Reset orchestrator state to READY (initialized) 

285 orchestrator = self.plate_manager.orchestrators.get(self.current_plate) 

286 if orchestrator and orchestrator.state == OrchestratorState.COMPILED: 

287 orchestrator._state = OrchestratorState.READY 

288 

289 # Trigger UI refresh after orchestrator state change 

290 if self.plate_manager: 

291 self.plate_manager._trigger_ui_refresh() 

292 self.plate_manager._update_button_states() 

293 

294 

295 

def watch_selected_step(self, step_id: str) -> None:
    """Refresh the button states whenever ``selected_step`` changes.

    Args:
        step_id: The new value of ``selected_step`` (a step name, or ``""``
            when the selection was cleared); used only for the debug log.
    """
    self._update_button_states()
    logger.debug(f"Selected step: {step_id}")

300 

def _update_button_states(self) -> None:
    """Enable or disable the toolbar buttons from the widget's current state.

    Rules applied:
    - Add, Load and Code need a current plate that is initialized.
    - Del and Save need at least one pipeline step.
    - Edit needs steps plus a ``selected_step`` that maps to an actual step.

    Any exception (for example, buttons not mounted yet) is swallowed.
    """
    try:
        has_plate = bool(self.current_plate)
        is_initialized = self._is_current_plate_initialized()
        has_steps = len(self.pipeline_steps) > 0
        has_valid_selection = bool(self.selected_step) and self._find_step_index_by_selection() is not None

        logger.debug(f"PipelineEditor: Button state update - has_plate: {has_plate}, is_initialized: {is_initialized}, has_steps: {has_steps}")

        # Orchestrator initialization is sufficient for Add, Load and Code.
        add_enabled = load_enabled = code_enabled = has_plate and is_initialized

        logger.debug(f"PipelineEditor: Setting add_step.disabled = {not add_enabled}, load_pipeline.disabled = {not load_enabled}")

        # Insertion order is the order in which the buttons are updated.
        disabled_by_button = {
            "add_step": not add_enabled,
            "del_step": not has_steps,
            "edit_step": not (has_steps and has_valid_selection),
            "load_pipeline": not load_enabled,
            "save_pipeline": not has_steps,
            "code_pipeline": not code_enabled,
        }
        for button_id, is_disabled in disabled_by_button.items():
            self.query_one(f"#{button_id}").disabled = is_disabled
    except Exception:
        # Buttons might not be mounted yet
        pass

330 

331 

332 

async def action_add_step(self) -> None:
    """Open the dual editor window to create a new step.

    When the editor hands back a saved step it is appended to the pipeline;
    a falsy result is treated as a cancellation. Either way the app status
    line is updated.
    """
    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Add step cancelled"
            return
        # Keep the FunctionStep object itself (preserves memory type decorators)
        self.pipeline_steps = self.pipeline_steps + [result]
        self.app.current_status = f"Added step: {result.name}"

    # Use window-based dual editor (follows ConfigWindow pattern)
    try:
        editor = self.app.query_one(DualEditorWindow)
        # An editor window already exists: re-target it at a fresh step.
        editor.editing_step = editor.pattern_manager.create_new_step()
        editor.is_new = True
        editor.on_save_callback = handle_result
        editor.original_step = editor.pattern_manager.clone_pattern(editor.editing_step)
        editor.open_state = True
    except NoMatches:
        # Expected case: no editor window yet, so create and mount one.
        editor = DualEditorWindow(step_data=None, is_new=True, on_save_callback=handle_result)
        await self.app.mount(editor)
        editor.open_state = True

363 

def action_delete_step(self) -> None:
    """Handle Delete Step button - delete the selected steps.

    Steps are matched by their ``name`` attribute, so every step sharing a
    selected name is removed. When the selection is empty only the status
    line is updated. Afterwards the status line reports how many steps were
    removed.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        self.app.current_status = "No steps available for deletion"
        return

    names_to_remove = {getattr(item, 'name', '') for item in selected_items}
    current_steps = list(self.pipeline_steps)
    remaining_steps = [
        step for step in current_steps
        if getattr(step, 'name', '') not in names_to_remove
    ]

    # Update pipeline steps (this will trigger save to plate storage)
    self.pipeline_steps = remaining_steps

    deleted_count = len(current_steps) - len(remaining_steps)
    self.app.current_status = f"Deleted {deleted_count} steps"

398 

def _dict_to_function_step(self, step_dict: Dict) -> FunctionStep:
    """Build a FunctionStep from a step dictionary.

    Args:
        step_dict: Mapping read for the keys ``"func"``, ``"name"``,
            ``"group_by"`` and ``"variable_components"``.

    Returns:
        A FunctionStep built from those values. A missing function is
        replaced by the registry's default function (with a warning), and
        ``variable_components`` is passed only when it is non-empty so that
        FunctionStep can otherwise apply its own default.
    """
    func = step_dict.get("func")
    if func is None:
        # Fallback to default function if missing
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService
        func = RegistryService().find_default_function()
        logger.warning(f"Step '{step_dict.get('name', 'Unknown')}' missing function, using default")

    # Normalise variable components: a lone string becomes a one-item list,
    # anything that is not a list is discarded.
    raw_components = step_dict.get("variable_components", [])
    if isinstance(raw_components, str):
        components = [raw_components]
    else:
        components = raw_components if isinstance(raw_components, list) else []

    step_kwargs = dict(
        func=func,
        name=step_dict.get("name", "Unknown Step"),
        group_by=step_dict.get("group_by", ""),
    )
    if components:
        step_kwargs["variable_components"] = components

    return FunctionStep(**step_kwargs)

427 

428 def _function_step_to_dict(self, step: FunctionStep) -> Dict: 

429 """Convert FunctionStep object to dict with complete data preservation.""" 

430 return { 

431 "name": step.name, 

432 "type": "function", 

433 "func": step.func, 

434 "variable_components": step.variable_components, 

435 "group_by": step.group_by 

436 } 

437 

438 def _find_step_index_by_selection(self) -> Optional[int]: 

439 """Find the index of the currently selected step.""" 

440 if not self.selected_step: 

441 return None 

442 

443 # selected_step contains the step name/id 

444 for i, step in enumerate(self.pipeline_steps): 

445 # Now step is a FunctionStep object, not a dict 

446 step_name = getattr(step, 'name', f"Step {i+1}") 

447 if step_name == self.selected_step: 

448 return i 

449 return None 

450 

async def action_edit_step(self) -> None:
    """Open the dual editor window on the currently selected step.

    Updates only the status line when the pipeline is empty or no step is
    selected. When the editor hands back a saved step, it replaces the step
    at the index that was selected when editing started; a falsy result is
    treated as a cancellation.
    """
    if not self.pipeline_steps:
        self.app.current_status = "No steps to edit"
        return

    step_index = self._find_step_index_by_selection()
    if step_index is None:
        self.app.current_status = "No step selected for editing"
        return

    # The FunctionStep object is handed to the editor as-is (no conversion).
    step_to_edit = self.pipeline_steps[step_index]

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Edit step cancelled"
            return
        # Keep the FunctionStep object itself (preserves memory type decorators)
        replaced = self.pipeline_steps.copy()
        replaced[step_index] = result
        self.pipeline_steps = replaced
        self.app.current_status = f"Updated step: {result.name}"

    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    # Use window-based dual editor (follows ConfigWindow pattern)
    try:
        editor = self.app.query_one(DualEditorWindow)
        # An editor window already exists: re-target it at the selected step.
        editor.editing_step = step_to_edit
        editor.is_new = False
        editor.on_save_callback = handle_result
        editor.original_step = editor.pattern_manager.clone_pattern(editor.editing_step)
        editor.open_state = True
    except NoMatches:
        # Expected case: no editor window yet, so create and mount one.
        editor = DualEditorWindow(step_data=step_to_edit, is_new=False, on_save_callback=handle_result)
        await self.app.mount(editor)
        editor.open_state = True

497 

async def action_load_pipeline(self) -> None:
    """Open a file browser and load the chosen ``.pipeline`` file(s).

    Without a current plate only the status line is updated. The browser
    result may be a single Path, a string path, or a non-empty list of
    paths/strings; any other result is treated as a cancellation.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for loading pipeline"
        return

    def handle_result(result):
        # Normalise the browser result into a list of Path objects.
        if isinstance(result, Path):
            paths_to_load = [result]
        elif isinstance(result, list) and len(result) > 0:
            # Several pipeline files may be selected at once.
            paths_to_load = [
                entry if isinstance(entry, Path) else Path(entry)
                for entry in result
            ]
        elif isinstance(result, str):
            paths_to_load = [Path(result)]
        else:
            paths_to_load = []

        if not paths_to_load:
            self.app.current_status = "Load pipeline cancelled"
            return

        logger.debug(f"Loading {len(paths_to_load)} pipeline files")
        self._load_multiple_pipeline_files(paths_to_load)

    # Create file browser window for .pipeline files
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    browser_options = dict(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
        backend=Backend.DISK,
        title="Load Pipeline (.pipeline)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor",
    )
    await open_file_browser_window(**browser_options)

551 

def _load_multiple_pipeline_files(self, file_paths: List[Path]) -> None:
    """Load and concatenate steps from multiple pipeline files.

    Steps are concatenated in the order of ``file_paths`` and replace the
    current pipeline. Files that raise while loading, or that yield no
    steps, are listed as failed in the status line; the remaining files are
    still loaded. If no file yields any step the pipeline is left unchanged.

    Args:
        file_paths: Pipeline files to load, in the order their steps should
            appear in the resulting pipeline.
    """
    all_steps = []
    loaded_files = []
    failed_files = []

    for file_path in file_paths:
        try:
            steps = self._load_single_pipeline_file(file_path)
            if steps:
                all_steps.extend(steps)
                loaded_files.append(file_path.name)
                logger.info(f"✅ Loaded {len(steps)} steps from {file_path.name}")
            else:
                failed_files.append(file_path.name)
        except Exception as e:
            logger.error(f"❌ Failed to load {file_path.name}: {e}")
            failed_files.append(file_path.name)

    if not all_steps:
        self.app.current_status = f"No valid pipeline steps loaded from {len(file_paths)} files"
        return

    # Replace current pipeline with concatenated steps
    self.pipeline_steps = all_steps

    # Create status message
    if len(loaded_files) == 1:
        status = f"Loaded {len(all_steps)} steps from {loaded_files[0]}"
    else:
        status = f"Loaded {len(all_steps)} steps from {len(loaded_files)} files: {', '.join(loaded_files)}"

    if failed_files:
        status += f" (Failed: {', '.join(failed_files)})"

    # Publish the status BEFORE applying to other orchestrators: that step
    # appends an "Applied to N orchestrators" suffix to the status line,
    # which would be lost if the status were assigned afterwards.
    self.app.current_status = status

    # Apply to multiple orchestrators if they are selected
    self._apply_pipeline_to_selected_orchestrators(all_steps)

    logger.info(f"🎯 Total pipeline: {len(all_steps)} steps from {len(loaded_files)} files")

591 

def _load_single_pipeline_file(self, file_path: Path) -> List:
    """Unpickle one ``.pipeline`` file and return its list of steps.

    Args:
        file_path: Path of the pipeline file to read.

    Returns:
        The unpickled list, or an empty list (after logging an error) when
        the file holds anything other than a list.

    Raises:
        Exception: Any error raised while opening or unpickling the file is
            logged and re-raised.

    SECURITY: the file is deserialized with dill (pickle-compatible), which
    can execute arbitrary code. Only load pipeline files from trusted sources.
    """
    import dill as pickle
    try:
        with open(file_path, 'rb') as handle:
            loaded = pickle.load(handle)

        if not isinstance(loaded, list):
            logger.error(f"Invalid pipeline format in {file_path.name}: expected list, got {type(loaded)}")
            return []
        return loaded
    except Exception as e:
        logger.error(f"Failed to load pipeline from {file_path.name}: {e}")
        raise

607 

608 def _apply_pipeline_to_selected_orchestrators(self, pipeline_steps: List) -> None: 

609 """Apply loaded pipeline to all selected orchestrators.""" 

610 if not self.plate_manager: 

611 return 

612 

613 # Get selected orchestrators from plate manager 

614 selected_items, selection_mode = self.plate_manager.get_selection_state() 

615 

616 if selection_mode == "empty" or len(selected_items) <= 1: 

617 # Single or no selection - normal behavior 

618 return 

619 

620 # Multiple orchestrators selected - apply pipeline to all 

621 applied_count = 0 

622 for item in selected_items: 

623 plate_path = item['path'] 

624 if plate_path in self.plate_manager.orchestrators: 

625 orchestrator = self.plate_manager.orchestrators[plate_path] 

626 orchestrator.pipeline_definition = list(pipeline_steps) 

627 

628 # Also save to our plate pipelines storage 

629 self.save_pipeline_for_plate(plate_path, list(pipeline_steps)) 

630 applied_count += 1 

631 

632 if applied_count > 1: 

633 self.app.current_status += f" → Applied to {applied_count} orchestrators" 

634 logger.info(f"Applied pipeline to {applied_count} selected orchestrators") 

635 

636 def _load_pipeline_from_file(self, file_path: Path) -> None: 

637 """Load pipeline from .pipeline file (legacy single-file method).""" 

638 try: 

639 steps = self._load_single_pipeline_file(file_path) 

640 if steps: 

641 self.pipeline_steps = steps 

642 self.app.current_status = f"Loaded {len(steps)} steps from {file_path.name}" 

643 else: 

644 self.app.current_status = f"Invalid pipeline format in {file_path.name}" 

645 except Exception as e: 

646 self.app.current_status = f"Failed to load pipeline: {e}" 

647 

async def action_save_pipeline(self) -> None:
    """Open a save dialog and write the pipeline to the chosen file.

    Without a current plate, or with an empty pipeline, only the status line
    is updated. The suggested filename is the plate's final path component
    with a ``.pipeline`` suffix. Only a Path result from the browser triggers
    a save; anything else is treated as a cancellation.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for saving pipeline"
        return

    if not self.pipeline_steps:
        self.app.current_status = "No pipeline steps to save"
        return

    def handle_result(result):
        if not (result and isinstance(result, Path)):
            self.app.current_status = "Save pipeline cancelled"
            return
        self._save_pipeline_to_file(result)

    # Create file browser window for saving .pipeline files
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    # Generate default filename from plate name
    if self.current_plate:
        plate_name = Path(self.current_plate).name
    else:
        plate_name = "pipeline"
    default_filename = f"{plate_name}.pipeline"

    browser_options = dict(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
        backend=Backend.DISK,
        title="Save Pipeline (.pipeline)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        default_filename=default_filename,
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor",
    )
    await open_file_browser_window(**browser_options)

690 

def _save_pipeline_to_file(self, file_path: Path) -> None:
    """Pickle the current pipeline steps (as a list) into *file_path*.

    Success and failure are both reported in the status line; failures are
    also logged and are not re-raised.
    """
    import dill as pickle
    try:
        steps_snapshot = list(self.pipeline_steps)
        with open(file_path, 'wb') as handle:
            pickle.dump(steps_snapshot, handle)
        self.app.current_status = f"Saved pipeline to {file_path.name}"
    except Exception as e:
        logger.error(f"Failed to save pipeline: {e}")
        self.app.current_status = f"Failed to save pipeline: {e}"

701 

async def action_code_pipeline(self) -> None:
    """Edit pipeline as Python code in terminal window.

    Generates Python source for the current steps, opens it in an editor via
    TerminalLauncher, and on save executes the edited source and takes its
    ``pipeline_steps`` variable as the new pipeline. Problems are reported
    through the app (error dialog or status line) rather than raised.

    NOTE: the edited source is run with ``exec``, so it executes with the
    full privileges of the application.
    """
    logger.debug("Code button pressed - opening pipeline editor")

    if not self.current_plate:
        self.app.current_status = "No plate selected"
        return

    try:
        # Use complete pipeline steps code generation
        from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Generate complete pipeline steps code with imports
        python_code = generate_complete_pipeline_steps_code(
            pipeline_steps=list(self.pipeline_steps),
            clean_mode=True
        )

        def handle_edited_code(edited_code: str):
            logger.debug("Pipeline code edited, processing changes...")
            try:
                # The generated code carries its own imports, so an empty
                # namespace is enough to run it.
                scope = {}
                exec(edited_code, scope)

                if 'pipeline_steps' not in scope:
                    self.app.show_error("Parse Error", "No 'pipeline_steps = [...]' assignment found in edited code")
                    return

                edited_steps = scope['pipeline_steps']
                self.pipeline_steps = edited_steps
                self.app.current_status = f"Pipeline updated with {len(edited_steps)} steps"

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                logger.error(f"Failed to parse edited pipeline code: {e}")
                self.app.show_error("Edit Error", f"Failed to parse pipeline code: {str(e)}")

        # Launch terminal editor
        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code
        )

    except Exception as e:
        logger.error(f"Failed to open pipeline code editor: {e}")
        self.app.current_status = f"Failed to open code editor: {e}"