Coverage for openhcs/processing/backends/analysis/consolidate_analysis_results.py: 7.5%
214 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Consolidate OpenHCS analysis results into summary tables.
4This module provides a standalone function for consolidating any CSV-based analysis results
5from OpenHCS pipelines into a single summary table. Creates MetaXpress-style output where
6each well is a row and analysis metrics are columns.
8Usage:
9 # Standalone
10 df = consolidate_analysis_results("/path/to/results")
12 # In pipeline
13 FunctionStep(func=consolidate_analysis_results_pipeline, ...)
14"""
import csv
import pandas as pd
import numpy as np
import re
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any

from openhcs.core.memory.decorators import numpy as numpy_func
from openhcs.core.pipeline.function_contracts import special_outputs

# Import config classes with TYPE_CHECKING to avoid circular imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig

logger = logging.getLogger(__name__)
def extract_well_id(filename: str, pattern: str = r"([A-Z]\d{2})") -> Optional[str]:
    """Extract well ID from filename using regex pattern."""
    match = re.search(pattern, filename)
    return match.group(1) if match else None


def extract_analysis_type(filename: str, well_id: str) -> str:
    """Extract analysis type from filename after removing well ID and extension."""
    # Remove well ID prefix and file extension
    analysis_type = filename.replace(f"{well_id}_", "").replace(Path(filename).suffix, "")
    return analysis_type
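# Illustrative example: for a hypothetical results file named "A01_cell_counts.csv",
# the two helpers above compose as follows:
#     extract_well_id("A01_cell_counts.csv")                -> "A01"
#     extract_analysis_type("A01_cell_counts.csv", "A01")   -> "cell_counts"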
def create_metaxpress_header(summary_df: pd.DataFrame, plate_metadata: Optional[Dict[str, str]] = None) -> List[List[str]]:
    """
    Create MetaXpress-style header rows with metadata.

    Returns list of header rows to prepend to the CSV.
    """
    if plate_metadata is None:
        plate_metadata = {}

    # Extract plate info from results directory or use defaults
    barcode = plate_metadata.get('barcode', 'OpenHCS-Plate')
    plate_name = plate_metadata.get('plate_name', 'OpenHCS Analysis Results')
    plate_id = plate_metadata.get('plate_id', '00000')
    description = plate_metadata.get('description', 'Consolidated analysis results from OpenHCS pipeline')
    acquisition_user = plate_metadata.get('acquisition_user', 'OpenHCS')
    z_step = plate_metadata.get('z_step', '1')

    # Create header rows matching MetaXpress format
    header_rows = [
        ['Barcode', barcode],
        ['Plate Name', plate_name],
        ['Plate ID', plate_id],
        ['Description', description],
        ['Acquisition User', acquisition_user],
        ['Z Step', z_step]
    ]

    # Pad header rows to match the number of columns in the data
    num_cols = len(summary_df.columns)
    for row in header_rows:
        while len(row) < num_cols:
            row.append('')

    return header_rows
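# Illustrative example: with default metadata and a three-column summary table, the
# returned header rows are each padded to three cells, e.g.
#     [['Barcode', 'OpenHCS-Plate', ''],
#      ['Plate Name', 'OpenHCS Analysis Results', ''],
#      ...,
#      ['Z Step', '1', '']]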
def save_with_metaxpress_header(summary_df: pd.DataFrame, output_path: str, plate_metadata: Optional[Dict[str, str]] = None):
    """
    Save DataFrame with MetaXpress-style header structure.
    """
    # Create header rows
    header_rows = create_metaxpress_header(summary_df, plate_metadata)

    # Convert DataFrame to list of lists
    data_rows = []

    # Add column headers as a row
    data_rows.append(summary_df.columns.tolist())

    # Add data rows
    for _, row in summary_df.iterrows():
        data_rows.append(row.tolist())

    # Combine header + data
    all_rows = header_rows + data_rows

    # Write to CSV manually to preserve the exact structure
    with open(output_path, 'w', newline='') as f:
        writer = csv.writer(f)
        for row in all_rows:
            writer.writerow(row)
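# Illustrative layout of the CSV written above: six metadata rows, then the column
# header row, then one data row per well (padding cells appear as trailing commas):
#     Barcode,<barcode>,
#     Plate Name,<plate name>,
#     ... (remaining metadata rows)
#     Well,<metric column 1>,<metric column 2>
#     A01,<value>,<value>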
def auto_summarize_column(series: pd.Series, column_name: str, analysis_type: str) -> Dict[str, Any]:
    """
    Automatically summarize a pandas series with MetaXpress-style naming.

    Returns a dictionary of summary statistics with clean, descriptive names.
    """
    summary = {}

    # Handle empty series
    if len(series) == 0:
        return {}

    # Remove NaN values for analysis
    clean_series = series.dropna()

    if len(clean_series) == 0:
        return {}

    # Create clean analysis type name for grouping and a readable metric label
    clean_analysis = analysis_type.replace('_', ' ').title()
    metric_label = column_name.replace('_', ' ').title()
    col_lower = column_name.lower()

    # Check boolean-like columns before the generic numeric path so that 0/1 flag columns
    # are reported as counts/percentages (pd.api.types.is_numeric_dtype also matches bool dtypes)
    if clean_series.dtype == bool or set(clean_series.unique()).issubset({0, 1, True, False}):
        true_count = clean_series.sum()
        total_count = len(clean_series)
        summary[f"Count {metric_label} ({clean_analysis})"] = true_count
        summary[f"% {metric_label} ({clean_analysis})"] = (true_count / total_count) * 100

    elif pd.api.types.is_numeric_dtype(clean_series):
        # Numeric data - focus on key metrics like MetaXpress
        if any(key in col_lower for key in ('count', 'total', 'area', 'length', 'distance')):
            # Extensive metrics (counts, totals, areas, lengths, distances): report total and mean
            summary[f"Total {metric_label} ({clean_analysis})"] = clean_series.sum()
            summary[f"Mean {metric_label} ({clean_analysis})"] = clean_series.mean()
        else:
            # Intensive metrics (intensity, confidence, other numerics): report the mean only
            summary[f"Mean {metric_label} ({clean_analysis})"] = clean_series.mean()

    else:
        # Categorical/string data - only include if meaningful
        unique_values = clean_series.unique()
        if len(unique_values) <= 5:  # Only include if not too many categories
            value_counts = clean_series.value_counts()
            most_common = value_counts.index[0] if len(value_counts) > 0 else None
            summary[f"Primary {metric_label} ({clean_analysis})"] = most_common

    return summary
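# Illustrative example: for a hypothetical numeric column "branch_length" from an
# analysis type "skeleton_analysis", the summary keys are
#     "Total Branch Length (Skeleton Analysis)" and "Mean Branch Length (Skeleton Analysis)",
# while a hypothetical 0/1 flag column "is_valid" yields
#     "Count Is Valid (Skeleton Analysis)" and "% Is Valid (Skeleton Analysis)".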
def summarize_analysis_file(file_path: str, analysis_type: str) -> Dict[str, Any]:
    """
    Summarize a single analysis CSV file with MetaXpress-style metrics.

    Returns a dictionary of key summary statistics with clean names.
    """
    try:
        df = pd.read_csv(file_path)

        if df.empty:
            logger.warning(f"Empty CSV file: {file_path}")
            return {}

        summary = {}
        clean_analysis = analysis_type.replace('_', ' ').title()

        # Add key file-level metrics first
        summary[f"Number of Objects ({clean_analysis})"] = len(df)

        # Prioritize important columns based on common analysis patterns
        priority_columns = []
        other_columns = []

        for column in df.columns:
            # Skip common index/ID columns
            if column.lower() in ['index', 'unnamed: 0', 'slice_index', 'cell_id', 'match_id', 'skeleton_id']:
                continue

            # Prioritize key metrics
            col_lower = column.lower()
            if any(key in col_lower for key in ['area', 'count', 'length', 'distance', 'intensity', 'confidence', 'branch']):
                priority_columns.append(column)
            else:
                other_columns.append(column)

        # Process priority columns first
        for column in priority_columns:
            col_summary = auto_summarize_column(df[column], column, analysis_type)
            summary.update(col_summary)

        # Process other columns but limit to avoid too many metrics
        for column in other_columns[:5]:  # Limit to 5 additional columns
            col_summary = auto_summarize_column(df[column], column, analysis_type)
            summary.update(col_summary)

        return summary

    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return {}
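# Illustrative example: for a hypothetical file "A01_cell_counts.csv" with columns
# "cell_id", "area" and "intensity", the returned dictionary contains
#     "Number of Objects (Cell Counts)", "Total Area (Cell Counts)",
#     "Mean Area (Cell Counts)" and "Mean Intensity (Cell Counts)";
# the "cell_id" column is skipped as an ID column.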
def consolidate_analysis_results(
    results_directory: str,
    well_ids: List[str],
    consolidation_config: 'AnalysisConsolidationConfig',
    plate_metadata_config: 'PlateMetadataConfig',
    output_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Consolidate analysis results into a single summary table using configuration objects.

    Args:
        results_directory: Directory containing analysis CSV files
        well_ids: Well identifiers (e.g. "A01") expected to appear in the result filenames
        consolidation_config: Configuration for consolidation behavior
        plate_metadata_config: Configuration for plate metadata
        output_path: Optional path to save consolidated CSV

    Returns:
        DataFrame with wells as rows and analysis metrics as columns
    """
    results_dir = Path(results_directory)

    if not results_dir.exists():
        raise FileNotFoundError(f"Results directory does not exist: {results_directory}")

    logger.info(f"Consolidating analysis results from: {results_directory}")

    # Log the effective configuration at debug level
    logger.debug(f"consolidation_config type: {type(consolidation_config)}")
    logger.debug(f"well_pattern: {repr(consolidation_config.well_pattern)}")
    logger.debug(f"file_extensions: {repr(consolidation_config.file_extensions)}")
    logger.debug(f"exclude_patterns: {repr(consolidation_config.exclude_patterns)}")

    # Find all relevant files
    all_files = []
    for ext in consolidation_config.file_extensions:
        pattern = f"*{ext}"
        files = list(results_dir.glob(pattern))
        all_files.extend([str(f) for f in files])

    logger.info(f"Found {len(all_files)} files with extensions {consolidation_config.file_extensions}")

    # Apply exclude filters
    if consolidation_config.exclude_patterns:
        # Handle the case where exclude_patterns arrives as a string representation
        exclude_patterns = consolidation_config.exclude_patterns
        if isinstance(exclude_patterns, str):
            # If it's a string representation of a tuple, convert it back
            import ast
            logger.debug(f"exclude_patterns is a string: {repr(exclude_patterns)}")
            try:
                exclude_patterns = ast.literal_eval(exclude_patterns)
                logger.debug(f"Parsed exclude_patterns to: {repr(exclude_patterns)}")
            except Exception as e:
                logger.warning(f"Could not parse exclude_patterns string: {exclude_patterns}, error: {e}")
                exclude_patterns = []

        filtered_files = []
        for file_path in all_files:
            filename = Path(file_path).name
            if not any(re.search(pattern, filename) for pattern in exclude_patterns):
                filtered_files.append(file_path)
        all_files = filtered_files
        logger.info(f"After filtering: {len(all_files)} files to process")

    # Group files by well ID and analysis type
    wells_data = {}
    analysis_types = set()

    for file_path in all_files:
        filename = Path(file_path).name

        # Find well ID by substring matching (more robust than a regex for arbitrary naming schemes)
        well_id = None
        for candidate_well in well_ids:
            if candidate_well in filename:
                well_id = candidate_well
                break

        if not well_id:
            logger.warning(f"Could not find any well ID from {well_ids} in filename {filename}, skipping")
            continue

        analysis_type = extract_analysis_type(filename, well_id)
        analysis_types.add(analysis_type)

        if well_id not in wells_data:
            wells_data[well_id] = {}

        wells_data[well_id][analysis_type] = file_path

    logger.info(f"Processing {len(wells_data)} wells with analysis types: {sorted(analysis_types)}")

    # Process each well and create summary
    summary_rows = []

    for well_id in sorted(wells_data.keys()):
        # Always use a consistent well ID column name
        well_summary = {'Well': well_id}

        # Process each analysis type for this well
        for analysis_type in sorted(analysis_types):
            if analysis_type in wells_data[well_id]:
                file_path = wells_data[well_id][analysis_type]
                analysis_summary = summarize_analysis_file(file_path, analysis_type)
                well_summary.update(analysis_summary)

        summary_rows.append(well_summary)

    # Create DataFrame
    summary_df = pd.DataFrame(summary_rows)

    if consolidation_config.metaxpress_style:
        # MetaXpress-style column ordering: Well first, then grouped by analysis type
        # Group columns by analysis type (text in parentheses)
        analysis_groups = {}
        other_cols = []

        for col in summary_df.columns:
            if col == 'Well':
                continue
            if '(' in col and ')' in col:
                analysis_name = col.split('(')[-1].replace(')', '')
                if analysis_name not in analysis_groups:
                    analysis_groups[analysis_name] = []
                analysis_groups[analysis_name].append(col)
            else:
                other_cols.append(col)

        # Reorder columns: Well first, then grouped by analysis type
        ordered_cols = ['Well']
        for analysis_name in sorted(analysis_groups.keys()):
            ordered_cols.extend(sorted(analysis_groups[analysis_name]))
        ordered_cols.extend(sorted(other_cols))

        summary_df = summary_df[ordered_cols]
    else:
        # Original style: sort all columns alphabetically
        if 'Well' in summary_df.columns:
            other_cols = [col for col in summary_df.columns if col != 'Well']
            summary_df = summary_df[['Well'] + sorted(other_cols)]

    logger.info(f"Created summary table with {len(summary_df)} wells and {len(summary_df.columns)} metrics")

    # Save to CSV; default the output path to the results directory if none was given
    if output_path is None:
        output_path = results_dir / consolidation_config.output_filename

    if consolidation_config.metaxpress_style:
        # Create plate metadata dictionary from config
        plate_metadata = {
            'barcode': plate_metadata_config.barcode or f"OpenHCS-{results_dir.name}",
            'plate_name': plate_metadata_config.plate_name or results_dir.name,
            'plate_id': plate_metadata_config.plate_id or str(hash(str(results_dir)) % 100000),
            'description': plate_metadata_config.description or f"Consolidated analysis results from OpenHCS pipeline: {len(summary_df)} wells analyzed",
            'acquisition_user': plate_metadata_config.acquisition_user,
            'z_step': plate_metadata_config.z_step
        }

        save_with_metaxpress_header(summary_df, output_path, plate_metadata)
        logger.info(f"Saved MetaXpress-style summary with header to: {output_path}")
    else:
        summary_df.to_csv(output_path, index=False)
        logger.info(f"Saved consolidated summary to: {output_path}")

    return summary_df
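# Illustrative shape of the returned DataFrame (hypothetical column names): one row per
# well, 'Well' first, then metric columns grouped by analysis type when
# consolidation_config.metaxpress_style is enabled:
#     Well | Number of Objects (Cell Counts) | Mean Area (Cell Counts) | ...
#     A01  | ...                             | ...
#     B02  | ...                             | ...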
def materialize_consolidated_results(
    data: pd.DataFrame,
    output_path: str,
    filemanager,
    backend: str,
    well_id: str
) -> str:
    """Materialize consolidated results DataFrame to CSV using OpenHCS FileManager."""
    try:
        csv_content = data.to_csv(index=False)

        # Remove existing file if present
        if filemanager.exists(output_path, backend):
            filemanager.delete(output_path, backend)

        filemanager.save(csv_content, output_path, backend)
        logger.info(f"Materialized consolidated results to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize consolidated results: {e}")
        raise
@numpy_func
@special_outputs(("consolidated_results", materialize_consolidated_results))
def consolidate_analysis_results_pipeline(
    image_stack: np.ndarray,
    results_directory: str,
    well_ids: List[str],
    consolidation_config: 'AnalysisConsolidationConfig',
    plate_metadata_config: 'PlateMetadataConfig'
) -> tuple[np.ndarray, pd.DataFrame]:
    """
    Pipeline-compatible version of consolidate_analysis_results.

    This function can be used as a FunctionStep in OpenHCS pipelines. The image stack is
    passed through unchanged; the consolidated summary table is emitted as a special output.
    """
    # Call the main consolidation function
    summary_df = consolidate_analysis_results(
        results_directory=results_directory,
        well_ids=well_ids,
        consolidation_config=consolidation_config,
        plate_metadata_config=plate_metadata_config,
        output_path=None  # Will be handled by materialization
    )

    return image_stack, summary_df
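# Minimal standalone usage sketch, assuming AnalysisConsolidationConfig and
# PlateMetadataConfig are config objects exposing the attributes accessed above and can
# be constructed with sensible defaults (an assumption, not confirmed by this module):
#
#     from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig
#
#     df = consolidate_analysis_results(
#         "/path/to/results",
#         well_ids=["A01", "B02"],
#         consolidation_config=AnalysisConsolidationConfig(),
#         plate_metadata_config=PlateMetadataConfig(),
#     )
#     print(df.head())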