Coverage for openhcs/processing/backends/analysis/cell_counting_cpu.py: 38.8%

496 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2CPU-based cell counting and multi-channel colocalization analysis for OpenHCS. 

3 

4This module provides comprehensive cell counting capabilities using scikit-image, 

5supporting both single-channel and multi-channel analysis with various detection 

6methods and colocalization metrics. 

7""" 

8 

9import numpy as np 

10import logging 

11from typing import Dict, List, Tuple, Any, Optional, Union 

12from dataclasses import dataclass 

13from enum import Enum 

14 

15logger = logging.getLogger(__name__) 

16 

17# Core scientific computing imports 

18import pandas as pd 

19import json 

20from scipy import ndimage 

21from scipy.spatial.distance import cdist 

22from skimage.feature import blob_log, blob_dog, blob_doh, peak_local_max 

23from skimage.filters import threshold_otsu, threshold_li, gaussian, median 

24from skimage.segmentation import watershed, clear_border 

25from skimage.morphology import remove_small_objects, disk 

26from skimage.measure import label, regionprops 

27 

28# OpenHCS imports 

29from openhcs.core.memory.decorators import numpy as numpy_func 

30from openhcs.core.pipeline.function_contracts import special_outputs 

31from openhcs.constants.constants import Backend 

32 

33 

34class DetectionMethod(Enum): 

35 """Cell detection methods available.""" 

36 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells) 

37 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose) 

38 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells) 

39 WATERSHED = "watershed" # Watershed segmentation (best for touching cells) 

40 THRESHOLD = "threshold" # Simple thresholding (fastest, basic) 

41 

42 

43class ColocalizationMethod(Enum): 

44 """Methods for multi-channel colocalization analysis.""" 

45 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap 

46 DISTANCE_BASED = "distance_based" # Based on centroid distances 

47 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation 

48 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients 

49 

50 

51class ThresholdMethod(Enum): 

52 """Automatic thresholding methods for watershed segmentation.""" 

53 OTSU = "otsu" # Otsu's method (good for bimodal histograms) 

54 LI = "li" # Li's method (good for low contrast images) 

55 MANUAL = "manual" # Manual threshold value (0.0-1.0) 

56 

57 

58 

59 

60 

61@dataclass 

62class CellCountResult: 

63 """Results for single-channel cell counting.""" 

64 slice_index: int 

65 method: str 

66 cell_count: int 

67 cell_positions: List[Tuple[float, float]] # (x, y) centroids 

68 cell_areas: List[float] 

69 cell_intensities: List[float] 

70 detection_confidence: List[float] 

71 parameters_used: Dict[str, Any] 

72 binary_mask: Optional[np.ndarray] = None # Actual binary mask for segmentation methods 

73 

74 

75@dataclass 

76class MultiChannelResult: 

77 """Results for multi-channel cell counting and colocalization.""" 

78 slice_index: int 

79 chan_1_results: CellCountResult 

80 chan_2_results: CellCountResult 

81 colocalization_method: str 

82 colocalized_count: int 

83 colocalization_percentage: float 

84 chan_1_only_count: int 

85 chan_2_only_count: int 

86 colocalization_metrics: Dict[str, float] 

87 overlap_positions: List[Tuple[float, float]] 

88 

89 

90def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str: 

91 """Materialize cell counting results as analysis-ready CSV and JSON formats. 

92 

93 Args: 

94 data: List of cell count results (single or multi-channel) 

95 path: Output path for results 

96 filemanager: FileManager instance for I/O operations 

97 backends: Single backend string or list of backends to save to 

98 backend_kwargs: Dict mapping backend names to their kwargs (e.g., {'fiji_stream': {'port': 5560}}) 

99 

100 Returns: 

101 Path to the saved results file (first backend in list) 

102 """ 

103 # Normalize backends to list 

104 if isinstance(backends, str):    [coverage: branch 104 ↛ 105 not taken; condition never true]

105 backends = [backends] 

106 

107 if backend_kwargs is None:    [coverage: branch 107 ↛ 108 not taken; condition never true]

108 backend_kwargs = {} 

109 

110 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}, backends={backends}") 

111 

112 # Determine if this is single-channel or multi-channel data 

113 if not data:    [coverage: branch 113 ↛ 114 not taken; condition never true]

114 logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize") 

115 return path 

116 

117 is_multi_channel = isinstance(data[0], MultiChannelResult) 

118 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}") 

119 

120 if is_multi_channel:    [coverage: branch 120 ↛ 121 not taken; condition never true]

121 return _materialize_multi_channel_results(data, path, filemanager, backends, backend_kwargs) 

122 else: 

123 return _materialize_single_channel_results(data, path, filemanager, backends, backend_kwargs) 

124 

125 
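A minimal usage sketch for the materializer above. The in-memory file manager, the "memory" backend name, and the output path are illustrative stand-ins rather than real OpenHCS components; only the exists/delete/save/ensure_directory methods this module calls are stubbed.

from openhcs.processing.backends.analysis.cell_counting_cpu import (
    CellCountResult, materialize_cell_counts,
)

class InMemoryFileManager:  # hypothetical stand-in for the OpenHCS FileManager
    def __init__(self):
        self.files = {}
    def exists(self, path, backend):
        return (backend, path) in self.files
    def delete(self, path, backend):
        self.files.pop((backend, path), None)
    def save(self, content, path, backend, **kwargs):
        self.files[(backend, path)] = content
    def ensure_directory(self, path, backend):
        pass

result = CellCountResult(
    slice_index=0, method="blob_log", cell_count=1,
    cell_positions=[(12.0, 34.0)], cell_areas=[55.0],
    cell_intensities=[0.8], detection_confidence=[0.3],
    parameters_used={"detection_method": "blob_log"},
)
fm = InMemoryFileManager()
json_path = materialize_cell_counts([result], "cell_counts.pkl", fm, backends="memory")
# json_path == "cell_counts.json"; a "cell_counts_details.csv" with one row per cell
# is written alongside it whenever any cells were detected.
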

126def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str: 

127 """Materialize segmentation masks as ROIs and summary. 

128 

129 Extracts ROIs from labeled masks and saves them via backend-specific format: 

130 - OMERO: Creates omero.model.RoiI objects linked to images 

131 - Napari: Streams shapes layer via ZMQ 

132 - Fiji: Streams ImageJ ROIs via ZMQ 

133 - Disk: Saves JSON file with ROI data 

134 

135 Args: 

136 data: List of labeled mask arrays (one per z-plane) 

137 path: Output path for ROI data 

138 filemanager: FileManager instance for I/O operations 

139 backends: Single backend string or list of backends to save to 

140 backend_kwargs: Dict mapping backend names to their kwargs (e.g., {'fiji_stream': {'port': 5560}}) 

141 

142 Returns: 

143 Path to the saved ROI file (first backend in list) 

144 """ 

145 # Normalize backends to list 

146 if isinstance(backends, str):    [coverage: branch 146 ↛ 147 not taken; condition never true]

147 backends = [backends] 

148 

149 if backend_kwargs is None:    [coverage: branch 149 ↛ 150 not taken; condition never true]

150 backend_kwargs = {} 

151 

152 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}, backends={backends}") 

153 

154 if not data:    [coverage: branch 154 ↛ 155 not taken; condition never true]

155 logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)") 

156 # Create empty summary file to indicate no masks were generated 

157 summary_path = path.replace('.pkl', '_segmentation_summary.txt') 

158 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n" 

159 # Save to all backends 

160 for backend in backends: 

161 filemanager.save(summary_content, summary_path, backend) 

162 return summary_path 

163 

164 # Extract ROIs from labeled masks (once for all backends) 

165 from openhcs.core.roi import extract_rois_from_labeled_mask 

166 

167 all_rois = [] 

168 total_cells = 0 

169 for z_idx, mask in enumerate(data): 

170 rois = extract_rois_from_labeled_mask( 

171 mask, 

172 min_area=10, # Skip tiny regions 

173 extract_contours=True # Extract polygon contours 

174 ) 

175 all_rois.extend(rois) 

176 total_cells += len(rois) 

177 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Extracted {len(rois)} ROIs from z-plane {z_idx}") 

178 

179 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Extracted {total_cells} total ROIs from {len(data)} z-planes") 

180 

181 # Save ROIs to all backends (streaming backends convert to shapes/ROI objects) 

182 base_path = path.replace('.pkl', '').replace('.roi.zip', '') 

183 roi_path = f"{base_path}_rois.roi.zip" # Extension determines format 

184 

185 if all_rois:    [coverage: branch 185 ↛ 192 not taken; condition always true]

186 for backend in backends: 

187 # Get kwargs for this backend (empty dict if not specified) 

188 kwargs = backend_kwargs.get(backend, {}) 

189 filemanager.save(all_rois, roi_path, backend, **kwargs) 

190 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Saved {len(all_rois)} ROIs to {backend} backend") 

191 else: 

192 logger.warning(f"🔬 SEGMENTATION_MATERIALIZE: No ROIs extracted (all regions below min_area threshold)") 

193 

194 # Save summary to all backends (backends will ignore if they don't support text data) 

195 summary_path = f"{base_path}_segmentation_summary.txt" 

196 summary_content = f"Segmentation ROIs: {len(all_rois)} cells\n" 

197 summary_content += f"Z-planes: {len(data)}\n" 

198 if all_rois:    [coverage: branch 198 ↛ 201 not taken; condition always true]

199 summary_content += f"ROI file: {roi_path}\n" 

200 else: 

201 summary_content += f"No ROIs extracted (all regions below min_area threshold)\n" 

202 

203 for backend in backends: 

204 # Get kwargs for this backend (empty dict if not specified) 

205 kwargs = backend_kwargs.get(backend, {}) 

206 filemanager.save(summary_content, summary_path, backend, **kwargs) 

207 

208 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved to {len(backends)} backends") 

209 

210 return summary_path 

211 

212 
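For context on the expected input: each element of data is a labeled mask, i.e. a 2D integer array in which background is 0 and every cell carries a unique positive label, as produced for example by skimage.measure.label. A minimal illustration with toy values:

import numpy as np
from skimage.measure import label

binary = np.zeros((6, 6), dtype=bool)
binary[1:3, 1:3] = True   # first object
binary[4:6, 4:6] = True   # second object
labeled = label(binary)   # background stays 0, the two objects become labels 1 and 2
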

213@numpy_func 

214@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks)) 

215def count_cells_single_channel( 

216 image_stack: np.ndarray, 

217 # Detection method and parameters 

218 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

219 # Blob detection parameters 

220 min_sigma: float = 1.0, # Minimum blob size (pixels) 

221 max_sigma: float = 10.0, # Maximum blob size (pixels) 

222 num_sigma: int = 10, # Number of sigma values to test 

223 threshold: float = 0.1, # Detection threshold (0.0-1.0) 

224 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

225 # Watershed parameters 

226 watershed_footprint_size: int = 3, # Local maxima footprint size 

227 watershed_min_distance: int = 5, # Minimum distance between peaks 

228 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods 

229 # Preprocessing parameters 

230 enable_preprocessing: bool = True, 

231 gaussian_sigma: float = 1.0, # Gaussian blur sigma 

232 median_disk_size: int = 1, # Median filter disk size 

233 # Filtering parameters 

234 min_cell_area: int = 10, # Minimum cell area (pixels) 

235 max_cell_area: int = 1000, # Maximum cell area (pixels) 

236 remove_border_cells: bool = True, # Remove cells touching image border 

237 # Output parameters 

238 return_segmentation_mask: bool = False 

239) -> Tuple[np.ndarray, List[CellCountResult], List[np.ndarray]]: 

240 """ 

241 Count cells in single-channel image stack using various detection methods. 

242  

243 Args: 

244 image_stack: 3D array (Z, Y, X) where each Z slice is processed independently 

245 detection_method: Method for cell detection (see DetectionMethod enum) 

246 min_sigma: Minimum blob size for blob detection methods 

247 max_sigma: Maximum blob size for blob detection methods 

248 num_sigma: Number of sigma values to test for blob detection 

249 threshold: Detection threshold (method-dependent) 

250 overlap: Maximum overlap between detected blobs 

251 watershed_footprint_size: Footprint size for local maxima detection 

252 watershed_min_distance: Minimum distance between watershed peaks 

253 watershed_threshold_method: Thresholding method for watershed 

254 enable_preprocessing: Apply Gaussian and median filtering 

255 gaussian_sigma: Standard deviation for Gaussian blur 

256 median_disk_size: Disk size for median filtering 

257 min_cell_area: Minimum area for valid cells 

258 max_cell_area: Maximum area for valid cells 

259 remove_border_cells: Remove cells touching image borders 

260 return_segmentation_mask: Return segmentation masks in output 

261  

262 Returns: 

263 output_stack: Original image stack unchanged (Z, Y, X) 

264 cell_count_results: List of CellCountResult objects for each slice 

265 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True 

266 """ 

267 if image_stack.ndim != 3:    [coverage: branch 267 ↛ 268 not taken; condition never true]

268 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

269 

270 results = [] 

271 segmentation_masks = [] 

272 

273 # Store parameters for reproducibility (convert enums to values) 

274 parameters = { 

275 "detection_method": detection_method.value, 

276 "min_sigma": min_sigma, 

277 "max_sigma": max_sigma, 

278 "num_sigma": num_sigma, 

279 "threshold": threshold, 

280 "overlap": overlap, 

281 "watershed_footprint_size": watershed_footprint_size, 

282 "watershed_min_distance": watershed_min_distance, 

283 "watershed_threshold_method": watershed_threshold_method.value, 

284 "gaussian_sigma": gaussian_sigma, 

285 "median_disk_size": median_disk_size, 

286 "min_cell_area": min_cell_area, 

287 "max_cell_area": max_cell_area, 

288 "remove_border_cells": remove_border_cells 

289 } 

290 

291 logger.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method") 

292 

293 for z_idx in range(image_stack.shape[0]): 

294 slice_img = image_stack[z_idx].astype(np.float64) 

295 

296 # Apply preprocessing if enabled 

297 if enable_preprocessing:    [coverage: branch 297 ↛ 298 not taken; condition never true]

298 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size) 

299 

300 # Detect cells using specified method 

301 result = _detect_cells_single_method( 

302 slice_img, z_idx, detection_method.value, parameters 

303 ) 

304 

305 results.append(result) 

306 

307 # Create segmentation mask if requested 

308 if return_segmentation_mask:    [coverage: branch 308 ↛ 293 not taken; condition always true]

309 segmentation_mask = _create_segmentation_visualization( 

310 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask 

311 ) 

312 segmentation_masks.append(segmentation_mask) 

313 

314 # Always return segmentation masks (empty list if not requested) 

315 # This ensures consistent return signature for special outputs system 

316 return image_stack, results, segmentation_masks 

317 

318 
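A minimal calling sketch for the function above, assuming openhcs is importable and that the @numpy_func/@special_outputs decorators leave the function directly callable; the random stack is purely illustrative.

import numpy as np
from openhcs.processing.backends.analysis.cell_counting_cpu import (
    count_cells_single_channel, DetectionMethod, ThresholdMethod,
)

stack = np.random.rand(2, 128, 128)  # (Z, Y, X); each slice is processed independently
_, results, masks = count_cells_single_channel(
    stack,
    detection_method=DetectionMethod.WATERSHED,
    watershed_threshold_method=ThresholdMethod.OTSU,
    min_cell_area=20,
    max_cell_area=800,
    return_segmentation_mask=True,
)
for r in results:
    print(f"slice {r.slice_index}: {r.cell_count} cells ({r.method})")
# masks holds one visualization array per slice; it is an empty list when
# return_segmentation_mask=False.
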

319@numpy_func 

320@special_outputs(("multi_channel_counts", materialize_cell_counts)) 

321def count_cells_multi_channel( 

322 image_stack: np.ndarray, 

323 chan_1: int, # Index of first channel (positional arg) 

324 chan_2: int, # Index of second channel (positional arg) 

325 # Detection parameters for channel 1 (all single-channel params available) 

326 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

327 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels) 

328 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels) 

329 chan_1_num_sigma: int = 10, # Number of sigma values to test 

330 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

331 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

332 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size 

333 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks 

334 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

335 chan_1_enable_preprocessing: bool = True, # Apply preprocessing 

336 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

337 chan_1_median_disk_size: int = 1, # Median filter disk size 

338 chan_1_min_area: int = 10, # Minimum cell area (pixels) 

339 chan_1_max_area: int = 1000, # Maximum cell area (pixels) 

340 chan_1_remove_border_cells: bool = True, # Remove cells touching border 

341 # Detection parameters for channel 2 (all single-channel params available) 

342 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

343 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels) 

344 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels) 

345 chan_2_num_sigma: int = 10, # Number of sigma values to test 

346 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

347 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

348 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size 

349 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks 

350 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

351 chan_2_enable_preprocessing: bool = True, # Apply preprocessing 

352 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

353 chan_2_median_disk_size: int = 1, # Median filter disk size 

354 chan_2_min_area: int = 10, # Minimum cell area (pixels) 

355 chan_2_max_area: int = 1000, # Maximum cell area (pixels) 

356 chan_2_remove_border_cells: bool = True, # Remove cells touching border 

357 # Colocalization parameters 

358 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods 

359 max_distance: float = 5.0, # Maximum distance for colocalization (pixels) 

360 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method 

361 intensity_threshold: float = 0.5, # Threshold for intensity-based methods 

362 # Output parameters 

363 return_colocalization_map: bool = False 

364) -> Tuple[np.ndarray, List[MultiChannelResult]]: 

365 """ 

366 Count cells in multi-channel image stack with colocalization analysis. 

367 

368 Each channel can be processed with independent parameters, providing the same 

369 flexibility as the single-channel function for each channel separately. 

370 

371 Args: 

372 image_stack: 3D array (Z, Y, X) where Z represents different channels 

373 chan_1: Index of first channel in the stack (positional) 

374 chan_2: Index of second channel in the stack (positional) 

375 

376 # Channel 1 detection parameters (same as single-channel function) 

377 chan_1_method: Detection method for channel 1 (DetectionMethod enum) 

378 chan_1_min_sigma: Minimum blob size for channel 1 

379 chan_1_max_sigma: Maximum blob size for channel 1 

380 chan_1_num_sigma: Number of sigma values to test for channel 1 

381 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0) 

382 chan_1_overlap: Maximum overlap between blobs for channel 1 

383 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1 

384 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1 

385 chan_1_watershed_threshold_method: Thresholding method for channel 1 

386 chan_1_enable_preprocessing: Apply preprocessing to channel 1 

387 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1 

388 chan_1_median_disk_size: Median filter size for channel 1 

389 chan_1_min_area: Minimum cell area for channel 1 

390 chan_1_max_area: Maximum cell area for channel 1 

391 chan_1_remove_border_cells: Remove border cells for channel 1 

392 

393 # Channel 2 detection parameters (same as single-channel function) 

394 chan_2_method: Detection method for channel 2 (DetectionMethod enum) 

395 chan_2_min_sigma: Minimum blob size for channel 2 

396 chan_2_max_sigma: Maximum blob size for channel 2 

397 chan_2_num_sigma: Number of sigma values to test for channel 2 

398 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0) 

399 chan_2_overlap: Maximum overlap between blobs for channel 2 

400 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2 

401 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2 

402 chan_2_watershed_threshold_method: Thresholding method for channel 2 

403 chan_2_enable_preprocessing: Apply preprocessing to channel 2 

404 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2 

405 chan_2_median_disk_size: Median filter size for channel 2 

406 chan_2_min_area: Minimum cell area for channel 2 

407 chan_2_max_area: Maximum cell area for channel 2 

408 chan_2_remove_border_cells: Remove border cells for channel 2 

409 

410 # Colocalization parameters 

411 colocalization_method: Method for determining colocalization (ColocalizationMethod enum) 

412 max_distance: Maximum distance for distance-based colocalization (pixels) 

413 min_overlap_area: Minimum overlap fraction for area-based colocalization 

414 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0) 

415 return_colocalization_map: Return colocalization visualization 

416 

417 Returns: 

418 output_stack: Original images or colocalization maps 

419 multi_channel_results: List of MultiChannelResult objects 

420 """ 

421 if image_stack.ndim != 3: 

422 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

423 

424 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]: 

425 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}") 

426 

427 if chan_1 == chan_2: 

428 raise ValueError("Channel 1 and Channel 2 must be different") 

429 

430 # Extract channel images 

431 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency 

432 chan_2_img = image_stack[chan_2:chan_2+1] 

433 

434 # Count cells in each channel separately using the single-channel function 

435 # Channel 1 parameters (all explicit) 

436 chan_1_params = { 

437 "detection_method": chan_1_method, 

438 "min_sigma": chan_1_min_sigma, 

439 "max_sigma": chan_1_max_sigma, 

440 "num_sigma": chan_1_num_sigma, 

441 "threshold": chan_1_threshold, 

442 "overlap": chan_1_overlap, 

443 "watershed_footprint_size": chan_1_watershed_footprint_size, 

444 "watershed_min_distance": chan_1_watershed_min_distance, 

445 "watershed_threshold_method": chan_1_watershed_threshold_method, 

446 "enable_preprocessing": chan_1_enable_preprocessing, 

447 "gaussian_sigma": chan_1_gaussian_sigma, 

448 "median_disk_size": chan_1_median_disk_size, 

449 "min_cell_area": chan_1_min_area, 

450 "max_cell_area": chan_1_max_area, 

451 "remove_border_cells": chan_1_remove_border_cells, 

452 "return_segmentation_mask": False 

453 } 

454 

455 # Channel 2 parameters (all explicit) 

456 chan_2_params = { 

457 "detection_method": chan_2_method, 

458 "min_sigma": chan_2_min_sigma, 

459 "max_sigma": chan_2_max_sigma, 

460 "num_sigma": chan_2_num_sigma, 

461 "threshold": chan_2_threshold, 

462 "overlap": chan_2_overlap, 

463 "watershed_footprint_size": chan_2_watershed_footprint_size, 

464 "watershed_min_distance": chan_2_watershed_min_distance, 

465 "watershed_threshold_method": chan_2_watershed_threshold_method, 

466 "enable_preprocessing": chan_2_enable_preprocessing, 

467 "gaussian_sigma": chan_2_gaussian_sigma, 

468 "median_disk_size": chan_2_median_disk_size, 

469 "min_cell_area": chan_2_min_area, 

470 "max_cell_area": chan_2_max_area, 

471 "remove_border_cells": chan_2_remove_border_cells, 

472 "return_segmentation_mask": False 

473 } 

474 

475 # Process each channel 

476 _, chan_1_results, _ = count_cells_single_channel(chan_1_img, **chan_1_params) 

477 _, chan_2_results, _ = count_cells_single_channel(chan_2_img, **chan_2_params) 

478 

479 # Perform colocalization analysis 

480 multi_results = [] 

481 output_stack = image_stack.copy() 

482 

483 # Since we're processing single slices from each channel, we only have one result each 

484 chan_1_result = chan_1_results[0] 

485 chan_2_result = chan_2_results[0] 

486 

487 # Analyze colocalization 

488 coloc_result = _analyze_colocalization( 

489 chan_1_result, chan_2_result, colocalization_method.value, 

490 max_distance, min_overlap_area, intensity_threshold 

491 ) 

492 

493 multi_results.append(coloc_result) 

494 

495 # Create colocalization visualization if requested 

496 if return_colocalization_map: 

497 coloc_map = _create_colocalization_map( 

498 image_stack[chan_1], image_stack[chan_2], coloc_result 

499 ) 

500 # Replace one of the channels with the colocalization map 

501 output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map]) 

502 

503 return output_stack, multi_results 

504 

505 
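Under the same assumptions as the single-channel sketch, a sketch of the multi-channel entry point; here the two Z-slices of the stack stand in for the two channels.

import numpy as np
from openhcs.processing.backends.analysis.cell_counting_cpu import (
    count_cells_multi_channel, DetectionMethod, ColocalizationMethod,
)

two_channel = np.random.rand(2, 128, 128)  # slice 0 = channel A, slice 1 = channel B
out, mc_results = count_cells_multi_channel(
    two_channel, chan_1=0, chan_2=1,
    chan_1_method=DetectionMethod.BLOB_LOG,
    chan_2_method=DetectionMethod.BLOB_LOG,
    colocalization_method=ColocalizationMethod.DISTANCE_BASED,
    max_distance=5.0,
)
r = mc_results[0]
print(r.chan_1_results.cell_count, r.chan_2_results.cell_count,
      r.colocalized_count, f"{r.colocalization_percentage:.1f}%")
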

506def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str: 

507 """Materialize single-channel cell counting results.""" 

508 # Normalize backends to list 

509 if isinstance(backends, str):    [coverage: branch 509 ↛ 510 not taken; condition never true]

510 backends = [backends] 

511 

512 if backend_kwargs is None:    [coverage: branch 512 ↛ 513 not taken; condition never true]

513 backend_kwargs = {} 

514 

515 # Generate output file paths based on the input path 

516 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

517 base_path = path.replace('.pkl', '') 

518 json_path = f"{base_path}.json" 

519 csv_path = f"{base_path}_details.csv" 

520 

521 # Ensure output directory exists for disk backend 

522 from pathlib import Path 

523 from openhcs.constants.constants import Backend 

524 output_dir = Path(json_path).parent 

525 for backend in backends: 

526 if backend == Backend.DISK.value: 

527 filemanager.ensure_directory(str(output_dir), backend) 

528 

529 summary = { 

530 "analysis_type": "single_channel_cell_counting", 

531 "total_slices": len(data), 

532 "results_per_slice": [] 

533 } 

534 rows = [] 

535 

536 total_cells = 0 

537 for result in data: 

538 total_cells += result.cell_count 

539 

540 # Add to summary 

541 summary["results_per_slice"].append({ 

542 "slice_index": result.slice_index, 

543 "method": result.method, 

544 "cell_count": result.cell_count, 

545 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0, 

546 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0, 

547 "parameters": result.parameters_used 

548 }) 

549 

550 # Add individual cell data to CSV 

551 for i, (pos, area, intensity, confidence) in enumerate(zip( 

552 result.cell_positions, result.cell_areas, 

553 result.cell_intensities, result.detection_confidence 

554 )): 

555 rows.append({ 

556 'slice_index': result.slice_index, 

557 'cell_id': f"slice_{result.slice_index}_cell_{i}", 

558 'x_position': pos[0], 

559 'y_position': pos[1], 

560 'cell_area': area, 

561 'cell_intensity': intensity, 

562 'detection_confidence': confidence, 

563 'detection_method': result.method 

564 }) 

565 

566 summary["total_cells_all_slices"] = total_cells 

567 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0 

568 

569 # Save JSON summary to all backends (backends will ignore if they don't support text data) 

570 json_content = json.dumps(summary, indent=2, default=str) 

571 for backend in backends: 

572 kwargs = backend_kwargs.get(backend, {}) 

573 # Only check exists/delete for storage backends (streaming backends don't support these operations) 

574 try: 

575 if filemanager.exists(json_path, backend):    [coverage: branch 575 ↛ 576 not taken; condition never true]

576 filemanager.delete(json_path, backend) 

577 except AttributeError: 

578 # Streaming backend doesn't have exists/delete - that's fine 

579 pass 

580 filemanager.save(json_content, json_path, backend, **kwargs) 

581 

582 # Save CSV details to all backends (backends will ignore if they don't support text data) 

583 if rows:    [coverage: branch 583 ↛ 597 not taken; condition always true]

584 df = pd.DataFrame(rows) 

585 csv_content = df.to_csv(index=False) 

586 for backend in backends: 

587 kwargs = backend_kwargs.get(backend, {}) 

588 # Only check exists/delete for storage backends (streaming backends don't support these operations) 

589 try: 

590 if filemanager.exists(csv_path, backend):    [coverage: branch 590 ↛ 591 not taken; condition never true]

591 filemanager.delete(csv_path, backend) 

592 except AttributeError: 

593 # Streaming backend doesn't have exists/delete - that's fine 

594 pass 

595 filemanager.save(csv_content, csv_path, backend, **kwargs) 

596 

597 return json_path 

598 

599 

600def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str: 

601 """Materialize multi-channel cell counting and colocalization results.""" 

602 # Normalize backends to list 

603 if isinstance(backends, str): 

604 backends = [backends] 

605 

606 if backend_kwargs is None: 

607 backend_kwargs = {} 

608 

609 # Generate output file paths based on the input path 

610 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

611 base_path = path.replace('.pkl', '') 

612 json_path = f"{base_path}.json" 

613 csv_path = f"{base_path}_details.csv" 

614 

615 # Ensure output directory exists for disk backend 

616 # Skip directory creation for OMERO virtual paths (they don't exist on disk) 

617 from pathlib import Path 

618 output_dir = Path(json_path).parent 

619 for backend in backends: 

620 if backend == Backend.DISK.value and not str(output_dir).startswith('/omero/'): 

621 filemanager.ensure_directory(str(output_dir), backend) 

622 

623 summary = { 

624 "analysis_type": "multi_channel_cell_counting_colocalization", 

625 "total_slices": len(data), 

626 "colocalization_summary": { 

627 "total_chan_1_cells": 0, 

628 "total_chan_2_cells": 0, 

629 "total_colocalized": 0, 

630 "average_colocalization_percentage": 0 

631 }, 

632 "results_per_slice": [] 

633 } 

634 

635 # CSV for detailed analysis (csv_path already defined above) 

636 rows = [] 

637 

638 total_coloc_pct = 0 

639 for result in data: 

640 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count 

641 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count 

642 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count 

643 total_coloc_pct += result.colocalization_percentage 

644 

645 # Add to summary 

646 summary["results_per_slice"].append({ 

647 "slice_index": result.slice_index, 

648 "chan_1_count": result.chan_1_results.cell_count, 

649 "chan_2_count": result.chan_2_results.cell_count, 

650 "colocalized_count": result.colocalized_count, 

651 "colocalization_percentage": result.colocalization_percentage, 

652 "chan_1_only": result.chan_1_only_count, 

653 "chan_2_only": result.chan_2_only_count, 

654 "colocalization_method": result.colocalization_method, 

655 "colocalization_metrics": result.colocalization_metrics 

656 }) 

657 

658 # Add colocalization details to CSV 

659 for i, pos in enumerate(result.overlap_positions): 

660 rows.append({ 

661 'slice_index': result.slice_index, 

662 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}", 

663 'x_position': pos[0], 

664 'y_position': pos[1], 

665 'colocalization_method': result.colocalization_method 

666 }) 

667 

668 summary["colocalization_summary"]["average_colocalization_percentage"] = ( 

669 total_coloc_pct / len(data) if data else 0 

670 ) 

671 

672 # Save JSON summary to all backends (backends will ignore if they don't support text data) 

673 json_content = json.dumps(summary, indent=2, default=str) 

674 for backend in backends: 

675 kwargs = backend_kwargs.get(backend, {}) 

676 # Only check exists/delete for storage backends (streaming backends don't support these operations) 

677 try: 

678 if filemanager.exists(json_path, backend): 

679 filemanager.delete(json_path, backend) 

680 except AttributeError: 

681 # Streaming backend doesn't have exists/delete - that's fine 

682 pass 

683 filemanager.save(json_content, json_path, backend, **kwargs) 

684 

685 # Save CSV details to all backends (backends will ignore if they don't support text data) 

686 if rows: 

687 df = pd.DataFrame(rows) 

688 csv_content = df.to_csv(index=False) 

689 for backend in backends: 

690 kwargs = backend_kwargs.get(backend, {}) 

691 # Only check exists/delete for storage backends (streaming backends don't support these operations) 

692 try: 

693 if filemanager.exists(csv_path, backend): 

694 filemanager.delete(csv_path, backend) 

695 except AttributeError: 

696 # Streaming backend doesn't have exists/delete - that's fine 

697 pass 

698 filemanager.save(csv_content, csv_path, backend, **kwargs) 

699 

700 return json_path 

701 

702 

703def _preprocess_image(image: np.ndarray, gaussian_sigma: float, median_disk_size: int) -> np.ndarray: 

704 """Apply preprocessing to enhance cell detection.""" 

705 # Gaussian blur to reduce noise 

706 if gaussian_sigma > 0: 

707 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True) 

708 

709 # Median filter to remove salt-and-pepper noise 

710 if median_disk_size > 0: 

711 image = median(image, disk(median_disk_size)) 

712 

713 return image 

714 

715 

716def _detect_cells_single_method( 

717 image: np.ndarray, 

718 slice_idx: int, 

719 method: str, 

720 params: Dict[str, Any] 

721) -> CellCountResult: 

722 """Detect cells using specified method.""" 

723 

724 if method == DetectionMethod.BLOB_LOG.value:    [coverage: branch 724 ↛ 725 not taken; condition never true]

725 return _detect_cells_blob_log(image, slice_idx, params) 

726 elif method == DetectionMethod.BLOB_DOG.value:    [coverage: branch 726 ↛ 727 not taken; condition never true]

727 return _detect_cells_blob_dog(image, slice_idx, params) 

728 elif method == DetectionMethod.BLOB_DOH.value:    [coverage: branch 728 ↛ 729 not taken; condition never true]

729 return _detect_cells_blob_doh(image, slice_idx, params) 

730 elif method == DetectionMethod.WATERSHED.value:    [coverage: branch 730 ↛ 732 not taken; condition always true]

731 return _detect_cells_watershed(image, slice_idx, params) 

732 elif method == DetectionMethod.THRESHOLD.value: 

733 return _detect_cells_threshold(image, slice_idx, params) 

734 else: 

735 raise ValueError(f"Unknown detection method: {method}") 

736 

737 

738def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

739 """Detect cells using Laplacian of Gaussian blob detection.""" 

740 blobs = blob_log( 

741 image, 

742 min_sigma=params["min_sigma"], 

743 max_sigma=params["max_sigma"], 

744 num_sigma=params["num_sigma"], 

745 threshold=params["threshold"], 

746 overlap=params["overlap"] 

747 ) 

748 

749 # Extract positions, areas, and intensities 

750 positions = [] 

751 areas = [] 

752 intensities = [] 

753 confidences = [] 

754 

755 for blob in blobs: 

756 y, x, sigma = blob 

757 positions.append((float(x), float(y))) 

758 

759 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2)) 

760 radius = sigma * np.sqrt(2) 

761 area = np.pi * radius**2 

762 areas.append(float(area)) 

763 

764 # Sample intensity at blob center 

765 intensity = float(image[int(y), int(x)]) 

766 intensities.append(intensity) 

767 

768 # Use sigma as confidence measure (larger blobs = higher confidence) 

769 confidence = float(sigma / params["max_sigma"]) 

770 confidences.append(confidence) 

771 

772 # Filter by area constraints 

773 filtered_data = _filter_by_area( 

774 positions, areas, intensities, confidences, 

775 params["min_cell_area"], params["max_cell_area"] 

776 ) 

777 

778 return CellCountResult( 

779 slice_index=slice_idx, 

780 method="blob_log", 

781 cell_count=len(filtered_data[0]), 

782 cell_positions=filtered_data[0], 

783 cell_areas=filtered_data[1], 

784 cell_intensities=filtered_data[2], 

785 detection_confidence=filtered_data[3], 

786 parameters_used=params 

787 ) 

788 

789 
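A worked instance of the size conversion used by the blob detectors above:

import numpy as np
sigma = 3.0
radius = sigma * np.sqrt(2)   # ≈ 4.24 px
area = np.pi * radius ** 2    # ≈ 56.5 px², inside the default
                              # min_cell_area=10 / max_cell_area=1000 window
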

790def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

791 """Detect cells using Difference of Gaussian blob detection.""" 

792 blobs = blob_dog( 

793 image, 

794 min_sigma=params["min_sigma"], 

795 max_sigma=params["max_sigma"], 

796 threshold=params["threshold"], 

797 overlap=params["overlap"] 

798 ) 

799 

800 # Process similar to blob_log 

801 positions = [] 

802 areas = [] 

803 intensities = [] 

804 confidences = [] 

805 

806 for blob in blobs: 

807 y, x, sigma = blob 

808 positions.append((float(x), float(y))) 

809 

810 radius = sigma * np.sqrt(2) 

811 area = np.pi * radius**2 

812 areas.append(float(area)) 

813 

814 intensity = float(image[int(y), int(x)]) 

815 intensities.append(intensity) 

816 

817 confidence = float(sigma / params["max_sigma"]) 

818 confidences.append(confidence) 

819 

820 filtered_data = _filter_by_area( 

821 positions, areas, intensities, confidences, 

822 params["min_cell_area"], params["max_cell_area"] 

823 ) 

824 

825 return CellCountResult( 

826 slice_index=slice_idx, 

827 method="blob_dog", 

828 cell_count=len(filtered_data[0]), 

829 cell_positions=filtered_data[0], 

830 cell_areas=filtered_data[1], 

831 cell_intensities=filtered_data[2], 

832 detection_confidence=filtered_data[3], 

833 parameters_used=params 

834 ) 

835 

836 

837def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

838 """Detect cells using Determinant of Hessian blob detection.""" 

839 blobs = blob_doh( 

840 image, 

841 min_sigma=params["min_sigma"], 

842 max_sigma=params["max_sigma"], 

843 num_sigma=params["num_sigma"], 

844 threshold=params["threshold"], 

845 overlap=params["overlap"] 

846 ) 

847 

848 # Process similar to other blob methods 

849 positions = [] 

850 areas = [] 

851 intensities = [] 

852 confidences = [] 

853 

854 for blob in blobs: 

855 y, x, sigma = blob 

856 positions.append((float(x), float(y))) 

857 

858 radius = sigma * np.sqrt(2) 

859 area = np.pi * radius**2 

860 areas.append(float(area)) 

861 

862 intensity = float(image[int(y), int(x)]) 

863 intensities.append(intensity) 

864 

865 confidence = float(sigma / params["max_sigma"]) 

866 confidences.append(confidence) 

867 

868 filtered_data = _filter_by_area( 

869 positions, areas, intensities, confidences, 

870 params["min_cell_area"], params["max_cell_area"] 

871 ) 

872 

873 return CellCountResult( 

874 slice_index=slice_idx, 

875 method="blob_doh", 

876 cell_count=len(filtered_data[0]), 

877 cell_positions=filtered_data[0], 

878 cell_areas=filtered_data[1], 

879 cell_intensities=filtered_data[2], 

880 detection_confidence=filtered_data[3], 

881 parameters_used=params 

882 ) 

883 

884 

885def _filter_by_area( 

886 positions: List[Tuple[float, float]], 

887 areas: List[float], 

888 intensities: List[float], 

889 confidences: List[float], 

890 min_area: float, 

891 max_area: float 

892) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]: 

893 """Filter detected cells by area constraints.""" 

894 filtered_positions = [] 

895 filtered_areas = [] 

896 filtered_intensities = [] 

897 filtered_confidences = [] 

898 

899 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences): 

900 if min_area <= area <= max_area: 

901 filtered_positions.append(pos) 

902 filtered_areas.append(area) 

903 filtered_intensities.append(intensity) 

904 filtered_confidences.append(confidence) 

905 

906 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences 

907 

908 

909def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

910 """Detect cells using watershed segmentation.""" 

911 # Determine threshold 

912 if params["watershed_threshold_method"] == "otsu": 912 ↛ 914line 912 didn't jump to line 914 because the condition on line 912 was always true

913 threshold_val = threshold_otsu(image) 

914 elif params["watershed_threshold_method"] == "li": 

915 threshold_val = threshold_li(image) 

916 else: 

917 threshold_val = float(params["watershed_threshold_method"]) 

918 

919 # Create binary mask 

920 binary = image > threshold_val 

921 

922 # Remove small objects and border objects 

923 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

924 if params["remove_border_cells"]: 924 ↛ 928line 924 didn't jump to line 928 because the condition on line 924 was always true

925 binary = clear_border(binary) 

926 

927 # Find local maxima as seeds 

928 distance = ndimage.distance_transform_edt(binary) 

929 local_maxima = peak_local_max( 

930 distance, 

931 min_distance=params["watershed_min_distance"], 

932 footprint=np.ones((params["watershed_footprint_size"], params["watershed_footprint_size"])) 

933 ) 

934 

935 # Convert coordinates to binary mask 

936 local_maxima_mask = np.zeros_like(distance, dtype=bool) 

937 if len(local_maxima) > 0:    [coverage: branch 937 ↛ 942 not taken; condition always true]

938 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True 

939 

940 # Create markers for watershed 

941 # Convert boolean mask to integer labels for connected components 

942 markers = label(local_maxima_mask.astype(np.uint8)) 

943 

944 # Apply watershed 

945 labels = watershed(-distance, markers, mask=binary) 

946 

947 # Extract region properties 

948 regions = regionprops(labels, intensity_image=image) 

949 

950 positions = [] 

951 areas = [] 

952 intensities = [] 

953 confidences = [] 

954 valid_labels = [] # Track which labels pass the size filter 

955 

956 for region in regions: 

957 # Filter by area 

958 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

959 # Centroid (note: regionprops returns (row, col) = (y, x)) 

960 y, x = region.centroid 

961 positions.append((float(x), float(y))) 

962 

963 areas.append(float(region.area)) 

964 intensities.append(float(region.mean_intensity)) 

965 

966 # Use area as confidence measure (normalized) 

967 confidence = min(1.0, region.area / params["max_cell_area"]) 

968 confidences.append(confidence) 

969 

970 # Track this label as valid 

971 valid_labels.append(region.label) 

972 

973 # Create filtered binary mask with only cells that passed size filter 

974 filtered_binary_mask = np.isin(labels, valid_labels) 

975 

976 return CellCountResult( 

977 slice_index=slice_idx, 

978 method="watershed", 

979 cell_count=len(positions), 

980 cell_positions=positions, 

981 cell_areas=areas, 

982 cell_intensities=intensities, 

983 detection_confidence=confidences, 

984 parameters_used=params, 

985 binary_mask=filtered_binary_mask # Only cells that passed all filters 

986 ) 

987 

988 
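A self-contained sketch of the same seeding strategy on synthetic data (two touching disks), using only the scipy/scikit-image calls already imported by this module; sizes and coordinates are illustrative.

import numpy as np
from scipy import ndimage
from skimage.feature import peak_local_max
from skimage.measure import label
from skimage.segmentation import watershed

yy, xx = np.mgrid[:80, :80]
binary = ((yy - 40) ** 2 + (xx - 30) ** 2 < 15 ** 2) | ((yy - 40) ** 2 + (xx - 52) ** 2 < 15 ** 2)

distance = ndimage.distance_transform_edt(binary)
peaks = peak_local_max(distance, min_distance=10)   # seed coordinates (typically one per disk)
seed_mask = np.zeros_like(binary)
seed_mask[tuple(peaks.T)] = True
markers = label(seed_mask)
labels = watershed(-distance, markers, mask=binary)
print(labels.max())   # typically 2: the touching disks end up as separate regions
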

989def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

990 """Detect cells using simple thresholding and connected components.""" 

991 # Apply threshold 

992 binary = image > params["threshold"] * image.max() 

993 

994 # Remove small objects and border objects 

995 binary = remove_small_objects(binary, min_size=params["min_cell_area"]) 

996 if params["remove_border_cells"]: 

997 binary = clear_border(binary) 

998 

999 # Label connected components 

1000 labels = label(binary) 

1001 regions = regionprops(labels, intensity_image=image) 

1002 

1003 positions = [] 

1004 areas = [] 

1005 intensities = [] 

1006 confidences = [] 

1007 valid_labels = [] # Track which labels pass the size filter 

1008 

1009 for region in regions: 

1010 # Filter by area 

1011 if params["min_cell_area"] <= region.area <= params["max_cell_area"]: 

1012 y, x = region.centroid 

1013 positions.append((float(x), float(y))) 

1014 

1015 areas.append(float(region.area)) 

1016 intensities.append(float(region.mean_intensity)) 

1017 

1018 # Use intensity as confidence measure 

1019 confidence = float(region.mean_intensity / image.max()) 

1020 confidences.append(confidence) 

1021 

1022 # Track this label as valid 

1023 valid_labels.append(region.label) 

1024 

1025 # Create filtered binary mask with only cells that passed size filter 

1026 filtered_binary_mask = np.isin(labels, valid_labels) 

1027 

1028 return CellCountResult( 

1029 slice_index=slice_idx, 

1030 method="threshold", 

1031 cell_count=len(positions), 

1032 cell_positions=positions, 

1033 cell_areas=areas, 

1034 cell_intensities=intensities, 

1035 detection_confidence=confidences, 

1036 parameters_used=params, 

1037 binary_mask=filtered_binary_mask # Only cells that passed all filters 

1038 ) 

1039 

1040 

1041def _analyze_colocalization( 

1042 chan_1_result: CellCountResult, 

1043 chan_2_result: CellCountResult, 

1044 method: str, 

1045 max_distance: float, 

1046 min_overlap_area: float, 

1047 intensity_threshold: float 

1048) -> MultiChannelResult: 

1049 """Analyze colocalization between two channels.""" 

1050 

1051 if method == ColocalizationMethod.DISTANCE_BASED.value: 

1052 return _colocalization_distance_based( 

1053 chan_1_result, chan_2_result, max_distance 

1054 ) 

1055 elif method == ColocalizationMethod.OVERLAP_AREA.value: 

1056 return _colocalization_overlap_based( 

1057 chan_1_result, chan_2_result, min_overlap_area 

1058 ) 

1059 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value: 

1060 return _colocalization_intensity_based( 

1061 chan_1_result, chan_2_result, intensity_threshold 

1062 ) 

1063 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value: 

1064 return _colocalization_manders( 

1065 chan_1_result, chan_2_result, intensity_threshold 

1066 ) 

1067 else: 

1068 raise ValueError(f"Unknown colocalization method: {method}") 

1069 

1070 

1071def _colocalization_distance_based( 

1072 chan_1_result: CellCountResult, 

1073 chan_2_result: CellCountResult, 

1074 max_distance: float 

1075) -> MultiChannelResult: 

1076 """Perform distance-based colocalization analysis.""" 

1077 

1078 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1079 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based") 

1080 

1081 # Convert positions to arrays 

1082 pos_1 = np.array(chan_1_result.cell_positions) 

1083 pos_2 = np.array(chan_2_result.cell_positions) 

1084 

1085 # Calculate pairwise distances 

1086 distances = cdist(pos_1, pos_2) 

1087 

1088 # Find colocalized pairs 

1089 colocalized_pairs = [] 

1090 used_chan_2 = set() 

1091 

1092 for i in range(len(pos_1)): 

1093 # Find closest cell in channel 2 

1094 min_dist_idx = np.argmin(distances[i]) 

1095 min_dist = distances[i, min_dist_idx] 

1096 

1097 # Check if within distance threshold and not already used 

1098 if min_dist <= max_distance and min_dist_idx not in used_chan_2: 

1099 colocalized_pairs.append((i, min_dist_idx)) 

1100 used_chan_2.add(min_dist_idx) 

1101 

1102 # Calculate metrics 

1103 colocalized_count = len(colocalized_pairs) 

1104 total_cells = len(pos_1) + len(pos_2) 

1105 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1106 

1107 chan_1_only = len(pos_1) - colocalized_count 

1108 chan_2_only = len(pos_2) - colocalized_count 

1109 

1110 # Extract colocalized positions (average of paired positions) 

1111 overlap_positions = [] 

1112 for i, j in colocalized_pairs: 

1113 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist() 

1114 overlap_positions.append(tuple(avg_pos)) 

1115 

1116 # Calculate additional metrics 

1117 if colocalized_pairs: 

1118 avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs]) 

1119 max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs]) 

1120 else: 

1121 avg_distance = 0 

1122 max_distance_found = 0 

1123 

1124 metrics = { 

1125 "average_colocalization_distance": float(avg_distance), 

1126 "max_colocalization_distance": float(max_distance_found), 

1127 "distance_threshold_used": max_distance 

1128 } 

1129 

1130 return MultiChannelResult( 

1131 slice_index=chan_1_result.slice_index, 

1132 chan_1_results=chan_1_result, 

1133 chan_2_results=chan_2_result, 

1134 colocalization_method="distance_based", 

1135 colocalized_count=colocalized_count, 

1136 colocalization_percentage=colocalization_percentage, 

1137 chan_1_only_count=chan_1_only, 

1138 chan_2_only_count=chan_2_only, 

1139 colocalization_metrics=metrics, 

1140 overlap_positions=overlap_positions 

1141 ) 

1142 

1143 
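A worked instance of the symmetric percentage computed above: with 10 cells in channel 1, 8 cells in channel 2 and 5 matched pairs, colocalization_percentage = 2 * 5 / (10 + 8) * 100 ≈ 55.6, i.e. both members of every matched pair count toward the colocalized fraction of all detected cells, while the greedy pairing through used_chan_2 keeps the matches one-to-one.
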

1144def _create_empty_coloc_result( 

1145 chan_1_result: CellCountResult, 

1146 chan_2_result: CellCountResult, 

1147 method: str 

1148) -> MultiChannelResult: 

1149 """Create empty colocalization result when no cells found.""" 

1150 return MultiChannelResult( 

1151 slice_index=chan_1_result.slice_index, 

1152 chan_1_results=chan_1_result, 

1153 chan_2_results=chan_2_result, 

1154 colocalization_method=method, 

1155 colocalized_count=0, 

1156 colocalization_percentage=0.0, 

1157 chan_1_only_count=chan_1_result.cell_count, 

1158 chan_2_only_count=chan_2_result.cell_count, 

1159 colocalization_metrics={}, 

1160 overlap_positions=[] 

1161 ) 

1162 

1163 

1164def _colocalization_overlap_based( 

1165 chan_1_result: CellCountResult, 

1166 chan_2_result: CellCountResult, 

1167 min_overlap_area: float 

1168) -> MultiChannelResult: 

1169 """Perform area overlap-based colocalization analysis.""" 

1170 # This is a simplified implementation - in practice, you'd need actual segmentation masks 

1171 # For now, we'll use distance as a proxy for overlap 

1172 

1173 # Use distance-based method with smaller threshold as approximation 

1174 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly 

1175 

1176 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold) 

1177 result.colocalization_method = "overlap_area" 

1178 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area 

1179 result.colocalization_metrics["note"] = "Approximated using distance-based method" 

1180 

1181 return result 

1182 

1183 

1184def _colocalization_intensity_based( 

1185 chan_1_result: CellCountResult, 

1186 chan_2_result: CellCountResult, 

1187 intensity_threshold: float 

1188) -> MultiChannelResult: 

1189 """Perform intensity correlation-based colocalization analysis.""" 

1190 

1191 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1192 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation") 

1193 

1194 # Use distance-based pairing first 

1195 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0) 

1196 

1197 # Filter pairs based on intensity correlation 

1198 colocalized_pairs = [] 

1199 overlap_positions = [] 

1200 

1201 pos_1 = np.array(chan_1_result.cell_positions) 

1202 pos_2 = np.array(chan_2_result.cell_positions) 

1203 

1204 for i, (x1, y1) in enumerate(chan_1_result.cell_positions): 

1205 for j, (x2, y2) in enumerate(chan_2_result.cell_positions): 

1206 # Calculate distance 

1207 dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2) 

1208 

1209 if dist <= 5.0: # Within reasonable distance 

1210 # Check intensity correlation 

1211 int_1 = chan_1_result.cell_intensities[i] 

1212 int_2 = chan_2_result.cell_intensities[j] 

1213 

1214 # Simple intensity correlation: both above threshold 

1215 if int_1 >= intensity_threshold and int_2 >= intensity_threshold: 

1216 colocalized_pairs.append((i, j)) 

1217 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2) 

1218 overlap_positions.append(avg_pos) 

1219 break # One-to-one mapping 

1220 

1221 colocalized_count = len(colocalized_pairs) 

1222 total_cells = len(pos_1) + len(pos_2) 

1223 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1224 

1225 metrics = { 

1226 "intensity_threshold_used": intensity_threshold, 

1227 "correlation_method": "threshold_based" 

1228 } 

1229 

1230 return MultiChannelResult( 

1231 slice_index=chan_1_result.slice_index, 

1232 chan_1_results=chan_1_result, 

1233 chan_2_results=chan_2_result, 

1234 colocalization_method="intensity_correlation", 

1235 colocalized_count=colocalized_count, 

1236 colocalization_percentage=colocalization_percentage, 

1237 chan_1_only_count=len(pos_1) - colocalized_count, 

1238 chan_2_only_count=len(pos_2) - colocalized_count, 

1239 colocalization_metrics=metrics, 

1240 overlap_positions=overlap_positions 

1241 ) 

1242 

1243 

1244def _colocalization_manders( 

1245 chan_1_result: CellCountResult, 

1246 chan_2_result: CellCountResult, 

1247 intensity_threshold: float 

1248) -> MultiChannelResult: 

1249 """Calculate Manders colocalization coefficients.""" 

1250 

1251 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1252 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients") 

1253 

1254 # Simplified Manders calculation based on detected cells 

1255 # In practice, this would use pixel-level intensity analysis 

1256 

1257 # Use intensity-based method as foundation 

1258 intensity_result = _colocalization_intensity_based( 

1259 chan_1_result, chan_2_result, intensity_threshold 

1260 ) 

1261 

1262 # Calculate Manders-like coefficients 

1263 total_int_1 = sum(chan_1_result.cell_intensities) 

1264 total_int_2 = sum(chan_2_result.cell_intensities) 

1265 

1266 # Simplified: assume colocalized cells contribute their full intensity 

1267 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in 

1268 [(i, j) for i in range(len(chan_1_result.cell_positions)) 

1269 for j in range(len(chan_2_result.cell_positions)) 

1270 if (i, j) in [(0, 0)]]) # Simplified placeholder 

1271 

1272 # Manders coefficients (M1 and M2) 

1273 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0 

1274 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified 

1275 

1276 intensity_result.colocalization_method = "manders_coefficients" 

1277 intensity_result.colocalization_metrics.update({ 

1278 "manders_m1": m1, 

1279 "manders_m2": m2, 

1280 "note": "Simplified cell-based Manders calculation" 

1281 }) 

1282 

1283 return intensity_result 

1284 

1285 
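For reference, the pixel-level Manders coefficients that the simplified cell-based computation above approximates are conventionally defined as follows (R and G denote the two channel intensities and T an intensity threshold, with T = 0 in the original formulation; these symbols are standard notation, not identifiers from this module):

# M1 = sum(R_i for pixels i where G_i > T) / sum(R_i over all pixels)
# M2 = sum(G_i for pixels i where R_i > T) / sum(G_i over all pixels)
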

1286def _create_segmentation_visualization( 

1287 image: np.ndarray, 

1288 positions: List[Tuple[float, float]], 

1289 max_sigma: float, 

1290 cell_areas: List[float] = None, 

1291 binary_mask: np.ndarray = None 

1292) -> np.ndarray: 

1293 """Create segmentation visualization using actual binary mask if available.""" 

1294 

1295 # If we have the actual binary mask from detection, use it directly 

1296 if binary_mask is not None:    [coverage: branch 1296 ↛ 1303 not taken; condition always true]

1297 # Convert boolean mask to uint16 to match input image dtype 

1298 # Use max intensity for detected cells, 0 for background 

1299 max_intensity = image.max() if image.max() > 0 else 65535 

1300 return (binary_mask * max_intensity).astype(image.dtype) 

1301 

1302 # Fallback to original circular marker approach for blob methods 

1303 visualization = image.copy() 

1304 

1305 # Mark detected cells with their actual sizes 

1306 for i, (x, y) in enumerate(positions): 

1307 # Use actual cell area if available, otherwise fall back to max_sigma 

1308 if cell_areas and i < len(cell_areas): 

1309 # Convert area to radius (assuming circular cells) 

1310 radius = np.sqrt(cell_areas[i] / np.pi) 

1311 else: 

1312 # Fallback to max_sigma for backward compatibility 

1313 radius = max_sigma * 2 

1314 

1315 # Create circular markers with actual cell size 

1316 rr, cc = np.ogrid[:image.shape[0], :image.shape[1]] 

1317 mask = (rr - y)**2 + (cc - x)**2 <= radius**2 

1318 

1319 # Ensure indices are within bounds 

1320 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1]) 

1321 mask = mask & valid_mask 

1322 

1323 visualization[mask] = visualization.max() # Bright markers 

1324 

1325 return visualization 

1326 

1327 

1328def _create_colocalization_map( 

1329 chan_1_img: np.ndarray, 

1330 chan_2_img: np.ndarray, 

1331 coloc_result: MultiChannelResult 

1332) -> np.ndarray: 

1333 """Create colocalization visualization map.""" 

1334 # Create RGB-like visualization 

1335 coloc_map = np.zeros_like(chan_1_img) 

1336 

1337 # Mark colocalized positions 

1338 for x, y in coloc_result.overlap_positions: 

1339 # Create markers for colocalized cells 

1340 rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]] 

1341 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius 

1342 

1343 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1]) 

1344 mask = mask & valid_mask 

1345 

1346 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers 

1347 

1348 return coloc_map