Coverage for openhcs/processing/backends/analysis/cell_counting_cupy.py: 14.4%

461 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2CuPy-based cell counting and multi-channel colocalization analysis for OpenHCS. 

3 

4This module provides comprehensive cell counting capabilities using CuPy and CuCIM, 

5supporting both single-channel and multi-channel analysis with various detection 

6methods and colocalization metrics. 

7""" 

8 

9import numpy as np # Keep for CPU fallbacks and data conversion 

10import logging 

11from typing import Dict, List, Tuple, Any, Optional, Union, TYPE_CHECKING 

12from dataclasses import dataclass 

13from enum import Enum 

14 

15# Import CuPy using the established optional import pattern 

16from openhcs.core.utils import optional_import 

17 

18cp = optional_import("cupy") 

19 

20# Type checking imports 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 import cupy as cp_typing 

23 

24logger = logging.getLogger(__name__) 

25 

26# Core scientific computing imports 

27import pandas as pd 

28import json 

29 

30# Optional imports using established pattern 

31cupyx_scipy = optional_import("cupyx.scipy") 

32cupyx_spatial_distance = optional_import("cupyx.scipy.spatial.distance") 

33cucim_feature = optional_import("cucim.skimage.feature") 

34cucim_filters = optional_import("cucim.skimage.filters") 

35cucim_segmentation = optional_import("cucim.skimage.segmentation") 

36cucim_morphology = optional_import("cucim.skimage.morphology") 

37cucim_measure = optional_import("cucim.skimage.measure") 

38cucim_exposure = optional_import("cucim.skimage.exposure") 

39 

40 

41 

42# OpenHCS imports 

43from openhcs.core.memory.decorators import cupy as cupy_func 

44from openhcs.core.pipeline.function_contracts import special_outputs 

45from openhcs.constants.constants import Backend 

46 

47 

48class DetectionMethod(Enum): 

49 """Cell detection methods available.""" 

50 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells) 

51 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose) 

52 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells) 

53 WATERSHED = "watershed" # Watershed segmentation (best for touching cells) 

54 THRESHOLD = "threshold" # Simple thresholding (fastest, basic) 

55 

56 

57class ColocalizationMethod(Enum): 

58 """Methods for multi-channel colocalization analysis.""" 

59 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap 

60 DISTANCE_BASED = "distance_based" # Based on centroid distances 

61 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation 

62 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients 

63 

64 

65class ThresholdMethod(Enum): 

66 """Automatic thresholding methods for watershed segmentation.""" 

67 OTSU = "otsu" # Otsu's method (good for bimodal histograms) 

68 LI = "li" # Li's method (good for low contrast images) 

69 MANUAL = "manual" # Manual threshold value (0.0-1.0) 

70 

71 

72 

73 

74 

75@dataclass 

76class CellCountResult: 

77 """Results for single-channel cell counting.""" 

78 slice_index: int 

79 method: str 

80 cell_count: int 

81 cell_positions: List[Tuple[float, float]] # (x, y) centroids 

82 cell_areas: List[float] 

83 cell_intensities: List[float] 

84 detection_confidence: List[float] 

85 parameters_used: Dict[str, Any] 

86 binary_mask: Optional[cp.ndarray] = None # Actual binary mask for segmentation methods 

87 

88 

89@dataclass 

90class MultiChannelResult: 

91 """Results for multi-channel cell counting and colocalization.""" 

92 slice_index: int 

93 chan_1_results: CellCountResult 

94 chan_2_results: CellCountResult 

95 colocalization_method: str 

96 colocalized_count: int 

97 colocalization_percentage: float 

98 chan_1_only_count: int 

99 chan_2_only_count: int 

100 colocalization_metrics: Dict[str, float] 

101 overlap_positions: List[Tuple[float, float]] 

102 

103 

104def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager) -> str: 

105 """Materialize cell counting results as analysis-ready CSV and JSON formats.""" 

106 

107 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}") 

108 

109 # Convert CuPy binary_mask to NumPy if present 

110 if data: 

111 for result in data: 

112 if result.binary_mask is not None: 

113 result.binary_mask = cp.asnumpy(result.binary_mask) 

114 if isinstance(result, MultiChannelResult): 

115 if result.chan_1_results.binary_mask is not None: 

116 result.chan_1_results.binary_mask = cp.asnumpy(result.chan_1_results.binary_mask) 

117 if result.chan_2_results.binary_mask is not None: 

118 result.chan_2_results.binary_mask = cp.asnumpy(result.chan_2_results.binary_mask) 

119 

120 # Determine if this is single-channel or multi-channel data 

121 if not data: 

122 logger.warning(f"🔬 CELL_COUNT_MATERIALIZE: No data to materialize") 

123 return path 

124 

125 is_multi_channel = isinstance(data[0], MultiChannelResult) 

126 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}") 

127 

128 if is_multi_channel: 

129 return _materialize_multi_channel_results(data, path, filemanager) 

130 else: 

131 return _materialize_single_channel_results(data, path, filemanager) 

132 

133 

134def materialize_segmentation_masks(data: List[cp.ndarray], path: str, filemanager) -> str: 

135 """Materialize segmentation masks as individual TIFF files.""" 

136 

137 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}") 

138 

139 # Convert CuPy arrays to NumPy 

140 if data: 

141 data = [cp.asnumpy(mask) for mask in data] 

142 

143 if not data: 

144 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)") 

145 # Create empty summary file to indicate no masks were generated 

146 summary_path = path.replace('.pkl', '_segmentation_summary.txt') 

147 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n" 

148 filemanager.save(summary_content, summary_path, Backend.DISK.value) 

149 return summary_path 

150 

151 # Generate output file paths based on the input path 

152 base_path = path.replace('.pkl', '') 

153 

154 # Save each mask as a separate TIFF file 

155 for i, mask in enumerate(data): 

156 mask_filename = f"{base_path}_slice_{i:03d}.tif" 

157 

158 # Convert mask to appropriate dtype for saving (uint16 to match input images) 

159 if mask.dtype != np.uint16: 

160 # Normalize to uint16 range if needed 

161 if mask.max() <= 1.0: 

162 mask_uint16 = (mask * 65535).astype(np.uint16) 

163 else: 

164 mask_uint16 = mask.astype(np.uint16) 

165 else: 

166 mask_uint16 = mask 

167 

168 # Save using filemanager 

169 filemanager.save(mask_uint16, mask_filename, Backend.DISK.value) 

170 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename}") 

171 

172 # Return summary path 

173 summary_path = f"{base_path}_segmentation_summary.txt" 

174 summary_content = f"Segmentation masks saved: {len(data)} files\n" 

175 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n" 

176 summary_content += f"Mask dtype: {data[0].dtype}\n" 

177 summary_content += f"Mask shape: {data[0].shape}\n" 

178 

179 filemanager.save(summary_content, summary_path, Backend.DISK.value) 

180 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks") 

181 

182 return summary_path 

183 

184 

185@cupy_func 

186@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks)) 

187def count_cells_single_channel( 

188 image_stack: cp.ndarray, 

189 # Detection method and parameters 

190 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

191 # Blob detection parameters 

192 min_sigma: float = 1.0, # Minimum blob size (pixels) 

193 max_sigma: float = 10.0, # Maximum blob size (pixels) 

194 num_sigma: int = 10, # Number of sigma values to test 

195 threshold: float = 0.1, # Detection threshold (0.0-1.0) 

196 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

197 # Watershed parameters 

198 watershed_footprint_size: int = 3, # Local maxima footprint size 

199 watershed_min_distance: int = 5, # Minimum distance between peaks 

200 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods 

201 # Preprocessing parameters 

202 enable_preprocessing: bool = True, 

203 gaussian_sigma: float = 1.0, # Gaussian blur sigma 

204 median_disk_size: int = 1, # Median filter disk size 

205 # Filtering parameters 

206 min_cell_area: int = 10, # Minimum cell area (pixels) 

207 max_cell_area: int = 1000, # Maximum cell area (pixels) 

208 remove_border_cells: bool = True, # Remove cells touching image border 

209 # Output parameters 

210 return_segmentation_mask: bool = False 

211) -> Tuple[cp.ndarray, List[CellCountResult]]: 

212 """ 

213 Count cells in single-channel image stack using various detection methods. 

214 

215 Args: 

216 image_stack: 3D CuPy array (Z, Y, X) where each Z slice is processed independently 

217 detection_method: Method for cell detection (see DetectionMethod enum) 

218 min_sigma: Minimum blob size for blob detection methods 

219 max_sigma: Maximum blob size for blob detection methods 

220 num_sigma: Number of sigma values to test for blob detection 

221 threshold: Detection threshold (method-dependent) 

222 overlap: Maximum overlap between detected blobs 

223 watershed_footprint_size: Footprint size for local maxima detection 

224 watershed_min_distance: Minimum distance between watershed peaks 

225 watershed_threshold_method: Thresholding method for watershed 

226 enable_preprocessing: Apply Gaussian and median filtering 

227 gaussian_sigma: Standard deviation for Gaussian blur 

228 median_disk_size: Disk size for median filtering 

229 min_cell_area: Minimum area for valid cells 

230 max_cell_area: Maximum area for valid cells 

231 remove_border_cells: Remove cells touching image borders 

232 return_segmentation_mask: Return segmentation masks in output 

233  

234 Returns: 

235 output_stack: Original image stack unchanged (Z, Y, X) 

236 cell_count_results: List of CellCountResult objects for each slice 

237 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True 

238 """ 

239 if image_stack.ndim != 3: 

240 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

241 

242 results = [] 

243 segmentation_masks = [] 

244 

245 # Store parameters for reproducibility (convert enums to values) 

246 parameters = { 

247 "detection_method": detection_method.value, 

248 "min_sigma": min_sigma, 

249 "max_sigma": max_sigma, 

250 "num_sigma": num_sigma, 

251 "threshold": threshold, 

252 "overlap": overlap, 

253 "watershed_footprint_size": watershed_footprint_size, 

254 "watershed_min_distance": watershed_min_distance, 

255 "watershed_threshold_method": watershed_threshold_method.value, 

256 "gaussian_sigma": gaussian_sigma, 

257 "median_disk_size": median_disk_size, 

258 "min_cell_area": min_cell_area, 

259 "max_cell_area": max_cell_area, 

260 "remove_border_cells": remove_border_cells 

261 } 

262 

263 logging.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method") 

264 

265 for z_idx in range(image_stack.shape[0]): 

266 slice_img = image_stack[z_idx].astype(cp.float64) 

267 

268 # Apply preprocessing if enabled 

269 if enable_preprocessing: 

270 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size) 

271 

272 # Detect cells using specified method 

273 result = _detect_cells_single_method( 

274 slice_img, z_idx, detection_method.value, parameters 

275 ) 

276 

277 results.append(result) 

278 

279 # Create segmentation mask if requested 

280 if return_segmentation_mask: 

281 segmentation_mask = _create_segmentation_visualization( 

282 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask 

283 ) 

284 segmentation_masks.append(segmentation_mask) 

285 

286 # Always return segmentation masks (empty list if not requested) 

287 # This ensures consistent return signature for special outputs system 

288 return image_stack, results, segmentation_masks 

289 

290 

291@cupy_func 

292@special_outputs(("multi_channel_counts", materialize_cell_counts)) 

293def count_cells_multi_channel( 

294 image_stack: cp.ndarray, 

295 chan_1: int, # Index of first channel (positional arg) 

296 chan_2: int, # Index of second channel (positional arg) 

297 # Detection parameters for channel 1 (all single-channel params available) 

298 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

299 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels) 

300 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels) 

301 chan_1_num_sigma: int = 10, # Number of sigma values to test 

302 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

303 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

304 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size 

305 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks 

306 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

307 chan_1_enable_preprocessing: bool = True, # Apply preprocessing 

308 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

309 chan_1_median_disk_size: int = 1, # Median filter disk size 

310 chan_1_min_area: int = 10, # Minimum cell area (pixels) 

311 chan_1_max_area: int = 1000, # Maximum cell area (pixels) 

312 chan_1_remove_border_cells: bool = True, # Remove cells touching border 

313 # Detection parameters for channel 2 (all single-channel params available) 

314 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

315 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels) 

316 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels) 

317 chan_2_num_sigma: int = 10, # Number of sigma values to test 

318 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

319 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

320 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size 

321 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks 

322 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

323 chan_2_enable_preprocessing: bool = True, # Apply preprocessing 

324 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

325 chan_2_median_disk_size: int = 1, # Median filter disk size 

326 chan_2_min_area: int = 10, # Minimum cell area (pixels) 

327 chan_2_max_area: int = 1000, # Maximum cell area (pixels) 

328 chan_2_remove_border_cells: bool = True, # Remove cells touching border 

329 # Colocalization parameters 

330 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods 

331 max_distance: float = 5.0, # Maximum distance for colocalization (pixels) 

332 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method 

333 intensity_threshold: float = 0.5, # Threshold for intensity-based methods 

334 # Output parameters 

335 return_colocalization_map: bool = False 

336) -> Tuple[cp.ndarray, List[MultiChannelResult]]: 

337 """ 

338 Count cells in multi-channel image stack with colocalization analysis. 

339 

340 Each channel can be processed with independent parameters, providing the same 

341 flexibility as the single-channel function for each channel separately. 

342 

343 Args: 

344 image_stack: 3D CuPy array (Z, Y, X) where Z represents different channels 

345 chan_1: Index of first channel in the stack (positional) 

346 chan_2: Index of second channel in the stack (positional) 

347 

348 # Channel 1 detection parameters (same as single-channel function) 

349 chan_1_method: Detection method for channel 1 (DetectionMethod enum) 

350 chan_1_min_sigma: Minimum blob size for channel 1 

351 chan_1_max_sigma: Maximum blob size for channel 1 

352 chan_1_num_sigma: Number of sigma values to test for channel 1 

353 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0) 

354 chan_1_overlap: Maximum overlap between blobs for channel 1 

355 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1 

356 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1 

357 chan_1_watershed_threshold_method: Thresholding method for channel 1 

358 chan_1_enable_preprocessing: Apply preprocessing to channel 1 

359 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1 

360 chan_1_median_disk_size: Median filter size for channel 1 

361 chan_1_min_area: Minimum cell area for channel 1 

362 chan_1_max_area: Maximum cell area for channel 1 

363 chan_1_remove_border_cells: Remove border cells for channel 1 

364 

365 # Channel 2 detection parameters (same as single-channel function) 

366 chan_2_method: Detection method for channel 2 (DetectionMethod enum) 

367 chan_2_min_sigma: Minimum blob size for channel 2 

368 chan_2_max_sigma: Maximum blob size for channel 2 

369 chan_2_num_sigma: Number of sigma values to test for channel 2 

370 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0) 

371 chan_2_overlap: Maximum overlap between blobs for channel 2 

372 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2 

373 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2 

374 chan_2_watershed_threshold_method: Thresholding method for channel 2 

375 chan_2_enable_preprocessing: Apply preprocessing to channel 2 

376 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2 

377 chan_2_median_disk_size: Median filter size for channel 2 

378 chan_2_min_area: Minimum cell area for channel 2 

379 chan_2_max_area: Maximum cell area for channel 2 

380 chan_2_remove_border_cells: Remove border cells for channel 2 

381 

382 # Colocalization parameters 

383 colocalization_method: Method for determining colocalization (ColocalizationMethod enum) 

384 max_distance: Maximum distance for distance-based colocalization (pixels) 

385 min_overlap_area: Minimum overlap fraction for area-based colocalization 

386 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0) 

387 return_colocalization_map: Return colocalization visualization 

388 

389 Returns: 

390 output_stack: Original images or colocalization maps 

391 multi_channel_results: List of MultiChannelResult objects 

392 """ 

393 if image_stack.ndim != 3: 

394 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

395 

396 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]: 

397 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}") 

398 

399 if chan_1 == chan_2: 

400 raise ValueError("Channel 1 and Channel 2 must be different") 

401 

402 # Extract channel images 

403 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency 

404 chan_2_img = image_stack[chan_2:chan_2+1] 

405 

406 # Count cells in each channel separately using the single-channel function 

407 # Channel 1 parameters (all explicit) 

408 chan_1_params = { 

409 "detection_method": chan_1_method, 

410 "min_sigma": chan_1_min_sigma, 

411 "max_sigma": chan_1_max_sigma, 

412 "num_sigma": chan_1_num_sigma, 

413 "threshold": chan_1_threshold, 

414 "overlap": chan_1_overlap, 

415 "watershed_footprint_size": chan_1_watershed_footprint_size, 

416 "watershed_min_distance": chan_1_watershed_min_distance, 

417 "watershed_threshold_method": chan_1_watershed_threshold_method, 

418 "enable_preprocessing": chan_1_enable_preprocessing, 

419 "gaussian_sigma": chan_1_gaussian_sigma, 

420 "median_disk_size": chan_1_median_disk_size, 

421 "min_cell_area": chan_1_min_area, 

422 "max_cell_area": chan_1_max_area, 

423 "remove_border_cells": chan_1_remove_border_cells, 

424 "return_segmentation_mask": False 

425 } 

426 

427 # Channel 2 parameters (all explicit) 

428 chan_2_params = { 

429 "detection_method": chan_2_method, 

430 "min_sigma": chan_2_min_sigma, 

431 "max_sigma": chan_2_max_sigma, 

432 "num_sigma": chan_2_num_sigma, 

433 "threshold": chan_2_threshold, 

434 "overlap": chan_2_overlap, 

435 "watershed_footprint_size": chan_2_watershed_footprint_size, 

436 "watershed_min_distance": chan_2_watershed_min_distance, 

437 "watershed_threshold_method": chan_2_watershed_threshold_method, 

438 "enable_preprocessing": chan_2_enable_preprocessing, 

439 "gaussian_sigma": chan_2_gaussian_sigma, 

440 "median_disk_size": chan_2_median_disk_size, 

441 "min_cell_area": chan_2_min_area, 

442 "max_cell_area": chan_2_max_area, 

443 "remove_border_cells": chan_2_remove_border_cells, 

444 "return_segmentation_mask": False 

445 } 

446 

447 # Process each channel 

448 _, chan_1_results = count_cells_single_channel(chan_1_img, **chan_1_params) 

449 _, chan_2_results = count_cells_single_channel(chan_2_img, **chan_2_params) 

450 

451 # Perform colocalization analysis 

452 multi_results = [] 

453 output_stack = image_stack.copy() 

454 

455 # Since we're processing single slices from each channel, we only have one result each 

456 chan_1_result = chan_1_results[0] 

457 chan_2_result = chan_2_results[0] 

458 

459 # Analyze colocalization 

460 coloc_result = _analyze_colocalization( 

461 chan_1_result, chan_2_result, colocalization_method.value, 

462 max_distance, min_overlap_area, intensity_threshold 

463 ) 

464 

465 multi_results.append(coloc_result) 

466 

467 # Create colocalization visualization if requested 

468 if return_colocalization_map: 

469 coloc_map = _create_colocalization_map( 

470 image_stack[chan_1], image_stack[chan_2], coloc_result 

471 ) 

472 # Replace one of the channels with the colocalization map 

473 output_stack = cp.stack([image_stack[chan_1], image_stack[chan_2], coloc_map]) 

474 

475 return output_stack, multi_results 

476 

477 

478def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager) -> str: 

479 """Materialize single-channel cell counting results.""" 

480 # Generate output file paths based on the input path 

481 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

482 base_path = path.replace('.pkl', '') 

483 json_path = f"{base_path}.json" 

484 csv_path = f"{base_path}_details.csv" 

485 

486 # Ensure output directory exists for disk backend 

487 from pathlib import Path 

488 from openhcs.constants.constants import Backend 

489 output_dir = Path(json_path).parent 

490 filemanager.ensure_directory(str(output_dir), Backend.DISK.value) 

491 

492 summary = { 

493 "analysis_type": "single_channel_cell_counting", 

494 "total_slices": len(data), 

495 "results_per_slice": [] 

496 } 

497 rows = [] 

498 

499 total_cells = 0 

500 for result in data: 

501 total_cells += result.cell_count 

502 

503 # Add to summary 

504 summary["results_per_slice"].append({ 

505 "slice_index": result.slice_index, 

506 "method": result.method, 

507 "cell_count": result.cell_count, 

508 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0, 

509 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0, 

510 "parameters": result.parameters_used 

511 }) 

512 

513 # Add individual cell data to CSV 

514 for i, (pos, area, intensity, confidence) in enumerate(zip( 

515 result.cell_positions, result.cell_areas, 

516 result.cell_intensities, result.detection_confidence 

517 )): 

518 rows.append({ 

519 'slice_index': result.slice_index, 

520 'cell_id': f"slice_{result.slice_index}_cell_{i}", 

521 'x_position': pos[0], 

522 'y_position': pos[1], 

523 'cell_area': area, 

524 'cell_intensity': intensity, 

525 'detection_confidence': confidence, 

526 'detection_method': result.method 

527 }) 

528 

529 summary["total_cells_all_slices"] = total_cells 

530 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0 

531 

532 # Save JSON summary (overwrite if exists) 

533 json_content = json.dumps(summary, indent=2, default=str) 

534 # Remove existing file if it exists using filemanager 

535 if filemanager.exists(json_path, Backend.DISK.value): 

536 filemanager.delete(json_path, Backend.DISK.value) 

537 filemanager.save(json_content, json_path, Backend.DISK.value) 

538 

539 # Save CSV details (overwrite if exists) 

540 if rows: 

541 df = pd.DataFrame(rows) 

542 csv_content = df.to_csv(index=False) 

543 # Remove existing file if it exists using filemanager 

544 if filemanager.exists(csv_path, Backend.DISK.value): 

545 filemanager.delete(csv_path, Backend.DISK.value) 

546 filemanager.save(csv_content, csv_path, Backend.DISK.value) 

547 

548 return json_path 

549 

550 

551def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager) -> str: 

552 """Materialize multi-channel cell counting and colocalization results.""" 

553 # Generate output file paths based on the input path 

554 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

555 base_path = path.replace('.pkl', '') 

556 json_path = f"{base_path}.json" 

557 csv_path = f"{base_path}_details.csv" 

558 

559 # Ensure output directory exists for disk backend 

560 from pathlib import Path 

561 output_dir = Path(json_path).parent 

562 filemanager.ensure_directory(str(output_dir), Backend.DISK.value) 

563 

564 summary = { 

565 "analysis_type": "multi_channel_cell_counting_colocalization", 

566 "total_slices": len(data), 

567 "colocalization_summary": { 

568 "total_chan_1_cells": 0, 

569 "total_chan_2_cells": 0, 

570 "total_colocalized": 0, 

571 "average_colocalization_percentage": 0 

572 }, 

573 "results_per_slice": [] 

574 } 

575 

576 # CSV for detailed analysis (csv_path already defined above) 

577 rows = [] 

578 

579 total_coloc_pct = 0 

580 for result in data: 

581 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count 

582 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count 

583 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count 

584 total_coloc_pct += result.colocalization_percentage 

585 

586 # Add to summary 

587 summary["results_per_slice"].append({ 

588 "slice_index": result.slice_index, 

589 "chan_1_count": result.chan_1_results.cell_count, 

590 "chan_2_count": result.chan_2_results.cell_count, 

591 "colocalized_count": result.colocalized_count, 

592 "colocalization_percentage": result.colocalization_percentage, 

593 "chan_1_only": result.chan_1_only_count, 

594 "chan_2_only": result.chan_2_only_count, 

595 "colocalization_method": result.colocalization_method, 

596 "colocalization_metrics": result.colocalization_metrics 

597 }) 

598 

599 # Add colocalization details to CSV 

600 for i, pos in enumerate(result.overlap_positions): 

601 rows.append({ 

602 'slice_index': result.slice_index, 

603 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}", 

604 'x_position': pos[0], 

605 'y_position': pos[1], 

606 'colocalization_method': result.colocalization_method 

607 }) 

608 

609 summary["colocalization_summary"]["average_colocalization_percentage"] = ( 

610 total_coloc_pct / len(data) if data else 0 

611 ) 

612 

613 # Save JSON summary (overwrite if exists) 

614 json_content = json.dumps(summary, indent=2, default=str) 

615 # Remove existing file if it exists using filemanager 

616 if filemanager.exists(json_path, Backend.DISK.value): 

617 filemanager.delete(json_path, Backend.DISK.value) 

618 filemanager.save(json_content, json_path, Backend.DISK.value) 

619 

620 # Save CSV details (overwrite if exists) 

621 if rows: 

622 df = pd.DataFrame(rows) 

623 csv_content = df.to_csv(index=False) 

624 # Remove existing file if it exists using filemanager 

625 if filemanager.exists(csv_path, Backend.DISK.value): 

626 filemanager.delete(csv_path, Backend.DISK.value) 

627 filemanager.save(csv_content, csv_path, Backend.DISK.value) 

628 

629 return json_path 

630 

631 

632def _preprocess_image(image: cp.ndarray, gaussian_sigma: float, median_disk_size: int) -> cp.ndarray: 

633 """Apply preprocessing to enhance cell detection.""" 

634 # Gaussian blur to reduce noise 

635 if gaussian_sigma > 0: 

636 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True) 

637 

638 # Median filter to remove salt-and-pepper noise 

639 if median_disk_size > 0: 

640 image = median(image, disk(median_disk_size)) 

641 

642 return image 

643 

644 

645def _detect_cells_single_method( 

646 image: cp.ndarray, 

647 slice_idx: int, 

648 method: str, 

649 params: Dict[str, Any] 

650) -> CellCountResult: 

651 """Detect cells using specified method.""" 

652 

653 if method == DetectionMethod.BLOB_LOG.value: 

654 return _detect_cells_blob_log(image, slice_idx, params) 

655 elif method == DetectionMethod.BLOB_DOG.value: 

656 return _detect_cells_blob_dog(image, slice_idx, params) 

657 elif method == DetectionMethod.BLOB_DOH.value: 

658 return _detect_cells_blob_doh(image, slice_idx, params) 

659 elif method == DetectionMethod.WATERSHED.value: 

660 return _detect_cells_watershed(image, slice_idx, params) 

661 elif method == DetectionMethod.THRESHOLD.value: 

662 return _detect_cells_threshold(image, slice_idx, params) 

663 else: 

664 raise ValueError(f"Unknown detection method: {method}") 

665 

666 

667def _detect_cells_blob_log(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

668 """Detect cells using Laplacian of Gaussian blob detection.""" 

669 blobs = blob_log( 

670 image, 

671 min_sigma=params["min_sigma"], 

672 max_sigma=params["max_sigma"], 

673 num_sigma=params["num_sigma"], 

674 threshold=params["threshold"], 

675 overlap=params["overlap"] 

676 ) 

677 

678 # Extract positions, areas, and intensities 

679 positions = [] 

680 areas = [] 

681 intensities = [] 

682 confidences = [] 

683 

684 for blob in blobs: 

685 y, x, sigma = blob 

686 positions.append((float(x), float(y))) 

687 

688 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2)) 

689 radius = sigma * cp.sqrt(2) 

690 area = cp.pi * radius**2 

691 areas.append(float(area)) 

692 

693 # Sample intensity at blob center 

694 intensity = float(image[int(y), int(x)]) 

695 intensities.append(intensity) 

696 

697 # Use sigma as confidence measure (larger blobs = higher confidence) 

698 confidence = float(sigma / params["max_sigma"]) 

699 confidences.append(confidence) 

700 

701 # Filter by area constraints 

702 filtered_data = _filter_by_area( 

703 positions, areas, intensities, confidences, 

704 params["min_cell_area"], params["max_cell_area"] 

705 ) 

706 

707 return CellCountResult( 

708 slice_index=slice_idx, 

709 method="blob_log", 

710 cell_count=len(filtered_data[0]), 

711 cell_positions=filtered_data[0], 

712 cell_areas=filtered_data[1], 

713 cell_intensities=filtered_data[2], 

714 detection_confidence=filtered_data[3], 

715 parameters_used=params 

716 ) 

717 

718 

719def _detect_cells_blob_dog(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

720 """Detect cells using Difference of Gaussian blob detection.""" 

721 blobs = blob_dog( 

722 image, 

723 min_sigma=params["min_sigma"], 

724 max_sigma=params["max_sigma"], 

725 threshold=params["threshold"], 

726 overlap=params["overlap"] 

727 ) 

728 

729 # Process similar to blob_log 

730 positions = [] 

731 areas = [] 

732 intensities = [] 

733 confidences = [] 

734 

735 for blob in blobs: 

736 y, x, sigma = blob 

737 positions.append((float(x), float(y))) 

738 

739 radius = sigma * cp.sqrt(2) 

740 area = cp.pi * radius**2 

741 areas.append(float(area)) 

742 

743 intensity = float(image[int(y), int(x)]) 

744 intensities.append(intensity) 

745 

746 confidence = float(sigma / params["max_sigma"]) 

747 confidences.append(confidence) 

748 

749 filtered_data = _filter_by_area( 

750 positions, areas, intensities, confidences, 

751 params["min_cell_area"], params["max_cell_area"] 

752 ) 

753 

754 return CellCountResult( 

755 slice_index=slice_idx, 

756 method="blob_dog", 

757 cell_count=len(filtered_data[0]), 

758 cell_positions=filtered_data[0], 

759 cell_areas=filtered_data[1], 

760 cell_intensities=filtered_data[2], 

761 detection_confidence=filtered_data[3], 

762 parameters_used=params 

763 ) 

764 

765 

766def _detect_cells_blob_doh(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

767 """Detect cells using Determinant of Hessian blob detection.""" 

768 blobs = blob_doh( 

769 image, 

770 min_sigma=params["min_sigma"], 

771 max_sigma=params["max_sigma"], 

772 num_sigma=params["num_sigma"], 

773 threshold=params["threshold"], 

774 overlap=params["overlap"] 

775 ) 

776 

777 # Process similar to other blob methods 

778 positions = [] 

779 areas = [] 

780 intensities = [] 

781 confidences = [] 

782 

783 for blob in blobs: 

784 y, x, sigma = blob 

785 positions.append((float(x), float(y))) 

786 

787 radius = sigma * cp.sqrt(2) 

788 area = cp.pi * radius**2 

789 areas.append(float(area)) 

790 

791 intensity = float(image[int(y), int(x)]) 

792 intensities.append(intensity) 

793 

794 confidence = float(sigma / params["max_sigma"]) 

795 confidences.append(confidence) 

796 

797 filtered_data = _filter_by_area( 

798 positions, areas, intensities, confidences, 

799 params["min_cell_area"], params["max_cell_area"] 

800 ) 

801 

802 return CellCountResult( 

803 slice_index=slice_idx, 

804 method="blob_doh", 

805 cell_count=len(filtered_data[0]), 

806 cell_positions=filtered_data[0], 

807 cell_areas=filtered_data[1], 

808 cell_intensities=filtered_data[2], 

809 detection_confidence=filtered_data[3], 

810 parameters_used=params 

811 ) 

812 

813 

814def _filter_by_area( 

815 positions: List[Tuple[float, float]], 

816 areas: List[float], 

817 intensities: List[float], 

818 confidences: List[float], 

819 min_area: float, 

820 max_area: float 

821) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]: 

822 """Filter detected cells by area constraints.""" 

823 filtered_positions = [] 

824 filtered_areas = [] 

825 filtered_intensities = [] 

826 filtered_confidences = [] 

827 

828 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences): 

829 if min_area <= area <= max_area: 

830 filtered_positions.append(pos) 

831 filtered_areas.append(area) 

832 filtered_intensities.append(intensity) 

833 filtered_confidences.append(confidence) 

834 

835 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences 

836 

837 

838def _detect_cells_watershed(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

839 """Detect cells using watershed segmentation.""" 

840 # Determine threshold 

841 if params["watershed_threshold_method"] == "otsu": 

842 threshold_val = threshold_otsu(image) 

843 elif params["watershed_threshold_method"] == "li": 

844 threshold_val = threshold_li(image) 

845 else: 

846 threshold_val = float(params["watershed_threshold_method"]) 

847 

848 # Create binary mask 

849 binary = image > threshold_val 

850 

851 # Remove small objects and border objects 

852 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

853 if params["remove_border_cells"]: 

854 binary = clear_border(binary) 

855 

856 # Find local maxima as seeds 

857 distance = ndimage.distance_transform_edt(binary) 

858 local_maxima = peak_local_max( 

859 distance, 

860 min_distance=params["watershed_min_distance"], 

861 footprint=cp.ones((params["watershed_footprint_size"], params["watershed_footprint_size"])) 

862 ) 

863 

864 # Convert coordinates to binary mask 

865 local_maxima_mask = cp.zeros_like(distance, dtype=bool) 

866 if len(local_maxima) > 0: 

867 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True 

868 

869 # Create markers for watershed 

870 # Convert boolean mask to integer labels for connected components 

871 markers = label(local_maxima_mask.astype(cp.uint8)) 

872 

873 # Apply watershed 

874 labels = watershed(-distance, markers, mask=binary) 

875 

876 # Extract region properties 

877 regions = regionprops(labels, intensity_image=image) 

878 

879 positions = [] 

880 areas = [] 

881 intensities = [] 

882 confidences = [] 

883 valid_labels = [] # Track which labels pass the size filter 

884 

885 for region in regions: 

886 # Filter by area 

887 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

888 # Centroid (note: regionprops returns (row, col) = (y, x)) 

889 y, x = region.centroid 

890 positions.append((float(x), float(y))) 

891 

892 areas.append(float(region.area)) 

893 intensities.append(float(region.mean_intensity)) 

894 

895 # Use area as confidence measure (normalized) 

896 confidence = min(1.0, region.area / params["max_cell_area"]) 

897 confidences.append(confidence) 

898 

899 # Track this label as valid 

900 valid_labels.append(region.label) 

901 

902 # Create filtered binary mask with only cells that passed size filter 

903 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels)) 

904 

905 return CellCountResult( 

906 slice_index=slice_idx, 

907 method="watershed", 

908 cell_count=len(positions), 

909 cell_positions=positions, 

910 cell_areas=areas, 

911 cell_intensities=intensities, 

912 detection_confidence=confidences, 

913 parameters_used=params, 

914 binary_mask=filtered_binary_mask # Only cells that passed all filters 

915 ) 

916 

917 

918def _detect_cells_threshold(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

919 """Detect cells using simple thresholding and connected components.""" 

920 # Apply threshold 

921 binary = image > params["threshold"] * image.max() 

922 

923 # Remove small objects and border objects 

924 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

925 if params["remove_border_cells"]: 

926 binary = clear_border(binary) 

927 

928 # Label connected components 

929 labels = label(binary) 

930 regions = regionprops(labels, intensity_image=image) 

931 

932 positions = [] 

933 areas = [] 

934 intensities = [] 

935 confidences = [] 

936 valid_labels = [] # Track which labels pass the size filter 

937 

938 for region in regions: 

939 # Filter by area 

940 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

941 y, x = region.centroid 

942 positions.append((float(x), float(y))) 

943 

944 areas.append(float(region.area)) 

945 intensities.append(float(region.mean_intensity)) 

946 

947 # Use intensity as confidence measure 

948 confidence = float(region.mean_intensity / image.max()) 

949 confidences.append(confidence) 

950 

951 # Track this label as valid 

952 valid_labels.append(region.label) 

953 

954 # Create filtered binary mask with only cells that passed size filter 

955 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels)) 

956 

957 return CellCountResult( 

958 slice_index=slice_idx, 

959 method="threshold", 

960 cell_count=len(positions), 

961 cell_positions=positions, 

962 cell_areas=areas, 

963 cell_intensities=intensities, 

964 detection_confidence=confidences, 

965 parameters_used=params, 

966 binary_mask=filtered_binary_mask # Only cells that passed all filters 

967 ) 

968 

969 

970def _analyze_colocalization( 

971 chan_1_result: CellCountResult, 

972 chan_2_result: CellCountResult, 

973 method: str, 

974 max_distance: float, 

975 min_overlap_area: float, 

976 intensity_threshold: float 

977) -> MultiChannelResult: 

978 """Analyze colocalization between two channels.""" 

979 

980 if method == ColocalizationMethod.DISTANCE_BASED.value: 

981 return _colocalization_distance_based( 

982 chan_1_result, chan_2_result, max_distance 

983 ) 

984 elif method == ColocalizationMethod.OVERLAP_AREA.value: 

985 return _colocalization_overlap_based( 

986 chan_1_result, chan_2_result, min_overlap_area 

987 ) 

988 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value: 

989 return _colocalization_intensity_based( 

990 chan_1_result, chan_2_result, intensity_threshold 

991 ) 

992 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value: 

993 return _colocalization_manders( 

994 chan_1_result, chan_2_result, intensity_threshold 

995 ) 

996 else: 

997 raise ValueError(f"Unknown colocalization method: {method}") 

998 

999 

1000def _colocalization_distance_based( 

1001 chan_1_result: CellCountResult, 

1002 chan_2_result: CellCountResult, 

1003 max_distance: float 

1004) -> MultiChannelResult: 

1005 """Perform distance-based colocalization analysis.""" 

1006 

1007 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1008 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based") 

1009 

1010 # Convert positions to arrays 

1011 pos_1 = cp.array(chan_1_result.cell_positions) 

1012 pos_2 = cp.array(chan_2_result.cell_positions) 

1013 

1014 # Calculate pairwise distances 

1015 distances = cdist(pos_1, pos_2) 

1016 

1017 # Find colocalized pairs 

1018 colocalized_pairs = [] 

1019 used_chan_2 = set() 

1020 

1021 for i in range(len(pos_1)): 

1022 # Find closest cell in channel 2 

1023 min_dist_idx = cp.argmin(distances[i]) 

1024 min_dist = distances[i, min_dist_idx] 

1025 

1026 # Check if within distance threshold and not already used 

1027 if min_dist <= max_distance and min_dist_idx not in used_chan_2: 

1028 colocalized_pairs.append((i, min_dist_idx)) 

1029 used_chan_2.add(min_dist_idx) 

1030 

1031 # Calculate metrics 

1032 colocalized_count = len(colocalized_pairs) 

1033 total_cells = len(pos_1) + len(pos_2) 

1034 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1035 

1036 chan_1_only = len(pos_1) - colocalized_count 

1037 chan_2_only = len(pos_2) - colocalized_count 

1038 

1039 # Extract colocalized positions (average of paired positions) 

1040 overlap_positions = [] 

1041 for i, j in colocalized_pairs: 

1042 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist() 

1043 overlap_positions.append(tuple(avg_pos)) 

1044 

1045 # Calculate additional metrics 

1046 if colocalized_pairs: 

1047 avg_distance = cp.mean([distances[i, j] for i, j in colocalized_pairs]) 

1048 max_distance_found = cp.max([distances[i, j] for i, j in colocalized_pairs]) 

1049 else: 

1050 avg_distance = 0 

1051 max_distance_found = 0 

1052 

1053 metrics = { 

1054 "average_colocalization_distance": float(avg_distance), 

1055 "max_colocalization_distance": float(max_distance_found), 

1056 "distance_threshold_used": max_distance 

1057 } 

1058 

1059 return MultiChannelResult( 

1060 slice_index=chan_1_result.slice_index, 

1061 chan_1_results=chan_1_result, 

1062 chan_2_results=chan_2_result, 

1063 colocalization_method="distance_based", 

1064 colocalized_count=colocalized_count, 

1065 colocalization_percentage=colocalization_percentage, 

1066 chan_1_only_count=chan_1_only, 

1067 chan_2_only_count=chan_2_only, 

1068 colocalization_metrics=metrics, 

1069 overlap_positions=overlap_positions 

1070 ) 

1071 

1072 

1073def _create_empty_coloc_result( 

1074 chan_1_result: CellCountResult, 

1075 chan_2_result: CellCountResult, 

1076 method: str 

1077) -> MultiChannelResult: 

1078 """Create empty colocalization result when no cells found.""" 

1079 return MultiChannelResult( 

1080 slice_index=chan_1_result.slice_index, 

1081 chan_1_results=chan_1_result, 

1082 chan_2_results=chan_2_result, 

1083 colocalization_method=method, 

1084 colocalized_count=0, 

1085 colocalization_percentage=0.0, 

1086 chan_1_only_count=chan_1_result.cell_count, 

1087 chan_2_only_count=chan_2_result.cell_count, 

1088 colocalization_metrics={}, 

1089 overlap_positions=[] 

1090 ) 

1091 

1092 

1093def _colocalization_overlap_based( 

1094 chan_1_result: CellCountResult, 

1095 chan_2_result: CellCountResult, 

1096 min_overlap_area: float 

1097) -> MultiChannelResult: 

1098 """Perform area overlap-based colocalization analysis.""" 

1099 # This is a simplified implementation - in practice, you'd need actual segmentation masks 

1100 # For now, we'll use distance as a proxy for overlap 

1101 

1102 # Use distance-based method with smaller threshold as approximation 

1103 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly 

1104 

1105 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold) 

1106 result.colocalization_method = "overlap_area" 

1107 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area 

1108 result.colocalization_metrics["note"] = "Approximated using distance-based method" 

1109 

1110 return result 

1111 

1112 

1113def _colocalization_intensity_based( 

1114 chan_1_result: CellCountResult, 

1115 chan_2_result: CellCountResult, 

1116 intensity_threshold: float 

1117) -> MultiChannelResult: 

1118 """Perform intensity correlation-based colocalization analysis.""" 

1119 

1120 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1121 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation") 

1122 

1123 # Use distance-based pairing first 

1124 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0) 

1125 

1126 # Filter pairs based on intensity correlation 

1127 colocalized_pairs = [] 

1128 overlap_positions = [] 

1129 

1130 pos_1 = cp.array(chan_1_result.cell_positions) 

1131 pos_2 = cp.array(chan_2_result.cell_positions) 

1132 

1133 for i, (x1, y1) in enumerate(chan_1_result.cell_positions): 

1134 for j, (x2, y2) in enumerate(chan_2_result.cell_positions): 

1135 # Calculate distance 

1136 dist = cp.sqrt((x1 - x2)**2 + (y1 - y2)**2) 

1137 

1138 if dist <= 5.0: # Within reasonable distance 

1139 # Check intensity correlation 

1140 int_1 = chan_1_result.cell_intensities[i] 

1141 int_2 = chan_2_result.cell_intensities[j] 

1142 

1143 # Simple intensity correlation: both above threshold 

1144 if int_1 >= intensity_threshold and int_2 >= intensity_threshold: 

1145 colocalized_pairs.append((i, j)) 

1146 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2) 

1147 overlap_positions.append(avg_pos) 

1148 break # One-to-one mapping 

1149 

1150 colocalized_count = len(colocalized_pairs) 

1151 total_cells = len(pos_1) + len(pos_2) 

1152 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1153 

1154 metrics = { 

1155 "intensity_threshold_used": intensity_threshold, 

1156 "correlation_method": "threshold_based" 

1157 } 

1158 

1159 return MultiChannelResult( 

1160 slice_index=chan_1_result.slice_index, 

1161 chan_1_results=chan_1_result, 

1162 chan_2_results=chan_2_result, 

1163 colocalization_method="intensity_correlation", 

1164 colocalized_count=colocalized_count, 

1165 colocalization_percentage=colocalization_percentage, 

1166 chan_1_only_count=len(pos_1) - colocalized_count, 

1167 chan_2_only_count=len(pos_2) - colocalized_count, 

1168 colocalization_metrics=metrics, 

1169 overlap_positions=overlap_positions 

1170 ) 

1171 

1172 

1173def _colocalization_manders( 

1174 chan_1_result: CellCountResult, 

1175 chan_2_result: CellCountResult, 

1176 intensity_threshold: float 

1177) -> MultiChannelResult: 

1178 """Calculate Manders colocalization coefficients.""" 

1179 

1180 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1181 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients") 

1182 

1183 # Simplified Manders calculation based on detected cells 

1184 # In practice, this would use pixel-level intensity analysis 

1185 

1186 # Use intensity-based method as foundation 

1187 intensity_result = _colocalization_intensity_based( 

1188 chan_1_result, chan_2_result, intensity_threshold 

1189 ) 

1190 

1191 # Calculate Manders-like coefficients 

1192 total_int_1 = sum(chan_1_result.cell_intensities) 

1193 total_int_2 = sum(chan_2_result.cell_intensities) 

1194 

1195 # Simplified: assume colocalized cells contribute their full intensity 

1196 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in 

1197 [(i, j) for i in range(len(chan_1_result.cell_positions)) 

1198 for j in range(len(chan_2_result.cell_positions)) 

1199 if (i, j) in [(0, 0)]]) # Simplified placeholder 

1200 

1201 # Manders coefficients (M1 and M2) 

1202 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0 

1203 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified 

1204 

1205 intensity_result.colocalization_method = "manders_coefficients" 

1206 intensity_result.colocalization_metrics.update({ 

1207 "manders_m1": m1, 

1208 "manders_m2": m2, 

1209 "note": "Simplified cell-based Manders calculation" 

1210 }) 

1211 

1212 return intensity_result 

1213 

1214 

1215def _create_segmentation_visualization( 

1216 image: cp.ndarray, 

1217 positions: List[Tuple[float, float]], 

1218 max_sigma: float, 

1219 cell_areas: List[float] = None, 

1220 binary_mask: cp.ndarray = None 

1221) -> cp.ndarray: 

1222 """Create segmentation visualization using actual binary mask if available.""" 

1223 

1224 # If we have the actual binary mask from detection, use it directly 

1225 if binary_mask is not None: 

1226 # Convert boolean mask to uint16 to match input image dtype 

1227 # Use max intensity for detected cells, 0 for background 

1228 max_intensity = image.max() if image.max() > 0 else 65535 

1229 return (binary_mask * max_intensity).astype(image.dtype) 

1230 

1231 # Fallback to original circular marker approach for blob methods 

1232 visualization = image.copy() 

1233 

1234 # Mark detected cells with their actual sizes 

1235 for i, (x, y) in enumerate(positions): 

1236 # Use actual cell area if available, otherwise fall back to max_sigma 

1237 if cell_areas and i < len(cell_areas): 

1238 # Convert area to radius (assuming circular cells) 

1239 radius = cp.sqrt(cell_areas[i] / cp.pi) 

1240 else: 

1241 # Fallback to max_sigma for backward compatibility 

1242 radius = max_sigma * 2 

1243 

1244 # Create circular markers with actual cell size 

1245 rr, cc = cp.ogrid[:image.shape[0], :image.shape[1]] 

1246 mask = (rr - y)**2 + (cc - x)**2 <= radius**2 

1247 

1248 # Ensure indices are within bounds 

1249 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1]) 

1250 mask = mask & valid_mask 

1251 

1252 visualization[mask] = visualization.max() # Bright markers 

1253 

1254 return visualization 

1255 

1256 

1257def _create_colocalization_map( 

1258 chan_1_img: cp.ndarray, 

1259 chan_2_img: cp.ndarray, 

1260 coloc_result: MultiChannelResult 

1261) -> cp.ndarray: 

1262 """Create colocalization visualization map.""" 

1263 # Create RGB-like visualization 

1264 coloc_map = cp.zeros_like(chan_1_img) 

1265 

1266 # Mark colocalized positions 

1267 for x, y in coloc_result.overlap_positions: 

1268 # Create markers for colocalized cells 

1269 rr, cc = cp.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]] 

1270 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius 

1271 

1272 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1]) 

1273 mask = mask & valid_mask 

1274 

1275 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers 

1276 

1277 return coloc_map