Coverage for openhcs/textual_tui/widgets/pipeline_editor.py: 0.0%
430 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2PipelineEditorWidget for OpenHCS Textual TUI
4Pipeline editing widget with complete button set and reactive state management.
5Matches the functionality from the current prompt-toolkit TUI.
6"""
8import logging
9from typing import Dict, List, Optional, Tuple
10from pathlib import Path
12from textual.reactive import reactive
13from textual.widgets import SelectionList
14from .button_list_widget import ButtonListWidget, ButtonConfig
16from openhcs.core.config import GlobalPipelineConfig
17from openhcs.io.filemanager import FileManager
18from openhcs.core.steps.function_step import FunctionStep
19from openhcs.constants.constants import OrchestratorState
21logger = logging.getLogger(__name__)
24class PipelineEditorWidget(ButtonListWidget):
25 """
26 Pipeline editing widget using Textual reactive state.
28 Features:
29 - Complete button set: Add, Del, Edit, Auto, Code
30 - Reactive state management for automatic UI updates
31 - Scrollable content area
32 - Integration with plate selection from PlateManager
33 """
35 # Textual reactive state
36 pipeline_steps = reactive([])
37 current_plate = reactive("")
38 selected_step = reactive("")
39 plate_pipelines = reactive({}) # {plate_path: List[FunctionStep]} - per-plate pipeline storage
def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
    """
    Build the pipeline editor and wire its callbacks into ButtonListWidget.

    Args:
        filemanager: FileManager instance kept on the widget for file operations.
        global_config: Accepted but intentionally not stored, because a stored
            copy can become stale; read self.app.global_config instead.
    """
    # (label, button id) for each button; all of them start out disabled.
    button_specs = (
        ("Add", "add_step"),
        ("Del", "del_step"),
        ("Edit", "edit_step"),
        ("Auto", "auto_load_pipeline"),
        ("Code", "code_pipeline"),
    )
    button_configs = [
        ButtonConfig(label, button_id, disabled=True)
        for label, button_id in button_specs
    ]

    super().__init__(
        button_configs=button_configs,
        list_id="pipeline_content",
        container_id="pipeline_list",
        on_button_pressed=self._handle_button_press,
        on_selection_changed=self._handle_selection_change,
        on_item_moved=self._handle_item_moved
    )

    self.filemanager = filemanager

    # Reference to plate manager (set by MainContent)
    self.plate_manager = None

    logger.debug("PipelineEditorWidget initialized")
def format_item_for_display(self, step: FunctionStep) -> Tuple[str, str]:
    """Return the (display text, value) pair used to show a step in the list.

    The value is the step's ``name`` attribute ('Unknown Step' when the
    attribute is missing); the display text is that name behind a clipboard icon.
    """
    name = getattr(step, 'name', 'Unknown Step')
    return f"📋 {name}", name
def _is_current_plate_initialized(self) -> bool:
    """Report whether the current plate's orchestrator is in an initialized state.

    Returns False when there is no current plate, no plate manager, or no
    orchestrator registered for the plate. Otherwise returns True only if the
    orchestrator state is READY, COMPILED, COMPLETED, COMPILE_FAILED or
    EXEC_FAILED.
    """
    plate = self.current_plate
    manager = self.plate_manager
    if not (plate and manager):
        logger.debug(f"PipelineEditor: No current plate ({plate}) or plate_manager ({manager})")
        return False

    orchestrator = manager.orchestrators.get(plate)
    if orchestrator is None:
        logger.debug(f"PipelineEditor: No orchestrator found for plate {plate}")
        return False

    # Failed states still count: the orchestrator was initialized before failing.
    initialized_states = (
        OrchestratorState.READY,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
    )
    is_initialized = orchestrator.state in initialized_states
    logger.debug(f"PipelineEditor: Plate {plate} orchestrator state: {orchestrator.state}, initialized: {is_initialized}")
    return is_initialized
99 async def _handle_button_press(self, button_id: str) -> None:
100 """Handle button presses from ButtonListWidget (supports async actions)."""
102 if button_id == "add_step":
103 await self.action_add_step()
104 elif button_id == "del_step":
105 self.action_delete_step()
106 elif button_id == "edit_step":
107 await self.action_edit_step()
108 elif button_id == "auto_load_pipeline":
109 await self.action_auto_load_pipeline()
110 elif button_id == "code_pipeline":
111 await self.action_code_pipeline()
113 def _handle_selection_change(self, selected_values: List[str]) -> None:
114 """Handle selection changes from ButtonListWidget."""
115 # Update selected_step - use first selected item if any
116 if selected_values:
117 self.selected_step = selected_values[0] # This is the step name
118 else:
119 self.selected_step = ""
121 def _handle_item_moved(self, from_index: int, to_index: int) -> None:
122 """Handle item movement from ButtonListWidget."""
123 current_steps = list(self.pipeline_steps)
125 # Move the step
126 step = current_steps.pop(from_index)
127 current_steps.insert(to_index, step)
129 # Update pipeline steps
130 self.pipeline_steps = current_steps
132 step_name = getattr(step, 'name', 'Unknown Step')
133 direction = "up" if to_index < from_index else "down"
134 self.app.current_status = f"Moved step '{step_name}' {direction}"
def on_selection_list_selected_changed(self, event: SelectionList.SelectedChanged) -> None:
    """React to a SelectionList change: track the first selection, refresh buttons."""
    chosen = event.selection_list.selected

    # The first selected value (a step name/id) becomes selected_step.
    self.selected_step = chosen[0] if chosen else ""

    # Button availability depends on how many items are selected.
    self._update_button_states_for_selection(chosen)
149 def _update_button_states_for_selection(self, selected_values: List[str]) -> None:
150 """Update button states based on current selection and mathematical constraints."""
151 try:
152 has_plate = bool(self.current_plate)
153 is_initialized = self._is_current_plate_initialized()
154 has_steps = len(self.pipeline_steps) > 0
155 has_selection = len(selected_values) > 0
157 # Mathematical constraints:
158 # - Pipeline editing requires initialization
159 # - Edit requires exactly one selection
160 self.query_one("#add_step").disabled = not (has_plate and is_initialized)
161 self.query_one("#del_step").disabled = not has_selection
162 self.query_one("#edit_step").disabled = not (len(selected_values) == 1) # Edit requires exactly one selection
163 self.query_one("#auto_load_pipeline").disabled = not (has_plate and is_initialized)
164 self.query_one("#code_pipeline").disabled = not (has_plate and is_initialized) # Same as add button
166 except Exception:
167 # Buttons might not be mounted yet
168 pass
def get_selection_state(self) -> tuple[List[FunctionStep], str]:
    """Return the selected step objects together with a selection-mode label.

    Returns:
        A (steps, mode) tuple. ``steps`` are the pipeline steps whose name is
        among the SelectionList's selected values. ``mode`` is "empty" when no
        step matches, "all" when every pipeline step matches, and "checkbox"
        otherwise. Any exception yields ([], "empty").
    """
    try:
        chosen_names = self.query_one("#pipeline_content", SelectionList).selected

        # Map the selected names back onto the step objects.
        picked = [
            step for step in self.pipeline_steps
            if getattr(step, 'name', '') in chosen_names
        ]

        if not picked:
            mode = "empty"
        elif len(picked) == len(self.pipeline_steps):
            mode = "all"
        else:
            mode = "checkbox"  # SelectionList is always checkbox-based

        return picked, mode
    except Exception:
        # Fallback if widget not mounted
        return [], "empty"
def watch_current_plate(self, plate_path: str) -> None:
    """Swap in the stored pipeline for the new plate and reset the selection.

    The steps are loaded through _set_pipeline_steps_without_save so that
    switching plates does not trigger a save/invalidation.
    """
    logger.debug(f"Current plate changed: {plate_path}")

    # An empty plate path, or a plate with no stored pipeline, means no steps.
    steps_for_plate = self.plate_pipelines.get(plate_path, []) if plate_path else []
    self._set_pipeline_steps_without_save(steps_for_plate)

    # The previous selection belongs to the previous plate.
    self.selected_step = ""

    self._update_button_states()
216 def _set_pipeline_steps_without_save(self, steps: List[FunctionStep]) -> None:
217 """Set pipeline steps without triggering save/invalidation (for loading existing data)."""
218 # Temporarily disable the reactive watcher to prevent save cascade
219 self._loading_existing_pipeline = True
220 self.pipeline_steps = steps
221 # Sync with ButtonListWidget's items property
222 self.items = list(steps)
223 self._loading_existing_pipeline = False
def watch_pipeline_steps(self, steps: List[FunctionStep]) -> None:
    """Mirror new pipeline steps into ``items`` and save them unless loading."""
    # ButtonListWidget renders from its own items property.
    self.items = list(steps)

    logger.debug(f"Pipeline steps updated: {len(steps)} steps")

    # Steps loaded from existing storage must not be saved back again.
    is_loading = getattr(self, '_loading_existing_pipeline', False)
    if is_loading:
        return
    self._save_pipeline_to_plate_storage()
def _save_pipeline_to_plate_storage(self) -> None:
    """Store the current steps under the current plate and invalidate compilation.

    Does nothing when no plate is selected.
    """
    plate = self.current_plate
    if not plate:
        return

    # Assign a fresh dict rather than mutating the existing one in place.
    self.plate_pipelines = {**self.plate_pipelines, plate: list(self.pipeline_steps)}
    logger.debug(f"Saved {len(self.pipeline_steps)} steps for plate: {plate}")

    # A changed pipeline makes any previously compiled result stale.
    self._invalidate_compilation_status()
def get_pipeline_for_plate(self, plate_path: str) -> List[FunctionStep]:
    """Return the stored pipeline for ``plate_path``, or a new empty list."""
    try:
        return self.plate_pipelines[plate_path]
    except KeyError:
        return []
def save_pipeline_for_plate(self, plate_path: str, pipeline: List[FunctionStep]) -> None:
    """Store ``pipeline`` under ``plate_path`` by assigning an updated dict copy."""
    # A new dict is assigned instead of mutating the current one in place.
    self.plate_pipelines = {**self.plate_pipelines, plate_path: pipeline}
def clear_pipeline_for_plate(self, plate_path: str) -> None:
    """Remove the stored pipeline for ``plate_path`` if one exists."""
    if plate_path not in self.plate_pipelines:
        return
    # Assign a dict without the plate instead of deleting in place.
    self.plate_pipelines = {
        path: steps
        for path, steps in self.plate_pipelines.items()
        if path != plate_path
    }
def _invalidate_compilation_status(self) -> None:
    """Drop compiled data for the current plate and refresh the plate manager UI.

    Does nothing without a plate manager or a current plate. An orchestrator
    in the COMPILED state is put back to READY.
    """
    manager = self.plate_manager
    plate = self.current_plate
    if not manager or not plate:
        return

    # Forget any compiled data recorded for this plate.
    if plate in manager.plate_compiled_data:
        del manager.plate_compiled_data[plate]

    # Only a COMPILED orchestrator is reset; other states are left alone.
    # NOTE(review): this writes the orchestrator's private _state directly.
    orchestrator = manager.orchestrators.get(plate)
    if orchestrator and orchestrator.state == OrchestratorState.COMPILED:
        orchestrator._state = OrchestratorState.READY

    # Let the plate manager redraw after the possible state change.
    manager._trigger_ui_refresh()
    manager._update_button_states()
def watch_selected_step(self, step_id: str) -> None:
    """Refresh button states whenever the selected step changes."""
    # Button availability depends on the current selection.
    self._update_button_states()
    logger.debug(f"Selected step: {step_id}")
def _update_button_states(self) -> None:
    """Enable or disable the five buttons from the widget's current state.

    Rules applied:
    - Add, Auto and Code need a current plate whose orchestrator is initialized.
    - Del needs at least one step in the pipeline.
    - Edit needs steps plus a selected_step that maps to an actual step.

    Only the buttons defined in __init__ are touched (add_step, del_step,
    edit_step, auto_load_pipeline, code_pipeline). Any failure (for example a
    button that cannot be queried yet) is logged at debug level and otherwise
    ignored.
    """
    try:
        has_plate = bool(self.current_plate)
        is_initialized = self._is_current_plate_initialized()
        has_steps = len(self.pipeline_steps) > 0
        has_valid_selection = bool(self.selected_step) and self._find_step_index_by_selection() is not None

        logger.debug(f"PipelineEditor: Button state update - has_plate: {has_plate}, is_initialized: {is_initialized}, has_steps: {has_steps}")

        # Add, Auto and Code share one precondition: an initialized plate.
        can_edit_pipeline = has_plate and is_initialized

        logger.debug(f"PipelineEditor: Setting add_step.disabled = {not can_edit_pipeline}, auto_load_pipeline.disabled = {not can_edit_pipeline}")

        self.query_one("#add_step").disabled = not can_edit_pipeline
        self.query_one("#del_step").disabled = not has_steps
        self.query_one("#edit_step").disabled = not (has_steps and has_valid_selection)
        self.query_one("#auto_load_pipeline").disabled = not can_edit_pipeline
        self.query_one("#code_pipeline").disabled = not can_edit_pipeline
    except Exception:
        # Best-effort: keep a trace instead of swallowing the error silently.
        logger.debug("PipelineEditor: could not update button states", exc_info=True)
async def action_add_step(self) -> None:
    """Open the dual editor window to create a new step.

    An existing DualEditorWindow is reconfigured and reopened; otherwise a new
    one is created and mounted on the app. The step returned on save is
    appended to pipeline_steps.
    """
    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Add step cancelled"
            return
        # The FunctionStep object itself is stored (preserves memory type decorators).
        self.pipeline_steps = self.pipeline_steps + [result]
        self.app.current_status = f"Added step: {result.name}"

    # Use window-based dual editor (follows ConfigWindow pattern)
    try:
        window = self.app.query_one(DualEditorWindow)
        # Reuse the existing window: point it at a brand-new step.
        fresh_step = window.pattern_manager.create_new_step()
        window.editing_step = fresh_step
        window.is_new = True
        window.on_save_callback = handle_result
        window.original_step = window.pattern_manager.clone_pattern(window.editing_step)
        window.open_state = True
    except NoMatches:
        # Expected case: no window exists yet, so create and mount one.
        window = DualEditorWindow(step_data=None, is_new=True, on_save_callback=handle_result)
        await self.app.mount(window)
        window.open_state = True
def action_delete_step(self) -> None:
    """Delete the steps currently selected in the list.

    Steps are matched by name, so every step sharing a selected name is
    removed. Assigning the filtered list to pipeline_steps is what triggers
    the save to plate storage. With nothing selected, only the status message
    is updated.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        # Tell "nothing selected" apart from "nothing to delete at all".
        if self.pipeline_steps:
            self.app.current_status = "No steps selected for deletion"
        else:
            self.app.current_status = "No steps available for deletion"
        return

    current_steps = list(self.pipeline_steps)
    names_to_remove = {getattr(item, 'name', '') for item in selected_items}
    remaining_steps = [
        step for step in current_steps
        if getattr(step, 'name', '') not in names_to_remove
    ]

    # Update pipeline steps (this will trigger save to plate storage)
    self.pipeline_steps = remaining_steps

    deleted_count = len(current_steps) - len(remaining_steps)
    noun = "step" if deleted_count == 1 else "steps"
    self.app.current_status = f"Deleted {deleted_count} {noun}"
def _dict_to_function_step(self, step_dict: Dict) -> FunctionStep:
    """Build a FunctionStep from a step dictionary.

    Args:
        step_dict: Mapping read for the keys "func", "name",
            "variable_components" and "group_by".

    Returns:
        A FunctionStep. A missing "func" is replaced by the registry's default
        function (with a warning). "variable_components" is passed on only when
        it is non-empty, so FunctionStep's own default applies otherwise.
    """
    func = step_dict.get("func")
    if func is None:
        # Fallback to default function if missing
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService
        func = RegistryService().find_default_function()
        logger.warning(f"Step '{step_dict.get('name', 'Unknown')}' missing function, using default")

    # Normalize variable components: a bare string becomes a one-item list,
    # and anything that is neither a string nor a list is discarded.
    raw_components = step_dict.get("variable_components", [])
    if isinstance(raw_components, str):
        components = [raw_components]
    elif isinstance(raw_components, list):
        components = raw_components
    else:
        components = []

    step_kwargs = dict(
        func=func,
        name=step_dict.get("name", "Unknown Step"),
        group_by=step_dict.get("group_by", ""),
    )
    if components:
        step_kwargs["variable_components"] = components

    return FunctionStep(**step_kwargs)
419 def _function_step_to_dict(self, step: FunctionStep) -> Dict:
420 """Convert FunctionStep object to dict with complete data preservation."""
421 return {
422 "name": step.name,
423 "type": "function",
424 "func": step.func,
425 "variable_components": step.variable_components,
426 "group_by": step.group_by
427 }
429 def _find_step_index_by_selection(self) -> Optional[int]:
430 """Find the index of the currently selected step."""
431 if not self.selected_step:
432 return None
434 # selected_step contains the step name/id
435 for i, step in enumerate(self.pipeline_steps):
436 # Now step is a FunctionStep object, not a dict
437 step_name = getattr(step, 'name', f"Step {i+1}")
438 if step_name == self.selected_step:
439 return i
440 return None
async def action_edit_step(self) -> None:
    """Open the dual editor window on the currently selected step.

    Sets a status message and returns when the pipeline is empty or no
    selected step can be found. On save, the edited step replaces the one at
    the index that was selected when editing started.
    """
    if not self.pipeline_steps:
        self.app.current_status = "No steps to edit"
        return

    step_index = self._find_step_index_by_selection()
    if step_index is None:
        self.app.current_status = "No step selected for editing"
        return

    # The FunctionStep object is handed to the editor as-is (no conversion).
    edit_step = self.pipeline_steps[step_index]

    def handle_result(result: Optional[FunctionStep]) -> None:
        if not result:
            self.app.current_status = "Edit step cancelled"
            return
        # Replace the step inside a copy, then assign the copy.
        updated_steps = self.pipeline_steps.copy()
        updated_steps[step_index] = result
        self.pipeline_steps = updated_steps
        self.app.current_status = f"Updated step: {result.name}"

    # LAZY IMPORT to avoid circular import
    from openhcs.textual_tui.windows.dual_editor_window import DualEditorWindow
    from textual.css.query import NoMatches

    # Use window-based dual editor (follows ConfigWindow pattern)
    try:
        window = self.app.query_one(DualEditorWindow)
        # Reuse the existing window: point it at the step being edited.
        window.editing_step = edit_step
        window.is_new = False
        window.on_save_callback = handle_result
        window.original_step = window.pattern_manager.clone_pattern(window.editing_step)
        window.open_state = True
    except NoMatches:
        # Expected case: no window exists yet, so create and mount one.
        window = DualEditorWindow(step_data=edit_step, is_new=False, on_save_callback=handle_result)
        await self.app.mount(window)
        window.open_state = True
async def action_auto_load_pipeline(self) -> None:
    """Handle the Auto button: load the steps defined in basic_pipeline.py.

    The file is looked up at ``<openhcs package dir>/tests/basic_pipeline.py``,
    executed, and its ``pipeline_steps`` variable becomes the current pipeline.
    Every failure is logged and reported through the app status, never raised.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected"
        return

    try:
        from pathlib import Path

        # Find basic_pipeline.py relative to openhcs package
        import openhcs
        pipeline_file = Path(openhcs.__file__).parent / "tests" / "basic_pipeline.py"

        if not pipeline_file.exists():
            self.app.current_status = f"Pipeline file not found: {pipeline_file}"
            return

        # Python source defaults to UTF-8; do not depend on the locale encoding.
        python_code = pipeline_file.read_text(encoding="utf-8")

        # SECURITY NOTE: exec runs arbitrary code. This is only acceptable
        # because the file is located inside the installed openhcs package.
        namespace = {}
        exec(python_code, namespace)

        if 'pipeline_steps' not in namespace:
            raise ValueError("No 'pipeline_steps = [...]' assignment found in basic_pipeline.py")

        new_pipeline_steps = namespace['pipeline_steps']
        # Validate before assigning so a bad value never becomes the pipeline.
        if not isinstance(new_pipeline_steps, (list, tuple)):
            raise ValueError(
                f"'pipeline_steps' in basic_pipeline.py must be a list, got {type(new_pipeline_steps).__name__}"
            )
        new_pipeline_steps = list(new_pipeline_steps)

        self.pipeline_steps = new_pipeline_steps
        # NOTE(review): update_step_list is not defined in this class; presumably
        # inherited from ButtonListWidget - confirm it exists.
        self.update_step_list()
        self.app.current_status = f"Auto-loaded {len(new_pipeline_steps)} steps from basic_pipeline.py"

    except Exception as e:
        logger.error(f"Failed to auto-load basic_pipeline.py: {e}")
        self.app.current_status = f"Failed to auto-load pipeline: {str(e)}"
async def action_load_pipeline(self) -> None:
    """Open a file browser to pick one or more .pipeline files and load them.

    Requires a current plate. The browser result may be a Path, a non-empty
    list of paths or strings, or a string; any other result counts as a
    cancel.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for loading pipeline"
        return

    def handle_result(result):
        from pathlib import Path  # Import at the top of the function

        # Normalize whatever the browser returned into a list of Path objects.
        if isinstance(result, Path):
            paths_to_load = [result]
        elif isinstance(result, list) and len(result) > 0:
            paths_to_load = [
                item if isinstance(item, Path) else Path(item)
                for item in result
            ]
        elif isinstance(result, str):
            paths_to_load = [Path(result)]
        else:
            paths_to_load = []

        if not paths_to_load:
            self.app.current_status = "Load pipeline cancelled"
            return

        logger.debug(f"Loading {len(paths_to_load)} pipeline files")
        self._load_multiple_pipeline_files(paths_to_load)

    # Create file browser window for .pipeline files
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    start_path = get_cached_browser_path(PathCacheKey.PIPELINE_FILES)
    await open_file_browser_window(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=start_path,
        backend=Backend.DISK,
        title="Load Pipeline (.pipeline)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor"
    )
def _load_multiple_pipeline_files(self, file_paths: List[Path]) -> None:
    """Load several pipeline files and use their concatenated steps as the pipeline.

    Args:
        file_paths: Files to load, in order. A file that raises or yields no
            steps is counted as failed and skipped.

    When at least one step was loaded, pipeline_steps is replaced and the app
    status names the loaded (and failed) files. The steps are then offered to
    the selected orchestrators; that call runs last because it appends its own
    suffix to the status message.
    """
    all_steps = []
    loaded_files = []
    failed_files = []

    for file_path in file_paths:
        try:
            steps = self._load_single_pipeline_file(file_path)
        except Exception as e:
            logger.error(f"❌ Failed to load {file_path.name}: {e}")
            failed_files.append(file_path.name)
            continue

        if not steps:
            failed_files.append(file_path.name)
            continue

        all_steps.extend(steps)
        loaded_files.append(file_path.name)
        logger.info(f"✅ Loaded {len(steps)} steps from {file_path.name}")

    if not all_steps:
        self.app.current_status = f"No valid pipeline steps loaded from {len(file_paths)} files"
        return

    # Replace current pipeline with concatenated steps
    self.pipeline_steps = all_steps

    if len(loaded_files) == 1:
        status = f"Loaded {len(all_steps)} steps from {loaded_files[0]}"
    else:
        status = f"Loaded {len(all_steps)} steps from {len(loaded_files)} files: {', '.join(loaded_files)}"

    if failed_files:
        status += f" (Failed: {', '.join(failed_files)})"

    self.app.current_status = status
    logger.info(f"🎯 Total pipeline: {len(all_steps)} steps from {len(loaded_files)} files")

    # Apply after the status is set: the helper appends to current_status, and
    # calling it first let the assignment above overwrite that suffix.
    self._apply_pipeline_to_selected_orchestrators(all_steps)
def _load_single_pipeline_file(self, file_path: Path) -> List:
    """Unpickle one .pipeline file and return its list of steps.

    Returns:
        The unpickled list, or [] (after logging an error) when the file's
        content is not a list.

    Raises:
        Exception: Any error raised while opening or unpickling is logged and
            re-raised.
    """
    import dill as pickle
    try:
        # NOTE(review): unpickling executes code embedded in the file; only
        # load .pipeline files from trusted sources.
        with open(file_path, 'rb') as f:
            pattern = pickle.load(f)

        if not isinstance(pattern, list):
            logger.error(f"Invalid pipeline format in {file_path.name}: expected list, got {type(pattern)}")
            return []
        return pattern
    except Exception as e:
        logger.error(f"Failed to load pipeline from {file_path.name}: {e}")
        raise
def _apply_pipeline_to_selected_orchestrators(self, pipeline_steps: List) -> None:
    """Copy the loaded pipeline onto every plate selected in the plate manager.

    Does nothing without a plate manager or when at most one plate is
    selected. Each selected plate that has an orchestrator receives its own
    list copy, both on the orchestrator and in plate_pipelines. When more than
    one plate was updated, a suffix is appended to the app status.
    """
    manager = self.plate_manager
    if not manager:
        return

    selected_items, selection_mode = manager.get_selection_state()

    # Single or no selection - normal behavior
    if selection_mode == "empty" or len(selected_items) <= 1:
        return

    applied_count = 0
    for item in selected_items:
        plate_path = item['path']
        if plate_path not in manager.orchestrators:
            continue
        orchestrator = manager.orchestrators[plate_path]
        orchestrator.pipeline_definition = list(pipeline_steps)

        # Also save to our plate pipelines storage
        self.save_pipeline_for_plate(plate_path, list(pipeline_steps))
        applied_count += 1

    if applied_count > 1:
        self.app.current_status += f" → Applied to {applied_count} orchestrators"
        logger.info(f"Applied pipeline to {applied_count} selected orchestrators")
666 def _load_pipeline_from_file(self, file_path: Path) -> None:
667 """Load pipeline from .pipeline file (legacy single-file method)."""
668 try:
669 steps = self._load_single_pipeline_file(file_path)
670 if steps:
671 self.pipeline_steps = steps
672 self.app.current_status = f"Loaded {len(steps)} steps from {file_path.name}"
673 else:
674 self.app.current_status = f"Invalid pipeline format in {file_path.name}"
675 except Exception as e:
676 self.app.current_status = f"Failed to load pipeline: {e}"
async def action_save_pipeline(self) -> None:
    """Open a file browser to choose where the current pipeline is saved.

    Requires a current plate and at least one step. The suggested file name is
    the last component of the plate path plus ".pipeline". Only a Path result
    triggers the save; any other result counts as a cancel.
    """
    if not self.current_plate:
        self.app.current_status = "No plate selected for saving pipeline"
        return

    if not self.pipeline_steps:
        self.app.current_status = "No pipeline steps to save"
        return

    def handle_result(result):
        if not (result and isinstance(result, Path)):
            self.app.current_status = "Save pipeline cancelled"
            return
        self._save_pipeline_to_file(result)

    # Create file browser window for saving .pipeline files
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    # Generate default filename from plate name
    if self.current_plate:
        plate_name = Path(self.current_plate).name
    else:
        plate_name = "pipeline"
    default_filename = f"{plate_name}.pipeline"

    start_path = get_cached_browser_path(PathCacheKey.PIPELINE_FILES)
    await open_file_browser_window(
        app=self.app,
        file_manager=self.filemanager,
        initial_path=start_path,
        backend=Backend.DISK,
        title="Save Pipeline (.pipeline)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.pipeline'],
        default_filename=default_filename,
        cache_key=PathCacheKey.PIPELINE_FILES,
        on_result_callback=handle_result,
        caller_id="pipeline_editor"
    )
def _save_pipeline_to_file(self, file_path: Path) -> None:
    """Pickle the current steps (as a plain list) into ``file_path``.

    Success or failure is reported through the app status; failures are also
    logged and are not re-raised.
    """
    import dill as pickle
    try:
        steps_snapshot = list(self.pipeline_steps)
        with open(file_path, 'wb') as f:
            pickle.dump(steps_snapshot, f)
        self.app.current_status = f"Saved pipeline to {file_path.name}"
    except Exception as e:
        logger.error(f"Failed to save pipeline: {e}")
        self.app.current_status = f"Failed to save pipeline: {e}"
async def action_code_pipeline(self) -> None:
    """Open the current pipeline as Python code in a terminal editor.

    Requires a current plate. Code is generated from the current steps and
    handed to TerminalLauncher. When the editor saves, the code is executed
    and its ``pipeline_steps`` variable replaces the current pipeline; problems
    in the edited code go through app.show_error, and failures while opening
    the editor go to the app status.
    """
    logger.debug("Code button pressed - opening pipeline editor")

    if not self.current_plate:
        self.app.current_status = "No plate selected"
        return

    try:
        # Use complete pipeline steps code generation
        from openhcs.debug.pickle_to_python import generate_complete_pipeline_steps_code
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Generate complete pipeline steps code with imports
        current_steps = list(self.pipeline_steps)
        python_code = generate_complete_pipeline_steps_code(
            pipeline_steps=current_steps,
            clean_mode=True
        )

        def handle_edited_code(edited_code: str):
            """Execute the edited code and adopt its pipeline_steps."""
            logger.debug("Pipeline code edited, processing changes...")
            try:
                # Execute the code (it has all necessary imports)
                namespace = {}
                exec(edited_code, namespace)

                if 'pipeline_steps' not in namespace:
                    self.app.show_error("Parse Error", "No 'pipeline_steps = [...]' assignment found in edited code")
                    return

                new_pipeline_steps = namespace['pipeline_steps']
                self.pipeline_steps = new_pipeline_steps
                self.app.current_status = f"Pipeline updated with {len(new_pipeline_steps)} steps"

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                logger.error(f"Failed to parse edited pipeline code: {e}")
                self.app.show_error("Edit Error", f"Failed to parse pipeline code: {str(e)}")

        # Launch terminal editor
        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code
        )

    except Exception as e:
        logger.error(f"Failed to open pipeline code editor: {e}")
        self.app.current_status = f"Failed to open code editor: {e}"