Coverage for openhcs/textual_tui/subprocess_runner.py: 0.0%

474 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

#!/usr/bin/env python3
"""
OpenHCS Subprocess Runner

Standalone script that runs OpenHCS plate processing in a clean subprocess environment.
This mimics the integration test pattern from test_main.py but runs independently.

Usage:
    python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]
"""

import sys
import json
import dill as pickle
import logging
import traceback
import signal
import atexit
import os
from pathlib import Path
from typing import Dict, List, Any

# Enable subprocess mode - this single variable controls all subprocess behavior
os.environ['OPENHCS_SUBPROCESS_MODE'] = '1'

# Initialize function registry for subprocess workers
def _initialize_subprocess_registry():
    """Initialize function registry optimized for subprocess workers."""
    import openhcs.processing.func_registry as func_registry_module

    with func_registry_module._registry_lock:
        if not func_registry_module._registry_initialized:
            # Initialize empty registry structure
            func_registry_module.FUNC_REGISTRY.clear()
            for memory_type in func_registry_module.VALID_MEMORY_TYPES:
                func_registry_module.FUNC_REGISTRY[memory_type] = []
            func_registry_module._registry_initialized = True

            # Register external libraries using cached metadata (fast)
            func_registry_module._register_external_libraries()

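# The registry initialization runs eagerly here, at module import time, so the
# subprocess already has a populated function registry before any pipeline data
# is loaded or any orchestrator code is imported below.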

_initialize_subprocess_registry()

def setup_subprocess_logging(log_file_path: str):
    """Set up dedicated logging for the subprocess - all logs go to the specified file."""

    # Configure root logger to capture ALL logs from subprocess and OpenHCS modules
    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # Clear any existing handlers

    # Create file handler for subprocess logs
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # Ensure all OpenHCS module logs are captured
    logging.getLogger("openhcs").setLevel(logging.INFO)

    # Prevent console output - everything goes to file
    logging.basicConfig = lambda *args, **kwargs: None
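    # Note: this rebinds logging.basicConfig to a no-op for the lifetime of the
    # process, so later basicConfig() calls from imported libraries cannot attach
    # a console handler; the file handler above stays the only configured output
    # unless something adds a handler explicitly.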


    # Get subprocess logger
    logger = logging.getLogger("openhcs.subprocess")
    logger.info("SUBPROCESS: Logging configured")

    return logger

# Status and result files removed - log file is single source of truth

def run_single_plate(plate_path: str, pipeline_steps: List, global_config,
                     logger, log_file_base: str = None):
    """
    Run a single plate using the integration test pattern.

    This follows the exact same pattern as test_main.py:
    1. Initialize GPU registry
    2. Create orchestrator and initialize
    3. Get wells and compile pipelines
    4. Execute compiled plate
    """
    import psutil
    import os

    def log_thread_count(step_name):
        thread_count = psutil.Process(os.getpid()).num_threads()
        logger.info(f"🔥 SUBPROCESS: THREAD COUNT at {step_name}: {thread_count}")
        print(f"🔥 SUBPROCESS STDOUT: THREAD COUNT at {step_name}: {thread_count}")
        return thread_count

    # NUCLEAR ERROR DETECTION: Wrap EVERYTHING in try/except
    def force_error_detection(func_name, func, *args, **kwargs):
        """Wrapper that forces any error to be visible and logged."""
        try:
            logger.info(f"🔥 SUBPROCESS: CALLING {func_name} with args={len(args)}, kwargs={len(kwargs)}")
            print(f"🔥 SUBPROCESS STDOUT: CALLING {func_name}")

            # DEATH DETECTION: Mark entry into function (log file only)
            logger.info(f"🔥 SUBPROCESS: ENTERING: {func_name}")

            result = func(*args, **kwargs)

            # DEATH DETECTION: Mark successful completion (log file only)
            logger.info(f"🔥 SUBPROCESS: COMPLETED: {func_name}")

            logger.info(f"🔥 SUBPROCESS: {func_name} COMPLETED successfully")
            print(f"🔥 SUBPROCESS STDOUT: {func_name} COMPLETED")
            return result
        except Exception as e:
            error_msg = f"🔥 NUCLEAR ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR TRACEBACK: {traceback.format_exc()}")
            # Error logged to log file (single source of truth)
            raise RuntimeError(f"FORCED ERROR DETECTION: {func_name} failed: {e}") from e
        except BaseException as e:
            error_msg = f"🔥 NUCLEAR CRITICAL ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL TRACEBACK: {traceback.format_exc()}")
            # Error logged to log file (single source of truth)
            raise RuntimeError(f"FORCED CRITICAL ERROR DETECTION: {func_name} failed: {e}") from e
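    # force_error_detection is used below both for deferred imports and for
    # orchestrator calls, e.g.:
    #     setup_global_gpu_registry = force_error_detection("import_gpu_scheduler", import_gpu_scheduler)
    # Any exception (including BaseException) is logged, echoed to stdout, and
    # re-raised as RuntimeError with the failing call named in the message.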


    # DEATH DETECTION: Progress markers to find where process dies
    def death_marker(location, details=""):
        """Mark progress to detect where process dies (log file only)."""
        marker_msg = f"🔥 DEATH_MARKER: {location} - {details}"
        logger.info(marker_msg)
        print(marker_msg)

    try:
        death_marker("FUNCTION_START", f"plate_path={plate_path}")
        log_thread_count("function start")

        death_marker("BEFORE_STARTING_LOG")
        logger.info(f"SUBPROCESS: Starting plate {plate_path}")

        death_marker("BEFORE_STATUS_WRITE")
        logger.info(f"🔥 SUBPROCESS: STARTING plate {plate_path}")
        death_marker("AFTER_STATUS_WRITE")

        log_thread_count("after status write")

        # Step 1: Initialize GPU registry (like test_main.py)
        death_marker("STEP1_START", "GPU registry initialization")
        logger.info("SUBPROCESS: Initializing GPU registry")

        death_marker("BEFORE_GPU_IMPORT")
        log_thread_count("before GPU scheduler import")

        # NUCLEAR WRAP: GPU scheduler import
        def import_gpu_scheduler():
            from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
            return setup_global_gpu_registry
        setup_global_gpu_registry = force_error_detection("import_gpu_scheduler", import_gpu_scheduler)
        death_marker("AFTER_GPU_IMPORT")

        log_thread_count("after GPU scheduler import")

        death_marker("BEFORE_CONFIG_IMPORT")
        # NUCLEAR WRAP: Config import
        def import_config():
            from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
            from openhcs.constants import Microscope
            return GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope
        GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope = force_error_detection("import_config", import_config)
        death_marker("AFTER_CONFIG_IMPORT")

        log_thread_count("after config import")

        # Global config is already a proper object from pickle - no reconstruction needed!
        log_thread_count("using pickled global config")
        logger.info(f"🔥 SUBPROCESS: Using pickled global config: {type(global_config)}")
        logger.info(f"🔥 SUBPROCESS: Zarr compressor: {global_config.zarr.compressor.value}")
        log_thread_count("after global config validation")

        # NUCLEAR WRAP: GPU registry setup
        force_error_detection("setup_global_gpu_registry", setup_global_gpu_registry, global_config=global_config)

        log_thread_count("after GPU registry setup")
        logger.info("SUBPROCESS: GPU registry initialized")

184 logger.info("🔥 SUBPROCESS: Setting up process-specific CUDA streams...") 

185 try: 

186 import os 

187 process_id = os.getpid() 

188 

189 # Set unique CUDA stream for this process based on PID 

190 try: 

191 import torch 

192 if torch.cuda.is_available(): 

193 # Create process-specific stream 

194 torch.cuda.set_device(0) # Use GPU 0 

195 process_stream = torch.cuda.Stream() 

196 torch.cuda.set_stream(process_stream) 

197 logger.info(f"🔥 SUBPROCESS: Created PyTorch CUDA stream for process {process_id}") 

198 except ImportError: 

199 logger.debug("PyTorch not available for stream setup") 

200 

201 try: 

202 import cupy as cp 

203 if cp.cuda.is_available(): 

204 # Create process-specific stream 

205 cp.cuda.Device(0).use() # Use GPU 0 

206 process_stream = cp.cuda.Stream() 

207 cp.cuda.Stream.null = process_stream # Set as default stream 

208 logger.info(f"🔥 SUBPROCESS: Created CuPy CUDA stream for process {process_id}") 

209 except ImportError: 

210 logger.debug("CuPy not available for stream setup") 

211 

212 except Exception as stream_error: 

213 logger.warning(f"🔥 SUBPROCESS: Could not set up process streams: {stream_error}") 

214 # Continue anyway - not critical 

215 


        # Step 2: Create orchestrator and initialize (like test_main.py)
        logger.info("🔥 SUBPROCESS: Creating orchestrator...")

        log_thread_count("before orchestrator import")

        # NUCLEAR WRAP: Orchestrator import
        def import_orchestrator():
            from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
            return PipelineOrchestrator
        PipelineOrchestrator = force_error_detection("import_orchestrator", import_orchestrator)

        log_thread_count("after orchestrator import")

        # NUCLEAR WRAP: Storage registry import
        def import_storage_registry():
            from openhcs.io.base import storage_registry
            return storage_registry
        storage_registry = force_error_detection("import_storage_registry", import_storage_registry)

        log_thread_count("after storage registry import")

        # NUCLEAR WRAP: Function registry import (CRITICAL for auto-discovered functions)
        def import_function_registry():
            from openhcs.processing.func_registry import FUNC_REGISTRY
            return FUNC_REGISTRY
        FUNC_REGISTRY = force_error_detection("import_function_registry", import_function_registry)

        log_thread_count("after function registry import")
        logger.info("SUBPROCESS: Function registry initialized")

        log_thread_count("before orchestrator creation")

        # NUCLEAR WRAP: Orchestrator creation
        orchestrator = force_error_detection(
            "PipelineOrchestrator_creation", PipelineOrchestrator,
            plate_path=plate_path,
            global_config=global_config,
            storage_registry=storage_registry  # Use default registry
        )
        log_thread_count("after orchestrator creation")

        # NUCLEAR WRAP: Orchestrator initialization
        force_error_detection("orchestrator_initialize", orchestrator.initialize)
        log_thread_count("after orchestrator initialization")
        logger.info("🔥 SUBPROCESS: Orchestrator initialized!")


        # Step 3: Get wells and prepare pipeline (like test_main.py)
        # NUCLEAR WRAP: Get wells
        from openhcs.constants.constants import GroupBy
        wells = force_error_detection("orchestrator_get_wells", lambda: orchestrator.get_component_keys(GroupBy.WELL))
        logger.info(f"🔥 SUBPROCESS: Found {len(wells)} wells: {wells}")

        # AGGRESSIVE VALIDATION: Check wells
        if not wells:
            error_msg = "🔥 CRITICAL: No wells found by orchestrator!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)
        if not isinstance(wells, list):
            error_msg = f"🔥 CRITICAL: Wells is not a list: {type(wells)} = {wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)

        logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_steps)} steps")
        logger.info(f"🔥 SUBPROCESS: COMPILING plate {plate_path}")

        # Step 4: Compilation phase (like test_main.py)
        logger.info("🔥 SUBPROCESS: Starting compilation phase...")

        # Make fresh copy of pipeline steps and fix IDs (like TUI does)
        import copy
        from openhcs.constants.constants import VariableComponents
        execution_pipeline = copy.deepcopy(pipeline_steps)

        # Fix step IDs after deep copy to match new object IDs
        for step in execution_pipeline:
            step.step_id = str(id(step))
            # Ensure variable_components is never None
            if step.variable_components is None:
                logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting default")
                step.variable_components = [VariableComponents.SITE]
            elif not step.variable_components:
                logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting default")
                step.variable_components = [VariableComponents.SITE]

        compiled_contexts = orchestrator.compile_pipelines(
            pipeline_definition=execution_pipeline,
            well_filter=wells
        )
        logger.info("🔥 SUBPROCESS: Compilation completed!")

        # Verify compilation (like test_main.py)
        if not compiled_contexts:
            raise RuntimeError("🔥 COMPILATION FAILED: No compiled contexts returned!")
        if len(compiled_contexts) != len(wells):
            raise RuntimeError(f"🔥 COMPILATION FAILED: Expected {len(wells)} contexts, got {len(compiled_contexts)}")
        logger.info(f"🔥 SUBPROCESS: Compilation SUCCESS: {len(compiled_contexts)} contexts compiled")
        logger.info(f"🔥 SUBPROCESS: EXECUTING plate {plate_path}")


        # Step 5: Execution phase with multiprocessing (like test_main.py but with processes)
        logger.info("🔥 SUBPROCESS: Starting execution phase with multiprocessing...")

        # Use global config num_workers setting
        max_workers = global_config.num_workers
        logger.info(f"🔥 SUBPROCESS: Using {max_workers} workers from global config for {len(wells)} wells")

        # This is where hangs often occur - add extra monitoring
        logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")

        # Add GPU memory monitoring before execution
        try:
            import torch
            if torch.cuda.is_available():
                gpu_mem_before = torch.cuda.memory_allocated() / 1024**3
                gpu_mem_reserved = torch.cuda.memory_reserved() / 1024**3
                logger.info(f"🔥 SUBPROCESS: GPU memory before execution - Allocated: {gpu_mem_before:.2f}GB, Reserved: {gpu_mem_reserved:.2f}GB")
        except Exception as e:
            logger.warning(f"🔥 SUBPROCESS: Could not check GPU memory: {e}")

        # Let's debug what's actually happening - use normal threading
        logger.info("🔥 SUBPROCESS: Starting execution with detailed monitoring...")

        # Create a custom progress callback to see exactly where it hangs
        def progress_callback(well_id, step_name, status):
            logger.info(f"🔥 SUBPROCESS: PROGRESS - Well {well_id}, Step '{step_name}', Status: {status}")

        # Add monitoring without timeout
        import threading

        # Start monitoring thread
        monitoring_active = threading.Event()
        monitoring_active.set()


        def monitor_execution():
            count = 0
            while monitoring_active.is_set():
                count += 1
                logger.info(f"🔥 SUBPROCESS: MONITOR #{count} - Still executing, checking GPU memory...")

                try:
                    import torch
                    if torch.cuda.is_available():
                        allocated = torch.cuda.memory_allocated() / 1024**3
                        reserved = torch.cuda.memory_reserved() / 1024**3
                        logger.info(f"🔥 SUBPROCESS: GPU Memory - Allocated: {allocated:.2f}GB, Reserved: {reserved:.2f}GB")
                except:
                    pass

                # Check if we can get thread info
                try:
                    import threading
                    active_threads = threading.active_count()
                    logger.info(f"🔥 SUBPROCESS: Active threads: {active_threads}")
                except:
                    pass

                # Log progress every 30 seconds with system info
                if count % 6 == 0:
                    logger.info(f"🔥 SUBPROCESS: PROGRESS - Been running for {count*5} seconds, still executing...")
                    print(f"🔥 SUBPROCESS STDOUT: PROGRESS - {count*5} seconds elapsed")

                    # Add system monitoring to catch resource issues
                    try:
                        import psutil
                        import os

                        # Memory info
                        memory = psutil.virtual_memory()
                        swap = psutil.swap_memory()
                        process = psutil.Process(os.getpid())

                        logger.info(f"🔥 SUBPROCESS: SYSTEM - RAM: {memory.percent:.1f}% used, {memory.available/1024**3:.1f}GB free")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Swap: {swap.percent:.1f}% used")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process RAM: {process.memory_info().rss/1024**3:.1f}GB")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process threads: {process.num_threads()}")

                        print(f"🔥 SUBPROCESS STDOUT: RAM {memory.percent:.1f}%, Process {process.memory_info().rss/1024**3:.1f}GB, Threads {process.num_threads()}")

                        # Check for memory pressure
                        if memory.percent > 90:
                            logger.error(f"🔥 SUBPROCESS: WARNING - High memory usage: {memory.percent:.1f}%")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH MEMORY WARNING: {memory.percent:.1f}%")

                        if process.memory_info().rss > 16 * 1024**3:  # 16GB
                            logger.error(f"🔥 SUBPROCESS: WARNING - Process using {process.memory_info().rss/1024**3:.1f}GB")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH PROCESS MEMORY: {process.memory_info().rss/1024**3:.1f}GB")

                    except Exception as e:
                        logger.debug(f"Could not get system info: {e}")

                threading.Event().wait(5)  # Wait 5 seconds (more frequent)

        monitor_thread = threading.Thread(target=monitor_execution, daemon=True)
        monitor_thread.start()
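        # The monitor runs as a daemon thread: every 5 seconds it logs GPU memory
        # and thread counts, and every 6th pass (~30 s) it adds a system snapshot
        # (RAM, swap, process RSS, thread count). It keeps looping until
        # monitoring_active is cleared in the finally block below.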


        try:
            logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")
            logger.info(f"🔥 SUBPROCESS: Pipeline has {len(execution_pipeline)} steps")
            logger.info(f"🔥 SUBPROCESS: Compiled contexts for {len(compiled_contexts)} wells")
            logger.info("🔥 SUBPROCESS: Calling execute_compiled_plate NOW...")

            log_thread_count("before execute_compiled_plate")

            # PRE-EXECUTION STATE VALIDATION
            logger.info("🔥 SUBPROCESS: PRE-EXECUTION VALIDATION...")
            print("🔥 SUBPROCESS STDOUT: PRE-EXECUTION VALIDATION...")

            if not hasattr(orchestrator, 'execute_compiled_plate'):
                error_msg = "🔥 CRITICAL: orchestrator missing execute_compiled_plate method!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if execution_pipeline is None:
                error_msg = "🔥 CRITICAL: execution_pipeline is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if compiled_contexts is None:
                error_msg = "🔥 CRITICAL: compiled_contexts is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            logger.info(f"🔥 SUBPROCESS: PRE-EXECUTION OK - pipeline:{len(execution_pipeline)}, contexts:{len(compiled_contexts)}")
            print(f"🔥 SUBPROCESS STDOUT: PRE-EXECUTION OK - pipeline:{len(execution_pipeline)}, contexts:{len(compiled_contexts)}")

            # NUCLEAR EXECUTION WRAPPER: Force any error to surface
            death_marker("BEFORE_EXECUTION_CALL", f"pipeline_steps={len(execution_pipeline)}, contexts={len(compiled_contexts)}")
            logger.info("🔥 SUBPROCESS: CALLING NUCLEAR EXECUTION WRAPPER...")
            print("🔥 SUBPROCESS STDOUT: CALLING NUCLEAR EXECUTION WRAPPER...")

            death_marker("ENTERING_FORCE_ERROR_DETECTION")
            results = force_error_detection(
                "execute_compiled_plate", orchestrator.execute_compiled_plate,
                pipeline_definition=execution_pipeline,
                compiled_contexts=compiled_contexts,
                max_workers=max_workers,  # Use global config num_workers setting
                visualizer=None,  # No visualization in subprocess
                log_file_base=log_file_base  # Pass log base for worker process logging
            )
            death_marker("AFTER_FORCE_ERROR_DETECTION", f"results_type={type(results)}")

            logger.info("🔥 SUBPROCESS: NUCLEAR EXECUTION WRAPPER RETURNED!")
            print("🔥 SUBPROCESS STDOUT: NUCLEAR EXECUTION WRAPPER RETURNED!")
            death_marker("EXECUTION_WRAPPER_RETURNED")

            log_thread_count("after execute_compiled_plate")

            logger.info("🔥 SUBPROCESS: execute_compiled_plate RETURNED successfully!")
            logger.info(f"🔥 SUBPROCESS: Results: {type(results)}, length: {len(results) if results else 'None'}")

            # FORCE ERROR DETECTION: Check for None results immediately
            if results is None:
                error_msg = "🔥 CRITICAL: execute_compiled_plate returned None - this should never happen!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

        except Exception as execution_error:
            # FORCE ERROR PROPAGATION: Re-raise with enhanced context
            error_msg = f"🔥 EXECUTION ERROR in execute_compiled_plate: {execution_error}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT EXECUTION ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT EXECUTION TRACEBACK: {traceback.format_exc()}")
            raise RuntimeError(error_msg) from execution_error
        finally:
            monitoring_active.clear()  # Stop monitoring

        logger.info("🔥 SUBPROCESS: Execution completed!")


        # AGGRESSIVE RESULT VALIDATION: Force errors to surface
        logger.info("🔥 SUBPROCESS: Starting aggressive result validation...")

        # Check 1: Results exist
        if not results:
            error_msg = "🔥 EXECUTION FAILED: No results returned from execute_compiled_plate!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 2: Results is a dictionary
        if not isinstance(results, dict):
            error_msg = f"🔥 EXECUTION FAILED: Results is not a dict, got {type(results)}: {results}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 3: Expected number of results
        if len(results) != len(wells):
            error_msg = f"🔥 EXECUTION FAILED: Expected {len(wells)} results, got {len(results)}. Wells: {wells}, Result keys: {list(results.keys())}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 4: All wells have results
        missing_wells = set(wells) - set(results.keys())
        if missing_wells:
            error_msg = f"🔥 EXECUTION FAILED: Missing results for wells: {missing_wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 5: All results have proper structure and check for errors
        failed_wells = []
        for well_id, result in results.items():
            logger.info(f"🔥 SUBPROCESS: Validating result for well {well_id}: {result}")

            if not isinstance(result, dict):
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} is not a dict: {type(result)} = {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if 'status' not in result:
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} missing 'status' field: {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if result.get('status') != 'success':
                error_msg = result.get('error_message', 'Unknown error')
                details = result.get('details', 'No details')
                full_error = f"🔥 EXECUTION FAILED for well {well_id}: {error_msg} | Details: {details}"
                logger.error(full_error)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {full_error}")
                failed_wells.append((well_id, error_msg, details))

        # Check 6: Raise if any wells failed
        if failed_wells:
            error_summary = f"🔥 EXECUTION FAILED: {len(failed_wells)} wells failed out of {len(wells)}"
            for well_id, error_msg, details in failed_wells:
                error_summary += f"\n - Well {well_id}: {error_msg}"
            logger.error(error_summary)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_summary}")
            raise RuntimeError(error_summary)

        logger.info(f"🔥 SUBPROCESS: EXECUTION SUCCESS: {len(results)} wells executed successfully")

        # Success logged to log file (single source of truth)
        logger.info(f"🔥 SUBPROCESS: COMPLETED plate {plate_path} with {len(results)} results")

        logger.info(f"🔥 SUBPROCESS: Plate {plate_path} completed successfully")

    except Exception as e:
        error_msg = f"Execution failed for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        error_msg = f"CRITICAL failure for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)


def main():
    """Main entry point for subprocess runner."""
    if len(sys.argv) < 3 or len(sys.argv) > 4:
        print("Usage: python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]")
        sys.exit(1)

    data_file = sys.argv[1]
    log_file_base = sys.argv[2]
    unique_id = sys.argv[3] if len(sys.argv) == 4 else None

    # Build log file name from provided base and unique ID
    if unique_id:
        log_file = f"{log_file_base}_{unique_id}.log"
    else:
        log_file = f"{log_file_base}.log"

    # PROCESS GROUP CLEANUP: Create new process group to manage all child processes
    try:
        import os
        import signal

        # Create new process group with this process as leader
        os.setpgrp()  # Create new process group
        process_group_id = os.getpgrp()

        print(f"🔥 SUBPROCESS: Created process group {process_group_id}")

        # Track all child processes for cleanup
        child_processes = set()

        def kill_all_children():
            """Kill all child processes and the entire process group."""
            try:
                print(f"🔥 SUBPROCESS: Killing process group {process_group_id}")
                # Kill entire process group (negative PID kills process group)
                os.killpg(process_group_id, signal.SIGTERM)

                # Give processes time to exit gracefully
                import time
                time.sleep(2)

                # Force kill if still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # Already dead

                print(f"🔥 SUBPROCESS: Process group {process_group_id} terminated")
            except Exception as e:
                print(f"🔥 SUBPROCESS: Error killing process group: {e}")

        # Register cleanup function
        atexit.register(kill_all_children)

    except Exception as e:
        print(f"🔥 SUBPROCESS: Warning - Could not set up process group cleanup: {e}")

    # Set up logging first
    logger = setup_subprocess_logging(log_file)
    logger.info("🔥 SUBPROCESS: Starting OpenHCS subprocess runner")
    logger.info(f"🔥 SUBPROCESS: Args - data: {data_file}, log: {log_file}")
    logger.info(f"🔥 SUBPROCESS: Log file: {log_file}")

    # DEATH DETECTION: Set up heartbeat monitoring
    import threading
    import time

    def heartbeat_monitor():
        """Monitor that writes heartbeats to detect where process dies."""
        heartbeat_count = 0
        while True:
            try:
                heartbeat_count += 1
                heartbeat_msg = f"🔥 SUBPROCESS HEARTBEAT #{heartbeat_count}: Process alive at {time.time()}"
                logger.info(heartbeat_msg)
                print(heartbeat_msg)

                # Heartbeat logged to log file (single source of truth)
                # No separate heartbeat file needed

                time.sleep(2)  # Heartbeat every 2 seconds
            except Exception as monitor_error:
                logger.error(f"🔥 SUBPROCESS: Heartbeat monitor error: {monitor_error}")
                print(f"🔥 SUBPROCESS STDOUT: Heartbeat monitor error: {monitor_error}")
                break

    # Start heartbeat monitor in daemon thread
    heartbeat_thread = threading.Thread(target=heartbeat_monitor, daemon=True)
    heartbeat_thread.start()
    logger.info("🔥 SUBPROCESS: Heartbeat monitor started")
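    # The heartbeat thread writes a timestamped line to the log and stdout every
    # 2 seconds, so a silent gap in heartbeats brackets the moment the process
    # hung or died, complementing the DEATH_MARKER lines in run_single_plate.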


    # NUCLEAR CRASH DETECTION - catch EVERYTHING
    def crash_handler(signum, frame):
        crash_msg = f"🔥 SUBPROCESS: CRASH DETECTED - Signal {signum} received!"
        logger.error(crash_msg)
        print(f"🔥 SUBPROCESS STDOUT CRASH: {crash_msg}")

        # Crash info logged to log file (single source of truth)

        # Dump stack trace
        try:
            import traceback
            import threading
            logger.error("🔥 SUBPROCESS: CRASH - Dumping all thread stacks...")
            for thread_id, frame in sys._current_frames().items():
                logger.error(f"🔥 SUBPROCESS: CRASH Thread {thread_id}:")
                traceback.print_stack(frame)
        except:
            pass

        # Force exit
        os._exit(1)

    # Set up signal handlers for all possible crashes
    signal.signal(signal.SIGSEGV, crash_handler)  # Segmentation fault
    signal.signal(signal.SIGABRT, crash_handler)  # Abort
    signal.signal(signal.SIGFPE, crash_handler)   # Floating point exception
    signal.signal(signal.SIGILL, crash_handler)   # Illegal instruction
    signal.signal(signal.SIGTERM, crash_handler)  # Termination
    signal.signal(signal.SIGINT, crash_handler)   # Interrupt (Ctrl+C)

    # Set up atexit handler to catch silent deaths
    def exit_handler():
        logger.error("🔥 SUBPROCESS: ATEXIT - Process is exiting!")
        print("🔥 SUBPROCESS STDOUT: ATEXIT - Process is exiting!")
        # Exit info logged to log file (single source of truth)

    atexit.register(exit_handler)

    # Set up debug signal handler
    def debug_handler(signum, frame):
        logger.error("🔥 SUBPROCESS: SIGUSR1 received - dumping stack trace")
        import traceback
        import threading

        # Dump all thread stacks
        for thread_id, frame in sys._current_frames().items():
            logger.error(f"🔥 SUBPROCESS: Thread {thread_id} stack:")
            traceback.print_stack(frame)

        # Log thread info
        for thread in threading.enumerate():
            logger.error(f"🔥 SUBPROCESS: Thread: {thread.name}, alive: {thread.is_alive()}")

    signal.signal(signal.SIGUSR1, debug_handler)
    logger.info("🔥 SUBPROCESS: NUCLEAR CRASH DETECTION ENABLED - All signals monitored")
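    # With the SIGUSR1 handler installed, a stack dump of every thread can be
    # requested from outside without stopping the run, e.g.:
    #     kill -USR1 <subprocess pid>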


    try:
        # Load pickled data
        logger.info(f"🔥 SUBPROCESS: Loading data from {data_file}")
        with open(data_file, 'rb') as f:
            data = pickle.load(f)
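        # The payload is a dict with three keys (unpacked below):
        #   'plate_paths'   - list of plate paths to process
        #   'pipeline_data' - Dict[plate_path, List[FunctionStep]]
        #   'global_config' - the pickled global pipeline config object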


        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']  # Dict[plate_path, List[FunctionStep]]
        global_config = data['global_config']

        logger.info(f"🔥 SUBPROCESS: Loaded data for {len(plate_paths)} plates")
        logger.info(f"🔥 SUBPROCESS: Plates: {plate_paths}")

        # Process each plate (like test_main.py but for multiple plates)
        for plate_path in plate_paths:
            pipeline_steps = pipeline_data[plate_path]
            logger.info(f"🔥 SUBPROCESS: Processing plate {plate_path} with {len(pipeline_steps)} steps")

            run_single_plate(
                plate_path=plate_path,
                pipeline_steps=pipeline_steps,
                global_config=global_config,
                logger=logger,
                log_file_base=log_file_base
            )

        logger.info("🔥 SUBPROCESS: All plates completed successfully")

    except Exception as e:
        logger.error(f"🔥 SUBPROCESS: Fatal error: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT FATAL: {e}")
        print(f"🔥 SUBPROCESS STDOUT FATAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Fatal error for all plates: {e}")
        sys.exit(1)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        logger.error(f"🔥 SUBPROCESS: CRITICAL SYSTEM ERROR: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM: {e}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM TRACEBACK: {traceback.format_exc()}")
        # Critical error logged to log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Critical system error for all plates: {e}")
        sys.exit(2)


if __name__ == "__main__":
    main()