orchestration
Orchestration engine for pipeline execution control.
AsyncExecutor
Bases: ExecutionStrategy
Asynchronous execution strategy.
Uses asyncio for true non-blocking execution. Leverages LlamaIndex's async methods (acomplete) for concurrent LLM calls without threads.
Benefits:
- Non-blocking (works with FastAPI, aiohttp)
- Better resource utilization
- Higher concurrency without thread overhead
- Ideal for I/O-bound operations
Initialize async executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_concurrency | int | Maximum concurrent async tasks | 10 |
Source code in ondine/orchestration/async_executor.py
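The sketch below shows one common way a `max_concurrency` limit can bound concurrent async calls: an `asyncio.Semaphore` wrapped around each call. This is an assumption about the technique, not Ondine's actual code. `fake_acomplete` is a hypothetical stand-in for an async LLM method such as LlamaIndex's `acomplete`.

```python
import asyncio


async def fake_acomplete(prompt: str) -> str:
    """Hypothetical stand-in for an async LLM call such as LlamaIndex's acomplete()."""
    await asyncio.sleep(0.01)  # simulate network I/O without blocking the event loop
    return prompt.upper()


async def run_bounded(prompts: list[str], max_concurrency: int = 10) -> tuple[list[str], int]:
    """Run one call per prompt with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def worker(prompt: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:  # waits only when max_concurrency calls are active
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_acomplete(prompt)
            finally:
                in_flight -= 1

    # gather() returns results in the same order as the input prompts
    results = await asyncio.gather(*(worker(p) for p in prompts))
    return list(results), peak


results, peak = asyncio.run(run_bounded(["a", "b", "c"], max_concurrency=2))
print(results, peak)
```

All calls share one event loop thread. The semaphore therefore caps load on the LLM provider without creating any worker threads.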
execute (async)
Execute stages asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stages | list[PipelineStage] | Pipeline stages | required |
| context | ExecutionContext | Execution context | required |
Returns:
| Type | Description |
|---|---|
| ExecutionResult | ExecutionResult with data and metrics |
Source code in ondine/orchestration/async_executor.py
supports_async
supports_streaming
execute_async (async)
ExecutionContext (dataclass)
ExecutionContext(session_id: UUID = uuid4(), pipeline_id: UUID = uuid4(), start_time: datetime = datetime.now(), end_time: datetime | None = None, current_stage_index: int = 0, last_processed_row: int = 0, total_rows: int = 0, accumulated_cost: Decimal = (lambda: Decimal('0.0'))(), accumulated_tokens: int = 0, intermediate_data: dict[str, Any] = dict(), failed_rows: int = 0, skipped_rows: int = 0, observers: list[ExecutionObserver] = list(), observer_dispatcher: Optional[ObserverDispatcher] = None, trace_id: str = (lambda: str(uuid4()))(), span_id: str = (lambda: str(uuid4()))())
Lightweight orchestration state (passed between pipeline stages).
Scope: Runtime execution state and progress tracking
Pattern: Memento (serializable for checkpointing)
Cost Tracking in ExecutionContext:
- Simple accumulation for orchestration purposes
- Used by: Executors to track overall progress
- NOT for: Detailed accounting (use CostTracker for that)
Why separate from CostTracker?
- ExecutionContext = orchestration state (stage progress, session ID, timing)
- CostTracker = detailed accounting (per-stage breakdowns, thread-safe entries, metrics)
- Different concerns, different use cases
ExecutionContext is:
- Passed between stages in the pipeline
- Serialized for checkpointing
- Focused on execution orchestration
CostTracker is:
- Used within LLMInvocationStage for detailed tracking
- Thread-safe for concurrent operations
- Focused on cost reporting and analytics
See Also:
- CostTracker: For detailed cost accounting with breakdowns
- docs/TECHNICAL_REFERENCE.md: Cost tracking architecture
Carries shared state between stages and tracks progress. Immutable for most fields to prevent accidental modification.
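The "simple accumulation" role described above can be pictured with a small dataclass that keeps running totals in `Decimal`. This avoids binary floating-point drift: `0.1 + 0.1 + 0.1 != 0.3` as floats, but the equivalent `Decimal` sum is exact.

`CostProgressSketch`, and the `add_cost(cost, tokens)` and `get_progress()` signatures, are hypothetical simplifications. They only reuse field names from the `ExecutionContext` signature.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class CostProgressSketch:
    """Hypothetical subset of ExecutionContext's cost and progress fields."""

    total_rows: int = 0
    last_processed_row: int = 0
    accumulated_cost: Decimal = Decimal("0.0")  # Decimal is immutable, so a shared default is safe
    accumulated_tokens: int = 0

    def add_cost(self, cost: Decimal, tokens: int) -> None:
        # Running totals only; per-stage breakdowns belong in a CostTracker.
        self.accumulated_cost += cost
        self.accumulated_tokens += tokens

    def get_progress(self) -> float:
        # Percentage of rows processed, guarding against an empty dataset.
        if self.total_rows == 0:
            return 0.0
        return self.last_processed_row / self.total_rows * 100


ctx = CostProgressSketch(total_rows=200)
for _ in range(3):
    ctx.add_cost(Decimal("0.1"), 50)
ctx.last_processed_row = 50
print(ctx.accumulated_cost, ctx.accumulated_tokens, ctx.get_progress())
```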
update_stage
update_row
add_cost
notify_progress
Notify all observers of progress update.
Source code in ondine/orchestration/execution_context.py
get_progress
get_stats
Get processing statistics.
Source code in ondine/orchestration/execution_context.py
to_checkpoint
Serialize to checkpoint dictionary (Memento pattern).
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation for persistence |
Source code in ondine/orchestration/execution_context.py
from_checkpoint (classmethod)
Deserialize from checkpoint dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Checkpoint data | required |
Returns:
| Type | Description |
|---|---|
| ExecutionContext | Restored ExecutionContext |
Source code in ondine/orchestration/execution_context.py
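A Memento-style round trip generally converts non-JSON types (UUID, datetime, Decimal) to strings on save and reverses each conversion on load.

The sketch below illustrates that idea on a trimmed-down, hypothetical `CheckpointSketch` class. It is not the real `ExecutionContext.to_checkpoint`/`from_checkpoint`, and the actual serialized keys may differ.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Any
from uuid import UUID, uuid4


@dataclass
class CheckpointSketch:
    """Hypothetical, trimmed-down stand-in for ExecutionContext."""

    session_id: UUID = field(default_factory=uuid4)
    start_time: datetime = field(default_factory=datetime.now)
    last_processed_row: int = 0
    accumulated_cost: Decimal = field(default_factory=lambda: Decimal("0.0"))

    def to_checkpoint(self) -> dict[str, Any]:
        # Convert non-JSON types to strings so any storage backend can persist them.
        return {
            "session_id": str(self.session_id),
            "start_time": self.start_time.isoformat(),
            "last_processed_row": self.last_processed_row,
            "accumulated_cost": str(self.accumulated_cost),
        }

    @classmethod
    def from_checkpoint(cls, data: dict[str, Any]) -> "CheckpointSketch":
        # Reverse each conversion made in to_checkpoint.
        return cls(
            session_id=UUID(data["session_id"]),
            start_time=datetime.fromisoformat(data["start_time"]),
            last_processed_row=data["last_processed_row"],
            accumulated_cost=Decimal(data["accumulated_cost"]),
        )


original = CheckpointSketch(last_processed_row=1500, accumulated_cost=Decimal("2.35"))
payload = json.dumps(original.to_checkpoint())  # what a storage backend might persist
restored = CheckpointSketch.from_checkpoint(json.loads(payload))
print(restored == original)
```

Storing `Decimal` as a string rather than a float keeps the restored cost exact.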
to_dict
from_dict (classmethod)
ExecutionStrategy
Bases: ABC
Abstract base for execution strategies.
Follows the Strategy pattern: defines the interface for executing pipeline stages in different modes (sync, async, streaming).
execute (abstractmethod)
execute(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult | Iterator[pd.DataFrame] | AsyncIterator[pd.DataFrame]
Execute pipeline stages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stages | list[PipelineStage] | List of pipeline stages to execute | required |
| context | ExecutionContext | Execution context for state management | required |
Returns:
| Type | Description |
|---|---|
| ExecutionResult \| Iterator[DataFrame] \| AsyncIterator[DataFrame] | ExecutionResult or iterator for streaming |
Source code in ondine/orchestration/execution_strategy.py
supports_async (abstractmethod)
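The Strategy pattern described above means callers depend only on an abstract `execute`/`supports_async` interface, and concrete executors are swapped in.

The sketch below is a deliberately simplified, hypothetical mirror of that interface. `StrategySketch`, `SequentialSketch` and `run_pipeline` are illustrative names, and stages are plain functions and the context a plain dict rather than `PipelineStage`/`ExecutionContext`.

```python
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any

Stage = Callable[[Any], Any]  # hypothetical: a stage is just a function here


class StrategySketch(ABC):
    """Simplified, hypothetical mirror of the ExecutionStrategy interface."""

    @abstractmethod
    def execute(self, stages: list[Stage], context: dict[str, Any]) -> Any: ...

    @abstractmethod
    def supports_async(self) -> bool: ...


class SequentialSketch(StrategySketch):
    def execute(self, stages: list[Stage], context: dict[str, Any]) -> Any:
        data = context.get("input")
        for index, stage in enumerate(stages):
            context["current_stage_index"] = index  # progress bookkeeping
            data = stage(data)  # each stage consumes the previous stage's output
        return data

    def supports_async(self) -> bool:
        return False


def run_pipeline(strategy: StrategySketch, stages: list[Stage], context: dict[str, Any]) -> Any:
    # The caller depends only on the abstract interface, so strategies are interchangeable.
    return strategy.execute(stages, context)


ctx: dict[str, Any] = {"input": 3}
output = run_pipeline(SequentialSketch(), [lambda x: x + 1, lambda x: x * 2], ctx)
print(output)  # 8
```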
CostTrackingObserver
Bases: ExecutionObserver
Observer that tracks and warns about costs.
Initialize cost tracking observer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| warning_threshold | float | Fraction of the budget at which to emit a warning | 0.75 |
Source code in ondine/orchestration/observers.py
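`warning_threshold` is documented as a fraction of the budget. The sketch below assumes a check where spend divided by budget is compared against that fraction after each stage.

`BudgetWarner`, its `budget` argument, and the choice to warn only once are all hypothetical. The real `CostTrackingObserver` may take its budget from elsewhere and may warn differently.

```python
import logging

logger = logging.getLogger(__name__)


class BudgetWarner:
    """Hypothetical sketch of a warning_threshold check; not Ondine's implementation."""

    def __init__(self, budget: float, warning_threshold: float = 0.75) -> None:
        self.budget = budget
        self.warning_threshold = warning_threshold
        self.warned = False

    def check(self, accumulated_cost: float) -> bool:
        """Return True the first time spend reaches the threshold fraction of the budget."""
        if self.budget <= 0 or self.warned:
            return False
        used_fraction = accumulated_cost / self.budget
        if used_fraction >= self.warning_threshold:
            self.warned = True  # assumption: warn once instead of after every stage
            logger.warning("Used %.0f%% of budget", used_fraction * 100)
            return True
        return False
```

With a budget of 10.0 and the default 0.75 threshold, a cost of 5.0 stays silent, while 8.0 triggers the warning.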
on_pipeline_start
on_stage_start
on_stage_complete
Check cost after stage completion.
Source code in ondine/orchestration/observers.py
on_stage_error
on_pipeline_complete
Log final cost summary.
Source code in ondine/orchestration/observers.py
on_pipeline_error
on_progress_update
ExecutionObserver
Bases: ABC
Abstract base for execution observers.
Observers can monitor pipeline execution without coupling to the executor implementation.
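To illustrate the decoupling described above: the executor only calls hook methods on a base interface, and each observer overrides the hooks it needs.

The sketch uses hypothetical names (`ObserverSketch`, `RecordingObserver`, `notify_progress`) and assumes hooks default to no-ops and that a failing observer is logged rather than allowed to stop the run. Neither behavior is confirmed for Ondine.

```python
import logging
from abc import ABC

logger = logging.getLogger(__name__)


class ObserverSketch(ABC):
    """Hypothetical mini ExecutionObserver: every hook is an optional no-op."""

    def on_stage_complete(self, stage_name: str) -> None:
        pass

    def on_progress_update(self, rows_done: int, total_rows: int) -> None:
        pass


class RecordingObserver(ObserverSketch):
    """Overrides only the hook it cares about."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def on_progress_update(self, rows_done: int, total_rows: int) -> None:
        self.events.append(f"{rows_done}/{total_rows}")


def notify_progress(observers: list[ObserverSketch], rows_done: int, total_rows: int) -> None:
    # The executor sees only the base interface, never the concrete observer types.
    for observer in observers:
        try:
            observer.on_progress_update(rows_done, total_rows)
        except Exception:
            # Assumption: a faulty observer is logged and skipped.
            logger.exception("Observer %r failed", observer)
```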
LoggingObserver
Bases: ExecutionObserver
Observer that logs execution events.
Initialize logging observer.
Source code in ondine/orchestration/observers.py
on_pipeline_start
on_stage_start
on_stage_complete
Log stage completion.
on_stage_error
on_pipeline_complete
Log pipeline completion.
Source code in ondine/orchestration/observers.py
on_pipeline_error
on_progress_update
Log progress update.
Source code in ondine/orchestration/observers.py
ProgressBarObserver
Bases: ExecutionObserver
Observer that displays a progress bar using tqdm.
Initialize progress bar observer.
Source code in ondine/orchestration/observers.py
on_pipeline_start
Initialize progress bar.
on_stage_start
on_stage_complete
Update progress bar.
on_stage_error
Handle error in progress bar.
on_pipeline_complete
on_pipeline_error
on_progress_update
Update progress bar with current row count.
Source code in ondine/orchestration/observers.py
StateManager
Manages execution state persistence and recovery.
Follows the Single Responsibility Principle: it only handles state management. Uses the Strategy pattern for pluggable storage backends.
Initialize state manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage | CheckpointStorage | Checkpoint storage backend | required |
| checkpoint_interval | int | Rows between checkpoints | 500 |
Source code in ondine/orchestration/state_manager.py
should_checkpoint
Check if checkpoint should be saved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| current_row | int | Current row index | required |
Returns:
| Type | Description |
|---|---|
| bool | True if checkpoint due |
Source code in ondine/orchestration/state_manager.py
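One plausible reading of `checkpoint_interval` ("rows between checkpoints") is that a checkpoint is due once that many rows have passed since the last save.

The sketch below assumes a difference check rather than `current_row % checkpoint_interval == 0`. The modulo version would never fire if rows arrive in batches that step over the exact multiple. `CheckpointPolicy` and `mark_saved` are hypothetical names, not part of `StateManager`'s documented API.

```python
class CheckpointPolicy:
    """Hypothetical sketch of a row-interval checkpoint rule."""

    def __init__(self, checkpoint_interval: int = 500) -> None:
        self.checkpoint_interval = checkpoint_interval
        self.last_checkpoint_row = 0

    def should_checkpoint(self, current_row: int) -> bool:
        # Due once at least checkpoint_interval rows have passed since the last save.
        return current_row - self.last_checkpoint_row >= self.checkpoint_interval

    def mark_saved(self, current_row: int) -> None:
        # Remember where the last checkpoint was taken.
        self.last_checkpoint_row = current_row
```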
save_checkpoint
Save checkpoint for execution context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | ExecutionContext | Execution context to save | required |
Returns:
| Type | Description |
|---|---|
| bool | True if successful |
Source code in ondine/orchestration/state_manager.py
load_checkpoint
Load checkpoint for session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | UUID | Session identifier | required |
Returns:
| Type | Description |
|---|---|
| ExecutionContext \| None | Restored execution context or None |
Source code in ondine/orchestration/state_manager.py
can_resume
Check if session can be resumed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | UUID | Session identifier | required |
Returns:
| Type | Description |
|---|---|
| bool | True if checkpoint exists |
cleanup_checkpoints
Delete checkpoints for session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | UUID | Session identifier | required |
Returns:
| Type | Description |
|---|---|
| bool | True if deleted |
Source code in ondine/orchestration/state_manager.py
list_checkpoints
List all available checkpoints.
Returns:
| Type | Description |
|---|---|
| list[CheckpointInfo] | List of checkpoint information |
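The "pluggable storage backends" idea can be sketched as a manager that only talks to an abstract storage interface. Swapping an in-memory backend for a file or database backend then requires no change to the manager.

`StorageSketch`, `InMemoryStorage` and `MiniStateManager`, and their `save`/`load`/`delete` method names, are hypothetical. They are not the actual `CheckpointStorage` interface.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional
from uuid import UUID, uuid4


class StorageSketch(ABC):
    """Hypothetical stand-in for a CheckpointStorage backend interface."""

    @abstractmethod
    def save(self, session_id: UUID, data: dict[str, Any]) -> bool: ...

    @abstractmethod
    def load(self, session_id: UUID) -> Optional[dict[str, Any]]: ...

    @abstractmethod
    def delete(self, session_id: UUID) -> bool: ...


class InMemoryStorage(StorageSketch):
    def __init__(self) -> None:
        self._data: dict[UUID, dict[str, Any]] = {}

    def save(self, session_id: UUID, data: dict[str, Any]) -> bool:
        self._data[session_id] = dict(data)  # shallow copy so later edits don't leak in
        return True

    def load(self, session_id: UUID) -> Optional[dict[str, Any]]:
        return self._data.get(session_id)

    def delete(self, session_id: UUID) -> bool:
        return self._data.pop(session_id, None) is not None


class MiniStateManager:
    """Delegates persistence to whichever storage it is given (Strategy pattern)."""

    def __init__(self, storage: StorageSketch) -> None:
        self.storage = storage

    def can_resume(self, session_id: UUID) -> bool:
        return self.storage.load(session_id) is not None

    def cleanup_checkpoints(self, session_id: UUID) -> bool:
        return self.storage.delete(session_id)
```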
StreamingExecutor
Bases: ExecutionStrategy
Streaming execution strategy.
Processes data in chunks to maintain constant memory usage. Ideal for very large datasets (100K+ rows) that don't fit in memory.
Benefits:
- Constant memory footprint
- Can process datasets of arbitrary size
- Checkpoints at chunk boundaries
- Early results available (each processed chunk is yielded as soon as it is ready)
Initialize streaming executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk_size | int | Number of rows per chunk | 1000 |
Source code in ondine/orchestration/streaming_executor.py
execute
Execute stages in streaming mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stages | list[PipelineStage] | Pipeline stages | required |
| context | ExecutionContext | Execution context | required |
Yields:
| Type | Description |
|---|---|
| DataFrame | DataFrames with processed chunks |
Source code in ondine/orchestration/streaming_executor.py
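The chunked, generator-based flow described for `StreamingExecutor` can be sketched in plain Python. The function pulls at most `chunk_size` rows from a lazy source, runs every stage on that chunk, and yields it before reading more, so memory is bounded by one chunk.

For self-containment, rows here are dicts and stages are hypothetical chunk-to-chunk functions. With real DataFrame input, a lazy source could come from pandas' `read_csv(..., chunksize=...)`. `stream_chunks` and `mark_done` are illustrative names only.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import islice
from typing import Any

Row = dict[str, Any]
Stage = Callable[[list[Row]], list[Row]]  # hypothetical: a stage maps a chunk to a chunk


def stream_chunks(rows: Iterable[Row], stages: list[Stage], chunk_size: int = 1000) -> Iterator[list[Row]]:
    """Yield each processed chunk as soon as every stage has run on it."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    source = iter(rows)
    while True:
        chunk = list(islice(source, chunk_size))  # pull at most chunk_size rows
        if not chunk:
            return
        for stage in stages:
            chunk = stage(chunk)
        yield chunk  # chunk boundary: a natural point to checkpoint


def mark_done(chunk: list[Row]) -> list[Row]:
    return [{**row, "done": True} for row in chunk]


rows = ({"id": i} for i in range(2500))  # generator: rows are never all in memory at once
sizes = [len(chunk) for chunk in stream_chunks(rows, [mark_done], chunk_size=1000)]
print(sizes)  # [1000, 1000, 500]
```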
supports_async
supports_streaming
execute_stream
StreamingResult
Result container for streaming execution.
Provides access to metrics after consuming the stream.
Initialize streaming result.
Source code in ondine/orchestration/streaming_executor.py
add_chunk
finalize
Create final ExecutionResult.
Source code in ondine/orchestration/streaming_executor.py
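A result container that "provides access to metrics after consuming the stream" can record counters per chunk and build a summary only at the end. It does not keep the chunks themselves, which would defeat streaming.

`MiniStreamingResult` and the dict returned by its `finalize` are hypothetical stand-ins. The real `finalize` returns an `ExecutionResult`.

```python
from dataclasses import dataclass


@dataclass
class MiniStreamingResult:
    """Hypothetical accumulator in the spirit of StreamingResult."""

    chunks_processed: int = 0
    rows_processed: int = 0

    def add_chunk(self, chunk: list) -> None:
        # Record metrics only; the chunk data itself is not retained.
        self.chunks_processed += 1
        self.rows_processed += len(chunk)

    def finalize(self) -> dict[str, int]:
        # Stand-in for building an ExecutionResult once the stream is exhausted.
        return {"chunks": self.chunks_processed, "rows": self.rows_processed}
```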
SyncExecutor
Bases: ExecutionStrategy
Synchronous execution strategy.
Uses ThreadPoolExecutor for concurrent LLM calls while keeping stage execution sequential. This is the default strategy and preserves the existing behavior.
Initialize synchronous executor.
Source code in ondine/orchestration/sync_executor.py
execute
Execute stages synchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stages | list[PipelineStage] | Pipeline stages | required |
| context | ExecutionContext | Execution context | required |
Returns:
| Type | Description |
|---|---|
| ExecutionResult | ExecutionResult with data and metrics |
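In the synchronous strategy, stages run one after another, but calls inside the LLM stage fan out across a thread pool. The sketch below shows that shape with the standard library's `ThreadPoolExecutor`, whose `map` returns results in input order.

`fake_complete`, `llm_stage` and `run_sync` are hypothetical, and the three hard-coded stages are purely illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_complete(prompt: str) -> str:
    """Hypothetical stand-in for a blocking LLM call."""
    time.sleep(0.01)  # simulate network latency; the GIL is released while sleeping
    return prompt.upper()


def llm_stage(prompts: list[str], max_workers: int = 4) -> list[str]:
    # Calls within this stage run concurrently; map() yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fake_complete, prompts))


def run_sync(prompts: list[str]) -> list[str]:
    # Stages run one after another; only the LLM stage uses threads.
    cleaned = [p.strip() for p in prompts]  # stage 1: preprocessing
    responses = llm_stage(cleaned)          # stage 2: concurrent LLM calls
    return [r + "!" for r in responses]     # stage 3: postprocessing


print(run_sync([" a ", "b"]))  # ['A!', 'B!']
```

Threads suit blocking client libraries. `AsyncExecutor` reaches similar concurrency on a single thread when async client methods are available.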