Coverage for openhcs/processing/backends/processors/numpy_processor.py: 25.3%
222 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2NumPy Image Processor Implementation
4This module implements the ImageProcessorInterface using NumPy as the backend.
5It serves as the reference implementation and works on CPU.
7Doctrinal Clauses:
8- Clause 3 — Declarative Primacy: All functions are pure and stateless
9- Clause 88 — No Inferred Capabilities: Explicit NumPy dependency
10- Clause 106-A — Declared Memory Types: All methods specify NumPy arrays
11"""
12from __future__ import annotations
14import logging
15from typing import TYPE_CHECKING, Any, List, Optional, Tuple
17import numpy as np
18from skimage import exposure, filters
19from skimage import morphology as morph
20from skimage import transform as trans
22# Use direct import from core memory decorators to avoid circular imports
23from openhcs.core.memory.decorators import numpy as numpy_func
25logger = logging.getLogger(__name__)
28@numpy_func
29def create_linear_weight_mask(height: int, width: int, margin_ratio: float = 0.1) -> np.ndarray:
30 """
31 Create a 2D weight mask that linearly ramps from 0 at the edges to 1 in the center.
33 Args:
34 height: Height of the mask
35 width: Width of the mask
36 margin_ratio: Ratio of the margin to the image size
38 Returns:
39 2D weight mask of shape (height, width)
40 """
41 margin_y = int(np.floor(height * margin_ratio))
42 margin_x = int(np.floor(width * margin_ratio))
44 weight_y = np.ones(height, dtype=np.float32)
45 if margin_y > 0:
46 ramp_top = np.linspace(0, 1, margin_y, endpoint=False)
47 ramp_bottom = np.linspace(1, 0, margin_y, endpoint=False)
48 weight_y[:margin_y] = ramp_top
49 weight_y[-margin_y:] = ramp_bottom
51 weight_x = np.ones(width, dtype=np.float32)
52 if margin_x > 0:
53 ramp_left = np.linspace(0, 1, margin_x, endpoint=False)
54 ramp_right = np.linspace(1, 0, margin_x, endpoint=False)
55 weight_x[:margin_x] = ramp_left
56 weight_x[-margin_x:] = ramp_right
58 # Create 2D weight mask
59 weight_mask = np.outer(weight_y, weight_x)
61 return weight_mask
64def _validate_3d_array(array: Any, name: str = "input") -> None:
65 """
66 Validate that the input is a 3D NumPy array.
68 Args:
69 array: Array to validate
70 name: Name of the array for error messages
72 Raises:
73 TypeError: If the array is not a NumPy array
74 ValueError: If the array is not 3D
75 """
76 if not isinstance(array, np.ndarray): 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 raise TypeError(f"{name} must be a NumPy array, got {type(array)}")
79 if array.ndim != 3: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 raise ValueError(f"{name} must be a 3D array, got {array.ndim}D")
82@numpy_func
83def sharpen(image: np.ndarray, radius: float = 1.0, amount: float = 1.0) -> np.ndarray:
84 """
85 Sharpen a 3D image using unsharp masking.
87 This applies sharpening to each Z-slice independently.
89 Args:
90 image: 3D NumPy array of shape (Z, Y, X)
91 radius: Radius of Gaussian blur
92 amount: Sharpening strength
94 Returns:
95 Sharpened 3D NumPy array of shape (Z, Y, X)
96 """
97 _validate_3d_array(image)
99 # Store original dtype
100 dtype = image.dtype
102 # Process each Z-slice independently
103 result = np.zeros_like(image, dtype=np.float32)
105 for z in range(image.shape[0]):
106 # Convert to float for processing
107 slice_float = image[z].astype(np.float32) / np.max(image[z])
109 # Create blurred version for unsharp mask
110 blurred = filters.gaussian(slice_float, sigma=radius)
112 # Apply unsharp mask: original + amount * (original - blurred)
113 sharpened = slice_float + amount * (slice_float - blurred)
115 # Clip to valid range
116 sharpened = np.clip(sharpened, 0, 1.0)
118 # Scale back to original range
119 sharpened = exposure.rescale_intensity(sharpened, in_range='image', out_range=(0, 65535))
120 result[z] = sharpened
122 # Convert back to original dtype
123 return result.astype(dtype)
125@numpy_func
126def percentile_normalize(image: np.ndarray,
127 low_percentile: float = 1.0,
128 high_percentile: float = 99.0,
129 target_min: float = 0.0,
130 target_max: float = 65535.0) -> np.ndarray:
131 """
132 Normalize a 3D image using percentile-based contrast stretching.
134 This applies normalization to each Z-slice independently.
136 Args:
137 image: 3D NumPy array of shape (Z, Y, X)
138 low_percentile: Lower percentile (0-100)
139 high_percentile: Upper percentile (0-100)
140 target_min: Target minimum value
141 target_max: Target maximum value
143 Returns:
144 Normalized 3D NumPy array of shape (Z, Y, X)
145 """
146 _validate_3d_array(image)
148 # Process each Z-slice independently
149 result = np.zeros_like(image, dtype=np.float32)
151 for z in range(image.shape[0]):
152 # Get percentile values for this slice
153 p_low, p_high = np.percentile(image[z], (low_percentile, high_percentile))
155 # Avoid division by zero
156 if p_high == p_low:
157 result[z] = np.ones_like(image[z]) * target_min
158 continue
160 # Clip and normalize to target range
161 clipped = np.clip(image[z], p_low, p_high)
162 normalized = (clipped - p_low) * (target_max - target_min) / (p_high - p_low) + target_min
163 result[z] = normalized
165 # Convert to uint16
166 return result.astype(np.uint16)
168@numpy_func
169def stack_percentile_normalize(stack: np.ndarray,
170 low_percentile: float = 1.0,
171 high_percentile: float = 99.0,
172 target_min: float = 0.0,
173 target_max: float = 65535.0) -> np.ndarray:
174 """
175 Normalize a stack using global percentile-based contrast stretching.
177 This ensures consistent normalization across all Z-slices by computing
178 global percentiles across the entire stack.
180 Args:
181 stack: 3D NumPy array of shape (Z, Y, X)
182 low_percentile: Lower percentile (0-100)
183 high_percentile: Upper percentile (0-100)
184 target_min: Target minimum value
185 target_max: Target maximum value
187 Returns:
188 Normalized 3D NumPy array of shape (Z, Y, X)
189 """
190 _validate_3d_array(stack)
192 # Calculate global percentiles across the entire stack
193 p_low = np.percentile(stack, low_percentile, axis=None)
194 p_high = np.percentile(stack, high_percentile, axis=None)
196 # Avoid division by zero
197 if p_high == p_low: 197 ↛ 198line 197 didn't jump to line 198 because the condition on line 197 was never true
198 return np.ones_like(stack) * target_min
200 # Clip and normalize to target range
201 clipped = np.clip(stack, p_low, p_high)
202 normalized = (clipped - p_low) * (target_max - target_min) / (p_high - p_low) + target_min
203 normalized = normalized.astype(np.uint16)
205 return normalized
207@numpy_func
208def create_composite(
209 stack: np.ndarray, weights: Optional[List[float]] = None
210) -> np.ndarray:
211 """
212 Create a composite image from a 3D stack where each slice is a channel.
214 Args:
215 stack: 3D NumPy array of shape (N, Y, X) where N is number of channel slices
216 weights: List of weights for each slice. If None, equal weights are used.
218 Returns:
219 Composite 3D NumPy array of shape (1, Y, X)
220 """
221 # Validate input is 3D array
222 _validate_3d_array(stack)
224 n_slices, height, width = stack.shape
226 # Default weights if none provided
227 if weights is None: 227 ↛ 230line 227 didn't jump to line 230 because the condition on line 227 was always true
228 # Equal weights forangeit so that it can only all slices
229 weights = [1.0 / n_slices] * n_slices
230 elif isinstance(weights, (list, tuple)):
231 # Convert tuple to list if needed
232 weights = list(weights)
233 if len(weights) != n_slices:
234 raise ValueError(f"Number of weights ({len(weights)}) must match number of slices ({n_slices})")
235 else:
236 raise TypeError(f"weights must be a list of values or None, got {type(weights)}: {weights}")
238 # Normalize weights to sum to 1
239 weight_sum = sum(weights)
240 if weight_sum == 0: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 raise ValueError("Sum of weights cannot be zero")
242 normalized_weights = [w / weight_sum for w in weights]
244 # Convert weights to NumPy array for efficient computation
245 # CRITICAL: Use float32 for weights to preserve fractional values, not stack.dtype
246 weights_array = np.array(normalized_weights, dtype=np.float32)
248 # Reshape weights for broadcasting: (N, 1, 1) to multiply with (N, Y, X)
249 weights_array = weights_array.reshape(n_slices, 1, 1)
251 # Create composite by weighted sum along the first axis
252 # Convert stack to float32 for computation to avoid precision loss
253 stack_float = stack.astype(np.float32)
254 weighted_stack = stack_float * weights_array
255 composite_slice = np.sum(weighted_stack, axis=0, keepdims=True) # Keep as (1, Y, X)
257 # Convert back to original dtype
258 composite_slice = composite_slice.astype(stack.dtype)
260 return composite_slice
262@numpy_func
263def apply_mask(image: np.ndarray, mask: np.ndarray) -> np.ndarray:
264 """
265 Apply a mask to a 3D image.
267 This applies the mask to each Z-slice independently if mask is 2D,
268 or applies the 3D mask directly if mask is 3D.
270 Args:
271 image: 3D NumPy array of shape (Z, Y, X)
272 mask: 3D NumPy array of shape (Z, Y, X) or 2D NumPy array of shape (Y, X)
274 Returns:
275 Masked 3D NumPy array of shape (Z, Y, X)
276 """
277 _validate_3d_array(image)
279 # Handle 2D mask (apply to each Z-slice)
280 if isinstance(mask, np.ndarray) and mask.ndim == 2:
281 if mask.shape != image.shape[1:]:
282 raise ValueError(
283 f"2D mask shape {mask.shape} doesn't match image slice shape {image.shape[1:]}"
284 )
286 # Apply 2D mask to each Z-slice
287 result = np.zeros_like(image)
288 for z in range(image.shape[0]):
289 result[z] = image[z].astype(np.float32) * mask.astype(np.float32)
291 return result.astype(image.dtype)
293 # Handle 3D mask
294 if isinstance(mask, np.ndarray) and mask.ndim == 3:
295 if mask.shape != image.shape:
296 raise ValueError(
297 f"3D mask shape {mask.shape} doesn't match image shape {image.shape}"
298 )
300 # Apply 3D mask directly
301 masked = image.astype(np.float32) * mask.astype(np.float32)
302 return masked.astype(image.dtype)
304 # If we get here, the mask is neither 2D nor 3D NumPy array
305 raise TypeError(f"mask must be a 2D or 3D NumPy array, got {type(mask)}")
307@numpy_func
308def create_weight_mask(shape: Tuple[int, int], margin_ratio: float = 0.1) -> np.ndarray:
309 """
310 Create a weight mask for blending images.
312 Args:
313 shape: Shape of the mask (height, width)
314 margin_ratio: Ratio of image size to use as margin
316 Returns:
317 2D NumPy weight mask of shape (Y, X)
318 """
319 if not isinstance(shape, tuple) or len(shape) != 2:
320 raise TypeError("shape must be a tuple of (height, width)")
322 height, width = shape
323 return create_linear_weight_mask(height, width, margin_ratio)
325@numpy_func
326def max_projection(stack: np.ndarray) -> np.ndarray:
327 """
328 Create a maximum intensity projection from a Z-stack.
330 Args:
331 stack: 3D NumPy array of shape (Z, Y, X)
333 Returns:
334 3D NumPy array of shape (1, Y, X)
335 """
336 _validate_3d_array(stack)
338 # Create max projection
339 projection_2d = np.max(stack, axis=0)
340 return projection_2d.reshape(1, projection_2d.shape[0], projection_2d.shape[1])
342@numpy_func
343def mean_projection(stack: np.ndarray) -> np.ndarray:
344 """
345 Create a mean intensity projection from a Z-stack.
347 Args:
348 stack: 3D NumPy array of shape (Z, Y, X)
350 Returns:
351 3D NumPy array of shape (1, Y, X)
352 """
353 _validate_3d_array(stack)
355 # Create mean projection
356 projection_2d = np.mean(stack, axis=0).astype(stack.dtype)
357 return projection_2d.reshape(1, projection_2d.shape[0], projection_2d.shape[1])
359@numpy_func
360def spatial_bin_2d(
361 stack: np.ndarray,
362 bin_size: int = 2,
363 method: str = "mean"
364) -> np.ndarray:
365 """
366 Apply 2D spatial binning to each slice in the stack.
368 Reduces spatial resolution by combining neighboring pixels in 2D blocks.
369 Each slice is processed independently.
371 Args:
372 stack: 3D NumPy array of shape (Z, Y, X)
373 bin_size: Size of the square binning kernel (e.g., 2 = 2x2 binning)
374 method: Binning method - "mean", "sum", "max", or "min"
376 Returns:
377 Binned 3D NumPy array of shape (Z, Y//bin_size, X//bin_size)
378 """
379 _validate_3d_array(stack)
381 if bin_size <= 0:
382 raise ValueError("bin_size must be positive")
383 if method not in ["mean", "sum", "max", "min"]:
384 raise ValueError("method must be one of: mean, sum, max, min")
386 z_slices, height, width = stack.shape
388 # Calculate output dimensions
389 new_height = height // bin_size
390 new_width = width // bin_size
392 if new_height == 0 or new_width == 0:
393 raise ValueError(f"bin_size {bin_size} is too large for image dimensions {height}x{width}")
395 # Crop to make dimensions divisible by bin_size
396 crop_height = new_height * bin_size
397 crop_width = new_width * bin_size
398 cropped_stack = stack[:, :crop_height, :crop_width]
400 # Reshape for binning: (Z, new_height, bin_size, new_width, bin_size)
401 reshaped = cropped_stack.reshape(z_slices, new_height, bin_size, new_width, bin_size)
403 # Apply binning operation
404 if method == "mean":
405 result = np.mean(reshaped, axis=(2, 4))
406 elif method == "sum":
407 result = np.sum(reshaped, axis=(2, 4))
408 elif method == "max":
409 result = np.max(reshaped, axis=(2, 4))
410 elif method == "min":
411 result = np.min(reshaped, axis=(2, 4))
413 return result.astype(stack.dtype)
415@numpy_func
416def spatial_bin_3d(
417 stack: np.ndarray,
418 bin_size: int = 2,
419 method: str = "mean"
420) -> np.ndarray:
421 """
422 Apply 3D spatial binning to the entire stack.
424 Reduces spatial resolution by combining neighboring voxels in 3D blocks.
426 Args:
427 stack: 3D NumPy array of shape (Z, Y, X)
428 bin_size: Size of the cubic binning kernel (e.g., 2 = 2x2x2 binning)
429 method: Binning method - "mean", "sum", "max", or "min"
431 Returns:
432 Binned 3D NumPy array of shape (Z//bin_size, Y//bin_size, X//bin_size)
433 """
434 _validate_3d_array(stack)
436 if bin_size <= 0:
437 raise ValueError("bin_size must be positive")
438 if method not in ["mean", "sum", "max", "min"]:
439 raise ValueError("method must be one of: mean, sum, max, min")
441 depth, height, width = stack.shape
443 # Calculate output dimensions
444 new_depth = depth // bin_size
445 new_height = height // bin_size
446 new_width = width // bin_size
448 if new_depth == 0 or new_height == 0 or new_width == 0:
449 raise ValueError(f"bin_size {bin_size} is too large for stack dimensions {depth}x{height}x{width}")
451 # Crop to make dimensions divisible by bin_size
452 crop_depth = new_depth * bin_size
453 crop_height = new_height * bin_size
454 crop_width = new_width * bin_size
455 cropped_stack = stack[:crop_depth, :crop_height, :crop_width]
457 # Reshape for 3D binning: (new_depth, bin_size, new_height, bin_size, new_width, bin_size)
458 reshaped = cropped_stack.reshape(new_depth, bin_size, new_height, bin_size, new_width, bin_size)
460 # Apply binning operation across the three bin_size dimensions
461 if method == "mean":
462 result = np.mean(reshaped, axis=(1, 3, 5))
463 elif method == "sum":
464 result = np.sum(reshaped, axis=(1, 3, 5))
465 elif method == "max":
466 result = np.max(reshaped, axis=(1, 3, 5))
467 elif method == "min":
468 result = np.min(reshaped, axis=(1, 3, 5))
470 return result.astype(stack.dtype)
472@numpy_func
473def stack_equalize_histogram(
474 stack: np.ndarray,
475 bins: int = 65536,
476 range_min: float = 0.0,
477 range_max: float = 65535.0
478) -> np.ndarray:
479 """
480 Apply histogram equalization to an entire stack.
482 This ensures consistent contrast enhancement across all Z-slices by
483 computing a global histogram across the entire stack.
485 Args:
486 stack: 3D NumPy array of shape (Z, Y, X)
487 bins: Number of bins for histogram computation
488 range_min: Minimum value for histogram range
489 range_max: Maximum value for histogram range
491 Returns:
492 Equalized 3D NumPy array of shape (Z, Y, X)
493 """
494 _validate_3d_array(stack)
496 # Flatten the entire stack to compute the global histogram
497 flat_stack = stack.flatten()
499 # Calculate the histogram and cumulative distribution function (CDF)
500 hist, bin_edges = np.histogram(flat_stack, bins=bins, range=(range_min, range_max))
501 cdf = hist.cumsum()
503 # Normalize the CDF to the range [0, 65535]
504 # Avoid division by zero
505 if cdf[-1] > 0:
506 cdf = 65535 * cdf / cdf[-1]
508 # Use linear interpolation to map input values to equalized values
509 equalized_stack = np.interp(stack.flatten(), bin_edges[:-1], cdf).reshape(stack.shape)
511 # Convert to uint16
512 return equalized_stack.astype(np.uint16)
514@numpy_func
515def create_projection(stack: np.ndarray, method: str = "max_projection") -> np.ndarray:
516 """
517 Create a projection from a stack using the specified method.
519 Args:
520 stack: 3D NumPy array of shape (Z, Y, X)
521 method: Projection method (max_projection, mean_projection)
523 Returns:
524 3D NumPy array of shape (1, Y, X)
525 """
526 _validate_3d_array(stack)
528 if method == "max_projection": 528 ↛ 531line 528 didn't jump to line 531 because the condition on line 528 was always true
529 return max_projection(stack)
531 if method == "mean_projection":
532 return mean_projection(stack)
534 # FAIL FAST: No fallback projection methods
535 raise ValueError(f"Unknown projection method: {method}. Valid methods: max_projection, mean_projection")
538@numpy_func
539def crop(
540 input_image: np.ndarray,
541 start_x: int = 0,
542 start_y: int = 0,
543 start_z: int = 0,
544 width: int = 1,
545 height: int = 1,
546 depth: int = 1
547) -> np.ndarray:
548 """
549 Crop a given substack out of a given image stack.
551 Equivalent to pyclesperanto.crop() but using NumPy operations.
553 Parameters
554 ----------
555 input_image: np.ndarray
556 Input 3D image to process of shape (Z, Y, X)
557 start_x: int (= 0)
558 Starting index coordinate x
559 start_y: int (= 0)
560 Starting index coordinate y
561 start_z: int (= 0)
562 Starting index coordinate z
563 width: int (= 1)
564 Width size of the region to crop
565 height: int (= 1)
566 Height size of the region to crop
567 depth: int (= 1)
568 Depth size of the region to crop
570 Returns
571 -------
572 np.ndarray
573 Cropped 3D array of shape (depth, height, width)
574 """
575 _validate_3d_array(input_image)
577 # Validate crop parameters
578 if width <= 0 or height <= 0 or depth <= 0:
579 raise ValueError(f"Crop dimensions must be positive: width={width}, height={height}, depth={depth}")
581 if start_x < 0 or start_y < 0 or start_z < 0:
582 raise ValueError(f"Start coordinates must be non-negative: start_x={start_x}, start_y={start_y}, start_z={start_z}")
584 # Get input dimensions
585 input_depth, input_height, input_width = input_image.shape
587 # Calculate end coordinates
588 end_x = start_x + width
589 end_y = start_y + height
590 end_z = start_z + depth
592 # Validate bounds
593 if end_x > input_width or end_y > input_height or end_z > input_depth:
594 raise ValueError(
595 f"Crop region extends beyond image bounds. "
596 f"Image shape: {input_image.shape}, "
597 f"Crop region: ({start_z}:{end_z}, {start_y}:{end_y}, {start_x}:{end_x})"
598 )
600 # Perform the crop using NumPy slicing
601 cropped = input_image[start_z:end_z, start_y:end_y, start_x:end_x]
603 return cropped
605@numpy_func
606def tophat(
607 image: np.ndarray,
608 selem_radius: int = 50,
609 downsample_factor: int = 4,
610 downsample_anti_aliasing: bool = True,
611 upsample_order: int = 0
612) -> np.ndarray:
613 """
614 Apply white top-hat filter to a 3D image for background removal.
616 This applies the filter to each Z-slice independently.
618 Args:
619 image: 3D NumPy array of shape (Z, Y, X)
620 selem_radius: Radius of the structuring element disk
621 downsample_factor: Factor by which to downsample the image for processing
622 downsample_anti_aliasing: Whether to use anti-aliasing when downsampling
623 upsample_order: Interpolation order for upsampling (0=nearest, 1=linear, etc.)
625 Returns:
626 Filtered 3D NumPy array of shape (Z, Y, X)
627 """
628 _validate_3d_array(image)
630 # Process each Z-slice independently
631 result = np.zeros_like(image)
633 for z in range(image.shape[0]):
634 # Store original data type
635 input_dtype = image[z].dtype
637 # 1) Downsample
638 image_small = trans.resize(
639 image[z],
640 (image[z].shape[0]//downsample_factor, image[z].shape[1]//downsample_factor),
641 anti_aliasing=downsample_anti_aliasing,
642 preserve_range=True
643 )
645 # 2) Build structuring element for the smaller image
646 selem_small = morph.disk(selem_radius // downsample_factor)
648 # 3) White top-hat on the smaller image
649 tophat_small = morph.white_tophat(image_small, selem_small)
651 # 4) Upscale background to original size
652 background_small = image_small - tophat_small
653 background_large = trans.resize(
654 background_small,
655 image[z].shape,
656 order=upsample_order,
657 preserve_range=True
658 )
660 # 5) Subtract background and clip negative values
661 slice_result = np.maximum(image[z] - background_large, 0)
663 # 6) Convert back to original data type
664 result[z] = slice_result.astype(input_dtype)
666 return result