Skip to content

Behavior Manager

The Behavior Manager handles behavioral analysis and monitoring for FastAPI Guard, providing advanced detection capabilities for suspicious usage patterns and automated response actions.


Overview

The Behavior Manager system consists of:

  • BehaviorTracker: Main tracking and analysis engine
  • BehaviorRule: Rule definition for behavioral analysis
  • Integration: Seamless integration with decorators and middleware

BehaviorTracker

guard_core.handlers.behavior_handler.BehaviorTracker(config)

Source code in guard_core/handlers/behavior_handler.py
def __init__(self, config: SecurityConfig):
    self.config = config
    self.logger = logging.getLogger("guard_core.handlers.behavior")
    self.usage_counts: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.return_patterns: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.redis_handler: Any | None = None
    self.agent_handler: Any | None = None

agent_handler = None instance-attribute

config = config instance-attribute

logger = logging.getLogger('guard_core.handlers.behavior') instance-attribute

redis_handler = None instance-attribute

return_patterns = defaultdict(lambda: defaultdict(list)) instance-attribute

usage_counts = defaultdict(lambda: defaultdict(list)) instance-attribute

apply_action(rule, client_ip, endpoint_id, details) async

Source code in guard_core/handlers/behavior_handler.py
async def apply_action(
    self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
    if self.agent_handler:
        await self._send_behavior_event(
            event_type="behavioral_violation",
            ip_address=client_ip,
            action_taken=rule.action
            if not self.config.passive_mode
            else "logged_only",
            reason=f"Behavioral rule violated: {details}",
            endpoint=endpoint_id,
            rule_type=rule.rule_type,
            threshold=rule.threshold,
            window=rule.window,
        )

    if self.config.passive_mode:
        self._log_passive_mode_action(rule, client_ip, details)
    else:
        await self._execute_active_mode_action(
            rule, client_ip, endpoint_id, details
        )

initialize_agent(agent_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    self.redis_handler = redis_handler

track_endpoint_usage(endpoint_id, client_ip, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_endpoint_usage(
    self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
    current_time = time.time()
    window_start = current_time - rule.window

    if self.redis_handler:
        key = f"behavior:usage:{endpoint_id}:{client_ip}"

        await self.redis_handler.set_key(
            "behavior_usage", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern = f"behavior_usage:{key}:*"
        keys = await self.redis_handler.keys(pattern)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    timestamps = self.usage_counts[endpoint_id][client_ip]

    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

track_return_pattern(endpoint_id, client_ip, response, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_return_pattern(
    self,
    endpoint_id: str,
    client_ip: str,
    response: GuardResponse,
    rule: BehaviorRule,
) -> bool:
    if not rule.pattern:
        return False

    current_time = time.time()
    window_start = current_time - rule.window

    pattern_matched = await self._check_response_pattern(response, rule.pattern)

    if not pattern_matched:
        return False

    if self.redis_handler:
        key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"

        await self.redis_handler.set_key(
            "behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern_key = f"behavior_returns:{key}:*"
        keys = await self.redis_handler.keys(pattern_key)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    pattern_key = f"{endpoint_id}:{rule.pattern}"
    timestamps = self.return_patterns[pattern_key][client_ip]

    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

The main class responsible for tracking and analyzing user behavior patterns.

. Key Features

  • Endpoint Usage Tracking: Monitor how frequently IPs access specific endpoints
  • Return Pattern Analysis: Detect when IPs receive specific response patterns too often
  • Frequency Detection: Identify suspiciously high request frequencies
  • Automated Actions: Apply bans, logs, alerts, or throttling based on rules

. Example Usage

from guard import BehaviorTracker, BehaviorRule

# Create tracker
tracker = BehaviorTracker(config)

# Define rules
usage_rule = BehaviorRule(
    rule_type="usage",
    threshold=10,
    window=3600,
    action="ban"
)

return_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=3,
    pattern="win",
    window=86400,
    action="alert"
)

# Track usage
await tracker.track_endpoint_usage(endpoint_id, client_ip, usage_rule)

# Track return patterns
await tracker.track_return_pattern(endpoint_id, client_ip, response, return_rule)

BehaviorRule

guard_core.handlers.behavior_handler.BehaviorRule(rule_type, threshold, window=3600, pattern=None, action='log', custom_action=None)

Source code in guard_core/handlers/behavior_handler.py
def __init__(
    self,
    rule_type: Literal["usage", "return_pattern", "frequency"],
    threshold: int,
    window: int = 3600,
    pattern: str | None = None,
    action: Literal["ban", "log", "throttle", "alert"] = "log",
    custom_action: Callable | None = None,
):
    self.rule_type = rule_type
    self.threshold = threshold
    self.window = window
    self.pattern = pattern
    self.action = action
    self.custom_action = custom_action

action = action instance-attribute

custom_action = custom_action instance-attribute

pattern = pattern instance-attribute

rule_type = rule_type instance-attribute

threshold = threshold instance-attribute

window = window instance-attribute

Configuration class that defines behavioral analysis rules.

. Rule Types

  • usage: Monitor endpoint usage frequency
  • return_pattern: Analyze response patterns
  • frequency: Detect suspicious request frequencies

. Pattern Formats

For return_pattern rules, the following pattern formats are supported:

  • Simple string: "win", "success", "rare_item"
  • JSON path: "json:result.status==win"
  • Regex: "regex:win|victory|success"
  • Status code: "status:200"

. Actions

  • ban: Ban the IP address
  • log: Log the incident
  • alert: Send an alert notification
  • throttle: Apply rate limiting

. Example Rules

# Usage monitoring
usage_rule = BehaviorRule(
    rule_type="usage",
    threshold=50,
    window=3600,
    action="ban"
)

# Return pattern monitoring
win_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=3,
    pattern="win",
    window=86400,
    action="ban"
)

# Frequency detection
freq_rule = BehaviorRule(
    rule_type="frequency",
    threshold=30,  # 30 requests
    window=300,    # in 5 minutes
    action="alert"
)

Integration with Decorators

The Behavior Manager integrates seamlessly with the decorator system:

from guard import SecurityDecorator

guard_deco = SecurityDecorator(config)

@app.get("/api/rewards")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")
def rewards_endpoint():
    return {"reward": "rare_item", "value": 1000}

Redis Integration

The Behavior Manager supports Redis for distributed tracking:

# Initialize with Redis
await tracker.initialize_redis(redis_handler)

# All tracking operations will use Redis for storage
# This enables behavior tracking across multiple instances

Advanced Usage

. Custom Pattern Matching

# JSON path pattern
json_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=5,
    pattern="json:result.reward.rarity==legendary",
    window=86400,
    action="ban"
)

# Regex pattern
regex_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=10,
    pattern="regex:(win|victory|success)",
    window=3600,
    action="alert"
)

# Status code pattern
status_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=100,
    pattern="status:200",
    window=3600,
    action="log"
)

. Multiple Rule Analysis

# Apply multiple rules to an endpoint
rules = [
    BehaviorRule("usage", threshold=20, window=3600, action="ban"),
    BehaviorRule("return_pattern", threshold=5, pattern="win", window=86400, action="alert"),
    BehaviorRule("frequency", threshold=60, window=300, action="throttle")
]

@guard_deco.behavior_analysis(rules)
def complex_endpoint():
    return {"data": "complex"}

Best Practices

. Set Appropriate Thresholds

Match thresholds to expected legitimate usage:

# High-value endpoint - strict limits
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")

# Regular endpoint - moderate limits
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")

. Use Graduated Responses

Start with logging, then alerts, then bans:

# First violation - log
BehaviorRule("usage", threshold=10, window=3600, action="log")

# Second violation - alert
BehaviorRule("usage", threshold=20, window=3600, action="alert")

# Third violation - ban
BehaviorRule("usage", threshold=30, window=3600, action="ban")

. Monitor Return Patterns Carefully

Focus on patterns that indicate abuse:

# Gaming/gambling endpoints
@guard_deco.return_monitor("jackpot", max_occurrences=2, window=86400, action="ban")

# Reward systems
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")

# Success patterns
@guard_deco.return_monitor("regex:success|win|victory", max_occurrences=10, window=3600, action="log")

Error Handling

The Behavior Manager handles errors gracefully:

  • Redis Connection Issues: Falls back to in-memory tracking
  • Pattern Matching Errors: Logs errors and continues processing
  • Action Failures: Logs failures but doesn't interrupt request flow

Monitoring and Debugging

Enable detailed logging for behavioral analysis:

config = SecurityConfig(
    log_suspicious_level="DEBUG",
    log_request_level="INFO"
)

# Logs will include:
# - Behavioral rule violations
# - Pattern matching results
# - Action execution results

Performance Considerations

  • Redis Usage: Reduces memory usage and enables distributed tracking
  • Pattern Complexity: Simple string patterns are fastest, regex patterns are slowest
  • Rule Count: More rules per endpoint increase processing time
  • Window Sizes: Larger windows require more memory for tracking

See Also