Coverage for openhcs/core/steps/function_step.py: 6.9%

561 statements  

coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2FunctionStep implementation for pattern-based processing. 

3 

4This module contains the FunctionStep class. During execution, FunctionStep instances 

5are stateless regarding their configuration. All operational parameters, including 

6the function(s) to execute, special input/output keys, their VFS paths, and memory types, 

7are retrieved from this step's entry in `context.step_plans`. 

8""" 

9 

10import logging 

11import os 

12import time 

13import gc 

14import json 

15import shutil 

16from functools import partial 

17from pathlib import Path 

18from typing import Any, Callable, Dict, List, Optional, Tuple, Union, OrderedDict as TypingOrderedDict, TYPE_CHECKING 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true

21 from openhcs.core.config import PathPlanningConfig 

22 

23from openhcs.core.config import StreamingConfig 

24 

25from openhcs.constants.constants import (DEFAULT_IMAGE_EXTENSION, 

26 DEFAULT_IMAGE_EXTENSIONS, 

27 DEFAULT_SITE_PADDING, Backend, 

28 MemoryType, VariableComponents, GroupBy) 

29from openhcs.constants.input_source import InputSource 

30from openhcs.core.context.processing_context import ProcessingContext 

31from openhcs.core.steps.abstract import AbstractStep 

32from openhcs.formats.func_arg_prep import prepare_patterns_and_functions 

33from openhcs.core.memory.stack_utils import stack_slices, unstack_slices 

34# OpenHCS imports moved to local imports to avoid circular dependencies 

35 

36from openhcs.core.components.validation import GenericValidator 

37 

38logger = logging.getLogger(__name__) 
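# Illustrative sketch (not part of the runtime module): the rough shape of a single
# `context.step_plans[step_index]` entry as read by this file. Keys and example values
# are assumptions inferred from the accesses below; the real planner may add more keys.
#
#   step_plan_example = {
#       "step_name": "example_step",
#       "axis_id": "A01",
#       "input_dir": "/vfs/plate/step_0",
#       "output_dir": "/vfs/plate/step_1",
#       "read_backend": Backend.DISK.value,
#       "write_backend": Backend.MEMORY.value,
#       "input_memory_type": "numpy",            # illustrative memory-type name
#       "output_memory_type": "numpy",
#       "variable_components": [VariableComponents.SITE],
#       "group_by": None,                        # or a GroupBy enum member
#       "func": None,                            # callable, (callable, kwargs), list, or dict
#       "special_inputs": {},                    # {'arg_name': {'path': ...}}
#       "special_outputs": {},                   # ordered {'output_key': {'path': ...}}
#       "zarr_config": None,
#       "gpu_id": 0,                             # only read when GPU memory types are involved
#   }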


def _generate_materialized_paths(memory_paths: List[str], step_output_dir: Path, materialized_output_dir: Path) -> List[str]:
    """Generate materialized file paths by replacing the step output directory."""
    materialized_paths = []
    for memory_path in memory_paths:
        relative_path = Path(memory_path).relative_to(step_output_dir)
        materialized_path = materialized_output_dir / relative_path
        materialized_paths.append(str(materialized_path))
    return materialized_paths
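# Minimal usage sketch (hypothetical paths): re-rooting memory-backend paths under a
# materialized output directory while preserving the per-file relative layout.
#
#   _generate_materialized_paths(
#       ["/vfs/plate/step_1/A01_s1_c1.tif"],
#       Path("/vfs/plate/step_1"),
#       Path("/mnt/results/step_1"),
#   )
#   # -> ["/mnt/results/step_1/A01_s1_c1.tif"]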


def _save_materialized_data(filemanager, memory_data: List, materialized_paths: List[str],
                            materialized_backend: str, step_plan: Dict, context, axis_id: str) -> None:
    """Save data to the materialized location using the appropriate backend."""
    if materialized_backend == Backend.ZARR.value:
        n_channels, n_z, n_fields = _calculate_zarr_dimensions(materialized_paths, context.microscope_handler)
        row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
        filemanager.save_batch(memory_data, materialized_paths, materialized_backend,
                               chunk_name=axis_id, zarr_config=step_plan.get("zarr_config"),
                               n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                               row=row, col=col)
    else:
        filemanager.save_batch(memory_data, materialized_paths, materialized_backend)

def get_all_image_paths(input_dir, backend, axis_id, filemanager, microscope_handler):
    """
    Get all image file paths for a specific well from a directory.

    Args:
        input_dir: Directory to search for images
        backend: Backend to use for file listing
        axis_id: Well identifier to filter files
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        List of full file paths for the well
    """
    # List all image files in the directory
    all_image_files = filemanager.list_image_files(str(input_dir), backend)

    # Filter by well using the filename parser rather than naive string matching.
    # The key comes from the dynamic multiprocessing axis instead of a hardcoded 'well'.
    from openhcs.constants import MULTIPROCESSING_AXIS
    axis_key = MULTIPROCESSING_AXIS.value
    parser = microscope_handler.parser

    axis_files = []
    for f in all_image_files:
        filename = os.path.basename(str(f))
        metadata = parser.parse_filename(filename)
        if metadata and metadata.get(axis_key) == axis_id:
            axis_files.append(str(f))

    # Remove duplicates and sort
    sorted_files = sorted(set(axis_files))

    # Prepare full file paths
    input_dir_path = Path(input_dir)
    full_file_paths = [str(input_dir_path / Path(f).name) for f in sorted_files]

    logger.debug(f"Found {len(all_image_files)} total files, {len(full_file_paths)} for axis {axis_id}")

    return full_file_paths


def create_image_path_getter(axis_id, filemanager, microscope_handler):
    """
    Create a specialized image path getter function using runtime context.

    Args:
        axis_id: Well identifier
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        Function that takes (input_dir, backend) and returns image paths for the well
    """
    def get_paths_for_axis(input_dir, backend):
        return get_all_image_paths(
            input_dir=input_dir,
            axis_id=axis_id,
            backend=backend,
            filemanager=filemanager,
            microscope_handler=microscope_handler
        )
    return get_paths_for_axis
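# Minimal usage sketch (hypothetical well and service objects): the returned closure
# fixes the well and services, so call sites only vary the directory and backend.
#
#   get_paths = create_image_path_getter("A01", filemanager, microscope_handler)
#   disk_paths = get_paths("/plate/step_0", Backend.DISK.value)
#   mem_paths = get_paths("/plate/step_1", Backend.MEMORY.value)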


# Environment variable to disable universal GPU defragmentation
DISABLE_GPU_DEFRAG = os.getenv('OPENHCS_DISABLE_GPU_DEFRAG', 'false').lower() == 'true'
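# Usage note (illustrative, assuming the flag is honored by the GPU cleanup code
# elsewhere in OpenHCS): set the variable before launching a run, e.g.
#
#   OPENHCS_DISABLE_GPU_DEFRAG=true python -m openhcs   # hypothetical entry point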


def _bulk_preload_step_images(
    step_input_dir: Path,
    step_output_dir: Path,
    axis_id: str,
    read_backend: str,
    patterns_by_well: Dict[str, Any],
    filemanager: 'FileManager',
    microscope_handler: 'MicroscopeHandler',
    zarr_config: Optional[Dict[str, Any]] = None
) -> None:
    """
    Pre-load all images for this step from the source backend into the memory backend.

    This reduces I/O overhead by doing a single bulk read operation
    instead of loading images per pattern group.

    Note: External conditional logic ensures this is only called for non-memory backends.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK PRELOAD: Loading images from {read_backend} to memory for well {axis_id}")

    # Create a specialized path getter for this well and collect all of its image paths
    get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
    full_file_paths = get_paths_for_axis(step_input_dir, read_backend)

    if not full_file_paths:
        raise RuntimeError(f"🔄 BULK PRELOAD: No files found for well {axis_id} in {step_input_dir} with backend {read_backend}")

    # Load from the source backend, passing zarr_config only to the zarr backend
    if read_backend == Backend.ZARR.value:
        raw_images = filemanager.load_batch(full_file_paths, read_backend, zarr_config=zarr_config)
    else:
        raw_images = filemanager.load_batch(full_file_paths, read_backend)

    # Ensure the directory exists in the memory backend before saving
    filemanager.ensure_directory(str(step_input_dir), Backend.MEMORY.value)

    # Delete any stale copies already present in the memory backend, then save the
    # whole batch to memory under the same (input) paths in one operation.
    for file_path in full_file_paths:
        if filemanager.exists(file_path, Backend.MEMORY.value):
            filemanager.delete(file_path, Backend.MEMORY.value)
            logger.debug(f"🔄 BULK PRELOAD: Deleted existing file {file_path} before bulk preload")

    filemanager.save_batch(raw_images, full_file_paths, Backend.MEMORY.value)

    # Clean up source references - keep only memory backend references
    del raw_images

    load_time = time.time() - start_time
    logger.debug(f"🔄 BULK PRELOAD: Completed in {load_time:.2f}s - {len(full_file_paths)} images now in memory")


def _bulk_writeout_step_images(
    step_output_dir: Path,
    write_backend: str,
    axis_id: str,
    zarr_config: Optional[Dict[str, Any]],
    filemanager: 'FileManager',
    microscope_handler: Optional[Any] = None
) -> None:
    """
    Write all processed images from memory to the final backend (disk/zarr).

    This reduces I/O overhead by doing a single bulk write operation
    instead of writing images per pattern group.

    Note: External conditional logic ensures this is only called for non-memory backends.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK WRITEOUT: Writing images from memory to {write_backend} for well {axis_id}")

    # Create a specialized path getter and get the memory paths for this well
    get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
    memory_file_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)

    if not memory_file_paths:
        raise RuntimeError(f"🔄 BULK WRITEOUT: No image files found for well {axis_id} in memory directory {step_output_dir}")

    # The memory-backend paths are reused directly as the target paths
    file_paths = memory_file_paths
    logger.debug(f"🔄 BULK WRITEOUT: Found {len(file_paths)} image files in memory to write")

    # Load all data from the memory backend
    memory_data = filemanager.load_batch(file_paths, Backend.MEMORY.value)

    # Ensure the output directory exists before the bulk write
    filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)

    # Bulk write to the target backend, passing zarr_config only to the zarr backend
    if write_backend == Backend.ZARR.value:
        if microscope_handler is not None:
            # Calculate zarr dimensions from the file paths and parse the well into row/column
            n_channels, n_z, n_fields = _calculate_zarr_dimensions(file_paths, microscope_handler)
            row, col = microscope_handler.parser.extract_component_coordinates(axis_id)
            filemanager.save_batch(memory_data, file_paths, write_backend,
                                   chunk_name=axis_id, zarr_config=zarr_config,
                                   n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                   row=row, col=col)
        else:
            # Fallback without dimensions if microscope_handler is not available
            filemanager.save_batch(memory_data, file_paths, write_backend, chunk_name=axis_id, zarr_config=zarr_config)
    else:
        filemanager.save_batch(memory_data, file_paths, write_backend)

    write_time = time.time() - start_time
    logger.debug(f"🔄 BULK WRITEOUT: Completed in {write_time:.2f}s - {len(memory_data)} images written to {write_backend}")


def _calculate_zarr_dimensions(file_paths: List[Union[str, Path]], microscope_handler) -> Tuple[int, int, int]:
    """
    Calculate zarr dimensions (n_channels, n_z, n_fields) from file paths using the microscope parser.

    Args:
        file_paths: List of file paths to analyze
        microscope_handler: Microscope handler with filename parser

    Returns:
        Tuple of (n_channels, n_z, n_fields)
    """
    parsed_files = []
    for file_path in file_paths:
        filename = Path(file_path).name
        metadata = microscope_handler.parser.parse_filename(filename)
        parsed_files.append(metadata)

    # Count unique values for each dimension from the actual files
    n_channels = len(set(f.get('channel') for f in parsed_files if f.get('channel') is not None))
    n_z = len(set(f.get('z_index') for f in parsed_files if f.get('z_index') is not None))
    n_fields = len(set(f.get('site') for f in parsed_files if f.get('site') is not None))

    # Ensure at least 1 for each dimension (handles cases where metadata is missing)
    n_channels = max(1, n_channels)
    n_z = max(1, n_z)
    n_fields = max(1, n_fields)

    return n_channels, n_z, n_fields
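# Minimal sketch (hypothetical filenames and parser output): six files spanning three
# channels and two sites, with no z metadata, would yield (3, 1, 2) because a missing
# dimension is clamped to at least 1. Assumes the parser reports 'channel', 'z_index',
# and 'site' keys.
#
#   _calculate_zarr_dimensions(
#       ["A01_s1_c1.tif", "A01_s1_c2.tif", "A01_s1_c3.tif",
#        "A01_s2_c1.tif", "A01_s2_c2.tif", "A01_s2_c3.tif"],
#       microscope_handler,
#   )
#   # -> (3, 1, 2)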


def _is_3d(array: Any) -> bool:
    """Check if an array is 3D."""
    return hasattr(array, 'ndim') and array.ndim == 3
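# Illustrative check (assumes a NumPy-like array exposing `ndim`):
#   _is_3d(np.zeros((5, 512, 512)))  # True
#   _is_3d(np.zeros((512, 512)))     # False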


def _execute_function_core(
    func_callable: Callable,
    main_data_arg: Any,
    base_kwargs: Dict[str, Any],
    context: 'ProcessingContext',
    special_inputs_plan: Dict[str, str],  # {'arg_name_for_func': 'special_path_value'}
    special_outputs_plan: TypingOrderedDict[str, str],  # {'output_key': 'special_path_value'}, order matters
    axis_id: str,
    input_memory_type: str,
    device_id: int
) -> Any:  # Returns the main processed data stack
    """
    Execute a single callable, handling its special I/O.

    - Loads special inputs from VFS paths in `special_inputs_plan`.
    - Calls `func_callable(main_data_arg, **all_kwargs)`.
    - If `special_outputs_plan` is non-empty, expects func to return (main_out, sp_val1, sp_val2, ...).
    - Saves special outputs positionally to VFS paths in `special_outputs_plan`.
    - Returns the main processed data stack.
    """
    final_kwargs = base_kwargs.copy()

    if special_inputs_plan:
        logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: special_inputs_plan = {special_inputs_plan}")
        for arg_name, path_info in special_inputs_plan.items():
            logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Processing arg_name='{arg_name}', path_info={path_info} (type: {type(path_info)})")

            # Extract the path string from the path info dictionary
            # Current format: {"path": "/path/to/file.pkl", "source_step_id": "step_123"}
            if isinstance(path_info, dict) and 'path' in path_info:
                special_path_value = path_info['path']
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Extracted path from dict: '{special_path_value}' (type: {type(special_path_value)})")
            else:
                special_path_value = path_info  # Fallback if it's already a string
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Using path_info directly: '{special_path_value}' (type: {type(special_path_value)})")

            logger.info(f"Loading special input '{arg_name}' from path '{special_path_value}' (memory backend)")
            try:
                final_kwargs[arg_name] = context.filemanager.load(special_path_value, Backend.MEMORY.value)
            except Exception as e:
                logger.error(f"Failed to load special input '{arg_name}' from '{special_path_value}': {e}", exc_info=True)
                raise

    # Auto-inject context if the function signature expects it
    import inspect
    sig = inspect.signature(func_callable)
    if 'context' in sig.parameters:
        final_kwargs['context'] = context

    # 🔍 DEBUG: Log input dimensions
    input_shape = getattr(main_data_arg, 'shape', 'no shape attr')
    input_type = type(main_data_arg).__name__
    logger.debug(f"🔍 FUNCTION INPUT: {func_callable.__name__} - shape: {input_shape}, type: {input_type}")

    # ⚡ INFO: Terse function execution log for user feedback
    logger.info(f"⚡ Executing: {func_callable.__name__}")

    # 🔍 DEBUG: Log function attributes before execution
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - special_outputs: {getattr(func_callable, '__special_outputs__', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - input_memory_type: {getattr(func_callable, 'input_memory_type', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - output_memory_type: {getattr(func_callable, 'output_memory_type', 'None')}")

    raw_function_output = func_callable(main_data_arg, **final_kwargs)

    # 🔍 DEBUG: Log output dimensions and type details
    output_shape = getattr(raw_function_output, 'shape', 'no shape attr')
    output_type = type(raw_function_output).__name__
    logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - shape: {output_shape}, type: {output_type}")

    # 🔍 DEBUG: If the result is a tuple, log details about each element
    if isinstance(raw_function_output, tuple):
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - tuple length: {len(raw_function_output)}")
        for i, element in enumerate(raw_function_output):
            elem_shape = getattr(element, 'shape', 'no shape attr')
            elem_type = type(element).__name__
            logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - element[{i}]: shape={elem_shape}, type={elem_type}")
    else:
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - not a tuple, single return value")

    main_output_data = raw_function_output

    # 🔍 DEBUG: Log special output plan status
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: {special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Is empty? {not special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Length: {len(special_outputs_plan) if special_outputs_plan else 0}")

    # Only process special outputs if there are any (avoids spamming empty-dict logs)
    if special_outputs_plan:
        logger.debug(f"🔍 SPECIAL OUTPUT: {special_outputs_plan}")
        num_special_outputs = len(special_outputs_plan)
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Expected {num_special_outputs} special outputs")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned type: {type(raw_function_output)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned tuple length: {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'not tuple'}")

        if not isinstance(raw_function_output, tuple) or len(raw_function_output) != (1 + num_special_outputs):
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Function '{getattr(func_callable, '__name__', 'unknown')}' special output mismatch")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Expected tuple of {1 + num_special_outputs} values")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Got {type(raw_function_output)} with {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'N/A'} values")
            raise ValueError(
                f"Function '{getattr(func_callable, '__name__', 'unknown')}' was expected to return a tuple of "
                f"{1 + num_special_outputs} values (main_output + {num_special_outputs} special) "
                f"based on 'special_outputs' in the step plan, but returned {len(raw_function_output) if isinstance(raw_function_output, tuple) else type(raw_function_output)} values."
            )
        main_output_data = raw_function_output[0]
        returned_special_values_tuple = raw_function_output[1:]

        # 🔍 DEBUG: Log what was extracted
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data type: {type(main_output_data)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data shape: {getattr(main_output_data, 'shape', 'no shape')}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted {len(returned_special_values_tuple)} special values")

        # Iterate through special_outputs_plan (which must be ordered by the compiler)
        # and match it with the positionally returned special values.
        for i, (output_key, vfs_path_info) in enumerate(special_outputs_plan.items()):
            logger.info(f"Saving special output '{output_key}' to VFS path '{vfs_path_info}' (memory backend)")
            if i < len(returned_special_values_tuple):
                value_to_save = returned_special_values_tuple[i]
                # Extract the path string from the path info dictionary
                # Current format: {"path": "/path/to/file.pkl"}
                if isinstance(vfs_path_info, dict) and 'path' in vfs_path_info:
                    vfs_path = vfs_path_info['path']
                else:
                    vfs_path = vfs_path_info  # Fallback if it's already a string
                # Note: per-well filename prefixing for the memory backend was considered
                # here but is not applied; axis_id is currently unused in this branch.

                logger.info(f"🔍 SPECIAL_SAVE: Saving '{output_key}' to '{vfs_path}' (memory backend)")
                # Ensure the directory exists for the memory backend
                parent_dir = str(Path(vfs_path).parent)
                context.filemanager.ensure_directory(parent_dir, Backend.MEMORY.value)
                context.filemanager.save(value_to_save, vfs_path, Backend.MEMORY.value)
                logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory")
            else:
                # This indicates a mismatch that should ideally be caught by schema/validation.
                # Or, if partial returns are allowed, this might be a warning. For now, error.
                logger.error(f"Mismatch: {num_special_outputs} special outputs planned, but fewer values returned by function for key '{output_key}'.")
                raise ValueError(f"Function did not return enough values for all planned special outputs. Missing value for '{output_key}'.")

    return main_output_data
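# Illustrative sketch (hypothetical function): with two planned special outputs, the
# callable is expected to return a 3-tuple ordered as (main_stack, first_special,
# second_special), matching the key order of `special_outputs_plan`.
#
#   def segment_and_count(stack, **kwargs):
#       labels = stack > 0             # placeholder "segmentation"
#       counts = {"objects": 42}       # first special output
#       summary = "ok"                 # second special output
#       return labels, counts, summary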


def _execute_chain_core(
    initial_data_stack: Any,
    func_chain: List[Union[Callable, Tuple[Callable, Dict]]],
    context: 'ProcessingContext',
    step_special_inputs_plan: Dict[str, str],
    step_special_outputs_plan: TypingOrderedDict[str, str],
    axis_id: str,
    device_id: int,
    input_memory_type: str,
    step_index: int,  # Used for funcplan lookup
    dict_key: str = "default"  # Used for funcplan lookup
) -> Any:
    current_stack = initial_data_stack
    current_memory_type = input_memory_type  # Track the memory type from the frozen context

    for i, func_item in enumerate(func_chain):
        actual_callable: Callable
        base_kwargs_for_item: Dict[str, Any] = {}
        is_last_in_chain = (i == len(func_chain) - 1)

        if isinstance(func_item, tuple) and len(func_item) == 2 and callable(func_item[0]):
            actual_callable, base_kwargs_for_item = func_item
        elif callable(func_item):
            actual_callable = func_item
        else:
            raise TypeError(f"Invalid item in function chain: {func_item}.")

        # Convert to the function's input memory type (no-op if already the same)
        from openhcs.core.memory.converters import convert_memory
        current_stack = convert_memory(
            data=current_stack,
            source_type=current_memory_type,
            target_type=actual_callable.input_memory_type,
            gpu_id=device_id,
            allow_cpu_roundtrip=False
        )

        # Use the funcplan to determine which outputs this function should save
        funcplan = context.step_plans[step_index].get("funcplan", {})
        func_name = getattr(actual_callable, '__name__', 'unknown')

        # Construct the execution key: function_name_dict_key_chain_position
        execution_key = f"{func_name}_{dict_key}_{i}"

        logger.info(f"🔍 FUNCPLAN DEBUG: execution_key = {execution_key}")
        logger.info(f"🔍 FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
        logger.info(f"🔍 FUNCPLAN DEBUG: step_special_outputs_plan = {step_special_outputs_plan}")

        if execution_key in funcplan:
            # Get the outputs this specific function should save
            outputs_to_save = funcplan[execution_key]
            outputs_plan_for_this_call = {
                key: step_special_outputs_plan[key]
                for key in outputs_to_save
                if key in step_special_outputs_plan
            }
            logger.info(f"🔍 FUNCPLAN: {execution_key} -> {outputs_to_save}")
            logger.info(f"🔍 FUNCPLAN: outputs_plan_for_this_call = {outputs_plan_for_this_call}")
        else:
            # Fallback: no funcplan entry, save nothing
            outputs_plan_for_this_call = {}
            logger.info(f"🔍 FUNCPLAN: No entry for {execution_key}, saving nothing")

        current_stack = _execute_function_core(
            func_callable=actual_callable,
            main_data_arg=current_stack,
            base_kwargs=base_kwargs_for_item,
            context=context,
            special_inputs_plan=step_special_inputs_plan,
            special_outputs_plan=outputs_plan_for_this_call,
            axis_id=axis_id,
            device_id=device_id,
            input_memory_type=input_memory_type,
        )

        # Update the current memory type from the frozen context
        current_memory_type = actual_callable.output_memory_type

    return current_stack
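# Illustrative sketch (hypothetical callables): a chain may mix bare callables with
# (callable, kwargs) tuples; funcplan execution keys follow the pattern
# "<func_name>_<dict_key>_<chain_position>". Memory-type names here are illustrative.
#
#   func_chain = [
#       normalize,                       # position 0 -> key "normalize_default_0"
#       (gaussian_blur, {"sigma": 2}),   # position 1 -> key "gaussian_blur_default_1"
#   ]
#   result = _execute_chain_core(stack, func_chain, context, {}, {}, "A01",
#                                device_id=0, input_memory_type="numpy", step_index=3)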


def _process_single_pattern_group(
    context: 'ProcessingContext',
    pattern_group_info: Any,
    executable_func_or_chain: Any,
    base_func_args: Dict[str, Any],
    step_input_dir: Path,
    step_output_dir: Path,
    axis_id: str,
    component_value: str,
    read_backend: str,
    write_backend: str,
    input_memory_type_from_plan: str,  # Explicitly from plan
    output_memory_type_from_plan: str,  # Explicitly from plan
    device_id: Optional[int],
    same_directory: bool,
    special_inputs_map: Dict[str, str],
    special_outputs_map: TypingOrderedDict[str, str],
    zarr_config: Optional[Dict[str, Any]],
    variable_components: Optional[List[str]] = None,
    step_index: Optional[int] = None  # Used for funcplan lookup
) -> None:
    start_time = time.time()
    pattern_repr = str(pattern_group_info)[:100]
    logger.debug(f"🔥 PATTERN: Processing {pattern_repr} for well {axis_id}")

    try:
        if not context.microscope_handler:
            raise RuntimeError("MicroscopeHandler not available in context.")

        matching_files = context.microscope_handler.path_list_from_pattern(
            str(step_input_dir), pattern_group_info, context.filemanager, Backend.MEMORY.value,
            [vc.value for vc in variable_components] if variable_components else None
        )

        if not matching_files:
            raise ValueError(
                f"No matching files found for pattern group {pattern_repr} in {step_input_dir}. "
                f"This indicates either: (1) no image files exist in the directory, "
                f"(2) files don't match the pattern, or (3) pattern parsing failed. "
                f"Check that input files exist and match the expected naming convention."
            )

        logger.debug(f"🔥 PATTERN: Found {len(matching_files)} files: {[Path(f).name for f in matching_files]}")

        # Sort files to ensure consistent ordering (especially important for z-stacks)
        matching_files.sort()
        logger.debug(f"🔥 PATTERN: Sorted files: {[Path(f).name for f in matching_files]}")

        full_file_paths = [str(step_input_dir / f) for f in matching_files]
        raw_slices = context.filemanager.load_batch(full_file_paths, Backend.MEMORY.value)

        if not raw_slices:
            raise ValueError(
                f"No valid images loaded for pattern group {pattern_repr} in {step_input_dir}. "
                f"Found {len(matching_files)} matching files but failed to load any valid images. "
                f"This indicates corrupted image files, unsupported formats, or I/O errors. "
                f"Check file integrity and format compatibility."
            )

        # 🔍 DEBUG: Log the stacking operation
        logger.debug(f"🔍 STACKING: {len(raw_slices)} slices → memory_type: {input_memory_type_from_plan}")
        if raw_slices:
            slice_shapes = [getattr(s, 'shape', 'no shape') for s in raw_slices[:3]]  # First 3 shapes
            logger.debug(f"🔍 STACKING: Sample slice shapes: {slice_shapes}")

        main_data_stack = stack_slices(
            slices=raw_slices, memory_type=input_memory_type_from_plan, gpu_id=device_id
        )

        # 🔍 DEBUG: Log the stacked result
        stack_shape = getattr(main_data_stack, 'shape', 'no shape')
        stack_type = type(main_data_stack).__name__
        logger.debug(f"🔍 STACKED RESULT: shape: {stack_shape}, type: {stack_type}")

        logger.info(f"🔍 special_outputs_map: {special_outputs_map}")

        final_base_kwargs = base_func_args.copy()

        # Get the step function from the step plan
        step_func = context.step_plans[step_index]["func"]

        if isinstance(step_func, dict):
            dict_key_for_funcplan = component_value  # Use the actual dict key for dict patterns
        else:
            dict_key_for_funcplan = "default"  # Use the default for list/single patterns

        if isinstance(executable_func_or_chain, list):
            processed_stack = _execute_chain_core(
                main_data_stack, executable_func_or_chain, context,
                special_inputs_map, special_outputs_map, axis_id,
                device_id, input_memory_type_from_plan, step_index, dict_key_for_funcplan
            )
        elif callable(executable_func_or_chain):
            # For single functions, apply funcplan filtering just like chain execution
            funcplan = context.step_plans[step_index].get("funcplan", {})
            func_name = getattr(executable_func_or_chain, '__name__', 'unknown')
            execution_key = f"{func_name}_{dict_key_for_funcplan}_0"  # Position 0 for single functions

            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: execution_key = {execution_key}")
            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: special_outputs_map = {special_outputs_map}")

            if execution_key in funcplan:
                # Get the outputs this specific function should save
                outputs_to_save = funcplan[execution_key]
                filtered_special_outputs_map = {
                    key: special_outputs_map[key]
                    for key in outputs_to_save
                    if key in special_outputs_map
                }
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: {execution_key} -> {outputs_to_save}")
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: filtered_special_outputs_map = {filtered_special_outputs_map}")
            else:
                # Fallback: no funcplan entry, save nothing
                filtered_special_outputs_map = {}
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: No entry for {execution_key}, saving nothing")

            processed_stack = _execute_function_core(
                executable_func_or_chain, main_data_stack, final_base_kwargs, context,
                special_inputs_map, filtered_special_outputs_map, axis_id, input_memory_type_from_plan, device_id
            )
        else:
            raise TypeError(f"Invalid executable_func_or_chain: {type(executable_func_or_chain)}")

        # 🔍 DEBUG: Check what shape the function actually returned
        input_shape = getattr(main_data_stack, 'shape', 'unknown')
        output_shape = getattr(processed_stack, 'shape', 'unknown')
        processed_type = type(processed_stack).__name__
        logger.debug(f"🔍 PROCESSING RESULT: input: {input_shape} → output: {output_shape}, type: {processed_type}")

        # 🔍 DEBUG: Additional validation logging
        logger.debug(f"🔍 VALIDATION: processed_stack type: {type(processed_stack)}")
        logger.debug(f"🔍 VALIDATION: processed_stack has shape attr: {hasattr(processed_stack, 'shape')}")
        logger.debug(f"🔍 VALIDATION: processed_stack has ndim attr: {hasattr(processed_stack, 'ndim')}")
        if hasattr(processed_stack, 'ndim'):
            logger.debug(f"🔍 VALIDATION: processed_stack ndim: {processed_stack.ndim}")
        if hasattr(processed_stack, 'shape'):
            logger.debug(f"🔍 VALIDATION: processed_stack shape: {processed_stack.shape}")

        if not _is_3d(processed_stack):
            logger.error(f"🔍 VALIDATION ERROR: processed_stack is not 3D")
            logger.error(f"🔍 VALIDATION ERROR: Type: {type(processed_stack)}")
            logger.error(f"🔍 VALIDATION ERROR: Shape: {getattr(processed_stack, 'shape', 'no shape attr')}")
            logger.error(f"🔍 VALIDATION ERROR: Has ndim: {hasattr(processed_stack, 'ndim')}")
            if hasattr(processed_stack, 'ndim'):
                logger.error(f"🔍 VALIDATION ERROR: ndim value: {processed_stack.ndim}")
            raise ValueError(f"Main processing must result in a 3D array, got {getattr(processed_stack, 'shape', 'unknown')}")

        # 🔍 DEBUG: Log the unstacking operation
        logger.debug(f"🔍 UNSTACKING: shape: {output_shape} → memory_type: {output_memory_type_from_plan}")

        output_slices = unstack_slices(
            array=processed_stack, memory_type=output_memory_type_from_plan, gpu_id=device_id, validate_slices=True
        )

        # 🔍 DEBUG: Log the unstacked result
        if output_slices:
            unstacked_shapes = [getattr(s, 'shape', 'no shape') for s in output_slices[:3]]  # First 3 shapes
            logger.debug(f"🔍 UNSTACKED RESULT: {len(output_slices)} slices, sample shapes: {unstacked_shapes}")

        # Handle cases where the function returns fewer images than inputs (e.g., z-stack flattening,
        # channel compositing). In such cases, save only the returned images using the first N input filenames.
        num_outputs = len(output_slices)
        num_inputs = len(matching_files)

        if num_outputs < num_inputs:
            logger.debug(f"Function returned {num_outputs} images from {num_inputs} inputs - likely a flattening operation")
        elif num_outputs > num_inputs:
            logger.warning(f"Function returned more images ({num_outputs}) than inputs ({num_inputs}) - unexpected")

        # Save the output images using batch operations
        try:
            # Prepare batch data
            output_data = []
            output_paths_batch = []

            for i, img_slice in enumerate(output_slices):
                # FAIL FAST: No fallback filenames - if we have more outputs than inputs, something is wrong
                if i >= len(matching_files):
                    raise ValueError(
                        f"Function returned {num_outputs} output slices but only {num_inputs} input files available. "
                        f"Cannot generate filename for output slice {i}. This indicates a bug in the function or "
                        f"unstacking logic - functions should return the same number of images as inputs, or fewer."
                    )

                input_filename = matching_files[i]
                output_filename = Path(input_filename).name
                output_path = Path(step_output_dir) / output_filename

                # Always ensure we can write to the output path (delete if it exists)
                if context.filemanager.exists(str(output_path), Backend.MEMORY.value):
                    context.filemanager.delete(str(output_path), Backend.MEMORY.value)

                output_data.append(img_slice)
                output_paths_batch.append(str(output_path))

            # Ensure the output directory exists
            context.filemanager.ensure_directory(str(step_output_dir), Backend.MEMORY.value)

            # Batch save to the memory backend; materialization to other backends happens later
            context.filemanager.save_batch(output_data, output_paths_batch, Backend.MEMORY.value)

        except Exception as e:
            logger.error(f"Error saving batch of output slices for pattern {pattern_repr}: {e}", exc_info=True)
            raise

        # 🔥 CLEANUP: If the function returned fewer images than inputs, delete the unused input files.
        # This prevents unused channel files from remaining in memory after compositing.
        if num_outputs < num_inputs:
            for j in range(num_outputs, num_inputs):
                unused_input_filename = matching_files[j]
                unused_input_path = Path(step_input_dir) / unused_input_filename
                if context.filemanager.exists(str(unused_input_path), Backend.MEMORY.value):
                    context.filemanager.delete(str(unused_input_path), Backend.MEMORY.value)
                    logger.debug(f"🔥 CLEANUP: Deleted unused input file: {unused_input_filename}")

        logger.debug(f"Finished pattern group {pattern_repr} in {(time.time() - start_time):.2f}s.")
    except Exception as e:
        import traceback
        full_traceback = traceback.format_exc()
        logger.error(f"Error processing pattern group {pattern_repr}: {e}", exc_info=True)
        logger.error(f"Full traceback for pattern group {pattern_repr}:\n{full_traceback}")
        raise ValueError(f"Failed to process pattern group {pattern_repr}: {e}") from e


class FunctionStep(AbstractStep):

    def __init__(
        self,
        func: Union[Callable, Tuple[Callable, Dict], List[Union[Callable, Tuple[Callable, Dict]]]],
        **kwargs
    ):
        # Generate default name from function if not provided
        if 'name' not in kwargs or kwargs['name'] is None:
            actual_func_for_name = func
            if isinstance(func, tuple):
                actual_func_for_name = func[0]
            elif isinstance(func, list) and func:
                first_item = func[0]
                if isinstance(first_item, tuple):
                    actual_func_for_name = first_item[0]
                elif callable(first_item):
                    actual_func_for_name = first_item
            kwargs['name'] = getattr(actual_func_for_name, '__name__', 'FunctionStep')

        super().__init__(**kwargs)
        self.func = func  # This is used by prepare_patterns_and_functions at runtime
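    # Illustrative construction sketch (hypothetical callables) showing the three
    # accepted `func` forms; when no explicit name is given, the step name falls
    # back to the first callable's __name__.
    #
    #   FunctionStep(func=normalize)                                    # bare callable
    #   FunctionStep(func=(gaussian_blur, {"sigma": 2}))                 # (callable, kwargs)
    #   FunctionStep(func=[normalize, (gaussian_blur, {"sigma": 2})])    # chain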


    def process(self, context: 'ProcessingContext', step_index: int) -> None:
        # Access the step plan by index (step_plans is keyed by index, not step_id)
        step_plan = context.step_plans[step_index]

        # Get the step name for logging
        step_name = step_plan['step_name']

        try:
            axis_id = step_plan['axis_id']
            step_input_dir = Path(step_plan['input_dir'])
            step_output_dir = Path(step_plan['output_dir'])
            variable_components = step_plan['variable_components']
            group_by = step_plan['group_by']
            func_from_plan = step_plan['func']

            # special_inputs/outputs are dicts: {'key': 'vfs_path_value'}
            special_inputs = step_plan['special_inputs']
            special_outputs = step_plan['special_outputs']  # Should be an OrderedDict if order matters

            read_backend = step_plan['read_backend']
            write_backend = step_plan['write_backend']
            input_mem_type = step_plan['input_memory_type']
            output_mem_type = step_plan['output_memory_type']
            microscope_handler = context.microscope_handler
            filemanager = context.filemanager

            # Create a path getter for this well
            get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)

            # Store the path getter in the step_plan for streaming access
            step_plan["get_paths_for_axis"] = get_paths_for_axis

            # Get patterns first for the bulk preload.
            # Use a dynamic filter parameter based on the current multiprocessing axis.
            from openhcs.constants import MULTIPROCESSING_AXIS
            axis_name = MULTIPROCESSING_AXIS.value
            filter_kwargs = {f"{axis_name}_filter": [axis_id]}

            patterns_by_well = microscope_handler.auto_detect_patterns(
                str(step_input_dir),                 # folder_path
                filemanager,                         # filemanager
                read_backend,                        # backend
                extensions=DEFAULT_IMAGE_EXTENSIONS,
                group_by=group_by,                   # Pass the GroupBy enum directly
                variable_components=[vc.value for vc in variable_components] if variable_components else [],
                **filter_kwargs                      # Dynamic filter parameter
            )

            # Only access gpu_id if the step requires a GPU (has GPU memory types)
            from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES
            requires_gpu = (input_mem_type in VALID_GPU_MEMORY_TYPES or
                            output_mem_type in VALID_GPU_MEMORY_TYPES)

            # Ensure variable_components is never None - use the default if missing
            if variable_components is None:
                variable_components = [VariableComponents.SITE]  # Default fallback
                logger.warning(f"Step {step_index} ({step_name}) had None variable_components, using default [SITE]")

            if requires_gpu:
                device_id = step_plan['gpu_id']
                logger.debug(f"🔥 DEBUG: Step {step_index} gpu_id from plan: {device_id}, input_mem: {input_mem_type}, output_mem: {output_mem_type}")
            else:
                device_id = None  # CPU-only step
                logger.debug(f"🔥 DEBUG: Step {step_index} is CPU-only, input_mem: {input_mem_type}, output_mem: {output_mem_type}")

            logger.debug(f"🔥 DEBUG: Step {step_index} read_backend: {read_backend}, write_backend: {write_backend}")

            if not all([axis_id, step_input_dir, step_output_dir]):
                raise ValueError(f"Plan missing essential keys for step {step_index}")

            same_dir = str(step_input_dir) == str(step_output_dir)
            logger.info(f"Step {step_index} ({step_name}) I/O: read='{read_backend}', write='{write_backend}'.")
            logger.info(f"Step {step_index} ({step_name}) Paths: input_dir='{step_input_dir}', output_dir='{step_output_dir}', same_dir={same_dir}")

            # 🔄 MATERIALIZATION READ: Bulk preload if not reading from memory
            if read_backend != Backend.MEMORY.value:
                _bulk_preload_step_images(step_input_dir, step_output_dir, axis_id, read_backend,
                                          patterns_by_well, filemanager, microscope_handler, step_plan["zarr_config"])

            # 🔄 INPUT CONVERSION: Convert loaded input data to zarr if configured
            if "input_conversion_dir" in step_plan:
                input_conversion_dir = step_plan["input_conversion_dir"]
                input_conversion_backend = step_plan["input_conversion_backend"]

                logger.info(f"Converting input data to zarr: {input_conversion_dir}")

                # Get memory paths for the input data (already loaded)
                memory_paths = get_paths_for_axis(step_input_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)

                # Generate conversion paths (input_dir → conversion_dir)
                conversion_paths = _generate_materialized_paths(memory_paths, Path(step_input_dir), Path(input_conversion_dir))

                # Parse the actual filenames (contained in the conversion paths) to determine zarr
                # dimensions, and parse the well to get row and column for the zarr structure.
                n_channels, n_z, n_fields = _calculate_zarr_dimensions(conversion_paths, context.microscope_handler)
                row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)

                # Save using the existing materialized-data infrastructure
                _save_materialized_data(filemanager, memory_data, conversion_paths, input_conversion_backend, step_plan, context, axis_id)

                logger.info(f"🔬 Converted {len(conversion_paths)} input files to {input_conversion_dir}")

            # 🔍 VRAM TRACKING: Log memory at step start
            try:
                from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage
                log_gpu_memory_usage(f"step {step_name} start")
            except ImportError:
                pass  # GPU cleanup not available
            except Exception:
                pass

            logger.info(f"🔥 STEP: Starting processing for '{step_name}' well {axis_id} (group_by={group_by.name if group_by else None}, variable_components={[vc.name for vc in variable_components] if variable_components else []})")

            if axis_id not in patterns_by_well:
                raise ValueError(
                    f"No patterns detected for well '{axis_id}' in step '{step_name}' (index: {step_index}). "
                    f"This indicates either: (1) no image files found for this well, "
                    f"(2) image files don't match the expected naming pattern, or "
                    f"(3) pattern detection failed. Check input directory: {step_input_dir}"
                )

            if isinstance(patterns_by_well[axis_id], dict):
                # Grouped patterns (when group_by is set)
                for comp_val, pattern_list in patterns_by_well[axis_id].items():
                    logger.debug(f"🔥 STEP: Component '{comp_val}' has {len(pattern_list)} patterns: {pattern_list}")
            else:
                # Ungrouped patterns (when group_by is None)
                logger.debug(f"🔥 STEP: Found {len(patterns_by_well[axis_id])} ungrouped patterns: {patterns_by_well[axis_id]}")

            if func_from_plan is None:
                raise ValueError(f"Step plan missing 'func' for step: {step_plan.get('step_name', 'Unknown')} (index: {step_index})")

            grouped_patterns, comp_to_funcs, comp_to_base_args = prepare_patterns_and_functions(
                patterns_by_well[axis_id], func_from_plan, component=group_by.value if group_by else None
            )

            logger.info(f"🔍 DICT_PATTERN: grouped_patterns keys: {list(grouped_patterns.keys())}")
            logger.info(f"🔍 DICT_PATTERN: comp_to_funcs keys: {list(comp_to_funcs.keys())}")
            logger.info(f"🔍 DICT_PATTERN: func_from_plan type: {type(func_from_plan)}")
            if isinstance(func_from_plan, dict):
                logger.info(f"🔍 DICT_PATTERN: func_from_plan keys: {list(func_from_plan.keys())}")

            for comp_val, current_pattern_list in grouped_patterns.items():
                logger.info(f"🔍 DICT_PATTERN: Processing component '{comp_val}' with {len(current_pattern_list)} patterns")
                exec_func_or_chain = comp_to_funcs[comp_val]
                base_kwargs = comp_to_base_args[comp_val]
                logger.info(f"🔍 DICT_PATTERN: Component '{comp_val}' exec_func_or_chain: {exec_func_or_chain}")
                for pattern_item in current_pattern_list:
                    _process_single_pattern_group(
                        context, pattern_item, exec_func_or_chain, base_kwargs,
                        step_input_dir, step_output_dir, axis_id, comp_val,
                        read_backend, write_backend, input_mem_type, output_mem_type,
                        device_id, same_dir,
                        special_inputs, special_outputs,  # Pass the maps from the step_plan
                        step_plan["zarr_config"],
                        variable_components, step_index  # Pass step_index for funcplan lookup
                    )
            logger.info(f"🔥 STEP: Completed processing for '{step_name}' well {axis_id}.")

            # 📄 MATERIALIZATION WRITE: Only if not writing to memory
            if write_backend != Backend.MEMORY.value:
                memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
                # Calculate zarr dimensions (ignored by non-zarr backends)
                n_channels, n_z, n_fields = _calculate_zarr_dimensions(memory_paths, context.microscope_handler)
                row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
                filemanager.ensure_directory(step_output_dir, write_backend)
                filemanager.save_batch(memory_data, memory_paths, write_backend,
                                       chunk_name=axis_id, zarr_config=step_plan["zarr_config"],
                                       n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                       row=row, col=col)

            # 📄 PER-STEP MATERIALIZATION: Additional materialized output if configured
            if "materialized_output_dir" in step_plan:
                materialized_output_dir = step_plan["materialized_output_dir"]
                materialized_backend = step_plan["materialized_backend"]

                memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
                materialized_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir))

                filemanager.ensure_directory(materialized_output_dir, materialized_backend)
                _save_materialized_data(filemanager, memory_data, materialized_paths, materialized_backend, step_plan, context, axis_id)

                logger.info(f"🔬 Materialized {len(materialized_paths)} files to {materialized_output_dir}")

            # 📄 STREAMING: Execute all configured streaming backends
            streaming_configs_found = []
            for key, config_instance in step_plan.items():
                if isinstance(config_instance, StreamingConfig):
                    streaming_configs_found.append((key, config_instance))

            for key, config_instance in streaming_configs_found:
                # Get paths at runtime, like materialization does
                step_output_dir = step_plan["output_dir"]
                get_paths_for_axis = step_plan["get_paths_for_axis"]  # Get the path getter from the step_plan
                streaming_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
                streaming_data = filemanager.load_batch(streaming_paths, Backend.MEMORY.value)
                kwargs = config_instance.get_streaming_kwargs(context)  # Pass context for microscope handler access

                # Add step information for proper layer naming
                kwargs["step_index"] = step_index
                kwargs["step_name"] = step_name

                # Execute streaming - the backend comes from the config enum
                filemanager.save_batch(streaming_data, streaming_paths, config_instance.backend.value, **kwargs)
                logger.info(f"🔍 {config_instance.backend.name}: Streamed {len(streaming_paths)} files for step {step_name}")

            logger.info(f"FunctionStep {step_index} ({step_name}) completed for well {axis_id}.")

            # 📄 OPENHCS METADATA: Create the metadata file automatically after step completion.
            # Track which backend was actually used for writing files.
            actual_write_backend = step_plan['write_backend']

            from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
            metadata_generator = OpenHCSMetadataGenerator(context.filemanager)

            # Main step output metadata
            is_pipeline_output = (actual_write_backend != Backend.MEMORY.value)
            metadata_generator.create_metadata(
                context,
                step_plan['output_dir'],
                actual_write_backend,
                is_main=is_pipeline_output,
                plate_root=step_plan['output_plate_root'],
                sub_dir=step_plan['sub_dir']
            )

            # 📄 MATERIALIZED METADATA: Create metadata for the materialized directory if it exists
            if 'materialized_output_dir' in step_plan:
                materialized_backend = step_plan.get('materialized_backend', actual_write_backend)
                metadata_generator.create_metadata(
                    context,
                    step_plan['materialized_output_dir'],
                    materialized_backend,
                    is_main=False,
                    plate_root=step_plan['materialized_plate_root'],
                    sub_dir=step_plan['materialized_sub_dir']
                )

            # SPECIAL DATA MATERIALIZATION
            special_outputs = step_plan.get('special_outputs', {})
            logger.debug(f"🔍 MATERIALIZATION: special_outputs from step_plan: {special_outputs}")
            logger.debug(f"🔍 MATERIALIZATION: special_outputs is empty? {not special_outputs}")
            if special_outputs:
                logger.info(f"🔬 MATERIALIZATION: Starting materialization for {len(special_outputs)} special outputs")
                self._materialize_special_outputs(filemanager, step_plan, special_outputs)
                logger.info(f"🔬 MATERIALIZATION: Completed materialization")
            else:
                logger.debug(f"🔍 MATERIALIZATION: No special outputs to materialize")

        except Exception as e:
            import traceback
            full_traceback = traceback.format_exc()
            logger.error(f"Error in FunctionStep {step_index} ({step_name}): {e}", exc_info=True)
            logger.error(f"Full traceback for FunctionStep {step_index} ({step_name}):\n{full_traceback}")
            raise

    def _extract_component_metadata(self, context: 'ProcessingContext', component: 'VariableComponents') -> Optional[Dict[str, str]]:
        """
        Extract component metadata from the context cache safely.

        Args:
            context: ProcessingContext containing metadata_cache
            component: VariableComponents enum specifying which component to extract

        Returns:
            Dictionary mapping component keys to display names, or None if not available
        """
        try:
            if hasattr(context, 'metadata_cache') and context.metadata_cache:
                return context.metadata_cache.get(component, None)
            else:
                logger.debug(f"No metadata_cache available in context for {component.value}")
                return None
        except Exception as e:
            logger.debug(f"Error extracting {component.value} metadata from cache: {e}")
            return None


    def _create_openhcs_metadata_for_materialization(
        self,
        context: 'ProcessingContext',
        output_dir: str,
        write_backend: str
    ) -> None:
        """
        Create an OpenHCS metadata file for materialization writes.

        Args:
            context: ProcessingContext containing microscope_handler and other state
            output_dir: Output directory path where metadata should be written
            write_backend: Backend being used for the write (disk/zarr)
        """
        # Only materialization writes (disk/zarr) need metadata - memory writes do not
        if write_backend == Backend.MEMORY.value:
            logger.debug(f"Skipping metadata creation (memory write)")
            return

        logger.debug(f"Creating metadata for materialization write: {write_backend} -> {output_dir}")

        try:
            # Extract required information
            step_output_dir = Path(output_dir)

            # Check that we have a microscope handler for metadata extraction
            if not context.microscope_handler:
                logger.debug("No microscope_handler in context - skipping OpenHCS metadata creation")
                return

            # Get source microscope information
            source_parser_name = context.microscope_handler.parser.__class__.__name__

            # Extract metadata from the source microscope handler
            try:
                grid_dimensions = context.microscope_handler.metadata_handler.get_grid_dimensions(context.input_dir)
                pixel_size = context.microscope_handler.metadata_handler.get_pixel_size(context.input_dir)
            except Exception as e:
                logger.debug(f"Could not extract grid_dimensions/pixel_size from source: {e}")
                grid_dimensions = [1, 1]  # Default fallback
                pixel_size = 1.0  # Default fallback

            # Get the list of image files in the output directory
            try:
                image_files = []
                if context.filemanager.exists(str(step_output_dir), write_backend):
                    # List files in the output directory
                    files = context.filemanager.list_files(str(step_output_dir), write_backend)
                    # Filter for image files (common extensions) and convert to strings
                    image_extensions = {'.tif', '.tiff', '.png', '.jpg', '.jpeg'}
                    image_files = [str(f) for f in files if Path(f).suffix.lower() in image_extensions]
                    logger.debug(f"Found {len(image_files)} image files in {step_output_dir}")
            except Exception as e:
                logger.debug(f"Could not list image files in output directory: {e}")
                image_files = []

            # Detect available backends based on the actual output files
            available_backends = self._detect_available_backends(step_output_dir)

            # Create the metadata structure
            metadata = {
                "microscope_handler_name": context.microscope_handler.microscope_type,
                "source_filename_parser_name": source_parser_name,
                "grid_dimensions": list(grid_dimensions) if hasattr(grid_dimensions, '__iter__') else [1, 1],
                "pixel_size": float(pixel_size) if pixel_size is not None else 1.0,
                "image_files": image_files,
                "channels": self._extract_component_metadata(context, VariableComponents.CHANNEL),
                "wells": self._extract_component_metadata(context, VariableComponents.WELL),
                "sites": self._extract_component_metadata(context, VariableComponents.SITE),
                "z_indexes": self._extract_component_metadata(context, VariableComponents.Z_INDEX),
                "available_backends": available_backends
            }

            # Save the metadata file using the disk backend (JSON files always live on disk)
            from openhcs.microscopes.openhcs import OpenHCSMetadataHandler
            metadata_path = step_output_dir / OpenHCSMetadataHandler.METADATA_FILENAME

            # Always ensure we can write to the metadata path (delete if it exists)
            if context.filemanager.exists(str(metadata_path), Backend.DISK.value):
                context.filemanager.delete(str(metadata_path), Backend.DISK.value)

            # Ensure the output directory exists on disk
            context.filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)

            # Create JSON content - the OpenHCS handler expects JSON format
            json_content = json.dumps(metadata, indent=2)
            context.filemanager.save(json_content, str(metadata_path), Backend.DISK.value)
            logger.debug(f"Created OpenHCS metadata file (disk): {metadata_path}")

        except Exception as e:
            # Graceful degradation - log the error but don't fail the step
            logger.warning(f"Failed to create OpenHCS metadata file: {e}")
            logger.debug(f"OpenHCS metadata creation error details:", exc_info=True)

    def _detect_available_backends(self, output_dir: Path) -> Dict[str, bool]:
        """Detect which storage backends are actually available based on output files."""
        backends = {Backend.ZARR.value: False, Backend.DISK.value: False}

        # Check for zarr stores
        if list(output_dir.glob("*.zarr")):
            backends[Backend.ZARR.value] = True

        # Check for image files
        for ext in DEFAULT_IMAGE_EXTENSIONS:
            if list(output_dir.glob(f"*{ext}")):
                backends[Backend.DISK.value] = True
                break

        logger.debug(f"Backend detection result: {backends}")
        return backends
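    # Minimal sketch (hypothetical directory contents): a folder containing both an
    # "A01.zarr" store and "A01_s1_c1.tif" would report both backends as available,
    # assuming the Backend enum values are "zarr" and "disk".
    #
    #   self._detect_available_backends(Path("/plate/step_1"))
    #   # -> {"zarr": True, "disk": True}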


    def _materialize_special_outputs(self, filemanager, step_plan, special_outputs):
        """Load special data from memory and call the materialization functions."""
        logger.debug(f"🔍 MATERIALIZE_METHOD: Processing {len(special_outputs)} special outputs")

        for output_key, output_info in special_outputs.items():
            logger.debug(f"🔍 MATERIALIZE_METHOD: Processing output_key: {output_key}")
            logger.debug(f"🔍 MATERIALIZE_METHOD: output_info: {output_info}")

            mat_func = output_info.get('materialization_function')
            logger.debug(f"🔍 MATERIALIZE_METHOD: materialization_function: {mat_func}")

            if mat_func:
                path = output_info['path']
                logger.info(f"🔬 MATERIALIZING: {output_key} from {path}")

                try:
                    filemanager.ensure_directory(Path(path).parent, Backend.MEMORY.value)
                    special_data = filemanager.load(path, Backend.MEMORY.value)
                    logger.debug(f"🔍 MATERIALIZE_METHOD: Loaded special data type: {type(special_data)}")

                    result_path = mat_func(special_data, path, filemanager)
                    logger.info(f"🔬 MATERIALIZED: {output_key} -> {result_path}")

                except Exception as e:
                    logger.error(f"🔬 MATERIALIZATION ERROR: Failed to materialize {output_key}: {e}")
                    raise
            else:
                logger.warning(f"🔬 MATERIALIZATION: No materialization function for {output_key}, skipping")