Coverage for ezstitcher/core/file_system_manager.py: 74%

282 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2025-04-30 13:20 +0000

1""" 

2File system manager for ezstitcher. 

3 

4This module provides a class for managing file system operations. 

5""" 

6 

7import os 

8import re 

9import sys 

10import logging 

11import warnings 

12from pathlib import Path 

13from typing import Dict, List, Optional, Union, Any, Tuple, Pattern 

14import tifffile 

15import numpy as np 

16import shutil 

17#import imagecodecs 

18#import imagecodecs # Import imagecodecs for OperaPhenix TIFF reading 

19 

logger = logging.getLogger(__name__)


class FileSystemManager:
    """
    Manages file system operations for ezstitcher.

    Abstracts away direct file system interactions for improved testability.
    Every operation is a static method; the class itself holds no state.
    """

    # Extensions (with leading dot) treated as images when the caller does
    # not pass an explicit list.
    default_extensions = ['.tif', '.TIF', '.tiff', '.TIFF',
                          '.jpg', '.JPG', '.jpeg', '.JPEG',
                          '.png', '.PNG']

    @staticmethod
    def ensure_directory(directory: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it (and its parents) if necessary.

        Args:
            directory (str or Path): Directory path to ensure exists

        Returns:
            Path: Path object for the directory
        """
        directory = Path(directory)
        directory.mkdir(parents=True, exist_ok=True)
        return directory

    @staticmethod
    def list_image_files(directory: Union[str, Path],
                         extensions: Optional[List[str]] = None,
                         recursive: bool = True,
                         ) -> List[Path]:
        """
        List all image files in a directory with specified extensions.

        Args:
            directory (str or Path): Directory to search
            extensions (list): File extensions to include (with leading dot).
                Defaults to ``FileSystemManager.default_extensions``.
            recursive (bool): Whether to search subdirectories as well

        Returns:
            list: Sorted list of unique Path objects; empty if the directory
                does not exist
        """
        directory = Path(directory)
        if not directory.exists():
            logger.warning(f"Directory does not exist: {directory}")
            return []

        if extensions is None:
            extensions = FileSystemManager.default_extensions

        # "**/*" descends into subdirectories, "*" stays at the top level.
        prefix = "**/*" if recursive else "*"

        # Collect into a set: overlapping globs (repeated extensions, or
        # '.tif' and '.TIF' where globbing ignores case) would otherwise
        # return the same file more than once.
        image_files = set()
        for ext in extensions:
            image_files.update(directory.glob(f"{prefix}{ext}"))

        return sorted(image_files)

    # Removed path_list_from_pattern - use pattern_matcher.path_list_from_pattern directly

    @staticmethod
    def load_image(file_path: Union[str, Path]) -> Optional[np.ndarray]:
        """
        Load an image with tifffile. 3D images are rejected.

        Args:
            file_path (str or Path): Path to the image file

        Returns:
            numpy.ndarray: The loaded image, or None if loading fails or the
                image is 3D
        """
        try:
            img = tifffile.imread(str(file_path))

            # 3D data is unsupported: report it and fail like any other
            # load error, without raising an exception just to catch it.
            if img.ndim == 3:
                logger.error(f"Error loading image {file_path}: "
                             "3D images are not supported. Only 2D images can be loaded.")
                return None

            return img
        except Exception as e:
            logger.error(f"Error loading image {file_path}: {e}")
            return None

    @staticmethod
    def save_image(file_path: Union[str, Path], image: np.ndarray,
                   compression: Optional[str] = None) -> bool:
        """
        Save an image to disk with tifffile, creating parent directories.

        Args:
            file_path (str or Path): Path to save the image
            image (numpy.ndarray): Image to save
            compression (str or None): Compression method passed to tifffile

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Ensure directory exists
            Path(file_path).parent.mkdir(parents=True, exist_ok=True)

            tifffile.imwrite(str(file_path), image, compression=compression)
            return True
        except Exception as e:
            logger.error(f"Error saving image {file_path}: {e}")
            return False

    @staticmethod
    def copy_file(source_path: Union[str, Path], dest_path: Union[str, Path]) -> bool:
        """
        Copy a file from source to destination, preserving metadata.

        The destination's parent directory is created if needed. Errors are
        logged rather than raised.

        Args:
            source_path (str or Path): Source file path
            dest_path (str or Path): Destination file path

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Ensure destination directory exists
            Path(dest_path).parent.mkdir(parents=True, exist_ok=True)

            # copy2 also copies timestamps and permission bits
            shutil.copy2(source_path, dest_path)
            return True
        except Exception as e:
            logger.error(f"Error copying file from {source_path} to {dest_path}: {e}")
            return False

    @staticmethod
    def remove_directory(directory_path: Union[str, Path], recursive: bool = True) -> bool:
        """
        Remove a directory and optionally all its contents.

        Args:
            directory_path (str or Path): Path to the directory to remove
            recursive (bool): If True remove the whole tree; if False the
                directory must already be empty

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            directory_path = Path(directory_path)

            if recursive:
                shutil.rmtree(directory_path)
            else:
                directory_path.rmdir()

            return True
        except Exception as e:
            logger.error(f"Error removing directory {directory_path}: {e}")
            return False

    @staticmethod
    def empty_directory(directory_path: Union[str, Path]) -> bool:
        """
        Empty a directory by deleting all its contents, keeping the directory.

        Args:
            directory_path (str or Path): Path to the directory to empty

        Returns:
            bool: True if successful, False otherwise (including when the
                path is not an existing directory)
        """
        try:
            directory_path = Path(directory_path)

            if not directory_path.exists() or not directory_path.is_dir():
                logger.error(f"Cannot empty {directory_path}: Not a valid directory")
                return False

            for item in directory_path.iterdir():
                if item.is_dir() and not item.is_symlink():
                    # Real subdirectory: remove the whole tree
                    shutil.rmtree(item)
                else:
                    # Files, symlinks (even to directories or dangling) and
                    # any other entry type are removed with a plain unlink.
                    item.unlink()

            return True
        except Exception as e:
            logger.error(f"Error emptying directory {directory_path}: {e}")
            return False

    @staticmethod
    def find_file_recursive(directory: Union[str, Path], filename: str) -> Optional[Path]:
        """
        Recursively search for a file by name in a directory and its subdirectories.
        Returns the first instance found (the directory itself is checked
        before its subdirectories).

        Args:
            directory (str or Path): Directory to search in
            filename (str): Name of the file to find

        Returns:
            Path or None: Path to the first instance of the file, or None if
                not found or if the search fails
        """
        try:
            directory = Path(directory)

            # Check if the file exists in the current directory
            candidate = directory / filename
            if candidate.exists() and candidate.is_file():
                logger.debug(f"Found file {filename} in {directory}")
                return candidate

            # Depth-first search through the subdirectories
            for item in directory.iterdir():
                if item.is_dir():
                    result = FileSystemManager.find_file_recursive(item, filename)
                    if result is not None:
                        return result

            # File not found in this directory or its subdirectories
            return None
        except Exception as e:
            logger.error(f"Error searching for file {filename} in {directory}: {e}")
            return None

    @staticmethod
    def find_directory_substring_recursive(start_path: Union[str, Path], substring: str) -> Optional[Path]:
        """
        Recursively search for a directory containing a substring in its name.
        Returns the path to the first directory found, or None if not found.

        Args:
            start_path (str or Path): The directory path to start the search from.
            substring (str): The substring to search for in directory names.

        Returns:
            Path or None: Path to the first matching directory, or None if not found.
        """
        try:
            start_path = Path(start_path)

            for root, dirs, _ in os.walk(start_path):
                for dir_name in dirs:
                    if substring in dir_name:
                        found_dir_path = Path(root) / dir_name
                        logger.debug(f"Found directory with substring '{substring}': {found_dir_path}")
                        return found_dir_path

            # Directory not found
            logger.debug(f"No directory found containing substring '{substring}' starting from {start_path}")
            return None
        except Exception as e:
            logger.error(f"Error searching for directory with substring '{substring}' in {start_path}: {e}")
            return None

    @staticmethod
    def rename_files_with_consistent_padding(directory, parser, width=3, force_suffixes=False):
        """
        Rename files in a directory to have consistent site number and Z-index padding.
        Optionally force the addition of missing optional suffixes (site, channel, z-index).

        Only image files directly inside ``directory`` are considered. A
        rename whose target name is already taken by a different file is
        skipped and logged instead of overwriting that file.

        Args:
            directory (str or Path): Directory containing files to rename
            parser (FilenameParser): Parser to use for filename parsing and padding (required)
            width (int, optional): Width to pad site numbers and Z-indices to
            force_suffixes (bool, optional): If True, add missing optional suffixes
                with a default value of 1

        Returns:
            dict: Dictionary mapping original filenames to new filenames for
                every file whose name needed to change. Entries whose rename
                failed or was skipped are still listed; see the log.

        Raises:
            ValueError: If parser is None or a filename cannot be parsed
        """
        directory = Path(directory)

        # Ensure parser is provided
        if parser is None:
            raise ValueError("A FilenameParser instance must be provided")

        # Find all image files (top level only)
        image_files = FileSystemManager.list_image_files(directory, recursive=False)

        # Map original filenames to reconstructed filenames
        rename_map = {}
        for file_path in image_files:
            original_name = file_path.name

            # Parse the filename components
            metadata = parser.parse_filename(original_name)
            if not metadata:
                raise ValueError(f"Could not parse filename: {original_name}")

            # Optional components may be absent from the parsed metadata, so
            # both modes read them with .get().
            site = metadata.get('site')
            channel = metadata.get('channel')
            z_index = metadata.get('z_index')
            if force_suffixes:
                # Default values for missing components
                site = site or 1
                channel = channel or 1
                z_index = z_index or 1

            # Reconstruct the filename with proper padding
            new_name = parser.construct_filename(
                well=metadata['well'],
                site=site,
                channel=channel,
                z_index=z_index,
                extension=metadata['extension'],
                site_padding=width,
                z_padding=width
            )

            # Add to rename map if different
            if original_name != new_name:
                rename_map[original_name] = new_name

        # Perform the renaming
        for original_name, new_name in rename_map.items():
            original_path = directory / original_name
            new_path = directory / new_name

            try:
                # Path.rename silently replaces an existing target on POSIX.
                # Refuse to clobber a different file; samefile() still lets
                # case-only renames through on case-insensitive file systems.
                if new_path.exists() and not new_path.samefile(original_path):
                    logger.error(f"Error renaming {original_path} to {new_path}: target already exists")
                    continue

                original_path.rename(new_path)
                logger.debug(f"Renamed {original_path} to {new_path}")
            except Exception as e:
                logger.error(f"Error renaming {original_path} to {new_path}: {e}")

        return rename_map

    @staticmethod
    def find_z_stack_dirs(root_dir: Union[str, Path],
                          pattern: Union[str, Pattern] = r"ZStep_\d+",
                          recursive: bool = True) -> List[Tuple[int, Path]]:
        """
        Find directories whose name matches a pattern (default: ZStep_#).

        Args:
            root_dir (str or Path): Root directory to start the search
            pattern (str or Pattern): Regex searched for in directory names
                (default: ZStep_ followed by digits)
            recursive (bool): Whether to search recursively in subdirectories

        Returns:
            List of (z_index, directory) tuples sorted by z_index. z_index is
            the first run of digits inside the matched text; if the match has
            no digits, the first run of digits in the directory name is used,
            and 0 if there are none.
        """
        root_dir = Path(root_dir)
        if not root_dir.exists():
            logger.warning(f"Directory does not exist: {root_dir}")
            return []

        z_pattern = re.compile(pattern)
        digits_pattern = re.compile(r'\d+')
        z_stack_dirs = []

        # Walk through directory structure
        for dirpath, dirnames, _ in os.walk(root_dir):
            for dirname in dirnames:
                match = z_pattern.search(dirname)
                if not match:
                    continue

                # Take the index from the matched text so that digits
                # elsewhere in the name ("Plate1_ZStep_5") are not mistaken
                # for the z-index.
                digits = digits_pattern.search(match.group(0)) or digits_pattern.search(dirname)
                z_index = int(digits.group(0)) if digits else 0

                z_stack_dirs.append((z_index, Path(dirpath) / dirname))

            # os.walk yields the top level first; stop there if recursion
            # was not requested.
            if not recursive:
                break

        # Sort by Z-index
        z_stack_dirs.sort(key=lambda entry: entry[0])

        logger.debug(f"Found {len(z_stack_dirs)} directories matching pattern '{pattern}'")
        return z_stack_dirs

    @staticmethod
    def find_image_directory(plate_folder: Union[str, Path], extensions: Optional[List[str]] = None) -> Path:
        """
        Find the directory where images are actually located.

        Handles both cases:
        1. Images directly in a folder (returns the folder holding the most images)
        2. Images split across ZStep folders (returns parent of ZStep folders)

        Args:
            plate_folder (str or Path): Base directory to search
            extensions (list): List of file extensions to include. If None, uses default_extensions.

        Returns:
            Path: Path to the directory containing images; ``plate_folder``
                itself if it does not exist or holds no images
        """
        plate_folder = Path(plate_folder)
        if not plate_folder.exists():
            return plate_folder

        # First check if we have ZStep folders that actually contain images
        for _, z_dir in FileSystemManager.find_z_stack_dirs(plate_folder):
            if FileSystemManager.list_image_files(z_dir, extensions, recursive=False):
                # Return the parent directory of the first ZStep folder with images
                return z_dir.parent

        # If no ZStep folders with images, find all images recursively
        images = FileSystemManager.list_image_files(plate_folder, extensions, recursive=True)

        # If no images found, return original folder
        if not images:
            return plate_folder

        # Count images by parent directory
        dir_counts = {}
        for img in images:
            dir_counts[img.parent] = dir_counts.get(img.parent, 0) + 1

        # Return directory with most images (first one encountered on ties)
        return max(dir_counts.items(), key=lambda entry: entry[1])[0]

    @staticmethod
    def detect_zstack_folders(plate_folder, pattern=None):
        """
        Detect Z-stack folders in a plate folder.

        Args:
            plate_folder (str or Path): Path to the plate folder
            pattern (str or Pattern, optional): Regex pattern to match Z-stack folders

        Returns:
            tuple: (has_zstack, z_folders) where z_folders is a list of (z_index, folder_path) tuples
        """
        plate_path = FileSystemManager.find_image_directory(Path(plate_folder))

        # Use find_z_stack_dirs to find Z-stack directories
        z_folders = FileSystemManager.find_z_stack_dirs(
            plate_path,
            pattern=pattern or r'ZStep_\d+',
            recursive=False  # Only look in the immediate directory
        )

        return bool(z_folders), z_folders

    @staticmethod
    def organize_zstack_folders(plate_folder, filename_parser):
        """
        Organize Z-stack folders by copying files to the plate folder with proper naming.

        Each image in a Z-stack folder is copied next to that folder under a
        name that carries the folder's Z-index; the folder is then removed.
        A folder is kept if copying any of its images failed, so the source
        data is not lost. Files the parser cannot parse are skipped.

        Args:
            plate_folder (str or Path): Path to the plate folder
            filename_parser (FilenameParser): Parser for microscopy filenames (required)

        Returns:
            bool: True if Z-stack folders were found and processed, False otherwise

        Raises:
            ValueError: If filename_parser is None
        """
        # Ensure parser is provided
        if filename_parser is None:
            raise ValueError("A FilenameParser instance must be provided")

        has_zstack_folders, z_folders = FileSystemManager.detect_zstack_folders(plate_folder)
        if not has_zstack_folders:
            return False

        plate_path = FileSystemManager.find_image_directory(plate_folder)

        # Folders whose images were all copied and may be deleted afterwards
        removable = []

        # Process each Z-stack folder
        for z_index, z_folder in z_folders:
            all_copied = True

            for img_file in FileSystemManager.list_image_files(z_folder):
                # Parse the filename
                metadata = filename_parser.parse_filename(str(img_file))
                if not metadata:
                    logger.warning(f"Skipping unparseable file in Z-stack folder: {img_file}")
                    continue

                # Construct new filename with Z-index
                new_name = filename_parser.construct_filename(
                    well=metadata['well'],
                    site=metadata['site'],
                    channel=metadata['channel'],
                    z_index=z_index,
                    extension=metadata['extension']
                )

                # Copy file to plate folder
                if not FileSystemManager.copy_file(img_file, plate_path / new_name):
                    all_copied = False

            if all_copied:
                removable.append(z_folder)
            else:
                logger.warning(f"Keeping Z-stack folder {z_folder}: not all files could be copied")

        # Remove Z-stack folders that were copied completely
        for z_folder in removable:
            FileSystemManager.remove_directory(z_folder)

        return True

    @staticmethod
    def delete_file(file_path: Union[str, Path]) -> bool:
        """
        Delete a file from the file system.

        Errors are logged rather than raised.

        Args:
            file_path (str or Path): Path to the file to delete

        Returns:
            bool: True if the file was deleted, False if it does not exist,
                is not a regular file, or deletion failed
        """
        try:
            file_path = Path(file_path)

            # Check if the file exists
            if not file_path.exists():
                logger.warning(f"File does not exist: {file_path}")
                return False

            # Check if it's a file (not a directory)
            if not file_path.is_file():
                logger.error(f"Not a file: {file_path}")
                return False

            # Delete the file
            file_path.unlink()
            logger.debug(f"Deleted file: {file_path}")
            return True
        except Exception as e:
            logger.error(f"Error deleting file {file_path}: {e}")
            return False

    #### SMELLY ####
    #### becoming god class ####
    @staticmethod
    def mirror_directory_with_symlinks(source_dir: Union[str, Path],
                                       target_dir: Union[str, Path],
                                       recursive: bool = True,
                                       overwrite: bool = True) -> int:
        """
        Mirror a directory structure from source to target and create symlinks to all files.
        If the target directory exists and overwrite is True, it will be deleted and recreated.

        Progress is reported through the module logger at INFO level.

        Args:
            source_dir (str or Path): Path to the source directory to mirror
            target_dir (str or Path): Path to the target directory where the mirrored structure will be created
            recursive (bool, optional): Whether to recursively mirror subdirectories. Defaults to True.
            overwrite (bool, optional): Whether to overwrite the target directory if it exists. Defaults to True.

        Returns:
            int: Number of symlinks created
        """
        source_dir = Path(source_dir)
        target_dir = Path(target_dir)

        # Ensure source directory exists
        if not source_dir.is_dir():
            logger.error(f"Source directory not found: {source_dir}")
            return 0

        # If target directory exists and overwrite is True, delete it
        if target_dir.exists() and overwrite:
            logger.info(f"Removing existing target directory: {target_dir}")
            try:
                shutil.rmtree(target_dir)
            except Exception as e:
                logger.error(f"Error removing target directory {target_dir}: {e}")
                logger.info("Continuing without removing the directory...")

        # Create target directory
        target_dir.mkdir(parents=True, exist_ok=True)

        # Counter for created symlinks
        symlinks_created = 0

        try:
            # Get all items in the source directory
            items = list(source_dir.iterdir())
            total_items = len(items)
            logger.info(f"Found {total_items} items in {source_dir}")

            for i, item in enumerate(items):
                # Log progress every 100 items
                if i > 0 and i % 100 == 0:
                    logger.info(f"Processed {i}/{total_items} items ({(i/total_items)*100:.1f}%)")

                # Handle subdirectories
                if item.is_dir() and recursive:
                    symlinks_created += FileSystemManager.mirror_directory_with_symlinks(
                        item, target_dir / item.name, recursive, False  # Don't overwrite subdirectories
                    )
                    continue

                # Skip non-files
                if not item.is_file():
                    continue

                target_path = target_dir / item.name

                try:
                    # exists() follows symlinks and is False for a dangling
                    # link, so also test is_symlink(); otherwise os.symlink
                    # would fail with FileExistsError on a stale link.
                    if target_path.exists() or target_path.is_symlink():
                        target_path.unlink()

                    # Create new symlink pointing at the resolved source file
                    os.symlink(item.resolve(), target_path)
                    symlinks_created += 1
                except Exception as e:
                    logger.error(f"Error creating symlink from {item} to {target_path}: {e}")

            logger.info(f"Completed processing all {total_items} items in {source_dir}")
        except Exception as e:
            logger.error(f"Error processing directory {source_dir}: {e}")

        return symlinks_created