Skip to content

Behavioral Analysis

Guard-core provides behavioral analysis through two components: the BehaviorTracker handler that stores and evaluates behavioral data, and the BehavioralProcessor core module that integrates tracking into the middleware pipeline.

BehaviorRule

class BehaviorRule:
    def __init__(
        self,
        rule_type: Literal["usage", "return_pattern", "frequency"],
        threshold: int,
        window: int = 3600,
        pattern: str | None = None,
        action: Literal["ban", "log", "throttle", "alert"] = "log",
        custom_action: Callable | None = None,
    ): ...
Field Type Description
rule_type "usage" \| "return_pattern" \| "frequency" When the rule is evaluated
threshold int Number of occurrences before triggering
window int Time window in seconds
pattern str \| None Pattern for return_pattern rules
action "ban" \| "log" \| "throttle" \| "alert" Action to take when threshold is exceeded
custom_action Callable \| None Override function (client_ip, endpoint_id, details)

Rule Types

usage and frequency: Track how many times a client IP calls a specific endpoint within the window.

return_pattern: Track how many times a response matches a pattern for a specific client IP and endpoint.

Return Pattern Formats

Format Example Matches
status:{code} status:404 Response status code
json:{path} json:error.code=="AUTH_FAIL" JSON field value via dot-path traversal
regex:{pattern} regex:error.*failed Regex match against response body (case-insensitive)
Plain string unauthorized Substring match in response body (case-insensitive)

BehaviorTracker

guard_core.handlers.behavior_handler.BehaviorTracker(config)

Source code in guard_core/handlers/behavior_handler.py
def __init__(self, config: SecurityConfig):
    self.config = config
    self.logger = logging.getLogger("guard_core.handlers.behavior")
    self.usage_counts: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.return_patterns: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.redis_handler: Any | None = None
    self.agent_handler: Any | None = None

agent_handler = None instance-attribute

config = config instance-attribute

logger = logging.getLogger('guard_core.handlers.behavior') instance-attribute

redis_handler = None instance-attribute

return_patterns = defaultdict(lambda: defaultdict(list)) instance-attribute

usage_counts = defaultdict(lambda: defaultdict(list)) instance-attribute

apply_action(rule, client_ip, endpoint_id, details) async

Source code in guard_core/handlers/behavior_handler.py
async def apply_action(
    self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
    if self.agent_handler:
        await self._send_behavior_event(
            event_type="behavioral_violation",
            ip_address=client_ip,
            action_taken=rule.action
            if not self.config.passive_mode
            else "logged_only",
            reason=f"Behavioral rule violated: {details}",
            endpoint=endpoint_id,
            rule_type=rule.rule_type,
            threshold=rule.threshold,
            window=rule.window,
        )

    if self.config.passive_mode:
        self._log_passive_mode_action(rule, client_ip, details)
    else:
        await self._execute_active_mode_action(
            rule, client_ip, endpoint_id, details
        )

initialize_agent(agent_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    self.redis_handler = redis_handler

track_endpoint_usage(endpoint_id, client_ip, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_endpoint_usage(
    self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
    current_time = time.time()
    window_start = current_time - rule.window

    if self.redis_handler:
        key = f"behavior:usage:{endpoint_id}:{client_ip}"

        await self.redis_handler.set_key(
            "behavior_usage", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern = f"behavior_usage:{key}:*"
        keys = await self.redis_handler.keys(pattern)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    timestamps = self.usage_counts[endpoint_id][client_ip]

    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

track_return_pattern(endpoint_id, client_ip, response, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_return_pattern(
    self,
    endpoint_id: str,
    client_ip: str,
    response: GuardResponse,
    rule: BehaviorRule,
) -> bool:
    if not rule.pattern:
        return False

    current_time = time.time()
    window_start = current_time - rule.window

    pattern_matched = await self._check_response_pattern(response, rule.pattern)

    if not pattern_matched:
        return False

    if self.redis_handler:
        key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"

        await self.redis_handler.set_key(
            "behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern_key = f"behavior_returns:{key}:*"
        keys = await self.redis_handler.keys(pattern_key)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    pattern_key = f"{endpoint_id}:{rule.pattern}"
    timestamps = self.return_patterns[pattern_key][client_ip]

    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

Storage

In-memory: defaultdict(lambda: defaultdict(list)) mapping endpoint_id -> client_ip -> list[timestamp].

Redis: Keys like behavior:usage:{endpoint_id}:{client_ip}:{timestamp} with TTL equal to the rule window.

Key Methods

track_endpoint_usage(endpoint_id, client_ip, rule) -> bool

Records a usage event and returns True if the count exceeds rule.threshold within rule.window.

track_return_pattern(endpoint_id, client_ip, response, rule) -> bool

Checks if the response matches rule.pattern, records the event if it does, and returns True if the count exceeds the threshold.

Action Execution

await tracker.apply_action(rule, client_ip, endpoint_id, details)

Active mode actions:

Action Behavior
ban Calls ip_ban_manager.ban_ip(client_ip, 3600, "behavioral_violation")
log Logs a warning
throttle Logs a warning (throttling is informational; rate limiting handles enforcement)
alert Logs at CRITICAL level
Custom Calls rule.custom_action(client_ip, endpoint_id, details)

Passive mode: All actions are logged with a [PASSIVE MODE] prefix instead of being executed.


BehavioralProcessor

guard_core.core.behavioral.processor.BehavioralProcessor(context)

Source code in guard_core/core/behavioral/processor.py
def __init__(self, context: BehavioralContext) -> None:
    self.context = context

context = context instance-attribute

get_endpoint_id(request)

Source code in guard_core/core/behavioral/processor.py
def get_endpoint_id(self, request: GuardRequest) -> str:
    if hasattr(request, "scope") and "route" in request.scope:
        route = request.scope["route"]
        if hasattr(route, "endpoint"):
            return f"{route.endpoint.__module__}.{route.endpoint.__qualname__}"
    return f"{request.method}:{request.url_path}"

process_return_rules(request, response, client_ip, route_config) async

Source code in guard_core/core/behavioral/processor.py
async def process_return_rules(
    self,
    request: GuardRequest,
    response: GuardResponse,
    client_ip: str,
    route_config: RouteConfig,
) -> None:
    if not self.context.guard_decorator:
        return

    endpoint_id = self.get_endpoint_id(request)
    for rule in route_config.behavior_rules:
        if rule.rule_type == "return_pattern":
            behavior_tracker = self.context.guard_decorator.behavior_tracker
            pattern_detected = await behavior_tracker.track_return_pattern(
                endpoint_id, client_ip, response, rule
            )
            if pattern_detected:
                details = f"{rule.threshold} for '{rule.pattern}' in {rule.window}s"

                await self.context.event_bus.send_middleware_event(
                    event_type="decorator_violation",
                    request=request,
                    action_taken="behavioral_action_triggered",
                    reason=f"Return pattern threshold exceeded: {details}",
                    decorator_type="behavioral",
                    violation_type="return_pattern",
                    threshold=rule.threshold,
                    window=rule.window,
                    pattern=rule.pattern,
                    action=rule.action,
                    endpoint_id=endpoint_id,
                )

                await self.context.guard_decorator.behavior_tracker.apply_action(
                    rule,
                    client_ip,
                    endpoint_id,
                    f"Return pattern threshold exceeded: {details}",
                )

process_usage_rules(request, client_ip, route_config) async

Source code in guard_core/core/behavioral/processor.py
async def process_usage_rules(
    self, request: GuardRequest, client_ip: str, route_config: RouteConfig
) -> None:
    if not self.context.guard_decorator:
        return

    endpoint_id = self.get_endpoint_id(request)
    for rule in route_config.behavior_rules:
        if rule.rule_type in ["usage", "frequency"]:
            behavior_tracker = self.context.guard_decorator.behavior_tracker
            threshold_exceeded = await behavior_tracker.track_endpoint_usage(
                endpoint_id, client_ip, rule
            )
            if threshold_exceeded:
                details = f"{rule.threshold} calls in {rule.window}s"
                message = f"Behavioral {rule.rule_type}"
                reason = "threshold exceeded"

                await self.context.event_bus.send_middleware_event(
                    event_type="decorator_violation",
                    request=request,
                    action_taken="behavioral_action_triggered",
                    reason=f"{message} {reason}: {details}",
                    decorator_type="behavioral",
                    violation_type=rule.rule_type,
                    threshold=rule.threshold,
                    window=rule.window,
                    action=rule.action,
                    endpoint_id=endpoint_id,
                )

                await self.context.guard_decorator.behavior_tracker.apply_action(
                    rule,
                    client_ip,
                    endpoint_id,
                    f"Usage threshold exceeded: {details}",
                )

The processor integrates behavioral tracking into the middleware pipeline. It is called at two points in the request lifecycle:

Usage Rules (Pre-Handler)

await processor.process_usage_rules(request, client_ip, route_config)

Iterates over route_config.behavior_rules where rule_type is "usage" or "frequency". For each rule that exceeds its threshold, emits a decorator_violation event and applies the rule's action.

Return Rules (Post-Handler)

await processor.process_return_rules(request, response, client_ip, route_config)

Iterates over rules where rule_type is "return_pattern". Checks if the response matches the rule's pattern, and if the threshold is exceeded, emits an event and applies the action.

Endpoint ID Resolution

def get_endpoint_id(self, request: GuardRequest) -> str

Resolves the endpoint identifier from the request scope:

  1. If request.scope["route"].endpoint exists, returns "{module}.{qualname}".
  2. Otherwise, falls back to "{method}:{url_path}".

Configuration via Decorators

Behavioral rules are attached to routes through the SecurityDecorator:

from guard_core.decorators import SecurityDecorator
from guard_core.handlers.behavior_handler import BehaviorRule

security = SecurityDecorator(config)

@security.behavioral(rules=[
    BehaviorRule(
        rule_type="usage",
        threshold=100,
        window=300,
        action="throttle",
    ),
    BehaviorRule(
        rule_type="return_pattern",
        threshold=10,
        window=60,
        pattern="status:429",
        action="ban",
    ),
])
async def my_endpoint():
    ...