# Coverage report artifact: openhcs/io/pipeline_migration.py — 9.0% of 152 statements covered
# (coverage.py v7.10.7, report generated 2025-10-01 18:33 +0000)
1"""
2OpenHCS Pipeline Migration Utilities
4This module provides utilities to migrate old OpenHCS pipeline files that contain
5legacy enum values to the new variable component system.
7The migration handles:
8- Converting old string-based GroupBy enum values to new VariableComponents-based values
9- Preserving all other step attributes and functionality
10- Creating atomic backups during migration
11- Detecting legacy pipeline format automatically
13Usage:
14 from openhcs.io.pipeline_migration import migrate_pipeline_file, detect_legacy_pipeline
16 # Check if migration is needed
17 if detect_legacy_pipeline(steps):
18 success = migrate_pipeline_file(pipeline_path)
19"""
21import logging
22import sys
23from pathlib import Path
24from typing import Any, List, Dict, Optional
25import dill as pickle
26import io
28logger = logging.getLogger(__name__)
def detect_legacy_pipeline(steps: List[Any]) -> bool:
    """
    Report whether any step still carries legacy string-based enum values.

    A step is considered legacy when its ``group_by`` is a plain string or
    any entry of its ``variable_components`` list is a plain string.

    Args:
        steps: List of pipeline steps

    Returns:
        True if legacy format detected, False otherwise
    """
    try:
        for candidate in steps:
            # Legacy group_by: attribute present, non-None, and still a string.
            group_by = getattr(candidate, 'group_by', None)
            if group_by is not None and isinstance(group_by, str):
                logger.debug(f"Legacy string group_by detected: {group_by}")
                return True

            # Legacy variable_components: any string entry marks the pipeline.
            components = getattr(candidate, 'variable_components', None)
            if components:
                for entry in components:
                    if isinstance(entry, str):
                        logger.debug(f"Legacy string variable_component detected: {entry}")
                        return True

        return False
    except Exception as exc:
        logger.warning(f"Error detecting legacy pipeline format: {exc}")
        return False
def create_migration_mapping(enum_class) -> Dict[str, Any]:
    """
    Build the lowercase-string -> enum-member migration table for *enum_class*.

    This is the single source of truth for all migration mappings: every
    member (except those whose value is None) is reachable through the
    variations produced by _generate_string_variations(). If the enum has a
    NONE member, '' and 'none' map to it as well; member-derived variations
    take precedence on key collisions.
    """
    table: Dict[str, Any] = {}

    # Special cases for a NONE sentinel member, when the enum defines one.
    if hasattr(enum_class, 'NONE'):
        table[''] = enum_class.NONE
        table['none'] = enum_class.NONE

    # Member variations are added last so they win over the NONE aliases,
    # matching the {**mapping, **variations} merge order.
    for member in enum_class:
        if member.value is None:
            continue
        for alias in _generate_string_variations(member):
            table[alias] = member

    return table
81def _generate_string_variations(enum_member):
82 """Generate string variations for enum member - clean and functional."""
83 base_strings = [enum_member.name, enum_member.value]
84 return [
85 variant.lower()
86 for base in base_strings
87 for variant in [base, base.replace('_', '')]
88 ]
def migrate_legacy_group_by(group_by_value: Any) -> Any:
    """
    Migrate a legacy string group_by value to its GroupBy enum equivalent.

    Non-string values (already-migrated enums, None, etc.) pass through
    unchanged. Unrecognized strings fall back to GroupBy.NONE with a warning.

    Args:
        group_by_value: Value loaded from a legacy pipeline (str or enum).

    Returns:
        A GroupBy enum member, or the input unchanged if it was not a string.
    """
    if not isinstance(group_by_value, str):
        return group_by_value

    # Imported lazily so this module stays importable without pulling in
    # the constants package at import time.
    from openhcs.constants.constants import GroupBy

    migration_map = create_migration_mapping(GroupBy)
    migrated_value = migration_map.get(group_by_value.lower())

    # Explicit None check: enum members can be falsy (e.g. a custom __bool__),
    # and plain truthiness would misclassify such a valid mapping as missing.
    if migrated_value is not None:
        logger.debug(f"Migrated group_by: '{group_by_value}' -> {migrated_value}")
        return migrated_value

    logger.warning(f"Legacy group_by '{group_by_value}' not available - using NONE")
    return GroupBy.NONE
def migrate_legacy_variable_components(variable_components: List[Any]) -> List[Any]:
    """
    Migrate legacy string variable components to VariableComponents enums.

    Strings are resolved through the shared migration mapping; entries that
    are already enums are kept as-is. Unrecognized strings are dropped with a
    warning. Falsy input (None or an empty list) is returned unchanged.

    Args:
        variable_components: Components loaded from a legacy pipeline.

    Returns:
        List of migrated components, or the input unchanged when falsy.
    """
    if not variable_components:
        return variable_components

    # Lazy import: keeps the module importable without the constants package.
    from openhcs.constants.constants import VariableComponents

    migration_map = create_migration_mapping(VariableComponents)

    migrated = []
    for comp in variable_components:
        if not isinstance(comp, str):
            # Already an enum - keep as-is.
            migrated.append(comp)
            continue

        migrated_comp = migration_map.get(comp.lower())
        # Explicit None check: enum members can be falsy, and plain truthiness
        # would wrongly drop a successfully migrated component.
        if migrated_comp is not None:
            logger.debug(f"Migrated variable_component: '{comp}' -> {migrated_comp}")
            migrated.append(migrated_comp)
        else:
            logger.warning(f"Legacy variable_component '{comp}' not available - skipping")

    return migrated
def migrate_pipeline_steps(steps: List[Any]) -> List[Any]:
    """
    Migrate pipeline steps from legacy format to new enum structure.

    Each step is shallow-copied before its attributes are rewritten, so the
    caller's original step objects are left untouched. (The previous
    implementation only aliased the step — ``migrated_step = step`` — and
    mutated the originals in place, contradicting its own comment.)

    Args:
        steps: List of pipeline steps to migrate

    Returns:
        List of migrated pipeline steps (shallow copies of the inputs)
    """
    import copy

    migrated_steps = []

    for step in steps:
        # Shallow copy so the attribute reassignments below do not mutate
        # the caller's original step object.
        migrated_step = copy.copy(step)

        # Migrate group_by if present
        if hasattr(step, 'group_by') and step.group_by is not None:
            migrated_step.group_by = migrate_legacy_group_by(step.group_by)

        # Migrate variable_components if present
        if hasattr(step, 'variable_components') and step.variable_components:
            migrated_step.variable_components = migrate_legacy_variable_components(step.variable_components)

        migrated_steps.append(migrated_step)

    return migrated_steps
def migrate_pipeline_file(pipeline_path: Path, backup_suffix: str = ".backup") -> bool:
    """
    Migrate a pipeline file from legacy format to new enum structure.

    The original file is first renamed to a backup, then the migrated steps
    are written to the original path; if that write fails the backup is
    renamed back, so the user is never left without a pipeline file.

    Args:
        pipeline_path: Path to pipeline file
        backup_suffix: Suffix for backup file

    Returns:
        True if migration was needed and successful, False otherwise
    """
    if not pipeline_path.exists():
        logger.error(f"Pipeline file not found: {pipeline_path}")
        return False

    # Load existing pipeline (pickle is dill here — see module imports).
    try:
        with open(pipeline_path, 'rb') as f:
            steps = pickle.load(f)
    except Exception as e:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {e}")
        return False

    if not isinstance(steps, list):
        logger.error(f"Invalid pipeline format in {pipeline_path}: expected list, got {type(steps)}")
        return False

    # Check if migration is needed; already-migrated files are left untouched.
    if not detect_legacy_pipeline(steps):
        logger.info(f"Pipeline file {pipeline_path} is already in new format - no migration needed")
        return False

    logger.info(f"Legacy format detected in {pipeline_path}")

    # Perform migration in memory before touching the file.
    try:
        migrated_steps = migrate_pipeline_steps(steps)
    except Exception as e:
        logger.error(f"Failed to migrate pipeline: {e}")
        return False

    # Create backup by renaming, e.g. "foo.pipeline" -> "foo.pipeline.backup".
    # Renaming (not copying) guarantees the backup is the untouched original.
    backup_file = pipeline_path.with_suffix(f"{pipeline_path.suffix}{backup_suffix}")
    try:
        pipeline_path.rename(backup_file)
        logger.info(f"Created backup: {backup_file}")
    except OSError as e:
        logger.error(f"Failed to create backup: {e}")
        return False

    # Write migrated pipeline to the original path.
    try:
        with open(pipeline_path, 'wb') as f:
            pickle.dump(migrated_steps, f)
        logger.info(f"Successfully migrated pipeline file: {pipeline_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to write migrated pipeline: {e}")
        # Restore backup so the original file is recovered on write failure.
        try:
            backup_file.rename(pipeline_path)
            logger.info(f"Restored original file from backup")
        except OSError:
            logger.error(f"Failed to restore backup - original file is at {backup_file}")
        return False
class LegacyGroupByUnpickler(pickle.Unpickler):
    """
    Custom unpickler that handles legacy GroupBy enum values during deserialization.

    find_class() intercepts lookups of the GroupBy enum and substitutes a
    wrapper class whose constructor converts legacy string values to current
    enum members via the shared migration mapping.
    """

    def find_class(self, module, name):
        """Resolve *module.name*, swapping in a migrating wrapper for GroupBy."""
        # Get the original class from the default resolution machinery.
        cls = super().find_class(module, name)

        # Only the GroupBy enum from the constants module needs migration.
        if name == 'GroupBy' and module == 'openhcs.constants.constants':
            return self._create_migrating_groupby_class(cls)

        return cls

    def _create_migrating_groupby_class(self, original_groupby_class):
        """
        Build a GroupBy stand-in whose constructor migrates legacy values.

        Args:
            original_groupby_class: The real GroupBy class as resolved by
                the default unpickler machinery.

        Returns:
            A class whose __new__ returns GroupBy members, migrating legacy
            strings and passing through values that are already GroupBy.
        """

        class MigratingGroupBy:
            """Wrapper that migrates legacy string values using clean mapping."""

            def __new__(cls, value):
                # If it's already a GroupBy enum, return it as-is.
                # (Checked by class name: the real class may be a different
                # import of the same enum.)
                if hasattr(value, '__class__') and value.__class__.__name__ == 'GroupBy':
                    return value

                # Handle legacy string values via the shared migration table.
                if isinstance(value, str):
                    from openhcs.constants.constants import GroupBy

                    migration_map = create_migration_mapping(GroupBy)
                    migrated_value = migration_map.get(value.lower())

                    # Explicit None check for consistency with
                    # migrate_legacy_group_by(): enum members may be falsy.
                    if migrated_value is not None:
                        logger.debug(f"Unpickler migrated: '{value}' -> {migrated_value}")
                        return migrated_value

                    logger.warning(f"Unpickler: '{value}' not available - using NONE")
                    return GroupBy.NONE

                # Fallback for other types: defer to the real constructor.
                try:
                    return original_groupby_class(value)
                except ValueError:
                    logger.warning(f"Failed to create GroupBy from value: {value}")
                    from openhcs.constants.constants import GroupBy
                    return GroupBy.NONE

        return MigratingGroupBy
def load_pipeline_with_migration(pipeline_path: Path) -> Optional[List[Any]]:
    """
    Load pipeline file with automatic migration if needed.

    Main entry point for the PyQt GUI to load pipeline files with backward
    compatibility. The file on disk is never modified here; any migration
    happens purely in memory (use migrate_pipeline_file() to persist it).

    Args:
        pipeline_path: Path to pipeline file

    Returns:
        List of pipeline steps or None if loading failed
    """
    try:
        # The custom unpickler migrates legacy GroupBy values as they are read.
        with open(pipeline_path, 'rb') as handle:
            steps = LegacyGroupByUnpickler(handle).load()

        if not isinstance(steps, list):
            logger.error(f"Invalid pipeline format: expected list, got {type(steps)}")
            return None

        # Already in the new format: hand the steps back untouched.
        if not detect_legacy_pipeline(steps):
            return steps

        logger.info(f"Migrating legacy pipeline format in {pipeline_path}")

        # Migrate in-memory only; the file is left as-is unless the caller
        # explicitly persists via migrate_pipeline_file().
        migrated_steps = migrate_pipeline_steps(steps)
        logger.info(f"Pipeline migrated in-memory. Use migrate_pipeline_file() to save changes.")
        return migrated_steps

    except Exception as e:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {e}")
        return None