models

Core data models for execution results and metadata.

These models provide type-safe representations of the outputs and state information produced by pipeline execution.

LLMResponse dataclass

LLMResponse(text: str, tokens_in: int, tokens_out: int, model: str, cost: Decimal, latency_ms: float, metadata: dict[str, Any] = dict())

Response from a single LLM invocation.
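A minimal construction sketch (values are illustrative; the import path follows the source location shown on this page):

from decimal import Decimal

from ondine.core.models import LLMResponse

response = LLMResponse(
    text="Positive",
    tokens_in=42,
    tokens_out=3,
    model="gpt-4o-mini",  # illustrative model name
    cost=Decimal("0.000021"),
    latency_ms=312.5,
)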

CostEstimate dataclass

CostEstimate(total_cost: Decimal, total_tokens: int, input_tokens: int, output_tokens: int, rows: int, breakdown_by_stage: dict[str, Decimal] = dict(), confidence: str = 'estimate')

Cost estimation for pipeline execution.
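For example, a pre-flight estimate for a hypothetical two-stage pipeline might look like this (all figures illustrative; the stage names are placeholders):

from decimal import Decimal

from ondine.core.models import CostEstimate

estimate = CostEstimate(
    total_cost=Decimal("1.25"),
    total_tokens=500_000,
    input_tokens=400_000,
    output_tokens=100_000,
    rows=10_000,
    # Per-stage cost split; keys are hypothetical stage names
    breakdown_by_stage={"classify": Decimal("0.80"), "summarize": Decimal("0.45")},
    confidence="estimate",
)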

ProcessingStats dataclass

ProcessingStats(total_rows: int, processed_rows: int, failed_rows: int, skipped_rows: int, rows_per_second: float, total_duration_seconds: float, stage_durations: dict[str, float] = dict())

Statistics from pipeline execution.
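An illustrative instance (the relation rows_per_second = processed_rows / total_duration_seconds is an assumption, not confirmed by this page):

from ondine.core.models import ProcessingStats

stats = ProcessingStats(
    total_rows=1_000,
    processed_rows=990,
    failed_rows=8,
    skipped_rows=2,
    rows_per_second=16.5,  # assumed: 990 rows / 60 s
    total_duration_seconds=60.0,
    stage_durations={"prompt": 5.0, "invoke": 50.0, "parse": 5.0},  # hypothetical stage names
)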

ErrorInfo dataclass

ErrorInfo(row_index: int, stage_name: str, error_type: str, error_message: str, timestamp: datetime, context: dict[str, Any] = dict())

Information about an error during processing.
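A sketch of a typical capture pattern, recording the exception class and message for a failing row (the try/except and stage name are hypothetical):

from datetime import datetime

from ondine.core.models import ErrorInfo

try:
    raise ValueError("unparseable response")  # stand-in for a real failure
except Exception as exc:
    error = ErrorInfo(
        row_index=17,
        stage_name="parse",  # hypothetical stage name
        error_type=type(exc).__name__,
        error_message=str(exc),
        timestamp=datetime.now(),
        context={"raw_response": "<omitted>"},
    )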

ExecutionResult dataclass

ExecutionResult(data: DataFrame, metrics: ProcessingStats, costs: CostEstimate, errors: list[ErrorInfo] = list(), execution_id: UUID = uuid4(), start_time: datetime = datetime.now(), end_time: datetime | None = None, success: bool = True, metadata: dict[str, Any] = dict())

Complete result from pipeline execution.

duration property

duration: float

Get execution duration in seconds.

error_rate property

error_rate: float

Get the error rate as a percentage.

validate_output_quality

validate_output_quality(output_columns: list[str]) -> QualityReport

Validate the quality of output data by checking for null/empty values.

Parameters:

Name            Type       Description                            Default
--------------  ---------  -------------------------------------  --------
output_columns  list[str]  List of output column names to check   required

Returns:

Type           Description
-------------  ------------------------------------------------
QualityReport  QualityReport with quality metrics and warnings

Source code in ondine/core/models.py
def validate_output_quality(self, output_columns: list[str]) -> "QualityReport":
    """
    Validate the quality of output data by checking for null/empty values.

    Args:
        output_columns: List of output column names to check

    Returns:
        QualityReport with quality metrics and warnings
    """
    total = len(self.data)

    # Count null and empty values across output columns
    null_count = 0
    empty_count = 0

    for col in output_columns:
        if col in self.data.columns:
            # Count nulls (None, NaN, NaT)
            null_count += self.data[col].isna().sum()
            # Count empty strings (only for string columns)
            if self.data[col].dtype == "object":
                empty_count += (self.data[col].astype(str).str.strip() == "").sum()

    # Calculate overall validity metrics (exclude both nulls and empties)
    valid_outputs = total - null_count - empty_count
    success_rate = (valid_outputs / total * 100) if total > 0 else 0.0

    # Determine quality score
    if success_rate >= 95.0:
        quality_score = "excellent"
    elif success_rate >= 80.0:
        quality_score = "good"
    elif success_rate >= 50.0:
        quality_score = "poor"
    else:
        quality_score = "critical"

    # Generate warnings and issues
    warnings = []
    issues = []

    if success_rate < 70.0:
        issues.append(
            f"⚠️  LOW SUCCESS RATE: Only {success_rate:.1f}% of outputs are valid "
            f"({valid_outputs}/{total} rows)"
        )

    if null_count > total * 0.3:  # > 30% nulls
        issues.append(
            f"⚠️  HIGH NULL RATE: {null_count} null values found "
            f"({null_count / total * 100:.1f}% of rows)"
        )

    if empty_count > total * 0.1:  # > 10% empty
        warnings.append(
            f"Empty outputs detected: {empty_count} rows "
            f"({empty_count / total * 100:.1f}%)"
        )

    # Check if reported metrics match actual data quality
    if self.metrics.failed_rows == 0 and null_count > 0:
        issues.append(
            f"⚠️  METRICS MISMATCH: Pipeline reported 0 failures but "
            f"{null_count} rows have null outputs. This may indicate silent errors."
        )

    return QualityReport(
        total_rows=total,
        valid_outputs=valid_outputs,
        null_outputs=null_count,
        empty_outputs=empty_count,
        success_rate=success_rate,
        quality_score=quality_score,
        warnings=warnings,
        issues=issues,
    )
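A usage sketch, assuming result is an ExecutionResult returned by a pipeline run; the output column names are placeholders:

report = result.validate_output_quality(["sentiment", "summary"])

print(f"duration: {result.duration:.1f}s, error rate: {result.error_rate:.1f}%")
print(f"quality: {report.quality_score} ({report.success_rate:.1f}% valid)")

for issue in report.issues:
    print(issue)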

ValidationResult dataclass

ValidationResult(is_valid: bool, errors: list[str] = list(), warnings: list[str] = list())

Result from validation checks.

add_error

add_error(error: str) -> None

Add an error message.

Source code in ondine/core/models.py
def add_error(self, error: str) -> None:
    """Add an error message."""
    self.errors.append(error)
    self.is_valid = False

add_warning

add_warning(warning: str) -> None

Add a warning message.

Source code in ondine/core/models.py
def add_warning(self, warning: str) -> None:
    """Add a warning message."""
    self.warnings.append(warning)
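Usage sketch: a result starts valid, add_error flips is_valid to False, and add_warning leaves it unchanged:

from ondine.core.models import ValidationResult

result = ValidationResult(is_valid=True)
result.add_warning("output column already exists and will be overwritten")
result.add_error("input column 'text' not found")

assert result.is_valid is False
assert len(result.errors) == 1 and len(result.warnings) == 1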

QualityReport dataclass

QualityReport(total_rows: int, valid_outputs: int, null_outputs: int, empty_outputs: int, success_rate: float, quality_score: str, warnings: list[str] = list(), issues: list[str] = list())

Quality assessment of pipeline output.

is_acceptable property

is_acceptable: bool

Check if quality is acceptable (>= 70% success).

has_issues property

has_issues: bool

Check if there are any issues.
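These properties support simple gating, e.g. failing a job when quality falls below the 70% threshold (sketch; report comes from the validate_output_quality example above):

if report.has_issues:
    for issue in report.issues:
        print(issue)

if not report.is_acceptable:  # success_rate < 70%
    raise RuntimeError(f"output quality too low: {report.success_rate:.1f}%")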

WriteConfirmation dataclass

WriteConfirmation(path: str, rows_written: int, success: bool, timestamp: datetime = datetime.now(), metadata: dict[str, Any] = dict())

Confirmation of successful data write.

CheckpointInfo dataclass

CheckpointInfo(session_id: UUID, checkpoint_path: str, row_index: int, stage_index: int, timestamp: datetime, size_bytes: int)

Information about a checkpoint.

RowMetadata dataclass

RowMetadata(row_index: int, row_id: Any | None = None, batch_id: int | None = None, attempt: int = 1, custom: dict[str, Any] = dict())

Metadata for a single row during processing.

PromptBatch dataclass

PromptBatch(prompts: list[str], metadata: list[RowMetadata], batch_id: int)

Batch of prompts for processing.

ResponseBatch dataclass

ResponseBatch(responses: list[str], metadata: list[RowMetadata], tokens_used: int, cost: Decimal, batch_id: int, latencies_ms: list[float] = list())

Batch of responses from the LLM.
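A sketch showing how a PromptBatch and its ResponseBatch pair up positionally through shared RowMetadata (values illustrative):

from decimal import Decimal

from ondine.core.models import PromptBatch, ResponseBatch, RowMetadata

meta = [RowMetadata(row_index=i, batch_id=0) for i in range(2)]

batch = PromptBatch(
    prompts=["Classify: great product", "Classify: arrived broken"],
    metadata=meta,
    batch_id=0,
)

responses = ResponseBatch(
    responses=["positive", "negative"],
    metadata=meta,
    tokens_used=58,
    cost=Decimal("0.00003"),
    batch_id=0,
    latencies_ms=[210.0, 198.0],
)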