
sync_executor

Synchronous execution strategy.

Default executor; it preserves the existing pipeline behavior and uses a ThreadPoolExecutor for concurrent LLM calls.

SyncExecutor

SyncExecutor()

Bases: ExecutionStrategy

Synchronous execution strategy.

Uses a ThreadPoolExecutor for concurrent LLM calls while keeping stage execution sequential. This is the default strategy and preserves the existing pipeline behavior.
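
The general pattern looks roughly like the sketch below. This is illustrative only, not ondine's internal implementation; `call_llm`, the two stage functions, and the worker count are invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins: a "stage" is any callable applied to one row,
# and call_llm simulates a per-row LLM request.
def call_llm(prompt: str) -> str:
    return f"response to: {prompt}"

def classify_stage(row: str) -> str:
    return call_llm(f"Classify: {row}")

def summarize_stage(row: str) -> str:
    return call_llm(f"Summarize: {row}")

rows = ["row-1", "row-2", "row-3"]
stages = [classify_stage, summarize_stage]

# Stages run one after another; within each stage, rows are processed
# concurrently via a thread pool.
for stage in stages:
    with ThreadPoolExecutor(max_workers=4) as pool:
        rows = list(pool.map(stage, rows))

print(rows)
```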

Initialize synchronous executor.

Source code in ondine/orchestration/sync_executor.py
def __init__(self):
    """Initialize synchronous executor."""
    self.logger = logger

name property

name: str

Strategy name.

execute

execute(stages: list[PipelineStage], context: ExecutionContext) -> ExecutionResult

Execute stages synchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `list[PipelineStage]` | Pipeline stages | *required* |
| `context` | `ExecutionContext` | Execution context | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ExecutionResult` | ExecutionResult with data and metrics |

Source code in ondine/orchestration/sync_executor.py
def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> ExecutionResult:
    """
    Execute stages synchronously.

    Args:
        stages: Pipeline stages
        context: Execution context

    Returns:
        ExecutionResult with data and metrics
    """
    start_time = datetime.now()

    try:
        # Execute stages sequentially
        result_data = self._execute_stages(stages, context)

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Calculate stats
        stats = ProcessingStats(
            total_rows=context.total_rows,
            processed_rows=context.last_processed_row + 1,
            failed_rows=context.total_rows - (context.last_processed_row + 1),
            skipped_rows=0,
            rows_per_second=context.total_rows / duration if duration > 0 else 0,
            total_duration_seconds=duration,
        )

        # Get cost estimate
        cost_estimate = CostEstimate(
            total_cost=context.accumulated_cost,
            total_tokens=context.accumulated_tokens,
            input_tokens=0,  # Would need to track separately
            output_tokens=0,
            rows=context.total_rows,
            confidence="actual",
        )

        return ExecutionResult(
            data=result_data,
            metrics=stats,
            costs=cost_estimate,
            execution_id=context.session_id,
            start_time=start_time,
            end_time=end_time,
            success=True,
        )

    except Exception as e:
        self.logger.error(f"Pipeline execution failed: {e}")
        raise
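
A minimal usage sketch follows. It assumes `stages` (a `list[PipelineStage]`) and an `ExecutionContext` have already been produced by ondine's pipeline/orchestration layer; their construction is not covered on this page.

```python
from ondine.orchestration.sync_executor import SyncExecutor

def run_sync(stages, context):
    """Run pre-built stages against a pre-built context with the sync executor.

    `stages` and `context` are assumed to come from ondine's pipeline builder;
    only the executor call and result fields shown above are used here.
    """
    executor = SyncExecutor()
    result = executor.execute(stages, context)

    if result.success:
        print(f"Processed {result.metrics.processed_rows}/{result.metrics.total_rows} rows")
        print(f"{result.metrics.rows_per_second:.2f} rows/s, cost={result.costs.total_cost}")
    return result
```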

supports_async

supports_async() -> bool

Sync executor doesn't support async.

Source code in ondine/orchestration/sync_executor.py
def supports_async(self) -> bool:
    """Sync executor doesn't support async."""
    return False

supports_streaming

supports_streaming() -> bool

Sync executor doesn't support streaming.

Source code in ondine/orchestration/sync_executor.py
def supports_streaming(self) -> bool:
    """Sync executor doesn't support streaming."""
    return False
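
Callers can use these capability flags to pick an execution path. The dispatch helper below is a hedged sketch of that pattern, not part of ondine's API; only `supports_streaming()`, `supports_async()`, and the `SyncExecutor` constructor come from this page.

```python
from ondine.orchestration.sync_executor import SyncExecutor

def choose_path(executor) -> str:
    """Pick an execution path based on the executor's capability flags."""
    if executor.supports_streaming():
        return "streaming"
    if executor.supports_async():
        return "async"
    return "sync"

# SyncExecutor reports neither capability, so callers fall back to the
# plain synchronous path.
assert choose_path(SyncExecutor()) == "sync"
```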