Skip to content

Detection Engine

The detection_engine module provides the core components for threat detection: pattern compilation with timeout protection, content preprocessing, semantic analysis, and performance monitoring.


PatternCompiler

class PatternCompiler:
    # Class-level cap on the compiled-pattern cache.
    # NOTE(review): this constant is 1000, but the __init__ docstring says
    # max_cache_size is clamped to 5000 — confirm which bound is authoritative.
    MAX_CACHE_SIZE = 1000

    def __init__(
        self,
        default_timeout: float = 5.0,
        max_cache_size: int = 1000,
    ):
        """
        Compile and cache regex patterns with timeout protection.
        max_cache_size is clamped to 5000.

        Args:
            default_timeout: Timeout applied to pattern matching
                (presumably seconds — TODO confirm units).
            max_cache_size: Maximum number of compiled patterns to
                retain in the cache.
        """

    async def compile_pattern(
        self,
        pattern: str,
        flags: int = re.IGNORECASE | re.MULTILINE,
    ) -> re.Pattern:
        """
        Compile a pattern with LRU caching.

        Args:
            pattern: Regex source string to compile.
            flags: ``re`` flag bitmask; case-insensitive, multi-line
                matching by default.

        Returns:
            The compiled (possibly cached) pattern object.
        """

    def create_safe_matcher(
        self, pattern: str
    ) -> Callable[[str], re.Match | None]:
        """
        Return a callable that matches content against the pattern
        with timeout protection.

        Args:
            pattern: Regex source string the returned matcher uses.

        Returns:
            A function taking the content string and returning a
            ``re.Match`` on success or ``None`` otherwise.
        """

    async def clear_cache(self) -> None:
        """
        Clear the compiled pattern cache.
        """

ContentPreprocessor

class ContentPreprocessor:
    def __init__(
        self,
        max_content_length: int = 10000,
        preserve_attack_patterns: bool = True,
        agent_handler: Any = None,
        correlation_id: str | None = None,
    ):
        """
        Preprocess request content before pattern matching.
        Truncates to max_content_length while optionally preserving
        sections that contain attack indicators.

        Args:
            max_content_length: Maximum number of characters kept after
                truncation.
            preserve_attack_patterns: When True, sections containing
                attack indicators are preserved during truncation.
            agent_handler: Optional handler object — semantics not shown
                here; presumably used for event/telemetry dispatch
                (TODO confirm against caller).
            correlation_id: Optional identifier for correlating this
                preprocessing with related log/trace records.
        """

    async def preprocess(self, content: str) -> str:
        """
        Normalize and truncate content for detection.

        Args:
            content: Raw request content to prepare for pattern matching.

        Returns:
            The normalized, possibly truncated content string.
        """

SemanticAnalyzer

class SemanticAnalyzer:
    def __init__(self) -> None:
        """
        Analyze content semantics to detect obfuscated attacks
        that evade regex patterns.
        """

    def analyze(self, content: str) -> dict[str, Any]:
        """
        Perform semantic analysis. Returns a dict with attack_probabilities
        and other analysis metadata.

        Args:
            content: Content string to analyze.

        Returns:
            Analysis results; includes an ``attack_probabilities`` entry
            plus additional metadata keys (exact schema not shown here).
        """

    def get_threat_score(
        self, analysis: dict[str, Any]
    ) -> float:
        """
        Calculate an overall threat score from analysis results.

        Args:
            analysis: A result dict as produced by :meth:`analyze`.

        Returns:
            Aggregate threat score (range not documented here —
            presumably 0.0-1.0; TODO confirm).
        """

PerformanceMonitor

class PerformanceMonitor:
    def __init__(
        self,
        anomaly_threshold: float = 3.0,
        slow_pattern_threshold: float = 0.1,
        history_size: int = 1000,
        max_tracked_patterns: int = 1000,
    ):
        """
        Track pattern execution performance and detect anomalies.
        anomaly_threshold is clamped to 1.0-10.0.

        Args:
            anomaly_threshold: Threshold for flagging anomalous
                execution times; values outside 1.0-10.0 are clamped.
            slow_pattern_threshold: Execution-time threshold above which
                a pattern is considered slow (presumably seconds —
                TODO confirm units).
            history_size: Number of recent metrics retained.
            max_tracked_patterns: Maximum number of distinct patterns
                for which statistics are kept.
        """

    async def record_metric(
        self,
        pattern: str,
        execution_time: float,
        content_length: int,
        matched: bool,
        timeout: bool = False,
        agent_handler: Any = None,
        correlation_id: str | None = None,
    ) -> None:
        """
        Record a single pattern execution metric.

        Args:
            pattern: The pattern whose execution is being recorded.
            execution_time: How long the match attempt took.
            content_length: Length of the content that was matched.
            matched: Whether the pattern matched the content.
            timeout: Whether the match attempt timed out.
            agent_handler: Optional handler object — semantics not shown
                here (TODO confirm against caller).
            correlation_id: Optional identifier for correlating this
                metric with related log/trace records.
        """

    def get_summary_stats(self) -> dict[str, Any]:
        """
        Return aggregate performance statistics.
        """

    def get_slow_patterns(self) -> list[dict[str, Any]]:
        """
        Return patterns exceeding the slow threshold.
        """

    def get_problematic_patterns(self) -> list[dict[str, Any]]:
        """
        Return patterns with high timeout rates or anomalous execution times.
        """

    async def remove_pattern_stats(self, pattern: str) -> None:
        """
        Remove tracked statistics for a pattern.

        Args:
            pattern: The pattern whose statistics should be dropped.
        """