Coverage for openhcs/textual_tui/subprocess_runner.py: 0.0%

368 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

#!/usr/bin/env python3
"""
OpenHCS Subprocess Runner

DEPRECATED: This subprocess runner is deprecated in favor of the ZMQ execution pattern.
For new code, use ZMQExecutionClient from openhcs.runtime.zmq_execution_client.

The ZMQ execution pattern provides:
- Bidirectional communication (vs fire-and-forget)
- Real-time progress streaming
- Graceful cancellation
- Process reuse
- Location-transparent execution (local or remote)

To use ZMQ execution, set the environment variable:
    export OPENHCS_USE_ZMQ_EXECUTION=true

This file is maintained for backward compatibility only.

Standalone script that runs OpenHCS plate processing in a clean subprocess environment.
It mimics the integration test pattern from test_main.py but runs independently.

Usage:
    python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]
"""

import sys
import dill as pickle
import logging
import traceback
import atexit
import os
# signal is used by the crash handlers in main(); importing it at module level
# keeps it defined even if the process-group setup block there fails.
import signal
from typing import Dict, List

logger = logging.getLogger(__name__)

# Enable subprocess mode - this single variable controls all subprocess behavior
os.environ['OPENHCS_SUBPROCESS_MODE'] = '1'

# Prevent GPU library imports in the subprocess runner - workers will handle GPU initialization
os.environ['OPENHCS_SUBPROCESS_NO_GPU'] = '1'

# The subprocess runner does not need the function registry at all;
# workers initialize their own function registries when needed.
logger.info("Subprocess runner: No function registry needed - workers will handle initialization")


def setup_subprocess_logging(log_file_path: str):
    """Set up dedicated logging for the subprocess - all logs go to the specified file."""

    # Configure the root logger to capture ALL logs from the subprocess and OpenHCS modules
    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # Clear any existing handlers

    # Create a file handler for subprocess logs
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # Ensure all OpenHCS module logs are captured
    logging.getLogger("openhcs").setLevel(logging.INFO)

    # Prevent console output - everything goes to the file. Monkey-patching
    # basicConfig stops later imports from installing a console handler.
    logging.basicConfig = lambda *args, **kwargs: None

    # Get the subprocess logger
    logger = logging.getLogger("openhcs.subprocess")
    logger.info("SUBPROCESS: Logging configured")

    return logger
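# With the formatter configured above, a record from this module renders as,
# for example (timestamp illustrative):
#   2025-11-04 02:09:00,123 - openhcs.subprocess - INFO - SUBPROCESS: Logging configured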

# Status and result files removed - the log file is the single source of truth

def run_single_plate(plate_path: str, pipeline_definition: List, compiled_contexts: Dict,
                     global_config, logger, log_file_base: str = None, effective_config=None):
    """
    Run a single plate using pre-compiled contexts from the UI.

    This follows the pattern:
    1. Validate the global config (GPU registry is initialized by the workers)
    2. Create the orchestrator and initialize it
    3. Execute the pre-compiled plate (no compilation needed)
    """
    import psutil
    import os

    def log_thread_count(step_name):
        # Despite the name, this only samples the thread count and returns it;
        # callers ignore the return value, so it serves as a lightweight probe point.
        thread_count = psutil.Process(os.getpid()).num_threads()
        return thread_count

    # NUCLEAR ERROR DETECTION: wrap everything in try/except
    def force_error_detection(func_name, func, *args, **kwargs):
        """Wrapper that forces any error to be visible and logged."""
        try:
            result = func(*args, **kwargs)
            return result
        except Exception as e:
            error_msg = f"Error in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            raise RuntimeError(f"Failed: {func_name} failed: {e}") from e
        except BaseException as e:
            error_msg = f"Critical error in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            raise RuntimeError(f"Critical error: {func_name} failed: {e}") from e

    # DEATH DETECTION: progress markers to find where the process dies
    def death_marker(location, details=""):
        """Mark progress to detect where the process dies (log file only)."""
        pass  # Intentionally a no-op; kept as a debugging hook

    try:
        death_marker("FUNCTION_START", f"plate_path={plate_path}")
        log_thread_count("function start")

        logger.info(f"Starting plate {plate_path}")

        log_thread_count("after status write")

        # Step 1: Validate the global config (GPU registry will be initialized by workers)
        death_marker("STEP1_START", "Global config validation")
        logger.info("SUBPROCESS: Validating global config (GPU registry initialization deferred to workers)")

        death_marker("BEFORE_CONFIG_IMPORT")

        # NUCLEAR WRAP: config import
        def import_config():
            from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
            from openhcs.constants import Microscope
            return GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope

        GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope = force_error_detection("import_config", import_config)
        death_marker("AFTER_CONFIG_IMPORT")

        log_thread_count("after config import")

        # The global config is already a proper object from pickle - no reconstruction needed!
        log_thread_count("using pickled global config")
        log_thread_count("after global config validation")

        logger.info("Global config validated - GPU registry and CUDA streams will be initialized by workers")

        # Step 2: Create the orchestrator and initialize it (like test_main.py)
        log_thread_count("before orchestrator import")

        # NUCLEAR WRAP: orchestrator import
        def import_orchestrator():
            from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
            return PipelineOrchestrator

        PipelineOrchestrator = force_error_detection("import_orchestrator", import_orchestrator)

        log_thread_count("after orchestrator import")

        # NUCLEAR WRAP: storage registry import (lazy initialization for subprocess mode)
        def import_storage_registry():
            from openhcs.io.base import storage_registry, ensure_storage_registry
            ensure_storage_registry()  # Ensure the registry is created in subprocess mode
            return storage_registry

        storage_registry = force_error_detection("import_storage_registry", import_storage_registry)

        log_thread_count("after storage registry import")

        # Skip the function registry import in the subprocess runner - workers initialize their own
        logger.info("SUBPROCESS: Function registry initialization deferred to workers")

        log_thread_count("before orchestrator creation")

        # NUCLEAR WRAP: set up the global config context (required before orchestrator creation)
        def setup_global_context():
            from openhcs.config_framework.lazy_factory import ensure_global_config_context
            from openhcs.core.config import GlobalPipelineConfig
            ensure_global_config_context(GlobalPipelineConfig, global_config)

        force_error_detection("setup_global_context", setup_global_context)

        # NUCLEAR WRAP: orchestrator creation
        orchestrator = force_error_detection(
            "PipelineOrchestrator_creation", PipelineOrchestrator,
            plate_path=plate_path,
            storage_registry=storage_registry  # Use the default registry
        )
        log_thread_count("after orchestrator creation")

        # NUCLEAR WRAP: orchestrator initialization
        force_error_detection("orchestrator_initialize", orchestrator.initialize)
        log_thread_count("after orchestrator initialization")

        # Step 3: Use the wells from the pre-compiled contexts (no rediscovery).
        # The UI already compiled contexts for the specific wells in this plate.
        wells = list(compiled_contexts.keys())

        # AGGRESSIVE VALIDATION: check the wells from the compiled contexts
        if not wells:
            error_msg = f"No wells found in compiled contexts for plate {plate_path}!"
            logger.error(error_msg)
            raise RuntimeError(error_msg)
        if not isinstance(wells, list):
            error_msg = f"Wells is not a list: {type(wells)} = {wells}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        # Step 4: Execution phase with multiprocessing (like test_main.py, but with processes).
        # Use the effective config passed from the UI (includes pipeline config) instead of the global config.
        config_to_use = effective_config if effective_config is not None else global_config
        max_workers = config_to_use.num_workers

        # Custom progress callback to see exactly where execution hangs
        # (defined for debugging; not currently passed to execute_compiled_plate)
        def progress_callback(well_id, step_name, status):
            logger.info(f"🔥 SUBPROCESS: PROGRESS - Well {well_id}, Step '{step_name}', Status: {status}")

        # Add monitoring without a timeout
        import threading

        # Start the monitoring thread
        monitoring_active = threading.Event()
        monitoring_active.set()

        def monitor_execution():
            count = 0
            while monitoring_active.is_set():
                count += 1
                logger.info(f"🔥 SUBPROCESS: MONITOR #{count} - Still executing...")

                # Skip GPU memory monitoring in the subprocess runner
                logger.info("🔥 SUBPROCESS: GPU memory monitoring deferred to workers")

                # Check whether we can get thread info
                try:
                    active_threads = threading.active_count()
                    logger.info(f"🔥 SUBPROCESS: Active threads: {active_threads}")
                except Exception:
                    pass

                # Log progress every 30 seconds (6 iterations x 5 s) with system info
                if count % 6 == 0:
                    logger.info(f"🔥 SUBPROCESS: PROGRESS - Been running for {count*5} seconds, still executing...")
                    print(f"🔥 SUBPROCESS STDOUT: PROGRESS - {count*5} seconds elapsed")

                    # System monitoring to catch resource issues
                    try:
                        memory = psutil.virtual_memory()
                        swap = psutil.swap_memory()
                        process = psutil.Process(os.getpid())

                        logger.info(f"🔥 SUBPROCESS: SYSTEM - RAM: {memory.percent:.1f}% used, {memory.available/1024**3:.1f}GB free")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Swap: {swap.percent:.1f}% used")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process RAM: {process.memory_info().rss/1024**3:.1f}GB")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process threads: {process.num_threads()}")

                        print(f"🔥 SUBPROCESS STDOUT: RAM {memory.percent:.1f}%, Process {process.memory_info().rss/1024**3:.1f}GB, Threads {process.num_threads()}")

                        # Check for memory pressure
                        if memory.percent > 90:
                            logger.error(f"🔥 SUBPROCESS: WARNING - High memory usage: {memory.percent:.1f}%")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH MEMORY WARNING: {memory.percent:.1f}%")

                        if process.memory_info().rss > 16 * 1024**3:  # 16 GB
                            logger.error(f"🔥 SUBPROCESS: WARNING - Process using {process.memory_info().rss/1024**3:.1f}GB")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH PROCESS MEMORY: {process.memory_info().rss/1024**3:.1f}GB")

                    except Exception as e:
                        logger.debug(f"Could not get system info: {e}")

                threading.Event().wait(5)  # Wait 5 seconds between checks

        monitor_thread = threading.Thread(target=monitor_execution, daemon=True)
        monitor_thread.start()

        try:
            logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")
            logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_definition)} steps")
            logger.info(f"🔥 SUBPROCESS: Compiled contexts for {len(compiled_contexts)} wells")
            logger.info("🔥 SUBPROCESS: Calling execute_compiled_plate NOW...")

            log_thread_count("before execute_compiled_plate")

            # PRE-EXECUTION STATE VALIDATION
            logger.info("🔥 SUBPROCESS: PRE-EXECUTION VALIDATION...")
            print("🔥 SUBPROCESS STDOUT: PRE-EXECUTION VALIDATION...")

            if not hasattr(orchestrator, 'execute_compiled_plate'):
                error_msg = "🔥 CRITICAL: orchestrator missing execute_compiled_plate method!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if pipeline_definition is None:
                error_msg = "🔥 CRITICAL: pipeline_definition is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if compiled_contexts is None:
                error_msg = "🔥 CRITICAL: compiled_contexts is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            logger.info(f"🔥 SUBPROCESS: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")
            print(f"🔥 SUBPROCESS STDOUT: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")

            # NUCLEAR EXECUTION WRAPPER: force any error to surface
            death_marker("BEFORE_EXECUTION_CALL", f"pipeline_steps={len(pipeline_definition)}, contexts={len(compiled_contexts)}")
            logger.info("🔥 SUBPROCESS: CALLING NUCLEAR EXECUTION WRAPPER...")
            print("🔥 SUBPROCESS STDOUT: CALLING NUCLEAR EXECUTION WRAPPER...")

            death_marker("ENTERING_FORCE_ERROR_DETECTION")
            results = force_error_detection(
                "execute_compiled_plate", orchestrator.execute_compiled_plate,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                max_workers=max_workers,     # Use the global config num_workers setting
                visualizer=None,             # Let the orchestrator auto-create visualizers from compiled contexts
                log_file_base=log_file_base  # Pass the log base for worker process logging
            )
            death_marker("AFTER_FORCE_ERROR_DETECTION", f"results_type={type(results)}")

            logger.info("🔥 SUBPROCESS: NUCLEAR EXECUTION WRAPPER RETURNED!")
            print("🔥 SUBPROCESS STDOUT: NUCLEAR EXECUTION WRAPPER RETURNED!")
            death_marker("EXECUTION_WRAPPER_RETURNED")

            log_thread_count("after execute_compiled_plate")

            logger.info("🔥 SUBPROCESS: execute_compiled_plate RETURNED successfully!")
            logger.info(f"🔥 SUBPROCESS: Results: {type(results)}, length: {len(results) if results else 'None'}")

            # FORCE ERROR DETECTION: check for None results immediately
            if results is None:
                error_msg = "🔥 CRITICAL: execute_compiled_plate returned None - this should never happen!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

        except Exception as execution_error:
            # FORCE ERROR PROPAGATION: re-raise with enhanced context
            error_msg = f"🔥 EXECUTION ERROR in execute_compiled_plate: {execution_error}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT EXECUTION ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT EXECUTION TRACEBACK: {traceback.format_exc()}")
            raise RuntimeError(error_msg) from execution_error
        finally:
            monitoring_active.clear()  # Stop the monitoring thread

        logger.info("🔥 SUBPROCESS: Execution completed!")

        # AGGRESSIVE RESULT VALIDATION: force errors to surface
        logger.info("🔥 SUBPROCESS: Starting aggressive result validation...")

        # Check 1: results exist
        if not results:
            error_msg = "🔥 EXECUTION FAILED: No results returned from execute_compiled_plate!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 2: results is a dictionary
        if not isinstance(results, dict):
            error_msg = f"🔥 EXECUTION FAILED: Results is not a dict, got {type(results)}: {results}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 3: expected number of results
        if len(results) != len(wells):
            error_msg = f"🔥 EXECUTION FAILED: Expected {len(wells)} results, got {len(results)}. Wells: {wells}, Result keys: {list(results.keys())}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 4: all wells have results
        missing_wells = set(wells) - set(results.keys())
        if missing_wells:
            error_msg = f"🔥 EXECUTION FAILED: Missing results for wells: {missing_wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 5: all results have the proper structure; collect per-well errors
        failed_wells = []
        for well_id, result in results.items():
            logger.info(f"🔥 SUBPROCESS: Validating result for well {well_id}: {result}")

            if not isinstance(result, dict):
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} is not a dict: {type(result)} = {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if 'status' not in result:
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} missing 'status' field: {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if result.get('status') != 'success':
                error_msg = result.get('error_message', 'Unknown error')
                details = result.get('details', 'No details')
                full_error = f"🔥 EXECUTION FAILED for well {well_id}: {error_msg} | Details: {details}"
                logger.error(full_error)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {full_error}")
                failed_wells.append((well_id, error_msg, details))

        # Check 6: raise if any wells failed
        if failed_wells:
            error_summary = f"🔥 EXECUTION FAILED: {len(failed_wells)} wells failed out of {len(wells)}"
            for well_id, error_msg, details in failed_wells:
                error_summary += f"\n - Well {well_id}: {error_msg}"
            logger.error(error_summary)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_summary}")
            raise RuntimeError(error_summary)

        logger.info(f"🔥 SUBPROCESS: EXECUTION SUCCESS: {len(results)} wells executed successfully")

        # Success is recorded in the log file (single source of truth)
        logger.info(f"🔥 SUBPROCESS: COMPLETED plate {plate_path} with {len(results)} results")

        logger.info(f"🔥 SUBPROCESS: Plate {plate_path} completed successfully")

    except Exception as e:
        error_msg = f"Execution failed for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT TRACEBACK: {traceback.format_exc()}")
        # Error is recorded in the log file (single source of truth)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        error_msg = f"CRITICAL failure for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL TRACEBACK: {traceback.format_exc()}")
        # Error is recorded in the log file (single source of truth)
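# For reference: the shape of a per-well result implied by the validation above
# (key names reconstructed from the checks; values are illustrative):
#
#     {
#         "status": "success",      # anything else marks the well as failed
#         "error_message": "...",   # read when status != "success"
#         "details": "...",         # read when status != "success"
#     }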


def main():
    """Main entry point for the subprocess runner."""
    if len(sys.argv) < 3 or len(sys.argv) > 4:
        print("Usage: python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]")
        sys.exit(1)

    data_file = sys.argv[1]
    log_file_base = sys.argv[2]
    unique_id = sys.argv[3] if len(sys.argv) == 4 else None

    # Build the log file name from the provided base and unique ID
    if unique_id:
        log_file = f"{log_file_base}_{unique_id}.log"
    else:
        log_file = f"{log_file_base}.log"
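    # For example (paths illustrative): base "/tmp/openhcs_run" with unique ID
    # "a1b2" yields "/tmp/openhcs_run_a1b2.log"; with no ID, "/tmp/openhcs_run.log".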


    # PROCESS GROUP CLEANUP: create a new process group to manage all child processes
    try:
        # Create a new process group with this process as its leader
        os.setpgrp()
        process_group_id = os.getpgrp()

        print(f"🔥 SUBPROCESS: Created process group {process_group_id}")

        # Track all child processes for cleanup (currently unused; os.killpg
        # below terminates the whole group without tracking individual PIDs)
        child_processes = set()

        def kill_all_children():
            """Kill all child processes by terminating the entire process group."""
            try:
                print(f"🔥 SUBPROCESS: Killing process group {process_group_id}")
                # os.killpg signals every process in the group
                os.killpg(process_group_id, signal.SIGTERM)

                # Give processes time to exit gracefully
                import time
                time.sleep(2)

                # Force-kill anything still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # Already dead

                print(f"🔥 SUBPROCESS: Process group {process_group_id} terminated")
            except Exception as e:
                print(f"🔥 SUBPROCESS: Error killing process group: {e}")

        # Register the cleanup function
        atexit.register(kill_all_children)

    except Exception as e:
        print(f"🔥 SUBPROCESS: Warning - Could not set up process group cleanup: {e}")


    # Set up logging first
    logger = setup_subprocess_logging(log_file)
    logger.info("🔥 SUBPROCESS: Starting OpenHCS subprocess runner")
    logger.info(f"🔥 SUBPROCESS: Args - data: {data_file}, log: {log_file}")


    # DEATH DETECTION: set up heartbeat monitoring
    import threading
    import time

    def heartbeat_monitor():
        """Write heartbeats to the log to detect where the process dies."""
        heartbeat_count = 0
        while True:
            try:
                heartbeat_count += 1
                heartbeat_msg = f"🔥 SUBPROCESS HEARTBEAT #{heartbeat_count}: Process alive at {time.time()}"
                logger.info(heartbeat_msg)
                print(heartbeat_msg)

                # Heartbeats go to the log file (single source of truth);
                # no separate heartbeat file is needed.

                time.sleep(2)  # Heartbeat every 2 seconds
            except Exception as monitor_error:
                logger.error(f"🔥 SUBPROCESS: Heartbeat monitor error: {monitor_error}")
                print(f"🔥 SUBPROCESS STDOUT: Heartbeat monitor error: {monitor_error}")
                break

    # Start the heartbeat monitor in a daemon thread
    heartbeat_thread = threading.Thread(target=heartbeat_monitor, daemon=True)
    heartbeat_thread.start()
    logger.info("🔥 SUBPROCESS: Heartbeat monitor started")
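    # A dead run can be localized from the log: the last heartbeat line marks the
    # final point the process was known to be alive, e.g. (log name illustrative):
    #   grep "SUBPROCESS HEARTBEAT" subprocess.log | tail -1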


    # NUCLEAR CRASH DETECTION - catch everything we can
    def crash_handler(signum, frame):
        crash_msg = f"🔥 SUBPROCESS: CRASH DETECTED - Signal {signum} received!"
        logger.error(crash_msg)
        print(f"🔥 SUBPROCESS STDOUT CRASH: {crash_msg}")

        # Crash info is recorded in the log file (single source of truth)

        # Dump stack traces for all threads; use a distinct loop variable so the
        # handler's own `frame` argument is not shadowed
        try:
            logger.error("🔥 SUBPROCESS: CRASH - Dumping all thread stacks...")
            for thread_id, thread_frame in sys._current_frames().items():
                logger.error(f"🔥 SUBPROCESS: CRASH Thread {thread_id}:")
                traceback.print_stack(thread_frame)
        except Exception:
            pass

        # Force exit
        os._exit(1)

    # Set up signal handlers for all catchable crash signals
    signal.signal(signal.SIGSEGV, crash_handler)  # Segmentation fault
    signal.signal(signal.SIGABRT, crash_handler)  # Abort
    signal.signal(signal.SIGFPE, crash_handler)   # Floating point exception
    signal.signal(signal.SIGILL, crash_handler)   # Illegal instruction
    signal.signal(signal.SIGTERM, crash_handler)  # Termination
    signal.signal(signal.SIGINT, crash_handler)   # Interrupt (Ctrl+C)

    # Set up an atexit handler to catch silent deaths
    def exit_handler():
        logger.error("🔥 SUBPROCESS: ATEXIT - Process is exiting!")
        print("🔥 SUBPROCESS STDOUT: ATEXIT - Process is exiting!")
        # Exit info is recorded in the log file (single source of truth)

    atexit.register(exit_handler)

    # Set up a debug signal handler (SIGUSR1 dumps stacks without exiting)
    def debug_handler(signum, frame):
        logger.error("🔥 SUBPROCESS: SIGUSR1 received - dumping stack trace")

        # Dump all thread stacks
        for thread_id, thread_frame in sys._current_frames().items():
            logger.error(f"🔥 SUBPROCESS: Thread {thread_id} stack:")
            traceback.print_stack(thread_frame)

        # Log thread info
        for thread in threading.enumerate():
            logger.error(f"🔥 SUBPROCESS: Thread: {thread.name}, alive: {thread.is_alive()}")

    signal.signal(signal.SIGUSR1, debug_handler)
    logger.info("🔥 SUBPROCESS: NUCLEAR CRASH DETECTION ENABLED - All signals monitored")
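    # The debug handler can be triggered from another shell without stopping the
    # run (PID illustrative):
    #   kill -USR1 <subprocess_pid>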


    try:
        # Load the pickled data
        logger.info(f"🔥 SUBPROCESS: Loading data from {data_file}")
        with open(data_file, 'rb') as f:
            data = pickle.load(f)

        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']  # Dict[plate_path, {'pipeline_definition': ..., 'compiled_contexts': ...}]
        global_config = data['global_config']
        effective_configs = data.get('effective_configs', {})  # Per-plate effective configs
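
        # For reference: the payload this runner expects, reconstructed from the
        # reads above (key names are real; values are illustrative):
        #
        #     {
        #         "plate_paths": ["/data/plateA", ...],
        #         "pipeline_data": {
        #             "/data/plateA": {
        #                 "pipeline_definition": [...],   # list of pipeline steps
        #                 "compiled_contexts": {...},     # well_id -> compiled context
        #             },
        #         },
        #         "global_config": <GlobalPipelineConfig instance>,
        #         "effective_configs": {"/data/plateA": <config>},  # optional
        #     }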

        logger.info(f"🔥 SUBPROCESS: Loaded data for {len(plate_paths)} plates")
        logger.info(f"🔥 SUBPROCESS: Plates: {plate_paths}")

        # Process each plate (like test_main.py, but for multiple plates)
        for plate_path in plate_paths:
            plate_data = pipeline_data[plate_path]
            pipeline_definition = plate_data['pipeline_definition']
            compiled_contexts = plate_data['compiled_contexts']
            effective_config = effective_configs.get(plate_path)  # Effective config for this plate
            logger.info(f"🔥 SUBPROCESS: Processing plate {plate_path} with {len(pipeline_definition)} steps")

            run_single_plate(
                plate_path=plate_path,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                global_config=global_config,
                logger=logger,
                log_file_base=log_file_base,
                effective_config=effective_config
            )

        logger.info("🔥 SUBPROCESS: All plates completed successfully")

    except Exception as e:
        logger.error(f"🔥 SUBPROCESS: Fatal error: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT FATAL: {e}")
        print(f"🔥 SUBPROCESS STDOUT FATAL TRACEBACK: {traceback.format_exc()}")
        # Error is recorded in the log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Fatal error for all plates: {e}")
        sys.exit(1)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        logger.error(f"🔥 SUBPROCESS: CRITICAL SYSTEM ERROR: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM: {e}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM TRACEBACK: {traceback.format_exc()}")
        # Critical error is recorded in the log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Critical system error for all plates: {e}")
        sys.exit(2)



if __name__ == "__main__":
    main()