pipeline_executor¶
Pipeline executor for orchestrating stage execution.
Implements the complete execution flow with state machine management as specified in the design document.
ExecutionState ¶
Bases: str, Enum
Pipeline execution states.
PipelineExecutor ¶
PipelineExecutor(stages: list[PipelineStage], state_manager: StateManager, observers: list[ExecutionObserver] | None = None)
Orchestrates pipeline execution with state management.
Implements Command and Mediator patterns for coordinating stages, observers, and state management.
State Machine
IDLE → INITIALIZING → EXECUTING → [PAUSED ↔ EXECUTING] → COMPLETED
(on failure) ↓ FAILED
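The state machine above can be sketched as a string-valued enum plus a transition table. This is only an illustration under stated assumptions, not the library's actual code: the member values, the `ALLOWED_TRANSITIONS` mapping, and the `can_transition` helper are hypothetical, and the diagram does not make clear which states may move to FAILED, so the sketch assumes INITIALIZING and EXECUTING.

```python
from enum import Enum


class ExecutionState(str, Enum):
    """Pipeline execution states (member values are assumed)."""

    IDLE = "idle"
    INITIALIZING = "initializing"
    EXECUTING = "executing"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical transition table derived from the diagram.
# Assumption: FAILED is reachable from INITIALIZING and EXECUTING.
ALLOWED_TRANSITIONS = {
    ExecutionState.IDLE: {ExecutionState.INITIALIZING},
    ExecutionState.INITIALIZING: {
        ExecutionState.EXECUTING,
        ExecutionState.FAILED,
    },
    ExecutionState.EXECUTING: {
        ExecutionState.PAUSED,
        ExecutionState.COMPLETED,
        ExecutionState.FAILED,
    },
    ExecutionState.PAUSED: {ExecutionState.EXECUTING},
    ExecutionState.COMPLETED: set(),  # terminal
    ExecutionState.FAILED: set(),  # terminal
}


def can_transition(current: ExecutionState, target: ExecutionState) -> bool:
    """Return True if the sketch's table allows moving from current to target."""
    return target in ALLOWED_TRANSITIONS[current]
```

Because the enum also derives from `str`, members compare equal to their plain string values, which is convenient when states are serialized into checkpoints.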
Initialize pipeline executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stages` | `list[PipelineStage]` | Ordered list of processing stages | required |
| `state_manager` | `StateManager` | State manager for checkpointing | required |
| `observers` | `list[ExecutionObserver] \| None` | Optional execution observers | `None` |
Source code in ondine/orchestration/pipeline_executor.py
add_observer ¶
Add execution observer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `observer` | `ExecutionObserver` | Observer to add | required |
Returns:
| Type | Description |
|---|---|
| `PipelineExecutor` | Self for chaining |
execute ¶
Execute pipeline end-to-end.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pipeline` | `Any` | Pipeline instance to execute | required |
Returns:
| Type | Description |
|---|---|
| `ExecutionResult` | ExecutionResult with data and metrics |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the pipeline is in an invalid state |
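The state check behind the `RuntimeError` might look roughly like the sketch below. It is an assumption-laden illustration: `GuardedExecutor` is hypothetical, stages are modelled as plain callables rather than `PipelineStage` objects, and the rule that execution may start only from IDLE is assumed, since the reference does not say which states count as invalid.

```python
from enum import Enum
from typing import Any, Callable


class State(str, Enum):
    """Reduced set of states, enough for this sketch."""

    IDLE = "idle"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"


class GuardedExecutor:
    """Hypothetical executor that runs callables in order behind a state guard."""

    def __init__(self, stages: list[Callable[[Any], Any]]) -> None:
        self.stages = stages
        self.state = State.IDLE

    def execute(self, data: Any) -> Any:
        # Assumption: only an IDLE executor may start a run.
        if self.state is not State.IDLE:
            raise RuntimeError(f"Cannot execute from state {self.state.value!r}")
        self.state = State.EXECUTING
        try:
            for stage in self.stages:
                data = stage(data)  # output of one stage feeds the next
        except Exception:
            self.state = State.FAILED
            raise
        self.state = State.COMPLETED
        return data
```

Calling `execute` a second time on the same instance raises `RuntimeError` in this sketch, because the state is no longer IDLE after the first run.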
pause ¶
Gracefully pause execution.
Finishes current batch and saves checkpoint.
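One common way to get "finish the current batch, then checkpoint" behaviour is a cooperative flag that is checked only between batches. The sketch below shows that idea and is not the library's implementation: `BatchRunner`, its `run` method, and the dictionary used as a checkpoint store are hypothetical.

```python
import threading


class BatchRunner:
    """Hypothetical runner that honours pause requests at batch boundaries."""

    def __init__(self, batches: list[str], checkpoints: dict[str, int]) -> None:
        self.batches = batches
        self.checkpoints = checkpoints  # stand-in for a real state manager
        self.processed: list[str] = []
        self._pause_requested = threading.Event()

    def pause(self) -> None:
        # Only sets a flag; the in-flight batch is allowed to finish.
        self._pause_requested.set()

    def run(self, session_id: str, start: int = 0) -> str:
        for index in range(start, len(self.batches)):
            self.processed.append(self.batches[index])  # "process" the batch
            if self._pause_requested.is_set():
                # Record the index of the next unprocessed batch.
                self.checkpoints[session_id] = index + 1
                self._pause_requested.clear()
                return "paused"
        return "completed"
```

A later call such as `runner.run(session_id, start=checkpoints[session_id])` would continue from the saved position in this sketch.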
resume ¶
Resume from saved checkpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `session_id` | `UUID` | Session ID to resume | required |
Returns:
| Type | Description |
|---|---|
| `ExecutionResult` | ExecutionResult |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If no checkpoint is found |
cancel ¶
Immediately stop and save checkpoint.