Coverage for openhcs/processing/backends/analysis/cell_counting_cupy.py: 14.2%
463 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2CuPy-based cell counting and multi-channel colocalization analysis for OpenHCS.
4This module provides comprehensive cell counting capabilities using CuPy and CuCIM,
5supporting both single-channel and multi-channel analysis with various detection
6methods and colocalization metrics.
7"""
9import numpy as np # Keep for CPU fallbacks and data conversion
10import logging
11from typing import Dict, List, Tuple, Any, Optional, Union, TYPE_CHECKING
12from dataclasses import dataclass
13from enum import Enum
15# Import CuPy using the established optional import pattern
16from openhcs.core.utils import optional_import
18cp = optional_import("cupy")
20# Type checking imports
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 pass
24logger = logging.getLogger(__name__)
26# Core scientific computing imports
27import pandas as pd
28import json
30# Optional imports using established pattern
31cupyx_scipy = optional_import("cupyx.scipy")
32cupyx_spatial_distance = optional_import("cupyx.scipy.spatial.distance")
33cucim_feature = optional_import("cucim.skimage.feature")
34cucim_filters = optional_import("cucim.skimage.filters")
35cucim_segmentation = optional_import("cucim.skimage.segmentation")
36cucim_morphology = optional_import("cucim.skimage.morphology")
37cucim_measure = optional_import("cucim.skimage.measure")
38cucim_exposure = optional_import("cucim.skimage.exposure")
42# OpenHCS imports
43from openhcs.core.memory.decorators import cupy as cupy_func
44from openhcs.core.pipeline.function_contracts import special_outputs
45from openhcs.constants.constants import Backend
48class DetectionMethod(Enum):
49 """Cell detection methods available."""
50 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells)
51 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose)
52 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells)
53 WATERSHED = "watershed" # Watershed segmentation (best for touching cells)
54 THRESHOLD = "threshold" # Simple thresholding (fastest, basic)
57class ColocalizationMethod(Enum):
58 """Methods for multi-channel colocalization analysis."""
59 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap
60 DISTANCE_BASED = "distance_based" # Based on centroid distances
61 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation
62 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients
65class ThresholdMethod(Enum):
66 """Automatic thresholding methods for watershed segmentation."""
67 OTSU = "otsu" # Otsu's method (good for bimodal histograms)
68 LI = "li" # Li's method (good for low contrast images)
69 MANUAL = "manual" # Manual threshold value (0.0-1.0)
75@dataclass
76class CellCountResult:
77 """Results for single-channel cell counting."""
78 slice_index: int
79 method: str
80 cell_count: int
81 cell_positions: List[Tuple[float, float]] # (x, y) centroids
82 cell_areas: List[float]
83 cell_intensities: List[float]
84 detection_confidence: List[float]
85 parameters_used: Dict[str, Any]
86 binary_mask: Optional[cp.ndarray] = None # Actual binary mask for segmentation methods
89@dataclass
90class MultiChannelResult:
91 """Results for multi-channel cell counting and colocalization."""
92 slice_index: int
93 chan_1_results: CellCountResult
94 chan_2_results: CellCountResult
95 colocalization_method: str
96 colocalized_count: int
97 colocalization_percentage: float
98 chan_1_only_count: int
99 chan_2_only_count: int
100 colocalization_metrics: Dict[str, float]
101 overlap_positions: List[Tuple[float, float]]
104def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager, backend: str) -> str:
105 """Materialize cell counting results as analysis-ready CSV and JSON formats."""
107 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}")
109 # Convert CuPy binary_mask to NumPy if present
110 if data:
111 for result in data:
112 if result.binary_mask is not None:
113 result.binary_mask = cp.asnumpy(result.binary_mask)
114 if isinstance(result, MultiChannelResult):
115 if result.chan_1_results.binary_mask is not None:
116 result.chan_1_results.binary_mask = cp.asnumpy(result.chan_1_results.binary_mask)
117 if result.chan_2_results.binary_mask is not None:
118 result.chan_2_results.binary_mask = cp.asnumpy(result.chan_2_results.binary_mask)
120 # Determine if this is single-channel or multi-channel data
121 if not data:
122 logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize")
123 return path
125 is_multi_channel = isinstance(data[0], MultiChannelResult)
126 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}")
128 if is_multi_channel:
129 return _materialize_multi_channel_results(data, path, filemanager, backend)
130 else:
131 return _materialize_single_channel_results(data, path, filemanager, backend)
134def materialize_segmentation_masks(data: List[cp.ndarray], path: str, filemanager, backend: str) -> str:
135 """
136 Materialize segmentation masks as individual TIFF files.
138 Single-backend signature - decorator automatically wraps for multi-backend support.
139 """
141 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, backend={backend}, masks_count={len(data) if data else 0}")
143 # Convert CuPy arrays to NumPy
144 if data:
145 data = [cp.asnumpy(mask) for mask in data]
147 if not data:
148 logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)")
149 # Create empty summary file to indicate no masks were generated
150 summary_path = path.replace('.pkl', '_segmentation_summary.txt')
151 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n"
152 filemanager.save(summary_content, summary_path, backend)
153 return summary_path
155 # Generate output file paths based on the input path
156 base_path = path.replace('.pkl', '')
158 # Save each mask as a separate TIFF file
159 for i, mask in enumerate(data):
160 mask_filename = f"{base_path}_slice_{i:03d}.tif"
162 # Convert mask to appropriate dtype for saving (uint16 to match input images)
163 if mask.dtype != np.uint16:
164 # Normalize to uint16 range if needed
165 if mask.max() <= 1.0:
166 mask_uint16 = (mask * 65535).astype(np.uint16)
167 else:
168 mask_uint16 = mask.astype(np.uint16)
169 else:
170 mask_uint16 = mask
172 # Save using filemanager with provided backend
173 filemanager.save(mask_uint16, mask_filename, backend)
174 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename} (backend={backend})")
176 # Return summary path
177 summary_path = f"{base_path}_segmentation_summary.txt"
178 summary_content = f"Segmentation masks saved: {len(data)} files\n"
179 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n"
180 summary_content += f"Mask dtype: {data[0].dtype}\n"
181 summary_content += f"Mask shape: {data[0].shape}\n"
183 filemanager.save(summary_content, summary_path, backend)
184 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks to backend={backend}")
186 return summary_path
189@cupy_func
190@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks))
191def count_cells_single_channel(
192 image_stack: cp.ndarray,
193 # Detection method and parameters
194 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
195 # Blob detection parameters
196 min_sigma: float = 1.0, # Minimum blob size (pixels)
197 max_sigma: float = 10.0, # Maximum blob size (pixels)
198 num_sigma: int = 10, # Number of sigma values to test
199 threshold: float = 0.1, # Detection threshold (0.0-1.0)
200 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
201 # Watershed parameters
202 watershed_footprint_size: int = 3, # Local maxima footprint size
203 watershed_min_distance: int = 5, # Minimum distance between peaks
204 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods
205 # Preprocessing parameters
206 enable_preprocessing: bool = True,
207 gaussian_sigma: float = 1.0, # Gaussian blur sigma
208 median_disk_size: int = 1, # Median filter disk size
209 # Filtering parameters
210 min_cell_area: int = 10, # Minimum cell area (pixels)
211 max_cell_area: int = 1000, # Maximum cell area (pixels)
212 remove_border_cells: bool = True, # Remove cells touching image border
213 # Output parameters
214 return_segmentation_mask: bool = False
215) -> Tuple[cp.ndarray, List[CellCountResult]]:
216 """
217 Count cells in single-channel image stack using various detection methods.
219 Args:
220 image_stack: 3D CuPy array (Z, Y, X) where each Z slice is processed independently
221 detection_method: Method for cell detection (see DetectionMethod enum)
222 min_sigma: Minimum blob size for blob detection methods
223 max_sigma: Maximum blob size for blob detection methods
224 num_sigma: Number of sigma values to test for blob detection
225 threshold: Detection threshold (method-dependent)
226 overlap: Maximum overlap between detected blobs
227 watershed_footprint_size: Footprint size for local maxima detection
228 watershed_min_distance: Minimum distance between watershed peaks
229 watershed_threshold_method: Thresholding method for watershed
230 enable_preprocessing: Apply Gaussian and median filtering
231 gaussian_sigma: Standard deviation for Gaussian blur
232 median_disk_size: Disk size for median filtering
233 min_cell_area: Minimum area for valid cells
234 max_cell_area: Maximum area for valid cells
235 remove_border_cells: Remove cells touching image borders
236 return_segmentation_mask: Return segmentation masks in output
238 Returns:
239 output_stack: Original image stack unchanged (Z, Y, X)
240 cell_count_results: List of CellCountResult objects for each slice
241 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True
242 """
243 if image_stack.ndim != 3:
244 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
246 results = []
247 segmentation_masks = []
249 # Store parameters for reproducibility (convert enums to values)
250 parameters = {
251 "detection_method": detection_method.value,
252 "min_sigma": min_sigma,
253 "max_sigma": max_sigma,
254 "num_sigma": num_sigma,
255 "threshold": threshold,
256 "overlap": overlap,
257 "watershed_footprint_size": watershed_footprint_size,
258 "watershed_min_distance": watershed_min_distance,
259 "watershed_threshold_method": watershed_threshold_method.value,
260 "gaussian_sigma": gaussian_sigma,
261 "median_disk_size": median_disk_size,
262 "min_cell_area": min_cell_area,
263 "max_cell_area": max_cell_area,
264 "remove_border_cells": remove_border_cells
265 }
267 logging.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")
269 for z_idx in range(image_stack.shape[0]):
270 slice_img = image_stack[z_idx].astype(cp.float64)
272 # Apply preprocessing if enabled
273 if enable_preprocessing:
274 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size)
276 # Detect cells using specified method
277 result = _detect_cells_single_method(
278 slice_img, z_idx, detection_method.value, parameters
279 )
281 results.append(result)
283 # Create segmentation mask if requested
284 if return_segmentation_mask:
285 segmentation_mask = _create_segmentation_visualization(
286 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask
287 )
288 segmentation_masks.append(segmentation_mask)
290 # Always return segmentation masks (empty list if not requested)
291 # This ensures consistent return signature for special outputs system
292 return image_stack, results, segmentation_masks
295@cupy_func
296@special_outputs(("multi_channel_counts", materialize_cell_counts))
297def count_cells_multi_channel(
298 image_stack: cp.ndarray,
299 chan_1: int, # Index of first channel (positional arg)
300 chan_2: int, # Index of second channel (positional arg)
301 # Detection parameters for channel 1 (all single-channel params available)
302 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
303 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels)
304 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels)
305 chan_1_num_sigma: int = 10, # Number of sigma values to test
306 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0)
307 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
308 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size
309 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks
310 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
311 chan_1_enable_preprocessing: bool = True, # Apply preprocessing
312 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma
313 chan_1_median_disk_size: int = 1, # Median filter disk size
314 chan_1_min_area: int = 10, # Minimum cell area (pixels)
315 chan_1_max_area: int = 1000, # Maximum cell area (pixels)
316 chan_1_remove_border_cells: bool = True, # Remove cells touching border
317 # Detection parameters for channel 2 (all single-channel params available)
318 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
319 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels)
320 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels)
321 chan_2_num_sigma: int = 10, # Number of sigma values to test
322 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0)
323 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
324 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size
325 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks
326 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
327 chan_2_enable_preprocessing: bool = True, # Apply preprocessing
328 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma
329 chan_2_median_disk_size: int = 1, # Median filter disk size
330 chan_2_min_area: int = 10, # Minimum cell area (pixels)
331 chan_2_max_area: int = 1000, # Maximum cell area (pixels)
332 chan_2_remove_border_cells: bool = True, # Remove cells touching border
333 # Colocalization parameters
334 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods
335 max_distance: float = 5.0, # Maximum distance for colocalization (pixels)
336 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method
337 intensity_threshold: float = 0.5, # Threshold for intensity-based methods
338 # Output parameters
339 return_colocalization_map: bool = False
340) -> Tuple[cp.ndarray, List[MultiChannelResult]]:
341 """
342 Count cells in multi-channel image stack with colocalization analysis.
344 Each channel can be processed with independent parameters, providing the same
345 flexibility as the single-channel function for each channel separately.
347 Args:
348 image_stack: 3D CuPy array (Z, Y, X) where Z represents different channels
349 chan_1: Index of first channel in the stack (positional)
350 chan_2: Index of second channel in the stack (positional)
352 # Channel 1 detection parameters (same as single-channel function)
353 chan_1_method: Detection method for channel 1 (DetectionMethod enum)
354 chan_1_min_sigma: Minimum blob size for channel 1
355 chan_1_max_sigma: Maximum blob size for channel 1
356 chan_1_num_sigma: Number of sigma values to test for channel 1
357 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0)
358 chan_1_overlap: Maximum overlap between blobs for channel 1
359 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1
360 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1
361 chan_1_watershed_threshold_method: Thresholding method for channel 1
362 chan_1_enable_preprocessing: Apply preprocessing to channel 1
363 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1
364 chan_1_median_disk_size: Median filter size for channel 1
365 chan_1_min_area: Minimum cell area for channel 1
366 chan_1_max_area: Maximum cell area for channel 1
367 chan_1_remove_border_cells: Remove border cells for channel 1
369 # Channel 2 detection parameters (same as single-channel function)
370 chan_2_method: Detection method for channel 2 (DetectionMethod enum)
371 chan_2_min_sigma: Minimum blob size for channel 2
372 chan_2_max_sigma: Maximum blob size for channel 2
373 chan_2_num_sigma: Number of sigma values to test for channel 2
374 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0)
375 chan_2_overlap: Maximum overlap between blobs for channel 2
376 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2
377 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2
378 chan_2_watershed_threshold_method: Thresholding method for channel 2
379 chan_2_enable_preprocessing: Apply preprocessing to channel 2
380 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2
381 chan_2_median_disk_size: Median filter size for channel 2
382 chan_2_min_area: Minimum cell area for channel 2
383 chan_2_max_area: Maximum cell area for channel 2
384 chan_2_remove_border_cells: Remove border cells for channel 2
386 # Colocalization parameters
387 colocalization_method: Method for determining colocalization (ColocalizationMethod enum)
388 max_distance: Maximum distance for distance-based colocalization (pixels)
389 min_overlap_area: Minimum overlap fraction for area-based colocalization
390 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0)
391 return_colocalization_map: Return colocalization visualization
393 Returns:
394 output_stack: Original images or colocalization maps
395 multi_channel_results: List of MultiChannelResult objects
396 """
397 if image_stack.ndim != 3:
398 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
400 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]:
401 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}")
403 if chan_1 == chan_2:
404 raise ValueError("Channel 1 and Channel 2 must be different")
406 # Extract channel images
407 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency
408 chan_2_img = image_stack[chan_2:chan_2+1]
410 # Count cells in each channel separately using the single-channel function
411 # Channel 1 parameters (all explicit)
412 chan_1_params = {
413 "detection_method": chan_1_method,
414 "min_sigma": chan_1_min_sigma,
415 "max_sigma": chan_1_max_sigma,
416 "num_sigma": chan_1_num_sigma,
417 "threshold": chan_1_threshold,
418 "overlap": chan_1_overlap,
419 "watershed_footprint_size": chan_1_watershed_footprint_size,
420 "watershed_min_distance": chan_1_watershed_min_distance,
421 "watershed_threshold_method": chan_1_watershed_threshold_method,
422 "enable_preprocessing": chan_1_enable_preprocessing,
423 "gaussian_sigma": chan_1_gaussian_sigma,
424 "median_disk_size": chan_1_median_disk_size,
425 "min_cell_area": chan_1_min_area,
426 "max_cell_area": chan_1_max_area,
427 "remove_border_cells": chan_1_remove_border_cells,
428 "return_segmentation_mask": False
429 }
431 # Channel 2 parameters (all explicit)
432 chan_2_params = {
433 "detection_method": chan_2_method,
434 "min_sigma": chan_2_min_sigma,
435 "max_sigma": chan_2_max_sigma,
436 "num_sigma": chan_2_num_sigma,
437 "threshold": chan_2_threshold,
438 "overlap": chan_2_overlap,
439 "watershed_footprint_size": chan_2_watershed_footprint_size,
440 "watershed_min_distance": chan_2_watershed_min_distance,
441 "watershed_threshold_method": chan_2_watershed_threshold_method,
442 "enable_preprocessing": chan_2_enable_preprocessing,
443 "gaussian_sigma": chan_2_gaussian_sigma,
444 "median_disk_size": chan_2_median_disk_size,
445 "min_cell_area": chan_2_min_area,
446 "max_cell_area": chan_2_max_area,
447 "remove_border_cells": chan_2_remove_border_cells,
448 "return_segmentation_mask": False
449 }
451 # Process each channel
452 _, chan_1_results = count_cells_single_channel(chan_1_img, **chan_1_params)
453 _, chan_2_results = count_cells_single_channel(chan_2_img, **chan_2_params)
455 # Perform colocalization analysis
456 multi_results = []
457 output_stack = image_stack.copy()
459 # Since we're processing single slices from each channel, we only have one result each
460 chan_1_result = chan_1_results[0]
461 chan_2_result = chan_2_results[0]
463 # Analyze colocalization
464 coloc_result = _analyze_colocalization(
465 chan_1_result, chan_2_result, colocalization_method.value,
466 max_distance, min_overlap_area, intensity_threshold
467 )
469 multi_results.append(coloc_result)
471 # Create colocalization visualization if requested
472 if return_colocalization_map:
473 coloc_map = _create_colocalization_map(
474 image_stack[chan_1], image_stack[chan_2], coloc_result
475 )
476 # Replace one of the channels with the colocalization map
477 output_stack = cp.stack([image_stack[chan_1], image_stack[chan_2], coloc_map])
479 return output_stack, multi_results
482def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager, backend: str) -> str:
483 """Materialize single-channel cell counting results."""
484 # Generate output file paths based on the input path
485 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
486 base_path = path.replace('.pkl', '')
487 json_path = f"{base_path}.json"
488 csv_path = f"{base_path}_details.csv"
490 # Ensure output directory exists for disk backend
491 from pathlib import Path
492 from openhcs.constants.constants import Backend
493 output_dir = Path(json_path).parent
494 if backend == Backend.DISK.value:
495 filemanager.ensure_directory(str(output_dir), backend)
497 summary = {
498 "analysis_type": "single_channel_cell_counting",
499 "total_slices": len(data),
500 "results_per_slice": []
501 }
502 rows = []
504 total_cells = 0
505 for result in data:
506 total_cells += result.cell_count
508 # Add to summary
509 summary["results_per_slice"].append({
510 "slice_index": result.slice_index,
511 "method": result.method,
512 "cell_count": result.cell_count,
513 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0,
514 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0,
515 "parameters": result.parameters_used
516 })
518 # Add individual cell data to CSV
519 for i, (pos, area, intensity, confidence) in enumerate(zip(
520 result.cell_positions, result.cell_areas,
521 result.cell_intensities, result.detection_confidence
522 )):
523 rows.append({
524 'slice_index': result.slice_index,
525 'cell_id': f"slice_{result.slice_index}_cell_{i}",
526 'x_position': pos[0],
527 'y_position': pos[1],
528 'cell_area': area,
529 'cell_intensity': intensity,
530 'detection_confidence': confidence,
531 'detection_method': result.method
532 })
534 summary["total_cells_all_slices"] = total_cells
535 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0
537 # Save JSON summary (overwrite if exists)
538 json_content = json.dumps(summary, indent=2, default=str)
539 # Remove existing file if it exists using filemanager
540 if filemanager.exists(json_path, backend):
541 filemanager.delete(json_path, backend)
542 filemanager.save(json_content, json_path, backend)
544 # Save CSV details (overwrite if exists)
545 if rows:
546 df = pd.DataFrame(rows)
547 csv_content = df.to_csv(index=False)
548 # Remove existing file if it exists using filemanager
549 if filemanager.exists(csv_path, backend):
550 filemanager.delete(csv_path, backend)
551 filemanager.save(csv_content, csv_path, backend)
553 return json_path
556def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager, backend: str) -> str:
557 """Materialize multi-channel cell counting and colocalization results."""
558 # Generate output file paths based on the input path
559 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
560 base_path = path.replace('.pkl', '')
561 json_path = f"{base_path}.json"
562 csv_path = f"{base_path}_details.csv"
564 # Ensure output directory exists for disk backend
565 from pathlib import Path
566 output_dir = Path(json_path).parent
567 if backend == Backend.DISK.value:
568 filemanager.ensure_directory(str(output_dir), backend)
570 summary = {
571 "analysis_type": "multi_channel_cell_counting_colocalization",
572 "total_slices": len(data),
573 "colocalization_summary": {
574 "total_chan_1_cells": 0,
575 "total_chan_2_cells": 0,
576 "total_colocalized": 0,
577 "average_colocalization_percentage": 0
578 },
579 "results_per_slice": []
580 }
582 # CSV for detailed analysis (csv_path already defined above)
583 rows = []
585 total_coloc_pct = 0
586 for result in data:
587 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count
588 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count
589 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count
590 total_coloc_pct += result.colocalization_percentage
592 # Add to summary
593 summary["results_per_slice"].append({
594 "slice_index": result.slice_index,
595 "chan_1_count": result.chan_1_results.cell_count,
596 "chan_2_count": result.chan_2_results.cell_count,
597 "colocalized_count": result.colocalized_count,
598 "colocalization_percentage": result.colocalization_percentage,
599 "chan_1_only": result.chan_1_only_count,
600 "chan_2_only": result.chan_2_only_count,
601 "colocalization_method": result.colocalization_method,
602 "colocalization_metrics": result.colocalization_metrics
603 })
605 # Add colocalization details to CSV
606 for i, pos in enumerate(result.overlap_positions):
607 rows.append({
608 'slice_index': result.slice_index,
609 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}",
610 'x_position': pos[0],
611 'y_position': pos[1],
612 'colocalization_method': result.colocalization_method
613 })
615 summary["colocalization_summary"]["average_colocalization_percentage"] = (
616 total_coloc_pct / len(data) if data else 0
617 )
619 # Save JSON summary (overwrite if exists)
620 json_content = json.dumps(summary, indent=2, default=str)
621 # Remove existing file if it exists using filemanager
622 if filemanager.exists(json_path, backend):
623 filemanager.delete(json_path, backend)
624 filemanager.save(json_content, json_path, backend)
626 # Save CSV details (overwrite if exists)
627 if rows:
628 df = pd.DataFrame(rows)
629 csv_content = df.to_csv(index=False)
630 # Remove existing file if it exists using filemanager
631 if filemanager.exists(csv_path, backend):
632 filemanager.delete(csv_path, backend)
633 filemanager.save(csv_content, csv_path, backend)
635 return json_path
638def _preprocess_image(image: cp.ndarray, gaussian_sigma: float, median_disk_size: int) -> cp.ndarray:
639 """Apply preprocessing to enhance cell detection."""
640 # Gaussian blur to reduce noise
641 if gaussian_sigma > 0:
642 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True)
644 # Median filter to remove salt-and-pepper noise
645 if median_disk_size > 0:
646 image = median(image, disk(median_disk_size))
648 return image
651def _detect_cells_single_method(
652 image: cp.ndarray,
653 slice_idx: int,
654 method: str,
655 params: Dict[str, Any]
656) -> CellCountResult:
657 """Detect cells using specified method."""
659 if method == DetectionMethod.BLOB_LOG.value:
660 return _detect_cells_blob_log(image, slice_idx, params)
661 elif method == DetectionMethod.BLOB_DOG.value:
662 return _detect_cells_blob_dog(image, slice_idx, params)
663 elif method == DetectionMethod.BLOB_DOH.value:
664 return _detect_cells_blob_doh(image, slice_idx, params)
665 elif method == DetectionMethod.WATERSHED.value:
666 return _detect_cells_watershed(image, slice_idx, params)
667 elif method == DetectionMethod.THRESHOLD.value:
668 return _detect_cells_threshold(image, slice_idx, params)
669 else:
670 raise ValueError(f"Unknown detection method: {method}")
673def _detect_cells_blob_log(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
674 """Detect cells using Laplacian of Gaussian blob detection."""
675 blobs = blob_log(
676 image,
677 min_sigma=params["min_sigma"],
678 max_sigma=params["max_sigma"],
679 num_sigma=params["num_sigma"],
680 threshold=params["threshold"],
681 overlap=params["overlap"]
682 )
684 # Extract positions, areas, and intensities
685 positions = []
686 areas = []
687 intensities = []
688 confidences = []
690 for blob in blobs:
691 y, x, sigma = blob
692 positions.append((float(x), float(y)))
694 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2))
695 radius = sigma * cp.sqrt(2)
696 area = cp.pi * radius**2
697 areas.append(float(area))
699 # Sample intensity at blob center
700 intensity = float(image[int(y), int(x)])
701 intensities.append(intensity)
703 # Use sigma as confidence measure (larger blobs = higher confidence)
704 confidence = float(sigma / params["max_sigma"])
705 confidences.append(confidence)
707 # Filter by area constraints
708 filtered_data = _filter_by_area(
709 positions, areas, intensities, confidences,
710 params["min_cell_area"], params["max_cell_area"]
711 )
713 return CellCountResult(
714 slice_index=slice_idx,
715 method="blob_log",
716 cell_count=len(filtered_data[0]),
717 cell_positions=filtered_data[0],
718 cell_areas=filtered_data[1],
719 cell_intensities=filtered_data[2],
720 detection_confidence=filtered_data[3],
721 parameters_used=params
722 )
725def _detect_cells_blob_dog(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
726 """Detect cells using Difference of Gaussian blob detection."""
727 blobs = blob_dog(
728 image,
729 min_sigma=params["min_sigma"],
730 max_sigma=params["max_sigma"],
731 threshold=params["threshold"],
732 overlap=params["overlap"]
733 )
735 # Process similar to blob_log
736 positions = []
737 areas = []
738 intensities = []
739 confidences = []
741 for blob in blobs:
742 y, x, sigma = blob
743 positions.append((float(x), float(y)))
745 radius = sigma * cp.sqrt(2)
746 area = cp.pi * radius**2
747 areas.append(float(area))
749 intensity = float(image[int(y), int(x)])
750 intensities.append(intensity)
752 confidence = float(sigma / params["max_sigma"])
753 confidences.append(confidence)
755 filtered_data = _filter_by_area(
756 positions, areas, intensities, confidences,
757 params["min_cell_area"], params["max_cell_area"]
758 )
760 return CellCountResult(
761 slice_index=slice_idx,
762 method="blob_dog",
763 cell_count=len(filtered_data[0]),
764 cell_positions=filtered_data[0],
765 cell_areas=filtered_data[1],
766 cell_intensities=filtered_data[2],
767 detection_confidence=filtered_data[3],
768 parameters_used=params
769 )
772def _detect_cells_blob_doh(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
773 """Detect cells using Determinant of Hessian blob detection."""
774 blobs = blob_doh(
775 image,
776 min_sigma=params["min_sigma"],
777 max_sigma=params["max_sigma"],
778 num_sigma=params["num_sigma"],
779 threshold=params["threshold"],
780 overlap=params["overlap"]
781 )
783 # Process similar to other blob methods
784 positions = []
785 areas = []
786 intensities = []
787 confidences = []
789 for blob in blobs:
790 y, x, sigma = blob
791 positions.append((float(x), float(y)))
793 radius = sigma * cp.sqrt(2)
794 area = cp.pi * radius**2
795 areas.append(float(area))
797 intensity = float(image[int(y), int(x)])
798 intensities.append(intensity)
800 confidence = float(sigma / params["max_sigma"])
801 confidences.append(confidence)
803 filtered_data = _filter_by_area(
804 positions, areas, intensities, confidences,
805 params["min_cell_area"], params["max_cell_area"]
806 )
808 return CellCountResult(
809 slice_index=slice_idx,
810 method="blob_doh",
811 cell_count=len(filtered_data[0]),
812 cell_positions=filtered_data[0],
813 cell_areas=filtered_data[1],
814 cell_intensities=filtered_data[2],
815 detection_confidence=filtered_data[3],
816 parameters_used=params
817 )
820def _filter_by_area(
821 positions: List[Tuple[float, float]],
822 areas: List[float],
823 intensities: List[float],
824 confidences: List[float],
825 min_area: float,
826 max_area: float
827) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]:
828 """Filter detected cells by area constraints."""
829 filtered_positions = []
830 filtered_areas = []
831 filtered_intensities = []
832 filtered_confidences = []
834 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences):
835 if min_area <= area <= max_area:
836 filtered_positions.append(pos)
837 filtered_areas.append(area)
838 filtered_intensities.append(intensity)
839 filtered_confidences.append(confidence)
841 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences
844def _detect_cells_watershed(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
845 """Detect cells using watershed segmentation."""
846 # Determine threshold
847 if params["watershed_threshold_method"] == "otsu":
848 threshold_val = threshold_otsu(image)
849 elif params["watershed_threshold_method"] == "li":
850 threshold_val = threshold_li(image)
851 else:
852 threshold_val = float(params["watershed_threshold_method"])
854 # Create binary mask
855 binary = image > threshold_val
857 # Remove small objects and border objects
858 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
859 if params["remove_border_cells"]:
860 binary = clear_border(binary)
862 # Find local maxima as seeds
863 distance = ndimage.distance_transform_edt(binary)
864 local_maxima = peak_local_max(
865 distance,
866 min_distance=params["watershed_min_distance"],
867 footprint=cp.ones((params["watershed_footprint_size"], params["watershed_footprint_size"]))
868 )
870 # Convert coordinates to binary mask
871 local_maxima_mask = cp.zeros_like(distance, dtype=bool)
872 if len(local_maxima) > 0:
873 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True
875 # Create markers for watershed
876 # Convert boolean mask to integer labels for connected components
877 markers = label(local_maxima_mask.astype(cp.uint8))
879 # Apply watershed
880 labels = watershed(-distance, markers, mask=binary)
882 # Extract region properties
883 regions = regionprops(labels, intensity_image=image)
885 positions = []
886 areas = []
887 intensities = []
888 confidences = []
889 valid_labels = [] # Track which labels pass the size filter
891 for region in regions:
892 # Filter by area
893 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
894 # Centroid (note: regionprops returns (row, col) = (y, x))
895 y, x = region.centroid
896 positions.append((float(x), float(y)))
898 areas.append(float(region.area))
899 intensities.append(float(region.mean_intensity))
901 # Use area as confidence measure (normalized)
902 confidence = min(1.0, region.area / params["max_cell_area"])
903 confidences.append(confidence)
905 # Track this label as valid
906 valid_labels.append(region.label)
908 # Create filtered binary mask with only cells that passed size filter
909 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels))
911 return CellCountResult(
912 slice_index=slice_idx,
913 method="watershed",
914 cell_count=len(positions),
915 cell_positions=positions,
916 cell_areas=areas,
917 cell_intensities=intensities,
918 detection_confidence=confidences,
919 parameters_used=params,
920 binary_mask=filtered_binary_mask # Only cells that passed all filters
921 )
924def _detect_cells_threshold(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
925 """Detect cells using simple thresholding and connected components."""
926 # Apply threshold
927 binary = image > params["threshold"] * image.max()
929 # Remove small objects and border objects
930 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
931 if params["remove_border_cells"]:
932 binary = clear_border(binary)
934 # Label connected components
935 labels = label(binary)
936 regions = regionprops(labels, intensity_image=image)
938 positions = []
939 areas = []
940 intensities = []
941 confidences = []
942 valid_labels = [] # Track which labels pass the size filter
944 for region in regions:
945 # Filter by area
946 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
947 y, x = region.centroid
948 positions.append((float(x), float(y)))
950 areas.append(float(region.area))
951 intensities.append(float(region.mean_intensity))
953 # Use intensity as confidence measure
954 confidence = float(region.mean_intensity / image.max())
955 confidences.append(confidence)
957 # Track this label as valid
958 valid_labels.append(region.label)
960 # Create filtered binary mask with only cells that passed size filter
961 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels))
963 return CellCountResult(
964 slice_index=slice_idx,
965 method="threshold",
966 cell_count=len(positions),
967 cell_positions=positions,
968 cell_areas=areas,
969 cell_intensities=intensities,
970 detection_confidence=confidences,
971 parameters_used=params,
972 binary_mask=filtered_binary_mask # Only cells that passed all filters
973 )
976def _analyze_colocalization(
977 chan_1_result: CellCountResult,
978 chan_2_result: CellCountResult,
979 method: str,
980 max_distance: float,
981 min_overlap_area: float,
982 intensity_threshold: float
983) -> MultiChannelResult:
984 """Analyze colocalization between two channels."""
986 if method == ColocalizationMethod.DISTANCE_BASED.value:
987 return _colocalization_distance_based(
988 chan_1_result, chan_2_result, max_distance
989 )
990 elif method == ColocalizationMethod.OVERLAP_AREA.value:
991 return _colocalization_overlap_based(
992 chan_1_result, chan_2_result, min_overlap_area
993 )
994 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value:
995 return _colocalization_intensity_based(
996 chan_1_result, chan_2_result, intensity_threshold
997 )
998 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value:
999 return _colocalization_manders(
1000 chan_1_result, chan_2_result, intensity_threshold
1001 )
1002 else:
1003 raise ValueError(f"Unknown colocalization method: {method}")
1006def _colocalization_distance_based(
1007 chan_1_result: CellCountResult,
1008 chan_2_result: CellCountResult,
1009 max_distance: float
1010) -> MultiChannelResult:
1011 """Perform distance-based colocalization analysis."""
1013 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1014 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based")
1016 # Convert positions to arrays
1017 pos_1 = cp.array(chan_1_result.cell_positions)
1018 pos_2 = cp.array(chan_2_result.cell_positions)
1020 # Calculate pairwise distances
1021 distances = cdist(pos_1, pos_2)
1023 # Find colocalized pairs
1024 colocalized_pairs = []
1025 used_chan_2 = set()
1027 for i in range(len(pos_1)):
1028 # Find closest cell in channel 2
1029 min_dist_idx = cp.argmin(distances[i])
1030 min_dist = distances[i, min_dist_idx]
1032 # Check if within distance threshold and not already used
1033 if min_dist <= max_distance and min_dist_idx not in used_chan_2:
1034 colocalized_pairs.append((i, min_dist_idx))
1035 used_chan_2.add(min_dist_idx)
1037 # Calculate metrics
1038 colocalized_count = len(colocalized_pairs)
1039 total_cells = len(pos_1) + len(pos_2)
1040 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1042 chan_1_only = len(pos_1) - colocalized_count
1043 chan_2_only = len(pos_2) - colocalized_count
1045 # Extract colocalized positions (average of paired positions)
1046 overlap_positions = []
1047 for i, j in colocalized_pairs:
1048 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist()
1049 overlap_positions.append(tuple(avg_pos))
1051 # Calculate additional metrics
1052 if colocalized_pairs:
1053 avg_distance = cp.mean([distances[i, j] for i, j in colocalized_pairs])
1054 max_distance_found = cp.max([distances[i, j] for i, j in colocalized_pairs])
1055 else:
1056 avg_distance = 0
1057 max_distance_found = 0
1059 metrics = {
1060 "average_colocalization_distance": float(avg_distance),
1061 "max_colocalization_distance": float(max_distance_found),
1062 "distance_threshold_used": max_distance
1063 }
1065 return MultiChannelResult(
1066 slice_index=chan_1_result.slice_index,
1067 chan_1_results=chan_1_result,
1068 chan_2_results=chan_2_result,
1069 colocalization_method="distance_based",
1070 colocalized_count=colocalized_count,
1071 colocalization_percentage=colocalization_percentage,
1072 chan_1_only_count=chan_1_only,
1073 chan_2_only_count=chan_2_only,
1074 colocalization_metrics=metrics,
1075 overlap_positions=overlap_positions
1076 )
1079def _create_empty_coloc_result(
1080 chan_1_result: CellCountResult,
1081 chan_2_result: CellCountResult,
1082 method: str
1083) -> MultiChannelResult:
1084 """Create empty colocalization result when no cells found."""
1085 return MultiChannelResult(
1086 slice_index=chan_1_result.slice_index,
1087 chan_1_results=chan_1_result,
1088 chan_2_results=chan_2_result,
1089 colocalization_method=method,
1090 colocalized_count=0,
1091 colocalization_percentage=0.0,
1092 chan_1_only_count=chan_1_result.cell_count,
1093 chan_2_only_count=chan_2_result.cell_count,
1094 colocalization_metrics={},
1095 overlap_positions=[]
1096 )
1099def _colocalization_overlap_based(
1100 chan_1_result: CellCountResult,
1101 chan_2_result: CellCountResult,
1102 min_overlap_area: float
1103) -> MultiChannelResult:
1104 """Perform area overlap-based colocalization analysis."""
1105 # This is a simplified implementation - in practice, you'd need actual segmentation masks
1106 # For now, we'll use distance as a proxy for overlap
1108 # Use distance-based method with smaller threshold as approximation
1109 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly
1111 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold)
1112 result.colocalization_method = "overlap_area"
1113 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area
1114 result.colocalization_metrics["note"] = "Approximated using distance-based method"
1116 return result
1119def _colocalization_intensity_based(
1120 chan_1_result: CellCountResult,
1121 chan_2_result: CellCountResult,
1122 intensity_threshold: float
1123) -> MultiChannelResult:
1124 """Perform intensity correlation-based colocalization analysis."""
1126 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1127 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation")
1129 # Use distance-based pairing first
1130 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0)
1132 # Filter pairs based on intensity correlation
1133 colocalized_pairs = []
1134 overlap_positions = []
1136 pos_1 = cp.array(chan_1_result.cell_positions)
1137 pos_2 = cp.array(chan_2_result.cell_positions)
1139 for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
1140 for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
1141 # Calculate distance
1142 dist = cp.sqrt((x1 - x2)**2 + (y1 - y2)**2)
1144 if dist <= 5.0: # Within reasonable distance
1145 # Check intensity correlation
1146 int_1 = chan_1_result.cell_intensities[i]
1147 int_2 = chan_2_result.cell_intensities[j]
1149 # Simple intensity correlation: both above threshold
1150 if int_1 >= intensity_threshold and int_2 >= intensity_threshold:
1151 colocalized_pairs.append((i, j))
1152 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2)
1153 overlap_positions.append(avg_pos)
1154 break # One-to-one mapping
1156 colocalized_count = len(colocalized_pairs)
1157 total_cells = len(pos_1) + len(pos_2)
1158 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1160 metrics = {
1161 "intensity_threshold_used": intensity_threshold,
1162 "correlation_method": "threshold_based"
1163 }
1165 return MultiChannelResult(
1166 slice_index=chan_1_result.slice_index,
1167 chan_1_results=chan_1_result,
1168 chan_2_results=chan_2_result,
1169 colocalization_method="intensity_correlation",
1170 colocalized_count=colocalized_count,
1171 colocalization_percentage=colocalization_percentage,
1172 chan_1_only_count=len(pos_1) - colocalized_count,
1173 chan_2_only_count=len(pos_2) - colocalized_count,
1174 colocalization_metrics=metrics,
1175 overlap_positions=overlap_positions
1176 )
1179def _colocalization_manders(
1180 chan_1_result: CellCountResult,
1181 chan_2_result: CellCountResult,
1182 intensity_threshold: float
1183) -> MultiChannelResult:
1184 """Calculate Manders colocalization coefficients."""
1186 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1187 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")
1189 # Simplified Manders calculation based on detected cells
1190 # In practice, this would use pixel-level intensity analysis
1192 # Use intensity-based method as foundation
1193 intensity_result = _colocalization_intensity_based(
1194 chan_1_result, chan_2_result, intensity_threshold
1195 )
1197 # Calculate Manders-like coefficients
1198 total_int_1 = sum(chan_1_result.cell_intensities)
1199 total_int_2 = sum(chan_2_result.cell_intensities)
1201 # Simplified: assume colocalized cells contribute their full intensity
1202 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in
1203 [(i, j) for i in range(len(chan_1_result.cell_positions))
1204 for j in range(len(chan_2_result.cell_positions))
1205 if (i, j) in [(0, 0)]]) # Simplified placeholder
1207 # Manders coefficients (M1 and M2)
1208 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
1209 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified
1211 intensity_result.colocalization_method = "manders_coefficients"
1212 intensity_result.colocalization_metrics.update({
1213 "manders_m1": m1,
1214 "manders_m2": m2,
1215 "note": "Simplified cell-based Manders calculation"
1216 })
1218 return intensity_result
1221def _create_segmentation_visualization(
1222 image: cp.ndarray,
1223 positions: List[Tuple[float, float]],
1224 max_sigma: float,
1225 cell_areas: List[float] = None,
1226 binary_mask: cp.ndarray = None
1227) -> cp.ndarray:
1228 """Create segmentation visualization using actual binary mask if available."""
1230 # If we have the actual binary mask from detection, use it directly
1231 if binary_mask is not None:
1232 # Convert boolean mask to uint16 to match input image dtype
1233 # Use max intensity for detected cells, 0 for background
1234 max_intensity = image.max() if image.max() > 0 else 65535
1235 return (binary_mask * max_intensity).astype(image.dtype)
1237 # Fallback to original circular marker approach for blob methods
1238 visualization = image.copy()
1240 # Mark detected cells with their actual sizes
1241 for i, (x, y) in enumerate(positions):
1242 # Use actual cell area if available, otherwise fall back to max_sigma
1243 if cell_areas and i < len(cell_areas):
1244 # Convert area to radius (assuming circular cells)
1245 radius = cp.sqrt(cell_areas[i] / cp.pi)
1246 else:
1247 # Fallback to max_sigma for backward compatibility
1248 radius = max_sigma * 2
1250 # Create circular markers with actual cell size
1251 rr, cc = cp.ogrid[:image.shape[0], :image.shape[1]]
1252 mask = (rr - y)**2 + (cc - x)**2 <= radius**2
1254 # Ensure indices are within bounds
1255 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1])
1256 mask = mask & valid_mask
1258 visualization[mask] = visualization.max() # Bright markers
1260 return visualization
1263def _create_colocalization_map(
1264 chan_1_img: cp.ndarray,
1265 chan_2_img: cp.ndarray,
1266 coloc_result: MultiChannelResult
1267) -> cp.ndarray:
1268 """Create colocalization visualization map."""
1269 # Create RGB-like visualization
1270 coloc_map = cp.zeros_like(chan_1_img)
1272 # Mark colocalized positions
1273 for x, y in coloc_result.overlap_positions:
1274 # Create markers for colocalized cells
1275 rr, cc = cp.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]
1276 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius
1278 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1])
1279 mask = mask & valid_mask
1281 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers
1283 return coloc_map