Coverage for openhcs/textual_tui/subprocess_runner.py: 0.0% (368 statements)
#!/usr/bin/env python3
"""
OpenHCS Subprocess Runner

DEPRECATED: This subprocess runner is deprecated in favor of the ZMQ execution pattern.
For new code, use ZMQExecutionClient from openhcs.runtime.zmq_execution_client.

The ZMQ execution pattern provides:
- Bidirectional communication (vs fire-and-forget)
- Real-time progress streaming
- Graceful cancellation
- Process reuse
- Location-transparent execution (local or remote)

To use ZMQ execution, set the environment variable:

    export OPENHCS_USE_ZMQ_EXECUTION=true

This file is maintained for backward compatibility only.

Standalone script that runs OpenHCS plate processing in a clean subprocess environment.
This mimics the integration test pattern from test_main.py but runs independently.

Usage:
    python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]
"""
import sys
import dill as pickle
import logging
import traceback
import atexit
import os
import signal
import threading
import time
from typing import Dict, List, Optional

# signal/threading/time are imported at module scope so the crash and debug
# handlers in main() work even if process-group setup fails early.

logger = logging.getLogger(__name__)
# Enable subprocess mode - this single variable controls all subprocess behavior
os.environ['OPENHCS_SUBPROCESS_MODE'] = '1'

# Prevent GPU library imports in subprocess runner - workers will handle GPU initialization
os.environ['OPENHCS_SUBPROCESS_NO_GPU'] = '1'
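# Downstream OpenHCS modules are expected to key off these flags. An
# illustrative check (the guarded helper is hypothetical, not OpenHCS API):
#
#     if os.environ.get('OPENHCS_SUBPROCESS_NO_GPU') == '1':
#         skip_gpu_library_imports()  # hypothetical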
# Subprocess runner doesn't need function registry at all.
# Workers will initialize their own function registries when needed.
logger.info("Subprocess runner: No function registry needed - workers will handle initialization")
def setup_subprocess_logging(log_file_path: str):
    """Set up dedicated logging for the subprocess - all logs go to the specified file."""
    # Configure root logger to capture ALL logs from subprocess and OpenHCS modules
    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # Clear any existing handlers

    # Create file handler for subprocess logs
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # Ensure all OpenHCS module logs are captured
    logging.getLogger("openhcs").setLevel(logging.INFO)

    # Prevent console output - neutralize basicConfig so later calls cannot
    # attach a console handler; everything goes to the file.
    logging.basicConfig = lambda *args, **kwargs: None

    # Get subprocess logger
    subprocess_logger = logging.getLogger("openhcs.subprocess")
    subprocess_logger.info("SUBPROCESS: Logging configured")

    return subprocess_logger
# Status and result files removed - log file is single source of truth
def run_single_plate(plate_path: str, pipeline_definition: List, compiled_contexts: Dict,
                     global_config, logger, log_file_base: Optional[str] = None, effective_config=None):
    """
    Run a single plate using pre-compiled contexts from the UI.

    This follows the pattern:
    1. Validate global config (GPU registry initialization is deferred to workers)
    2. Create orchestrator and initialize
    3. Execute pre-compiled plate (no compilation needed)
    """
    import psutil

    def log_thread_count(step_name):
        """Log (at debug level) and return the current thread count."""
        thread_count = psutil.Process(os.getpid()).num_threads()
        logger.debug(f"SUBPROCESS: thread count at {step_name}: {thread_count}")
        return thread_count
    # NUCLEAR ERROR DETECTION: Wrap EVERYTHING in try/except
    def force_error_detection(func_name, func, *args, **kwargs):
        """Wrapper that forces any error to be visible and logged."""
        try:
            result = func(*args, **kwargs)
            return result
        except Exception as e:
            error_msg = f"Error in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            raise RuntimeError(f"Failed: {func_name} failed: {e}") from e
        except BaseException as e:
            error_msg = f"Critical error in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            raise RuntimeError(f"Critical error: {func_name} failed: {e}") from e
    # DEATH DETECTION: Progress markers to find where the process dies
    def death_marker(location, details=""):
        """Mark progress to detect where the process dies (log file only)."""
        logger.debug(f"SUBPROCESS: DEATH_MARKER {location} {details}".rstrip())
    try:
        death_marker("FUNCTION_START", f"plate_path={plate_path}")
        log_thread_count("function start")

        logger.info(f"Starting plate {plate_path}")
        log_thread_count("after start logging")

        # Step 1: Validate global config (GPU registry will be initialized by workers)
        death_marker("STEP1_START", "Global config validation")
        logger.info("SUBPROCESS: Validating global config (GPU registry initialization deferred to workers)")

        death_marker("BEFORE_CONFIG_IMPORT")
        # NUCLEAR WRAP: Config import
        def import_config():
            from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
            from openhcs.constants import Microscope
            return GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope

        GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope = force_error_detection("import_config", import_config)
        death_marker("AFTER_CONFIG_IMPORT")

        log_thread_count("after config import")

        # Global config is already a proper object from pickle - no reconstruction needed!
        log_thread_count("after global config validation")

        logger.info("Global config validated - GPU registry and CUDA streams will be initialized by workers")
        # Step 2: Create orchestrator and initialize (like test_main.py)
        log_thread_count("before orchestrator import")

        # NUCLEAR WRAP: Orchestrator import
        def import_orchestrator():
            from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
            return PipelineOrchestrator

        PipelineOrchestrator = force_error_detection("import_orchestrator", import_orchestrator)
        log_thread_count("after orchestrator import")

        # NUCLEAR WRAP: Storage registry import (lazy initialization for subprocess mode)
        def import_storage_registry():
            from openhcs.io.base import storage_registry, ensure_storage_registry
            ensure_storage_registry()  # Ensure registry is created in subprocess mode
            return storage_registry

        storage_registry = force_error_detection("import_storage_registry", import_storage_registry)
        log_thread_count("after storage registry import")

        # Skip function registry import in subprocess runner - workers will initialize their own
        logger.info("SUBPROCESS: Function registry initialization deferred to workers")

        log_thread_count("before orchestrator creation")

        # NUCLEAR WRAP: Set up global config context (required before orchestrator creation)
        def setup_global_context():
            from openhcs.config_framework.lazy_factory import ensure_global_config_context
            # GlobalPipelineConfig is already bound above by import_config()
            ensure_global_config_context(GlobalPipelineConfig, global_config)

        force_error_detection("setup_global_context", setup_global_context)

        # NUCLEAR WRAP: Orchestrator creation
        orchestrator = force_error_detection(
            "PipelineOrchestrator_creation", PipelineOrchestrator,
            plate_path=plate_path,
            storage_registry=storage_registry  # Use default registry
        )
        log_thread_count("after orchestrator creation")

        # NUCLEAR WRAP: Orchestrator initialization
        force_error_detection("orchestrator_initialize", orchestrator.initialize)
        log_thread_count("after orchestrator initialization")
        # Step 3: Use wells from pre-compiled contexts (not rediscovery)
        # The UI already compiled contexts for the specific wells in this plate
        wells = list(compiled_contexts.keys())

        # AGGRESSIVE VALIDATION: Check wells from compiled contexts
        if not wells:
            error_msg = f"No wells found in compiled contexts for plate {plate_path}!"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        if not isinstance(wells, list):
            error_msg = f"Wells is not a list: {type(wells)} = {wells}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)
        # Step 4: Execution phase with multiprocessing (like test_main.py but with processes)
        # Use effective config passed from UI (includes pipeline config) instead of global config
        config_to_use = effective_config if effective_config is not None else global_config
        max_workers = config_to_use.num_workers
        # Custom progress callback to see exactly where execution hangs
        # (NOTE: defined for debugging but not currently passed to execute_compiled_plate)
        def progress_callback(well_id, step_name, status):
            logger.info(f"🔥 SUBPROCESS: PROGRESS - Well {well_id}, Step '{step_name}', Status: {status}")
        # Add monitoring without timeout: a daemon thread that logs liveness
        monitoring_active = threading.Event()
        monitoring_active.set()
        def monitor_execution():
            count = 0
            while monitoring_active.is_set():
                count += 1
                logger.info(f"🔥 SUBPROCESS: MONITOR #{count} - Still executing...")

                # Skip GPU memory monitoring in subprocess runner
                logger.info("🔥 SUBPROCESS: GPU memory monitoring deferred to workers")

                # Check if we can get thread info
                try:
                    active_threads = threading.active_count()
                    logger.info(f"🔥 SUBPROCESS: Active threads: {active_threads}")
                except Exception:
                    pass

                # Log progress every 30 seconds with system info
                if count % 6 == 0:
                    logger.info(f"🔥 SUBPROCESS: PROGRESS - Been running for {count*5} seconds, still executing...")
                    print(f"🔥 SUBPROCESS STDOUT: PROGRESS - {count*5} seconds elapsed")

                    # Add system monitoring to catch resource issues
                    try:
                        # Memory info (psutil is imported at the top of run_single_plate)
                        memory = psutil.virtual_memory()
                        swap = psutil.swap_memory()
                        process = psutil.Process(os.getpid())

                        logger.info(f"🔥 SUBPROCESS: SYSTEM - RAM: {memory.percent:.1f}% used, {memory.available/1024**3:.1f}GB free")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Swap: {swap.percent:.1f}% used")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process RAM: {process.memory_info().rss/1024**3:.1f}GB")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process threads: {process.num_threads()}")

                        print(f"🔥 SUBPROCESS STDOUT: RAM {memory.percent:.1f}%, Process {process.memory_info().rss/1024**3:.1f}GB, Threads {process.num_threads()}")

                        # Check for memory pressure
                        if memory.percent > 90:
                            logger.error(f"🔥 SUBPROCESS: WARNING - High memory usage: {memory.percent:.1f}%")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH MEMORY WARNING: {memory.percent:.1f}%")

                        if process.memory_info().rss > 16 * 1024**3:  # 16GB
                            logger.error(f"🔥 SUBPROCESS: WARNING - Process using {process.memory_info().rss/1024**3:.1f}GB")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH PROCESS MEMORY: {process.memory_info().rss/1024**3:.1f}GB")

                    except Exception as e:
                        logger.debug(f"Could not get system info: {e}")

                time.sleep(5)  # Poll every 5 seconds
        monitor_thread = threading.Thread(target=monitor_execution, daemon=True)
        monitor_thread.start()

        try:
            logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")
            logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_definition)} steps")
            logger.info(f"🔥 SUBPROCESS: Compiled contexts for {len(compiled_contexts)} wells")
            logger.info("🔥 SUBPROCESS: Calling execute_compiled_plate NOW...")

            log_thread_count("before execute_compiled_plate")

            # PRE-EXECUTION STATE VALIDATION
            logger.info("🔥 SUBPROCESS: PRE-EXECUTION VALIDATION...")
            print("🔥 SUBPROCESS STDOUT: PRE-EXECUTION VALIDATION...")

            if not hasattr(orchestrator, 'execute_compiled_plate'):
                error_msg = "🔥 CRITICAL: orchestrator missing execute_compiled_plate method!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if pipeline_definition is None:
                error_msg = "🔥 CRITICAL: pipeline_definition is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if compiled_contexts is None:
                error_msg = "🔥 CRITICAL: compiled_contexts is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            logger.info(f"🔥 SUBPROCESS: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")
            print(f"🔥 SUBPROCESS STDOUT: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")

            # NUCLEAR EXECUTION WRAPPER: Force any error to surface
            death_marker("BEFORE_EXECUTION_CALL", f"pipeline_steps={len(pipeline_definition)}, contexts={len(compiled_contexts)}")
            logger.info("🔥 SUBPROCESS: CALLING NUCLEAR EXECUTION WRAPPER...")
            print("🔥 SUBPROCESS STDOUT: CALLING NUCLEAR EXECUTION WRAPPER...")

            death_marker("ENTERING_FORCE_ERROR_DETECTION")
            results = force_error_detection(
                "execute_compiled_plate", orchestrator.execute_compiled_plate,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                max_workers=max_workers,     # num_workers from the effective (or global) config
                visualizer=None,             # Let orchestrator auto-create visualizers based on compiled contexts
                log_file_base=log_file_base  # Pass log base for worker process logging
            )
            death_marker("AFTER_FORCE_ERROR_DETECTION", f"results_type={type(results)}")

            logger.info("🔥 SUBPROCESS: NUCLEAR EXECUTION WRAPPER RETURNED!")
            print("🔥 SUBPROCESS STDOUT: NUCLEAR EXECUTION WRAPPER RETURNED!")
            death_marker("EXECUTION_WRAPPER_RETURNED")

            log_thread_count("after execute_compiled_plate")

            logger.info("🔥 SUBPROCESS: execute_compiled_plate RETURNED successfully!")
            logger.info(f"🔥 SUBPROCESS: Results: {type(results)}, length: {len(results) if results else 'None'}")

            # FORCE ERROR DETECTION: Check for None results immediately
            if results is None:
                error_msg = "🔥 CRITICAL: execute_compiled_plate returned None - this should never happen!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)
        except Exception as execution_error:
            # FORCE ERROR PROPAGATION: Re-raise with enhanced context
            error_msg = f"🔥 EXECUTION ERROR in execute_compiled_plate: {execution_error}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT EXECUTION ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT EXECUTION TRACEBACK: {traceback.format_exc()}")
            raise RuntimeError(error_msg) from execution_error
        finally:
            monitoring_active.clear()  # Stop monitoring
        logger.info("🔥 SUBPROCESS: Execution completed!")

        # AGGRESSIVE RESULT VALIDATION: Force errors to surface
        logger.info("🔥 SUBPROCESS: Starting aggressive result validation...")

        # Check 1: Results exist
        if not results:
            error_msg = "🔥 EXECUTION FAILED: No results returned from execute_compiled_plate!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 2: Results is a dictionary
        if not isinstance(results, dict):
            error_msg = f"🔥 EXECUTION FAILED: Results is not a dict, got {type(results)}: {results}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 3: Expected number of results
        if len(results) != len(wells):
            error_msg = f"🔥 EXECUTION FAILED: Expected {len(wells)} results, got {len(results)}. Wells: {wells}, Result keys: {list(results.keys())}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 4: All wells have results
        missing_wells = set(wells) - set(results.keys())
        if missing_wells:
            error_msg = f"🔥 EXECUTION FAILED: Missing results for wells: {missing_wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 5: All results have proper structure and check for errors
        failed_wells = []
        for well_id, result in results.items():
            logger.info(f"🔥 SUBPROCESS: Validating result for well {well_id}: {result}")

            if not isinstance(result, dict):
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} is not a dict: {type(result)} = {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if 'status' not in result:
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} missing 'status' field: {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if result.get('status') != 'success':
                error_msg = result.get('error_message', 'Unknown error')
                details = result.get('details', 'No details')
                full_error = f"🔥 EXECUTION FAILED for well {well_id}: {error_msg} | Details: {details}"
                logger.error(full_error)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {full_error}")
                failed_wells.append((well_id, error_msg, details))

        # Check 6: Raise if any wells failed
        if failed_wells:
            error_summary = f"🔥 EXECUTION FAILED: {len(failed_wells)} wells failed out of {len(wells)}"
            for well_id, error_msg, details in failed_wells:
                error_summary += f"\n - Well {well_id}: {error_msg}"
            logger.error(error_summary)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_summary}")
            raise RuntimeError(error_summary)
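        # Illustrative shape of a per-well result, inferred from the checks
        # above (the exact fields are produced by execute_compiled_plate):
        #
        #     {"status": "success"}
        #     {"status": "error", "error_message": "...", "details": "..."}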
        logger.info(f"🔥 SUBPROCESS: EXECUTION SUCCESS: {len(results)} wells executed successfully")

        # Success logged to log file (single source of truth)
        logger.info(f"🔥 SUBPROCESS: COMPLETED plate {plate_path} with {len(results)} results")
    except Exception as e:
        error_msg = f"Execution failed for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth). Deliberately not
        # re-raised so main() can continue with any remaining plates.
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        error_msg = f"CRITICAL failure for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)
def main():
    """Main entry point for subprocess runner."""
    if len(sys.argv) < 3 or len(sys.argv) > 4:
        print("Usage: python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]")
        sys.exit(1)

    data_file = sys.argv[1]
    log_file_base = sys.argv[2]
    unique_id = sys.argv[3] if len(sys.argv) == 4 else None

    # Build log file name from provided base and unique ID
    if unique_id:
        log_file = f"{log_file_base}_{unique_id}.log"
    else:
        log_file = f"{log_file_base}.log"
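    # For example (values are illustrative): log_file_base="/tmp/openhcs_run"
    # with unique_id="a1b2" yields "/tmp/openhcs_run_a1b2.log"; without an id,
    # "/tmp/openhcs_run.log".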
    # PROCESS GROUP CLEANUP: Create new process group to manage all child processes
    try:
        # Create new process group with this process as leader
        os.setpgrp()
        process_group_id = os.getpgrp()

        print(f"🔥 SUBPROCESS: Created process group {process_group_id}")

        # Track all child processes for cleanup
        # (currently unused: os.killpg below already covers the whole group)
        child_processes = set()

        def kill_all_children():
            """Kill all child processes by signalling the entire process group."""
            try:
                print(f"🔥 SUBPROCESS: Killing process group {process_group_id}")
                # os.killpg signals every process in the group
                os.killpg(process_group_id, signal.SIGTERM)

                # Give processes time to exit gracefully
                time.sleep(2)

                # Force kill if still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # Already dead

                print(f"🔥 SUBPROCESS: Process group {process_group_id} terminated")
            except Exception as e:
                print(f"🔥 SUBPROCESS: Error killing process group: {e}")

        # Register cleanup function
        atexit.register(kill_all_children)

    except Exception as e:
        print(f"🔥 SUBPROCESS: Warning - Could not set up process group cleanup: {e}")
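    # Shell equivalent for manual cleanup (illustrative): signal the whole
    # group by negating the pgid, e.g.
    #
    #     kill -TERM -- -<process_group_id>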
    # Set up logging first
    logger = setup_subprocess_logging(log_file)
    logger.info("🔥 SUBPROCESS: Starting OpenHCS subprocess runner")
    logger.info(f"🔥 SUBPROCESS: Args - data: {data_file}, log: {log_file}")

    # DEATH DETECTION: Set up heartbeat monitoring
    def heartbeat_monitor():
        """Monitor that writes heartbeats to detect where the process dies."""
        heartbeat_count = 0
        while True:
            try:
                heartbeat_count += 1
                heartbeat_msg = f"🔥 SUBPROCESS HEARTBEAT #{heartbeat_count}: Process alive at {time.time()}"
                logger.info(heartbeat_msg)
                print(heartbeat_msg)

                # Heartbeat logged to log file (single source of truth);
                # no separate heartbeat file needed
                time.sleep(2)  # Heartbeat every 2 seconds
            except Exception as monitor_error:
                logger.error(f"🔥 SUBPROCESS: Heartbeat monitor error: {monitor_error}")
                print(f"🔥 SUBPROCESS STDOUT: Heartbeat monitor error: {monitor_error}")
                break

    # Start heartbeat monitor in daemon thread
    heartbeat_thread = threading.Thread(target=heartbeat_monitor, daemon=True)
    heartbeat_thread.start()
    logger.info("🔥 SUBPROCESS: Heartbeat monitor started")
    # NUCLEAR CRASH DETECTION - catch EVERYTHING
    def crash_handler(signum, frame):
        crash_msg = f"🔥 SUBPROCESS: CRASH DETECTED - Signal {signum} received!"
        logger.error(crash_msg)
        print(f"🔥 SUBPROCESS STDOUT CRASH: {crash_msg}")

        # Crash info logged to log file (single source of truth)

        # Dump stack trace for every thread
        try:
            logger.error("🔥 SUBPROCESS: CRASH - Dumping all thread stacks...")
            for thread_id, stack_frame in sys._current_frames().items():
                logger.error(f"🔥 SUBPROCESS: CRASH Thread {thread_id}:")
                traceback.print_stack(stack_frame)
        except Exception:
            pass

        # Force exit
        os._exit(1)
    # Set up signal handlers for all possible crashes
    signal.signal(signal.SIGSEGV, crash_handler)  # Segmentation fault
    signal.signal(signal.SIGABRT, crash_handler)  # Abort
    signal.signal(signal.SIGFPE, crash_handler)   # Floating point exception
    signal.signal(signal.SIGILL, crash_handler)   # Illegal instruction
    signal.signal(signal.SIGTERM, crash_handler)  # Termination
    signal.signal(signal.SIGINT, crash_handler)   # Interrupt (Ctrl+C)
    # Set up atexit handler to catch silent deaths
    def exit_handler():
        logger.error("🔥 SUBPROCESS: ATEXIT - Process is exiting!")
        print("🔥 SUBPROCESS STDOUT: ATEXIT - Process is exiting!")
        # Exit info logged to log file (single source of truth)

    atexit.register(exit_handler)
    # Set up debug signal handler
    def debug_handler(signum, frame):
        logger.error("🔥 SUBPROCESS: SIGUSR1 received - dumping stack trace")

        # Dump all thread stacks
        for thread_id, stack_frame in sys._current_frames().items():
            logger.error(f"🔥 SUBPROCESS: Thread {thread_id} stack:")
            traceback.print_stack(stack_frame)

        # Log thread info
        for thread in threading.enumerate():
            logger.error(f"🔥 SUBPROCESS: Thread: {thread.name}, alive: {thread.is_alive()}")

    signal.signal(signal.SIGUSR1, debug_handler)
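    # To trigger a stack dump from a shell (illustrative):
    #
    #     kill -USR1 <subprocess_pid>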
    logger.info("🔥 SUBPROCESS: NUCLEAR CRASH DETECTION ENABLED - All signals monitored")
    try:
        # Load pickled data
        logger.info(f"🔥 SUBPROCESS: Loading data from {data_file}")
        with open(data_file, 'rb') as f:
            data = pickle.load(f)

        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']  # Dict[plate_path, per-plate dict of pipeline_definition/compiled_contexts]
        global_config = data['global_config']
        effective_configs = data.get('effective_configs', {})  # Per-plate effective configs

        logger.info(f"🔥 SUBPROCESS: Loaded data for {len(plate_paths)} plates")
        logger.info(f"🔥 SUBPROCESS: Plates: {plate_paths}")

        # Process each plate (like test_main.py but for multiple plates)
        for plate_path in plate_paths:
            plate_data = pipeline_data[plate_path]
            pipeline_definition = plate_data['pipeline_definition']
            compiled_contexts = plate_data['compiled_contexts']
            effective_config = effective_configs.get(plate_path)  # Effective config for this plate
            logger.info(f"🔥 SUBPROCESS: Processing plate {plate_path} with {len(pipeline_definition)} steps")

            run_single_plate(
                plate_path=plate_path,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                global_config=global_config,
                logger=logger,
                log_file_base=log_file_base,
                effective_config=effective_config
            )

        logger.info("🔥 SUBPROCESS: All plates completed successfully")
    except Exception as e:
        logger.error(f"🔥 SUBPROCESS: Fatal error for all plates: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT FATAL: {e}")
        print(f"🔥 SUBPROCESS STDOUT FATAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)
        sys.exit(1)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        logger.error(f"🔥 SUBPROCESS: Critical system error for all plates: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM: {e}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM TRACEBACK: {traceback.format_exc()}")
        # Critical error logged to log file (single source of truth)
        sys.exit(2)
if __name__ == "__main__":
    main()