Coverage for openhcs/core/steps/function_step.py: 75.2%

460 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2FunctionStep implementation for pattern-based processing. 

3 

4This module contains the FunctionStep class. During execution, FunctionStep instances 

5are stateless regarding their configuration. All operational parameters, including 

6the function(s) to execute, special input/output keys, their VFS paths, and memory types, 

7are retrieved from this step's entry in `context.step_plans`. 

8""" 

9 

10import logging 

11import os 

12import time 

13import gc 

14import json 

15import shutil 

16from functools import partial 

17from pathlib import Path 

18from typing import Any, Callable, Dict, List, Optional, Tuple, Union, OrderedDict as TypingOrderedDict, TYPE_CHECKING 

19 

20if TYPE_CHECKING: 20 ↛ 21 (line 20 didn't jump to line 21 because the condition on line 20 was never true)

21 from openhcs.core.config import PathPlanningConfig 

22 

23from openhcs.constants.constants import (DEFAULT_IMAGE_EXTENSION, 

24 DEFAULT_IMAGE_EXTENSIONS, 

25 DEFAULT_SITE_PADDING, Backend, 

26 MemoryType, VariableComponents, GroupBy) 

27from openhcs.constants.input_source import InputSource 

28from openhcs.core.context.processing_context import ProcessingContext 

29from openhcs.core.steps.abstract import AbstractStep, get_step_id 

30from openhcs.formats.func_arg_prep import prepare_patterns_and_functions 

31from openhcs.core.memory.stack_utils import stack_slices, unstack_slices 

32# OpenHCS imports moved to local imports to avoid circular dependencies 

33 

34logger = logging.getLogger(__name__) 

35 

36def _generate_materialized_paths(memory_paths: List[str], step_output_dir: Path, materialized_output_dir: Path) -> List[str]: 

37 """Generate materialized file paths by replacing step output directory.""" 

38 materialized_paths = [] 

39 for memory_path in memory_paths: 

40 relative_path = Path(memory_path).relative_to(step_output_dir) 

41 materialized_path = materialized_output_dir / relative_path 

42 materialized_paths.append(str(materialized_path)) 

43 return materialized_paths 

44 
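# Illustrative sketch (hypothetical paths): _generate_materialized_paths rebases each
# memory path from the step output directory onto the materialized output directory.
#
#   _generate_materialized_paths(
#       ["/plate/out/A01_s1_c1.tif"],
#       Path("/plate/out"),
#       Path("/plate/materialized"),
#   )
#   # -> ["/plate/materialized/A01_s1_c1.tif"]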

45 

46def _save_materialized_data(filemanager, memory_data: List, materialized_paths: List[str], 

47 materialized_backend: str, step_plan: Dict, context, well_id: str) -> None: 

48 """Save data to materialized location using appropriate backend.""" 

49 if materialized_backend == Backend.ZARR.value: 

50 n_channels, n_z, n_fields = _calculate_zarr_dimensions(materialized_paths, context.microscope_handler) 

51 row, col = context.microscope_handler.parser.extract_row_column(well_id) 

52 filemanager.save_batch(memory_data, materialized_paths, materialized_backend, 

53 chunk_name=well_id, zarr_config=step_plan.get("zarr_config"), 

54 n_channels=n_channels, n_z=n_z, n_fields=n_fields, 

55 row=row, col=col) 

56 else: 

57 filemanager.save_batch(memory_data, materialized_paths, materialized_backend) 

58 

59 

60 

61 

62def get_all_image_paths(input_dir, backend, well_id, filemanager, microscope_handler): 

63 """ 

64 Get all image file paths for a specific well from a directory. 

65 

66 Args: 

67 input_dir: Directory to search for images 

68 backend: Backend to use for file listing 

69 well_id: Well identifier to filter files 

70 filemanager: FileManager instance 

71 microscope_handler: Microscope handler with parser for filename parsing 

72 

73 Returns: 

74 List of full file paths for the well 

75 """ 

76 # List all image files in directory 

77 all_image_files = filemanager.list_image_files(str(input_dir), backend) 

78 

79 # Filter by well using parser (FIXED: was using naive string matching) 

80 well_files = [] 

81 parser = microscope_handler.parser 

82 

83 for f in all_image_files: 

84 filename = os.path.basename(str(f)) 

85 metadata = parser.parse_filename(filename) 

86 if metadata and metadata.get('well') == well_id: 

87 well_files.append(str(f)) 

88 

89 # Remove duplicates and sort 

90 sorted_files = sorted(list(set(well_files))) 

91 

92 # Prepare full file paths 

93 full_file_paths = [str(input_dir / Path(f).name) for f in sorted_files] 

94 

95 logger.debug(f"Found {len(all_image_files)} total files, {len(full_file_paths)} for well {well_id}") 

96 

97 return full_file_paths 

98 

99 

100def create_image_path_getter(well_id, filemanager, microscope_handler): 

101 """ 

102 Create a specialized image path getter function using runtime context. 

103 

104 Args: 

105 well_id: Well identifier 

106 filemanager: FileManager instance 

107 microscope_handler: Microscope handler with parser for filename parsing 

108 

109 Returns: 

110 Function that takes (input_dir, backend) and returns image paths for the well 

111 """ 

112 def get_paths_for_well(input_dir, backend): 

113 return get_all_image_paths( 

114 input_dir=input_dir, 

115 well_id=well_id, 

116 backend=backend, 

117 filemanager=filemanager, 

118 microscope_handler=microscope_handler 

119 ) 

120 return get_paths_for_well 

121 
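# Illustrative sketch (hypothetical well and directories): the returned closure binds
# the well id and runtime handlers, so call sites only vary the directory and backend.
#
#   get_paths = create_image_path_getter("A01", filemanager, microscope_handler)
#   disk_paths = get_paths(Path("/plate/step1_in"), Backend.DISK.value)
#   memory_paths = get_paths(Path("/plate/step1_out"), Backend.MEMORY.value)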

122# Environment variable to disable universal GPU defragmentation 

123DISABLE_GPU_DEFRAG = os.getenv('OPENHCS_DISABLE_GPU_DEFRAG', 'false').lower() == 'true' 

124 

125def _bulk_preload_step_images( 

126 step_input_dir: Path, 

127 step_output_dir: Path, 

128 well_id: str, 

129 read_backend: str, 

130 patterns_by_well: Dict[str, Any], 

131 filemanager: 'FileManager', 

132 microscope_handler: 'MicroscopeHandler', 

133 zarr_config: Optional[Dict[str, Any]] = None 

134) -> None: 

135 """ 

136 Pre-load all images for this step from source backend into memory backend. 

137 

138 This reduces I/O overhead by doing a single bulk read operation 

139 instead of loading images per pattern group. 

140 

141 Note: External conditional logic ensures this is only called for non-memory backends. 

142 """ 

143 import time 

144 start_time = time.time() 

145 

146 logger.debug(f"🔄 BULK PRELOAD: Loading images from {read_backend} to memory for well {well_id}") 

147 

148 # Get all files for this well from patterns 

149 all_files = [] 

150 # Create specialized path getter for this well 

151 get_paths_for_well = create_image_path_getter(well_id, filemanager, microscope_handler) 

152 

153 # Get all image paths for this well 

154 full_file_paths = get_paths_for_well(step_input_dir, read_backend) 

155 

156 if not full_file_paths: 156 ↛ 157 (line 156 didn't jump to line 157 because the condition on line 156 was never true)

157 raise RuntimeError(f"🔄 BULK PRELOAD: No files found for well {well_id} in {step_input_dir} with backend {read_backend}") 

158 

159 # Load from source backend with conditional zarr_config 

160 if read_backend == Backend.ZARR.value: 

161 raw_images = filemanager.load_batch(full_file_paths, read_backend, zarr_config=zarr_config) 

162 else: 

163 raw_images = filemanager.load_batch(full_file_paths, read_backend) 

164 

165 # Ensure directory exists in memory backend before saving 

166 filemanager.ensure_directory(str(step_input_dir), Backend.MEMORY.value) 

167 

168 # Save to memory backend using OUTPUT paths 

169 # memory_paths = [str(step_output_dir / Path(fp).name) for fp in full_file_paths] 

170 for file_path in full_file_paths: 

171 if filemanager.exists(file_path, Backend.MEMORY.value): 

172 filemanager.delete(file_path, Backend.MEMORY.value) 

173 logger.debug(f"🔄 BULK PRELOAD: Deleted existing file {file_path} before bulk preload") 

174 

175 filemanager.save_batch(raw_images, full_file_paths, Backend.MEMORY.value) 

176 logger.debug(f"🔄 BULK PRELOAD: Saved {len(full_file_paths)} files to memory backend") 

177 

178 # Clean up source references - keep only memory backend references 

179 del raw_images 

180 

181 load_time = time.time() - start_time 

182 logger.debug(f"🔄 BULK PRELOAD: Completed in {load_time:.2f}s - {len(full_file_paths)} images now in memory") 

183 

184def _bulk_writeout_step_images( 

185 step_output_dir: Path, 

186 write_backend: str, 

187 well_id: str, 

188 zarr_config: Optional[Dict[str, Any]], 

189 filemanager: 'FileManager', 

190 microscope_handler: Optional[Any] = None 

191) -> None: 

192 """ 

193 Write all processed images from memory to final backend (disk/zarr). 

194 

195 This reduces I/O overhead by doing a single bulk write operation 

196 instead of writing images per pattern group. 

197 

198 Note: External conditional logic ensures this is only called for non-memory backends. 

199 """ 

200 import time 

201 start_time = time.time() 

202 

203 logger.debug(f"🔄 BULK WRITEOUT: Writing images from memory to {write_backend} for well {well_id}") 

204 

205 # Create specialized path getter and get memory paths for this well 

206 get_paths_for_well = create_image_path_getter(well_id, filemanager, microscope_handler) 

207 memory_file_paths = get_paths_for_well(step_output_dir, Backend.MEMORY.value) 

208 

209 if not memory_file_paths: 

210 raise RuntimeError(f"🔄 BULK WRITEOUT: No image files found for well {well_id} in memory directory {step_output_dir}") 

211 

212 # Memory paths are reused directly as the target paths; the per-backend 

213 # absolute-path conversion (commented out below) is currently disabled. 

214# file_paths = 

215# for memory_path in memory_file_paths: 

216# # Get just the filename and construct proper target path 

217# filename = Path(memory_path).name 

218# target_path = step_output_dir / filename 

219# file_paths.append(str(target_path)) 

220 

221 file_paths = memory_file_paths 

222 logger.debug(f"🔄 BULK WRITEOUT: Found {len(file_paths)} image files in memory to write") 

223 

224 # Load all data from memory backend 

225 memory_data = filemanager.load_batch(file_paths, Backend.MEMORY.value) 

226 

227 # Ensure output directory exists before bulk write 

228 filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value) 

229 

230 # Bulk write to target backend with conditional zarr_config 

231 if write_backend == Backend.ZARR.value: 

232 # Calculate zarr dimensions from file paths 

233 if microscope_handler is not None: 

234 n_channels, n_z, n_fields = _calculate_zarr_dimensions(file_paths, microscope_handler) 

235 # Parse well to get row and column for zarr structure 

236 row, col = microscope_handler.parser.extract_row_column(well_id) 

237 filemanager.save_batch(memory_data, file_paths, write_backend, 

238 chunk_name=well_id, zarr_config=zarr_config, 

239 n_channels=n_channels, n_z=n_z, n_fields=n_fields, 

240 row=row, col=col) 

241 else: 

242 # Fallback without dimensions if microscope_handler not available 

243 filemanager.save_batch(memory_data, file_paths, write_backend, chunk_name=well_id, zarr_config=zarr_config) 

244 else: 

245 filemanager.save_batch(memory_data, file_paths, write_backend) 

246 

247 write_time = time.time() - start_time 

248 logger.debug(f"🔄 BULK WRITEOUT: Completed in {write_time:.2f}s - {len(memory_data)} images written to {write_backend}") 

249 
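# Illustrative sketch (hypothetical arguments): counterpart of _bulk_preload_step_images,
# flushing the in-memory results for one well to a final backend in a single batch write.
#
#   _bulk_writeout_step_images(
#       step_output_dir=Path("/plate/step1_out"),
#       write_backend=Backend.ZARR.value,
#       well_id="A01",
#       zarr_config=step_plan.get("zarr_config"),
#       filemanager=filemanager,
#       microscope_handler=microscope_handler,
#   )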

250def _calculate_zarr_dimensions(file_paths: List[Union[str, Path]], microscope_handler) -> tuple[int, int, int]: 

251 """ 

252 Calculate zarr dimensions (n_channels, n_z, n_fields) from file paths using microscope parser. 

253 

254 Args: 

255 file_paths: List of file paths to analyze 

256 microscope_handler: Microscope handler with filename parser 

257 

258 Returns: 

259 Tuple of (n_channels, n_z, n_fields) 

260 """ 

261 parsed_files = [] 

262 for file_path in file_paths: 

263 filename = Path(file_path).name 

264 metadata = microscope_handler.parser.parse_filename(filename) 

265 parsed_files.append(metadata) 

266 

267 # Count unique values for each dimension from actual files 

268 n_channels = len(set(f.get('channel') for f in parsed_files if f.get('channel') is not None)) 

269 n_z = len(set(f.get('z_index') for f in parsed_files if f.get('z_index') is not None)) 

270 n_fields = len(set(f.get('site') for f in parsed_files if f.get('site') is not None)) 

271 

272 # Ensure at least 1 for each dimension (handle cases where metadata is missing) 

273 n_channels = max(1, n_channels) 

274 n_z = max(1, n_z) 

275 n_fields = max(1, n_fields) 

276 

277 return n_channels, n_z, n_fields 

278 
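# Illustrative sketch (hypothetical filenames, assuming the parser yields 'channel',
# 'z_index' and 'site'): each dimension is the count of distinct parsed values,
# floored at 1 when a component is missing.
#
#   paths = ["A01_s1_c1_z1.tif", "A01_s1_c2_z1.tif",
#            "A01_s2_c1_z1.tif", "A01_s2_c2_z1.tif"]
#   _calculate_zarr_dimensions(paths, microscope_handler)  # -> (2, 1, 2)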

279 

280 

281def _is_3d(array: Any) -> bool: 

282 """Check if an array is 3D.""" 

283 return hasattr(array, 'ndim') and array.ndim == 3 

284 

285def _execute_function_core( 

286 func_callable: Callable, 

287 main_data_arg: Any, 

288 base_kwargs: Dict[str, Any], 

289 context: 'ProcessingContext', 

290 special_inputs_plan: Dict[str, str], # {'arg_name_for_func': 'special_path_value'} 

291 special_outputs_plan: TypingOrderedDict[str, str], # {'output_key': 'special_path_value'}, order matters 

292 well_id: str, # Add well_id parameter 

293 input_memory_type: str, 

294 device_id: int 

295) -> Any: # Returns the main processed data stack 

296 """ 

297 Executes a single callable, handling its special I/O. 

298 - Loads special inputs from VFS paths in `special_inputs_plan`. 

299 - Calls `func_callable(main_data_arg, **all_kwargs)`. 

300 - If `special_outputs_plan` is non-empty, expects func to return (main_out, sp_val1, sp_val2,...). 

301 - Saves special outputs positionally to VFS paths in `special_outputs_plan`. 

302 - Returns the main processed data stack. 

303 """ 

304 final_kwargs = base_kwargs.copy() 

305 

306 if special_inputs_plan: 

307 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: special_inputs_plan = {special_inputs_plan}") 

308 for arg_name, path_info in special_inputs_plan.items(): 

309 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Processing arg_name='{arg_name}', path_info={path_info} (type: {type(path_info)})") 

310 

311 

312 # Extract path string from the path info dictionary 

313 # Current format: {"path": "/path/to/file.pkl", "source_step_id": "step_123"} 

314 if isinstance(path_info, dict) and 'path' in path_info: 314 ↛ 318 (line 314 didn't jump to line 318 because the condition on line 314 was always true)

315 special_path_value = path_info['path'] 

316 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Extracted path from dict: '{special_path_value}' (type: {type(special_path_value)})") 

317 else: 

318 special_path_value = path_info # Fallback if it's already a string 

319 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Using path_info directly: '{special_path_value}' (type: {type(special_path_value)})") 

320 

321 logger.info(f"Loading special input '{arg_name}' from path '{special_path_value}' (memory backend)") 

322 try: 

323 final_kwargs[arg_name] = context.filemanager.load(special_path_value, Backend.MEMORY.value) 

324 except Exception as e: 

325 logger.error(f"Failed to load special input '{arg_name}' from '{special_path_value}': {e}", exc_info=True) 

326 raise 

327 

328 # Auto-inject context if function signature expects it 

329 import inspect 

330 sig = inspect.signature(func_callable) 

331 if 'context' in sig.parameters: 331 ↛ 332 (line 331 didn't jump to line 332 because the condition on line 331 was never true)

332 final_kwargs['context'] = context 

333 

334 # 🔍 DEBUG: Log input dimensions 

335 input_shape = getattr(main_data_arg, 'shape', 'no shape attr') 

336 input_type = type(main_data_arg).__name__ 

337 logger.debug(f"🔍 FUNCTION INPUT: {func_callable.__name__} - shape: {input_shape}, type: {input_type}") 

338 

339 # ⚡ INFO: Terse function execution log for user feedback 

340 logger.info(f"⚡ Executing: {func_callable.__name__}") 

341 

342 # 🔍 DEBUG: Log function attributes before execution 

343 logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - special_outputs: {getattr(func_callable, '__special_outputs__', 'None')}") 

344 logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - input_memory_type: {getattr(func_callable, 'input_memory_type', 'None')}") 

345 logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - output_memory_type: {getattr(func_callable, 'output_memory_type', 'None')}") 

346 

347 raw_function_output = func_callable(main_data_arg, **final_kwargs) 

348 

349 # 🔍 DEBUG: Log output dimensions and type details 

350 output_shape = getattr(raw_function_output, 'shape', 'no shape attr') 

351 output_type = type(raw_function_output).__name__ 

352 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - shape: {output_shape}, type: {output_type}") 

353 

354 # 🔍 DEBUG: If it's a tuple, log details about each element 

355 if isinstance(raw_function_output, tuple): 

356 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - tuple length: {len(raw_function_output)}") 

357 for i, element in enumerate(raw_function_output): 

358 elem_shape = getattr(element, 'shape', 'no shape attr') 

359 elem_type = type(element).__name__ 

360 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - element[{i}]: shape={elem_shape}, type={elem_type}") 

361 else: 

362 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - not a tuple, single return value") 

363 

364 main_output_data = raw_function_output 

365 

366 # 🔍 DEBUG: Log special output plan status 

367 logger.debug(f"🔍 SPECIAL OUTPUT PLAN: {special_outputs_plan}") 

368 logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Is empty? {not special_outputs_plan}") 

369 logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Length: {len(special_outputs_plan) if special_outputs_plan else 0}") 

370 

371 # Only log special outputs if there are any (avoid spamming empty dict logs) 

372 if special_outputs_plan: 

373 logger.debug(f"🔍 SPECIAL OUTPUT: {special_outputs_plan}") 

374 if special_outputs_plan: 

375 num_special_outputs = len(special_outputs_plan) 

376 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Expected {num_special_outputs} special outputs") 

377 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned type: {type(raw_function_output)}") 

378 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned tuple length: {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'not tuple'}") 

379 

380 if not isinstance(raw_function_output, tuple) or len(raw_function_output) != (1 + num_special_outputs): 380 ↛ 381 (line 380 didn't jump to line 381 because the condition on line 380 was never true)

381 logger.error(f"🔍 SPECIAL OUTPUT ERROR: Function '{getattr(func_callable, '__name__', 'unknown')}' special output mismatch") 

382 logger.error(f"🔍 SPECIAL OUTPUT ERROR: Expected tuple of {1 + num_special_outputs} values") 

383 logger.error(f"🔍 SPECIAL OUTPUT ERROR: Got {type(raw_function_output)} with {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'N/A'} values") 

384 raise ValueError( 

385 f"Function '{getattr(func_callable, '__name__', 'unknown')}' was expected to return a tuple of " 

386 f"{1 + num_special_outputs} values (main_output + {num_special_outputs} special) " 

387 f"based on 'special_outputs' in step plan, but returned {len(raw_function_output) if isinstance(raw_function_output, tuple) else type(raw_function_output)} values." 

388 ) 

389 main_output_data = raw_function_output[0] 

390 returned_special_values_tuple = raw_function_output[1:] 

391 

392 # 🔍 DEBUG: Log what we extracted 

393 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data type: {type(main_output_data)}") 

394 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data shape: {getattr(main_output_data, 'shape', 'no shape')}") 

395 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted {len(returned_special_values_tuple)} special values") 

396 

397 # Iterate through special_outputs_plan (which must be ordered by compiler) 

398 # and match with positionally returned special values. 

399 for i, (output_key, vfs_path_info) in enumerate(special_outputs_plan.items()): 

400 logger.info(f"Saving special output '{output_key}' to VFS path '{vfs_path_info}' (memory backend)") 

401 if i < len(returned_special_values_tuple): 401 ↛ 424 (line 401 didn't jump to line 424 because the condition on line 401 was always true)

402 value_to_save = returned_special_values_tuple[i] 

403 # Extract path string from the path info dictionary 

404 # Current format: {"path": "/path/to/file.pkl"} 

405 if isinstance(vfs_path_info, dict) and 'path' in vfs_path_info: 405 ↛ 408 (line 405 didn't jump to line 408 because the condition on line 405 was always true)

406 vfs_path = vfs_path_info['path'] 

407 else: 

408 vfs_path = vfs_path_info # Fallback if it's already a string 

409 # # Add well_id prefix to filename for memory backend to avoid thread collisions 

410 # from pathlib import Path 

411 # vfs_path_obj = Path(vfs_path) 

412 # prefixed_filename = f"{well_id}_{vfs_path_obj.name}" 

413 # prefixed_vfs_path = str(vfs_path_obj.parent / prefixed_filename) 

414 

415 logger.info(f"🔍 SPECIAL_SAVE: Saving '{output_key}' to '{vfs_path}' (memory backend)") 

416 # Ensure directory exists for memory backend 

417 parent_dir = str(Path(vfs_path).parent) 

418 context.filemanager.ensure_directory(parent_dir, Backend.MEMORY.value) 

419 context.filemanager.save(value_to_save, vfs_path, Backend.MEMORY.value) 

420 logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory") 


422 else: 

423 # This indicates a mismatch that should ideally be caught by schema/validation 

424 logger.error(f"Mismatch: {num_special_outputs} special outputs planned, but fewer values returned by function for key '{output_key}'.") 

425 # Or, if partial returns are allowed, this might be a warning. For now, error. 

426 raise ValueError(f"Function did not return enough values for all planned special outputs. Missing value for '{output_key}'.") 

427 

428 return main_output_data 

429 
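# Illustrative sketch (hypothetical function): with two keys planned in
# special_outputs_plan, the callable must return (main_stack, value_1, value_2);
# the main stack is returned to the caller and the special values are saved
# positionally to their planned VFS paths.
#
#   def count_and_locate(stack, **kwargs):
#       counts = {"cells": 42}        # saved under the first planned output key
#       centroids = [(10, 20)]        # saved under the second planned output key
#       return stack, counts, centroids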

430def _execute_chain_core( 

431 initial_data_stack: Any, 

432 func_chain: List[Union[Callable, Tuple[Callable, Dict]]], 

433 context: 'ProcessingContext', 

434 step_special_inputs_plan: Dict[str, str], 

435 step_special_outputs_plan: TypingOrderedDict[str, str], 

436 well_id: str, # Add well_id parameter 

437 device_id: int, 

438 input_memory_type: str, 

439 step_id: str, # Add step_id for funcplan lookup 

440 dict_key: str = "default" # Add dict_key for funcplan lookup 

441) -> Any: 

442 current_stack = initial_data_stack 

443 current_memory_type = input_memory_type # Track memory type from frozen context 

444 

445 for i, func_item in enumerate(func_chain): 

446 actual_callable: Callable 

447 base_kwargs_for_item: Dict[str, Any] = {} 

448 is_last_in_chain = (i == len(func_chain) - 1) 

449 

450 if isinstance(func_item, tuple) and len(func_item) == 2 and callable(func_item[0]): 450 ↛ 452 (line 450 didn't jump to line 452 because the condition on line 450 was always true)

451 actual_callable, base_kwargs_for_item = func_item 

452 elif callable(func_item): 

453 actual_callable = func_item 

454 else: 

455 raise TypeError(f"Invalid item in function chain: {func_item}.") 

456 

457 # Convert to function's input memory type (noop if same) 

458 from openhcs.core.memory.converters import convert_memory 

459 current_stack = convert_memory( 

460 data=current_stack, 

461 source_type=current_memory_type, 

462 target_type=actual_callable.input_memory_type, 

463 gpu_id=device_id, 

464 allow_cpu_roundtrip=False 

465 ) 

466 

467 # Use funcplan to determine which outputs this function should save 

468 funcplan = context.step_plans[step_id].get("funcplan", {}) 

469 func_name = getattr(actual_callable, '__name__', 'unknown') 

470 

471 # Construct execution key: function_name_dict_key_chain_position 

472 execution_key = f"{func_name}_{dict_key}_{i}" 

473 

474 if execution_key in funcplan: 474 ↛ 476 (line 474 didn't jump to line 476 because the condition on line 474 was never true)

475 # Get outputs this specific function should save 

476 outputs_to_save = funcplan[execution_key] 

477 outputs_plan_for_this_call = { 

478 key: step_special_outputs_plan[key] 

479 for key in outputs_to_save 

480 if key in step_special_outputs_plan 

481 } 

482 logger.info(f"🔍 FUNCPLAN: {execution_key} -> {outputs_to_save}") 

483 logger.info(f"🔍 FUNCPLAN: outputs_plan_for_this_call = {outputs_plan_for_this_call}") 

484 else: 

485 # Fallback: no funcplan entry, save nothing 

486 outputs_plan_for_this_call = {} 

487 logger.info(f"🔍 FUNCPLAN: No entry for {execution_key}, saving nothing") 

488 

489 current_stack = _execute_function_core( 

490 func_callable=actual_callable, 

491 main_data_arg=current_stack, 

492 base_kwargs=base_kwargs_for_item, 

493 context=context, 

494 special_inputs_plan=step_special_inputs_plan, 

495 special_outputs_plan=outputs_plan_for_this_call, 

496 well_id=well_id, 

497 device_id=device_id, 

498 input_memory_type=input_memory_type, 

499 ) 

500 

501 # Update current memory type from frozen context 

502 current_memory_type = actual_callable.output_memory_type 

503 

504 return current_stack 

505 
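# Illustrative sketch (hypothetical function names): the funcplan execution key for
# the i-th callable in a chain is f"{func_name}_{dict_key}_{i}", matching
# execution_key above, e.g. "gaussian_blur_default_0" or "segment_1_2".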

506def _process_single_pattern_group( 

507 context: 'ProcessingContext', 

508 pattern_group_info: Any, 

509 executable_func_or_chain: Any, 

510 base_func_args: Dict[str, Any], 

511 step_input_dir: Path, 

512 step_output_dir: Path, 

513 well_id: str, 

514 component_value: str, 

515 read_backend: str, 

516 write_backend: str, 

517 input_memory_type_from_plan: str, # Explicitly from plan 

518 output_memory_type_from_plan: str, # Explicitly from plan 

519 device_id: Optional[int], 

520 same_directory: bool, 

521 special_inputs_map: Dict[str, str], 

522 special_outputs_map: TypingOrderedDict[str, str], 

523 zarr_config: Optional[Dict[str, Any]], 

524 variable_components: Optional[List[str]] = None, 

525 step_id: Optional[str] = None # Add step_id for funcplan lookup 

526) -> None: 

527 start_time = time.time() 

528 pattern_repr = str(pattern_group_info)[:100] 

529 logger.debug(f"🔥 PATTERN: Processing {pattern_repr} for well {well_id}") 

530 

531 try: 

532 if not context.microscope_handler: 532 ↛ 533 (line 532 didn't jump to line 533 because the condition on line 532 was never true)

533 raise RuntimeError("MicroscopeHandler not available in context.") 

534 

535 matching_files = context.microscope_handler.path_list_from_pattern( 

536 str(step_input_dir), pattern_group_info, context.filemanager, Backend.MEMORY.value, 

537 [vc.value for vc in variable_components] if variable_components else None 

538 ) 

539 

540 if not matching_files: 540 ↛ 541 (line 540 didn't jump to line 541 because the condition on line 540 was never true)

541 raise ValueError( 

542 f"No matching files found for pattern group {pattern_repr} in {step_input_dir}. " 

543 f"This indicates either: (1) no image files exist in the directory, " 

544 f"(2) files don't match the pattern, or (3) pattern parsing failed. " 

545 f"Check that input files exist and match the expected naming convention." 

546 ) 

547 

548 logger.debug(f"🔥 PATTERN: Found {len(matching_files)} files: {[Path(f).name for f in matching_files]}") 

549 

550 # Sort files to ensure consistent ordering (especially important for z-stacks) 

551 matching_files.sort() 

552 logger.debug(f"🔥 PATTERN: Sorted files: {[Path(f).name for f in matching_files]}") 

553 

554 full_file_paths = [str(step_input_dir / f) for f in matching_files] 

555 raw_slices = context.filemanager.load_batch(full_file_paths, Backend.MEMORY.value) 

556 

557 if not raw_slices: 557 ↛ 558 (line 557 didn't jump to line 558 because the condition on line 557 was never true)

558 raise ValueError( 

559 f"No valid images loaded for pattern group {pattern_repr} in {step_input_dir}. " 

560 f"Found {len(matching_files)} matching files but failed to load any valid images. " 

561 f"This indicates corrupted image files, unsupported formats, or I/O errors. " 

562 f"Check file integrity and format compatibility." 

563 ) 

564 

565 # 🔍 DEBUG: Log stacking operation 

566 logger.debug(f"🔍 STACKING: {len(raw_slices)} slices → memory_type: {input_memory_type_from_plan}") 

567 if raw_slices: 567 ↛ 571 (line 567 didn't jump to line 571 because the condition on line 567 was always true)

568 slice_shapes = [getattr(s, 'shape', 'no shape') for s in raw_slices[:3]] # First 3 shapes 

569 logger.debug(f"🔍 STACKING: Sample slice shapes: {slice_shapes}") 

570 

571 main_data_stack = stack_slices( 

572 slices=raw_slices, memory_type=input_memory_type_from_plan, gpu_id=device_id 

573 ) 

574 

575 # 🔍 DEBUG: Log stacked result 

576 stack_shape = getattr(main_data_stack, 'shape', 'no shape') 

577 stack_type = type(main_data_stack).__name__ 

578 logger.debug(f"🔍 STACKED RESULT: shape: {stack_shape}, type: {stack_type}") 

579 

580 logger.info(f"🔍 special_outputs_map: {special_outputs_map}") 

581 

582 final_base_kwargs = base_func_args.copy() 

583 

584 # Get step function from step plan 

585 step_func = context.step_plans[step_id]["func"] 

586 

587 if isinstance(step_func, dict): 587 ↛ 588 (line 587 didn't jump to line 588 because the condition on line 587 was never true)

588 dict_key_for_funcplan = component_value # Use actual dict key for dict patterns 

589 else: 

590 dict_key_for_funcplan = "default" # Use default for list/single patterns 

591 

592 if isinstance(executable_func_or_chain, list): 

593 processed_stack = _execute_chain_core( 

594 main_data_stack, executable_func_or_chain, context, 

595 special_inputs_map, special_outputs_map, well_id, 

596 device_id, input_memory_type_from_plan, step_id, dict_key_for_funcplan 

597 ) 

598 elif callable(executable_func_or_chain): 598 ↛ 605 (line 598 didn't jump to line 605 because the condition on line 598 was always true)

599 # For single functions, we don't need chain execution, but we still need the right dict_key 

600 processed_stack = _execute_function_core( 

601 executable_func_or_chain, main_data_stack, final_base_kwargs, context, 

602 special_inputs_map, special_outputs_map, well_id, input_memory_type_from_plan, device_id 

603 ) 

604 else: 

605 raise TypeError(f"Invalid executable_func_or_chain: {type(executable_func_or_chain)}") 

606 

607 # 🔍 DEBUG: Check what shape the function actually returned 

608 input_shape = getattr(main_data_stack, 'shape', 'unknown') 

609 output_shape = getattr(processed_stack, 'shape', 'unknown') 

610 processed_type = type(processed_stack).__name__ 

611 logger.debug(f"🔍 PROCESSING RESULT: input: {input_shape} → output: {output_shape}, type: {processed_type}") 

612 

613 # 🔍 DEBUG: Additional validation logging 

614 logger.debug(f"🔍 VALIDATION: processed_stack type: {type(processed_stack)}") 

615 logger.debug(f"🔍 VALIDATION: processed_stack has shape attr: {hasattr(processed_stack, 'shape')}") 

616 logger.debug(f"🔍 VALIDATION: processed_stack has ndim attr: {hasattr(processed_stack, 'ndim')}") 

617 if hasattr(processed_stack, 'ndim'): 617 ↛ 619 (line 617 didn't jump to line 619 because the condition on line 617 was always true)

618 logger.debug(f"🔍 VALIDATION: processed_stack ndim: {processed_stack.ndim}") 

619 if hasattr(processed_stack, 'shape'): 619 ↛ 622 (line 619 didn't jump to line 622 because the condition on line 619 was always true)

620 logger.debug(f"🔍 VALIDATION: processed_stack shape: {processed_stack.shape}") 

621 

622 if not _is_3d(processed_stack): 622 ↛ 623 (line 622 didn't jump to line 623 because the condition on line 622 was never true)

623 logger.error(f"🔍 VALIDATION ERROR: processed_stack is not 3D") 

624 logger.error(f"🔍 VALIDATION ERROR: Type: {type(processed_stack)}") 

625 logger.error(f"🔍 VALIDATION ERROR: Shape: {getattr(processed_stack, 'shape', 'no shape attr')}") 

626 logger.error(f"🔍 VALIDATION ERROR: Has ndim: {hasattr(processed_stack, 'ndim')}") 

627 if hasattr(processed_stack, 'ndim'): 

628 logger.error(f"🔍 VALIDATION ERROR: ndim value: {processed_stack.ndim}") 

629 raise ValueError(f"Main processing must result in a 3D array, got {getattr(processed_stack, 'shape', 'unknown')}") 

630 

631 # 🔍 DEBUG: Log unstacking operation 

632 logger.debug(f"🔍 UNSTACKING: shape: {output_shape} → memory_type: {output_memory_type_from_plan}") 

633 

634 

635 

636 output_slices = unstack_slices( 

637 array=processed_stack, memory_type=output_memory_type_from_plan, gpu_id=device_id, validate_slices=True 

638 ) 

639 

640 # 🔍 DEBUG: Log unstacked result 

641 if output_slices: 641 ↛ 647 (line 641 didn't jump to line 647 because the condition on line 641 was always true)

642 unstacked_shapes = [getattr(s, 'shape', 'no shape') for s in output_slices[:3]] # First 3 shapes 

643 logger.debug(f"🔍 UNSTACKED RESULT: {len(output_slices)} slices, sample shapes: {unstacked_shapes}") 

644 

645 # Handle cases where function returns fewer images than inputs (e.g., z-stack flattening, channel compositing) 

646 # In such cases, we save only the returned images using the first N input filenames 

647 num_outputs = len(output_slices) 

648 num_inputs = len(matching_files) 

649 

650 if num_outputs < num_inputs: 

651 logger.debug(f"Function returned {num_outputs} images from {num_inputs} inputs - likely flattening operation") 

652 elif num_outputs > num_inputs: 652 ↛ 653 (line 652 didn't jump to line 653 because the condition on line 652 was never true)

653 logger.warning(f"Function returned more images ({num_outputs}) than inputs ({num_inputs}) - unexpected") 

654 

655 # Save the output images using batch operations 

656 try: 

657 # Prepare batch data 

658 output_data = [] 

659 output_paths_batch = [] 

660 

661 for i, img_slice in enumerate(output_slices): 

662 # FAIL FAST: No fallback filenames - if we have more outputs than inputs, something is wrong 

663 if i >= len(matching_files): 663 ↛ 664 (line 663 didn't jump to line 664 because the condition on line 663 was never true)

664 raise ValueError( 

665 f"Function returned {num_outputs} output slices but only {num_inputs} input files available. " 

666 f"Cannot generate filename for output slice {i}. This indicates a bug in the function or " 

667 f"unstacking logic - functions should return same or fewer images than inputs." 

668 ) 

669 

670 input_filename = matching_files[i] 

671 output_filename = Path(input_filename).name 

672 output_path = Path(step_output_dir) / output_filename 

673 

674 # Always ensure we can write to the output path (delete if exists) 

675 if context.filemanager.exists(str(output_path), Backend.MEMORY.value): 

676 context.filemanager.delete(str(output_path), Backend.MEMORY.value) 

677 

678 output_data.append(img_slice) 

679 output_paths_batch.append(str(output_path)) 

680 

681 # Ensure directory exists 

682 context.filemanager.ensure_directory(str(step_output_dir), Backend.MEMORY.value) 

683 

684 # Batch save to the memory backend; the write to the final target backend happens later in process() 

685 #if write_backend == Backend.ZARR.value: 

686 # Batch save 

687 # context.filemanager.save_batch(output_data, output_paths_batch, write_backend, zarr_config=zarr_config) 

688 # else: 

689 context.filemanager.save_batch(output_data, output_paths_batch, Backend.MEMORY.value) 

690 

691 except Exception as e: 

692 logger.error(f"Error saving batch of output slices for pattern {pattern_repr}: {e}", exc_info=True) 

693 

694 # 🔥 CLEANUP: If function returned fewer images than inputs, delete the unused input files 

695 # This prevents unused channel files from remaining in memory after compositing 

696 if num_outputs < num_inputs: 

697 for j in range(num_outputs, num_inputs): 

698 unused_input_filename = matching_files[j] 

699 unused_input_path = Path(step_input_dir) / unused_input_filename 

700 if context.filemanager.exists(str(unused_input_path), Backend.MEMORY.value): 700 ↛ 697 (line 700 didn't jump to line 697 because the condition on line 700 was always true)

701 context.filemanager.delete(str(unused_input_path), Backend.MEMORY.value) 

702 logger.debug(f"🔥 CLEANUP: Deleted unused input file: {unused_input_filename}") 

703 

704 

705 

706 logger.debug(f"Finished pattern group {pattern_repr} in {(time.time() - start_time):.2f}s.") 

707 except Exception as e: 

708 import traceback 

709 full_traceback = traceback.format_exc() 

710 logger.error(f"Error processing pattern group {pattern_repr}: {e}", exc_info=True) 

711 logger.error(f"Full traceback for pattern group {pattern_repr}:\n{full_traceback}") 

712 raise ValueError(f"Failed to process pattern group {pattern_repr}: {e}") from e 

713 

714class FunctionStep(AbstractStep): 

715 

716 def __init__( 

717 self, 

718 func: Union[Callable, Tuple[Callable, Dict], List[Union[Callable, Tuple[Callable, Dict]]]], 

719 **kwargs 

720 ): 

721 # Generate default name from function if not provided 

722 if 'name' not in kwargs or kwargs['name'] is None: 

723 actual_func_for_name = func 

724 if isinstance(func, tuple): 724 ↛ 725 (line 724 didn't jump to line 725 because the condition on line 724 was never true)

725 actual_func_for_name = func[0] 

726 elif isinstance(func, list) and func: 726 ↛ 727 (line 726 didn't jump to line 727 because the condition on line 726 was never true)

727 first_item = func[0] 

728 if isinstance(first_item, tuple): 

729 actual_func_for_name = first_item[0] 

730 elif callable(first_item): 

731 actual_func_for_name = first_item 

732 kwargs['name'] = getattr(actual_func_for_name, '__name__', 'FunctionStep') 

733 

734 super().__init__(**kwargs) 

735 self.func = func # This is used by prepare_patterns_and_functions at runtime 

736 
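    # Illustrative sketch (hypothetical callables): the three `func` forms accepted by
    # __init__; the default step name is taken from the first callable's __name__.
    #
    #   FunctionStep(func=normalize)                           # single callable
    #   FunctionStep(func=(normalize, {"clip": True}))         # (callable, kwargs) tuple
    #   FunctionStep(func=[(blur, {"sigma": 2}), threshold])   # chain executed in order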

737 def process(self, context: 'ProcessingContext') -> None: 

738 # Generate step_id from object reference (elegant stateless approach) 

739 step_id = get_step_id(self) 

740 step_plan = context.step_plans[step_id] 

741 

742 # Get step name for logging 

743 step_name = step_plan['step_name'] 

744 

745 try: 

746 well_id = step_plan['well_id'] 

747 step_input_dir = Path(step_plan['input_dir']) 

748 step_output_dir = Path(step_plan['output_dir']) 

749 variable_components = step_plan['variable_components'] 

750 group_by = step_plan['group_by'] 

751 func_from_plan = step_plan['func'] 

752 

753 # special_inputs/outputs are dicts: {'key': 'vfs_path_value'} 

754 special_inputs = step_plan['special_inputs'] 

755 special_outputs = step_plan['special_outputs'] # Should be OrderedDict if order matters 

756 

757 read_backend = step_plan['read_backend'] 

758 write_backend = step_plan['write_backend'] 

759 input_mem_type = step_plan['input_memory_type'] 

760 output_mem_type = step_plan['output_memory_type'] 

761 microscope_handler = context.microscope_handler 

762 filemanager = context.filemanager 

763 

764 # Create path getter for this well 

765 get_paths_for_well = create_image_path_getter(well_id, filemanager, microscope_handler) 

766 

767 # Get patterns first for bulk preload 

768 patterns_by_well = microscope_handler.auto_detect_patterns( 

769 str(step_input_dir), # folder_path 

770 filemanager, # filemanager 

771 read_backend, # backend 

772 well_filter=[well_id], # well_filter 

773 extensions=DEFAULT_IMAGE_EXTENSIONS, # extensions 

774 group_by=group_by.value if group_by else None, # group_by 

775 variable_components=[vc.value for vc in variable_components] if variable_components else [] # variable_components 

776 ) 

777 

778 

779 # Only access gpu_id if the step requires GPU (has GPU memory types) 

780 from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES 

781 requires_gpu = (input_mem_type in VALID_GPU_MEMORY_TYPES or 

782 output_mem_type in VALID_GPU_MEMORY_TYPES) 

783 

784 # Ensure variable_components is never None - use default if missing 

785 if variable_components is None: 785 ↛ 786 (line 785 didn't jump to line 786 because the condition on line 785 was never true)

786 variable_components = [VariableComponents.SITE] # Default fallback 

787 logger.warning(f"Step {step_id} ({step_name}) had None variable_components, using default [SITE]") 

788 if requires_gpu: 788 ↛ 789 (line 788 didn't jump to line 789 because the condition on line 788 was never true)

789 device_id = step_plan['gpu_id'] 

790 logger.debug(f"🔥 DEBUG: Step {step_id} gpu_id from plan: {device_id}, input_mem: {input_mem_type}, output_mem: {output_mem_type}") 

791 else: 

792 device_id = None # CPU-only step 

793 logger.debug(f"🔥 DEBUG: Step {step_id} is CPU-only, input_mem: {input_mem_type}, output_mem: {output_mem_type}") 

794 

795 logger.debug(f"🔥 DEBUG: Step {step_id} read_backend: {read_backend}, write_backend: {write_backend}") 

796 

797 if not all([well_id, step_input_dir, step_output_dir]): 797 ↛ 798 (line 797 didn't jump to line 798 because the condition on line 797 was never true)

798 raise ValueError(f"Plan missing essential keys for step {step_id}") 

799 

800 same_dir = str(step_input_dir) == str(step_output_dir) 

801 logger.info(f"Step {step_id} ({step_name}) I/O: read='{read_backend}', write='{write_backend}'.") 

802 logger.info(f"Step {step_id} ({step_name}) Paths: input_dir='{step_input_dir}', output_dir='{step_output_dir}', same_dir={same_dir}") 

803 

804 # 🔄 MATERIALIZATION READ: Bulk preload if not reading from memory 

805 if read_backend != Backend.MEMORY.value: 

806 _bulk_preload_step_images(step_input_dir, step_output_dir, well_id, read_backend, 

807 patterns_by_well, filemanager, microscope_handler, step_plan["zarr_config"]) 

808 

809 # 🔄 INPUT CONVERSION: Convert loaded input data to zarr if configured 

810 if "input_conversion_dir" in step_plan: 

811 input_conversion_dir = step_plan["input_conversion_dir"] 

812 input_conversion_backend = step_plan["input_conversion_backend"] 

813 

814 logger.info(f"Converting input data to zarr: {input_conversion_dir}") 

815 

816 # Get memory paths from input data (already loaded) 

817 memory_paths = get_paths_for_well(step_input_dir, Backend.MEMORY.value) 

818 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

819 

820 # Generate conversion paths (input_dir → conversion_dir) 

821 conversion_paths = _generate_materialized_paths(memory_paths, Path(step_input_dir), Path(input_conversion_dir)) 

822 

823 # Ensure conversion directory exists 

824 filemanager.ensure_directory(input_conversion_dir, input_conversion_backend) 

825 

826 # Save using existing materialized data infrastructure 

827 _save_materialized_data(filemanager, memory_data, conversion_paths, input_conversion_backend, step_plan, context, well_id) 

828 

829 logger.info(f"🔬 Converted {len(conversion_paths)} input files to {input_conversion_dir}") 

830 

831 # 🔍 VRAM TRACKING: Log memory at step start 

832 try: 

833 from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage 

834 log_gpu_memory_usage(f"step {step_name} start") 

835 except ImportError: 

836 pass # GPU cleanup not available 

837 

838 

839 

840 

841 except Exception: 

842 pass 

843 

844 logger.info(f"🔥 STEP: Starting processing for '{step_name}' well {well_id} (group_by={group_by.name if group_by else None}, variable_components={[vc.name for vc in variable_components] if variable_components else []})") 

845 

846 if well_id not in patterns_by_well: 846 ↛ 847 (line 846 didn't jump to line 847 because the condition on line 846 was never true)

847 raise ValueError( 

848 f"No patterns detected for well '{well_id}' in step '{step_name}' (ID: {step_id}). " 

849 f"This indicates either: (1) no image files found for this well, " 

850 f"(2) image files don't match the expected naming pattern, or " 

851 f"(3) pattern detection failed. Check input directory: {step_input_dir}" 

852 ) 

853 

854 if isinstance(patterns_by_well[well_id], dict): 854 ↛ 856 (line 854 didn't jump to line 856 because the condition on line 854 was never true)

855 # Grouped patterns (when group_by is set) 

856 for comp_val, pattern_list in patterns_by_well[well_id].items(): 

857 logger.debug(f"🔥 STEP: Component '{comp_val}' has {len(pattern_list)} patterns: {pattern_list}") 

858 else: 

859 # Ungrouped patterns (when group_by is None) 

860 logger.debug(f"🔥 STEP: Found {len(patterns_by_well[well_id])} ungrouped patterns: {patterns_by_well[well_id]}") 

861 

862 if func_from_plan is None: 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true

863 raise ValueError(f"Step plan missing 'func' for step: {step_plan.get('step_name', 'Unknown')} (ID: {step_id})") 

864 

865 grouped_patterns, comp_to_funcs, comp_to_base_args = prepare_patterns_and_functions( 

866 patterns_by_well[well_id], func_from_plan, component=group_by.value if group_by else None 

867 ) 

868 

869 logger.info(f"🔍 DICT_PATTERN: grouped_patterns keys: {list(grouped_patterns.keys())}") 

870 logger.info(f"🔍 DICT_PATTERN: comp_to_funcs keys: {list(comp_to_funcs.keys())}") 

871 logger.info(f"🔍 DICT_PATTERN: func_from_plan type: {type(func_from_plan)}") 

872 if isinstance(func_from_plan, dict): 872 ↛ 873 (line 872 didn't jump to line 873 because the condition on line 872 was never true)

873 logger.info(f"🔍 DICT_PATTERN: func_from_plan keys: {list(func_from_plan.keys())}") 

874 

875 for comp_val, current_pattern_list in grouped_patterns.items(): 

876 logger.info(f"🔍 DICT_PATTERN: Processing component '{comp_val}' with {len(current_pattern_list)} patterns") 

877 exec_func_or_chain = comp_to_funcs[comp_val] 

878 base_kwargs = comp_to_base_args[comp_val] 

879 logger.info(f"🔍 DICT_PATTERN: Component '{comp_val}' exec_func_or_chain: {exec_func_or_chain}") 

880 for pattern_item in current_pattern_list: 

881 _process_single_pattern_group( 

882 context, pattern_item, exec_func_or_chain, base_kwargs, 

883 step_input_dir, step_output_dir, well_id, comp_val, 

884 read_backend, write_backend, input_mem_type, output_mem_type, 

885 device_id, same_dir, 

886 special_inputs, special_outputs, # Pass the maps from step_plan 

887 step_plan["zarr_config"], 

888 variable_components, step_id # Pass step_id for funcplan lookup 

889 ) 

890 logger.info(f"🔥 STEP: Completed processing for '{step_name}' well {well_id}.") 

891 

892 # 📄 MATERIALIZATION WRITE: Only if not writing to memory 

893 if write_backend != Backend.MEMORY.value: 

894 memory_paths = get_paths_for_well(step_output_dir, Backend.MEMORY.value) 

895 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

896 # Calculate zarr dimensions (ignored by non-zarr backends) 

897 n_channels, n_z, n_fields = _calculate_zarr_dimensions(memory_paths, context.microscope_handler) 

898 row, col = context.microscope_handler.parser.extract_row_column(well_id) 

899 filemanager.ensure_directory(step_output_dir, write_backend) 

900 filemanager.save_batch(memory_data, memory_paths, write_backend, 

901 chunk_name=well_id, zarr_config=step_plan["zarr_config"], 

902 n_channels=n_channels, n_z=n_z, n_fields=n_fields, 

903 row=row, col=col) 

904 

905 # 📄 PER-STEP MATERIALIZATION: Additional materialized output if configured 

906 if "materialized_output_dir" in step_plan: 

907 materialized_output_dir = step_plan["materialized_output_dir"] 

908 materialized_backend = step_plan["materialized_backend"] 

909 

910 memory_paths = get_paths_for_well(step_output_dir, Backend.MEMORY.value) 

911 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value) 

912 materialized_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir)) 

913 

914 filemanager.ensure_directory(materialized_output_dir, materialized_backend) 

915 _save_materialized_data(filemanager, memory_data, materialized_paths, materialized_backend, step_plan, context, well_id) 

916 

917 logger.info(f"🔬 Materialized {len(materialized_paths)} files to {materialized_output_dir}") 

918 

919 logger.info(f"FunctionStep {step_id} ({step_name}) completed for well {well_id}.") 

920 

921 # 📄 OPENHCS METADATA: Create metadata file automatically after step completion 

922 # Track which backend was actually used for writing files 

923 actual_write_backend = step_plan['write_backend'] 

924 

925 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator 

926 metadata_generator = OpenHCSMetadataGenerator(context.filemanager) 

927 

928 # Main step output metadata 

929 is_pipeline_output = (actual_write_backend != Backend.MEMORY.value) 

930 metadata_generator.create_metadata( 

931 context, 

932 step_plan['output_dir'], 

933 actual_write_backend, 

934 is_main=is_pipeline_output, 

935 plate_root=step_plan['output_plate_root'], 

936 sub_dir=step_plan['sub_dir'] 

937 ) 

938 

939 # 📄 MATERIALIZED METADATA: Create metadata for materialized directory if it exists 

940 if 'materialized_output_dir' in step_plan: 

941 materialized_backend = step_plan.get('materialized_backend', actual_write_backend) 

942 metadata_generator.create_metadata( 

943 context, 

944 step_plan['materialized_output_dir'], 

945 materialized_backend, 

946 is_main=False, 

947 plate_root=step_plan['materialized_plate_root'], 

948 sub_dir=step_plan['materialized_sub_dir'] 

949 ) 

950 

951 # SPECIAL DATA MATERIALIZATION 

952 special_outputs = step_plan.get('special_outputs', {}) 

953 logger.debug(f"🔍 MATERIALIZATION: special_outputs from step_plan: {special_outputs}") 

954 logger.debug(f"🔍 MATERIALIZATION: special_outputs is empty? {not special_outputs}") 

955 if special_outputs: 

956 logger.info(f"🔬 MATERIALIZATION: Starting materialization for {len(special_outputs)} special outputs") 

957 self._materialize_special_outputs(filemanager, step_plan, special_outputs) 

958 logger.info(f"🔬 MATERIALIZATION: Completed materialization") 

959 else: 

960 logger.debug(f"🔍 MATERIALIZATION: No special outputs to materialize") 

961 

962 

963 

964 except Exception as e: 

965 import traceback 

966 full_traceback = traceback.format_exc() 

967 logger.error(f"Error in FunctionStep {step_id} ({step_name}): {e}", exc_info=True) 

968 logger.error(f"Full traceback for FunctionStep {step_id} ({step_name}):\n{full_traceback}") 

969 

970 

971 

972 raise 

973 

974 

975 

976 def _materialize_special_outputs(self, filemanager, step_plan, special_outputs): 

977 """Load special data from memory and call materialization functions.""" 

978 logger.debug(f"🔍 MATERIALIZE_METHOD: Processing {len(special_outputs)} special outputs") 

979 

980 for output_key, output_info in special_outputs.items(): 

981 logger.debug(f"🔍 MATERIALIZE_METHOD: Processing output_key: {output_key}") 

982 logger.debug(f"🔍 MATERIALIZE_METHOD: output_info: {output_info}") 

983 

984 mat_func = output_info.get('materialization_function') 

985 logger.debug(f"🔍 MATERIALIZE_METHOD: materialization_function: {mat_func}") 

986 

987 if mat_func: 987 ↛ 988 (line 987 didn't jump to line 988 because the condition on line 987 was never true)

988 path = output_info['path'] 

989 logger.info(f"🔬 MATERIALIZING: {output_key} from {path}") 

990 

991 try: 

992 filemanager.ensure_directory(Path(path).parent, Backend.MEMORY.value) 

993 special_data = filemanager.load(path, Backend.MEMORY.value) 

994 logger.debug(f"🔍 MATERIALIZE_METHOD: Loaded special data type: {type(special_data)}") 

995 

996 result_path = mat_func(special_data, path, filemanager) 

997 logger.info(f"🔬 MATERIALIZED: {output_key}{result_path}") 

998 

999 except Exception as e: 

1000 logger.error(f"🔬 MATERIALIZATION ERROR: Failed to materialize {output_key}: {e}") 

1001 raise 

1002 else: 

1003 logger.warning(f"🔬 MATERIALIZATION: No materialization function for {output_key}, skipping") 

1004 

1005 

1006