Detection Engine Architecture¶
The FastAPI Guard Detection Engine uses a modular architecture that provides timeout-protected pattern matching with optional preprocessing and heuristic analysis. This document describes the actual implementation and how components interact.
System Architecture¶
flowchart TD
Request[FastAPI Request] --> Middleware[SecurityMiddleware<br/>• Checks if penetration detection enabled<br/>• Calls detect_penetration_attempt]
Middleware --> DPA[detect_penetration_attempt<br/>• Extracts content from request<br/>• Query, body, path, headers<br/>• Calls SusPatternsManager.detect]
DPA --> SPM[SusPatternsManager Singleton<br/>• Manages patterns: default + custom<br/>• Initializes components lazily<br/>• Orchestrates detection process]
SPM --> CP[ContentPreprocessor<br/>if configured]
SPM --> PC[PatternCompiler<br/>if configured]
SPM --> SA[SemanticAnalyzer<br/>if configured]
CP --> PM[PerformanceMonitor<br/>always created]
PC --> PM
SA --> PM
style Request fill:#e1f5fe,stroke:#01579b,stroke-width:2px
style Middleware fill:#fff3e0,stroke:#e65100,stroke-width:2px
style DPA fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
style SPM fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
style PM fill:#fce4ec,stroke:#880e4f,stroke-width:2px
Core Components¶
1. SusPatternsManager¶
The central component that manages the detection process:
from __future__ import annotations

import re
import threading


class SusPatternsManager:
    """Singleton pattern manager with enhanced detection capabilities."""

    _instance: SusPatternsManager | None = None
    _lock: threading.Lock = threading.Lock()

    def __new__(cls) -> SusPatternsManager:
        # Singleton implementation (elided in this excerpt)
        ...

    def __init__(self) -> None:
        # Load patterns from YAML files
        self.patterns: list[str] = []
        self.custom_patterns: set[str] = set()
        self.compiled_patterns: list[re.Pattern] = []

        # Detection engine components (initialized lazily)
        self._compiler: PatternCompiler | None = None
        self._preprocessor: ContentPreprocessor | None = None
        self._semantic_analyzer: SemanticAnalyzer | None = None
        self._performance_monitor: PerformanceMonitor | None = None
Key Responsibilities:
- Pattern loading from YAML files
- Custom pattern management
- Component initialization based on configuration
- Detection orchestration
- Result aggregation
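The singleton behaviour can be illustrated with a minimal sketch. The class name `PatternRegistry` and its attributes are hypothetical stand-ins, not the library's code; the sketch only shows one common way to guard `__new__` with a lock so that concurrent callers share a single instance.

```python
import threading


class PatternRegistry:
    """Hypothetical stand-in for a lock-protected singleton."""

    _instance: "PatternRegistry | None" = None
    _lock = threading.Lock()

    def __new__(cls) -> "PatternRegistry":
        # Double-checked locking: take the lock only while no instance exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set up shared state exactly once.
                    instance.patterns = []
                    instance.custom_patterns = set()
                    cls._instance = instance
        return cls._instance


first = PatternRegistry()
second = PatternRegistry()
first.custom_patterns.add(r"(?i)union\s+select")
```

Because both names refer to the same object, a custom pattern added through `first` is visible through `second`.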
2. Component Initialization¶
Components are initialized only when needed based on configuration:
def _ensure_detection_components(self) -> None:
    """Initialize detection engine components based on configuration."""
    config = get_current_config()

    # PatternCompiler: Only if timeout > 0
    if config.detection_compiler_timeout > 0 and not self._compiler:
        self._compiler = PatternCompiler(config)

    # ContentPreprocessor: Only if max_content_length > 0
    if config.detection_max_content_length > 0 and not self._preprocessor:
        self._preprocessor = ContentPreprocessor(config)

    # SemanticAnalyzer: Only if threshold > 0
    if config.detection_semantic_threshold > 0 and not self._semantic_analyzer:
        self._semantic_analyzer = SemanticAnalyzer(config)

    # PerformanceMonitor: Always created
    if not self._performance_monitor:
        self._performance_monitor = PerformanceMonitor(config)
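To see the effect of these conditions in isolation, here is a self-contained sketch. `DemoConfig` and `DemoEngine` are hypothetical; they mirror three of the configuration field names but use placeholder objects instead of the real components, and the default values are arbitrary.

```python
from dataclasses import dataclass


@dataclass
class DemoConfig:
    """Hypothetical config; field names mirror the documented ones."""
    detection_compiler_timeout: float = 2.0
    detection_max_content_length: int = 10000
    detection_semantic_threshold: float = 0.0  # 0 disables the analyzer


class DemoEngine:
    """Creates optional components on first use, driven by config values."""

    def __init__(self, config: DemoConfig) -> None:
        self.config = config
        self.compiler = None
        self.preprocessor = None
        self.semantic_analyzer = None

    def ensure_components(self) -> None:
        cfg = self.config
        # object() is a placeholder for a real component instance.
        if cfg.detection_compiler_timeout > 0 and self.compiler is None:
            self.compiler = object()
        if cfg.detection_max_content_length > 0 and self.preprocessor is None:
            self.preprocessor = object()
        if cfg.detection_semantic_threshold > 0 and self.semantic_analyzer is None:
            self.semantic_analyzer = object()


engine = DemoEngine(DemoConfig())
engine.ensure_components()
compiler_before = engine.compiler
engine.ensure_components()  # second call must not recreate anything
```

With the threshold at 0, the semantic analyzer stays unset, and repeated calls leave existing components untouched.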
Detection Flow¶
1. Request Reception¶
# In middleware.py
if self.config.enable_penetration_detection:
    detection_result, trigger_info = await detect_penetration_attempt(request)
    if detection_result:
        # Handle detected attack
        ...
2. Content Extraction¶
# In utils.py
async def detect_penetration_attempt(request: Request) -> tuple[bool, str]:
    # Extract content from various sources
    contents_to_check = []

    # Query parameters
    if request.query_params:
        for key, value in request.query_params.items():
            contents_to_check.append((f"{key}={value}", "query_param"))

    # Request body
    body = await get_body_content(request)
    if body:
        contents_to_check.append((body, "body"))

    # Path parameters
    if path_params := request.path_params:
        contents_to_check.append((str(path_params), "path"))

    # Headers
    for header, value in request.headers.items():
        contents_to_check.append((f"{header}: {value}", "header"))
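The shape of the collected data is easier to see without a live request. The sketch below assumes plain dictionaries in place of the request object; `collect_contents` is a hypothetical helper, not part of the library.

```python
def collect_contents(
    query_params: dict[str, str],
    body: str,
    path_params: dict[str, str],
    headers: dict[str, str],
) -> list[tuple[str, str]]:
    """Return (content, context) pairs in the order used above."""
    contents: list[tuple[str, str]] = []
    for key, value in query_params.items():
        contents.append((f"{key}={value}", "query_param"))
    if body:
        contents.append((body, "body"))
    if path_params:
        # The whole mapping is checked as one string.
        contents.append((str(path_params), "path"))
    for name, value in headers.items():
        contents.append((f"{name}: {value}", "header"))
    return contents


pairs = collect_contents(
    query_params={"q": "1 OR 1=1"},
    body="",
    path_params={"item_id": "42"},
    headers={"user-agent": "curl/8.0"},
)
```

An empty body contributes nothing, so `pairs` holds one query parameter, one path entry, and one header entry.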
3. Detection Process¶
# For each content piece
for content, context in contents_to_check:
    result = await sus_patterns_handler.detect(
        content=content,
        ip_address=client_ip,
        context=context,
        correlation_id=correlation_id
    )
    if result["is_threat"]:
        return True, format_trigger_info(result)
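The loop returns on the first threat, so content after the first hit is never inspected. A minimal sketch of that short-circuit behaviour follows; `fake_detect` is a hypothetical detector, and the `(False, "")` result for clean input is an assumption, since the excerpt does not show that branch.

```python
import asyncio
from typing import Any


async def fake_detect(content: str, context: str) -> dict[str, Any]:
    """Hypothetical detector: flags any content containing '<script'."""
    return {"is_threat": "<script" in content.lower(), "context": context}


async def check_all(contents: list[tuple[str, str]]) -> tuple[bool, str]:
    """Return on the first threat; later items are never inspected."""
    for content, context in contents:
        result = await fake_detect(content, context)
        if result["is_threat"]:
            return True, f"threat in {context}"
    # Assumption: a clean request yields (False, "").
    return False, ""


contents = [
    ("q=hello", "query_param"),
    ("<script>alert(1)</script>", "body"),
    ("user-agent: curl/8.0", "header"),
]
outcome = asyncio.run(check_all(contents))
clean = asyncio.run(check_all([("q=hello", "query_param")]))
```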
4. Detection Implementation¶
The actual detection happens in SusPatternsManager.detect():
async def detect(self, content: str, **kwargs) -> dict[str, Any]:
    start_time = time.time()
    threats = []
    timeouts = []

    # 1. Ensure components are initialized
    self._ensure_detection_components()

    # 2. Preprocess content
    if self._preprocessor:
        processed_content = self._preprocessor.preprocess(content)
        preserved_attacks = processed_content != content[:len(processed_content)]
    else:
        processed_content = content
        preserved_attacks = False

    # 3. Pattern matching
    for i, pattern in enumerate(self.compiled_patterns):
        pattern_str = self.patterns[i] if i < len(self.patterns) else str(pattern.pattern)
        if self._compiler:
            # Use timeout-protected matching
            safe_matcher = self._compiler.create_safe_matcher(pattern_str, pattern)
            match_result = await safe_matcher(processed_content)
            if match_result:
                if match_result.get("timeout"):
                    timeouts.append(pattern_str)
                elif match_result.get("match"):
                    threats.append({
                        "type": "regex",
                        "pattern": pattern_str,
                        "execution_time": match_result.get("execution_time", 0)
                    })
        else:
            # Fallback to direct matching
            try:
                if pattern.search(processed_content):
                    threats.append({
                        "type": "regex",
                        "pattern": pattern_str
                    })
            except Exception:
                pass

    # 4. Semantic analysis
    if self._semantic_analyzer and not threats:
        semantic_result = self._semantic_analyzer.analyze_content(processed_content)
        if semantic_result["score"] > self._semantic_analyzer.threshold:
            threats.append({
                "type": "semantic",
                "score": semantic_result["score"],
                "attack_types": semantic_result["attack_types"],
                "confidence": semantic_result["confidence"]
            })

    # 5. Performance tracking
    execution_time = time.time() - start_time
    if self._performance_monitor:
        for threat in threats:
            await self._performance_monitor.record_metric(
                pattern=threat.get("pattern", "semantic"),
                execution_time=threat.get("execution_time", execution_time),
                matched=True,
                timeout=False
            )

    # 6. Build result
    return {
        "is_threat": len(threats) > 0,
        "threat_score": max((t.get("score", 1.0) for t in threats), default=0.0),
        "threats": threats,
        "context": kwargs.get("context", "unknown"),
        "original_length": len(content),
        "processed_length": len(processed_content),
        "execution_time": execution_time,
        "detection_method": "enhanced" if self._compiler else "legacy",
        "timeouts": timeouts,
        "correlation_id": kwargs.get("correlation_id")
    }
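The `threat_score` expression is compact enough to be worth unpacking. In the sketch below the expression is copied from the listing above into a standalone helper (the function name is ours): regex threats carry no `score` key and therefore count as 1.0, a semantic threat contributes its own score, and an empty list falls back to 0.0.

```python
def threat_score(threats: list[dict]) -> float:
    """Highest score among threats; entries without a score count as 1.0."""
    return max((t.get("score", 1.0) for t in threats), default=0.0)


no_threats = threat_score([])
regex_only = threat_score([{"type": "regex", "pattern": "<script"}])
semantic_only = threat_score([{"type": "semantic", "score": 0.72}])
```

Since semantic analysis runs only when no regex threat was found, a result contains either regex entries or a single semantic entry, never both.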
Data Flow¶
Pattern Loading¶
flowchart LR
YAML[YAML Files<br/>package data] --> PL[Pattern Loading<br/>• Read YAML files<br/>• Merge custom<br/>• Compile regex]
Custom[Custom Patterns<br/>runtime] --> PL
Cache[Compiled Cache<br/>memory] --> PL
style YAML fill:#bbdefb,stroke:#1565c0,stroke-width:2px
style Custom fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
style Cache fill:#ffccbc,stroke:#bf360c,stroke-width:2px
style PL fill:#fff9c4,stroke:#f57f17,stroke-width:2px
Detection Result Structure¶
{
    "is_threat": bool,              # True if any threat detected
    "threat_score": float,          # 0.0-1.0, highest score
    "threats": [                    # List of detected threats
        {
            "type": "regex",            # or "semantic"
            "pattern": str,             # For regex matches
            "score": float,             # For semantic matches
            "execution_time": float     # Pattern execution time
        }
    ],
    "context": str,                 # Where content came from
    "original_length": int,         # Original content length
    "processed_length": int,        # After preprocessing
    "execution_time": float,        # Total detection time
    "detection_method": str,        # "enhanced" or "legacy"
    "timeouts": list[str],          # Patterns that timed out
    "correlation_id": str | None    # Request correlation ID
}
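For callers that want static checking, the structure above could be expressed as `TypedDict` classes. This is a sketch, not the library's own typing: the class names and the `summarize` helper are hypothetical, and the key names are taken from the structure shown above.

```python
from typing import Optional, TypedDict


class Threat(TypedDict, total=False):
    """One entry in "threats"; which keys are present depends on the type."""
    type: str
    pattern: str
    score: float
    execution_time: float


class DetectionResult(TypedDict):
    is_threat: bool
    threat_score: float
    threats: list[Threat]
    context: str
    original_length: int
    processed_length: int
    execution_time: float
    detection_method: str
    timeouts: list[str]
    correlation_id: Optional[str]


def summarize(result: DetectionResult) -> str:
    """Build a one-line log message from a detection result."""
    if not result["is_threat"]:
        return f"clean ({result['context']})"
    kinds = ", ".join(sorted({t["type"] for t in result["threats"]}))
    return f"threat in {result['context']}: {kinds} (score {result['threat_score']:.2f})"


example: DetectionResult = {
    "is_threat": True,
    "threat_score": 1.0,
    "threats": [{"type": "regex", "pattern": "<script"}],
    "context": "body",
    "original_length": 25,
    "processed_length": 25,
    "execution_time": 0.002,
    "detection_method": "enhanced",
    "timeouts": [],
    "correlation_id": None,
}
message = summarize(example)
```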
Integration Points¶
1. Configuration Integration¶
The engine reads its configuration from SecurityConfig:
# Key configuration fields
config.enable_penetration_detection # Enable/disable
config.detection_compiler_timeout # Pattern timeout
config.detection_max_content_length # Content limit
config.detection_preserve_attack_patterns # Preservation
config.detection_semantic_threshold # Semantic threshold
config.detection_slow_pattern_threshold # Performance threshold
2. Redis Integration (Optional)¶
When Redis is enabled:
- Custom patterns can be stored/retrieved
- Performance metrics can be aggregated
- Pattern effectiveness can be tracked
3. Agent Integration (Optional)¶
When Agent is enabled:
- Detection events are sent with full context
- Performance metrics are reported
- Pattern effectiveness is tracked
Security Considerations¶
1. ReDoS Prevention¶
The engine mitigates Regular Expression Denial of Service (ReDoS) by running each pattern search in a worker thread and waiting for it under a timeout:
# In PatternCompiler.create_safe_matcher()
async def safe_matcher(content: str) -> dict[str, Any] | None:
    try:
        start = time.time()
        match = await asyncio.wait_for(
            asyncio.to_thread(pattern.search, content),
            timeout=self.timeout
        )
        return {
            "match": match,
            "execution_time": time.time() - start
        }
    except asyncio.TimeoutError:
        return {"timeout": True}
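The same technique can be tried outside the library with a standalone sketch. The free function `create_safe_matcher` below takes the timeout as an argument, which differs from the method shown above, and `SlowPattern` is a hypothetical stand-in that simulates a pathological regex by sleeping.

```python
import asyncio
import re
import time
from typing import Any


def create_safe_matcher(pattern: Any, timeout: float):
    """Wrap pattern.search so the caller waits at most `timeout` seconds."""

    async def safe_matcher(content: str) -> dict[str, Any]:
        start = time.perf_counter()
        try:
            match = await asyncio.wait_for(
                asyncio.to_thread(pattern.search, content),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            return {"timeout": True}
        return {"match": match, "execution_time": time.perf_counter() - start}

    return safe_matcher


class SlowPattern:
    """Hypothetical stand-in whose search() takes 0.3 seconds."""

    def search(self, content: str) -> None:
        time.sleep(0.3)
        return None


async def demo() -> tuple[dict[str, Any], dict[str, Any]]:
    fast = create_safe_matcher(re.compile(r"<script", re.IGNORECASE), timeout=1.0)
    slow = create_safe_matcher(SlowPattern(), timeout=0.05)
    return await fast("<SCRIPT>alert(1)</SCRIPT>"), await slow("anything")


fast_result, slow_result = asyncio.run(demo())
```

One design point to keep in mind: the timeout bounds how long the coroutine waits, but Python cannot forcibly stop a running thread, so the worker keeps executing until `search` returns on its own.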
2. Resource Management¶
- Memory: Content preprocessing limits input size
- CPU: Timeout protection prevents excessive CPU usage
- Concurrency: Thread pool executor prevents blocking
- Caching: Bounded caches prevent memory leaks
3. Error Isolation¶
Each component handles errors independently:
- Pattern compilation errors don't crash the system
- Timeout errors are logged but don't stop detection
- Component initialization failures fall back gracefully
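As an illustration of the first point, a compilation step can collect invalid patterns instead of raising. This is a sketch under assumptions: `compile_patterns` is a hypothetical helper, and the case-insensitive flag is chosen for the example only.

```python
import re


def compile_patterns(sources: list[str]) -> tuple[list[re.Pattern], list[str]]:
    """Compile what can be compiled; collect the rest for logging."""
    compiled: list[re.Pattern] = []
    rejected: list[str] = []
    for source in sources:
        try:
            compiled.append(re.compile(source, re.IGNORECASE))
        except re.error:
            # One bad pattern must not prevent the others from loading.
            rejected.append(source)
    return compiled, rejected


compiled, rejected = compile_patterns([r"<script", r"(unclosed", r"union\s+select"])
```

The unbalanced parenthesis is rejected while the two valid patterns remain usable.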
Performance Characteristics¶
Latency Impact¶
| Component | Typical Latency | Max Latency |
|---|---|---|
| Preprocessing | < 1ms | 5ms |
| Pattern Matching (per pattern) | < 0.1ms | timeout value |
| Semantic Analysis | 1-5ms | 10ms |
| Total Detection | 5-20ms | 50ms |
Memory Usage¶
- Pattern storage: ~100KB for default patterns
- Compiled pattern cache: ~1MB
- Performance history: ~500KB (configurable)
- Total overhead: ~2-5MB per instance
Monitoring and Debugging¶
Performance Monitoring¶
# Get performance statistics
stats = await sus_patterns_handler.get_performance_stats()
# Example output
{
    "slow_patterns": [...],         # Patterns exceeding threshold
    "problematic_patterns": [...],  # Patterns with issues
    "summary": {
        "total_executions": 10000,
        "average_time": 0.002,
        "timeout_rate": 0.001,
        "match_rate": 0.05
    }
}
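How the summary fields relate to individual measurements can be sketched as follows. The `Metric` fields mirror the arguments passed to `record_metric` earlier in this document; the aggregation itself, including the `slow_threshold` parameter, is an assumption for illustration and may differ from what the library computes.

```python
from dataclasses import dataclass


@dataclass
class Metric:
    pattern: str
    execution_time: float
    matched: bool
    timeout: bool


def summarize_metrics(metrics: list[Metric], slow_threshold: float) -> dict:
    """Aggregate recorded metrics into a stats dictionary (assumed layout)."""
    total = len(metrics)
    if total == 0:
        return {"slow_patterns": [], "summary": {"total_executions": 0}}
    slow = sorted({m.pattern for m in metrics if m.execution_time > slow_threshold})
    return {
        "slow_patterns": slow,
        "summary": {
            "total_executions": total,
            "average_time": sum(m.execution_time for m in metrics) / total,
            "timeout_rate": sum(m.timeout for m in metrics) / total,
            "match_rate": sum(m.matched for m in metrics) / total,
        },
    }


recorded = [
    Metric("a", 0.001, matched=True, timeout=False),
    Metric("a", 0.003, matched=False, timeout=False),
    Metric("b", 0.2, matched=False, timeout=True),
    Metric("c", 0.004, matched=True, timeout=False),
]
stats = summarize_metrics(recorded, slow_threshold=0.1)
```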
Component Status¶
# Check component status
status = await sus_patterns_handler.get_component_status()
# Example output
{
    "compiler": True,             # PatternCompiler active
    "preprocessor": True,         # ContentPreprocessor active
    "semantic_analyzer": False,   # Not configured
    "performance_monitor": True   # Always active
}
Best Practices¶
- Configuration: Start with defaults, adjust based on monitoring
- Pattern Management: Regularly review and optimize patterns
- Performance: Monitor slow patterns and remove/optimize them
- Security: Always enable timeout protection in production
- Testing: Test patterns in staging before production deployment
Limitations¶
- Pattern-Based: Relies on known attack patterns
- Context-Unaware: Doesn't understand application-specific logic
- Performance Trade-offs: Enabling more detection stages or patterns increases latency
- False Positives: Legitimate content may match patterns
Future Considerations¶
While not currently implemented, potential enhancements could include:
- True machine learning models
- Distributed pattern learning
- Real-time threat intelligence feeds
- Context-aware detection