utils
Utility modules for cross-cutting concerns.
BudgetController
BudgetController(max_budget: Decimal | None = None, warn_at_75: bool = True, warn_at_90: bool = True, fail_on_exceed: bool = True)
Controls and enforces budget limits during execution.
Follows Single Responsibility: only handles budget management.
Initialize budget controller.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_budget` | `Decimal \| None` | Maximum allowed budget in USD | `None` |
| `warn_at_75` | `bool` | Warn at 75% of budget | `True` |
| `warn_at_90` | `bool` | Warn at 90% of budget | `True` |
| `fail_on_exceed` | `bool` | Raise an error if the budget is exceeded | `True` |
Source code in ondine/utils/budget_controller.py
check_budget
Check whether the budget is within limits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `current_cost` | `Decimal` | Current accumulated cost | required |
Raises:
| Type | Description |
|---|---|
| `BudgetExceededError` | If the budget is exceeded and `fail_on_exceed=True` |
Source code in ondine/utils/budget_controller.py
get_remaining
Get the remaining budget.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `current_cost` | `Decimal` | Current accumulated cost | required |
Returns:
| Type | Description |
|---|---|
| `Decimal \| None` | Remaining budget, or `None` if no limit is set |
Source code in ondine/utils/budget_controller.py
get_usage_percentage
Get budget usage as a percentage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `current_cost` | `Decimal` | Current accumulated cost | required |
Returns:
| Type | Description |
|---|---|
| `float \| None` | Usage percentage, or `None` if no limit is set |
Source code in ondine/utils/budget_controller.py
can_afford
Check whether an estimated additional cost is within budget.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `estimated_cost` | `Decimal` | Estimated cost for the next operation | required |
| `current_cost` | `Decimal` | Current accumulated cost | required |
Returns:
| Type | Description |
|---|---|
| `bool` | `True` if within budget |
Source code in ondine/utils/budget_controller.py
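The four budget methods share one rule: with no `max_budget`, nothing is enforced. The sketch below shows how they could fit together with exact `Decimal` arithmetic. `BudgetSketch` and its local `BudgetExceededError` are hypothetical stand-ins, not the code in `budget_controller.py`. Two details are assumptions: the strict `>` comparison in `check_budget`, and the inclusive `<=` comparison in `can_afford`. The `warn_at_75`/`warn_at_90` warnings are not modeled.

```python
from __future__ import annotations

from decimal import Decimal


class BudgetExceededError(Exception):
    """Hypothetical local stand-in for the documented exception."""


class BudgetSketch:
    """Hypothetical sketch of the documented BudgetController semantics."""

    def __init__(self, max_budget: Decimal | None = None, fail_on_exceed: bool = True) -> None:
        self.max_budget = max_budget
        self.fail_on_exceed = fail_on_exceed

    def get_remaining(self, current_cost: Decimal) -> Decimal | None:
        # No limit configured, so there is no remaining amount to report.
        if self.max_budget is None:
            return None
        return self.max_budget - current_cost

    def get_usage_percentage(self, current_cost: Decimal) -> float | None:
        if self.max_budget is None:
            return None
        return float(current_cost / self.max_budget * 100)

    def can_afford(self, estimated_cost: Decimal, current_cost: Decimal) -> bool:
        # Without a limit, every operation is affordable.
        if self.max_budget is None:
            return True
        return current_cost + estimated_cost <= self.max_budget

    def check_budget(self, current_cost: Decimal) -> None:
        if self.max_budget is None:
            return
        # Assumed: only spending strictly above the limit counts as exceeding it.
        if current_cost > self.max_budget and self.fail_on_exceed:
            raise BudgetExceededError(f"Budget exceeded: ${current_cost} > ${self.max_budget}")
```

A caller would typically call `can_afford` before each request and `check_budget` after adding its cost. Keeping every amount in `Decimal` avoids float rounding at the limit boundary.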
BudgetExceededError
Bases: Exception
Raised when budget limit is exceeded.
CostCalculator
Centralized cost calculation for LLM API usage.
Single Responsibility: Calculate cost from token counts and pricing.
Used by: LLMClient, CostTracker, and any component needing cost calculation.
Design Decision: Centralize the cost formula in one place to ensure consistency and make future changes (e.g., tiered pricing) easier.
calculate (staticmethod)
calculate(tokens_in: int, tokens_out: int, input_cost_per_1k: Decimal, output_cost_per_1k: Decimal) -> Decimal
Calculate cost from token counts and pricing.
Formula
cost = (tokens_in / 1000) * input_cost_per_1k + (tokens_out / 1000) * output_cost_per_1k
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tokens_in` | `int` | Number of input tokens | required |
| `tokens_out` | `int` | Number of output tokens | required |
| `input_cost_per_1k` | `Decimal` | Cost per 1000 input tokens | required |
| `output_cost_per_1k` | `Decimal` | Cost per 1000 output tokens | required |
Returns:
| Type | Description |
|---|---|
| `Decimal` | Total cost as `Decimal` (exact precision for financial calculations) |
Example
```python
>>> from decimal import Decimal
>>> from ondine.utils.cost_calculator import CostCalculator
>>> cost = CostCalculator.calculate(
...     tokens_in=1000,
...     tokens_out=500,
...     input_cost_per_1k=Decimal("0.00005"),
...     output_cost_per_1k=Decimal("0.00008")
... )
>>> cost
Decimal('0.00009')
```
Source code in ondine/utils/cost_calculator.py
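The formula can be checked with a standalone sketch. `calculate_cost` is a hypothetical stand-in, not `CostCalculator` itself. It assumes token counts are converted to `Decimal` before dividing by 1000, so the whole computation stays in exact decimal arithmetic. Whether `CostCalculator` does the same internally is not stated here.

```python
from decimal import Decimal


def calculate_cost(
    tokens_in: int,
    tokens_out: int,
    input_cost_per_1k: Decimal,
    output_cost_per_1k: Decimal,
) -> Decimal:
    # Convert counts to Decimal first so no binary float rounding enters the result.
    thousand = Decimal(1000)
    input_cost = Decimal(tokens_in) / thousand * input_cost_per_1k
    output_cost = Decimal(tokens_out) / thousand * output_cost_per_1k
    return input_cost + output_cost


# Same inputs as the Example above: 1.0 * 0.00005 + 0.5 * 0.00008.
cost = calculate_cost(1000, 500, Decimal("0.00005"), Decimal("0.00008"))
```

`Decimal` equality compares values, not exponents. A result that prints as `0.000090` therefore still compares equal to `Decimal("0.00009")`.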
CostTracker
Detailed cost accounting with thread-safety and per-stage breakdowns.
Scope: Detailed financial tracking and reporting.
Pattern: Accumulator with thread-safe operations.
Use CostTracker for:
- Stage-by-stage cost breakdowns
- Detailed entry logging (timestamp, model, tokens)
- Thread-safe accumulation in concurrent execution
- Cost reporting and analytics
- Budget enforcement (via BudgetController)
NOT for:
- Simple orchestration state (use ExecutionContext for that)
Why separate from ExecutionContext?
- CostTracker = detailed accounting system (entries, breakdowns, thread-safety)
- ExecutionContext = orchestration state (progress, session, timing)
- Different concerns: accounting vs. execution control
Thread Safety:
- All methods are protected by threading.Lock
- Safe for concurrent LLM invocations
Example
```python
import time
from decimal import Decimal

from ondine.utils.cost_tracker import CostTracker

tracker = CostTracker(
    input_cost_per_1k=Decimal("0.00015"),
    output_cost_per_1k=Decimal("0.0006"),
)
cost = tracker.add(
    tokens_in=1000,
    tokens_out=500,
    model="gpt-4o-mini",
    timestamp=time.time(),  # required by add()
    stage="llm_invocation",  # stage name used as the breakdown key
)
breakdown = tracker.get_stage_costs()
# {"llm_invocation": Decimal("0.00045")}
```
See Also:
- ExecutionContext: For orchestration-level state
- BudgetController: For cost limit enforcement
- docs/TECHNICAL_REFERENCE.md: Cost tracking architecture
Initialize cost tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_cost_per_1k` | `Decimal \| None` | Input token cost per 1K tokens | `None` |
| `output_cost_per_1k` | `Decimal \| None` | Output token cost per 1K tokens | `None` |
Source code in ondine/utils/cost_tracker.py
add
add(tokens_in: int, tokens_out: int, model: str, timestamp: float, stage: str | None = None) -> Decimal
Add a cost entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tokens_in` | `int` | Input tokens used | required |
| `tokens_out` | `int` | Output tokens used | required |
| `model` | `str` | Model identifier | required |
| `timestamp` | `float` | Timestamp of the request | required |
| `stage` | `str \| None` | Optional stage name | `None` |
Returns:
| Type | Description |
|---|---|
| `Decimal` | Cost for this entry |
Source code in ondine/utils/cost_tracker.py
calculate_cost
Calculate the cost for the given token counts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tokens_in` | `int` | Input tokens | required |
| `tokens_out` | `int` | Output tokens | required |
Returns:
| Type | Description |
|---|---|
| `Decimal` | Total cost |
Source code in ondine/utils/cost_tracker.py
get_estimate
Get a cost estimate.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rows` | `int` | Number of rows processed | `0` |
Returns:
| Type | Description |
|---|---|
| `CostEstimate` | `CostEstimate` object |
Source code in ondine/utils/cost_tracker.py
reset
Reset all tracking.
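The thread-safety claim ("all methods protected by threading.Lock") can be illustrated with a small accumulator. `CostAccumulator` is a hypothetical sketch, not `CostTracker`. It uses a single lock to guard both the entry log and the per-stage totals, so concurrent `add` calls cannot leave the two out of step. It also assumes that entries without a stage are logged but not counted in any stage total.

```python
from __future__ import annotations

import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from decimal import Decimal


class CostAccumulator:
    """Hypothetical lock-protected accumulator with per-stage totals."""

    def __init__(self, input_cost_per_1k: Decimal, output_cost_per_1k: Decimal) -> None:
        self._in = input_cost_per_1k
        self._out = output_cost_per_1k
        self._lock = threading.Lock()
        self._entries: list[tuple[str, str | None, Decimal]] = []
        self._stage_costs: dict[str, Decimal] = defaultdict(Decimal)

    def add(self, tokens_in: int, tokens_out: int, model: str, stage: str | None = None) -> Decimal:
        # Pure computation needs no lock; only shared state updates do.
        cost = (Decimal(tokens_in) * self._in + Decimal(tokens_out) * self._out) / 1000
        with self._lock:
            self._entries.append((model, stage, cost))
            if stage is not None:
                self._stage_costs[stage] += cost
        return cost

    def total(self) -> Decimal:
        with self._lock:
            return sum((c for _, _, c in self._entries), Decimal(0))

    def stage_costs(self) -> dict[str, Decimal]:
        # Return a copy so callers never iterate over state another thread is mutating.
        with self._lock:
            return dict(self._stage_costs)

    def reset(self) -> None:
        with self._lock:
            self._entries.clear()
            self._stage_costs.clear()


# Usage: 100 concurrent "LLM calls" accumulating into one tracker.
tracker = CostAccumulator(Decimal("0.00015"), Decimal("0.0006"))
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(100):
        pool.submit(tracker.add, 1000, 500, "gpt-4o-mini", "llm_invocation")
```

Each call costs 0.00045, so the 100 calls total exactly 0.045. `Decimal` keeps that figure free of the drift that repeated float additions would introduce.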
PreprocessingStats (dataclass)
PreprocessingStats(rows_processed: int, chars_before: int, chars_after: int, truncated_count: int, null_count: int)
Statistics from preprocessing operation.
TextPreprocessor
Composable text preprocessor following Chain of Responsibility.
Single Responsibility: Orchestrate cleaning steps.
Open/Closed: Extensible via the cleaners list.
Dependency Inversion: Depends on a Protocol, not on concrete classes.
Initialize with default cleaning pipeline.
Source code in ondine/utils/input_preprocessing.py
process
Apply all cleaners in sequence.
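The design notes describe cleaners that sit behind a Protocol and run in sequence. The sketch below shows that shape. The names `TextCleaner`, `UnicodeNormalizer`, `WhitespaceCollapser`, `Truncator`, and `Preprocessor` are hypothetical, and so is the default step list. The real `TextPreprocessor` pipeline is not documented on this page. Note that every cleaner runs on every input, so this is a sequential pipeline rather than a chain in which one handler stops propagation.

```python
from __future__ import annotations

import re
import unicodedata
from typing import Protocol


class TextCleaner(Protocol):
    """Hypothetical cleaner interface: one step in the pipeline."""

    def clean(self, text: str) -> str: ...


class UnicodeNormalizer:
    def clean(self, text: str) -> str:
        # NFKC folds compatibility characters (e.g. full-width letters) to standard forms.
        return unicodedata.normalize("NFKC", text)


class WhitespaceCollapser:
    def clean(self, text: str) -> str:
        # Collapse runs of spaces, tabs, and newlines into single spaces.
        return re.sub(r"\s+", " ", text).strip()


class Truncator:
    def __init__(self, max_length: int) -> None:
        self.max_length = max_length

    def clean(self, text: str) -> str:
        return text[: self.max_length]


class Preprocessor:
    def __init__(self, cleaners: list[TextCleaner] | None = None) -> None:
        # Default pipeline; callers extend behavior by passing their own list.
        self.cleaners = cleaners if cleaners is not None else [
            UnicodeNormalizer(),
            WhitespaceCollapser(),
            Truncator(500),
        ]

    def process(self, text: str) -> str:
        # Apply each cleaner to the output of the previous one.
        for cleaner in self.cleaners:
            text = cleaner.clean(text)
        return text
```

A new step only has to provide a `clean(text) -> str` method. Because the Protocol is structural, the step never needs to subclass anything.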
RateLimiter
Token bucket rate limiter for controlling API request rates.
Thread-safe implementation.
Initialize rate limiter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `requests_per_minute` | `int` | Maximum requests per minute | required |
| `burst_size` | `int \| None` | Maximum burst size (defaults to `requests_per_minute`) | `None` |
Source code in ondine/utils/rate_limiter.py
acquire
Acquire tokens for making requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tokens` | `int` | Number of tokens to acquire | `1` |
| `timeout` | `float \| None` | Maximum wait time in seconds (`None` = wait forever) | `None` |
Returns:
| Type | Description |
|---|---|
| `bool` | `True` if the tokens were acquired, `False` on timeout |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If `tokens` exceeds the bucket capacity |
Source code in ondine/utils/rate_limiter.py
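The behavior documented for `RateLimiter` matches a standard token bucket: capacity `burst_size`, refilled at `requests_per_minute / 60` tokens per second, with `acquire` blocking until enough tokens exist or the timeout expires. `TokenBucket` is a hypothetical sketch of that technique, not the code in `rate_limiter.py`. It assumes the bucket starts full, refills continuously, and sleeps outside the lock so that waiting threads do not block each other.

```python
from __future__ import annotations

import threading
import time


class TokenBucket:
    """Hypothetical token-bucket sketch with a thread-safe acquire()."""

    def __init__(self, requests_per_minute: int, burst_size: int | None = None) -> None:
        self.capacity = burst_size if burst_size is not None else requests_per_minute
        self.refill_rate = requests_per_minute / 60.0  # tokens per second
        self.tokens = float(self.capacity)  # start full so an initial burst is allowed
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity.
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now

    def acquire(self, tokens: int = 1, timeout: float | None = None) -> bool:
        if tokens > self.capacity:
            raise ValueError(f"Requested {tokens} tokens exceeds capacity {self.capacity}")
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                # Time until enough tokens accumulate at the current refill rate.
                wait = (tokens - self.tokens) / self.refill_rate
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                wait = min(wait, remaining)
            time.sleep(wait)  # sleep without holding the lock
```

At 60 requests per minute, the refill rate is one token per second. A bucket with `burst_size=2` therefore allows two immediate calls, and the third waits about a second.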
NetworkError
Bases: RetryableError
Network-related error.
RateLimitError
Bases: RetryableError
Rate limit exceeded error.
RetryableError
Bases: Exception
Base class for errors that should be retried.
RetryHandler
RetryHandler(max_attempts: int = 3, initial_delay: float = 1.0, max_delay: float = 60.0, exponential_base: int = 2, retryable_exceptions: tuple[type[Exception], ...] | None = None)
Request-level retry with exponential backoff (for transient errors).
Scope: Single LLM API call or operation.
Use when: Transient errors (rate limits, network timeouts, API hiccups).
NOT for: Row-level quality issues (use Pipeline.auto_retry_failed for that).
Retry Strategy:
- Exponential backoff (1s, 2s, 4s, 8s, ...)
- Configurable max attempts (default: 3)
- Only retries specific exception types
Example
```python
from ondine.utils.retry_handler import RetryHandler

handler = RetryHandler(max_attempts=3, initial_delay=1.0)
# call_llm_api stands for any zero-argument callable that performs the request
result = handler.execute(lambda: call_llm_api())
```
See Also:
- ErrorHandler: Orchestrates retry decisions based on policy
- Pipeline._auto_retry_failed_rows(): Row-level quality retry
- docs/architecture/decisions/ADR-006-retry-levels.md
Initialize retry handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_attempts` | `int` | Maximum retry attempts | `3` |
| `initial_delay` | `float` | Initial delay in seconds | `1.0` |
| `max_delay` | `float` | Maximum delay in seconds | `60.0` |
| `exponential_base` | `int` | Base for exponential backoff | `2` |
| `retryable_exceptions` | `tuple[type[Exception], ...] \| None` | Exception types to retry | `None` |
Source code in ondine/utils/retry_handler.py
execute
Execute a function with retry logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[], T]` | Function to execute | required |
Returns:
| Type | Description |
|---|---|
| `T` | Result from the function |
Raises:
| Type | Description |
|---|---|
| `Exception` | If all retries are exhausted |
Source code in ondine/utils/retry_handler.py
calculate_delay
Calculate the delay for a given attempt number.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `attempt` | `int` | Attempt number (1-based) | required |
Returns:
| Type | Description |
|---|---|
| `float` | Delay in seconds |
Source code in ondine/utils/retry_handler.py
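The backoff schedule listed under Retry Strategy (1s, 2s, 4s, 8s, ...) follows from `initial_delay * exponential_base ** (attempt - 1)` capped at `max_delay`. The sketch below assumes that formula. It also assumes that `max_attempts` counts total calls, including the first one. `calculate_delay`, `execute_with_retry`, and the local exception classes are hypothetical stand-ins, not `RetryHandler` itself. The injectable `sleep` parameter exists only so the schedule can be observed without actually waiting.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Local stand-in for the documented base class of retryable errors."""


class NetworkError(RetryableError):
    """Local stand-in for the documented network error."""


def calculate_delay(
    attempt: int,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: int = 2,
) -> float:
    # attempt is 1-based: 1 -> 1.0s, 2 -> 2.0s, 3 -> 4.0s, ... capped at max_delay.
    return min(initial_delay * exponential_base ** (attempt - 1), max_delay)


def execute_with_retry(
    func: Callable[[], T],
    max_attempts: int = 3,
    retryable: tuple[type[Exception], ...] = (RetryableError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retryable:
            if attempt == max_attempts:
                raise  # retries exhausted: re-raise the last error
            sleep(calculate_delay(attempt))
    raise RuntimeError("unreachable")


# Usage: a callable that fails twice with a transient error, then succeeds.
calls = 0


def flaky() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise NetworkError("connection reset")
    return "ok"


delays: list[float] = []
result = execute_with_retry(flaky, sleep=delays.append)
```

Exceptions outside `retryable`, such as a `ValueError` from bad input, propagate on the first attempt. This matches the "only retries specific exception types" rule.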
preprocess_dataframe
preprocess_dataframe(df: DataFrame, input_columns: list[str], max_length: int = 500) -> tuple[pd.DataFrame, PreprocessingStats]
Preprocess input columns in dataframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input dataframe | required |
| `input_columns` | `list[str]` | Columns to clean | required |
| `max_length` | `int` | Maximum characters per field | `500` |
Returns:
| Type | Description |
|---|---|
| `tuple[DataFrame, PreprocessingStats]` | `(cleaned_df, stats)` |
Source code in ondine/utils/input_preprocessing.py
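The `PreprocessingStats` fields suggest how the counters are accumulated while a column is cleaned. The sketch below works on a plain list instead of a DataFrame, to stay dependency-free. `ColumnStats` and `preprocess_column` are hypothetical, and the counting rules are assumptions. In this sketch, nulls count toward `rows_processed` but not toward the character totals, and a value counts as truncated only when it still exceeds `max_length` after whitespace is collapsed.

```python
from __future__ import annotations

import re
from dataclasses import dataclass


@dataclass
class ColumnStats:
    """Hypothetical mirror of the PreprocessingStats fields."""

    rows_processed: int
    chars_before: int
    chars_after: int
    truncated_count: int
    null_count: int


def preprocess_column(
    values: list[str | None], max_length: int = 500
) -> tuple[list[str | None], ColumnStats]:
    cleaned: list[str | None] = []
    chars_before = chars_after = truncated = nulls = 0
    for value in values:
        if value is None:
            # Nulls pass through unchanged but are counted.
            nulls += 1
            cleaned.append(None)
            continue
        chars_before += len(value)
        text = re.sub(r"\s+", " ", value).strip()
        if len(text) > max_length:
            text = text[:max_length]
            truncated += 1
        chars_after += len(text)
        cleaned.append(text)
    return cleaned, ColumnStats(len(values), chars_before, chars_after, truncated, nulls)


cleaned, stats = preprocess_column(["  a  b ", None, "x" * 10], max_length=4)
```

Comparing `chars_before` with `chars_after` gives a cheap estimate of how much prompt text, and therefore how many input tokens, the cleaning step saved.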
configure_logging
configure_logging(level: str = 'INFO', json_format: bool = False, include_timestamp: bool = True) -> None
Configure structured logging for the SDK.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `level` | `str` | Logging level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`) | `'INFO'` |
| `json_format` | `bool` | Use JSON output format | `False` |
| `include_timestamp` | `bool` | Include timestamps in logs | `True` |
Source code in ondine/utils/logging_utils.py
get_logger
Get a structured logger instance.
Auto-configures logging on first use if it is not already configured.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Logger name (typically `__name__`) | required |
Returns:
| Type | Description |
|---|---|
| `BoundLogger` | Configured structlog logger |
Source code in ondine/utils/logging_utils.py
sanitize_for_logging
Sanitize sensitive data for logging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any]` | Dictionary potentially containing sensitive data | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Sanitized dictionary |
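A common approach to this kind of sanitization is to redact values whose keys look secret and to recurse into nested dictionaries. `sanitize` and `SENSITIVE_KEY_PARTS` below are hypothetical, and the SDK's actual key list and placeholder text are not documented here. The list deliberately omits a bare `"token"` fragment. Otherwise, fields such as `tokens_in`, which this SDK logs for cost tracking, would be redacted by mistake.

```python
from __future__ import annotations

from typing import Any

# Hypothetical key fragments treated as sensitive (matched case-insensitively).
SENSITIVE_KEY_PARTS = ("api_key", "apikey", "secret", "password", "authorization", "access_token")
REDACTED = "***REDACTED***"


def sanitize(data: dict[str, Any]) -> dict[str, Any]:
    # Build a new dict so the caller's data is never mutated.
    sanitized: dict[str, Any] = {}
    for key, value in data.items():
        if any(part in str(key).lower() for part in SENSITIVE_KEY_PARTS):
            sanitized[key] = REDACTED
        elif isinstance(value, dict):
            # Recurse so nested blocks (e.g. HTTP headers) are scrubbed too.
            sanitized[key] = sanitize(value)
        else:
            sanitized[key] = value
    return sanitized
```

Matching on key fragments is cheap, but it only catches secrets stored under recognizable names. Secrets embedded inside free-text values would need separate pattern-based scrubbing.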