
ondine

LLM Dataset Processing Engine.

A production-grade SDK for processing tabular datasets using Large Language Models with reliability, observability, and cost control.

DatasetProcessor

DatasetProcessor(data: str | DataFrame, input_column: str, output_column: str, prompt: str, llm_config: dict[str, any])

Simplified API for single-prompt, single-column use cases.

This is a convenience wrapper around PipelineBuilder for users who don't need fine-grained control.

Example

processor = DatasetProcessor(
    data="data.csv",
    input_column="description",
    output_column="cleaned",
    prompt="Clean this text: {description}",
    llm_config={"provider": "openai", "model": "gpt-4o-mini"},
)
result = processor.run()

Initialize dataset processor.

Parameters:

- data (str | DataFrame): CSV file path or DataFrame. Required.
- input_column (str): Input column name. Required.
- output_column (str): Output column name. Required.
- prompt (str): Prompt template. Required.
- llm_config (dict[str, any]): LLM configuration dict. Required.
Source code in ondine/api/dataset_processor.py
def __init__(
    self,
    data: str | pd.DataFrame,
    input_column: str,
    output_column: str,
    prompt: str,
    llm_config: dict[str, any],
):
    """
    Initialize dataset processor.

    Args:
        data: CSV file path or DataFrame
        input_column: Input column name
        output_column: Output column name
        prompt: Prompt template
        llm_config: LLM configuration dict
    """
    self.data = data
    self.input_column = input_column
    self.output_column = output_column
    self.prompt = prompt
    self.llm_config = llm_config

    # Build pipeline internally
    builder = PipelineBuilder.create()

    # Configure data source
    if isinstance(data, str):
        builder.from_csv(
            data,
            input_columns=[input_column],
            output_columns=[output_column],
        )
    elif isinstance(data, pd.DataFrame):
        builder.from_dataframe(
            data,
            input_columns=[input_column],
            output_columns=[output_column],
        )
    else:
        raise ValueError("data must be file path or DataFrame")

    # Configure prompt
    builder.with_prompt(prompt)

    # Configure LLM
    provider = llm_config.get("provider", "openai")
    model = llm_config.get("model", "gpt-4o-mini")
    api_key = llm_config.get("api_key")
    temperature = llm_config.get("temperature", 0.0)
    max_tokens = llm_config.get("max_tokens")

    builder.with_llm(
        provider=provider,
        model=model,
        api_key=api_key,
        temperature=temperature,
        max_tokens=max_tokens,
    )

    # Build pipeline
    self.pipeline = builder.build()

run

run() -> pd.DataFrame

Execute processing and return results.

Returns:

- DataFrame: DataFrame with results.

Source code in ondine/api/dataset_processor.py
def run(self) -> pd.DataFrame:
    """
    Execute processing and return results.

    Returns:
        DataFrame with results
    """
    result = self.pipeline.execute()
    return result.data

run_sample

run_sample(n: int = 10) -> pd.DataFrame

Test on first N rows.

Parameters:

- n (int): Number of rows to process. Default: 10.

Returns:

- DataFrame: DataFrame with sample results.

Source code in ondine/api/dataset_processor.py
def run_sample(self, n: int = 10) -> pd.DataFrame:
    """
    Test on first N rows.

    Args:
        n: Number of rows to process

    Returns:
        DataFrame with sample results
    """
    # Create sample pipeline
    if isinstance(self.data, str):
        df = pd.read_csv(self.data).head(n)
    else:
        df = self.data.head(n)

    builder = (
        PipelineBuilder.create()
        .from_dataframe(
            df,
            input_columns=[self.input_column],
            output_columns=[self.output_column],
        )
        .with_prompt(self.prompt)
        .with_llm(
            provider=self.llm_config.get("provider", "openai"),
            model=self.llm_config.get("model", "gpt-4o-mini"),
            api_key=self.llm_config.get("api_key"),
            temperature=self.llm_config.get("temperature", 0.0),
        )
    )

    sample_pipeline = builder.build()
    result = sample_pipeline.execute()
    return result.data

estimate_cost

estimate_cost() -> float

Estimate total processing cost.

Returns:

- float: Estimated cost in USD.

Source code in ondine/api/dataset_processor.py
def estimate_cost(self) -> float:
    """
    Estimate total processing cost.

    Returns:
        Estimated cost in USD
    """
    estimate = self.pipeline.estimate_cost()
    return float(estimate.total_cost)
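
Together, these three methods support a check-before-you-spend workflow; a sketch (file and column names are illustrative):

processor = DatasetProcessor(
    data="products.csv",
    input_column="description",
    output_column="category",
    prompt="Categorize this product: {description}",
    llm_config={"provider": "openai", "model": "gpt-4o-mini"},
)

print(f"Estimated cost: ${processor.estimate_cost():.4f}")

preview = processor.run_sample(n=5)  # spot-check prompt quality first
print(preview.head())

result = processor.run()  # full dataset
result.to_csv("products_categorized.csv", index=False)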

Pipeline

Pipeline(specifications: PipelineSpecifications, dataframe: DataFrame | None = None, executor: ExecutionStrategy | None = None)

Main pipeline class - Facade for dataset processing.

Provides high-level interface for building and executing LLM-powered data transformations.

Example

pipeline = Pipeline(specifications)
result = pipeline.execute()

Initialize pipeline with specifications.

Parameters:

- specifications (PipelineSpecifications): Complete pipeline configuration. Required.
- dataframe (DataFrame | None): Optional pre-loaded DataFrame. Default: None.
- executor (ExecutionStrategy | None): Optional execution strategy (default: SyncExecutor). Default: None.
Source code in ondine/api/pipeline.py
def __init__(
    self,
    specifications: PipelineSpecifications,
    dataframe: pd.DataFrame | None = None,
    executor: ExecutionStrategy | None = None,
):
    """
    Initialize pipeline with specifications.

    Args:
        specifications: Complete pipeline configuration
        dataframe: Optional pre-loaded DataFrame
        executor: Optional execution strategy (default: SyncExecutor)
    """
    self.id = uuid4()
    self.specifications = specifications
    self.dataframe = dataframe
    self.executor = executor or SyncExecutor()
    self.observers: list[ExecutionObserver] = []
    self.logger = get_logger(f"{__name__}.{self.id}")

add_observer

add_observer(observer: ExecutionObserver) -> Pipeline

Add execution observer.

Parameters:

- observer (ExecutionObserver): Observer to add. Required.

Returns:

- Pipeline: Self for chaining.

Source code in ondine/api/pipeline.py
def add_observer(self, observer: ExecutionObserver) -> "Pipeline":
    """
    Add execution observer.

    Args:
        observer: Observer to add

    Returns:
        Self for chaining
    """
    self.observers.append(observer)
    return self
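
A minimal custom observer sketch, assuming ExecutionObserver exposes the three hooks that execute() invokes (on_pipeline_start, on_pipeline_complete, on_pipeline_error); the class name is illustrative:

class TimingObserver(ExecutionObserver):
    """Print wall-clock duration for each run (illustrative)."""

    def on_pipeline_start(self, pipeline, context):
        print(f"run {context.session_id} started")

    def on_pipeline_complete(self, context, result):
        elapsed = (context.end_time - context.start_time).total_seconds()
        print(f"run {context.session_id} finished in {elapsed:.1f}s")

    def on_pipeline_error(self, context, error):
        print(f"run {context.session_id} failed: {error}")

pipeline.add_observer(TimingObserver())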

validate

validate() -> ValidationResult

Validate pipeline configuration.

Returns:

- ValidationResult: ValidationResult with any errors/warnings.

Source code in ondine/api/pipeline.py
def validate(self) -> ValidationResult:
    """
    Validate pipeline configuration.

    Returns:
        ValidationResult with any errors/warnings
    """
    result = ValidationResult(is_valid=True)

    # Validate dataset spec
    if not self.specifications.dataset.input_columns:
        result.add_error("No input columns specified")

    if not self.specifications.dataset.output_columns:
        result.add_error("No output columns specified")

    # Validate that input columns exist in dataframe (if dataframe is provided)
    if self.dataframe is not None and self.specifications.dataset.input_columns:
        df_cols = set(self.dataframe.columns)
        input_cols = set(self.specifications.dataset.input_columns)
        missing_cols = input_cols - df_cols
        if missing_cols:
            result.add_error(
                f"Input columns not found in dataframe: {missing_cols}"
            )

    # Validate prompt spec
    if not self.specifications.prompt.template:
        result.add_error("No prompt template specified")
    else:
        # Check that template variables match input columns
        import re

        template_vars = set(
            re.findall(r"\{(\w+)\}", self.specifications.prompt.template)
        )
        input_cols = set(self.specifications.dataset.input_columns)
        missing_vars = template_vars - input_cols
        if missing_vars:
            result.add_error(
                f"Template variables not in input columns: {missing_vars}"
            )

    # Validate LLM spec
    if not self.specifications.llm.model:
        result.add_error("No LLM model specified")

    return result
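
Validating before executing surfaces configuration problems without spending tokens; a sketch:

validation = pipeline.validate()
if not validation.is_valid:
    # ValidationResult exposes .errors (see the failure message in execute())
    for error in validation.errors:
        print(f"config error: {error}")
else:
    result = pipeline.execute()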

estimate_cost

estimate_cost() -> CostEstimate

Estimate total processing cost.

Returns:

- CostEstimate: Sample-based estimate scaled to the full dataset.

Source code in ondine/api/pipeline.py
def estimate_cost(self) -> CostEstimate:
    """
    Estimate total processing cost.

    Returns:
        Cost estimate
    """
    # Create stages
    loader = DataLoaderStage(self.dataframe)

    # Load first few rows for estimation
    df = loader.process(self.specifications.dataset, ExecutionContext())
    sample_size = min(10, len(df))
    sample_df = df.head(sample_size)

    # Create formatter and get prompts
    formatter = PromptFormatterStage(self.specifications.processing.batch_size)
    batches = formatter.process(
        (sample_df, self.specifications.prompt), ExecutionContext()
    )

    # Create LLM client and estimate
    llm_client = create_llm_client(self.specifications.llm)
    llm_stage = LLMInvocationStage(llm_client)

    sample_estimate = llm_stage.estimate_cost(batches)

    # Scale to full dataset
    scale_factor = Decimal(len(df)) / Decimal(sample_size)

    return CostEstimate(
        total_cost=sample_estimate.total_cost * scale_factor,
        total_tokens=int(sample_estimate.total_tokens * float(scale_factor)),
        input_tokens=int(sample_estimate.input_tokens * float(scale_factor)),
        output_tokens=int(sample_estimate.output_tokens * float(scale_factor)),
        rows=len(df),
        confidence="sample-based",
    )
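
Because the estimate is extrapolated from a small sample, it is best used as a budget gate rather than an exact figure; a sketch:

from decimal import Decimal

estimate = pipeline.estimate_cost()
print(
    f"{estimate.rows} rows, ~{estimate.total_tokens} tokens, "
    f"~${estimate.total_cost} ({estimate.confidence})"
)
if estimate.total_cost > Decimal("10"):
    raise RuntimeError("Estimated cost exceeds this job's $10 ceiling")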

execute

execute(resume_from: UUID | None = None) -> ExecutionResult

Execute pipeline end-to-end.

Parameters:

- resume_from (UUID | None): Optional session ID to resume from checkpoint. Default: None.

Returns:

- ExecutionResult: ExecutionResult with data and metrics.

Source code in ondine/api/pipeline.py
def execute(self, resume_from: UUID | None = None) -> ExecutionResult:
    """
    Execute pipeline end-to-end.

    Args:
        resume_from: Optional session ID to resume from checkpoint

    Returns:
        ExecutionResult with data and metrics
    """
    # Validate first
    validation = self.validate()
    if not validation.is_valid:
        raise ValueError(f"Pipeline validation failed: {validation.errors}")

    # Create or restore execution context
    state_manager = StateManager(
        storage=LocalFileCheckpointStorage(
            self.specifications.processing.checkpoint_dir
        ),
        checkpoint_interval=self.specifications.processing.checkpoint_interval,
    )

    if resume_from:
        # Resume from checkpoint
        context = state_manager.load_checkpoint(resume_from)
        if not context:
            raise ValueError(f"No checkpoint found for session {resume_from}")
        self.logger.info(
            f"Resuming from checkpoint at row {context.last_processed_row}"
        )
    else:
        # Create new context
        context = ExecutionContext(pipeline_id=self.id)

    # Add default observers if none specified
    if not self.observers:
        self.observers = [
            ProgressBarObserver(),
            LoggingObserver(),
            CostTrackingObserver(),
        ]

    # Attach observers to context for progress notifications
    context.observers = self.observers

    # Initialize new observability system if observers configured
    observer_configs = self.specifications.metadata.get("observers", [])
    if observer_configs:
        from ondine.observability.dispatcher import ObserverDispatcher
        from ondine.observability.registry import ObserverRegistry

        # Instantiate observers from configuration
        new_observers = []
        for observer_name, observer_config in observer_configs:
            try:
                observer_class = ObserverRegistry.get(observer_name)
                observer_instance = observer_class(config=observer_config)
                new_observers.append(observer_instance)
                self.logger.info(f"Initialized observer: {observer_name}")
            except Exception as e:
                self.logger.warning(
                    f"Failed to initialize observer '{observer_name}': {e}"
                )

        # Create dispatcher and attach to context
        if new_observers:
            context.observer_dispatcher = ObserverDispatcher(new_observers)

            # Emit pipeline start event
            from ondine.observability.events import PipelineStartEvent

            start_event = PipelineStartEvent(
                pipeline_id=self.id,
                run_id=context.session_id,
                timestamp=datetime.now(),
                trace_id=context.trace_id,
                span_id=context.span_id,
                config={},
                metadata=self.specifications.metadata,
                total_rows=0,  # Will be updated after data loading
            )
            context.observer_dispatcher.dispatch("pipeline_start", start_event)

    # Notify legacy observers of start
    for observer in self.observers:
        observer.on_pipeline_start(self, context)

    try:
        # Execute stages (preprocessing happens inside if enabled)
        result_df = self._execute_stages(context, state_manager)

        # Mark completion
        context.end_time = datetime.now()

        # Create execution result
        result = ExecutionResult(
            data=result_df,
            metrics=context.get_stats(),
            costs=CostEstimate(
                total_cost=context.accumulated_cost,
                total_tokens=context.accumulated_tokens,
                input_tokens=0,
                output_tokens=0,
                rows=context.total_rows,
                confidence="actual",
            ),
            execution_id=context.session_id,
            start_time=context.start_time,
            end_time=context.end_time,
            success=True,
        )

        # Optional: Auto-retry failed rows
        if self.specifications.processing.auto_retry_failed:
            # Get preprocessed data from context (or loaded data if no preprocessing)
            retry_source_df = context.intermediate_data.get("preprocessed_data")
            if retry_source_df is None:
                retry_source_df = context.intermediate_data.get("loaded_data")
            result = self._auto_retry_failed_rows(result, retry_source_df)

        # Cleanup checkpoints on success
        state_manager.cleanup_checkpoints(context.session_id)

        # Notify legacy observers of completion
        for observer in self.observers:
            observer.on_pipeline_complete(context, result)

        # Emit pipeline end event for new observability system
        if context.observer_dispatcher:
            from ondine.observability.events import PipelineEndEvent

            end_event = PipelineEndEvent(
                pipeline_id=self.id,
                run_id=context.session_id,
                success=True,
                timestamp=datetime.now(),
                trace_id=context.trace_id,
                span_id=context.span_id,
                total_duration_ms=(
                    (context.end_time - context.start_time).total_seconds() * 1000
                    if context.end_time
                    else 0
                ),
                rows_processed=result.metrics.processed_rows,
                rows_succeeded=result.metrics.processed_rows
                - result.metrics.failed_rows,
                rows_failed=result.metrics.failed_rows,
                rows_skipped=result.metrics.skipped_rows,
                total_cost=result.costs.total_cost,
                total_tokens=result.costs.total_tokens,
                input_tokens=result.costs.input_tokens,
                output_tokens=result.costs.output_tokens,
            )
            context.observer_dispatcher.dispatch("pipeline_end", end_event)

            # Flush and close observers
            context.observer_dispatcher.flush_all()
            context.observer_dispatcher.close_all()

        return result

    except Exception as e:
        # Save checkpoint on error
        state_manager.save_checkpoint(context)
        self.logger.error(
            f"Pipeline failed. Checkpoint saved. "
            f"Resume with: pipeline.execute(resume_from=UUID('{context.session_id}'))"
        )

        # Notify legacy observers of error
        for observer in self.observers:
            observer.on_pipeline_error(context, e)

        # Emit error event for new observability system
        if context.observer_dispatcher:
            from ondine.observability.events import ErrorEvent

            error_event = ErrorEvent(
                pipeline_id=self.id,
                run_id=context.session_id,
                timestamp=datetime.now(),
                trace_id=context.trace_id,
                span_id=context.span_id,
                error=e,
                error_type=type(e).__name__,
                error_message=str(e),
                stack_trace="",  # Could add full traceback if needed
            )
            context.observer_dispatcher.dispatch("error", error_event)

            # Flush and close observers even on error
            context.observer_dispatcher.flush_all()
            context.observer_dispatcher.close_all()

        raise
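
On failure, a checkpoint is saved and the log prints the session ID to resume with; a sketch of the round trip (the UUID is a placeholder for the logged value):

from uuid import UUID

try:
    result = pipeline.execute()
except Exception:
    # Take the session ID from the "Resume with:" log line.
    session_id = UUID("00000000-0000-0000-0000-000000000000")
    result = pipeline.execute(resume_from=session_id)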

execute_async async

execute_async(resume_from: UUID | None = None) -> ExecutionResult

Execute pipeline asynchronously.

Uses AsyncExecutor for non-blocking execution. Ideal for integration with FastAPI, aiohttp, and other async frameworks.

Parameters:

- resume_from (UUID | None): Optional session ID to resume from checkpoint. Default: None.

Returns:

- ExecutionResult: ExecutionResult with data and metrics.

Raises:

- ValueError: If executor doesn't support async.

Source code in ondine/api/pipeline.py
async def execute_async(self, resume_from: UUID | None = None) -> ExecutionResult:
    """
    Execute pipeline asynchronously.

    Uses AsyncExecutor for non-blocking execution. Ideal for integration
    with FastAPI, aiohttp, and other async frameworks.

    Args:
        resume_from: Optional session ID to resume from checkpoint

    Returns:
        ExecutionResult with data and metrics

    Raises:
        ValueError: If executor doesn't support async
    """
    if not self.executor.supports_async():
        raise ValueError(
            "Current executor doesn't support async. "
            "Use AsyncExecutor: Pipeline(specs, executor=AsyncExecutor())"
        )

    # For now, wrap synchronous execution in async
    # TODO: Implement fully async execution pipeline
    import asyncio

    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, self.execute, resume_from)
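
A sketch of FastAPI integration, assuming the pipeline was built with an async-capable executor (e.g. via with_async_execution()); the endpoint is illustrative:

from fastapi import FastAPI

app = FastAPI()

@app.post("/process")
async def process_dataset():
    result = await pipeline.execute_async()
    return {
        "rows": len(result.data),
        "cost_usd": float(result.costs.total_cost),
    }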

execute_stream

execute_stream(chunk_size: int | None = None) -> Iterator[ExecutionResult]

Execute pipeline in streaming mode.

Processes data in chunks for memory-efficient handling of large datasets. Ideal for datasets that don't fit in memory.

Parameters:

- chunk_size (int | None): Number of rows per chunk (uses executor's chunk_size if None). Default: None.

Yields:

- ExecutionResult: ExecutionResult objects for each processed chunk.

Raises:

- ValueError: If executor doesn't support streaming.

Source code in ondine/api/pipeline.py
def execute_stream(
    self, chunk_size: int | None = None
) -> Iterator[ExecutionResult]:
    """
    Execute pipeline in streaming mode.

    Processes data in chunks for memory-efficient handling of large datasets.
    Ideal for datasets that don't fit in memory.

    Args:
        chunk_size: Number of rows per chunk (uses executor's chunk_size if None)

    Yields:
        ExecutionResult objects for each processed chunk

    Raises:
        ValueError: If executor doesn't support streaming
    """
    if not self.executor.supports_streaming():
        raise ValueError(
            "Current executor doesn't support streaming. "
            "Use StreamingExecutor: Pipeline(specs, executor=StreamingExecutor())"
        )

    # Use executor's chunk_size if not provided
    if chunk_size is None and isinstance(self.executor, StreamingExecutor):
        chunk_size = self.executor.chunk_size
    elif chunk_size is None:
        chunk_size = 1000  # Default fallback

    # For now, execute the full pipeline and split result into chunks
    # TODO: Implement proper streaming execution that processes chunks independently
    result = self.execute()

    # Split the result data into chunks and yield as separate ExecutionResults
    total_rows = len(result.data)
    for start_idx in range(0, total_rows, chunk_size):
        end_idx = min(start_idx + chunk_size, total_rows)
        chunk_data = result.data.iloc[start_idx:end_idx].copy()

        # Create a chunk result with proportional metrics
        chunk_rows = len(chunk_data)
        chunk_result = ExecutionResult(
            data=chunk_data,
            metrics=ProcessingStats(
                total_rows=chunk_rows,
                processed_rows=chunk_rows,
                failed_rows=0,
                skipped_rows=0,
                rows_per_second=result.metrics.rows_per_second,
                total_duration_seconds=result.metrics.total_duration_seconds
                * (chunk_rows / total_rows),
                stage_durations=result.metrics.stage_durations,
            ),
            costs=CostEstimate(
                total_cost=result.costs.total_cost
                * Decimal(chunk_rows / total_rows),
                total_tokens=int(
                    result.costs.total_tokens * (chunk_rows / total_rows)
                ),
                input_tokens=int(
                    result.costs.input_tokens * (chunk_rows / total_rows)
                ),
                output_tokens=int(
                    result.costs.output_tokens * (chunk_rows / total_rows)
                ),
                rows=chunk_rows,
                confidence=result.costs.confidence,
            ),
            execution_id=result.execution_id,
            start_time=result.start_time,
            end_time=result.end_time,
            success=True,
        )
        yield chunk_result
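
A sketch of chunked consumption, assuming the pipeline was built with a StreamingExecutor; results are appended to disk as each chunk arrives:

for i, chunk in enumerate(pipeline.execute_stream(chunk_size=500)):
    chunk.data.to_csv("results.csv", mode="a", header=(i == 0), index=False)
    print(f"chunk {i}: {len(chunk.data)} rows, ~${chunk.costs.total_cost}")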

PipelineBuilder

PipelineBuilder()

Fluent builder for constructing pipelines.

Provides an intuitive, chainable API for common use cases.

Example

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

Initialize builder with None values.

Source code in ondine/api/pipeline_builder.py
def __init__(self):
    """Initialize builder with None values."""
    self._dataset_spec: DatasetSpec | None = None
    self._prompt_spec: PromptSpec | None = None
    self._llm_spec: LLMSpec | None = None
    self._processing_spec: ProcessingSpec = ProcessingSpec()
    self._output_spec: OutputSpec | None = None
    self._dataframe: pd.DataFrame | None = None
    self._executor: ExecutionStrategy | None = None
    self._custom_parser: any | None = None
    self._custom_llm_client: any | None = None
    self._custom_stages: list[dict] = []  # For custom stage injection
    self._observers: list[tuple[str, dict]] = []  # For observability

create staticmethod

create() -> PipelineBuilder

Start builder chain.

Returns:

- PipelineBuilder: New PipelineBuilder instance.

Source code in ondine/api/pipeline_builder.py
@staticmethod
def create() -> "PipelineBuilder":
    """
    Start builder chain.

    Returns:
        New PipelineBuilder instance
    """
    return PipelineBuilder()

from_specifications staticmethod

from_specifications(specs: PipelineSpecifications) -> PipelineBuilder

Create builder from existing specifications.

Useful for loading from YAML and modifying programmatically.

Parameters:

- specs (PipelineSpecifications): Complete pipeline specifications. Required.

Returns:

- PipelineBuilder: PipelineBuilder pre-configured with specs.

Example

specs = load_pipeline_config("config.yaml")
builder = PipelineBuilder.from_specifications(specs)
pipeline = builder.build()

Source code in ondine/api/pipeline_builder.py
@staticmethod
def from_specifications(specs: PipelineSpecifications) -> "PipelineBuilder":
    """
    Create builder from existing specifications.

    Useful for loading from YAML and modifying programmatically.

    Args:
        specs: Complete pipeline specifications

    Returns:
        PipelineBuilder pre-configured with specs

    Example:
        specs = load_pipeline_config("config.yaml")
        builder = PipelineBuilder.from_specifications(specs)
        pipeline = builder.build()
    """
    builder = PipelineBuilder()
    builder._dataset_spec = specs.dataset
    builder._prompt_spec = specs.prompt
    builder._llm_spec = specs.llm
    builder._processing_spec = specs.processing
    builder._output_spec = specs.output
    return builder

from_csv

from_csv(path: str, input_columns: list[str], output_columns: list[str], delimiter: str = ',', encoding: str = 'utf-8') -> PipelineBuilder

Configure CSV data source.

Parameters:

- path (str): Path to CSV file. Required.
- input_columns (list[str]): Input column names. Required.
- output_columns (list[str]): Output column names. Required.
- delimiter (str): CSV delimiter. Default: ','.
- encoding (str): File encoding. Default: 'utf-8'.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def from_csv(
    self,
    path: str,
    input_columns: list[str],
    output_columns: list[str],
    delimiter: str = ",",
    encoding: str = "utf-8",
) -> "PipelineBuilder":
    """
    Configure CSV data source.

    Args:
        path: Path to CSV file
        input_columns: Input column names
        output_columns: Output column names
        delimiter: CSV delimiter
        encoding: File encoding

    Returns:
        Self for chaining
    """
    self._dataset_spec = DatasetSpec(
        source_type=DataSourceType.CSV,
        source_path=Path(path),
        input_columns=input_columns,
        output_columns=output_columns,
        delimiter=delimiter,
        encoding=encoding,
    )
    return self

from_excel

from_excel(path: str, input_columns: list[str], output_columns: list[str], sheet_name: str | int = 0) -> PipelineBuilder

Configure Excel data source.

Parameters:

- path (str): Path to Excel file. Required.
- input_columns (list[str]): Input column names. Required.
- output_columns (list[str]): Output column names. Required.
- sheet_name (str | int): Sheet name or index. Default: 0.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def from_excel(
    self,
    path: str,
    input_columns: list[str],
    output_columns: list[str],
    sheet_name: str | int = 0,
) -> "PipelineBuilder":
    """
    Configure Excel data source.

    Args:
        path: Path to Excel file
        input_columns: Input column names
        output_columns: Output column names
        sheet_name: Sheet name or index

    Returns:
        Self for chaining
    """
    self._dataset_spec = DatasetSpec(
        source_type=DataSourceType.EXCEL,
        source_path=Path(path),
        input_columns=input_columns,
        output_columns=output_columns,
        sheet_name=sheet_name,
    )
    return self

from_parquet

from_parquet(path: str, input_columns: list[str], output_columns: list[str]) -> PipelineBuilder

Configure Parquet data source.

Parameters:

- path (str): Path to Parquet file. Required.
- input_columns (list[str]): Input column names. Required.
- output_columns (list[str]): Output column names. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def from_parquet(
    self,
    path: str,
    input_columns: list[str],
    output_columns: list[str],
) -> "PipelineBuilder":
    """
    Configure Parquet data source.

    Args:
        path: Path to Parquet file
        input_columns: Input column names
        output_columns: Output column names

    Returns:
        Self for chaining
    """
    self._dataset_spec = DatasetSpec(
        source_type=DataSourceType.PARQUET,
        source_path=Path(path),
        input_columns=input_columns,
        output_columns=output_columns,
    )
    return self

from_dataframe

from_dataframe(df: DataFrame, input_columns: list[str], output_columns: list[str]) -> PipelineBuilder

Configure DataFrame source.

Parameters:

- df (DataFrame): Pandas DataFrame. Required.
- input_columns (list[str]): Input column names. Required.
- output_columns (list[str]): Output column names. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def from_dataframe(
    self,
    df: pd.DataFrame,
    input_columns: list[str],
    output_columns: list[str],
) -> "PipelineBuilder":
    """
    Configure DataFrame source.

    Args:
        df: Pandas DataFrame
        input_columns: Input column names
        output_columns: Output column names

    Returns:
        Self for chaining
    """
    self._dataset_spec = DatasetSpec(
        source_type=DataSourceType.DATAFRAME,
        input_columns=input_columns,
        output_columns=output_columns,
    )
    self._dataframe = df
    return self

with_prompt

with_prompt(template: str, system_message: str | None = None) -> PipelineBuilder

Configure prompt template.

Parameters:

- template (str): Prompt template with {variable} placeholders. Required.
- system_message (str | None): Optional system message. Default: None.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_prompt(
    self,
    template: str,
    system_message: str | None = None,
) -> "PipelineBuilder":
    """
    Configure prompt template.

    Args:
        template: Prompt template with {variable} placeholders
        system_message: Optional system message

    Returns:
        Self for chaining
    """
    self._prompt_spec = PromptSpec(
        template=template,
        system_message=system_message,
    )
    return self

with_llm

with_llm(provider: str, model: str, api_key: str | None = None, temperature: float = 0.0, max_tokens: int | None = None, **kwargs: any) -> PipelineBuilder

Configure LLM provider.

Parameters:

- provider (str): Provider name (openai, azure_openai, anthropic) or custom provider ID. Required.
- model (str): Model identifier. Required.
- api_key (str | None): API key (or from env). Default: None.
- temperature (float): Sampling temperature. Default: 0.0.
- max_tokens (int | None): Max output tokens. Default: None.
- **kwargs (any): Provider-specific parameters. Default: {}.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_llm(
    self,
    provider: str,
    model: str,
    api_key: str | None = None,
    temperature: float = 0.0,
    max_tokens: int | None = None,
    **kwargs: any,
) -> "PipelineBuilder":
    """
    Configure LLM provider.

    Args:
        provider: Provider name (openai, azure_openai, anthropic) or custom provider ID
        model: Model identifier
        api_key: API key (or from env)
        temperature: Sampling temperature
        max_tokens: Max output tokens
        **kwargs: Provider-specific parameters

    Returns:
        Self for chaining
    """
    from ondine.adapters.provider_registry import ProviderRegistry

    # Try to convert to enum for built-in providers
    try:
        provider_enum = LLMProvider(provider.lower())
    except ValueError:
        # Not a built-in provider - check if it's a custom provider
        if ProviderRegistry.is_registered(provider):
            # Use a dummy enum value for validation, but store the actual provider string
            provider_enum = LLMProvider.OPENAI  # Dummy for Pydantic validation
            kwargs["_custom_provider_id"] = provider
        else:
            raise ValueError(
                f"Unknown provider: {provider}. "
                f"Available providers: {', '.join(ProviderRegistry.list_providers())}"
            )

    self._llm_spec = LLMSpec(
        provider=provider_enum,
        model=model,
        api_key=api_key,
        temperature=temperature,
        max_tokens=max_tokens,
        **kwargs,
    )
    return self
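
A sketch using a built-in provider; the model name is illustrative:

builder = (
    PipelineBuilder.create()
    .from_csv("reviews.csv", input_columns=["review"], output_columns=["sentiment"])
    .with_prompt("Classify the sentiment of this review: {review}")
    .with_llm(
        provider="anthropic",   # built-in: openai, azure_openai, anthropic
        model="claude-3-5-haiku-latest",
        temperature=0.0,
        max_tokens=10,
    )
)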

with_llm_spec

with_llm_spec(spec: LLMSpec) -> PipelineBuilder

Configure LLM using a pre-built LLMSpec object.

This method allows using LLMSpec objects directly, enabling:

- Reusable provider configurations
- Use of LLMProviderPresets for common providers
- Custom LLMSpec instances for advanced use cases

Parameters:

- spec (LLMSpec): LLM specification object. Required.

Returns:

- PipelineBuilder: Self for chaining.

Raises:

- TypeError: If spec is not an LLMSpec instance.

Example

Use preset:

from ondine.core.specifications import LLMProviderPresets

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm_spec(LLMProviderPresets.TOGETHER_AI_LLAMA_70B)
    .build()
)

Custom spec:

custom = LLMSpec(
    provider=LLMProvider.OPENAI,
    model="gpt-4o-mini",
    temperature=0.7,
)
pipeline.with_llm_spec(custom)

Override preset:

spec = LLMProviderPresets.GPT4O_MINI.model_copy(
    update={"temperature": 0.9}
)
pipeline.with_llm_spec(spec)

Source code in ondine/api/pipeline_builder.py
def with_llm_spec(self, spec: LLMSpec) -> "PipelineBuilder":
    """
    Configure LLM using a pre-built LLMSpec object.

    This method allows using LLMSpec objects directly, enabling:
    - Reusable provider configurations
    - Use of LLMProviderPresets for common providers
    - Custom LLMSpec instances for advanced use cases

    Args:
        spec: LLM specification object

    Returns:
        Self for chaining

    Raises:
        TypeError: If spec is not an LLMSpec instance

    Example:
        # Use preset
        from ondine.core.specifications import LLMProviderPresets

        pipeline = (
            PipelineBuilder.create()
            .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
            .with_prompt("Process: {text}")
            .with_llm_spec(LLMProviderPresets.TOGETHER_AI_LLAMA_70B)
            .build()
        )

        # Custom spec
        custom = LLMSpec(
            provider=LLMProvider.OPENAI,
            model="gpt-4o-mini",
            temperature=0.7
        )
        pipeline.with_llm_spec(custom)

        # Override preset
        spec = LLMProviderPresets.GPT4O_MINI.model_copy(
            update={"temperature": 0.9}
        )
        pipeline.with_llm_spec(spec)
    """
    if not isinstance(spec, LLMSpec):
        raise TypeError(
            f"Expected LLMSpec, got {type(spec).__name__}. "
            f"Use with_llm() for parameter-based configuration."
        )

    self._llm_spec = spec
    return self

with_custom_llm_client

with_custom_llm_client(client: any) -> PipelineBuilder

Provide a custom LLM client instance directly.

This allows advanced users to create their own LLM client implementations by extending the LLMClient base class. The custom client will be used instead of the factory-created client.

Parameters:

- client (any): Custom LLM client instance (must inherit from LLMClient). Required.

Returns:

- PipelineBuilder: Self for chaining.

Example

class MyCustomClient(LLMClient):
    def invoke(self, prompt: str, **kwargs) -> LLMResponse:
        # Custom implementation
        ...

pipeline = (
    PipelineBuilder.create()
    .from_dataframe(df, ...)
    .with_prompt("...")
    .with_custom_llm_client(MyCustomClient(spec))
    .build()
)

Source code in ondine/api/pipeline_builder.py
def with_custom_llm_client(self, client: any) -> "PipelineBuilder":
    """
    Provide a custom LLM client instance directly.

    This allows advanced users to create their own LLM client implementations
    by extending the LLMClient base class. The custom client will be used
    instead of the factory-created client.

    Args:
        client: Custom LLM client instance (must inherit from LLMClient)

    Returns:
        Self for chaining

    Example:
        class MyCustomClient(LLMClient):
            def invoke(self, prompt: str, **kwargs) -> LLMResponse:
                # Custom implementation
                ...

        pipeline = (
            PipelineBuilder.create()
            .from_dataframe(df, ...)
            .with_prompt("...")
            .with_custom_llm_client(MyCustomClient(spec))
            .build()
        )
    """
    from ondine.adapters.llm_client import LLMClient

    if not isinstance(client, LLMClient):
        raise TypeError(
            f"Custom client must inherit from LLMClient, got {type(client).__name__}"
        )

    self._custom_llm_client = client
    return self

with_batch_size

with_batch_size(size: int) -> PipelineBuilder

Configure batch size.

Parameters:

- size (int): Rows per batch. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_batch_size(self, size: int) -> "PipelineBuilder":
    """
    Configure batch size.

    Args:
        size: Rows per batch

    Returns:
        Self for chaining
    """
    self._processing_spec.batch_size = size
    return self

with_concurrency

with_concurrency(threads: int) -> PipelineBuilder

Configure concurrent requests.

Parameters:

- threads (int): Number of concurrent threads. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_concurrency(self, threads: int) -> "PipelineBuilder":
    """
    Configure concurrent requests.

    Args:
        threads: Number of concurrent threads

    Returns:
        Self for chaining
    """
    self._processing_spec.concurrency = threads
    return self

with_checkpoint_interval

with_checkpoint_interval(rows: int) -> PipelineBuilder

Configure checkpoint frequency.

Parameters:

- rows (int): Rows between checkpoints. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_checkpoint_interval(self, rows: int) -> "PipelineBuilder":
    """
    Configure checkpoint frequency.

    Args:
        rows: Rows between checkpoints

    Returns:
        Self for chaining
    """
    self._processing_spec.checkpoint_interval = rows
    return self

with_rate_limit

with_rate_limit(rpm: int) -> PipelineBuilder

Configure rate limiting.

Parameters:

- rpm (int): Requests per minute. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_rate_limit(self, rpm: int) -> "PipelineBuilder":
    """
    Configure rate limiting.

    Args:
        rpm: Requests per minute

    Returns:
        Self for chaining
    """
    self._processing_spec.rate_limit_rpm = rpm
    return self

with_max_retries

with_max_retries(retries: int) -> PipelineBuilder

Configure maximum retry attempts.

Parameters:

- retries (int): Maximum number of retry attempts. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_max_retries(self, retries: int) -> "PipelineBuilder":
    """
    Configure maximum retry attempts.

    Args:
        retries: Maximum number of retry attempts

    Returns:
        Self for chaining
    """
    self._processing_spec.max_retries = retries
    return self

with_max_budget

with_max_budget(budget: float) -> PipelineBuilder

Configure maximum budget.

Parameters:

- budget (float): Maximum budget in USD. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_max_budget(self, budget: float) -> "PipelineBuilder":
    """
    Configure maximum budget.

    Args:
        budget: Maximum budget in USD

    Returns:
        Self for chaining
    """
    self._processing_spec.max_budget = Decimal(str(budget))
    return self

with_error_policy

with_error_policy(policy: str) -> PipelineBuilder

Configure error handling policy.

Parameters:

- policy (str): Error policy ('skip', 'fail', 'retry', 'use_default'). Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_error_policy(self, policy: str) -> "PipelineBuilder":
    """
    Configure error handling policy.

    Args:
        policy: Error policy ('skip', 'fail', 'retry', 'use_default')

    Returns:
        Self for chaining
    """
    from ondine.core.specifications import ErrorPolicy

    self._processing_spec.error_policy = ErrorPolicy(policy.lower())
    return self

with_checkpoint_dir

with_checkpoint_dir(directory: str) -> PipelineBuilder

Configure checkpoint directory.

Parameters:

- directory (str): Path to checkpoint directory. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_checkpoint_dir(self, directory: str) -> "PipelineBuilder":
    """
    Configure checkpoint directory.

    Args:
        directory: Path to checkpoint directory

    Returns:
        Self for chaining
    """
    self._processing_spec.checkpoint_dir = Path(directory)
    return self
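
The processing methods above all mutate the same ProcessingSpec and chain freely; a sketch of a conservative configuration:

builder = (
    builder
    .with_batch_size(50)
    .with_concurrency(4)
    .with_rate_limit(rpm=120)
    .with_max_retries(3)
    .with_max_budget(25.0)          # hard USD ceiling for the run
    .with_error_policy("skip")      # skip failed rows instead of aborting
    .with_checkpoint_interval(500)
    .with_checkpoint_dir("./checkpoints")
)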

with_parser

with_parser(parser: any) -> PipelineBuilder

Configure response parser.

This method allows setting a custom parser. The parser type determines the response_format in the prompt spec.

Parameters:

- parser (any): Parser instance (JSONParser, RegexParser, PydanticParser, etc.). Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_parser(self, parser: any) -> "PipelineBuilder":
    """
    Configure response parser.

    This method allows setting a custom parser. The parser type
    determines the response_format in the prompt spec.

    Args:
        parser: Parser instance (JSONParser, RegexParser, PydanticParser, etc.)

    Returns:
        Self for chaining
    """
    # Store the parser for later use in the pipeline
    # We'll configure response_format based on parser type
    if hasattr(parser, "__class__"):
        parser_name = parser.__class__.__name__
        if "JSON" in parser_name:
            if not self._prompt_spec:
                raise ValueError(
                    "with_prompt() must be called before with_parser()"
                )
            # Update the existing prompt spec's response_format
            self._prompt_spec.response_format = "json"
        elif "Regex" in parser_name:
            if not self._prompt_spec:
                raise ValueError(
                    "with_prompt() must be called before with_parser()"
                )
            self._prompt_spec.response_format = "regex"
            if hasattr(parser, "patterns"):
                self._prompt_spec.regex_patterns = parser.patterns

    # Store the parser instance in metadata for the pipeline to use
    self._custom_parser = parser

    return self

to_csv

to_csv(path: str) -> PipelineBuilder

Configure CSV output destination.

Alias for with_output(path, format='csv').

Parameters:

- path (str): Output CSV file path. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def to_csv(self, path: str) -> "PipelineBuilder":
    """
    Configure CSV output destination.

    Alias for with_output(path, format='csv').

    Args:
        path: Output CSV file path

    Returns:
        Self for chaining
    """
    return self.with_output(path, format="csv")

with_output

with_output(path: str, format: str = 'csv', merge_strategy: str = 'replace') -> PipelineBuilder

Configure output destination.

Parameters:

- path (str): Output file path. Required.
- format (str): Output format (csv, excel, parquet). Default: 'csv'.
- merge_strategy (str): Merge strategy (replace, append, update). Default: 'replace'.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_output(
    self,
    path: str,
    format: str = "csv",
    merge_strategy: str = "replace",
) -> "PipelineBuilder":
    """
    Configure output destination.

    Args:
        path: Output file path
        format: Output format (csv, excel, parquet)
        merge_strategy: Merge strategy (replace, append, update)

    Returns:
        Self for chaining
    """
    format_map = {
        "csv": DataSourceType.CSV,
        "excel": DataSourceType.EXCEL,
        "parquet": DataSourceType.PARQUET,
    }

    merge_map = {
        "replace": MergeStrategy.REPLACE,
        "append": MergeStrategy.APPEND,
        "update": MergeStrategy.UPDATE,
    }

    self._output_spec = OutputSpec(
        destination_type=format_map[format.lower()],
        destination_path=Path(path),
        merge_strategy=merge_map[merge_strategy.lower()],
    )
    return self
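
A sketch writing results to Parquet and updating existing rows on re-runs:

builder = builder.with_output(
    "results.parquet",
    format="parquet",
    merge_strategy="update",
)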

with_executor

with_executor(executor: ExecutionStrategy) -> PipelineBuilder

Set custom execution strategy.

Parameters:

- executor (ExecutionStrategy): ExecutionStrategy instance. Required.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_executor(self, executor: ExecutionStrategy) -> "PipelineBuilder":
    """
    Set custom execution strategy.

    Args:
        executor: ExecutionStrategy instance

    Returns:
        Self for chaining
    """
    self._executor = executor
    return self

with_async_execution

with_async_execution(max_concurrency: int = 10) -> PipelineBuilder

Use async execution strategy.

Enables async/await for non-blocking execution. Ideal for FastAPI, aiohttp, and async frameworks.

Parameters:

- max_concurrency (int): Maximum concurrent async tasks. Default: 10.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_async_execution(self, max_concurrency: int = 10) -> "PipelineBuilder":
    """
    Use async execution strategy.

    Enables async/await for non-blocking execution.
    Ideal for FastAPI, aiohttp, and async frameworks.

    Args:
        max_concurrency: Maximum concurrent async tasks

    Returns:
        Self for chaining
    """
    self._executor = AsyncExecutor(max_concurrency=max_concurrency)
    return self

with_streaming

with_streaming(chunk_size: int = 1000) -> PipelineBuilder

Use streaming execution strategy.

Processes data in chunks for memory-efficient handling. Ideal for large datasets (100K+ rows).

Parameters:

- chunk_size (int): Number of rows per chunk. Default: 1000.

Returns:

- PipelineBuilder: Self for chaining.

Source code in ondine/api/pipeline_builder.py
def with_streaming(self, chunk_size: int = 1000) -> "PipelineBuilder":
    """
    Use streaming execution strategy.

    Processes data in chunks for memory-efficient handling.
    Ideal for large datasets (100K+ rows).

    Args:
        chunk_size: Number of rows per chunk

    Returns:
        Self for chaining
    """
    self._executor = StreamingExecutor(chunk_size=chunk_size)
    return self

with_stage

with_stage(stage_name: str, position: str = 'before_prompt', **stage_kwargs) -> PipelineBuilder

Add a custom pipeline stage by name.

Enables injection of custom processing stages at specific points in the pipeline. Stages must be registered via StageRegistry.

Parameters:

- stage_name (str): Registered stage name (e.g., "rag_retrieval"). Required.
- position (str): Where to inject the stage. Default: 'before_prompt'. Options:
  - "after_loader" / "before_prompt": After data loading, before prompt formatting
  - "after_prompt" / "before_llm": After prompt formatting, before LLM invocation
  - "after_llm" / "before_parser": After LLM invocation, before parsing
  - "after_parser": After response parsing
- **stage_kwargs: Arguments to pass to stage constructor. Default: {}.

Returns:

- PipelineBuilder: Self for chaining.

Raises:

- ValueError: If stage_name not registered or position invalid.

Example

RAG retrieval:

pipeline = (
    PipelineBuilder.create()
    .from_csv("questions.csv", input_columns=["question"], output_columns=["answer"])
    .with_stage(
        "rag_retrieval",
        position="before_prompt",
        vector_store="pinecone",
        index_name="my-docs",
        top_k=5,
    )
    .with_prompt("Context: {retrieved_context}\n\nQuestion: {question}\n\nAnswer:")
    .with_llm(provider="openai", model="gpt-4o")
    .build()
)

Content moderation:

pipeline = (
    PipelineBuilder.create()
    .from_csv("content.csv", input_columns=["text"], output_columns=["moderated"])
    .with_stage(
        "content_moderation",
        position="before_llm",
        block_patterns=["spam", "offensive"],
    )
    .with_prompt("Moderate: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

Source code in ondine/api/pipeline_builder.py
def with_stage(
    self,
    stage_name: str,
    position: str = "before_prompt",
    **stage_kwargs,
) -> "PipelineBuilder":
    """
    Add a custom pipeline stage by name.

    Enables injection of custom processing stages at specific points
    in the pipeline. Stages must be registered via StageRegistry.

    Args:
        stage_name: Registered stage name (e.g., "rag_retrieval")
        position: Where to inject the stage. Options:
            - "after_loader" / "before_prompt": After data loading, before prompt formatting
            - "after_prompt" / "before_llm": After prompt formatting, before LLM invocation
            - "after_llm" / "before_parser": After LLM invocation, before parsing
            - "after_parser": After response parsing
        **stage_kwargs: Arguments to pass to stage constructor

    Returns:
        Self for chaining

    Raises:
        ValueError: If stage_name not registered or position invalid

    Example:
        # RAG retrieval example
        pipeline = (
            PipelineBuilder.create()
            .from_csv("questions.csv", input_columns=["question"], output_columns=["answer"])
            .with_stage(
                "rag_retrieval",
                position="before_prompt",
                vector_store="pinecone",
                index_name="my-docs",
                top_k=5
            )
            .with_prompt("Context: {retrieved_context}\\n\\nQuestion: {question}\\n\\nAnswer:")
            .with_llm(provider="openai", model="gpt-4o")
            .build()
        )

        # Content moderation example
        pipeline = (
            PipelineBuilder.create()
            .from_csv("content.csv", input_columns=["text"], output_columns=["moderated"])
            .with_stage(
                "content_moderation",
                position="before_llm",
                block_patterns=["spam", "offensive"]
            )
            .with_prompt("Moderate: {text}")
            .with_llm(provider="openai", model="gpt-4o-mini")
            .build()
        )
    """
    from ondine.stages.stage_registry import StageRegistry

    # Validate position
    valid_positions = [
        "after_loader",
        "before_prompt",
        "after_prompt",
        "before_llm",
        "after_llm",
        "before_parser",
        "after_parser",
    ]
    if position not in valid_positions:
        raise ValueError(
            f"Invalid position '{position}'. Must be one of: {', '.join(valid_positions)}"
        )

    # Get stage class from registry (this will raise ValueError if not found)
    stage_class = StageRegistry.get(stage_name)

    # Store stage config for later instantiation
    self._custom_stages.append(
        {
            "name": stage_name,
            "class": stage_class,
            "position": position,
            "kwargs": stage_kwargs,
        }
    )

    return self

with_observer

with_observer(name: str, config: dict[str, any] | None = None) -> PipelineBuilder

Add observability observer to the pipeline.

Observers receive events during pipeline execution for monitoring, logging, and tracing.

Parameters:

- name (str): Observer identifier (e.g., "langfuse", "opentelemetry", "logging"). Required.
- config (dict[str, any] | None): Observer-specific configuration dictionary. Default: None.

Returns:

- PipelineBuilder: Self for chaining.

Raises:

- ValueError: If observer not registered.

Example

OpenTelemetry for infrastructure monitoring:

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", ...)
    .with_prompt("...")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_observer("opentelemetry", config={
        "tracer_name": "my_pipeline",
        "include_prompts": False,
    })
    .build()
)

Langfuse for LLM-specific observability:

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", ...)
    .with_prompt("...")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_observer("langfuse", config={
        "public_key": "pk-lf-...",
        "secret_key": "sk-lf-...",
    })
    .build()
)

Multiple observers:

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", ...)
    .with_prompt("...")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_observer("langfuse", config={...})
    .with_observer("opentelemetry", config={...})
    .with_observer("logging", config={"log_level": "DEBUG"})
    .build()
)

Source code in ondine/api/pipeline_builder.py
def with_observer(
    self, name: str, config: dict[str, any] | None = None
) -> "PipelineBuilder":
    """
    Add observability observer to the pipeline.

    Observers receive events during pipeline execution for monitoring,
    logging, and tracing.

    Args:
        name: Observer identifier (e.g., "langfuse", "opentelemetry", "logging")
        config: Observer-specific configuration dictionary

    Returns:
        Self for chaining

    Raises:
        ValueError: If observer not registered

    Example:
        # OpenTelemetry for infrastructure monitoring
        pipeline = (
            PipelineBuilder.create()
            .from_csv("data.csv", ...)
            .with_prompt("...")
            .with_llm(provider="openai", model="gpt-4o-mini")
            .with_observer("opentelemetry", config={
                "tracer_name": "my_pipeline",
                "include_prompts": False
            })
            .build()
        )

        # Langfuse for LLM-specific observability
        pipeline = (
            PipelineBuilder.create()
            .from_csv("data.csv", ...)
            .with_prompt("...")
            .with_llm(provider="openai", model="gpt-4o-mini")
            .with_observer("langfuse", config={
                "public_key": "pk-lf-...",
                "secret_key": "sk-lf-..."
            })
            .build()
        )

        # Multiple observers
        pipeline = (
            PipelineBuilder.create()
            .from_csv("data.csv", ...)
            .with_prompt("...")
            .with_llm(provider="openai", model="gpt-4o-mini")
            .with_observer("langfuse", config={...})
            .with_observer("opentelemetry", config={...})
            .with_observer("logging", config={"log_level": "DEBUG"})
            .build()
        )
    """
    from ondine.observability.registry import ObserverRegistry

    # Validate observer is registered
    if not ObserverRegistry.is_registered(name):
        available = ", ".join(ObserverRegistry.list_observers())
        raise ValueError(
            f"Observer '{name}' not registered. "
            f"Available observers: {available or 'none'}"
        )

    # Store observer config for later instantiation
    self._observers.append((name, config or {}))

    return self

build

build() -> Pipeline

Build final Pipeline.

Returns:

- Pipeline: Configured Pipeline.

Raises:

- ValueError: If required specifications missing.

Source code in ondine/api/pipeline_builder.py
def build(self) -> Pipeline:
    """
    Build final Pipeline.

    Returns:
        Configured Pipeline

    Raises:
        ValueError: If required specifications missing
    """
    # Validate required specs
    if not self._dataset_spec:
        raise ValueError("Dataset specification required")
    if not self._prompt_spec:
        raise ValueError("Prompt specification required")

    # LLM spec is optional if custom client is provided
    if not self._llm_spec and not self._custom_llm_client:
        raise ValueError("Either LLM specification or custom LLM client required")

    # Prepare metadata with custom parser, custom client, custom stages, and observers
    metadata = {}
    if self._custom_parser is not None:
        metadata["custom_parser"] = self._custom_parser
    if self._custom_llm_client is not None:
        metadata["custom_llm_client"] = self._custom_llm_client
    if self._custom_stages:
        metadata["custom_stages"] = self._custom_stages
    if self._observers:
        metadata["observers"] = self._observers

    # Create specifications bundle
    # If custom client provided but no llm_spec, create a dummy spec
    llm_spec = self._llm_spec
    if llm_spec is None and self._custom_llm_client is not None:
        # Create minimal spec using custom client's attributes
        llm_spec = LLMSpec(
            provider=LLMProvider.OPENAI,  # Dummy provider
            model=self._custom_llm_client.model,
            temperature=self._custom_llm_client.temperature,
            max_tokens=self._custom_llm_client.max_tokens,
        )

    specifications = PipelineSpecifications(
        dataset=self._dataset_spec,
        prompt=self._prompt_spec,
        llm=llm_spec,
        processing=self._processing_spec,
        output=self._output_spec,
        metadata=metadata,
    )

    # Create and return pipeline
    return Pipeline(
        specifications,
        dataframe=self._dataframe,
        executor=self._executor,
    )
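
For reference, the smallest configuration that passes build()'s validation is a dataset spec, a prompt spec, and an LLM spec (or a custom client). A minimal sketch, assuming PipelineBuilder is importable from the ondine API package and data.csv has a text column:

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["label"])
    .with_prompt("Label this text: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()  # raises ValueError if dataset, prompt, or LLM spec is missing
)
result = pipeline.execute()
print(result.data.head())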

QuickPipeline

Simplified pipeline API with smart defaults.

Designed for rapid prototyping and common use cases. Automatically detects:

- Input columns from prompt template placeholders
- Provider from model name (e.g., gpt-4 → openai, claude → anthropic)
- Parser type (JSON for multi-column, text for single column)
- Reasonable defaults for batch size, concurrency, retries

Examples:

Minimal usage:

>>> pipeline = QuickPipeline.create(
...     data="data.csv",
...     prompt="Categorize this text: {text}"
... )
>>> result = pipeline.execute()

With explicit outputs:

>>> pipeline = QuickPipeline.create(
...     data="products.csv",
...     prompt="Extract: {description}",
...     output_columns=["brand", "model", "price"]
... )

Override defaults:

>>> pipeline = QuickPipeline.create(
...     data=df,
...     prompt="Summarize: {content}",
...     model="gpt-4o",
...     temperature=0.7,
...     max_budget=Decimal("5.0")
... )

create staticmethod

create(data: str | Path | DataFrame, prompt: str, model: str = 'gpt-4o-mini', output_columns: list[str] | str | None = None, provider: str | None = None, temperature: float = 0.0, max_tokens: int | None = None, max_budget: Decimal | float | str | None = None, batch_size: int | None = None, concurrency: int | None = None, **kwargs: Any) -> Pipeline

Create a pipeline with smart defaults.

Parameters:

Name Type Description Default
data str | Path | DataFrame

CSV/Excel/Parquet file path or DataFrame

required
prompt str

Prompt template with {placeholders}

required
model str

Model name (default: gpt-4o-mini)

'gpt-4o-mini'
output_columns list[str] | str | None

Output column name(s). If None, uses ["output"]

None
provider str | None

LLM provider. If None, auto-detected from model name

None
temperature float

Sampling temperature (default: 0.0 for deterministic)

0.0
max_tokens int | None

Max output tokens (default: provider's default)

None
max_budget Decimal | float | str | None

Maximum cost budget in USD

None
batch_size int | None

Rows per batch (default: auto-sized based on data)

None
concurrency int | None

Parallel requests (default: auto-sized)

None
**kwargs Any

Additional arguments passed to PipelineBuilder

{}

Returns:

Type Description
Pipeline

Configured Pipeline ready to execute

Raises:

Type Description
ValueError

If input data cannot be loaded or prompt is invalid

Source code in ondine/api/quick.py
@staticmethod
def create(
    data: str | Path | pd.DataFrame,
    prompt: str,
    model: str = "gpt-4o-mini",
    output_columns: list[str] | str | None = None,
    provider: str | None = None,
    temperature: float = 0.0,
    max_tokens: int | None = None,
    max_budget: Decimal | float | str | None = None,
    batch_size: int | None = None,
    concurrency: int | None = None,
    **kwargs: Any,
) -> Pipeline:
    """
    Create a pipeline with smart defaults.

    Args:
        data: CSV/Excel/Parquet file path or DataFrame
        prompt: Prompt template with {placeholders}
        model: Model name (default: gpt-4o-mini)
        output_columns: Output column name(s). If None, uses ["output"]
        provider: LLM provider. If None, auto-detected from model name
        temperature: Sampling temperature (default: 0.0 for deterministic)
        max_tokens: Max output tokens (default: provider's default)
        max_budget: Maximum cost budget in USD
        batch_size: Rows per batch (default: auto-sized based on data)
        concurrency: Parallel requests (default: auto-sized)
        **kwargs: Additional arguments passed to PipelineBuilder

    Returns:
        Configured Pipeline ready to execute

    Raises:
        ValueError: If input data cannot be loaded or prompt is invalid
    """
    # 1. Load data
    df = QuickPipeline._load_data(data)

    # 2. Auto-detect input columns from prompt template
    input_columns = QuickPipeline._extract_placeholders(prompt)
    if not input_columns:
        raise ValueError(
            f"No placeholders found in prompt: {prompt}\n"
            "Expected format: 'Your prompt with {{column_name}} placeholders'"
        )

    # Validate input columns exist in data
    missing = [col for col in input_columns if col not in df.columns]
    if missing:
        raise ValueError(
            f"Input columns {missing} not found in data. "
            f"Available columns: {list(df.columns)}"
        )

    # 3. Normalize output columns
    if output_columns is None:
        output_columns = ["output"]
    elif isinstance(output_columns, str):
        output_columns = [output_columns]

    # 4. Auto-detect provider from model name
    if provider is None:
        provider = QuickPipeline._detect_provider(model)

    # 5. Auto-select parser (JSON for multi-column, text for single)
    parser = QuickPipeline._select_parser(output_columns)

    # 6. Smart defaults for batch_size and concurrency
    if batch_size is None:
        batch_size = QuickPipeline._default_batch_size(len(df))
    if concurrency is None:
        concurrency = QuickPipeline._default_concurrency(provider)

    # 7. Build pipeline
    builder = (
        PipelineBuilder.create()
        .from_dataframe(
            df, input_columns=input_columns, output_columns=output_columns
        )
        .with_prompt(template=prompt)
        .with_llm(
            provider=provider,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )
    )

    # Add optional parser if multi-column
    if parser:
        builder = builder.with_parser(parser)

    # Add batch/concurrency settings
    builder = builder.with_batch_size(batch_size).with_concurrency(concurrency)

    # Add budget if specified
    if max_budget is not None:
        # Convert to float for PipelineBuilder (it expects float)
        if isinstance(max_budget, Decimal | str):
            max_budget = float(max_budget)
        builder = builder.with_max_budget(budget=max_budget)

    # Add sensible retry defaults
    builder = builder.with_max_retries(3)

    return builder.build()
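
Provider auto-detection means a model switch usually needs no other changes. A hedged sketch, assuming QuickPipeline is importable from the ondine API package and relying on the claude → anthropic mapping described in the class docstring (the model name below is hypothetical, for illustration only):

pipeline = QuickPipeline.create(
    data="reviews.csv",
    prompt="Classify the sentiment: {review}",
    model="claude-3-haiku",  # provider auto-detected as "anthropic"
)
result = pipeline.execute()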

CostEstimate dataclass

CostEstimate(total_cost: Decimal, total_tokens: int, input_tokens: int, output_tokens: int, rows: int, breakdown_by_stage: dict[str, Decimal] = dict(), confidence: str = 'estimate')

Cost estimation for pipeline execution.

ExecutionResult dataclass

ExecutionResult(data: DataFrame, metrics: ProcessingStats, costs: CostEstimate, errors: list[ErrorInfo] = list(), execution_id: UUID = uuid4(), start_time: datetime = datetime.now(), end_time: datetime | None = None, success: bool = True, metadata: dict[str, Any] = dict())

Complete result from pipeline execution.

duration property

duration: float

Get execution duration in seconds.

error_rate property

error_rate: float

Get error rate as percentage.
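
Together, these fields and properties let callers inspect throughput and spend after a run. A short sketch, assuming result is the ExecutionResult returned by pipeline.execute():

result = pipeline.execute()

print(result.success)                 # overall success flag
print(result.duration)                # seconds, derived from start/end times
print(result.error_rate)              # percentage of failed rows
print(result.metrics.processed_rows)  # ProcessingStats
print(result.metrics.rows_per_second)
print(result.costs.total_cost)        # CostEstimate, Decimal USD
print(result.costs.total_tokens)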

validate_output_quality

validate_output_quality(output_columns: list[str]) -> QualityReport

Validate the quality of output data by checking for null/empty values.

Parameters:

Name Type Description Default
output_columns list[str]

List of output column names to check

required

Returns:

Type Description
QualityReport

QualityReport with quality metrics and warnings

Source code in ondine/core/models.py
def validate_output_quality(self, output_columns: list[str]) -> "QualityReport":
    """
    Validate the quality of output data by checking for null/empty values.

    Args:
        output_columns: List of output column names to check

    Returns:
        QualityReport with quality metrics and warnings
    """
    total = len(self.data)

    # Count null and empty values across output columns
    null_count = 0
    empty_count = 0

    for col in output_columns:
        if col in self.data.columns:
            # Count nulls (None, NaN, NaT)
            null_count += self.data[col].isna().sum()
            # Count empty strings (only for string columns)
            if self.data[col].dtype == "object":
                empty_count += (self.data[col].astype(str).str.strip() == "").sum()

    # Aggregate valid-output metrics across all output columns (exclude both nulls and empties)
    valid_outputs = total - null_count - empty_count
    success_rate = (valid_outputs / total * 100) if total > 0 else 0.0

    # Determine quality score
    if success_rate >= 95.0:
        quality_score = "excellent"
    elif success_rate >= 80.0:
        quality_score = "good"
    elif success_rate >= 50.0:
        quality_score = "poor"
    else:
        quality_score = "critical"

    # Generate warnings and issues
    warnings = []
    issues = []

    if success_rate < 70.0:
        issues.append(
            f"⚠️  LOW SUCCESS RATE: Only {success_rate:.1f}% of outputs are valid "
            f"({valid_outputs}/{total} rows)"
        )

    if null_count > total * 0.3:  # > 30% nulls
        issues.append(
            f"⚠️  HIGH NULL RATE: {null_count} null values found "
            f"({null_count / total * 100:.1f}% of rows)"
        )

    if empty_count > total * 0.1:  # > 10% empty
        warnings.append(
            f"Empty outputs detected: {empty_count} rows "
            f"({empty_count / total * 100:.1f}%)"
        )

    # Check if reported metrics match actual data quality
    if self.metrics.failed_rows == 0 and null_count > 0:
        issues.append(
            f"⚠️  METRICS MISMATCH: Pipeline reported 0 failures but "
            f"{null_count} rows have null outputs. This may indicate silent errors."
        )

    return QualityReport(
        total_rows=total,
        valid_outputs=valid_outputs,
        null_outputs=null_count,
        empty_outputs=empty_count,
        success_rate=success_rate,
        quality_score=quality_score,
        warnings=warnings,
        issues=issues,
    )
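
A typical post-run check, assuming result is an ExecutionResult and the pipeline wrote a single output column:

report = result.validate_output_quality(["output"])
print(f"{report.success_rate:.1f}% valid ({report.quality_score})")

if not report.is_acceptable:  # success_rate below the 70% threshold
    for issue in report.issues:
        print(issue)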

ProcessingStats dataclass

ProcessingStats(total_rows: int, processed_rows: int, failed_rows: int, skipped_rows: int, rows_per_second: float, total_duration_seconds: float, stage_durations: dict[str, float] = dict())

Statistics from pipeline execution.

QualityReport dataclass

QualityReport(total_rows: int, valid_outputs: int, null_outputs: int, empty_outputs: int, success_rate: float, quality_score: str, warnings: list[str] = list(), issues: list[str] = list())

Quality assessment of pipeline output.

is_acceptable property

is_acceptable: bool

Check if quality is acceptable (>= 70% success).

has_issues property

has_issues: bool

Check if there are any issues.

DatasetSpec

Bases: BaseModel

Specification for data source configuration.

validate_source_path classmethod

validate_source_path(v: str | Path | None) -> Path | None

Convert string paths to Path objects.

Source code in ondine/core/specifications.py
@field_validator("source_path")
@classmethod
def validate_source_path(cls, v: str | Path | None) -> Path | None:
    """Convert string paths to Path objects."""
    if v is None:
        return None
    return Path(v) if isinstance(v, str) else v

validate_no_overlap classmethod

validate_no_overlap(v: list[str], info: Any) -> list[str]

Ensure output columns don't overlap with input columns.

Source code in ondine/core/specifications.py
@field_validator("output_columns")
@classmethod
def validate_no_overlap(cls, v: list[str], info: Any) -> list[str]:
    """Ensure output columns don't overlap with input columns."""
    if "input_columns" in info.data:
        input_cols = set(info.data["input_columns"])
        output_cols = set(v)
        overlap = input_cols & output_cols
        if overlap:
            raise ValueError(f"Output columns overlap with input: {overlap}")
    return v
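
The overlap check surfaces at construction time. A hedged sketch, assuming DatasetSpec is importable from ondine.core.specifications and can be built with just these fields (other fields may be required in practice); pydantic wraps the ValueError in a ValidationError:

from pydantic import ValidationError

try:
    spec = DatasetSpec(
        source_path="data.csv",
        input_columns=["text"],
        output_columns=["text"],  # overlaps with input_columns
    )
except ValidationError as e:
    print(e)  # "Output columns overlap with input: {'text'}"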

LLMSpec

Bases: BaseModel

Specification for LLM provider configuration.

validate_base_url_format classmethod

validate_base_url_format(v: str | None) -> str | None

Validate base_url is a valid HTTP(S) URL with a host.

Source code in ondine/core/specifications.py
@field_validator("base_url")
@classmethod
def validate_base_url_format(cls, v: str | None) -> str | None:
    """Validate base_url is a valid HTTP(S) URL with a host."""
    if v is None:
        return v
    from urllib.parse import urlparse

    parsed = urlparse(v)
    if parsed.scheme not in {"http", "https"}:
        raise ValueError("base_url must start with http:// or https://")
    if not parsed.netloc:
        raise ValueError(
            "base_url must include a host (e.g., localhost, api.example.com)"
        )
    return v

validate_azure_config classmethod

validate_azure_config(v: str | None, info: Any) -> str | None

Validate Azure-specific configuration.

Source code in ondine/core/specifications.py
@field_validator("azure_endpoint", "azure_deployment")
@classmethod
def validate_azure_config(cls, v: str | None, info: Any) -> str | None:
    """Validate Azure-specific configuration."""
    if info.data.get("provider") == LLMProvider.AZURE_OPENAI and v is None:
        field_name = info.field_name
        raise ValueError(f"{field_name} required for Azure OpenAI provider")
    return v

validate_provider_requirements

validate_provider_requirements() -> LLMSpec

Validate provider-specific requirements.

Source code in ondine/core/specifications.py
@model_validator(mode="after")
def validate_provider_requirements(self) -> "LLMSpec":
    """Validate provider-specific requirements."""
    # Check openai_compatible requires base_url
    if self.provider == LLMProvider.OPENAI_COMPATIBLE and self.base_url is None:
        raise ValueError("base_url required for openai_compatible provider")
    return self
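
Combined, these validators mean self-hosted OpenAI-compatible endpoints must supply a well-formed HTTP(S) base_url. A hedged sketch, assuming LLMSpec and LLMProvider are importable from ondine.core.specifications (the model name is hypothetical):

# Valid: openai_compatible with an HTTP(S) base_url that includes a host.
spec = LLMSpec(
    provider=LLMProvider.OPENAI_COMPATIBLE,
    model="llama-3-8b-instruct",
    base_url="http://localhost:8000/v1",
)

# Invalid: the model validator rejects a missing base_url.
LLMSpec(provider=LLMProvider.OPENAI_COMPATIBLE, model="llama-3-8b-instruct")
# -> ValidationError: base_url required for openai_compatible provider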

PipelineSpecifications

Bases: BaseModel

Container for all pipeline specifications.

ProcessingSpec

Bases: BaseModel

Specification for processing parameters.

validate_checkpoint_dir classmethod

validate_checkpoint_dir(v: str | Path) -> Path

Convert string paths to Path objects.

Source code in ondine/core/specifications.py
@field_validator("checkpoint_dir")
@classmethod
def validate_checkpoint_dir(cls, v: str | Path) -> Path:
    """Convert string paths to Path objects."""
    return Path(v) if isinstance(v, str) else v

PromptSpec

Bases: BaseModel

Specification for prompt template configuration.

validate_template classmethod

validate_template(v: str) -> str

Validate template has at least one variable.

Source code in ondine/core/specifications.py
@field_validator("template")
@classmethod
def validate_template(cls, v: str) -> str:
    """Validate template has at least one variable."""
    if "{" not in v or "}" not in v:
        raise ValueError(
            "Template must contain at least one variable in {var} format"
        )
    return v

validate_response_format classmethod

validate_response_format(v: str) -> str

Validate response format is supported.

Source code in ondine/core/specifications.py
@field_validator("response_format")
@classmethod
def validate_response_format(cls, v: str) -> str:
    """Validate response format is supported."""
    allowed = ["raw", "json", "regex"]
    if v not in allowed:
        raise ValueError(f"response_format must be one of {allowed}, got '{v}'")
    return v
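
Both validators run at construction. A hedged sketch, assuming PromptSpec is importable from ondine.core.specifications and requires only these fields:

# Valid: at least one {var} placeholder and a supported response_format.
spec = PromptSpec(template="Summarize: {text}", response_format="json")

# Invalid: no placeholder in the template.
PromptSpec(template="Summarize this")
# -> ValidationError: Template must contain at least one variable in {var} format

# Invalid: unsupported response format.
PromptSpec(template="Summarize: {text}", response_format="xml")
# -> ValidationError: response_format must be one of ['raw', 'json', 'regex'], got 'xml'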