Coverage for openhcs/textual_tui/subprocess_runner.py: 0.0%
474 statements
coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
#!/usr/bin/env python3
"""
OpenHCS Subprocess Runner

Standalone script that runs OpenHCS plate processing in a clean subprocess environment.
This mimics the integration test pattern from test_main.py but runs independently.

Usage:
    python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]
"""
import sys
import json
import dill as pickle
import logging
import traceback
import signal
import atexit
import os
from pathlib import Path
from typing import Dict, List, Any

# Enable subprocess mode - this single variable controls all subprocess behavior
os.environ['OPENHCS_SUBPROCESS_MODE'] = '1'

# Initialize function registry for subprocess workers
def _initialize_subprocess_registry():
    """Initialize function registry optimized for subprocess workers."""
    import openhcs.processing.func_registry as func_registry_module

    with func_registry_module._registry_lock:
        if not func_registry_module._registry_initialized:
            # Initialize empty registry structure
            func_registry_module.FUNC_REGISTRY.clear()
            for memory_type in func_registry_module.VALID_MEMORY_TYPES:
                func_registry_module.FUNC_REGISTRY[memory_type] = []
            func_registry_module._registry_initialized = True

            # Register external libraries using cached metadata (fast)
            func_registry_module._register_external_libraries()

_initialize_subprocess_registry()

def setup_subprocess_logging(log_file_path: str):
    """Set up dedicated logging for the subprocess - all logs go to the specified file."""

    # Configure root logger to capture ALL logs from subprocess and OpenHCS modules
    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # Clear any existing handlers

    # Create file handler for subprocess logs
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # Ensure all OpenHCS module logs are captured
    logging.getLogger("openhcs").setLevel(logging.INFO)

    # Prevent console output - everything goes to file
    logging.basicConfig = lambda *args, **kwargs: None

    # Get subprocess logger
    logger = logging.getLogger("openhcs.subprocess")
    logger.info("SUBPROCESS: Logging configured")

    return logger
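
# Example usage (hypothetical path):
#     logger = setup_subprocess_logging("/tmp/openhcs_run_abc123.log")
# After this call, every logging call in the process, including the OpenHCS module
# loggers, is appended to that single file.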

# Status and result files removed - log file is single source of truth

def run_single_plate(plate_path: str, pipeline_steps: List, global_config,
                     logger, log_file_base: str = None):
    """
    Run a single plate using the integration test pattern.

    This follows the exact same pattern as test_main.py:
    1. Initialize GPU registry
    2. Create orchestrator and initialize
    3. Get wells and compile pipelines
    4. Execute compiled plate
    """
    import psutil
    import os

    def log_thread_count(step_name):
        thread_count = psutil.Process(os.getpid()).num_threads()
        logger.info(f"🔥 SUBPROCESS: THREAD COUNT at {step_name}: {thread_count}")
        print(f"🔥 SUBPROCESS STDOUT: THREAD COUNT at {step_name}: {thread_count}")
        return thread_count

    # NUCLEAR ERROR DETECTION: Wrap EVERYTHING in try/except
    def force_error_detection(func_name, func, *args, **kwargs):
        """Wrapper that forces any error to be visible and logged."""
        try:
            logger.info(f"🔥 SUBPROCESS: CALLING {func_name} with args={len(args)}, kwargs={len(kwargs)}")
            print(f"🔥 SUBPROCESS STDOUT: CALLING {func_name}")

            # DEATH DETECTION: Mark entry into function (log file only)
            logger.info(f"🔥 SUBPROCESS: ENTERING: {func_name}")

            result = func(*args, **kwargs)

            # DEATH DETECTION: Mark successful completion (log file only)
            logger.info(f"🔥 SUBPROCESS: COMPLETED: {func_name}")

            logger.info(f"🔥 SUBPROCESS: {func_name} COMPLETED successfully")
            print(f"🔥 SUBPROCESS STDOUT: {func_name} COMPLETED")
            return result
        except Exception as e:
            error_msg = f"🔥 NUCLEAR ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR TRACEBACK: {traceback.format_exc()}")
            # Error logged to log file (single source of truth)
            raise RuntimeError(f"FORCED ERROR DETECTION: {func_name} failed: {e}") from e
        except BaseException as e:
            error_msg = f"🔥 NUCLEAR CRITICAL ERROR in {func_name}: {e}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT NUCLEAR CRITICAL TRACEBACK: {traceback.format_exc()}")
            # Error logged to log file (single source of truth)
            raise RuntimeError(f"FORCED CRITICAL ERROR DETECTION: {func_name} failed: {e}") from e

    # DEATH DETECTION: Progress markers to find where process dies
    def death_marker(location, details=""):
        """Mark progress to detect where process dies (log file only)."""
        marker_msg = f"🔥 DEATH_MARKER: {location} - {details}"
        logger.info(marker_msg)
        print(marker_msg)

    try:
        death_marker("FUNCTION_START", f"plate_path={plate_path}")
        log_thread_count("function start")

        death_marker("BEFORE_STARTING_LOG")
        logger.info(f"SUBPROCESS: Starting plate {plate_path}")

        death_marker("BEFORE_STATUS_WRITE")
        logger.info(f"🔥 SUBPROCESS: STARTING plate {plate_path}")
        death_marker("AFTER_STATUS_WRITE")

        log_thread_count("after status write")

        # Step 1: Initialize GPU registry (like test_main.py)
        death_marker("STEP1_START", "GPU registry initialization")
        logger.info("SUBPROCESS: Initializing GPU registry")

        death_marker("BEFORE_GPU_IMPORT")
        log_thread_count("before GPU scheduler import")

        # NUCLEAR WRAP: GPU scheduler import
        def import_gpu_scheduler():
            from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
            return setup_global_gpu_registry
        setup_global_gpu_registry = force_error_detection("import_gpu_scheduler", import_gpu_scheduler)
        death_marker("AFTER_GPU_IMPORT")

        log_thread_count("after GPU scheduler import")

        death_marker("BEFORE_CONFIG_IMPORT")
        # NUCLEAR WRAP: Config import
        def import_config():
            from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
            from openhcs.constants import Microscope
            return GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope
        GlobalPipelineConfig, PathPlanningConfig, VFSConfig, Microscope = force_error_detection("import_config", import_config)
        death_marker("AFTER_CONFIG_IMPORT")

        log_thread_count("after config import")

        # Global config is already a proper object from pickle - no reconstruction needed!
        log_thread_count("using pickled global config")
        logger.info(f"🔥 SUBPROCESS: Using pickled global config: {type(global_config)}")
        logger.info(f"🔥 SUBPROCESS: Zarr compressor: {global_config.zarr.compressor.value}")
        log_thread_count("after global config validation")

        # NUCLEAR WRAP: GPU registry setup
        force_error_detection("setup_global_gpu_registry", setup_global_gpu_registry, global_config=global_config)

        log_thread_count("after GPU registry setup")
        logger.info("SUBPROCESS: GPU registry initialized")

        # PROCESS-LEVEL CUDA STREAM SETUP for true parallelism
        logger.info("🔥 SUBPROCESS: Setting up process-specific CUDA streams...")
        try:
            import os
            process_id = os.getpid()

            # Set unique CUDA stream for this process based on PID
            try:
                import torch
                if torch.cuda.is_available():
                    # Create process-specific stream
                    torch.cuda.set_device(0)  # Use GPU 0
                    process_stream = torch.cuda.Stream()
                    torch.cuda.set_stream(process_stream)
                    logger.info(f"🔥 SUBPROCESS: Created PyTorch CUDA stream for process {process_id}")
            except ImportError:
                logger.debug("PyTorch not available for stream setup")

            try:
                import cupy as cp
                if cp.cuda.is_available():
                    # Create process-specific stream
                    cp.cuda.Device(0).use()  # Use GPU 0
                    process_stream = cp.cuda.Stream()
                    cp.cuda.Stream.null = process_stream  # Set as default stream
                    logger.info(f"🔥 SUBPROCESS: Created CuPy CUDA stream for process {process_id}")
            except ImportError:
                logger.debug("CuPy not available for stream setup")

        except Exception as stream_error:
            logger.warning(f"🔥 SUBPROCESS: Could not set up process streams: {stream_error}")
            # Continue anyway - not critical

        # Step 2: Create orchestrator and initialize (like test_main.py)
        logger.info("🔥 SUBPROCESS: Creating orchestrator...")

        log_thread_count("before orchestrator import")

        # NUCLEAR WRAP: Orchestrator import
        def import_orchestrator():
            from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
            return PipelineOrchestrator
        PipelineOrchestrator = force_error_detection("import_orchestrator", import_orchestrator)

        log_thread_count("after orchestrator import")

        # NUCLEAR WRAP: Storage registry import
        def import_storage_registry():
            from openhcs.io.base import storage_registry
            return storage_registry
        storage_registry = force_error_detection("import_storage_registry", import_storage_registry)

        log_thread_count("after storage registry import")

        # NUCLEAR WRAP: Function registry import (CRITICAL for auto-discovered functions)
        def import_function_registry():
            from openhcs.processing.func_registry import FUNC_REGISTRY
            return FUNC_REGISTRY
        FUNC_REGISTRY = force_error_detection("import_function_registry", import_function_registry)

        log_thread_count("after function registry import")
        logger.info("SUBPROCESS: Function registry initialized")

        log_thread_count("before orchestrator creation")

        # NUCLEAR WRAP: Orchestrator creation
        orchestrator = force_error_detection("PipelineOrchestrator_creation", PipelineOrchestrator,
                                              plate_path=plate_path,
                                              global_config=global_config,
                                              storage_registry=storage_registry  # Use default registry
                                              )
        log_thread_count("after orchestrator creation")

        # NUCLEAR WRAP: Orchestrator initialization
        force_error_detection("orchestrator_initialize", orchestrator.initialize)
        log_thread_count("after orchestrator initialization")
        logger.info("🔥 SUBPROCESS: Orchestrator initialized!")

        # Step 3: Get wells and prepare pipeline (like test_main.py)
        # NUCLEAR WRAP: Get wells
        from openhcs.constants.constants import GroupBy
        wells = force_error_detection("orchestrator_get_wells", lambda: orchestrator.get_component_keys(GroupBy.WELL))
        logger.info(f"🔥 SUBPROCESS: Found {len(wells)} wells: {wells}")

        # AGGRESSIVE VALIDATION: Check wells
        if not wells:
            error_msg = "🔥 CRITICAL: No wells found by orchestrator!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)
        if not isinstance(wells, list):
            error_msg = f"🔥 CRITICAL: Wells is not a list: {type(wells)} = {wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
            raise RuntimeError(error_msg)

        logger.info(f"🔥 SUBPROCESS: Pipeline has {len(pipeline_steps)} steps")
        logger.info(f"🔥 SUBPROCESS: COMPILING plate {plate_path}")

        # Step 4: Compilation phase (like test_main.py)
        logger.info("🔥 SUBPROCESS: Starting compilation phase...")

        # Make fresh copy of pipeline steps and fix IDs (like TUI does)
        import copy
        from openhcs.constants.constants import VariableComponents
        execution_pipeline = copy.deepcopy(pipeline_steps)

        # Fix step IDs after deep copy to match new object IDs
        for step in execution_pipeline:
            step.step_id = str(id(step))
            # Ensure variable_components is never None
            if step.variable_components is None:
                logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting default")
                step.variable_components = [VariableComponents.SITE]
            elif not step.variable_components:
                logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting default")
                step.variable_components = [VariableComponents.SITE]

        compiled_contexts = orchestrator.compile_pipelines(
            pipeline_definition=execution_pipeline,
            well_filter=wells
        )
        logger.info("🔥 SUBPROCESS: Compilation completed!")

        # Verify compilation (like test_main.py)
        if not compiled_contexts:
            raise RuntimeError("🔥 COMPILATION FAILED: No compiled contexts returned!")
        if len(compiled_contexts) != len(wells):
            raise RuntimeError(f"🔥 COMPILATION FAILED: Expected {len(wells)} contexts, got {len(compiled_contexts)}")
        logger.info(f"🔥 SUBPROCESS: Compilation SUCCESS: {len(compiled_contexts)} contexts compiled")
        logger.info(f"🔥 SUBPROCESS: EXECUTING plate {plate_path}")

        # Step 5: Execution phase with multiprocessing (like test_main.py but with processes)
        logger.info("🔥 SUBPROCESS: Starting execution phase with multiprocessing...")

        # Use global config num_workers setting
        max_workers = global_config.num_workers
        logger.info(f"🔥 SUBPROCESS: Using {max_workers} workers from global config for {len(wells)} wells")

        # This is where hangs often occur - add extra monitoring
        logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")

        # Add GPU memory monitoring before execution
        try:
            import torch
            if torch.cuda.is_available():
                gpu_mem_before = torch.cuda.memory_allocated() / 1024**3
                gpu_mem_reserved = torch.cuda.memory_reserved() / 1024**3
                logger.info(f"🔥 SUBPROCESS: GPU memory before execution - Allocated: {gpu_mem_before:.2f}GB, Reserved: {gpu_mem_reserved:.2f}GB")
        except Exception as e:
            logger.warning(f"🔥 SUBPROCESS: Could not check GPU memory: {e}")

        # Let's debug what's actually happening - use normal threading
        logger.info("🔥 SUBPROCESS: Starting execution with detailed monitoring...")

        # Create a custom progress callback to see exactly where it hangs
        def progress_callback(well_id, step_name, status):
            logger.info(f"🔥 SUBPROCESS: PROGRESS - Well {well_id}, Step '{step_name}', Status: {status}")

        # Add monitoring without timeout
        import threading

        # Start monitoring thread
        monitoring_active = threading.Event()
        monitoring_active.set()

        def monitor_execution():
            count = 0
            while monitoring_active.is_set():
                count += 1
                logger.info(f"🔥 SUBPROCESS: MONITOR #{count} - Still executing, checking GPU memory...")

                try:
                    import torch
                    if torch.cuda.is_available():
                        allocated = torch.cuda.memory_allocated() / 1024**3
                        reserved = torch.cuda.memory_reserved() / 1024**3
                        logger.info(f"🔥 SUBPROCESS: GPU Memory - Allocated: {allocated:.2f}GB, Reserved: {reserved:.2f}GB")
                except:
                    pass

                # Check if we can get thread info
                try:
                    import threading
                    active_threads = threading.active_count()
                    logger.info(f"🔥 SUBPROCESS: Active threads: {active_threads}")
                except:
                    pass

                # Log progress every 30 seconds with system info
                if count % 6 == 0:
                    logger.info(f"🔥 SUBPROCESS: PROGRESS - Been running for {count*5} seconds, still executing...")
                    print(f"🔥 SUBPROCESS STDOUT: PROGRESS - {count*5} seconds elapsed")

                    # Add system monitoring to catch resource issues
                    try:
                        import psutil
                        import os

                        # Memory info
                        memory = psutil.virtual_memory()
                        swap = psutil.swap_memory()
                        process = psutil.Process(os.getpid())

                        logger.info(f"🔥 SUBPROCESS: SYSTEM - RAM: {memory.percent:.1f}% used, {memory.available/1024**3:.1f}GB free")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Swap: {swap.percent:.1f}% used")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process RAM: {process.memory_info().rss/1024**3:.1f}GB")
                        logger.info(f"🔥 SUBPROCESS: SYSTEM - Process threads: {process.num_threads()}")

                        print(f"🔥 SUBPROCESS STDOUT: RAM {memory.percent:.1f}%, Process {process.memory_info().rss/1024**3:.1f}GB, Threads {process.num_threads()}")

                        # Check for memory pressure
                        if memory.percent > 90:
                            logger.error(f"🔥 SUBPROCESS: WARNING - High memory usage: {memory.percent:.1f}%")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH MEMORY WARNING: {memory.percent:.1f}%")

                        if process.memory_info().rss > 16 * 1024**3:  # 16GB
                            logger.error(f"🔥 SUBPROCESS: WARNING - Process using {process.memory_info().rss/1024**3:.1f}GB")
                            print(f"🔥 SUBPROCESS STDOUT: HIGH PROCESS MEMORY: {process.memory_info().rss/1024**3:.1f}GB")

                    except Exception as e:
                        logger.debug(f"Could not get system info: {e}")

                threading.Event().wait(5)  # Wait 5 seconds (more frequent)

        monitor_thread = threading.Thread(target=monitor_execution, daemon=True)
        monitor_thread.start()

        try:
            logger.info("🔥 SUBPROCESS: About to call execute_compiled_plate...")
            logger.info(f"🔥 SUBPROCESS: Pipeline has {len(execution_pipeline)} steps")
            logger.info(f"🔥 SUBPROCESS: Compiled contexts for {len(compiled_contexts)} wells")
            logger.info("🔥 SUBPROCESS: Calling execute_compiled_plate NOW...")

            log_thread_count("before execute_compiled_plate")

            # PRE-EXECUTION STATE VALIDATION
            logger.info("🔥 SUBPROCESS: PRE-EXECUTION VALIDATION...")
            print("🔥 SUBPROCESS STDOUT: PRE-EXECUTION VALIDATION...")

            if not hasattr(orchestrator, 'execute_compiled_plate'):
                error_msg = "🔥 CRITICAL: orchestrator missing execute_compiled_plate method!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if execution_pipeline is None:
                error_msg = "🔥 CRITICAL: execution_pipeline is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            if compiled_contexts is None:
                error_msg = "🔥 CRITICAL: compiled_contexts is None!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

            logger.info(f"🔥 SUBPROCESS: PRE-EXECUTION OK - pipeline:{len(execution_pipeline)}, contexts:{len(compiled_contexts)}")
            print(f"🔥 SUBPROCESS STDOUT: PRE-EXECUTION OK - pipeline:{len(execution_pipeline)}, contexts:{len(compiled_contexts)}")

            # NUCLEAR EXECUTION WRAPPER: Force any error to surface
            death_marker("BEFORE_EXECUTION_CALL", f"pipeline_steps={len(execution_pipeline)}, contexts={len(compiled_contexts)}")
            logger.info("🔥 SUBPROCESS: CALLING NUCLEAR EXECUTION WRAPPER...")
            print("🔥 SUBPROCESS STDOUT: CALLING NUCLEAR EXECUTION WRAPPER...")

            death_marker("ENTERING_FORCE_ERROR_DETECTION")
            results = force_error_detection("execute_compiled_plate", orchestrator.execute_compiled_plate,
                                            pipeline_definition=execution_pipeline,
                                            compiled_contexts=compiled_contexts,
                                            max_workers=max_workers,  # Use global config num_workers setting
                                            visualizer=None,  # No visualization in subprocess
                                            log_file_base=log_file_base  # Pass log base for worker process logging
                                            )
            death_marker("AFTER_FORCE_ERROR_DETECTION", f"results_type={type(results)}")

            logger.info("🔥 SUBPROCESS: NUCLEAR EXECUTION WRAPPER RETURNED!")
            print("🔥 SUBPROCESS STDOUT: NUCLEAR EXECUTION WRAPPER RETURNED!")
            death_marker("EXECUTION_WRAPPER_RETURNED")

            log_thread_count("after execute_compiled_plate")

            logger.info("🔥 SUBPROCESS: execute_compiled_plate RETURNED successfully!")
            logger.info(f"🔥 SUBPROCESS: Results: {type(results)}, length: {len(results) if results else 'None'}")

            # FORCE ERROR DETECTION: Check for None results immediately
            if results is None:
                error_msg = "🔥 CRITICAL: execute_compiled_plate returned None - this should never happen!"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
                raise RuntimeError(error_msg)

        except Exception as execution_error:
            # FORCE ERROR PROPAGATION: Re-raise with enhanced context
            error_msg = f"🔥 EXECUTION ERROR in execute_compiled_plate: {execution_error}"
            logger.error(error_msg, exc_info=True)
            print(f"🔥 SUBPROCESS STDOUT EXECUTION ERROR: {error_msg}")
            print(f"🔥 SUBPROCESS STDOUT EXECUTION TRACEBACK: {traceback.format_exc()}")
            raise RuntimeError(error_msg) from execution_error
        finally:
            monitoring_active.clear()  # Stop monitoring

        logger.info("🔥 SUBPROCESS: Execution completed!")

        # AGGRESSIVE RESULT VALIDATION: Force errors to surface
        logger.info("🔥 SUBPROCESS: Starting aggressive result validation...")
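
        # Assumed result shape (inferred from the checks below, not a documented contract):
        # results is a dict keyed by well id; each value is a dict with at least a 'status'
        # field, plus optional 'error_message' / 'details' entries on failure, e.g.
        #     {'A01': {'status': 'success'},
        #      'A02': {'status': 'error', 'error_message': '...', 'details': '...'}}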

        # Check 1: Results exist
        if not results:
            error_msg = "🔥 EXECUTION FAILED: No results returned from execute_compiled_plate!"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 2: Results is a dictionary
        if not isinstance(results, dict):
            error_msg = f"🔥 EXECUTION FAILED: Results is not a dict, got {type(results)}: {results}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 3: Expected number of results
        if len(results) != len(wells):
            error_msg = f"🔥 EXECUTION FAILED: Expected {len(wells)} results, got {len(results)}. Wells: {wells}, Result keys: {list(results.keys())}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 4: All wells have results
        missing_wells = set(wells) - set(results.keys())
        if missing_wells:
            error_msg = f"🔥 EXECUTION FAILED: Missing results for wells: {missing_wells}"
            logger.error(error_msg)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
            raise RuntimeError(error_msg)

        # Check 5: All results have proper structure and check for errors
        failed_wells = []
        for well_id, result in results.items():
            logger.info(f"🔥 SUBPROCESS: Validating result for well {well_id}: {result}")

            if not isinstance(result, dict):
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} is not a dict: {type(result)} = {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if 'status' not in result:
                error_msg = f"🔥 EXECUTION FAILED: Result for well {well_id} missing 'status' field: {result}"
                logger.error(error_msg)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
                raise RuntimeError(error_msg)

            if result.get('status') != 'success':
                error_msg = result.get('error_message', 'Unknown error')
                details = result.get('details', 'No details')
                full_error = f"🔥 EXECUTION FAILED for well {well_id}: {error_msg} | Details: {details}"
                logger.error(full_error)
                print(f"🔥 SUBPROCESS STDOUT ERROR: {full_error}")
                failed_wells.append((well_id, error_msg, details))

        # Check 6: Raise if any wells failed
        if failed_wells:
            error_summary = f"🔥 EXECUTION FAILED: {len(failed_wells)} wells failed out of {len(wells)}"
            for well_id, error_msg, details in failed_wells:
                error_summary += f"\n - Well {well_id}: {error_msg}"
            logger.error(error_summary)
            print(f"🔥 SUBPROCESS STDOUT ERROR: {error_summary}")
            raise RuntimeError(error_summary)

        logger.info(f"🔥 SUBPROCESS: EXECUTION SUCCESS: {len(results)} wells executed successfully")

        # Success logged to log file (single source of truth)
        logger.info(f"🔥 SUBPROCESS: COMPLETED plate {plate_path} with {len(results)} results")

        logger.info(f"🔥 SUBPROCESS: Plate {plate_path} completed successfully")

    except Exception as e:
        error_msg = f"Execution failed for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT ERROR: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        error_msg = f"CRITICAL failure for plate {plate_path}: {e}"
        logger.error(f"🔥 SUBPROCESS: {error_msg}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL: {error_msg}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)

def main():
    """Main entry point for subprocess runner."""
    if len(sys.argv) < 3 or len(sys.argv) > 4:
        print("Usage: python subprocess_runner.py <data_file.pkl> <log_file_base> [unique_id]")
        sys.exit(1)

    data_file = sys.argv[1]
    log_file_base = sys.argv[2]
    unique_id = sys.argv[3] if len(sys.argv) == 4 else None

    # Build log file name from provided base and unique ID
    if unique_id:
        log_file = f"{log_file_base}_{unique_id}.log"
    else:
        log_file = f"{log_file_base}.log"
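
    # Illustrative naming only (hypothetical values): a base of "/tmp/openhcs_run" with
    # unique_id "abc123" yields "/tmp/openhcs_run_abc123.log"; without a unique_id it is
    # simply "/tmp/openhcs_run.log".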

    # PROCESS GROUP CLEANUP: Create new process group to manage all child processes
    try:
        import os
        import signal

        # Create new process group with this process as leader
        os.setpgrp()  # Create new process group
        process_group_id = os.getpgrp()

        print(f"🔥 SUBPROCESS: Created process group {process_group_id}")

        # Track all child processes for cleanup
        child_processes = set()

        def kill_all_children():
            """Kill all child processes and the entire process group."""
            try:
                print(f"🔥 SUBPROCESS: Killing process group {process_group_id}")
                # Kill entire process group (negative PID kills process group)
                os.killpg(process_group_id, signal.SIGTERM)

                # Give processes time to exit gracefully
                import time
                time.sleep(2)

                # Force kill if still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # Already dead

                print(f"🔥 SUBPROCESS: Process group {process_group_id} terminated")
            except Exception as e:
                print(f"🔥 SUBPROCESS: Error killing process group: {e}")

        # Register cleanup function
        atexit.register(kill_all_children)

    except Exception as e:
        print(f"🔥 SUBPROCESS: Warning - Could not set up process group cleanup: {e}")

    # Set up logging first
    logger = setup_subprocess_logging(log_file)
    logger.info("🔥 SUBPROCESS: Starting OpenHCS subprocess runner")
    logger.info(f"🔥 SUBPROCESS: Args - data: {data_file}, log: {log_file}")
    logger.info(f"🔥 SUBPROCESS: Log file: {log_file}")

    # DEATH DETECTION: Set up heartbeat monitoring
    import threading
    import time

    def heartbeat_monitor():
        """Monitor that writes heartbeats to detect where process dies."""
        heartbeat_count = 0
        while True:
            try:
                heartbeat_count += 1
                heartbeat_msg = f"🔥 SUBPROCESS HEARTBEAT #{heartbeat_count}: Process alive at {time.time()}"
                logger.info(heartbeat_msg)
                print(heartbeat_msg)

                # Heartbeat logged to log file (single source of truth)
                # No separate heartbeat file needed

                time.sleep(2)  # Heartbeat every 2 seconds
            except Exception as monitor_error:
                logger.error(f"🔥 SUBPROCESS: Heartbeat monitor error: {monitor_error}")
                print(f"🔥 SUBPROCESS STDOUT: Heartbeat monitor error: {monitor_error}")
                break

    # Start heartbeat monitor in daemon thread
    heartbeat_thread = threading.Thread(target=heartbeat_monitor, daemon=True)
    heartbeat_thread.start()
    logger.info("🔥 SUBPROCESS: Heartbeat monitor started")

    # NUCLEAR CRASH DETECTION - catch EVERYTHING
    def crash_handler(signum, frame):
        crash_msg = f"🔥 SUBPROCESS: CRASH DETECTED - Signal {signum} received!"
        logger.error(crash_msg)
        print(f"🔥 SUBPROCESS STDOUT CRASH: {crash_msg}")

        # Crash info logged to log file (single source of truth)

        # Dump stack trace
        try:
            import traceback
            import threading
            logger.error("🔥 SUBPROCESS: CRASH - Dumping all thread stacks...")
            for thread_id, frame in sys._current_frames().items():
                logger.error(f"🔥 SUBPROCESS: CRASH Thread {thread_id}:")
                traceback.print_stack(frame)
        except:
            pass

        # Force exit
        os._exit(1)

    # Set up signal handlers for all possible crashes
    signal.signal(signal.SIGSEGV, crash_handler)  # Segmentation fault
    signal.signal(signal.SIGABRT, crash_handler)  # Abort
    signal.signal(signal.SIGFPE, crash_handler)   # Floating point exception
    signal.signal(signal.SIGILL, crash_handler)   # Illegal instruction
    signal.signal(signal.SIGTERM, crash_handler)  # Termination
    signal.signal(signal.SIGINT, crash_handler)   # Interrupt (Ctrl+C)

    # Set up atexit handler to catch silent deaths
    def exit_handler():
        logger.error("🔥 SUBPROCESS: ATEXIT - Process is exiting!")
        print("🔥 SUBPROCESS STDOUT: ATEXIT - Process is exiting!")
        # Exit info logged to log file (single source of truth)

    atexit.register(exit_handler)

    # Set up debug signal handler
    def debug_handler(signum, frame):
        logger.error("🔥 SUBPROCESS: SIGUSR1 received - dumping stack trace")
        import traceback
        import threading

        # Dump all thread stacks
        for thread_id, frame in sys._current_frames().items():
            logger.error(f"🔥 SUBPROCESS: Thread {thread_id} stack:")
            traceback.print_stack(frame)

        # Log thread info
        for thread in threading.enumerate():
            logger.error(f"🔥 SUBPROCESS: Thread: {thread.name}, alive: {thread.is_alive()}")

    signal.signal(signal.SIGUSR1, debug_handler)
    logger.info("🔥 SUBPROCESS: NUCLEAR CRASH DETECTION ENABLED - All signals monitored")

    try:
        # Load pickled data
        logger.info(f"🔥 SUBPROCESS: Loading data from {data_file}")
        with open(data_file, 'rb') as f:
            data = pickle.load(f)

        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']  # Dict[plate_path, List[FunctionStep]]
        global_config = data['global_config']

        logger.info(f"🔥 SUBPROCESS: Loaded data for {len(plate_paths)} plates")
        logger.info(f"🔥 SUBPROCESS: Plates: {plate_paths}")

        # Process each plate (like test_main.py but for multiple plates)
        for plate_path in plate_paths:
            pipeline_steps = pipeline_data[plate_path]
            logger.info(f"🔥 SUBPROCESS: Processing plate {plate_path} with {len(pipeline_steps)} steps")

            run_single_plate(
                plate_path=plate_path,
                pipeline_steps=pipeline_steps,
                global_config=global_config,
                logger=logger,
                log_file_base=log_file_base
            )

        logger.info("🔥 SUBPROCESS: All plates completed successfully")

    except Exception as e:
        logger.error(f"🔥 SUBPROCESS: Fatal error: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT FATAL: {e}")
        print(f"🔥 SUBPROCESS STDOUT FATAL TRACEBACK: {traceback.format_exc()}")
        # Error logged to log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Fatal error for all plates: {e}")
        sys.exit(1)
    except BaseException as e:
        # Catch EVERYTHING including SystemExit, KeyboardInterrupt, etc.
        logger.error(f"🔥 SUBPROCESS: CRITICAL SYSTEM ERROR: {e}", exc_info=True)
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM: {e}")
        print(f"🔥 SUBPROCESS STDOUT CRITICAL SYSTEM TRACEBACK: {traceback.format_exc()}")
        # Critical error logged to log file (single source of truth)
        logger.error(f"🔥 SUBPROCESS: Critical system error for all plates: {e}")
        sys.exit(2)


if __name__ == "__main__":
    main()