Detection Engine Performance Tuning Guide¶
This guide provides comprehensive strategies for optimizing the FlaskAPI Guard Detection Engine for maximum performance while maintaining security effectiveness.
Performance Overview¶
The Detection Engine's performance is influenced by several factors:
- Pattern complexity and quantity
- Content size being analyzed
- Semantic analysis overhead
- Redis/Agent communication latency
- Concurrent request volume
Performance Metrics¶
Key Performance Indicators¶
Monitor these metrics to assess performance:
from flaskapi_guard.handlers.suspatterns_handler import sus_patterns_handler
# Get comprehensive performance statistics
stats = sus_patterns_handler.get_performance_stats()
# Key metrics to monitor
print(f"Average execution time: {stats['summary']['average_time']}s")
print(f"Timeout rate: {stats['summary']['timeout_rate']*100}%")
print(f"Match rate: {stats['summary']['match_rate']*100}%")
print(f"Slow patterns: {len(stats['slow_patterns'])}")
print(f"Problematic patterns: {len(stats['problematic_patterns'])}")
Performance Benchmarks¶
Target performance levels for different scenarios:
| Scenario | Target Response Time | Acceptable Timeout Rate |
|---|---|---|
| API Gateway | < 10ms | < 0.1% |
| Web Application | < 50ms | < 1% |
| High Security | < 100ms | < 2% |
| Batch Processing | < 500ms | < 5% |
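The table's tiers can be encoded as a simple automated check against the stats shown earlier. A minimal sketch, assuming only the `summary` dictionary shape from the `get_performance_stats()` example above (the `TARGETS` table and `meets_target` helper are illustrative):

```python
# (target average time in seconds, max acceptable timeout rate)
TARGETS = {
    "api_gateway":     (0.010, 0.001),
    "web_application": (0.050, 0.01),
    "high_security":   (0.100, 0.02),
    "batch":           (0.500, 0.05),
}

def meets_target(summary: dict, scenario: str) -> bool:
    target_time, max_timeout_rate = TARGETS[scenario]
    return (summary["average_time"] < target_time
            and summary["timeout_rate"] < max_timeout_rate)
```

Wiring a check like this into CI or a monitoring alert makes the benchmarks enforceable rather than aspirational.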
Optimization Strategies¶
1. Pattern Optimization¶
Identify Slow Patterns¶
# Get patterns exceeding the slow-pattern threshold (seconds)
slow_patterns = monitor.get_slow_patterns(threshold=0.05)

for pattern_info in slow_patterns:
    pattern = pattern_info['pattern']
    avg_time = pattern_info['average_time']
    if avg_time > 0.1:
        # Consider removing or optimizing
        print(f"Critical: {pattern} - {avg_time}s average")
Optimize Pattern Complexity¶
Before (Slow):
# Catastrophic backtracking risk
pattern = r"(.*)*attack"
pattern = r"(a+)+b"
pattern = r"(\w+)*@(\w+)*\.com"
After (Optimized):
# No nested quantifiers, so no catastrophic backtracking
pattern = r".*?attack"
pattern = r"a+b"
pattern = r"\w+@\w+\.com"
Use Non-Capturing Groups¶
# Slower - creates capture groups
pattern = r"(SELECT|INSERT|UPDATE).*(FROM|INTO)"
# Faster - non-capturing groups
pattern = r"(?:SELECT|INSERT|UPDATE).*(?:FROM|INTO)"
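The backtracking cost described above can be measured directly. A small, self-contained timing sketch (the 18-character subject length is an arbitrary choice, kept short so the slow case finishes quickly):

```python
import re
import timeit

subject = "a" * 18  # no trailing "b", forcing the worst case

# Nested quantifiers: every way of splitting the run of "a"s is retried
slow = timeit.timeit(lambda: re.search(r"(a+)+b", subject), number=1)

# Flat quantifier: fails after a simple scan
fast = timeit.timeit(lambda: re.search(r"a+b", subject), number=1)

print(f"(a+)+b : {slow:.4f}s")
print(f"a+b    : {fast:.6f}s")
```

Both patterns accept exactly the same strings, so the rewrite is a pure performance win here; the gap widens exponentially as the subject grows.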
2. Content Preprocessing Optimization¶
Adjust Content Length¶
# For high-traffic APIs
config = SecurityConfig(
    detection_max_content_length=2000,        # Analyze less content
    detection_preserve_attack_patterns=True,  # Still preserve threats
)

# For form submissions
config = SecurityConfig(
    detection_max_content_length=5000,  # Moderate analysis
)

# For file uploads
config = SecurityConfig(
    detection_max_content_length=1000,  # Minimal header analysis
)
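To illustrate what `detection_preserve_attack_patterns` implies, here is a hypothetical sketch of length-limited analysis that does not hide threats: keep the head of the content, but re-append any region beyond the cutoff that matches a high-signal marker. The `MARKERS` regex and `truncate_preserving` helper are illustrative assumptions, not the engine's implementation.

```python
import re

# High-signal markers worth preserving past the cutoff (illustrative set)
MARKERS = re.compile(r"(?:<script|SELECT\s|UNION\s|\.\./)", re.IGNORECASE)

def truncate_preserving(content: str, max_length: int) -> str:
    if len(content) <= max_length:
        return content
    head = content[:max_length]
    # Re-append suspicious regions that fell outside the head
    extras = [m.group(0) for m in MARKERS.finditer(content[max_length:])]
    return (head + " " + " ".join(extras)) if extras else head
```

With this approach, a payload buried at byte 9,000 of a 10 KB body still reaches the pattern matcher even with a 2,000-byte analysis budget.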
Smart Truncation Strategy¶
# Configure based on content type
if content_type == "application/json":
    config.detection_max_content_length = 5000
elif content_type == "multipart/form-data":
    config.detection_max_content_length = 1000
else:
    config.detection_max_content_length = 10000
3. Semantic Analysis Tuning¶
Disable for High-Performance Endpoints¶
# Selectively disable semantic analysis
@app.route("/health")
def health_check():
    # Skip semantic analysis for health checks
    return {"status": "ok"}

@app.route("/api/data", methods=["POST"])
def process_data():
    # Full analysis for data endpoints
    return handle_request(request)
Adjust Semantic Threshold¶
# Performance vs. security trade-off:
# a higher threshold triggers fewer semantic checks, so it is faster
config = SecurityConfig(
    detection_semantic_threshold=0.8  # Only high-confidence threats
)

# For critical endpoints
config = SecurityConfig(
    detection_semantic_threshold=0.6  # More thorough analysis
)
4. Caching Optimization¶
Redis Configuration¶
# Optimize Redis settings
config = SecurityConfig(
    enable_redis=True,
    redis_url="redis://localhost:6379",
)
Pattern Compilation Cache¶
# Monitor cache effectiveness
compiler = sus_patterns_handler._compiler
cache_stats = compiler.get_cache_stats()

if cache_stats['hit_rate'] < 0.8:
    # Increase cache size
    compiler.max_cache_size = 2000
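The compiler's cache is internal, but the idea is easy to see with the standard library. A minimal sketch of a compilation cache, assuming only that patterns are looked up repeatedly (the `compile_pattern` helper is illustrative, not part of the library):

```python
import re
from functools import lru_cache

@lru_cache(maxsize=2000)
def compile_pattern(pattern: str) -> re.Pattern:
    # Compilation happens once per distinct pattern string
    return re.compile(pattern)

# Repeated lookups hit the cache instead of recompiling
compile_pattern(r"\w+@\w+\.com")
info = compile_pattern.cache_info()  # hits/misses feed the hit-rate check
```

`cache_info()` gives the same hits/misses data the hit-rate check above relies on, which is useful when tuning `maxsize`.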
5. Timeout Configuration¶
Dynamic Timeout Adjustment¶
from flask import g, request

# Base the timeout on endpoint criticality
@app.before_request
def adjust_detection_timeout():
    if request.path.startswith("/api/critical"):
        # Longer timeout for critical endpoints
        g.detection_timeout = 3.0
    elif request.path.startswith("/static"):
        # Minimal timeout for static resources
        g.detection_timeout = 0.5
    else:
        # Default timeout
        g.detection_timeout = 2.0
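One way middleware might honor that per-request timeout is to run the check in a worker and abandon it on expiry. This `run_with_timeout` helper is an illustrative sketch, not the library's API; note the caveat in the comment, since a regex engine cannot be interrupted mid-match.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def run_with_timeout(func, timeout, *args):
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args)
    try:
        return future.result(timeout=timeout)
    except TimeoutError:
        # Caveat: the worker thread keeps running; a timed-out regex
        # can only be abandoned, not interrupted.
        return None
    finally:
        executor.shutdown(wait=False)
```

The caller then decides whether a `None` (timed-out) result fails open or closed, which is a policy choice rather than a performance one.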
6. Parallel Processing¶
Batch Pattern Matching¶
from concurrent.futures import ThreadPoolExecutor, as_completed

# Check multiple patterns in parallel, returning on the first match
def parallel_pattern_check(content: str, patterns: list) -> bool:
    with ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(check_pattern, content, pattern): pattern
            for pattern in patterns
        }
        for future in as_completed(futures):
            if future.result():
                # Cancel checks that have not started yet (Python 3.9+)
                executor.shutdown(wait=False, cancel_futures=True)
                return True
    return False
Performance Monitoring¶
Real-time Monitoring¶
import time
import logging

logger = logging.getLogger(__name__)

def monitor_performance():
    stats = sus_patterns_handler.get_performance_stats()

    # Alert on performance degradation
    if stats['summary']['average_time'] > 0.05:
        logger.warning(
            f"Performance degradation detected: "
            f"{stats['summary']['average_time']}s average"
        )

    # Check for anomalies
    anomalies = monitor.get_anomalies()
    if anomalies:
        logger.error(f"Pattern anomalies detected: {len(anomalies)}")
Performance Dashboard¶
from flask import jsonify

# Expose detection-engine metrics as an endpoint
@app.route("/metrics/detection-engine")
def get_detection_metrics():
    stats = sus_patterns_handler.get_performance_stats()
    return jsonify({
        "performance": {
            "average_execution_ms": stats['summary']['average_time'] * 1000,
            "p95_execution_ms": stats['summary'].get('p95_time', 0) * 1000,
            "timeout_rate": stats['summary']['timeout_rate'],
            "total_executions": stats['summary']['total_executions']
        },
        "patterns": {
            "total": len(stats['all_patterns']),
            "slow": len(stats['slow_patterns']),
            "problematic": len(stats['problematic_patterns'])
        },
        "health": calculate_health_score(stats)
    })
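The dashboard above calls a `calculate_health_score()` helper that is not defined in this guide; one plausible sketch (the weights and penalty caps are arbitrary choices, not library behavior) starts at 100 and subtracts penalties for timeouts, slow patterns, and a high average:

```python
def calculate_health_score(stats: dict) -> int:
    summary = stats['summary']
    score = 100
    # Timeouts hurt the most: 1 point per 0.1% timeout rate, capped at 50
    score -= min(50, int(summary['timeout_rate'] * 1000))
    # 5 points per slow pattern, capped at 30
    score -= min(30, len(stats.get('slow_patterns', [])) * 5)
    # Flat penalty when the average breaches the 50ms target
    if summary['average_time'] > 0.05:
        score -= 20
    return max(0, score)
```

Whatever weighting you choose, keep it aligned with the benchmark table earlier so the score and your alerts agree on what "degraded" means.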
Troubleshooting Performance Issues¶
High CPU Usage¶
# 1. Check for runaway patterns
problematic = monitor.get_problematic_patterns()
for pattern in problematic:
    if pattern['timeout_rate'] > 0.05:
        # Remove or fix the pattern
        sus_patterns_handler.remove_pattern(
            pattern['pattern'],
            custom=True
        )

# 2. Reduce concurrent execution
config.detection_max_concurrent = 10  # Limit parallel checks
Memory Issues¶
# 1. Reduce history size
config = SecurityConfig(
    detection_monitor_history_size=500,  # Smaller history
    detection_max_tracked_patterns=500   # Track fewer patterns
)

# 2. Clear old data periodically
import threading
import time

def cleanup_task():
    while True:
        monitor.clear_old_metrics()
        compiler.clear_unused_cache()
        time.sleep(3600)  # Every hour

cleanup_thread = threading.Thread(target=cleanup_task, daemon=True)
cleanup_thread.start()

# 3. Monitor memory usage
import psutil

process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
Latency Spikes¶
import random

# 1. Analyze only a sample of requests
class SamplingFilter:
    def __init__(self, sample_rate=0.1):
        self.sample_rate = sample_rate

    def should_analyze(self, request):
        # Only analyze a sample of requests
        return random.random() < self.sample_rate

# 2. Prioritize critical paths
critical_paths = {"/api/payment", "/api/auth"}

if request.path in critical_paths:
    # Full analysis
    result = detect_full(content)
else:
    # Light analysis
    result = detect_light(content)
Best Practices¶
1. Regular Pattern Audits¶
# Weekly pattern review
def audit_patterns():
    stats = sus_patterns_handler.get_performance_stats()

    # Remove ineffective patterns
    for pattern in stats['all_patterns']:
        if pattern['match_rate'] < 0.0001 and pattern['age_days'] > 30:
            logger.info(f"Removing ineffective pattern: {pattern['pattern']}")
            sus_patterns_handler.remove_pattern(pattern['pattern'])

    # Optimize slow patterns
    for pattern in stats['slow_patterns']:
        optimized = optimize_pattern(pattern['pattern'])
        if optimized != pattern['pattern']:
            sus_patterns_handler.remove_pattern(pattern['pattern'])
            sus_patterns_handler.add_pattern(optimized, custom=True)
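The audit calls an `optimize_pattern()` helper that is not defined in this guide. A minimal sketch that applies one safe rewrite, turning plain capturing groups into non-capturing ones (the helper is illustrative, and note the backreference caveat in the comment):

```python
import re

def optimize_pattern(pattern: str) -> str:
    # Rewrite "(" to "(?:" unless it is escaped or already a special group.
    # Caveat: do not apply this to patterns that use backreferences
    # (\1, \2, ...) - removing their groups would break them.
    return re.sub(r"(?<!\\)\((?!\?)", "(?:", pattern)
```

A real optimizer could add further passes (de-nesting quantifiers, anchoring prefixes), but each rewrite should be verified against a corpus of known-bad inputs before the optimized pattern replaces the original.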
2. Load Testing¶
# Performance test script
import time
from concurrent.futures import ThreadPoolExecutor

def load_test():
    test_contents = [
        "normal request data",
        "SELECT * FROM users WHERE id=1",
        "<script>alert('xss')</script>",
        # Add more test cases
    ]

    start = time.time()
    tasks = []
    with ThreadPoolExecutor(max_workers=50) as executor:
        for _ in range(1000):
            for content in test_contents:
                future = executor.submit(
                    sus_patterns_handler.detect,
                    content=content,
                    ip_address="127.0.0.1",
                    context="load_test"
                )
                tasks.append(future)

    elapsed = time.time() - start
    print(f"Processed {len(tasks)} requests in {elapsed:.2f}s")
    print(f"Average: {elapsed/len(tasks)*1000:.2f}ms per request")
3. Gradual Rollout¶
# Feature flag for new patterns
NEW_PATTERNS_ENABLED = False

if NEW_PATTERNS_ENABLED:
    sus_patterns_handler.add_pattern(new_pattern, custom=True)

# Canary deployment: use a stable hash so each client lands in the
# same bucket across workers (Python's built-in hash() is salted per process)
import zlib

if zlib.crc32(request.remote_addr.encode()) % 100 < 10:  # 10% of users
    # Use new detection settings
    config.detection_semantic_threshold = 0.6
else:
    # Use stable settings
    config.detection_semantic_threshold = 0.7
Performance Checklist¶
Before deploying to production:
- [ ] Average execution time < 50ms
- [ ] Timeout rate < 1%
- [ ] No patterns with > 100ms average execution
- [ ] Cache hit rate > 80%
- [ ] Memory usage stable over 24 hours
- [ ] CPU usage < 20% under normal load
- [ ] Tested with 10x expected traffic
- [ ] Monitoring alerts configured
- [ ] Rollback plan prepared
Next Steps¶
- Implement Custom Patterns optimized for performance
- Configure Monitoring Dashboard for real-time insights
- Review Architecture Guide for optimization opportunities