Coverage for openhcs/pyqt_gui/widgets/function_list_editor.py: 0.0%

465 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Function List Editor Widget for PyQt6 GUI. 

3 

4Mirrors the Textual TUI FunctionListEditorWidget with sophisticated parameter forms. 

5Displays a scrollable list of function panes with Add/Load/Save/Code controls. 

6""" 

7 

8import logging 

9import os 

10from typing import List, Union, Dict, Optional 

11 

12from PyQt6.QtWidgets import ( 

13 QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, 

14 QScrollArea 

15) 

16from PyQt6.QtCore import Qt, pyqtSignal 

17 

18from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

19from openhcs.ui.shared.pattern_data_manager import PatternDataManager 

20from openhcs.pyqt_gui.widgets.function_pane import FunctionPaneWidget 

21from openhcs.constants.constants import GroupBy 

22from openhcs.pyqt_gui.shared.color_scheme import PyQt6ColorScheme 

23from openhcs.pyqt_gui.widgets.shared.widget_strategies import _get_enum_display_text 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28class FunctionListEditorWidget(QWidget): 

29 """ 

30 Function list editor widget that mirrors Textual TUI functionality. 

31  

32 Displays functions with parameter editing, Add/Delete/Reset buttons, 

33 and Load/Save/Code functionality. 

34 """ 

35 

36 # Signals 

37 function_pattern_changed = pyqtSignal() 

38 

def __init__(self, initial_functions: Union[List, Dict, callable, None] = None,
             step_identifier: str = None, service_adapter=None, color_scheme: Optional[PyQt6ColorScheme] = None, parent=None):
    """Create the editor, load the initial pattern and build the UI.

    Args:
        initial_functions: Starting pattern, handed unchanged to
            ``_initialize_pattern_data``.
        step_identifier: Label stored on the widget; when falsy a
            ``widget_<id>`` label is generated from ``id(self)``.
        service_adapter: Stored on the widget, passed to every function pane
            and used for info/error dialogs when truthy.
        color_scheme: Colours used for styling; a fresh ``PyQt6ColorScheme``
            is created when falsy.
        parent: Parent forwarded to ``QWidget.__init__``.
    """
    super().__init__(parent)

    # Collaborators: styling, existing business-logic services, dialogs.
    self.color_scheme = color_scheme if color_scheme else PyQt6ColorScheme()
    self.service_adapter = service_adapter
    self.registry_service = RegistryService()
    self.data_manager = PatternDataManager()

    # Identifier used to tell this editor's cached state apart from others.
    if step_identifier:
        self.step_identifier = step_identifier
    else:
        self.step_identifier = f"widget_{id(self)}"

    # Step configuration defaults; some are overwritten by
    # _initialize_pattern_data below, so they must be set first.
    self.current_group_by = None
    self.current_variable_components = []
    self.selected_channel = None
    self.available_channels = []
    self.is_dict_mode = False

    # Remembered component selection, keyed by GroupBy value.
    self.component_selections = {}

    # Pane widgets currently shown; must exist before setup_ui populates it.
    self.function_panes = []

    self._initialize_pattern_data(initial_functions)

    self.setup_ui()
    self.setup_connections()

    logger.debug(f"Function list editor initialized with {len(self.functions)} functions")

74 

75 def _initialize_pattern_data(self, initial_functions): 

76 """Initialize pattern data from various input formats (mirrors Textual TUI logic).""" 

77 print(f"🔍 FUNC LIST EDITOR _initialize_pattern_data: initial_functions = {initial_functions}") 

78 if initial_functions is None: 

79 self.pattern_data = [] 

80 self.is_dict_mode = False 

81 self.functions = [] 

82 elif callable(initial_functions): 

83 # Single callable: treat as [(callable, {})] 

84 self.pattern_data = [(initial_functions, {})] 

85 self.is_dict_mode = False 

86 self.functions = [(initial_functions, {})] 

87 elif isinstance(initial_functions, tuple) and len(initial_functions) == 2 and callable(initial_functions[0]) and isinstance(initial_functions[1], dict): 

88 # Single tuple (callable, kwargs): treat as [(callable, kwargs)] 

89 self.pattern_data = [initial_functions] 

90 self.is_dict_mode = False 

91 self.functions = [initial_functions] 

92 elif isinstance(initial_functions, list): 

93 print("🔍 FUNC LIST EDITOR: initial_functions is a list, calling _normalize_function_list") 

94 self.pattern_data = initial_functions 

95 self.is_dict_mode = False 

96 self.functions = self._normalize_function_list(initial_functions) 

97 print(f"🔍 FUNC LIST EDITOR: self.functions AFTER normalize = {self.functions}") 

98 elif isinstance(initial_functions, dict): 

99 # Convert any integer keys to string keys for consistency 

100 normalized_dict = {} 

101 for key, value in initial_functions.items(): 

102 str_key = str(key) 

103 normalized_dict[str_key] = self._normalize_function_list(value) if value else [] 

104 

105 self.pattern_data = normalized_dict 

106 self.is_dict_mode = True 

107 

108 # Set selected channel to first key and load its functions 

109 if normalized_dict: 

110 self.selected_channel = next(iter(normalized_dict.keys())) 

111 self.functions = normalized_dict[self.selected_channel] 

112 else: 

113 self.selected_channel = None 

114 self.functions = [] 

115 else: 

116 logger.warning(f"Unknown initial_functions type: {type(initial_functions)}") 

117 self.pattern_data = [] 

118 self.is_dict_mode = False 

119 self.functions = [] 

120 

121 def _normalize_function_list(self, func_list): 

122 """Normalize function list using PatternDataManager.""" 

123 print(f"🔍 NORMALIZE: INPUT = {func_list}") 

124 # Handle single tuple (function, kwargs) case - wrap in list 

125 if isinstance(func_list, tuple) and len(func_list) == 2 and callable(func_list[0]) and isinstance(func_list[1], dict): 

126 func_list = [func_list] 

127 # Handle single callable case - wrap in list with empty kwargs 

128 elif callable(func_list): 

129 func_list = [(func_list, {})] 

130 # Handle empty or None case 

131 elif not func_list: 

132 return [] 

133 

134 normalized = [] 

135 for i, item in enumerate(func_list): 

136 print(f"🔍 NORMALIZE: Processing item {i}: {item}") 

137 func, kwargs = self.data_manager.extract_func_and_kwargs(item) 

138 print(f"🔍 NORMALIZE: Extracted func={func.__name__ if func else None}, kwargs={kwargs}") 

139 if func: 

140 normalized.append((func, kwargs)) 

141 print(f"🔍 NORMALIZE: OUTPUT = {normalized}") 

142 return normalized 

143 

def setup_ui(self):
    """Build the header row and the scrollable list of function panes.

    The header holds the "Functions" label, the Add and Code buttons, the
    component-selection button and the previous/next channel buttons. Below
    it sits a scroll area whose container is filled by
    ``_populate_function_list``.
    """
    def styled_button(caption, max_width, on_click):
        # Shared construction steps for every header button.
        button = QPushButton(caption)
        button.setMaximumWidth(max_width)
        button.setStyleSheet(self._get_button_style())
        button.clicked.connect(on_click)
        return button

    root_layout = QVBoxLayout(self)
    root_layout.setContentsMargins(0, 0, 0, 0)
    root_layout.setSpacing(8)

    header_row = QHBoxLayout()

    title = QLabel("Functions")
    title.setStyleSheet(f"color: {self.color_scheme.to_hex(self.color_scheme.text_accent)}; font-weight: bold; font-size: 14px;")
    header_row.addWidget(title)
    header_row.addStretch()

    add_button = styled_button("Add", 60, self.add_function)
    header_row.addWidget(add_button)

    # The Code button covers load/save through the text editor.
    code_button = styled_button("Code", 60, self.edit_function_code)
    header_row.addWidget(code_button)

    self.component_btn = styled_button(
        self._get_component_button_text(), 120, self.show_component_selection_dialog
    )
    self.component_btn.setEnabled(not self._is_component_button_disabled())
    header_row.addWidget(self.component_btn)

    self.prev_channel_btn = styled_button("<", 30, lambda: self._navigate_channel(-1))
    header_row.addWidget(self.prev_channel_btn)

    self.next_channel_btn = styled_button(">", 30, lambda: self._navigate_channel(1))
    header_row.addWidget(self.next_channel_btn)

    # Navigation buttons are only shown when several channels exist.
    self._update_navigation_buttons()

    header_row.addStretch()
    root_layout.addLayout(header_row)

    self.scroll_area = QScrollArea()
    self.scroll_area.setWidgetResizable(True)
    self.scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
    self.scroll_area.setStyleSheet(f"""
        QScrollArea {{
            background-color: {self.color_scheme.to_hex(self.color_scheme.panel_bg)};
            border: 1px solid {self.color_scheme.to_hex(self.color_scheme.border_color)};
            border-radius: 4px;
        }}
    """)

    self.function_container = QWidget()
    self.function_layout = QVBoxLayout(self.function_container)
    self.function_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
    self.function_layout.setSpacing(8)

    self._populate_function_list()

    self.scroll_area.setWidget(self.function_container)
    root_layout.addWidget(self.scroll_area)

223 

224 def _get_button_style(self) -> str: 

225 """Get consistent button styling.""" 

226 return """ 

227 QPushButton { 

228 background-color: {self.color_scheme.to_hex(self.color_scheme.input_bg)}; 

229 color: white; 

230 border: 1px solid {self.color_scheme.to_hex(self.color_scheme.border_light)}; 

231 border-radius: 3px; 

232 padding: 6px 12px; 

233 font-size: 11px; 

234 } 

235 QPushButton:hover { 

236 background-color: {self.color_scheme.to_hex(self.color_scheme.button_hover_bg)}; 

237 } 

238 QPushButton:pressed { 

239 background-color: {self.color_scheme.to_hex(self.color_scheme.button_pressed_bg)}; 

240 } 

241 """ 

242 

def _populate_function_list(self):
    """Rebuild the pane list so it shows one pane per entry of ``self.functions``.

    Existing panes and any other widgets left in ``function_layout`` are
    released first. With no functions an italic placeholder label is shown;
    otherwise a ``FunctionPaneWidget`` is created per function, its signals
    are connected to this editor's handlers, and initial enabled-state
    styling is scheduled for the next event-loop iteration.
    """
    from PyQt6.QtCore import QTimer

    def release(widget):
        # Unregister the form manager BEFORE scheduling deletion so that new
        # widgets never try to connect to a manager that is being destroyed.
        manager = getattr(widget, 'form_manager', None)
        if manager is not None:
            try:
                manager.unregister_from_cross_window_updates()
            except RuntimeError:
                pass  # underlying object already deleted
        widget.deleteLater()

    for pane in self.function_panes:
        release(pane)
    self.function_panes.clear()

    # Drain the layout as well; this also removes the empty-state label.
    while self.function_layout.count():
        layout_item = self.function_layout.takeAt(0)
        widget = layout_item.widget()
        if widget:
            release(widget)

    if not self.functions:
        empty_label = QLabel("No functions defined. Click 'Add' to begin.")
        empty_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        empty_label.setStyleSheet(f"color: {self.color_scheme.to_hex(self.color_scheme.text_disabled)}; font-style: italic; padding: 20px;")
        self.function_layout.addWidget(empty_label)
        return

    for i, func_item in enumerate(self.functions):
        logger.debug("Creating function pane %d for %r", i, func_item)
        pane = FunctionPaneWidget(func_item, i, self.service_adapter, color_scheme=self.color_scheme)

        pane.move_function.connect(self._move_function)
        pane.add_function.connect(self._add_function_at_index)
        pane.remove_function.connect(self._remove_function)
        pane.parameter_changed.connect(self._on_parameter_changed)

        self.function_panes.append(pane)
        self.function_layout.addWidget(pane)

        # Deferred with a zero-delay timer so it runs after the pane is
        # fully constructed; disabled functions then look dimmed as soon
        # as the editor opens. ``p=pane`` binds the current pane.
        if getattr(pane, 'form_manager', None) is not None:
            QTimer.singleShot(0, lambda p=pane: self._apply_initial_enabled_styling_to_pane(p))

298 

299 def _apply_initial_enabled_styling_to_pane(self, pane): 

300 """Apply initial enabled styling to a function pane. 

301 

302 This is called after a function pane is created to ensure that disabled functions 

303 show the correct dimmed styling immediately when the function pattern editor opens. 

304 

305 Args: 

306 pane: FunctionPaneWidget instance to apply styling to 

307 """ 

308 try: 

309 if hasattr(pane, 'form_manager') and pane.form_manager is not None: 

310 # Check if the form manager has an enabled field 

311 if 'enabled' in pane.form_manager.parameters: 

312 # Apply the initial enabled styling 

313 if hasattr(pane.form_manager, '_apply_initial_enabled_styling'): 

314 pane.form_manager._apply_initial_enabled_styling() 

315 except Exception as e: 

316 # Log error but don't crash the UI 

317 import logging 

318 logger = logging.getLogger(__name__) 

319 logger.warning(f"Failed to apply initial enabled styling to function pane: {e}") 

320 

def setup_connections(self):
    """Hook for signal/slot wiring; there is currently nothing to connect."""
    return None

324 

def add_function(self):
    """Let the user pick a function and append it to the current list.

    Opens ``FunctionSelectorDialog``; when the result is falsy nothing
    changes. Otherwise ``(function, {})`` is appended, the pattern data and
    pane list are refreshed and ``function_pattern_changed`` is emitted.
    """
    from openhcs.pyqt_gui.dialogs.function_selector_dialog import FunctionSelectorDialog

    chosen = FunctionSelectorDialog.select_function(parent=self)
    if not chosen:
        return

    self.functions.append((chosen, {}))
    self._update_pattern_data()
    self._populate_function_list()
    self.function_pattern_changed.emit()
    logger.debug(f"Added function: {chosen.__name__}")

340 

341 

342 

def edit_function_code(self):
    """Open the current pattern as Python code in the code editor.

    When both ``functions`` and ``pattern_data`` are empty an info dialog is
    shown (if a service adapter exists) and nothing else happens. Otherwise
    the pattern is rendered to code and handed to ``SimpleCodeEditorService``
    with ``_handle_edited_pattern`` as callback. Any failure while launching
    is logged and, if possible, reported through an error dialog.
    """
    logger.debug("Edit function code clicked - opening code editor")

    nothing_to_edit = not (self.functions or self.pattern_data)
    if nothing_to_edit:
        if self.service_adapter:
            self.service_adapter.show_info_dialog("No function pattern to edit. Add functions first.")
        return

    try:
        # Make sure pattern_data reflects the panes before rendering it.
        self._update_pattern_data()
        python_code = self._generate_complete_python_code()

        from openhcs.pyqt_gui.services.simple_code_editor import SimpleCodeEditorService
        editor_service = SimpleCodeEditorService(self)

        # An external editor is opted into via an environment variable.
        env_choice = os.environ.get('OPENHCS_USE_EXTERNAL_EDITOR', '').lower()
        use_external = env_choice in ('1', 'true', 'yes')

        editor_service.edit_code(
            initial_content=python_code,
            title="Edit Function Pattern",
            callback=self._handle_edited_pattern,
            use_external=use_external,
            code_type='function',
            code_data={'pattern_data': self.pattern_data, 'clean_mode': False},
        )
    except Exception as e:
        logger.error(f"Failed to launch code editor: {e}")
        if self.service_adapter:
            self.service_adapter.show_error_dialog(f"Failed to launch code editor: {str(e)}")

381 

def _generate_complete_python_code(self) -> str:
    """Render ``self.pattern_data`` as Python source, including imports.

    ``clean_mode`` is switched off so that every parameter is written out;
    this keeps the code parseable when the same function appears several
    times with different parameter sets.
    """
    from openhcs.debug.pickle_to_python import (
        generate_complete_function_pattern_code as render_pattern,
    )

    return render_pattern(self.pattern_data, clean_mode=False)

390 

def _handle_edited_pattern(self, edited_code: str) -> None:
    """Execute code returned by the editor and apply its ``pattern`` value.

    The code runs via ``exec`` inside ``_patch_lazy_constructors`` and must
    assign a variable named ``pattern``.

    Raises:
        ValueError: If ``edited_code`` is not a string or defines no
            ``pattern``.
        Exception: Anything raised while executing the code; it is logged
            and re-raised so the code editor can react to it.
    """
    try:
        if not isinstance(edited_code, str):
            logger.error(f"Expected string, got {type(edited_code)}: {edited_code}")
            raise ValueError("Invalid code format received from editor")

        # NOTE(review): exec runs whatever the editor returned - confirm the
        # text can only come from the local user.
        namespace = {}
        with self._patch_lazy_constructors():
            exec(edited_code, namespace)

        if 'pattern' not in namespace:
            raise ValueError("No 'pattern = ...' assignment found in edited code")
        self._apply_edited_pattern(namespace['pattern'])

    except Exception as e:  # SyntaxError is an Exception subclass
        logger.error(f"Failed to parse edited pattern: {e}")
        raise

415 

416 def _apply_edited_pattern(self, new_pattern): 

417 """Apply the edited pattern back to the UI.""" 

418 try: 

419 if self.is_dict_mode: 

420 if isinstance(new_pattern, dict): 

421 self.pattern_data = new_pattern 

422 # Update current channel if it exists in new pattern 

423 if self.selected_channel and self.selected_channel in new_pattern: 

424 self.functions = self._normalize_function_list(new_pattern[self.selected_channel]) 

425 else: 

426 # Select first channel 

427 if new_pattern: 

428 self.selected_channel = next(iter(new_pattern)) 

429 self.functions = self._normalize_function_list(new_pattern[self.selected_channel]) 

430 else: 

431 self.functions = [] 

432 else: 

433 raise ValueError("Expected dict pattern for dict mode") 

434 else: 

435 if isinstance(new_pattern, list): 

436 self.pattern_data = new_pattern 

437 self.functions = self._normalize_function_list(new_pattern) 

438 elif callable(new_pattern): 

439 # Single callable: treat as [(callable, {})] 

440 self.pattern_data = [(new_pattern, {})] 

441 self.functions = [(new_pattern, {})] 

442 elif isinstance(new_pattern, tuple) and len(new_pattern) == 2 and callable(new_pattern[0]) and isinstance(new_pattern[1], dict): 

443 # Single tuple (callable, kwargs): treat as [(callable, kwargs)] 

444 self.pattern_data = [new_pattern] 

445 self.functions = [new_pattern] 

446 else: 

447 raise ValueError(f"Expected list, callable, or (callable, dict) tuple pattern for list mode, got {type(new_pattern)}") 

448 

449 # Refresh the UI and notify of changes 

450 self._populate_function_list() 

451 self.function_pattern_changed.emit() 

452 

453 except Exception as e: 

454 if self.service_adapter: 

455 self.service_adapter.show_error_dialog(f"Failed to apply edited pattern: {str(e)}") 

456 

def _patch_lazy_constructors(self):
    """Return a context manager that temporarily replaces lazy-config constructors.

    On entry, ``__init__`` of ``LazyZarrConfig``,
    ``LazyStepMaterializationConfig`` and ``LazyWellFilterConfig`` is swapped
    (for each type that ``LazyDefaultPlaceholderService.has_lazy_resolution``
    accepts) for a constructor that stores the keyword arguments as given
    and sets every field that was not passed to ``None``. On exit the
    original constructors are restored.
    """
    from contextlib import contextmanager
    from openhcs.core.lazy_placeholder import LazyDefaultPlaceholderService
    import dataclasses

    def make_raw_init(original_init, dataclass_type):
        # ``original_init`` is accepted but deliberately never called: the
        # replacement stores raw values instead of resolving them.
        def patched_init(self, **kwargs):
            for field in dataclasses.fields(dataclass_type):
                object.__setattr__(self, field.name, kwargs.get(field.name, None))

            if hasattr(dataclass_type, '_is_lazy_dataclass'):
                object.__setattr__(self, '_is_lazy_dataclass', True)

        return patched_init

    @contextmanager
    def patch_context():
        from openhcs.core.config import LazyZarrConfig, LazyStepMaterializationConfig, LazyWellFilterConfig

        saved_inits = {}
        for lazy_type in (LazyZarrConfig, LazyStepMaterializationConfig, LazyWellFilterConfig):
            if not LazyDefaultPlaceholderService.has_lazy_resolution(lazy_type):
                continue
            saved_inits[lazy_type] = lazy_type.__init__
            lazy_type.__init__ = make_raw_init(saved_inits[lazy_type], lazy_type)

        try:
            yield
        finally:
            # Always undo the patch, even if the managed block raised.
            for lazy_type, original_init in saved_inits.items():
                lazy_type.__init__ = original_init

    return patch_context()

504 

505 def _move_function(self, index, direction): 

506 """Move function up or down.""" 

507 if 0 <= index < len(self.functions): 

508 new_index = index + direction 

509 if 0 <= new_index < len(self.functions): 

510 # Swap functions 

511 self.functions[index], self.functions[new_index] = self.functions[new_index], self.functions[index] 

512 self._update_pattern_data() 

513 self._populate_function_list() 

514 self.function_pattern_changed.emit() 

515 

def _add_function_at_index(self, index):
    """Let the user pick a function and insert it at ``index``.

    Opens ``FunctionSelectorDialog``; when the result is falsy nothing
    changes. Otherwise ``(function, {})`` is inserted, the pattern data and
    pane list are refreshed and ``function_pattern_changed`` is emitted.
    """
    from openhcs.pyqt_gui.dialogs.function_selector_dialog import FunctionSelectorDialog

    chosen = FunctionSelectorDialog.select_function(parent=self)
    if not chosen:
        return

    self.functions.insert(index, (chosen, {}))
    self._update_pattern_data()
    self._populate_function_list()
    self.function_pattern_changed.emit()
    logger.debug(f"Added function at index {index}: {chosen.__name__}")

531 

532 def _remove_function(self, index): 

533 """Remove function at index.""" 

534 if 0 <= index < len(self.functions): 

535 self.functions.pop(index) 

536 self._update_pattern_data() 

537 self._populate_function_list() 

538 self.function_pattern_changed.emit() 

539 

540 def _on_parameter_changed(self, index, param_name, value): 

541 """Handle parameter change from function pane.""" 

542 if 0 <= index < len(self.functions): 

543 func, kwargs = self.functions[index] 

544 kwargs[param_name] = value 

545 self.functions[index] = (func, kwargs) 

546 self._update_pattern_data() 

547 self.function_pattern_changed.emit() 

548 

549 

550 

551 

552 

def get_current_functions(self):
    """Return a shallow copy of the functions currently being edited."""
    snapshot = self.functions.copy()
    return snapshot

556 

@property
def current_pattern(self):
    """Return the up-to-date pattern for parent widgets.

    ``_update_pattern_data`` is called first. A dict pattern is returned as
    a new dict whose keys are converted with ``str`` (the pattern detection
    system is said to use string component values); any other pattern is
    returned as-is.
    """
    self._update_pattern_data()

    pattern = self.pattern_data
    if not isinstance(pattern, dict):
        return pattern

    return {str(key): value for key, value in pattern.items()}

572 

def set_functions(self, functions):
    """Replace the edited functions with a copy of ``functions`` and redraw.

    A falsy argument clears the list. No change signal is emitted.
    """
    if functions:
        self.functions = functions.copy()
    else:
        self.functions = []

    self._update_pattern_data()
    self._populate_function_list()

578 

579 def _get_component_button_text(self) -> str: 

580 """Get text for the component selection button (mirrors Textual TUI).""" 

581 if self.current_group_by is None or self.current_group_by == GroupBy.NONE: 

582 return "Component: None" 

583 

584 # Use the existing _get_enum_display_text function for consistent enum display handling 

585 component_type = _get_enum_display_text(self.current_group_by).title() 

586 

587 if self.is_dict_mode and self.selected_channel is not None: 

588 # Try to get metadata name for the selected component 

589 display_name = self._get_component_display_name(self.selected_channel) 

590 return f"{component_type}: {display_name}" 

591 return f"{component_type}: None" 

592 

593 def _get_component_display_name(self, component_key: str) -> str: 

594 """Get display name for component key, using metadata if available (mirrors Textual TUI).""" 

595 orchestrator = self._get_current_orchestrator() 

596 if orchestrator and self.current_group_by: 

597 metadata_name = orchestrator.metadata_cache.get_component_metadata(self.current_group_by, component_key) 

598 if metadata_name: 

599 return metadata_name 

600 return component_key 

601 

602 def _is_component_button_disabled(self) -> bool: 

603 """Check if component selection button should be disabled (mirrors Textual TUI).""" 

604 return ( 

605 self.current_group_by is None or 

606 self.current_group_by == GroupBy.NONE or 

607 (self.current_variable_components and 

608 self.current_group_by.value in [vc.value for vc in self.current_variable_components]) 

609 ) 

610 

def show_component_selection_dialog(self):
    """Open the component selector for the current group-by and apply the result.

    Does nothing when the component button is disabled. Otherwise the
    available component keys are read from the current orchestrator, the
    current selection is pre-selected and, unless the dialog returns
    ``None``, the result is passed to ``_handle_component_selection``.

    Raises:
        RuntimeError: If the orchestrator reports no component keys for the
            current group-by. This was an ``assert`` before, which Python
            removes when run with ``-O``.
    """
    from openhcs.pyqt_gui.dialogs.group_by_selector_dialog import GroupBySelectorDialog

    if self._is_component_button_disabled():
        logger.debug("Component selection is disabled")
        return

    # The orchestrator must exist here; there is deliberately no fallback.
    orchestrator = self._get_current_orchestrator()

    available_components = orchestrator.get_component_keys(self.current_group_by)
    if not available_components:
        raise RuntimeError(f"No {self.current_group_by.value} values found in current plate")

    selected_components = self._get_current_component_selection()

    result = GroupBySelectorDialog.select_components(
        available_components=available_components,
        selected_components=selected_components,
        group_by=self.current_group_by,
        orchestrator=orchestrator,
        parent=self
    )

    if result is not None:
        self._handle_component_selection(result)

642 

643 def _get_current_orchestrator(self): 

644 """Get current orchestrator instance - MUST exist, no fallbacks allowed.""" 

645 # Use stored main window reference to get plate manager 

646 main_window = self.main_window 

647 plate_manager_window = main_window.floating_windows['plate_manager'] 

648 

649 # Find the actual plate manager widget 

650 plate_manager_widget = None 

651 for child in plate_manager_window.findChildren(QWidget): 

652 if hasattr(child, 'orchestrators') and hasattr(child, 'selected_plate_path'): 

653 plate_manager_widget = child 

654 break 

655 

656 # Get current plate from plate manager's selection 

657 current_plate = plate_manager_widget.selected_plate_path 

658 orchestrator = plate_manager_widget.orchestrators[current_plate] 

659 

660 # Orchestrator must be initialized 

661 assert orchestrator.is_initialized(), f"Orchestrator for plate {current_plate} is not initialized" 

662 

663 return orchestrator 

664 

665 def _get_current_component_selection(self): 

666 """Get current component selection from pattern data (mirrors Textual TUI logic).""" 

667 # If in dict mode, return the keys of the dict as the current selection (sorted) 

668 if self.is_dict_mode and isinstance(self.pattern_data, dict): 

669 return sorted(list(self.pattern_data.keys())) 

670 

671 # If not in dict mode, check the cache (sorted) 

672 cached_selection = self.component_selections.get(self.current_group_by, []) 

673 return sorted(cached_selection) 

674 

def _handle_component_selection(self, new_components):
    """Apply a component selection returned by the selector dialog.

    The selection is cached under the current group-by (unless that is
    ``None`` or ``GroupBy.NONE``), the pattern structure is updated, the
    component button is refreshed and ``function_pattern_changed`` is
    emitted.
    """
    group_by = self.current_group_by
    if group_by is not None:
        if group_by != GroupBy.NONE:
            self.component_selections[group_by] = new_components
            logger.debug(f"Step '{self.step_identifier}': Cached selection for {group_by.value}: {new_components}")

    self._update_components(new_components)

    self._refresh_component_button()
    logger.debug(f"Updated components: {new_components}")

    self.function_pattern_changed.emit()

690 

691 def _update_components(self, new_components): 

692 """Update function pattern structure based on component selection (mirrors Textual TUI).""" 

693 # Sort new components for consistent ordering 

694 if new_components: 

695 new_components = sorted(new_components) 

696 

697 if not new_components: 

698 # No components selected - revert to list mode 

699 if self.is_dict_mode: 

700 # Save current functions to list mode 

701 self.pattern_data = self.functions 

702 self.is_dict_mode = False 

703 self.selected_channel = None 

704 logger.debug("Reverted to list mode (no components selected)") 

705 else: 

706 # Use component strings directly - no conversion needed 

707 component_keys = new_components 

708 

709 # Components selected - ensure dict mode 

710 if not self.is_dict_mode: 

711 # Convert to dict mode 

712 current_functions = self.functions 

713 self.pattern_data = {component_keys[0]: current_functions} 

714 self.is_dict_mode = True 

715 self.selected_channel = component_keys[0] 

716 

717 # Add other components with empty functions 

718 for component_key in component_keys[1:]: 

719 self.pattern_data[component_key] = [] 

720 else: 

721 # Already in dict mode - update components 

722 old_pattern = self.pattern_data.copy() if isinstance(self.pattern_data, dict) else {} 

723 

724 # Create a persistent storage for deselected components (mirrors Textual TUI) 

725 if not hasattr(self, '_deselected_components_storage'): 

726 self._deselected_components_storage = {} 

727 

728 # Save currently deselected components to storage 

729 for old_key, old_functions in old_pattern.items(): 

730 if old_key not in component_keys: 

731 self._deselected_components_storage[old_key] = old_functions 

732 logger.debug(f"Saved {len(old_functions)} functions for deselected component {old_key}") 

733 

734 new_pattern = {} 

735 

736 # Restore functions for components (from current pattern or storage) 

737 for component_key in component_keys: 

738 if component_key in old_pattern: 

739 # Component was already selected - keep its functions 

740 new_pattern[component_key] = old_pattern[component_key] 

741 elif component_key in self._deselected_components_storage: 

742 # Component was previously deselected - restore its functions 

743 new_pattern[component_key] = self._deselected_components_storage[component_key] 

744 logger.debug(f"Restored {len(new_pattern[component_key])} functions for reselected component {component_key}") 

745 else: 

746 # New component - start with empty functions 

747 new_pattern[component_key] = [] 

748 

749 self.pattern_data = new_pattern 

750 

751 # Update selected channel if current one is no longer available 

752 if self.selected_channel not in component_keys: 

753 self.selected_channel = component_keys[0] 

754 self.functions = new_pattern[self.selected_channel] 

755 

756 # Update UI to reflect changes 

757 self._populate_function_list() 

758 self._update_navigation_buttons() 

759 

760 def _refresh_component_button(self): 

761 """Refresh the component button text and state (mirrors Textual TUI).""" 

762 if hasattr(self, 'component_btn'): 

763 self.component_btn.setText(self._get_component_button_text()) 

764 self.component_btn.setEnabled(not self._is_component_button_disabled()) 

765 

766 # Also update navigation buttons when component button is refreshed 

767 self._update_navigation_buttons() 

768 

769 

770 

771 def _update_navigation_buttons(self): 

772 """Update visibility of channel navigation buttons (mirrors Textual TUI).""" 

773 if hasattr(self, 'prev_channel_btn') and hasattr(self, 'next_channel_btn'): 

774 # Show navigation buttons only in dict mode with multiple channels 

775 show_nav = (self.is_dict_mode and 

776 isinstance(self.pattern_data, dict) and 

777 len(self.pattern_data) > 1) 

778 

779 self.prev_channel_btn.setVisible(show_nav) 

780 self.next_channel_btn.setVisible(show_nav) 

781 

782 def _navigate_channel(self, direction: int): 

783 """Navigate to next/previous channel (with looping, mirrors Textual TUI).""" 

784 if not self.is_dict_mode or not isinstance(self.pattern_data, dict): 

785 return 

786 

787 channels = sorted(self.pattern_data.keys()) 

788 if len(channels) <= 1: 

789 return 

790 

791 try: 

792 current_index = channels.index(self.selected_channel) 

793 new_index = (current_index + direction) % len(channels) 

794 new_channel = channels[new_index] 

795 

796 self._switch_to_channel(new_channel) 

797 logger.debug(f"Navigated to channel {new_channel}") 

798 except (ValueError, IndexError): 

799 logger.warning(f"Failed to navigate channels: current={self.selected_channel}, channels={channels}") 

800 

801 def _switch_to_channel(self, channel: str): 

802 """Switch to editing functions for a specific channel (mirrors Textual TUI).""" 

803 if not self.is_dict_mode: 

804 return 

805 

806 # Save current functions first 

807 old_channel = self.selected_channel 

808 logger.debug(f"Switching from channel {old_channel} to {channel}") 

809 

810 self._update_pattern_data() 

811 

812 # Switch to new channel 

813 self.selected_channel = channel 

814 if isinstance(self.pattern_data, dict): 

815 self.functions = self.pattern_data.get(channel, []) 

816 logger.debug(f"Loaded {len(self.functions)} functions for channel {channel}") 

817 else: 

818 self.functions = [] 

819 

820 # Update UI 

821 self._refresh_component_button() 

822 self._populate_function_list() 

823 

824 def _update_pattern_data(self): 

825 """Update pattern_data based on current functions and mode (mirrors Textual TUI).""" 

826 if self.is_dict_mode and self.selected_channel is not None: 

827 # Save current functions to the selected channel 

828 if not isinstance(self.pattern_data, dict): 

829 self.pattern_data = {} 

830 logger.debug(f"Saving {len(self.functions)} functions to channel {self.selected_channel}") 

831 self.pattern_data[self.selected_channel] = self.functions.copy() 

832 else: 

833 # List mode - pattern_data is just the functions list 

834 self.pattern_data = self.functions