Skip to content

pipeline_stage

pipeline_stage

Base pipeline stage abstraction.

Defines the contract for all processing stages using Template Method pattern for execution flow.

PipelineStage

PipelineStage(name: str)

Bases: ABC, Generic[TInput, TOutput]

Abstract base class for all pipeline stages.

Implements Template Method pattern with hooks for extensibility. All stages follow Single Responsibility and are composable.

Initialize pipeline stage.

Parameters:

Name Type Description Default
name str

Human-readable stage name

required
Source code in ondine/stages/pipeline_stage.py
def __init__(self, name: str):
    """
    Initialize pipeline stage.

    Args:
        name: Human-readable stage name
    """
    self.name = name
    self.logger = get_logger(f"{__name__}.{name}")

process abstractmethod

process(input_data: TInput, context: Any) -> TOutput

Core processing logic (must be implemented by subclasses).

Parameters:

Name Type Description Default
input_data TInput

Input data for this stage

required
context Any

Execution context with shared state

required

Returns:

Type Description
TOutput

Processed output data

Source code in ondine/stages/pipeline_stage.py
@abstractmethod
def process(self, input_data: TInput, context: Any) -> TOutput:
    """
    Core processing logic (must be implemented by subclasses).

    Args:
        input_data: Input data for this stage
        context: Execution context with shared state

    Returns:
        Processed output data
    """
    pass

validate_input abstractmethod

validate_input(input_data: TInput) -> ValidationResult

Validate input before processing.

Parameters:

Name Type Description Default
input_data TInput

Input to validate

required

Returns:

Type Description
ValidationResult

ValidationResult with errors/warnings

Source code in ondine/stages/pipeline_stage.py
@abstractmethod
def validate_input(self, input_data: TInput) -> ValidationResult:
    """
    Validate input before processing.

    Args:
        input_data: Input to validate

    Returns:
        ValidationResult with errors/warnings
    """
    pass

execute

execute(input_data: TInput, context: Any) -> TOutput

Execute stage with pre/post hooks (Template Method).

This method orchestrates the execution flow and should not be overridden.

Parameters:

Name Type Description Default
input_data TInput

Input data

required
context Any

Execution context

required

Returns:

Type Description
TOutput

Processed output

Raises:

Type Description
ValueError

If input validation fails

Source code in ondine/stages/pipeline_stage.py
def execute(self, input_data: TInput, context: Any) -> TOutput:
    """
    Execute stage with pre/post hooks (Template Method).

    This method orchestrates the execution flow and should not
    be overridden.

    Args:
        input_data: Input data
        context: Execution context

    Returns:
        Processed output

    Raises:
        ValueError: If input validation fails
    """
    self.logger.info(f"Starting stage: {self.name}")

    # Pre-processing hook
    self.before_process(context)

    # Validate input
    validation = self.validate_input(input_data)
    if not validation.is_valid:
        error_msg = f"Input validation failed: {validation.errors}"
        self.logger.error(error_msg)
        raise ValueError(error_msg)

    if validation.warnings:
        for warning in validation.warnings:
            self.logger.warning(warning)

    # Core processing
    try:
        result = self.process(input_data, context)
        self.logger.info(f"Completed stage: {self.name}")

        # Post-processing hook
        self.after_process(result, context)

        return result
    except Exception as e:
        self.logger.error(f"Stage {self.name} failed: {e}")
        error_decision = self.on_error(e, context)
        raise error_decision

before_process

before_process(context: Any) -> None

Hook called before processing (default: no-op).

Parameters:

Name Type Description Default
context Any

Execution context

required
Source code in ondine/stages/pipeline_stage.py
def before_process(self, context: Any) -> None:
    """
    Hook called before processing (default: no-op).

    Args:
        context: Execution context
    """
    pass

after_process

after_process(result: TOutput, context: Any) -> None

Hook called after successful processing (default: no-op).

Parameters:

Name Type Description Default
result TOutput

Processing result

required
context Any

Execution context

required
Source code in ondine/stages/pipeline_stage.py
def after_process(self, result: TOutput, context: Any) -> None:
    """
    Hook called after successful processing (default: no-op).

    Args:
        result: Processing result
        context: Execution context
    """
    pass

on_error

on_error(error: Exception, context: Any) -> Exception

Hook called on processing error (default: re-raise).

Parameters:

Name Type Description Default
error Exception

The exception that occurred

required
context Any

Execution context

required

Returns:

Type Description
Exception

Exception to raise (can transform error)

Source code in ondine/stages/pipeline_stage.py
def on_error(self, error: Exception, context: Any) -> Exception:
    """
    Hook called on processing error (default: re-raise).

    Args:
        error: The exception that occurred
        context: Execution context

    Returns:
        Exception to raise (can transform error)
    """
    return error

estimate_cost abstractmethod

estimate_cost(input_data: TInput) -> CostEstimate

Estimate processing cost for this stage.

Parameters:

Name Type Description Default
input_data TInput

Input data to estimate for

required

Returns:

Type Description
CostEstimate

Cost estimate

Source code in ondine/stages/pipeline_stage.py
@abstractmethod
def estimate_cost(self, input_data: TInput) -> CostEstimate:
    """
    Estimate processing cost for this stage.

    Args:
        input_data: Input data to estimate for

    Returns:
        Cost estimate
    """
    pass