
Detection Engine

The detection engine (guard_core.detection_engine) provides multi-layered threat detection with four components working together: pattern compilation, content preprocessing, semantic analysis, and performance monitoring.

These components are orchestrated by the SusPatternsManager handler. Adapter developers do not call it directly, but understanding its pipeline helps with tuning and diagnostics.

Architecture

flowchart TD
    DETECT["SusPatternsManager.detect()"]
    PREPROCESS["1. Preprocess content"]
    NORM["Normalize unicode"]
    DECODE["Decode URL + HTML"]
    NULL["Remove null bytes"]
    WHITESPACE["Normalize whitespace"]
    TRUNCATE["Truncate safely"]
    REGEX["2. Regex matching"]
    SAFE["Safe matcher with timeout"]
    PERF["Record performance metrics"]
    SEMANTIC["3. Semantic analysis"]
    PROB["Attack probability scoring"]
    OBFUSC["Obfuscation detection"]
    INJECT["Code injection risk"]
    AGG["4. Aggregate results"]

    DETECT --> PREPROCESS
    PREPROCESS --> NORM --> DECODE --> NULL --> WHITESPACE --> TRUNCATE
    TRUNCATE --> REGEX
    REGEX --> SAFE --> PERF
    PERF --> SEMANTIC
    SEMANTIC --> PROB --> OBFUSC --> INJECT
    INJECT --> AGG

PatternCompiler

guard_core.detection_engine.compiler.PatternCompiler(default_timeout=5.0, max_cache_size=1000)

Source code in guard_core/detection_engine/compiler.py
def __init__(self, default_timeout: float = 5.0, max_cache_size: int = 1000):
    self.default_timeout = default_timeout
    self.max_cache_size = min(max_cache_size, 5000)
    self._compiled_cache: dict[str, re.Pattern] = {}
    self._cache_order: list[str] = []
    self._lock = asyncio.Lock()

MAX_CACHE_SIZE = 1000 (class attribute)

default_timeout (instance attribute)

max_cache_size = min(max_cache_size, 5000) (instance attribute)

batch_compile(patterns, validate=True) async

Source code in guard_core/detection_engine/compiler.py
async def batch_compile(
    self, patterns: list[str], validate: bool = True
) -> dict[str, re.Pattern]:
    compiled_patterns = {}
    for pattern in patterns:
        if validate:
            is_safe, reason = self.validate_pattern_safety(pattern)
            if not is_safe:
                continue
        try:
            compiled_patterns[pattern] = await self.compile_pattern(pattern)
        except re.error:
            continue
    return compiled_patterns

clear_cache() async

Source code in guard_core/detection_engine/compiler.py
async def clear_cache(self) -> None:
    async with self._lock:
        self._compiled_cache.clear()
        self._cache_order.clear()

compile_pattern(pattern, flags=re.IGNORECASE | re.MULTILINE) async

Source code in guard_core/detection_engine/compiler.py
async def compile_pattern(
    self, pattern: str, flags: int = re.IGNORECASE | re.MULTILINE
) -> re.Pattern:
    cache_key = f"{hash(pattern)}:{flags}"

    if cache_key in self._compiled_cache:
        async with self._lock:
            if cache_key in self._compiled_cache:
                self._cache_order.remove(cache_key)
                self._cache_order.append(cache_key)
                return self._compiled_cache[cache_key]

    async with self._lock:
        if cache_key not in self._compiled_cache:
            if len(self._compiled_cache) >= self.max_cache_size:
                oldest_key = self._cache_order.pop(0)
                del self._compiled_cache[oldest_key]

            self._compiled_cache[cache_key] = re.compile(pattern, flags)
            self._cache_order.append(cache_key)

        return self._compiled_cache[cache_key]

compile_pattern_sync(pattern, flags=re.IGNORECASE | re.MULTILINE)

Source code in guard_core/detection_engine/compiler.py
def compile_pattern_sync(
    self, pattern: str, flags: int = re.IGNORECASE | re.MULTILINE
) -> re.Pattern:
    return re.compile(pattern, flags)

create_safe_matcher(pattern, timeout=None)

Source code in guard_core/detection_engine/compiler.py
def create_safe_matcher(
    self, pattern: str, timeout: float | None = None
) -> Callable[[str], re.Match | None]:
    compiled = self.compile_pattern_sync(pattern)
    match_timeout = timeout or self.default_timeout

    def safe_match(text: str) -> re.Match | None:
        import concurrent.futures

        def _search() -> re.Match | None:
            return compiled.search(text)

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_search)
            try:
                return future.result(timeout=match_timeout)
            except concurrent.futures.TimeoutError:
                future.cancel()
                return None
            except Exception:
                return None

    return safe_match

validate_pattern_safety(pattern, test_strings=None)

Source code in guard_core/detection_engine/compiler.py
def validate_pattern_safety(
    self, pattern: str, test_strings: list[str] | None = None
) -> tuple[bool, str]:
    dangerous_patterns = [
        r"\(\.\*\)\+",
        r"\(\.\+\)\+",
        r"\([^)]*\*\)\+",
        r"\([^)]*\+\)\+",
        r"(?:\.\*){2,}",
        r"(?:\.\+){2,}",
    ]

    for dangerous in dangerous_patterns:
        if re.search(dangerous, pattern):
            return False, f"Pattern contains dangerous construct: {dangerous}"

    if test_strings is None:
        test_strings = [
            "a" * 10,
            "a" * 100,
            "a" * 1000,
            "x" * 50 + "y" * 50,
            "<" * 100 + ">" * 100,
        ]

    try:
        compiled = self.compile_pattern_sync(pattern)
        import concurrent.futures

        for test_str in test_strings:
            start_time = time.time()

            def _search(text: str = test_str) -> re.Match | None:
                return compiled.search(text)

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(_search)
                try:
                    future.result(timeout=0.1)
                except concurrent.futures.TimeoutError:
                    return (
                        False,
                        f"Pattern timed out on test string of length "
                        f"{len(test_str)}",
                    )

            elapsed = time.time() - start_time
            if elapsed > 0.05:
                return (
                    False,
                    f"Pattern timed out on test string of length {len(test_str)}",
                )
    except Exception as e:
        return False, f"Pattern validation failed: {str(e)}"

    return True, "Pattern appears safe"

Manages regex pattern compilation with LRU caching and ReDoS safety validation.

Constructor

PatternCompiler(default_timeout: float = 5.0, max_cache_size: int = 1000)
Parameter Description Bounds
default_timeout Timeout for safe matchers in seconds N/A
max_cache_size Maximum compiled patterns to cache Capped at 5000

Key Methods

compile_pattern(pattern, flags) -> re.Pattern (async)

Thread-safe compilation with LRU eviction. Cache key is f"{hash(pattern)}:{flags}".
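
The caching behavior can be sketched without the async locking as a minimal LRU keyed the same way (illustrative only; the real compile_pattern adds double-checked locking around the cache):

```python
import re

class TinyLRU:
    """Illustrative LRU cache using the same cache-key scheme as compile_pattern."""

    def __init__(self, max_size: int = 2):
        self.cache: dict[str, re.Pattern] = {}
        self.order: list[str] = []
        self.max_size = max_size

    def get(self, pattern: str, flags: int = re.IGNORECASE | re.MULTILINE) -> re.Pattern:
        key = f"{hash(pattern)}:{flags}"  # same key format as the real cache
        if key in self.cache:
            self.order.remove(key)  # refresh LRU position on a hit
            self.order.append(key)
            return self.cache[key]
        if len(self.cache) >= self.max_size:
            del self.cache[self.order.pop(0)]  # evict least recently used
        self.cache[key] = re.compile(pattern, flags)
        self.order.append(key)
        return self.cache[key]
```

Repeated lookups of the same pattern return the identical compiled object, so hot patterns skip recompilation entirely.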

compile_pattern_sync(pattern, flags) -> re.Pattern

Synchronous compilation without caching. Used internally by validators and safe matchers.

validate_pattern_safety(pattern, test_strings) -> tuple[bool, str]

Validates a pattern against ReDoS vulnerability:

  1. Checks for known dangerous constructs: (.*)+, (.+)+, nested quantifiers.
  2. Runs the pattern against test strings (default: varying lengths of 'a', 'x'+'y', '<'+'>') with a 100ms timeout per string.
  3. If any test exceeds 50ms, the pattern is flagged as unsafe.
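
Step 1 can be sketched as a standalone check (a hypothetical helper built from the dangerous-construct list shown in the source above; the real validate_pattern_safety additionally runs the timed test-string probes from steps 2-3):

```python
import re

# Subset of the dangerous-construct regexes from validate_pattern_safety.
DANGEROUS_CONSTRUCTS = [
    r"\(\.\*\)\+",    # (.*)+  nested quantifier
    r"\(\.\+\)\+",    # (.+)+  nested quantifier
    r"(?:\.\*){2,}",  # repeated .*
    r"(?:\.\+){2,}",  # repeated .+
]

def looks_dangerous(pattern: str) -> bool:
    """True if the pattern contains a known ReDoS-prone construct."""
    return any(re.search(d, pattern) for d in DANGEROUS_CONSTRUCTS)
```

This static check is cheap but incomplete, which is why the timed probes exist: a pattern can be catastrophic without matching any known construct.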

create_safe_matcher(pattern, timeout) -> Callable[[str], Match | None]

Returns a closure that executes the regex in a thread pool with a timeout. If the match exceeds the timeout, None is returned; the future is cancelled, though a search already running in the worker thread cannot be interrupted and runs to completion in the background.

safe_match = compiler.create_safe_matcher(r"<script.*?>", timeout=2.0)
result = safe_match(user_input)  # None if timed out

batch_compile(patterns, validate) -> dict[str, re.Pattern] (async)

Compiles multiple patterns, optionally validating each for safety. Unsafe or invalid patterns are silently skipped.


ContentPreprocessor

guard_core.detection_engine.preprocessor.ContentPreprocessor(max_content_length=10000, preserve_attack_patterns=True, agent_handler=None, correlation_id=None)

Source code in guard_core/detection_engine/preprocessor.py
def __init__(
    self,
    max_content_length: int = 10000,
    preserve_attack_patterns: bool = True,
    agent_handler: Any = None,
    correlation_id: str | None = None,
):
    self.max_content_length = max_content_length
    self.preserve_attack_patterns = preserve_attack_patterns
    self.agent_handler = agent_handler
    self.correlation_id = correlation_id

    self.attack_indicators = [
        r"<script",
        r"javascript:",
        r"on\w+=",
        r"SELECT\s+.{0,50}?\s+FROM",
        r"UNION\s+SELECT",
        r"\.\./",
        r"eval\s*\(",
        r"exec\s*\(",
        r"system\s*\(",
        r"<?php",
        r"<%",
        r"{{",
        r"{%",
        r"<iframe",
        r"<object",
        r"<embed",
        r"onerror\s*=",
        r"onload\s*=",
        r"\$\{",
        r"\\x[0-9a-fA-F]{2}",
        r"%[0-9a-fA-F]{2}",
    ]

    self.compiled_indicators = [
        re.compile(pattern, re.IGNORECASE) for pattern in self.attack_indicators
    ]

agent_handler (instance attribute)

attack_indicators (instance attribute; the 21 regex indicators listed in the constructor above)

compiled_indicators (instance attribute; case-insensitive compiled forms of attack_indicators)

correlation_id (instance attribute)

max_content_length (instance attribute)

preserve_attack_patterns (instance attribute)

decode_common_encodings(content) async

Source code in guard_core/detection_engine/preprocessor.py
async def decode_common_encodings(self, content: str) -> str:
    max_decode_iterations = 3
    iterations = 0

    while iterations < max_decode_iterations:
        original = content

        try:
            import urllib.parse

            decoded = urllib.parse.unquote(content, errors="ignore")
            if decoded != content:
                content = decoded
        except Exception as e:
            await self._send_preprocessor_event(
                event_type="decoding_error",
                action_taken="decode_failed",
                reason="Failed to URL decode content",
                error=str(e),
                error_type="url_decode",
            )

        try:
            import html

            decoded = html.unescape(content)
            if decoded != content:
                content = decoded
        except Exception as e:
            await self._send_preprocessor_event(
                event_type="decoding_error",
                action_taken="decode_failed",
                reason="Failed to HTML decode content",
                error=str(e),
                error_type="html_decode",
            )

        if content == original:
            break

        iterations += 1

    return content

extract_attack_regions(content)

Source code in guard_core/detection_engine/preprocessor.py
def extract_attack_regions(self, content: str) -> list[tuple[int, int]]:
    max_regions = min(100, self.max_content_length // 100)
    regions = []

    for indicator in self.compiled_indicators:
        import concurrent.futures

        def _find_all(pattern: re.Pattern, text: str) -> list[tuple[int, int]]:
            found: list[tuple[int, int]] = []
            for match in pattern.finditer(text):
                if len(found) >= max_regions:
                    break
                start = max(0, match.start() - 100)
                end = min(len(text), match.end() + 100)
                found.append((start, end))
            return found

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_find_all, indicator, content)
            try:
                indicator_regions = future.result(timeout=0.5)
                regions.extend(indicator_regions)
            except concurrent.futures.TimeoutError:
                continue

        if len(regions) >= max_regions:
            break

    if regions:
        regions.sort()
        merged = [regions[0]]
        for start, end in regions[1:]:
            if start <= merged[-1][1]:
                merged[-1] = (merged[-1][0], max(merged[-1][1], end))
            else:
                merged.append((start, end))
        return merged[:max_regions]

    return []

normalize_unicode(content)

Source code in guard_core/detection_engine/preprocessor.py
def normalize_unicode(self, content: str) -> str:
    normalized = unicodedata.normalize("NFKC", content)

    lookalikes = {
        "\u2044": "/",
        "\uff0f": "/",
        "\u29f8": "/",
        "\u0130": "I",
        "\u0131": "i",
        "\u200b": "",
        "\u200c": "",
        "\u200d": "",
        "\ufeff": "",
        "\u00ad": "",
        "\u034f": "",
        "\u180e": "",
        "\u2028": "\n",
        "\u2029": "\n",
        "\ue000": "",
        "\ufff0": "",
        "\u01c0": "|",
        "\u037e": ";",
        "\u2215": "/",
        "\u2216": "\\",
        "\uff1c": "<",
        "\uff1e": ">",
    }

    for char, replacement in lookalikes.items():
        normalized = normalized.replace(char, replacement)

    return normalized

preprocess(content) async

Source code in guard_core/detection_engine/preprocessor.py
async def preprocess(self, content: str) -> str:
    if not content:
        return ""

    content = self.normalize_unicode(content)
    content = await self.decode_common_encodings(content)
    content = self.remove_null_bytes(content)
    content = self.remove_excessive_whitespace(content)
    content = self.truncate_safely(content)

    return content

preprocess_batch(contents) async

Source code in guard_core/detection_engine/preprocessor.py
async def preprocess_batch(self, contents: list[str]) -> list[str]:
    return [await self.preprocess(content) for content in contents]

remove_excessive_whitespace(content)

Source code in guard_core/detection_engine/preprocessor.py
def remove_excessive_whitespace(self, content: str) -> str:
    content = re.sub(r"\s+", " ", content)
    content = content.strip()
    return content

remove_null_bytes(content)

Source code in guard_core/detection_engine/preprocessor.py
def remove_null_bytes(self, content: str) -> str:
    content = content.replace("\x00", "")

    control_chars = "".join(chr(i) for i in range(32) if i not in (9, 10, 13))
    translator = str.maketrans("", "", control_chars)
    return content.translate(translator)

truncate_safely(content)

Source code in guard_core/detection_engine/preprocessor.py
def truncate_safely(self, content: str) -> str:
    if len(content) <= self.max_content_length:
        return content

    if not self.preserve_attack_patterns:
        return content[: self.max_content_length]

    attack_regions = self.extract_attack_regions(content)

    if not attack_regions:
        return content[: self.max_content_length]

    attack_length = sum(end - start for start, end in attack_regions)

    if attack_length >= self.max_content_length:
        return self._extract_and_concatenate_attack_regions(content, attack_regions)

    return self._build_result_with_attack_regions_and_context(
        content, attack_regions
    )

Normalizes and sanitizes input before pattern matching.

Constructor

ContentPreprocessor(
    max_content_length: int = 10000,
    preserve_attack_patterns: bool = True,
    agent_handler: Any = None,
    correlation_id: str | None = None,
)

Preprocessing Pipeline

The preprocess() method runs five stages in order:

Stage Method Purpose
Unicode normalization normalize_unicode() NFKC normalization + lookalike character replacement
Iterative decoding decode_common_encodings() URL decode + HTML entity decode (up to 3 iterations)
Null byte removal remove_null_bytes() Strips \x00 and control characters except tab/newline/CR
Whitespace normalization remove_excessive_whitespace() Collapses multiple spaces, strips leading/trailing
Safe truncation truncate_safely() Truncates to max_content_length preserving attack regions
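
The five stages can be condensed into a single-pass sketch (simplified: the real pipeline decodes iteratively, strips additional control characters, and preserves attack regions when truncating):

```python
import html
import re
import unicodedata
import urllib.parse

def preprocess_sketch(content: str, max_len: int = 10000) -> str:
    """Simplified one-pass version of the five-stage preprocessing pipeline."""
    content = unicodedata.normalize("NFKC", content)                         # 1. unicode
    content = html.unescape(urllib.parse.unquote(content, errors="ignore"))  # 2. decode
    content = content.replace("\x00", "")                                    # 3. null bytes
    content = re.sub(r"\s+", " ", content).strip()                           # 4. whitespace
    return content[:max_len]                                                 # 5. truncate
```

Running it on a URL-encoded payload shows why decoding precedes matching: "%3Cscript%3E" only becomes the matchable "<script>" after stage 2.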

Attack-Preserving Truncation

When content exceeds max_content_length and preserve_attack_patterns is True:

  1. extract_attack_regions() scans for 21 attack indicator patterns (e.g., <script, SELECT ... FROM, eval(, ../).
  2. Regions around matches (100 characters of context on each side) are extracted.
  3. Overlapping regions are merged.
  4. Attack regions are included first, then non-attack content fills remaining space.

This ensures that truncated content still contains the attack patterns for detection.
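
The overlap-merge in step 3 mirrors the interval-merge loop shown in extract_attack_regions above:

```python
def merge_regions(regions: list[tuple[int, int]]) -> list[tuple[int, int]]:
    """Merge overlapping (start, end) regions, as extract_attack_regions does."""
    if not regions:
        return []
    regions.sort()
    merged = [regions[0]]
    for start, end in regions[1:]:
        if start <= merged[-1][1]:
            # Overlaps the previous region: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged
```

Merging keeps the region list small even when many indicators fire close together, which matters because the region count is capped.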

Unicode Lookalike Map

The preprocessor replaces 22 Unicode lookalike and invisible characters used for evasion:

Unicode Replacement Purpose
\u2044 / Fraction slash evasion
\uff0f / Fullwidth solidus
\u200b (empty) Zero-width space
\uff1c < Fullwidth less-than
\uff1e > Fullwidth greater-than
\u037e ; Greek question mark
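
The replacement loop itself is a straight character-for-string substitution, applied after NFKC as a defensive second pass (a few entries from the full map, for illustration):

```python
# Subset of the lookalike map from normalize_unicode.
LOOKALIKES = {
    "\u2044": "/",  # fraction slash
    "\u200b": "",   # zero-width space
    "\u037e": ";",  # Greek question mark
}

def fold_lookalikes(text: str) -> str:
    """Replace evasion characters with their ASCII equivalents."""
    for char, replacement in LOOKALIKES.items():
        text = text.replace(char, replacement)
    return text
```

A zero-width space spliced into "alert" would otherwise defeat a literal keyword match; folding restores the plain token.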

SemanticAnalyzer

guard_core.detection_engine.semantic.SemanticAnalyzer()

Source code in guard_core/detection_engine/semantic.py
def __init__(self) -> None:
    self.attack_keywords = {
        "xss": {
            "script",
            "javascript",
            "onerror",
            "onload",
            "onclick",
            "onmouseover",
            "alert",
            "eval",
            "document",
            "cookie",
            "window",
            "location",
        },
        "sql": {
            "select",
            "union",
            "insert",
            "update",
            "delete",
            "drop",
            "from",
            "where",
            "order",
            "group",
            "having",
            "concat",
            "substring",
            "database",
            "table",
            "column",
        },
        "command": {
            "exec",
            "system",
            "shell",
            "cmd",
            "bash",
            "powershell",
            "wget",
            "curl",
            "nc",
            "netcat",
            "chmod",
            "chown",
            "sudo",
            "passwd",
        },
        "path": {"etc", "passwd", "shadow", "hosts", "proc", "boot", "win", "ini"},
        "template": {
            "render",
            "template",
            "jinja",
            "mustache",
            "handlebars",
            "ejs",
            "pug",
            "twig",
        },
    }

    self.suspicious_chars = {
        "brackets": r"[<>{}()\[\]]",
        "quotes": r"['\"`]",
        "slashes": r"[/\\]",
        "special": r"[;&|$]",
        "wildcards": r"[*?]",
    }

    self.attack_structures = {
        "tag_like": r"<[^>]+>",
        "function_call": r"\w+\s*\([^)]*\)",
        "command_chain": r"[;&|]{1,2}",
        "path_traversal": r"\.{2,}[/\\]",
        "url_pattern": r"[a-z]+://",
    }

attack_keywords (instance attribute; per-attack-type keyword sets, listed in the constructor above)

attack_structures (instance attribute; structural regexes: tag_like, function_call, command_chain, path_traversal, url_pattern)

suspicious_chars (instance attribute; character-class regexes: brackets, quotes, slashes, special, wildcards)

analyze(content)

Source code in guard_core/detection_engine/semantic.py
def analyze(self, content: str) -> dict[str, Any]:
    return {
        "attack_probabilities": self.analyze_attack_probability(content),
        "entropy": self.calculate_entropy(content),
        "encoding_layers": self.detect_encoding_layers(content),
        "is_obfuscated": self.detect_obfuscation(content),
        "suspicious_patterns": self.extract_suspicious_patterns(content),
        "code_injection_risk": self.analyze_code_injection_risk(content),
        "token_count": len(self.extract_tokens(content)),
    }

analyze_attack_probability(content)

Source code in guard_core/detection_engine/semantic.py
def analyze_attack_probability(self, content: str) -> dict[str, float]:
    tokens = self.extract_tokens(content)
    token_set = set(tokens)
    probabilities = {}

    for attack_type, keywords in self.attack_keywords.items():
        base_score = self._calculate_base_score(token_set, keywords)
        pattern_boost = self._get_structural_pattern_boost(attack_type, content)
        score = base_score + pattern_boost
        probabilities[attack_type] = min(score, 1.0)

    return probabilities

analyze_code_injection_risk(content)

Source code in guard_core/detection_engine/semantic.py
def analyze_code_injection_risk(self, content: str) -> float:
    risk_score = 0.0
    risk_score += self._check_code_pattern_risks(content)
    risk_score += self._check_ast_parsing_risk(content)
    risk_score += self._check_injection_keywords(content)
    return min(risk_score, 1.0)

calculate_entropy(content)

Source code in guard_core/detection_engine/semantic.py
def calculate_entropy(self, content: str) -> float:
    if not content:
        return 0.0

    MAX_ENTROPY_LENGTH = 10000
    if len(content) > MAX_ENTROPY_LENGTH:
        content = content[:MAX_ENTROPY_LENGTH]

    char_counts = Counter(content)
    length = len(content)

    import math

    entropy = 0.0
    for count in char_counts.values():
        probability = count / length
        if probability > 0:
            entropy -= probability * math.log2(probability)

    return entropy

detect_encoding_layers(content)

Source code in guard_core/detection_engine/semantic.py
def detect_encoding_layers(self, content: str) -> int:
    MAX_SCAN_LENGTH = 10000
    if len(content) > MAX_SCAN_LENGTH:
        content = content[:MAX_SCAN_LENGTH]

    layers = 0

    if re.search(r"%[0-9a-fA-F]{2}", content):
        layers += 1

    if re.search(r"[A-Za-z0-9+/]{4,}={0,2}", content):
        layers += 1

    if re.search(r"(?:0x)?[0-9a-fA-F]{4,}", content):
        layers += 1

    if re.search(r"\\u[0-9a-fA-F]{4}", content):
        layers += 1

    if re.search(r"&[#\w]+;", content):
        layers += 1

    return layers

detect_obfuscation(content)

Source code in guard_core/detection_engine/semantic.py
def detect_obfuscation(self, content: str) -> bool:
    if self.calculate_entropy(content) > 4.5:
        return True

    if self.detect_encoding_layers(content) > 2:
        return True

    special_char_ratio = len(re.findall(r"[^a-zA-Z0-9\s]", content)) / max(
        len(content), 1
    )
    if special_char_ratio > 0.4:
        return True

    if re.search(r"\S{100,}", content):
        return True

    return False

extract_suspicious_patterns(content)

Source code in guard_core/detection_engine/semantic.py
def extract_suspicious_patterns(self, content: str) -> list[dict[str, Any]]:
    patterns = []

    for name, pattern in self.attack_structures.items():
        for match in re.finditer(pattern, content, re.IGNORECASE):
            context_start = max(0, match.start() - 20)
            context_end = min(len(content), match.end() + 20)
            patterns.append(
                {
                    "type": name,
                    "pattern": match.group(),
                    "position": match.start(),
                    "context": content[context_start:context_end],
                }
            )

    return patterns

extract_tokens(content)

Source code in guard_core/detection_engine/semantic.py
def extract_tokens(self, content: str) -> list[str]:
    MAX_CONTENT_LENGTH = 50000
    if len(content) > MAX_CONTENT_LENGTH:
        content = content[:MAX_CONTENT_LENGTH]

    content = re.sub(r"\s+", " ", content)

    MAX_TOKENS = 1000
    tokens = re.findall(r"\b\w+\b", content.lower())[:MAX_TOKENS]

    special_patterns = []
    import concurrent.futures

    for _, pattern in self.attack_structures.items():

        def _find_pattern(p: str, c: str) -> list[str]:
            return re.findall(p, c, re.IGNORECASE)[:10]

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_find_pattern, pattern, content)
            try:
                matches = future.result(timeout=0.1)
                special_patterns.extend(matches)
            except concurrent.futures.TimeoutError:
                continue

        if len(special_patterns) >= 50:
            break

    return (tokens + special_patterns)[:MAX_TOKENS]

get_threat_score(analysis_results)

Source code in guard_core/detection_engine/semantic.py
def get_threat_score(self, analysis_results: dict[str, Any]) -> float:
    score = 0.0

    attack_probs = analysis_results.get("attack_probabilities", {})
    if attack_probs:
        max_prob = max(attack_probs.values())
        score += max_prob * 0.3

    if analysis_results.get("is_obfuscated", False):
        score += 0.2

    encoding_layers = analysis_results.get("encoding_layers", 0)
    if encoding_layers > 0:
        score += min(encoding_layers * 0.1, 0.2)

    injection_risk = analysis_results.get("code_injection_risk", 0.0)
    score += injection_risk * 0.2

    patterns = analysis_results.get("suspicious_patterns", [])
    if patterns:
        score += min(len(patterns) * 0.05, 0.1)

    return float(min(score, 1.0))

Performs structural and statistical analysis of content to detect attacks that evade pattern matching.

Attack Probability Analysis

def analyze_attack_probability(self, content: str) -> dict[str, float]

Returns a dictionary mapping attack types to probability scores (0.0 - 1.0):

Attack Type Keywords Checked Structural Boost
xss script, javascript, onerror, onload, alert, eval, document... <...> tags
sql select, union, insert, update, delete, drop, from, where... SQL keywords
command exec, system, shell, cmd, bash, wget, curl, sudo... ;&| operators
path etc, passwd, shadow, hosts, proc, boot, win, ini ../ traversal
template render, template, jinja, mustache, handlebars... N/A

The score is computed as: min(keyword_match_ratio + structural_boost, 1.0).
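
One plausible reading of the keyword_match_ratio term (hypothetical: _calculate_base_score is not shown in this page and may weight matches differently):

```python
def keyword_match_ratio(tokens: set[str], keywords: set[str]) -> float:
    """Assumed base score: fraction of the attack-type keyword set seen in the tokens."""
    return len(tokens & keywords) / len(keywords)
```

Under this assumption, a payload containing half of the SQL keyword set scores 0.5 before any structural boost is added.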

Entropy Calculation

def calculate_entropy(self, content: str) -> float

Shannon entropy of the character distribution. High entropy (> 4.5) indicates potential obfuscation or encoded payloads.
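
The computation mirrors the calculate_entropy source shown above (minus the 10,000-character cap):

```python
import math
from collections import Counter

def shannon_entropy(content: str) -> float:
    """Shannon entropy in bits per character of the character distribution."""
    if not content:
        return 0.0
    counts = Counter(content)
    n = len(content)
    # H = -sum(p * log2(p)) over each character's probability p.
    return -sum((c / n) * math.log2(c / n) for c in counts.values())
```

For intuition: a single repeated character has entropy 0.0, two equally frequent characters give exactly 1 bit, and base64-encoded payloads typically land well above the 4.5-bit threshold.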

Obfuscation Detection

detect_obfuscation() returns True when any of:

  • Entropy > 4.5
  • More than 2 encoding layers detected
  • Special character ratio > 40%
  • Contiguous non-space run > 100 characters
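
The four triggers combine as a simple OR, mirroring the detect_obfuscation source above (here entropy and layer count are passed in rather than recomputed):

```python
import re

def is_obfuscated(content: str, entropy: float, encoding_layers: int) -> bool:
    """True when any of the four documented obfuscation triggers fires."""
    special_ratio = len(re.findall(r"[^a-zA-Z0-9\s]", content)) / max(len(content), 1)
    return (
        entropy > 4.5
        or encoding_layers > 2
        or special_ratio > 0.4
        or bool(re.search(r"\S{100,}", content))  # contiguous non-space run
    )
```

Because it is an OR, a payload only has to trip one heuristic, e.g. a 150-character unbroken token flags even at low entropy.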

Code Injection Risk

analyze_code_injection_risk() scores (0.0 - 1.0) based on:

  • Code-like patterns ({}, function calls, variable references)
  • AST parseability (attempts ast.parse(content, mode="eval") with 100ms timeout)
  • Injection keywords (eval, exec, compile, __import__, globals, locals)

Threat Score

def get_threat_score(self, analysis_results: dict) -> float

Aggregates all analysis results into a single 0.0 - 1.0 score:

Component Weight
Max attack probability 30%
Obfuscation detected 20%
Encoding layers 10-20% (min(layers * 10%, 20%))
Code injection risk 20%
Suspicious patterns 5-10% (min(count * 5%, 10%))
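
The weighting mirrors the get_threat_score source shown above:

```python
def threat_score(results: dict) -> float:
    """Aggregate semantic-analysis results into a single 0.0-1.0 score."""
    score = 0.0
    probs = results.get("attack_probabilities", {})
    if probs:
        score += max(probs.values()) * 0.3          # max attack probability: 30%
    if results.get("is_obfuscated", False):
        score += 0.2                                 # obfuscation: flat 20%
    score += min(results.get("encoding_layers", 0) * 0.1, 0.2)  # layers: up to 20%
    score += results.get("code_injection_risk", 0.0) * 0.2      # injection: 20%
    score += min(len(results.get("suspicious_patterns", [])) * 0.05, 0.1)  # patterns: up to 10%
    return min(score, 1.0)
```

Note the capped terms: four encoding layers score no more than two, and suspicious patterns saturate at two matches.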

PerformanceMonitor

guard_core.detection_engine.monitor.PerformanceMonitor(anomaly_threshold=3.0, slow_pattern_threshold=0.1, history_size=1000, max_tracked_patterns=1000)

Source code in guard_core/detection_engine/monitor.py
def __init__(
    self,
    anomaly_threshold: float = 3.0,
    slow_pattern_threshold: float = 0.1,
    history_size: int = 1000,
    max_tracked_patterns: int = 1000,
):
    self.anomaly_threshold = max(1.0, min(10.0, float(anomaly_threshold)))
    self.slow_pattern_threshold = max(
        0.01, min(10.0, float(slow_pattern_threshold))
    )
    self.history_size = max(100, min(10000, int(history_size)))
    self.max_tracked_patterns = max(100, min(5000, int(max_tracked_patterns)))

    self.pattern_stats: dict[str, PatternStats] = {}
    self.recent_metrics: deque[PerformanceMetric] = deque(maxlen=history_size)
    self.anomaly_callbacks: list[Any] = []
    self._lock = asyncio.Lock()

anomaly_callbacks = [] (instance attribute)

anomaly_threshold, clamped to [1.0, 10.0] (instance attribute)

history_size, clamped to [100, 10000] (instance attribute)

max_tracked_patterns, clamped to [100, 5000] (instance attribute)

pattern_stats = {} (instance attribute)

recent_metrics = deque(maxlen=history_size) (instance attribute)

slow_pattern_threshold, clamped to [0.01, 10.0] (instance attribute)
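
Each constructor argument is clamped into a fixed range before use, as the __init__ source above shows; the pattern is the standard clamp idiom:

```python
def clamp(value: float, lo: float, hi: float) -> float:
    """Bound value into [lo, hi], as the monitor does for every constructor argument."""
    return max(lo, min(hi, float(value)))
```

So passing anomaly_threshold=50.0 silently becomes 10.0, and slow_pattern_threshold=0.001 becomes 0.01; out-of-range tuning values never raise, they are coerced.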

clear_stats() async

Source code in guard_core/detection_engine/monitor.py
async def clear_stats(self) -> None:
    async with self._lock:
        self.pattern_stats.clear()
        self.recent_metrics.clear()

get_pattern_report(pattern)

Source code in guard_core/detection_engine/monitor.py
def get_pattern_report(self, pattern: str) -> dict[str, Any] | None:
    MAX_PATTERN_LENGTH = 100
    if len(pattern) > MAX_PATTERN_LENGTH:
        pattern = pattern[:MAX_PATTERN_LENGTH] + "...[truncated]"

    stats = self.pattern_stats.get(pattern)
    if not stats:
        return None

    safe_pattern = pattern[:50] + "..." if len(pattern) > 50 else pattern

    return {
        "pattern": safe_pattern,
        "pattern_hash": str(hash(pattern))[:8],
        "total_executions": stats.total_executions,
        "total_matches": stats.total_matches,
        "total_timeouts": stats.total_timeouts,
        "match_rate": stats.total_matches / max(stats.total_executions, 1),
        "timeout_rate": stats.total_timeouts / max(stats.total_executions, 1),
        "avg_execution_time": round(stats.avg_execution_time, 4),
        "max_execution_time": round(stats.max_execution_time, 4),
        "min_execution_time": round(
            stats.min_execution_time
            if stats.min_execution_time != float("inf")
            else 0.0,
            4,
        ),
    }

get_problematic_patterns()

Source code in guard_core/detection_engine/monitor.py
def get_problematic_patterns(self) -> list[dict[str, Any]]:
    problematic = []

    for pattern, stats in self.pattern_stats.items():
        if stats.total_executions == 0:
            continue

        timeout_rate = stats.total_timeouts / stats.total_executions

        if timeout_rate > 0.1:
            report = self.get_pattern_report(pattern)
            if report:
                report["issue"] = "high_timeout_rate"
                problematic.append(report)

        elif stats.avg_execution_time > self.slow_pattern_threshold:
            report = self.get_pattern_report(pattern)
            if report:
                report["issue"] = "consistently_slow"
                problematic.append(report)

    return problematic

get_slow_patterns(limit=10)

Source code in guard_core/detection_engine/monitor.py
def get_slow_patterns(self, limit: int = 10) -> list[dict[str, Any]]:
    patterns_with_times = [
        (stats.avg_execution_time, pattern)
        for pattern, stats in self.pattern_stats.items()
        if stats.recent_times
    ]

    patterns_with_times.sort(reverse=True)

    reports = []
    for _, pattern in patterns_with_times[:limit]:
        report = self.get_pattern_report(pattern)
        if report is not None:
            reports.append(report)
    return reports
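The ranking step reduces to sorting by average execution time, slowest first, and keeping the top N. A minimal sketch over a plain mapping (illustrative, not the library API):

```python
def slowest(avg_times: dict[str, float], limit: int = 10) -> list[str]:
    # Sort pattern keys by their average execution time, descending.
    return sorted(avg_times, key=avg_times.get, reverse=True)[:limit]
```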

get_summary_stats()

Source code in guard_core/detection_engine/monitor.py
def get_summary_stats(self) -> dict[str, Any]:
    if not self.recent_metrics:
        return self._get_empty_summary()

    recent_times, timeouts, matches = self._extract_metric_components()
    return self._build_summary_dict(recent_times, timeouts, matches)
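The helper methods `_extract_metric_components` and `_build_summary_dict` are not shown here; a hedged sketch of an equivalent aggregation over recent metrics (the dictionary keys are illustrative, not necessarily those of the real summary):

```python
from statistics import mean


def summarize(metrics: list[dict]) -> dict:
    # An empty history yields an all-zero summary, mirroring _get_empty_summary().
    if not metrics:
        return {"total": 0, "avg_time": 0.0, "timeout_rate": 0.0, "match_rate": 0.0}
    n = len(metrics)
    return {
        "total": n,
        "avg_time": mean(m["execution_time"] for m in metrics),
        "timeout_rate": sum(m["timeout"] for m in metrics) / n,
        "match_rate": sum(m["matched"] for m in metrics) / n,
    }
```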

record_metric(pattern, execution_time, content_length, matched, timeout=False, agent_handler=None, correlation_id=None) async

Source code in guard_core/detection_engine/monitor.py
async def record_metric(
    self,
    pattern: str,
    execution_time: float,
    content_length: int,
    matched: bool,
    timeout: bool = False,
    agent_handler: Any = None,
    correlation_id: str | None = None,
) -> None:
    MAX_PATTERN_LENGTH = 100
    if len(pattern) > MAX_PATTERN_LENGTH:
        pattern = pattern[:MAX_PATTERN_LENGTH] + "...[truncated]"

    execution_time = max(0.0, float(execution_time))
    content_length = max(0, int(content_length))

    metric = PerformanceMetric(
        pattern=pattern,
        execution_time=execution_time,
        content_length=content_length,
        timestamp=datetime.now(timezone.utc),
        matched=matched,
        timeout=timeout,
    )

    async with self._lock:
        self.recent_metrics.append(metric)

        if pattern not in self.pattern_stats:
            if len(self.pattern_stats) >= self.max_tracked_patterns:
                oldest_pattern = next(iter(self.pattern_stats))
                del self.pattern_stats[oldest_pattern]
            self.pattern_stats[pattern] = PatternStats(pattern=pattern)

        stats = self.pattern_stats[pattern]
        stats.total_executions += 1
        if matched:
            stats.total_matches += 1
        if timeout:
            stats.total_timeouts += 1

        if not timeout:
            stats.recent_times.append(execution_time)
            stats.max_execution_time = max(stats.max_execution_time, execution_time)
            stats.min_execution_time = min(stats.min_execution_time, execution_time)
            if stats.recent_times:
                stats.avg_execution_time = mean(stats.recent_times)

    await self._check_anomalies(metric, agent_handler, correlation_id)
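The eviction step above relies on `dict` preserving insertion order (guaranteed since Python 3.7): `next(iter(self.pattern_stats))` is the oldest tracked pattern. A standalone sketch of that FIFO bound:

```python
def track(stats: dict[str, dict], pattern: str, max_tracked: int) -> dict:
    # At capacity, evict the oldest-inserted entry before adding a new one,
    # so the dict never grows past max_tracked.
    if pattern not in stats:
        if len(stats) >= max_tracked:
            del stats[next(iter(stats))]
        stats[pattern] = {"total_executions": 0}
    return stats
```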

register_anomaly_callback(callback)

Source code in guard_core/detection_engine/monitor.py
def register_anomaly_callback(self, callback: Any) -> None:
    self.anomaly_callbacks.append(callback)

remove_pattern_stats(pattern) async

Source code in guard_core/detection_engine/monitor.py
async def remove_pattern_stats(self, pattern: str) -> None:
    async with self._lock:
        if pattern in self.pattern_stats:
            del self.pattern_stats[pattern]


PerformanceMonitor tracks pattern execution performance and detects anomalies.

Constructor

PerformanceMonitor(
    anomaly_threshold: float = 3.0,
    slow_pattern_threshold: float = 0.1,
    history_size: int = 1000,
    max_tracked_patterns: int = 1000,
)
| Parameter | Description | Bounds |
| --- | --- | --- |
| `anomaly_threshold` | Z-score threshold for statistical anomalies | 1.0 - 10.0 |
| `slow_pattern_threshold` | Seconds to consider a pattern slow | 0.01 - 1.0 |
| `history_size` | Recent metrics to retain | 100 - 10,000 |
| `max_tracked_patterns` | Maximum unique patterns to track | 100 - 5,000 |
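The constructor body is not shown here, so whether out-of-range arguments are clamped or rejected is an open question; the sibling `PatternCompiler` clamps with `min(...)`. Assuming the monitor clamps similarly, a generic helper consistent with these bounds might look like (illustrative, not from the source):

```python
def clamp(value: float, lo: float, hi: float) -> float:
    # Keep a constructor argument inside its documented bounds.
    return max(lo, min(value, hi))
```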

Metric Recording

Each pattern execution records a PerformanceMetric dataclass:

@dataclass
class PerformanceMetric:
    pattern: str
    execution_time: float
    content_length: int
    timestamp: datetime
    matched: bool
    timeout: bool = False

Anomaly Detection

Three types of anomalies are detected after each metric recording:

| Anomaly Type | Condition |
| --- | --- |
| `timeout` | The pattern execution timed out |
| `slow_execution` | Execution time > `slow_pattern_threshold` (without a timeout) |
| `statistical_anomaly` | Z-score of execution time > `anomaly_threshold` (needs >= 10 samples) |
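The `_check_anomalies` implementation is not shown above; a hedged sketch of the statistical check, assuming a standard z-score against the pattern's recent execution times (the function name and zero-variance guard are illustrative):

```python
from statistics import mean, stdev


def is_statistical_anomaly(times: list[float], new_time: float,
                           threshold: float = 3.0) -> bool:
    """Flag new_time if its z-score against recent history exceeds threshold.

    Requires at least 10 samples, matching the table above.
    """
    if len(times) < 10:
        return False
    sd = stdev(times)
    if sd == 0:
        return False  # constant history: no meaningful z-score
    return abs(new_time - mean(times)) / sd > threshold
```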

Detected anomalies are sent as events to the agent handler and forwarded to registered callbacks.

Diagnostics

| Method | Returns |
| --- | --- |
| `get_pattern_report(p)` | Stats for a specific pattern (executions, matches, timeouts, avg/max/min time) |
| `get_slow_patterns(n)` | Top N slowest patterns by average execution time |
| `get_problematic_patterns()` | Patterns with a >10% timeout rate or consistently slow execution |
| `get_summary_stats()` | Overall summary (total executions, avg time, timeout rate, match rate) |

Callback Registration

monitor.register_anomaly_callback(lambda anomaly: print(anomaly))

Callbacks receive a sanitized anomaly dictionary with truncated pattern strings.
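The exact shape of the anomaly dictionary is not documented above; a sketch of sanitization consistent with the 50-character truncation used elsewhere in the monitor (field names are assumptions, not the library's contract):

```python
def sanitize_anomaly(anomaly_type: str, pattern: str, execution_time: float) -> dict:
    # Truncate long patterns so callbacks never receive full regex sources.
    safe = pattern[:50] + "..." if len(pattern) > 50 else pattern
    return {
        "type": anomaly_type,
        "pattern": safe,
        "execution_time": round(execution_time, 4),
    }
```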