Coverage for openhcs/io/pipeline_migration.py: 8.1%
150 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
"""
OpenHCS Pipeline Migration Utilities

This module provides utilities to migrate old OpenHCS pipeline files that contain
legacy enum values to the new variable component system.

The migration handles:
- Converting old string-based GroupBy enum values to new VariableComponents-based values
- Preserving all other step attributes and functionality
- Creating atomic backups during migration
- Detecting legacy pipeline format automatically

Usage:
    from openhcs.io.pipeline_migration import migrate_pipeline_file, detect_legacy_pipeline

    # Check if migration is needed
    if detect_legacy_pipeline(steps):
        success = migrate_pipeline_file(pipeline_path)
"""
import copy
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

import dill as pickle
26logger = logging.getLogger(__name__)
29def detect_legacy_pipeline(steps: List[Any]) -> bool:
30 """
31 Detect if pipeline contains legacy enum values that need migration.
33 Args:
34 steps: List of pipeline steps
36 Returns:
37 True if legacy format detected, False otherwise
38 """
39 try:
40 for step in steps:
41 # Check if step has group_by attribute with string value
42 if hasattr(step, 'group_by') and step.group_by is not None:
43 if isinstance(step.group_by, str):
44 logger.debug(f"Legacy string group_by detected: {step.group_by}")
45 return True
47 # Check variable_components for string values
48 if hasattr(step, 'variable_components') and step.variable_components:
49 for component in step.variable_components:
50 if isinstance(component, str):
51 logger.debug(f"Legacy string variable_component detected: {component}")
52 return True
54 return False
55 except Exception as e:
56 logger.warning(f"Error detecting legacy pipeline format: {e}")
57 return False
def create_migration_mapping(enum_class) -> Dict[str, Any]:
    """
    Build a lookup table mapping legacy string spellings to enum members.

    For every member whose value is not None, the table contains the
    lower-cased member name and member value, each both with and without
    underscores. When the enum defines a NONE member, the empty string and
    'none' also map to it. Single source of truth for all migration mappings.

    Args:
        enum_class: The enum type to build the mapping for.

    Returns:
        Dict from lower-case legacy string to the corresponding enum member.
    """
    mapping: Dict[str, Any] = {}
    if hasattr(enum_class, 'NONE'):
        # Legacy pipelines sometimes stored the "no grouping" case as an
        # empty string or the literal word 'none'.
        mapping[''] = enum_class.NONE
        mapping['none'] = enum_class.NONE

    for member in enum_class:
        if member.value is None:
            continue  # e.g. a NONE sentinel whose value carries no string
        for base in (member.name, member.value):
            mapping[base.lower()] = member
            mapping[base.replace('_', '').lower()] = member

    return mapping
79def _generate_string_variations(enum_member):
80 """Generate string variations for enum member - clean and functional."""
81 base_strings = [enum_member.name, enum_member.value]
82 return [
83 variant.lower()
84 for base in base_strings
85 for variant in [base, base.replace('_', '')]
86 ]
89def migrate_legacy_group_by(group_by_value: Any) -> Any:
90 """Clean migration using single mapping source."""
91 if not isinstance(group_by_value, str):
92 return group_by_value
94 from openhcs.constants.constants import GroupBy
96 migration_map = create_migration_mapping(GroupBy)
97 migrated_value = migration_map.get(group_by_value.lower())
99 if migrated_value:
100 logger.debug(f"Migrated group_by: '{group_by_value}' -> {migrated_value}")
101 return migrated_value
103 logger.warning(f"Legacy group_by '{group_by_value}' not available - using NONE")
104 return GroupBy.NONE
107def migrate_legacy_variable_components(variable_components: List[Any]) -> List[Any]:
108 """Clean migration for variable components using functional approach."""
109 if not variable_components:
110 return variable_components
112 from openhcs.constants.constants import VariableComponents
114 migration_map = create_migration_mapping(VariableComponents)
116 # Functional approach using list comprehension
117 migrated = []
118 for comp in variable_components:
119 if isinstance(comp, str):
120 migrated_comp = migration_map.get(comp.lower())
121 if migrated_comp:
122 logger.debug(f"Migrated variable_component: '{comp}' -> {migrated_comp}")
123 migrated.append(migrated_comp)
124 else:
125 logger.warning(f"Legacy variable_component '{comp}' not available - skipping")
126 else:
127 # Already an enum - keep as-is
128 migrated.append(comp)
130 return migrated
133def migrate_pipeline_steps(steps: List[Any]) -> List[Any]:
134 """
135 Migrate pipeline steps from legacy format to new enum structure.
137 Args:
138 steps: List of pipeline steps to migrate
140 Returns:
141 List of migrated pipeline steps
142 """
143 migrated_steps = []
145 for step in steps:
146 # Create a copy of the step to avoid modifying the original
147 migrated_step = step
149 # Migrate group_by if present
150 if hasattr(step, 'group_by') and step.group_by is not None:
151 migrated_step.group_by = migrate_legacy_group_by(step.group_by)
153 # Migrate variable_components if present
154 if hasattr(step, 'variable_components') and step.variable_components:
155 migrated_step.variable_components = migrate_legacy_variable_components(step.variable_components)
157 migrated_steps.append(migrated_step)
159 return migrated_steps
def migrate_pipeline_file(pipeline_path: Path, backup_suffix: str = ".backup") -> bool:
    """
    Migrate a pipeline file from legacy format to the new enum structure.

    The original file is renamed to a backup before the migrated steps are
    written back; if the write fails, the backup is restored.

    Args:
        pipeline_path: Path to pipeline file
        backup_suffix: Suffix for backup file

    Returns:
        True if migration was needed and successful, False otherwise
    """
    if not pipeline_path.exists():
        logger.error(f"Pipeline file not found: {pipeline_path}")
        return False

    # Load the existing pipeline from disk.
    try:
        with open(pipeline_path, 'rb') as handle:
            steps = pickle.load(handle)
    except Exception as e:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {e}")
        return False

    if not isinstance(steps, list):
        logger.error(f"Invalid pipeline format in {pipeline_path}: expected list, got {type(steps)}")
        return False

    # Nothing to do when the file already uses the new enum format.
    if not detect_legacy_pipeline(steps):
        logger.info(f"Pipeline file {pipeline_path} is already in new format - no migration needed")
        return False

    logger.info(f"Legacy format detected in {pipeline_path}")

    # Perform the in-memory migration first; do not touch the file on failure.
    try:
        migrated = migrate_pipeline_steps(steps)
    except Exception as e:
        logger.error(f"Failed to migrate pipeline: {e}")
        return False

    # Move the original aside so it survives as a backup.
    backup_file = pipeline_path.with_suffix(f"{pipeline_path.suffix}{backup_suffix}")
    try:
        pipeline_path.rename(backup_file)
    except OSError as e:
        logger.error(f"Failed to create backup: {e}")
        return False
    logger.info(f"Created backup: {backup_file}")

    # Write the migrated pipeline to the original location.
    try:
        with open(pipeline_path, 'wb') as handle:
            pickle.dump(migrated, handle)
    except Exception as e:
        logger.error(f"Failed to write migrated pipeline: {e}")
        # Put the original file back so no data is lost.
        try:
            backup_file.rename(pipeline_path)
            logger.info("Restored original file from backup")
        except OSError:
            logger.error(f"Failed to restore backup - original file is at {backup_file}")
        return False

    logger.info(f"Successfully migrated pipeline file: {pipeline_path}")
    return True
class LegacyGroupByUnpickler(pickle.Unpickler):
    """
    Custom unpickler that handles legacy GroupBy enum values during deserialization.

    This unpickler intercepts the creation of GroupBy enum instances and converts
    legacy string values to the new VariableComponents-based structure.
    """

    def find_class(self, module, name):
        """Override find_class to handle GroupBy enum migration.

        Called by the pickle machinery to resolve each (module, name) pair
        referenced in the stream; every lookup except GroupBy is passed
        through untouched.
        """
        # Get the original class
        cls = super().find_class(module, name)

        # If this is the GroupBy enum, wrap it with migration logic
        if name == 'GroupBy' and module == 'openhcs.constants.constants':
            return self._create_migrating_groupby_class(cls)

        return cls

    def _create_migrating_groupby_class(self, original_groupby_class):
        """Clean unpickler using single migration mapping source.

        Returns a stand-in class whose __new__ migrates legacy string
        values; the pickle stream calls it in place of GroupBy when
        reconstructing enum members.
        """

        class MigratingGroupBy:
            """Wrapper that migrates legacy string values using clean mapping."""

            def __new__(cls, value):
                # If it's already a GroupBy enum, return it as-is.
                # Class-name comparison (rather than isinstance) avoids
                # importing GroupBy before it is actually needed.
                if hasattr(value, '__class__') and value.__class__.__name__ == 'GroupBy':
                    return value

                # Handle legacy string values
                if isinstance(value, str):
                    from openhcs.constants.constants import GroupBy

                    # Use same clean migration mapping
                    migration_map = create_migration_mapping(GroupBy)
                    migrated_value = migration_map.get(value.lower())

                    # NOTE(review): truthiness check - a falsy enum member
                    # would fall through to the NONE fallback below;
                    # `is not None` would be safer - confirm GroupBy members
                    # are always truthy.
                    if migrated_value:
                        logger.debug(f"Unpickler migrated: '{value}' -> {migrated_value}")
                        return migrated_value

                    logger.warning(f"Unpickler: '{value}' not available - using NONE")
                    return GroupBy.NONE

                # Fallback for other types: defer to the real enum constructor.
                try:
                    return original_groupby_class(value)
                except ValueError:
                    logger.warning(f"Failed to create GroupBy from value: {value}")
                    from openhcs.constants.constants import GroupBy
                    return GroupBy.NONE

        return MigratingGroupBy
def load_pipeline_with_migration(pipeline_path: Path) -> Optional[List[Any]]:
    """
    Load a pipeline file, transparently migrating legacy enum values.

    This is the main entry point for the PyQt GUI to load pipeline files
    with backward compatibility. Any migration happens in-memory only; the
    file on disk is left untouched (use migrate_pipeline_file() to persist).

    Args:
        pipeline_path: Path to pipeline file

    Returns:
        List of pipeline steps or None if loading failed
    """
    try:
        # The custom unpickler converts legacy GroupBy values on the fly.
        with open(pipeline_path, 'rb') as handle:
            steps = LegacyGroupByUnpickler(handle).load()

        if not isinstance(steps, list):
            logger.error(f"Invalid pipeline format: expected list, got {type(steps)}")
            return None

        # Already in the new format - return as loaded.
        if not detect_legacy_pipeline(steps):
            return steps

        # Legacy values remain; migrate in-memory without touching the file.
        logger.info(f"Migrating legacy pipeline format in {pipeline_path}")
        migrated_steps = migrate_pipeline_steps(steps)
        logger.info("Pipeline migrated in-memory. Use migrate_pipeline_file() to save changes.")
        return migrated_steps

    except Exception as e:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {e}")
        return None