Coverage for openhcs/textual_tui/widgets/function_list_editor.py: 0.0%
658 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""Function list editor widget - port of function_list_manager.py to Textual."""
3import logging
4from pathlib import Path
5from typing import List, Union, Dict, Any, Optional, Callable # Added Optional, Callable
6from textual.containers import ScrollableContainer, Container, Horizontal, Center
7from textual.widgets import Button, Static, Select
8from textual.app import ComposeResult
9from textual.reactive import reactive
10from textual.message import Message # Added Message
12from openhcs.textual_tui.services.function_registry_service import FunctionRegistryService
13from openhcs.textual_tui.services.pattern_data_manager import PatternDataManager
14from openhcs.textual_tui.widgets.function_pane import FunctionPaneWidget
15from openhcs.constants.constants import GroupBy, VariableComponents
17logger = logging.getLogger(__name__)
20class FunctionListEditorWidget(Container):
21 """
22 Scrollable function list editor.
24 Ports the function display and management logic from function_list_manager.py
25 """
class FunctionPatternChanged(Message):
    """Posted to the parent whenever the edited function pattern changes."""
# Reactive properties for automatic UI updates.
# Attributes declared with recompose=True rebuild the widget tree (compose() runs
# again) whenever a new value is assigned to them.
functions = reactive(list, recompose=True) # Functions currently shown in the editor; reassignment (add/remove) recomposes
pattern_data = reactive(list, recompose=False) # The full pattern (List or Dict); kept in sync by _update_pattern_data
is_dict_mode = reactive(False, recompose=True) # True when pattern_data is a per-channel dict - affects UI layout
selected_channel = reactive(None, recompose=True) # Key of the channel being edited - affects button text
available_channels = reactive(list) # Available channels from orchestrator

# Step configuration reactive properties for dynamic component selection
current_group_by = reactive(None, recompose=True) # Current GroupBy setting from step editor
current_variable_components = reactive(list, recompose=True) # Current VariableComponents list from step editor
def __init__(
    self,
    initial_functions: Union[List, Dict, Callable, None] = None,
    step_identifier: Optional[str] = None,
) -> None:
    """Create the editor and seed it with an optional initial pattern.

    Args:
        initial_functions: ``None``, a single callable, a list of functions,
            or a dict mapping channel keys to function lists. The shape is
            interpreted by ``_initialize_pattern_data``.
        step_identifier: Label used in log messages and to tell widget
            instances apart. Falls back to ``widget_<id(self)>`` when omitted
            or empty.
    """
    super().__init__()

    # Initialize services (reuse existing business logic)
    self.registry_service = FunctionRegistryService()
    self.data_manager = PatternDataManager()  # Not heavily used yet, but available

    # `or` (not `is None`) so that an empty string also falls back to the default.
    self.step_identifier = step_identifier or f"widget_{id(self)}"

    # Component selection cache per GroupBy (instance-specific, not shared between steps)
    self.component_selections: Dict[GroupBy, List[str]] = {}

    # Populates pattern_data / is_dict_mode / selected_channel / functions.
    self._initialize_pattern_data(initial_functions)

    logger.debug(f"FunctionListEditorWidget initialized for step '{self.step_identifier}' with {len(self.functions)} functions, dict_mode={self.is_dict_mode}")
@property
def current_pattern(self) -> Union[List, Dict]:
    """Return the up-to-date pattern for parent widgets.

    ``pattern_data`` is first resynchronised from ``functions``. A dict
    pattern is returned as a new dict whose keys have been passed through
    ``str()`` (the pattern detection system uses string component values);
    any other pattern is returned as stored.
    """
    self._update_pattern_data()

    if not isinstance(self.pattern_data, dict):
        return self.pattern_data

    # Migration fix: integer channel keys become their string form.
    return {str(channel): funcs for channel, funcs in self.pattern_data.items()}
def watch_functions(self, new_functions: List) -> None:
    """Reactive watcher: resync ``pattern_data`` after ``functions`` is reassigned."""
    # The new value is not used directly; _update_pattern_data reads self.functions.
    self._update_pattern_data()
def _initialize_pattern_data(self, initial_functions: Union[List, Dict, Callable, None]) -> None:
    """Populate pattern state from the constructor argument and pick the mode.

    Sets ``pattern_data``, ``is_dict_mode``, ``functions`` and, for dict
    input, ``selected_channel``.

    Args:
        initial_functions: One of
            * ``None`` - empty list mode;
            * a callable - list mode with the single entry ``(callable, {})``;
            * a list - list mode, entries normalised to ``(callable, kwargs)``;
            * a dict - dict mode, keys converted with ``str()``, first key selected.
            Any other type is logged as a warning and treated as empty.
    """
    if initial_functions is None:
        self.pattern_data = []
        self.is_dict_mode = False
        self.functions = []
    elif callable(initial_functions):
        self.pattern_data = [(initial_functions, {})]
        self.is_dict_mode = False
        self.functions = [(initial_functions, {})]
    elif isinstance(initial_functions, list):
        self.pattern_data = initial_functions
        self.is_dict_mode = False
        self.functions = self._normalize_function_list(initial_functions)
    elif isinstance(initial_functions, dict):
        # Convert any integer keys to string keys for consistency
        normalized_dict = {}
        for key, value in initial_functions.items():
            str_key = str(key)
            normalized_dict[str_key] = value
            logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")

        self.pattern_data = normalized_dict
        self.is_dict_mode = True
        if normalized_dict:
            # The first key in iteration order becomes the channel being edited.
            first_channel = next(iter(normalized_dict))
            self.selected_channel = first_channel
            self.functions = self._normalize_function_list(normalized_dict.get(first_channel, []))
        else:
            self.selected_channel = None
            self.functions = []
    else:
        logger.warning(f"Unknown initial_functions type: {type(initial_functions)}, using empty list")
        self.pattern_data = []
        self.is_dict_mode = False
        self.functions = []
120 def _normalize_function_list(self, func_list: List[Any]) -> List[tuple[Callable, Dict]]:
121 """Ensures all items in a function list are (callable, kwargs) tuples."""
122 normalized = []
123 for item in func_list:
124 if isinstance(item, tuple) and len(item) == 2 and callable(item[0]) and isinstance(item[1], dict):
125 normalized.append(item)
126 elif callable(item):
127 normalized.append((item, {}))
128 else:
129 logger.warning(f"Skipping invalid item in function list: {item}")
130 return normalized
def _commit_and_notify(self) -> None:
    """Sync ``pattern_data`` and post ``FunctionPatternChanged`` to the parent.

    NOTE(review): this class defines ``_commit_and_notify`` a second time
    further down; the later definition is the one bound on the class, so this
    copy is effectively dead and should be removed. Until then it is kept
    behaviourally in step with the later one (which also resyncs
    ``pattern_data`` before notifying).
    """
    self._update_pattern_data()
    self.post_message(self.FunctionPatternChanged())
    logger.debug("Posted FunctionPatternChanged message to parent.")
def _trigger_recomposition(self) -> None:
    """Manually trigger recomposition when needed (e.g., loading patterns, adding/removing functions)."""
    # Force the reactive machinery to run for `functions` without assigning a
    # new list. This is needed after in-place edits such as
    # `self.functions[i] = ...`, which do not go through the reactive setter.
    # `functions` is declared with recompose=True, so this recomposes the widget.
    self.mutate_reactive(FunctionListEditorWidget.functions)
def watch_current_group_by(self, old_group_by: Optional[GroupBy], new_group_by: Optional[GroupBy]) -> None:
    """Reactive watcher: remember the channel keys used under the old group_by.

    When the previous setting was a real grouping (neither ``None`` nor
    ``GroupBy.NONE``) and the widget holds a dict pattern, the dict's keys
    are cached in ``component_selections`` under the old setting. Nothing is
    loaded for the new setting here; the comment in the original notes that
    the selection dialog reads the cache when it is opened.
    """
    had_real_grouping = old_group_by is not None and old_group_by != GroupBy.NONE
    if had_real_grouping and self.is_dict_mode and isinstance(self.pattern_data, dict):
        channel_keys = list(self.pattern_data.keys())
        self.component_selections[old_group_by] = channel_keys
        logger.debug(f"Step '{self.step_identifier}': Saved selection for {old_group_by.value}: {channel_keys}")

    logger.debug(f"Group by changed from {old_group_by} to {new_group_by}")
156 def _update_pattern_data(self) -> None:
157 """Update pattern_data based on current functions and mode."""
158 if self.is_dict_mode and self.selected_channel is not None:
159 # Save current functions to the selected channel
160 if not isinstance(self.pattern_data, dict):
161 self.pattern_data = {}
162 logger.debug(f"Saving {len(self.functions)} functions to channel {self.selected_channel}")
163 self.pattern_data[self.selected_channel] = self.functions.copy() # Make a copy to avoid reference issues
164 else:
165 # List mode - pattern_data is just the functions list
166 self.pattern_data = self.functions
def _switch_to_channel(self, channel: Any) -> None:
    """Switch to editing the functions of ``channel`` (dict mode only).

    The functions currently being edited are saved to the old channel first.
    The new channel's entries are passed through ``_normalize_function_list``
    so that ``functions`` always holds ``(callable, kwargs)`` tuples, matching
    what initialisation and file loading do for the first channel. A channel
    missing from the pattern yields an empty list.

    Args:
        channel: Key of the channel to edit. Does nothing in list mode.
    """
    if not self.is_dict_mode:
        return

    # Save current functions first
    old_channel = self.selected_channel
    logger.debug(f"Switching from channel {old_channel} to {channel}")
    logger.debug(f"Current functions before save: {len(self.functions)} functions")

    self._update_pattern_data()

    # Verify the save worked. `is not None` (rather than truthiness) matches the
    # check in _update_pattern_data, so a falsy key such as "" is still reported.
    if old_channel is not None and isinstance(self.pattern_data, dict):
        saved_functions = self.pattern_data.get(old_channel, [])
        logger.debug(f"Saved {len(saved_functions)} functions to channel {old_channel}")

    # Switch to new channel
    self.selected_channel = channel
    if isinstance(self.pattern_data, dict):
        # Normalising also yields a fresh list, so `functions` never aliases
        # the list stored inside pattern_data.
        self.functions = self._normalize_function_list(self.pattern_data.get(channel, []))
        logger.debug(f"Loaded {len(self.functions)} functions for channel {channel}")
    else:
        self.functions = []

    # Update button text to show new channel
    self._refresh_component_button()

    # Channel switch will automatically trigger recomposition via reactive system
def _add_channel_to_pattern(self, channel: Any) -> None:
    """Add ``channel`` to the pattern and select it.

    In list mode the widget converts to dict mode, with the current
    ``functions`` becoming the content of the new channel. In dict mode the
    channel is stored with an empty function list and the editor is cleared.
    """
    if self.is_dict_mode:
        if not isinstance(self.pattern_data, dict):
            self.pattern_data = {}
        # New channel starts empty.
        self.pattern_data[channel] = []
        self.selected_channel = channel
        self.functions = []
        return

    # Convert to dict mode: the existing functions move under the new key.
    self.pattern_data = {channel: self.functions}
    self.is_dict_mode = True
    self.selected_channel = channel
def _remove_current_channel(self) -> None:
    """Remove the currently selected channel from a dict pattern.

    Does nothing in list mode, when no channel is selected, or when
    ``pattern_data`` is not a dict. If the last channel is removed the widget
    reverts to an empty list pattern; otherwise the first remaining channel
    is selected and its entries are normalised to ``(callable, kwargs)``
    tuples, as is done when a pattern is first loaded.
    """
    if not self.is_dict_mode or self.selected_channel is None:
        return
    if not isinstance(self.pattern_data, dict):
        return

    # Work on a copy so pattern_data is replaced by assignment, not mutated.
    remaining = self.pattern_data.copy()
    remaining.pop(self.selected_channel, None)

    if not remaining:
        # Revert to list mode
        self.pattern_data = []
        self.is_dict_mode = False
        self.selected_channel = None
        self.functions = []
        return

    # Switch to first remaining channel
    self.pattern_data = remaining
    next_channel = next(iter(remaining))
    self.selected_channel = next_channel
    self.functions = self._normalize_function_list(remaining[next_channel])
def compose(self) -> ComposeResult:
    """Compose the function list using the common interface pattern.

    Yields a title, a single-row button header and a scrollable list with one
    FunctionPaneWidget per entry of ``self.functions`` (or a placeholder
    message when the list is empty). The widgets are yielded in display order.
    """
    from textual.containers import Vertical

    with Vertical():
        # Fixed header with title
        yield Static("[bold]Functions[/bold]")

        # Button row - takes minimal height needed for buttons
        with Horizontal(id="function_list_header") as button_row:
            button_row.styles.height = "auto" # CRITICAL: Take only needed height

            # Empty space (flex-grows)
            yield Static("")

            # Centered main button group
            with Horizontal() as main_button_group:
                main_button_group.styles.width = "auto"
                # Button ids are matched by on_button_pressed.
                yield Button("Add", id="add_function_btn", compact=True)
                yield Button("Load", id="load_func_btn", compact=True)
                yield Button("Save As", id="save_func_as_btn", compact=True)
                yield Button("Code", id="edit_vim_btn", compact=True)

                # Component selection button (dynamic based on group_by setting)
                component_text = self._get_component_button_text()
                component_button = Button(component_text, id="component_btn", compact=True)
                component_button.disabled = self._is_component_button_disabled()
                yield component_button

                # Channel navigation buttons (only in dict mode with multiple channels)
                if self.is_dict_mode and isinstance(self.pattern_data, dict) and len(self.pattern_data) > 1:
                    yield Button("<", id="prev_channel_btn", compact=True)
                    yield Button(">", id="next_channel_btn", compact=True)

            # Empty space (flex-grows)
            yield Static("")

        # Scrollable content area - expands to fill ALL remaining vertical space
        with ScrollableContainer(id="function_list_content") as container:
            container.styles.height = "1fr" # CRITICAL: Fill remaining space

            if not self.functions:
                # NOTE(review): the message refers to an 'Add Function' button, but the
                # button above is labelled "Add" - confirm which wording is intended.
                content = Static("No functions defined. Click 'Add Function' to begin.")
                content.styles.width = "100%"
                content.styles.height = "100%"
                yield content
            else:
                # Each pane is given its (callable, kwargs) item and its list index.
                for i, func_item in enumerate(self.functions):
                    pane = FunctionPaneWidget(func_item, i)
                    pane.styles.width = "100%"
                    pane.styles.height = "auto" # CRITICAL: Only take height needed for content
                    yield pane
async def on_button_pressed(self, event: Button.Pressed) -> None:
    """Dispatch a header button press to the matching action by button id.

    Add, load, save and component selection are awaited; the code editor and
    channel navigation are plain calls. An unrecognised id is logged as a
    warning.
    """
    button_id = event.button.id
    logger.debug(f"🔍 BUTTON: Button pressed: {button_id}")

    if button_id == "add_function_btn":
        logger.debug("🔍 BUTTON: Add function button pressed")
        await self._add_function()
        return
    if button_id == "load_func_btn":
        logger.debug("🔍 BUTTON: Load func button pressed - calling _load_func()")
        await self._load_func()
        return
    if button_id == "save_func_as_btn":
        logger.debug("🔍 BUTTON: Save func as button pressed")
        await self._save_func_as()
        return
    if button_id == "edit_vim_btn":
        logger.debug("🔍 BUTTON: Edit vim button pressed")
        self._edit_in_vim()
        return
    if button_id == "component_btn":
        logger.debug("🔍 BUTTON: Component button pressed")
        await self._show_component_selection_dialog()
        return
    if button_id == "prev_channel_btn":
        logger.debug("🔍 BUTTON: Previous channel button pressed")
        self._navigate_channel(-1)
        return
    if button_id == "next_channel_btn":
        logger.debug("🔍 BUTTON: Next channel button pressed")
        self._navigate_channel(1)
        return

    logger.warning(f"🔍 BUTTON: Unknown button pressed: {button_id}")
def on_function_pane_widget_parameter_changed(self, event: Message) -> None:
    """Handle parameter change message from FunctionPaneWidget.

    The event must carry ``index``, ``param_name`` and ``value``; events
    missing any of them, or with an out-of-range index, are ignored. String
    values are converted according to the parameter type reported by
    SignatureAnalyzer (float, int, bool, direct Enum subclasses); an empty
    string becomes the parameter's default. If conversion fails the raw
    value is stored instead. The entry is updated in place, pattern data is
    resynced and FunctionPatternChanged is posted.
    """
    if hasattr(event, 'index') and hasattr(event, 'param_name') and hasattr(event, 'value'):
        if 0 <= event.index < len(self.functions):
            # Update function kwargs with proper type conversion
            func, kwargs = self.functions[event.index]
            # Copy so the kwargs dict of the existing tuple is not mutated.
            new_kwargs = kwargs.copy()

            # Convert value to proper type based on function signature
            converted_value = event.value
            try:
                from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer
                from enum import Enum

                # Used below as a mapping from parameter name to an object with
                # param_type / is_required / default_value attributes.
                param_info = SignatureAnalyzer.analyze(func)
                if event.param_name in param_info:
                    param_details = param_info[event.param_name]
                    param_type = param_details.param_type
                    is_required = param_details.is_required  # read but not used below
                    default_value = param_details.default_value

                    # Handle empty strings - always convert to None or default value
                    if isinstance(event.value, str) and event.value.strip() == "":
                        # Empty parameter - use default value (None for required params)
                        converted_value = default_value
                    elif isinstance(event.value, str) and event.value.strip() != "":
                        # Non-empty string - convert to proper type
                        if param_type == float:
                            converted_value = float(event.value)
                        elif param_type == int:
                            converted_value = int(event.value)
                        elif param_type == bool:
                            # Anything outside this set of spellings is treated as False.
                            converted_value = event.value.lower() in ('true', '1', 'yes', 'on')
                        elif hasattr(param_type, '__bases__') and Enum in param_type.__bases__:
                            # Only classes listing Enum as a direct base match; lookup is by value.
                            converted_value = param_type(event.value)
            except (ValueError, TypeError, AttributeError) as e:
                logger.warning(f"Failed to convert parameter '{event.param_name}' value '{event.value}': {e}")
                converted_value = event.value # Keep original value on conversion failure

            new_kwargs[event.param_name] = converted_value

            # Update functions list WITHOUT triggering recomposition (to prevent focus loss)
            # We modify the underlying list directly instead of assigning a new list
            self.functions[event.index] = (func, new_kwargs)

            # Manually update pattern data (the in-place edit does not run watch_functions)
            self._update_pattern_data()

            # Notify parent
            self.post_message(self.FunctionPatternChanged())
            logger.debug(f"Updated parameter {event.param_name}={converted_value} (type: {type(converted_value)}) for function {event.index}")
async def on_function_pane_widget_change_function(self, event: Message) -> None:
    """Open the function selector for the pane that requested a change.

    Events without an ``index`` attribute, or with an index outside the
    current list, are ignored.
    """
    if not hasattr(event, 'index'):
        return
    if 0 <= event.index < len(self.functions):
        await self._change_function(event.index)
def on_function_pane_widget_remove_function(self, event: Message) -> None:
    """Remove the function at ``event.index`` and notify the parent.

    A missing or out-of-range index is logged as a warning and nothing is
    removed. A new list is assigned to ``functions`` so the reactive system
    sees the change.
    """
    index_is_valid = hasattr(event, 'index') and 0 <= event.index < len(self.functions)
    if not index_is_valid:
        logger.warning(f"Invalid index for remove function: {getattr(event, 'index', 'N/A')}")
        return

    remaining = self.functions.copy()
    del remaining[event.index]
    self.functions = remaining
    self._commit_and_notify()
    logger.debug(f"Removed function at index {event.index}")
async def on_function_pane_widget_add_function(self, event: Message) -> None:
    """Open the function selector to insert a function at the requested position.

    ``event.insert_index`` is clamped to ``[0, len(self.functions)]`` before
    use. Without the lower bound a negative index would be passed on to
    ``list.insert``, which counts from the end of the list rather than
    inserting at the front. Events without ``insert_index`` are logged and
    ignored.
    """
    if not hasattr(event, 'insert_index'):
        logger.warning("Invalid add function event: missing insert_index")
        return

    insert_index = max(0, min(event.insert_index, len(self.functions)))  # Clamp to valid range
    await self._add_function_at_index(insert_index)
def on_function_pane_widget_move_function(self, event: Message) -> None:
    """Swap the function at ``event.index`` with its neighbour.

    ``event.direction`` is added to the index to find the swap partner. The
    request is silently ignored when either attribute is missing or when the
    source or target position falls outside the list.
    """
    if not hasattr(event, 'index') or not hasattr(event, 'direction'):
        return

    source = event.index
    if not 0 <= source < len(self.functions):
        return

    target = source + event.direction
    if not 0 <= target < len(self.functions):
        return

    # Assign a new list so the reactive system registers the reorder.
    reordered = self.functions.copy()
    reordered[source], reordered[target] = reordered[target], reordered[source]
    self.functions = reordered
    self._commit_and_notify()
    logger.debug(f"Moved function from index {source} to {target}")
async def _add_function(self) -> None:
    """Open the function selector to append a function after the last entry."""
    end_position = len(self.functions)
    await self._add_function_at_index(end_position)
async def _add_function_at_index(self, insert_index: int) -> None:
    """Open the function selector window and insert the chosen function.

    The selection arrives through a callback; a falsy selection leaves the
    list untouched. An existing FunctionSelectorWindow on the app is reused,
    otherwise a new one is created and mounted.

    Args:
        insert_index: Position in ``functions`` for the new ``(func, {})`` entry.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    def handle_function_selection(selected_function: Optional[Callable]) -> None:
        if not selected_function:
            return
        # Build a new list so the reactive assignment is seen as a change.
        updated = self.functions.copy()
        updated.insert(insert_index, (selected_function, {}))
        self.functions = updated
        self._commit_and_notify()
        logger.debug(f"Added function: {selected_function.__name__} at index {insert_index}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        selector = self.app.query_one(FunctionSelectorWindow)
        # Reuse the existing window: point it at this request and show it.
        selector.on_result_callback = handle_function_selection
        selector.open_state = True
    except NoMatches:
        # Expected case: no selector window exists yet.
        selector = FunctionSelectorWindow(on_result_callback=handle_function_selection)
        await self.app.mount(selector)
        selector.open_state = True
async def _change_function(self, index: int) -> None:
    """Open the function selector to replace the function at ``index``.

    The current function is handed to the selector window. When a
    replacement is chosen, the entry becomes ``(new_function, {})`` - the
    previous kwargs are discarded. An out-of-range index does nothing.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    if not (0 <= index < len(self.functions)):
        return

    current_func, _ = self.functions[index]

    def handle_function_selection(selected_function: Optional[Callable]) -> None:
        if not selected_function:
            return
        # The replacement starts with empty kwargs.
        updated = self.functions.copy()
        updated[index] = (selected_function, {})
        self.functions = updated
        self._commit_and_notify()
        logger.debug(f"Changed function at index {index} to: {selected_function.__name__}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        selector = self.app.query_one(FunctionSelectorWindow)
        # Reuse the existing window: update its state and show it.
        selector.current_function = current_func
        selector.on_result_callback = handle_function_selection
        selector.open_state = True
    except NoMatches:
        # Expected case: no selector window exists yet.
        selector = FunctionSelectorWindow(current_function=current_func, on_result_callback=handle_function_selection)
        await self.app.mount(selector)
        selector.open_state = True
def _commit_and_notify(self) -> None:
    """Resync ``pattern_data`` from ``functions``, then tell the parent it changed."""
    # pattern_data must be current before the parent reacts to the message.
    self._update_pattern_data()
    change_message = self.FunctionPatternChanged()
    self.post_message(change_message)
async def _load_func(self) -> None:
    """Open a file browser and load the selected .func pattern.

    The browser reports its result through a callback, which accepts either
    a single Path or a list of Paths. Only the first entry of a list is
    used. Anything that does not resolve to a Path is logged and ignored.
    """
    logger.debug("🔍 LOAD FUNC: _load_func() called - starting file browser...")

    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        logger.debug(f"🔍 LOAD FUNC: File browser callback received result: {result} (type: {type(result)})")

        # Reduce the result to at most one candidate (multi-loading is disabled).
        chosen = None
        if isinstance(result, Path):
            chosen = result
            logger.debug(f"🔍 LOAD FUNC: Single Path received: {chosen}")
        elif isinstance(result, list) and result:
            chosen = result[0]
            if len(result) > 1:
                logger.warning(f"🔍 LOAD FUNC: Multiple files selected, using only first: {chosen}")
            else:
                logger.debug(f"🔍 LOAD FUNC: List with single Path received: {chosen}")

        if not (chosen and isinstance(chosen, Path)):
            logger.warning(f"🔍 LOAD FUNC: No valid Path found in result: {result}")
            return

        logger.debug(f"🔍 LOAD FUNC: Valid Path extracted, calling _load_pattern_from_file({chosen})")
        self._load_pattern_from_file(chosen)

    # Use window-based file browser
    logger.debug("🔍 LOAD FUNC: Opening file browser window...")
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    browser_options = dict(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Load Function Pattern (.func)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result,
    )
    await open_file_browser_window(**browser_options)
    logger.debug("🔍 LOAD FUNC: File browser window opened, waiting for user selection...")
async def _save_func_as(self) -> None:
    """Open a file browser in save mode and write the pattern to the chosen path.

    The callback saves only when the browser returns a truthy Path; any
    other result is ignored.
    """
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        if not result or not isinstance(result, Path):
            return
        self._save_pattern_to_file(result)

    # Use window-based file browser
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    browser_options = dict(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Save Function Pattern (.func)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        default_filename="pattern.func",
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result,
    )
    await open_file_browser_window(**browser_options)
def _load_pattern_from_file(self, file_path: Path) -> None:
    """Load a pattern from a .func file and make it the edited pattern.

    The file is unpickled with dill and interpreted the same way as the
    constructor argument: ``None`` -> empty list, callable -> single entry,
    list -> list mode, dict -> dict mode with string keys and the first
    channel selected. Failures are logged (with traceback) and leave the
    widget unchanged up to the point of failure; nothing is raised.

    SECURITY: unpickling executes code embedded in the file. Only load
    .func files from sources you trust.

    Args:
        file_path: Path of the .func file to read.
    """
    logger.debug(f"🔍 LOAD FUNC: _load_pattern_from_file called with: {file_path}")
    logger.debug(f"🔍 LOAD FUNC: File exists: {file_path.exists()}")
    logger.debug(f"🔍 LOAD FUNC: File size: {file_path.stat().st_size if file_path.exists() else 'N/A'} bytes")

    import dill as pickle
    try:
        logger.debug("🔍 LOAD FUNC: Opening file for reading...")
        with open(file_path, 'rb') as f:
            pattern = pickle.load(f)
        logger.debug(f"🔍 LOAD FUNC: Successfully loaded pattern from pickle: {pattern}")
        logger.debug(f"🔍 LOAD FUNC: Pattern type: {type(pattern)}")

        # Log current state before loading
        logger.debug(f"🔍 LOAD FUNC: BEFORE - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - selected_channel: {self.selected_channel}")

        # Process pattern and create NEW objects (like add button does)
        # This is crucial - reactive system only triggers on new object assignment
        if pattern is None:
            new_pattern_data = []
            new_is_dict_mode = False
            new_functions = []
            new_selected_channel = None
        elif callable(pattern):
            new_pattern_data = [(pattern, {})]
            new_is_dict_mode = False
            new_functions = [(pattern, {})]
            new_selected_channel = None
        elif isinstance(pattern, list):
            new_pattern_data = pattern
            new_is_dict_mode = False
            new_functions = self._normalize_function_list(pattern)
            new_selected_channel = None
        elif isinstance(pattern, dict):
            # Convert any integer keys to string keys for consistency
            normalized_dict = {}
            for key, value in pattern.items():
                str_key = str(key)
                normalized_dict[str_key] = value
                logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")

            new_pattern_data = normalized_dict
            new_is_dict_mode = True
            # Set first channel as selected, or None if no channels
            if normalized_dict:
                first_channel = next(iter(normalized_dict))
                new_selected_channel = first_channel
                new_functions = self._normalize_function_list(normalized_dict.get(first_channel, []))
            else:
                new_selected_channel = None
                new_functions = []
        else:
            logger.warning(f"Unknown pattern type: {type(pattern)}, using empty list")
            new_pattern_data = []
            new_is_dict_mode = False
            new_functions = []
            new_selected_channel = None

        # Assign NEW objects to reactive properties (like add button does)
        logger.debug("🔍 LOAD FUNC: Assigning new values to reactive properties...")
        logger.debug(f"🔍 LOAD FUNC: new_pattern_data: {new_pattern_data}")
        logger.debug(f"🔍 LOAD FUNC: new_is_dict_mode: {new_is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: new_functions: {len(new_functions)} items: {new_functions}")
        logger.debug(f"🔍 LOAD FUNC: new_selected_channel: {new_selected_channel}")

        self.pattern_data = new_pattern_data
        logger.debug("🔍 LOAD FUNC: ✅ Assigned pattern_data")

        self.is_dict_mode = new_is_dict_mode
        logger.debug("🔍 LOAD FUNC: ✅ Assigned is_dict_mode")

        # selected_channel MUST be set before functions: assigning functions runs
        # watch_functions -> _update_pattern_data, which files the list under
        # self.selected_channel. With the previous channel still selected the
        # loaded dict would gain a stale key, and with no channel selected it
        # would be replaced by the bare functions list (losing other channels).
        self.selected_channel = new_selected_channel
        logger.debug("🔍 LOAD FUNC: ✅ Assigned selected_channel")

        self.functions = new_functions  # This triggers reactive system!
        logger.debug("🔍 LOAD FUNC: ✅ Assigned functions - THIS SHOULD TRIGGER REACTIVE SYSTEM!")

        logger.debug("🔍 LOAD FUNC: Calling _commit_and_notify...")
        self._commit_and_notify()

        # Final state logging
        logger.debug(f"🔍 LOAD FUNC: FINAL - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: FINAL - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: FINAL - selected_channel: {self.selected_channel}")
        logger.debug("🔍 LOAD FUNC: ✅ Successfully completed loading process!")

    except Exception as e:
        # Best-effort load: report the failure but never crash the UI callback.
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Failed to load pattern: {e}")
        import traceback
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Traceback: {traceback.format_exc()}")
def _save_pattern_to_file(self, file_path: Path) -> None:
    """Serialize the current pattern to a .func file.

    dill is used instead of the standard-library pickle so that saving is
    symmetric with ``_load_pattern_from_file`` (which reads with dill) and
    can handle callables the plain pickler rejects, such as lambdas.
    Failures are logged and not raised.

    Args:
        file_path: Destination path; an existing file is overwritten.
    """
    import dill as pickle
    try:
        with open(file_path, 'wb') as f:
            pickle.dump(self.current_pattern, f)
    except Exception as e:
        # Best-effort save from a UI callback: report but do not propagate.
        logger.error(f"Failed to save pattern: {e}")
def _edit_in_vim(self) -> None:
    """Edit function pattern as Python code in terminal window.

    The current pattern is rendered to Python source and handed to a
    TerminalLauncher; the edited text comes back through
    ``_handle_edited_pattern``. Any failure while preparing or scheduling the
    editor is logged and shown to the user via the app's error dialog.
    """
    logger.debug("Edit button pressed - opening terminal editor")

    try:
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Update pattern data first so the generated code reflects the latest edits
        self._update_pattern_data()

        # Generate complete Python code with imports (like debug module does)
        python_code = self._generate_complete_python_code()

        # Create terminal launcher
        launcher = TerminalLauncher(self.app)

        # Schedule the editor launch on the app, passing the code, the file
        # suffix and the callback that receives the edited text.
        self.app.call_later(
            launcher.launch_editor_for_file,
            python_code,
            '.py',
            self._handle_edited_pattern
        )

    except Exception as e:
        logger.error(f"Failed to launch terminal editor: {e}")
        self.app.show_error("Edit Error", f"Failed to launch terminal editor: {str(e)}")
def _handle_edited_pattern(self, edited_code: str) -> None:
    """Run the code returned by the terminal editor and apply its ``pattern``.

    The text is executed in a fresh namespace and the value bound to the name
    ``pattern`` is passed to ``_apply_edited_pattern``. A missing assignment,
    a syntax error or any other exception is reported through the app's error
    dialog instead of being raised.

    NOTE(review): ``exec`` runs whatever the editor returned with full
    privileges - acceptable only because the text is edited locally by the
    user.
    """
    try:
        scope = {}
        exec(edited_code, scope)

        if 'pattern' not in scope:
            self.app.show_error("Parse Error", "No 'pattern = ...' assignment found in edited code")
            return

        self._apply_edited_pattern(scope['pattern'])

    except SyntaxError as e:
        self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
    except Exception as e:
        logger.error(f"Failed to parse edited pattern: {e}")
        self.app.show_error("Edit Error", f"Failed to parse edited pattern: {str(e)}")
def _generate_complete_python_code(self) -> str:
    """Render ``pattern_data`` as Python source via the debug module's generator."""
    from openhcs.debug.pickle_to_python import generate_complete_function_pattern_code as render_pattern

    return render_pattern(self.pattern_data, clean_mode=False)
def _apply_edited_pattern(self, new_pattern):
    """Install a pattern produced by the code editor.

    The pattern must match the current mode: a list in list mode, a dict in
    dict mode; a mismatch is reported as an error. In dict mode the selected
    channel is kept if it still exists, otherwise the first channel of the
    new pattern is selected (or the editor is emptied when the dict has no
    channels). Any exception is shown through the app's error dialog.

    NOTE(review): unlike initialisation and file loading, dict keys are not
    converted to strings here - confirm whether that is intended.
    """
    try:
        if not self.is_dict_mode:
            if not isinstance(new_pattern, list):
                raise ValueError("Expected list pattern for list mode")
            self.pattern_data = new_pattern
            self.functions = self._normalize_function_list(new_pattern)
        elif not isinstance(new_pattern, dict):
            raise ValueError("Expected dict pattern for dict mode")
        else:
            self.pattern_data = new_pattern
            if self.selected_channel and self.selected_channel in new_pattern:
                # Current channel survived the edit: reload its functions.
                self.functions = self._normalize_function_list(new_pattern[self.selected_channel])
            elif new_pattern:
                # Fall back to the first channel of the edited pattern.
                self.selected_channel = next(iter(new_pattern))
                self.functions = self._normalize_function_list(new_pattern[self.selected_channel])
            else:
                self.functions = []

        # Refresh the UI and notify of changes
        self.refresh()
        self._commit_and_notify()

    except Exception as e:
        self.app.show_error("Apply Error", f"Failed to apply edited pattern: {str(e)}")
def _generate_pattern_python_code(self) -> str:
    """Build Python source that assigns the current pattern to ``pattern``.

    The text consists of a two-line comment header, the sorted import lines
    collected from the pattern (if any), and the ``pattern = ...`` assignment,
    joined with newlines.
    """
    # Make sure pattern_data reflects the functions currently being edited.
    self._update_pattern_data()

    needed_imports = set()
    self._collect_function_imports(self.pattern_data, needed_imports)

    header = [
        "# Edit this function pattern and save to apply changes",
        "# The pattern variable will be parsed and applied to the TUI",
        "",
    ]
    # Imports are followed by a blank separator line; omitted entirely when empty.
    import_section = [*sorted(needed_imports), ""] if needed_imports else []
    assignment = [
        "# Function pattern:",
        f"pattern = {self._format_pattern_for_code(self.pattern_data)}",
    ]

    return "\n".join(header + import_section + assignment)
780 def _collect_function_imports(self, pattern, imports):
781 """Recursively collect import statements for all functions in pattern."""
782 if callable(pattern):
783 if hasattr(pattern, '__module__') and hasattr(pattern, '__name__'):
784 imports.add(f"from {pattern.__module__} import {pattern.__name__}")
785 elif isinstance(pattern, tuple) and len(pattern) == 2:
786 func, _ = pattern
787 self._collect_function_imports(func, imports)
788 elif isinstance(pattern, list):
789 for item in pattern:
790 self._collect_function_imports(item, imports)
791 elif isinstance(pattern, dict):
792 for value in pattern.values():
793 self._collect_function_imports(value, imports)
795 def _format_pattern_for_code(self, pattern, indent=0) -> str:
796 """Format pattern as readable Python code."""
797 indent_str = " " * indent
799 if callable(pattern):
800 return pattern.__name__
801 elif isinstance(pattern, tuple) and len(pattern) == 2:
802 func, kwargs = pattern
803 func_name = func.__name__ if callable(func) else str(func)
804 kwargs_str = repr(kwargs)
805 return f"({func_name}, {kwargs_str})"
806 elif isinstance(pattern, list):
807 if not pattern:
808 return "[]"
809 items = []
810 for item in pattern:
811 item_str = self._format_pattern_for_code(item, indent + 1)
812 items.append(f"{indent_str} {item_str}")
813 return "[\n" + ",\n".join(items) + f"\n{indent_str}]"
814 elif isinstance(pattern, dict):
815 if not pattern:
816 return "{}"
817 items = []
818 for key, value in pattern.items():
819 value_str = self._format_pattern_for_code(value, indent + 1)
820 items.append(f"{indent_str} {repr(key)}: {value_str}")
821 return "{\n" + ",\n".join(items) + f"\n{indent_str}" + "}"
822 else:
823 return repr(pattern)
825 def _get_component_button_text(self) -> str:
826 """Get text for the component selection button based on current group_by setting."""
827 if self.current_group_by is None or self.current_group_by == GroupBy.NONE:
828 return "Component: None"
830 # Use group_by.value.title() for dynamic component type display
831 component_type = self.current_group_by.value.title()
833 if self.is_dict_mode and self.selected_channel is not None:
834 # Try to get metadata name for the selected component
835 display_name = self._get_component_display_name(self.selected_channel)
836 return f"{component_type}: {display_name}"
837 return f"{component_type}: None"
839 def _get_component_display_name(self, component_key: str) -> str:
840 """Get display name for component key, using metadata if available."""
841 # Try to get metadata name from orchestrator
842 orchestrator = self._get_current_orchestrator()
843 if orchestrator and self.current_group_by:
844 try:
845 metadata_name = orchestrator.get_component_metadata(self.current_group_by, component_key)
846 if metadata_name:
847 return metadata_name
848 except Exception as e:
849 logger.debug(f"Could not get metadata for {self.current_group_by.value} {component_key}: {e}")
851 # Fallback to component key
852 return component_key
def _refresh_component_button(self) -> None:
    """Update the label and disabled state of the ``#component_btn`` button.

    Any failure (for example the button not being found) is logged at debug
    level and otherwise ignored.
    """
    try:
        button = self.query_one("#component_btn", Button)
        # Label first, then the disabled flag - same order as before.
        button.label = self._get_component_button_text()
        button.disabled = self._is_component_button_disabled()
    except Exception as exc:
        logger.debug(f"Could not refresh component button: {exc}")
863 def _is_component_button_disabled(self) -> bool:
864 """Check if component selection button should be disabled."""
865 return (
866 self.current_group_by is None or
867 self.current_group_by == GroupBy.NONE or
868 (self.current_variable_components and
869 self.current_group_by.value in [vc.value for vc in self.current_variable_components])
870 )
async def _show_component_selection_dialog(self) -> None:
    """Open the component selector window for the current group_by.

    Does nothing (beyond logging) when selection is disabled, when no
    orchestrator is available, or when the orchestrator reports no
    component keys. The initial selection comes from the per-group_by cache
    when present, otherwise from the keys of the current dict pattern,
    otherwise it is empty. An existing ``GroupBySelectorWindow`` is reused;
    if none is mounted a new one is created and mounted. Any exception is
    logged and swallowed.
    """
    try:
        if self._is_component_button_disabled():
            logger.debug("Component selection is disabled")
            return

        orchestrator = self._get_current_orchestrator()
        if orchestrator is None:
            logger.warning("No orchestrator available for component selection")
            return

        choices = orchestrator.get_component_keys(self.current_group_by)
        if not choices:
            component_type = self.current_group_by.value
            logger.warning(f"No {component_type} values found in current plate")
            return

        # Work out what should be pre-selected in the window.
        if self.current_group_by in self.component_selections:
            preselected = self.component_selections[self.current_group_by]
            logger.debug(f"Step '{self.step_identifier}': Using cached selection for {self.current_group_by.value}: {preselected}")
        elif self.is_dict_mode and isinstance(self.pattern_data, dict):
            preselected = list(self.pattern_data.keys())
        else:
            preselected = []

        from openhcs.textual_tui.windows import GroupBySelectorWindow
        from textual.css.query import NoMatches

        def handle_selection(result_components):
            # None means the window returned no selection to apply.
            if result_components is None:
                return
            self._update_components(result_components)

        try:
            window = self.app.query_one(GroupBySelectorWindow)
            # A window is already mounted: update it in place and reopen.
            window.available_channels = choices
            window.selected_channels = preselected
            window.component_type = self.current_group_by.value
            window.orchestrator = orchestrator
            window.on_result_callback = handle_selection
            window.open_state = True
        except NoMatches:
            # No window yet - create one, mount it, then open it.
            window = GroupBySelectorWindow(
                available_channels=choices,
                selected_channels=preselected,
                on_result_callback=handle_selection,
                component_type=self.current_group_by.value,
                orchestrator=orchestrator,
            )
            await self.app.mount(window)
            window.open_state = True

    except Exception as e:
        component_type = self.current_group_by.value if self.current_group_by else "component"
        logger.error(f"Failed to show {component_type} selection dialog: {e}")
def _update_components(self, new_components: List[str]) -> None:
    """Rebuild the pattern to match a new component selection.

    Component keys are used as given (strings), matching the string keys
    produced by pattern detection.

    * Empty selection: a dict-mode widget reverts to list mode, keeping the
      currently shown functions as the pattern.
    * Non-empty selection in list mode: switches to dict mode, assigning
      the current functions to the first component and empty lists to the
      rest.
    * Non-empty selection in dict mode: keeps the functions of components
      that remain (also looking them up under an integer key for backward
      compatibility) and gives new components an empty list.

    Afterwards the selection is cached for the current group_by (unless it
    is unset or ``GroupBy.NONE``), the change is committed and the
    component button is refreshed.
    """
    if new_components:
        keys = new_components

        if self.is_dict_mode:
            previous = self.pattern_data.copy() if isinstance(self.pattern_data, dict) else {}
            rebuilt = {}

            for key in keys:
                if key in previous:
                    rebuilt[key] = previous[key]
                    continue
                # Older patterns may have used integer keys.
                try:
                    legacy_key = int(key)
                except ValueError:
                    rebuilt[key] = []
                    continue
                rebuilt[key] = previous[legacy_key] if legacy_key in previous else []

            self.pattern_data = rebuilt

            # NOTE(review): nesting of the functions refresh under the
            # "selection no longer valid" check was reconstructed from a
            # listing with collapsed indentation - confirm.
            if self.selected_channel not in keys:
                self.selected_channel = keys[0] if keys else None
                if self.selected_channel:
                    self.functions = rebuilt.get(self.selected_channel, [])
        else:
            # List mode -> dict mode: current functions go to the first key.
            carried_over = self.functions
            self.pattern_data = {keys[0]: carried_over}
            self.is_dict_mode = True
            self.selected_channel = keys[0]

            for extra_key in keys[1:]:
                self.pattern_data[extra_key] = []
    elif self.is_dict_mode:
        # Nothing selected: fall back to a plain list pattern.
        self.pattern_data = self.functions
        self.is_dict_mode = False
        self.selected_channel = None
        logger.debug("Reverted to list mode (no components selected)")

    # Remember the selection for the current group_by.
    if self.current_group_by is not None and self.current_group_by != GroupBy.NONE:
        self.component_selections[self.current_group_by] = new_components
        logger.debug(f"Step '{self.step_identifier}': Cached selection for {self.current_group_by.value}: {new_components}")

    self._commit_and_notify()
    self._refresh_component_button()
    logger.debug(f"Updated components: {new_components}")
1006 def _navigate_channel(self, direction: int) -> None:
1007 """Navigate to next/previous channel (with looping)."""
1008 if not self.is_dict_mode or not isinstance(self.pattern_data, dict):
1009 return
1011 channels = sorted(self.pattern_data.keys())
1012 if len(channels) <= 1:
1013 return
1015 try:
1016 current_index = channels.index(self.selected_channel)
1017 new_index = (current_index + direction) % len(channels)
1018 new_channel = channels[new_index]
1020 self._switch_to_channel(new_channel)
1021 logger.debug(f"Navigated to channel {new_channel}")
1022 except (ValueError, IndexError):
1023 logger.warning(f"Failed to navigate channels: current={self.selected_channel}, channels={channels}")
1027 def _get_current_orchestrator(self):
1028 """Get the current orchestrator from the app."""
1029 try:
1030 # Get from app - PlateManagerWidget is now in PipelinePlateWindow
1031 if hasattr(self.app, 'query_one'):
1032 from openhcs.textual_tui.windows import PipelinePlateWindow
1033 from openhcs.textual_tui.widgets.plate_manager import PlateManagerWidget
1035 # Try to find the PipelinePlateWindow first
1036 try:
1037 pipeline_plate_window = self.app.query_one(PipelinePlateWindow)
1038 plate_manager = pipeline_plate_window.plate_widget
1039 except:
1040 # Fallback: try to find PlateManagerWidget directly in the app
1041 plate_manager = self.app.query_one(PlateManagerWidget)
1043 # Use selected_plate (not current_plate!)
1044 selected_plate = plate_manager.selected_plate
1045 if selected_plate and selected_plate in plate_manager.orchestrators:
1046 orchestrator = plate_manager.orchestrators[selected_plate]
1047 if not orchestrator.is_initialized():
1048 orchestrator.initialize()
1049 return orchestrator
1050 return None
1051 except Exception as e:
1052 logger.error(f"Failed to get orchestrator: {e}")
1053 return None