Coverage for openhcs/processing/backends/analysis/consolidate_analysis_results.py: 7.5%

214 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Consolidate OpenHCS analysis results into summary tables. 

3 

4This module provides a standalone function for consolidating any CSV-based analysis results 

5from OpenHCS pipelines into a single summary table. Creates MetaXpress-style output where 

6each well is a row and analysis metrics are columns. 

7 

8Usage: 

9 # Standalone 

10 df = consolidate_analysis_results("/path/to/results", well_ids, consolidation_config, plate_metadata_config) 

11  

12 # In pipeline 

13 FunctionStep(func=consolidate_analysis_results_pipeline, ...) 

14""" 

15 

16import pandas as pd 

17import numpy as np 

18import re 

19import logging 

20from pathlib import Path 

21from typing import Dict, List, Optional, Any 

22 

23from openhcs.core.memory.decorators import numpy as numpy_func 

24from openhcs.core.pipeline.function_contracts import special_outputs 

25 

26# Import config classes with TYPE_CHECKING to avoid circular imports 

27from typing import TYPE_CHECKING 

28if TYPE_CHECKING:  # coverage 28 ↛ 29: line 28 didn't jump to line 29 because the condition was never true 

29 from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig 

30 

31logger = logging.getLogger(__name__) 

32 

33 

34def extract_well_id(filename: str, pattern: str = r"([A-Z]\d{2})") -> Optional[str]: 

35 """Extract well ID from filename using regex pattern.""" 

36 match = re.search(pattern, filename) 

37 return match.group(1) if match else None 

38 

39 

40def extract_analysis_type(filename: str, well_id: str) -> str: 

41 """Extract analysis type from filename after removing well ID and extension.""" 

42 # Remove well ID prefix and file extension 

43 analysis_type = filename.replace(f"{well_id}_", "").replace(Path(filename).suffix, "") 

44 return analysis_type 
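
Taken together, these two helpers turn a result filename into a (well ID, analysis type) pair. A quick illustration of the expected behaviour (the filename is made up for the example):

    from openhcs.processing.backends.analysis.consolidate_analysis_results import (
        extract_well_id, extract_analysis_type,
    )

    well = extract_well_id("A01_cell_counts.csv")                   # -> "A01"
    analysis = extract_analysis_type("A01_cell_counts.csv", well)   # -> "cell_counts"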

45 

46 

47def create_metaxpress_header(summary_df: pd.DataFrame, plate_metadata: Optional[Dict[str, str]] = None) -> List[List[str]]: 

48 """ 

49 Create MetaXpress-style header rows with metadata. 

50 

51 Returns list of header rows to prepend to the CSV. 

52 """ 

53 if plate_metadata is None: 

54 plate_metadata = {} 

55 

56 # Pull plate info from the provided metadata, falling back to defaults 

57 barcode = plate_metadata.get('barcode', 'OpenHCS-Plate') 

58 plate_name = plate_metadata.get('plate_name', 'OpenHCS Analysis Results') 

59 plate_id = plate_metadata.get('plate_id', '00000') 

60 description = plate_metadata.get('description', 'Consolidated analysis results from OpenHCS pipeline') 

61 acquisition_user = plate_metadata.get('acquisition_user', 'OpenHCS') 

62 z_step = plate_metadata.get('z_step', '1') 

63 

64 # Create header rows matching MetaXpress format 

65 header_rows = [ 

66 ['Barcode', barcode], 

67 ['Plate Name', plate_name], 

68 ['Plate ID', plate_id], 

69 ['Description', description], 

70 ['Acquisition User', acquisition_user], 

71 ['Z Step', z_step] 

72 ] 

73 

74 # Pad header rows to match the number of columns in the data 

75 num_cols = len(summary_df.columns) 

76 for row in header_rows: 

77 while len(row) < num_cols: 

78 row.append('') 

79 

80 return header_rows 

81 

82 

83def save_with_metaxpress_header(summary_df: pd.DataFrame, output_path: str, plate_metadata: Optional[Dict[str, str]] = None): 

84 """ 

85 Save DataFrame with MetaXpress-style header structure. 

86 """ 

87 # Create header rows 

88 header_rows = create_metaxpress_header(summary_df, plate_metadata) 

89 

90 # Convert DataFrame to list of lists 

91 data_rows = [] 

92 

93 # Add column headers as a row 

94 data_rows.append(summary_df.columns.tolist()) 

95 

96 # Add data rows 

97 for _, row in summary_df.iterrows(): 

98 data_rows.append(row.tolist()) 

99 

100 # Combine header + data 

101 all_rows = header_rows + data_rows 

102 

103 # Write to CSV manually to preserve the exact structure 

104 import csv 

105 with open(output_path, 'w', newline='') as f: 

106 writer = csv.writer(f) 

107 for row in all_rows: 

108 writer.writerow(row) 
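
As a rough usage sketch of the two header helpers above (the table contents and metadata values are illustrative placeholders, not real plate data):

    import pandas as pd

    from openhcs.processing.backends.analysis.consolidate_analysis_results import save_with_metaxpress_header

    # Minimal one-well table; column names follow the "<Metric> (<Analysis>)" convention.
    df = pd.DataFrame([{"Well": "A01", "Number of Objects (Cell Counts)": 42}])
    metadata = {"barcode": "PLATE-001", "plate_name": "Demo plate"}  # missing keys fall back to defaults
    save_with_metaxpress_header(df, "summary.csv", metadata)
    # summary.csv now starts with the Barcode/Plate Name/... metadata rows,
    # then the column header row, then one data row per well.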

109 

110 

111def auto_summarize_column(series: pd.Series, column_name: str, analysis_type: str) -> Dict[str, Any]: 

112 """ 

113 Automatically summarize a pandas series with MetaXpress-style naming. 

114 

115 Returns a dictionary of summary statistics with clean, descriptive names. 

116 """ 

117 summary = {} 

118 

119 # Handle empty series 

120 if len(series) == 0: 

121 return {} 

122 

123 # Remove NaN values for analysis 

124 clean_series = series.dropna() 

125 

126 if len(clean_series) == 0: 

127 return {} 

128 

129 # Create clean analysis type name for grouping 

130 clean_analysis = analysis_type.replace('_', ' ').title() 

131 

132 # Create meaningful metric names based on column content 

133 if pd.api.types.is_numeric_dtype(clean_series): 

134 # Numeric data - focus on key metrics like MetaXpress 

135 if 'count' in column_name.lower() or 'total' in column_name.lower(): 

136 # Count/total metrics 

137 summary[f"Total {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.sum() 

138 summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean() 

139 

140 elif 'area' in column_name.lower(): 

141 # Area metrics 

142 summary[f"Total {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.sum() 

143 summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean() 

144 

145 elif 'length' in column_name.lower() or 'distance' in column_name.lower(): 

146 # Length/distance metrics 

147 summary[f"Total {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.sum() 

148 summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean() 

149 

150 elif 'intensity' in column_name.lower(): 

151 # Intensity metrics 

152 summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean() 

153 

154 elif 'confidence' in column_name.lower(): 

155 # Confidence metrics 

156 summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean() 

157 

158 else: 

159 # Generic numeric metrics 

160 summary[f"Mean {column_name.replace('_', ' ').title()} ({clean_analysis})"] = clean_series.mean() 

161 

162 elif clean_series.dtype == bool or set(clean_series.unique()).issubset({0, 1, True, False}): 

163 # Boolean data 

164 true_count = clean_series.sum() 

165 total_count = len(clean_series) 

166 summary[f"Count {column_name.replace('_', ' ').title()} ({clean_analysis})"] = true_count 

167 summary[f"% {column_name.replace('_', ' ').title()} ({clean_analysis})"] = (true_count / total_count) * 100 

168 

169 else: 

170 # Categorical/string data - only include if meaningful 

171 unique_values = clean_series.unique() 

172 if len(unique_values) <= 5: # Only include if not too many categories 

173 value_counts = clean_series.value_counts() 

174 most_common = value_counts.index[0] if len(value_counts) > 0 else None 

175 summary[f"Primary {column_name.replace('_', ' ').title()} ({clean_analysis})"] = most_common 

176 

177 return summary 
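
For a numeric column whose name contains "area", the branch above yields a total and a mean under MetaXpress-style keys; a small check:

    import pandas as pd

    from openhcs.processing.backends.analysis.consolidate_analysis_results import auto_summarize_column

    stats = auto_summarize_column(pd.Series([100.0, 150.0, 200.0]), "area", "cell_counts")
    # {'Total Area (Cell Counts)': 450.0, 'Mean Area (Cell Counts)': 150.0}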

178 

179 

180def summarize_analysis_file(file_path: str, analysis_type: str) -> Dict[str, Any]: 

181 """ 

182 Summarize a single analysis CSV file with MetaXpress-style metrics. 

183 

184 Returns a dictionary of key summary statistics with clean names. 

185 """ 

186 try: 

187 df = pd.read_csv(file_path) 

188 

189 if df.empty: 

190 logger.warning(f"Empty CSV file: {file_path}") 

191 return {} 

192 

193 summary = {} 

194 clean_analysis = analysis_type.replace('_', ' ').title() 

195 

196 # Add key file-level metrics first 

197 summary[f"Number of Objects ({clean_analysis})"] = len(df) 

198 

199 # Prioritize important columns based on common analysis patterns 

200 priority_columns = [] 

201 other_columns = [] 

202 

203 for column in df.columns: 

204 # Skip common index/ID columns 

205 if column.lower() in ['index', 'unnamed: 0', 'slice_index', 'cell_id', 'match_id', 'skeleton_id']: 

206 continue 

207 

208 # Prioritize key metrics 

209 col_lower = column.lower() 

210 if any(key in col_lower for key in ['area', 'count', 'length', 'distance', 'intensity', 'confidence', 'branch']): 

211 priority_columns.append(column) 

212 else: 

213 other_columns.append(column) 

214 

215 # Process priority columns first 

216 for column in priority_columns: 

217 col_summary = auto_summarize_column(df[column], column, analysis_type) 

218 summary.update(col_summary) 

219 

220 # Process other columns but limit to avoid too many metrics 

221 for column in other_columns[:5]: # Limit to 5 additional columns 

222 col_summary = auto_summarize_column(df[column], column, analysis_type) 

223 summary.update(col_summary) 

224 

225 return summary 

226 

227 except Exception as e: 

228 logger.error(f"Error processing {file_path}: {e}") 

229 return {} 
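
A minimal end-to-end check of this summarizer against a throwaway CSV (the path, columns, and values are made up for the example):

    import pandas as pd

    from openhcs.processing.backends.analysis.consolidate_analysis_results import summarize_analysis_file

    pd.DataFrame({"cell_id": [1, 2], "area": [100.0, 150.0]}).to_csv("/tmp/A01_cell_counts.csv", index=False)
    stats = summarize_analysis_file("/tmp/A01_cell_counts.csv", "cell_counts")
    # stats holds "Number of Objects (Cell Counts)" == 2 plus Total/Mean Area metrics;
    # cell_id is skipped because it is in the ID-column exclusion list.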

230 

231 

232def consolidate_analysis_results( 

233 results_directory: str, 

234 well_ids: List[str], 

235 consolidation_config: 'AnalysisConsolidationConfig', 

236 plate_metadata_config: 'PlateMetadataConfig', 

237 output_path: Optional[str] = None 

238) -> pd.DataFrame: 

239 """ 

240 Consolidate analysis results into a single summary table using configuration objects. 

241 

242 Args: 

243 results_directory: Directory containing analysis CSV files 

244 well_ids: Well identifiers to match against result filenames 

245 consolidation_config: Configuration for consolidation behavior 

246 plate_metadata_config: Configuration for plate metadata 

247 output_path: Optional path to save consolidated CSV 

248 Returns: 

249 DataFrame with wells as rows and analysis metrics as columns 

250 """ 

251 results_dir = Path(results_directory) 

252 

253 if not results_dir.exists(): 

254 raise FileNotFoundError(f"Results directory does not exist: {results_directory}") 

255 

256 logger.info(f"Consolidating analysis results from: {results_directory}") 

257 

258 # Debug config objects 

259 logger.info(f"DEBUG: consolidation_config type: {type(consolidation_config)}") 

260 logger.info(f"DEBUG: well_pattern: {repr(consolidation_config.well_pattern)}") 

261 logger.info(f"DEBUG: file_extensions: {repr(consolidation_config.file_extensions)}") 

262 logger.info(f"DEBUG: exclude_patterns: {repr(consolidation_config.exclude_patterns)}") 

263 

264 # Find all relevant files 

265 all_files = [] 

266 for ext in consolidation_config.file_extensions: 

267 pattern = f"*{ext}" 

268 files = list(results_dir.glob(pattern)) 

269 all_files.extend([str(f) for f in files]) 

270 

271 logger.info(f"Found {len(all_files)} files with extensions {consolidation_config.file_extensions}") 

272 

273 # Apply exclude filters 

274 if consolidation_config.exclude_patterns: 

275 # Handle case where exclude_patterns might be a string representation 

276 exclude_patterns = consolidation_config.exclude_patterns 

277 if isinstance(exclude_patterns, str): 

278 # If it's a string representation of a tuple, convert it back 

279 import ast 

280 logger.info(f"DEBUG: exclude_patterns is string: {repr(exclude_patterns)}") 

281 try: 

282 exclude_patterns = ast.literal_eval(exclude_patterns) 

283 logger.info(f"DEBUG: Successfully parsed to: {repr(exclude_patterns)}") 

284 except Exception as e: 

285 logger.warning(f"Could not parse exclude_patterns string: {exclude_patterns}, error: {e}") 

286 exclude_patterns = [] 

287 

288 filtered_files = [] 

289 for file_path in all_files: 

290 filename = Path(file_path).name 

291 if not any(re.search(pattern, filename) for pattern in exclude_patterns): 

292 filtered_files.append(file_path) 

293 all_files = filtered_files 

294 logger.info(f"After filtering: {len(all_files)} files to process") 

295 

296 # Group files by well ID and analysis type 

297 wells_data = {} 

298 analysis_types = set() 

299 

300 for file_path in all_files: 

301 filename = Path(file_path).name 

302 

303 # Find well ID by substring matching against the known well list (more robust than regex extraction here) 

304 well_id = None 

305 for candidate_well in well_ids: 

306 if candidate_well in filename: 

307 well_id = candidate_well 

308 break 

309 

310 if not well_id: 

311 logger.warning(f"Could not find any well ID from {well_ids} in filename {filename}, skipping") 

312 continue 

313 

314 analysis_type = extract_analysis_type(filename, well_id) 

315 analysis_types.add(analysis_type) 

316 

317 if well_id not in wells_data: 

318 wells_data[well_id] = {} 

319 

320 wells_data[well_id][analysis_type] = file_path 

321 

322 logger.info(f"Processing {len(wells_data)} wells with analysis types: {sorted(analysis_types)}") 

323 

324 # Process each well and create summary 

325 summary_rows = [] 

326 

327 for well_id in sorted(wells_data.keys()): 

328 # Always use a consistent well ID column name 

329 well_summary = {'Well': well_id} 

330 

331 # Process each analysis type for this well 

332 for analysis_type in sorted(analysis_types): 

333 if analysis_type in wells_data[well_id]: 

334 file_path = wells_data[well_id][analysis_type] 

335 analysis_summary = summarize_analysis_file(file_path, analysis_type) 

336 well_summary.update(analysis_summary) 

337 

338 summary_rows.append(well_summary) 

339 

340 # Create DataFrame 

341 summary_df = pd.DataFrame(summary_rows) 

342 

343 if consolidation_config.metaxpress_style: 

344 # MetaXpress-style column ordering: Well first, then grouped by analysis type 

345 # Group columns by analysis type (text in parentheses) 

346 analysis_groups = {} 

347 other_cols = [] 

348 

349 for col in summary_df.columns: 

350 if col == 'Well': 

351 continue 

352 if '(' in col and ')' in col: 

353 analysis_name = col.split('(')[-1].replace(')', '') 

354 if analysis_name not in analysis_groups: 

355 analysis_groups[analysis_name] = [] 

356 analysis_groups[analysis_name].append(col) 

357 else: 

358 other_cols.append(col) 

359 

360 # Reorder columns: Well first, then grouped by analysis type 

361 ordered_cols = ['Well'] 

362 for analysis_name in sorted(analysis_groups.keys()): 

363 ordered_cols.extend(sorted(analysis_groups[analysis_name])) 

364 ordered_cols.extend(sorted(other_cols)) 

365 

366 summary_df = summary_df[ordered_cols] 

367 else: 

368 # Original style: sort all columns alphabetically 

369 if 'Well' in summary_df.columns: 

370 other_cols = [col for col in summary_df.columns if col != 'Well'] 

371 summary_df = summary_df[['Well'] + sorted(other_cols)] 

372 

373 logger.info(f"Created summary table with {len(summary_df)} wells and {len(summary_df.columns)} metrics") 

374 

375 # Save to CSV if output path specified 

376 if output_path is None: 

377 output_path = results_dir / consolidation_config.output_filename 

378 

379 if consolidation_config.metaxpress_style: 

380 # Create plate metadata dictionary from config 

381 plate_metadata = { 

382 'barcode': plate_metadata_config.barcode or f"OpenHCS-{results_dir.name}", 

383 'plate_name': plate_metadata_config.plate_name or results_dir.name, 

384 'plate_id': plate_metadata_config.plate_id or str(hash(str(results_dir)) % 100000), 

385 'description': plate_metadata_config.description or f"Consolidated analysis results from OpenHCS pipeline: {len(summary_df)} wells analyzed", 

386 'acquisition_user': plate_metadata_config.acquisition_user, 

387 'z_step': plate_metadata_config.z_step 

388 } 

389 

390 save_with_metaxpress_header(summary_df, output_path, plate_metadata) 

391 logger.info(f"Saved MetaXpress-style summary with header to: {output_path}") 

392 else: 

393 summary_df.to_csv(output_path, index=False) 

394 logger.info(f"Saved consolidated summary to: {output_path}") 

395 

396 return summary_df 
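
A standalone invocation might look like the sketch below. The config constructors and their keyword arguments are assumptions inferred from the attributes this function reads (well_pattern, file_extensions, exclude_patterns, metaxpress_style, output_filename, and the plate metadata fields); check openhcs.core.config for the actual definitions.

    from openhcs.core.config import AnalysisConsolidationConfig, PlateMetadataConfig
    from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results

    # Hypothetical construction; field names mirror the attributes accessed above.
    consolidation_config = AnalysisConsolidationConfig(
        well_pattern=r"([A-Z]\d{2})",          # logged for debugging; matching uses the well_ids list
        file_extensions=(".csv",),
        exclude_patterns=(r"consolidated",),   # regexes applied with re.search against filenames
        metaxpress_style=True,
        output_filename="consolidated_summary.csv",
    )
    plate_metadata_config = PlateMetadataConfig(
        barcode="PLATE-001",
        plate_name="Demo plate",
        plate_id=None,                         # None falls back to a hash-derived ID
        description=None,
        acquisition_user="OpenHCS",
        z_step="1",
    )

    summary_df = consolidate_analysis_results(
        results_directory="/path/to/results",
        well_ids=["A01", "A02", "B01"],
        consolidation_config=consolidation_config,
        plate_metadata_config=plate_metadata_config,
    )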

397 

398 

399def materialize_consolidated_results( 

400 data: pd.DataFrame, 

401 output_path: str, 

402 filemanager, 

403 backend: str, 

404 well_id: str 

405) -> str: 

406 """Materialize consolidated results DataFrame to CSV using OpenHCS FileManager.""" 

407 try: 

408 csv_content = data.to_csv(index=False) 

409 

410 # Remove existing file if present 

411 if filemanager.exists(output_path, backend): 

412 filemanager.delete(output_path, backend) 

413 

414 filemanager.save(csv_content, output_path, backend) 

415 logger.info(f"Materialized consolidated results to {output_path}") 

416 return output_path 

417 

418 except Exception as e: 

419 logger.error(f"Failed to materialize consolidated results: {e}") 

420 raise 

421 

422 

423@numpy_func 

424@special_outputs(("consolidated_results", materialize_consolidated_results)) 

425def consolidate_analysis_results_pipeline( 

426 image_stack: np.ndarray, 

427 results_directory: str, well_ids: List[str], 

428 consolidation_config: 'AnalysisConsolidationConfig', 

429 plate_metadata_config: 'PlateMetadataConfig' 

430) -> tuple[np.ndarray, pd.DataFrame]: 

431 """ 

432 Pipeline-compatible version of consolidate_analysis_results. 

433  

434 This function can be used as a FunctionStep in OpenHCS pipelines. 

435 """ 

436 # Call the main consolidation function 

437 summary_df = consolidate_analysis_results( 

438 results_directory=results_directory, well_ids=well_ids, 

439 consolidation_config=consolidation_config, 

440 plate_metadata_config=plate_metadata_config, 

441 output_path=None # Will be handled by materialization 

442 ) 

443 

444 return image_stack, summary_df
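
In a pipeline, the function is attached to a step roughly as the module docstring suggests. How the extra keyword arguments are bound by FunctionStep (and where FunctionStep is imported from) depends on your OpenHCS version, so treat this as a sketch rather than a confirmed API:

    step = FunctionStep(
        func=consolidate_analysis_results_pipeline,
        results_directory="/path/to/results",
        well_ids=["A01", "A02", "B01"],
        consolidation_config=consolidation_config,
        plate_metadata_config=plate_metadata_config,
    )
    # The step passes the image stack through unchanged and emits the summary DataFrame
    # as the "consolidated_results" special output, materialized by materialize_consolidated_results.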