Skip to content

Behavior Manager

The Behavior Manager handles behavioral analysis and monitoring for FastAPI Guard, providing advanced detection capabilities for suspicious usage patterns and automated response actions.


Overview

The Behavior Manager system consists of:

  • BehaviorTracker: Main tracking and analysis engine
  • BehaviorRule: Rule definition for behavioral analysis
  • Integration: Seamless integration with decorators and middleware

BehaviorTracker

guard.handlers.behavior_handler.BehaviorTracker(config)

Advanced behavioral analysis tracker for detecting suspicious patterns.

This class can track: - Per-endpoint usage patterns - Return value frequency analysis - Time-based behavioral anomalies

Source code in guard/handlers/behavior_handler.py
def __init__(self, config: SecurityConfig):
    self.config = config
    self.logger = logging.getLogger(__name__)
    self.usage_counts: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.return_patterns: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.redis_handler: Any | None = None

config = config instance-attribute

logger = logging.getLogger(__name__) instance-attribute

redis_handler = None instance-attribute

return_patterns = defaultdict(lambda: defaultdict(list)) instance-attribute

usage_counts = defaultdict(lambda: defaultdict(list)) instance-attribute

apply_action(rule, client_ip, endpoint_id, details) async

Apply the configured action when a rule is violated.

Source code in guard/handlers/behavior_handler.py
async def apply_action(
    self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
    """Apply the configured action when a rule is violated."""

    if rule.custom_action:
        await rule.custom_action(client_ip, endpoint_id, details)
        return

    if rule.action == "ban":
        # Import here to avoid circular imports
        from guard.handlers.ipban_handler import ip_ban_manager

        await ip_ban_manager.ban_ip(client_ip, 3600)  # 1 hour ban
        self.logger.warning(
            f"IP {client_ip} banned for behavioral violation: {details}"
        )

    elif rule.action == "log":
        self.logger.warning(f"Behavioral anomaly detected: {details}")

    elif rule.action == "throttle":
        # Could implement stricter rate limiting here
        self.logger.warning(f"Throttling IP {client_ip}: {details}")

    elif rule.action == "alert":
        # Could send webhook/notification here
        self.logger.critical(f"ALERT - Behavioral anomaly: {details}")

initialize_redis(redis_handler) async

Initialize Redis connection for distributed tracking.

Source code in guard/handlers/behavior_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Initialize Redis connection for distributed tracking."""
    self.redis_handler = redis_handler

track_endpoint_usage(endpoint_id, client_ip, rule) async

Track endpoint usage and return True if threshold exceeded.

Parameters:

Name Type Description Default
endpoint_id str

Unique identifier for the endpoint

required
client_ip str

Client IP address

required
rule BehaviorRule

Behavior rule to apply

required

Returns:

Name Type Description
bool bool

True if threshold exceeded, False otherwise

Source code in guard/handlers/behavior_handler.py
async def track_endpoint_usage(
    self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
    """
    Track endpoint usage and return True if threshold exceeded.

    Args:
        endpoint_id: Unique identifier for the endpoint
        client_ip: Client IP address
        rule: Behavior rule to apply

    Returns:
        bool: True if threshold exceeded, False otherwise
    """
    current_time = time.time()
    window_start = current_time - rule.window

    # Redis implementation
    if self.redis_handler:
        key = f"behavior:usage:{endpoint_id}:{client_ip}"

        # Add current timestamp
        await self.redis_handler.set_key(
            "behavior_usage", f"{key}:{current_time}", "1", ttl=rule.window
        )

        # Count entries in window
        pattern = f"behavior_usage:{key}:*"
        keys = await self.redis_handler.keys(pattern)

        # Filter keys within time window
        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    # In-memory fallback
    timestamps = self.usage_counts[endpoint_id][client_ip]

    # Clean old timestamps
    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    # Add current timestamp
    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

track_return_pattern(endpoint_id, client_ip, response, rule) async

Track return value patterns and detect anomalies.

Parameters:

Name Type Description Default
endpoint_id str

Unique identifier for the endpoint

required
client_ip str

Client IP address

required
response Response

FastAPI Response object

required
rule BehaviorRule

Behavior rule with pattern to match

required

Returns:

Name Type Description
bool bool

True if suspicious pattern detected, False otherwise

Source code in guard/handlers/behavior_handler.py
async def track_return_pattern(
    self, endpoint_id: str, client_ip: str, response: Response, rule: BehaviorRule
) -> bool:
    """
    Track return value patterns and detect anomalies.

    Args:
        endpoint_id: Unique identifier for the endpoint
        client_ip: Client IP address
        response: FastAPI Response object
        rule: Behavior rule with pattern to match

    Returns:
        bool: True if suspicious pattern detected, False otherwise
    """
    if not rule.pattern:
        return False

    current_time = time.time()
    window_start = current_time - rule.window

    # Extract response content for analysis
    pattern_matched = await self._check_response_pattern(response, rule.pattern)

    if not pattern_matched:
        return False

    # Redis implementation
    if self.redis_handler:
        key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"

        # Add timestamp for pattern match
        await self.redis_handler.set_key(
            "behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
        )

        # Count pattern matches in window
        pattern_key = f"behavior_returns:{key}:*"
        keys = await self.redis_handler.keys(pattern_key)

        # Filter keys within time window
        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    # In-memory fallback
    pattern_key = f"{endpoint_id}:{rule.pattern}"
    timestamps = self.return_patterns[pattern_key][client_ip]

    # Clean old timestamps
    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    # Add current timestamp
    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

The main class responsible for tracking and analyzing user behavior patterns.

. Key Features

  • Endpoint Usage Tracking: Monitor how frequently IPs access specific endpoints
  • Return Pattern Analysis: Detect when IPs receive specific response patterns too often
  • Frequency Detection: Identify suspiciously high request frequencies
  • Automated Actions: Apply bans, logs, alerts, or throttling based on rules

. Example Usage

from guard.handlers.behavior_handler import BehaviorTracker, BehaviorRule

# Create tracker
tracker = BehaviorTracker(config)

# Define rules
usage_rule = BehaviorRule(
    rule_type="usage",
    threshold=10,
    window=3600,
    action="ban"
)

return_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=3,
    pattern="win",
    window=86400,
    action="alert"
)

# Track usage
await tracker.track_endpoint_usage(endpoint_id, client_ip, usage_rule)

# Track return patterns
await tracker.track_return_pattern(endpoint_id, client_ip, response, return_rule)

BehaviorRule

guard.handlers.behavior_handler.BehaviorRule(rule_type, threshold, window=3600, pattern=None, action='log', custom_action=None)

Defines a behavioral analysis rule.

Source code in guard/handlers/behavior_handler.py
def __init__(
    self,
    rule_type: Literal["usage", "return_pattern", "frequency"],
    threshold: int,
    window: int = 3600,  # 1 hour default
    pattern: str | None = None,
    action: Literal["ban", "log", "throttle", "alert"] = "log",
    custom_action: Callable | None = None,
):
    self.rule_type = rule_type
    self.threshold = threshold
    self.window = window
    self.pattern = pattern
    self.action = action
    self.custom_action = custom_action

action = action instance-attribute

custom_action = custom_action instance-attribute

pattern = pattern instance-attribute

rule_type = rule_type instance-attribute

threshold = threshold instance-attribute

window = window instance-attribute

Configuration class that defines behavioral analysis rules.

. Rule Types

  • usage: Monitor endpoint usage frequency
  • return_pattern: Analyze response patterns
  • frequency: Detect suspicious request frequencies

. Pattern Formats

For return_pattern rules, the following pattern formats are supported:

  • Simple string: "win", "success", "rare_item"
  • JSON path: "json:result.status==win"
  • Regex: "regex:win|victory|success"
  • Status code: "status:200"

. Actions

  • ban: Ban the IP address
  • log: Log the incident
  • alert: Send an alert notification
  • throttle: Apply rate limiting

. Example Rules

# Usage monitoring
usage_rule = BehaviorRule(
    rule_type="usage",
    threshold=50,
    window=3600,
    action="ban"
)

# Return pattern monitoring
win_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=3,
    pattern="win",
    window=86400,
    action="ban"
)

# Frequency detection
freq_rule = BehaviorRule(
    rule_type="frequency",
    threshold=30,  # 30 requests
    window=300,    # in 5 minutes
    action="alert"
)

Integration with Decorators

The Behavior Manager integrates seamlessly with the decorator system:

from guard.decorators import SecurityDecorator

guard_deco = SecurityDecorator(config)

@app.get("/api/rewards")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")
def rewards_endpoint():
    return {"reward": "rare_item", "value": 1000}

Redis Integration

The Behavior Manager supports Redis for distributed tracking:

# Initialize with Redis
await tracker.initialize_redis(redis_handler)

# All tracking operations will use Redis for storage
# This enables behavior tracking across multiple instances

Advanced Usage

. Custom Pattern Matching

# JSON path pattern
json_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=5,
    pattern="json:result.reward.rarity==legendary",
    window=86400,
    action="ban"
)

# Regex pattern
regex_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=10,
    pattern="regex:(win|victory|success)",
    window=3600,
    action="alert"
)

# Status code pattern
status_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=100,
    pattern="status:200",
    window=3600,
    action="log"
)

. Multiple Rule Analysis

# Apply multiple rules to an endpoint
rules = [
    BehaviorRule("usage", threshold=20, window=3600, action="ban"),
    BehaviorRule("return_pattern", threshold=5, pattern="win", window=86400, action="alert"),
    BehaviorRule("frequency", threshold=60, window=300, action="throttle")
]

@guard_deco.behavior_analysis(rules)
def complex_endpoint():
    return {"data": "complex"}

Best Practices

. Set Appropriate Thresholds

Match thresholds to expected legitimate usage:

# High-value endpoint - strict limits
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")

# Regular endpoint - moderate limits
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")

. Use Graduated Responses

Start with logging, then alerts, then bans:

# First violation - log
BehaviorRule("usage", threshold=10, window=3600, action="log")

# Second violation - alert
BehaviorRule("usage", threshold=20, window=3600, action="alert")

# Third violation - ban
BehaviorRule("usage", threshold=30, window=3600, action="ban")

. Monitor Return Patterns Carefully

Focus on patterns that indicate abuse:

# Gaming/gambling endpoints
@guard_deco.return_monitor("jackpot", max_occurrences=2, window=86400, action="ban")

# Reward systems
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")

# Success patterns
@guard_deco.return_monitor("regex:success|win|victory", max_occurrences=10, window=3600, action="log")

Error Handling

The Behavior Manager handles errors gracefully:

  • Redis Connection Issues: Falls back to in-memory tracking
  • Pattern Matching Errors: Logs errors and continues processing
  • Action Failures: Logs failures but doesn't interrupt request flow

Monitoring and Debugging

Enable detailed logging for behavioral analysis:

config = SecurityConfig(
    log_suspicious_level="DEBUG",
    log_request_level="INFO"
)

# Logs will include:
# - Behavioral rule violations
# - Pattern matching results
# - Action execution results

Performance Considerations

  • Redis Usage: Reduces memory usage and enables distributed tracking
  • Pattern Complexity: Simple string patterns are fastest, regex patterns are slowest
  • Rule Count: More rules per endpoint increase processing time
  • Window Sizes: Larger windows require more memory for tracking

See Also