Skip to content

Redis Integration

Guard-core uses Redis for distributed state management across multiple application instances. The RedisManager handler provides connection management, namespaced key operations, and fault-tolerant wrappers.

RedisManager

guard_core.handlers.redis_handler.RedisManager

agent_handler = None class-attribute instance-attribute

config instance-attribute

logger instance-attribute

close() async

Source code in guard_core/handlers/redis_handler.py
async def close(self) -> None:
    if self._redis:
        await self._redis.aclose()
        self._redis = None
        self.logger.info("Redis connection closed")

        await self._send_redis_event(
            event_type="redis_connection",
            action_taken="connection_closed",
            reason="Redis connection closed gracefully",
        )
    self._closed = True

delete(namespace, key) async

Source code in guard_core/handlers/redis_handler.py
async def delete(self, namespace: str, key: str) -> int | None:
    if not self.config.enable_redis:
        return None

    async def _delete(conn: Redis) -> int:
        full_key = f"{self.config.redis_prefix}{namespace}:{key}"
        delete_result = await conn.delete(full_key)
        return int(delete_result) if delete_result is not None else 0

    result = await self.safe_operation(_delete)
    return int(result) if result is not None else 0

delete_pattern(pattern) async

Source code in guard_core/handlers/redis_handler.py
async def delete_pattern(self, pattern: str) -> int | None:
    if not self.config.enable_redis:
        return None

    async def _delete_pattern(conn: Redis) -> int:
        full_pattern = f"{self.config.redis_prefix}{pattern}"
        keys = await conn.keys(full_pattern)
        if not keys:
            return 0
        result = await conn.delete(*keys)
        return int(result) if result is not None else 0

    result = await self.safe_operation(_delete_pattern)
    return int(result) if result is not None else 0

exists(namespace, key) async

Source code in guard_core/handlers/redis_handler.py
async def exists(self, namespace: str, key: str) -> bool | None:
    if not self.config.enable_redis:
        return None

    async def _exists(conn: Redis) -> bool:
        full_key = f"{self.config.redis_prefix}{namespace}:{key}"
        return bool(await conn.exists(full_key))

    result = await self.safe_operation(_exists)
    return False if result is None else bool(result)

get_connection() async

Source code in guard_core/handlers/redis_handler.py
@asynccontextmanager
async def get_connection(self) -> AsyncIterator[Redis]:
    try:
        if self._closed:
            await self._send_redis_event(
                event_type="redis_error",
                action_taken="operation_failed",
                reason="Attempted to use closed Redis connection",
                error_type="connection_closed",
            )
            raise GuardRedisError(503, "Redis connection closed")

        if not self._redis:
            await self.initialize()

        if self._redis is None:
            await self._send_redis_event(
                event_type="redis_error",
                action_taken="operation_failed",
                reason="Redis connection is None after initialization",
                error_type="initialization_failed",
            )
            raise GuardRedisError(503, "Redis connection failed")

        yield self._redis
    except (ConnectionError, AttributeError) as e:
        self.logger.error(f"Redis operation failed: {str(e)}")

        await self._send_redis_event(
            event_type="redis_error",
            action_taken="operation_failed",
            reason=f"Redis operation failed: {str(e)}",
            error_type="operation_error",
        )

        raise GuardRedisError(503, "Redis connection failed") from e

get_key(namespace, key) async

Source code in guard_core/handlers/redis_handler.py
async def get_key(self, namespace: str, key: str) -> Any:
    if not self.config.enable_redis:
        return None

    async def _get(conn: Redis) -> Any:
        full_key = f"{self.config.redis_prefix}{namespace}:{key}"
        return await conn.get(full_key)

    return await self.safe_operation(_get)

incr(namespace, key, ttl=None) async

Source code in guard_core/handlers/redis_handler.py
async def incr(
    self, namespace: str, key: str, ttl: int | None = None
) -> int | None:
    if not self.config.enable_redis:
        return None

    async def _incr(conn: Redis) -> int:
        full_key = f"{self.config.redis_prefix}{namespace}:{key}"
        async with conn.pipeline() as pipe:
            await pipe.incr(full_key)
            if ttl:
                await pipe.expire(full_key, ttl)
            result = await pipe.execute()
            return int(result[0]) if result else 0

    result = await self.safe_operation(_incr)
    return int(result) if result is not None else 0

initialize() async

Source code in guard_core/handlers/redis_handler.py
async def initialize(self) -> None:
    if self._closed or not self.config.enable_redis:
        self._redis = None
        return

    async with self._connection_lock:
        try:
            if self.config.redis_url is not None:
                self._redis = Redis.from_url(
                    self.config.redis_url, decode_responses=True
                )
                if self._redis is not None:
                    await self._redis.ping()
                    self.logger.info("Redis connection established")

                    await self._send_redis_event(
                        event_type="redis_connection",
                        action_taken="connection_established",
                        reason="Redis connection successfully established",
                        redis_url=self.config.redis_url,
                    )
            else:
                self.logger.warning("Redis URL is None, skipping connection")

        except Exception as e:
            self.logger.error(f"Redis connection failed: {str(e)}")

            await self._send_redis_event(
                event_type="redis_error",
                action_taken="connection_failed",
                reason=f"Redis connection failed: {str(e)}",
                redis_url=self.config.redis_url,
                error_type="connection_error",
            )

            self._redis = None
            raise GuardRedisError(503, "Redis connection failed") from e

initialize_agent(agent_handler) async

Source code in guard_core/handlers/redis_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    self.agent_handler = agent_handler

keys(pattern) async

Source code in guard_core/handlers/redis_handler.py
async def keys(self, pattern: str) -> list[str] | None:
    if not self.config.enable_redis:
        return None

    async def _keys(conn: Redis) -> list[str]:
        full_pattern = f"{self.config.redis_prefix}{pattern}"
        keys = await conn.keys(full_pattern)
        return [str(k) for k in keys] if keys else []

    result = await self.safe_operation(_keys)
    return result if result is not None else []

safe_operation(func, *args, **kwargs) async

Source code in guard_core/handlers/redis_handler.py
async def safe_operation(self, func: Any, *args: Any, **kwargs: Any) -> Any:
    if not self.config.enable_redis:
        return None

    try:
        async with self.get_connection() as conn:
            return await func(conn, *args, **kwargs)
    except Exception as e:
        self.logger.error(f"Redis operation failed: {str(e)}")

        await self._send_redis_event(
            event_type="redis_error",
            action_taken="safe_operation_failed",
            reason=f"Redis safe operation failed: {str(e)}",
            error_type="safe_operation_error",
            function_name=getattr(func, "__name__", "unknown"),
        )

        raise GuardRedisError(503, "Redis operation failed") from e

set_key(namespace, key, value, ttl=None) async

Source code in guard_core/handlers/redis_handler.py
async def set_key(
    self, namespace: str, key: str, value: Any, ttl: int | None = None
) -> bool | None:
    if not self.config.enable_redis:
        return None

    async def _set(conn: Redis) -> bool:
        full_key = f"{self.config.redis_prefix}{namespace}:{key}"
        if ttl:
            return bool(await conn.setex(full_key, ttl, value))
        return bool(await conn.set(full_key, value))

    result = await self.safe_operation(_set)
    return False if result is None else bool(result)

Construction

class RedisManager:
    def __new__(cls, config: SecurityConfig) -> "RedisManager":
        cls._instance = super().__new__(cls)
        cls._instance.config = config
        cls._instance._closed = False
        cls._instance.agent_handler = None
        return cls._instance

Unlike other handlers, RedisManager creates a new instance on every construction. The redis_handler module-level variable is the class itself (not an instance), allowing deferred instantiation with a config.

Connection Lifecycle

Initialize:

redis_manager = RedisManager(config)
await redis_manager.initialize()

Creates a redis.asyncio.Redis connection from config.redis_url with decode_responses=True. Pings to verify connectivity. Raises GuardRedisError(503) on failure.

Close:

await redis_manager.close()

Closes the connection and sets _closed = True. Subsequent operations will raise GuardRedisError.

Connection Context Manager

@asynccontextmanager
async def get_connection(self) -> AsyncIterator[Redis]:

All operations use this context manager, which:

  1. Checks if the connection is closed.
  2. Auto-initializes if _redis is None.
  3. Yields the Redis connection.
  4. Catches ConnectionError and AttributeError, wrapping them in GuardRedisError(503).

Safe Operations

async def safe_operation(self, func, *args, **kwargs) -> Any

Wraps any async function that takes a Redis connection as its first argument. Returns None if Redis is disabled. Raises GuardRedisError on failure.


Key Namespacing

All keys are prefixed with config.redis_prefix (default: "guard_core:") and organized by namespace:

{redis_prefix}{namespace}:{key}

Examples:

Operation Redis Key
Banned IP lookup guard_core:banned_ips:192.168.1.1
Rate limit counter guard_core:rate_limit:rate:10.0.0.1
Cloud IP cache guard_core:cloud_ranges:AWS
Custom patterns guard_core:patterns:custom
Security headers config guard_core:security_headers:csp_config
Behavioral tracking guard_core:behavior_usage:{key}:{ts}

Key Operations

Method Description
get_key(namespace, key) Get a namespaced key value
set_key(namespace, key, value, ttl) Set with optional TTL (uses SETEX if TTL provided)
incr(namespace, key, ttl) Atomic increment with optional TTL
exists(namespace, key) Check key existence
delete(namespace, key) Delete a single key
keys(pattern) Find keys matching a pattern (auto-prefixed)
delete_pattern(pattern) Delete all keys matching a pattern

All methods return None when Redis is disabled (config.enable_redis = False), allowing callers to fall back to local state without error handling.


Fault Tolerance

Graceful Degradation

When Redis is unavailable, guard-core falls back to in-memory state for all subsystems:

Subsystem Redis State Fallback
Rate limiting Sorted sets per IP In-memory defaultdict(deque)
IP banning Key-value with TTL TTLCache(maxsize=10000)
Cloud IP ranges Cached CIDR strings Direct HTTP fetch + memory
Suspicious patterns Custom pattern list Built-in patterns only
Security headers Configuration cache In-memory configuration

Error Handling

GuardRedisError is raised with a status_code and detail:

class GuardRedisError(GuardCoreError):
    def __init__(self, status_code: int, detail: str) -> None:
        self.status_code = status_code
        self.detail = detail

Adapters should catch GuardRedisError during initialization and handle it according to their framework's error model.


Configuration

Field Type Default Description
enable_redis bool True Master switch for Redis integration
redis_url str \| None "redis://localhost:6379" Redis connection URL
redis_prefix str "guard_core:" Key prefix for namespace isolation

Adapter Considerations

  • Set redis_prefix to a unique value per application to avoid key collisions when sharing a Redis instance.
  • When enable_redis is False, all RedisManager methods return None without attempting connections.
  • The redis_url can include authentication: "redis://user:password@host:port/db".