Coverage for openhcs/core/orchestrator/orchestrator.py: 57.8%

475 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Consolidated orchestrator module for OpenHCS. 

3 

4This module provides a unified PipelineOrchestrator class that implements 

5a two-phase (compile-all-then-execute-all) pipeline execution model. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 66 — Immutability After Construction 

10- Clause 88 — No Inferred Capabilities 

11- Clause 293 — GPU Pre-Declaration Enforcement 

12- Clause 295 — GPU Scheduling Affinity 

13""" 

14 

15import logging 

16import concurrent.futures 

17import multiprocessing 

18from pathlib import Path 

19from typing import Any, Callable, Dict, List, Optional, Union, Set 

20 

21from openhcs.constants.constants import Backend, DEFAULT_WORKSPACE_DIR_SUFFIX, DEFAULT_IMAGE_EXTENSIONS, GroupBy, OrchestratorState 

22from openhcs.constants import Microscope 

23from openhcs.core.config import GlobalPipelineConfig, get_default_global_config, PipelineConfig 

24from openhcs.core.context.processing_context import ProcessingContext 

25from openhcs.core.pipeline.compiler import PipelineCompiler 

26from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper 

27from openhcs.core.steps.abstract import AbstractStep, get_step_id 

28from openhcs.io.exceptions import StorageWriteError 

29from openhcs.io.filemanager import FileManager 

30from openhcs.io.base import storage_registry 

31from openhcs.microscopes import create_microscope_handler 

32from openhcs.microscopes.microscope_base import MicroscopeHandler 

33 

34# Optional napari import for visualization 

35try: 

36 from openhcs.runtime.napari_stream_visualizer import NapariStreamVisualizer 

37 NapariVisualizerType = NapariStreamVisualizer 

38except ImportError: 

39 # Create a placeholder type for type hints when napari is not available 

40 NapariStreamVisualizer = None 

41 NapariVisualizerType = Any # Use Any for type hints when napari is not available 

42 

43 

44logger = logging.getLogger(__name__) 

45 

46 

47def _configure_worker_logging(log_file_base: str): 

48 """ 

49 Configure logging and import hook for worker process. 

50 

51 This function is called once per worker process when it starts. 

52 Each worker will get its own log file with a unique identifier. 

53 

54 Args: 

55 log_file_base: Base path for worker log files 

56 """ 

57 import os 

58 import logging 

59 import time 

60 

61 # CRITICAL: Skip function registry initialization for fast worker startup 

62 # The environment variable is inherited from the subprocess runner 

63 # Note: We don't log this yet because logging isn't configured 

64 

65 # CRITICAL: Install import hook for auto-discovered functions 

66 # Worker processes are fresh Python processes that need the import hook 

67 try: 

68 from openhcs.processing.func_registry import _install_import_hook 

69 _install_import_hook() 

70 # Note: We don't log this yet because logging isn't configured 

71 except Exception: 

72 # Can't log yet, but this is critical - the worker will fail later 

73 pass 

74 

75 # Create unique worker identifier using PID and timestamp 

76 worker_pid = os.getpid() 

77 worker_timestamp = int(time.time() * 1000000) # Microsecond precision for uniqueness 

78 worker_id = f"{worker_pid}_{worker_timestamp}" 

79 worker_log_file = f"{log_file_base}_worker_{worker_id}.log" 

80 

81 # Configure root logger to capture ALL logs from worker process 

82 root_logger = logging.getLogger() 

83 root_logger.handlers.clear() # Clear any inherited handlers 

84 

85 # Create file handler for worker logs 

86 file_handler = logging.FileHandler(worker_log_file) 

87 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) 

88 root_logger.addHandler(file_handler) 

89 root_logger.setLevel(logging.INFO) 

90 

91 # Ensure all OpenHCS module logs are captured 

92 logging.getLogger("openhcs").setLevel(logging.INFO) 

93 

94 # Get worker logger 

95 worker_logger = logging.getLogger("openhcs.worker") 

96 worker_logger.info(f"🔥 WORKER: Process {worker_pid} (ID: {worker_id}) logging configured") 

97 worker_logger.info(f"🔥 WORKER: All logs writing to: {worker_log_file}") 

98 

99 # Log import hook installation status 

100 worker_logger.info(f"🔥 WORKER: Import hook installed for auto-discovered functions") 

101 
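The initializer above is attached to each worker at pool construction time; a minimal sketch of that wiring (mirroring execute_compiled_plate further down, with a hypothetical log base path):

import concurrent.futures

executor = concurrent.futures.ProcessPoolExecutor(
    max_workers=4,
    initializer=_configure_worker_logging,
    initargs=("/tmp/openhcs_run",),  # hypothetical log_file_base; each worker writes <base>_worker_<pid>_<timestamp>.log
)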

102 

103# Global variable to store log file base for worker processes 

104_worker_log_file_base = None 

105 

106 

107def _ensure_step_ids_for_multiprocessing( 

108 frozen_context: ProcessingContext, 

109 pipeline_definition: List[AbstractStep], 

110 well_id: str 

111) -> None: 

112 """ 

113 Helper function to update step IDs after multiprocessing pickle/unpickle. 

114  

115 When contexts are pickled/unpickled for multiprocessing, step objects get 

116 new memory addresses, changing their IDs. This remaps the step_plans. 

117 """ 

118 from openhcs.core.pipeline.compiler import PipelineCompiler 

119 try: 

120 logger.debug(f"🔥 MULTIPROCESSING: Updating step IDs for well {well_id}") 

121 PipelineCompiler.update_step_ids_for_multiprocessing(frozen_context, pipeline_definition) 

122 logger.debug(f"🔥 MULTIPROCESSING: Step IDs updated successfully for well {well_id}") 

123 except Exception as remap_error: 

124 error_msg = f"🔥 MULTIPROCESSING ERROR: Failed to remap step IDs for well {well_id}: {remap_error}" 

125 logger.error(error_msg, exc_info=True) 

126 raise RuntimeError(error_msg) from remap_error 

127 

128 

129class PipelineOrchestrator: 

130 """ 

131 Orchestrator supporting both global (all-orchestrator) and per-orchestrator configuration. 

132 

133 Global configuration: Updates all orchestrators (existing behavior) 

134 Per-orchestrator configuration: Affects only this orchestrator instance 

135 

136 The orchestrator first compiles the pipeline for all specified wells, 

137 creating frozen, immutable ProcessingContexts using `compile_pipelines()`. 

138 Then, it executes the (now stateless) pipeline definition against these contexts, 

139 potentially in parallel, using `execute_compiled_plate()`. 

140 """ 

141 

142 def __init__( 

143 self, 

144 plate_path: Union[str, Path], 

145 workspace_path: Optional[Union[str, Path]] = None, 

146 *, 

147 global_config: Optional[GlobalPipelineConfig] = None, 

148 pipeline_config: Optional[PipelineConfig] = None, 

149 storage_registry: Optional[Any] = None, # Optional StorageRegistry instance 

150 ): 

151 # Lock removed - was orphaned code never used 

152 

153 if global_config is None:  [coverage: 153 ↛ 154, condition never true]

154 self.global_config = get_default_global_config() 

155 logger.info("PipelineOrchestrator using default global configuration.") 

156 else: 

157 self.global_config = global_config 

158 

159 # Initialize per-orchestrator configuration 

160 self.pipeline_config = pipeline_config # Per-orchestrator overrides 

161 

162 

163 

164 # Set current pipeline config for MaterializationPathConfig defaults 

165 from openhcs.core.config import set_current_pipeline_config 

166 set_current_pipeline_config(self.global_config) 

167 

168 if plate_path is None:  [coverage: 168 ↛ 172, condition never true]

169 # This case should ideally be prevented by TUI logic if plate_path is mandatory 

170 # for an orchestrator instance tied to a specific plate. 

171 # If workspace_path is also None, this will be caught later. 

172 pass 

173 elif isinstance(plate_path, str):  [coverage: 173 ↛ 174, condition never true]

174 plate_path = Path(plate_path) 

175 elif not isinstance(plate_path, Path):  [coverage: 175 ↛ 176, condition never true]

176 raise ValueError(f"Invalid plate_path type: {type(plate_path)}. Must be str or Path.") 

177 

178 if plate_path:  [coverage: 178 ↛ 187, condition always true]

179 if not plate_path.is_absolute():  [coverage: 179 ↛ 180, condition never true]

180 raise ValueError(f"Plate path must be absolute: {plate_path}") 

181 if not plate_path.exists():  [coverage: 181 ↛ 182, condition never true]

182 raise FileNotFoundError(f"Plate path does not exist: {plate_path}") 

183 if not plate_path.is_dir():  [coverage: 183 ↛ 184, condition never true]

184 raise NotADirectoryError(f"Plate path is not a directory: {plate_path}") 

185 

186 # Initialize _plate_path_frozen first to allow plate_path to be set during initialization 

187 object.__setattr__(self, '_plate_path_frozen', False) 

188 

189 self.plate_path = plate_path 

190 self.workspace_path = workspace_path 

191 

192 if self.plate_path is None and self.workspace_path is None:  [coverage: 192 ↛ 193, condition never true]

193 raise ValueError("Either plate_path or workspace_path must be provided for PipelineOrchestrator.") 

194 

195 # Freeze plate_path immediately after setting it to prove immutability 

196 object.__setattr__(self, '_plate_path_frozen', True) 

197 logger.info(f"🔒 PLATE_PATH FROZEN: {self.plate_path} is now immutable") 

198 

199 if storage_registry:  [coverage: 199 ↛ 200, condition never true]

200 self.registry = storage_registry 

201 logger.info("PipelineOrchestrator using provided StorageRegistry instance.") 

202 else: 

203 from openhcs.io.base import storage_registry as global_registry 

204 # Create a copy of the global registry to avoid modifying shared state 

205 self.registry = global_registry.copy() 

206 logger.info("PipelineOrchestrator created its own StorageRegistry instance (copy of global).") 

207 

208 # Override zarr backend with orchestrator's config 

209 from openhcs.io.zarr import ZarrStorageBackend 

210 from openhcs.constants.constants import Backend 

211 

212 zarr_backend_with_config = ZarrStorageBackend(self.global_config.zarr) 

213 self.registry[Backend.ZARR.value] = zarr_backend_with_config 

214 logger.info(f"Orchestrator zarr backend configured with {self.global_config.zarr.compressor.value} compression") 

215 

216 # Orchestrator always creates its own FileManager, using the determined registry 

217 self.filemanager = FileManager(self.registry) 

218 self.input_dir: Optional[Path] = None 

219 self.microscope_handler: Optional[MicroscopeHandler] = None 

220 self.default_pipeline_definition: Optional[List[AbstractStep]] = None 

221 self._initialized: bool = False 

222 self._state: OrchestratorState = OrchestratorState.CREATED 

223 

224 # Component keys cache for fast access 

225 self._component_keys_cache: Dict[GroupBy, List[str]] = {} 

226 

227 # Metadata cache for component key→name mappings 

228 self._metadata_cache: Dict[GroupBy, Dict[str, Optional[str]]] = {} 

229 

230 def __setattr__(self, name: str, value: Any) -> None: 

231 """ 

232 Set an attribute, preventing modification of plate_path after it's frozen. 

233 

234 This proves that plate_path is truly immutable after initialization. 

235 """ 

236 if name == 'plate_path' and getattr(self, '_plate_path_frozen', False):  [coverage: 236 ↛ 237, condition never true]

237 import traceback 

238 stack_trace = ''.join(traceback.format_stack()) 

239 error_msg = ( 

240 f"🚫 IMMUTABLE PLATE_PATH VIOLATION: Cannot modify plate_path after freezing!\n" 

241 f"Current value: {getattr(self, 'plate_path', 'UNSET')}\n" 

242 f"Attempted new value: {value}\n" 

243 f"Stack trace:\n{stack_trace}" 

244 ) 

245 logger.error(error_msg) 

246 raise AttributeError(error_msg) 

247 super().__setattr__(name, value) 

248 
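Because of the freeze guard above, reassigning plate_path after construction raises; a tiny sketch:

try:
    orchestrator.plate_path = Path("/elsewhere")  # hypothetical reassignment attempt
except AttributeError:
    pass  # expected: plate_path is immutable once _plate_path_frozen is set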

249 @property 

250 def state(self) -> OrchestratorState: 

251 """Get the current orchestrator state.""" 

252 return self._state 

253 

254 def initialize_microscope_handler(self): 

255 """Initializes the microscope handler.""" 

256 if self.microscope_handler is not None:  [coverage: 256 ↛ 257, condition never true]

257 logger.debug("Microscope handler already initialized.") 

258 return 

259# if self.input_dir is None: 

260# raise RuntimeError("Workspace (and input_dir) must be initialized before microscope handler.") 

261 

262 logger.info(f"Initializing microscope handler using input directory: {self.input_dir}...") 

263 try: 

264 # Use configured microscope type or auto-detect 

265 microscope_type = self.global_config.microscope.value if self.global_config.microscope != Microscope.AUTO else 'auto' 

266 self.microscope_handler = create_microscope_handler( 

267 plate_folder=str(self.plate_path), 

268 filemanager=self.filemanager, 

269 microscope_type=microscope_type, 

270 ) 

271 logger.info(f"Initialized microscope handler: {type(self.microscope_handler).__name__}") 

272 except Exception as e: 

273 error_msg = f"Failed to create microscope handler: {e}" 

274 logger.error(error_msg) 

275 raise RuntimeError(error_msg) from e 

276 

277 def initialize(self, workspace_path: Optional[Union[str, Path]] = None) -> 'PipelineOrchestrator': 

278 """ 

279 Initializes all required components for the orchestrator. 

280 Must be called before other processing methods. 

281 Returns self for chaining. 

282 """ 

283 if self._initialized:  [coverage: 283 ↛ 284, condition never true]

284 logger.info("Orchestrator already initialized.") 

285 return self 

286 

287 try: 

288 self.initialize_microscope_handler() 

289 

290 # Delegate workspace initialization to microscope handler 

291 logger.info("Initializing workspace with microscope handler...") 

292 actual_image_dir = self.microscope_handler.initialize_workspace( 

293 self.plate_path, workspace_path, self.filemanager 

294 ) 

295 

296 # Use the actual image directory returned by the microscope handler 

297 self.input_dir = Path(actual_image_dir) 

298 logger.info(f"Set input directory to: {self.input_dir}") 

299 

300 # Set workspace_path based on what the handler returned 

301 if actual_image_dir != self.plate_path:  [coverage: 301 ↛ 306, condition always true]

302 # Handler created a workspace 

303 self.workspace_path = Path(actual_image_dir).parent if Path(actual_image_dir).name != "workspace" else Path(actual_image_dir) 

304 else: 

305 # Handler used plate directly (like OpenHCS) 

306 self.workspace_path = None 

307 

308 # Mark as initialized BEFORE caching to avoid chicken-and-egg problem 

309 self._initialized = True 

310 self._state = OrchestratorState.READY 

311 

312 # Auto-cache component keys and metadata for instant access 

313 logger.info("Caching component keys and metadata...") 

314 self.cache_component_keys() 

315 self.cache_metadata() 

316 

317 logger.info("PipelineOrchestrator fully initialized with cached component keys and metadata.") 

318 return self 

319 except Exception as e: 

320 self._state = OrchestratorState.INIT_FAILED 

321 logger.error(f"Failed to initialize orchestrator: {e}") 

322 raise 

323 

324 def is_initialized(self) -> bool: 

325 return self._initialized 

326 

327 def create_context(self, well_id: str) -> ProcessingContext: 

328 """Creates a ProcessingContext for a given well.""" 

329 if not self.is_initialized():  [coverage: 329 ↛ 330, condition never true]

330 raise RuntimeError("Orchestrator must be initialized before calling create_context().") 

331 if not well_id:  [coverage: 331 ↛ 332, condition never true]

332 raise ValueError("Well identifier must be provided.") 

333 if self.input_dir is None:  [coverage: 333 ↛ 334, condition never true]

334 raise RuntimeError("Orchestrator input_dir is not set; initialize orchestrator first.") 

335 

336 context = ProcessingContext( 

337 global_config=self.global_config, 

338 well_id=well_id, 

339 filemanager=self.filemanager 

340 ) 

341 # Orchestrator reference removed - was orphaned and unpickleable 

342 context.microscope_handler = self.microscope_handler 

343 context.input_dir = self.input_dir 

344 context.workspace_path = self.workspace_path 

345 context.plate_path = self.plate_path # Add plate_path for path planner 

346 # Pass metadata cache for OpenHCS metadata creation 

347 context.metadata_cache = dict(self._metadata_cache) # Copy to avoid pickling issues 

348 return context 

349 

350 def compile_pipelines( 

351 self, 

352 pipeline_definition: List[AbstractStep], 

353 well_filter: Optional[List[str]] = None, 

354 enable_visualizer_override: bool = False 

355 ) -> Dict[str, ProcessingContext]: 

356 """ 

357 Compile-all phase: Prepares frozen ProcessingContexts for each well. 

358 

359 This method delegates to PipelineCompiler.compile_pipelines() to handle 

360 the actual compilation logic while providing orchestrator context. 

361 

362 Args: 

363 pipeline_definition: The list of AbstractStep objects defining the pipeline. 

364 well_filter: Optional list of well IDs to process. If None, processes all found wells. 

365 enable_visualizer_override: If True, all steps in all compiled contexts 

366 will have their 'visualize' flag set to True. 

367 

368 Returns: 

369 A dictionary mapping well IDs to their compiled and frozen ProcessingContexts. 

370 The input `pipeline_definition` list (of step objects) is modified in-place 

371 to become stateless. 

372 """ 

373 return PipelineCompiler.compile_pipelines( 

374 orchestrator=self, 

375 pipeline_definition=pipeline_definition, 

376 well_filter=well_filter, 

377 enable_visualizer_override=enable_visualizer_override 

378 ) 

379 
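Compilation can be scoped to a subset of wells; a short sketch (well IDs are hypothetical):

compiled = orchestrator.compile_pipelines(
    pipeline_definition=steps,
    well_filter=["A01", "B02"],
    enable_visualizer_override=False,
)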

380 def _execute_single_well( 

381 self, 

382 pipeline_definition: List[AbstractStep], 

383 frozen_context: ProcessingContext, 

384 visualizer: Optional[NapariVisualizerType] 

385 ) -> Dict[str, Any]: 

386 """Executes the pipeline for a single well using its frozen context.""" 

387 well_id = frozen_context.well_id 

388 logger.info(f"🔥 SINGLE_WELL: Starting execution for well {well_id}") 

389 

390 # NUCLEAR VALIDATION 

391 if not frozen_context.is_frozen():  [coverage: 391 ↛ 392, condition never true]

392 error_msg = f"🔥 SINGLE_WELL ERROR: Context for well {well_id} is not frozen before execution" 

393 logger.error(error_msg) 

394 raise RuntimeError(error_msg) 

395 

396 if not pipeline_definition:  [coverage: 396 ↛ 397, condition never true]

397 error_msg = f"🔥 SINGLE_WELL ERROR: Empty pipeline_definition for well {well_id}" 

398 logger.error(error_msg) 

399 raise RuntimeError(error_msg) 

400 

401 # MULTIPROCESSING FIX: Update step IDs after pickle/unpickle 

402 _ensure_step_ids_for_multiprocessing(frozen_context, pipeline_definition, well_id) 

403 

404 logger.info(f"🔥 SINGLE_WELL: Processing {len(pipeline_definition)} steps for well {well_id}") 

405 

406 for step_index, step in enumerate(pipeline_definition): 

407 # Generate step_id from object reference (elegant stateless approach) 

408 step_id = get_step_id(step) 

409 step_name = getattr(step, 'name', 'N/A')  # getattr default already covers steps without a name attribute 

410 

411 logger.info(f"🔥 SINGLE_WELL: Executing step {step_index+1}/{len(pipeline_definition)} - {step_id} ({step_name}) for well {well_id}") 

412 

413 if not hasattr(step, 'process'):  [coverage: 413 ↛ 414, condition never true]

414 error_msg = f"🔥 SINGLE_WELL ERROR: Step {step_id} missing process method for well {well_id}" 

415 logger.error(error_msg) 

416 raise RuntimeError(error_msg) 

417 

418 step.process(frozen_context) 

419 logger.info(f"🔥 SINGLE_WELL: Step {step_index+1}/{len(pipeline_definition)} - {step_id} completed for well {well_id}") 

420 

421 # except Exception as step_error: 

422 # import traceback 

423 # full_traceback = traceback.format_exc() 

424 # error_msg = f"🔥 SINGLE_WELL ERROR: Step {step_index+1} ({step_id}) failed for well {well_id}: {step_error}" 

425 # logger.error(error_msg, exc_info=True) 

426 # logger.error(f"🔥 SINGLE_WELL TRACEBACK for well {well_id}, step {step_index+1} ({step_id}):\n{full_traceback}") 

427 # raise RuntimeError(error_msg) from step_error 

428 

429 if visualizer:  [coverage: 429 ↛ 430, condition never true]

430 step_plan = frozen_context.step_plans[step_id] 

431 if step_plan['visualize']: 

432 output_dir = step_plan['output_dir'] 

433 write_backend = step_plan['write_backend'] 

434 if output_dir: 

435 logger.debug(f"Visualizing output for step {step_id} from path {output_dir} (backend: {write_backend}) for well {well_id}") 

436 visualizer.visualize_path( 

437 step_id=step_id, 

438 path=str(output_dir), 

439 backend=write_backend, 

440 well_id=well_id 

441 ) 

442 else: 

443 logger.warning(f"Step {step_id} in well {well_id} flagged for visualization but 'output_dir' is missing in its plan.") 

444 

445 logger.info(f"🔥 SINGLE_WELL: Pipeline execution completed successfully for well {well_id}") 

446 return {"status": "success", "well_id": well_id} 

447 

448 def execute_compiled_plate( 

449 self, 

450 pipeline_definition: List[AbstractStep], 

451 compiled_contexts: Dict[str, ProcessingContext], 

452 max_workers: Optional[int] = None, 

453 visualizer: Optional[NapariVisualizerType] = None, 

454 log_file_base: Optional[str] = None 

455 ) -> Dict[str, Dict[str, Any]]: 

456 """ 

457 Execute-all phase: Runs the stateless pipeline against compiled contexts. 

458 

459 Args: 

460 pipeline_definition: The stateless list of AbstractStep objects. 

461 compiled_contexts: Dict of well_id to its compiled, frozen ProcessingContext. 

462 Obtained from `compile_pipelines()`. 

463 max_workers: Maximum number of worker threads or processes for parallel execution. 

464 visualizer: Optional instance of NapariStreamVisualizer for real-time visualization 

465 (requires napari to be installed; must be initialized with orchestrator's filemanager by the caller). 

466 log_file_base: Base path for worker process log files (without extension). 

467 Each worker will create its own log file: {log_file_base}_worker_{pid}_{timestamp}.log 

468 

469 Returns: 

470 A dictionary mapping well IDs to their execution status (success/error and details). 

471 """ 

472 if not self.is_initialized():  [coverage: 472 ↛ 473, condition never true]

473 raise RuntimeError("Orchestrator must be initialized before executing.") 

474 if not pipeline_definition:  [coverage: 474 ↛ 475, condition never true]

475 raise ValueError("A valid (stateless) pipeline definition must be provided.") 

476 if not compiled_contexts:  [coverage: 476 ↛ 477, condition never true]

477 logger.warning("No compiled contexts provided for execution.") 

478 return {} 

479 

480 actual_max_workers = max_workers if max_workers is not None else self.global_config.num_workers 

481 if actual_max_workers <= 0: # Ensure positive number of workers  [coverage: 481 ↛ 482, condition never true]

482 actual_max_workers = 1 

483 

484 self._state = OrchestratorState.EXECUTING 

485 logger.info(f"Starting execution for {len(compiled_contexts)} wells with max_workers={actual_max_workers}.") 

486 

487 # 🔍 VRAM TRACKING: Log initial memory state 

488 try: 

489 from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage 

490 log_gpu_memory_usage("plate execution start") 

491 except Exception: 

492 pass 

493 

494 try: 

495 execution_results: Dict[str, Dict[str, Any]] = {} 

496 

497 # CUDA COMPATIBILITY: Set spawn method for multiprocessing to support CUDA 

498 try: 

499 # Check if spawn method is available and set it if not already set 

500 current_method = multiprocessing.get_start_method(allow_none=True) 

501 if current_method != 'spawn': 

502 logger.info(f"🔥 CUDA: Setting multiprocessing start method from '{current_method}' to 'spawn' for CUDA compatibility") 

503 multiprocessing.set_start_method('spawn', force=True) 

504 else: 

505 logger.debug("🔥 CUDA: Multiprocessing start method already set to 'spawn'") 

506 except RuntimeError as e: 

507 # Start method may already be set, which is fine 

508 logger.debug(f"🔥 CUDA: Start method already configured: {e}") 

509 

510 # Choose executor type based on global config for debugging support 

511 executor_type = "ThreadPoolExecutor" if self.global_config.use_threading else "ProcessPoolExecutor" 

512 logger.info(f"🔥 ORCHESTRATOR: Creating {executor_type} with {actual_max_workers} workers") 

513 

514 # DEATH DETECTION: Mark executor creation 

515 logger.info(f"🔥 DEATH_MARKER: BEFORE_{executor_type.upper()}_CREATION") 

516 

517 # Choose appropriate executor class and configure worker logging 

518 if self.global_config.use_threading:  [coverage: 518 ↛ 519, condition never true]

519 logger.info("🔥 DEBUG MODE: Using ThreadPoolExecutor for easier debugging") 

520 executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers) 

521 else: 

522 logger.info("🔥 PRODUCTION MODE: Using ProcessPoolExecutor for true parallelism") 

523 if log_file_base:  [coverage: 523 ↛ 524, condition never true]

524 logger.info(f"🔥 WORKER LOGGING: Configuring worker processes with log base: {log_file_base}") 

525 executor = concurrent.futures.ProcessPoolExecutor( 

526 max_workers=actual_max_workers, 

527 initializer=_configure_worker_logging, 

528 initargs=(log_file_base,) 

529 ) 

530 else: 

531 logger.info("🔥 WORKER LOGGING: No log base provided, workers will inherit logging") 

532 executor = concurrent.futures.ProcessPoolExecutor(max_workers=actual_max_workers) 

533 

534 logger.info(f"🔥 DEATH_MARKER: ENTERING_{executor_type.upper()}_CONTEXT") 

535 with executor: 

536 logger.info(f"🔥 DEATH_MARKER: {executor_type.upper()}_CREATED_SUCCESSFULLY") 

537 logger.info(f"🔥 ORCHESTRATOR: {executor_type} created, submitting {len(compiled_contexts)} tasks") 

538 

539 # NUCLEAR ERROR TRACING: Create snapshot of compiled_contexts to prevent iteration issues 

540 contexts_snapshot = dict(compiled_contexts.items()) 

541 logger.info(f"🔥 ORCHESTRATOR: Created contexts snapshot with {len(contexts_snapshot)} items") 

542 

543 logger.info("🔥 DEATH_MARKER: BEFORE_TASK_SUBMISSION_LOOP") 

544 future_to_well_id = {} 

545 for well_id, context in contexts_snapshot.items(): 

546 try: 

547 logger.info(f"🔥 DEATH_MARKER: SUBMITTING_TASK_FOR_WELL_{well_id}") 

548 logger.info(f"🔥 ORCHESTRATOR: Submitting task for well {well_id}") 

549 future = executor.submit(self._execute_single_well, pipeline_definition, context, visualizer) 

550 future_to_well_id[future] = well_id 

551 logger.info(f"🔥 ORCHESTRATOR: Task submitted for well {well_id}") 

552 logger.info(f"🔥 DEATH_MARKER: TASK_SUBMITTED_FOR_WELL_{well_id}") 

553 except Exception as submit_error: 

554 error_msg = f"🔥 ORCHESTRATOR ERROR: Failed to submit task for well {well_id}: {submit_error}" 

555 logger.error(error_msg, exc_info=True) 

556 # FAIL-FAST: Re-raise task submission errors immediately 

557 raise 

558 

559 logger.info("🔥 DEATH_MARKER: TASK_SUBMISSION_LOOP_COMPLETED") 

560 

561 logger.info(f"🔥 ORCHESTRATOR: All {len(future_to_well_id)} tasks submitted, waiting for completion") 

562 logger.info("🔥 DEATH_MARKER: BEFORE_COMPLETION_LOOP") 

563 

564 completed_count = 0 

565 logger.info("🔥 DEATH_MARKER: ENTERING_AS_COMPLETED_LOOP") 

566 for future in concurrent.futures.as_completed(future_to_well_id): 

567 well_id = future_to_well_id[future] 

568 completed_count += 1 

569 logger.info(f"🔥 DEATH_MARKER: PROCESSING_COMPLETED_TASK_{completed_count}_WELL_{well_id}") 

570 logger.info(f"🔥 ORCHESTRATOR: Task {completed_count}/{len(future_to_well_id)} completed for well {well_id}") 

571 

572 try: 

573 logger.info(f"🔥 DEATH_MARKER: CALLING_FUTURE_RESULT_FOR_WELL_{well_id}") 

574 result = future.result() 

575 logger.info(f"🔥 DEATH_MARKER: FUTURE_RESULT_SUCCESS_FOR_WELL_{well_id}") 

576 logger.info(f"🔥 ORCHESTRATOR: Well {well_id} result: {result}") 

577 execution_results[well_id] = result 

578 logger.info(f"🔥 DEATH_MARKER: RESULT_STORED_FOR_WELL_{well_id}") 

579 except Exception as exc: 

580 import traceback 

581 full_traceback = traceback.format_exc() 

582 error_msg = f"Well {well_id} generated an exception during execution: {exc}" 

583 logger.error(f"🔥 ORCHESTRATOR ERROR: {error_msg}", exc_info=True) 

584 logger.error(f"🔥 ORCHESTRATOR FULL TRACEBACK for well {well_id}:\n{full_traceback}") 

585 # FAIL-FAST: Re-raise immediately instead of storing error 

586 raise 

587 

588 logger.info("🔥 DEATH_MARKER: COMPLETION_LOOP_FINISHED") 

589 

590 logger.info(f"🔥 ORCHESTRATOR: All tasks completed, {len(execution_results)} results collected") 

591 

592 

593 # 🔥 GPU CLEANUP: Clear GPU memory after plate execution 

594 try: 

595 from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks 

596 cleanup_all_gpu_frameworks() 

597 logger.debug("🔥 GPU CLEANUP: Cleared all GPU frameworks after plate execution") 

598 except Exception as cleanup_error: 

599 logger.warning(f"Failed to cleanup GPU memory after plate execution: {cleanup_error}") 

600 

601 

602 

603 logger.info(f"🔥 ORCHESTRATOR: Plate execution completed, checking for analysis consolidation") 

604 # Run automatic analysis consolidation if enabled 

605 if self.global_config.analysis_consolidation.enabled:  [coverage: 605 ↛ 648, condition always true]

606 try: 

607 from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results 

608 

609 # Get results directory from compiled contexts (Option 2: use existing paths) 

610 results_dir = None 

611 for well_id, context in compiled_contexts.items(): 

612 # Look for any step that has an output_dir - this is where materialization happens 

613 for step_id, step_plan in context.step_plans.items(): 

614 if 'output_dir' in step_plan:  [coverage: 614 ↛ 613, condition always true]

615 # Found an output directory, check if it has a results subdirectory 

616 potential_results_dir = Path(step_plan['output_dir']) / self.global_config.materialization_results_path 

617 if potential_results_dir.exists():  [coverage: 617 ↛ 618, condition never true]

618 results_dir = potential_results_dir 

619 logger.info(f"🔍 CONSOLIDATION: Found results directory from step {step_id}: {results_dir}") 

620 break 

621 if results_dir:  [coverage: 621 ↛ 622, condition never true]

622 break 

623 

624 if results_dir and results_dir.exists():  [coverage: 624 ↛ 626, condition never true]

625 # Check if there are actually CSV files (materialized results) 

626 csv_files = list(results_dir.glob("*.csv")) 

627 if csv_files: 

628 logger.info(f"🔄 CONSOLIDATION: Found {len(csv_files)} CSV files, running consolidation") 

629 # Get well IDs from compiled contexts 

630 well_ids = list(compiled_contexts.keys()) 

631 logger.info(f"🔄 CONSOLIDATION: Using well IDs: {well_ids}") 

632 

633 consolidate_analysis_results( 

634 results_directory=str(results_dir), 

635 well_ids=well_ids, 

636 consolidation_config=self.global_config.analysis_consolidation, 

637 plate_metadata_config=self.global_config.plate_metadata 

638 ) 

639 logger.info("✅ CONSOLIDATION: Completed successfully") 

640 else: 

641 logger.info(f"⏭️ CONSOLIDATION: No CSV files found in {results_dir}, skipping") 

642 else: 

643 logger.info(f"⏭️ CONSOLIDATION: No results directory found in compiled contexts") 

644 except Exception as e: 

645 logger.error(f"❌ CONSOLIDATION: Failed: {e}") 

646 

647 # Update state based on execution results 

648 if all(result.get("status") == "success" for result in execution_results.values()):  [coverage: 648 ↛ 651, condition always true]

649 self._state = OrchestratorState.COMPLETED 

650 else: 

651 self._state = OrchestratorState.EXEC_FAILED 

652 

653 logger.info(f"🔥 ORCHESTRATOR: Plate execution finished. Results: {execution_results}") 

654 

655 return execution_results 

656 except Exception as e: 

657 self._state = OrchestratorState.EXEC_FAILED 

658 logger.error(f"Failed to execute compiled plate: {e}") 

659 raise 

660 
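When per-worker log files are wanted, the execute phase can be given a log base; a sketch under the assumption that logs should sit next to the plate:

results = orchestrator.execute_compiled_plate(
    pipeline_definition=steps,
    compiled_contexts=compiled,
    max_workers=2,
    log_file_base="/data/plate001/openhcs_run",  # hypothetical; workers write <base>_worker_<pid>_<timestamp>.log
)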

661 def get_component_keys(self, group_by: GroupBy, component_filter: Optional[List[Union[str, int]]] = None) -> List[str]: 

662 """ 

663 Generic method to get component keys for any group_by type. 

664 

665 This method works with any GroupBy enum value (CHANNEL, SITE, Z_INDEX, WELL) 

666 and returns the discovered component values as strings to match the pattern 

667 detection system format. 

668 

669 Tries metadata cache first, falls back to filename parsing cache if metadata is empty. 

670 

671 Args: 

672 group_by: GroupBy enum specifying which component to extract 

673 component_filter: Optional list of component values to filter by 

674 

675 Returns: 

676 List of component values as strings, sorted 

677 

678 Raises: 

679 RuntimeError: If orchestrator is not initialized 

680 """ 

681 if not self.is_initialized():  [coverage: 681 ↛ 682, condition never true]

682 raise RuntimeError("Orchestrator must be initialized before getting component keys.") 

683 

684 # Try metadata cache first (preferred source) 

685 if group_by in self._metadata_cache and self._metadata_cache[group_by]: 

686 all_components = list(self._metadata_cache[group_by].keys()) 

687 logger.debug(f"Using metadata cache for {group_by.value}: {len(all_components)} components") 

688 else: 

689 # Fall back to filename parsing cache 

690 if group_by not in self._component_keys_cache:  [coverage: 690 ↛ 691, condition never true]

691 raise RuntimeError(f"Component keys cache is empty for {group_by.value}. " 

692 f"Ensure cache_component_keys() was called during initialization.") 

693 

694 all_components = self._component_keys_cache[group_by] 

695 

696 if not all_components:  [coverage: 696 ↛ 697, condition never true]

697 component_name = group_by.value 

698 logger.warning(f"No {component_name} values found in input directory: {self.input_dir}") 

699 return [] 

700 

701 logger.debug(f"Using filename parsing cache for {group_by.value}: {len(all_components)} components") 

702 

703 if component_filter: 

704 str_component_filter = {str(c) for c in component_filter} 

705 selected_components = [comp for comp in all_components if comp in str_component_filter] 

706 if not selected_components:  [coverage: 706 ↛ 707, condition never true]

707 component_name = group_by.value 

708 logger.warning(f"No {component_name} values from {all_components} match the filter: {component_filter}") 

709 return selected_components 

710 else: 

711 return all_components 

712 
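A quick illustration of component-key lookup; the returned values depend on the plate, and the filter shows that int inputs are normalized to strings:

channels = orchestrator.get_component_keys(GroupBy.CHANNEL)
sites = orchestrator.get_component_keys(GroupBy.SITE, component_filter=[1, 2])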

713 def cache_component_keys(self, components: Optional[List[GroupBy]] = None) -> None: 

714 """ 

715 Pre-compute and cache component keys for fast access using single-pass parsing. 

716 

717 This method performs expensive file listing and parsing operations once, 

718 extracting all component types in a single pass for maximum efficiency. 

719 

720 Args: 

721 components: Optional list of GroupBy components to cache. 

722 If None, caches all components in the GroupBy enum. 

723 """ 

724 if not self.is_initialized():  [coverage: 724 ↛ 725, condition never true]

725 raise RuntimeError("Orchestrator must be initialized before caching component keys.") 

726 

727 if components is None:  [coverage: 727 ↛ 730, condition always true]

728 components = list(GroupBy) # Cache all enum values 

729 

730 logger.info(f"Caching component keys for: {[comp.value for comp in components]}") 

731 

732 # Initialize component sets for all requested components 

733 component_sets: Dict[GroupBy, Set[Union[str, int]]] = {} 

734 for group_by in components: 

735 if group_by != GroupBy.NONE: # Skip the empty enum 

736 component_sets[group_by] = set() 

737 

738 # Single pass through all filenames - extract all components at once 

739 try: 

740 filenames = self.filemanager.list_files(str(self.input_dir), Backend.DISK.value, extensions=DEFAULT_IMAGE_EXTENSIONS) 

741 logger.debug(f"Parsing {len(filenames)} filenames in single pass...") 

742 

743 for filename in filenames: 

744 parsed_info = self.microscope_handler.parser.parse_filename(str(filename)) 

745 if parsed_info:  [coverage: 745 ↛ 752, condition always true]

746 # Extract all requested components from this filename 

747 for group_by in component_sets: 

748 component_name = group_by.value 

749 if component_name in parsed_info and parsed_info[component_name] is not None:  [coverage: 749 ↛ 747, condition always true]

750 component_sets[group_by].add(parsed_info[component_name]) 

751 else: 

752 logger.warning(f"Could not parse filename: {filename}") 

753 

754 except Exception as e: 

755 logger.error(f"Error listing files or parsing filenames from {self.input_dir}: {e}", exc_info=True) 

756 # Initialize empty sets for failed parsing 

757 for group_by in component_sets: 

758 component_sets[group_by] = set() 

759 

760 # Convert sets to sorted lists and store in cache 

761 for group_by, component_set in component_sets.items(): 

762 sorted_components = [str(comp) for comp in sorted(list(component_set))] 

763 self._component_keys_cache[group_by] = sorted_components 

764 logger.debug(f"Cached {len(sorted_components)} {group_by.value} keys") 

765 

766 if not sorted_components:  [coverage: 766 ↛ 767, condition never true]

767 logger.warning(f"No {group_by.value} values found in input directory: {self.input_dir}") 

768 

769 logger.info(f"Component key caching complete. Cached {len(component_sets)} component types in single pass.") 

770 

771 def clear_component_cache(self, components: Optional[List[GroupBy]] = None) -> None: 

772 """ 

773 Clear cached component keys to force recomputation. 

774 

775 Use this when the input directory contents have changed and you need 

776 to refresh the component key cache. 

777 

778 Args: 

779 components: Optional list of GroupBy components to clear from cache. 

780 If None, clears entire cache. 

781 """ 

782 if components is None: 

783 self._component_keys_cache.clear() 

784 logger.info("Cleared entire component keys cache") 

785 else: 

786 for group_by in components: 

787 if group_by in self._component_keys_cache: 

788 del self._component_keys_cache[group_by] 

789 logger.debug(f"Cleared cache for {group_by.value}") 

790 logger.info(f"Cleared cache for {len(components)} component types") 

791 
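If the input directory changes on disk, the filename-derived cache can be rebuilt selectively; a short sketch:

orchestrator.clear_component_cache([GroupBy.SITE])
orchestrator.cache_component_keys([GroupBy.SITE])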

792 def cache_metadata(self) -> None: 

793 """ 

794 Cache all metadata from metadata handler for fast access. 

795 

796 This method calls the metadata handler's parse_metadata() method once 

797 and stores the results for instant access to component key→name mappings. 

798 Call this after orchestrator initialization to enable metadata-based 

799 component names. 

800 """ 

801 if not self.is_initialized() or self.input_dir is None or self.microscope_handler is None:  [coverage: 801 ↛ 802, condition never true]

802 raise RuntimeError("Orchestrator must be initialized before caching metadata.") 

803 

804 try: 

805 # Parse all metadata once using enum→method mapping 

806 # Use plate_path for metadata loading since metadata files are in plate root 

807 metadata = self.microscope_handler.metadata_handler.parse_metadata(self.plate_path) 

808 

809 # Initialize all GroupBy components with component keys mapped to None 

810 for group_by in [GroupBy.CHANNEL, GroupBy.WELL, GroupBy.SITE, GroupBy.Z_INDEX]: 

811 # Get all component keys for this GroupBy from filename parsing 

812 component_keys = self.get_component_keys(group_by) 

813 # Create dict mapping each key to None (no metadata available) 

814 self._metadata_cache[group_by] = {key: None for key in component_keys} 

815 

816 # Update with actual metadata from metadata handler where available 

817 for component_name, mapping in metadata.items(): 

818 try: 

819 group_by = GroupBy(component_name) # Convert string to enum 

820 # For OpenHCS plates, metadata keys might be the only source of component keys 

821 # Merge metadata keys with any existing component keys from filename parsing 

822 if group_by in self._metadata_cache:  [coverage: 822 ↛ 833, condition always true]

823 # Start with existing component keys (from filename parsing) 

824 combined_cache = self._metadata_cache[group_by].copy() 

825 # Add any metadata keys that weren't found in filename parsing 

826 for metadata_key in mapping.keys(): 

827 if metadata_key not in combined_cache:  [coverage: 827 ↛ 828, condition never true]

828 combined_cache[metadata_key] = None 

829 # Update with actual metadata values 

830 combined_cache.update(mapping) 

831 self._metadata_cache[group_by] = combined_cache 

832 else: 

833 self._metadata_cache[group_by] = mapping 

834 logger.debug(f"Updated metadata for {group_by.value}: {len(mapping)} entries with real data") 

835 except ValueError: 

836 logger.warning(f"Unknown component type in metadata: {component_name}") 

837 

838 # Log what we have for each component 

839 for group_by in [GroupBy.CHANNEL, GroupBy.WELL, GroupBy.SITE, GroupBy.Z_INDEX]: 

840 mapping = self._metadata_cache[group_by] 

841 real_metadata_count = sum(1 for v in mapping.values() if v is not None) 

842 total_keys = len(mapping) 

843 logger.debug(f"Cached {group_by.value}: {total_keys} keys, {real_metadata_count} with metadata") 

844 

845 logger.info(f"Metadata caching complete. All {len(self._metadata_cache)} component types populated.") 

846 

847 except Exception as e: 

848 logger.warning(f"Could not cache metadata: {e}") 

849 # Don't fail - metadata is optional enhancement 

850 

851 def get_component_metadata(self, group_by: GroupBy, key: str) -> Optional[str]: 

852 """ 

853 Get metadata display name for a specific component key. 

854 

855 Args: 

856 group_by: GroupBy enum specifying component type 

857 key: Component key (e.g., "1", "2", "A01") 

858 

859 Returns: 

860 Display name from metadata, or None if not available 

861 Example: get_component_metadata(GroupBy.CHANNEL, "1") → "HOECHST 33342" 

862 """ 

863 if group_by in self._metadata_cache: 

864 return self._metadata_cache[group_by].get(key) 

865 return None 

866 
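Putting the docstring's example to work; the channel name comes from whatever metadata the plate provides, so the fallback below is just a hedge for missing entries:

name = orchestrator.get_component_metadata(GroupBy.CHANNEL, "1")
label = name if name is not None else "Channel 1"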

867 def clear_metadata_cache(self) -> None: 

868 """ 

869 Clear cached metadata to force recomputation. 

870 

871 Use this when the input directory contents have changed and you need 

872 to refresh the metadata cache. 

873 """ 

874 self._metadata_cache.clear() 

875 logger.info("Cleared metadata cache") 

876 

877 async def apply_new_global_config(self, new_config: GlobalPipelineConfig): 

878 """ 

879 Apply global configuration and rebuild orchestrator-specific config if needed. 

880 

881 This method: 

882 1. Updates the global config reference 

883 2. Rebuilds any existing orchestrator-specific config to reference the new global config 

884 3. Preserves all user-set field values while updating lazy resolution defaults 

885 4. Re-initializes components that depend on config (if already initialized) 

886 """ 

887 from openhcs.core.config import GlobalPipelineConfig as GlobalPipelineConfigType 

888 if not isinstance(new_config, GlobalPipelineConfigType): 

889 raise TypeError(f"Expected GlobalPipelineConfig, got {type(new_config)}") 

890 

891 old_global_config = self.global_config 

892 self.global_config = new_config 

893 

894 # Rebuild orchestrator-specific config if it exists 

895 if self.pipeline_config is not None: 

896 from openhcs.core.lazy_config import rebuild_lazy_config_with_new_global_reference 

897 self.pipeline_config = rebuild_lazy_config_with_new_global_reference( 

898 self.pipeline_config, 

899 new_config, 

900 GlobalPipelineConfigType 

901 ) 

902 logger.info(f"Rebuilt orchestrator-specific config for plate: {self.plate_path}") 

903 

904 # Update thread-local storage to reflect the new effective configuration 

905 from openhcs.core.config import set_current_global_config 

906 effective_config = self.get_effective_config() 

907 set_current_global_config(GlobalPipelineConfigType, effective_config) 

908 

909 # Re-initialize components that depend on config if orchestrator was already initialized 

910 if self.is_initialized(): 

911 logger.info(f"Re-initializing orchestrator components for plate: {self.plate_path}") 

912 try: 

913 # Reset initialization state to allow re-initialization 

914 self._initialized = False 

915 self._state = OrchestratorState.CREATED 

916 

917 # Re-initialize with new config 

918 self.initialize() 

919 logger.info(f"Successfully re-initialized orchestrator for plate: {self.plate_path}") 

920 except Exception as e: 

921 logger.error(f"Failed to re-initialize orchestrator for plate {self.plate_path}: {e}") 

922 self._state = OrchestratorState.INIT_FAILED 

923 raise 

924 
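apply_new_global_config is a coroutine; a sketch of driving it from synchronous code (new_global_config is assumed to be built by the application layer):

import asyncio

asyncio.run(orchestrator.apply_new_global_config(new_global_config))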

925 def apply_pipeline_config(self, pipeline_config: PipelineConfig) -> None: 

926 """ 

927 Apply per-orchestrator configuration - affects only this orchestrator. 

928 Does not modify global configuration or affect other orchestrators. 

929 """ 

930 if not isinstance(pipeline_config, PipelineConfig): 

931 raise TypeError(f"Expected PipelineConfig, got {type(pipeline_config)}") 

932 self.pipeline_config = pipeline_config 

933 

934 

935 

936 # Update thread-local storage to reflect the new effective configuration 

937 # This ensures MaterializationPathConfig uses the updated defaults 

938 from openhcs.core.config import set_current_global_config, GlobalPipelineConfig 

939 effective_config = self.get_effective_config() 

940 set_current_global_config(GlobalPipelineConfig, effective_config) 

941 
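Per-orchestrator overrides only touch this instance; a sketch assuming PipelineConfig can be constructed with defaults (its fields are defined in openhcs.core.config, not in this module):

override = PipelineConfig()
orchestrator.apply_pipeline_config(override)
effective = orchestrator.get_effective_config()  # to_base_config() of the override while it is set
orchestrator.clear_pipeline_config()             # revert to the shared global configuration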

942 def get_effective_config(self) -> GlobalPipelineConfig: 

943 """Get effective configuration for this orchestrator.""" 

944 if self.pipeline_config: 

945 return self.pipeline_config.to_base_config() 

946 return self.global_config 

947 

948 def clear_pipeline_config(self) -> None: 

949 """Clear per-orchestrator configuration.""" 

950 self.pipeline_config = None 

951 logger.info(f"Cleared per-orchestrator config for plate: {self.plate_path}") 

952 

953 # Update thread-local storage to reflect global config 

954 from openhcs.core.config import set_current_global_config, GlobalPipelineConfig 

955 set_current_global_config(GlobalPipelineConfig, self.global_config)