Skip to content

streaming_executor

streaming_executor

Streaming execution strategy.

Provides memory-efficient processing for large datasets by processing data in chunks.

StreamingExecutor

StreamingExecutor(chunk_size: int = 1000)

Bases: ExecutionStrategy

Streaming execution strategy.

Processes data in chunks to maintain constant memory usage. Ideal for very large datasets (100K+ rows) that don't fit in memory.

Benefits:

- Constant memory footprint
- Can process unlimited dataset sizes
- Checkpoints at chunk boundaries
- Early results available

Initialize streaming executor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `chunk_size` | `int` | Number of rows per chunk | `1000` |
Source code in ondine/orchestration/streaming_executor.py
def __init__(self, chunk_size: int = 1000):
    """
    Create a streaming executor.

    Args:
        chunk_size: Number of rows processed per chunk.
    """
    # Module-level logger shared by executor instances.
    self.logger = logger
    self.chunk_size = chunk_size

name property

name: str

Strategy name.

execute

execute(stages: list[PipelineStage], context: ExecutionContext) -> Iterator[pd.DataFrame]

Execute stages in streaming mode.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `list[PipelineStage]` | Pipeline stages | required |
| `context` | `ExecutionContext` | Execution context | required |

Yields:

| Type | Description |
| --- | --- |
| `DataFrame` | DataFrames with processed chunks |

Source code in ondine/orchestration/streaming_executor.py
def execute(
    self,
    stages: list[PipelineStage],
    context: ExecutionContext,
) -> Iterator[pd.DataFrame]:
    """
    Run the pipeline in streaming mode, yielding one DataFrame per chunk.

    Args:
        stages: Pipeline stages; the first one is treated as the data loader.
        context: Execution context, updated with cumulative row progress.

    Yields:
        Processed DataFrame chunks, one per input chunk.
    """
    self.logger.info(f"Starting streaming execution (chunk_size={self.chunk_size})")

    # First stage loads the data; the rest transform each chunk.
    loader, processing_stages = stages[0], stages[1:]

    rows_done = 0
    chunks_done = 0

    for idx, raw_chunk in enumerate(self._read_chunks(loader, context)):
        self.logger.info(f"Processing chunk {idx} ({len(raw_chunk)} rows)")

        processed = self._process_chunk(raw_chunk, processing_stages, context)

        rows_done += len(processed)
        # Row indices are zero-based, hence the -1.
        context.update_row(rows_done - 1)

        # Hand the chunk to the consumer before reading the next one.
        yield processed

        chunks_done = idx + 1

    self.logger.info(
        f"Streaming execution complete: {rows_done} rows, "
        f"{chunks_done} chunks"
    )

supports_async

supports_async() -> bool

Streaming executor doesn't support async.

Source code in ondine/orchestration/streaming_executor.py
def supports_async(self) -> bool:
    """Async execution is not available in streaming mode."""
    return False

supports_streaming

supports_streaming() -> bool

Streaming executor supports streaming.

Source code in ondine/orchestration/streaming_executor.py
def supports_streaming(self) -> bool:
    """Streaming is this executor's native mode of operation."""
    return True

execute_stream

execute_stream(stages: list[PipelineStage], context: ExecutionContext) -> Iterator[pd.DataFrame]

Alias for execute() method.

Source code in ondine/orchestration/streaming_executor.py
def execute_stream(
    self, stages: list[PipelineStage], context: ExecutionContext
) -> Iterator[pd.DataFrame]:
    """Delegate to execute(); kept as an explicit streaming entry point."""
    return self.execute(stages, context)

StreamingResult

StreamingResult()

Result container for streaming execution.

Provides access to metrics after consuming the stream.

Initialize streaming result.

Source code in ondine/orchestration/streaming_executor.py
def __init__(self):
    """Set up empty counters for a fresh streaming run."""
    self.start_time = datetime.now()
    self.end_time = None  # populated by finalize()
    self.chunks_processed = 0
    self.total_rows = 0
    self.total_cost = Decimal("0.0")

add_chunk

add_chunk(chunk: DataFrame, cost: Decimal)

Add chunk statistics.

Source code in ondine/orchestration/streaming_executor.py
def add_chunk(self, chunk: pd.DataFrame, cost: Decimal):
    """Fold one processed chunk's statistics into the running totals."""
    self.total_cost = self.total_cost + cost
    self.total_rows = self.total_rows + len(chunk)
    self.chunks_processed = self.chunks_processed + 1

finalize

finalize() -> ExecutionResult

Create final ExecutionResult.

Source code in ondine/orchestration/streaming_executor.py
def finalize(self) -> ExecutionResult:
    """Close out the run and assemble the final ExecutionResult."""
    self.end_time = datetime.now()
    elapsed = (self.end_time - self.start_time).total_seconds()
    # Guard against a zero-length run to avoid division by zero.
    throughput = self.total_rows / elapsed if elapsed > 0 else 0

    return ExecutionResult(
        data=pd.DataFrame(),  # Streaming doesn't return full data
        metrics=ProcessingStats(
            total_rows=self.total_rows,
            processed_rows=self.total_rows,
            failed_rows=0,
            skipped_rows=0,
            rows_per_second=throughput,
            total_duration_seconds=elapsed,
        ),
        costs=CostEstimate(
            total_cost=self.total_cost,
            total_tokens=0,
            input_tokens=0,
            output_tokens=0,
            rows=self.total_rows,
            confidence="actual",
        ),
        start_time=self.start_time,
        end_time=self.end_time,
        success=True,
    )