Coverage for ezstitcher/core/pipeline.py: 77%
146 statements
« prev ^ index » next coverage.py v7.3.2, created at 2025-04-30 13:20 +0000
1"""
2Core implementation of the Flexible Pipeline Architecture.
4This module provides a flexible, declarative API for defining image processing
5pipelines in EZStitcher. It builds on the strengths of the current
6process_patterns_with_variable_components method while adding an object-oriented
7core with a functional interface.
8"""
10from typing import Dict, List, Any
11import logging
12from pathlib import Path
14# Import base interface
15from .pipeline_base import PipelineInterface
17# Import Step classes from steps module
18from ezstitcher.core.steps import ImageStitchingStep
19from ezstitcher.core.steps import Step, WellFilter
20from ezstitcher.core.utils import prepare_patterns_and_functions
22# Configure logging
23logger = logging.getLogger(__name__)
class Pipeline(PipelineInterface):
    """
    A sequence of processing steps.

    A Pipeline is a sequence of processing steps that are executed in order.
    Each step takes input from the previous step's output and produces new output.
    Directory wiring between steps is resolved as steps are added (see add_step).

    Attributes:
        steps: The sequence of processing steps
        input_dir: The input directory
        output_dir: The output directory
        well_filter: Wells to process
        name: Human-readable name for the pipeline
        _config: Configuration parameters
    """

    def __init__(
        self,
        steps: List[Step] = None,
        input_dir: str = None,
        output_dir: str = None,
        well_filter: WellFilter = None,
        name: str = None
    ):
        """
        Initialize a pipeline.

        Args:
            steps: The sequence of processing steps
            input_dir: The input directory
            output_dir: The output directory
            well_filter: Wells to process
            name: Human-readable name for the pipeline
        """
        self.steps = []
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.well_filter = well_filter
        self.name = name or f"Pipeline({len(steps or [])} steps)"
        self._config = {}

        # Route every step through add_step so directory resolution runs
        # for each one in order.
        if steps:
            for step in steps:
                if step is not None:  # Skip None values in steps list
                    self.add_step(step)

    def add_step(self, step: Step, output_dir: str = None) -> 'Pipeline':
        """
        Add a step to the pipeline with improved directory resolution.

        Directory resolution follows these rules:
        1. Input directory is resolved first based on previous step
        2. Output directory is set based on input directory
        3. Explicit output_dir overrides automatic resolution

        Args:
            step: The step to append
            output_dir: Optional explicit output directory for this step

        Returns:
            Self, for method chaining
        """
        # First ensure input directory is coherent
        self._ensure_coherent_input_directory(step)

        # Set output directory if not explicitly provided
        if not output_dir and not step.output_dir:
            self._set_step_output_directory(step)
        elif output_dir:
            step.output_dir = output_dir

        # Add step and update pipeline directories
        self.steps.append(step)
        self._update_pipeline_directories(step)
        return self

    def _ensure_coherent_input_directory(self, step: Step):
        """Ensure step's input directory is coherent with pipeline flow.

        The first step inherits the pipeline's input_dir; later steps default
        to the previous step's output (or input when it had no output).

        Raises:
            ValueError: If the first step has no input directory and neither
                does the pipeline.
        """
        if not self.steps:  # First step
            if not step.input_dir:
                if not self.input_dir:
                    raise ValueError("Input directory must be specified for the first step or at the pipeline level")
                step.input_dir = self.input_dir
            return

        prev_step = self.steps[-1]

        # If no input specified, use previous step's output
        if not step.input_dir:
            step.input_dir = prev_step.output_dir or prev_step.input_dir

    def _check_directory_conflicts(self, step: Step, proposed_dir: Path) -> bool:
        """
        Check for directory conflicts in pipeline.

        A conflict exists when the proposed output collides with the last
        non-position-generation step's output, or with the step's own input.

        Args:
            step: Step being configured
            proposed_dir: Proposed output directory

        Returns:
            bool: True if conflict exists
        """
        proposed_dir = Path(proposed_dir)
        # Most recent step that actually writes images (position generation
        # steps only emit position files, so they are skipped).
        last_processing = next((s for s in reversed(self.steps)
                                if s.__class__.__name__ != "PositionGenerationStep"), None)
        return (last_processing and Path(last_processing.output_dir) == proposed_dir) or \
               (step.input_dir and Path(step.input_dir) == proposed_dir)

    def _set_stitching_step_output_directory(self, step):
        """Set output directory for ImageStitchingStep.

        Uses the pipeline input directory's name plus the configured stitched
        suffix, falling back to an extra "_final" suffix on conflict.
        """
        # Suffix comes from the orchestrator config when one is attached.
        stitched_suffix = getattr(self.orchestrator.config, 'stitched_dir_suffix', '_stitched') if hasattr(self, 'orchestrator') else '_stitched'

        # Always use the workspace directory (input directory) as the base
        # This ensures the stitched output is not in the same directory as any processing step
        base_dir = Path(self.input_dir)

        # Create the stitched output directory
        step.output_dir = base_dir.parent / f"{base_dir.name}{stitched_suffix}"

        # Check for conflicts and adjust if needed
        if self._check_directory_conflicts(step, step.output_dir):
            step.output_dir = base_dir.parent / f"{base_dir.name}{stitched_suffix}_final"

    def _set_step_output_directory(self, step: Step):
        """Set the step's output directory if not already specified.

        Resolution order: stitching steps get the stitched/pipeline output
        directory, position-generation steps get an "<input>_positions"
        directory, chained processing steps work in place, and anything else
        gets "<input>_out".
        """
        if step.output_dir:
            return  # Output directory already specified

        # Get directory suffixes from orchestrator's config if available
        out_suffix = "_out"  # Default suffix for all processing steps
        positions_suffix = "_positions"  # Default suffix for position generation steps

        # Try to get suffixes from orchestrator config
        if hasattr(self, 'orchestrator') and self.orchestrator and hasattr(self.orchestrator, 'config'):
            config = self.orchestrator.config
            out_suffix = config.out_dir_suffix
            positions_suffix = config.positions_dir_suffix

        # Check if this is a stitching step
        is_stitching = step.__class__.__name__ == "ImageStitchingStep"

        # Check if this is a position generation step
        is_position_generation = step.__class__.__name__ == "PositionGenerationStep"

        # Special handling for ImageStitchingStep
        if is_stitching:
            # If the step's input_dir is the same as the pipeline's input_dir,
            # always use the default stitched directory to avoid conflicts with regular steps
            if step.input_dir == self.input_dir:
                self._set_stitching_step_output_directory(step)
                return

            # If pipeline has an output_dir, use it for the stitching step
            if self.output_dir:
                step.output_dir = self.output_dir
                logger.info("ImageStitchingStep using pipeline output dir: %s", step.output_dir)
                return
            # Otherwise use the default stitching directory
            self._set_stitching_step_output_directory(step)
            return

        # Special handling for PositionGenerationStep
        if is_position_generation:
            # Use the default positions directory
            input_path = Path(step.input_dir)
            step.output_dir = input_path.parent / f"{input_path.name}{positions_suffix}"
            logger.info("PositionGenerationStep using default directory: %s", step.output_dir)
            return

        # For regular image processing steps (Step, ZFlatStep, CompositeStep, FocusStep)
        # Check if there's a previous step with the same input directory
        if self.steps:
            prev_step = self.steps[-1]
            # If this step's input is the previous step's output, use the same directory
            if step.input_dir == prev_step.output_dir:
                step.output_dir = step.input_dir
                logger.info("Step using in-place processing: %s", step.output_dir)
                return

        # Otherwise use default output directory based on input_dir
        input_path = Path(step.input_dir)
        step.output_dir = input_path.parent / f"{input_path.name}{out_suffix}"
        logger.info("Processing step using default directory: %s", step.output_dir)
        # Don't create the directory yet - let the step create it when it's executed

    def _update_pipeline_directories(self, step: Step):
        """Update pipeline directories based on the step if needed."""
        # If this is the first step and pipeline's input_dir is not set, use step's input_dir
        if not self.steps and not self.input_dir and step.input_dir:
            self.input_dir = step.input_dir

        # If pipeline's output_dir is not set, use the step's output_dir
        # Let each step handle its own directory logic
        if not self.output_dir and step.output_dir:
            self.output_dir = step.output_dir

    def set_input(self, input_dir: str) -> 'Pipeline':
        """
        Set the input directory.

        Args:
            input_dir: The input directory

        Returns:
            Self, for method chaining
        """
        self.input_dir = input_dir
        return self

    def set_output(self, output_dir: str) -> 'Pipeline':
        """
        Set the output directory.

        Args:
            output_dir: The output directory

        Returns:
            Self, for method chaining
        """
        self.output_dir = output_dir
        return self

    def run(
        self,
        input_dir: str = None,
        output_dir: str = None,
        well_filter: WellFilter = None,
        microscope_handler = None,
        orchestrator = None,
        positions_file = None
    ) -> Dict[str, Any]:
        """
        Execute the pipeline.

        Args:
            input_dir: Optional input directory override
            output_dir: Optional output directory override
            well_filter: Optional well filter override
            microscope_handler: Optional microscope handler override
            orchestrator: Optional PipelineOrchestrator instance
            positions_file: Optional positions file to use for stitching

        Returns:
            The results of the pipeline execution

        Raises:
            ValueError: If no orchestrator or input directory is specified
        """
        logger.info("Running pipeline: %s", self.name)

        # BUGFIX: validate orchestrator BEFORE dereferencing it; previously
        # self.orchestrator.microscope_handler was accessed first, so a
        # missing orchestrator raised AttributeError instead of ValueError.
        if orchestrator is None:
            raise ValueError("orchestrator must be specified")
        self.orchestrator = orchestrator
        self.microscope_handler = self.orchestrator.microscope_handler

        effective_input = input_dir or self.input_dir
        effective_output = output_dir or self.output_dir
        effective_well_filter = well_filter or self.well_filter

        # If input_dir is still not set, try to get it from the first step
        if not effective_input and self.steps:
            effective_input = self.steps[0].input_dir

        if not effective_input:
            raise ValueError("Input directory must be specified")

        logger.info("Input directory: %s", effective_input)
        logger.info("Output directory: %s", effective_output)
        logger.info("Well filter: %s", effective_well_filter)

        # Initialize context
        context = ProcessingContext(
            input_dir=effective_input,
            output_dir=effective_output,
            well_filter=effective_well_filter,
            orchestrator=orchestrator,
        )

        # Execute each step
        for i, step in enumerate(self.steps):
            logger.info("Executing step %d/%d: %s", i+1, len(self.steps), step)
            context = step.process(context)

        logger.info("Pipeline completed: %s", self.name)
        return context.results

    def collect_unique_dirs(self) -> set:
        """
        Collects all unique directory paths from all steps in the pipeline.

        Iterates through each step's attributes and collects values for attributes
        with "dir" in their name.

        Returns:
            A set of unique directory paths.
        """
        unique_dirs = set()
        for step in self.steps:
            for attr_name, attr_value in step.__dict__.items():
                if "dir" in attr_name.lower() and attr_value:
                    unique_dirs.add(attr_value)
        return unique_dirs

    def __repr__(self) -> str:
        """
        String representation of the pipeline.

        Returns:
            A human-readable representation of the pipeline
        """
        steps_repr = "\n  ".join(repr(step) for step in self.steps)
        input_dir_str = str(self.input_dir) if self.input_dir else "None"
        output_dir_str = str(self.output_dir) if self.output_dir else "None"
        return (f"{self.name}\n"
                f"  Input: {input_dir_str}\n"
                f"  Output: {output_dir_str}\n"
                f"  Well filter: {self.well_filter}\n"
                f"  Steps:\n  {steps_repr}")
class ProcessingContext:
    """
    Carries mutable state through a pipeline run.

    Holds the input/output directories, the well filter, configuration
    parameters, and the accumulated results while steps execute. Any extra
    keyword arguments become attributes on the context.

    Attributes:
        input_dir: The input directory
        output_dir: The output directory
        well_filter: Wells to process
        config: Configuration parameters
        results: Processing results
    """

    def __init__(
        self,
        input_dir: str = None,
        output_dir: str = None,
        well_filter: WellFilter = None,
        config: Dict[str, Any] = None,
        **kwargs
    ):
        """
        Initialize the processing context.

        Args:
            input_dir: The input directory
            output_dir: The output directory
            well_filter: Wells to process
            config: Configuration parameters
            **kwargs: Additional context attributes
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.well_filter = well_filter
        # Avoid sharing a mutable default between instances.
        self.config = {} if config is None else config
        self.results = {}

        # Promote any extra keyword arguments to attributes.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)
def group_patterns_by(patterns, component, microscope_handler=None):
    """
    Group patterns by the specified filename component.

    Args:
        patterns (list): Patterns to group
        component (str): Name of the filename component to group by
            (a key of the dict returned by ``parser.parse_filename``)
        microscope_handler: Handler whose ``parser.parse_filename`` extracts
            components from a pattern. Effectively required; the None default
            is kept only for signature compatibility.

    Returns:
        dict: Dictionary mapping component values to lists of patterns,
        preserving input order within each group

    Raises:
        ValueError: If microscope_handler is None.
    """
    # Fail fast with a clear message instead of an opaque AttributeError
    # when the handler is omitted.
    if microscope_handler is None:
        raise ValueError("microscope_handler is required to parse patterns")

    grouped_patterns = {}
    for pattern in patterns:
        # Extract the component value from the pattern's parsed filename
        component_value = microscope_handler.parser.parse_filename(pattern)[component]
        grouped_patterns.setdefault(component_value, []).append(pattern)
    return grouped_patterns