Skip to content

Handlers

Guard-core handlers are singleton services that manage specific security subsystems. Each handler supports optional Redis and agent integration.


IPBanManager

Manages banned IP addresses with a dual-layer cache (TTLCache + Redis).

class IPBanManager:
    banned_ips: TTLCache
    redis_handler: Any
    agent_handler: Any

    async def initialize_redis(self, redis_handler: Any) -> None:
        """
        Set the Redis handler for distributed ban storage.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    async def ban_ip(
        self, ip: str, duration: int, reason: str = "threshold_exceeded"
    ) -> None:
        """
        Ban an IP for the given duration in seconds.
        """

    async def unban_ip(self, ip: str) -> None:
        """
        Remove a ban for the given IP.
        """

    async def is_ip_banned(self, ip: str) -> bool:
        """
        Check whether an IP is currently banned.
        """

    async def reset(self) -> None:
        """
        Clear all bans from local cache and Redis.
        """

RateLimitManager

Implements sliding window rate limiting with in-memory and Redis backends.

class RateLimitManager:
    config: SecurityConfig
    request_timestamps: defaultdict[str, deque[float]]
    logger: logging.Logger
    redis_handler: Any
    agent_handler: Any
    rate_limit_script_sha: str | None

    def __new__(
        cls: type["RateLimitManager"], config: SecurityConfig
    ) -> "RateLimitManager":
        """
        Singleton constructor. Accepts a SecurityConfig instance.
        """

    async def initialize_redis(self, redis_handler: Any) -> None:
        """
        Set the Redis handler and load the Lua rate-limiting script.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    async def check_rate_limit(
        self,
        request: GuardRequest,
        client_ip: str,
        create_error_response: Callable[[int, str], Awaitable[GuardResponse]],
        endpoint_path: str = "",
        rate_limit: int | None = None,
        rate_limit_window: int | None = None,
    ) -> GuardResponse | None:
        """
        Check whether the client has exceeded the rate limit.
        Returns a 429 response if exceeded, None otherwise.
        """

    async def reset(self) -> None:
        """
        Clear all in-memory timestamps and Redis rate-limit keys.
        """

CloudManager

Fetches and caches IP ranges for AWS, GCP, and Azure cloud providers.

class CloudManager:
    ip_ranges: dict[str, set[IPv4Network | IPv6Network]]
    last_updated: dict[str, datetime | None]
    redis_handler: Any
    agent_handler: Any
    logger: logging.Logger

    async def initialize_redis(
        self,
        redis_handler: Any,
        providers: set[str] = {"AWS", "GCP", "Azure"},
        ttl: int = 3600,
    ) -> None:
        """
        Set the Redis handler and refresh cloud IP ranges with caching.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    async def refresh(
        self, providers: set[str] = {"AWS", "GCP", "Azure"}
    ) -> None:
        """
        Refresh IP ranges without Redis. Raises RuntimeError if Redis is enabled.
        """

    async def refresh_async(
        self,
        providers: set[str] = {"AWS", "GCP", "Azure"},
        ttl: int = 3600,
    ) -> None:
        """
        Refresh IP ranges with optional Redis caching.
        """

    def is_cloud_ip(
        self, ip: str, providers: set[str] = {"AWS", "GCP", "Azure"}
    ) -> bool:
        """
        Check whether an IP belongs to any of the given cloud providers.
        """

    def get_cloud_provider_details(
        self, ip: str, providers: set[str] = {"AWS", "GCP", "Azure"}
    ) -> tuple[str, str] | None:
        """
        Return (provider, network) for the IP, or None.
        """

    async def send_cloud_detection_event(
        self,
        ip: str,
        provider: str,
        network: str,
        action_taken: str = "request_blocked",
    ) -> None:
        """
        Send a cloud-blocked telemetry event via the agent handler.
        """

RedisManager

Provides namespaced Redis operations with connection management and fault tolerance.

class RedisManager:
    config: SecurityConfig
    logger: logging.Logger
    agent_handler: Any

    def __new__(
        cls: type["RedisManager"], config: SecurityConfig
    ) -> "RedisManager":
        """
        Constructor. Accepts a SecurityConfig instance.
        """

    async def initialize(self) -> None:
        """
        Establish the Redis connection. Raises GuardRedisError on failure.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    async def close(self) -> None:
        """
        Close the Redis connection gracefully.
        """

    @asynccontextmanager
    async def get_connection(self) -> AsyncIterator[Redis]:
        """
        Context manager yielding a live Redis connection.
        """

    async def safe_operation(
        self, func: Any, *args: Any, **kwargs: Any
    ) -> Any:
        """
        Execute a function with an auto-managed Redis connection.
        """

    async def get_key(self, namespace: str, key: str) -> Any:
        """
        Get a namespaced key value.
        """

    async def set_key(
        self, namespace: str, key: str, value: Any, ttl: int | None = None
    ) -> bool | None:
        """
        Set a namespaced key with optional TTL.
        """

    async def incr(
        self, namespace: str, key: str, ttl: int | None = None
    ) -> int | None:
        """
        Atomically increment a namespaced key.
        """

    async def exists(self, namespace: str, key: str) -> bool | None:
        """
        Check whether a namespaced key exists.
        """

    async def delete(self, namespace: str, key: str) -> int | None:
        """
        Delete a namespaced key.
        """

    async def keys(self, pattern: str) -> list[str] | None:
        """
        List keys matching a namespaced pattern.
        """

    async def delete_pattern(self, pattern: str) -> int | None:
        """
        Delete all keys matching a namespaced pattern.
        """

SecurityHeadersManager

Applies HTTP security headers (CSP, HSTS, CORS, and defaults) to responses.

class SecurityHeadersManager:
    headers_cache: TTLCache
    redis_handler: Any
    agent_handler: Any
    logger: logging.Logger
    enabled: bool
    custom_headers: dict[str, str]
    csp_config: dict[str, list[str]] | None
    hsts_config: dict[str, Any] | None
    cors_config: dict[str, Any] | None
    default_headers: dict[str, str]

    async def initialize_redis(self, redis_handler: Any) -> None:
        """
        Set the Redis handler and load/cache header configuration.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    def configure(
        self,
        *,
        enabled: bool = True,
        csp: dict[str, list[str]] | None = None,
        hsts_max_age: int | None = None,
        hsts_include_subdomains: bool = True,
        hsts_preload: bool = False,
        frame_options: str | None = None,
        content_type_options: str | None = None,
        xss_protection: str | None = None,
        referrer_policy: str | None = None,
        permissions_policy: str | None = "UNSET",
        custom_headers: dict[str, str] | None = None,
        cors_origins: list[str] | None = None,
        cors_allow_credentials: bool = False,
        cors_allow_methods: list[str] | None = None,
        cors_allow_headers: list[str] | None = None,
    ) -> None:
        """
        Configure all security header subsystems at once.
        """

    async def get_headers(
        self, request_path: str | None = None
    ) -> dict[str, str]:
        """
        Build and return the full set of security headers for a request path.
        """

    async def get_cors_headers(self, origin: str) -> dict[str, str]:
        """
        Return CORS headers for the given origin, or empty dict if disallowed.
        """

    async def validate_csp_report(
        self, report: dict[str, Any]
    ) -> bool:
        """
        Validate and log a CSP violation report. Returns True if valid.
        """

    async def reset(self) -> None:
        """
        Reset all header configuration and caches.
        """

BehaviorTracker

Tracks endpoint usage patterns and response patterns for behavioral analysis.

class BehaviorRule:
    def __init__(
        self,
        rule_type: Literal["usage", "return_pattern", "frequency"],
        threshold: int,
        window: int = 3600,
        pattern: str | None = None,
        action: Literal["ban", "log", "throttle", "alert"] = "log",
        custom_action: Callable | None = None,
    ):
        """
        Define a single behavioral analysis rule.
        """


class BehaviorTracker:
    def __init__(self, config: SecurityConfig):
        """
        Create a tracker bound to the given SecurityConfig.
        """

    async def initialize_redis(self, redis_handler: Any) -> None:
        """
        Set the Redis handler for distributed tracking.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    async def track_endpoint_usage(
        self, endpoint_id: str, client_ip: str, rule: BehaviorRule
    ) -> bool:
        """
        Record a usage event. Returns True if the threshold is exceeded.
        """

    async def track_return_pattern(
        self,
        endpoint_id: str,
        client_ip: str,
        response: GuardResponse,
        rule: BehaviorRule,
    ) -> bool:
        """
        Record a return-pattern match. Returns True if the threshold is exceeded.
        """

    async def apply_action(
        self,
        rule: BehaviorRule,
        client_ip: str,
        endpoint_id: str,
        details: str,
    ) -> None:
        """
        Execute the action defined by the rule (ban, log, throttle, or alert).
        """

SusPatternsManager

Orchestrates the detection engine for threat pattern matching and semantic analysis.

class SusPatternsManager:
    patterns: list[str]
    custom_patterns: set[str]
    compiled_patterns: list[tuple[re.Pattern, frozenset[str]]]
    compiled_custom_patterns: set[tuple[re.Pattern, frozenset[str]]]
    redis_handler: Any
    agent_handler: Any

    def __new__(
        cls: type["SusPatternsManager"], config: Any = None
    ) -> "SusPatternsManager":
        """
        Singleton constructor. Optionally accepts a config to initialize
        the detection engine components (compiler, preprocessor, semantic
        analyzer, performance monitor).
        """

    async def initialize_redis(self, redis_handler: Any) -> None:
        """
        Set the Redis handler and load cached custom patterns.
        """

    async def initialize_agent(self, agent_handler: Any) -> None:
        """
        Set the agent handler for telemetry events.
        """

    async def detect(
        self,
        content: str,
        ip_address: str,
        context: str = "unknown",
        correlation_id: str | None = None,
    ) -> dict[str, Any]:
        """
        Run full detection (regex + semantic) on content.
        Returns a result dict with is_threat, threat_score, threats, etc.
        """

    async def detect_pattern_match(
        self,
        content: str,
        ip_address: str,
        context: str = "unknown",
        correlation_id: str | None = None,
    ) -> tuple[bool, str | None]:
        """
        Simplified detection returning (is_threat, matched_pattern_or_None).
        """

    @classmethod
    async def add_pattern(
        cls, pattern: str, custom: bool = False
    ) -> None:
        """
        Add a regex pattern to the detection engine.
        """

    @classmethod
    async def remove_pattern(
        cls, pattern: str, custom: bool = False
    ) -> bool:
        """
        Remove a pattern. Returns True if it was found and removed.
        """

    @classmethod
    async def get_default_patterns(cls) -> list[str]: ...

    @classmethod
    async def get_custom_patterns(cls) -> list[str]: ...

    @classmethod
    async def get_all_patterns(cls) -> list[str]: ...

    @classmethod
    async def get_all_compiled_patterns(
        cls,
    ) -> list[tuple[re.Pattern, frozenset[str]]]: ...

    @classmethod
    async def get_performance_stats(cls) -> dict[str, Any] | None:
        """
        Return performance statistics if the monitor is active.
        """

    @classmethod
    async def get_component_status(cls) -> dict[str, bool]:
        """
        Return which detection engine components are active.
        """

    async def configure_semantic_threshold(
        self, threshold: float
    ) -> None:
        """
        Set the semantic detection threshold (clamped to 0.0-1.0).
        """

    @classmethod
    async def reset(cls) -> None:
        """
        Clear custom patterns, handlers, and caches.
        """