utils

Utility modules for cross-cutting concerns.

BudgetController

BudgetController(max_budget: Decimal | None = None, warn_at_75: bool = True, warn_at_90: bool = True, fail_on_exceed: bool = True)

Controls and enforces budget limits during execution.

Follows Single Responsibility: only handles budget management.

Initialize budget controller.

Parameters:

- max_budget (Decimal | None, default None): Maximum allowed budget in USD
- warn_at_75 (bool, default True): Warn at 75% of budget
- warn_at_90 (bool, default True): Warn at 90% of budget
- fail_on_exceed (bool, default True): Raise error if budget exceeded
Source code in ondine/utils/budget_controller.py
def __init__(
    self,
    max_budget: Decimal | None = None,
    warn_at_75: bool = True,
    warn_at_90: bool = True,
    fail_on_exceed: bool = True,
):
    """
    Initialize budget controller.

    Args:
        max_budget: Maximum allowed budget in USD
        warn_at_75: Warn at 75% of budget
        warn_at_90: Warn at 90% of budget
        fail_on_exceed: Raise error if budget exceeded
    """
    self.max_budget = max_budget
    self.warn_at_75 = warn_at_75
    self.warn_at_90 = warn_at_90
    self.fail_on_exceed = fail_on_exceed

    self._warned_75 = False
    self._warned_90 = False

check_budget

check_budget(current_cost: Decimal) -> None

Check if budget is within limits.

Parameters:

- current_cost (Decimal, required): Current accumulated cost

Raises:

- BudgetExceededError: If budget exceeded and fail_on_exceed=True

Source code in ondine/utils/budget_controller.py
def check_budget(self, current_cost: Decimal) -> None:
    """
    Check if budget is within limits.

    Args:
        current_cost: Current accumulated cost

    Raises:
        BudgetExceededError: If budget exceeded and fail_on_exceed=True
    """
    if self.max_budget is None:
        return

    usage_ratio = float(current_cost / self.max_budget)

    # 75% warning
    if self.warn_at_75 and not self._warned_75 and usage_ratio >= 0.75:
        logger.warning(
            f"Budget warning: 75% used "
            f"(${current_cost:.4f} / ${self.max_budget:.2f})"
        )
        self._warned_75 = True

    # 90% warning
    if self.warn_at_90 and not self._warned_90 and usage_ratio >= 0.90:
        logger.warning(
            f"Budget warning: 90% used "
            f"(${current_cost:.4f} / ${self.max_budget:.2f})"
        )
        self._warned_90 = True

    # Budget exceeded
    if current_cost > self.max_budget:
        error_msg = f"Budget exceeded: ${current_cost:.4f} > ${self.max_budget:.2f}"
        logger.error(error_msg)

        if self.fail_on_exceed:
            raise BudgetExceededError(error_msg)
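
A minimal usage sketch combining the constructor and check_budget above. The import path follows the "Source code in ondine/utils/budget_controller.py" note, BudgetExceededError is assumed to be importable from the same module, and the dollar amounts are illustrative only.

from decimal import Decimal

from ondine.utils.budget_controller import BudgetController, BudgetExceededError

# Cap spend at $5.00 and fail hard when the limit is crossed.
controller = BudgetController(max_budget=Decimal("5.00"), fail_on_exceed=True)

accumulated = Decimal("0.00")
for batch_cost in [Decimal("2.00"), Decimal("2.00"), Decimal("1.50")]:
    accumulated += batch_cost
    try:
        # Logs 75%/90% warnings as thresholds are crossed, raises past $5.00.
        controller.check_budget(accumulated)
    except BudgetExceededError:
        print(f"Stopping: spent ${accumulated} of $5.00")
        break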

get_remaining

get_remaining(current_cost: Decimal) -> Decimal | None

Get remaining budget.

Parameters:

- current_cost (Decimal, required): Current accumulated cost

Returns:

- Decimal | None: Remaining budget or None if no limit

Source code in ondine/utils/budget_controller.py
def get_remaining(self, current_cost: Decimal) -> Decimal | None:
    """
    Get remaining budget.

    Args:
        current_cost: Current accumulated cost

    Returns:
        Remaining budget or None if no limit
    """
    if self.max_budget is None:
        return None
    return self.max_budget - current_cost

get_usage_percentage

get_usage_percentage(current_cost: Decimal) -> float | None

Get budget usage as percentage.

Parameters:

- current_cost (Decimal, required): Current accumulated cost

Returns:

- float | None: Usage percentage or None if no limit

Source code in ondine/utils/budget_controller.py
def get_usage_percentage(self, current_cost: Decimal) -> float | None:
    """
    Get budget usage as percentage.

    Args:
        current_cost: Current accumulated cost

    Returns:
        Usage percentage or None if no limit
    """
    if self.max_budget is None:
        return None
    return float(current_cost / self.max_budget) * 100

can_afford

can_afford(estimated_cost: Decimal, current_cost: Decimal) -> bool

Check if estimated additional cost is within budget.

Parameters:

- estimated_cost (Decimal, required): Estimated cost for next operation
- current_cost (Decimal, required): Current accumulated cost

Returns:

- bool: True if within budget

Source code in ondine/utils/budget_controller.py
def can_afford(self, estimated_cost: Decimal, current_cost: Decimal) -> bool:
    """
    Check if estimated additional cost is within budget.

    Args:
        estimated_cost: Estimated cost for next operation
        current_cost: Current accumulated cost

    Returns:
        True if within budget
    """
    if self.max_budget is None:
        return True
    return (current_cost + estimated_cost) <= self.max_budget
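
A short sketch of the read-only helpers above (get_remaining, get_usage_percentage, can_afford) used as a pre-flight check before submitting more work; the figures are illustrative.

from decimal import Decimal

from ondine.utils.budget_controller import BudgetController

controller = BudgetController(max_budget=Decimal("10.00"))
spent = Decimal("7.50")

print(controller.get_remaining(spent))         # Decimal('2.50')
print(controller.get_usage_percentage(spent))  # 75.0

# Only submit the next batch if its estimated cost still fits the budget.
next_batch_estimate = Decimal("3.00")
if controller.can_afford(next_batch_estimate, current_cost=spent):
    print("submit batch")
else:
    print("skip batch: would exceed budget")   # taken here: 7.50 + 3.00 > 10.00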

BudgetExceededError

Bases: Exception

Raised when budget limit is exceeded.

CostCalculator

Centralized cost calculation for LLM API usage.

Single Responsibility: Calculate cost from token counts and pricing.
Used by: LLMClient, CostTracker, and any component needing cost calculation.

Design Decision: Centralize the cost formula in one place to ensure consistency and make future changes (e.g., tiered pricing) easier.

calculate staticmethod

calculate(tokens_in: int, tokens_out: int, input_cost_per_1k: Decimal, output_cost_per_1k: Decimal) -> Decimal

Calculate cost from token counts and pricing.

Formula

cost = (tokens_in / 1000) * input_cost_per_1k + (tokens_out / 1000) * output_cost_per_1k

Parameters:

- tokens_in (int, required): Number of input tokens
- tokens_out (int, required): Number of output tokens
- input_cost_per_1k (Decimal, required): Cost per 1000 input tokens
- output_cost_per_1k (Decimal, required): Cost per 1000 output tokens

Returns:

- Decimal: Total cost as Decimal (exact precision for financial calculations)

Example

>>> from decimal import Decimal
>>> cost = CostCalculator.calculate(
...     tokens_in=1000,
...     tokens_out=500,
...     input_cost_per_1k=Decimal("0.00005"),
...     output_cost_per_1k=Decimal("0.00008")
... )
>>> cost
Decimal('0.00009')

Source code in ondine/utils/cost_calculator.py
@staticmethod
def calculate(
    tokens_in: int,
    tokens_out: int,
    input_cost_per_1k: Decimal,
    output_cost_per_1k: Decimal,
) -> Decimal:
    """
    Calculate cost from token counts and pricing.

    Formula:
        cost = (tokens_in / 1000) * input_cost_per_1k +
               (tokens_out / 1000) * output_cost_per_1k

    Args:
        tokens_in: Number of input tokens
        tokens_out: Number of output tokens
        input_cost_per_1k: Cost per 1000 input tokens
        output_cost_per_1k: Cost per 1000 output tokens

    Returns:
        Total cost as Decimal (exact precision for financial calculations)

    Example:
        >>> from decimal import Decimal
        >>> cost = CostCalculator.calculate(
        ...     tokens_in=1000,
        ...     tokens_out=500,
        ...     input_cost_per_1k=Decimal("0.00005"),
        ...     output_cost_per_1k=Decimal("0.00008")
        ... )
        >>> cost
        Decimal('0.00009')
    """
    input_cost = (Decimal(tokens_in) / 1000) * input_cost_per_1k
    output_cost = (Decimal(tokens_out) / 1000) * output_cost_per_1k
    return input_cost + output_cost

CostTracker

CostTracker(input_cost_per_1k: Decimal | None = None, output_cost_per_1k: Decimal | None = None)

Detailed cost accounting with thread-safety and per-stage breakdowns.

Scope: Detailed financial tracking and reporting.
Pattern: Accumulator with thread-safe operations.

Use CostTracker for:

- Stage-by-stage cost breakdowns
- Detailed entry logging (timestamp, model, tokens)
- Thread-safe accumulation in concurrent execution
- Cost reporting and analytics
- Budget enforcement (via BudgetController)

NOT for:

- Simple orchestration state (use ExecutionContext for that)

Why separate from ExecutionContext?

- CostTracker = detailed accounting system (entries, breakdowns, thread-safety)
- ExecutionContext = orchestration state (progress, session, timing)
- Different concerns: accounting vs execution control

Thread Safety:

- All methods protected by threading.Lock
- Safe for concurrent LLM invocations

Example

import time

tracker = CostTracker(
    input_cost_per_1k=Decimal("0.00015"),
    output_cost_per_1k=Decimal("0.0006"),
)
cost = tracker.add(
    tokens_in=1000,
    tokens_out=500,
    model="gpt-4o-mini",
    timestamp=time.time(),
    stage="llm_invocation",
)
breakdown = tracker.get_stage_costs()  # {"llm_invocation": Decimal("0.00045")}

See Also:

- ExecutionContext: For orchestration-level state
- BudgetController: For cost limit enforcement
- docs/TECHNICAL_REFERENCE.md: Cost tracking architecture

Initialize cost tracker.

Parameters:

- input_cost_per_1k (Decimal | None, default None): Input token cost per 1K tokens
- output_cost_per_1k (Decimal | None, default None): Output token cost per 1K tokens
Source code in ondine/utils/cost_tracker.py
def __init__(
    self,
    input_cost_per_1k: Decimal | None = None,
    output_cost_per_1k: Decimal | None = None,
):
    """
    Initialize cost tracker.

    Args:
        input_cost_per_1k: Input token cost per 1K tokens
        output_cost_per_1k: Output token cost per 1K tokens
    """
    self.input_cost_per_1k = input_cost_per_1k or Decimal("0.0")
    self.output_cost_per_1k = output_cost_per_1k or Decimal("0.0")

    self._total_input_tokens = 0
    self._total_output_tokens = 0
    self._total_cost = Decimal("0.0")
    self._entries: list[CostEntry] = []
    self._stage_costs: dict[str, Decimal] = {}
    self._lock = threading.Lock()

total_cost property

total_cost: Decimal

Get total accumulated cost.

total_tokens property

total_tokens: int

Get total token count.

input_tokens property

input_tokens: int

Get total input tokens.

output_tokens property

output_tokens: int

Get total output tokens.

add

add(tokens_in: int, tokens_out: int, model: str, timestamp: float, stage: str | None = None) -> Decimal

Add cost entry.

Parameters:

- tokens_in (int, required): Input tokens used
- tokens_out (int, required): Output tokens used
- model (str, required): Model identifier
- timestamp (float, required): Timestamp of request
- stage (str | None, default None): Optional stage name

Returns:

- Decimal: Cost for this entry

Source code in ondine/utils/cost_tracker.py
def add(
    self,
    tokens_in: int,
    tokens_out: int,
    model: str,
    timestamp: float,
    stage: str | None = None,
) -> Decimal:
    """
    Add cost entry.

    Args:
        tokens_in: Input tokens used
        tokens_out: Output tokens used
        model: Model identifier
        timestamp: Timestamp of request
        stage: Optional stage name

    Returns:
        Cost for this entry
    """
    cost = self.calculate_cost(tokens_in, tokens_out)

    with self._lock:
        entry = CostEntry(
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            cost=cost,
            model=model,
            timestamp=timestamp,
        )
        self._entries.append(entry)

        self._total_input_tokens += tokens_in
        self._total_output_tokens += tokens_out
        self._total_cost += cost

        if stage:
            self._stage_costs[stage] = (
                self._stage_costs.get(stage, Decimal("0.0")) + cost
            )

    return cost
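
Because add and get_stage_costs take the internal lock, concurrent workers can record usage on a shared tracker. A minimal sketch under that assumption; the token counts, model name, and stage name are made up for illustration.

import threading
import time
from decimal import Decimal

from ondine.utils.cost_tracker import CostTracker

tracker = CostTracker(
    input_cost_per_1k=Decimal("0.00015"),
    output_cost_per_1k=Decimal("0.0006"),
)

def worker(stage: str) -> None:
    # Each call appends one lock-protected entry and updates the totals.
    tracker.add(
        tokens_in=800,
        tokens_out=200,
        model="gpt-4o-mini",
        timestamp=time.time(),
        stage=stage,
    )

threads = [threading.Thread(target=worker, args=("llm_invocation",)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(tracker.total_cost)         # sum of all four entries
print(tracker.get_stage_costs())  # {'llm_invocation': ...}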

calculate_cost

calculate_cost(tokens_in: int, tokens_out: int) -> Decimal

Calculate cost for given token counts.

Parameters:

- tokens_in (int, required): Input tokens
- tokens_out (int, required): Output tokens

Returns:

- Decimal: Total cost

Source code in ondine/utils/cost_tracker.py
def calculate_cost(self, tokens_in: int, tokens_out: int) -> Decimal:
    """
    Calculate cost for given token counts.

    Args:
        tokens_in: Input tokens
        tokens_out: Output tokens

    Returns:
        Total cost
    """
    from ondine.utils.cost_calculator import CostCalculator

    return CostCalculator.calculate(
        tokens_in=tokens_in,
        tokens_out=tokens_out,
        input_cost_per_1k=self.input_cost_per_1k,
        output_cost_per_1k=self.output_cost_per_1k,
    )

get_estimate

get_estimate(rows: int = 0) -> CostEstimate

Get cost estimate.

Parameters:

- rows (int, default 0): Number of rows processed

Returns:

- CostEstimate: CostEstimate object

Source code in ondine/utils/cost_tracker.py
def get_estimate(self, rows: int = 0) -> CostEstimate:
    """
    Get cost estimate.

    Args:
        rows: Number of rows processed

    Returns:
        CostEstimate object
    """
    with self._lock:
        total_tokens = self._total_input_tokens + self._total_output_tokens
        return CostEstimate(
            total_cost=self._total_cost,
            total_tokens=total_tokens,
            input_tokens=self._total_input_tokens,
            output_tokens=self._total_output_tokens,
            rows=rows,
            breakdown_by_stage=dict(self._stage_costs),
            confidence="actual",
        )
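
A brief sketch of reading the aggregate snapshot, assuming CostEstimate exposes the fields shown in the constructor call above as attributes; the token counts are illustrative.

from decimal import Decimal

from ondine.utils.cost_tracker import CostTracker

tracker = CostTracker(
    input_cost_per_1k=Decimal("0.00015"),
    output_cost_per_1k=Decimal("0.0006"),
)
tracker.add(tokens_in=1000, tokens_out=500, model="gpt-4o-mini",
            timestamp=0.0, stage="llm_invocation")

estimate = tracker.get_estimate(rows=1)
print(estimate.total_cost)          # Decimal('0.00045')
print(estimate.breakdown_by_stage)  # {'llm_invocation': Decimal('0.00045')}
print(estimate.confidence)          # 'actual': real usage, not a projection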

reset

reset() -> None

Reset all tracking.

Source code in ondine/utils/cost_tracker.py
def reset(self) -> None:
    """Reset all tracking."""
    with self._lock:
        self._total_input_tokens = 0
        self._total_output_tokens = 0
        self._total_cost = Decimal("0.0")
        self._entries.clear()
        self._stage_costs.clear()

get_stage_costs

get_stage_costs() -> dict[str, Decimal]

Get costs breakdown by stage.

Source code in ondine/utils/cost_tracker.py
def get_stage_costs(self) -> dict[str, Decimal]:
    """Get costs breakdown by stage."""
    with self._lock:
        return dict(self._stage_costs)

PreprocessingStats dataclass

PreprocessingStats(rows_processed: int, chars_before: int, chars_after: int, truncated_count: int, null_count: int)

Statistics from preprocessing operation.

reduction_pct property

reduction_pct: float

Calculate character reduction percentage.

TextPreprocessor

TextPreprocessor(max_length: int = 500)

Composable text preprocessor following Chain of Responsibility.

Single Responsibility: Orchestrate cleaning steps.
Open/Closed: Extensible via cleaners list.
Dependency Inversion: Depends on Protocol, not concrete classes.

Initialize with default cleaning pipeline.

Source code in ondine/utils/input_preprocessing.py
def __init__(self, max_length: int = 500):
    """Initialize with default cleaning pipeline."""
    self.cleaners: list[TextCleaner] = [
        UnicodeNormalizer(),
        ControlCharRemover(),
        SpecialCharCleaner(),
        WhitespaceNormalizer(),
        TextTruncator(max_length),
    ]

process

process(text: str) -> str

Apply all cleaners in sequence.

Source code in ondine/utils/input_preprocessing.py
def process(self, text: str) -> str:
    """Apply all cleaners in sequence."""
    if pd.isna(text) or not isinstance(text, str):
        return ""

    for cleaner in self.cleaners:
        text = cleaner.clean(text)

    return text

add_cleaner

add_cleaner(cleaner: TextCleaner) -> None

Extend pipeline with custom cleaner.

Source code in ondine/utils/input_preprocessing.py
def add_cleaner(self, cleaner: TextCleaner) -> None:
    """Extend pipeline with custom cleaner."""
    self.cleaners.append(cleaner)
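
A sketch of extending the pipeline with a custom cleaner. It assumes the TextCleaner protocol only requires a clean(text) -> str method, which is how process() uses each cleaner above; LowercaseCleaner is a hypothetical example, not part of the library.

from ondine.utils.input_preprocessing import TextPreprocessor

class LowercaseCleaner:
    """Hypothetical cleaner: satisfies the TextCleaner protocol via clean()."""

    def clean(self, text: str) -> str:
        return text.lower()

preprocessor = TextPreprocessor(max_length=200)
preprocessor.add_cleaner(LowercaseCleaner())  # runs after the default cleaners

print(preprocessor.process("  Héllo\tWORLD!  "))  # normalized, trimmed, lowercased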

RateLimiter

RateLimiter(requests_per_minute: int, burst_size: int | None = None)

Token bucket rate limiter for controlling API request rates.

Thread-safe implementation.

Initialize rate limiter.

Parameters:

- requests_per_minute (int, required): Maximum requests per minute
- burst_size (int | None, default None): Maximum burst size (default: requests_per_minute)
Source code in ondine/utils/rate_limiter.py
def __init__(self, requests_per_minute: int, burst_size: int | None = None):
    """
    Initialize rate limiter.

    Args:
        requests_per_minute: Maximum requests per minute
        burst_size: Maximum burst size (default: requests_per_minute)
    """
    self.rpm = requests_per_minute
    self.capacity = burst_size or requests_per_minute
    self.tokens = float(self.capacity)
    self.last_update = time.time()
    self.lock = threading.Lock()

    # Calculate refill rate (tokens per second)
    self.refill_rate = requests_per_minute / 60.0

available_tokens property

available_tokens: float

Get current available tokens.

acquire

acquire(tokens: int = 1, timeout: float | None = None) -> bool

Acquire tokens for making requests.

Parameters:

- tokens (int, default 1): Number of tokens to acquire
- timeout (float | None, default None): Maximum wait time in seconds (None = wait forever)

Returns:

- bool: True if tokens acquired, False if timeout

Raises:

- ValueError: If tokens > capacity

Source code in ondine/utils/rate_limiter.py
def acquire(self, tokens: int = 1, timeout: float | None = None) -> bool:
    """
    Acquire tokens for making requests.

    Args:
        tokens: Number of tokens to acquire
        timeout: Maximum wait time in seconds (None = wait forever)

    Returns:
        True if tokens acquired, False if timeout

    Raises:
        ValueError: If tokens > capacity
    """
    if tokens > self.capacity:
        raise ValueError(
            f"Requested {tokens} tokens exceeds capacity {self.capacity}"
        )

    deadline = None if timeout is None else time.time() + timeout

    while True:
        with self.lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True

        # Check timeout
        if deadline is not None and time.time() >= deadline:
            return False

        # Sleep before retry
        time.sleep(0.1)
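
A minimal sketch of throttling requests with acquire; the rate, burst, and timeout values are illustrative, and call_api is a stand-in for real work.

from ondine.utils.rate_limiter import RateLimiter

# Allow ~120 requests/minute, with a burst of up to 10 immediate requests.
limiter = RateLimiter(requests_per_minute=120, burst_size=10)

def call_api(i: int) -> None:
    print(f"request {i}")  # placeholder for a real API call

for i in range(20):
    # Block for up to 5 seconds waiting for a token; skip the request on timeout.
    if limiter.acquire(tokens=1, timeout=5.0):
        call_api(i)
    else:
        print(f"request {i} skipped: rate limiter timeout")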

reset

reset() -> None

Reset rate limiter to full capacity.

Source code in ondine/utils/rate_limiter.py
def reset(self) -> None:
    """Reset rate limiter to full capacity."""
    with self.lock:
        self.tokens = float(self.capacity)
        self.last_update = time.time()

NetworkError

Bases: RetryableError

Network-related error.

RateLimitError

Bases: RetryableError

Rate limit exceeded error.

RetryableError

Bases: Exception

Base class for errors that should be retried.

RetryHandler

RetryHandler(max_attempts: int = 3, initial_delay: float = 1.0, max_delay: float = 60.0, exponential_base: int = 2, retryable_exceptions: tuple[type[Exception], ...] | None = None)

Request-level retry with exponential backoff (for transient errors).

Scope: Single LLM API call or operation.
Use when: Transient errors (rate limits, network timeouts, API hiccups).
NOT for: Row-level quality issues (use Pipeline.auto_retry_failed for that).

Retry Strategy:

- Exponential backoff (1s, 2s, 4s, 8s, ...)
- Configurable max attempts (default: 3)
- Only retries specific exception types

Example

handler = RetryHandler(max_attempts=3, initial_delay=1.0)
result = handler.execute(lambda: call_llm_api())

See Also:

- ErrorHandler: Orchestrates retry decisions based on policy
- Pipeline._auto_retry_failed_rows(): Row-level quality retry
- docs/architecture/decisions/ADR-006-retry-levels.md

Initialize retry handler.

Parameters:

- max_attempts (int, default 3): Maximum retry attempts
- initial_delay (float, default 1.0): Initial delay in seconds
- max_delay (float, default 60.0): Maximum delay in seconds
- exponential_base (int, default 2): Base for exponential backoff
- retryable_exceptions (tuple[type[Exception], ...] | None, default None): Exception types to retry
Source code in ondine/utils/retry_handler.py
def __init__(
    self,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: int = 2,
    retryable_exceptions: tuple[type[Exception], ...] | None = None,
):
    """
    Initialize retry handler.

    Args:
        max_attempts: Maximum retry attempts
        initial_delay: Initial delay in seconds
        max_delay: Maximum delay in seconds
        exponential_base: Base for exponential backoff
        retryable_exceptions: Exception types to retry
    """
    self.max_attempts = max_attempts
    self.initial_delay = initial_delay
    self.max_delay = max_delay
    self.exponential_base = exponential_base

    if retryable_exceptions is None:
        self.retryable_exceptions = (
            RetryableError,
            RateLimitError,
            NetworkError,
        )
    else:
        self.retryable_exceptions = retryable_exceptions
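
A sketch showing how retryable_exceptions narrows what gets retried. RateLimitError is assumed to be importable from the same module as RetryHandler, the tuple including TimeoutError is just an example, and flaky_call is a hypothetical stand-in for an API call.

from ondine.utils.retry_handler import RateLimitError, RetryHandler

# Retry only rate-limit and timeout errors, with a faster backoff than the defaults.
handler = RetryHandler(
    max_attempts=5,
    initial_delay=0.5,
    max_delay=10.0,
    retryable_exceptions=(RateLimitError, TimeoutError),
)

calls = {"n": 0}

def flaky_call() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimitError("429 from provider")  # retried with backoff
    return "ok"

print(handler.execute(flaky_call))  # "ok" after two retried failures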

execute

execute(func: Callable[[], T]) -> T

Execute function with retry logic.

Parameters:

- func (Callable[[], T], required): Function to execute

Returns:

- T: Result from function

Raises:

- Exception: If all retries exhausted

Source code in ondine/utils/retry_handler.py
def execute(self, func: Callable[[], T]) -> T:
    """
    Execute function with retry logic.

    Args:
        func: Function to execute

    Returns:
        Result from function

    Raises:
        Exception: If all retries exhausted
    """
    retryer = Retrying(
        stop=stop_after_attempt(self.max_attempts),
        wait=wait_exponential(
            multiplier=self.initial_delay,
            max=self.max_delay,
            exp_base=self.exponential_base,
        ),
        retry=retry_if_exception_type(self.retryable_exceptions),
        reraise=True,
    )

    return retryer(func)

calculate_delay

calculate_delay(attempt: int) -> float

Calculate delay for given attempt number.

Parameters:

- attempt (int, required): Attempt number (1-based)

Returns:

- float: Delay in seconds

Source code in ondine/utils/retry_handler.py
def calculate_delay(self, attempt: int) -> float:
    """
    Calculate delay for given attempt number.

    Args:
        attempt: Attempt number (1-based)

    Returns:
        Delay in seconds
    """
    delay = self.initial_delay * (self.exponential_base ** (attempt - 1))
    return min(delay, self.max_delay)

preprocess_dataframe

preprocess_dataframe(df: DataFrame, input_columns: list[str], max_length: int = 500) -> tuple[pd.DataFrame, PreprocessingStats]

Preprocess input columns in dataframe.

Parameters:

- df (DataFrame, required): Input dataframe
- input_columns (list[str], required): Columns to clean
- max_length (int, default 500): Max chars per field

Returns:

- tuple[DataFrame, PreprocessingStats]: (cleaned_df, stats)

Source code in ondine/utils/input_preprocessing.py
def preprocess_dataframe(
    df: pd.DataFrame,
    input_columns: list[str],
    max_length: int = 500,
) -> tuple[pd.DataFrame, PreprocessingStats]:
    """
    Preprocess input columns in dataframe.

    Args:
        df: Input dataframe
        input_columns: Columns to clean
        max_length: Max chars per field

    Returns:
        (cleaned_df, stats)
    """
    result = df.copy()
    preprocessor = TextPreprocessor(max_length)

    chars_before = 0
    chars_after = 0
    truncated = 0
    nulls = 0

    for col in input_columns:
        if col not in result.columns:
            continue

        for idx in result.index:
            original = result.at[idx, col]

            if pd.isna(original):
                nulls += 1
                continue

            original_str = str(original)
            chars_before += len(original_str)

            cleaned = preprocessor.process(original_str)
            chars_after += len(cleaned)

            if len(original_str) > max_length:
                truncated += 1

            result.at[idx, col] = cleaned

    stats = PreprocessingStats(
        rows_processed=len(result),
        chars_before=chars_before,
        chars_after=chars_after,
        truncated_count=truncated,
        null_count=nulls,
    )

    return result, stats
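
A small end-to-end sketch of preprocess_dataframe on a toy frame; the column names and cell values are invented for illustration.

import pandas as pd

from ondine.utils.input_preprocessing import preprocess_dataframe

df = pd.DataFrame(
    {
        "description": ["  Café\u00a0 LATTE\t  extra   foam  ", None, "ok"],
        "id": [1, 2, 3],  # untouched: not listed in input_columns
    }
)

cleaned, stats = preprocess_dataframe(df, input_columns=["description"], max_length=20)

print(cleaned["description"].tolist())
print(stats.rows_processed, stats.null_count, stats.truncated_count)
print(f"{stats.reduction_pct:.1f}% fewer characters")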

configure_logging

configure_logging(level: str = 'INFO', json_format: bool = False, include_timestamp: bool = True) -> None

Configure structured logging for the SDK.

Parameters:

- level (str, default 'INFO'): Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- json_format (bool, default False): Use JSON output format
- include_timestamp (bool, default True): Include timestamps in logs
Source code in ondine/utils/logging_utils.py
def configure_logging(
    level: str = "INFO",
    json_format: bool = False,
    include_timestamp: bool = True,
) -> None:
    """
    Configure structured logging for the SDK.

    Args:
        level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        json_format: Use JSON output format
        include_timestamp: Include timestamps in logs
    """
    global _logging_configured

    # Set stdlib logging level
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, level.upper()),
    )

    # Configure structlog processors
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
    ]

    if include_timestamp:
        processors.append(structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"))

    if json_format:
        processors.append(structlog.processors.JSONRenderer())
    else:
        # Use custom compact console renderer (no padding)
        processors.append(_compact_console_renderer)

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

    _logging_configured = True

get_logger

get_logger(name: str) -> structlog.BoundLogger

Get a structured logger instance.

Auto-configures logging on first use if not already configured.

Parameters:

- name (str, required): Logger name (typically __name__)

Returns:

- BoundLogger: Configured structlog logger

Source code in ondine/utils/logging_utils.py
def get_logger(name: str) -> structlog.BoundLogger:
    """
    Get a structured logger instance.

    Auto-configures logging on first use if not already configured.

    Args:
        name: Logger name (typically __name__)

    Returns:
        Configured structlog logger
    """
    global _logging_configured

    # Auto-configure logging on first use
    if not _logging_configured:
        configure_logging()

    return structlog.get_logger(name)
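
A short sketch of the logging helpers above: get_logger auto-configures on first use, so configure_logging is only needed to override defaults such as level or JSON output. The event names and key-value fields are illustrative.

from ondine.utils.logging_utils import configure_logging, get_logger

# Optional: explicit configuration (otherwise get_logger applies the defaults).
configure_logging(level="DEBUG", json_format=False)

logger = get_logger(__name__)
logger.info("pipeline_started", rows=1000, model="gpt-4o-mini")
logger.debug("batch_details", batch_size=50)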

sanitize_for_logging

sanitize_for_logging(data: dict[str, Any]) -> dict[str, Any]

Sanitize sensitive data for logging.

Parameters:

- data (dict[str, Any], required): Dictionary potentially containing sensitive data

Returns:

- dict[str, Any]: Sanitized dictionary

Source code in ondine/utils/logging_utils.py
def sanitize_for_logging(data: dict[str, Any]) -> dict[str, Any]:
    """
    Sanitize sensitive data for logging.

    Args:
        data: Dictionary potentially containing sensitive data

    Returns:
        Sanitized dictionary
    """
    sensitive_keys = {
        "api_key",
        "password",
        "secret",
        "token",
        "authorization",
        "credential",
    }

    sanitized = {}
    for key, value in data.items():
        key_lower = key.lower()
        if any(sensitive in key_lower for sensitive in sensitive_keys):
            sanitized[key] = "***REDACTED***"
        elif isinstance(value, dict):
            sanitized[key] = sanitize_for_logging(value)
        else:
            sanitized[key] = value

    return sanitized
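
A quick sketch of sanitize_for_logging on a nested config dict. Matching is substring-based and case-insensitive per the source above, so keys like OPENAI_API_KEY and auth_token are redacted too; the dict contents are made up.

from ondine.utils.logging_utils import sanitize_for_logging

config = {
    "model": "gpt-4o-mini",
    "OPENAI_API_KEY": "sk-...",
    "provider": {"endpoint": "https://api.example.com", "auth_token": "abc123"},
}

print(sanitize_for_logging(config))
# {'model': 'gpt-4o-mini', 'OPENAI_API_KEY': '***REDACTED***',
#  'provider': {'endpoint': 'https://api.example.com', 'auth_token': '***REDACTED***'}}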