Coverage for openhcs/core/orchestrator/orchestrator.py: 37.7%

596 statements  

coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Consolidated orchestrator module for OpenHCS. 

3 

4This module provides a unified PipelineOrchestrator class that implements 

5a two-phase (compile-all-then-execute-all) pipeline execution model. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11- Clause 293 — GPU Pre-Declaration Enforcement 

12- Clause 295 — GPU Scheduling Affinity 

13""" 

14 

15 import logging

16 import contextlib

17 import concurrent.futures

18 import multiprocessing

19 from dataclasses import fields

20 from pathlib import Path

21 from typing import Any, Callable, Dict, List, Optional, Union, Set

22 

23 from openhcs.constants.constants import Backend, DEFAULT_WORKSPACE_DIR_SUFFIX, DEFAULT_IMAGE_EXTENSIONS, GroupBy, OrchestratorState, get_openhcs_config, AllComponents, VariableComponents

24 from openhcs.constants import Microscope

25 from openhcs.core.config import GlobalPipelineConfig

26 from openhcs.config_framework.global_config import set_current_global_config, get_current_global_config

27 from openhcs.config_framework.lazy_factory import ContextProvider

28 

29 

30 from openhcs.core.metadata_cache import get_metadata_cache, MetadataCache

31 from openhcs.core.context.processing_context import ProcessingContext

32 from openhcs.core.pipeline.compiler import PipelineCompiler

33 from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper

34 from openhcs.core.steps.abstract import AbstractStep

35 from openhcs.core.components.validation import convert_enum_by_value

36 from openhcs.io.filemanager import FileManager

37 # Zarr backend is CPU-only; always import it (even in subprocess/no-GPU mode)

38 import os

39 from openhcs.io.zarr import ZarrStorageBackend

40 # PipelineConfig now imported directly above

41 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization

42 from openhcs.io.exceptions import StorageWriteError

43 from openhcs.io.base import storage_registry

44 from openhcs.microscopes import create_microscope_handler

45 from openhcs.microscopes.microscope_base import MicroscopeHandler

46 

47 # Conditional analysis import - skip in subprocess runner mode

48 if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':  48 ↛ 50: line 48 didn't jump to line 50 because the condition on line 48 was never true

49 # Subprocess runner mode - create placeholder 

50 def consolidate_analysis_results(*args, **kwargs): 

51 """Placeholder for subprocess runner mode.""" 

52 raise RuntimeError("Analysis consolidation not available in subprocess runner mode") 

53 else:

54 from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results 

55 

56 # Import generic component system - required for orchestrator functionality

57 from openhcs.core.components.multiprocessing import MultiprocessingCoordinator

58 

59 # Optional napari import for visualization

60 try:

61 from openhcs.runtime.napari_stream_visualizer import NapariStreamVisualizer 

62 NapariVisualizerType = NapariStreamVisualizer 

63 except ImportError:

64 # Create a placeholder type for type hints when napari is not available 

65 NapariStreamVisualizer = None 

66 NapariVisualizerType = Any # Use Any for type hints when napari is not available 

67 

68 # Optional GPU memory management imports

69 try:

70 from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage, cleanup_all_gpu_frameworks 

71 except ImportError:

72 log_gpu_memory_usage = None 

73 cleanup_all_gpu_frameworks = None 

74 

75 

76 logger = logging.getLogger(__name__)

77 

78 

79 def _create_merged_config(pipeline_config: 'PipelineConfig', global_config: GlobalPipelineConfig) -> GlobalPipelineConfig:

80 """ 

81 Pure function for creating merged config that preserves None values for sibling inheritance. 

82 

83 Follows OpenHCS stateless architecture principles - no side effects, explicit dependencies. 

84 Extracted from apply_pipeline_config to eliminate code duplication. 

85 """ 

86 logger.debug(f"Starting merge with pipeline_config={type(pipeline_config)} and global_config={type(global_config)}") 

87 

88 # DEBUG: Check what the global_config looks like 

89 if hasattr(global_config, 'step_well_filter_config'):  89 ↛ 95: line 89 didn't jump to line 95 because the condition on line 89 was always true

90 step_config = getattr(global_config, 'step_well_filter_config') 

91 if hasattr(step_config, 'well_filter'):  91 ↛ 95: line 91 didn't jump to line 95 because the condition on line 91 was always true

92 well_filter_value = getattr(step_config, 'well_filter') 

93 logger.debug(f"global_config has step_well_filter_config.well_filter = {well_filter_value}") 

94 

95 merged_config_values = {} 

96 for field in fields(GlobalPipelineConfig): 

97 # Fail-loud: Let AttributeError bubble up naturally (no getattr fallbacks) 

98 pipeline_value = getattr(pipeline_config, field.name) 

99 

100 if field.name == 'step_well_filter_config': 

101 logger.debug(f"Processing step_well_filter_config: pipeline_value = {pipeline_value}") 

102 

103 if pipeline_value is not None: 

104 # CRITICAL FIX: Convert lazy configs to base configs with resolved values 

105 # This ensures that user-set values from lazy configs are preserved in the thread-local context 

106 # instead of being replaced with static defaults when GlobalPipelineConfig is instantiated 

107 if hasattr(pipeline_value, 'to_base_config'): 

108 # This is a lazy config - convert to base config with resolved values 

109 converted_value = pipeline_value.to_base_config() 

110 merged_config_values[field.name] = converted_value 

111 if field.name == 'step_well_filter_config': 

112 logger.debug(f"Converted lazy config to base: {converted_value}") 

113 else: 

114 # Regular value - use as-is 

115 merged_config_values[field.name] = pipeline_value 

116 if field.name == 'step_well_filter_config':  116 ↛ 117: line 116 didn't jump to line 117 because the condition on line 116 was never true

117 logger.debug(f"Using pipeline value as-is: {pipeline_value}") 

118 else: 

119 global_value = getattr(global_config, field.name) 

120 merged_config_values[field.name] = global_value 

121 if field.name == 'step_well_filter_config':  121 ↛ 122: line 121 didn't jump to line 122 because the condition on line 121 was never true

122 logger.debug(f"Using global_config value: {global_value}")

123 

124 result = GlobalPipelineConfig(**merged_config_values) 

125 

126 # DEBUG: Check what the result looks like 

127 if hasattr(result, 'step_well_filter_config'):  127 ↛ 133: line 127 didn't jump to line 133 because the condition on line 127 was always true

128 step_config = getattr(result, 'step_well_filter_config') 

129 if hasattr(step_config, 'well_filter'):  129 ↛ 133: line 129 didn't jump to line 133 because the condition on line 129 was always true

130 well_filter_value = getattr(step_config, 'well_filter')

131 logger.debug(f"Final result has step_well_filter_config.well_filter = {well_filter_value}")

132 

133 return result 

134 
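The merge rule above is easy to lose amid the debug logging, so here is a self-contained sketch of the same behaviour using stand-in dataclasses (not the real GlobalPipelineConfig and PipelineConfig classes, which have many more fields plus a lazy-resolution layer handled by to_base_config()): a non-None field on the pipeline config wins, and a None field falls back to the global config. The field names and default values below are illustrative assumptions.

    from dataclasses import dataclass, fields
    from typing import Optional

    @dataclass
    class Global:                      # stand-in for GlobalPipelineConfig
        num_workers: int = 8
        use_threading: bool = False

    @dataclass
    class Pipeline:                    # stand-in for PipelineConfig: every field defaults to None
        num_workers: Optional[int] = None
        use_threading: Optional[bool] = None

    def merge(pipeline: Pipeline, global_cfg: Global) -> Global:
        # Same rule as _create_merged_config: take the pipeline value if set, otherwise the global value.
        values = {
            f.name: getattr(pipeline, f.name)
            if getattr(pipeline, f.name) is not None
            else getattr(global_cfg, f.name)
            for f in fields(Global)
        }
        return Global(**values)

    merged = merge(Pipeline(num_workers=2), Global())
    assert merged.num_workers == 2            # explicitly set on the pipeline config
    assert merged.use_threading is False      # None falls back to the global default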

135 

136 def _execute_single_axis_static(

137 pipeline_definition: List[AbstractStep], 

138 frozen_context: 'ProcessingContext', 

139 visualizer: Optional['NapariVisualizerType'] 

140) -> Dict[str, Any]: 

141 """ 

142 Static version of _execute_single_axis for multiprocessing compatibility. 

143 

144 This function is identical to PipelineOrchestrator._execute_single_axis but doesn't 

145 require an orchestrator instance, making it safe for pickling in ProcessPoolExecutor. 

146 """ 

147 axis_id = frozen_context.axis_id 

148 logger.info(f"🔥 SINGLE_AXIS: Starting execution for axis {axis_id}") 

149 

150 # NUCLEAR VALIDATION 

151 if not frozen_context.is_frozen(): 

152 error_msg = f"🔥 SINGLE_AXIS ERROR: Context for axis {axis_id} is not frozen before execution" 

153 logger.error(error_msg) 

154 raise RuntimeError(error_msg) 

155 

156 if not pipeline_definition: 

157 error_msg = f"🔥 SINGLE_AXIS ERROR: Empty pipeline_definition for axis {axis_id}" 

158 logger.error(error_msg) 

159 raise RuntimeError(error_msg) 

160 

161 # Execute each step in the pipeline 

162 for step_index, step in enumerate(pipeline_definition): 

163 step_name = frozen_context.step_plans[step_index]["step_name"] 

164 

165 logger.info(f"🔥 SINGLE_AXIS: Executing step {step_index+1}/{len(pipeline_definition)} - {step_name} for axis {axis_id}") 

166 

167 if not hasattr(step, 'process'): 

168 error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} missing process method for axis {axis_id}" 

169 logger.error(error_msg) 

170 raise RuntimeError(error_msg) 

171 

172 # CRITICAL: Wrap step processing in config_context(step) for lazy config resolution 

173 from openhcs.config_framework.context_manager import config_context 

174 with config_context(step): 

175 step.process(frozen_context, step_index) 

176 logger.info(f"🔥 SINGLE_AXIS: Step {step_index+1}/{len(pipeline_definition)} - {step_name} completed for axis {axis_id}") 

177 

178 # Handle visualization if requested 

179 if visualizer: 

180 step_plan = frozen_context.step_plans[step_index] 

181 if step_plan['visualize']: 

182 output_dir = step_plan['output_dir'] 

183 write_backend = step_plan['write_backend'] 

184 if output_dir: 

185 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}") 

186 visualizer.visualize_path( 

187 step_id=f"step_{step_index}", 

188 path=str(output_dir), 

189 backend=write_backend, 

190 axis_id=axis_id 

191 ) 

192 else: 

193 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.") 

194 

195 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}") 

196 return {"status": "success", "axis_id": axis_id} 

197 

198 

199 def _configure_worker_logging(log_file_base: str):

200 """ 

201 Configure logging and import hook for worker process. 

202 

203 This function is called once per worker process when it starts. 

204 Each worker will get its own log file with a unique identifier. 

205 

206 Args: 

207 log_file_base: Base path for worker log files 

208 """ 

209 import os 

210 import logging 

211 import time 

212 

213 # CRITICAL: Skip function registry initialization for fast worker startup 

214 # The environment variable is inherited from the subprocess runner 

215 # Note: We don't log this yet because logging isn't configured 

216 

217 # Note: Import hook system was removed - using existing comprehensive registries 

218 

219 # Create unique worker identifier using PID and timestamp 

220 worker_pid = os.getpid() 

221 worker_timestamp = int(time.time() * 1000000) # Microsecond precision for uniqueness 

222 worker_id = f"{worker_pid}_{worker_timestamp}" 

223 worker_log_file = f"{log_file_base}_worker_{worker_id}.log" 

224 

225 # Configure root logger to capture ALL logs from worker process 

226 root_logger = logging.getLogger() 

227 root_logger.handlers.clear() # Clear any inherited handlers 

228 

229 # Create file handler for worker logs 

230 file_handler = logging.FileHandler(worker_log_file) 

231 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) 

232 root_logger.addHandler(file_handler) 

233 root_logger.setLevel(logging.INFO) 

234 

235 # Ensure all OpenHCS module logs are captured 

236 logging.getLogger("openhcs").setLevel(logging.INFO) 

237 

238 # Get worker logger 

239 worker_logger = logging.getLogger("openhcs.worker") 

240 worker_logger.info(f"🔥 WORKER: Process {worker_pid} (ID: {worker_id}) logging configured") 

241 worker_logger.info(f"🔥 WORKER: All logs writing to: {worker_log_file}") 

242 

243 # Import hook system was removed - nothing to install here, just confirm setup is complete

244 worker_logger.info(f"🔥 WORKER: Worker logging setup complete")

245 

246 

247 def _configure_worker_with_gpu(log_file_base: str, global_config_dict: dict):

248 """ 

249 Configure logging, function registry, and GPU registry for worker process. 

250 

251 This function is called once per worker process when it starts. 

252 It sets up logging, function registry, and GPU registry initialization. 

253 

254 Args: 

255 log_file_base: Base path for worker log files (empty string if no logging) 

256 global_config_dict: Serialized global configuration for GPU registry setup 

257 """ 

258 import logging 

259 import os 

260 

261 # Workers should be allowed to import GPU libs if available. 

262 # The parent subprocess runner may set OPENHCS_SUBPROCESS_NO_GPU=1 to stay lean, 

263 # but that flag must not leak into worker processes. 

264 os.environ.pop('OPENHCS_SUBPROCESS_NO_GPU', None) 

265 

266 # Configure logging only if log_file_base is provided 

267 if log_file_base: 

268 _configure_worker_logging(log_file_base) 

269 worker_logger = logging.getLogger("openhcs.worker") 

270 else: 

271 # Set up basic logging for worker messages 

272 logging.basicConfig(level=logging.INFO) 

273 worker_logger = logging.getLogger("openhcs.worker") 

274 worker_logger.info("🔥 WORKER: No log file base provided, using basic logging") 

275 

276 # Initialize function registry for this worker process 

277 try: 

278 worker_logger.info("🔥 WORKER: Initializing function registry for worker process") 

279 

280 # Import and initialize function registry (will auto-discover all libraries) 

281 import openhcs.processing.func_registry as func_registry_module 

282 

283 # Force initialization if not already done (workers need full registry) 

284 with func_registry_module._registry_lock: 

285 if not func_registry_module._registry_initialized: 

286 func_registry_module._auto_initialize_registry() 

287 

288 worker_logger.info("🔥 WORKER: Function registry initialized successfully") 

289 

290 except Exception as e: 

291 worker_logger.error(f"🔥 WORKER: Failed to initialize function registry: {e}") 

292 # Don't raise - let worker continue, registry will auto-init on first function call 

293 worker_logger.warning("🔥 WORKER: Function registry will auto-initialize on first function call") 

294 

295 # Initialize GPU registry for this worker process 

296 try: 

297 worker_logger.info("🔥 WORKER: Initializing GPU registry for worker process") 

298 

299 # Reconstruct global config from dict 

300 from openhcs.core.config import GlobalPipelineConfig 

301 global_config = GlobalPipelineConfig(**global_config_dict) 

302 

303 # Initialize GPU registry for this worker 

304 from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry 

305 setup_global_gpu_registry(global_config) 

306 

307 worker_logger.info("🔥 WORKER: GPU registry initialized successfully") 

308 

309 except Exception as e: 

310 worker_logger.error(f"🔥 WORKER: Failed to initialize GPU registry: {e}") 

311 # Don't raise - let worker continue without GPU if needed 

312 worker_logger.warning("🔥 WORKER: Continuing without GPU registry - GPU functions may fail") 

313 

314 

315 # Global variable to store log file base for worker processes

316 _worker_log_file_base = None

317 

318 

319 

320 

321 

322 class PipelineOrchestrator(ContextProvider):

323 """ 

324 Pipeline orchestrator supporting both global and per-orchestrator configuration.

325 

326 Global configuration: Updates all orchestrators (existing behavior) 

327 Per-orchestrator configuration: Affects only this orchestrator instance 

328 

329 The orchestrator first compiles the pipeline for all specified axis values, 

330 creating frozen, immutable ProcessingContexts using `compile_pipelines()`.

331 Then, it executes the (now stateless) pipeline definition against these contexts, 

332 potentially in parallel, using `execute_compiled_plate()`. 

333 """ 

334 _context_type = "orchestrator" # Register as orchestrator context provider 

335 

336 def __init__( 

337 self, 

338 plate_path: Union[str, Path], 

339 workspace_path: Optional[Union[str, Path]] = None, 

340 *, 

341 pipeline_config: Optional['PipelineConfig'] = None, 

342 storage_registry: Optional[Any] = None, 

343 ): 

344 # Lock removed - was orphaned code never used 

345 

346 # Validate shared global context exists 

347 if get_current_global_config(GlobalPipelineConfig) is None:  347 ↛ 348: line 347 didn't jump to line 348 because the condition on line 347 was never true

348 raise RuntimeError( 

349 "No global configuration context found. " 

350 "Ensure application startup has called ensure_global_config_context()." 

351 ) 

352 

353 # Initialize auto-sync control for pipeline config 

354 self._pipeline_config = None 

355 self._auto_sync_enabled = True 

356 

357 # Context management now handled by contextvars-based system 

358 

359 # Initialize per-orchestrator configuration 

360 # DUAL-AXIS FIX: Always create a PipelineConfig instance to make orchestrator detectable as context provider 

361 # This ensures the orchestrator has a dataclass attribute for stack introspection 

362 # PipelineConfig is already the lazy version of GlobalPipelineConfig 

363 from openhcs.core.config import PipelineConfig 

364 if pipeline_config is None:  364 ↛ 367: line 364 didn't jump to line 367 because the condition on line 364 was never true

365 # CRITICAL FIX: Create pipeline config that inherits from global config 

366 # This ensures the orchestrator's pipeline_config has the global values for resolution 

367 pipeline_config = PipelineConfig() 

368 

369 # CRITICAL FIX: Do NOT apply global config inheritance during initialization 

370 # PipelineConfig should always have None values that resolve through lazy resolution 

371 # Copying concrete values breaks the placeholder system and makes all fields appear "explicitly set" 

372 

373 self.pipeline_config = pipeline_config 

374 

375 # CRITICAL FIX: Expose pipeline config as public attribute for dual-axis resolver discovery 

376 # The resolver's _is_context_provider method only finds public attributes (skips _private) 

377 # This allows the resolver to discover the orchestrator's pipeline config during context resolution 

378 self.pipeline_config = pipeline_config 

379 logger.info("PipelineOrchestrator initialized with PipelineConfig for context discovery.") 

380 

381 # REMOVED: Unnecessary thread-local modification 

382 # The orchestrator should not modify thread-local storage during initialization 

383 # Global config is already available through the dual-axis resolver fallback 

384 

385 # Validate plate_path if provided 

386 if plate_path:  386 ↛ 389: line 386 didn't jump to line 389 because the condition on line 386 was always true

387 plate_path = Path(plate_path) 

388 

389 if plate_path:  389 ↛ 398: line 389 didn't jump to line 398 because the condition on line 389 was always true

390 if not plate_path.is_absolute():  390 ↛ 391: line 390 didn't jump to line 391 because the condition on line 390 was never true

391 raise ValueError(f"Plate path must be absolute: {plate_path}") 

392 if not plate_path.exists():  392 ↛ 393: line 392 didn't jump to line 393 because the condition on line 392 was never true

393 raise FileNotFoundError(f"Plate path does not exist: {plate_path}") 

394 if not plate_path.is_dir():  394 ↛ 395: line 394 didn't jump to line 395 because the condition on line 394 was never true

395 raise NotADirectoryError(f"Plate path is not a directory: {plate_path}") 

396 

397 # Initialize _plate_path_frozen first to allow plate_path to be set during initialization 

398 object.__setattr__(self, '_plate_path_frozen', False) 

399 

400 self.plate_path = plate_path 

401 self.workspace_path = workspace_path 

402 

403 if self.plate_path is None and self.workspace_path is None:  403 ↛ 404: line 403 didn't jump to line 404 because the condition on line 403 was never true

404 raise ValueError("Either plate_path or workspace_path must be provided for PipelineOrchestrator.") 

405 

406 # Freeze plate_path immediately after setting it to prove immutability 

407 object.__setattr__(self, '_plate_path_frozen', True) 

408 logger.info(f"🔒 PLATE_PATH FROZEN: {self.plate_path} is now immutable") 

409 

410 if storage_registry:  410 ↛ 411: line 410 didn't jump to line 411 because the condition on line 410 was never true

411 self.registry = storage_registry 

412 logger.info("PipelineOrchestrator using provided StorageRegistry instance.") 

413 else: 

414 # Create a copy of the global registry to avoid modifying shared state 

415 from openhcs.io.base import storage_registry as global_storage_registry 

416 self.registry = global_storage_registry.copy() 

417 logger.info("PipelineOrchestrator created its own StorageRegistry instance (copy of global).") 

418 

419 # Override zarr backend with orchestrator's config 

420 shared_context = get_current_global_config(GlobalPipelineConfig) 

421 zarr_backend_with_config = ZarrStorageBackend(shared_context.zarr_config) 

422 self.registry[Backend.ZARR.value] = zarr_backend_with_config 

423 logger.info(f"Orchestrator zarr backend configured with {shared_context.zarr_config.compressor.value} compression") 

424 

425 # Orchestrator always creates its own FileManager, using the determined registry 

426 self.filemanager = FileManager(self.registry) 

427 self.input_dir: Optional[Path] = None 

428 self.microscope_handler: Optional[MicroscopeHandler] = None 

429 self.default_pipeline_definition: Optional[List[AbstractStep]] = None 

430 self._initialized: bool = False 

431 self._state: OrchestratorState = OrchestratorState.CREATED 

432 

433 # Component keys cache for fast access - uses AllComponents (includes multiprocessing axis) 

434 self._component_keys_cache: Dict['AllComponents', List[str]] = {} 

435 

436 # Metadata cache service 

437 self._metadata_cache_service = get_metadata_cache() 

438 

439 

440 

441 

442 

443 def __setattr__(self, name: str, value: Any) -> None: 

444 """ 

445 Set an attribute, preventing modification of plate_path after it's frozen. 

446 

447 This proves that plate_path is truly immutable after initialization. 

448 """ 

449 if name == 'plate_path' and getattr(self, '_plate_path_frozen', False):  449 ↛ 450: line 449 didn't jump to line 450 because the condition on line 449 was never true

450 import traceback 

451 stack_trace = ''.join(traceback.format_stack()) 

452 error_msg = ( 

453 f"🚫 IMMUTABLE PLATE_PATH VIOLATION: Cannot modify plate_path after freezing!\n" 

454 f"Current value: {getattr(self, 'plate_path', 'UNSET')}\n" 

455 f"Attempted new value: {value}\n" 

456 f"Stack trace:\n{stack_trace}" 

457 ) 

458 logger.error(error_msg) 

459 raise AttributeError(error_msg) 

460 super().__setattr__(name, value) 
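A short sketch of the guard above, assuming /data/plate_001 is an existing plate directory (the path is hypothetical): attributes other than plate_path stay writable, while reassigning plate_path after construction raises the AttributeError built in __setattr__.

    orch = PipelineOrchestrator("/data/plate_001")   # plate_path is frozen at the end of __init__
    orch.workspace_path = Path("/tmp/openhcs_ws")    # other attributes remain writable
    orch.plate_path = Path("/data/other_plate")      # raises AttributeError: IMMUTABLE PLATE_PATH VIOLATION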

461 

462 @property 

463 def state(self) -> OrchestratorState: 

464 """Get the current orchestrator state.""" 

465 return self._state 

466 

467 def initialize_microscope_handler(self): 

468 """Initializes the microscope handler.""" 

469 if self.microscope_handler is not None:  469 ↛ 470: line 469 didn't jump to line 470 because the condition on line 469 was never true

470 logger.debug("Microscope handler already initialized.") 

471 return 

472 # if self.input_dir is None:

473 #     raise RuntimeError("Workspace (and input_dir) must be initialized before microscope handler.")

474 

475 logger.info(f"Initializing microscope handler using input directory: {self.input_dir}...") 

476 try: 

477 # Use configured microscope type or auto-detect 

478 shared_context = get_current_global_config(GlobalPipelineConfig) 

479 microscope_type = shared_context.microscope.value if shared_context.microscope != Microscope.AUTO else 'auto' 

480 self.microscope_handler = create_microscope_handler( 

481 plate_folder=str(self.plate_path), 

482 filemanager=self.filemanager, 

483 microscope_type=microscope_type, 

484 ) 

485 logger.info(f"Initialized microscope handler: {type(self.microscope_handler).__name__}") 

486 except Exception as e: 

487 error_msg = f"Failed to create microscope handler: {e}" 

488 logger.error(error_msg) 

489 raise RuntimeError(error_msg) from e 

490 

491 def initialize(self, workspace_path: Optional[Union[str, Path]] = None) -> 'PipelineOrchestrator': 

492 """ 

493 Initializes all required components for the orchestrator. 

494 Must be called before other processing methods. 

495 Returns self for chaining. 

496 """ 

497 if self._initialized:  497 ↛ 498: line 497 didn't jump to line 498 because the condition on line 497 was never true

498 logger.info("Orchestrator already initialized.") 

499 return self 

500 

501 try: 

502 self.initialize_microscope_handler() 

503 

504 # Delegate workspace initialization to microscope handler 

505 logger.info("Initializing workspace with microscope handler...") 

506 actual_image_dir = self.microscope_handler.initialize_workspace( 

507 self.plate_path, workspace_path, self.filemanager 

508 ) 

509 

510 # Use the actual image directory returned by the microscope handler 

511 self.input_dir = Path(actual_image_dir) 

512 logger.info(f"Set input directory to: {self.input_dir}") 

513 

514 # Set workspace_path based on what the handler returned 

515 if actual_image_dir != self.plate_path:  515 ↛ 520: line 515 didn't jump to line 520 because the condition on line 515 was always true

516 # Handler created a workspace 

517 self.workspace_path = Path(actual_image_dir).parent if Path(actual_image_dir).name != "workspace" else Path(actual_image_dir) 

518 else: 

519 # Handler used plate directly (like OpenHCS) 

520 self.workspace_path = None 

521 

522 # Mark as initialized BEFORE caching to avoid chicken-and-egg problem 

523 self._initialized = True 

524 self._state = OrchestratorState.READY 

525 

526 # Auto-cache component keys and metadata for instant access 

527 logger.info("Caching component keys and metadata...") 

528 self.cache_component_keys() 

529 self._metadata_cache_service.cache_metadata( 

530 self.microscope_handler, 

531 self.plate_path, 

532 self._component_keys_cache 

533 ) 

534 

535 logger.info("PipelineOrchestrator fully initialized with cached component keys and metadata.") 

536 return self 

537 except Exception as e: 

538 self._state = OrchestratorState.INIT_FAILED 

539 logger.error(f"Failed to initialize orchestrator: {e}") 

540 raise 

541 

542 def is_initialized(self) -> bool: 

543 return self._initialized 

544 

545 def create_context(self, axis_id: str) -> ProcessingContext: 

546 """Creates a ProcessingContext for a given multiprocessing axis value.""" 

547 if not self.is_initialized():  547 ↛ 548: line 547 didn't jump to line 548 because the condition on line 547 was never true

548 raise RuntimeError("Orchestrator must be initialized before calling create_context().") 

549 if not axis_id:  549 ↛ 550: line 549 didn't jump to line 550 because the condition on line 549 was never true

550 raise ValueError("Axis identifier must be provided.") 

551 if self.input_dir is None:  551 ↛ 552: line 551 didn't jump to line 552 because the condition on line 551 was never true

552 raise RuntimeError("Orchestrator input_dir is not set; initialize orchestrator first.") 

553 

554 context = ProcessingContext( 

555 global_config=self.get_effective_config(), 

556 axis_id=axis_id, 

557 filemanager=self.filemanager 

558 ) 

559 # Orchestrator reference removed - was orphaned and unpickleable 

560 context.microscope_handler = self.microscope_handler 

561 context.input_dir = self.input_dir 

562 context.workspace_path = self.workspace_path 

563 context.plate_path = self.plate_path # Add plate_path for path planner 

564 # Pass metadata cache for OpenHCS metadata creation 

565 context.metadata_cache = {} # Initialize empty - metadata cache is not pickled 

566 return context 

567 

568 def compile_pipelines( 

569 self, 

570 pipeline_definition: List[AbstractStep], 

571 well_filter: Optional[List[str]] = None, 

572 enable_visualizer_override: bool = False 

573 ) -> Dict[str, ProcessingContext]: 

574 """Compile pipelines for axis values (well_filter name preserved for UI compatibility).""" 

575 return PipelineCompiler.compile_pipelines( 

576 orchestrator=self, 

577 pipeline_definition=pipeline_definition, 

578 axis_filter=well_filter, # Translate well_filter to axis_filter for generic backend 

579 enable_visualizer_override=enable_visualizer_override 

580 ) 

581 

582 def _execute_single_axis( 

583 self, 

584 pipeline_definition: List[AbstractStep], 

585 frozen_context: ProcessingContext, 

586 visualizer: Optional[NapariVisualizerType] 

587 ) -> Dict[str, Any]: 

588 """Executes the pipeline for a single well using its frozen context.""" 

589 axis_id = frozen_context.axis_id 

590 logger.info(f"🔥 SINGLE_AXIS: Starting execution for axis {axis_id}") 

591 

592 # NUCLEAR VALIDATION 

593 if not frozen_context.is_frozen(): 

594 error_msg = f"🔥 SINGLE_AXIS ERROR: Context for axis {axis_id} is not frozen before execution" 

595 logger.error(error_msg) 

596 raise RuntimeError(error_msg) 

597 

598 if not pipeline_definition: 

599 error_msg = f"🔥 SINGLE_AXIS ERROR: Empty pipeline_definition for axis {axis_id}" 

600 logger.error(error_msg) 

601 raise RuntimeError(error_msg) 

602 

603 # Step IDs are consistent since pipeline_definition comes from UI (no remapping needed) 

604 

605 logger.info(f"🔥 SINGLE_AXIS: Processing {len(pipeline_definition)} steps for axis {axis_id}") 

606 

607 for step_index, step in enumerate(pipeline_definition): 

608 step_name = frozen_context.step_plans[step_index]["step_name"] 

609 

610 logger.info(f"🔥 SINGLE_AXIS: Executing step {step_index+1}/{len(pipeline_definition)} - {step_name} for axis {axis_id}") 

611 

612 if not hasattr(step, 'process'): 

613 error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} missing process method for axis {axis_id}" 

614 logger.error(error_msg) 

615 raise RuntimeError(error_msg) 

616 

617 # CRITICAL: Wrap step processing in config_context(step) for lazy config resolution 

618 from openhcs.config_framework.context_manager import config_context 

619 with config_context(step): 

620 step.process(frozen_context, step_index) 

621 logger.info(f"🔥 SINGLE_AXIS: Step {step_index+1}/{len(pipeline_definition)} - {step_name} completed for axis {axis_id}") 

622 

623 # except Exception as step_error: 

624 # import traceback 

625 # full_traceback = traceback.format_exc() 

626 # error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} ({step_id}) failed for axis {axis_id}: {step_error}" 

627 # logger.error(error_msg, exc_info=True) 

628 # logger.error(f"🔥 SINGLE_AXIS TRACEBACK for axis {axis_id}, step {step_index+1} ({step_id}):\n{full_traceback}") 

629 # raise RuntimeError(error_msg) from step_error 

630 

631 if visualizer: 

632 step_plan = frozen_context.step_plans[step_index] 

633 if step_plan['visualize']: 

634 output_dir = step_plan['output_dir'] 

635 write_backend = step_plan['write_backend'] 

636 if output_dir: 

637 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}") 

638 visualizer.visualize_path( 

639 step_id=f"step_{step_index}", 

640 path=str(output_dir), 

641 backend=write_backend, 

642 axis_id=axis_id 

643 ) 

644 else: 

645 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.") 

646 

647 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}") 

648 return {"status": "success", "axis_id": axis_id} 

649 

650 def execute_compiled_plate( 

651 self, 

652 pipeline_definition: List[AbstractStep], 

653 compiled_contexts: Dict[str, ProcessingContext], 

654 max_workers: Optional[int] = None, 

655 visualizer: Optional[NapariVisualizerType] = None, 

656 log_file_base: Optional[str] = None 

657 ) -> Dict[str, Dict[str, Any]]: 

658 """ 

659 Execute-all phase: Runs the stateless pipeline against compiled contexts. 

660 

661 Args: 

662 pipeline_definition: The stateless list of AbstractStep objects. 

663 compiled_contexts: Dict of axis_id to its compiled, frozen ProcessingContext. 

664 Obtained from `compile_pipelines`.

665 max_workers: Maximum number of worker processes (or threads, in threading mode) for parallel execution.

666 visualizer: Optional instance of NapariStreamVisualizer for real-time visualization 

667 (requires napari to be installed; must be initialized with orchestrator's filemanager by the caller). 

668 log_file_base: Base path for worker process log files (without extension). 

669 Each worker will create its own log file: {log_file_base}_worker_{pid}.log 

670 

671 Returns: 

672 A dictionary mapping axis IDs (e.g. well IDs) to their execution status (success/error and details).

673 """ 

674 

675 # CRITICAL FIX: Use resolved pipeline definition from compilation if available 

676 # For subprocess runner, use the parameter directly since it receives pre-compiled contexts 

677 resolved_pipeline = getattr(self, '_resolved_pipeline_definition', None) 

678 if resolved_pipeline is not None: 

679 logger.info(f"🔥 EXECUTION: Using resolved pipeline definition with {len(resolved_pipeline)} steps (from compilation)") 

680 pipeline_definition = resolved_pipeline 

681 else: 

682 logger.info(f"🔥 EXECUTION: Using parameter pipeline definition with {len(pipeline_definition)} steps (subprocess mode)") 

683 # In subprocess mode, the pipeline_definition parameter should already be resolved 

684 if not self.is_initialized(): 

685 raise RuntimeError("Orchestrator must be initialized before executing.") 

686 if not pipeline_definition: 

687 raise ValueError("A valid (stateless) pipeline definition must be provided.") 

688 if not compiled_contexts: 

689 logger.warning("No compiled contexts provided for execution.") 

690 return {} 

691 

692 # Use effective config (includes pipeline config) instead of global config directly 

693 actual_max_workers = max_workers if max_workers is not None else self.get_effective_config().num_workers 

694 if actual_max_workers <= 0: # Ensure positive number of workers 

695 actual_max_workers = 1 

696 

697 # 🔬 AUTOMATIC VISUALIZER CREATION: Create visualizers if compiler detected streaming 

698 if visualizer is None: 

699 context = next(iter(compiled_contexts.values())) 

700 logger.info(f"🔬 ORCHESTRATOR: Checking for required visualizers: {len(context.required_visualizers)} found") 

701 for visualizer_info in context.required_visualizers: 

702 config = visualizer_info['config'] 

703 logger.info(f"🔬 ORCHESTRATOR: Creating visualizer with config: {config}") 

704 visualizer = config.create_visualizer(self.filemanager, context.visualizer_config) 

705 logger.info(f"🔬 ORCHESTRATOR: Starting visualizer: {visualizer}") 

706 visualizer.start_viewer() 

707 logger.info(f"🔬 ORCHESTRATOR: Visualizer started, is_running: {visualizer.is_running}") 

708 break 

709 

710 self._state = OrchestratorState.EXECUTING 

711 logger.info(f"Starting execution for {len(compiled_contexts)} axis values with max_workers={actual_max_workers}.") 

712 

713 # 🔍 VRAM TRACKING: Log initial memory state 

714 try: 

715 if log_gpu_memory_usage: 

716 log_gpu_memory_usage("plate execution start") 

717 except Exception: 

718 pass 

719 

720 try: 

721 execution_results: Dict[str, Dict[str, Any]] = {} 

722 

723 # CUDA COMPATIBILITY: Set spawn method for multiprocessing to support CUDA 

724 try: 

725 # Check if spawn method is available and set it if not already set 

726 current_method = multiprocessing.get_start_method(allow_none=True) 

727 if current_method != 'spawn': 

728 logger.info(f"🔥 CUDA: Setting multiprocessing start method from '{current_method}' to 'spawn' for CUDA compatibility") 

729 multiprocessing.set_start_method('spawn', force=True) 

730 else: 

731 logger.debug("🔥 CUDA: Multiprocessing start method already set to 'spawn'") 

732 except RuntimeError as e: 

733 # Start method may already be set, which is fine 

734 logger.debug(f"🔥 CUDA: Start method already configured: {e}") 

735 

736 # Choose executor type based on effective config for debugging support 

737 effective_config = self.get_effective_config() 

738 executor_type = "ThreadPoolExecutor" if effective_config.use_threading else "ProcessPoolExecutor" 

739 logger.info(f"🔥 ORCHESTRATOR: Creating {executor_type} with {actual_max_workers} workers") 

740 

741 # DEATH DETECTION: Mark executor creation 

742 logger.info(f"🔥 DEATH_MARKER: BEFORE_{executor_type.upper()}_CREATION") 

743 

744 # Choose appropriate executor class and configure worker logging 

745 if effective_config.use_threading: 

746 logger.info("🔥 DEBUG MODE: Using ThreadPoolExecutor for easier debugging") 

747 executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers) 

748 else: 

749 logger.info("🔥 PRODUCTION MODE: Using ProcessPoolExecutor for true parallelism") 

750 # CRITICAL FIX: Use _configure_worker_with_gpu to ensure workers have function registry 

751 # Workers need the function registry to access decorated functions with memory types 

752 from openhcs.config_framework.global_config import get_current_global_config 

753 global_config = get_current_global_config(GlobalPipelineConfig) 

754 global_config_dict = global_config.__dict__ if global_config else {} 

755 

756 if log_file_base: 

757 logger.info(f"🔥 WORKER SETUP: Configuring worker processes with function registry and logging") 

758 executor = concurrent.futures.ProcessPoolExecutor( 

759 max_workers=actual_max_workers, 

760 initializer=_configure_worker_with_gpu, 

761 initargs=(log_file_base, global_config_dict) 

762 ) 

763 else: 

764 logger.info("🔥 WORKER SETUP: Configuring worker processes with function registry (no logging)") 

765 executor = concurrent.futures.ProcessPoolExecutor( 

766 max_workers=actual_max_workers, 

767 initializer=_configure_worker_with_gpu, 

768 initargs=("", global_config_dict) # Empty string for no logging 

769 ) 

770 

771 logger.info(f"🔥 DEATH_MARKER: ENTERING_{executor_type.upper()}_CONTEXT") 

772 with executor: 

773 logger.info(f"🔥 DEATH_MARKER: {executor_type.upper()}_CREATED_SUCCESSFULLY") 

774 logger.info(f"🔥 ORCHESTRATOR: {executor_type} created, submitting {len(compiled_contexts)} tasks") 

775 

776 # NUCLEAR ERROR TRACING: Create snapshot of compiled_contexts to prevent iteration issues 

777 contexts_snapshot = dict(compiled_contexts.items()) 

778 logger.info(f"🔥 ORCHESTRATOR: Created contexts snapshot with {len(contexts_snapshot)} items") 

779 

780 # CRITICAL FIX: Resolve all lazy dataclass instances before multiprocessing 

781 # This ensures that the contexts are safe for pickling in ProcessPoolExecutor 

782 # Note: Don't resolve pipeline_definition as it may overwrite collision-resolved configs 

783 logger.info("🔥 ORCHESTRATOR: Resolving lazy dataclasses for multiprocessing compatibility") 

784 contexts_snapshot = resolve_lazy_configurations_for_serialization(contexts_snapshot) 

785 logger.info("🔥 ORCHESTRATOR: Lazy dataclass resolution completed") 

786 

787 logger.info("🔥 DEATH_MARKER: BEFORE_TASK_SUBMISSION_LOOP") 

788 future_to_axis_id = {} 

789 config = get_openhcs_config() 

790 if not config: 

791 raise RuntimeError("Component configuration is required for orchestrator execution") 

792 axis_name = config.multiprocessing_axis.value 

793 for axis_id, context in contexts_snapshot.items(): 

794 try: 

795 logger.info(f"🔥 DEATH_MARKER: SUBMITTING_TASK_FOR_{axis_name.upper()}_{axis_id}") 

796 logger.info(f"🔥 ORCHESTRATOR: Submitting task for {axis_name} {axis_id}") 

797 # Resolve all arguments before passing to ProcessPoolExecutor 

798 resolved_context = resolve_lazy_configurations_for_serialization(context) 

799 

800 # Use static function to avoid pickling the orchestrator instance 

801 # Note: Use original pipeline_definition to preserve collision-resolved configs 

802 # Don't pass visualizer to worker processes - they communicate via ZeroMQ 

803 future = executor.submit(_execute_single_axis_static, pipeline_definition, resolved_context, None) 

804 future_to_axis_id[future] = axis_id 

805 logger.info(f"🔥 ORCHESTRATOR: Task submitted for {axis_name} {axis_id}") 

806 logger.info(f"🔥 DEATH_MARKER: TASK_SUBMITTED_FOR_{axis_name.upper()}_{axis_id}") 

807 except Exception as submit_error: 

808 error_msg = f"🔥 ORCHESTRATOR ERROR: Failed to submit task for {axis_name} {axis_id}: {submit_error}" 

809 logger.error(error_msg, exc_info=True) 

810 # FAIL-FAST: Re-raise task submission errors immediately 

811 raise 

812 

813 logger.info("🔥 DEATH_MARKER: TASK_SUBMISSION_LOOP_COMPLETED") 

814 

815 logger.info(f"🔥 ORCHESTRATOR: All {len(future_to_axis_id)} tasks submitted, waiting for completion") 

816 logger.info("🔥 DEATH_MARKER: BEFORE_COMPLETION_LOOP") 

817 

818 completed_count = 0 

819 logger.info("🔥 DEATH_MARKER: ENTERING_AS_COMPLETED_LOOP") 

820 for future in concurrent.futures.as_completed(future_to_axis_id): 

821 axis_id = future_to_axis_id[future] 

822 completed_count += 1 

823 logger.info(f"🔥 DEATH_MARKER: PROCESSING_COMPLETED_TASK_{completed_count}_{axis_name.upper()}_{axis_id}") 

824 logger.info(f"🔥 ORCHESTRATOR: Task {completed_count}/{len(future_to_axis_id)} completed for {axis_name} {axis_id}") 

825 

826 try: 

827 logger.info(f"🔥 DEATH_MARKER: CALLING_FUTURE_RESULT_FOR_{axis_name.upper()}_{axis_id}") 

828 result = future.result() 

829 logger.info(f"🔥 DEATH_MARKER: FUTURE_RESULT_SUCCESS_FOR_{axis_name.upper()}_{axis_id}") 

830 logger.info(f"🔥 ORCHESTRATOR: {axis_name.title()} {axis_id} result: {result}") 

831 execution_results[axis_id] = result 

832 logger.info(f"🔥 DEATH_MARKER: RESULT_STORED_FOR_{axis_name.upper()}_{axis_id}") 

833 except Exception as exc: 

834 import traceback 

835 full_traceback = traceback.format_exc() 

836 error_msg = f"{axis_name.title()} {axis_id} generated an exception during execution: {exc}" 

837 logger.error(f"🔥 ORCHESTRATOR ERROR: {error_msg}", exc_info=True) 

838 logger.error(f"🔥 ORCHESTRATOR FULL TRACEBACK for {axis_name} {axis_id}:\n{full_traceback}") 

839 # FAIL-FAST: Re-raise immediately instead of storing error 

840 raise 

841 

842 logger.info("🔥 DEATH_MARKER: COMPLETION_LOOP_FINISHED") 

843 

844 logger.info(f"🔥 ORCHESTRATOR: All tasks completed, {len(execution_results)} results collected") 

845 

846 

847 # 🔥 GPU CLEANUP: Clear GPU memory after plate execution 

848 try: 

849 if cleanup_all_gpu_frameworks: 

850 cleanup_all_gpu_frameworks() 

851 logger.debug("🔥 GPU CLEANUP: Cleared all GPU frameworks after plate execution") 

852 except Exception as cleanup_error: 

853 logger.warning(f"Failed to cleanup GPU memory after plate execution: {cleanup_error}") 

854 

855 

856 

857 logger.info(f"🔥 ORCHESTRATOR: Plate execution completed, checking for analysis consolidation") 

858 # Run automatic analysis consolidation if enabled 

859 shared_context = get_current_global_config(GlobalPipelineConfig) 

860 if shared_context.analysis_consolidation_config.enabled: 

861 try: 

862 # Get results directory using same logic as path planner (single source of truth) 

863 results_dir = None 

864 for axis_id, context in compiled_contexts.items(): 

865 # Use the same logic as PathPlanner._get_results_path() 

866 plate_path = Path(context.plate_path) 

867 materialization_path = shared_context.materialization_results_path 

868 

869 if Path(materialization_path).is_absolute(): 

870 potential_results_dir = Path(materialization_path) 

871 else: 

872 potential_results_dir = plate_path / materialization_path 

873 

874 if potential_results_dir.exists(): 

875 results_dir = potential_results_dir 

876 logger.info(f"🔍 CONSOLIDATION: Found results directory: {results_dir}") 

877 break 

878 

879 if results_dir and results_dir.exists(): 

880 # Check if there are actually CSV files (materialized results) 

881 csv_files = list(results_dir.glob("*.csv")) 

882 if csv_files: 

883 logger.info(f"🔄 CONSOLIDATION: Found {len(csv_files)} CSV files, running consolidation") 

884 # Get well IDs from compiled contexts 

885 axis_ids = list(compiled_contexts.keys()) 

886 logger.info(f"🔄 CONSOLIDATION: Using well IDs: {axis_ids}") 

887 

888 consolidate_analysis_results( 

889 results_directory=str(results_dir), 

890 well_ids=axis_ids, 

891 consolidation_config=shared_context.analysis_consolidation_config, 

892 plate_metadata_config=shared_context.plate_metadata_config 

893 ) 

894 logger.info("✅ CONSOLIDATION: Completed successfully") 

895 else: 

896 logger.info(f"⏭️ CONSOLIDATION: No CSV files found in {results_dir}, skipping") 

897 else: 

898 logger.info(f"⏭️ CONSOLIDATION: No results directory found in compiled contexts") 

899 except Exception as e: 

900 logger.error(f"❌ CONSOLIDATION: Failed: {e}") 

901 

902 # Update state based on execution results 

903 if all(result.get("status") == "success" for result in execution_results.values()): 

904 self._state = OrchestratorState.COMPLETED 

905 else: 

906 self._state = OrchestratorState.EXEC_FAILED 

907 

908 # 🔬 VISUALIZER CLEANUP: Stop napari visualizer if it was auto-created and not persistent 

909 if visualizer is not None: 

910 try: 

911 if not visualizer.persistent: 

912 visualizer.stop_viewer() 

913 logger.info("🔬 ORCHESTRATOR: Stopped non-persistent napari visualizer") 

914 else: 

915 logger.info("🔬 ORCHESTRATOR: Keeping persistent napari visualizer alive") 

916 # Just cleanup ZMQ connection, leave process running 

917 visualizer._cleanup_zmq() 

918 except Exception as e: 

919 logger.warning(f"🔬 ORCHESTRATOR: Failed to cleanup napari visualizer: {e}") 

920 

921 logger.info(f"🔥 ORCHESTRATOR: Plate execution finished. Results: {execution_results}") 

922 

923 return execution_results 

924 except Exception as e: 

925 self._state = OrchestratorState.EXEC_FAILED 

926 logger.error(f"Failed to execute compiled plate: {e}") 

927 raise 

928 

929 def get_component_keys(self, component: Union['AllComponents', 'VariableComponents'], component_filter: Optional[List[Union[str, int]]] = None) -> List[str]: 

930 """ 

931 Generic method to get component keys using VariableComponents directly. 

932 

933 Returns the discovered component values as strings to match the pattern 

934 detection system format. 

935 

936 Tries metadata cache first, falls back to filename parsing cache if metadata is empty. 

937 

938 Args: 

939 component: AllComponents or VariableComponents enum specifying which component to extract 

940 (also accepts GroupBy enum which will be converted to AllComponents) 

941 component_filter: Optional list of component values to filter by 

942 

943 Returns: 

944 List of component values as strings, sorted 

945 

946 Raises: 

947 RuntimeError: If orchestrator is not initialized 

948 """ 

949 if not self.is_initialized():  949 ↛ 950: line 949 didn't jump to line 950 because the condition on line 949 was never true

950 raise RuntimeError("Orchestrator must be initialized before getting component keys.") 

951 

952 # Convert GroupBy to AllComponents using OpenHCS generic utility 

953 if isinstance(component, GroupBy) and component.value is None:  953 ↛ 954: line 953 didn't jump to line 954 because the condition on line 953 was never true

954 raise ValueError("Cannot get component keys for GroupBy.NONE") 

955 

956 # Convert to AllComponents for cache lookup (includes multiprocessing axis) 

957 component = convert_enum_by_value(component, AllComponents) or component 

958 

959 # Use component directly - let natural errors occur for wrong types 

960 component_name = component.value 

961 

962 # Try metadata cache first (preferred source) 

963 cached_metadata = self._metadata_cache_service.get_cached_metadata(component) 

964 if cached_metadata:  964 ↛ 969: line 964 didn't jump to line 969 because the condition on line 964 was always true

965 all_components = list(cached_metadata.keys()) 

966 logger.debug(f"Using metadata cache for {component_name}: {len(all_components)} components") 

967 else: 

968 # Fall back to filename parsing cache 

969 all_components = self._component_keys_cache[component] # Let KeyError bubble up naturally 

970 

971 if not all_components: 

972 logger.warning(f"No {component_name} values found in input directory: {self.input_dir}") 

973 return [] 

974 

975 logger.debug(f"Using filename parsing cache for {component.value}: {len(all_components)} components") 

976 

977 if component_filter: 

978 str_component_filter = {str(c) for c in component_filter} 

979 selected_components = [comp for comp in all_components if comp in str_component_filter] 

980 if not selected_components:  980 ↛ 981: line 980 didn't jump to line 981 because the condition on line 980 was never true

981 component_name = component.value

982 logger.warning(f"No {component_name} values from {all_components} match the filter: {component_filter}") 

983 return selected_components 

984 else: 

985 return all_components 

986 

987 def cache_component_keys(self, components: Optional[List['AllComponents']] = None) -> None: 

988 """ 

989 Pre-compute and cache component keys for fast access using single-pass parsing. 

990 

991 This method performs expensive file listing and parsing operations once, 

992 extracting all component types in a single pass for maximum efficiency. 

993 

994 Args: 

995 components: Optional list of AllComponents to cache. 

996 If None, caches all components in the AllComponents enum. 

997 """ 

998 if not self.is_initialized():  998 ↛ 999: line 998 didn't jump to line 999 because the condition on line 998 was never true

999 raise RuntimeError("Orchestrator must be initialized before caching component keys.") 

1000 

1001 if components is None:  1001 ↛ 1004: line 1001 didn't jump to line 1004 because the condition on line 1001 was always true

1002 components = list(AllComponents) # Cache all enum values including multiprocessing axis 

1003 

1004 logger.info(f"Caching component keys for: {[comp.value for comp in components]}") 

1005 

1006 # Initialize component sets for all requested components 

1007 component_sets: Dict['AllComponents', Set[Union[str, int]]] = {} 

1008 for component in components: 

1009 component_sets[component] = set() 

1010 

1011 # Single pass through all filenames - extract all components at once 

1012 try: 

1013 # Use primary backend from microscope handler 

1014 backend_to_use = self.microscope_handler.get_primary_backend(self.input_dir) 

1015 logger.debug(f"Using backend '{backend_to_use}' for file listing based on available backends") 

1016 

1017 filenames = self.filemanager.list_files(str(self.input_dir), backend_to_use, extensions=DEFAULT_IMAGE_EXTENSIONS) 

1018 logger.debug(f"Parsing {len(filenames)} filenames in single pass...") 

1019 

1020 for filename in filenames: 

1021 parsed_info = self.microscope_handler.parser.parse_filename(str(filename)) 

1022 if parsed_info:  1022 ↛ 1029: line 1022 didn't jump to line 1029 because the condition on line 1022 was always true

1023 # Extract all requested components from this filename 

1024 for component in component_sets: 

1025 component_name = component.value 

1026 if component_name in parsed_info and parsed_info[component_name] is not None:  1026 ↛ 1024: line 1026 didn't jump to line 1024 because the condition on line 1026 was always true

1027 component_sets[component].add(parsed_info[component_name]) 

1028 else: 

1029 logger.warning(f"Could not parse filename: {filename}") 

1030 

1031 except Exception as e: 

1032 logger.error(f"Error listing files or parsing filenames from {self.input_dir}: {e}", exc_info=True) 

1033 # Initialize empty sets for failed parsing 

1034 for component in component_sets: 

1035 component_sets[component] = set() 

1036 

1037 # Convert sets to sorted lists and store in cache 

1038 for component, component_set in component_sets.items(): 

1039 sorted_components = [str(comp) for comp in sorted(list(component_set))] 

1040 self._component_keys_cache[component] = sorted_components 

1041 logger.debug(f"Cached {len(sorted_components)} {component.value} keys") 

1042 

1043 if not sorted_components:  1043 ↛ 1044: line 1043 didn't jump to line 1044 because the condition on line 1043 was never true

1044 logger.warning(f"No {component.value} values found in input directory: {self.input_dir}") 

1045 

1046 logger.info(f"Component key caching complete. Cached {len(component_sets)} component types in single pass.") 

1047 

1048 def clear_component_cache(self, components: Optional[List['AllComponents']] = None) -> None: 

1049 """ 

1050 Clear cached component keys to force recomputation. 

1051 

1052 Use this when the input directory contents have changed and you need 

1053 to refresh the component key cache. 

1054 

1055 Args: 

1056 components: Optional list of AllComponents to clear from cache. 

1057 If None, clears entire cache. 

1058 """ 

1059 if components is None: 

1060 self._component_keys_cache.clear() 

1061 logger.info("Cleared entire component keys cache") 

1062 else: 

1063 for component in components: 

1064 if component in self._component_keys_cache: 

1065 del self._component_keys_cache[component] 

1066 logger.debug(f"Cleared cache for {component.value}") 

1067 logger.info(f"Cleared cache for {len(components)} component types") 

1068 

1069 @property 

1070 def metadata_cache(self) -> MetadataCache: 

1071 """Access to metadata cache service.""" 

1072 return self._metadata_cache_service 

1073 

1074 

1075 

1076 # Global config management removed - handled by UI layer 

1077 

1078 @property 

1079 def pipeline_config(self) -> Optional['PipelineConfig']: 

1080 """Get current pipeline configuration.""" 

1081 return self._pipeline_config 

1082 

1083 @pipeline_config.setter 

1084 def pipeline_config(self, value: Optional['PipelineConfig']) -> None: 

1085 """Set pipeline configuration with auto-sync to thread-local context.""" 

1086 self._pipeline_config = value 

1087 # CRITICAL FIX: Also update public attribute for dual-axis resolver discovery 

1088 # This ensures the resolver can always find the current pipeline config 

1089 if hasattr(self, '__dict__'): # Avoid issues during __init__  1089 ↛ 1091: line 1089 didn't jump to line 1091 because the condition on line 1089 was always true

1090 self.__dict__['pipeline_config'] = value 

1091 if self._auto_sync_enabled and value is not None: 

1092 self._sync_to_thread_local() 

1093 

1094 def _sync_to_thread_local(self) -> None: 

1095 """Internal method to sync current pipeline_config to thread-local context.""" 

1096 if self._pipeline_config and hasattr(self, 'plate_path'):  1096 ↛ 1097: line 1096 didn't jump to line 1097 because the condition on line 1096 was never true

1097 self.apply_pipeline_config(self._pipeline_config) 

1098 

1099 def apply_pipeline_config(self, pipeline_config: 'PipelineConfig') -> None: 

1100 """ 

1101 Apply per-orchestrator configuration using thread-local storage. 

1102 

1103 This method sets the orchestrator's effective config in thread-local storage 

1104 for step-level lazy configurations to resolve against. 

1105 """ 

1106 # Import PipelineConfig at runtime for isinstance check 

1107 from openhcs.core.config import PipelineConfig 

1108 if not isinstance(pipeline_config, PipelineConfig): 

1109 raise TypeError(f"Expected PipelineConfig, got {type(pipeline_config)}") 

1110 

1111 # Temporarily disable auto-sync to prevent recursion 

1112 self._auto_sync_enabled = False 

1113 try: 

1114 self._pipeline_config = pipeline_config 

1115 finally: 

1116 self._auto_sync_enabled = True 

1117 

1118 # CRITICAL FIX: Do NOT contaminate thread-local context during PipelineConfig editing 

1119 # The orchestrator should maintain its own internal context without modifying 

1120 # the global thread-local context. This prevents reset operations from showing 

1121 # orchestrator's saved values instead of original thread-local defaults. 

1122 # 

1123 # The merged config is computed internally and used by get_effective_config() 

1124 # but should NOT be set as the global thread-local context. 

1125 

1126 logger.info(f"Applied orchestrator config for plate: {self.plate_path}") 

1127 

1128 def get_effective_config(self, *, for_serialization: bool = False) -> GlobalPipelineConfig: 

1129 """ 

1130 Get effective configuration for this orchestrator. 

1131 

1132 Args: 

1133 for_serialization: If True, resolves all values for pickling/storage. 

1134 If False, preserves None values for sibling inheritance. 

1135 """ 

1136 

1137 if for_serialization:  1137 ↛ 1138: line 1137 didn't jump to line 1138 because the condition on line 1137 was never true

1138 result = self.pipeline_config.to_base_config() 

1139 

1140 # DEBUG: Check what the serialization result looks like 

1141 if hasattr(result, 'step_well_filter_config'): 

1142 step_config = getattr(result, 'step_well_filter_config') 

1143 if hasattr(step_config, 'well_filter'): 

1144 well_filter_value = getattr(step_config, 'well_filter') 

1145 logger.debug(f"Serialization result has step_well_filter_config.well_filter = {well_filter_value}") 

1146 

1147 return result 

1148 else: 

1149 # Reuse existing merged config logic from apply_pipeline_config 

1150 shared_context = get_current_global_config(GlobalPipelineConfig) 

1151 if not shared_context:  1151 ↛ 1152: line 1151 didn't jump to line 1152 because the condition on line 1151 was never true

1152 raise RuntimeError("No global configuration context available for merging") 

1153 

1154 # DEBUG: Check what the shared context looks like before merging 

1155 if hasattr(shared_context, 'step_well_filter_config'):  1155 ↛ 1161: line 1155 didn't jump to line 1161 because the condition on line 1155 was always true

1156 step_config = getattr(shared_context, 'step_well_filter_config') 

1157 if hasattr(step_config, 'well_filter'):  1157 ↛ 1161: line 1157 didn't jump to line 1161 because the condition on line 1157 was always true

1158 well_filter_value = getattr(step_config, 'well_filter') 

1159 logger.debug(f"Shared context before merge has step_well_filter_config.well_filter = {well_filter_value}") 

1160 

1161 result = _create_merged_config(self.pipeline_config, shared_context) 

1162 

1163 # DEBUG: Check what the merged result looks like 

1164 if hasattr(result, 'step_well_filter_config'):  1164 ↛ 1170: line 1164 didn't jump to line 1170 because the condition on line 1164 was always true

1165 step_config = getattr(result, 'step_well_filter_config') 

1166 if hasattr(step_config, 'well_filter'):  1166 ↛ 1170: line 1166 didn't jump to line 1170 because the condition on line 1166 was always true

1167 well_filter_value = getattr(step_config, 'well_filter') 

1168 logger.debug(f"Merged result has step_well_filter_config.well_filter = {well_filter_value}") 

1169 

1170 return result 

1171 

1172 

1173 

1174 def clear_pipeline_config(self) -> None: 

1175 """Clear per-orchestrator configuration.""" 

1176 # REMOVED: Thread-local modification - dual-axis resolver handles context automatically 

1177 # No need to modify thread-local storage when clearing orchestrator config 

1178 self.pipeline_config = None 

1179 logger.info(f"Cleared per-orchestrator config for plate: {self.plate_path}") 

1180 

1181 def cleanup_pipeline_config(self) -> None: 

1182 """Clean up orchestrator context when done (for backward compatibility).""" 

1183 self.clear_pipeline_config() 

1184 

1185 def __del__(self): 

1186 """Ensure config cleanup on orchestrator destruction.""" 

1187 try: 

1188 # Clear any stored configuration references 

1189 self.clear_pipeline_config() 

1190 except Exception: 

1191 # Ignore errors during cleanup in destructor to prevent cascading failures 

1192 pass