Skip to content

pipeline_executor

pipeline_executor

Pipeline executor for orchestrating stage execution.

Implements the complete execution flow with state machine management as specified in the design document.

ExecutionState

Bases: str, Enum

Pipeline execution states.

PipelineExecutor

PipelineExecutor(stages: list[PipelineStage], state_manager: StateManager, observers: list[ExecutionObserver] | None = None)

Orchestrates pipeline execution with state management.

Implements Command and Mediator patterns for coordinating stages, observers, and state management.

State Machine

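IDLE → INITIALIZING → EXECUTING → [PAUSED ↔ EXECUTING] → COMPLETED. An error raised during initialization or execution moves the executor to FAILED, from which execution can be started again.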

Initialize pipeline executor.

Parameters:

Name Type Description Default
stages list[PipelineStage]

Ordered list of processing stages

required
state_manager StateManager

State manager for checkpointing

required
observers list[ExecutionObserver] | None

Optional execution observers

None
Source code in ondine/orchestration/pipeline_executor.py
def __init__(
    self,
    stages: list[PipelineStage],
    state_manager: StateManager,
    observers: list[ExecutionObserver] | None = None,
):
    """
    Initialize pipeline executor.

    The executor starts in the IDLE state with no execution context and a
    freshly generated execution id, which is also used to name its logger.

    Args:
        stages: Ordered list of processing stages
        state_manager: State manager for checkpointing
        observers: Optional execution observers; the list is copied, so
            later add_observer() calls do not modify the caller's list
    """
    self.execution_id = uuid4()
    self.stages = stages
    self.state_manager = state_manager
    # Defensive copy: `observers or []` aliased the caller's list whenever it
    # was non-empty, letting add_observer() mutate it behind the caller's back
    # (and behaved differently for an empty list, which was never aliased).
    self.observers = list(observers) if observers is not None else []
    self.state = ExecutionState.IDLE
    # Populated by execute() / resume(); None until a run has been set up.
    self.context: ExecutionContext | None = None
    # One logger per executor instance, keyed by the execution id.
    self.logger = get_logger(f"{__name__}.{self.execution_id}")

add_observer

add_observer(observer: ExecutionObserver) -> PipelineExecutor

Add execution observer.

Parameters:

Name Type Description Default
observer ExecutionObserver

Observer to add

required

Returns:

Type Description
PipelineExecutor

Self for chaining

Source code in ondine/orchestration/pipeline_executor.py
def add_observer(self, observer: ExecutionObserver) -> "PipelineExecutor":
    """
    Register one more execution observer on this executor.

    Args:
        observer: Observer appended to the end of the observer list

    Returns:
        This executor, so calls can be chained
    """
    registered = self.observers
    registered.append(observer)
    return self

execute

execute(pipeline: Any) -> ExecutionResult

Execute pipeline end-to-end.

Parameters:

Name Type Description Default
pipeline Any

Pipeline instance to execute

required

Returns:

Type Description
ExecutionResult

ExecutionResult with data and metrics

Raises:

Type Description
RuntimeError

If the executor is not in the IDLE or FAILED state

Source code in ondine/orchestration/pipeline_executor.py
def execute(self, pipeline: Any) -> ExecutionResult:
    """
    Execute pipeline end-to-end.

    Moves the executor through INITIALIZING → EXECUTING → COMPLETED,
    notifying observers at start and on completion, and removing the
    session's checkpoints once a result has been built. If anything raises,
    the executor moves to FAILED, observers are notified, a checkpoint is
    saved (when a context exists) and the original exception is re-raised.

    Args:
        pipeline: Pipeline instance to execute

    Returns:
        ExecutionResult with data and metrics

    Raises:
        RuntimeError: If the executor is not in the IDLE or FAILED state
        Exception: Any error raised during the run is re-raised unchanged
    """
    if self.state not in [ExecutionState.IDLE, ExecutionState.FAILED]:
        raise RuntimeError(f"Cannot execute from state: {self.state}")

    try:
        # Initialize
        self.state = ExecutionState.INITIALIZING
        self.context = self._initialize_context()

        # Check for existing checkpoint
        if self.state_manager.can_resume(self.context.session_id):
            self.logger.info("Found existing checkpoint, resuming...")
            restored = self.state_manager.load_checkpoint(
                self.context.session_id
            )
            if restored:
                self.context = restored
            else:
                # load_checkpoint can come back empty (resume() guards the
                # same case); keep the fresh context instead of replacing it
                # with None and failing later with an unrelated error.
                self.logger.warning(
                    "Checkpoint could not be loaded, starting from a fresh context"
                )

        # Notify observers
        self._notify_pipeline_start(pipeline)

        # Execute stages
        self.state = ExecutionState.EXECUTING
        result_data = self._execute_all_stages(pipeline)

        # Mark completion
        self.state = ExecutionState.COMPLETED
        self.context.end_time = datetime.now()

        # Create result
        result = self._create_execution_result(result_data)

        # Cleanup checkpoints
        self.state_manager.cleanup_checkpoints(self.context.session_id)

        # Notify observers
        self._notify_pipeline_complete(result)

        return result

    except Exception as e:
        self.state = ExecutionState.FAILED

        # Failure handling is best-effort: a broken observer must not prevent
        # the checkpoint from being saved, and neither step may replace the
        # original exception that callers need to see.
        try:
            self._notify_pipeline_error(e)
        except Exception:
            self.logger.exception("Failed to notify observers of pipeline error")

        # Save checkpoint on failure
        if self.context:
            try:
                self.state_manager.save_checkpoint(self.context)
            except Exception:
                self.logger.exception(
                    "Failed to save checkpoint after pipeline error"
                )

        raise

pause

pause() -> None

Gracefully pause execution.

Finishes current batch and saves checkpoint.

Source code in ondine/orchestration/pipeline_executor.py
def pause(self) -> None:
    """
    Pause a running execution.

    Switches the executor from EXECUTING to PAUSED and, when an execution
    context exists, persists it through the state manager.
    NOTE(review): presumably the stage loop observes the PAUSED state and
    finishes its current batch before stopping — confirm against the stage
    execution code, which is not part of this method.

    Raises:
        RuntimeError: If the executor is not currently EXECUTING
    """
    current = self.state
    if current != ExecutionState.EXECUTING:
        raise RuntimeError(f"Cannot pause from state: {current}")

    self.logger.info("Pausing execution...")
    self.state = ExecutionState.PAUSED

    # Persist progress only if a run has actually set up a context.
    snapshot = self.context
    if snapshot:
        self.state_manager.save_checkpoint(snapshot)

resume

resume(session_id: UUID) -> ExecutionResult

Resume from saved checkpoint.

Parameters:

Name Type Description Default
session_id UUID

Session ID to resume

required

Returns:

Type Description
ExecutionResult

ExecutionResult

Raises:

Type Description
ValueError

If no checkpoint found

Source code in ondine/orchestration/pipeline_executor.py
def resume(self, session_id: UUID) -> ExecutionResult:
    """
    Load the checkpoint for a session as the current context.

    Resuming is not implemented yet: after the checkpoint has been loaded
    into ``self.context`` and the last processed row logged, the method
    always raises NotImplementedError.

    Args:
        session_id: Session ID to resume

    Returns:
        ExecutionResult (never returned at present)

    Raises:
        ValueError: If no checkpoint exists for the session, or loading it
            yields nothing
        NotImplementedError: Always, once the checkpoint has been loaded
    """
    manager = self.state_manager

    if manager.can_resume(session_id):
        restored = manager.load_checkpoint(session_id)
        # The context is replaced even when the load came back empty.
        self.context = restored

        if restored:
            self.logger.info(f"Resuming from row {restored.last_processed_row}")
            # Still missing: rebuilding the pipeline and skipping the rows
            # that were already processed.
            raise NotImplementedError("Resume functionality coming soon")

        raise ValueError("Failed to load checkpoint")

    raise ValueError(f"No checkpoint found for session {session_id}")

cancel

cancel() -> None

Immediately stop and save checkpoint.

Source code in ondine/orchestration/pipeline_executor.py
def cancel(self) -> None:
    """
    Cancel the execution: checkpoint the context and return to IDLE.

    A checkpoint is saved only when an execution context exists. The state
    is reset to IDLE regardless of the state the executor was in.
    NOTE(review): nothing in this method interrupts a running stage itself —
    confirm how the stage loop reacts to the state change.
    """
    self.logger.info("Cancelling execution...")

    # Persist whatever progress the current context holds, if any.
    snapshot = self.context
    if snapshot:
        self.state_manager.save_checkpoint(snapshot)

    self.state = ExecutionState.IDLE