Coverage for openhcs/processing/backends/analysis/consolidate_analysis_results.py: 8.1%
216 statements
coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Consolidate OpenHCS analysis results into summary tables.
4This module provides a standalone function for consolidating any CSV-based analysis results
5from OpenHCS pipelines into a single summary table. Creates MetaXpress-style output where
6each well is a row and analysis metrics are columns.
8Usage:
9 # Standalone
10 df = consolidate_analysis_results("/path/to/results")
12 # In pipeline
13 FunctionStep(func=consolidate_analysis_results_pipeline, ...)
14"""
import pandas as pd
import numpy as np
import re
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from datetime import datetime

from openhcs.core.memory.decorators import numpy as numpy_func
from openhcs.core.pipeline.function_contracts import special_outputs
from openhcs.constants.constants import Backend

# Import config classes with TYPE_CHECKING to avoid circular imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig

logger = logging.getLogger(__name__)
def extract_well_id(filename: str, pattern: str = r"([A-Z]\d{2})") -> Optional[str]:
    """Extract well ID from filename using regex pattern."""
    match = re.search(pattern, filename)
    return match.group(1) if match else None
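# Illustrative example (hypothetical filenames, not from a real run):
#   extract_well_id("A01_cell_counts.csv")  -> "A01"
#   extract_well_id("plate_summary.csv")    -> None (nothing matches the default pattern)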
def extract_analysis_type(filename: str, well_id: str) -> str:
    """Extract analysis type from filename after removing well ID and extension."""
    # Remove well ID prefix and file extension
    analysis_type = filename.replace(f"{well_id}_", "").replace(Path(filename).suffix, "")
    return analysis_type
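# Illustrative example (hypothetical filename):
#   extract_analysis_type("A01_cell_counts.csv", "A01")  -> "cell_counts"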
def create_metaxpress_header(summary_df: pd.DataFrame, plate_metadata: Optional[Dict[str, str]] = None) -> List[List[str]]:
    """
    Create MetaXpress-style header rows with metadata.

    Returns list of header rows to prepend to the CSV.
    """
    if plate_metadata is None:
        plate_metadata = {}

    # Extract plate info from results directory or use defaults
    barcode = plate_metadata.get('barcode', 'OpenHCS-Plate')
    plate_name = plate_metadata.get('plate_name', 'OpenHCS Analysis Results')
    plate_id = plate_metadata.get('plate_id', '00000')
    description = plate_metadata.get('description', 'Consolidated analysis results from OpenHCS pipeline')
    acquisition_user = plate_metadata.get('acquisition_user', 'OpenHCS')
    z_step = plate_metadata.get('z_step', '1')

    # Create header rows matching MetaXpress format
    header_rows = [
        ['Barcode', barcode],
        ['Plate Name', plate_name],
        ['Plate ID', plate_id],
        ['Description', description],
        ['Acquisition User', acquisition_user],
        ['Z Step', z_step]
    ]

    # Pad header rows to match the number of columns in the data
    num_cols = len(summary_df.columns)
    for row in header_rows:
        while len(row) < num_cols:
            row.append('')

    return header_rows
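# Illustrative example: for a summary_df with three columns and no metadata overrides,
# the defaulted header rows are padded with empty strings to match the data width:
#   [['Barcode', 'OpenHCS-Plate', ''],
#    ['Plate Name', 'OpenHCS Analysis Results', ''],
#    ['Plate ID', '00000', ''],
#    ['Description', 'Consolidated analysis results from OpenHCS pipeline', ''],
#    ['Acquisition User', 'OpenHCS', ''],
#    ['Z Step', '1', '']]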
def save_with_metaxpress_header(summary_df: pd.DataFrame, output_path: str, plate_metadata: Optional[Dict[str, str]] = None):
    """
    Save DataFrame with MetaXpress-style header structure.
    """
    import csv

    # Create header rows
    header_rows = create_metaxpress_header(summary_df, plate_metadata)

    # Convert DataFrame to list of lists
    data_rows = []

    # Add column headers as a row
    data_rows.append(summary_df.columns.tolist())

    # Add data rows
    for _, row in summary_df.iterrows():
        data_rows.append(row.tolist())

    # Combine header + data
    all_rows = header_rows + data_rows

    # Write to CSV manually to preserve the exact structure
    with open(output_path, 'w', newline='') as f:
        writer = csv.writer(f)
        for row in all_rows:
            writer.writerow(row)
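# Resulting CSV layout sketch (values are illustrative, not from a real run):
#   Barcode,OpenHCS-Plate,
#   Plate Name,OpenHCS Analysis Results,
#   ...remaining metadata rows...
#   Well,Number of Objects (Cell Counts),Mean Cell Area (Cell Counts)
#   A01,120,152.3
#   A02,98,160.7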
def auto_summarize_column(series: pd.Series, column_name: str, analysis_type: str) -> Dict[str, Any]:
    """
    Automatically summarize a pandas Series with MetaXpress-style naming.

    Returns a dictionary of summary statistics with clean, descriptive names.
    """
    summary = {}

    # Handle empty series
    if len(series) == 0:
        return {}

    # Remove NaN values for analysis
    clean_series = series.dropna()

    if len(clean_series) == 0:
        return {}

    # Create clean names for the analysis type and column, used in metric labels
    clean_analysis = analysis_type.replace('_', ' ').title()
    clean_column = column_name.replace('_', ' ').title()
    col_lower = column_name.lower()

    if pd.api.types.is_numeric_dtype(clean_series):
        # Numeric data - focus on key metrics like MetaXpress
        if any(key in col_lower for key in ('count', 'total', 'area', 'length', 'distance')):
            # Count/total, area, and length/distance metrics: report both sum and mean
            summary[f"Total {clean_column} ({clean_analysis})"] = clean_series.sum()
            summary[f"Mean {clean_column} ({clean_analysis})"] = clean_series.mean()
        else:
            # Intensity, confidence, and other numeric metrics: report mean only
            summary[f"Mean {clean_column} ({clean_analysis})"] = clean_series.mean()

    elif clean_series.dtype == bool or set(clean_series.unique()).issubset({0, 1, True, False}):
        # Boolean data: count of True values and percentage of total
        true_count = clean_series.sum()
        total_count = len(clean_series)
        summary[f"Count {clean_column} ({clean_analysis})"] = true_count
        summary[f"% {clean_column} ({clean_analysis})"] = (true_count / total_count) * 100

    else:
        # Categorical/string data - only include if there are few distinct categories
        unique_values = clean_series.unique()
        if len(unique_values) <= 5:
            value_counts = clean_series.value_counts()
            most_common = value_counts.index[0] if len(value_counts) > 0 else None
            summary[f"Primary {clean_column} ({clean_analysis})"] = most_common

    return summary
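# Illustrative example (hypothetical column values):
#   auto_summarize_column(pd.Series([10.0, 20.0]), "cell_area", "cell_counts")
#   -> {"Total Cell Area (Cell Counts)": 30.0, "Mean Cell Area (Cell Counts)": 15.0}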
def summarize_analysis_file(file_path: str, analysis_type: str) -> Dict[str, Any]:
    """
    Summarize a single analysis CSV file with MetaXpress-style metrics.

    Returns a dictionary of key summary statistics with clean names.
    """
    try:
        df = pd.read_csv(file_path)

        if df.empty:
            logger.warning(f"Empty CSV file: {file_path}")
            return {}

        summary = {}
        clean_analysis = analysis_type.replace('_', ' ').title()

        # Add key file-level metrics first
        summary[f"Number of Objects ({clean_analysis})"] = len(df)

        # Prioritize important columns based on common analysis patterns
        priority_columns = []
        other_columns = []

        for column in df.columns:
            # Skip common index/ID columns
            if column.lower() in ['index', 'unnamed: 0', 'slice_index', 'cell_id', 'match_id', 'skeleton_id']:
                continue

            # Prioritize key metrics
            col_lower = column.lower()
            if any(key in col_lower for key in ['area', 'count', 'length', 'distance', 'intensity', 'confidence', 'branch']):
                priority_columns.append(column)
            else:
                other_columns.append(column)

        # Process priority columns first
        for column in priority_columns:
            col_summary = auto_summarize_column(df[column], column, analysis_type)
            summary.update(col_summary)

        # Process other columns, but cap the count to avoid too many metrics
        for column in other_columns[:5]:  # Limit to 5 additional columns
            col_summary = auto_summarize_column(df[column], column, analysis_type)
            summary.update(col_summary)

        return summary

    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return {}
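# Illustrative example (hypothetical file): for "A01_cell_counts.csv" containing a
# "cell_area" column with 120 rows, the returned summary would include keys such as
#   "Number of Objects (Cell Counts)", "Total Cell Area (Cell Counts)", "Mean Cell Area (Cell Counts)"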
def consolidate_analysis_results(
    results_directory: str,
    well_ids: List[str],
    consolidation_config: 'AnalysisConsolidationConfig',
    plate_metadata_config: 'PlateMetadataConfig',
    output_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Consolidate analysis results into a single summary table using configuration objects.

    Args:
        results_directory: Directory containing analysis CSV files
        well_ids: Well identifiers to look for in result filenames
        consolidation_config: Configuration for consolidation behavior
        plate_metadata_config: Configuration for plate metadata
        output_path: Optional path to save the consolidated CSV

    Returns:
        DataFrame with wells as rows and analysis metrics as columns
    """
    results_dir = Path(results_directory)

    if not results_dir.exists():
        raise FileNotFoundError(f"Results directory does not exist: {results_directory}")

    logger.info(f"Consolidating analysis results from: {results_directory}")

    # Log the resolved config values at debug level
    logger.debug(f"consolidation_config type: {type(consolidation_config)}")
    logger.debug(f"well_pattern: {repr(consolidation_config.well_pattern)}")
    logger.debug(f"file_extensions: {repr(consolidation_config.file_extensions)}")
    logger.debug(f"exclude_patterns: {repr(consolidation_config.exclude_patterns)}")

    # Find all relevant files
    all_files = []
    for ext in consolidation_config.file_extensions:
        pattern = f"*{ext}"
        files = list(results_dir.glob(pattern))
        all_files.extend([str(f) for f in files])

    logger.info(f"Found {len(all_files)} files with extensions {consolidation_config.file_extensions}")

    # Apply exclude filters
    if consolidation_config.exclude_patterns:
        # Handle the case where exclude_patterns arrives as a string representation
        exclude_patterns = consolidation_config.exclude_patterns
        if isinstance(exclude_patterns, str):
            # If it's a string representation of a tuple, convert it back
            import ast
            logger.debug(f"exclude_patterns is a string: {repr(exclude_patterns)}")
            try:
                exclude_patterns = ast.literal_eval(exclude_patterns)
                logger.debug(f"Parsed exclude_patterns to: {repr(exclude_patterns)}")
            except Exception as e:
                logger.warning(f"Could not parse exclude_patterns string: {exclude_patterns}, error: {e}")
                exclude_patterns = []

        filtered_files = []
        for file_path in all_files:
            filename = Path(file_path).name
            if not any(re.search(pattern, filename) for pattern in exclude_patterns):
                filtered_files.append(file_path)
        all_files = filtered_files
        logger.info(f"After filtering: {len(all_files)} files to process")
    # Group files by well ID and analysis type
    wells_data = {}
    analysis_types = set()

    for file_path in all_files:
        filename = Path(file_path).name

        # Find well ID by substring matching (much more robust than regex)
        well_id = None
        for candidate_well in well_ids:
            if candidate_well in filename:
                well_id = candidate_well
                break

        if not well_id:
            logger.warning(f"Could not find any well ID from {well_ids} in filename {filename}, skipping")
            continue

        analysis_type = extract_analysis_type(filename, well_id)
        analysis_types.add(analysis_type)

        if well_id not in wells_data:
            wells_data[well_id] = {}
        wells_data[well_id][analysis_type] = file_path

    logger.info(f"Processing {len(wells_data)} wells with analysis types: {sorted(analysis_types)}")

    # Process each well and create its summary row
    summary_rows = []

    for well_id in sorted(wells_data.keys()):
        # Always use a consistent well ID column name
        well_summary = {'Well': well_id}

        # Process each analysis type for this well
        for analysis_type in sorted(analysis_types):
            if analysis_type in wells_data[well_id]:
                file_path = wells_data[well_id][analysis_type]
                analysis_summary = summarize_analysis_file(file_path, analysis_type)
                well_summary.update(analysis_summary)

        summary_rows.append(well_summary)

    # Create DataFrame
    summary_df = pd.DataFrame(summary_rows)

    if consolidation_config.metaxpress_style:
        # MetaXpress-style column ordering: Well first, then columns grouped by analysis type
        # (the analysis type is the text in parentheses at the end of each metric name)
        analysis_groups = {}
        other_cols = []

        for col in summary_df.columns:
            if col == 'Well':
                continue
            if '(' in col and ')' in col:
                analysis_name = col.split('(')[-1].replace(')', '')
                if analysis_name not in analysis_groups:
                    analysis_groups[analysis_name] = []
                analysis_groups[analysis_name].append(col)
            else:
                other_cols.append(col)

        # Reorder columns: Well first, then grouped by analysis type
        ordered_cols = ['Well']
        for analysis_name in sorted(analysis_groups.keys()):
            ordered_cols.extend(sorted(analysis_groups[analysis_name]))
        ordered_cols.extend(sorted(other_cols))

        summary_df = summary_df[ordered_cols]
    else:
        # Original style: sort all columns alphabetically
        if 'Well' in summary_df.columns:
            other_cols = [col for col in summary_df.columns if col != 'Well']
            summary_df = summary_df[['Well'] + sorted(other_cols)]

    logger.info(f"Created summary table with {len(summary_df)} wells and {len(summary_df.columns)} metrics")
    # Save to CSV; if no output path is given, default to the results directory
    if output_path is None:
        output_path = results_dir / consolidation_config.output_filename

    if consolidation_config.metaxpress_style:
        # Create the plate metadata dictionary from config
        plate_metadata = {
            'barcode': plate_metadata_config.barcode or f"OpenHCS-{results_dir.name}",
            'plate_name': plate_metadata_config.plate_name or results_dir.name,
            'plate_id': plate_metadata_config.plate_id or str(hash(str(results_dir)) % 100000),
            'description': plate_metadata_config.description or f"Consolidated analysis results from OpenHCS pipeline: {len(summary_df)} wells analyzed",
            'acquisition_user': plate_metadata_config.acquisition_user,
            'z_step': plate_metadata_config.z_step
        }

        save_with_metaxpress_header(summary_df, output_path, plate_metadata)
        logger.info(f"Saved MetaXpress-style summary with header to: {output_path}")
    else:
        summary_df.to_csv(output_path, index=False)
        logger.info(f"Saved consolidated summary to: {output_path}")

    return summary_df
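# Minimal standalone usage sketch (assumes AnalysisConsolidationConfig and PlateMetadataConfig
# can be constructed with their default field values; adjust to the actual dataclass signatures):
#
#   from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig
#
#   summary = consolidate_analysis_results(
#       results_directory="/path/to/results",
#       well_ids=["A01", "A02", "B01"],
#       consolidation_config=AnalysisConsolidationConfig(),
#       plate_metadata_config=PlateMetadataConfig(),
#   )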
def materialize_consolidated_results(
    data: pd.DataFrame,
    output_path: str,
    filemanager,
    well_id: str
) -> str:
    """Materialize consolidated results DataFrame to CSV using OpenHCS FileManager."""
    try:
        csv_content = data.to_csv(index=False)

        # Remove existing file if present
        if filemanager.exists(output_path, Backend.DISK.value):
            filemanager.delete(output_path, Backend.DISK.value)

        filemanager.save(csv_content, output_path, Backend.DISK.value)
        logger.info(f"Materialized consolidated results to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize consolidated results: {e}")
        raise
@numpy_func
@special_outputs(("consolidated_results", materialize_consolidated_results))
def consolidate_analysis_results_pipeline(
    image_stack: np.ndarray,
    results_directory: str,
    well_ids: List[str],
    consolidation_config: 'AnalysisConsolidationConfig',
    plate_metadata_config: 'PlateMetadataConfig'
) -> tuple[np.ndarray, pd.DataFrame]:
    """
    Pipeline-compatible version of consolidate_analysis_results.

    This function can be used as a FunctionStep in OpenHCS pipelines.
    """
    # Call the main consolidation function; well_ids must be forwarded because it is
    # a required argument of consolidate_analysis_results
    summary_df = consolidate_analysis_results(
        results_directory=results_directory,
        well_ids=well_ids,
        consolidation_config=consolidation_config,
        plate_metadata_config=plate_metadata_config,
        output_path=None  # Output location is handled by materialization
    )

    return image_stack, summary_df
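# Hypothetical pipeline wiring sketch (the FunctionStep import path and keyword-argument
# plumbing are assumptions, not confirmed against the OpenHCS API):
#
#   step = FunctionStep(
#       func=consolidate_analysis_results_pipeline,
#       results_directory="/path/to/results",
#       well_ids=["A01", "A02", "B01"],
#       consolidation_config=AnalysisConsolidationConfig(),
#       plate_metadata_config=PlateMetadataConfig(),
#   )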