Skip to content

result_writer_stage

result_writer_stage

Result writing stage for persisting output.

ResultWriterStage

ResultWriterStage()

Bases: PipelineStage[tuple[DataFrame, DataFrame, OutputSpec], DataFrame]

Write results to destination with merge support.

Responsibilities:

- Merge results with the original data
- Write to the configured destination
- Support atomic writes
- Return the merged DataFrame

Initialize result writer stage.

Source code in ondine/stages/result_writer_stage.py
def __init__(self):
    """Create the stage, passing the name "ResultWriter" to the parent initializer."""
    stage_name = "ResultWriter"
    super().__init__(stage_name)

process

process(input_data: tuple[DataFrame, DataFrame, OutputSpec], context: Any) -> pd.DataFrame

Write results to destination and return merged DataFrame.

Source code in ondine/stages/result_writer_stage.py
def process(
    self,
    input_data: tuple[pd.DataFrame, pd.DataFrame, OutputSpec],
    context: Any,
) -> pd.DataFrame:
    """Merge results into the original data, optionally persist, and return the merge.

    Args:
        input_data: ``(original_df, results_df, output_spec)`` triple.
        context: Execution context; not read by this method.

    Returns:
        The merged DataFrame, whether or not anything was written.
    """
    base_df, new_results, spec = input_data
    combined = self._merge_results(base_df, new_results, spec.merge_strategy)

    # No destination configured: skip persistence entirely. The merged frame
    # is still returned because quality validation needs it.
    if not spec.destination_path:
        return combined

    writer = create_data_writer(spec.destination_type)

    # Pick the write method once; both variants take the same arguments.
    persist = writer.atomic_write if spec.atomic_write else writer.write
    receipt = persist(combined, spec.destination_path)

    self.logger.info(f"Wrote {receipt.rows_written} rows to {receipt.path}")
    return combined

validate_input

validate_input(input_data: tuple[DataFrame, DataFrame, OutputSpec]) -> ValidationResult

Validate input data and output specification.

Source code in ondine/stages/result_writer_stage.py
def validate_input(
    self,
    input_data: tuple[pd.DataFrame, pd.DataFrame, OutputSpec],
) -> ValidationResult:
    """Check the input frames and the output destination before writing.

    An empty original frame or a missing destination directory is recorded
    as a warning; an empty results frame is recorded as an error.

    Args:
        input_data: ``(original_df, results_df, output_spec)`` triple.

    Returns:
        A ValidationResult, created with ``is_valid=True``, carrying any
        warnings and errors added by the checks above.
    """
    report = ValidationResult(is_valid=True)
    source_df, produced_df, spec = input_data

    if source_df.empty:
        report.add_warning("Original DataFrame is empty")
    if produced_df.empty:
        report.add_error("Results DataFrame is empty")

    # The directory check only applies when a destination is configured.
    target = spec.destination_path
    if not target:
        return report

    parent_dir = target.parent
    if not parent_dir.exists():
        report.add_warning(f"Destination directory does not exist: {parent_dir}")
    return report

estimate_cost

estimate_cost(input_data: tuple[DataFrame, DataFrame, OutputSpec]) -> CostEstimate

Result writing has no LLM cost.

Source code in ondine/stages/result_writer_stage.py
def estimate_cost(
    self,
    input_data: tuple[pd.DataFrame, pd.DataFrame, OutputSpec],
) -> CostEstimate:
    """Return a zero-cost estimate; writing results involves no LLM usage.

    Args:
        input_data: ``(original_df, results_df, output_spec)`` triple.

    Returns:
        A CostEstimate with zero cost and zero tokens, whose ``rows`` is the
        length of the results frame.
    """
    no_cost = Decimal("0.0")
    no_tokens = 0
    result_rows = len(input_data[1])  # index 1 holds the results frame
    return CostEstimate(
        total_cost=no_cost,
        total_tokens=no_tokens,
        input_tokens=no_tokens,
        output_tokens=no_tokens,
        rows=result_rows,
    )