Cost Control¶
Ondine provides comprehensive cost management features to prevent budget overruns and optimize spending on LLM APIs.
Key Cost-Saving Features¶
1. Multi-Row Batching (100× Speedup) - NEW!¶
Process N rows in a single API call, cutting the number of requests by a factor of 100:
# Traditional: 5M rows = 5M API calls (~69 hours)
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"])
    .with_prompt("Classify: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# With batching: 5M rows = 50K API calls (~42 minutes, 100× faster!)
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"])
    .with_prompt("Classify: {text}")
    .with_batch_size(100)  # Process 100 rows per API call!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)
Benefits:

- 100× fewer API calls
- 100× faster processing
- Same token cost; the savings come from eliminating per-request API overhead
- Automatic context window validation
Recommended batch sizes:

- Simple prompts: 50-500 rows/batch
- Complex prompts: 10-50 rows/batch
- Start with 10 and increase based on results (see the tuning sketch below)
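If you're unsure where to start, you can tune empirically: run the same pipeline over a small sample at increasing batch sizes and compare wall-clock time and cost. A minimal sketch, assuming a small sample file sample.csv (hypothetical) and the builder API shown above:

import time

from ondine import PipelineBuilder

# Run a small sample at increasing batch sizes and compare time vs. cost
for batch_size in (10, 50, 100):
    pipeline = (
        PipelineBuilder.create()
        .from_csv("sample.csv", input_columns=["text"])
        .with_prompt("Classify: {text}")
        .with_batch_size(batch_size)
        .with_llm(provider="openai", model="gpt-4o-mini")
        .build()
    )

    start = time.perf_counter()
    result = pipeline.execute()
    elapsed = time.perf_counter() - start

    print(f"batch_size={batch_size}: {elapsed:.1f}s, ${result.costs.total_cost:.4f}")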
See examples/21_multi_row_batching.py for complete examples.
2. Prefix Caching (40-50% Cost Reduction) - NEW!¶
Cache static system prompts to reduce costs by 40-50%:
# Without caching: Pay full price for the system prompt on every row
pipeline = (
    PipelineBuilder.create()
    .with_prompt("You are a classifier. Classify: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# With caching: System prompt cached, only pay for dynamic content
pipeline = (
    PipelineBuilder.create()
    .with_prompt("Classify: {text}")  # Dynamic part
    .with_system_prompt("You are a classifier.")  # Cached!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)
Benefits:

- 40-50% cost reduction on cached tokens
- 80-85% latency reduction
- Automatic (OpenAI, Anthropic)
Combine Both for Maximum Savings:
# Prefix caching + multi-row batching = 90%+ cost reduction!
pipeline = (
    PipelineBuilder.create()
    .with_prompt("Classify: {text}")
    .with_system_prompt("You are a classifier.")  # Cached
    .with_batch_size(100)  # 100× fewer API calls
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)
Pre-Execution Cost Estimation¶
Always estimate costs before processing large datasets:
from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["summary"])
    .with_prompt("Summarize: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)
# Get cost estimate
estimate = pipeline.estimate_cost()
print(f"Total rows: {estimate.total_rows}")
print(f"Estimated tokens: {estimate.estimated_tokens}")
print(f"Estimated cost: ${estimate.total_cost:.4f}")
print(f"Cost per row: ${estimate.cost_per_row:.6f}")
Budget Limits¶
Set a maximum budget to prevent overspending:
from decimal import Decimal

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", ...)
    .with_prompt("...")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_max_budget(Decimal("10.0"))  # Max $10 USD
    .build()
)
# Execution stops if budget exceeded
result = pipeline.execute()
Real-Time Cost Tracking¶
Monitor costs during execution:
result = pipeline.execute()
# View detailed costs
print(f"Total cost: ${result.costs.total_cost:.4f}")
print(f"Input tokens: {result.costs.input_tokens:,}")
print(f"Output tokens: {result.costs.output_tokens:,}")
print(f"Total tokens: {result.costs.total_tokens:,}")
print(f"Cost per row: ${result.costs.total_cost / result.metrics.processed_rows:.6f}")
Cost Optimization Strategies¶
1. Use Prefix Caching (50-90% Cost Reduction) ⭐¶
Prefix caching is the most effective cost optimization technique, reducing costs by 50-90% by caching static system prompts.
OpenAI and Anthropic automatically cache system messages and reuse them across requests, charging only for dynamic content after the first request.
How It Works¶
Separate your prompt into two parts:

- System prompt (static, cached): instructions, context, examples
- User prompt (dynamic, per-row): the actual data to process
# ❌ WITHOUT caching (old approach)
pipeline = (
    PipelineBuilder.create()
    .from_csv("reviews.csv", input_columns=["text"], output_columns=["sentiment"])
    .with_prompt("""You are a sentiment classifier.
Classify as: positive, negative, or neutral.
Return only the label.
Review: {text}
Sentiment:""")  # System message embedded → sent every time → full cost
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# ✅ WITH caching (new approach)
pipeline = (
    PipelineBuilder.create()
    .from_csv("reviews.csv", input_columns=["text"], output_columns=["sentiment"])
    .with_prompt("Review: {text}\nSentiment:")  # Only dynamic content
    .with_system_prompt("""You are a sentiment classifier.
Classify as: positive, negative, or neutral.
Return only the label.""")  # Cached by provider!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)
Cost Comparison¶
For 5,000 rows with a 500-token system prompt:
| Approach | Tokens | Cost (GPT-4o-mini) | Savings |
|---|---|---|---|
| Without caching | 2.75M tokens | $0.41 | - |
| With caching | 250K tokens | $0.04 | 90% |
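These figures follow from simple arithmetic. A sketch of the calculation, using GPT-4o-mini's $0.15 per 1M input tokens (see the provider table under "Use Cheaper Providers" below) and, for simplicity, treating cached tokens as free:

# 5,000 rows, 500-token system prompt, ~50-token user prompt per row
rows = 5_000
system_tokens = 500
user_tokens = 50
price_per_token = 0.15 / 1_000_000  # GPT-4o-mini input pricing

without_caching = rows * (system_tokens + user_tokens) * price_per_token
with_caching = rows * user_tokens * price_per_token  # system prompt cached

print(f"Without caching: ${without_caching:.2f}")  # $0.41
print(f"With caching:    ${with_caching:.2f}")     # $0.04
print(f"Savings: {1 - with_caching / without_caching:.0%}")  # ~90%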
Provider Support¶
| Provider | Caching Support | Cost Reduction | Latency Reduction |
|---|---|---|---|
| OpenAI | ✅ Automatic | 50% on cached tokens | ~50% |
| Anthropic | ✅ Automatic | 90% on cached tokens | 85% |
| Groq | ❌ Not supported | - | - |
| Azure OpenAI | ✅ Automatic | 50% on cached tokens | ~50% |
Best Practices¶
- Keep system prompts static: no per-row variables
- Put all dynamic content in the user prompt: use template variables
- Use consistent system prompts: caching persists across requests
- Monitor token usage: verify caching is working
Alternative Syntax¶
You can also set the system message in with_prompt():
.with_prompt(
    template="Review: {text}\nSentiment:",
    system_message="You are a sentiment classifier..."
)
Both approaches work identically; choose whichever you prefer.
Verification¶
Check that caching is working by monitoring token counts:
result = pipeline.execute()
# First few rows: ~550 tokens/row (system + user)
# Remaining rows: ~50 tokens/row (user only, system cached)
avg_tokens = result.costs.total_tokens / result.metrics.processed_rows
print(f"Average tokens/row: {avg_tokens:.0f}") # Should be ~50-100 with caching
See examples/20_prefix_caching.py for a complete working example.
2. Choose Cost-Effective Models¶
# Expensive: GPT-4
.with_llm(provider="openai", model="gpt-4") # ~$0.03/1K tokens
# Cost-effective: GPT-4o-mini
.with_llm(provider="openai", model="gpt-4o-mini") # ~$0.0001/1K tokens
# Free: Local MLX (Apple Silicon)
.with_llm(provider="mlx", model="mlx-community/Qwen2.5-7B-Instruct-4bit") # $0
3. Optimize Prompts¶
Shorter prompts = lower costs:
# Expensive: Verbose prompt
prompt = """
You are a helpful assistant specialized in text summarization.
Please carefully read the following text and provide a comprehensive
summary that captures the main points while being concise.
Text: {text}
Please provide your summary below:
"""
# Cost-effective: Concise prompt
prompt = "Summarize in 1 sentence: {text}"
4. Use Temperature=0 for Deterministic Tasks¶
Lower temperature often produces shorter, more focused responses:
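For example; note that passing temperature through with_llm() is an assumption here, not confirmed Ondine API, so check the builder reference if your version differs:

# Assumption: with_llm() forwards a `temperature` keyword to the provider
.with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)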
5. Set Max Tokens¶
Limit response length:
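In the same style (again assuming with_llm() forwards a max_tokens keyword to the provider):

# Assumption: with_llm() forwards a `max_tokens` keyword
.with_llm(provider="openai", model="gpt-4o-mini", max_tokens=50)  # Cap response length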
6. Batch Processing¶
Process multiple items per request when possible:
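In Ondine this is exactly what multi-row batching provides (see Multi-Row Batching above):

.with_batch_size(100)  # 100 rows per API call instead of 1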
7. Use Cheaper Providers¶
Consider alternative providers for cost savings:
| Provider | Cost (per 1M tokens) | Speed |
|---|---|---|
| OpenAI GPT-4o-mini | $0.15 | Fast |
| Groq (Llama) | $0.05-0.10 | Very Fast |
| Together.AI | $0.20-0.60 | Fast |
| Local MLX | $0 | Medium |
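Switching is typically a one-line change. The MLX line below matches the earlier example; the Groq provider string and model name are illustrative, not confirmed:

# Groq-hosted Llama (provider string and model name illustrative)
.with_llm(provider="groq", model="llama-3.1-8b-instant")

# Free local inference on Apple Silicon
.with_llm(provider="mlx", model="mlx-community/Qwen2.5-7B-Instruct-4bit")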
Cost Reporting¶
Summary Report¶
result = pipeline.execute()
print("\n=== Cost Summary ===")
print(f"Rows processed: {result.metrics.processed_rows}")
print(f"Total cost: ${result.costs.total_cost:.4f}")
print(f"Average cost/row: ${result.costs.total_cost / result.metrics.processed_rows:.6f}")
print(f"Input tokens: {result.costs.input_tokens:,}")
print(f"Output tokens: {result.costs.output_tokens:,}")
Export to CSV¶
import os

import pandas as pd

# Create cost report
cost_report = pd.DataFrame([{
    "date": pd.Timestamp.now(),
    "rows": result.metrics.processed_rows,
    "total_cost": result.costs.total_cost,
    "input_tokens": result.costs.input_tokens,
    "output_tokens": result.costs.output_tokens,
    "provider": "openai",
    "model": "gpt-4o-mini",
}])

# Append to running cost log (write the header only when creating the file)
cost_report.to_csv(
    "cost_log.csv",
    mode="a",
    header=not os.path.exists("cost_log.csv"),
    index=False,
)
Budget-Aware Workflows¶
Estimate Before Execution¶
def safe_execute(pipeline, max_cost=10.0):
    estimate = pipeline.estimate_cost()

    if estimate.total_cost > max_cost:
        print(f"Estimated cost ${estimate.total_cost:.2f} exceeds budget ${max_cost:.2f}")
        return None

    print(f"Proceeding with estimated cost: ${estimate.total_cost:.2f}")
    return pipeline.execute()
result = safe_execute(pipeline, max_cost=5.0)
Incremental Processing with Cost Checks¶
from decimal import Decimal

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("large_data.csv", ...)
    .with_streaming(chunk_size=1000)
    .with_max_budget(Decimal("20.0"))
    .build()
)
total_cost = 0.0

for chunk_result in pipeline.execute_stream():
    total_cost += chunk_result.costs.total_cost
    print(
        f"Chunk cost: ${chunk_result.costs.total_cost:.4f}, "
        f"Total so far: ${total_cost:.4f}"
    )

    if total_cost > 15.0:
        print("Approaching budget limit, stopping")
        break
Cost Tracking Across Runs¶
Track costs across multiple pipeline runs:
from ondine.utils import CostTracker

tracker = CostTracker()

# Run multiple pipelines
for config in pipeline_configs:
    pipeline = build_pipeline(config)
    result = pipeline.execute()

    tracker.add(
        provider=config["provider"],
        model=config["model"],
        cost=result.costs.total_cost,
        tokens=result.costs.total_tokens,
    )

# View summary
summary = tracker.summary()
print(f"Total spend: ${summary['total_cost']:.2f}")
print(f"Total tokens: {summary['total_tokens']:,}")
Related¶
- Execution Modes - Choose efficient execution strategy
- API Reference: CostTracker
- Structured Output - Optimize response parsing