Coverage for openhcs/textual_tui/widgets/function_list_editor.py: 0.0%

658 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1"""Function list editor widget - port of function_list_manager.py to Textual.""" 

2 

3import logging 

4from pathlib import Path 

5from typing import List, Union, Dict, Any, Optional, Callable # Added Optional, Callable 

6from textual.containers import ScrollableContainer, Container, Horizontal, Center 

7from textual.widgets import Button, Static, Select 

8from textual.app import ComposeResult 

9from textual.reactive import reactive 

10from textual.message import Message # Added Message 

11 

12from openhcs.textual_tui.services.function_registry_service import FunctionRegistryService 

13from openhcs.textual_tui.services.pattern_data_manager import PatternDataManager 

14from openhcs.textual_tui.widgets.function_pane import FunctionPaneWidget 

15from openhcs.constants.constants import GroupBy, VariableComponents 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class FunctionListEditorWidget(Container): 

21 """ 

22 Scrollable function list editor. 

23 

24 Ports the function display and management logic from function_list_manager.py 

25 """ 

26 

class FunctionPatternChanged(Message):
    """Posted to the parent whenever the edited function pattern changes."""

30 

# Reactive properties for automatic UI updates.

# Function entries being edited; structural changes (add/remove) should
# trigger recomposition.
functions = reactive(list, recompose=True)

# The actual pattern (List or Dict).
pattern_data = reactive(list, recompose=False)

# Whether we're in channel-specific mode - affects UI layout.
is_dict_mode = reactive(False, recompose=True)

# Currently selected channel - affects button text.
selected_channel = reactive(None, recompose=True)

# Available channels from orchestrator.
available_channels = reactive(list)

# Step configuration reactive properties for dynamic component selection.

# Current GroupBy setting from step editor.
current_group_by = reactive(None, recompose=True)

# Current VariableComponents list from step editor.
current_variable_components = reactive(list, recompose=True)

41 

def __init__(
    self,
    initial_functions: Union[List, Dict, Callable, None] = None,
    step_identifier: Optional[str] = None,
):
    """Create the editor and seed it from an initial function pattern.

    Args:
        initial_functions: Starting pattern. May be ``None`` (empty list),
            a single callable, a list of functions / ``(func, kwargs)``
            tuples, or a dict mapping a channel key to such a list.
        step_identifier: Label used to tell widgets apart in log output and
            caches. Defaults to ``widget_<id(self)>`` when omitted or empty.
    """
    super().__init__()

    # Initialize services (reuse existing business logic)
    self.registry_service = FunctionRegistryService()
    self.data_manager = PatternDataManager()  # Not heavily used yet, but available

    # Step identifier for cache isolation (optional, defaults to widget instance id)
    self.step_identifier = step_identifier or f"widget_{id(self)}"

    # Component selection cache per GroupBy (instance-specific, not shared between steps)
    self.component_selections: Dict[GroupBy, List[str]] = {}

    # Populates pattern_data, is_dict_mode, functions and selected_channel
    self._initialize_pattern_data(initial_functions)

    logger.debug(f"FunctionListEditorWidget initialized for step '{self.step_identifier}' with {len(self.functions)} functions, dict_mode={self.is_dict_mode}")

59 

@property
def current_pattern(self) -> Union[List, Dict]:
    """Return the up-to-date pattern for parent widgets.

    The stored pattern is refreshed from the current functions first. A dict
    pattern is returned as a new dict whose keys are all converted with
    ``str()``, because the pattern detection system always uses string
    component values. A non-dict pattern is returned as stored.
    """
    self._update_pattern_data()

    stored = self.pattern_data
    if not isinstance(stored, dict):
        return stored

    # Migration fix: integer channel keys become string keys.
    return {str(channel): entries for channel, entries in stored.items()}

75 

def watch_functions(self, new_functions: List) -> None:
    """Keep pattern_data in step when ``functions`` changes.

    The new value itself is not used here; ``_update_pattern_data`` reads
    ``self.functions`` directly.
    """
    self._update_pattern_data()

80 

81 def _initialize_pattern_data(self, initial_functions: Union[List, Dict, callable, None]) -> None: 

82 """Initialize pattern data and determine mode.""" 

83 if initial_functions is None: 

84 self.pattern_data = [] 

85 self.is_dict_mode = False 

86 self.functions = [] 

87 elif callable(initial_functions): 

88 self.pattern_data = [(initial_functions, {})] 

89 self.is_dict_mode = False 

90 self.functions = [(initial_functions, {})] 

91 elif isinstance(initial_functions, list): 

92 self.pattern_data = initial_functions 

93 self.is_dict_mode = False 

94 self.functions = self._normalize_function_list(initial_functions) 

95 elif isinstance(initial_functions, dict): 

96 # Convert any integer keys to string keys for consistency 

97 normalized_dict = {} 

98 for key, value in initial_functions.items(): 

99 str_key = str(key) # Ensure all keys are strings 

100 normalized_dict[str_key] = value 

101 logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)") 

102 

103 self.pattern_data = normalized_dict 

104 self.is_dict_mode = True 

105 # Set first channel as selected, or empty if no channels 

106 if normalized_dict: 

107 first_channel = next(iter(normalized_dict)) 

108 self.selected_channel = first_channel 

109 self.functions = self._normalize_function_list(normalized_dict.get(first_channel, [])) 

110 else: 

111 self.selected_channel = None 

112 self.functions = [] 

113 else: 

114 logger.warning(f"Unknown initial_functions type: {type(initial_functions)}, using empty list") 

115 self.pattern_data = [] 

116 self.is_dict_mode = False 

117 self.functions = [] 

118 

119 

120 def _normalize_function_list(self, func_list: List[Any]) -> List[tuple[Callable, Dict]]: 

121 """Ensures all items in a function list are (callable, kwargs) tuples.""" 

122 normalized = [] 

123 for item in func_list: 

124 if isinstance(item, tuple) and len(item) == 2 and callable(item[0]) and isinstance(item[1], dict): 

125 normalized.append(item) 

126 elif callable(item): 

127 normalized.append((item, {})) 

128 else: 

129 logger.warning(f"Skipping invalid item in function list: {item}") 

130 return normalized 

131 

132 def _commit_and_notify(self) -> None: 

133 """Post a change message to notify parent of function pattern changes.""" 

134 self.post_message(self.FunctionPatternChanged()) 

135 logger.debug("Posted FunctionPatternChanged message to parent.") 

136 

def _trigger_recomposition(self) -> None:
    """Manually trigger recomposition (e.g. after loading patterns or adding/removing functions)."""
    # Report ``functions`` as mutated so the reactive system reacts even
    # though no new list object was assigned (entries are sometimes edited
    # in place elsewhere in this widget).
    functions_reactive = FunctionListEditorWidget.functions
    self.mutate_reactive(functions_reactive)

142 

def watch_current_group_by(self, old_group_by: Optional[GroupBy], new_group_by: Optional[GroupBy]) -> None:
    """Remember the component selection of the group_by being left.

    When the previous group_by was a real grouping (neither ``None`` nor
    ``GroupBy.NONE``) and the widget holds a dict pattern, the dict's keys
    are cached in ``component_selections`` under the previous group_by.
    Nothing is loaded for the new group_by here; the selection dialog reads
    the cache when it is opened.
    """
    leaving_real_grouping = old_group_by is not None and old_group_by != GroupBy.NONE
    holds_dict_pattern = leaving_real_grouping and self.is_dict_mode and isinstance(self.pattern_data, dict)

    if holds_dict_pattern:
        remembered = list(self.pattern_data.keys())
        self.component_selections[old_group_by] = remembered
        logger.debug(f"Step '{self.step_identifier}': Saved selection for {old_group_by.value}: {remembered}")

    logger.debug(f"Group by changed from {old_group_by} to {new_group_by}")

155 

156 def _update_pattern_data(self) -> None: 

157 """Update pattern_data based on current functions and mode.""" 

158 if self.is_dict_mode and self.selected_channel is not None: 

159 # Save current functions to the selected channel 

160 if not isinstance(self.pattern_data, dict): 

161 self.pattern_data = {} 

162 logger.debug(f"Saving {len(self.functions)} functions to channel {self.selected_channel}") 

163 self.pattern_data[self.selected_channel] = self.functions.copy() # Make a copy to avoid reference issues 

164 else: 

165 # List mode - pattern_data is just the functions list 

166 self.pattern_data = self.functions 

167 

168 

169 

170 def _switch_to_channel(self, channel: Any) -> None: 

171 """Switch to editing functions for a specific channel.""" 

172 if not self.is_dict_mode: 

173 return 

174 

175 # Save current functions first 

176 old_channel = self.selected_channel 

177 logger.debug(f"Switching from channel {old_channel} to {channel}") 

178 logger.debug(f"Current functions before save: {len(self.functions)} functions") 

179 

180 self._update_pattern_data() 

181 

182 # Verify the save worked 

183 if old_channel and isinstance(self.pattern_data, dict): 

184 saved_functions = self.pattern_data.get(old_channel, []) 

185 logger.debug(f"Saved {len(saved_functions)} functions to channel {old_channel}") 

186 

187 # Switch to new channel 

188 self.selected_channel = channel 

189 if isinstance(self.pattern_data, dict): 

190 self.functions = self.pattern_data.get(channel, []) 

191 logger.debug(f"Loaded {len(self.functions)} functions for channel {channel}") 

192 else: 

193 self.functions = [] 

194 

195 # Update button text to show new channel 

196 self._refresh_component_button() 

197 

198 # Channel switch will automatically trigger recomposition via reactive system 

199 

200 def _add_channel_to_pattern(self, channel: Any) -> None: 

201 """Add a new channel (converts to dict mode if needed).""" 

202 if not self.is_dict_mode: 

203 # Convert to dict mode 

204 self.pattern_data = {channel: self.functions} 

205 self.is_dict_mode = True 

206 self.selected_channel = channel 

207 else: 

208 # Add new channel with empty functions 

209 if not isinstance(self.pattern_data, dict): 

210 self.pattern_data = {} 

211 self.pattern_data[channel] = [] 

212 self.selected_channel = channel 

213 self.functions = [] 

214 

215 def _remove_current_channel(self) -> None: 

216 """Remove the currently selected channel.""" 

217 if not self.is_dict_mode or self.selected_channel is None: 

218 return 

219 

220 if isinstance(self.pattern_data, dict): 

221 new_pattern = self.pattern_data.copy() 

222 if self.selected_channel in new_pattern: 

223 del new_pattern[self.selected_channel] 

224 

225 if len(new_pattern) == 0: 

226 # Revert to list mode 

227 self.pattern_data = [] 

228 self.is_dict_mode = False 

229 self.selected_channel = None 

230 self.functions = [] 

231 else: 

232 # Switch to first remaining channel 

233 self.pattern_data = new_pattern 

234 first_channel = next(iter(new_pattern)) 

235 self.selected_channel = first_channel 

236 self.functions = new_pattern[first_channel] 

237 

def compose(self) -> ComposeResult:
    """Build the widget tree: title, button row, then the scrollable function list."""
    from textual.containers import Vertical

    with Vertical():
        # Fixed header with title
        yield Static("[bold]Functions[/bold]")

        # Button row - only as tall as the buttons need
        with Horizontal(id="function_list_header") as header_row:
            header_row.styles.height = "auto"

            # Left spacer
            yield Static("")

            # Main button group, sized to its content
            with Horizontal() as button_group:
                button_group.styles.width = "auto"
                for label, button_id in (
                    ("Add", "add_function_btn"),
                    ("Load", "load_func_btn"),
                    ("Save As", "save_func_as_btn"),
                    ("Code", "edit_vim_btn"),
                ):
                    yield Button(label, id=button_id, compact=True)

                # Component selection button; text and enabled state come from helpers
                component_button = Button(
                    self._get_component_button_text(), id="component_btn", compact=True
                )
                component_button.disabled = self._is_component_button_disabled()
                yield component_button

                # Channel navigation only makes sense with more than one channel
                has_several_channels = (
                    self.is_dict_mode
                    and isinstance(self.pattern_data, dict)
                    and len(self.pattern_data) > 1
                )
                if has_several_channels:
                    yield Button("<", id="prev_channel_btn", compact=True)
                    yield Button(">", id="next_channel_btn", compact=True)

            # Right spacer
            yield Static("")

        # Scrollable content area takes the remaining vertical space
        with ScrollableContainer(id="function_list_content") as scroll_area:
            scroll_area.styles.height = "1fr"

            if self.functions:
                for position, func_item in enumerate(self.functions):
                    pane = FunctionPaneWidget(func_item, position)
                    pane.styles.width = "100%"
                    pane.styles.height = "auto"  # only as tall as its content
                    yield pane
            else:
                placeholder = Static("No functions defined. Click 'Add Function' to begin.")
                placeholder.styles.width = "100%"
                placeholder.styles.height = "100%"
                yield placeholder

290 

291 

292 

async def on_button_pressed(self, event: Button.Pressed) -> None:
    """Route a button press to its handler based on the button id.

    Unknown ids are logged as a warning and otherwise ignored.
    """
    button_id = event.button.id
    logger.debug(f"🔍 BUTTON: Button pressed: {button_id}")

    # button id -> (debug message, handler method name, positional args, handler is awaited)
    routes = {
        "add_function_btn": ("🔍 BUTTON: Add function button pressed", "_add_function", (), True),
        "load_func_btn": ("🔍 BUTTON: Load func button pressed - calling _load_func()", "_load_func", (), True),
        "save_func_as_btn": ("🔍 BUTTON: Save func as button pressed", "_save_func_as", (), True),
        "edit_vim_btn": ("🔍 BUTTON: Edit vim button pressed", "_edit_in_vim", (), False),
        "component_btn": ("🔍 BUTTON: Component button pressed", "_show_component_selection_dialog", (), True),
        "prev_channel_btn": ("🔍 BUTTON: Previous channel button pressed", "_navigate_channel", (-1,), False),
        "next_channel_btn": ("🔍 BUTTON: Next channel button pressed", "_navigate_channel", (1,), False),
    }

    route = routes.get(button_id)
    if route is None:
        logger.warning(f"🔍 BUTTON: Unknown button pressed: {button_id}")
        return

    message, handler_name, args, is_awaited = route
    logger.debug(message)
    # The handler is looked up only for the pressed button.
    outcome = getattr(self, handler_name)(*args)
    if is_awaited:
        await outcome

320 

321 

322 

def on_function_pane_widget_parameter_changed(self, event: Message) -> None:
    """Apply a parameter edit coming from a FunctionPaneWidget.

    The event must carry ``index``, ``param_name`` and ``value``; events that
    lack any of them, or whose index is out of range, are ignored. The value
    is converted to the parameter's declared type where possible, stored in a
    fresh kwargs dict for that function, and the parent is notified.
    """
    if not (hasattr(event, 'index') and hasattr(event, 'param_name') and hasattr(event, 'value')):
        return
    if not (0 <= event.index < len(self.functions)):
        return

    func, kwargs = self.functions[event.index]
    converted_value = self._coerce_pane_parameter_value(func, event.param_name, event.value)

    new_kwargs = kwargs.copy()
    new_kwargs[event.param_name] = converted_value

    # Update functions list WITHOUT triggering recomposition (to prevent focus loss):
    # the existing list is modified in place instead of assigning a new list.
    self.functions[event.index] = (func, new_kwargs)

    # In-place edits do not go through assignment, so sync pattern data manually.
    self._update_pattern_data()

    # Notify parent
    self.post_message(self.FunctionPatternChanged())
    logger.debug(f"Updated parameter {event.param_name}={converted_value} (type: {type(converted_value)}) for function {event.index}")


def _coerce_pane_parameter_value(self, func: Callable, param_name: str, raw_value: Any) -> Any:
    """Convert a raw widget value to the type declared for ``param_name`` on ``func``.

    Only string values for parameters known to the signature analysis are
    converted: a blank string becomes the parameter's default value, and a
    non-blank string is parsed as float, int, bool or an Enum member
    according to the declared type. Everything else, including values that
    fail to convert, is returned unchanged.
    """
    try:
        from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer
        from enum import Enum

        param_info = SignatureAnalyzer.analyze(func)
        if param_name not in param_info or not isinstance(raw_value, str):
            return raw_value

        details = param_info[param_name]
        if raw_value.strip() == "":
            # Empty parameter - use default value
            return details.default_value

        param_type = details.param_type
        if param_type == float:
            return float(raw_value)
        if param_type == int:
            return int(raw_value)
        if param_type == bool:
            return raw_value.lower() in ('true', '1', 'yes', 'on')
        # issubclass also accepts enums that derive from Enum indirectly;
        # the isinstance guard keeps non-class annotations out of issubclass.
        if isinstance(param_type, type) and issubclass(param_type, Enum):
            return param_type(raw_value)
        return raw_value
    except (ValueError, TypeError, AttributeError) as e:
        logger.warning(f"Failed to convert parameter '{param_name}' value '{raw_value}': {e}")
        return raw_value  # Keep original value on conversion failure

374 

async def on_function_pane_widget_change_function(self, event: Message) -> None:
    """Open the function selector for the pane that asked to change its function.

    Events without an ``index`` or with an out-of-range index are ignored.
    """
    if not hasattr(event, 'index'):
        return
    if not (0 <= event.index < len(self.functions)):
        return
    await self._change_function(event.index)

379 

def on_function_pane_widget_remove_function(self, event: Message) -> None:
    """Remove the function at ``event.index`` and notify the parent.

    A missing or out-of-range index is logged as a warning and ignored.
    """
    index_ok = hasattr(event, 'index') and 0 <= event.index < len(self.functions)
    if not index_ok:
        logger.warning(f"Invalid index for remove function: {getattr(event, 'index', 'N/A')}")
        return

    # A new list object is assigned rather than mutating the existing one.
    remaining = self.functions[:event.index]
    remaining += self.functions[event.index + 1:]
    self.functions = remaining
    self._commit_and_notify()
    logger.debug(f"Removed function at index {event.index}")

389 

async def on_function_pane_widget_add_function(self, event: Message) -> None:
    """Start adding a function at the position requested by a pane.

    The requested ``insert_index`` is capped at the current list length.
    Events without ``insert_index`` are logged and ignored.
    """
    if not hasattr(event, 'insert_index'):
        logger.warning(f"Invalid add function event: missing insert_index")
        return

    # Never insert past the end of the list.
    position = min(event.insert_index, len(self.functions))
    await self._add_function_at_index(position)

397 

def on_function_pane_widget_move_function(self, event: Message) -> None:
    """Swap the function at ``event.index`` with the one ``event.direction`` positions away.

    Nothing happens when the event lacks either attribute or when the source
    or target position falls outside the list.
    """
    if not hasattr(event, 'index') or not hasattr(event, 'direction'):
        return

    source = event.index
    offset = event.direction

    if not (0 <= source < len(self.functions)):
        return

    target = source + offset
    if not (0 <= target < len(self.functions)):
        return

    # Swap on a copy and assign it, so the reactive property gets a new list.
    reordered = self.functions.copy()
    reordered[source], reordered[target] = reordered[target], reordered[source]
    self.functions = reordered
    self._commit_and_notify()
    logger.debug(f"Moved function from index {source} to {target}")

418 

419 async def _add_function(self) -> None: 

420 """Add a new function to the end of the list.""" 

421 await self._add_function_at_index(len(self.functions)) 

422 

async def _add_function_at_index(self, insert_index: int) -> None:
    """Let the user pick a function and insert it at ``insert_index``.

    Opens the app's FunctionSelectorWindow, reusing an existing one or
    mounting a new one. When the window reports a selection, the function is
    inserted with empty kwargs and the parent is notified. A falsy result
    changes nothing.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    def on_selected(chosen: Optional[Callable]) -> None:
        if not chosen:
            return
        # Insert into a copy and assign it, so the reactive property gets a new list.
        updated = self.functions.copy()
        updated.insert(insert_index, (chosen, {}))
        self.functions = updated
        self._commit_and_notify()
        logger.debug(f"Added function: {chosen.__name__} at index {insert_index}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        selector = self.app.query_one(FunctionSelectorWindow)
        # An existing window is re-targeted at this request and reopened.
        selector.on_result_callback = on_selected
        selector.open_state = True
    except NoMatches:
        # Expected case: no selector window has been mounted yet.
        selector = FunctionSelectorWindow(on_result_callback=on_selected)
        await self.app.mount(selector)
        selector.open_state = True

448 

async def _change_function(self, index: int) -> None:
    """Let the user pick a replacement for the function at ``index``.

    Out-of-range indices are ignored. The selector window is reused if one
    exists, otherwise a new one is mounted, and it is told which function is
    currently in that slot. On selection the slot is replaced by the chosen
    function with empty kwargs and the parent is notified.
    """
    from openhcs.textual_tui.windows import FunctionSelectorWindow
    from textual.css.query import NoMatches

    if not (0 <= index < len(self.functions)):
        return

    existing_func, _ = self.functions[index]

    def on_selected(chosen: Optional[Callable]) -> None:
        if not chosen:
            return
        # The slot gets the new function with an empty kwargs dict.
        updated = self.functions.copy()
        updated[index] = (chosen, {})
        self.functions = updated
        self._commit_and_notify()
        logger.debug(f"Changed function at index {index} to: {chosen.__name__}")

    # Use window-based function selector (follows ConfigWindow pattern)
    try:
        selector = self.app.query_one(FunctionSelectorWindow)
        # An existing window is re-targeted at this request and reopened.
        selector.current_function = existing_func
        selector.on_result_callback = on_selected
        selector.open_state = True
    except NoMatches:
        # Expected case: no selector window has been mounted yet.
        selector = FunctionSelectorWindow(current_function=existing_func, on_result_callback=on_selected)
        await self.app.mount(selector)
        selector.open_state = True

478 

479 def _commit_and_notify(self) -> None: 

480 """Commit changes and notify parent of function pattern change.""" 

481 # Update pattern data before notifying 

482 self._update_pattern_data() 

483 # Post message to notify parent (DualEditorScreen) of changes 

484 self.post_message(self.FunctionPatternChanged()) 

485 

async def _load_func(self) -> None:
    """Open a file browser for ``.func`` files and load the chosen pattern.

    The browser result may be a single Path or a list of Paths. Only the
    first entry of a list is used. Anything that does not yield a Path is
    logged and ignored.
    """
    logger.debug(f"🔍 LOAD FUNC: _load_func() called - starting file browser...")

    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        logger.debug(f"🔍 LOAD FUNC: File browser callback received result: {result} (type: {type(result)})")

        # Reduce the result to at most one candidate path (multi-loading is disabled).
        chosen = None
        if isinstance(result, Path):
            chosen = result
            logger.debug(f"🔍 LOAD FUNC: Single Path received: {chosen}")
        elif isinstance(result, list) and len(result) > 0:
            chosen = result[0]
            if len(result) > 1:
                logger.warning(f"🔍 LOAD FUNC: Multiple files selected, using only first: {chosen}")
            else:
                logger.debug(f"🔍 LOAD FUNC: List with single Path received: {chosen}")

        if not (chosen and isinstance(chosen, Path)):
            logger.warning(f"🔍 LOAD FUNC: No valid Path found in result: {result}")
            return

        logger.debug(f"🔍 LOAD FUNC: Valid Path extracted, calling _load_pattern_from_file({chosen})")
        self._load_pattern_from_file(chosen)

    # Use window-based file browser
    logger.debug(f"🔍 LOAD FUNC: Opening file browser window...")
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    browser_options = dict(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Load Function Pattern (.func)",
        mode=BrowserMode.LOAD,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result,
    )
    await open_file_browser_window(**browser_options)
    logger.debug(f"🔍 LOAD FUNC: File browser window opened, waiting for user selection...")

531 

async def _save_func_as(self) -> None:
    """Open a save-mode file browser and write the pattern to the chosen ``.func`` file.

    The pattern is only written when the browser reports a Path; any other
    result is ignored.
    """
    from openhcs.textual_tui.windows import open_file_browser_window, BrowserMode
    from openhcs.constants.constants import Backend
    from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey

    def handle_result(result):
        if not (result and isinstance(result, Path)):
            return
        self._save_pattern_to_file(result)

    # Use window-based file browser
    from openhcs.textual_tui.services.file_browser_service import SelectionMode
    browser_options = dict(
        app=self.app,
        file_manager=self.app.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.FUNCTION_PATTERNS),
        backend=Backend.DISK,
        title="Save Function Pattern (.func)",
        mode=BrowserMode.SAVE,
        selection_mode=SelectionMode.FILES_ONLY,
        filter_extensions=['.func'],
        default_filename="pattern.func",
        cache_key=PathCacheKey.FUNCTION_PATTERNS,
        on_result_callback=handle_result,
    )
    await open_file_browser_window(**browser_options)

557 

def _load_pattern_from_file(self, file_path: Path) -> None:
    """Load a pattern from a ``.func`` file and make it the edited pattern.

    The file is unpickled with dill. ``None`` gives an empty list pattern, a
    callable becomes a one-entry list, a list is normalized into
    ``functions``, and a dict switches to dict mode with string keys and its
    first key selected. Any other type is logged and treated as empty. All
    failures are logged; nothing is raised to the caller.

    Args:
        file_path: Path of the ``.func`` file to read.
    """
    logger.debug(f"🔍 LOAD FUNC: _load_pattern_from_file called with: {file_path}")

    import dill as pickle
    try:
        # Inspected inside the try so a file that disappears between
        # exists() and stat() is logged instead of raising out of the callback.
        logger.debug(f"🔍 LOAD FUNC: File exists: {file_path.exists()}")
        logger.debug(f"🔍 LOAD FUNC: File size: {file_path.stat().st_size if file_path.exists() else 'N/A'} bytes")

        logger.debug(f"🔍 LOAD FUNC: Opening file for reading...")
        # NOTE(security): unpickling can execute arbitrary code; only load
        # .func files that come from a trusted source.
        with open(file_path, 'rb') as f:
            pattern = pickle.load(f)
        logger.debug(f"🔍 LOAD FUNC: Successfully loaded pattern from pickle: {pattern}")
        logger.debug(f"🔍 LOAD FUNC: Pattern type: {type(pattern)}")

        # Log current state before loading
        logger.debug(f"🔍 LOAD FUNC: BEFORE - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: BEFORE - selected_channel: {self.selected_channel}")

        # Build NEW objects first; the reactive properties are assigned afterwards.
        new_selected_channel = None
        if pattern is None:
            new_pattern_data = []
            new_is_dict_mode = False
            new_functions = []
        elif callable(pattern):
            new_pattern_data = [(pattern, {})]
            new_is_dict_mode = False
            new_functions = [(pattern, {})]
        elif isinstance(pattern, list):
            new_pattern_data = pattern
            new_is_dict_mode = False
            new_functions = self._normalize_function_list(pattern)
        elif isinstance(pattern, dict):
            # Convert any integer keys to string keys for consistency
            normalized_dict = {}
            for key, value in pattern.items():
                str_key = str(key)
                normalized_dict[str_key] = value
                logger.debug(f"Converted channel key {key} ({type(key)}) to '{str_key}' (str)")

            new_pattern_data = normalized_dict
            new_is_dict_mode = True
            # Set first channel as selected, or None if no channels
            if normalized_dict:
                new_selected_channel = next(iter(normalized_dict))
                new_functions = self._normalize_function_list(normalized_dict.get(new_selected_channel, []))
            else:
                new_functions = []
        else:
            logger.warning(f"Unknown pattern type: {type(pattern)}, using empty list")
            new_pattern_data = []
            new_is_dict_mode = False
            new_functions = []

        logger.debug(f"🔍 LOAD FUNC: Assigning new values to reactive properties...")
        logger.debug(f"🔍 LOAD FUNC: new_pattern_data: {new_pattern_data}")
        logger.debug(f"🔍 LOAD FUNC: new_is_dict_mode: {new_is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: new_functions: {len(new_functions)} items: {new_functions}")
        logger.debug(f"🔍 LOAD FUNC: new_selected_channel: {new_selected_channel}")

        self.pattern_data = new_pattern_data
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned pattern_data")

        self.is_dict_mode = new_is_dict_mode
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned is_dict_mode")

        # selected_channel MUST be assigned before functions: assigning
        # functions runs watch_functions -> _update_pattern_data, which files
        # the functions under the *current* selected_channel. With the old
        # channel still in place the loaded pattern would be corrupted.
        self.selected_channel = new_selected_channel
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned selected_channel")

        self.functions = new_functions  # This triggers reactive system!
        logger.debug(f"🔍 LOAD FUNC: ✅ Assigned functions - THIS SHOULD TRIGGER REACTIVE SYSTEM!")

        logger.debug(f"🔍 LOAD FUNC: Calling _commit_and_notify...")
        self._commit_and_notify()

        # Final state logging
        logger.debug(f"🔍 LOAD FUNC: FINAL - functions: {len(self.functions)} items")
        logger.debug(f"🔍 LOAD FUNC: FINAL - is_dict_mode: {self.is_dict_mode}")
        logger.debug(f"🔍 LOAD FUNC: FINAL - selected_channel: {self.selected_channel}")
        logger.debug(f"🔍 LOAD FUNC: ✅ Successfully completed loading process!")

    except Exception as e:
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Failed to load pattern: {e}")
        import traceback
        logger.error(f"🔍 LOAD FUNC: ❌ ERROR - Traceback: {traceback.format_exc()}")

651 

def _save_pattern_to_file(self, file_path: Path) -> None:
    """Serialize the current pattern to ``file_path``.

    dill is used for writing because the loader in this class reads ``.func``
    files with dill. This keeps the two sides symmetric and lets patterns
    that the stdlib pickler rejects still be saved. Failures are logged and
    not raised.

    Args:
        file_path: Destination path of the ``.func`` file.
    """
    import dill
    try:
        with open(file_path, 'wb') as f:
            dill.dump(self.current_pattern, f)
    except Exception as e:
        logger.error(f"Failed to save pattern: {e}")

660 

def _edit_in_vim(self) -> None:
    """Open the current pattern as Python code in a terminal editor.

    The pattern is refreshed, rendered to Python source and handed to a
    TerminalLauncher via ``app.call_later``. The edited text is delivered to
    ``_handle_edited_pattern``. Any failure is logged and shown through
    ``app.show_error``.
    """
    logger.debug("Edit button pressed - opening terminal editor")

    try:
        # NOTE(review): generate_readable_function_repr is imported but not used below.
        from openhcs.debug.pickle_to_python import generate_readable_function_repr
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # The generated code must reflect the latest edits.
        self._update_pattern_data()
        source_text = self._generate_complete_python_code()

        editor_launcher = TerminalLauncher(self.app)

        # Scheduled on the app; the last argument is the callback for the edited text.
        self.app.call_later(
            editor_launcher.launch_editor_for_file,
            source_text,
            '.py',
            self._handle_edited_pattern,
        )

    except Exception as e:
        logger.error(f"Failed to launch terminal editor: {e}")
        self.app.show_error("Edit Error", f"Failed to launch terminal editor: {str(e)}")

690 

691 def _handle_edited_pattern(self, edited_code: str) -> None: 

692 """Handle the edited pattern code from terminal editor.""" 

693 try: 

694 # Execute the code (it has all necessary imports) 

695 namespace = {} 

696 exec(edited_code, namespace) 

697 

698 # Get the pattern from the namespace 

699 if 'pattern' in namespace: 

700 new_pattern = namespace['pattern'] 

701 self._apply_edited_pattern(new_pattern) 

702 else: 

703 self.app.show_error("Parse Error", "No 'pattern = ...' assignment found in edited code") 

704 

705 except SyntaxError as e: 

706 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}") 

707 except Exception as e: 

708 logger.error(f"Failed to parse edited pattern: {e}") 

709 self.app.show_error("Edit Error", f"Failed to parse edited pattern: {str(e)}") 

710 

def _generate_complete_python_code(self) -> str:
    """Render pattern_data as complete Python source via the debug module's generator."""
    from openhcs.debug.pickle_to_python import (
        generate_complete_function_pattern_code as render_pattern,
    )

    # clean_mode is explicitly switched off, as before.
    return render_pattern(self.pattern_data, clean_mode=False)

717 

718 

719 

720 def _apply_edited_pattern(self, new_pattern): 

721 """Apply the edited pattern back to the TUI.""" 

722 try: 

723 if self.is_dict_mode: 

724 if isinstance(new_pattern, dict): 

725 self.pattern_data = new_pattern 

726 # Update current channel if it exists in new pattern 

727 if self.selected_channel and self.selected_channel in new_pattern: 

728 self.functions = self._normalize_function_list(new_pattern[self.selected_channel]) 

729 else: 

730 # Select first channel 

731 if new_pattern: 

732 self.selected_channel = next(iter(new_pattern)) 

733 self.functions = self._normalize_function_list(new_pattern[self.selected_channel]) 

734 else: 

735 self.functions = [] 

736 else: 

737 raise ValueError("Expected dict pattern for dict mode") 

738 else: 

739 if isinstance(new_pattern, list): 

740 self.pattern_data = new_pattern 

741 self.functions = self._normalize_function_list(new_pattern) 

742 else: 

743 raise ValueError("Expected list pattern for list mode") 

744 

745 # Refresh the UI and notify of changes 

746 self.refresh() 

747 self._commit_and_notify() 

748 

749 except Exception as e: 

750 self.app.show_error("Apply Error", f"Failed to apply edited pattern: {str(e)}") 

751 

752 def _generate_pattern_python_code(self) -> str: 

753 """Generate Python code representation of the current pattern.""" 

754 # Update pattern data first 

755 self._update_pattern_data() 

756 

757 # Generate imports for all functions in the pattern 

758 imports = set() 

759 self._collect_function_imports(self.pattern_data, imports) 

760 

761 # Create the Python code 

762 code_lines = [ 

763 "# Edit this function pattern and save to apply changes", 

764 "# The pattern variable will be parsed and applied to the TUI", 

765 "", 

766 ] 

767 

768 # Add imports 

769 if imports: 

770 code_lines.extend(sorted(imports)) 

771 code_lines.append("") 

772 

773 # Add the pattern assignment 

774 code_lines.append("# Function pattern:") 

775 pattern_repr = self._format_pattern_for_code(self.pattern_data) 

776 code_lines.append(f"pattern = {pattern_repr}") 

777 

778 return "\n".join(code_lines) 

779 

780 def _collect_function_imports(self, pattern, imports): 

781 """Recursively collect import statements for all functions in pattern.""" 

782 if callable(pattern): 

783 if hasattr(pattern, '__module__') and hasattr(pattern, '__name__'): 

784 imports.add(f"from {pattern.__module__} import {pattern.__name__}") 

785 elif isinstance(pattern, tuple) and len(pattern) == 2: 

786 func, _ = pattern 

787 self._collect_function_imports(func, imports) 

788 elif isinstance(pattern, list): 

789 for item in pattern: 

790 self._collect_function_imports(item, imports) 

791 elif isinstance(pattern, dict): 

792 for value in pattern.values(): 

793 self._collect_function_imports(value, imports) 

794 

795 def _format_pattern_for_code(self, pattern, indent=0) -> str: 

796 """Format pattern as readable Python code.""" 

797 indent_str = " " * indent 

798 

799 if callable(pattern): 

800 return pattern.__name__ 

801 elif isinstance(pattern, tuple) and len(pattern) == 2: 

802 func, kwargs = pattern 

803 func_name = func.__name__ if callable(func) else str(func) 

804 kwargs_str = repr(kwargs) 

805 return f"({func_name}, {kwargs_str})" 

806 elif isinstance(pattern, list): 

807 if not pattern: 

808 return "[]" 

809 items = [] 

810 for item in pattern: 

811 item_str = self._format_pattern_for_code(item, indent + 1) 

812 items.append(f"{indent_str} {item_str}") 

813 return "[\n" + ",\n".join(items) + f"\n{indent_str}]" 

814 elif isinstance(pattern, dict): 

815 if not pattern: 

816 return "{}" 

817 items = [] 

818 for key, value in pattern.items(): 

819 value_str = self._format_pattern_for_code(value, indent + 1) 

820 items.append(f"{indent_str} {repr(key)}: {value_str}") 

821 return "{\n" + ",\n".join(items) + f"\n{indent_str}" + "}" 

822 else: 

823 return repr(pattern) 

824 

825 def _get_component_button_text(self) -> str: 

826 """Get text for the component selection button based on current group_by setting.""" 

827 if self.current_group_by is None or self.current_group_by == GroupBy.NONE: 

828 return "Component: None" 

829 

830 # Use group_by.value.title() for dynamic component type display 

831 component_type = self.current_group_by.value.title() 

832 

833 if self.is_dict_mode and self.selected_channel is not None: 

834 # Try to get metadata name for the selected component 

835 display_name = self._get_component_display_name(self.selected_channel) 

836 return f"{component_type}: {display_name}" 

837 return f"{component_type}: None" 

838 

839 def _get_component_display_name(self, component_key: str) -> str: 

840 """Get display name for component key, using metadata if available.""" 

841 # Try to get metadata name from orchestrator 

842 orchestrator = self._get_current_orchestrator() 

843 if orchestrator and self.current_group_by: 

844 try: 

845 metadata_name = orchestrator.get_component_metadata(self.current_group_by, component_key) 

846 if metadata_name: 

847 return metadata_name 

848 except Exception as e: 

849 logger.debug(f"Could not get metadata for {self.current_group_by.value} {component_key}: {e}") 

850 

851 # Fallback to component key 

852 return component_key 

853 

def _refresh_component_button(self) -> None:
    """Bring the ``#component_btn`` button's label and disabled flag up to date.

    Any exception raised while looking up or updating the button is logged
    at debug level and otherwise ignored.
    """
    try:
        button = self.query_one("#component_btn", Button)
        # Label first, then the enabled/disabled state.
        button.label = self._get_component_button_text()
        button.disabled = self._is_component_button_disabled()
    except Exception as e:
        logger.debug(f"Could not refresh component button: {e}")

862 

863 def _is_component_button_disabled(self) -> bool: 

864 """Check if component selection button should be disabled.""" 

865 return ( 

866 self.current_group_by is None or 

867 self.current_group_by == GroupBy.NONE or 

868 (self.current_variable_components and 

869 self.current_group_by.value in [vc.value for vc in self.current_variable_components]) 

870 ) 

871 

async def _show_component_selection_dialog(self) -> None:
    """Open the group-by selector window for the current ``group_by`` setting.

    Does nothing (beyond logging) when component selection is disabled, no
    orchestrator is available, or the orchestrator reports no component
    keys. The preselected components come from the per-group-by selection
    cache when present, otherwise from the keys of the current dict pattern,
    otherwise the selection starts empty. An existing
    ``GroupBySelectorWindow`` is reconfigured and reopened; if none is
    mounted, a new one is created and mounted on the app.

    Any exception is logged as an error and not propagated.
    """
    try:
        if self._is_component_button_disabled():
            logger.debug("Component selection is disabled")
            return

        orchestrator = self._get_current_orchestrator()
        if orchestrator is None:
            logger.warning("No orchestrator available for component selection")
            return

        # Ask the orchestrator which components exist for this group_by.
        choices = orchestrator.get_component_keys(self.current_group_by)
        if not choices:
            component_type = self.current_group_by.value
            logger.warning(f"No {component_type} values found in current plate")
            return

        # Preselection: cached choice first, then the current pattern keys.
        if self.current_group_by in self.component_selections:
            selected_components = self.component_selections[self.current_group_by]
            logger.debug(f"Step '{self.step_identifier}': Using cached selection for {self.current_group_by.value}: {selected_components}")
        elif self.is_dict_mode and isinstance(self.pattern_data, dict):
            selected_components = list(self.pattern_data)
        else:
            selected_components = []

        from openhcs.textual_tui.windows import GroupBySelectorWindow
        from textual.css.query import NoMatches

        def on_selection(result_components):
            # A result of None leaves the pattern untouched.
            if result_components is not None:
                self._update_components(result_components)

        # Same settings whether the window is reused or newly created.
        window_settings = {
            "available_channels": choices,
            "selected_channels": selected_components,
            "component_type": self.current_group_by.value,
            "orchestrator": orchestrator,
            "on_result_callback": on_selection,
        }

        try:
            # Reuse the window if one is already mounted.
            window = self.app.query_one(GroupBySelectorWindow)
            for attribute, value in window_settings.items():
                setattr(window, attribute, value)
            window.open_state = True
        except NoMatches:
            # Nothing mounted yet: create the window and mount it.
            window = GroupBySelectorWindow(**window_settings)
            await self.app.mount(window)
            window.open_state = True

    except Exception as e:
        component_type = self.current_group_by.value if self.current_group_by else "component"
        logger.error(f"Failed to show {component_type} selection dialog: {e}")

936 

def _update_components(self, new_components: List[str]) -> None:
    """
    Update the pattern based on new component selection.

    Uses string component keys directly to match the pattern detection system.
    Pattern detection always returns string component values (e.g., '1', '2', '3') when
    grouping by any component, so function patterns use string keys for consistency.

    Behavior by case:
    - Empty selection while in dict mode: the currently shown functions become
      the list pattern and the widget leaves dict mode.
    - Non-empty selection while in list mode: the current functions are assigned
      to the first selected component; every other component starts empty.
    - Non-empty selection while in dict mode: entries of components that remain
      selected are kept, newly selected components start empty, and the
      selected channel is moved to the first component if it was deselected.

    Afterwards the selection is cached per group_by (when grouping is active),
    the change is committed, and the component button is refreshed.

    Args:
        new_components: Component keys chosen in the selection dialog.
    """
    if not new_components:
        # No components selected - revert to list mode
        if self.is_dict_mode:
            # Save current functions to list mode
            self.pattern_data = self.functions
            self.is_dict_mode = False
            self.selected_channel = None
            logger.debug("Reverted to list mode (no components selected)")
    else:
        # Use component strings directly - no conversion needed
        component_keys = new_components

        # Components selected - ensure dict mode
        if not self.is_dict_mode:
            # Convert to dict mode
            current_functions = self.functions
            self.pattern_data = {component_keys[0]: current_functions}
            self.is_dict_mode = True
            self.selected_channel = component_keys[0]

            # Add other components with empty functions
            # (mutates the dict assigned to pattern_data above, in place)
            for component_key in component_keys[1:]:
                self.pattern_data[component_key] = []
        else:
            # Already in dict mode - update components
            # Work on a copy so the old entries can be looked up while the
            # replacement dict is being built.
            old_pattern = self.pattern_data.copy() if isinstance(self.pattern_data, dict) else {}
            new_pattern = {}

            # Keep existing functions for components that remain
            for component_key in component_keys:
                # Check both string and integer keys for backward compatibility
                if component_key in old_pattern:
                    new_pattern[component_key] = old_pattern[component_key]
                else:
                    # Try integer key for backward compatibility
                    # (only ValueError from int() is handled here)
                    try:
                        component_int = int(component_key)
                        if component_int in old_pattern:
                            # Entry is re-keyed under the string key
                            new_pattern[component_key] = old_pattern[component_int]
                        else:
                            new_pattern[component_key] = []
                    except ValueError:
                        # Key is not numeric, so no legacy integer entry can exist
                        new_pattern[component_key] = []

            self.pattern_data = new_pattern

            # Update selected component if needed
            if self.selected_channel not in component_keys:
                self.selected_channel = component_keys[0] if component_keys else None
                if self.selected_channel:
                    # Show the functions of the newly selected component
                    self.functions = new_pattern.get(self.selected_channel, [])

    # Save selection to cache for current group_by
    if self.current_group_by is not None and self.current_group_by != GroupBy.NONE:
        self.component_selections[self.current_group_by] = new_components
        logger.debug(f"Step '{self.step_identifier}': Cached selection for {self.current_group_by.value}: {new_components}")

    self._commit_and_notify()
    self._refresh_component_button()
    logger.debug(f"Updated components: {new_components}")

1005 

1006 def _navigate_channel(self, direction: int) -> None: 

1007 """Navigate to next/previous channel (with looping).""" 

1008 if not self.is_dict_mode or not isinstance(self.pattern_data, dict): 

1009 return 

1010 

1011 channels = sorted(self.pattern_data.keys()) 

1012 if len(channels) <= 1: 

1013 return 

1014 

1015 try: 

1016 current_index = channels.index(self.selected_channel) 

1017 new_index = (current_index + direction) % len(channels) 

1018 new_channel = channels[new_index] 

1019 

1020 self._switch_to_channel(new_channel) 

1021 logger.debug(f"Navigated to channel {new_channel}") 

1022 except (ValueError, IndexError): 

1023 logger.warning(f"Failed to navigate channels: current={self.selected_channel}, channels={channels}") 

1024 

1025 

1026 

1027 def _get_current_orchestrator(self): 

1028 """Get the current orchestrator from the app.""" 

1029 try: 

1030 # Get from app - PlateManagerWidget is now in PipelinePlateWindow 

1031 if hasattr(self.app, 'query_one'): 

1032 from openhcs.textual_tui.windows import PipelinePlateWindow 

1033 from openhcs.textual_tui.widgets.plate_manager import PlateManagerWidget 

1034 

1035 # Try to find the PipelinePlateWindow first 

1036 try: 

1037 pipeline_plate_window = self.app.query_one(PipelinePlateWindow) 

1038 plate_manager = pipeline_plate_window.plate_widget 

1039 except: 

1040 # Fallback: try to find PlateManagerWidget directly in the app 

1041 plate_manager = self.app.query_one(PlateManagerWidget) 

1042 

1043 # Use selected_plate (not current_plate!) 

1044 selected_plate = plate_manager.selected_plate 

1045 if selected_plate and selected_plate in plate_manager.orchestrators: 

1046 orchestrator = plate_manager.orchestrators[selected_plate] 

1047 if not orchestrator.is_initialized(): 

1048 orchestrator.initialize() 

1049 return orchestrator 

1050 return None 

1051 except Exception as e: 

1052 logger.error(f"Failed to get orchestrator: {e}") 

1053 return None 

1054