Coverage for openhcs/processing/backends/analysis/cell_counting_cupy.py: 14.4%
461 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2CuPy-based cell counting and multi-channel colocalization analysis for OpenHCS.
4This module provides comprehensive cell counting capabilities using CuPy and CuCIM,
5supporting both single-channel and multi-channel analysis with various detection
6methods and colocalization metrics.
7"""
9import numpy as np # Keep for CPU fallbacks and data conversion
10import logging
11from typing import Dict, List, Tuple, Any, Optional, Union, TYPE_CHECKING
12from dataclasses import dataclass
13from enum import Enum
15# Import CuPy using the established optional import pattern
16from openhcs.core.utils import optional_import
18cp = optional_import("cupy")
20# Type checking imports
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 import cupy as cp_typing
24logger = logging.getLogger(__name__)
26# Core scientific computing imports
27import pandas as pd
28import json
30# Optional imports using established pattern
31cupyx_scipy = optional_import("cupyx.scipy")
32cupyx_spatial_distance = optional_import("cupyx.scipy.spatial.distance")
33cucim_feature = optional_import("cucim.skimage.feature")
34cucim_filters = optional_import("cucim.skimage.filters")
35cucim_segmentation = optional_import("cucim.skimage.segmentation")
36cucim_morphology = optional_import("cucim.skimage.morphology")
37cucim_measure = optional_import("cucim.skimage.measure")
38cucim_exposure = optional_import("cucim.skimage.exposure")
42# OpenHCS imports
43from openhcs.core.memory.decorators import cupy as cupy_func
44from openhcs.core.pipeline.function_contracts import special_outputs
45from openhcs.constants.constants import Backend
48class DetectionMethod(Enum):
49 """Cell detection methods available."""
50 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells)
51 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose)
52 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells)
53 WATERSHED = "watershed" # Watershed segmentation (best for touching cells)
54 THRESHOLD = "threshold" # Simple thresholding (fastest, basic)
57class ColocalizationMethod(Enum):
58 """Methods for multi-channel colocalization analysis."""
59 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap
60 DISTANCE_BASED = "distance_based" # Based on centroid distances
61 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation
62 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients
65class ThresholdMethod(Enum):
66 """Automatic thresholding methods for watershed segmentation."""
67 OTSU = "otsu" # Otsu's method (good for bimodal histograms)
68 LI = "li" # Li's method (good for low contrast images)
69 MANUAL = "manual" # Manual threshold value (0.0-1.0)
75@dataclass
76class CellCountResult:
77 """Results for single-channel cell counting."""
78 slice_index: int
79 method: str
80 cell_count: int
81 cell_positions: List[Tuple[float, float]] # (x, y) centroids
82 cell_areas: List[float]
83 cell_intensities: List[float]
84 detection_confidence: List[float]
85 parameters_used: Dict[str, Any]
86 binary_mask: Optional[cp.ndarray] = None # Actual binary mask for segmentation methods
89@dataclass
90class MultiChannelResult:
91 """Results for multi-channel cell counting and colocalization."""
92 slice_index: int
93 chan_1_results: CellCountResult
94 chan_2_results: CellCountResult
95 colocalization_method: str
96 colocalized_count: int
97 colocalization_percentage: float
98 chan_1_only_count: int
99 chan_2_only_count: int
100 colocalization_metrics: Dict[str, float]
101 overlap_positions: List[Tuple[float, float]]
104def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager) -> str:
105 """Materialize cell counting results as analysis-ready CSV and JSON formats."""
107 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}")
109 # Convert CuPy binary_mask to NumPy if present
110 if data:
111 for result in data:
112 if result.binary_mask is not None:
113 result.binary_mask = cp.asnumpy(result.binary_mask)
114 if isinstance(result, MultiChannelResult):
115 if result.chan_1_results.binary_mask is not None:
116 result.chan_1_results.binary_mask = cp.asnumpy(result.chan_1_results.binary_mask)
117 if result.chan_2_results.binary_mask is not None:
118 result.chan_2_results.binary_mask = cp.asnumpy(result.chan_2_results.binary_mask)
120 # Determine if this is single-channel or multi-channel data
121 if not data:
122 logger.warning(f"🔬 CELL_COUNT_MATERIALIZE: No data to materialize")
123 return path
125 is_multi_channel = isinstance(data[0], MultiChannelResult)
126 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}")
128 if is_multi_channel:
129 return _materialize_multi_channel_results(data, path, filemanager)
130 else:
131 return _materialize_single_channel_results(data, path, filemanager)
134def materialize_segmentation_masks(data: List[cp.ndarray], path: str, filemanager) -> str:
135 """Materialize segmentation masks as individual TIFF files."""
137 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}")
139 # Convert CuPy arrays to NumPy
140 if data:
141 data = [cp.asnumpy(mask) for mask in data]
143 if not data:
144 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)")
145 # Create empty summary file to indicate no masks were generated
146 summary_path = path.replace('.pkl', '_segmentation_summary.txt')
147 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n"
148 filemanager.save(summary_content, summary_path, Backend.DISK.value)
149 return summary_path
151 # Generate output file paths based on the input path
152 base_path = path.replace('.pkl', '')
154 # Save each mask as a separate TIFF file
155 for i, mask in enumerate(data):
156 mask_filename = f"{base_path}_slice_{i:03d}.tif"
158 # Convert mask to appropriate dtype for saving (uint16 to match input images)
159 if mask.dtype != np.uint16:
160 # Normalize to uint16 range if needed
161 if mask.max() <= 1.0:
162 mask_uint16 = (mask * 65535).astype(np.uint16)
163 else:
164 mask_uint16 = mask.astype(np.uint16)
165 else:
166 mask_uint16 = mask
168 # Save using filemanager
169 filemanager.save(mask_uint16, mask_filename, Backend.DISK.value)
170 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename}")
172 # Return summary path
173 summary_path = f"{base_path}_segmentation_summary.txt"
174 summary_content = f"Segmentation masks saved: {len(data)} files\n"
175 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n"
176 summary_content += f"Mask dtype: {data[0].dtype}\n"
177 summary_content += f"Mask shape: {data[0].shape}\n"
179 filemanager.save(summary_content, summary_path, Backend.DISK.value)
180 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks")
182 return summary_path
185@cupy_func
186@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks))
187def count_cells_single_channel(
188 image_stack: cp.ndarray,
189 # Detection method and parameters
190 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
191 # Blob detection parameters
192 min_sigma: float = 1.0, # Minimum blob size (pixels)
193 max_sigma: float = 10.0, # Maximum blob size (pixels)
194 num_sigma: int = 10, # Number of sigma values to test
195 threshold: float = 0.1, # Detection threshold (0.0-1.0)
196 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
197 # Watershed parameters
198 watershed_footprint_size: int = 3, # Local maxima footprint size
199 watershed_min_distance: int = 5, # Minimum distance between peaks
200 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods
201 # Preprocessing parameters
202 enable_preprocessing: bool = True,
203 gaussian_sigma: float = 1.0, # Gaussian blur sigma
204 median_disk_size: int = 1, # Median filter disk size
205 # Filtering parameters
206 min_cell_area: int = 10, # Minimum cell area (pixels)
207 max_cell_area: int = 1000, # Maximum cell area (pixels)
208 remove_border_cells: bool = True, # Remove cells touching image border
209 # Output parameters
210 return_segmentation_mask: bool = False
211) -> Tuple[cp.ndarray, List[CellCountResult]]:
212 """
213 Count cells in single-channel image stack using various detection methods.
215 Args:
216 image_stack: 3D CuPy array (Z, Y, X) where each Z slice is processed independently
217 detection_method: Method for cell detection (see DetectionMethod enum)
218 min_sigma: Minimum blob size for blob detection methods
219 max_sigma: Maximum blob size for blob detection methods
220 num_sigma: Number of sigma values to test for blob detection
221 threshold: Detection threshold (method-dependent)
222 overlap: Maximum overlap between detected blobs
223 watershed_footprint_size: Footprint size for local maxima detection
224 watershed_min_distance: Minimum distance between watershed peaks
225 watershed_threshold_method: Thresholding method for watershed
226 enable_preprocessing: Apply Gaussian and median filtering
227 gaussian_sigma: Standard deviation for Gaussian blur
228 median_disk_size: Disk size for median filtering
229 min_cell_area: Minimum area for valid cells
230 max_cell_area: Maximum area for valid cells
231 remove_border_cells: Remove cells touching image borders
232 return_segmentation_mask: Return segmentation masks in output
234 Returns:
235 output_stack: Original image stack unchanged (Z, Y, X)
236 cell_count_results: List of CellCountResult objects for each slice
237 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True
238 """
239 if image_stack.ndim != 3:
240 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
242 results = []
243 segmentation_masks = []
245 # Store parameters for reproducibility (convert enums to values)
246 parameters = {
247 "detection_method": detection_method.value,
248 "min_sigma": min_sigma,
249 "max_sigma": max_sigma,
250 "num_sigma": num_sigma,
251 "threshold": threshold,
252 "overlap": overlap,
253 "watershed_footprint_size": watershed_footprint_size,
254 "watershed_min_distance": watershed_min_distance,
255 "watershed_threshold_method": watershed_threshold_method.value,
256 "gaussian_sigma": gaussian_sigma,
257 "median_disk_size": median_disk_size,
258 "min_cell_area": min_cell_area,
259 "max_cell_area": max_cell_area,
260 "remove_border_cells": remove_border_cells
261 }
263 logging.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")
265 for z_idx in range(image_stack.shape[0]):
266 slice_img = image_stack[z_idx].astype(cp.float64)
268 # Apply preprocessing if enabled
269 if enable_preprocessing:
270 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size)
272 # Detect cells using specified method
273 result = _detect_cells_single_method(
274 slice_img, z_idx, detection_method.value, parameters
275 )
277 results.append(result)
279 # Create segmentation mask if requested
280 if return_segmentation_mask:
281 segmentation_mask = _create_segmentation_visualization(
282 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask
283 )
284 segmentation_masks.append(segmentation_mask)
286 # Always return segmentation masks (empty list if not requested)
287 # This ensures consistent return signature for special outputs system
288 return image_stack, results, segmentation_masks
291@cupy_func
292@special_outputs(("multi_channel_counts", materialize_cell_counts))
293def count_cells_multi_channel(
294 image_stack: cp.ndarray,
295 chan_1: int, # Index of first channel (positional arg)
296 chan_2: int, # Index of second channel (positional arg)
297 # Detection parameters for channel 1 (all single-channel params available)
298 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
299 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels)
300 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels)
301 chan_1_num_sigma: int = 10, # Number of sigma values to test
302 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0)
303 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
304 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size
305 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks
306 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
307 chan_1_enable_preprocessing: bool = True, # Apply preprocessing
308 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma
309 chan_1_median_disk_size: int = 1, # Median filter disk size
310 chan_1_min_area: int = 10, # Minimum cell area (pixels)
311 chan_1_max_area: int = 1000, # Maximum cell area (pixels)
312 chan_1_remove_border_cells: bool = True, # Remove cells touching border
313 # Detection parameters for channel 2 (all single-channel params available)
314 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
315 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels)
316 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels)
317 chan_2_num_sigma: int = 10, # Number of sigma values to test
318 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0)
319 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
320 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size
321 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks
322 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
323 chan_2_enable_preprocessing: bool = True, # Apply preprocessing
324 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma
325 chan_2_median_disk_size: int = 1, # Median filter disk size
326 chan_2_min_area: int = 10, # Minimum cell area (pixels)
327 chan_2_max_area: int = 1000, # Maximum cell area (pixels)
328 chan_2_remove_border_cells: bool = True, # Remove cells touching border
329 # Colocalization parameters
330 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods
331 max_distance: float = 5.0, # Maximum distance for colocalization (pixels)
332 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method
333 intensity_threshold: float = 0.5, # Threshold for intensity-based methods
334 # Output parameters
335 return_colocalization_map: bool = False
336) -> Tuple[cp.ndarray, List[MultiChannelResult]]:
337 """
338 Count cells in multi-channel image stack with colocalization analysis.
340 Each channel can be processed with independent parameters, providing the same
341 flexibility as the single-channel function for each channel separately.
343 Args:
344 image_stack: 3D CuPy array (Z, Y, X) where Z represents different channels
345 chan_1: Index of first channel in the stack (positional)
346 chan_2: Index of second channel in the stack (positional)
348 # Channel 1 detection parameters (same as single-channel function)
349 chan_1_method: Detection method for channel 1 (DetectionMethod enum)
350 chan_1_min_sigma: Minimum blob size for channel 1
351 chan_1_max_sigma: Maximum blob size for channel 1
352 chan_1_num_sigma: Number of sigma values to test for channel 1
353 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0)
354 chan_1_overlap: Maximum overlap between blobs for channel 1
355 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1
356 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1
357 chan_1_watershed_threshold_method: Thresholding method for channel 1
358 chan_1_enable_preprocessing: Apply preprocessing to channel 1
359 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1
360 chan_1_median_disk_size: Median filter size for channel 1
361 chan_1_min_area: Minimum cell area for channel 1
362 chan_1_max_area: Maximum cell area for channel 1
363 chan_1_remove_border_cells: Remove border cells for channel 1
365 # Channel 2 detection parameters (same as single-channel function)
366 chan_2_method: Detection method for channel 2 (DetectionMethod enum)
367 chan_2_min_sigma: Minimum blob size for channel 2
368 chan_2_max_sigma: Maximum blob size for channel 2
369 chan_2_num_sigma: Number of sigma values to test for channel 2
370 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0)
371 chan_2_overlap: Maximum overlap between blobs for channel 2
372 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2
373 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2
374 chan_2_watershed_threshold_method: Thresholding method for channel 2
375 chan_2_enable_preprocessing: Apply preprocessing to channel 2
376 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2
377 chan_2_median_disk_size: Median filter size for channel 2
378 chan_2_min_area: Minimum cell area for channel 2
379 chan_2_max_area: Maximum cell area for channel 2
380 chan_2_remove_border_cells: Remove border cells for channel 2
382 # Colocalization parameters
383 colocalization_method: Method for determining colocalization (ColocalizationMethod enum)
384 max_distance: Maximum distance for distance-based colocalization (pixels)
385 min_overlap_area: Minimum overlap fraction for area-based colocalization
386 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0)
387 return_colocalization_map: Return colocalization visualization
389 Returns:
390 output_stack: Original images or colocalization maps
391 multi_channel_results: List of MultiChannelResult objects
392 """
393 if image_stack.ndim != 3:
394 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
396 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]:
397 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}")
399 if chan_1 == chan_2:
400 raise ValueError("Channel 1 and Channel 2 must be different")
402 # Extract channel images
403 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency
404 chan_2_img = image_stack[chan_2:chan_2+1]
406 # Count cells in each channel separately using the single-channel function
407 # Channel 1 parameters (all explicit)
408 chan_1_params = {
409 "detection_method": chan_1_method,
410 "min_sigma": chan_1_min_sigma,
411 "max_sigma": chan_1_max_sigma,
412 "num_sigma": chan_1_num_sigma,
413 "threshold": chan_1_threshold,
414 "overlap": chan_1_overlap,
415 "watershed_footprint_size": chan_1_watershed_footprint_size,
416 "watershed_min_distance": chan_1_watershed_min_distance,
417 "watershed_threshold_method": chan_1_watershed_threshold_method,
418 "enable_preprocessing": chan_1_enable_preprocessing,
419 "gaussian_sigma": chan_1_gaussian_sigma,
420 "median_disk_size": chan_1_median_disk_size,
421 "min_cell_area": chan_1_min_area,
422 "max_cell_area": chan_1_max_area,
423 "remove_border_cells": chan_1_remove_border_cells,
424 "return_segmentation_mask": False
425 }
427 # Channel 2 parameters (all explicit)
428 chan_2_params = {
429 "detection_method": chan_2_method,
430 "min_sigma": chan_2_min_sigma,
431 "max_sigma": chan_2_max_sigma,
432 "num_sigma": chan_2_num_sigma,
433 "threshold": chan_2_threshold,
434 "overlap": chan_2_overlap,
435 "watershed_footprint_size": chan_2_watershed_footprint_size,
436 "watershed_min_distance": chan_2_watershed_min_distance,
437 "watershed_threshold_method": chan_2_watershed_threshold_method,
438 "enable_preprocessing": chan_2_enable_preprocessing,
439 "gaussian_sigma": chan_2_gaussian_sigma,
440 "median_disk_size": chan_2_median_disk_size,
441 "min_cell_area": chan_2_min_area,
442 "max_cell_area": chan_2_max_area,
443 "remove_border_cells": chan_2_remove_border_cells,
444 "return_segmentation_mask": False
445 }
447 # Process each channel
448 _, chan_1_results = count_cells_single_channel(chan_1_img, **chan_1_params)
449 _, chan_2_results = count_cells_single_channel(chan_2_img, **chan_2_params)
451 # Perform colocalization analysis
452 multi_results = []
453 output_stack = image_stack.copy()
455 # Since we're processing single slices from each channel, we only have one result each
456 chan_1_result = chan_1_results[0]
457 chan_2_result = chan_2_results[0]
459 # Analyze colocalization
460 coloc_result = _analyze_colocalization(
461 chan_1_result, chan_2_result, colocalization_method.value,
462 max_distance, min_overlap_area, intensity_threshold
463 )
465 multi_results.append(coloc_result)
467 # Create colocalization visualization if requested
468 if return_colocalization_map:
469 coloc_map = _create_colocalization_map(
470 image_stack[chan_1], image_stack[chan_2], coloc_result
471 )
472 # Replace one of the channels with the colocalization map
473 output_stack = cp.stack([image_stack[chan_1], image_stack[chan_2], coloc_map])
475 return output_stack, multi_results
478def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager) -> str:
479 """Materialize single-channel cell counting results."""
480 # Generate output file paths based on the input path
481 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
482 base_path = path.replace('.pkl', '')
483 json_path = f"{base_path}.json"
484 csv_path = f"{base_path}_details.csv"
486 # Ensure output directory exists for disk backend
487 from pathlib import Path
488 from openhcs.constants.constants import Backend
489 output_dir = Path(json_path).parent
490 filemanager.ensure_directory(str(output_dir), Backend.DISK.value)
492 summary = {
493 "analysis_type": "single_channel_cell_counting",
494 "total_slices": len(data),
495 "results_per_slice": []
496 }
497 rows = []
499 total_cells = 0
500 for result in data:
501 total_cells += result.cell_count
503 # Add to summary
504 summary["results_per_slice"].append({
505 "slice_index": result.slice_index,
506 "method": result.method,
507 "cell_count": result.cell_count,
508 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0,
509 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0,
510 "parameters": result.parameters_used
511 })
513 # Add individual cell data to CSV
514 for i, (pos, area, intensity, confidence) in enumerate(zip(
515 result.cell_positions, result.cell_areas,
516 result.cell_intensities, result.detection_confidence
517 )):
518 rows.append({
519 'slice_index': result.slice_index,
520 'cell_id': f"slice_{result.slice_index}_cell_{i}",
521 'x_position': pos[0],
522 'y_position': pos[1],
523 'cell_area': area,
524 'cell_intensity': intensity,
525 'detection_confidence': confidence,
526 'detection_method': result.method
527 })
529 summary["total_cells_all_slices"] = total_cells
530 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0
532 # Save JSON summary (overwrite if exists)
533 json_content = json.dumps(summary, indent=2, default=str)
534 # Remove existing file if it exists using filemanager
535 if filemanager.exists(json_path, Backend.DISK.value):
536 filemanager.delete(json_path, Backend.DISK.value)
537 filemanager.save(json_content, json_path, Backend.DISK.value)
539 # Save CSV details (overwrite if exists)
540 if rows:
541 df = pd.DataFrame(rows)
542 csv_content = df.to_csv(index=False)
543 # Remove existing file if it exists using filemanager
544 if filemanager.exists(csv_path, Backend.DISK.value):
545 filemanager.delete(csv_path, Backend.DISK.value)
546 filemanager.save(csv_content, csv_path, Backend.DISK.value)
548 return json_path
551def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager) -> str:
552 """Materialize multi-channel cell counting and colocalization results."""
553 # Generate output file paths based on the input path
554 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
555 base_path = path.replace('.pkl', '')
556 json_path = f"{base_path}.json"
557 csv_path = f"{base_path}_details.csv"
559 # Ensure output directory exists for disk backend
560 from pathlib import Path
561 output_dir = Path(json_path).parent
562 filemanager.ensure_directory(str(output_dir), Backend.DISK.value)
564 summary = {
565 "analysis_type": "multi_channel_cell_counting_colocalization",
566 "total_slices": len(data),
567 "colocalization_summary": {
568 "total_chan_1_cells": 0,
569 "total_chan_2_cells": 0,
570 "total_colocalized": 0,
571 "average_colocalization_percentage": 0
572 },
573 "results_per_slice": []
574 }
576 # CSV for detailed analysis (csv_path already defined above)
577 rows = []
579 total_coloc_pct = 0
580 for result in data:
581 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count
582 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count
583 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count
584 total_coloc_pct += result.colocalization_percentage
586 # Add to summary
587 summary["results_per_slice"].append({
588 "slice_index": result.slice_index,
589 "chan_1_count": result.chan_1_results.cell_count,
590 "chan_2_count": result.chan_2_results.cell_count,
591 "colocalized_count": result.colocalized_count,
592 "colocalization_percentage": result.colocalization_percentage,
593 "chan_1_only": result.chan_1_only_count,
594 "chan_2_only": result.chan_2_only_count,
595 "colocalization_method": result.colocalization_method,
596 "colocalization_metrics": result.colocalization_metrics
597 })
599 # Add colocalization details to CSV
600 for i, pos in enumerate(result.overlap_positions):
601 rows.append({
602 'slice_index': result.slice_index,
603 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}",
604 'x_position': pos[0],
605 'y_position': pos[1],
606 'colocalization_method': result.colocalization_method
607 })
609 summary["colocalization_summary"]["average_colocalization_percentage"] = (
610 total_coloc_pct / len(data) if data else 0
611 )
613 # Save JSON summary (overwrite if exists)
614 json_content = json.dumps(summary, indent=2, default=str)
615 # Remove existing file if it exists using filemanager
616 if filemanager.exists(json_path, Backend.DISK.value):
617 filemanager.delete(json_path, Backend.DISK.value)
618 filemanager.save(json_content, json_path, Backend.DISK.value)
620 # Save CSV details (overwrite if exists)
621 if rows:
622 df = pd.DataFrame(rows)
623 csv_content = df.to_csv(index=False)
624 # Remove existing file if it exists using filemanager
625 if filemanager.exists(csv_path, Backend.DISK.value):
626 filemanager.delete(csv_path, Backend.DISK.value)
627 filemanager.save(csv_content, csv_path, Backend.DISK.value)
629 return json_path
632def _preprocess_image(image: cp.ndarray, gaussian_sigma: float, median_disk_size: int) -> cp.ndarray:
633 """Apply preprocessing to enhance cell detection."""
634 # Gaussian blur to reduce noise
635 if gaussian_sigma > 0:
636 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True)
638 # Median filter to remove salt-and-pepper noise
639 if median_disk_size > 0:
640 image = median(image, disk(median_disk_size))
642 return image
645def _detect_cells_single_method(
646 image: cp.ndarray,
647 slice_idx: int,
648 method: str,
649 params: Dict[str, Any]
650) -> CellCountResult:
651 """Detect cells using specified method."""
653 if method == DetectionMethod.BLOB_LOG.value:
654 return _detect_cells_blob_log(image, slice_idx, params)
655 elif method == DetectionMethod.BLOB_DOG.value:
656 return _detect_cells_blob_dog(image, slice_idx, params)
657 elif method == DetectionMethod.BLOB_DOH.value:
658 return _detect_cells_blob_doh(image, slice_idx, params)
659 elif method == DetectionMethod.WATERSHED.value:
660 return _detect_cells_watershed(image, slice_idx, params)
661 elif method == DetectionMethod.THRESHOLD.value:
662 return _detect_cells_threshold(image, slice_idx, params)
663 else:
664 raise ValueError(f"Unknown detection method: {method}")
667def _detect_cells_blob_log(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
668 """Detect cells using Laplacian of Gaussian blob detection."""
669 blobs = blob_log(
670 image,
671 min_sigma=params["min_sigma"],
672 max_sigma=params["max_sigma"],
673 num_sigma=params["num_sigma"],
674 threshold=params["threshold"],
675 overlap=params["overlap"]
676 )
678 # Extract positions, areas, and intensities
679 positions = []
680 areas = []
681 intensities = []
682 confidences = []
684 for blob in blobs:
685 y, x, sigma = blob
686 positions.append((float(x), float(y)))
688 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2))
689 radius = sigma * cp.sqrt(2)
690 area = cp.pi * radius**2
691 areas.append(float(area))
693 # Sample intensity at blob center
694 intensity = float(image[int(y), int(x)])
695 intensities.append(intensity)
697 # Use sigma as confidence measure (larger blobs = higher confidence)
698 confidence = float(sigma / params["max_sigma"])
699 confidences.append(confidence)
701 # Filter by area constraints
702 filtered_data = _filter_by_area(
703 positions, areas, intensities, confidences,
704 params["min_cell_area"], params["max_cell_area"]
705 )
707 return CellCountResult(
708 slice_index=slice_idx,
709 method="blob_log",
710 cell_count=len(filtered_data[0]),
711 cell_positions=filtered_data[0],
712 cell_areas=filtered_data[1],
713 cell_intensities=filtered_data[2],
714 detection_confidence=filtered_data[3],
715 parameters_used=params
716 )
719def _detect_cells_blob_dog(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
720 """Detect cells using Difference of Gaussian blob detection."""
721 blobs = blob_dog(
722 image,
723 min_sigma=params["min_sigma"],
724 max_sigma=params["max_sigma"],
725 threshold=params["threshold"],
726 overlap=params["overlap"]
727 )
729 # Process similar to blob_log
730 positions = []
731 areas = []
732 intensities = []
733 confidences = []
735 for blob in blobs:
736 y, x, sigma = blob
737 positions.append((float(x), float(y)))
739 radius = sigma * cp.sqrt(2)
740 area = cp.pi * radius**2
741 areas.append(float(area))
743 intensity = float(image[int(y), int(x)])
744 intensities.append(intensity)
746 confidence = float(sigma / params["max_sigma"])
747 confidences.append(confidence)
749 filtered_data = _filter_by_area(
750 positions, areas, intensities, confidences,
751 params["min_cell_area"], params["max_cell_area"]
752 )
754 return CellCountResult(
755 slice_index=slice_idx,
756 method="blob_dog",
757 cell_count=len(filtered_data[0]),
758 cell_positions=filtered_data[0],
759 cell_areas=filtered_data[1],
760 cell_intensities=filtered_data[2],
761 detection_confidence=filtered_data[3],
762 parameters_used=params
763 )
766def _detect_cells_blob_doh(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
767 """Detect cells using Determinant of Hessian blob detection."""
768 blobs = blob_doh(
769 image,
770 min_sigma=params["min_sigma"],
771 max_sigma=params["max_sigma"],
772 num_sigma=params["num_sigma"],
773 threshold=params["threshold"],
774 overlap=params["overlap"]
775 )
777 # Process similar to other blob methods
778 positions = []
779 areas = []
780 intensities = []
781 confidences = []
783 for blob in blobs:
784 y, x, sigma = blob
785 positions.append((float(x), float(y)))
787 radius = sigma * cp.sqrt(2)
788 area = cp.pi * radius**2
789 areas.append(float(area))
791 intensity = float(image[int(y), int(x)])
792 intensities.append(intensity)
794 confidence = float(sigma / params["max_sigma"])
795 confidences.append(confidence)
797 filtered_data = _filter_by_area(
798 positions, areas, intensities, confidences,
799 params["min_cell_area"], params["max_cell_area"]
800 )
802 return CellCountResult(
803 slice_index=slice_idx,
804 method="blob_doh",
805 cell_count=len(filtered_data[0]),
806 cell_positions=filtered_data[0],
807 cell_areas=filtered_data[1],
808 cell_intensities=filtered_data[2],
809 detection_confidence=filtered_data[3],
810 parameters_used=params
811 )
814def _filter_by_area(
815 positions: List[Tuple[float, float]],
816 areas: List[float],
817 intensities: List[float],
818 confidences: List[float],
819 min_area: float,
820 max_area: float
821) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]:
822 """Filter detected cells by area constraints."""
823 filtered_positions = []
824 filtered_areas = []
825 filtered_intensities = []
826 filtered_confidences = []
828 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences):
829 if min_area <= area <= max_area:
830 filtered_positions.append(pos)
831 filtered_areas.append(area)
832 filtered_intensities.append(intensity)
833 filtered_confidences.append(confidence)
835 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences
838def _detect_cells_watershed(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
839 """Detect cells using watershed segmentation."""
840 # Determine threshold
841 if params["watershed_threshold_method"] == "otsu":
842 threshold_val = threshold_otsu(image)
843 elif params["watershed_threshold_method"] == "li":
844 threshold_val = threshold_li(image)
845 else:
846 threshold_val = float(params["watershed_threshold_method"])
848 # Create binary mask
849 binary = image > threshold_val
851 # Remove small objects and border objects
852 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
853 if params["remove_border_cells"]:
854 binary = clear_border(binary)
856 # Find local maxima as seeds
857 distance = ndimage.distance_transform_edt(binary)
858 local_maxima = peak_local_max(
859 distance,
860 min_distance=params["watershed_min_distance"],
861 footprint=cp.ones((params["watershed_footprint_size"], params["watershed_footprint_size"]))
862 )
864 # Convert coordinates to binary mask
865 local_maxima_mask = cp.zeros_like(distance, dtype=bool)
866 if len(local_maxima) > 0:
867 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True
869 # Create markers for watershed
870 # Convert boolean mask to integer labels for connected components
871 markers = label(local_maxima_mask.astype(cp.uint8))
873 # Apply watershed
874 labels = watershed(-distance, markers, mask=binary)
876 # Extract region properties
877 regions = regionprops(labels, intensity_image=image)
879 positions = []
880 areas = []
881 intensities = []
882 confidences = []
883 valid_labels = [] # Track which labels pass the size filter
885 for region in regions:
886 # Filter by area
887 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
888 # Centroid (note: regionprops returns (row, col) = (y, x))
889 y, x = region.centroid
890 positions.append((float(x), float(y)))
892 areas.append(float(region.area))
893 intensities.append(float(region.mean_intensity))
895 # Use area as confidence measure (normalized)
896 confidence = min(1.0, region.area / params["max_cell_area"])
897 confidences.append(confidence)
899 # Track this label as valid
900 valid_labels.append(region.label)
902 # Create filtered binary mask with only cells that passed size filter
903 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels))
905 return CellCountResult(
906 slice_index=slice_idx,
907 method="watershed",
908 cell_count=len(positions),
909 cell_positions=positions,
910 cell_areas=areas,
911 cell_intensities=intensities,
912 detection_confidence=confidences,
913 parameters_used=params,
914 binary_mask=filtered_binary_mask # Only cells that passed all filters
915 )
918def _detect_cells_threshold(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
919 """Detect cells using simple thresholding and connected components."""
920 # Apply threshold
921 binary = image > params["threshold"] * image.max()
923 # Remove small objects and border objects
924 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
925 if params["remove_border_cells"]:
926 binary = clear_border(binary)
928 # Label connected components
929 labels = label(binary)
930 regions = regionprops(labels, intensity_image=image)
932 positions = []
933 areas = []
934 intensities = []
935 confidences = []
936 valid_labels = [] # Track which labels pass the size filter
938 for region in regions:
939 # Filter by area
940 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
941 y, x = region.centroid
942 positions.append((float(x), float(y)))
944 areas.append(float(region.area))
945 intensities.append(float(region.mean_intensity))
947 # Use intensity as confidence measure
948 confidence = float(region.mean_intensity / image.max())
949 confidences.append(confidence)
951 # Track this label as valid
952 valid_labels.append(region.label)
954 # Create filtered binary mask with only cells that passed size filter
955 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels))
957 return CellCountResult(
958 slice_index=slice_idx,
959 method="threshold",
960 cell_count=len(positions),
961 cell_positions=positions,
962 cell_areas=areas,
963 cell_intensities=intensities,
964 detection_confidence=confidences,
965 parameters_used=params,
966 binary_mask=filtered_binary_mask # Only cells that passed all filters
967 )
970def _analyze_colocalization(
971 chan_1_result: CellCountResult,
972 chan_2_result: CellCountResult,
973 method: str,
974 max_distance: float,
975 min_overlap_area: float,
976 intensity_threshold: float
977) -> MultiChannelResult:
978 """Analyze colocalization between two channels."""
980 if method == ColocalizationMethod.DISTANCE_BASED.value:
981 return _colocalization_distance_based(
982 chan_1_result, chan_2_result, max_distance
983 )
984 elif method == ColocalizationMethod.OVERLAP_AREA.value:
985 return _colocalization_overlap_based(
986 chan_1_result, chan_2_result, min_overlap_area
987 )
988 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value:
989 return _colocalization_intensity_based(
990 chan_1_result, chan_2_result, intensity_threshold
991 )
992 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value:
993 return _colocalization_manders(
994 chan_1_result, chan_2_result, intensity_threshold
995 )
996 else:
997 raise ValueError(f"Unknown colocalization method: {method}")
1000def _colocalization_distance_based(
1001 chan_1_result: CellCountResult,
1002 chan_2_result: CellCountResult,
1003 max_distance: float
1004) -> MultiChannelResult:
1005 """Perform distance-based colocalization analysis."""
1007 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1008 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based")
1010 # Convert positions to arrays
1011 pos_1 = cp.array(chan_1_result.cell_positions)
1012 pos_2 = cp.array(chan_2_result.cell_positions)
1014 # Calculate pairwise distances
1015 distances = cdist(pos_1, pos_2)
1017 # Find colocalized pairs
1018 colocalized_pairs = []
1019 used_chan_2 = set()
1021 for i in range(len(pos_1)):
1022 # Find closest cell in channel 2
1023 min_dist_idx = cp.argmin(distances[i])
1024 min_dist = distances[i, min_dist_idx]
1026 # Check if within distance threshold and not already used
1027 if min_dist <= max_distance and min_dist_idx not in used_chan_2:
1028 colocalized_pairs.append((i, min_dist_idx))
1029 used_chan_2.add(min_dist_idx)
1031 # Calculate metrics
1032 colocalized_count = len(colocalized_pairs)
1033 total_cells = len(pos_1) + len(pos_2)
1034 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1036 chan_1_only = len(pos_1) - colocalized_count
1037 chan_2_only = len(pos_2) - colocalized_count
1039 # Extract colocalized positions (average of paired positions)
1040 overlap_positions = []
1041 for i, j in colocalized_pairs:
1042 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist()
1043 overlap_positions.append(tuple(avg_pos))
1045 # Calculate additional metrics
1046 if colocalized_pairs:
1047 avg_distance = cp.mean([distances[i, j] for i, j in colocalized_pairs])
1048 max_distance_found = cp.max([distances[i, j] for i, j in colocalized_pairs])
1049 else:
1050 avg_distance = 0
1051 max_distance_found = 0
1053 metrics = {
1054 "average_colocalization_distance": float(avg_distance),
1055 "max_colocalization_distance": float(max_distance_found),
1056 "distance_threshold_used": max_distance
1057 }
1059 return MultiChannelResult(
1060 slice_index=chan_1_result.slice_index,
1061 chan_1_results=chan_1_result,
1062 chan_2_results=chan_2_result,
1063 colocalization_method="distance_based",
1064 colocalized_count=colocalized_count,
1065 colocalization_percentage=colocalization_percentage,
1066 chan_1_only_count=chan_1_only,
1067 chan_2_only_count=chan_2_only,
1068 colocalization_metrics=metrics,
1069 overlap_positions=overlap_positions
1070 )
1073def _create_empty_coloc_result(
1074 chan_1_result: CellCountResult,
1075 chan_2_result: CellCountResult,
1076 method: str
1077) -> MultiChannelResult:
1078 """Create empty colocalization result when no cells found."""
1079 return MultiChannelResult(
1080 slice_index=chan_1_result.slice_index,
1081 chan_1_results=chan_1_result,
1082 chan_2_results=chan_2_result,
1083 colocalization_method=method,
1084 colocalized_count=0,
1085 colocalization_percentage=0.0,
1086 chan_1_only_count=chan_1_result.cell_count,
1087 chan_2_only_count=chan_2_result.cell_count,
1088 colocalization_metrics={},
1089 overlap_positions=[]
1090 )
1093def _colocalization_overlap_based(
1094 chan_1_result: CellCountResult,
1095 chan_2_result: CellCountResult,
1096 min_overlap_area: float
1097) -> MultiChannelResult:
1098 """Perform area overlap-based colocalization analysis."""
1099 # This is a simplified implementation - in practice, you'd need actual segmentation masks
1100 # For now, we'll use distance as a proxy for overlap
1102 # Use distance-based method with smaller threshold as approximation
1103 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly
1105 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold)
1106 result.colocalization_method = "overlap_area"
1107 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area
1108 result.colocalization_metrics["note"] = "Approximated using distance-based method"
1110 return result
1113def _colocalization_intensity_based(
1114 chan_1_result: CellCountResult,
1115 chan_2_result: CellCountResult,
1116 intensity_threshold: float
1117) -> MultiChannelResult:
1118 """Perform intensity correlation-based colocalization analysis."""
1120 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1121 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation")
1123 # Use distance-based pairing first
1124 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0)
1126 # Filter pairs based on intensity correlation
1127 colocalized_pairs = []
1128 overlap_positions = []
1130 pos_1 = cp.array(chan_1_result.cell_positions)
1131 pos_2 = cp.array(chan_2_result.cell_positions)
1133 for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
1134 for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
1135 # Calculate distance
1136 dist = cp.sqrt((x1 - x2)**2 + (y1 - y2)**2)
1138 if dist <= 5.0: # Within reasonable distance
1139 # Check intensity correlation
1140 int_1 = chan_1_result.cell_intensities[i]
1141 int_2 = chan_2_result.cell_intensities[j]
1143 # Simple intensity correlation: both above threshold
1144 if int_1 >= intensity_threshold and int_2 >= intensity_threshold:
1145 colocalized_pairs.append((i, j))
1146 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2)
1147 overlap_positions.append(avg_pos)
1148 break # One-to-one mapping
1150 colocalized_count = len(colocalized_pairs)
1151 total_cells = len(pos_1) + len(pos_2)
1152 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1154 metrics = {
1155 "intensity_threshold_used": intensity_threshold,
1156 "correlation_method": "threshold_based"
1157 }
1159 return MultiChannelResult(
1160 slice_index=chan_1_result.slice_index,
1161 chan_1_results=chan_1_result,
1162 chan_2_results=chan_2_result,
1163 colocalization_method="intensity_correlation",
1164 colocalized_count=colocalized_count,
1165 colocalization_percentage=colocalization_percentage,
1166 chan_1_only_count=len(pos_1) - colocalized_count,
1167 chan_2_only_count=len(pos_2) - colocalized_count,
1168 colocalization_metrics=metrics,
1169 overlap_positions=overlap_positions
1170 )
1173def _colocalization_manders(
1174 chan_1_result: CellCountResult,
1175 chan_2_result: CellCountResult,
1176 intensity_threshold: float
1177) -> MultiChannelResult:
1178 """Calculate Manders colocalization coefficients."""
1180 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1181 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")
1183 # Simplified Manders calculation based on detected cells
1184 # In practice, this would use pixel-level intensity analysis
1186 # Use intensity-based method as foundation
1187 intensity_result = _colocalization_intensity_based(
1188 chan_1_result, chan_2_result, intensity_threshold
1189 )
1191 # Calculate Manders-like coefficients
1192 total_int_1 = sum(chan_1_result.cell_intensities)
1193 total_int_2 = sum(chan_2_result.cell_intensities)
1195 # Simplified: assume colocalized cells contribute their full intensity
1196 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in
1197 [(i, j) for i in range(len(chan_1_result.cell_positions))
1198 for j in range(len(chan_2_result.cell_positions))
1199 if (i, j) in [(0, 0)]]) # Simplified placeholder
1201 # Manders coefficients (M1 and M2)
1202 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
1203 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified
1205 intensity_result.colocalization_method = "manders_coefficients"
1206 intensity_result.colocalization_metrics.update({
1207 "manders_m1": m1,
1208 "manders_m2": m2,
1209 "note": "Simplified cell-based Manders calculation"
1210 })
1212 return intensity_result
1215def _create_segmentation_visualization(
1216 image: cp.ndarray,
1217 positions: List[Tuple[float, float]],
1218 max_sigma: float,
1219 cell_areas: List[float] = None,
1220 binary_mask: cp.ndarray = None
1221) -> cp.ndarray:
1222 """Create segmentation visualization using actual binary mask if available."""
1224 # If we have the actual binary mask from detection, use it directly
1225 if binary_mask is not None:
1226 # Convert boolean mask to uint16 to match input image dtype
1227 # Use max intensity for detected cells, 0 for background
1228 max_intensity = image.max() if image.max() > 0 else 65535
1229 return (binary_mask * max_intensity).astype(image.dtype)
1231 # Fallback to original circular marker approach for blob methods
1232 visualization = image.copy()
1234 # Mark detected cells with their actual sizes
1235 for i, (x, y) in enumerate(positions):
1236 # Use actual cell area if available, otherwise fall back to max_sigma
1237 if cell_areas and i < len(cell_areas):
1238 # Convert area to radius (assuming circular cells)
1239 radius = cp.sqrt(cell_areas[i] / cp.pi)
1240 else:
1241 # Fallback to max_sigma for backward compatibility
1242 radius = max_sigma * 2
1244 # Create circular markers with actual cell size
1245 rr, cc = cp.ogrid[:image.shape[0], :image.shape[1]]
1246 mask = (rr - y)**2 + (cc - x)**2 <= radius**2
1248 # Ensure indices are within bounds
1249 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1])
1250 mask = mask & valid_mask
1252 visualization[mask] = visualization.max() # Bright markers
1254 return visualization
1257def _create_colocalization_map(
1258 chan_1_img: cp.ndarray,
1259 chan_2_img: cp.ndarray,
1260 coloc_result: MultiChannelResult
1261) -> cp.ndarray:
1262 """Create colocalization visualization map."""
1263 # Create RGB-like visualization
1264 coloc_map = cp.zeros_like(chan_1_img)
1266 # Mark colocalized positions
1267 for x, y in coloc_result.overlap_positions:
1268 # Create markers for colocalized cells
1269 rr, cc = cp.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]
1270 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius
1272 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1])
1273 mask = mask & valid_mask
1275 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers
1277 return coloc_map