async_executor

Asynchronous execution strategy.

Provides async/await support for non-blocking execution, ideal for integration with FastAPI, aiohttp, and other async frameworks.

AsyncExecutor

AsyncExecutor(max_concurrency: int = 10)

Bases: ExecutionStrategy

Asynchronous execution strategy.

Uses asyncio for true non-blocking execution. Leverages LlamaIndex's async methods (acomplete) for concurrent LLM calls without threads.

Benefits:

- Non-blocking (works with FastAPI, aiohttp)
- Better resource utilization
- Higher concurrency without thread overhead
- Ideal for I/O-bound operations

Initialize async executor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `max_concurrency` | `int` | Maximum concurrent async tasks | `10` |
Source code in ondine/orchestration/async_executor.py
def __init__(self, max_concurrency: int = 10):
    """
    Initialize async executor.

    Args:
        max_concurrency: Maximum concurrent async tasks
    """
    self.max_concurrency = max_concurrency
    self.logger = logger
    self.semaphore = asyncio.Semaphore(max_concurrency)
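The semaphore created here caps how many coroutines run at once. A minimal, self-contained sketch of that pattern (the names `bounded_gather` and `fake_llm_call` are illustrative stand-ins, not part of ondine's API):

```python
import asyncio


async def bounded_gather(coros, max_concurrency=10):
    # Same idea as AsyncExecutor: a semaphore limits how many
    # coroutines are awaited concurrently.
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(coro):
        async with sem:
            return await coro

    # gather preserves the input order in its results.
    return await asyncio.gather(*(guarded(c) for c in coros))


async def fake_llm_call(i):
    # Stand-in for an awaited LLM call such as acomplete().
    await asyncio.sleep(0)
    return i * 2


results = asyncio.run(
    bounded_gather([fake_llm_call(i) for i in range(5)], max_concurrency=2)
)
print(results)  # [0, 2, 4, 6, 8]
```

Because only the semaphore bounds concurrency, no thread pool is needed; idle tasks simply wait on the event loop.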

name property

name: str

Strategy name.

execute async

execute(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult

Execute stages asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `list[PipelineStage]` | Pipeline stages | *required* |
| `context` | `ExecutionContext` | Execution context | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ExecutionResult` | ExecutionResult with data and metrics |

Source code in ondine/orchestration/async_executor.py
async def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult:
    """
    Execute stages asynchronously.

    Args:
        stages: Pipeline stages
        context: Execution context

    Returns:
        ExecutionResult with data and metrics
    """
    start_time = datetime.now()

    try:
        # Execute stages with async/await
        result_data = await self._execute_stages_async(stages, context)

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Calculate stats
        stats = ProcessingStats(
            total_rows=context.total_rows,
            processed_rows=context.last_processed_row + 1,
            failed_rows=context.total_rows - (context.last_processed_row + 1),
            skipped_rows=0,
            rows_per_second=context.total_rows / duration if duration > 0 else 0,
            total_duration_seconds=duration,
        )

        # Get cost estimate
        cost_estimate = CostEstimate(
            total_cost=context.accumulated_cost,
            total_tokens=context.accumulated_tokens,
            input_tokens=0,
            output_tokens=0,
            rows=context.total_rows,
            confidence="actual",
        )

        return ExecutionResult(
            data=result_data,
            metrics=stats,
            costs=cost_estimate,
            execution_id=context.session_id,
            start_time=start_time,
            end_time=end_time,
            success=True,
        )

    except Exception as e:
        self.logger.error(f"Async pipeline execution failed: {e}")
        raise
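To illustrate the calling pattern, here is a minimal sketch of driving stages through an awaited, sequential loop, as `execute` does internally. `UppercaseStage` and `run_stages` are hypothetical stand-ins, not ondine classes; the real `PipelineStage` and `ExecutionContext` carry far more state:

```python
import asyncio


class UppercaseStage:
    # Hypothetical stage: transforms each row, mimicking a
    # PipelineStage's async processing step.
    async def process(self, rows):
        return [r.upper() for r in rows]


async def run_stages(stages, rows):
    # Mirror the executor's core loop: each stage is awaited in
    # order, and its output feeds the next stage.
    data = rows
    for stage in stages:
        data = await stage.process(data)
    return data


result = asyncio.run(run_stages([UppercaseStage()], ["a", "b"]))
print(result)  # ['A', 'B']
```

In the real executor this loop also accumulates cost and token counters on the context, which feed the `ProcessingStats` and `CostEstimate` shown above.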

supports_async

supports_async() -> bool

Async executor supports async.

Source code in ondine/orchestration/async_executor.py
def supports_async(self) -> bool:
    """Async executor supports async."""
    return True

supports_streaming

supports_streaming() -> bool

Async executor doesn't support streaming.

Source code in ondine/orchestration/async_executor.py
def supports_streaming(self) -> bool:
    """Async executor doesn't support streaming."""
    return False

execute_async async

execute_async(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult

Alias for execute() method.

Source code in ondine/orchestration/async_executor.py
async def execute_async(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult:
    """Alias for execute() method."""
    return await self.execute(stages, context)