Coverage for openhcs/core/orchestrator/orchestrator.py: 60.3%

703 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Consolidated orchestrator module for OpenHCS. 

3 

4This module provides a unified PipelineOrchestrator class that implements 

5a two-phase (compile-all-then-execute-all) pipeline execution model. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11- Clause 293 — GPU Pre-Declaration Enforcement 

12- Clause 295 — GPU Scheduling Affinity 

13""" 

14 

15import logging 

16import concurrent.futures 

17import multiprocessing 

18from dataclasses import fields 

19from pathlib import Path 

20from typing import Any, Callable, Dict, List, Optional, Union, Set 

21 

22from openhcs.constants.constants import Backend, DEFAULT_IMAGE_EXTENSIONS, GroupBy, OrchestratorState, get_openhcs_config, AllComponents, VariableComponents 

23from openhcs.constants import Microscope 

24from openhcs.core.config import GlobalPipelineConfig 

25from openhcs.config_framework.global_config import get_current_global_config 

26from openhcs.config_framework.lazy_factory import ContextProvider 

27 

28 

29from openhcs.core.metadata_cache import get_metadata_cache, MetadataCache 

30from openhcs.core.context.processing_context import ProcessingContext 

31from openhcs.core.pipeline.compiler import PipelineCompiler 

32from openhcs.core.steps.abstract import AbstractStep 

33from openhcs.core.components.validation import convert_enum_by_value 

34from openhcs.io.filemanager import FileManager 

35# Zarr backend is CPU-only; always import it (even in subprocess/no-GPU mode) 

36import os 

37from openhcs.io.zarr import ZarrStorageBackend 

38# PipelineConfig is imported locally in __init__ rather than at module level

39from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization 

40from openhcs.microscopes import create_microscope_handler 

41from openhcs.microscopes.microscope_base import MicroscopeHandler 

42 

43# Lazy import of consolidate_analysis_results to avoid blocking GUI startup 

44# This function imports GPU libraries, so we defer it until first use 

45def _get_consolidate_analysis_results(): 

46 """Lazy import of consolidate_analysis_results function.""" 

47 if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1': 

48 # Subprocess runner mode - create placeholder 

49 def consolidate_analysis_results(*args, **kwargs): 

50 """Placeholder for subprocess runner mode.""" 

51 raise RuntimeError("Analysis consolidation not available in subprocess runner mode") 

52 return consolidate_analysis_results 

53 else: 

54 from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results 

55 return consolidate_analysis_results 
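The same guard pattern can be exercised in isolation. A minimal sketch follows, in which _get_heavy_function and heavy_function are illustrative stand-ins (math.prod stands in for the GPU-heavy import); only the OPENHCS_SUBPROCESS_NO_GPU flag is taken from the code above.

import os

def _get_heavy_function():
    """Generic form of the deferred-import guard used above."""
    if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
        def heavy_function(*args, **kwargs):
            raise RuntimeError("heavy_function is unavailable in subprocess runner mode")
        return heavy_function
    from math import prod as heavy_function  # stand-in for the expensive import
    return heavy_function

fn = _get_heavy_function()
print(fn([2, 3, 4]))  # 24 when the import branch is taken; raises if the env flag is set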

56 

57# Import generic component system - required for orchestrator functionality 

58 

59# Optional napari import for visualization 

60try: 

61 from openhcs.runtime.napari_stream_visualizer import NapariStreamVisualizer 

62 NapariVisualizerType = NapariStreamVisualizer 

63except ImportError: 

64 # Create a placeholder type for type hints when napari is not available 

65 NapariStreamVisualizer = None 

66 NapariVisualizerType = Any # Use Any for type hints when napari is not available 

67 

68# Optional GPU memory management imports 

69try: 

70 from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks 

71except ImportError: 

72 cleanup_all_gpu_frameworks = None 

73 

74 

75logger = logging.getLogger(__name__) 

76 

77 

78def _create_merged_config(pipeline_config: 'PipelineConfig', global_config: GlobalPipelineConfig) -> GlobalPipelineConfig: 

79 """ 

80 Pure function for creating merged config that preserves None values for sibling inheritance. 

81 

82 Follows OpenHCS stateless architecture principles - no side effects, explicit dependencies. 

83 Extracted from apply_pipeline_config to eliminate code duplication. 

84 """ 

85 logger.debug(f"Starting merge with pipeline_config={type(pipeline_config)} and global_config={type(global_config)}") 

86 

87 # DEBUG: Check what the global_config looks like 

88 if hasattr(global_config, 'step_well_filter_config'): 88 ↛ 94: line 88 didn't jump to line 94 because the condition on line 88 was always true

89 step_config = getattr(global_config, 'step_well_filter_config') 

90 if hasattr(step_config, 'well_filter'): 90 ↛ 94: line 90 didn't jump to line 94 because the condition on line 90 was always true

91 well_filter_value = getattr(step_config, 'well_filter') 

92 logger.debug(f"global_config has step_well_filter_config.well_filter = {well_filter_value}") 

93 

94 merged_config_values = {} 

95 for field in fields(GlobalPipelineConfig): 

96 # Fail-loud: Let AttributeError bubble up naturally (no getattr fallbacks) 

97 pipeline_value = getattr(pipeline_config, field.name) 

98 

99 if field.name == 'step_well_filter_config': 

100 logger.debug(f"Processing step_well_filter_config: pipeline_value = {pipeline_value}") 

101 

102 if pipeline_value is not None: 

103 # CRITICAL FIX: Convert lazy configs to base configs with resolved values 

104 # This ensures that user-set values from lazy configs are preserved in the thread-local context 

105 # instead of being replaced with static defaults when GlobalPipelineConfig is instantiated 

106 if hasattr(pipeline_value, 'to_base_config'): 

107 # This is a lazy config - convert to base config with resolved values 

108 converted_value = pipeline_value.to_base_config() 

109 merged_config_values[field.name] = converted_value 

110 if field.name == 'step_well_filter_config': 

111 logger.debug(f"Converted lazy config to base: {converted_value}") 

112 else: 

113 # Regular value - use as-is 

114 merged_config_values[field.name] = pipeline_value 

115 if field.name == 'step_well_filter_config': 115 ↛ 116: line 115 didn't jump to line 116 because the condition on line 115 was never true

116 logger.debug(f"Using pipeline value as-is: {pipeline_value}") 

117 else: 

118 global_value = getattr(global_config, field.name) 

119 merged_config_values[field.name] = global_value 

120 

121 result = GlobalPipelineConfig(**merged_config_values) 

122 return result 
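The merge rule above can be illustrated on toy dataclasses. In this minimal sketch, ToyGlobalConfig and ToyPipelineConfig are illustrative stand-ins for the real config types: a non-None pipeline value overrides the global value, and a None value falls back to it.

from dataclasses import dataclass, fields
from typing import Optional

@dataclass(frozen=True)
class ToyGlobalConfig:
    num_workers: int = 4
    well_filter: Optional[str] = "A01"

@dataclass
class ToyPipelineConfig:
    num_workers: Optional[int] = None   # None -> inherit from global
    well_filter: Optional[str] = "B02"  # explicit -> overrides global

def merge(pipeline: ToyPipelineConfig, global_cfg: ToyGlobalConfig) -> ToyGlobalConfig:
    merged = {}
    for field in fields(ToyGlobalConfig):
        pipeline_value = getattr(pipeline, field.name)
        merged[field.name] = pipeline_value if pipeline_value is not None else getattr(global_cfg, field.name)
    return ToyGlobalConfig(**merged)

print(merge(ToyPipelineConfig(), ToyGlobalConfig()))
# ToyGlobalConfig(num_workers=4, well_filter='B02')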

123 

124 

125def _execute_single_axis_static( 

126 pipeline_definition: List[AbstractStep], 

127 frozen_context: 'ProcessingContext', 

128 visualizer: Optional['NapariVisualizerType'] 

129) -> Dict[str, Any]: 

130 """ 

131 Static version of _execute_single_axis for multiprocessing compatibility. 

132 

133 This function is identical to PipelineOrchestrator._execute_single_axis but doesn't 

134 require an orchestrator instance, making it safe for pickling in ProcessPoolExecutor. 

135 

136 Args: 

137 pipeline_definition: List of pipeline steps to execute 

138 frozen_context: Frozen processing context for this axis 

139 visualizer: Optional Napari visualizer (not used in multiprocessing) 

140 """ 

141 axis_id = frozen_context.axis_id 

142 

143 # NUCLEAR VALIDATION 

144 if not frozen_context.is_frozen(): 144 ↛ 145: line 144 didn't jump to line 145 because the condition on line 144 was never true

145 error_msg = f"Context for axis {axis_id} is not frozen before execution" 

146 logger.error(error_msg) 

147 raise RuntimeError(error_msg) 

148 

149 if not pipeline_definition: 149 ↛ 150: line 149 didn't jump to line 150 because the condition on line 149 was never true

150 error_msg = f"Empty pipeline_definition for axis {axis_id}" 

151 logger.error(error_msg) 

152 raise RuntimeError(error_msg) 

153 

154 # Execute each step in the pipeline 

155 for step_index, step in enumerate(pipeline_definition): 

156 step_name = frozen_context.step_plans[step_index]["step_name"] 

157 

158 # Verify step has process method (should always be true for AbstractStep subclasses) 

159 if not hasattr(step, 'process'): 159 ↛ 160: line 159 didn't jump to line 160 because the condition on line 159 was never true

160 error_msg = f"Step {step_index+1} missing process method for axis {axis_id}" 

161 logger.error(error_msg) 

162 raise RuntimeError(error_msg) 

163 

164 # Call process method on step instance 

165 step.process(frozen_context, step_index) 

166 

167 # Handle visualization if requested 

168 if visualizer: 168 ↛ 169: line 168 didn't jump to line 169 because the condition on line 168 was never true

169 step_plan = frozen_context.step_plans[step_index] 

170 if step_plan['visualize']: 

171 output_dir = step_plan['output_dir'] 

172 write_backend = step_plan['write_backend'] 

173 if output_dir: 

174 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}") 

175 visualizer.visualize_path( 

176 step_id=f"step_{step_index}", 

177 path=str(output_dir), 

178 backend=write_backend, 

179 axis_id=axis_id 

180 ) 

181 else: 

182 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.") 

183 

184 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}") 

185 result = {"status": "success", "axis_id": axis_id} 

186 logger.info(f"🔥 SINGLE_AXIS: Returning result: {result}") 

187 logger.info(f"🔥 SINGLE_AXIS: Result type check - status: {type(result['status'])}, axis_id: {type(result['axis_id'])}") 

188 return result 
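The reason this lives at module level is picklability: ProcessPoolExecutor serializes the submitted callable and its arguments, and a plain top-level function survives that round trip without dragging an orchestrator instance along. A minimal sketch with illustrative names (run_axis stands in for _execute_single_axis_static):

import concurrent.futures

def run_axis(axis_id: str, step_names: list) -> dict:
    # Must be importable at module level so worker processes can unpickle a reference to it.
    return {"status": "success", "axis_id": axis_id, "steps": len(step_names)}

if __name__ == "__main__":
    steps = ["preprocess", "segment", "measure"]
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(run_axis, axis, steps): axis for axis in ["A01", "A02"]}
        for future in concurrent.futures.as_completed(futures):
            print(futures[future], future.result())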

189 

190 

191def _configure_worker_logging(log_file_base: str): 

192 """ 

193 Configure logging and import hook for worker process. 

194 

195 This function is called once per worker process when it starts. 

196 Each worker will get its own log file with a unique identifier. 

197 

198 Args: 

199 log_file_base: Base path for worker log files 

200 """ 

201 import os 

202 import logging 

203 import time 

204 

205 # CRITICAL: Skip function registry initialization for fast worker startup 

206 # The environment variable is inherited from the subprocess runner 

207 # Note: We don't log this yet because logging isn't configured 

208 

209 # Note: Import hook system was removed - using existing comprehensive registries 

210 

211 # Create unique worker identifier using PID and timestamp 

212 worker_pid = os.getpid() 

213 worker_timestamp = int(time.time() * 1000000) # Microsecond precision for uniqueness 

214 worker_id = f"{worker_pid}_{worker_timestamp}" 

215 worker_log_file = f"{log_file_base}_worker_{worker_id}.log" 

216 

217 # Configure root logger to capture ALL logs from worker process 

218 root_logger = logging.getLogger() 

219 root_logger.handlers.clear() # Clear any inherited handlers 

220 

221 # Create file handler for worker logs 

222 file_handler = logging.FileHandler(worker_log_file) 

223 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) 

224 root_logger.addHandler(file_handler) 

225 root_logger.setLevel(logging.INFO) 

226 

227 # Ensure all OpenHCS module logs are captured 

228 logging.getLogger("openhcs").setLevel(logging.INFO) 

229 

230 # Get worker logger 

231 worker_logger = logging.getLogger("openhcs.worker") 

232 

233 

234def _configure_worker_with_gpu(log_file_base: str, global_config_dict: dict): 

235 """ 

236 Configure logging, function registry, and GPU registry for worker process. 

237 

238 This function is called once per worker process when it starts. 

239 It sets up logging, function registry, and GPU registry initialization. 

240 

241 Args: 

242 log_file_base: Base path for worker log files (empty string if no logging) 

243 global_config_dict: Serialized global configuration for GPU registry setup 

244 """ 

245 import logging 

246 import os 

247 

248 # Workers should be allowed to import GPU libs if available. 

249 # The parent subprocess runner may set OPENHCS_SUBPROCESS_NO_GPU=1 to stay lean, 

250 # but that flag must not leak into worker processes. 

251 os.environ.pop('OPENHCS_SUBPROCESS_NO_GPU', None) 

252 

253 # Configure logging only if log_file_base is provided 

254 if log_file_base: 254 ↛ 255: line 254 didn't jump to line 255 because the condition on line 254 was never true

255 _configure_worker_logging(log_file_base) 

256 worker_logger = logging.getLogger("openhcs.worker") 

257 else: 

258 # Set up basic logging for worker messages 

259 logging.basicConfig(level=logging.INFO) 

260 worker_logger = logging.getLogger("openhcs.worker") 

261 

262 # Initialize function registry for this worker process 

263 try: 

264 # Import and initialize function registry (will auto-discover all libraries) 

265 import openhcs.processing.func_registry as func_registry_module 

266 

267 # Force initialization if not already done (workers need full registry) 

268 with func_registry_module._registry_lock: 

269 if not func_registry_module._registry_initialized: 269 ↛ 277: line 269 didn't jump to line 277

270 func_registry_module._auto_initialize_registry() 

271 

272 except Exception as e: 

273 # Don't raise - let worker continue, registry will auto-init on first function call 

274 pass 

275 

276 # Initialize GPU registry for this worker process 

277 try: 

278 # Reconstruct global config from dict 

279 from openhcs.core.config import GlobalPipelineConfig 

280 global_config = GlobalPipelineConfig(**global_config_dict) 

281 

282 # Initialize GPU registry for this worker 

283 from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry 

284 setup_global_gpu_registry(global_config) 

285 

286 except Exception as e: 

287 # Don't raise - let worker continue without GPU if needed 

288 pass 
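A minimal sketch of how a per-worker initializer like this is wired into a pool (execute_compiled_plate below does the same with _configure_worker_with_gpu); init_worker and work are illustrative stand-ins.

import concurrent.futures
import os

def init_worker(log_file_base: str, config: dict) -> None:
    # Runs once per worker process before any task; mirrors the role of
    # _configure_worker_with_gpu (logging, registries, GPU setup).
    os.environ.pop('OPENHCS_SUBPROCESS_NO_GPU', None)

def work(x: int) -> int:
    return x * x

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=2,
        initializer=init_worker,
        initargs=("", {"num_workers": 2}),
    ) as executor:
        print(list(executor.map(work, range(4))))  # [0, 1, 4, 9]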

289 

290 

291# Global variable to store log file base for worker processes 

292_worker_log_file_base = None 

293 

294 

295 

296 

297 

298class PipelineOrchestrator(ContextProvider): 

299 """ 

300 Updated orchestrator supporting both global and per-orchestrator configuration. 

301 

302 Global configuration: Updates all orchestrators (existing behavior) 

303 Per-orchestrator configuration: Affects only this orchestrator instance 

304 

305 The orchestrator first compiles the pipeline for all specified axis values, 

306 creating frozen, immutable ProcessingContexts using `compile_plate_for_processing()`. 

307 Then, it executes the (now stateless) pipeline definition against these contexts, 

308 potentially in parallel, using `execute_compiled_plate()`. 

309 """ 

310 _context_type = "orchestrator" # Register as orchestrator context provider 

311 

312 def __init__( 

313 self, 

314 plate_path: Union[str, Path], 

315 workspace_path: Optional[Union[str, Path]] = None, 

316 *, 

317 pipeline_config: Optional['PipelineConfig'] = None, 

318 storage_registry: Optional[Any] = None, 

319 progress_callback: Optional[Callable[[str, str, str, Dict[str, Any]], None]] = None, 

320 ): 

321 # Lock removed - was orphaned code never used 

322 

323 # Validate shared global context exists 

324 if get_current_global_config(GlobalPipelineConfig) is None: 324 ↛ 325: line 324 didn't jump to line 325 because the condition on line 324 was never true

325 raise RuntimeError( 

326 "No global configuration context found. " 

327 "Ensure application startup has called ensure_global_config_context()." 

328 ) 

329 

330 # Track executor for cancellation support 

331 self._executor = None 

332 

333 # Initialize auto-sync control for pipeline config 

334 self._pipeline_config = None 

335 self._auto_sync_enabled = True 

336 

337 # Context management now handled by contextvars-based system 

338 

339 # Initialize per-orchestrator configuration 

340 # DUAL-AXIS FIX: Always create a PipelineConfig instance to make orchestrator detectable as context provider 

341 # This ensures the orchestrator has a dataclass attribute for stack introspection 

342 # PipelineConfig is already the lazy version of GlobalPipelineConfig 

343 from openhcs.core.config import PipelineConfig 

344 if pipeline_config is None: 344 ↛ 347: line 344 didn't jump to line 347 because the condition on line 344 was never true

345 # CRITICAL FIX: Create pipeline config that inherits from global config 

346 # This ensures the orchestrator's pipeline_config has the global values for resolution 

347 pipeline_config = PipelineConfig() 

348 

349 # CRITICAL FIX: Do NOT apply global config inheritance during initialization 

350 # PipelineConfig should always have None values that resolve through lazy resolution 

351 # Copying concrete values breaks the placeholder system and makes all fields appear "explicitly set" 

352 

353 self.pipeline_config = pipeline_config 

354 

355 # CRITICAL FIX: Expose pipeline config as public attribute for dual-axis resolver discovery 

356 # The resolver's _is_context_provider method only finds public attributes (skips _private) 

357 # This allows the resolver to discover the orchestrator's pipeline config during context resolution 

358 self.pipeline_config = pipeline_config 

359 logger.info("PipelineOrchestrator initialized with PipelineConfig for context discovery.") 

360 

361 # REMOVED: Unnecessary thread-local modification 

362 # The orchestrator should not modify thread-local storage during initialization 

363 # Global config is already available through the dual-axis resolver fallback 

364 

365 # Convert to Path and validate 

366 if plate_path: 366 ↛ 379: line 366 didn't jump to line 379 because the condition on line 366 was always true

367 plate_path = Path(plate_path) 

368 

369 # Validate filesystem paths (skip for OMERO virtual paths) 

370 if not str(plate_path).startswith("/omero/"): 370 ↛ 379: line 370 didn't jump to line 379 because the condition on line 370 was always true

371 if not plate_path.is_absolute(): 371 ↛ 372: line 371 didn't jump to line 372 because the condition on line 371 was never true

372 raise ValueError(f"Plate path must be absolute: {plate_path}") 

373 if not plate_path.exists(): 373 ↛ 374: line 373 didn't jump to line 374 because the condition on line 373 was never true

374 raise FileNotFoundError(f"Plate path does not exist: {plate_path}") 

375 if not plate_path.is_dir(): 375 ↛ 376: line 375 didn't jump to line 376 because the condition on line 375 was never true

376 raise NotADirectoryError(f"Plate path is not a directory: {plate_path}") 

377 

378 # Initialize _plate_path_frozen first to allow plate_path to be set during initialization 

379 object.__setattr__(self, '_plate_path_frozen', False) 

380 

381 self.plate_path = plate_path 

382 self.workspace_path = workspace_path 

383 

384 if self.plate_path is None and self.workspace_path is None: 384 ↛ 385: line 384 didn't jump to line 385 because the condition on line 384 was never true

385 raise ValueError("Either plate_path or workspace_path must be provided for PipelineOrchestrator.") 

386 

387 # Freeze plate_path immediately after setting it to prove immutability 

388 object.__setattr__(self, '_plate_path_frozen', True) 

389 logger.info(f"🔒 PLATE_PATH FROZEN: {self.plate_path} is now immutable") 

390 

391 if storage_registry: 391 ↛ 392: line 391 didn't jump to line 392 because the condition on line 391 was never true

392 self.registry = storage_registry 

393 logger.info("PipelineOrchestrator using provided StorageRegistry instance.") 

394 else: 

395 # Create a copy of the global registry to avoid modifying shared state 

396 from openhcs.io.base import storage_registry as global_storage_registry, ensure_storage_registry 

397 # Ensure registry is initialized before copying 

398 ensure_storage_registry() 

399 self.registry = global_storage_registry.copy() 

400 logger.info("PipelineOrchestrator created its own StorageRegistry instance (copy of global).") 

401 

402 # Override zarr backend with orchestrator's config 

403 shared_context = get_current_global_config(GlobalPipelineConfig) 

404 zarr_backend_with_config = ZarrStorageBackend(shared_context.zarr_config) 

405 self.registry[Backend.ZARR.value] = zarr_backend_with_config 

406 logger.info(f"Orchestrator zarr backend configured with {shared_context.zarr_config.compressor.value} compression") 

407 

408 # Orchestrator always creates its own FileManager, using the determined registry 

409 self.filemanager = FileManager(self.registry) 

410 self.input_dir: Optional[Path] = None 

411 self.microscope_handler: Optional[MicroscopeHandler] = None 

412 self.default_pipeline_definition: Optional[List[AbstractStep]] = None 

413 self._initialized: bool = False 

414 self._state: OrchestratorState = OrchestratorState.CREATED 

415 

416 # Progress callback for real-time execution updates 

417 self.progress_callback = progress_callback 

418 if progress_callback: 418 ↛ 419: line 418 didn't jump to line 419 because the condition on line 418 was never true

419 logger.info("PipelineOrchestrator initialized with progress callback") 

420 

421 # Component keys cache for fast access - uses AllComponents (includes multiprocessing axis) 

422 self._component_keys_cache: Dict['AllComponents', List[str]] = {} 

423 

424 # Metadata cache service 

425 self._metadata_cache_service = get_metadata_cache() 

426 

427 # Viewer management - shared between pipeline execution and image browser 

428 self._visualizers = {} # Dict[(backend_name, port)] -> visualizer instance 

429 

430 

431 def __setattr__(self, name: str, value: Any) -> None: 

432 """ 

433 Set an attribute, preventing modification of plate_path after it's frozen. 

434 

435 This proves that plate_path is truly immutable after initialization. 

436 """ 

437 if name == 'plate_path' and getattr(self, '_plate_path_frozen', False): 437 ↛ 438: line 437 didn't jump to line 438 because the condition on line 437 was never true

438 import traceback 

439 stack_trace = ''.join(traceback.format_stack()) 

440 error_msg = ( 

441 f"🚫 IMMUTABLE PLATE_PATH VIOLATION: Cannot modify plate_path after freezing!\n" 

442 f"Current value: {getattr(self, 'plate_path', 'UNSET')}\n" 

443 f"Attempted new value: {value}\n" 

444 f"Stack trace:\n{stack_trace}" 

445 ) 

446 logger.error(error_msg) 

447 raise AttributeError(error_msg) 

448 super().__setattr__(name, value) 
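The freeze mechanism above can be reduced to a small self-contained sketch: object.__setattr__ bypasses the guard while the instance is being built, after which ordinary assignment to the protected attribute raises. FrozenPath is an illustrative stand-in.

class FrozenPath:
    def __init__(self, path: str):
        object.__setattr__(self, '_frozen', False)
        self.path = path                     # allowed: not frozen yet
        object.__setattr__(self, '_frozen', True)

    def __setattr__(self, name, value):
        if name == 'path' and getattr(self, '_frozen', False):
            raise AttributeError("path is immutable after construction")
        super().__setattr__(name, value)

p = FrozenPath("/data/plate1")
p.other = "fine"      # unrelated attributes stay writable
try:
    p.path = "/tmp"   # raises AttributeError
except AttributeError as exc:
    print(exc)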

449 

450 @property 

451 def state(self) -> OrchestratorState: 

452 """Get the current orchestrator state.""" 

453 return self._state 

454 

455 def get_or_create_visualizer(self, config, vis_config=None): 

456 """ 

457 Get existing visualizer or create a new one for the given config. 

458 

459 This method is shared between pipeline execution and image browser to avoid 

460 duplicating viewer instances. Viewers are tracked by (backend_name, port) key. 

461 

462 Args: 

463 config: Streaming config (any StreamingConfig subclass) 

464 vis_config: Optional visualizer config (can be None for image browser) 

465 

466 Returns: 

467 Visualizer instance 

468 """ 

469 from openhcs.core.config import StreamingConfig 

470 

471 # Generic streaming config handling using polymorphic attributes 

472 if isinstance(config, StreamingConfig): 472 ↛ 485: line 472 didn't jump to line 485 because the condition on line 472 was always true

473 # Start global ack listener (must be before viewers connect) 

474 from openhcs.runtime.zmq_base import start_global_ack_listener 

475 start_global_ack_listener(config.transport_mode) 

476 

477 # Pre-create queue tracker using polymorphic attributes 

478 from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry 

479 registry = GlobalQueueTrackerRegistry() 

480 registry.get_or_create_tracker(config.port, config.viewer_type) 

481 logger.info(f"🔬 ORCHESTRATOR: Pre-created queue tracker for {config.viewer_type} on port {config.port}") 

482 

483 key = (config.viewer_type, config.port) 

484 else: 

485 backend_name = config.backend.name if hasattr(config, 'backend') else 'unknown' 

486 key = (backend_name,) 

487 

488 # Check if we already have a visualizer for this key 

489 if key in self._visualizers: 489 ↛ 490: line 489 didn't jump to line 490 because the condition on line 489 was never true

490 vis = self._visualizers[key] 

491 if vis.is_running: 

492 return vis 

493 else: 

494 del self._visualizers[key] 

495 

496 # Create new visualizer using polymorphic create_visualizer method 

497 vis = config.create_visualizer(self.filemanager, vis_config) 

498 

499 # Start viewer asynchronously for streaming configs 

500 if isinstance(config, StreamingConfig): 500 ↛ 514: line 500 didn't jump to line 514 because the condition on line 500 was always true

501 vis.start_viewer(async_mode=True) 

502 

503 # Ping server to set ready state (background thread to avoid blocking) 

504 import threading 

505 def ping_server(): 

506 import time 

507 time.sleep(1.0) # Give server time to start 

508 if hasattr(vis, '_wait_for_server_ready'): 

509 vis._wait_for_server_ready(timeout=10.0) 

510 

511 thread = threading.Thread(target=ping_server, daemon=True) 

512 thread.start() 

513 else: 

514 vis.start_viewer() 

515 

516 # Store in cache 

517 self._visualizers[key] = vis 

518 

519 return vis 
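Stripped of streaming details, the method implements a get-or-create cache keyed by (viewer_type, port). A minimal sketch, with Viewer as an illustrative stand-in for the real visualizer classes:

class Viewer:
    def __init__(self, viewer_type: str, port: int):
        self.viewer_type, self.port = viewer_type, port
        self.is_running = True

_viewers = {}

def get_or_create_viewer(viewer_type: str, port: int) -> Viewer:
    key = (viewer_type, port)
    cached = _viewers.get(key)
    if cached is not None and cached.is_running:
        return cached                      # reuse a live viewer
    _viewers.pop(key, None)                # drop a dead entry, then recreate
    viewer = _viewers[key] = Viewer(viewer_type, port)
    return viewer

a = get_or_create_viewer("napari", 5555)
assert get_or_create_viewer("napari", 5555) is a   # same instance reused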

520 

521 def initialize_microscope_handler(self): 

522 """Initializes the microscope handler.""" 

523 if self.microscope_handler is not None: 523 ↛ 524: line 523 didn't jump to line 524 because the condition on line 523 was never true

524 logger.debug("Microscope handler already initialized.") 

525 return 

526# if self.input_dir is None: 

527# raise RuntimeError("Workspace (and input_dir) must be initialized before microscope handler.") 

528 

529 logger.info(f"Initializing microscope handler using input directory: {self.input_dir}...") 

530 try: 

531 # Use configured microscope type or auto-detect 

532 shared_context = get_current_global_config(GlobalPipelineConfig) 

533 microscope_type = shared_context.microscope.value if shared_context.microscope != Microscope.AUTO else 'auto' 

534 self.microscope_handler = create_microscope_handler( 

535 plate_folder=str(self.plate_path), 

536 filemanager=self.filemanager, 

537 microscope_type=microscope_type, 

538 ) 

539 logger.info(f"Initialized microscope handler: {type(self.microscope_handler).__name__}") 

540 except Exception as e: 

541 error_msg = f"Failed to create microscope handler: {e}" 

542 logger.error(error_msg) 

543 raise RuntimeError(error_msg) from e 

544 

545 def initialize(self, workspace_path: Optional[Union[str, Path]] = None) -> 'PipelineOrchestrator': 

546 """ 

547 Initializes all required components for the orchestrator. 

548 Must be called before other processing methods. 

549 Returns self for chaining. 

550 """ 

551 logger.info(f"🔥 INIT: initialize() called for plate: {self.plate_path}") 

552 if self._initialized: 552 ↛ 553: line 552 didn't jump to line 553 because the condition on line 552 was never true

553 logger.info("Orchestrator already initialized.") 

554 return self 

555 

556 try: 

557 logger.info(f"🔥 INIT: About to call initialize_microscope_handler()") 

558 self.initialize_microscope_handler() 

559 

560 # Delegate workspace initialization to microscope handler 

561 logger.info("Initializing workspace with microscope handler...") 

562 actual_image_dir = self.microscope_handler.initialize_workspace( 

563 self.plate_path, self.filemanager 

564 ) 

565 

566 # Use the actual image directory returned by the microscope handler 

567 # All handlers now return Path (including OMERO with virtual paths) 

568 self.input_dir = Path(actual_image_dir) 

569 logger.info(f"Set input directory to: {self.input_dir}") 

570 

571 # Set workspace_path based on what the handler returned 

572 if actual_image_dir != self.plate_path: 

573 # Handler created a workspace (or virtual path for OMERO) 

574 self.workspace_path = Path(actual_image_dir).parent if Path(actual_image_dir).name != "workspace" else Path(actual_image_dir) 

575 else: 

576 # Handler used plate directly (like OpenHCS) 

577 self.workspace_path = None 

578 

579 # Mark as initialized BEFORE caching to avoid chicken-and-egg problem 

580 self._initialized = True 

581 self._state = OrchestratorState.READY 

582 

583 # Auto-cache component keys and metadata for instant access 

584 logger.info("Caching component keys and metadata...") 

585 self.cache_component_keys() 

586 self._metadata_cache_service.cache_metadata( 

587 self.microscope_handler, 

588 self.plate_path, 

589 self._component_keys_cache 

590 ) 

591 

592 # Ensure complete OpenHCS metadata exists 

593 self._ensure_openhcs_metadata() 

594 

595 logger.info("PipelineOrchestrator fully initialized with cached component keys and metadata.") 

596 return self 

597 except Exception as e: 

598 self._state = OrchestratorState.INIT_FAILED 

599 logger.error(f"Failed to initialize orchestrator: {e}") 

600 raise 

601 

602 def is_initialized(self) -> bool: 

603 return self._initialized 

604 

605 def _ensure_openhcs_metadata(self) -> None: 

606 """Ensure complete OpenHCS metadata exists for the plate. 

607 

608 Uses the same context creation logic as pipeline execution to get full metadata 

609 with channel names from metadata files (HTD, Index.xml, etc). 

610 

611 Skips OMERO and other non-disk-based microscope handlers since they don't have 

612 real disk directories. 

613 """ 

614 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator 

615 

616 # Skip metadata creation for OMERO and other non-disk-based handlers 

617 # OMERO uses virtual paths like /omero/plate_1 which are not real directories 

618 if self.microscope_handler.microscope_type == 'omero': 618 ↛ 619: line 618 didn't jump to line 619 because the condition on line 618 was never true

619 logger.debug("Skipping metadata creation for OMERO plate (uses virtual paths)") 

620 return 

621 

622 # For plates with virtual workspace, metadata is already created by _build_virtual_mapping() 

623 # We just need to add the component metadata to the existing "." subdirectory 

624 from openhcs.io.metadata_writer import get_subdirectory_name 

625 subdir_name = get_subdirectory_name(self.input_dir, self.plate_path) 

626 

627 # Create context using SAME logic as create_context() to get full metadata 

628 context = self.create_context(axis_id="metadata_init") 

629 

630 # Create metadata (will skip if already complete) 

631 generator = OpenHCSMetadataGenerator(self.filemanager) 

632 generator.create_metadata( 

633 context, 

634 str(self.input_dir), 

635 "disk", 

636 is_main=True, 

637 plate_root=str(self.plate_path), 

638 sub_dir=subdir_name, 

639 skip_if_complete=True 

640 ) 

641 

642 def get_results_path(self) -> Path: 

643 """Get the results directory path for this orchestrator's plate. 

644 

645 Uses the same logic as PathPlanner._get_results_path() to ensure consistency. 

646 This is the single source of truth for where results are stored. 

647 

648 Returns: 

649 Path to results directory (absolute or relative to output plate root) 

650 """ 

651 from openhcs.core.pipeline.path_planner import PipelinePathPlanner 

652 

653 # Get materialization_results_path from global config 

654 materialization_path = self.global_config.materialization_results_path 

655 

656 # If absolute, use as-is 

657 if Path(materialization_path).is_absolute(): 

658 return Path(materialization_path) 

659 

660 # If relative, resolve relative to output plate root 

661 # Use path_planning_config from global config 

662 path_config = self.global_config.path_planning_config 

663 output_plate_root = PipelinePathPlanner.build_output_plate_root( 

664 self.plate_path, 

665 path_config, 

666 is_per_step_materialization=False 

667 ) 

668 

669 return output_plate_root / materialization_path 
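The absolute-versus-relative rule reduces to a one-line check on pathlib.Path. A minimal sketch with illustrative paths:

from pathlib import Path

def resolve_results_path(materialization_path: str, output_plate_root: Path) -> Path:
    p = Path(materialization_path)
    return p if p.is_absolute() else output_plate_root / p

root = Path("/data/plate1_out")
print(resolve_results_path("results", root))        # /data/plate1_out/results
print(resolve_results_path("/srv/results", root))   # /srv/results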

670 

671 def create_context(self, axis_id: str) -> ProcessingContext: 

672 """Creates a ProcessingContext for a given multiprocessing axis value.""" 

673 if not self.is_initialized(): 673 ↛ 674: line 673 didn't jump to line 674 because the condition on line 673 was never true

674 raise RuntimeError("Orchestrator must be initialized before calling create_context().") 

675 if not axis_id: 675 ↛ 676: line 675 didn't jump to line 676 because the condition on line 675 was never true

676 raise ValueError("Axis identifier must be provided.") 

677 if self.input_dir is None: 677 ↛ 678: line 677 didn't jump to line 678 because the condition on line 677 was never true

678 raise RuntimeError("Orchestrator input_dir is not set; initialize orchestrator first.") 

679 

680 context = ProcessingContext( 

681 global_config=self.get_effective_config(), 

682 axis_id=axis_id, 

683 filemanager=self.filemanager 

684 ) 

685 # Orchestrator reference removed - was orphaned and unpickleable 

686 context.microscope_handler = self.microscope_handler 

687 context.input_dir = self.input_dir 

688 context.workspace_path = self.workspace_path 

689 context.plate_path = self.plate_path # Add plate_path for path planner 

690 

691 # CRITICAL: Pass metadata cache for OpenHCS metadata creation 

692 # Extract cached metadata from service and convert to dict format expected by OpenHCSMetadataGenerator 

693 metadata_dict = {} 

694 for component in AllComponents: 

695 cached_metadata = self._metadata_cache_service.get_cached_metadata(component) 

696 if cached_metadata: 

697 metadata_dict[component] = cached_metadata 

698 context.metadata_cache = metadata_dict 

699 

700 return context 

701 

702 def compile_pipelines( 

703 self, 

704 pipeline_definition: List[AbstractStep], 

705 well_filter: Optional[List[str]] = None, 

706 enable_visualizer_override: bool = False 

707 ) -> Dict[str, ProcessingContext]: 

708 """Compile pipelines for axis values (well_filter name preserved for UI compatibility).""" 

709 return PipelineCompiler.compile_pipelines( 

710 orchestrator=self, 

711 pipeline_definition=pipeline_definition, 

712 axis_filter=well_filter, # Translate well_filter to axis_filter for generic backend 

713 enable_visualizer_override=enable_visualizer_override 

714 ) 
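Taken together with initialize() and execute_compiled_plate(), this gives the two-phase flow the class docstring describes. A hedged usage sketch, assuming an importable openhcs installation, an existing plate directory, and a pipeline list built elsewhere (the step construction itself is not shown):

from pathlib import Path
from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator

pipeline_steps = []  # placeholder: populate with AbstractStep instances before running

orchestrator = PipelineOrchestrator(plate_path=Path("/data/plate1")).initialize()

# Phase 1: compile one frozen ProcessingContext per axis value (e.g. per well)
compiled_contexts = orchestrator.compile_pipelines(pipeline_steps, well_filter=["A01", "A02"])

# Phase 2: execute the stateless pipeline against the frozen contexts in parallel
results = orchestrator.execute_compiled_plate(pipeline_steps, compiled_contexts)
print({axis: r["status"] for axis, r in results.items()})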

715 

716 def _execute_single_axis( 

717 self, 

718 pipeline_definition: List[AbstractStep], 

719 frozen_context: ProcessingContext, 

720 visualizer: Optional[NapariVisualizerType] 

721 ) -> Dict[str, Any]: 

722 """Executes the pipeline for a single axis value (e.g. a well) using its frozen context."""

723 axis_id = frozen_context.axis_id 

724 logger.info(f"🔥 SINGLE_AXIS: Starting execution for axis {axis_id}") 

725 

726 # Send progress: axis started 

727 if self.progress_callback: 

728 try: 

729 self.progress_callback(axis_id, 'pipeline', 'started', { 

730 'total_steps': len(pipeline_definition) 

731 }) 

732 except Exception as e: 

733 logger.warning(f"Progress callback failed for axis {axis_id} start: {e}") 

734 

735 # NUCLEAR VALIDATION 

736 if not frozen_context.is_frozen(): 

737 error_msg = f"🔥 SINGLE_AXIS ERROR: Context for axis {axis_id} is not frozen before execution" 

738 logger.error(error_msg) 

739 raise RuntimeError(error_msg) 

740 

741 if not pipeline_definition: 

742 error_msg = f"🔥 SINGLE_AXIS ERROR: Empty pipeline_definition for axis {axis_id}" 

743 logger.error(error_msg) 

744 raise RuntimeError(error_msg) 

745 

746 # Step IDs are consistent since pipeline_definition comes from UI (no remapping needed) 

747 

748 logger.info(f"🔥 SINGLE_AXIS: Processing {len(pipeline_definition)} steps for axis {axis_id}") 

749 

750 for step_index, step in enumerate(pipeline_definition): 

751 step_name = frozen_context.step_plans[step_index]["step_name"] 

752 

753 logger.info(f"🔥 SINGLE_AXIS: Executing step {step_index+1}/{len(pipeline_definition)} - {step_name} for axis {axis_id}") 

754 

755 # Send progress: step started 

756 if self.progress_callback: 

757 try: 

758 self.progress_callback(axis_id, step_name, 'started', { 

759 'step_index': step_index, 

760 'total_steps': len(pipeline_definition) 

761 }) 

762 except Exception as e: 

763 logger.warning(f"Progress callback failed for axis {axis_id} step {step_name} start: {e}") 

764 

765 # Verify step has process method (should always be true for AbstractStep subclasses) 

766 if not hasattr(step, 'process'): 

767 error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} missing process method for axis {axis_id}" 

768 logger.error(error_msg) 

769 raise RuntimeError(error_msg) 

770 

771 # Call process method on step instance 

772 step.process(frozen_context, step_index) 

773 logger.info(f"🔥 SINGLE_AXIS: Step {step_index+1}/{len(pipeline_definition)} - {step_name} completed for axis {axis_id}") 

774 

775 # Send progress: step completed 

776 if self.progress_callback: 

777 try: 

778 self.progress_callback(axis_id, step_name, 'completed', { 

779 'step_index': step_index, 

780 'total_steps': len(pipeline_definition) 

781 }) 

782 except Exception as e: 

783 logger.warning(f"Progress callback failed for axis {axis_id} step {step_name} completion: {e}") 

784 

785 # except Exception as step_error: 

786 # import traceback 

787 # full_traceback = traceback.format_exc() 

788 # error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} ({step_id}) failed for axis {axis_id}: {step_error}" 

789 # logger.error(error_msg, exc_info=True) 

790 # logger.error(f"🔥 SINGLE_AXIS TRACEBACK for axis {axis_id}, step {step_index+1} ({step_id}):\n{full_traceback}") 

791 # raise RuntimeError(error_msg) from step_error 

792 

793 if visualizer: 

794 step_plan = frozen_context.step_plans[step_index] 

795 if step_plan['visualize']: 

796 output_dir = step_plan['output_dir'] 

797 write_backend = step_plan['write_backend'] 

798 if output_dir: 

799 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}") 

800 visualizer.visualize_path( 

801 step_id=f"step_{step_index}", 

802 path=str(output_dir), 

803 backend=write_backend, 

804 axis_id=axis_id 

805 ) 

806 else: 

807 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.") 

808 

809 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}") 

810 

811 # Send progress: axis completed 

812 if self.progress_callback: 

813 try: 

814 self.progress_callback(axis_id, 'pipeline', 'completed', { 

815 'total_steps': len(pipeline_definition) 

816 }) 

817 except Exception as e: 

818 logger.warning(f"Progress callback failed for axis {axis_id} completion: {e}") 

819 

820 return {"status": "success", "axis_id": axis_id} 

821 

822 def cancel_execution(self): 

823 """ 

824 Cancel ongoing execution by shutting down the executor. 

825 

826 This gracefully cancels pending futures and shuts down worker processes 

827 without killing all child processes (preserving Napari viewers, etc.). 

828 """ 

829 if self._executor: 

830 try: 

831 logger.info("🔥 ORCHESTRATOR: Cancelling execution - shutting down executor") 

832 self._executor.shutdown(wait=False, cancel_futures=True) 

833 logger.info("🔥 ORCHESTRATOR: Executor shutdown initiated") 

834 except Exception as e: 

835 logger.warning(f"🔥 ORCHESTRATOR: Failed to cancel executor: {e}") 

836 

837 def execute_compiled_plate( 

838 self, 

839 pipeline_definition: List[AbstractStep], 

840 compiled_contexts: Dict[str, ProcessingContext], 

841 max_workers: Optional[int] = None, 

842 visualizer: Optional[NapariVisualizerType] = None, 

843 log_file_base: Optional[str] = None 

844 ) -> Dict[str, Dict[str, Any]]: 

845 """ 

846 Execute-all phase: Runs the stateless pipeline against compiled contexts. 

847 

848 Args: 

849 pipeline_definition: The stateless list of AbstractStep objects. 

850 compiled_contexts: Dict of axis_id to its compiled, frozen ProcessingContext. 

851 Obtained from `compile_plate_for_processing`. 

852 max_workers: Maximum number of parallel workers (processes by default, or threads when use_threading is enabled).

853 visualizer: Optional instance of NapariStreamVisualizer for real-time visualization 

854 (requires napari to be installed; must be initialized with orchestrator's filemanager by the caller). 

855 log_file_base: Base path for worker process log files (without extension). 

856 Each worker will create its own log file: {log_file_base}_worker_{pid}.log 

857 

858 Returns: 

859 A dictionary mapping axis IDs (well IDs) to their execution status (success/error and details).

860 """ 

861 

862 # CRITICAL FIX: Use resolved pipeline definition from compilation if available 

863 # For subprocess runner, use the parameter directly since it receives pre-compiled contexts 

864 resolved_pipeline = getattr(self, '_resolved_pipeline_definition', None) 

865 if resolved_pipeline is not None: 865 ↛ 866: line 865 didn't jump to line 866 because the condition on line 865 was never true

866 logger.info(f"🔥 EXECUTION: Using resolved pipeline definition with {len(resolved_pipeline)} steps (from compilation)") 

867 pipeline_definition = resolved_pipeline 

868 else: 

869 logger.info(f"🔥 EXECUTION: Using parameter pipeline definition with {len(pipeline_definition)} steps (subprocess mode)") 

870 # In subprocess mode, the pipeline_definition parameter should already be resolved 

871 if not self.is_initialized(): 871 ↛ 872: line 871 didn't jump to line 872 because the condition on line 871 was never true

872 raise RuntimeError("Orchestrator must be initialized before executing.") 

873 if not pipeline_definition: 873 ↛ 874: line 873 didn't jump to line 874 because the condition on line 873 was never true

874 raise ValueError("A valid (stateless) pipeline definition must be provided.") 

875 if not compiled_contexts: 875 ↛ 876: line 875 didn't jump to line 876 because the condition on line 875 was never true

876 logger.warning("No compiled contexts provided for execution.") 

877 return {} 

878 

879 # Use effective config (includes pipeline config) instead of global config directly 

880 actual_max_workers = max_workers if max_workers is not None else self.get_effective_config().num_workers 

881 if actual_max_workers <= 0: # Ensure positive number of workers 881 ↛ 882: line 881 didn't jump to line 882 because the condition on line 881 was never true

882 actual_max_workers = 1 

883 

884 # 🔬 AUTOMATIC VISUALIZER CREATION: Create visualizers if compiler detected streaming 

885 visualizers = [] 

886 if visualizer is None: 886 ↛ 934: line 886 didn't jump to line 934 because the condition on line 886 was always true

887 from openhcs.core.config import StreamingConfig 

888 

889 # Collect unique configs (deduplicate by viewer_type + port) 

890 unique_configs = {} 

891 for ctx in compiled_contexts.values(): 

892 for visualizer_info in ctx.required_visualizers: 

893 config = visualizer_info['config'] 

894 key = (config.viewer_type, config.port) if isinstance(config, StreamingConfig) else (config.backend.name,) 

895 if key not in unique_configs: 

896 unique_configs[key] = (config, ctx.visualizer_config) 

897 

898 # Create visualizers 

899 for config, vis_config in unique_configs.values(): 

900 visualizers.append(self.get_or_create_visualizer(config, vis_config)) 

901 

902 # Wait for all streaming viewers to be ready before starting pipeline 

903 # This ensures viewers are available to receive images 

904 if visualizers: 

905 logger.info(f"🔬 ORCHESTRATOR: Waiting for {len(visualizers)} streaming viewer(s) to be ready...") 

906 import time 

907 max_wait = 30.0 # Maximum wait time in seconds 

908 start_time = time.time() 

909 

910 while time.time() - start_time < max_wait: 

911 all_ready = all(v.is_running for v in visualizers) 

912 if all_ready: 912 ↛ 913: line 912 didn't jump to line 913 because the condition on line 912 was never true

913 logger.info("🔬 ORCHESTRATOR: All streaming viewers are ready!") 

914 break 

915 time.sleep(0.2) # Check every 200ms 

916 else: 

917 # Timeout - log which viewers aren't ready (use generic port attribute) 

918 not_ready = [v.port for v in visualizers if not v.is_running] 

919 logger.warning(f"🔬 ORCHESTRATOR: Timeout waiting for streaming viewers. Not ready: {not_ready}") 

920 

921 # Clear viewer state for new pipeline run to prevent accumulation 

922 logger.info("🔬 ORCHESTRATOR: Clearing streaming viewer state for new pipeline run...") 

923 for vis in visualizers: 

924 if hasattr(vis, 'clear_viewer_state'): 924 ↛ 923: line 924 didn't jump to line 923 because the condition on line 924 was always true

925 success = vis.clear_viewer_state() 

926 if success: 926 ↛ 927: line 926 didn't jump to line 927 because the condition on line 926 was never true

927 logger.info(f"🔬 ORCHESTRATOR: Cleared state for viewer on port {vis.port}") 

928 else: 

929 logger.warning(f"🔬 ORCHESTRATOR: Failed to clear state for viewer on port {vis.port}") 

930 

931 # For backwards compatibility, set visualizer to the first one 

932 visualizer = visualizers[0] if visualizers else None 

933 

934 self._state = OrchestratorState.EXECUTING 

935 logger.info(f"Starting execution for {len(compiled_contexts)} axis values with max_workers={actual_max_workers}.") 

936 

937 try: 

938 execution_results: Dict[str, Dict[str, Any]] = {} 

939 

940 # CUDA COMPATIBILITY: Set spawn method for multiprocessing to support CUDA 

941 try: 

942 # Check if spawn method is available and set it if not already set 

943 current_method = multiprocessing.get_start_method(allow_none=True) 

944 if current_method != 'spawn': 

945 logger.info(f"🔥 CUDA: Setting multiprocessing start method from '{current_method}' to 'spawn' for CUDA compatibility") 

946 multiprocessing.set_start_method('spawn', force=True) 

947 else: 

948 logger.debug("🔥 CUDA: Multiprocessing start method already set to 'spawn'") 

949 except RuntimeError as e: 

950 # Start method may already be set, which is fine 

951 logger.debug(f"🔥 CUDA: Start method already configured: {e}") 

952 

953 # Choose executor type based on effective config for debugging support 

954 effective_config = self.get_effective_config() 

955 executor_type = "ThreadPoolExecutor" if effective_config.use_threading else "ProcessPoolExecutor" 

956 logger.info(f"🔥 ORCHESTRATOR: Creating {executor_type} with {actual_max_workers} workers") 

957 

958 # DEATH DETECTION: Mark executor creation 

959 logger.info(f"🔥 DEATH_MARKER: BEFORE_{executor_type.upper()}_CREATION") 

960 

961 # Choose appropriate executor class and configure worker logging 

962 if effective_config.use_threading: 962 ↛ 963: line 962 didn't jump to line 963 because the condition on line 962 was never true

963 logger.info("🔥 DEBUG MODE: Using ThreadPoolExecutor for easier debugging") 

964 executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers) 

965 else: 

966 logger.info("🔥 PRODUCTION MODE: Using ProcessPoolExecutor for true parallelism") 

967 # CRITICAL FIX: Use _configure_worker_with_gpu to ensure workers have function registry 

968 # Workers need the function registry to access decorated functions with memory types 

969 global_config = get_current_global_config(GlobalPipelineConfig) 

970 global_config_dict = global_config.__dict__ if global_config else {} 

971 

972 if log_file_base: 972 ↛ 973: line 972 didn't jump to line 973 because the condition on line 972 was never true

973 logger.info("🔥 WORKER SETUP: Configuring worker processes with function registry and logging") 

974 executor = concurrent.futures.ProcessPoolExecutor( 

975 max_workers=actual_max_workers, 

976 initializer=_configure_worker_with_gpu, 

977 initargs=(log_file_base, global_config_dict) 

978 ) 

979 else: 

980 logger.info("🔥 WORKER SETUP: Configuring worker processes with function registry (no logging)") 

981 executor = concurrent.futures.ProcessPoolExecutor( 

982 max_workers=actual_max_workers, 

983 initializer=_configure_worker_with_gpu, 

984 initargs=("", global_config_dict) # Empty string for no logging 

985 ) 

986 

987 logger.info(f"🔥 DEATH_MARKER: ENTERING_{executor_type.upper()}_CONTEXT") 

988 # Store executor for cancellation support 

989 self._executor = executor 

990 with executor: 

991 logger.info(f"🔥 DEATH_MARKER: {executor_type.upper()}_CREATED_SUCCESSFULLY") 

992 logger.info(f"🔥 ORCHESTRATOR: {executor_type} created, submitting {len(compiled_contexts)} tasks") 

993 

994 # NUCLEAR ERROR TRACING: Create snapshot of compiled_contexts to prevent iteration issues 

995 contexts_snapshot = dict(compiled_contexts.items()) 

996 logger.info(f"🔥 ORCHESTRATOR: Created contexts snapshot with {len(contexts_snapshot)} items") 

997 

998 # CRITICAL FIX: Resolve all lazy dataclass instances before multiprocessing 

999 # This ensures that the contexts are safe for pickling in ProcessPoolExecutor 

1000 # Note: Don't resolve pipeline_definition as it may overwrite collision-resolved configs 

1001 logger.info("🔥 ORCHESTRATOR: Resolving lazy dataclasses for multiprocessing compatibility") 

1002 contexts_snapshot = resolve_lazy_configurations_for_serialization(contexts_snapshot) 

1003 logger.info("🔥 ORCHESTRATOR: Lazy dataclass resolution completed") 

1004 

1005 logger.info("🔥 DEATH_MARKER: BEFORE_TASK_SUBMISSION_LOOP") 

1006 future_to_axis_id = {} 

1007 config = get_openhcs_config() 

1008 if not config: 1008 ↛ 1009: line 1008 didn't jump to line 1009 because the condition on line 1008 was never true

1009 raise RuntimeError("Component configuration is required for orchestrator execution") 

1010 axis_name = config.multiprocessing_axis.value 

1011 for axis_id, context in contexts_snapshot.items(): 

1012 try: 

1013 logger.info(f"🔥 DEATH_MARKER: SUBMITTING_TASK_FOR_{axis_name.upper()}_{axis_id}") 

1014 logger.info(f"🔥 ORCHESTRATOR: Submitting task for {axis_name} {axis_id}") 

1015 # Resolve all arguments before passing to ProcessPoolExecutor 

1016 resolved_context = resolve_lazy_configurations_for_serialization(context) 

1017 

1018 # Use static function to avoid pickling the orchestrator instance 

1019 # Note: Use original pipeline_definition to preserve collision-resolved configs 

1020 # Don't pass visualizer to worker processes - they communicate via ZeroMQ 

1021 future = executor.submit( 

1022 _execute_single_axis_static, 

1023 pipeline_definition, 

1024 resolved_context, 

1025 None # visualizer 

1026 ) 

1027 future_to_axis_id[future] = axis_id 

1028 logger.info(f"🔥 ORCHESTRATOR: Task submitted for {axis_name} {axis_id}") 

1029 logger.info(f"🔥 DEATH_MARKER: TASK_SUBMITTED_FOR_{axis_name.upper()}_{axis_id}") 

1030 except Exception as submit_error: 

1031 error_msg = f"🔥 ORCHESTRATOR ERROR: Failed to submit task for {axis_name} {axis_id}: {submit_error}" 

1032 logger.error(error_msg, exc_info=True) 

1033 # FAIL-FAST: Re-raise task submission errors immediately 

1034 raise 

1035 

1036 logger.info("🔥 DEATH_MARKER: TASK_SUBMISSION_LOOP_COMPLETED") 

1037 

1038 logger.info(f"🔥 ORCHESTRATOR: All {len(future_to_axis_id)} tasks submitted, waiting for completion") 

1039 logger.info("🔥 DEATH_MARKER: BEFORE_COMPLETION_LOOP") 

1040 

1041 completed_count = 0 

1042 logger.info("🔥 DEATH_MARKER: ENTERING_AS_COMPLETED_LOOP") 

1043 for future in concurrent.futures.as_completed(future_to_axis_id): 

1044 axis_id = future_to_axis_id[future] 

1045 completed_count += 1 

1046 logger.info(f"🔥 DEATH_MARKER: PROCESSING_COMPLETED_TASK_{completed_count}_{axis_name.upper()}_{axis_id}") 

1047 logger.info(f"🔥 ORCHESTRATOR: Task {completed_count}/{len(future_to_axis_id)} completed for {axis_name} {axis_id}") 

1048 

1049 try: 

1050 logger.info(f"🔥 DEATH_MARKER: CALLING_FUTURE_RESULT_FOR_{axis_name.upper()}_{axis_id}") 

1051 result = future.result() 

1052 logger.info(f"🔥 DEATH_MARKER: FUTURE_RESULT_SUCCESS_FOR_{axis_name.upper()}_{axis_id}") 

1053 logger.info(f"🔥 ORCHESTRATOR: {axis_name.title()} {axis_id} result: {result}") 

1054 execution_results[axis_id] = result 

1055 logger.info(f"🔥 DEATH_MARKER: RESULT_STORED_FOR_{axis_name.upper()}_{axis_id}") 

1056 except Exception as exc: 

1057 import traceback 

1058 full_traceback = traceback.format_exc() 

1059 error_msg = f"{axis_name.title()} {axis_id} generated an exception during execution: {exc}" 

1060 logger.error(f"🔥 ORCHESTRATOR ERROR: {error_msg}", exc_info=True) 

1061 logger.error(f"🔥 ORCHESTRATOR FULL TRACEBACK for {axis_name} {axis_id}:\n{full_traceback}") 

1062 # FAIL-FAST: Re-raise immediately instead of storing error 

1063 raise 

1064 

1065 logger.info("🔥 DEATH_MARKER: COMPLETION_LOOP_FINISHED") 

1066 

1067 logger.info(f"🔥 ORCHESTRATOR: All tasks completed, {len(execution_results)} results collected") 

1068 

1069 # Explicitly shutdown executor INSIDE the with block to avoid hang on context exit 

1070 logger.info("🔥 ORCHESTRATOR: Explicitly shutting down executor") 

1071 executor.shutdown(wait=True, cancel_futures=False) 

1072 logger.info("🔥 ORCHESTRATOR: Executor shutdown complete") 

1073 

1074 # Determine if we're using multiprocessing (ProcessPoolExecutor) or threading 

1075 effective_config = self.get_effective_config() 

1076 use_multiprocessing = not effective_config.use_threading 

1077 logger.info(f"🔥 ORCHESTRATOR: About to start GPU cleanup (use_multiprocessing={use_multiprocessing})") 

1078 

1079 # 🔥 GPU CLEANUP: Skip in multiprocessing mode - workers handle their own cleanup 

1080 # In multiprocessing mode, GPU cleanup in the main process can hang because 

1081 # GPU contexts are owned by worker processes, not the orchestrator process 

1082 try: 

1083 if cleanup_all_gpu_frameworks and not use_multiprocessing: 1083 ↛ 1084: line 1083 didn't jump to line 1084 because the condition on line 1083 was never true

1084 logger.info("🔥 ORCHESTRATOR: Running GPU cleanup...") 

1085 cleanup_all_gpu_frameworks() 

1086 logger.info("🔥 GPU CLEANUP: Cleared all GPU frameworks after plate execution") 

1087 elif use_multiprocessing: 1087 ↛ 1092: line 1087 didn't jump to line 1092 because the condition on line 1087 was always true

1088 logger.info("🔥 GPU CLEANUP: Skipped in multiprocessing mode (workers handle their own cleanup)") 

1089 except Exception as cleanup_error: 

1090 logger.warning(f"Failed to cleanup GPU memory after plate execution: {cleanup_error}") 

1091 

1092 logger.info("🔥 ORCHESTRATOR: GPU cleanup section finished") 

1093 

1094 logger.info("🔥 ORCHESTRATOR: Plate execution completed, checking for analysis consolidation") 

1095 # Run automatic analysis consolidation if enabled 

1096 shared_context = get_current_global_config(GlobalPipelineConfig) 

1097 logger.info(f"🔥 ORCHESTRATOR: Analysis consolidation enabled={shared_context.analysis_consolidation_config.enabled}") 

1098 if shared_context.analysis_consolidation_config.enabled: 1098 ↛ 1146: line 1098 didn't jump to line 1146 because the condition on line 1098 was always true

1099 try: 

1100 logger.info("🔥 ORCHESTRATOR: Starting consolidation - finding results directory") 

1101 # Get results directory using same logic as path planner (single source of truth) 

1102 results_dir = None 

1103 for axis_id, context in compiled_contexts.items(): 

1104 # Use the same logic as PathPlanner._get_results_path() 

1105 plate_path = Path(context.plate_path) 

1106 materialization_path = shared_context.materialization_results_path 

1107 

1108 if Path(materialization_path).is_absolute(): 1108 ↛ 1109: line 1108 didn't jump to line 1109 because the condition on line 1108 was never true

1109 potential_results_dir = Path(materialization_path) 

1110 else: 

1111 potential_results_dir = plate_path / materialization_path 

1112 

1113 if potential_results_dir.exists(): 1113 ↛ 1114: line 1113 didn't jump to line 1114 because the condition on line 1113 was never true

1114 results_dir = potential_results_dir 

1115 logger.info(f"🔍 CONSOLIDATION: Found results directory: {results_dir}") 

1116 break 

1117 

1118 if results_dir and results_dir.exists(): 1118 ↛ 1119: line 1118 didn't jump to line 1119 because the condition on line 1118 was never true

1119 logger.info(f"🔥 ORCHESTRATOR: Results directory exists: {results_dir}") 

1120 # Check if there are actually CSV files (materialized results) 

1121 logger.info("🔥 ORCHESTRATOR: Checking for CSV files...") 

1122 csv_files = list(results_dir.glob("*.csv")) 

1123 logger.info(f"🔥 ORCHESTRATOR: Found {len(csv_files)} CSV files") 

1124 if csv_files: 

1125 logger.info(f"🔄 CONSOLIDATION: Found {len(csv_files)} CSV files, running consolidation") 

1126 # Get well IDs from compiled contexts 

1127 axis_ids = list(compiled_contexts.keys()) 

1128 logger.info(f"🔄 CONSOLIDATION: Using well IDs: {axis_ids}") 

1129 

1130 logger.info("🔥 ORCHESTRATOR: Calling consolidate_analysis_results()...") 

1131 consolidate_fn = _get_consolidate_analysis_results() 

1132 consolidate_fn( 

1133 results_directory=str(results_dir), 

1134 well_ids=axis_ids, 

1135 consolidation_config=shared_context.analysis_consolidation_config, 

1136 plate_metadata_config=shared_context.plate_metadata_config 

1137 ) 

1138 logger.info("✅ CONSOLIDATION: Completed successfully") 

1139 else: 

1140 logger.info(f"⏭️ CONSOLIDATION: No CSV files found in {results_dir}, skipping") 

1141 else: 

1142 logger.info("⏭️ CONSOLIDATION: No results directory found in compiled contexts") 

1143 except Exception as e: 

1144 logger.error(f"❌ CONSOLIDATION: Failed: {e}") 

1145 else: 

1146 logger.info("🔥 ORCHESTRATOR: Analysis consolidation disabled, skipping") 
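
The directory search above (lines 1105-1111) reuses the path planner's rule: an absolute materialization_results_path is taken as-is, while a relative one is nested under the plate path. A minimal sketch of that rule in isolation (helper name and example paths are invented for illustration):

from pathlib import Path

def _resolve_results_dir(plate_path: str, materialization_path: str) -> Path:
    """Absolute materialization paths win; relative ones nest under the plate directory."""
    mat = Path(materialization_path)
    return mat if mat.is_absolute() else Path(plate_path) / mat

assert _resolve_results_dir("/data/plate_001", "results") == Path("/data/plate_001/results")
assert _resolve_results_dir("/data/plate_001", "/scratch/out") == Path("/scratch/out")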

1147 

1148 # Update state based on execution results 

1149 logger.info("🔥 ORCHESTRATOR: Updating orchestrator state based on execution results") 

1150 if all(result.get("status") == "success" for result in execution_results.values()): 1150 ↛ 1153: line 1150 didn't jump to line 1153 because the condition on line 1150 was always true

1151 self._state = OrchestratorState.COMPLETED 

1152 else: 

1153 self._state = OrchestratorState.EXEC_FAILED 

1154 logger.info(f"🔥 ORCHESTRATOR: State updated to {self._state}") 

1155 

1156 # 🔬 VISUALIZER CLEANUP: Stop all visualizers if they were auto-created and not persistent 

1157 logger.info(f"🔬 ORCHESTRATOR: Starting visualizer cleanup for {len(visualizers)} visualizers") 

1158 for idx, vis in enumerate(visualizers): 1158 ↛ 1159: line 1158 didn't jump to line 1159 because the loop on line 1158 never started

1159 try: 

1160 logger.info(f"🔬 ORCHESTRATOR: Processing visualizer {idx+1}/{len(visualizers)}, persistent={vis.persistent}") 

1161 if not vis.persistent: 

1162 logger.info(f"🔬 ORCHESTRATOR: Calling stop_viewer() for non-persistent visualizer {idx+1}") 

1163 vis.stop_viewer() 

1164 logger.info(f"🔬 ORCHESTRATOR: Stopped non-persistent visualizer {idx+1}") 

1165 else: 

1166 logger.info("🔬 ORCHESTRATOR: Keeping persistent visualizer alive (no cleanup needed)") 

1167 # Persistent visualizers stay alive across executions - no cleanup needed 

1168 # The ZMQ connection will be reused for the next execution 

1169 except Exception as e: 

1170 logger.warning(f"🔬 ORCHESTRATOR: Failed to cleanup visualizer {idx+1}: {e}") 

1171 logger.info("🔬 ORCHESTRATOR: Visualizer cleanup complete") 

1172 

1173 logger.info(f"🔥 ORCHESTRATOR: Plate execution finished. Results: {execution_results}") 

1174 

1175 return execution_results 

1176 except Exception as e: 

1177 self._state = OrchestratorState.EXEC_FAILED 

1178 logger.error(f"Failed to execute compiled plate: {e}") 

1179 raise 
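
For reference, the state update on line 1150 collapses the per-axis result dictionaries into a single plate-level outcome: every axis must report status == "success". A small self-contained restatement of that reduction (the helper and the toy data are illustrative, not part of the class):

from openhcs.constants.constants import OrchestratorState

def _plate_outcome(execution_results: dict) -> OrchestratorState:
    """COMPLETED only if every per-axis result reports success."""
    ok = all(r.get("status") == "success" for r in execution_results.values())
    return OrchestratorState.COMPLETED if ok else OrchestratorState.EXEC_FAILED

# One failed axis flips the whole plate to EXEC_FAILED:
# _plate_outcome({"A01": {"status": "success"}, "A02": {"status": "error"}})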

1180 

1181 def get_component_keys(self, component: Union['AllComponents', 'VariableComponents'], component_filter: Optional[List[Union[str, int]]] = None) -> List[str]: 

1182 """ 

1183 Generic method to get component keys using AllComponents/VariableComponents directly.

1184 

1185 Returns the discovered component values as strings to match the pattern 

1186 detection system format. 

1187 

1188 Tries metadata cache first, falls back to filename parsing cache if metadata is empty. 

1189 

1190 Args: 

1191 component: AllComponents or VariableComponents enum specifying which component to extract 

1192 (also accepts GroupBy enum which will be converted to AllComponents) 

1193 component_filter: Optional list of component values to filter by 

1194 

1195 Returns: 

1196 List of component values as strings, sorted 

1197 

1198 Raises: 

1199 RuntimeError: If orchestrator is not initialized 

1200 """ 

1201 if not self.is_initialized(): 1201 ↛ 1202: line 1201 didn't jump to line 1202 because the condition on line 1201 was never true

1202 raise RuntimeError("Orchestrator must be initialized before getting component keys.") 

1203 

1204 # Convert GroupBy to AllComponents using OpenHCS generic utility 

1205 if isinstance(component, GroupBy) and component.value is None: 1205 ↛ 1206: line 1205 didn't jump to line 1206 because the condition on line 1205 was never true

1206 raise ValueError("Cannot get component keys for GroupBy.NONE") 

1207 

1208 # Convert to AllComponents for cache lookup (includes multiprocessing axis) 

1209 component = convert_enum_by_value(component, AllComponents) or component 

1210 

1211 # Use component directly - let natural errors occur for wrong types 

1212 component_name = component.value 

1213 

1214 # Try metadata cache first (preferred source) 

1215 cached_metadata = self._metadata_cache_service.get_cached_metadata(component) 

1216 if cached_metadata: 

1217 all_components = list(cached_metadata.keys()) 

1218 logger.debug(f"Using metadata cache for {component_name}: {len(all_components)} components") 

1219 else: 

1220 # Fall back to filename parsing cache 

1221 all_components = self._component_keys_cache[component] # Let KeyError bubble up naturally 

1222 

1223 if not all_components: 1223 ↛ 1224: line 1223 didn't jump to line 1224 because the condition on line 1223 was never true

1224 logger.warning(f"No {component_name} values found in input directory: {self.input_dir}") 

1225 return [] 

1226 

1227 logger.debug(f"Using filename parsing cache for {component.value}: {len(all_components)} components") 

1228 

1229 if component_filter: 

1230 str_component_filter = {str(c) for c in component_filter} 

1231 selected_components = [comp for comp in all_components if comp in str_component_filter] 

1232 if not selected_components: 1232 ↛ 1233: line 1232 didn't jump to line 1233 because the condition on line 1232 was never true

1233 component_name = component.value

1234 logger.warning(f"No {component_name} values from {all_components} match the filter: {component_filter}") 

1235 return selected_components 

1236 else: 

1237 return all_components 
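
The filter branch above (lines 1229-1231) normalises a mixed str/int filter to strings and keeps the cache ordering. The same step in isolation, assuming the cached keys are already strings as produced by cache_component_keys below (helper name is illustrative):

def _filter_component_keys(all_components, component_filter):
    """Intersect cached string keys with a str/int filter, preserving cache order."""
    if not component_filter:
        return all_components
    wanted = {str(c) for c in component_filter}
    return [comp for comp in all_components if comp in wanted]

# _filter_component_keys(["1", "2", "3"], [2, 3]) -> ["2", "3"]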

1238 

1239 def cache_component_keys(self, components: Optional[List['AllComponents']] = None) -> None: 

1240 """ 

1241 Pre-compute and cache component keys for fast access using single-pass parsing. 

1242 

1243 This method performs expensive file listing and parsing operations once, 

1244 extracting all component types in a single pass for maximum efficiency. 

1245 

1246 Args: 

1247 components: Optional list of AllComponents to cache. 

1248 If None, caches all components in the AllComponents enum. 

1249 """ 

1250 if not self.is_initialized(): 1250 ↛ 1251: line 1250 didn't jump to line 1251 because the condition on line 1250 was never true

1251 raise RuntimeError("Orchestrator must be initialized before caching component keys.") 

1252 

1253 if components is None: 1253 ↛ 1256: line 1253 didn't jump to line 1256 because the condition on line 1253 was always true

1254 components = list(AllComponents) # Cache all enum values including multiprocessing axis 

1255 

1256 logger.info(f"Caching component keys for: {[comp.value for comp in components]}") 

1257 

1258 # Initialize component sets for all requested components 

1259 component_sets: Dict['AllComponents', Set[Union[str, int]]] = {} 

1260 for component in components: 

1261 component_sets[component] = set() 

1262 

1263 # Single pass through all filenames - extract all components at once 

1264 try: 

1265 # Use primary backend from microscope handler 

1266 backend_to_use = self.microscope_handler.get_primary_backend(self.input_dir, self.filemanager) 

1267 logger.debug(f"Using backend '{backend_to_use}' for file listing based on available backends") 

1268 

1269 filenames = self.filemanager.list_files(str(self.input_dir), backend_to_use, extensions=DEFAULT_IMAGE_EXTENSIONS) 

1270 logger.debug(f"Parsing {len(filenames)} filenames in single pass...") 

1271 

1272 for filename in filenames: 

1273 parsed_info = self.microscope_handler.parser.parse_filename(str(filename)) 

1274 if parsed_info: 1274 ↛ 1281: line 1274 didn't jump to line 1281 because the condition on line 1274 was always true

1275 # Extract all requested components from this filename 

1276 for component in component_sets: 

1277 component_name = component.value 

1278 if component_name in parsed_info and parsed_info[component_name] is not None: 1278 ↛ 1276: line 1278 didn't jump to line 1276 because the condition on line 1278 was always true

1279 component_sets[component].add(parsed_info[component_name]) 

1280 else: 

1281 logger.warning(f"Could not parse filename: {filename}") 

1282 

1283 except Exception as e: 

1284 logger.error(f"Error listing files or parsing filenames from {self.input_dir}: {e}", exc_info=True) 

1285 # Initialize empty sets for failed parsing 

1286 for component in component_sets: 

1287 component_sets[component] = set() 

1288 

1289 # Convert sets to sorted lists and store in cache 

1290 for component, component_set in component_sets.items(): 

1291 sorted_components = [str(comp) for comp in sorted(list(component_set))] 

1292 self._component_keys_cache[component] = sorted_components 

1293 logger.debug(f"Cached {len(sorted_components)} {component.value} keys") 

1294 

1295 if not sorted_components: 1295 ↛ 1296: line 1295 didn't jump to line 1296 because the condition on line 1295 was never true

1296 logger.warning(f"No {component.value} values found in input directory: {self.input_dir}") 

1297 

1298 logger.info(f"Component key caching complete. Cached {len(component_sets)} component types in single pass.") 
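
The loop above extracts every requested component in one pass over the file listing, instead of re-parsing the directory once per component type. A compact sketch of that pattern with a toy parser standing in for microscope_handler.parser.parse_filename (all names here are illustrative):

from collections import defaultdict

def _collect_components(filenames, parse, wanted):
    """Single pass: harvest every requested component key from each parsed filename."""
    found = defaultdict(set)
    for name in filenames:
        parsed = parse(name)  # dict of component name -> value, or None on failure
        if not parsed:
            continue
        for key in wanted:
            if parsed.get(key) is not None:
                found[key].add(parsed[key])
    return {key: [str(v) for v in sorted(values)] for key, values in found.items()}

toy_parse = lambda name: {"well": name[:3], "channel": int(name[4])}
_collect_components(["A01_1.tif", "A01_2.tif", "B02_1.tif"], toy_parse, ["well", "channel"])
# -> {"well": ["A01", "B02"], "channel": ["1", "2"]}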

1299 

1300 def clear_component_cache(self, components: Optional[List['AllComponents']] = None) -> None: 

1301 """ 

1302 Clear cached component keys to force recomputation. 

1303 

1304 Use this when the input directory contents have changed and you need 

1305 to refresh the component key cache. 

1306 

1307 Args: 

1308 components: Optional list of AllComponents to clear from cache. 

1309 If None, clears entire cache. 

1310 """ 

1311 if components is None: 

1312 self._component_keys_cache.clear() 

1313 logger.info("Cleared entire component keys cache") 

1314 else: 

1315 for component in components: 

1316 if component in self._component_keys_cache: 

1317 del self._component_keys_cache[component] 

1318 logger.debug(f"Cleared cache for {component.value}") 

1319 logger.info(f"Cleared cache for {len(components)} component types") 
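
Taken together, the three methods above form a simple cache lifecycle. A hedged usage sketch (the orchestrator instance and the enum member passed to get_component_keys are placeholders):

orchestrator.cache_component_keys()                 # one pass over the input directory
keys = orchestrator.get_component_keys(component)   # served from the warm cache
orchestrator.clear_component_cache()                # directory contents changed; recompute on next call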

1320 

1321 @property 

1322 def metadata_cache(self) -> MetadataCache: 

1323 """Access to metadata cache service.""" 

1324 return self._metadata_cache_service 

1325 

1326 

1327 

1328 # Global config management removed - handled by UI layer 

1329 

1330 @property 

1331 def pipeline_config(self) -> Optional['PipelineConfig']: 

1332 """Get current pipeline configuration.""" 

1333 return self._pipeline_config 

1334 

1335 @pipeline_config.setter 

1336 def pipeline_config(self, value: Optional['PipelineConfig']) -> None: 

1337 """Set pipeline configuration with auto-sync to thread-local context.""" 

1338 self._pipeline_config = value 

1339 # CRITICAL FIX: Also update public attribute for dual-axis resolver discovery 

1340 # This ensures the resolver can always find the current pipeline config 

1341 if hasattr(self, '__dict__'): # Avoid issues during __init__ 1341 ↛ 1343: line 1341 didn't jump to line 1343 because the condition on line 1341 was always true

1342 self.__dict__['pipeline_config'] = value 

1343 if self._auto_sync_enabled and value is not None: 

1344 self._sync_to_thread_local() 

1345 

1346 def _sync_to_thread_local(self) -> None: 

1347 """Internal method to sync current pipeline_config to thread-local context.""" 

1348 if self._pipeline_config and hasattr(self, 'plate_path'): 1348 ↛ 1349: line 1348 didn't jump to line 1349 because the condition on line 1348 was never true

1349 self.apply_pipeline_config(self._pipeline_config) 

1350 

1351 def apply_pipeline_config(self, pipeline_config: 'PipelineConfig') -> None: 

1352 """ 

1353 Apply per-orchestrator configuration using thread-local storage. 

1354 

1355 This method sets the orchestrator's effective config in thread-local storage 

1356 for step-level lazy configurations to resolve against. 

1357 """ 

1358 # Import PipelineConfig at runtime for isinstance check 

1359 from openhcs.core.config import PipelineConfig 

1360 if not isinstance(pipeline_config, PipelineConfig): 

1361 raise TypeError(f"Expected PipelineConfig, got {type(pipeline_config)}") 

1362 

1363 # Temporarily disable auto-sync to prevent recursion 

1364 self._auto_sync_enabled = False 

1365 try: 

1366 self._pipeline_config = pipeline_config 

1367 finally: 

1368 self._auto_sync_enabled = True 

1369 

1370 # CRITICAL FIX: Do NOT contaminate thread-local context during PipelineConfig editing 

1371 # The orchestrator should maintain its own internal context without modifying 

1372 # the global thread-local context. This prevents reset operations from showing 

1373 # orchestrator's saved values instead of original thread-local defaults. 

1374 # 

1375 # The merged config is computed internally and used by get_effective_config() 

1376 # but should NOT be set as the global thread-local context. 

1377 

1378 logger.info(f"Applied orchestrator config for plate: {self.plate_path}") 
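
The guard above temporarily disables auto-sync while the backing field is assigned, so the setter → _sync_to_thread_local → apply_pipeline_config chain cannot recurse. A generic restatement of that guard as a context manager (hypothetical helper; the class itself uses the explicit try/finally shown above):

from contextlib import contextmanager

@contextmanager
def _suspended_flag(obj, flag_name: str):
    """Turn a boolean attribute off for the duration of the block, restoring it afterwards."""
    previous = getattr(obj, flag_name)
    setattr(obj, flag_name, False)
    try:
        yield
    finally:
        setattr(obj, flag_name, previous)

# Equivalent to the try/finally above:
#   with _suspended_flag(self, "_auto_sync_enabled"):
#       self._pipeline_config = pipeline_config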

1379 

1380 def get_effective_config(self, *, for_serialization: bool = False) -> GlobalPipelineConfig: 

1381 """ 

1382 Get effective configuration for this orchestrator. 

1383 

1384 Args: 

1385 for_serialization: If True, resolves all values for pickling/storage. 

1386 If False, preserves None values for sibling inheritance. 

1387 """ 

1388 

1389 if for_serialization: 1389 ↛ 1390: line 1389 didn't jump to line 1390 because the condition on line 1389 was never true

1390 result = self.pipeline_config.to_base_config() 

1391 

1392 # DEBUG: Check what the serialization result looks like 

1393 if hasattr(result, 'step_well_filter_config'): 

1394 step_config = getattr(result, 'step_well_filter_config') 

1395 if hasattr(step_config, 'well_filter'): 

1396 well_filter_value = getattr(step_config, 'well_filter') 

1397 logger.debug(f"Serialization result has step_well_filter_config.well_filter = {well_filter_value}") 

1398 

1399 return result 

1400 else: 

1401 # Reuse existing merged config logic from apply_pipeline_config 

1402 shared_context = get_current_global_config(GlobalPipelineConfig) 

1403 if not shared_context: 1403 ↛ 1404: line 1403 didn't jump to line 1404 because the condition on line 1403 was never true

1404 raise RuntimeError("No global configuration context available for merging") 

1405 

1406 # DEBUG: Check what the shared context looks like before merging 

1407 if hasattr(shared_context, 'step_well_filter_config'): 1407 ↛ 1413: line 1407 didn't jump to line 1413 because the condition on line 1407 was always true

1408 step_config = getattr(shared_context, 'step_well_filter_config') 

1409 if hasattr(step_config, 'well_filter'): 1409 ↛ 1413: line 1409 didn't jump to line 1413 because the condition on line 1409 was always true

1410 well_filter_value = getattr(step_config, 'well_filter') 

1411 logger.debug(f"Shared context before merge has step_well_filter_config.well_filter = {well_filter_value}") 

1412 

1413 result = _create_merged_config(self.pipeline_config, shared_context) 

1414 

1415 # DEBUG: Check what the merged result looks like 

1416 if hasattr(result, 'step_well_filter_config'): 1416 ↛ 1422: line 1416 didn't jump to line 1422 because the condition on line 1416 was always true

1417 step_config = getattr(result, 'step_well_filter_config') 

1418 if hasattr(step_config, 'well_filter'): 1418 ↛ 1422: line 1418 didn't jump to line 1422 because the condition on line 1418 was always true

1419 well_filter_value = getattr(step_config, 'well_filter') 

1420 logger.debug(f"Merged result has step_well_filter_config.well_filter = {well_filter_value}") 

1421 

1422 return result 
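
The merge on line 1413 is delegated to _create_merged_config, defined elsewhere in this module. As a rough illustration of None-preserving precedence (per-orchestrator values win where set, the shared global context fills the rest), here is a generic dataclass sketch; it is an assumption about the semantics, not the actual implementation:

from dataclasses import fields, replace

def _merge_configs(pipeline_cfg, global_cfg):
    """Field-wise precedence: take the pipeline value unless it is None."""
    overrides = {
        f.name: getattr(pipeline_cfg, f.name)
        for f in fields(global_cfg)
        if getattr(pipeline_cfg, f.name, None) is not None
    }
    return replace(global_cfg, **overrides)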

1423 

1424 

1425 

1426 def clear_pipeline_config(self) -> None: 

1427 """Clear per-orchestrator configuration.""" 

1428 # REMOVED: Thread-local modification - dual-axis resolver handles context automatically 

1429 # No need to modify thread-local storage when clearing orchestrator config 

1430 self.pipeline_config = None 

1431 logger.info(f"Cleared per-orchestrator config for plate: {self.plate_path}") 

1432 

1433 def cleanup_pipeline_config(self) -> None: 

1434 """Clean up orchestrator context when done (for backward compatibility).""" 

1435 self.clear_pipeline_config() 

1436 

1437 def __del__(self): 

1438 """Ensure config cleanup on orchestrator destruction.""" 

1439 try: 

1440 # Clear any stored configuration references 

1441 self.clear_pipeline_config() 

1442 except Exception: 

1443 # Ignore errors during cleanup in destructor to prevent cascading failures 

1444 pass
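
A hedged end-to-end sketch of the per-orchestrator configuration lifecycle defined above (the PipelineConfig construction and the orchestrator instance are placeholders; method names are as defined in this class):

cfg = build_plate_overrides()                    # hypothetical factory returning a PipelineConfig
orchestrator.apply_pipeline_config(cfg)          # stored internally; auto-sync is guarded
effective = orchestrator.get_effective_config()  # merged view, None values preserved for inheritance
frozen = orchestrator.get_effective_config(for_serialization=True)  # fully resolved for pickling
orchestrator.clear_pipeline_config()             # fall back to the shared global context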