Coverage for openhcs/core/steps/function_step.py: 75.2%
460 statements
« prev ^ index » next — coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2FunctionStep implementation for pattern-based processing.
4This module contains the FunctionStep class. During execution, FunctionStep instances
5are stateless regarding their configuration. All operational parameters, including
6the function(s) to execute, special input/output keys, their VFS paths, and memory types,
7are retrieved from this step's entry in `context.step_plans`.
8"""
10import logging
11import os
12import time
13import gc
14import json
15import shutil
16from functools import partial
17from pathlib import Path
18from typing import Any, Callable, Dict, List, Optional, Tuple, Union, OrderedDict as TypingOrderedDict, TYPE_CHECKING
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true
21 from openhcs.core.config import PathPlanningConfig
23from openhcs.constants.constants import (DEFAULT_IMAGE_EXTENSION,
24 DEFAULT_IMAGE_EXTENSIONS,
25 DEFAULT_SITE_PADDING, Backend,
26 MemoryType, VariableComponents, GroupBy)
27from openhcs.constants.input_source import InputSource
28from openhcs.core.context.processing_context import ProcessingContext
29from openhcs.core.steps.abstract import AbstractStep, get_step_id
30from openhcs.formats.func_arg_prep import prepare_patterns_and_functions
31from openhcs.core.memory.stack_utils import stack_slices, unstack_slices
32# OpenHCS imports moved to local imports to avoid circular dependencies
34logger = logging.getLogger(__name__)
36def _generate_materialized_paths(memory_paths: List[str], step_output_dir: Path, materialized_output_dir: Path) -> List[str]:
37 """Generate materialized file paths by replacing step output directory."""
38 materialized_paths = []
39 for memory_path in memory_paths:
40 relative_path = Path(memory_path).relative_to(step_output_dir)
41 materialized_path = materialized_output_dir / relative_path
42 materialized_paths.append(str(materialized_path))
43 return materialized_paths
def _save_materialized_data(filemanager, memory_data: List, materialized_paths: List[str],
                            materialized_backend: str, step_plan: Dict, context, well_id: str) -> None:
    """Persist in-memory data to its materialized location via the chosen backend.

    Zarr targets require extra layout metadata (dimension counts plus the
    well's plate row/column); every other backend takes a plain batch save.
    """
    if materialized_backend != Backend.ZARR.value:
        filemanager.save_batch(memory_data, materialized_paths, materialized_backend)
        return

    # Zarr path: derive dimensions from the filenames and the well position
    # from the well identifier before handing everything to save_batch.
    channels, z_planes, fields = _calculate_zarr_dimensions(materialized_paths, context.microscope_handler)
    plate_row, plate_col = context.microscope_handler.parser.extract_row_column(well_id)
    filemanager.save_batch(
        memory_data,
        materialized_paths,
        materialized_backend,
        chunk_name=well_id,
        zarr_config=step_plan.get("zarr_config"),
        n_channels=channels,
        n_z=z_planes,
        n_fields=fields,
        row=plate_row,
        col=plate_col,
    )
def get_all_image_paths(input_dir, backend, well_id, filemanager, microscope_handler):
    """
    Return every image file path in ``input_dir`` that belongs to ``well_id``.

    Args:
        input_dir: Directory to search for images
        well_id: Well identifier to filter files
        backend: Backend to use for file listing
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        Sorted list of full file paths for the well
    """
    parser = microscope_handler.parser
    listed_files = filemanager.list_image_files(str(input_dir), backend)

    # Keep only files whose parsed metadata identifies this well; parsing the
    # filename is reliable where naive substring matching was not.
    matched = []
    for candidate in listed_files:
        meta = parser.parse_filename(os.path.basename(str(candidate)))
        if meta and meta.get('well') == well_id:
            matched.append(str(candidate))

    # De-duplicate and sort for deterministic ordering.
    unique_sorted = sorted(set(matched))

    # Re-root each filename under input_dir to build the full paths.
    full_paths = [str(input_dir / Path(name).name) for name in unique_sorted]

    logger.debug(f"Found {len(listed_files)} total files, {len(full_paths)} for well {well_id}")

    return full_paths
def create_image_path_getter(well_id, filemanager, microscope_handler):
    """
    Build a path-getter bound to one well and the runtime I/O context.

    Args:
        well_id: Well identifier
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        Function that takes (input_dir, backend) and returns image paths for the well
    """
    def _bound_getter(input_dir, backend):
        # Closure defers the lookup of get_all_image_paths until call time.
        return get_all_image_paths(input_dir, backend, well_id, filemanager, microscope_handler)

    return _bound_getter
# Environment variable to disable universal GPU defragmentation.
# True only when OPENHCS_DISABLE_GPU_DEFRAG is set to "true" (case-insensitive);
# unset or any other value leaves defragmentation enabled.
DISABLE_GPU_DEFRAG = os.getenv('OPENHCS_DISABLE_GPU_DEFRAG', 'false').lower() == 'true'
def _bulk_preload_step_images(
    step_input_dir: Path,
    step_output_dir: Path,
    well_id: str,
    read_backend: str,
    patterns_by_well: Dict[str, Any],
    filemanager: 'FileManager',
    microscope_handler: 'MicroscopeHandler',
    zarr_config: Optional[Dict[str, Any]] = None
) -> None:
    """
    Pre-load all images for this step from the source backend into memory.

    A single bulk read replaces many per-pattern-group loads, cutting I/O
    overhead. Images are re-saved under their original input paths in the
    memory backend.

    Note: External conditional logic ensures this is only called for
    non-memory backends. ``step_output_dir`` and ``patterns_by_well`` are
    currently unused but retained for interface compatibility with callers.

    Raises:
        RuntimeError: If no files are found for ``well_id`` in ``step_input_dir``.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK PRELOAD: Loading images from {read_backend} to memory for well {well_id}")

    # Resolve every image path belonging to this well.
    get_paths_for_well = create_image_path_getter(well_id, filemanager, microscope_handler)
    full_file_paths = get_paths_for_well(step_input_dir, read_backend)

    if not full_file_paths:
        raise RuntimeError(f"🔄 BULK PRELOAD: No files found for well {well_id} in {step_input_dir} with backend {read_backend}")

    # Zarr is the only source backend that needs its config threaded through.
    if read_backend == Backend.ZARR.value:
        raw_images = filemanager.load_batch(full_file_paths, read_backend, zarr_config=zarr_config)
    else:
        raw_images = filemanager.load_batch(full_file_paths, read_backend)

    # Ensure the directory exists in the memory backend before saving.
    filemanager.ensure_directory(str(step_input_dir), Backend.MEMORY.value)

    # Clear any stale copies so the bulk save cannot collide with leftovers.
    for file_path in full_file_paths:
        if filemanager.exists(file_path, Backend.MEMORY.value):
            filemanager.delete(file_path, Backend.MEMORY.value)
            logger.debug(f"🔄 BULK PRELOAD: Deleted existing file {file_path} before bulk preload")

    filemanager.save_batch(raw_images, full_file_paths, Backend.MEMORY.value)
    # Fixed: this log previously referenced the leaked loop variable and
    # reported only the last file; report the whole batch instead.
    logger.debug(f"🔄 BULK PRELOAD: Saved {len(full_file_paths)} images to memory")

    # Drop local references so source-backend data can be garbage collected.
    del raw_images

    load_time = time.time() - start_time
    logger.debug(f"🔄 BULK PRELOAD: Completed in {load_time:.2f}s - {len(full_file_paths)} images now in memory")
def _bulk_writeout_step_images(
    step_output_dir: Path,
    write_backend: str,
    well_id: str,
    zarr_config: Optional[Dict[str, Any]],
    filemanager: 'FileManager',
    microscope_handler: Optional[Any] = None
) -> None:
    """
    Write all processed images for a well from memory to the final backend.

    A single bulk write replaces many per-pattern-group writes, cutting I/O
    overhead. The memory paths are reused verbatim as target paths.

    Note: External conditional logic ensures this is only called for
    non-memory backends.

    Raises:
        RuntimeError: If no image files for ``well_id`` exist in memory under
            ``step_output_dir``.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK WRITEOUT: Writing images from memory to {write_backend} for well {well_id}")

    # Collect this well's image paths as stored in the memory backend; the
    # same paths are reused directly as write targets.
    get_paths_for_well = create_image_path_getter(well_id, filemanager, microscope_handler)
    file_paths = get_paths_for_well(step_output_dir, Backend.MEMORY.value)

    if not file_paths:
        raise RuntimeError(f"🔄 BULK WRITEOUT: No image files found for well {well_id} in memory directory {step_output_dir}")

    logger.debug(f"🔄 BULK WRITEOUT: Found {len(file_paths)} image files in memory to write")

    # Load all data from the memory backend in one batch.
    memory_data = filemanager.load_batch(file_paths, Backend.MEMORY.value)

    # Ensure the output directory exists before the bulk write.
    # NOTE(review): this always targets the DISK backend even when writing to
    # zarr — confirm zarr writes share the on-disk directory layout.
    filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)

    if write_backend == Backend.ZARR.value:
        if microscope_handler is not None:
            # Zarr needs explicit dimensions and the well's plate position.
            n_channels, n_z, n_fields = _calculate_zarr_dimensions(file_paths, microscope_handler)
            row, col = microscope_handler.parser.extract_row_column(well_id)
            filemanager.save_batch(memory_data, file_paths, write_backend,
                                   chunk_name=well_id, zarr_config=zarr_config,
                                   n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                   row=row, col=col)
        else:
            # Fallback without dimensions if microscope_handler not available.
            filemanager.save_batch(memory_data, file_paths, write_backend, chunk_name=well_id, zarr_config=zarr_config)
    else:
        filemanager.save_batch(memory_data, file_paths, write_backend)

    write_time = time.time() - start_time
    logger.debug(f"🔄 BULK WRITEOUT: Completed in {write_time:.2f}s - {len(memory_data)} images written to {write_backend}")
250def _calculate_zarr_dimensions(file_paths: List[Union[str, Path]], microscope_handler) -> tuple[int, int, int]:
251 """
252 Calculate zarr dimensions (n_channels, n_z, n_fields) from file paths using microscope parser.
254 Args:
255 file_paths: List of file paths to analyze
256 microscope_handler: Microscope handler with filename parser
258 Returns:
259 Tuple of (n_channels, n_z, n_fields)
260 """
261 parsed_files = []
262 for file_path in file_paths:
263 filename = Path(file_path).name
264 metadata = microscope_handler.parser.parse_filename(filename)
265 parsed_files.append(metadata)
267 # Count unique values for each dimension from actual files
268 n_channels = len(set(f.get('channel') for f in parsed_files if f.get('channel') is not None))
269 n_z = len(set(f.get('z_index') for f in parsed_files if f.get('z_index') is not None))
270 n_fields = len(set(f.get('site') for f in parsed_files if f.get('site') is not None))
272 # Ensure at least 1 for each dimension (handle cases where metadata is missing)
273 n_channels = max(1, n_channels)
274 n_z = max(1, n_z)
275 n_fields = max(1, n_fields)
277 return n_channels, n_z, n_fields
281def _is_3d(array: Any) -> bool:
282 """Check if an array is 3D."""
283 return hasattr(array, 'ndim') and array.ndim == 3
def _execute_function_core(
    func_callable: Callable,
    main_data_arg: Any,
    base_kwargs: Dict[str, Any],
    context: 'ProcessingContext',
    special_inputs_plan: Dict[str, str],  # {'arg_name_for_func': 'special_path_value'}
    special_outputs_plan: TypingOrderedDict[str, str],  # {'output_key': 'special_path_value'}, order matters
    well_id: str,
    input_memory_type: str,
    device_id: int
) -> Any:  # Returns the main processed data stack
    """
    Executes a single callable, handling its special I/O.

    - Loads special inputs from VFS paths in `special_inputs_plan`.
    - Calls `func_callable(main_data_arg, **all_kwargs)`.
    - If `special_outputs_plan` is non-empty, expects func to return
      (main_out, sp_val1, sp_val2, ...).
    - Saves special outputs positionally to VFS paths in `special_outputs_plan`.
    - Returns the main processed data stack.

    Raises:
        ValueError: If the function's return does not match the number of
            planned special outputs.
    """
    final_kwargs = base_kwargs.copy()

    if special_inputs_plan:
        logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: special_inputs_plan = {special_inputs_plan}")
        for arg_name, path_info in special_inputs_plan.items():
            logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Processing arg_name='{arg_name}', path_info={path_info} (type: {type(path_info)})")

            # Extract the path string; current format is
            # {"path": "/path/to/file.pkl", "source_step_id": "step_123"}.
            if isinstance(path_info, dict) and 'path' in path_info:
                special_path_value = path_info['path']
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Extracted path from dict: '{special_path_value}' (type: {type(special_path_value)})")
            else:
                special_path_value = path_info  # Fallback if it's already a string
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Using path_info directly: '{special_path_value}' (type: {type(special_path_value)})")

            logger.info(f"Loading special input '{arg_name}' from path '{special_path_value}' (memory backend)")
            try:
                final_kwargs[arg_name] = context.filemanager.load(special_path_value, Backend.MEMORY.value)
            except Exception as e:
                logger.error(f"Failed to load special input '{arg_name}' from '{special_path_value}': {e}", exc_info=True)
                raise

    # Auto-inject context if the function signature expects it.
    import inspect
    sig = inspect.signature(func_callable)
    if 'context' in sig.parameters:
        final_kwargs['context'] = context

    # 🔍 DEBUG: Log input dimensions.
    input_shape = getattr(main_data_arg, 'shape', 'no shape attr')
    input_type = type(main_data_arg).__name__
    logger.debug(f"🔍 FUNCTION INPUT: {func_callable.__name__} - shape: {input_shape}, type: {input_type}")

    # ⚡ INFO: Terse function execution log for user feedback.
    logger.info(f"⚡ Executing: {func_callable.__name__}")

    # 🔍 DEBUG: Log declared function attributes before execution.
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - special_outputs: {getattr(func_callable, '__special_outputs__', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - input_memory_type: {getattr(func_callable, 'input_memory_type', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - output_memory_type: {getattr(func_callable, 'output_memory_type', 'None')}")

    raw_function_output = func_callable(main_data_arg, **final_kwargs)

    # 🔍 DEBUG: Log output dimensions and type details.
    output_shape = getattr(raw_function_output, 'shape', 'no shape attr')
    output_type = type(raw_function_output).__name__
    logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - shape: {output_shape}, type: {output_type}")

    if isinstance(raw_function_output, tuple):
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - tuple length: {len(raw_function_output)}")
        for i, element in enumerate(raw_function_output):
            elem_shape = getattr(element, 'shape', 'no shape attr')
            elem_type = type(element).__name__
            logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - element[{i}]: shape={elem_shape}, type={elem_type}")
    else:
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - not a tuple, single return value")

    main_output_data = raw_function_output

    # 🔍 DEBUG: Log special output plan status.
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: {special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Is empty? {not special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Length: {len(special_outputs_plan) if special_outputs_plan else 0}")

    if special_outputs_plan:
        # (previously logged under two back-to-back identical if-blocks; merged)
        logger.debug(f"🔍 SPECIAL OUTPUT: {special_outputs_plan}")
        num_special_outputs = len(special_outputs_plan)
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Expected {num_special_outputs} special outputs")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned type: {type(raw_function_output)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned tuple length: {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'not tuple'}")

        if not isinstance(raw_function_output, tuple) or len(raw_function_output) != (1 + num_special_outputs):
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Function '{getattr(func_callable, '__name__', 'unknown')}' special output mismatch")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Expected tuple of {1 + num_special_outputs} values")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Got {type(raw_function_output)} with {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'N/A'} values")
            raise ValueError(
                f"Function '{getattr(func_callable, '__name__', 'unknown')}' was expected to return a tuple of "
                f"{1 + num_special_outputs} values (main_output + {num_special_outputs} special) "
                f"based on 'special_outputs' in step plan, but returned {len(raw_function_output) if isinstance(raw_function_output, tuple) else type(raw_function_output)} values."
            )
        main_output_data = raw_function_output[0]
        returned_special_values_tuple = raw_function_output[1:]

        # 🔍 DEBUG: Log what we extracted.
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data type: {type(main_output_data)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data shape: {getattr(main_output_data, 'shape', 'no shape')}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted {len(returned_special_values_tuple)} special values")

        # Iterate special_outputs_plan (which must be ordered by the compiler)
        # and match against positionally returned special values.
        for i, (output_key, vfs_path_info) in enumerate(special_outputs_plan.items()):
            logger.info(f"Saving special output '{output_key}' to VFS path '{vfs_path_info}' (memory backend)")
            if i < len(returned_special_values_tuple):
                value_to_save = returned_special_values_tuple[i]
                # Extract the path string; current format is {"path": "/path/to/file.pkl"}.
                if isinstance(vfs_path_info, dict) and 'path' in vfs_path_info:
                    vfs_path = vfs_path_info['path']
                else:
                    vfs_path = vfs_path_info  # Fallback if it's already a string

                logger.info(f"🔍 SPECIAL_SAVE: Saving '{output_key}' to '{vfs_path}' (memory backend)")
                # Ensure the parent directory exists in the memory backend.
                parent_dir = str(Path(vfs_path).parent)
                context.filemanager.ensure_directory(parent_dir, Backend.MEMORY.value)
                context.filemanager.save(value_to_save, vfs_path, Backend.MEMORY.value)
                # Fixed: this success message was previously logged twice.
                logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory")
            else:
                # This indicates a mismatch that should ideally be caught by schema/validation.
                logger.error(f"Mismatch: {num_special_outputs} special outputs planned, but fewer values returned by function for key '{output_key}'.")
                raise ValueError(f"Function did not return enough values for all planned special outputs. Missing value for '{output_key}'.")

    return main_output_data
def _execute_chain_core(
    initial_data_stack: Any,
    func_chain: List[Union[Callable, Tuple[Callable, Dict]]],
    context: 'ProcessingContext',
    step_special_inputs_plan: Dict[str, str],
    step_special_outputs_plan: TypingOrderedDict[str, str],
    well_id: str,
    device_id: int,
    input_memory_type: str,
    step_id: str,  # step_id for funcplan lookup
    dict_key: str = "default"  # dict_key for funcplan lookup
) -> Any:
    """
    Run a chain of callables, threading the data stack through each one.

    Each chain item is either a bare callable or a ``(callable, kwargs)``
    tuple. Before each call the stack is converted to that function's
    declared input memory type. The step's "funcplan" decides which special
    outputs each chain position is allowed to save.

    Returns:
        The stack produced by the last function in the chain.

    Raises:
        TypeError: If a chain item is neither callable nor a
            ``(callable, kwargs)`` pair.
    """
    from openhcs.core.memory.converters import convert_memory  # local import avoids circular deps

    # Loop-invariant lookup hoisted out of the chain loop.
    funcplan = context.step_plans[step_id].get("funcplan", {})

    current_stack = initial_data_stack
    current_memory_type = input_memory_type  # Track memory type from frozen context

    for i, func_item in enumerate(func_chain):
        base_kwargs_for_item: Dict[str, Any] = {}

        if isinstance(func_item, tuple) and len(func_item) == 2 and callable(func_item[0]):
            actual_callable, base_kwargs_for_item = func_item
        elif callable(func_item):
            actual_callable = func_item
        else:
            raise TypeError(f"Invalid item in function chain: {func_item}.")

        # Convert to the function's input memory type (no-op if already there).
        current_stack = convert_memory(
            data=current_stack,
            source_type=current_memory_type,
            target_type=actual_callable.input_memory_type,
            gpu_id=device_id,
            allow_cpu_roundtrip=False
        )

        # Execution key format: function_name_dict_key_chain_position.
        func_name = getattr(actual_callable, '__name__', 'unknown')
        execution_key = f"{func_name}_{dict_key}_{i}"

        if execution_key in funcplan:
            # Only the outputs the funcplan assigns to this position are saved.
            outputs_to_save = funcplan[execution_key]
            outputs_plan_for_this_call = {
                key: step_special_outputs_plan[key]
                for key in outputs_to_save
                if key in step_special_outputs_plan
            }
            logger.info(f"🔍 FUNCPLAN: {execution_key} -> {outputs_to_save}")
            logger.info(f"🔍 FUNCPLAN: outputs_plan_for_this_call = {outputs_plan_for_this_call}")
        else:
            # Fallback: no funcplan entry, save nothing.
            outputs_plan_for_this_call = {}
            logger.info(f"🔍 FUNCPLAN: No entry for {execution_key}, saving nothing")

        current_stack = _execute_function_core(
            func_callable=actual_callable,
            main_data_arg=current_stack,
            base_kwargs=base_kwargs_for_item,
            context=context,
            special_inputs_plan=step_special_inputs_plan,
            special_outputs_plan=outputs_plan_for_this_call,
            well_id=well_id,
            device_id=device_id,
            input_memory_type=input_memory_type,
        )

        # Track the memory type the function declares it produced.
        current_memory_type = actual_callable.output_memory_type

    return current_stack
506def _process_single_pattern_group(
507 context: 'ProcessingContext',
508 pattern_group_info: Any,
509 executable_func_or_chain: Any,
510 base_func_args: Dict[str, Any],
511 step_input_dir: Path,
512 step_output_dir: Path,
513 well_id: str,
514 component_value: str,
515 read_backend: str,
516 write_backend: str,
517 input_memory_type_from_plan: str, # Explicitly from plan
518 output_memory_type_from_plan: str, # Explicitly from plan
519 device_id: Optional[int],
520 same_directory: bool,
521 special_inputs_map: Dict[str, str],
522 special_outputs_map: TypingOrderedDict[str, str],
523 zarr_config: Optional[Dict[str, Any]],
524 variable_components: Optional[List[str]] = None,
525 step_id: Optional[str] = None # Add step_id for funcplan lookup
526) -> None:
527 start_time = time.time()
528 pattern_repr = str(pattern_group_info)[:100]
529 logger.debug(f"🔥 PATTERN: Processing {pattern_repr} for well {well_id}")
531 try:
532 if not context.microscope_handler: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true
533 raise RuntimeError("MicroscopeHandler not available in context.")
535 matching_files = context.microscope_handler.path_list_from_pattern(
536 str(step_input_dir), pattern_group_info, context.filemanager, Backend.MEMORY.value,
537 [vc.value for vc in variable_components] if variable_components else None
538 )
540 if not matching_files: 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true
541 raise ValueError(
542 f"No matching files found for pattern group {pattern_repr} in {step_input_dir}. "
543 f"This indicates either: (1) no image files exist in the directory, "
544 f"(2) files don't match the pattern, or (3) pattern parsing failed. "
545 f"Check that input files exist and match the expected naming convention."
546 )
548 logger.debug(f"🔥 PATTERN: Found {len(matching_files)} files: {[Path(f).name for f in matching_files]}")
550 # Sort files to ensure consistent ordering (especially important for z-stacks)
551 matching_files.sort()
552 logger.debug(f"🔥 PATTERN: Sorted files: {[Path(f).name for f in matching_files]}")
554 full_file_paths = [str(step_input_dir / f) for f in matching_files]
555 raw_slices = context.filemanager.load_batch(full_file_paths, Backend.MEMORY.value)
557 if not raw_slices: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true
558 raise ValueError(
559 f"No valid images loaded for pattern group {pattern_repr} in {step_input_dir}. "
560 f"Found {len(matching_files)} matching files but failed to load any valid images. "
561 f"This indicates corrupted image files, unsupported formats, or I/O errors. "
562 f"Check file integrity and format compatibility."
563 )
565 # 🔍 DEBUG: Log stacking operation
566 logger.debug(f"🔍 STACKING: {len(raw_slices)} slices → memory_type: {input_memory_type_from_plan}")
567 if raw_slices: 567 ↛ 571line 567 didn't jump to line 571 because the condition on line 567 was always true
568 slice_shapes = [getattr(s, 'shape', 'no shape') for s in raw_slices[:3]] # First 3 shapes
569 logger.debug(f"🔍 STACKING: Sample slice shapes: {slice_shapes}")
571 main_data_stack = stack_slices(
572 slices=raw_slices, memory_type=input_memory_type_from_plan, gpu_id=device_id
573 )
575 # 🔍 DEBUG: Log stacked result
576 stack_shape = getattr(main_data_stack, 'shape', 'no shape')
577 stack_type = type(main_data_stack).__name__
578 logger.debug(f"🔍 STACKED RESULT: shape: {stack_shape}, type: {stack_type}")
580 logger.info(f"🔍 special_outputs_map: {special_outputs_map}")
582 final_base_kwargs = base_func_args.copy()
584 # Get step function from step plan
585 step_func = context.step_plans[step_id]["func"]
587 if isinstance(step_func, dict): 587 ↛ 588line 587 didn't jump to line 588 because the condition on line 587 was never true
588 dict_key_for_funcplan = component_value # Use actual dict key for dict patterns
589 else:
590 dict_key_for_funcplan = "default" # Use default for list/single patterns
592 if isinstance(executable_func_or_chain, list):
593 processed_stack = _execute_chain_core(
594 main_data_stack, executable_func_or_chain, context,
595 special_inputs_map, special_outputs_map, well_id,
596 device_id, input_memory_type_from_plan, step_id, dict_key_for_funcplan
597 )
598 elif callable(executable_func_or_chain): 598 ↛ 605line 598 didn't jump to line 605 because the condition on line 598 was always true
599 # For single functions, we don't need chain execution, but we still need the right dict_key
600 processed_stack = _execute_function_core(
601 executable_func_or_chain, main_data_stack, final_base_kwargs, context,
602 special_inputs_map, special_outputs_map, well_id, input_memory_type_from_plan, device_id
603 )
604 else:
605 raise TypeError(f"Invalid executable_func_or_chain: {type(executable_func_or_chain)}")
607 # 🔍 DEBUG: Check what shape the function actually returned
608 input_shape = getattr(main_data_stack, 'shape', 'unknown')
609 output_shape = getattr(processed_stack, 'shape', 'unknown')
610 processed_type = type(processed_stack).__name__
611 logger.debug(f"🔍 PROCESSING RESULT: input: {input_shape} → output: {output_shape}, type: {processed_type}")
613 # 🔍 DEBUG: Additional validation logging
614 logger.debug(f"🔍 VALIDATION: processed_stack type: {type(processed_stack)}")
615 logger.debug(f"🔍 VALIDATION: processed_stack has shape attr: {hasattr(processed_stack, 'shape')}")
616 logger.debug(f"🔍 VALIDATION: processed_stack has ndim attr: {hasattr(processed_stack, 'ndim')}")
617 if hasattr(processed_stack, 'ndim'): 617 ↛ 619line 617 didn't jump to line 619 because the condition on line 617 was always true
618 logger.debug(f"🔍 VALIDATION: processed_stack ndim: {processed_stack.ndim}")
619 if hasattr(processed_stack, 'shape'): 619 ↛ 622line 619 didn't jump to line 622 because the condition on line 619 was always true
620 logger.debug(f"🔍 VALIDATION: processed_stack shape: {processed_stack.shape}")
622 if not _is_3d(processed_stack): 622 ↛ 623line 622 didn't jump to line 623 because the condition on line 622 was never true
623 logger.error(f"🔍 VALIDATION ERROR: processed_stack is not 3D")
624 logger.error(f"🔍 VALIDATION ERROR: Type: {type(processed_stack)}")
625 logger.error(f"🔍 VALIDATION ERROR: Shape: {getattr(processed_stack, 'shape', 'no shape attr')}")
626 logger.error(f"🔍 VALIDATION ERROR: Has ndim: {hasattr(processed_stack, 'ndim')}")
627 if hasattr(processed_stack, 'ndim'):
628 logger.error(f"🔍 VALIDATION ERROR: ndim value: {processed_stack.ndim}")
629 raise ValueError(f"Main processing must result in a 3D array, got {getattr(processed_stack, 'shape', 'unknown')}")
631 # 🔍 DEBUG: Log unstacking operation
632 logger.debug(f"🔍 UNSTACKING: shape: {output_shape} → memory_type: {output_memory_type_from_plan}")
636 output_slices = unstack_slices(
637 array=processed_stack, memory_type=output_memory_type_from_plan, gpu_id=device_id, validate_slices=True
638 )
640 # 🔍 DEBUG: Log unstacked result
641 if output_slices: 641 ↛ 647line 641 didn't jump to line 647 because the condition on line 641 was always true
642 unstacked_shapes = [getattr(s, 'shape', 'no shape') for s in output_slices[:3]] # First 3 shapes
643 logger.debug(f"🔍 UNSTACKED RESULT: {len(output_slices)} slices, sample shapes: {unstacked_shapes}")
645 # Handle cases where function returns fewer images than inputs (e.g., z-stack flattening, channel compositing)
646 # In such cases, we save only the returned images using the first N input filenames
647 num_outputs = len(output_slices)
648 num_inputs = len(matching_files)
650 if num_outputs < num_inputs:
651 logger.debug(f"Function returned {num_outputs} images from {num_inputs} inputs - likely flattening operation")
652 elif num_outputs > num_inputs: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true
653 logger.warning(f"Function returned more images ({num_outputs}) than inputs ({num_inputs}) - unexpected")
655 # Save the output images using batch operations
656 try:
657 # Prepare batch data
658 output_data = []
659 output_paths_batch = []
661 for i, img_slice in enumerate(output_slices):
662 # FAIL FAST: No fallback filenames - if we have more outputs than inputs, something is wrong
663 if i >= len(matching_files): 663 ↛ 664line 663 didn't jump to line 664 because the condition on line 663 was never true
664 raise ValueError(
665 f"Function returned {num_outputs} output slices but only {num_inputs} input files available. "
666 f"Cannot generate filename for output slice {i}. This indicates a bug in the function or "
667 f"unstacking logic - functions should return same or fewer images than inputs."
668 )
670 input_filename = matching_files[i]
671 output_filename = Path(input_filename).name
672 output_path = Path(step_output_dir) / output_filename
674 # Always ensure we can write to the output path (delete if exists)
675 if context.filemanager.exists(str(output_path), Backend.MEMORY.value):
676 context.filemanager.delete(str(output_path), Backend.MEMORY.value)
678 output_data.append(img_slice)
679 output_paths_batch.append(str(output_path))
681 # Ensure directory exists
682 context.filemanager.ensure_directory(str(step_output_dir), Backend.MEMORY.value)
684 # Only pass zarr_config to zarr backend - fail loud for invalid parameters
685 #if write_backend == Backend.ZARR.value:
686 # Batch save
687 # context.filemanager.save_batch(output_data, output_paths_batch, write_backend, zarr_config=zarr_config)
688 # else:
689 context.filemanager.save_batch(output_data, output_paths_batch, Backend.MEMORY.value)
691 except Exception as e:
692 logger.error(f"Error saving batch of output slices for pattern {pattern_repr}: {e}", exc_info=True)
694 # 🔥 CLEANUP: If function returned fewer images than inputs, delete the unused input files
695 # This prevents unused channel files from remaining in memory after compositing
696 if num_outputs < num_inputs:
697 for j in range(num_outputs, num_inputs):
698 unused_input_filename = matching_files[j]
699 unused_input_path = Path(step_input_dir) / unused_input_filename
700 if context.filemanager.exists(str(unused_input_path), Backend.MEMORY.value): 700 ↛ 697line 700 didn't jump to line 697 because the condition on line 700 was always true
701 context.filemanager.delete(str(unused_input_path), Backend.MEMORY.value)
702 logger.debug(f"🔥 CLEANUP: Deleted unused input file: {unused_input_filename}")
706 logger.debug(f"Finished pattern group {pattern_repr} in {(time.time() - start_time):.2f}s.")
707 except Exception as e:
708 import traceback
709 full_traceback = traceback.format_exc()
710 logger.error(f"Error processing pattern group {pattern_repr}: {e}", exc_info=True)
711 logger.error(f"Full traceback for pattern group {pattern_repr}:\n{full_traceback}")
712 raise ValueError(f"Failed to process pattern group {pattern_repr}: {e}") from e
class FunctionStep(AbstractStep):
    """Pattern-based processing step.

    At execution time a FunctionStep is stateless with respect to its
    configuration: every operational parameter — the function(s) to execute,
    special input/output VFS paths, memory types, read/write backends and
    directories — is read from this step's entry in ``context.step_plans``.
    """

    def __init__(
        self,
        func: Union[Callable, Tuple[Callable, Dict], List[Union[Callable, Tuple[Callable, Dict]]]],
        **kwargs
    ):
        """Initialize the step.

        Args:
            func: The function(s) to execute. May be a bare callable, a
                ``(callable, kwargs)`` tuple, or a list mixing both forms.
            **kwargs: Forwarded to ``AbstractStep``. If ``name`` is absent or
                None it defaults to the wrapped callable's ``__name__``.
        """
        # Derive a human-readable default name from the underlying callable.
        if kwargs.get('name') is None:
            actual_func_for_name = func
            if isinstance(func, tuple):
                actual_func_for_name = func[0]
            elif isinstance(func, list) and func:
                first_item = func[0]
                if isinstance(first_item, tuple):
                    actual_func_for_name = first_item[0]
                elif callable(first_item):
                    actual_func_for_name = first_item
            kwargs['name'] = getattr(actual_func_for_name, '__name__', 'FunctionStep')

        super().__init__(**kwargs)
        self.func = func  # Consumed by prepare_patterns_and_functions at runtime

    def process(self, context: 'ProcessingContext') -> None:
        """Execute this step for a single well, driven entirely by its step plan.

        Reads all parameters from ``context.step_plans[step_id]``, detects
        image patterns for the well, runs the configured function(s) per
        pattern group, then handles materialization writes, metadata
        generation and special-output materialization.

        Args:
            context: Frozen processing context providing ``step_plans``,
                ``filemanager`` and ``microscope_handler``.

        Raises:
            ValueError: If the plan is missing essential keys, no patterns are
                detected for the well, the plan has no ``func``, or a pattern
                group fails to process.
        """
        # Generate step_id from object reference (elegant stateless approach)
        step_id = get_step_id(self)
        step_plan = context.step_plans[step_id]

        # Step name is needed for logging even when the rest of the plan is bad.
        step_name = step_plan['step_name']

        try:
            well_id = step_plan['well_id']
            step_input_dir = Path(step_plan['input_dir'])
            step_output_dir = Path(step_plan['output_dir'])
            variable_components = step_plan['variable_components']
            group_by = step_plan['group_by']
            func_from_plan = step_plan['func']

            # BUGFIX: fail fast on an incomplete plan *before* any of these
            # values are used (previously this check ran only after pattern
            # detection had already consumed them).
            if not all([well_id, step_input_dir, step_output_dir]):
                raise ValueError(f"Plan missing essential keys for step {step_id}")

            # special_inputs/outputs are dicts: {'key': 'vfs_path_value'}
            special_inputs = step_plan['special_inputs']
            special_outputs = step_plan['special_outputs']  # Should be OrderedDict if order matters

            read_backend = step_plan['read_backend']
            write_backend = step_plan['write_backend']
            input_mem_type = step_plan['input_memory_type']
            output_mem_type = step_plan['output_memory_type']
            microscope_handler = context.microscope_handler
            filemanager = context.filemanager

            # Create path getter for this well
            get_paths_for_well = create_image_path_getter(well_id, filemanager, microscope_handler)

            # Detect patterns up front; the result is also needed for bulk preload.
            patterns_by_well = microscope_handler.auto_detect_patterns(
                str(step_input_dir),                  # folder_path
                filemanager,                          # filemanager
                read_backend,                         # backend
                well_filter=[well_id],
                extensions=DEFAULT_IMAGE_EXTENSIONS,
                group_by=group_by.value if group_by else None,
                variable_components=[vc.value for vc in variable_components] if variable_components else []
            )

            # Only access gpu_id if the step requires GPU (has GPU memory types).
            from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES
            requires_gpu = (input_mem_type in VALID_GPU_MEMORY_TYPES or
                            output_mem_type in VALID_GPU_MEMORY_TYPES)

            # Ensure variable_components is never None - use default if missing
            if variable_components is None:
                variable_components = [VariableComponents.SITE]  # Default fallback
                logger.warning(f"Step {step_id} ({step_name}) had None variable_components, using default [SITE]")

            if requires_gpu:
                device_id = step_plan['gpu_id']
                logger.debug(f"🔥 DEBUG: Step {step_id} gpu_id from plan: {device_id}, input_mem: {input_mem_type}, output_mem: {output_mem_type}")
            else:
                device_id = None  # CPU-only step
                logger.debug(f"🔥 DEBUG: Step {step_id} is CPU-only, input_mem: {input_mem_type}, output_mem: {output_mem_type}")

            logger.debug(f"🔥 DEBUG: Step {step_id} read_backend: {read_backend}, write_backend: {write_backend}")

            same_dir = str(step_input_dir) == str(step_output_dir)
            logger.info(f"Step {step_id} ({step_name}) I/O: read='{read_backend}', write='{write_backend}'.")
            logger.info(f"Step {step_id} ({step_name}) Paths: input_dir='{step_input_dir}', output_dir='{step_output_dir}', same_dir={same_dir}")

            # 🔄 MATERIALIZATION READ: Bulk preload if not reading from memory
            if read_backend != Backend.MEMORY.value:
                _bulk_preload_step_images(step_input_dir, step_output_dir, well_id, read_backend,
                                          patterns_by_well, filemanager, microscope_handler, step_plan["zarr_config"])

            # 🔄 INPUT CONVERSION: Convert loaded input data to zarr if configured
            if "input_conversion_dir" in step_plan:
                input_conversion_dir = step_plan["input_conversion_dir"]
                input_conversion_backend = step_plan["input_conversion_backend"]

                logger.info(f"Converting input data to zarr: {input_conversion_dir}")

                # Get memory paths from input data (already loaded)
                memory_paths = get_paths_for_well(step_input_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)

                # Generate conversion paths (input_dir → conversion_dir)
                conversion_paths = _generate_materialized_paths(memory_paths, Path(step_input_dir), Path(input_conversion_dir))

                # Ensure conversion directory exists
                filemanager.ensure_directory(input_conversion_dir, input_conversion_backend)

                # Save using existing materialized data infrastructure
                _save_materialized_data(filemanager, memory_data, conversion_paths, input_conversion_backend, step_plan, context, well_id)

                logger.info(f"🔬 Converted {len(conversion_paths)} input files to {input_conversion_dir}")

            # 🔍 VRAM TRACKING: Log memory once at step start (best-effort; the
            # GPU cleanup module may be absent on CPU-only installs).
            # BUGFIX: previously the same message was logged a second time via
            # a redundant try-block guarded by a blanket `except Exception: pass`.
            try:
                from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage
                log_gpu_memory_usage(f"step {step_name} start")
            except ImportError:
                pass  # GPU cleanup not available

            logger.info(f"🔥 STEP: Starting processing for '{step_name}' well {well_id} (group_by={group_by.name if group_by else None}, variable_components={[vc.name for vc in variable_components] if variable_components else []})")

            if well_id not in patterns_by_well:
                raise ValueError(
                    f"No patterns detected for well '{well_id}' in step '{step_name}' (ID: {step_id}). "
                    f"This indicates either: (1) no image files found for this well, "
                    f"(2) image files don't match the expected naming pattern, or "
                    f"(3) pattern detection failed. Check input directory: {step_input_dir}"
                )

            if isinstance(patterns_by_well[well_id], dict):
                # Grouped patterns (when group_by is set)
                for comp_val, pattern_list in patterns_by_well[well_id].items():
                    logger.debug(f"🔥 STEP: Component '{comp_val}' has {len(pattern_list)} patterns: {pattern_list}")
            else:
                # Ungrouped patterns (when group_by is None)
                logger.debug(f"🔥 STEP: Found {len(patterns_by_well[well_id])} ungrouped patterns: {patterns_by_well[well_id]}")

            if func_from_plan is None:
                raise ValueError(f"Step plan missing 'func' for step: {step_plan.get('step_name', 'Unknown')} (ID: {step_id})")

            grouped_patterns, comp_to_funcs, comp_to_base_args = prepare_patterns_and_functions(
                patterns_by_well[well_id], func_from_plan, component=group_by.value if group_by else None
            )

            logger.info(f"🔍 DICT_PATTERN: grouped_patterns keys: {list(grouped_patterns.keys())}")
            logger.info(f"🔍 DICT_PATTERN: comp_to_funcs keys: {list(comp_to_funcs.keys())}")
            logger.info(f"🔍 DICT_PATTERN: func_from_plan type: {type(func_from_plan)}")
            if isinstance(func_from_plan, dict):
                logger.info(f"🔍 DICT_PATTERN: func_from_plan keys: {list(func_from_plan.keys())}")

            # Run the configured function (or chain) once per pattern group.
            for comp_val, current_pattern_list in grouped_patterns.items():
                logger.info(f"🔍 DICT_PATTERN: Processing component '{comp_val}' with {len(current_pattern_list)} patterns")
                exec_func_or_chain = comp_to_funcs[comp_val]
                base_kwargs = comp_to_base_args[comp_val]
                logger.info(f"🔍 DICT_PATTERN: Component '{comp_val}' exec_func_or_chain: {exec_func_or_chain}")
                for pattern_item in current_pattern_list:
                    _process_single_pattern_group(
                        context, pattern_item, exec_func_or_chain, base_kwargs,
                        step_input_dir, step_output_dir, well_id, comp_val,
                        read_backend, write_backend, input_mem_type, output_mem_type,
                        device_id, same_dir,
                        special_inputs, special_outputs,  # Pass the maps from step_plan
                        step_plan["zarr_config"],
                        variable_components, step_id  # Pass step_id for funcplan lookup
                    )
            logger.info(f"🔥 STEP: Completed processing for '{step_name}' well {well_id}.")

            # 📄 MATERIALIZATION WRITE: Only if not writing to memory
            if write_backend != Backend.MEMORY.value:
                memory_paths = get_paths_for_well(step_output_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
                # Calculate zarr dimensions (ignored by non-zarr backends)
                n_channels, n_z, n_fields = _calculate_zarr_dimensions(memory_paths, context.microscope_handler)
                row, col = context.microscope_handler.parser.extract_row_column(well_id)
                filemanager.ensure_directory(step_output_dir, write_backend)
                filemanager.save_batch(memory_data, memory_paths, write_backend,
                                       chunk_name=well_id, zarr_config=step_plan["zarr_config"],
                                       n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                       row=row, col=col)

            # 📄 PER-STEP MATERIALIZATION: Additional materialized output if configured
            if "materialized_output_dir" in step_plan:
                materialized_output_dir = step_plan["materialized_output_dir"]
                materialized_backend = step_plan["materialized_backend"]

                memory_paths = get_paths_for_well(step_output_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
                materialized_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir))

                filemanager.ensure_directory(materialized_output_dir, materialized_backend)
                _save_materialized_data(filemanager, memory_data, materialized_paths, materialized_backend, step_plan, context, well_id)

                logger.info(f"🔬 Materialized {len(materialized_paths)} files to {materialized_output_dir}")

            logger.info(f"FunctionStep {step_id} ({step_name}) completed for well {well_id}.")

            # 📄 OPENHCS METADATA: Create metadata file automatically after step completion
            # Track which backend was actually used for writing files
            actual_write_backend = step_plan['write_backend']

            # Local import to avoid a circular dependency at module load time.
            from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
            metadata_generator = OpenHCSMetadataGenerator(context.filemanager)

            # Main step output metadata
            is_pipeline_output = (actual_write_backend != Backend.MEMORY.value)
            metadata_generator.create_metadata(
                context,
                step_plan['output_dir'],
                actual_write_backend,
                is_main=is_pipeline_output,
                plate_root=step_plan['output_plate_root'],
                sub_dir=step_plan['sub_dir']
            )

            # 📄 MATERIALIZED METADATA: Create metadata for materialized directory if it exists
            if 'materialized_output_dir' in step_plan:
                materialized_backend = step_plan.get('materialized_backend', actual_write_backend)
                metadata_generator.create_metadata(
                    context,
                    step_plan['materialized_output_dir'],
                    materialized_backend,
                    is_main=False,
                    plate_root=step_plan['materialized_plate_root'],
                    sub_dir=step_plan['materialized_sub_dir']
                )

            # SPECIAL DATA MATERIALIZATION
            special_outputs = step_plan.get('special_outputs', {})
            logger.debug(f"🔍 MATERIALIZATION: special_outputs from step_plan: {special_outputs}")
            logger.debug(f"🔍 MATERIALIZATION: special_outputs is empty? {not special_outputs}")
            if special_outputs:
                logger.info(f"🔬 MATERIALIZATION: Starting materialization for {len(special_outputs)} special outputs")
                self._materialize_special_outputs(filemanager, step_plan, special_outputs)
                logger.info(f"🔬 MATERIALIZATION: Completed materialization")
            else:
                logger.debug(f"🔍 MATERIALIZATION: No special outputs to materialize")

        except Exception as e:
            import traceback
            full_traceback = traceback.format_exc()
            logger.error(f"Error in FunctionStep {step_id} ({step_name}): {e}", exc_info=True)
            logger.error(f"Full traceback for FunctionStep {step_id} ({step_name}):\n{full_traceback}")
            raise

    def _materialize_special_outputs(self, filemanager, step_plan, special_outputs):
        """Load special data from memory and call materialization functions.

        Args:
            filemanager: VFS file manager used to load the in-memory data.
            step_plan: This step's plan dict (unused directly; kept for parity
                with callers that may extend materialization behavior).
            special_outputs: Mapping of output key -> info dict containing at
                least 'path' and optionally 'materialization_function'.

        Raises:
            Exception: Re-raises any error from a materialization function
                after logging it.
        """
        logger.debug(f"🔍 MATERIALIZE_METHOD: Processing {len(special_outputs)} special outputs")

        for output_key, output_info in special_outputs.items():
            logger.debug(f"🔍 MATERIALIZE_METHOD: Processing output_key: {output_key}")
            logger.debug(f"🔍 MATERIALIZE_METHOD: output_info: {output_info}")

            mat_func = output_info.get('materialization_function')
            logger.debug(f"🔍 MATERIALIZE_METHOD: materialization_function: {mat_func}")

            if mat_func:
                path = output_info['path']
                logger.info(f"🔬 MATERIALIZING: {output_key} from {path}")

                try:
                    filemanager.ensure_directory(Path(path).parent, Backend.MEMORY.value)
                    special_data = filemanager.load(path, Backend.MEMORY.value)
                    logger.debug(f"🔍 MATERIALIZE_METHOD: Loaded special data type: {type(special_data)}")

                    # The materialization function decides the final on-disk form.
                    result_path = mat_func(special_data, path, filemanager)
                    logger.info(f"🔬 MATERIALIZED: {output_key} → {result_path}")

                except Exception as e:
                    logger.error(f"🔬 MATERIALIZATION ERROR: Failed to materialize {output_key}: {e}")
                    raise
            else:
                logger.warning(f"🔬 MATERIALIZATION: No materialization function for {output_key}, skipping")