#!/usr/bin/env python3
"""
OpenHCS Subprocess Runner

Standalone script that runs OpenHCS plate processing in a clean subprocess environment.
This mimics the integration test pattern from test_main.py but runs independently.

Usage:
    python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]
"""

import sys
import json
import dill as pickle
import logging
import traceback
import signal
import atexit
import os
from pathlib import Path
from typing import Dict, List, Any

logger = logging.getLogger(__name__)

# Enable subprocess mode - this single variable controls all subprocess behavior
os.environ['OPENHCS_SUBPROCESS_MODE'] = '1'

# Prevent GPU library imports in the subprocess runner - workers handle GPU initialization
os.environ['OPENHCS_SUBPROCESS_NO_GPU'] = '1'

# The subprocess runner doesn't need a function registry at all;
# workers will initialize their own function registries when needed.
logger.info("Subprocess runner: No function registry needed - workers will handle initialization")

def setup_subprocess_logging(log_file_path: str):
    """Set up dedicated logging for the subprocess - all logs go to the specified file."""
    # Configure the root logger to capture ALL logs from the subprocess and OpenHCS modules
    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # Clear any existing handlers

    # Create the file handler for subprocess logs
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # Ensure all OpenHCS module logs are captured
    logging.getLogger("openhcs").setLevel(logging.INFO)

    # Prevent console output - everything goes to the file
    logging.basicConfig = lambda *args, **kwargs: None

    # Get the subprocess logger
    logger = logging.getLogger("openhcs.subprocess")
    logger.info("SUBPROCESS: Logging configured")

    return logger
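
# Illustrative usage (a minimal sketch; the path is hypothetical): the returned
# logger and the root logger share one FileHandler, so runner logs and library
# logs interleave in the same file.
#
#     log = setup_subprocess_logging("/tmp/openhcs_run.log")
#     log.info("hello")  # written to /tmp/openhcs_run.log, not the console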

# Status and result files removed - the log file is the single source of truth

def run_single_plate(plate_path: str, pipeline_definition: List, compiled_contexts: Dict,
                     global_config, logger, log_file_base: str = None, effective_config=None):
    """
    Run a single plate using pre-compiled contexts from the UI.

    This follows the pattern:
    1. Validate the global config (GPU registry initialization is deferred to workers)
    2. Create the orchestrator and initialize it
    3. Execute the pre-compiled plate (no compilation needed)
    """
    import psutil
    import os

    def log_thread_count(step_name):
        thread_count = psutil.Process(os.getpid()).num_threads()
        logger.info(f"🔥 SUBPROCESS: THREAD COUNT at {step_name}: {thread_count}")
        print(f"🔥 SUBPROCESS STDOUT: THREAD COUNT at {step_name}: {thread_count}")
        return thread_count

    # NUCLEAR ERROR DETECTION: Wrap EVERYTHING in try/except
    def force_error_detection(func_name, func, *args, **kwargs):
        """Wrapper that forces any error to be visible and logged."""
        try:
            logger.info(f"🔥 SUBPROCESS: CALLING {func_name} with args={len(args)}, kwargs={len(kwargs)}")
            print(f"🔥 SUBPROCESS STDOUT: CALLING {func_name}")

            # DEATH DETECTION: Mark entry into the function (log file only)
            logger.info(f"🔥 SUBPROCESS: ENTERING: {func_name}")

            result = func(*args, **kwargs)

            # DEATH DETECTION: Mark successful completion (log file only)
            logger.info(f"🔥 SUBPROCESS: COMPLETED: {func_name}")
            print(f"🔥 SUBPROCESS STDOUT: {func_name} COMPLETED")
            return result
        except Exception as e:
            error_msg = f"🔥 NUCLEAR ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR TRACEBACK: {traceback.format_exc()}")
            # Error logged to the log file (single source of truth)
            raise RuntimeError(f"FORCED ERROR DETECTION: {func_name} failed: {e}") from e
        except BaseException as e:
            error_msg = f"🔥 NUCLEAR CRITICAL ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL TRACEBACK: {traceback.format_exc()}")
            # Error logged to the log file (single source of truth)
            raise RuntimeError(f"FORCED CRITICAL ERROR DETECTION: {func_name} failed: {e}") from e
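
    # Illustrative call (a sketch; `do_work` is a hypothetical callable, not part
    # of this module): pass the name for logging, the callable, then its arguments.
    #
    #     result = force_error_detection("do_work", do_work, 42, retries=3)
    #
    # Any exception from do_work is logged to both the log file and stdout, then
    # re-raised as a RuntimeError so it cannot pass silently.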

    # DEATH DETECTION: Progress markers to find where the process dies
    def death_marker(location, details=""):
        """Mark progress to detect where the process dies (log file only)."""
        marker_msg = f"🔥 DEATH_MARKER: {location} - {details}"
        logger.info(marker_msg)
        print(marker_msg)
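
    # To locate where a dead run got to, find the last marker in its log, e.g.
    # (standard shell usage; run.log is a hypothetical log path):
    #     grep 'DEATH_MARKER' run.log | tail -n 1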

    try:
        death_marker("FUNCTION_START", f"plate_path={plate_path}")
        log_thread_count("function start")

        death_marker("BEFORE_STARTING_LOG")
        logger.info(f"SUBPROCESS: Starting plate {plate_path}")

        death_marker("BEFORE_STATUS_WRITE")
        logger.info(f"🔥 SUBPROCESS: STARTING plate {plate_path}")
        death_marker("AFTER_STATUS_WRITE")

        log_thread_count("after status write")

        # Step 1: Validate global config (GPU registry will be initialized by workers)
        death_marker("STEP1_START", "Global config validation")
        logger.info("SUBPROCESS: Validating global config (GPU registry initialization deferred to workers)")

        death_marker("BEFORE_CONFIG_IMPORT")
        # NUCLEAR WRAP: Config import
        def import_config():
            from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
            from openhcs.constants import Microscope
            return GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope
        GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope = force_error_detection("import_config", import_config)
        death_marker("AFTER_CONFIG_IMPORT")

        log_thread_count("after config import")

        # Global config is already a proper object from pickle - no reconstruction needed!
        log_thread_count("using pickled global config")
        logger.info(f"🔥 SUBPROCESS: Using pickled global config: {type(global_config)}")
        logger.info(f"🔥 SUBPROCESS: Zarr compressor: {global_config.zarr_config.compressor.value}")
        log_thread_count("after global config validation")

        logger.info("SUBPROCESS: Global config validated - GPU registry and CUDA streams will be initialized by workers")

        # Step 2: Create orchestrator and initialize (like test_main.py)
        logger.info("🔥 SUBPROCESS: Creating orchestrator...")

        log_thread_count("before orchestrator import")

        # NUCLEAR WRAP: Orchestrator import
        def import_orchestrator():
            from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
            return PipelineOrchestrator
        PipelineOrchestrator = force_error_detection("import_orchestrator", import_orchestrator)

        log_thread_count("after orchestrator import")

        # NUCLEAR WRAP: Storage registry import (lazy initialization for subprocess mode)
        def import_storage_registry():
            from openhcs.io.base import storage_registry, ensure_storage_registry
            ensure_storage_registry()  # Ensure the registry is created in subprocess mode
            return storage_registry
        storage_registry = force_error_detection("import_storage_registry", import_storage_registry)

        log_thread_count("after storage registry import")

        # Skip function registry import in the subprocess runner - workers will initialize their own
        logger.info("SUBPROCESS: Function registry initialization deferred to workers")

        log_thread_count("before orchestrator creation")

        # NUCLEAR WRAP: Set up global config context (required before orchestrator creation)
        def setup_global_context():
            from openhcs.config_framework.lazy_factory import ensure_global_config_context
            from openhcs.core.config import GlobalPipelineConfig
            ensure_global_config_context(GlobalPipelineConfig, global_config)
        force_error_detection("setup_global_context", setup_global_context)

        # NUCLEAR WRAP: Orchestrator creation
        orchestrator = force_error_detection("PipelineOrchestrator_creation", PipelineOrchestrator,
                                             plate_path=plate_path,
                                             storage_registry=storage_registry)  # Use the default registry

        log_thread_count("after orchestrator creation")

        # NUCLEAR WRAP: Orchestrator initialization
        force_error_detection("orchestrator_initialize", orchestrator.initialize)
        log_thread_count("after orchestrator initialization")
        logger.info("🔥 SUBPROCESS: Orchestrator initialized!")

        # Step 3: Use wells from pre-compiled contexts (not rediscovery).
        # The UI already compiled contexts for the specific wells in this plate.
        wells = list(compiled_contexts.keys())
        logger.info(f"🔥 SUBPROCESS: Using pre-compiled wells for plate {plate_path}: {wells}")
        logger.info(f"🔥 SUBPROCESS: Found {len(wells)} wells from compiled contexts")

        # AGGRESSIVE VALIDATION: Check wells from compiled contexts
        if not wells:
            error_msg = f"🔥 CRITICAL: No wells found in compiled contexts for plate {plate_path}!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)

        if not isinstance(wells, list):
            error_msg = f"🔥 CRITICAL: Wells is not a list: {type(wells)} = {wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)

        logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_definition)} steps")
        logger.info(f"🔥 SUBPROCESS: Using pre-compiled contexts from UI")
        logger.info(f"🔥 SUBPROCESS: EXECUTING plate {plate_path}")

        # Step 4: Execution phase with multiprocessing (like test_main.py but with processes)
        logger.info("🔥 SUBPROCESS: Starting execution phase with multiprocessing...")

        # Use the effective config passed from the UI (includes pipeline config) instead of the global config
        config_to_use = effective_config if effective_config is not None else global_config
        max_workers = config_to_use.num_workers
        logger.info(f"🔥 SUBPROCESS: Using {max_workers} workers from {'effective' if effective_config else 'global'} config for {len(wells)} wells")

        # This is where hangs often occur - add extra monitoring
        logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")

        # Skip GPU memory monitoring in the subprocess runner - workers will handle GPU monitoring
        logger.info("🔥 SUBPROCESS: GPU memory monitoring deferred to workers")

        # Debug what's actually happening - use normal threading
        logger.info("🔥 SUBPROCESS: Starting execution with detailed monitoring...")

        # Custom progress callback to see exactly where execution hangs
        def progress_callback(well_id, step_name, status):
            logger.info(f"🔥 SUBPROCESS: PROGRESS - Well {well_id}, Step '{step_name}', Status: {status}")

        # Add monitoring without a timeout
        import threading

        # Start the monitoring thread
        monitoring_active = threading.Event()
        monitoring_active.set()

        def monitor_execution():
            count = 0
            while monitoring_active.is_set():
                count += 1
                logger.info(f"🔥 SUBPROCESS: MONITOR #{count} - Still executing...")

                # Skip GPU memory monitoring in the subprocess runner
                logger.info("🔥 SUBPROCESS: GPU memory monitoring deferred to workers")

                # Check if we can get thread info (threading is already imported above)
                try:
                    active_threads = threading.active_count()
                    logger.info(f"🔥 SUBPROCESS: Active threads: {active_threads}")
                except Exception:
                    pass

                # Log progress every 30 seconds with system info
                if count % 6 == 0:
                    logger.info(f"🔥 SUBPROCESS: PROGRESS - Been running for {count*5} seconds, still executing...")
                    print(f"🔥 SUBPROCESS STDOUT: PROGRESS - {count*5} seconds elapsed")

                    # System monitoring to catch resource issues
                    try:
                        import psutil
                        import os

                        # Memory info
                        memory = psutil.virtual_memory()
                        swap = psutil.swap_memory()
                        process = psutil.Process(os.getpid())

                        logger.info(f"🔥 SUBPROCESS: SYSTEM - RAM: {memory.percent:.1f}% used, {memory.available/1024**3:.1f}GB free")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Swap: {swap.percent:.1f}% used")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process RAM: {process.memory_info().rss/1024**3:.1f}GB")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process threads: {process.num_threads()}")

                        print(f"🔥 SUBPROCESS STDOUT: RAM {memory.percent:.1f}%, Process {process.memory_info().rss/1024**3:.1f}GB, Threads {process.num_threads()}")

                        # Check for memory pressure
                        if memory.percent > 90:
                            logger.error(f"🔥 SUBPROCESS: WARNING - High memory usage: {memory.percent:.1f}%")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH MEMORY WARNING: {memory.percent:.1f}%")

                        if process.memory_info().rss > 16 * 1024**3:  # 16GB
                            logger.error(f"🔥 SUBPROCESS: WARNING - Process using {process.memory_info().rss/1024**3:.1f}GB")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH PROCESS MEMORY: {process.memory_info().rss/1024**3:.1f}GB")

                    except Exception as e:
                        logger.debug(f"Could not get system info: {e}")

                threading.Event().wait(5)  # Sleep 5 seconds (a fresh, unset Event used as a sleep)

        monitor_thread = threading.Thread(target=monitor_execution, daemon=True)
        monitor_thread.start()
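
        # The monitor is a daemon thread, so it can never block interpreter exit;
        # the finally clause below clears monitoring_active to end its loop cleanly.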

        try:
            logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")
            logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_definition)} steps")
            logger.info(f"🔥 SUBPROCESS: Compiled contexts for {len(compiled_contexts)} wells")
            logger.info("🔥 SUBPROCESS: Calling execute_compiled_plate NOW...")

            log_thread_count("before execute_compiled_plate")

            # PRE-EXECUTION STATE VALIDATION
            logger.info("🔥 SUBPROCESS: PRE-EXECUTION VALIDATION...")
            print("🔥 SUBPROCESS STDOUT: PRE-EXECUTION VALIDATION...")

            if not hasattr(orchestrator, 'execute_compiled_plate'):
                error_msg = "🔥 CRITICAL: orchestrator missing execute_compiled_plate method!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if pipeline_definition is None:
                error_msg = "🔥 CRITICAL: pipeline_definition is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if compiled_contexts is None:
                error_msg = "🔥 CRITICAL: compiled_contexts is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            logger.info(f"🔥 SUBPROCESS: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")
            print(f"🔥 SUBPROCESS STDOUT: PRE-EXECUTION OK - pipeline:{len(pipeline_definition)}, contexts:{len(compiled_contexts)}")

            # NUCLEAR EXECUTION WRAPPER: Force any error to surface
            death_marker("BEFORE_EXECUTION_CALL", f"pipeline_steps={len(pipeline_definition)}, contexts={len(compiled_contexts)}")
            logger.info("🔥 SUBPROCESS: CALLING NUCLEAR EXECUTION WRAPPER...")
            print("🔥 SUBPROCESS STDOUT: CALLING NUCLEAR EXECUTION WRAPPER...")

            death_marker("ENTERING_FORCE_ERROR_DETECTION")
            results = force_error_detection("execute_compiled_plate", orchestrator.execute_compiled_plate,
                                            pipeline_definition=pipeline_definition,
                                            compiled_contexts=compiled_contexts,
                                            max_workers=max_workers,  # num_workers from the effective or global config selected above
                                            visualizer=None,  # Let the orchestrator auto-create visualizers based on compiled contexts
                                            log_file_base=log_file_base)  # Pass the log base for worker process logging
            death_marker("AFTER_FORCE_ERROR_DETECTION", f"results_type={type(results)}")

            logger.info("🔥 SUBPROCESS: NUCLEAR EXECUTION WRAPPER RETURNED!")
            print("🔥 SUBPROCESS STDOUT: NUCLEAR EXECUTION WRAPPER RETURNED!")
            death_marker("EXECUTION_WRAPPER_RETURNED")

            log_thread_count("after execute_compiled_plate")

            logger.info("🔥 SUBPROCESS: execute_compiled_plate RETURNED successfully!")
            logger.info(f"🔥 SUBPROCESS: Results: {type(results)}, length: {len(results) if results else 'None'}")

            # FORCE ERROR DETECTION: Check for None results immediately
            if results is None:
                error_msg = "🔥 CRITICAL: execute_compiled_plate returned None - this should never happen!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

        except Exception as execution_error:
            # FORCE ERROR PROPAGATION: Re-raise with enhanced context
            error_msg = f"🔥 EXECUTION ERROR in execute_compiled_plate: {execution_error}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT EXECUTION ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT EXECUTION TRACEBACK: {traceback.format_exc()}")
            raise RuntimeError(error_msg) from execution_error
        finally:
            monitoring_active.clear()  # Stop monitoring

        logger.info("🔥 SUBPROCESS: Execution completed!")

        # AGGRESSIVE RESULT VALIDATION: Force errors to surface
        logger.info("🔥 SUBPROCESS: Starting aggressive result validation...")

        # Check 1: Results exist
        if not results:
            error_msg = "🔥 EXECUTION FAILED: No results returned from execute_compiled_plate!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 2: Results is a dictionary
        if not isinstance(results, dict):
            error_msg = f"🔥 EXECUTION FAILED: Results is not a dict, got {type(results)}: {results}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 3: Expected number of results
        if len(results) != len(wells):
            error_msg = f"🔥 EXECUTION FAILED: Expected {len(wells)} results, got {len(results)}. Wells: {wells}, Result keys: {list(results.keys())}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 4: All wells have results
        missing_wells = set(wells) - set(results.keys())
        if missing_wells:
            error_msg = f"🔥 EXECUTION FAILED: Missing results for wells: {missing_wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)
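
        # Expected per-well result shape, inferred from the checks below (the exact
        # schema is defined by execute_compiled_plate, not by this runner):
        #     {'status': 'success', ...}                                   # on success
        #     {'status': <other>, 'error_message': ..., 'details': ...}    # on failure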

        # Check 5: All results have a proper structure; collect any per-well errors
        failed_wells = []
        for well_id, result in results.items():
            logger.info(f"🔥 SUBPROCESS: Validating result for well {well_id}: {result}")

            if not isinstance(result, dict):
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} is not a dict: {type(result)} = {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if 'status' not in result:
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} missing 'status' field: {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if result.get('status') != 'success':
                error_msg = result.get('error_message', 'Unknown error')
                details = result.get('details', 'No details')
                full_error = f"🔥 EXECUTION FAILED for well {well_id}: {error_msg} | Details: {details}"
                logger.error(full_error)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {full_error}")
                failed_wells.append((well_id, error_msg, details))

        # Check 6: Raise if any wells failed
        if failed_wells:
            error_summary = f"🔥 EXECUTION FAILED: {len(failed_wells)} wells failed out of {len(wells)}"
            for well_id, error_msg, details in failed_wells:
                error_summary += f"\n  - Well {well_id}: {error_msg}"
            logger.error(error_summary)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_summary}")
            raise RuntimeError(error_summary)

        logger.info(f"🔥 SUBPROCESS: EXECUTION SUCCESS: {len(results)} wells executed successfully")

        # Success logged to the log file (single source of truth)
        logger.info(f"🔥 SUBPROCESS: COMPLETED plate {plate_path} with {len(results)} results")

        logger.info(f"🔥 SUBPROCESS: Plate {plate_path} completed successfully")

    except Exception as e:
        error_msg = f"Execution failed for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT TRACEBACK: {traceback.format_exc()}")
        # Error logged to the log file (single source of truth)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        error_msg = f"CRITICAL failure for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to the log file (single source of truth)
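
# Note: run_single_plate deliberately swallows exceptions after logging them, so
# main() below continues with any remaining plates; failures surface through the
# log file, which is the single source of truth.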

def main():
    """Main entry point for the subprocess runner."""
    if len(sys.argv) < 3 or len(sys.argv) > 4:
        print("Usage: python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]")
        sys.exit(1)

    data_file = sys.argv[1]
    log_file_base = sys.argv[2]
    unique_id = sys.argv[3] if len(sys.argv) == 4 else None

    # Build the log file name from the provided base and unique ID
    if unique_id:
        log_file = f"{log_file_base}_{unique_id}.log"
    else:
        log_file = f"{log_file_base}.log"
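
    # Example (hypothetical values): log_file_base="/tmp/openhcs/run" and
    # unique_id="a1b2" yield "/tmp/openhcs/run_a1b2.log"; without a unique_id
    # the log file is "/tmp/openhcs/run.log".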

    # PROCESS GROUP CLEANUP: Create a new process group to manage all child processes
    try:
        import os
        import signal

        # Create a new process group with this process as leader
        os.setpgrp()
        process_group_id = os.getpgrp()

        print(f"🔥 SUBPROCESS: Created process group {process_group_id}")

        # Track all child processes for cleanup
        child_processes = set()

        def kill_all_children():
            """Kill all child processes and the entire process group."""
            try:
                print(f"🔥 SUBPROCESS: Killing process group {process_group_id}")
                # Kill the entire process group (os.killpg signals every member)
                os.killpg(process_group_id, signal.SIGTERM)

                # Give processes time to exit gracefully
                import time
                time.sleep(2)

                # Force kill if still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # Already dead

                print(f"🔥 SUBPROCESS: Process group {process_group_id} terminated")
            except Exception as e:
                print(f"🔥 SUBPROCESS: Error killing process group: {e}")

        # Register the cleanup function
        atexit.register(kill_all_children)

    except Exception as e:
        print(f"🔥 SUBPROCESS: Warning - Could not set up process group cleanup: {e}")

    # Set up logging first
    logger = setup_subprocess_logging(log_file)
    logger.info("🔥 SUBPROCESS: Starting OpenHCS subprocess runner")
    logger.info(f"🔥 SUBPROCESS: Args - data: {data_file}, log: {log_file}")

    # DEATH DETECTION: Set up heartbeat monitoring
    import threading
    import time

    def heartbeat_monitor():
        """Monitor that writes heartbeats to detect where the process dies."""
        heartbeat_count = 0
        while True:
            try:
                heartbeat_count += 1
                heartbeat_msg = f"🔥 SUBPROCESS HEARTBEAT #{heartbeat_count}: Process alive at {time.time()}"
                logger.info(heartbeat_msg)
                print(heartbeat_msg)

                # Heartbeats go to the log file (single source of truth);
                # no separate heartbeat file is needed.
                time.sleep(2)  # Heartbeat every 2 seconds
            except Exception as monitor_error:
                logger.error(f"🔥 SUBPROCESS: Heartbeat monitor error: {monitor_error}")
                print(f"🔥 SUBPROCESS STDOUT: Heartbeat monitor error: {monitor_error}")
                break

    # Start the heartbeat monitor in a daemon thread
    heartbeat_thread = threading.Thread(target=heartbeat_monitor, daemon=True)
    heartbeat_thread.start()
    logger.info("🔥 SUBPROCESS: Heartbeat monitor started")

    # NUCLEAR CRASH DETECTION - catch EVERYTHING
    def crash_handler(signum, frame):
        crash_msg = f"🔥 SUBPROCESS: CRASH DETECTED - Signal {signum} received!"
        logger.error(crash_msg)
        print(f"🔥 SUBPROCESS STDOUT CRASH: {crash_msg}")

        # Crash info logged to the log file (single source of truth)

        # Dump stack traces for all threads (don't shadow the handler's frame argument)
        try:
            logger.error("🔥 SUBPROCESS: CRASH - Dumping all thread stacks...")
            for thread_id, thread_frame in sys._current_frames().items():
                logger.error(f"🔥 SUBPROCESS: CRASH Thread {thread_id}:")
                traceback.print_stack(thread_frame)
        except Exception:
            pass

        # Force exit
        os._exit(1)

    # Set up signal handlers for all possible crashes
    signal.signal(signal.SIGSEGV, crash_handler)  # Segmentation fault
    signal.signal(signal.SIGABRT, crash_handler)  # Abort
    signal.signal(signal.SIGFPE, crash_handler)   # Floating point exception
    signal.signal(signal.SIGILL, crash_handler)   # Illegal instruction
    signal.signal(signal.SIGTERM, crash_handler)  # Termination
    signal.signal(signal.SIGINT, crash_handler)   # Interrupt (Ctrl+C)

    # Set up an atexit handler to catch silent deaths
    def exit_handler():
        logger.error("🔥 SUBPROCESS: ATEXIT - Process is exiting!")
        print("🔥 SUBPROCESS STDOUT: ATEXIT - Process is exiting!")
        # Exit info logged to the log file (single source of truth)

    atexit.register(exit_handler)

    # Set up a debug signal handler
    def debug_handler(signum, frame):
        logger.error("🔥 SUBPROCESS: SIGUSR1 received - dumping stack trace")

        # Dump all thread stacks (don't shadow the handler's frame argument)
        for thread_id, thread_frame in sys._current_frames().items():
            logger.error(f"🔥 SUBPROCESS: Thread {thread_id} stack:")
            traceback.print_stack(thread_frame)

        # Log thread info
        for thread in threading.enumerate():
            logger.error(f"🔥 SUBPROCESS: Thread: {thread.name}, alive: {thread.is_alive()}")

    signal.signal(signal.SIGUSR1, debug_handler)
    logger.info("🔥 SUBPROCESS: NUCLEAR CRASH DETECTION ENABLED - All signals monitored")
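
    # To dump all thread stacks from outside while the runner is alive, send it
    # SIGUSR1 (standard POSIX usage, not an OpenHCS-specific interface):
    #     kill -USR1 <subprocess_pid>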

    try:
        # Load the pickled data
        logger.info(f"🔥 SUBPROCESS: Loading data from {data_file}")
        with open(data_file, 'rb') as f:
            data = pickle.load(f)

        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']  # Dict[plate_path, {'pipeline_definition': ..., 'compiled_contexts': ...}]
        global_config = data['global_config']
        effective_configs = data.get('effective_configs', {})  # Per-plate effective configs

        logger.info(f"🔥 SUBPROCESS: Loaded data for {len(plate_paths)} plates")
        logger.info(f"🔥 SUBPROCESS: Plates: {plate_paths}")
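
        # Expected pickle payload shape, inferred from the reads above and the
        # per-plate unpacking below (the payload is produced by the TUI):
        #     {
        #         'plate_paths': [<plate_path>, ...],
        #         'pipeline_data': {<plate_path>: {'pipeline_definition': [<FunctionStep>, ...],
        #                                          'compiled_contexts': {<well_id>: <context>}}},
        #         'global_config': <GlobalPipelineConfig>,
        #         'effective_configs': {<plate_path>: <config>},  # optional
        #     }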

        # Process each plate (like test_main.py but for multiple plates)
        for plate_path in plate_paths:
            plate_data = pipeline_data[plate_path]
            pipeline_definition = plate_data['pipeline_definition']
            compiled_contexts = plate_data['compiled_contexts']
            effective_config = effective_configs.get(plate_path)  # Effective config for this plate, if any
            logger.info(f"🔥 SUBPROCESS: Processing plate {plate_path} with {len(pipeline_definition)} steps")

            run_single_plate(
                plate_path=plate_path,
                pipeline_definition=pipeline_definition,
                compiled_contexts=compiled_contexts,
                global_config=global_config,
                logger=logger,
                log_file_base=log_file_base,
                effective_config=effective_config
            )

        logger.info("🔥 SUBPROCESS: All plates completed successfully")

    except Exception as e:
        logger.error(f"🔥 SUBPROCESS: Fatal error: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT FATAL: {e}")
        print(f"🔥 SUBPROCESS STDOUT FATAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to the log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Fatal error for all plates: {e}")
        sys.exit(1)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        logger.error(f"🔥 SUBPROCESS: CRITICAL SYSTEM ERROR: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM: {e}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM TRACEBACK: {traceback.format_exc()}")
        # Critical error logged to the log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Critical system error for all plates: {e}")
        sys.exit(2)
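
# Exit codes: 0 = all plates dispatched (per-plate failures are logged, not fatal);
# 1 = fatal error while loading data or running plates; 2 = critical system error.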

if __name__ == "__main__":
    main()