Coverage for openhcs/constants/constants.py: 77.6%
241 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
"""
Consolidated constants for OpenHCS.

This module defines all constants related to backends, defaults, I/O, memory, and pipeline.
These constants are governed by various doctrinal clauses.

Caching:
- Component enums (AllComponents, VariableComponents, GroupBy) are cached persistently
- Cache invalidated on OpenHCS version change or after 7 days
- Provides ~20x speedup on subsequent runs and in subprocesses
"""
import logging
from enum import Enum
from functools import lru_cache
from typing import Any, Callable, Dict, Set, Tuple, Type, TypeVar
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
class Microscope(Enum):
    """Microscope / data-source identifiers; each value is the identifier string."""
    AUTO = "auto"
    OPENHCS = "openhcs"  # Added for the OpenHCS pre-processed format
    IMAGEXPRESS = "ImageXpress"
    OPERAPHENIX = "OperaPhenix"
    OMERO = "omero"  # Added for OMERO virtual filesystem backend
class VirtualComponents(Enum):
    """
    Components that don't come from filename parsing but from execution/location context.

    SOURCE represents:
    - During pipeline execution: step_name (distinguishes pipeline steps)
    - When loading from disk: subdirectory name (distinguishes image sources)

    This unifies the step/source concept across Napari and Fiji viewers.
    """
    SOURCE = "source"  # Unified step/source component
def get_openhcs_config():
    """Return the default OpenHCS component configuration.

    The factory is imported inside the function rather than at module import
    time. Every call delegates to
    ``ComponentConfigurationFactory.create_openhcs_default_configuration()``.
    NOTE(review): whether that factory caches its result is not visible from
    this module - confirm before relying on object identity across calls.
    """
    from openhcs.components.framework import ComponentConfigurationFactory
    return ComponentConfigurationFactory.create_openhcs_default_configuration()
# Lazy import cache manager to avoid circular dependencies.
# Sentinel states: None = not initialised yet, False = initialisation failed
# (do not retry), anything else = the RegistryCacheManager instance.
_component_enum_cache_manager = None


def _get_component_enum_cache_manager():
    """Return the cache manager for component enums, creating it on first use.

    Returns:
        The ``RegistryCacheManager`` instance, or ``None`` when it could not be
        created (the failure is remembered, so the import is attempted once).
    """
    global _component_enum_cache_manager
    if _component_enum_cache_manager is None:
        try:
            from openhcs.core.registry_cache import RegistryCacheManager, CacheConfig

            def get_version():
                # Best effort: any failure to read the version yields "unknown".
                # ``except Exception`` (not a bare ``except``) so that
                # KeyboardInterrupt / SystemExit are never swallowed here.
                try:
                    import openhcs
                    return openhcs.__version__
                except Exception:
                    return "unknown"

            # Serializer for component enum data
            # Note: RegistryCacheManager calls serializer(item) for each item in the dict
            # We store all three enums as a single item with key 'enums'
            def serialize_component_enums(enum_data: Dict[str, Any]) -> Dict[str, Any]:
                """Serialize the three component enum dicts to JSON."""
                return enum_data  # Already a dict of dicts

            # Deserializer for component enum data
            def deserialize_component_enums(data: Dict[str, Any]) -> Dict[str, Any]:
                """Deserialize component enum data from JSON."""
                return data  # Already a dict of dicts

            _component_enum_cache_manager = RegistryCacheManager(
                cache_name="component_enums",
                version_getter=get_version,
                serializer=serialize_component_enums,
                deserializer=deserialize_component_enums,
                config=CacheConfig(
                    max_age_days=7,
                    check_mtimes=False,  # No file tracking needed for config-based enums
                ),
            )
        except Exception as e:
            logger.debug(f"Failed to initialize component enum cache manager: {e}")
            _component_enum_cache_manager = False  # Mark as failed to avoid retrying

    # Translate the internal "failed" sentinel into None for callers.
    return _component_enum_cache_manager if _component_enum_cache_manager is not False else None
95def _add_groupby_methods(GroupBy: Enum) -> Enum:
96 """Add custom methods to GroupBy enum."""
97 GroupBy.component = property(lambda self: self.value)
98 GroupBy.__eq__ = lambda self, other: self.value == getattr(other, 'value', other)
99 GroupBy.__hash__ = lambda self: hash("GroupBy.NONE") if self.value is None else hash(self.value)
100 GroupBy.__str__ = lambda self: f"GroupBy.{self.name}"
101 GroupBy.__repr__ = lambda self: f"GroupBy.{self.name}"
102 return GroupBy
def _build_component_enum(enum_name, members):
    """Create one component enum that reports this module as its home.

    ``module`` and ``qualname`` are handed to the functional ``Enum`` API so the
    class gets ``__module__ == __name__`` and ``__qualname__ == enum_name``.
    """
    return Enum(enum_name, members, module=__name__, qualname=enum_name)


def _enums_from_member_dicts(all_members, variable_members, group_by_members):
    """Build the (AllComponents, VariableComponents, GroupBy) triple from name->value dicts."""
    all_components = _build_component_enum('AllComponents', all_members)
    vc = _build_component_enum('VariableComponents', variable_members)
    group_by = _add_groupby_methods(_build_component_enum('GroupBy', group_by_members))
    return all_components, vc, group_by


# Simple lazy initialization - just defer the config call
@lru_cache(maxsize=1)
def _create_enums():
    """Create the component enums on first use, with persistent caching.

    CRITICAL: the enums must carry proper ``__module__`` and ``__qualname__``
    attributes so they can be pickled correctly in multiprocessing contexts.
    This function only builds and returns them; publishing them into the module
    namespace is done by the module-level ``__getattr__``.

    The member dicts are first looked up in the persistent cache (when a cache
    manager is available). On a miss, or on any cache error, they are derived
    from ``get_openhcs_config()`` and written back to the cache on a best-effort
    basis. The original author's note reports a ~20x speedup from the cache.

    Returns:
        Tuple ``(AllComponents, VariableComponents, GroupBy)`` of enum classes.
        ``lru_cache(maxsize=1)`` makes every later call return the same tuple.
    """
    import os
    import traceback

    pid = os.getpid()
    logger.info("🔧 _create_enums() CALLED in process %s", pid)
    logger.info("🔧 _create_enums() cache_info: %s", _create_enums.cache_info())
    logger.info("🔧 _create_enums() STACK TRACE:\n%s", ''.join(traceback.format_stack()))

    # Try to load from persistent cache first
    cache_manager = _get_component_enum_cache_manager()
    if cache_manager:
        try:
            cached_dict = cache_manager.load_cache()
            if cached_dict is not None and 'enums' in cached_dict:
                # Cache hit - reconstruct enums from cached data
                cached_data = cached_dict['enums']
                logger.debug("✅ Loading component enums from cache")
                enums = _enums_from_member_dicts(
                    cached_data['all_components'],
                    cached_data['variable_components'],
                    cached_data['group_by'],
                )
                logger.info("🔧 _create_enums() LOADED FROM CACHE in process %s", pid)
                return enums
        except Exception as e:
            # A broken or incomplete cache entry must never block start-up;
            # fall through and rebuild from the configuration instead.
            logger.debug("Cache load failed for component enums: %s", e)

    # Cache miss or disabled - create enums from config
    config = get_openhcs_config()
    remaining = config.get_remaining_components()

    # AllComponents: ALL possible dimensions (including multiprocessing axis)
    all_components_dict = {c.name: c.value for c in config.all_components}
    # VariableComponents: Components available for variable selection (excludes multiprocessing axis)
    vc_dict = {c.name: c.value for c in remaining}
    # GroupBy: Same as VariableComponents + NONE option (they're the same concept).
    # Derived from vc_dict so `remaining` is consumed only once, which also
    # works if it happens to be a one-shot iterable.
    gb_dict = {**vc_dict, 'NONE': None}

    all_components, vc, group_by = _enums_from_member_dicts(
        all_components_dict, vc_dict, gb_dict
    )

    # Save to persistent cache
    # Store all three enums as a single item with key 'enums'
    if cache_manager:
        try:
            enum_data = {
                'all_components': all_components_dict,
                'variable_components': vc_dict,
                'group_by': gb_dict,
            }
            cache_manager.save_cache({'enums': enum_data})
            logger.debug("💾 Saved component enums to cache")
        except Exception as e:
            logger.debug("Failed to save component enum cache: %s", e)

    logger.info(
        "🔧 _create_enums() RETURNING in process %s: "
        "AllComponents=%s, VariableComponents=%s, GroupBy=%s",
        pid, id(all_components), id(vc), id(group_by),
    )
    logger.info("🔧 _create_enums() cache_info after return: %s", _create_enums.cache_info())
    return all_components, vc, group_by
@lru_cache(maxsize=1)
def _create_streaming_components():
    """Create StreamingComponents enum combining AllComponents + VirtualComponents.

    This enum includes both filename components (from parser) and virtual components
    (from execution/location context) for streaming visualization.

    Returns:
        The ``StreamingComponents`` enum class. ``lru_cache(maxsize=1)`` makes
        every later call return the same class object.
    """
    import os

    logger.info("🔧 _create_streaming_components() CALLED in process %s", os.getpid())

    # Import AllComponents (triggers lazy creation if needed)
    from openhcs.constants import AllComponents

    # Combine all component types; on a name clash the virtual component wins.
    components_dict = {
        **{c.name: c.value for c in AllComponents},
        **{c.name: c.value for c in VirtualComponents},
    }

    # module/qualname are set at creation so the class reports this module as its home.
    streaming_components = Enum(
        'StreamingComponents',
        components_dict,
        module=__name__,
        qualname='StreamingComponents',
    )

    logger.info(
        "🔧 _create_streaming_components() RETURNING: StreamingComponents=%s",
        id(streaming_components),
    )
    return streaming_components
def __getattr__(name):
    """Lazy enum creation with identity guarantee.

    Invoked for module attributes that are not found the normal way. The enums
    are built once via the ``lru_cache``-wrapped factories and stored in
    ``globals()``, so later lookups bypass this hook and always see the same
    class objects (needed for pickle identity checks in multiprocessing).

    Args:
        name: The attribute being looked up on this module.

    Returns:
        The requested enum class for ``AllComponents``, ``VariableComponents``,
        ``GroupBy`` or ``StreamingComponents``.

    Raises:
        AttributeError: For any other name.
    """
    module_globals = globals()

    if name in ('AllComponents', 'VariableComponents', 'GroupBy'):
        # Check if already created (handles race conditions)
        if name in module_globals:
            return module_globals[name]

        import os
        pid = os.getpid()
        logger.info("🔧 ENUM CREATION: Creating %s in process %s", name, pid)

        # Create all enums at once and store in globals
        all_components, vc, gb = _create_enums()
        module_globals['AllComponents'] = all_components
        module_globals['VariableComponents'] = vc
        module_globals['GroupBy'] = gb

        logger.info(
            "🔧 ENUM CREATION: Created enums in process %s: "
            "AllComponents=%s, VariableComponents=%s, GroupBy=%s",
            pid, id(all_components), id(vc), id(gb),
        )
        logger.info(
            "🔧 ENUM CREATION: VariableComponents.__module__=%s, __qualname__=%s",
            vc.__module__, vc.__qualname__,
        )
        return module_globals[name]

    if name == 'StreamingComponents':
        # Check if already created
        if name in module_globals:
            return module_globals[name]

        import os
        pid = os.getpid()
        logger.info("🔧 ENUM CREATION: Creating StreamingComponents in process %s", pid)

        streaming_components = _create_streaming_components()
        module_globals['StreamingComponents'] = streaming_components

        logger.info(
            "🔧 ENUM CREATION: Created StreamingComponents in process %s: "
            "StreamingComponents=%s",
            pid, id(streaming_components),
        )
        return module_globals[name]

    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
# Documentation URL
DOCUMENTATION_URL = "https://openhcs.readthedocs.io/en/latest/"
class OrchestratorState(Enum):
    """Simple orchestrator state tracking - no complex state machine."""
    CREATED = "created"                # Object exists, not initialized
    READY = "ready"                    # Initialized, ready for compilation
    COMPILED = "compiled"              # Compilation complete, ready for execution
    EXECUTING = "executing"            # Execution in progress
    COMPLETED = "completed"            # Execution completed successfully
    INIT_FAILED = "init_failed"        # Initialization failed
    COMPILE_FAILED = "compile_failed"  # Compilation failed (implies initialized)
    EXEC_FAILED = "exec_failed"        # Execution failed (implies compiled)
# I/O-related constants
DEFAULT_IMAGE_EXTENSION = ".tif"
# Both lower- and upper-case spellings are listed explicitly.
DEFAULT_IMAGE_EXTENSIONS: Set[str] = {".tif", ".tiff", ".TIF", ".TIFF"}
DEFAULT_SITE_PADDING = 3
DEFAULT_RECURSIVE_PATTERN_SEARCH = False
# Lazy default resolution using lru_cache
@lru_cache(maxsize=1)
def get_default_variable_components():
    """Get default variable components from ComponentConfiguration.

    Returns:
        A list of ``VariableComponents`` members, one per entry in the
        configuration's ``default_variable`` (matched by member name).
        The list is cached by ``lru_cache``: every call returns the same list
        object, so callers should treat it as read-only.
    """
    _, vc, _ = _create_enums()  # Get the enum directly
    return [getattr(vc, c.name) for c in get_openhcs_config().default_variable]
@lru_cache(maxsize=1)
def get_default_group_by():
    """Get default group_by from ComponentConfiguration.

    Returns:
        The ``GroupBy`` member whose name matches the configuration's
        ``default_group_by``, or ``None`` when that setting is falsy.
        The result is cached by ``lru_cache`` after the first call.
    """
    _, _, gb = _create_enums()  # Get the enum directly
    config = get_openhcs_config()
    if not config.default_group_by:
        return None
    return getattr(gb, config.default_group_by.name)
@lru_cache(maxsize=1)
def get_multiprocessing_axis():
    """Get multiprocessing axis from ComponentConfiguration.

    Returns:
        The configuration's ``multiprocessing_axis`` attribute, cached by
        ``lru_cache`` after the first call.
    """
    return get_openhcs_config().multiprocessing_axis
# Microscope used when none is specified.
DEFAULT_MICROSCOPE: Microscope = Microscope.AUTO
# Backend-related constants
class Backend(Enum):
    """Backend identifiers; each value is the identifier string."""
    AUTO = "auto"
    DISK = "disk"
    MEMORY = "memory"
    ZARR = "zarr"
    NAPARI_STREAM = "napari_stream"
    FIJI_STREAM = "fiji_stream"
    OMERO_LOCAL = "omero_local"
    VIRTUAL_WORKSPACE = "virtual_workspace"
class FileFormat(Enum):
    """File formats; each member's value is the list of extensions for that format."""
    # sorted() rather than list(): iterating a set of strings can yield a
    # different order in every interpreter run (string hash randomization),
    # which made TIFF.value's element order non-deterministic.
    TIFF = sorted(DEFAULT_IMAGE_EXTENSIONS)
    NUMPY = [".npy"]
    TORCH = [".pt", ".torch", ".pth"]
    JAX = [".jax"]
    CUPY = [".cupy", ".craw"]
    TENSORFLOW = [".tf"]
    JSON = [".json"]
    CSV = [".csv"]
    TEXT = [".txt", ".py", ".md"]
    ROI = [".roi.zip"]
DEFAULT_BACKEND = Backend.MEMORY
# String keys/flags for disk I/O requirements and backend selection.
REQUIRES_DISK_READ = "requires_disk_read"
REQUIRES_DISK_WRITE = "requires_disk_write"
FORCE_DISK_WRITE = "force_disk_write"
READ_BACKEND = "read_backend"
WRITE_BACKEND = "write_backend"
# Default values
DEFAULT_TILE_OVERLAP = 10.0
DEFAULT_MAX_SHIFT = 50
DEFAULT_MARGIN_RATIO = 0.1
DEFAULT_PIXEL_SIZE = 1.0
DEFAULT_ASSEMBLER_LOG_LEVEL = "INFO"
DEFAULT_INTERPOLATION_MODE = "nearest"
DEFAULT_INTERPOLATION_ORDER = 1
# Consolidated definition for CPU thread count
DEFAULT_CPU_THREAD_COUNT = 4
DEFAULT_PATCH_SIZE = 128
DEFAULT_SEARCH_RADIUS = 20
# ZMQ transport constants
# Note: Streaming port defaults are defined in NapariStreamingConfig and FijiStreamingConfig
CONTROL_PORT_OFFSET = 1000  # Control port = data port + 1000
DEFAULT_EXECUTION_SERVER_PORT = 7777
IPC_SOCKET_DIR_NAME = "ipc"  # ~/.openhcs/ipc/
IPC_SOCKET_PREFIX = "openhcs-zmq"  # ipc://openhcs-zmq-{port} or ~/.openhcs/ipc/openhcs-zmq-{port}.sock
IPC_SOCKET_EXTENSION = ".sock"  # Unix domain socket extension
# Memory-related constants
T = TypeVar('T')
# Alias for a one-argument callable taking and returning any value.
ConversionFunc = Callable[[Any], Any]
class MemoryType(Enum):
    """Memory type identifiers; each value is the identifier string."""
    NUMPY = "numpy"
    CUPY = "cupy"
    TORCH = "torch"
    TENSORFLOW = "tensorflow"
    JAX = "jax"
    PYCLESPERANTO = "pyclesperanto"

    @property
    def converter(self):
        """Get the converter instance for this memory type.

        Looked up in ``_CONVERTERS`` keyed by this member; the helper module is
        imported on access rather than at module import time.
        """
        from openhcs.core.memory.conversion_helpers import _CONVERTERS
        return _CONVERTERS[self]
# Auto-generate to_X() methods on enum
def _add_conversion_methods():
    """Add to_X() conversion methods to MemoryType enum.

    For every member ``X`` a method ``to_<X.value>(self, data, gpu_id)`` is
    attached to ``MemoryType``. It forwards to the method of the same name on
    ``self.converter``.
    """
    def make_method(target):
        # Factory so each generated method is bound to its own target;
        # a closure over the loop variable would see only the last member.
        attr_name = f"to_{target.value}"

        def method(self, data, gpu_id):
            return getattr(self.converter, attr_name)(data, gpu_id)

        # Real name instead of "method" so tracebacks and introspection are readable.
        method.__name__ = attr_name
        return method

    for target_type in MemoryType:
        setattr(MemoryType, f"to_{target_type.value}", make_method(target_type))


_add_conversion_methods()
CPU_MEMORY_TYPES: Set[MemoryType] = {MemoryType.NUMPY}
GPU_MEMORY_TYPES: Set[MemoryType] = {
    MemoryType.CUPY,
    MemoryType.TORCH,
    MemoryType.TENSORFLOW,
    MemoryType.JAX,
    MemoryType.PYCLESPERANTO,
}
# Union of the CPU and GPU sets above.
SUPPORTED_MEMORY_TYPES: Set[MemoryType] = CPU_MEMORY_TYPES | GPU_MEMORY_TYPES

# String values (rather than enum members) of the memory types.
VALID_MEMORY_TYPES = {mt.value for mt in MemoryType}
VALID_GPU_MEMORY_TYPES = {mt.value for mt in GPU_MEMORY_TYPES}
# Memory type constants for direct access
MEMORY_TYPE_NUMPY = MemoryType.NUMPY.value
MEMORY_TYPE_CUPY = MemoryType.CUPY.value
MEMORY_TYPE_TORCH = MemoryType.TORCH.value
MEMORY_TYPE_TENSORFLOW = MemoryType.TENSORFLOW.value
MEMORY_TYPE_JAX = MemoryType.JAX.value
MEMORY_TYPE_PYCLESPERANTO = MemoryType.PYCLESPERANTO.value
# Default number of workers.
DEFAULT_NUM_WORKERS = 1