Behavioral Analysis Decorators¶
Behavioral analysis decorators provide advanced monitoring capabilities to detect suspicious usage patterns, automated behavior, and potential abuse of your API endpoints. These decorators help identify bots, scrapers, and malicious users through behavioral analysis.
Enabling behavioral decorators¶
Behavioral decorators are wired into the running middleware in three steps. The third step is what actually makes the rules fire — attaching a decorator without it leaves the rule inert.
from fastapi import FastAPI
from guard import SecurityConfig, SecurityDecorator, SecurityMiddleware
app = FastAPI()
# 1. Configure and install the middleware
config = SecurityConfig(enable_penetration_detection=True)
app.add_middleware(SecurityMiddleware, config=config)
# 2. Build a decorator bound to the same config
guard_deco = SecurityDecorator(config)
# 3. Wire the decorator into the middleware — REQUIRED
app.state.guard_decorator = guard_deco
@app.get("/api/sensitive")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
async def sensitive_endpoint() -> dict[str, str]:
return {"data": "sensitive information"}
Without app.state.guard_decorator = guard_deco, the decorators attach to your routes but their rules never fire — the middleware has no handle on the decorator, so no behavioral evaluation runs. If you hold the SecurityMiddleware instance directly rather than using add_middleware, the equivalent call is middleware.set_decorator_handler(guard_deco).
Every decorator example below assumes this wiring is in place.
Usage Monitoring¶
Monitor how frequently individual IPs access specific endpoints:
. Basic Usage Monitoring¶
from guard import SecurityDecorator
guard_deco = SecurityDecorator(config)
@app.get("/api/sensitive")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
def sensitive_endpoint():
return {"data": "sensitive information"}
. Gaming Endpoint Protection¶
@app.post("/api/game/lootbox")
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")
def lootbox_endpoint():
# Prevent lootbox farming
return {"reward": "rare_item", "value": 1000}
@app.post("/api/game/daily-reward")
@guard_deco.usage_monitor(max_calls=1, window=86400, action="ban")
def daily_reward():
# Only once per day per IP
return {"reward": "daily_bonus", "amount": 100}
. API Rate Abuse Detection¶
@app.get("/api/expensive-computation")
@guard_deco.usage_monitor(max_calls=3, window=3600, action="throttle")
def expensive_operation():
# Prevent abuse of computationally expensive operations
return {"result": "computed_data"}
@app.get("/api/search")
@guard_deco.usage_monitor(max_calls=100, window=3600, action="alert")
def search_endpoint():
# Monitor for search abuse but don't block immediately
return {"results": "search_data"}
Return Pattern Monitoring¶
Detect when the same IP receives specific responses too frequently:
. Win/Success Pattern Detection¶
@app.post("/api/lottery")
@guard_deco.return_monitor("win", max_occurrences=2, window=86400, action="ban")
def lottery_endpoint():
# Prevent lottery manipulation
result = random.choice(["win", "lose", "lose", "lose"])
return {"result": result, "prize": 1000 if result == "win" else 0}
. Reward System Protection¶
@app.get("/api/rewards/spin")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="ban")
def spin_wheel():
# Prevent rare item farming
items = ["common", "common", "rare_item", "common"]
result = random.choice(items)
return {"item": result, "rarity": "rare" if result == "rare_item" else "common"}
. JSON Path Pattern Matching¶
@app.post("/api/game/battle")
@guard_deco.return_monitor(
"json:result.outcome==victory",
max_occurrences=10,
window=3600,
action="alert"
)
def battle_endpoint():
# Monitor for suspicious win rates
return {
"result": {
"outcome": "victory",
"experience": 100,
"loot": ["sword", "gold"]
}
}
. Regex Pattern Detection¶
@app.get("/api/contest/submit")
@guard_deco.return_monitor(
"regex:(success|winner|prize)",
max_occurrences=5,
window=86400,
action="ban"
)
def contest_submission():
# Detect multiple contest wins from same IP
return {"status": "success", "message": "Contest entry submitted"}
Frequency Detection¶
Detect suspiciously high request frequencies:
. Slow Operations Protection¶
@app.post("/api/report/generate")
@guard_deco.suspicious_frequency(max_frequency=0.1, window=300, action="ban")
def generate_report():
# Max 1 request per 10 seconds (0.1 requests/second)
return {"status": "Report generation started"}
@app.post("/api/backup/create")
@guard_deco.suspicious_frequency(max_frequency=0.017, window=3600, action="ban")
def create_backup():
# Max 1 request per minute (0.017 requests/second)
return {"status": "Backup initiated"}
. API Scraping Prevention¶
@app.get("/api/products/{product_id}")
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="alert")
def product_details(product_id: int):
# Alert if more than 2 requests per second for 5 minutes
return {"product": f"Product {product_id}", "price": 99.99}
Complex Behavioral Analysis¶
Combine multiple behavioral rules for comprehensive protection:
. Multi-Rule Analysis¶
from guard import BehaviorRule
# Define multiple rules
rules = [
BehaviorRule("usage", threshold=20, window=3600, action="alert"),
BehaviorRule("return_pattern", threshold=5, pattern="win", window=86400, action="ban"),
BehaviorRule("frequency", threshold=60, window=300, action="throttle")
]
@app.post("/api/casino/play")
@guard_deco.behavior_analysis(rules)
def casino_game():
# Protected by multiple behavioral rules
return {"result": "win", "amount": 500}
. Gaming Platform Protection¶
# Comprehensive gaming endpoint protection
@app.post("/api/game/action")
@guard_deco.usage_monitor(max_calls=100, window=3600, action="alert")
@guard_deco.return_monitor("critical_hit", max_occurrences=10, window=3600, action="ban")
@guard_deco.suspicious_frequency(max_frequency=5.0, window=60, action="throttle")
def game_action():
# Multi-layered protection against game exploitation
return {"action": "attack", "result": "critical_hit", "damage": 150}
. Financial API Protection¶
@app.post("/api/trading/execute")
@guard_deco.usage_monitor(max_calls=50, window=3600, action="ban")
@guard_deco.return_monitor("profit", max_occurrences=20, window=86400, action="alert")
@guard_deco.suspicious_frequency(max_frequency=1.0, window=60, action="ban")
def execute_trade():
# Prevent trading bot abuse
return {"status": "executed", "result": "profit", "amount": 1000}
Action Types¶
Different actions can be taken when behavioral thresholds are exceeded:
. Ban Action¶
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")
def strict_endpoint():
# Immediately ban IPs that exceed threshold
return {"data": "strictly protected"}
. Alert Action¶
@guard_deco.return_monitor("suspicious_pattern", max_occurrences=3, window=3600, action="alert")
def monitored_endpoint():
# Log alerts but don't block access
return {"status": "monitored"}
. Throttle Action¶
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="throttle")
def throttled_endpoint():
# Apply rate limiting when threshold exceeded
return {"data": "throttled access"}
. Log Action¶
@guard_deco.usage_monitor(max_calls=100, window=3600, action="log")
def logged_endpoint():
# Only log incidents for analysis
return {"data": "logged access"}
Advanced Pattern Formats¶
. Status Code Monitoring¶
@guard_deco.return_monitor("status:200", max_occurrences=1000, window=3600, action="alert")
def success_monitored():
# Monitor successful request patterns
return {"status": "success"}
. Complex JSON Patterns¶
@guard_deco.return_monitor(
"json:user.level>50",
max_occurrences=5,
window=86400,
action="ban"
)
def level_up():
# Detect suspicious leveling patterns
return {"user": {"level": 55, "experience": 10000}}
@guard_deco.return_monitor(
"json:transaction.amount>10000",
max_occurrences=3,
window=86400,
action="alert"
)
def high_value_transaction():
# Monitor large transactions
return {"transaction": {"amount": 15000, "currency": "USD"}}
Best Practices¶
. Set Realistic Thresholds¶
Base thresholds on legitimate user behavior:
# Good: Based on actual usage patterns
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert") # 50/hour is reasonable
# Avoid: Too restrictive for normal users
# @guard_deco.usage_monitor(max_calls=3, window=3600, action="ban") # Too strict
. Use Graduated Responses¶
Start with monitoring, then escalate to blocking:
# Progressive enforcement
@guard_deco.usage_monitor(max_calls=20, window=3600, action="log") # Log at 20
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert") # Alert at 50
@guard_deco.usage_monitor(max_calls=100, window=3600, action="ban") # Ban at 100
. Monitor Valuable Operations¶
Focus on endpoints that provide value to attackers:
# High-value endpoints
@guard_deco.return_monitor("rare_reward", max_occurrences=2, window=86400, action="ban")
# Financial operations
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
# Data extraction points
@guard_deco.suspicious_frequency(max_frequency=1.0, window=60, action="throttle")
. Consider Time Windows Carefully¶
Match windows to expected usage patterns:
# Daily limits for once-per-day operations
@guard_deco.usage_monitor(max_calls=1, window=86400, action="ban")
# Hourly limits for regular operations
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")
# Short-term frequency detection
@guard_deco.suspicious_frequency(max_frequency=2.0, window=300, action="throttle")
Integration with Redis¶
For distributed applications, ensure Redis is configured:
config = SecurityConfig(
enable_redis=True,
redis_url="redis://localhost:6379",
redis_prefix="fastapi_guard:"
)
# Behavioral tracking will use Redis for distributed state
guard_deco = SecurityDecorator(config)
Error Handling¶
Behavioral decorators integrate with middleware error handling:
- 403 Forbidden: When action is "ban"
- 429 Too Many Requests: When action is "throttle"
- Logging: When action is "log" or "alert"
. Custom Error Messages¶
config = SecurityConfig(
custom_error_responses={
403: "Behavioral analysis detected suspicious activity",
429: "Request frequency too high - throttled"
}
)
Monitoring and Debugging¶
Enable detailed logging to monitor behavioral analysis:
config = SecurityConfig(
log_suspicious_level="DEBUG",
log_request_level="INFO"
)
# Logs will include:
# - Behavioral rule violations
# - Pattern matching results
# - Action execution details
Testing Behavioral Rules¶
Test your behavioral decorators:
import pytest
from fastapi.testclient import TestClient
def test_usage_monitor():
# Should allow normal usage
for i in range(5):
response = client.get("/api/monitored")
assert response.status_code == 200
# Should block after threshold
response = client.get("/api/monitored")
assert response.status_code == 403
def test_return_pattern():
# Mock responses to trigger pattern
with patch('random.choice', return_value='win'):
for i in range(3):
response = client.post("/api/lottery")
if i < 2:
assert response.status_code == 200
else:
assert response.status_code == 403 # Blocked after 2 wins
Verifying it works¶
Once the decorator is wired (see Enabling behavioral decorators), confirm rules are firing end to end:
- Trip a low-threshold rule. Set a deliberately small threshold — e.g.
@guard_deco.usage_monitor(max_calls=2, window=60, action="log")— and call the route a few times. On the threshold breach the engine'sBehavioralProcessoremits adecorator_violationevent. - Observe the outcome. What you see depends on the rule's
action:logwrites a log line,throttledelays, andbanreturns403. With the Guard Agent enabled, thedecorator_violationevents are also shipped to your dashboard, so you can confirm a rule fired even when its action islog. - Trial safely with passive mode. Set
passive_mode=Trueon theSecurityConfigto run behavioral rules in log-only mode: violations are recorded with the[PASSIVE MODE]prefix instead of blocking, so you can validate thresholds against production traffic before enforcing. See Security Monitoring.
If a route's rule never fires, the wiring step is almost always the cause — verify app.state.guard_decorator is set to the same decorator that decorates the route.
Per-route rules vs. fleet-wide rules¶
The decorators on this page attach rules to a single route. To apply a rule to every route — for example fleet-wide 404 / scraping tracking — set SecurityConfig.global_behavior_rules instead. Global rules need no per-route decorator and no app.state.guard_decorator wiring; they are evaluated by the middleware for all traffic.
The two compose: a decorated route is evaluated against both its own decorator rules and any global_behavior_rules. Reach for decorators when a limit is specific to an endpoint, and global_behavior_rules when a policy should hold everywhere. See the Security Config reference for the field details.
Next Steps¶
Now that you understand behavioral analysis decorators, explore other security features:
- Advanced Decorators - Time windows and detection controls
- Access Control Decorators - IP and geographic restrictions
- Content Filtering - Request validation and filtering
- Rate Limiting Decorators - Traditional rate limiting
For complete API reference, see the Behavioral Analysis API Documentation.