Coverage for openhcs/processing/backends/analysis/consolidate_special_outputs.py: 12.8% (160 statements)
1"""
2Generic consolidation of OpenHCS special outputs into summary tables.
4This module provides a generic function for consolidating any CSV-based special outputs
5from OpenHCS analysis pipelines into summary tables. It automatically detects well-based
6naming patterns and creates comprehensive summary statistics.
8Follows OpenHCS architectural principles:
9- Uses FileManager for all I/O operations
10- Proper memory type decorators
11- Special I/O integration
12- Fail-loud behavior
13- Stateless design
14"""
import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from openhcs.core.memory.decorators import numpy as numpy_func
from openhcs.core.pipeline.function_contracts import special_outputs
from openhcs.constants.constants import Backend

logger = logging.getLogger(__name__)

class AggregationStrategy(Enum):
    """Aggregation strategies for different data types."""
    NUMERIC = "numeric"
    CATEGORICAL = "categorical"
    BOOLEAN = "boolean"
    MIXED = "mixed"

class WellPatternType(Enum):
    """Common well ID patterns for different plate formats."""
    STANDARD_96 = r"([A-H]\d{2})"   # A01, B02, etc.
    STANDARD_384 = r"([A-P]\d{2})"  # A01-P24
    CUSTOM = "custom"

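
# Note (assumption): WellPatternType.CUSTOM looks like a sentinel rather than a
# usable regex; callers targeting non-standard plates presumably pass their own
# pattern string directly via the `well_pattern` argument of
# consolidate_special_outputs() below.
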
def materialize_consolidated_summary(
    data: Dict[str, Any],
    output_path: str,
    filemanager,
    well_id: str
) -> str:
    """
    Materialize consolidated summary data to a CSV file.

    Args:
        data: Dictionary containing consolidated summary data
        output_path: Path where the CSV should be saved
        filemanager: OpenHCS FileManager instance
        well_id: Well identifier

    Returns:
        Path to the saved CSV file
    """
    try:
        # Convert to DataFrame
        if 'summary_table' in data:
            df = pd.DataFrame(data['summary_table'])
        else:
            # Fallback: create a DataFrame from the raw data
            df = pd.DataFrame([data])

        # Generate CSV content
        csv_content = df.to_csv(index=False)

        # Save using FileManager (remove an existing file first if present)
        if filemanager.exists(output_path, Backend.DISK.value):
            filemanager.delete(output_path, Backend.DISK.value)

        filemanager.save(csv_content, output_path, Backend.DISK.value)

        logger.info(f"Materialized consolidated summary to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize consolidated summary: {e}")
        raise

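
# Example (hedged sketch): the dict shape this materializer expects. The field
# names and values below are hypothetical; `fm` stands in for an OpenHCS
# FileManager instance.
#
#   data = {
#       "summary_table": [
#           {"well_id": "A01", "output_type": "cell_counts", "count_mean": 42.0},
#           {"well_id": "A02", "output_type": "cell_counts", "count_mean": 38.5},
#       ],
#   }
#   materialize_consolidated_summary(data, "/results/summary.csv", fm, "A01")
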
def materialize_detailed_report(
    data: Dict[str, Any],
    output_path: str,
    filemanager,
    well_id: str
) -> str:
    """
    Materialize a detailed analysis report to a text file.

    Args:
        data: Dictionary containing analysis data
        output_path: Path where the report should be saved
        filemanager: OpenHCS FileManager instance
        well_id: Well identifier

    Returns:
        Path to the saved report file
    """
    try:
        report_lines = []
        report_lines.append("=" * 80)
        report_lines.append("OPENHCS SPECIAL OUTPUTS CONSOLIDATION REPORT")
        report_lines.append("=" * 80)

        if 'metadata' in data:
            metadata = data['metadata']
            report_lines.append(f"Analysis timestamp: {metadata.get('timestamp', 'Unknown')}")
            report_lines.append(f"Total wells processed: {metadata.get('total_wells', 0)}")
            report_lines.append(f"Output types detected: {metadata.get('output_types', [])}")
            report_lines.append("")

        if 'summary_stats' in data:
            stats = data['summary_stats']
            report_lines.append("SUMMARY STATISTICS:")
            report_lines.append("-" * 40)
            for output_type, type_stats in stats.items():
                report_lines.append(f"\n{output_type.upper()}:")
                for metric, value in type_stats.items():
                    if isinstance(value, float):
                        report_lines.append(f"  {metric}: {value:.3f}")
                    else:
                        report_lines.append(f"  {metric}: {value}")

        report_lines.append("\n" + "=" * 80)

        # Save the report
        report_content = "\n".join(report_lines)

        if filemanager.exists(output_path, Backend.DISK.value):
            filemanager.delete(output_path, Backend.DISK.value)

        filemanager.save(report_content, output_path, Backend.DISK.value)

        logger.info(f"Materialized detailed report to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize detailed report: {e}")
        raise

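
# Example (hedged sketch): the dict shape this report writer expects. The keys
# mirror what consolidate_special_outputs() below produces; the values here are
# hypothetical.
#
#   data = {
#       "metadata": {"timestamp": "2025-08-14T05:57:00", "total_wells": 2,
#                    "output_types": ["cell_counts"]},
#       "summary_stats": {"cell_counts": {"wells_count": 2}},
#   }
#   materialize_detailed_report(data, "/results/report.txt", fm, "A01")
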
def extract_well_id(filename: str, pattern: str = WellPatternType.STANDARD_96.value) -> Optional[str]:
    """
    Extract the well ID from a filename using a regex pattern.

    Args:
        filename: Name of the file
        pattern: Regex pattern for well ID extraction

    Returns:
        Well ID if found, None otherwise
    """
    match = re.search(pattern, filename)
    return match.group(1) if match else None

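
# Example (hedged sketch; the filenames are hypothetical):
#
#   >>> extract_well_id("A01_cell_counts.csv")
#   'A01'
#   >>> extract_well_id("plate1_B12_neurite_lengths.csv")
#   'B12'
#   >>> extract_well_id("no_well_here.csv") is None
#   True
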
def detect_aggregation_strategy(series: pd.Series) -> AggregationStrategy:
    """
    Automatically detect the appropriate aggregation strategy for a data series.

    Args:
        series: Pandas series to analyze

    Returns:
        Appropriate aggregation strategy
    """
    # Guard: an empty series carries no type information (and would divide by
    # zero in the unique-ratio check below)
    if len(series) == 0:
        return AggregationStrategy.MIXED

    # Check if boolean
    if series.dtype == bool or set(series.dropna().unique()).issubset({0, 1, True, False}):
        return AggregationStrategy.BOOLEAN

    # Check if numeric
    if pd.api.types.is_numeric_dtype(series):
        return AggregationStrategy.NUMERIC

    # Check if categorical (string/object with limited unique values)
    unique_ratio = len(series.unique()) / len(series)
    if unique_ratio < 0.5:  # Less than 50% unique values suggests categorical
        return AggregationStrategy.CATEGORICAL

    return AggregationStrategy.MIXED

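
# Example (hedged sketch):
#
#   >>> detect_aggregation_strategy(pd.Series([1.5, 2.0, 3.2]))
#   <AggregationStrategy.NUMERIC: 'numeric'>
#   >>> detect_aggregation_strategy(pd.Series([True, False, True]))
#   <AggregationStrategy.BOOLEAN: 'boolean'>
#   >>> detect_aggregation_strategy(pd.Series(["neuron", "glia"] * 2 + ["neuron"]))
#   <AggregationStrategy.CATEGORICAL: 'categorical'>
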
def aggregate_series(series: pd.Series, strategy: AggregationStrategy) -> Dict[str, Any]:
    """
    Aggregate a pandas series based on the specified strategy.

    Args:
        series: Series to aggregate
        strategy: Aggregation strategy to use

    Returns:
        Dictionary of aggregated statistics
    """
    result = {}

    if strategy == AggregationStrategy.NUMERIC:
        result.update({
            'count': len(series),
            'mean': series.mean(),
            'std': series.std(),
            'min': series.min(),
            'max': series.max(),
            'sum': series.sum(),
            'median': series.median()
        })

    elif strategy == AggregationStrategy.BOOLEAN:
        result.update({
            'count': len(series),
            'true_count': series.sum(),
            'false_count': len(series) - series.sum(),
            'true_percentage': (series.sum() / len(series)) * 100
        })

    elif strategy == AggregationStrategy.CATEGORICAL:
        value_counts = series.value_counts()
        result.update({
            'count': len(series),
            'unique_values': len(series.unique()),
            'most_common': value_counts.index[0] if len(value_counts) > 0 else None,
            'most_common_count': value_counts.iloc[0] if len(value_counts) > 0 else 0,
            'unique_values_list': ','.join(map(str, series.unique()))
        })

    else:  # MIXED
        result.update({
            'count': len(series),
            'unique_values': len(series.unique()),
            'data_type': str(series.dtype)
        })

    return result

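
# Example (hedged sketch): detect and aggregate in one pass.
#
#   >>> s = pd.Series([1.0, 2.0, 3.0, 4.0])
#   >>> stats = aggregate_series(s, detect_aggregation_strategy(s))
#   >>> print(stats['count'], stats['mean'], stats['median'])
#   4 2.5 2.5
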
@numpy_func
@special_outputs(
    ("consolidated_summary", materialize_consolidated_summary),
    ("detailed_report", materialize_detailed_report)
)
def consolidate_special_outputs(
    image_stack: np.ndarray,
    results_directory: str,
    well_pattern: str = WellPatternType.STANDARD_96.value,
    file_extensions: Optional[List[str]] = None,
    include_patterns: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    custom_aggregations: Optional[Dict[str, Dict[str, str]]] = None
) -> Tuple[np.ndarray, Dict[str, Any], Dict[str, Any]]:
    """
    Consolidate special outputs from OpenHCS analysis into summary tables.

    This function automatically detects CSV files with well-based naming patterns,
    groups them by output type, and creates comprehensive summary statistics.

    Args:
        image_stack: Input image stack (dummy for OpenHCS compatibility)
        results_directory: Directory containing special output files
        well_pattern: Regex pattern for extracting well IDs
        file_extensions: List of file extensions to process (defaults to [".csv"])
        include_patterns: Optional list of filename patterns to include
        exclude_patterns: Optional list of filename patterns to exclude
        custom_aggregations: Optional custom aggregation rules per output type
            (accepted but not yet applied by this implementation)

    Returns:
        Tuple of (image_stack, consolidated_summary, detailed_report)
    """
    from openhcs.io.filemanager import FileManager
    from openhcs.io.base import storage_registry
    from datetime import datetime

    # Resolve the default here rather than in the signature, avoiding a shared
    # mutable default argument
    if file_extensions is None:
        file_extensions = [".csv"]

    # Initialize FileManager
    filemanager = FileManager(storage_registry)

    logger.info(f"Consolidating special outputs from: {results_directory}")

    # Find all relevant files
    all_files = []
    for ext in file_extensions:
        pattern = f"*{ext}"
        files = filemanager.list_files(results_directory, Backend.DISK.value, pattern=pattern, recursive=False)
        all_files.extend(files)

    logger.info(f"Found {len(all_files)} files with extensions {file_extensions}")

    # Apply include/exclude filters
    if include_patterns:
        all_files = [f for f in all_files if any(re.search(pattern, Path(f).name) for pattern in include_patterns)]

    if exclude_patterns:
        all_files = [f for f in all_files if not any(re.search(pattern, Path(f).name) for pattern in exclude_patterns)]

    logger.info(f"After filtering: {len(all_files)} files to process")

    # Group files by well ID and output type
    wells_data = {}
    output_types = set()

    for file_path in all_files:
        filename = Path(file_path).name
        well_id = extract_well_id(filename, well_pattern)

        if not well_id:
            logger.warning(f"Could not extract well ID from {filename}, skipping")
            continue

        # Extract output type (everything after the well ID and before the extension)
        output_type = filename.replace(f"{well_id}_", "").replace(Path(filename).suffix, "")
        output_types.add(output_type)

        if well_id not in wells_data:
            wells_data[well_id] = {}

        wells_data[well_id][output_type] = file_path

    logger.info(f"Processing {len(wells_data)} wells with output types: {sorted(output_types)}")

    # Process each output type and create summary statistics
    summary_table = []
    summary_stats = {}

    for output_type in sorted(output_types):
        logger.info(f"Processing output type: {output_type}")

        # Collect data for this output type across all wells
        type_data = []
        wells_with_type = []

        for well_id, well_files in wells_data.items():
            if output_type in well_files:
                try:
                    file_path = well_files[output_type]
                    df = pd.read_csv(file_path)

                    # Create a well-level summary
                    well_summary = {'well_id': well_id, 'output_type': output_type}

                    # Aggregate each column
                    for col in df.columns:
                        if col in ['well_id', 'output_type']:
                            continue

                        strategy = detect_aggregation_strategy(df[col])
                        col_stats = aggregate_series(df[col], strategy)

                        # Prefix column stats with the column name
                        for stat_name, stat_value in col_stats.items():
                            well_summary[f"{col}_{stat_name}"] = stat_value

                    type_data.append(well_summary)
                    wells_with_type.append(well_id)

                except Exception as e:
                    logger.error(f"Error processing {file_path}: {e}")
                    continue

        # Add to the summary table
        summary_table.extend(type_data)

        # Create type-level statistics
        if type_data:
            type_df = pd.DataFrame(type_data)
            type_stats = {
                'wells_count': len(wells_with_type),
                'wells_list': ','.join(sorted(wells_with_type))
            }

            # Add aggregate statistics for numeric columns
            numeric_cols = type_df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                if col != 'well_id':
                    type_stats[f"{col}_mean"] = type_df[col].mean()
                    type_stats[f"{col}_std"] = type_df[col].std()

            summary_stats[output_type] = type_stats

    # Create the consolidated summary
    consolidated_summary = {
        'summary_table': summary_table,
        'metadata': {
            'timestamp': datetime.now().isoformat(),
            'total_wells': len(wells_data),
            'output_types': sorted(output_types),
            'total_files_processed': len(all_files),
            'well_pattern': well_pattern
        }
    }

    # Create the detailed report
    detailed_report = {
        'summary_stats': summary_stats,
        'metadata': consolidated_summary['metadata']
    }

    logger.info(f"Consolidation complete: {len(summary_table)} well-output combinations processed")

    return image_stack, consolidated_summary, detailed_report
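
# Usage note (hedged): inside an OpenHCS pipeline this function is wired up via
# its decorators, so the special outputs are presumably routed by the framework
# rather than consumed directly by the caller. The demo below exercises only
# the pure helpers and needs no OpenHCS runtime; the values are hypothetical.
if __name__ == "__main__":
    demo = pd.Series([0.5, 1.0, 1.5, 2.0])
    strategy = detect_aggregation_strategy(demo)
    print(f"strategy={strategy.value}")
    print(aggregate_series(demo, strategy))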