Coverage for openhcs/processing/backends/processors/numpy_processor.py: 25.2%
210 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2NumPy Image Processor Implementation
4This module implements the ImageProcessorInterface using NumPy as the backend.
5It serves as the reference implementation and works on CPU.
7Doctrinal Clauses:
8- Clause 3 — Declarative Primacy: All functions are pure and stateless
9- Clause 88 — No Inferred Capabilities: Explicit NumPy dependency
10- Clause 106-A — Declared Memory Types: All methods specify NumPy arrays
11"""
12from __future__ import annotations
14import logging
15from typing import Any, List, Optional, Tuple
17import numpy as np
18from skimage import exposure, filters
19from skimage import morphology as morph
20from skimage import transform as trans
22# Use direct import from core memory decorators to avoid circular imports
23from openhcs.core.memory.decorators import numpy as numpy_func
25logger = logging.getLogger(__name__)
28@numpy_func
29def create_linear_weight_mask(height: int, width: int, margin_ratio: float = 0.1) -> np.ndarray:
30 """
31 Create a 2D weight mask that linearly ramps from 0 at the edges to 1 in the center.
33 Args:
34 height: Height of the mask
35 width: Width of the mask
36 margin_ratio: Ratio of the margin to the image size
38 Returns:
39 2D weight mask of shape (height, width)
40 """
41 margin_y = int(np.floor(height * margin_ratio))
42 margin_x = int(np.floor(width * margin_ratio))
44 weight_y = np.ones(height, dtype=np.float32)
45 if margin_y > 0:
46 ramp_top = np.linspace(0, 1, margin_y, endpoint=False)
47 ramp_bottom = np.linspace(1, 0, margin_y, endpoint=False)
48 weight_y[:margin_y] = ramp_top
49 weight_y[-margin_y:] = ramp_bottom
51 weight_x = np.ones(width, dtype=np.float32)
52 if margin_x > 0:
53 ramp_left = np.linspace(0, 1, margin_x, endpoint=False)
54 ramp_right = np.linspace(1, 0, margin_x, endpoint=False)
55 weight_x[:margin_x] = ramp_left
56 weight_x[-margin_x:] = ramp_right
58 # Create 2D weight mask
59 weight_mask = np.outer(weight_y, weight_x)
61 return weight_mask
64def _validate_3d_array(array: Any, name: str = "input") -> None:
65 """
66 Validate that the input is a 3D NumPy array.
68 Args:
69 array: Array to validate
70 name: Name of the array for error messages
72 Raises:
73 TypeError: If the array is not a NumPy array
74 ValueError: If the array is not 3D
75 """
76 if not isinstance(array, np.ndarray): 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 raise TypeError(f"{name} must be a NumPy array, got {type(array)}")
79 if array.ndim != 3: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 raise ValueError(f"{name} must be a 3D array, got {array.ndim}D")
82@numpy_func
83def sharpen(image: np.ndarray, radius: float = 1.0, amount: float = 1.0) -> np.ndarray:
84 """
85 Sharpen a 3D image using unsharp masking.
87 This applies sharpening to each Z-slice independently.
89 Args:
90 image: 3D NumPy array of shape (Z, Y, X)
91 radius: Radius of Gaussian blur
92 amount: Sharpening strength
94 Returns:
95 Sharpened 3D NumPy array of shape (Z, Y, X)
96 """
97 _validate_3d_array(image)
99 # Store original dtype
100 dtype = image.dtype
102 # Process each Z-slice independently
103 result = np.zeros_like(image, dtype=np.float32)
105 for z in range(image.shape[0]):
106 # Convert to float for processing
107 slice_float = image[z].astype(np.float32) / np.max(image[z])
109 # Create blurred version for unsharp mask
110 blurred = filters.gaussian(slice_float, sigma=radius)
112 # Apply unsharp mask: original + amount * (original - blurred)
113 sharpened = slice_float + amount * (slice_float - blurred)
115 # Clip to valid range
116 sharpened = np.clip(sharpened, 0, 1.0)
118 # Scale back to original range
119 sharpened = exposure.rescale_intensity(sharpened, in_range='image', out_range=(0, 65535))
120 result[z] = sharpened
122 # Convert back to original dtype
123 return result.astype(dtype)
125@numpy_func
126def percentile_normalize(image: np.ndarray,
127 low_percentile: float = 1.0,
128 high_percentile: float = 99.0,
129 target_min: float = 0.0,
130 target_max: float = 65535.0) -> np.ndarray:
131 """
132 Normalize a 3D image using percentile-based contrast stretching.
134 This applies normalization to each Z-slice independently.
136 Args:
137 image: 3D NumPy array of shape (Z, Y, X)
138 low_percentile: Lower percentile (0-100)
139 high_percentile: Upper percentile (0-100)
140 target_min: Target minimum value
141 target_max: Target maximum value
143 Returns:
144 Normalized 3D NumPy array of shape (Z, Y, X)
145 """
146 _validate_3d_array(image)
148 # Import shared utilities
149 from .percentile_utils import resolve_target_range, slice_percentile_normalize_core
151 # Auto-detect target range based on input dtype if not specified
152 target_min, target_max = resolve_target_range(image.dtype, target_min, target_max)
154 # Use shared core logic with NumPy-specific functions
155 return slice_percentile_normalize_core(
156 image=image,
157 low_percentile=low_percentile,
158 high_percentile=high_percentile,
159 target_min=target_min,
160 target_max=target_max,
161 percentile_func=np.percentile,
162 clip_func=np.clip,
163 ones_like_func=np.ones_like,
164 zeros_like_func=lambda arr, dtype=None: np.zeros_like(arr, dtype=dtype or np.float32)
165 )
167@numpy_func
168def stack_percentile_normalize(stack: np.ndarray,
169 low_percentile: float = 1.0,
170 high_percentile: float = 99.0,
171 target_min: float = 0.0,
172 target_max: float = 65535.0) -> np.ndarray:
173 """
174 Normalize a stack using global percentile-based contrast stretching.
176 This ensures consistent normalization across all Z-slices by computing
177 global percentiles across the entire stack.
179 Args:
180 stack: 3D NumPy array of shape (Z, Y, X)
181 low_percentile: Lower percentile (0-100)
182 high_percentile: Upper percentile (0-100)
183 target_min: Target minimum value
184 target_max: Target maximum value
186 Returns:
187 Normalized 3D NumPy array of shape (Z, Y, X)
188 """
189 _validate_3d_array(stack)
191 # Import shared utilities
192 from .percentile_utils import resolve_target_range, percentile_normalize_core
194 # Auto-detect target range based on input dtype if not specified
195 target_min, target_max = resolve_target_range(stack.dtype, target_min, target_max)
197 # Use shared core logic with NumPy-specific functions
198 return percentile_normalize_core(
199 stack=stack,
200 low_percentile=low_percentile,
201 high_percentile=high_percentile,
202 target_min=target_min,
203 target_max=target_max,
204 percentile_func=lambda arr, pct: np.percentile(arr, pct, axis=None),
205 clip_func=np.clip,
206 ones_like_func=np.ones_like
207 )
209@numpy_func
210def create_composite(
211 stack: np.ndarray, weights: Optional[List[float]] = None
212) -> np.ndarray:
213 """
214 Create a composite image from a 3D stack where each slice is a channel.
216 Args:
217 stack: 3D NumPy array of shape (N, Y, X) where N is number of channel slices
218 weights: List of weights for each slice. If None, equal weights are used.
220 Returns:
221 Composite 3D NumPy array of shape (1, Y, X)
222 """
223 # Validate input is 3D array
224 _validate_3d_array(stack)
226 n_slices, height, width = stack.shape
228 # Default weights if none provided
229 if weights is None: 229 ↛ 232line 229 didn't jump to line 232 because the condition on line 229 was always true
230 # Equal weights forangeit so that it can only all slices
231 weights = [1.0 / n_slices] * n_slices
232 elif isinstance(weights, (list, tuple)):
233 # Convert tuple to list if needed
234 weights = list(weights)
235 if len(weights) != n_slices:
236 raise ValueError(f"Number of weights ({len(weights)}) must match number of slices ({n_slices})")
237 else:
238 raise TypeError(f"weights must be a list of values or None, got {type(weights)}: {weights}")
240 # Normalize weights to sum to 1
241 weight_sum = sum(weights)
242 if weight_sum == 0: 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true
243 raise ValueError("Sum of weights cannot be zero")
244 normalized_weights = [w / weight_sum for w in weights]
246 # Convert weights to NumPy array for efficient computation
247 # CRITICAL: Use float32 for weights to preserve fractional values, not stack.dtype
248 weights_array = np.array(normalized_weights, dtype=np.float32)
250 # Reshape weights for broadcasting: (N, 1, 1) to multiply with (N, Y, X)
251 weights_array = weights_array.reshape(n_slices, 1, 1)
253 # Create composite by weighted sum along the first axis
254 # Convert stack to float32 for computation to avoid precision loss
255 stack_float = stack.astype(np.float32)
256 weighted_stack = stack_float * weights_array
257 composite_slice = np.sum(weighted_stack, axis=0, keepdims=True) # Keep as (1, Y, X)
259 # Convert back to original dtype
260 composite_slice = composite_slice.astype(stack.dtype)
262 return composite_slice
264@numpy_func
265def apply_mask(image: np.ndarray, mask: np.ndarray) -> np.ndarray:
266 """
267 Apply a mask to a 3D image.
269 This applies the mask to each Z-slice independently if mask is 2D,
270 or applies the 3D mask directly if mask is 3D.
272 Args:
273 image: 3D NumPy array of shape (Z, Y, X)
274 mask: 3D NumPy array of shape (Z, Y, X) or 2D NumPy array of shape (Y, X)
276 Returns:
277 Masked 3D NumPy array of shape (Z, Y, X)
278 """
279 _validate_3d_array(image)
281 # Handle 2D mask (apply to each Z-slice)
282 if isinstance(mask, np.ndarray) and mask.ndim == 2:
283 if mask.shape != image.shape[1:]:
284 raise ValueError(
285 f"2D mask shape {mask.shape} doesn't match image slice shape {image.shape[1:]}"
286 )
288 # Apply 2D mask to each Z-slice
289 result = np.zeros_like(image)
290 for z in range(image.shape[0]):
291 result[z] = image[z].astype(np.float32) * mask.astype(np.float32)
293 return result.astype(image.dtype)
295 # Handle 3D mask
296 if isinstance(mask, np.ndarray) and mask.ndim == 3:
297 if mask.shape != image.shape:
298 raise ValueError(
299 f"3D mask shape {mask.shape} doesn't match image shape {image.shape}"
300 )
302 # Apply 3D mask directly
303 masked = image.astype(np.float32) * mask.astype(np.float32)
304 return masked.astype(image.dtype)
306 # If we get here, the mask is neither 2D nor 3D NumPy array
307 raise TypeError(f"mask must be a 2D or 3D NumPy array, got {type(mask)}")
309@numpy_func
310def create_weight_mask(shape: Tuple[int, int], margin_ratio: float = 0.1) -> np.ndarray:
311 """
312 Create a weight mask for blending images.
314 Args:
315 shape: Shape of the mask (height, width)
316 margin_ratio: Ratio of image size to use as margin
318 Returns:
319 2D NumPy weight mask of shape (Y, X)
320 """
321 if not isinstance(shape, tuple) or len(shape) != 2:
322 raise TypeError("shape must be a tuple of (height, width)")
324 height, width = shape
325 return create_linear_weight_mask(height, width, margin_ratio)
327@numpy_func
328def max_projection(stack: np.ndarray) -> np.ndarray:
329 """
330 Create a maximum intensity projection from a Z-stack.
332 Args:
333 stack: 3D NumPy array of shape (Z, Y, X)
335 Returns:
336 3D NumPy array of shape (1, Y, X)
337 """
338 _validate_3d_array(stack)
340 # Create max projection
341 projection_2d = np.max(stack, axis=0)
342 return projection_2d.reshape(1, projection_2d.shape[0], projection_2d.shape[1])
344@numpy_func
345def mean_projection(stack: np.ndarray) -> np.ndarray:
346 """
347 Create a mean intensity projection from a Z-stack.
349 Args:
350 stack: 3D NumPy array of shape (Z, Y, X)
352 Returns:
353 3D NumPy array of shape (1, Y, X)
354 """
355 _validate_3d_array(stack)
357 # Create mean projection
358 projection_2d = np.mean(stack, axis=0).astype(stack.dtype)
359 return projection_2d.reshape(1, projection_2d.shape[0], projection_2d.shape[1])
361@numpy_func
362def spatial_bin_2d(
363 stack: np.ndarray,
364 bin_size: int = 2,
365 method: str = "mean"
366) -> np.ndarray:
367 """
368 Apply 2D spatial binning to each slice in the stack.
370 Reduces spatial resolution by combining neighboring pixels in 2D blocks.
371 Each slice is processed independently.
373 Args:
374 stack: 3D NumPy array of shape (Z, Y, X)
375 bin_size: Size of the square binning kernel (e.g., 2 = 2x2 binning)
376 method: Binning method - "mean", "sum", "max", or "min"
378 Returns:
379 Binned 3D NumPy array of shape (Z, Y//bin_size, X//bin_size)
380 """
381 _validate_3d_array(stack)
383 if bin_size <= 0:
384 raise ValueError("bin_size must be positive")
385 if method not in ["mean", "sum", "max", "min"]:
386 raise ValueError("method must be one of: mean, sum, max, min")
388 z_slices, height, width = stack.shape
390 # Calculate output dimensions
391 new_height = height // bin_size
392 new_width = width // bin_size
394 if new_height == 0 or new_width == 0:
395 raise ValueError(f"bin_size {bin_size} is too large for image dimensions {height}x{width}")
397 # Crop to make dimensions divisible by bin_size
398 crop_height = new_height * bin_size
399 crop_width = new_width * bin_size
400 cropped_stack = stack[:, :crop_height, :crop_width]
402 # Reshape for binning: (Z, new_height, bin_size, new_width, bin_size)
403 reshaped = cropped_stack.reshape(z_slices, new_height, bin_size, new_width, bin_size)
405 # Apply binning operation
406 if method == "mean":
407 result = np.mean(reshaped, axis=(2, 4))
408 elif method == "sum":
409 result = np.sum(reshaped, axis=(2, 4))
410 elif method == "max":
411 result = np.max(reshaped, axis=(2, 4))
412 elif method == "min":
413 result = np.min(reshaped, axis=(2, 4))
415 return result.astype(stack.dtype)
417@numpy_func
418def spatial_bin_3d(
419 stack: np.ndarray,
420 bin_size: int = 2,
421 method: str = "mean"
422) -> np.ndarray:
423 """
424 Apply 3D spatial binning to the entire stack.
426 Reduces spatial resolution by combining neighboring voxels in 3D blocks.
428 Args:
429 stack: 3D NumPy array of shape (Z, Y, X)
430 bin_size: Size of the cubic binning kernel (e.g., 2 = 2x2x2 binning)
431 method: Binning method - "mean", "sum", "max", or "min"
433 Returns:
434 Binned 3D NumPy array of shape (Z//bin_size, Y//bin_size, X//bin_size)
435 """
436 _validate_3d_array(stack)
438 if bin_size <= 0:
439 raise ValueError("bin_size must be positive")
440 if method not in ["mean", "sum", "max", "min"]:
441 raise ValueError("method must be one of: mean, sum, max, min")
443 depth, height, width = stack.shape
445 # Calculate output dimensions
446 new_depth = depth // bin_size
447 new_height = height // bin_size
448 new_width = width // bin_size
450 if new_depth == 0 or new_height == 0 or new_width == 0:
451 raise ValueError(f"bin_size {bin_size} is too large for stack dimensions {depth}x{height}x{width}")
453 # Crop to make dimensions divisible by bin_size
454 crop_depth = new_depth * bin_size
455 crop_height = new_height * bin_size
456 crop_width = new_width * bin_size
457 cropped_stack = stack[:crop_depth, :crop_height, :crop_width]
459 # Reshape for 3D binning: (new_depth, bin_size, new_height, bin_size, new_width, bin_size)
460 reshaped = cropped_stack.reshape(new_depth, bin_size, new_height, bin_size, new_width, bin_size)
462 # Apply binning operation across the three bin_size dimensions
463 if method == "mean":
464 result = np.mean(reshaped, axis=(1, 3, 5))
465 elif method == "sum":
466 result = np.sum(reshaped, axis=(1, 3, 5))
467 elif method == "max":
468 result = np.max(reshaped, axis=(1, 3, 5))
469 elif method == "min":
470 result = np.min(reshaped, axis=(1, 3, 5))
472 return result.astype(stack.dtype)
474@numpy_func
475def stack_equalize_histogram(
476 stack: np.ndarray,
477 bins: int = 65536,
478 range_min: float = 0.0,
479 range_max: float = 65535.0
480) -> np.ndarray:
481 """
482 Apply histogram equalization to an entire stack.
484 This ensures consistent contrast enhancement across all Z-slices by
485 computing a global histogram across the entire stack.
487 Args:
488 stack: 3D NumPy array of shape (Z, Y, X)
489 bins: Number of bins for histogram computation
490 range_min: Minimum value for histogram range
491 range_max: Maximum value for histogram range
493 Returns:
494 Equalized 3D NumPy array of shape (Z, Y, X)
495 """
496 _validate_3d_array(stack)
498 # Flatten the entire stack to compute the global histogram
499 flat_stack = stack.flatten()
501 # Calculate the histogram and cumulative distribution function (CDF)
502 hist, bin_edges = np.histogram(flat_stack, bins=bins, range=(range_min, range_max))
503 cdf = hist.cumsum()
505 # Normalize the CDF to the range [0, 65535]
506 # Avoid division by zero
507 if cdf[-1] > 0:
508 cdf = 65535 * cdf / cdf[-1]
510 # Use linear interpolation to map input values to equalized values
511 equalized_stack = np.interp(stack.flatten(), bin_edges[:-1], cdf).reshape(stack.shape)
513 # Convert to uint16
514 return equalized_stack.astype(np.uint16)
516@numpy_func
517def create_projection(stack: np.ndarray, method: str = "max_projection") -> np.ndarray:
518 """
519 Create a projection from a stack using the specified method.
521 Args:
522 stack: 3D NumPy array of shape (Z, Y, X)
523 method: Projection method (max_projection, mean_projection)
525 Returns:
526 3D NumPy array of shape (1, Y, X)
527 """
528 _validate_3d_array(stack)
530 if method == "max_projection": 530 ↛ 533line 530 didn't jump to line 533 because the condition on line 530 was always true
531 return max_projection(stack)
533 if method == "mean_projection":
534 return mean_projection(stack)
536 # FAIL FAST: No fallback projection methods
537 raise ValueError(f"Unknown projection method: {method}. Valid methods: max_projection, mean_projection")
540@numpy_func
541def crop(
542 input_image: np.ndarray,
543 start_x: int = 0,
544 start_y: int = 0,
545 start_z: int = 0,
546 width: int = 1,
547 height: int = 1,
548 depth: int = 1
549) -> np.ndarray:
550 """
551 Crop a given substack out of a given image stack.
553 Equivalent to pyclesperanto.crop() but using NumPy operations.
555 Parameters
556 ----------
557 input_image: np.ndarray
558 Input 3D image to process of shape (Z, Y, X)
559 start_x: int (= 0)
560 Starting index coordinate x
561 start_y: int (= 0)
562 Starting index coordinate y
563 start_z: int (= 0)
564 Starting index coordinate z
565 width: int (= 1)
566 Width size of the region to crop
567 height: int (= 1)
568 Height size of the region to crop
569 depth: int (= 1)
570 Depth size of the region to crop
572 Returns
573 -------
574 np.ndarray
575 Cropped 3D array of shape (depth, height, width)
576 """
577 _validate_3d_array(input_image)
579 # Validate crop parameters
580 if width <= 0 or height <= 0 or depth <= 0:
581 raise ValueError(f"Crop dimensions must be positive: width={width}, height={height}, depth={depth}")
583 if start_x < 0 or start_y < 0 or start_z < 0:
584 raise ValueError(f"Start coordinates must be non-negative: start_x={start_x}, start_y={start_y}, start_z={start_z}")
586 # Get input dimensions
587 input_depth, input_height, input_width = input_image.shape
589 # Calculate end coordinates
590 end_x = start_x + width
591 end_y = start_y + height
592 end_z = start_z + depth
594 # Validate bounds
595 if end_x > input_width or end_y > input_height or end_z > input_depth:
596 raise ValueError(
597 f"Crop region extends beyond image bounds. "
598 f"Image shape: {input_image.shape}, "
599 f"Crop region: ({start_z}:{end_z}, {start_y}:{end_y}, {start_x}:{end_x})"
600 )
602 # Perform the crop using NumPy slicing
603 cropped = input_image[start_z:end_z, start_y:end_y, start_x:end_x]
605 return cropped
607@numpy_func
608def tophat(
609 image: np.ndarray,
610 selem_radius: int = 50,
611 downsample_factor: int = 4,
612 downsample_anti_aliasing: bool = True,
613 upsample_order: int = 0
614) -> np.ndarray:
615 """
616 Apply white top-hat filter to a 3D image for background removal.
618 This applies the filter to each Z-slice independently.
620 Args:
621 image: 3D NumPy array of shape (Z, Y, X)
622 selem_radius: Radius of the structuring element disk
623 downsample_factor: Factor by which to downsample the image for processing
624 downsample_anti_aliasing: Whether to use anti-aliasing when downsampling
625 upsample_order: Interpolation order for upsampling (0=nearest, 1=linear, etc.)
627 Returns:
628 Filtered 3D NumPy array of shape (Z, Y, X)
629 """
630 _validate_3d_array(image)
632 # Process each Z-slice independently
633 result = np.zeros_like(image)
635 for z in range(image.shape[0]):
636 # Store original data type
637 input_dtype = image[z].dtype
639 # 1) Downsample
640 image_small = trans.resize(
641 image[z],
642 (image[z].shape[0]//downsample_factor, image[z].shape[1]//downsample_factor),
643 anti_aliasing=downsample_anti_aliasing,
644 preserve_range=True
645 )
647 # 2) Build structuring element for the smaller image
648 selem_small = morph.disk(selem_radius // downsample_factor)
650 # 3) White top-hat on the smaller image
651 tophat_small = morph.white_tophat(image_small, selem_small)
653 # 4) Upscale background to original size
654 background_small = image_small - tophat_small
655 background_large = trans.resize(
656 background_small,
657 image[z].shape,
658 order=upsample_order,
659 preserve_range=True
660 )
662 # 5) Subtract background and clip negative values
663 slice_result = np.maximum(image[z] - background_large, 0)
665 # 6) Convert back to original data type
666 result[z] = slice_result.astype(input_dtype)
668 return result