Coverage for ezstitcher/core/microscope_interfaces.py: 85%

336 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2025-04-30 13:20 +0000

1""" 

2Microscope interfaces for ezstitcher. 

3 

4This module provides abstract base classes for handling microscope-specific 

5functionality, including filename parsing and metadata handling. 

6""" 

7 

import logging
import os
import re
import shutil
import sys
import time
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Union, Any, Tuple

from ezstitcher.core.file_system_manager import FileSystemManager

19 

# Module-level logger shared by all classes in this file.
logger = logging.getLogger(__name__)

21 

22 

class FilenameParser(ABC):
    """
    Abstract base class for parsing microscopy image filenames.

    Concrete subclasses implement the microscope-specific primitives
    (``can_parse``, ``parse_filename``, ``construct_filename``); this base
    class builds pattern matching, grouping, and auto-detection on top of
    them.
    """

    # Components a parser may extract from a filename, in canonical order.
    FILENAME_COMPONENTS = ['well', 'site', 'channel', 'z_index', 'extension']
    # Placeholder token used in patterns for a variable index (e.g. site).
    PLACEHOLDER_PATTERN = '{iii}'

    @classmethod
    @abstractmethod
    def can_parse(cls, filename: str) -> bool:
        """
        Check if this parser can parse the given filename.

        Args:
            filename (str): Filename to check

        Returns:
            bool: True if this parser can parse the filename, False otherwise
        """

    @abstractmethod
    def parse_filename(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Parse a microscopy image filename to extract all components.

        Args:
            filename (str): Filename to parse

        Returns:
            dict or None: Dictionary with extracted components or None if parsing fails
        """

    @abstractmethod
    def construct_filename(self, well: str, site: Optional[Union[int, str]] = None,
                           channel: Optional[int] = None,
                           z_index: Optional[Union[int, str]] = None,
                           extension: str = '.tif',
                           site_padding: int = 3, z_padding: int = 3) -> str:
        """
        Construct a filename from components.

        Args:
            well (str): Well ID (e.g., 'A01')
            site (int or str, optional): Site number or placeholder string (e.g., '{iii}')
            channel (int, optional): Channel/wavelength number
            z_index (int or str, optional): Z-index or placeholder string (e.g., '{zzz}')
            extension (str, optional): File extension
            site_padding (int, optional): Width to pad site numbers to (default: 3)
            z_padding (int, optional): Width to pad Z-index numbers to (default: 3)

        Returns:
            str: Constructed filename
        """

    def path_list_from_pattern(self, directory, pattern):
        """
        Get a list of filenames matching a pattern in a directory.

        Args:
            directory (str or Path): Directory to search
            pattern (str): Pattern to match with {iii} placeholder for site index

        Returns:
            list: Naturally sorted list of matching filenames
        """
        directory = Path(directory)

        # Handle substitution of {series} if present (from Ashlar)
        if "{series}" in pattern:
            pattern = pattern.replace("{series}", "{iii}")

        # Parse the pattern to extract expected components
        pattern_metadata = self.parse_filename(pattern)
        if not pattern_metadata:
            # Lazy %-style formatting for consistency with the rest of the module
            logger.warning("Could not parse pattern: %s", pattern)
            return []

        matching_files = []
        all_images = FileSystemManager.list_image_files(directory)
        for file_path in all_images:
            if not file_path.is_file():
                continue

            # Parse the filename to extract its components
            file_metadata = self.parse_filename(file_path.name)
            if not file_metadata:
                continue

            # A file matches when every non-None component of the pattern
            # (placeholders parse to None) equals the file's component.
            is_match = True
            for key, value in pattern_metadata.items():
                if value is None:
                    continue
                if key not in file_metadata or file_metadata[key] != value:
                    is_match = False
                    break

            if is_match:
                matching_files.append(file_path.name)

        logger.debug("Pattern: %s, Directory: %s, Files found: %d",
                     pattern, directory, len(matching_files))
        if matching_files and logger.isEnabledFor(logging.DEBUG):
            logger.debug("First file: %s", matching_files[0])

        # Natural sort so site numbers order numerically (s2 before s10)
        return self._natural_sort(matching_files)

    def _natural_sort(self, file_list):
        """
        Sort filenames naturally, so that site numbers are sorted numerically.
        E.g., ["s1", "s10", "s2"] -> ["s1", "s2", "s10"]

        Args:
            file_list (list): List of filenames to sort

        Returns:
            list: Naturally sorted list of filenames
        """
        def natural_sort_key(s):
            # Split into digit/non-digit runs; compare digits as ints,
            # text case-insensitively.
            return [int(text) if text.isdigit() else text.lower()
                    for text in re.split(r'(\d+)', s)]

        return sorted(file_list, key=natural_sort_key)

    def group_patterns_by_component(self, patterns, component='channel', default_value='1'):
        """
        Group patterns by a specific component (channel, z_index, site, well, etc.)

        Args:
            patterns (list): List of patterns to group
            component (str): Component to group by (e.g., 'channel', 'z_index', 'site', 'well')
            default_value (str): Default value to use if component is not found

        Returns:
            dict: Dictionary mapping component values to lists of patterns
        """
        grouped_patterns = {}

        for pattern in patterns:
            # Replace placeholder with dummy value so the pattern parses
            pattern_with_dummy = pattern.replace(self.PLACEHOLDER_PATTERN, '001')
            metadata = self.parse_filename(pattern_with_dummy)

            if metadata and component in metadata and metadata[component] is not None:
                value = str(metadata[component])
                grouped_patterns.setdefault(value, []).append(pattern)
            else:
                # Fall back to the default bucket when the component is absent
                grouped_patterns.setdefault(default_value, []).append(pattern)
                if component != 'z_index':  # z_index commonly defaults to 1, so don't log warning
                    logger.warning("Could not extract %s from pattern: %s", component, pattern)

        return grouped_patterns

    def auto_detect_patterns(self, folder_path, well_filter=None, extensions=None,
                             group_by=None, variable_components=None, flat=False):
        """
        Automatically detect image patterns in a folder.

        Args:
            folder_path (str or Path): Path to the folder
            well_filter (list): Optional list of wells to include
            extensions (list): Optional list of file extensions to include
            group_by (str, optional): Component to group patterns by (e.g., 'channel',
                'z_index', 'well'). If None, returns a flat list of patterns per well.
            variable_components (list): List of components to make variable
                (e.g., ['site', 'z_index']); defaults to ['site']
            flat (bool): Deprecated. Use group_by=None instead.

        Returns:
            dict: Dictionary mapping wells to patterns (either grouped by component
                or a flat list)
        """
        # Set default variable components if not provided
        if variable_components is None:
            variable_components = ['site']

        # Find all image files and group by well
        files_by_well = self._find_and_filter_images(folder_path, well_filter, extensions)
        if not files_by_well:
            return {}

        # Generate patterns for each well
        result = {}
        for well, files in files_by_well.items():
            patterns = self._generate_patterns_for_files(files, variable_components)

            # `flat` is kept for backward compatibility; group_by=None is the
            # preferred way to request an ungrouped list.
            if flat or group_by is None:
                result[well] = patterns
            else:
                result[well] = self.group_patterns_by_component(patterns, component=group_by)

        return result

    def _find_and_filter_images(self, folder_path, well_filter=None, extensions=None):
        """
        Find all image files in a directory and filter by well.

        Args:
            folder_path (str or Path): Path to the folder
            well_filter (list): Optional list of wells to include
            extensions (list): Optional list of file extensions to include

        Returns:
            dict: Dictionary mapping wells to lists of image files
        """
        start_time = time.time()
        logger.info("Finding and filtering images in %s", folder_path)

        folder_path = Path(folder_path)
        extensions = extensions or ['.tif', '.TIF', '.tiff', '.TIFF']
        image_dir = FileSystemManager.find_image_directory(folder_path)
        logger.info("Using image directory: %s", image_dir)

        # Opera Phenix parsers expose remap_field_in_filename; use that as a
        # duck-typed marker to pick the cheaper shallow file search.
        is_opera_phenix = hasattr(self, 'remap_field_in_filename')

        if is_opera_phenix:
            logger.info("Detected Opera Phenix dataset. Using optimized file detection.")
            image_paths = []

            # Check root directory first
            for ext in extensions:
                image_paths.extend(image_dir.glob(f"*{ext}"))

            # If no files in root, check immediate subdirectories
            if not image_paths:
                for subdir in image_dir.iterdir():
                    if subdir.is_dir():
                        for ext in extensions:
                            image_paths.extend(subdir.glob(f"*{ext}"))
        else:
            # For other microscopes, use the standard recursive listing
            image_paths = FileSystemManager.list_image_files(image_dir, extensions, recursive=True)

        if not image_paths:
            logger.warning("No image files found in %s", folder_path)
            return {}

        logger.info("Found %d image files in %.2f seconds. Grouping by well...",
                    len(image_paths), time.time() - start_time)
        group_start = time.time()

        # Group files by well
        files_by_well = defaultdict(list)
        for img_path in image_paths:
            metadata = self.parse_filename(img_path.name)
            if not metadata:
                continue

            well = metadata['well']
            # Case-insensitive well filtering
            if not well_filter or any(well.lower() == w.lower() for w in well_filter):
                files_by_well[well].append(img_path)

        logger.info("Grouped %d files into %d wells in %.2f seconds",
                    len(image_paths), len(files_by_well), time.time() - group_start)
        return files_by_well

    def _generate_patterns_for_files(self, files, variable_components):
        """
        Generate filename patterns for a list of files.

        Files are first grouped by the unique combination of their
        non-variable components; each group yields one pattern in which every
        variable component is replaced by PLACEHOLDER_PATTERN.

        Args:
            files (list): List of file paths
            variable_components (list): List of components to make variable

        Returns:
            list: List of patterns
        """
        # Group files by the unique combination of their non-variable components
        component_combinations = defaultdict(list)
        for file_path in files:
            metadata = self.parse_filename(file_path.name)
            if not metadata:
                continue

            key_parts = [
                f"{comp}={metadata[comp]}"
                for comp in self.FILENAME_COMPONENTS
                if comp in metadata and comp not in variable_components
                and metadata[comp] is not None
            ]
            component_combinations[",".join(key_parts)].append((file_path, metadata))

        # Emit one pattern per combination, using the first file as a template
        patterns = []
        for files_metadata in component_combinations.values():
            if not files_metadata:
                continue

            _, template_metadata = files_metadata[0]

            # Replace variable components with placeholders
            pattern_args = {}
            for comp in self.FILENAME_COMPONENTS:
                if comp in template_metadata:  # Only components present in the metadata
                    if comp in variable_components:
                        pattern_args[comp] = self.PLACEHOLDER_PATTERN
                    else:
                        pattern_args[comp] = template_metadata[comp]

            patterns.append(self.construct_filename(
                well=pattern_args['well'],
                site=pattern_args.get('site'),
                channel=pattern_args.get('channel'),
                z_index=pattern_args.get('z_index'),
                extension=pattern_args.get('extension', '.tif')
            ))

        return patterns

368 

369 

class MetadataHandler(ABC):
    """
    Abstract base class for handling microscope metadata.

    Concrete handlers know where a microscope stores its metadata file and
    how to read stitching-related values (grid dimensions, pixel size) out
    of it.
    """

    @abstractmethod
    def find_metadata_file(self, plate_path: Union[str, Path]) -> Optional[Path]:
        """
        Find the metadata file for a plate.

        Args:
            plate_path: Path to the plate folder

        Returns:
            Path to the metadata file, or None if not found
        """
        ...

    @abstractmethod
    def get_grid_dimensions(self, plate_path: Union[str, Path]) -> Tuple[int, int]:
        """
        Get grid dimensions for stitching from metadata.

        Args:
            plate_path: Path to the plate folder

        Returns:
            (grid_size_x, grid_size_y)
        """
        ...

    @abstractmethod
    def get_pixel_size(self, plate_path: Union[str, Path]) -> Optional[float]:
        """
        Get the pixel size from metadata.

        Args:
            plate_path: Path to the plate folder

        Returns:
            Pixel size in micrometers, or None if not available
        """
        ...

413 

414 

class MicroscopeHandler:
    """Composed class for handling microscope-specific functionality.

    Composes a FilenameParser and a MetadataHandler (auto-detected from the
    plate folder contents when not supplied explicitly) and delegates
    filename and metadata operations to them.
    """

    # Fallback microscope type when auto-detection finds no matching parser.
    DEFAULT_MICROSCOPE = 'ImageXpress'
    # Class-level cache for discovered (parser_class, handler_class) pairs.
    _handlers_cache = None

    @classmethod
    def _discover_handlers(cls):
        """Discover all microscope handlers from the microscopes subpackage.

        Returns a dict mapping microscope type name to a
        (FilenameParser subclass, MetadataHandler subclass) tuple.
        The result is cached on the class after the first call.
        """
        if cls._handlers_cache:
            return cls._handlers_cache

        import importlib, inspect, pkgutil
        from ezstitcher.microscopes import __path__ as microscopes_path

        handlers = {}

        # Find all modules in the microscopes package
        for _, module_name, _ in pkgutil.iter_modules(microscopes_path, 'ezstitcher.microscopes.'):
            try:
                module = importlib.import_module(module_name)

                # Find FilenameParser implementations defined in this module
                # (skip re-exports and the abstract base itself).
                for name, obj in inspect.getmembers(module, inspect.isclass):
                    if obj.__module__ != module.__name__ or not issubclass(obj, FilenameParser) or obj == FilenameParser:
                        continue

                    # Extract microscope type from the "<Type>FilenameParser"
                    # naming convention.
                    microscope_type = name.replace('FilenameParser', '')

                    # Look for the matching "<Type>MetadataHandler" class
                    handler_name = f"{microscope_type}MetadataHandler"
                    handler_class = getattr(module, handler_name, None)

                    if handler_class and issubclass(handler_class, MetadataHandler):
                        handlers[microscope_type] = (obj, handler_class)
            except Exception as e:
                # Broad catch is deliberate: one broken module must not abort
                # discovery of the others.
                logger.debug(f"Error inspecting module {module_name}: {e}")

        cls._handlers_cache = handlers
        return handlers

    def __init__(self, plate_folder=None, parser=None, metadata_handler=None, microscope_type='auto'):
        """Initialize with plate folder and optional components.

        Args:
            plate_folder: Optional path to the plate folder (used for
                auto-detection of the microscope type).
            parser: Optional pre-built FilenameParser instance.
            metadata_handler: Optional pre-built MetadataHandler instance.
            microscope_type: Explicit microscope type, or 'auto' to detect
                from sample filenames in plate_folder.
        """
        self.plate_folder = Path(plate_folder) if plate_folder else None

        # Only run detection when at least one component is missing;
        # explicitly supplied components are always kept.
        if parser is None or metadata_handler is None:
            detected_type = self._detect_microscope_type(microscope_type)
            self.parser, self.metadata_handler = self._create_handlers(detected_type, parser, metadata_handler)
        else:
            self.parser, self.metadata_handler = parser, metadata_handler

    def _detect_microscope_type(self, microscope_type):
        """Detect microscope type from files or use specified type.

        Samples up to 10 image filenames from plate_folder and counts how
        many each registered parser can parse; the best-scoring type wins.
        Falls back to DEFAULT_MICROSCOPE on no match or any error.
        """
        # Non-'auto' requests bypass detection; 'auto' without a folder
        # degrades to the default type.
        if microscope_type.lower() != 'auto' or not self.plate_folder:
            return microscope_type if microscope_type.lower() != 'auto' else self.DEFAULT_MICROSCOPE

        try:
            # Get sample files and test each parser -> search recursively in dir
            sample_files = FileSystemManager.list_image_files(self.plate_folder)[:10]

            if not sample_files:
                return self.DEFAULT_MICROSCOPE

            # Count, per parser, how many sample filenames it claims to parse.
            matches = {}
            for name, (parser_class, _) in self._discover_handlers().items():
                matches[name] = 0
                for f in sample_files:
                    if parser_class.can_parse(f.name):
                        matches[name] += 1


            best_match = max(matches.items(), key=lambda x: x[1]) if matches else (self.DEFAULT_MICROSCOPE, 0)
            if best_match[1] > 0:
                logger.info(f"Auto-detected {best_match[0]} format ({best_match[1]}/{len(sample_files)} files matched)")
                return best_match[0]

            return self.DEFAULT_MICROSCOPE
        except Exception as e:
            # Detection is best-effort; any failure falls back to the default.
            logger.error(f"Error during auto-detection: {e}")
            return self.DEFAULT_MICROSCOPE

    def _create_handlers(self, microscope_type, parser=None, metadata_handler=None):
        """Create parser and metadata handler for the specified microscope type.

        Explicitly supplied instances take precedence; otherwise the classes
        registered for microscope_type (or DEFAULT_MICROSCOPE) are
        instantiated. Returns a (parser, metadata_handler) tuple; either may
        be None when no class was registered.
        """
        handlers = self._discover_handlers()
        parser_class, handler_class = handlers.get(microscope_type, handlers.get(self.DEFAULT_MICROSCOPE, (None, None)))

        return (parser or (parser_class() if parser_class else None),
                metadata_handler or (handler_class() if handler_class else None))

    # Delegate filename parsing methods to parser

    def parse_filename(self, filename: str) -> Optional[Dict[str, Any]]:
        """Delegate to parser."""
        return self.parser.parse_filename(filename)

    def construct_filename(self, well: str, site: Optional[Union[int, str]] = None,
                           channel: Optional[int] = None,
                           z_index: Optional[Union[int, str]] = None,
                           extension: str = '.tif',
                           site_padding: int = 3, z_padding: int = 3) -> str:
        """Delegate to parser."""
        return self.parser.construct_filename(
            well, site, channel, z_index, extension, site_padding, z_padding
        )

    def auto_detect_patterns(self, folder_path, well_filter=None, extensions=None,
                             group_by='channel', variable_components=None, flat=False):
        """Delegate to parser.

        NOTE: group_by defaults to 'channel' here, whereas the parser's own
        method defaults to None (flat lists).
        """
        return self.parser.auto_detect_patterns(
            folder_path,
            well_filter=well_filter,
            extensions=extensions,
            group_by=group_by,
            variable_components=variable_components,
            flat=flat
        )

    def path_list_from_pattern(self, directory, pattern):
        """Delegate to parser."""
        return self.parser.path_list_from_pattern(directory, pattern)

    # Delegate metadata handling methods to metadata_handler

    def find_metadata_file(self, plate_path: Union[str, Path]) -> Optional[Path]:
        """Delegate to metadata handler."""
        return self.metadata_handler.find_metadata_file(plate_path)

    def get_grid_dimensions(self, plate_path: Union[str, Path]) -> Tuple[int, int]:
        """Delegate to metadata handler."""
        return self.metadata_handler.get_grid_dimensions(plate_path)

    def get_pixel_size(self, plate_path: Union[str, Path]) -> Optional[float]:
        """Delegate to metadata handler."""
        return self.metadata_handler.get_pixel_size(plate_path)

    def init_workspace(self, plate_path: Union[str, Path], workspace_path: Union[str, Path]) -> int:
        """Mirror the plate directory and create symlinks to all files.

        For Opera Phenix, also renames symlinks based on field indices from Index.xml.

        Args:
            plate_path: Path to the source plate directory
            workspace_path: Path to the target workspace directory

        Returns:
            int: Number of symlinks created

        NOTE(review): there is no return statement in the body, so the method
        actually returns None despite the documented int — confirm and fix.
        """
        # Import here to avoid circular imports
        from ezstitcher.core.file_system_manager import FileSystemManager
        # Import time for performance logging
        import time
        import sys

        plate_path = Path(plate_path)
        workspace_path = Path(workspace_path)
        # Subsequent operations on this handler should target the workspace.
        self.plate_folder = workspace_path

        print(f"Starting to mirror directory from {plate_path} to {workspace_path}")
        start_time = time.time()

        # Create basic directory structure with symlinks
        print("Creating symlinks...")
        sys.stdout.flush()  # Force output to be displayed immediately
        symlink_count = FileSystemManager.mirror_directory_with_symlinks(plate_path, workspace_path)

        print(f"Mirroring completed in {time.time() - start_time:.2f} seconds. Created {symlink_count} symlinks.")
        sys.stdout.flush()  # Force output to be displayed immediately

        # Check if the parser has a remap_field_in_filename method (Opera Phenix specific)
        if hasattr(self.parser, 'remap_field_in_filename'):
            print("Detected Opera Phenix dataset. Checking for metadata file...")
            sys.stdout.flush()  # Force output to be displayed immediately

            # Find metadata file (Index.xml for Opera Phenix)
            metadata_file = self.metadata_handler.find_metadata_file(plate_path)


            ### SMELLY: this responsibility should be dedicated to an operaphenix specific class ###
            if metadata_file and hasattr(self.metadata_handler, 'create_xml_parser'):
                print(f"Found metadata file: {metadata_file}. Starting field remapping.")
                sys.stdout.flush()  # Force output to be displayed immediately
                remap_start_time = time.time()

                # Create XML parser using the metadata file
                print("Creating XML parser...")
                sys.stdout.flush()  # Force output to be displayed immediately
                xml_parser = self.metadata_handler.create_xml_parser(metadata_file)

                # Find image files in the workspace - limit to direct files in the workspace
                # rather than searching all subdirectories
                print("Finding image files in workspace...")
                sys.stdout.flush()  # Force output to be displayed immediately

                # Find the image directory (handles both root and subdirectory cases)
                image_dir = FileSystemManager.find_image_directory(workspace_path)
                print(f"Found image directory: {image_dir}")
                sys.stdout.flush()  # Force output to be displayed immediately

                # Find all image files in the directory using default extensions
                image_files = FileSystemManager.list_image_files(
                    image_dir,
                    recursive=True
                )

                total_files = len(image_files)
                print(f"Found {total_files} image files. Remapping field IDs...")
                sys.stdout.flush()  # Force output to be displayed immediately

                # Get field ID mapping
                print("Getting field ID mapping from XML...")
                sys.stdout.flush()
                field_mapping = xml_parser.get_field_id_mapping()

                # Print the first 20 entries of the field mapping
                print("Field ID mapping (first 20 entries):")
                sorted_field_ids = sorted(field_mapping.keys())[:20]  # Get first 20 field IDs
                for field_id in sorted_field_ids:
                    new_field_id = field_mapping[field_id]
                    print(f" Field {field_id:3d} -> {new_field_id:3d}")

                # Create a temporary subfolder for all files that need to be renamed.
                # The uuid suffix avoids collisions with existing directories.
                temp_folder_name = f"temp_rename_{uuid.uuid4().hex[:8]}"
                # Use the image_dir we already found
                temp_folder = image_dir / temp_folder_name
                #temp_folder = workspace_path / temp_folder_name
                temp_folder.mkdir(exist_ok=True)
                print(f"Created temporary folder: {temp_folder}")
                sys.stdout.flush()

                ### SMELLY: we should log everything and reuse it
                # Calculate progress reporting interval based on total number of files
                # For large datasets, report less frequently
                if total_files > 10000:
                    report_interval = 1000  # Report every 1000 files for very large datasets
                elif total_files > 1000:
                    report_interval = 100  # Report every 100 files for large datasets
                else:
                    report_interval = 10  # Report every 10 files for small datasets


                # Remap field IDs in filenames.
                # NOTE(review): additional_symlinks, remapped_files and
                # skipped_files are initialized but never incremented below,
                # so the counts printed later are always 0 — confirm intent.
                additional_symlinks = 0
                remapped_files = 0
                skipped_files = 0
                renamed_files = {}

                print(f"Starting to process {total_files} files...")
                sys.stdout.flush()

                # For each image in the folder:
                # 1. Get the field number from the filename
                # 2. Use the field number to find the new field number from the mapping
                # 3. Generate a new filename using the filename parser
                # 4. Move the image to a temporary folder
                processed_files = 0
                for image_file in image_files:
                    processed_files += 1

                    # Log progress at appropriate intervals
                    if processed_files > 0 and processed_files % report_interval == 0:
                        percent_done = (processed_files/total_files)*100
                        msg = f"Processed {processed_files}/{total_files} files"
                        msg += f" ({percent_done:.1f}%)"
                        print(msg)
                        sys.stdout.flush()  # Force output to be displayed immediately

                    # Parse the filename to get metadata
                    metadata = self.parser.parse_filename(image_file.name)

                    if metadata and 'site' in metadata and metadata['site'] is not None:
                        # Get the field ID (site number)
                        field_id = metadata['site']

                        # Find the new field ID from the mapping; fall back to
                        # the original ID when it is not in the mapping.
                        new_field_id = field_mapping.get(field_id, field_id)

                        # Create a new filename with the remapped field ID
                        new_filename = self.parser.construct_filename(
                            well=metadata['well'],
                            site=new_field_id,  # Use the remapped field ID
                            channel=metadata['channel'],
                            z_index=metadata['z_index'],
                            extension=metadata['extension'],
                            site_padding=3,
                            z_padding=3
                        )

                        temp_path = temp_folder / new_filename
                        # NOTE(review): image_file appears to already be a full
                        # path; joining an absolute path onto image_dir yields
                        # the absolute path unchanged, so this works only by
                        # that pathlib behavior — confirm.
                        shutil.move(str(image_dir / image_file), str(temp_path))
                        renamed_files[new_filename] = temp_path

                # NOTE(review): additional_symlinks is still 0 here (never
                # incremented in the loop above).
                symlink_count += additional_symlinks
                print(f"Field remapping completed in {time.time() - remap_start_time:.2f} seconds.")
                print(f"Remapped {remapped_files} files")
                print(f"Created {additional_symlinks} new symlinks")
                sys.stdout.flush()  # Force output to be displayed immediately

                # Move the renamed symlinks back to the original location with their new names
                print(f"Moving {len(renamed_files)} renamed files back to original location...")
                sys.stdout.flush()
                move_start_time = time.time()

                for new_filename, temp_path in renamed_files.items():
                    try:
                        # Move the file back to the original location with the new filename
                        dest_path = image_dir / new_filename
                        shutil.move(str(temp_path), str(dest_path))
                    except Exception as e:
                        print(f"Error moving {temp_path} to {dest_path}: {e}")

                print(f"Moved files back in {time.time() - move_start_time:.2f} seconds.")
                sys.stdout.flush()

                # Clean up the temporary folder
                try:
                    if temp_folder.exists():
                        # Check if the folder is empty before removing
                        remaining_files = list(temp_folder.iterdir())
                        if remaining_files:
                            num_remaining = len(remaining_files)
                            print(f"Warning: {num_remaining} files remain in the temp folder")
                            print("These files may have had conflicts during renaming.")
                        shutil.rmtree(temp_folder)
                except Exception as e:
                    print(f"Error removing temporary folder: {e}")

def create_microscope_handler(microscope_type: str = 'auto', **kwargs) -> MicroscopeHandler:
    """Factory for MicroscopeHandler instances.

    Args:
        microscope_type: Explicit microscope type, or 'auto' for detection.
        **kwargs: Forwarded to the MicroscopeHandler constructor
            (e.g. plate_folder, parser, metadata_handler).

    Returns:
        MicroscopeHandler: The constructed handler.
    """
    handler = MicroscopeHandler(microscope_type=microscope_type, **kwargs)
    return handler