Detection Engine Components
This document describes each component of the FastAPI Guard Detection Engine: its implementation, capabilities, and usage.
Component Overview
The Detection Engine consists of four main components, each initialized conditionally based on configuration:
- ContentPreprocessor - Truncates content while preserving attack patterns
- PatternCompiler - Provides timeout-protected pattern matching
- SemanticAnalyzer - Heuristic-based attack detection
- PerformanceMonitor - Tracks execution metrics
ContentPreprocessor
Located in guard/detection_engine/preprocessor.py
Purpose
Intelligently truncates content to prevent excessive memory usage while ensuring potential attack patterns are preserved.
Implementation
class ContentPreprocessor:
    """Intelligent content preprocessing with attack pattern preservation."""

    def __init__(self, config: SecurityConfig):
        self.max_length = config.detection_max_content_length
        self.preserve_patterns = config.detection_preserve_attack_patterns
Key Methods
preprocess(content: str) -> str
Preprocesses content with the following logic:
- If content length ≤ max_length, returns the content unchanged
- If preserve_patterns is False, performs simple truncation
- If preserve_patterns is True:
  - Scans for attack patterns in a sliding window
  - Preserves sections containing potential attacks
  - Returns truncated content with preserved attack regions
Attack Pattern Preservation
The preprocessor looks for indicators like:
- SQL keywords: SELECT, UNION, INSERT, DELETE, etc.
- Script tags and JavaScript events
- Path traversal patterns: ../, ..\
- Command injection indicators
- Common encoding patterns
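The truncation logic and indicator categories above can be illustrated with a minimal sketch. This is not the library's implementation: the `INDICATORS` list, the `context` parameter, and the `find_attack_regions` helper are all hypothetical, and the sketch scans the whole string with `finditer` rather than using a sliding window. When suspicious regions are found it keeps only those regions (up to the length budget); otherwise it falls back to plain head truncation.

```python
import re

# Hypothetical indicator patterns, loosely based on the categories listed above.
INDICATORS = [
    re.compile(r"\b(?:select|union|insert|delete)\b", re.IGNORECASE),  # SQL keywords
    re.compile(r"<script|\bon\w+\s*=", re.IGNORECASE),  # script tags, JS event attributes
    re.compile(r"\.\./|\.\.\\"),  # path traversal
]


def find_attack_regions(content: str, context: int = 20) -> list[tuple[int, int]]:
    """Return merged (start, end) spans around every indicator match."""
    spans = sorted(
        (max(0, m.start() - context), min(len(content), m.end() + context))
        for pattern in INDICATORS
        for m in pattern.finditer(content)
    )
    merged: list[tuple[int, int]] = []
    for start, end in spans:
        if merged and start <= merged[-1][1]:
            # Overlapping or touching spans are fused into one region.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def preprocess(content: str, max_length: int, preserve_patterns: bool = True) -> str:
    """Truncate content to max_length, keeping suspicious regions if requested."""
    if len(content) <= max_length:
        return content
    if not preserve_patterns:
        return content[:max_length]

    kept: list[str] = []
    budget = max_length
    for start, end in find_attack_regions(content):
        if budget <= 0:
            break
        chunk = content[start:end][:budget]
        kept.append(chunk)
        budget -= len(chunk)

    # No indicators found: fall back to simple head truncation.
    return "".join(kept) if kept else content[:max_length]
```

A payload buried in the middle of a long body survives truncation here, whereas simple truncation would cut it off. A real implementation would likely also keep some of the surrounding benign content.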
Example Usage
preprocessor = ContentPreprocessor(config)
processed = preprocessor.preprocess(long_content)
# Result: Truncated content with attack patterns preserved
PatternCompiler
Located in guard/detection_engine/compiler.py
Purpose
Provides safe pattern compilation and execution with timeout protection against ReDoS attacks.
Implementation
class PatternCompiler:
    """Pattern compilation with timeout protection."""

    def __init__(self, config: SecurityConfig):
        self.timeout = config.detection_compiler_timeout
        self._compiled_cache: dict[str, re.Pattern | None] = {}
Key Methods
compile_pattern(pattern: str) -> re.Pattern | None
Compiles regex patterns with error handling:
- Caches compiled patterns for performance
- Returns None for invalid patterns
- Logs compilation errors
create_safe_matcher(pattern: str, compiled_pattern: re.Pattern) -> Callable
Creates a timeout-protected matcher function:
async def safe_matcher(content: str) -> dict[str, Any] | None:
    try:
        match = await asyncio.wait_for(
            asyncio.to_thread(compiled_pattern.search, content),
            timeout=self.timeout
        )
        return {"match": match} if match else None
    except asyncio.TimeoutError:
        return {"timeout": True}
Timeout Protection
- Uses asyncio.wait_for() with a configurable timeout
- Runs pattern matching in a thread pool to prevent blocking
- Returns a timeout indicator instead of hanging
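To see the three outcomes of the matcher (match, no match, timeout) side by side, here is a small self-contained sketch. The `make_safe_matcher` factory and `slow_search` function are hypothetical names introduced for illustration; `slow_search` simply sleeps, standing in for a pathological regex so that the timeout path is reproducible.

```python
from __future__ import annotations

import asyncio
import re
import time
from typing import Any, Callable


def make_safe_matcher(search: Callable[[str], Any], timeout: float):
    """Wrap a blocking search callable so callers wait at most `timeout` seconds."""

    async def safe_matcher(content: str) -> dict[str, Any] | None:
        try:
            match = await asyncio.wait_for(
                asyncio.to_thread(search, content), timeout=timeout
            )
        except asyncio.TimeoutError:
            return {"timeout": True}
        return {"match": match} if match else None

    return safe_matcher


def slow_search(content: str) -> None:
    """Stand-in for a slow pattern: blocks the worker thread for 0.3 seconds."""
    time.sleep(0.3)
    return None


async def demo() -> list[Any]:
    fast = make_safe_matcher(re.compile(r"<script", re.IGNORECASE).search, timeout=1.0)
    slow = make_safe_matcher(slow_search, timeout=0.05)
    return [
        await fast("<SCRIPT>alert(1)</SCRIPT>"),  # match found
        await fast("plain text"),                 # no match
        await slow("anything"),                   # timeout indicator
    ]
```

Running `asyncio.run(demo())` yields a match dictionary, `None`, and `{"timeout": True}` in that order. Note that `asyncio.wait_for()` only stops waiting for the result; the worker thread itself keeps running until the search call returns.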
SemanticAnalyzer
Located in guard/detection_engine/semantic.py
Purpose
Provides heuristic-based detection of obfuscated attacks that might bypass regex patterns.
Implementation
class SemanticAnalyzer:
    """Heuristic-based semantic analysis for attack detection."""

    def __init__(self, config: SecurityConfig):
        self.threshold = config.detection_semantic_threshold
        self.token_patterns = self._initialize_patterns()
Key Methods
analyze_content(content: str) -> dict[str, Any]
Performs multi-stage analysis:
- Token Extraction: Breaks content into meaningful tokens
- Pattern Analysis: Looks for attack-specific patterns
- Context Evaluation: Considers token relationships
- Scoring: Calculates threat probability
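The stages above can be pictured with a deliberately simplified sketch. Everything in it is an assumption made for illustration: the `ATTACK_TOKENS` vocabulary, the scoring formula (distinct suspicious tokens divided by three, capped at 1.0), the default threshold, and the shape of the returned dictionary are hypothetical and do not reflect the analyzer's actual heuristics.

```python
from __future__ import annotations

import re
from typing import Any

# Hypothetical vocabularies per attack type.
ATTACK_TOKENS: dict[str, set[str]] = {
    "sql": {"select", "union", "where", "or", "and", "concat", "char"},
    "xss": {"script", "onerror", "onload", "javascript", "alert"},
}


def extract_tokens(content: str) -> list[str]:
    """Token extraction: lowercase alphabetic runs."""
    return re.findall(r"[a-z_]+", content.lower())


def analyze_content(content: str, threshold: float = 0.5) -> dict[str, Any]:
    tokens = extract_tokens(content)
    scores: dict[str, float] = {}
    for attack_type, vocabulary in ATTACK_TOKENS.items():
        # Pattern analysis: which suspicious tokens occur at all?
        hits = {token for token in tokens if token in vocabulary}
        # Context and scoring: several distinct suspicious tokens together
        # count for more than one common word such as "and".
        scores[attack_type] = min(1.0, len(hits) / 3)

    top = max(scores, key=lambda name: scores[name])
    is_threat = scores[top] >= threshold
    return {
        "scores": scores,
        "attack_type": top if is_threat else None,
        "is_threat": is_threat,
    }
```

With this scoring, a single ambiguous word stays below the threshold, while a combination such as `UNION`, `SELECT`, and `WHERE` in one input crosses it.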
Attack Detection Heuristics
The analyzer detects:
- SQL Injection:
  - Keywords: SELECT, UNION, WHERE, OR, AND
  - Operators: =, --, /*
  - Functions: concat(), char()
- XSS Attacks:
  - Tags: