Coverage for openhcs/debug/export.py: 0.0% (72 statements)
#!/usr/bin/env python3
"""
OpenHCS Pipeline Script - Generated from tmplsq2uc32.pkl
Generated: 2025-07-21 21:34:24.776151
"""

import sys
import os
from pathlib import Path

# Add OpenHCS to path
sys.path.insert(0, "/home/ts/code/projects/openhcs")

from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
from openhcs.core.steps.function_step import FunctionStep
from openhcs.core.config import (GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig,
                                 MaterializationBackend, ZarrCompressor, ZarrChunkStrategy)
from openhcs.constants.constants import VariableComponents, Backend, Microscope

# Function and Enum imports
from openhcs.processing.backends.analysis.cell_counting_cpu import DetectionMethod, count_cells_single_channel
from openhcs.processing.backends.analysis.skan_axon_analysis import AnalysisDimension, skan_axon_skeletonize_and_analyze
from openhcs.processing.backends.assemblers.assemble_stack_cupy import assemble_stack_cupy
from openhcs.processing.backends.pos_gen.ashlar_main_gpu import ashlar_compute_tile_positions_gpu
from openhcs.processing.backends.processors.cupy_processor import create_composite, stack_percentile_normalize, tophat
# NOTE: this torch import shadows the cupy stack_percentile_normalize imported above,
# so the torch implementation is the one the steps below actually use.
from openhcs.processing.backends.processors.torch_processor import stack_percentile_normalize


def create_pipeline():
    """Create and return the pipeline configuration."""

    # Plate paths
    plate_paths = ['/home/ts/nvme_usb/IMX/20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318']

    # Global configuration
    global_config = GlobalPipelineConfig(
        num_workers=5,
        path_planning=PathPlanningConfig(
            output_dir_suffix="_stitched",
            global_output_folder="/home/ts/nvme_usb/OpenHCS/",
            materialization_results_path="results"
        ),
        vfs=VFSConfig(
            intermediate_backend=Backend.MEMORY,
            materialization_backend=MaterializationBackend.ZARR
        ),
        zarr=ZarrConfig(
            store_name="images.zarr",
            compressor=ZarrCompressor.ZSTD,
            compression_level=1,
            shuffle=True,
            chunk_strategy=ZarrChunkStrategy.SINGLE,
            ome_zarr_metadata=True,
            write_plate_metadata=True
        ),
        microscope=Microscope.AUTO,
        use_threading=None
    )

    # Pipeline steps
    pipeline_data = {}

    # Steps for plate: 20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318
    steps = []

    # Step 1: preprocess1
    step_1 = FunctionStep(
        func=[
            (stack_percentile_normalize, {
                'low_percentile': 1.0,
                'high_percentile': 99.0,
                'target_max': 65535.0
            }),
            (tophat, {
                'selem_radius': 50,
                'downsample_factor': 4
            })
        ],
        name="preprocess1",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_1)
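
    # Note (inferred from this script's structure, not from OpenHCS documentation):
    # func is a list of (callable, kwargs) pairs chained in order over each image
    # stack, with variable_components=[SITE] grouping one stack per imaging site.
    # Steps 2-5 below reuse the same shape.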

    # Step 2: composite
    step_2 = FunctionStep(
        func=[
            (create_composite, {})
        ],
        name="composite",
        variable_components=[VariableComponents.CHANNEL],
        force_disk_output=False
    )
    steps.append(step_2)

    # Step 3: find_stitch_positions
    step_3 = FunctionStep(
        func=[
            (ashlar_compute_tile_positions_gpu, {
                'overlap_ratio': 0.1,
                'max_shift': 15.0,
                'stitch_alpha': 0.2,
                'upsample_factor': 10,
                'permutation_upsample': 1,
                'permutation_samples': 1000,
                'min_permutation_samples': 10,
                'max_permutation_tries': 100,
                'window_size_factor': 0.1
            })
        ],
        name="find_stitch_positions",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_3)

    # Step 4: preprocess2
    step_4 = FunctionStep(
        func=[
            (stack_percentile_normalize, {
                'low_percentile': 1.0,
                'high_percentile': 99.0,
                'target_max': 65535.0
            }),
            (tophat, {
                'selem_radius': 50,
                'downsample_factor': 4
            })
        ],
        name="preprocess2",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_4)

    # Step 5: assemble
    step_5 = FunctionStep(
        func=[
            (assemble_stack_cupy, {
                'blend_method': "fixed",
                'fixed_margin_ratio': 0.1,
                'overlap_blend_fraction': 1.0
            })
        ],
        name="assemble",
        variable_components=[VariableComponents.SITE],
        force_disk_output=True
    )
    steps.append(step_5)
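
    # Note: this is the only step with force_disk_output=True; presumably the
    # assembled mosaics are written through the ZARR materialization backend
    # configured in global_config above (an inference from this script, not a
    # documented guarantee).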

    # Step 6: skan
    step_6 = FunctionStep(
        func={
            '1': [
                (count_cells_single_channel, {
                    'min_sigma': 1.0,
                    'max_sigma': 10.0,
                    'num_sigma': 10,
                    'threshold': 0.1,
                    'overlap': 0.5,
                    'watershed_footprint_size': 3,
                    'watershed_min_distance': 5,
                    'gaussian_sigma': 1.0,
                    'median_disk_size': 1,
                    'min_cell_area': 30,
                    'max_cell_area': 200,
                    'detection_method': DetectionMethod.WATERSHED
                })
            ],
            '2': [
                (skan_axon_skeletonize_and_analyze, {
                    'voxel_spacing': (1.0, 1.0, 1.0),
                    'min_object_size': 100,
                    'min_branch_length': 10.0,
                    'analysis_dimension': AnalysisDimension.TWO_D
                })
            ]
        },
        name="skan",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_6)
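
    # Note: unlike steps 1-5, func here is a dict keyed by channel value, so each
    # channel gets its own chain: '1' runs watershed cell counting while '2' runs
    # skeletonization/axon analysis (routing inferred from the structure above).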

    pipeline_data["/home/ts/nvme_usb/IMX/20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318"] = steps

    return plate_paths, pipeline_data, global_config


def setup_signal_handlers():
    """Set up signal handlers to kill all child processes and threads on Ctrl+C."""
    import signal

    def cleanup_and_exit(signum, frame):
        print(f"\n🔥 Signal {signum} received! Cleaning up all processes and threads...")
        # os._exit skips interpreter cleanup so lingering worker processes and
        # threads cannot block shutdown.
        os._exit(1)

    signal.signal(signal.SIGINT, cleanup_and_exit)
    signal.signal(signal.SIGTERM, cleanup_and_exit)


def run_pipeline():
    os.environ["OPENHCS_SUBPROCESS_MODE"] = "1"
    plate_paths, pipeline_data, global_config = create_pipeline()

    from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
    setup_global_gpu_registry(global_config=global_config)

    for plate_path in plate_paths:
        orchestrator = PipelineOrchestrator(plate_path, global_config=global_config)
        orchestrator.initialize()
        compiled_contexts = orchestrator.compile_pipelines(pipeline_data[plate_path])
        orchestrator.execute_compiled_plate(
            pipeline_definition=pipeline_data[plate_path],
            compiled_contexts=compiled_contexts,
            max_workers=global_config.num_workers
        )
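
# Note: execution is two-phase per plate: compile_pipelines() builds compiled
# contexts from the step list, then execute_compiled_plate() runs them with up to
# global_config.num_workers workers (all calls as used above; semantics inferred
# from this script alone).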


def export_debug_data(subprocess_data, output_path, data_file_path=None, log_file_path=None):
    """
    Export debug data including subprocess data, data files, and log files.

    Args:
        subprocess_data: The subprocess data to export
        output_path: Base path for exported files
        data_file_path: Optional path to data file to copy
        log_file_path: Optional path to log file to copy

    Returns:
        Dict of exported file paths
    """
    import pickle
    import shutil

    output_path = Path(output_path)
    exported_files = {}

    # Export subprocess data as pickle
    pickle_path = output_path.with_suffix('.pkl')
    with open(pickle_path, 'wb') as f:
        pickle.dump(subprocess_data, f)
    exported_files['subprocess_data'] = pickle_path

    # Copy data file if provided
    if data_file_path and Path(data_file_path).exists():
        data_dest = output_path.with_suffix('.data')
        shutil.copy2(data_file_path, data_dest)
        exported_files['data_file'] = data_dest

    # Copy log file if provided
    if log_file_path and Path(log_file_path).exists():
        log_dest = output_path.with_suffix('.log')
        shutil.copy2(log_file_path, log_dest)
        exported_files['log_file'] = log_dest

    return exported_files
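
# Example usage of export_debug_data (a sketch with hypothetical paths; since the
# destinations are derived via Path.with_suffix, "session" below becomes
# session.pkl / session.data / session.log):
#
#     exported = export_debug_data(
#         subprocess_data={"plate": "Plate_23318", "status": "failed"},
#         output_path="/tmp/openhcs_debug/session",
#         data_file_path="/tmp/openhcs_debug/raw_capture.bin",
#         log_file_path="/tmp/openhcs_debug/run.log",
#     )
#     # exported == {'subprocess_data': PosixPath('/tmp/openhcs_debug/session.pkl'),
#     #              'data_file': PosixPath('/tmp/openhcs_debug/session.data'),
#     #              'log_file': PosixPath('/tmp/openhcs_debug/session.log')}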


if __name__ == "__main__":
    setup_signal_handlers()
    run_pipeline()