Coverage for ezstitcher/core/pipeline_orchestrator.py: 89%
170 statements
« prev ^ index » next coverage.py v7.3.2, created at 2025-04-30 13:20 +0000
1import logging
2import os
3import copy
4import time
5import threading
6import concurrent.futures
7from pathlib import Path
10from ezstitcher.core.microscope_interfaces import create_microscope_handler
11from ezstitcher.core.stitcher import Stitcher
12from ezstitcher.core.file_system_manager import FileSystemManager
13from ezstitcher.core.image_processor import ImageProcessor
14from ezstitcher.core.focus_analyzer import FocusAnalyzer
15from ezstitcher.core.config import PipelineConfig
17# Import the pipeline architecture
18from ezstitcher.core.pipeline import Step, Pipeline
20logger = logging.getLogger(__name__)
22DEFAULT_PADDING = 3
class PipelineOrchestrator:
    """Orchestrates the complete image processing and stitching pipeline."""

    def __init__(self, plate_path=None, workspace_path=None, config=None, fs_manager=None, image_preprocessor=None, focus_analyzer=None):
        """
        Initialize the pipeline orchestrator and prepare the workspace.

        Note: despite the None default, a real plate path is effectively
        required — Path(plate_path) below raises TypeError for None.

        Args:
            plate_path: Path to the plate folder to process.
            workspace_path: Workspace directory; defaults to a sibling of the
                plate folder named "<plate_name>_workspace".
            config: Pipeline configuration (defaults to PipelineConfig()).
            fs_manager: File system manager (defaults to FileSystemManager()).
            image_preprocessor: Image preprocessor (defaults to ImageProcessor()).
            focus_analyzer: Focus analyzer (defaults to FocusAnalyzer()).
        """
        self.config = config or PipelineConfig()
        self.plate_path = Path(plate_path)
        self.fs_manager = fs_manager or FileSystemManager()

        # Determine workspace path
        if workspace_path:
            workspace_path_to_use = workspace_path
        else:
            workspace_path_to_use = self.plate_path.parent / f"{self.plate_path.name}_workspace"

        # Convert to Path
        self.workspace_path = Path(workspace_path_to_use)

        logger.info("Detecting microscope type")
        self.microscope_handler = create_microscope_handler('auto', plate_folder=self.plate_path)
        # NOTE(review): logs the raw argument, which is None when the default
        # workspace path was derived above — self.workspace_path would be the
        # more informative value here.
        logger.info("Initializing workspace: %s", workspace_path)
        self.microscope_handler.init_workspace(self.plate_path, self.workspace_path)

        # Side effect: renames files and flattens Z-stack folders inside the
        # workspace; the returned directory becomes the pipeline input.
        logger.info("Preparing images through renaming and dir flattening")
        self.input_dir = self.prepare_images(self.workspace_path)

        self.stitcher = Stitcher(self.config.stitcher, filename_parser=self.microscope_handler.parser)
        self.image_preprocessor = image_preprocessor or ImageProcessor()

        # Initialize focus analyzer
        self.focus_analyzer = focus_analyzer or FocusAnalyzer()
66 def run(self,pipelines=None):
67 """
68 Process a plate through the complete pipeline.
70 Args:
71 plate_folder: Path to the plate folder
72 pipelines: List of pipelines to run for each well
74 Returns:
75 bool: True if successful, False otherwise
76 """
77 try:
78 # Setup
79 self.config.grid_size = self.microscope_handler.get_grid_dimensions(self.workspace_path)
80 logger.info("Grid size: %s", self.config.grid_size)
81 self.config.pixel_size = self.microscope_handler.get_pixel_size(self.workspace_path)
82 logger.info("Pixel size: %s", self.config.pixel_size)
84 # Directory setup is handled within pipelines now.
86 # Get wells to process
87 wells = self._get_wells_to_process()
89 # Process wells using ThreadPoolExecutor
90 num_workers = self.config.num_workers
91 # Use only one worker if there's only one well
92 effective_workers = min(num_workers, len(wells)) if len(wells) > 0 else 1
93 # Check if pipelines are provided
94 if pipelines:
95 logger.info("Using provided pipelines for processing")
96 else:
97 logger.info("No pipelines provided, using pipeline functions")
99 logger.info(
100 "Processing %d wells using %d worker threads",
101 len(wells),
102 effective_workers
103 )
105 # Create a thread pool with the appropriate number of workers
106 with concurrent.futures.ThreadPoolExecutor(max_workers=effective_workers) as executor:
107 # Submit all well processing tasks
108 # Create a mapping of futures to wells
109 future_to_well = {}
110 logger.info("About to submit %d wells for parallel processing", len(wells))
111 for well in wells:
112 # Deep copy pipelines for thread safety as they might be stateful
113 # and lack a specific clone method. Requires 'import copy'.
114 copied_pipelines = [copy.deepcopy(p) for p in pipelines] if pipelines else []
116 logger.info("Submitting well %s to thread pool", well)
117 future = executor.submit(
118 self.process_well,
119 well,
120 copied_pipelines # Pass copied pipelines
121 )
122 future_to_well[future] = well
124 # Process results as they complete
125 for future in concurrent.futures.as_completed(future_to_well):
126 well = future_to_well[future]
127 try:
128 future.result() # Get the result (or exception)
129 logger.info("Completed processing well %s", well)
131 except Exception as e:
132 logger.error("Error processing well %s: %s", well, e, exc_info=True)
134 # Final cleanup after all wells have been processed
135 # Cleanup is now handled by individual pipelines
137 return True
139 except Exception as e:
140 logger.error("Pipeline failed with unexpected error: %s", str(e))
141 logger.debug("Exception details:", exc_info=True)
142 return False
144 def _get_wells_to_process(self):
145 """
146 Get the list of wells to process based on well filter.
148 Args:
149 input_dir: Input directory
151 Returns:
152 list: List of wells to process
153 """
154 input_dir = self.input_dir
155 start_time = time.time()
156 logger.info("Finding wells to process in %s", input_dir)
158 # Auto-detect all wells
159 all_wells = set()
161 image_paths = FileSystemManager.list_image_files(input_dir, recursive=True)
163 # Extract wells from filenames
164 logger.info("Found %d image files. Extracting well information...", len(image_paths))
165 for img_path in image_paths:
166 metadata = self.microscope_handler.parse_filename(img_path.name)
167 if metadata and 'well' in metadata:
168 all_wells.add(metadata['well'])
170 # Apply well filter if specified
171 if self.config.well_filter:
172 # Convert well filter to lowercase for case-insensitive matching
173 well_filter_lower = [w.lower() for w in self.config.well_filter]
174 wells_to_process = [well for well in all_wells if well.lower() in well_filter_lower]
175 else:
176 wells_to_process = list(all_wells)
178 logger.info("Found %d wells in %.2f seconds", len(wells_to_process), time.time() - start_time)
179 return wells_to_process
181 def process_well(self, well, pipelines=None):
182 """
183 Process a single well through the pipeline.
185 Args:
186 well: Well identifier
187 pipelines: List of cloned pipelines to run sequentially for this well
188 """
189 logger.info("Processing well %s", well)
190 logger.info("Processing well %s with pixel size %s", well, self.config.pixel_size)
192 # Add thread ID information for debugging
193 thread_id = threading.get_ident()
194 thread_name = threading.current_thread().name
195 logger.info("Processing well %s in thread %s (ID: %s)", well, thread_name, thread_id)
197 # Stitcher instances will be provided on demand by the orchestrator
198 # via the get_stitcher() method, if needed by pipeline steps.
200 # Run the pipelines sequentially (list received is already copied)
201 if pipelines:
202 logger.info("Running %d pipelines for well %s", len(pipelines), well)
203 for i, pipeline in enumerate(pipelines):
204 logger.info("Running pipeline %d/%d for well %s: %s",
205 i+1, len(pipelines), well, pipeline.name)
207 # Orchestrator is passed, allowing pipelines/steps to call
208 # orchestrator.get_stitcher() if they need one.
209 pipeline.run(
210 well_filter=[well],
211 orchestrator=self
212 )
214 logger.info("All pipelines completed for well %s", well)
215 else:
216 logger.warning("No pipelines provided for well %s", well)
220 def get_stitcher(self):
221 """
222 Provides a new Stitcher instance configured for the current run.
223 This ensures thread safety by creating a new instance on demand.
225 Returns:
226 Stitcher: A new Stitcher instance.
227 """
228 logger.debug("Creating new Stitcher instance for requestor.")
229 # Ensure the stitcher is configured using the orchestrator's config
230 return Stitcher(self.config.stitcher, filename_parser=self.microscope_handler.parser)
232 def prepare_images(self, plate_path):
233 """
234 Prepare images by padding filenames and organizing Z-stack folders.
236 Args:
237 plate_path: Path to the plate folder
239 Returns:
240 Path: Path to the image directory
241 """
242 start_time = time.time()
244 # Find the image directory
245 image_dir = FileSystemManager.find_image_directory(plate_path)
246 logger.info("Found image directory: %s", image_dir)
248 # Always rename files with consistent padding, even for Opera Phenix datasets
249 logger.info("Renaming files with consistent padding...")
250 rename_start = time.time()
251 self.fs_manager.rename_files_with_consistent_padding(
252 image_dir,
253 parser=self.microscope_handler,
254 width=DEFAULT_PADDING, # Use consistent padding width
255 force_suffixes=True # Force missing suffixes to be added
256 )
257 logger.info("Renamed files in %.2f seconds", time.time() - rename_start)
259 # Detect and organize Z-stack folders
260 zstack_start = time.time()
261 has_zstack_folders, z_folders = self.fs_manager.detect_zstack_folders(image_dir)
262 if has_zstack_folders:
263 logger.info("Found %d Z-stack folders in %s", len(z_folders), image_dir)
264 logger.info("Organizing Z-stack folders...")
265 self.fs_manager.organize_zstack_folders(
266 image_dir, filename_parser=self.microscope_handler)
267 logger.info("Organized Z-stack folders in %.2f seconds", time.time() - zstack_start)
269 # Return the image directory (which may have changed if Z-stack folders were organized)
270 logger.info("Image preparation completed in %.2f seconds", time.time() - start_time)
271 return FileSystemManager.find_image_directory(plate_path)
273 def _get_patterns_for_well(self, well, directory):
274 """
275 Get patterns for a specific well from a directory.
276 """
277 patterns_by_well = self.microscope_handler.auto_detect_patterns(
278 directory, well_filter=[well], variable_components=['site']
279 )
281 # Extract and flatten all patterns for this well
282 all_patterns = []
283 if patterns_by_well and well in patterns_by_well:
284 for _, patterns in patterns_by_well[well].items():
285 all_patterns.extend(patterns)
287 return all_patterns
289 def _get_reference_pattern(self, well, sample_pattern):
290 """
291 Create a reference pattern for stitching.
292 """
293 if not sample_pattern:
294 raise ValueError(f"No pattern found for well {well}")
296 metadata = self.microscope_handler.parser.parse_filename(sample_pattern)
297 if not metadata:
298 raise ValueError(f"Could not parse pattern: {sample_pattern}")
300 return self.microscope_handler.parser.construct_filename(
301 well=metadata['well'],
302 site="{iii}",
303 channel=metadata.get('channel'),
304 z_index=metadata.get('z_index'),
305 extension=metadata['extension'],
306 site_padding=DEFAULT_PADDING,
307 z_padding=DEFAULT_PADDING
308 )
310 def generate_positions(self, well, input_dir, positions_dir):
311 """
312 Generate stitching positions for a well using a dedicated stitcher instance.
313 """
314 logger.info("Generating positions for well %s", well)
315 # Get a dedicated stitcher instance for this operation
316 stitcher_to_use = self.get_stitcher()
317 positions_dir = Path(positions_dir)
318 input_dir = Path(input_dir)
319 self.fs_manager.ensure_directory(positions_dir)
320 positions_file = positions_dir / Path(f"{well}.csv")
322 # Get patterns and create reference pattern
323 all_patterns = self._get_patterns_for_well(well, input_dir)
324 if not all_patterns:
325 raise ValueError(f"No patterns found for well {well}")
327 ### currently only support 1 set of positions per well (more makes no sense)
328 reference_pattern = self._get_reference_pattern(well, all_patterns[0])
330 # Generate positions
331 stitcher_to_use.generate_positions(
332 input_dir,
333 reference_pattern,
334 positions_file,
335 self.config.grid_size[0],
336 self.config.grid_size[1],
337 )
339 return positions_file, reference_pattern
341 def _create_output_filename(self, pattern):
342 """
343 Create an output filename for a stitched image based on a pattern.
344 """
345 parsable = pattern.replace('{iii}', '001')
346 metadata = self.microscope_handler.parser.parse_filename(parsable)
348 if not metadata:
349 raise ValueError(f"Could not parse pattern: {pattern}")
351 return self.microscope_handler.parser.construct_filename(
352 well=metadata['well'],
353 site=metadata['site'],
354 channel=metadata['channel'],
355 z_index=metadata.get('z_index', 1),
356 extension='.tif',
357 site_padding=DEFAULT_PADDING,
358 z_padding=DEFAULT_PADDING
359 )
    def stitch_images(self, well, input_dir, output_dir, positions_file):
        """
        Stitch images for a well using a dedicated stitcher instance.

        Args:
            well: Well identifier.
            input_dir: Directory holding the tile images (the actual image
                directory is re-resolved below).
            output_dir: Destination directory for stitched images; created
                if missing.
            positions_file: Positions CSV previously produced by
                generate_positions().

        Raises:
            ValueError: If no filename patterns are found for the well.
        """
        logger.info("Stitching images for well %s", well)
        # Get a dedicated stitcher instance for this operation via the orchestrator's method
        stitcher_to_use = self.get_stitcher()
        output_dir = Path(output_dir)
        input_dir = Path(input_dir)

        # Find the actual image directory (input_dir may be a parent folder)
        actual_input_dir = FileSystemManager.find_image_directory(input_dir)
        logger.info("Using actual image directory: %s", actual_input_dir)

        # Ensure output directory exists
        self.fs_manager.ensure_directory(output_dir)
        logger.info("Ensured output directory exists: %s", output_dir)

        # Get patterns for this well
        all_patterns = self._get_patterns_for_well(well, actual_input_dir)
        if not all_patterns:
            raise ValueError(f"No patterns found for well {well} in {actual_input_dir}")

        # Process each pattern: one stitched output image per pattern
        for pattern in all_patterns:
            # Find all matching files and skip if none found
            matching_files = self.microscope_handler.parser.path_list_from_pattern(
                actual_input_dir, pattern)
            if not matching_files:
                logger.warning("No files found for pattern %s, skipping", pattern)
                continue

            # Create output filename and path
            output_path = output_dir / self._create_output_filename(pattern)
            logger.info("Stitching pattern %s to %s", pattern, output_path)

            # Assemble the stitched image; override_names pins the exact
            # tile files (absolute paths) used for assembly
            stitcher_to_use.assemble_image(
                positions_path=positions_file,
                images_dir=actual_input_dir,
                output_path=output_path,
                override_names=[str(actual_input_dir / f) for f in matching_files]
            )