Skip to content

Behavior Manager

guard_core.handlers.behavior_handler.BehaviorTracker(config)

Source code in guard_core/handlers/behavior_handler.py
def __init__(self, config: SecurityConfig):
    """
    Set up an empty tracker bound to ``config``.

    Both tracking maps are two-level ``defaultdict`` structures whose
    leaves are lists of float timestamps, so any key can be read or
    appended to without prior initialisation. The Redis and agent
    handlers start out unset (``None``).
    """

    def _timestamp_index() -> dict[str, dict[str, list[float]]]:
        # outer key -> inner key -> list of timestamps, all created lazily
        return defaultdict(lambda: defaultdict(list))

    self.config = config
    self.logger = logging.getLogger("guard_core.handlers.behavior")

    # Separate, independent maps for usage tracking and return-pattern tracking.
    self.usage_counts: dict[str, dict[str, list[float]]] = _timestamp_index()
    self.return_patterns: dict[str, dict[str, list[float]]] = _timestamp_index()

    # Optional collaborators; assigned later, None until then.
    self.redis_handler: Any | None = None
    self.agent_handler: Any | None = None

agent_handler = None instance-attribute

config = config instance-attribute

logger = logging.getLogger('guard_core.handlers.behavior') instance-attribute

redis_handler = None instance-attribute

return_patterns = defaultdict(lambda: defaultdict(list)) instance-attribute

usage_counts = defaultdict(lambda: defaultdict(list)) instance-attribute

apply_action(rule, client_ip, endpoint_id, details) async

Source code in guard_core/handlers/behavior_handler.py
async def apply_action(
    self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
    """
    React to a violated behavioral rule.

    When an agent handler is set, a ``behavioral_violation`` event is sent
    first; the reported action is ``"logged_only"`` in passive mode and
    ``rule.action`` otherwise. Afterwards the violation is either logged
    (passive mode) or the active-mode action is executed.
    """
    if self.agent_handler:
        if self.config.passive_mode:
            reported_action = "logged_only"
        else:
            reported_action = rule.action

        await self._send_behavior_event(
            event_type="behavioral_violation",
            ip_address=client_ip,
            action_taken=reported_action,
            reason=f"Behavioral rule violated: {details}",
            endpoint=endpoint_id,
            rule_type=rule.rule_type,
            threshold=rule.threshold,
            window=rule.window,
        )

    # passive_mode is read again here on purpose: it is looked up after the
    # event has been sent, exactly like the branch it replaces.
    if not self.config.passive_mode:
        await self._execute_active_mode_action(rule, client_ip, endpoint_id, details)
        return

    self._log_passive_mode_action(rule, client_ip, details)

initialize_agent(agent_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Store ``agent_handler`` on the tracker as ``self.agent_handler``."""
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/behavior_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Store ``redis_handler`` on the tracker as ``self.redis_handler``."""
    self.redis_handler = redis_handler

track_endpoint_usage(endpoint_id, client_ip, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_endpoint_usage(
    self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
    """
    Record one call to ``endpoint_id`` by ``client_ip`` and check the rule.

    Returns ``True`` when the number of calls recorded within the last
    ``rule.window`` seconds (including this one) is strictly greater than
    ``rule.threshold``. Uses the Redis handler when one is set, otherwise
    the in-memory ``usage_counts`` map.
    """
    now = time.time()
    cutoff = now - rule.window

    if not self.redis_handler:
        # In-memory path: prune expired entries in place, then record this call.
        history = self.usage_counts[endpoint_id][client_ip]
        history[:] = [seen for seen in history if seen >= cutoff]
        history.append(now)
        return len(history) > rule.threshold

    # Redis path: one key per call, with the call time as the last ":" segment.
    base_key = f"behavior:usage:{endpoint_id}:{client_ip}"
    await self.redis_handler.set_key(
        "behavior_usage", f"{base_key}:{now}", "1", ttl=rule.window
    )
    stored_keys = await self.redis_handler.keys(f"behavior_usage:{base_key}:*")

    def _within_window(key_name) -> bool:
        # Keys whose trailing segment is not a float are ignored.
        try:
            return float(key_name.split(":")[-1]) >= cutoff
        except (ValueError, IndexError):
            return False

    recent_calls = sum(1 for key_name in stored_keys if _within_window(key_name))
    return recent_calls > rule.threshold

track_return_pattern(endpoint_id, client_ip, response, rule) async

Source code in guard_core/handlers/behavior_handler.py
async def track_return_pattern(
    self,
    endpoint_id: str,
    client_ip: str,
    response: GuardResponse,
    rule: BehaviorRule,
) -> bool:
    """
    Record a response that matches ``rule.pattern`` and check the rule.

    Returns ``False`` immediately when the rule has no pattern or when
    ``_check_response_pattern`` reports no match. Otherwise the match is
    recorded (in Redis when a handler is set, else in the in-memory
    ``return_patterns`` map) and the result is ``True`` when the number of
    matches within the last ``rule.window`` seconds, including this one,
    is strictly greater than ``rule.threshold``.
    """
    if not rule.pattern:
        return False

    current_time = time.time()
    window_start = current_time - rule.window

    if not await self._check_response_pattern(response, rule.pattern):
        return False

    if self.redis_handler:
        key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"

        # One key per match; the match time is the last ":" segment.
        await self.redis_handler.set_key(
            "behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
        )

        # The key embeds rule.pattern, which may be a regex such as
        # "regex:[a-z]+\d". Left unescaped, "[", "]", "\", "*" and "?" would
        # be treated as glob syntax by the lookup, so the literal key stored
        # above would not be found (or unrelated keys would be counted).
        # Backslash-escape them so only the trailing "*" acts as a wildcard.
        # NOTE(review): assumes redis_handler.keys() forwards the pattern to
        # Redis glob matching (KEYS/SCAN MATCH) — confirm against the handler.
        glob_escapes = str.maketrans({ch: f"\\{ch}" for ch in "*?[]\\"})
        pattern_key = f"behavior_returns:{key.translate(glob_escapes)}:*"
        keys = await self.redis_handler.keys(pattern_key)

        valid_count = 0
        for key_name in keys:
            try:
                timestamp = float(key_name.split(":")[-1])
            except (ValueError, IndexError):
                # Trailing segment is not a timestamp; ignore this key.
                continue
            if timestamp >= window_start:
                valid_count += 1

        return valid_count > rule.threshold

    # In-memory path: matches are grouped per endpoint+pattern, then per IP.
    pattern_key = f"{endpoint_id}:{rule.pattern}"
    timestamps = self.return_patterns[pattern_key][client_ip]

    # Prune expired entries in place, then record this match.
    timestamps[:] = [ts for ts in timestamps if ts >= window_start]
    timestamps.append(current_time)

    return len(timestamps) > rule.threshold

The Behavior Manager handles behavioral analysis for detecting suspicious usage patterns.


BehaviorRule

guard_core.handlers.behavior_handler.BehaviorRule(rule_type, threshold, window=3600, pattern=None, action='log', custom_action=None)

Source code in guard_core/handlers/behavior_handler.py
def __init__(
    self,
    rule_type: Literal["usage", "return_pattern", "frequency"],
    threshold: int,
    window: int = 3600,
    pattern: str | None = None,
    action: Literal["ban", "log", "throttle", "alert"] = "log",
    custom_action: Callable | None = None,
):
    """
    Store the rule definition on the instance.

    Every argument is kept unchanged under an attribute of the same name;
    no validation or normalisation is performed here.
    """
    # What is tracked and the limits it is checked against.
    self.rule_type, self.threshold, self.window = rule_type, threshold, window
    # Optional response pattern and what to do when the rule is violated.
    self.pattern, self.action, self.custom_action = pattern, action, custom_action

action = action instance-attribute

custom_action = custom_action instance-attribute

pattern = pattern instance-attribute

rule_type = rule_type instance-attribute

threshold = threshold instance-attribute

window = window instance-attribute

Rule types: usage, return_pattern, frequency

Pattern formats: Simple string, JSON path (json:result.status==win), Regex (regex:win|victory), Status code (status:200)

Actions: ban, log, alert, throttle


Integration with Decorators

# Example: attaching behavioral monitoring to a view through decorators.
# `config`, `SecurityDecorator` and `JsonResponse` are not defined in this
# snippet and must be provided by the surrounding application.
guard_deco = SecurityDecorator(config)

# Decorators apply bottom-up: return_monitor wraps the view first, then
# usage_monitor wraps the result.
# NOTE(review): presumably usage_monitor bans a client after more than 10 calls
# within 3600 s, and return_monitor alerts once "rare_item" has appeared in
# more than 3 responses within 86400 s — confirm against the decorator docs.
@guard_deco.usage_monitor(max_calls=10, window=3600, action="ban")
@guard_deco.return_monitor("rare_item", max_occurrences=3, window=86400, action="alert")
def rewards_endpoint(request):
    # The response body contains the "rare_item" string watched by return_monitor.
    return JsonResponse({"reward": "rare_item"})

See Also