Coverage for openhcs/textual_tui/widgets/function_list_editor.py: 0.0%
626 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""Function list editor widget - port of function_list_manager.py to Textual."""
3import logging
4from pathlib import Path
5from typing import List, Union, Dict, Any, Optional, Callable # Added Optional, Callable
6from textual.containers import ScrollableContainer, Container, Horizontal, Center
7from textual.widgets import Button, Static, Select
8from textual.app import ComposeResult
9from textual.reactive import reactive
10from textual.message import Message # Added Message
12from openhcs.processing.backends.lib_registry.registry_service import RegistryService
13from openhcs.textual_tui.services.pattern_data_manager import PatternDataManager
14from openhcs.textual_tui.widgets.function_pane import FunctionPaneWidget
15from openhcs.constants.constants import GroupBy, VariableComponents
17logger = logging.getLogger(__name__)
20class FunctionListEditorWidget(Container):
21 """
22 Scrollable function list editor.
24 Ports the function display and management logic from function_list_manager.py
25 """
class FunctionPatternChanged(Message):
    """Posted by the editor whenever the function pattern has changed."""
# Reactive properties for automatic UI updates.
# `functions` holds the (callable, kwargs) entries currently being edited;
# structural changes (add/remove) should trigger recomposition.
functions = reactive(list, recompose=True)
# The actual pattern: a list in list mode, a dict keyed by channel in dict mode.
pattern_data = reactive(list, recompose=False)
# Whether we're in channel-specific (dict) mode - affects UI layout.
is_dict_mode = reactive(False, recompose=True)
# Currently selected channel - affects button text.
selected_channel = reactive(None, recompose=True)
# Available channels from orchestrator.
available_channels = reactive(list)

# Step configuration reactive properties for dynamic component selection.
# Current GroupBy setting from step editor.
current_group_by = reactive(None, recompose=True)
# Current VariableComponents list from step editor.
current_variable_components = reactive(list, recompose=True)
def __init__(
    self,
    initial_functions: Union[List, Dict, Callable, tuple, None] = None,
    step_identifier: Optional[str] = None,
):
    """Create the editor and seed it with an initial function pattern.

    Args:
        initial_functions: Pattern to edit. Accepted shapes are ``None``, a
            single callable, a ``(callable, kwargs)`` tuple, a list of those,
            or a dict mapping a channel key to such a list.
        step_identifier: Label identifying the owning step (used in log
            messages). Defaults to ``widget_<id(self)>`` when omitted/empty.
    """
    super().__init__()

    # Initialize services (reuse existing business logic)
    self.registry_service = RegistryService()
    self.data_manager = PatternDataManager()  # Not heavily used yet, but available

    # Step identifier for cache isolation (optional, defaults to widget instance id)
    self.step_identifier = step_identifier or f"widget_{id(self)}"

    # Component selection cache per GroupBy (instance-specific, not shared between steps)
    self.component_selections: Dict[GroupBy, List[str]] = {}

    # Populates pattern_data, is_dict_mode, selected_channel and functions
    self._initialize_pattern_data(initial_functions)

    logger.debug(f"FunctionListEditorWidget initialized for step '{self.step_identifier}' with {len(self.functions)} functions, dict_mode={self.is_dict_mode}")
@property
def current_pattern(self) -> Union[List, Dict]:
    """Return the up-to-date pattern for parent widgets.

    ``pattern_data`` is refreshed first. A dict pattern is returned as a new
    dict whose keys are all converted with ``str()`` (the pattern detection
    system always uses string component values); any other pattern is
    returned as stored.
    """
    self._update_pattern_data()  # Ensure it's up to date

    pattern = self.pattern_data
    if not isinstance(pattern, dict):
        return self.pattern_data

    # Migration fix: integer keys become string keys
    return {str(channel): entries for channel, entries in pattern.items()}
def watch_functions(self, new_functions: List) -> None:
    """React to a new ``functions`` value by re-syncing ``pattern_data``.

    The new value itself is not inspected; the sync reads the widget state.
    """
    self._update_pattern_data()
def _initialize_pattern_data(self, initial_functions: Union[List, Dict, Callable, tuple, None]) -> None:
    """Initialize pattern data and determine list vs. dict mode.

    Args:
        initial_functions: ``None``, a single callable, a single
            ``(callable, kwargs)`` tuple, a list of functions, or a dict
            mapping channel keys to function lists. Any other type is logged
            and treated as an empty list pattern.

    Sets ``pattern_data``, ``is_dict_mode`` and ``functions``; for dict input
    it also sets ``selected_channel`` (first key, or ``None`` if empty) and
    converts every key to ``str``.
    """
    if isinstance(initial_functions, dict) and not callable(initial_functions):
        # Convert any integer keys to string keys for consistency
        normalized_dict = {}
        for key, value in initial_functions.items():
            str_key = str(key)  # Ensure all keys are strings
            normalized_dict[str_key] = value
            logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")

        self.pattern_data = normalized_dict
        self.is_dict_mode = True
        # selected_channel is set before functions so that the functions
        # watcher stores them under the right channel.
        if normalized_dict:
            first_channel = next(iter(normalized_dict))
            self.selected_channel = first_channel
            self.functions = self._normalize_function_list(normalized_dict.get(first_channel, []))
        else:
            self.selected_channel = None
            self.functions = []
        return

    is_single_pair = (
        isinstance(initial_functions, tuple)
        and len(initial_functions) == 2
        and callable(initial_functions[0])
        and isinstance(initial_functions[1], dict)
    )

    if initial_functions is None:
        list_pattern, list_functions = [], []
    elif callable(initial_functions):
        # Single callable: treat as [(callable, {})]
        list_pattern = [(initial_functions, {})]
        list_functions = [(initial_functions, {})]
    elif is_single_pair:
        # Single tuple (callable, kwargs): treat as [(callable, kwargs)]
        list_pattern = [initial_functions]
        list_functions = [initial_functions]
    elif isinstance(initial_functions, list):
        list_pattern = initial_functions
        list_functions = self._normalize_function_list(initial_functions)
    else:
        logger.warning(f"Unknown initial_functions type: {type(initial_functions)}, using empty list")
        list_pattern, list_functions = [], []

    self.pattern_data = list_pattern
    self.is_dict_mode = False
    self.functions = list_functions
126 def _normalize_function_list(self, func_list: List[Any]) -> List[tuple[Callable, Dict]]:
127 """Ensures all items in a function list are (callable, kwargs) tuples."""
128 normalized = []
129 for item in func_list:
130 if isinstance(item, tuple) and len(item) == 2 and callable(item[0]) and isinstance(item[1], dict):
131 normalized.append(item)
132 elif callable(item):
133 normalized.append((item, {}))
134 else:
135 logger.warning(f"Skipping invalid item in function list: {item}")
136 return normalized
138 def _commit_and_notify(self) -> None:
139 """Post a change message to notify parent of function pattern changes."""
140 self.post_message(self.FunctionPatternChanged())
141 logger.debug("Posted FunctionPatternChanged message to parent.")
def _trigger_recomposition(self) -> None:
    """Force a recomposition by hand (e.g. after loading patterns or adding/removing functions).

    Used when ``functions`` was modified in place rather than reassigned, so
    no assignment happened that the reactive system could react to.
    """
    functions_reactive = FunctionListEditorWidget.functions
    self.mutate_reactive(functions_reactive)
def watch_current_group_by(self, old_group_by: Optional[GroupBy], new_group_by: Optional[GroupBy]) -> None:
    """Remember the component selection of the outgoing ``group_by`` setting.

    When the previous setting was a real grouping (neither ``None`` nor
    ``GroupBy.NONE``) and the widget holds a dict pattern, the pattern's keys
    are cached in ``component_selections`` under that previous setting.
    """
    should_cache = (
        old_group_by is not None
        and old_group_by != GroupBy.NONE
        and self.is_dict_mode
        and isinstance(self.pattern_data, dict)
    )
    if should_cache:
        # Save current selection for old group_by
        current_selection = list(self.pattern_data.keys())
        self.component_selections[old_group_by] = current_selection
        logger.debug(f"Step '{self.step_identifier}': Saved selection for {old_group_by.value}: {current_selection}")

    # The selection for new_group_by is intentionally not loaded here:
    # the dialog handles loading from cache when opened.
    logger.debug(f"Group by changed from {old_group_by} to {new_group_by}")
162 def _update_pattern_data(self) -> None:
163 """Update pattern_data based on current functions and mode."""
164 if self.is_dict_mode and self.selected_channel is not None:
165 # Save current functions to the selected channel
166 if not isinstance(self.pattern_data, dict):
167 self.pattern_data = {}
168 logger.debug(f"Saving {len(self.functions)} functions to channel {self.selected_channel}")
169 self.pattern_data[self.selected_channel] = self.functions.copy() # Make a copy to avoid reference issues
170 else:
171 # List mode - pattern_data is just the functions list
172 self.pattern_data = self.functions
def _switch_to_channel(self, channel: Any) -> None:
    """Switch to editing the functions of ``channel`` (dict mode only).

    The functions currently being edited are first saved under the outgoing
    channel. The incoming channel's entries are then loaded through
    ``_normalize_function_list`` so that ``functions`` always holds
    ``(callable, kwargs)`` tuples (channels other than the first may still
    hold raw entries such as bare callables) and does not alias the list
    stored in ``pattern_data``. Does nothing in list mode.

    Args:
        channel: Key of the channel to edit; a missing key yields an empty list.
    """
    if not self.is_dict_mode:
        return

    # Save current functions first
    old_channel = self.selected_channel
    logger.debug(f"Switching from channel {old_channel} to {channel}")
    logger.debug(f"Current functions before save: {len(self.functions)} functions")

    self._update_pattern_data()

    # Verify the save worked (diagnostic logging only)
    if old_channel is not None and isinstance(self.pattern_data, dict):
        saved_functions = self.pattern_data.get(old_channel, [])
        logger.debug(f"Saved {len(saved_functions)} functions to channel {old_channel}")

    # Switch to new channel; selected_channel goes first so the functions
    # watcher stores the loaded list under the new channel.
    self.selected_channel = channel
    if isinstance(self.pattern_data, dict):
        self.functions = self._normalize_function_list(self.pattern_data.get(channel, []))
        logger.debug(f"Loaded {len(self.functions)} functions for channel {channel}")
    else:
        self.functions = []

    # Update button text to show new channel
    self._refresh_component_button()
206 def _add_channel_to_pattern(self, channel: Any) -> None:
207 """Add a new channel (converts to dict mode if needed)."""
208 if not self.is_dict_mode:
209 # Convert to dict mode
210 self.pattern_data = {channel: self.functions}
211 self.is_dict_mode = True
212 self.selected_channel = channel
213 else:
214 # Add new channel with empty functions
215 if not isinstance(self.pattern_data, dict):
216 self.pattern_data = {}
217 self.pattern_data[channel] = []
218 self.selected_channel = channel
219 self.functions = []
221 def _remove_current_channel(self) -> None:
222 """Remove the currently selected channel."""
223 if not self.is_dict_mode or self.selected_channel is None:
224 return
226 if isinstance(self.pattern_data, dict):
227 new_pattern = self.pattern_data.copy()
228 if self.selected_channel in new_pattern:
229 del new_pattern[self.selected_channel]
231 if len(new_pattern) == 0:
232 # Revert to list mode
233 self.pattern_data = []
234 self.is_dict_mode = False
235 self.selected_channel = None
236 self.functions = []
237 else:
238 # Switch to first remaining channel
239 self.pattern_data = new_pattern
240 first_channel = next(iter(new_pattern))
241 self.selected_channel = first_channel
242 self.functions = new_pattern[first_channel]
def compose(self) -> ComposeResult:
    """Compose the function list using the common interface pattern.

    Yields a title, a header row of action buttons (plus channel navigation
    buttons when a dict pattern has more than one channel) and a scrollable
    area holding one FunctionPaneWidget per entry in ``functions`` - or a
    placeholder message when there are none.
    """
    from textual.containers import Vertical

    with Vertical():
        # Fixed header with title
        yield Static("[bold]Functions[/bold]")

        # Button row - takes minimal height needed for buttons
        with Horizontal(id="function_list_header") as button_row:
            button_row.styles.height = "auto"  # CRITICAL: Take only needed height

            # Empty space (flex-grows)
            yield Static("")

            # Centered main button group
            with Horizontal() as main_button_group:
                main_button_group.styles.width = "auto"
                yield Button("Add", id="add_function_btn", compact=True)
                yield Button("Load", id="load_func_btn", compact=True)
                yield Button("Save As", id="save_func_as_btn", compact=True)
                yield Button("Code", id="edit_vim_btn", compact=True)

                # Component selection button (dynamic based on group_by setting)
                component_text = self._get_component_button_text()
                component_button = Button(component_text, id="component_btn", compact=True)
                component_button.disabled = self._is_component_button_disabled()
                yield component_button

                # Channel navigation buttons (only in dict mode with multiple channels)
                if self.is_dict_mode and isinstance(self.pattern_data, dict) and len(self.pattern_data) > 1:
                    yield Button("<", id="prev_channel_btn", compact=True)
                    yield Button(">", id="next_channel_btn", compact=True)

            # Empty space (flex-grows)
            yield Static("")

        # Scrollable content area - expands to fill ALL remaining vertical space
        with ScrollableContainer(id="function_list_content") as container:
            container.styles.height = "1fr"  # CRITICAL: Fill remaining space

            if not self.functions:
                # NOTE(review): the hint mentions 'Add Function' but the button
                # above is labelled "Add" - confirm which wording is intended.
                content = Static("No functions defined. Click 'Add Function' to begin.")
                content.styles.width = "100%"
                content.styles.height = "100%"
                yield content
            else:
                for i, func_item in enumerate(self.functions):
                    # Each pane receives its entry and its position in the list
                    pane = FunctionPaneWidget(func_item, i)
                    pane.styles.width = "100%"
                    pane.styles.height = "auto"  # CRITICAL: Only take height needed for content
                    yield pane
async def on_button_pressed(self, event: Button.Pressed) -> None:
    """Route a button press to the matching action, keyed on the button id.

    Unknown ids are logged as a warning and otherwise ignored.
    """
    button_id = event.button.id
    logger.debug(f"🔍 BUTTON: Button pressed: {button_id}")

    if button_id == "add_function_btn":
        logger.debug("🔍 BUTTON: Add function button pressed")
        await self._add_function()
        return
    if button_id == "load_func_btn":
        logger.debug("🔍 BUTTON: Load func button pressed - calling _load_func()")
        await self._load_func()
        return
    if button_id == "save_func_as_btn":
        logger.debug("🔍 BUTTON: Save func as button pressed")
        await self._save_func_as()
        return
    if button_id == "edit_vim_btn":
        logger.debug("🔍 BUTTON: Edit vim button pressed")
        self._edit_in_vim()
        return
    if button_id == "component_btn":
        logger.debug("🔍 BUTTON: Component button pressed")
        await self._show_component_selection_dialog()
        return
    if button_id == "prev_channel_btn":
        logger.debug("🔍 BUTTON: Previous channel button pressed")
        self._navigate_channel(-1)
        return
    if button_id == "next_channel_btn":
        logger.debug("🔍 BUTTON: Next channel button pressed")
        self._navigate_channel(1)
        return

    logger.warning(f"🔍 BUTTON: Unknown button pressed: {button_id}")
def on_function_pane_widget_parameter_changed(self, event: Message) -> None:
    """Handle a parameter change message from FunctionPaneWidget.

    The event must carry ``index``, ``param_name`` and ``value``; events
    lacking any of them, or with an out-of-range index, are ignored.

    String values are converted according to the function signature: an
    empty/blank string becomes the parameter's default value, and non-empty
    strings are converted for ``float``, ``int``, ``bool`` and Enum
    parameter types (any Enum subclass, not only direct ones). If conversion
    fails the raw value is stored and a warning is logged.
    """
    if not (hasattr(event, 'index') and hasattr(event, 'param_name') and hasattr(event, 'value')):
        return
    if not (0 <= event.index < len(self.functions)):
        return

    func, kwargs = self.functions[event.index]
    new_kwargs = kwargs.copy()

    # Convert value to proper type based on function signature
    converted_value = event.value
    try:
        from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer
        from enum import Enum

        param_info = SignatureAnalyzer.analyze(func)
        if event.param_name in param_info and isinstance(event.value, str):
            param_details = param_info[event.param_name]
            param_type = param_details.param_type

            if event.value.strip() == "":
                # Empty parameter - use default value (None for required params)
                converted_value = param_details.default_value
            elif param_type == float:
                converted_value = float(event.value)
            elif param_type == int:
                converted_value = int(event.value)
            elif param_type == bool:
                converted_value = event.value.lower() in ('true', '1', 'yes', 'on')
            elif isinstance(param_type, type) and issubclass(param_type, Enum):
                converted_value = param_type(event.value)
    except (ValueError, TypeError, AttributeError) as e:
        logger.warning(f"Failed to convert parameter '{event.param_name}' value '{event.value}': {e}")
        converted_value = event.value  # Keep original value on conversion failure

    new_kwargs[event.param_name] = converted_value

    # Update the list entry in place instead of assigning a new list, so that
    # no recomposition is triggered (which would lose input focus while typing).
    self.functions[event.index] = (func, new_kwargs)

    # In-place mutation fires no watcher, so sync pattern data manually
    self._update_pattern_data()

    # Notify parent
    self.post_message(self.FunctionPatternChanged())
    logger.debug(f"Updated parameter {event.param_name}={converted_value} (type: {type(converted_value)}) for function {event.index}")
async def on_function_pane_widget_change_function(self, event: Message) -> None:
    """Open the function selector for the pane that asked to change its function.

    Events without an ``index`` attribute or with an out-of-range index are ignored.
    """
    if not hasattr(event, 'index'):
        return
    if not (0 <= event.index < len(self.functions)):
        return
    await self._change_function(event.index)
def on_function_pane_widget_remove_function(self, event: Message) -> None:
    """Drop the function at the index carried by the event and notify the parent.

    An event without a valid in-range ``index`` only produces a warning.
    """
    index_is_valid = hasattr(event, 'index') and 0 <= event.index < len(self.functions)
    if not index_is_valid:
        logger.warning(f"Invalid index for remove function: {getattr(event, 'index', 'N/A')}")
        return

    # Build a new list (rather than deleting in place) so the reactive
    # attribute receives a fresh object.
    remaining = [
        entry
        for position, entry in enumerate(self.functions)
        if position != event.index
    ]
    self.functions = remaining
    self._commit_and_notify()
    logger.debug(f"Removed function at index {event.index}")
async def on_function_pane_widget_add_function(self, event: Message) -> None:
    """Start adding a function at the position requested by a pane.

    The requested ``insert_index`` is capped at the current list length.
    Events without ``insert_index`` only produce a warning.
    """
    if not hasattr(event, 'insert_index'):
        logger.warning("Invalid add function event: missing insert_index")
        return

    upper_bound = len(self.functions)
    target_index = min(event.insert_index, upper_bound)  # cap at end of list
    await self._add_function_at_index(target_index)
def on_function_pane_widget_move_function(self, event: Message) -> None:
    """Swap a function with the entry ``direction`` positions away.

    The event must provide ``index`` and ``direction``. Nothing happens when
    either attribute is missing or when the source or destination index
    falls outside the list.
    """
    if not hasattr(event, 'index') or not hasattr(event, 'direction'):
        return

    source = event.index
    offset = event.direction

    if not (0 <= source < len(self.functions)):
        return

    destination = source + offset
    if not (0 <= destination < len(self.functions)):
        return

    # Swap on a copy so the reactive attribute receives a new list object
    reordered = self.functions.copy()
    moved_entry = reordered[source]
    reordered[source] = reordered[destination]
    reordered[destination] = moved_entry
    self.functions = reordered
    self._commit_and_notify()
    logger.debug(f"Moved function from index {source} to {destination}")
425 async def _add_function(self) -> None:
426 """Add a new function to the end of the list."""
427 await self._add_function_at_index(len(self.functions))
async def _add_function_at_index(self, insert_index: int) -> None:
    """Open the function selector and insert the chosen function at ``insert_index``.

    The insertion happens in the selector's result callback; a falsy result
    (nothing chosen) leaves the list untouched. The new entry starts with
    empty kwargs.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    def handle_function_selection(selected_function: Optional[Callable]) -> None:
        if not selected_function:
            return
        # Insert into a copy so the reactive attribute receives a new list
        updated = self.functions.copy()
        updated.insert(insert_index, (selected_function, {}))
        self.functions = updated
        self._commit_and_notify()
        logger.debug(f"Added function: {selected_function.__name__} at index {insert_index}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        window = self.app.query_one(FunctionSelectorWindow)
    except NoMatches:
        # Expected case: window doesn't exist yet, create new one
        window = FunctionSelectorWindow(on_result_callback=handle_function_selection)
        await self.app.mount(window)
    else:
        # Window exists: point it at this request's callback
        window.on_result_callback = handle_function_selection
    window.open_state = True
async def _change_function(self, index: int) -> None:
    """Open the function selector to replace the function at ``index``.

    The replacement happens in the selector's result callback. The new entry
    starts with empty kwargs (the previous function's kwargs are discarded).
    An out-of-range ``index`` is ignored, both now and when the callback
    eventually runs - the list may have shrunk while the selector was open.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    if not (0 <= index < len(self.functions)):
        return

    current_func, _ = self.functions[index]

    def handle_function_selection(selected_function: Optional[Callable]) -> None:
        if not selected_function:
            return
        new_functions = self.functions.copy()
        if not (0 <= index < len(new_functions)):
            # The entry this selector was opened for no longer exists
            logger.warning(f"Cannot change function: index {index} is no longer valid")
            return
        # Replace the function; kwargs are reset for the new function
        new_functions[index] = (selected_function, {})
        self.functions = new_functions
        self._commit_and_notify()
        logger.debug(f"Changed function at index {index} to: {selected_function.__name__}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        window = self.app.query_one(FunctionSelectorWindow)
        # Window exists, update it and open
        window.current_function = current_func
        window.on_result_callback = handle_function_selection
        window.open_state = True
    except NoMatches:
        # Expected case: window doesn't exist yet, create new one
        window = FunctionSelectorWindow(current_function=current_func, on_result_callback=handle_function_selection)
        await self.app.mount(window)
        window.open_state = True
485 def _commit_and_notify(self) -> None:
486 """Commit changes and notify parent of function pattern change."""
487 # Update pattern data before notifying
488 self._update_pattern_data()
489 # Post message to notify parent (DualEditorScreen) of changes
490 self.post_message(self.FunctionPatternChanged())
async def _load_func(self) -> None:
    """Open a file browser so the user can pick a ``.func`` pattern to load.

    The browser reports its result through a callback. The result may be a
    single ``Path`` or a list of them; only the first list entry is used.
    A usable path is passed on to ``_load_pattern_from_file``.
    """
    logger.debug("🔍 LOAD FUNC: _load_func() called - starting file browser...")

    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        logger.debug(f"🔍 LOAD FUNC: File browser callback received result: {result} (type: {type(result)})")

        # Accept a single Path or a list of Paths (multi-loading disabled: first only)
        chosen = None
        if isinstance(result, Path):
            chosen = result
            logger.debug(f"🔍 LOAD FUNC: Single Path received: {chosen}")
        elif isinstance(result, list) and len(result) > 0:
            chosen = result[0]
            if len(result) > 1:
                logger.warning(f"🔍 LOAD FUNC: Multiple files selected, using only first: {chosen}")
            else:
                logger.debug(f"🔍 LOAD FUNC: List with single Path received: {chosen}")

        if not (chosen and isinstance(chosen, Path)):
            logger.warning(f"🔍 LOAD FUNC: No valid Path found in result: {result}")
            return

        logger.debug(f"🔍 LOAD FUNC: Valid Path extracted, calling _load_pattern_from_file({chosen})")
        self._load_pattern_from_file(chosen)

    # Use window-based file browser
    logger.debug("🔍 LOAD FUNC: Opening file browser window...")
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    await open_file_browser_window(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Load Function Pattern (.func)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result,
    )
    logger.debug("🔍 LOAD FUNC: File browser window opened, waiting for user selection...")
async def _save_func_as(self) -> None:
    """Open a file browser in save mode and write the pattern to the chosen ``.func`` file.

    The write happens in the browser's result callback and only when the
    result is a ``Path``.
    """
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
    from openhcs.textual_tui.services.file_browser_service import SelectionMode

    def handle_result(result):
        if not (result and isinstance(result, Path)):
            return
        self._save_pattern_to_file(result)

    # Use window-based file browser
    await open_file_browser_window(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Save Function Pattern (.func)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        default_filename="pattern.func",
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result,
    )
def _load_pattern_from_file(self, file_path: Path) -> None:
    """Load a pattern from a ``.func`` file and make it the edited pattern.

    The unpickled object may be ``None``, a single callable, a single
    ``(callable, kwargs)`` tuple, a list of functions, or a dict mapping
    channel keys to function lists (keys are converted to ``str``). Any other
    type is logged and treated as an empty list pattern.

    The reactive attributes are assigned in the order ``pattern_data``,
    ``is_dict_mode``, ``selected_channel``, ``functions``. ``functions`` must
    come last: its watcher calls ``_update_pattern_data``, which files the
    functions under ``selected_channel``. Assigning ``functions`` while the
    previous channel selection is still in place would store them under the
    wrong key, or - when coming from list mode - replace the freshly loaded
    dict with the function list and lose all other channels.

    Errors are logged (with traceback) and otherwise swallowed.

    Args:
        file_path: Path of the ``.func`` file to read.
    """
    logger.debug(f"🔍 LOAD FUNC: _load_pattern_from_file called with: {file_path}")
    logger.debug(f"🔍 LOAD FUNC: File exists: {file_path.exists()}")
    logger.debug(f"🔍 LOAD FUNC: File size: {file_path.stat().st_size if file_path.exists() else 'N/A'} bytes")

    import dill as pickle
    try:
        logger.debug(f"🔍 LOAD FUNC: Opening file for reading...")
        # NOTE(review): unpickling can execute arbitrary code - only load
        # .func files from trusted sources.
        with open(file_path, 'rb') as f:
            pattern = pickle.load(f)
        logger.debug(f"🔍 LOAD FUNC: Successfully loaded pattern from pickle: {pattern}")
        logger.debug(f"🔍 LOAD FUNC: Pattern type: {type(pattern)}")

        # Log current state before loading
        logger.debug(f"🔍 LOAD FUNC: BEFORE - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - selected_channel: {self.selected_channel}")

        # Process pattern and create NEW objects: the reactive system only
        # reacts to assignment of a new value.
        new_is_dict_mode = False
        new_selected_channel = None
        if pattern is None:
            new_pattern_data = []
            new_functions = []
        elif callable(pattern):
            # Single callable: treat as [(callable, {})]
            new_pattern_data = [(pattern, {})]
            new_functions = [(pattern, {})]
        elif isinstance(pattern, tuple) and len(pattern) == 2 and callable(pattern[0]) and isinstance(pattern[1], dict):
            # Single tuple (callable, kwargs): treat as [(callable, kwargs)]
            new_pattern_data = [pattern]
            new_functions = [pattern]
        elif isinstance(pattern, list):
            new_pattern_data = pattern
            new_functions = self._normalize_function_list(pattern)
        elif isinstance(pattern, dict):
            # Convert any integer keys to string keys for consistency
            normalized_dict = {}
            for key, value in pattern.items():
                str_key = str(key)
                normalized_dict[str_key] = value
                logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")

            new_pattern_data = normalized_dict
            new_is_dict_mode = True
            # Set first channel as selected, or None if no channels
            if normalized_dict:
                new_selected_channel = next(iter(normalized_dict))
                new_functions = self._normalize_function_list(normalized_dict.get(new_selected_channel, []))
            else:
                new_functions = []
        else:
            logger.warning(f"Unknown pattern type: {type(pattern)}, using empty list")
            new_pattern_data = []
            new_functions = []

        logger.debug(f"🔍 LOAD FUNC: Assigning new values to reactive properties...")
        logger.debug(f"🔍 LOAD FUNC: new_pattern_data: {new_pattern_data}")
        logger.debug(f"🔍 LOAD FUNC: new_is_dict_mode: {new_is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: new_functions: {len(new_functions)} items: {new_functions}")
        logger.debug(f"🔍 LOAD FUNC: new_selected_channel: {new_selected_channel}")

        self.pattern_data = new_pattern_data
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned pattern_data")

        self.is_dict_mode = new_is_dict_mode
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned is_dict_mode")

        # Channel first, functions last (see docstring)
        self.selected_channel = new_selected_channel
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned selected_channel")

        self.functions = new_functions  # This triggers reactive system!
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned functions - THIS SHOULD TRIGGER REACTIVE SYSTEM!")

        logger.debug(f"🔍 LOAD FUNC: Calling _commit_and_notify...")
        self._commit_and_notify()

        # Final state logging
        logger.debug(f"🔍 LOAD FUNC: FINAL - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: FINAL - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: FINAL - selected_channel: {self.selected_channel}")
        logger.debug(f"🔍 LOAD FUNC: ✅ Successfully completed loading process!")

    except Exception as e:
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Failed to load pattern: {e}")
        import traceback
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Traceback: {traceback.format_exc()}")
def _save_pattern_to_file(self, file_path: Path) -> None:
    """Serialize the current pattern to a ``.func`` file.

    Uses ``dill`` - the same serializer ``_load_pattern_from_file`` reads
    with - so saved files round-trip through this widget. The pattern is
    serialized in memory before the file is opened, so a serialization
    failure does not leave a truncated file behind.

    Failures are logged and otherwise swallowed.

    Args:
        file_path: Destination path of the ``.func`` file.
    """
    import dill as pickle
    try:
        payload = pickle.dumps(self.current_pattern)
        with open(file_path, 'wb') as f:
            f.write(payload)
    except Exception as e:
        logger.error(f"Failed to save pattern: {e}")
def _edit_in_vim(self) -> None:
    """Edit the function pattern as Python code in a terminal editor window.

    The pattern is rendered to Python source and handed to a
    ``TerminalLauncher``; the edited source comes back through
    ``_handle_edited_pattern``. Any failure while launching is logged and
    reported via ``self.app.show_error``.
    """
    logger.debug("Edit button pressed - opening terminal editor")

    try:
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Update pattern data first so the generated code reflects the latest edits
        self._update_pattern_data()

        # Generate complete Python code with imports (like debug module does)
        python_code = self._generate_complete_python_code()

        # Create terminal launcher
        launcher = TerminalLauncher(self.app)

        # Launch editor in terminal window with callback
        self.app.call_later(
            launcher.launch_editor_for_file,
            python_code,
            '.py',
            self._handle_edited_pattern,
        )

    except Exception as e:
        logger.error(f"Failed to launch terminal editor: {e}")
        self.app.show_error("Edit Error", f"Failed to launch terminal editor: {str(e)}")
704 def _handle_edited_pattern(self, edited_code: str) -> None:
705 """Handle the edited pattern code from terminal editor."""
706 try:
707 # Execute the code (it has all necessary imports)
708 namespace = {}
709 exec(edited_code, namespace)
711 # Get the pattern from the namespace
712 if 'pattern' in namespace:
713 new_pattern = namespace['pattern']
714 self._apply_edited_pattern(new_pattern)
715 else:
716 self.app.show_error("Parse Error", "No 'pattern = ...' assignment found in edited code")
718 except SyntaxError as e:
719 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
720 except Exception as e:
721 logger.error(f"Failed to parse edited pattern: {e}")
722 self.app.show_error("Edit Error", f"Failed to parse edited pattern: {str(e)}")
def _generate_complete_python_code(self) -> str:
    """Render ``pattern_data`` as complete Python source, imports included.

    Delegates to ``generate_complete_function_pattern_code`` from the debug
    module. ``clean_mode`` is turned off so that all parameters are kept
    when the same function appears several times with different parameter
    sets, which would otherwise cause parsing issues.
    """
    from openhcs.debug.pickle_to_python import generate_complete_function_pattern_code

    source_code = generate_complete_function_pattern_code(self.pattern_data, clean_mode=False)
    return source_code
735 def _apply_edited_pattern(self, new_pattern):
736 """Apply the edited pattern back to the TUI."""
737 try:
738 if self.is_dict_mode:
739 if isinstance(new_pattern, dict):
740 self.pattern_data = new_pattern
741 # Update current channel if it exists in new pattern
742 if self.selected_channel and self.selected_channel in new_pattern:
743 self.functions = self._normalize_function_list(new_pattern[self.selected_channel])
744 else:
745 # Select first channel
746 if new_pattern:
747 self.selected_channel = next(iter(new_pattern))
748 self.functions = self._normalize_function_list(new_pattern[self.selected_channel])
749 else:
750 self.functions = []
751 else:
752 raise ValueError("Expected dict pattern for dict mode")
753 else:
754 if isinstance(new_pattern, list):
755 self.pattern_data = new_pattern
756 self.functions = self._normalize_function_list(new_pattern)
757 elif callable(new_pattern):
758 # Single callable: treat as [(callable, {})]
759 self.pattern_data = [(new_pattern, {})]
760 self.functions = [(new_pattern, {})]
761 elif isinstance(new_pattern, tuple) and len(new_pattern) == 2 and callable(new_pattern[0]) and isinstance(new_pattern[1], dict):
762 # Single tuple (callable, kwargs): treat as [(callable, kwargs)]
763 self.pattern_data = [new_pattern]
764 self.functions = [new_pattern]
765 else:
766 raise ValueError(f"Expected list, callable, or (callable, dict) tuple pattern for list mode, got {type(new_pattern)}")
768 # Refresh the UI and notify of changes
769 self.refresh()
770 self._commit_and_notify()
772 except Exception as e:
773 self.app.show_error("Apply Error", f"Failed to apply edited pattern: {str(e)}")
def _generate_pattern_python_code(self) -> str:
    """Return Python source that represents the current pattern.

    ``pattern_data`` is refreshed through ``_update_pattern_data`` first, and
    the text is then produced by ``generate_complete_function_pattern_code``.

    Returns:
        The generated source text.
    """
    # Bring pattern_data up to date before rendering it.
    self._update_pattern_data()

    # The debug module's generator is shared so that collision resolution
    # stays consistent.
    from openhcs.debug.pickle_to_python import generate_complete_function_pattern_code as render_pattern

    return render_pattern(self.pattern_data)
786 def _get_component_button_text(self) -> str:
787 """Get text for the component selection button based on current group_by setting."""
788 if self.current_group_by is None or self.current_group_by == GroupBy.NONE:
789 return "Component: None"
791 # Use group_by.value.value.title() for dynamic component type display
792 # GroupBy.value is now a VariableComponents enum, so we need .value to get the string
793 component_type = self.current_group_by.value.value.title()
795 if self.is_dict_mode and self.selected_channel is not None:
796 # Try to get metadata name for the selected component
797 display_name = self._get_component_display_name(self.selected_channel)
798 return f"{component_type}: {display_name}"
799 return f"{component_type}: None"
801 def _get_component_display_name(self, component_key: str) -> str:
802 """Get display name for component key, using metadata if available."""
803 # Try to get metadata name from orchestrator
804 orchestrator = self._get_current_orchestrator()
805 if orchestrator and self.current_group_by:
806 try:
807 metadata_name = orchestrator.metadata_cache.get_component_metadata(self.current_group_by, component_key)
808 if metadata_name:
809 return metadata_name
810 except Exception as e:
811 logger.debug(f"Could not get metadata for {self.current_group_by.value} {component_key}: {e}")
813 # Fallback to component key
814 return component_key
def _refresh_component_button(self) -> None:
    """Re-sync the ``#component_btn`` button's label and disabled flag.

    Any exception raised while looking the button up or updating it is
    logged at debug level and otherwise ignored.
    """
    try:
        btn = self.query_one("#component_btn", Button)

        # Label first, then the enabled/disabled state.
        new_label = self._get_component_button_text()
        btn.label = new_label

        should_disable = self._is_component_button_disabled()
        btn.disabled = should_disable
    except Exception as e:
        logger.debug(f"Could not refresh component button: {e}")
825 def _is_component_button_disabled(self) -> bool:
826 """Check if component selection button should be disabled."""
827 return (
828 self.current_group_by is None or
829 self.current_group_by == GroupBy.NONE or
830 (self.current_variable_components and
831 self.current_group_by.value in [vc.value for vc in self.current_variable_components])
832 )
async def _show_component_selection_dialog(self) -> None:
    """Open the component selection window for the current group_by.

    Returns early (after logging) when selection is disabled, when no
    orchestrator is available, or when the orchestrator reports no component
    keys. Otherwise an existing ``GroupBySelectorWindow`` is reconfigured and
    opened, or a new one is mounted on the app. The window's result, when not
    ``None``, is forwarded to ``_update_components``.

    Any exception is logged at error level and not re-raised.
    """
    try:
        # Nothing to do while the button is meant to be disabled.
        if self._is_component_button_disabled():
            logger.debug("Component selection is disabled")
            return

        orchestrator = self._get_current_orchestrator()
        if orchestrator is None:
            logger.warning("No orchestrator available for component selection")
            return

        # Ask the orchestrator which component keys exist for this group_by.
        choices = orchestrator.get_component_keys(self.current_group_by)
        if not choices:
            logger.warning(f"No {self.current_group_by.value} values found in current plate")
            return

        # Pre-selection: cached choice for this group_by first, then the keys
        # of the current dict pattern, else nothing.
        if self.current_group_by in self.component_selections:
            preselected = self.component_selections[self.current_group_by]
            logger.debug(f"Step '{self.step_identifier}': Using cached selection for {self.current_group_by.value}: {preselected}")
        elif self.is_dict_mode and isinstance(self.pattern_data, dict):
            preselected = list(self.pattern_data.keys())
        else:
            preselected = []

        from openhcs.textual_tui.windows import GroupBySelectorWindow
        from textual.css.query import NoMatches

        def on_dialog_result(chosen):
            # None is passed through as "no change".
            if chosen is not None:
                self._update_components(chosen)

        try:
            # Reuse the window when one is already present in the app.
            dialog = self.app.query_one(GroupBySelectorWindow)
            dialog.available_channels = choices
            dialog.selected_channels = preselected
            dialog.component_type = self.current_group_by.value
            dialog.orchestrator = orchestrator
            dialog.on_result_callback = on_dialog_result
            dialog.open_state = True
        except NoMatches:
            # No window yet: build one, mount it, then open it.
            window_kwargs = {
                "available_channels": choices,
                "selected_channels": preselected,
                "on_result_callback": on_dialog_result,
                "component_type": self.current_group_by.value,
                "orchestrator": orchestrator,
            }
            dialog = GroupBySelectorWindow(**window_kwargs)
            await self.app.mount(dialog)
            dialog.open_state = True

    except Exception as e:
        component_type = self.current_group_by.value if self.current_group_by else "component"
        logger.error(f"Failed to show {component_type} selection dialog: {e}")
899 def _update_components(self, new_components: List[str]) -> None:
900 """
901 Update the pattern based on new component selection.
903 Uses string component keys directly to match the pattern detection system.
904 Pattern detection always returns string component values (e.g., '1', '2', '3') when
905 grouping by any component, so function patterns use string keys for consistency.
906 """
907 if not new_components:
908 # No components selected - revert to list mode
909 if self.is_dict_mode:
910 # Save current functions to list mode
911 self.pattern_data = self.functions
912 self.is_dict_mode = False
913 self.selected_channel = None
914 logger.debug("Reverted to list mode (no components selected)")
915 else:
916 # Use component strings directly - no conversion needed
917 component_keys = new_components
919 # Components selected - ensure dict mode
920 if not self.is_dict_mode:
921 # Convert to dict mode
922 current_functions = self.functions
923 self.pattern_data = {component_keys[0]: current_functions}
924 self.is_dict_mode = True
925 self.selected_channel = component_keys[0]
927 # Add other components with empty functions
928 for component_key in component_keys[1:]:
929 self.pattern_data[component_key] = []
930 else:
931 # Already in dict mode - update components
932 old_pattern = self.pattern_data.copy() if isinstance(self.pattern_data, dict) else {}
933 new_pattern = {}
935 # Keep existing functions for components that remain
936 for component_key in component_keys:
937 # Check both string and integer keys for backward compatibility
938 if component_key in old_pattern:
939 new_pattern[component_key] = old_pattern[component_key]
940 else:
941 # Try integer key for backward compatibility
942 try:
943 component_int = int(component_key)
944 if component_int in old_pattern:
945 new_pattern[component_key] = old_pattern[component_int]
946 else:
947 new_pattern[component_key] = []
948 except ValueError:
949 new_pattern[component_key] = []
951 self.pattern_data = new_pattern
953 # Update selected component if needed
954 if self.selected_channel not in component_keys:
955 self.selected_channel = component_keys[0] if component_keys else None
956 if self.selected_channel:
957 self.functions = new_pattern.get(self.selected_channel, [])
959 # Save selection to cache for current group_by
960 if self.current_group_by is not None and self.current_group_by != GroupBy.NONE:
961 self.component_selections[self.current_group_by] = new_components
962 logger.debug(f"Step '{self.step_identifier}': Cached selection for {self.current_group_by.value}: {new_components}")
964 self._commit_and_notify()
965 self._refresh_component_button()
966 logger.debug(f"Updated components: {new_components}")
968 def _navigate_channel(self, direction: int) -> None:
969 """Navigate to next/previous channel (with looping)."""
970 if not self.is_dict_mode or not isinstance(self.pattern_data, dict):
971 return
973 channels = sorted(self.pattern_data.keys())
974 if len(channels) <= 1:
975 return
977 try:
978 current_index = channels.index(self.selected_channel)
979 new_index = (current_index + direction) % len(channels)
980 new_channel = channels[new_index]
982 self._switch_to_channel(new_channel)
983 logger.debug(f"Navigated to channel {new_channel}")
984 except (ValueError, IndexError):
985 logger.warning(f"Failed to navigate channels: current={self.selected_channel}, channels={channels}")
989 def _get_current_orchestrator(self):
990 """Get the current orchestrator from the app."""
991 try:
992 # Get from app - PlateManagerWidget is now in PipelinePlateWindow
993 if hasattr(self.app, 'query_one'):
994 from openhcs.textual_tui.windows import PipelinePlateWindow
995 from openhcs.textual_tui.widgets.plate_manager import PlateManagerWidget
997 # Try to find the PipelinePlateWindow first
998 try:
999 pipeline_plate_window = self.app.query_one(PipelinePlateWindow)
1000 plate_manager = pipeline_plate_window.plate_widget
1001 except:
1002 # Fallback: try to find PlateManagerWidget directly in the app
1003 plate_manager = self.app.query_one(PlateManagerWidget)
1005 # Use selected_plate (not current_plate!)
1006 selected_plate = plate_manager.selected_plate
1007 if selected_plate and selected_plate in plate_manager.orchestrators:
1008 orchestrator = plate_manager.orchestrators[selected_plate]
1009 if not orchestrator.is_initialized():
1010 orchestrator.initialize()
1011 return orchestrator
1012 return None
1013 except Exception as e:
1014 logger.error(f"Failed to get orchestrator: {e}")
1015 return None