# Performance Testing Guide
This guide covers performance testing patterns, metrics, monitoring, and optimization strategies used in the ADK Agents integration test suite.
## Overview
Performance testing in the ADK Agents system focuses on:
- Context Assembly Performance - Efficient context generation with token optimization
- Tool Orchestration Speed - Parallel vs sequential execution performance
- Memory Management - Leak detection and resource optimization
- Load Testing - Concurrent user simulation and scalability
- Token Optimization - Efficient token counting and management
- Cross-turn Correlation - Performance of conversation relationship detection
## Performance Testing Architecture

### Core Performance Metrics

#### Primary Metrics
- Execution Time - Total time for operation completion
- Memory Usage - Peak and average memory consumption
- CPU Usage - Processor utilization during operations
- Throughput - Operations per second
- Token Processing Speed - Tokens processed per second
- Context Assembly Time - Time to assemble context from components
#### Secondary Metrics
- Response Time Distribution - P50, P95, P99 percentiles (see the sketch after this list)
- Error Rate - Percentage of failed operations
- Resource Utilization - System resource efficiency
- Concurrent User Capacity - Maximum supported concurrent users
- Memory Leak Detection - Memory growth over time
- Context Size Efficiency - Token count vs content ratio
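The percentile metrics above can be computed from per-operation latency samples. A minimal sketch using only the standard library (it assumes latencies are collected as a plain list of seconds, for example by the `ResourceMonitor` described later in this guide):

```python
import statistics


def latency_percentiles(latencies_s):
    """Compute P50/P95/P99 from a list of per-operation latencies in seconds."""
    if len(latencies_s) < 2:
        value = latencies_s[0] if latencies_s else 0.0
        return {"p50": value, "p95": value, "p99": value}
    # statistics.quantiles(n=100) returns the 1st..99th percentile cut points
    cuts = statistics.quantiles(latencies_s, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}
```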
## Performance Test Categories

### 1. Baseline Performance Tests
Tests that establish performance baselines for core operations.
```python
import time


class TestBaselinePerformance:
    def test_context_assembly_performance(self, context_manager):
        """Test baseline context assembly performance."""
        # Arrange - set up test data
        context_manager.add_code_snippet("test.py", "def test(): pass", 1, 10)
        context_manager.add_tool_result("test_tool", {"result": "success"})

        # Act - measure performance
        start_time = time.time()
        context_dict, token_count = context_manager.assemble_context(10000)
        end_time = time.time()
        execution_time = end_time - start_time

        # Assert - verify performance thresholds
        assert execution_time < 0.1  # 100ms threshold
        assert token_count > 0
        assert "conversation_history" in context_dict
```
### 2. Load Testing
Tests that simulate realistic user loads and concurrent operations.
```python
import asyncio
import time

import pytest


class TestLoadPerformance:
    @pytest.mark.asyncio
    async def test_concurrent_context_assembly(self, context_manager):
        """Test context assembly under concurrent load."""
        concurrent_users = 10
        operations_per_user = 100

        async def simulate_user_operations(user_id):
            results = []
            for i in range(operations_per_user):
                start_time = time.time()
                context_dict, token_count = context_manager.assemble_context(5000)
                end_time = time.time()
                results.append({
                    "user_id": user_id,
                    "operation": i,
                    "execution_time": end_time - start_time,
                    "token_count": token_count,
                })
            return results

        # Execute concurrent operations
        tasks = [simulate_user_operations(i) for i in range(concurrent_users)]
        all_results = await asyncio.gather(*tasks)

        # Analyze performance
        flat_results = [r for results in all_results for r in results]
        avg_execution_time = sum(r["execution_time"] for r in flat_results) / len(flat_results)
        assert avg_execution_time < 0.2  # 200ms average under load
```
### 3. Stress Testing
Tests that push the system beyond normal operating conditions.
```python
import asyncio

import pytest


class TestStressPerformance:
    @pytest.mark.asyncio
    async def test_extreme_load_handling(self, context_manager):
        """Test system behavior under extreme load."""
        # Extreme parameters
        concurrent_users = 50
        operations_per_user = 200
        large_context_size = 50000

        # Monitor system resources
        resource_monitor = ResourceMonitor()
        resource_monitor.start_monitoring()

        async def stress_operations(user_id):
            for i in range(operations_per_user):
                try:
                    context_dict, token_count = context_manager.assemble_context(large_context_size)
                    await asyncio.sleep(0.001)  # Minimal delay
                except Exception as e:
                    # Log but don't fail - some degradation is expected
                    print(f"User {user_id} operation {i} failed: {e}")

        # Execute stress test
        tasks = [stress_operations(i) for i in range(concurrent_users)]
        await asyncio.gather(*tasks, return_exceptions=True)

        metrics = resource_monitor.stop_monitoring()

        # Verify the system didn't crash and resource usage stayed reasonable
        assert metrics.peak_memory_mb < 2000  # 2GB limit
        assert metrics.peak_cpu_percent < 95  # 95% CPU limit
```
### 4. Memory Performance Tests
Tests that focus on memory usage patterns and leak detection.
```python
class TestMemoryPerformance:
    def test_memory_leak_detection(self, context_manager):
        """Test for memory leaks in context management."""
        import gc
        import tracemalloc

        tracemalloc.start()

        # Baseline memory
        gc.collect()
        baseline_snapshot = tracemalloc.take_snapshot()

        # Execute many operations
        for i in range(1000):
            context_manager.add_code_snippet(f"test_{i}.py", f"def test_{i}(): pass", 1, 10)
            context_dict, token_count = context_manager.assemble_context(10000)

            # Clear context periodically
            if i % 100 == 0:
                context_manager.clear_context()

        # Final memory measurement
        gc.collect()
        final_snapshot = tracemalloc.take_snapshot()
        tracemalloc.stop()

        # Calculate memory growth
        top_stats = final_snapshot.compare_to(baseline_snapshot, 'lineno')
        total_memory_growth = sum(stat.size_diff for stat in top_stats if stat.size_diff > 0)

        # Assert reasonable memory growth (< 50MB)
        assert total_memory_growth < 50 * 1024 * 1024  # 50MB limit
```
## Performance Monitoring Infrastructure

### Resource Monitor Implementation
```python
import threading
import time


class ResourceMonitor:
    """Monitor system resources during test execution."""

    def __init__(self):
        self.monitoring = False
        self.start_time = None
        self.metrics = {
            "memory_usage": [],
            "cpu_usage": [],
            "operations": [],
            "timestamps": [],
        }

    def start_monitoring(self):
        """Start resource monitoring."""
        self.monitoring = True
        self.start_time = time.time()

        # Start background monitoring thread
        self.monitor_thread = threading.Thread(target=self._monitor_resources)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop monitoring and return metrics."""
        self.monitoring = False
        if hasattr(self, "monitor_thread"):
            self.monitor_thread.join(timeout=1.0)

        return PerformanceMetrics(
            execution_time=time.time() - self.start_time,
            peak_memory_mb=max(self.metrics["memory_usage"]) if self.metrics["memory_usage"] else 0,
            avg_memory_mb=sum(self.metrics["memory_usage"]) / len(self.metrics["memory_usage"]) if self.metrics["memory_usage"] else 0,
            peak_cpu_percent=max(self.metrics["cpu_usage"]) if self.metrics["cpu_usage"] else 0,
            avg_cpu_percent=sum(self.metrics["cpu_usage"]) / len(self.metrics["cpu_usage"]) if self.metrics["cpu_usage"] else 0,
            total_operations=len(self.metrics["operations"]),
            operations_per_second=len(self.metrics["operations"]) / (time.time() - self.start_time) if self.start_time else 0,
        )

    def record_operation(self, operation_type="generic", success=True):
        """Record an operation for metrics."""
        self.metrics["operations"].append({
            "type": operation_type,
            "success": success,
            "timestamp": time.time(),
        })

    def _monitor_resources(self):
        """Background resource monitoring."""
        import psutil

        while self.monitoring:
            try:
                # Sample the current process
                process = psutil.Process()

                # Record memory usage (MB)
                memory_mb = process.memory_info().rss / 1024 / 1024
                self.metrics["memory_usage"].append(memory_mb)

                # Record CPU usage (%)
                cpu_percent = process.cpu_percent()
                self.metrics["cpu_usage"].append(cpu_percent)

                # Record timestamp
                self.metrics["timestamps"].append(time.time())

                time.sleep(0.1)  # Sample every 100ms
            except Exception as e:
                print(f"Resource monitoring error: {e}")
                break
```
### Performance Metrics Data Structure
```python
from dataclasses import dataclass


@dataclass
class PerformanceMetrics:
    """Container for performance metrics."""

    execution_time: float
    peak_memory_mb: float
    avg_memory_mb: float
    peak_cpu_percent: float
    avg_cpu_percent: float
    total_operations: int
    operations_per_second: float

    def to_dict(self):
        """Convert metrics to a dictionary for reporting."""
        return {
            "execution_time": self.execution_time,
            "peak_memory_mb": self.peak_memory_mb,
            "avg_memory_mb": self.avg_memory_mb,
            "peak_cpu_percent": self.peak_cpu_percent,
            "avg_cpu_percent": self.avg_cpu_percent,
            "total_operations": self.total_operations,
            "operations_per_second": self.operations_per_second,
        }

    def meets_thresholds(self, thresholds):
        """Check if metrics meet performance thresholds."""
        return (
            self.execution_time <= thresholds.get("max_execution_time", float('inf')) and
            self.peak_memory_mb <= thresholds.get("max_memory_mb", float('inf')) and
            self.operations_per_second >= thresholds.get("min_operations_per_second", 0) and
            self.peak_cpu_percent <= thresholds.get("max_cpu_percent", 100)
        )
```
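Together, `ResourceMonitor` and `PerformanceMetrics` give tests a compact measure-then-assert shape. A sketch of typical usage (the `context_manager` fixture and the threshold values are assumptions; they mirror the `PERFORMANCE_THRESHOLDS` entry defined later in this guide):

```python
def test_context_assembly_meets_thresholds(context_manager):
    """Sketch: measure a batch of operations and assert threshold compliance."""
    monitor = ResourceMonitor()
    monitor.start_monitoring()

    for _ in range(100):
        context_manager.assemble_context(10000)
        monitor.record_operation("context_assembly")

    metrics = monitor.stop_monitoring()

    assert metrics.meets_thresholds({
        "max_execution_time": 0.5,
        "max_memory_mb": 500,
        "min_operations_per_second": 100,
        "max_cpu_percent": 80,
    }), metrics.to_dict()
```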
## Performance Test Patterns

### 1. Benchmark Comparison Pattern
Compare performance between different implementations or configurations.
```python
import time


class TestPerformanceComparison:
    def test_parallel_vs_sequential_execution(self, tool_orchestrator):
        """Compare parallel vs sequential tool execution performance."""
        tools = [
            (tool_name, {"file_path": f"test_{i}.py"})
            for i in range(10)
            for tool_name in ("read_file", "analyze_code", "generate_tests")
        ]

        # Test sequential execution
        start_time = time.time()
        sequential_results = []
        for tool_name, args in tools:
            result = tool_orchestrator.execute_tool_sync(tool_name, args)
            sequential_results.append(result)
        sequential_time = time.time() - start_time

        # Test parallel execution
        start_time = time.time()
        parallel_results = tool_orchestrator.execute_tools_parallel(tools)
        parallel_time = time.time() - start_time

        # Assert performance improvement
        assert parallel_time < sequential_time * 0.8  # At least 20% improvement
        assert len(parallel_results) == len(sequential_results)

        # Log performance comparison
        speedup = sequential_time / parallel_time
        print(f"Parallel execution {speedup:.2f}x faster than sequential")
```
### 2. Scalability Testing Pattern
Test how performance scales with increasing load.
```python
import asyncio
import time

import pytest


class TestScalabilityPerformance:
    @pytest.mark.asyncio
    @pytest.mark.parametrize("user_count", [1, 5, 10, 25, 50])
    async def test_scalability_with_user_count(self, user_count, context_manager):
        """Test how performance scales with increasing user count."""
        operations_per_user = 100

        # Monitor performance
        resource_monitor = ResourceMonitor()
        resource_monitor.start_monitoring()

        async def simulate_user_load(user_id):
            for i in range(operations_per_user):
                context_dict, token_count = context_manager.assemble_context(5000)
                await asyncio.sleep(0.01)  # Realistic user delay

        # Execute load test
        start_time = time.time()
        tasks = [simulate_user_load(i) for i in range(user_count)]
        await asyncio.gather(*tasks)
        total_time = time.time() - start_time

        metrics = resource_monitor.stop_monitoring()

        # Calculate scalability metrics
        throughput = (user_count * operations_per_user) / total_time

        # Assert reasonable scalability
        expected_min_throughput = max(50, 500 / user_count)  # Adjust expectations per load level
        assert throughput >= expected_min_throughput

        # Log scalability data
        print(f"Users: {user_count}, Throughput: {throughput:.2f} ops/sec, "
              f"Memory: {metrics.peak_memory_mb:.1f} MB")
```
### 3. Optimization Validation Pattern
Validate that optimizations improve performance without affecting functionality.
```python
import time


class TestOptimizationValidation:
    def test_token_counting_optimization(self, context_manager):
        """Test that token counting optimization improves performance."""
        # Test data
        large_content = "def test_function():\n    pass\n" * 1000

        # Test without optimization
        context_manager.disable_token_counting_optimization()
        start_time = time.time()
        for i in range(100):
            context_manager.add_code_snippet(f"test_{i}.py", large_content, 1, 1000)
            context_dict, token_count = context_manager.assemble_context(50000)
        unoptimized_time = time.time() - start_time

        # Clear and test with optimization
        context_manager.clear_context()
        context_manager.enable_token_counting_optimization()
        start_time = time.time()
        for i in range(100):
            context_manager.add_code_snippet(f"test_{i}.py", large_content, 1, 1000)
            context_dict, token_count = context_manager.assemble_context(50000)
        optimized_time = time.time() - start_time

        # Assert the optimization improves performance
        improvement = (unoptimized_time - optimized_time) / unoptimized_time
        assert improvement > 0.2  # At least 20% improvement
        print(f"Token counting optimization: {improvement:.1%} improvement")
```
### 4. Regression Testing Pattern
Detect performance regressions by comparing against baseline metrics.
```python
import json
import os


class TestPerformanceRegression:
    def test_context_assembly_regression(self, context_manager):
        """Test for performance regression in context assembly."""
        # Load baseline metrics
        baseline_metrics = self.load_baseline_metrics("context_assembly")

        # Execute the current performance test
        resource_monitor = ResourceMonitor()
        resource_monitor.start_monitoring()

        # Perform standardized operations
        for i in range(100):
            context_manager.add_code_snippet(f"test_{i}.py", f"def test_{i}(): pass", 1, 10)
            context_dict, token_count = context_manager.assemble_context(10000)
            # Record each operation so operations_per_second is meaningful
            resource_monitor.record_operation("context_assembly")

        current_metrics = resource_monitor.stop_monitoring()

        # Compare against baseline (allow 10% degradation)
        assert current_metrics.execution_time <= baseline_metrics["execution_time"] * 1.1
        assert current_metrics.peak_memory_mb <= baseline_metrics["peak_memory_mb"] * 1.1
        assert current_metrics.operations_per_second >= baseline_metrics["operations_per_second"] * 0.9

        # Update the baseline if performance improved significantly
        if current_metrics.execution_time < baseline_metrics["execution_time"] * 0.9:
            self.update_baseline_metrics("context_assembly", current_metrics.to_dict())

    def load_baseline_metrics(self, test_name):
        """Load baseline metrics from file."""
        baseline_file = f"test_reports/baseline_{test_name}.json"
        if os.path.exists(baseline_file):
            with open(baseline_file, 'r') as f:
                return json.load(f)
        else:
            # Return permissive defaults if no baseline exists
            return {
                "execution_time": 10.0,
                "peak_memory_mb": 1000.0,
                "operations_per_second": 1.0
            }

    def update_baseline_metrics(self, test_name, metrics):
        """Update the baseline metrics file."""
        baseline_file = f"test_reports/baseline_{test_name}.json"
        os.makedirs(os.path.dirname(baseline_file), exist_ok=True)
        with open(baseline_file, 'w') as f:
            json.dump(metrics, f, indent=2)
```
## Performance Thresholds and SLAs

### Standard Performance Thresholds
```python
PERFORMANCE_THRESHOLDS = {
    "context_assembly": {
        "max_execution_time": 0.5,   # 500ms
        "max_memory_mb": 500,        # 500MB
        "min_operations_per_second": 100,
        "max_cpu_percent": 80
    },
    "tool_orchestration": {
        "max_execution_time": 2.0,   # 2 seconds
        "max_memory_mb": 1000,       # 1GB
        "min_operations_per_second": 50,
        "max_cpu_percent": 90
    },
    "load_testing": {
        "max_execution_time": 10.0,  # 10 seconds
        "max_memory_mb": 2000,       # 2GB
        "min_operations_per_second": 200,
        "max_cpu_percent": 95
    }
}
```
### Performance SLA Validation
```python
def validate_performance_sla(metrics, test_category):
    """Validate performance metrics against SLA thresholds."""
    thresholds = PERFORMANCE_THRESHOLDS.get(test_category, {})
    violations = []

    if "max_execution_time" in thresholds:
        if metrics.execution_time > thresholds["max_execution_time"]:
            violations.append(
                f"Execution time {metrics.execution_time:.2f}s exceeds limit "
                f"{thresholds['max_execution_time']:.2f}s"
            )

    if "max_memory_mb" in thresholds:
        if metrics.peak_memory_mb > thresholds["max_memory_mb"]:
            violations.append(
                f"Memory usage {metrics.peak_memory_mb:.1f}MB exceeds limit "
                f"{thresholds['max_memory_mb']:.1f}MB"
            )

    if "min_operations_per_second" in thresholds:
        if metrics.operations_per_second < thresholds["min_operations_per_second"]:
            violations.append(
                f"Throughput {metrics.operations_per_second:.1f} ops/sec below minimum "
                f"{thresholds['min_operations_per_second']}"
            )

    if "max_cpu_percent" in thresholds:
        if metrics.peak_cpu_percent > thresholds["max_cpu_percent"]:
            violations.append(
                f"CPU usage {metrics.peak_cpu_percent:.1f}% exceeds limit "
                f"{thresholds['max_cpu_percent']:.1f}%"
            )

    return violations
```
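A test can then fail with a readable message listing every violated threshold, for example (a short sketch reusing the monitor pattern above):

```python
metrics = resource_monitor.stop_monitoring()
violations = validate_performance_sla(metrics, "context_assembly")
assert not violations, "SLA violations:\n" + "\n".join(violations)
```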
## Performance Optimization Strategies

### 1. Context Assembly Optimization
```python
class OptimizedContextManager(ContextManager):  # assumes the project's base ContextManager class
    """Context manager with performance optimizations."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.token_cache = {}
        self.content_cache = {}
        self.optimization_enabled = True

    def assemble_context(self, token_limit):
        """Optimized context assembly with caching."""
        if not self.optimization_enabled:
            return super().assemble_context(token_limit)

        # Use cached results when possible
        cache_key = self._generate_cache_key(token_limit)
        if cache_key in self.content_cache:
            return self.content_cache[cache_key]

        # Perform optimized assembly
        context_dict = {}
        current_tokens = 0

        # Prioritize content efficiently
        prioritized_content = self._get_prioritized_content()
        for content in prioritized_content:
            content_tokens = self._get_cached_token_count(content)
            if current_tokens + content_tokens <= token_limit:
                context_dict[content["type"]] = content["data"]
                current_tokens += content_tokens
            else:
                break

        # Cache the result
        result = (context_dict, current_tokens)
        self.content_cache[cache_key] = result
        return result

    def _get_cached_token_count(self, content):
        """Get token count with caching."""
        content_hash = hash(str(content))
        if content_hash not in self.token_cache:
            self.token_cache[content_hash] = self._count_tokens(content)
        return self.token_cache[content_hash]
```
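`_generate_cache_key` is not shown above. The key must change whenever the stored content changes, otherwise `assemble_context` returns stale cached results. One possible helper to add to `OptimizedContextManager` (a hypothetical sketch; it assumes the prioritized content can be stringified deterministically):

```python
import hashlib


def _generate_cache_key(self, token_limit):
    """Hypothetical helper: derive a cache key from the current content plus the token limit."""
    # Hashing a stable representation of the stored content means any edit invalidates the cache
    content_repr = repr(self._get_prioritized_content()).encode("utf-8")
    digest = hashlib.sha256(content_repr).hexdigest()
    return f"{digest}:{token_limit}"
```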
### 2. Parallel Processing Optimization
```python
import asyncio


class ParallelToolOrchestrator:
    """Tool orchestrator with parallel processing optimization."""

    async def execute_tools_optimized(self, tools, max_concurrency=10):
        """Execute tools with optimized parallelism."""
        # Group tools by dependency level
        dependency_groups = self._group_by_dependencies(tools)
        results = {}

        # Execute each dependency level in order
        for level, tool_group in dependency_groups.items():
            # Limit concurrency to prevent resource exhaustion
            semaphore = asyncio.Semaphore(max_concurrency)

            async def execute_with_semaphore(tool):
                async with semaphore:
                    return await self._execute_single_tool(tool, results)

            # Execute tools in parallel within the dependency level
            level_results = await asyncio.gather(
                *[execute_with_semaphore(tool) for tool in tool_group],
                return_exceptions=True
            )

            # Update results
            for tool, result in zip(tool_group, level_results):
                if isinstance(result, Exception):
                    results[tool["id"]] = {"error": str(result)}
                else:
                    results[tool["id"]] = result

        return results
```
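`_group_by_dependencies` is likewise left to the implementation. One plausible sketch (it assumes each tool dict carries an `id` and an optional `depends_on` list, that all dependencies appear in `tools`, and that the dependency graph is acyclic) assigns each tool to the level just after its deepest dependency:

```python
from collections import defaultdict


def _group_by_dependencies(self, tools):
    """Hypothetical helper: group tools into levels so each level only depends on earlier levels."""
    by_id = {tool["id"]: tool for tool in tools}
    levels = {}

    def level_of(tool_id):
        # Level = 1 + deepest dependency level; tools without dependencies land in level 0
        if tool_id not in levels:
            deps = by_id[tool_id].get("depends_on", [])
            levels[tool_id] = 1 + max((level_of(dep) for dep in deps), default=-1)
        return levels[tool_id]

    groups = defaultdict(list)
    for tool in tools:
        groups[level_of(tool["id"])].append(tool)
    return dict(sorted(groups.items()))
```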
### 3. Memory Optimization
```python
import time


class MemoryOptimizedContextManager(ContextManager):  # assumes the project's base ContextManager class
    """Context manager with memory optimization."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.memory_threshold_mb = 1000
        self.cleanup_interval = 100
        self.operation_count = 0

    def add_content(self, content_type, content_data):
        """Add content with memory management."""
        super().add_content(content_type, content_data)
        self.operation_count += 1

        # Periodic memory cleanup
        if self.operation_count % self.cleanup_interval == 0:
            self._cleanup_memory()

    def _cleanup_memory(self):
        """Clean up memory by removing old or low-priority content."""
        import gc
        import psutil

        # Check current memory usage
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024

        if memory_mb > self.memory_threshold_mb:
            # Remove old content
            self._remove_old_content()

            # Remove low-priority content
            self._remove_low_priority_content()

            # Force garbage collection
            gc.collect()

    def _remove_old_content(self):
        """Remove content older than the age threshold."""
        cutoff_time = time.time() - 3600  # 1 hour
        for content_type in list(self.content_store.keys()):
            self.content_store[content_type] = [
                item for item in self.content_store[content_type]
                if item.get("timestamp", 0) > cutoff_time
            ]

    def _remove_low_priority_content(self):
        """Remove content with low priority scores."""
        for content_type in list(self.content_store.keys()):
            if len(self.content_store[content_type]) > 100:
                # Keep only the top 100 items by priority
                self.content_store[content_type] = sorted(
                    self.content_store[content_type],
                    key=lambda x: x.get("priority_score", 0),
                    reverse=True
                )[:100]
```
## Performance Reporting and Analysis

### Performance Report Generation
```python
from datetime import datetime


class PerformanceReportGenerator:
    """Generate comprehensive performance reports."""

    def generate_report(self, test_results, output_file="performance_report.html"):
        """Generate an HTML performance report."""
        report_data = {
            "timestamp": datetime.now().isoformat(),
            "test_results": test_results,
            "summary": self._generate_summary(test_results),
            "recommendations": self._generate_recommendations(test_results)
        }

        html_content = self._generate_html_report(report_data)

        with open(output_file, 'w') as f:
            f.write(html_content)

        return output_file

    def _generate_summary(self, test_results):
        """Generate a performance summary."""
        total_tests = len(test_results)
        passed_tests = sum(1 for r in test_results if r["status"] == "passed")
        avg_execution_time = sum(r["metrics"]["execution_time"] for r in test_results) / total_tests
        avg_memory_usage = sum(r["metrics"]["peak_memory_mb"] for r in test_results) / total_tests

        return {
            "total_tests": total_tests,
            "passed_tests": passed_tests,
            "success_rate": passed_tests / total_tests * 100,
            "avg_execution_time": avg_execution_time,
            "avg_memory_usage": avg_memory_usage
        }

    def _generate_recommendations(self, test_results):
        """Generate performance optimization recommendations."""
        recommendations = []

        # Check for slow tests
        slow_tests = [r for r in test_results if r["metrics"]["execution_time"] > 5.0]
        if slow_tests:
            recommendations.append({
                "type": "performance",
                "priority": "high",
                "message": f"Found {len(slow_tests)} slow tests (>5s). Consider optimization.",
                "affected_tests": [t["name"] for t in slow_tests]
            })

        # Check for memory issues
        memory_heavy_tests = [r for r in test_results if r["metrics"]["peak_memory_mb"] > 1000]
        if memory_heavy_tests:
            recommendations.append({
                "type": "memory",
                "priority": "medium",
                "message": f"Found {len(memory_heavy_tests)} memory-heavy tests (>1GB). Review memory usage.",
                "affected_tests": [t["name"] for t in memory_heavy_tests]
            })

        return recommendations
```
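`_generate_html_report` is left to the implementation; a minimal sketch that renders the summary and per-test metrics as a static HTML table (the field names follow the `report_data` structure built above):

```python
def _generate_html_report(self, report_data):
    """Sketch: render report_data as a minimal static HTML page."""
    summary = report_data["summary"]
    rows = "".join(
        f"<tr><td>{r['name']}</td><td>{r['status']}</td>"
        f"<td>{r['metrics']['execution_time']:.3f}s</td>"
        f"<td>{r['metrics']['peak_memory_mb']:.1f} MB</td></tr>"
        for r in report_data["test_results"]
    )
    return (
        "<html><body>"
        f"<h1>Performance Report ({report_data['timestamp']})</h1>"
        f"<p>{summary['passed_tests']}/{summary['total_tests']} tests passed "
        f"({summary['success_rate']:.1f}%)</p>"
        "<table border='1'>"
        "<tr><th>Test</th><th>Status</th><th>Execution time</th><th>Peak memory</th></tr>"
        f"{rows}</table></body></html>"
    )
```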
## Best Practices for Performance Testing

### 1. Test Environment Consistency
- Use dedicated test environments for performance testing
- Control for external factors (network, disk, CPU load)
- Use consistent hardware specifications
- Monitor and account for system background processes
### 2. Baseline Management
- Establish performance baselines for critical operations
- Update baselines when legitimate improvements are made
- Track performance trends over time
- Alert on significant regressions
### 3. Test Data Management
- Use realistic test data that matches production patterns
- Ensure consistent test data across test runs
- Include edge cases and boundary conditions
- Scale test data appropriately for load testing
### 4. Measurement Accuracy
- Use appropriate measurement granularity
- Account for warmup time and JIT compilation
- Run multiple iterations and use statistical measures (see the sketch after this list)
- Isolate performance measurements from test setup
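The last three points can be wrapped in a small helper that discards warmup runs and reports robust statistics over repeated iterations (a sketch; the callable under test is supplied by the caller, and `time.perf_counter()` is used because it is designed for interval measurement):

```python
import statistics
import time


def measure(operation, iterations=30, warmup=5):
    """Run `operation` repeatedly and report median/P95 latency, excluding warmup runs."""
    for _ in range(warmup):
        operation()  # Warm caches, lazy imports, connection pools, etc.

    samples = []
    for _ in range(iterations):
        start = time.perf_counter()
        operation()
        samples.append(time.perf_counter() - start)

    return {
        "median_s": statistics.median(samples),
        "p95_s": statistics.quantiles(samples, n=100)[94],
        "min_s": min(samples),
    }
```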
### 5. Performance Monitoring
- Monitor multiple metrics simultaneously
- Use percentile-based measurements (P95, P99)
- Track resource utilization patterns
- Identify performance bottlenecks systematically
## Integration with CI/CD

### Performance Gate Configuration
```yaml
performance_gates:
  context_assembly:
    max_execution_time: 0.5
    max_memory_mb: 500
    min_throughput_ops_per_sec: 100
  tool_orchestration:
    max_execution_time: 2.0
    max_memory_mb: 1000
    min_throughput_ops_per_sec: 50
  load_testing:
    max_execution_time: 10.0
    max_memory_mb: 2000
    min_throughput_ops_per_sec: 200
```
### Automated Performance Testing
```yaml
name: Performance Testing

on:
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM

jobs:
  performance-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          uv sync --dev

      - name: Run performance tests
        run: |
          ./tests/integration/run_integration_tests.py --suite "Performance" --parallel

      - name: Check performance gates
        run: |
          uv run python scripts/check_performance_gates.py test_reports/integration_test_report_latest.json

      - name: Generate performance report
        run: |
          uv run python scripts/generate_performance_report.py

      - name: Upload performance report
        uses: actions/upload-artifact@v3
        with:
          name: performance-report
          path: test_reports/performance_report.html
```
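The gate-check step above assumes a small script that compares the generated report against the configured gates. That script is not part of this guide; a sketch of the idea (the report schema, gate file location, and per-result `category` field are assumptions):

```python
#!/usr/bin/env python3
"""Sketch: exit non-zero when any performance gate is violated."""
import json
import sys

import yaml  # assumes PyYAML is available


def main(report_path, gates_path="performance_gates.yaml"):
    with open(report_path) as f:
        report = json.load(f)
    with open(gates_path) as f:
        gates = yaml.safe_load(f)["performance_gates"]

    failures = []
    for result in report.get("test_results", []):
        gate = gates.get(result.get("category", ""), {})
        metrics = result.get("metrics", {})
        if metrics.get("execution_time", 0) > gate.get("max_execution_time", float("inf")):
            failures.append(f"{result.get('name', 'unknown')}: execution time gate exceeded")
        if metrics.get("peak_memory_mb", 0) > gate.get("max_memory_mb", float("inf")):
            failures.append(f"{result.get('name', 'unknown')}: memory gate exceeded")

    if failures:
        print("\n".join(failures))
        sys.exit(1)
    print("All performance gates passed.")


if __name__ == "__main__":
    main(sys.argv[1])
```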
## Conclusion
Performance testing is critical for ensuring the ADK Agents system maintains acceptable performance characteristics under various conditions. By implementing comprehensive performance testing patterns, monitoring infrastructure, and optimization strategies, you can:
- Detect performance regressions early
- Validate optimization efforts
- Ensure system scalability
- Maintain performance SLAs
- Provide data-driven insights for system improvements
Remember to:
- Establish baseline performance metrics
- Test under realistic conditions
- Monitor multiple performance dimensions
- Automate performance testing in CI/CD
- Optimize based on actual performance data
For general testing patterns, see the Test Patterns Guide.
For troubleshooting performance issues, see the Troubleshooting Guide.