Behavior Manager

The Behavior Manager handles behavioral analysis and monitoring for FlaskAPI Guard, providing advanced detection capabilities for suspicious usage patterns and automated response actions.


Overview

The Behavior Manager system consists of:

  • BehaviorTracker: Main tracking and analysis engine
  • BehaviorRule: Rule definition for behavioral analysis
  • Integration: Seamless integration with decorators and extension

BehaviorTracker

flaskapi_guard.handlers.behavior_handler.BehaviorTracker(config)

Advanced behavioral analysis tracker for detecting suspicious patterns.

This class can track:

  • Per-endpoint usage patterns
  • Return value frequency analysis
  • Time-based behavioral anomalies

Source code in flaskapi_guard/handlers/behavior_handler.py
def __init__(self, config: SecurityConfig):
    self.config = config
    self.logger = logging.getLogger("flaskapi_guard.handlers.behavior")
    self.usage_counts: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.return_patterns: dict[str, dict[str, list[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    self.redis_handler: Any | None = None
    self.agent_handler: Any | None = None

agent_handler = None instance-attribute

config = config instance-attribute

logger = logging.getLogger('flaskapi_guard.handlers.behavior') instance-attribute

redis_handler = None instance-attribute

return_patterns = defaultdict(lambda: defaultdict(list)) instance-attribute

usage_counts = defaultdict(lambda: defaultdict(list)) instance-attribute

apply_action(rule, client_ip, endpoint_id, details)

Apply the configured action when a rule is violated.

Source code in flaskapi_guard/handlers/behavior_handler.py
def apply_action(
    self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
    """Apply the configured action when a rule is violated."""
    if self.agent_handler:
        self._send_behavior_event(
            event_type="behavioral_violation",
            ip_address=client_ip,
            action_taken=rule.action
            if not self.config.passive_mode
            else "logged_only",
            reason=f"Behavioral rule violated: {details}",
            endpoint=endpoint_id,
            rule_type=rule.rule_type,
            threshold=rule.threshold,
            window=rule.window,
        )

    if self.config.passive_mode:
        self._log_passive_mode_action(rule, client_ip, details)
    else:
        self._execute_active_mode_action(rule, client_ip, endpoint_id, details)

initialize_agent(agent_handler)

Initialize agent integration.

Source code in flaskapi_guard/handlers/behavior_handler.py
def initialize_agent(self, agent_handler: Any) -> None:
    """Initialize agent integration."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler)

Initialize Redis connection for distributed tracking.

Source code in flaskapi_guard/handlers/behavior_handler.py
def initialize_redis(self, redis_handler: Any) -> None:
    """Initialize Redis connection for distributed tracking."""
    self.redis_handler = redis_handler

track_endpoint_usage(endpoint_id, client_ip, rule)

Track endpoint usage and return True if threshold exceeded.

Parameters:

  • endpoint_id (str, required): Unique identifier for the endpoint
  • client_ip (str, required): Client IP address
  • rule (BehaviorRule, required): Behavior rule to apply

Returns:

  • bool: True if threshold exceeded, False otherwise

Source code in flaskapi_guard/handlers/behavior_handler.py
def track_endpoint_usage(
    self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
    """
    Track endpoint usage and return True if threshold exceeded.

    Args:
        endpoint_id: Unique identifier for the endpoint
        client_ip: Client IP address
        rule: Behavior rule to apply

    Returns:
        bool: True if threshold exceeded, False otherwise
    """
    current_time = time.time()
    window_start = current_time - rule.window

    if self.redis_handler:
        key = f"behavior:usage:{endpoint_id}:{client_ip}"

        self.redis_handler.set_key(
            "behavior_usage", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern = f"behavior_usage:{key}:*"
        keys = self.redis_handler.keys(pattern)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    timestamps = self.usage_counts[endpoint_id][client_ip]

    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

track_return_pattern(endpoint_id, client_ip, response, rule)

Track return value patterns and detect anomalies.

Parameters:

  • endpoint_id (str, required): Unique identifier for the endpoint
  • client_ip (str, required): Client IP address
  • response (Response, required): Flask Response object
  • rule (BehaviorRule, required): Behavior rule with pattern to match

Returns:

  • bool: True if suspicious pattern detected, False otherwise

Source code in flaskapi_guard/handlers/behavior_handler.py
def track_return_pattern(
    self, endpoint_id: str, client_ip: str, response: Response, rule: BehaviorRule
) -> bool:
    """
    Track return value patterns and detect anomalies.

    Args:
        endpoint_id: Unique identifier for the endpoint
        client_ip: Client IP address
        response: Flask Response object
        rule: Behavior rule with pattern to match

    Returns:
        bool: True if suspicious pattern detected, False otherwise
    """
    if not rule.pattern:
        return False

    current_time = time.time()
    window_start = current_time - rule.window

    pattern_matched = self._check_response_pattern(response, rule.pattern)

    if not pattern_matched:
        return False

    if self.redis_handler:
        key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"

        self.redis_handler.set_key(
            "behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
        )

        pattern_key = f"behavior_returns:{key}:*"
        keys = self.redis_handler.keys(pattern_key)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
                if timestamp >= window_start:
                    valid_count += 1
            except (ValueError, IndexError):
                continue

        return valid_count > rule.threshold

    pattern_key = f"{endpoint_id}:{rule.pattern}"
    timestamps = self.return_patterns[pattern_key][client_ip]

    timestamps[:] = [ts for ts in timestamps if ts >= window_start]

    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

The main class responsible for tracking and analyzing user behavior patterns.

Key Features

  • Endpoint Usage Tracking: Monitor how frequently IPs access specific endpoints
  • Return Pattern Analysis: Detect when IPs receive specific response patterns too often
  • Frequency Detection: Identify suspiciously high request frequencies
  • Automated Actions: Apply bans, logs, alerts, or throttling based on rules

Example Usage

from flaskapi_guard.handlers.behavior_handler import BehaviorTracker, BehaviorRule

# Create tracker (config is an existing SecurityConfig instance)
tracker = BehaviorTracker(config)

# Define rules
usage_rule = BehaviorRule(
    rule_type="usage",
    threshold=10,
    window=3600,
    action="ban"
)

return_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=3,
    pattern="win",
    window=86400,
    action="alert"
)

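# Track usage; returns True once the client exceeds usage_rule.threshold in the window
if tracker.track_endpoint_usage(endpoint_id, client_ip, usage_rule):
    tracker.apply_action(usage_rule, client_ip, endpoint_id, "usage threshold exceeded")

# Track return patterns; returns True once matching responses exceed return_rule.threshold
if tracker.track_return_pattern(endpoint_id, client_ip, response, return_rule):
    tracker.apply_action(return_rule, client_ip, endpoint_id, "return pattern threshold exceeded")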
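The in-memory path of track_endpoint_usage is a sliding-window counter: expired timestamps are pruned, the current call is appended, and the rule fires only when the count is strictly greater than the threshold. The sketch below reproduces that logic in isolation so the boundary behavior is easy to see. SlidingWindowCounter and its hit method are illustrative names, not part of flaskapi_guard, and the clock is passed in explicitly to keep the example deterministic.

```python
from collections import defaultdict


class SlidingWindowCounter:
    """Illustrative stand-in for the tracker's in-memory path (not the library class)."""

    def __init__(self) -> None:
        # endpoint_id -> client_ip -> list of request timestamps
        self.usage: dict[str, dict[str, list[float]]] = defaultdict(
            lambda: defaultdict(list)
        )

    def hit(self, endpoint_id: str, client_ip: str,
            threshold: int, window: int, now: float) -> bool:
        timestamps = self.usage[endpoint_id][client_ip]
        window_start = now - window
        # Drop timestamps that fell out of the window, then record this call.
        timestamps[:] = [ts for ts in timestamps if ts >= window_start]
        timestamps.append(now)
        # Strictly greater than: only the call that exceeds the threshold trips the rule.
        return len(timestamps) > threshold


counter = SlidingWindowCounter()
results = [
    counter.hit("/api/rewards", "203.0.113.7", threshold=3, window=60, now=float(t))
    for t in range(5)
]
# One more call long after the window has passed: earlier timestamps are pruned.
late = counter.hit("/api/rewards", "203.0.113.7", threshold=3, window=60, now=120.0)
print(results, late)
```

With threshold=3, the first three calls pass and the fourth is the first to be flagged, so a threshold of N permits N calls per window.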

BehaviorRule

flaskapi_guard.handlers.behavior_handler.BehaviorRule(rule_type, threshold, window=3600, pattern=None, action='log', custom_action=None)

Defines a behavioral analysis rule.

Source code in flaskapi_guard/handlers/behavior_handler.py
def __init__(
    self,
    rule_type: Literal["usage", "return_pattern", "frequency"],
    threshold: int,
    window: int = 3600,
    pattern: str | None = None,
    action: Literal["ban", "log", "throttle", "alert"] = "log",
    custom_action: Callable | None = None,
):
    self.rule_type = rule_type
    self.threshold = threshold
    self.window = window
    self.pattern = pattern
    self.action = action
    self.custom_action = custom_action

action = action instance-attribute

custom_action = custom_action instance-attribute

pattern = pattern instance-attribute

rule_type = rule_type instance-attribute

threshold = threshold instance-attribute

window = window instance-attribute

Configuration class that defines behavioral analysis rules.

Rule Types

  • usage: Monitor endpoint usage frequency
  • return_pattern: Analyze response patterns
  • frequency: Detect suspicious request frequencies

Pattern Formats

For return_pattern rules, the following pattern formats are supported:

  • Simple string: "win", "success", "rare_item"
  • JSON path: "json:result.status==win"
  • Regex: "regex:win|victory|success"
  • Status code: "status:200"
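To make the four formats concrete, here is a minimal sketch of how such patterns could be dispatched by prefix. The library's own matcher (_check_response_pattern) is not shown on this page, so matches_pattern is a hypothetical helper, and its details (substring matching for plain strings, case sensitivity, string comparison for JSON values) are assumptions rather than documented behavior.

```python
import json
import re


def matches_pattern(body: str, status_code: int, pattern: str) -> bool:
    """Hypothetical matcher for the four documented pattern formats."""
    if pattern.startswith("status:"):
        # "status:200" -> compare against the response status code
        return str(status_code) == pattern[len("status:"):]
    if pattern.startswith("regex:"):
        # "regex:win|victory" -> search the response body
        return re.search(pattern[len("regex:"):], body) is not None
    if pattern.startswith("json:"):
        # "json:result.status==win" -> walk dotted keys, compare as strings
        path, _, expected = pattern[len("json:"):].partition("==")
        try:
            node = json.loads(body)
        except ValueError:
            return False
        for key in path.split("."):
            if not isinstance(node, dict) or key not in node:
                return False
            node = node[key]
        return str(node) == expected
    # Plain string: substring match on the body (assumed case-sensitive)
    return pattern in body


body = '{"result": {"status": "win", "reward": {"rarity": "legendary"}}}'
checks = {
    p: matches_pattern(body, 200, p)
    for p in (
        "win",
        "json:result.status==win",
        "json:result.reward.rarity==legendary",
        "json:result.status==lose",
        "regex:win|victory|success",
        "status:200",
        "status:404",
    )
}
print(checks)
```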

Actions

  • ban: Ban the IP address
  • log: Log the incident
  • alert: Send an alert notification
  • throttle: Apply rate limiting

Example Rules

# Usage monitoring
usage_rule = BehaviorRule(
    rule_type="usage",
    threshold=50,
    window=3600,
    action="ban"
)

# Return pattern monitoring
win_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=3,
    pattern="win",
    window=86400,
    action="ban"
)

# Frequency detection
freq_rule = BehaviorRule(
    rule_type="frequency",
    threshold=30,  # 30 requests
    window=300,    # in 5 minutes
    action="alert"
)

Integration with Decorators

The Behavior Manager integrates seamlessly with the decorator system:

from flaskapi_guard.decorators import SecurityDecorator

guard_deco = SecurityDecorator(config)

@app.route("/api/rewards")
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")
def rewards_endpoint():
    return {"reward": "rare_item", "value": 1000}

Redis Integration

The Behavior Manager supports Redis for distributed tracking:

# Initialize with Redis
tracker.initialize_redis(redis_handler)

# All tracking operations will use Redis for storage
# This enables behavior tracking across multiple instances
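The Redis path shown in track_endpoint_usage writes one key per request, with the timestamp as the last colon-separated segment, then lists matching keys and counts those inside the window. The sketch below mimics that scheme against an in-process stand-in so it runs without a Redis server. FakeRedisHandler and count_in_window are hypothetical names. The "namespace:key" prefixing is inferred from the lookup pattern in the source, and TTL expiry is not simulated.

```python
import fnmatch


class FakeRedisHandler:
    """Hypothetical in-process stand-in exposing the two calls the tracker makes."""

    def __init__(self) -> None:
        self.store: dict[str, str] = {}

    def set_key(self, namespace: str, key: str, value: str, ttl: int) -> None:
        # Assumption: the real handler stores keys as "<namespace>:<key>".
        self.store[f"{namespace}:{key}"] = value

    def keys(self, pattern: str) -> list[str]:
        return [k for k in self.store if fnmatch.fnmatchcase(k, pattern)]


def count_in_window(handler: FakeRedisHandler, endpoint_id: str,
                    client_ip: str, window: int, now: float) -> int:
    key = f"behavior:usage:{endpoint_id}:{client_ip}"
    # One key per request; the timestamp is the final segment of the key.
    handler.set_key("behavior_usage", f"{key}:{now}", "1", ttl=window)
    window_start = now - window
    count = 0
    for name in handler.keys(f"behavior_usage:{key}:*"):
        try:
            if float(name.split(":")[-1]) >= window_start:
                count += 1
        except ValueError:
            continue
    return count


shared = FakeRedisHandler()
# Two app instances sharing one store see each other's requests.
from_instance_a = count_in_window(shared, "/api/rewards", "203.0.113.7", 60, now=0.0)
from_instance_b = count_in_window(shared, "/api/rewards", "203.0.113.7", 60, now=10.0)
after_window = count_in_window(shared, "/api/rewards", "203.0.113.7", 60, now=100.0)
print(from_instance_a, from_instance_b, after_window)
```

Because every instance reads and writes the same key space, the count reflects requests handled by any instance, which is what makes the tracking distributed.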

Advanced Usage

Custom Pattern Matching

# JSON path pattern
json_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=5,
    pattern="json:result.reward.rarity==legendary",
    window=86400,
    action="ban"
)

# Regex pattern
regex_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=10,
    pattern="regex:(win|victory|success)",
    window=3600,
    action="alert"
)

# Status code pattern
status_rule = BehaviorRule(
    rule_type="return_pattern",
    threshold=100,
    pattern="status:200",
    window=3600,
    action="log"
)

Multiple Rule Analysis

# Apply multiple rules to an endpoint
rules = [
    BehaviorRule("usage", threshold=20, window=3600, action="ban"),
    BehaviorRule("return_pattern", threshold=5, pattern="win", window=86400, action="alert"),
    BehaviorRule("frequency", threshold=60, window=300, action="throttle")
]

@guard_deco.behavior_analysis(rules)
def complex_endpoint():
    return {"data": "complex"}

Best Practices

Set Appropriate Thresholds

Match thresholds to expected legitimate usage:

# High-value endpoint - strict limits
@guard_deco.usage_monitor(max_calls=5, window=3600, action="ban")

# Regular endpoint - moderate limits
@guard_deco.usage_monitor(max_calls=50, window=3600, action="alert")

Use Graduated Responses

Start with logging, then alerts, then bans:

# More than 10 calls per hour - log
BehaviorRule("usage", threshold=10, window=3600, action="log")

# More than 20 calls per hour - alert
BehaviorRule("usage", threshold=20, window=3600, action="alert")

# More than 30 calls per hour - ban
BehaviorRule("usage", threshold=30, window=3600, action="ban")
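As a rough illustration of how tiered thresholds escalate, the sketch below lists which actions would fire at a given call count, using the same strictly-greater-than comparison as the tracker. Rule and triggered_actions are hypothetical stand-ins. How the library itself orders or combines several rules that fire at once is not documented here and is not modeled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    """Minimal stand-in for BehaviorRule (illustration only)."""
    threshold: int
    action: str


TIERS = [Rule(10, "log"), Rule(20, "alert"), Rule(30, "ban")]


def triggered_actions(call_count: int, rules: list[Rule]) -> list[str]:
    # A rule fires once the count is strictly above its threshold.
    return [r.action for r in rules if call_count > r.threshold]


for count in (10, 11, 25, 31):
    print(count, triggered_actions(count, TIERS))
```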

Monitor Return Patterns Carefully

Focus on patterns that indicate abuse:

# Gaming/gambling endpoints
@guard_deco.return_monitor("jackpot", max_occurrences=2, window=86400, action="ban")

# Reward systems
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")

# Success patterns
@guard_deco.return_monitor("regex:success|win|victory", max_occurrences=10, window=3600, action="log")

Error Handling

The Behavior Manager handles errors gracefully:

  • Redis Connection Issues: Falls back to in-memory tracking
  • Pattern Matching Errors: Logs errors and continues processing
  • Action Failures: Logs failures but doesn't interrupt request flow

Monitoring and Debugging

Enable detailed logging for behavioral analysis:

config = SecurityConfig(
    log_suspicious_level="DEBUG",
    log_request_level="INFO"
)

# Logs will include:
# - Behavioral rule violations
# - Pattern matching results
# - Action execution results
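The tracker creates its logger under the name "flaskapi_guard.handlers.behavior" (see the constructor source above), so standard-library logging can also target it directly. This is a minimal sketch using only the logging module. Which records the library emits at DEBUG level is not documented here, so treat the level choice as an assumption.

```python
import logging

# Attach a basic handler to the root logger so records are printed somewhere.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")

# Raise verbosity for the behavior tracker only, leaving other loggers untouched.
behavior_logger = logging.getLogger("flaskapi_guard.handlers.behavior")
behavior_logger.setLevel(logging.DEBUG)
```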

Performance Considerations

  • Redis Usage: Moves tracking state out of process memory and enables distributed tracking; each tracked request writes one key and lists the matching keys
  • Pattern Complexity: Simple string patterns are fastest, regex patterns are slowest
  • Rule Count: More rules per endpoint increase processing time
  • Window Sizes: Larger windows require more memory for tracking

See Also