Coverage for openhcs/textual_tui/widgets/function_list_editor.py: 0.0%
625 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""Function list editor widget - port of function_list_manager.py to Textual."""
3import logging
4from pathlib import Path
5from typing import List, Union, Dict, Any, Optional, Callable # Added Optional, Callable
6from textual.containers import ScrollableContainer, Container, Horizontal
7from textual.widgets import Button, Static
8from textual.app import ComposeResult
9from textual.reactive import reactive
10from textual.message import Message # Added Message
12from openhcs.processing.backends.lib_registry.registry_service import RegistryService
13from openhcs.textual_tui.services.pattern_data_manager import PatternDataManager
14from openhcs.textual_tui.widgets.function_pane import FunctionPaneWidget
15from openhcs.constants.constants import GroupBy
17logger = logging.getLogger(__name__)
20class FunctionListEditorWidget(Container):
21 """
22 Scrollable function list editor.
24 Ports the function display and management logic from function_list_manager.py
25 """
class FunctionPatternChanged(Message):
    """Posted by the editor whenever the function pattern has changed."""
# Reactive properties for automatic UI updates.
# Attributes declared with recompose=True rebuild the widget tree when reassigned.
functions = reactive(list, recompose=True)  # Structural changes (add/remove) should trigger recomposition
pattern_data = reactive(list, recompose=False)  # The actual pattern (List or Dict); no recomposition on change
is_dict_mode = reactive(False, recompose=True)  # Whether we're in channel-specific mode - affects UI layout
selected_channel = reactive(None, recompose=True)  # Currently selected channel - affects button text
available_channels = reactive(list)  # Available channels from orchestrator

# Step configuration reactive properties for dynamic component selection
current_group_by = reactive(None, recompose=True)  # Current GroupBy setting from step editor
current_variable_components = reactive(list, recompose=True)  # Current VariableComponents list from step editor
def __init__(
    self,
    initial_functions: Union[List, Dict, Callable, tuple, None] = None,
    step_identifier: Optional[str] = None,
):
    """Create the editor and seed it with an initial function pattern.

    Args:
        initial_functions: Starting pattern. May be ``None``, a single
            callable, a single ``(callable, kwargs)`` tuple, a list of
            those, or a dict mapping channel keys to function lists.
        step_identifier: Optional identifier used to keep cached state
            separate per step; defaults to a value derived from this
            widget instance's ``id()``.
    """
    super().__init__()

    # Initialize services (reuse existing business logic)
    self.registry_service = RegistryService()
    self.data_manager = PatternDataManager()  # Not heavily used yet, but available

    # Step identifier for cache isolation (optional, defaults to widget instance id)
    self.step_identifier = step_identifier or f"widget_{id(self)}"

    # Component selection cache per GroupBy (instance-specific, not shared between steps)
    self.component_selections: Dict[GroupBy, List[str]] = {}

    # Initialize pattern data and mode
    self._initialize_pattern_data(initial_functions)

    logger.debug(f"FunctionListEditorWidget initialized for step '{self.step_identifier}' with {len(self.functions)} functions, dict_mode={self.is_dict_mode}")
@property
def current_pattern(self) -> Union[List, Dict]:
    """Return the up-to-date pattern for parent widgets.

    The pattern data is refreshed first. Dict patterns are returned as a
    new dict whose keys are all strings, because the pattern detection
    system always uses string component values; list patterns are
    returned as stored.
    """
    self._update_pattern_data()

    data = self.pattern_data
    if not isinstance(data, dict):
        return data
    return {str(channel): entries for channel, entries in data.items()}
def watch_functions(self, new_functions: List) -> None:
    """Watcher for ``functions``: keep ``pattern_data`` in step with it."""
    # The new value itself is not needed; the sync reads self.functions.
    self._update_pattern_data()
81 def _initialize_pattern_data(self, initial_functions: Union[List, Dict, callable, tuple, None]) -> None:
82 """Initialize pattern data and determine mode."""
83 if initial_functions is None:
84 self.pattern_data = []
85 self.is_dict_mode = False
86 self.functions = []
87 elif callable(initial_functions):
88 # Single callable: treat as [(callable, {})]
89 self.pattern_data = [(initial_functions, {})]
90 self.is_dict_mode = False
91 self.functions = [(initial_functions, {})]
92 elif isinstance(initial_functions, tuple) and len(initial_functions) == 2 and callable(initial_functions[0]) and isinstance(initial_functions[1], dict):
93 # Single tuple (callable, kwargs): treat as [(callable, kwargs)]
94 self.pattern_data = [initial_functions]
95 self.is_dict_mode = False
96 self.functions = [initial_functions]
97 elif isinstance(initial_functions, list):
98 self.pattern_data = initial_functions
99 self.is_dict_mode = False
100 self.functions = self._normalize_function_list(initial_functions)
101 elif isinstance(initial_functions, dict):
102 # Convert any integer keys to string keys for consistency
103 normalized_dict = {}
104 for key, value in initial_functions.items():
105 str_key = str(key) # Ensure all keys are strings
106 normalized_dict[str_key] = value
107 logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")
109 self.pattern_data = normalized_dict
110 self.is_dict_mode = True
111 # Set first channel as selected, or empty if no channels
112 if normalized_dict:
113 first_channel = next(iter(normalized_dict))
114 self.selected_channel = first_channel
115 self.functions = self._normalize_function_list(normalized_dict.get(first_channel, []))
116 else:
117 self.selected_channel = None
118 self.functions = []
119 else:
120 logger.warning(f"Unknown initial_functions type: {type(initial_functions)}, using empty list")
121 self.pattern_data = []
122 self.is_dict_mode = False
123 self.functions = []
126 def _normalize_function_list(self, func_list: List[Any]) -> List[tuple[Callable, Dict]]:
127 """Ensures all items in a function list are (callable, kwargs) tuples."""
128 normalized = []
129 for item in func_list:
130 if isinstance(item, tuple) and len(item) == 2 and callable(item[0]) and isinstance(item[1], dict):
131 normalized.append(item)
132 elif callable(item):
133 normalized.append((item, {}))
134 else:
135 logger.warning(f"Skipping invalid item in function list: {item}")
136 return normalized
def _commit_and_notify(self) -> None:
    """Sync pattern data, then post ``FunctionPatternChanged`` to the parent.

    NOTE: this class defines ``_commit_and_notify`` a second time further
    down, and Python keeps the later definition. This copy is kept
    equivalent to it (pattern data is synced before notifying) so that
    removing either definition does not change behavior.
    """
    self._update_pattern_data()
    self.post_message(self.FunctionPatternChanged())
    logger.debug("Posted FunctionPatternChanged message to parent.")
def _trigger_recomposition(self) -> None:
    """Manually trigger recomposition when needed (e.g., loading patterns, adding/removing functions)."""
    # Force the reactive machinery to treat `functions` as changed.
    # NOTE(review): `functions` is declared with recompose=True in this class, so a
    # plain reassignment already recomposes; this helper is presumably for cases
    # where the list was mutated in place (no reassignment) - confirm against callers.
    self.mutate_reactive(FunctionListEditorWidget.functions)
def watch_current_group_by(self, old_group_by: Optional[GroupBy], new_group_by: Optional[GroupBy]) -> None:
    """Watcher for ``current_group_by``: remember the channels used under the old grouping.

    The keys of the current dict pattern are cached under ``old_group_by``
    when that grouping was a real one (neither ``None`` nor ``GroupBy.NONE``)
    and the widget is in dict mode. Nothing is loaded for ``new_group_by``
    here; the selection dialog reads the cache when it opens.
    """
    had_real_grouping = old_group_by is not None and old_group_by != GroupBy.NONE
    if had_real_grouping and self.is_dict_mode and isinstance(self.pattern_data, dict):
        remembered = list(self.pattern_data)
        self.component_selections[old_group_by] = remembered
        logger.debug(f"Step '{self.step_identifier}': Saved selection for {old_group_by.value}: {remembered}")

    logger.debug(f"Group by changed from {old_group_by} to {new_group_by}")
162 def _update_pattern_data(self) -> None:
163 """Update pattern_data based on current functions and mode."""
164 if self.is_dict_mode and self.selected_channel is not None:
165 # Save current functions to the selected channel
166 if not isinstance(self.pattern_data, dict):
167 self.pattern_data = {}
168 logger.debug(f"Saving {len(self.functions)} functions to channel {self.selected_channel}")
169 self.pattern_data[self.selected_channel] = self.functions.copy() # Make a copy to avoid reference issues
170 else:
171 # List mode - pattern_data is just the functions list
172 self.pattern_data = self.functions
176 def _switch_to_channel(self, channel: Any) -> None:
177 """Switch to editing functions for a specific channel."""
178 if not self.is_dict_mode:
179 return
181 # Save current functions first
182 old_channel = self.selected_channel
183 logger.debug(f"Switching from channel {old_channel} to {channel}")
184 logger.debug(f"Current functions before save: {len(self.functions)} functions")
186 self._update_pattern_data()
188 # Verify the save worked
189 if old_channel and isinstance(self.pattern_data, dict):
190 saved_functions = self.pattern_data.get(old_channel, [])
191 logger.debug(f"Saved {len(saved_functions)} functions to channel {old_channel}")
193 # Switch to new channel
194 self.selected_channel = channel
195 if isinstance(self.pattern_data, dict):
196 self.functions = self.pattern_data.get(channel, [])
197 logger.debug(f"Loaded {len(self.functions)} functions for channel {channel}")
198 else:
199 self.functions = []
201 # Update button text to show new channel
202 self._refresh_component_button()
204 # Channel switch will automatically trigger recomposition via reactive system
206 def _add_channel_to_pattern(self, channel: Any) -> None:
207 """Add a new channel (converts to dict mode if needed)."""
208 if not self.is_dict_mode:
209 # Convert to dict mode
210 self.pattern_data = {channel: self.functions}
211 self.is_dict_mode = True
212 self.selected_channel = channel
213 else:
214 # Add new channel with empty functions
215 if not isinstance(self.pattern_data, dict):
216 self.pattern_data = {}
217 self.pattern_data[channel] = []
218 self.selected_channel = channel
219 self.functions = []
221 def _remove_current_channel(self) -> None:
222 """Remove the currently selected channel."""
223 if not self.is_dict_mode or self.selected_channel is None:
224 return
226 if isinstance(self.pattern_data, dict):
227 new_pattern = self.pattern_data.copy()
228 if self.selected_channel in new_pattern:
229 del new_pattern[self.selected_channel]
231 if len(new_pattern) == 0:
232 # Revert to list mode
233 self.pattern_data = []
234 self.is_dict_mode = False
235 self.selected_channel = None
236 self.functions = []
237 else:
238 # Switch to first remaining channel
239 self.pattern_data = new_pattern
240 first_channel = next(iter(new_pattern))
241 self.selected_channel = first_channel
242 self.functions = new_pattern[first_channel]
def compose(self) -> ComposeResult:
    """Build the widget tree: title, a centred button row, and the scrollable function list."""
    from textual.containers import Vertical

    with Vertical():
        # Fixed header with title
        yield Static("[bold]Functions[/bold]")

        # Button row - only as tall as the buttons need
        with Horizontal(id="function_list_header") as header_row:
            header_row.styles.height = "auto"

            # Left spacer (flex-grows) to centre the button group
            yield Static("")

            with Horizontal() as button_group:
                button_group.styles.width = "auto"
                yield Button("Add", id="add_function_btn", compact=True)
                yield Button("Load", id="load_func_btn", compact=True)
                yield Button("Save As", id="save_func_as_btn", compact=True)
                yield Button("Code", id="edit_vim_btn", compact=True)

                # Component selection button; label and enabled state depend on the group_by setting
                component_btn = Button(self._get_component_button_text(), id="component_btn", compact=True)
                component_btn.disabled = self._is_component_button_disabled()
                yield component_btn

                # Channel navigation only makes sense with more than one channel
                if self.is_dict_mode and isinstance(self.pattern_data, dict) and len(self.pattern_data) > 1:
                    yield Button("<", id="prev_channel_btn", compact=True)
                    yield Button(">", id="next_channel_btn", compact=True)

            # Right spacer (flex-grows)
            yield Static("")

        # Scrollable content area - takes all remaining vertical space
        with ScrollableContainer(id="function_list_content") as list_area:
            list_area.styles.height = "1fr"

            if self.functions:
                for position, entry in enumerate(self.functions):
                    pane = FunctionPaneWidget(entry, position)
                    pane.styles.width = "100%"
                    pane.styles.height = "auto"  # Only as tall as its content
                    yield pane
            else:
                placeholder = Static("No functions defined. Click 'Add Function' to begin.")
                placeholder.styles.width = "100%"
                placeholder.styles.height = "100%"
                yield placeholder
async def on_button_pressed(self, event: Button.Pressed) -> None:
    """Route a button press to the matching action; unknown ids are logged as warnings."""
    pressed_id = event.button.id
    logger.debug(f"🔍 BUTTON: Button pressed: {pressed_id}")

    # button id -> (debug message, handler method name, positional args, handler must be awaited)
    routes = {
        "add_function_btn": ("🔍 BUTTON: Add function button pressed", "_add_function", (), True),
        "load_func_btn": ("🔍 BUTTON: Load func button pressed - calling _load_func()", "_load_func", (), True),
        "save_func_as_btn": ("🔍 BUTTON: Save func as button pressed", "_save_func_as", (), True),
        "edit_vim_btn": ("🔍 BUTTON: Edit vim button pressed", "_edit_in_vim", (), False),
        "component_btn": ("🔍 BUTTON: Component button pressed", "_show_component_selection_dialog", (), True),
        "prev_channel_btn": ("🔍 BUTTON: Previous channel button pressed", "_navigate_channel", (-1,), False),
        "next_channel_btn": ("🔍 BUTTON: Next channel button pressed", "_navigate_channel", (1,), False),
    }

    route = routes.get(pressed_id)
    if route is None:
        logger.warning(f"🔍 BUTTON: Unknown button pressed: {pressed_id}")
        return

    message, method_name, args, must_await = route
    logger.debug(message)
    # Handler is looked up by name only once a route matched
    outcome = getattr(self, method_name)(*args)
    if must_await:
        await outcome
def on_function_pane_widget_parameter_changed(self, event: Message) -> None:
    """Store a parameter value edited in a function pane.

    The event must carry ``index``, ``param_name`` and ``value``; events
    missing any of them, or with an out-of-range index, are ignored.

    String values are converted using the parameter type reported by
    ``SignatureAnalyzer``: an empty/blank string becomes the parameter's
    default value; ``float``/``int`` are parsed; ``bool`` is true for
    ``true``/``1``/``yes``/``on`` (case-insensitive); any ``Enum``
    subclass (direct or indirect) is constructed from the string. On a
    conversion error the raw value is kept and a warning is logged.
    """
    if not (hasattr(event, 'index') and hasattr(event, 'param_name') and hasattr(event, 'value')):
        return
    if not (0 <= event.index < len(self.functions)):
        return

    func, kwargs = self.functions[event.index]
    new_kwargs = kwargs.copy()

    # Convert value to proper type based on function signature
    raw_value = event.value
    converted_value = raw_value
    try:
        from openhcs.introspection.signature_analyzer import SignatureAnalyzer
        from enum import Enum

        param_info = SignatureAnalyzer.analyze(func)
        if event.param_name in param_info and isinstance(raw_value, str):
            param_details = param_info[event.param_name]
            param_type = param_details.param_type

            if raw_value.strip() == "":
                # Empty parameter - fall back to the declared default
                converted_value = param_details.default_value
            elif param_type == float:
                converted_value = float(raw_value)
            elif param_type == int:
                converted_value = int(raw_value)
            elif param_type == bool:
                converted_value = raw_value.lower() in ('true', '1', 'yes', 'on')
            elif isinstance(param_type, type) and issubclass(param_type, Enum):
                # issubclass also covers enums that inherit from an Enum base class
                converted_value = param_type(raw_value)
    except (ValueError, TypeError, AttributeError) as e:
        logger.warning(f"Failed to convert parameter '{event.param_name}' value '{event.value}': {e}")
        converted_value = event.value  # Keep original value on conversion failure

    new_kwargs[event.param_name] = converted_value

    # Replace the entry in place: item assignment does not go through the
    # reactive setter, so no recomposition happens and the input keeps focus.
    self.functions[event.index] = (func, new_kwargs)

    # The watcher did not run, so sync pattern data by hand
    self._update_pattern_data()

    # Notify parent
    self.post_message(self.FunctionPatternChanged())
    logger.debug(f"Updated parameter {event.param_name}={converted_value} (type: {type(converted_value)}) for function {event.index}")
async def on_function_pane_widget_change_function(self, event: Message) -> None:
    """Open the function selector for the pane that asked to change its function."""
    if not hasattr(event, 'index'):
        return
    # Ignore requests for positions that no longer exist
    if 0 <= event.index < len(self.functions):
        await self._change_function(event.index)
def on_function_pane_widget_remove_function(self, event: Message) -> None:
    """Drop the function at the event's index and notify the parent; bad indices are logged."""
    index_ok = hasattr(event, 'index') and 0 <= event.index < len(self.functions)
    if not index_ok:
        logger.warning(f"Invalid index for remove function: {getattr(event, 'index', 'N/A')}")
        return

    # Assign a new list (rather than deleting in place) so the reactive setter runs
    self.functions = [entry for position, entry in enumerate(self.functions) if position != event.index]
    self._commit_and_notify()
    logger.debug(f"Removed function at index {event.index}")
async def on_function_pane_widget_add_function(self, event: Message) -> None:
    """Start adding a function at the position requested by a pane."""
    if not hasattr(event, 'insert_index'):
        logger.warning("Invalid add function event: missing insert_index")
        return

    # Never insert past the end of the current list
    position = min(event.insert_index, len(self.functions))
    await self._add_function_at_index(position)
def on_function_pane_widget_move_function(self, event: Message) -> None:
    """Swap a function with its neighbour in the event's direction and notify the parent.

    Events without ``index``/``direction``, or moves that would start or
    end outside the list, are ignored.
    """
    if not hasattr(event, 'index') or not hasattr(event, 'direction'):
        return

    source = event.index
    step = event.direction
    count = len(self.functions)

    if not 0 <= source < count:
        return
    target = source + step
    if not 0 <= target < count:
        return

    # Swap on a copy and reassign so the reactive setter sees a new list
    reordered = self.functions.copy()
    reordered[source], reordered[target] = reordered[target], reordered[source]
    self.functions = reordered
    self._commit_and_notify()
    logger.debug(f"Moved function from index {source} to {target}")
425 async def _add_function(self) -> None:
426 """Add a new function to the end of the list."""
427 await self._add_function_at_index(len(self.functions))
async def _add_function_at_index(self, insert_index: int) -> None:
    """Open the function selector and insert the chosen function at *insert_index*.

    An existing ``FunctionSelectorWindow`` is reused; otherwise one is
    created and mounted on the app. Nothing is inserted if the selector
    returns no function.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    def on_selected(selected_function: Optional[Callable]) -> None:
        if not selected_function:
            return
        # Build a new list so assigning it triggers the reactive update
        updated = self.functions.copy()
        updated.insert(insert_index, (selected_function, {}))
        self.functions = updated
        self._commit_and_notify()
        logger.debug(f"Added function: {selected_function.__name__} at index {insert_index}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        selector = self.app.query_one(FunctionSelectorWindow)
        # Reuse the window that is already mounted
        selector.on_result_callback = on_selected
        selector.open_state = True
    except NoMatches:
        # Expected on first use: no selector window yet
        selector = FunctionSelectorWindow(on_result_callback=on_selected)
        await self.app.mount(selector)
        selector.open_state = True
async def _change_function(self, index: int) -> None:
    """Open the function selector and replace the function at *index*.

    The replacement starts with empty kwargs, because the old kwargs
    belong to the previous function. An out-of-range *index* is ignored.
    The index is checked again when the selection arrives, since the list
    can change while the selector window is open.

    Args:
        index: Position in ``self.functions`` of the entry to replace.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    if not (0 <= index < len(self.functions)):
        return

    current_func, _ = self.functions[index]

    def handle_function_selection(selected_function: Optional[Callable]) -> None:
        if not selected_function:
            return
        if not (0 <= index < len(self.functions)):
            # List shrank (e.g. removal or channel switch) while the selector was open
            logger.warning(f"Cannot change function: index {index} is no longer valid")
            return
        # Replace the function; kwargs are reset for the new function
        new_functions = self.functions.copy()
        new_functions[index] = (selected_function, {})
        self.functions = new_functions
        self._commit_and_notify()
        logger.debug(f"Changed function at index {index} to: {selected_function.__name__}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        window = self.app.query_one(FunctionSelectorWindow)
        # Window exists, update it and open
        window.current_function = current_func
        window.on_result_callback = handle_function_selection
        window.open_state = True
    except NoMatches:
        # Expected case: window doesn't exist yet, create new one
        window = FunctionSelectorWindow(current_function=current_func, on_result_callback=handle_function_selection)
        await self.app.mount(window)
        window.open_state = True
485 def _commit_and_notify(self) -> None:
486 """Commit changes and notify parent of function pattern change."""
487 # Update pattern data before notifying
488 self._update_pattern_data()
489 # Post message to notify parent (DualEditorScreen) of changes
490 self.post_message(self.FunctionPatternChanged())
async def _load_func(self) -> None:
    """Open the file browser to pick a .func file and load the pattern it contains."""
    logger.debug("🔍 LOAD FUNC: _load_func() called - starting file browser...")

    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        logger.debug(f"🔍 LOAD FUNC: File browser callback received result: {result} (type: {type(result)})")

        # The browser may hand back one Path or a list of Paths; only the first file is loaded
        chosen = None
        if isinstance(result, Path):
            chosen = result
            logger.debug(f"🔍 LOAD FUNC: Single Path received: {chosen}")
        elif isinstance(result, list) and len(result) > 0:
            chosen = result[0]
            if len(result) > 1:
                logger.warning(f"🔍 LOAD FUNC: Multiple files selected, using only first: {chosen}")
            else:
                logger.debug(f"🔍 LOAD FUNC: List with single Path received: {chosen}")

        if not (chosen and isinstance(chosen, Path)):
            logger.warning(f"🔍 LOAD FUNC: No valid Path found in result: {result}")
            return

        logger.debug(f"🔍 LOAD FUNC: Valid Path extracted, calling _load_pattern_from_file({chosen})")
        self._load_pattern_from_file(chosen)

    # Use window-based file browser
    logger.debug("🔍 LOAD FUNC: Opening file browser window...")
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    browser_options = {
        "app": self.app,
        "file_manager": self.app.filemanager,
        "initial_path": get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        "backend": Backend.DISK,
        "title": "Load Function Pattern (.func)",
        "mode": BrowserMode.LOAD,
        "selection_mode": SelectionMode.FILES_ONLY,
        "filter_extensions": ['.func'],
        "cache_key": PathCacheKey.FUNCTION_PATTERNS,
        "on_result_callback": handle_result,
    }
    await open_file_browser_window(**browser_options)
    logger.debug("🔍 LOAD FUNC: File browser window opened, waiting for user selection...")
async def _save_func_as(self) -> None:
    """Open the file browser in save mode and write the pattern to the chosen .func file.

    The result callback accepts either a single ``Path`` or a non-empty
    list of paths (first entry used), matching how the load callback
    treats browser results. A non-empty result that contains no ``Path``
    is logged instead of being ignored silently.
    """
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        target = result[0] if isinstance(result, list) and result else result
        if isinstance(target, Path):
            self._save_pattern_to_file(target)
        elif result:
            logger.warning(f"Save skipped: no valid Path found in file browser result: {result}")

    # Use window-based file browser
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    await open_file_browser_window(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Save Function Pattern (.func)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        default_filename="pattern.func",
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result
    )
def _load_pattern_from_file(self, file_path: Path) -> None:
    """Load a pattern from a .func file and make it the edited pattern.

    Accepted pattern shapes mirror ``_initialize_pattern_data``: ``None``,
    a single callable, a single ``(callable, kwargs)`` tuple, a list, or a
    dict of channels (keys are stringified, first channel selected).
    Failures are logged with a traceback and leave the widget unchanged
    as far as the failure point allows.

    Args:
        file_path: Path of the pickled pattern file.
    """
    logger.debug(f"🔍 LOAD FUNC: _load_pattern_from_file called with: {file_path}")
    logger.debug(f"🔍 LOAD FUNC: File exists: {file_path.exists()}")
    logger.debug(f"🔍 LOAD FUNC: File size: {file_path.stat().st_size if file_path.exists() else 'N/A'} bytes")

    # SECURITY NOTE: unpickling can execute arbitrary code contained in the
    # file. Only load .func files that come from a trusted source.
    import dill as pickle
    try:
        logger.debug("🔍 LOAD FUNC: Opening file for reading...")
        with open(file_path, 'rb') as f:
            pattern = pickle.load(f)
        logger.debug(f"🔍 LOAD FUNC: Successfully loaded pattern from pickle: {pattern}")
        logger.debug(f"🔍 LOAD FUNC: Pattern type: {type(pattern)}")

        # Log current state before loading
        logger.debug(f"🔍 LOAD FUNC: BEFORE - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - selected_channel: {self.selected_channel}")

        # Work out the new state first; new objects are assigned below so
        # the reactive attributes register a change.
        new_is_dict_mode = False
        new_selected_channel = None
        if pattern is None:
            new_pattern_data = []
            new_functions = []
        elif callable(pattern):
            # Single callable: treat as [(callable, {})]
            new_pattern_data = [(pattern, {})]
            new_functions = [(pattern, {})]
        elif isinstance(pattern, tuple) and len(pattern) == 2 and callable(pattern[0]) and isinstance(pattern[1], dict):
            # Single tuple (callable, kwargs): treat as [(callable, kwargs)]
            new_pattern_data = [pattern]
            new_functions = [pattern]
        elif isinstance(pattern, list):
            new_pattern_data = pattern
            new_functions = self._normalize_function_list(pattern)
        elif isinstance(pattern, dict):
            # Convert any integer keys to string keys for consistency
            normalized_dict = {}
            for key, value in pattern.items():
                str_key = str(key)
                normalized_dict[str_key] = value
                logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")

            new_pattern_data = normalized_dict
            new_is_dict_mode = True
            # Set first channel as selected, or None if no channels
            if normalized_dict:
                new_selected_channel = next(iter(normalized_dict))
                new_functions = self._normalize_function_list(normalized_dict.get(new_selected_channel, []))
            else:
                new_functions = []
        else:
            logger.warning(f"Unknown pattern type: {type(pattern)}, using empty list")
            new_pattern_data = []
            new_functions = []

        logger.debug("🔍 LOAD FUNC: Assigning new values to reactive properties...")
        logger.debug(f"🔍 LOAD FUNC: new_pattern_data: {new_pattern_data}")
        logger.debug(f"🔍 LOAD FUNC: new_is_dict_mode: {new_is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: new_functions: {len(new_functions)} items: {new_functions}")
        logger.debug(f"🔍 LOAD FUNC: new_selected_channel: {new_selected_channel}")

        self.pattern_data = new_pattern_data
        logger.debug("🔍 LOAD FUNC: ✅ Assigned pattern_data")

        self.is_dict_mode = new_is_dict_mode
        logger.debug("🔍 LOAD FUNC: ✅ Assigned is_dict_mode")

        # selected_channel MUST be assigned before functions: assigning
        # functions runs watch_functions -> _update_pattern_data, which files
        # the functions under self.selected_channel. With the old channel (or
        # None) still in place, the freshly loaded dict would be overwritten.
        self.selected_channel = new_selected_channel
        logger.debug("🔍 LOAD FUNC: ✅ Assigned selected_channel")

        self.functions = new_functions  # This triggers reactive system!
        logger.debug("🔍 LOAD FUNC: ✅ Assigned functions - THIS SHOULD TRIGGER REACTIVE SYSTEM!")

        logger.debug("🔍 LOAD FUNC: Calling _commit_and_notify...")
        self._commit_and_notify()

        # Final state logging
        logger.debug(f"🔍 LOAD FUNC: FINAL - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: FINAL - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: FINAL - selected_channel: {self.selected_channel}")
        logger.debug("🔍 LOAD FUNC: ✅ Successfully completed loading process!")

    except Exception as e:
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Failed to load pattern: {e}")
        import traceback
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Traceback: {traceback.format_exc()}")
def _save_pattern_to_file(self, file_path: Path) -> None:
    """Serialize the current pattern to a .func file.

    Uses ``dill`` (imported under the name ``pickle``), the same
    serializer the load path uses, so that anything the loader can read
    back can also be written. Failures are logged, not raised.

    Args:
        file_path: Destination file; overwritten if it exists.
    """
    import dill as pickle
    try:
        with open(file_path, 'wb') as f:
            pickle.dump(self.current_pattern, f)
    except Exception as e:
        logger.error(f"Failed to save pattern: {e}")
def _edit_in_vim(self) -> None:
    """Open the current pattern as Python code in a terminal editor.

    The edited text is handed to ``_handle_edited_pattern`` by the
    launcher. Any failure to launch is logged and shown to the user.
    """
    logger.debug("Edit button pressed - opening terminal editor")

    try:
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Make sure the generated code reflects the latest edits
        self._update_pattern_data()
        source_text = self._generate_complete_python_code()

        editor_launcher = TerminalLauncher(self.app)

        # Schedule the launch on the app, passing the callback for the edited code
        self.app.call_later(
            editor_launcher.launch_editor_for_file,
            source_text,
            '.py',
            self._handle_edited_pattern,
        )

    except Exception as e:
        logger.error(f"Failed to launch terminal editor: {e}")
        self.app.show_error("Edit Error", f"Failed to launch terminal editor: {str(e)}")
703 def _handle_edited_pattern(self, edited_code: str) -> None:
704 """Handle the edited pattern code from terminal editor."""
705 try:
706 # Execute the code (it has all necessary imports)
707 namespace = {}
708 exec(edited_code, namespace)
710 # Get the pattern from the namespace
711 if 'pattern' in namespace:
712 new_pattern = namespace['pattern']
713 self._apply_edited_pattern(new_pattern)
714 else:
715 self.app.show_error("Parse Error", "No 'pattern = ...' assignment found in edited code")
717 except SyntaxError as e:
718 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
719 except Exception as e:
720 logger.error(f"Failed to parse edited pattern: {e}")
721 self.app.show_error("Edit Error", f"Failed to parse edited pattern: {str(e)}")
def _generate_complete_python_code(self) -> str:
    """Render ``pattern_data`` as complete Python source, imports included."""
    from openhcs.debug.pickle_to_python import generate_complete_function_pattern_code as render_pattern

    # clean_mode is off so every parameter is written out; this avoids
    # parsing issues when one function appears with different parameter sets.
    return render_pattern(self.pattern_data, clean_mode=False)
734 def _apply_edited_pattern(self, new_pattern):
735 """Apply the edited pattern back to the TUI."""
736 try:
737 if self.is_dict_mode:
738 if isinstance(new_pattern, dict):
739 self.pattern_data = new_pattern
740 # Update current channel if it exists in new pattern
741 if self.selected_channel and self.selected_channel in new_pattern:
742 self.functions = self._normalize_function_list(new_pattern[self.selected_channel])
743 else:
744 # Select first channel
745 if new_pattern:
746 self.selected_channel = next(iter(new_pattern))
747 self.functions = self._normalize_function_list(new_pattern[self.selected_channel])
748 else:
749 self.functions = []
750 else:
751 raise ValueError("Expected dict pattern for dict mode")
752 else:
753 if isinstance(new_pattern, list):
754 self.pattern_data = new_pattern
755 self.functions = self._normalize_function_list(new_pattern)
756 elif callable(new_pattern):
757 # Single callable: treat as [(callable, {})]
758 self.pattern_data = [(new_pattern, {})]
759 self.functions = [(new_pattern, {})]
760 elif isinstance(new_pattern, tuple) and len(new_pattern) == 2 and callable(new_pattern[0]) and isinstance(new_pattern[1], dict):
761 # Single tuple (callable, kwargs): treat as [(callable, kwargs)]
762 self.pattern_data = [new_pattern]
763 self.functions = [new_pattern]
764 else:
765 raise ValueError(f"Expected list, callable, or (callable, dict) tuple pattern for list mode, got {type(new_pattern)}")
767 # Refresh the UI and notify of changes
768 self.refresh()
769 self._commit_and_notify()
771 except Exception as e:
772 self.app.show_error("Apply Error", f"Failed to apply edited pattern: {str(e)}")
def _generate_pattern_python_code(self) -> str:
    """Return Python source that represents the pattern being edited.

    ``_update_pattern_data`` runs first, then the result is rendered by the
    debug module's ``generate_complete_function_pattern_code`` with its
    default settings.
    """
    # Bring pattern_data up to date before rendering it
    self._update_pattern_data()

    # Centralized generator from the debug module keeps collision
    # resolution consistent with the rest of the code base.
    from openhcs.debug.pickle_to_python import (
        generate_complete_function_pattern_code as render_pattern,
    )

    source_text = render_pattern(self.pattern_data)
    return source_text
785 def _get_component_button_text(self) -> str:
786 """Get text for the component selection button based on current group_by setting."""
787 if self.current_group_by is None or self.current_group_by == GroupBy.NONE:
788 return "Component: None"
790 # Use group_by.value.value.title() for dynamic component type display
791 # GroupBy.value is now a VariableComponents enum, so we need .value to get the string
792 component_type = self.current_group_by.value.value.title()
794 if self.is_dict_mode and self.selected_channel is not None:
795 # Try to get metadata name for the selected component
796 display_name = self._get_component_display_name(self.selected_channel)
797 return f"{component_type}: {display_name}"
798 return f"{component_type}: None"
800 def _get_component_display_name(self, component_key: str) -> str:
801 """Get display name for component key, using metadata if available."""
802 # Try to get metadata name from orchestrator
803 orchestrator = self._get_current_orchestrator()
804 if orchestrator and self.current_group_by:
805 try:
806 metadata_name = orchestrator.metadata_cache.get_component_metadata(self.current_group_by, component_key)
807 if metadata_name:
808 return metadata_name
809 except Exception as e:
810 logger.debug(f"Could not get metadata for {self.current_group_by.value} {component_key}: {e}")
812 # Fallback to component key
813 return component_key
def _refresh_component_button(self) -> None:
    """Re-sync the component button's label and disabled flag.

    Any failure (for example, the button cannot be queried) is logged at
    debug level and otherwise ignored.
    """
    try:
        button = self.query_one("#component_btn", Button)
        button.label = self._get_component_button_text()
        button.disabled = self._is_component_button_disabled()
    except Exception as e:
        # Best effort only: a failed refresh must not break the caller
        logger.debug(f"Could not refresh component button: {e}")
824 def _is_component_button_disabled(self) -> bool:
825 """Check if component selection button should be disabled."""
826 return (
827 self.current_group_by is None or
828 self.current_group_by == GroupBy.NONE or
829 (self.current_variable_components and
830 self.current_group_by.value in [vc.value for vc in self.current_variable_components])
831 )
async def _show_component_selection_dialog(self) -> None:
    """Open the component selector window for the active group_by setting.

    Does nothing (beyond logging) when selection is disabled, when no
    orchestrator is available, or when the orchestrator reports no component
    keys. The initial selection comes from the per-group_by cache if present,
    else from the keys of the current dict pattern, else it is empty. An
    existing ``GroupBySelectorWindow`` is reused; otherwise a new one is
    mounted on the app. Any exception is logged and swallowed.
    """
    try:
        # Check if component selection is disabled
        if self._is_component_button_disabled():
            logger.debug("Component selection is disabled")
            return

        # Components are read from the orchestrator for the active group_by
        orchestrator = self._get_current_orchestrator()
        if orchestrator is None:
            logger.warning("No orchestrator available for component selection")
            return

        group_by = self.current_group_by
        available = orchestrator.get_component_keys(group_by)
        if not available:
            logger.warning(f"No {group_by.value} values found in current plate")
            return

        # Initial selection: cache first, then current pattern keys
        if group_by in self.component_selections:
            preselected = self.component_selections[group_by]
            logger.debug(f"Step '{self.step_identifier}': Using cached selection for {group_by.value}: {preselected}")
        elif self.is_dict_mode and isinstance(self.pattern_data, dict):
            preselected = list(self.pattern_data.keys())
        else:
            preselected = []

        from openhcs.textual_tui.windows import GroupBySelectorWindow
        from textual.css.query import NoMatches

        def on_selection(chosen):
            # None means no result was delivered; leave the pattern alone
            if chosen is not None:
                self._update_components(chosen)

        # Window-based group-by selector (follows ConfigWindow pattern)
        try:
            window = self.app.query_one(GroupBySelectorWindow)
            # Reuse the existing window: refresh its state, then open it
            window.available_channels = available
            window.selected_channels = preselected
            window.component_type = group_by.value
            window.orchestrator = orchestrator
            window.on_result_callback = on_selection
            window.open_state = True
        except NoMatches:
            # Expected case: window doesn't exist yet, create new one
            window = GroupBySelectorWindow(
                available_channels=available,
                selected_channels=preselected,
                on_result_callback=on_selection,
                component_type=group_by.value,
                orchestrator=orchestrator,
            )
            await self.app.mount(window)
            window.open_state = True

    except Exception as e:
        component_type = self.current_group_by.value if self.current_group_by else "component"
        logger.error(f"Failed to show {component_type} selection dialog: {e}")
def _update_components(self, new_components: List[str]) -> None:
    """
    Rebuild the pattern so it matches a new component selection.

    An empty selection reverts a dict-mode pattern to list mode. A non-empty
    selection switches to (or stays in) dict mode, keyed by the component
    strings exactly as given. String keys are used because pattern detection
    returns string component values (e.g., '1', '2', '3') when grouping by
    any component.

    Afterwards the selection is cached for the active group_by, the change
    is committed and the component button is refreshed.
    """
    if new_components:
        if self.is_dict_mode:
            # Already in dict mode - carry over what still applies
            previous = self.pattern_data.copy() if isinstance(self.pattern_data, dict) else {}
            rebuilt = {}

            for key in new_components:
                if key in previous:
                    rebuilt[key] = previous[key]
                    continue
                # Older patterns may be keyed by integers; try that form too
                try:
                    legacy_key = int(key)
                    rebuilt[key] = previous[legacy_key] if legacy_key in previous else []
                except ValueError:
                    rebuilt[key] = []

            self.pattern_data = rebuilt

            # Move the selection only when the old one disappeared
            if self.selected_channel not in new_components:
                self.selected_channel = new_components[0]
                if self.selected_channel:
                    self.functions = rebuilt.get(self.selected_channel, [])
        else:
            # List mode -> dict mode: current functions go to the first key
            first_key = new_components[0]
            self.pattern_data = {first_key: self.functions}
            self.is_dict_mode = True
            self.selected_channel = first_key

            # Remaining components start with empty function lists
            for key in new_components[1:]:
                self.pattern_data[key] = []
    elif self.is_dict_mode:
        # No components selected - revert to list mode, keeping the
        # functions that are currently displayed
        self.pattern_data = self.functions
        self.is_dict_mode = False
        self.selected_channel = None
        logger.debug("Reverted to list mode (no components selected)")

    # Save selection to cache for current group_by
    group_by = self.current_group_by
    if group_by is not None and group_by != GroupBy.NONE:
        self.component_selections[group_by] = new_components
        logger.debug(f"Step '{self.step_identifier}': Cached selection for {group_by.value}: {new_components}")

    self._commit_and_notify()
    self._refresh_component_button()
    logger.debug(f"Updated components: {new_components}")
967 def _navigate_channel(self, direction: int) -> None:
968 """Navigate to next/previous channel (with looping)."""
969 if not self.is_dict_mode or not isinstance(self.pattern_data, dict):
970 return
972 channels = sorted(self.pattern_data.keys())
973 if len(channels) <= 1:
974 return
976 try:
977 current_index = channels.index(self.selected_channel)
978 new_index = (current_index + direction) % len(channels)
979 new_channel = channels[new_index]
981 self._switch_to_channel(new_channel)
982 logger.debug(f"Navigated to channel {new_channel}")
983 except (ValueError, IndexError):
984 logger.warning(f"Failed to navigate channels: current={self.selected_channel}, channels={channels}")
988 def _get_current_orchestrator(self):
989 """Get the current orchestrator from the app."""
990 try:
991 # Get from app - PlateManagerWidget is now in PipelinePlateWindow
992 if hasattr(self.app, 'query_one'):
993 from openhcs.textual_tui.windows import PipelinePlateWindow
994 from openhcs.textual_tui.widgets.plate_manager import PlateManagerWidget
996 # Try to find the PipelinePlateWindow first
997 try:
998 pipeline_plate_window = self.app.query_one(PipelinePlateWindow)
999 plate_manager = pipeline_plate_window.plate_widget
1000 except:
1001 # Fallback: try to find PlateManagerWidget directly in the app
1002 plate_manager = self.app.query_one(PlateManagerWidget)
1004 # Use selected_plate (not current_plate!)
1005 selected_plate = plate_manager.selected_plate
1006 if selected_plate and selected_plate in plate_manager.orchestrators:
1007 orchestrator = plate_manager.orchestrators[selected_plate]
1008 if not orchestrator.is_initialized():
1009 orchestrator.initialize()
1010 return orchestrator
1011 return None
1012 except Exception as e:
1013 logger.error(f"Failed to get orchestrator: {e}")
1014 return None