Coverage for openhcs/textual_tui/widgets/function_list_editor.py: 0.0%

626 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1"""Function list editor widget - port of function_list_manager.py to Textual.""" 

2 

3import logging 

4from pathlib import Path 

5from typing import List, Union, Dict, Any, Optional, Callable # Added Optional, Callable 

6from textual.containers import ScrollableContainer, Container, Horizontal, Center 

7from textual.widgets import Button, Static, Select 

8from textual.app import ComposeResult 

9from textual.reactive import reactive 

10from textual.message import Message # Added Message 

11 

12from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

13from openhcs.textual_tui.services.pattern_data_manager import PatternDataManager 

14from openhcs.textual_tui.widgets.function_pane import FunctionPaneWidget 

15from openhcs.constants.constants import GroupBy, VariableComponents 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class FunctionListEditorWidget(Container): 

21 """ 

22 Scrollable function list editor. 

23 

24 Ports the function display and management logic from function_list_manager.py 

25 """ 

26 

27 class FunctionPatternChanged(Message): 

28 """Message to indicate the function pattern has changed.""" 

29 pass 

30 

31 # Reactive properties for automatic UI updates 

32 functions = reactive(list, recompose=True) # Structural changes (add/remove) should trigger recomposition 

33 pattern_data = reactive(list, recompose=False) # The actual pattern (List or Dict) 

34 is_dict_mode = reactive(False, recompose=True) # Whether we're in channel-specific mode - affects UI layout 

35 selected_channel = reactive(None, recompose=True) # Currently selected channel - affects button text 

36 available_channels = reactive(list) # Available channels from orchestrator 

37 

38 # Step configuration reactive properties for dynamic component selection 

39 current_group_by = reactive(None, recompose=True) # Current GroupBy setting from step editor 

40 current_variable_components = reactive(list, recompose=True) # Current VariableComponents list from step editor 

41 

42 def __init__(self, initial_functions: Union[List, Dict, callable, None] = None, step_identifier: str = None): 

43 super().__init__() 

44 

45 # Initialize services (reuse existing business logic) 

46 self.registry_service = RegistryService() 

47 self.data_manager = PatternDataManager() # Not heavily used yet, but available 

48 

49 # Step identifier for cache isolation (optional, defaults to widget instance id) 

50 self.step_identifier = step_identifier or f"widget_{id(self)}" 

51 

52 # Component selection cache per GroupBy (instance-specific, not shared between steps) 

53 self.component_selections: Dict[GroupBy, List[str]] = {} 

54 

55 # Initialize pattern data and mode 

56 self._initialize_pattern_data(initial_functions) 

57 

58 logger.debug(f"FunctionListEditorWidget initialized for step '{self.step_identifier}' with {len(self.functions)} functions, dict_mode={self.is_dict_mode}") 

59 

60 @property 

61 def current_pattern(self) -> Union[List, Dict]: 

62 """Get the current pattern data (for parent widgets to access).""" 

63 self._update_pattern_data() # Ensure it's up to date 

64 

65 # Migration fix: Convert any integer keys to string keys for compatibility 

66 # with pattern detection system which always uses string component values 

67 if isinstance(self.pattern_data, dict): 

68 migrated_pattern = {} 

69 for key, value in self.pattern_data.items(): 

70 str_key = str(key) 

71 migrated_pattern[str_key] = value 

72 return migrated_pattern 

73 

74 return self.pattern_data 

75 

76 def watch_functions(self, new_functions: List) -> None: 

77 """Watch for changes to functions and update pattern data.""" 

78 # Update pattern data when functions change (structural changes only) 

79 self._update_pattern_data() 

80 

81 def _initialize_pattern_data(self, initial_functions: Union[List, Dict, callable, tuple, None]) -> None: 

82 """Initialize pattern data and determine mode.""" 

83 if initial_functions is None: 

84 self.pattern_data = [] 

85 self.is_dict_mode = False 

86 self.functions = [] 

87 elif callable(initial_functions): 

88 # Single callable: treat as [(callable, {})] 

89 self.pattern_data = [(initial_functions, {})] 

90 self.is_dict_mode = False 

91 self.functions = [(initial_functions, {})] 

92 elif isinstance(initial_functions, tuple) and len(initial_functions) == 2 and callable(initial_functions[0]) and isinstance(initial_functions[1], dict): 

93 # Single tuple (callable, kwargs): treat as [(callable, kwargs)] 

94 self.pattern_data = [initial_functions] 

95 self.is_dict_mode = False 

96 self.functions = [initial_functions] 

97 elif isinstance(initial_functions, list): 

98 self.pattern_data = initial_functions 

99 self.is_dict_mode = False 

100 self.functions = self._normalize_function_list(initial_functions) 

101 elif isinstance(initial_functions, dict): 

102 # Convert any integer keys to string keys for consistency 

103 normalized_dict = {} 

104 for key, value in initial_functions.items(): 

105 str_key = str(key) # Ensure all keys are strings 

106 normalized_dict[str_key] = value 

107 logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)") 

108 

109 self.pattern_data = normalized_dict 

110 self.is_dict_mode = True 

111 # Set first channel as selected, or empty if no channels 

112 if normalized_dict: 

113 first_channel = next(iter(normalized_dict)) 

114 self.selected_channel = first_channel 

115 self.functions = self._normalize_function_list(normalized_dict.get(first_channel, [])) 

116 else: 

117 self.selected_channel = None 

118 self.functions = [] 

119 else: 

120 logger.warning(f"Unknown initial_functions type: {type(initial_functions)}, using empty list") 

121 self.pattern_data = [] 

122 self.is_dict_mode = False 

123 self.functions = [] 

124 

125 

126 def _normalize_function_list(self, func_list: List[Any]) -> List[tuple[Callable, Dict]]: 

127 """Ensures all items in a function list are (callable, kwargs) tuples.""" 

128 normalized = [] 

129 for item in func_list: 

130 if isinstance(item, tuple) and len(item) == 2 and callable(item[0]) and isinstance(item[1], dict): 

131 normalized.append(item) 

132 elif callable(item): 

133 normalized.append((item, {})) 

134 else: 

135 logger.warning(f"Skipping invalid item in function list: {item}") 

136 return normalized 

137 

138 def _commit_and_notify(self) -> None: 

139 """Post a change message to notify parent of function pattern changes.""" 

140 self.post_message(self.FunctionPatternChanged()) 

141 logger.debug("Posted FunctionPatternChanged message to parent.") 

142 

143 def _trigger_recomposition(self) -> None: 

144 """Manually trigger recomposition when needed (e.g., loading patterns, adding/removing functions).""" 

145 # Force recomposition by mutating the reactive property 

146 # This is needed because functions has recompose=False to prevent focus loss during typing 

147 self.mutate_reactive(FunctionListEditorWidget.functions) 

148 

149 def watch_current_group_by(self, old_group_by: Optional[GroupBy], new_group_by: Optional[GroupBy]) -> None: 

150 """Handle group_by changes by saving/loading component selections.""" 

151 if old_group_by is not None and old_group_by != GroupBy.NONE: 

152 # Save current selection for old group_by 

153 if self.is_dict_mode and isinstance(self.pattern_data, dict): 

154 current_selection = list(self.pattern_data.keys()) 

155 self.component_selections[old_group_by] = current_selection 

156 logger.debug(f"Step '{self.step_identifier}': Saved selection for {old_group_by.value}: {current_selection}") 

157 

158 # Note: We don't automatically load selection for new_group_by here 

159 # because the dialog will handle loading from cache when opened 

160 logger.debug(f"Group by changed from {old_group_by} to {new_group_by}") 

161 

162 def _update_pattern_data(self) -> None: 

163 """Update pattern_data based on current functions and mode.""" 

164 if self.is_dict_mode and self.selected_channel is not None: 

165 # Save current functions to the selected channel 

166 if not isinstance(self.pattern_data, dict): 

167 self.pattern_data = {} 

168 logger.debug(f"Saving {len(self.functions)} functions to channel {self.selected_channel}") 

169 self.pattern_data[self.selected_channel] = self.functions.copy() # Make a copy to avoid reference issues 

170 else: 

171 # List mode - pattern_data is just the functions list 

172 self.pattern_data = self.functions 

173 

174 

175 

176 def _switch_to_channel(self, channel: Any) -> None: 

177 """Switch to editing functions for a specific channel.""" 

178 if not self.is_dict_mode: 

179 return 

180 

181 # Save current functions first 

182 old_channel = self.selected_channel 

183 logger.debug(f"Switching from channel {old_channel} to {channel}") 

184 logger.debug(f"Current functions before save: {len(self.functions)} functions") 

185 

186 self._update_pattern_data() 

187 

188 # Verify the save worked 

189 if old_channel and isinstance(self.pattern_data, dict): 

190 saved_functions = self.pattern_data.get(old_channel, []) 

191 logger.debug(f"Saved {len(saved_functions)} functions to channel {old_channel}") 

192 

193 # Switch to new channel 

194 self.selected_channel = channel 

195 if isinstance(self.pattern_data, dict): 

196 self.functions = self.pattern_data.get(channel, []) 

197 logger.debug(f"Loaded {len(self.functions)} functions for channel {channel}") 

198 else: 

199 self.functions = [] 

200 

201 # Update button text to show new channel 

202 self._refresh_component_button() 

203 

204 # Channel switch will automatically trigger recomposition via reactive system 

205 

206 def _add_channel_to_pattern(self, channel: Any) -> None: 

207 """Add a new channel (converts to dict mode if needed).""" 

208 if not self.is_dict_mode: 

209 # Convert to dict mode 

210 self.pattern_data = {channel: self.functions} 

211 self.is_dict_mode = True 

212 self.selected_channel = channel 

213 else: 

214 # Add new channel with empty functions 

215 if not isinstance(self.pattern_data, dict): 

216 self.pattern_data = {} 

217 self.pattern_data[channel] = [] 

218 self.selected_channel = channel 

219 self.functions = [] 

220 

221 def _remove_current_channel(self) -> None: 

222 """Remove the currently selected channel.""" 

223 if not self.is_dict_mode or self.selected_channel is None: 

224 return 

225 

226 if isinstance(self.pattern_data, dict): 

227 new_pattern = self.pattern_data.copy() 

228 if self.selected_channel in new_pattern: 

229 del new_pattern[self.selected_channel] 

230 

231 if len(new_pattern) == 0: 

232 # Revert to list mode 

233 self.pattern_data = [] 

234 self.is_dict_mode = False 

235 self.selected_channel = None 

236 self.functions = [] 

237 else: 

238 # Switch to first remaining channel 

239 self.pattern_data = new_pattern 

240 first_channel = next(iter(new_pattern)) 

241 self.selected_channel = first_channel 

242 self.functions = new_pattern[first_channel] 

243 

244 def compose(self) -> ComposeResult: 

245 """Compose the function list using the common interface pattern.""" 

246 from textual.containers import Vertical 

247 

248 with Vertical(): 

249 # Fixed header with title 

250 yield Static("[bold]Functions[/bold]") 

251 

252 # Button row - takes minimal height needed for buttons 

253 with Horizontal(id="function_list_header") as button_row: 

254 button_row.styles.height = "auto" # CRITICAL: Take only needed height 

255 

256 # Empty space (flex-grows) 

257 yield Static("") 

258 

259 # Centered main button group 

260 with Horizontal() as main_button_group: 

261 main_button_group.styles.width = "auto" 

262 yield Button("Add", id="add_function_btn", compact=True) 

263 yield Button("Load", id="load_func_btn", compact=True) 

264 yield Button("Save As", id="save_func_as_btn", compact=True) 

265 yield Button("Code", id="edit_vim_btn", compact=True) 

266 

267 # Component selection button (dynamic based on group_by setting) 

268 component_text = self._get_component_button_text() 

269 component_button = Button(component_text, id="component_btn", compact=True) 

270 component_button.disabled = self._is_component_button_disabled() 

271 yield component_button 

272 

273 # Channel navigation buttons (only in dict mode with multiple channels) 

274 if self.is_dict_mode and isinstance(self.pattern_data, dict) and len(self.pattern_data) > 1: 

275 yield Button("<", id="prev_channel_btn", compact=True) 

276 yield Button(">", id="next_channel_btn", compact=True) 

277 

278 # Empty space (flex-grows) 

279 yield Static("") 

280 

281 # Scrollable content area - expands to fill ALL remaining vertical space 

282 with ScrollableContainer(id="function_list_content") as container: 

283 container.styles.height = "1fr" # CRITICAL: Fill remaining space 

284 

285 if not self.functions: 

286 content = Static("No functions defined. Click 'Add Function' to begin.") 

287 content.styles.width = "100%" 

288 content.styles.height = "100%" 

289 yield content 

290 else: 

291 for i, func_item in enumerate(self.functions): 

292 pane = FunctionPaneWidget(func_item, i) 

293 pane.styles.width = "100%" 

294 pane.styles.height = "auto" # CRITICAL: Only take height needed for content 

295 yield pane 

296 

297 

298 

299 async def on_button_pressed(self, event: Button.Pressed) -> None: 

300 """Handle button presses.""" 

301 logger.debug(f"🔍 BUTTON: Button pressed: {event.button.id}") 

302 

303 if event.button.id == "add_function_btn": 

304 logger.debug(f"🔍 BUTTON: Add function button pressed") 

305 await self._add_function() 

306 elif event.button.id == "load_func_btn": 

307 logger.debug(f"🔍 BUTTON: Load func button pressed - calling _load_func()") 

308 await self._load_func() 

309 elif event.button.id == "save_func_as_btn": 

310 logger.debug(f"🔍 BUTTON: Save func as button pressed") 

311 await self._save_func_as() 

312 elif event.button.id == "edit_vim_btn": 

313 logger.debug(f"🔍 BUTTON: Edit vim button pressed") 

314 self._edit_in_vim() 

315 elif event.button.id == "component_btn": 

316 logger.debug(f"🔍 BUTTON: Component button pressed") 

317 await self._show_component_selection_dialog() 

318 elif event.button.id == "prev_channel_btn": 

319 logger.debug(f"🔍 BUTTON: Previous channel button pressed") 

320 self._navigate_channel(-1) 

321 elif event.button.id == "next_channel_btn": 

322 logger.debug(f"🔍 BUTTON: Next channel button pressed") 

323 self._navigate_channel(1) 

324 else: 

325 logger.warning(f"🔍 BUTTON: Unknown button pressed: {event.button.id}") 

326 

327 

328 

329 def on_function_pane_widget_parameter_changed(self, event: Message) -> None: 

330 """Handle parameter change message from FunctionPaneWidget.""" 

331 if hasattr(event, 'index') and hasattr(event, 'param_name') and hasattr(event, 'value'): 

332 if 0 <= event.index < len(self.functions): 

333 # Update function kwargs with proper type conversion 

334 func, kwargs = self.functions[event.index] 

335 new_kwargs = kwargs.copy() 

336 

337 # Convert value to proper type based on function signature 

338 converted_value = event.value 

339 try: 

340 from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer 

341 from enum import Enum 

342 

343 param_info = SignatureAnalyzer.analyze(func) 

344 if event.param_name in param_info: 

345 param_details = param_info[event.param_name] 

346 param_type = param_details.param_type 

347 is_required = param_details.is_required 

348 default_value = param_details.default_value 

349 

350 # Handle empty strings - always convert to None or default value 

351 if isinstance(event.value, str) and event.value.strip() == "": 

352 # Empty parameter - use default value (None for required params) 

353 converted_value = default_value 

354 elif isinstance(event.value, str) and event.value.strip() != "": 

355 # Non-empty string - convert to proper type 

356 if param_type == float: 

357 converted_value = float(event.value) 

358 elif param_type == int: 

359 converted_value = int(event.value) 

360 elif param_type == bool: 

361 converted_value = event.value.lower() in ('true', '1', 'yes', 'on') 

362 elif hasattr(param_type, '__bases__') and Enum in param_type.__bases__: 

363 converted_value = param_type(event.value) 

364 except (ValueError, TypeError, AttributeError) as e: 

365 logger.warning(f"Failed to convert parameter '{event.param_name}' value '{event.value}': {e}") 

366 converted_value = event.value # Keep original value on conversion failure 

367 

368 new_kwargs[event.param_name] = converted_value 

369 

370 # Update functions list WITHOUT triggering recomposition (to prevent focus loss) 

371 # We modify the underlying list directly instead of assigning a new list 

372 self.functions[event.index] = (func, new_kwargs) 

373 

374 # Manually update pattern data 

375 self._update_pattern_data() 

376 

377 # Notify parent 

378 self.post_message(self.FunctionPatternChanged()) 

379 logger.debug(f"Updated parameter {event.param_name}={converted_value} (type: {type(converted_value)}) for function {event.index}") 

380 

381 async def on_function_pane_widget_change_function(self, event: Message) -> None: 

382 """Handle change function message from FunctionPaneWidget.""" 

383 if hasattr(event, 'index') and 0 <= event.index < len(self.functions): 

384 await self._change_function(event.index) 

385 

386 def on_function_pane_widget_remove_function(self, event: Message) -> None: 

387 """Handle remove function message from FunctionPaneWidget.""" 

388 if hasattr(event, 'index') and 0 <= event.index < len(self.functions): 

389 new_functions = self.functions[:event.index] + self.functions[event.index+1:] 

390 self.functions = new_functions 

391 self._commit_and_notify() 

392 logger.debug(f"Removed function at index {event.index}") 

393 else: 

394 logger.warning(f"Invalid index for remove function: {getattr(event, 'index', 'N/A')}") 

395 

396 async def on_function_pane_widget_add_function(self, event: Message) -> None: 

397 """Handle add function message from FunctionPaneWidget.""" 

398 if hasattr(event, 'insert_index'): 

399 insert_index = min(event.insert_index, len(self.functions)) # Clamp to valid range 

400 await self._add_function_at_index(insert_index) 

401 else: 

402 logger.warning(f"Invalid add function event: missing insert_index") 

403 

404 def on_function_pane_widget_move_function(self, event: Message) -> None: 

405 """Handle move function message from FunctionPaneWidget.""" 

406 if not (hasattr(event, 'index') and hasattr(event, 'direction')): 

407 return 

408 

409 index = event.index 

410 direction = event.direction 

411 

412 if not (0 <= index < len(self.functions)): 

413 return 

414 

415 new_index = index + direction 

416 if not (0 <= new_index < len(self.functions)): 

417 return 

418 

419 new_functions = self.functions.copy() 

420 new_functions[index], new_functions[new_index] = new_functions[new_index], new_functions[index] 

421 self.functions = new_functions 

422 self._commit_and_notify() 

423 logger.debug(f"Moved function from index {index} to {new_index}") 

424 

425 async def _add_function(self) -> None: 

426 """Add a new function to the end of the list.""" 

427 await self._add_function_at_index(len(self.functions)) 

428 

429 async def _add_function_at_index(self, insert_index: int) -> None: 

430 """Add a new function at the specified index.""" 

431 from openhcs.textual_tui.windows import FunctionSelectorWindow 

432 from textual.css.query import NoMatches 

433 

434 def handle_function_selection(selected_function: Optional[Callable]) -> None: 

435 if selected_function: 

436 # Insert function at the specified index 

437 new_functions = self.functions.copy() 

438 new_functions.insert(insert_index, (selected_function, {})) 

439 self.functions = new_functions 

440 self._commit_and_notify() 

441 logger.debug(f"Added function: {selected_function.__name__} at index {insert_index}") 

442 

443 # Use window-based function selector (follows ConfigWindow pattern) 

444 try: 

445 window = self.app.query_one(FunctionSelectorWindow) 

446 # Window exists, update it and open 

447 window.on_result_callback = handle_function_selection 

448 window.open_state = True 

449 except NoMatches: 

450 # Expected case: window doesn't exist yet, create new one 

451 window = FunctionSelectorWindow(on_result_callback=handle_function_selection) 

452 await self.app.mount(window) 

453 window.open_state = True 

454 

455 async def _change_function(self, index: int) -> None: 

456 """Change function at specified index.""" 

457 from openhcs.textual_tui.windows import FunctionSelectorWindow 

458 from textual.css.query import NoMatches 

459 

460 if 0 <= index < len(self.functions): 

461 current_func, _ = self.functions[index] 

462 

463 def handle_function_selection(selected_function: Optional[Callable]) -> None: 

464 if selected_function: 

465 # Replace function but keep existing kwargs structure 

466 new_functions = self.functions.copy() 

467 new_functions[index] = (selected_function, {}) 

468 self.functions = new_functions 

469 self._commit_and_notify() 

470 logger.debug(f"Changed function at index {index} to: {selected_function.__name__}") 

471 

472 # Use window-based function selector (follows ConfigWindow pattern) 

473 try: 

474 window = self.app.query_one(FunctionSelectorWindow) 

475 # Window exists, update it and open 

476 window.current_function = current_func 

477 window.on_result_callback = handle_function_selection 

478 window.open_state = True 

479 except NoMatches: 

480 # Expected case: window doesn't exist yet, create new one 

481 window = FunctionSelectorWindow(current_function=current_func, on_result_callback=handle_function_selection) 

482 await self.app.mount(window) 

483 window.open_state = True 

484 

485 def _commit_and_notify(self) -> None: 

486 """Commit changes and notify parent of function pattern change.""" 

487 # Update pattern data before notifying 

488 self._update_pattern_data() 

489 # Post message to notify parent (DualEditorScreen) of changes 

490 self.post_message(self.FunctionPatternChanged()) 

491 

492 async def _load_func(self) -> None: 

493 """Load function pattern from .func file.""" 

494 logger.debug(f"🔍 LOAD FUNC: _load_func() called - starting file browser...") 

495 

496 from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode 

497 from openhcs.constants.constants import Backend 

498 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey 

499 

500 def handle_result(result): 

501 logger.debug(f"🔍 LOAD FUNC: File browser callback received result: {result} (type: {type(result)})") 

502 

503 # Handle both single Path and list of Paths (disable multi-loading, take first only) 

504 file_path = None 

505 if isinstance(result, Path): 

506 file_path = result 

507 logger.debug(f"🔍 LOAD FUNC: Single Path received: {file_path}") 

508 elif isinstance(result, list) and len(result) > 0: 

509 file_path = result[0] # Take only the first file (disable multi-loading) 

510 if len(result) > 1: 

511 logger.warning(f"🔍 LOAD FUNC: Multiple files selected, using only first: {file_path}") 

512 else: 

513 logger.debug(f"🔍 LOAD FUNC: List with single Path received: {file_path}") 

514 

515 if file_path and isinstance(file_path, Path): 

516 logger.debug(f"🔍 LOAD FUNC: Valid Path extracted, calling _load_pattern_from_file({file_path})") 

517 self._load_pattern_from_file(file_path) 

518 else: 

519 logger.warning(f"🔍 LOAD FUNC: No valid Path found in result: {result}") 

520 

521 # Use window-based file browser 

522 logger.debug(f"🔍 LOAD FUNC: Opening file browser window...") 

523 from openhcs.textual_tui.services.file_browser_service import SelectionMode 

524 await open_file_browser_window( 

525 app=self.app, 

526 file_manager=self.app.filemanager, 

527 initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS), 

528 backend=Backend.DISK, 

529 title="Load Function Pattern (.func)", 

530 mode=BrowserMode.LOAD, 

531 selection_mode=SelectionMode.FILES_ONLY, 

532 filter_extensions=['.func'], 

533 cache_key=PathCacheKey.FUNCTION_PATTERNS, 

534 on_result_callback=handle_result 

535 ) 

536 logger.debug(f"🔍 LOAD FUNC: File browser window opened, waiting for user selection...") 

537 

538 async def _save_func_as(self) -> None: 

539 """Save function pattern to .func file.""" 

540 from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode 

541 from openhcs.constants.constants import Backend 

542 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey 

543 

544 def handle_result(result): 

545 if result and isinstance(result, Path): 

546 self._save_pattern_to_file(result) 

547 

548 # Use window-based file browser 

549 from openhcs.textual_tui.services.file_browser_service import SelectionMode 

550 await open_file_browser_window( 

551 app=self.app, 

552 file_manager=self.app.filemanager, 

553 initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS), 

554 backend=Backend.DISK, 

555 title="Save Function Pattern (.func)", 

556 mode=BrowserMode.SAVE, 

557 selection_mode=SelectionMode.FILES_ONLY, 

558 filter_extensions=['.func'], 

559 default_filename="pattern.func", 

560 cache_key=PathCacheKey.FUNCTION_PATTERNS, 

561 on_result_callback=handle_result 

562 ) 

563 

564 def _load_pattern_from_file(self, file_path: Path) -> None: 

565 """Load pattern from .func file.""" 

566 logger.debug(f"🔍 LOAD FUNC: _load_pattern_from_file called with: {file_path}") 

567 logger.debug(f"🔍 LOAD FUNC: File exists: {file_path.exists()}") 

568 logger.debug(f"🔍 LOAD FUNC: File size: {file_path.stat().st_size if file_path.exists() else 'N/A'} bytes") 

569 

570 import dill as pickle 

571 try: 

572 logger.debug(f"🔍 LOAD FUNC: Opening file for reading...") 

573 with open(file_path, 'rb') as f: 

574 pattern = pickle.load(f) 

575 logger.debug(f"🔍 LOAD FUNC: Successfully loaded pattern from pickle: {pattern}") 

576 logger.debug(f"🔍 LOAD FUNC: Pattern type: {type(pattern)}") 

577 

578 # Log current state before loading 

579 logger.debug(f"🔍 LOAD FUNC: BEFORE - functions: {len(self.functions)} items") 

580 logger.debug(f"🔍 LOAD FUNC: BEFORE - is_dict_mode: {self.is_dict_mode}") 

581 logger.debug(f"🔍 LOAD FUNC: BEFORE - selected_channel: {self.selected_channel}") 

582 

583 # Process pattern and create NEW objects (like add button does) 

584 # This is crucial - reactive system only triggers on new object assignment 

585 if pattern is None: 

586 new_pattern_data = [] 

587 new_is_dict_mode = False 

588 new_functions = [] 

589 new_selected_channel = None 

590 elif callable(pattern): 

591 # Single callable: treat as [(callable, {})] 

592 new_pattern_data = [(pattern, {})] 

593 new_is_dict_mode = False 

594 new_functions = [(pattern, {})] 

595 new_selected_channel = None 

596 elif isinstance(pattern, tuple) and len(pattern) == 2 and callable(pattern[0]) and isinstance(pattern[1], dict): 

597 # Single tuple (callable, kwargs): treat as [(callable, kwargs)] 

598 new_pattern_data = [pattern] 

599 new_is_dict_mode = False 

600 new_functions = [pattern] 

601 new_selected_channel = None 

602 elif isinstance(pattern, list): 

603 new_pattern_data = pattern 

604 new_is_dict_mode = False 

605 new_functions = self._normalize_function_list(pattern) 

606 new_selected_channel = None 

607 elif isinstance(pattern, dict): 

608 # Convert any integer keys to string keys for consistency 

609 normalized_dict = {} 

610 for key, value in pattern.items(): 

611 str_key = str(key) 

612 normalized_dict[str_key] = value 

613 logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)") 

614 

615 new_pattern_data = normalized_dict 

616 new_is_dict_mode = True 

617 # Set first channel as selected, or None if no channels 

618 if normalized_dict: 

619 first_channel = next(iter(normalized_dict)) 

620 new_selected_channel = first_channel 

621 new_functions = self._normalize_function_list(normalized_dict.get(first_channel, [])) 

622 else: 

623 new_selected_channel = None 

624 new_functions = [] 

625 else: 

626 logger.warning(f"Unknown pattern type: {type(pattern)}, using empty list") 

627 new_pattern_data = [] 

628 new_is_dict_mode = False 

629 new_functions = [] 

630 new_selected_channel = None 

631 

632 # Assign NEW objects to reactive properties (like add button does) 

633 logger.debug(f"🔍 LOAD FUNC: Assigning new values to reactive properties...") 

634 logger.debug(f"🔍 LOAD FUNC: new_pattern_data: {new_pattern_data}") 

635 logger.debug(f"🔍 LOAD FUNC: new_is_dict_mode: {new_is_dict_mode}") 

636 logger.debug(f"🔍 LOAD FUNC: new_functions: {len(new_functions)} items: {new_functions}") 

637 logger.debug(f"🔍 LOAD FUNC: new_selected_channel: {new_selected_channel}") 

638 

639 self.pattern_data = new_pattern_data 

640 logger.debug(f"🔍 LOAD FUNC: ✅ Assigned pattern_data") 

641 

642 self.is_dict_mode = new_is_dict_mode 

643 logger.debug(f"🔍 LOAD FUNC: ✅ Assigned is_dict_mode") 

644 

645 self.functions = new_functions # This triggers reactive system! 

646 logger.debug(f"🔍 LOAD FUNC: ✅ Assigned functions - THIS SHOULD TRIGGER REACTIVE SYSTEM!") 

647 

648 self.selected_channel = new_selected_channel 

649 logger.debug(f"🔍 LOAD FUNC: ✅ Assigned selected_channel") 

650 

651 logger.debug(f"🔍 LOAD FUNC: Calling _commit_and_notify...") 

652 self._commit_and_notify() 

653 

654 # Final state logging 

655 logger.debug(f"🔍 LOAD FUNC: FINAL - functions: {len(self.functions)} items") 

656 logger.debug(f"🔍 LOAD FUNC: FINAL - is_dict_mode: {self.is_dict_mode}") 

657 logger.debug(f"🔍 LOAD FUNC: FINAL - selected_channel: {self.selected_channel}") 

658 logger.debug(f"🔍 LOAD FUNC: ✅ Successfully completed loading process!") 

659 

660 except Exception as e: 

661 logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Failed to load pattern: {e}") 

662 import traceback 

663 logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Traceback: {traceback.format_exc()}") 

664 

665 def _save_pattern_to_file(self, file_path: Path) -> None: 

666 """Save pattern to .func file.""" 

667 import pickle 

668 try: 

669 with open(file_path, 'wb') as f: 

670 pickle.dump(self.current_pattern, f) 

671 except Exception as e: 

672 logger.error(f"Failed to save pattern: {e}") 

673 

674 def _edit_in_vim(self) -> None: 

675 """Edit function pattern as Python code in terminal window.""" 

676 logger.debug("Edit button pressed - opening terminal editor") 

677 

678 try: 

679 # Use debug module's pattern formatting with proper imports 

680 from openhcs.debug.pickle_to_python import generate_readable_function_repr 

681 from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher 

682 

683 # Update pattern data first 

684 self._update_pattern_data() 

685 

686 # Generate complete Python code with imports (like debug module does) 

687 python_code = self._generate_complete_python_code() 

688 

689 # Create terminal launcher 

690 launcher = TerminalLauncher(self.app) 

691 

692 # Launch editor in terminal window with callback 

693 self.app.call_later( 

694 launcher.launch_editor_for_file, 

695 python_code, 

696 '.py', 

697 self._handle_edited_pattern 

698 ) 

699 

700 except Exception as e: 

701 logger.error(f"Failed to launch terminal editor: {e}") 

702 self.app.show_error("Edit Error", f"Failed to launch terminal editor: {str(e)}") 

703 

704 def _handle_edited_pattern(self, edited_code: str) -> None: 

705 """Handle the edited pattern code from terminal editor.""" 

706 try: 

707 # Execute the code (it has all necessary imports) 

708 namespace = {} 

709 exec(edited_code, namespace) 

710 

711 # Get the pattern from the namespace 

712 if 'pattern' in namespace: 

713 new_pattern = namespace['pattern'] 

714 self._apply_edited_pattern(new_pattern) 

715 else: 

716 self.app.show_error("Parse Error", "No 'pattern = ...' assignment found in edited code") 

717 

718 except SyntaxError as e: 

719 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}") 

720 except Exception as e: 

721 logger.error(f"Failed to parse edited pattern: {e}") 

722 self.app.show_error("Edit Error", f"Failed to parse edited pattern: {str(e)}") 

723 

724 def _generate_complete_python_code(self) -> str: 

725 """Generate complete Python code with imports (following debug module approach).""" 

726 # Use complete function pattern code generation from pickle_to_python 

727 from openhcs.debug.pickle_to_python import generate_complete_function_pattern_code 

728 

729 # Disable clean_mode to preserve all parameters when same function appears multiple times 

730 # This prevents parsing issues when the same function has different parameter sets 

731 return generate_complete_function_pattern_code(self.pattern_data, clean_mode=False) 

732 

733 

734 

735 def _apply_edited_pattern(self, new_pattern): 

736 """Apply the edited pattern back to the TUI.""" 

737 try: 

738 if self.is_dict_mode: 

739 if isinstance(new_pattern, dict): 

740 self.pattern_data = new_pattern 

741 # Update current channel if it exists in new pattern 

742 if self.selected_channel and self.selected_channel in new_pattern: 

743 self.functions = self._normalize_function_list(new_pattern[self.selected_channel]) 

744 else: 

745 # Select first channel 

746 if new_pattern: 

747 self.selected_channel = next(iter(new_pattern)) 

748 self.functions = self._normalize_function_list(new_pattern[self.selected_channel]) 

749 else: 

750 self.functions = [] 

751 else: 

752 raise ValueError("Expected dict pattern for dict mode") 

753 else: 

754 if isinstance(new_pattern, list): 

755 self.pattern_data = new_pattern 

756 self.functions = self._normalize_function_list(new_pattern) 

757 elif callable(new_pattern): 

758 # Single callable: treat as [(callable, {})] 

759 self.pattern_data = [(new_pattern, {})] 

760 self.functions = [(new_pattern, {})] 

761 elif isinstance(new_pattern, tuple) and len(new_pattern) == 2 and callable(new_pattern[0]) and isinstance(new_pattern[1], dict): 

762 # Single tuple (callable, kwargs): treat as [(callable, kwargs)] 

763 self.pattern_data = [new_pattern] 

764 self.functions = [new_pattern] 

765 else: 

766 raise ValueError(f"Expected list, callable, or (callable, dict) tuple pattern for list mode, got {type(new_pattern)}") 

767 

768 # Refresh the UI and notify of changes 

769 self.refresh() 

770 self._commit_and_notify() 

771 

772 except Exception as e: 

773 self.app.show_error("Apply Error", f"Failed to apply edited pattern: {str(e)}") 

774 

775 def _generate_pattern_python_code(self) -> str: 

776 """Generate Python code representation of the current pattern.""" 

777 # Update pattern data first 

778 self._update_pattern_data() 

779 

780 # Use centralized code generation from debug module to ensure consistent collision resolution 

781 from openhcs.debug.pickle_to_python import generate_complete_function_pattern_code 

782 return generate_complete_function_pattern_code(self.pattern_data) 

783 

784 

785 

786 def _get_component_button_text(self) -> str: 

787 """Get text for the component selection button based on current group_by setting.""" 

788 if self.current_group_by is None or self.current_group_by == GroupBy.NONE: 

789 return "Component: None" 

790 

791 # Use group_by.value.value.title() for dynamic component type display 

792 # GroupBy.value is now a VariableComponents enum, so we need .value to get the string 

793 component_type = self.current_group_by.value.value.title() 

794 

795 if self.is_dict_mode and self.selected_channel is not None: 

796 # Try to get metadata name for the selected component 

797 display_name = self._get_component_display_name(self.selected_channel) 

798 return f"{component_type}: {display_name}" 

799 return f"{component_type}: None" 

800 

801 def _get_component_display_name(self, component_key: str) -> str: 

802 """Get display name for component key, using metadata if available.""" 

803 # Try to get metadata name from orchestrator 

804 orchestrator = self._get_current_orchestrator() 

805 if orchestrator and self.current_group_by: 

806 try: 

807 metadata_name = orchestrator.metadata_cache.get_component_metadata(self.current_group_by, component_key) 

808 if metadata_name: 

809 return metadata_name 

810 except Exception as e: 

811 logger.debug(f"Could not get metadata for {self.current_group_by.value} {component_key}: {e}") 

812 

813 # Fallback to component key 

814 return component_key 

815 

816 def _refresh_component_button(self) -> None: 

817 """Refresh the component button text and state.""" 

818 try: 

819 component_button = self.query_one("#component_btn", Button) 

820 component_button.label = self._get_component_button_text() 

821 component_button.disabled = self._is_component_button_disabled() 

822 except Exception as e: 

823 logger.debug(f"Could not refresh component button: {e}") 

824 

825 def _is_component_button_disabled(self) -> bool: 

826 """Check if component selection button should be disabled.""" 

827 return ( 

828 self.current_group_by is None or 

829 self.current_group_by == GroupBy.NONE or 

830 (self.current_variable_components and 

831 self.current_group_by.value in [vc.value for vc in self.current_variable_components]) 

832 ) 

833 

834 async def _show_component_selection_dialog(self) -> None: 

835 """Show the component selection dialog for the current group_by setting.""" 

836 try: 

837 # Check if component selection is disabled 

838 if self._is_component_button_disabled(): 

839 logger.debug("Component selection is disabled") 

840 return 

841 

842 # Get available components from orchestrator using current group_by 

843 orchestrator = self._get_current_orchestrator() 

844 if orchestrator is None: 

845 logger.warning("No orchestrator available for component selection") 

846 return 

847 

848 available_components = orchestrator.get_component_keys(self.current_group_by) 

849 if not available_components: 

850 component_type = self.current_group_by.value 

851 logger.warning(f"No {component_type} values found in current plate") 

852 return 

853 

854 # Get currently selected components from cache or current pattern 

855 if self.current_group_by in self.component_selections: 

856 # Use cached selection for this group_by 

857 selected_components = self.component_selections[self.current_group_by] 

858 logger.debug(f"Step '{self.step_identifier}': Using cached selection for {self.current_group_by.value}: {selected_components}") 

859 elif self.is_dict_mode and isinstance(self.pattern_data, dict): 

860 # Fallback to current pattern keys 

861 selected_components = list(self.pattern_data.keys()) 

862 else: 

863 selected_components = [] 

864 

865 # Show window with dynamic component type 

866 from openhcs.textual_tui.windows import GroupBySelectorWindow 

867 from textual.css.query import NoMatches 

868 

869 def handle_selection(result_components): 

870 if result_components is not None: 

871 self._update_components(result_components) 

872 

873 # Use window-based group-by selector (follows ConfigWindow pattern) 

874 try: 

875 window = self.app.query_one(GroupBySelectorWindow) 

876 # Window exists, update it and open 

877 window.available_channels = available_components 

878 window.selected_channels = selected_components 

879 window.component_type = self.current_group_by.value 

880 window.orchestrator = orchestrator 

881 window.on_result_callback = handle_selection 

882 window.open_state = True 

883 except NoMatches: 

884 # Expected case: window doesn't exist yet, create new one 

885 window = GroupBySelectorWindow( 

886 available_channels=available_components, 

887 selected_channels=selected_components, 

888 on_result_callback=handle_selection, 

889 component_type=self.current_group_by.value, 

890 orchestrator=orchestrator 

891 ) 

892 await self.app.mount(window) 

893 window.open_state = True 

894 

895 except Exception as e: 

896 component_type = self.current_group_by.value if self.current_group_by else "component" 

897 logger.error(f"Failed to show {component_type} selection dialog: {e}") 

898 

899 def _update_components(self, new_components: List[str]) -> None: 

900 """ 

901 Update the pattern based on new component selection. 

902 

903 Uses string component keys directly to match the pattern detection system. 

904 Pattern detection always returns string component values (e.g., '1', '2', '3') when 

905 grouping by any component, so function patterns use string keys for consistency. 

906 """ 

907 if not new_components: 

908 # No components selected - revert to list mode 

909 if self.is_dict_mode: 

910 # Save current functions to list mode 

911 self.pattern_data = self.functions 

912 self.is_dict_mode = False 

913 self.selected_channel = None 

914 logger.debug("Reverted to list mode (no components selected)") 

915 else: 

916 # Use component strings directly - no conversion needed 

917 component_keys = new_components 

918 

919 # Components selected - ensure dict mode 

920 if not self.is_dict_mode: 

921 # Convert to dict mode 

922 current_functions = self.functions 

923 self.pattern_data = {component_keys[0]: current_functions} 

924 self.is_dict_mode = True 

925 self.selected_channel = component_keys[0] 

926 

927 # Add other components with empty functions 

928 for component_key in component_keys[1:]: 

929 self.pattern_data[component_key] = [] 

930 else: 

931 # Already in dict mode - update components 

932 old_pattern = self.pattern_data.copy() if isinstance(self.pattern_data, dict) else {} 

933 new_pattern = {} 

934 

935 # Keep existing functions for components that remain 

936 for component_key in component_keys: 

937 # Check both string and integer keys for backward compatibility 

938 if component_key in old_pattern: 

939 new_pattern[component_key] = old_pattern[component_key] 

940 else: 

941 # Try integer key for backward compatibility 

942 try: 

943 component_int = int(component_key) 

944 if component_int in old_pattern: 

945 new_pattern[component_key] = old_pattern[component_int] 

946 else: 

947 new_pattern[component_key] = [] 

948 except ValueError: 

949 new_pattern[component_key] = [] 

950 

951 self.pattern_data = new_pattern 

952 

953 # Update selected component if needed 

954 if self.selected_channel not in component_keys: 

955 self.selected_channel = component_keys[0] if component_keys else None 

956 if self.selected_channel: 

957 self.functions = new_pattern.get(self.selected_channel, []) 

958 

959 # Save selection to cache for current group_by 

960 if self.current_group_by is not None and self.current_group_by != GroupBy.NONE: 

961 self.component_selections[self.current_group_by] = new_components 

962 logger.debug(f"Step '{self.step_identifier}': Cached selection for {self.current_group_by.value}: {new_components}") 

963 

964 self._commit_and_notify() 

965 self._refresh_component_button() 

966 logger.debug(f"Updated components: {new_components}") 

967 

968 def _navigate_channel(self, direction: int) -> None: 

969 """Navigate to next/previous channel (with looping).""" 

970 if not self.is_dict_mode or not isinstance(self.pattern_data, dict): 

971 return 

972 

973 channels = sorted(self.pattern_data.keys()) 

974 if len(channels) <= 1: 

975 return 

976 

977 try: 

978 current_index = channels.index(self.selected_channel) 

979 new_index = (current_index + direction) % len(channels) 

980 new_channel = channels[new_index] 

981 

982 self._switch_to_channel(new_channel) 

983 logger.debug(f"Navigated to channel {new_channel}") 

984 except (ValueError, IndexError): 

985 logger.warning(f"Failed to navigate channels: current={self.selected_channel}, channels={channels}") 

986 

987 

988 

989 def _get_current_orchestrator(self): 

990 """Get the current orchestrator from the app.""" 

991 try: 

992 # Get from app - PlateManagerWidget is now in PipelinePlateWindow 

993 if hasattr(self.app, 'query_one'): 

994 from openhcs.textual_tui.windows import PipelinePlateWindow 

995 from openhcs.textual_tui.widgets.plate_manager import PlateManagerWidget 

996 

997 # Try to find the PipelinePlateWindow first 

998 try: 

999 pipeline_plate_window = self.app.query_one(PipelinePlateWindow) 

1000 plate_manager = pipeline_plate_window.plate_widget 

1001 except: 

1002 # Fallback: try to find PlateManagerWidget directly in the app 

1003 plate_manager = self.app.query_one(PlateManagerWidget) 

1004 

1005 # Use selected_plate (not current_plate!) 

1006 selected_plate = plate_manager.selected_plate 

1007 if selected_plate and selected_plate in plate_manager.orchestrators: 

1008 orchestrator = plate_manager.orchestrators[selected_plate] 

1009 if not orchestrator.is_initialized(): 

1010 orchestrator.initialize() 

1011 return orchestrator 

1012 return None 

1013 except Exception as e: 

1014 logger.error(f"Failed to get orchestrator: {e}") 

1015 return None 

1016