Coverage for openhcs/processing/backends/analysis/cell_counting_cpu.py: 14.5%
446 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2CPU-based cell counting and multi-channel colocalization analysis for OpenHCS.
4This module provides comprehensive cell counting capabilities using scikit-image,
5supporting both single-channel and multi-channel analysis with various detection
6methods and colocalization metrics.
7"""
9import numpy as np
10import logging
11from typing import Dict, List, Tuple, Any, Optional, Union
12from dataclasses import dataclass
13from enum import Enum
15logger = logging.getLogger(__name__)
17# Core scientific computing imports
18import pandas as pd
19import json
20from scipy import ndimage
21from scipy.spatial.distance import cdist
22from skimage.feature import blob_log, blob_dog, blob_doh, peak_local_max
23from skimage.filters import threshold_otsu, threshold_li, gaussian, median
24from skimage.segmentation import watershed, clear_border
25from skimage.morphology import remove_small_objects, disk, binary_erosion, binary_dilation
26from skimage.measure import label, regionprops
27from skimage import exposure
29# OpenHCS imports
30from openhcs.core.memory.decorators import numpy as numpy_func
31from openhcs.core.pipeline.function_contracts import special_outputs
32from openhcs.constants.constants import Backend
35class DetectionMethod(Enum):
36 """Cell detection methods available."""
37 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells)
38 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose)
39 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells)
40 WATERSHED = "watershed" # Watershed segmentation (best for touching cells)
41 THRESHOLD = "threshold" # Simple thresholding (fastest, basic)
44class ColocalizationMethod(Enum):
45 """Methods for multi-channel colocalization analysis."""
46 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap
47 DISTANCE_BASED = "distance_based" # Based on centroid distances
48 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation
49 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients
52class ThresholdMethod(Enum):
53 """Automatic thresholding methods for watershed segmentation."""
54 OTSU = "otsu" # Otsu's method (good for bimodal histograms)
55 LI = "li" # Li's method (good for low contrast images)
56 MANUAL = "manual" # Manual threshold value (0.0-1.0)
62@dataclass
63class CellCountResult:
64 """Results for single-channel cell counting."""
65 slice_index: int
66 method: str
67 cell_count: int
68 cell_positions: List[Tuple[float, float]] # (x, y) centroids
69 cell_areas: List[float]
70 cell_intensities: List[float]
71 detection_confidence: List[float]
72 parameters_used: Dict[str, Any]
73 binary_mask: Optional[np.ndarray] = None # Actual binary mask for segmentation methods
76@dataclass
77class MultiChannelResult:
78 """Results for multi-channel cell counting and colocalization."""
79 slice_index: int
80 chan_1_results: CellCountResult
81 chan_2_results: CellCountResult
82 colocalization_method: str
83 colocalized_count: int
84 colocalization_percentage: float
85 chan_1_only_count: int
86 chan_2_only_count: int
87 colocalization_metrics: Dict[str, float]
88 overlap_positions: List[Tuple[float, float]]
91def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager) -> str:
92 """Materialize cell counting results as analysis-ready CSV and JSON formats."""
94 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}")
96 # Determine if this is single-channel or multi-channel data
97 if not data:
98 logger.warning(f"🔬 CELL_COUNT_MATERIALIZE: No data to materialize")
99 return path
101 is_multi_channel = isinstance(data[0], MultiChannelResult)
102 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}")
104 if is_multi_channel:
105 return _materialize_multi_channel_results(data, path, filemanager)
106 else:
107 return _materialize_single_channel_results(data, path, filemanager)
110def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager) -> str:
111 """Materialize segmentation masks as individual TIFF files."""
113 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}")
115 if not data:
116 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)")
117 # Create empty summary file to indicate no masks were generated
118 summary_path = path.replace('.pkl', '_segmentation_summary.txt')
119 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n"
120 filemanager.save(summary_content, summary_path, Backend.DISK.value)
121 return summary_path
123 # Generate output file paths based on the input path
124 base_path = path.replace('.pkl', '')
126 # Save each mask as a separate TIFF file
127 for i, mask in enumerate(data):
128 mask_filename = f"{base_path}_slice_{i:03d}.tif"
130 # Convert mask to appropriate dtype for saving (uint16 to match input images)
131 if mask.dtype != np.uint16:
132 # Normalize to uint16 range if needed
133 if mask.max() <= 1.0:
134 mask_uint16 = (mask * 65535).astype(np.uint16)
135 else:
136 mask_uint16 = mask.astype(np.uint16)
137 else:
138 mask_uint16 = mask
140 # Save using filemanager
141 filemanager.save(mask_uint16, mask_filename, Backend.DISK.value)
142 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename}")
144 # Return summary path
145 summary_path = f"{base_path}_segmentation_summary.txt"
146 summary_content = f"Segmentation masks saved: {len(data)} files\n"
147 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n"
148 summary_content += f"Mask dtype: {data[0].dtype}\n"
149 summary_content += f"Mask shape: {data[0].shape}\n"
151 filemanager.save(summary_content, summary_path, Backend.DISK.value)
152 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks")
154 return summary_path
157@numpy_func
158@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks))
159def count_cells_single_channel(
160 image_stack: np.ndarray,
161 # Detection method and parameters
162 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
163 # Blob detection parameters
164 min_sigma: float = 1.0, # Minimum blob size (pixels)
165 max_sigma: float = 10.0, # Maximum blob size (pixels)
166 num_sigma: int = 10, # Number of sigma values to test
167 threshold: float = 0.1, # Detection threshold (0.0-1.0)
168 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
169 # Watershed parameters
170 watershed_footprint_size: int = 3, # Local maxima footprint size
171 watershed_min_distance: int = 5, # Minimum distance between peaks
172 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods
173 # Preprocessing parameters
174 enable_preprocessing: bool = True,
175 gaussian_sigma: float = 1.0, # Gaussian blur sigma
176 median_disk_size: int = 1, # Median filter disk size
177 # Filtering parameters
178 min_cell_area: int = 10, # Minimum cell area (pixels)
179 max_cell_area: int = 1000, # Maximum cell area (pixels)
180 remove_border_cells: bool = True, # Remove cells touching image border
181 # Output parameters
182 return_segmentation_mask: bool = False
183) -> Tuple[np.ndarray, List[CellCountResult]]:
184 """
185 Count cells in single-channel image stack using various detection methods.
187 Args:
188 image_stack: 3D array (Z, Y, X) where each Z slice is processed independently
189 detection_method: Method for cell detection (see DetectionMethod enum)
190 min_sigma: Minimum blob size for blob detection methods
191 max_sigma: Maximum blob size for blob detection methods
192 num_sigma: Number of sigma values to test for blob detection
193 threshold: Detection threshold (method-dependent)
194 overlap: Maximum overlap between detected blobs
195 watershed_footprint_size: Footprint size for local maxima detection
196 watershed_min_distance: Minimum distance between watershed peaks
197 watershed_threshold_method: Thresholding method for watershed
198 enable_preprocessing: Apply Gaussian and median filtering
199 gaussian_sigma: Standard deviation for Gaussian blur
200 median_disk_size: Disk size for median filtering
201 min_cell_area: Minimum area for valid cells
202 max_cell_area: Maximum area for valid cells
203 remove_border_cells: Remove cells touching image borders
204 return_segmentation_mask: Return segmentation masks in output
206 Returns:
207 output_stack: Original image stack unchanged (Z, Y, X)
208 cell_count_results: List of CellCountResult objects for each slice
209 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True
210 """
211 if image_stack.ndim != 3:
212 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
214 results = []
215 segmentation_masks = []
217 # Store parameters for reproducibility (convert enums to values)
218 parameters = {
219 "detection_method": detection_method.value,
220 "min_sigma": min_sigma,
221 "max_sigma": max_sigma,
222 "num_sigma": num_sigma,
223 "threshold": threshold,
224 "overlap": overlap,
225 "watershed_footprint_size": watershed_footprint_size,
226 "watershed_min_distance": watershed_min_distance,
227 "watershed_threshold_method": watershed_threshold_method.value,
228 "gaussian_sigma": gaussian_sigma,
229 "median_disk_size": median_disk_size,
230 "min_cell_area": min_cell_area,
231 "max_cell_area": max_cell_area,
232 "remove_border_cells": remove_border_cells
233 }
235 logging.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")
237 for z_idx in range(image_stack.shape[0]):
238 slice_img = image_stack[z_idx].astype(np.float64)
240 # Apply preprocessing if enabled
241 if enable_preprocessing:
242 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size)
244 # Detect cells using specified method
245 result = _detect_cells_single_method(
246 slice_img, z_idx, detection_method.value, parameters
247 )
249 results.append(result)
251 # Create segmentation mask if requested
252 if return_segmentation_mask:
253 segmentation_mask = _create_segmentation_visualization(
254 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask
255 )
256 segmentation_masks.append(segmentation_mask)
258 # Always return segmentation masks (empty list if not requested)
259 # This ensures consistent return signature for special outputs system
260 return image_stack, results, segmentation_masks
263@numpy_func
264@special_outputs(("multi_channel_counts", materialize_cell_counts))
265def count_cells_multi_channel(
266 image_stack: np.ndarray,
267 chan_1: int, # Index of first channel (positional arg)
268 chan_2: int, # Index of second channel (positional arg)
269 # Detection parameters for channel 1 (all single-channel params available)
270 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
271 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels)
272 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels)
273 chan_1_num_sigma: int = 10, # Number of sigma values to test
274 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0)
275 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
276 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size
277 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks
278 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
279 chan_1_enable_preprocessing: bool = True, # Apply preprocessing
280 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma
281 chan_1_median_disk_size: int = 1, # Median filter disk size
282 chan_1_min_area: int = 10, # Minimum cell area (pixels)
283 chan_1_max_area: int = 1000, # Maximum cell area (pixels)
284 chan_1_remove_border_cells: bool = True, # Remove cells touching border
285 # Detection parameters for channel 2 (all single-channel params available)
286 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
287 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels)
288 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels)
289 chan_2_num_sigma: int = 10, # Number of sigma values to test
290 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0)
291 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
292 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size
293 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks
294 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
295 chan_2_enable_preprocessing: bool = True, # Apply preprocessing
296 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma
297 chan_2_median_disk_size: int = 1, # Median filter disk size
298 chan_2_min_area: int = 10, # Minimum cell area (pixels)
299 chan_2_max_area: int = 1000, # Maximum cell area (pixels)
300 chan_2_remove_border_cells: bool = True, # Remove cells touching border
301 # Colocalization parameters
302 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods
303 max_distance: float = 5.0, # Maximum distance for colocalization (pixels)
304 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method
305 intensity_threshold: float = 0.5, # Threshold for intensity-based methods
306 # Output parameters
307 return_colocalization_map: bool = False
308) -> Tuple[np.ndarray, List[MultiChannelResult]]:
309 """
310 Count cells in multi-channel image stack with colocalization analysis.
312 Each channel can be processed with independent parameters, providing the same
313 flexibility as the single-channel function for each channel separately.
315 Args:
316 image_stack: 3D array (Z, Y, X) where Z represents different channels
317 chan_1: Index of first channel in the stack (positional)
318 chan_2: Index of second channel in the stack (positional)
320 # Channel 1 detection parameters (same as single-channel function)
321 chan_1_method: Detection method for channel 1 (DetectionMethod enum)
322 chan_1_min_sigma: Minimum blob size for channel 1
323 chan_1_max_sigma: Maximum blob size for channel 1
324 chan_1_num_sigma: Number of sigma values to test for channel 1
325 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0)
326 chan_1_overlap: Maximum overlap between blobs for channel 1
327 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1
328 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1
329 chan_1_watershed_threshold_method: Thresholding method for channel 1
330 chan_1_enable_preprocessing: Apply preprocessing to channel 1
331 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1
332 chan_1_median_disk_size: Median filter size for channel 1
333 chan_1_min_area: Minimum cell area for channel 1
334 chan_1_max_area: Maximum cell area for channel 1
335 chan_1_remove_border_cells: Remove border cells for channel 1
337 # Channel 2 detection parameters (same as single-channel function)
338 chan_2_method: Detection method for channel 2 (DetectionMethod enum)
339 chan_2_min_sigma: Minimum blob size for channel 2
340 chan_2_max_sigma: Maximum blob size for channel 2
341 chan_2_num_sigma: Number of sigma values to test for channel 2
342 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0)
343 chan_2_overlap: Maximum overlap between blobs for channel 2
344 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2
345 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2
346 chan_2_watershed_threshold_method: Thresholding method for channel 2
347 chan_2_enable_preprocessing: Apply preprocessing to channel 2
348 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2
349 chan_2_median_disk_size: Median filter size for channel 2
350 chan_2_min_area: Minimum cell area for channel 2
351 chan_2_max_area: Maximum cell area for channel 2
352 chan_2_remove_border_cells: Remove border cells for channel 2
354 # Colocalization parameters
355 colocalization_method: Method for determining colocalization (ColocalizationMethod enum)
356 max_distance: Maximum distance for distance-based colocalization (pixels)
357 min_overlap_area: Minimum overlap fraction for area-based colocalization
358 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0)
359 return_colocalization_map: Return colocalization visualization
361 Returns:
362 output_stack: Original images or colocalization maps
363 multi_channel_results: List of MultiChannelResult objects
364 """
365 if image_stack.ndim != 3:
366 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
368 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]:
369 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}")
371 if chan_1 == chan_2:
372 raise ValueError("Channel 1 and Channel 2 must be different")
374 # Extract channel images
375 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency
376 chan_2_img = image_stack[chan_2:chan_2+1]
378 # Count cells in each channel separately using the single-channel function
379 # Channel 1 parameters (all explicit)
380 chan_1_params = {
381 "detection_method": chan_1_method,
382 "min_sigma": chan_1_min_sigma,
383 "max_sigma": chan_1_max_sigma,
384 "num_sigma": chan_1_num_sigma,
385 "threshold": chan_1_threshold,
386 "overlap": chan_1_overlap,
387 "watershed_footprint_size": chan_1_watershed_footprint_size,
388 "watershed_min_distance": chan_1_watershed_min_distance,
389 "watershed_threshold_method": chan_1_watershed_threshold_method,
390 "enable_preprocessing": chan_1_enable_preprocessing,
391 "gaussian_sigma": chan_1_gaussian_sigma,
392 "median_disk_size": chan_1_median_disk_size,
393 "min_cell_area": chan_1_min_area,
394 "max_cell_area": chan_1_max_area,
395 "remove_border_cells": chan_1_remove_border_cells,
396 "return_segmentation_mask": False
397 }
399 # Channel 2 parameters (all explicit)
400 chan_2_params = {
401 "detection_method": chan_2_method,
402 "min_sigma": chan_2_min_sigma,
403 "max_sigma": chan_2_max_sigma,
404 "num_sigma": chan_2_num_sigma,
405 "threshold": chan_2_threshold,
406 "overlap": chan_2_overlap,
407 "watershed_footprint_size": chan_2_watershed_footprint_size,
408 "watershed_min_distance": chan_2_watershed_min_distance,
409 "watershed_threshold_method": chan_2_watershed_threshold_method,
410 "enable_preprocessing": chan_2_enable_preprocessing,
411 "gaussian_sigma": chan_2_gaussian_sigma,
412 "median_disk_size": chan_2_median_disk_size,
413 "min_cell_area": chan_2_min_area,
414 "max_cell_area": chan_2_max_area,
415 "remove_border_cells": chan_2_remove_border_cells,
416 "return_segmentation_mask": False
417 }
419 # Process each channel
420 _, chan_1_results = count_cells_single_channel(chan_1_img, **chan_1_params)
421 _, chan_2_results = count_cells_single_channel(chan_2_img, **chan_2_params)
423 # Perform colocalization analysis
424 multi_results = []
425 output_stack = image_stack.copy()
427 # Since we're processing single slices from each channel, we only have one result each
428 chan_1_result = chan_1_results[0]
429 chan_2_result = chan_2_results[0]
431 # Analyze colocalization
432 coloc_result = _analyze_colocalization(
433 chan_1_result, chan_2_result, colocalization_method.value,
434 max_distance, min_overlap_area, intensity_threshold
435 )
437 multi_results.append(coloc_result)
439 # Create colocalization visualization if requested
440 if return_colocalization_map:
441 coloc_map = _create_colocalization_map(
442 image_stack[chan_1], image_stack[chan_2], coloc_result
443 )
444 # Replace one of the channels with the colocalization map
445 output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map])
447 return output_stack, multi_results
450def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager) -> str:
451 """Materialize single-channel cell counting results."""
452 # Generate output file paths based on the input path
453 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
454 base_path = path.replace('.pkl', '')
455 json_path = f"{base_path}.json"
456 csv_path = f"{base_path}_details.csv"
458 # Ensure output directory exists for disk backend
459 from pathlib import Path
460 from openhcs.constants.constants import Backend
461 output_dir = Path(json_path).parent
462 filemanager.ensure_directory(str(output_dir), Backend.DISK.value)
464 summary = {
465 "analysis_type": "single_channel_cell_counting",
466 "total_slices": len(data),
467 "results_per_slice": []
468 }
469 rows = []
471 total_cells = 0
472 for result in data:
473 total_cells += result.cell_count
475 # Add to summary
476 summary["results_per_slice"].append({
477 "slice_index": result.slice_index,
478 "method": result.method,
479 "cell_count": result.cell_count,
480 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0,
481 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0,
482 "parameters": result.parameters_used
483 })
485 # Add individual cell data to CSV
486 for i, (pos, area, intensity, confidence) in enumerate(zip(
487 result.cell_positions, result.cell_areas,
488 result.cell_intensities, result.detection_confidence
489 )):
490 rows.append({
491 'slice_index': result.slice_index,
492 'cell_id': f"slice_{result.slice_index}_cell_{i}",
493 'x_position': pos[0],
494 'y_position': pos[1],
495 'cell_area': area,
496 'cell_intensity': intensity,
497 'detection_confidence': confidence,
498 'detection_method': result.method
499 })
501 summary["total_cells_all_slices"] = total_cells
502 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0
504 # Save JSON summary (overwrite if exists)
505 json_content = json.dumps(summary, indent=2, default=str)
506 # Remove existing file if it exists using filemanager
507 if filemanager.exists(json_path, Backend.DISK.value):
508 filemanager.delete(json_path, Backend.DISK.value)
509 filemanager.save(json_content, json_path, Backend.DISK.value)
511 # Save CSV details (overwrite if exists)
512 if rows:
513 df = pd.DataFrame(rows)
514 csv_content = df.to_csv(index=False)
515 # Remove existing file if it exists using filemanager
516 if filemanager.exists(csv_path, Backend.DISK.value):
517 filemanager.delete(csv_path, Backend.DISK.value)
518 filemanager.save(csv_content, csv_path, Backend.DISK.value)
520 return json_path
523def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager) -> str:
524 """Materialize multi-channel cell counting and colocalization results."""
525 # Generate output file paths based on the input path
526 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
527 base_path = path.replace('.pkl', '')
528 json_path = f"{base_path}.json"
529 csv_path = f"{base_path}_details.csv"
531 # Ensure output directory exists for disk backend
532 from pathlib import Path
533 output_dir = Path(json_path).parent
534 filemanager.ensure_directory(str(output_dir), Backend.DISK.value)
536 summary = {
537 "analysis_type": "multi_channel_cell_counting_colocalization",
538 "total_slices": len(data),
539 "colocalization_summary": {
540 "total_chan_1_cells": 0,
541 "total_chan_2_cells": 0,
542 "total_colocalized": 0,
543 "average_colocalization_percentage": 0
544 },
545 "results_per_slice": []
546 }
548 # CSV for detailed analysis (csv_path already defined above)
549 rows = []
551 total_coloc_pct = 0
552 for result in data:
553 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count
554 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count
555 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count
556 total_coloc_pct += result.colocalization_percentage
558 # Add to summary
559 summary["results_per_slice"].append({
560 "slice_index": result.slice_index,
561 "chan_1_count": result.chan_1_results.cell_count,
562 "chan_2_count": result.chan_2_results.cell_count,
563 "colocalized_count": result.colocalized_count,
564 "colocalization_percentage": result.colocalization_percentage,
565 "chan_1_only": result.chan_1_only_count,
566 "chan_2_only": result.chan_2_only_count,
567 "colocalization_method": result.colocalization_method,
568 "colocalization_metrics": result.colocalization_metrics
569 })
571 # Add colocalization details to CSV
572 for i, pos in enumerate(result.overlap_positions):
573 rows.append({
574 'slice_index': result.slice_index,
575 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}",
576 'x_position': pos[0],
577 'y_position': pos[1],
578 'colocalization_method': result.colocalization_method
579 })
581 summary["colocalization_summary"]["average_colocalization_percentage"] = (
582 total_coloc_pct / len(data) if data else 0
583 )
585 # Save JSON summary (overwrite if exists)
586 json_content = json.dumps(summary, indent=2, default=str)
587 # Remove existing file if it exists using filemanager
588 if filemanager.exists(json_path, Backend.DISK.value):
589 filemanager.delete(json_path, Backend.DISK.value)
590 filemanager.save(json_content, json_path, Backend.DISK.value)
592 # Save CSV details (overwrite if exists)
593 if rows:
594 df = pd.DataFrame(rows)
595 csv_content = df.to_csv(index=False)
596 # Remove existing file if it exists using filemanager
597 if filemanager.exists(csv_path, Backend.DISK.value):
598 filemanager.delete(csv_path, Backend.DISK.value)
599 filemanager.save(csv_content, csv_path, Backend.DISK.value)
601 return json_path
604def _preprocess_image(image: np.ndarray, gaussian_sigma: float, median_disk_size: int) -> np.ndarray:
605 """Apply preprocessing to enhance cell detection."""
606 # Gaussian blur to reduce noise
607 if gaussian_sigma > 0:
608 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True)
610 # Median filter to remove salt-and-pepper noise
611 if median_disk_size > 0:
612 image = median(image, disk(median_disk_size))
614 return image
617def _detect_cells_single_method(
618 image: np.ndarray,
619 slice_idx: int,
620 method: str,
621 params: Dict[str, Any]
622) -> CellCountResult:
623 """Detect cells using specified method."""
625 if method == DetectionMethod.BLOB_LOG.value:
626 return _detect_cells_blob_log(image, slice_idx, params)
627 elif method == DetectionMethod.BLOB_DOG.value:
628 return _detect_cells_blob_dog(image, slice_idx, params)
629 elif method == DetectionMethod.BLOB_DOH.value:
630 return _detect_cells_blob_doh(image, slice_idx, params)
631 elif method == DetectionMethod.WATERSHED.value:
632 return _detect_cells_watershed(image, slice_idx, params)
633 elif method == DetectionMethod.THRESHOLD.value:
634 return _detect_cells_threshold(image, slice_idx, params)
635 else:
636 raise ValueError(f"Unknown detection method: {method}")
639def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
640 """Detect cells using Laplacian of Gaussian blob detection."""
641 blobs = blob_log(
642 image,
643 min_sigma=params["min_sigma"],
644 max_sigma=params["max_sigma"],
645 num_sigma=params["num_sigma"],
646 threshold=params["threshold"],
647 overlap=params["overlap"]
648 )
650 # Extract positions, areas, and intensities
651 positions = []
652 areas = []
653 intensities = []
654 confidences = []
656 for blob in blobs:
657 y, x, sigma = blob
658 positions.append((float(x), float(y)))
660 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2))
661 radius = sigma * np.sqrt(2)
662 area = np.pi * radius**2
663 areas.append(float(area))
665 # Sample intensity at blob center
666 intensity = float(image[int(y), int(x)])
667 intensities.append(intensity)
669 # Use sigma as confidence measure (larger blobs = higher confidence)
670 confidence = float(sigma / params["max_sigma"])
671 confidences.append(confidence)
673 # Filter by area constraints
674 filtered_data = _filter_by_area(
675 positions, areas, intensities, confidences,
676 params["min_cell_area"], params["max_cell_area"]
677 )
679 return CellCountResult(
680 slice_index=slice_idx,
681 method="blob_log",
682 cell_count=len(filtered_data[0]),
683 cell_positions=filtered_data[0],
684 cell_areas=filtered_data[1],
685 cell_intensities=filtered_data[2],
686 detection_confidence=filtered_data[3],
687 parameters_used=params
688 )
691def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
692 """Detect cells using Difference of Gaussian blob detection."""
693 blobs = blob_dog(
694 image,
695 min_sigma=params["min_sigma"],
696 max_sigma=params["max_sigma"],
697 threshold=params["threshold"],
698 overlap=params["overlap"]
699 )
701 # Process similar to blob_log
702 positions = []
703 areas = []
704 intensities = []
705 confidences = []
707 for blob in blobs:
708 y, x, sigma = blob
709 positions.append((float(x), float(y)))
711 radius = sigma * np.sqrt(2)
712 area = np.pi * radius**2
713 areas.append(float(area))
715 intensity = float(image[int(y), int(x)])
716 intensities.append(intensity)
718 confidence = float(sigma / params["max_sigma"])
719 confidences.append(confidence)
721 filtered_data = _filter_by_area(
722 positions, areas, intensities, confidences,
723 params["min_cell_area"], params["max_cell_area"]
724 )
726 return CellCountResult(
727 slice_index=slice_idx,
728 method="blob_dog",
729 cell_count=len(filtered_data[0]),
730 cell_positions=filtered_data[0],
731 cell_areas=filtered_data[1],
732 cell_intensities=filtered_data[2],
733 detection_confidence=filtered_data[3],
734 parameters_used=params
735 )
738def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
739 """Detect cells using Determinant of Hessian blob detection."""
740 blobs = blob_doh(
741 image,
742 min_sigma=params["min_sigma"],
743 max_sigma=params["max_sigma"],
744 num_sigma=params["num_sigma"],
745 threshold=params["threshold"],
746 overlap=params["overlap"]
747 )
749 # Process similar to other blob methods
750 positions = []
751 areas = []
752 intensities = []
753 confidences = []
755 for blob in blobs:
756 y, x, sigma = blob
757 positions.append((float(x), float(y)))
759 radius = sigma * np.sqrt(2)
760 area = np.pi * radius**2
761 areas.append(float(area))
763 intensity = float(image[int(y), int(x)])
764 intensities.append(intensity)
766 confidence = float(sigma / params["max_sigma"])
767 confidences.append(confidence)
769 filtered_data = _filter_by_area(
770 positions, areas, intensities, confidences,
771 params["min_cell_area"], params["max_cell_area"]
772 )
774 return CellCountResult(
775 slice_index=slice_idx,
776 method="blob_doh",
777 cell_count=len(filtered_data[0]),
778 cell_positions=filtered_data[0],
779 cell_areas=filtered_data[1],
780 cell_intensities=filtered_data[2],
781 detection_confidence=filtered_data[3],
782 parameters_used=params
783 )
786def _filter_by_area(
787 positions: List[Tuple[float, float]],
788 areas: List[float],
789 intensities: List[float],
790 confidences: List[float],
791 min_area: float,
792 max_area: float
793) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]:
794 """Filter detected cells by area constraints."""
795 filtered_positions = []
796 filtered_areas = []
797 filtered_intensities = []
798 filtered_confidences = []
800 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences):
801 if min_area <= area <= max_area:
802 filtered_positions.append(pos)
803 filtered_areas.append(area)
804 filtered_intensities.append(intensity)
805 filtered_confidences.append(confidence)
807 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences
810def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
811 """Detect cells using watershed segmentation."""
812 # Determine threshold
813 if params["watershed_threshold_method"] == "otsu":
814 threshold_val = threshold_otsu(image)
815 elif params["watershed_threshold_method"] == "li":
816 threshold_val = threshold_li(image)
817 else:
818 threshold_val = float(params["watershed_threshold_method"])
820 # Create binary mask
821 binary = image > threshold_val
823 # Remove small objects and border objects
824 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
825 if params["remove_border_cells"]:
826 binary = clear_border(binary)
828 # Find local maxima as seeds
829 distance = ndimage.distance_transform_edt(binary)
830 local_maxima = peak_local_max(
831 distance,
832 min_distance=params["watershed_min_distance"],
833 footprint=np.ones((params["watershed_footprint_size"], params["watershed_footprint_size"]))
834 )
836 # Convert coordinates to binary mask
837 local_maxima_mask = np.zeros_like(distance, dtype=bool)
838 if len(local_maxima) > 0:
839 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True
841 # Create markers for watershed
842 # Convert boolean mask to integer labels for connected components
843 markers = label(local_maxima_mask.astype(np.uint8))
845 # Apply watershed
846 labels = watershed(-distance, markers, mask=binary)
848 # Extract region properties
849 regions = regionprops(labels, intensity_image=image)
851 positions = []
852 areas = []
853 intensities = []
854 confidences = []
855 valid_labels = [] # Track which labels pass the size filter
857 for region in regions:
858 # Filter by area
859 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
860 # Centroid (note: regionprops returns (row, col) = (y, x))
861 y, x = region.centroid
862 positions.append((float(x), float(y)))
864 areas.append(float(region.area))
865 intensities.append(float(region.mean_intensity))
867 # Use area as confidence measure (normalized)
868 confidence = min(1.0, region.area / params["max_cell_area"])
869 confidences.append(confidence)
871 # Track this label as valid
872 valid_labels.append(region.label)
874 # Create filtered binary mask with only cells that passed size filter
875 filtered_binary_mask = np.isin(labels, valid_labels)
877 return CellCountResult(
878 slice_index=slice_idx,
879 method="watershed",
880 cell_count=len(positions),
881 cell_positions=positions,
882 cell_areas=areas,
883 cell_intensities=intensities,
884 detection_confidence=confidences,
885 parameters_used=params,
886 binary_mask=filtered_binary_mask # Only cells that passed all filters
887 )
890def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
891 """Detect cells using simple thresholding and connected components."""
892 # Apply threshold
893 binary = image > params["threshold"] * image.max()
895 # Remove small objects and border objects
896 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
897 if params["remove_border_cells"]:
898 binary = clear_border(binary)
900 # Label connected components
901 labels = label(binary)
902 regions = regionprops(labels, intensity_image=image)
904 positions = []
905 areas = []
906 intensities = []
907 confidences = []
908 valid_labels = [] # Track which labels pass the size filter
910 for region in regions:
911 # Filter by area
912 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
913 y, x = region.centroid
914 positions.append((float(x), float(y)))
916 areas.append(float(region.area))
917 intensities.append(float(region.mean_intensity))
919 # Use intensity as confidence measure
920 confidence = float(region.mean_intensity / image.max())
921 confidences.append(confidence)
923 # Track this label as valid
924 valid_labels.append(region.label)
926 # Create filtered binary mask with only cells that passed size filter
927 filtered_binary_mask = np.isin(labels, valid_labels)
929 return CellCountResult(
930 slice_index=slice_idx,
931 method="threshold",
932 cell_count=len(positions),
933 cell_positions=positions,
934 cell_areas=areas,
935 cell_intensities=intensities,
936 detection_confidence=confidences,
937 parameters_used=params,
938 binary_mask=filtered_binary_mask # Only cells that passed all filters
939 )
942def _analyze_colocalization(
943 chan_1_result: CellCountResult,
944 chan_2_result: CellCountResult,
945 method: str,
946 max_distance: float,
947 min_overlap_area: float,
948 intensity_threshold: float
949) -> MultiChannelResult:
950 """Analyze colocalization between two channels."""
952 if method == ColocalizationMethod.DISTANCE_BASED.value:
953 return _colocalization_distance_based(
954 chan_1_result, chan_2_result, max_distance
955 )
956 elif method == ColocalizationMethod.OVERLAP_AREA.value:
957 return _colocalization_overlap_based(
958 chan_1_result, chan_2_result, min_overlap_area
959 )
960 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value:
961 return _colocalization_intensity_based(
962 chan_1_result, chan_2_result, intensity_threshold
963 )
964 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value:
965 return _colocalization_manders(
966 chan_1_result, chan_2_result, intensity_threshold
967 )
968 else:
969 raise ValueError(f"Unknown colocalization method: {method}")
972def _colocalization_distance_based(
973 chan_1_result: CellCountResult,
974 chan_2_result: CellCountResult,
975 max_distance: float
976) -> MultiChannelResult:
977 """Perform distance-based colocalization analysis."""
979 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
980 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based")
982 # Convert positions to arrays
983 pos_1 = np.array(chan_1_result.cell_positions)
984 pos_2 = np.array(chan_2_result.cell_positions)
986 # Calculate pairwise distances
987 distances = cdist(pos_1, pos_2)
989 # Find colocalized pairs
990 colocalized_pairs = []
991 used_chan_2 = set()
993 for i in range(len(pos_1)):
994 # Find closest cell in channel 2
995 min_dist_idx = np.argmin(distances[i])
996 min_dist = distances[i, min_dist_idx]
998 # Check if within distance threshold and not already used
999 if min_dist <= max_distance and min_dist_idx not in used_chan_2:
1000 colocalized_pairs.append((i, min_dist_idx))
1001 used_chan_2.add(min_dist_idx)
1003 # Calculate metrics
1004 colocalized_count = len(colocalized_pairs)
1005 total_cells = len(pos_1) + len(pos_2)
1006 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1008 chan_1_only = len(pos_1) - colocalized_count
1009 chan_2_only = len(pos_2) - colocalized_count
1011 # Extract colocalized positions (average of paired positions)
1012 overlap_positions = []
1013 for i, j in colocalized_pairs:
1014 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist()
1015 overlap_positions.append(tuple(avg_pos))
1017 # Calculate additional metrics
1018 if colocalized_pairs:
1019 avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs])
1020 max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs])
1021 else:
1022 avg_distance = 0
1023 max_distance_found = 0
1025 metrics = {
1026 "average_colocalization_distance": float(avg_distance),
1027 "max_colocalization_distance": float(max_distance_found),
1028 "distance_threshold_used": max_distance
1029 }
1031 return MultiChannelResult(
1032 slice_index=chan_1_result.slice_index,
1033 chan_1_results=chan_1_result,
1034 chan_2_results=chan_2_result,
1035 colocalization_method="distance_based",
1036 colocalized_count=colocalized_count,
1037 colocalization_percentage=colocalization_percentage,
1038 chan_1_only_count=chan_1_only,
1039 chan_2_only_count=chan_2_only,
1040 colocalization_metrics=metrics,
1041 overlap_positions=overlap_positions
1042 )
1045def _create_empty_coloc_result(
1046 chan_1_result: CellCountResult,
1047 chan_2_result: CellCountResult,
1048 method: str
1049) -> MultiChannelResult:
1050 """Create empty colocalization result when no cells found."""
1051 return MultiChannelResult(
1052 slice_index=chan_1_result.slice_index,
1053 chan_1_results=chan_1_result,
1054 chan_2_results=chan_2_result,
1055 colocalization_method=method,
1056 colocalized_count=0,
1057 colocalization_percentage=0.0,
1058 chan_1_only_count=chan_1_result.cell_count,
1059 chan_2_only_count=chan_2_result.cell_count,
1060 colocalization_metrics={},
1061 overlap_positions=[]
1062 )
1065def _colocalization_overlap_based(
1066 chan_1_result: CellCountResult,
1067 chan_2_result: CellCountResult,
1068 min_overlap_area: float
1069) -> MultiChannelResult:
1070 """Perform area overlap-based colocalization analysis."""
1071 # This is a simplified implementation - in practice, you'd need actual segmentation masks
1072 # For now, we'll use distance as a proxy for overlap
1074 # Use distance-based method with smaller threshold as approximation
1075 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly
1077 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold)
1078 result.colocalization_method = "overlap_area"
1079 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area
1080 result.colocalization_metrics["note"] = "Approximated using distance-based method"
1082 return result
1085def _colocalization_intensity_based(
1086 chan_1_result: CellCountResult,
1087 chan_2_result: CellCountResult,
1088 intensity_threshold: float
1089) -> MultiChannelResult:
1090 """Perform intensity correlation-based colocalization analysis."""
1092 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1093 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation")
1095 # Use distance-based pairing first
1096 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0)
1098 # Filter pairs based on intensity correlation
1099 colocalized_pairs = []
1100 overlap_positions = []
1102 pos_1 = np.array(chan_1_result.cell_positions)
1103 pos_2 = np.array(chan_2_result.cell_positions)
1105 for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
1106 for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
1107 # Calculate distance
1108 dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2)
1110 if dist <= 5.0: # Within reasonable distance
1111 # Check intensity correlation
1112 int_1 = chan_1_result.cell_intensities[i]
1113 int_2 = chan_2_result.cell_intensities[j]
1115 # Simple intensity correlation: both above threshold
1116 if int_1 >= intensity_threshold and int_2 >= intensity_threshold:
1117 colocalized_pairs.append((i, j))
1118 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2)
1119 overlap_positions.append(avg_pos)
1120 break # One-to-one mapping
1122 colocalized_count = len(colocalized_pairs)
1123 total_cells = len(pos_1) + len(pos_2)
1124 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1126 metrics = {
1127 "intensity_threshold_used": intensity_threshold,
1128 "correlation_method": "threshold_based"
1129 }
1131 return MultiChannelResult(
1132 slice_index=chan_1_result.slice_index,
1133 chan_1_results=chan_1_result,
1134 chan_2_results=chan_2_result,
1135 colocalization_method="intensity_correlation",
1136 colocalized_count=colocalized_count,
1137 colocalization_percentage=colocalization_percentage,
1138 chan_1_only_count=len(pos_1) - colocalized_count,
1139 chan_2_only_count=len(pos_2) - colocalized_count,
1140 colocalization_metrics=metrics,
1141 overlap_positions=overlap_positions
1142 )
1145def _colocalization_manders(
1146 chan_1_result: CellCountResult,
1147 chan_2_result: CellCountResult,
1148 intensity_threshold: float
1149) -> MultiChannelResult:
1150 """Calculate Manders colocalization coefficients."""
1152 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1153 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")
1155 # Simplified Manders calculation based on detected cells
1156 # In practice, this would use pixel-level intensity analysis
1158 # Use intensity-based method as foundation
1159 intensity_result = _colocalization_intensity_based(
1160 chan_1_result, chan_2_result, intensity_threshold
1161 )
1163 # Calculate Manders-like coefficients
1164 total_int_1 = sum(chan_1_result.cell_intensities)
1165 total_int_2 = sum(chan_2_result.cell_intensities)
1167 # Simplified: assume colocalized cells contribute their full intensity
1168 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in
1169 [(i, j) for i in range(len(chan_1_result.cell_positions))
1170 for j in range(len(chan_2_result.cell_positions))
1171 if (i, j) in [(0, 0)]]) # Simplified placeholder
1173 # Manders coefficients (M1 and M2)
1174 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
1175 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified
1177 intensity_result.colocalization_method = "manders_coefficients"
1178 intensity_result.colocalization_metrics.update({
1179 "manders_m1": m1,
1180 "manders_m2": m2,
1181 "note": "Simplified cell-based Manders calculation"
1182 })
1184 return intensity_result
1187def _create_segmentation_visualization(
1188 image: np.ndarray,
1189 positions: List[Tuple[float, float]],
1190 max_sigma: float,
1191 cell_areas: List[float] = None,
1192 binary_mask: np.ndarray = None
1193) -> np.ndarray:
1194 """Create segmentation visualization using actual binary mask if available."""
1196 # If we have the actual binary mask from detection, use it directly
1197 if binary_mask is not None:
1198 # Convert boolean mask to uint16 to match input image dtype
1199 # Use max intensity for detected cells, 0 for background
1200 max_intensity = image.max() if image.max() > 0 else 65535
1201 return (binary_mask * max_intensity).astype(image.dtype)
1203 # Fallback to original circular marker approach for blob methods
1204 visualization = image.copy()
1206 # Mark detected cells with their actual sizes
1207 for i, (x, y) in enumerate(positions):
1208 # Use actual cell area if available, otherwise fall back to max_sigma
1209 if cell_areas and i < len(cell_areas):
1210 # Convert area to radius (assuming circular cells)
1211 radius = np.sqrt(cell_areas[i] / np.pi)
1212 else:
1213 # Fallback to max_sigma for backward compatibility
1214 radius = max_sigma * 2
1216 # Create circular markers with actual cell size
1217 rr, cc = np.ogrid[:image.shape[0], :image.shape[1]]
1218 mask = (rr - y)**2 + (cc - x)**2 <= radius**2
1220 # Ensure indices are within bounds
1221 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1])
1222 mask = mask & valid_mask
1224 visualization[mask] = visualization.max() # Bright markers
1226 return visualization
1229def _create_colocalization_map(
1230 chan_1_img: np.ndarray,
1231 chan_2_img: np.ndarray,
1232 coloc_result: MultiChannelResult
1233) -> np.ndarray:
1234 """Create colocalization visualization map."""
1235 # Create RGB-like visualization
1236 coloc_map = np.zeros_like(chan_1_img)
1238 # Mark colocalized positions
1239 for x, y in coloc_result.overlap_positions:
1240 # Create markers for colocalized cells
1241 rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]
1242 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius
1244 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1])
1245 mask = mask & valid_mask
1247 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers
1249 return coloc_map