Coverage for openhcs/debug/export.py: 0.0% (69 statements)


#!/usr/bin/env python3
"""
OpenHCS Pipeline Script - Generated from tmplsq2uc32.pkl
Generated: 2025-07-21 21:34:24.776151
"""

import sys
import os
from pathlib import Path

# Add OpenHCS to path
sys.path.insert(0, "/home/ts/code/projects/openhcs")

from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
from openhcs.core.steps.function_step import FunctionStep
from openhcs.core.config import (GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig,
                                 MaterializationBackend, ZarrCompressor, ZarrChunkStrategy)
from openhcs.constants.constants import VariableComponents, Backend, Microscope

# Function and Enum imports
from openhcs.processing.backends.analysis.cell_counting_cpu import DetectionMethod, count_cells_single_channel
from openhcs.processing.backends.analysis.skan_axon_analysis import AnalysisDimension, skan_axon_skeletonize_and_analyze
from openhcs.processing.backends.assemblers.assemble_stack_cupy import assemble_stack_cupy
from openhcs.processing.backends.pos_gen.ashlar_main_gpu import ashlar_compute_tile_positions_gpu
from openhcs.processing.backends.processors.cupy_processor import create_composite, stack_percentile_normalize, tophat
# NOTE: this import rebinds stack_percentile_normalize, shadowing the
# cupy_processor version imported above; the torch implementation is the one
# the steps below actually call.
from openhcs.processing.backends.processors.torch_processor import stack_percentile_normalize


def create_pipeline():
    """Create and return the pipeline configuration."""

    # Plate paths
    plate_paths = ['/home/ts/nvme_usb/IMX/20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318']

    # Global configuration
    global_config = GlobalPipelineConfig(
        num_workers=5,
        path_planning=PathPlanningConfig(
            output_dir_suffix="_stitched",
            global_output_folder="/home/ts/nvme_usb/OpenHCS/",
            materialization_results_path="results"
        ),
        vfs=VFSConfig(
            intermediate_backend=Backend.MEMORY,
            materialization_backend=MaterializationBackend.ZARR
        ),
        zarr=ZarrConfig(
            compressor=ZarrCompressor.ZSTD,
            compression_level=1,
            chunk_strategy=ZarrChunkStrategy.WELL
        ),
        microscope=Microscope.AUTO,
        use_threading=None
    )
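
    # Config note: intermediate step outputs stay in memory (Backend.MEMORY);
    # only materialized outputs hit disk, as ZSTD-compressed Zarr (level 1)
    # chunked per well. Microscope format is auto-detected, and
    # use_threading=None presumably defers the thread-vs-process choice to the
    # orchestrator's default.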

    # Pipeline steps
    pipeline_data = {}

    # Steps for plate: 20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318
    steps = []

    # Step 1: preprocess1
    step_1 = FunctionStep(
        func=[
            (stack_percentile_normalize, {
                'low_percentile': 1.0,
                'high_percentile': 99.0,
                'target_max': 65535.0
            }),
            (tophat, {
                'selem_radius': 50,
                'downsample_factor': 4
            })
        ],
        name="preprocess1",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_1)
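
    # Pattern note: as used in this script, a list-valued func chains the
    # (callable, kwargs) pairs in order over each stack selected by
    # variable_components (here, one stack per SITE).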

    # Step 2: composite
    step_2 = FunctionStep(
        func=[
            (create_composite, {})
        ],
        name="composite",
        variable_components=[VariableComponents.CHANNEL],
        force_disk_output=False
    )
    steps.append(step_2)

    # Step 3: find_stitch_positions
    step_3 = FunctionStep(
        func=[
            (ashlar_compute_tile_positions_gpu, {
                'overlap_ratio': 0.1,
                'max_shift': 15.0,
                'stitch_alpha': 0.2,
                'upsample_factor': 10,
                'permutation_upsample': 1,
                'permutation_samples': 1000,
                'min_permutation_samples': 10,
                'max_permutation_tries': 100,
                'window_size_factor': 0.1
            })
        ],
        name="find_stitch_positions",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_3)
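
    # Parameter note (going by the Ashlar backend's naming): overlap_ratio is
    # the expected fractional tile overlap, max_shift bounds the registration
    # shift the aligner will accept, and the permutation_* parameters tune its
    # permutation-based significance test.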

    # Step 4: preprocess2
    step_4 = FunctionStep(
        func=[
            (stack_percentile_normalize, {
                'low_percentile': 1.0,
                'high_percentile': 99.0,
                'target_max': 65535.0
            }),
            (tophat, {
                'selem_radius': 50,
                'downsample_factor': 4
            })
        ],
        name="preprocess2",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_4)

    # Step 5: assemble
    step_5 = FunctionStep(
        func=[
            (assemble_stack_cupy, {
                'blend_method': "fixed",
                'fixed_margin_ratio': 0.1,
                'overlap_blend_fraction': 1.0
            })
        ],
        name="assemble",
        variable_components=[VariableComponents.SITE],
        force_disk_output=True
    )
    steps.append(step_5)
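
    # Parameter note: blend_method="fixed" with fixed_margin_ratio=0.1 appears
    # to feather a fixed 10% margin of each tile, with overlap_blend_fraction=1.0
    # blending across the full overlap. force_disk_output=True makes this the
    # step whose results are materialized (to Zarr, per the VFS config above).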

    # Step 6: skan
    step_6 = FunctionStep(
        func={
            '1': [
                (count_cells_single_channel, {
                    'min_sigma': 1.0,
                    'max_sigma': 10.0,
                    'num_sigma': 10,
                    'threshold': 0.1,
                    'overlap': 0.5,
                    'watershed_footprint_size': 3,
                    'watershed_min_distance': 5,
                    'gaussian_sigma': 1.0,
                    'median_disk_size': 1,
                    'min_cell_area': 30,
                    'max_cell_area': 200,
                    'detection_method': DetectionMethod.WATERSHED
                })
            ],
            '2': [
                (skan_axon_skeletonize_and_analyze, {
                    'voxel_spacing': (1.0, 1.0, 1.0),
                    'min_object_size': 100,
                    'min_branch_length': 10.0,
                    'analysis_dimension': AnalysisDimension.TWO_D
                })
            ]
        },
        name="skan",
        variable_components=[VariableComponents.SITE],
        force_disk_output=False
    )
    steps.append(step_6)
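
    # Pattern note: a dict-valued func fans out by component value; the keys
    # here look like channel IDs, sending channel '1' to watershed-based cell
    # counting and channel '2' to skan skeletonization and branch analysis.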

    pipeline_data["/home/ts/nvme_usb/IMX/20250528-new-f04-analogs-n1-2-Plate-1_Plate_23318"] = steps

    return plate_paths, pipeline_data, global_config


def setup_signal_handlers():
    """Set up signal handlers to kill all child processes and threads on Ctrl+C."""
    import signal

    def cleanup_and_exit(signum, frame):
        print(f"\n🔥 Signal {signum} received! Cleaning up all processes and threads...")
        # os._exit ends the interpreter immediately, skipping atexit handlers
        # and not waiting for non-daemon threads.
        os._exit(1)

    signal.signal(signal.SIGINT, cleanup_and_exit)
    signal.signal(signal.SIGTERM, cleanup_and_exit)


def run_pipeline():
    os.environ["OPENHCS_SUBPROCESS_MODE"] = "1"
    plate_paths, pipeline_data, global_config = create_pipeline()
    from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
    setup_global_gpu_registry(global_config=global_config)
    for plate_path in plate_paths:
        orchestrator = PipelineOrchestrator(plate_path)
        orchestrator.initialize()
        compiled_contexts = orchestrator.compile_pipelines(pipeline_data[plate_path])
        orchestrator.execute_compiled_plate(
            pipeline_definition=pipeline_data[plate_path],
            compiled_contexts=compiled_contexts,
            max_workers=global_config.num_workers
        )
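
# Flow note: for each plate, compile_pipelines() builds the compiled contexts
# that execute_compiled_plate() then executes, with parallelism taken from
# global_config.num_workers.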


def export_debug_data(subprocess_data, output_path, data_file_path=None, log_file_path=None):
    """
    Export debug data including subprocess data, data files, and log files.

    Args:
        subprocess_data: The subprocess data to export
        output_path: Base path for exported files
        data_file_path: Optional path to data file to copy
        log_file_path: Optional path to log file to copy

    Returns:
        Dict of exported file paths
    """
    import pickle
    import shutil

    output_path = Path(output_path)
    exported_files = {}

    # Export subprocess data as pickle
    pickle_path = output_path.with_suffix('.pkl')
    with open(pickle_path, 'wb') as f:
        pickle.dump(subprocess_data, f)
    exported_files['subprocess_data'] = pickle_path

    # Copy data file if provided
    if data_file_path and Path(data_file_path).exists():
        data_dest = output_path.with_suffix('.data')
        shutil.copy2(data_file_path, data_dest)
        exported_files['data_file'] = data_dest

    # Copy log file if provided
    if log_file_path and Path(log_file_path).exists():
        log_dest = output_path.with_suffix('.log')
        shutil.copy2(log_file_path, log_dest)
        exported_files['log_file'] = log_dest

    return exported_files
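
# Example usage of export_debug_data (hypothetical paths, for illustration).
# Path.with_suffix() replaces any existing suffix on output_path, so a base of
# "session" yields session.pkl / session.data / session.log side by side:
#
#     exported = export_debug_data(
#         subprocess_data={"plate": "Plate_23318", "error": "stitch failure"},
#         output_path="/tmp/openhcs_debug/session",
#         data_file_path="/tmp/openhcs_debug/input_stack.npy",
#         log_file_path="/tmp/openhcs_debug/run.log",
#     )
#     # exported["subprocess_data"] == PosixPath("/tmp/openhcs_debug/session.pkl")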


if __name__ == "__main__":
    setup_signal_handlers()
    run_pipeline()