Coverage for openhcs/io/pipeline_migration.py: 9.0%

152 statements  

coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2OpenHCS Pipeline Migration Utilities 

3 

4This module provides utilities to migrate old OpenHCS pipeline files that contain 

5legacy enum values to the new variable component system. 

6 

7The migration handles: 

8- Converting old string-based GroupBy enum values to new VariableComponents-based values 

9- Preserving all other step attributes and functionality 

10- Creating atomic backups during migration 

11- Detecting legacy pipeline format automatically 

12 

13Usage: 

14 from openhcs.io.pipeline_migration import migrate_pipeline_file, detect_legacy_pipeline 

15  

16 # Check if migration is needed 

17 if detect_legacy_pipeline(steps): 

18 success = migrate_pipeline_file(pipeline_path) 

19""" 

import logging
import sys
from pathlib import Path
from typing import Any, List, Dict, Optional
import dill as pickle
import io

logger = logging.getLogger(__name__)



def detect_legacy_pipeline(steps: List[Any]) -> bool:
    """
    Detect if pipeline contains legacy enum values that need migration.

    Args:
        steps: List of pipeline steps

    Returns:
        True if legacy format detected, False otherwise
    """
    try:
        for step in steps:
            # Check if step has group_by attribute with string value
            if hasattr(step, 'group_by') and step.group_by is not None:
                if isinstance(step.group_by, str):
                    logger.debug(f"Legacy string group_by detected: {step.group_by}")
                    return True

            # Check variable_components for string values
            if hasattr(step, 'variable_components') and step.variable_components:
                for component in step.variable_components:
                    if isinstance(component, str):
                        logger.debug(f"Legacy string variable_component detected: {component}")
                        return True

        return False
    except Exception as e:
        logger.warning(f"Error detecting legacy pipeline format: {e}")
        return False
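
# Illustrative note (the step shape and string values are hypothetical): a step
# whose group_by is a raw string such as "channel" instead of a GroupBy member,
# or whose variable_components list contains raw strings, is treated as legacy
# and triggers migration.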



def create_migration_mapping(enum_class) -> Dict[str, Any]:
    """
    Create migration mapping from enum using clean functional approach.
    Single source of truth for all migration mappings.
    """
    # Special cases for NONE enum
    mapping = {'': enum_class.NONE, 'none': enum_class.NONE} if hasattr(enum_class, 'NONE') else {}

    # Generate all variations using dict comprehension - Pythonic and clean
    variations = {
        variation: member
        for member in enum_class
        if member.value is not None
        for variation in _generate_string_variations(member)
    }

    return {**mapping, **variations}



def _generate_string_variations(enum_member):
    """Generate string variations for enum member - clean and functional."""
    base_strings = [enum_member.name, enum_member.value]
    return [
        variant.lower()
        for base in base_strings
        for variant in [base, base.replace('_', '')]
    ]
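
# Illustrative sketch of the resulting mapping (the member shown here is
# hypothetical, not necessarily an actual GroupBy/VariableComponents member):
# a member named Z_INDEX with value "z_index" yields the lowercase variations
# ["z_index", "zindex", "z_index", "zindex"], so create_migration_mapping()
# maps both "z_index" and "zindex" back to that member.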



def migrate_legacy_group_by(group_by_value: Any) -> Any:
    """Clean migration using single mapping source."""
    if not isinstance(group_by_value, str):
        return group_by_value

    from openhcs.constants.constants import GroupBy

    migration_map = create_migration_mapping(GroupBy)
    migrated_value = migration_map.get(group_by_value.lower())

    if migrated_value:
        logger.debug(f"Migrated group_by: '{group_by_value}' -> {migrated_value}")
        return migrated_value

    logger.warning(f"Legacy group_by '{group_by_value}' not available - using NONE")
    return GroupBy.NONE
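
# Example sketch (assumes GroupBy defines a CHANNEL member; the real members
# may differ):
#     migrate_legacy_group_by("channel")        # -> GroupBy.CHANNEL
#     migrate_legacy_group_by("unknown value")  # -> GroupBy.NONE (with warning)
#     migrate_legacy_group_by(GroupBy.CHANNEL)  # returned unchanged (not a str)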



def migrate_legacy_variable_components(variable_components: List[Any]) -> List[Any]:
    """Migrate legacy string variable components to VariableComponents enum members."""
    if not variable_components:
        return variable_components

    from openhcs.constants.constants import VariableComponents

    migration_map = create_migration_mapping(VariableComponents)

    # Map legacy strings to enum members; keep existing enum values and skip
    # strings that have no mapping.
    migrated = []
    for comp in variable_components:
        if isinstance(comp, str):
            migrated_comp = migration_map.get(comp.lower())
            if migrated_comp:
                logger.debug(f"Migrated variable_component: '{comp}' -> {migrated_comp}")
                migrated.append(migrated_comp)
            else:
                logger.warning(f"Legacy variable_component '{comp}' not available - skipping")
        else:
            # Already an enum - keep as-is
            migrated.append(comp)

    return migrated



def migrate_pipeline_steps(steps: List[Any]) -> List[Any]:
    """
    Migrate pipeline steps from legacy format to new enum structure.

    Args:
        steps: List of pipeline steps to migrate

    Returns:
        List of migrated pipeline steps
    """
    migrated_steps = []

    for step in steps:
        # Note: the step is migrated in place (no copy is made), so the returned
        # list contains the same step objects with updated attributes.
        migrated_step = step

        # Migrate group_by if present
        if hasattr(step, 'group_by') and step.group_by is not None:
            migrated_step.group_by = migrate_legacy_group_by(step.group_by)

        # Migrate variable_components if present
        if hasattr(step, 'variable_components') and step.variable_components:
            migrated_step.variable_components = migrate_legacy_variable_components(step.variable_components)

        migrated_steps.append(migrated_step)

    return migrated_steps



def migrate_pipeline_file(pipeline_path: Path, backup_suffix: str = ".backup") -> bool:
    """
    Migrate a pipeline file from legacy format to new enum structure.

    Args:
        pipeline_path: Path to pipeline file
        backup_suffix: Suffix for backup file

    Returns:
        True if migration was needed and successful, False otherwise
    """
    if not pipeline_path.exists():
        logger.error(f"Pipeline file not found: {pipeline_path}")
        return False

    # Load existing pipeline
    try:
        with open(pipeline_path, 'rb') as f:
            steps = pickle.load(f)
    except Exception as e:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {e}")
        return False

    if not isinstance(steps, list):
        logger.error(f"Invalid pipeline format in {pipeline_path}: expected list, got {type(steps)}")
        return False

    # Check if migration is needed
    if not detect_legacy_pipeline(steps):
        logger.info(f"Pipeline file {pipeline_path} is already in new format - no migration needed")
        return False

    logger.info(f"Legacy format detected in {pipeline_path}")

    # Perform migration
    try:
        migrated_steps = migrate_pipeline_steps(steps)
    except Exception as e:
        logger.error(f"Failed to migrate pipeline: {e}")
        return False

    # Create backup
    backup_file = pipeline_path.with_suffix(f"{pipeline_path.suffix}{backup_suffix}")
    try:
        pipeline_path.rename(backup_file)
        logger.info(f"Created backup: {backup_file}")
    except OSError as e:
        logger.error(f"Failed to create backup: {e}")
        return False

    # Write migrated pipeline
    try:
        with open(pipeline_path, 'wb') as f:
            pickle.dump(migrated_steps, f)
        logger.info(f"Successfully migrated pipeline file: {pipeline_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to write migrated pipeline: {e}")
        # Restore backup
        try:
            backup_file.rename(pipeline_path)
            logger.info("Restored original file from backup")
        except OSError:
            logger.error(f"Failed to restore backup - original file is at {backup_file}")
        return False
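
# Usage sketch (the file name is illustrative): migrate a saved pipeline on
# disk. On success the original is kept next to it with the backup suffix,
# e.g. "experiment.pipeline" -> "experiment.pipeline.backup".
#
#     if migrate_pipeline_file(Path("experiment.pipeline")):
#         print("pipeline rewritten in the new enum format")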



class LegacyGroupByUnpickler(pickle.Unpickler):
    """
    Custom unpickler that handles legacy GroupBy enum values during deserialization.

    This unpickler intercepts the creation of GroupBy enum instances and converts
    legacy string values to the new VariableComponents-based structure.
    """

    def find_class(self, module, name):
        """Override find_class to handle GroupBy enum migration."""
        # Get the original class
        cls = super().find_class(module, name)

        # If this is the GroupBy enum, wrap it with migration logic
        if name == 'GroupBy' and module == 'openhcs.constants.constants':
            return self._create_migrating_groupby_class(cls)

        return cls

    def _create_migrating_groupby_class(self, original_groupby_class):
        """Clean unpickler using single migration mapping source."""

        class MigratingGroupBy:
            """Wrapper that migrates legacy string values using clean mapping."""

            def __new__(cls, value):
                # If it's already a GroupBy enum, return it as-is
                if hasattr(value, '__class__') and value.__class__.__name__ == 'GroupBy':
                    return value

                # Handle legacy string values
                if isinstance(value, str):
                    from openhcs.constants.constants import GroupBy

                    # Use same clean migration mapping
                    migration_map = create_migration_mapping(GroupBy)
                    migrated_value = migration_map.get(value.lower())

                    if migrated_value:
                        logger.debug(f"Unpickler migrated: '{value}' -> {migrated_value}")
                        return migrated_value

                    logger.warning(f"Unpickler: '{value}' not available - using NONE")
                    return GroupBy.NONE

                # Fallback for other types
                try:
                    return original_groupby_class(value)
                except ValueError:
                    logger.warning(f"Failed to create GroupBy from value: {value}")
                    from openhcs.constants.constants import GroupBy
                    return GroupBy.NONE

        return MigratingGroupBy



def load_pipeline_with_migration(pipeline_path: Path) -> Optional[List[Any]]:
    """
    Load pipeline file with automatic migration if needed.

    This is the main function that should be used by the PyQt GUI
    to load pipeline files with backward compatibility.

    Args:
        pipeline_path: Path to pipeline file

    Returns:
        List of pipeline steps or None if loading failed
    """
    try:
        # Load pipeline using custom unpickler for enum migration
        with open(pipeline_path, 'rb') as f:
            unpickler = LegacyGroupByUnpickler(f)
            steps = unpickler.load()

        if not isinstance(steps, list):
            logger.error(f"Invalid pipeline format: expected list, got {type(steps)}")
            return None

        # Check if migration is needed
        if detect_legacy_pipeline(steps):
            logger.info(f"Migrating legacy pipeline format in {pipeline_path}")

            # Migrate in-memory (don't modify the file unless explicitly requested)
            migrated_steps = migrate_pipeline_steps(steps)

            # Optionally save the migrated version back to file
            # For now, just return the migrated steps without saving
            logger.info("Pipeline migrated in-memory. Use migrate_pipeline_file() to save changes.")
            return migrated_steps

        return steps

    except Exception as e:
        logger.error(f"Failed to load pipeline from {pipeline_path}: {e}")
        return None
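

# ---------------------------------------------------------------------------
# Minimal command-line sketch (illustrative; the paths are supplied by the
# caller and the module is normally used via load_pipeline_with_migration in
# the GUI). It migrates each pipeline file passed as an argument, using only
# the public helpers defined above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    for arg in sys.argv[1:]:
        migrated = migrate_pipeline_file(Path(arg))
        print(f"{arg}: {'migrated' if migrated else 'no migration performed'}")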