Skip to content

Handlers

Guard-core handlers are singleton services that manage specific security subsystems. Each handler supports optional Redis and agent integration.

IPBanManager

Manages banned IP addresses with a dual-layer cache (TTLCache + Redis).

guard_core.handlers.ipban_handler.IPBanManager

agent_handler = None class-attribute instance-attribute

banned_ips instance-attribute

redis_handler = None class-attribute instance-attribute

ban_ip(ip, duration, reason='threshold_exceeded') async

Source code in guard_core/handlers/ipban_handler.py
async def ban_ip(
    self, ip: str, duration: int, reason: str = "threshold_exceeded"
) -> None:
    """Ban ``ip`` for ``duration`` seconds.

    The absolute expiry timestamp is written to the local ``banned_ips``
    cache. When a Redis handler is attached the same timestamp is stored
    under the ``banned_ips`` namespace with ``ttl=duration``; when an agent
    handler is attached a ban event carrying ``reason`` is sent.
    """
    expires_at = duration + time.time()
    self.banned_ips[ip] = expires_at

    redis = self.redis_handler
    if redis:
        # Stored as a string so the expiry can be parsed back with float().
        await redis.set_key("banned_ips", ip, str(expires_at), ttl=duration)

    if self.agent_handler:
        await self._send_ban_event(ip, duration, reason)

initialize_agent(agent_handler) async

Source code in guard_core/handlers/ipban_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler``; ban/unban events are only sent when it is set."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/ipban_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Store ``redis_handler``; bans are mirrored to Redis only when it is set."""
    self.redis_handler = redis_handler

is_ip_banned(ip) async

Source code in guard_core/handlers/ipban_handler.py
async def is_ip_banned(self, ip: str) -> bool:
    """Return whether ``ip`` is currently banned.

    The local ``banned_ips`` cache is consulted first. If it holds an entry
    whose expiry has passed, the entry is dropped and ``False`` is returned.
    Otherwise, when a Redis handler is attached, the expiry stored under the
    ``banned_ips`` namespace is checked: a still-valid ban is copied into the
    local cache, an expired one is deleted from Redis.
    """
    current_time = time.time()

    # One lookup instead of "in" + subscript: an expiring cache may evict the
    # entry between the two operations, which would surface as a KeyError.
    local_expiry = self.banned_ips.get(ip)
    if local_expiry is not None:
        if current_time > local_expiry:
            # pop() with a default tolerates the entry having vanished already.
            self.banned_ips.pop(ip, None)
            return False
        return True

    if self.redis_handler:
        expiry = await self.redis_handler.get_key("banned_ips", ip)
        if expiry:
            expiry_time = float(expiry)
            if current_time <= expiry_time:
                # Warm the local cache so later checks skip the Redis round-trip.
                self.banned_ips[ip] = expiry_time
                return True
            await self.redis_handler.delete("banned_ips", ip)

    return False

reset() async

Source code in guard_core/handlers/ipban_handler.py
async def reset(self) -> None:
    """Forget every ban held locally and, if Redis is attached, in Redis.

    Redis keys are matched with ``<redis_prefix>banned_ips:*`` on a raw
    connection obtained from the handler and deleted in one call.
    """
    self.banned_ips.clear()

    handler = self.redis_handler
    if not handler:
        return

    async with handler.get_connection() as conn:
        pattern = f"{handler.config.redis_prefix}banned_ips:*"
        stale_keys = await conn.keys(pattern)
        if stale_keys:
            await conn.delete(*stale_keys)

unban_ip(ip) async

Source code in guard_core/handlers/ipban_handler.py
async def unban_ip(self, ip: str) -> None:
    """Lift the ban on ``ip``.

    Removes the entry from the local cache (a no-op when absent), deletes the
    ``banned_ips`` key from Redis when a Redis handler is attached, and sends
    an unban event when an agent handler is attached.
    """
    # pop() with a default avoids the check-then-delete race: an expiring
    # cache can drop the entry between "in" and "del" and raise KeyError.
    self.banned_ips.pop(ip, None)

    if self.redis_handler:
        await self.redis_handler.delete("banned_ips", ip)

    if self.agent_handler:
        await self._send_unban_event(ip)

RateLimitManager

Implements sliding window rate limiting with in-memory and Redis backends.

guard_core.handlers.ratelimit_handler.RateLimitManager

agent_handler = None class-attribute instance-attribute

config instance-attribute

logger instance-attribute

rate_limit_script_sha = None class-attribute instance-attribute

redis_handler = None class-attribute instance-attribute

request_timestamps instance-attribute

check_rate_limit(request, client_ip, create_error_response, endpoint_path='', rate_limit=None, rate_limit_window=None) async

Source code in guard_core/handlers/ratelimit_handler.py
async def check_rate_limit(
    self,
    request: GuardRequest,
    client_ip: str,
    create_error_response: Callable[[int, str], Awaitable[GuardResponse]],
    endpoint_path: str = "",
    rate_limit: int | None = None,
    rate_limit_window: int | None = None,
) -> GuardResponse | None:
    """Check ``client_ip`` against the rate limit for the current window.

    ``rate_limit`` and ``rate_limit_window`` override the configured values
    when given. Returns ``None`` when rate limiting is disabled or the request
    is within the limit; otherwise returns whatever
    ``_handle_rate_limit_exceeded`` produces.

    The Redis counter is used when Redis is enabled and a handler is attached.
    If it yields ``None`` the in-memory counter is consulted instead.
    """
    cfg = self.config
    if not cfg.enable_rate_limiting:
        return None

    limit = cfg.rate_limit if rate_limit is None else rate_limit
    window = (
        cfg.rate_limit_window if rate_limit_window is None else rate_limit_window
    )

    now = time.time()
    window_start = now - window

    if cfg.enable_redis and self.redis_handler:
        redis_count = await self._get_redis_request_count(
            client_ip,
            now,
            window_start,
            endpoint_path=endpoint_path,
            rate_limit_window=window,
            rate_limit=limit,
        )

        if redis_count is not None:
            if redis_count <= limit:
                return None
            return await self._handle_rate_limit_exceeded(
                request,
                client_ip,
                redis_count,
                create_error_response,
                rate_limit_window=window,
            )

    memory_count = self._get_in_memory_request_count(
        client_ip, window_start, now, endpoint_path=endpoint_path
    )
    if memory_count < limit:
        return None

    # The in-memory path reports count + 1 while the Redis path reports the
    # returned count unchanged.
    return await self._handle_rate_limit_exceeded(
        request,
        client_ip,
        memory_count + 1,
        create_error_response,
        rate_limit_window=window,
    )

initialize_agent(agent_handler) async

Source code in guard_core/handlers/ratelimit_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler`` on this manager for later use."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/ratelimit_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Attach ``redis_handler`` and preload the rate-limit Lua script.

    The script is loaded only when a handler is given and Redis is enabled in
    the config; its SHA is kept in ``rate_limit_script_sha``. A failure to
    load is logged and otherwise ignored.
    """
    self.redis_handler = redis_handler

    if not (self.redis_handler and self.config.enable_redis):
        return

    try:
        async with self.redis_handler.get_connection() as conn:
            script_sha = await conn.script_load(RATE_LIMIT_SCRIPT)
            self.rate_limit_script_sha = script_sha
            self.logger.info("Rate limiting Lua script loaded successfully")
    except Exception as e:
        self.logger.error(f"Failed to load rate limiting Lua script: {str(e)}")

reset() async

Source code in guard_core/handlers/ratelimit_handler.py
async def reset(self) -> None:
    """Clear all recorded request timestamps, locally and in Redis.

    Redis is only touched when it is enabled and a handler is attached; keys
    matching ``rate_limit:rate:*`` are removed via the handler. Redis errors
    are logged and not re-raised.
    """
    self.request_timestamps.clear()

    if not (self.config.enable_redis and self.redis_handler):
        return

    pattern = "rate_limit:rate:*"
    try:
        existing = await self.redis_handler.keys(pattern)
        if existing:
            await self.redis_handler.delete_pattern(pattern)
    except Exception as e:
        self.logger.error(f"Failed to reset Redis rate limits: {str(e)}")

CloudManager

Fetches and caches IP ranges for AWS, GCP, and Azure cloud providers.

guard_core.handlers.cloud_handler.CloudManager

agent_handler = None class-attribute instance-attribute

ip_ranges instance-attribute

last_updated instance-attribute

logger instance-attribute

redis_handler = None class-attribute instance-attribute

get_cloud_provider_details(ip, providers=_ALL_PROVIDERS)

Source code in guard_core/handlers/cloud_handler.py
def get_cloud_provider_details(
    self, ip: str, providers: set[str] = _ALL_PROVIDERS
) -> tuple[str, str] | None:
    """Find the cloud provider network that contains ``ip``.

    Only providers named in ``providers`` that have loaded ranges are
    searched. Returns ``(provider, network)`` for the first match, with the
    network rendered as a string, or ``None`` when nothing matches. An
    unparsable ``ip`` is logged and also yields ``None``.
    """
    try:
        address = ipaddress.ip_address(ip)
        matches = (
            (provider, str(network))
            for provider in providers
            for network in self.ip_ranges.get(provider, ())
            if address in network
        )
        return next(matches, None)
    except ValueError:
        self.logger.error(f"Invalid IP address: {ip}")
        return None

initialize_agent(agent_handler) async

Source code in guard_core/handlers/cloud_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler``; cloud detection events are only sent when set."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler, providers=_ALL_PROVIDERS, ttl=3600) async

Source code in guard_core/handlers/cloud_handler.py
async def initialize_redis(
    self,
    redis_handler: Any,
    providers: set[str] = _ALL_PROVIDERS,
    ttl: int = 3600,
) -> None:
    """Attach ``redis_handler`` and immediately refresh ``providers``.

    ``ttl`` is forwarded to ``refresh_async``, which uses it as the lifetime
    of the ranges it writes to Redis.
    """
    self.redis_handler = redis_handler
    await self.refresh_async(providers, ttl=ttl)

is_cloud_ip(ip, providers=_ALL_PROVIDERS)

Source code in guard_core/handlers/cloud_handler.py
def is_cloud_ip(self, ip: str, providers: set[str] = _ALL_PROVIDERS) -> bool:
    """Return ``True`` when ``ip`` falls inside any loaded range of ``providers``.

    An unparsable ``ip`` is logged and reported as ``False``.
    """
    try:
        address = ipaddress.ip_address(ip)
        return any(
            address in network
            for provider in providers
            for network in self.ip_ranges.get(provider, ())
        )
    except ValueError:
        self.logger.error(f"Invalid IP address: {ip}")
        return False

refresh(providers=_ALL_PROVIDERS)

Source code in guard_core/handlers/cloud_handler.py
def refresh(self, providers: set[str] = _ALL_PROVIDERS) -> None:
    """Synchronously refresh the IP ranges of ``providers``.

    Raises:
        RuntimeError: if a Redis handler is attached; ``refresh_async`` is
            the entry point to use in that case.
    """
    if self.redis_handler is not None:
        raise RuntimeError("Use async refresh() when Redis is enabled")
    self._refresh_sync(providers)

refresh_async(providers=_ALL_PROVIDERS, ttl=3600) async

Source code in guard_core/handlers/cloud_handler.py
async def refresh_async(
    self, providers: set[str] = _ALL_PROVIDERS, ttl: int = 3600
) -> None:
    """Refresh the IP ranges of ``providers``, using Redis as a cache.

    Without a Redis handler this delegates to ``_refresh_sync``. Otherwise,
    for each provider, ranges cached under the ``cloud_ranges`` namespace are
    used when present. On a cache miss the provider's fetcher is called and a
    non-empty result is stored locally and written back to Redis with ``ttl``
    seconds of lifetime.

    Any error for one provider (including an unknown provider name) is logged
    and does not stop the others. A provider with no ranges yet gets an empty
    set so lookups stay safe.
    """
    import asyncio

    if self.redis_handler is None:
        self._refresh_sync(providers)
        return

    # Loop-invariant dispatch table, built once instead of per provider.
    fetchers = {
        "AWS": fetch_aws_ip_ranges,
        "GCP": fetch_gcp_ip_ranges,
        "Azure": fetch_azure_ip_ranges,
    }

    for provider in providers:
        try:
            cached_ranges = await self.redis_handler.get_key(
                "cloud_ranges", provider
            )
            if cached_ranges:
                # Cached value is the comma-joined string written below.
                self.ip_ranges[provider] = {
                    ipaddress.ip_network(ip) for ip in cached_ranges.split(",")
                }
                continue

            # The fetchers are plain synchronous callables (presumably doing
            # blocking network I/O - confirm). Running them in a worker
            # thread keeps the event loop free while they download.
            ranges = await asyncio.to_thread(fetchers[provider])
            if ranges:
                old_ranges = self.ip_ranges.get(provider, set())
                self._log_range_changes(provider, old_ranges, ranges)
                self.ip_ranges[provider] = ranges
                self.last_updated[provider] = datetime.now(timezone.utc)

                await self.redis_handler.set_key(
                    "cloud_ranges",
                    provider,
                    ",".join(str(ip) for ip in ranges),
                    ttl=ttl,
                )

        except Exception as e:
            self.logger.error(f"Failed to refresh {provider} IP ranges: {str(e)}")
            if provider not in self.ip_ranges:
                self.ip_ranges[provider] = set()

send_cloud_detection_event(ip, provider, network, action_taken='request_blocked') async

Source code in guard_core/handlers/cloud_handler.py
async def send_cloud_detection_event(
    self,
    ip: str,
    provider: str,
    network: str,
    action_taken: str = "request_blocked",
) -> None:
    """Report that ``ip`` matched ``network`` of a blocked cloud ``provider``.

    Does nothing when no agent handler is attached.
    """
    if self.agent_handler:
        await self._send_cloud_event(
            event_type="cloud_blocked",
            ip_address=ip,
            action_taken=action_taken,
            reason=f"IP belongs to blocked cloud provider: {provider}",
            cloud_provider=provider,
            network=network,
        )

RedisManager

Provides namespaced Redis operations with connection management and fault tolerance.

guard_core.handlers.redis_handler.RedisManager

agent_handler = None class-attribute instance-attribute

config instance-attribute

logger instance-attribute

close() async

Source code in guard_core/handlers/redis_handler.py
async def close(self) -> None:
    """Close the Redis client, if any, and mark this manager as closed.

    When a client exists it is closed, dropped, and a ``connection_closed``
    event is sent. ``_closed`` is set in every case.
    """
    client = self._redis
    if client:
        await client.aclose()
        self._redis = None
        self.logger.info("Redis connection closed")

        await self._send_redis_event(
            event_type="redis_connection",
            action_taken="connection_closed",
            reason="Redis connection closed gracefully",
        )

    self._closed = True

delete(namespace, key) async

Source code in guard_core/handlers/redis_handler.py
async def delete(self, namespace: str, key: str) -> int | None:
    """Delete ``<redis_prefix><namespace>:<key>``.

    Returns ``None`` when Redis is disabled, otherwise the delete count
    reported by Redis (``0`` when nothing was reported).
    """
    if not self.config.enable_redis:
        return None

    async def _delete(conn: Redis) -> int:
        target = f"{self.config.redis_prefix}{namespace}:{key}"
        removed = await conn.delete(target)
        return 0 if removed is None else int(removed)

    outcome = await self.safe_operation(_delete)
    return 0 if outcome is None else int(outcome)

delete_pattern(pattern) async

Source code in guard_core/handlers/redis_handler.py
async def delete_pattern(self, pattern: str) -> int | None:
    """Delete every key matching ``<redis_prefix><pattern>``.

    Returns ``None`` when Redis is disabled, otherwise the number of keys
    Redis reports as deleted (``0`` when nothing matched).
    """
    if not self.config.enable_redis:
        return None

    async def _delete_pattern(conn: Redis) -> int:
        matched = await conn.keys(f"{self.config.redis_prefix}{pattern}")
        if matched:
            removed = await conn.delete(*matched)
            return 0 if removed is None else int(removed)
        return 0

    outcome = await self.safe_operation(_delete_pattern)
    return 0 if outcome is None else int(outcome)

exists(namespace, key) async

Source code in guard_core/handlers/redis_handler.py
async def exists(self, namespace: str, key: str) -> bool | None:
    """Report whether ``<redis_prefix><namespace>:<key>`` exists.

    Returns ``None`` when Redis is disabled, otherwise a boolean.
    """
    if not self.config.enable_redis:
        return None

    async def _exists(conn: Redis) -> bool:
        hits = await conn.exists(f"{self.config.redis_prefix}{namespace}:{key}")
        return bool(hits)

    found = await self.safe_operation(_exists)
    return bool(found) if found is not None else False

get_connection() async

Source code in guard_core/handlers/redis_handler.py
@asynccontextmanager
async def get_connection(self) -> AsyncIterator[Redis]:
    """Yield the Redis client, initializing it on first use.

    Raises:
        GuardRedisError: with status 503 when the manager has been closed,
            when no client exists after ``initialize()``, or when a
            ``ConnectionError``/``AttributeError`` is raised during setup or
            inside the caller's ``async with`` body.
    """
    try:
        # A closed manager never reconnects; report the misuse and refuse.
        if self._closed:
            await self._send_redis_event(
                event_type="redis_error",
                action_taken="operation_failed",
                reason="Attempted to use closed Redis connection",
                error_type="connection_closed",
            )
            raise GuardRedisError(503, "Redis connection closed")

        # Lazy connect on first use.
        if not self._redis:
            await self.initialize()

        # initialize() returns without a client when Redis is disabled or no
        # URL is configured, so the client must be re-checked here.
        if self._redis is None:
            await self._send_redis_event(
                event_type="redis_error",
                action_taken="operation_failed",
                reason="Redis connection is None after initialization",
                error_type="initialization_failed",
            )
            raise GuardRedisError(503, "Redis connection failed")

        # The yield sits inside the try, so exceptions raised by the caller's
        # block are also routed through the handler below.
        yield self._redis
    except (ConnectionError, AttributeError) as e:
        self.logger.error(f"Redis operation failed: {str(e)}")

        await self._send_redis_event(
            event_type="redis_error",
            action_taken="operation_failed",
            reason=f"Redis operation failed: {str(e)}",
            error_type="operation_error",
        )

        raise GuardRedisError(503, "Redis connection failed") from e

get_key(namespace, key) async

Source code in guard_core/handlers/redis_handler.py
async def get_key(self, namespace: str, key: str) -> Any:
    """Fetch the value stored at ``<redis_prefix><namespace>:<key>``.

    Returns ``None`` when Redis is disabled; otherwise whatever the Redis
    ``GET`` yields, passed through ``safe_operation``.
    """
    if self.config.enable_redis:

        async def _get(conn: Redis) -> Any:
            return await conn.get(f"{self.config.redis_prefix}{namespace}:{key}")

        return await self.safe_operation(_get)

    return None

incr(namespace, key, ttl=None) async

Source code in guard_core/handlers/redis_handler.py
async def incr(
    self, namespace: str, key: str, ttl: int | None = None
) -> int | None:
    """Increment the counter at ``<redis_prefix><namespace>:<key>``.

    The increment and, when ``ttl`` is truthy, an ``EXPIRE`` are queued on one
    pipeline. Returns ``None`` when Redis is disabled, otherwise the first
    pipeline reply as an int (``0`` when there is none).
    """
    if not self.config.enable_redis:
        return None

    async def _incr(conn: Redis) -> int:
        counter_key = f"{self.config.redis_prefix}{namespace}:{key}"
        async with conn.pipeline() as pipe:
            await pipe.incr(counter_key)
            if ttl:
                await pipe.expire(counter_key, ttl)
            replies = await pipe.execute()
        # replies[0] is the reply to the queued INCR.
        return int(replies[0]) if replies else 0

    outcome = await self.safe_operation(_incr)
    return 0 if outcome is None else int(outcome)

initialize() async

Source code in guard_core/handlers/redis_handler.py
async def initialize(self) -> None:
    """Create the Redis client from ``config.redis_url`` and verify it.

    When the manager is closed or Redis is disabled the client is cleared and
    nothing else happens. Otherwise, under ``_connection_lock``, a client is
    built with ``decode_responses=True`` and pinged, and a
    ``connection_established`` event is sent. A ``None`` URL only logs a
    warning.

    Raises:
        GuardRedisError: with status 503 when creating or pinging the client
            fails; the client is cleared first.
    """
    # NOTE(review): this check runs before the lock is taken and the existing
    # client is not re-checked inside it, so concurrent callers each appear to
    # build their own client - confirm whether that is intended.
    if self._closed or not self.config.enable_redis:
        self._redis = None
        return

    async with self._connection_lock:
        try:
            if self.config.redis_url is not None:
                self._redis = Redis.from_url(
                    self.config.redis_url, decode_responses=True
                )
                # Presumably always true right after from_url(); kept as-is.
                if self._redis is not None:
                    await self._redis.ping()
                    self.logger.info("Redis connection established")

                    # NOTE(review): redis_url may embed credentials; confirm
                    # the event sink is allowed to receive it unredacted.
                    await self._send_redis_event(
                        event_type="redis_connection",
                        action_taken="connection_established",
                        reason="Redis connection successfully established",
                        redis_url=self.config.redis_url,
                    )
            else:
                self.logger.warning("Redis URL is None, skipping connection")

        except Exception as e:
            self.logger.error(f"Redis connection failed: {str(e)}")

            await self._send_redis_event(
                event_type="redis_error",
                action_taken="connection_failed",
                reason=f"Redis connection failed: {str(e)}",
                redis_url=self.config.redis_url,
                error_type="connection_error",
            )

            # Drop the half-initialized client so later calls can retry.
            self._redis = None
            raise GuardRedisError(503, "Redis connection failed") from e

initialize_agent(agent_handler) async

Source code in guard_core/handlers/redis_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler`` on this manager for later use."""
    self.agent_handler = agent_handler

keys(pattern) async

Source code in guard_core/handlers/redis_handler.py
async def keys(self, pattern: str) -> list[str] | None:
    """List keys matching ``<redis_prefix><pattern>``.

    Returns ``None`` when Redis is disabled, otherwise a list of key names as
    strings (empty when nothing matches). Returned names include the prefix.
    """
    if not self.config.enable_redis:
        return None

    async def _keys(conn: Redis) -> list[str]:
        found = await conn.keys(f"{self.config.redis_prefix}{pattern}")
        return list(map(str, found or ()))

    listing = await self.safe_operation(_keys)
    return [] if listing is None else listing

safe_operation(func, *args, **kwargs) async

Source code in guard_core/handlers/redis_handler.py
async def safe_operation(self, func: Any, *args: Any, **kwargs: Any) -> Any:
    """Run ``func(conn, *args, **kwargs)`` on a managed Redis connection.

    Returns ``None`` without calling ``func`` when Redis is disabled,
    otherwise ``func``'s result.

    Raises:
        GuardRedisError: with status 503 for any exception raised while
            acquiring the connection or running ``func``; the failure is
            logged and reported with the callable's ``__name__`` first.
    """
    if not self.config.enable_redis:
        return None

    try:
        async with self.get_connection() as conn:
            outcome = await func(conn, *args, **kwargs)
        return outcome
    except Exception as e:
        self.logger.error(f"Redis operation failed: {str(e)}")

        failed_callable = getattr(func, "__name__", "unknown")
        await self._send_redis_event(
            event_type="redis_error",
            action_taken="safe_operation_failed",
            reason=f"Redis safe operation failed: {str(e)}",
            error_type="safe_operation_error",
            function_name=failed_callable,
        )

        raise GuardRedisError(503, "Redis operation failed") from e

set_key(namespace, key, value, ttl=None) async

Source code in guard_core/handlers/redis_handler.py
async def set_key(
    self, namespace: str, key: str, value: Any, ttl: int | None = None
) -> bool | None:
    """Store ``value`` at ``<redis_prefix><namespace>:<key>``.

    A truthy ``ttl`` stores the value with that expiry in seconds (``SETEX``);
    otherwise a plain ``SET`` is used. Returns ``None`` when Redis is
    disabled, otherwise the Redis reply as a boolean.
    """
    if not self.config.enable_redis:
        return None

    async def _set(conn: Redis) -> bool:
        full_key = f"{self.config.redis_prefix}{namespace}:{key}"
        if not ttl:
            reply = await conn.set(full_key, value)
        else:
            reply = await conn.setex(full_key, ttl, value)
        return bool(reply)

    stored = await self.safe_operation(_set)
    return bool(stored) if stored is not None else False

SecurityHeadersManager

Applies HTTP security headers (CSP, HSTS, CORS, and defaults) to responses.

guard_core.handlers.security_headers_handler.SecurityHeadersManager

agent_handler = None class-attribute instance-attribute

cors_config instance-attribute

csp_config instance-attribute

custom_headers instance-attribute

default_headers = {'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'SAMEORIGIN', 'X-XSS-Protection': '1; mode=block', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'Permissions-Policy': 'geolocation=(), microphone=(), camera=()', 'X-Permitted-Cross-Domain-Policies': 'none', 'X-Download-Options': 'noopen', 'Cross-Origin-Embedder-Policy': 'require-corp', 'Cross-Origin-Opener-Policy': 'same-origin', 'Cross-Origin-Resource-Policy': 'same-origin'} class-attribute instance-attribute

enabled instance-attribute

headers_cache instance-attribute

hsts_config instance-attribute

logger instance-attribute

redis_handler = None class-attribute instance-attribute

configure(*, enabled=True, csp=None, hsts_max_age=None, hsts_include_subdomains=True, hsts_preload=False, frame_options=None, content_type_options=None, xss_protection=None, referrer_policy=None, permissions_policy='UNSET', custom_headers=None, cors_origins=None, cors_allow_credentials=False, cors_allow_methods=None, cors_allow_headers=None)

Source code in guard_core/handlers/security_headers_handler.py
def configure(
    self,
    *,
    enabled: bool = True,
    csp: dict[str, list[str]] | None = None,
    hsts_max_age: int | None = None,
    hsts_include_subdomains: bool = True,
    hsts_preload: bool = False,
    frame_options: str | None = None,
    content_type_options: str | None = None,
    xss_protection: str | None = None,
    referrer_policy: str | None = None,
    permissions_policy: str | None = "UNSET",
    custom_headers: dict[str, str] | None = None,
    cors_origins: list[str] | None = None,
    cors_allow_credentials: bool = False,
    cors_allow_methods: list[str] | None = None,
    cors_allow_headers: list[str] | None = None,
) -> None:
    """Configure which security headers this manager emits.

    All arguments are keyword-only. ``enabled`` is stored directly; the rest
    are handed to the private CSP, HSTS, CORS, default-header and
    custom-header helpers in that order.

    ``permissions_policy`` defaults to the string ``"UNSET"`` rather than
    ``None`` - presumably so ``None`` can carry its own meaning; confirm in
    ``_update_default_headers``.
    """
    self.enabled = enabled

    self._configure_csp(csp)
    self._configure_hsts(hsts_max_age, hsts_include_subdomains, hsts_preload)
    self._configure_cors(
        cors_origins, cors_allow_credentials, cors_allow_methods, cors_allow_headers
    )
    self._update_default_headers(
        frame_options,
        content_type_options,
        xss_protection,
        referrer_policy,
        permissions_policy,
    )
    # NOTE(review): headers_cache is not cleared here; verify the helpers
    # invalidate it, otherwise previously cached headers may be served.
    self._add_custom_headers(custom_headers)

get_cors_headers(origin) async

Source code in guard_core/handlers/security_headers_handler.py
async def get_cors_headers(self, origin: str) -> dict[str, str]:
    """Build the CORS response headers for ``origin``.

    Returns an empty dict when CORS is not configured, when the configured
    origins are not a list, when the wildcard-with-credentials check fires,
    or when ``origin`` is not allowed.
    """
    cors = self.cors_config
    if not cors:
        return {}

    allowed_origins = cors.get("origins", [])
    # Short-circuit evaluation keeps the checks in their original order.
    rejected = (
        not isinstance(allowed_origins, list)
        or self._is_wildcard_with_credentials(allowed_origins)
        or not self._is_origin_allowed(origin, allowed_origins)
    )
    if rejected:
        return {}

    allow_methods, allow_headers = self._get_validated_cors_config()
    return self._build_cors_headers(
        origin, allowed_origins, allow_methods, allow_headers
    )

get_headers(request_path=None) async

Source code in guard_core/handlers/security_headers_handler.py
async def get_headers(self, request_path: str | None = None) -> dict[str, str]:
    if not self.enabled:
        return {}

    cache_key = self._generate_cache_key(request_path)
    if cache_key in self.headers_cache:
        cached = self.headers_cache[cache_key]
        if isinstance(cached, dict):
            return cached

    headers = self.default_headers.copy()

    if self.csp_config:
        headers["Content-Security-Policy"] = self._build_csp(self.csp_config)

    if self.hsts_config:
        headers["Strict-Transport-Security"] = self._build_hsts(self.hsts_config)

    headers.update(self.custom_headers)

    self.headers_cache[cache_key] = headers

    if self.agent_handler and request_path:
        await self._send_headers_applied_event(request_path, headers)

    return headers

initialize_agent(agent_handler) async

Source code in guard_core/handlers/security_headers_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler``; header and CSP events are only sent when set."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/security_headers_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Attach ``redis_handler``, then load and re-cache the configuration.

    ``_load_cached_config`` runs before ``_cache_configuration``.
    """
    self.redis_handler = redis_handler
    await self._load_cached_config()
    await self._cache_configuration()

reset() async

Source code in guard_core/handlers/security_headers_handler.py
async def reset(self) -> None:
    """Restore the manager to its unconfigured state.

    Clears the header cache and custom headers, drops the CSP/HSTS/CORS
    configuration, re-enables the manager and restores a fresh copy of the
    class-level default headers. When a Redis handler is attached, keys
    matching ``<redis_prefix>security_headers:*`` are deleted; a Redis
    failure is logged as a warning and not re-raised.
    """
    self.headers_cache.clear()
    self.custom_headers.clear()
    self.csp_config = self.hsts_config = self.cors_config = None
    self.enabled = True
    # Copy so later per-instance edits never touch the class-level defaults.
    self.default_headers = self.__class__.default_headers.copy()

    handler = self.redis_handler
    if not handler:
        return

    try:
        async with handler.get_connection() as conn:
            pattern = f"{handler.config.redis_prefix}security_headers:*"
            cached_keys = await conn.keys(pattern)
            if cached_keys:
                await conn.delete(*cached_keys)
    except Exception as e:
        self.logger.warning(f"Failed to clear Redis cache: {e}")

validate_csp_report(report) async

Source code in guard_core/handlers/security_headers_handler.py
async def validate_csp_report(self, report: dict[str, Any]) -> bool:
    """Validate a CSP violation report and record it.

    A report is valid when it is a dict whose ``"csp-report"`` entry is a
    dict containing ``document-uri``, ``violated-directive`` and
    ``blocked-uri``. Valid reports are logged as a warning and, when an agent
    handler is attached, forwarded as a CSP violation event.

    Returns:
        ``True`` for a valid report, ``False`` for anything else. Malformed
        payloads are rejected instead of raising.
    """
    required_fields = ("document-uri", "violated-directive", "blocked-uri")

    # The payload is client-supplied: the body or its "csp-report" entry may
    # be null, a string or a list, none of which can be inspected as a report.
    csp_report = report.get("csp-report") if isinstance(report, dict) else None
    if not isinstance(csp_report, dict):
        return False

    if not all(field in csp_report for field in required_fields):
        return False

    self.logger.warning(
        f"CSP Violation: {csp_report.get('violated-directive')} "
        f"blocked {csp_report.get('blocked-uri')} "
        f"on {csp_report.get('document-uri')}"
    )

    if self.agent_handler:
        await self._send_csp_violation_event(csp_report)

    return True

BehaviorTracker

Tracks endpoint usage patterns and response patterns for behavioral analysis.

guard_core.handlers.behavior_handler.BehaviorTracker(config)

Source code in guard_core/handlers/behavior_handler.py
def __init__(self, config: SecurityConfig):
    """Create a tracker bound to ``config`` with empty in-memory state.

    Redis and agent handlers start as ``None`` and are attached later through
    ``initialize_redis`` / ``initialize_agent``.
    """
    self.config = config
    self.logger = logging.getLogger("guard_core.handlers.behavior")
    # Two-level mapping to a list of request timestamps, created on access.
    self.usage_counts: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    # Same shape as usage_counts, holding timestamps of matched responses.
    self.return_patterns: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.redis_handler: Any | None = None
    self.agent_handler: Any | None = None

agent_handler = None instance-attribute

config = config instance-attribute

logger = logging.getLogger('guard_core.handlers.behavior') instance-attribute

redis_handler = None instance-attribute

return_patterns = defaultdict(lambda: defaultdict(list)) instance-attribute

usage_counts = defaultdict(lambda: defaultdict(list)) instance-attribute

apply_action(rule, client_ip, endpoint_id, details) async

Source code in guard_core/handlers/behavior_handler.py
async def apply_action(
    self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
    """React to a violated behavioral ``rule``.

    When an agent handler is attached a ``behavioral_violation`` event is sent
    first; its ``action_taken`` is ``"logged_only"`` in passive mode and the
    rule's action otherwise. Then the violation is only logged in passive
    mode, or the rule's action is executed in active mode.
    """
    if self.agent_handler:
        if self.config.passive_mode:
            reported_action = "logged_only"
        else:
            reported_action = rule.action

        await self._send_behavior_event(
            event_type="behavioral_violation",
            ip_address=client_ip,
            action_taken=reported_action,
            reason=f"Behavioral rule violated: {details}",
            endpoint=endpoint_id,
            rule_type=rule.rule_type,
            threshold=rule.threshold,
            window=rule.window,
        )

    if not self.config.passive_mode:
        await self._execute_active_mode_action(
            rule, client_ip, endpoint_id, details
        )
        return

    self._log_passive_mode_action(rule, client_ip, details)

initialize_agent(agent_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler``; violation events are only sent when it is set."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Store ``redis_handler``; tracking uses Redis instead of memory when set."""
    self.redis_handler = redis_handler

track_endpoint_usage(endpoint_id, client_ip, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_endpoint_usage(
    self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
    """Record one call to ``endpoint_id`` by ``client_ip`` and test the rule.

    With a Redis handler attached, each call is stored as its own key whose
    name ends in the call timestamp and which expires after ``rule.window``
    seconds; the keys inside the window are then counted. If the handler
    reports no key listing at all (``None``, which it returns when Redis is
    disabled), or without a handler, per-process timestamp lists are used
    instead.

    Returns:
        ``True`` when the number of calls in the window exceeds
        ``rule.threshold``.
    """
    current_time = time.time()
    window_start = current_time - rule.window

    if self.redis_handler:
        key = f"behavior:usage:{endpoint_id}:{client_ip}"

        await self.redis_handler.set_key(
            "behavior_usage", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern = f"behavior_usage:{key}:*"
        keys = await self.redis_handler.keys(pattern)

        # keys() yields None when Redis is disabled; iterating that would
        # raise TypeError, so fall through to the in-memory path instead.
        if keys is not None:
            valid_count = 0
            for key_name in keys:
                try:
                    # The timestamp is the final ":"-separated segment.
                    timestamp = float(key_name.split(":")[-1])
                except (ValueError, IndexError):
                    continue
                if timestamp >= window_start:
                    valid_count += 1

            return valid_count > rule.threshold

    timestamps = self.usage_counts[endpoint_id][client_ip]

    # Prune in place so the stored list itself shrinks.
    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

track_return_pattern(endpoint_id, client_ip, response, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_return_pattern(
    self,
    endpoint_id: str,
    client_ip: str,
    response: GuardResponse,
    rule: BehaviorRule,
) -> bool:
    """Record a response matching ``rule.pattern`` and test the rule.

    Nothing is recorded, and ``False`` is returned, when the rule has no
    pattern or the response does not match it. Otherwise the match is stored
    - in Redis as one key per match, expiring after ``rule.window`` seconds,
    when a handler is attached and returns a key listing; in per-process
    timestamp lists when there is no handler or the handler returns ``None``
    (Redis disabled).

    Returns:
        ``True`` when the number of matches in the window exceeds
        ``rule.threshold``.
    """
    if not rule.pattern:
        return False

    current_time = time.time()
    window_start = current_time - rule.window

    pattern_matched = await self._check_response_pattern(response, rule.pattern)

    if not pattern_matched:
        return False

    if self.redis_handler:
        key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"

        await self.redis_handler.set_key(
            "behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
        )

        keys = await self.redis_handler.keys(f"behavior_returns:{key}:*")

        # keys() yields None when Redis is disabled; iterating that would
        # raise TypeError, so fall through to the in-memory path instead.
        if keys is not None:
            valid_count = 0
            for key_name in keys:
                try:
                    # The timestamp is the final ":"-separated segment.
                    timestamp = float(key_name.split(":")[-1])
                except (ValueError, IndexError):
                    continue
                if timestamp >= window_start:
                    valid_count += 1

            return valid_count > rule.threshold

    pattern_key = f"{endpoint_id}:{rule.pattern}"
    timestamps = self.return_patterns[pattern_key][client_ip]

    # Prune in place so the stored list itself shrinks.
    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

SusPatternsManager

Orchestrates the detection engine for threat pattern matching and semantic analysis.

guard_core.handlers.suspatterns_handler.SusPatternsManager

agent_handler instance-attribute

compiled_custom_patterns instance-attribute

compiled_patterns instance-attribute

custom_patterns instance-attribute

patterns = [(p[0]) for p in _pattern_definitions] class-attribute instance-attribute

redis_handler instance-attribute

add_pattern(pattern, custom=False) async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def add_pattern(cls, pattern: str, custom: bool = False) -> None:
    """Compile ``pattern`` and register it with the detection engine.

    The pattern is compiled case-insensitively in multiline mode. Custom
    patterns go into the custom sets and, when a Redis handler is attached,
    the full custom set is written to Redis as one comma-joined string;
    non-custom patterns are appended to the default lists. The compiler cache
    is cleared when a compiler exists, and a ``pattern_added`` event is sent
    when an agent handler is attached.

    An invalid expression raises ``re.error`` before anything is registered.
    """
    instance = cls()

    entry = (re.compile(pattern, re.IGNORECASE | re.MULTILINE), _CTX_ALL)
    if not custom:
        instance.compiled_patterns.append(entry)
        instance.patterns.append(pattern)
    else:
        instance.compiled_custom_patterns.add(entry)
        instance.custom_patterns.add(pattern)

        if instance.redis_handler:
            # NOTE(review): a pattern that itself contains "," cannot be told
            # apart after joining - confirm how the reader splits this value.
            serialized = ",".join(instance.custom_patterns)
            await instance.redis_handler.set_key("patterns", "custom", serialized)

    if instance._compiler:
        await instance._compiler.clear_cache()

    if instance.agent_handler:
        details = f"{'Custom' if custom else 'Default'} pattern added"
        if custom:
            pattern_type = "custom"
            total_patterns = len(instance.custom_patterns)
        else:
            pattern_type = "default"
            total_patterns = len(instance.patterns)

        await instance._send_pattern_event(
            event_type="pattern_added",
            ip_address="system",
            action_taken="pattern_added",
            reason=f"{details} to detection system",
            pattern=pattern,
            pattern_type=pattern_type,
            total_patterns=total_patterns,
        )

configure_semantic_threshold(threshold) async

Source code in guard_core/handlers/suspatterns_handler.py
async def configure_semantic_threshold(self, threshold: float) -> None:
    """Store the semantic threshold, limited to at most 1.0 and at least 0.0."""
    upper_bounded = min(1.0, threshold)
    self._semantic_threshold = max(0.0, upper_bounded)

detect(content, ip_address, context='unknown', correlation_id=None) async

Source code in guard_core/handlers/suspatterns_handler.py
async def detect(
    self,
    content: str,
    ip_address: str,
    context: str = "unknown",
    correlation_id: str | None = None,
) -> dict[str, Any]:
    original_content = content
    execution_start = time.time()

    processed_content = await self._preprocess_content(content, correlation_id)

    regex_threats, matched_patterns, timeouts = await self._check_regex_patterns(
        processed_content, ip_address, correlation_id, context
    )

    semantic_threats, semantic_score = await self._check_semantic_threats(
        processed_content
    )

    threats = regex_threats + semantic_threats
    is_threat = len(threats) > 0

    threat_score = await self._calculate_threat_score(
        regex_threats, semantic_threats
    )

    total_execution_time = time.time() - execution_start

    if self._performance_monitor:
        await self._performance_monitor.record_metric(
            pattern="overall_detection",
            execution_time=total_execution_time,
            content_length=len(content),
            matched=is_threat,
            timeout=False,
            agent_handler=self.agent_handler,
            correlation_id=correlation_id,
        )

    if is_threat:
        await self._send_threat_event(
            matched_patterns,
            semantic_threats,
            ip_address,
            context,
            content,
            threat_score,
            threats,
            regex_threats,
            timeouts,
            total_execution_time,
            correlation_id,
        )

    return {
        "is_threat": is_threat,
        "threat_score": threat_score,
        "threats": threats,
        "context": context,
        "original_length": len(original_content),
        "processed_length": len(processed_content),
        "execution_time": total_execution_time,
        "detection_method": "enhanced" if self._compiler else "legacy",
        "timeouts": timeouts,
        "correlation_id": correlation_id,
    }

detect_pattern_match(content, ip_address, context='unknown', correlation_id=None) async

Source code in guard_core/handlers/suspatterns_handler.py
async def detect_pattern_match(
    self,
    content: str,
    ip_address: str,
    context: str = "unknown",
    correlation_id: str | None = None,
) -> tuple[bool, str | None]:
    result = await self.detect(content, ip_address, context, correlation_id)

    if result["is_threat"]:
        if result["threats"]:
            threat = result["threats"][0]
            if threat["type"] == "regex":
                return True, threat["pattern"]
            elif threat["type"] == "semantic":
                return True, f"semantic:{threat.get('attack_type', 'suspicious')}"
        return True, "unknown"

    return False, None

get_all_compiled_patterns() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_all_compiled_patterns(
    cls,
) -> list[tuple[re.Pattern, frozenset[str]]]:
    """Return a new list of default compiled entries followed by custom ones."""
    manager = cls()
    return [*manager.compiled_patterns, *manager.compiled_custom_patterns]

get_all_patterns() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_all_patterns(cls) -> list[str]:
    """Return a new list of default pattern strings followed by custom ones."""
    manager = cls()
    return [*manager.patterns, *manager.custom_patterns]

get_component_status() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_component_status(cls) -> dict[str, bool]:
    """Report which optional components are set on the instance.

    Returns:
        Mapping of component name to ``True`` when the matching private
        attribute (the name prefixed with ``_``) is not ``None``.
    """
    manager = cls()
    component_names = (
        "compiler",
        "preprocessor",
        "semantic_analyzer",
        "performance_monitor",
    )
    return {
        name: getattr(manager, f"_{name}") is not None for name in component_names
    }

get_custom_compiled_patterns() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_custom_compiled_patterns(
    cls,
) -> list[tuple[re.Pattern, frozenset[str]]]:
    """Return the custom compiled entries as a new list."""
    manager = cls()
    return [*manager.compiled_custom_patterns]

get_custom_patterns() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_custom_patterns(cls) -> list[str]:
    """Return the custom pattern strings as a new list."""
    manager = cls()
    return [*manager.custom_patterns]

get_default_compiled_patterns() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_default_compiled_patterns(
    cls,
) -> list[tuple[re.Pattern, frozenset[str]]]:
    """Return a shallow copy of the default compiled entries."""
    manager = cls()
    return list(manager.compiled_patterns)

get_default_patterns() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_default_patterns(cls) -> list[str]:
    """Return a shallow copy of the default pattern strings."""
    manager = cls()
    return list(manager.patterns)

get_performance_stats() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def get_performance_stats(cls) -> dict[str, Any] | None:
    instance = cls()
    if instance._performance_monitor:
        return {
            "summary": instance._performance_monitor.get_summary_stats(),
            "slow_patterns": instance._performance_monitor.get_slow_patterns(),
            "problematic_patterns": (
                instance._performance_monitor.get_problematic_patterns()
            ),
        }
    return None

initialize_agent(agent_handler) async

Source code in guard_core/handlers/suspatterns_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Attach ``agent_handler`` to this manager; no other work is performed."""
    setattr(self, "agent_handler", agent_handler)

initialize_redis(redis_handler) async

Source code in guard_core/handlers/suspatterns_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Attach a Redis handler and load any custom patterns it has stored.

    The value under ``("patterns", "custom")`` is read as a comma-separated
    string. Each fragment not already in ``custom_patterns`` is registered via
    ``add_pattern(..., custom=True)``.

    Args:
        redis_handler: Handler exposing an awaitable ``get_key``; a falsy
            value is stored as-is and no patterns are loaded.
    """
    self.redis_handler = redis_handler
    if not self.redis_handler:
        return

    cached_patterns = await self.redis_handler.get_key("patterns", "custom")
    if not cached_patterns:
        return

    for pattern in cached_patterns.split(","):
        # A trailing or doubled comma yields an empty fragment. Registering it
        # would compile a regex that matches every input, so skip it.
        if not pattern or pattern in self.custom_patterns:
            continue
        await self.add_pattern(pattern, custom=True)

remove_pattern(pattern, custom=False) async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def remove_pattern(cls, pattern: str, custom: bool = False) -> bool:
    """Remove ``pattern`` from the custom or default collection.

    When the removal helper reports success, the pattern caches are cleared
    and a removal event is sent with the remaining pattern count for the
    affected collection.

    Args:
        pattern: Pattern string to remove.
        custom: Selects the custom collection when true, the default one
            otherwise.

    Returns:
        The value returned by the removal helper.
    """
    manager = cls()

    remover = (
        manager._remove_custom_pattern
        if custom
        else manager._remove_default_pattern
    )
    was_removed = await remover(pattern)

    if was_removed:
        await manager._clear_pattern_caches(pattern)

        remaining = manager.custom_patterns if custom else manager.patterns
        await manager._send_pattern_removal_event(pattern, custom, len(remaining))

    return was_removed

reset() async classmethod

Source code in guard_core/handlers/suspatterns_handler.py
@classmethod
async def reset(cls) -> None:
    """Clear custom patterns, handlers and caches on the existing instance.

    Nothing on the instance is touched when ``cls._instance`` is ``None``.
    ``cls._config`` is set to ``None`` in every case.
    """
    current = cls._instance
    if current is not None:
        current.custom_patterns.clear()
        current.compiled_custom_patterns.clear()

        current.redis_handler = None
        current.agent_handler = None

        # Optional components may be missing or unset on the instance.
        compiler = getattr(current, "_compiler", None)
        if compiler:
            await compiler.clear_cache()

        monitor = getattr(current, "_performance_monitor", None)
        if monitor:
            monitor.pattern_stats.clear()
            monitor.recent_metrics.clear()

    cls._config = None