Coverage for openhcs/textual_tui/widgets/pipeline_editor.py: 0.0%
414 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2PipelineEditorWidget for OpenHCS Textual TUI
4Pipeline editing widget with complete button set and reactive state management.
5Matches the functionality from the current prompt-toolkit TUI.
6"""
8import logging
9from typing import Dict, List, Optional, Any, Callable, Tuple
10from pathlib import Path
12from textual.app import ComposeResult
13from textual.containers import Horizontal, ScrollableContainer, Vertical
14from textual.reactive import reactive
15from textual.widgets import Button, Static, SelectionList
16from textual.widget import Widget
17from .button_list_widget import ButtonListWidget, ButtonConfig
18from textual import work
20from openhcs.core.config import GlobalPipelineConfig
21from openhcs.io.filemanager import FileManager
22from openhcs.core.steps.function_step import FunctionStep
23from openhcs.constants.constants import OrchestratorState
25logger = logging.getLogger(__name__)
28class PipelineEditorWidget(ButtonListWidget):
29 """
30 Pipeline editing widget using Textual reactive state.
32 Features:
33 - Complete button set: Add, Del, Edit, Load, Save
34 - Reactive state management for automatic UI updates
35 - Scrollable content area
36 - Integration with plate selection from PlateManager
37 """
39 # Textual reactive state
40 pipeline_steps = reactive([])
41 current_plate = reactive("")
42 selected_step = reactive("")
43 plate_pipelines = reactive({}) # {plate_path: List[FunctionStep]} - per-plate pipeline storage
45 def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
46 """
47 Initialize the pipeline editor widget.
49 Args:
50 filemanager: FileManager instance for file operations
51 global_config: Global configuration
52 """
53 # Define button configuration
54 button_configs = [
55 ButtonConfig("Add", "add_step", disabled=True),
56 ButtonConfig("Del", "del_step", disabled=True),
57 ButtonConfig("Edit", "edit_step", disabled=True),
58 ButtonConfig("Load", "load_pipeline", disabled=True),
59 ButtonConfig("Save", "save_pipeline", disabled=True),
60 ButtonConfig("Code", "code_pipeline", disabled=True),
61 ]
63 super().__init__(
64 button_configs=button_configs,
65 list_id="pipeline_content",
66 container_id="pipeline_list",
67 on_button_pressed=self._handle_button_press,
68 on_selection_changed=self._handle_selection_change,
69 on_item_moved=self._handle_item_moved
70 )
72 self.filemanager = filemanager
73 # Note: We don't store global_config as it can become stale
74 # Always use self.app.global_config to get the current config
76 # Reference to plate manager (set by MainContent)
77 self.plate_manager = None
79 logger.debug("PipelineEditorWidget initialized")
81 def format_item_for_display(self, step: FunctionStep) -> Tuple[str, str]:
82 """Format step for display in the list."""
83 step_name = getattr(step, 'name', 'Unknown Step')
84 display_text = f"📋 {step_name}"
85 return display_text, step_name
87 def _is_current_plate_initialized(self) -> bool:
88 """Check if current plate has an initialized orchestrator."""
89 if not self.current_plate or not self.plate_manager:
90 logger.debug(f"PipelineEditor: No current plate ({self.current_plate}) or plate_manager ({self.plate_manager})")
91 return False
93 orchestrator = self.plate_manager.orchestrators.get(self.current_plate)
94 if orchestrator is None:
95 logger.debug(f"PipelineEditor: No orchestrator found for plate {self.current_plate}")
96 return False
98 is_initialized = orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED,
99 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED,
100 OrchestratorState.EXEC_FAILED]
101 logger.debug(f"PipelineEditor: Plate {self.current_plate} orchestrator state: {orchestrator.state}, initialized: {is_initialized}")
102 return is_initialized
104 async def _handle_button_press(self, button_id: str) -> None:
105 """Handle button presses from ButtonListWidget (supports async actions)."""
106 import inspect
108 if button_id == "add_step":
109 await self.action_add_step()
110 elif button_id == "del_step":
111 self.action_delete_step()
112 elif button_id == "edit_step":
113 await self.action_edit_step()
114 elif button_id == "load_pipeline":
115 await self.action_load_pipeline()
116 elif button_id == "save_pipeline":
117 await self.action_save_pipeline()
118 elif button_id == "code_pipeline":
119 await self.action_code_pipeline()
121 def _handle_selection_change(self, selected_values: List[str]) -> None:
122 """Handle selection changes from ButtonListWidget."""
123 # Update selected_step - use first selected item if any
124 if selected_values:
125 self.selected_step = selected_values[0] # This is the step name
126 else:
127 self.selected_step = ""
129 def _handle_item_moved(self, from_index: int, to_index: int) -> None:
130 """Handle item movement from ButtonListWidget."""
131 current_steps = list(self.pipeline_steps)
133 # Move the step
134 step = current_steps.pop(from_index)
135 current_steps.insert(to_index, step)
137 # Update pipeline steps
138 self.pipeline_steps = current_steps
140 step_name = getattr(step, 'name', 'Unknown Step')
141 direction = "up" if to_index < from_index else "down"
142 self.app.current_status = f"Moved step '{step_name}' {direction}"
144 def on_selection_list_selected_changed(self, event: SelectionList.SelectedChanged) -> None:
145 """Handle selection changes from SelectionList."""
146 selected_values = event.selection_list.selected
148 # Update selected_step - use first selected item if any
149 if selected_values:
150 self.selected_step = selected_values[0] # This is the step name/id
151 else:
152 self.selected_step = ""
154 # Update button states based on selection
155 self._update_button_states_for_selection(selected_values)
157 def _update_button_states_for_selection(self, selected_values: List[str]) -> None:
158 """Update button states based on current selection and mathematical constraints."""
159 try:
160 has_plate = bool(self.current_plate)
161 is_initialized = self._is_current_plate_initialized()
162 has_steps = len(self.pipeline_steps) > 0
163 has_selection = len(selected_values) > 0
165 # Mathematical constraints:
166 # - Pipeline editing requires initialization
167 # - Edit requires exactly one selection
168 self.query_one("#add_step").disabled = not (has_plate and is_initialized)
169 self.query_one("#del_step").disabled = not has_selection
170 self.query_one("#edit_step").disabled = not (len(selected_values) == 1) # Edit requires exactly one selection
171 self.query_one("#load_pipeline").disabled = not (has_plate and is_initialized)
172 self.query_one("#save_pipeline").disabled = not has_steps
173 self.query_one("#code_pipeline").disabled = not (has_plate and is_initialized) # Same as add button
175 except Exception:
176 # Buttons might not be mounted yet
177 pass
179 def get_selection_state(self) -> tuple[List[FunctionStep], str]:
180 """Get current selection state from SelectionList."""
181 try:
182 selection_list = self.query_one("#pipeline_content", SelectionList)
183 selected_values = selection_list.selected
185 # Convert selected values back to step objects
186 selected_items = []
187 for step in self.pipeline_steps:
188 step_name = getattr(step, 'name', '')
189 if step_name in selected_values:
190 selected_items.append(step)
192 # Determine selection mode
193 if not selected_items:
194 selection_mode = "empty"
195 elif len(selected_items) == len(self.pipeline_steps):
196 selection_mode = "all"
197 else:
198 selection_mode = "checkbox" # SelectionList is always checkbox-based
200 return selected_items, selection_mode
201 except Exception:
202 # Fallback if widget not mounted
203 return [], "empty"
205 def watch_current_plate(self, plate_path: str) -> None:
206 """Automatically update UI when current_plate changes."""
207 logger.debug(f"Current plate changed: {plate_path}")
209 # Load pipeline for the new plate WITHOUT triggering save/invalidation
210 if plate_path:
211 # Get pipeline for this plate (or empty if none exists)
212 plate_pipeline = self.plate_pipelines.get(plate_path, [])
213 # Set pipeline_steps directly without triggering reactive save
214 self._set_pipeline_steps_without_save(plate_pipeline)
215 else:
216 # No plate selected - clear steps
217 self._set_pipeline_steps_without_save([])
219 # Clear selection when plate changes
220 self.selected_step = ""
222 # Update button states
223 self._update_button_states()
225 def _set_pipeline_steps_without_save(self, steps: List[FunctionStep]) -> None:
226 """Set pipeline steps without triggering save/invalidation (for loading existing data)."""
227 # Temporarily disable the reactive watcher to prevent save cascade
228 self._loading_existing_pipeline = True
229 self.pipeline_steps = steps
230 # Sync with ButtonListWidget's items property
231 self.items = list(steps)
232 self._loading_existing_pipeline = False
234 def watch_pipeline_steps(self, steps: List[FunctionStep]) -> None:
235 """Automatically update UI when pipeline_steps changes."""
236 # Sync with ButtonListWidget's items property to trigger its reactive system
237 self.items = list(steps)
239 logger.debug(f"Pipeline steps updated: {len(steps)} steps")
241 # Only save/invalidate if this is a real change, not loading existing data
242 if not getattr(self, '_loading_existing_pipeline', False):
243 # Save pipeline changes to plate storage
244 self._save_pipeline_to_plate_storage()
246 def _save_pipeline_to_plate_storage(self) -> None:
247 """Save current pipeline steps to plate storage and invalidate compilation."""
248 if self.current_plate:
249 # Update plate pipelines storage
250 current_pipelines = dict(self.plate_pipelines)
251 current_pipelines[self.current_plate] = list(self.pipeline_steps)
252 self.plate_pipelines = current_pipelines
253 logger.debug(f"Saved {len(self.pipeline_steps)} steps for plate: {self.current_plate}")
255 # Invalidate compilation status when pipeline changes
256 self._invalidate_compilation_status()
258 def get_pipeline_for_plate(self, plate_path: str) -> List[FunctionStep]:
259 """Get pipeline for specific plate."""
260 return self.plate_pipelines.get(plate_path, [])
262 def save_pipeline_for_plate(self, plate_path: str, pipeline: List[FunctionStep]) -> None:
263 """Save pipeline for specific plate."""
264 current_pipelines = dict(self.plate_pipelines)
265 current_pipelines[plate_path] = pipeline
266 self.plate_pipelines = current_pipelines
268 def clear_pipeline_for_plate(self, plate_path: str) -> None:
269 """Clear pipeline for specific plate."""
270 current_pipelines = dict(self.plate_pipelines)
271 if plate_path in current_pipelines:
272 del current_pipelines[plate_path]
273 self.plate_pipelines = current_pipelines
275 def _invalidate_compilation_status(self) -> None:
276 """Reset compilation status when pipeline definition changes."""
277 if not self.plate_manager or not self.current_plate:
278 return
280 # Clear compiled data from simple state
281 if self.current_plate in self.plate_manager.plate_compiled_data:
282 del self.plate_manager.plate_compiled_data[self.current_plate]
284 # Reset orchestrator state to READY (initialized)
285 orchestrator = self.plate_manager.orchestrators.get(self.current_plate)
286 if orchestrator and orchestrator.state == OrchestratorState.COMPILED:
287 orchestrator._state = OrchestratorState.READY
289 # Trigger UI refresh after orchestrator state change
290 if self.plate_manager:
291 self.plate_manager._trigger_ui_refresh()
292 self.plate_manager._update_button_states()
296 def watch_selected_step(self, step_id: str) -> None:
297 """Automatically update UI when selected_step changes."""
298 self._update_button_states()
299 logger.debug(f"Selected step: {step_id}")
301 def _update_button_states(self) -> None:
302 """Update button enabled/disabled states based on mathematical constraints."""
303 try:
304 has_plate = bool(self.current_plate)
305 is_initialized = self._is_current_plate_initialized()
306 has_steps = len(self.pipeline_steps) > 0
307 has_valid_selection = bool(self.selected_step) and self._find_step_index_by_selection() is not None
309 logger.debug(f"PipelineEditor: Button state update - has_plate: {has_plate}, is_initialized: {is_initialized}, has_steps: {has_steps}")
311 # Mathematical constraints:
312 # - Pipeline editing requires initialization
313 # - Step operations require steps to exist
314 # - Edit requires valid selection that maps to actual step
315 add_enabled = has_plate and is_initialized
316 load_enabled = has_plate and is_initialized
317 code_enabled = has_plate and is_initialized # Same as add button - orchestrator init is sufficient
319 logger.debug(f"PipelineEditor: Setting add_step.disabled = {not add_enabled}, load_pipeline.disabled = {not load_enabled}")
321 self.query_one("#add_step").disabled = not add_enabled
322 self.query_one("#del_step").disabled = not has_steps
323 self.query_one("#edit_step").disabled = not (has_steps and has_valid_selection)
324 self.query_one("#load_pipeline").disabled = not load_enabled
325 self.query_one("#save_pipeline").disabled = not has_steps
326 self.query_one("#code_pipeline").disabled = not code_enabled # Changed from has_steps to code_enabled
327 except Exception:
328 # Buttons might not be mounted yet
329 pass
333 async def action_add_step(self) -> None:
334 """Handle Add Step button - now triggers modal."""
336 def handle_result(result: Optional[FunctionStep]) -> None:
337 if result: # User saved new step
338 # Store the actual FunctionStep object directly (preserves memory type decorators)
339 new_steps = self.pipeline_steps + [result]
340 self.pipeline_steps = new_steps
341 self.app.current_status = f"Added step: {result.name}"
342 else:
343 self.app.current_status = "Add step cancelled"
345 # LAZY IMPORT to avoid circular import
346 from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
347 from textual.css.query import NoMatches
349 # Use window-based dual editor (follows ConfigWindow pattern)
350 try:
351 window = self.app.query_one(DualEditorWindow)
352 # Window exists, update it for new step and open
353 window.editing_step = window.pattern_manager.create_new_step()
354 window.is_new = True
355 window.on_save_callback = handle_result
356 window.original_step = window.pattern_manager.clone_pattern(window.editing_step)
357 window.open_state = True
358 except NoMatches:
359 # Expected case: window doesn't exist yet, create new one
360 window = DualEditorWindow(step_data=None, is_new=True, on_save_callback=handle_result)
361 await self.app.mount(window)
362 window.open_state = True
364 def action_delete_step(self) -> None:
365 """Handle Delete Step button - delete selected steps."""
367 # Get current selection state
368 selected_items, selection_mode = self.get_selection_state()
370 if selection_mode == "empty":
371 self.app.current_status = "No steps available for deletion"
372 return
374 # Generate description and perform deletion
375 count = len(selected_items)
376 if selection_mode == "empty":
377 desc = "No items available for deletion"
378 elif selection_mode == "all":
379 desc = f"Delete ALL {count} items"
380 elif count == 1:
381 item_name = getattr(selected_items[0], 'name', 'Unknown')
382 desc = f"Delete selected item: {item_name}"
383 else:
384 desc = f"Delete {count} selected items"
386 # Remove selected steps
387 current_steps = list(self.pipeline_steps)
388 steps_to_remove = set(getattr(item, 'name', '') for item in selected_items)
390 # Filter out selected steps
391 new_steps = [step for step in current_steps if getattr(step, 'name', '') not in steps_to_remove]
393 # Update pipeline steps (this will trigger save to plate storage)
394 self.pipeline_steps = new_steps
396 deleted_count = len(current_steps) - len(new_steps)
397 self.app.current_status = f"Deleted {deleted_count} steps"
399 def _dict_to_function_step(self, step_dict: Dict) -> FunctionStep:
400 """Convert step dict to FunctionStep object with proper data preservation."""
401 # Extract function - handle both callable and registry lookup
402 func = step_dict.get("func")
403 if func is None:
404 # Fallback to default function if missing
405 from openhcs.processing.backends.lib_registry.registry_service import RegistryService
406 registry = RegistryService()
407 func = registry.find_default_function()
408 logger.warning(f"Step '{step_dict.get('name', 'Unknown')}' missing function, using default")
410 # Extract variable components - handle both list and string formats
411 var_components = step_dict.get("variable_components", [])
412 if isinstance(var_components, str):
413 var_components = [var_components]
414 elif not isinstance(var_components, list):
415 var_components = []
417 # Only pass variable_components if it's not empty, let FunctionStep use its default otherwise
418 step_kwargs = {
419 "func": func,
420 "name": step_dict.get("name", "Unknown Step"),
421 "group_by": step_dict.get("group_by", "")
422 }
423 if var_components: # Only add if not empty
424 step_kwargs["variable_components"] = var_components
426 return FunctionStep(**step_kwargs)
428 def _function_step_to_dict(self, step: FunctionStep) -> Dict:
429 """Convert FunctionStep object to dict with complete data preservation."""
430 return {
431 "name": step.name,
432 "type": "function",
433 "func": step.func,
434 "variable_components": step.variable_components,
435 "group_by": step.group_by
436 }
438 def _find_step_index_by_selection(self) -> Optional[int]:
439 """Find the index of the currently selected step."""
440 if not self.selected_step:
441 return None
443 # selected_step contains the step name/id
444 for i, step in enumerate(self.pipeline_steps):
445 # Now step is a FunctionStep object, not a dict
446 step_name = getattr(step, 'name', f"Step {i+1}")
447 if step_name == self.selected_step:
448 return i
449 return None
451 async def action_edit_step(self) -> None:
452 """Handle Edit Step button with proper selection and data preservation."""
454 if not self.pipeline_steps:
455 self.app.current_status = "No steps to edit"
456 return
458 # Find selected step index
459 step_index = self._find_step_index_by_selection()
460 if step_index is None:
461 self.app.current_status = "No step selected for editing"
462 return
464 step_to_edit = self.pipeline_steps[step_index]
466 def handle_result(result: Optional[FunctionStep]) -> None:
467 if result: # User saved changes
468 # Store the actual FunctionStep object directly (preserves memory type decorators)
469 updated_steps = self.pipeline_steps.copy()
470 updated_steps[step_index] = result
471 self.pipeline_steps = updated_steps
472 self.app.current_status = f"Updated step: {result.name}"
473 else:
474 self.app.current_status = "Edit step cancelled"
476 # Use the actual FunctionStep object directly (no conversion needed)
477 edit_step = step_to_edit
479 # LAZY IMPORT to avoid circular import
480 from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
481 from textual.css.query import NoMatches
483 # Use window-based dual editor (follows ConfigWindow pattern)
484 try:
485 window = self.app.query_one(DualEditorWindow)
486 # Window exists, update it for editing existing step and open
487 window.editing_step = edit_step
488 window.is_new = False
489 window.on_save_callback = handle_result
490 window.original_step = window.pattern_manager.clone_pattern(window.editing_step)
491 window.open_state = True
492 except NoMatches:
493 # Expected case: window doesn't exist yet, create new one
494 window = DualEditorWindow(step_data=edit_step, is_new=False, on_save_callback=handle_result)
495 await self.app.mount(window)
496 window.open_state = True
498 async def action_load_pipeline(self) -> None:
499 """Handle Load Pipeline button - load pipeline from file."""
501 if not self.current_plate:
502 self.app.current_status = "No plate selected for loading pipeline"
503 return
505 # Launch enhanced file browser for .pipeline files
506 def handle_result(result):
507 from pathlib import Path # Import at the top of the function
509 # Handle different result types from file browser
510 paths_to_load = []
512 if isinstance(result, Path):
513 # Single Path object
514 paths_to_load = [result]
515 elif isinstance(result, list) and len(result) > 0:
516 # List of paths - support multiple pipeline files
517 for item in result:
518 if isinstance(item, Path):
519 paths_to_load.append(item)
520 else:
521 paths_to_load.append(Path(item))
522 elif isinstance(result, str):
523 # String path
524 paths_to_load = [Path(result)]
526 if paths_to_load:
527 logger.debug(f"Loading {len(paths_to_load)} pipeline files")
528 self._load_multiple_pipeline_files(paths_to_load)
529 else:
530 self.app.current_status = "Load pipeline cancelled"
532 # Create file browser window for .pipeline files
533 from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
534 from openhcs.textual_tui.services.file_browser_service import SelectionMode
535 from openhcs.constants.constants import Backend
536 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
538 await open_file_browser_window(
539 app=self.app,
540 file_manager=self.filemanager,
541 initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
542 backend=Backend.DISK,
543 title="Load Pipeline (.pipeline)",
544 mode=BrowserMode.LOAD,
545 selection_mode=SelectionMode.FILES_ONLY,
546 filter_extensions=['.pipeline'],
547 cache_key=PathCacheKey.PIPELINE_FILES,
548 on_result_callback=handle_result,
549 caller_id="pipeline_editor"
550 )
552 def _load_multiple_pipeline_files(self, file_paths: List[Path]) -> None:
553 """Load and concatenate steps from multiple pipeline files."""
554 all_steps = []
555 loaded_files = []
556 failed_files = []
558 for file_path in file_paths:
559 try:
560 steps = self._load_single_pipeline_file(file_path)
561 if steps:
562 all_steps.extend(steps)
563 loaded_files.append(file_path.name)
564 logger.info(f"✅ Loaded {len(steps)} steps from {file_path.name}")
565 else:
566 failed_files.append(file_path.name)
567 except Exception as e:
568 logger.error(f"❌ Failed to load {file_path.name}: {e}")
569 failed_files.append(file_path.name)
571 if all_steps:
572 # Replace current pipeline with concatenated steps
573 self.pipeline_steps = all_steps
575 # Apply to multiple orchestrators if they are selected
576 self._apply_pipeline_to_selected_orchestrators(all_steps)
578 # Create status message
579 if len(loaded_files) == 1:
580 status = f"Loaded {len(all_steps)} steps from {loaded_files[0]}"
581 else:
582 status = f"Loaded {len(all_steps)} steps from {len(loaded_files)} files: {', '.join(loaded_files)}"
584 if failed_files:
585 status += f" (Failed: {', '.join(failed_files)})"
587 self.app.current_status = status
588 logger.info(f"🎯 Total pipeline: {len(all_steps)} steps from {len(loaded_files)} files")
589 else:
590 self.app.current_status = f"No valid pipeline steps loaded from {len(file_paths)} files"
592 def _load_single_pipeline_file(self, file_path: Path) -> List:
593 """Load pipeline steps from a single .pipeline file."""
594 import dill as pickle
595 try:
596 with open(file_path, 'rb') as f:
597 pattern = pickle.load(f)
599 if isinstance(pattern, list):
600 return pattern
601 else:
602 logger.error(f"Invalid pipeline format in {file_path.name}: expected list, got {type(pattern)}")
603 return []
604 except Exception as e:
605 logger.error(f"Failed to load pipeline from {file_path.name}: {e}")
606 raise
608 def _apply_pipeline_to_selected_orchestrators(self, pipeline_steps: List) -> None:
609 """Apply loaded pipeline to all selected orchestrators."""
610 if not self.plate_manager:
611 return
613 # Get selected orchestrators from plate manager
614 selected_items, selection_mode = self.plate_manager.get_selection_state()
616 if selection_mode == "empty" or len(selected_items) <= 1:
617 # Single or no selection - normal behavior
618 return
620 # Multiple orchestrators selected - apply pipeline to all
621 applied_count = 0
622 for item in selected_items:
623 plate_path = item['path']
624 if plate_path in self.plate_manager.orchestrators:
625 orchestrator = self.plate_manager.orchestrators[plate_path]
626 orchestrator.pipeline_definition = list(pipeline_steps)
628 # Also save to our plate pipelines storage
629 self.save_pipeline_for_plate(plate_path, list(pipeline_steps))
630 applied_count += 1
632 if applied_count > 1:
633 self.app.current_status += f" → Applied to {applied_count} orchestrators"
634 logger.info(f"Applied pipeline to {applied_count} selected orchestrators")
636 def _load_pipeline_from_file(self, file_path: Path) -> None:
637 """Load pipeline from .pipeline file (legacy single-file method)."""
638 try:
639 steps = self._load_single_pipeline_file(file_path)
640 if steps:
641 self.pipeline_steps = steps
642 self.app.current_status = f"Loaded {len(steps)} steps from {file_path.name}"
643 else:
644 self.app.current_status = f"Invalid pipeline format in {file_path.name}"
645 except Exception as e:
646 self.app.current_status = f"Failed to load pipeline: {e}"
648 async def action_save_pipeline(self) -> None:
649 """Handle Save Pipeline button - save pipeline to file."""
651 if not self.current_plate:
652 self.app.current_status = "No plate selected for saving pipeline"
653 return
655 if not self.pipeline_steps:
656 self.app.current_status = "No pipeline steps to save"
657 return
659 # Launch enhanced file browser for saving pipeline
660 def handle_result(result):
661 if result and isinstance(result, Path):
662 self._save_pipeline_to_file(result)
663 else:
664 self.app.current_status = "Save pipeline cancelled"
666 # Create file browser window for saving .pipeline files
667 from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
668 from openhcs.textual_tui.services.file_browser_service import SelectionMode
669 from openhcs.constants.constants import Backend
670 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
672 # Generate default filename from plate name
673 plate_name = Path(self.current_plate).name if self.current_plate else "pipeline"
674 default_filename = f"{plate_name}.pipeline"
676 await open_file_browser_window(
677 app=self.app,
678 file_manager=self.filemanager,
679 initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
680 backend=Backend.DISK,
681 title="Save Pipeline (.pipeline)",
682 mode=BrowserMode.SAVE,
683 selection_mode=SelectionMode.FILES_ONLY,
684 filter_extensions=['.pipeline'],
685 default_filename=default_filename,
686 cache_key=PathCacheKey.PIPELINE_FILES,
687 on_result_callback=handle_result,
688 caller_id="pipeline_editor"
689 )
691 def _save_pipeline_to_file(self, file_path: Path) -> None:
692 """Save pipeline to .pipeline file."""
693 import dill as pickle
694 try:
695 with open(file_path, 'wb') as f:
696 pickle.dump(list(self.pipeline_steps), f)
697 self.app.current_status = f"Saved pipeline to {file_path.name}"
698 except Exception as e:
699 logger.error(f"Failed to save pipeline: {e}")
700 self.app.current_status = f"Failed to save pipeline: {e}"
702 async def action_code_pipeline(self) -> None:
703 """Edit pipeline as Python code in terminal window."""
704 logger.debug("Code button pressed - opening pipeline editor")
706 if not self.current_plate:
707 self.app.current_status = "No plate selected"
708 return
710 try:
711 # Use complete pipeline steps code generation
712 from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code
713 from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher
715 # Generate complete pipeline steps code with imports
716 python_code = generate_complete_pipeline_steps_code(
717 pipeline_steps=list(self.pipeline_steps),
718 clean_mode=True
719 )
721 # Create callback to handle edited code
722 def handle_edited_code(edited_code: str):
723 logger.debug("Pipeline code edited, processing changes...")
724 try:
725 # Execute the code (it has all necessary imports)
726 namespace = {}
727 exec(edited_code, namespace)
729 # Get the pipeline_steps from the namespace
730 if 'pipeline_steps' in namespace:
731 new_pipeline_steps = namespace['pipeline_steps']
732 # Update the pipeline with new steps
733 self.pipeline_steps = new_pipeline_steps
734 self.app.current_status = f"Pipeline updated with {len(new_pipeline_steps)} steps"
735 else:
736 self.app.show_error("Parse Error", "No 'pipeline_steps = [...]' assignment found in edited code")
738 except SyntaxError as e:
739 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
740 except Exception as e:
741 logger.error(f"Failed to parse edited pipeline code: {e}")
742 self.app.show_error("Edit Error", f"Failed to parse pipeline code: {str(e)}")
744 # Launch terminal editor
745 launcher = TerminalLauncher(self.app)
746 await launcher.launch_editor_for_file(
747 file_content=python_code,
748 file_extension='.py',
749 on_save_callback=handle_edited_code
750 )
752 except Exception as e:
753 logger.error(f"Failed to open pipeline code editor: {e}")
754 self.app.current_status = f"Failed to open code editor: {e}"