Coverage for openhcs/io/virtual_workspace.py: 54.5%

150 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1"""Virtual Workspace Backend - Symlink-free workspace using metadata mapping.""" 

2 

3import logging 

4import json 

5from pathlib import Path 

6from typing import Any, Dict, List, Optional, Set, Union 

7from fnmatch import fnmatch 

8 

9from openhcs.io.disk import DiskStorageBackend 

10from openhcs.io.metadata_writer import get_metadata_path 

11from openhcs.io.exceptions import StorageResolutionError 

12from openhcs.io.base import ReadOnlyBackend 

13 

# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)

15 

16 

class VirtualWorkspaceBackend(ReadOnlyBackend):
    """
    Read-only path translation layer for virtual workspace.

    Maps virtual filenames to real plate files using workspace_mapping from
    openhcs_metadata.json (plate-relative paths), then delegates I/O to DiskStorageBackend.

    This is NOT a storage backend - it's a path resolver. It does not support save operations.

    Follows OMERO backend pattern:
    - Explicit initialization with plate_root
    - Fail-loud path resolution
    - No path inspection or 'workspace' searching

    Uses PLATE-RELATIVE paths (no workspace directory):
    - Mapping: {"Images/r01c01f05.tif": "Images/r01c01f01.tif"}
    - Resolution: plate_root / "Images/r01c01f05.tif" → plate_root / "Images/r01c01f01.tif"

    Example:
        backend = VirtualWorkspaceBackend(plate_root=Path("/data/plate"))
        # Input: plate_root / "Images/r01c01f05.tif" (doesn't exist)
        # Resolves to: plate_root / "Images/r01c01f01.tif" (exists)
    """

    _backend_type = 'virtual_workspace'  # Auto-registers via metaclass

    def __init__(self, plate_root: Path):
        """
        Initialize with explicit plate root.

        Args:
            plate_root: Path to plate directory containing openhcs_metadata.json

        Raises:
            FileNotFoundError: If metadata file doesn't exist
            ValueError: If no workspace_mapping in metadata
        """
        self.plate_root = Path(plate_root)
        self.disk_backend = DiskStorageBackend()
        self._mapping_cache: Optional[Dict[str, str]] = None
        self._cache_mtime: Optional[float] = None

        # Load mapping eagerly - fail loud if metadata missing
        self._load_mapping()

    @staticmethod
    def _normalize_relative_path(path_str: str) -> str:
        """
        Normalize relative path for internal mapping lookups.

        Converts Windows backslashes to forward slashes and normalizes
        '.' (current directory) to empty string for root directory.

        Args:
            path_str: Relative path string to normalize

        Returns:
            Normalized path string with forward slashes, empty string for root
        """
        normalized = path_str.replace('\\', '/')
        return '' if normalized == '.' else normalized

    def _relative_str(self, path: Union[str, Path]) -> str:
        """
        Convert a path to a plate-relative string with forward slashes.

        Every lookup goes through this helper so that all methods interpret
        the same input the same way. The input is always parsed with
        ``Path``, which drops a trailing slash and a leading ``./``.

        Args:
            path: Absolute path under plate_root, or a relative path

        Returns:
            Plate-relative string; a path that is not under plate_root is
            returned as given (only separators are normalized)
        """
        path_obj = Path(path)
        try:
            relative = path_obj.relative_to(self.plate_root)
        except ValueError:
            # Already relative, or rooted elsewhere - use as given
            relative = path_obj
        return str(relative).replace('\\', '/')

    def _get_mapping(self) -> Dict[str, str]:
        """
        Return the cached mapping, loading it first if nothing is cached.

        Note: this does not re-check the metadata file's mtime; a warm cache
        is returned as-is. Call _load_mapping() to revalidate against disk.
        """
        if self._mapping_cache is None:
            return self._load_mapping()
        return self._mapping_cache

    def _load_mapping(self) -> Dict[str, str]:
        """
        Load workspace_mapping from metadata with mtime-based caching.

        The file is re-read only when its mtime differs from the one recorded
        at the previous load.

        Returns:
            Combined mapping from all subdirectories

        Raises:
            FileNotFoundError: If metadata file doesn't exist
            ValueError: If no workspace_mapping in metadata
        """
        metadata_path = get_metadata_path(self.plate_root)
        if not metadata_path.exists():
            raise FileNotFoundError(
                f"Metadata not found: {metadata_path}\n"
                f"Plate root: {self.plate_root}"
            )

        # Check cache with mtime invalidation
        current_mtime = metadata_path.stat().st_mtime
        if self._mapping_cache is not None and self._cache_mtime == current_mtime:
            return self._mapping_cache

        # Explicit encoding: do not depend on the platform's locale default
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)

        # Combine mappings from all subdirectories; later entries win on key clashes
        combined_mapping: Dict[str, str] = {}
        for subdir_data in metadata.get('subdirectories', {}).values():
            # `or {}` tolerates an explicit null workspace_mapping
            combined_mapping.update(subdir_data.get('workspace_mapping') or {})

        if not combined_mapping:
            raise ValueError(
                f"No workspace_mapping in {metadata_path}\n"
                f"Plate root: {self.plate_root}\n"
                f"This is not a virtual workspace."
            )

        # Cache it
        self._mapping_cache = combined_mapping
        self._cache_mtime = current_mtime

        logger.info(f"Loaded {len(combined_mapping)} mappings for {self.plate_root}")
        return combined_mapping

    def _resolve_path(self, path: Union[str, Path]) -> str:
        """
        Resolve virtual path to real plate path using plate-relative mapping.

        Pure mapping-based resolution - no physical path fallbacks.
        Follows OMERO backend pattern: all paths go through mapping.

        Args:
            path: Absolute or relative path (e.g., "/data/plate/Images/r01c01f05.tif" or "Images/r01c01f05.tif")

        Returns:
            Real absolute path: e.g., "/data/plate/Images/r01c01f01.tif"

        Raises:
            StorageResolutionError: If path not in mapping
        """
        relative_str = self._relative_str(path)
        mapping = self._get_mapping()

        # Resolve via mapping - fail loud if not in mapping
        if relative_str not in mapping:
            raise StorageResolutionError(
                f"Path not in virtual workspace mapping: {relative_str}\n"
                f"Plate root: {self.plate_root}\n"
                f"Available virtual paths: {len(mapping)}\n"
                f"This path must be accessed through the virtual workspace mapping."
            )

        real_relative = mapping[relative_str]
        real_absolute = self.plate_root / real_relative
        logger.debug(f"Resolved virtual → real: {relative_str} → {real_relative}")
        return str(real_absolute)

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """Load file from virtual workspace."""
        real_path = self._resolve_path(file_path)
        return self.disk_backend.load(real_path, **kwargs)

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """Load multiple files from virtual workspace."""
        real_paths = [self._resolve_path(fp) for fp in file_paths]
        return self.disk_backend.load_batch(real_paths, **kwargs)

    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False,
                   **kwargs) -> List[str]:
        """
        List files in directory (returns absolute paths of virtual files).

        Returns absolute virtual paths from mapping that match the directory.

        Args:
            directory: Absolute path under plate_root, or plate-relative path
            pattern: Optional fnmatch pattern applied to the file name only
            extensions: Optional set of suffixes compared against Path.suffix
                (i.e. including the leading dot, case-sensitive)
            recursive: If True, include files in nested subdirectories

        Returns:
            Sorted list of absolute virtual paths

        Raises:
            FileNotFoundError, ValueError: From _load_mapping() if the mapping
                has to be loaded and the metadata is missing or has no mapping
        """
        relative_dir_str = self._normalize_relative_path(self._relative_str(directory))
        mapping = self._get_mapping()

        logger.info(f"VirtualWorkspace.list_files called: directory={directory}, recursive={recursive}, pattern={pattern}, extensions={extensions}")
        logger.info(f"  plate_root={self.plate_root}")
        logger.info(f"  relative_dir_str='{relative_dir_str}'")
        logger.info(f"  mapping has {len(mapping)} entries")

        dir_prefix = relative_dir_str + '/'

        # Filter paths in this directory
        results = []
        for virtual_relative in mapping:
            vpath = Path(virtual_relative)

            if recursive:
                # Empty relative_dir_str is the plate root: everything matches.
                # Otherwise keep the entry itself or anything below the prefix.
                if (relative_dir_str
                        and virtual_relative != relative_dir_str
                        and not virtual_relative.startswith(dir_prefix)):
                    continue
            else:
                # Non-recursive: the immediate parent must be the directory
                vpath_parent = self._normalize_relative_path(str(vpath.parent))
                if vpath_parent != relative_dir_str:
                    continue

            # Apply filters
            if pattern and not fnmatch(vpath.name, pattern):
                continue
            if extensions and vpath.suffix not in extensions:
                continue

            # Return absolute path
            results.append(str(self.plate_root / virtual_relative))

        logger.info(f"  VirtualWorkspace.list_files returning {len(results)} files")
        if not results and mapping:
            # Log first few mapping keys to help debug
            sample_keys = list(mapping)[:3]
            logger.info(f"  Sample mapping keys: {sample_keys}")
            if not recursive and relative_dir_str == '':
                sample_parents = [str(Path(k).parent).replace('\\', '/') for k in sample_keys]
                logger.info(f"  Sample parent dirs: {sample_parents}")
                logger.info(f"  Expected parent to match: '{relative_dir_str}'")

        return sorted(results)

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """
        List directory entries (names only, not full paths).

        For virtual workspace, this returns the unique immediate child names
        (files and directories) that exist in the mapping under the given path.

        Args:
            path: Absolute path under plate_root, or plate-relative path

        Returns:
            Sorted list of entry names; empty if nothing is mapped under path

        Raises:
            FileNotFoundError: If path is absolute and not under plate_root
        """
        path = Path(path)

        # Unlike the other accessors, an absolute path outside the plate is an error here
        if path.is_absolute():
            try:
                path.relative_to(self.plate_root)
            except ValueError:
                raise FileNotFoundError(f"Path not under plate root: {path}")

        relative_str = self._normalize_relative_path(self._relative_str(path))
        mapping = self._get_mapping()
        prefix = relative_str + '/' if relative_str else ''

        # Collect all unique directory/file names under this path
        entries = set()
        for virtual_relative in mapping:
            if not virtual_relative.startswith(prefix):
                continue
            # First component after the prefix is the immediate child
            first_component = virtual_relative[len(prefix):].split('/', 1)[0]
            if first_component:
                entries.add(first_component)

        return sorted(entries)

    def exists(self, path: Union[str, Path]) -> bool:
        """Check if virtual path exists (file in mapping or directory containing files)."""
        relative_str = self._normalize_relative_path(self._relative_str(path))
        mapping = self._get_mapping()

        # For root directory (empty string), check if mapping has any files
        if relative_str == '':
            return bool(mapping)

        # File in mapping or directory prefix
        prefix = relative_str + '/'
        return relative_str in mapping or any(vp.startswith(prefix) for vp in mapping)

    def is_file(self, path: Union[str, Path]) -> bool:
        """Check if virtual path is a file (exists in mapping directly)."""
        return self._relative_str(path) in self._get_mapping()

    def is_dir(self, path: Union[str, Path]) -> bool:
        """Check if virtual path is a directory (has files under it)."""
        relative_str = self._normalize_relative_path(self._relative_str(path))
        mapping = self._get_mapping()

        if not relative_str:
            # Root is always a directory if mapping exists
            return bool(mapping)

        # Directory if any virtual path starts with this prefix
        prefix = relative_str + '/'
        return any(vp.startswith(prefix) for vp in mapping)

350