Coverage for openhcs/processing/backends/analysis/consolidate_special_outputs.py: 12.8%

160 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Generic consolidation of OpenHCS special outputs into summary tables. 

3 

4This module provides a generic function for consolidating any CSV-based special outputs 

5from OpenHCS analysis pipelines into summary tables. It automatically detects well-based 

6naming patterns and creates comprehensive summary statistics. 

7 

8Follows OpenHCS architectural principles: 

9- Uses FileManager for all I/O operations 

10- Proper memory type decorators 

11- Special I/O integration 

12- Fail-loud behavior 

13- Stateless design 

14""" 

15 

16import numpy as np 

17import pandas as pd 

18import re 

19import logging 

20from pathlib import Path 

21from typing import Dict, List, Tuple, Any, Optional, Union 

22from enum import Enum 

23 

24from openhcs.core.memory.decorators import numpy as numpy_func 

25from openhcs.core.pipeline.function_contracts import special_outputs, special_inputs 

26from openhcs.constants.constants import Backend 

27 

28logger = logging.getLogger(__name__) 

29 

30 

class AggregationStrategy(Enum):
    """Aggregation strategies for different data types."""
    NUMERIC = "numeric"
    CATEGORICAL = "categorical"
    BOOLEAN = "boolean"
    MIXED = "mixed"


class WellPatternType(Enum):
    """Common well ID patterns for different plate formats."""
    STANDARD_96 = r"([A-H]\d{2})"   # A01, B02, etc.
    STANDARD_384 = r"([A-P]\d{2})"  # A01-P24
    CUSTOM = "custom"

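# Example: the enum values are plain regex strings, so they can be passed anywhere
# a pattern is accepted. Applied to a hypothetical output filename, the 96-well
# pattern captures the well ID:
#
#   >>> import re
#   >>> re.search(WellPatternType.STANDARD_96.value, "A01_cell_counts.csv").group(1)
#   'A01'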

def materialize_consolidated_summary(
    data: Dict[str, Any],
    output_path: str,
    filemanager,
    well_id: str
) -> str:
    """
    Materialize consolidated summary data to CSV file.

    Args:
        data: Dictionary containing consolidated summary data
        output_path: Path where CSV should be saved
        filemanager: OpenHCS FileManager instance
        well_id: Well identifier

    Returns:
        Path to saved CSV file
    """
    try:
        # Convert to DataFrame
        if 'summary_table' in data:
            df = pd.DataFrame(data['summary_table'])
        else:
            # Fallback: create DataFrame from raw data
            df = pd.DataFrame([data])

        # Generate CSV content
        csv_content = df.to_csv(index=False)

        # Save using FileManager (remove existing first if present)
        if filemanager.exists(output_path, Backend.DISK.value):
            filemanager.delete(output_path, Backend.DISK.value)

        filemanager.save(csv_content, output_path, Backend.DISK.value)

        logger.info(f"Materialized consolidated summary to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize consolidated summary: {e}")
        raise

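# Sketch of the `data` payload this materializer expects (shape follows what
# consolidate_special_outputs builds below; the column names and values here are
# hypothetical):
#
#   {
#       "summary_table": [
#           {"well_id": "A01", "output_type": "cell_counts", "cell_count_mean": 42.0, ...},
#           {"well_id": "A02", "output_type": "cell_counts", "cell_count_mean": 38.5, ...},
#       ],
#       "metadata": {"timestamp": "...", "total_wells": 2, ...},
#   }
#
# Each entry of "summary_table" becomes one row of the materialized CSV.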

def materialize_detailed_report(
    data: Dict[str, Any],
    output_path: str,
    filemanager,
    well_id: str
) -> str:
    """
    Materialize detailed analysis report to text file.

    Args:
        data: Dictionary containing analysis data
        output_path: Path where report should be saved
        filemanager: OpenHCS FileManager instance
        well_id: Well identifier

    Returns:
        Path to saved report file
    """
    try:
        report_lines = []
        report_lines.append("="*80)
        report_lines.append("OPENHCS SPECIAL OUTPUTS CONSOLIDATION REPORT")
        report_lines.append("="*80)

        if 'metadata' in data:
            metadata = data['metadata']
            report_lines.append(f"Analysis timestamp: {metadata.get('timestamp', 'Unknown')}")
            report_lines.append(f"Total wells processed: {metadata.get('total_wells', 0)}")
            report_lines.append(f"Output types detected: {metadata.get('output_types', [])}")
            report_lines.append("")

        if 'summary_stats' in data:
            stats = data['summary_stats']
            report_lines.append("SUMMARY STATISTICS:")
            report_lines.append("-" * 40)
            for output_type, type_stats in stats.items():
                report_lines.append(f"\n{output_type.upper()}:")
                for metric, value in type_stats.items():
                    if isinstance(value, float):
                        report_lines.append(f" {metric}: {value:.3f}")
                    else:
                        report_lines.append(f" {metric}: {value}")

        report_lines.append("\n" + "="*80)

        # Save report
        report_content = "\n".join(report_lines)

        if filemanager.exists(output_path, Backend.DISK.value):
            filemanager.delete(output_path, Backend.DISK.value)

        filemanager.save(report_content, output_path, Backend.DISK.value)

        logger.info(f"Materialized detailed report to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to materialize detailed report: {e}")
        raise

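# Sketch of the report text produced above (values are hypothetical; per-column
# aggregate lines such as "cell_count_mean_mean" follow the two shown here):
#
#   ================================================================================
#   OPENHCS SPECIAL OUTPUTS CONSOLIDATION REPORT
#   ================================================================================
#   Analysis timestamp: 2025-08-14T05:57:00
#   Total wells processed: 2
#   Output types detected: ['cell_counts']
#
#   SUMMARY STATISTICS:
#   ----------------------------------------
#
#   CELL_COUNTS:
#    wells_count: 2
#    wells_list: A01,A02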

def extract_well_id(filename: str, pattern: str = WellPatternType.STANDARD_96.value) -> Optional[str]:
    """
    Extract well ID from filename using regex pattern.

    Args:
        filename: Name of the file
        pattern: Regex pattern for well ID extraction

    Returns:
        Well ID if found, None otherwise
    """
    match = re.search(pattern, filename)
    return match.group(1) if match else None

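# Example usage (filenames are hypothetical):
#
#   >>> extract_well_id("A03_cell_counts.csv")
#   'A03'
#   >>> extract_well_id("B11_skeleton_metrics.csv", WellPatternType.STANDARD_384.value)
#   'B11'
#   >>> extract_well_id("plate_summary.csv") is None
#   True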

def detect_aggregation_strategy(series: pd.Series) -> AggregationStrategy:
    """
    Automatically detect the appropriate aggregation strategy for a data series.

    Args:
        series: Pandas series to analyze

    Returns:
        Appropriate aggregation strategy
    """
    # Check if boolean
    if series.dtype == bool or set(series.dropna().unique()).issubset({0, 1, True, False}):
        return AggregationStrategy.BOOLEAN

    # Check if numeric
    if pd.api.types.is_numeric_dtype(series):
        return AggregationStrategy.NUMERIC

    # Check if categorical (string/object with limited unique values)
    unique_ratio = len(series.unique()) / len(series)
    if unique_ratio < 0.5:  # Less than 50% unique values suggests categorical
        return AggregationStrategy.CATEGORICAL

    return AggregationStrategy.MIXED

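# Example usage (illustrative series):
#
#   >>> detect_aggregation_strategy(pd.Series([12.5, 30.1, 8.4]))
#   <AggregationStrategy.NUMERIC: 'numeric'>
#   >>> detect_aggregation_strategy(pd.Series([True, False, True]))
#   <AggregationStrategy.BOOLEAN: 'boolean'>
#   >>> detect_aggregation_strategy(pd.Series(["neurite", "soma", "neurite", "neurite", "soma"]))
#   <AggregationStrategy.CATEGORICAL: 'categorical'>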

def aggregate_series(series: pd.Series, strategy: AggregationStrategy) -> Dict[str, Any]:
    """
    Aggregate a pandas series based on the specified strategy.

    Args:
        series: Series to aggregate
        strategy: Aggregation strategy to use

    Returns:
        Dictionary of aggregated statistics
    """
    result = {}

    if strategy == AggregationStrategy.NUMERIC:
        result.update({
            'count': len(series),
            'mean': series.mean(),
            'std': series.std(),
            'min': series.min(),
            'max': series.max(),
            'sum': series.sum(),
            'median': series.median()
        })

    elif strategy == AggregationStrategy.BOOLEAN:
        result.update({
            'count': len(series),
            'true_count': series.sum(),
            'false_count': len(series) - series.sum(),
            'true_percentage': (series.sum() / len(series)) * 100
        })

    elif strategy == AggregationStrategy.CATEGORICAL:
        value_counts = series.value_counts()
        result.update({
            'count': len(series),
            'unique_values': len(series.unique()),
            'most_common': value_counts.index[0] if len(value_counts) > 0 else None,
            'most_common_count': value_counts.iloc[0] if len(value_counts) > 0 else 0,
            'unique_values_list': ','.join(map(str, series.unique()))
        })

    else:  # MIXED
        result.update({
            'count': len(series),
            'unique_values': len(series.unique()),
            'data_type': str(series.dtype)
        })

    return result

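# Example usage (illustrative series): a boolean column yields count-style
# statistics rather than means and medians.
#
#   >>> stats = aggregate_series(pd.Series([True, True, False, True]), AggregationStrategy.BOOLEAN)
#   >>> stats['count'], int(stats['true_count']), float(stats['true_percentage'])
#   (4, 3, 75.0)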

@numpy_func
@special_outputs(
    ("consolidated_summary", materialize_consolidated_summary),
    ("detailed_report", materialize_detailed_report)
)
def consolidate_special_outputs(
    image_stack: np.ndarray,
    results_directory: str,
    well_pattern: str = WellPatternType.STANDARD_96.value,
    file_extensions: List[str] = [".csv"],
    include_patterns: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    custom_aggregations: Optional[Dict[str, Dict[str, str]]] = None
) -> Tuple[np.ndarray, Dict[str, Any], Dict[str, Any]]:
    """
    Consolidate special outputs from OpenHCS analysis into summary tables.

    This function automatically detects CSV files with well-based naming patterns,
    groups them by output type, and creates comprehensive summary statistics.

    Args:
        image_stack: Input image stack (dummy for OpenHCS compatibility)
        results_directory: Directory containing special output files
        well_pattern: Regex pattern for extracting well IDs
        file_extensions: List of file extensions to process
        include_patterns: Optional list of filename patterns to include
        exclude_patterns: Optional list of filename patterns to exclude
        custom_aggregations: Optional custom aggregation rules per output type
            (accepted for forward compatibility; not applied by the current implementation)

    Returns:
        Tuple of (image_stack, consolidated_summary, detailed_report)
    """
    from openhcs.io.filemanager import FileManager
    from openhcs.io.base import storage_registry
    from datetime import datetime

    # Initialize FileManager
    filemanager = FileManager(storage_registry)

    logger.info(f"Consolidating special outputs from: {results_directory}")

    # Find all relevant files
    all_files = []
    for ext in file_extensions:
        pattern = f"*{ext}"
        files = filemanager.list_files(results_directory, Backend.DISK.value, pattern=pattern, recursive=False)
        all_files.extend(files)

    logger.info(f"Found {len(all_files)} files with extensions {file_extensions}")

    # Apply include/exclude filters
    if include_patterns:
        all_files = [f for f in all_files if any(re.search(pattern, Path(f).name) for pattern in include_patterns)]

    if exclude_patterns:
        all_files = [f for f in all_files if not any(re.search(pattern, Path(f).name) for pattern in exclude_patterns)]

    logger.info(f"After filtering: {len(all_files)} files to process")

    # Group files by well ID and output type
    wells_data = {}
    output_types = set()

    for file_path in all_files:
        filename = Path(file_path).name
        well_id = extract_well_id(filename, well_pattern)

        if not well_id:
            logger.warning(f"Could not extract well ID from {filename}, skipping")
            continue

        # Extract output type (everything after well ID and before extension)
        output_type = filename.replace(f"{well_id}_", "").replace(Path(filename).suffix, "")
        output_types.add(output_type)

        if well_id not in wells_data:
            wells_data[well_id] = {}

        wells_data[well_id][output_type] = file_path

    logger.info(f"Processing {len(wells_data)} wells with output types: {sorted(output_types)}")

    # Process each output type and create summary statistics
    summary_table = []
    summary_stats = {}

    for output_type in sorted(output_types):
        logger.info(f"Processing output type: {output_type}")

        # Collect data for this output type across all wells
        type_data = []
        wells_with_type = []

        for well_id, well_files in wells_data.items():
            if output_type in well_files:
                try:
                    file_path = well_files[output_type]
                    df = pd.read_csv(file_path)

                    # Create well-level summary
                    well_summary = {'well_id': well_id, 'output_type': output_type}

                    # Aggregate each column
                    for col in df.columns:
                        if col in ['well_id', 'output_type']:
                            continue

                        strategy = detect_aggregation_strategy(df[col])
                        col_stats = aggregate_series(df[col], strategy)

                        # Prefix column stats with column name
                        for stat_name, stat_value in col_stats.items():
                            well_summary[f"{col}_{stat_name}"] = stat_value

                    type_data.append(well_summary)
                    wells_with_type.append(well_id)

                except Exception as e:
                    logger.error(f"Error processing {file_path}: {e}")
                    continue

        # Add to summary table
        summary_table.extend(type_data)

        # Create type-level statistics
        if type_data:
            type_df = pd.DataFrame(type_data)
            type_stats = {
                'wells_count': len(wells_with_type),
                'wells_list': ','.join(sorted(wells_with_type))
            }

            # Add aggregate statistics for numeric columns
            numeric_cols = type_df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                if col != 'well_id':
                    type_stats[f"{col}_mean"] = type_df[col].mean()
                    type_stats[f"{col}_std"] = type_df[col].std()

            summary_stats[output_type] = type_stats

    # Create consolidated summary
    consolidated_summary = {
        'summary_table': summary_table,
        'metadata': {
            'timestamp': datetime.now().isoformat(),
            'total_wells': len(wells_data),
            'output_types': sorted(output_types),
            'total_files_processed': len(all_files),
            'well_pattern': well_pattern
        }
    }

    # Create detailed report
    detailed_report = {
        'summary_stats': summary_stats,
        'metadata': consolidated_summary['metadata']
    }

    logger.info(f"Consolidation complete: {len(summary_table)} well-output combinations processed")

    return image_stack, consolidated_summary, detailed_report
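

if __name__ == "__main__":
    # Minimal local smoke test (a sketch, not part of the pipeline API): build a
    # tiny synthetic results directory following the "<well>_<output_type>.csv"
    # naming convention assumed by the grouping logic above, then consolidate it.
    # This assumes the decorators are pass-through for direct calls; the filenames
    # and values below are hypothetical.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        for well, counts in (("A01", [10, 12, 9]), ("A02", [7, 11, 14])):
            pd.DataFrame({
                "cell_count": counts,
                "is_dividing": [True, False, False],
            }).to_csv(Path(tmp_dir) / f"{well}_cell_counts.csv", index=False)

        stack, summary, report = consolidate_special_outputs(
            np.zeros((1, 1, 1), dtype=np.uint16),  # dummy stack, passed through unchanged
            results_directory=tmp_dir,
        )

        print(summary["metadata"])
        print(pd.DataFrame(summary["summary_table"]))
        print(report["summary_stats"])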