Coverage for openhcs/processing/backends/analysis/cell_counting_cupy.py: 14.2%

463 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2CuPy-based cell counting and multi-channel colocalization analysis for OpenHCS. 

3 

4This module provides comprehensive cell counting capabilities using CuPy and CuCIM, 

5supporting both single-channel and multi-channel analysis with various detection 

6methods and colocalization metrics. 

7""" 

8 

9import numpy as np # Keep for CPU fallbacks and data conversion 

10import logging 

11from typing import Dict, List, Tuple, Any, Optional, Union, TYPE_CHECKING 

12from dataclasses import dataclass 

13from enum import Enum 

14 

15# Import CuPy using the established optional import pattern 

16from openhcs.core.utils import optional_import 

17 

18cp = optional_import("cupy") 

19 

20# Type checking imports 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 pass 

23 

24logger = logging.getLogger(__name__) 

25 

26# Core scientific computing imports 

27import pandas as pd 

28import json 

29 

30# Optional imports using established pattern 

31cupyx_scipy = optional_import("cupyx.scipy") 

32cupyx_spatial_distance = optional_import("cupyx.scipy.spatial.distance") 

33cucim_feature = optional_import("cucim.skimage.feature") 

34cucim_filters = optional_import("cucim.skimage.filters") 

35cucim_segmentation = optional_import("cucim.skimage.segmentation") 

36cucim_morphology = optional_import("cucim.skimage.morphology") 

37cucim_measure = optional_import("cucim.skimage.measure") 

38cucim_exposure = optional_import("cucim.skimage.exposure") 

39 

40 

41 

42# OpenHCS imports 

43from openhcs.core.memory.decorators import cupy as cupy_func 

44from openhcs.core.pipeline.function_contracts import special_outputs 

45from openhcs.constants.constants import Backend 

46 

47 

48class DetectionMethod(Enum): 

49 """Cell detection methods available.""" 

50 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells) 

51 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose) 

52 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells) 

53 WATERSHED = "watershed" # Watershed segmentation (best for touching cells) 

54 THRESHOLD = "threshold" # Simple thresholding (fastest, basic) 

55 

56 

57class ColocalizationMethod(Enum): 

58 """Methods for multi-channel colocalization analysis.""" 

59 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap 

60 DISTANCE_BASED = "distance_based" # Based on centroid distances 

61 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation 

62 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients 

63 

64 

65class ThresholdMethod(Enum): 

66 """Automatic thresholding methods for watershed segmentation.""" 

67 OTSU = "otsu" # Otsu's method (good for bimodal histograms) 

68 LI = "li" # Li's method (good for low contrast images) 

69 MANUAL = "manual" # Manual threshold value (0.0-1.0) 

70 

71 

72 

73 

74 

75@dataclass 

76class CellCountResult: 

77 """Results for single-channel cell counting.""" 

78 slice_index: int 

79 method: str 

80 cell_count: int 

81 cell_positions: List[Tuple[float, float]] # (x, y) centroids 

82 cell_areas: List[float] 

83 cell_intensities: List[float] 

84 detection_confidence: List[float] 

85 parameters_used: Dict[str, Any] 

86 binary_mask: Optional[cp.ndarray] = None # Actual binary mask for segmentation methods 

87 

88 

89@dataclass 

90class MultiChannelResult: 

91 """Results for multi-channel cell counting and colocalization.""" 

92 slice_index: int 

93 chan_1_results: CellCountResult 

94 chan_2_results: CellCountResult 

95 colocalization_method: str 

96 colocalized_count: int 

97 colocalization_percentage: float 

98 chan_1_only_count: int 

99 chan_2_only_count: int 

100 colocalization_metrics: Dict[str, float] 

101 overlap_positions: List[Tuple[float, float]] 

102 

103 

104def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager, backend: str) -> str: 

105 """Materialize cell counting results as analysis-ready CSV and JSON formats.""" 

106 

107 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}") 

108 

109 # Convert CuPy binary_mask to NumPy if present 

110 if data: 

111 for result in data: 

112 if result.binary_mask is not None: 

113 result.binary_mask = cp.asnumpy(result.binary_mask) 

114 if isinstance(result, MultiChannelResult): 

115 if result.chan_1_results.binary_mask is not None: 

116 result.chan_1_results.binary_mask = cp.asnumpy(result.chan_1_results.binary_mask) 

117 if result.chan_2_results.binary_mask is not None: 

118 result.chan_2_results.binary_mask = cp.asnumpy(result.chan_2_results.binary_mask) 

119 

120 # Determine if this is single-channel or multi-channel data 

121 if not data: 

122 logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize") 

123 return path 

124 

125 is_multi_channel = isinstance(data[0], MultiChannelResult) 

126 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}") 

127 

128 if is_multi_channel: 

129 return _materialize_multi_channel_results(data, path, filemanager, backend) 

130 else: 

131 return _materialize_single_channel_results(data, path, filemanager, backend) 

132 

133 

134def materialize_segmentation_masks(data: List[cp.ndarray], path: str, filemanager, backend: str) -> str: 

135 """ 

136 Materialize segmentation masks as individual TIFF files. 

137 

138 Single-backend signature - decorator automatically wraps for multi-backend support. 

139 """ 

140 

141 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, backend={backend}, masks_count={len(data) if data else 0}") 

142 

143 # Convert CuPy arrays to NumPy 

144 if data: 

145 data = [cp.asnumpy(mask) for mask in data] 

146 

147 if not data: 

148 logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)") 

149 # Create empty summary file to indicate no masks were generated 

150 summary_path = path.replace('.pkl', '_segmentation_summary.txt') 

151 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n" 

152 filemanager.save(summary_content, summary_path, backend) 

153 return summary_path 

154 

155 # Generate output file paths based on the input path 

156 base_path = path.replace('.pkl', '') 

157 

158 # Save each mask as a separate TIFF file 

159 for i, mask in enumerate(data): 

160 mask_filename = f"{base_path}_slice_{i:03d}.tif" 

161 

162 # Convert mask to appropriate dtype for saving (uint16 to match input images) 

163 if mask.dtype != np.uint16: 

164 # Normalize to uint16 range if needed 

165 if mask.max() <= 1.0: 

166 mask_uint16 = (mask * 65535).astype(np.uint16) 

167 else: 

168 mask_uint16 = mask.astype(np.uint16) 

169 else: 

170 mask_uint16 = mask 

171 

172 # Save using filemanager with provided backend 

173 filemanager.save(mask_uint16, mask_filename, backend) 

174 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename} (backend={backend})") 

175 

176 # Return summary path 

177 summary_path = f"{base_path}_segmentation_summary.txt" 

178 summary_content = f"Segmentation masks saved: {len(data)} files\n" 

179 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n" 

180 summary_content += f"Mask dtype: {data[0].dtype}\n" 

181 summary_content += f"Mask shape: {data[0].shape}\n" 

182 

183 filemanager.save(summary_content, summary_path, backend) 

184 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks to backend={backend}") 

185 

186 return summary_path 

187 

188 

189@cupy_func 

190@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks)) 

191def count_cells_single_channel( 

192 image_stack: cp.ndarray, 

193 # Detection method and parameters 

194 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

195 # Blob detection parameters 

196 min_sigma: float = 1.0, # Minimum blob size (pixels) 

197 max_sigma: float = 10.0, # Maximum blob size (pixels) 

198 num_sigma: int = 10, # Number of sigma values to test 

199 threshold: float = 0.1, # Detection threshold (0.0-1.0) 

200 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

201 # Watershed parameters 

202 watershed_footprint_size: int = 3, # Local maxima footprint size 

203 watershed_min_distance: int = 5, # Minimum distance between peaks 

204 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods 

205 # Preprocessing parameters 

206 enable_preprocessing: bool = True, 

207 gaussian_sigma: float = 1.0, # Gaussian blur sigma 

208 median_disk_size: int = 1, # Median filter disk size 

209 # Filtering parameters 

210 min_cell_area: int = 10, # Minimum cell area (pixels) 

211 max_cell_area: int = 1000, # Maximum cell area (pixels) 

212 remove_border_cells: bool = True, # Remove cells touching image border 

213 # Output parameters 

214 return_segmentation_mask: bool = False 

215) -> Tuple[cp.ndarray, List[CellCountResult]]: 

216 """ 

217 Count cells in single-channel image stack using various detection methods. 

218 

219 Args: 

220 image_stack: 3D CuPy array (Z, Y, X) where each Z slice is processed independently 

221 detection_method: Method for cell detection (see DetectionMethod enum) 

222 min_sigma: Minimum blob size for blob detection methods 

223 max_sigma: Maximum blob size for blob detection methods 

224 num_sigma: Number of sigma values to test for blob detection 

225 threshold: Detection threshold (method-dependent) 

226 overlap: Maximum overlap between detected blobs 

227 watershed_footprint_size: Footprint size for local maxima detection 

228 watershed_min_distance: Minimum distance between watershed peaks 

229 watershed_threshold_method: Thresholding method for watershed 

230 enable_preprocessing: Apply Gaussian and median filtering 

231 gaussian_sigma: Standard deviation for Gaussian blur 

232 median_disk_size: Disk size for median filtering 

233 min_cell_area: Minimum area for valid cells 

234 max_cell_area: Maximum area for valid cells 

235 remove_border_cells: Remove cells touching image borders 

236 return_segmentation_mask: Return segmentation masks in output 

237  

238 Returns: 

239 output_stack: Original image stack unchanged (Z, Y, X) 

240 cell_count_results: List of CellCountResult objects for each slice 

241 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True 

242 """ 

243 if image_stack.ndim != 3: 

244 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

245 

246 results = [] 

247 segmentation_masks = [] 

248 

249 # Store parameters for reproducibility (convert enums to values) 

250 parameters = { 

251 "detection_method": detection_method.value, 

252 "min_sigma": min_sigma, 

253 "max_sigma": max_sigma, 

254 "num_sigma": num_sigma, 

255 "threshold": threshold, 

256 "overlap": overlap, 

257 "watershed_footprint_size": watershed_footprint_size, 

258 "watershed_min_distance": watershed_min_distance, 

259 "watershed_threshold_method": watershed_threshold_method.value, 

260 "gaussian_sigma": gaussian_sigma, 

261 "median_disk_size": median_disk_size, 

262 "min_cell_area": min_cell_area, 

263 "max_cell_area": max_cell_area, 

264 "remove_border_cells": remove_border_cells 

265 } 

266 

267 logging.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method") 

268 

269 for z_idx in range(image_stack.shape[0]): 

270 slice_img = image_stack[z_idx].astype(cp.float64) 

271 

272 # Apply preprocessing if enabled 

273 if enable_preprocessing: 

274 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size) 

275 

276 # Detect cells using specified method 

277 result = _detect_cells_single_method( 

278 slice_img, z_idx, detection_method.value, parameters 

279 ) 

280 

281 results.append(result) 

282 

283 # Create segmentation mask if requested 

284 if return_segmentation_mask: 

285 segmentation_mask = _create_segmentation_visualization( 

286 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask 

287 ) 

288 segmentation_masks.append(segmentation_mask) 

289 

290 # Always return segmentation masks (empty list if not requested) 

291 # This ensures consistent return signature for special outputs system 

292 return image_stack, results, segmentation_masks 

293 

294 

295@cupy_func 

296@special_outputs(("multi_channel_counts", materialize_cell_counts)) 

297def count_cells_multi_channel( 

298 image_stack: cp.ndarray, 

299 chan_1: int, # Index of first channel (positional arg) 

300 chan_2: int, # Index of second channel (positional arg) 

301 # Detection parameters for channel 1 (all single-channel params available) 

302 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

303 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels) 

304 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels) 

305 chan_1_num_sigma: int = 10, # Number of sigma values to test 

306 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

307 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

308 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size 

309 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks 

310 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

311 chan_1_enable_preprocessing: bool = True, # Apply preprocessing 

312 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

313 chan_1_median_disk_size: int = 1, # Median filter disk size 

314 chan_1_min_area: int = 10, # Minimum cell area (pixels) 

315 chan_1_max_area: int = 1000, # Maximum cell area (pixels) 

316 chan_1_remove_border_cells: bool = True, # Remove cells touching border 

317 # Detection parameters for channel 2 (all single-channel params available) 

318 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

319 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels) 

320 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels) 

321 chan_2_num_sigma: int = 10, # Number of sigma values to test 

322 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

323 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

324 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size 

325 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks 

326 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

327 chan_2_enable_preprocessing: bool = True, # Apply preprocessing 

328 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

329 chan_2_median_disk_size: int = 1, # Median filter disk size 

330 chan_2_min_area: int = 10, # Minimum cell area (pixels) 

331 chan_2_max_area: int = 1000, # Maximum cell area (pixels) 

332 chan_2_remove_border_cells: bool = True, # Remove cells touching border 

333 # Colocalization parameters 

334 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods 

335 max_distance: float = 5.0, # Maximum distance for colocalization (pixels) 

336 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method 

337 intensity_threshold: float = 0.5, # Threshold for intensity-based methods 

338 # Output parameters 

339 return_colocalization_map: bool = False 

340) -> Tuple[cp.ndarray, List[MultiChannelResult]]: 

341 """ 

342 Count cells in multi-channel image stack with colocalization analysis. 

343 

344 Each channel can be processed with independent parameters, providing the same 

345 flexibility as the single-channel function for each channel separately. 

346 

347 Args: 

348 image_stack: 3D CuPy array (Z, Y, X) where Z represents different channels 

349 chan_1: Index of first channel in the stack (positional) 

350 chan_2: Index of second channel in the stack (positional) 

351 

352 # Channel 1 detection parameters (same as single-channel function) 

353 chan_1_method: Detection method for channel 1 (DetectionMethod enum) 

354 chan_1_min_sigma: Minimum blob size for channel 1 

355 chan_1_max_sigma: Maximum blob size for channel 1 

356 chan_1_num_sigma: Number of sigma values to test for channel 1 

357 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0) 

358 chan_1_overlap: Maximum overlap between blobs for channel 1 

359 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1 

360 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1 

361 chan_1_watershed_threshold_method: Thresholding method for channel 1 

362 chan_1_enable_preprocessing: Apply preprocessing to channel 1 

363 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1 

364 chan_1_median_disk_size: Median filter size for channel 1 

365 chan_1_min_area: Minimum cell area for channel 1 

366 chan_1_max_area: Maximum cell area for channel 1 

367 chan_1_remove_border_cells: Remove border cells for channel 1 

368 

369 # Channel 2 detection parameters (same as single-channel function) 

370 chan_2_method: Detection method for channel 2 (DetectionMethod enum) 

371 chan_2_min_sigma: Minimum blob size for channel 2 

372 chan_2_max_sigma: Maximum blob size for channel 2 

373 chan_2_num_sigma: Number of sigma values to test for channel 2 

374 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0) 

375 chan_2_overlap: Maximum overlap between blobs for channel 2 

376 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2 

377 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2 

378 chan_2_watershed_threshold_method: Thresholding method for channel 2 

379 chan_2_enable_preprocessing: Apply preprocessing to channel 2 

380 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2 

381 chan_2_median_disk_size: Median filter size for channel 2 

382 chan_2_min_area: Minimum cell area for channel 2 

383 chan_2_max_area: Maximum cell area for channel 2 

384 chan_2_remove_border_cells: Remove border cells for channel 2 

385 

386 # Colocalization parameters 

387 colocalization_method: Method for determining colocalization (ColocalizationMethod enum) 

388 max_distance: Maximum distance for distance-based colocalization (pixels) 

389 min_overlap_area: Minimum overlap fraction for area-based colocalization 

390 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0) 

391 return_colocalization_map: Return colocalization visualization 

392 

393 Returns: 

394 output_stack: Original images or colocalization maps 

395 multi_channel_results: List of MultiChannelResult objects 

396 """ 

397 if image_stack.ndim != 3: 

398 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

399 

400 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]: 

401 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}") 

402 

403 if chan_1 == chan_2: 

404 raise ValueError("Channel 1 and Channel 2 must be different") 

405 

406 # Extract channel images 

407 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency 

408 chan_2_img = image_stack[chan_2:chan_2+1] 

409 

410 # Count cells in each channel separately using the single-channel function 

411 # Channel 1 parameters (all explicit) 

412 chan_1_params = { 

413 "detection_method": chan_1_method, 

414 "min_sigma": chan_1_min_sigma, 

415 "max_sigma": chan_1_max_sigma, 

416 "num_sigma": chan_1_num_sigma, 

417 "threshold": chan_1_threshold, 

418 "overlap": chan_1_overlap, 

419 "watershed_footprint_size": chan_1_watershed_footprint_size, 

420 "watershed_min_distance": chan_1_watershed_min_distance, 

421 "watershed_threshold_method": chan_1_watershed_threshold_method, 

422 "enable_preprocessing": chan_1_enable_preprocessing, 

423 "gaussian_sigma": chan_1_gaussian_sigma, 

424 "median_disk_size": chan_1_median_disk_size, 

425 "min_cell_area": chan_1_min_area, 

426 "max_cell_area": chan_1_max_area, 

427 "remove_border_cells": chan_1_remove_border_cells, 

428 "return_segmentation_mask": False 

429 } 

430 

431 # Channel 2 parameters (all explicit) 

432 chan_2_params = { 

433 "detection_method": chan_2_method, 

434 "min_sigma": chan_2_min_sigma, 

435 "max_sigma": chan_2_max_sigma, 

436 "num_sigma": chan_2_num_sigma, 

437 "threshold": chan_2_threshold, 

438 "overlap": chan_2_overlap, 

439 "watershed_footprint_size": chan_2_watershed_footprint_size, 

440 "watershed_min_distance": chan_2_watershed_min_distance, 

441 "watershed_threshold_method": chan_2_watershed_threshold_method, 

442 "enable_preprocessing": chan_2_enable_preprocessing, 

443 "gaussian_sigma": chan_2_gaussian_sigma, 

444 "median_disk_size": chan_2_median_disk_size, 

445 "min_cell_area": chan_2_min_area, 

446 "max_cell_area": chan_2_max_area, 

447 "remove_border_cells": chan_2_remove_border_cells, 

448 "return_segmentation_mask": False 

449 } 

450 

451 # Process each channel 

452 _, chan_1_results = count_cells_single_channel(chan_1_img, **chan_1_params) 

453 _, chan_2_results = count_cells_single_channel(chan_2_img, **chan_2_params) 

454 

455 # Perform colocalization analysis 

456 multi_results = [] 

457 output_stack = image_stack.copy() 

458 

459 # Since we're processing single slices from each channel, we only have one result each 

460 chan_1_result = chan_1_results[0] 

461 chan_2_result = chan_2_results[0] 

462 

463 # Analyze colocalization 

464 coloc_result = _analyze_colocalization( 

465 chan_1_result, chan_2_result, colocalization_method.value, 

466 max_distance, min_overlap_area, intensity_threshold 

467 ) 

468 

469 multi_results.append(coloc_result) 

470 

471 # Create colocalization visualization if requested 

472 if return_colocalization_map: 

473 coloc_map = _create_colocalization_map( 

474 image_stack[chan_1], image_stack[chan_2], coloc_result 

475 ) 

476 # Replace one of the channels with the colocalization map 

477 output_stack = cp.stack([image_stack[chan_1], image_stack[chan_2], coloc_map]) 

478 

479 return output_stack, multi_results 

480 

481 

482def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager, backend: str) -> str: 

483 """Materialize single-channel cell counting results.""" 

484 # Generate output file paths based on the input path 

485 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

486 base_path = path.replace('.pkl', '') 

487 json_path = f"{base_path}.json" 

488 csv_path = f"{base_path}_details.csv" 

489 

490 # Ensure output directory exists for disk backend 

491 from pathlib import Path 

492 from openhcs.constants.constants import Backend 

493 output_dir = Path(json_path).parent 

494 if backend == Backend.DISK.value: 

495 filemanager.ensure_directory(str(output_dir), backend) 

496 

497 summary = { 

498 "analysis_type": "single_channel_cell_counting", 

499 "total_slices": len(data), 

500 "results_per_slice": [] 

501 } 

502 rows = [] 

503 

504 total_cells = 0 

505 for result in data: 

506 total_cells += result.cell_count 

507 

508 # Add to summary 

509 summary["results_per_slice"].append({ 

510 "slice_index": result.slice_index, 

511 "method": result.method, 

512 "cell_count": result.cell_count, 

513 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0, 

514 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0, 

515 "parameters": result.parameters_used 

516 }) 

517 

518 # Add individual cell data to CSV 

519 for i, (pos, area, intensity, confidence) in enumerate(zip( 

520 result.cell_positions, result.cell_areas, 

521 result.cell_intensities, result.detection_confidence 

522 )): 

523 rows.append({ 

524 'slice_index': result.slice_index, 

525 'cell_id': f"slice_{result.slice_index}_cell_{i}", 

526 'x_position': pos[0], 

527 'y_position': pos[1], 

528 'cell_area': area, 

529 'cell_intensity': intensity, 

530 'detection_confidence': confidence, 

531 'detection_method': result.method 

532 }) 

533 

534 summary["total_cells_all_slices"] = total_cells 

535 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0 

536 

537 # Save JSON summary (overwrite if exists) 

538 json_content = json.dumps(summary, indent=2, default=str) 

539 # Remove existing file if it exists using filemanager 

540 if filemanager.exists(json_path, backend): 

541 filemanager.delete(json_path, backend) 

542 filemanager.save(json_content, json_path, backend) 

543 

544 # Save CSV details (overwrite if exists) 

545 if rows: 

546 df = pd.DataFrame(rows) 

547 csv_content = df.to_csv(index=False) 

548 # Remove existing file if it exists using filemanager 

549 if filemanager.exists(csv_path, backend): 

550 filemanager.delete(csv_path, backend) 

551 filemanager.save(csv_content, csv_path, backend) 

552 

553 return json_path 

554 

555 

556def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager, backend: str) -> str: 

557 """Materialize multi-channel cell counting and colocalization results.""" 

558 # Generate output file paths based on the input path 

559 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

560 base_path = path.replace('.pkl', '') 

561 json_path = f"{base_path}.json" 

562 csv_path = f"{base_path}_details.csv" 

563 

564 # Ensure output directory exists for disk backend 

565 from pathlib import Path 

566 output_dir = Path(json_path).parent 

567 if backend == Backend.DISK.value: 

568 filemanager.ensure_directory(str(output_dir), backend) 

569 

570 summary = { 

571 "analysis_type": "multi_channel_cell_counting_colocalization", 

572 "total_slices": len(data), 

573 "colocalization_summary": { 

574 "total_chan_1_cells": 0, 

575 "total_chan_2_cells": 0, 

576 "total_colocalized": 0, 

577 "average_colocalization_percentage": 0 

578 }, 

579 "results_per_slice": [] 

580 } 

581 

582 # CSV for detailed analysis (csv_path already defined above) 

583 rows = [] 

584 

585 total_coloc_pct = 0 

586 for result in data: 

587 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count 

588 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count 

589 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count 

590 total_coloc_pct += result.colocalization_percentage 

591 

592 # Add to summary 

593 summary["results_per_slice"].append({ 

594 "slice_index": result.slice_index, 

595 "chan_1_count": result.chan_1_results.cell_count, 

596 "chan_2_count": result.chan_2_results.cell_count, 

597 "colocalized_count": result.colocalized_count, 

598 "colocalization_percentage": result.colocalization_percentage, 

599 "chan_1_only": result.chan_1_only_count, 

600 "chan_2_only": result.chan_2_only_count, 

601 "colocalization_method": result.colocalization_method, 

602 "colocalization_metrics": result.colocalization_metrics 

603 }) 

604 

605 # Add colocalization details to CSV 

606 for i, pos in enumerate(result.overlap_positions): 

607 rows.append({ 

608 'slice_index': result.slice_index, 

609 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}", 

610 'x_position': pos[0], 

611 'y_position': pos[1], 

612 'colocalization_method': result.colocalization_method 

613 }) 

614 

615 summary["colocalization_summary"]["average_colocalization_percentage"] = ( 

616 total_coloc_pct / len(data) if data else 0 

617 ) 

618 

619 # Save JSON summary (overwrite if exists) 

620 json_content = json.dumps(summary, indent=2, default=str) 

621 # Remove existing file if it exists using filemanager 

622 if filemanager.exists(json_path, backend): 

623 filemanager.delete(json_path, backend) 

624 filemanager.save(json_content, json_path, backend) 

625 

626 # Save CSV details (overwrite if exists) 

627 if rows: 

628 df = pd.DataFrame(rows) 

629 csv_content = df.to_csv(index=False) 

630 # Remove existing file if it exists using filemanager 

631 if filemanager.exists(csv_path, backend): 

632 filemanager.delete(csv_path, backend) 

633 filemanager.save(csv_content, csv_path, backend) 

634 

635 return json_path 

636 

637 

638def _preprocess_image(image: cp.ndarray, gaussian_sigma: float, median_disk_size: int) -> cp.ndarray: 

639 """Apply preprocessing to enhance cell detection.""" 

640 # Gaussian blur to reduce noise 

641 if gaussian_sigma > 0: 

642 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True) 

643 

644 # Median filter to remove salt-and-pepper noise 

645 if median_disk_size > 0: 

646 image = median(image, disk(median_disk_size)) 

647 

648 return image 

649 

650 

651def _detect_cells_single_method( 

652 image: cp.ndarray, 

653 slice_idx: int, 

654 method: str, 

655 params: Dict[str, Any] 

656) -> CellCountResult: 

657 """Detect cells using specified method.""" 

658 

659 if method == DetectionMethod.BLOB_LOG.value: 

660 return _detect_cells_blob_log(image, slice_idx, params) 

661 elif method == DetectionMethod.BLOB_DOG.value: 

662 return _detect_cells_blob_dog(image, slice_idx, params) 

663 elif method == DetectionMethod.BLOB_DOH.value: 

664 return _detect_cells_blob_doh(image, slice_idx, params) 

665 elif method == DetectionMethod.WATERSHED.value: 

666 return _detect_cells_watershed(image, slice_idx, params) 

667 elif method == DetectionMethod.THRESHOLD.value: 

668 return _detect_cells_threshold(image, slice_idx, params) 

669 else: 

670 raise ValueError(f"Unknown detection method: {method}") 

671 

672 

673def _detect_cells_blob_log(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

674 """Detect cells using Laplacian of Gaussian blob detection.""" 

675 blobs = blob_log( 

676 image, 

677 min_sigma=params["min_sigma"], 

678 max_sigma=params["max_sigma"], 

679 num_sigma=params["num_sigma"], 

680 threshold=params["threshold"], 

681 overlap=params["overlap"] 

682 ) 

683 

684 # Extract positions, areas, and intensities 

685 positions = [] 

686 areas = [] 

687 intensities = [] 

688 confidences = [] 

689 

690 for blob in blobs: 

691 y, x, sigma = blob 

692 positions.append((float(x), float(y))) 

693 

694 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2)) 

695 radius = sigma * cp.sqrt(2) 

696 area = cp.pi * radius**2 

697 areas.append(float(area)) 

698 

699 # Sample intensity at blob center 

700 intensity = float(image[int(y), int(x)]) 

701 intensities.append(intensity) 

702 

703 # Use sigma as confidence measure (larger blobs = higher confidence) 

704 confidence = float(sigma / params["max_sigma"]) 

705 confidences.append(confidence) 

706 

707 # Filter by area constraints 

708 filtered_data = _filter_by_area( 

709 positions, areas, intensities, confidences, 

710 params["min_cell_area"], params["max_cell_area"] 

711 ) 

712 

713 return CellCountResult( 

714 slice_index=slice_idx, 

715 method="blob_log", 

716 cell_count=len(filtered_data[0]), 

717 cell_positions=filtered_data[0], 

718 cell_areas=filtered_data[1], 

719 cell_intensities=filtered_data[2], 

720 detection_confidence=filtered_data[3], 

721 parameters_used=params 

722 ) 

723 

724 

725def _detect_cells_blob_dog(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

726 """Detect cells using Difference of Gaussian blob detection.""" 

727 blobs = blob_dog( 

728 image, 

729 min_sigma=params["min_sigma"], 

730 max_sigma=params["max_sigma"], 

731 threshold=params["threshold"], 

732 overlap=params["overlap"] 

733 ) 

734 

735 # Process similar to blob_log 

736 positions = [] 

737 areas = [] 

738 intensities = [] 

739 confidences = [] 

740 

741 for blob in blobs: 

742 y, x, sigma = blob 

743 positions.append((float(x), float(y))) 

744 

745 radius = sigma * cp.sqrt(2) 

746 area = cp.pi * radius**2 

747 areas.append(float(area)) 

748 

749 intensity = float(image[int(y), int(x)]) 

750 intensities.append(intensity) 

751 

752 confidence = float(sigma / params["max_sigma"]) 

753 confidences.append(confidence) 

754 

755 filtered_data = _filter_by_area( 

756 positions, areas, intensities, confidences, 

757 params["min_cell_area"], params["max_cell_area"] 

758 ) 

759 

760 return CellCountResult( 

761 slice_index=slice_idx, 

762 method="blob_dog", 

763 cell_count=len(filtered_data[0]), 

764 cell_positions=filtered_data[0], 

765 cell_areas=filtered_data[1], 

766 cell_intensities=filtered_data[2], 

767 detection_confidence=filtered_data[3], 

768 parameters_used=params 

769 ) 

770 

771 

772def _detect_cells_blob_doh(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

773 """Detect cells using Determinant of Hessian blob detection.""" 

774 blobs = blob_doh( 

775 image, 

776 min_sigma=params["min_sigma"], 

777 max_sigma=params["max_sigma"], 

778 num_sigma=params["num_sigma"], 

779 threshold=params["threshold"], 

780 overlap=params["overlap"] 

781 ) 

782 

783 # Process similar to other blob methods 

784 positions = [] 

785 areas = [] 

786 intensities = [] 

787 confidences = [] 

788 

789 for blob in blobs: 

790 y, x, sigma = blob 

791 positions.append((float(x), float(y))) 

792 

793 radius = sigma * cp.sqrt(2) 

794 area = cp.pi * radius**2 

795 areas.append(float(area)) 

796 

797 intensity = float(image[int(y), int(x)]) 

798 intensities.append(intensity) 

799 

800 confidence = float(sigma / params["max_sigma"]) 

801 confidences.append(confidence) 

802 

803 filtered_data = _filter_by_area( 

804 positions, areas, intensities, confidences, 

805 params["min_cell_area"], params["max_cell_area"] 

806 ) 

807 

808 return CellCountResult( 

809 slice_index=slice_idx, 

810 method="blob_doh", 

811 cell_count=len(filtered_data[0]), 

812 cell_positions=filtered_data[0], 

813 cell_areas=filtered_data[1], 

814 cell_intensities=filtered_data[2], 

815 detection_confidence=filtered_data[3], 

816 parameters_used=params 

817 ) 

818 

819 

820def _filter_by_area( 

821 positions: List[Tuple[float, float]], 

822 areas: List[float], 

823 intensities: List[float], 

824 confidences: List[float], 

825 min_area: float, 

826 max_area: float 

827) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]: 

828 """Filter detected cells by area constraints.""" 

829 filtered_positions = [] 

830 filtered_areas = [] 

831 filtered_intensities = [] 

832 filtered_confidences = [] 

833 

834 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences): 

835 if min_area <= area <= max_area: 

836 filtered_positions.append(pos) 

837 filtered_areas.append(area) 

838 filtered_intensities.append(intensity) 

839 filtered_confidences.append(confidence) 

840 

841 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences 

842 

843 

844def _detect_cells_watershed(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

845 """Detect cells using watershed segmentation.""" 

846 # Determine threshold 

847 if params["watershed_threshold_method"] == "otsu": 

848 threshold_val = threshold_otsu(image) 

849 elif params["watershed_threshold_method"] == "li": 

850 threshold_val = threshold_li(image) 

851 else: 

852 threshold_val = float(params["watershed_threshold_method"]) 

853 

854 # Create binary mask 

855 binary = image > threshold_val 

856 

857 # Remove small objects and border objects 

858 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

859 if params["remove_border_cells"]: 

860 binary = clear_border(binary) 

861 

862 # Find local maxima as seeds 

863 distance = ndimage.distance_transform_edt(binary) 

864 local_maxima = peak_local_max( 

865 distance, 

866 min_distance=params["watershed_min_distance"], 

867 footprint=cp.ones((params["watershed_footprint_size"], params["watershed_footprint_size"])) 

868 ) 

869 

870 # Convert coordinates to binary mask 

871 local_maxima_mask = cp.zeros_like(distance, dtype=bool) 

872 if len(local_maxima) > 0: 

873 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True 

874 

875 # Create markers for watershed 

876 # Convert boolean mask to integer labels for connected components 

877 markers = label(local_maxima_mask.astype(cp.uint8)) 

878 

879 # Apply watershed 

880 labels = watershed(-distance, markers, mask=binary) 

881 

882 # Extract region properties 

883 regions = regionprops(labels, intensity_image=image) 

884 

885 positions = [] 

886 areas = [] 

887 intensities = [] 

888 confidences = [] 

889 valid_labels = [] # Track which labels pass the size filter 

890 

891 for region in regions: 

892 # Filter by area 

893 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

894 # Centroid (note: regionprops returns (row, col) = (y, x)) 

895 y, x = region.centroid 

896 positions.append((float(x), float(y))) 

897 

898 areas.append(float(region.area)) 

899 intensities.append(float(region.mean_intensity)) 

900 

901 # Use area as confidence measure (normalized) 

902 confidence = min(1.0, region.area / params["max_cell_area"]) 

903 confidences.append(confidence) 

904 

905 # Track this label as valid 

906 valid_labels.append(region.label) 

907 

908 # Create filtered binary mask with only cells that passed size filter 

909 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels)) 

910 

911 return CellCountResult( 

912 slice_index=slice_idx, 

913 method="watershed", 

914 cell_count=len(positions), 

915 cell_positions=positions, 

916 cell_areas=areas, 

917 cell_intensities=intensities, 

918 detection_confidence=confidences, 

919 parameters_used=params, 

920 binary_mask=filtered_binary_mask # Only cells that passed all filters 

921 ) 

922 

923 

924def _detect_cells_threshold(image: cp.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

925 """Detect cells using simple thresholding and connected components.""" 

926 # Apply threshold 

927 binary = image > params["threshold"] * image.max() 

928 

929 # Remove small objects and border objects 

930 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

931 if params["remove_border_cells"]: 

932 binary = clear_border(binary) 

933 

934 # Label connected components 

935 labels = label(binary) 

936 regions = regionprops(labels, intensity_image=image) 

937 

938 positions = [] 

939 areas = [] 

940 intensities = [] 

941 confidences = [] 

942 valid_labels = [] # Track which labels pass the size filter 

943 

944 for region in regions: 

945 # Filter by area 

946 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

947 y, x = region.centroid 

948 positions.append((float(x), float(y))) 

949 

950 areas.append(float(region.area)) 

951 intensities.append(float(region.mean_intensity)) 

952 

953 # Use intensity as confidence measure 

954 confidence = float(region.mean_intensity / image.max()) 

955 confidences.append(confidence) 

956 

957 # Track this label as valid 

958 valid_labels.append(region.label) 

959 

960 # Create filtered binary mask with only cells that passed size filter 

961 filtered_binary_mask = cp.isin(labels, cp.array(valid_labels)) 

962 

963 return CellCountResult( 

964 slice_index=slice_idx, 

965 method="threshold", 

966 cell_count=len(positions), 

967 cell_positions=positions, 

968 cell_areas=areas, 

969 cell_intensities=intensities, 

970 detection_confidence=confidences, 

971 parameters_used=params, 

972 binary_mask=filtered_binary_mask # Only cells that passed all filters 

973 ) 

974 

975 

976def _analyze_colocalization( 

977 chan_1_result: CellCountResult, 

978 chan_2_result: CellCountResult, 

979 method: str, 

980 max_distance: float, 

981 min_overlap_area: float, 

982 intensity_threshold: float 

983) -> MultiChannelResult: 

984 """Analyze colocalization between two channels.""" 

985 

986 if method == ColocalizationMethod.DISTANCE_BASED.value: 

987 return _colocalization_distance_based( 

988 chan_1_result, chan_2_result, max_distance 

989 ) 

990 elif method == ColocalizationMethod.OVERLAP_AREA.value: 

991 return _colocalization_overlap_based( 

992 chan_1_result, chan_2_result, min_overlap_area 

993 ) 

994 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value: 

995 return _colocalization_intensity_based( 

996 chan_1_result, chan_2_result, intensity_threshold 

997 ) 

998 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value: 

999 return _colocalization_manders( 

1000 chan_1_result, chan_2_result, intensity_threshold 

1001 ) 

1002 else: 

1003 raise ValueError(f"Unknown colocalization method: {method}") 

1004 

1005 

1006def _colocalization_distance_based( 

1007 chan_1_result: CellCountResult, 

1008 chan_2_result: CellCountResult, 

1009 max_distance: float 

1010) -> MultiChannelResult: 

1011 """Perform distance-based colocalization analysis.""" 

1012 

1013 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1014 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based") 

1015 

1016 # Convert positions to arrays 

1017 pos_1 = cp.array(chan_1_result.cell_positions) 

1018 pos_2 = cp.array(chan_2_result.cell_positions) 

1019 

1020 # Calculate pairwise distances 

1021 distances = cdist(pos_1, pos_2) 

1022 

1023 # Find colocalized pairs 

1024 colocalized_pairs = [] 

1025 used_chan_2 = set() 

1026 

1027 for i in range(len(pos_1)): 

1028 # Find closest cell in channel 2 

1029 min_dist_idx = cp.argmin(distances[i]) 

1030 min_dist = distances[i, min_dist_idx] 

1031 

1032 # Check if within distance threshold and not already used 

1033 if min_dist <= max_distance and min_dist_idx not in used_chan_2: 

1034 colocalized_pairs.append((i, min_dist_idx)) 

1035 used_chan_2.add(min_dist_idx) 

1036 

1037 # Calculate metrics 

1038 colocalized_count = len(colocalized_pairs) 

1039 total_cells = len(pos_1) + len(pos_2) 

1040 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1041 

1042 chan_1_only = len(pos_1) - colocalized_count 

1043 chan_2_only = len(pos_2) - colocalized_count 

1044 

1045 # Extract colocalized positions (average of paired positions) 

1046 overlap_positions = [] 

1047 for i, j in colocalized_pairs: 

1048 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist() 

1049 overlap_positions.append(tuple(avg_pos)) 

1050 

1051 # Calculate additional metrics 

1052 if colocalized_pairs: 

1053 avg_distance = cp.mean([distances[i, j] for i, j in colocalized_pairs]) 

1054 max_distance_found = cp.max([distances[i, j] for i, j in colocalized_pairs]) 

1055 else: 

1056 avg_distance = 0 

1057 max_distance_found = 0 

1058 

1059 metrics = { 

1060 "average_colocalization_distance": float(avg_distance), 

1061 "max_colocalization_distance": float(max_distance_found), 

1062 "distance_threshold_used": max_distance 

1063 } 

1064 

1065 return MultiChannelResult( 

1066 slice_index=chan_1_result.slice_index, 

1067 chan_1_results=chan_1_result, 

1068 chan_2_results=chan_2_result, 

1069 colocalization_method="distance_based", 

1070 colocalized_count=colocalized_count, 

1071 colocalization_percentage=colocalization_percentage, 

1072 chan_1_only_count=chan_1_only, 

1073 chan_2_only_count=chan_2_only, 

1074 colocalization_metrics=metrics, 

1075 overlap_positions=overlap_positions 

1076 ) 

1077 

1078 

1079def _create_empty_coloc_result( 

1080 chan_1_result: CellCountResult, 

1081 chan_2_result: CellCountResult, 

1082 method: str 

1083) -> MultiChannelResult: 

1084 """Create empty colocalization result when no cells found.""" 

1085 return MultiChannelResult( 

1086 slice_index=chan_1_result.slice_index, 

1087 chan_1_results=chan_1_result, 

1088 chan_2_results=chan_2_result, 

1089 colocalization_method=method, 

1090 colocalized_count=0, 

1091 colocalization_percentage=0.0, 

1092 chan_1_only_count=chan_1_result.cell_count, 

1093 chan_2_only_count=chan_2_result.cell_count, 

1094 colocalization_metrics={}, 

1095 overlap_positions=[] 

1096 ) 

1097 

1098 

1099def _colocalization_overlap_based( 

1100 chan_1_result: CellCountResult, 

1101 chan_2_result: CellCountResult, 

1102 min_overlap_area: float 

1103) -> MultiChannelResult: 

1104 """Perform area overlap-based colocalization analysis.""" 

1105 # This is a simplified implementation - in practice, you'd need actual segmentation masks 

1106 # For now, we'll use distance as a proxy for overlap 

1107 

1108 # Use distance-based method with smaller threshold as approximation 

1109 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly 

1110 

1111 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold) 

1112 result.colocalization_method = "overlap_area" 

1113 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area 

1114 result.colocalization_metrics["note"] = "Approximated using distance-based method" 

1115 

1116 return result 

1117 

1118 

1119def _colocalization_intensity_based( 

1120 chan_1_result: CellCountResult, 

1121 chan_2_result: CellCountResult, 

1122 intensity_threshold: float 

1123) -> MultiChannelResult: 

1124 """Perform intensity correlation-based colocalization analysis.""" 

1125 

1126 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1127 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation") 

1128 

1129 # Use distance-based pairing first 

1130 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0) 

1131 

1132 # Filter pairs based on intensity correlation 

1133 colocalized_pairs = [] 

1134 overlap_positions = [] 

1135 

1136 pos_1 = cp.array(chan_1_result.cell_positions) 

1137 pos_2 = cp.array(chan_2_result.cell_positions) 

1138 

1139 for i, (x1, y1) in enumerate(chan_1_result.cell_positions): 

1140 for j, (x2, y2) in enumerate(chan_2_result.cell_positions): 

1141 # Calculate distance 

1142 dist = cp.sqrt((x1 - x2)**2 + (y1 - y2)**2) 

1143 

1144 if dist <= 5.0: # Within reasonable distance 

1145 # Check intensity correlation 

1146 int_1 = chan_1_result.cell_intensities[i] 

1147 int_2 = chan_2_result.cell_intensities[j] 

1148 

1149 # Simple intensity correlation: both above threshold 

1150 if int_1 >= intensity_threshold and int_2 >= intensity_threshold: 

1151 colocalized_pairs.append((i, j)) 

1152 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2) 

1153 overlap_positions.append(avg_pos) 

1154 break # One-to-one mapping 

1155 

1156 colocalized_count = len(colocalized_pairs) 

1157 total_cells = len(pos_1) + len(pos_2) 

1158 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1159 

1160 metrics = { 

1161 "intensity_threshold_used": intensity_threshold, 

1162 "correlation_method": "threshold_based" 

1163 } 

1164 

1165 return MultiChannelResult( 

1166 slice_index=chan_1_result.slice_index, 

1167 chan_1_results=chan_1_result, 

1168 chan_2_results=chan_2_result, 

1169 colocalization_method="intensity_correlation", 

1170 colocalized_count=colocalized_count, 

1171 colocalization_percentage=colocalization_percentage, 

1172 chan_1_only_count=len(pos_1) - colocalized_count, 

1173 chan_2_only_count=len(pos_2) - colocalized_count, 

1174 colocalization_metrics=metrics, 

1175 overlap_positions=overlap_positions 

1176 ) 

1177 

1178 

1179def _colocalization_manders( 

1180 chan_1_result: CellCountResult, 

1181 chan_2_result: CellCountResult, 

1182 intensity_threshold: float 

1183) -> MultiChannelResult: 

1184 """Calculate Manders colocalization coefficients.""" 

1185 

1186 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1187 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients") 

1188 

1189 # Simplified Manders calculation based on detected cells 

1190 # In practice, this would use pixel-level intensity analysis 

1191 

1192 # Use intensity-based method as foundation 

1193 intensity_result = _colocalization_intensity_based( 

1194 chan_1_result, chan_2_result, intensity_threshold 

1195 ) 

1196 

1197 # Calculate Manders-like coefficients 

1198 total_int_1 = sum(chan_1_result.cell_intensities) 

1199 total_int_2 = sum(chan_2_result.cell_intensities) 

1200 

1201 # Simplified: assume colocalized cells contribute their full intensity 

1202 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in 

1203 [(i, j) for i in range(len(chan_1_result.cell_positions)) 

1204 for j in range(len(chan_2_result.cell_positions)) 

1205 if (i, j) in [(0, 0)]]) # Simplified placeholder 

1206 

1207 # Manders coefficients (M1 and M2) 

1208 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0 

1209 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified 

1210 

1211 intensity_result.colocalization_method = "manders_coefficients" 

1212 intensity_result.colocalization_metrics.update({ 

1213 "manders_m1": m1, 

1214 "manders_m2": m2, 

1215 "note": "Simplified cell-based Manders calculation" 

1216 }) 

1217 

1218 return intensity_result 

1219 

1220 

1221def _create_segmentation_visualization( 

1222 image: cp.ndarray, 

1223 positions: List[Tuple[float, float]], 

1224 max_sigma: float, 

1225 cell_areas: List[float] = None, 

1226 binary_mask: cp.ndarray = None 

1227) -> cp.ndarray: 

1228 """Create segmentation visualization using actual binary mask if available.""" 

1229 

1230 # If we have the actual binary mask from detection, use it directly 

1231 if binary_mask is not None: 

1232 # Convert boolean mask to uint16 to match input image dtype 

1233 # Use max intensity for detected cells, 0 for background 

1234 max_intensity = image.max() if image.max() > 0 else 65535 

1235 return (binary_mask * max_intensity).astype(image.dtype) 

1236 

1237 # Fallback to original circular marker approach for blob methods 

1238 visualization = image.copy() 

1239 

1240 # Mark detected cells with their actual sizes 

1241 for i, (x, y) in enumerate(positions): 

1242 # Use actual cell area if available, otherwise fall back to max_sigma 

1243 if cell_areas and i < len(cell_areas): 

1244 # Convert area to radius (assuming circular cells) 

1245 radius = cp.sqrt(cell_areas[i] / cp.pi) 

1246 else: 

1247 # Fallback to max_sigma for backward compatibility 

1248 radius = max_sigma * 2 

1249 

1250 # Create circular markers with actual cell size 

1251 rr, cc = cp.ogrid[:image.shape[0], :image.shape[1]] 

1252 mask = (rr - y)**2 + (cc - x)**2 <= radius**2 

1253 

1254 # Ensure indices are within bounds 

1255 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1]) 

1256 mask = mask & valid_mask 

1257 

1258 visualization[mask] = visualization.max() # Bright markers 

1259 

1260 return visualization 

1261 

1262 

1263def _create_colocalization_map( 

1264 chan_1_img: cp.ndarray, 

1265 chan_2_img: cp.ndarray, 

1266 coloc_result: MultiChannelResult 

1267) -> cp.ndarray: 

1268 """Create colocalization visualization map.""" 

1269 # Create RGB-like visualization 

1270 coloc_map = cp.zeros_like(chan_1_img) 

1271 

1272 # Mark colocalized positions 

1273 for x, y in coloc_result.overlap_positions: 

1274 # Create markers for colocalized cells 

1275 rr, cc = cp.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]] 

1276 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius 

1277 

1278 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1]) 

1279 mask = mask & valid_mask 

1280 

1281 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers 

1282 

1283 return coloc_map