Skip to content

observers

observers

Execution observers for monitoring and logging.

Implements Observer pattern for decoupled event notification.

ExecutionObserver

Bases: ABC

Abstract base for execution observers.

Observers can monitor pipeline execution without coupling to the executor implementation.

on_pipeline_start abstractmethod

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Called before first stage execution.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Called before first stage execution."""
    pass

on_stage_start abstractmethod

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

Called before each stage.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """Called before each stage."""
    pass

on_stage_complete abstractmethod

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Called after successful stage completion.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Called after successful stage completion."""
    pass

on_stage_error abstractmethod

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

Called on stage failure.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """Called on stage failure."""
    pass

on_pipeline_complete abstractmethod

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Called after all stages complete.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Called after all stages complete."""
    pass

on_pipeline_error abstractmethod

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Called on fatal pipeline failure.

Source code in ondine/orchestration/observers.py
@abstractmethod
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Called on fatal pipeline failure."""
    pass

on_progress_update

on_progress_update(context: ExecutionContext) -> None

Called periodically during execution for progress updates.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """Called periodically during execution for progress updates."""
    pass

ProgressBarObserver

ProgressBarObserver()

Bases: ExecutionObserver

Observer that displays progress bar with tqdm.

Initialize progress bar observer.

Source code in ondine/orchestration/observers.py
def __init__(self):
    """Initialize progress bar observer."""
    self.progress_bar: tqdm | None = None

on_pipeline_start

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Initialize progress bar.

Source code in ondine/orchestration/observers.py
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Initialize progress bar."""
    if context.total_rows > 0:
        self.progress_bar = tqdm(
            total=context.total_rows,
            desc="Processing",
            unit="rows",
        )

on_stage_start

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

Update progress bar description.

Source code in ondine/orchestration/observers.py
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """Update progress bar description."""
    if self.progress_bar:
        self.progress_bar.set_description(f"Stage: {stage.name}")

on_stage_complete

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Update progress bar.

Source code in ondine/orchestration/observers.py
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Update progress bar."""
    if self.progress_bar:
        self.progress_bar.n = context.last_processed_row
        self.progress_bar.refresh()

on_stage_error

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

Handle error in progress bar.

Source code in ondine/orchestration/observers.py
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """Handle error in progress bar."""
    if self.progress_bar:
        self.progress_bar.set_description(f"Error in {stage.name}")

on_pipeline_complete

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Close progress bar.

Source code in ondine/orchestration/observers.py
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Close progress bar."""
    if self.progress_bar:
        self.progress_bar.close()
        self.progress_bar = None

on_pipeline_error

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Close progress bar on error.

Source code in ondine/orchestration/observers.py
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Close progress bar on error."""
    if self.progress_bar:
        self.progress_bar.close()
        self.progress_bar = None

on_progress_update

on_progress_update(context: ExecutionContext) -> None

Update progress bar with current row count.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """Update progress bar with current row count."""
    if self.progress_bar:
        self.progress_bar.n = context.last_processed_row
        self.progress_bar.set_postfix(
            {
                "cost": f"${context.accumulated_cost:.4f}",
                "progress": f"{context.get_progress():.1f}%",
            }
        )
        self.progress_bar.refresh()

LoggingObserver

LoggingObserver()

Bases: ExecutionObserver

Observer that logs execution events.

Initialize logging observer.

Source code in ondine/orchestration/observers.py
def __init__(self):
    """Initialize logging observer."""
    self.logger = get_logger(__name__)

on_pipeline_start

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Log pipeline start.

Source code in ondine/orchestration/observers.py
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Log pipeline start."""
    self.logger.info(f"Pipeline execution started (session: {context.session_id})")

on_stage_start

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

Log stage start.

Source code in ondine/orchestration/observers.py
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """Log stage start."""
    self.logger.info(
        f"Starting stage: {stage.name} (progress: {context.get_progress():.1f}%)"
    )

on_stage_complete

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Log stage completion.

Source code in ondine/orchestration/observers.py
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Log stage completion."""
    self.logger.info(
        f"Completed stage: {stage.name} (cost: ${context.accumulated_cost:.4f})"
    )

on_stage_error

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

Log stage error.

Source code in ondine/orchestration/observers.py
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """Log stage error."""
    self.logger.error(f"Stage {stage.name} failed: {error}")

on_pipeline_complete

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Log pipeline completion.

Source code in ondine/orchestration/observers.py
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Log pipeline completion."""
    self.logger.info(
        f"Pipeline execution completed successfully\n"
        f"  Processed: {result.metrics.processed_rows} rows\n"
        f"  Duration: {result.metrics.total_duration_seconds:.2f}s\n"
        f"  Total cost: ${result.costs.total_cost:.4f}\n"
        f"  Errors: {result.metrics.failed_rows}"
    )

on_pipeline_error

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Log pipeline error.

Source code in ondine/orchestration/observers.py
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Log pipeline error."""
    self.logger.error(f"Pipeline execution failed: {error}")

on_progress_update

on_progress_update(context: ExecutionContext) -> None

Log progress update.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """Log progress update."""
    # Make progress very visible with separators
    self.logger.info(
        f"━━━━━━ PROGRESS: {context.last_processed_row}/{context.total_rows} rows "
        f"({context.get_progress():.1f}%) | Cost: ${context.accumulated_cost:.4f} ━━━━━━"
    )

CostTrackingObserver

CostTrackingObserver(warning_threshold: float = 0.75)

Bases: ExecutionObserver

Observer that tracks and warns about costs.

Initialize cost tracking observer.

Parameters:

Name Type Description Default
warning_threshold float

Warn when this fraction of budget used

0.75
Source code in ondine/orchestration/observers.py
def __init__(self, warning_threshold: float = 0.75):
    """
    Initialize cost tracking observer.

    Args:
        warning_threshold: Warn when this fraction of budget used
    """
    self.logger = get_logger(__name__)
    self.warning_threshold = warning_threshold
    self.max_budget: float | None = None

on_pipeline_start

on_pipeline_start(pipeline: Any, context: ExecutionContext) -> None

Set max budget if available.

Source code in ondine/orchestration/observers.py
def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
    """Set max budget if available."""
    # Could extract from pipeline specs
    pass

on_stage_start

on_stage_start(stage: PipelineStage, context: ExecutionContext) -> None

No action on stage start.

Source code in ondine/orchestration/observers.py
def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
    """No action on stage start."""
    pass

on_stage_complete

on_stage_complete(stage: PipelineStage, context: ExecutionContext, result: Any) -> None

Check cost after stage completion.

Source code in ondine/orchestration/observers.py
def on_stage_complete(
    self, stage: PipelineStage, context: ExecutionContext, result: Any
) -> None:
    """Check cost after stage completion."""
    if self.max_budget:
        usage_ratio = float(context.accumulated_cost) / self.max_budget

        if usage_ratio >= self.warning_threshold:
            self.logger.warning(
                f"Cost warning: {usage_ratio * 100:.1f}% of budget used "
                f"(${context.accumulated_cost:.4f} / ${self.max_budget:.2f})"
            )

on_stage_error

on_stage_error(stage: PipelineStage, context: ExecutionContext, error: Exception) -> None

No action on error.

Source code in ondine/orchestration/observers.py
def on_stage_error(
    self, stage: PipelineStage, context: ExecutionContext, error: Exception
) -> None:
    """No action on error."""
    pass

on_pipeline_complete

on_pipeline_complete(context: ExecutionContext, result: ExecutionResult) -> None

Log final cost summary.

Source code in ondine/orchestration/observers.py
def on_pipeline_complete(
    self, context: ExecutionContext, result: ExecutionResult
) -> None:
    """Log final cost summary."""
    self.logger.info(
        f"Cost summary:\n"
        f"  Total: ${result.costs.total_cost:.4f}\n"
        f"  Input tokens: {result.costs.input_tokens:,}\n"
        f"  Output tokens: {result.costs.output_tokens:,}\n"
        f"  Cost per row: ${float(result.costs.total_cost) / result.metrics.total_rows:.6f}"
    )

on_pipeline_error

on_pipeline_error(context: ExecutionContext, error: Exception) -> None

Log cost at failure.

Source code in ondine/orchestration/observers.py
def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
    """Log cost at failure."""
    self.logger.info(f"Cost at failure: ${context.accumulated_cost:.4f}")

on_progress_update

on_progress_update(context: ExecutionContext) -> None

No action on progress update for cost tracking.

Source code in ondine/orchestration/observers.py
def on_progress_update(self, context: ExecutionContext) -> None:
    """No action on progress update for cost tracking."""
    pass