Coverage for openhcs/core/steps/function_step.py: 74.1%
623 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2FunctionStep implementation for pattern-based processing.
4This module contains the FunctionStep class. During execution, FunctionStep instances
5are stateless regarding their configuration. All operational parameters, including
6the function(s) to execute, special input/output keys, their VFS paths, and memory types,
7are retrieved from this step's entry in `context.step_plans`.
8"""
10import logging
11import os
12import time
13from pathlib import Path
14from typing import Any, Callable, Dict, List, Optional, Tuple, Union, OrderedDict as TypingOrderedDict, TYPE_CHECKING
16if TYPE_CHECKING:  # 16 ↛ 17: condition was never true
17 pass
20from openhcs.constants.constants import (DEFAULT_IMAGE_EXTENSIONS,
21 Backend,
22 VariableComponents)
23from openhcs.core.context.processing_context import ProcessingContext
24from openhcs.core.steps.abstract import AbstractStep
25from openhcs.formats.func_arg_prep import prepare_patterns_and_functions
26from openhcs.core.memory.stack_utils import stack_slices, unstack_slices
27# OpenHCS imports moved to local imports to avoid circular dependencies
30logger = logging.getLogger(__name__)
32def _generate_materialized_paths(memory_paths: List[str], step_output_dir: Path, materialized_output_dir: Path) -> List[str]:
33 """Generate materialized file paths by replacing step output directory."""
34 materialized_paths = []
35 for memory_path in memory_paths:
36 relative_path = Path(memory_path).relative_to(step_output_dir)
37 materialized_path = materialized_output_dir / relative_path
38 materialized_paths.append(str(materialized_path))
39 return materialized_paths
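# A worked sketch of the rewrite above, using hypothetical paths: each memory
# path is re-rooted from the step output directory into the materialized
# directory, preserving the relative part.
#
#     _generate_materialized_paths(
#         ["/plate/step1_out/A01_s1_c1.tif"],
#         Path("/plate/step1_out"),
#         Path("/plate/materialized/step1_out"),
#     )
#     # -> ["/plate/materialized/step1_out/A01_s1_c1.tif"]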
42def _filter_special_outputs_for_function(
43 outputs_to_save: List[str],
44 special_outputs_map: Dict,
45 dict_key: str
46) -> Dict:
47 """Filter and build channel-specific paths for special outputs.
49 Args:
50 outputs_to_save: List of output keys this function should save
51 special_outputs_map: Map of all special outputs for the step
52 dict_key: Dict pattern key (e.g., "1" for channel 1, or "default")
54 Returns:
55 Filtered map with channel-specific paths for dict patterns
56 """
57 from openhcs.core.pipeline.path_planner import PipelinePathPlanner
59 result = {}
60 for key in outputs_to_save:
61 if key in special_outputs_map:  # 61 ↛ 60: condition was always true
62 output_config = special_outputs_map[key].copy()
64 # For dict patterns, build channel-specific path
65 if dict_key != "default":
66 output_config['path'] = PipelinePathPlanner.build_dict_pattern_path(
67 output_config['path'], dict_key
68 )
70 result[key] = output_config
72 return result
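# A hedged sketch of the filtering contract, with hypothetical keys and paths:
# only keys listed in outputs_to_save survive, and for a dict-pattern key such
# as "1" each surviving path is rewritten through
# PipelinePathPlanner.build_dict_pattern_path.
#
#     special_outputs_map = {
#         "cell_counts": {"path": "/vfs/step2/cell_counts.pkl"},
#         "traces": {"path": "/vfs/step2/traces.pkl"},
#     }
#     _filter_special_outputs_for_function(["cell_counts"], special_outputs_map, "1")
#     # -> {"cell_counts": {"path": <channel-1 variant of the original path>}}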
75def _save_materialized_data(filemanager, memory_data: List, materialized_paths: List[str],
76 materialized_backend: str, step_plan: Dict, context, axis_id: str) -> None:
77 """Save data to materialized location using appropriate backend."""
79 # Build kwargs with parser metadata (all backends receive it)
80 save_kwargs = {
81 'parser_name': context.microscope_handler.parser.__class__.__name__,
82 'microscope_type': context.microscope_handler.microscope_type
83 }
85 if materialized_backend == Backend.ZARR.value:
86 n_channels, n_z, n_fields = _calculate_zarr_dimensions(materialized_paths, context.microscope_handler)
87 row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
88 save_kwargs.update({
89 'chunk_name': axis_id,
90 'zarr_config': step_plan.get("zarr_config"),
91 'n_channels': n_channels,
92 'n_z': n_z,
93 'n_fields': n_fields,
94 'row': row,
95 'col': col
96 })
98 filemanager.save_batch(memory_data, materialized_paths, materialized_backend, **save_kwargs)
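# Illustrative call with hypothetical arguments: for a zarr backend the chunking
# kwargs computed above (dimensions, row/col, chunk_name) are included; for any
# other backend only the parser metadata kwargs are passed through.
#
#     _save_materialized_data(filemanager, memory_data, materialized_paths,
#                             Backend.ZARR.value, step_plan, context, "A01")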
103def get_all_image_paths(input_dir, backend, axis_id, filemanager, microscope_handler):
104 """
105 Get all image file paths for a specific well from a directory.
107 Args:
108 input_dir: Directory to search for images
109 backend: Backend to use for file listing
110 axis_id: Well identifier to filter files
111 filemanager: FileManager instance
112 microscope_handler: Microscope handler with parser for filename parsing
114 Returns:
115 List of full file paths for the well
116 """
117 # List all image files in directory
118 all_image_files = filemanager.list_image_files(str(input_dir), backend)
120 # Filter by well using parser (FIXED: was using naive string matching)
121 axis_files = []
122 parser = microscope_handler.parser
124 for f in all_image_files:
125 filename = os.path.basename(str(f))
126 metadata = parser.parse_filename(filename)
127 # Use dynamic multiprocessing axis instead of hardcoded 'well'
128 from openhcs.constants import MULTIPROCESSING_AXIS
129 axis_key = MULTIPROCESSING_AXIS.value
130 if metadata and metadata.get(axis_key) == axis_id:
131 axis_files.append(str(f))
133 # Remove duplicates and sort
134 sorted_files = sorted(list(set(axis_files)))
136 # Prepare full file paths
137 input_dir_path = Path(input_dir)
138 full_file_paths = [str(input_dir_path / Path(f).name) for f in sorted_files]
140 logger.debug(f"Found {len(all_image_files)} total files, {len(full_file_paths)} for axis {axis_id}")
142 return full_file_paths
145def create_image_path_getter(axis_id, filemanager, microscope_handler):
146 """
147 Create a specialized image path getter function using runtime context.
149 Args:
150 axis_id: Well identifier
151 filemanager: FileManager instance
152 microscope_handler: Microscope handler with parser for filename parsing
154 Returns:
155 Function that takes (input_dir, backend) and returns image paths for the well
156 """
157 def get_paths_for_axis(input_dir, backend):
158 return get_all_image_paths(
159 input_dir=input_dir,
160 axis_id=axis_id,
161 backend=backend,
162 filemanager=filemanager,
163 microscope_handler=microscope_handler
164 )
165 return get_paths_for_axis
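# Usage sketch with a hypothetical well ID and directory: the returned closure
# fixes the well and the runtime collaborators, so call sites only supply a
# directory and a backend.
#
#     get_paths = create_image_path_getter("A01", filemanager, microscope_handler)
#     paths = get_paths("/plate/step1_out", Backend.MEMORY.value)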
167# Environment variable to disable universal GPU defragmentation
168DISABLE_GPU_DEFRAG = os.getenv('OPENHCS_DISABLE_GPU_DEFRAG', 'false').lower() == 'true'
170def _bulk_preload_step_images(
171 step_input_dir: Path,
172 step_output_dir: Path,
173 axis_id: str,
174 read_backend: str,
175 patterns_by_well: Dict[str, Any],
176 filemanager: 'FileManager',
177 microscope_handler: 'MicroscopeHandler',
178 zarr_config: Optional[Dict[str, Any]] = None
179) -> None:
180 """
181 Pre-load all images for this step from source backend into memory backend.
183 This reduces I/O overhead by doing a single bulk read operation
184 instead of loading images per pattern group.
186 Note: External conditional logic ensures this is only called for non-memory backends.
187 """
188 import time
189 start_time = time.time()
191 logger.debug(f"🔄 BULK PRELOAD: Loading images from {read_backend} to memory for well {axis_id}")
195 # Create specialized path getter for this well
196 get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
198 # Get all image paths for this well
199 full_file_paths = get_paths_for_axis(step_input_dir, read_backend)
201 if not full_file_paths:  # 201 ↛ 202: condition was never true
202 raise RuntimeError(f"🔄 BULK PRELOAD: No files found for well {axis_id} in {step_input_dir} with backend {read_backend}")
204 # Load from source backend with conditional zarr_config
205 if read_backend == Backend.ZARR.value:
206 raw_images = filemanager.load_batch(full_file_paths, read_backend, zarr_config=zarr_config)
207 else:
208 raw_images = filemanager.load_batch(full_file_paths, read_backend)
210 # Ensure directory exists in memory backend before saving
211 filemanager.ensure_directory(str(step_input_dir), Backend.MEMORY.value)
213 # Save to memory backend using OUTPUT paths
214 # memory_paths = [str(step_output_dir / Path(fp).name) for fp in full_file_paths]
215 for file_path in full_file_paths:
216 if filemanager.exists(file_path, Backend.MEMORY.value):
217 filemanager.delete(file_path, Backend.MEMORY.value)
218 logger.debug(f"🔄 BULK PRELOAD: Deleted existing file {file_path} before bulk preload")
220 filemanager.save_batch(raw_images, full_file_paths, Backend.MEMORY.value)
221 logger.debug(f"🔄 BULK PRELOAD: Saving {file_path} to memory")
223 # Clean up source references - keep only memory backend references
224 del raw_images
226 load_time = time.time() - start_time
227 logger.debug(f"🔄 BULK PRELOAD: Completed in {load_time:.2f}s - {len(full_file_paths)} images now in memory")
229def _bulk_writeout_step_images(
230 step_output_dir: Path,
231 write_backend: str,
232 axis_id: str,
233 zarr_config: Optional[Dict[str, Any]],
234 filemanager: 'FileManager',
235 microscope_handler: Optional[Any] = None
236) -> None:
237 """
238 Write all processed images from memory to final backend (disk/zarr).
240 This reduces I/O overhead by doing a single bulk write operation
241 instead of writing images per pattern group.
243 Note: External conditional logic ensures this is only called for non-memory backends.
244 """
245 import time
246 start_time = time.time()
248 logger.debug(f"🔄 BULK WRITEOUT: Writing images from memory to {write_backend} for well {axis_id}")
250 # Create specialized path getter and get memory paths for this well
251 get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
252 memory_file_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
254 if not memory_file_paths:
255 raise RuntimeError(f"🔄 BULK WRITEOUT: No image files found for well {axis_id} in memory directory {step_output_dir}")
257 # Convert relative memory paths back to absolute paths for target backend
258 # Memory backend stores relative paths, but target backend needs absolute paths
259# file_paths =
260# for memory_path in memory_file_paths:
261# # Get just the filename and construct proper target path
262# filename = Path(memory_path).name
263# target_path = step_output_dir / filename
264# file_paths.append(str(target_path))
266 file_paths = memory_file_paths
267 logger.debug(f"🔄 BULK WRITEOUT: Found {len(file_paths)} image files in memory to write")
269 # Load all data from memory backend
270 memory_data = filemanager.load_batch(file_paths, Backend.MEMORY.value)
272 # Ensure output directory exists before bulk write
273 filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)
275 # Bulk write to target backend with conditional zarr_config
276 if write_backend == Backend.ZARR.value:
277 # Calculate zarr dimensions from file paths
278 if microscope_handler is not None:
279 n_channels, n_z, n_fields = _calculate_zarr_dimensions(file_paths, microscope_handler)
280 # Parse well to get row and column for zarr structure
281 row, col = microscope_handler.parser.extract_component_coordinates(axis_id)
282 filemanager.save_batch(memory_data, file_paths, write_backend,
283 chunk_name=axis_id, zarr_config=zarr_config,
284 n_channels=n_channels, n_z=n_z, n_fields=n_fields,
285 row=row, col=col)
286 else:
287 # Fallback without dimensions if microscope_handler not available
288 filemanager.save_batch(memory_data, file_paths, write_backend, chunk_name=axis_id, zarr_config=zarr_config)
289 else:
290 filemanager.save_batch(memory_data, file_paths, write_backend)
292 write_time = time.time() - start_time
293 logger.debug(f"🔄 BULK WRITEOUT: Completed in {write_time:.2f}s - {len(memory_data)} images written to {write_backend}")
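# Taken together, the two bulk helpers bracket a step: preload copies source
# images into the memory backend, pattern groups are then processed purely
# against Backend.MEMORY, and writeout flushes the results once at the end.
# A hypothetical call order for one well:
#
#     _bulk_preload_step_images(in_dir, out_dir, "A01", Backend.DISK.value,
#                               patterns, filemanager, handler)
#     # ...process pattern groups in memory...
#     _bulk_writeout_step_images(out_dir, Backend.ZARR.value, "A01",
#                                zarr_config, filemanager, handler)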
295def _calculate_zarr_dimensions(file_paths: List[Union[str, Path]], microscope_handler) -> tuple[int, int, int]:
296 """
297 Calculate zarr dimensions (n_channels, n_z, n_fields) from file paths using microscope parser.
299 Args:
300 file_paths: List of file paths to analyze
301 microscope_handler: Microscope handler with filename parser
303 Returns:
304 Tuple of (n_channels, n_z, n_fields)
305 """
306 parsed_files = []
307 for file_path in file_paths:
308 filename = Path(file_path).name
309 metadata = microscope_handler.parser.parse_filename(filename)
310 parsed_files.append(metadata)
312 # Count unique values for each dimension from actual files
313 n_channels = len(set(f.get('channel') for f in parsed_files if f.get('channel') is not None))
314 n_z = len(set(f.get('z_index') for f in parsed_files if f.get('z_index') is not None))
315 n_fields = len(set(f.get('site') for f in parsed_files if f.get('site') is not None))
317 # Ensure at least 1 for each dimension (handle cases where metadata is missing)
318 n_channels = max(1, n_channels)
319 n_z = max(1, n_z)
320 n_fields = max(1, n_fields)
322 return n_channels, n_z, n_fields
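# Worked example with hypothetical filenames: files whose parsed metadata cover
# channels {1, 2}, z-indexes {1, 2, 3} and sites {1, 2, 3, 4} yield
# (n_channels, n_z, n_fields) == (2, 3, 4); a dimension with no parsed metadata
# is clamped to 1.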
326def _is_3d(array: Any) -> bool:
327 """Check if an array is 3D."""
328 return hasattr(array, 'ndim') and array.ndim == 3
330def _execute_function_core(
331 func_callable: Callable,
332 main_data_arg: Any,
333 base_kwargs: Dict[str, Any],
334 context: 'ProcessingContext',
335 special_inputs_plan: Dict[str, str], # {'arg_name_for_func': 'special_path_value'}
336 special_outputs_plan: TypingOrderedDict[str, str], # {'output_key': 'special_path_value'}, order matters
337 axis_id: str, # Add axis_id parameter
338 input_memory_type: str,
339 device_id: int
340) -> Any: # Returns the main processed data stack
341 """
342 Executes a single callable, handling its special I/O.
343 - Loads special inputs from VFS paths in `special_inputs_plan`.
344 - Calls `func_callable(main_data_arg, **all_kwargs)`.
345 - If `special_outputs_plan` is non-empty, expects func to return (main_out, sp_val1, sp_val2,...).
346 - Saves special outputs positionally to VFS paths in `special_outputs_plan`.
347 - Returns the main processed data stack.
348 """
349 final_kwargs = base_kwargs.copy()
351 if special_inputs_plan:
352 logger.info(f"�� SPECIAL_INPUTS_DEBUG : special_inputs_plan = {special_inputs_plan}")
353 for arg_name, path_info in special_inputs_plan.items():
354 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Processing arg_name='{arg_name}', path_info={path_info} (type: {type(path_info)})")
357 # Extract path string from the path info dictionary
358 # Current format: {"path": "/path/to/file.pkl", "source_step_id": "step_123"}
359 if isinstance(path_info, dict) and 'path' in path_info:  # 359 ↛ 363: condition was always true
360 special_path_value = path_info['path']
361 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Extracted path from dict: '{special_path_value}' (type: {type(special_path_value)})")
362 else:
363 special_path_value = path_info # Fallback if it's already a string
364 logger.info(f"🔍 SPECIAL_INPUTS_DEBUG: Using path_info directly: '{special_path_value}' (type: {type(special_path_value)})")
366 logger.info(f"Loading special input '{arg_name}' from path '{special_path_value}' (memory backend)")
367 try:
368 final_kwargs[arg_name] = context.filemanager.load(special_path_value, Backend.MEMORY.value)
369 except Exception as e:
370 logger.error(f"Failed to load special input '{arg_name}' from '{special_path_value}': {e}", exc_info=True)
371 raise
373 # Auto-inject context if function signature expects it
374 import inspect
375 sig = inspect.signature(func_callable)
376 if 'context' in sig.parameters:  # 376 ↛ 377: condition was never true
377 final_kwargs['context'] = context
379 # 🔍 DEBUG: Log input dimensions
380 input_shape = getattr(main_data_arg, 'shape', 'no shape attr')
381 input_type = type(main_data_arg).__name__
382 logger.debug(f"🔍 FUNCTION INPUT: {func_callable.__name__} - shape: {input_shape}, type: {input_type}")
384 # ⚡ INFO: Terse function execution log for user feedback
385 logger.info(f"⚡ Executing: {func_callable.__name__}")
387 # 🔍 DEBUG: Log function attributes before execution
388 logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - special_outputs: {getattr(func_callable, '__special_outputs__', 'None')}")
389 logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - input_memory_type: {getattr(func_callable, 'input_memory_type', 'None')}")
390 logger.debug(f"🔍 FUNCTION ATTRS: {func_callable.__name__} - output_memory_type: {getattr(func_callable, 'output_memory_type', 'None')}")
392 raw_function_output = func_callable(main_data_arg, **final_kwargs)
394 # 🔍 DEBUG: Log output dimensions and type details
395 output_shape = getattr(raw_function_output, 'shape', 'no shape attr')
396 output_type = type(raw_function_output).__name__
397 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - shape: {output_shape}, type: {output_type}")
399 # 🔍 DEBUG: If it's a tuple, log details about each element
400 if isinstance(raw_function_output, tuple):
401 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - tuple length: {len(raw_function_output)}")
402 for i, element in enumerate(raw_function_output):
403 elem_shape = getattr(element, 'shape', 'no shape attr')
404 elem_type = type(element).__name__
405 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - element[{i}]: shape={elem_shape}, type={elem_type}")
406 else:
407 logger.debug(f"🔍 FUNCTION OUTPUT: {func_callable.__name__} - not a tuple, single return value")
409 main_output_data = raw_function_output
411 # 🔍 DEBUG: Log special output plan status
412 logger.debug(f"🔍 SPECIAL OUTPUT PLAN: {special_outputs_plan}")
413 logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Is empty? {not special_outputs_plan}")
414 logger.debug(f"🔍 SPECIAL OUTPUT PLAN: Length: {len(special_outputs_plan) if special_outputs_plan else 0}")
416 # Only log special outputs if there are any (avoid spamming empty dict logs)
417 if special_outputs_plan:
418 logger.debug(f"🔍 SPECIAL OUTPUT: {special_outputs_plan}")
420 num_special_outputs = len(special_outputs_plan)
421 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Expected {num_special_outputs} special outputs")
422 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned type: {type(raw_function_output)}")
423 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Function returned tuple length: {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'not tuple'}")
425 if not isinstance(raw_function_output, tuple) or len(raw_function_output) != (1 + num_special_outputs):  # 425 ↛ 426: condition was never true
426 logger.error(f"🔍 SPECIAL OUTPUT ERROR: Function '{getattr(func_callable, '__name__', 'unknown')}' special output mismatch")
427 logger.error(f"🔍 SPECIAL OUTPUT ERROR: Expected tuple of {1 + num_special_outputs} values")
428 logger.error(f"🔍 SPECIAL OUTPUT ERROR: Got {type(raw_function_output)} with {len(raw_function_output) if isinstance(raw_function_output, tuple) else 'N/A'} values")
429 raise ValueError(
430 f"Function '{getattr(func_callable, '__name__', 'unknown')}' was expected to return a tuple of "
431 f"{1 + num_special_outputs} values (main_output + {num_special_outputs} special) "
432 f"based on 'special_outputs' in step plan, but returned {len(raw_function_output) if isinstance(raw_function_output, tuple) else type(raw_function_output)} values."
433 )
434 main_output_data = raw_function_output[0]
435 returned_special_values_tuple = raw_function_output[1:]
437 # 🔍 DEBUG: Log what we extracted
438 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data type: {type(main_output_data)}")
439 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted main_output_data shape: {getattr(main_output_data, 'shape', 'no shape')}")
440 logger.debug(f"🔍 SPECIAL OUTPUT PROCESSING: Extracted {len(returned_special_values_tuple)} special values")
442 # Iterate through special_outputs_plan (which must be ordered by compiler)
443 # and match with positionally returned special values.
444 for i, (output_key, vfs_path_info) in enumerate(special_outputs_plan.items()):
445 logger.info(f"Saving special output '{output_key}' to VFS path '{vfs_path_info}' (memory backend)")
446 if i < len(returned_special_values_tuple):  # 446 ↛ 469: condition was always true
447 value_to_save = returned_special_values_tuple[i]
448 # Extract path string from the path info dictionary
449 # Current format: {"path": "/path/to/file.pkl"}
450 if isinstance(vfs_path_info, dict) and 'path' in vfs_path_info:  # 450 ↛ 453: condition was always true
451 vfs_path = vfs_path_info['path']
452 else:
453 vfs_path = vfs_path_info # Fallback if it's already a string
454 # # Add axis_id prefix to filename for memory backend to avoid thread collisions
455 # from pathlib import Path
456 # vfs_path_obj = Path(vfs_path)
457 # prefixed_filename = f"{axis_id}_{vfs_path_obj.name}"
458 # prefixed_vfs_path = str(vfs_path_obj.parent / prefixed_filename)
460 logger.info(f"🔍 SPECIAL_SAVE: Saving '{output_key}' to '{vfs_path}' (memory backend)")
461 # Ensure directory exists for memory backend
462 parent_dir = str(Path(vfs_path).parent)
463 context.filemanager.ensure_directory(parent_dir, Backend.MEMORY.value)
464 context.filemanager.save(value_to_save, vfs_path, Backend.MEMORY.value)
465 logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory")
466 logger.info(f"🔍 SPECIAL_SAVE: Successfully saved '{output_key}' to memory")
467 else:
468 # This indicates a mismatch that should ideally be caught by schema/validation
469 logger.error(f"Mismatch: {num_special_outputs} special outputs planned, but fewer values returned by function for key '{output_key}'.")
470 # Or, if partial returns are allowed, this might be a warning. For now, error.
471 raise ValueError(f"Function did not return enough values for all planned special outputs. Missing value for '{output_key}'.")
473 return main_output_data
475def _execute_chain_core(
476 initial_data_stack: Any,
477 func_chain: List[Union[Callable, Tuple[Callable, Dict]]],
478 context: 'ProcessingContext',
479 step_special_inputs_plan: Dict[str, str],
480 step_special_outputs_plan: TypingOrderedDict[str, str],
481 axis_id: str, # Add axis_id parameter
482 device_id: int,
483 input_memory_type: str,
484 step_index: int, # Add step_index for funcplan lookup
485 dict_key: str = "default" # Add dict_key for funcplan lookup
486) -> Any:
487 current_stack = initial_data_stack
488 current_memory_type = input_memory_type # Track memory type from frozen context
490 for i, func_item in enumerate(func_chain):
491 actual_callable: Callable
492 base_kwargs_for_item: Dict[str, Any] = {}
493 is_last_in_chain = (i == len(func_chain) - 1)
495 if isinstance(func_item, tuple) and len(func_item) == 2 and callable(func_item[0]):  # 495 ↛ 497: condition was always true
496 actual_callable, base_kwargs_for_item = func_item
497 elif callable(func_item):
498 actual_callable = func_item
499 else:
500 raise TypeError(f"Invalid item in function chain: {func_item}.")
502 # Convert to function's input memory type (noop if same)
503 from openhcs.core.memory.converters import convert_memory
504 current_stack = convert_memory(
505 data=current_stack,
506 source_type=current_memory_type,
507 target_type=actual_callable.input_memory_type,
508 gpu_id=device_id
509 )
511 # Use funcplan to determine which outputs this function should save
512 funcplan = context.step_plans[step_index].get("funcplan", {})
513 func_name = getattr(actual_callable, '__name__', 'unknown')
515 # Construct execution key: function_name_dict_key_chain_position
516 execution_key = f"{func_name}_{dict_key}_{i}"
518 logger.info(f"🔍 FUNCPLAN DEBUG: execution_key = {execution_key}")
519 logger.info(f"🔍 FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
520 logger.info(f"🔍 FUNCPLAN DEBUG: step_special_outputs_plan = {step_special_outputs_plan}")
522 if execution_key in funcplan:  # 522 ↛ 523: condition was never true
523 outputs_to_save = funcplan[execution_key]
524 outputs_plan_for_this_call = _filter_special_outputs_for_function(
525 outputs_to_save, step_special_outputs_plan, dict_key
526 )
527 logger.info(f"🔍 FUNCPLAN: {execution_key} -> {outputs_to_save}")
528 logger.info(f"🔍 FUNCPLAN: outputs_plan_for_this_call = {outputs_plan_for_this_call}")
529 else:
530 # Fallback: no funcplan entry, save nothing
531 outputs_plan_for_this_call = {}
532 logger.info(f"🔍 FUNCPLAN: No entry for {execution_key}, saving nothing")
534 current_stack = _execute_function_core(
535 func_callable=actual_callable,
536 main_data_arg=current_stack,
537 base_kwargs=base_kwargs_for_item,
538 context=context,
539 special_inputs_plan=step_special_inputs_plan,
540 special_outputs_plan=outputs_plan_for_this_call,
541 axis_id=axis_id,
542 device_id=device_id,
543 input_memory_type=input_memory_type,
544 )
546 # Update current memory type from frozen context
547 current_memory_type = actual_callable.output_memory_type
549 return current_stack
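# Execution-key sketch: the funcplan lookup above probes keys of the form
# f"{func_name}_{dict_key}_{chain_position}", so a hypothetical chain
# [blur, count_cells] run under dict key "1" checks "blur_1_0" and
# "count_cells_1_1".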
551def _process_single_pattern_group(
552 context: 'ProcessingContext',
553 pattern_group_info: Any,
554 executable_func_or_chain: Any,
555 base_func_args: Dict[str, Any],
556 step_input_dir: Path,
557 step_output_dir: Path,
558 axis_id: str,
559 component_value: str,
560 read_backend: str,
561 write_backend: str,
562 input_memory_type_from_plan: str, # Explicitly from plan
563 output_memory_type_from_plan: str, # Explicitly from plan
564 device_id: Optional[int],
565 same_directory: bool,
566 special_inputs_map: Dict[str, str],
567 special_outputs_map: TypingOrderedDict[str, str],
568 zarr_config: Optional[Dict[str, Any]],
569 variable_components: Optional[List[str]] = None,
570 step_index: Optional[int] = None # Add step_index for funcplan lookup
571) -> None:
572 start_time = time.time()
573 pattern_repr = str(pattern_group_info)[:100]
574 logger.debug(f"🔥 PATTERN: Processing {pattern_repr} for well {axis_id}")
576 try:
577 if not context.microscope_handler:  # 577 ↛ 578: condition was never true
578 raise RuntimeError("MicroscopeHandler not available in context.")
580 matching_files = context.microscope_handler.path_list_from_pattern(
581 str(step_input_dir), pattern_group_info, context.filemanager, Backend.MEMORY.value,
582 [vc.value for vc in variable_components] if variable_components else None
583 )
585 if not matching_files:  # 585 ↛ 586: condition was never true
586 raise ValueError(
587 f"No matching files found for pattern group {pattern_repr} in {step_input_dir}. "
588 f"This indicates either: (1) no image files exist in the directory, "
589 f"(2) files don't match the pattern, or (3) pattern parsing failed. "
590 f"Check that input files exist and match the expected naming convention."
591 )
593 logger.debug(f"🔥 PATTERN: Found {len(matching_files)} files: {[Path(f).name for f in matching_files]}")
595 # Sort files to ensure consistent ordering (especially important for z-stacks)
596 matching_files.sort()
597 logger.debug(f"🔥 PATTERN: Sorted files: {[Path(f).name for f in matching_files]}")
599 full_file_paths = [str(step_input_dir / f) for f in matching_files]
600 raw_slices = context.filemanager.load_batch(full_file_paths, Backend.MEMORY.value)
602 if not raw_slices:  # 602 ↛ 603: condition was never true
603 raise ValueError(
604 f"No valid images loaded for pattern group {pattern_repr} in {step_input_dir}. "
605 f"Found {len(matching_files)} matching files but failed to load any valid images. "
606 f"This indicates corrupted image files, unsupported formats, or I/O errors. "
607 f"Check file integrity and format compatibility."
608 )
610 # 🔍 DEBUG: Log stacking operation
611 logger.debug(f"🔍 STACKING: {len(raw_slices)} slices → memory_type: {input_memory_type_from_plan}")
612 if raw_slices:  # 612 ↛ 616: condition was always true
613 slice_shapes = [getattr(s, 'shape', 'no shape') for s in raw_slices[:3]] # First 3 shapes
614 logger.debug(f"🔍 STACKING: Sample slice shapes: {slice_shapes}")
616 main_data_stack = stack_slices(
617 slices=raw_slices, memory_type=input_memory_type_from_plan, gpu_id=device_id
618 )
620 # 🔍 DEBUG: Log stacked result
621 stack_shape = getattr(main_data_stack, 'shape', 'no shape')
622 stack_type = type(main_data_stack).__name__
623 logger.debug(f"🔍 STACKED RESULT: shape: {stack_shape}, type: {stack_type}")
625 logger.info(f"🔍 special_outputs_map: {special_outputs_map}")
627 final_base_kwargs = base_func_args.copy()
629 # Get step function from step plan
630 step_func = context.step_plans[step_index]["func"]
632 if isinstance(step_func, dict):
633 dict_key_for_funcplan = component_value # Use actual dict key for dict patterns
634 else:
635 dict_key_for_funcplan = "default" # Use default for list/single patterns
637 if isinstance(executable_func_or_chain, list):
638 processed_stack = _execute_chain_core(
639 main_data_stack, executable_func_or_chain, context,
640 special_inputs_map, special_outputs_map, axis_id,
641 device_id, input_memory_type_from_plan, step_index, dict_key_for_funcplan
642 )
643 elif callable(executable_func_or_chain):  # 643 ↛ 670: condition was always true
644 # For single functions, apply funcplan filtering like in chain execution
645 funcplan = context.step_plans[step_index].get("funcplan", {})
646 func_name = getattr(executable_func_or_chain, '__name__', 'unknown')
647 execution_key = f"{func_name}_{dict_key_for_funcplan}_0" # Position 0 for single functions
649 logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: execution_key = {execution_key}")
650 logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: funcplan keys = {list(funcplan.keys()) if funcplan else 'EMPTY'}")
651 logger.info(f"🔍 SINGLE FUNC FUNCPLAN DEBUG: special_outputs_map = {special_outputs_map}")
653 if execution_key in funcplan:
654 outputs_to_save = funcplan[execution_key]
655 filtered_special_outputs_map = _filter_special_outputs_for_function(
656 outputs_to_save, special_outputs_map, dict_key_for_funcplan
657 )
658 logger.info(f"🔍 SINGLE FUNC FUNCPLAN: {execution_key} -> {outputs_to_save}")
659 logger.info(f"🔍 SINGLE FUNC FUNCPLAN: filtered_special_outputs_map = {filtered_special_outputs_map}")
660 else:
661 # Fallback: no funcplan entry, save nothing
662 filtered_special_outputs_map = {}
663 logger.info(f"🔍 SINGLE FUNC FUNCPLAN: No entry for {execution_key}, saving nothing")
665 processed_stack = _execute_function_core(
666 executable_func_or_chain, main_data_stack, final_base_kwargs, context,
667 special_inputs_map, filtered_special_outputs_map, axis_id, input_memory_type_from_plan, device_id
668 )
669 else:
670 raise TypeError(f"Invalid executable_func_or_chain: {type(executable_func_or_chain)}")
672 # 🔍 DEBUG: Check what shape the function actually returned
673 input_shape = getattr(main_data_stack, 'shape', 'unknown')
674 output_shape = getattr(processed_stack, 'shape', 'unknown')
675 processed_type = type(processed_stack).__name__
676 logger.debug(f"🔍 PROCESSING RESULT: input: {input_shape} → output: {output_shape}, type: {processed_type}")
678 # 🔍 DEBUG: Additional validation logging
679 logger.debug(f"🔍 VALIDATION: processed_stack type: {type(processed_stack)}")
680 logger.debug(f"🔍 VALIDATION: processed_stack has shape attr: {hasattr(processed_stack, 'shape')}")
681 logger.debug(f"🔍 VALIDATION: processed_stack has ndim attr: {hasattr(processed_stack, 'ndim')}")
682 if hasattr(processed_stack, 'ndim'):  # 682 ↛ 684: condition was always true
683 logger.debug(f"🔍 VALIDATION: processed_stack ndim: {processed_stack.ndim}")
684 if hasattr(processed_stack, 'shape'):  # 684 ↛ 687: condition was always true
685 logger.debug(f"🔍 VALIDATION: processed_stack shape: {processed_stack.shape}")
687 if not _is_3d(processed_stack):  # 687 ↛ 688: condition was never true
688 logger.error("🔍 VALIDATION ERROR: processed_stack is not 3D")
689 logger.error(f"🔍 VALIDATION ERROR: Type: {type(processed_stack)}")
690 logger.error(f"🔍 VALIDATION ERROR: Shape: {getattr(processed_stack, 'shape', 'no shape attr')}")
691 logger.error(f"🔍 VALIDATION ERROR: Has ndim: {hasattr(processed_stack, 'ndim')}")
692 if hasattr(processed_stack, 'ndim'):
693 logger.error(f"🔍 VALIDATION ERROR: ndim value: {processed_stack.ndim}")
694 raise ValueError(f"Main processing must result in a 3D array, got {getattr(processed_stack, 'shape', 'unknown')}")
696 # 🔍 DEBUG: Log unstacking operation
697 logger.debug(f"🔍 UNSTACKING: shape: {output_shape} → memory_type: {output_memory_type_from_plan}")
701 output_slices = unstack_slices(
702 array=processed_stack, memory_type=output_memory_type_from_plan, gpu_id=device_id, validate_slices=True
703 )
705 # 🔍 DEBUG: Log unstacked result
706 if output_slices:  # 706 ↛ 712: condition was always true
707 unstacked_shapes = [getattr(s, 'shape', 'no shape') for s in output_slices[:3]] # First 3 shapes
708 logger.debug(f"🔍 UNSTACKED RESULT: {len(output_slices)} slices, sample shapes: {unstacked_shapes}")
710 # Handle cases where function returns fewer images than inputs (e.g., z-stack flattening, channel compositing)
711 # In such cases, we save only the returned images using the first N input filenames
712 num_outputs = len(output_slices)
713 num_inputs = len(matching_files)
715 if num_outputs < num_inputs:
716 logger.debug(f"Function returned {num_outputs} images from {num_inputs} inputs - likely flattening operation")
717 elif num_outputs > num_inputs:  # 717 ↛ 718: condition was never true
718 logger.warning(f"Function returned more images ({num_outputs}) than inputs ({num_inputs}) - unexpected")
720 # Save the output images using batch operations
721 try:
722 # Prepare batch data
723 output_data = []
724 output_paths_batch = []
726 for i, img_slice in enumerate(output_slices):
727 # FAIL FAST: No fallback filenames - if we have more outputs than inputs, something is wrong
728 if i >= len(matching_files):  # 728 ↛ 729: condition was never true
729 raise ValueError(
730 f"Function returned {num_outputs} output slices but only {num_inputs} input files available. "
731 f"Cannot generate filename for output slice {i}. This indicates a bug in the function or "
732 f"unstacking logic - functions should return same or fewer images than inputs."
733 )
735 input_filename = matching_files[i]
736 output_filename = Path(input_filename).name
737 output_path = Path(step_output_dir) / output_filename
739 # Always ensure we can write to the output path (delete if exists)
740 if context.filemanager.exists(str(output_path), Backend.MEMORY.value):
741 context.filemanager.delete(str(output_path), Backend.MEMORY.value)
743 output_data.append(img_slice)
744 output_paths_batch.append(str(output_path))
746 # Ensure directory exists
747 context.filemanager.ensure_directory(str(step_output_dir), Backend.MEMORY.value)
749 # Only pass zarr_config to zarr backend - fail loud for invalid parameters
750 #if write_backend == Backend.ZARR.value:
751 # Batch save
752 # context.filemanager.save_batch(output_data, output_paths_batch, write_backend, zarr_config=zarr_config)
753 # else:
754 context.filemanager.save_batch(output_data, output_paths_batch, Backend.MEMORY.value)
756 except Exception as e:
757 logger.error(f"Error saving batch of output slices for pattern {pattern_repr}: {e}", exc_info=True)
759 # 🔥 CLEANUP: If function returned fewer images than inputs, delete the unused input files
760 # This prevents unused channel files from remaining in memory after compositing
761 if num_outputs < num_inputs:
762 for j in range(num_outputs, num_inputs):
763 unused_input_filename = matching_files[j]
764 unused_input_path = Path(step_input_dir) / unused_input_filename
765 if context.filemanager.exists(str(unused_input_path), Backend.MEMORY.value):  # 765 ↛ 762: condition was always true
766 context.filemanager.delete(str(unused_input_path), Backend.MEMORY.value)
767 logger.debug(f"🔥 CLEANUP: Deleted unused input file: {unused_input_filename}")
771 logger.debug(f"Finished pattern group {pattern_repr} in {(time.time() - start_time):.2f}s.")
772 except Exception as e:
773 import traceback
774 full_traceback = traceback.format_exc()
775 logger.error(f"Error processing pattern group {pattern_repr}: {e}", exc_info=True)
776 logger.error(f"Full traceback for pattern group {pattern_repr}:\n{full_traceback}")
777 raise ValueError(f"Failed to process pattern group {pattern_repr}: {e}") from e
779class FunctionStep(AbstractStep):
781 def __init__(
782 self,
783 func: Union[Callable, Tuple[Callable, Dict], List[Union[Callable, Tuple[Callable, Dict]]]],
784 **kwargs
785 ):
786 # Generate default name from function if not provided
787 if 'name' not in kwargs or kwargs['name'] is None:
788 actual_func_for_name = func
789 if isinstance(func, tuple):  # 789 ↛ 790: condition was never true
790 actual_func_for_name = func[0]
791 elif isinstance(func, list) and func:  # 791 ↛ 792: condition was never true
792 first_item = func[0]
793 if isinstance(first_item, tuple):
794 actual_func_for_name = first_item[0]
795 elif callable(first_item):
796 actual_func_for_name = first_item
797 kwargs['name'] = getattr(actual_func_for_name, '__name__', 'FunctionStep')
799 super().__init__(**kwargs)
800 self.func = func # This is used by prepare_patterns_and_functions at runtime
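    # Construction sketch with hypothetical callables: a bare callable, a
    # (callable, kwargs) tuple, or a list mixing both are all accepted, and the
    # default step name is taken from the first callable's __name__.
    #
    #     step = FunctionStep(func=[(gaussian_blur, {"sigma": 2.0}), max_project])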
802 def process(self, context: 'ProcessingContext', step_index: int) -> None:
803 # Access step plan by index (step_plans keyed by index, not step_id)
804 step_plan = context.step_plans[step_index]
806 # Get step name for logging
807 step_name = step_plan['step_name']
809 try:
810 axis_id = step_plan['axis_id']
811 step_input_dir = Path(step_plan['input_dir'])
812 step_output_dir = Path(step_plan['output_dir'])
813 variable_components = step_plan['variable_components']
814 group_by = step_plan['group_by']
815 func_from_plan = step_plan['func']
817 # special_inputs/outputs are dicts: {'key': 'vfs_path_value'}
818 special_inputs = step_plan['special_inputs']
819 special_outputs = step_plan['special_outputs'] # Should be OrderedDict if order matters
821 read_backend = step_plan['read_backend']
822 write_backend = step_plan['write_backend']
823 input_mem_type = step_plan['input_memory_type']
824 output_mem_type = step_plan['output_memory_type']
825 microscope_handler = context.microscope_handler
826 filemanager = context.filemanager
828 # Create path getter for this well
829 get_paths_for_axis = create_image_path_getter(axis_id, filemanager, microscope_handler)
831 # Store path getter in step_plan for streaming access
832 step_plan["get_paths_for_axis"] = get_paths_for_axis
834 # Get patterns first for bulk preload
835 # Use dynamic filter parameter based on current multiprocessing axis
836 from openhcs.constants import MULTIPROCESSING_AXIS
837 axis_name = MULTIPROCESSING_AXIS.value
838 filter_kwargs = {f"{axis_name}_filter": [axis_id]}
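    # e.g. if the multiprocessing axis value is "well" and axis_id is "A01"
    # (hypothetical values), filter_kwargs becomes {"well_filter": ["A01"]}.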
840 patterns_by_well = microscope_handler.auto_detect_patterns(
841 str(step_input_dir), # folder_path
842 filemanager, # filemanager
843 read_backend, # backend
844 extensions=DEFAULT_IMAGE_EXTENSIONS, # extensions
845 group_by=group_by, # Pass GroupBy enum directly
846 variable_components=[vc.value for vc in variable_components] if variable_components else [], # variable_components for placeholder logic
847 **filter_kwargs # Dynamic filter parameter
848 )
851 # Only access gpu_id if the step requires GPU (has GPU memory types)
852 from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES
853 requires_gpu = (input_mem_type in VALID_GPU_MEMORY_TYPES or
854 output_mem_type in VALID_GPU_MEMORY_TYPES)
856 # Ensure variable_components is never None - use default if missing
857 if variable_components is None:  # 857 ↛ 858: condition was never true
858 variable_components = [VariableComponents.SITE] # Default fallback
859 logger.warning(f"Step {step_index} ({step_name}) had None variable_components, using default [SITE]")
860 if requires_gpu:  # 860 ↛ 861: condition was never true
861 device_id = step_plan['gpu_id']
862 logger.debug(f"🔥 DEBUG: Step {step_index} gpu_id from plan: {device_id}, input_mem: {input_mem_type}, output_mem: {output_mem_type}")
863 else:
864 device_id = None # CPU-only step
865 logger.debug(f"🔥 DEBUG: Step {step_index} is CPU-only, input_mem: {input_mem_type}, output_mem: {output_mem_type}")
867 logger.debug(f"🔥 DEBUG: Step {step_index} read_backend: {read_backend}, write_backend: {write_backend}")
869 if not all([axis_id, step_input_dir, step_output_dir]):  # 869 ↛ 870: condition was never true
870 raise ValueError(f"Plan missing essential keys for step {step_index}")
872 same_dir = str(step_input_dir) == str(step_output_dir)
873 logger.info(f"Step {step_index} ({step_name}) I/O: read='{read_backend}', write='{write_backend}'.")
874 logger.info(f"Step {step_index} ({step_name}) Paths: input_dir='{step_input_dir}', output_dir='{step_output_dir}', same_dir={same_dir}")
876 # 🔄 MATERIALIZATION READ: Bulk preload if not reading from memory
877 if read_backend != Backend.MEMORY.value:
878 _bulk_preload_step_images(step_input_dir, step_output_dir, axis_id, read_backend,
879 patterns_by_well, filemanager, microscope_handler, step_plan["zarr_config"])
881 # 🔄 INPUT CONVERSION: Convert loaded input data to zarr if configured
882 if "input_conversion_dir" in step_plan:
883 input_conversion_dir = step_plan["input_conversion_dir"]
884 input_conversion_backend = step_plan["input_conversion_backend"]
886 logger.info(f"Converting input data to zarr: {input_conversion_dir}")
888 # Get memory paths from input data (already loaded)
889 memory_paths = get_paths_for_axis(step_input_dir, Backend.MEMORY.value)
890 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
892 # Generate conversion paths (input_dir → conversion_dir)
893 conversion_paths = _generate_materialized_paths(memory_paths, Path(step_input_dir), Path(input_conversion_dir))
895 # Parse actual filenames to determine dimensions
896 # Calculate zarr dimensions from conversion paths (which contain the filenames)
897 n_channels, n_z, n_fields = _calculate_zarr_dimensions(conversion_paths, context.microscope_handler)
898 # Parse well to get row and column for zarr structure
899 row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
901 # Save using existing materialized data infrastructure
902 _save_materialized_data(filemanager, memory_data, conversion_paths, input_conversion_backend, step_plan, context, axis_id)
904 logger.info(f"🔬 Converted {len(conversion_paths)} input files to {input_conversion_dir}")
906 # Update metadata after conversion
907 conversion_dir = Path(step_plan["input_conversion_dir"])
908 zarr_subdir = conversion_dir.name if step_plan["input_conversion_uses_virtual_workspace"] else None
909 _update_metadata_for_zarr_conversion(
910 conversion_dir.parent,
911 step_plan["input_conversion_original_subdir"],
912 zarr_subdir,
913 context
914 )
916 logger.info(f"🔥 STEP: Starting processing for '{step_name}' well {axis_id} (group_by={group_by.name if group_by else None}, variable_components={[vc.name for vc in variable_components] if variable_components else []})")
918 if axis_id not in patterns_by_well:  # 918 ↛ 919: condition was never true
919 raise ValueError(
920 f"No patterns detected for well '{axis_id}' in step '{step_name}' (index: {step_index}). "
921 f"This indicates either: (1) no image files found for this well, "
922 f"(2) image files don't match the expected naming pattern, or "
923 f"(3) pattern detection failed. Check input directory: {step_input_dir}"
924 )
926 if isinstance(patterns_by_well[axis_id], dict):  # 926 ↛ 932: condition was always true
927 # Grouped patterns (when group_by is set)
928 for comp_val, pattern_list in patterns_by_well[axis_id].items():
929 logger.debug(f"🔥 STEP: Component '{comp_val}' has {len(pattern_list)} patterns: {pattern_list}")
930 else:
931 # Ungrouped patterns (when group_by is None)
932 logger.debug(f"🔥 STEP: Found {len(patterns_by_well[axis_id])} ungrouped patterns: {patterns_by_well[axis_id]}")
934 if func_from_plan is None:  # 934 ↛ 935: condition was never true
935 raise ValueError(f"Step plan missing 'func' for step: {step_plan.get('step_name', 'Unknown')} (index: {step_index})")
937 grouped_patterns, comp_to_funcs, comp_to_base_args = prepare_patterns_and_functions(
938 patterns_by_well[axis_id], func_from_plan, component=group_by.value if group_by else None
939 )
941 logger.info(f"🔍 DICT_PATTERN: grouped_patterns keys: {list(grouped_patterns.keys())}")
942 logger.info(f"🔍 DICT_PATTERN: comp_to_funcs keys: {list(comp_to_funcs.keys())}")
943 logger.info(f"🔍 DICT_PATTERN: func_from_plan type: {type(func_from_plan)}")
944 if isinstance(func_from_plan, dict):
945 logger.info(f"🔍 DICT_PATTERN: func_from_plan keys: {list(func_from_plan.keys())}")
947 for comp_val, current_pattern_list in grouped_patterns.items():
948 logger.info(f"🔍 DICT_PATTERN: Processing component '{comp_val}' with {len(current_pattern_list)} patterns")
949 exec_func_or_chain = comp_to_funcs[comp_val]
950 base_kwargs = comp_to_base_args[comp_val]
951 logger.info(f"🔍 DICT_PATTERN: Component '{comp_val}' exec_func_or_chain: {exec_func_or_chain}")
952 for pattern_item in current_pattern_list:
953 _process_single_pattern_group(
954 context, pattern_item, exec_func_or_chain, base_kwargs,
955 step_input_dir, step_output_dir, axis_id, comp_val,
956 read_backend, write_backend, input_mem_type, output_mem_type,
957 device_id, same_dir,
958 special_inputs, special_outputs, # Pass the maps from step_plan
959 step_plan["zarr_config"],
960 variable_components, step_index # Pass step_index for funcplan lookup
961 )
962 logger.info(f"🔥 STEP: Completed processing for '{step_name}' well {axis_id}.")
964 # 📄 MATERIALIZATION WRITE: Only if not writing to memory
965 if write_backend != Backend.MEMORY.value:
966 memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
967 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
968 # Calculate zarr dimensions (ignored by non-zarr backends)
969 n_channels, n_z, n_fields = _calculate_zarr_dimensions(memory_paths, context.microscope_handler)
970 row, col = context.microscope_handler.parser.extract_component_coordinates(axis_id)
971 filemanager.ensure_directory(step_output_dir, write_backend)
973 # Build save kwargs with parser metadata for all backends
974 save_kwargs = {
975 'chunk_name': axis_id,
976 'zarr_config': step_plan["zarr_config"],
977 'n_channels': n_channels,
978 'n_z': n_z,
979 'n_fields': n_fields,
980 'row': row,
981 'col': col,
982 'parser_name': context.microscope_handler.parser.__class__.__name__,
983 'microscope_type': context.microscope_handler.microscope_type
984 }
986 filemanager.save_batch(memory_data, memory_paths, write_backend, **save_kwargs)
988 # 📄 PER-STEP MATERIALIZATION: Additional materialized output if configured
989 if "materialized_output_dir" in step_plan:
990 materialized_output_dir = step_plan["materialized_output_dir"]
991 materialized_backend = step_plan["materialized_backend"]
993 memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
994 memory_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
995 materialized_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir))
997 filemanager.ensure_directory(materialized_output_dir, materialized_backend)
998 _save_materialized_data(filemanager, memory_data, materialized_paths, materialized_backend, step_plan, context, axis_id)
1000 logger.info(f"🔬 Materialized {len(materialized_paths)} files to {materialized_output_dir}")
1002 # 📄 STREAMING: Execute all configured streaming backends
1003 from openhcs.core.config import StreamingConfig
1005 streaming_configs_found = []
1006 for key, config_instance in step_plan.items():
1007 if isinstance(config_instance, StreamingConfig):
1008 streaming_configs_found.append((key, config_instance))
1010 for key, config_instance in streaming_configs_found:
1011 # Get paths at runtime like materialization does
1012 step_output_dir = step_plan["output_dir"]
1013 get_paths_for_axis = step_plan["get_paths_for_axis"] # Get the path getter from step_plan
1015 # Get memory paths (where data actually is)
1016 memory_paths = get_paths_for_axis(step_output_dir, Backend.MEMORY.value)
1018 # For materialized steps, use materialized paths for streaming (for correct source extraction)
1019 # but load from memory paths (where data actually is)
1020 if "materialized_output_dir" in step_plan: 1020 ↛ 1027line 1020 didn't jump to line 1027 because the condition on line 1020 was always true
1021 materialized_output_dir = step_plan["materialized_output_dir"]
1022 streaming_paths = _generate_materialized_paths(memory_paths, step_output_dir, Path(materialized_output_dir))
1023 logger.info(f"🔍 STREAMING: Materialized step - loading from memory, streaming with materialized paths")
1024 logger.info(f"🔍 STREAMING: First memory path: {memory_paths[0] if memory_paths else 'NONE'}")
1025 logger.info(f"🔍 STREAMING: First streaming path: {streaming_paths[0] if streaming_paths else 'NONE'}")
1026 else:
1027 streaming_paths = memory_paths
1029 # Load from memory (where data actually is)
1030 streaming_data = filemanager.load_batch(memory_paths, Backend.MEMORY.value)
1031 kwargs = config_instance.get_streaming_kwargs(context) # Pass context for microscope handler access
1033 # Add pre-built source value for layer/window naming
1034 # During pipeline execution: source = step_name
1035 kwargs["source"] = step_name
1037 # Execute streaming - use streaming_paths (materialized paths) for metadata extraction
1038 filemanager.save_batch(streaming_data, streaming_paths, config_instance.backend.value, **kwargs)
1040 # Add small delay between image and ROI streaming to prevent race conditions
1041 import time
1042 time.sleep(0.1)
1044 logger.info(f"FunctionStep {step_index} ({step_name}) completed for well {axis_id}.")
1046 # 📄 OPENHCS METADATA: Create metadata file automatically after step completion
1047 # Track which backend was actually used for writing files
1048 actual_write_backend = step_plan['write_backend']
1050 # Only create OpenHCS metadata for disk/zarr backends, not OMERO
1051 # OMERO has its own metadata system and doesn't use openhcs_metadata.json
1052 if actual_write_backend not in [Backend.OMERO_LOCAL.value, Backend.MEMORY.value]:
1053 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
1054 metadata_generator = OpenHCSMetadataGenerator(context.filemanager)
1056 # Main step output metadata
1057 is_pipeline_output = (actual_write_backend != Backend.MEMORY.value)
1058 metadata_generator.create_metadata(
1059 context,
1060 step_plan['output_dir'],
1061 actual_write_backend,
1062 is_main=is_pipeline_output,
1063 plate_root=step_plan['output_plate_root'],
1064 sub_dir=step_plan['sub_dir'],
1065 results_dir=step_plan.get('analysis_results_dir') # Pass pre-calculated results directory
1066 )
1068 # 📄 MATERIALIZED METADATA: Create metadata for materialized directory if it exists
1069 # This must be OUTSIDE the main write_backend check because materializations
1070 # can happen even when the main step writes to memory
1071 if 'materialized_output_dir' in step_plan:
1072 materialized_backend = step_plan['materialized_backend']
1073 # Only create metadata if materialized backend is also disk/zarr
1074 if materialized_backend not in [Backend.OMERO_LOCAL.value, Backend.MEMORY.value]:  # 1074 ↛ 1088: condition was always true
1075 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
1076 metadata_generator = OpenHCSMetadataGenerator(context.filemanager)
1077 metadata_generator.create_metadata(
1078 context,
1079 step_plan['materialized_output_dir'],
1080 materialized_backend,
1081 is_main=False,
1082 plate_root=step_plan['materialized_plate_root'],
1083 sub_dir=step_plan['materialized_sub_dir'],
1084 results_dir=step_plan.get('materialized_analysis_results_dir') # Pass pre-calculated materialized results directory
1085 )
1087 # SPECIAL DATA MATERIALIZATION
1088 special_outputs = step_plan.get('special_outputs', {})
1089 logger.debug(f"🔍 MATERIALIZATION: special_outputs from step_plan: {special_outputs}")
1090 logger.debug(f"🔍 MATERIALIZATION: special_outputs is empty? {not special_outputs}")
1091 if special_outputs:
1092 logger.info(f"🔬 MATERIALIZATION: Starting materialization for {len(special_outputs)} special outputs")
1093 # Special outputs ALWAYS use the main materialization backend (disk/zarr),
1094 # not the step's write backend (which may be memory for intermediate steps).
1095 # This ensures analysis results are always persisted to disk.
1096 from openhcs.core.pipeline.materialization_flag_planner import MaterializationFlagPlanner
1097 vfs_config = context.get_vfs_config()
1098 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
1099 logger.debug(f"🔍 MATERIALIZATION: Using materialization backend '{materialization_backend}' for special outputs (step write backend is '{actual_write_backend}')")
1100 self._materialize_special_outputs(filemanager, step_plan, special_outputs, materialization_backend, context)
1101 logger.info("🔬 MATERIALIZATION: Completed materialization")
1102 else:
1103 logger.debug("🔍 MATERIALIZATION: No special outputs to materialize")
1107 except Exception as e:
1108 import traceback
1109 full_traceback = traceback.format_exc()
1110 logger.error(f"Error in FunctionStep {step_index} ({step_name}): {e}", exc_info=True)
1111 logger.error(f"Full traceback for FunctionStep {step_index} ({step_name}):\n{full_traceback}")
1115 raise
1118 def _extract_component_metadata(self, context: 'ProcessingContext', component: 'VariableComponents') -> Optional[Dict[str, str]]:
1119 """
1120 Extract component metadata from context cache safely.
1122 Args:
1123 context: ProcessingContext containing metadata_cache
1124 component: VariableComponents enum specifying which component to extract
1126 Returns:
1127 Dictionary mapping component keys to display names, or None if not available
1128 """
1129 try:
1130 if hasattr(context, 'metadata_cache') and context.metadata_cache:
1131 return context.metadata_cache.get(component, None)
1132 else:
1133 logger.debug(f"No metadata_cache available in context for {component.value}")
1134 return None
1135 except Exception as e:
1136 logger.debug(f"Error extracting {component.value} metadata from cache: {e}")
1137 return None
1139 def _create_openhcs_metadata_for_materialization(
1140 self,
1141 context: 'ProcessingContext',
1142 output_dir: str,
1143 write_backend: str
1144 ) -> None:
1145 """
1146 Create OpenHCS metadata file for materialization writes.
1148 Args:
1149 context: ProcessingContext containing microscope_handler and other state
1150 output_dir: Output directory path where metadata should be written
1151 write_backend: Backend being used for the write (disk/zarr)
1152 """
1153 # Only create OpenHCS metadata for disk/zarr backends
1154 # OMERO has its own metadata system, memory doesn't need metadata
1155 if write_backend in [Backend.MEMORY.value, Backend.OMERO_LOCAL.value]:
1156 logger.debug(f"Skipping metadata creation (backend={write_backend})")
1157 return
1159 logger.debug(f"Creating metadata for materialization write: {write_backend} -> {output_dir}")
1161 try:
1162 # Extract required information
1163 step_output_dir = Path(output_dir)
1165 # Check if we have microscope handler for metadata extraction
1166 if not context.microscope_handler:
1167 logger.debug("No microscope_handler in context - skipping OpenHCS metadata creation")
1168 return
1170 # Get source microscope information
1171 source_parser_name = context.microscope_handler.parser.__class__.__name__
1173 # Extract metadata from source microscope handler
1174 try:
1175 grid_dimensions = context.microscope_handler.metadata_handler.get_grid_dimensions(context.input_dir)
1176 pixel_size = context.microscope_handler.metadata_handler.get_pixel_size(context.input_dir)
1177 except Exception as e:
1178 logger.debug(f"Could not extract grid_dimensions/pixel_size from source: {e}")
1179 grid_dimensions = [1, 1] # Default fallback
1180 pixel_size = 1.0 # Default fallback
1182 # Get list of image files in output directory
1183 try:
1184 image_files = []
1185 if context.filemanager.exists(str(step_output_dir), write_backend):
1186 # List files in output directory
1187 files = context.filemanager.list_files(str(step_output_dir), write_backend)
1188 # Filter for image files (common extensions) and convert to strings
1189 image_extensions = {'.tif', '.tiff', '.png', '.jpg', '.jpeg'}
1190 image_files = [str(f) for f in files if Path(f).suffix.lower() in image_extensions]
1191 logger.debug(f"Found {len(image_files)} image files in {step_output_dir}")
1192 except Exception as e:
1193 logger.debug(f"Could not list image files in output directory: {e}")
1194 image_files = []
1196 # Detect available backends based on actual output files
1197 available_backends = self._detect_available_backends(step_output_dir)
1199 # Create metadata structure
1200 metadata = {
1201 "microscope_handler_name": context.microscope_handler.microscope_type,
1202 "source_filename_parser_name": source_parser_name,
1203 "grid_dimensions": list(grid_dimensions) if hasattr(grid_dimensions, '__iter__') else [1, 1],
1204 "pixel_size": float(pixel_size) if pixel_size is not None else 1.0,
1205 "image_files": image_files,
1206 "channels": self._extract_component_metadata(context, VariableComponents.CHANNEL),
1207 "wells": self._extract_component_metadata(context, VariableComponents.WELL),
1208 "sites": self._extract_component_metadata(context, VariableComponents.SITE),
1209 "z_indexes": self._extract_component_metadata(context, VariableComponents.Z_INDEX),
1210 "timepoints": self._extract_component_metadata(context, VariableComponents.TIMEPOINT),
1211 "available_backends": available_backends
1212 }
1214 # Save metadata file using disk backend (JSON files always on disk)
1215 from openhcs.microscopes.openhcs import OpenHCSMetadataHandler
1216 metadata_path = step_output_dir / OpenHCSMetadataHandler.METADATA_FILENAME
1218 # Always ensure we can write to the metadata path (delete if exists)
1219 if context.filemanager.exists(str(metadata_path), Backend.DISK.value):
1220 context.filemanager.delete(str(metadata_path), Backend.DISK.value)
1222 # Ensure output directory exists on disk
1223 context.filemanager.ensure_directory(str(step_output_dir), Backend.DISK.value)
1225 # Create JSON content - OpenHCS handler expects JSON format
1226 import json
1227 json_content = json.dumps(metadata, indent=2)
1228 context.filemanager.save(json_content, str(metadata_path), Backend.DISK.value)
1229 logger.debug(f"Created OpenHCS metadata file (disk): {metadata_path}")
1231 except Exception as e:
1232 # Graceful degradation - log error but don't fail the step
1233 logger.warning(f"Failed to create OpenHCS metadata file: {e}")
1234 logger.debug("OpenHCS metadata creation error details:", exc_info=True)
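# --- Editor's note: an illustrative payload (all values invented) in the shape
# of the metadata dict serialized to JSON above. ---
import json

example_metadata = {
    "microscope_handler_name": "imagexpress",
    "source_filename_parser_name": "ImageXpressFilenameParser",
    "grid_dimensions": [3, 3],
    "pixel_size": 0.65,
    "image_files": ["A01_s001_w1_z001.tif"],
    "channels": {"1": "DAPI"},
    "wells": {"A01": "A01"},
    "sites": None,
    "z_indexes": None,
    "timepoints": None,
    "available_backends": {"disk": True, "zarr": False},
}
print(json.dumps(example_metadata, indent=2))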
1236 def _detect_available_backends(self, output_dir: Path) -> Dict[str, bool]:
1237 """Detect which storage backends are actually available based on output files."""
1239 backends = {Backend.ZARR.value: False, Backend.DISK.value: False}
1241 # Check for zarr stores - look for .zarray or .zgroup files (zarr metadata)
1242 # Zarr stores don't need .zarr extension - any directory with zarr metadata is a store
1243 if list(output_dir.glob("**/.zarray")) or list(output_dir.glob("**/.zgroup")):
1244 backends[Backend.ZARR.value] = True
1246 # Check for image files
1247 for ext in DEFAULT_IMAGE_EXTENSIONS:
1248 if list(output_dir.glob(f"*{ext}")):
1249 backends[Backend.DISK.value] = True
1250 break
1252 logger.debug(f"Backend detection result: {backends}")
1253 return backends
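# --- Editor's note: a self-contained sketch (hypothetical paths) exercising the
# detection rules above: a nested .zgroup marks a zarr store, a top-level image
# file marks the disk backend. ---
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "plate.zarr").mkdir()
    (root / "plate.zarr" / ".zgroup").write_text("{}")   # zarr group marker
    (root / "A01_s001_w1.tif").touch()                   # loose image file
    # _detect_available_backends(root) would now report both backends True,
    # assuming ".tif" is among DEFAULT_IMAGE_EXTENSIONS.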
1255 def _build_analysis_filename(self, output_key: str, step_index: int, step_plan: Dict, dict_key: Optional[str] = None, context=None) -> str:
1256 """Build analysis result filename from first image path template.
1258 Uses first image filename as template to preserve all metadata components.
1259 Falls back to well ID only if no images available.
1261 Args:
1262 output_key: Special output key (e.g., 'rois', 'cell_counts')
1263 step_index: Pipeline step index
1264 step_plan: Step plan dictionary
1265 dict_key: Optional channel/component key for dict pattern functions
1266 context: Processing context (for accessing microscope handler)
1267 """
1268 memory_paths = step_plan['get_paths_for_axis'](step_plan['output_dir'], Backend.MEMORY.value)
1270 if not memory_paths: 1270 ↛ 1271 (line 1270 didn't jump to line 1271 because the condition on line 1270 was never true)
1271 return f"{step_plan['axis_id']}_{output_key}_step{step_index}.roi.zip"
1273 # Filter paths by channel if dict_key provided (for dict pattern functions)
1274 if dict_key and context: 1274 ↛ 1290 (line 1274 didn't jump to line 1290 because the condition on line 1274 was always true)
1275 # Use microscope handler to parse filenames and filter by channel
1276 microscope_handler = context.microscope_handler
1277 parser = microscope_handler.parser
1279 filtered_paths = []
1280 for path in memory_paths:
1281 filename = Path(path).name
1282 metadata = parser.parse_filename(filename)
1283 if metadata and str(metadata.get('channel')) == str(dict_key):
1284 filtered_paths.append(path)
1286 if filtered_paths: 1286 ↛ 1290 (line 1286 didn't jump to line 1290 because the condition on line 1286 was always true)
1287 memory_paths = filtered_paths
1289 # Use first image as template: "A01_s001_w1_z001_t001.tif" -> "A01_s001_w1_z001_t001_rois_step7.roi.zip"
1290 base_filename = Path(memory_paths[0]).stem
1291 return f"{base_filename}_{output_key}_step{step_index}.roi.zip"
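# --- Editor's note: a sketch with invented filenames of the channel filter and
# stem-swap performed above, for a dict-pattern key of "2". ---
from pathlib import Path

parsed = {  # stand-in for parser.parse_filename() results
    "A01_s001_w1_z001.tif": {"channel": 1},
    "A01_s001_w2_z001.tif": {"channel": 2},
}
filtered = [p for p, meta in parsed.items() if str(meta["channel"]) == "2"]
analysis_name = f"{Path(filtered[0]).stem}_rois_step7.roi.zip"
assert analysis_name == "A01_s001_w2_z001_rois_step7.roi.zip"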
1293 def _materialize_special_outputs(self, filemanager, step_plan, special_outputs, backend, context):
1294 """Materialize special outputs (ROIs, cell counts) to disk and streaming backends."""
1295 # Collect backends: main + streaming
1296 from openhcs.core.config import StreamingConfig
1297 backends = [backend]
1298 backend_kwargs = {backend: {}}
1300 for config in step_plan.values():
1301 if isinstance(config, StreamingConfig): 1301 ↛ 1302 (line 1301 didn't jump to line 1302 because the condition on line 1301 was never true)
1302 backends.append(config.backend.value)
1303 backend_kwargs[config.backend.value] = config.get_streaming_kwargs(context)
1305 # Get analysis directory (pre-calculated by compiler)
1306 has_step_mat = 'materialized_output_dir' in step_plan
1307 analysis_output_dir = Path(step_plan['materialized_analysis_results_dir' if has_step_mat else 'analysis_results_dir'])
1308 images_dir = str(step_plan['materialized_output_dir' if has_step_mat else 'output_dir'])
1310 # Add images_dir and source to all backend kwargs
1311 step_name = step_plan.get('step_name', 'unknown_step')
1312 for kwargs in backend_kwargs.values():
1313 kwargs['images_dir'] = images_dir
1314 kwargs['source'] = step_name # Pre-built source value for layer/window naming
1316 filemanager._materialization_context = {'images_dir': images_dir}
1318 # Get dict pattern info
1319 step_func = step_plan['func']
1320 dict_keys = list(step_func.keys()) if isinstance(step_func, dict) else []
1322 # Materialize each special output
1323 for output_key, output_info in special_outputs.items():
1324 mat_func = output_info.get('materialization_function')
1325 if not mat_func:
1326 continue
1328 memory_path = output_info['path']
1329 step_index = step_plan['pipeline_position']
1331 # For dict patterns, materialize each channel separately
1332 channels_to_process = dict_keys if dict_keys else [None]
1334 for dict_key in channels_to_process:
1335 # Build channel-specific memory path if needed
1336 if dict_key: 1336 ↛ 1340 (line 1336 didn't jump to line 1340 because the condition on line 1336 was always true)
1337 from openhcs.core.pipeline.path_planner import PipelinePathPlanner
1338 channel_path = PipelinePathPlanner.build_dict_pattern_path(memory_path, dict_key)
1339 else:
1340 channel_path = memory_path
1342 # Load data
1343 filemanager.ensure_directory(Path(channel_path).parent, Backend.MEMORY.value)
1344 data = filemanager.load(channel_path, Backend.MEMORY.value)
1346 # Build analysis filename and path (pass dict_key for channel-specific naming)
1347 filename = self._build_analysis_filename(output_key, step_index, step_plan, dict_key, context)
1348 analysis_path = analysis_output_dir / filename
1350 # Materialize to all backends
1351 mat_func(data, str(analysis_path), filemanager, backends, backend_kwargs)
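# --- Editor's note: a hypothetical materialization_function showing the call
# shape used above (data, analysis path, filemanager, backends, per-backend
# kwargs); the fan-out body is an assumption, not the real OpenHCS contract. ---
def example_materialization_function(data, analysis_path, filemanager, backends, backend_kwargs):
    """Persist analysis data to every requested backend."""
    for backend in backends:
        # filemanager.save(data, path, backend) matches the usage seen earlier
        # in this module; per-backend kwargs handling is elided here.
        filemanager.save(data, analysis_path, backend)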
1354def _update_metadata_for_zarr_conversion(
1355 plate_root: Path,
1356 original_subdir: str,
1357 zarr_subdir: str | None,
1358 context: 'ProcessingContext'
1359) -> None:
1360 """Update metadata after zarr conversion.
1362 If zarr_subdir is None: add zarr to original_subdir's available_backends
1363 If zarr_subdir is set: create complete metadata for zarr subdirectory, set original main=false
1364 """
1365 from openhcs.io.metadata_writer import get_metadata_path, AtomicMetadataWriter
1366 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
1368 if zarr_subdir: 1368 ↛ 1389 (line 1368 didn't jump to line 1389 because the condition on line 1368 was always true)
1369 # Create complete metadata for zarr subdirectory (skip if already complete)
1370 zarr_dir = plate_root / zarr_subdir
1371 metadata_generator = OpenHCSMetadataGenerator(context.filemanager)
1372 metadata_generator.create_metadata(
1373 context,
1374 str(zarr_dir),
1375 "zarr", # Zarr subdirectory uses zarr backend
1376 is_main=True,
1377 plate_root=str(plate_root),
1378 sub_dir=zarr_subdir,
1379 skip_if_complete=True
1380 )
1382 # Set original subdirectory to main=false
1383 metadata_path = get_metadata_path(plate_root)
1384 writer = AtomicMetadataWriter()
1385 writer.merge_subdirectory_metadata(metadata_path, {original_subdir: {"main": False}})
1386 logger.info(f"Ensured complete metadata for {zarr_subdir}, set {original_subdir} main=false")
1387 else:
1388 # Shared subdirectory - add zarr to available_backends
1389 metadata_path = get_metadata_path(plate_root)
1390 writer = AtomicMetadataWriter()
1391 writer.merge_subdirectory_metadata(metadata_path, {original_subdir: {"available_backends": {"zarr": True}}})
1392 logger.info(f"Updated metadata: {original_subdir} now has zarr backend")
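# --- Editor's note: the two merge payloads the function above can emit, written
# out as plain dicts with an invented subdirectory name ("images"). ---
demote_original = {"images": {"main": False}}                            # dedicated zarr subdir branch
add_zarr_in_place = {"images": {"available_backends": {"zarr": True}}}  # shared subdir branch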