Coverage for openhcs/io/pipeline_migration.py: 8.1%

150 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2OpenHCS Pipeline Migration Utilities 

3 

4This module provides utilities to migrate old OpenHCS pipeline files that contain 

5legacy enum values to the new variable component system. 

6 

7The migration handles: 

8- Converting old string-based GroupBy enum values to new VariableComponents-based values 

9- Preserving all other step attributes and functionality 

10- Creating atomic backups during migration 

11- Detecting legacy pipeline format automatically 

12 

13Usage: 

14 from openhcs.io.pipeline_migration import migrate_pipeline_file, detect_legacy_pipeline 

15  

16 # Check if migration is needed 

17 if detect_legacy_pipeline(steps): 

18 success = migrate_pipeline_file(pipeline_path) 

19""" 

20 

import copy
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

import dill as pickle

25 

26logger = logging.getLogger(__name__) 

27 

28 

def detect_legacy_pipeline(steps: List[Any]) -> bool:
    """
    Detect if pipeline contains legacy enum values that need migration.

    Args:
        steps: List of pipeline steps

    Returns:
        True if legacy format detected, False otherwise
    """
    try:
        for step in steps:
            # A plain string group_by means the step predates the enum system
            if isinstance(getattr(step, 'group_by', None), str):
                logger.debug(f"Legacy string group_by detected: {step.group_by}")
                return True

            # Likewise, any string inside variable_components marks legacy data
            for component in getattr(step, 'variable_components', None) or ():
                if isinstance(component, str):
                    logger.debug(f"Legacy string variable_component detected: {component}")
                    return True
        return False
    except Exception as e:
        # Detection must never crash callers on malformed steps - report and
        # fall back to "not legacy", matching the original behavior.
        logger.warning(f"Error detecting legacy pipeline format: {e}")
        return False

58 

59 

def create_migration_mapping(enum_class) -> Dict[str, Any]:
    """
    Create migration mapping from enum using clean functional approach.
    Single source of truth for all migration mappings.

    Maps lowercase legacy strings (member names and values, with and without
    underscores) to their enum members; '' and 'none' map to a NONE member
    when the enum defines one.
    """
    # Seed with the NONE special cases when the enum provides that member.
    if hasattr(enum_class, 'NONE'):
        result = {'': enum_class.NONE, 'none': enum_class.NONE}
    else:
        result = {}

    # Later entries win, so generated variations override the seed mapping
    # on key collisions (same precedence as the original merged dicts).
    for member in enum_class:
        if member.value is None:
            continue
        for variation in _generate_string_variations(member):
            result[variation] = member

    return result


def _generate_string_variations(enum_member):
    """Generate string variations for enum member - clean and functional.

    Yields the member's name and value, each lowercased, both with and
    without underscores (order: name, name-sans-underscore, value,
    value-sans-underscore).
    """
    variations = []
    for base in (enum_member.name, enum_member.value):
        for variant in (base, base.replace('_', '')):
            variations.append(variant.lower())
    return variations

87 

88 

def migrate_legacy_group_by(group_by_value: Any) -> Any:
    """Migrate a legacy string group_by value to its GroupBy enum member.

    Non-string values (already-migrated enum members, None, etc.) are
    returned unchanged. Unrecognized strings fall back to GroupBy.NONE
    with a warning rather than raising.
    """
    if not isinstance(group_by_value, str):
        return group_by_value

    from openhcs.constants.constants import GroupBy

    migration_map = create_migration_mapping(GroupBy)
    migrated_value = migration_map.get(group_by_value.lower())

    # Compare against None explicitly: a falsy-but-valid enum member must
    # still count as a successful lookup (a bare truthiness test would
    # misreport it as missing).
    if migrated_value is not None:
        logger.debug(f"Migrated group_by: '{group_by_value}' -> {migrated_value}")
        return migrated_value

    logger.warning(f"Legacy group_by '{group_by_value}' not available - using NONE")
    return GroupBy.NONE

105 

106 

def migrate_legacy_variable_components(variable_components: List[Any]) -> List[Any]:
    """Migrate legacy string variable components to enum members.

    Strings are looked up in the VariableComponents migration mapping;
    unrecognized strings are dropped with a warning. Non-string entries
    (already enums) are kept as-is. Empty/None input is returned unchanged.
    """
    if not variable_components:
        return variable_components

    from openhcs.constants.constants import VariableComponents

    migration_map = create_migration_mapping(VariableComponents)

    migrated = []
    for comp in variable_components:
        if not isinstance(comp, str):
            # Already an enum - keep as-is
            migrated.append(comp)
            continue

        migrated_comp = migration_map.get(comp.lower())
        # Explicit None check: a falsy-but-valid enum member must not be
        # mistaken for a failed lookup and silently dropped from the list.
        if migrated_comp is not None:
            logger.debug(f"Migrated variable_component: '{comp}' -> {migrated_comp}")
            migrated.append(migrated_comp)
        else:
            logger.warning(f"Legacy variable_component '{comp}' not available - skipping")

    return migrated

131 

132 

def migrate_pipeline_steps(steps: List[Any]) -> List[Any]:
    """
    Migrate pipeline steps from legacy format to new enum structure.

    Args:
        steps: List of pipeline steps to migrate

    Returns:
        List of migrated pipeline steps. Each returned step is a shallow
        copy, so the caller's original step objects are left unmodified.
    """
    migrated_steps = []

    for step in steps:
        # Shallow-copy so migration never mutates the caller's objects.
        # (The previous implementation aliased the original step, which
        # contradicted its own "create a copy" comment.)
        migrated_step = copy.copy(step)

        # Migrate group_by if present
        if getattr(step, 'group_by', None) is not None:
            migrated_step.group_by = migrate_legacy_group_by(step.group_by)

        # Migrate variable_components if present (non-empty)
        if getattr(step, 'variable_components', None):
            migrated_step.variable_components = migrate_legacy_variable_components(step.variable_components)

        migrated_steps.append(migrated_step)

    return migrated_steps

160 

161 

def migrate_pipeline_file(pipeline_path: Path, backup_suffix: str = ".backup") -> bool:
    """
    Migrate a pipeline file from legacy format to new enum structure.

    The original file is renamed to a backup before the migrated pipeline
    is written; on a write failure the backup is renamed back into place.

    Args:
        pipeline_path: Path to pipeline file
        backup_suffix: Suffix for backup file

    Returns:
        True if migration was needed and successful, False otherwise
    """
    if not pipeline_path.exists():
        logger.error(f"Pipeline file not found: {pipeline_path}")
        return False

    # Load existing pipeline
    try:
        with open(pipeline_path, 'rb') as fh:
            steps = pickle.load(fh)
    except Exception as exc:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {exc}")
        return False

    if not isinstance(steps, list):
        logger.error(f"Invalid pipeline format in {pipeline_path}: expected list, got {type(steps)}")
        return False

    # Nothing to do unless legacy values are actually present
    if not detect_legacy_pipeline(steps):
        logger.info(f"Pipeline file {pipeline_path} is already in new format - no migration needed")
        return False

    logger.info(f"Legacy format detected in {pipeline_path}")

    # Perform migration in memory before touching the file
    try:
        migrated_steps = migrate_pipeline_steps(steps)
    except Exception as exc:
        logger.error(f"Failed to migrate pipeline: {exc}")
        return False

    # Move the original aside as a backup before rewriting in place.
    # with_suffix(suffix + backup_suffix) nets out to appending backup_suffix.
    backup_path = pipeline_path.with_suffix(f"{pipeline_path.suffix}{backup_suffix}")
    try:
        pipeline_path.rename(backup_path)
    except OSError as exc:
        logger.error(f"Failed to create backup: {exc}")
        return False
    logger.info(f"Created backup: {backup_path}")

    # Write migrated pipeline; restore the backup if writing fails
    try:
        with open(pipeline_path, 'wb') as fh:
            pickle.dump(migrated_steps, fh)
    except Exception as exc:
        logger.error(f"Failed to write migrated pipeline: {exc}")
        try:
            backup_path.rename(pipeline_path)
            logger.info("Restored original file from backup")
        except OSError:
            logger.error(f"Failed to restore backup - original file is at {backup_path}")
        return False

    logger.info(f"Successfully migrated pipeline file: {pipeline_path}")
    return True

227 

228 

class LegacyGroupByUnpickler(pickle.Unpickler):
    """
    Custom unpickler that handles legacy GroupBy enum values during deserialization.

    This unpickler intercepts the creation of GroupBy enum instances and converts
    legacy string values to the new VariableComponents-based structure.
    """

    def find_class(self, module, name):
        """Override find_class to handle GroupBy enum migration.

        Returns a migrating wrapper in place of the real GroupBy class so
        legacy string values are converted as they are deserialized; every
        other class resolves normally.
        """
        # Get the original class
        cls = super().find_class(module, name)

        # If this is the GroupBy enum, wrap it with migration logic
        if name == 'GroupBy' and module == 'openhcs.constants.constants':
            return self._create_migrating_groupby_class(cls)

        return cls

    def _create_migrating_groupby_class(self, original_groupby_class):
        """Build a callable stand-in for GroupBy that migrates legacy values."""

        class MigratingGroupBy:
            """Wrapper that migrates legacy string values using clean mapping."""

            def __new__(cls, value):
                # If it's already a GroupBy enum, return it as-is
                if hasattr(value, '__class__') and value.__class__.__name__ == 'GroupBy':
                    return value

                # Handle legacy string values
                if isinstance(value, str):
                    from openhcs.constants.constants import GroupBy

                    # Use same clean migration mapping
                    migration_map = create_migration_mapping(GroupBy)
                    migrated_value = migration_map.get(value.lower())

                    # Explicit None check: a falsy-but-valid enum member must
                    # not be mistaken for a failed lookup (consistent with
                    # migrate_legacy_group_by).
                    if migrated_value is not None:
                        logger.debug(f"Unpickler migrated: '{value}' -> {migrated_value}")
                        return migrated_value

                    logger.warning(f"Unpickler: '{value}' not available - using NONE")
                    return GroupBy.NONE

                # Fallback for other types
                try:
                    return original_groupby_class(value)
                except ValueError:
                    logger.warning(f"Failed to create GroupBy from value: {value}")
                    from openhcs.constants.constants import GroupBy
                    return GroupBy.NONE

        return MigratingGroupBy

283 

284 

def load_pipeline_with_migration(pipeline_path: Path) -> Optional[List[Any]]:
    """
    Load pipeline file with automatic migration if needed.

    This is the main function that should be used by the PyQt GUI
    to load pipeline files with backward compatibility.

    Migration happens in memory only; the file on disk is not rewritten
    (use migrate_pipeline_file() to persist the migrated form).

    Args:
        pipeline_path: Path to pipeline file

    Returns:
        List of pipeline steps or None if loading failed
    """
    try:
        # The custom unpickler converts legacy GroupBy values on the fly
        with open(pipeline_path, 'rb') as handle:
            steps = LegacyGroupByUnpickler(handle).load()

        if not isinstance(steps, list):
            logger.error(f"Invalid pipeline format: expected list, got {type(steps)}")
            return None

        # Already in the new format - return as loaded
        if not detect_legacy_pipeline(steps):
            return steps

        logger.info(f"Migrating legacy pipeline format in {pipeline_path}")
        migrated = migrate_pipeline_steps(steps)
        logger.info("Pipeline migrated in-memory. Use migrate_pipeline_file() to save changes.")
        return migrated

    except Exception as exc:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {exc}")
        return None