orchestration

Orchestration engine for pipeline execution control.

AsyncExecutor

AsyncExecutor(max_concurrency: int = 10)

Bases: ExecutionStrategy

Asynchronous execution strategy.

Uses asyncio for true non-blocking execution. Leverages LlamaIndex's async methods (acomplete) for concurrent LLM calls without threads.

Benefits:
- Non-blocking (works with FastAPI, aiohttp)
- Better resource utilization
- Higher concurrency without thread overhead
- Ideal for I/O-bound operations

Initialize async executor.

Parameters:

Name             Type  Description                     Default
max_concurrency  int   Maximum concurrent async tasks  10

Source code in ondine/orchestration/async_executor.py
def __init__(self, max_concurrency: int = 10):
    """
    Initialize async executor.

    Args:
        max_concurrency: Maximum concurrent async tasks
    """
    self.max_concurrency = max_concurrency
    self.logger = logger
    self.semaphore = asyncio.Semaphore(max_concurrency)
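
The semaphore created in the constructor suggests the usual asyncio pattern for capping in-flight tasks. The sketch below illustrates that pattern only; it is not the body of `_execute_stages_async`, which is not shown on this page. `run_bounded` and `fake_llm_call` are hypothetical names, and `fake_llm_call` stands in for an awaitable LLM call such as `acomplete`.

```python
import asyncio


async def run_bounded(items, worker, max_concurrency=10):
    """Run worker(item) for every item, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        # Each task waits here until one of the slots is free.
        async with semaphore:
            return await worker(item)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(guarded(item) for item in items))


async def fake_llm_call(prompt):
    # Hypothetical stand-in for an awaitable LLM call.
    await asyncio.sleep(0.01)
    return prompt.upper()


results = asyncio.run(run_bounded(["a", "b", "c"], fake_llm_call, max_concurrency=2))
print(results)  # ['A', 'B', 'C']
```

With `max_concurrency=2`, the third call starts only after one of the first two has released its slot, yet the result order still matches the input order.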

name property

name: str

Strategy name.

execute async

execute(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult

Execute stages asynchronously.

Parameters:

Name     Type                 Description        Default
stages   list[PipelineStage]  Pipeline stages    required
context  ExecutionContext     Execution context  required

Returns:

Type             Description
ExecutionResult  ExecutionResult with data and metrics

Source code in ondine/orchestration/async_executor.py
async def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult:
    """
    Execute stages asynchronously.

    Args:
        stages: Pipeline stages
        context: Execution context

    Returns:
        ExecutionResult with data and metrics
    """
    start_time = datetime.now()

    try:
        # Execute stages with async/await
        result_data = await self._execute_stages_async(stages, context)

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Calculate stats
        stats = ProcessingStats(
            total_rows=context.total_rows,
            processed_rows=context.last_processed_row + 1,
            failed_rows=context.total_rows - (context.last_processed_row + 1),
            skipped_rows=0,
            rows_per_second=context.total_rows / duration if duration > 0 else 0,
            total_duration_seconds=duration,
        )

        # Get cost estimate
        cost_estimate = CostEstimate(
            total_cost=context.accumulated_cost,
            total_tokens=context.accumulated_tokens,
            input_tokens=0,
            output_tokens=0,
            rows=context.total_rows,
            confidence="actual",
        )

        return ExecutionResult(
            data=result_data,
            metrics=stats,
            costs=cost_estimate,
            execution_id=context.session_id,
            start_time=start_time,
            end_time=end_time,
            success=True,
        )

    except Exception as e:
        self.logger.error(f"Async pipeline execution failed: {e}")
        raise
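
The statistics computed in `execute` reduce to a few lines of arithmetic. The sketch below reproduces only those formulas from the listing above, with a hypothetical helper name (`summarize`) and made-up input values.

```python
def summarize(total_rows, last_processed_row, duration_seconds):
    """Mirror the ProcessingStats arithmetic used in AsyncExecutor.execute."""
    processed = last_processed_row + 1      # last_processed_row is a 0-based index
    failed = total_rows - processed         # rows after the last processed index
    rate = total_rows / duration_seconds if duration_seconds > 0 else 0
    return processed, failed, rate


print(summarize(total_rows=100, last_processed_row=79, duration_seconds=20.0))
# (80, 20, 5.0)
```

Note that `execute` divides `total_rows` by the duration, whereas `ExecutionContext.get_stats` divides the processed row count, so the two can report different `rows_per_second` values for a partially completed run.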

supports_async

supports_async() -> bool

Async executor supports async.

Source code in ondine/orchestration/async_executor.py
def supports_async(self) -> bool:
    """Async executor supports async."""
    return True

supports_streaming

supports_streaming() -> bool

Async executor doesn't support streaming.

Source code in ondine/orchestration/async_executor.py
def supports_streaming(self) -> bool:
    """Async executor doesn't support streaming."""
    return False

execute_async async

execute_async(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult

Alias for execute() method.

Source code in ondine/orchestration/async_executor.py
async def execute_async(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult:
    """Alias for execute() method."""
    return await self.execute(stages, context)

ExecutionContext dataclass

ExecutionContext(session_id: UUID = uuid4(), pipeline_id: UUID = uuid4(), start_time: datetime = datetime.now(), end_time: datetime | None = None, current_stage_index: int = 0, last_processed_row: int = 0, total_rows: int = 0, accumulated_cost: Decimal = (lambda: Decimal('0.0'))(), accumulated_tokens: int = 0, intermediate_data: dict[str, Any] = dict(), failed_rows: int = 0, skipped_rows: int = 0, observers: list[ExecutionObserver] = list(), observer_dispatcher: Optional[ObserverDispatcher] = None, trace_id: str = (lambda: str(uuid4()))(), span_id: str = (lambda: str(uuid4()))())

Lightweight orchestration state (passed between pipeline stages).

Scope: Runtime execution state and progress tracking
Pattern: Memento (serializable for checkpointing)

Cost Tracking in ExecutionContext:
- Simple accumulation for orchestration purposes
- Used by: Executors to track overall progress
- NOT for: Detailed accounting (use CostTracker for that)

Why separate from CostTracker?
- ExecutionContext = orchestration state (stage progress, session ID, timing)
- CostTracker = detailed accounting (per-stage breakdowns, thread-safe entries, metrics)
- Different concerns, different use cases

ExecutionContext is:
- Passed between stages in the pipeline
- Serialized for checkpointing
- Focused on execution orchestration

CostTracker is:
- Used within LLMInvocationStage for detailed tracking
- Thread-safe for concurrent operations
- Focused on cost reporting and analytics

See Also:
- CostTracker: For detailed cost accounting with breakdowns
- docs/TECHNICAL_REFERENCE.md: Cost tracking architecture

Carries shared state between stages and tracks progress. Most fields are treated as immutable to prevent accidental modification; the progress and cost fields change through the update methods below.

update_stage

update_stage(stage_index: int) -> None

Update current stage.

Source code in ondine/orchestration/execution_context.py
def update_stage(self, stage_index: int) -> None:
    """Update current stage."""
    self.current_stage_index = stage_index

update_row

update_row(row_index: int) -> None

Update last processed row.

Source code in ondine/orchestration/execution_context.py
def update_row(self, row_index: int) -> None:
    """Update last processed row."""
    self.last_processed_row = row_index

add_cost

add_cost(cost: Decimal, tokens: int) -> None

Add cost and token usage.

Source code in ondine/orchestration/execution_context.py
def add_cost(self, cost: Decimal, tokens: int) -> None:
    """Add cost and token usage."""
    self.accumulated_cost += cost
    self.accumulated_tokens += tokens

notify_progress

notify_progress() -> None

Notify all observers of progress update.

Source code in ondine/orchestration/execution_context.py
def notify_progress(self) -> None:
    """Notify all observers of progress update."""
    for observer in self.observers:
        try:
            observer.on_progress_update(self)
        except Exception:  # nosec B110
            # Silently ignore observer errors to not break pipeline
            # Observers are non-critical, pipeline should continue even if they fail
            pass

get_progress

get_progress() -> float

Get completion percentage.

Source code in ondine/orchestration/execution_context.py
def get_progress(self) -> float:
    """Get completion percentage."""
    if self.total_rows == 0:
        return 0.0
    return (self.last_processed_row / self.total_rows) * 100
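
A quick worked example of the formula above, written as a free function so it runs on its own. The values are made up, and the remark about the final row assumes `last_processed_row` holds a 0-based index, as the comment in `get_stats` indicates.

```python
def get_progress(last_processed_row, total_rows):
    """Same arithmetic as ExecutionContext.get_progress."""
    if total_rows == 0:
        return 0.0
    return (last_processed_row / total_rows) * 100


print(get_progress(0, 0))       # 0.0  (empty input is guarded)
print(get_progress(250, 1000))  # 25.0
print(get_progress(3, 4))       # 75.0 (index 3 is the last of 4 rows)
```

Under that assumption the reported percentage stays below 100 even after the final row, because the index is used directly rather than the processed count (`last_processed_row + 1`).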

get_stats

get_stats() -> ProcessingStats

Get processing statistics.

Source code in ondine/orchestration/execution_context.py
def get_stats(self) -> ProcessingStats:
    """Get processing statistics."""
    duration = (
        (datetime.now() - self.start_time).total_seconds()
        if self.end_time is None
        else (self.end_time - self.start_time).total_seconds()
    )

    # last_processed_row is 0-based index, so add 1 for count
    actual_processed = (
        self.last_processed_row + 1 if self.last_processed_row >= 0 else 0
    )

    rows_per_second = actual_processed / duration if duration > 0 else 0.0

    return ProcessingStats(
        total_rows=self.total_rows,
        processed_rows=actual_processed,
        failed_rows=self.failed_rows,
        skipped_rows=self.skipped_rows,
        rows_per_second=rows_per_second,
        total_duration_seconds=duration,
    )

to_checkpoint

to_checkpoint() -> dict[str, Any]

Serialize to checkpoint dictionary (Memento pattern).

Returns:

Type            Description
dict[str, Any]  Dictionary representation for persistence

Source code in ondine/orchestration/execution_context.py
def to_checkpoint(self) -> dict[str, Any]:
    """
    Serialize to checkpoint dictionary (Memento pattern).

    Returns:
        Dictionary representation for persistence
    """
    return {
        "session_id": str(self.session_id),
        "pipeline_id": str(self.pipeline_id),
        "start_time": self.start_time.isoformat(),
        "end_time": self.end_time.isoformat() if self.end_time else None,
        "current_stage_index": self.current_stage_index,
        "last_processed_row": self.last_processed_row,
        "total_rows": self.total_rows,
        "accumulated_cost": str(self.accumulated_cost),
        "accumulated_tokens": self.accumulated_tokens,
        "intermediate_data": self.intermediate_data,
        "failed_rows": self.failed_rows,
        "skipped_rows": self.skipped_rows,
    }

from_checkpoint classmethod

from_checkpoint(data: dict[str, Any]) -> ExecutionContext

Deserialize from checkpoint dictionary.

Parameters:

Name  Type            Description      Default
data  dict[str, Any]  Checkpoint data  required

Returns:

Type              Description
ExecutionContext  Restored ExecutionContext

Source code in ondine/orchestration/execution_context.py
@classmethod
def from_checkpoint(cls, data: dict[str, Any]) -> "ExecutionContext":
    """
    Deserialize from checkpoint dictionary.

    Args:
        data: Checkpoint data

    Returns:
        Restored ExecutionContext
    """
    return cls(
        session_id=UUID(data["session_id"]),
        pipeline_id=UUID(data["pipeline_id"]),
        start_time=datetime.fromisoformat(data["start_time"]),
        end_time=(
            datetime.fromisoformat(data["end_time"])
            if data.get("end_time")
            else None
        ),
        current_stage_index=data["current_stage_index"],
        last_processed_row=data["last_processed_row"],
        total_rows=data["total_rows"],
        accumulated_cost=Decimal(data["accumulated_cost"]),
        accumulated_tokens=data["accumulated_tokens"],
        intermediate_data=data.get("intermediate_data", {}),
        failed_rows=data.get("failed_rows", 0),
        skipped_rows=data.get("skipped_rows", 0),
    )
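
The round trip through `to_checkpoint` and `from_checkpoint` can be illustrated with a trimmed-down stand-in. `MiniContext` below is hypothetical and carries only four of the real fields; it uses the same conversions as the listings above (UUID and Decimal to `str`, datetime to ISO 8601), which is what makes the dictionary JSON-safe.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from uuid import UUID, uuid4


@dataclass
class MiniContext:
    """Hypothetical, reduced stand-in for ExecutionContext."""

    session_id: UUID = field(default_factory=uuid4)
    start_time: datetime = field(default_factory=datetime.now)
    last_processed_row: int = 0
    accumulated_cost: Decimal = field(default_factory=lambda: Decimal("0.0"))

    def to_checkpoint(self):
        # Convert non-JSON types to strings, as the real method does.
        return {
            "session_id": str(self.session_id),
            "start_time": self.start_time.isoformat(),
            "last_processed_row": self.last_processed_row,
            "accumulated_cost": str(self.accumulated_cost),
        }

    @classmethod
    def from_checkpoint(cls, data):
        # Parse the strings back into their original types.
        return cls(
            session_id=UUID(data["session_id"]),
            start_time=datetime.fromisoformat(data["start_time"]),
            last_processed_row=data["last_processed_row"],
            accumulated_cost=Decimal(data["accumulated_cost"]),
        )


original = MiniContext(last_processed_row=499, accumulated_cost=Decimal("1.2345"))
payload = json.dumps(original.to_checkpoint())       # safe to write to disk
restored = MiniContext.from_checkpoint(json.loads(payload))
print(restored == original)  # True
```

In the real class, `to_checkpoint` does not include `observers`, `observer_dispatcher`, `trace_id`, or `span_id`, and `from_checkpoint` does not pass them, so a restored context receives the defaults for those fields.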

to_dict

to_dict() -> dict[str, Any]

Alias for to_checkpoint().

Source code in ondine/orchestration/execution_context.py
def to_dict(self) -> dict[str, Any]:
    """Alias for to_checkpoint()."""
    return self.to_checkpoint()

from_dict classmethod

from_dict(data: dict[str, Any]) -> ExecutionContext

Alias for from_checkpoint().

Source code in ondine/orchestration/execution_context.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionContext":
    """Alias for from_checkpoint()."""
    return cls.from_checkpoint(data)

ExecutionStrategy

Bases: ABC

Abstract base for execution strategies.

Follows Strategy pattern: defines interface for executing pipeline stages in different modes (sync, async, streaming).

name abstractmethod property

name: str

Strategy name for logging.

execute abstractmethod

execute(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult | Iterator[pd.DataFrame] | AsyncIterator[pd.DataFrame]

Execute pipeline stages.

Parameters:

Name     Type                 Description                             Default
stages   list[PipelineStage]  List of pipeline stages to execute      required
context  ExecutionContext     Execution context for state management  required

Returns:

Type                                                              Description
ExecutionResult | Iterator[DataFrame] | AsyncIterator[DataFrame]  ExecutionResult or iterator for streaming

Source code in ondine/orchestration/execution_strategy.py
@abstractmethod
def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult | Iterator[pd.DataFrame] | AsyncIterator[pd.DataFrame]:
    """
    Execute pipeline stages.

    Args:
        stages: List of pipeline stages to execute
        context: Execution context for state management

    Returns:
        ExecutionResult or iterator for streaming
    """
    pass

supports_async abstractmethod

supports_async() -> bool

Whether this strategy supports async execution.

Source code in ondine/orchestration/execution_strategy.py
@abstractmethod
def supports_async(self) -> bool:
    """Whether this strategy supports async execution."""
    pass

supports_streaming abstractmethod

supports_streaming() -> bool

Whether this strategy supports streaming.

Source code in ondine/orchestration/execution_strategy.py
@abstractmethod
def supports_streaming(self) -> bool:
    """Whether this strategy supports streaming."""
    pass
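
To make the Strategy interface concrete, here is a minimal sketch of a subclass. `Strategy` is a hypothetical stand-in that mirrors the four abstract members listed above, and `SequentialStrategy` is an illustrative implementation, not a class from the library. Stages are plain callables and the context is a plain dict here, which is a simplification of `PipelineStage` and `ExecutionContext`.

```python
from abc import ABC, abstractmethod


class Strategy(ABC):
    """Hypothetical stand-in mirroring the ExecutionStrategy interface."""

    @property
    @abstractmethod
    def name(self): ...

    @abstractmethod
    def execute(self, stages, context): ...

    @abstractmethod
    def supports_async(self): ...

    @abstractmethod
    def supports_streaming(self): ...


class SequentialStrategy(Strategy):
    """Illustrative strategy: run stages in order, feeding each output forward."""

    @property
    def name(self):
        return "sequential"

    def execute(self, stages, context):
        value = context["value"]
        for stage in stages:
            value = stage(value)
        return value

    def supports_async(self):
        return False

    def supports_streaming(self):
        return False


strategy = SequentialStrategy()
print(strategy.name, strategy.execute([lambda x: x + 1, lambda x: x * 2], {"value": 3}))
# sequential 8
```

Because every member of the base class is abstract, a subclass that omits any of them cannot be instantiated; Python raises `TypeError` at construction time.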

CostTrackingObserver

CostTrackingObserver(warning_threshold: float = 0.75)

Bases: ExecutionObserver

Observer that tracks and warns about costs.

Initialize cost tracking observer.

Parameters:

Name               Type   Description                             Default
warning_threshold  float  Warn when this fraction of budget used  0.75

Source code in ondine/orchestration/observers.py
def __init__(self, warning_threshold: float = 0.75):
    """
    Initialize cost tracking observer.

    Args:
        warning_threshold: Warn when this fraction of budget used
    """
    self.logger = get_logger(__name__)
    self.warning_threshold = warning_threshold
    self.max_budget: float | None = None

on_pipeline_start

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Set max budget if available.

Source code in ondine/orchestration/observers.py
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Set max budget if available."""
    # Could extract from pipeline specs
    pass

on_stage_start

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

No action on stage start.

Source code in ondine/orchestration/observers.py
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """No action on stage start."""
    pass

on_stage_complete

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Check cost after stage completion.

Source code in ondine/orchestration/observers.py
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Check cost after stage completion."""
    if self.max_budget:
        usage_ratio = float(context.accumulated_cost) / self.max_budget

        if usage_ratio >= self.warning_threshold:
            self.logger.warning(
                f"Cost warning: {usage_ratio * 100:.1f}% of budget used "
                f"(${context.accumulated_cost:.4f} / ${self.max_budget:.2f})"
            )
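
The threshold check can be exercised in isolation. The function below is a hypothetical extraction of the same arithmetic and message format; it returns the message instead of logging it, and the budget and cost values are made up.

```python
from decimal import Decimal


def budget_warning(accumulated_cost, max_budget, warning_threshold=0.75):
    """Return the warning text, or None if no warning is due."""
    if not max_budget:
        return None  # no budget configured, so nothing to compare against
    usage_ratio = float(accumulated_cost) / max_budget
    if usage_ratio >= warning_threshold:
        return (
            f"Cost warning: {usage_ratio * 100:.1f}% of budget used "
            f"(${accumulated_cost:.4f} / ${max_budget:.2f})"
        )
    return None


print(budget_warning(Decimal("8.00"), 10.0))
# Cost warning: 80.0% of budget used ($8.0000 / $10.00)
print(budget_warning(Decimal("5.00"), 10.0))
# None
```

In the listings shown on this page, `on_pipeline_start` is a no-op and the constructor sets `max_budget` to `None`, so the warning appears to fire only if `max_budget` is assigned elsewhere.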

on_stage_error

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

No action on error.

Source code in ondine/orchestration/observers.py
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """No action on error."""
    pass

on_pipeline_complete

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Log final cost summary.

Source code in ondine/orchestration/observers.py
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Log final cost summary."""
    self.logger.info(
        f"Cost summary:\n"
        f"  Total: ${result.costs.total_cost:.4f}\n"
        f"  Input tokens: {result.costs.input_tokens:,}\n"
        f"  Output tokens: {result.costs.output_tokens:,}\n"
        f"  Cost per row: ${float(result.costs.total_cost) / result.metrics.total_rows:.6f}"
    )

on_pipeline_error

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Log cost at failure.

Source code in ondine/orchestration/observers.py
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Log cost at failure."""
    self.logger.info(f"Cost at failure: ${context.accumulated_cost:.4f}")

on_progress_update

on_progress_update(context: ExecutionContext) -> None

No action on progress update for cost tracking.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """No action on progress update for cost tracking."""
    pass

ExecutionObserver

Bases: ABC

Abstract base for execution observers.

Observers can monitor pipeline execution without coupling to the executor implementation.

on_pipeline_start abstractmethod

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Called before first stage execution.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Called before first stage execution."""
    pass

on_stage_start abstractmethod

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

Called before each stage.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """Called before each stage."""
    pass

on_stage_complete abstractmethod

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Called after successful stage completion.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Called after successful stage completion."""
    pass

on_stage_error abstractmethod

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

Called on stage failure.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """Called on stage failure."""
    pass

on_pipeline_complete abstractmethod

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Called after all stages complete.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Called after all stages complete."""
    pass

on_pipeline_error abstractmethod

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Called on fatal pipeline failure.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Called on fatal pipeline failure."""
    pass

on_progress_update

on_progress_update(context: ExecutionContext) -> None

Called periodically during execution for progress updates.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """Called periodically during execution for progress updates."""
    pass
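
A sketch of how an observer receives stage events. This page does not show how the executors dispatch to observers, so `run_stage` below is a hypothetical driver written only to illustrate the callback order; `RecordingObserver` is likewise hypothetical, and a real observer would subclass `ExecutionObserver` and implement all of its abstract methods.

```python
class RecordingObserver:
    """Hypothetical observer that records which callbacks fired."""

    def __init__(self):
        self.events = []

    def on_stage_start(self, stage, context):
        self.events.append(("start", stage))

    def on_stage_complete(self, stage, context, result):
        self.events.append(("complete", stage))

    def on_stage_error(self, stage, context, error):
        self.events.append(("error", stage))


def run_stage(name, func, context, observers):
    """Hypothetical driver: notify observers around a single stage call."""
    for observer in observers:
        observer.on_stage_start(name, context)
    try:
        result = func(context)
    except Exception as error:
        for observer in observers:
            observer.on_stage_error(name, context, error)
        raise
    for observer in observers:
        observer.on_stage_complete(name, context, result)
    return result


observer = RecordingObserver()
run_stage("load", lambda ctx: 42, {}, [observer])
print(observer.events)  # [('start', 'load'), ('complete', 'load')]
```

The observer never touches the stage logic itself, which is the decoupling the class description refers to.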

LoggingObserver

LoggingObserver()

Bases: ExecutionObserver

Observer that logs execution events.

Initialize logging observer.

Source code in ondine/orchestration/observers.py
def __init__(self):
    """Initialize logging observer."""
    self.logger = get_logger(__name__)

on_pipeline_start

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Log pipeline start.

Source code in ondine/orchestration/observers.py
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Log pipeline start."""
    self.logger.info(f"Pipeline execution started (session: {context.session_id})")

on_stage_start

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

Log stage start.

Source code in ondine/orchestration/observers.py
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """Log stage start."""
    self.logger.info(
        f"Starting stage: {stage.name} (progress: {context.get_progress():.1f}%)"
    )

on_stage_complete

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Log stage completion.

Source code in ondine/orchestration/observers.py
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Log stage completion."""
    self.logger.info(
        f"Completed stage: {stage.name} (cost: ${context.accumulated_cost:.4f})"
    )

on_stage_error

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

Log stage error.

Source code in ondine/orchestration/observers.py
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """Log stage error."""
    self.logger.error(f"Stage {stage.name} failed: {error}")

on_pipeline_complete

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Log pipeline completion.

Source code in ondine/orchestration/observers.py
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Log pipeline completion."""
    self.logger.info(
        f"Pipeline execution completed successfully\n"
        f"  Processed: {result.metrics.processed_rows} rows\n"
        f"  Duration: {result.metrics.total_duration_seconds:.2f}s\n"
        f"  Total cost: ${result.costs.total_cost:.4f}\n"
        f"  Errors: {result.metrics.failed_rows}"
    )

on_pipeline_error

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Log pipeline error.

Source code in ondine/orchestration/observers.py
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Log pipeline error."""
    self.logger.error(f"Pipeline execution failed: {error}")

on_progress_update

on_progress_update(context: ExecutionContext) -> None

Log progress update.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """Log progress update."""
    # Make progress very visible with separators
    self.logger.info(
        f"━━━━━━ PROGRESS: {context.last_processed_row}/{context.total_rows} rows "
        f"({context.get_progress():.1f}%) | Cost: ${context.accumulated_cost:.4f} ━━━━━━"
    )

ProgressBarObserver

ProgressBarObserver()

Bases: ExecutionObserver

Observer that displays progress bar with tqdm.

Initialize progress bar observer.

Source code in ondine/orchestration/observers.py
def __init__(self):
    """Initialize progress bar observer."""
    self.progress_bar: tqdm | None = None

on_pipeline_start

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Initialize progress bar.

Source code in ondine/orchestration/observers.py
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Initialize progress bar."""
    if context.total_rows > 0:
        self.progress_bar = tqdm(
            total=context.total_rows,
            desc="Processing",
            unit="rows",
        )

on_stage_start

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

Update progress bar description.

Source code in ondine/orchestration/observers.py
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """Update progress bar description."""
    if self.progress_bar:
        self.progress_bar.set_description(f"Stage: {stage.name}")

on_stage_complete

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Update progress bar.

Source code in ondine/orchestration/observers.py
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Update progress bar."""
    if self.progress_bar:
        self.progress_bar.n = context.last_processed_row
        self.progress_bar.refresh()

on_stage_error

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

Handle error in progress bar.

Source code in ondine/orchestration/observers.py
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """Handle error in progress bar."""
    if self.progress_bar:
        self.progress_bar.set_description(f"Error in {stage.name}")

on_pipeline_complete

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Close progress bar.

Source code in ondine/orchestration/observers.py
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Close progress bar."""
    if self.progress_bar:
        self.progress_bar.close()
        self.progress_bar = None

on_pipeline_error

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Close progress bar on error.

Source code in ondine/orchestration/observers.py
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Close progress bar on error."""
    if self.progress_bar:
        self.progress_bar.close()
        self.progress_bar = None

on_progress_update

on_progress_update(context: ExecutionContext) -> None

Update progress bar with current row count.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """Update progress bar with current row count."""
    if self.progress_bar:
        self.progress_bar.n = context.last_processed_row
        self.progress_bar.set_postfix(
            {
                "cost": f"${context.accumulated_cost:.4f}",
                "progress": f"{context.get_progress():.1f}%",
            }
        )
        self.progress_bar.refresh()

StateManager

StateManager(storage: CheckpointStorage, checkpoint_interval: int = 500)

Manages execution state persistence and recovery.

Follows Single Responsibility: only handles state management. Uses Strategy pattern for pluggable storage backends.

Initialize state manager.

Parameters:

Name                 Type               Description                 Default
storage              CheckpointStorage  Checkpoint storage backend  required
checkpoint_interval  int                Rows between checkpoints    500

Source code in ondine/orchestration/state_manager.py
def __init__(self, storage: CheckpointStorage, checkpoint_interval: int = 500):
    """
    Initialize state manager.

    Args:
        storage: Checkpoint storage backend
        checkpoint_interval: Rows between checkpoints
    """
    self.storage = storage
    self.checkpoint_interval = checkpoint_interval
    self._last_checkpoint_row = 0

should_checkpoint

should_checkpoint(current_row: int) -> bool

Check if checkpoint should be saved.

Parameters:

Name         Type  Description        Default
current_row  int   Current row index  required

Returns:

Type  Description
bool  True if checkpoint due

Source code in ondine/orchestration/state_manager.py
def should_checkpoint(self, current_row: int) -> bool:
    """
    Check if checkpoint should be saved.

    Args:
        current_row: Current row index

    Returns:
        True if checkpoint due
    """
    return (current_row - self._last_checkpoint_row) >= self.checkpoint_interval
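
The interval check is easiest to see with numbers. `IntervalTracker` below is a hypothetical reduction of `StateManager` that keeps only the counter logic; `mark_saved` stands in for the assignment that `save_checkpoint` performs after a successful save.

```python
class IntervalTracker:
    """Hypothetical reduction of StateManager's interval bookkeeping."""

    def __init__(self, checkpoint_interval=500):
        self.checkpoint_interval = checkpoint_interval
        self._last_checkpoint_row = 0

    def should_checkpoint(self, current_row):
        return (current_row - self._last_checkpoint_row) >= self.checkpoint_interval

    def mark_saved(self, row):
        # Stand-in for what save_checkpoint does on success.
        self._last_checkpoint_row = row


tracker = IntervalTracker(checkpoint_interval=500)
saved_at = []
for row in range(0, 1600, 100):
    if tracker.should_checkpoint(row):
        tracker.mark_saved(row)
        saved_at.append(row)

print(saved_at)  # [500, 1000, 1500]
```

The interval is measured from the last successful save, so a failed save leaves `_last_checkpoint_row` unchanged and the next call to `should_checkpoint` still returns `True`.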

save_checkpoint

save_checkpoint(context: ExecutionContext) -> bool

Save checkpoint for execution context.

Parameters:

Name     Type              Description                Default
context  ExecutionContext  Execution context to save  required

Returns:

Type  Description
bool  True if successful

Source code in ondine/orchestration/state_manager.py
def save_checkpoint(self, context: ExecutionContext) -> bool:
    """
    Save checkpoint for execution context.

    Args:
        context: Execution context to save

    Returns:
        True if successful
    """
    try:
        checkpoint_data = context.to_checkpoint()
        success = self.storage.save(context.session_id, checkpoint_data)

        if success:
            self._last_checkpoint_row = context.last_processed_row
            logger.info(f"Checkpoint saved at row {context.last_processed_row}")

        return success
    except Exception as e:
        logger.error(f"Failed to save checkpoint: {e}")
        return False

load_checkpoint

load_checkpoint(session_id: UUID) -> ExecutionContext | None

Load checkpoint for session.

Parameters:

Name        Type  Description         Default
session_id  UUID  Session identifier  required

Returns:

Type                     Description
ExecutionContext | None  Restored execution context or None

Source code in ondine/orchestration/state_manager.py
def load_checkpoint(self, session_id: UUID) -> ExecutionContext | None:
    """
    Load checkpoint for session.

    Args:
        session_id: Session identifier

    Returns:
        Restored execution context or None
    """
    try:
        checkpoint_data = self.storage.load(session_id)

        if checkpoint_data is None:
            return None

        context = ExecutionContext.from_checkpoint(checkpoint_data)
        logger.info(f"Checkpoint loaded from row {context.last_processed_row}")

        return context
    except Exception as e:
        logger.error(f"Failed to load checkpoint: {e}")
        return None

can_resume

can_resume(session_id: UUID) -> bool

Check if session can be resumed.

Parameters:

Name        Type  Description         Default
session_id  UUID  Session identifier  required

Returns:

Type  Description
bool  True if checkpoint exists

Source code in ondine/orchestration/state_manager.py
def can_resume(self, session_id: UUID) -> bool:
    """
    Check if session can be resumed.

    Args:
        session_id: Session identifier

    Returns:
        True if checkpoint exists
    """
    return self.storage.exists(session_id)

cleanup_checkpoints

cleanup_checkpoints(session_id: UUID) -> bool

Delete checkpoints for session.

Parameters:

Name        Type  Description         Default
session_id  UUID  Session identifier  required

Returns:

Type  Description
bool  True if deleted

Source code in ondine/orchestration/state_manager.py
def cleanup_checkpoints(self, session_id: UUID) -> bool:
    """
    Delete checkpoints for session.

    Args:
        session_id: Session identifier

    Returns:
        True if deleted
    """
    try:
        success = self.storage.delete(session_id)
        if success:
            logger.info(f"Checkpoints cleaned up for session {session_id}")
        return success
    except Exception as e:
        logger.error(f"Failed to cleanup checkpoints: {e}")
        return False

list_checkpoints

list_checkpoints() -> list[CheckpointInfo]

List all available checkpoints.

Returns:

Type                  Description
list[CheckpointInfo]  List of checkpoint information

Source code in ondine/orchestration/state_manager.py
def list_checkpoints(self) -> list[CheckpointInfo]:
    """
    List all available checkpoints.

    Returns:
        List of checkpoint information
    """
    return self.storage.list_checkpoints()
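
The `StateManager` listings call five methods on the storage backend: `save`, `load`, `exists`, `delete`, and `list_checkpoints`. A minimal in-memory backend with those methods might look like the sketch below. `InMemoryStorage` is hypothetical; the actual `CheckpointStorage` interface is not shown on this page, and the real `list_checkpoints` returns `CheckpointInfo` objects rather than the bare session IDs used here.

```python
from uuid import uuid4


class InMemoryStorage:
    """Hypothetical backend exposing the methods StateManager calls."""

    def __init__(self):
        self._store = {}

    def save(self, session_id, data):
        self._store[session_id] = dict(data)  # copy to avoid shared mutation
        return True

    def load(self, session_id):
        return self._store.get(session_id)    # None when nothing is stored

    def exists(self, session_id):
        return session_id in self._store

    def delete(self, session_id):
        return self._store.pop(session_id, None) is not None

    def list_checkpoints(self):
        # Simplification: session IDs instead of CheckpointInfo objects.
        return list(self._store)


storage = InMemoryStorage()
session_id = uuid4()
storage.save(session_id, {"last_processed_row": 499})
print(storage.exists(session_id))                    # True
print(storage.load(session_id))                      # {'last_processed_row': 499}
print(storage.delete(session_id), storage.exists(session_id))  # True False
```

A backend like this is only useful for tests, since its contents do not survive the process, which defeats the purpose of resuming after a crash.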

StreamingExecutor

StreamingExecutor(chunk_size: int = 1000)

Bases: ExecutionStrategy

Streaming execution strategy.

Processes data in chunks to maintain constant memory usage. Ideal for very large datasets (100K+ rows) that don't fit in memory.

Benefits:
- Constant memory footprint
- Can process unlimited dataset sizes
- Checkpoints at chunk boundaries
- Early results available

Initialize streaming executor.

Parameters:

Name        Type  Description               Default
chunk_size  int   Number of rows per chunk  1000

Source code in ondine/orchestration/streaming_executor.py
def __init__(self, chunk_size: int = 1000):
    """
    Initialize streaming executor.

    Args:
        chunk_size: Number of rows per chunk
    """
    self.chunk_size = chunk_size
    self.logger = logger

name property

name: str

Strategy name.

execute

execute(stages: list[PipelineStage], context: ExecutionContext) -> Iterator[pd.DataFrame]

Execute stages in streaming mode.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `list[PipelineStage]` | Pipeline stages | required |
| `context` | `ExecutionContext` | Execution context | required |

Yields:

| Type | Description |
| --- | --- |
| `DataFrame` | DataFrames with processed chunks |

Source code in ondine/orchestration/streaming_executor.py
def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> Iterator[pd.DataFrame]:
    """
    Execute stages in streaming mode.

    Args:
        stages: Pipeline stages
        context: Execution context

    Yields:
        DataFrames with processed chunks
    """
    self.logger.info(f"Starting streaming execution (chunk_size={self.chunk_size})")

    # Get data loader stage
    data_loader = stages[0]

    # Stream data in chunks
    chunk_index = 0
    total_rows_processed = 0

    # Read data in chunks
    for chunk in self._read_chunks(data_loader, context):
        self.logger.info(f"Processing chunk {chunk_index} ({len(chunk)} rows)")

        # Process chunk through remaining stages
        result_chunk = self._process_chunk(chunk, stages[1:], context)

        # Update context
        total_rows_processed += len(result_chunk)
        context.update_row(total_rows_processed - 1)

        # Yield result
        yield result_chunk

        chunk_index += 1

    self.logger.info(
        f"Streaming execution complete: {total_rows_processed} rows, "
        f"{chunk_index} chunks"
    )
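
Because `execute` contains `yield`, calling it only creates a generator; none of the body, including the first log line, runs until the caller starts iterating. The sketch below demonstrates that behavior with a hypothetical stand-in function of the same shape (the `log` list and the doubling step are invented for the example).

```python
from collections.abc import Iterable, Iterator


def execute(chunks: Iterable[list[int]], log: list[str]) -> Iterator[list[int]]:
    """Hypothetical stand-in with the same generator shape as the method above."""
    log.append("start")
    for index, chunk in enumerate(chunks):
        log.append(f"chunk {index}")
        yield [value * 2 for value in chunk]
    log.append("complete")


log: list[str] = []
stream = execute([[1, 2], [3]], log)
log_before_iteration = list(log)   # still empty: the body has not started

first = next(stream)               # runs up to the first yield
log_after_first = list(log)

rest = list(stream)                # drains the remaining chunks
```

One practical consequence: if the caller stops iterating early, the chunks that were never requested are never processed, and the final "complete" log line is not emitted.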

supports_async

supports_async() -> bool

Streaming executor doesn't support async.

Source code in ondine/orchestration/streaming_executor.py
def supports_async(self) -> bool:
    """Streaming executor doesn't support async."""
    return False

supports_streaming

supports_streaming() -> bool

Streaming executor supports streaming.

Source code in ondine/orchestration/streaming_executor.py
def supports_streaming(self) -> bool:
    """Streaming executor supports streaming."""
    return True

execute_stream

execute_stream(stages: list[PipelineStage], context: ExecutionContext) -> Iterator[pd.DataFrame]

Alias for execute() method.

Source code in ondine/orchestration/streaming_executor.py
def execute_stream(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> Iterator[pd.DataFrame]:
    """Alias for execute() method."""
    return self.execute(stages, context)

StreamingResult

StreamingResult()

Result container for streaming execution.

Provides access to metrics after consuming the stream.

Initialize streaming result.

Source code in ondine/orchestration/streaming_executor.py
def __init__(self):
    """Initialize streaming result."""
    self.chunks_processed = 0
    self.total_rows = 0
    self.total_cost = Decimal("0.0")
    self.start_time = datetime.now()
    self.end_time = None

add_chunk

add_chunk(chunk: DataFrame, cost: Decimal)

Add chunk statistics.

Source code in ondine/orchestration/streaming_executor.py
def add_chunk(self, chunk: pd.DataFrame, cost: Decimal):
    """Add chunk statistics."""
    self.chunks_processed += 1
    self.total_rows += len(chunk)
    self.total_cost += cost

finalize

finalize() -> ExecutionResult

Create final ExecutionResult.

Source code in ondine/orchestration/streaming_executor.py
def finalize(self) -> ExecutionResult:
    """Create final ExecutionResult."""
    self.end_time = datetime.now()
    duration = (self.end_time - self.start_time).total_seconds()

    stats = ProcessingStats(
        total_rows=self.total_rows,
        processed_rows=self.total_rows,
        failed_rows=0,
        skipped_rows=0,
        rows_per_second=self.total_rows / duration if duration > 0 else 0,
        total_duration_seconds=duration,
    )

    costs = CostEstimate(
        total_cost=self.total_cost,
        total_tokens=0,
        input_tokens=0,
        output_tokens=0,
        rows=self.total_rows,
        confidence="actual",
    )

    return ExecutionResult(
        data=pd.DataFrame(),  # Streaming doesn't return full data
        metrics=stats,
        costs=costs,
        start_time=self.start_time,
        end_time=self.end_time,
        success=True,
    )
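
A caller would typically feed each yielded chunk into the result container and finalize it once the stream is exhausted. The sketch below mirrors only the counters from the listings above; `ChunkTally` is a hypothetical stand-in (so the snippet runs without ondine or pandas), and the per-chunk costs are made-up values.

```python
from datetime import datetime
from decimal import Decimal


class ChunkTally:
    """Hypothetical stand-in mirroring the counters kept by StreamingResult."""

    def __init__(self) -> None:
        self.chunks_processed = 0
        self.total_rows = 0
        self.total_cost = Decimal("0.0")
        self.start_time = datetime.now()

    def add_chunk(self, chunk, cost: Decimal) -> None:
        # len() works for lists here and for DataFrames in the real class.
        self.chunks_processed += 1
        self.total_rows += len(chunk)
        self.total_cost += cost

    def rows_per_second(self) -> float:
        duration = (datetime.now() - self.start_time).total_seconds()
        # Same guard as finalize(): avoid dividing by a zero duration.
        return self.total_rows / duration if duration > 0 else 0


tally = ChunkTally()
for chunk, cost in [(["a", "b", "c"], Decimal("0.002")), (["d"], Decimal("0.001"))]:
    tally.add_chunk(chunk, cost)
```

Using `Decimal` for the running cost avoids the rounding drift that repeated float addition would introduce. Note that, per the listing above, `finalize()` reports zero failed rows and zero token counts, and returns an empty DataFrame rather than the processed data.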

SyncExecutor

SyncExecutor()

Bases: ExecutionStrategy

Synchronous execution strategy.

Uses ThreadPoolExecutor to run LLM calls concurrently while the pipeline stages themselves execute sequentially. This is the default strategy and preserves the existing behavior.
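
The combination of sequential stages and concurrent calls can be sketched as follows. This is an illustration only: `fake_llm_call`, `run_stage_concurrently`, and `run_pipeline` are hypothetical names, and the "LLM call" is a trivial string transformation.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_llm_call(prompt: str) -> str:
    """Stand-in for a blocking, I/O-bound LLM request."""
    return prompt.upper()


def run_stage_concurrently(prompts: list[str], max_workers: int = 4) -> list[str]:
    """Fan the calls of one stage out to a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, regardless of completion order.
        return list(pool.map(fake_llm_call, prompts))


def run_pipeline(prompts: list[str]) -> list[str]:
    # Stage 1 finishes completely before stage 2 begins.
    cleaned = [prompt.strip() for prompt in prompts]
    # Stage 2 issues its calls concurrently.
    return run_stage_concurrently(cleaned)


results = run_pipeline([" a ", "b ", " c"])
```

Threads suit this workload because the calls spend most of their time waiting on network I/O, during which Python releases the global interpreter lock.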

Initialize synchronous executor.

Source code in ondine/orchestration/sync_executor.py
def __init__(self):
    """Initialize synchronous executor."""
    self.logger = logger

name property

name: str

Strategy name.

execute

execute(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult

Execute stages synchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `list[PipelineStage]` | Pipeline stages | required |
| `context` | `ExecutionContext` | Execution context | required |

Returns:

| Type | Description |
| --- | --- |
| `ExecutionResult` | ExecutionResult with data and metrics |

Source code in ondine/orchestration/sync_executor.py
def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult:
    """
    Execute stages synchronously.

    Args:
        stages: Pipeline stages
        context: Execution context

    Returns:
        ExecutionResult with data and metrics
    """
    start_time = datetime.now()

    try:
        # Execute stages sequentially
        result_data = self._execute_stages(stages, context)

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Calculate stats
        stats = ProcessingStats(
            total_rows=context.total_rows,
            processed_rows=context.last_processed_row + 1,
            failed_rows=context.total_rows - (context.last_processed_row + 1),
            skipped_rows=0,
            rows_per_second=context.total_rows / duration if duration > 0 else 0,
            total_duration_seconds=duration,
        )

        # Get cost estimate
        cost_estimate = CostEstimate(
            total_cost=context.accumulated_cost,
            total_tokens=context.accumulated_tokens,
            input_tokens=0,  # Would need to track separately
            output_tokens=0,
            rows=context.total_rows,
            confidence="actual",
        )

        return ExecutionResult(
            data=result_data,
            metrics=stats,
            costs=cost_estimate,
            execution_id=context.session_id,
            start_time=start_time,
            end_time=end_time,
            success=True,
        )

    except Exception as e:
        self.logger.error(f"Pipeline execution failed: {e}")
        raise
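
The statistics in the listing above reduce to a few lines of arithmetic. The sketch below reproduces them in a hypothetical helper, `summarize`, assuming `last_processed_row` is a zero-based index (which is consistent with the streaming executor passing `total_rows_processed - 1` to `context.update_row`).

```python
def summarize(total_rows: int, last_processed_row: int, duration: float) -> dict:
    """Reproduce the ProcessingStats arithmetic from the listing above."""
    processed_rows = last_processed_row + 1      # zero-based index -> count
    failed_rows = total_rows - processed_rows    # everything not reached
    # As in the listing, throughput is based on total_rows, not processed_rows.
    rows_per_second = total_rows / duration if duration > 0 else 0
    return {
        "processed_rows": processed_rows,
        "failed_rows": failed_rows,
        "rows_per_second": rows_per_second,
    }


summary = summarize(total_rows=100, last_processed_row=79, duration=4.0)
instant = summarize(total_rows=10, last_processed_row=9, duration=0.0)
```

With 100 rows in total and the last processed index at 79, 80 rows count as processed and 20 as failed. The zero-duration guard keeps a very fast run from raising `ZeroDivisionError`.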

supports_async

supports_async() -> bool

Sync executor doesn't support async.

Source code in ondine/orchestration/sync_executor.py
def supports_async(self) -> bool:
    """Sync executor doesn't support async."""
    return False

supports_streaming

supports_streaming() -> bool

Sync executor doesn't support streaming.

Source code in ondine/orchestration/sync_executor.py
def supports_streaming(self) -> bool:
    """Sync executor doesn't support streaming."""
    return False
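
The two capability methods let calling code choose how to drive a strategy without checking its concrete type. The sketch below shows one way a caller might branch on them; `SyncLike`, `StreamingLike`, and `pick_mode` are hypothetical stand-ins whose return values copy the flags documented on this page.

```python
class SyncLike:
    """Stand-in with the flags documented for SyncExecutor."""

    def supports_async(self) -> bool:
        return False

    def supports_streaming(self) -> bool:
        return False


class StreamingLike:
    """Stand-in with the flags documented for StreamingExecutor."""

    def supports_async(self) -> bool:
        return False

    def supports_streaming(self) -> bool:
        return True


def pick_mode(strategy) -> str:
    """Decide how to call a strategy based on its advertised capabilities."""
    if strategy.supports_streaming():
        return "stream"   # iterate over the chunks that execute() yields
    if strategy.supports_async():
        return "async"    # await execute()
    return "sync"         # call execute() and use the returned result


modes = [pick_mode(SyncLike()), pick_mode(StreamingLike())]
```

Checking capabilities rather than types means a new strategy only has to report its flags correctly to be handled by existing callers.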