Coverage for openhcs/processing/backends/analysis/cell_counting_cpu.py: 38.8%
496 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2CPU-based cell counting and multi-channel colocalization analysis for OpenHCS.
4This module provides comprehensive cell counting capabilities using scikit-image,
5supporting both single-channel and multi-channel analysis with various detection
6methods and colocalization metrics.
7"""
9import numpy as np
10import logging
11from typing import Dict, List, Tuple, Any, Optional, Union
12from dataclasses import dataclass
13from enum import Enum
15logger = logging.getLogger(__name__)
17# Core scientific computing imports
18import pandas as pd
19import json
20from scipy import ndimage
21from scipy.spatial.distance import cdist
22from skimage.feature import blob_log, blob_dog, blob_doh, peak_local_max
23from skimage.filters import threshold_otsu, threshold_li, gaussian, median
24from skimage.segmentation import watershed, clear_border
25from skimage.morphology import remove_small_objects, disk
26from skimage.measure import label, regionprops
28# OpenHCS imports
29from openhcs.core.memory.decorators import numpy as numpy_func
30from openhcs.core.pipeline.function_contracts import special_outputs
31from openhcs.constants.constants import Backend
34class DetectionMethod(Enum):
35 """Cell detection methods available."""
36 BLOB_LOG = "blob_log" # Laplacian of Gaussian (best for round cells)
37 BLOB_DOG = "blob_dog" # Difference of Gaussian (faster, good general purpose)
38 BLOB_DOH = "blob_doh" # Determinant of Hessian (good for elongated cells)
39 WATERSHED = "watershed" # Watershed segmentation (best for touching cells)
40 THRESHOLD = "threshold" # Simple thresholding (fastest, basic)
43class ColocalizationMethod(Enum):
44 """Methods for multi-channel colocalization analysis."""
45 OVERLAP_AREA = "overlap_area" # Based on segmentation overlap
46 DISTANCE_BASED = "distance_based" # Based on centroid distances
47 INTENSITY_CORRELATION = "intensity_correlation" # Based on intensity correlation
48 MANDERS_COEFFICIENTS = "manders_coefficients" # Manders colocalization coefficients
51class ThresholdMethod(Enum):
52 """Automatic thresholding methods for watershed segmentation."""
53 OTSU = "otsu" # Otsu's method (good for bimodal histograms)
54 LI = "li" # Li's method (good for low contrast images)
55 MANUAL = "manual" # Manual threshold value (0.0-1.0)
61@dataclass
62class CellCountResult:
63 """Results for single-channel cell counting."""
64 slice_index: int
65 method: str
66 cell_count: int
67 cell_positions: List[Tuple[float, float]] # (x, y) centroids
68 cell_areas: List[float]
69 cell_intensities: List[float]
70 detection_confidence: List[float]
71 parameters_used: Dict[str, Any]
72 binary_mask: Optional[np.ndarray] = None # Actual binary mask for segmentation methods
75@dataclass
76class MultiChannelResult:
77 """Results for multi-channel cell counting and colocalization."""
78 slice_index: int
79 chan_1_results: CellCountResult
80 chan_2_results: CellCountResult
81 colocalization_method: str
82 colocalized_count: int
83 colocalization_percentage: float
84 chan_1_only_count: int
85 chan_2_only_count: int
86 colocalization_metrics: Dict[str, float]
87 overlap_positions: List[Tuple[float, float]]
90def materialize_cell_counts(data: List[Union[CellCountResult, MultiChannelResult]], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str:
91 """Materialize cell counting results as analysis-ready CSV and JSON formats.
93 Args:
94 data: List of cell count results (single or multi-channel)
95 path: Output path for results
96 filemanager: FileManager instance for I/O operations
97 backends: Single backend string or list of backends to save to
98 backend_kwargs: Dict mapping backend names to their kwargs (e.g., {'fiji_stream': {'port': 5560}})
100 Returns:
101 Path to the saved results file (first backend in list)
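Example (illustrative sketch; assumes `filemanager` is an already-configured OpenHCS FileManager):
    materialize_cell_counts(
        results,                          # list of CellCountResult or MultiChannelResult
        "analysis/cell_counts.pkl",       # base path; JSON/CSV outputs are derived from it
        filemanager,
        backends=["disk", "fiji_stream"],
        backend_kwargs={"fiji_stream": {"port": 5560}},
    )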
102 """
103 # Normalize backends to list
104 if isinstance(backends, str):
105 backends = [backends]
107 if backend_kwargs is None:
108 backend_kwargs = {}
110 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: Called with path={path}, data_length={len(data) if data else 0}, backends={backends}")
112 # Determine if this is single-channel or multi-channel data
113 if not data:
114 logger.warning("🔬 CELL_COUNT_MATERIALIZE: No data to materialize")
115 return path
117 is_multi_channel = isinstance(data[0], MultiChannelResult)
118 logger.info(f"🔬 CELL_COUNT_MATERIALIZE: is_multi_channel={is_multi_channel}")
120 if is_multi_channel:
121 return _materialize_multi_channel_results(data, path, filemanager, backends, backend_kwargs)
122 else:
123 return _materialize_single_channel_results(data, path, filemanager, backends, backend_kwargs)
126def materialize_segmentation_masks(data: List[np.ndarray], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str:
127 """Materialize segmentation masks as ROIs and summary.
129 Extracts ROIs from labeled masks and saves them via backend-specific format:
130 - OMERO: Creates omero.model.RoiI objects linked to images
131 - Napari: Streams shapes layer via ZMQ
132 - Fiji: Streams ImageJ ROIs via ZMQ
133 - Disk: Saves JSON file with ROI data
135 Args:
136 data: List of labeled mask arrays (one per z-plane)
137 path: Output path for ROI data
138 filemanager: FileManager instance for I/O operations
139 backends: Single backend string or list of backends to save to
140 backend_kwargs: Dict mapping backend names to their kwargs (e.g., {'fiji_stream': {'port': 5560}})
142 Returns:
143 Path to the saved ROI file (first backend in list)
144 """
145 # Normalize backends to list
146 if isinstance(backends, str):
147 backends = [backends]
149 if backend_kwargs is None:
150 backend_kwargs = {}
152 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Called with path={path}, masks_count={len(data) if data else 0}, backends={backends}")
154 if not data:
155 logger.info("🔬 SEGMENTATION_MATERIALIZE: No segmentation masks to materialize (return_segmentation_mask=False)")
156 # Create empty summary file to indicate no masks were generated
157 summary_path = path.replace('.pkl', '_segmentation_summary.txt')
158 summary_content = "No segmentation masks generated (return_segmentation_mask=False)\n"
159 # Save to all backends
160 for backend in backends:
161 filemanager.save(summary_content, summary_path, backend)
162 return summary_path
164 # Extract ROIs from labeled masks (once for all backends)
165 from openhcs.core.roi import extract_rois_from_labeled_mask
167 all_rois = []
168 total_cells = 0
169 for z_idx, mask in enumerate(data):
170 rois = extract_rois_from_labeled_mask(
171 mask,
172 min_area=10, # Skip tiny regions
173 extract_contours=True # Extract polygon contours
174 )
175 all_rois.extend(rois)
176 total_cells += len(rois)
177 logger.debug(f"🔬 SEGMENTATION_MATERIALIZE: Extracted {len(rois)} ROIs from z-plane {z_idx}")
179 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Extracted {total_cells} total ROIs from {len(data)} z-planes")
181 # Save ROIs to all backends (streaming backends convert to shapes/ROI objects)
182 base_path = path.replace('.pkl', '').replace('.roi.zip', '')
183 roi_path = f"{base_path}_rois.roi.zip" # Extension determines format
185 if all_rois:
186 for backend in backends:
187 # Get kwargs for this backend (empty dict if not specified)
188 kwargs = backend_kwargs.get(backend, {})
189 filemanager.save(all_rois, roi_path, backend, **kwargs)
190 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Saved {len(all_rois)} ROIs to {backend} backend")
191 else:
192 logger.warning("🔬 SEGMENTATION_MATERIALIZE: No ROIs extracted (all regions below min_area threshold)")
194 # Save summary to all backends (backends will ignore if they don't support text data)
195 summary_path = f"{base_path}_segmentation_summary.txt"
196 summary_content = f"Segmentation ROIs: {len(all_rois)} cells\n"
197 summary_content += f"Z-planes: {len(data)}\n"
198 if all_rois:
199 summary_content += f"ROI file: {roi_path}\n"
200 else:
201 summary_content += "No ROIs extracted (all regions below min_area threshold)\n"
203 for backend in backends:
204 # Get kwargs for this backend (empty dict if not specified)
205 kwargs = backend_kwargs.get(backend, {})
206 filemanager.save(summary_content, summary_path, backend, **kwargs)
208 logger.info(f"🔬 SEGMENTATION_MATERIALIZE: Completed, saved to {len(backends)} backends")
210 return summary_path
213@numpy_func
214@special_outputs(("cell_counts", materialize_cell_counts), ("segmentation_masks", materialize_segmentation_masks))
215def count_cells_single_channel(
216 image_stack: np.ndarray,
217 # Detection method and parameters
218 detection_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
219 # Blob detection parameters
220 min_sigma: float = 1.0, # Minimum blob size (pixels)
221 max_sigma: float = 10.0, # Maximum blob size (pixels)
222 num_sigma: int = 10, # Number of sigma values to test
223 threshold: float = 0.1, # Detection threshold (0.0-1.0)
224 overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
225 # Watershed parameters
226 watershed_footprint_size: int = 3, # Local maxima footprint size
227 watershed_min_distance: int = 5, # Minimum distance between peaks
228 watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # UI will show threshold methods
229 # Preprocessing parameters
230 enable_preprocessing: bool = True,
231 gaussian_sigma: float = 1.0, # Gaussian blur sigma
232 median_disk_size: int = 1, # Median filter disk size
233 # Filtering parameters
234 min_cell_area: int = 10, # Minimum cell area (pixels)
235 max_cell_area: int = 1000, # Maximum cell area (pixels)
236 remove_border_cells: bool = True, # Remove cells touching image border
237 # Output parameters
238 return_segmentation_mask: bool = False
239 ) -> Tuple[np.ndarray, List[CellCountResult], List[np.ndarray]]:
240 """
241 Count cells in single-channel image stack using various detection methods.
243 Args:
244 image_stack: 3D array (Z, Y, X) where each Z slice is processed independently
245 detection_method: Method for cell detection (see DetectionMethod enum)
246 min_sigma: Minimum blob size for blob detection methods
247 max_sigma: Maximum blob size for blob detection methods
248 num_sigma: Number of sigma values to test for blob detection
249 threshold: Detection threshold (method-dependent)
250 overlap: Maximum overlap between detected blobs
251 watershed_footprint_size: Footprint size for local maxima detection
252 watershed_min_distance: Minimum distance between watershed peaks
253 watershed_threshold_method: Thresholding method for watershed
254 enable_preprocessing: Apply Gaussian and median filtering
255 gaussian_sigma: Standard deviation for Gaussian blur
256 median_disk_size: Disk size for median filtering
257 min_cell_area: Minimum area for valid cells
258 max_cell_area: Maximum area for valid cells
259 remove_border_cells: Remove cells touching image borders
260 return_segmentation_mask: Return segmentation masks in output
262 Returns:
263 output_stack: Original image stack unchanged (Z, Y, X)
264 cell_count_results: List of CellCountResult objects for each slice
265 segmentation_masks: (Special output) List of segmentation mask arrays if return_segmentation_mask=True
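Example (illustrative sketch; assumes the decorators pass the raw return value through on a direct call):
    stack = np.random.rand(3, 512, 512).astype(np.float32)  # synthetic (Z, Y, X) stack
    output, counts, masks = count_cells_single_channel(
        stack,
        detection_method=DetectionMethod.WATERSHED,
        watershed_threshold_method=ThresholdMethod.OTSU,
        min_cell_area=20,
        max_cell_area=500,
        return_segmentation_mask=True,
    )
    total_cells = sum(r.cell_count for r in counts)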
266 """
267 if image_stack.ndim != 3:
268 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
270 results = []
271 segmentation_masks = []
273 # Store parameters for reproducibility (convert enums to values)
274 parameters = {
275 "detection_method": detection_method.value,
276 "min_sigma": min_sigma,
277 "max_sigma": max_sigma,
278 "num_sigma": num_sigma,
279 "threshold": threshold,
280 "overlap": overlap,
281 "watershed_footprint_size": watershed_footprint_size,
282 "watershed_min_distance": watershed_min_distance,
283 "watershed_threshold_method": watershed_threshold_method.value,
284 "gaussian_sigma": gaussian_sigma,
285 "median_disk_size": median_disk_size,
286 "min_cell_area": min_cell_area,
287 "max_cell_area": max_cell_area,
288 "remove_border_cells": remove_border_cells
289 }
291 logger.info(f"Processing {image_stack.shape[0]} slices with {detection_method.value} method")
293 for z_idx in range(image_stack.shape[0]):
294 slice_img = image_stack[z_idx].astype(np.float64)
296 # Apply preprocessing if enabled
297 if enable_preprocessing:
298 slice_img = _preprocess_image(slice_img, gaussian_sigma, median_disk_size)
300 # Detect cells using specified method
301 result = _detect_cells_single_method(
302 slice_img, z_idx, detection_method.value, parameters
303 )
305 results.append(result)
307 # Create segmentation mask if requested
308 if return_segmentation_mask:
309 segmentation_mask = _create_segmentation_visualization(
310 slice_img, result.cell_positions, max_sigma, result.cell_areas, result.binary_mask
311 )
312 segmentation_masks.append(segmentation_mask)
314 # Always return segmentation masks (empty list if not requested)
315 # This ensures consistent return signature for special outputs system
316 return image_stack, results, segmentation_masks
319@numpy_func
320@special_outputs(("multi_channel_counts", materialize_cell_counts))
321def count_cells_multi_channel(
322 image_stack: np.ndarray,
323 chan_1: int, # Index of first channel (positional arg)
324 chan_2: int, # Index of second channel (positional arg)
325 # Detection parameters for channel 1 (all single-channel params available)
326 chan_1_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
327 chan_1_min_sigma: float = 1.0, # Minimum blob size (pixels)
328 chan_1_max_sigma: float = 10.0, # Maximum blob size (pixels)
329 chan_1_num_sigma: int = 10, # Number of sigma values to test
330 chan_1_threshold: float = 0.1, # Detection threshold (0.0-1.0)
331 chan_1_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
332 chan_1_watershed_footprint_size: int = 3, # Local maxima footprint size
333 chan_1_watershed_min_distance: int = 5, # Minimum distance between peaks
334 chan_1_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
335 chan_1_enable_preprocessing: bool = True, # Apply preprocessing
336 chan_1_gaussian_sigma: float = 1.0, # Gaussian blur sigma
337 chan_1_median_disk_size: int = 1, # Median filter disk size
338 chan_1_min_area: int = 10, # Minimum cell area (pixels)
339 chan_1_max_area: int = 1000, # Maximum cell area (pixels)
340 chan_1_remove_border_cells: bool = True, # Remove cells touching border
341 # Detection parameters for channel 2 (all single-channel params available)
342 chan_2_method: DetectionMethod = DetectionMethod.BLOB_LOG, # UI will show radio buttons
343 chan_2_min_sigma: float = 1.0, # Minimum blob size (pixels)
344 chan_2_max_sigma: float = 10.0, # Maximum blob size (pixels)
345 chan_2_num_sigma: int = 10, # Number of sigma values to test
346 chan_2_threshold: float = 0.1, # Detection threshold (0.0-1.0)
347 chan_2_overlap: float = 0.5, # Maximum overlap between blobs (0.0-1.0)
348 chan_2_watershed_footprint_size: int = 3, # Local maxima footprint size
349 chan_2_watershed_min_distance: int = 5, # Minimum distance between peaks
350 chan_2_watershed_threshold_method: ThresholdMethod = ThresholdMethod.OTSU, # Thresholding method
351 chan_2_enable_preprocessing: bool = True, # Apply preprocessing
352 chan_2_gaussian_sigma: float = 1.0, # Gaussian blur sigma
353 chan_2_median_disk_size: int = 1, # Median filter disk size
354 chan_2_min_area: int = 10, # Minimum cell area (pixels)
355 chan_2_max_area: int = 1000, # Maximum cell area (pixels)
356 chan_2_remove_border_cells: bool = True, # Remove cells touching border
357 # Colocalization parameters
358 colocalization_method: ColocalizationMethod = ColocalizationMethod.DISTANCE_BASED, # UI will show coloc methods
359 max_distance: float = 5.0, # Maximum distance for colocalization (pixels)
360 min_overlap_area: float = 0.3, # Minimum overlap fraction for area-based method
361 intensity_threshold: float = 0.5, # Threshold for intensity-based methods
362 # Output parameters
363 return_colocalization_map: bool = False
364) -> Tuple[np.ndarray, List[MultiChannelResult]]:
365 """
366 Count cells in multi-channel image stack with colocalization analysis.
368 Each channel can be processed with independent parameters, providing the same
369 flexibility as the single-channel function for each channel separately.
371 Args:
372 image_stack: 3D array (Z, Y, X) where Z represents different channels
373 chan_1: Index of first channel in the stack (positional)
374 chan_2: Index of second channel in the stack (positional)
376 # Channel 1 detection parameters (same as single-channel function)
377 chan_1_method: Detection method for channel 1 (DetectionMethod enum)
378 chan_1_min_sigma: Minimum blob size for channel 1
379 chan_1_max_sigma: Maximum blob size for channel 1
380 chan_1_num_sigma: Number of sigma values to test for channel 1
381 chan_1_threshold: Detection threshold for channel 1 (0.0-1.0)
382 chan_1_overlap: Maximum overlap between blobs for channel 1
383 chan_1_watershed_footprint_size: Local maxima footprint size for channel 1
384 chan_1_watershed_min_distance: Minimum distance between peaks for channel 1
385 chan_1_watershed_threshold_method: Thresholding method for channel 1
386 chan_1_enable_preprocessing: Apply preprocessing to channel 1
387 chan_1_gaussian_sigma: Gaussian blur sigma for channel 1
388 chan_1_median_disk_size: Median filter size for channel 1
389 chan_1_min_area: Minimum cell area for channel 1
390 chan_1_max_area: Maximum cell area for channel 1
391 chan_1_remove_border_cells: Remove border cells for channel 1
393 # Channel 2 detection parameters (same as single-channel function)
394 chan_2_method: Detection method for channel 2 (DetectionMethod enum)
395 chan_2_min_sigma: Minimum blob size for channel 2
396 chan_2_max_sigma: Maximum blob size for channel 2
397 chan_2_num_sigma: Number of sigma values to test for channel 2
398 chan_2_threshold: Detection threshold for channel 2 (0.0-1.0)
399 chan_2_overlap: Maximum overlap between blobs for channel 2
400 chan_2_watershed_footprint_size: Local maxima footprint size for channel 2
401 chan_2_watershed_min_distance: Minimum distance between peaks for channel 2
402 chan_2_watershed_threshold_method: Thresholding method for channel 2
403 chan_2_enable_preprocessing: Apply preprocessing to channel 2
404 chan_2_gaussian_sigma: Gaussian blur sigma for channel 2
405 chan_2_median_disk_size: Median filter size for channel 2
406 chan_2_min_area: Minimum cell area for channel 2
407 chan_2_max_area: Maximum cell area for channel 2
408 chan_2_remove_border_cells: Remove border cells for channel 2
410 # Colocalization parameters
411 colocalization_method: Method for determining colocalization (ColocalizationMethod enum)
412 max_distance: Maximum distance for distance-based colocalization (pixels)
413 min_overlap_area: Minimum overlap fraction for area-based colocalization
414 intensity_threshold: Threshold for intensity-based colocalization (0.0-1.0)
415 return_colocalization_map: Return colocalization visualization
417 Returns:
418 output_stack: Original images or colocalization maps
419 multi_channel_results: List of MultiChannelResult objects
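Example (illustrative sketch; `nuclei_plane` and `marker_plane` are placeholder 2D arrays):
    two_channel = np.stack([nuclei_plane, marker_plane])  # shape (2, Y, X)
    _, coloc_results = count_cells_multi_channel(
        two_channel, chan_1=0, chan_2=1,
        chan_1_method=DetectionMethod.BLOB_LOG,
        chan_2_method=DetectionMethod.WATERSHED,
        colocalization_method=ColocalizationMethod.DISTANCE_BASED,
        max_distance=5.0,
    )
    print(coloc_results[0].colocalization_percentage)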
420 """
421 if image_stack.ndim != 3:
422 raise ValueError(f"Expected 3D image stack, got {image_stack.ndim}D")
424 if chan_1 >= image_stack.shape[0] or chan_2 >= image_stack.shape[0]:
425 raise ValueError(f"Channel indices {chan_1}, {chan_2} exceed stack size {image_stack.shape[0]}")
427 if chan_1 == chan_2:
428 raise ValueError("Channel 1 and Channel 2 must be different")
430 # Extract channel images
431 chan_1_img = image_stack[chan_1:chan_1+1] # Keep 3D shape for consistency
432 chan_2_img = image_stack[chan_2:chan_2+1]
434 # Count cells in each channel separately using the single-channel function
435 # Channel 1 parameters (all explicit)
436 chan_1_params = {
437 "detection_method": chan_1_method,
438 "min_sigma": chan_1_min_sigma,
439 "max_sigma": chan_1_max_sigma,
440 "num_sigma": chan_1_num_sigma,
441 "threshold": chan_1_threshold,
442 "overlap": chan_1_overlap,
443 "watershed_footprint_size": chan_1_watershed_footprint_size,
444 "watershed_min_distance": chan_1_watershed_min_distance,
445 "watershed_threshold_method": chan_1_watershed_threshold_method,
446 "enable_preprocessing": chan_1_enable_preprocessing,
447 "gaussian_sigma": chan_1_gaussian_sigma,
448 "median_disk_size": chan_1_median_disk_size,
449 "min_cell_area": chan_1_min_area,
450 "max_cell_area": chan_1_max_area,
451 "remove_border_cells": chan_1_remove_border_cells,
452 "return_segmentation_mask": False
453 }
455 # Channel 2 parameters (all explicit)
456 chan_2_params = {
457 "detection_method": chan_2_method,
458 "min_sigma": chan_2_min_sigma,
459 "max_sigma": chan_2_max_sigma,
460 "num_sigma": chan_2_num_sigma,
461 "threshold": chan_2_threshold,
462 "overlap": chan_2_overlap,
463 "watershed_footprint_size": chan_2_watershed_footprint_size,
464 "watershed_min_distance": chan_2_watershed_min_distance,
465 "watershed_threshold_method": chan_2_watershed_threshold_method,
466 "enable_preprocessing": chan_2_enable_preprocessing,
467 "gaussian_sigma": chan_2_gaussian_sigma,
468 "median_disk_size": chan_2_median_disk_size,
469 "min_cell_area": chan_2_min_area,
470 "max_cell_area": chan_2_max_area,
471 "remove_border_cells": chan_2_remove_border_cells,
472 "return_segmentation_mask": False
473 }
475 # Process each channel
476 _, chan_1_results, _ = count_cells_single_channel(chan_1_img, **chan_1_params)
477 _, chan_2_results, _ = count_cells_single_channel(chan_2_img, **chan_2_params)
479 # Perform colocalization analysis
480 multi_results = []
481 output_stack = image_stack.copy()
483 # Since we're processing single slices from each channel, we only have one result each
484 chan_1_result = chan_1_results[0]
485 chan_2_result = chan_2_results[0]
487 # Analyze colocalization
488 coloc_result = _analyze_colocalization(
489 chan_1_result, chan_2_result, colocalization_method.value,
490 max_distance, min_overlap_area, intensity_threshold
491 )
493 multi_results.append(coloc_result)
495 # Create colocalization visualization if requested
496 if return_colocalization_map:
497 coloc_map = _create_colocalization_map(
498 image_stack[chan_1], image_stack[chan_2], coloc_result
499 )
500 # Stack both channels together with the colocalization map as a third slice
501 output_stack = np.stack([image_stack[chan_1], image_stack[chan_2], coloc_map])
503 return output_stack, multi_results
506def _materialize_single_channel_results(data: List[CellCountResult], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str:
507 """Materialize single-channel cell counting results."""
508 # Normalize backends to list
509 if isinstance(backends, str):
510 backends = [backends]
512 if backend_kwargs is None:
513 backend_kwargs = {}
515 # Generate output file paths based on the input path
516 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
517 base_path = path.replace('.pkl', '')
518 json_path = f"{base_path}.json"
519 csv_path = f"{base_path}_details.csv"
521 # Ensure output directory exists for disk backend
522 from pathlib import Path
523 from openhcs.constants.constants import Backend
524 output_dir = Path(json_path).parent
525 for backend in backends:
526 if backend == Backend.DISK.value:
527 filemanager.ensure_directory(str(output_dir), backend)
529 summary = {
530 "analysis_type": "single_channel_cell_counting",
531 "total_slices": len(data),
532 "results_per_slice": []
533 }
534 rows = []
536 total_cells = 0
537 for result in data:
538 total_cells += result.cell_count
540 # Add to summary
541 summary["results_per_slice"].append({
542 "slice_index": result.slice_index,
543 "method": result.method,
544 "cell_count": result.cell_count,
545 "avg_cell_area": np.mean(result.cell_areas) if result.cell_areas else 0,
546 "avg_cell_intensity": np.mean(result.cell_intensities) if result.cell_intensities else 0,
547 "parameters": result.parameters_used
548 })
550 # Add individual cell data to CSV
551 for i, (pos, area, intensity, confidence) in enumerate(zip(
552 result.cell_positions, result.cell_areas,
553 result.cell_intensities, result.detection_confidence
554 )):
555 rows.append({
556 'slice_index': result.slice_index,
557 'cell_id': f"slice_{result.slice_index}_cell_{i}",
558 'x_position': pos[0],
559 'y_position': pos[1],
560 'cell_area': area,
561 'cell_intensity': intensity,
562 'detection_confidence': confidence,
563 'detection_method': result.method
564 })
566 summary["total_cells_all_slices"] = total_cells
567 summary["average_cells_per_slice"] = total_cells / len(data) if data else 0
569 # Save JSON summary to all backends (backends will ignore if they don't support text data)
570 json_content = json.dumps(summary, indent=2, default=str)
571 for backend in backends:
572 kwargs = backend_kwargs.get(backend, {})
573 # Only check exists/delete for storage backends (streaming backends don't support these operations)
574 try:
575 if filemanager.exists(json_path, backend):
576 filemanager.delete(json_path, backend)
577 except AttributeError:
578 # Streaming backend doesn't have exists/delete - that's fine
579 pass
580 filemanager.save(json_content, json_path, backend, **kwargs)
582 # Save CSV details to all backends (backends will ignore if they don't support text data)
583 if rows:
584 df = pd.DataFrame(rows)
585 csv_content = df.to_csv(index=False)
586 for backend in backends:
587 kwargs = backend_kwargs.get(backend, {})
588 # Only check exists/delete for storage backends (streaming backends don't support these operations)
589 try:
590 if filemanager.exists(csv_path, backend):
591 filemanager.delete(csv_path, backend)
592 except AttributeError:
593 # Streaming backend doesn't have exists/delete - that's fine
594 pass
595 filemanager.save(csv_content, csv_path, backend, **kwargs)
597 return json_path
600def _materialize_multi_channel_results(data: List[MultiChannelResult], path: str, filemanager, backends: Union[str, List[str]], backend_kwargs: dict = None) -> str:
601 """Materialize multi-channel cell counting and colocalization results."""
602 # Normalize backends to list
603 if isinstance(backends, str):
604 backends = [backends]
606 if backend_kwargs is None:
607 backend_kwargs = {}
609 # Generate output file paths based on the input path
610 # Use clean naming: preserve namespaced path structure, don't duplicate special output key
611 base_path = path.replace('.pkl', '')
612 json_path = f"{base_path}.json"
613 csv_path = f"{base_path}_details.csv"
615 # Ensure output directory exists for disk backend
616 # Skip directory creation for OMERO virtual paths (they don't exist on disk)
617 from pathlib import Path
618 output_dir = Path(json_path).parent
619 for backend in backends:
620 if backend == Backend.DISK.value and not str(output_dir).startswith('/omero/'):
621 filemanager.ensure_directory(str(output_dir), backend)
623 summary = {
624 "analysis_type": "multi_channel_cell_counting_colocalization",
625 "total_slices": len(data),
626 "colocalization_summary": {
627 "total_chan_1_cells": 0,
628 "total_chan_2_cells": 0,
629 "total_colocalized": 0,
630 "average_colocalization_percentage": 0
631 },
632 "results_per_slice": []
633 }
635 # CSV for detailed analysis (csv_path already defined above)
636 rows = []
638 total_coloc_pct = 0
639 for result in data:
640 summary["colocalization_summary"]["total_chan_1_cells"] += result.chan_1_results.cell_count
641 summary["colocalization_summary"]["total_chan_2_cells"] += result.chan_2_results.cell_count
642 summary["colocalization_summary"]["total_colocalized"] += result.colocalized_count
643 total_coloc_pct += result.colocalization_percentage
645 # Add to summary
646 summary["results_per_slice"].append({
647 "slice_index": result.slice_index,
648 "chan_1_count": result.chan_1_results.cell_count,
649 "chan_2_count": result.chan_2_results.cell_count,
650 "colocalized_count": result.colocalized_count,
651 "colocalization_percentage": result.colocalization_percentage,
652 "chan_1_only": result.chan_1_only_count,
653 "chan_2_only": result.chan_2_only_count,
654 "colocalization_method": result.colocalization_method,
655 "colocalization_metrics": result.colocalization_metrics
656 })
658 # Add colocalization details to CSV
659 for i, pos in enumerate(result.overlap_positions):
660 rows.append({
661 'slice_index': result.slice_index,
662 'colocalization_id': f"slice_{result.slice_index}_coloc_{i}",
663 'x_position': pos[0],
664 'y_position': pos[1],
665 'colocalization_method': result.colocalization_method
666 })
668 summary["colocalization_summary"]["average_colocalization_percentage"] = (
669 total_coloc_pct / len(data) if data else 0
670 )
672 # Save JSON summary to all backends (backends will ignore if they don't support text data)
673 json_content = json.dumps(summary, indent=2, default=str)
674 for backend in backends:
675 kwargs = backend_kwargs.get(backend, {})
676 # Only check exists/delete for storage backends (streaming backends don't support these operations)
677 try:
678 if filemanager.exists(json_path, backend):
679 filemanager.delete(json_path, backend)
680 except AttributeError:
681 # Streaming backend doesn't have exists/delete - that's fine
682 pass
683 filemanager.save(json_content, json_path, backend, **kwargs)
685 # Save CSV details to all backends (backends will ignore if they don't support text data)
686 if rows:
687 df = pd.DataFrame(rows)
688 csv_content = df.to_csv(index=False)
689 for backend in backends:
690 kwargs = backend_kwargs.get(backend, {})
691 # Only check exists/delete for storage backends (streaming backends don't support these operations)
692 try:
693 if filemanager.exists(csv_path, backend):
694 filemanager.delete(csv_path, backend)
695 except AttributeError:
696 # Streaming backend doesn't have exists/delete - that's fine
697 pass
698 filemanager.save(csv_content, csv_path, backend, **kwargs)
700 return json_path
703def _preprocess_image(image: np.ndarray, gaussian_sigma: float, median_disk_size: int) -> np.ndarray:
704 """Apply preprocessing to enhance cell detection."""
705 # Gaussian blur to reduce noise
706 if gaussian_sigma > 0:
707 image = gaussian(image, sigma=gaussian_sigma, preserve_range=True)
709 # Median filter to remove salt-and-pepper noise
710 if median_disk_size > 0:
711 image = median(image, disk(median_disk_size))
713 return image
716def _detect_cells_single_method(
717 image: np.ndarray,
718 slice_idx: int,
719 method: str,
720 params: Dict[str, Any]
721) -> CellCountResult:
722 """Detect cells using specified method."""
724 if method == DetectionMethod.BLOB_LOG.value:
725 return _detect_cells_blob_log(image, slice_idx, params)
726 elif method == DetectionMethod.BLOB_DOG.value:
727 return _detect_cells_blob_dog(image, slice_idx, params)
728 elif method == DetectionMethod.BLOB_DOH.value:
729 return _detect_cells_blob_doh(image, slice_idx, params)
730 elif method == DetectionMethod.WATERSHED.value:
731 return _detect_cells_watershed(image, slice_idx, params)
732 elif method == DetectionMethod.THRESHOLD.value:
733 return _detect_cells_threshold(image, slice_idx, params)
734 else:
735 raise ValueError(f"Unknown detection method: {method}")
738def _detect_cells_blob_log(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
739 """Detect cells using Laplacian of Gaussian blob detection."""
740 blobs = blob_log(
741 image,
742 min_sigma=params["min_sigma"],
743 max_sigma=params["max_sigma"],
744 num_sigma=params["num_sigma"],
745 threshold=params["threshold"],
746 overlap=params["overlap"]
747 )
749 # Extract positions, areas, and intensities
750 positions = []
751 areas = []
752 intensities = []
753 confidences = []
755 for blob in blobs:
756 y, x, sigma = blob
757 positions.append((float(x), float(y)))
759 # Estimate area from sigma (blob radius ≈ sigma * sqrt(2))
760 radius = sigma * np.sqrt(2)
761 area = np.pi * radius**2
762 areas.append(float(area))
764 # Sample intensity at blob center
765 intensity = float(image[int(y), int(x)])
766 intensities.append(intensity)
768 # Use sigma as confidence measure (larger blobs = higher confidence)
769 confidence = float(sigma / params["max_sigma"])
770 confidences.append(confidence)
772 # Filter by area constraints
773 filtered_data = _filter_by_area(
774 positions, areas, intensities, confidences,
775 params["min_cell_area"], params["max_cell_area"]
776 )
778 return CellCountResult(
779 slice_index=slice_idx,
780 method="blob_log",
781 cell_count=len(filtered_data[0]),
782 cell_positions=filtered_data[0],
783 cell_areas=filtered_data[1],
784 cell_intensities=filtered_data[2],
785 detection_confidence=filtered_data[3],
786 parameters_used=params
787 )
790def _detect_cells_blob_dog(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
791 """Detect cells using Difference of Gaussian blob detection."""
792 blobs = blob_dog(
793 image,
794 min_sigma=params["min_sigma"],
795 max_sigma=params["max_sigma"],
796 threshold=params["threshold"],
797 overlap=params["overlap"]
798 )
800 # Process similar to blob_log
801 positions = []
802 areas = []
803 intensities = []
804 confidences = []
806 for blob in blobs:
807 y, x, sigma = blob
808 positions.append((float(x), float(y)))
810 radius = sigma * np.sqrt(2)
811 area = np.pi * radius**2
812 areas.append(float(area))
814 intensity = float(image[int(y), int(x)])
815 intensities.append(intensity)
817 confidence = float(sigma / params["max_sigma"])
818 confidences.append(confidence)
820 filtered_data = _filter_by_area(
821 positions, areas, intensities, confidences,
822 params["min_cell_area"], params["max_cell_area"]
823 )
825 return CellCountResult(
826 slice_index=slice_idx,
827 method="blob_dog",
828 cell_count=len(filtered_data[0]),
829 cell_positions=filtered_data[0],
830 cell_areas=filtered_data[1],
831 cell_intensities=filtered_data[2],
832 detection_confidence=filtered_data[3],
833 parameters_used=params
834 )
837def _detect_cells_blob_doh(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
838 """Detect cells using Determinant of Hessian blob detection."""
839 blobs = blob_doh(
840 image,
841 min_sigma=params["min_sigma"],
842 max_sigma=params["max_sigma"],
843 num_sigma=params["num_sigma"],
844 threshold=params["threshold"],
845 overlap=params["overlap"]
846 )
848 # Process similar to other blob methods
849 positions = []
850 areas = []
851 intensities = []
852 confidences = []
854 for blob in blobs:
855 y, x, sigma = blob
856 positions.append((float(x), float(y)))
858 radius = sigma * np.sqrt(2)
859 area = np.pi * radius**2
860 areas.append(float(area))
862 intensity = float(image[int(y), int(x)])
863 intensities.append(intensity)
865 confidence = float(sigma / params["max_sigma"])
866 confidences.append(confidence)
868 filtered_data = _filter_by_area(
869 positions, areas, intensities, confidences,
870 params["min_cell_area"], params["max_cell_area"]
871 )
873 return CellCountResult(
874 slice_index=slice_idx,
875 method="blob_doh",
876 cell_count=len(filtered_data[0]),
877 cell_positions=filtered_data[0],
878 cell_areas=filtered_data[1],
879 cell_intensities=filtered_data[2],
880 detection_confidence=filtered_data[3],
881 parameters_used=params
882 )
885def _filter_by_area(
886 positions: List[Tuple[float, float]],
887 areas: List[float],
888 intensities: List[float],
889 confidences: List[float],
890 min_area: float,
891 max_area: float
892) -> Tuple[List[Tuple[float, float]], List[float], List[float], List[float]]:
893 """Filter detected cells by area constraints."""
894 filtered_positions = []
895 filtered_areas = []
896 filtered_intensities = []
897 filtered_confidences = []
899 for pos, area, intensity, confidence in zip(positions, areas, intensities, confidences):
900 if min_area <= area <= max_area:
901 filtered_positions.append(pos)
902 filtered_areas.append(area)
903 filtered_intensities.append(intensity)
904 filtered_confidences.append(confidence)
906 return filtered_positions, filtered_areas, filtered_intensities, filtered_confidences
909def _detect_cells_watershed(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
910 """Detect cells using watershed segmentation."""
911 # Determine threshold
912 if params["watershed_threshold_method"] == "otsu": 912 ↛ 914line 912 didn't jump to line 914 because the condition on line 912 was always true
913 threshold_val = threshold_otsu(image)
914 elif params["watershed_threshold_method"] == "li":
915 threshold_val = threshold_li(image)
916 else:
917 threshold_val = float(params["watershed_threshold_method"])
919 # Create binary mask
920 binary = image > threshold_val
922 # Remove small objects and border objects
923 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
924 if params["remove_border_cells"]: 924 ↛ 928line 924 didn't jump to line 928 because the condition on line 924 was always true
925 binary = clear_border(binary)
927 # Find local maxima as seeds
928 distance = ndimage.distance_transform_edt(binary)
929 local_maxima = peak_local_max(
930 distance,
931 min_distance=params["watershed_min_distance"],
932 footprint=np.ones((params["watershed_footprint_size"], params["watershed_footprint_size"]))
933 )
935 # Convert coordinates to binary mask
936 local_maxima_mask = np.zeros_like(distance, dtype=bool)
937 if len(local_maxima) > 0:
938 local_maxima_mask[local_maxima[:, 0], local_maxima[:, 1]] = True
940 # Create markers for watershed
941 # Convert boolean mask to integer labels for connected components
942 markers = label(local_maxima_mask.astype(np.uint8))
944 # Apply watershed
945 labels = watershed(-distance, markers, mask=binary)
947 # Extract region properties
948 regions = regionprops(labels, intensity_image=image)
950 positions = []
951 areas = []
952 intensities = []
953 confidences = []
954 valid_labels = [] # Track which labels pass the size filter
956 for region in regions:
957 # Filter by area
958 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
959 # Centroid (note: regionprops returns (row, col) = (y, x))
960 y, x = region.centroid
961 positions.append((float(x), float(y)))
963 areas.append(float(region.area))
964 intensities.append(float(region.mean_intensity))
966 # Use area as confidence measure (normalized)
967 confidence = min(1.0, region.area / params["max_cell_area"])
968 confidences.append(confidence)
970 # Track this label as valid
971 valid_labels.append(region.label)
973 # Create filtered binary mask with only cells that passed size filter
974 filtered_binary_mask = np.isin(labels, valid_labels)
976 return CellCountResult(
977 slice_index=slice_idx,
978 method="watershed",
979 cell_count=len(positions),
980 cell_positions=positions,
981 cell_areas=areas,
982 cell_intensities=intensities,
983 detection_confidence=confidences,
984 parameters_used=params,
985 binary_mask=filtered_binary_mask # Only cells that passed all filters
986 )
989def _detect_cells_threshold(image: np.ndarray, slice_idx: int, params: Dict[str, Any]) -> CellCountResult:
990 """Detect cells using simple thresholding and connected components."""
991 # Apply threshold
992 binary = image > params["threshold"] * image.max()
994 # Remove small objects and border objects
995 binary = remove_small_objects(binary, min_size=params["min_cell_area"])
996 if params["remove_border_cells"]:
997 binary = clear_border(binary)
999 # Label connected components
1000 labels = label(binary)
1001 regions = regionprops(labels, intensity_image=image)
1003 positions = []
1004 areas = []
1005 intensities = []
1006 confidences = []
1007 valid_labels = [] # Track which labels pass the size filter
1009 for region in regions:
1010 # Filter by area
1011 if params["min_cell_area"] <= region.area <= params["max_cell_area"]:
1012 y, x = region.centroid
1013 positions.append((float(x), float(y)))
1015 areas.append(float(region.area))
1016 intensities.append(float(region.mean_intensity))
1018 # Use intensity as confidence measure
1019 confidence = float(region.mean_intensity / image.max())
1020 confidences.append(confidence)
1022 # Track this label as valid
1023 valid_labels.append(region.label)
1025 # Create filtered binary mask with only cells that passed size filter
1026 filtered_binary_mask = np.isin(labels, valid_labels)
1028 return CellCountResult(
1029 slice_index=slice_idx,
1030 method="threshold",
1031 cell_count=len(positions),
1032 cell_positions=positions,
1033 cell_areas=areas,
1034 cell_intensities=intensities,
1035 detection_confidence=confidences,
1036 parameters_used=params,
1037 binary_mask=filtered_binary_mask # Only cells that passed all filters
1038 )
1041def _analyze_colocalization(
1042 chan_1_result: CellCountResult,
1043 chan_2_result: CellCountResult,
1044 method: str,
1045 max_distance: float,
1046 min_overlap_area: float,
1047 intensity_threshold: float
1048) -> MultiChannelResult:
1049 """Analyze colocalization between two channels."""
1051 if method == ColocalizationMethod.DISTANCE_BASED.value:
1052 return _colocalization_distance_based(
1053 chan_1_result, chan_2_result, max_distance
1054 )
1055 elif method == ColocalizationMethod.OVERLAP_AREA.value:
1056 return _colocalization_overlap_based(
1057 chan_1_result, chan_2_result, min_overlap_area
1058 )
1059 elif method == ColocalizationMethod.INTENSITY_CORRELATION.value:
1060 return _colocalization_intensity_based(
1061 chan_1_result, chan_2_result, intensity_threshold
1062 )
1063 elif method == ColocalizationMethod.MANDERS_COEFFICIENTS.value:
1064 return _colocalization_manders(
1065 chan_1_result, chan_2_result, intensity_threshold
1066 )
1067 else:
1068 raise ValueError(f"Unknown colocalization method: {method}")
1071def _colocalization_distance_based(
1072 chan_1_result: CellCountResult,
1073 chan_2_result: CellCountResult,
1074 max_distance: float
1075) -> MultiChannelResult:
1076 """Perform distance-based colocalization analysis."""
1078 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1079 return _create_empty_coloc_result(chan_1_result, chan_2_result, "distance_based")
1081 # Convert positions to arrays
1082 pos_1 = np.array(chan_1_result.cell_positions)
1083 pos_2 = np.array(chan_2_result.cell_positions)
1085 # Calculate pairwise distances
1086 distances = cdist(pos_1, pos_2)
1088 # Find colocalized pairs
1089 colocalized_pairs = []
1090 used_chan_2 = set()
1092 for i in range(len(pos_1)):
1093 # Find closest cell in channel 2
1094 min_dist_idx = np.argmin(distances[i])
1095 min_dist = distances[i, min_dist_idx]
1097 # Check if within distance threshold and not already used
1098 if min_dist <= max_distance and min_dist_idx not in used_chan_2:
1099 colocalized_pairs.append((i, min_dist_idx))
1100 used_chan_2.add(min_dist_idx)
1102 # Calculate metrics
1103 colocalized_count = len(colocalized_pairs)
1104 total_cells = len(pos_1) + len(pos_2)
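# Percentage of all detected cells (both channels combined) that belong to a colocalized
# pair; each pair contributes one cell from each channel, hence the factor of 2 below.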
1105 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1107 chan_1_only = len(pos_1) - colocalized_count
1108 chan_2_only = len(pos_2) - colocalized_count
1110 # Extract colocalized positions (average of paired positions)
1111 overlap_positions = []
1112 for i, j in colocalized_pairs:
1113 avg_pos = ((pos_1[i] + pos_2[j]) / 2).tolist()
1114 overlap_positions.append(tuple(avg_pos))
1116 # Calculate additional metrics
1117 if colocalized_pairs:
1118 avg_distance = np.mean([distances[i, j] for i, j in colocalized_pairs])
1119 max_distance_found = np.max([distances[i, j] for i, j in colocalized_pairs])
1120 else:
1121 avg_distance = 0
1122 max_distance_found = 0
1124 metrics = {
1125 "average_colocalization_distance": float(avg_distance),
1126 "max_colocalization_distance": float(max_distance_found),
1127 "distance_threshold_used": max_distance
1128 }
1130 return MultiChannelResult(
1131 slice_index=chan_1_result.slice_index,
1132 chan_1_results=chan_1_result,
1133 chan_2_results=chan_2_result,
1134 colocalization_method="distance_based",
1135 colocalized_count=colocalized_count,
1136 colocalization_percentage=colocalization_percentage,
1137 chan_1_only_count=chan_1_only,
1138 chan_2_only_count=chan_2_only,
1139 colocalization_metrics=metrics,
1140 overlap_positions=overlap_positions
1141 )
1144def _create_empty_coloc_result(
1145 chan_1_result: CellCountResult,
1146 chan_2_result: CellCountResult,
1147 method: str
1148) -> MultiChannelResult:
1149 """Create empty colocalization result when no cells found."""
1150 return MultiChannelResult(
1151 slice_index=chan_1_result.slice_index,
1152 chan_1_results=chan_1_result,
1153 chan_2_results=chan_2_result,
1154 colocalization_method=method,
1155 colocalized_count=0,
1156 colocalization_percentage=0.0,
1157 chan_1_only_count=chan_1_result.cell_count,
1158 chan_2_only_count=chan_2_result.cell_count,
1159 colocalization_metrics={},
1160 overlap_positions=[]
1161 )
1164def _colocalization_overlap_based(
1165 chan_1_result: CellCountResult,
1166 chan_2_result: CellCountResult,
1167 min_overlap_area: float
1168) -> MultiChannelResult:
1169 """Perform area overlap-based colocalization analysis."""
1170 # This is a simplified implementation - in practice, you'd need actual segmentation masks
1171 # For now, we'll use distance as a proxy for overlap
1173 # Use distance-based method with smaller threshold as approximation
1174 distance_threshold = 2.0 # Assume cells must be very close to overlap significantly
1176 result = _colocalization_distance_based(chan_1_result, chan_2_result, distance_threshold)
1177 result.colocalization_method = "overlap_area"
1178 result.colocalization_metrics["min_overlap_threshold"] = min_overlap_area
1179 result.colocalization_metrics["note"] = "Approximated using distance-based method"
1181 return result
1184def _colocalization_intensity_based(
1185 chan_1_result: CellCountResult,
1186 chan_2_result: CellCountResult,
1187 intensity_threshold: float
1188) -> MultiChannelResult:
1189 """Perform intensity correlation-based colocalization analysis."""
1191 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1192 return _create_empty_coloc_result(chan_1_result, chan_2_result, "intensity_correlation")
1194 # Use distance-based pairing first
1195 distance_result = _colocalization_distance_based(chan_1_result, chan_2_result, 5.0)
1197 # Filter pairs based on intensity correlation
1198 colocalized_pairs = []
1199 overlap_positions = []
1201 pos_1 = np.array(chan_1_result.cell_positions)
1202 pos_2 = np.array(chan_2_result.cell_positions)
1204 for i, (x1, y1) in enumerate(chan_1_result.cell_positions):
1205 for j, (x2, y2) in enumerate(chan_2_result.cell_positions):
1206 # Calculate distance
1207 dist = np.sqrt((x1 - x2)**2 + (y1 - y2)**2)
1209 if dist <= 5.0: # Within reasonable distance
1210 # Check intensity correlation
1211 int_1 = chan_1_result.cell_intensities[i]
1212 int_2 = chan_2_result.cell_intensities[j]
1214 # Simple intensity correlation: both above threshold
1215 if int_1 >= intensity_threshold and int_2 >= intensity_threshold:
1216 colocalized_pairs.append((i, j))
1217 avg_pos = ((x1 + x2) / 2, (y1 + y2) / 2)
1218 overlap_positions.append(avg_pos)
1219 break # One-to-one mapping
1221 colocalized_count = len(colocalized_pairs)
1222 total_cells = len(pos_1) + len(pos_2)
1223 colocalization_percentage = (2 * colocalized_count / total_cells * 100) if total_cells > 0 else 0
1225 metrics = {
1226 "intensity_threshold_used": intensity_threshold,
1227 "correlation_method": "threshold_based"
1228 }
1230 return MultiChannelResult(
1231 slice_index=chan_1_result.slice_index,
1232 chan_1_results=chan_1_result,
1233 chan_2_results=chan_2_result,
1234 colocalization_method="intensity_correlation",
1235 colocalized_count=colocalized_count,
1236 colocalization_percentage=colocalization_percentage,
1237 chan_1_only_count=len(pos_1) - colocalized_count,
1238 chan_2_only_count=len(pos_2) - colocalized_count,
1239 colocalization_metrics=metrics,
1240 overlap_positions=overlap_positions
1241 )
1244def _colocalization_manders(
1245 chan_1_result: CellCountResult,
1246 chan_2_result: CellCountResult,
1247 intensity_threshold: float
1248) -> MultiChannelResult:
1249 """Calculate Manders colocalization coefficients."""
1251 if not chan_1_result.cell_positions or not chan_2_result.cell_positions:
1252 return _create_empty_coloc_result(chan_1_result, chan_2_result, "manders_coefficients")
1254 # Simplified Manders calculation based on detected cells
1255 # In practice, this would use pixel-level intensity analysis
1257 # Use intensity-based method as foundation
1258 intensity_result = _colocalization_intensity_based(
1259 chan_1_result, chan_2_result, intensity_threshold
1260 )
1262 # Calculate Manders-like coefficients
1263 total_int_1 = sum(chan_1_result.cell_intensities)
1264 total_int_2 = sum(chan_2_result.cell_intensities)
1266 # Simplified: assume colocalized cells contribute their full intensity
1267 coloc_int_1 = sum(chan_1_result.cell_intensities[i] for i, j in
1268 [(i, j) for i in range(len(chan_1_result.cell_positions))
1269 for j in range(len(chan_2_result.cell_positions))
1270 if (i, j) in [(0, 0)]]) # Simplified placeholder
1272 # Manders coefficients (M1 and M2)
1273 m1 = coloc_int_1 / total_int_1 if total_int_1 > 0 else 0
1274 m2 = coloc_int_1 / total_int_2 if total_int_2 > 0 else 0 # Simplified
1276 intensity_result.colocalization_method = "manders_coefficients"
1277 intensity_result.colocalization_metrics.update({
1278 "manders_m1": m1,
1279 "manders_m2": m2,
1280 "note": "Simplified cell-based Manders calculation"
1281 })
1283 return intensity_result
1286def _create_segmentation_visualization(
1287 image: np.ndarray,
1288 positions: List[Tuple[float, float]],
1289 max_sigma: float,
1290 cell_areas: List[float] = None,
1291 binary_mask: np.ndarray = None
1292) -> np.ndarray:
1293 """Create segmentation visualization using actual binary mask if available."""
1295 # If we have the actual binary mask from detection, use it directly
1296 if binary_mask is not None:
1297 # Convert boolean mask to the input image's dtype
1298 # Use max intensity for detected cells, 0 for background
1299 max_intensity = image.max() if image.max() > 0 else 65535
1300 return (binary_mask * max_intensity).astype(image.dtype)
1302 # Fallback to original circular marker approach for blob methods
1303 visualization = image.copy()
1305 # Mark detected cells with their actual sizes
1306 for i, (x, y) in enumerate(positions):
1307 # Use actual cell area if available, otherwise fall back to max_sigma
1308 if cell_areas and i < len(cell_areas):
1309 # Convert area to radius (assuming circular cells)
1310 radius = np.sqrt(cell_areas[i] / np.pi)
1311 else:
1312 # Fallback to max_sigma for backward compatibility
1313 radius = max_sigma * 2
1315 # Create circular markers with actual cell size
1316 rr, cc = np.ogrid[:image.shape[0], :image.shape[1]]
1317 mask = (rr - y)**2 + (cc - x)**2 <= radius**2
1319 # Ensure indices are within bounds
1320 valid_mask = (rr >= 0) & (rr < image.shape[0]) & (cc >= 0) & (cc < image.shape[1])
1321 mask = mask & valid_mask
1323 visualization[mask] = visualization.max() # Bright markers
1325 return visualization
1328def _create_colocalization_map(
1329 chan_1_img: np.ndarray,
1330 chan_2_img: np.ndarray,
1331 coloc_result: MultiChannelResult
1332) -> np.ndarray:
1333 """Create colocalization visualization map."""
1334 # Create RGB-like visualization
1335 coloc_map = np.zeros_like(chan_1_img)
1337 # Mark colocalized positions
1338 for x, y in coloc_result.overlap_positions:
1339 # Create markers for colocalized cells
1340 rr, cc = np.ogrid[:chan_1_img.shape[0], :chan_1_img.shape[1]]
1341 mask = (rr - y)**2 + (cc - x)**2 <= 25 # 5-pixel radius
1343 valid_mask = (rr >= 0) & (rr < chan_1_img.shape[0]) & (cc >= 0) & (cc < chan_1_img.shape[1])
1344 mask = mask & valid_mask
1346 coloc_map[mask] = chan_1_img.max() # Bright colocalization markers
1348 return coloc_map