# Coverage for openhcs/processing/backends/analysis/cell_counting_pyclesperanto.py: 1.3% (605 statements)
1"""
2GPU-accelerated cell counting and multi-channel colocalization analysis for OpenHCS.
4This module provides comprehensive cell counting capabilities using pyclesperanto,
5supporting both single-channel and multi-channel analysis with various detection
6methods and colocalization metrics.
7"""
9import numpy as np
10import logging
11import gc
12from typing import Dict, List, Tuple, Any, Optional, Union
13from dataclasses import dataclass
14from enum import Enum
16logger = logging.getLogger(__name__)
18# Core scientific computing imports
19import pandas as pd
20import json
21import pyclesperanto as cle
22from scipy.spatial.distance import cdist
24# OpenHCS imports
25from openhcs.core.memory.decorators import pyclesperanto as pyclesperanto_func
26from openhcs.core.pipeline.function_contracts import special_outputs
27from openhcs.constants.constants import Backend


class DetectionMethod(Enum):
    """Cell detection methods available."""
    BLOB_LOG = "blob_log"    # Laplacian of Gaussian (best for round cells)
    BLOB_DOG = "blob_dog"    # Difference of Gaussian (faster, good general purpose)
    BLOB_DOH = "blob_doh"    # Determinant of Hessian (good for elongated cells)
    WATERSHED = "watershed"  # Watershed segmentation (best for touching cells)
    THRESHOLD = "threshold"  # Simple thresholding (fastest, basic)


class ColocalizationMethod(Enum):
    """Methods for multi-channel colocalization analysis."""
    OVERLAP_AREA = "overlap_area"                    # Based on segmentation overlap
    DISTANCE_BASED = "distance_based"                # Based on centroid distances
    INTENSITY_CORRELATION = "intensity_correlation"  # Based on intensity correlation
    MANDERS_COEFFICIENTS = "manders_coefficients"    # Manders colocalization coefficients


class ThresholdMethod(Enum):
    """Automatic thresholding methods for watershed segmentation."""
    OTSU = "otsu"      # Otsu's method (good for bimodal histograms)
    LI = "li"          # Li's method (good for low contrast images)
    MANUAL = "manual"  # Manual threshold value (0.0-1.0)


@dataclass
class CellCountResult:
    """Results for single-channel cell counting."""
    slice_index: int
    method: str
    cell_count: int
    cell_positions: List[Tuple[float, float]]  # (x, y) centroids
    cell_areas: List[float]
    cell_intensities: List[float]
    detection_confidence: List[float]
    parameters_used: Dict[str, Any]
    # Optional segmentation mask attached at runtime; materialize_cell_counts
    # converts it to NumPy before serialization.
    binary_mask: Optional[np.ndarray] = None


@dataclass
class MultiChannelResult:
    """Results for multi-channel cell counting and colocalization."""
    slice_index: int
    chan_1_results: CellCountResult
    chan_2_results: CellCountResult
    colocalization_method: str
    colocalized_count: int
    colocalization_percentage: float
    chan_1_only_count: int
    chan_2_only_count: int
    colocalization_metrics: Dict[str, float]
    overlap_positions: List[Tuple[float, float]]
    # Optional combined mask attached at runtime (see materialize_cell_counts).
    binary_mask: Optional[np.ndarray] = None


def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager) -> str:
    """Materialize cell counting results as analysis-ready CSV and JSON formats."""
    logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}")

    # Convert CLE binary_mask to NumPy if present
    if data:
        for result in data:
            if result.binary_mask is not None:
                result.binary_mask = np.asarray(result.binary_mask)
            if isinstance(result, MultiChannelResult):
                if result.chan_1_results.binary_mask is not None:
                    result.chan_1_results.binary_mask = np.asarray(result.chan_1_results.binary_mask)
                if result.chan_2_results.binary_mask is not None:
                    result.chan_2_results.binary_mask = np.asarray(result.chan_2_results.binary_mask)

    # Determine whether this is single-channel or multi-channel data
    if not data:
        logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize")
        return path

    is_multi_channel = isinstance(data[0], MultiChannelResult)
    logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}")

    if is_multi_channel:
        return _materialize_multi_channel_results(data, path, filemanager)
    else:
        return _materialize_single_channel_results(data, path, filemanager)


def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager) -> str:
    """Materialize segmentation masks as individual TIFF files."""
    logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}")

    # Convert CLE arrays to NumPy
    if data:
        data = [np.asarray(mask) for mask in data]

    if not data:
        logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)")
        # Create an empty summary file to indicate no masks were generated
        summary_path = path.replace('.pkl', '_segmentation_summary.txt')
        summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n"
        filemanager.save(summary_content, summary_path, Backend.DISK.value)
        return summary_path

    # Generate output file paths based on the input path
    base_path = path.replace('.pkl', '')

    # Save each mask as a separate TIFF file
    for i, mask in enumerate(data):
        mask_filename = f"{base_path}_slice_{i:03d}.tif"

        # Convert mask to an appropriate dtype for saving (uint16 to match input images)
        if mask.dtype != np.uint16:
            # Normalize to the uint16 range if needed
            if mask.max() <= 1.0:
                mask_uint16 = (mask * 65535).astype(np.uint16)
            else:
                mask_uint16 = mask.astype(np.uint16)
        else:
            mask_uint16 = mask

        # Save using filemanager (Backend is imported at module level)
        filemanager.save(mask_uint16, mask_filename, Backend.DISK.value)
        logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename}")

    # Return the summary path
    summary_path = f"{base_path}_segmentation_summary.txt"
    summary_content = f"Segmentation masks saved: {len(data)} files\n"
    summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n"
    summary_content += f"Mask dtype: {data[0].dtype}\n"
    summary_content += f"Mask shape: {data[0].shape}\n"

    filemanager.save(summary_content, summary_path, Backend.DISK.value)
    logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks")

    return summary_path
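

# Hypothetical naming sketch for materialize_segmentation_masks (paths are
# illustrative only, not produced by any real run): given path
# "plate1/masks.pkl" and two masks, the function writes
#   plate1/masks_slice_000.tif
#   plate1/masks_slice_001.tif
#   plate1/masks_segmentation_summary.txt
# and returns the summary path.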


@pyclesperanto_func
@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks))
def count_cells_single_channel(
    image_stack: np.ndarray,
    # Detection method and parameters
    detection_method: DetectionMethod = DetectionMethod.BLOB_LOG,  # UI will show radio buttons
    # Blob detection parameters
    min_sigma: float = 1.0,   # Minimum blob size (pixels)
    max_sigma: float = 10.0,  # Maximum blob size (pixels)
    num_sigma: int = 10,      # Number of sigma values to test
    threshold: float = 0.1,   # Detection threshold (0.0-1.0)
    overlap: float = 0.5,     # Maximum overlap between blobs (0.0-1.0)
    # Watershed parameters
    watershed_footprint_size: int = 3,  # Local maxima footprint size
    watershed_min_distance: int = 5,    # Minimum distance between peaks
    watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU,  # UI will show threshold methods
    # Preprocessing parameters
    enable_preprocessing: bool = True,
    gaussian_sigma: float = 1.0,  # Gaussian blur sigma
    median_disk_size: int = 1,    # Median filter disk size
    # Filtering parameters
    min_cell_area: int = 10,    # Minimum cell area (pixels)
    max_cell_area: int = 1000,  # Maximum cell area (pixels)
    remove_border_cells: bool = True,  # Remove cells touching the image border
    # Output parameters
    return_segmentation_mask: bool = False
) -> Tuple[np.ndarray, List[CellCountResult], List[np.ndarray]]:
    """
    Count cells in a single-channel image stack using various detection methods.

    Args:
        image_stack: 3D array (Z, Y, X) where each Z slice is processed independently
        detection_method: Method for cell detection (see DetectionMethod enum)
        min_sigma: Minimum blob size for blob detection methods
        max_sigma: Maximum blob size for blob detection methods
        num_sigma: Number of sigma values to test for blob detection
        threshold: Detection threshold (method-dependent)
        overlap: Maximum overlap between detected blobs
        watershed_footprint_size: Footprint size for local maxima detection
        watershed_min_distance: Minimum distance between watershed peaks
        watershed_threshold_method: Thresholding method for watershed
        enable_preprocessing: Apply Gaussian and median filtering
        gaussian_sigma: Standard deviation for Gaussian blur
        median_disk_size: Disk size for median filtering
        min_cell_area: Minimum area for valid cells
        max_cell_area: Maximum area for valid cells
        remove_border_cells: Remove cells touching image borders
        return_segmentation_mask: Return segmentation masks in the output

    Returns:
        output_stack: Original image stack, unchanged (Z, Y, X)
        cell_count_results: List of CellCountResult objects, one per slice
        segmentation_masks: (Special output) List of segmentation mask arrays;
            empty unless return_segmentation_mask=True
    """
    if image_stack.ndim != 3:
        raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")

    results = []
    segmentation_masks = []

    # Store parameters for reproducibility (convert enums to values)
    parameters = {
        "detection_method": detection_method.value,
        "min_sigma": min_sigma,
        "max_sigma": max_sigma,
        "num_sigma": num_sigma,
        "threshold": threshold,
        "overlap": overlap,
        "watershed_footprint_size": watershed_footprint_size,
        "watershed_min_distance": watershed_min_distance,
        "watershed_threshold_method": watershed_threshold_method.value if hasattr(watershed_threshold_method, 'value') else watershed_threshold_method,
        "gaussian_sigma": gaussian_sigma,
        "median_disk_size": median_disk_size,
        "min_cell_area": min_cell_area,
        "max_cell_area": max_cell_area,
        "remove_border_cells": remove_border_cells
    }

    logger.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")

    for z_idx in range(image_stack.shape[0]):
        # Extract slice - keep as a pyclesperanto array
        slice_img = image_stack[z_idx]

        # Apply preprocessing if enabled (all operations stay on the GPU)
        if enable_preprocessing:
            slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size)

        # Detect cells using the specified method (slice_img stays a cle array)
        result = _detect_cells_single_method(
            slice_img, z_idx, detection_method.value, parameters
        )

        results.append(result)

        # Create a segmentation mask if requested
        if return_segmentation_mask:
            segmentation_mask = _create_segmentation_visualization(
                slice_img, result.cell_positions, max_sigma
            )
            segmentation_masks.append(segmentation_mask)

    # Always return segmentation masks (an empty list if not requested).
    # This keeps the return signature consistent for the special outputs system.
    return image_stack, results, segmentation_masks
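

# A minimal usage sketch (hypothetical data; not called by the pipeline).
# Assumes a working GPU and that the @pyclesperanto_func decorator accepts a
# host NumPy stack; shapes, thresholds, and the expected output are illustrative.
def _example_single_channel_usage() -> None:
    demo_stack = np.zeros((2, 128, 128), dtype=np.float32)  # (Z, Y, X)
    demo_stack[:, 30:40, 30:40] = 1.0  # one bright 10x10 "cell" per slice

    _, counts, masks = count_cells_single_channel(
        demo_stack,
        detection_method=DetectionMethod.THRESHOLD,
        threshold=0.5,
        min_cell_area=20,
        max_cell_area=500,
        return_segmentation_mask=True,
    )
    for r in counts:
        # Expect one detection per slice with a centroid near (34.5, 34.5)
        print(f"slice {r.slice_index}: {r.cell_count} cells at {r.cell_positions}")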


@pyclesperanto_func
@special_outputs(("multi_channel_counts", materialize_cell_counts))
def count_cells_multi_channel(
    image_stack: np.ndarray,
    chan_1: int,  # Index of first channel (positional arg)
    chan_2: int,  # Index of second channel (positional arg)
    # Detection parameters for channel 1 (all single-channel params available)
    chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG,  # UI will show radio buttons
    chan_1_min_sigma: float = 1.0,   # Minimum blob size (pixels)
    chan_1_max_sigma: float = 10.0,  # Maximum blob size (pixels)
    chan_1_num_sigma: int = 10,      # Number of sigma values to test
    chan_1_threshold: float = 0.1,   # Detection threshold (0.0-1.0)
    chan_1_overlap: float = 0.5,     # Maximum overlap between blobs (0.0-1.0)
    chan_1_watershed_footprint_size: int = 3,  # Local maxima footprint size
    chan_1_watershed_min_distance: int = 5,    # Minimum distance between peaks
    chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU,  # Thresholding method
    chan_1_enable_preprocessing: bool = True,  # Apply preprocessing
    chan_1_gaussian_sigma: float = 1.0,  # Gaussian blur sigma
    chan_1_median_disk_size: int = 1,    # Median filter disk size
    chan_1_min_area: int = 10,    # Minimum cell area (pixels)
    chan_1_max_area: int = 1000,  # Maximum cell area (pixels)
    chan_1_remove_border_cells: bool = True,  # Remove cells touching the border
    # Detection parameters for channel 2 (all single-channel params available)
    chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG,  # UI will show radio buttons
    chan_2_min_sigma: float = 1.0,   # Minimum blob size (pixels)
    chan_2_max_sigma: float = 10.0,  # Maximum blob size (pixels)
    chan_2_num_sigma: int = 10,      # Number of sigma values to test
    chan_2_threshold: float = 0.1,   # Detection threshold (0.0-1.0)
    chan_2_overlap: float = 0.5,     # Maximum overlap between blobs (0.0-1.0)
    chan_2_watershed_footprint_size: int = 3,  # Local maxima footprint size
    chan_2_watershed_min_distance: int = 5,    # Minimum distance between peaks
    chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU,  # Thresholding method
    chan_2_enable_preprocessing: bool = True,  # Apply preprocessing
    chan_2_gaussian_sigma: float = 1.0,  # Gaussian blur sigma
    chan_2_median_disk_size: int = 1,    # Median filter disk size
    chan_2_min_area: int = 10,    # Minimum cell area (pixels)
    chan_2_max_area: int = 1000,  # Maximum cell area (pixels)
    chan_2_remove_border_cells: bool = True,  # Remove cells touching the border
    # Colocalization parameters
    colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED,  # UI will show coloc methods
    max_distance: float = 5.0,        # Maximum distance for colocalization (pixels)
    min_overlap_area: float = 0.3,    # Minimum overlap fraction for area-based method
    intensity_threshold: float = 0.5,  # Threshold for intensity-based methods
    # Output parameters
    return_colocalization_map: bool = False
) -> Tuple[np.ndarray, List[MultiChannelResult]]:
    """
    Count cells in a multi-channel image stack with colocalization analysis.

    Each channel can be processed with independent parameters, providing the same
    flexibility as the single-channel function for each channel separately.

    Args:
        image_stack: 3D array (Z, Y, X) where Z represents different channels
        chan_1: Index of first channel in the stack (positional)
        chan_2: Index of second channel in the stack (positional)

        # Channel 1 detection parameters (same as single-channel function)
        chan_1_method: Detection method for channel 1 (DetectionMethod enum)
        chan_1_min_sigma: Minimum blob size for channel 1
        chan_1_max_sigma: Maximum blob size for channel 1
        chan_1_num_sigma: Number of sigma values to test for channel 1
        chan_1_threshold: Detection threshold for channel 1 (0.0-1.0)
        chan_1_overlap: Maximum overlap between blobs for channel 1
        chan_1_watershed_footprint_size: Local maxima footprint size for channel 1
        chan_1_watershed_min_distance: Minimum distance between peaks for channel 1
        chan_1_watershed_threshold_method: Thresholding method for channel 1
        chan_1_enable_preprocessing: Apply preprocessing to channel 1
        chan_1_gaussian_sigma: Gaussian blur sigma for channel 1
        chan_1_median_disk_size: Median filter size for channel 1
        chan_1_min_area: Minimum cell area for channel 1
        chan_1_max_area: Maximum cell area for channel 1
        chan_1_remove_border_cells: Remove border cells for channel 1

        # Channel 2 detection parameters (same as single-channel function)
        chan_2_method: Detection method for channel 2 (DetectionMethod enum)
        chan_2_min_sigma: Minimum blob size for channel 2
        chan_2_max_sigma: Maximum blob size for channel 2
        chan_2_num_sigma: Number of sigma values to test for channel 2
        chan_2_threshold: Detection threshold for channel 2 (0.0-1.0)
        chan_2_overlap: Maximum overlap between blobs for channel 2
        chan_2_watershed_footprint_size: Local maxima footprint size for channel 2
        chan_2_watershed_min_distance: Minimum distance between peaks for channel 2
        chan_2_watershed_threshold_method: Thresholding method for channel 2
        chan_2_enable_preprocessing: Apply preprocessing to channel 2
        chan_2_gaussian_sigma: Gaussian blur sigma for channel 2
        chan_2_median_disk_size: Median filter size for channel 2
        chan_2_min_area: Minimum cell area for channel 2
        chan_2_max_area: Maximum cell area for channel 2
        chan_2_remove_border_cells: Remove border cells for channel 2

        # Colocalization parameters
        colocalization_method: Method for determining colocalization (ColocalizationMethod enum)
        max_distance: Maximum distance for distance-based colocalization (pixels)
        min_overlap_area: Minimum overlap fraction for area-based colocalization
        intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0)
        return_colocalization_map: Return colocalization visualization

    Returns:
        output_stack: Original images or colocalization maps
        multi_channel_results: List of MultiChannelResult objects
    """
    if image_stack.ndim != 3:
        raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")

    if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]:
        raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}")

    if chan_1 == chan_2:
        raise ValueError("Channel 1 and Channel 2 must be different")

    # Extract channel images
    chan_1_img = image_stack[chan_1:chan_1 + 1]  # Keep 3D shape for consistency
    chan_2_img = image_stack[chan_2:chan_2 + 1]

    # Count cells in each channel separately using the single-channel function
    # Channel 1 parameters (all explicit)
    chan_1_params = {
        "detection_method": chan_1_method,
        "min_sigma": chan_1_min_sigma,
        "max_sigma": chan_1_max_sigma,
        "num_sigma": chan_1_num_sigma,
        "threshold": chan_1_threshold,
        "overlap": chan_1_overlap,
        "watershed_footprint_size": chan_1_watershed_footprint_size,
        "watershed_min_distance": chan_1_watershed_min_distance,
        "watershed_threshold_method": chan_1_watershed_threshold_method,
        "enable_preprocessing": chan_1_enable_preprocessing,
        "gaussian_sigma": chan_1_gaussian_sigma,
        "median_disk_size": chan_1_median_disk_size,
        "min_cell_area": chan_1_min_area,
        "max_cell_area": chan_1_max_area,
        "remove_border_cells": chan_1_remove_border_cells,
        "return_segmentation_mask": False
    }

    # Channel 2 parameters (all explicit)
    chan_2_params = {
        "detection_method": chan_2_method,
        "min_sigma": chan_2_min_sigma,
        "max_sigma": chan_2_max_sigma,
        "num_sigma": chan_2_num_sigma,
        "threshold": chan_2_threshold,
        "overlap": chan_2_overlap,
        "watershed_footprint_size": chan_2_watershed_footprint_size,
        "watershed_min_distance": chan_2_watershed_min_distance,
        "watershed_threshold_method": chan_2_watershed_threshold_method,
        "enable_preprocessing": chan_2_enable_preprocessing,
        "gaussian_sigma": chan_2_gaussian_sigma,
        "median_disk_size": chan_2_median_disk_size,
        "min_cell_area": chan_2_min_area,
        "max_cell_area": chan_2_max_area,
        "remove_border_cells": chan_2_remove_border_cells,
        "return_segmentation_mask": False
    }

    # Process each channel (the third return value, segmentation masks, is unused here)
    _, chan_1_results, _ = count_cells_single_channel(chan_1_img, **chan_1_params)
    _, chan_2_results, _ = count_cells_single_channel(chan_2_img, **chan_2_params)

    # Perform colocalization analysis
    multi_results = []
    output_stack = image_stack.copy()

    # Since we process a single slice from each channel, there is one result each
    chan_1_result = chan_1_results[0]
    chan_2_result = chan_2_results[0]

    # Analyze colocalization
    coloc_result = _analyze_colocalization(
        chan_1_result, chan_2_result, colocalization_method.value,
        max_distance, min_overlap_area, intensity_threshold
    )

    multi_results.append(coloc_result)

    # Create a colocalization visualization if requested
    if return_colocalization_map:
        coloc_map = _create_colocalization_map(
            image_stack[chan_1], image_stack[chan_2], coloc_result
        )
        # Return both channels plus the colocalization map
        output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map])

    return output_stack, multi_results
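

# A minimal usage sketch (hypothetical data; not called by the pipeline).
# Assumes a working GPU and that the decorator accepts a host NumPy stack; the
# two synthetic squares overlap, so a distance-based analysis should report
# one colocalized cell.
def _example_multi_channel_usage() -> None:
    chan_a = np.zeros((128, 128), dtype=np.float32)
    chan_b = np.zeros((128, 128), dtype=np.float32)
    chan_a[20:30, 20:30] = 1.0
    chan_b[22:32, 22:32] = 1.0  # centroid ~2.8 px away from chan_a's
    stack = np.stack([chan_a, chan_b])

    _, results = count_cells_multi_channel(
        stack,
        chan_1=0,
        chan_2=1,
        chan_1_method=DetectionMethod.THRESHOLD,
        chan_2_method=DetectionMethod.THRESHOLD,
        colocalization_method=ColocalizationMethod.DISTANCE_BASED,
        max_distance=5.0,
    )
    print(results[0].colocalized_count, results[0].colocalization_percentage)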


def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager) -> str:
    """Materialize single-channel cell counting results."""
    # Generate output file paths based on the input path. Use clean naming:
    # preserve the namespaced path structure without duplicating the special
    # output key.
    base_path = path.replace('.pkl', '')
    json_path = f"{base_path}.json"
    csv_path = f"{base_path}_details.csv"
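    # Hypothetical example (illustrative path only): path "plate1/cell_counts.pkl"
    # yields "plate1/cell_counts.json" plus "plate1/cell_counts_details.csv".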

    # Ensure the output directory exists for the disk backend
    from pathlib import Path
    output_dir = Path(json_path).parent
    filemanager.ensure_directory(str(output_dir), Backend.DISK.value)

    summary = {
        "analysis_type": "single_channel_cell_counting",
        "total_slices": len(data),
        "results_per_slice": []
    }
    rows = []

    total_cells = 0
    for result in data:
        total_cells += result.cell_count

        # Add to summary
        summary["results_per_slice"].append({
            "slice_index": result.slice_index,
            "method": result.method,
            "cell_count": result.cell_count,
            "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0,
            "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0,
            "parameters": result.parameters_used
        })

        # Add individual cell data to the CSV
        for i, (pos, area, intensity, confidence) in enumerate(zip(
            result.cell_positions, result.cell_areas,
            result.cell_intensities, result.detection_confidence
        )):
            rows.append({
                'slice_index': result.slice_index,
                'cell_id': f"slice_{result.slice_index}_cell_{i}",
                'x_position': pos[0],
                'y_position': pos[1],
                'cell_area': area,
                'cell_intensity': intensity,
                'detection_confidence': confidence,
                'detection_method': result.method
            })

    summary["total_cells_all_slices"] = total_cells
    summary["average_cells_per_slice"] = total_cells / len(data) if data else 0

    # Save the JSON summary (overwrite if it exists)
    json_content = json.dumps(summary, indent=2, default=str)
    if filemanager.exists(json_path, Backend.DISK.value):
        filemanager.delete(json_path, Backend.DISK.value)
    filemanager.save(json_content, json_path, Backend.DISK.value)

    # Save the CSV details (overwrite if they exist)
    if rows:
        df = pd.DataFrame(rows)
        csv_content = df.to_csv(index=False)
        if filemanager.exists(csv_path, Backend.DISK.value):
            filemanager.delete(csv_path, Backend.DISK.value)
        filemanager.save(csv_content, csv_path, Backend.DISK.value)

    return json_path


def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager) -> str:
    """Materialize multi-channel cell counting and colocalization results."""
    # Generate output file paths based on the input path. Use clean naming:
    # preserve the namespaced path structure without duplicating the special
    # output key.
    base_path = path.replace('.pkl', '')
    json_path = f"{base_path}.json"
    csv_path = f"{base_path}_details.csv"

    # Ensure the output directory exists for the disk backend
    from pathlib import Path
    output_dir = Path(json_path).parent
    filemanager.ensure_directory(str(output_dir), Backend.DISK.value)

    summary = {
        "analysis_type": "multi_channel_cell_counting_colocalization",
        "total_slices": len(data),
        "colocalization_summary": {
            "total_chan_1_cells": 0,
            "total_chan_2_cells": 0,
            "total_colocalized": 0,
            "average_colocalization_percentage": 0
        },
        "results_per_slice": []
    }
    rows = []

    total_coloc_pct = 0
    for result in data:
        summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count
        summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count
        summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count
        total_coloc_pct += result.colocalization_percentage

        # Add to summary
        summary["results_per_slice"].append({
            "slice_index": result.slice_index,
            "chan_1_count": result.chan_1_results.cell_count,
            "chan_2_count": result.chan_2_results.cell_count,
            "colocalized_count": result.colocalized_count,
            "colocalization_percentage": result.colocalization_percentage,
            "chan_1_only": result.chan_1_only_count,
            "chan_2_only": result.chan_2_only_count,
            "colocalization_method": result.colocalization_method,
            "colocalization_metrics": result.colocalization_metrics
        })

        # Add colocalization details to the CSV
        for i, pos in enumerate(result.overlap_positions):
            rows.append({
                'slice_index': result.slice_index,
                'colocalization_id': f"slice_{result.slice_index}_coloc_{i}",
                'x_position': pos[0],
                'y_position': pos[1],
                'colocalization_method': result.colocalization_method
            })

    summary["colocalization_summary"]["average_colocalization_percentage"] = (
        total_coloc_pct / len(data) if data else 0
    )

    # Save the JSON summary (overwrite if it exists)
    json_content = json.dumps(summary, indent=2, default=str)
    if filemanager.exists(json_path, Backend.DISK.value):
        filemanager.delete(json_path, Backend.DISK.value)
    filemanager.save(json_content, json_path, Backend.DISK.value)

    # Save the CSV details (overwrite if they exist)
    if rows:
        df = pd.DataFrame(rows)
        csv_content = df.to_csv(index=False)
        if filemanager.exists(csv_path, Backend.DISK.value):
            filemanager.delete(csv_path, Backend.DISK.value)
        filemanager.save(csv_content, csv_path, Backend.DISK.value)

    return json_path


def _preprocess_image(image, gaussian_sigma: float, median_disk_size: int):
    """Apply preprocessing to enhance cell detection using pyclesperanto."""
    # Assume image is already a pyclesperanto array
    gpu_image = image

    # Gaussian blur to reduce noise
    if gaussian_sigma > 0:
        gpu_image = cle.gaussian_blur(gpu_image, sigma_x=gaussian_sigma, sigma_y=gaussian_sigma)

    # Median filter to remove salt-and-pepper noise
    if median_disk_size > 0:
        gpu_image = cle.median_box(gpu_image, radius_x=median_disk_size, radius_y=median_disk_size)

    # Return the GPU array
    return gpu_image
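

# Illustrative note on the defaults above: gaussian_sigma=1.0 applies a
# 1-pixel Gaussian blur, and median_disk_size=1 is passed as a radius, so
# median_box filters over a 3x3 neighborhood.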


def _detect_cells_single_method(
    image: np.ndarray,
    slice_idx: int,
    method: str,
    params: Dict[str, Any]
) -> CellCountResult:
    """Detect cells using the specified method."""
    if method == DetectionMethod.BLOB_LOG.value:
        return _detect_cells_blob_log(image, slice_idx, params)
    elif method == DetectionMethod.BLOB_DOG.value:
        return _detect_cells_blob_dog(image, slice_idx, params)
    elif method == DetectionMethod.BLOB_DOH.value:
        return _detect_cells_blob_doh(image, slice_idx, params)
    elif method == DetectionMethod.WATERSHED.value:
        return _detect_cells_watershed(image, slice_idx, params)
    elif method == DetectionMethod.THRESHOLD.value:
        return _detect_cells_threshold(image, slice_idx, params)
    else:
        raise ValueError(f"Unknown detection method: {method}")


def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using fast LoG-like blob detection."""
    gpu_image = image

    # Use a single scale for speed - the average of min and max sigma
    sigma = (params["min_sigma"] + params["max_sigma"]) / 2

    # Fast LoG approximation: use DoG like blob_dog but with closer scales
    blurred1 = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma)
    blurred2 = cle.gaussian_blur(gpu_image, sigma_x=sigma * 1.6, sigma_y=sigma * 1.6)

    # Difference approximates the Laplacian
    edges = cle.subtract_images(blurred1, blurred2)
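    # The sigma ratio of ~1.6 is the classic Marr-Hildreth choice for which a
    # difference of Gaussians best approximates the Laplacian of Gaussian.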

    # Threshold the edge response
    max_response = cle.maximum_of_all_pixels(cle.absolute(edges))
    threshold_val = params["threshold"] * max_response
    thresholded = cle.greater_constant(cle.absolute(edges), scalar=threshold_val)

    # Find local maxima
    maxima = cle.detect_maxima(cle.absolute(edges),
                               radius_x=int(sigma),
                               radius_y=int(sigma))

    # Combine threshold and maxima
    valid_maxima = cle.binary_and(thresholded, maxima)

    # Dilate maxima aggressively to create proper cell-sized regions
    dilated = valid_maxima
    for _ in range(6):  # More aggressive dilation
        dilated = cle.dilate_box(dilated)

    # Label connected components
    labels = cle.connected_components_labeling(dilated)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"])
    labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"])

    # Get statistics
    if cle.maximum_of_all_pixels(labels) > 0:
        stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

        positions = []
        areas = []
        intensities = []
        confidences = []

        if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0:
            for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])):
                positions.append((float(x), float(y)))

                # Get area and intensity, falling back to defaults if missing
                area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma ** 2
                intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0

                areas.append(area)
                intensities.append(intensity)
                confidences.append(min(1.0, intensity / max_response))
    else:
        positions = []
        areas = []
        intensities = []
        confidences = []

    return CellCountResult(
        slice_index=slice_idx,
        method="blob_log_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )


def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using fast Difference of Gaussians blob detection."""
    gpu_image = image

    # Use only two scales for speed
    sigma1 = params["min_sigma"]
    sigma2 = params["max_sigma"]

    # Apply Gaussian blurs
    blur1 = cle.gaussian_blur(gpu_image, sigma_x=sigma1, sigma_y=sigma1)
    blur2 = cle.gaussian_blur(gpu_image, sigma_x=sigma2, sigma_y=sigma2)

    # Difference of Gaussians
    dog = cle.subtract_images(blur1, blur2)

    # Threshold the DoG response
    max_response = cle.maximum_of_all_pixels(cle.absolute(dog))
    threshold_val = params["threshold"] * max_response
    thresholded = cle.greater_constant(cle.absolute(dog), scalar=threshold_val)

    # Find local maxima
    maxima = cle.detect_maxima(cle.absolute(dog),
                               radius_x=int(sigma1),
                               radius_y=int(sigma1))

    # Combine threshold and maxima
    valid_maxima = cle.binary_and(thresholded, maxima)

    # Dilate maxima to create regions for area calculation
    dilated = valid_maxima
    for _ in range(3):  # Moderate dilation
        dilated = cle.dilate_box(dilated)

    # Label connected components
    labels = cle.connected_components_labeling(dilated)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"])
    labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"])

    # Get statistics
    if cle.maximum_of_all_pixels(labels) > 0:
        stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

        positions = []
        areas = []
        intensities = []
        confidences = []

        if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0:
            for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])):
                positions.append((float(x), float(y)))

                # Get area and intensity, falling back to defaults if missing
                area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma1 ** 2
                intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0

                areas.append(area)
                intensities.append(intensity)
                confidences.append(min(1.0, intensity / max_response))
    else:
        positions = []
        areas = []
        intensities = []
        confidences = []

    return CellCountResult(
        slice_index=slice_idx,
        method="blob_dog_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )


def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using Hessian-like detection with pyclesperanto."""
    # Assume image is already a pyclesperanto array
    gpu_image = image

    # Apply Gaussian blur for smoothing
    sigma = (params["min_sigma"] + params["max_sigma"]) / 2
    blurred = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma)

    # Apply edge detection (approximates the Hessian determinant)
    edges = cle.binary_edge_detection(blurred)

    # Apply a threshold to the original image for valid regions
    threshold_val = params["threshold"] * cle.maximum_of_all_pixels(gpu_image)
    thresholded = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Detect local maxima in the edge response
    maxima = cle.detect_maxima_box(edges,
                                   radius_x=int(params["min_sigma"]),
                                   radius_y=int(params["min_sigma"]))

    # Combine threshold and maxima
    combined = cle.binary_and(thresholded, maxima)

    # Dilate the maxima points to create proper regions
    dilated = cle.dilate_box(combined)

    # Label the dilated regions
    labels = cle.connected_components_labeling(dilated)

    # Remove small and large objects (same as the simple baseline)
    labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"])
    labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"])

    # Get statistics - this returns a dictionary with centroids included
    stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Extract centroids directly from the statistics
    centroids_x = stats_dict.get('centroid_x', [])
    centroids_y = stats_dict.get('centroid_y', [])

    # Extract the data we need from the statistics dictionary
    if 'label' in stats_dict and len(stats_dict['label']) > 0:
        # We have detected objects
        areas = stats_dict.get('area', [])
        labels_list = stats_dict.get('label', [])
    else:
        # No objects detected
        areas = []
        labels_list = []
        centroids_x = []
        centroids_y = []

    # Process similar to the other blob methods
    positions = []
    filtered_areas = []
    intensities = []
    confidences = []

    for i in range(len(labels_list)):
        if i < len(centroids_x) and i < len(centroids_y) and i < len(areas):
            x = float(centroids_x[i])
            y = float(centroids_y[i])
            positions.append((x, y))

            # Get area
            area = float(areas[i])
            filtered_areas.append(area)

            # Mean intensity (if available in stats)
            mean_intensities = stats_dict.get('mean_intensity', [])
            if i < len(mean_intensities):
                intensity = float(mean_intensities[i])
            else:
                intensity = 1.0  # Default value
            intensities.append(intensity)

            # Use area as a confidence measure
            confidence = min(1.0, area / (np.pi * params["max_sigma"] ** 2))
            confidences.append(confidence)

    filtered_data = _filter_by_area(
        positions, filtered_areas, intensities, confidences,
        params["min_cell_area"], params["max_cell_area"]
    )

    return CellCountResult(
        slice_index=slice_idx,
        method="blob_doh_pyclesperanto",
        cell_count=len(filtered_data[0]),
        cell_positions=filtered_data[0],
        cell_areas=filtered_data[1],
        cell_intensities=filtered_data[2],
        detection_confidence=filtered_data[3],
        parameters_used=params
    )


def _filter_by_area(
    positions: List[Tuple[float, float]],
    areas: List[float],
    intensities: List[float],
    confidences: List[float],
    min_area: float,
    max_area: float
) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]:
    """Filter detected cells by area constraints."""
    filtered_positions = []
    filtered_areas = []
    filtered_intensities = []
    filtered_confidences = []

    for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences):
        if min_area <= area <= max_area:
            filtered_positions.append(pos)
            filtered_areas.append(area)
            filtered_intensities.append(intensity)
            filtered_confidences.append(confidence)

    return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences
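

# Illustrative example: with min_area=10 and max_area=1000, detections with
# areas [5.0, 50.0, 2000.0] keep only the 50-pixel cell (along with its
# position, intensity, and confidence at the same index).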


def _non_maximum_suppression_3d(positions, scales, responses, overlap_threshold):
    """Apply non-maximum suppression across scale space."""
    if len(positions) == 0:
        return [], [], []

    # Convert to numpy arrays for easier processing
    positions = np.array(positions)
    scales = np.array(scales)
    responses = np.array(responses)

    # Sort by response strength (highest first)
    sorted_indices = np.argsort(responses)[::-1]

    keep = []
    for i in sorted_indices:
        pos_i = positions[i]
        scale_i = scales[i]

        # Check whether this detection overlaps any already-kept detection
        should_keep = True
        for j in keep:
            pos_j = positions[j]
            scale_j = scales[j]

            # Calculate distance in space and scale
            spatial_dist = np.sqrt(np.sum((pos_i - pos_j) ** 2))
            scale_dist = abs(scale_i - scale_j) / max(scale_i, scale_j)

            # Check whether they overlap significantly
            overlap_radius = max(scale_i, scale_j) * (1 + overlap_threshold)
            if spatial_dist < overlap_radius and scale_dist < overlap_threshold:
                should_keep = False
                break

        if should_keep:
            keep.append(i)

    # Return the filtered results
    return positions[keep].tolist(), scales[keep].tolist(), responses[keep].tolist()
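

# A tiny numeric sketch of the suppression above (illustrative values; this
# helper is defined but not wired into the detection paths in this module).
def _example_nms_usage() -> None:
    positions = [(10.0, 10.0), (11.0, 10.0), (50.0, 50.0)]
    scales = [2.0, 2.0, 3.0]
    responses = [0.9, 0.5, 0.8]
    kept_pos, kept_scales, kept_resp = _non_maximum_suppression_3d(
        positions, scales, responses, overlap_threshold=0.5
    )
    # The weaker detection at (11, 10) overlaps the stronger one at (10, 10)
    # and is suppressed; the two remaining detections survive.
    print(kept_pos, kept_scales, kept_resp)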


def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using watershed-style segmentation with pyclesperanto.

    pyclesperanto has no watershed operation, so this falls back to Otsu
    thresholding plus connected-components labeling.
    """
    # Assume image is already a pyclesperanto array
    gpu_image = image

    # Use Otsu thresholding (well suited to microscopy images). Note that the
    # watershed_threshold_method parameter is recorded in parameters_used but
    # not applied here.
    binary = cle.threshold_otsu(gpu_image)

    # Remove small and large objects with explicit memory cleanup
    temp_labels = cle.connected_components_labeling(binary)

    # Clean up intermediate arrays to prevent GPU OOM
    temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"])
    del temp_labels  # Explicit cleanup

    temp_labels = cle.remove_large_labels(temp_labels_filtered, maximum_size=params["max_cell_area"])
    del temp_labels_filtered  # Explicit cleanup

    binary_filtered = cle.greater_constant(temp_labels, scalar=0)
    del binary  # Cleanup original binary
    binary = binary_filtered

    # Remove border objects if requested
    if params["remove_border_cells"]:
        temp_labels_border = cle.connected_components_labeling(binary)
        del temp_labels  # Cleanup previous temp_labels

        temp_labels = cle.remove_labels_on_edges(temp_labels_border)
        del temp_labels_border  # Explicit cleanup

        binary_final = cle.greater_constant(temp_labels, scalar=0)
        del binary  # Cleanup previous binary
        binary = binary_final

    # Label connected components (the watershed substitute)
    labels = cle.connected_components_labeling(binary)
    del binary  # Cleanup binary mask

    # Get statistics - this returns a dictionary with centroids included
    stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Cleanup labels array after getting statistics
    del labels

    # Extract centroids directly from the statistics
    centroids_x = stats_dict.get('centroid_x', [])
    centroids_y = stats_dict.get('centroid_y', [])

    # Extract the data we need from the statistics dictionary
    if 'label' in stats_dict and len(stats_dict['label']) > 0:
        # We have detected objects
        areas = stats_dict.get('area', [])
        labels_list = stats_dict.get('label', [])
    else:
        # No objects detected
        areas = []
        labels_list = []
        centroids_x = []
        centroids_y = []

    positions = []
    filtered_areas = []
    intensities = []
    confidences = []

    for i in range(len(labels_list)):
        if i < len(centroids_x) and i < len(centroids_y) and i < len(areas):
            area = float(areas[i])

            # Filter by area
            if params["min_cell_area"] <= area <= params["max_cell_area"]:
                x = float(centroids_x[i])
                y = float(centroids_y[i])
                positions.append((x, y))

                filtered_areas.append(area)

                # Mean intensity (if available in stats)
                mean_intensities = stats_dict.get('mean_intensity', [])
                if i < len(mean_intensities):
                    intensity = float(mean_intensities[i])
                else:
                    intensity = 1.0  # Default value
                intensities.append(intensity)

                # Use area as a confidence measure (normalized)
                confidence = min(1.0, area / params["max_cell_area"])
                confidences.append(confidence)

    # Force garbage collection to free GPU memory
    gc.collect()

    return CellCountResult(
        slice_index=slice_idx,
        method="watershed_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=filtered_areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )


def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using simple thresholding and connected components with pyclesperanto."""
    # Image is already a pyclesperanto array - no conversion needed
    gpu_image = image

    # Apply threshold (all operations stay on the GPU)
    max_intensity = cle.maximum_of_all_pixels(gpu_image)
    threshold_val = params["threshold"] * max_intensity
    binary = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Remove small objects with explicit memory cleanup
    temp_labels = cle.connected_components_labeling(binary)
    temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"])
    del temp_labels  # Explicit cleanup

    binary_filtered = cle.greater_constant(temp_labels_filtered, scalar=0)
    del binary  # Cleanup original binary
    del temp_labels_filtered  # Cleanup filtered labels
    binary = binary_filtered

    # Remove border objects if requested, with explicit cleanup
    if params["remove_border_cells"]:
        temp_labels = cle.connected_components_labeling(binary)
        temp_labels_no_border = cle.remove_labels_on_edges(temp_labels)
        del temp_labels  # Explicit cleanup

        binary_final = cle.greater_constant(temp_labels_no_border, scalar=0)
        del binary  # Cleanup previous binary
        del temp_labels_no_border  # Cleanup no-border labels
        binary = binary_final

    # Label connected components (final labeling on GPU)
    labels = cle.connected_components_labeling(binary)
    del binary  # Cleanup binary mask

    # Get statistics (operates on GPU arrays, returns a dictionary)
    stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)
    del labels  # Cleanup labels after getting statistics

    # Extract centroids directly from the statistics
    centroids_x = stats_dict.get('centroid_x', [])
    centroids_y = stats_dict.get('centroid_y', [])

    # Extract the data we need from the statistics dictionary
    if 'label' in stats_dict and len(stats_dict['label']) > 0:
        # We have detected objects
        areas = stats_dict.get('area', [])
        labels_list = stats_dict.get('label', [])
    else:
        # No objects detected
        areas = []
        labels_list = []
        centroids_x = []
        centroids_y = []

    max_intensity_cpu = float(max_intensity)

    positions = []
    filtered_areas = []
    intensities = []
    confidences = []

    for i in range(len(labels_list)):
        if i < len(centroids_x) and i < len(centroids_y) and i < len(areas):
            area = float(areas[i])

            # Filter by area
            if params["min_cell_area"] <= area <= params["max_cell_area"]:
                x = float(centroids_x[i])
                y = float(centroids_y[i])
                positions.append((x, y))

                filtered_areas.append(area)

                # Mean intensity (if available in stats)
                mean_intensities = stats_dict.get('mean_intensity', [])
                if i < len(mean_intensities):
                    mean_intensity = float(mean_intensities[i])
                else:
                    mean_intensity = 1.0  # Default value
                intensities.append(mean_intensity)

                # Use intensity as a confidence measure
                confidence = mean_intensity / max_intensity_cpu if max_intensity_cpu > 0 else 1.0
                confidences.append(confidence)

    # Force garbage collection to free GPU memory
    gc.collect()

    return CellCountResult(
        slice_index=slice_idx,
        method="threshold_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=filtered_areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )


def _analyze_colocalization(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    method: str,
    max_distance: float,
    min_overlap_area: float,
    intensity_threshold: float
) -> MultiChannelResult:
    """Analyze colocalization between two channels."""
    if method == ColocalizationMethod.DISTANCE_BASED.value:
        return _colocalization_distance_based(
            chan_1_result, chan_2_result, max_distance
        )
    elif method == ColocalizationMethod.OVERLAP_AREA.value:
        return _colocalization_overlap_based(
            chan_1_result, chan_2_result, min_overlap_area
        )
    elif method == ColocalizationMethod.INTENSITY_CORRELATION.value:
        return _colocalization_intensity_based(
            chan_1_result, chan_2_result, intensity_threshold
        )
    elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value:
        return _colocalization_manders(
            chan_1_result, chan_2_result, intensity_threshold
        )
    else:
        raise ValueError(f"Unknown colocalization method: {method}")


def _colocalization_distance_based(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    max_distance: float
) -> MultiChannelResult:
    """Perform distance-based colocalization analysis."""
    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based")

    # Convert positions to arrays
    pos_1 = np.array(chan_1_result.cell_positions)
    pos_2 = np.array(chan_2_result.cell_positions)

    # Calculate pairwise distances
    distances = cdist(pos_1, pos_2)

    # Find colocalized pairs (greedy one-to-one matching)
    colocalized_pairs = []
    used_chan_2 = set()

    for i in range(len(pos_1)):
        # Find the closest cell in channel 2
        min_dist_idx = np.argmin(distances[i])
        min_dist = distances[i, min_dist_idx]

        # Check whether it is within the distance threshold and not already paired
        if min_dist <= max_distance and min_dist_idx not in used_chan_2:
            colocalized_pairs.append((i, min_dist_idx))
            used_chan_2.add(min_dist_idx)

    # Calculate metrics
    colocalized_count = len(colocalized_pairs)
    total_cells = len(pos_1) + len(pos_2)
    colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
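    # Worked example: 10 cells in channel 1, 8 in channel 2, and 6 matched
    # pairs gives 2 * 6 / 18 * 100 ≈ 66.7% (each pair counts one cell from
    # each channel against the combined total).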

    chan_1_only = len(pos_1) - colocalized_count
    chan_2_only = len(pos_2) - colocalized_count

    # Extract colocalized positions (average of paired positions)
    overlap_positions = []
    for i, j in colocalized_pairs:
        avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist()
        overlap_positions.append(tuple(avg_pos))

    # Calculate additional metrics
    if colocalized_pairs:
        avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs])
        max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs])
    else:
        avg_distance = 0
        max_distance_found = 0

    metrics = {
        "average_colocalization_distance": float(avg_distance),
        "max_colocalization_distance": float(max_distance_found),
        "distance_threshold_used": max_distance
    }

    return MultiChannelResult(
        slice_index=chan_1_result.slice_index,
        chan_1_results=chan_1_result,
        chan_2_results=chan_2_result,
        colocalization_method="distance_based",
        colocalized_count=colocalized_count,
        colocalization_percentage=colocalization_percentage,
        chan_1_only_count=chan_1_only,
        chan_2_only_count=chan_2_only,
        colocalization_metrics=metrics,
        overlap_positions=overlap_positions
    )


def _create_empty_coloc_result(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    method: str
) -> MultiChannelResult:
    """Create an empty colocalization result when no cells are found."""
    return MultiChannelResult(
        slice_index=chan_1_result.slice_index,
        chan_1_results=chan_1_result,
        chan_2_results=chan_2_result,
        colocalization_method=method,
        colocalized_count=0,
        colocalization_percentage=0.0,
        chan_1_only_count=chan_1_result.cell_count,
        chan_2_only_count=chan_2_result.cell_count,
        colocalization_metrics={},
        overlap_positions=[]
    )


def _colocalization_overlap_based(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    min_overlap_area: float
) -> MultiChannelResult:
    """Perform area overlap-based colocalization analysis.

    This is a simplified implementation - a true overlap analysis needs the
    actual segmentation masks. For now, distance serves as a proxy for overlap.
    """
    # Use the distance-based method with a small threshold as an approximation:
    # cells must be very close to overlap significantly.
    distance_threshold = 2.0

    result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold)
    result.colocalization_method = "overlap_area"
    result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area
    result.colocalization_metrics["note"] = "Approximated using distance-based method"

    return result


def _colocalization_intensity_based(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    intensity_threshold: float
) -> MultiChannelResult:
    """Perform intensity correlation-based colocalization analysis."""
    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation")

    # Pair cells by proximity, then filter the pairs by intensity
    colocalized_pairs = []
    overlap_positions = []

    pos_1 = np.array(chan_1_result.cell_positions)
    pos_2 = np.array(chan_2_result.cell_positions)

    for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
        for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
            # Calculate distance
            dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

            if dist <= 5.0:  # Within a reasonable pairing distance
                # Check intensity correlation
                int_1 = chan_1_result.cell_intensities[i]
                int_2 = chan_2_result.cell_intensities[j]

                # Simple intensity correlation: both above the threshold
                if int_1 >= intensity_threshold and int_2 >= intensity_threshold:
                    colocalized_pairs.append((i, j))
                    avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2)
                    overlap_positions.append(avg_pos)
                    break  # One-to-one mapping

    colocalized_count = len(colocalized_pairs)
    total_cells = len(pos_1) + len(pos_2)
    colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0

    metrics = {
        "intensity_threshold_used": intensity_threshold,
        "correlation_method": "threshold_based"
    }

    return MultiChannelResult(
        slice_index=chan_1_result.slice_index,
        chan_1_results=chan_1_result,
        chan_2_results=chan_2_result,
        colocalization_method="intensity_correlation",
        colocalized_count=colocalized_count,
        colocalization_percentage=colocalization_percentage,
        chan_1_only_count=len(pos_1) - colocalized_count,
        chan_2_only_count=len(pos_2) - colocalized_count,
        colocalization_metrics=metrics,
        overlap_positions=overlap_positions
    )


def _colocalization_manders(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    intensity_threshold: float
) -> MultiChannelResult:
    """Calculate simplified Manders colocalization coefficients."""
    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")

    # Simplified Manders calculation based on detected cells.
    # In practice, this would use pixel-level intensity analysis.

    # Use the intensity-based method as the foundation
    intensity_result = _colocalization_intensity_based(
        chan_1_result, chan_2_result, intensity_threshold
    )

    # Calculate Manders-like coefficients
    total_int_1 = sum(chan_1_result.cell_intensities)
    total_int_2 = sum(chan_2_result.cell_intensities)

    # Simplified placeholder: only the first detected cell's intensity is
    # counted as colocalized (the original pair filter reduced to the (0, 0) pair)
    coloc_int_1 = chan_1_result.cell_intensities[0] if chan_1_result.cell_intensities else 0.0

    # Manders coefficients (M1 and M2)
    m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
    m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0  # Simplified

    intensity_result.colocalization_method = "manders_coefficients"
    intensity_result.colocalization_metrics.update({
        "manders_m1": m1,
        "manders_m2": m2,
        "note": "Simplified cell-based Manders calculation"
    })

    return intensity_result


def _create_segmentation_visualization(
    image: np.ndarray,
    positions: List[Tuple[float, float]],
    max_sigma: float
) -> np.ndarray:
    """Create a segmentation visualization with detected cells marked."""
    # Convert a pyclesperanto array to NumPy only when needed for visualization
    if hasattr(image, 'shape') and hasattr(image, 'dtype') and not isinstance(image, np.ndarray):
        # This is a pyclesperanto array - pull it to host memory
        visualization = cle.pull(image).copy()
    else:
        visualization = image.copy()

    # Mark detected cells
    for x, y in positions:
        # Create small circular markers
        rr, cc = np.ogrid[:image.shape[0], :image.shape[1]]
        mask = (rr - y) ** 2 + (cc - x) ** 2 <= (max_sigma * 2) ** 2

        # Ensure indices are within bounds
        valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1])
        mask = mask & valid_mask

        visualization[mask] = visualization.max()  # Bright markers

    return visualization


def count_cells_simple_baseline(
    image: np.ndarray,  # 2D image only
    threshold: float = 0.1,
    min_cell_area: int = 50,
    max_cell_area: int = 5000
) -> Tuple[np.ndarray, int, List[Tuple[float, float]]]:
    """
    Simple baseline cell counting using pyclesperanto.

    Based on the analyse_blobs.ipynb and voronoi_otsu_labeling.ipynb examples.

    Args:
        image: 2D numpy array
        threshold: Threshold as a fraction of max intensity (0.0-1.0)
        min_cell_area: Minimum area in pixels
        max_cell_area: Maximum area in pixels

    Returns:
        segmentation_mask: 2D array with labeled cells
        cell_count: Number of detected cells
        cell_positions: List of (x, y) centroid positions
    """
    # Convert to a pyclesperanto array
    gpu_image = cle.push(image.astype(np.float32))

    # Apply threshold
    max_intensity = cle.maximum_of_all_pixels(gpu_image)
    threshold_val = threshold * max_intensity
    binary = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Connected components labeling
    labels = cle.connected_components_labeling(binary)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=min_cell_area)
    labels = cle.remove_large_labels(labels, maximum_size=max_cell_area)

    # Get statistics
    stats = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Extract results
    if 'label' in stats and len(stats['label']) > 0:
        cell_count = len(stats['label'])
        centroids_x = stats.get('centroid_x', [])
        centroids_y = stats.get('centroid_y', [])
        cell_positions = [(float(x), float(y)) for x, y in zip(centroids_x, centroids_y)]
    else:
        cell_count = 0
        cell_positions = []

    # Convert the result back to numpy
    segmentation_mask = cle.pull(labels)

    return segmentation_mask, cell_count, cell_positions
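

# A minimal usage sketch for count_cells_simple_baseline (hypothetical data;
# requires a working GPU for pyclesperanto).
def _example_simple_baseline_usage() -> None:
    img = np.zeros((128, 128), dtype=np.float32)
    img[30:40, 30:40] = 1.0  # one bright 10x10 square "cell" (area 100 px)

    mask, count, positions = count_cells_simple_baseline(
        img, threshold=0.5, min_cell_area=50, max_cell_area=5000
    )
    # Expect one detection with a centroid near (34.5, 34.5)
    print(count, positions)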


def _create_colocalization_map(
    chan_1_img: np.ndarray,
    chan_2_img: np.ndarray,
    coloc_result: MultiChannelResult
) -> np.ndarray:
    """Create a colocalization visualization map."""
    # Create an RGB-like visualization
    coloc_map = np.zeros_like(chan_1_img)

    # Mark colocalized positions
    for x, y in coloc_result.overlap_positions:
        # Create markers for colocalized cells
        rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]
        mask = (rr - y) ** 2 + (cc - x) ** 2 <= 25  # 5-pixel radius

        valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1])
        mask = mask & valid_mask

        coloc_map[mask] = chan_1_img.max()  # Bright colocalization markers

    return coloc_map