Coverage for openhcs/textual_tui/subprocess_runner.py: 0.0%

406 statements  

coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

#!/usr/bin/env python3
"""
OpenHCS Subprocess Runner

Standalone script that runs OpenHCS plate processing in a clean subprocess environment.
This mimics the integration test pattern from test_main.py but runs independently.

Usage:
    python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]
"""

import sys
import json
import dill as pickle
import logging
import traceback
import signal
import atexit
import os
from pathlib import Path
from typing import Dict, List, Any

logger = logging.getLogger(__name__)

# Enable subprocess mode - this single variable controls all subprocess behavior
os.environ['OPENHCS_SUBPROCESS_MODE'] = '1'

# Prevent GPU library imports in the subprocess runner - workers will handle GPU initialization
os.environ['OPENHCS_SUBPROCESS_NO_GPU'] = '1'

# The subprocess runner doesn't need the function registry at all;
# workers will initialize their own function registries when needed
logger.info("Subprocess runner: No function registry needed - workers will handle initialization")

def setup_subprocess_logging(log_file_path: str):
    """Set up dedicated logging for the subprocess - all logs go to the specified file."""

    # Configure the root logger to capture ALL logs from the subprocess and OpenHCS modules
    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # Clear any existing handlers

    # Create a file handler for subprocess logs
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # Ensure all OpenHCS module logs are captured
    logging.getLogger("openhcs").setLevel(logging.INFO)

    # Prevent console output - neutralize later basicConfig() calls so nothing
    # can reattach a console handler; everything goes to the file
    logging.basicConfig = lambda *args, **kwargs: None

    # Get the subprocess logger
    logger = logging.getLogger("openhcs.subprocess")
    logger.info("SUBPROCESS: Logging configured")

    return logger

# Status and result files removed - the log file is the single source of truth

def run_single_plate(plate_path: str, pipeline_definition: List, compiled_contexts: Dict,
                     global_config, logger, log_file_base: str = None, effective_config=None):
    """
    Run a single plate using pre-compiled contexts from the UI.

    This follows the pattern:
    1. Validate the global config (GPU registry is initialized by workers)
    2. Create the orchestrator and initialize it
    3. Execute the pre-compiled plate (no compilation needed)
    """

    import psutil
    import os

    def log_thread_count(step_name):
        thread_count = psutil.Process(os.getpid()).num_threads()
        logger.info(f"🔥 SUBPROCESS: THREAD COUNT at {step_name}: {thread_count}")
        print(f"🔥 SUBPROCESS STDOUT: THREAD COUNT at {step_name}: {thread_count}")
        return thread_count

    # NUCLEAR ERROR DETECTION: Wrap EVERYTHING in try/except
    def force_error_detection(func_name, func, *args, **kwargs):
        """Wrapper that forces any error to be visible and logged."""
        try:
            logger.info(f"🔥 SUBPROCESS: CALLING {func_name} with args={len(args)}, kwargs={len(kwargs)}")
            print(f"🔥 SUBPROCESS STDOUT: CALLING {func_name}")

            # DEATH DETECTION: Mark entry into the function (log file only)
            logger.info(f"🔥 SUBPROCESS: ENTERING: {func_name}")

            result = func(*args, **kwargs)

            # DEATH DETECTION: Mark successful completion
            logger.info(f"🔥 SUBPROCESS: COMPLETED: {func_name}")
            print(f"🔥 SUBPROCESS STDOUT: {func_name} COMPLETED")
            return result
        except Exception as e:
            error_msg = f"🔥 NUCLEAR ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR TRACEBACK: {traceback.format_exc()}")
            # Error logged to the log file (single source of truth)
            raise RuntimeError(f"FORCED ERROR DETECTION: {func_name} failed: {e}") from e
        except BaseException as e:
            error_msg = f"🔥 NUCLEAR CRITICAL ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL TRACEBACK: {traceback.format_exc()}")
            # Error logged to the log file (single source of truth)
            raise RuntimeError(f"FORCED CRITICAL ERROR DETECTION: {func_name} failed: {e}") from e

    # DEATH DETECTION: Progress markers to find where the process dies
    def death_marker(location, details=""):
        """Mark progress to detect where the process dies (logged and printed)."""
        marker_msg = f"🔥 DEATH_MARKER: {location} - {details}"
        logger.info(marker_msg)
        print(marker_msg)

    try:
        death_marker("FUNCTION_START", f"plate_path={plate_path}")
        log_thread_count("function start")

        death_marker("BEFORE_STARTING_LOG")
        logger.info(f"SUBPROCESS: Starting plate {plate_path}")

        death_marker("BEFORE_STATUS_WRITE")
        logger.info(f"🔥 SUBPROCESS: STARTING plate {plate_path}")
        death_marker("AFTER_STATUS_WRITE")

        log_thread_count("after status write")

        # Step 1: Validate global config (GPU registry will be initialized by workers)
        death_marker("STEP1_START", "Global config validation")
        logger.info("SUBPROCESS: Validating global config (GPU registry initialization deferred to workers)")

        death_marker("BEFORE_CONFIG_IMPORT")

        # NUCLEAR WRAP: Config import
        def import_config():
            from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
            from openhcs.constants import Microscope
            return GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope

        GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope = force_error_detection("import_config", import_config)
        death_marker("AFTER_CONFIG_IMPORT")

        log_thread_count("after config import")

        # The global config is already a proper object from pickle - no reconstruction needed!
        log_thread_count("using pickled global config")
        logger.info(f"🔥 SUBPROCESS: Using pickled global config: {type(global_config)}")
        logger.info(f"🔥 SUBPROCESS: Zarr compressor: {global_config.zarr_config.compressor.value}")
        log_thread_count("after global config validation")

        logger.info("SUBPROCESS: Global config validated - GPU registry and CUDA streams will be initialized by workers")

        # Step 2: Create orchestrator and initialize (like test_main.py)
        logger.info("🔥 SUBPROCESS: Creating orchestrator...")

        log_thread_count("before orchestrator import")

        # NUCLEAR WRAP: Orchestrator import
        def import_orchestrator():
            from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
            return PipelineOrchestrator

        PipelineOrchestrator = force_error_detection("import_orchestrator", import_orchestrator)

        log_thread_count("after orchestrator import")

        # NUCLEAR WRAP: Storage registry import (lazy initialization for subprocess mode)
        def import_storage_registry():
            from openhcs.io.base import storage_registry, ensure_storage_registry
            ensure_storage_registry()  # Ensure the registry is created in subprocess mode
            return storage_registry

        storage_registry = force_error_detection("import_storage_registry", import_storage_registry)

        log_thread_count("after storage registry import")

        # Skip the function registry import in the subprocess runner - workers will initialize their own
        logger.info("SUBPROCESS: Function registry initialization deferred to workers")

        log_thread_count("before orchestrator creation")

        # NUCLEAR WRAP: Set up the global config context (required before orchestrator creation)
        def setup_global_context():
            from openhcs.config_framework.lazy_factory import ensure_global_config_context
            from openhcs.core.config import GlobalPipelineConfig
            ensure_global_config_context(GlobalPipelineConfig, global_config)

        force_error_detection("setup_global_context", setup_global_context)

        # NUCLEAR WRAP: Orchestrator creation
        orchestrator = force_error_detection(
            "PipelineOrchestrator_creation", PipelineOrchestrator,
            plate_path=plate_path,
            storage_registry=storage_registry  # Use the default registry
        )
        log_thread_count("after orchestrator creation")

        # NUCLEAR WRAP: Orchestrator initialization
        force_error_detection("orchestrator_initialize", orchestrator.initialize)
        log_thread_count("after orchestrator initialization")
        logger.info("🔥 SUBPROCESS: Orchestrator initialized!")

        # Step 3: Use wells from the pre-compiled contexts (no rediscovery needed) -
        # the UI already compiled contexts for the specific wells in this plate
        wells = list(compiled_contexts.keys())
        logger.info(f"🔥 SUBPROCESS: Using pre-compiled wells for plate {plate_path}: {wells}")
        logger.info(f"🔥 SUBPROCESS: Found {len(wells)} wells from compiled contexts")

        # AGGRESSIVE VALIDATION: Check the wells from the compiled contexts
        if not wells:
            error_msg = f"🔥 CRITICAL: No wells found in compiled contexts for plate {plate_path}!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)
        if not isinstance(wells, list):
            error_msg = f"🔥 CRITICAL: Wells is not a list: {type(wells)} = {wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)

        logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_definition)} steps")
        logger.info("🔥 SUBPROCESS: Using pre-compiled contexts from UI")
        logger.info(f"🔥 SUBPROCESS: EXECUTING plate {plate_path}")

        # Step 4: Execution phase with multiprocessing (like test_main.py but with processes)
        logger.info("🔥 SUBPROCESS: Starting execution phase with multiprocessing...")

        # Use the effective config passed from the UI (includes pipeline config) when available,
        # falling back to the global config otherwise
        config_to_use = effective_config if effective_config is not None else global_config
        max_workers = config_to_use.num_workers
        logger.info(f"🔥 SUBPROCESS: Using {max_workers} workers from {'effective' if effective_config else 'global'} config for {len(wells)} wells")

        # This is where hangs often occur - add extra monitoring
        logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")

        # Skip GPU memory monitoring in the subprocess runner - workers will handle GPU monitoring
        logger.info("🔥 SUBPROCESS: GPU memory monitoring deferred to workers")

        # Debug what's actually happening - use normal threading
        logger.info("🔥 SUBPROCESS: Starting execution with detailed monitoring...")

        # A custom progress callback to see exactly where execution hangs
        # (NOTE: defined for debugging; not currently passed to execute_compiled_plate)
        def progress_callback(well_id, step_name, status):
            logger.info(f"🔥 SUBPROCESS: PROGRESS - Well {well_id}, Step '{step_name}', Status: {status}")

        # Add monitoring without a timeout
        import threading

        # Start a monitoring thread
        monitoring_active = threading.Event()
        monitoring_active.set()

        def monitor_execution():
            count = 0
            while monitoring_active.is_set():
                count += 1
                logger.info(f"🔥 SUBPROCESS: MONITOR #{count} - Still executing...")

                # Skip GPU memory monitoring in the subprocess runner
                logger.info("🔥 SUBPROCESS: GPU memory monitoring deferred to workers")

                # Check whether we can get thread info
                try:
                    active_threads = threading.active_count()
                    logger.info(f"🔥 SUBPROCESS: Active threads: {active_threads}")
                except Exception:
                    pass

                # Log progress every 30 seconds with system info
                if count % 6 == 0:
                    logger.info(f"🔥 SUBPROCESS: PROGRESS - Been running for {count*5} seconds, still executing...")
                    print(f"🔥 SUBPROCESS STDOUT: PROGRESS - {count*5} seconds elapsed")

                    # Add system monitoring to catch resource issues
                    try:
                        # Memory info (psutil and os are already imported above)
                        memory = psutil.virtual_memory()
                        swap = psutil.swap_memory()
                        process = psutil.Process(os.getpid())

                        logger.info(f"🔥 SUBPROCESS: SYSTEM - RAM: {memory.percent:.1f}% used, {memory.available/1024**3:.1f}GB free")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Swap: {swap.percent:.1f}% used")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process RAM: {process.memory_info().rss/1024**3:.1f}GB")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process threads: {process.num_threads()}")

                        print(f"🔥 SUBPROCESS STDOUT: RAM {memory.percent:.1f}%, Process {process.memory_info().rss/1024**3:.1f}GB, Threads {process.num_threads()}")

                        # Check for memory pressure
                        if memory.percent > 90:
                            logger.error(f"🔥 SUBPROCESS: WARNING - High memory usage: {memory.percent:.1f}%")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH MEMORY WARNING: {memory.percent:.1f}%")

                        if process.memory_info().rss > 16 * 1024**3:  # 16GB
                            logger.error(f"🔥 SUBPROCESS: WARNING - Process using {process.memory_info().rss/1024**3:.1f}GB")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH PROCESS MEMORY: {process.memory_info().rss/1024**3:.1f}GB")

                    except Exception as e:
                        logger.debug(f"Could not get system info: {e}")

                threading.Event().wait(5)  # Wait 5 seconds between checks (more frequent than the 30s summary)

        monitor_thread = threading.Thread(target=monitor_execution, daemon=True)
        monitor_thread.start()

        try:
            logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")
            logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_definition)} steps")
            logger.info(f"🔥 SUBPROCESS: Compiled contexts for {len(compiled_contexts)} wells")
            logger.info("🔥 SUBPROCESS: Calling execute_compiled_plate NOW...")

            log_thread_count("before execute_compiled_plate")

            # PRE-EXECUTION STATE VALIDATION
            logger.info("🔥 SUBPROCESS: PRE-EXECUTION VALIDATION...")
            print("🔥 SUBPROCESS STDOUT: PRE-EXECUTION VALIDATION...")

            if not hasattr(orchestrator, 'execute_compiled_plate'):
                error_msg = "🔥 CRITICAL: orchestrator missing execute_compiled_plate method!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if pipeline_definition is None:
                error_msg = "🔥 CRITICAL: pipeline_definition is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if compiled_contexts is None:
                error_msg = "🔥 CRITICAL: compiled_contexts is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            logger.info(f"🔥 SUBPROCESS: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")
            print(f"🔥 SUBPROCESS STDOUT: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")

            # NUCLEAR EXECUTION WRAPPER: Force any error to surface
            death_marker("BEFORE_EXECUTION_CALL", f"pipeline_steps={len(pipeline_definition)}, contexts={len(compiled_contexts)}")
            logger.info("🔥 SUBPROCESS: CALLING NUCLEAR EXECUTION WRAPPER...")
            print("🔥 SUBPROCESS STDOUT: CALLING NUCLEAR EXECUTION WRAPPER...")

            death_marker("ENTERING_FORCE_ERROR_DETECTION")
            results = force_error_detection(
                "execute_compiled_plate", orchestrator.execute_compiled_plate,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                max_workers=max_workers,     # num_workers from the effective (or global) config
                visualizer=None,             # Let the orchestrator auto-create visualizers based on compiled contexts
                log_file_base=log_file_base  # Pass the log base for worker process logging
            )
            death_marker("AFTER_FORCE_ERROR_DETECTION", f"results_type={type(results)}")

            logger.info("🔥 SUBPROCESS: NUCLEAR EXECUTION WRAPPER RETURNED!")
            print("🔥 SUBPROCESS STDOUT: NUCLEAR EXECUTION WRAPPER RETURNED!")
            death_marker("EXECUTION_WRAPPER_RETURNED")

            log_thread_count("after execute_compiled_plate")

            logger.info("🔥 SUBPROCESS: execute_compiled_plate RETURNED successfully!")
            logger.info(f"🔥 SUBPROCESS: Results: {type(results)}, length: {len(results) if results else 'None'}")

            # FORCE ERROR DETECTION: Check for None results immediately
            if results is None:
                error_msg = "🔥 CRITICAL: execute_compiled_plate returned None - this should never happen!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

        except Exception as execution_error:
            # FORCE ERROR PROPAGATION: Re-raise with enhanced context
            error_msg = f"🔥 EXECUTION ERROR in execute_compiled_plate: {execution_error}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT EXECUTION ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT EXECUTION TRACEBACK: {traceback.format_exc()}")
            raise RuntimeError(error_msg) from execution_error
        finally:
            monitoring_active.clear()  # Stop monitoring

        logger.info("🔥 SUBPROCESS: Execution completed!")

        # AGGRESSIVE RESULT VALIDATION: Force errors to surface
        logger.info("🔥 SUBPROCESS: Starting aggressive result validation...")

        # Check 1: Results exist
        if not results:
            error_msg = "🔥 EXECUTION FAILED: No results returned from execute_compiled_plate!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 2: Results is a dictionary
        if not isinstance(results, dict):
            error_msg = f"🔥 EXECUTION FAILED: Results is not a dict, got {type(results)}: {results}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 3: Expected number of results
        if len(results) != len(wells):
            error_msg = f"🔥 EXECUTION FAILED: Expected {len(wells)} results, got {len(results)}. Wells: {wells}, Result keys: {list(results.keys())}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 4: All wells have results
        missing_wells = set(wells) - set(results.keys())
        if missing_wells:
            error_msg = f"🔥 EXECUTION FAILED: Missing results for wells: {missing_wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 5: All results have the proper structure; collect per-well errors
        failed_wells = []
        for well_id, result in results.items():
            logger.info(f"🔥 SUBPROCESS: Validating result for well {well_id}: {result}")

            if not isinstance(result, dict):
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} is not a dict: {type(result)} = {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if 'status' not in result:
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} missing 'status' field: {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if result.get('status') != 'success':
                error_msg = result.get('error_message', 'Unknown error')
                details = result.get('details', 'No details')
                full_error = f"🔥 EXECUTION FAILED for well {well_id}: {error_msg} | Details: {details}"
                logger.error(full_error)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {full_error}")
                failed_wells.append((well_id, error_msg, details))

        # Check 6: Raise if any wells failed
        if failed_wells:
            error_summary = f"🔥 EXECUTION FAILED: {len(failed_wells)} wells failed out of {len(wells)}"
            for well_id, error_msg, details in failed_wells:
                error_summary += f"\n  - Well {well_id}: {error_msg}"
            logger.error(error_summary)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_summary}")
            raise RuntimeError(error_summary)

        logger.info(f"🔥 SUBPROCESS: EXECUTION SUCCESS: {len(results)} wells executed successfully")

        # Success is logged to the log file (single source of truth)
        logger.info(f"🔥 SUBPROCESS: COMPLETED plate {plate_path} with {len(results)} results")
        logger.info(f"🔥 SUBPROCESS: Plate {plate_path} completed successfully")

    except Exception as e:
        error_msg = f"Execution failed for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT TRACEBACK: {traceback.format_exc()}")
        # Error logged to the log file (single source of truth); main() continues with the next plate
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        error_msg = f"CRITICAL failure for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to the log file (single source of truth)


def main():
    """Main entry point for the subprocess runner."""
    if len(sys.argv) < 3 or len(sys.argv) > 4:
        print("Usage: python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]")
        sys.exit(1)

    data_file = sys.argv[1]
    log_file_base = sys.argv[2]
    unique_id = sys.argv[3] if len(sys.argv) == 4 else None

    # Build the log file name from the provided base and unique ID
    if unique_id:
        log_file = f"{log_file_base}_{unique_id}.log"
    else:
        log_file = f"{log_file_base}.log"

    # PROCESS GROUP CLEANUP: Create a new process group to manage all child processes
    try:
        # Create a new process group with this process as leader
        os.setpgrp()
        process_group_id = os.getpgrp()

        print(f"🔥 SUBPROCESS: Created process group {process_group_id}")

        # Track all child processes for cleanup
        # (NOTE: currently unused; cleanup kills the whole process group instead)
        child_processes = set()

        def kill_all_children():
            """Kill all child processes by terminating the entire process group."""
            try:
                print(f"🔥 SUBPROCESS: Killing process group {process_group_id}")
                # killpg signals every process in the group
                os.killpg(process_group_id, signal.SIGTERM)

                # Give processes time to exit gracefully
                import time
                time.sleep(2)

                # Force kill if still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # Already dead

                print(f"🔥 SUBPROCESS: Process group {process_group_id} terminated")
            except Exception as e:
                print(f"🔥 SUBPROCESS: Error killing process group: {e}")

        # Register the cleanup function
        atexit.register(kill_all_children)

    except Exception as e:
        print(f"🔥 SUBPROCESS: Warning - Could not set up process group cleanup: {e}")

    # Set up logging first
    logger = setup_subprocess_logging(log_file)
    logger.info("🔥 SUBPROCESS: Starting OpenHCS subprocess runner")
    logger.info(f"🔥 SUBPROCESS: Args - data: {data_file}, log: {log_file}")

    # DEATH DETECTION: Set up heartbeat monitoring
    import threading
    import time

    def heartbeat_monitor():
        """Monitor that writes heartbeats to detect where the process dies."""
        heartbeat_count = 0
        while True:
            try:
                heartbeat_count += 1
                heartbeat_msg = f"🔥 SUBPROCESS HEARTBEAT #{heartbeat_count}: Process alive at {time.time()}"
                logger.info(heartbeat_msg)
                print(heartbeat_msg)

                # Heartbeats go to the log file (single source of truth);
                # no separate heartbeat file is needed
                time.sleep(2)  # Heartbeat every 2 seconds
            except Exception as monitor_error:
                logger.error(f"🔥 SUBPROCESS: Heartbeat monitor error: {monitor_error}")
                print(f"🔥 SUBPROCESS STDOUT: Heartbeat monitor error: {monitor_error}")
                break

    # Start the heartbeat monitor in a daemon thread
    heartbeat_thread = threading.Thread(target=heartbeat_monitor, daemon=True)
    heartbeat_thread.start()
    logger.info("🔥 SUBPROCESS: Heartbeat monitor started")
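    # Illustrative note (an assumption about your tooling, not part of the
    # runner): with a heartbeat every 2 seconds, the last HEARTBEAT entry in
    # the log brackets a silent death to within ~2s. Something like:
    #
    #   tail -f <log_file> | grep 'SUBPROCESS HEARTBEAT'
    #
    # makes it easy to watch liveness from another terminal.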

    # NUCLEAR CRASH DETECTION - catch EVERYTHING
    def crash_handler(signum, frame):
        crash_msg = f"🔥 SUBPROCESS: CRASH DETECTED - Signal {signum} received!"
        logger.error(crash_msg)
        print(f"🔥 SUBPROCESS STDOUT CRASH: {crash_msg}")

        # Crash info is logged to the log file (single source of truth)

        # Dump stack traces for all threads
        try:
            logger.error("🔥 SUBPROCESS: CRASH - Dumping all thread stacks...")
            for thread_id, thread_frame in sys._current_frames().items():
                logger.error(f"🔥 SUBPROCESS: CRASH Thread {thread_id}:")
                traceback.print_stack(thread_frame)
        except Exception:
            pass

        # Force exit
        os._exit(1)

    # Set up signal handlers for all possible crashes
    signal.signal(signal.SIGSEGV, crash_handler)  # Segmentation fault
    signal.signal(signal.SIGABRT, crash_handler)  # Abort
    signal.signal(signal.SIGFPE, crash_handler)   # Floating point exception
    signal.signal(signal.SIGILL, crash_handler)   # Illegal instruction
    signal.signal(signal.SIGTERM, crash_handler)  # Termination
    signal.signal(signal.SIGINT, crash_handler)   # Interrupt (Ctrl+C)

    # Set up an atexit handler to catch silent deaths
    def exit_handler():
        logger.error("🔥 SUBPROCESS: ATEXIT - Process is exiting!")
        print("🔥 SUBPROCESS STDOUT: ATEXIT - Process is exiting!")
        # Exit info is logged to the log file (single source of truth)

    atexit.register(exit_handler)

    # Set up a debug signal handler
    def debug_handler(signum, frame):
        logger.error("🔥 SUBPROCESS: SIGUSR1 received - dumping stack trace")

        # Dump all thread stacks (thread_frame avoids shadowing the handler's frame argument)
        for thread_id, thread_frame in sys._current_frames().items():
            logger.error(f"🔥 SUBPROCESS: Thread {thread_id} stack:")
            traceback.print_stack(thread_frame)

        # Log thread info
        for thread in threading.enumerate():
            logger.error(f"🔥 SUBPROCESS: Thread: {thread.name}, alive: {thread.is_alive()}")

    signal.signal(signal.SIGUSR1, debug_handler)
    logger.info("🔥 SUBPROCESS: NUCLEAR CRASH DETECTION ENABLED - All signals monitored")
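    # Illustrative usage (an assumption about your shell, not part of the
    # runner): a hung run can be inspected live by sending SIGUSR1 from
    # another terminal, which triggers debug_handler's stack dump without
    # killing the process:
    #
    #   kill -USR1 <subprocess_pid>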

    try:
        # Load the pickled data
        logger.info(f"🔥 SUBPROCESS: Loading data from {data_file}")
        with open(data_file, 'rb') as f:
            data = pickle.load(f)

        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']  # Dict[plate_path, {'pipeline_definition', 'compiled_contexts'}]
        global_config = data['global_config']
        effective_configs = data.get('effective_configs', {})  # Per-plate effective configs

        logger.info(f"🔥 SUBPROCESS: Loaded data for {len(plate_paths)} plates")
        logger.info(f"🔥 SUBPROCESS: Plates: {plate_paths}")

        # Process each plate (like test_main.py but for multiple plates)
        for plate_path in plate_paths:
            plate_data = pipeline_data[plate_path]
            pipeline_definition = plate_data['pipeline_definition']
            compiled_contexts = plate_data['compiled_contexts']
            effective_config = effective_configs.get(plate_path)  # Effective config for this plate, if any
            logger.info(f"🔥 SUBPROCESS: Processing plate {plate_path} with {len(pipeline_definition)} steps")

            run_single_plate(
                plate_path=plate_path,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                global_config=global_config,
                logger=logger,
                log_file_base=log_file_base,
                effective_config=effective_config
            )

        logger.info("🔥 SUBPROCESS: All plates completed successfully")

    except Exception as e:
        logger.error(f"🔥 SUBPROCESS: Fatal error: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT FATAL: {e}")
        print(f"🔥 SUBPROCESS STDOUT FATAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to the log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Fatal error for all plates: {e}")
        sys.exit(1)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        logger.error(f"🔥 SUBPROCESS: CRITICAL SYSTEM ERROR: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM: {e}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM TRACEBACK: {traceback.format_exc()}")
        # Critical error logged to the log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Critical system error for all plates: {e}")
        sys.exit(2)


if __name__ == "__main__":
    main()