Coverage for openhcs/processing/backends/analysis/cell_counting_cpu.py: 14.5%

446 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2CPU-based cell counting and multi-channel colocalization analysis for OpenHCS. 

3 

4This module provides comprehensive cell counting capabilities using scikit-image, 

5supporting both single-channel and multi-channel analysis with various detection 

6methods and colocalization metrics. 

7""" 

8 

9import numpy as np 

10import logging 

11from typing import Dict, List, Tuple, Any, Optional, Union 

12from dataclasses import dataclass 

13from enum import Enum 

14 

15logger = logging.getLogger(__name__) 

16 

17# Core scientific computing imports 

18import pandas as pd 

19import json 

20from scipy import ndimage 

21from scipy.spatial.distance import cdist 

22from skimage.feature import blob_log, blob_dog, blob_doh, peak_local_max 

23from skimage.filters import threshold_otsu, threshold_li, gaussian, median 

24from skimage.segmentation import watershed, clear_border 

25from skimage.morphology import remove_small_objects, disk, binary_erosion, binary_dilation 

26from skimage.measure import label, regionprops 

27from skimage import exposure 

28 

29# OpenHCS imports 

30from openhcs.core.memory.decorators import numpy as numpy_func 

31from openhcs.core.pipeline.function_contracts import special_outputs 

32from openhcs.constants.constants import Backend 

33 

34 

35class DetectionMethod(Enum): 

36 """Cell detection methods available.""" 

37 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells) 

38 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose) 

39 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells) 

40 WATERSHED = "watershed" # Watershed segmentation (best for touching cells) 

41 THRESHOLD = "threshold" # Simple thresholding (fastest, basic) 

42 

43 

44class ColocalizationMethod(Enum): 

45 """Methods for multi-channel colocalization analysis.""" 

46 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap 

47 DISTANCE_BASED = "distance_based" # Based on centroid distances 

48 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation 

49 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients 

50 

51 

52class ThresholdMethod(Enum): 

53 """Automatic thresholding methods for watershed segmentation.""" 

54 OTSU = "otsu" # Otsu's method (good for bimodal histograms) 

55 LI = "li" # Li's method (good for low contrast images) 

56 MANUAL = "manual" # Manual threshold value (0.0-1.0) 

57 

58 

59 

60 

61 

62@dataclass 

63class CellCountResult: 

64 """Results for single-channel cell counting.""" 

65 slice_index: int 

66 method: str 

67 cell_count: int 

68 cell_positions: List[Tuple[float, float]] # (x, y) centroids 

69 cell_areas: List[float] 

70 cell_intensities: List[float] 

71 detection_confidence: List[float] 

72 parameters_used: Dict[str, Any] 

73 binary_mask: Optional[np.ndarray] = None # Actual binary mask for segmentation methods 

74 

75 

76@dataclass 

77class MultiChannelResult: 

78 """Results for multi-channel cell counting and colocalization.""" 

79 slice_index: int 

80 chan_1_results: CellCountResult 

81 chan_2_results: CellCountResult 

82 colocalization_method: str 

83 colocalized_count: int 

84 colocalization_percentage: float 

85 chan_1_only_count: int 

86 chan_2_only_count: int 

87 colocalization_metrics: Dict[str, float] 

88 overlap_positions: List[Tuple[float, float]] 

89 

90 

91def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager) -> str: 

92 """Materialize cell counting results as analysis-ready CSV and JSON formats.""" 

93 

94 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}") 

95 

96 # Determine if this is single-channel or multi-channel data 

97 if not data: 

98 logger.warning(f"🔬 CELL_COUNT_MATERIALIZE: No data to materialize") 

99 return path 

100 

101 is_multi_channel = isinstance(data[0], MultiChannelResult) 

102 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}") 

103 

104 if is_multi_channel: 

105 return _materialize_multi_channel_results(data, path, filemanager) 

106 else: 

107 return _materialize_single_channel_results(data, path, filemanager) 

108 

109 

110def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager) -> str: 

111 """Materialize segmentation masks as individual TIFF files.""" 

112 

113 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}") 

114 

115 if not data: 

116 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)") 

117 # Create empty summary file to indicate no masks were generated 

118 summary_path = path.replace('.pkl', '_segmentation_summary.txt') 

119 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n" 

120 filemanager.save(summary_content, summary_path, Backend.DISK.value) 

121 return summary_path 

122 

123 # Generate output file paths based on the input path 

124 base_path = path.replace('.pkl', '') 

125 

126 # Save each mask as a separate TIFF file 

127 for i, mask in enumerate(data): 

128 mask_filename = f"{base_path}_slice_{i:03d}.tif" 

129 

130 # Convert mask to appropriate dtype for saving (uint16 to match input images) 

131 if mask.dtype != np.uint16: 

132 # Normalize to uint16 range if needed 

133 if mask.max() <= 1.0: 

134 mask_uint16 = (mask * 65535).astype(np.uint16) 

135 else: 

136 mask_uint16 = mask.astype(np.uint16) 

137 else: 

138 mask_uint16 = mask 

139 

140 # Save using filemanager 

141 filemanager.save(mask_uint16, mask_filename, Backend.DISK.value) 

142 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename}") 

143 

144 # Return summary path 

145 summary_path = f"{base_path}_segmentation_summary.txt" 

146 summary_content = f"Segmentation masks saved: {len(data)} files\n" 

147 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n" 

148 summary_content += f"Mask dtype: {data[0].dtype}\n" 

149 summary_content += f"Mask shape: {data[0].shape}\n" 

150 

151 filemanager.save(summary_content, summary_path, Backend.DISK.value) 

152 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks") 

153 

154 return summary_path 

155 

156 

157@numpy_func 

158@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks)) 

159def count_cells_single_channel( 

160 image_stack: np.ndarray, 

161 # Detection method and parameters 

162 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

163 # Blob detection parameters 

164 min_sigma: float = 1.0, # Minimum blob size (pixels) 

165 max_sigma: float = 10.0, # Maximum blob size (pixels) 

166 num_sigma: int = 10, # Number of sigma values to test 

167 threshold: float = 0.1, # Detection threshold (0.0-1.0) 

168 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

169 # Watershed parameters 

170 watershed_footprint_size: int = 3, # Local maxima footprint size 

171 watershed_min_distance: int = 5, # Minimum distance between peaks 

172 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods 

173 # Preprocessing parameters 

174 enable_preprocessing: bool = True, 

175 gaussian_sigma: float = 1.0, # Gaussian blur sigma 

176 median_disk_size: int = 1, # Median filter disk size 

177 # Filtering parameters 

178 min_cell_area: int = 10, # Minimum cell area (pixels) 

179 max_cell_area: int = 1000, # Maximum cell area (pixels) 

180 remove_border_cells: bool = True, # Remove cells touching image border 

181 # Output parameters 

182 return_segmentation_mask: bool = False 

183) -> Tuple[np.ndarray, List[CellCountResult]]: 

184 """ 

185 Count cells in single-channel image stack using various detection methods. 

186  

187 Args: 

188 image_stack: 3D array (Z, Y, X) where each Z slice is processed independently 

189 detection_method: Method for cell detection (see DetectionMethod enum) 

190 min_sigma: Minimum blob size for blob detection methods 

191 max_sigma: Maximum blob size for blob detection methods 

192 num_sigma: Number of sigma values to test for blob detection 

193 threshold: Detection threshold (method-dependent) 

194 overlap: Maximum overlap between detected blobs 

195 watershed_footprint_size: Footprint size for local maxima detection 

196 watershed_min_distance: Minimum distance between watershed peaks 

197 watershed_threshold_method: Thresholding method for watershed 

198 enable_preprocessing: Apply Gaussian and median filtering 

199 gaussian_sigma: Standard deviation for Gaussian blur 

200 median_disk_size: Disk size for median filtering 

201 min_cell_area: Minimum area for valid cells 

202 max_cell_area: Maximum area for valid cells 

203 remove_border_cells: Remove cells touching image borders 

204 return_segmentation_mask: Return segmentation masks in output 

205  

206 Returns: 

207 output_stack: Original image stack unchanged (Z, Y, X) 

208 cell_count_results: List of CellCountResult objects for each slice 

209 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True 

210 """ 

211 if image_stack.ndim != 3: 

212 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

213 

214 results = [] 

215 segmentation_masks = [] 

216 

217 # Store parameters for reproducibility (convert enums to values) 

218 parameters = { 

219 "detection_method": detection_method.value, 

220 "min_sigma": min_sigma, 

221 "max_sigma": max_sigma, 

222 "num_sigma": num_sigma, 

223 "threshold": threshold, 

224 "overlap": overlap, 

225 "watershed_footprint_size": watershed_footprint_size, 

226 "watershed_min_distance": watershed_min_distance, 

227 "watershed_threshold_method": watershed_threshold_method.value, 

228 "gaussian_sigma": gaussian_sigma, 

229 "median_disk_size": median_disk_size, 

230 "min_cell_area": min_cell_area, 

231 "max_cell_area": max_cell_area, 

232 "remove_border_cells": remove_border_cells 

233 } 

234 

235 logging.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method") 

236 

237 for z_idx in range(image_stack.shape[0]): 

238 slice_img = image_stack[z_idx].astype(np.float64) 

239 

240 # Apply preprocessing if enabled 

241 if enable_preprocessing: 

242 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size) 

243 

244 # Detect cells using specified method 

245 result = _detect_cells_single_method( 

246 slice_img, z_idx, detection_method.value, parameters 

247 ) 

248 

249 results.append(result) 

250 

251 # Create segmentation mask if requested 

252 if return_segmentation_mask: 

253 segmentation_mask = _create_segmentation_visualization( 

254 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask 

255 ) 

256 segmentation_masks.append(segmentation_mask) 

257 

258 # Always return segmentation masks (empty list if not requested) 

259 # This ensures consistent return signature for special outputs system 

260 return image_stack, results, segmentation_masks 

261 

262 

263@numpy_func 

264@special_outputs(("multi_channel_counts", materialize_cell_counts)) 

265def count_cells_multi_channel( 

266 image_stack: np.ndarray, 

267 chan_1: int, # Index of first channel (positional arg) 

268 chan_2: int, # Index of second channel (positional arg) 

269 # Detection parameters for channel 1 (all single-channel params available) 

270 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

271 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels) 

272 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels) 

273 chan_1_num_sigma: int = 10, # Number of sigma values to test 

274 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

275 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

276 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size 

277 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks 

278 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

279 chan_1_enable_preprocessing: bool = True, # Apply preprocessing 

280 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

281 chan_1_median_disk_size: int = 1, # Median filter disk size 

282 chan_1_min_area: int = 10, # Minimum cell area (pixels) 

283 chan_1_max_area: int = 1000, # Maximum cell area (pixels) 

284 chan_1_remove_border_cells: bool = True, # Remove cells touching border 

285 # Detection parameters for channel 2 (all single-channel params available) 

286 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

287 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels) 

288 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels) 

289 chan_2_num_sigma: int = 10, # Number of sigma values to test 

290 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

291 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

292 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size 

293 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks 

294 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

295 chan_2_enable_preprocessing: bool = True, # Apply preprocessing 

296 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

297 chan_2_median_disk_size: int = 1, # Median filter disk size 

298 chan_2_min_area: int = 10, # Minimum cell area (pixels) 

299 chan_2_max_area: int = 1000, # Maximum cell area (pixels) 

300 chan_2_remove_border_cells: bool = True, # Remove cells touching border 

301 # Colocalization parameters 

302 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods 

303 max_distance: float = 5.0, # Maximum distance for colocalization (pixels) 

304 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method 

305 intensity_threshold: float = 0.5, # Threshold for intensity-based methods 

306 # Output parameters 

307 return_colocalization_map: bool = False 

308) -> Tuple[np.ndarray, List[MultiChannelResult]]: 

309 """ 

310 Count cells in multi-channel image stack with colocalization analysis. 

311 

312 Each channel can be processed with independent parameters, providing the same 

313 flexibility as the single-channel function for each channel separately. 

314 

315 Args: 

316 image_stack: 3D array (Z, Y, X) where Z represents different channels 

317 chan_1: Index of first channel in the stack (positional) 

318 chan_2: Index of second channel in the stack (positional) 

319 

320 # Channel 1 detection parameters (same as single-channel function) 

321 chan_1_method: Detection method for channel 1 (DetectionMethod enum) 

322 chan_1_min_sigma: Minimum blob size for channel 1 

323 chan_1_max_sigma: Maximum blob size for channel 1 

324 chan_1_num_sigma: Number of sigma values to test for channel 1 

325 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0) 

326 chan_1_overlap: Maximum overlap between blobs for channel 1 

327 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1 

328 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1 

329 chan_1_watershed_threshold_method: Thresholding method for channel 1 

330 chan_1_enable_preprocessing: Apply preprocessing to channel 1 

331 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1 

332 chan_1_median_disk_size: Median filter size for channel 1 

333 chan_1_min_area: Minimum cell area for channel 1 

334 chan_1_max_area: Maximum cell area for channel 1 

335 chan_1_remove_border_cells: Remove border cells for channel 1 

336 

337 # Channel 2 detection parameters (same as single-channel function) 

338 chan_2_method: Detection method for channel 2 (DetectionMethod enum) 

339 chan_2_min_sigma: Minimum blob size for channel 2 

340 chan_2_max_sigma: Maximum blob size for channel 2 

341 chan_2_num_sigma: Number of sigma values to test for channel 2 

342 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0) 

343 chan_2_overlap: Maximum overlap between blobs for channel 2 

344 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2 

345 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2 

346 chan_2_watershed_threshold_method: Thresholding method for channel 2 

347 chan_2_enable_preprocessing: Apply preprocessing to channel 2 

348 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2 

349 chan_2_median_disk_size: Median filter size for channel 2 

350 chan_2_min_area: Minimum cell area for channel 2 

351 chan_2_max_area: Maximum cell area for channel 2 

352 chan_2_remove_border_cells: Remove border cells for channel 2 

353 

354 # Colocalization parameters 

355 colocalization_method: Method for determining colocalization (ColocalizationMethod enum) 

356 max_distance: Maximum distance for distance-based colocalization (pixels) 

357 min_overlap_area: Minimum overlap fraction for area-based colocalization 

358 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0) 

359 return_colocalization_map: Return colocalization visualization 

360 

361 Returns: 

362 output_stack: Original images or colocalization maps 

363 multi_channel_results: List of MultiChannelResult objects 

364 """ 

365 if image_stack.ndim != 3: 

366 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

367 

368 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]: 

369 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}") 

370 

371 if chan_1 == chan_2: 

372 raise ValueError("Channel 1 and Channel 2 must be different") 

373 

374 # Extract channel images 

375 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency 

376 chan_2_img = image_stack[chan_2:chan_2+1] 

377 

378 # Count cells in each channel separately using the single-channel function 

379 # Channel 1 parameters (all explicit) 

380 chan_1_params = { 

381 "detection_method": chan_1_method, 

382 "min_sigma": chan_1_min_sigma, 

383 "max_sigma": chan_1_max_sigma, 

384 "num_sigma": chan_1_num_sigma, 

385 "threshold": chan_1_threshold, 

386 "overlap": chan_1_overlap, 

387 "watershed_footprint_size": chan_1_watershed_footprint_size, 

388 "watershed_min_distance": chan_1_watershed_min_distance, 

389 "watershed_threshold_method": chan_1_watershed_threshold_method, 

390 "enable_preprocessing": chan_1_enable_preprocessing, 

391 "gaussian_sigma": chan_1_gaussian_sigma, 

392 "median_disk_size": chan_1_median_disk_size, 

393 "min_cell_area": chan_1_min_area, 

394 "max_cell_area": chan_1_max_area, 

395 "remove_border_cells": chan_1_remove_border_cells, 

396 "return_segmentation_mask": False 

397 } 

398 

399 # Channel 2 parameters (all explicit) 

400 chan_2_params = { 

401 "detection_method": chan_2_method, 

402 "min_sigma": chan_2_min_sigma, 

403 "max_sigma": chan_2_max_sigma, 

404 "num_sigma": chan_2_num_sigma, 

405 "threshold": chan_2_threshold, 

406 "overlap": chan_2_overlap, 

407 "watershed_footprint_size": chan_2_watershed_footprint_size, 

408 "watershed_min_distance": chan_2_watershed_min_distance, 

409 "watershed_threshold_method": chan_2_watershed_threshold_method, 

410 "enable_preprocessing": chan_2_enable_preprocessing, 

411 "gaussian_sigma": chan_2_gaussian_sigma, 

412 "median_disk_size": chan_2_median_disk_size, 

413 "min_cell_area": chan_2_min_area, 

414 "max_cell_area": chan_2_max_area, 

415 "remove_border_cells": chan_2_remove_border_cells, 

416 "return_segmentation_mask": False 

417 } 

418 

419 # Process each channel 

420 _, chan_1_results = count_cells_single_channel(chan_1_img, **chan_1_params) 

421 _, chan_2_results = count_cells_single_channel(chan_2_img, **chan_2_params) 

422 

423 # Perform colocalization analysis 

424 multi_results = [] 

425 output_stack = image_stack.copy() 

426 

427 # Since we're processing single slices from each channel, we only have one result each 

428 chan_1_result = chan_1_results[0] 

429 chan_2_result = chan_2_results[0] 

430 

431 # Analyze colocalization 

432 coloc_result = _analyze_colocalization( 

433 chan_1_result, chan_2_result, colocalization_method.value, 

434 max_distance, min_overlap_area, intensity_threshold 

435 ) 

436 

437 multi_results.append(coloc_result) 

438 

439 # Create colocalization visualization if requested 

440 if return_colocalization_map: 

441 coloc_map = _create_colocalization_map( 

442 image_stack[chan_1], image_stack[chan_2], coloc_result 

443 ) 

444 # Replace one of the channels with the colocalization map 

445 output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map]) 

446 

447 return output_stack, multi_results 

448 

449 

450def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager) -> str: 

451 """Materialize single-channel cell counting results.""" 

452 # Generate output file paths based on the input path 

453 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

454 base_path = path.replace('.pkl', '') 

455 json_path = f"{base_path}.json" 

456 csv_path = f"{base_path}_details.csv" 

457 

458 # Ensure output directory exists for disk backend 

459 from pathlib import Path 

460 from openhcs.constants.constants import Backend 

461 output_dir = Path(json_path).parent 

462 filemanager.ensure_directory(str(output_dir), Backend.DISK.value) 

463 

464 summary = { 

465 "analysis_type": "single_channel_cell_counting", 

466 "total_slices": len(data), 

467 "results_per_slice": [] 

468 } 

469 rows = [] 

470 

471 total_cells = 0 

472 for result in data: 

473 total_cells += result.cell_count 

474 

475 # Add to summary 

476 summary["results_per_slice"].append({ 

477 "slice_index": result.slice_index, 

478 "method": result.method, 

479 "cell_count": result.cell_count, 

480 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0, 

481 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0, 

482 "parameters": result.parameters_used 

483 }) 

484 

485 # Add individual cell data to CSV 

486 for i, (pos, area, intensity, confidence) in enumerate(zip( 

487 result.cell_positions, result.cell_areas, 

488 result.cell_intensities, result.detection_confidence 

489 )): 

490 rows.append({ 

491 'slice_index': result.slice_index, 

492 'cell_id': f"slice_{result.slice_index}_cell_{i}", 

493 'x_position': pos[0], 

494 'y_position': pos[1], 

495 'cell_area': area, 

496 'cell_intensity': intensity, 

497 'detection_confidence': confidence, 

498 'detection_method': result.method 

499 }) 

500 

501 summary["total_cells_all_slices"] = total_cells 

502 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0 

503 

504 # Save JSON summary (overwrite if exists) 

505 json_content = json.dumps(summary, indent=2, default=str) 

506 # Remove existing file if it exists using filemanager 

507 if filemanager.exists(json_path, Backend.DISK.value): 

508 filemanager.delete(json_path, Backend.DISK.value) 

509 filemanager.save(json_content, json_path, Backend.DISK.value) 

510 

511 # Save CSV details (overwrite if exists) 

512 if rows: 

513 df = pd.DataFrame(rows) 

514 csv_content = df.to_csv(index=False) 

515 # Remove existing file if it exists using filemanager 

516 if filemanager.exists(csv_path, Backend.DISK.value): 

517 filemanager.delete(csv_path, Backend.DISK.value) 

518 filemanager.save(csv_content, csv_path, Backend.DISK.value) 

519 

520 return json_path 

521 

522 

523def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager) -> str: 

524 """Materialize multi-channel cell counting and colocalization results.""" 

525 # Generate output file paths based on the input path 

526 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

527 base_path = path.replace('.pkl', '') 

528 json_path = f"{base_path}.json" 

529 csv_path = f"{base_path}_details.csv" 

530 

531 # Ensure output directory exists for disk backend 

532 from pathlib import Path 

533 output_dir = Path(json_path).parent 

534 filemanager.ensure_directory(str(output_dir), Backend.DISK.value) 

535 

536 summary = { 

537 "analysis_type": "multi_channel_cell_counting_colocalization", 

538 "total_slices": len(data), 

539 "colocalization_summary": { 

540 "total_chan_1_cells": 0, 

541 "total_chan_2_cells": 0, 

542 "total_colocalized": 0, 

543 "average_colocalization_percentage": 0 

544 }, 

545 "results_per_slice": [] 

546 } 

547 

548 # CSV for detailed analysis (csv_path already defined above) 

549 rows = [] 

550 

551 total_coloc_pct = 0 

552 for result in data: 

553 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count 

554 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count 

555 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count 

556 total_coloc_pct += result.colocalization_percentage 

557 

558 # Add to summary 

559 summary["results_per_slice"].append({ 

560 "slice_index": result.slice_index, 

561 "chan_1_count": result.chan_1_results.cell_count, 

562 "chan_2_count": result.chan_2_results.cell_count, 

563 "colocalized_count": result.colocalized_count, 

564 "colocalization_percentage": result.colocalization_percentage, 

565 "chan_1_only": result.chan_1_only_count, 

566 "chan_2_only": result.chan_2_only_count, 

567 "colocalization_method": result.colocalization_method, 

568 "colocalization_metrics": result.colocalization_metrics 

569 }) 

570 

571 # Add colocalization details to CSV 

572 for i, pos in enumerate(result.overlap_positions): 

573 rows.append({ 

574 'slice_index': result.slice_index, 

575 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}", 

576 'x_position': pos[0], 

577 'y_position': pos[1], 

578 'colocalization_method': result.colocalization_method 

579 }) 

580 

581 summary["colocalization_summary"]["average_colocalization_percentage"] = ( 

582 total_coloc_pct / len(data) if data else 0 

583 ) 

584 

585 # Save JSON summary (overwrite if exists) 

586 json_content = json.dumps(summary, indent=2, default=str) 

587 # Remove existing file if it exists using filemanager 

588 if filemanager.exists(json_path, Backend.DISK.value): 

589 filemanager.delete(json_path, Backend.DISK.value) 

590 filemanager.save(json_content, json_path, Backend.DISK.value) 

591 

592 # Save CSV details (overwrite if exists) 

593 if rows: 

594 df = pd.DataFrame(rows) 

595 csv_content = df.to_csv(index=False) 

596 # Remove existing file if it exists using filemanager 

597 if filemanager.exists(csv_path, Backend.DISK.value): 

598 filemanager.delete(csv_path, Backend.DISK.value) 

599 filemanager.save(csv_content, csv_path, Backend.DISK.value) 

600 

601 return json_path 

602 

603 

604def _preprocess_image(image: np.ndarray, gaussian_sigma: float, median_disk_size: int) -> np.ndarray: 

605 """Apply preprocessing to enhance cell detection.""" 

606 # Gaussian blur to reduce noise 

607 if gaussian_sigma > 0: 

608 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True) 

609 

610 # Median filter to remove salt-and-pepper noise 

611 if median_disk_size > 0: 

612 image = median(image, disk(median_disk_size)) 

613 

614 return image 

615 

616 

617def _detect_cells_single_method( 

618 image: np.ndarray, 

619 slice_idx: int, 

620 method: str, 

621 params: Dict[str, Any] 

622) -> CellCountResult: 

623 """Detect cells using specified method.""" 

624 

625 if method == DetectionMethod.BLOB_LOG.value: 

626 return _detect_cells_blob_log(image, slice_idx, params) 

627 elif method == DetectionMethod.BLOB_DOG.value: 

628 return _detect_cells_blob_dog(image, slice_idx, params) 

629 elif method == DetectionMethod.BLOB_DOH.value: 

630 return _detect_cells_blob_doh(image, slice_idx, params) 

631 elif method == DetectionMethod.WATERSHED.value: 

632 return _detect_cells_watershed(image, slice_idx, params) 

633 elif method == DetectionMethod.THRESHOLD.value: 

634 return _detect_cells_threshold(image, slice_idx, params) 

635 else: 

636 raise ValueError(f"Unknown detection method: {method}") 

637 

638 

639def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

640 """Detect cells using Laplacian of Gaussian blob detection.""" 

641 blobs = blob_log( 

642 image, 

643 min_sigma=params["min_sigma"], 

644 max_sigma=params["max_sigma"], 

645 num_sigma=params["num_sigma"], 

646 threshold=params["threshold"], 

647 overlap=params["overlap"] 

648 ) 

649 

650 # Extract positions, areas, and intensities 

651 positions = [] 

652 areas = [] 

653 intensities = [] 

654 confidences = [] 

655 

656 for blob in blobs: 

657 y, x, sigma = blob 

658 positions.append((float(x), float(y))) 

659 

660 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2)) 

661 radius = sigma * np.sqrt(2) 

662 area = np.pi * radius**2 

663 areas.append(float(area)) 

664 

665 # Sample intensity at blob center 

666 intensity = float(image[int(y), int(x)]) 

667 intensities.append(intensity) 

668 

669 # Use sigma as confidence measure (larger blobs = higher confidence) 

670 confidence = float(sigma / params["max_sigma"]) 

671 confidences.append(confidence) 

672 

673 # Filter by area constraints 

674 filtered_data = _filter_by_area( 

675 positions, areas, intensities, confidences, 

676 params["min_cell_area"], params["max_cell_area"] 

677 ) 

678 

679 return CellCountResult( 

680 slice_index=slice_idx, 

681 method="blob_log", 

682 cell_count=len(filtered_data[0]), 

683 cell_positions=filtered_data[0], 

684 cell_areas=filtered_data[1], 

685 cell_intensities=filtered_data[2], 

686 detection_confidence=filtered_data[3], 

687 parameters_used=params 

688 ) 

689 

690 

691def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

692 """Detect cells using Difference of Gaussian blob detection.""" 

693 blobs = blob_dog( 

694 image, 

695 min_sigma=params["min_sigma"], 

696 max_sigma=params["max_sigma"], 

697 threshold=params["threshold"], 

698 overlap=params["overlap"] 

699 ) 

700 

701 # Process similar to blob_log 

702 positions = [] 

703 areas = [] 

704 intensities = [] 

705 confidences = [] 

706 

707 for blob in blobs: 

708 y, x, sigma = blob 

709 positions.append((float(x), float(y))) 

710 

711 radius = sigma * np.sqrt(2) 

712 area = np.pi * radius**2 

713 areas.append(float(area)) 

714 

715 intensity = float(image[int(y), int(x)]) 

716 intensities.append(intensity) 

717 

718 confidence = float(sigma / params["max_sigma"]) 

719 confidences.append(confidence) 

720 

721 filtered_data = _filter_by_area( 

722 positions, areas, intensities, confidences, 

723 params["min_cell_area"], params["max_cell_area"] 

724 ) 

725 

726 return CellCountResult( 

727 slice_index=slice_idx, 

728 method="blob_dog", 

729 cell_count=len(filtered_data[0]), 

730 cell_positions=filtered_data[0], 

731 cell_areas=filtered_data[1], 

732 cell_intensities=filtered_data[2], 

733 detection_confidence=filtered_data[3], 

734 parameters_used=params 

735 ) 

736 

737 

738def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

739 """Detect cells using Determinant of Hessian blob detection.""" 

740 blobs = blob_doh( 

741 image, 

742 min_sigma=params["min_sigma"], 

743 max_sigma=params["max_sigma"], 

744 num_sigma=params["num_sigma"], 

745 threshold=params["threshold"], 

746 overlap=params["overlap"] 

747 ) 

748 

749 # Process similar to other blob methods 

750 positions = [] 

751 areas = [] 

752 intensities = [] 

753 confidences = [] 

754 

755 for blob in blobs: 

756 y, x, sigma = blob 

757 positions.append((float(x), float(y))) 

758 

759 radius = sigma * np.sqrt(2) 

760 area = np.pi * radius**2 

761 areas.append(float(area)) 

762 

763 intensity = float(image[int(y), int(x)]) 

764 intensities.append(intensity) 

765 

766 confidence = float(sigma / params["max_sigma"]) 

767 confidences.append(confidence) 

768 

769 filtered_data = _filter_by_area( 

770 positions, areas, intensities, confidences, 

771 params["min_cell_area"], params["max_cell_area"] 

772 ) 

773 

774 return CellCountResult( 

775 slice_index=slice_idx, 

776 method="blob_doh", 

777 cell_count=len(filtered_data[0]), 

778 cell_positions=filtered_data[0], 

779 cell_areas=filtered_data[1], 

780 cell_intensities=filtered_data[2], 

781 detection_confidence=filtered_data[3], 

782 parameters_used=params 

783 ) 

784 

785 

786def _filter_by_area( 

787 positions: List[Tuple[float, float]], 

788 areas: List[float], 

789 intensities: List[float], 

790 confidences: List[float], 

791 min_area: float, 

792 max_area: float 

793) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]: 

794 """Filter detected cells by area constraints.""" 

795 filtered_positions = [] 

796 filtered_areas = [] 

797 filtered_intensities = [] 

798 filtered_confidences = [] 

799 

800 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences): 

801 if min_area <= area <= max_area: 

802 filtered_positions.append(pos) 

803 filtered_areas.append(area) 

804 filtered_intensities.append(intensity) 

805 filtered_confidences.append(confidence) 

806 

807 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences 

808 

809 

810def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

811 """Detect cells using watershed segmentation.""" 

812 # Determine threshold 

813 if params["watershed_threshold_method"] == "otsu": 

814 threshold_val = threshold_otsu(image) 

815 elif params["watershed_threshold_method"] == "li": 

816 threshold_val = threshold_li(image) 

817 else: 

818 threshold_val = float(params["watershed_threshold_method"]) 

819 

820 # Create binary mask 

821 binary = image > threshold_val 

822 

823 # Remove small objects and border objects 

824 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

825 if params["remove_border_cells"]: 

826 binary = clear_border(binary) 

827 

828 # Find local maxima as seeds 

829 distance = ndimage.distance_transform_edt(binary) 

830 local_maxima = peak_local_max( 

831 distance, 

832 min_distance=params["watershed_min_distance"], 

833 footprint=np.ones((params["watershed_footprint_size"], params["watershed_footprint_size"])) 

834 ) 

835 

836 # Convert coordinates to binary mask 

837 local_maxima_mask = np.zeros_like(distance, dtype=bool) 

838 if len(local_maxima) > 0: 

839 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True 

840 

841 # Create markers for watershed 

842 # Convert boolean mask to integer labels for connected components 

843 markers = label(local_maxima_mask.astype(np.uint8)) 

844 

845 # Apply watershed 

846 labels = watershed(-distance, markers, mask=binary) 

847 

848 # Extract region properties 

849 regions = regionprops(labels, intensity_image=image) 

850 

851 positions = [] 

852 areas = [] 

853 intensities = [] 

854 confidences = [] 

855 valid_labels = [] # Track which labels pass the size filter 

856 

857 for region in regions: 

858 # Filter by area 

859 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

860 # Centroid (note: regionprops returns (row, col) = (y, x)) 

861 y, x = region.centroid 

862 positions.append((float(x), float(y))) 

863 

864 areas.append(float(region.area)) 

865 intensities.append(float(region.mean_intensity)) 

866 

867 # Use area as confidence measure (normalized) 

868 confidence = min(1.0, region.area / params["max_cell_area"]) 

869 confidences.append(confidence) 

870 

871 # Track this label as valid 

872 valid_labels.append(region.label) 

873 

874 # Create filtered binary mask with only cells that passed size filter 

875 filtered_binary_mask = np.isin(labels, valid_labels) 

876 

877 return CellCountResult( 

878 slice_index=slice_idx, 

879 method="watershed", 

880 cell_count=len(positions), 

881 cell_positions=positions, 

882 cell_areas=areas, 

883 cell_intensities=intensities, 

884 detection_confidence=confidences, 

885 parameters_used=params, 

886 binary_mask=filtered_binary_mask # Only cells that passed all filters 

887 ) 

888 

889 

890def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

891 """Detect cells using simple thresholding and connected components.""" 

892 # Apply threshold 

893 binary = image > params["threshold"] * image.max() 

894 

895 # Remove small objects and border objects 

896 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

897 if params["remove_border_cells"]: 

898 binary = clear_border(binary) 

899 

900 # Label connected components 

901 labels = label(binary) 

902 regions = regionprops(labels, intensity_image=image) 

903 

904 positions = [] 

905 areas = [] 

906 intensities = [] 

907 confidences = [] 

908 valid_labels = [] # Track which labels pass the size filter 

909 

910 for region in regions: 

911 # Filter by area 

912 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

913 y, x = region.centroid 

914 positions.append((float(x), float(y))) 

915 

916 areas.append(float(region.area)) 

917 intensities.append(float(region.mean_intensity)) 

918 

919 # Use intensity as confidence measure 

920 confidence = float(region.mean_intensity / image.max()) 

921 confidences.append(confidence) 

922 

923 # Track this label as valid 

924 valid_labels.append(region.label) 

925 

926 # Create filtered binary mask with only cells that passed size filter 

927 filtered_binary_mask = np.isin(labels, valid_labels) 

928 

929 return CellCountResult( 

930 slice_index=slice_idx, 

931 method="threshold", 

932 cell_count=len(positions), 

933 cell_positions=positions, 

934 cell_areas=areas, 

935 cell_intensities=intensities, 

936 detection_confidence=confidences, 

937 parameters_used=params, 

938 binary_mask=filtered_binary_mask # Only cells that passed all filters 

939 ) 

940 

941 

942def _analyze_colocalization( 

943 chan_1_result: CellCountResult, 

944 chan_2_result: CellCountResult, 

945 method: str, 

946 max_distance: float, 

947 min_overlap_area: float, 

948 intensity_threshold: float 

949) -> MultiChannelResult: 

950 """Analyze colocalization between two channels.""" 

951 

952 if method == ColocalizationMethod.DISTANCE_BASED.value: 

953 return _colocalization_distance_based( 

954 chan_1_result, chan_2_result, max_distance 

955 ) 

956 elif method == ColocalizationMethod.OVERLAP_AREA.value: 

957 return _colocalization_overlap_based( 

958 chan_1_result, chan_2_result, min_overlap_area 

959 ) 

960 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value: 

961 return _colocalization_intensity_based( 

962 chan_1_result, chan_2_result, intensity_threshold 

963 ) 

964 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value: 

965 return _colocalization_manders( 

966 chan_1_result, chan_2_result, intensity_threshold 

967 ) 

968 else: 

969 raise ValueError(f"Unknown colocalization method: {method}") 

970 

971 

972def _colocalization_distance_based( 

973 chan_1_result: CellCountResult, 

974 chan_2_result: CellCountResult, 

975 max_distance: float 

976) -> MultiChannelResult: 

977 """Perform distance-based colocalization analysis.""" 

978 

979 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

980 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based") 

981 

982 # Convert positions to arrays 

983 pos_1 = np.array(chan_1_result.cell_positions) 

984 pos_2 = np.array(chan_2_result.cell_positions) 

985 

986 # Calculate pairwise distances 

987 distances = cdist(pos_1, pos_2) 

988 

989 # Find colocalized pairs 

990 colocalized_pairs = [] 

991 used_chan_2 = set() 

992 

993 for i in range(len(pos_1)): 

994 # Find closest cell in channel 2 

995 min_dist_idx = np.argmin(distances[i]) 

996 min_dist = distances[i, min_dist_idx] 

997 

998 # Check if within distance threshold and not already used 

999 if min_dist <= max_distance and min_dist_idx not in used_chan_2: 

1000 colocalized_pairs.append((i, min_dist_idx)) 

1001 used_chan_2.add(min_dist_idx) 

1002 

1003 # Calculate metrics 

1004 colocalized_count = len(colocalized_pairs) 

1005 total_cells = len(pos_1) + len(pos_2) 

1006 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1007 

1008 chan_1_only = len(pos_1) - colocalized_count 

1009 chan_2_only = len(pos_2) - colocalized_count 

1010 

1011 # Extract colocalized positions (average of paired positions) 

1012 overlap_positions = [] 

1013 for i, j in colocalized_pairs: 

1014 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist() 

1015 overlap_positions.append(tuple(avg_pos)) 

1016 

1017 # Calculate additional metrics 

1018 if colocalized_pairs: 

1019 avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs]) 

1020 max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs]) 

1021 else: 

1022 avg_distance = 0 

1023 max_distance_found = 0 

1024 

1025 metrics = { 

1026 "average_colocalization_distance": float(avg_distance), 

1027 "max_colocalization_distance": float(max_distance_found), 

1028 "distance_threshold_used": max_distance 

1029 } 

1030 

1031 return MultiChannelResult( 

1032 slice_index=chan_1_result.slice_index, 

1033 chan_1_results=chan_1_result, 

1034 chan_2_results=chan_2_result, 

1035 colocalization_method="distance_based", 

1036 colocalized_count=colocalized_count, 

1037 colocalization_percentage=colocalization_percentage, 

1038 chan_1_only_count=chan_1_only, 

1039 chan_2_only_count=chan_2_only, 

1040 colocalization_metrics=metrics, 

1041 overlap_positions=overlap_positions 

1042 ) 

1043 

1044 

1045def _create_empty_coloc_result( 

1046 chan_1_result: CellCountResult, 

1047 chan_2_result: CellCountResult, 

1048 method: str 

1049) -> MultiChannelResult: 

1050 """Create empty colocalization result when no cells found.""" 

1051 return MultiChannelResult( 

1052 slice_index=chan_1_result.slice_index, 

1053 chan_1_results=chan_1_result, 

1054 chan_2_results=chan_2_result, 

1055 colocalization_method=method, 

1056 colocalized_count=0, 

1057 colocalization_percentage=0.0, 

1058 chan_1_only_count=chan_1_result.cell_count, 

1059 chan_2_only_count=chan_2_result.cell_count, 

1060 colocalization_metrics={}, 

1061 overlap_positions=[] 

1062 ) 

1063 

1064 

1065def _colocalization_overlap_based( 

1066 chan_1_result: CellCountResult, 

1067 chan_2_result: CellCountResult, 

1068 min_overlap_area: float 

1069) -> MultiChannelResult: 

1070 """Perform area overlap-based colocalization analysis.""" 

1071 # This is a simplified implementation - in practice, you'd need actual segmentation masks 

1072 # For now, we'll use distance as a proxy for overlap 

1073 

1074 # Use distance-based method with smaller threshold as approximation 

1075 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly 

1076 

1077 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold) 

1078 result.colocalization_method = "overlap_area" 

1079 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area 

1080 result.colocalization_metrics["note"] = "Approximated using distance-based method" 

1081 

1082 return result 

1083 

1084 

1085def _colocalization_intensity_based( 

1086 chan_1_result: CellCountResult, 

1087 chan_2_result: CellCountResult, 

1088 intensity_threshold: float 

1089) -> MultiChannelResult: 

1090 """Perform intensity correlation-based colocalization analysis.""" 

1091 

1092 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1093 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation") 

1094 

1095 # Use distance-based pairing first 

1096 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0) 

1097 

1098 # Filter pairs based on intensity correlation 

1099 colocalized_pairs = [] 

1100 overlap_positions = [] 

1101 

1102 pos_1 = np.array(chan_1_result.cell_positions) 

1103 pos_2 = np.array(chan_2_result.cell_positions) 

1104 

1105 for i, (x1, y1) in enumerate(chan_1_result.cell_positions): 

1106 for j, (x2, y2) in enumerate(chan_2_result.cell_positions): 

1107 # Calculate distance 

1108 dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2) 

1109 

1110 if dist <= 5.0: # Within reasonable distance 

1111 # Check intensity correlation 

1112 int_1 = chan_1_result.cell_intensities[i] 

1113 int_2 = chan_2_result.cell_intensities[j] 

1114 

1115 # Simple intensity correlation: both above threshold 

1116 if int_1 >= intensity_threshold and int_2 >= intensity_threshold: 

1117 colocalized_pairs.append((i, j)) 

1118 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2) 

1119 overlap_positions.append(avg_pos) 

1120 break # One-to-one mapping 

1121 

1122 colocalized_count = len(colocalized_pairs) 

1123 total_cells = len(pos_1) + len(pos_2) 

1124 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1125 

1126 metrics = { 

1127 "intensity_threshold_used": intensity_threshold, 

1128 "correlation_method": "threshold_based" 

1129 } 

1130 

1131 return MultiChannelResult( 

1132 slice_index=chan_1_result.slice_index, 

1133 chan_1_results=chan_1_result, 

1134 chan_2_results=chan_2_result, 

1135 colocalization_method="intensity_correlation", 

1136 colocalized_count=colocalized_count, 

1137 colocalization_percentage=colocalization_percentage, 

1138 chan_1_only_count=len(pos_1) - colocalized_count, 

1139 chan_2_only_count=len(pos_2) - colocalized_count, 

1140 colocalization_metrics=metrics, 

1141 overlap_positions=overlap_positions 

1142 ) 

1143 

1144 

1145def _colocalization_manders( 

1146 chan_1_result: CellCountResult, 

1147 chan_2_result: CellCountResult, 

1148 intensity_threshold: float 

1149) -> MultiChannelResult: 

1150 """Calculate Manders colocalization coefficients.""" 

1151 

1152 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1153 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients") 

1154 

1155 # Simplified Manders calculation based on detected cells 

1156 # In practice, this would use pixel-level intensity analysis 

1157 

1158 # Use intensity-based method as foundation 

1159 intensity_result = _colocalization_intensity_based( 

1160 chan_1_result, chan_2_result, intensity_threshold 

1161 ) 

1162 

1163 # Calculate Manders-like coefficients 

1164 total_int_1 = sum(chan_1_result.cell_intensities) 

1165 total_int_2 = sum(chan_2_result.cell_intensities) 

1166 

1167 # Simplified: assume colocalized cells contribute their full intensity 

1168 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in 

1169 [(i, j) for i in range(len(chan_1_result.cell_positions)) 

1170 for j in range(len(chan_2_result.cell_positions)) 

1171 if (i, j) in [(0, 0)]]) # Simplified placeholder 

1172 

1173 # Manders coefficients (M1 and M2) 

1174 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0 

1175 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified 

1176 

1177 intensity_result.colocalization_method = "manders_coefficients" 

1178 intensity_result.colocalization_metrics.update({ 

1179 "manders_m1": m1, 

1180 "manders_m2": m2, 

1181 "note": "Simplified cell-based Manders calculation" 

1182 }) 

1183 

1184 return intensity_result 

1185 

1186 

1187def _create_segmentation_visualization( 

1188 image: np.ndarray, 

1189 positions: List[Tuple[float, float]], 

1190 max_sigma: float, 

1191 cell_areas: List[float] = None, 

1192 binary_mask: np.ndarray = None 

1193) -> np.ndarray: 

1194 """Create segmentation visualization using actual binary mask if available.""" 

1195 

1196 # If we have the actual binary mask from detection, use it directly 

1197 if binary_mask is not None: 

1198 # Convert boolean mask to uint16 to match input image dtype 

1199 # Use max intensity for detected cells, 0 for background 

1200 max_intensity = image.max() if image.max() > 0 else 65535 

1201 return (binary_mask * max_intensity).astype(image.dtype) 

1202 

1203 # Fallback to original circular marker approach for blob methods 

1204 visualization = image.copy() 

1205 

1206 # Mark detected cells with their actual sizes 

1207 for i, (x, y) in enumerate(positions): 

1208 # Use actual cell area if available, otherwise fall back to max_sigma 

1209 if cell_areas and i < len(cell_areas): 

1210 # Convert area to radius (assuming circular cells) 

1211 radius = np.sqrt(cell_areas[i] / np.pi) 

1212 else: 

1213 # Fallback to max_sigma for backward compatibility 

1214 radius = max_sigma * 2 

1215 

1216 # Create circular markers with actual cell size 

1217 rr, cc = np.ogrid[:image.shape[0], :image.shape[1]] 

1218 mask = (rr - y)**2 + (cc - x)**2 <= radius**2 

1219 

1220 # Ensure indices are within bounds 

1221 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1]) 

1222 mask = mask & valid_mask 

1223 

1224 visualization[mask] = visualization.max() # Bright markers 

1225 

1226 return visualization 

1227 

1228 

1229def _create_colocalization_map( 

1230 chan_1_img: np.ndarray, 

1231 chan_2_img: np.ndarray, 

1232 coloc_result: MultiChannelResult 

1233) -> np.ndarray: 

1234 """Create colocalization visualization map.""" 

1235 # Create RGB-like visualization 

1236 coloc_map = np.zeros_like(chan_1_img) 

1237 

1238 # Mark colocalized positions 

1239 for x, y in coloc_result.overlap_positions: 

1240 # Create markers for colocalized cells 

1241 rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]] 

1242 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius 

1243 

1244 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1]) 

1245 mask = mask & valid_mask 

1246 

1247 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers 

1248 

1249 return coloc_map