Coverage for openhcs/processing/backends/analysis/cell_counting_pyclesperanto.py: 1.3%

605 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2GPU-accelerated cell counting and multi-channel colocalization analysis for OpenHCS. 

3 

4This module provides comprehensive cell counting capabilities using pyclesperanto, 

5supporting both single-channel and multi-channel analysis with various detection 

6methods and colocalization metrics. 

7""" 

8 

9import numpy as np 

10import logging 

11import gc 

12from typing import Dict, List, Tuple, Any, Optional, Union 

13from dataclasses import dataclass 

14from enum import Enum 

15 

16logger = logging.getLogger(__name__) 

17 

18# Core scientific computing imports 

19import pandas as pd 

20import json 

21import pyclesperanto as cle 

22from scipy.spatial.distance import cdist 

23 

24# OpenHCS imports 

25from openhcs.core.memory.decorators import pyclesperanto as pyclesperanto_func 

26from openhcs.core.pipeline.function_contracts import special_outputs 

27from openhcs.constants.constants import Backend 

28 

29 

30class DetectionMethod(Enum): 

31 """Cell detection methods available.""" 

32 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells) 

33 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose) 

34 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells) 

35 WATERSHED = "watershed" # Watershed segmentation (best for touching cells) 

36 THRESHOLD = "threshold" # Simple thresholding (fastest, basic) 

37 

38 

39class ColocalizationMethod(Enum): 

40 """Methods for multi-channel colocalization analysis.""" 

41 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap 

42 DISTANCE_BASED = "distance_based" # Based on centroid distances 

43 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation 

44 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients 

45 

46 

47class ThresholdMethod(Enum): 

48 """Automatic thresholding methods for watershed segmentation.""" 

49 OTSU = "otsu" # Otsu's method (good for bimodal histograms) 

50 LI = "li" # Li's method (good for low contrast images) 

51 MANUAL = "manual" # Manual threshold value (0.0-1.0) 

52 

53 

54 

55 

56 

57@dataclass 

58class CellCountResult: 

59 """Results for single-channel cell counting.""" 

60 slice_index: int 

61 method: str 

62 cell_count: int 

63 cell_positions: List[Tuple[float, float]] # (x, y) centroids 

64 cell_areas: List[float] 

65 cell_intensities: List[float] 

66 detection_confidence: List[float] 

67 parameters_used: Dict[str, Any] 

68 

69 

70@dataclass 

71class MultiChannelResult: 

72 """Results for multi-channel cell counting and colocalization.""" 

73 slice_index: int 

74 chan_1_results: CellCountResult 

75 chan_2_results: CellCountResult 

76 colocalization_method: str 

77 colocalized_count: int 

78 colocalization_percentage: float 

79 chan_1_only_count: int 

80 chan_2_only_count: int 

81 colocalization_metrics: Dict[str, float] 

82 overlap_positions: List[Tuple[float, float]] 

83 

84 

85def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager) -> str: 

86 """Materialize cell counting results as analysis-ready CSV and JSON formats.""" 

87 

88 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}") 

89 

90 # Convert CLE binary_mask to NumPy if present 

91 if data: 

92 for result in data: 

93 if getattr(result, 'binary_mask', None) is not None:  # binary_mask is attached dynamically, not a declared dataclass field

94 result.binary_mask = np.asarray(result.binary_mask)

95 if isinstance(result, MultiChannelResult):

96 if getattr(result.chan_1_results, 'binary_mask', None) is not None:

97 result.chan_1_results.binary_mask = np.asarray(result.chan_1_results.binary_mask)

98 if getattr(result.chan_2_results, 'binary_mask', None) is not None:

99 result.chan_2_results.binary_mask = np.asarray(result.chan_2_results.binary_mask)

100 

101 # Determine if this is single-channel or multi-channel data 

102 if not data: 

103 logger.warning(f"🔬 CELL_COUNT_MATERIALIZE: No data to materialize") 

104 return path 

105 

106 is_multi_channel = isinstance(data[0], MultiChannelResult) 

107 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}") 

108 

109 if is_multi_channel: 

110 return _materialize_multi_channel_results(data, path, filemanager) 

111 else: 

112 return _materialize_single_channel_results(data, path, filemanager) 

113 
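# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# Minimal example of driving materialize_cell_counts outside the pipeline. It assumes a
# stand-in file manager exposing only the save/exists/delete/ensure_directory calls used
# in this module; `_SketchFileManager` and `_sketch_materialize_usage` are hypothetical
# names, and the real OpenHCS FileManager / Backend handling may differ.
import os  # used only by the sketch below


class _SketchFileManager:
    """Hypothetical stand-in exposing only the methods this module calls."""

    def ensure_directory(self, path, backend):
        os.makedirs(path, exist_ok=True)

    def exists(self, path, backend):
        return os.path.exists(path)

    def delete(self, path, backend):
        os.remove(path)

    def save(self, content, path, backend):
        # Strings are written as text; arrays would need a real image writer in practice
        mode = "w" if isinstance(content, str) else "wb"
        with open(path, mode) as handle:
            handle.write(content if isinstance(content, str) else bytes(content))


def _sketch_materialize_usage():
    """Sketch: materialize a single fabricated CellCountResult to JSON/CSV."""
    result = CellCountResult(
        slice_index=0, method="threshold_pyclesperanto", cell_count=1,
        cell_positions=[(10.0, 12.0)], cell_areas=[42.0],
        cell_intensities=[0.8], detection_confidence=[0.9], parameters_used={},
    )
    return materialize_cell_counts([result], "/tmp/demo_cell_counts.pkl", _SketchFileManager())
# -------------------------------------------------------------------------------------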

114 

115def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager) -> str: 

116 """Materialize segmentation masks as individual TIFF files.""" 

117 

118 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}") 

119 

120 # Convert CLE arrays to NumPy 

121 if data: 

122 data = [np.asarray(mask) for mask in data] 

123 

124 if not data: 

125 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)") 

126 # Create empty summary file to indicate no masks were generated 

127 summary_path = path.replace('.pkl', '_segmentation_summary.txt') 

128 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n" 

129 from openhcs.constants.constants import Backend 

130 filemanager.save(summary_content, summary_path, Backend.DISK.value) 

131 return summary_path 

132 

133 # Generate output file paths based on the input path 

134 base_path = path.replace('.pkl', '') 

135 

136 # Save each mask as a separate TIFF file 

137 for i, mask in enumerate(data): 

138 mask_filename = f"{base_path}_slice_{i:03d}.tif" 

139 

140 # Convert mask to appropriate dtype for saving (uint16 to match input images) 

141 if mask.dtype != np.uint16: 

142 # Normalize to uint16 range if needed 

143 if mask.max() <= 1.0: 

144 mask_uint16 = (mask * 65535).astype(np.uint16) 

145 else: 

146 mask_uint16 = mask.astype(np.uint16) 

147 else: 

148 mask_uint16 = mask 

149 

150 # Save using filemanager (Backend is already imported at module level)

152 filemanager.save(mask_uint16, mask_filename, Backend.DISK.value)

153 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename}") 

154 

155 # Return summary path 

156 summary_path = f"{base_path}_segmentation_summary.txt" 

157 summary_content = f"Segmentation masks saved: {len(data)} files\n" 

158 summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n" 

159 summary_content += f"Mask dtype: {data[0].dtype}\n" 

160 summary_content += f"Mask shape: {data[0].shape}\n" 

161 

162 filemanager.save(summary_content, summary_path, Backend.DISK.value) 

163 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks") 

164 

165 return summary_path 

166 
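# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# The dtype handling above scales float masks in [0, 1] to the full uint16 range and
# casts everything else directly. A tiny self-contained check of that rule, assuming
# masks arrive either as probabilities or as integer label images:
def _sketch_mask_dtype_rule():
    prob_mask = np.array([[0.0, 0.5], [1.0, 0.25]], dtype=np.float32)
    label_mask = np.array([[0, 3], [7, 7]], dtype=np.int32)

    scaled = (prob_mask * 65535).astype(np.uint16)   # 0.5 -> 32767, 1.0 -> 65535
    labels_uint16 = label_mask.astype(np.uint16)     # small integer labels pass through unchanged
    return scaled, labels_uint16
# -------------------------------------------------------------------------------------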

167 

168@pyclesperanto_func 

169@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks)) 

170def count_cells_single_channel( 

171 image_stack: np.ndarray, 

172 # Detection method and parameters 

173 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

174 # Blob detection parameters 

175 min_sigma: float = 1.0, # Minimum blob size (pixels) 

176 max_sigma: float = 10.0, # Maximum blob size (pixels) 

177 num_sigma: int = 10, # Number of sigma values to test 

178 threshold: float = 0.1, # Detection threshold (0.0-1.0) 

179 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

180 # Watershed parameters 

181 watershed_footprint_size: int = 3, # Local maxima footprint size 

182 watershed_min_distance: int = 5, # Minimum distance between peaks 

183 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods 

184 # Preprocessing parameters 

185 enable_preprocessing: bool = True, 

186 gaussian_sigma: float = 1.0, # Gaussian blur sigma 

187 median_disk_size: int = 1, # Median filter disk size 

188 # Filtering parameters 

189 min_cell_area: int = 10, # Minimum cell area (pixels) 

190 max_cell_area: int = 1000, # Maximum cell area (pixels) 

191 remove_border_cells: bool = True, # Remove cells touching image border 

192 # Output parameters 

193 return_segmentation_mask: bool = False 

194) -> Tuple[np.ndarray, List[CellCountResult], List[np.ndarray]]:

195 """ 

196 Count cells in single-channel image stack using various detection methods. 

197  

198 Args: 

199 image_stack: 3D array (Z, Y, X) where each Z slice is processed independently 

200 detection_method: Method for cell detection (see DetectionMethod enum) 

201 min_sigma: Minimum blob size for blob detection methods 

202 max_sigma: Maximum blob size for blob detection methods 

203 num_sigma: Number of sigma values to test for blob detection 

204 threshold: Detection threshold (method-dependent) 

205 overlap: Maximum overlap between detected blobs 

206 watershed_footprint_size: Footprint size for local maxima detection 

207 watershed_min_distance: Minimum distance between watershed peaks 

208 watershed_threshold_method: Thresholding method for watershed 

209 enable_preprocessing: Apply Gaussian and median filtering 

210 gaussian_sigma: Standard deviation for Gaussian blur 

211 median_disk_size: Disk size for median filtering 

212 min_cell_area: Minimum area for valid cells 

213 max_cell_area: Maximum area for valid cells 

214 remove_border_cells: Remove cells touching image borders 

215 return_segmentation_mask: Return segmentation masks in output 

216  

217 Returns: 

218 output_stack: Original image stack unchanged (Z, Y, X) 

219 cell_count_results: List of CellCountResult objects for each slice 

220 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True 

221 """ 

222 if image_stack.ndim != 3: 

223 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

224 

225 results = [] 

226 segmentation_masks = [] 

227 

228 # Store parameters for reproducibility (convert enums to values) 

229 parameters = { 

230 "detection_method": detection_method.value, 

231 "min_sigma": min_sigma, 

232 "max_sigma": max_sigma, 

233 "num_sigma": num_sigma, 

234 "threshold": threshold, 

235 "overlap": overlap, 

236 "watershed_footprint_size": watershed_footprint_size, 

237 "watershed_min_distance": watershed_min_distance, 

238 "watershed_threshold_method": watershed_threshold_method.value if hasattr(watershed_threshold_method, 'value') else watershed_threshold_method, 

239 "gaussian_sigma": gaussian_sigma, 

240 "median_disk_size": median_disk_size, 

241 "min_cell_area": min_cell_area, 

242 "max_cell_area": max_cell_area, 

243 "remove_border_cells": remove_border_cells 

244 } 

245 

246 logger.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")

247 

248 for z_idx in range(image_stack.shape[0]): 

249 # Extract slice - keep as pyclesperanto array 

250 slice_img = image_stack[z_idx] 

251 

252 # Apply preprocessing if enabled (all operations stay in GPU) 

253 if enable_preprocessing: 

254 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size) 

255 

256 # Detect cells using specified method (slice_img stays as cle array) 

257 result = _detect_cells_single_method( 

258 slice_img, z_idx, detection_method.value, parameters 

259 ) 

260 

261 results.append(result) 

262 

263 # Create segmentation mask if requested 

264 if return_segmentation_mask: 

265 segmentation_mask = _create_segmentation_visualization( 

266 slice_img, result.cell_positions, max_sigma 

267 ) 

268 segmentation_masks.append(segmentation_mask) 

269 

270 # Always return segmentation masks (empty list if not requested) 

271 # This ensures consistent return signature for special outputs system 

272 return image_stack, results, segmentation_masks 

273 
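# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# How a caller might exercise count_cells_single_channel on a synthetic stack. This
# assumes the @pyclesperanto_func / @special_outputs decorators pass the literal 3-tuple
# return value through when the function is called directly (inside an OpenHCS pipeline
# the special outputs are routed to their materializers instead), and that pyclesperanto
# accepts NumPy input for its GPU kernels. `_sketch_single_channel_usage` is hypothetical.
def _sketch_single_channel_usage():
    rng = np.random.default_rng(0)
    stack = (rng.random((2, 128, 128)) * 65535).astype(np.uint16)  # 2 slices, (Z, Y, X)

    _, results, masks = count_cells_single_channel(
        stack,
        detection_method=DetectionMethod.THRESHOLD,  # fastest method, good enough for a sketch
        threshold=0.5,
        min_cell_area=5,
        max_cell_area=500,
        return_segmentation_mask=False,
    )
    # One CellCountResult per slice; masks is empty because none were requested
    return [(r.slice_index, r.cell_count) for r in results], masks
# -------------------------------------------------------------------------------------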

274 

275@pyclesperanto_func 

276@special_outputs(("multi_channel_counts", materialize_cell_counts)) 

277def count_cells_multi_channel( 

278 image_stack: np.ndarray, 

279 chan_1: int, # Index of first channel (positional arg) 

280 chan_2: int, # Index of second channel (positional arg) 

281 # Detection parameters for channel 1 (all single-channel params available) 

282 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

283 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels) 

284 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels) 

285 chan_1_num_sigma: int = 10, # Number of sigma values to test 

286 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

287 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

288 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size 

289 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks 

290 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

291 chan_1_enable_preprocessing: bool = True, # Apply preprocessing 

292 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

293 chan_1_median_disk_size: int = 1, # Median filter disk size 

294 chan_1_min_area: int = 10, # Minimum cell area (pixels) 

295 chan_1_max_area: int = 1000, # Maximum cell area (pixels) 

296 chan_1_remove_border_cells: bool = True, # Remove cells touching border 

297 # Detection parameters for channel 2 (all single-channel params available) 

298 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons 

299 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels) 

300 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels) 

301 chan_2_num_sigma: int = 10, # Number of sigma values to test 

302 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0) 

303 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0) 

304 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size 

305 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks 

306 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method 

307 chan_2_enable_preprocessing: bool = True, # Apply preprocessing 

308 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma 

309 chan_2_median_disk_size: int = 1, # Median filter disk size 

310 chan_2_min_area: int = 10, # Minimum cell area (pixels) 

311 chan_2_max_area: int = 1000, # Maximum cell area (pixels) 

312 chan_2_remove_border_cells: bool = True, # Remove cells touching border 

313 # Colocalization parameters 

314 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods 

315 max_distance: float = 5.0, # Maximum distance for colocalization (pixels) 

316 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method 

317 intensity_threshold: float = 0.5, # Threshold for intensity-based methods 

318 # Output parameters 

319 return_colocalization_map: bool = False 

320) -> Tuple[np.ndarray, List[MultiChannelResult]]: 

321 """ 

322 Count cells in multi-channel image stack with colocalization analysis. 

323 

324 Each channel can be processed with independent parameters, providing the same 

325 flexibility as the single-channel function for each channel separately. 

326 

327 Args: 

328 image_stack: 3D array (Z, Y, X) where Z represents different channels 

329 chan_1: Index of first channel in the stack (positional) 

330 chan_2: Index of second channel in the stack (positional) 

331 

332 # Channel 1 detection parameters (same as single-channel function) 

333 chan_1_method: Detection method for channel 1 (DetectionMethod enum) 

334 chan_1_min_sigma: Minimum blob size for channel 1 

335 chan_1_max_sigma: Maximum blob size for channel 1 

336 chan_1_num_sigma: Number of sigma values to test for channel 1 

337 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0) 

338 chan_1_overlap: Maximum overlap between blobs for channel 1 

339 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1 

340 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1 

341 chan_1_watershed_threshold_method: Thresholding method for channel 1 

342 chan_1_enable_preprocessing: Apply preprocessing to channel 1 

343 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1 

344 chan_1_median_disk_size: Median filter size for channel 1 

345 chan_1_min_area: Minimum cell area for channel 1 

346 chan_1_max_area: Maximum cell area for channel 1 

347 chan_1_remove_border_cells: Remove border cells for channel 1 

348 

349 # Channel 2 detection parameters (same as single-channel function) 

350 chan_2_method: Detection method for channel 2 (DetectionMethod enum) 

351 chan_2_min_sigma: Minimum blob size for channel 2 

352 chan_2_max_sigma: Maximum blob size for channel 2 

353 chan_2_num_sigma: Number of sigma values to test for channel 2 

354 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0) 

355 chan_2_overlap: Maximum overlap between blobs for channel 2 

356 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2 

357 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2 

358 chan_2_watershed_threshold_method: Thresholding method for channel 2 

359 chan_2_enable_preprocessing: Apply preprocessing to channel 2 

360 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2 

361 chan_2_median_disk_size: Median filter size for channel 2 

362 chan_2_min_area: Minimum cell area for channel 2 

363 chan_2_max_area: Maximum cell area for channel 2 

364 chan_2_remove_border_cells: Remove border cells for channel 2 

365 

366 # Colocalization parameters 

367 colocalization_method: Method for determining colocalization (ColocalizationMethod enum) 

368 max_distance: Maximum distance for distance-based colocalization (pixels) 

369 min_overlap_area: Minimum overlap fraction for area-based colocalization 

370 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0) 

371 return_colocalization_map: Return colocalization visualization 

372 

373 Returns: 

374 output_stack: Original images or colocalization maps 

375 multi_channel_results: List of MultiChannelResult objects 

376 """ 

377 if image_stack.ndim != 3: 

378 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D") 

379 

380 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]: 

381 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}") 

382 

383 if chan_1 == chan_2: 

384 raise ValueError("Channel 1 and Channel 2 must be different") 

385 

386 # Extract channel images 

387 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency 

388 chan_2_img = image_stack[chan_2:chan_2+1] 

389 

390 # Count cells in each channel separately using the single-channel function 

391 # Channel 1 parameters (all explicit) 

392 chan_1_params = { 

393 "detection_method": chan_1_method, 

394 "min_sigma": chan_1_min_sigma, 

395 "max_sigma": chan_1_max_sigma, 

396 "num_sigma": chan_1_num_sigma, 

397 "threshold": chan_1_threshold, 

398 "overlap": chan_1_overlap, 

399 "watershed_footprint_size": chan_1_watershed_footprint_size, 

400 "watershed_min_distance": chan_1_watershed_min_distance, 

401 "watershed_threshold_method": chan_1_watershed_threshold_method, 

402 "enable_preprocessing": chan_1_enable_preprocessing, 

403 "gaussian_sigma": chan_1_gaussian_sigma, 

404 "median_disk_size": chan_1_median_disk_size, 

405 "min_cell_area": chan_1_min_area, 

406 "max_cell_area": chan_1_max_area, 

407 "remove_border_cells": chan_1_remove_border_cells, 

408 "return_segmentation_mask": False 

409 } 

410 

411 # Channel 2 parameters (all explicit) 

412 chan_2_params = { 

413 "detection_method": chan_2_method, 

414 "min_sigma": chan_2_min_sigma, 

415 "max_sigma": chan_2_max_sigma, 

416 "num_sigma": chan_2_num_sigma, 

417 "threshold": chan_2_threshold, 

418 "overlap": chan_2_overlap, 

419 "watershed_footprint_size": chan_2_watershed_footprint_size, 

420 "watershed_min_distance": chan_2_watershed_min_distance, 

421 "watershed_threshold_method": chan_2_watershed_threshold_method, 

422 "enable_preprocessing": chan_2_enable_preprocessing, 

423 "gaussian_sigma": chan_2_gaussian_sigma, 

424 "median_disk_size": chan_2_median_disk_size, 

425 "min_cell_area": chan_2_min_area, 

426 "max_cell_area": chan_2_max_area, 

427 "remove_border_cells": chan_2_remove_border_cells, 

428 "return_segmentation_mask": False 

429 } 

430 

431 # Process each channel (the single-channel function also returns segmentation masks, empty here)

432 _, chan_1_results, _ = count_cells_single_channel(chan_1_img, **chan_1_params)

433 _, chan_2_results, _ = count_cells_single_channel(chan_2_img, **chan_2_params)

434 

435 # Perform colocalization analysis 

436 multi_results = [] 

437 output_stack = image_stack.copy() 

438 

439 # Since we're processing single slices from each channel, we only have one result each 

440 chan_1_result = chan_1_results[0] 

441 chan_2_result = chan_2_results[0] 

442 

443 # Analyze colocalization 

444 coloc_result = _analyze_colocalization( 

445 chan_1_result, chan_2_result, colocalization_method.value, 

446 max_distance, min_overlap_area, intensity_threshold 

447 ) 

448 

449 multi_results.append(coloc_result) 

450 

451 # Create colocalization visualization if requested 

452 if return_colocalization_map: 

453 coloc_map = _create_colocalization_map( 

454 image_stack[chan_1], image_stack[chan_2], coloc_result 

455 ) 

456 # Replace one of the channels with the colocalization map 

457 output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map]) 

458 

459 return output_stack, multi_results 

460 
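# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# Multi-channel usage: the stack's Z axis is interpreted as channels, so a two-channel
# field of view is a (2, Y, X) array. Same decorator caveat as the single-channel sketch;
# `_sketch_multi_channel_usage` is a hypothetical name.
def _sketch_multi_channel_usage():
    rng = np.random.default_rng(1)
    two_channel = (rng.random((2, 96, 96)) * 65535).astype(np.uint16)

    _, coloc_results = count_cells_multi_channel(
        two_channel,
        chan_1=0,
        chan_2=1,
        chan_1_method=DetectionMethod.THRESHOLD,
        chan_2_method=DetectionMethod.THRESHOLD,
        colocalization_method=ColocalizationMethod.DISTANCE_BASED,
        max_distance=10.0,
    )
    summary = coloc_results[0]
    return summary.colocalized_count, summary.colocalization_percentage
# -------------------------------------------------------------------------------------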

461 

462def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager) -> str: 

463 """Materialize single-channel cell counting results.""" 

464 # Generate output file paths based on the input path 

465 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

466 base_path = path.replace('.pkl', '') 

467 json_path = f"{base_path}.json" 

468 csv_path = f"{base_path}_details.csv" 

469 

470 # Ensure output directory exists for disk backend 

471 from pathlib import Path 

472 output_dir = Path(json_path).parent 

473 filemanager.ensure_directory(str(output_dir), Backend.DISK.value) 

474 

475 summary = { 

476 "analysis_type": "single_channel_cell_counting", 

477 "total_slices": len(data), 

478 "results_per_slice": [] 

479 } 

480 rows = [] 

481 

482 total_cells = 0 

483 for result in data: 

484 total_cells += result.cell_count 

485 

486 # Add to summary 

487 summary["results_per_slice"].append({ 

488 "slice_index": result.slice_index, 

489 "method": result.method, 

490 "cell_count": result.cell_count, 

491 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0, 

492 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0, 

493 "parameters": result.parameters_used 

494 }) 

495 

496 # Add individual cell data to CSV 

497 for i, (pos, area, intensity, confidence) in enumerate(zip( 

498 result.cell_positions, result.cell_areas, 

499 result.cell_intensities, result.detection_confidence 

500 )): 

501 rows.append({ 

502 'slice_index': result.slice_index, 

503 'cell_id': f"slice_{result.slice_index}_cell_{i}", 

504 'x_position': pos[0], 

505 'y_position': pos[1], 

506 'cell_area': area, 

507 'cell_intensity': intensity, 

508 'detection_confidence': confidence, 

509 'detection_method': result.method 

510 }) 

511 

512 summary["total_cells_all_slices"] = total_cells 

513 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0 

514 

515 # Save JSON summary (overwrite if exists) 

516 json_content = json.dumps(summary, indent=2, default=str) 

517 # Remove existing file if it exists using filemanager 

518 if filemanager.exists(json_path, Backend.DISK.value): 

519 filemanager.delete(json_path, Backend.DISK.value) 

520 filemanager.save(json_content, json_path, Backend.DISK.value) 

521 

522 # Save CSV details (overwrite if exists) 

523 if rows: 

524 df = pd.DataFrame(rows) 

525 csv_content = df.to_csv(index=False) 

526 # Remove existing file if it exists using filemanager 

527 if filemanager.exists(csv_path, Backend.DISK.value): 

528 filemanager.delete(csv_path, Backend.DISK.value) 

529 filemanager.save(csv_content, csv_path, Backend.DISK.value) 

530 

531 return json_path 

532 

533 

534def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager) -> str: 

535 """Materialize multi-channel cell counting and colocalization results.""" 

536 # Generate output file paths based on the input path 

537 # Use clean naming: preserve namespaced path structure, don't duplicate special output key 

538 base_path = path.replace('.pkl', '') 

539 json_path = f"{base_path}.json" 

540 csv_path = f"{base_path}_details.csv" 

541 

542 # Ensure output directory exists for disk backend 

543 from pathlib import Path 

544 output_dir = Path(json_path).parent 

545 filemanager.ensure_directory(str(output_dir), Backend.DISK.value) 

546 

547 summary = { 

548 "analysis_type": "multi_channel_cell_counting_colocalization", 

549 "total_slices": len(data), 

550 "colocalization_summary": { 

551 "total_chan_1_cells": 0, 

552 "total_chan_2_cells": 0, 

553 "total_colocalized": 0, 

554 "average_colocalization_percentage": 0 

555 }, 

556 "results_per_slice": [] 

557 } 

558 rows = [] 

559 

560 total_coloc_pct = 0 

561 for result in data: 

562 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count 

563 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count 

564 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count 

565 total_coloc_pct += result.colocalization_percentage 

566 

567 # Add to summary 

568 summary["results_per_slice"].append({ 

569 "slice_index": result.slice_index, 

570 "chan_1_count": result.chan_1_results.cell_count, 

571 "chan_2_count": result.chan_2_results.cell_count, 

572 "colocalized_count": result.colocalized_count, 

573 "colocalization_percentage": result.colocalization_percentage, 

574 "chan_1_only": result.chan_1_only_count, 

575 "chan_2_only": result.chan_2_only_count, 

576 "colocalization_method": result.colocalization_method, 

577 "colocalization_metrics": result.colocalization_metrics 

578 }) 

579 

580 # Add colocalization details to CSV 

581 for i, pos in enumerate(result.overlap_positions): 

582 rows.append({ 

583 'slice_index': result.slice_index, 

584 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}", 

585 'x_position': pos[0], 

586 'y_position': pos[1], 

587 'colocalization_method': result.colocalization_method 

588 }) 

589 

590 summary["colocalization_summary"]["average_colocalization_percentage"] = ( 

591 total_coloc_pct / len(data) if data else 0 

592 ) 

593 

594 # Save JSON summary (overwrite if exists) 

595 json_content = json.dumps(summary, indent=2, default=str) 

596 # Remove existing file if it exists using filemanager 

597 if filemanager.exists(json_path, Backend.DISK.value): 

598 filemanager.delete(json_path, Backend.DISK.value) 

599 filemanager.save(json_content, json_path, Backend.DISK.value) 

600 

601 # Save CSV details (overwrite if exists) 

602 if rows: 

603 df = pd.DataFrame(rows) 

604 csv_content = df.to_csv(index=False) 

605 # Remove existing file if it exists using filemanager 

606 if filemanager.exists(csv_path, Backend.DISK.value): 

607 filemanager.delete(csv_path, Backend.DISK.value) 

608 filemanager.save(csv_content, csv_path, Backend.DISK.value) 

609 

610 return json_path 

611 

612 

613def _preprocess_image(image, gaussian_sigma: float, median_disk_size: int): 

614 """Apply preprocessing to enhance cell detection using pyclesperanto.""" 

615 # Assume image is already a pyclesperanto array 

616 gpu_image = image 

617 

618 # Gaussian blur to reduce noise 

619 if gaussian_sigma > 0: 

620 gpu_image = cle.gaussian_blur(gpu_image, sigma_x=gaussian_sigma, sigma_y=gaussian_sigma) 

621 

622 # Median filter to remove salt-and-pepper noise 

623 if median_disk_size > 0: 

624 gpu_image = cle.median_box(gpu_image, radius_x=median_disk_size, radius_y=median_disk_size) 

625 

626 # Return the GPU array 

627 return gpu_image 

628 
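# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# The preprocessing chain is a Gaussian blur followed by a box median filter, both on the
# GPU. A minimal call, assuming cle.push is available to move a NumPy slice onto the
# device (standard pyclesperanto behaviour); `_sketch_preprocess_usage` is hypothetical.
def _sketch_preprocess_usage():
    noisy = (np.random.default_rng(2).random((64, 64)) * 255).astype(np.float32)
    gpu_slice = cle.push(noisy)                                        # host -> device
    smoothed = _preprocess_image(gpu_slice, gaussian_sigma=1.5, median_disk_size=1)
    return np.asarray(smoothed)                                        # device -> host
# -------------------------------------------------------------------------------------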

629 

630def _detect_cells_single_method( 

631 image: np.ndarray, 

632 slice_idx: int, 

633 method: str, 

634 params: Dict[str, Any] 

635) -> CellCountResult: 

636 """Detect cells using specified method.""" 

637 

638 if method == DetectionMethod.BLOB_LOG.value: 

639 return _detect_cells_blob_log(image, slice_idx, params) 

640 elif method == DetectionMethod.BLOB_DOG.value: 

641 return _detect_cells_blob_dog(image, slice_idx, params) 

642 elif method == DetectionMethod.BLOB_DOH.value: 

643 return _detect_cells_blob_doh(image, slice_idx, params) 

644 elif method == DetectionMethod.WATERSHED.value: 

645 return _detect_cells_watershed(image, slice_idx, params) 

646 elif method == DetectionMethod.THRESHOLD.value: 

647 return _detect_cells_threshold(image, slice_idx, params) 

648 else: 

649 raise ValueError(f"Unknown detection method: {method}") 

650 

651 

652def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

653 """Detect cells using fast LoG-like blob detection.""" 

654 gpu_image = image 

655 

656 # Use single scale for speed - average of min and max sigma 

657 sigma = (params["min_sigma"] + params["max_sigma"]) / 2 

658 

659 # Fast LoG approximation: Use DoG like blob_dog but with closer scales 

660 blurred1 = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma) 

661 blurred2 = cle.gaussian_blur(gpu_image, sigma_x=sigma*1.6, sigma_y=sigma*1.6) 

662 

663 # Difference approximates Laplacian 

664 edges = cle.subtract_images(blurred1, blurred2) 

665 

666 # Threshold the edge response 

667 max_response = cle.maximum_of_all_pixels(cle.absolute(edges)) 

668 threshold_val = params["threshold"] * max_response 

669 thresholded = cle.greater_constant(cle.absolute(edges), scalar=threshold_val) 

670 

671 # Find local maxima 

672 maxima = cle.detect_maxima(cle.absolute(edges), 

673 radius_x=int(sigma), 

674 radius_y=int(sigma)) 

675 

676 # Combine threshold and maxima 

677 valid_maxima = cle.binary_and(thresholded, maxima) 

678 

679 # Dilate maxima more aggressively to create proper cell-sized regions 

680 dilated = valid_maxima 

681 for _ in range(6): # More aggressive dilation 

682 dilated = cle.dilate_box(dilated) 

683 

684 # Label connected components 

685 labels = cle.connected_components_labeling(dilated) 

686 

687 # Remove small and large objects 

688 labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"]) 

689 labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"]) 

690 

691 # Get statistics 

692 if cle.maximum_of_all_pixels(labels) > 0: 

693 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

694 

695 positions = [] 

696 areas = [] 

697 intensities = [] 

698 confidences = [] 

699 

700 if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0: 

701 for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])): 

702 positions.append((float(x), float(y))) 

703 

704 # Get area and intensity 

705 area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma**2 

706 intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0 

707 

708 areas.append(area) 

709 intensities.append(intensity) 

710 confidences.append(min(1.0, intensity / max_response)) 

711 else: 

712 positions = [] 

713 areas = [] 

714 intensities = [] 

715 confidences = [] 

716 

717 return CellCountResult( 

718 slice_index=slice_idx, 

719 method="blob_log_pyclesperanto", 

720 cell_count=len(positions), 

721 cell_positions=positions, 

722 cell_areas=areas, 

723 cell_intensities=intensities, 

724 detection_confidence=confidences, 

725 parameters_used=params 

726 ) 

727 

728 

729def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

730 """Detect cells using fast Difference of Gaussians blob detection.""" 

731 gpu_image = image 

732 

733 # Use only two scales for speed 

734 sigma1 = params["min_sigma"] 

735 sigma2 = params["max_sigma"] 

736 

737 # Apply Gaussian blurs 

738 blur1 = cle.gaussian_blur(gpu_image, sigma_x=sigma1, sigma_y=sigma1) 

739 blur2 = cle.gaussian_blur(gpu_image, sigma_x=sigma2, sigma_y=sigma2) 

740 

741 # Difference of Gaussians 

742 dog = cle.subtract_images(blur1, blur2) 

743 

744 # Threshold the DoG response 

745 max_response = cle.maximum_of_all_pixels(cle.absolute(dog)) 

746 threshold_val = params["threshold"] * max_response 

747 thresholded = cle.greater_constant(cle.absolute(dog), scalar=threshold_val) 

748 

749 # Find local maxima 

750 maxima = cle.detect_maxima(cle.absolute(dog), 

751 radius_x=int(sigma1), 

752 radius_y=int(sigma1)) 

753 

754 # Combine threshold and maxima 

755 valid_maxima = cle.binary_and(thresholded, maxima) 

756 

757 # Dilate maxima to create regions for area calculation 

758 dilated = valid_maxima 

759 for _ in range(3): # Moderate dilation 

760 dilated = cle.dilate_box(dilated) 

761 

762 # Label connected components 

763 labels = cle.connected_components_labeling(dilated) 

764 

765 # Remove small and large objects 

766 labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"]) 

767 labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"]) 

768 

769 # Get statistics 

770 if cle.maximum_of_all_pixels(labels) > 0: 

771 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

772 

773 positions = [] 

774 areas = [] 

775 intensities = [] 

776 confidences = [] 

777 

778 if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0: 

779 for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])): 

780 positions.append((float(x), float(y))) 

781 

782 # Get area and intensity 

783 area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma1**2 

784 intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0 

785 

786 areas.append(area) 

787 intensities.append(intensity) 

788 confidences.append(min(1.0, intensity / max_response)) 

789 else: 

790 positions = [] 

791 areas = [] 

792 intensities = [] 

793 confidences = [] 

794 

795 return CellCountResult( 

796 slice_index=slice_idx, 

797 method="blob_dog_pyclesperanto", 

798 cell_count=len(positions), 

799 cell_positions=positions, 

800 cell_areas=areas, 

801 cell_intensities=intensities, 

802 detection_confidence=confidences, 

803 parameters_used=params 

804 ) 

805 


863 

864 

865def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

866 """Detect cells using Hessian-like detection with pyclesperanto.""" 

867 # Assume image is already a pyclesperanto array 

868 gpu_image = image 

869 

870 # Apply Gaussian blur for smoothing 

871 sigma = (params["min_sigma"] + params["max_sigma"]) / 2 

872 blurred = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma) 

873 

874 # Apply edge detection (approximates Hessian determinant) 

875 edges = cle.binary_edge_detection(blurred) 

876 

877 # Apply threshold to original image for valid regions 

878 threshold_val = params["threshold"] * cle.maximum_of_all_pixels(gpu_image) 

879 thresholded = cle.greater_constant(gpu_image, scalar=threshold_val) 

880 

881 # Detect local maxima in edge response 

882 maxima = cle.detect_maxima_box(edges, 

883 radius_x=int(params["min_sigma"]), 

884 radius_y=int(params["min_sigma"])) 

885 

886 # Combine threshold and maxima 

887 combined = cle.binary_and(thresholded, maxima) 

888 

889 # Dilate the maxima points to create proper regions 

890 dilated = cle.dilate_box(combined) 

891 

892 # Label the dilated regions 

893 labels = cle.connected_components_labeling(dilated) 

894 

895 # Remove small and large objects (same as simple baseline) 

896 labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"]) 

897 labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"]) 

898 

899 # Get statistics - this returns a dictionary with centroids included 

900 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

901 

902 # Extract centroids directly from statistics (much simpler!) 

903 centroids_x = stats_dict.get('centroid_x', []) 

904 centroids_y = stats_dict.get('centroid_y', []) 

905 

906 # Extract the data we need from the statistics dictionary 

907 if 'label' in stats_dict and len(stats_dict['label']) > 0: 

908 # We have detected objects 

909 areas = stats_dict.get('area', []) 

910 labels_list = stats_dict.get('label', []) 

911 else: 

912 # No objects detected 

913 areas = [] 

914 labels_list = [] 

915 centroids_x = [] 

916 centroids_y = [] 

917 

918 # Process similar to other blob methods 

919 positions = [] 

920 filtered_areas = [] 

921 intensities = [] 

922 confidences = [] 

923 

924 for i in range(len(labels_list)): 

925 if i < len(centroids_x) and i < len(centroids_y) and i < len(areas): 

926 x = float(centroids_x[i]) 

927 y = float(centroids_y[i]) 

928 positions.append((x, y)) 

929 

930 # Get area 

931 area = float(areas[i]) 

932 filtered_areas.append(area) 

933 

934 # Mean intensity (if available in stats) 

935 mean_intensities = stats_dict.get('mean_intensity', []) 

936 if i < len(mean_intensities): 

937 intensity = float(mean_intensities[i]) 

938 else: 

939 intensity = 1.0 # Default value 

940 intensities.append(intensity) 

941 

942 # Use area as confidence measure 

943 confidence = min(1.0, area / (np.pi * params["max_sigma"]**2)) 

944 confidences.append(confidence) 

945 

946 filtered_data = _filter_by_area( 

947 positions, filtered_areas, intensities, confidences, 

948 params["min_cell_area"], params["max_cell_area"] 

949 ) 

950 

951 return CellCountResult( 

952 slice_index=slice_idx, 

953 method="blob_doh_pyclesperanto", 

954 cell_count=len(filtered_data[0]), 

955 cell_positions=filtered_data[0], 

956 cell_areas=filtered_data[1], 

957 cell_intensities=filtered_data[2], 

958 detection_confidence=filtered_data[3], 

959 parameters_used=params 

960 ) 

961 

962 

963def _filter_by_area( 

964 positions: List[Tuple[float, float]], 

965 areas: List[float], 

966 intensities: List[float], 

967 confidences: List[float], 

968 min_area: float, 

969 max_area: float 

970) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]: 

971 """Filter detected cells by area constraints.""" 

972 filtered_positions = [] 

973 filtered_areas = [] 

974 filtered_intensities = [] 

975 filtered_confidences = [] 

976 

977 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences): 

978 if min_area <= area <= max_area: 

979 filtered_positions.append(pos) 

980 filtered_areas.append(area) 

981 filtered_intensities.append(intensity) 

982 filtered_confidences.append(confidence) 

983 

984 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences 

985 
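# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# Worked example of the area filter: with min_area=10 and max_area=100, the 5 px and
# 250 px detections are dropped and only the 40 px detection survives.
def _sketch_filter_by_area():
    kept = _filter_by_area(
        positions=[(1.0, 1.0), (2.0, 2.0), (3.0, 3.0)],
        areas=[5.0, 40.0, 250.0],
        intensities=[0.2, 0.9, 0.7],
        confidences=[0.1, 0.8, 0.6],
        min_area=10,
        max_area=100,
    )
    return kept  # ([(2.0, 2.0)], [40.0], [0.9], [0.8])
# -------------------------------------------------------------------------------------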

986 

987def _non_maximum_suppression_3d(positions, scales, responses, overlap_threshold): 

988 """Apply non-maximum suppression across scale space.""" 

989 if len(positions) == 0: 

990 return [], [], [] 

991 

992 # Convert to numpy arrays for easier processing 

993 positions = np.array(positions) 

994 scales = np.array(scales) 

995 responses = np.array(responses) 

996 

997 # Sort by response strength (highest first) 

998 sorted_indices = np.argsort(responses)[::-1] 

999 

1000 keep = [] 

1001 for i in sorted_indices: 

1002 pos_i = positions[i] 

1003 scale_i = scales[i] 

1004 

1005 # Check if this detection overlaps with any already kept detection 

1006 should_keep = True 

1007 for j in keep: 

1008 pos_j = positions[j] 

1009 scale_j = scales[j] 

1010 

1011 # Calculate distance in space and scale 

1012 spatial_dist = np.sqrt(np.sum((pos_i - pos_j) ** 2)) 

1013 scale_dist = abs(scale_i - scale_j) / max(scale_i, scale_j) 

1014 

1015 # Check if they overlap significantly 

1016 overlap_radius = max(scale_i, scale_j) * (1 + overlap_threshold) 

1017 if spatial_dist < overlap_radius and scale_dist < overlap_threshold: 

1018 should_keep = False 

1019 break 

1020 

1021 if should_keep: 

1022 keep.append(i) 

1023 

1024 # Return filtered results 

1025 return positions[keep].tolist(), scales[keep].tolist(), responses[keep].tolist() 

1026 
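# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# Scale-space non-maximum suppression keeps the strongest of two overlapping detections.
# Below, the first two points sit 1 px apart at the same scale, so only the higher-response
# one survives, while the distant third detection is kept.
def _sketch_nms_usage():
    positions = [(10.0, 10.0), (11.0, 10.0), (50.0, 50.0)]
    scales = [3.0, 3.0, 3.0]
    responses = [0.9, 0.4, 0.7]
    return _non_maximum_suppression_3d(positions, scales, responses, overlap_threshold=0.5)
# -------------------------------------------------------------------------------------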

1027 

1028def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

1029 """Detect cells using watershed segmentation with pyclesperanto.""" 

1030 # Assume image is already a pyclesperanto array 

1031 gpu_image = image 

1032 

1033 # Threshold with Otsu; the watershed_threshold_method parameter is accepted but not yet applied here

1034 binary = cle.threshold_otsu(gpu_image)

1035 

1036 # Remove small and large objects with explicit memory cleanup 

1037 temp_labels = cle.connected_components_labeling(binary) 

1038 

1039 # Clean up intermediate arrays to prevent GPU OOM 

1040 temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"]) 

1041 del temp_labels # Explicit cleanup 

1042 

1043 temp_labels = cle.remove_large_labels(temp_labels_filtered, maximum_size=params["max_cell_area"]) 

1044 del temp_labels_filtered # Explicit cleanup 

1045 

1046 binary_filtered = cle.greater_constant(temp_labels, scalar=0) 

1047 del binary # Cleanup original binary 

1048 binary = binary_filtered 

1049 

1050 # Remove border objects if requested 

1051 if params["remove_border_cells"]: 

1052 temp_labels_border = cle.connected_components_labeling(binary) 

1053 del temp_labels # Cleanup previous temp_labels 

1054 

1055 temp_labels = cle.remove_labels_on_edges(temp_labels_border) 

1056 del temp_labels_border # Explicit cleanup 

1057 

1058 binary_final = cle.greater_constant(temp_labels, scalar=0) 

1059 del binary # Cleanup previous binary 

1060 binary = binary_final 

1061 

1062 # Since pyclesperanto doesn't have watershed, use connected components directly 

1063 labels = cle.connected_components_labeling(binary) 

1064 del binary # Cleanup binary mask 

1065 

1066 # Get statistics - this returns a dictionary with centroids included 

1067 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

1068 

1069 # Cleanup labels array after getting statistics 

1070 del labels 

1071 

1072 # Extract centroids directly from statistics (much simpler!) 

1073 centroids_x = stats_dict.get('centroid_x', []) 

1074 centroids_y = stats_dict.get('centroid_y', []) 

1075 

1076 # Extract the data we need from the statistics dictionary 

1077 if 'label' in stats_dict and len(stats_dict['label']) > 0: 

1078 # We have detected objects 

1079 areas = stats_dict.get('area', []) 

1080 labels_list = stats_dict.get('label', []) 

1081 else: 

1082 # No objects detected 

1083 areas = [] 

1084 labels_list = [] 

1085 centroids_x = [] 

1086 centroids_y = [] 

1087 

1088 positions = [] 

1089 filtered_areas = [] 

1090 intensities = [] 

1091 confidences = [] 

1092 

1093 for i in range(len(labels_list)): 

1094 if i < len(centroids_x) and i < len(centroids_y) and i < len(areas): 

1095 area = float(areas[i]) 

1096 

1097 # Filter by area 

1098 if params["min_cell_area"] <= area <= params["max_cell_area"]: 

1099 x = float(centroids_x[i]) 

1100 y = float(centroids_y[i]) 

1101 positions.append((x, y)) 

1102 

1103 filtered_areas.append(area) 

1104 

1105 # Mean intensity (if available in stats) 

1106 mean_intensities = stats_dict.get('mean_intensity', []) 

1107 if i < len(mean_intensities): 

1108 intensity = float(mean_intensities[i]) 

1109 else: 

1110 intensity = 1.0 # Default value 

1111 intensities.append(intensity) 

1112 

1113 # Use area as confidence measure (normalized) 

1114 confidence = min(1.0, area / params["max_cell_area"]) 

1115 confidences.append(confidence) 

1116 

1117 # Force garbage collection to free GPU memory 

1118 gc.collect() 

1119 

1120 return CellCountResult( 

1121 slice_index=slice_idx, 

1122 method="watershed_pyclesperanto", 

1123 cell_count=len(positions), 

1124 cell_positions=positions, 

1125 cell_areas=filtered_areas, 

1126 cell_intensities=intensities, 

1127 detection_confidence=confidences, 

1128 parameters_used=params 

1129 ) 

1130 

1131 

1132def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult: 

1133 """Detect cells using simple thresholding and connected components with pyclesperanto.""" 

1134 # Image is already a pyclesperanto array - no conversion needed 

1135 gpu_image = image 

1136 

1137 # Apply threshold (all operations stay on GPU) 

1138 max_intensity = cle.maximum_of_all_pixels(gpu_image) 

1139 threshold_val = params["threshold"] * max_intensity 

1140 binary = cle.greater_constant(gpu_image, scalar=threshold_val) 

1141 

1142 # Remove small objects with explicit memory cleanup 

1143 temp_labels = cle.connected_components_labeling(binary) 

1144 temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"]) 

1145 del temp_labels # Explicit cleanup 

1146 

1147 binary_filtered = cle.greater_constant(temp_labels_filtered, scalar=0) 

1148 del binary # Cleanup original binary 

1149 del temp_labels_filtered # Cleanup filtered labels 

1150 binary = binary_filtered 

1151 

1152 # Remove border objects if requested with explicit cleanup 

1153 if params["remove_border_cells"]: 

1154 temp_labels = cle.connected_components_labeling(binary) 

1155 temp_labels_no_border = cle.remove_labels_on_edges(temp_labels) 

1156 del temp_labels # Explicit cleanup 

1157 

1158 binary_final = cle.greater_constant(temp_labels_no_border, scalar=0) 

1159 del binary # Cleanup previous binary 

1160 del temp_labels_no_border # Cleanup no border labels 

1161 binary = binary_final 

1162 

1163 # Label connected components (final labeling on GPU) 

1164 labels = cle.connected_components_labeling(binary) 

1165 del binary # Cleanup binary mask 

1166 

1167 # Get statistics (operates on GPU arrays, returns dictionary) 

1168 stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels) 

1169 del labels # Cleanup labels after getting statistics 

1170 

1171 # Extract centroids directly from statistics (much simpler!) 

1172 centroids_x = stats_dict.get('centroid_x', []) 

1173 centroids_y = stats_dict.get('centroid_y', []) 

1174 

1175 # Extract the data we need from the statistics dictionary 

1176 if 'label' in stats_dict and len(stats_dict['label']) > 0: 

1177 # We have detected objects 

1178 areas = stats_dict.get('area', []) 

1179 labels_list = stats_dict.get('label', []) 

1180 else: 

1181 # No objects detected 

1182 areas = [] 

1183 labels_list = [] 

1184 centroids_x = [] 

1185 centroids_y = [] 

1186 

1187 max_intensity_cpu = float(max_intensity) 

1188 

1189 positions = [] 

1190 filtered_areas = [] 

1191 intensities = [] 

1192 confidences = [] 

1193 

1194 for i in range(len(labels_list)): 

1195 if i < len(centroids_x) and i < len(centroids_y) and i < len(areas): 

1196 area = float(areas[i]) 

1197 

1198 # Filter by area 

1199 if params["min_cell_area"] <= area <= params["max_cell_area"]: 

1200 x = float(centroids_x[i]) 

1201 y = float(centroids_y[i]) 

1202 positions.append((x, y)) 

1203 

1204 filtered_areas.append(area) 

1205 

1206 # Mean intensity (if available in stats) 

1207 mean_intensities = stats_dict.get('mean_intensity', []) 

1208 if i < len(mean_intensities): 

1209 mean_intensity = float(mean_intensities[i]) 

1210 else: 

1211 mean_intensity = 1.0 # Default value 

1212 intensities.append(mean_intensity) 

1213 

1214 # Use intensity as confidence measure 

1215 confidence = mean_intensity / max_intensity_cpu if max_intensity_cpu > 0 else 1.0 

1216 confidences.append(confidence) 

1217 

1218 # Force garbage collection to free GPU memory 

1219 gc.collect() 

1220 

1221 return CellCountResult( 

1222 slice_index=slice_idx, 

1223 method="threshold_pyclesperanto", 

1224 cell_count=len(positions), 

1225 cell_positions=positions, 

1226 cell_areas=filtered_areas, 

1227 cell_intensities=intensities, 

1228 detection_confidence=confidences, 

1229 parameters_used=params 

1230 ) 

1231 

1232 

1233def _analyze_colocalization( 

1234 chan_1_result: CellCountResult, 

1235 chan_2_result: CellCountResult, 

1236 method: str, 

1237 max_distance: float, 

1238 min_overlap_area: float, 

1239 intensity_threshold: float 

1240) -> MultiChannelResult: 

1241 """Analyze colocalization between two channels.""" 

1242 

1243 if method == ColocalizationMethod.DISTANCE_BASED.value: 

1244 return _colocalization_distance_based( 

1245 chan_1_result, chan_2_result, max_distance 

1246 ) 

1247 elif method == ColocalizationMethod.OVERLAP_AREA.value: 

1248 return _colocalization_overlap_based( 

1249 chan_1_result, chan_2_result, min_overlap_area 

1250 ) 

1251 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value: 

1252 return _colocalization_intensity_based( 

1253 chan_1_result, chan_2_result, intensity_threshold 

1254 ) 

1255 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value: 

1256 return _colocalization_manders( 

1257 chan_1_result, chan_2_result, intensity_threshold 

1258 ) 

1259 else: 

1260 raise ValueError(f"Unknown colocalization method: {method}") 

1261 

1262 

1263def _colocalization_distance_based( 

1264 chan_1_result: CellCountResult, 

1265 chan_2_result: CellCountResult, 

1266 max_distance: float 

1267) -> MultiChannelResult: 

1268 """Perform distance-based colocalization analysis.""" 

1269 

1270 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1271 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based") 

1272 

1273 # Convert positions to arrays 

1274 pos_1 = np.array(chan_1_result.cell_positions) 

1275 pos_2 = np.array(chan_2_result.cell_positions) 

1276 

1277 # Calculate pairwise distances 

1278 distances = cdist(pos_1, pos_2) 

1279 

1280 # Find colocalized pairs 

1281 colocalized_pairs = [] 

1282 used_chan_2 = set() 

1283 

1284 for i in range(len(pos_1)): 

1285 # Find closest cell in channel 2 

1286 min_dist_idx = np.argmin(distances[i]) 

1287 min_dist = distances[i, min_dist_idx] 

1288 

1289 # Check if within distance threshold and not already used 

1290 if min_dist <= max_distance and min_dist_idx not in used_chan_2: 

1291 colocalized_pairs.append((i, min_dist_idx)) 

1292 used_chan_2.add(min_dist_idx) 

1293 

1294 # Calculate metrics 

1295 colocalized_count = len(colocalized_pairs) 

1296 total_cells = len(pos_1) + len(pos_2) 

1297 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1298 

1299 chan_1_only = len(pos_1) - colocalized_count 

1300 chan_2_only = len(pos_2) - colocalized_count 

1301 

1302 # Extract colocalized positions (average of paired positions) 

1303 overlap_positions = [] 

1304 for i, j in colocalized_pairs: 

1305 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist() 

1306 overlap_positions.append(tuple(avg_pos)) 

1307 

1308 # Calculate additional metrics 

1309 if colocalized_pairs: 

1310 avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs]) 

1311 max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs]) 

1312 else: 

1313 avg_distance = 0 

1314 max_distance_found = 0 

1315 

1316 metrics = { 

1317 "average_colocalization_distance": float(avg_distance), 

1318 "max_colocalization_distance": float(max_distance_found), 

1319 "distance_threshold_used": max_distance 

1320 } 

1321 

1322 return MultiChannelResult( 

1323 slice_index=chan_1_result.slice_index, 

1324 chan_1_results=chan_1_result, 

1325 chan_2_results=chan_2_result, 

1326 colocalization_method="distance_based", 

1327 colocalized_count=colocalized_count, 

1328 colocalization_percentage=colocalization_percentage, 

1329 chan_1_only_count=chan_1_only, 

1330 chan_2_only_count=chan_2_only, 

1331 colocalization_metrics=metrics, 

1332 overlap_positions=overlap_positions 

1333 ) 

1334 
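# --- Illustrative sketch (hypothetical, not exercised by the coverage run above) -----
# The percentage above counts each matched pair once in both channels:
#   percentage = 2 * colocalized / (n_chan_1 + n_chan_2) * 100
# e.g. 3 matched pairs out of 4 + 6 detections gives 2 * 3 / (4 + 6) * 100 = 60%. A direct
# check of that arithmetic, independent of the detection code (`_sketch_*` is hypothetical):
def _sketch_colocalization_percentage(colocalized: int, n_chan_1: int, n_chan_2: int) -> float:
    total = n_chan_1 + n_chan_2
    return (2 * colocalized / total * 100) if total > 0 else 0.0
# assert _sketch_colocalization_percentage(3, 4, 6) == 60.0
# -------------------------------------------------------------------------------------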

1335 

1336def _create_empty_coloc_result( 

1337 chan_1_result: CellCountResult, 

1338 chan_2_result: CellCountResult, 

1339 method: str 

1340) -> MultiChannelResult: 

1341 """Create empty colocalization result when no cells found.""" 

1342 return MultiChannelResult( 

1343 slice_index=chan_1_result.slice_index, 

1344 chan_1_results=chan_1_result, 

1345 chan_2_results=chan_2_result, 

1346 colocalization_method=method, 

1347 colocalized_count=0, 

1348 colocalization_percentage=0.0, 

1349 chan_1_only_count=chan_1_result.cell_count, 

1350 chan_2_only_count=chan_2_result.cell_count, 

1351 colocalization_metrics={}, 

1352 overlap_positions=[] 

1353 ) 

1354 

1355 

1356def _colocalization_overlap_based( 

1357 chan_1_result: CellCountResult, 

1358 chan_2_result: CellCountResult, 

1359 min_overlap_area: float 

1360) -> MultiChannelResult: 

1361 """Perform area overlap-based colocalization analysis.""" 

1362 # This is a simplified implementation - in practice, you'd need actual segmentation masks 

1363 # For now, we'll use distance as a proxy for overlap 

1364 

1365 # Use distance-based method with smaller threshold as approximation 

1366 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly 

1367 

1368 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold) 

1369 result.colocalization_method = "overlap_area" 

1370 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area 

1371 result.colocalization_metrics["note"] = "Approximated using distance-based method" 

1372 

1373 return result 

1374 

1375 

1376def _colocalization_intensity_based( 

1377 chan_1_result: CellCountResult, 

1378 chan_2_result: CellCountResult, 

1379 intensity_threshold: float 

1380) -> MultiChannelResult: 

1381 """Perform intensity correlation-based colocalization analysis.""" 

1382 

1383 if not chan_1_result.cell_positions or not chan_2_result.cell_positions: 

1384 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation") 

1385 

1386 # Pair cells directly: candidates within 5 px whose intensities both clear the threshold

1388 

1389 # Filter pairs based on intensity correlation 

1390 colocalized_pairs = [] 

1391 overlap_positions = [] 

1392 

1393 pos_1 = np.array(chan_1_result.cell_positions) 

1394 pos_2 = np.array(chan_2_result.cell_positions) 

1395 

1396 for i, (x1, y1) in enumerate(chan_1_result.cell_positions): 

1397 for j, (x2, y2) in enumerate(chan_2_result.cell_positions): 

1398 # Calculate distance 

1399 dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2) 

1400 

1401 if dist <= 5.0: # Within reasonable distance 

1402 # Check intensity correlation 

1403 int_1 = chan_1_result.cell_intensities[i] 

1404 int_2 = chan_2_result.cell_intensities[j] 

1405 

1406 # Simple intensity correlation: both above threshold 

1407 if int_1 >= intensity_threshold and int_2 >= intensity_threshold: 

1408 colocalized_pairs.append((i, j)) 

1409 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2) 

1410 overlap_positions.append(avg_pos) 

1411 break # One-to-one mapping 

1412 

1413 colocalized_count = len(colocalized_pairs) 

1414 total_cells = len(pos_1) + len(pos_2) 

1415 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0 

1416 

1417 metrics = { 

1418 "intensity_threshold_used": intensity_threshold, 

1419 "correlation_method": "threshold_based" 

1420 } 

1421 

1422 return MultiChannelResult( 

1423 slice_index=chan_1_result.slice_index, 

1424 chan_1_results=chan_1_result, 

1425 chan_2_results=chan_2_result, 

1426 colocalization_method="intensity_correlation", 

1427 colocalized_count=colocalized_count, 

1428 colocalization_percentage=colocalization_percentage, 

1429 chan_1_only_count=len(pos_1) - colocalized_count, 

1430 chan_2_only_count=len(pos_2) - colocalized_count, 

1431 colocalization_metrics=metrics, 

1432 overlap_positions=overlap_positions 

1433 ) 

1434 

1435 
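
# --- Illustrative sketch (hypothetical helper, not part of the original module) ---
# The intensity-based method above uses a simple dual-threshold criterion. A
# correlation-style alternative would compute the Pearson coefficient over
# paired per-cell intensities; a minimal NumPy sketch follows, with an assumed
# name and signature.
def _paired_intensity_correlation_sketch(intensities_1: List[float], intensities_2: List[float]) -> float:
    """Pearson correlation of paired per-cell intensities (0.0 if undefined)."""
    a = np.asarray(intensities_1, dtype=float)
    b = np.asarray(intensities_2, dtype=float)
    if a.size < 2 or a.size != b.size or a.std() == 0 or b.std() == 0:
        return 0.0
    return float(np.corrcoef(a, b)[0, 1])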

def _colocalization_manders(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    intensity_threshold: float
) -> MultiChannelResult:
    """Calculate simplified, cell-based Manders colocalization coefficients."""

    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")

    # Simplified Manders calculation based on detected cells.
    # In practice this would use pixel-level intensity analysis.

    # Use the intensity-based method as the foundation
    intensity_result = _colocalization_intensity_based(
        chan_1_result, chan_2_result, intensity_threshold
    )

    # Calculate Manders-like coefficients from per-cell intensities
    total_int_1 = sum(chan_1_result.cell_intensities)
    total_int_2 = sum(chan_2_result.cell_intensities)

    # Simplified: pair cells with the same criterion as the intensity-based
    # method (within 5 px, both above the threshold) and assume each
    # colocalized cell contributes its full intensity.
    coloc_int_1 = 0.0
    coloc_int_2 = 0.0
    matched_2 = set()
    for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
        for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
            if j in matched_2:
                continue
            dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2)
            if (dist <= 5.0
                    and chan_1_result.cell_intensities[i] >= intensity_threshold
                    and chan_2_result.cell_intensities[j] >= intensity_threshold):
                coloc_int_1 += chan_1_result.cell_intensities[i]
                coloc_int_2 += chan_2_result.cell_intensities[j]
                matched_2.add(j)
                break  # One-to-one mapping

    # Manders coefficients (M1 and M2)
    m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0.0
    m2 = coloc_int_2 / total_int_2 if total_int_2 > 0 else 0.0

    intensity_result.colocalization_method = "manders_coefficients"
    intensity_result.colocalization_metrics.update({
        "manders_m1": m1,
        "manders_m2": m2,
        "note": "Simplified cell-based Manders calculation"
    })

    return intensity_result
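
# --- Illustrative sketch (hypothetical helper, not part of the original module) ---
# The function above derives Manders-like coefficients from detected cells only.
# The standard pixel-level definition is M1 = (channel-1 intensity in pixels
# where channel 2 exceeds its threshold) / (total channel-1 intensity), and
# symmetrically for M2. A minimal NumPy version is sketched below; the name,
# signature, and threshold convention are assumptions.
def _manders_pixelwise_sketch(img_1: np.ndarray, img_2: np.ndarray, thr_1: float, thr_2: float) -> Tuple[float, float]:
    """Pixel-level Manders coefficients (M1, M2) for two equally shaped images."""
    total_1 = float(img_1.sum())
    total_2 = float(img_2.sum())
    m1 = float(img_1[img_2 > thr_2].sum()) / total_1 if total_1 > 0 else 0.0
    m2 = float(img_2[img_1 > thr_1].sum()) / total_2 if total_2 > 0 else 0.0
    return m1, m2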

def _create_segmentation_visualization(
    image: np.ndarray,
    positions: List[Tuple[float, float]],
    max_sigma: float
) -> np.ndarray:
    """Create segmentation visualization with detected cells marked."""
    # Convert a pyclesperanto array to numpy only when needed for visualization
    if hasattr(image, 'shape') and hasattr(image, 'dtype') and not isinstance(image, np.ndarray):
        # This is a pyclesperanto array - pull it to host memory for the final visualization
        visualization = cle.pull(image).copy()
    else:
        visualization = image.copy()

    # Coordinate grids for drawing circular markers
    rr, cc = np.ogrid[:image.shape[0], :image.shape[1]]

    # Mark detected cells
    for x, y in positions:
        # Small circular marker centred on the (x, y) centroid
        mask = (rr - y)**2 + (cc - x)**2 <= (max_sigma * 2)**2

        # Ensure indices are within bounds
        valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1])
        mask = mask & valid_mask

        visualization[mask] = visualization.max()  # Bright markers

    return visualization

def count_cells_simple_baseline(
    image: np.ndarray,  # 2D image only
    threshold: float = 0.1,
    min_cell_area: int = 50,
    max_cell_area: int = 5000
) -> Tuple[np.ndarray, int, List[Tuple[float, float]]]:
    """
    Simple baseline cell counting using pyclesperanto.
    Based on analyse_blobs.ipynb and voronoi_otsu_labeling.ipynb examples.

    Args:
        image: 2D numpy array
        threshold: Threshold as fraction of max intensity (0.0-1.0)
        min_cell_area: Minimum area in pixels
        max_cell_area: Maximum area in pixels

    Returns:
        segmentation_mask: 2D array with labeled cells
        cell_count: Number of detected cells
        cell_positions: List of (x, y) centroid positions
    """
    # Convert to pyclesperanto array
    gpu_image = cle.push(image.astype(np.float32))

    # Apply threshold
    max_intensity = cle.maximum_of_all_pixels(gpu_image)
    threshold_val = threshold * max_intensity
    binary = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Connected components labeling
    labels = cle.connected_components_labeling(binary)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=min_cell_area)
    labels = cle.remove_large_labels(labels, maximum_size=max_cell_area)

    # Get statistics
    stats = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Extract results
    if 'label' in stats and len(stats['label']) > 0:
        cell_count = len(stats['label'])
        centroids_x = stats.get('centroid_x', [])
        centroids_y = stats.get('centroid_y', [])
        cell_positions = [(float(x), float(y)) for x, y in zip(centroids_x, centroids_y)]
    else:
        cell_count = 0
        cell_positions = []

    # Convert result back to numpy
    segmentation_mask = cle.pull(labels)

    return segmentation_mask, cell_count, cell_positions
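
# --- Illustrative usage (assumed synthetic example, not part of the original module) ---
# A quick way to exercise count_cells_simple_baseline on a synthetic image with
# a few bright square "cells". The thresholds and sizes are arbitrary
# assumptions, and running it requires an available pyclesperanto device.
def _baseline_usage_example() -> None:
    rng = np.random.default_rng(0)
    synthetic = rng.normal(0.05, 0.01, size=(256, 256)).astype(np.float32)
    for cy, cx in [(64, 64), (128, 200), (200, 80)]:
        synthetic[cy - 5:cy + 5, cx - 5:cx + 5] += 1.0  # bright 10x10 square
    mask, count, positions = count_cells_simple_baseline(
        synthetic, threshold=0.5, min_cell_area=20, max_cell_area=500
    )
    logger.info("Baseline example: %d cells detected at %s", count, positions)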

def _create_colocalization_map(
    chan_1_img: np.ndarray,
    chan_2_img: np.ndarray,
    coloc_result: MultiChannelResult
) -> np.ndarray:
    """Create colocalization visualization map."""
    # Single-channel visualization with the same shape as channel 1
    coloc_map = np.zeros_like(chan_1_img)

    # Coordinate grids for drawing circular markers
    rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]

    # Mark colocalized positions
    for x, y in coloc_result.overlap_positions:
        # Marker for a colocalized cell: 5-pixel radius around the (x, y) centroid
        mask = (rr - y)**2 + (cc - x)**2 <= 25

        valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1])
        mask = mask & valid_mask

        coloc_map[mask] = chan_1_img.max()  # Bright colocalization markers

    return coloc_map