
Rate Limiting

Guard-core implements rate limiting using a sliding window algorithm with dual backends: in-memory for single-instance deployments and Redis for distributed deployments. The RateLimitManager handler orchestrates both.

RateLimitManager

guard_core.handlers.ratelimit_handler.RateLimitManager

Attributes:

  • agent_handler = None (class attribute, instance attribute)
  • config (instance attribute)
  • logger (instance attribute)
  • rate_limit_script_sha = None (class attribute, instance attribute)
  • redis_handler = None (class attribute, instance attribute)
  • request_timestamps (instance attribute)

check_rate_limit(request, client_ip, create_error_response, endpoint_path='', rate_limit=None, rate_limit_window=None) async

Source code in guard_core/handlers/ratelimit_handler.py
async def check_rate_limit(
    self,
    request: GuardRequest,
    client_ip: str,
    create_error_response: Callable[[int, str], Awaitable[GuardResponse]],
    endpoint_path: str = "",
    rate_limit: int | None = None,
    rate_limit_window: int | None = None,
) -> GuardResponse | None:
    if not self.config.enable_rate_limiting:
        return None

    effective_limit = (
        rate_limit if rate_limit is not None else self.config.rate_limit
    )
    effective_window = (
        rate_limit_window
        if rate_limit_window is not None
        else self.config.rate_limit_window
    )

    current_time = time.time()
    window_start = current_time - effective_window

    if self.config.enable_redis and self.redis_handler:
        count = await self._get_redis_request_count(
            client_ip,
            current_time,
            window_start,
            endpoint_path=endpoint_path,
            rate_limit_window=effective_window,
            rate_limit=effective_limit,
        )

        if count is not None:
            if count > effective_limit:
                return await self._handle_rate_limit_exceeded(
                    request,
                    client_ip,
                    count,
                    create_error_response,
                    rate_limit_window=effective_window,
                )
            return None

    request_count = self._get_in_memory_request_count(
        client_ip, window_start, current_time, endpoint_path=endpoint_path
    )

    if request_count >= effective_limit:
        return await self._handle_rate_limit_exceeded(
            request,
            client_ip,
            request_count + 1,
            create_error_response,
            rate_limit_window=effective_window,
        )

    return None

initialize_agent(agent_handler) async

Source code in guard_core/handlers/ratelimit_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/ratelimit_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    self.redis_handler = redis_handler

    if self.redis_handler and self.config.enable_redis:
        try:
            async with self.redis_handler.get_connection() as conn:
                self.rate_limit_script_sha = await conn.script_load(
                    RATE_LIMIT_SCRIPT
                )
                self.logger.info("Rate limiting Lua script loaded successfully")
        except Exception as e:
            self.logger.error(f"Failed to load rate limiting Lua script: {str(e)}")

reset() async

Source code in guard_core/handlers/ratelimit_handler.py
async def reset(self) -> None:
    self.request_timestamps.clear()

    if self.config.enable_redis and self.redis_handler:
        try:
            keys = await self.redis_handler.keys("rate_limit:rate:*")
            if keys and len(keys) > 0:
                await self.redis_handler.delete_pattern("rate_limit:rate:*")
        except Exception as e:
            self.logger.error(f"Failed to reset Redis rate limits: {str(e)}")

Singleton Pattern

RateLimitManager is a singleton parameterized by SecurityConfig:

class RateLimitManager:
    _instance: Optional["RateLimitManager"] = None

    def __new__(cls, config: SecurityConfig) -> "RateLimitManager":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.config = config
            cls._instance.request_timestamps = defaultdict(deque)
            cls._instance.rate_limit_script_sha = None
        cls._instance.config = config
        return cls._instance

The config is always updated on access, allowing runtime reconfiguration.
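This behaviour can be demonstrated with a minimal standalone sketch, using a placeholder Config class in place of SecurityConfig and a stripped-down Manager in place of RateLimitManager:

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class Config:  # stand-in for SecurityConfig
    rate_limit: int = 100
    rate_limit_window: int = 60


class Manager:  # stand-in for RateLimitManager
    _instance: Optional["Manager"] = None

    def __new__(cls, config: Config) -> "Manager":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.request_timestamps = defaultdict(deque)
            cls._instance.rate_limit_script_sha = None
        # config is refreshed on every call, even for the existing instance
        cls._instance.config = config
        return cls._instance


a = Manager(Config(rate_limit=100))
b = Manager(Config(rate_limit=10))
assert a is b                     # one shared instance
assert a.config.rate_limit == 10  # latest config wins
```

Note that request timestamps survive reconfiguration: only config is replaced, so in-flight sliding windows are not reset when settings change.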

Core Method

async def check_rate_limit(
    self,
    request: GuardRequest,
    client_ip: str,
    create_error_response: Callable[[int, str], Awaitable[GuardResponse]],
    endpoint_path: str = "",
    rate_limit: int | None = None,
    rate_limit_window: int | None = None,
) -> GuardResponse | None

Parameters:

  Parameter               Description
  request                 The current request object
  client_ip               Resolved client IP address
  create_error_response   Factory callback for creating error responses
  endpoint_path           Optional path for per-endpoint tracking
  rate_limit              Override for config.rate_limit
  rate_limit_window       Override for config.rate_limit_window

Returns: None if under the limit, or a 429 response if exceeded.


Sliding Window Algorithm

In-Memory Backend

The in-memory backend uses a collections.defaultdict[str, deque[float]] keyed by "{client_ip}:{endpoint_path}" for per-endpoint limits, or just "{client_ip}" for global limits.

def _get_in_memory_request_count(
    self, client_ip, window_start, current_time, endpoint_path=""
) -> int:
    key = f"{client_ip}:{endpoint_path}" if endpoint_path else client_ip
    # Evict expired timestamps
    while self.request_timestamps[key] and self.request_timestamps[key][0] <= window_start:
        self.request_timestamps[key].popleft()
    request_count = len(self.request_timestamps[key])
    self.request_timestamps[key].append(current_time)
    return request_count

The deque acts as a sliding window. Timestamps older than current_time - window are evicted on each call.
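The eviction behaviour can be exercised in isolation. This sketch reproduces the counting logic above with a module-level dict of deques and synthetic timestamps:

```python
from collections import defaultdict, deque

timestamps: defaultdict[str, deque[float]] = defaultdict(deque)


def count_and_record(key: str, now: float, window: float) -> int:
    """Return the number of requests already in the window, then record this one."""
    window_start = now - window
    q = timestamps[key]
    # Evict timestamps that have fallen out of the sliding window
    while q and q[0] <= window_start:
        q.popleft()
    count = len(q)
    q.append(now)
    return count


# Three requests inside a 10-second window, then one after the window slides
assert count_and_record("1.2.3.4", now=0.0, window=10) == 0
assert count_and_record("1.2.3.4", now=1.0, window=10) == 1
assert count_and_record("1.2.3.4", now=2.0, window=10) == 2
# At t=10.5, the request at t=0 has expired (0 <= 10.5 - 10)
assert count_and_record("1.2.3.4", now=10.5, window=10) == 2
```

As in the real method, the returned count excludes the request being recorded, which is why the in-memory path compares request_count >= effective_limit.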

Redis Backend

Uses a Redis sorted set where:

  • Key: {redis_prefix}rate_limit:rate:{client_ip}:{endpoint_path}
  • Members: Timestamps (as strings)
  • Scores: Timestamps (as floats)

Each request:

  1. Adds the current timestamp (ZADD).
  2. Removes all entries before the window start (ZREMRANGEBYSCORE).
  3. Counts remaining entries (ZCARD).
  4. Sets a TTL of window * 2 to prevent key leakage.

Lua Script for Atomic Operations

When Redis is available, guard-core loads a Lua script at initialization for atomic rate limit operations:

local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local window_start = now - window

redis.call('ZADD', key, now, now)
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
local count = redis.call('ZCARD', key)
redis.call('EXPIRE', key, window * 2)

return count

The script SHA is cached in rate_limit_script_sha and executed with EVALSHA on each request.
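The script's semantics can be modelled in plain Python (a sorted set reduced to a member-to-score dict, with EXPIRE omitted) to see why the Redis path compares count > limit: the returned count already includes the request that was just added.

```python
def rate_limit_script(zset: dict[float, float], now: float, window: float) -> int:
    """Model the Lua script: ZADD, then ZREMRANGEBYSCORE, then ZCARD."""
    zset[now] = now  # ZADD key now now
    window_start = now - window
    for member in [m for m, score in zset.items() if score <= window_start]:
        del zset[member]  # ZREMRANGEBYSCORE key 0 window_start
    return len(zset)  # ZCARD key


zset: dict[float, float] = {}
limit = 2
# First two requests are allowed, the third exceeds the limit
assert rate_limit_script(zset, 0.0, 10) <= limit   # count = 1
assert rate_limit_script(zset, 1.0, 10) <= limit   # count = 2
assert rate_limit_script(zset, 2.0, 10) > limit    # count = 3 -> rejected
```

This is only a model of the script's logic, not a substitute for it; in Redis the three commands run atomically inside one script invocation.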

Advantages of the Lua approach:

  • Atomicity: All four commands execute atomically within a single script invocation.
  • Performance: One round trip instead of four.
  • Consistency: No race conditions between concurrent requests.

Fallback

If the Lua script is not loaded (e.g., script load failed), the Redis backend falls back to a pipeline of individual commands:

pipeline = conn.pipeline()
pipeline.zadd(key_name, {str(current_time): current_time})
pipeline.zremrangebyscore(key_name, 0, window_start)
pipeline.zcard(key_name)
pipeline.expire(key_name, window * 2)
results = await pipeline.execute()

If Redis fails entirely (connection error), the system falls back to in-memory rate limiting with a log warning.
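The degradation path amounts to a try/except around the Redis lookup. In this sketch, redis_count and memory_count are hypothetical stand-ins for _get_redis_request_count and the in-memory path:

```python
import logging

logger = logging.getLogger("ratelimit")


def redis_count(client_ip: str) -> int:
    # Stand-in for the Redis lookup; here it simulates a connection error
    raise ConnectionError("redis unavailable")


def memory_count(client_ip: str) -> int:
    # Stand-in for the in-memory sliding window
    return 1


def count_requests(client_ip: str) -> int:
    try:
        return redis_count(client_ip)
    except Exception as exc:
        # Degrade gracefully: log a warning and fall back to in-memory counting
        logger.warning("Redis rate limiting failed, using in-memory: %s", exc)
        return memory_count(client_ip)


assert count_requests("1.2.3.4") == 1  # Redis failed; in-memory result used
```

The trade-off is that during a Redis outage each instance enforces its own local counters, so a distributed deployment temporarily allows up to limit requests per instance rather than per client.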


Per-Endpoint Rate Limits

The RateLimitCheck pipeline check evaluates rate limits in priority order:

  Priority  Source                  Configuration
  1         Dynamic endpoint rules  config.endpoint_rate_limits[path] = (limit, window)
  2         Route decorator         RouteConfig.rate_limit + RouteConfig.rate_limit_window
  3         Geo-based route limits  RouteConfig.geo_rate_limits[country] = (limit, window)
  4         Global                  config.rate_limit + config.rate_limit_window

The first matching tier short-circuits evaluation. For endpoint-specific and route-level limits, the endpoint_path parameter is set so that the rate counters are tracked separately from global counters.

Geo-Based Rate Limits

When RouteConfig.geo_rate_limits is configured, the check resolves the client's country via GeoIPHandler, then looks up the limit:

  1. Exact country match (e.g., "US").
  2. Wildcard fallback ("*").
  3. If no match, skips to the next tier.

Initialization

rate_limit_manager = RateLimitManager(config)
await rate_limit_manager.initialize_redis(redis_handler)
await rate_limit_manager.initialize_agent(agent_handler)

Redis initialization triggers the Lua script load. If it fails, the manager logs an error and proceeds with the pipeline fallback.


Reset

await rate_limit_manager.reset()

Clears the in-memory request_timestamps dictionary and deletes all rate_limit:rate:* keys from Redis.