Behavioral Analysis Decorators

Behavioral analysis decorators monitor how clients use your API endpoints so you can detect suspicious usage patterns, automated behavior, and abuse. They help identify bots, scrapers, and malicious users.


Enabling behavioral decorators

Behavioral decorators are wired into the running middleware in three steps. The third step is what actually makes the rules fire — attaching a decorator without it leaves the rule inert.

from fastapi import FastAPI

from guard import SecurityConfig, SecurityDecorator, SecurityMiddleware

app = FastAPI()

# 1. Configure and install the middleware
config = SecurityConfig(enable_penetration_detection=True)
app.add_middleware(SecurityMiddleware, config=config)

# 2. Build a decorator bound to the same config
guard_deco = SecurityDecorator(config)

# 3. Wire the decorator into the middleware — REQUIRED
app.state.guard_decorator = guard_deco


@app.get("/api/sensitive")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
async def sensitive_endpoint() -> dict[str, str]:
    return {"data": "sensitive information"}

Without app.state.guard_decorator = guard_deco, the decorators attach to your routes but their rules never fire — the middleware has no handle on the decorator, so no behavioral evaluation runs. If you hold the SecurityMiddleware instance directly rather than using add_middleware, the equivalent call is middleware.set_decorator_handler(guard_deco).

Every decorator example below assumes this wiring is in place.


Usage Monitoring

Monitor how frequently individual IPs access specific endpoints:

Basic Usage Monitoring

from guard import SecurityDecorator

guard_deco = SecurityDecorator(config)

@app.get("/api/sensitive")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
def sensitive_endpoint():
    return {"data": "sensitive information"}
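
To make the max_calls and window parameters concrete, the following is a minimal standard-library sketch of a per-IP sliding-window counter. It only illustrates the idea and is not fastapi-guard's implementation; the class name SlidingWindowCounter and its record method are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import defaultdict, deque


class SlidingWindowCounter:
    """Hypothetical per-IP call counter; illustrative only."""

    def __init__(self, max_calls: int, window: float) -> None:
        self.max_calls = max_calls
        self.window = window
        self._calls: dict[str, deque[float]] = defaultdict(deque)

    def record(self, ip: str, now: float) -> bool:
        """Record a call at time `now`; return True if the IP is over the limit."""
        calls = self._calls[ip]
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self.window:
            calls.popleft()
        calls.append(now)
        return len(calls) > self.max_calls


counter = SlidingWindowCounter(max_calls=10, window=3600)
# Eleven calls from the same IP, one second apart.
results = [counter.record("203.0.113.7", now=float(t)) for t in range(11)]
```

In this sketch the first ten calls stay under the limit and the eleventh exceeds it; once the earlier timestamps age out of the window, the same IP is counted from scratch.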

Gaming Endpoint Protection

@app.post("/api/game/lootbox")
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")
def lootbox_endpoint():
    # Prevent lootbox farming
    return {"reward": "rare_item", "value": 1000}

@app.post("/api/game/daily-reward")
@guard_deco.usage_monitor(max_calls=1, window=86400, action="ban")
def daily_reward():
    # Only once per day per IP
    return {"reward": "daily_bonus", "amount": 100}

API Rate Abuse Detection

@app.get("/api/expensive-computation")
@guard_deco.usage_monitor(max_calls=3, window=3600, action="throttle")
def expensive_operation():
    # Prevent abuse of computationally expensive operations
    return {"result": "computed_data"}

@app.get("/api/search")
@guard_deco.usage_monitor(max_calls=100, window=3600, action="alert")
def search_endpoint():
    # Monitor for search abuse but don't block immediately
    return {"results": "search_data"}

Return Pattern Monitoring

Detect when the same IP receives specific responses too frequently:

Win/Success Pattern Detection

import random

@app.post("/api/lottery")
@guard_deco.return_monitor("win", max_occurrences=2, window=86400, action="ban")
def lottery_endpoint():
    # Prevent lottery manipulation
    result = random.choice(["win", "lose", "lose", "lose"])
    return {"result": result, "prize": 1000 if result == "win" else 0}

Reward System Protection

@app.get("/api/rewards/spin")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="ban")
def spin_wheel():
    # Prevent rare item farming
    items = ["common", "common", "rare_item", "common"]
    result = random.choice(items)
    return {"item": result, "rarity": "rare" if result == "rare_item" else "common"}

JSON Path Pattern Matching

@app.post("/api/game/battle")
@guard_deco.return_monitor(
    "json:result.outcome==victory",
    max_occurrences=10,
    window=3600,
    action="alert"
)
def battle_endpoint():
    # Monitor for suspicious win rates
    return {
        "result": {
            "outcome": "victory",
            "experience": 100,
            "loot": ["sword", "gold"]
        }
    }

Regex Pattern Detection

@app.get("/api/contest/submit")
@guard_deco.return_monitor(
    "regex:(success|winner|prize)",
    max_occurrences=5,
    window=86400,
    action="ban"
)
def contest_submission():
    # Detect multiple contest wins from same IP
    return {"status": "success", "message": "Contest entry submitted"}
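
It can help to check what a regex pattern actually matches before attaching it to a route. The sketch below uses only Python's re and json modules and assumes the expression is searched against the serialized response body; that is an assumption, since this page does not spell out what text the library matches against. The "rejected" response is a hypothetical example added for contrast.

```python
import json
import re

# The expression from the "regex:" pattern above, without the prefix.
pattern = re.compile(r"(success|winner|prize)")

# The response body returned by contest_submission above.
submitted = json.dumps({"status": "success", "message": "Contest entry submitted"})
# A hypothetical non-matching response, for contrast.
rejected = json.dumps({"status": "rejected", "message": "Entry closed"})

submitted_matches = pattern.search(submitted) is not None
rejected_matches = pattern.search(rejected) is not None
```

If matching works as assumed here, every response from the sample endpoint would count toward max_occurrences, because its status field always contains "success". A pattern tied to a winner-specific value would count only wins.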

Frequency Detection

Detect suspiciously high request frequencies:

Slow Operations Protection

@app.post("/api/report/generate")
@guard_deco.suspicious_frequency(max_frequency=0.1, window=300, action="ban")
def generate_report():
    # Max 1 request per 10 seconds (0.1 requests/second)
    return {"status": "Report generation started"}

@app.post("/api/backup/create")
@guard_deco.suspicious_frequency(max_frequency=0.017, window=3600, action="ban")
def create_backup():
    # Max 1 request per minute (0.017 requests/second)
    return {"status": "Backup initiated"}
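
Since max_frequency is expressed in requests per second, limits that are naturally stated as "N requests per T seconds" need a small conversion. The helper below is a hypothetical convenience (max_frequency_for is not part of the library) that reproduces the two values used above.

```python
def max_frequency_for(requests: int, per_seconds: float) -> float:
    """Convert 'N requests per T seconds' into requests per second."""
    if per_seconds <= 0:
        raise ValueError("per_seconds must be positive")
    return requests / per_seconds


one_per_ten_seconds = max_frequency_for(1, 10)  # 0.1
one_per_minute = max_frequency_for(1, 60)       # about 0.0167, rounded to 0.017 above
```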

API Scraping Prevention

@app.get("/api/products/{product_id}")
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="alert")
def product_details(product_id: int):
    # Alert if more than 2 requests per second for 5 minutes
    return {"product": f"Product {product_id}", "price": 99.99}

Complex Behavioral Analysis

Combine multiple behavioral rules for comprehensive protection:

Multi-Rule Analysis

from guard import BehaviorRule

# Define multiple rules
rules = [
    BehaviorRule("usage", threshold=20, window=3600, action="alert"),
    BehaviorRule("return_pattern", threshold=5, pattern="win", window=86400, action="ban"),
    BehaviorRule("frequency", threshold=60, window=300, action="throttle")
]

@app.post("/api/casino/play")
@guard_deco.behavior_analysis(rules)
def casino_game():
    # Protected by multiple behavioral rules
    return {"result": "win", "amount": 500}

Gaming Platform Protection

# Comprehensive gaming endpoint protection
@app.post("/api/game/action")
@guard_deco.usage_monitor(max_calls=100, window=3600, action="alert")
@guard_deco.return_monitor("critical_hit", max_occurrences=10, window=3600, action="ban")
@guard_deco.suspicious_frequency(max_frequency=5.0, window=60, action="throttle")
def game_action():
    # Multi-layered protection against game exploitation
    return {"action": "attack", "result": "critical_hit", "damage": 150}

Financial API Protection

@app.post("/api/trading/execute")
@guard_deco.usage_monitor(max_calls=50, window=3600, action="ban")
@guard_deco.return_monitor("profit", max_occurrences=20, window=86400, action="alert")
@guard_deco.suspicious_frequency(max_frequency=1.0, window=60, action="ban")
def execute_trade():
    # Prevent trading bot abuse
    return {"status": "executed", "result": "profit", "amount": 1000}

Action Types

Different actions can be taken when behavioral thresholds are exceeded:

Ban Action

@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")
def strict_endpoint():
    # Immediately ban IPs that exceed threshold
    return {"data": "strictly protected"}

Alert Action

@guard_deco.return_monitor("suspicious_pattern", max_occurrences=3, window=3600, action="alert")
def monitored_endpoint():
    # Log alerts but don't block access
    return {"status": "monitored"}

Throttle Action

@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="throttle")
def throttled_endpoint():
    # Apply rate limiting when threshold exceeded
    return {"data": "throttled access"}

Log Action

@guard_deco.usage_monitor(max_calls=100, window=3600, action="log")
def logged_endpoint():
    # Only log incidents for analysis
    return {"data": "logged access"}

Advanced Pattern Formats

Status Code Monitoring

@guard_deco.return_monitor("status:200", max_occurrences=1000, window=3600, action="alert")
def success_monitored():
    # Monitor successful request patterns
    return {"status": "success"}

Complex JSON Patterns

@guard_deco.return_monitor(
    "json:user.level>50",
    max_occurrences=5,
    window=86400,
    action="ban"
)
def level_up():
    # Detect suspicious leveling patterns
    return {"user": {"level": 55, "experience": 10000}}

@guard_deco.return_monitor(
    "json:transaction.amount>10000",
    max_occurrences=3,
    window=86400,
    action="alert"
)
def high_value_transaction():
    # Monitor large transactions
    return {"transaction": {"amount": 15000, "currency": "USD"}}
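
The json: patterns on this page combine a dotted path, an operator (== or >), and a value. As a rough mental model, the sketch below evaluates such a pattern against a response dictionary using only the standard library. The mini-grammar and the matches function are hypothetical and cover only the two operators that appear on this page; the library's real parser may accept more operators or behave differently.

```python
import re
from typing import Any

# Hypothetical mini-grammar: "json:<dotted.path><op><value>"
_PATTERN = re.compile(r"^json:(?P<path>[\w.]+)(?P<op>==|>)(?P<value>.+)$")


def matches(pattern: str, body: dict[str, Any]) -> bool:
    """Return True if `body` satisfies a 'json:path<op>value' pattern."""
    parsed = _PATTERN.match(pattern)
    if parsed is None:
        raise ValueError(f"unsupported pattern: {pattern!r}")
    # Walk the dotted path one key at a time.
    current: Any = body
    for key in parsed["path"].split("."):
        if not isinstance(current, dict) or key not in current:
            return False  # Path missing: no match.
        current = current[key]
    expected = parsed["value"]
    if parsed["op"] == ">":
        # Numeric comparison; non-numeric values never match.
        try:
            return float(current) > float(expected)
        except (TypeError, ValueError):
            return False
    # Equality is compared on the string form of the value.
    return str(current) == expected


victory = matches("json:result.outcome==victory", {"result": {"outcome": "victory"}})
high_level = matches("json:user.level>50", {"user": {"level": 55}})
```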

Best Practices

Set Realistic Thresholds

Base thresholds on legitimate user behavior:

# Good: Based on actual usage patterns
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")  # 50/hour is reasonable

# Avoid: Too restrictive for normal users
# @guard_deco.usage_monitor(max_calls=3, window=3600, action="ban")  # Too strict

Use Graduated Responses

Start with monitoring, then escalate to blocking:

# Progressive enforcement
@guard_deco.usage_monitor(max_calls=20, window=3600, action="log")      # Log at 20
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")    # Alert at 50
@guard_deco.usage_monitor(max_calls=100, window=3600, action="ban")     # Ban at 100
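
The intended effect of the three tiers can be sketched as a plain function that returns the strictest action whose threshold a call count has exceeded. This is a hypothetical illustration of the escalation logic, not how the library evaluates stacked decorators; TIERS and action_for are names invented for this example.

```python
from typing import Optional

# Tiers mirror the stacked decorators above: (max_calls, action), ascending.
TIERS = [(20, "log"), (50, "alert"), (100, "ban")]


def action_for(call_count: int) -> Optional[str]:
    """Return the strictest action whose threshold `call_count` exceeds."""
    triggered = None
    for max_calls, action in TIERS:
        if call_count > max_calls:
            triggered = action
    return triggered


within_limits = action_for(20)   # no tier exceeded yet
escalated = action_for(101)      # all three tiers exceeded
```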

Monitor Valuable Operations

Focus on endpoints that provide value to attackers:

# High-value endpoints
@guard_deco.return_monitor("rare_reward", max_occurrences=2, window=86400, action="ban")

# Financial operations
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")

# Data extraction points
@guard_deco.suspicious_frequency(max_frequency=1.0, window=60, action="throttle")

Consider Time Windows Carefully

Match windows to expected usage patterns:

# Daily limits for once-per-day operations
@guard_deco.usage_monitor(max_calls=1, window=86400, action="ban")

# Hourly limits for regular operations
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")

# Short-term frequency detection
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="throttle")
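
Window values are plain seconds, so literals such as 86400 are easy to mistype. One option, sketched here with the standard library only, is to derive them from datetime.timedelta. The window_seconds helper is hypothetical and not part of the library.

```python
from datetime import timedelta


def window_seconds(**kwargs: float) -> int:
    """Express a window as whole seconds, e.g. window_seconds(hours=1)."""
    return int(timedelta(**kwargs).total_seconds())


DAILY = window_seconds(days=1)            # 86400
HOURLY = window_seconds(hours=1)          # 3600
FIVE_MINUTES = window_seconds(minutes=5)  # 300
```

The resulting integers can be passed wherever the examples above use a window argument.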

Integration with Redis

For distributed applications, ensure Redis is configured:

config = SecurityConfig(
    enable_redis=True,
    redis_url="redis://localhost:6379",
    redis_prefix="fastapi_guard:"
)

# Behavioral tracking will use Redis for distributed state
guard_deco = SecurityDecorator(config)

Error Handling

Behavioral decorators integrate with middleware error handling:

  • 403 Forbidden: When action is "ban"
  • 429 Too Many Requests: When action is "throttle"
  • Logging: When action is "log" or "alert"
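
When writing tests or client-side handling, it can be convenient to keep this mapping in one place. The table below restates the list above as data; ACTION_STATUS and is_blocking are hypothetical names for this sketch, and None marks actions that only record the violation.

```python
from typing import Optional

# Status codes taken from the list above; None means the request is not blocked.
ACTION_STATUS: dict[str, Optional[int]] = {
    "ban": 403,
    "throttle": 429,
    "alert": None,
    "log": None,
}


def is_blocking(action: str) -> bool:
    """Return True if the action rejects the request with an error status."""
    return ACTION_STATUS[action] is not None
```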

Custom Error Messages

config = SecurityConfig(
    custom_error_responses={
        403: "Behavioral analysis detected suspicious activity",
        429: "Request frequency too high - throttled"
    }
)

Monitoring and Debugging

Enable detailed logging to monitor behavioral analysis:

config = SecurityConfig(
    log_suspicious_level="DEBUG",
    log_request_level="INFO"
)

# Logs will include:
# - Behavioral rule violations
# - Pattern matching results
# - Action execution details

Testing Behavioral Rules

Test your behavioral decorators:

from unittest.mock import patch

from fastapi.testclient import TestClient

# `app` is the FastAPI instance that defines the decorated routes.
client = TestClient(app)


def test_usage_monitor():
    # Assumes /api/monitored is decorated with max_calls=5 and action="ban".
    # Should allow normal usage
    for _ in range(5):
        response = client.get("/api/monitored")
        assert response.status_code == 200

    # Should block after threshold
    response = client.get("/api/monitored")
    assert response.status_code == 403


def test_return_pattern():
    # Force every draw to be a win so the return pattern triggers
    with patch("random.choice", return_value="win"):
        for i in range(3):
            response = client.post("/api/lottery")
            if i < 2:
                assert response.status_code == 200
            else:
                assert response.status_code == 403  # Blocked after 2 wins

Verifying it works

Once the decorator is wired (see Enabling behavioral decorators), confirm rules are firing end to end:

  • Trip a low-threshold rule. Set a deliberately small threshold — e.g. @guard_deco.usage_monitor(max_calls=2, window=60, action="log") — and call the route a few times. On the threshold breach the engine's BehavioralProcessor emits a decorator_violation event.
  • Observe the outcome. What you see depends on the rule's action: log writes a log line, throttle delays, and ban returns 403. With the Guard Agent enabled, the decorator_violation events are also shipped to your dashboard, so you can confirm a rule fired even when its action is log.
  • Trial safely with passive mode. Set passive_mode=True on the SecurityConfig to run behavioral rules in log-only mode: violations are recorded with the [PASSIVE MODE] prefix instead of blocking, so you can validate thresholds against production traffic before enforcing. See Security Monitoring.

If a route's rule never fires, the wiring step is almost always the cause — verify app.state.guard_decorator is set to the same decorator that decorates the route.


Per-route rules vs. fleet-wide rules

The decorators on this page attach rules to a single route. To apply a rule to every route — for example fleet-wide 404 / scraping tracking — set SecurityConfig.global_behavior_rules instead. Global rules need no per-route decorator and no app.state.guard_decorator wiring; they are evaluated by the middleware for all traffic.

The two compose: a decorated route is evaluated against both its own decorator rules and any global_behavior_rules. Reach for decorators when a limit is specific to an endpoint, and global_behavior_rules when a policy should hold everywhere. See the Security Config reference for the field details.


Next Steps

Now that you understand behavioral analysis decorators, explore the other security features. For the complete API reference, see the Behavioral Analysis API Documentation.