Skip to content

Detection Engine

guard_core.detection_engine

ContentPreprocessor(max_content_length=10000, preserve_attack_patterns=True, agent_handler=None, correlation_id=None)

Source code in guard_core/detection_engine/preprocessor.py
def __init__(
    self,
    max_content_length: int = 10000,
    preserve_attack_patterns: bool = True,
    agent_handler: Any = None,
    correlation_id: str | None = None,
):
    """Initialize the content preprocessor.

    Args:
        max_content_length: Maximum length content may reach after truncation.
        preserve_attack_patterns: When True, truncation keeps regions that
            match known attack indicators instead of cutting blindly.
        agent_handler: Optional handler used to emit preprocessing events.
        correlation_id: Optional id used to correlate emitted events.
    """
    self.max_content_length = max_content_length
    self.preserve_attack_patterns = preserve_attack_patterns
    self.agent_handler = agent_handler
    self.correlation_id = correlation_id

    # Lightweight indicators of common injection/XSS/traversal payloads.
    # These are heuristics used to decide which regions to preserve when
    # truncating, not a full detection rule set.
    self.attack_indicators = [
        r"<script",
        r"javascript:",
        r"on\w+=",
        r"SELECT\s+.{0,50}?\s+FROM",
        r"UNION\s+SELECT",
        r"\.\./",
        r"eval\s*\(",
        r"exec\s*\(",
        r"system\s*\(",
        # BUG FIX: r"<?php" let the "?" quantify the "<" (making it
        # optional), so the indicator matched bare "php" anywhere.
        # Escape the "?" to match the literal PHP open tag.
        r"<\?php",
        r"<%",
        r"{{",
        r"{%",
        r"<iframe",
        r"<object",
        r"<embed",
        r"onerror\s*=",
        r"onload\s*=",
        r"\$\{",
        r"\\x[0-9a-fA-F]{2}",
        r"%[0-9a-fA-F]{2}",
    ]

    self.compiled_indicators = [
        re.compile(pattern, re.IGNORECASE) for pattern in self.attack_indicators
    ]

agent_handler = agent_handler instance-attribute

attack_indicators = ['<script', 'javascript:', 'on\\w+=', 'SELECT\\s+.{0,50}?\\s+FROM', 'UNION\\s+SELECT', '\\.\\./', 'eval\\s*\\(', 'exec\\s*\\(', 'system\\s*\\(', '<?php', '<%', '{{', '{%', '<iframe', '<object', '<embed', 'onerror\\s*=', 'onload\\s*=', '\\$\\{', '\\\\x[0-9a-fA-F]{2}', '%[0-9a-fA-F]{2}'] instance-attribute

compiled_indicators = [(re.compile(pattern, re.IGNORECASE)) for pattern in (self.attack_indicators)] instance-attribute

correlation_id = correlation_id instance-attribute

max_content_length = max_content_length instance-attribute

preserve_attack_patterns = preserve_attack_patterns instance-attribute

decode_common_encodings(content) async

Source code in guard_core/detection_engine/preprocessor.py
async def decode_common_encodings(self, content: str) -> str:
    """Iteratively URL- and HTML-decode *content*.

    Decoding is repeated (up to 3 passes) so multiply-encoded payloads
    (e.g. "%253C" -> "%3C" -> "<") are unwrapped, while the iteration cap
    bounds work on pathological input. Decoding failures are reported via
    the preprocessor event channel and leave the content unchanged.
    """
    # Hoisted out of the loop: importing on every iteration was wasted work.
    import html
    import urllib.parse

    max_decode_iterations = 3
    iterations = 0

    while iterations < max_decode_iterations:
        original = content

        try:
            decoded = urllib.parse.unquote(content, errors="ignore")
            if decoded != content:
                content = decoded
        except Exception as e:
            await self._send_preprocessor_event(
                event_type="decoding_error",
                action_taken="decode_failed",
                reason="Failed to URL decode content",
                error=str(e),
                error_type="url_decode",
            )

        try:
            decoded = html.unescape(content)
            if decoded != content:
                content = decoded
        except Exception as e:
            await self._send_preprocessor_event(
                event_type="decoding_error",
                action_taken="decode_failed",
                reason="Failed to HTML decode content",
                error=str(e),
                error_type="html_decode",
            )

        # Fixed point reached: nothing decoded this pass.
        if content == original:
            break

        iterations += 1

    return content

extract_attack_regions(content)

Source code in guard_core/detection_engine/preprocessor.py
def extract_attack_regions(self, content: str) -> list[tuple[int, int]]:
    """Locate regions of *content* that match known attack indicators.

    Each match is padded with 100 characters of context on both sides and
    overlapping regions are merged. Every indicator scan runs in a worker
    thread with a 0.5s timeout so pathological input cannot stall the
    caller indefinitely.

    Returns:
        A sorted list of (start, end) offsets, at most
        ``min(100, max_content_length // 100)`` entries.
    """
    import concurrent.futures

    max_regions = min(100, self.max_content_length // 100)
    regions: list[tuple[int, int]] = []

    def _find_all(pattern: re.Pattern, text: str) -> list[tuple[int, int]]:
        found: list[tuple[int, int]] = []
        for match in pattern.finditer(text):
            if len(found) >= max_regions:
                break
            start = max(0, match.start() - 100)
            end = min(len(text), match.end() + 100)
            found.append((start, end))
        return found

    # PERF FIX: reuse a single worker thread instead of creating (and
    # tearing down) one ThreadPoolExecutor per indicator.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        for indicator in self.compiled_indicators:
            future = executor.submit(_find_all, indicator, content)
            try:
                regions.extend(future.result(timeout=0.5))
            except concurrent.futures.TimeoutError:
                continue

            if len(regions) >= max_regions:
                break

    if not regions:
        return []

    # Merge overlapping/adjacent (start, end) spans.
    regions.sort()
    merged = [regions[0]]
    for start, end in regions[1:]:
        if start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged[:max_regions]

normalize_unicode(content)

Source code in guard_core/detection_engine/preprocessor.py
def normalize_unicode(self, content: str) -> str:
    """Canonicalize *content*: NFKC-normalize, then strip or replace
    zero-width characters and common ASCII lookalikes used to evade
    pattern matching.
    """
    canonical = unicodedata.normalize("NFKC", content)

    # Map confusable / invisible code points onto their ASCII equivalents
    # (or drop them entirely when they carry no visible content).
    lookalikes = {
        "\u2044": "/",
        "\uff0f": "/",
        "\u29f8": "/",
        "\u0130": "I",
        "\u0131": "i",
        "\u200b": "",
        "\u200c": "",
        "\u200d": "",
        "\ufeff": "",
        "\u00ad": "",
        "\u034f": "",
        "\u180e": "",
        "\u2028": "\n",
        "\u2029": "\n",
        "\ue000": "",
        "\ufff0": "",
        "\u01c0": "|",
        "\u037e": ";",
        "\u2215": "/",
        "\u2216": "\\",
        "\uff1c": "<",
        "\uff1e": ">",
    }

    # All keys are single code points and no replacement reintroduces a
    # key, so one translate() pass equals the chained replace() calls.
    return canonical.translate(str.maketrans(lookalikes))

preprocess(content) async

Source code in guard_core/detection_engine/preprocessor.py
async def preprocess(self, content: str) -> str:
    """Run the full normalization pipeline over *content*.

    Stages: Unicode normalization -> iterative decoding -> control-char
    removal -> whitespace collapsing -> length-bounded truncation.
    Empty input short-circuits to "".
    """
    if not content:
        return ""

    text = self.normalize_unicode(content)
    text = await self.decode_common_encodings(text)

    # Remaining stages are synchronous string-to-string transforms.
    for stage in (
        self.remove_null_bytes,
        self.remove_excessive_whitespace,
        self.truncate_safely,
    ):
        text = stage(text)

    return text

preprocess_batch(contents) async

Source code in guard_core/detection_engine/preprocessor.py
async def preprocess_batch(self, contents: list[str]) -> list[str]:
    """Preprocess each entry of *contents* sequentially, preserving order."""
    results: list[str] = []
    for item in contents:
        results.append(await self.preprocess(item))
    return results

remove_excessive_whitespace(content)

Source code in guard_core/detection_engine/preprocessor.py
def remove_excessive_whitespace(self, content: str) -> str:
    """Collapse every whitespace run to a single space and trim the ends."""
    return re.sub(r"\s+", " ", content).strip()

remove_null_bytes(content)

Source code in guard_core/detection_engine/preprocessor.py
def remove_null_bytes(self, content: str) -> str:
    """Strip NUL and all other C0 control characters except tab/LF/CR."""
    # Explicitly drop NUL first (also covered by the table below).
    content = content.replace("\x00", "")

    # Delete every control character except \t (9), \n (10), \r (13).
    keep = (9, 10, 13)
    to_delete = "".join(chr(code) for code in range(32) if code not in keep)
    return content.translate(str.maketrans("", "", to_delete))

truncate_safely(content)

Source code in guard_core/detection_engine/preprocessor.py
def truncate_safely(self, content: str) -> str:
    """Bound *content* to ``max_content_length`` characters.

    When ``preserve_attack_patterns`` is set, regions matching attack
    indicators are kept (with context) in preference to a blind prefix cut.
    """
    limit = self.max_content_length
    if len(content) <= limit:
        return content

    head = content[:limit]
    if not self.preserve_attack_patterns:
        return head

    attack_regions = self.extract_attack_regions(content)
    if not attack_regions:
        # Nothing suspicious to preserve; plain prefix truncation.
        return head

    covered = sum(stop - start for start, stop in attack_regions)
    if covered >= limit:
        # Attack regions alone exceed the budget: keep only those.
        return self._extract_and_concatenate_attack_regions(content, attack_regions)

    # Regions fit with room to spare: pad them with surrounding context.
    return self._build_result_with_attack_regions_and_context(
        content, attack_regions
    )

PatternCompiler(default_timeout=5.0, max_cache_size=1000)

Source code in guard_core/detection_engine/compiler.py
def __init__(self, default_timeout: float = 5.0, max_cache_size: int = 1000):
    """Create a compiler with an LRU cache of compiled patterns.

    Args:
        default_timeout: Seconds allowed for a single match attempt.
        max_cache_size: Requested cache capacity; hard-capped at 5000.
    """
    # pattern-key -> compiled regex, with recency order tracked separately
    # for LRU eviction; the async lock guards both structures.
    self._compiled_cache: dict[str, re.Pattern] = {}
    self._cache_order: list[str] = []
    self._lock = asyncio.Lock()

    self.default_timeout = default_timeout
    # Never allow callers to request an unbounded cache.
    self.max_cache_size = min(max_cache_size, 5000)

MAX_CACHE_SIZE = 1000 class-attribute instance-attribute

default_timeout = default_timeout instance-attribute

max_cache_size = min(max_cache_size, 5000) instance-attribute

batch_compile(patterns, validate=True) async

Source code in guard_core/detection_engine/compiler.py
async def batch_compile(
    self, patterns: list[str], validate: bool = True
) -> dict[str, re.Pattern]:
    """Compile many patterns, skipping unsafe or invalid ones.

    Returns a mapping of source pattern -> compiled regex; patterns that
    fail the safety check or do not compile are silently omitted.
    """
    compiled: dict[str, re.Pattern] = {}
    for candidate in patterns:
        if validate:
            safe, _reason = self.validate_pattern_safety(candidate)
            if not safe:
                continue
        try:
            compiled[candidate] = await self.compile_pattern(candidate)
        except re.error:
            continue
    return compiled

clear_cache() async

Source code in guard_core/detection_engine/compiler.py
async def clear_cache(self) -> None:
    """Drop every cached compiled pattern (guarded by the async lock)."""
    async with self._lock:
        for container in (self._compiled_cache, self._cache_order):
            container.clear()

compile_pattern(pattern, flags=re.IGNORECASE | re.MULTILINE) async

Source code in guard_core/detection_engine/compiler.py
async def compile_pattern(
    self, pattern: str, flags: int = re.IGNORECASE | re.MULTILINE
) -> re.Pattern:
    """Return a compiled regex for *pattern*, using an async-safe LRU cache.

    Raises:
        re.error: If *pattern* is not a valid regular expression.
    """
    # BUG FIX: the key previously used hash(pattern); two distinct patterns
    # with colliding hashes shared a cache slot and the second caller
    # silently received the wrong compiled regex. Key on the pattern text
    # itself (plus flags) instead.
    cache_key = f"{pattern}:{flags}"

    # Fast path: opportunistic unlocked read, then re-check under the lock
    # before touching the LRU order.
    if cache_key in self._compiled_cache:
        async with self._lock:
            if cache_key in self._compiled_cache:
                self._cache_order.remove(cache_key)
                self._cache_order.append(cache_key)
                return self._compiled_cache[cache_key]

    async with self._lock:
        if cache_key not in self._compiled_cache:
            # Evict the least-recently-used entry when at capacity.
            if len(self._compiled_cache) >= self.max_cache_size:
                oldest_key = self._cache_order.pop(0)
                del self._compiled_cache[oldest_key]

            self._compiled_cache[cache_key] = re.compile(pattern, flags)
            self._cache_order.append(cache_key)

        return self._compiled_cache[cache_key]

compile_pattern_sync(pattern, flags=re.IGNORECASE | re.MULTILINE)

Source code in guard_core/detection_engine/compiler.py
def compile_pattern_sync(
    self, pattern: str, flags: int = re.IGNORECASE | re.MULTILINE
) -> re.Pattern:
    """Compile *pattern* immediately, bypassing the async cache.

    Raises:
        re.error: If *pattern* is not a valid regular expression.
    """
    compiled = re.compile(pattern, flags)
    return compiled

create_safe_matcher(pattern, timeout=None)

Source code in guard_core/detection_engine/compiler.py
def create_safe_matcher(
    self, pattern: str, timeout: float | None = None
) -> Callable[[str], re.Match | None]:
    compiled = self.compile_pattern_sync(pattern)
    match_timeout = timeout or self.default_timeout

    def safe_match(text: str) -> re.Match | None:
        import concurrent.futures

        def _search() -> re.Match | None:
            return compiled.search(text)

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_search)
            try:
                return future.result(timeout=match_timeout)
            except concurrent.futures.TimeoutError:
                future.cancel()
                return None
            except Exception:
                return None

    return safe_match

validate_pattern_safety(pattern, test_strings=None)

Source code in guard_core/detection_engine/compiler.py
def validate_pattern_safety(
    self, pattern: str, test_strings: list[str] | None = None
) -> tuple[bool, str]:
    dangerous_patterns = [
        r"\(\.\*\)\+",
        r"\(\.\+\)\+",
        r"\([^)]*\*\)\+",
        r"\([^)]*\+\)\+",
        r"(?:\.\*){2,}",
        r"(?:\.\+){2,}",
    ]

    for dangerous in dangerous_patterns:
        if re.search(dangerous, pattern):
            return False, f"Pattern contains dangerous construct: {dangerous}"

    if test_strings is None:
        test_strings = [
            "a" * 10,
            "a" * 100,
            "a" * 1000,
            "x" * 50 + "y" * 50,
            "<" * 100 + ">" * 100,
        ]

    try:
        compiled = self.compile_pattern_sync(pattern)
        import concurrent.futures

        for test_str in test_strings:
            start_time = time.time()

            def _search(text: str = test_str) -> re.Match | None:
                return compiled.search(text)

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(_search)
                try:
                    future.result(timeout=0.1)
                except concurrent.futures.TimeoutError:
                    return (
                        False,
                        f"Pattern timed out on test string of length "
                        f"{len(test_str)}",
                    )

            elapsed = time.time() - start_time
            if elapsed > 0.05:
                return (
                    False,
                    f"Pattern timed out on test string of length {len(test_str)}",
                )
    except Exception as e:
        return False, f"Pattern validation failed: {str(e)}"

    return True, "Pattern appears safe"

PerformanceMonitor(anomaly_threshold=3.0, slow_pattern_threshold=0.1, history_size=1000, max_tracked_patterns=1000)

Source code in guard_core/detection_engine/monitor.py
def __init__(
    self,
    anomaly_threshold: float = 3.0,
    slow_pattern_threshold: float = 0.1,
    history_size: int = 1000,
    max_tracked_patterns: int = 1000,
):
    """Initialize the monitor, clamping all tunables to sane ranges.

    Args:
        anomaly_threshold: Anomaly-detection multiplier, clamped to [1, 10].
        slow_pattern_threshold: Seconds above which a pattern is considered
            slow, clamped to [0.01, 10].
        history_size: Number of recent metrics retained, clamped to
            [100, 10000].
        max_tracked_patterns: Cap on per-pattern stat entries, clamped to
            [100, 5000].
    """
    self.anomaly_threshold = max(1.0, min(10.0, float(anomaly_threshold)))
    self.slow_pattern_threshold = max(
        0.01, min(10.0, float(slow_pattern_threshold))
    )
    self.history_size = max(100, min(10000, int(history_size)))
    self.max_tracked_patterns = max(100, min(5000, int(max_tracked_patterns)))

    self.pattern_stats: dict[str, PatternStats] = {}
    # BUG FIX: the deque previously used the raw history_size argument, so
    # an out-of-range request (e.g. 50 or 1_000_000) silently gave the
    # deque a different capacity than the clamped self.history_size.
    self.recent_metrics: deque[PerformanceMetric] = deque(
        maxlen=self.history_size
    )
    self.anomaly_callbacks: list[Any] = []
    self._lock = asyncio.Lock()

anomaly_callbacks = [] instance-attribute

anomaly_threshold = max(1.0, min(10.0, float(anomaly_threshold))) instance-attribute

history_size = max(100, min(10000, int(history_size))) instance-attribute

max_tracked_patterns = max(100, min(5000, int(max_tracked_patterns))) instance-attribute

pattern_stats = {} instance-attribute

recent_metrics = deque(maxlen=history_size) instance-attribute

slow_pattern_threshold = max(0.01, min(10.0, float(slow_pattern_threshold))) instance-attribute

clear_stats() async

Source code in guard_core/detection_engine/monitor.py
async def clear_stats(self) -> None:
    """Reset all collected statistics under the monitor lock."""
    async with self._lock:
        for store in (self.pattern_stats, self.recent_metrics):
            store.clear()

get_pattern_report(pattern)

Source code in guard_core/detection_engine/monitor.py
def get_pattern_report(self, pattern: str) -> dict[str, Any] | None:
    MAX_PATTERN_LENGTH = 100
    if len(pattern) > MAX_PATTERN_LENGTH:
        pattern = pattern[:MAX_PATTERN_LENGTH] + "...[truncated]"

    stats = self.pattern_stats.get(pattern)
    if not stats:
        return None

    safe_pattern = pattern[:50] + "..." if len(pattern) > 50 else pattern

    return {
        "pattern": safe_pattern,
        "pattern_hash": str(hash(pattern))[:8],
        "total_executions": stats.total_executions,
        "total_matches": stats.total_matches,
        "total_timeouts": stats.total_timeouts,
        "match_rate": stats.total_matches / max(stats.total_executions, 1),
        "timeout_rate": stats.total_timeouts / max(stats.total_executions, 1),
        "avg_execution_time": round(stats.avg_execution_time, 4),
        "max_execution_time": round(stats.max_execution_time, 4),
        "min_execution_time": round(
            stats.min_execution_time
            if stats.min_execution_time != float("inf")
            else 0.0,
            4,
        ),
    }

get_problematic_patterns()

Source code in guard_core/detection_engine/monitor.py
def get_problematic_patterns(self) -> list[dict[str, Any]]:
    """Report patterns that time out often (>10%) or are consistently slow."""
    flagged: list[dict[str, Any]] = []

    for pattern, stats in self.pattern_stats.items():
        if not stats.total_executions:
            continue

        timeout_rate = stats.total_timeouts / stats.total_executions
        if timeout_rate > 0.1:
            issue = "high_timeout_rate"
        elif stats.avg_execution_time > self.slow_pattern_threshold:
            issue = "consistently_slow"
        else:
            continue

        report = self.get_pattern_report(pattern)
        if report:
            report["issue"] = issue
            flagged.append(report)

    return flagged

get_slow_patterns(limit=10)

Source code in guard_core/detection_engine/monitor.py
def get_slow_patterns(self, limit: int = 10) -> list[dict[str, Any]]:
    """Return reports for the *limit* slowest patterns by average time."""
    # Only patterns with at least one timed execution are ranked.
    ranked = sorted(
        (
            (stats.avg_execution_time, pattern)
            for pattern, stats in self.pattern_stats.items()
            if stats.recent_times
        ),
        reverse=True,
    )

    reports = []
    for _avg, pattern in ranked[:limit]:
        report = self.get_pattern_report(pattern)
        if report is not None:
            reports.append(report)
    return reports

get_summary_stats()

Source code in guard_core/detection_engine/monitor.py
def get_summary_stats(self) -> dict[str, Any]:
    """Summarize the recent metric history; empty summary when none recorded."""
    if not self.recent_metrics:
        return self._get_empty_summary()

    times, timeouts, matches = self._extract_metric_components()
    return self._build_summary_dict(times, timeouts, matches)

record_metric(pattern, execution_time, content_length, matched, timeout=False, agent_handler=None, correlation_id=None) async

Source code in guard_core/detection_engine/monitor.py
async def record_metric(
    self,
    pattern: str,
    execution_time: float,
    content_length: int,
    matched: bool,
    timeout: bool = False,
    agent_handler: Any = None,
    correlation_id: str | None = None,
) -> None:
    """Record one pattern execution and update aggregate statistics.

    Long patterns are truncated before being used as stat keys; when the
    tracked-pattern cap is hit the oldest entry is evicted. Anomaly checks
    run after the lock has been released.
    """
    # Keep stat keys bounded; get_pattern_report() truncates the same way.
    MAX_PATTERN_LENGTH = 100
    if len(pattern) > MAX_PATTERN_LENGTH:
        pattern = pattern[:MAX_PATTERN_LENGTH] + "...[truncated]"

    # Defensive clamping of caller-provided measurements.
    execution_time = max(0.0, float(execution_time))
    content_length = max(0, int(content_length))

    metric = PerformanceMetric(
        pattern=pattern,
        execution_time=execution_time,
        content_length=content_length,
        timestamp=datetime.now(timezone.utc),
        matched=matched,
        timeout=timeout,
    )

    async with self._lock:
        self.recent_metrics.append(metric)

        stats = self.pattern_stats.get(pattern)
        if stats is None:
            # Evict the oldest tracked pattern once the cap is reached.
            if len(self.pattern_stats) >= self.max_tracked_patterns:
                del self.pattern_stats[next(iter(self.pattern_stats))]
            stats = PatternStats(pattern=pattern)
            self.pattern_stats[pattern] = stats

        stats.total_executions += 1
        if matched:
            stats.total_matches += 1
        if timeout:
            stats.total_timeouts += 1

        # Timing stats only reflect runs that actually completed.
        if not timeout:
            stats.recent_times.append(execution_time)
            stats.max_execution_time = max(stats.max_execution_time, execution_time)
            stats.min_execution_time = min(stats.min_execution_time, execution_time)
            if stats.recent_times:
                stats.avg_execution_time = mean(stats.recent_times)

    await self._check_anomalies(metric, agent_handler, correlation_id)

register_anomaly_callback(callback)

Source code in guard_core/detection_engine/monitor.py
def register_anomaly_callback(self, callback: Any) -> None:
    """Register *callback* to be invoked when an anomaly is detected."""
    self.anomaly_callbacks.append(callback)

remove_pattern_stats(pattern) async

Source code in guard_core/detection_engine/monitor.py
async def remove_pattern_stats(self, pattern: str) -> None:
    """Forget all statistics for *pattern* (no-op when untracked)."""
    async with self._lock:
        self.pattern_stats.pop(pattern, None)

SemanticAnalyzer()

Source code in guard_core/detection_engine/semantic.py
def __init__(self) -> None:
    """Build the keyword, character-class and structure tables used by
    the semantic analysis heuristics.
    """
    # Tokens whose presence suggests a given attack family.
    xss = {
        "script", "javascript", "onerror", "onload", "onclick",
        "onmouseover", "alert", "eval", "document", "cookie",
        "window", "location",
    }
    sql = {
        "select", "union", "insert", "update", "delete", "drop",
        "from", "where", "order", "group", "having", "concat",
        "substring", "database", "table", "column",
    }
    command = {
        "exec", "system", "shell", "cmd", "bash", "powershell",
        "wget", "curl", "nc", "netcat", "chmod", "chown",
        "sudo", "passwd",
    }
    path = {"etc", "passwd", "shadow", "hosts", "proc", "boot", "win", "ini"}
    template = {
        "render", "template", "jinja", "mustache", "handlebars",
        "ejs", "pug", "twig",
    }
    self.attack_keywords = {
        "xss": xss,
        "sql": sql,
        "command": command,
        "path": path,
        "template": template,
    }

    # Character classes commonly abused in injection payloads.
    self.suspicious_chars = {
        "brackets": r"[<>{}()\[\]]",
        "quotes": r"['\"`]",
        "slashes": r"[/\\]",
        "special": r"[;&|$]",
        "wildcards": r"[*?]",
    }

    # Structural regexes that hint at executable or traversal content.
    self.attack_structures = {
        "tag_like": r"<[^>]+>",
        "function_call": r"\w+\s*\([^)]*\)",
        "command_chain": r"[;&|]{1,2}",
        "path_traversal": r"\.{2,}[/\\]",
        "url_pattern": r"[a-z]+://",
    }

attack_keywords = {'xss': {'script', 'javascript', 'onerror', 'onload', 'onclick', 'onmouseover', 'alert', 'eval', 'document', 'cookie', 'window', 'location'}, 'sql': {'select', 'union', 'insert', 'update', 'delete', 'drop', 'from', 'where', 'order', 'group', 'having', 'concat', 'substring', 'database', 'table', 'column'}, 'command': {'exec', 'system', 'shell', 'cmd', 'bash', 'powershell', 'wget', 'curl', 'nc', 'netcat', 'chmod', 'chown', 'sudo', 'passwd'}, 'path': {'etc', 'passwd', 'shadow', 'hosts', 'proc', 'boot', 'win', 'ini'}, 'template': {'render', 'template', 'jinja', 'mustache', 'handlebars', 'ejs', 'pug', 'twig'}} instance-attribute

attack_structures = {'tag_like': '<[^>]+>', 'function_call': '\\w+\\s*\\([^)]*\\)', 'command_chain': '[;&|]{1,2}', 'path_traversal': '\\.{2,}[/\\\\]', 'url_pattern': '[a-z]+://'} instance-attribute

suspicious_chars = {'brackets': '[<>{}()\\[\\]]', 'quotes': '[\'\\"`]', 'slashes': '[/\\\\]', 'special': '[;&|$]', 'wildcards': '[*?]'} instance-attribute

analyze(content)

Source code in guard_core/detection_engine/semantic.py
def analyze(self, content: str) -> dict[str, Any]:
    """Run every semantic heuristic over *content* and collect the results."""
    report = {
        "attack_probabilities": self.analyze_attack_probability(content),
        "entropy": self.calculate_entropy(content),
        "encoding_layers": self.detect_encoding_layers(content),
        "is_obfuscated": self.detect_obfuscation(content),
        "suspicious_patterns": self.extract_suspicious_patterns(content),
        "code_injection_risk": self.analyze_code_injection_risk(content),
    }
    report["token_count"] = len(self.extract_tokens(content))
    return report

analyze_attack_probability(content)

Source code in guard_core/detection_engine/semantic.py
def analyze_attack_probability(self, content: str) -> dict[str, float]:
    """Score *content* against each attack family, returning values in [0, 1].

    Each score combines keyword overlap with a structural-pattern boost.
    """
    token_set = set(self.extract_tokens(content))

    scores: dict[str, float] = {}
    for attack_type, keywords in self.attack_keywords.items():
        raw = self._calculate_base_score(
            token_set, keywords
        ) + self._get_structural_pattern_boost(attack_type, content)
        scores[attack_type] = min(raw, 1.0)

    return scores

analyze_code_injection_risk(content)

Source code in guard_core/detection_engine/semantic.py
def analyze_code_injection_risk(self, content: str) -> float:
    """Aggregate the code-injection risk heuristics, capped at 1.0."""
    total = 0.0
    for check in (
        self._check_code_pattern_risks,
        self._check_ast_parsing_risk,
        self._check_injection_keywords,
    ):
        total += check(content)
    return min(total, 1.0)

calculate_entropy(content)

Source code in guard_core/detection_engine/semantic.py
def calculate_entropy(self, content: str) -> float:
    """Shannon entropy (bits per character) of up to 10,000 characters."""
    if not content:
        return 0.0

    # Bound the work on very large inputs.
    MAX_ENTROPY_LENGTH = 10000
    sample = content[:MAX_ENTROPY_LENGTH]

    import math

    total = len(sample)
    entropy = 0.0
    # Each count is >= 1, so every probability is strictly positive.
    for count in Counter(sample).values():
        p = count / total
        entropy -= p * math.log2(p)

    return entropy

detect_encoding_layers(content)

Source code in guard_core/detection_engine/semantic.py
def detect_encoding_layers(self, content: str) -> int:
    """Count how many distinct encoding schemes appear present in *content*.

    Checks percent-encoding, base64-like runs, hex runs, \\u escapes and
    HTML entities; only the first 10,000 characters are scanned.
    """
    MAX_SCAN_LENGTH = 10000
    sample = content[:MAX_SCAN_LENGTH]

    encoding_signatures = (
        r"%[0-9a-fA-F]{2}",          # percent / URL encoding
        r"[A-Za-z0-9+/]{4,}={0,2}",  # base64-like run
        r"(?:0x)?[0-9a-fA-F]{4,}",   # hex digit run
        r"\\u[0-9a-fA-F]{4}",        # unicode escapes
        r"&[#\w]+;",                 # HTML entities
    )

    return sum(
        1 for signature in encoding_signatures if re.search(signature, sample)
    )

detect_obfuscation(content)

Source code in guard_core/detection_engine/semantic.py
def detect_obfuscation(self, content: str) -> bool:
    """Heuristically decide whether *content* looks obfuscated.

    Triggers on high entropy, stacked encodings, a high ratio of
    non-alphanumeric characters, or a very long unbroken token.
    """
    if self.calculate_entropy(content) > 4.5:
        return True

    if self.detect_encoding_layers(content) > 2:
        return True

    specials = len(re.findall(r"[^a-zA-Z0-9\s]", content))
    if specials / max(len(content), 1) > 0.4:
        return True

    # One "word" of 100+ non-space characters is itself suspicious.
    return re.search(r"\S{100,}", content) is not None

extract_suspicious_patterns(content)

Source code in guard_core/detection_engine/semantic.py
def extract_suspicious_patterns(self, content: str) -> list[dict[str, Any]]:
    """List every structural attack pattern found in *content*.

    Each hit records its type, matched text, offset and 20 characters of
    surrounding context.
    """
    hits: list[dict[str, Any]] = []

    for name, pattern in self.attack_structures.items():
        for match in re.finditer(pattern, content, re.IGNORECASE):
            lo = max(0, match.start() - 20)
            hi = min(len(content), match.end() + 20)
            hits.append(
                {
                    "type": name,
                    "pattern": match.group(),
                    "position": match.start(),
                    "context": content[lo:hi],
                }
            )

    return hits

extract_tokens(content)

Source code in guard_core/detection_engine/semantic.py
def extract_tokens(self, content: str) -> list[str]:
    """Tokenize *content*: lowercase word tokens plus structural matches.

    Work is bounded: input is capped at 50,000 characters, at most 1,000
    tokens are returned, and each structural regex runs in a worker thread
    with a 0.1s timeout.
    """
    import concurrent.futures

    MAX_CONTENT_LENGTH = 50000
    if len(content) > MAX_CONTENT_LENGTH:
        content = content[:MAX_CONTENT_LENGTH]

    content = re.sub(r"\s+", " ", content)

    MAX_TOKENS = 1000
    tokens = re.findall(r"\b\w+\b", content.lower())[:MAX_TOKENS]

    def _find_pattern(p: str, c: str) -> list[str]:
        return re.findall(p, c, re.IGNORECASE)[:10]

    special_patterns: list[str] = []
    # PERF FIX: reuse one worker thread for all structural patterns instead
    # of creating and tearing down a ThreadPoolExecutor per pattern.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        for pattern in self.attack_structures.values():
            future = executor.submit(_find_pattern, pattern, content)
            try:
                special_patterns.extend(future.result(timeout=0.1))
            except concurrent.futures.TimeoutError:
                continue

            if len(special_patterns) >= 50:
                break

    return (tokens + special_patterns)[:MAX_TOKENS]

get_threat_score(analysis_results)

Source code in guard_core/detection_engine/semantic.py
def get_threat_score(self, analysis_results: dict[str, Any]) -> float:
    """Collapse an analyze() result dict into a single score in [0, 1].

    Weights: strongest attack probability 0.3, obfuscation 0.2, encoding
    layers up to 0.2, injection risk 0.2, suspicious-pattern count up
    to 0.1.
    """
    score = 0.0

    attack_probs = analysis_results.get("attack_probabilities", {})
    if attack_probs:
        score += max(attack_probs.values()) * 0.3

    if analysis_results.get("is_obfuscated", False):
        score += 0.2

    layers = analysis_results.get("encoding_layers", 0)
    if layers > 0:
        score += min(layers * 0.1, 0.2)

    score += analysis_results.get("code_injection_risk", 0.0) * 0.2

    findings = analysis_results.get("suspicious_patterns", [])
    if findings:
        score += min(len(findings) * 0.05, 0.1)

    return float(min(score, 1.0))