Coverage for openhcs/processing/backends/analysis/consolidate_analysis_results.py: 8.1%

216 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Consolidate OpenHCS analysis results into summary tables. 

3 

4This module provides a standalone function for consolidating any CSV-based analysis results 

5from OpenHCS pipelines into a single summary table. Creates MetaXpress-style output where 

6each well is a row and analysis metrics are columns. 

7 

8Usage: 

9 # Standalone 

10 df = consolidate_analysis_results("/path/to/results") 

11  

12 # In pipeline 

13 FunctionStep(func=consolidate_analysis_results_pipeline, ...) 

14""" 


import pandas as pd
import numpy as np
import re
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from datetime import datetime

from openhcs.core.memory.decorators import numpy as numpy_func
from openhcs.core.pipeline.function_contracts import special_outputs
from openhcs.constants.constants import Backend

# Import config classes with TYPE_CHECKING to avoid circular imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig

logger = logging.getLogger(__name__)



def extract_well_id(filename: str, pattern: str = r"([A-Z]\d{2})") -> Optional[str]:
    """Extract well ID from filename using regex pattern."""
    match = re.search(pattern, filename)
    return match.group(1) if match else None


def extract_analysis_type(filename: str, well_id: str) -> str:
    """Extract analysis type from filename after removing well ID and extension."""
    # Remove well ID prefix and file extension
    analysis_type = filename.replace(f"{well_id}_", "").replace(Path(filename).suffix, "")
    return analysis_type


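# Example (informal, hypothetical filename): for "A01_cell_counts.csv" with
# well_id "A01", extract_analysis_type strips the "A01_" prefix and the ".csv"
# suffix, leaving the analysis type "cell_counts".
#
#     extract_well_id("A01_cell_counts.csv")               # -> "A01"
#     extract_analysis_type("A01_cell_counts.csv", "A01")  # -> "cell_counts"
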

def create_metaxpress_header(summary_df: pd.DataFrame, plate_metadata: Optional[Dict[str, str]] = None) -> List[List[str]]:
    """
    Create MetaXpress-style header rows with metadata.

    Returns list of header rows to prepend to the CSV.
    """
    if plate_metadata is None:
        plate_metadata = {}

    # Extract plate info from results directory or use defaults
    barcode = plate_metadata.get('barcode', 'OpenHCS-Plate')
    plate_name = plate_metadata.get('plate_name', 'OpenHCS Analysis Results')
    plate_id = plate_metadata.get('plate_id', '00000')
    description = plate_metadata.get('description', 'Consolidated analysis results from OpenHCS pipeline')
    acquisition_user = plate_metadata.get('acquisition_user', 'OpenHCS')
    z_step = plate_metadata.get('z_step', '1')

    # Create header rows matching MetaXpress format
    header_rows = [
        ['Barcode', barcode],
        ['Plate Name', plate_name],
        ['Plate ID', plate_id],
        ['Description', description],
        ['Acquisition User', acquisition_user],
        ['Z Step', z_step]
    ]

    # Pad header rows to match the number of columns in the data
    num_cols = len(summary_df.columns)
    for row in header_rows:
        while len(row) < num_cols:
            row.append('')

    return header_rows



def save_with_metaxpress_header(summary_df: pd.DataFrame, output_path: str, plate_metadata: Optional[Dict[str, str]] = None):
    """
    Save DataFrame with MetaXpress-style header structure.
    """
    # Create header rows
    header_rows = create_metaxpress_header(summary_df, plate_metadata)

    # Convert DataFrame to list of lists
    data_rows = []

    # Add column headers as a row
    data_rows.append(summary_df.columns.tolist())

    # Add data rows
    for _, row in summary_df.iterrows():
        data_rows.append(row.tolist())

    # Combine header + data
    all_rows = header_rows + data_rows

    # Write to CSV manually to preserve the exact structure
    import csv
    with open(output_path, 'w', newline='') as f:
        writer = csv.writer(f)
        for row in all_rows:
            writer.writerow(row)


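# Resulting file layout (informal illustration; values are placeholders): six
# metadata rows padded with empty cells to the table width, then the column
# headers, then one row per well.
#
#     Barcode,OpenHCS-Plate,,...
#     Plate Name,OpenHCS Analysis Results,,...
#     Plate ID,00000,,...
#     Description,Consolidated analysis results from OpenHCS pipeline,,...
#     Acquisition User,OpenHCS,,...
#     Z Step,1,,...
#     Well,Number of Objects (Cell Counts),Mean Area (Cell Counts),...
#     A01,152,310.5,...
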

def auto_summarize_column(series: pd.Series, column_name: str, analysis_type: str) -> Dict[str, Any]:
    """
    Automatically summarize a pandas series with MetaXpress-style naming.

    Returns a dictionary of summary statistics with clean, descriptive names.
    """
    summary = {}

    # Handle empty series
    if len(series) == 0:
        return {}

    # Remove NaN values for analysis
    clean_series = series.dropna()

    if len(clean_series) == 0:
        return {}

    # Create clean analysis type name for grouping
    clean_analysis = analysis_type.replace('_', ' ').title()

    # Create meaningful metric names based on column content.
    # Bool columns are excluded here because pandas treats bool as a numeric
    # dtype, which would otherwise make the boolean branch below unreachable.
    if pd.api.types.is_numeric_dtype(clean_series) and clean_series.dtype != bool:
        # Numeric data - focus on key metrics like MetaXpress
        if 'count' in column_name.lower() or 'total' in column_name.lower():
            # Count/total metrics
            summary[f"Total {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.sum()
            summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean()

        elif 'area' in column_name.lower():
            # Area metrics
            summary[f"Total {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.sum()
            summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean()

        elif 'length' in column_name.lower() or 'distance' in column_name.lower():
            # Length/distance metrics
            summary[f"Total {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.sum()
            summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean()

        elif 'intensity' in column_name.lower():
            # Intensity metrics
            summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean()

        elif 'confidence' in column_name.lower():
            # Confidence metrics
            summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean()

        else:
            # Generic numeric metrics
            summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean()

    elif clean_series.dtype == bool or set(clean_series.unique()).issubset({0, 1, True, False}):
        # Boolean data
        true_count = clean_series.sum()
        total_count = len(clean_series)
        summary[f"Count {column_name.replace('_', ' ').title()} ({clean_analysis})"] = true_count
        summary[f"% {column_name.replace('_', ' ').title()} ({clean_analysis})"] = (true_count / total_count) * 100

    else:
        # Categorical/string data - only include if meaningful
        unique_values = clean_series.unique()
        if len(unique_values) <= 5:  # Only include if not too many categories
            value_counts = clean_series.value_counts()
            most_common = value_counts.index[0] if len(value_counts) > 0 else None
            summary[f"Primary {column_name.replace('_', ' ').title()} ({clean_analysis})"] = most_common

    return summary


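# Example (informal): an "area" column from a "cell_counts" analysis hits the
# area branch above and produces sum and mean metrics keyed by the analysis
# name in parentheses.
#
#     auto_summarize_column(pd.Series([10.0, 20.0, 30.0]), "area", "cell_counts")
#     # -> {"Total Area (Cell Counts)": 60.0, "Mean Area (Cell Counts)": 20.0}
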

def summarize_analysis_file(file_path: str, analysis_type: str) -> Dict[str, Any]:
    """
    Summarize a single analysis CSV file with MetaXpress-style metrics.

    Returns a dictionary of key summary statistics with clean names.
    """
    try:
        df = pd.read_csv(file_path)

        if df.empty:
            logger.warning(f"Empty CSV file: {file_path}")
            return {}

        summary = {}
        clean_analysis = analysis_type.replace('_', ' ').title()

        # Add key file-level metrics first
        summary[f"Number of Objects ({clean_analysis})"] = len(df)

        # Prioritize important columns based on common analysis patterns
        priority_columns = []
        other_columns = []

        for column in df.columns:
            # Skip common index/ID columns
            if column.lower() in ['index', 'unnamed: 0', 'slice_index', 'cell_id', 'match_id', 'skeleton_id']:
                continue

            # Prioritize key metrics
            col_lower = column.lower()
            if any(key in col_lower for key in ['area', 'count', 'length', 'distance', 'intensity', 'confidence', 'branch']):
                priority_columns.append(column)
            else:
                other_columns.append(column)

        # Process priority columns first
        for column in priority_columns:
            col_summary = auto_summarize_column(df[column], column, analysis_type)
            summary.update(col_summary)

        # Process other columns but limit to avoid too many metrics
        for column in other_columns[:5]:  # Limit to 5 additional columns
            col_summary = auto_summarize_column(df[column], column, analysis_type)
            summary.update(col_summary)

        return summary

    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return {}



def consolidate_analysis_results(
    results_directory: str,
    well_ids: List[str],
    consolidation_config: 'AnalysisConsolidationConfig',
    plate_metadata_config: 'PlateMetadataConfig',
    output_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Consolidate analysis results into a single summary table using configuration objects.

    Args:
        results_directory: Directory containing analysis CSV files
        well_ids: Well identifiers (e.g. "A01") to look for in result filenames
        consolidation_config: Configuration for consolidation behavior
        plate_metadata_config: Configuration for plate metadata
        output_path: Optional path to save consolidated CSV

    Returns:
        DataFrame with wells as rows and analysis metrics as columns
    """

    results_dir = Path(results_directory)

    if not results_dir.exists():
        raise FileNotFoundError(f"Results directory does not exist: {results_directory}")

    logger.info(f"Consolidating analysis results from: {results_directory}")

    # Debug config objects
    logger.info(f"DEBUG: consolidation_config type: {type(consolidation_config)}")
    logger.info(f"DEBUG: well_pattern: {repr(consolidation_config.well_pattern)}")
    logger.info(f"DEBUG: file_extensions: {repr(consolidation_config.file_extensions)}")
    logger.info(f"DEBUG: exclude_patterns: {repr(consolidation_config.exclude_patterns)}")

    # Find all relevant files
    all_files = []
    for ext in consolidation_config.file_extensions:
        pattern = f"*{ext}"
        files = list(results_dir.glob(pattern))
        all_files.extend([str(f) for f in files])

    logger.info(f"Found {len(all_files)} files with extensions {consolidation_config.file_extensions}")

    # Apply exclude filters
    if consolidation_config.exclude_patterns:
        # Handle case where exclude_patterns might be a string representation
        exclude_patterns = consolidation_config.exclude_patterns
        if isinstance(exclude_patterns, str):
            # If it's a string representation of a tuple, convert it back
            import ast
            logger.info(f"DEBUG: exclude_patterns is string: {repr(exclude_patterns)}")
            try:
                exclude_patterns = ast.literal_eval(exclude_patterns)
                logger.info(f"DEBUG: Successfully parsed to: {repr(exclude_patterns)}")
            except Exception as e:
                logger.warning(f"Could not parse exclude_patterns string: {exclude_patterns}, error: {e}")
                exclude_patterns = []

        filtered_files = []
        for file_path in all_files:
            filename = Path(file_path).name
            if not any(re.search(pattern, filename) for pattern in exclude_patterns):
                filtered_files.append(file_path)
        all_files = filtered_files
        logger.info(f"After filtering: {len(all_files)} files to process")

    # Group files by well ID and analysis type
    wells_data = {}
    analysis_types = set()

    for file_path in all_files:
        filename = Path(file_path).name

        # Find well ID by substring matching (much more robust than regex)
        well_id = None
        for candidate_well in well_ids:
            if candidate_well in filename:
                well_id = candidate_well
                break

        if not well_id:
            logger.warning(f"Could not find any well ID from {well_ids} in filename {filename}, skipping")
            continue

        analysis_type = extract_analysis_type(filename, well_id)
        analysis_types.add(analysis_type)

        if well_id not in wells_data:
            wells_data[well_id] = {}

        wells_data[well_id][analysis_type] = file_path

    logger.info(f"Processing {len(wells_data)} wells with analysis types: {sorted(analysis_types)}")

    # Process each well and create summary
    summary_rows = []

    for well_id in sorted(wells_data.keys()):
        # Always use a consistent well ID column name
        well_summary = {'Well': well_id}

        # Process each analysis type for this well
        for analysis_type in sorted(analysis_types):
            if analysis_type in wells_data[well_id]:
                file_path = wells_data[well_id][analysis_type]
                analysis_summary = summarize_analysis_file(file_path, analysis_type)
                well_summary.update(analysis_summary)

        summary_rows.append(well_summary)

    # Create DataFrame
    summary_df = pd.DataFrame(summary_rows)

    if consolidation_config.metaxpress_style:
        # MetaXpress-style column ordering: Well first, then grouped by analysis type
        # Group columns by analysis type (text in parentheses)
        analysis_groups = {}
        other_cols = []

        for col in summary_df.columns:
            if col == 'Well':
                continue
            if '(' in col and ')' in col:
                analysis_name = col.split('(')[-1].replace(')', '')
                if analysis_name not in analysis_groups:
                    analysis_groups[analysis_name] = []
                analysis_groups[analysis_name].append(col)
            else:
                other_cols.append(col)

        # Reorder columns: Well first, then grouped by analysis type
        ordered_cols = ['Well']
        for analysis_name in sorted(analysis_groups.keys()):
            ordered_cols.extend(sorted(analysis_groups[analysis_name]))
        ordered_cols.extend(sorted(other_cols))

        summary_df = summary_df[ordered_cols]
    else:
        # Original style: sort all columns alphabetically
        if 'Well' in summary_df.columns:
            other_cols = [col for col in summary_df.columns if col != 'Well']
            summary_df = summary_df[['Well'] + sorted(other_cols)]

    logger.info(f"Created summary table with {len(summary_df)} wells and {len(summary_df.columns)} metrics")

    # Save to CSV if output path specified
    if output_path is None:
        output_path = results_dir / consolidation_config.output_filename

    if consolidation_config.metaxpress_style:
        # Create plate metadata dictionary from config
        plate_metadata = {
            'barcode': plate_metadata_config.barcode or f"OpenHCS-{results_dir.name}",
            'plate_name': plate_metadata_config.plate_name or results_dir.name,
            'plate_id': plate_metadata_config.plate_id or str(hash(str(results_dir)) % 100000),
            'description': plate_metadata_config.description or f"Consolidated analysis results from OpenHCS pipeline: {len(summary_df)} wells analyzed",
            'acquisition_user': plate_metadata_config.acquisition_user,
            'z_step': plate_metadata_config.z_step
        }

        save_with_metaxpress_header(summary_df, output_path, plate_metadata)
        logger.info(f"Saved MetaXpress-style summary with header to: {output_path}")
    else:
        summary_df.to_csv(output_path, index=False)
        logger.info(f"Saved consolidated summary to: {output_path}")

    return summary_df


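# Worked example (informal; paths and filenames are hypothetical): with
# well_ids=["A01", "B03"] and a results directory containing
# "A01_cell_counts.csv", "A01_neurite_lengths.csv" and "B03_cell_counts.csv",
# files are grouped by well and analysis type, giving one row per well and
# columns such as "Number of Objects (Cell Counts)" and
# "Mean Length (Neurite Lengths)"; a well missing an analysis gets NaN there.
#
#     df = consolidate_analysis_results(
#         "/path/to/results",
#         well_ids=["A01", "B03"],
#         consolidation_config=consolidation_config,
#         plate_metadata_config=plate_metadata_config,
#     )
#     df.loc[df["Well"] == "A01", "Number of Objects (Cell Counts)"]
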

def materialize_consolidated_results(
    data: pd.DataFrame,
    output_path: str,
    filemanager,
    well_id: str
) -> str:
    """Materialize consolidated results DataFrame to CSV using OpenHCS FileManager."""
    try:
        csv_content = data.to_csv(index=False)

        # Remove existing file if present
        if filemanager.exists(output_path, Backend.DISK.value):
            filemanager.delete(output_path, Backend.DISK.value)

        filemanager.save(csv_content, output_path, Backend.DISK.value)
        logger.info(f"Materialized consolidated results to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize consolidated results: {e}")
        raise



@numpy_func
@special_outputs(("consolidated_results", materialize_consolidated_results))
def consolidate_analysis_results_pipeline(
    image_stack: np.ndarray,
    results_directory: str,
    well_ids: List[str],
    consolidation_config: 'AnalysisConsolidationConfig',
    plate_metadata_config: 'PlateMetadataConfig'
) -> tuple[np.ndarray, pd.DataFrame]:
    """
    Pipeline-compatible version of consolidate_analysis_results.

    This function can be used as a FunctionStep in OpenHCS pipelines.
    """
    # Call the main consolidation function. well_ids is required by
    # consolidate_analysis_results, so it is accepted here and passed through.
    summary_df = consolidate_analysis_results(
        results_directory=results_directory,
        well_ids=well_ids,
        consolidation_config=consolidation_config,
        plate_metadata_config=plate_metadata_config,
        output_path=None  # Will be handled by materialization
    )

    return image_stack, summary_df
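
# Pipeline usage sketch: the module docstring shows
# FunctionStep(func=consolidate_analysis_results_pipeline, ...). How the
# remaining arguments are bound is defined by OpenHCS's FunctionStep API, so
# the keyword arguments below are illustrative assumptions only.
#
#     step = FunctionStep(
#         func=consolidate_analysis_results_pipeline,
#         results_directory="/path/to/results",
#         well_ids=["A01", "B03"],
#         consolidation_config=consolidation_config,
#         plate_metadata_config=plate_metadata_config,
#     )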