Coverage for openhcs/core/config.py: 94.1%

211 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Global configuration dataclasses for OpenHCS. 

3 

4This module defines the primary configuration objects used throughout the application, 

5such as VFSConfig, PathPlanningConfig, and the overarching GlobalPipelineConfig. 

6Configuration is intended to be immutable and provided as Python objects. 

7""" 

8 

9import logging 

10import os # For a potentially more dynamic default for num_workers 

11from dataclasses import dataclass, field 

12from pathlib import Path 

13from typing import Optional, Union, Any, List 

14from enum import Enum 

15from abc import ABC, abstractmethod 

16from openhcs.constants import Microscope, VirtualComponents 

17from openhcs.constants.constants import Backend 

18 

19# Import decorator for automatic decorator creation 

20from openhcs.config_framework import auto_create_decorator 

21 

22# Import platform-aware transport mode default 

23# This must be imported here to avoid circular imports 

24import platform 

25 

26logger = logging.getLogger(__name__) 

27 

28 

29class ZarrCompressor(Enum): 

30 """Available compression algorithms for zarr storage.""" 

31 BLOSC = "blosc" 

32 ZLIB = "zlib" 

33 LZ4 = "lz4" 

34 ZSTD = "zstd" 

35 NONE = "none" 

36 

37 def create_compressor(self, compression_level: int, shuffle: bool = True) -> Optional[Any]: 

38 """Create the actual zarr compressor instance. 

39 

40 Args: 

41 compression_level: Compression level (1-22 for ZSTD, 1-9 for others) 

42 shuffle: Enable byte shuffling for better compression (blosc only) 

43 

44 Returns: 

45 Configured zarr compressor instance or None for no compression 

46 """ 

47 import zarr 

48 

49 match self: 

50 case ZarrCompressor.NONE: 50 ↛ 51line 50 didn't jump to line 51 because the pattern on line 50 never matched

51 return None 

52 case ZarrCompressor.BLOSC: 52 ↛ 53line 52 didn't jump to line 53 because the pattern on line 52 never matched

53 return zarr.Blosc(cname='lz4', clevel=compression_level, shuffle=shuffle) 

54 case ZarrCompressor.ZLIB: 54 ↛ 56line 54 didn't jump to line 56 because the pattern on line 54 always matched

55 return zarr.Zlib(level=compression_level) 

56 case ZarrCompressor.LZ4: 

57 return zarr.LZ4(acceleration=compression_level) 

58 case ZarrCompressor.ZSTD: 

59 return zarr.Zstd(level=compression_level) 

60 

61 

class ZarrChunkStrategy(Enum):
    """Strategies for chunking zarr arrays."""

    WELL = "well"  # one chunk spanning the whole well -- optimal for batch I/O
    FILE = "file"  # one chunk per source file -- better for random access

66 

67 

class MaterializationBackend(Enum):
    """Persistent-storage backends available for materialization."""

    AUTO = "auto"
    ZARR = "zarr"
    DISK = "disk"
    OMERO_LOCAL = "omero_local"

74 

75 

class WellFilterMode(Enum):
    """Well filtering modes for selective materialization."""

    INCLUDE = "include"  # materialize only the listed wells
    EXCLUDE = "exclude"  # materialize every well except the listed ones

80 

81 

class NormalizationMethod(Enum):
    """Normalization methods for experimental analysis."""

    FOLD_CHANGE = "fold_change"          # value / control_mean
    Z_SCORE = "z_score"                  # (value - control_mean) / control_std
    PERCENT_CONTROL = "percent_control"  # (value / control_mean) * 100

87 

88 

class MicroscopeFormat(Enum):
    """Supported microscope formats for experimental analysis."""

    EDDU_CX5 = "EDDU_CX5"                # ThermoFisher CX5 format
    EDDU_METAXPRESS = "EDDU_metaxpress"  # Molecular Devices MetaXpress format

93 

94 

class TransportMode(Enum):
    """ZMQ transport modes for local vs remote communication."""

    # Inter-process communication: local only, avoids firewall prompts.
    IPC = "ipc"
    # Network sockets: supports remote hosts, may trigger firewall prompts.
    TCP = "tcp"

99 

100 

@auto_create_decorator
@dataclass(frozen=True)
class GlobalPipelineConfig:
    """
    Root configuration object for an OpenHCS pipeline session.

    This object is intended to be instantiated at application startup and
    treated as immutable (the dataclass is frozen).
    """
    num_workers: int = 1
    """Number of worker processes/threads for parallelizable tasks."""

    materialization_results_path: Path = field(default=Path("results"), metadata={'ui_hidden': True})
    """
    Path for materialized analysis results (CSV, JSON files from special outputs).

    This is a pipeline-wide setting that controls where all special output materialization
    functions save their analysis results, regardless of which step produces them.

    Can be relative to plate folder or absolute path.
    Default: "results" creates a results/ folder in the plate directory.
    Examples: "results", "./analysis", "/data/analysis_results", "../shared_results"

    Note: This is separate from per-step image materialization, which is controlled
    by the sub_dir field in each step's step_materialization_config.
    """

    microscope: Microscope = field(default=Microscope.AUTO, metadata={'ui_hidden': True})
    """Default microscope type for auto-detection."""

    # default_factory defers the environment lookup to instantiation time,
    # so OPENHCS_USE_THREADING is read per config object, not at import.
    use_threading: bool = field(default_factory=lambda: os.getenv('OPENHCS_USE_THREADING', 'false').lower() == 'true', metadata={'ui_hidden': True})
    """Use ThreadPoolExecutor instead of ProcessPoolExecutor for debugging. Reads from OPENHCS_USE_THREADING environment variable."""

    # Future extension points:
    # logging_config: Optional[Dict[str, Any]] = None  # For configuring logging levels, handlers
    # plugin_settings: Dict[str, Any] = field(default_factory=dict)  # For plugin-specific settings

136 

137 

138# PipelineConfig will be created automatically by the injection system 

139# (GlobalPipelineConfig → PipelineConfig by removing "Global" prefix) 

140 

141 

142 

# Import utilities for dynamic config creation
from openhcs.utils.enum_factory import create_colormap_enum
from openhcs.utils.display_config_factory import create_napari_display_config, create_fiji_display_config


# Import component order builder from factory module
from openhcs.core.streaming_config_factory import build_component_order as _build_component_order

# Create the colormap enum from a hardcoded minimal set to avoid importing
# napari at module load (napari pulls in dask and GPU libraries).
# lazy=True skips napari introspection entirely.
NapariColormap = create_colormap_enum(lazy=True)

154 

155 

class NapariDimensionMode(Enum):
    """How to handle different dimensions in napari visualization."""

    SLICE = "slice"  # render as a 2D slice (middle slice is taken)
    STACK = "stack"  # render as a 3D stack/volume

160 

161 

class NapariVariableSizeHandling(Enum):
    """How to handle images with different sizes in the same layer."""

    # One layer per well -- preserves each image's exact data.
    SEPARATE_LAYERS = "separate_layers"
    # Pad smaller images up to the largest -- enables stacking into one layer.
    PAD_TO_MAX = "pad_to_max"

166 

167 

# Create NapariDisplayConfig using the factory.
# Note: uses the lazy colormap enum to avoid importing napari at module level.
# Note: component_order is automatically derived from VirtualComponents + AllComponents,
# making VirtualComponents the single source of truth.
NapariDisplayConfig = create_napari_display_config(
    colormap_enum=NapariColormap,
    dimension_mode_enum=NapariDimensionMode,
    variable_size_handling_enum=NapariVariableSizeHandling,
    virtual_components=VirtualComponents,
    component_order=_build_component_order(),  # Auto-generated from VirtualComponents
    virtual_component_defaults={
        'source': NapariDimensionMode.SLICE  # Separate layers per step by default
    }
)

# Apply the global pipeline config decorator with ui_hidden=True.
# This config is only inherited by NapariStreamingConfig, so hide it from UI.
# NOTE(review): `global_pipeline_config` is not imported above — presumably it is
# created dynamically by @auto_create_decorator on GlobalPipelineConfig and
# injected into this module's namespace by the config framework; confirm before
# refactoring imports.
NapariDisplayConfig = global_pipeline_config(ui_hidden=True)(NapariDisplayConfig)

186 

187 

188# ============================================================================ 

189# Fiji Display Configuration 

190# ============================================================================ 

191 

class FijiLUT(Enum):
    """Fiji/ImageJ LUT options."""

    GRAYS = "Grays"
    FIRE = "Fire"
    ICE = "Ice"
    SPECTRUM = "Spectrum"
    RED = "Red"
    GREEN = "Green"
    BLUE = "Blue"

201 

202 

class FijiDimensionMode(Enum):
    """How to map OpenHCS dimensions to ImageJ hyperstack dimensions.

    ImageJ hyperstacks expose three axes: Channels (C), Slices (Z) and
    Frames (T). Each OpenHCS component (site, channel, z_index, timepoint)
    is mapped onto one of these axes, or rendered as separate windows.
    """

    WINDOW = "window"    # one window per value (analogous to Napari SLICE mode)
    CHANNEL = "channel"  # ImageJ Channel axis (C)
    SLICE = "slice"      # ImageJ Slice axis (Z)
    FRAME = "frame"      # ImageJ Frame axis (T)

219 

220 

# Create FijiDisplayConfig using the factory (with component-specific fields like Napari).
# Note: component_order is automatically derived from VirtualComponents + AllComponents,
# making VirtualComponents the single source of truth.
FijiDisplayConfig = create_fiji_display_config(
    lut_enum=FijiLUT,
    dimension_mode_enum=FijiDimensionMode,
    virtual_components=VirtualComponents,
    component_order=_build_component_order(),  # Auto-generated from VirtualComponents
    virtual_component_defaults={
        # source is WINDOW by default for window grouping (well is already WINDOW in component_defaults)
        'source': FijiDimensionMode.WINDOW
    }
)

# Apply the global pipeline config decorator with ui_hidden=True.
# This config is only inherited by FijiStreamingConfig, so hide it from UI.
FijiDisplayConfig = global_pipeline_config(ui_hidden=True)(FijiDisplayConfig)
# Mark the class attribute directly as well, for UI-layer checks.
FijiDisplayConfig._ui_hidden = True

240 

241 

@global_pipeline_config
@dataclass(frozen=True)
class WellFilterConfig:
    """Base configuration for well filtering functionality.

    Subclassed by path-planning and step-level configs that need to restrict
    which wells are processed or materialized.
    """
    well_filter: Optional[Union[List[str], str, int]] = None
    """Well filter specification: list of wells, pattern string, or max count integer. None means all wells."""

    well_filter_mode: WellFilterMode = WellFilterMode.INCLUDE
    """Whether well_filter is an include list or exclude list."""

251 

252 

@global_pipeline_config
@dataclass(frozen=True)
class ZarrConfig:
    """Configuration for Zarr storage backend.

    OME-ZARR metadata and plate metadata are always enabled for HCS compliance.
    Shuffle filter is always enabled for Blosc compressor (ignored for others).
    """
    compressor: ZarrCompressor = ZarrCompressor.ZLIB
    """Compression algorithm to use."""

    # NOTE(review): ZarrCompressor.create_compressor passes this value as
    # 'acceleration' for LZ4 and as 'level' for the others (1-22 for ZSTD),
    # so the "1-9 for LZ4" wording below looks inaccurate — confirm intended range.
    compression_level: int = 3
    """Compression level (1-9 for LZ4, higher = more compression)."""

    chunk_strategy: ZarrChunkStrategy = ZarrChunkStrategy.WELL
    """Chunking strategy: WELL (single chunk per well) or FILE (one chunk per file)."""

269 

270 

@global_pipeline_config
@dataclass(frozen=True)
class VFSConfig:
    """Configuration for Virtual File System (VFS) related operations."""
    read_backend: Backend = Backend.AUTO
    """Backend for reading input data. AUTO uses metadata-based detection for OpenHCS plates."""

    intermediate_backend: Backend = Backend.MEMORY
    """Backend for storing intermediate step results that are not explicitly materialized."""

    materialization_backend: MaterializationBackend = MaterializationBackend.DISK
    """Backend for explicitly materialized outputs (e.g., final results, user-requested saves)."""

283 

284 

@global_pipeline_config
@dataclass(frozen=True)
class AnalysisConsolidationConfig:
    """Configuration for automatic analysis results consolidation."""
    enabled: bool = True
    """Whether to automatically run analysis consolidation after pipeline completion."""

    metaxpress_style: bool = True
    """Whether to generate MetaXpress-compatible output format with headers."""

    # Matches a well ID such as "A01" or "H12" anywhere in the filename.
    well_pattern: str = r"([A-Z]\d{2})"
    """Regex pattern for extracting well IDs from filenames."""

    file_extensions: tuple[str, ...] = (".csv",)
    """File extensions to include in consolidation."""

    # Excludes previously generated consolidation outputs from re-consolidation.
    exclude_patterns: tuple[str, ...] = (r".*consolidated.*", r".*metaxpress.*", r".*summary.*")
    """Filename patterns to exclude from consolidation."""

    output_filename: str = "metaxpress_style_summary.csv"
    """Name of the consolidated output file."""

306 

307 

@global_pipeline_config
@dataclass(frozen=True)
class PlateMetadataConfig:
    """Configuration for plate metadata in MetaXpress-style output.

    All Optional fields fall back to auto-generated/derived values when None.
    """
    barcode: Optional[str] = None
    """Plate barcode. If None, will be auto-generated from plate name."""

    plate_name: Optional[str] = None
    """Plate name. If None, will be derived from plate path."""

    plate_id: Optional[str] = None
    """Plate ID. If None, will be auto-generated."""

    description: Optional[str] = None
    """Experiment description. If None, will be auto-generated."""

    acquisition_user: str = "OpenHCS"
    """User who acquired the data."""

    z_step: str = "1"
    """Z-step information for MetaXpress compatibility."""

329 

330 

@global_pipeline_config
@dataclass(frozen=True)
class ExperimentalAnalysisConfig:
    """Configuration for experimental analysis system."""
    config_file_name: str = "config.xlsx"
    """Name of the experimental configuration Excel file."""

    design_sheet_name: str = "drug_curve_map"
    """Name of the sheet containing experimental design."""

    plate_groups_sheet_name: str = "plate_groups"
    """Name of the sheet containing plate group mappings."""

    normalization_method: NormalizationMethod = NormalizationMethod.FOLD_CHANGE
    """Normalization method for control-based normalization."""

    export_raw_results: bool = True
    """Whether to export raw (non-normalized) results."""

    export_heatmaps: bool = True
    """Whether to generate heatmap visualizations."""

    auto_detect_format: bool = True
    """Whether to automatically detect microscope format."""

    default_format: Optional[MicroscopeFormat] = None
    """Default format to use if auto-detection fails."""

358 

359 

@global_pipeline_config
@dataclass(frozen=True)
class PathPlanningConfig(WellFilterConfig):
    """
    Configuration for pipeline path planning and directory structure.

    This class handles path construction concerns including plate root directories,
    output directory suffixes, and subdirectory organization. It does not handle
    analysis results location, which is controlled at the pipeline level
    (GlobalPipelineConfig.materialization_results_path).

    Inherits well filtering functionality from WellFilterConfig.
    """
    output_dir_suffix: str = "_openhcs"
    """Default suffix for general step output directories."""

    global_output_folder: Optional[Path] = None
    """
    Optional global output folder where all plate workspaces and outputs will be created.
    If specified, plate workspaces will be created as {global_output_folder}/{plate_name}_workspace/
    and outputs as {global_output_folder}/{plate_name}_workspace_outputs/.
    If None, uses the current behavior (workspace and outputs in same directory as plate).
    Example: "/data/results" or "/mnt/hcs_output"
    """

    sub_dir: str = "images"
    """
    Subdirectory within plate folder for storing processed data.
    Examples: "images", "processed", "data/images"
    """

389 

@global_pipeline_config
@dataclass(frozen=True)
class StepWellFilterConfig(WellFilterConfig):
    """Well filter configuration specialized for step-level configs.

    Currently identical to WellFilterConfig; kept as a distinct type so
    step-level defaults can diverge later without touching the base class.
    """
    pass

397 

@global_pipeline_config
@dataclass(frozen=True)
class StepMaterializationConfig(StepWellFilterConfig, PathPlanningConfig):
    """
    Configuration for per-step materialization - configurable in UI.

    This dataclass appears in the UI like any other configuration, allowing users
    to set pipeline-level defaults for step materialization behavior. All step
    materialization instances will inherit these defaults unless explicitly overridden.

    Uses multiple inheritance from PathPlanningConfig and StepWellFilterConfig.
    """

    # Override sub_dir with a materialization-specific default.
    sub_dir: str = "checkpoints"
    """Subdirectory for materialized outputs (different from global 'images')."""

    enabled: bool = True
    """Whether this materialization config is enabled. When False, config exists but materialization is disabled."""

417 

418 

# Platform-aware default transport mode, resolved once at import time:
# Windows lacks usable Unix domain sockets, so it gets TCP; Unix/Mac get IPC
# (which avoids firewall prompts).
_DEFAULT_TRANSPORT_MODE = (
    TransportMode.IPC if platform.system() != 'Windows' else TransportMode.TCP
)

422 

423 

@global_pipeline_config
@dataclass(frozen=True)
class StreamingDefaults:
    """Default configuration for streaming to visualizers."""
    persistent: bool = True
    """Whether viewer stays open after pipeline completion."""

    host: str = 'localhost'
    """Host for streaming communication. Use 'localhost' for local, or remote IP for network streaming."""

    port: Optional[int] = None  # Subclasses must override with their specific default
    """Port for streaming communication. Each streamer type has its own default."""

    transport_mode: TransportMode = _DEFAULT_TRANSPORT_MODE
    """ZMQ transport mode: Platform-aware default (TCP on Windows, IPC on Unix/Mac)."""

    enabled: bool = True
    """Whether this streaming config is enabled. When False, config exists but streaming is disabled."""

442 

@global_pipeline_config(ui_hidden=True)
@dataclass(frozen=True)
class StreamingConfig(StepWellFilterConfig, StreamingDefaults, ABC):
    """Abstract base configuration for streaming to visualizers.

    Uses multiple inheritance from StepWellFilterConfig and StreamingDefaults.
    Inherited fields (persistent, host, port, transport_mode) are automatically set to None
    by @global_pipeline_config(inherit_as_none=True), enabling polymorphic access without
    type-specific attribute names.

    Concrete subclasses are generated by create_streaming_config (see the
    streaming_config_factory module) rather than written by hand.
    """

    @property
    @abstractmethod
    def backend(self) -> Backend:
        """Backend enum for this streaming type."""
        pass

    @property
    @abstractmethod
    def viewer_type(self) -> str:
        """Viewer type identifier (e.g., 'napari', 'fiji') for queue tracking and logging."""
        pass

    @property
    @abstractmethod
    def step_plan_output_key(self) -> str:
        """Key to use in step_plan for this config's output paths."""
        pass

    @abstractmethod
    def get_streaming_kwargs(self, global_config) -> dict:
        """Return kwargs needed for this streaming backend."""
        pass

    @abstractmethod
    def create_visualizer(self, filemanager, visualizer_config):
        """Create and return the appropriate visualizer for this streaming config."""
        pass

481 

482 

# Auto-generate streaming configs using the factory (reduces ~110 lines to ~20 lines).
from openhcs.core.streaming_config_factory import create_streaming_config

NapariStreamingConfig = create_streaming_config(
    viewer_name='napari',
    port=5555,
    backend=Backend.NAPARI_STREAM,
    display_config_class=NapariDisplayConfig,
    visualizer_module='openhcs.runtime.napari_stream_visualizer',
    visualizer_class_name='NapariStreamVisualizer'
)

FijiStreamingConfig = create_streaming_config(
    viewer_name='fiji',
    port=5565,
    backend=Backend.FIJI_STREAM,
    display_config_class=FijiDisplayConfig,
    visualizer_module='openhcs.runtime.fiji_stream_visualizer',
    visualizer_class_name='FijiStreamVisualizer',
    extra_fields={
        # Fiji additionally needs the path to its executable to launch the viewer.
        'fiji_executable_path': (Optional[Path], None)
    }
)

# Inject all accumulated fields at the end of module loading.
from openhcs.config_framework.lazy_factory import _inject_all_pending_fields
_inject_all_pending_fields()

510 

511 

512# ============================================================================ 

513# Streaming Port Utilities 

514# ============================================================================ 

515 

516# Import streaming port utility from factory module 

517from openhcs.core.streaming_config_factory import get_all_streaming_ports 

518 

519 

520# ============================================================================ 

521# Configuration Framework Initialization 

522# ============================================================================ 

523 

# Initialize configuration framework with OpenHCS types.
from openhcs.config_framework import set_base_config_type

# Register GlobalPipelineConfig as the root of the config hierarchy.
set_base_config_type(GlobalPipelineConfig)

# Note: We use the framework's default MRO-based priority function.
# More derived classes automatically get higher priority through MRO depth.
# No custom priority function needed - the framework handles it generically.

logger.debug("Configuration framework initialized with OpenHCS types")

534 

535# PERFORMANCE OPTIMIZATION: Cache warming is now done asynchronously in GUI startup 

536# to avoid blocking imports. For non-GUI contexts (CLI, subprocess), cache warming 

537# happens on-demand when config windows are first opened. 

538 

539# NOTE: Step editor cache warming is done in openhcs.core.steps.__init__ to avoid circular imports