Coverage for openhcs/textual_tui/widgets/pipeline_editor.py: 0.0%
415 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2PipelineEditorWidget for OpenHCS Textual TUI
4Pipeline editing widget with complete button set and reactive state management.
5Matches the functionality from the current prompt-toolkit TUI.
6"""
8import logging
9from typing import Dict, List, Optional, Any, Callable, Tuple
10from pathlib import Path
12from textual.app import ComposeResult
13from textual.containers import Horizontal, ScrollableContainer, Vertical
14from textual.reactive import reactive
15from textual.widgets import Button, Static, SelectionList
16from textual.widget import Widget
17from .button_list_widget import ButtonListWidget, ButtonConfig
18from textual import work
20from openhcs.core.config import GlobalPipelineConfig
21from openhcs.io.filemanager import FileManager
22from openhcs.core.steps.function_step import FunctionStep
23from openhcs.constants.constants import OrchestratorState
25logger = logging.getLogger(__name__)
28class PipelineEditorWidget(ButtonListWidget):
29 """
30 Pipeline editing widget using Textual reactive state.
32 Features:
33 - Complete button set: Add, Del, Edit, Load, Save
34 - Reactive state management for automatic UI updates
35 - Scrollable content area
36 - Integration with plate selection from PlateManager
37 """
39 # Textual reactive state
40 pipeline_steps = reactive([])
41 current_plate = reactive("")
42 selected_step = reactive("")
43 plate_pipelines = reactive({}) # {plate_path: List[FunctionStep]} - per-plate pipeline storage
def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
    """
    Set up the button row, the list callbacks and the editor's own state.

    Args:
        filemanager: FileManager instance for file operations
        global_config: Global configuration (accepted but deliberately not stored)
    """
    # (label, button id) for every button; all of them start out disabled.
    button_specs = (
        ("Add", "add_step"),
        ("Del", "del_step"),
        ("Edit", "edit_step"),
        ("Load", "load_pipeline"),
        ("Save", "save_pipeline"),
        ("Code", "code_pipeline"),
    )
    button_configs = [
        ButtonConfig(label, button_id, disabled=True)
        for label, button_id in button_specs
    ]

    super().__init__(
        button_configs=button_configs,
        list_id="pipeline_content",
        container_id="pipeline_list",
        on_button_pressed=self._handle_button_press,
        on_selection_changed=self._handle_selection_change,
        on_item_moved=self._handle_item_moved,
    )

    # global_config is intentionally NOT kept: a stored copy can become stale,
    # so read self.app.global_config whenever the current config is needed.
    self.filemanager = filemanager

    # Reference to plate manager (set by MainContent)
    self.plate_manager = None

    logger.debug("PipelineEditorWidget initialized")
def format_item_for_display(self, step: FunctionStep) -> Tuple[str, str]:
    """Return the ``(display_text, value)`` pair used to show *step* in the list.

    The value is the step's ``name`` attribute ('Unknown Step' when absent);
    the display text is that name prefixed with a clipboard icon.
    """
    name = getattr(step, 'name', 'Unknown Step')
    return f"📋 {name}", name
def _is_current_plate_initialized(self) -> bool:
    """Report whether the current plate's orchestrator is in an initialized state.

    Returns False when no plate is selected, no plate manager is attached, or
    the plate manager holds no orchestrator for the current plate.
    """
    if not (self.current_plate and self.plate_manager):
        logger.debug(f"PipelineEditor: No current plate ({self.current_plate}) or plate_manager ({self.plate_manager})")
        return False

    orchestrator = self.plate_manager.orchestrators.get(self.current_plate)
    if orchestrator is None:
        logger.debug(f"PipelineEditor: No orchestrator found for plate {self.current_plate}")
        return False

    # Every state listed here counts as "initialized" for editing purposes.
    initialized_states = [
        OrchestratorState.READY,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
    ]
    is_initialized = orchestrator.state in initialized_states
    logger.debug(f"PipelineEditor: Plate {self.current_plate} orchestrator state: {orchestrator.state}, initialized: {is_initialized}")
    return is_initialized
104 async def _handle_button_press(self, button_id: str) -> None:
105 """Handle button presses from ButtonListWidget (supports async actions)."""
106 import inspect
108 if button_id == "add_step":
109 await self.action_add_step()
110 elif button_id == "del_step":
111 self.action_delete_step()
112 elif button_id == "edit_step":
113 await self.action_edit_step()
114 elif button_id == "load_pipeline":
115 await self.action_load_pipeline()
116 elif button_id == "save_pipeline":
117 await self.action_save_pipeline()
118 elif button_id == "code_pipeline":
119 await self.action_code_pipeline()
121 def _handle_selection_change(self, selected_values: List[str]) -> None:
122 """Handle selection changes from ButtonListWidget."""
123 # Update selected_step - use first selected item if any
124 if selected_values:
125 self.selected_step = selected_values[0] # This is the step name
126 else:
127 self.selected_step = ""
129 def _handle_item_moved(self, from_index: int, to_index: int) -> None:
130 """Handle item movement from ButtonListWidget."""
131 current_steps = list(self.pipeline_steps)
133 # Move the step
134 step = current_steps.pop(from_index)
135 current_steps.insert(to_index, step)
137 # Update pipeline steps
138 self.pipeline_steps = current_steps
140 step_name = getattr(step, 'name', 'Unknown Step')
141 direction = "up" if to_index < from_index else "down"
142 self.app.current_status = f"Moved step '{step_name}' {direction}"
def on_selection_list_selected_changed(self, event: SelectionList.SelectedChanged) -> None:
    """React to a SelectionList selection change.

    Stores the first selected value (step name/id) in ``selected_step``, or
    the empty string when nothing is selected, then refreshes the buttons for
    the full selection.
    """
    chosen = event.selection_list.selected
    self.selected_step = chosen[0] if chosen else ""

    # Update button states based on selection
    self._update_button_states_for_selection(chosen)
157 def _update_button_states_for_selection(self, selected_values: List[str]) -> None:
158 """Update button states based on current selection and mathematical constraints."""
159 try:
160 has_plate = bool(self.current_plate)
161 is_initialized = self._is_current_plate_initialized()
162 has_steps = len(self.pipeline_steps) > 0
163 has_selection = len(selected_values) > 0
165 # Mathematical constraints:
166 # - Pipeline editing requires initialization
167 # - Edit requires exactly one selection
168 self.query_one("#add_step").disabled = not (has_plate and is_initialized)
169 self.query_one("#del_step").disabled = not has_selection
170 self.query_one("#edit_step").disabled = not (len(selected_values) == 1) # Edit requires exactly one selection
171 self.query_one("#load_pipeline").disabled = not (has_plate and is_initialized)
172 self.query_one("#save_pipeline").disabled = not has_steps
174 except Exception:
175 # Buttons might not be mounted yet
176 pass
def get_selection_state(self) -> tuple[List[FunctionStep], str]:
    """Return the selected steps together with a selection-mode label.

    Returns:
        ``(selected_steps, mode)`` where mode is "empty" when nothing is
        selected, "all" when every step is selected and "checkbox" otherwise.
        ``([], "empty")`` is also returned when the lookup raises, e.g.
        because the widget is not mounted.
    """
    try:
        chosen_values = self.query_one("#pipeline_content", SelectionList).selected

        # Map the selected values (step names) back to step objects,
        # keeping pipeline order.
        chosen_steps = [
            candidate
            for candidate in self.pipeline_steps
            if getattr(candidate, 'name', '') in chosen_values
        ]

        if not chosen_steps:
            mode = "empty"
        elif len(chosen_steps) == len(self.pipeline_steps):
            mode = "all"
        else:
            # SelectionList is always checkbox-based
            mode = "checkbox"
        return chosen_steps, mode
    except Exception:
        # Fallback if widget not mounted
        return [], "empty"
def watch_current_plate(self, plate_path: str) -> None:
    """Swap in the stored pipeline when ``current_plate`` changes.

    The steps are loaded through ``_set_pipeline_steps_without_save`` so that
    switching plates does not trigger a save or compilation invalidation.
    """
    logger.debug(f"Current plate changed: {plate_path}")

    # A selected plate shows its stored pipeline (empty if none exists);
    # no plate at all shows an empty pipeline.
    stored_steps = self.plate_pipelines.get(plate_path, []) if plate_path else []
    self._set_pipeline_steps_without_save(stored_steps)

    # Clear selection when plate changes
    self.selected_step = ""

    self._update_button_states()
224 def _set_pipeline_steps_without_save(self, steps: List[FunctionStep]) -> None:
225 """Set pipeline steps without triggering save/invalidation (for loading existing data)."""
226 # Temporarily disable the reactive watcher to prevent save cascade
227 self._loading_existing_pipeline = True
228 self.pipeline_steps = steps
229 # Sync with ButtonListWidget's items property
230 self.items = list(steps)
231 self._loading_existing_pipeline = False
def watch_pipeline_steps(self, steps: List[FunctionStep]) -> None:
    """Mirror ``pipeline_steps`` into ``items`` and persist real changes.

    Nothing is saved while ``_loading_existing_pipeline`` is set, i.e. when
    the steps were just loaded from existing storage.
    """
    # Keep ButtonListWidget's items property in step with the pipeline
    self.items = list(steps)

    logger.debug(f"Pipeline steps updated: {len(steps)} steps")

    if getattr(self, '_loading_existing_pipeline', False):
        return

    # A genuine change: write it to plate storage
    self._save_pipeline_to_plate_storage()
def _save_pipeline_to_plate_storage(self) -> None:
    """Store the current steps under the current plate and invalidate compilation.

    Does nothing when no plate is selected.
    """
    if not self.current_plate:
        # _invalidate_compilation_status is a no-op without a plate as well
        return

    # Rebind a new dict (with a copy of the step list) instead of mutating
    # the stored one in place.
    self.plate_pipelines = {
        **self.plate_pipelines,
        self.current_plate: list(self.pipeline_steps),
    }
    logger.debug(f"Saved {len(self.pipeline_steps)} steps for plate: {self.current_plate}")

    # Invalidate compilation status when pipeline changes
    self._invalidate_compilation_status()
def get_pipeline_for_plate(self, plate_path: str) -> List[FunctionStep]:
    """Return the steps stored for *plate_path*, or an empty list if none are stored."""
    try:
        return self.plate_pipelines[plate_path]
    except KeyError:
        return []
def save_pipeline_for_plate(self, plate_path: str, pipeline: List[FunctionStep]) -> None:
    """Store *pipeline* under *plate_path*, replacing any existing entry.

    A new dict is assigned to ``plate_pipelines`` rather than mutating the
    existing one.
    """
    self.plate_pipelines = {**self.plate_pipelines, plate_path: pipeline}
def clear_pipeline_for_plate(self, plate_path: str) -> None:
    """Remove the stored pipeline for *plate_path*; unknown plates are ignored."""
    if plate_path not in self.plate_pipelines:
        return

    # Assign a new dict without the entry rather than deleting in place.
    self.plate_pipelines = {
        stored_path: stored_steps
        for stored_path, stored_steps in self.plate_pipelines.items()
        if stored_path != plate_path
    }
def _invalidate_compilation_status(self) -> None:
    """Drop compiled data for the current plate after its pipeline changed.

    Does nothing without a plate manager or a current plate. Otherwise the
    plate's compiled data is removed, a COMPILED orchestrator is put back to
    READY, and the plate manager is asked to refresh its UI and buttons.
    """
    manager = self.plate_manager
    plate = self.current_plate
    if not manager or not plate:
        return

    # Clear compiled data from simple state
    if plate in manager.plate_compiled_data:
        del manager.plate_compiled_data[plate]

    # Reset orchestrator state to READY (initialized)
    orchestrator = manager.orchestrators.get(plate)
    if orchestrator and orchestrator.state == OrchestratorState.COMPILED:
        # Writes the private attribute directly, as the original code does.
        orchestrator._state = OrchestratorState.READY

    # Trigger UI refresh after orchestrator state change
    manager._trigger_ui_refresh()
    manager._update_button_states()
def watch_selected_step(self, step_id: str) -> None:
    """Refresh the button states whenever ``selected_step`` changes."""
    self._update_button_states()
    logger.debug(f"Selected step: {step_id}")
def _update_button_states(self) -> None:
    """Recompute which buttons are enabled from the widget's current state.

    Rules applied:
    - Add and Load require a selected, initialized plate
    - Del, Save and Code require the pipeline to contain steps
    - Edit requires steps plus a selection that maps to an actual step
    """
    try:
        has_plate = bool(self.current_plate)
        is_initialized = self._is_current_plate_initialized()
        has_steps = len(self.pipeline_steps) > 0
        has_valid_selection = bool(self.selected_step) and self._find_step_index_by_selection() is not None

        logger.debug(f"PipelineEditor: Button state update - has_plate: {has_plate}, is_initialized: {is_initialized}, has_steps: {has_steps}")

        add_enabled = has_plate and is_initialized
        load_enabled = has_plate and is_initialized

        logger.debug(f"PipelineEditor: Setting add_step.disabled = {not add_enabled}, load_pipeline.disabled = {not load_enabled}")

        # Selector -> value for the button's ``disabled`` flag, applied in order.
        disabled_by_selector = {
            "#add_step": not add_enabled,
            "#del_step": not has_steps,
            "#edit_step": not (has_steps and has_valid_selection),
            "#load_pipeline": not load_enabled,
            "#save_pipeline": not has_steps,
            "#code_pipeline": not has_steps,
        }
        for selector, is_disabled in disabled_by_selector.items():
            self.query_one(selector).disabled = is_disabled
    except Exception:
        # Buttons might not be mounted yet
        pass
async def action_add_step(self) -> None:
    """Open the dual editor window to create a new step.

    If the user saves, the new step is appended to the pipeline; otherwise
    only the status message changes.
    """
    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Add step cancelled"
            return
        # Keep the actual FunctionStep object (preserves memory type decorators)
        self.pipeline_steps = self.pipeline_steps + [result]
        self.app.current_status = f"Added step: {result.name}"

    # Use window-based dual editor (follows ConfigWindow pattern)
    try:
        editor = self.app.query_one(DualEditorWindow)
        # A window already exists: repurpose it for a brand-new step
        editor.editing_step = editor.pattern_manager.create_new_step()
        editor.is_new = True
        editor.on_save_callback = handle_result
        editor.original_step = editor.pattern_manager.clone_pattern(editor.editing_step)
        editor.open_state = True
    except NoMatches:
        # Expected case: window doesn't exist yet, create new one
        editor = DualEditorWindow(step_data=None, is_new=True, on_save_callback=handle_result)
        await self.app.mount(editor)
        editor.open_state = True
def action_delete_step(self) -> None:
    """Delete the currently selected steps from the pipeline.

    Steps are matched by name, so every step whose name equals the name of a
    selected step is removed. When nothing is selected the pipeline is left
    untouched and only the status message changes.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        # Reached when nothing is selected, whether or not steps exist.
        self.app.current_status = "No steps selected for deletion"
        return

    names_to_remove = {getattr(item, 'name', '') for item in selected_items}

    current_steps = list(self.pipeline_steps)
    remaining_steps = [
        step for step in current_steps
        if getattr(step, 'name', '') not in names_to_remove
    ]

    # Rebinding pipeline_steps triggers the save to plate storage
    self.pipeline_steps = remaining_steps

    deleted_count = len(current_steps) - len(remaining_steps)
    self.app.current_status = f"Deleted {deleted_count} steps"
def _dict_to_function_step(self, step_dict: Dict) -> FunctionStep:
    """Build a FunctionStep from a step dict.

    Reads the "func", "name", "group_by" and "variable_components" keys. A
    missing function is replaced by the registry's default function (with a
    warning). ``variable_components`` is passed on only when non-empty, so
    FunctionStep can otherwise apply its own default.
    """
    func = step_dict.get("func")
    if func is None:
        # Fallback to default function if missing
        from openhcs.textual_tui.services.function_registry_service import FunctionRegistryService
        func = FunctionRegistryService().find_default_function()
        logger.warning(f"Step '{step_dict.get('name', 'Unknown')}' missing function, using default")

    # Normalise variable components: a bare string becomes a one-item list,
    # anything that is neither a string nor a list is discarded.
    raw_components = step_dict.get("variable_components", [])
    if isinstance(raw_components, str):
        components = [raw_components]
    elif isinstance(raw_components, list):
        components = raw_components
    else:
        components = []

    step_kwargs = {
        "func": func,
        "name": step_dict.get("name", "Unknown Step"),
        "group_by": step_dict.get("group_by", ""),
    }
    if components:
        step_kwargs["variable_components"] = components

    return FunctionStep(**step_kwargs)
def _function_step_to_dict(self, step: FunctionStep) -> Dict:
    """Return a dict view of *step*.

    Copies ``name``, ``func``, ``variable_components`` and ``group_by`` from
    the step and tags the result with type "function".
    """
    as_dict = {"name": step.name, "type": "function"}
    for field in ("func", "variable_components", "group_by"):
        as_dict[field] = getattr(step, field)
    return as_dict
436 def _find_step_index_by_selection(self) -> Optional[int]:
437 """Find the index of the currently selected step."""
438 if not self.selected_step:
439 return None
441 # selected_step contains the step name/id
442 for i, step in enumerate(self.pipeline_steps):
443 # Now step is a FunctionStep object, not a dict
444 step_name = getattr(step, 'name', f"Step {i+1}")
445 if step_name == self.selected_step:
446 return i
447 return None
async def action_edit_step(self) -> None:
    """Open the dual editor window for the currently selected step.

    If the user saves, the edited step replaces the one at the index that was
    selected when the editor was opened.
    """
    if not self.pipeline_steps:
        self.app.current_status = "No steps to edit"
        return

    step_index = self._find_step_index_by_selection()
    if step_index is None:
        self.app.current_status = "No step selected for editing"
        return

    # The actual FunctionStep object goes to the editor (no conversion needed)
    edit_step = self.pipeline_steps[step_index]

    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Edit step cancelled"
            return
        # Keep the actual FunctionStep object (preserves memory type decorators)
        updated_steps = self.pipeline_steps.copy()
        updated_steps[step_index] = result
        self.pipeline_steps = updated_steps
        self.app.current_status = f"Updated step: {result.name}"

    # Use window-based dual editor (follows ConfigWindow pattern)
    try:
        editor = self.app.query_one(DualEditorWindow)
        # A window already exists: point it at the step being edited
        editor.editing_step = edit_step
        editor.is_new = False
        editor.on_save_callback = handle_result
        editor.original_step = editor.pattern_manager.clone_pattern(editor.editing_step)
        editor.open_state = True
    except NoMatches:
        # Expected case: window doesn't exist yet, create new one
        editor = DualEditorWindow(step_data=edit_step, is_new=False, on_save_callback=handle_result)
        await self.app.mount(editor)
        editor.open_state = True
async def action_load_pipeline(self) -> None:
    """Open a file browser and load the chosen .pipeline file(s).

    Requires a current plate. The browser result may be a Path, a string, or
    a non-empty list of either; anything else is treated as a cancel.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for loading pipeline"
        return

    def handle_result(result):
        from pathlib import Path

        # Normalise whatever the browser returned into a list of Paths
        if isinstance(result, Path):
            paths_to_load = [result]
        elif isinstance(result, list) and len(result) > 0:
            # Multiple pipeline files are supported
            paths_to_load = [
                entry if isinstance(entry, Path) else Path(entry)
                for entry in result
            ]
        elif isinstance(result, str):
            paths_to_load = [Path(result)]
        else:
            paths_to_load = []

        if not paths_to_load:
            self.app.current_status = "Load pipeline cancelled"
            return

        logger.debug(f"Loading {len(paths_to_load)} pipeline files")
        self._load_multiple_pipeline_files(paths_to_load)

    # Imported here, as in the rest of this module's actions
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    browser_options = dict(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
        backend=Backend.DISK,
        title="Load Pipeline (.pipeline)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor",
    )
    await open_file_browser_window(**browser_options)
def _load_multiple_pipeline_files(self, file_paths: List[Path]) -> None:
    """Load several pipeline files and replace the pipeline with their steps.

    Steps are concatenated in the order of *file_paths*. Files that fail to
    load, or that yield no steps, are reported in the status message. If no
    steps were loaded at all, the current pipeline is left untouched.

    Args:
        file_paths: Pipeline files to load.
    """
    all_steps = []
    loaded_files = []
    failed_files = []

    for file_path in file_paths:
        try:
            steps = self._load_single_pipeline_file(file_path)
        except Exception as e:
            logger.error(f"❌ Failed to load {file_path.name}: {e}")
            failed_files.append(file_path.name)
            continue

        if steps:
            all_steps.extend(steps)
            loaded_files.append(file_path.name)
            logger.info(f"✅ Loaded {len(steps)} steps from {file_path.name}")
        else:
            failed_files.append(file_path.name)

    if not all_steps:
        self.app.current_status = f"No valid pipeline steps loaded from {len(file_paths)} files"
        return

    # Replace current pipeline with concatenated steps
    self.pipeline_steps = all_steps

    if len(loaded_files) == 1:
        status = f"Loaded {len(all_steps)} steps from {loaded_files[0]}"
    else:
        status = f"Loaded {len(all_steps)} steps from {len(loaded_files)} files: {', '.join(loaded_files)}"
    if failed_files:
        status += f" (Failed: {', '.join(failed_files)})"

    # Set the status BEFORE applying to the selected orchestrators: that call
    # appends its " → Applied to N orchestrators" note to current_status, and
    # assigning the status afterwards would overwrite the note.
    self.app.current_status = status
    self._apply_pipeline_to_selected_orchestrators(all_steps)

    logger.info(f"🎯 Total pipeline: {len(all_steps)} steps from {len(loaded_files)} files")
def _load_single_pipeline_file(self, file_path: Path) -> List:
    """Unpickle one .pipeline file and return its list of steps.

    Returns an empty list (after logging an error) when the file does not
    contain a list. Any exception raised while opening or unpickling is
    logged and re-raised.

    NOTE(security): unpickling can execute arbitrary code, so only files
    from trusted sources should be loaded.
    """
    import dill as pickle

    try:
        with open(file_path, 'rb') as handle:
            loaded = pickle.load(handle)

        if not isinstance(loaded, list):
            logger.error(f"Invalid pipeline format in {file_path.name}: expected list, got {type(loaded)}")
            return []
        return loaded
    except Exception as e:
        logger.error(f"Failed to load pipeline from {file_path.name}: {e}")
        raise
def _apply_pipeline_to_selected_orchestrators(self, pipeline_steps: List) -> None:
    """Copy a loaded pipeline onto every orchestrator selected in the plate manager.

    Only acts when more than one plate is selected. Each selected plate that
    has an orchestrator receives its own copy of the steps, both as the
    orchestrator's ``pipeline_definition`` and in this widget's plate storage.
    """
    manager = self.plate_manager
    if not manager:
        return

    selected_items, selection_mode = manager.get_selection_state()

    # Single or no selection - normal behavior, nothing extra to do
    if selection_mode == "empty" or len(selected_items) <= 1:
        return

    applied_count = 0
    for item in selected_items:
        plate_path = item['path']
        if plate_path not in manager.orchestrators:
            continue

        manager.orchestrators[plate_path].pipeline_definition = list(pipeline_steps)
        # Also save to our plate pipelines storage
        self.save_pipeline_for_plate(plate_path, list(pipeline_steps))
        applied_count += 1

    if applied_count > 1:
        self.app.current_status += f" → Applied to {applied_count} orchestrators"
        logger.info(f"Applied pipeline to {applied_count} selected orchestrators")
def _load_pipeline_from_file(self, file_path: Path) -> None:
    """Load a single .pipeline file into the editor (legacy single-file method).

    The outcome, whether success, invalid format or failure, is reported
    through the app's status message; no exception escapes.
    """
    try:
        loaded_steps = self._load_single_pipeline_file(file_path)
        if not loaded_steps:
            self.app.current_status = f"Invalid pipeline format in {file_path.name}"
        else:
            self.pipeline_steps = loaded_steps
            self.app.current_status = f"Loaded {len(loaded_steps)} steps from {file_path.name}"
    except Exception as e:
        self.app.current_status = f"Failed to load pipeline: {e}"
async def action_save_pipeline(self) -> None:
    """Open a file browser and save the pipeline to the chosen .pipeline file.

    Requires a current plate and at least one step. The suggested filename
    is derived from the plate path's final component.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for saving pipeline"
        return

    if not self.pipeline_steps:
        self.app.current_status = "No pipeline steps to save"
        return

    def handle_result(result):
        # Only a Path result counts as a chosen destination
        if not (result and isinstance(result, Path)):
            self.app.current_status = "Save pipeline cancelled"
            return
        self._save_pipeline_to_file(result)

    # Imported here, as in the rest of this module's actions
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    # Generate default filename from plate name
    if self.current_plate:
        plate_name = Path(self.current_plate).name
    else:
        plate_name = "pipeline"
    default_filename = f"{plate_name}.pipeline"

    browser_options = dict(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
        backend=Backend.DISK,
        title="Save Pipeline (.pipeline)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        default_filename=default_filename,
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor",
    )
    await open_file_browser_window(**browser_options)
def _save_pipeline_to_file(self, file_path: Path) -> None:
    """Pickle the current steps into *file_path* and report the outcome.

    Failures are logged and shown in the status message rather than raised.
    """
    import dill as pickle

    steps_snapshot = list(self.pipeline_steps)
    try:
        with open(file_path, 'wb') as handle:
            pickle.dump(steps_snapshot, handle)
        self.app.current_status = f"Saved pipeline to {file_path.name}"
    except Exception as e:
        logger.error(f"Failed to save pipeline: {e}")
        self.app.current_status = f"Failed to save pipeline: {e}"
async def action_code_pipeline(self) -> None:
    """Edit the pipeline as Python code in a terminal editor.

    Generates Python source for the current steps, opens it in an editor and,
    on save, executes the edited source and takes the ``pipeline_steps``
    variable it defines as the new pipeline. Requires a current plate and at
    least one step. Problems are reported through the app's status message or
    error dialog instead of being raised.
    """
    logger.debug("Code button pressed - opening pipeline editor")

    if not self.pipeline_steps:
        self.app.current_status = "No pipeline steps to edit"
        return

    if not self.current_plate:
        self.app.current_status = "No plate selected"
        return

    try:
        # Use complete pipeline steps code generation
        from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Generate complete pipeline steps code with imports
        python_code = generate_complete_pipeline_steps_code(
            pipeline_steps=list(self.pipeline_steps),
            clean_mode=False
        )

        def handle_edited_code(edited_code: str):
            """Execute the edited source and adopt its ``pipeline_steps``."""
            logger.debug("Pipeline code edited, processing changes...")
            try:
                # NOTE(security): this executes whatever the user saved in the
                # editor with full interpreter privileges. It is acceptable
                # only because the text comes from the local user's editor.
                namespace = {}
                exec(edited_code, namespace)

                if 'pipeline_steps' not in namespace:
                    self.app.show_error("Parse Error", "No 'pipeline_steps = [...]' assignment found in edited code")
                    return

                new_pipeline_steps = namespace['pipeline_steps']

                # Validate BEFORE assigning: a non-sequence value would be
                # stored in the reactive attribute and only fail afterwards
                # in the watcher, leaving the widget in a broken state.
                if not isinstance(new_pipeline_steps, (list, tuple)):
                    self.app.show_error(
                        "Parse Error",
                        f"'pipeline_steps' must be a list of steps, got {type(new_pipeline_steps).__name__}"
                    )
                    return

                new_pipeline_steps = list(new_pipeline_steps)
                self.pipeline_steps = new_pipeline_steps
                self.app.current_status = f"Pipeline updated with {len(new_pipeline_steps)} steps"

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                logger.error(f"Failed to parse edited pipeline code: {e}")
                self.app.show_error("Edit Error", f"Failed to parse pipeline code: {str(e)}")

        # Launch terminal editor
        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code
        )

    except Exception as e:
        logger.error(f"Failed to open pipeline code editor: {e}")
        self.app.current_status = f"Failed to open code editor: {e}"