Coverage for openhcs/processing/backends/processors/numpy_processor.py: 14.2%
210 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2NumPy Image Processor Implementation
4This module implements the ImageProcessorInterface using NumPy as the backend.
5It serves as the reference implementation and works on CPU.
7Doctrinal Clauses:
8- Clause 3 — Declarative Primacy: All functions are pure and stateless
9- Clause 88 — No Inferred Capabilities: Explicit NumPy dependency
10- Clause 106-A — Declared Memory Types: All methods specify NumPy arrays
11"""
12from __future__ import annotations
14import logging
15from typing import TYPE_CHECKING, Any, List, Optional, Tuple
17import numpy as np
18from skimage import exposure, filters
19from skimage import morphology as morph
20from skimage import transform as trans
22# Use direct import from core memory decorators to avoid circular imports
23from openhcs.core.memory.decorators import numpy as numpy_func
25logger = logging.getLogger(__name__)
28@numpy_func
29def create_linear_weight_mask(height: int, width: int, margin_ratio: float = 0.1) -> np.ndarray:
30 """
31 Create a 2D weight mask that linearly ramps from 0 at the edges to 1 in the center.
33 Args:
34 height: Height of the mask
35 width: Width of the mask
36 margin_ratio: Ratio of the margin to the image size
38 Returns:
39 2D weight mask of shape (height, width)
40 """
41 margin_y = int(np.floor(height * margin_ratio))
42 margin_x = int(np.floor(width * margin_ratio))
44 weight_y = np.ones(height, dtype=np.float32)
45 if margin_y > 0:
46 ramp_top = np.linspace(0, 1, margin_y, endpoint=False)
47 ramp_bottom = np.linspace(1, 0, margin_y, endpoint=False)
48 weight_y[:margin_y] = ramp_top
49 weight_y[-margin_y:] = ramp_bottom
51 weight_x = np.ones(width, dtype=np.float32)
52 if margin_x > 0:
53 ramp_left = np.linspace(0, 1, margin_x, endpoint=False)
54 ramp_right = np.linspace(1, 0, margin_x, endpoint=False)
55 weight_x[:margin_x] = ramp_left
56 weight_x[-margin_x:] = ramp_right
58 # Create 2D weight mask
59 weight_mask = np.outer(weight_y, weight_x)
61 return weight_mask
64def _validate_3d_array(array: Any, name: str = "input") -> None:
65 """
66 Validate that the input is a 3D NumPy array.
68 Args:
69 array: Array to validate
70 name: Name of the array for error messages
72 Raises:
73 TypeError: If the array is not a NumPy array
74 ValueError: If the array is not 3D
75 """
76 if not isinstance(array, np.ndarray):
77 raise TypeError(f"{name} must be a NumPy array, got {type(array)}")
79 if array.ndim != 3:
80 raise ValueError(f"{name} must be a 3D array, got {array.ndim}D")
82@numpy_func
83def sharpen(image: np.ndarray, radius: float = 1.0, amount: float = 1.0) -> np.ndarray:
84 """
85 Sharpen a 3D image using unsharp masking.
87 This applies sharpening to each Z-slice independently.
89 Args:
90 image: 3D NumPy array of shape (Z, Y, X)
91 radius: Radius of Gaussian blur
92 amount: Sharpening strength
94 Returns:
95 Sharpened 3D NumPy array of shape (Z, Y, X)
96 """
97 _validate_3d_array(image)
99 # Store original dtype
100 dtype = image.dtype
102 # Process each Z-slice independently
103 result = np.zeros_like(image, dtype=np.float32)
105 for z in range(image.shape[0]):
106 # Convert to float for processing
107 slice_float = image[z].astype(np.float32) / np.max(image[z])
109 # Create blurred version for unsharp mask
110 blurred = filters.gaussian(slice_float, sigma=radius)
112 # Apply unsharp mask: original + amount * (original - blurred)
113 sharpened = slice_float + amount * (slice_float - blurred)
115 # Clip to valid range
116 sharpened = np.clip(sharpened, 0, 1.0)
118 # Scale back to original range
119 sharpened = exposure.rescale_intensity(sharpened, in_range='image', out_range=(0, 65535))
120 result[z] = sharpened
122 # Convert back to original dtype
123 return result.astype(dtype)
125@numpy_func
126def percentile_normalize(image: np.ndarray,
127 low_percentile: float = 1.0,
128 high_percentile: float = 99.0,
129 target_min: float = 0.0,
130 target_max: float = 65535.0) -> np.ndarray:
131 """
132 Normalize a 3D image using percentile-based contrast stretching.
134 This applies normalization to each Z-slice independently.
136 Args:
137 image: 3D NumPy array of shape (Z, Y, X)
138 low_percentile: Lower percentile (0-100)
139 high_percentile: Upper percentile (0-100)
140 target_min: Target minimum value
141 target_max: Target maximum value
143 Returns:
144 Normalized 3D NumPy array of shape (Z, Y, X)
145 """
146 _validate_3d_array(image)
148 # Import shared utilities
149 from .percentile_utils import resolve_target_range, slice_percentile_normalize_core
151 # Auto-detect target range based on input dtype if not specified
152 target_min, target_max = resolve_target_range(image.dtype, target_min, target_max)
154 # Use shared core logic with NumPy-specific functions
155 return slice_percentile_normalize_core(
156 image=image,
157 low_percentile=low_percentile,
158 high_percentile=high_percentile,
159 target_min=target_min,
160 target_max=target_max,
161 percentile_func=np.percentile,
162 clip_func=np.clip,
163 ones_like_func=np.ones_like,
164 zeros_like_func=lambda arr, dtype=None: np.zeros_like(arr, dtype=dtype or np.float32),
165 preserve_dtype=preserve_dtype
166 )
168@numpy_func
169def stack_percentile_normalize(stack: np.ndarray,
170 low_percentile: float = 1.0,
171 high_percentile: float = 99.0,
172 target_min: float = 0.0,
173 target_max: float = 65535.0) -> np.ndarray:
174 """
175 Normalize a stack using global percentile-based contrast stretching.
177 This ensures consistent normalization across all Z-slices by computing
178 global percentiles across the entire stack.
180 Args:
181 stack: 3D NumPy array of shape (Z, Y, X)
182 low_percentile: Lower percentile (0-100)
183 high_percentile: Upper percentile (0-100)
184 target_min: Target minimum value
185 target_max: Target maximum value
187 Returns:
188 Normalized 3D NumPy array of shape (Z, Y, X)
189 """
190 _validate_3d_array(stack)
192 # Import shared utilities
193 from .percentile_utils import resolve_target_range, percentile_normalize_core
195 # Auto-detect target range based on input dtype if not specified
196 target_min, target_max = resolve_target_range(stack.dtype, target_min, target_max)
198 # Use shared core logic with NumPy-specific functions
199 return percentile_normalize_core(
200 stack=stack,
201 low_percentile=low_percentile,
202 high_percentile=high_percentile,
203 target_min=target_min,
204 target_max=target_max,
205 percentile_func=lambda arr, pct: np.percentile(arr, pct, axis=None),
206 clip_func=np.clip,
207 ones_like_func=np.ones_like,
208 preserve_dtype=preserve_dtype
209 )
211@numpy_func
212def create_composite(
213 stack: np.ndarray, weights: Optional[List[float]] = None
214) -> np.ndarray:
215 """
216 Create a composite image from a 3D stack where each slice is a channel.
218 Args:
219 stack: 3D NumPy array of shape (N, Y, X) where N is number of channel slices
220 weights: List of weights for each slice. If None, equal weights are used.
222 Returns:
223 Composite 3D NumPy array of shape (1, Y, X)
224 """
225 # Validate input is 3D array
226 _validate_3d_array(stack)
228 n_slices, height, width = stack.shape
230 # Default weights if none provided
231 if weights is None:
232 # Equal weights forangeit so that it can only all slices
233 weights = [1.0 / n_slices] * n_slices
234 elif isinstance(weights, (list, tuple)):
235 # Convert tuple to list if needed
236 weights = list(weights)
237 if len(weights) != n_slices:
238 raise ValueError(f"Number of weights ({len(weights)}) must match number of slices ({n_slices})")
239 else:
240 raise TypeError(f"weights must be a list of values or None, got {type(weights)}: {weights}")
242 # Normalize weights to sum to 1
243 weight_sum = sum(weights)
244 if weight_sum == 0:
245 raise ValueError("Sum of weights cannot be zero")
246 normalized_weights = [w / weight_sum for w in weights]
248 # Convert weights to NumPy array for efficient computation
249 # CRITICAL: Use float32 for weights to preserve fractional values, not stack.dtype
250 weights_array = np.array(normalized_weights, dtype=np.float32)
252 # Reshape weights for broadcasting: (N, 1, 1) to multiply with (N, Y, X)
253 weights_array = weights_array.reshape(n_slices, 1, 1)
255 # Create composite by weighted sum along the first axis
256 # Convert stack to float32 for computation to avoid precision loss
257 stack_float = stack.astype(np.float32)
258 weighted_stack = stack_float * weights_array
259 composite_slice = np.sum(weighted_stack, axis=0, keepdims=True) # Keep as (1, Y, X)
261 # Convert back to original dtype
262 composite_slice = composite_slice.astype(stack.dtype)
264 return composite_slice
266@numpy_func
267def apply_mask(image: np.ndarray, mask: np.ndarray) -> np.ndarray:
268 """
269 Apply a mask to a 3D image.
271 This applies the mask to each Z-slice independently if mask is 2D,
272 or applies the 3D mask directly if mask is 3D.
274 Args:
275 image: 3D NumPy array of shape (Z, Y, X)
276 mask: 3D NumPy array of shape (Z, Y, X) or 2D NumPy array of shape (Y, X)
278 Returns:
279 Masked 3D NumPy array of shape (Z, Y, X)
280 """
281 _validate_3d_array(image)
283 # Handle 2D mask (apply to each Z-slice)
284 if isinstance(mask, np.ndarray) and mask.ndim == 2:
285 if mask.shape != image.shape[1:]:
286 raise ValueError(
287 f"2D mask shape {mask.shape} doesn't match image slice shape {image.shape[1:]}"
288 )
290 # Apply 2D mask to each Z-slice
291 result = np.zeros_like(image)
292 for z in range(image.shape[0]):
293 result[z] = image[z].astype(np.float32) * mask.astype(np.float32)
295 return result.astype(image.dtype)
297 # Handle 3D mask
298 if isinstance(mask, np.ndarray) and mask.ndim == 3:
299 if mask.shape != image.shape:
300 raise ValueError(
301 f"3D mask shape {mask.shape} doesn't match image shape {image.shape}"
302 )
304 # Apply 3D mask directly
305 masked = image.astype(np.float32) * mask.astype(np.float32)
306 return masked.astype(image.dtype)
308 # If we get here, the mask is neither 2D nor 3D NumPy array
309 raise TypeError(f"mask must be a 2D or 3D NumPy array, got {type(mask)}")
311@numpy_func
312def create_weight_mask(shape: Tuple[int, int], margin_ratio: float = 0.1) -> np.ndarray:
313 """
314 Create a weight mask for blending images.
316 Args:
317 shape: Shape of the mask (height, width)
318 margin_ratio: Ratio of image size to use as margin
320 Returns:
321 2D NumPy weight mask of shape (Y, X)
322 """
323 if not isinstance(shape, tuple) or len(shape) != 2:
324 raise TypeError("shape must be a tuple of (height, width)")
326 height, width = shape
327 return create_linear_weight_mask(height, width, margin_ratio)
329@numpy_func
330def max_projection(stack: np.ndarray) -> np.ndarray:
331 """
332 Create a maximum intensity projection from a Z-stack.
334 Args:
335 stack: 3D NumPy array of shape (Z, Y, X)
337 Returns:
338 3D NumPy array of shape (1, Y, X)
339 """
340 _validate_3d_array(stack)
342 # Create max projection
343 projection_2d = np.max(stack, axis=0)
344 return projection_2d.reshape(1, projection_2d.shape[0], projection_2d.shape[1])
346@numpy_func
347def mean_projection(stack: np.ndarray) -> np.ndarray:
348 """
349 Create a mean intensity projection from a Z-stack.
351 Args:
352 stack: 3D NumPy array of shape (Z, Y, X)
354 Returns:
355 3D NumPy array of shape (1, Y, X)
356 """
357 _validate_3d_array(stack)
359 # Create mean projection
360 projection_2d = np.mean(stack, axis=0).astype(stack.dtype)
361 return projection_2d.reshape(1, projection_2d.shape[0], projection_2d.shape[1])
363@numpy_func
364def spatial_bin_2d(
365 stack: np.ndarray,
366 bin_size: int = 2,
367 method: str = "mean"
368) -> np.ndarray:
369 """
370 Apply 2D spatial binning to each slice in the stack.
372 Reduces spatial resolution by combining neighboring pixels in 2D blocks.
373 Each slice is processed independently.
375 Args:
376 stack: 3D NumPy array of shape (Z, Y, X)
377 bin_size: Size of the square binning kernel (e.g., 2 = 2x2 binning)
378 method: Binning method - "mean", "sum", "max", or "min"
380 Returns:
381 Binned 3D NumPy array of shape (Z, Y//bin_size, X//bin_size)
382 """
383 _validate_3d_array(stack)
385 if bin_size <= 0:
386 raise ValueError("bin_size must be positive")
387 if method not in ["mean", "sum", "max", "min"]:
388 raise ValueError("method must be one of: mean, sum, max, min")
390 z_slices, height, width = stack.shape
392 # Calculate output dimensions
393 new_height = height // bin_size
394 new_width = width // bin_size
396 if new_height == 0 or new_width == 0:
397 raise ValueError(f"bin_size {bin_size} is too large for image dimensions {height}x{width}")
399 # Crop to make dimensions divisible by bin_size
400 crop_height = new_height * bin_size
401 crop_width = new_width * bin_size
402 cropped_stack = stack[:, :crop_height, :crop_width]
404 # Reshape for binning: (Z, new_height, bin_size, new_width, bin_size)
405 reshaped = cropped_stack.reshape(z_slices, new_height, bin_size, new_width, bin_size)
407 # Apply binning operation
408 if method == "mean":
409 result = np.mean(reshaped, axis=(2, 4))
410 elif method == "sum":
411 result = np.sum(reshaped, axis=(2, 4))
412 elif method == "max":
413 result = np.max(reshaped, axis=(2, 4))
414 elif method == "min":
415 result = np.min(reshaped, axis=(2, 4))
417 return result.astype(stack.dtype)
419@numpy_func
420def spatial_bin_3d(
421 stack: np.ndarray,
422 bin_size: int = 2,
423 method: str = "mean"
424) -> np.ndarray:
425 """
426 Apply 3D spatial binning to the entire stack.
428 Reduces spatial resolution by combining neighboring voxels in 3D blocks.
430 Args:
431 stack: 3D NumPy array of shape (Z, Y, X)
432 bin_size: Size of the cubic binning kernel (e.g., 2 = 2x2x2 binning)
433 method: Binning method - "mean", "sum", "max", or "min"
435 Returns:
436 Binned 3D NumPy array of shape (Z//bin_size, Y//bin_size, X//bin_size)
437 """
438 _validate_3d_array(stack)
440 if bin_size <= 0:
441 raise ValueError("bin_size must be positive")
442 if method not in ["mean", "sum", "max", "min"]:
443 raise ValueError("method must be one of: mean, sum, max, min")
445 depth, height, width = stack.shape
447 # Calculate output dimensions
448 new_depth = depth // bin_size
449 new_height = height // bin_size
450 new_width = width // bin_size
452 if new_depth == 0 or new_height == 0 or new_width == 0:
453 raise ValueError(f"bin_size {bin_size} is too large for stack dimensions {depth}x{height}x{width}")
455 # Crop to make dimensions divisible by bin_size
456 crop_depth = new_depth * bin_size
457 crop_height = new_height * bin_size
458 crop_width = new_width * bin_size
459 cropped_stack = stack[:crop_depth, :crop_height, :crop_width]
461 # Reshape for 3D binning: (new_depth, bin_size, new_height, bin_size, new_width, bin_size)
462 reshaped = cropped_stack.reshape(new_depth, bin_size, new_height, bin_size, new_width, bin_size)
464 # Apply binning operation across the three bin_size dimensions
465 if method == "mean":
466 result = np.mean(reshaped, axis=(1, 3, 5))
467 elif method == "sum":
468 result = np.sum(reshaped, axis=(1, 3, 5))
469 elif method == "max":
470 result = np.max(reshaped, axis=(1, 3, 5))
471 elif method == "min":
472 result = np.min(reshaped, axis=(1, 3, 5))
474 return result.astype(stack.dtype)
476@numpy_func
477def stack_equalize_histogram(
478 stack: np.ndarray,
479 bins: int = 65536,
480 range_min: float = 0.0,
481 range_max: float = 65535.0
482) -> np.ndarray:
483 """
484 Apply histogram equalization to an entire stack.
486 This ensures consistent contrast enhancement across all Z-slices by
487 computing a global histogram across the entire stack.
489 Args:
490 stack: 3D NumPy array of shape (Z, Y, X)
491 bins: Number of bins for histogram computation
492 range_min: Minimum value for histogram range
493 range_max: Maximum value for histogram range
495 Returns:
496 Equalized 3D NumPy array of shape (Z, Y, X)
497 """
498 _validate_3d_array(stack)
500 # Flatten the entire stack to compute the global histogram
501 flat_stack = stack.flatten()
503 # Calculate the histogram and cumulative distribution function (CDF)
504 hist, bin_edges = np.histogram(flat_stack, bins=bins, range=(range_min, range_max))
505 cdf = hist.cumsum()
507 # Normalize the CDF to the range [0, 65535]
508 # Avoid division by zero
509 if cdf[-1] > 0:
510 cdf = 65535 * cdf / cdf[-1]
512 # Use linear interpolation to map input values to equalized values
513 equalized_stack = np.interp(stack.flatten(), bin_edges[:-1], cdf).reshape(stack.shape)
515 # Convert to uint16
516 return equalized_stack.astype(np.uint16)
518@numpy_func
519def create_projection(stack: np.ndarray, method: str = "max_projection") -> np.ndarray:
520 """
521 Create a projection from a stack using the specified method.
523 Args:
524 stack: 3D NumPy array of shape (Z, Y, X)
525 method: Projection method (max_projection, mean_projection)
527 Returns:
528 3D NumPy array of shape (1, Y, X)
529 """
530 _validate_3d_array(stack)
532 if method == "max_projection":
533 return max_projection(stack)
535 if method == "mean_projection":
536 return mean_projection(stack)
538 # FAIL FAST: No fallback projection methods
539 raise ValueError(f"Unknown projection method: {method}. Valid methods: max_projection, mean_projection")
542@numpy_func
543def crop(
544 input_image: np.ndarray,
545 start_x: int = 0,
546 start_y: int = 0,
547 start_z: int = 0,
548 width: int = 1,
549 height: int = 1,
550 depth: int = 1
551) -> np.ndarray:
552 """
553 Crop a given substack out of a given image stack.
555 Equivalent to pyclesperanto.crop() but using NumPy operations.
557 Parameters
558 ----------
559 input_image: np.ndarray
560 Input 3D image to process of shape (Z, Y, X)
561 start_x: int (= 0)
562 Starting index coordinate x
563 start_y: int (= 0)
564 Starting index coordinate y
565 start_z: int (= 0)
566 Starting index coordinate z
567 width: int (= 1)
568 Width size of the region to crop
569 height: int (= 1)
570 Height size of the region to crop
571 depth: int (= 1)
572 Depth size of the region to crop
574 Returns
575 -------
576 np.ndarray
577 Cropped 3D array of shape (depth, height, width)
578 """
579 _validate_3d_array(input_image)
581 # Validate crop parameters
582 if width <= 0 or height <= 0 or depth <= 0:
583 raise ValueError(f"Crop dimensions must be positive: width={width}, height={height}, depth={depth}")
585 if start_x < 0 or start_y < 0 or start_z < 0:
586 raise ValueError(f"Start coordinates must be non-negative: start_x={start_x}, start_y={start_y}, start_z={start_z}")
588 # Get input dimensions
589 input_depth, input_height, input_width = input_image.shape
591 # Calculate end coordinates
592 end_x = start_x + width
593 end_y = start_y + height
594 end_z = start_z + depth
596 # Validate bounds
597 if end_x > input_width or end_y > input_height or end_z > input_depth:
598 raise ValueError(
599 f"Crop region extends beyond image bounds. "
600 f"Image shape: {input_image.shape}, "
601 f"Crop region: ({start_z}:{end_z}, {start_y}:{end_y}, {start_x}:{end_x})"
602 )
604 # Perform the crop using NumPy slicing
605 cropped = input_image[start_z:end_z, start_y:end_y, start_x:end_x]
607 return cropped
609@numpy_func
610def tophat(
611 image: np.ndarray,
612 selem_radius: int = 50,
613 downsample_factor: int = 4,
614 downsample_anti_aliasing: bool = True,
615 upsample_order: int = 0
616) -> np.ndarray:
617 """
618 Apply white top-hat filter to a 3D image for background removal.
620 This applies the filter to each Z-slice independently.
622 Args:
623 image: 3D NumPy array of shape (Z, Y, X)
624 selem_radius: Radius of the structuring element disk
625 downsample_factor: Factor by which to downsample the image for processing
626 downsample_anti_aliasing: Whether to use anti-aliasing when downsampling
627 upsample_order: Interpolation order for upsampling (0=nearest, 1=linear, etc.)
629 Returns:
630 Filtered 3D NumPy array of shape (Z, Y, X)
631 """
632 _validate_3d_array(image)
634 # Process each Z-slice independently
635 result = np.zeros_like(image)
637 for z in range(image.shape[0]):
638 # Store original data type
639 input_dtype = image[z].dtype
641 # 1) Downsample
642 image_small = trans.resize(
643 image[z],
644 (image[z].shape[0]//downsample_factor, image[z].shape[1]//downsample_factor),
645 anti_aliasing=downsample_anti_aliasing,
646 preserve_range=True
647 )
649 # 2) Build structuring element for the smaller image
650 selem_small = morph.disk(selem_radius // downsample_factor)
652 # 3) White top-hat on the smaller image
653 tophat_small = morph.white_tophat(image_small, selem_small)
655 # 4) Upscale background to original size
656 background_small = image_small - tophat_small
657 background_large = trans.resize(
658 background_small,
659 image[z].shape,
660 order=upsample_order,
661 preserve_range=True
662 )
664 # 5) Subtract background and clip negative values
665 slice_result = np.maximum(image[z] - background_large, 0)
667 # 6) Convert back to original data type
668 result[z] = slice_result.astype(input_dtype)
670 return result