Coverage for openhcs/debug/export.py: 0.0%
69 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1#!/usr/bin/env python3
2"""
3OpenHCS Pipeline Script - Generated from tmplsq2uc32.pkl
4Generated: 2025-07-21 21:34:24.776151
5"""
7import sys
8import os
9from pathlib import Path
11# Add OpenHCS to path
12sys.path.insert(0, "/home/ts/code/projects/openhcs")
14from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
15from openhcs.core.steps.function_step import FunctionStep
16from openhcs.core.config import (GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig,
17 MaterializationBackend, ZarrCompressor, ZarrChunkStrategy)
18from openhcs.constants.constants import VariableComponents, Backend, Microscope
20# Function and Enum imports
21from openhcs.processing.backends.analysis.cell_counting_cpu import DetectionMethod, count_cells_single_channel
22from openhcs.processing.backends.analysis.skan_axon_analysis import AnalysisDimension, skan_axon_skeletonize_and_analyze
23from openhcs.processing.backends.assemblers.assemble_stack_cupy import assemble_stack_cupy
24from openhcs.processing.backends.pos_gen.ashlar_main_gpu import ashlar_compute_tile_positions_gpu
25from openhcs.processing.backends.processors.cupy_processor import create_composite, stack_percentile_normalize, tophat
26from openhcs.processing.backends.processors.torch_processor import stack_percentile_normalize
28def create_pipeline():
29 """Create and return the pipeline configuration."""
31 # Plate paths
32 plate_paths = ['/home/ts/nvme_usb/IMX/20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318']
34 # Global configuration
35 global_config = GlobalPipelineConfig(
36 num_workers=5,
37 path_planning=PathPlanningConfig(
38 output_dir_suffix="_stitched",
39 global_output_folder="/home/ts/nvme_usb/OpenHCS/",
40 materialization_results_path="results"
41 ),
42 vfs=VFSConfig(
43 intermediate_backend=Backend.MEMORY,
44 materialization_backend=MaterializationBackend.ZARR
45 ),
46 zarr=ZarrConfig(
47 compressor=ZarrCompressor.ZSTD,
48 compression_level=1,
49 chunk_strategy=ZarrChunkStrategy.WELL
50 ),
51 microscope=Microscope.AUTO,
52 use_threading=None
53 )
55 # Pipeline steps
56 pipeline_data = {}
58 # Steps for plate: 20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318
59 steps = []
61 # Step 1: preprocess1
62 step_1 = FunctionStep(
63 func=[
64 (stack_percentile_normalize, {
65 'low_percentile': 1.0,
66 'high_percentile': 99.0,
67 'target_max': 65535.0
68 }),
69 (tophat, {
70 'selem_radius': 50,
71 'downsample_factor': 4
72 })
73 ],
74 name="preprocess1",
75 variable_components=[VariableComponents.SITE],
76 force_disk_output=False
77 )
78 steps.append(step_1)
80 # Step 2: composite
81 step_2 = FunctionStep(
82 func=[
83 (create_composite, {})
84 ],
85 name="composite",
86 variable_components=[VariableComponents.CHANNEL],
87 force_disk_output=False
88 )
89 steps.append(step_2)
91 # Step 3: find_stitch_positions
92 step_3 = FunctionStep(
93 func=[
94 (ashlar_compute_tile_positions_gpu, {
95 'overlap_ratio': 0.1,
96 'max_shift': 15.0,
97 'stitch_alpha': 0.2,
98 'upsample_factor': 10,
99 'permutation_upsample': 1,
100 'permutation_samples': 1000,
101 'min_permutation_samples': 10,
102 'max_permutation_tries': 100,
103 'window_size_factor': 0.1
104 })
105 ],
106 name="find_stitch_positions",
107 variable_components=[VariableComponents.SITE],
108 force_disk_output=False
109 )
110 steps.append(step_3)
112 # Step 4: preprocess2
113 step_4 = FunctionStep(
114 func=[
115 (stack_percentile_normalize, {
116 'low_percentile': 1.0,
117 'high_percentile': 99.0,
118 'target_max': 65535.0
119 }),
120 (tophat, {
121 'selem_radius': 50,
122 'downsample_factor': 4
123 })
124 ],
125 name="preprocess2",
126 variable_components=[VariableComponents.SITE],
127 force_disk_output=False
128 )
129 steps.append(step_4)
131 # Step 5: assemble
132 step_5 = FunctionStep(
133 func=[
134 (assemble_stack_cupy, {
135 'blend_method': "fixed",
136 'fixed_margin_ratio': 0.1,
137 'overlap_blend_fraction': 1.0
138 })
139 ],
140 name="assemble",
141 variable_components=[VariableComponents.SITE],
142 force_disk_output=True
143 )
144 steps.append(step_5)
146 # Step 6: skan
147 step_6 = FunctionStep(
148 func={ '1': [
149 (count_cells_single_channel, {
150 'min_sigma': 1.0,
151 'max_sigma': 10.0,
152 'num_sigma': 10,
153 'threshold': 0.1,
154 'overlap': 0.5,
155 'watershed_footprint_size': 3,
156 'watershed_min_distance': 5,
157 'gaussian_sigma': 1.0,
158 'median_disk_size': 1,
159 'min_cell_area': 30,
160 'max_cell_area': 200,
161 'detection_method': DetectionMethod.WATERSHED
162 })
163 ],
164 '2': [
165 (skan_axon_skeletonize_and_analyze, {
166 'voxel_spacing': (1.0, 1.0, 1.0),
167 'min_object_size': 100,
168 'min_branch_length': 10.0,
169 'analysis_dimension': AnalysisDimension.TWO_D
170 })
171 ]
172 },
173 name="skan",
174 variable_components=[VariableComponents.SITE],
175 force_disk_output=False
176 )
177 steps.append(step_6)
179 pipeline_data["/home/ts/nvme_usb/IMX/20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318"] = steps
181 return plate_paths, pipeline_data, global_config
183def setup_signal_handlers():
184 """Setup signal handlers to kill all child processes and threads on Ctrl+C."""
185 import signal
186 import os
188 def cleanup_and_exit(signum, frame):
189 print(f"\n🔥 Signal {signum} received! Cleaning up all processes and threads...")
191 os._exit(1)
193 signal.signal(signal.SIGINT, cleanup_and_exit)
194 signal.signal(signal.SIGTERM, cleanup_and_exit)
196def run_pipeline():
197 os.environ["OPENHCS_SUBPROCESS_MODE"] = "1"
198 plate_paths, pipeline_data, global_config = create_pipeline()
199 from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
200 setup_global_gpu_registry(global_config=global_config)
201 for plate_path in plate_paths:
202 orchestrator = PipelineOrchestrator(plate_path)
203 orchestrator.initialize()
204 compiled_contexts = orchestrator.compile_pipelines(pipeline_data[plate_path])
205 orchestrator.execute_compiled_plate(
206 pipeline_definition=pipeline_data[plate_path],
207 compiled_contexts=compiled_contexts,
208 max_workers=global_config.num_workers
209 )
211def export_debug_data(subprocess_data, output_path, data_file_path=None, log_file_path=None):
212 """
213 Export debug data including subprocess data, data files, and log files.
215 Args:
216 subprocess_data: The subprocess data to export
217 output_path: Base path for exported files
218 data_file_path: Optional path to data file to copy
219 log_file_path: Optional path to log file to copy
221 Returns:
222 Dict of exported file paths
223 """
224 import pickle
225 import shutil
227 output_path = Path(output_path)
228 exported_files = {}
230 # Export subprocess data as pickle
231 pickle_path = output_path.with_suffix('.pkl')
232 with open(pickle_path, 'wb') as f:
233 pickle.dump(subprocess_data, f)
234 exported_files['subprocess_data'] = pickle_path
236 # Copy data file if provided
237 if data_file_path and Path(data_file_path).exists():
238 data_dest = output_path.with_suffix('.data')
239 shutil.copy2(data_file_path, data_dest)
240 exported_files['data_file'] = data_dest
242 # Copy log file if provided
243 if log_file_path and Path(log_file_path).exists():
244 log_dest = output_path.with_suffix('.log')
245 shutil.copy2(log_file_path, log_dest)
246 exported_files['log_file'] = log_dest
248 return exported_files
251if __name__ == "__main__":
252 setup_signal_handlers()
253 run_pipeline()