Coverage for ezstitcher/core/file_system_manager.py: 74%
282 statements
« prev ^ index » next coverage.py v7.3.2, created at 2025-04-30 13:20 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2025-04-30 13:20 +0000
1"""
2File system manager for ezstitcher.
4This module provides a class for managing file system operations.
5"""
7import os
8import re
9import sys
10import logging
11import warnings
12from pathlib import Path
13from typing import Dict, List, Optional, Union, Any, Tuple, Pattern
14import tifffile
15import numpy as np
16import shutil
17#import imagecodecs
18#import imagecodecs # Import imagecodecs for OperaPhenix TIFF reading
20logger = logging.getLogger(__name__)
class FileSystemManager:
    """
    Manages file system operations for ezstitcher.

    Abstracts away direct file system interactions for improved testability.
    All operations are exposed as static methods, so the class acts as a
    namespace and never needs to be instantiated.
    """

    # Extensions searched when the caller does not supply any. Both lower and
    # upper case variants are listed so matching works on case-sensitive
    # file systems.
    default_extensions = ['.tif', '.TIF', '.tiff', '.TIFF',
                          '.jpg', '.JPG', '.jpeg', '.JPEG',
                          '.png', '.PNG']

    @staticmethod
    def ensure_directory(directory: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it (and its parents) if necessary.

        Args:
            directory (str or Path): Directory path to ensure exists

        Returns:
            Path: Path object for the directory
        """
        directory = Path(directory)
        directory.mkdir(parents=True, exist_ok=True)
        return directory

    @staticmethod
    def list_image_files(directory: Union[str, Path],
                         extensions: Optional[List[str]] = None,
                         recursive: bool = True,
                         ) -> List[Path]:
        """
        List all image files in a directory with specified extensions.

        Each extension is appended to a glob pattern, so any entry whose name
        ends with the extension is returned. Every path is returned at most
        once, even when several extensions match the same entry (repeated
        extensions, or case variants on a case-insensitive file system).

        Args:
            directory (str or Path): Directory to search
            extensions (list): List of file extensions to include. If None,
                uses default_extensions.
            recursive (bool): Whether to search recursively

        Returns:
            list: Sorted list of unique Path objects for image files; empty if
                the directory does not exist
        """
        directory = Path(directory)
        if not directory.exists():
            logger.warning(f"Directory does not exist: {directory}")
            return []

        if extensions is None:
            extensions = FileSystemManager.default_extensions

        # "**/" makes the glob descend into subdirectories.
        prefix = "**/*" if recursive else "*"

        # A set removes duplicates produced by overlapping patterns.
        image_files = set()
        for ext in extensions:
            image_files.update(directory.glob(f"{prefix}{ext}"))

        return sorted(image_files)

    # Removed path_list_from_pattern - use pattern_matcher.path_list_from_pattern directly

    @staticmethod
    def load_image(file_path: Union[str, Path]) -> Optional[np.ndarray]:
        """
        Load an image. Only 2D images are supported.

        Args:
            file_path (str or Path): Path to the image file

        Returns:
            numpy.ndarray: 2D image, or None if loading fails or the image is
                not two-dimensional
        """
        try:
            img = tifffile.imread(str(file_path))

            # Enforce the documented contract: anything that is not exactly
            # 2D (stacks, RGB, higher-dimensional data) is rejected.
            if img.ndim != 2:
                raise ValueError(
                    f"{img.ndim}D images are not supported. Only 2D images can be loaded."
                )

            return img
        except Exception as e:
            logger.error(f"Error loading image {file_path}: {e}")
            return None

    @staticmethod
    def save_image(file_path: Union[str, Path], image: np.ndarray,
                   compression: Optional[str] = None) -> bool:
        """
        Save an image to disk, creating the parent directory if needed.

        Args:
            file_path (str or Path): Path to save the image
            image (numpy.ndarray): Image to save
            compression (str or None): Compression method passed to tifffile

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Ensure directory exists
            directory = Path(file_path).parent
            directory.mkdir(parents=True, exist_ok=True)

            # Save image
            tifffile.imwrite(str(file_path), image, compression=compression)
            return True
        except Exception as e:
            logger.error(f"Error saving image {file_path}: {e}")
            return False

    @staticmethod
    def copy_file(source_path: Union[str, Path], dest_path: Union[str, Path]) -> bool:
        """
        Copy a file from source to destination, preserving metadata.

        The destination directory is created if it does not exist. Errors are
        logged and reported through the return value instead of being raised.

        Args:
            source_path (str or Path): Source file path
            dest_path (str or Path): Destination file path

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Ensure destination directory exists
            directory = Path(dest_path).parent
            directory.mkdir(parents=True, exist_ok=True)

            # copy2 also copies file metadata where the platform allows it
            shutil.copy2(source_path, dest_path)
            return True
        except Exception as e:
            logger.error(f"Error copying file from {source_path} to {dest_path}: {e}")
            return False

    @staticmethod
    def remove_directory(directory_path: Union[str, Path], recursive: bool = True) -> bool:
        """
        Remove a directory and optionally all its contents.

        Args:
            directory_path (str or Path): Path to the directory to remove
            recursive (bool): If True the whole tree is removed; if False the
                directory must already be empty

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            directory_path = Path(directory_path)

            if recursive:
                shutil.rmtree(directory_path)
            else:
                directory_path.rmdir()

            return True
        except Exception as e:
            logger.error(f"Error removing directory {directory_path}: {e}")
            return False

    @staticmethod
    def empty_directory(directory_path: Union[str, Path]) -> bool:
        """
        Empty a directory by recursively deleting all its contents.

        All files, symlinks and subdirectories inside the directory are
        removed; the directory itself is preserved.

        Args:
            directory_path (str or Path): Path to the directory to empty

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            directory_path = Path(directory_path)

            if not directory_path.exists() or not directory_path.is_dir():
                logger.error(f"Cannot empty {directory_path}: Not a valid directory")
                return False

            for item in directory_path.iterdir():
                if item.is_file() or item.is_symlink():
                    # Symlinks are unlinked, never followed, so a link to a
                    # directory does not delete the link's target.
                    item.unlink()
                elif item.is_dir():
                    shutil.rmtree(item)

            return True
        except Exception as e:
            logger.error(f"Error emptying directory {directory_path}: {e}")
            return False

    @staticmethod
    def find_file_recursive(directory: Union[str, Path], filename: str) -> Optional[Path]:
        """
        Search for a file by name in a directory and its subdirectories.

        Directories are visited depth-first, and each directory is checked for
        the file before its subdirectories are explored. Directories reached
        more than once (for example through symlink cycles) are visited only
        once. Errors in one directory are logged and the search continues with
        the remaining directories.

        Args:
            directory (str or Path): Directory to search in
            filename (str): Name of the file to find

        Returns:
            Path or None: Path to the first instance of the file, or None if not found
        """
        try:
            pending = [Path(directory)]
        except Exception as e:
            logger.error(f"Error searching for file {filename} in {directory}: {e}")
            return None

        visited = set()
        while pending:
            current = pending.pop()
            try:
                # Resolved paths identify a directory regardless of the
                # symlink route used to reach it.
                real_path = current.resolve()
                if real_path in visited:
                    continue
                visited.add(real_path)

                candidate = current / filename
                if candidate.is_file():
                    logger.debug(f"Found file {filename} in {current}")
                    return candidate

                subdirs = [item for item in current.iterdir() if item.is_dir()]
            except Exception as e:
                logger.error(f"Error searching for file {filename} in {current}: {e}")
                continue

            # Reverse so the first listed subdirectory is popped (searched) first.
            pending.extend(reversed(subdirs))

        return None

    @staticmethod
    def find_directory_substring_recursive(start_path: Union[str, Path], substring: str) -> Optional[Path]:
        """
        Recursively search for a directory containing a substring in its name.

        Args:
            start_path (str or Path): The directory path to start the search from.
            substring (str): The substring to search for in directory names.

        Returns:
            Path or None: Path to the first matching directory, or None if not found.
        """
        try:
            start_path = Path(start_path)

            for root, dirs, _ in os.walk(start_path):
                for dir_name in dirs:
                    if substring in dir_name:
                        found_dir_path = Path(root) / dir_name
                        logger.debug(f"Found directory with substring '{substring}': {found_dir_path}")
                        return found_dir_path

            logger.debug(f"No directory found containing substring '{substring}' starting from {start_path}")
            return None
        except Exception as e:
            logger.error(f"Error searching for directory with substring '{substring}' in {start_path}: {e}")
            return None

    @staticmethod
    def rename_files_with_consistent_padding(directory: Union[str, Path],
                                             parser: Any,
                                             width: int = 3,
                                             force_suffixes: bool = False) -> Dict[str, str]:
        """
        Rename files in a directory to have consistent site number and Z-index padding.
        Optionally force the addition of missing optional suffixes (site, channel, z-index).

        Only image files directly inside the directory are considered. A rename
        whose destination already exists as a different file is skipped and
        logged instead of overwriting that file.

        Args:
            directory (str or Path): Directory containing files to rename
            parser (FilenameParser): Parser to use for filename parsing and padding (required)
            width (int, optional): Width to pad site numbers and Z-indices to
            force_suffixes (bool, optional): If True, add missing optional suffixes with default values

        Returns:
            dict: Dictionary mapping original filenames to the new filenames
                that were planned for them (including renames that failed or
                were skipped; those are reported through the log)

        Raises:
            ValueError: If parser is None or a filename cannot be parsed
        """
        directory = Path(directory)

        if parser is None:
            raise ValueError("A FilenameParser instance must be provided")

        image_files = FileSystemManager.list_image_files(directory, recursive=False)

        # First pass: work out every new name before touching the disk.
        rename_map = {}
        for file_path in image_files:
            original_name = file_path.name

            metadata = parser.parse_filename(original_name)
            if not metadata:
                raise ValueError(f"Could not parse filename: {original_name}")

            # .get() keeps both modes tolerant of parsers that omit a key.
            site = metadata.get('site')
            channel = metadata.get('channel')
            z_index = metadata.get('z_index')
            if force_suffixes:
                # Missing components fall back to 1.
                site = site or 1
                channel = channel or 1
                z_index = z_index or 1

            new_name = parser.construct_filename(
                well=metadata['well'],
                site=site,
                channel=channel,
                z_index=z_index,
                extension=metadata['extension'],
                site_padding=width,
                z_padding=width
            )

            if original_name != new_name:
                rename_map[original_name] = new_name

        # Second pass: perform the renames.
        for original_name, new_name in rename_map.items():
            original_path = directory / original_name
            new_path = directory / new_name

            try:
                # Path.rename silently replaces an existing file on POSIX, so
                # refuse to clobber a different file. samefile() lets
                # case-only renames through on case-insensitive file systems.
                if new_path.exists() and not new_path.samefile(original_path):
                    logger.error(
                        f"Error renaming {original_path} to {new_path}: destination already exists"
                    )
                    continue

                original_path.rename(new_path)
                logger.debug(f"Renamed {original_path} to {new_path}")
            except Exception as e:
                logger.error(f"Error renaming {original_path} to {new_path}: {e}")

        return rename_map

    @staticmethod
    def find_z_stack_dirs(root_dir: Union[str, Path],
                          pattern: str = r"ZStep_\d+",
                          recursive: bool = True) -> List[Tuple[int, Path]]:
        """
        Find directories matching a pattern (default: ZStep_#).

        The Z-index is the first run of digits inside the text matched by the
        pattern. If the matched text has no digits, the first run of digits in
        the directory name is used, and 0 if there are none.

        Args:
            root_dir (str or Path): Root directory to start the search
            pattern (str): Regex pattern to match directory names (default: ZStep_ followed by digits)
            recursive (bool): Whether to search recursively in subdirectories

        Returns:
            List of (z_index, directory) tuples sorted by z_index
        """
        root_dir = Path(root_dir)
        if not root_dir.exists():
            logger.warning(f"Directory does not exist: {root_dir}")
            return []

        z_stack_dirs = []
        z_pattern = re.compile(pattern)
        digit_run = re.compile(r'\d+')

        for dirpath, dirnames, _ in os.walk(root_dir):
            for dirname in dirnames:
                match = z_pattern.search(dirname)
                if match is None:
                    continue

                # Prefer digits from the matched text so unrelated numbers
                # earlier in the name (e.g. "Run2_ZStep_7") are ignored.
                number = digit_run.search(match.group(0)) or digit_run.search(dirname)
                z_index = int(number.group(0)) if number else 0

                z_stack_dirs.append((z_index, Path(dirpath) / dirname))

            # The first os.walk iteration covers root_dir itself; stop there
            # when only the immediate children are wanted.
            if not recursive:
                break

        z_stack_dirs.sort(key=lambda entry: entry[0])

        logger.debug(f"Found {len(z_stack_dirs)} directories matching pattern '{pattern}'")
        return z_stack_dirs

    @staticmethod
    def find_image_directory(plate_folder: Union[str, Path], extensions: Optional[List[str]] = None) -> Path:
        """
        Find the directory where images are actually located.

        Handles both cases:
        1. Images directly in a folder (returns the folder holding most images)
        2. Images split across ZStep folders (returns parent of ZStep folders)

        Args:
            plate_folder (str or Path): Base directory to search
            extensions (list): List of file extensions to include. If None, uses default_extensions.

        Returns:
            Path: Path to the directory containing images; plate_folder itself
                if it does not exist or contains no images
        """
        plate_folder = Path(plate_folder)
        if not plate_folder.exists():
            return plate_folder

        # ZStep folders take priority: return the parent of the first one
        # that directly contains images.
        for _, z_dir in FileSystemManager.find_z_stack_dirs(plate_folder):
            if FileSystemManager.list_image_files(z_dir, extensions, recursive=False):
                return z_dir.parent

        images = FileSystemManager.list_image_files(plate_folder, extensions, recursive=True)
        if not images:
            return plate_folder

        # Count images by parent directory and return the most populated one.
        dir_counts = {}
        for img in images:
            dir_counts[img.parent] = dir_counts.get(img.parent, 0) + 1

        return max(dir_counts.items(), key=lambda entry: entry[1])[0]

    @staticmethod
    def detect_zstack_folders(plate_folder: Union[str, Path],
                              pattern: Optional[Union[str, Pattern]] = None
                              ) -> Tuple[bool, List[Tuple[int, Path]]]:
        """
        Detect Z-stack folders in a plate folder.

        Args:
            plate_folder (str or Path): Path to the plate folder
            pattern (str or Pattern, optional): Regex pattern to match Z-stack
                folders; defaults to ZStep_ followed by digits

        Returns:
            tuple: (has_zstack, z_folders) where z_folders is a list of (z_index, folder_path) tuples
        """
        plate_path = FileSystemManager.find_image_directory(Path(plate_folder))

        z_folders = FileSystemManager.find_z_stack_dirs(
            plate_path,
            pattern=pattern or r'ZStep_\d+',
            recursive=False  # Only look in the immediate directory
        )

        return bool(z_folders), z_folders

    @staticmethod
    def organize_zstack_folders(plate_folder: Union[str, Path], filename_parser: Any) -> bool:
        """
        Organize Z-stack folders by moving files to the plate folder with proper naming.

        Every image in a Z-stack folder is copied into the image directory
        under a name that carries the folder's Z-index. A Z-stack folder is
        removed afterwards only if all of its images were parsed and copied
        successfully. Otherwise it is kept, so no image is lost.

        Args:
            plate_folder (str or Path): Path to the plate folder
            filename_parser (FilenameParser): Parser for microscopy filenames (required)

        Returns:
            bool: True if Z-stack folders were found and processed, False otherwise

        Raises:
            ValueError: If filename_parser is None
        """
        if filename_parser is None:
            raise ValueError("A FilenameParser instance must be provided")

        has_zstack_folders, z_folders = FileSystemManager.detect_zstack_folders(plate_folder)
        if not has_zstack_folders:
            return False

        plate_path = FileSystemManager.find_image_directory(plate_folder)

        fully_copied = []
        for z_index, z_folder in z_folders:
            folder_complete = True

            for img_file in FileSystemManager.list_image_files(z_folder):
                metadata = filename_parser.parse_filename(str(img_file))
                if not metadata:
                    logger.warning(f"Could not parse filename, leaving file in place: {img_file}")
                    folder_complete = False
                    continue

                new_name = filename_parser.construct_filename(
                    well=metadata['well'],
                    site=metadata['site'],
                    channel=metadata['channel'],
                    z_index=z_index,
                    extension=metadata['extension']
                )

                if not FileSystemManager.copy_file(img_file, plate_path / new_name):
                    folder_complete = False

            if folder_complete:
                fully_copied.append(z_folder)
            else:
                logger.warning(f"Keeping Z-stack folder with unprocessed files: {z_folder}")

        # Remove only the folders whose content is now safely in plate_path.
        for z_folder in fully_copied:
            FileSystemManager.remove_directory(z_folder)

        return True

    @staticmethod
    def delete_file(file_path: Union[str, Path]) -> bool:
        """
        Delete a file from the file system.

        Errors are logged and reported through the return value instead of
        being raised.

        Args:
            file_path (str or Path): Path to the file to delete

        Returns:
            bool: True if the file was deleted, False if it does not exist, is
                not a regular file, or could not be deleted
        """
        try:
            file_path = Path(file_path)

            if not file_path.exists():
                logger.warning(f"File does not exist: {file_path}")
                return False

            if not file_path.is_file():
                logger.error(f"Not a file: {file_path}")
                return False

            file_path.unlink()
            logger.debug(f"Deleted file: {file_path}")
            return True
        except Exception as e:
            logger.error(f"Error deleting file {file_path}: {e}")
            return False

    #### SMELLY ####
    #### becoming god class ####
    @staticmethod
    def mirror_directory_with_symlinks(source_dir: Union[str, Path],
                                       target_dir: Union[str, Path],
                                       recursive: bool = True,
                                       overwrite: bool = True) -> int:
        """
        Mirror a directory structure from source to target and create symlinks to all files.
        If the target directory exists and overwrite is True, it will be deleted and recreated.

        Source and target must be different directories. If they resolve to
        the same location nothing is changed and 0 is returned, because
        mirroring in place would delete the source files.

        Args:
            source_dir (str or Path): Path to the source directory to mirror
            target_dir (str or Path): Path to the target directory where the mirrored structure will be created
            recursive (bool, optional): Whether to recursively mirror subdirectories. Defaults to True.
            overwrite (bool, optional): Whether to overwrite the target directory if it exists. Defaults to True.

        Returns:
            int: Number of symlinks created
        """
        source_dir = Path(source_dir)
        target_dir = Path(target_dir)

        if not source_dir.is_dir():
            logger.error(f"Source directory not found: {source_dir}")
            return 0

        # Mirroring a directory onto itself would remove the real files.
        if source_dir.resolve() == target_dir.resolve():
            logger.error(f"Source and target directories are the same: {source_dir}")
            return 0

        if target_dir.exists() and overwrite:
            logger.info(f"Removing existing target directory: {target_dir}")
            try:
                shutil.rmtree(target_dir)
            except Exception as e:
                logger.error(f"Error removing target directory {target_dir}: {e}")
                logger.info("Continuing without removing the directory...")

        target_dir.mkdir(parents=True, exist_ok=True)

        symlinks_created = 0

        try:
            items = list(source_dir.iterdir())
            total_items = len(items)
            print(f"Found {total_items} items in {source_dir}")
            sys.stdout.flush()

            for i, item in enumerate(items):
                # Report progress every 100 items
                if i > 0 and i % 100 == 0:
                    print(f"Processed {i}/{total_items} items ({(i/total_items)*100:.1f}%)")
                    sys.stdout.flush()

                if item.is_dir() and recursive:
                    # The top-level call already handled overwriting, so
                    # subdirectories are never removed again.
                    symlinks_created += FileSystemManager.mirror_directory_with_symlinks(
                        item, target_dir / item.name, recursive, False
                    )
                    continue

                # Skip anything that is not a regular file
                if not item.is_file():
                    continue

                target_path = target_dir / item.name

                try:
                    # exists() is False for a broken symlink, so also check
                    # is_symlink() to clear stale links before re-creating.
                    if target_path.exists() or target_path.is_symlink():
                        target_path.unlink()

                    os.symlink(item.resolve(), target_path)
                    symlinks_created += 1
                except Exception as e:
                    logger.error(f"Error creating symlink from {item} to {target_path}: {e}")

            print(f"Completed processing all {total_items} items in {source_dir}")
            sys.stdout.flush()
        except Exception as e:
            logger.error(f"Error processing directory {source_dir}: {e}")
            print(f"Error processing directory {source_dir}: {e}")
            sys.stdout.flush()

        return symlinks_created