Coverage for openhcs/core/steps/function_step.py: 74.1%

623 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2FunctionStep implementation for pattern-based processing. 

3 

4This module contains the FunctionStep class. During execution, FunctionStep instances 

5are stateless regarding their configuration. All operational parameters, including 

6the function(s) to execute, special input/output keys, their VFS paths, and memory types, 

7are retrieved from this step's entry in `context.step_plans`. 

8""" 

9 

10import logging 

11import os 

12import time 

13from pathlib import Path 

14from typing import Any, Callable, Dict, List, Optional, Tuple, Union, OrderedDict as TypingOrderedDict, TYPE_CHECKING 

15 

16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true

17 pass 

18 

19 

20from openhcs.constants.constants import (DEFAULT_IMAGE_EXTENSIONS, 

21 Backend, 

22 VariableComponents) 

23from openhcs.core.context.processing_context import ProcessingContext 

24from openhcs.core.steps.abstract import AbstractStep 

25from openhcs.formats.func_arg_prep import prepare_patterns_and_functions 

26from openhcs.core.memory.stack_utils import stack_slices, unstack_slices 

27# OpenHCS imports moved to local imports to avoid circular dependencies 

28 

29 

30logger = logging.getLogger(__name__) 

31 

def _generate_materialized_paths(memory_paths: List[str], step_output_dir: Path, materialized_output_dir: Path) -> List[str]:
    """Generate materialized file paths by replacing the step output directory prefix."""
    materialized_paths = []
    for memory_path in memory_paths:
        relative_path = Path(memory_path).relative_to(step_output_dir)
        materialized_path = materialized_output_dir / relative_path
        materialized_paths.append(str(materialized_path))
    return materialized_paths
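
# Illustrative sketch (not part of the module; paths are hypothetical): the helper
# rewrites the directory prefix while preserving any nested structure under the
# step output directory.
#
#     >>> _generate_materialized_paths(
#     ...     ["/plate/step1_out/A01/img_c1.tif"],
#     ...     Path("/plate/step1_out"),
#     ...     Path("/plate/materialized"),
#     ... )
#     ['/plate/materialized/A01/img_c1.tif']
#
# Note that Path.relative_to() raises ValueError if a memory path does not live
# under step_output_dir, so callers must pass paths produced by the same step.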

def _filter_special_outputs_for_function(
    outputs_to_save: List[str],
    special_outputs_map: Dict,
    dict_key: str
) -> Dict:
    """Filter and build channel-specific paths for special outputs.

    Args:
        outputs_to_save: List of output keys this function should save
        special_outputs_map: Map of all special outputs for the step
        dict_key: Dict pattern key (e.g., "1" for channel 1, or "default")

    Returns:
        Filtered map with channel-specific paths for dict patterns
    """
    from openhcs.core.pipeline.path_planner import PipelinePathPlanner

    result = {}
    for key in outputs_to_save:
        if key in special_outputs_map:  # coverage: condition always true
            output_config = special_outputs_map[key].copy()

            # For dict patterns, build a channel-specific path
            if dict_key != "default":
                output_config['path'] = PipelinePathPlanner.build_dict_pattern_path(
                    output_config['path'], dict_key
                )

            result[key] = output_config

    return result
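
# Illustrative sketch of the filtering contract (keys and paths hypothetical):
# only keys listed in outputs_to_save survive, and for dict patterns the path is
# rewritten per channel by PipelinePathPlanner.build_dict_pattern_path. Assuming
# that planner derives a per-key variant of the path, a call might behave as:
#
#     special_outputs_map = {
#         "cell_counts": {"path": "/plate/results/cell_counts.pkl"},
#         "masks": {"path": "/plate/results/masks.pkl"},
#     }
#     filtered = _filter_special_outputs_for_function(
#         ["cell_counts"], special_outputs_map, dict_key="1"
#     )
#     # filtered == {"cell_counts": {"path": <channel-1 variant of the path>}}
#
# The .copy() on each config is shallow; it keeps the per-call path rewrite from
# mutating the step-level special_outputs_map shared across pattern groups.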

def _save_materialized_data(filemanager, memory_data: List, materialized_paths: List[str],
                            materialized_backend: str, step_plan: Dict, context, axis_id: str) -> None:
    """Save data to a materialized location using the appropriate backend."""

    # Build kwargs with parser metadata (all backends receive it)
    save_kwargs = {
        'parser_name': context.microscope_handler.parser.__class__.__name__,
        'microscope_type': context.microscope_handler.microscope_type
    }

    if materialized_backend == Backend.ZARR.value:
        n_channels, n_z, n_fields = _calculate_zarr_dimensions(materialized_paths, context.microscope_handler)
        row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
        save_kwargs.update({
            'chunk_name': axis_id,
            'zarr_config': step_plan.get("zarr_config"),
            'n_channels': n_channels,
            'n_z': n_z,
            'n_fields': n_fields,
            'row': row,
            'col': col
        })

    filemanager.save_batch(memory_data, materialized_paths, materialized_backend, **save_kwargs)
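
# Sketch of the kwargs contract implied above (values hypothetical): every backend
# receives the parser metadata, while zarr additionally gets plate-geometry hints
# so it can size and place its arrays:
#
#     filemanager.save_batch(
#         memory_data, materialized_paths, Backend.ZARR.value,
#         parser_name="ImageXpressFilenameParser",  # hypothetical parser class name
#         microscope_type="imagexpress",            # hypothetical
#         chunk_name="A01", zarr_config=step_plan.get("zarr_config"),
#         n_channels=2, n_z=5, n_fields=4, row="A", col="01",
#     )
#
# Keeping the zarr-only keys behind the Backend.ZARR check avoids passing
# arguments that a disk-style backend might reject.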

def get_all_image_paths(input_dir, backend, axis_id, filemanager, microscope_handler):
    """
    Get all image file paths for a specific well from a directory.

    Args:
        input_dir: Directory to search for images
        backend: Backend to use for file listing
        axis_id: Well identifier to filter files
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        List of full file paths for the well
    """
    from openhcs.constants import MULTIPROCESSING_AXIS

    # List all image files in the directory
    all_image_files = filemanager.list_image_files(str(input_dir), backend)

    # Filter by axis value using the parser rather than naive string matching,
    # which would let one well ID match inside another (e.g. "A1" inside "A11").
    axis_files = []
    parser = microscope_handler.parser
    axis_key = MULTIPROCESSING_AXIS.value  # dynamic multiprocessing axis instead of hardcoded 'well'

    for f in all_image_files:
        filename = os.path.basename(str(f))
        metadata = parser.parse_filename(filename)
        if metadata and metadata.get(axis_key) == axis_id:
            axis_files.append(str(f))

    # Remove duplicates and sort
    sorted_files = sorted(set(axis_files))

    # Prepare full file paths
    input_dir_path = Path(input_dir)
    full_file_paths = [str(input_dir_path / Path(f).name) for f in sorted_files]

    logger.debug(f"Found {len(all_image_files)} total files, {len(full_file_paths)} for axis {axis_id}")

    return full_file_paths
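
# Illustrative sketch of why parser-based filtering matters (filenames and parser
# output hypothetical): naive substring matching on the well ID would match "A1"
# inside "A11_s1_w1.tif", while parse_filename() returns structured metadata that
# can be compared exactly.
#
#     metadata = parser.parse_filename("A11_s1_w1.tif")
#     # e.g. {"well": "A11", "site": 1, "channel": 1}
#     metadata.get("well") == "A1"    # False - exact comparison, no false positive
#     "A1" in "A11_s1_w1.tif"         # True  - the bug this function avoids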

def create_image_path_getter(axis_id, filemanager, microscope_handler):
    """
    Create a specialized image path getter function using runtime context.

    Args:
        axis_id: Well identifier
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        Function that takes (input_dir, backend) and returns image paths for the well
    """
    def get_paths_for_axis(input_dir, backend):
        return get_all_image_paths(
            input_dir=input_dir,
            axis_id=axis_id,
            backend=backend,
            filemanager=filemanager,
            microscope_handler=microscope_handler
        )
    return get_paths_for_axis
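
# Usage sketch (variable names hypothetical): the returned closure pins the well
# and the runtime services, so later code only has to vary directory and backend.
#
#     get_paths = create_image_path_getter("A01", filemanager, microscope_handler)
#     input_paths  = get_paths(step_input_dir,  Backend.DISK.value)
#     output_paths = get_paths(step_output_dir, Backend.MEMORY.value)
#
# process() stores this closure in step_plan["get_paths_for_axis"] so streaming
# and materialization code can re-resolve paths without re-threading arguments.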

# Environment variable to disable universal GPU defragmentation
DISABLE_GPU_DEFRAG = os.getenv('OPENHCS_DISABLE_GPU_DEFRAG', 'false').lower() == 'true'

def _bulk_preload_step_images(
    step_input_dir: Path,
    step_output_dir: Path,
    axis_id: str,
    read_backend: str,
    patterns_by_well: Dict[str, Any],
    filemanager: 'FileManager',
    microscope_handler: 'MicroscopeHandler',
    zarr_config: Optional[Dict[str, Any]] = None
) -> None:
    """
    Pre-load all images for this step from the source backend into the memory backend.

    This reduces I/O overhead by doing a single bulk read operation
    instead of loading images per pattern group.

    Note: External conditional logic ensures this is only called for non-memory backends.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK PRELOAD: Loading images from {read_backend} to memory for well {axis_id}")

    # Create a specialized path getter for this well and collect its image paths
    get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
    full_file_paths = get_paths_for_axis(step_input_dir, read_backend)

    if not full_file_paths:  # coverage: condition never true
        raise RuntimeError(f"🔄 BULK PRELOAD: No files found for well {axis_id} in {step_input_dir} with backend {read_backend}")

    # Load from source backend with conditional zarr_config
    if read_backend == Backend.ZARR.value:
        raw_images = filemanager.load_batch(full_file_paths, read_backend, zarr_config=zarr_config)
    else:
        raw_images = filemanager.load_batch(full_file_paths, read_backend)

    # Ensure the directory exists in the memory backend before saving
    filemanager.ensure_directory(str(step_input_dir), Backend.MEMORY.value)

    # Delete any stale memory copies so the bulk save starts clean
    for file_path in full_file_paths:
        if filemanager.exists(file_path, Backend.MEMORY.value):
            filemanager.delete(file_path, Backend.MEMORY.value)
            logger.debug(f"🔄 BULK PRELOAD: Deleted existing file {file_path} before bulk preload")

    # Save all images to the memory backend in a single batch, under the same
    # input-directory paths they were loaded from
    filemanager.save_batch(raw_images, full_file_paths, Backend.MEMORY.value)

    # Clean up source references - keep only memory backend references
    del raw_images

    load_time = time.time() - start_time
    logger.debug(f"🔄 BULK PRELOAD: Completed in {load_time:.2f}s - {len(full_file_paths)} images now in memory")

def _bulk_writeout_step_images(
    step_output_dir: Path,
    write_backend: str,
    axis_id: str,
    zarr_config: Optional[Dict[str, Any]],
    filemanager: 'FileManager',
    microscope_handler: Optional[Any] = None
) -> None:
    """
    Write all processed images from memory to the final backend (disk/zarr).

    This reduces I/O overhead by doing a single bulk write operation
    instead of writing images per pattern group.

    Note: External conditional logic ensures this is only called for non-memory backends.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK WRITEOUT: Writing images from memory to {write_backend} for well {axis_id}")

    # Create specialized path getter and get memory paths for this well
    get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
    memory_file_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)

    if not memory_file_paths:
        raise RuntimeError(f"🔄 BULK WRITEOUT: No image files found for well {axis_id} in memory directory {step_output_dir}")

    # The memory backend stores the same absolute paths the target backend expects,
    # so the paths can be reused directly for the write
    file_paths = memory_file_paths
    logger.debug(f"🔄 BULK WRITEOUT: Found {len(file_paths)} image files in memory to write")

    # Load all data from the memory backend
    memory_data = filemanager.load_batch(file_paths, Backend.MEMORY.value)

    # Ensure the output directory exists before the bulk write
    filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)

    # Bulk write to target backend with conditional zarr_config
    if write_backend == Backend.ZARR.value:
        if microscope_handler is not None:
            # Calculate zarr dimensions from the file paths and parse the well into
            # row/column coordinates for the zarr plate structure
            n_channels, n_z, n_fields = _calculate_zarr_dimensions(file_paths, microscope_handler)
            row, col = microscope_handler.parser.extract_component_coordinates(axis_id)
            filemanager.save_batch(memory_data, file_paths, write_backend,
                                   chunk_name=axis_id, zarr_config=zarr_config,
                                   n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                   row=row, col=col)
        else:
            # Fallback without dimensions if microscope_handler is not available
            filemanager.save_batch(memory_data, file_paths, write_backend, chunk_name=axis_id, zarr_config=zarr_config)
    else:
        filemanager.save_batch(memory_data, file_paths, write_backend)

    write_time = time.time() - start_time
    logger.debug(f"🔄 BULK WRITEOUT: Completed in {write_time:.2f}s - {len(memory_data)} images written to {write_backend}")
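
# Sketch of the memory round-trip these two helpers implement (flow assumed from
# the code, variable names hypothetical): a step reads once from the source
# backend, processes everything against the in-memory copies, then flushes once
# to the destination backend.
#
#     _bulk_preload_step_images(in_dir, out_dir, "A01", Backend.DISK.value,
#                               patterns, filemanager, handler)      # disk -> memory
#     # ...per-pattern processing reads/writes Backend.MEMORY.value...
#     _bulk_writeout_step_images(out_dir, Backend.ZARR.value, "A01",
#                                zarr_config, filemanager, handler)  # memory -> zarr
#
# Keeping both ends as single load_batch/save_batch calls is what amortizes the
# per-file I/O cost across the whole well.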

def _calculate_zarr_dimensions(file_paths: List[Union[str, Path]], microscope_handler) -> tuple[int, int, int]:
    """
    Calculate zarr dimensions (n_channels, n_z, n_fields) from file paths using the microscope parser.

    Args:
        file_paths: List of file paths to analyze
        microscope_handler: Microscope handler with filename parser

    Returns:
        Tuple of (n_channels, n_z, n_fields)
    """
    parsed_files = []
    for file_path in file_paths:
        filename = Path(file_path).name
        metadata = microscope_handler.parser.parse_filename(filename)
        parsed_files.append(metadata)

    # Count unique values for each dimension from the actual files
    n_channels = len(set(f.get('channel') for f in parsed_files if f.get('channel') is not None))
    n_z = len(set(f.get('z_index') for f in parsed_files if f.get('z_index') is not None))
    n_fields = len(set(f.get('site') for f in parsed_files if f.get('site') is not None))

    # Ensure at least 1 for each dimension (handles cases where metadata is missing)
    n_channels = max(1, n_channels)
    n_z = max(1, n_z)
    n_fields = max(1, n_fields)

    return n_channels, n_z, n_fields
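
# Worked example (hypothetical filenames and parser output): for a well imaged at
# 2 channels x 3 z-planes x 1 site, the parser might yield metadata like
# {"channel": 1, "z_index": 2, "site": 1} per file. Counting unique values gives:
#
#     channels seen: {1, 2}      -> n_channels = 2
#     z indices seen: {1, 2, 3}  -> n_z = 3
#     sites seen: {1}            -> n_fields = 1
#
# Files whose metadata lacks a key are skipped for that dimension, and the
# max(1, ...) floor keeps a missing dimension from collapsing the zarr shape to 0.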

def _is_3d(array: Any) -> bool:
    """Check if an array is 3D."""
    return hasattr(array, 'ndim') and array.ndim == 3

def _execute_function_core(
    func_callable: Callable,
    main_data_arg: Any,
    base_kwargs: Dict[str, Any],
    context: 'ProcessingContext',
    special_inputs_plan: Dict[str, str],  # {'arg_name_for_func': 'special_path_value'}
    special_outputs_plan: TypingOrderedDict[str, str],  # {'output_key': 'special_path_value'}, order matters
    axis_id: str,
    input_memory_type: str,
    device_id: int
) -> Any:  # Returns the main processed data stack
    """
    Executes a single callable, handling its special I/O.

    - Loads special inputs from VFS paths in `special_inputs_plan`.
    - Calls `func_callable(main_data_arg, **all_kwargs)`.
    - If `special_outputs_plan` is non-empty, expects func to return (main_out, sp_val1, sp_val2, ...).
    - Saves special outputs positionally to VFS paths in `special_outputs_plan`.
    - Returns the main processed data stack.
    """
    final_kwargs = base_kwargs.copy()

    if special_inputs_plan:
        logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: special_inputs_plan = {special_inputs_plan}")
        for arg_name, path_info in special_inputs_plan.items():
            logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Processing arg_name='{arg_name}', path_info={path_info} (type: {type(path_info)})")

            # Extract the path string from the path info dictionary
            # Current format: {"path": "/path/to/file.pkl", "source_step_id": "step_123"}
            if isinstance(path_info, dict) and 'path' in path_info:  # coverage: condition always true
                special_path_value = path_info['path']
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Extracted path from dict: '{special_path_value}' (type: {type(special_path_value)})")
            else:
                special_path_value = path_info  # Fallback if it's already a string
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Using path_info directly: '{special_path_value}' (type: {type(special_path_value)})")

            logger.info(f"Loading special input '{arg_name}' from path '{special_path_value}' (memory backend)")
            try:
                final_kwargs[arg_name] = context.filemanager.load(special_path_value, Backend.MEMORY.value)
            except Exception as e:
                logger.error(f"Failed to load special input '{arg_name}' from '{special_path_value}': {e}", exc_info=True)
                raise

    # Auto-inject context if the function signature expects it
    import inspect
    sig = inspect.signature(func_callable)
    if 'context' in sig.parameters:  # coverage: condition never true
        final_kwargs['context'] = context

    # 🔍 DEBUG: Log input dimensions
    input_shape = getattr(main_data_arg, 'shape', 'no shape attr')
    input_type = type(main_data_arg).__name__
    logger.debug(f"🔍 FUNCTION INPUT: {func_callable.__name__} - shape: {input_shape}, type: {input_type}")

    # ⚡ INFO: Terse function execution log for user feedback
    logger.info(f"⚡ Executing: {func_callable.__name__}")

    # 🔍 DEBUG: Log function attributes before execution
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - special_outputs: {getattr(func_callable, '__special_outputs__', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - input_memory_type: {getattr(func_callable, 'input_memory_type', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - output_memory_type: {getattr(func_callable, 'output_memory_type', 'None')}")

    raw_function_output = func_callable(main_data_arg, **final_kwargs)

    # 🔍 DEBUG: Log output dimensions and type details
    output_shape = getattr(raw_function_output, 'shape', 'no shape attr')
    output_type = type(raw_function_output).__name__
    logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - shape: {output_shape}, type: {output_type}")

    # 🔍 DEBUG: If it's a tuple, log details about each element
    if isinstance(raw_function_output, tuple):
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - tuple length: {len(raw_function_output)}")
        for i, element in enumerate(raw_function_output):
            elem_shape = getattr(element, 'shape', 'no shape attr')
            elem_type = type(element).__name__
            logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - element[{i}]: shape={elem_shape}, type={elem_type}")
    else:
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - not a tuple, single return value")

    main_output_data = raw_function_output

    # 🔍 DEBUG: Log special output plan status
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: {special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Is empty? {not special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Length: {len(special_outputs_plan) if special_outputs_plan else 0}")

    # Only process special outputs if there are any (avoids spamming empty dict logs)
    if special_outputs_plan:
        logger.debug(f"🔍 SPECIAL OUTPUT: {special_outputs_plan}")
        num_special_outputs = len(special_outputs_plan)
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Expected {num_special_outputs} special outputs")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned type: {type(raw_function_output)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned tuple length: {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'not tuple'}")

        if not isinstance(raw_function_output, tuple) or len(raw_function_output) != (1 + num_special_outputs):  # coverage: condition never true
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Function '{getattr(func_callable, '__name__', 'unknown')}' special output mismatch")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Expected tuple of {1 + num_special_outputs} values")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Got {type(raw_function_output)} with {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'N/A'} values")
            raise ValueError(
                f"Function '{getattr(func_callable, '__name__', 'unknown')}' was expected to return a tuple of "
                f"{1 + num_special_outputs} values (main_output + {num_special_outputs} special) "
                f"based on 'special_outputs' in step plan, but returned {len(raw_function_output) if isinstance(raw_function_output, tuple) else type(raw_function_output)} values."
            )
        main_output_data = raw_function_output[0]
        returned_special_values_tuple = raw_function_output[1:]

        # 🔍 DEBUG: Log what we extracted
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data type: {type(main_output_data)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data shape: {getattr(main_output_data, 'shape', 'no shape')}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted {len(returned_special_values_tuple)} special values")

        # Iterate through special_outputs_plan (which must be ordered by the compiler)
        # and match it with the positionally returned special values.
        for i, (output_key, vfs_path_info) in enumerate(special_outputs_plan.items()):
            logger.info(f"Saving special output '{output_key}' to VFS path '{vfs_path_info}' (memory backend)")
            if i < len(returned_special_values_tuple):  # coverage: condition always true
                value_to_save = returned_special_values_tuple[i]
                # Extract the path string from the path info dictionary
                # Current format: {"path": "/path/to/file.pkl"}
                if isinstance(vfs_path_info, dict) and 'path' in vfs_path_info:  # coverage: condition always true
                    vfs_path = vfs_path_info['path']
                else:
                    vfs_path = vfs_path_info  # Fallback if it's already a string
                # (A per-well filename prefix for the memory backend was considered
                # here to avoid thread collisions, but is not applied.)

                logger.info(f"🔍 SPECIAL_SAVE: Saving '{output_key}' to '{vfs_path}' (memory backend)")
                # Ensure the directory exists for the memory backend
                parent_dir = str(Path(vfs_path).parent)
                context.filemanager.ensure_directory(parent_dir, Backend.MEMORY.value)
                context.filemanager.save(value_to_save, vfs_path, Backend.MEMORY.value)
                logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory")
            else:
                # This indicates a mismatch that should ideally be caught by schema/validation.
                # Or, if partial returns are allowed, this might be a warning. For now, error.
                logger.error(f"Mismatch: {num_special_outputs} special outputs planned, but fewer values returned by function for key '{output_key}'.")
                raise ValueError(f"Function did not return enough values for all planned special outputs. Missing value for '{output_key}'.")

    return main_output_data
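
# Sketch of the special-outputs contract this helper enforces (function name and
# key below are hypothetical; the positional convention is taken from the code):
#
#     def count_cells(stack, **kwargs):
#         counts = compute_counts(stack)  # analysis result to persist via VFS
#         return stack, counts            # main output first, then one value per planned key
#
# With special_outputs_plan = {"cell_counts": {"path": ".../cell_counts.pkl"}} (an
# ordered mapping), the function must return a 2-tuple: element 0 continues down
# the chain as the image stack, element 1 is saved to the 'cell_counts' path in
# the memory backend. Plan order must match the function's return order.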

def _execute_chain_core(
    initial_data_stack: Any,
    func_chain: List[Union[Callable, Tuple[Callable, Dict]]],
    context: 'ProcessingContext',
    step_special_inputs_plan: Dict[str, str],
    step_special_outputs_plan: TypingOrderedDict[str, str],
    axis_id: str,
    device_id: int,
    input_memory_type: str,
    step_index: int,  # step index for funcplan lookup
    dict_key: str = "default"  # dict key for funcplan lookup
) -> Any:
    current_stack = initial_data_stack
    current_memory_type = input_memory_type  # Track memory type from frozen context

    for i, func_item in enumerate(func_chain):
        actual_callable: Callable
        base_kwargs_for_item: Dict[str, Any] = {}
        is_last_in_chain = (i == len(func_chain) - 1)

        if isinstance(func_item, tuple) and len(func_item) == 2 and callable(func_item[0]):  # coverage: condition always true
            actual_callable, base_kwargs_for_item = func_item
        elif callable(func_item):
            actual_callable = func_item
        else:
            raise TypeError(f"Invalid item in function chain: {func_item}.")

        # Convert to the function's input memory type (no-op if it is the same)
        from openhcs.core.memory.converters import convert_memory
        current_stack = convert_memory(
            data=current_stack,
            source_type=current_memory_type,
            target_type=actual_callable.input_memory_type,
            gpu_id=device_id
        )

        # Use the funcplan to determine which outputs this function should save
        funcplan = context.step_plans[step_index].get("funcplan", {})
        func_name = getattr(actual_callable, '__name__', 'unknown')

        # Construct the execution key: function_name_dict_key_chain_position
        execution_key = f"{func_name}_{dict_key}_{i}"

        logger.info(f"🔍 FUNCPLAN DEBUG: execution_key = {execution_key}")
        logger.info(f"🔍 FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
        logger.info(f"🔍 FUNCPLAN DEBUG: step_special_outputs_plan = {step_special_outputs_plan}")

        if execution_key in funcplan:  # coverage: condition never true
            outputs_to_save = funcplan[execution_key]
            outputs_plan_for_this_call = _filter_special_outputs_for_function(
                outputs_to_save, step_special_outputs_plan, dict_key
            )
            logger.info(f"🔍 FUNCPLAN: {execution_key} -> {outputs_to_save}")
            logger.info(f"🔍 FUNCPLAN: outputs_plan_for_this_call = {outputs_plan_for_this_call}")
        else:
            # Fallback: no funcplan entry, save nothing
            outputs_plan_for_this_call = {}
            logger.info(f"🔍 FUNCPLAN: No entry for {execution_key}, saving nothing")

        current_stack = _execute_function_core(
            func_callable=actual_callable,
            main_data_arg=current_stack,
            base_kwargs=base_kwargs_for_item,
            context=context,
            special_inputs_plan=step_special_inputs_plan,
            special_outputs_plan=outputs_plan_for_this_call,
            axis_id=axis_id,
            device_id=device_id,
            input_memory_type=input_memory_type,
        )

        # Update the current memory type from the frozen context
        current_memory_type = actual_callable.output_memory_type

    return current_stack
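
# Sketch of the chain format and the per-hop memory conversion (functions and
# memory types hypothetical): each item is either a bare callable or a
# (callable, kwargs) pair, and convert_memory() bridges mismatched memory types
# between consecutive functions.
#
#     func_chain = [
#         gaussian_blur,                       # input/output_memory_type: "numpy"
#         (segment_nuclei, {"min_size": 50}),  # input_memory_type: "cupy" -> converted
#     ]
#
# For the second hop the chain would call, in effect:
#     convert_memory(data=stack, source_type="numpy", target_type="cupy", gpu_id=device_id)
#
# The funcplan execution key for that hop would be "segment_nuclei_default_1"
# (function name, dict key, zero-based chain position).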

def _process_single_pattern_group(
    context: 'ProcessingContext',
    pattern_group_info: Any,
    executable_func_or_chain: Any,
    base_func_args: Dict[str, Any],
    step_input_dir: Path,
    step_output_dir: Path,
    axis_id: str,
    component_value: str,
    read_backend: str,
    write_backend: str,
    input_memory_type_from_plan: str,   # Explicitly from plan
    output_memory_type_from_plan: str,  # Explicitly from plan
    device_id: Optional[int],
    same_directory: bool,
    special_inputs_map: Dict[str, str],
    special_outputs_map: TypingOrderedDict[str, str],
    zarr_config: Optional[Dict[str, Any]],
    variable_components: Optional[List[str]] = None,
    step_index: Optional[int] = None  # step index for funcplan lookup
) -> None:
    start_time = time.time()
    pattern_repr = str(pattern_group_info)[:100]
    logger.debug(f"🔥 PATTERN: Processing {pattern_repr} for well {axis_id}")

    try:
        if not context.microscope_handler:  # coverage: condition never true
            raise RuntimeError("MicroscopeHandler not available in context.")

        matching_files = context.microscope_handler.path_list_from_pattern(
            str(step_input_dir), pattern_group_info, context.filemanager, Backend.MEMORY.value,
            [vc.value for vc in variable_components] if variable_components else None
        )

        if not matching_files:  # coverage: condition never true
            raise ValueError(
                f"No matching files found for pattern group {pattern_repr} in {step_input_dir}. "
                f"This indicates either: (1) no image files exist in the directory, "
                f"(2) files don't match the pattern, or (3) pattern parsing failed. "
                f"Check that input files exist and match the expected naming convention."
            )

        logger.debug(f"🔥 PATTERN: Found {len(matching_files)} files: {[Path(f).name for f in matching_files]}")

        # Sort files to ensure consistent ordering (especially important for z-stacks)
        matching_files.sort()
        logger.debug(f"🔥 PATTERN: Sorted files: {[Path(f).name for f in matching_files]}")

        full_file_paths = [str(step_input_dir / f) for f in matching_files]
        raw_slices = context.filemanager.load_batch(full_file_paths, Backend.MEMORY.value)

        if not raw_slices:  # coverage: condition never true
            raise ValueError(
                f"No valid images loaded for pattern group {pattern_repr} in {step_input_dir}. "
                f"Found {len(matching_files)} matching files but failed to load any valid images. "
                f"This indicates corrupted image files, unsupported formats, or I/O errors. "
                f"Check file integrity and format compatibility."
            )

        # 🔍 DEBUG: Log stacking operation
        logger.debug(f"🔍 STACKING: {len(raw_slices)} slices → memory_type: {input_memory_type_from_plan}")
        if raw_slices:  # coverage: condition always true
            slice_shapes = [getattr(s, 'shape', 'no shape') for s in raw_slices[:3]]  # First 3 shapes
            logger.debug(f"🔍 STACKING: Sample slice shapes: {slice_shapes}")

        main_data_stack = stack_slices(
            slices=raw_slices, memory_type=input_memory_type_from_plan, gpu_id=device_id
        )

        # 🔍 DEBUG: Log stacked result
        stack_shape = getattr(main_data_stack, 'shape', 'no shape')
        stack_type = type(main_data_stack).__name__
        logger.debug(f"🔍 STACKED RESULT: shape: {stack_shape}, type: {stack_type}")

        logger.info(f"🔍 special_outputs_map: {special_outputs_map}")

        final_base_kwargs = base_func_args.copy()

        # Get the step function from the step plan
        step_func = context.step_plans[step_index]["func"]

        if isinstance(step_func, dict):
            dict_key_for_funcplan = component_value  # Use the actual dict key for dict patterns
        else:
            dict_key_for_funcplan = "default"  # Use default for list/single patterns

        if isinstance(executable_func_or_chain, list):
            processed_stack = _execute_chain_core(
                main_data_stack, executable_func_or_chain, context,
                special_inputs_map, special_outputs_map, axis_id,
                device_id, input_memory_type_from_plan, step_index, dict_key_for_funcplan
            )
        elif callable(executable_func_or_chain):  # coverage: condition always true
            # For single functions, apply funcplan filtering like in chain execution
            funcplan = context.step_plans[step_index].get("funcplan", {})
            func_name = getattr(executable_func_or_chain, '__name__', 'unknown')
            execution_key = f"{func_name}_{dict_key_for_funcplan}_0"  # Position 0 for single functions

            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: execution_key = {execution_key}")
            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: special_outputs_map = {special_outputs_map}")

            if execution_key in funcplan:
                outputs_to_save = funcplan[execution_key]
                filtered_special_outputs_map = _filter_special_outputs_for_function(
                    outputs_to_save, special_outputs_map, dict_key_for_funcplan
                )
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: {execution_key} -> {outputs_to_save}")
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: filtered_special_outputs_map = {filtered_special_outputs_map}")
            else:
                # Fallback: no funcplan entry, save nothing
                filtered_special_outputs_map = {}
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: No entry for {execution_key}, saving nothing")

            processed_stack = _execute_function_core(
                executable_func_or_chain, main_data_stack, final_base_kwargs, context,
                special_inputs_map, filtered_special_outputs_map, axis_id, input_memory_type_from_plan, device_id
            )
        else:
            raise TypeError(f"Invalid executable_func_or_chain: {type(executable_func_or_chain)}")

        # 🔍 DEBUG: Check what shape the function actually returned
        input_shape = getattr(main_data_stack, 'shape', 'unknown')
        output_shape = getattr(processed_stack, 'shape', 'unknown')
        processed_type = type(processed_stack).__name__
        logger.debug(f"🔍 PROCESSING RESULT: input: {input_shape} → output: {output_shape}, type: {processed_type}")

        # 🔍 DEBUG: Additional validation logging
        logger.debug(f"🔍 VALIDATION: processed_stack type: {type(processed_stack)}")
        logger.debug(f"🔍 VALIDATION: processed_stack has shape attr: {hasattr(processed_stack, 'shape')}")
        logger.debug(f"🔍 VALIDATION: processed_stack has ndim attr: {hasattr(processed_stack, 'ndim')}")
        if hasattr(processed_stack, 'ndim'):  # coverage: condition always true
            logger.debug(f"🔍 VALIDATION: processed_stack ndim: {processed_stack.ndim}")
        if hasattr(processed_stack, 'shape'):  # coverage: condition always true
            logger.debug(f"🔍 VALIDATION: processed_stack shape: {processed_stack.shape}")

        if not _is_3d(processed_stack):  # coverage: condition never true
            logger.error("🔍 VALIDATION ERROR: processed_stack is not 3D")
            logger.error(f"🔍 VALIDATION ERROR: Type: {type(processed_stack)}")
            logger.error(f"🔍 VALIDATION ERROR: Shape: {getattr(processed_stack, 'shape', 'no shape attr')}")
            logger.error(f"🔍 VALIDATION ERROR: Has ndim: {hasattr(processed_stack, 'ndim')}")
            if hasattr(processed_stack, 'ndim'):
                logger.error(f"🔍 VALIDATION ERROR: ndim value: {processed_stack.ndim}")
            raise ValueError(f"Main processing must result in a 3D array, got {getattr(processed_stack, 'shape', 'unknown')}")

        # 🔍 DEBUG: Log unstacking operation
        logger.debug(f"🔍 UNSTACKING: shape: {output_shape} → memory_type: {output_memory_type_from_plan}")

        output_slices = unstack_slices(
            array=processed_stack, memory_type=output_memory_type_from_plan, gpu_id=device_id, validate_slices=True
        )

        # 🔍 DEBUG: Log unstacked result
        if output_slices:  # coverage: condition always true
            unstacked_shapes = [getattr(s, 'shape', 'no shape') for s in output_slices[:3]]  # First 3 shapes
            logger.debug(f"🔍 UNSTACKED RESULT: {len(output_slices)} slices, sample shapes: {unstacked_shapes}")

        # Handle cases where the function returns fewer images than inputs (e.g., z-stack
        # flattening, channel compositing). In such cases, we save only the returned
        # images using the first N input filenames.
        num_outputs = len(output_slices)
        num_inputs = len(matching_files)

        if num_outputs < num_inputs:
            logger.debug(f"Function returned {num_outputs} images from {num_inputs} inputs - likely flattening operation")
        elif num_outputs > num_inputs:  # coverage: condition never true
            logger.warning(f"Function returned more images ({num_outputs}) than inputs ({num_inputs}) - unexpected")

        # Save the output images using batch operations
        try:
            # Prepare batch data
            output_data = []
            output_paths_batch = []

            for i, img_slice in enumerate(output_slices):
                # FAIL FAST: No fallback filenames - if we have more outputs than inputs, something is wrong
                if i >= len(matching_files):  # coverage: condition never true
                    raise ValueError(
                        f"Function returned {num_outputs} output slices but only {num_inputs} input files available. "
                        f"Cannot generate filename for output slice {i}. This indicates a bug in the function or "
                        f"unstacking logic - functions should return the same number of images as inputs, or fewer."
                    )

                input_filename = matching_files[i]
                output_filename = Path(input_filename).name
                output_path = Path(step_output_dir) / output_filename

                # Always ensure we can write to the output path (delete if it exists)
                if context.filemanager.exists(str(output_path), Backend.MEMORY.value):
                    context.filemanager.delete(str(output_path), Backend.MEMORY.value)

                output_data.append(img_slice)
                output_paths_batch.append(str(output_path))

            # Ensure the output directory exists
            context.filemanager.ensure_directory(str(step_output_dir), Backend.MEMORY.value)

            # Batch save to the memory backend. (A per-pattern write straight to the
            # target backend with zarr_config was disabled here in favor of the
            # step-level materialization write.)
            context.filemanager.save_batch(output_data, output_paths_batch, Backend.MEMORY.value)

        except Exception as e:
            logger.error(f"Error saving batch of output slices for pattern {pattern_repr}: {e}", exc_info=True)

        # 🔥 CLEANUP: If the function returned fewer images than inputs, delete the unused
        # input files. This prevents unused channel files from remaining in memory after
        # compositing.
        if num_outputs < num_inputs:
            for j in range(num_outputs, num_inputs):
                unused_input_filename = matching_files[j]
                unused_input_path = Path(step_input_dir) / unused_input_filename
                if context.filemanager.exists(str(unused_input_path), Backend.MEMORY.value):  # coverage: condition always true
                    context.filemanager.delete(str(unused_input_path), Backend.MEMORY.value)
                    logger.debug(f"🔥 CLEANUP: Deleted unused input file: {unused_input_filename}")

        logger.debug(f"Finished pattern group {pattern_repr} in {(time.time() - start_time):.2f}s.")
    except Exception as e:
        import traceback
        full_traceback = traceback.format_exc()
        logger.error(f"Error processing pattern group {pattern_repr}: {e}", exc_info=True)
        logger.error(f"Full traceback for pattern group {pattern_repr}:\n{full_traceback}")
        raise ValueError(f"Failed to process pattern group {pattern_repr}: {e}") from e
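
# Sketch of the per-pattern data contract (shapes hypothetical): 2D slices are
# stacked into a single 3D array, the function (or chain) must hand back a 3D
# array, and the result is unstacked into 2D slices that reuse the input filenames.
#
#     raw_slices        # e.g. five (512, 512) planes for one z-stack pattern
#     main_data_stack   # stack_slices(...)   -> shape (5, 512, 512)
#     processed_stack   # func(...)           -> must stay 3D, e.g. (1, 512, 512)
#     output_slices     # unstack_slices(...) -> one (512, 512) plane
#
# A projection like the example keeps axis 0 with length 1 rather than returning
# a bare 2D array; that is exactly what the _is_3d() check above enforces.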

class FunctionStep(AbstractStep):

    def __init__(
        self,
        func: Union[Callable, Tuple[Callable, Dict], List[Union[Callable, Tuple[Callable, Dict]]]],
        **kwargs
    ):
        # Generate a default name from the function if not provided
        if 'name' not in kwargs or kwargs['name'] is None:
            actual_func_for_name = func
            if isinstance(func, tuple):  # coverage: condition never true
                actual_func_for_name = func[0]
            elif isinstance(func, list) and func:  # coverage: condition never true
                first_item = func[0]
                if isinstance(first_item, tuple):
                    actual_func_for_name = first_item[0]
                elif callable(first_item):
                    actual_func_for_name = first_item
            kwargs['name'] = getattr(actual_func_for_name, '__name__', 'FunctionStep')

        super().__init__(**kwargs)
        self.func = func  # This is used by prepare_patterns_and_functions at runtime
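
    # Sketch of the accepted func forms (functions hypothetical), mirroring the
    # Union type above; the default step name is derived from the first callable:
    #
    #     FunctionStep(gaussian_blur)                            # name -> "gaussian_blur"
    #     FunctionStep((gaussian_blur, {"sigma": 2.0}))          # callable + kwargs
    #     FunctionStep([denoise, (segment, {"min_size": 50})])   # chain, run in order
    #
    # A dict form keyed by component value (e.g. {"1": func_a, "2": func_b}) is
    # handled downstream by prepare_patterns_and_functions for per-channel routing.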

802 def process(self, context: 'ProcessingContext', step_index: int) -> None: 

803 # Access step plan by index (step_plans keyed by index, not step_id) 

804 step_plan = context.step_plans[step_index] 

805 

806 # Get step name for logging 

807 step_name = step_plan['step_name'] 

808 

809 try: 

810 axis_id = step_plan['axis_id'] 

811 step_input_dir = Path(step_plan['input_dir']) 

812 step_output_dir = Path(step_plan['output_dir']) 

813 variable_components = step_plan['variable_components'] 

814 group_by = step_plan['group_by'] 

815 func_from_plan = step_plan['func'] 

816 

817 # special_inputs/outputs are dicts: {'key': 'vfs_path_value'} 

818 special_inputs = step_plan['special_inputs'] 

819 special_outputs = step_plan['special_outputs'] # Should be OrderedDict if order matters 

820 

821 read_backend = step_plan['read_backend'] 

822 write_backend = step_plan['write_backend'] 

823 input_mem_type = step_plan['input_memory_type'] 

824 output_mem_type = step_plan['output_memory_type'] 

825 microscope_handler = context.microscope_handler 

826 filemanager = context.filemanager 

827 

828 # Create path getter for this well 

829 get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler) 

830 

831 # Store path getter in step_plan for streaming access 

832 step_plan["get_paths_for_axis"] = get_paths_for_axis 

833 

834 # Get patterns first for bulk preload 

835 # Use dynamic filter parameter based on current multiprocessing axis 

836 from openhcs.constants import MULTIPROCESSING_AXIS 

837 axis_name = MULTIPROCESSING_AXIS.value 

838 filter_kwargs = {f"{axis_name}_filter": [axis_id]} 

839 

840 patterns_by_well = microscope_handler.auto_detect_patterns( 

841 str(step_input_dir), # folder_path 

842 filemanager, # filemanager 

843 read_backend, # backend 

844 extensions=DEFAULT_IMAGE_EXTENSIONS, # extensions 

845 group_by=group_by, # Pass GroupBy enum directly 

846 variable_components=[vc.value for vc in variable_components] if variable_components else [], # variable_components for placeholder logic 

847 **filter_kwargs # Dynamic filter parameter 

848 ) 

849 

850 

851 # Only access gpu_id if the step requires GPU (has GPU memory types) 

852 from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES 

853 requires_gpu = (input_mem_type in VALID_GPU_MEMORY_TYPES or 

854 output_mem_type in VALID_GPU_MEMORY_TYPES) 

855 

856 # Ensure variable_components is never None - use default if missing 

857 if variable_components is None: 857 ↛ 858line 857 didn't jump to line 858 because the condition on line 857 was never true

858 variable_components = [VariableComponents.SITE] # Default fallback 

859 logger.warning(f"Step {step_index} ({step_name}) had None variable_components, using default [SITE]") 

860 if requires_gpu: 860 ↛ 861line 860 didn't jump to line 861 because the condition on line 860 was never true

861 device_id = step_plan['gpu_id'] 

862 logger.debug(f"🔥 DEBUG: Step {step_index} gpu_id from plan: {device_id}, input_mem: {input_mem_type}, output_mem: {output_mem_type}") 

863 else: 

864 device_id = None # CPU-only step 

865 logger.debug(f"🔥 DEBUG: Step {step_index} is CPU-only, input_mem: {input_mem_type}, output_mem: {output_mem_type}") 

866 

867 logger.debug(f"🔥 DEBUG: Step {step_index} read_backend: {read_backend}, write_backend: {write_backend}") 

868 

869 if not all([axis_id, step_input_dir, step_output_dir]): 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true

870 raise ValueError(f"Plan missing essential keys for step {step_index}") 

871 

872 same_dir = str(step_input_dir) == str(step_output_dir) 

873 logger.info(f"Step {step_index} ({step_name}) I/O: read='{read_backend}', write='{write_backend}'.") 

874 logger.info(f"Step {step_index} ({step_name}) Paths: input_dir='{step_input_dir}', output_dir='{step_output_dir}', same_dir={same_dir}") 

875 

876 # 🔄 MATERIALIZATION READ: Bulk preload if not reading from memory 

877 if read_backend != Backend.MEMORY.value: 

878 _bulk_preload_step_images(step_input_dir, step_output_dir, axis_id, read_backend, 

879 patterns_by_well,filemanager, microscope_handler, step_plan["zarr_config"]) 

880 

881 # 🔄 INPUT CONVERSION: Convert loaded input data to zarr if configured 

882 if "input_conversion_dir" in step_plan: 

883 input_conversion_dir = step_plan["input_conversion_dir"] 

884 input_conversion_backend = step_plan["input_conversion_backend"] 

885 

886 logger.info(f"Converting input data to zarr: {input_conversion_dir}") 

887 

888 # Get memory paths from input data (already loaded) 

889 memory_paths = get_paths_for_axis(step_input_dir, Backend.MEMORY.value) 

890 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

891 

892 # Generate conversion paths (input_dir → conversion_dir) 

893 conversion_paths = _generate_materialized_paths(memory_paths, Path(step_input_dir), Path(input_conversion_dir)) 

894 

895 # Parse actual filenames to determine dimensions 

896 # Calculate zarr dimensions from conversion paths (which contain the filenames) 

897 n_channels, n_z, n_fields = _calculate_zarr_dimensions(conversion_paths, context.microscope_handler) 

898 # Parse well to get row and column for zarr structure 

899 row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id) 

900 

901 # Save using existing materialized data infrastructure 

902 _save_materialized_data(filemanager, memory_data, conversion_paths, input_conversion_backend, step_plan, context, axis_id) 

903 

904 logger.info(f"🔬 Converted {len(conversion_paths)} input files to {input_conversion_dir}") 

905 

906 # Update metadata after conversion 

907 conversion_dir = Path(step_plan["input_conversion_dir"]) 

908 zarr_subdir = conversion_dir.name if step_plan["input_conversion_uses_virtual_workspace"] else None 

909 _update_metadata_for_zarr_conversion( 

910 conversion_dir.parent, 

911 step_plan["input_conversion_original_subdir"], 

912 zarr_subdir, 

913 context 

914 ) 

915 

916 logger.info(f"🔥 STEP: Starting processing for '{step_name}' well {axis_id} (group_by={group_by.name if group_by else None}, variable_components={[vc.name for vc in variable_components] if variable_components else []})") 

917 

918 if axis_id not in patterns_by_well: 918 ↛ 919line 918 didn't jump to line 919 because the condition on line 918 was never true

919 raise ValueError( 

920 f"No patterns detected for well '{axis_id}' in step '{step_name}' (index: {step_index}). " 

921 f"This indicates either: (1) no image files found for this well, " 

922 f"(2) image files don't match the expected naming pattern, or " 

923 f"(3) pattern detection failed. Check input directory: {step_input_dir}" 

924 ) 

925 

926 if isinstance(patterns_by_well[axis_id], dict): 926 ↛ 932line 926 didn't jump to line 932 because the condition on line 926 was always true

927 # Grouped patterns (when group_by is set) 

928 for comp_val, pattern_list in patterns_by_well[axis_id].items(): 

929 logger.debug(f"🔥 STEP: Component '{comp_val}' has {len(pattern_list)} patterns: {pattern_list}") 

930 else: 

931 # Ungrouped patterns (when group_by is None) 

932 logger.debug(f"🔥 STEP: Found {len(patterns_by_well[axis_id])} ungrouped patterns: {patterns_by_well[axis_id]}") 

933 

934 if func_from_plan is None: 934 ↛ 935line 934 didn't jump to line 935 because the condition on line 934 was never true

935 raise ValueError(f"Step plan missing 'func' for step: {step_plan.get('step_name', 'Unknown')} (index: {step_index})") 

936 

937 grouped_patterns, comp_to_funcs, comp_to_base_args = prepare_patterns_and_functions( 

938 patterns_by_well[axis_id], func_from_plan, component=group_by.value if group_by else None 

939 ) 

940 

941 logger.info(f"🔍 DICT_PATTERN: grouped_patterns keys: {list(grouped_patterns.keys())}") 

942 logger.info(f"🔍 DICT_PATTERN: comp_to_funcs keys: {list(comp_to_funcs.keys())}") 

943 logger.info(f"🔍 DICT_PATTERN: func_from_plan type: {type(func_from_plan)}") 

944 if isinstance(func_from_plan, dict): 

945 logger.info(f"🔍 DICT_PATTERN: func_from_plan keys: {list(func_from_plan.keys())}") 

946 

947 for comp_val, current_pattern_list in grouped_patterns.items(): 

948 logger.info(f"🔍 DICT_PATTERN: Processing component '{comp_val}' with {len(current_pattern_list)} patterns") 

949 exec_func_or_chain = comp_to_funcs[comp_val] 

950 base_kwargs = comp_to_base_args[comp_val] 

951 logger.info(f"🔍 DICT_PATTERN: Component '{comp_val}' exec_func_or_chain: {exec_func_or_chain}") 

952 for pattern_item in current_pattern_list: 

953 _process_single_pattern_group( 

954 context, pattern_item, exec_func_or_chain, base_kwargs, 

955 step_input_dir, step_output_dir, axis_id, comp_val, 

956 read_backend, write_backend, input_mem_type, output_mem_type, 

957 device_id, same_dir, 

958 special_inputs, special_outputs, # Pass the maps from step_plan 

959 step_plan["zarr_config"], 

960 variable_components, step_index # Pass step_index for funcplan lookup 

961 ) 

962 logger.info(f"🔥 STEP: Completed processing for '{step_name}' well {axis_id}.") 

963 

964 # 📄 MATERIALIZATION WRITE: Only if not writing to memory 

965 if write_backend != Backend.MEMORY.value: 

966 memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value) 

967 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

968 # Calculate zarr dimensions (ignored by non-zarr backends) 

969 n_channels, n_z, n_fields = _calculate_zarr_dimensions(memory_paths, context.microscope_handler) 

970 row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id) 

971 filemanager.ensure_directory(step_output_dir, write_backend) 

972 

973 # Build save kwargs with parser metadata for all backends 

974 save_kwargs = { 

975 'chunk_name': axis_id, 

976 'zarr_config': step_plan["zarr_config"], 

977 'n_channels': n_channels, 

978 'n_z': n_z, 

979 'n_fields': n_fields, 

980 'row': row, 

981 'col': col, 

982 'parser_name': context.microscope_handler.parser.__class__.__name__, 

983 'microscope_type': context.microscope_handler.microscope_type 

984 } 

985 

986 filemanager.save_batch(memory_data, memory_paths, write_backend, **save_kwargs) 

987 

988 # 📄 PER-STEP MATERIALIZATION: Additional materialized output if configured 

989 if "materialized_output_dir" in step_plan: 

990 materialized_output_dir = step_plan["materialized_output_dir"] 

991 materialized_backend = step_plan["materialized_backend"] 

992 

993 memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value) 

994 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

995 materialized_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir)) 

996 

997 filemanager.ensure_directory(materialized_output_dir, materialized_backend) 

998 _save_materialized_data(filemanager, memory_data, materialized_paths, materialized_backend, step_plan, context, axis_id) 

999 

1000 logger.info(f"🔬 Materialized {len(materialized_paths)} files to {materialized_output_dir}") 

1001 

1002 # 📄 STREAMING: Execute all configured streaming backends 

1003 from openhcs.core.config import StreamingConfig 

1004 

1005 streaming_configs_found = [] 

1006 for key, config_instance in step_plan.items(): 

1007 if isinstance(config_instance, StreamingConfig): 

1008 streaming_configs_found.append((key, config_instance)) 

1009 

1010 for key, config_instance in streaming_configs_found: 

1011 # Get paths at runtime like materialization does 

1012 step_output_dir = step_plan["output_dir"] 

1013 get_paths_for_axis = step_plan["get_paths_for_axis"] # Get the path getter from step_plan 

1014 

1015 # Get memory paths (where data actually is) 

1016 memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value) 

1017 

1018 # For materialized steps, use materialized paths for streaming (for correct source extraction) 

1019 # but load from memory paths (where data actually is) 

1020 if "materialized_output_dir" in step_plan: 1020 ↛ 1027line 1020 didn't jump to line 1027 because the condition on line 1020 was always true

1021 materialized_output_dir = step_plan["materialized_output_dir"] 

1022 streaming_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir)) 

1023 logger.info(f"🔍 STREAMING: Materialized step - loading from memory, streaming with materialized paths") 

1024 logger.info(f"🔍 STREAMING: First memory path: {memory_paths[0] if memory_paths else 'NONE'}") 

1025 logger.info(f"🔍 STREAMING: First streaming path: {streaming_paths[0] if streaming_paths else 'NONE'}") 

1026 else: 

1027 streaming_paths = memory_paths 

1028 

1029 # Load from memory (where data actually is) 

1030 streaming_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

1031 kwargs = config_instance.get_streaming_kwargs(context) # Pass context for microscope handler access 

1032 

1033 # Add pre-built source value for layer/window naming 

1034 # During pipeline execution: source = step_name 

1035 kwargs["source"] = step_name 

1036 

1037 # Execute streaming - use streaming_paths (materialized paths) for metadata extraction 

1038 filemanager.save_batch(streaming_data, streaming_paths, config_instance.backend.value, **kwargs) 

1039 

1040 # Add small delay between image and ROI streaming to prevent race conditions 

1041 import time 

1042 time.sleep(0.1) 

1043 

1044 logger.info(f"FunctionStep {step_index} ({step_name}) completed for well {axis_id}.") 

1045 

1046 # 📄 OPENHCS METADATA: Create metadata file automatically after step completion 

1047 # Track which backend was actually used for writing files 

1048 actual_write_backend = step_plan['write_backend'] 

1049 

1050 # Only create OpenHCS metadata for disk/zarr backends, not OMERO 

1051 # OMERO has its own metadata system and doesn't use openhcs_metadata.json 

1052 if actual_write_backend not in [Backend.OMERO_LOCAL.value, Backend.MEMORY.value]: 

1053 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator 

1054 metadata_generator = OpenHCSMetadataGenerator(context.filemanager) 

1055 

1056 # Main step output metadata 

1057 is_pipeline_output = (actual_write_backend != Backend.MEMORY.value) 

1058 metadata_generator.create_metadata( 

1059 context, 

1060 step_plan['output_dir'], 

1061 actual_write_backend, 

1062 is_main=is_pipeline_output, 

1063 plate_root=step_plan['output_plate_root'], 

1064 sub_dir=step_plan['sub_dir'], 

1065 results_dir=step_plan.get('analysis_results_dir') # Pass pre-calculated results directory 

1066 ) 

1067 

1068 # 📄 MATERIALIZED METADATA: Create metadata for materialized directory if it exists 

1069 # This must be OUTSIDE the main write_backend check because materializations 

1070 # can happen even when the main step writes to memory 

1071 if 'materialized_output_dir' in step_plan: 

1072 materialized_backend = step_plan['materialized_backend'] 

1073 # Only create metadata if materialized backend is also disk/zarr 

1074 if materialized_backend not in [Backend.OMERO_LOCAL.value, Backend.MEMORY.value]: 1074 ↛ 1088line 1074 didn't jump to line 1088 because the condition on line 1074 was always true

1075 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator 

1076 metadata_generator = OpenHCSMetadataGenerator(context.filemanager) 

1077 metadata_generator.create_metadata( 

1078 context, 

1079 step_plan['materialized_output_dir'], 

1080 materialized_backend, 

1081 is_main=False, 

1082 plate_root=step_plan['materialized_plate_root'], 

1083 sub_dir=step_plan['materialized_sub_dir'], 

1084 results_dir=step_plan.get('materialized_analysis_results_dir') # Pass pre-calculated materialized results directory 

1085 ) 

1086 

1087 # SPECIAL DATA MATERIALIZATION 

1088 special_outputs = step_plan.get('special_outputs', {}) 

1089 logger.debug(f"🔍 MATERIALIZATION: special_outputs from step_plan: {special_outputs}") 

1090 logger.debug(f"🔍 MATERIALIZATION: special_outputs is empty? {not special_outputs}") 

1091 if special_outputs: 

1092 logger.info(f"🔬 MATERIALIZATION: Starting materialization for {len(special_outputs)} special outputs") 

1093 # Special outputs ALWAYS use the main materialization backend (disk/zarr), 

1094 # not the step's write backend (which may be memory for intermediate steps). 

1095 # This ensures analysis results are always persisted to disk. 

1096 from openhcs.core.pipeline.materialization_flag_planner import MaterializationFlagPlanner 

1097 vfs_config = context.get_vfs_config() 

1098 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

1099 logger.debug(f"🔍 MATERIALIZATION: Using materialization backend '{materialization_backend}' for special outputs (step write backend is '{actual_write_backend}')") 

1100 self._materialize_special_outputs(filemanager, step_plan, special_outputs, materialization_backend, context) 

1101 logger.info("🔬 MATERIALIZATION: Completed materialization") 

1102 else: 

1103 logger.debug("🔍 MATERIALIZATION: No special outputs to materialize") 

1104 

1105 

1106 

1107 except Exception as e:

1108 # exc_info=True attaches the full traceback to this log record,

1109 # so no manual traceback.format_exc() duplication is needed

1110 logger.error(f"Error in FunctionStep {step_index} ({step_name}): {e}", exc_info=True)

1111 

1112 

1113 

1114 

1115 raise

1116 

1117 
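# The disk/zarr gating above (lines 1052 and 1074) applies one rule to both the
# main and the materialized outputs. A minimal sketch of that rule as a shared
# predicate - the helper name is hypothetical, not part of this module:

def _wants_openhcs_metadata(write_backend: str) -> bool:
    """True when openhcs_metadata.json should be written for this backend."""
    # OMERO keeps its own metadata system; memory-backed outputs are transient
    return write_backend not in (Backend.OMERO_LOCAL.value, Backend.MEMORY.value)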

1118 def _extract_component_metadata(self, context: 'ProcessingContext', component: 'VariableComponents') -> Optional[Dict[str, str]]: 

1119 """ 

1120 Extract component metadata from context cache safely. 

1121 

1122 Args: 

1123 context: ProcessingContext containing metadata_cache 

1124 component: VariableComponents enum specifying which component to extract 

1125 

1126 Returns: 

1127 Dictionary mapping component keys to display names, or None if not available 

1128 """ 

1129 try: 

1130 if hasattr(context, 'metadata_cache') and context.metadata_cache: 

1131 return context.metadata_cache.get(component, None) 

1132 else: 

1133 logger.debug(f"No metadata_cache available in context for {component.value}") 

1134 return None 

1135 except Exception as e: 

1136 logger.debug(f"Error extracting {component.value} metadata from cache: {e}") 

1137 return None 

1138 
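# A runnable sketch of the lookup contract above, assuming metadata_cache maps
# each VariableComponents member to a {key: display_name} dict (shape inferred
# from the docstring); the SimpleNamespace stand-in and display names are made up:

from types import SimpleNamespace

def _demo_component_lookup(step: 'FunctionStep') -> None:
    ctx = SimpleNamespace(metadata_cache={VariableComponents.CHANNEL: {"1": "DAPI", "2": "GFP"}})
    assert step._extract_component_metadata(ctx, VariableComponents.CHANNEL) == {"1": "DAPI", "2": "GFP"}
    # Missing or empty cache degrades to None rather than raising
    assert step._extract_component_metadata(SimpleNamespace(metadata_cache=None), VariableComponents.CHANNEL) is None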

1139 def _create_openhcs_metadata_for_materialization( 

1140 self, 

1141 context: 'ProcessingContext', 

1142 output_dir: str, 

1143 write_backend: str 

1144 ) -> None: 

1145 """ 

1146 Create OpenHCS metadata file for materialization writes. 

1147 

1148 Args: 

1149 context: ProcessingContext containing microscope_handler and other state 

1150 output_dir: Output directory path where metadata should be written 

1151 write_backend: Backend being used for the write (disk/zarr) 

1152 """ 

1153 # Only create OpenHCS metadata for disk/zarr backends 

1154 # OMERO has its own metadata system, memory doesn't need metadata 

1155 if write_backend in [Backend.MEMORY.value, Backend.OMERO_LOCAL.value]: 

1156 logger.debug(f"Skipping metadata creation (backend={write_backend})") 

1157 return 

1158 

1159 logger.debug(f"Creating metadata for materialization write: {write_backend} -> {output_dir}") 

1160 

1161 try: 

1162 # Extract required information 

1163 step_output_dir = Path(output_dir) 

1164 

1165 # Check if we have microscope handler for metadata extraction 

1166 if not context.microscope_handler: 

1167 logger.debug("No microscope_handler in context - skipping OpenHCS metadata creation") 

1168 return 

1169 

1170 # Get source microscope information 

1171 source_parser_name = context.microscope_handler.parser.__class__.__name__ 

1172 

1173 # Extract metadata from source microscope handler 

1174 try: 

1175 grid_dimensions = context.microscope_handler.metadata_handler.get_grid_dimensions(context.input_dir) 

1176 pixel_size = context.microscope_handler.metadata_handler.get_pixel_size(context.input_dir) 

1177 except Exception as e: 

1178 logger.debug(f"Could not extract grid_dimensions/pixel_size from source: {e}") 

1179 grid_dimensions = [1, 1] # Default fallback 

1180 pixel_size = 1.0 # Default fallback 

1181 

1182 # Get list of image files in output directory 

1183 try: 

1184 image_files = [] 

1185 if context.filemanager.exists(str(step_output_dir), write_backend): 

1186 # List files in output directory 

1187 files = context.filemanager.list_files(str(step_output_dir), write_backend) 

1188 # Filter for image files and convert to strings (keep this hardcoded set in sync with DEFAULT_IMAGE_EXTENSIONS)

1189 image_extensions = {'.tif', '.tiff', '.png', '.jpg', '.jpeg'}

1190 image_files = [str(f) for f in files if Path(f).suffix.lower() in image_extensions]

1191 logger.debug(f"Found {len(image_files)} image files in {step_output_dir}") 

1192 except Exception as e: 

1193 logger.debug(f"Could not list image files in output directory: {e}") 

1194 image_files = [] 

1195 

1196 # Detect available backends based on actual output files 

1197 available_backends = self._detect_available_backends(step_output_dir) 

1198 

1199 # Create metadata structure 

1200 metadata = { 

1201 "microscope_handler_name": context.microscope_handler.microscope_type, 

1202 "source_filename_parser_name": source_parser_name, 

1203 "grid_dimensions": list(grid_dimensions) if hasattr(grid_dimensions, '__iter__') else [1, 1], 

1204 "pixel_size": float(pixel_size) if pixel_size is not None else 1.0, 

1205 "image_files": image_files, 

1206 "channels": self._extract_component_metadata(context, VariableComponents.CHANNEL), 

1207 "wells": self._extract_component_metadata(context, VariableComponents.WELL), 

1208 "sites": self._extract_component_metadata(context, VariableComponents.SITE), 

1209 "z_indexes": self._extract_component_metadata(context, VariableComponents.Z_INDEX), 

1210 "timepoints": self._extract_component_metadata(context, VariableComponents.TIMEPOINT), 

1211 "available_backends": available_backends 

1212 } 

1213 

1214 # Save metadata file using disk backend (JSON files always on disk) 

1215 from openhcs.microscopes.openhcs import OpenHCSMetadataHandler 

1216 metadata_path = step_output_dir / OpenHCSMetadataHandler.METADATA_FILENAME 

1217 

1218 # Always ensure we can write to the metadata path (delete if exists) 

1219 if context.filemanager.exists(str(metadata_path), Backend.DISK.value): 

1220 context.filemanager.delete(str(metadata_path), Backend.DISK.value) 

1221 

1222 # Ensure output directory exists on disk 

1223 context.filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value) 

1224 

1225 # Create JSON content - OpenHCS handler expects JSON format 

1226 import json 

1227 json_content = json.dumps(metadata, indent=2) 

1228 context.filemanager.save(json_content, str(metadata_path), Backend.DISK.value) 

1229 logger.debug(f"Created OpenHCS metadata file (disk): {metadata_path}") 

1230 

1231 except Exception as e: 

1232 # Graceful degradation - log error but don't fail the step 

1233 logger.warning(f"Failed to create OpenHCS metadata file: {e}") 

1234 logger.debug("OpenHCS metadata creation error details:", exc_info=True) 

1235 
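# The JSON written by the method above has this shape; every value below is an
# illustrative placeholder, not output captured from a real plate:

EXAMPLE_OPENHCS_METADATA = {
    "microscope_handler_name": "imagexpress",                  # microscope_type
    "source_filename_parser_name": "ImageXpressFilenameParser",
    "grid_dimensions": [3, 3],                                 # falls back to [1, 1]
    "pixel_size": 0.65,                                        # falls back to 1.0
    "image_files": ["A01_s001_w1_z001_t001.tif"],
    "channels": {"1": "DAPI", "2": "GFP"},
    "wells": None,                                             # cache entries may be absent
    "sites": None,
    "z_indexes": None,
    "timepoints": None,
    "available_backends": {"zarr": False, "disk": True},
}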

1236 def _detect_available_backends(self, output_dir: Path) -> Dict[str, bool]: 

1237 """Detect which storage backends are actually available based on output files.""" 

1238 

1239 backends = {Backend.ZARR.value: False, Backend.DISK.value: False} 

1240 

1241 # Check for zarr stores - look for .zarray or .zgroup files (zarr v2 metadata)

1242 # Zarr stores don't need a .zarr extension - any directory containing zarr metadata is a store

1243 if list(output_dir.glob("**/.zarray")) or list(output_dir.glob("**/.zgroup")): 

1244 backends[Backend.ZARR.value] = True 

1245 

1246 # Check for image files 

1247 for ext in DEFAULT_IMAGE_EXTENSIONS: 

1248 if list(output_dir.glob(f"*{ext}")): 

1249 backends[Backend.DISK.value] = True 

1250 break 

1251 

1252 logger.debug(f"Backend detection result: {backends}") 

1253 return backends 

1254 
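# A throwaway check of the detection above: .zarray is matched recursively, but
# plain images are only globbed at the top level. Layout and paths are made up,
# and the expectation assumes ".tif" is among DEFAULT_IMAGE_EXTENSIONS:

def _demo_detect_backends(step: 'FunctionStep', tmp: Path) -> None:
    (tmp / "store" / "0").mkdir(parents=True, exist_ok=True)
    (tmp / "store" / "0" / ".zarray").write_text("{}")  # zarr v2 array marker
    (tmp / "A01_s001_w1.tif").write_bytes(b"")          # loose top-level image
    assert step._detect_available_backends(tmp) == {
        Backend.ZARR.value: True, Backend.DISK.value: True
    }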

1255 def _build_analysis_filename(self, output_key: str, step_index: int, step_plan: Dict, dict_key: Optional[str] = None, context=None) -> str: 

1256 """Build analysis result filename from first image path template. 

1257 

1258 Uses first image filename as template to preserve all metadata components. 

1259 Falls back to well ID only if no images available. 

1260 

1261 Args: 

1262 output_key: Special output key (e.g., 'rois', 'cell_counts') 

1263 step_index: Pipeline step index 

1264 step_plan: Step plan dictionary 

1265 dict_key: Optional channel/component key for dict pattern functions 

1266 context: Processing context (for accessing microscope handler) 

1267 """ 

1268 memory_paths = step_plan['get_paths_for_axis'](step_plan['output_dir'], Backend.MEMORY.value) 

1269 

1270 if not memory_paths: 1270 ↛ 1271 (line 1270 didn't jump to line 1271 because the condition on line 1270 was never true)

1271 return f"{step_plan['axis_id']}_{output_key}_step{step_index}.roi.zip" 

1272 

1273 # Filter paths by channel if dict_key provided (for dict pattern functions) 

1274 if dict_key and context: 1274 ↛ 1290 (line 1274 didn't jump to line 1290 because the condition on line 1274 was always true)

1275 # Use microscope handler to parse filenames and filter by channel 

1276 microscope_handler = context.microscope_handler 

1277 parser = microscope_handler.parser 

1278 

1279 filtered_paths = [] 

1280 for path in memory_paths: 

1281 filename = Path(path).name 

1282 metadata = parser.parse_filename(filename) 

1283 if metadata and str(metadata.get('channel')) == str(dict_key): 

1284 filtered_paths.append(path) 

1285 

1286 if filtered_paths: 1286 ↛ 1290 (line 1286 didn't jump to line 1290 because the condition on line 1286 was always true)

1287 memory_paths = filtered_paths 

1288 

1289 # Use first image as template: "A01_s001_w1_z001_t001.tif" -> "A01_s001_w1_z001_t001_rois_step7.roi.zip" 

1290 base_filename = Path(memory_paths[0]).stem 

1291 return f"{base_filename}_{output_key}_step{step_index}.roi.zip" 

1292 
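# Worked example of the template naming above, using the illustrative filename
# from the comment on line 1289:

def _demo_analysis_filename() -> str:
    base = Path("A01_s001_w1_z001_t001.tif").stem  # -> "A01_s001_w1_z001_t001"
    return f"{base}_rois_step7.roi.zip"            # output_key="rois", step_index=7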

1293 def _materialize_special_outputs(self, filemanager, step_plan, special_outputs, backend, context): 

1294 """Materialize special outputs (ROIs, cell counts) to disk and streaming backends.""" 

1295 # Collect backends: main + streaming 

1296 from openhcs.core.config import StreamingConfig 

1297 backends = [backend] 

1298 backend_kwargs = {backend: {}} 

1299 

1300 for config in step_plan.values(): 

1301 if isinstance(config, StreamingConfig): 1301 ↛ 1302 (line 1301 didn't jump to line 1302 because the condition on line 1301 was never true)

1302 backends.append(config.backend.value) 

1303 backend_kwargs[config.backend.value] = config.get_streaming_kwargs(context) 

1304 

1305 # Get analysis directory (pre-calculated by compiler) 

1306 has_step_mat = 'materialized_output_dir' in step_plan 

1307 analysis_output_dir = Path(step_plan['materialized_analysis_results_dir' if has_step_mat else 'analysis_results_dir']) 

1308 images_dir = str(step_plan['materialized_output_dir' if has_step_mat else 'output_dir']) 

1309 

1310 # Add images_dir and source to all backend kwargs 

1311 step_name = step_plan.get('step_name', 'unknown_step') 

1312 for kwargs in backend_kwargs.values(): 

1313 kwargs['images_dir'] = images_dir 

1314 kwargs['source'] = step_name # Pre-built source value for layer/window naming 

1315 

1316 filemanager._materialization_context = {'images_dir': images_dir} 

1317 

1318 # Get dict pattern info 

1319 step_func = step_plan['func'] 

1320 dict_keys = list(step_func.keys()) if isinstance(step_func, dict) else [] 

1321 

1322 # Materialize each special output 

1323 for output_key, output_info in special_outputs.items(): 

1324 mat_func = output_info.get('materialization_function') 

1325 if not mat_func: 

1326 continue 

1327 

1328 memory_path = output_info['path'] 

1329 step_index = step_plan['pipeline_position'] 

1330 

1331 # For dict patterns, materialize each channel separately 

1332 channels_to_process = dict_keys if dict_keys else [None] 

1333 

1334 for dict_key in channels_to_process: 

1335 # Build channel-specific memory path if needed 

1336 if dict_key: 1336 ↛ 1340 (line 1336 didn't jump to line 1340 because the condition on line 1336 was always true)

1337 from openhcs.core.pipeline.path_planner import PipelinePathPlanner 

1338 channel_path = PipelinePathPlanner.build_dict_pattern_path(memory_path, dict_key) 

1339 else: 

1340 channel_path = memory_path 

1341 

1342 # Load data 

1343 filemanager.ensure_directory(Path(channel_path).parent, Backend.MEMORY.value) 

1344 data = filemanager.load(channel_path, Backend.MEMORY.value) 

1345 

1346 # Build analysis filename and path (pass dict_key for channel-specific naming) 

1347 filename = self._build_analysis_filename(output_key, step_index, step_plan, dict_key, context) 

1348 analysis_path = analysis_output_dir / filename 

1349 

1350 # Materialize to all backends 

1351 mat_func(data, str(analysis_path), filemanager, backends, backend_kwargs) 

1352 

1353 
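# A minimal sketch of the materialization_function contract implied by the call
# on line 1351 - positional (data, path, filemanager, backends, backend_kwargs).
# The CSV body is illustrative; real OpenHCS materializers are not shown here:

def _demo_materialize_counts(data: Dict[str, int], path: str, filemanager,
                             backends: List[str], backend_kwargs: Dict) -> None:
    csv_text = "label,count\n" + "\n".join(f"{k},{v}" for k, v in data.items())
    for b in backends:
        # Each backend gets its own kwargs (e.g. images_dir, source, see above)
        filemanager.save(csv_text, path, b, **backend_kwargs.get(b, {}))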

1354def _update_metadata_for_zarr_conversion( 

1355 plate_root: Path, 

1356 original_subdir: str, 

1357 zarr_subdir: str | None, 

1358 context: 'ProcessingContext' 

1359) -> None: 

1360 """Update metadata after zarr conversion. 

1361 

1362 If zarr_subdir is None: add zarr to original_subdir's available_backends 

1363 If zarr_subdir is set: create complete metadata for zarr subdirectory, set original main=false 

1364 """ 

1365 from openhcs.io.metadata_writer import get_metadata_path, AtomicMetadataWriter 

1366 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator 

1367 

1368 if zarr_subdir: 1368 ↛ 1389 (line 1368 didn't jump to line 1389 because the condition on line 1368 was always true)

1369 # Create complete metadata for zarr subdirectory (skip if already complete) 

1370 zarr_dir = plate_root / zarr_subdir 

1371 metadata_generator = OpenHCSMetadataGenerator(context.filemanager) 

1372 metadata_generator.create_metadata( 

1373 context, 

1374 str(zarr_dir), 

1375 "zarr", # Zarr subdirectory uses zarr backend 

1376 is_main=True, 

1377 plate_root=str(plate_root), 

1378 sub_dir=zarr_subdir, 

1379 skip_if_complete=True 

1380 ) 

1381 

1382 # Set original subdirectory to main=false 

1383 metadata_path = get_metadata_path(plate_root) 

1384 writer = AtomicMetadataWriter() 

1385 writer.merge_subdirectory_metadata(metadata_path, {original_subdir: {"main": False}}) 

1386 logger.info(f"Ensured complete metadata for {zarr_subdir}, set {original_subdir} main=false") 

1387 else: 

1388 # Shared subdirectory - add zarr to available_backends 

1389 metadata_path = get_metadata_path(plate_root) 

1390 writer = AtomicMetadataWriter() 

1391 writer.merge_subdirectory_metadata(metadata_path, {original_subdir: {"available_backends": {"zarr": True}}}) 

1392 logger.info(f"Updated metadata: {original_subdir} now has zarr backend")
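
# Illustrative calls for the two cases handled above (plate paths hypothetical):
#
#   # Dedicated zarr subdirectory: write its metadata, demote the original
#   _update_metadata_for_zarr_conversion(Path("/data/plate1"), "images", "images_zarr", context)
#
#   # Shared subdirectory: just record zarr as an additional available backend
#   _update_metadata_for_zarr_conversion(Path("/data/plate1"), "images", None, context)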