Coverage for ezstitcher/core/pipeline_orchestrator.py: 89%

170 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2025-04-30 13:20 +0000

1import logging 

2import os 

3import copy 

4import time 

5import threading 

6import concurrent.futures 

7from pathlib import Path 

8 

9 

10from ezstitcher.core.microscope_interfaces import create_microscope_handler 

11from ezstitcher.core.stitcher import Stitcher 

12from ezstitcher.core.file_system_manager import FileSystemManager 

13from ezstitcher.core.image_processor import ImageProcessor 

14from ezstitcher.core.focus_analyzer import FocusAnalyzer 

15from ezstitcher.core.config import PipelineConfig 

16 

17# Import the pipeline architecture 

18from ezstitcher.core.pipeline import Step, Pipeline 

19 

# Module-level logger; configuration is left to the application.
logger = logging.getLogger(__name__)

# Default zero-padding width for site and z-index fields in constructed filenames.
DEFAULT_PADDING = 3

23 

class PipelineOrchestrator:
    """Orchestrates the complete image processing and stitching pipeline.

    Responsibilities: workspace initialization, image preparation, well
    discovery, parallel per-well pipeline execution, position generation
    and stitched-image assembly.
    """

    def __init__(self, plate_path=None, workspace_path=None, config=None,
                 fs_manager=None, image_preprocessor=None, focus_analyzer=None):
        """
        Initialize the pipeline orchestrator.

        Args:
            plate_path: Path to the plate folder. Required; the ``None``
                default exists only for signature compatibility.
            workspace_path: Workspace directory. Defaults to a sibling of the
                plate folder named "<plate_name>_workspace".
            config: Pipeline configuration (defaults to ``PipelineConfig()``).
            fs_manager: File system manager (defaults to ``FileSystemManager()``).
            image_preprocessor: Image preprocessor (defaults to ``ImageProcessor()``).
            focus_analyzer: Focus analyzer (defaults to ``FocusAnalyzer()``).

        Raises:
            ValueError: If ``plate_path`` is not provided.
        """
        if plate_path is None:
            # Path(None) would raise an opaque TypeError; fail with a clear message.
            raise ValueError("plate_path is required")

        self.config = config or PipelineConfig()
        self.plate_path = Path(plate_path)
        self.fs_manager = fs_manager or FileSystemManager()

        # Determine workspace path
        if workspace_path:
            workspace_path_to_use = workspace_path
        else:
            workspace_path_to_use = self.plate_path.parent / f"{self.plate_path.name}_workspace"

        # Convert to Path
        self.workspace_path = Path(workspace_path_to_use)

        logger.info("Detecting microscope type")
        self.microscope_handler = create_microscope_handler('auto', plate_folder=self.plate_path)
        # Log the resolved path, not the raw argument (which may be None).
        logger.info("Initializing workspace: %s", self.workspace_path)
        self.microscope_handler.init_workspace(self.plate_path, self.workspace_path)

        logger.info("Preparing images through renaming and dir flattening")
        self.input_dir = self.prepare_images(self.workspace_path)

        self.stitcher = Stitcher(self.config.stitcher, filename_parser=self.microscope_handler.parser)
        self.image_preprocessor = image_preprocessor or ImageProcessor()

        # Initialize focus analyzer
        self.focus_analyzer = focus_analyzer or FocusAnalyzer()

    def run(self, pipelines=None):
        """
        Process a plate through the complete pipeline.

        Args:
            pipelines: List of pipelines to run for each well. Each well
                receives its own deep copy for thread safety.

        Returns:
            bool: True if successful, False otherwise.
        """
        try:
            # Setup: read grid/pixel metadata from the workspace
            self.config.grid_size = self.microscope_handler.get_grid_dimensions(self.workspace_path)
            logger.info("Grid size: %s", self.config.grid_size)
            self.config.pixel_size = self.microscope_handler.get_pixel_size(self.workspace_path)
            logger.info("Pixel size: %s", self.config.pixel_size)

            # Directory setup is handled within pipelines now.

            # Get wells to process
            wells = self._get_wells_to_process()

            # Process wells using ThreadPoolExecutor
            num_workers = self.config.num_workers
            # Never ask for more workers than there are wells (at least 1)
            effective_workers = min(num_workers, len(wells)) if wells else 1

            # Check if pipelines are provided
            if pipelines:
                logger.info("Using provided pipelines for processing")
            else:
                logger.info("No pipelines provided, using pipeline functions")

            logger.info(
                "Processing %d wells using %d worker threads",
                len(wells),
                effective_workers
            )

            # Create a thread pool with the appropriate number of workers
            with concurrent.futures.ThreadPoolExecutor(max_workers=effective_workers) as executor:
                # Submit one task per well, keeping a future -> well mapping
                future_to_well = {}
                logger.info("About to submit %d wells for parallel processing", len(wells))
                for well in wells:
                    # Deep copy pipelines for thread safety as they might be
                    # stateful and lack a specific clone method.
                    copied_pipelines = [copy.deepcopy(p) for p in pipelines] if pipelines else []

                    logger.info("Submitting well %s to thread pool", well)
                    future = executor.submit(
                        self.process_well,
                        well,
                        copied_pipelines  # Pass copied pipelines
                    )
                    future_to_well[future] = well

                # Process results as they complete
                for future in concurrent.futures.as_completed(future_to_well):
                    well = future_to_well[future]
                    try:
                        future.result()  # Re-raises any exception from the worker
                        logger.info("Completed processing well %s", well)
                    except Exception as e:
                        logger.error("Error processing well %s: %s", well, e, exc_info=True)

            # Final cleanup after all wells have been processed
            # Cleanup is now handled by individual pipelines
            return True

        except Exception as e:
            logger.error("Pipeline failed with unexpected error: %s", str(e))
            logger.debug("Exception details:", exc_info=True)
            return False

    def _get_wells_to_process(self):
        """
        Get the list of wells to process based on the configured well filter.

        Wells are auto-detected by parsing every image filename found
        (recursively) under ``self.input_dir``.

        Returns:
            list: Well identifiers; filtered case-insensitively by
            ``self.config.well_filter`` when it is set.
        """
        input_dir = self.input_dir
        start_time = time.time()
        logger.info("Finding wells to process in %s", input_dir)

        # Auto-detect all wells
        all_wells = set()

        image_paths = FileSystemManager.list_image_files(input_dir, recursive=True)

        # Extract wells from filenames
        logger.info("Found %d image files. Extracting well information...", len(image_paths))
        for img_path in image_paths:
            metadata = self.microscope_handler.parse_filename(img_path.name)
            if metadata and 'well' in metadata:
                all_wells.add(metadata['well'])

        # Apply well filter if specified (case-insensitive matching)
        if self.config.well_filter:
            well_filter_lower = [w.lower() for w in self.config.well_filter]
            wells_to_process = [well for well in all_wells if well.lower() in well_filter_lower]
        else:
            wells_to_process = list(all_wells)

        logger.info("Found %d wells in %.2f seconds", len(wells_to_process), time.time() - start_time)
        return wells_to_process

    def process_well(self, well, pipelines=None):
        """
        Process a single well through the pipeline.

        Args:
            well: Well identifier.
            pipelines: List of cloned pipelines to run sequentially for this
                well (the caller has already deep-copied them).
        """
        logger.info("Processing well %s with pixel size %s", well, self.config.pixel_size)

        # Add thread ID information for debugging parallel execution
        thread_id = threading.get_ident()
        thread_name = threading.current_thread().name
        logger.info("Processing well %s in thread %s (ID: %s)", well, thread_name, thread_id)

        # Stitcher instances will be provided on demand by the orchestrator
        # via the get_stitcher() method, if needed by pipeline steps.

        # Run the pipelines sequentially (list received is already copied)
        if pipelines:
            logger.info("Running %d pipelines for well %s", len(pipelines), well)
            for i, pipeline in enumerate(pipelines):
                logger.info("Running pipeline %d/%d for well %s: %s",
                            i+1, len(pipelines), well, pipeline.name)

                # Orchestrator is passed, allowing pipelines/steps to call
                # orchestrator.get_stitcher() if they need one.
                pipeline.run(
                    well_filter=[well],
                    orchestrator=self
                )

            logger.info("All pipelines completed for well %s", well)
        else:
            logger.warning("No pipelines provided for well %s", well)

    def get_stitcher(self):
        """
        Provides a new Stitcher instance configured for the current run.
        This ensures thread safety by creating a new instance on demand.

        Returns:
            Stitcher: A new Stitcher instance.
        """
        logger.debug("Creating new Stitcher instance for requestor.")
        # Ensure the stitcher is configured using the orchestrator's config
        return Stitcher(self.config.stitcher, filename_parser=self.microscope_handler.parser)

    def prepare_images(self, plate_path):
        """
        Prepare images by padding filenames and organizing Z-stack folders.

        Args:
            plate_path: Path to the plate folder.

        Returns:
            Path: Path to the image directory (re-resolved at the end, since
            organizing Z-stack folders may have changed it).
        """
        start_time = time.time()

        # Find the image directory
        image_dir = FileSystemManager.find_image_directory(plate_path)
        logger.info("Found image directory: %s", image_dir)

        # Always rename files with consistent padding, even for Opera Phenix datasets
        logger.info("Renaming files with consistent padding...")
        rename_start = time.time()
        self.fs_manager.rename_files_with_consistent_padding(
            image_dir,
            parser=self.microscope_handler,
            width=DEFAULT_PADDING,  # Use consistent padding width
            force_suffixes=True  # Force missing suffixes to be added
        )
        logger.info("Renamed files in %.2f seconds", time.time() - rename_start)

        # Detect and organize Z-stack folders
        zstack_start = time.time()
        has_zstack_folders, z_folders = self.fs_manager.detect_zstack_folders(image_dir)
        if has_zstack_folders:
            logger.info("Found %d Z-stack folders in %s", len(z_folders), image_dir)
            logger.info("Organizing Z-stack folders...")
            self.fs_manager.organize_zstack_folders(
                image_dir, filename_parser=self.microscope_handler)
            logger.info("Organized Z-stack folders in %.2f seconds", time.time() - zstack_start)

        # Return the image directory (which may have changed if Z-stack folders were organized)
        logger.info("Image preparation completed in %.2f seconds", time.time() - start_time)
        return FileSystemManager.find_image_directory(plate_path)

    def _get_patterns_for_well(self, well, directory):
        """
        Get all filename patterns for a specific well from a directory.

        Returns:
            list: Flattened list of patterns (may be empty).
        """
        patterns_by_well = self.microscope_handler.auto_detect_patterns(
            directory, well_filter=[well], variable_components=['site']
        )

        # Extract and flatten all patterns for this well
        all_patterns = []
        if patterns_by_well and well in patterns_by_well:
            for _, patterns in patterns_by_well[well].items():
                all_patterns.extend(patterns)

        return all_patterns

    def _get_reference_pattern(self, well, sample_pattern):
        """
        Create a reference pattern (site as "{iii}" placeholder) for stitching.

        Raises:
            ValueError: If no sample pattern is given or it cannot be parsed.
        """
        if not sample_pattern:
            raise ValueError(f"No pattern found for well {well}")

        metadata = self.microscope_handler.parser.parse_filename(sample_pattern)
        if not metadata:
            raise ValueError(f"Could not parse pattern: {sample_pattern}")

        return self.microscope_handler.parser.construct_filename(
            well=metadata['well'],
            site="{iii}",
            channel=metadata.get('channel'),
            z_index=metadata.get('z_index'),
            extension=metadata['extension'],
            site_padding=DEFAULT_PADDING,
            z_padding=DEFAULT_PADDING
        )

    def generate_positions(self, well, input_dir, positions_dir):
        """
        Generate stitching positions for a well using a dedicated stitcher instance.

        Returns:
            tuple: (positions_file, reference_pattern).

        Raises:
            ValueError: If no patterns are found for the well.
        """
        logger.info("Generating positions for well %s", well)
        # Get a dedicated stitcher instance for this operation
        stitcher_to_use = self.get_stitcher()
        positions_dir = Path(positions_dir)
        input_dir = Path(input_dir)
        self.fs_manager.ensure_directory(positions_dir)
        positions_file = positions_dir / f"{well}.csv"

        # Get patterns and create reference pattern
        all_patterns = self._get_patterns_for_well(well, input_dir)
        if not all_patterns:
            raise ValueError(f"No patterns found for well {well}")

        # Currently only support 1 set of positions per well (more makes no sense)
        reference_pattern = self._get_reference_pattern(well, all_patterns[0])

        # Generate positions
        stitcher_to_use.generate_positions(
            input_dir,
            reference_pattern,
            positions_file,
            self.config.grid_size[0],
            self.config.grid_size[1],
        )

        return positions_file, reference_pattern

    def _create_output_filename(self, pattern):
        """
        Create an output filename for a stitched image based on a pattern.

        Raises:
            ValueError: If the pattern cannot be parsed.
        """
        # Substitute a concrete site so the parser can handle the name.
        parsable = pattern.replace('{iii}', '001')
        metadata = self.microscope_handler.parser.parse_filename(parsable)

        if not metadata:
            raise ValueError(f"Could not parse pattern: {pattern}")

        return self.microscope_handler.parser.construct_filename(
            well=metadata['well'],
            site=metadata['site'],
            channel=metadata['channel'],
            z_index=metadata.get('z_index', 1),
            extension='.tif',
            site_padding=DEFAULT_PADDING,
            z_padding=DEFAULT_PADDING
        )

    def stitch_images(self, well, input_dir, output_dir, positions_file):
        """
        Stitch images for a well using a dedicated stitcher instance.

        Raises:
            ValueError: If no patterns are found for the well.
        """
        logger.info("Stitching images for well %s", well)
        # Get a dedicated stitcher instance for this operation via the orchestrator's method
        stitcher_to_use = self.get_stitcher()
        output_dir = Path(output_dir)
        input_dir = Path(input_dir)

        # Find the actual image directory
        actual_input_dir = FileSystemManager.find_image_directory(input_dir)
        logger.info("Using actual image directory: %s", actual_input_dir)

        # Ensure output directory exists
        self.fs_manager.ensure_directory(output_dir)
        logger.info("Ensured output directory exists: %s", output_dir)

        # Get patterns for this well
        all_patterns = self._get_patterns_for_well(well, actual_input_dir)
        if not all_patterns:
            raise ValueError(f"No patterns found for well {well} in {actual_input_dir}")

        # Process each pattern
        for pattern in all_patterns:
            # Find all matching files and skip if none found
            matching_files = self.microscope_handler.parser.path_list_from_pattern(
                actual_input_dir, pattern)
            if not matching_files:
                logger.warning("No files found for pattern %s, skipping", pattern)
                continue

            # Create output filename and path
            output_path = output_dir / self._create_output_filename(pattern)
            logger.info("Stitching pattern %s to %s", pattern, output_path)

            # Assemble the stitched image
            stitcher_to_use.assemble_image(
                positions_path=positions_file,
                images_dir=actual_input_dir,
                output_path=output_path,
                override_names=[str(actual_input_dir / f) for f in matching_files]
            )