1"""
2GPU-accelerated cell counting and multi-channel colocalization analysis for OpenHCS.
4This module provides comprehensive cell counting capabilities using pyclesperanto,
5supporting both single-channel and multi-channel analysis with various detection
6methods and colocalization metrics.
7"""
9import numpy as np
10import logging
11import gc
12from typing import Dict, List, Tuple, Any, Union
13from dataclasses import dataclass
14from enum import Enum
16logger = logging.getLogger(__name__)
18# Core scientific computing imports
19import pandas as pd
20import json
21import pyclesperanto as cle
22from scipy.spatial.distance import cdist
24# OpenHCS imports
25from openhcs.core.memory.decorators import pyclesperanto as pyclesperanto_func
26from openhcs.core.pipeline.function_contracts import special_outputs
27from openhcs.constants.constants import Backend
class DetectionMethod(Enum):
    """Cell detection methods available."""
    BLOB_LOG = "blob_log"    # Laplacian of Gaussian (best for round cells)
    BLOB_DOG = "blob_dog"    # Difference of Gaussian (faster, good general purpose)
    BLOB_DOH = "blob_doh"    # Determinant of Hessian (good for elongated cells)
    WATERSHED = "watershed"  # Watershed segmentation (best for touching cells)
    THRESHOLD = "threshold"  # Simple thresholding (fastest, basic)


class ColocalizationMethod(Enum):
    """Methods for multi-channel colocalization analysis."""
    OVERLAP_AREA = "overlap_area"                    # Based on segmentation overlap
    DISTANCE_BASED = "distance_based"                # Based on centroid distances
    INTENSITY_CORRELATION = "intensity_correlation"  # Based on intensity correlation
    MANDERS_COEFFICIENTS = "manders_coefficients"    # Manders colocalization coefficients


class ThresholdMethod(Enum):
    """Automatic thresholding methods for watershed segmentation."""
    OTSU = "otsu"      # Otsu's method (good for bimodal histograms)
    LI = "li"          # Li's method (good for low-contrast images)
    MANUAL = "manual"  # Manual threshold value (0.0-1.0)


@dataclass
class CellCountResult:
    """Results for single-channel cell counting."""
    slice_index: int
    method: str
    cell_count: int
    cell_positions: List[Tuple[float, float]]  # (x, y) centroids
    cell_areas: List[float]
    cell_intensities: List[float]
    detection_confidence: List[float]
    parameters_used: Dict[str, Any]
    binary_mask: Any = None  # Optional mask (CLE or NumPy array); consumed by materialize_cell_counts


@dataclass
class MultiChannelResult:
    """Results for multi-channel cell counting and colocalization."""
    slice_index: int
    chan_1_results: CellCountResult
    chan_2_results: CellCountResult
    colocalization_method: str
    colocalized_count: int
    colocalization_percentage: float
    chan_1_only_count: int
    chan_2_only_count: int
    colocalization_metrics: Dict[str, float]
    overlap_positions: List[Tuple[float, float]]
    binary_mask: Any = None  # Optional combined mask (CLE or NumPy array); consumed by materialize_cell_counts


def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager, backend: str) -> str:
    """Materialize cell counting results as analysis-ready CSV and JSON formats."""
    logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}, backend={backend}")

    # Convert CLE binary_mask to NumPy if present
    if data:
        for result in data:
            if result.binary_mask is not None:
                result.binary_mask = np.asarray(result.binary_mask)
            if isinstance(result, MultiChannelResult):
                if result.chan_1_results.binary_mask is not None:
                    result.chan_1_results.binary_mask = np.asarray(result.chan_1_results.binary_mask)
                if result.chan_2_results.binary_mask is not None:
                    result.chan_2_results.binary_mask = np.asarray(result.chan_2_results.binary_mask)

    # Determine whether this is single-channel or multi-channel data
    if not data:
        logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize")
        return path

    is_multi_channel = isinstance(data[0], MultiChannelResult)
    logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}")

    if is_multi_channel:
        return _materialize_multi_channel_results(data, path, filemanager, backend)
    else:
        return _materialize_single_channel_results(data, path, filemanager, backend)


def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager, backend: str) -> str:
    """
    Materialize segmentation masks as individual TIFF files.

    Single-backend signature - the decorator automatically wraps it for multi-backend support.
    """
    logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, backend={backend}, masks_count={len(data) if data else 0}")

    # Convert CLE arrays to NumPy
    if data:
        data = [np.asarray(mask) for mask in data]

    if not data:
        logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)")
        # Create an empty summary file to indicate that no masks were generated
        summary_path = path.replace('.pkl', '_segmentation_summary.txt')
        summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n"
        filemanager.save(summary_content, summary_path, backend)
        return summary_path

    # Generate output file paths based on the input path
    base_path = path.replace('.pkl', '')

    # Save each mask as a separate TIFF file
    for i, mask in enumerate(data):
        mask_filename = f"{base_path}_slice_{i:03d}.tif"

        # Convert the mask to an appropriate dtype for saving (uint16 to match input images)
        if mask.dtype != np.uint16:
            # Normalize to the uint16 range if needed
            if mask.max() <= 1.0:
                mask_uint16 = (mask * 65535).astype(np.uint16)
            else:
                mask_uint16 = mask.astype(np.uint16)
        else:
            mask_uint16 = mask

        # Save using the filemanager with the provided backend
        filemanager.save(mask_uint16, mask_filename, backend)
        logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Saved mask {i} to {mask_filename} (backend={backend})")

    # Return the summary path
    summary_path = f"{base_path}_segmentation_summary.txt"
    summary_content = f"Segmentation masks saved: {len(data)} files\n"
    summary_content += f"Base filename pattern: {base_path}_slice_XXX.tif\n"
    summary_content += f"Mask dtype: {data[0].dtype}\n"
    summary_content += f"Mask shape: {data[0].shape}\n"

    filemanager.save(summary_content, summary_path, backend)
    logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved {len(data)} masks to backend={backend}")

    return summary_path


@pyclesperanto_func
@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks))
def count_cells_single_channel(
    image_stack: np.ndarray,
    # Detection method and parameters
    detection_method: DetectionMethod = DetectionMethod.BLOB_LOG,  # UI will show radio buttons
    # Blob detection parameters
    min_sigma: float = 1.0,    # Minimum blob size (pixels)
    max_sigma: float = 10.0,   # Maximum blob size (pixels)
    num_sigma: int = 10,       # Number of sigma values to test
    threshold: float = 0.1,    # Detection threshold (0.0-1.0)
    overlap: float = 0.5,      # Maximum overlap between blobs (0.0-1.0)
    # Watershed parameters
    watershed_footprint_size: int = 3,  # Local maxima footprint size
    watershed_min_distance: int = 5,    # Minimum distance between peaks
    watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU,  # UI will show threshold methods
    # Preprocessing parameters
    enable_preprocessing: bool = True,
    gaussian_sigma: float = 1.0,  # Gaussian blur sigma
    median_disk_size: int = 1,    # Median filter disk size
    # Filtering parameters
    min_cell_area: int = 10,      # Minimum cell area (pixels)
    max_cell_area: int = 1000,    # Maximum cell area (pixels)
    remove_border_cells: bool = True,  # Remove cells touching the image border
    # Output parameters
    return_segmentation_mask: bool = False
) -> Tuple[np.ndarray, List[CellCountResult], List[np.ndarray]]:
    """
    Count cells in a single-channel image stack using various detection methods.

    Args:
        image_stack: 3D array (Z, Y, X) where each Z slice is processed independently
        detection_method: Method for cell detection (see the DetectionMethod enum)
        min_sigma: Minimum blob size for blob detection methods
        max_sigma: Maximum blob size for blob detection methods
        num_sigma: Number of sigma values to test for blob detection
        threshold: Detection threshold (method-dependent)
        overlap: Maximum overlap between detected blobs
        watershed_footprint_size: Footprint size for local maxima detection
        watershed_min_distance: Minimum distance between watershed peaks
        watershed_threshold_method: Thresholding method for watershed
        enable_preprocessing: Apply Gaussian and median filtering
        gaussian_sigma: Standard deviation for the Gaussian blur
        median_disk_size: Disk size for median filtering
        min_cell_area: Minimum area for valid cells
        max_cell_area: Maximum area for valid cells
        remove_border_cells: Remove cells touching the image borders
        return_segmentation_mask: Return segmentation masks in the output

    Returns:
        output_stack: Original image stack, unchanged (Z, Y, X)
        cell_count_results: List of CellCountResult objects, one per slice
        segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True
    """
    if image_stack.ndim != 3:
        raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")

    results = []
    segmentation_masks = []

    # Store parameters for reproducibility (convert enums to values)
    parameters = {
        "detection_method": detection_method.value,
        "min_sigma": min_sigma,
        "max_sigma": max_sigma,
        "num_sigma": num_sigma,
        "threshold": threshold,
        "overlap": overlap,
        "watershed_footprint_size": watershed_footprint_size,
        "watershed_min_distance": watershed_min_distance,
        "watershed_threshold_method": watershed_threshold_method.value if hasattr(watershed_threshold_method, 'value') else watershed_threshold_method,
        "gaussian_sigma": gaussian_sigma,
        "median_disk_size": median_disk_size,
        "min_cell_area": min_cell_area,
        "max_cell_area": max_cell_area,
        "remove_border_cells": remove_border_cells
    }

    logger.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")

    for z_idx in range(image_stack.shape[0]):
        # Extract the slice - keep it as a pyclesperanto array
        slice_img = image_stack[z_idx]

        # Apply preprocessing if enabled (all operations stay on the GPU)
        if enable_preprocessing:
            slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size)

        # Detect cells using the specified method (slice_img stays a cle array)
        result = _detect_cells_single_method(
            slice_img, z_idx, detection_method.value, parameters
        )
        results.append(result)

        # Create a segmentation mask if requested
        if return_segmentation_mask:
            segmentation_mask = _create_segmentation_visualization(
                slice_img, result.cell_positions, max_sigma
            )
            segmentation_masks.append(segmentation_mask)

    # Always return segmentation masks (an empty list if not requested)
    # so the special-outputs system sees a consistent return signature.
    return image_stack, results, segmentation_masks
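

# --- Illustrative usage (sketch, not part of the pipeline API) ---------------
# A minimal sketch of calling count_cells_single_channel directly. In OpenHCS
# this normally runs inside a pipeline, which handles @special_outputs routing
# and GPU memory conversion; the direct call below assumes the decorators pass
# arguments through unchanged (pyclesperanto operations auto-push NumPy input).
def _example_single_channel_usage():
    """Illustrative only; assumes an OpenCL device is available to cle."""
    stack = np.random.rand(2, 128, 128).astype(np.float32)  # synthetic (Z, Y, X)
    _, results, _masks = count_cells_single_channel(
        stack,
        detection_method=DetectionMethod.THRESHOLD,
        threshold=0.5,
        min_cell_area=10,
        max_cell_area=500,
    )
    for r in results:
        print(f"slice {r.slice_index}: {r.cell_count} cells via {r.method}")

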
@pyclesperanto_func
@special_outputs(("multi_channel_counts", materialize_cell_counts))
def count_cells_multi_channel(
    image_stack: np.ndarray,
    chan_1: int,  # Index of the first channel (positional arg)
    chan_2: int,  # Index of the second channel (positional arg)
    # Detection parameters for channel 1 (all single-channel params available)
    chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG,  # UI will show radio buttons
    chan_1_min_sigma: float = 1.0,    # Minimum blob size (pixels)
    chan_1_max_sigma: float = 10.0,   # Maximum blob size (pixels)
    chan_1_num_sigma: int = 10,       # Number of sigma values to test
    chan_1_threshold: float = 0.1,    # Detection threshold (0.0-1.0)
    chan_1_overlap: float = 0.5,      # Maximum overlap between blobs (0.0-1.0)
    chan_1_watershed_footprint_size: int = 3,  # Local maxima footprint size
    chan_1_watershed_min_distance: int = 5,    # Minimum distance between peaks
    chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU,  # Thresholding method
    chan_1_enable_preprocessing: bool = True,  # Apply preprocessing
    chan_1_gaussian_sigma: float = 1.0,  # Gaussian blur sigma
    chan_1_median_disk_size: int = 1,    # Median filter disk size
    chan_1_min_area: int = 10,           # Minimum cell area (pixels)
    chan_1_max_area: int = 1000,         # Maximum cell area (pixels)
    chan_1_remove_border_cells: bool = True,  # Remove cells touching the border
    # Detection parameters for channel 2 (all single-channel params available)
    chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG,  # UI will show radio buttons
    chan_2_min_sigma: float = 1.0,    # Minimum blob size (pixels)
    chan_2_max_sigma: float = 10.0,   # Maximum blob size (pixels)
    chan_2_num_sigma: int = 10,       # Number of sigma values to test
    chan_2_threshold: float = 0.1,    # Detection threshold (0.0-1.0)
    chan_2_overlap: float = 0.5,      # Maximum overlap between blobs (0.0-1.0)
    chan_2_watershed_footprint_size: int = 3,  # Local maxima footprint size
    chan_2_watershed_min_distance: int = 5,    # Minimum distance between peaks
    chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU,  # Thresholding method
    chan_2_enable_preprocessing: bool = True,  # Apply preprocessing
    chan_2_gaussian_sigma: float = 1.0,  # Gaussian blur sigma
    chan_2_median_disk_size: int = 1,    # Median filter disk size
    chan_2_min_area: int = 10,           # Minimum cell area (pixels)
    chan_2_max_area: int = 1000,         # Maximum cell area (pixels)
    chan_2_remove_border_cells: bool = True,  # Remove cells touching the border
    # Colocalization parameters
    colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED,  # UI will show coloc methods
    max_distance: float = 5.0,        # Maximum distance for colocalization (pixels)
    min_overlap_area: float = 0.3,    # Minimum overlap fraction for the area-based method
    intensity_threshold: float = 0.5, # Threshold for intensity-based methods
    # Output parameters
    return_colocalization_map: bool = False
) -> Tuple[np.ndarray, List[MultiChannelResult]]:
    """
    Count cells in a multi-channel image stack with colocalization analysis.

    Each channel can be processed with independent parameters, providing the same
    flexibility as the single-channel function for each channel separately.

    Args:
        image_stack: 3D array (Z, Y, X) where Z represents different channels
        chan_1: Index of the first channel in the stack (positional)
        chan_2: Index of the second channel in the stack (positional)

        Channel 1 detection parameters (same as the single-channel function):
        chan_1_method: Detection method for channel 1 (DetectionMethod enum)
        chan_1_min_sigma: Minimum blob size for channel 1
        chan_1_max_sigma: Maximum blob size for channel 1
        chan_1_num_sigma: Number of sigma values to test for channel 1
        chan_1_threshold: Detection threshold for channel 1 (0.0-1.0)
        chan_1_overlap: Maximum overlap between blobs for channel 1
        chan_1_watershed_footprint_size: Local maxima footprint size for channel 1
        chan_1_watershed_min_distance: Minimum distance between peaks for channel 1
        chan_1_watershed_threshold_method: Thresholding method for channel 1
        chan_1_enable_preprocessing: Apply preprocessing to channel 1
        chan_1_gaussian_sigma: Gaussian blur sigma for channel 1
        chan_1_median_disk_size: Median filter size for channel 1
        chan_1_min_area: Minimum cell area for channel 1
        chan_1_max_area: Maximum cell area for channel 1
        chan_1_remove_border_cells: Remove border cells for channel 1

        Channel 2 detection parameters (same as the single-channel function):
        chan_2_method: Detection method for channel 2 (DetectionMethod enum)
        chan_2_min_sigma: Minimum blob size for channel 2
        chan_2_max_sigma: Maximum blob size for channel 2
        chan_2_num_sigma: Number of sigma values to test for channel 2
        chan_2_threshold: Detection threshold for channel 2 (0.0-1.0)
        chan_2_overlap: Maximum overlap between blobs for channel 2
        chan_2_watershed_footprint_size: Local maxima footprint size for channel 2
        chan_2_watershed_min_distance: Minimum distance between peaks for channel 2
        chan_2_watershed_threshold_method: Thresholding method for channel 2
        chan_2_enable_preprocessing: Apply preprocessing to channel 2
        chan_2_gaussian_sigma: Gaussian blur sigma for channel 2
        chan_2_median_disk_size: Median filter size for channel 2
        chan_2_min_area: Minimum cell area for channel 2
        chan_2_max_area: Maximum cell area for channel 2
        chan_2_remove_border_cells: Remove border cells for channel 2

        Colocalization parameters:
        colocalization_method: Method for determining colocalization (ColocalizationMethod enum)
        max_distance: Maximum distance for distance-based colocalization (pixels)
        min_overlap_area: Minimum overlap fraction for area-based colocalization
        intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0)
        return_colocalization_map: Return a colocalization visualization

    Returns:
        output_stack: Original images, or the two channels plus a colocalization map
        multi_channel_results: List of MultiChannelResult objects
    """
    if image_stack.ndim != 3:
        raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")

    if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]:
        raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}")

    if chan_1 == chan_2:
        raise ValueError("Channel 1 and Channel 2 must be different")

    # Extract channel images
    chan_1_img = image_stack[chan_1:chan_1 + 1]  # Keep the 3D shape for consistency
    chan_2_img = image_stack[chan_2:chan_2 + 1]

    # Count cells in each channel separately using the single-channel function.
    # Channel 1 parameters (all explicit)
    chan_1_params = {
        "detection_method": chan_1_method,
        "min_sigma": chan_1_min_sigma,
        "max_sigma": chan_1_max_sigma,
        "num_sigma": chan_1_num_sigma,
        "threshold": chan_1_threshold,
        "overlap": chan_1_overlap,
        "watershed_footprint_size": chan_1_watershed_footprint_size,
        "watershed_min_distance": chan_1_watershed_min_distance,
        "watershed_threshold_method": chan_1_watershed_threshold_method,
        "enable_preprocessing": chan_1_enable_preprocessing,
        "gaussian_sigma": chan_1_gaussian_sigma,
        "median_disk_size": chan_1_median_disk_size,
        "min_cell_area": chan_1_min_area,
        "max_cell_area": chan_1_max_area,
        "remove_border_cells": chan_1_remove_border_cells,
        "return_segmentation_mask": False
    }

    # Channel 2 parameters (all explicit)
    chan_2_params = {
        "detection_method": chan_2_method,
        "min_sigma": chan_2_min_sigma,
        "max_sigma": chan_2_max_sigma,
        "num_sigma": chan_2_num_sigma,
        "threshold": chan_2_threshold,
        "overlap": chan_2_overlap,
        "watershed_footprint_size": chan_2_watershed_footprint_size,
        "watershed_min_distance": chan_2_watershed_min_distance,
        "watershed_threshold_method": chan_2_watershed_threshold_method,
        "enable_preprocessing": chan_2_enable_preprocessing,
        "gaussian_sigma": chan_2_gaussian_sigma,
        "median_disk_size": chan_2_median_disk_size,
        "min_cell_area": chan_2_min_area,
        "max_cell_area": chan_2_max_area,
        "remove_border_cells": chan_2_remove_border_cells,
        "return_segmentation_mask": False
    }

    # Process each channel. The single-channel function also returns a
    # segmentation-mask list (empty here, since return_segmentation_mask=False).
    _, chan_1_results, _ = count_cells_single_channel(chan_1_img, **chan_1_params)
    _, chan_2_results, _ = count_cells_single_channel(chan_2_img, **chan_2_params)

    # Perform colocalization analysis
    multi_results = []
    output_stack = image_stack.copy()

    # Since we process a single slice from each channel, there is exactly one result each
    chan_1_result = chan_1_results[0]
    chan_2_result = chan_2_results[0]

    # Analyze colocalization
    coloc_result = _analyze_colocalization(
        chan_1_result, chan_2_result, colocalization_method.value,
        max_distance, min_overlap_area, intensity_threshold
    )
    multi_results.append(coloc_result)

    # Create a colocalization visualization if requested
    if return_colocalization_map:
        coloc_map = _create_colocalization_map(
            image_stack[chan_1], image_stack[chan_2], coloc_result
        )
        # Return both channels plus the colocalization map
        output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map])

    return output_stack, multi_results
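

# Illustrative sketch of direct multi-channel usage (in OpenHCS this runs
# through the pipeline, which handles @special_outputs routing). As above,
# this assumes the decorators pass arguments through unchanged.
def _example_multi_channel_usage():
    stack = np.random.rand(2, 128, 128).astype(np.float32)  # two channels as Z planes
    _, results = count_cells_multi_channel(
        stack, chan_1=0, chan_2=1,
        chan_1_method=DetectionMethod.THRESHOLD,
        chan_2_method=DetectionMethod.THRESHOLD,
        colocalization_method=ColocalizationMethod.DISTANCE_BASED,
        max_distance=5.0,
    )
    r = results[0]
    print(f"coloc: {r.colocalized_count} cells ({r.colocalization_percentage:.1f}%)")

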
def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager, backend: str) -> str:
    """Materialize single-channel cell counting results."""
    # Generate output file paths based on the input path.
    # Use clean naming: preserve the namespaced path structure; don't duplicate the special-output key.
    base_path = path.replace('.pkl', '')
    json_path = f"{base_path}.json"
    csv_path = f"{base_path}_details.csv"

    # Ensure the output directory exists for the disk backend
    from pathlib import Path
    output_dir = Path(json_path).parent
    if backend == Backend.DISK.value:
        filemanager.ensure_directory(str(output_dir), backend)

    summary = {
        "analysis_type": "single_channel_cell_counting",
        "total_slices": len(data),
        "results_per_slice": []
    }
    rows = []

    total_cells = 0
    for result in data:
        total_cells += result.cell_count

        # Add to the summary
        summary["results_per_slice"].append({
            "slice_index": result.slice_index,
            "method": result.method,
            "cell_count": result.cell_count,
            "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0,
            "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0,
            "parameters": result.parameters_used
        })

        # Add individual cell data to the CSV
        for i, (pos, area, intensity, confidence) in enumerate(zip(
            result.cell_positions, result.cell_areas,
            result.cell_intensities, result.detection_confidence
        )):
            rows.append({
                'slice_index': result.slice_index,
                'cell_id': f"slice_{result.slice_index}_cell_{i}",
                'x_position': pos[0],
                'y_position': pos[1],
                'cell_area': area,
                'cell_intensity': intensity,
                'detection_confidence': confidence,
                'detection_method': result.method
            })

    summary["total_cells_all_slices"] = total_cells
    summary["average_cells_per_slice"] = total_cells / len(data) if data else 0

    # Save the JSON summary (overwrite if it exists)
    json_content = json.dumps(summary, indent=2, default=str)
    if filemanager.exists(json_path, backend):
        filemanager.delete(json_path, backend)
    filemanager.save(json_content, json_path, backend)

    # Save the CSV details (overwrite if they exist)
    if rows:
        df = pd.DataFrame(rows)
        csv_content = df.to_csv(index=False)
        if filemanager.exists(csv_path, backend):
            filemanager.delete(csv_path, backend)
        filemanager.save(csv_content, csv_path, backend)

    return json_path


def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager, backend: str) -> str:
    """Materialize multi-channel cell counting and colocalization results."""
    # Generate output file paths based on the input path.
    # Use clean naming: preserve the namespaced path structure; don't duplicate the special-output key.
    base_path = path.replace('.pkl', '')
    json_path = f"{base_path}.json"
    csv_path = f"{base_path}_details.csv"

    # Ensure the output directory exists for the disk backend
    from pathlib import Path
    output_dir = Path(json_path).parent
    if backend == Backend.DISK.value:
        filemanager.ensure_directory(str(output_dir), backend)

    summary = {
        "analysis_type": "multi_channel_cell_counting_colocalization",
        "total_slices": len(data),
        "colocalization_summary": {
            "total_chan_1_cells": 0,
            "total_chan_2_cells": 0,
            "total_colocalized": 0,
            "average_colocalization_percentage": 0
        },
        "results_per_slice": []
    }
    rows = []

    total_coloc_pct = 0
    for result in data:
        summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count
        summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count
        summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count
        total_coloc_pct += result.colocalization_percentage

        # Add to the summary
        summary["results_per_slice"].append({
            "slice_index": result.slice_index,
            "chan_1_count": result.chan_1_results.cell_count,
            "chan_2_count": result.chan_2_results.cell_count,
            "colocalized_count": result.colocalized_count,
            "colocalization_percentage": result.colocalization_percentage,
            "chan_1_only": result.chan_1_only_count,
            "chan_2_only": result.chan_2_only_count,
            "colocalization_method": result.colocalization_method,
            "colocalization_metrics": result.colocalization_metrics
        })

        # Add colocalization details to the CSV
        for i, pos in enumerate(result.overlap_positions):
            rows.append({
                'slice_index': result.slice_index,
                'colocalization_id': f"slice_{result.slice_index}_coloc_{i}",
                'x_position': pos[0],
                'y_position': pos[1],
                'colocalization_method': result.colocalization_method
            })

    summary["colocalization_summary"]["average_colocalization_percentage"] = (
        total_coloc_pct / len(data) if data else 0
    )

    # Save the JSON summary (overwrite if it exists)
    json_content = json.dumps(summary, indent=2, default=str)
    if filemanager.exists(json_path, backend):
        filemanager.delete(json_path, backend)
    filemanager.save(json_content, json_path, backend)

    # Save the CSV details (overwrite if they exist)
    if rows:
        df = pd.DataFrame(rows)
        csv_content = df.to_csv(index=False)
        if filemanager.exists(csv_path, backend):
            filemanager.delete(csv_path, backend)
        filemanager.save(csv_content, csv_path, backend)

    return json_path


def _preprocess_image(image, gaussian_sigma: float, median_disk_size: int):
    """Apply preprocessing to enhance cell detection using pyclesperanto."""
    # Assume the image is already a pyclesperanto array
    gpu_image = image

    # Gaussian blur to reduce noise
    if gaussian_sigma > 0:
        gpu_image = cle.gaussian_blur(gpu_image, sigma_x=gaussian_sigma, sigma_y=gaussian_sigma)

    # Median filter to remove salt-and-pepper noise
    if median_disk_size > 0:
        gpu_image = cle.median_box(gpu_image, radius_x=median_disk_size, radius_y=median_disk_size)

    # Return the GPU array
    return gpu_image
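

# Illustrative sketch of the preprocessing chain in isolation (assumes an
# OpenCL device is available to pyclesperanto; cle operations auto-push
# NumPy input to the GPU):
def _example_preprocessing():
    img = np.random.rand(64, 64).astype(np.float32)
    smoothed = _preprocess_image(img, gaussian_sigma=1.0, median_disk_size=1)
    print(type(smoothed), smoothed.shape)  # result stays a GPU array

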
def _detect_cells_single_method(
    image: np.ndarray,
    slice_idx: int,
    method: str,
    params: Dict[str, Any]
) -> CellCountResult:
    """Detect cells using the specified method."""
    if method == DetectionMethod.BLOB_LOG.value:
        return _detect_cells_blob_log(image, slice_idx, params)
    elif method == DetectionMethod.BLOB_DOG.value:
        return _detect_cells_blob_dog(image, slice_idx, params)
    elif method == DetectionMethod.BLOB_DOH.value:
        return _detect_cells_blob_doh(image, slice_idx, params)
    elif method == DetectionMethod.WATERSHED.value:
        return _detect_cells_watershed(image, slice_idx, params)
    elif method == DetectionMethod.THRESHOLD.value:
        return _detect_cells_threshold(image, slice_idx, params)
    else:
        raise ValueError(f"Unknown detection method: {method}")


def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using fast LoG-like blob detection."""
    gpu_image = image

    # Use a single scale for speed - the average of min and max sigma
    sigma = (params["min_sigma"] + params["max_sigma"]) / 2

    # Fast LoG approximation: use a DoG like blob_dog, but with closer scales
    blurred1 = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma)
    blurred2 = cle.gaussian_blur(gpu_image, sigma_x=sigma * 1.6, sigma_y=sigma * 1.6)

    # The difference approximates the Laplacian
    edges = cle.subtract_images(blurred1, blurred2)

    # Threshold the edge response
    max_response = cle.maximum_of_all_pixels(cle.absolute(edges))
    threshold_val = params["threshold"] * max_response
    thresholded = cle.greater_constant(cle.absolute(edges), scalar=threshold_val)

    # Find local maxima
    maxima = cle.detect_maxima(cle.absolute(edges),
                               radius_x=int(sigma),
                               radius_y=int(sigma))

    # Combine the threshold and maxima
    valid_maxima = cle.binary_and(thresholded, maxima)

    # Dilate maxima aggressively to create proper cell-sized regions
    dilated = valid_maxima
    for _ in range(6):  # More aggressive dilation
        dilated = cle.dilate_box(dilated)

    # Label connected components
    labels = cle.connected_components_labeling(dilated)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"])
    labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"])

    # Get statistics
    if cle.maximum_of_all_pixels(labels) > 0:
        stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

        positions = []
        areas = []
        intensities = []
        confidences = []

        if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0:
            for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])):
                positions.append((float(x), float(y)))

                # Get the area and intensity
                area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma ** 2
                intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0

                areas.append(area)
                intensities.append(intensity)
                confidences.append(min(1.0, intensity / max_response))
    else:
        positions = []
        areas = []
        intensities = []
        confidences = []

    return CellCountResult(
        slice_index=slice_idx,
        method="blob_log_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )
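

# Illustrative sanity check of the DoG-approximates-LoG trick used above: the
# difference of two Gaussians with a sigma ratio near 1.6 closely tracks the
# Laplacian of Gaussian response. This CPU-only sketch uses scipy.ndimage
# (already a dependency via scipy) and is not part of the GPU pipeline.
def _example_dog_vs_log():
    from scipy import ndimage as ndi
    img = np.random.rand(64, 64).astype(np.float32)
    sigma = 3.0
    dog = ndi.gaussian_filter(img, sigma) - ndi.gaussian_filter(img, sigma * 1.6)
    log = -ndi.gaussian_laplace(img, sigma)  # sign flipped so blobs respond positively
    corr = np.corrcoef(dog.ravel(), log.ravel())[0, 1]
    print(f"DoG vs LoG response correlation: {corr:.3f}")

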
def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using fast Difference of Gaussians blob detection."""
    gpu_image = image

    # Use only two scales for speed
    sigma1 = params["min_sigma"]
    sigma2 = params["max_sigma"]

    # Apply Gaussian blurs
    blur1 = cle.gaussian_blur(gpu_image, sigma_x=sigma1, sigma_y=sigma1)
    blur2 = cle.gaussian_blur(gpu_image, sigma_x=sigma2, sigma_y=sigma2)

    # Difference of Gaussians
    dog = cle.subtract_images(blur1, blur2)

    # Threshold the DoG response
    max_response = cle.maximum_of_all_pixels(cle.absolute(dog))
    threshold_val = params["threshold"] * max_response
    thresholded = cle.greater_constant(cle.absolute(dog), scalar=threshold_val)

    # Find local maxima
    maxima = cle.detect_maxima(cle.absolute(dog),
                               radius_x=int(sigma1),
                               radius_y=int(sigma1))

    # Combine the threshold and maxima
    valid_maxima = cle.binary_and(thresholded, maxima)

    # Dilate maxima to create regions for area calculation
    dilated = valid_maxima
    for _ in range(3):  # Moderate dilation
        dilated = cle.dilate_box(dilated)

    # Label connected components
    labels = cle.connected_components_labeling(dilated)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"])
    labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"])

    # Get statistics
    if cle.maximum_of_all_pixels(labels) > 0:
        stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

        positions = []
        areas = []
        intensities = []
        confidences = []

        if 'centroid_x' in stats_dict and len(stats_dict['centroid_x']) > 0:
            for i, (x, y) in enumerate(zip(stats_dict['centroid_x'], stats_dict['centroid_y'])):
                positions.append((float(x), float(y)))

                # Get the area and intensity
                area = float(stats_dict['area'][i]) if i < len(stats_dict.get('area', [])) else sigma1 ** 2
                intensity = float(stats_dict['mean_intensity'][i]) if i < len(stats_dict.get('mean_intensity', [])) else 1.0

                areas.append(area)
                intensities.append(intensity)
                confidences.append(min(1.0, intensity / max_response))
    else:
        positions = []
        areas = []
        intensities = []
        confidences = []

    return CellCountResult(
        slice_index=slice_idx,
        method="blob_dog_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )


def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using Hessian-like detection with pyclesperanto."""
    # Assume the image is already a pyclesperanto array
    gpu_image = image

    # Apply Gaussian blur for smoothing
    sigma = (params["min_sigma"] + params["max_sigma"]) / 2
    blurred = cle.gaussian_blur(gpu_image, sigma_x=sigma, sigma_y=sigma)

    # Apply edge detection (approximates the Hessian determinant)
    edges = cle.binary_edge_detection(blurred)

    # Apply a threshold to the original image for valid regions
    threshold_val = params["threshold"] * cle.maximum_of_all_pixels(gpu_image)
    thresholded = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Detect local maxima in the edge response
    maxima = cle.detect_maxima_box(edges,
                                   radius_x=int(params["min_sigma"]),
                                   radius_y=int(params["min_sigma"]))

    # Combine the threshold and maxima
    combined = cle.binary_and(thresholded, maxima)

    # Dilate the maxima points to create proper regions
    dilated = cle.dilate_box(combined)

    # Label the dilated regions
    labels = cle.connected_components_labeling(dilated)

    # Remove small and large objects (same as the simple baseline)
    labels = cle.remove_small_labels(labels, minimum_size=params["min_cell_area"])
    labels = cle.remove_large_labels(labels, maximum_size=params["max_cell_area"])

    # Get statistics - this returns a dictionary with centroids included
    stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Extract centroids directly from the statistics
    centroids_x = stats_dict.get('centroid_x', [])
    centroids_y = stats_dict.get('centroid_y', [])

    # Extract the data we need from the statistics dictionary
    if 'label' in stats_dict and len(stats_dict['label']) > 0:
        # We have detected objects
        areas = stats_dict.get('area', [])
        labels_list = stats_dict.get('label', [])
    else:
        # No objects detected
        areas = []
        labels_list = []
        centroids_x = []
        centroids_y = []

    # Process similarly to the other blob methods
    positions = []
    filtered_areas = []
    intensities = []
    confidences = []

    for i in range(len(labels_list)):
        if i < len(centroids_x) and i < len(centroids_y) and i < len(areas):
            x = float(centroids_x[i])
            y = float(centroids_y[i])
            positions.append((x, y))

            # Get the area
            area = float(areas[i])
            filtered_areas.append(area)

            # Mean intensity (if available in the stats)
            mean_intensities = stats_dict.get('mean_intensity', [])
            if i < len(mean_intensities):
                intensity = float(mean_intensities[i])
            else:
                intensity = 1.0  # Default value
            intensities.append(intensity)

            # Use area as a confidence measure
            confidence = min(1.0, area / (np.pi * params["max_sigma"] ** 2))
            confidences.append(confidence)

    filtered_data = _filter_by_area(
        positions, filtered_areas, intensities, confidences,
        params["min_cell_area"], params["max_cell_area"]
    )

    return CellCountResult(
        slice_index=slice_idx,
        method="blob_doh_pyclesperanto",
        cell_count=len(filtered_data[0]),
        cell_positions=filtered_data[0],
        cell_areas=filtered_data[1],
        cell_intensities=filtered_data[2],
        detection_confidence=filtered_data[3],
        parameters_used=params
    )


def _filter_by_area(
    positions: List[Tuple[float, float]],
    areas: List[float],
    intensities: List[float],
    confidences: List[float],
    min_area: float,
    max_area: float
) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]:
    """Filter detected cells by area constraints."""
    filtered_positions = []
    filtered_areas = []
    filtered_intensities = []
    filtered_confidences = []

    for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences):
        if min_area <= area <= max_area:
            filtered_positions.append(pos)
            filtered_areas.append(area)
            filtered_intensities.append(intensity)
            filtered_confidences.append(confidence)

    return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences


def _non_maximum_suppression_3d(positions, scales, responses, overlap_threshold):
    """Apply non-maximum suppression across scale space."""
    if len(positions) == 0:
        return [], [], []

    # Convert to numpy arrays for easier processing
    positions = np.array(positions)
    scales = np.array(scales)
    responses = np.array(responses)

    # Sort by response strength (highest first)
    sorted_indices = np.argsort(responses)[::-1]

    keep = []
    for i in sorted_indices:
        pos_i = positions[i]
        scale_i = scales[i]

        # Check whether this detection overlaps with any already-kept detection
        should_keep = True
        for j in keep:
            pos_j = positions[j]
            scale_j = scales[j]

            # Calculate the distance in space and in scale
            spatial_dist = np.sqrt(np.sum((pos_i - pos_j) ** 2))
            scale_dist = abs(scale_i - scale_j) / max(scale_i, scale_j)

            # Check whether they overlap significantly
            overlap_radius = max(scale_i, scale_j) * (1 + overlap_threshold)
            if spatial_dist < overlap_radius and scale_dist < overlap_threshold:
                should_keep = False
                break

        if should_keep:
            keep.append(i)

    # Return the filtered results
    return positions[keep].tolist(), scales[keep].tolist(), responses[keep].tolist()
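

# Illustrative note: _non_maximum_suppression_3d is a standalone helper (the
# detectors above do not currently call it). A minimal sketch of how it could
# merge multi-scale detections, with hypothetical values:
def _example_nms_usage():
    positions = [(10.0, 10.0), (11.0, 10.5), (40.0, 40.0)]
    scales = [2.0, 2.2, 3.0]
    responses = [0.9, 0.5, 0.8]
    kept_pos, kept_scales, kept_resp = _non_maximum_suppression_3d(
        positions, scales, responses, overlap_threshold=0.5
    )
    print(f"kept {len(kept_pos)} of {len(positions)} detections")

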
def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using a watershed-style pipeline with pyclesperanto.

    pyclesperanto has no watershed operation, so after thresholding and
    filtering this falls back to connected-components labeling.
    """
    # Assume the image is already a pyclesperanto array
    gpu_image = image

    # Use Otsu thresholding (works well for microscopy images).
    # NOTE: watershed_threshold_method is not yet honored here; Otsu is always used.
    binary = cle.threshold_otsu(gpu_image)

    # Remove small and large objects, with explicit memory cleanup
    temp_labels = cle.connected_components_labeling(binary)

    # Clean up intermediate arrays to prevent GPU OOM
    temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"])
    del temp_labels  # Explicit cleanup

    temp_labels = cle.remove_large_labels(temp_labels_filtered, maximum_size=params["max_cell_area"])
    del temp_labels_filtered  # Explicit cleanup

    binary_filtered = cle.greater_constant(temp_labels, scalar=0)
    del binary  # Clean up the original binary
    binary = binary_filtered

    # Remove border objects if requested
    if params["remove_border_cells"]:
        temp_labels_border = cle.connected_components_labeling(binary)
        del temp_labels  # Clean up the previous temp_labels

        temp_labels = cle.remove_labels_on_edges(temp_labels_border)
        del temp_labels_border  # Explicit cleanup

        binary_final = cle.greater_constant(temp_labels, scalar=0)
        del binary  # Clean up the previous binary
        binary = binary_final

    # Since pyclesperanto doesn't have watershed, use connected components directly
    labels = cle.connected_components_labeling(binary)
    del binary  # Clean up the binary mask

    # Get statistics - this returns a dictionary with centroids included
    stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Clean up the labels array after getting statistics
    del labels

    # Extract centroids directly from the statistics
    centroids_x = stats_dict.get('centroid_x', [])
    centroids_y = stats_dict.get('centroid_y', [])

    # Extract the data we need from the statistics dictionary
    if 'label' in stats_dict and len(stats_dict['label']) > 0:
        # We have detected objects
        areas = stats_dict.get('area', [])
        labels_list = stats_dict.get('label', [])
    else:
        # No objects detected
        areas = []
        labels_list = []
        centroids_x = []
        centroids_y = []

    positions = []
    filtered_areas = []
    intensities = []
    confidences = []

    for i in range(len(labels_list)):
        if i < len(centroids_x) and i < len(centroids_y) and i < len(areas):
            area = float(areas[i])

            # Filter by area
            if params["min_cell_area"] <= area <= params["max_cell_area"]:
                x = float(centroids_x[i])
                y = float(centroids_y[i])
                positions.append((x, y))
                filtered_areas.append(area)

                # Mean intensity (if available in the stats)
                mean_intensities = stats_dict.get('mean_intensity', [])
                if i < len(mean_intensities):
                    intensity = float(mean_intensities[i])
                else:
                    intensity = 1.0  # Default value
                intensities.append(intensity)

                # Use area as a confidence measure (normalized)
                confidence = min(1.0, area / params["max_cell_area"])
                confidences.append(confidence)

    # Force garbage collection to free GPU memory
    gc.collect()

    return CellCountResult(
        slice_index=slice_idx,
        method="watershed_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=filtered_areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )
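

# Illustrative alternative (CPU): if touching cells must actually be split, a
# classic marker-based watershed via scikit-image could be swapped in. A minimal
# sketch, assuming scikit-image is installed (it is not otherwise required by
# this module):
def _example_cpu_watershed(image_2d: np.ndarray) -> np.ndarray:
    from scipy import ndimage as ndi
    from skimage.feature import peak_local_max
    from skimage.filters import threshold_otsu
    from skimage.segmentation import watershed

    # Threshold, then seed the watershed from distance-transform peaks
    binary = image_2d > threshold_otsu(image_2d)
    distance = ndi.distance_transform_edt(binary)
    peaks = peak_local_max(distance, min_distance=5, labels=binary)
    markers = np.zeros(image_2d.shape, dtype=np.int32)
    markers[tuple(peaks.T)] = np.arange(1, len(peaks) + 1)
    return watershed(-distance, markers, mask=binary)

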
def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
    """Detect cells using simple thresholding and connected components with pyclesperanto."""
    # The image is already a pyclesperanto array - no conversion needed
    gpu_image = image

    # Apply the threshold (all operations stay on the GPU)
    max_intensity = cle.maximum_of_all_pixels(gpu_image)
    threshold_val = params["threshold"] * max_intensity
    binary = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Remove small objects, with explicit memory cleanup
    temp_labels = cle.connected_components_labeling(binary)
    temp_labels_filtered = cle.remove_small_labels(temp_labels, minimum_size=params["min_cell_area"])
    del temp_labels  # Explicit cleanup

    binary_filtered = cle.greater_constant(temp_labels_filtered, scalar=0)
    del binary  # Clean up the original binary
    del temp_labels_filtered  # Clean up the filtered labels
    binary = binary_filtered

    # Remove border objects if requested, with explicit cleanup
    if params["remove_border_cells"]:
        temp_labels = cle.connected_components_labeling(binary)
        temp_labels_no_border = cle.remove_labels_on_edges(temp_labels)
        del temp_labels  # Explicit cleanup

        binary_final = cle.greater_constant(temp_labels_no_border, scalar=0)
        del binary  # Clean up the previous binary
        del temp_labels_no_border  # Clean up the no-border labels
        binary = binary_final

    # Label connected components (final labeling on the GPU)
    labels = cle.connected_components_labeling(binary)
    del binary  # Clean up the binary mask

    # Get statistics (operates on GPU arrays, returns a dictionary)
    stats_dict = cle.statistics_of_labelled_pixels(gpu_image, labels)
    del labels  # Clean up the labels after getting statistics

    # Extract centroids directly from the statistics
    centroids_x = stats_dict.get('centroid_x', [])
    centroids_y = stats_dict.get('centroid_y', [])

    # Extract the data we need from the statistics dictionary
    if 'label' in stats_dict and len(stats_dict['label']) > 0:
        # We have detected objects
        areas = stats_dict.get('area', [])
        labels_list = stats_dict.get('label', [])
    else:
        # No objects detected
        areas = []
        labels_list = []
        centroids_x = []
        centroids_y = []

    max_intensity_cpu = float(max_intensity)

    positions = []
    filtered_areas = []
    intensities = []
    confidences = []

    for i in range(len(labels_list)):
        if i < len(centroids_x) and i < len(centroids_y) and i < len(areas):
            area = float(areas[i])

            # Filter by area
            if params["min_cell_area"] <= area <= params["max_cell_area"]:
                x = float(centroids_x[i])
                y = float(centroids_y[i])
                positions.append((x, y))
                filtered_areas.append(area)

                # Mean intensity (if available in the stats)
                mean_intensities = stats_dict.get('mean_intensity', [])
                if i < len(mean_intensities):
                    mean_intensity = float(mean_intensities[i])
                else:
                    mean_intensity = 1.0  # Default value
                intensities.append(mean_intensity)

                # Use intensity as a confidence measure
                confidence = mean_intensity / max_intensity_cpu if max_intensity_cpu > 0 else 1.0
                confidences.append(confidence)

    # Force garbage collection to free GPU memory
    gc.collect()

    return CellCountResult(
        slice_index=slice_idx,
        method="threshold_pyclesperanto",
        cell_count=len(positions),
        cell_positions=positions,
        cell_areas=filtered_areas,
        cell_intensities=intensities,
        detection_confidence=confidences,
        parameters_used=params
    )


def _analyze_colocalization(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    method: str,
    max_distance: float,
    min_overlap_area: float,
    intensity_threshold: float
) -> MultiChannelResult:
    """Analyze colocalization between two channels."""
    if method == ColocalizationMethod.DISTANCE_BASED.value:
        return _colocalization_distance_based(
            chan_1_result, chan_2_result, max_distance
        )
    elif method == ColocalizationMethod.OVERLAP_AREA.value:
        return _colocalization_overlap_based(
            chan_1_result, chan_2_result, min_overlap_area
        )
    elif method == ColocalizationMethod.INTENSITY_CORRELATION.value:
        return _colocalization_intensity_based(
            chan_1_result, chan_2_result, intensity_threshold
        )
    elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value:
        return _colocalization_manders(
            chan_1_result, chan_2_result, intensity_threshold
        )
    else:
        raise ValueError(f"Unknown colocalization method: {method}")


def _colocalization_distance_based(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    max_distance: float
) -> MultiChannelResult:
    """Perform distance-based colocalization analysis."""
    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based")

    # Convert positions to arrays
    pos_1 = np.array(chan_1_result.cell_positions)
    pos_2 = np.array(chan_2_result.cell_positions)

    # Calculate pairwise distances
    distances = cdist(pos_1, pos_2)

    # Find colocalized pairs (greedy nearest-neighbor matching)
    colocalized_pairs = []
    used_chan_2 = set()

    for i in range(len(pos_1)):
        # Find the closest cell in channel 2
        min_dist_idx = np.argmin(distances[i])
        min_dist = distances[i, min_dist_idx]

        # Check that it is within the distance threshold and not already used
        if min_dist <= max_distance and min_dist_idx not in used_chan_2:
            colocalized_pairs.append((i, min_dist_idx))
            used_chan_2.add(min_dist_idx)

    # Calculate metrics
    colocalized_count = len(colocalized_pairs)
    total_cells = len(pos_1) + len(pos_2)
    colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0

    chan_1_only = len(pos_1) - colocalized_count
    chan_2_only = len(pos_2) - colocalized_count

    # Extract colocalized positions (average of the paired positions)
    overlap_positions = []
    for i, j in colocalized_pairs:
        avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist()
        overlap_positions.append(tuple(avg_pos))

    # Calculate additional metrics
    if colocalized_pairs:
        avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs])
        max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs])
    else:
        avg_distance = 0
        max_distance_found = 0

    metrics = {
        "average_colocalization_distance": float(avg_distance),
        "max_colocalization_distance": float(max_distance_found),
        "distance_threshold_used": max_distance
    }

    return MultiChannelResult(
        slice_index=chan_1_result.slice_index,
        chan_1_results=chan_1_result,
        chan_2_results=chan_2_result,
        colocalization_method="distance_based",
        colocalized_count=colocalized_count,
        colocalization_percentage=colocalization_percentage,
        chan_1_only_count=chan_1_only,
        chan_2_only_count=chan_2_only,
        colocalization_metrics=metrics,
        overlap_positions=overlap_positions
    )
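

# Illustrative note: the pairing above is greedy (each channel-1 cell takes its
# nearest unused channel-2 cell, or nothing), which is order-dependent. A
# globally optimal one-to-one matching could be obtained with the Hungarian
# algorithm; a minimal sketch, assuming scipy is available:
def _example_optimal_pairing(pos_1: np.ndarray, pos_2: np.ndarray, max_distance: float):
    from scipy.optimize import linear_sum_assignment
    distances = cdist(pos_1, pos_2)
    row_idx, col_idx = linear_sum_assignment(distances)
    # Keep only pairs within the distance threshold
    return [(i, j) for i, j in zip(row_idx, col_idx) if distances[i, j] <= max_distance]

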
def _create_empty_coloc_result(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    method: str
) -> MultiChannelResult:
    """Create an empty colocalization result when no cells are found."""
    return MultiChannelResult(
        slice_index=chan_1_result.slice_index,
        chan_1_results=chan_1_result,
        chan_2_results=chan_2_result,
        colocalization_method=method,
        colocalized_count=0,
        colocalization_percentage=0.0,
        chan_1_only_count=chan_1_result.cell_count,
        chan_2_only_count=chan_2_result.cell_count,
        colocalization_metrics={},
        overlap_positions=[]
    )


def _colocalization_overlap_based(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    min_overlap_area: float
) -> MultiChannelResult:
    """Perform area-overlap-based colocalization analysis.

    This is a simplified implementation - a full version would need the actual
    segmentation masks. For now, distance is used as a proxy for overlap.
    """
    # Use the distance-based method with a small threshold as an approximation:
    # cells must be very close to overlap significantly
    distance_threshold = 2.0

    result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold)
    result.colocalization_method = "overlap_area"
    result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area
    result.colocalization_metrics["note"] = "Approximated using distance-based method"

    return result


def _colocalization_intensity_based(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    intensity_threshold: float
) -> MultiChannelResult:
    """Perform intensity correlation-based colocalization analysis."""
    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation")

    # Pair cells by proximity, then filter the pairs by intensity
    colocalized_pairs = []
    overlap_positions = []
    used_chan_2 = set()  # enforce a true one-to-one mapping

    for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
        for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
            if j in used_chan_2:
                continue

            # Calculate the distance
            dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

            if dist <= 5.0:  # Within a reasonable pairing distance
                # Check the intensity correlation
                int_1 = chan_1_result.cell_intensities[i]
                int_2 = chan_2_result.cell_intensities[j]

                # Simple intensity correlation: both above the threshold
                if int_1 >= intensity_threshold and int_2 >= intensity_threshold:
                    colocalized_pairs.append((i, j))
                    used_chan_2.add(j)
                    overlap_positions.append(((x1 + x2) / 2, (y1 + y2) / 2))
                    break  # One-to-one mapping

    colocalized_count = len(colocalized_pairs)
    n_1 = len(chan_1_result.cell_positions)
    n_2 = len(chan_2_result.cell_positions)
    total_cells = n_1 + n_2
    colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0

    metrics = {
        "intensity_threshold_used": intensity_threshold,
        "correlation_method": "threshold_based"
    }

    return MultiChannelResult(
        slice_index=chan_1_result.slice_index,
        chan_1_results=chan_1_result,
        chan_2_results=chan_2_result,
        colocalization_method="intensity_correlation",
        colocalized_count=colocalized_count,
        colocalization_percentage=colocalization_percentage,
        chan_1_only_count=n_1 - colocalized_count,
        chan_2_only_count=n_2 - colocalized_count,
        colocalization_metrics=metrics,
        overlap_positions=overlap_positions
    )


def _colocalization_manders(
    chan_1_result: CellCountResult,
    chan_2_result: CellCountResult,
    intensity_threshold: float
) -> MultiChannelResult:
    """Calculate Manders colocalization coefficients."""
    if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
        return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")

    # Simplified Manders calculation based on detected cells.
    # In practice this would use pixel-level intensity analysis.

    # Use the intensity-based method as the foundation
    intensity_result = _colocalization_intensity_based(
        chan_1_result, chan_2_result, intensity_threshold
    )

    # Calculate Manders-like coefficients
    total_int_1 = sum(chan_1_result.cell_intensities)
    total_int_2 = sum(chan_2_result.cell_intensities)

    # Simplified placeholder: use the first channel-1 cell's intensity as the
    # colocalized contribution (a pixel-level analysis is required for true
    # Manders coefficients)
    coloc_int_1 = chan_1_result.cell_intensities[0] if chan_1_result.cell_intensities else 0.0

    # Manders coefficients (M1 and M2)
    m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
    m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0  # Simplified

    intensity_result.colocalization_method = "manders_coefficients"
    intensity_result.colocalization_metrics.update({
        "manders_m1": m1,
        "manders_m2": m2,
        "note": "Simplified cell-based Manders calculation"
    })

    return intensity_result
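

# Illustrative sketch of the pixel-level Manders coefficients that the
# simplified cell-based version above approximates. Given two registered
# intensity images and boolean masks of "significant" signal per channel:
#     M1 = sum(I1 over pixels where channel 2 is positive) / sum(I1)
#     M2 = sum(I2 over pixels where channel 1 is positive) / sum(I2)
def _example_pixel_manders(img_1: np.ndarray, img_2: np.ndarray,
                           mask_1: np.ndarray, mask_2: np.ndarray) -> Tuple[float, float]:
    total_1 = float(img_1.sum())
    total_2 = float(img_2.sum())
    m1 = float(img_1[mask_2].sum()) / total_1 if total_1 > 0 else 0.0
    m2 = float(img_2[mask_1].sum()) / total_2 if total_2 > 0 else 0.0
    return m1, m2

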
def _create_segmentation_visualization(
    image: np.ndarray,
    positions: List[Tuple[float, float]],
    max_sigma: float
) -> np.ndarray:
    """Create a segmentation visualization with detected cells marked."""
    # Convert a pyclesperanto array to numpy only when needed for visualization
    if hasattr(image, 'shape') and hasattr(image, 'dtype') and not isinstance(image, np.ndarray):
        # This is a pyclesperanto array - pull to numpy only for the final visualization
        visualization = cle.pull(image).copy()
    else:
        visualization = image.copy()

    # Mark detected cells with small circular markers
    for x, y in positions:
        # np.ogrid indices are inherently within bounds, so a plain disk test suffices
        rr, cc = np.ogrid[:image.shape[0], :image.shape[1]]
        mask = (rr - y) ** 2 + (cc - x) ** 2 <= (max_sigma * 2) ** 2
        visualization[mask] = visualization.max()  # Bright markers

    return visualization


def count_cells_simple_baseline(
    image: np.ndarray,  # 2D image only
    threshold: float = 0.1,
    min_cell_area: int = 50,
    max_cell_area: int = 5000
) -> Tuple[np.ndarray, int, List[Tuple[float, float]]]:
    """
    Simple baseline cell counting using pyclesperanto.

    Based on the analyse_blobs.ipynb and voronoi_otsu_labeling.ipynb examples.

    Args:
        image: 2D numpy array
        threshold: Threshold as a fraction of max intensity (0.0-1.0)
        min_cell_area: Minimum area in pixels
        max_cell_area: Maximum area in pixels

    Returns:
        segmentation_mask: 2D array with labeled cells
        cell_count: Number of detected cells
        cell_positions: List of (x, y) centroid positions
    """
    # Convert to a pyclesperanto array
    gpu_image = cle.push(image.astype(np.float32))

    # Apply the threshold
    max_intensity = cle.maximum_of_all_pixels(gpu_image)
    threshold_val = threshold * max_intensity
    binary = cle.greater_constant(gpu_image, scalar=threshold_val)

    # Connected-components labeling
    labels = cle.connected_components_labeling(binary)

    # Remove small and large objects
    labels = cle.remove_small_labels(labels, minimum_size=min_cell_area)
    labels = cle.remove_large_labels(labels, maximum_size=max_cell_area)

    # Get statistics
    stats = cle.statistics_of_labelled_pixels(gpu_image, labels)

    # Extract results
    if 'label' in stats and len(stats['label']) > 0:
        cell_count = len(stats['label'])
        centroids_x = stats.get('centroid_x', [])
        centroids_y = stats.get('centroid_y', [])
        cell_positions = [(float(x), float(y)) for x, y in zip(centroids_x, centroids_y)]
    else:
        cell_count = 0
        cell_positions = []

    # Convert the result back to numpy
    segmentation_mask = cle.pull(labels)

    return segmentation_mask, cell_count, cell_positions
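

# Illustrative usage of the baseline counter (assumes an OpenCL device is
# available to pyclesperanto):
def _example_baseline_usage():
    img = np.zeros((128, 128), dtype=np.float32)
    img[30:40, 30:40] = 1.0   # one bright square "cell" (100 px)
    img[80:95, 80:95] = 1.0   # another (225 px)
    mask, count, positions = count_cells_simple_baseline(
        img, threshold=0.5, min_cell_area=50, max_cell_area=5000
    )
    print(f"detected {count} cells at {positions}")

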
def _create_colocalization_map(
    chan_1_img: np.ndarray,
    chan_2_img: np.ndarray,
    coloc_result: MultiChannelResult
) -> np.ndarray:
    """Create a colocalization visualization map."""
    # Create an RGB-like visualization
    coloc_map = np.zeros_like(chan_1_img)

    # Mark colocalized positions with bright markers
    for x, y in coloc_result.overlap_positions:
        # np.ogrid indices are inherently within bounds, so a plain disk test suffices
        rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]
        mask = (rr - y) ** 2 + (cc - x) ** 2 <= 25  # 5-pixel radius
        coloc_map[mask] = chan_1_img.max()  # Bright colocalization markers

    return coloc_map