Skip to content

Behavioral Analysis Decorators

Behavioral analysis decorators provide advanced monitoring capabilities to detect suspicious usage patterns, automated behavior, and potential abuse of your API endpoints. These decorators help identify bots, scrapers, and malicious users through behavioral analysis.


Usage Monitoring

Monitor how frequently individual IPs access specific endpoints:

. Basic Usage Monitoring

from guard.decorators import SecurityDecorator

guard_deco = SecurityDecorator(config)

@app.get("/api/sensitive")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
def sensitive_endpoint():
    return {"data": "sensitive information"}

. Gaming Endpoint Protection

@app.post("/api/game/lootbox")
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")
def lootbox_endpoint():
    # Prevent lootbox farming
    return {"reward": "rare_item", "value": 1000}

@app.post("/api/game/daily-reward")
@guard_deco.usage_monitor(max_calls=1, window=86400, action="ban")
def daily_reward():
    # Only once per day per IP
    return {"reward": "daily_bonus", "amount": 100}

. API Rate Abuse Detection

@app.get("/api/expensive-computation")
@guard_deco.usage_monitor(max_calls=3, window=3600, action="throttle")
def expensive_operation():
    # Prevent abuse of computationally expensive operations
    return {"result": "computed_data"}

@app.get("/api/search")
@guard_deco.usage_monitor(max_calls=100, window=3600, action="alert")
def search_endpoint():
    # Monitor for search abuse but don't block immediately
    return {"results": "search_data"}

Return Pattern Monitoring

Detect when the same IP receives specific responses too frequently:

. Win/Success Pattern Detection

@app.post("/api/lottery")
@guard_deco.return_monitor("win", max_occurrences=2, window=86400, action="ban")
def lottery_endpoint():
    # Prevent lottery manipulation
    result = random.choice(["win", "lose", "lose", "lose"])
    return {"result": result, "prize": 1000 if result == "win" else 0}

. Reward System Protection

@app.get("/api/rewards/spin")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="ban")
def spin_wheel():
    # Prevent rare item farming
    items = ["common", "common", "rare_item", "common"]
    result = random.choice(items)
    return {"item": result, "rarity": "rare" if result == "rare_item" else "common"}

. JSON Path Pattern Matching

@app.post("/api/game/battle")
@guard_deco.return_monitor(
    "json:result.outcome==victory",
    max_occurrences=10,
    window=3600,
    action="alert"
)
def battle_endpoint():
    # Monitor for suspicious win rates
    return {
        "result": {
            "outcome": "victory",
            "experience": 100,
            "loot": ["sword", "gold"]
        }
    }

. Regex Pattern Detection

@app.get("/api/contest/submit")
@guard_deco.return_monitor(
    "regex:(success|winner|prize)",
    max_occurrences=5,
    window=86400,
    action="ban"
)
def contest_submission():
    # Detect multiple contest wins from same IP
    return {"status": "success", "message": "Contest entry submitted"}

Frequency Detection

Detect suspiciously high request frequencies:

. Slow Operations Protection

@app.post("/api/report/generate")
@guard_deco.suspicious_frequency(max_frequency=0.1, window=300, action="ban")
def generate_report():
    # Max 1 request per 10 seconds (0.1 requests/second)
    return {"status": "Report generation started"}

@app.post("/api/backup/create")
@guard_deco.suspicious_frequency(max_frequency=0.017, window=3600, action="ban")
def create_backup():
    # Max 1 request per minute (0.017 requests/second)
    return {"status": "Backup initiated"}

. API Scraping Prevention

@app.get("/api/products/{product_id}")
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="alert")
def product_details(product_id: int):
    # Alert if more than 2 requests per second for 5 minutes
    return {"product": f"Product {product_id}", "price": 99.99}

Complex Behavioral Analysis

Combine multiple behavioral rules for comprehensive protection:

. Multi-Rule Analysis

from guard.handlers.behavior_handler import BehaviorRule

# Define multiple rules
rules = [
    BehaviorRule("usage", threshold=20, window=3600, action="alert"),
    BehaviorRule("return_pattern", threshold=5, pattern="win", window=86400, action="ban"),
    BehaviorRule("frequency", threshold=60, window=300, action="throttle")
]

@app.post("/api/casino/play")
@guard_deco.behavior_analysis(rules)
def casino_game():
    # Protected by multiple behavioral rules
    return {"result": "win", "amount": 500}

. Gaming Platform Protection

# Comprehensive gaming endpoint protection
@app.post("/api/game/action")
@guard_deco.usage_monitor(max_calls=100, window=3600, action="alert")
@guard_deco.return_monitor("critical_hit", max_occurrences=10, window=3600, action="ban")
@guard_deco.suspicious_frequency(max_frequency=5.0, window=60, action="throttle")
def game_action():
    # Multi-layered protection against game exploitation
    return {"action": "attack", "result": "critical_hit", "damage": 150}

. Financial API Protection

@app.post("/api/trading/execute")
@guard_deco.usage_monitor(max_calls=50, window=3600, action="ban")
@guard_deco.return_monitor("profit", max_occurrences=20, window=86400, action="alert")
@guard_deco.suspicious_frequency(max_frequency=1.0, window=60, action="ban")
def execute_trade():
    # Prevent trading bot abuse
    return {"status": "executed", "result": "profit", "amount": 1000}

Action Types

Different actions can be taken when behavioral thresholds are exceeded:

. Ban Action

@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")
def strict_endpoint():
    # Immediately ban IPs that exceed threshold
    return {"data": "strictly protected"}

. Alert Action

@guard_deco.return_monitor("suspicious_pattern", max_occurrences=3, window=3600, action="alert")
def monitored_endpoint():
    # Log alerts but don't block access
    return {"status": "monitored"}

. Throttle Action

@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="throttle")
def throttled_endpoint():
    # Apply rate limiting when threshold exceeded
    return {"data": "throttled access"}

. Log Action

@guard_deco.usage_monitor(max_calls=100, window=3600, action="log")
def logged_endpoint():
    # Only log incidents for analysis
    return {"data": "logged access"}

Advanced Pattern Formats

. Status Code Monitoring

@guard_deco.return_monitor("status:200", max_occurrences=1000, window=3600, action="alert")
def success_monitored():
    # Monitor successful request patterns
    return {"status": "success"}

. Complex JSON Patterns

@guard_deco.return_monitor(
    "json:user.level>50",
    max_occurrences=5,
    window=86400,
    action="ban"
)
def level_up():
    # Detect suspicious leveling patterns
    return {"user": {"level": 55, "experience": 10000}}

@guard_deco.return_monitor(
    "json:transaction.amount>10000",
    max_occurrences=3,
    window=86400,
    action="alert"
)
def high_value_transaction():
    # Monitor large transactions
    return {"transaction": {"amount": 15000, "currency": "USD"}}

Best Practices

. Set Realistic Thresholds

Base thresholds on legitimate user behavior:

# Good: Based on actual usage patterns
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")  # 50/hour is reasonable

# Avoid: Too restrictive for normal users
# @guard_deco.usage_monitor(max_calls=3, window=3600, action="ban")  # Too strict

. Use Graduated Responses

Start with monitoring, then escalate to blocking:

# Progressive enforcement
@guard_deco.usage_monitor(max_calls=20, window=3600, action="log")      # Log at 20
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")    # Alert at 50
@guard_deco.usage_monitor(max_calls=100, window=3600, action="ban")     # Ban at 100

. Monitor Valuable Operations

Focus on endpoints that provide value to attackers:

# High-value endpoints
@guard_deco.return_monitor("rare_reward", max_occurrences=2, window=86400, action="ban")

# Financial operations
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")

# Data extraction points
@guard_deco.suspicious_frequency(max_frequency=1.0, window=60, action="throttle")

. Consider Time Windows Carefully

Match windows to expected usage patterns:

# Daily limits for once-per-day operations
@guard_deco.usage_monitor(max_calls=1, window=86400, action="ban")

# Hourly limits for regular operations
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")

# Short-term frequency detection
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="throttle")

Integration with Redis

For distributed applications, ensure Redis is configured:

config = SecurityConfig(
    enable_redis=True,
    redis_url="redis://localhost:6379",
    redis_prefix="fastapi_guard:"
)

# Behavioral tracking will use Redis for distributed state
guard_deco = SecurityDecorator(config)

Error Handling

Behavioral decorators integrate with middleware error handling:

  • 403 Forbidden: When action is "ban"
  • 429 Too Many Requests: When action is "throttle"
  • Logging: When action is "log" or "alert"

. Custom Error Messages

config = SecurityConfig(
    custom_error_responses={
        403: "Behavioral analysis detected suspicious activity",
        429: "Request frequency too high - throttled"
    }
)

Monitoring and Debugging

Enable detailed logging to monitor behavioral analysis:

config = SecurityConfig(
    log_suspicious_level="DEBUG",
    log_request_level="INFO"
)

# Logs will include:
# - Behavioral rule violations
# - Pattern matching results
# - Action execution details

Testing Behavioral Rules

Test your behavioral decorators:

import pytest
from fastapi.testclient import TestClient

def test_usage_monitor():
    # Should allow normal usage
    for i in range(5):
        response = client.get("/api/monitored")
        assert response.status_code == 200

    # Should block after threshold
    response = client.get("/api/monitored")
    assert response.status_code == 403

def test_return_pattern():
    # Mock responses to trigger pattern
    with patch('random.choice', return_value='win'):
        for i in range(3):
            response = client.post("/api/lottery")
            if i < 2:
                assert response.status_code == 200
            else:
                assert response.status_code == 403  # Blocked after 2 wins

Next Steps

Now that you understand behavioral analysis decorators, explore other security features:

For complete API reference, see the Behavioral Analysis API Documentation.