Coverage for openhcs/constants/constants.py: 77.6%

241 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Consolidated constants for OpenHCS. 

3 

4This module defines all constants related to backends, defaults, I/O, memory, and pipeline. 

5These constants are governed by various doctrinal clauses. 

6 

7Caching: 

8- Component enums (AllComponents, VariableComponents, GroupBy) are cached persistently 

9- Cache invalidated on OpenHCS version change or after 7 days 

10- Provides ~20x speedup on subsequent runs and in subprocesses 

11""" 

12 

13from enum import Enum 

14from functools import lru_cache 

15from typing import Any, Callable, Set, TypeVar, Dict, Tuple 

16import logging 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class Microscope(Enum):
    """Microscope/format identifiers; each member's value is its string tag."""

    AUTO = "auto"

    # OpenHCS pre-processed format
    OPENHCS = "openhcs"

    IMAGEXPRESS = "ImageXpress"
    OPERAPHENIX = "OperaPhenix"

    # OMERO virtual filesystem backend
    OMERO = "omero"

27 

28 

class VirtualComponents(Enum):
    """
    Components derived from execution/location context instead of filename parsing.

    Meaning of SOURCE:
    - while a pipeline is executing, it is the step_name (tells pipeline steps apart)
    - when data is loaded from disk, it is the subdirectory name (tells image sources apart)

    Having a single component for both cases unifies the step/source concept
    across the Napari and Fiji viewers.
    """

    # Unified step/source component
    SOURCE = "source"

40 

41 

def get_openhcs_config():
    """Return the default OpenHCS component configuration.

    The result comes from
    ``ComponentConfigurationFactory.create_openhcs_default_configuration()``.
    The framework module is imported when this function runs rather than when
    this module is imported.
    """
    from openhcs.components.framework import ComponentConfigurationFactory as factory

    config = factory.create_openhcs_default_configuration()
    return config

46 

47 

# Lazy import cache manager to avoid circular dependencies.
# Sentinel states of this module-level slot:
#   None  -> creation has not been attempted yet
#   False -> creation was attempted and failed; it is not retried
#   other -> the RegistryCacheManager instance
_component_enum_cache_manager = None


def _get_component_enum_cache_manager():
    """Return the persistent cache manager for component enums, creating it on first use.

    The registry cache module is imported lazily inside this function. Any
    failure while importing or constructing the manager is logged at DEBUG
    level and remembered, so later calls return ``None`` without retrying.

    Returns:
        The ``RegistryCacheManager`` instance, or ``None`` if it could not be
        created.
    """
    global _component_enum_cache_manager
    if _component_enum_cache_manager is None:
        try:
            from openhcs.core.registry_cache import RegistryCacheManager, CacheConfig

            def get_version():
                """Return ``openhcs.__version__``, or ``"unknown"`` if unavailable."""
                try:
                    import openhcs
                    return openhcs.__version__
                except Exception:
                    # Narrowed from a bare ``except:`` so KeyboardInterrupt and
                    # SystemExit are no longer swallowed here.
                    return "unknown"

            # Serializer for component enum data
            # Note: RegistryCacheManager calls serializer(item) for each item in the dict
            # We store all three enums as a single item with key 'enums'
            def serialize_component_enums(enum_data: Dict[str, Any]) -> Dict[str, Any]:
                """Serialize the three component enum dicts to JSON."""
                return enum_data  # Already a dict of dicts

            # Deserializer for component enum data
            def deserialize_component_enums(data: Dict[str, Any]) -> Dict[str, Any]:
                """Deserialize component enum data from JSON."""
                return data  # Already a dict of dicts

            _component_enum_cache_manager = RegistryCacheManager(
                cache_name="component_enums",
                version_getter=get_version,
                serializer=serialize_component_enums,
                deserializer=deserialize_component_enums,
                config=CacheConfig(
                    max_age_days=7,
                    check_mtimes=False  # No file tracking needed for config-based enums
                )
            )
        except Exception as e:
            logger.debug(f"Failed to initialize component enum cache manager: {e}")
            _component_enum_cache_manager = False  # Mark as failed to avoid retrying

    # Translate the internal "failed" sentinel into None for callers.
    return _component_enum_cache_manager if _component_enum_cache_manager is not False else None

93 

94 

def _add_groupby_methods(GroupBy: Enum) -> Enum:
    """Attach the custom GroupBy behaviour to an enum class and return that class.

    Installed on the class:
    - ``component``: read-only property returning the member's value
    - ``__eq__``: compares the member's value with ``other.value`` when
      ``other`` has a ``value`` attribute, otherwise with ``other`` itself
    - ``__hash__``: hash of the value, or of the string "GroupBy.NONE" when
      the value is None
    - ``__str__`` / ``__repr__``: ``GroupBy.<member name>``
    """

    def _component(member):
        return member.value

    def _equals(member, other):
        return member.value == getattr(other, 'value', other)

    def _hash(member):
        if member.value is None:
            return hash("GroupBy.NONE")
        return hash(member.value)

    def _label(member):
        return f"GroupBy.{member.name}"

    GroupBy.component = property(_component)
    GroupBy.__eq__ = _equals
    GroupBy.__hash__ = _hash
    GroupBy.__str__ = _label
    GroupBy.__repr__ = _label
    return GroupBy

103 

104 

def _build_component_enums(all_components_members, variable_members, group_by_members):
    """Build (AllComponents, VariableComponents, GroupBy) from name -> value mappings.

    Each enum gets its ``__module__`` and ``__qualname__`` set explicitly so it
    can be pickled correctly in multiprocessing contexts. GroupBy additionally
    receives its custom methods via ``_add_groupby_methods``.
    """
    def make_enum(enum_name, members):
        enum_cls = Enum(enum_name, members)
        enum_cls.__module__ = __name__
        enum_cls.__qualname__ = enum_name
        return enum_cls

    all_components = make_enum('AllComponents', all_components_members)
    vc = make_enum('VariableComponents', variable_members)
    group_by = _add_groupby_methods(make_enum('GroupBy', group_by_members))
    return all_components, vc, group_by


# Simple lazy initialization - just defer the config call
@lru_cache(maxsize=1)
def _create_enums():
    """Create enums when first needed with persistent caching.

    CRITICAL: This function must create enums with proper __module__ and __qualname__
    attributes so they can be pickled correctly in multiprocessing contexts.
    The enums are stored in module globals() to ensure identity consistency.

    Caching provides ~20x speedup on subsequent runs and in subprocesses.

    Flow:
        1. If a cache manager is available, try to rebuild the enums from the
           persistent cache (entry stored under the key 'enums').
        2. Otherwise, or if loading fails for any reason, build them from the
           OpenHCS configuration and try to save them back to the cache.

    Returns:
        Tuple ``(AllComponents, VariableComponents, GroupBy)`` of enum classes.
    """
    import os
    import traceback

    logger.info(f"🔧 _create_enums() CALLED in process {os.getpid()}")
    logger.info(f"🔧 _create_enums() cache_info: {_create_enums.cache_info()}")
    # Formatting the stack is comparatively expensive; only do it when the
    # record would actually be emitted.
    if logger.isEnabledFor(logging.INFO):
        logger.info(f"🔧 _create_enums() STACK TRACE:\n{''.join(traceback.format_stack())}")

    # Try to load from persistent cache first
    cache_manager = _get_component_enum_cache_manager()
    if cache_manager:
        try:
            cached_dict = cache_manager.load_cache()
            if cached_dict is not None and 'enums' in cached_dict:
                # Cache hit - reconstruct enums from cached data
                cached_data = cached_dict['enums']
                logger.debug("✅ Loading component enums from cache")

                enums = _build_component_enums(
                    cached_data['all_components'],
                    cached_data['variable_components'],
                    cached_data['group_by'],
                )

                logger.info(f"🔧 _create_enums() LOADED FROM CACHE in process {os.getpid()}")
                return enums
        except Exception as e:
            # Best effort: a broken or malformed cache falls through to the config path.
            logger.debug(f"Cache load failed for component enums: {e}")

    # Cache miss or disabled - create enums from config
    config = get_openhcs_config()
    remaining = config.get_remaining_components()

    # AllComponents: ALL possible dimensions (including multiprocessing axis)
    all_components_dict = {c.name: c.value for c in config.all_components}

    # VariableComponents: Components available for variable selection (excludes multiprocessing axis)
    vc_dict = {c.name: c.value for c in remaining}

    # GroupBy: Same as VariableComponents + NONE option (they're the same concept).
    # Copied from vc_dict so `remaining` is only iterated once.
    gb_dict = dict(vc_dict)
    gb_dict['NONE'] = None

    all_components, vc, GroupBy = _build_component_enums(all_components_dict, vc_dict, gb_dict)

    # Save to persistent cache
    # Store all three enums as a single item with key 'enums'
    if cache_manager:
        try:
            enum_data = {
                'all_components': all_components_dict,
                'variable_components': vc_dict,
                'group_by': gb_dict
            }
            cache_manager.save_cache({'enums': enum_data})
            logger.debug("💾 Saved component enums to cache")
        except Exception as e:
            # Best effort: failing to persist must not prevent returning the enums.
            logger.debug(f"Failed to save component enum cache: {e}")

    logger.info(f"🔧 _create_enums() RETURNING in process {os.getpid()}: "
                f"AllComponents={id(all_components)}, VariableComponents={id(vc)}, GroupBy={id(GroupBy)}")
    logger.info(f"🔧 _create_enums() cache_info after return: {_create_enums.cache_info()}")
    return all_components, vc, GroupBy

192 

193 

@lru_cache(maxsize=1)
def _create_streaming_components():
    """Build the StreamingComponents enum: AllComponents plus VirtualComponents.

    The resulting enum holds the filename components (from the parser) together
    with the virtual components (from execution/location context) for streaming
    visualization. Its ``__module__`` and ``__qualname__`` are set explicitly.
    """
    import logging
    import os

    log = logging.getLogger(__name__)
    log.info(f"🔧 _create_streaming_components() CALLED in process {os.getpid()}")

    # Import AllComponents (triggers lazy creation if needed)
    from openhcs.constants import AllComponents

    # Merge members in order: AllComponents first, then VirtualComponents.
    members = {}
    for source_enum in (AllComponents, VirtualComponents):
        for member in source_enum:
            members[member.name] = member.value

    streaming_components = Enum('StreamingComponents', members)
    streaming_components.__module__ = __name__
    streaming_components.__qualname__ = 'StreamingComponents'

    log.info(f"🔧 _create_streaming_components() RETURNING: StreamingComponents={id(streaming_components)}")
    return streaming_components

219 

220 

def __getattr__(name):
    """Module-level attribute hook that creates the component enums lazily.

    CRITICAL: the enums must be created once per process and stored in
    globals() so that pickle identity checks pass in multiprocessing contexts.

    Handles 'AllComponents', 'VariableComponents', 'GroupBy' (created together)
    and 'StreamingComponents'. Any other name raises AttributeError.
    """
    component_enum_names = ('AllComponents', 'VariableComponents', 'GroupBy')
    if name not in component_enum_names and name != 'StreamingComponents':
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    module_ns = globals()

    # Already created (e.g. by a concurrent/earlier lookup): reuse it.
    if name in module_ns:
        return module_ns[name]

    import logging
    import os
    log = logging.getLogger(__name__)

    if name == 'StreamingComponents':
        log.info(f"🔧 ENUM CREATION: Creating StreamingComponents in process {os.getpid()}")

        streaming_components = _create_streaming_components()
        module_ns['StreamingComponents'] = streaming_components

        log.info(f"🔧 ENUM CREATION: Created StreamingComponents in process {os.getpid()}: "
                 f"StreamingComponents={id(streaming_components)}")
        return module_ns[name]

    # One of the three component enums: create all of them at once and publish them.
    log.info(f"🔧 ENUM CREATION: Creating {name} in process {os.getpid()}")

    all_components, vc, gb = _create_enums()
    module_ns['AllComponents'] = all_components
    module_ns['VariableComponents'] = vc
    module_ns['GroupBy'] = gb

    log.info(f"🔧 ENUM CREATION: Created enums in process {os.getpid()}: "
             f"AllComponents={id(all_components)}, VariableComponents={id(vc)}, GroupBy={id(gb)}")
    log.info(f"🔧 ENUM CREATION: VariableComponents.__module__={vc.__module__}, __qualname__={vc.__qualname__}")

    return module_ns[name]

268 

269 

270 

271 

272 

# Documentation URL
DOCUMENTATION_URL = "https://openhcs.readthedocs.io/en/latest/"

275 

276 

class OrchestratorState(Enum):
    """Simple orchestrator state tracking - no complex state machine."""

    # Object exists, not initialized
    CREATED = "created"
    # Initialized, ready for compilation
    READY = "ready"
    # Compilation complete, ready for execution
    COMPILED = "compiled"
    # Execution in progress
    EXECUTING = "executing"
    # Execution completed successfully
    COMPLETED = "completed"
    # Initialization failed
    INIT_FAILED = "init_failed"
    # Compilation failed (implies initialized)
    COMPILE_FAILED = "compile_failed"
    # Execution failed (implies compiled)
    EXEC_FAILED = "exec_failed"

287 

# I/O-related constants
DEFAULT_IMAGE_EXTENSION = ".tif"
# Lower- and upper-case spellings are listed explicitly.
DEFAULT_IMAGE_EXTENSIONS: Set[str] = {
    ".tif",
    ".tiff",
    ".TIF",
    ".TIFF",
}
DEFAULT_SITE_PADDING = 3
DEFAULT_RECURSIVE_PATTERN_SEARCH = False

# Lazy default resolution using lru_cache
@lru_cache(maxsize=1)
def get_default_variable_components():
    """Return the default variable components as VariableComponents members.

    Each entry of the configuration's ``default_variable`` is mapped by name
    onto the VariableComponents enum. Because of ``lru_cache`` the same list
    object is handed out on every call.
    """
    variable_components = _create_enums()[1]  # (AllComponents, VariableComponents, GroupBy)
    defaults = get_openhcs_config().default_variable
    return [getattr(variable_components, component.name) for component in defaults]

299 

300 

@lru_cache(maxsize=1)
def get_default_group_by():
    """Return the default group_by as a GroupBy member, or None if none is configured.

    The configuration's ``default_group_by`` is mapped by name onto the GroupBy
    enum; a falsy ``default_group_by`` yields None.
    """
    group_by_enum = _create_enums()[2]  # (AllComponents, VariableComponents, GroupBy)
    default = get_openhcs_config().default_group_by
    if not default:
        return None
    return getattr(group_by_enum, default.name)

307 

@lru_cache(maxsize=1)
def get_multiprocessing_axis():
    """Return ``multiprocessing_axis`` from the OpenHCS configuration (cached after the first call)."""
    return get_openhcs_config().multiprocessing_axis

313 

# Microscope used when none is specified.
DEFAULT_MICROSCOPE: Microscope = Microscope.AUTO

315 

316 

317 

318 

319 

# Backend-related constants
class Backend(Enum):
    """Backend identifiers; each member's value is its string tag."""

    AUTO = "auto"

    DISK = "disk"
    MEMORY = "memory"
    ZARR = "zarr"

    NAPARI_STREAM = "napari_stream"
    FIJI_STREAM = "fiji_stream"

    OMERO_LOCAL = "omero_local"
    VIRTUAL_WORKSPACE = "virtual_workspace"

330 

class FileFormat(Enum):
    """File formats; each member's value is the list of extensions for that format."""

    # Built from a set, so sort it: plain list(set) ordering can differ between
    # processes (string hash randomization), which would make this member's
    # value inconsistent across runs and subprocesses.
    TIFF = sorted(DEFAULT_IMAGE_EXTENSIONS)
    NUMPY = [".npy"]
    TORCH = [".pt", ".torch", ".pth"]
    JAX = [".jax"]
    CUPY = [".cupy", ".craw"]
    TENSORFLOW = [".tf"]
    JSON = [".json"]
    CSV = [".csv"]
    TEXT = [".txt", ".py", ".md"]
    ROI = [".roi.zip"]

342 

# Backend used when none is specified.
DEFAULT_BACKEND = Backend.MEMORY

# Backend-related string keys
REQUIRES_DISK_READ = "requires_disk_read"
REQUIRES_DISK_WRITE = "requires_disk_write"
FORCE_DISK_WRITE = "force_disk_write"
READ_BACKEND = "read_backend"
WRITE_BACKEND = "write_backend"

# Default values
DEFAULT_TILE_OVERLAP = 10.0
DEFAULT_MAX_SHIFT = 50
DEFAULT_MARGIN_RATIO = 0.1
DEFAULT_PIXEL_SIZE = 1.0
DEFAULT_ASSEMBLER_LOG_LEVEL = "INFO"
DEFAULT_INTERPOLATION_MODE = "nearest"
DEFAULT_INTERPOLATION_ORDER = 1
# Consolidated definition for CPU thread count
DEFAULT_CPU_THREAD_COUNT = 4
DEFAULT_PATCH_SIZE = 128
DEFAULT_SEARCH_RADIUS = 20

# ZMQ transport constants
# Note: Streaming port defaults are defined in NapariStreamingConfig and FijiStreamingConfig
# Control port = data port + 1000
CONTROL_PORT_OFFSET = 1000
DEFAULT_EXECUTION_SERVER_PORT = 7777
# ~/.openhcs/ipc/
IPC_SOCKET_DIR_NAME = "ipc"
# ipc://openhcs-zmq-{port} or ~/.openhcs/ipc/openhcs-zmq-{port}.sock
IPC_SOCKET_PREFIX = "openhcs-zmq"
# Unix domain socket extension
IPC_SOCKET_EXTENSION = ".sock"


# Memory-related constants
T = TypeVar('T')
ConversionFunc = Callable[[Any], Any]

375 

class MemoryType(Enum):
    """Memory type identifiers; each member's value is its string tag.

    After the class is defined, ``_add_conversion_methods()`` attaches one
    ``to_<value>(data, gpu_id)`` method per member (e.g. ``to_numpy``).
    """

    NUMPY = "numpy"
    CUPY = "cupy"
    TORCH = "torch"
    TENSORFLOW = "tensorflow"
    JAX = "jax"
    PYCLESPERANTO = "pyclesperanto"

    @property
    def converter(self):
        """Get the converter instance for this memory type."""
        # Imported at call time rather than at module import
        # (presumably to avoid an import cycle - TODO confirm).
        from openhcs.core.memory.conversion_helpers import _CONVERTERS
        return _CONVERTERS[self]


# Auto-generate to_X() methods on enum
def _add_conversion_methods():
    """Add to_X() conversion methods to MemoryType enum.

    For every member ``X`` a method ``to_<X.value>(self, data, gpu_id)`` is set
    on the class; it forwards to the method of the same name on
    ``self.converter``.
    """
    def make_method(target):
        # A factory gives each generated method its own binding of `target`
        # (avoids the late-binding closure pitfall of defining it in the loop).
        attr_name = f"to_{target.value}"

        def method(self, data, gpu_id):
            return getattr(self.converter, attr_name)(data, gpu_id)

        # Give every generated method a distinct identity so tracebacks and
        # introspection show e.g. "MemoryType.to_torch" instead of "method".
        method.__name__ = attr_name
        method.__qualname__ = f"MemoryType.{attr_name}"
        method.__doc__ = (
            f"Forward to ``self.converter.{attr_name}(data, gpu_id)`` and return its result."
        )
        return method

    for target_type in MemoryType:
        setattr(MemoryType, f"to_{target_type.value}", make_method(target_type))


_add_conversion_methods()

402 

403 

# Partition of MemoryType members into CPU and GPU groups.
CPU_MEMORY_TYPES: Set[MemoryType] = {MemoryType.NUMPY}
GPU_MEMORY_TYPES: Set[MemoryType] = {
    MemoryType.CUPY,
    MemoryType.TORCH,
    MemoryType.TENSORFLOW,
    MemoryType.JAX,
    MemoryType.PYCLESPERANTO,
}
# Union of the two groups above.
SUPPORTED_MEMORY_TYPES: Set[MemoryType] = CPU_MEMORY_TYPES | GPU_MEMORY_TYPES

# String values (rather than enum members) for validation.
VALID_MEMORY_TYPES = {memory_type.value for memory_type in MemoryType}
VALID_GPU_MEMORY_TYPES = {memory_type.value for memory_type in GPU_MEMORY_TYPES}

# Memory type constants for direct access
MEMORY_TYPE_NUMPY = MemoryType.NUMPY.value
MEMORY_TYPE_CUPY = MemoryType.CUPY.value
MEMORY_TYPE_TORCH = MemoryType.TORCH.value
MEMORY_TYPE_TENSORFLOW = MemoryType.TENSORFLOW.value
MEMORY_TYPE_JAX = MemoryType.JAX.value
MEMORY_TYPE_PYCLESPERANTO = MemoryType.PYCLESPERANTO.value

DEFAULT_NUM_WORKERS = 1