1"""
2FunctionStep implementation for pattern-based processing.
4This module contains the FunctionStep class. During execution, FunctionStep instances
5are stateless regarding their configuration. All operational parameters, including
6the function(s) to execute, special input/output keys, their VFS paths, and memory types,
7are retrieved from this step's entry in `context.step_plans`.
8"""
10import logging
11import os
12import time
13import gc
14import json
15import shutil
16from functools import partial
17from pathlib import Path
18from typing import Any, Callable, Dict, List, Optional, Tuple, Union, OrderedDict as TypingOrderedDict, TYPE_CHECKING
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true
21 from openhcs.core.config import PathPlanningConfig
23from openhcs.core.config import StreamingConfig
25from openhcs.constants.constants import (DEFAULT_IMAGE_EXTENSION,
26 DEFAULT_IMAGE_EXTENSIONS,
27 DEFAULT_SITE_PADDING, Backend,
28 MemoryType, VariableComponents, GroupBy)
29from openhcs.constants.input_source import InputSource
30from openhcs.core.context.processing_context import ProcessingContext
31from openhcs.core.steps.abstract import AbstractStep
32from openhcs.formats.func_arg_prep import prepare_patterns_and_functions
33from openhcs.core.memory.stack_utils import stack_slices, unstack_slices
34# OpenHCS imports moved to local imports to avoid circular dependencies
36from openhcs.core.components.validation import GenericValidator
38logger = logging.getLogger(__name__)

def _generate_materialized_paths(memory_paths: List[str], step_output_dir: Path, materialized_output_dir: Path) -> List[str]:
    """Generate materialized file paths by replacing the step output directory prefix."""
    materialized_paths = []
    for memory_path in memory_paths:
        relative_path = Path(memory_path).relative_to(step_output_dir)
        materialized_path = materialized_output_dir / relative_path
        materialized_paths.append(str(materialized_path))
    return materialized_paths
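
# Illustrative example (hypothetical paths, not taken from a real run): with
# step_output_dir="/run/step_02_out" and materialized_output_dir="/plates/exp1/step_02",
# a memory path "/run/step_02_out/A01_s1_c1.tif" is rewritten to
# "/plates/exp1/step_02/A01_s1_c1.tif"; only the directory prefix changes, the
# relative filename layout is preserved.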

def _save_materialized_data(filemanager, memory_data: List, materialized_paths: List[str],
                            materialized_backend: str, step_plan: Dict, context, axis_id: str) -> None:
    """Save data to the materialized location using the appropriate backend."""
    if materialized_backend == Backend.ZARR.value:
        n_channels, n_z, n_fields = _calculate_zarr_dimensions(materialized_paths, context.microscope_handler)
        row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
        filemanager.save_batch(memory_data, materialized_paths, materialized_backend,
                               chunk_name=axis_id, zarr_config=step_plan.get("zarr_config"),
                               n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                               row=row, col=col)
    else:
        filemanager.save_batch(memory_data, materialized_paths, materialized_backend)

def get_all_image_paths(input_dir, backend, axis_id, filemanager, microscope_handler):
    """
    Get all image file paths for a specific well from a directory.

    Args:
        input_dir: Directory to search for images
        backend: Backend to use for file listing
        axis_id: Well identifier to filter files
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        List of full file paths for the well
    """
    # List all image files in the directory
    all_image_files = filemanager.list_image_files(str(input_dir), backend)

    # Filter by well using the parser rather than naive string matching
    axis_files = []
    parser = microscope_handler.parser

    # Use the dynamic multiprocessing axis instead of a hardcoded 'well' key
    from openhcs.constants import MULTIPROCESSING_AXIS
    axis_key = MULTIPROCESSING_AXIS.value

    for f in all_image_files:
        filename = os.path.basename(str(f))
        metadata = parser.parse_filename(filename)
        if metadata and metadata.get(axis_key) == axis_id:
            axis_files.append(str(f))

    # Remove duplicates and sort
    sorted_files = sorted(set(axis_files))

    # Prepare full file paths
    input_dir_path = Path(input_dir)
    full_file_paths = [str(input_dir_path / Path(f).name) for f in sorted_files]

    logger.debug(f"Found {len(all_image_files)} total files, {len(full_file_paths)} for axis {axis_id}")

    return full_file_paths

def create_image_path_getter(axis_id, filemanager, microscope_handler):
    """
    Create a specialized image path getter function using runtime context.

    Args:
        axis_id: Well identifier
        filemanager: FileManager instance
        microscope_handler: Microscope handler with parser for filename parsing

    Returns:
        Function that takes (input_dir, backend) and returns image paths for the well
    """
    def get_paths_for_axis(input_dir, backend):
        return get_all_image_paths(
            input_dir=input_dir,
            axis_id=axis_id,
            backend=backend,
            filemanager=filemanager,
            microscope_handler=microscope_handler
        )
    return get_paths_for_axis
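
# Illustrative usage (hypothetical well ID and directory; filemanager and
# microscope_handler come from the ProcessingContext in real runs):
#
#     get_paths = create_image_path_getter("A01", filemanager, microscope_handler)
#     disk_paths = get_paths("/plates/exp1/step_00", Backend.DISK.value)
#     mem_paths = get_paths("/plates/exp1/step_00", Backend.MEMORY.value)
#
# The closure fixes the well/axis once, so callers only vary directory and backend.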

# Environment variable to disable universal GPU defragmentation
DISABLE_GPU_DEFRAG = os.getenv('OPENHCS_DISABLE_GPU_DEFRAG', 'false').lower() == 'true'

def _bulk_preload_step_images(
    step_input_dir: Path,
    step_output_dir: Path,
    axis_id: str,
    read_backend: str,
    patterns_by_well: Dict[str, Any],
    filemanager: 'FileManager',
    microscope_handler: 'MicroscopeHandler',
    zarr_config: Optional[Dict[str, Any]] = None
) -> None:
    """
    Pre-load all images for this step from the source backend into the memory backend.

    This reduces I/O overhead by doing a single bulk read operation
    instead of loading images per pattern group.

    Note: External conditional logic ensures this is only called for non-memory backends.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK PRELOAD: Loading images from {read_backend} to memory for well {axis_id}")

    # Create a specialized path getter for this well
    get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)

    # Get all image paths for this well
    full_file_paths = get_paths_for_axis(step_input_dir, read_backend)

    if not full_file_paths:
        raise RuntimeError(f"🔄 BULK PRELOAD: No files found for well {axis_id} in {step_input_dir} with backend {read_backend}")

    # Load from the source backend, passing zarr_config only for the zarr backend
    if read_backend == Backend.ZARR.value:
        raw_images = filemanager.load_batch(full_file_paths, read_backend, zarr_config=zarr_config)
    else:
        raw_images = filemanager.load_batch(full_file_paths, read_backend)

    # Ensure the directory exists in the memory backend before saving
    filemanager.ensure_directory(str(step_input_dir), Backend.MEMORY.value)

    # Delete any stale copies in the memory backend, then save the whole batch once
    for file_path in full_file_paths:
        if filemanager.exists(file_path, Backend.MEMORY.value):
            filemanager.delete(file_path, Backend.MEMORY.value)
            logger.debug(f"🔄 BULK PRELOAD: Deleted existing file {file_path} before bulk preload")

    filemanager.save_batch(raw_images, full_file_paths, Backend.MEMORY.value)

    # Clean up source references - keep only memory backend references
    del raw_images

    load_time = time.time() - start_time
    logger.debug(f"🔄 BULK PRELOAD: Completed in {load_time:.2f}s - {len(full_file_paths)} images now in memory")
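
# Note: images are keyed in the memory backend under the same input-directory paths
# they had on the source backend, so downstream pattern matching against
# step_input_dir works identically regardless of where the data originally lived.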

def _bulk_writeout_step_images(
    step_output_dir: Path,
    write_backend: str,
    axis_id: str,
    zarr_config: Optional[Dict[str, Any]],
    filemanager: 'FileManager',
    microscope_handler: Optional[Any] = None
) -> None:
    """
    Write all processed images from memory to the final backend (disk/zarr).

    This reduces I/O overhead by doing a single bulk write operation
    instead of writing images per pattern group.

    Note: External conditional logic ensures this is only called for non-memory backends.
    """
    start_time = time.time()

    logger.debug(f"🔄 BULK WRITEOUT: Writing images from memory to {write_backend} for well {axis_id}")

    # Create a specialized path getter and get memory paths for this well
    get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
    memory_file_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)

    if not memory_file_paths:
        raise RuntimeError(f"🔄 BULK WRITEOUT: No image files found for well {axis_id} in memory directory {step_output_dir}")

    # Reuse the memory paths directly as target paths; their layout already matches
    # the step output directory, so no per-file conversion is needed.
    file_paths = memory_file_paths
    logger.debug(f"🔄 BULK WRITEOUT: Found {len(file_paths)} image files in memory to write")

    # Load all data from the memory backend
    memory_data = filemanager.load_batch(file_paths, Backend.MEMORY.value)

    # Ensure the output directory exists before the bulk write
    filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)

    # Bulk write to the target backend, passing zarr_config only for the zarr backend
    if write_backend == Backend.ZARR.value:
        if microscope_handler is not None:
            # Calculate zarr dimensions from the file paths
            n_channels, n_z, n_fields = _calculate_zarr_dimensions(file_paths, microscope_handler)
            # Parse the well to get row and column for the zarr structure
            row, col = microscope_handler.parser.extract_component_coordinates(axis_id)
            filemanager.save_batch(memory_data, file_paths, write_backend,
                                   chunk_name=axis_id, zarr_config=zarr_config,
                                   n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                   row=row, col=col)
        else:
            # Fallback without dimensions if microscope_handler is not available
            filemanager.save_batch(memory_data, file_paths, write_backend, chunk_name=axis_id, zarr_config=zarr_config)
    else:
        filemanager.save_batch(memory_data, file_paths, write_backend)

    write_time = time.time() - start_time
    logger.debug(f"🔄 BULK WRITEOUT: Completed in {write_time:.2f}s - {len(memory_data)} images written to {write_backend}")

def _calculate_zarr_dimensions(file_paths: List[Union[str, Path]], microscope_handler) -> Tuple[int, int, int]:
    """
    Calculate zarr dimensions (n_channels, n_z, n_fields) from file paths using the microscope parser.

    Args:
        file_paths: List of file paths to analyze
        microscope_handler: Microscope handler with filename parser

    Returns:
        Tuple of (n_channels, n_z, n_fields)
    """
    parsed_files = []
    for file_path in file_paths:
        filename = Path(file_path).name
        metadata = microscope_handler.parser.parse_filename(filename)
        parsed_files.append(metadata)

    # Count unique values for each dimension from the actual files
    n_channels = len(set(f.get('channel') for f in parsed_files if f.get('channel') is not None))
    n_z = len(set(f.get('z_index') for f in parsed_files if f.get('z_index') is not None))
    n_fields = len(set(f.get('site') for f in parsed_files if f.get('site') is not None))

    # Ensure at least 1 for each dimension (handles cases where metadata is missing)
    n_channels = max(1, n_channels)
    n_z = max(1, n_z)
    n_fields = max(1, n_fields)

    return n_channels, n_z, n_fields
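
# Illustrative example (hypothetical filenames): if the parsed metadata across the paths
# contains channels {1, 2}, z_indexes {1..5}, and sites {1..4}, the result is
# (n_channels=2, n_z=5, n_fields=4). A dimension with no parsed metadata falls back to 1.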

def _is_3d(array: Any) -> bool:
    """Check if an array is 3D."""
    return hasattr(array, 'ndim') and array.ndim == 3


def _execute_function_core(
    func_callable: Callable,
    main_data_arg: Any,
    base_kwargs: Dict[str, Any],
    context: 'ProcessingContext',
    special_inputs_plan: Dict[str, str],  # {'arg_name_for_func': 'special_path_value'}
    special_outputs_plan: TypingOrderedDict[str, str],  # {'output_key': 'special_path_value'}, order matters
    axis_id: str,
    input_memory_type: str,
    device_id: int
) -> Any:  # Returns the main processed data stack
    """
    Execute a single callable, handling its special I/O.

    - Loads special inputs from VFS paths in `special_inputs_plan`.
    - Calls `func_callable(main_data_arg, **all_kwargs)`.
    - If `special_outputs_plan` is non-empty, expects the function to return (main_out, sp_val1, sp_val2, ...).
    - Saves special outputs positionally to VFS paths in `special_outputs_plan`.
    - Returns the main processed data stack.
    """
    final_kwargs = base_kwargs.copy()

    if special_inputs_plan:
        logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: special_inputs_plan = {special_inputs_plan}")
        for arg_name, path_info in special_inputs_plan.items():
            logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Processing arg_name='{arg_name}', path_info={path_info} (type: {type(path_info)})")

            # Extract the path string from the path info dictionary
            # Current format: {"path": "/path/to/file.pkl", "source_step_id": "step_123"}
            if isinstance(path_info, dict) and 'path' in path_info:
                special_path_value = path_info['path']
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Extracted path from dict: '{special_path_value}' (type: {type(special_path_value)})")
            else:
                special_path_value = path_info  # Fallback if it's already a string
                logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Using path_info directly: '{special_path_value}' (type: {type(special_path_value)})")

            logger.info(f"Loading special input '{arg_name}' from path '{special_path_value}' (memory backend)")
            try:
                final_kwargs[arg_name] = context.filemanager.load(special_path_value, Backend.MEMORY.value)
            except Exception as e:
                logger.error(f"Failed to load special input '{arg_name}' from '{special_path_value}': {e}", exc_info=True)
                raise

    # Auto-inject context if the function signature expects it
    import inspect
    sig = inspect.signature(func_callable)
    if 'context' in sig.parameters:
        final_kwargs['context'] = context

    # 🔍 DEBUG: Log input dimensions
    input_shape = getattr(main_data_arg, 'shape', 'no shape attr')
    input_type = type(main_data_arg).__name__
    logger.debug(f"🔍 FUNCTION INPUT: {func_callable.__name__} - shape: {input_shape}, type: {input_type}")

    # ⚡ INFO: Terse function execution log for user feedback
    logger.info(f"⚡ Executing: {func_callable.__name__}")

    # 🔍 DEBUG: Log function attributes before execution
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - special_outputs: {getattr(func_callable, '__special_outputs__', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - input_memory_type: {getattr(func_callable, 'input_memory_type', 'None')}")
    logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - output_memory_type: {getattr(func_callable, 'output_memory_type', 'None')}")

    raw_function_output = func_callable(main_data_arg, **final_kwargs)

    # 🔍 DEBUG: Log output dimensions and type details
    output_shape = getattr(raw_function_output, 'shape', 'no shape attr')
    output_type = type(raw_function_output).__name__
    logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - shape: {output_shape}, type: {output_type}")

    # 🔍 DEBUG: If the result is a tuple, log details about each element
    if isinstance(raw_function_output, tuple):
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - tuple length: {len(raw_function_output)}")
        for i, element in enumerate(raw_function_output):
            elem_shape = getattr(element, 'shape', 'no shape attr')
            elem_type = type(element).__name__
            logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - element[{i}]: shape={elem_shape}, type={elem_type}")
    else:
        logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - not a tuple, single return value")

    main_output_data = raw_function_output

    # 🔍 DEBUG: Log special output plan status
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: {special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Is empty? {not special_outputs_plan}")
    logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Length: {len(special_outputs_plan) if special_outputs_plan else 0}")

    # Only process special outputs if any are planned (avoids spamming empty dict logs)
    if special_outputs_plan:
        logger.debug(f"🔍 SPECIAL OUTPUT: {special_outputs_plan}")
        num_special_outputs = len(special_outputs_plan)
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Expected {num_special_outputs} special outputs")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned type: {type(raw_function_output)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned tuple length: {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'not tuple'}")

        if not isinstance(raw_function_output, tuple) or len(raw_function_output) != (1 + num_special_outputs):
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Function '{getattr(func_callable, '__name__', 'unknown')}' special output mismatch")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Expected tuple of {1 + num_special_outputs} values")
            logger.error(f"🔍 SPECIAL OUTPUT ERROR: Got {type(raw_function_output)} with {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'N/A'} values")
            raise ValueError(
                f"Function '{getattr(func_callable, '__name__', 'unknown')}' was expected to return a tuple of "
                f"{1 + num_special_outputs} values (main_output + {num_special_outputs} special) "
                f"based on 'special_outputs' in the step plan, but returned "
                f"{len(raw_function_output) if isinstance(raw_function_output, tuple) else type(raw_function_output)} values."
            )
        main_output_data = raw_function_output[0]
        returned_special_values_tuple = raw_function_output[1:]

        # 🔍 DEBUG: Log what was extracted
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data type: {type(main_output_data)}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data shape: {getattr(main_output_data, 'shape', 'no shape')}")
        logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted {len(returned_special_values_tuple)} special values")

        # Iterate through special_outputs_plan (which must be ordered by the compiler)
        # and match it with the positionally returned special values.
        for i, (output_key, vfs_path_info) in enumerate(special_outputs_plan.items()):
            logger.info(f"Saving special output '{output_key}' to VFS path '{vfs_path_info}' (memory backend)")
            if i < len(returned_special_values_tuple):
                value_to_save = returned_special_values_tuple[i]
                # Extract the path string from the path info dictionary
                # Current format: {"path": "/path/to/file.pkl"}
                if isinstance(vfs_path_info, dict) and 'path' in vfs_path_info:
                    vfs_path = vfs_path_info['path']
                else:
                    vfs_path = vfs_path_info  # Fallback if it's already a string

                logger.info(f"🔍 SPECIAL_SAVE: Saving '{output_key}' to '{vfs_path}' (memory backend)")
                # Ensure the directory exists for the memory backend
                parent_dir = str(Path(vfs_path).parent)
                context.filemanager.ensure_directory(parent_dir, Backend.MEMORY.value)
                context.filemanager.save(value_to_save, vfs_path, Backend.MEMORY.value)
                logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory")
            else:
                # This indicates a mismatch that should ideally be caught by schema/validation
                logger.error(f"Mismatch: {num_special_outputs} special outputs planned, but fewer values returned by the function for key '{output_key}'.")
                raise ValueError(f"Function did not return enough values for all planned special outputs. Missing value for '{output_key}'.")

    return main_output_data
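
# Sketch of the special-output contract enforced above (names are hypothetical):
# a function planned with special_outputs_plan = OrderedDict(cell_counts=..., masks=...)
# must return (main_stack, cell_counts_value, masks_value); the trailing values are
# matched to plan entries by position and saved to their VFS paths on the memory backend.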

def _execute_chain_core(
    initial_data_stack: Any,
    func_chain: List[Union[Callable, Tuple[Callable, Dict]]],
    context: 'ProcessingContext',
    step_special_inputs_plan: Dict[str, str],
    step_special_outputs_plan: TypingOrderedDict[str, str],
    axis_id: str,
    device_id: int,
    input_memory_type: str,
    step_index: int,  # Needed for funcplan lookup
    dict_key: str = "default"  # Needed for funcplan lookup
) -> Any:
    current_stack = initial_data_stack
    current_memory_type = input_memory_type  # Track the memory type from the frozen context

    for i, func_item in enumerate(func_chain):
        actual_callable: Callable
        base_kwargs_for_item: Dict[str, Any] = {}

        if isinstance(func_item, tuple) and len(func_item) == 2 and callable(func_item[0]):
            actual_callable, base_kwargs_for_item = func_item
        elif callable(func_item):
            actual_callable = func_item
        else:
            raise TypeError(f"Invalid item in function chain: {func_item}.")

        # Convert to the function's input memory type (no-op if it already matches)
        from openhcs.core.memory.converters import convert_memory
        current_stack = convert_memory(
            data=current_stack,
            source_type=current_memory_type,
            target_type=actual_callable.input_memory_type,
            gpu_id=device_id,
            allow_cpu_roundtrip=False
        )

        # Use the funcplan to determine which outputs this function should save
        funcplan = context.step_plans[step_index].get("funcplan", {})
        func_name = getattr(actual_callable, '__name__', 'unknown')

        # Construct the execution key: <function_name>_<dict_key>_<chain_position>
        execution_key = f"{func_name}_{dict_key}_{i}"

        logger.info(f"🔍 FUNCPLAN DEBUG: execution_key = {execution_key}")
        logger.info(f"🔍 FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
        logger.info(f"🔍 FUNCPLAN DEBUG: step_special_outputs_plan = {step_special_outputs_plan}")

        if execution_key in funcplan:
            # Get the outputs this specific function should save
            outputs_to_save = funcplan[execution_key]
            outputs_plan_for_this_call = {
                key: step_special_outputs_plan[key]
                for key in outputs_to_save
                if key in step_special_outputs_plan
            }
            logger.info(f"🔍 FUNCPLAN: {execution_key} -> {outputs_to_save}")
            logger.info(f"🔍 FUNCPLAN: outputs_plan_for_this_call = {outputs_plan_for_this_call}")
        else:
            # Fallback: no funcplan entry, save nothing
            outputs_plan_for_this_call = {}
            logger.info(f"🔍 FUNCPLAN: No entry for {execution_key}, saving nothing")

        current_stack = _execute_function_core(
            func_callable=actual_callable,
            main_data_arg=current_stack,
            base_kwargs=base_kwargs_for_item,
            context=context,
            special_inputs_plan=step_special_inputs_plan,
            special_outputs_plan=outputs_plan_for_this_call,
            axis_id=axis_id,
            device_id=device_id,
            input_memory_type=input_memory_type,
        )

        # Update the current memory type from the frozen context
        current_memory_type = actual_callable.output_memory_type

    return current_stack
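
# Funcplan keys have the shape f"{func_name}_{dict_key}_{chain_position}". For example
# (hypothetical names), the second function "gaussian_blur" in a chain attached to dict
# key "1" is looked up as "gaussian_blur_1_1"; list/single patterns use dict_key "default".
# A missing entry means that call saves no special outputs.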

def _process_single_pattern_group(
    context: 'ProcessingContext',
    pattern_group_info: Any,
    executable_func_or_chain: Any,
    base_func_args: Dict[str, Any],
    step_input_dir: Path,
    step_output_dir: Path,
    axis_id: str,
    component_value: str,
    read_backend: str,
    write_backend: str,
    input_memory_type_from_plan: str,  # Explicitly from plan
    output_memory_type_from_plan: str,  # Explicitly from plan
    device_id: Optional[int],
    same_directory: bool,
    special_inputs_map: Dict[str, str],
    special_outputs_map: TypingOrderedDict[str, str],
    zarr_config: Optional[Dict[str, Any]],
    variable_components: Optional[List[str]] = None,
    step_index: Optional[int] = None  # Needed for funcplan lookup
) -> None:
    start_time = time.time()
    pattern_repr = str(pattern_group_info)[:100]
    logger.debug(f"🔥 PATTERN: Processing {pattern_repr} for well {axis_id}")

    try:
        if not context.microscope_handler:
            raise RuntimeError("MicroscopeHandler not available in context.")

        matching_files = context.microscope_handler.path_list_from_pattern(
            str(step_input_dir), pattern_group_info, context.filemanager, Backend.MEMORY.value,
            [vc.value for vc in variable_components] if variable_components else None
        )

        if not matching_files:
            raise ValueError(
                f"No matching files found for pattern group {pattern_repr} in {step_input_dir}. "
                f"This indicates either: (1) no image files exist in the directory, "
                f"(2) files don't match the pattern, or (3) pattern parsing failed. "
                f"Check that input files exist and match the expected naming convention."
            )

        logger.debug(f"🔥 PATTERN: Found {len(matching_files)} files: {[Path(f).name for f in matching_files]}")

        # Sort files to ensure consistent ordering (especially important for z-stacks)
        matching_files.sort()
        logger.debug(f"🔥 PATTERN: Sorted files: {[Path(f).name for f in matching_files]}")

        full_file_paths = [str(step_input_dir / f) for f in matching_files]
        raw_slices = context.filemanager.load_batch(full_file_paths, Backend.MEMORY.value)

        if not raw_slices:
            raise ValueError(
                f"No valid images loaded for pattern group {pattern_repr} in {step_input_dir}. "
                f"Found {len(matching_files)} matching files but failed to load any valid images. "
                f"This indicates corrupted image files, unsupported formats, or I/O errors. "
                f"Check file integrity and format compatibility."
            )

        # 🔍 DEBUG: Log the stacking operation
        logger.debug(f"🔍 STACKING: {len(raw_slices)} slices → memory_type: {input_memory_type_from_plan}")
        if raw_slices:
            slice_shapes = [getattr(s, 'shape', 'no shape') for s in raw_slices[:3]]  # First 3 shapes
            logger.debug(f"🔍 STACKING: Sample slice shapes: {slice_shapes}")

        main_data_stack = stack_slices(
            slices=raw_slices, memory_type=input_memory_type_from_plan, gpu_id=device_id
        )

        # 🔍 DEBUG: Log the stacked result
        stack_shape = getattr(main_data_stack, 'shape', 'no shape')
        stack_type = type(main_data_stack).__name__
        logger.debug(f"🔍 STACKED RESULT: shape: {stack_shape}, type: {stack_type}")

        logger.info(f"🔍 special_outputs_map: {special_outputs_map}")

        final_base_kwargs = base_func_args.copy()

        # Get the step function from the step plan
        step_func = context.step_plans[step_index]["func"]

        if isinstance(step_func, dict):
            dict_key_for_funcplan = component_value  # Use the actual dict key for dict patterns
        else:
            dict_key_for_funcplan = "default"  # Use the default for list/single patterns

        if isinstance(executable_func_or_chain, list):
            processed_stack = _execute_chain_core(
                main_data_stack, executable_func_or_chain, context,
                special_inputs_map, special_outputs_map, axis_id,
                device_id, input_memory_type_from_plan, step_index, dict_key_for_funcplan
            )
        elif callable(executable_func_or_chain):
            # For single functions, apply funcplan filtering as in chain execution
            funcplan = context.step_plans[step_index].get("funcplan", {})
            func_name = getattr(executable_func_or_chain, '__name__', 'unknown')
            execution_key = f"{func_name}_{dict_key_for_funcplan}_0"  # Position 0 for single functions

            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: execution_key = {execution_key}")
            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
            logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: special_outputs_map = {special_outputs_map}")

            if execution_key in funcplan:
                # Get the outputs this specific function should save
                outputs_to_save = funcplan[execution_key]
                filtered_special_outputs_map = {
                    key: special_outputs_map[key]
                    for key in outputs_to_save
                    if key in special_outputs_map
                }
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: {execution_key} -> {outputs_to_save}")
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: filtered_special_outputs_map = {filtered_special_outputs_map}")
            else:
                # Fallback: no funcplan entry, save nothing
                filtered_special_outputs_map = {}
                logger.info(f"🔍 SINGLE FUNC FUNCPLAN: No entry for {execution_key}, saving nothing")

            processed_stack = _execute_function_core(
                executable_func_or_chain, main_data_stack, final_base_kwargs, context,
                special_inputs_map, filtered_special_outputs_map, axis_id, input_memory_type_from_plan, device_id
            )
        else:
            raise TypeError(f"Invalid executable_func_or_chain: {type(executable_func_or_chain)}")

        # 🔍 DEBUG: Check what shape the function actually returned
        input_shape = getattr(main_data_stack, 'shape', 'unknown')
        output_shape = getattr(processed_stack, 'shape', 'unknown')
        processed_type = type(processed_stack).__name__
        logger.debug(f"🔍 PROCESSING RESULT: input: {input_shape} → output: {output_shape}, type: {processed_type}")

        # 🔍 DEBUG: Additional validation logging
        logger.debug(f"🔍 VALIDATION: processed_stack type: {type(processed_stack)}")
        logger.debug(f"🔍 VALIDATION: processed_stack has shape attr: {hasattr(processed_stack, 'shape')}")
        logger.debug(f"🔍 VALIDATION: processed_stack has ndim attr: {hasattr(processed_stack, 'ndim')}")
        if hasattr(processed_stack, 'ndim'):
            logger.debug(f"🔍 VALIDATION: processed_stack ndim: {processed_stack.ndim}")
        if hasattr(processed_stack, 'shape'):
            logger.debug(f"🔍 VALIDATION: processed_stack shape: {processed_stack.shape}")

        if not _is_3d(processed_stack):
            logger.error(f"🔍 VALIDATION ERROR: processed_stack is not 3D")
            logger.error(f"🔍 VALIDATION ERROR: Type: {type(processed_stack)}")
            logger.error(f"🔍 VALIDATION ERROR: Shape: {getattr(processed_stack, 'shape', 'no shape attr')}")
            logger.error(f"🔍 VALIDATION ERROR: Has ndim: {hasattr(processed_stack, 'ndim')}")
            if hasattr(processed_stack, 'ndim'):
                logger.error(f"🔍 VALIDATION ERROR: ndim value: {processed_stack.ndim}")
            raise ValueError(f"Main processing must result in a 3D array, got {getattr(processed_stack, 'shape', 'unknown')}")

        # 🔍 DEBUG: Log the unstacking operation
        logger.debug(f"🔍 UNSTACKING: shape: {output_shape} → memory_type: {output_memory_type_from_plan}")

        output_slices = unstack_slices(
            array=processed_stack, memory_type=output_memory_type_from_plan, gpu_id=device_id, validate_slices=True
        )

        # 🔍 DEBUG: Log the unstacked result
        if output_slices:
            unstacked_shapes = [getattr(s, 'shape', 'no shape') for s in output_slices[:3]]  # First 3 shapes
            logger.debug(f"🔍 UNSTACKED RESULT: {len(output_slices)} slices, sample shapes: {unstacked_shapes}")

        # Handle cases where the function returns fewer images than inputs (e.g., z-stack flattening,
        # channel compositing). In such cases, save only the returned images using the first N input filenames.
        num_outputs = len(output_slices)
        num_inputs = len(matching_files)

        if num_outputs < num_inputs:
            logger.debug(f"Function returned {num_outputs} images from {num_inputs} inputs - likely a flattening operation")
        elif num_outputs > num_inputs:
            logger.warning(f"Function returned more images ({num_outputs}) than inputs ({num_inputs}) - unexpected")

        # Save the output images using batch operations
        try:
            # Prepare batch data
            output_data = []
            output_paths_batch = []

            for i, img_slice in enumerate(output_slices):
                # FAIL FAST: No fallback filenames - if we have more outputs than inputs, something is wrong
                if i >= len(matching_files):
                    raise ValueError(
                        f"Function returned {num_outputs} output slices but only {num_inputs} input files available. "
                        f"Cannot generate a filename for output slice {i}. This indicates a bug in the function or "
                        f"unstacking logic - functions should return the same number of images as inputs, or fewer."
                    )

                input_filename = matching_files[i]
                output_filename = Path(input_filename).name
                output_path = Path(step_output_dir) / output_filename

                # Always ensure we can write to the output path (delete if it exists)
                if context.filemanager.exists(str(output_path), Backend.MEMORY.value):
                    context.filemanager.delete(str(output_path), Backend.MEMORY.value)

                output_data.append(img_slice)
                output_paths_batch.append(str(output_path))

            # Ensure the output directory exists
            context.filemanager.ensure_directory(str(step_output_dir), Backend.MEMORY.value)

            # Batch save to the memory backend; materialization to disk/zarr happens later at the step level
            context.filemanager.save_batch(output_data, output_paths_batch, Backend.MEMORY.value)

        except Exception as e:
            logger.error(f"Error saving batch of output slices for pattern {pattern_repr}: {e}", exc_info=True)

        # 🔥 CLEANUP: If the function returned fewer images than inputs, delete the unused input files.
        # This prevents unused channel files from remaining in memory after compositing.
        if num_outputs < num_inputs:
            for j in range(num_outputs, num_inputs):
                unused_input_filename = matching_files[j]
                unused_input_path = Path(step_input_dir) / unused_input_filename
                if context.filemanager.exists(str(unused_input_path), Backend.MEMORY.value):
                    context.filemanager.delete(str(unused_input_path), Backend.MEMORY.value)
                    logger.debug(f"🔥 CLEANUP: Deleted unused input file: {unused_input_filename}")

        logger.debug(f"Finished pattern group {pattern_repr} in {(time.time() - start_time):.2f}s.")
    except Exception as e:
        import traceback
        full_traceback = traceback.format_exc()
        logger.error(f"Error processing pattern group {pattern_repr}: {e}", exc_info=True)
        logger.error(f"Full traceback for pattern group {pattern_repr}:\n{full_traceback}")
        raise ValueError(f"Failed to process pattern group {pattern_repr}: {e}") from e
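
# Output-naming convention enforced above (illustrative example, hypothetical filenames):
# a max-projection that collapses 5 z-slices into 1 output saves that single slice under
# the first input filename (e.g. "A01_s1_z1.tif"), and the remaining 4 input files are
# deleted from the memory backend; returning more slices than inputs is an error.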

class FunctionStep(AbstractStep):

    def __init__(
        self,
        func: Union[Callable, Tuple[Callable, Dict], List[Union[Callable, Tuple[Callable, Dict]]]],
        **kwargs
    ):
        # Generate a default name from the function if none is provided
        if 'name' not in kwargs or kwargs['name'] is None:
            actual_func_for_name = func
            if isinstance(func, tuple):
                actual_func_for_name = func[0]
            elif isinstance(func, list) and func:
                first_item = func[0]
                if isinstance(first_item, tuple):
                    actual_func_for_name = first_item[0]
                elif callable(first_item):
                    actual_func_for_name = first_item
            kwargs['name'] = getattr(actual_func_for_name, '__name__', 'FunctionStep')

        super().__init__(**kwargs)
        self.func = func  # Used by prepare_patterns_and_functions at runtime

    def process(self, context: 'ProcessingContext', step_index: int) -> None:
        # Access the step plan by index (step_plans is keyed by index, not step_id)
        step_plan = context.step_plans[step_index]

        # Get the step name for logging
        step_name = step_plan['step_name']

        try:
            axis_id = step_plan['axis_id']
            step_input_dir = Path(step_plan['input_dir'])
            step_output_dir = Path(step_plan['output_dir'])
            variable_components = step_plan['variable_components']
            group_by = step_plan['group_by']
            func_from_plan = step_plan['func']

            # special_inputs/outputs are dicts: {'key': 'vfs_path_value'}
            special_inputs = step_plan['special_inputs']
            special_outputs = step_plan['special_outputs']  # Should be an OrderedDict if order matters

            read_backend = step_plan['read_backend']
            write_backend = step_plan['write_backend']
            input_mem_type = step_plan['input_memory_type']
            output_mem_type = step_plan['output_memory_type']
            microscope_handler = context.microscope_handler
            filemanager = context.filemanager

            # Create a path getter for this well
            get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)

            # Store the path getter in the step plan for streaming access
            step_plan["get_paths_for_axis"] = get_paths_for_axis

            # Get patterns first for the bulk preload.
            # Use a dynamic filter parameter based on the current multiprocessing axis.
            from openhcs.constants import MULTIPROCESSING_AXIS
            axis_name = MULTIPROCESSING_AXIS.value
            filter_kwargs = {f"{axis_name}_filter": [axis_id]}

            patterns_by_well = microscope_handler.auto_detect_patterns(
                str(step_input_dir),                  # folder_path
                filemanager,                          # filemanager
                read_backend,                         # backend
                extensions=DEFAULT_IMAGE_EXTENSIONS,  # extensions
                group_by=group_by,                    # pass the GroupBy enum directly
                variable_components=[vc.value for vc in variable_components] if variable_components else [],  # variable_components for placeholder logic
                **filter_kwargs                       # dynamic filter parameter
            )

            # Only access gpu_id if the step requires a GPU (has GPU memory types)
            from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES
            requires_gpu = (input_mem_type in VALID_GPU_MEMORY_TYPES or
                            output_mem_type in VALID_GPU_MEMORY_TYPES)

            # Ensure variable_components is never None - use a default if missing
            if variable_components is None:
                variable_components = [VariableComponents.SITE]  # Default fallback
                logger.warning(f"Step {step_index} ({step_name}) had None variable_components, using default [SITE]")

            if requires_gpu:
                device_id = step_plan['gpu_id']
                logger.debug(f"🔥 DEBUG: Step {step_index} gpu_id from plan: {device_id}, input_mem: {input_mem_type}, output_mem: {output_mem_type}")
            else:
                device_id = None  # CPU-only step
                logger.debug(f"🔥 DEBUG: Step {step_index} is CPU-only, input_mem: {input_mem_type}, output_mem: {output_mem_type}")

            logger.debug(f"🔥 DEBUG: Step {step_index} read_backend: {read_backend}, write_backend: {write_backend}")

            if not all([axis_id, step_input_dir, step_output_dir]):
                raise ValueError(f"Plan missing essential keys for step {step_index}")

            same_dir = str(step_input_dir) == str(step_output_dir)
            logger.info(f"Step {step_index} ({step_name}) I/O: read='{read_backend}', write='{write_backend}'.")
            logger.info(f"Step {step_index} ({step_name}) Paths: input_dir='{step_input_dir}', output_dir='{step_output_dir}', same_dir={same_dir}")

            # 🔄 MATERIALIZATION READ: Bulk preload if not reading from memory
            if read_backend != Backend.MEMORY.value:
                _bulk_preload_step_images(step_input_dir, step_output_dir, axis_id, read_backend,
                                          patterns_by_well, filemanager, microscope_handler, step_plan["zarr_config"])

            # 🔄 INPUT CONVERSION: Convert loaded input data to zarr if configured
            if "input_conversion_dir" in step_plan:
                input_conversion_dir = step_plan["input_conversion_dir"]
                input_conversion_backend = step_plan["input_conversion_backend"]

                logger.info(f"Converting input data to zarr: {input_conversion_dir}")

                # Get memory paths for the input data (already loaded by the bulk preload)
                memory_paths = get_paths_for_axis(step_input_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)

                # Generate conversion paths (input_dir → conversion_dir)
                conversion_paths = _generate_materialized_paths(memory_paths, Path(step_input_dir), Path(input_conversion_dir))

                # Save using the existing materialized-data infrastructure; zarr dimensions and
                # well coordinates are derived from the conversion paths inside the helper.
                _save_materialized_data(filemanager, memory_data, conversion_paths, input_conversion_backend, step_plan, context, axis_id)

                logger.info(f"🔬 Converted {len(conversion_paths)} input files to {input_conversion_dir}")

            # 🔍 VRAM TRACKING: Log memory at step start
            try:
                from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage
                log_gpu_memory_usage(f"step {step_name} start")
            except ImportError:
                pass  # GPU cleanup not available
            except Exception:
                pass

            logger.info(f"🔥 STEP: Starting processing for '{step_name}' well {axis_id} (group_by={group_by.name if group_by else None}, variable_components={[vc.name for vc in variable_components] if variable_components else []})")

            if axis_id not in patterns_by_well:
                raise ValueError(
                    f"No patterns detected for well '{axis_id}' in step '{step_name}' (index: {step_index}). "
                    f"This indicates either: (1) no image files found for this well, "
                    f"(2) image files don't match the expected naming pattern, or "
                    f"(3) pattern detection failed. Check input directory: {step_input_dir}"
                )

            if isinstance(patterns_by_well[axis_id], dict):
                # Grouped patterns (when group_by is set)
                for comp_val, pattern_list in patterns_by_well[axis_id].items():
                    logger.debug(f"🔥 STEP: Component '{comp_val}' has {len(pattern_list)} patterns: {pattern_list}")
            else:
                # Ungrouped patterns (when group_by is None)
                logger.debug(f"🔥 STEP: Found {len(patterns_by_well[axis_id])} ungrouped patterns: {patterns_by_well[axis_id]}")

            if func_from_plan is None:
                raise ValueError(f"Step plan missing 'func' for step: {step_plan.get('step_name', 'Unknown')} (index: {step_index})")

            grouped_patterns, comp_to_funcs, comp_to_base_args = prepare_patterns_and_functions(
                patterns_by_well[axis_id], func_from_plan, component=group_by.value if group_by else None
            )

            logger.info(f"🔍 DICT_PATTERN: grouped_patterns keys: {list(grouped_patterns.keys())}")
            logger.info(f"🔍 DICT_PATTERN: comp_to_funcs keys: {list(comp_to_funcs.keys())}")
            logger.info(f"🔍 DICT_PATTERN: func_from_plan type: {type(func_from_plan)}")
            if isinstance(func_from_plan, dict):
                logger.info(f"🔍 DICT_PATTERN: func_from_plan keys: {list(func_from_plan.keys())}")

            for comp_val, current_pattern_list in grouped_patterns.items():
                logger.info(f"🔍 DICT_PATTERN: Processing component '{comp_val}' with {len(current_pattern_list)} patterns")
                exec_func_or_chain = comp_to_funcs[comp_val]
                base_kwargs = comp_to_base_args[comp_val]
                logger.info(f"🔍 DICT_PATTERN: Component '{comp_val}' exec_func_or_chain: {exec_func_or_chain}")
                for pattern_item in current_pattern_list:
                    _process_single_pattern_group(
                        context, pattern_item, exec_func_or_chain, base_kwargs,
                        step_input_dir, step_output_dir, axis_id, comp_val,
                        read_backend, write_backend, input_mem_type, output_mem_type,
                        device_id, same_dir,
                        special_inputs, special_outputs,  # Pass the maps from the step plan
                        step_plan["zarr_config"],
                        variable_components, step_index  # Pass step_index for funcplan lookup
                    )
            logger.info(f"🔥 STEP: Completed processing for '{step_name}' well {axis_id}.")

            # 📄 MATERIALIZATION WRITE: Only if not writing to memory
            if write_backend != Backend.MEMORY.value:
                memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
                # Calculate zarr dimensions (ignored by non-zarr backends)
                n_channels, n_z, n_fields = _calculate_zarr_dimensions(memory_paths, context.microscope_handler)
                row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
                filemanager.ensure_directory(str(step_output_dir), write_backend)
                filemanager.save_batch(memory_data, memory_paths, write_backend,
                                       chunk_name=axis_id, zarr_config=step_plan["zarr_config"],
                                       n_channels=n_channels, n_z=n_z, n_fields=n_fields,
                                       row=row, col=col)

            # 📄 PER-STEP MATERIALIZATION: Additional materialized output if configured
            if "materialized_output_dir" in step_plan:
                materialized_output_dir = step_plan["materialized_output_dir"]
                materialized_backend = step_plan["materialized_backend"]

                memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
                memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
                materialized_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir))

                filemanager.ensure_directory(materialized_output_dir, materialized_backend)
                _save_materialized_data(filemanager, memory_data, materialized_paths, materialized_backend, step_plan, context, axis_id)

                logger.info(f"🔬 Materialized {len(materialized_paths)} files to {materialized_output_dir}")

            # 📄 STREAMING: Execute all configured streaming backends (StreamingConfig instances in the step plan)
            streaming_configs_found = []
            for key, config_instance in step_plan.items():
                if isinstance(config_instance, StreamingConfig):
                    streaming_configs_found.append((key, config_instance))

            for key, config_instance in streaming_configs_found:
                # Resolve paths at runtime, as materialization does
                streaming_output_dir = step_plan["output_dir"]
                get_paths_for_axis = step_plan["get_paths_for_axis"]  # Path getter stored in the step plan earlier
                streaming_paths = get_paths_for_axis(streaming_output_dir, Backend.MEMORY.value)
                streaming_data = filemanager.load_batch(streaming_paths, Backend.MEMORY.value)
                kwargs = config_instance.get_streaming_kwargs(context)  # Pass context for microscope handler access

                # Add step information for proper layer naming
                kwargs["step_index"] = step_index
                kwargs["step_name"] = step_name

                # Execute streaming - the backend comes from the config enum
                filemanager.save_batch(streaming_data, streaming_paths, config_instance.backend.value, **kwargs)
                logger.info(f"🔍 {config_instance.backend.name}: Streamed {len(streaming_paths)} files for step {step_name}")

            logger.info(f"FunctionStep {step_index} ({step_name}) completed for well {axis_id}.")

            # 📄 OPENHCS METADATA: Create the metadata file automatically after step completion
            # Track which backend was actually used for writing files
            actual_write_backend = step_plan['write_backend']

            from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
            metadata_generator = OpenHCSMetadataGenerator(context.filemanager)

            # Main step output metadata
            is_pipeline_output = (actual_write_backend != Backend.MEMORY.value)
            metadata_generator.create_metadata(
                context,
                step_plan['output_dir'],
                actual_write_backend,
                is_main=is_pipeline_output,
                plate_root=step_plan['output_plate_root'],
                sub_dir=step_plan['sub_dir']
            )

            # 📄 MATERIALIZED METADATA: Create metadata for the materialized directory if it exists
            if 'materialized_output_dir' in step_plan:
                materialized_backend = step_plan.get('materialized_backend', actual_write_backend)
                metadata_generator.create_metadata(
                    context,
                    step_plan['materialized_output_dir'],
                    materialized_backend,
                    is_main=False,
                    plate_root=step_plan['materialized_plate_root'],
                    sub_dir=step_plan['materialized_sub_dir']
                )

            # SPECIAL DATA MATERIALIZATION
            special_outputs = step_plan.get('special_outputs', {})
            logger.debug(f"🔍 MATERIALIZATION: special_outputs from step_plan: {special_outputs}")
            logger.debug(f"🔍 MATERIALIZATION: special_outputs is empty? {not special_outputs}")
            if special_outputs:
                logger.info(f"🔬 MATERIALIZATION: Starting materialization for {len(special_outputs)} special outputs")
                self._materialize_special_outputs(filemanager, step_plan, special_outputs)
                logger.info("🔬 MATERIALIZATION: Completed materialization")
            else:
                logger.debug("🔍 MATERIALIZATION: No special outputs to materialize")

        except Exception as e:
            import traceback
            full_traceback = traceback.format_exc()
            logger.error(f"Error in FunctionStep {step_index} ({step_name}): {e}", exc_info=True)
            logger.error(f"Full traceback for FunctionStep {step_index} ({step_name}):\n{full_traceback}")
            raise
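
    # For reference, the step_plan keys consumed by process() above (taken from the code,
    # not an exhaustive schema): step_name, axis_id, input_dir, output_dir,
    # variable_components, group_by, func, special_inputs, special_outputs, read_backend,
    # write_backend, input_memory_type, output_memory_type, zarr_config, gpu_id (GPU steps
    # only), funcplan, output_plate_root, sub_dir, plus optional input_conversion_dir,
    # input_conversion_backend, materialized_output_dir, materialized_backend,
    # materialized_plate_root, materialized_sub_dir, and any StreamingConfig entries.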

    def _extract_component_metadata(self, context: 'ProcessingContext', component: 'VariableComponents') -> Optional[Dict[str, str]]:
        """
        Extract component metadata from the context cache safely.

        Args:
            context: ProcessingContext containing metadata_cache
            component: VariableComponents enum specifying which component to extract

        Returns:
            Dictionary mapping component keys to display names, or None if not available
        """
        try:
            if hasattr(context, 'metadata_cache') and context.metadata_cache:
                return context.metadata_cache.get(component, None)
            else:
                logger.debug(f"No metadata_cache available in context for {component.value}")
                return None
        except Exception as e:
            logger.debug(f"Error extracting {component.value} metadata from cache: {e}")
            return None

    def _create_openhcs_metadata_for_materialization(
        self,
        context: 'ProcessingContext',
        output_dir: str,
        write_backend: str
    ) -> None:
        """
        Create an OpenHCS metadata file for materialization writes.

        Args:
            context: ProcessingContext containing microscope_handler and other state
            output_dir: Output directory path where metadata should be written
            write_backend: Backend being used for the write (disk/zarr)
        """
        # Memory writes are not materialization and don't need metadata
        if write_backend == Backend.MEMORY.value:
            logger.debug("Skipping metadata creation (memory write)")
            return

        logger.debug(f"Creating metadata for materialization write: {write_backend} -> {output_dir}")

        try:
            # Extract required information
            step_output_dir = Path(output_dir)

            # Check that we have a microscope handler for metadata extraction
            if not context.microscope_handler:
                logger.debug("No microscope_handler in context - skipping OpenHCS metadata creation")
                return

            # Get source microscope information
            source_parser_name = context.microscope_handler.parser.__class__.__name__

            # Extract metadata from the source microscope handler
            try:
                grid_dimensions = context.microscope_handler.metadata_handler.get_grid_dimensions(context.input_dir)
                pixel_size = context.microscope_handler.metadata_handler.get_pixel_size(context.input_dir)
            except Exception as e:
                logger.debug(f"Could not extract grid_dimensions/pixel_size from source: {e}")
                grid_dimensions = [1, 1]  # Default fallback
                pixel_size = 1.0  # Default fallback

            # Get the list of image files in the output directory
            try:
                image_files = []
                if context.filemanager.exists(str(step_output_dir), write_backend):
                    # List files in the output directory
                    files = context.filemanager.list_files(str(step_output_dir), write_backend)
                    # Filter for image files (common extensions) and convert to strings
                    image_extensions = {'.tif', '.tiff', '.png', '.jpg', '.jpeg'}
                    image_files = [str(f) for f in files if Path(f).suffix.lower() in image_extensions]
                    logger.debug(f"Found {len(image_files)} image files in {step_output_dir}")
            except Exception as e:
                logger.debug(f"Could not list image files in output directory: {e}")
                image_files = []

            # Detect available backends based on the actual output files
            available_backends = self._detect_available_backends(step_output_dir)

            # Create the metadata structure
            metadata = {
                "microscope_handler_name": context.microscope_handler.microscope_type,
                "source_filename_parser_name": source_parser_name,
                "grid_dimensions": list(grid_dimensions) if hasattr(grid_dimensions, '__iter__') else [1, 1],
                "pixel_size": float(pixel_size) if pixel_size is not None else 1.0,
                "image_files": image_files,
                "channels": self._extract_component_metadata(context, VariableComponents.CHANNEL),
                "wells": self._extract_component_metadata(context, VariableComponents.WELL),
                "sites": self._extract_component_metadata(context, VariableComponents.SITE),
                "z_indexes": self._extract_component_metadata(context, VariableComponents.Z_INDEX),
                "available_backends": available_backends
            }

            # Save the metadata file using the disk backend (JSON files always live on disk)
            from openhcs.microscopes.openhcs import OpenHCSMetadataHandler
            metadata_path = step_output_dir / OpenHCSMetadataHandler.METADATA_FILENAME

            # Always ensure we can write to the metadata path (delete if it exists)
            if context.filemanager.exists(str(metadata_path), Backend.DISK.value):
                context.filemanager.delete(str(metadata_path), Backend.DISK.value)

            # Ensure the output directory exists on disk
            context.filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)

            # Create JSON content - the OpenHCS handler expects JSON format
            json_content = json.dumps(metadata, indent=2)
            context.filemanager.save(json_content, str(metadata_path), Backend.DISK.value)
            logger.debug(f"Created OpenHCS metadata file (disk): {metadata_path}")

        except Exception as e:
            # Graceful degradation - log the error but don't fail the step
            logger.warning(f"Failed to create OpenHCS metadata file: {e}")
            logger.debug("OpenHCS metadata creation error details:", exc_info=True)

    def _detect_available_backends(self, output_dir: Path) -> Dict[str, bool]:
        """Detect which storage backends are actually available based on output files."""
        backends = {Backend.ZARR.value: False, Backend.DISK.value: False}

        # Check for zarr stores
        if list(output_dir.glob("*.zarr")):
            backends[Backend.ZARR.value] = True

        # Check for image files
        for ext in DEFAULT_IMAGE_EXTENSIONS:
            if list(output_dir.glob(f"*{ext}")):
                backends[Backend.DISK.value] = True
                break

        logger.debug(f"Backend detection result: {backends}")
        return backends
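
    # Illustrative result (assuming the Backend enum's string values are "zarr" and "disk"):
    # a directory containing only TIFFs yields {"zarr": False, "disk": True}.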

    def _materialize_special_outputs(self, filemanager, step_plan, special_outputs):
        """Load special data from memory and call the configured materialization functions."""
        logger.debug(f"🔍 MATERIALIZE_METHOD: Processing {len(special_outputs)} special outputs")

        for output_key, output_info in special_outputs.items():
            logger.debug(f"🔍 MATERIALIZE_METHOD: Processing output_key: {output_key}")
            logger.debug(f"🔍 MATERIALIZE_METHOD: output_info: {output_info}")

            mat_func = output_info.get('materialization_function')
            logger.debug(f"🔍 MATERIALIZE_METHOD: materialization_function: {mat_func}")

            if mat_func:
                path = output_info['path']
                logger.info(f"🔬 MATERIALIZING: {output_key} from {path}")

                try:
                    filemanager.ensure_directory(Path(path).parent, Backend.MEMORY.value)
                    special_data = filemanager.load(path, Backend.MEMORY.value)
                    logger.debug(f"🔍 MATERIALIZE_METHOD: Loaded special data type: {type(special_data)}")

                    result_path = mat_func(special_data, path, filemanager)
                    logger.info(f"🔬 MATERIALIZED: {output_key} → {result_path}")

                except Exception as e:
                    logger.error(f"🔬 MATERIALIZATION ERROR: Failed to materialize {output_key}: {e}")
                    raise
            else:
                logger.warning(f"🔬 MATERIALIZATION: No materialization function for {output_key}, skipping")