Coverage for ezstitcher/core/pipeline.py: 77% (146 statements)


1""" 

2Core implementation of the Flexible Pipeline Architecture. 

3 

4This module provides a flexible, declarative API for defining image processing 

5pipelines in EZStitcher. It builds on the strengths of the current 

6process_patterns_with_variable_components method while adding an object-oriented 

7core with a functional interface. 

8""" 

9 

10from typing import Dict, List, Any 

11import logging 

12from pathlib import Path 

13 

14# Import base interface 

15from .pipeline_base import PipelineInterface 

16 

17# Import Step classes from steps module 

18from ezstitcher.core.steps import ImageStitchingStep 

19from ezstitcher.core.steps import Step, WellFilter 

20from ezstitcher.core.utils import prepare_patterns_and_functions 

21 

22# Configure logging 

23logger = logging.getLogger(__name__) 

24 

25 
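
# Typical usage (an illustrative sketch; the step arguments shown here are
# hypothetical and depend on how each Step subclass is configured):
#
#     pipeline = Pipeline(
#         steps=[Step(func=process_fn), ImageStitchingStep()],
#         input_dir="/data/plate1",
#         name="Process and stitch plate1",
#     )
#     results = pipeline.run(orchestrator=orchestrator)
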

class Pipeline(PipelineInterface):
    """
    A sequence of processing steps.

    A Pipeline is a sequence of processing steps that are executed in order.
    Each step takes its input from the previous step's output and produces
    new output.

    Attributes:
        steps: The sequence of processing steps
        input_dir: The input directory
        output_dir: The output directory
        well_filter: Wells to process
        name: Human-readable name for the pipeline
        _config: Configuration parameters
    """

    def __init__(
        self,
        steps: Optional[List[Step]] = None,
        input_dir: Optional[str] = None,
        output_dir: Optional[str] = None,
        well_filter: Optional[WellFilter] = None,
        name: Optional[str] = None
    ):
        """
        Initialize a pipeline.

        Args:
            steps: The sequence of processing steps
            input_dir: The input directory
            output_dir: The output directory
            well_filter: Wells to process
            name: Human-readable name for the pipeline
        """
        self.steps = []
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.well_filter = well_filter
        self.name = name or f"Pipeline({len(steps or [])} steps)"
        self._config = {}

        # Add steps if provided, skipping None values
        if steps:
            for step in steps:
                if step is not None:
                    self.add_step(step)

    def add_step(self, step: Step, output_dir: Optional[str] = None) -> 'Pipeline':
        """
        Add a step to the pipeline with improved directory resolution.

        Directory resolution follows these rules:
        1. The step's input directory is resolved first, based on the previous step.
        2. The step's output directory is then derived from its input directory.
        3. An explicit output_dir argument overrides automatic resolution.
        """
        # First ensure the input directory is coherent with the pipeline flow
        self._ensure_coherent_input_directory(step)

        # Set the output directory if not explicitly provided
        if not output_dir and not step.output_dir:
            self._set_step_output_directory(step)
        elif output_dir:
            step.output_dir = output_dir

        # Add the step and update pipeline directories
        self.steps.append(step)
        self._update_pipeline_directories(step)
        return self
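
    # How resolution chains in practice (a minimal sketch; paths and step
    # arguments are hypothetical):
    #
    #     pipe = Pipeline(input_dir="/data/plate1")
    #     pipe.add_step(Step(func=f))  # reads /data/plate1, writes /data/plate1_out
    #     pipe.add_step(Step(func=g))  # reads /data/plate1_out, writes in place
    #     pipe.add_step(Step(func=h), output_dir="/data/custom")  # explicit override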

    def _ensure_coherent_input_directory(self, step: Step):
        """Ensure the step's input directory is coherent with the pipeline flow."""
        if not self.steps:  # First step
            if not step.input_dir:
                if not self.input_dir:
                    raise ValueError(
                        "Input directory must be specified for the first step "
                        "or at the pipeline level"
                    )
                step.input_dir = self.input_dir
            return

        prev_step = self.steps[-1]

        # If no input is specified, use the previous step's output
        if not step.input_dir:
            step.input_dir = prev_step.output_dir or prev_step.input_dir

    def _check_directory_conflicts(self, step: Step, proposed_dir: Path) -> bool:
        """
        Check for directory conflicts in the pipeline.

        Args:
            step: Step being configured
            proposed_dir: Proposed output directory

        Returns:
            bool: True if a conflict exists
        """
        proposed_dir = Path(proposed_dir)
        last_processing = next(
            (s for s in reversed(self.steps)
             if s.__class__.__name__ != "PositionGenerationStep"),
            None
        )
        return bool(
            (last_processing and last_processing.output_dir
             and Path(last_processing.output_dir) == proposed_dir)
            or (step.input_dir and Path(step.input_dir) == proposed_dir)
        )

    def _set_stitching_step_output_directory(self, step):
        """Set the output directory for an ImageStitchingStep."""
        stitched_suffix = '_stitched'
        if getattr(self, 'orchestrator', None) is not None:
            stitched_suffix = getattr(
                self.orchestrator.config, 'stitched_dir_suffix', '_stitched'
            )

        # Always use the workspace directory (input directory) as the base.
        # This ensures the stitched output is not in the same directory as
        # any processing step.
        base_dir = Path(self.input_dir)

        # Create the stitched output directory
        step.output_dir = base_dir.parent / f"{base_dir.name}{stitched_suffix}"

        # Check for conflicts and adjust if needed
        if self._check_directory_conflicts(step, step.output_dir):
            step.output_dir = base_dir.parent / f"{base_dir.name}{stitched_suffix}_final"

    def _set_step_output_directory(self, step: Step):
        """Set the step's output directory if not already specified."""
        if step.output_dir:
            return  # Output directory already specified

        # Directory suffix defaults
        out_suffix = "_out"              # Default suffix for all processing steps
        positions_suffix = "_positions"  # Default suffix for position generation steps

        # Override the defaults from the orchestrator's config when available
        if getattr(self, 'orchestrator', None) is not None and hasattr(self.orchestrator, 'config'):
            config = self.orchestrator.config
            out_suffix = getattr(config, 'out_dir_suffix', out_suffix)
            positions_suffix = getattr(config, 'positions_dir_suffix', positions_suffix)

        is_stitching = step.__class__.__name__ == "ImageStitchingStep"
        is_position_generation = step.__class__.__name__ == "PositionGenerationStep"

        # Special handling for ImageStitchingStep
        if is_stitching:
            # If the step's input_dir is the same as the pipeline's input_dir,
            # always use the default stitched directory to avoid conflicts
            # with regular steps
            if step.input_dir == self.input_dir:
                self._set_stitching_step_output_directory(step)
                return

            # If the pipeline has an output_dir, use it for the stitching step
            if self.output_dir:
                step.output_dir = self.output_dir
                logger.info("ImageStitchingStep using pipeline output dir: %s", step.output_dir)
                return

            # Otherwise use the default stitching directory
            self._set_stitching_step_output_directory(step)
            return

        # Special handling for PositionGenerationStep
        if is_position_generation:
            input_path = Path(step.input_dir)
            step.output_dir = input_path.parent / f"{input_path.name}{positions_suffix}"
            logger.info("PositionGenerationStep using default directory: %s", step.output_dir)
            return

        # For regular image processing steps (Step, ZFlatStep, CompositeStep,
        # FocusStep): if this step's input is the previous step's output,
        # process in place
        if self.steps:
            prev_step = self.steps[-1]
            if step.input_dir == prev_step.output_dir:
                step.output_dir = step.input_dir
                logger.info("Step using in-place processing: %s", step.output_dir)
                return

        # Otherwise derive the default output directory from input_dir
        input_path = Path(step.input_dir)
        step.output_dir = input_path.parent / f"{input_path.name}{out_suffix}"
        logger.info("Processing step using default directory: %s", step.output_dir)
        # Don't create the directory yet - let the step create it when it executes
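
    # Resulting layout for a hypothetical plate, assuming the default
    # suffixes (paths illustrative only):
    #
    #     /data/plate1            <- pipeline input
    #     /data/plate1_out        <- regular processing step output
    #     /data/plate1_positions  <- PositionGenerationStep output
    #     /data/plate1_stitched   <- ImageStitchingStep output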

    def _update_pipeline_directories(self, step: Step):
        """Update the pipeline's directories based on the step if needed."""
        # If this is the first step and the pipeline's input_dir is not set,
        # use the step's input_dir. Note: the step has already been appended
        # by add_step, so the first step corresponds to len(self.steps) == 1.
        if len(self.steps) == 1 and not self.input_dir and step.input_dir:
            self.input_dir = step.input_dir

        # If the pipeline's output_dir is not set, use the step's output_dir.
        # Let each step handle its own directory logic.
        if not self.output_dir and step.output_dir:
            self.output_dir = step.output_dir

    def set_input(self, input_dir: str) -> 'Pipeline':
        """
        Set the input directory.

        Args:
            input_dir: The input directory

        Returns:
            Self, for method chaining
        """
        self.input_dir = input_dir
        return self

    def set_output(self, output_dir: str) -> 'Pipeline':
        """
        Set the output directory.

        Args:
            output_dir: The output directory

        Returns:
            Self, for method chaining
        """
        self.output_dir = output_dir
        return self

    def run(
        self,
        input_dir: Optional[str] = None,
        output_dir: Optional[str] = None,
        well_filter: Optional[WellFilter] = None,
        microscope_handler=None,
        orchestrator=None,
        positions_file=None
    ) -> Dict[str, Any]:
        """
        Execute the pipeline.

        Args:
            input_dir: Optional input directory override
            output_dir: Optional output directory override
            well_filter: Optional well filter override
            microscope_handler: Optional microscope handler override
            orchestrator: PipelineOrchestrator instance (required)
            positions_file: Optional positions file to use for stitching

        Returns:
            The results of the pipeline execution

        Raises:
            ValueError: If no orchestrator or input directory is specified
        """
        logger.info("Running pipeline: %s", self.name)

        # Validate the orchestrator before dereferencing it
        if orchestrator is None:
            raise ValueError("orchestrator must be specified")
        self.orchestrator = orchestrator
        self.microscope_handler = microscope_handler or self.orchestrator.microscope_handler

        effective_input = input_dir or self.input_dir
        effective_output = output_dir or self.output_dir
        effective_well_filter = well_filter or self.well_filter

        # If input_dir is still not set, try to get it from the first step
        if not effective_input and self.steps:
            effective_input = self.steps[0].input_dir

        if not effective_input:
            raise ValueError("Input directory must be specified")

        logger.info("Input directory: %s", effective_input)
        logger.info("Output directory: %s", effective_output)
        logger.info("Well filter: %s", effective_well_filter)

        # Initialize context
        context = ProcessingContext(
            input_dir=effective_input,
            output_dir=effective_output,
            well_filter=effective_well_filter,
            orchestrator=orchestrator,
        )

        # Execute each step
        for i, step in enumerate(self.steps):
            logger.info("Executing step %d/%d: %s", i + 1, len(self.steps), step)
            context = step.process(context)

        logger.info("Pipeline completed: %s", self.name)
        return context.results
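
    # Example call (a sketch; the well names are illustrative):
    #
    #     results = pipeline.run(
    #         well_filter=["A01", "B02"],
    #         orchestrator=orchestrator,  # required
    #     )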

    def collect_unique_dirs(self) -> set:
        """
        Collect all unique directory paths from all steps in the pipeline.

        Iterates through each step's attributes and collects values for
        attributes with "dir" in their name.

        Returns:
            A set of unique directory paths.
        """
        unique_dirs = set()
        for step in self.steps:
            for attr_name, attr_value in step.__dict__.items():
                if "dir" in attr_name.lower() and attr_value:
                    unique_dirs.add(attr_value)
        return unique_dirs

    def __repr__(self) -> str:
        """
        String representation of the pipeline.

        Returns:
            A human-readable representation of the pipeline
        """
        steps_repr = "\n    ".join(repr(step) for step in self.steps)
        input_dir_str = str(self.input_dir) if self.input_dir else "None"
        output_dir_str = str(self.output_dir) if self.output_dir else "None"
        return (f"{self.name}\n"
                f"  Input: {input_dir_str}\n"
                f"  Output: {output_dir_str}\n"
                f"  Well filter: {self.well_filter}\n"
                f"  Steps:\n    {steps_repr}")


class ProcessingContext:
    """
    Maintains state during pipeline execution.

    The ProcessingContext holds the input/output directories, well filter,
    configuration, and results during pipeline execution.

    Attributes:
        input_dir: The input directory
        output_dir: The output directory
        well_filter: Wells to process
        config: Configuration parameters
        results: Processing results
    """

    def __init__(
        self,
        input_dir: Optional[str] = None,
        output_dir: Optional[str] = None,
        well_filter: Optional[WellFilter] = None,
        config: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        """
        Initialize the processing context.

        Args:
            input_dir: The input directory
            output_dir: The output directory
            well_filter: Wells to process
            config: Configuration parameters
            **kwargs: Additional context attributes
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.well_filter = well_filter
        self.config = config or {}
        self.results = {}

        # Add any additional attributes (e.g. orchestrator)
        for key, value in kwargs.items():
            setattr(self, key, value)
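
# Sketch of how a step's process() method might consume and extend the
# context (the step logic and helper names here are hypothetical):
#
#     def process(self, context):
#         images = load_images(context.input_dir, context.well_filter)
#         context.results["my_step"] = apply_processing(images)
#         return context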


def group_patterns_by(patterns, component, microscope_handler=None):
    """
    Group patterns by the specified component.

    Args:
        patterns (list): Patterns to group
        component (str): Filename component to group by
        microscope_handler: Handler whose parser extracts filename components;
            required despite the keyword default

    Returns:
        dict: Dictionary mapping component values to lists of patterns
    """
    if microscope_handler is None:
        raise ValueError("microscope_handler is required to parse patterns")

    grouped_patterns = {}
    for pattern in patterns:
        # Extract the component value from the pattern
        component_value = microscope_handler.parser.parse_filename(pattern)[component]
        grouped_patterns.setdefault(component_value, []).append(pattern)
    return grouped_patterns
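

# A minimal, self-contained demonstration of group_patterns_by using a stub
# parser in place of a real microscope handler (the stub and its filename
# convention are hypothetical, for illustration only):
if __name__ == "__main__":
    class _StubParser:
        def parse_filename(self, pattern):
            # Pretend the last character of every pattern encodes the channel
            return {"channel": pattern[-1]}

    class _StubHandler:
        parser = _StubParser()

    demo_patterns = ["well_A01_ch1", "well_A01_ch2", "well_B02_ch1"]
    print(group_patterns_by(demo_patterns, "channel", _StubHandler()))
    # -> {'1': ['well_A01_ch1', 'well_B02_ch1'], '2': ['well_A01_ch2']}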