Coverage for openhcs/processing/backends/analysis/cell_counting_pyclesperanto.py: 1.3%

605 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2GPU-accelerated cell counting and multi-channel colocalization analysis for OpenHCS. 

3 

4This module provides comprehensive cell counting capabilities using pyclesperanto, 

5supporting both single-channel and multi-channel analysis with various detection 

6methods and colocalization metrics. 

7""" 

8 

9import numpy as np 

10import logging 

11import gc 

12from typing import Dict, List, Tuple, Any, Union 

13from dataclasses import dataclass 

14from enum import Enum 

15 

16logger = logging.getLogger(__name__) 

17 

18# Core scientific computing imports 

19import pandas as pd 

20import json 

21import pyclesperanto as cle 

22from scipy.spatial.distance import cdist 

23 

24# OpenHCS imports 

25from openhcs.core.memory.decorators import pyclesperanto as pyclesperanto_func 

26from openhcs.core.pipeline.function_contracts import special_outputs 

27from openhcs.constants.constants import Backend 

28 

29 

30class DetectionMethod(Enum): 

31 """Cell detection methods available.""" 

32 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells) 

33 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose) 

34 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells) 

35 WATERSHED = "watershed" # Watershed segmentation (best for touching cells) 

36 THRESHOLD = "threshold" # Simple thresholding (fastest, basic) 

37 

38 

39class ColocalizationMethod(Enum): 

40 """Methods for multi-channel colocalization analysis.""" 

41 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap 

42 DISTANCE_BASED = "distance_based" # Based on centroid distances 

43 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation 

44 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients 

45 

46 

47class ThresholdMethod(Enum): 

48 """Automatic thresholding methods for watershed segmentation.""" 

49 OTSU = "otsu" # Otsu's method (good for bimodal histograms) 

50 LI = "li" # Li's method (good for low contrast images) 

51 MANUAL = "manual" # Manual threshold value (0.0-1.0) 

52 

53 

54 

55 

56 

57@dataclass 

58class CellCountResult: 

59 """Results for single-channel cell counting.""" 

60 slice_index: int 

61 method: str 

62 cell_count: int 

63 cell_positions: List[Tuple[float, float]] # (x, y) centroids 

64 cell_areas: List[float] 

65 cell_intensities: List[float] 

66 detection_confidence: List[float] 

67 parameters_used: Dict[str, Any] 

68 

69 

70@dataclass 

71class MultiChannelResult: 

72 """Results for multi-channel cell counting and colocalization.""" 

73 slice_index: int 

74 chan_1_results: CellCountResult 

75 chan_2_results: CellCountResult 

76 colocalization_method: str 

77 colocalized_count: int 

78 colocalization_percentage: float 

79 chan_1_only_count: int 

80 chan_2_only_count: int 

81 colocalization_metrics: Dict[str, float] 

82 overlap_positions: List[Tuple[float, float]] 

83 

84 

85def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager, backend: str) -> str: 

86 """Materialize cell counting results as analysis-ready CSV and JSON formats.""" 

87 

88 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}, backend={backend}") 

89 

90    # Convert any CLE binary_mask attributes to NumPy if present (guarded with getattr, since the result dataclasses do not declare this field)

91    if data:

92        for result in data:

93            if getattr(result, "binary_mask", None) is not None:

94                result.binary_mask = np.asarray(result.binary_mask)

95            if isinstance(result, MultiChannelResult):

96                if getattr(result.chan_1_results, "binary_mask", None) is not None:

97                    result.chan_1_results.binary_mask = np.asarray(result.chan_1_results.binary_mask)

98                if getattr(result.chan_2_results, "binary_mask", None) is not None:

99                    result.chan_2_results.binary_mask = np.asarray(result.chan_2_results.binary_mask)

100 

101 # Determine if this is single-channel or multi-channel data 

102 if not data: 

103 logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize") 

104 return path 

105 

106 is_multi_channel = isinstance(data[0], MultiChannelResult) 

107 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}") 

108 

109 if is_multi_channel: 

110 return _materialize_multi_channel_results(data, path, filemanager, backend) 

111 else: 

112 return _materialize_single_channel_results(data, path, filemanager, backend) 
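
# Illustrative sketch (not part of the module): exercising the materializer with a minimal
# in-memory stand-in for the OpenHCS filemanager. The class below and the "memory" backend
# string are demonstration-only assumptions; only the methods this module actually calls
# (save, exists, delete, ensure_directory) are provided.
class _InMemoryFileManagerSketch:
    def __init__(self):
        self.store = {}
    def save(self, content, path, backend):
        self.store[path] = content
    def exists(self, path, backend):
        return path in self.store
    def delete(self, path, backend):
        self.store.pop(path, None)
    def ensure_directory(self, path, backend):
        pass  # nothing to create for an in-memory store

def _demo_materialize_cell_counts():
    fm = _InMemoryFileManagerSketch()
    demo = CellCountResult(
        slice_index=0, method="threshold_pyclesperanto", cell_count=1,
        cell_positions=[(10.0, 12.0)], cell_areas=[25.0], cell_intensities=[800.0],
        detection_confidence=[0.9], parameters_used={"threshold": 0.1},
    )
    json_path = materialize_cell_counts([demo], "results/cell_counts.pkl", fm, "memory")
    print(json_path, sorted(fm.store))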

113 

114 

115def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager, backend: str) -> str: 

116 """ 

117 Materialize segmentation masks as individual TIFF files. 

118 

119 Single-backend signature - decorator automatically wraps for multi-backend support. 

120 """ 

121 

122 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, backend={backend}, masks_count={len(data) if data else 0}") 

123 

124 # Convert CLE arrays to NumPy 

125 if data: 

126 data = [np.asarray(mask) for mask in data] 

127 

128 if not data: 

129 logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)") 

130 # Create empty summary file to indicate no masks were generated 

131 summary_path = path.replace('.pkl', '_segmentation_summary.txt') 

132 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n" 

133 filemanager.save(summary_content, summary_path, backend) 

134 return summary_path 

135 

136 # Generate output file paths based on the input path 

137 base_path = path.replace('.pkl', '') 

138 

139 # Save each mask as a separate TIFF file 

140 for i, mask in enumerate(data): 

141 mask_filename = f"{base_path}_slice_{i:03d}.tif" 

142 

143 # Convert mask to appropriate dtype for saving (uint16 to match input images) 

144 if mask.dtype != np.uint16: 

145 # Normalize to uint16 range if needed 

146 if mask.max() <= 1.0: 

147 mask_uint16 = (mask * 65535).astype(np.uint16) 

148 else: 

149 mask_uint16 = mask.astype(np.uint16) 

150 else: 

151 mask_uint16 = mask 

152 

153 # Save using filemanager with provided backend 

154 filemanager.save(mask_uint16, mask_filename, backend) 

155 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename} (backend={backend})") 

156 

157 # Return summary path 

158 summary_path = f"{base_path}_segmentation_summary.txt" 

159 summary_content = f"Segmentation masks saved: {len(data)} files\n" 

160 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n" 

161 summary_content += f"Mask dtype: {data[0].dtype}\n" 

162 summary_content += f"Mask shape: {data[0].shape}\n" 

163 

164 filemanager.save(summary_content, summary_path, backend) 

165 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks to backend={backend}") 

166 

167 return summary_path 
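
# Sketch of the dtype rule used above, on toy masks (illustration only): float masks whose
# maximum is <= 1.0 are rescaled to the full uint16 range, anything else is cast directly.
def _demo_mask_dtype_conversion():
    float_mask = np.array([[0.0, 0.5], [1.0, 0.25]], dtype=np.float32)
    label_mask = np.array([[0, 3], [7, 7]], dtype=np.int32)
    assert (float_mask * 65535).astype(np.uint16).max() == 65535
    assert label_mask.astype(np.uint16).max() == 7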

168 

169 

170@pyclesperanto_func 

171@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks)) 

172def count_cells_single_channel( 

173 image_stack: np.ndarray, 

174 # Detection method and parameters 

175 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

176 # Blob detection parameters 

177 min_sigma: float = 1.0, # Minimum blob size (pixels) 

178 max_sigma: float = 10.0, # Maximum blob size (pixels) 

179 num_sigma: int = 10, # Number of sigma values to test 

180 threshold: float = 0.1, # Detection threshold (0.0-1.0) 

181 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

182 # Watershed parameters 

183 watershed_footprint_size: int = 3, # Local maxima footprint size 

184 watershed_min_distance: int = 5, # Minimum distance between peaks 

185 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods 

186 # Preprocessing parameters 

187 enable_preprocessing: bool = True, 

188 gaussian_sigma: float = 1.0, # Gaussian blur sigma 

189 median_disk_size: int = 1, # Median filter disk size 

190 # Filtering parameters 

191 min_cell_area: int = 10, # Minimum cell area (pixels) 

192 max_cell_area: int = 1000, # Maximum cell area (pixels) 

193 remove_border_cells: bool = True, # Remove cells touching image border 

194 # Output parameters 

195 return_segmentation_mask: bool = False 

196) -> Tuple[np.ndarray, List[CellCountResult], List[np.ndarray]]:

197 """ 

198 Count cells in single-channel image stack using various detection methods. 

199  

200 Args: 

201 image_stack: 3D array (Z, Y, X) where each Z slice is processed independently 

202 detection_method: Method for cell detection (see DetectionMethod enum) 

203 min_sigma: Minimum blob size for blob detection methods 

204 max_sigma: Maximum blob size for blob detection methods 

205 num_sigma: Number of sigma values to test for blob detection 

206 threshold: Detection threshold (method-dependent) 

207 overlap: Maximum overlap between detected blobs 

208 watershed_footprint_size: Footprint size for local maxima detection 

209 watershed_min_distance: Minimum distance between watershed peaks 

210 watershed_threshold_method: Thresholding method for watershed 

211 enable_preprocessing: Apply Gaussian and median filtering 

212 gaussian_sigma: Standard deviation for Gaussian blur 

213 median_disk_size: Disk size for median filtering 

214 min_cell_area: Minimum area for valid cells 

215 max_cell_area: Maximum area for valid cells 

216 remove_border_cells: Remove cells touching image borders 

217 return_segmentation_mask: Return segmentation masks in output 

218  

219 Returns: 

220 output_stack: Original image stack unchanged (Z, Y, X) 

221 cell_count_results: List of CellCountResult objects for each slice 

222 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True 

223 """ 

224 if image_stack.ndim != 3: 

225 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

226 

227 results = [] 

228 segmentation_masks = [] 

229 

230 # Store parameters for reproducibility (convert enums to values) 

231 parameters = { 

232 "detection_method": detection_method.value, 

233 "min_sigma": min_sigma, 

234 "max_sigma": max_sigma, 

235 "num_sigma": num_sigma, 

236 "threshold": threshold, 

237 "overlap": overlap, 

238 "watershed_footprint_size": watershed_footprint_size, 

239 "watershed_min_distance": watershed_min_distance, 

240 "watershed_threshold_method": watershed_threshold_method.value if hasattr(watershed_threshold_method, 'value') else watershed_threshold_method, 

241 "gaussian_sigma": gaussian_sigma, 

242 "median_disk_size": median_disk_size, 

243 "min_cell_area": min_cell_area, 

244 "max_cell_area": max_cell_area, 

245 "remove_border_cells": remove_border_cells 

246 } 

247 

248    logger.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")

249 

250 for z_idx in range(image_stack.shape[0]): 

251 # Extract slice - keep as pyclesperanto array 

252 slice_img = image_stack[z_idx] 

253 

254 # Apply preprocessing if enabled (all operations stay in GPU) 

255 if enable_preprocessing: 

256 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size) 

257 

258 # Detect cells using specified method (slice_img stays as cle array) 

259 result = _detect_cells_single_method( 

260 slice_img, z_idx, detection_method.value, parameters 

261 ) 

262 

263 results.append(result) 

264 

265 # Create segmentation mask if requested 

266 if return_segmentation_mask: 

267 segmentation_mask = _create_segmentation_visualization( 

268 slice_img, result.cell_positions, max_sigma 

269 ) 

270 segmentation_masks.append(segmentation_mask) 

271 

272 # Always return segmentation masks (empty list if not requested) 

273 # This ensures consistent return signature for special outputs system 

274 return image_stack, results, segmentation_masks 
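
# Illustrative usage sketch (not part of the module). Assumptions: an OpenCL device is available,
# the @pyclesperanto_func decorator accepts a NumPy stack, and the three return values mirror the
# undecorated function above.
def _demo_count_cells_single_channel():
    rng = np.random.default_rng(0)
    stack = (rng.random((2, 256, 256)) * 1000).astype(np.uint16)   # two slices of background noise
    stack[:, 100:110, 100:110] += 20000                             # one bright 10x10 "cell" per slice
    _, results, _masks = count_cells_single_channel(
        stack,
        detection_method=DetectionMethod.THRESHOLD,
        threshold=0.5,
        min_cell_area=10,
        max_cell_area=500,
    )
    for r in results:
        print(f"slice {r.slice_index}: {r.cell_count} cells via {r.method}")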

275 

276 

277@pyclesperanto_func 

278@special_outputs(("multi_channel_counts", materialize_cell_counts)) 

279def count_cells_multi_channel( 

280 image_stack: np.ndarray, 

281 chan_1: int, # Index of first channel (positional arg) 

282 chan_2: int, # Index of second channel (positional arg) 

283 # Detection parameters for channel 1 (all single-channel params available) 

284 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

285 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels) 

286 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels) 

287 chan_1_num_sigma: int = 10, # Number of sigma values to test 

288 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

289 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

290 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size 

291 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks 

292 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

293 chan_1_enable_preprocessing: bool = True, # Apply preprocessing 

294 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

295 chan_1_median_disk_size: int = 1, # Median filter disk size 

296 chan_1_min_area: int = 10, # Minimum cell area (pixels) 

297 chan_1_max_area: int = 1000, # Maximum cell area (pixels) 

298 chan_1_remove_border_cells: bool = True, # Remove cells touching border 

299 # Detection parameters for channel 2 (all single-channel params available) 

300 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

301 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels) 

302 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels) 

303 chan_2_num_sigma: int = 10, # Number of sigma values to test 

304 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

305 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

306 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size 

307 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks 

308 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

309 chan_2_enable_preprocessing: bool = True, # Apply preprocessing 

310 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

311 chan_2_median_disk_size: int = 1, # Median filter disk size 

312 chan_2_min_area: int = 10, # Minimum cell area (pixels) 

313 chan_2_max_area: int = 1000, # Maximum cell area (pixels) 

314 chan_2_remove_border_cells: bool = True, # Remove cells touching border 

315 # Colocalization parameters 

316 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods 

317 max_distance: float = 5.0, # Maximum distance for colocalization (pixels) 

318 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method 

319 intensity_threshold: float = 0.5, # Threshold for intensity-based methods 

320 # Output parameters 

321 return_colocalization_map: bool = False 

322) -> Tuple[np.ndarray, List[MultiChannelResult]]: 

323 """ 

324 Count cells in multi-channel image stack with colocalization analysis. 

325 

326 Each channel can be processed with independent parameters, providing the same 

327 flexibility as the single-channel function for each channel separately. 

328 

329 Args: 

330 image_stack: 3D array (Z, Y, X) where Z represents different channels 

331 chan_1: Index of first channel in the stack (positional) 

332 chan_2: Index of second channel in the stack (positional) 

333 

334 # Channel 1 detection parameters (same as single-channel function) 

335 chan_1_method: Detection method for channel 1 (DetectionMethod enum) 

336 chan_1_min_sigma: Minimum blob size for channel 1 

337 chan_1_max_sigma: Maximum blob size for channel 1 

338 chan_1_num_sigma: Number of sigma values to test for channel 1 

339 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0) 

340 chan_1_overlap: Maximum overlap between blobs for channel 1 

341 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1 

342 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1 

343 chan_1_watershed_threshold_method: Thresholding method for channel 1 

344 chan_1_enable_preprocessing: Apply preprocessing to channel 1 

345 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1 

346 chan_1_median_disk_size: Median filter size for channel 1 

347 chan_1_min_area: Minimum cell area for channel 1 

348 chan_1_max_area: Maximum cell area for channel 1 

349 chan_1_remove_border_cells: Remove border cells for channel 1 

350 

351 # Channel 2 detection parameters (same as single-channel function) 

352 chan_2_method: Detection method for channel 2 (DetectionMethod enum) 

353 chan_2_min_sigma: Minimum blob size for channel 2 

354 chan_2_max_sigma: Maximum blob size for channel 2 

355 chan_2_num_sigma: Number of sigma values to test for channel 2 

356 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0) 

357 chan_2_overlap: Maximum overlap between blobs for channel 2 

358 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2 

359 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2 

360 chan_2_watershed_threshold_method: Thresholding method for channel 2 

361 chan_2_enable_preprocessing: Apply preprocessing to channel 2 

362 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2 

363 chan_2_median_disk_size: Median filter size for channel 2 

364 chan_2_min_area: Minimum cell area for channel 2 

365 chan_2_max_area: Maximum cell area for channel 2 

366 chan_2_remove_border_cells: Remove border cells for channel 2 

367 

368 # Colocalization parameters 

369 colocalization_method: Method for determining colocalization (ColocalizationMethod enum) 

370 max_distance: Maximum distance for distance-based colocalization (pixels) 

371 min_overlap_area: Minimum overlap fraction for area-based colocalization 

372 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0) 

373 return_colocalization_map: Return colocalization visualization 

374 

375 Returns: 

376 output_stack: Original images or colocalization maps 

377 multi_channel_results: List of MultiChannelResult objects 

378 """ 

379 if image_stack.ndim != 3: 

380 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

381 

382 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]: 

383 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}") 

384 

385 if chan_1 == chan_2: 

386 raise ValueError("Channel 1 and Channel 2 must be different") 

387 

388 # Extract channel images 

389 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency 

390 chan_2_img = image_stack[chan_2:chan_2+1] 

391 

392 # Count cells in each channel separately using the single-channel function 

393 # Channel 1 parameters (all explicit) 

394 chan_1_params = { 

395 "detection_method": chan_1_method, 

396 "min_sigma": chan_1_min_sigma, 

397 "max_sigma": chan_1_max_sigma, 

398 "num_sigma": chan_1_num_sigma, 

399 "threshold": chan_1_threshold, 

400 "overlap": chan_1_overlap, 

401 "watershed_footprint_size": chan_1_watershed_footprint_size, 

402 "watershed_min_distance": chan_1_watershed_min_distance, 

403 "watershed_threshold_method": chan_1_watershed_threshold_method, 

404 "enable_preprocessing": chan_1_enable_preprocessing, 

405 "gaussian_sigma": chan_1_gaussian_sigma, 

406 "median_disk_size": chan_1_median_disk_size, 

407 "min_cell_area": chan_1_min_area, 

408 "max_cell_area": chan_1_max_area, 

409 "remove_border_cells": chan_1_remove_border_cells, 

410 "return_segmentation_mask": False 

411 } 

412 

413 # Channel 2 parameters (all explicit) 

414 chan_2_params = { 

415 "detection_method": chan_2_method, 

416 "min_sigma": chan_2_min_sigma, 

417 "max_sigma": chan_2_max_sigma, 

418 "num_sigma": chan_2_num_sigma, 

419 "threshold": chan_2_threshold, 

420 "overlap": chan_2_overlap, 

421 "watershed_footprint_size": chan_2_watershed_footprint_size, 

422 "watershed_min_distance": chan_2_watershed_min_distance, 

423 "watershed_threshold_method": chan_2_watershed_threshold_method, 

424 "enable_preprocessing": chan_2_enable_preprocessing, 

425 "gaussian_sigma": chan_2_gaussian_sigma, 

426 "median_disk_size": chan_2_median_disk_size, 

427 "min_cell_area": chan_2_min_area, 

428 "max_cell_area": chan_2_max_area, 

429 "remove_border_cells": chan_2_remove_border_cells, 

430 "return_segmentation_mask": False 

431 } 

432 

433 # Process each channel 

434    _, chan_1_results, *_ = count_cells_single_channel(chan_1_img, **chan_1_params)  # starred unpack tolerates the extra masks output

435    _, chan_2_results, *_ = count_cells_single_channel(chan_2_img, **chan_2_params)  # starred unpack tolerates the extra masks output

436 

437 # Perform colocalization analysis 

438 multi_results = [] 

439 output_stack = image_stack.copy() 

440 

441 # Since we're processing single slices from each channel, we only have one result each 

442 chan_1_result = chan_1_results[0] 

443 chan_2_result = chan_2_results[0] 

444 

445 # Analyze colocalization 

446 coloc_result = _analyze_colocalization( 

447 chan_1_result, chan_2_result, colocalization_method.value, 

448 max_distance, min_overlap_area, intensity_threshold 

449 ) 

450 

451 multi_results.append(coloc_result) 

452 

453 # Create colocalization visualization if requested 

454 if return_colocalization_map: 

455 coloc_map = _create_colocalization_map( 

456 image_stack[chan_1], image_stack[chan_2], coloc_result 

457 ) 

458 # Replace one of the channels with the colocalization map 

459 output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map]) 

460 

461 return output_stack, multi_results 
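
# Illustrative usage sketch (not part of the module). Same GPU/decorator assumptions as above;
# axis 0 of the stack holds the two channels being compared.
def _demo_count_cells_multi_channel():
    rng = np.random.default_rng(1)
    stack = (rng.random((2, 256, 256)) * 1000).astype(np.uint16)
    stack[0, 50:58, 50:58] += 20000                                 # marker in channel 0
    stack[1, 52:60, 52:60] += 20000                                 # overlapping marker in channel 1
    _, coloc_results = count_cells_multi_channel(
        stack, 0, 1,
        chan_1_method=DetectionMethod.THRESHOLD, chan_1_threshold=0.5,
        chan_2_method=DetectionMethod.THRESHOLD, chan_2_threshold=0.5,
        colocalization_method=ColocalizationMethod.DISTANCE_BASED, max_distance=10.0,
    )
    r = coloc_results[0]
    print(r.colocalized_count, r.chan_1_only_count, r.chan_2_only_count)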

462 

463 

464def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager, backend: str) -> str: 

465 """Materialize single-channel cell counting results.""" 

466 # Generate output file paths based on the input path 

467 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

468 base_path = path.replace('.pkl', '') 

469 json_path = f"{base_path}.json" 

470 csv_path = f"{base_path}_details.csv" 

471 

472 # Ensure output directory exists for disk backend 

473 from pathlib import Path 

474 output_dir = Path(json_path).parent 

475 if backend == Backend.DISK.value: 

476 filemanager.ensure_directory(str(output_dir), backend) 

477 

478 summary = { 

479 "analysis_type": "single_channel_cell_counting", 

480 "total_slices": len(data), 

481 "results_per_slice": [] 

482 } 

483 rows = [] 

484 

485 total_cells = 0 

486 for result in data: 

487 total_cells += result.cell_count 

488 

489 # Add to summary 

490 summary["results_per_slice"].append({ 

491 "slice_index": result.slice_index, 

492 "method": result.method, 

493 "cell_count": result.cell_count, 

494 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0, 

495 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0, 

496 "parameters": result.parameters_used 

497 }) 

498 

499 # Add individual cell data to CSV 

500 for i, (pos, area, intensity, confidence) in enumerate(zip( 

501 result.cell_positions, result.cell_areas, 

502 result.cell_intensities, result.detection_confidence 

503 )): 

504 rows.append({ 

505 'slice_index': result.slice_index, 

506 'cell_id': f"slice_{result.slice_index}_cell_{i}", 

507 'x_position': pos[0], 

508 'y_position': pos[1], 

509 'cell_area': area, 

510 'cell_intensity': intensity, 

511 'detection_confidence': confidence, 

512 'detection_method': result.method 

513 }) 

514 

515 summary["total_cells_all_slices"] = total_cells 

516 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0 

517 

518 # Save JSON summary (overwrite if exists) 

519 json_content = json.dumps(summary, indent=2, default=str) 

520 # Remove existing file if it exists using filemanager 

521 if filemanager.exists(json_path, backend): 

522 filemanager.delete(json_path, backend) 

523 filemanager.save(json_content, json_path, backend) 

524 

525 # Save CSV details (overwrite if exists) 

526 if rows: 

527 df = pd.DataFrame(rows) 

528 csv_content = df.to_csv(index=False) 

529 # Remove existing file if it exists using filemanager 

530 if filemanager.exists(csv_path, backend): 

531 filemanager.delete(csv_path, backend) 

532 filemanager.save(csv_content, csv_path, backend) 

533 

534 return json_path 

535 

536 

537def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager, backend: str) -> str: 

538 """Materialize multi-channel cell counting and colocalization results.""" 

539 # Generate output file paths based on the input path 

540 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

541 base_path = path.replace('.pkl', '') 

542 json_path = f"{base_path}.json" 

543 csv_path = f"{base_path}_details.csv" 

544 

545 # Ensure output directory exists for disk backend 

546 from pathlib import Path 

547 output_dir = Path(json_path).parent 

548 if backend == Backend.DISK.value: 

549 filemanager.ensure_directory(str(output_dir), backend) 

550 

551 summary = { 

552 "analysis_type": "multi_channel_cell_counting_colocalization", 

553 "total_slices": len(data), 

554 "colocalization_summary": { 

555 "total_chan_1_cells": 0, 

556 "total_chan_2_cells": 0, 

557 "total_colocalized": 0, 

558 "average_colocalization_percentage": 0 

559 }, 

560 "results_per_slice": [] 

561 } 

562 rows = [] 

563 

564 total_coloc_pct = 0 

565 for result in data: 

566 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count 

567 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count 

568 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count 

569 total_coloc_pct += result.colocalization_percentage 

570 

571 # Add to summary 

572 summary["results_per_slice"].append({ 

573 "slice_index": result.slice_index, 

574 "chan_1_count": result.chan_1_results.cell_count, 

575 "chan_2_count": result.chan_2_results.cell_count, 

576 "colocalized_count": result.colocalized_count, 

577 "colocalization_percentage": result.colocalization_percentage, 

578 "chan_1_only": result.chan_1_only_count, 

579 "chan_2_only": result.chan_2_only_count, 

580 "colocalization_method": result.colocalization_method, 

581 "colocalization_metrics": result.colocalization_metrics 

582 }) 

583 

584 # Add colocalization details to CSV 

585 for i, pos in enumerate(result.overlap_positions): 

586 rows.append({ 

587 'slice_index': result.slice_index, 

588 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}", 

589 'x_position': pos[0], 

590 'y_position': pos[1], 

591 'colocalization_method': result.colocalization_method 

592 }) 

593 

594 summary["colocalization_summary"]["average_colocalization_percentage"] = ( 

595 total_coloc_pct / len(data) if data else 0 

596 ) 

597 

598 # Save JSON summary (overwrite if exists) 

599 json_content = json.dumps(summary, indent=2, default=str) 

600 # Remove existing file if it exists using filemanager 

601 if filemanager.exists(json_path, backend): 

602 filemanager.delete(json_path, backend) 

603 filemanager.save(json_content, json_path, backend) 

604 

605 # Save CSV details (overwrite if exists) 

606 if rows: 

607 df = pd.DataFrame(rows) 

608 csv_content = df.to_csv(index=False) 

609 # Remove existing file if it exists using filemanager 

610 if filemanager.exists(csv_path, backend): 

611 filemanager.delete(csv_path, backend) 

612 filemanager.save(csv_content, csv_path, backend) 

613 

614 return json_path 

615 

616 

617def _preprocess_image(image, gaussian_sigma: float, median_disk_size: int): 

618 """Apply preprocessing to enhance cell detection using pyclesperanto.""" 

619 # Assume image is already a pyclesperanto array 

620 gpu_image = image 

621 

622 # Gaussian blur to reduce noise 

623 if gaussian_sigma > 0: 

624 gpu_image = cle.gaussian_blur(gpu_image, sigma_x=gaussian_sigma, sigma_y=gaussian_sigma) 

625 

626 # Median filter to remove salt-and-pepper noise 

627 if median_disk_size > 0: 

628 gpu_image = cle.median_box(gpu_image, radius_x=median_disk_size, radius_y=median_disk_size) 

629 

630 # Return the GPU array 

631 return gpu_image 
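
# For readers without a GPU: a CPU-only reference sketch of the same two-step preprocessing,
# using scipy.ndimage (scipy is already a dependency via scipy.spatial above). Illustration only;
# it is not called by the module.
def _preprocess_image_cpu_reference(image: np.ndarray, gaussian_sigma: float, median_disk_size: int) -> np.ndarray:
    from scipy import ndimage
    out = image.astype(np.float32)
    if gaussian_sigma > 0:
        out = ndimage.gaussian_filter(out, sigma=gaussian_sigma)
    if median_disk_size > 0:
        out = ndimage.median_filter(out, size=2 * median_disk_size + 1)  # radius -> box width
    return out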

632 

633 

634def _detect_cells_single_method( 

635 image: np.ndarray, 

636 slice_idx: int, 

637 method: str, 

638 params: Dict[str, Any] 

639) -> CellCountResult: 

640 """Detect cells using specified method.""" 

641 

642 if method == DetectionMethod.BLOB_LOG.value: 

643 return _detect_cells_blob_log(image, slice_idx, params) 

644 elif method == DetectionMethod.BLOB_DOG.value: 

645 return _detect_cells_blob_dog(image, slice_idx, params) 

646 elif method == DetectionMethod.BLOB_DOH.value: 

647 return _detect_cells_blob_doh(image, slice_idx, params) 

648 elif method == DetectionMethod.WATERSHED.value: 

649 return _detect_cells_watershed(image, slice_idx, params) 

650 elif method == DetectionMethod.THRESHOLD.value: 

651 return _detect_cells_threshold(image, slice_idx, params) 

652 else: 

653 raise ValueError(f"Unknown detection method: {method}") 

654 

655 

656def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

657 """Detect cells using fast LoG-like blob detection.""" 

658 gpu_image = image 

659 

660 # Use single scale for speed - average of min and max sigma 

661 sigma = (params["min_sigma"] + params["max_sigma"]) / 2 

662 

663 # Fast LoG approximation: Use DoG like blob_dog but with closer scales 

664 blurred1 = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma) 

665 blurred2 = cle.gaussian_blur(gpu_image, sigma_x=sigma*1.6, sigma_y=sigma*1.6) 

666 

667 # Difference approximates Laplacian 

668 edges = cle.subtract_images(blurred1, blurred2) 

669 

670 # Threshold the edge response 

671 max_response = cle.maximum_of_all_pixels(cle.absolute(edges)) 

672 threshold_val = params["threshold"] * max_response 

673 thresholded = cle.greater_constant(cle.absolute(edges), scalar=threshold_val) 

674 

675 # Find local maxima 

676 maxima = cle.detect_maxima(cle.absolute(edges), 

677 radius_x=int(sigma), 

678 radius_y=int(sigma)) 

679 

680 # Combine threshold and maxima 

681 valid_maxima = cle.binary_and(thresholded, maxima) 

682 

683 # Dilate maxima more aggressively to create proper cell-sized regions 

684 dilated = valid_maxima 

685 for _ in range(6): # More aggressive dilation 

686 dilated = cle.dilate_box(dilated) 

687 

688 # Label connected components 

689 labels = cle.connected_components_labeling(dilated) 

690 

691 # Remove small and large objects 

692 labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"]) 

693 labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"]) 

694 

695 # Get statistics 

696 if cle.maximum_of_all_pixels(labels) > 0: 

697 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

698 

699 positions = [] 

700 areas = [] 

701 intensities = [] 

702 confidences = [] 

703 

704 if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0: 

705 for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])): 

706 positions.append((float(x), float(y))) 

707 

708 # Get area and intensity 

709 area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma**2 

710 intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0 

711 

712 areas.append(area) 

713 intensities.append(intensity) 

714 confidences.append(min(1.0, intensity / max_response)) 

715 else: 

716 positions = [] 

717 areas = [] 

718 intensities = [] 

719 confidences = [] 

720 

721 return CellCountResult( 

722 slice_index=slice_idx, 

723 method="blob_log_pyclesperanto", 

724 cell_count=len(positions), 

725 cell_positions=positions, 

726 cell_areas=areas, 

727 cell_intensities=intensities, 

728 detection_confidence=confidences, 

729 parameters_used=params 

730 ) 
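
# The 1.6 sigma ratio used above is the standard DoG approximation to the Laplacian of Gaussian.
# A quick 1-D numerical check (sketch, illustration only; scipy.ndimage assumed available):
def _demo_dog_approximates_log():
    from scipy import ndimage
    impulse = np.zeros(101, dtype=np.float32)
    impulse[50] = 1.0                                  # filtering an impulse yields the filter kernel
    sigma = 3.0
    dog = ndimage.gaussian_filter1d(impulse, sigma) - ndimage.gaussian_filter1d(impulse, 1.6 * sigma)
    neg_log = -ndimage.gaussian_laplace(impulse, sigma)
    print(float(np.corrcoef(dog, neg_log)[0, 1]))      # expected to be close to 1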

731 

732 

733def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

734 """Detect cells using fast Difference of Gaussians blob detection.""" 

735 gpu_image = image 

736 

737 # Use only two scales for speed 

738 sigma1 = params["min_sigma"] 

739 sigma2 = params["max_sigma"] 

740 

741 # Apply Gaussian blurs 

742 blur1 = cle.gaussian_blur(gpu_image, sigma_x=sigma1, sigma_y=sigma1) 

743 blur2 = cle.gaussian_blur(gpu_image, sigma_x=sigma2, sigma_y=sigma2) 

744 

745 # Difference of Gaussians 

746 dog = cle.subtract_images(blur1, blur2) 

747 

748 # Threshold the DoG response 

749 max_response = cle.maximum_of_all_pixels(cle.absolute(dog)) 

750 threshold_val = params["threshold"] * max_response 

751 thresholded = cle.greater_constant(cle.absolute(dog), scalar=threshold_val) 

752 

753 # Find local maxima 

754 maxima = cle.detect_maxima(cle.absolute(dog), 

755 radius_x=int(sigma1), 

756 radius_y=int(sigma1)) 

757 

758 # Combine threshold and maxima 

759 valid_maxima = cle.binary_and(thresholded, maxima) 

760 

761 # Dilate maxima to create regions for area calculation 

762 dilated = valid_maxima 

763 for _ in range(3): # Moderate dilation 

764 dilated = cle.dilate_box(dilated) 

765 

766 # Label connected components 

767 labels = cle.connected_components_labeling(dilated) 

768 

769 # Remove small and large objects 

770 labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"]) 

771 labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"]) 

772 

773 # Get statistics 

774 if cle.maximum_of_all_pixels(labels) > 0: 

775 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

776 

777 positions = [] 

778 areas = [] 

779 intensities = [] 

780 confidences = [] 

781 

782 if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0: 

783 for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])): 

784 positions.append((float(x), float(y))) 

785 

786 # Get area and intensity 

787 area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma1**2 

788 intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0 

789 

790 areas.append(area) 

791 intensities.append(intensity) 

792 confidences.append(min(1.0, intensity / max_response)) 

793 else: 

794 positions = [] 

795 areas = [] 

796 intensities = [] 

797 confidences = [] 

798 

799 return CellCountResult( 

800 slice_index=slice_idx, 

801 method="blob_dog_pyclesperanto", 

802 cell_count=len(positions), 

803 cell_positions=positions, 

804 cell_areas=areas, 

805 cell_intensities=intensities, 

806 detection_confidence=confidences, 

807 parameters_used=params 

808 ) 


867 

868 

869def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

870 """Detect cells using Hessian-like detection with pyclesperanto.""" 

871 # Assume image is already a pyclesperanto array 

872 gpu_image = image 

873 

874 # Apply Gaussian blur for smoothing 

875 sigma = (params["min_sigma"] + params["max_sigma"]) / 2 

876 blurred = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma) 

877 

878 # Apply edge detection (approximates Hessian determinant) 

879 edges = cle.binary_edge_detection(blurred) 

880 

881 # Apply threshold to original image for valid regions 

882 threshold_val = params["threshold"] * cle.maximum_of_all_pixels(gpu_image) 

883 thresholded = cle.greater_constant(gpu_image, scalar=threshold_val) 

884 

885 # Detect local maxima in edge response 

886 maxima = cle.detect_maxima_box(edges, 

887 radius_x=int(params["min_sigma"]), 

888 radius_y=int(params["min_sigma"])) 

889 

890 # Combine threshold and maxima 

891 combined = cle.binary_and(thresholded, maxima) 

892 

893 # Dilate the maxima points to create proper regions 

894 dilated = cle.dilate_box(combined) 

895 

896 # Label the dilated regions 

897 labels = cle.connected_components_labeling(dilated) 

898 

899 # Remove small and large objects (same as simple baseline) 

900 labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"]) 

901 labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"]) 

902 

903 # Get statistics - this returns a dictionary with centroids included 

904 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

905 

906 # Extract centroids directly from statistics (much simpler!) 

907 centroids_x = stats_dict.get('centroid_x', []) 

908 centroids_y = stats_dict.get('centroid_y', []) 

909 

910 # Extract the data we need from the statistics dictionary 

911 if 'label' in stats_dict and len(stats_dict['label']) > 0: 

912 # We have detected objects 

913 areas = stats_dict.get('area', []) 

914 labels_list = stats_dict.get('label', []) 

915 else: 

916 # No objects detected 

917 areas = [] 

918 labels_list = [] 

919 centroids_x = [] 

920 centroids_y = [] 

921 

922 # Process similar to other blob methods 

923 positions = [] 

924 filtered_areas = [] 

925 intensities = [] 

926 confidences = [] 

927 

928 for i in range(len(labels_list)): 

929 if i < len(centroids_x) and i < len(centroids_y) and i < len(areas): 

930 x = float(centroids_x[i]) 

931 y = float(centroids_y[i]) 

932 positions.append((x, y)) 

933 

934 # Get area 

935 area = float(areas[i]) 

936 filtered_areas.append(area) 

937 

938 # Mean intensity (if available in stats) 

939 mean_intensities = stats_dict.get('mean_intensity', []) 

940 if i < len(mean_intensities): 

941 intensity = float(mean_intensities[i]) 

942 else: 

943 intensity = 1.0 # Default value 

944 intensities.append(intensity) 

945 

946 # Use area as confidence measure 

947 confidence = min(1.0, area / (np.pi * params["max_sigma"]**2)) 

948 confidences.append(confidence) 

949 

950 filtered_data = _filter_by_area( 

951 positions, filtered_areas, intensities, confidences, 

952 params["min_cell_area"], params["max_cell_area"] 

953 ) 

954 

955 return CellCountResult( 

956 slice_index=slice_idx, 

957 method="blob_doh_pyclesperanto", 

958 cell_count=len(filtered_data[0]), 

959 cell_positions=filtered_data[0], 

960 cell_areas=filtered_data[1], 

961 cell_intensities=filtered_data[2], 

962 detection_confidence=filtered_data[3], 

963 parameters_used=params 

964 ) 

965 

966 

967def _filter_by_area( 

968 positions: List[Tuple[float, float]], 

969 areas: List[float], 

970 intensities: List[float], 

971 confidences: List[float], 

972 min_area: float, 

973 max_area: float 

974) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]: 

975 """Filter detected cells by area constraints.""" 

976 filtered_positions = [] 

977 filtered_areas = [] 

978 filtered_intensities = [] 

979 filtered_confidences = [] 

980 

981 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences): 

982 if min_area <= area <= max_area: 

983 filtered_positions.append(pos) 

984 filtered_areas.append(area) 

985 filtered_intensities.append(intensity) 

986 filtered_confidences.append(confidence) 

987 

988 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences 
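
# Sketch: _filter_by_area keeps only detections whose area lies within [min_area, max_area].
def _demo_filter_by_area():
    kept = _filter_by_area(
        positions=[(1.0, 1.0), (5.0, 5.0)],
        areas=[4.0, 2000.0],
        intensities=[10.0, 20.0],
        confidences=[0.5, 0.9],
        min_area=1.0,
        max_area=100.0,
    )
    assert kept == ([(1.0, 1.0)], [4.0], [10.0], [0.5])   # the 2000-pixel detection is rejected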

989 

990 

991def _non_maximum_suppression_3d(positions, scales, responses, overlap_threshold): 

992 """Apply non-maximum suppression across scale space.""" 

993 if len(positions) == 0: 

994 return [], [], [] 

995 

996 # Convert to numpy arrays for easier processing 

997 positions = np.array(positions) 

998 scales = np.array(scales) 

999 responses = np.array(responses) 

1000 

1001 # Sort by response strength (highest first) 

1002 sorted_indices = np.argsort(responses)[::-1] 

1003 

1004 keep = [] 

1005 for i in sorted_indices: 

1006 pos_i = positions[i] 

1007 scale_i = scales[i] 

1008 

1009 # Check if this detection overlaps with any already kept detection 

1010 should_keep = True 

1011 for j in keep: 

1012 pos_j = positions[j] 

1013 scale_j = scales[j] 

1014 

1015 # Calculate distance in space and scale 

1016 spatial_dist = np.sqrt(np.sum((pos_i - pos_j) ** 2)) 

1017 scale_dist = abs(scale_i - scale_j) / max(scale_i, scale_j) 

1018 

1019 # Check if they overlap significantly 

1020 overlap_radius = max(scale_i, scale_j) * (1 + overlap_threshold) 

1021 if spatial_dist < overlap_radius and scale_dist < overlap_threshold: 

1022 should_keep = False 

1023 break 

1024 

1025 if should_keep: 

1026 keep.append(i) 

1027 

1028 # Return filtered results 

1029 return positions[keep].tolist(), scales[keep].tolist(), responses[keep].tolist() 
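
# Sketch: two detections one pixel apart at the same scale; only the stronger response survives
# non-maximum suppression, while the distant third detection is untouched.
def _demo_non_maximum_suppression():
    kept_pos, kept_scales, kept_resp = _non_maximum_suppression_3d(
        positions=[(10.0, 10.0), (11.0, 10.0), (40.0, 40.0)],
        scales=[3.0, 3.0, 3.0],
        responses=[0.9, 0.5, 0.8],
        overlap_threshold=0.5,
    )
    assert len(kept_pos) == 2 and kept_resp == [0.9, 0.8]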

1030 

1031 

1032def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

1033 """Detect cells using watershed segmentation with pyclesperanto.""" 

1034 # Assume image is already a pyclesperanto array 

1035 gpu_image = image 

1036 

1037    # Threshold with Otsu (note: the watershed_threshold_method parameter is not applied in this implementation)

1038 binary = cle.threshold_otsu(gpu_image) 

1039 

1040 # Remove small and large objects with explicit memory cleanup 

1041 temp_labels = cle.connected_components_labeling(binary) 

1042 

1043 # Clean up intermediate arrays to prevent GPU OOM 

1044 temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"]) 

1045 del temp_labels # Explicit cleanup 

1046 

1047 temp_labels = cle.remove_large_labels(temp_labels_filtered, maximum_size=params["max_cell_area"]) 

1048 del temp_labels_filtered # Explicit cleanup 

1049 

1050 binary_filtered = cle.greater_constant(temp_labels, scalar=0) 

1051 del binary # Cleanup original binary 

1052 binary = binary_filtered 

1053 

1054 # Remove border objects if requested 

1055 if params["remove_border_cells"]: 

1056 temp_labels_border = cle.connected_components_labeling(binary) 

1057 del temp_labels # Cleanup previous temp_labels 

1058 

1059 temp_labels = cle.remove_labels_on_edges(temp_labels_border) 

1060 del temp_labels_border # Explicit cleanup 

1061 

1062 binary_final = cle.greater_constant(temp_labels, scalar=0) 

1063 del binary # Cleanup previous binary 

1064 binary = binary_final 

1065 

1066 # Since pyclesperanto doesn't have watershed, use connected components directly 

1067 labels = cle.connected_components_labeling(binary) 

1068 del binary # Cleanup binary mask 

1069 

1070 # Get statistics - this returns a dictionary with centroids included 

1071 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

1072 

1073 # Cleanup labels array after getting statistics 

1074 del labels 

1075 

1076 # Extract centroids directly from statistics (much simpler!) 

1077 centroids_x = stats_dict.get('centroid_x', []) 

1078 centroids_y = stats_dict.get('centroid_y', []) 

1079 

1080 # Extract the data we need from the statistics dictionary 

1081 if 'label' in stats_dict and len(stats_dict['label']) > 0: 

1082 # We have detected objects 

1083 areas = stats_dict.get('area', []) 

1084 labels_list = stats_dict.get('label', []) 

1085 else: 

1086 # No objects detected 

1087 areas = [] 

1088 labels_list = [] 

1089 centroids_x = [] 

1090 centroids_y = [] 

1091 

1092 positions = [] 

1093 filtered_areas = [] 

1094 intensities = [] 

1095 confidences = [] 

1096 

1097 for i in range(len(labels_list)): 

1098 if i < len(centroids_x) and i < len(centroids_y) and i < len(areas): 

1099 area = float(areas[i]) 

1100 

1101 # Filter by area 

1102 if params["min_cell_area"] <= area <= params["max_cell_area"]: 

1103 x = float(centroids_x[i]) 

1104 y = float(centroids_y[i]) 

1105 positions.append((x, y)) 

1106 

1107 filtered_areas.append(area) 

1108 

1109 # Mean intensity (if available in stats) 

1110 mean_intensities = stats_dict.get('mean_intensity', []) 

1111 if i < len(mean_intensities): 

1112 intensity = float(mean_intensities[i]) 

1113 else: 

1114 intensity = 1.0 # Default value 

1115 intensities.append(intensity) 

1116 

1117 # Use area as confidence measure (normalized) 

1118 confidence = min(1.0, area / params["max_cell_area"]) 

1119 confidences.append(confidence) 

1120 

1121 # Force garbage collection to free GPU memory 

1122 gc.collect() 

1123 

1124 return CellCountResult( 

1125 slice_index=slice_idx, 

1126 method="watershed_pyclesperanto", 

1127 cell_count=len(positions), 

1128 cell_positions=positions, 

1129 cell_areas=filtered_areas, 

1130 cell_intensities=intensities, 

1131 detection_confidence=confidences, 

1132 parameters_used=params 

1133 ) 
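
# pyclesperanto lacks a true watershed (noted in the function above, which falls back to connected
# components). For reference, a CPU sketch of a conventional marker-based watershed; it assumes
# scikit-image is available, which this module does not import.
def _watershed_cpu_reference(binary: np.ndarray, footprint_size: int, min_distance: int) -> np.ndarray:
    from scipy import ndimage
    from skimage.feature import peak_local_max
    from skimage.segmentation import watershed
    distance = ndimage.distance_transform_edt(binary)
    peaks = peak_local_max(distance, min_distance=min_distance,
                           footprint=np.ones((footprint_size, footprint_size)), labels=binary)
    markers = np.zeros(binary.shape, dtype=np.int32)
    markers[tuple(peaks.T)] = np.arange(1, len(peaks) + 1)
    return watershed(-distance, markers, mask=binary)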

1134 

1135 

1136def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

1137 """Detect cells using simple thresholding and connected components with pyclesperanto.""" 

1138 # Image is already a pyclesperanto array - no conversion needed 

1139 gpu_image = image 

1140 

1141 # Apply threshold (all operations stay on GPU) 

1142 max_intensity = cle.maximum_of_all_pixels(gpu_image) 

1143 threshold_val = params["threshold"] * max_intensity 

1144 binary = cle.greater_constant(gpu_image, scalar=threshold_val) 

1145 

1146 # Remove small objects with explicit memory cleanup 

1147 temp_labels = cle.connected_components_labeling(binary) 

1148 temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"]) 

1149 del temp_labels # Explicit cleanup 

1150 

1151 binary_filtered = cle.greater_constant(temp_labels_filtered, scalar=0) 

1152 del binary # Cleanup original binary 

1153 del temp_labels_filtered # Cleanup filtered labels 

1154 binary = binary_filtered 

1155 

1156 # Remove border objects if requested with explicit cleanup 

1157 if params["remove_border_cells"]: 

1158 temp_labels = cle.connected_components_labeling(binary) 

1159 temp_labels_no_border = cle.remove_labels_on_edges(temp_labels) 

1160 del temp_labels # Explicit cleanup 

1161 

1162 binary_final = cle.greater_constant(temp_labels_no_border, scalar=0) 

1163 del binary # Cleanup previous binary 

1164 del temp_labels_no_border # Cleanup no border labels 

1165 binary = binary_final 

1166 

1167 # Label connected components (final labeling on GPU) 

1168 labels = cle.connected_components_labeling(binary) 

1169 del binary # Cleanup binary mask 

1170 

1171 # Get statistics (operates on GPU arrays, returns dictionary) 

1172 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

1173 del labels # Cleanup labels after getting statistics 

1174 

1175 # Extract centroids directly from statistics (much simpler!) 

1176 centroids_x = stats_dict.get('centroid_x', []) 

1177 centroids_y = stats_dict.get('centroid_y', []) 

1178 

1179 # Extract the data we need from the statistics dictionary 

1180 if 'label' in stats_dict and len(stats_dict['label']) > 0: 

1181 # We have detected objects 

1182 areas = stats_dict.get('area', []) 

1183 labels_list = stats_dict.get('label', []) 

1184 else: 

1185 # No objects detected 

1186 areas = [] 

1187 labels_list = [] 

1188 centroids_x = [] 

1189 centroids_y = [] 

1190 

1191 max_intensity_cpu = float(max_intensity) 

1192 

1193 positions = [] 

1194 filtered_areas = [] 

1195 intensities = [] 

1196 confidences = [] 

1197 

1198 for i in range(len(labels_list)): 

1199 if i < len(centroids_x) and i < len(centroids_y) and i < len(areas): 

1200 area = float(areas[i]) 

1201 

1202 # Filter by area 

1203 if params["min_cell_area"] <= area <= params["max_cell_area"]: 

1204 x = float(centroids_x[i]) 

1205 y = float(centroids_y[i]) 

1206 positions.append((x, y)) 

1207 

1208 filtered_areas.append(area) 

1209 

1210 # Mean intensity (if available in stats) 

1211 mean_intensities = stats_dict.get('mean_intensity', []) 

1212 if i < len(mean_intensities): 

1213 mean_intensity = float(mean_intensities[i]) 

1214 else: 

1215 mean_intensity = 1.0 # Default value 

1216 intensities.append(mean_intensity) 

1217 

1218 # Use intensity as confidence measure 

1219 confidence = mean_intensity / max_intensity_cpu if max_intensity_cpu > 0 else 1.0 

1220 confidences.append(confidence) 

1221 

1222 # Force garbage collection to free GPU memory 

1223 gc.collect() 

1224 

1225 return CellCountResult( 

1226 slice_index=slice_idx, 

1227 method="threshold_pyclesperanto", 

1228 cell_count=len(positions), 

1229 cell_positions=positions, 

1230 cell_areas=filtered_areas, 

1231 cell_intensities=intensities, 

1232 detection_confidence=confidences, 

1233 parameters_used=params 

1234 ) 

1235 

1236 

1237def _analyze_colocalization( 

1238 chan_1_result: CellCountResult, 

1239 chan_2_result: CellCountResult, 

1240 method: str, 

1241 max_distance: float, 

1242 min_overlap_area: float, 

1243 intensity_threshold: float 

1244) -> MultiChannelResult: 

1245 """Analyze colocalization between two channels.""" 

1246 

1247 if method == ColocalizationMethod.DISTANCE_BASED.value: 

1248 return _colocalization_distance_based( 

1249 chan_1_result, chan_2_result, max_distance 

1250 ) 

1251 elif method == ColocalizationMethod.OVERLAP_AREA.value: 

1252 return _colocalization_overlap_based( 

1253 chan_1_result, chan_2_result, min_overlap_area 

1254 ) 

1255 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value: 

1256 return _colocalization_intensity_based( 

1257 chan_1_result, chan_2_result, intensity_threshold 

1258 ) 

1259 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value: 

1260 return _colocalization_manders( 

1261 chan_1_result, chan_2_result, intensity_threshold 

1262 ) 

1263 else: 

1264 raise ValueError(f"Unknown colocalization method: {method}") 

1265 

1266 

1267def _colocalization_distance_based( 

1268 chan_1_result: CellCountResult, 

1269 chan_2_result: CellCountResult, 

1270 max_distance: float 

1271) -> MultiChannelResult: 

1272 """Perform distance-based colocalization analysis.""" 

1273 

1274 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1275 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based") 

1276 

1277 # Convert positions to arrays 

1278 pos_1 = np.array(chan_1_result.cell_positions) 

1279 pos_2 = np.array(chan_2_result.cell_positions) 

1280 

1281 # Calculate pairwise distances 

1282 distances = cdist(pos_1, pos_2) 

1283 

1284 # Find colocalized pairs 

1285 colocalized_pairs = [] 

1286 used_chan_2 = set() 

1287 

1288 for i in range(len(pos_1)): 

1289 # Find closest cell in channel 2 

1290 min_dist_idx = np.argmin(distances[i]) 

1291 min_dist = distances[i, min_dist_idx] 

1292 

1293 # Check if within distance threshold and not already used 

1294 if min_dist <= max_distance and min_dist_idx not in used_chan_2: 

1295 colocalized_pairs.append((i, min_dist_idx)) 

1296 used_chan_2.add(min_dist_idx) 

1297 

1298 # Calculate metrics 

1299 colocalized_count = len(colocalized_pairs) 

1300 total_cells = len(pos_1) + len(pos_2) 

1301 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1302 

1303 chan_1_only = len(pos_1) - colocalized_count 

1304 chan_2_only = len(pos_2) - colocalized_count 

1305 

1306 # Extract colocalized positions (average of paired positions) 

1307 overlap_positions = [] 

1308 for i, j in colocalized_pairs: 

1309 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist() 

1310 overlap_positions.append(tuple(avg_pos)) 

1311 

1312 # Calculate additional metrics 

1313 if colocalized_pairs: 

1314 avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs]) 

1315 max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs]) 

1316 else: 

1317 avg_distance = 0 

1318 max_distance_found = 0 

1319 

1320 metrics = { 

1321 "average_colocalization_distance": float(avg_distance), 

1322 "max_colocalization_distance": float(max_distance_found), 

1323 "distance_threshold_used": max_distance 

1324 } 

1325 

1326 return MultiChannelResult( 

1327 slice_index=chan_1_result.slice_index, 

1328 chan_1_results=chan_1_result, 

1329 chan_2_results=chan_2_result, 

1330 colocalization_method="distance_based", 

1331 colocalized_count=colocalized_count, 

1332 colocalization_percentage=colocalization_percentage, 

1333 chan_1_only_count=chan_1_only, 

1334 chan_2_only_count=chan_2_only, 

1335 colocalization_metrics=metrics, 

1336 overlap_positions=overlap_positions 

1337 ) 
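
# Sketch: distance-based colocalization on two toy results. The first cell in channel 1 is within
# 5 pixels of the only cell in channel 2, so exactly one pair is reported.
def _demo_distance_colocalization():
    def _toy(positions):
        return CellCountResult(
            slice_index=0, method="demo", cell_count=len(positions),
            cell_positions=positions, cell_areas=[20.0] * len(positions),
            cell_intensities=[1.0] * len(positions),
            detection_confidence=[1.0] * len(positions), parameters_used={},
        )
    a = _toy([(10.0, 10.0), (50.0, 50.0)])
    b = _toy([(12.0, 11.0)])
    coloc = _colocalization_distance_based(a, b, max_distance=5.0)
    print(coloc.colocalized_count, coloc.chan_1_only_count, coloc.chan_2_only_count)  # 1, 1, 0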

1338 

1339 

1340def _create_empty_coloc_result( 

1341 chan_1_result: CellCountResult, 

1342 chan_2_result: CellCountResult, 

1343 method: str 

1344) -> MultiChannelResult: 

1345 """Create empty colocalization result when no cells found.""" 

1346 return MultiChannelResult( 

1347 slice_index=chan_1_result.slice_index, 

1348 chan_1_results=chan_1_result, 

1349 chan_2_results=chan_2_result, 

1350 colocalization_method=method, 

1351 colocalized_count=0, 

1352 colocalization_percentage=0.0, 

1353 chan_1_only_count=chan_1_result.cell_count, 

1354 chan_2_only_count=chan_2_result.cell_count, 

1355 colocalization_metrics={}, 

1356 overlap_positions=[] 

1357 ) 

1358 

1359 

1360def _colocalization_overlap_based( 

1361 chan_1_result: CellCountResult, 

1362 chan_2_result: CellCountResult, 

1363 min_overlap_area: float 

1364) -> MultiChannelResult: 

1365 """Perform area overlap-based colocalization analysis.""" 

1366 # This is a simplified implementation - in practice, you'd need actual segmentation masks 

1367 # For now, we'll use distance as a proxy for overlap 

1368 

1369 # Use distance-based method with smaller threshold as approximation 

1370 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly 

1371 

1372 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold) 

1373 result.colocalization_method = "overlap_area" 

1374 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area 

1375 result.colocalization_metrics["note"] = "Approximated using distance-based method" 

1376 

1377 return result 

1378 

1379 

1380def _colocalization_intensity_based( 

1381 chan_1_result: CellCountResult, 

1382 chan_2_result: CellCountResult, 

1383 intensity_threshold: float 

1384) -> MultiChannelResult: 

1385 """Perform intensity correlation-based colocalization analysis.""" 

1386 

1387 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1388 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation") 

1389 

1390    # Pair cells greedily: channel-2 candidates within a fixed 5-pixel radius whose intensities

1391    # both exceed the threshold are counted as colocalized (one-to-one, first match wins)

1392 

1393 # Filter pairs based on intensity correlation 

1394 colocalized_pairs = [] 

1395 overlap_positions = [] 

1396 

1397 pos_1 = np.array(chan_1_result.cell_positions) 

1398 pos_2 = np.array(chan_2_result.cell_positions) 

1399 

1400 for i, (x1, y1) in enumerate(chan_1_result.cell_positions): 

1401 for j, (x2, y2) in enumerate(chan_2_result.cell_positions): 

1402 # Calculate distance 

1403 dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2) 

1404 

1405 if dist <= 5.0: # Within reasonable distance 

1406 # Check intensity correlation 

1407 int_1 = chan_1_result.cell_intensities[i] 

1408 int_2 = chan_2_result.cell_intensities[j] 

1409 

1410 # Simple intensity correlation: both above threshold 

1411 if int_1 >= intensity_threshold and int_2 >= intensity_threshold: 

1412 colocalized_pairs.append((i, j)) 

1413 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2) 

1414 overlap_positions.append(avg_pos) 

1415 break  # Each channel-1 cell is paired with at most one channel-2 cell 

1416 

1417 colocalized_count = len(colocalized_pairs) 

1418 total_cells = len(pos_1) + len(pos_2) 

1419 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1420 

1421 metrics = { 

1422 "intensity_threshold_used": intensity_threshold, 

1423 "correlation_method": "threshold_based" 

1424 } 

1425 

1426 return MultiChannelResult( 

1427 slice_index=chan_1_result.slice_index, 

1428 chan_1_results=chan_1_result, 

1429 chan_2_results=chan_2_result, 

1430 colocalization_method="intensity_correlation", 

1431 colocalized_count=colocalized_count, 

1432 colocalization_percentage=colocalization_percentage, 

1433 chan_1_only_count=len(pos_1) - colocalized_count, 

1434 chan_2_only_count=len(pos_2) - colocalized_count, 

1435 colocalization_metrics=metrics, 

1436 overlap_positions=overlap_positions 

1437 ) 

1438 

1439 
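# Worked example of the percentage formula above, with made-up counts: each colocalized
# pair accounts for one cell in each channel, hence the factor of 2 in the numerator.
example_pairs, example_cells_1, example_cells_2 = 8, 20, 15
example_pct = 2 * example_pairs / (example_cells_1 + example_cells_2) * 100   # ~45.7 %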

1440def _colocalization_manders( 

1441 chan_1_result: CellCountResult, 

1442 chan_2_result: CellCountResult, 

1443 intensity_threshold: float 

1444) -> MultiChannelResult: 

1445 """Calculate Manders colocalization coefficients.""" 

1446 

1447 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1448 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients") 

1449 

1450 # Simplified Manders calculation based on detected cells 

1451 # In practice, this would use pixel-level intensity analysis 

1452 

1453 # Use intensity-based method as foundation 

1454 intensity_result = _colocalization_intensity_based( 

1455 chan_1_result, chan_2_result, intensity_threshold 

1456 ) 

1457 

1458 # Calculate Manders-like coefficients 

1459 total_int_1 = sum(chan_1_result.cell_intensities) 

1460 total_int_2 = sum(chan_2_result.cell_intensities) 

1461 

    # Simplified cell-based estimate: a cell counts as colocalized (and contributes its
    # full intensity) if it has a partner in the other channel within 5 px whose
    # intensity also exceeds the threshold, mirroring the intensity-based pairing above.
    coloc_idx_1, coloc_idx_2 = set(), set()
    for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
        for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
            if (np.hypot(x1 - x2, y1 - y2) <= 5.0
                    and chan_1_result.cell_intensities[i] >= intensity_threshold
                    and chan_2_result.cell_intensities[j] >= intensity_threshold):
                coloc_idx_1.add(i)
                coloc_idx_2.add(j)
    coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i in coloc_idx_1)
    coloc_int_2 = sum(chan_2_result.cell_intensities[j] for j in coloc_idx_2)

    # Manders coefficients: fraction of each channel's total cell intensity that is colocalized
    m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
    m2 = coloc_int_2 / total_int_2 if total_int_2 > 0 else 0

1471 

1472 intensity_result.colocalization_method = "manders_coefficients" 

1473 intensity_result.colocalization_metrics.update({ 

1474 "manders_m1": m1, 

1475 "manders_m2": m2, 

1476 "note": "Simplified cell-based Manders calculation" 

1477 }) 

1478 

1479 return intensity_result 

1480 

1481 
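# A minimal sketch of the pixel-level (thresholded) Manders coefficients that the
# docstring above alludes to, assuming the raw channel images were available here.
# `_manders_pixelwise` and its arguments are illustrative names, not module API.
import numpy as np

def _manders_pixelwise(img_1: np.ndarray, img_2: np.ndarray, thr_1: float, thr_2: float):
    """Return (M1, M2) for two same-shaped intensity images; thresholds are absolute values."""
    total_1, total_2 = float(img_1.sum()), float(img_2.sum())
    m1 = img_1[img_2 > thr_2].sum() / total_1 if total_1 > 0 else 0.0   # ch-1 intensity where ch-2 is "on"
    m2 = img_2[img_1 > thr_1].sum() / total_2 if total_2 > 0 else 0.0   # ch-2 intensity where ch-1 is "on"
    return float(m1), float(m2)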

1482def _create_segmentation_visualization( 

1483 image: np.ndarray, 

1484 positions: List[Tuple[float, float]], 

1485 max_sigma: float 

1486) -> np.ndarray: 

1487 """Create segmentation visualization with detected cells marked.""" 

1488 # Convert pyclesperanto array to numpy only when needed for visualization 

1489 import pyclesperanto as cle 

1490 if hasattr(image, 'shape') and hasattr(image, 'dtype') and not isinstance(image, np.ndarray): 

1491 # This is a pyclesperanto array - convert to numpy only for final visualization 

1492 visualization = cle.pull(image).copy() 

1493 else: 

1494 visualization = image.copy() 

1495 

1496 # Mark detected cells 

1497 for x, y in positions: 

1498 # Create small circular markers 

1499 rr, cc = np.ogrid[:image.shape[0], :image.shape[1]] 

1500 mask = (rr - y)**2 + (cc - x)**2 <= (max_sigma * 2)**2 

1501 

1502 # rr/cc from np.ogrid are already within the image bounds, so this check is a defensive no-op 

1503 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1]) 

1504 mask = mask & valid_mask 

1505 

1506 visualization[mask] = visualization.max() # Bright markers 

1507 

1508 return visualization 

1509 

1510 
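# A leaner sketch of the marker-drawing loop above: the coordinate grids are built once
# rather than once per cell. `_mark_positions` and its arguments are illustrative names.
import numpy as np
from typing import List, Tuple

def _mark_positions(image: np.ndarray, positions: List[Tuple[float, float]], radius: float) -> np.ndarray:
    """Return a copy of `image` with bright circular markers at the given (x, y) positions."""
    out = image.copy()
    rr, cc = np.ogrid[:image.shape[0], :image.shape[1]]   # row / column index grids
    for x, y in positions:
        out[(rr - y) ** 2 + (cc - x) ** 2 <= radius ** 2] = out.max()
    return out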

1511def count_cells_simple_baseline( 

1512 image: np.ndarray, # 2D image only 

1513 threshold: float = 0.1, 

1514 min_cell_area: int = 50, 

1515 max_cell_area: int = 5000 

1516) -> Tuple[np.ndarray, int, List[Tuple[float, float]]]: 

1517 """ 

1518 Simple baseline cell counting using pyclesperanto. 

1519 Based on analyse_blobs.ipynb and voronoi_otsu_labeling.ipynb examples. 

1520 

1521 Args: 

1522 image: 2D numpy array 

1523 threshold: Threshold as fraction of max intensity (0.0-1.0) 

1524 min_cell_area: Minimum area in pixels 

1525 max_cell_area: Maximum area in pixels 

1526 

1527 Returns: 

1528 segmentation_mask: 2D array with labeled cells 

1529 cell_count: Number of detected cells 

1530 cell_positions: List of (x, y) centroid positions 

1531 """ 

1532 # Convert to pyclesperanto array 

1533 gpu_image = cle.push(image.astype(np.float32)) 

1534 

1535 # Apply threshold 

1536 max_intensity = cle.maximum_of_all_pixels(gpu_image) 

1537 threshold_val = threshold * max_intensity 

1538 binary = cle.greater_constant(gpu_image, scalar=threshold_val) 

1539 

1540 # Connected components labeling 

1541 labels = cle.connected_components_labeling(binary) 

1542 

1543 # Remove small and large objects 

1544 labels = cle.remove_small_labels(labels, minimum_size=min_cell_area) 

1545 labels = cle.remove_large_labels(labels, maximum_size=max_cell_area) 

1546 

1547 # Get statistics 

1548 stats = cle.statistics_of_labelled_pixels(gpu_image, labels) 

1549 

1550 # Extract results 

1551 if 'label' in stats and len(stats['label']) > 0: 

1552 cell_count = len(stats['label']) 

1553 centroids_x = stats.get('centroid_x', []) 

1554 centroids_y = stats.get('centroid_y', []) 

1555 cell_positions = [(float(x), float(y)) for x, y in zip(centroids_x, centroids_y)] 

1556 else: 

1557 cell_count = 0 

1558 cell_positions = [] 

1559 

1560 # Convert result back to numpy 

1561 segmentation_mask = cle.pull(labels) 

1562 

1563 return segmentation_mask, cell_count, cell_positions 

1564 

1565 
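# Example use of count_cells_simple_baseline on a small synthetic image (made-up values);
# running it requires a working pyclesperanto installation with an OpenCL device.
import numpy as np

synthetic = np.zeros((128, 128), dtype=np.float32)
synthetic[20:35, 20:35] = 1.0      # first bright square "cell"
synthetic[80:100, 60:80] = 1.0     # second bright square "cell"

mask, count, centroids = count_cells_simple_baseline(
    synthetic, threshold=0.5, min_cell_area=50, max_cell_area=5000
)
# Expected: count == 2, with centroids roughly at (27, 27) and (69.5, 89.5) in (x, y) order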

1566def _create_colocalization_map( 

1567 chan_1_img: np.ndarray, 

1568 chan_2_img: np.ndarray, 

1569 coloc_result: MultiChannelResult 

1570) -> np.ndarray: 

1571 """Create colocalization visualization map.""" 

1572 # Create a single-channel map (same shape as channel 1) highlighting colocalized cells 

1573 coloc_map = np.zeros_like(chan_1_img) 

1574 

1575 # Mark colocalized positions 

1576 for x, y in coloc_result.overlap_positions: 

1577 # Create markers for colocalized cells 

1578 rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]] 

1579 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius 

1580 

1581 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1]) 

1582 mask = mask & valid_mask 

1583 

1584 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers 

1585 

1586 return coloc_map