Handlers
Guard-core handlers are singleton services that manage specific security subsystems. Each handler supports optional Redis and agent integration.
IPBanManager
Manages banned IP addresses with a dual-layer cache (TTLCache + Redis).
guard_core.handlers.ipban_handler.IPBanManager
agent_handler = None
class-attribute
instance-attribute
banned_ips
instance-attribute
redis_handler = None
class-attribute
instance-attribute
ban_ip(ip, duration, reason='threshold_exceeded')
async
Source code in guard_core/handlers/ipban_handler.py
| async def ban_ip(
self, ip: str, duration: int, reason: str = "threshold_exceeded"
) -> None:
expiry = time.time() + duration
self.banned_ips[ip] = expiry
if self.redis_handler:
await self.redis_handler.set_key(
"banned_ips", ip, str(expiry), ttl=duration
)
if self.agent_handler:
await self._send_ban_event(ip, duration, reason)
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/ipban_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
initialize_redis(redis_handler)
async
Source code in guard_core/handlers/ipban_handler.py
| async def initialize_redis(self, redis_handler: Any) -> None:
self.redis_handler = redis_handler
|
is_ip_banned(ip)
async
Source code in guard_core/handlers/ipban_handler.py
| async def is_ip_banned(self, ip: str) -> bool:
current_time = time.time()
if ip in self.banned_ips:
if current_time > self.banned_ips[ip]:
del self.banned_ips[ip]
return False
return True
if self.redis_handler:
expiry = await self.redis_handler.get_key("banned_ips", ip)
if expiry:
expiry_time = float(expiry)
if current_time <= expiry_time:
self.banned_ips[ip] = expiry_time
return True
await self.redis_handler.delete("banned_ips", ip)
return False
|
reset()
async
Source code in guard_core/handlers/ipban_handler.py
| async def reset(self) -> None:
self.banned_ips.clear()
if self.redis_handler:
async with self.redis_handler.get_connection() as conn:
keys = await conn.keys(
f"{self.redis_handler.config.redis_prefix}banned_ips:*"
)
if keys:
await conn.delete(*keys)
|
unban_ip(ip)
async
Source code in guard_core/handlers/ipban_handler.py
| async def unban_ip(self, ip: str) -> None:
if ip in self.banned_ips:
del self.banned_ips[ip]
if self.redis_handler:
await self.redis_handler.delete("banned_ips", ip)
if self.agent_handler:
await self._send_unban_event(ip)
|
RateLimitManager
Implements sliding window rate limiting with in-memory and Redis backends.
guard_core.handlers.ratelimit_handler.RateLimitManager
agent_handler = None
class-attribute
instance-attribute
config
instance-attribute
logger
instance-attribute
rate_limit_script_sha = None
class-attribute
instance-attribute
redis_handler = None
class-attribute
instance-attribute
request_timestamps
instance-attribute
check_rate_limit(request, client_ip, create_error_response, endpoint_path='', rate_limit=None, rate_limit_window=None)
async
Source code in guard_core/handlers/ratelimit_handler.py
| async def check_rate_limit(
self,
request: GuardRequest,
client_ip: str,
create_error_response: Callable[[int, str], Awaitable[GuardResponse]],
endpoint_path: str = "",
rate_limit: int | None = None,
rate_limit_window: int | None = None,
) -> GuardResponse | None:
if not self.config.enable_rate_limiting:
return None
effective_limit = (
rate_limit if rate_limit is not None else self.config.rate_limit
)
effective_window = (
rate_limit_window
if rate_limit_window is not None
else self.config.rate_limit_window
)
current_time = time.time()
window_start = current_time - effective_window
if self.config.enable_redis and self.redis_handler:
count = await self._get_redis_request_count(
client_ip,
current_time,
window_start,
endpoint_path=endpoint_path,
rate_limit_window=effective_window,
rate_limit=effective_limit,
)
if count is not None:
if count > effective_limit:
return await self._handle_rate_limit_exceeded(
request,
client_ip,
count,
create_error_response,
rate_limit_window=effective_window,
)
return None
request_count = self._get_in_memory_request_count(
client_ip, window_start, current_time, endpoint_path=endpoint_path
)
if request_count >= effective_limit:
return await self._handle_rate_limit_exceeded(
request,
client_ip,
request_count + 1,
create_error_response,
rate_limit_window=effective_window,
)
return None
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/ratelimit_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
initialize_redis(redis_handler)
async
Source code in guard_core/handlers/ratelimit_handler.py
| async def initialize_redis(self, redis_handler: Any) -> None:
self.redis_handler = redis_handler
if self.redis_handler and self.config.enable_redis:
try:
async with self.redis_handler.get_connection() as conn:
self.rate_limit_script_sha = await conn.script_load(
RATE_LIMIT_SCRIPT
)
self.logger.info("Rate limiting Lua script loaded successfully")
except Exception as e:
self.logger.error(f"Failed to load rate limiting Lua script: {str(e)}")
|
reset()
async
Source code in guard_core/handlers/ratelimit_handler.py
| async def reset(self) -> None:
self.request_timestamps.clear()
if self.config.enable_redis and self.redis_handler:
try:
keys = await self.redis_handler.keys("rate_limit:rate:*")
if keys and len(keys) > 0:
await self.redis_handler.delete_pattern("rate_limit:rate:*")
except Exception as e:
self.logger.error(f"Failed to reset Redis rate limits: {str(e)}")
|
CloudManager
Fetches and caches IP ranges for AWS, GCP, and Azure cloud providers.
guard_core.handlers.cloud_handler.CloudManager
agent_handler = None
class-attribute
instance-attribute
ip_ranges
instance-attribute
last_updated
instance-attribute
logger
instance-attribute
redis_handler = None
class-attribute
instance-attribute
get_cloud_provider_details(ip, providers=_ALL_PROVIDERS)
Source code in guard_core/handlers/cloud_handler.py
| def get_cloud_provider_details(
self, ip: str, providers: set[str] = _ALL_PROVIDERS
) -> tuple[str, str] | None:
try:
ip_obj = ipaddress.ip_address(ip)
for provider in providers:
if provider in self.ip_ranges:
for network in self.ip_ranges[provider]:
if ip_obj in network:
return (provider, str(network))
return None
except ValueError:
self.logger.error(f"Invalid IP address: {ip}")
return None
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/cloud_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
initialize_redis(redis_handler, providers=_ALL_PROVIDERS, ttl=3600)
async
Source code in guard_core/handlers/cloud_handler.py
| async def initialize_redis(
self,
redis_handler: Any,
providers: set[str] = _ALL_PROVIDERS,
ttl: int = 3600,
) -> None:
self.redis_handler = redis_handler
await self.refresh_async(providers, ttl=ttl)
|
is_cloud_ip(ip, providers=_ALL_PROVIDERS)
Source code in guard_core/handlers/cloud_handler.py
| def is_cloud_ip(self, ip: str, providers: set[str] = _ALL_PROVIDERS) -> bool:
try:
ip_obj = ipaddress.ip_address(ip)
for provider in providers:
if provider in self.ip_ranges:
for network in self.ip_ranges[provider]:
if ip_obj in network:
return True
return False
except ValueError:
self.logger.error(f"Invalid IP address: {ip}")
return False
|
refresh(providers=_ALL_PROVIDERS)
Source code in guard_core/handlers/cloud_handler.py
| def refresh(self, providers: set[str] = _ALL_PROVIDERS) -> None:
if self.redis_handler is None:
self._refresh_sync(providers)
else:
raise RuntimeError("Use async refresh() when Redis is enabled")
|
refresh_async(providers=_ALL_PROVIDERS, ttl=3600)
async
Source code in guard_core/handlers/cloud_handler.py
| async def refresh_async(
self, providers: set[str] = _ALL_PROVIDERS, ttl: int = 3600
) -> None:
if self.redis_handler is None:
self._refresh_sync(providers)
return
for provider in providers:
try:
cached_ranges = await self.redis_handler.get_key(
"cloud_ranges", provider
)
if cached_ranges:
self.ip_ranges[provider] = {
ipaddress.ip_network(ip) for ip in cached_ranges.split(",")
}
continue
fetch_func = {
"AWS": fetch_aws_ip_ranges,
"GCP": fetch_gcp_ip_ranges,
"Azure": fetch_azure_ip_ranges,
}[provider]
ranges = fetch_func()
if ranges:
old_ranges = self.ip_ranges.get(provider, set())
self._log_range_changes(provider, old_ranges, ranges)
self.ip_ranges[provider] = ranges
self.last_updated[provider] = datetime.now(timezone.utc)
await self.redis_handler.set_key(
"cloud_ranges",
provider,
",".join(str(ip) for ip in ranges),
ttl=ttl,
)
except Exception as e:
self.logger.error(f"Failed to refresh {provider} IP ranges: {str(e)}")
if provider not in self.ip_ranges:
self.ip_ranges[provider] = set()
|
send_cloud_detection_event(ip, provider, network, action_taken='request_blocked')
async
Source code in guard_core/handlers/cloud_handler.py
| async def send_cloud_detection_event(
self,
ip: str,
provider: str,
network: str,
action_taken: str = "request_blocked",
) -> None:
if not self.agent_handler:
return
await self._send_cloud_event(
event_type="cloud_blocked",
ip_address=ip,
action_taken=action_taken,
reason=f"IP belongs to blocked cloud provider: {provider}",
cloud_provider=provider,
network=network,
)
|
RedisManager
Provides namespaced Redis operations with connection management and fault tolerance.
guard_core.handlers.redis_handler.RedisManager
agent_handler = None
class-attribute
instance-attribute
config
instance-attribute
logger
instance-attribute
close()
async
Source code in guard_core/handlers/redis_handler.py
| async def close(self) -> None:
if self._redis:
await self._redis.aclose()
self._redis = None
self.logger.info("Redis connection closed")
await self._send_redis_event(
event_type="redis_connection",
action_taken="connection_closed",
reason="Redis connection closed gracefully",
)
self._closed = True
|
delete(namespace, key)
async
Source code in guard_core/handlers/redis_handler.py
| async def delete(self, namespace: str, key: str) -> int | None:
if not self.config.enable_redis:
return None
async def _delete(conn: Redis) -> int:
full_key = f"{self.config.redis_prefix}{namespace}:{key}"
delete_result = await conn.delete(full_key)
return int(delete_result) if delete_result is not None else 0
result = await self.safe_operation(_delete)
return int(result) if result is not None else 0
|
delete_pattern(pattern)
async
Source code in guard_core/handlers/redis_handler.py
| async def delete_pattern(self, pattern: str) -> int | None:
if not self.config.enable_redis:
return None
async def _delete_pattern(conn: Redis) -> int:
full_pattern = f"{self.config.redis_prefix}{pattern}"
keys = await conn.keys(full_pattern)
if not keys:
return 0
result = await conn.delete(*keys)
return int(result) if result is not None else 0
result = await self.safe_operation(_delete_pattern)
return int(result) if result is not None else 0
|
exists(namespace, key)
async
Source code in guard_core/handlers/redis_handler.py
| async def exists(self, namespace: str, key: str) -> bool | None:
if not self.config.enable_redis:
return None
async def _exists(conn: Redis) -> bool:
full_key = f"{self.config.redis_prefix}{namespace}:{key}"
return bool(await conn.exists(full_key))
result = await self.safe_operation(_exists)
return False if result is None else bool(result)
|
get_connection()
async
Source code in guard_core/handlers/redis_handler.py
| @asynccontextmanager
async def get_connection(self) -> AsyncIterator[Redis]:
try:
if self._closed:
await self._send_redis_event(
event_type="redis_error",
action_taken="operation_failed",
reason="Attempted to use closed Redis connection",
error_type="connection_closed",
)
raise GuardRedisError(503, "Redis connection closed")
if not self._redis:
await self.initialize()
if self._redis is None:
await self._send_redis_event(
event_type="redis_error",
action_taken="operation_failed",
reason="Redis connection is None after initialization",
error_type="initialization_failed",
)
raise GuardRedisError(503, "Redis connection failed")
yield self._redis
except (ConnectionError, AttributeError) as e:
self.logger.error(f"Redis operation failed: {str(e)}")
await self._send_redis_event(
event_type="redis_error",
action_taken="operation_failed",
reason=f"Redis operation failed: {str(e)}",
error_type="operation_error",
)
raise GuardRedisError(503, "Redis connection failed") from e
|
get_key(namespace, key)
async
Source code in guard_core/handlers/redis_handler.py
| async def get_key(self, namespace: str, key: str) -> Any:
if not self.config.enable_redis:
return None
async def _get(conn: Redis) -> Any:
full_key = f"{self.config.redis_prefix}{namespace}:{key}"
return await conn.get(full_key)
return await self.safe_operation(_get)
|
incr(namespace, key, ttl=None)
async
Source code in guard_core/handlers/redis_handler.py
| async def incr(
self, namespace: str, key: str, ttl: int | None = None
) -> int | None:
if not self.config.enable_redis:
return None
async def _incr(conn: Redis) -> int:
full_key = f"{self.config.redis_prefix}{namespace}:{key}"
async with conn.pipeline() as pipe:
await pipe.incr(full_key)
if ttl:
await pipe.expire(full_key, ttl)
result = await pipe.execute()
return int(result[0]) if result else 0
result = await self.safe_operation(_incr)
return int(result) if result is not None else 0
|
initialize()
async
Source code in guard_core/handlers/redis_handler.py
| async def initialize(self) -> None:
if self._closed or not self.config.enable_redis:
self._redis = None
return
async with self._connection_lock:
try:
if self.config.redis_url is not None:
self._redis = Redis.from_url(
self.config.redis_url, decode_responses=True
)
if self._redis is not None:
await self._redis.ping()
self.logger.info("Redis connection established")
await self._send_redis_event(
event_type="redis_connection",
action_taken="connection_established",
reason="Redis connection successfully established",
redis_url=self.config.redis_url,
)
else:
self.logger.warning("Redis URL is None, skipping connection")
except Exception as e:
self.logger.error(f"Redis connection failed: {str(e)}")
await self._send_redis_event(
event_type="redis_error",
action_taken="connection_failed",
reason=f"Redis connection failed: {str(e)}",
redis_url=self.config.redis_url,
error_type="connection_error",
)
self._redis = None
raise GuardRedisError(503, "Redis connection failed") from e
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/redis_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
keys(pattern)
async
Source code in guard_core/handlers/redis_handler.py
| async def keys(self, pattern: str) -> list[str] | None:
if not self.config.enable_redis:
return None
async def _keys(conn: Redis) -> list[str]:
full_pattern = f"{self.config.redis_prefix}{pattern}"
keys = await conn.keys(full_pattern)
return [str(k) for k in keys] if keys else []
result = await self.safe_operation(_keys)
return result if result is not None else []
|
safe_operation(func, *args, **kwargs)
async
Source code in guard_core/handlers/redis_handler.py
| async def safe_operation(self, func: Any, *args: Any, **kwargs: Any) -> Any:
if not self.config.enable_redis:
return None
try:
async with self.get_connection() as conn:
return await func(conn, *args, **kwargs)
except Exception as e:
self.logger.error(f"Redis operation failed: {str(e)}")
await self._send_redis_event(
event_type="redis_error",
action_taken="safe_operation_failed",
reason=f"Redis safe operation failed: {str(e)}",
error_type="safe_operation_error",
function_name=getattr(func, "__name__", "unknown"),
)
raise GuardRedisError(503, "Redis operation failed") from e
|
set_key(namespace, key, value, ttl=None)
async
Source code in guard_core/handlers/redis_handler.py
| async def set_key(
self, namespace: str, key: str, value: Any, ttl: int | None = None
) -> bool | None:
if not self.config.enable_redis:
return None
async def _set(conn: Redis) -> bool:
full_key = f"{self.config.redis_prefix}{namespace}:{key}"
if ttl:
return bool(await conn.setex(full_key, ttl, value))
return bool(await conn.set(full_key, value))
result = await self.safe_operation(_set)
return False if result is None else bool(result)
|
Applies HTTP security headers (CSP, HSTS, CORS, and defaults) to responses.
guard_core.handlers.security_headers_handler.SecurityHeadersManager
agent_handler = None
class-attribute
instance-attribute
cors_config
instance-attribute
csp_config
instance-attribute
custom_headers
instance-attribute
default_headers = {'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'SAMEORIGIN', 'X-XSS-Protection': '1; mode=block', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'Permissions-Policy': 'geolocation=(), microphone=(), camera=()', 'X-Permitted-Cross-Domain-Policies': 'none', 'X-Download-Options': 'noopen', 'Cross-Origin-Embedder-Policy': 'require-corp', 'Cross-Origin-Opener-Policy': 'same-origin', 'Cross-Origin-Resource-Policy': 'same-origin'}
class-attribute
instance-attribute
enabled
instance-attribute
headers_cache
instance-attribute
hsts_config
instance-attribute
logger
instance-attribute
redis_handler = None
class-attribute
instance-attribute
Source code in guard_core/handlers/security_headers_handler.py
| def configure(
self,
*,
enabled: bool = True,
csp: dict[str, list[str]] | None = None,
hsts_max_age: int | None = None,
hsts_include_subdomains: bool = True,
hsts_preload: bool = False,
frame_options: str | None = None,
content_type_options: str | None = None,
xss_protection: str | None = None,
referrer_policy: str | None = None,
permissions_policy: str | None = "UNSET",
custom_headers: dict[str, str] | None = None,
cors_origins: list[str] | None = None,
cors_allow_credentials: bool = False,
cors_allow_methods: list[str] | None = None,
cors_allow_headers: list[str] | None = None,
) -> None:
self.enabled = enabled
self._configure_csp(csp)
self._configure_hsts(hsts_max_age, hsts_include_subdomains, hsts_preload)
self._configure_cors(
cors_origins, cors_allow_credentials, cors_allow_methods, cors_allow_headers
)
self._update_default_headers(
frame_options,
content_type_options,
xss_protection,
referrer_policy,
permissions_policy,
)
self._add_custom_headers(custom_headers)
|
get_cors_headers(origin)
async
Source code in guard_core/handlers/security_headers_handler.py
| async def get_cors_headers(self, origin: str) -> dict[str, str]:
if not self.cors_config:
return {}
allowed_origins = self.cors_config.get("origins", [])
if not isinstance(allowed_origins, list):
return {}
if self._is_wildcard_with_credentials(allowed_origins):
return {}
if not self._is_origin_allowed(origin, allowed_origins):
return {}
allow_methods, allow_headers = self._get_validated_cors_config()
return self._build_cors_headers(
origin, allowed_origins, allow_methods, allow_headers
)
|
get_headers(request_path=None)
async
Source code in guard_core/handlers/security_headers_handler.py
| async def get_headers(self, request_path: str | None = None) -> dict[str, str]:
if not self.enabled:
return {}
cache_key = self._generate_cache_key(request_path)
if cache_key in self.headers_cache:
cached = self.headers_cache[cache_key]
if isinstance(cached, dict):
return cached
headers = self.default_headers.copy()
if self.csp_config:
headers["Content-Security-Policy"] = self._build_csp(self.csp_config)
if self.hsts_config:
headers["Strict-Transport-Security"] = self._build_hsts(self.hsts_config)
headers.update(self.custom_headers)
self.headers_cache[cache_key] = headers
if self.agent_handler and request_path:
await self._send_headers_applied_event(request_path, headers)
return headers
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/security_headers_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
initialize_redis(redis_handler)
async
Source code in guard_core/handlers/security_headers_handler.py
| async def initialize_redis(self, redis_handler: Any) -> None:
self.redis_handler = redis_handler
await self._load_cached_config()
await self._cache_configuration()
|
reset()
async
Source code in guard_core/handlers/security_headers_handler.py
| async def reset(self) -> None:
self.headers_cache.clear()
self.custom_headers.clear()
self.csp_config = None
self.hsts_config = None
self.cors_config = None
self.enabled = True
self.default_headers = self.__class__.default_headers.copy()
if self.redis_handler:
try:
async with self.redis_handler.get_connection() as conn:
keys = await conn.keys(
f"{self.redis_handler.config.redis_prefix}security_headers:*"
)
if keys:
await conn.delete(*keys)
except Exception as e:
self.logger.warning(f"Failed to clear Redis cache: {e}")
|
validate_csp_report(report)
async
Source code in guard_core/handlers/security_headers_handler.py
| async def validate_csp_report(self, report: dict[str, Any]) -> bool:
required_fields = ["document-uri", "violated-directive", "blocked-uri"]
csp_report = report.get("csp-report", {})
if not all(field in csp_report for field in required_fields):
return False
self.logger.warning(
f"CSP Violation: {csp_report.get('violated-directive')} "
f"blocked {csp_report.get('blocked-uri')} "
f"on {csp_report.get('document-uri')}"
)
if self.agent_handler:
await self._send_csp_violation_event(csp_report)
return True
|
BehaviorTracker
Tracks endpoint usage patterns and response patterns for behavioral analysis.
guard_core.handlers.behavior_handler.BehaviorTracker(config)
Source code in guard_core/handlers/behavior_handler.py
| def __init__(self, config: SecurityConfig):
self.config = config
self.logger = logging.getLogger("guard_core.handlers.behavior")
self.usage_counts: dict[str, dict[str, list[float]]] = defaultdict(
lambda: defaultdict(list)
)
self.return_patterns: dict[str, dict[str, list[float]]] = defaultdict(
lambda: defaultdict(list)
)
self.redis_handler: Any | None = None
self.agent_handler: Any | None = None
|
agent_handler = None
instance-attribute
config = config
instance-attribute
logger = logging.getLogger('guard_core.handlers.behavior')
instance-attribute
redis_handler = None
instance-attribute
return_patterns = defaultdict(lambda: defaultdict(list))
instance-attribute
usage_counts = defaultdict(lambda: defaultdict(list))
instance-attribute
apply_action(rule, client_ip, endpoint_id, details)
async
Source code in guard_core/handlers/behavior_handler.py
| async def apply_action(
self, rule: BehaviorRule, client_ip: str, endpoint_id: str, details: str
) -> None:
if self.agent_handler:
await self._send_behavior_event(
event_type="behavioral_violation",
ip_address=client_ip,
action_taken=rule.action
if not self.config.passive_mode
else "logged_only",
reason=f"Behavioral rule violated: {details}",
endpoint=endpoint_id,
rule_type=rule.rule_type,
threshold=rule.threshold,
window=rule.window,
)
if self.config.passive_mode:
self._log_passive_mode_action(rule, client_ip, details)
else:
await self._execute_active_mode_action(
rule, client_ip, endpoint_id, details
)
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/behavior_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
initialize_redis(redis_handler)
async
Source code in guard_core/handlers/behavior_handler.py
| async def initialize_redis(self, redis_handler: Any) -> None:
self.redis_handler = redis_handler
|
track_endpoint_usage(endpoint_id, client_ip, rule)
async
Source code in guard_core/handlers/behavior_handler.py
| async def track_endpoint_usage(
self, endpoint_id: str, client_ip: str, rule: BehaviorRule
) -> bool:
current_time = time.time()
window_start = current_time - rule.window
if self.redis_handler:
key = f"behavior:usage:{endpoint_id}:{client_ip}"
await self.redis_handler.set_key(
"behavior_usage", f"{key}:{current_time}", "1", ttl=rule.window
)
pattern = f"behavior_usage:{key}:*"
keys = await self.redis_handler.keys(pattern)
valid_count = 0
for key_name in keys:
try:
timestamp = float(key_name.split(":")[-1])
if timestamp >= window_start:
valid_count += 1
except (ValueError, IndexError):
continue
return valid_count > rule.threshold
timestamps = self.usage_counts[endpoint_id][client_ip]
timestamps[:] = [ts for ts in timestamps if ts >= window_start]
timestamps.append(current_time)
return len(timestamps) > rule.threshold
|
track_return_pattern(endpoint_id, client_ip, response, rule)
async
Source code in guard_core/handlers/behavior_handler.py
| async def track_return_pattern(
self,
endpoint_id: str,
client_ip: str,
response: GuardResponse,
rule: BehaviorRule,
) -> bool:
if not rule.pattern:
return False
current_time = time.time()
window_start = current_time - rule.window
pattern_matched = await self._check_response_pattern(response, rule.pattern)
if not pattern_matched:
return False
if self.redis_handler:
key = f"behavior:return:{endpoint_id}:{client_ip}:{rule.pattern}"
await self.redis_handler.set_key(
"behavior_returns", f"{key}:{current_time}", "1", ttl=rule.window
)
pattern_key = f"behavior_returns:{key}:*"
keys = await self.redis_handler.keys(pattern_key)
valid_count = 0
for key_name in keys:
try:
timestamp = float(key_name.split(":")[-1])
if timestamp >= window_start:
valid_count += 1
except (ValueError, IndexError):
continue
return valid_count > rule.threshold
pattern_key = f"{endpoint_id}:{rule.pattern}"
timestamps = self.return_patterns[pattern_key][client_ip]
timestamps[:] = [ts for ts in timestamps if ts >= window_start]
timestamps.append(current_time)
return len(timestamps) > rule.threshold
|
SusPatternsManager
Orchestrates the detection engine for threat pattern matching and semantic analysis.
guard_core.handlers.suspatterns_handler.SusPatternsManager
agent_handler
instance-attribute
compiled_custom_patterns
instance-attribute
compiled_patterns
instance-attribute
custom_patterns
instance-attribute
patterns = [(p[0]) for p in _pattern_definitions]
class-attribute
instance-attribute
redis_handler
instance-attribute
add_pattern(pattern, custom=False)
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def add_pattern(cls, pattern: str, custom: bool = False) -> None:
instance = cls()
compiled_pattern = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
compiled_tuple = (compiled_pattern, _CTX_ALL)
if custom:
instance.compiled_custom_patterns.add(compiled_tuple)
instance.custom_patterns.add(pattern)
if instance.redis_handler:
await instance.redis_handler.set_key(
"patterns", "custom", ",".join(instance.custom_patterns)
)
else:
instance.compiled_patterns.append(compiled_tuple)
instance.patterns.append(pattern)
if instance._compiler:
await instance._compiler.clear_cache()
if instance.agent_handler:
details = f"{'Custom' if custom else 'Default'} pattern added"
await instance._send_pattern_event(
event_type="pattern_added",
ip_address="system",
action_taken="pattern_added",
reason=f"{details} to detection system",
pattern=pattern,
pattern_type="custom" if custom else "default",
total_patterns=len(instance.custom_patterns)
if custom
else len(instance.patterns),
)
|
Source code in guard_core/handlers/suspatterns_handler.py
| async def configure_semantic_threshold(self, threshold: float) -> None:
self._semantic_threshold = max(0.0, min(1.0, threshold))
|
detect(content, ip_address, context='unknown', correlation_id=None)
async
Source code in guard_core/handlers/suspatterns_handler.py
| async def detect(
self,
content: str,
ip_address: str,
context: str = "unknown",
correlation_id: str | None = None,
) -> dict[str, Any]:
original_content = content
execution_start = time.time()
processed_content = await self._preprocess_content(content, correlation_id)
regex_threats, matched_patterns, timeouts = await self._check_regex_patterns(
processed_content, ip_address, correlation_id, context
)
semantic_threats, semantic_score = await self._check_semantic_threats(
processed_content
)
threats = regex_threats + semantic_threats
is_threat = len(threats) > 0
threat_score = await self._calculate_threat_score(
regex_threats, semantic_threats
)
total_execution_time = time.time() - execution_start
if self._performance_monitor:
await self._performance_monitor.record_metric(
pattern="overall_detection",
execution_time=total_execution_time,
content_length=len(content),
matched=is_threat,
timeout=False,
agent_handler=self.agent_handler,
correlation_id=correlation_id,
)
if is_threat:
await self._send_threat_event(
matched_patterns,
semantic_threats,
ip_address,
context,
content,
threat_score,
threats,
regex_threats,
timeouts,
total_execution_time,
correlation_id,
)
return {
"is_threat": is_threat,
"threat_score": threat_score,
"threats": threats,
"context": context,
"original_length": len(original_content),
"processed_length": len(processed_content),
"execution_time": total_execution_time,
"detection_method": "enhanced" if self._compiler else "legacy",
"timeouts": timeouts,
"correlation_id": correlation_id,
}
|
detect_pattern_match(content, ip_address, context='unknown', correlation_id=None)
async
Source code in guard_core/handlers/suspatterns_handler.py
| async def detect_pattern_match(
self,
content: str,
ip_address: str,
context: str = "unknown",
correlation_id: str | None = None,
) -> tuple[bool, str | None]:
result = await self.detect(content, ip_address, context, correlation_id)
if result["is_threat"]:
if result["threats"]:
threat = result["threats"][0]
if threat["type"] == "regex":
return True, threat["pattern"]
elif threat["type"] == "semantic":
return True, f"semantic:{threat.get('attack_type', 'suspicious')}"
return True, "unknown"
return False, None
|
get_all_compiled_patterns()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_all_compiled_patterns(
cls,
) -> list[tuple[re.Pattern, frozenset[str]]]:
instance = cls()
return instance.compiled_patterns + list(instance.compiled_custom_patterns)
|
get_all_patterns()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_all_patterns(cls) -> list[str]:
instance = cls()
return instance.patterns + list(instance.custom_patterns)
|
get_component_status()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_component_status(cls) -> dict[str, bool]:
instance = cls()
return {
"compiler": instance._compiler is not None,
"preprocessor": instance._preprocessor is not None,
"semantic_analyzer": instance._semantic_analyzer is not None,
"performance_monitor": instance._performance_monitor is not None,
}
|
get_custom_compiled_patterns()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_custom_compiled_patterns(
cls,
) -> list[tuple[re.Pattern, frozenset[str]]]:
instance = cls()
return list(instance.compiled_custom_patterns)
|
get_custom_patterns()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_custom_patterns(cls) -> list[str]:
instance = cls()
return list(instance.custom_patterns)
|
get_default_compiled_patterns()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_default_compiled_patterns(
cls,
) -> list[tuple[re.Pattern, frozenset[str]]]:
instance = cls()
return instance.compiled_patterns.copy()
|
get_default_patterns()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_default_patterns(cls) -> list[str]:
instance = cls()
return instance.patterns.copy()
|
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def get_performance_stats(cls) -> dict[str, Any] | None:
instance = cls()
if instance._performance_monitor:
return {
"summary": instance._performance_monitor.get_summary_stats(),
"slow_patterns": instance._performance_monitor.get_slow_patterns(),
"problematic_patterns": (
instance._performance_monitor.get_problematic_patterns()
),
}
return None
|
initialize_agent(agent_handler)
async
Source code in guard_core/handlers/suspatterns_handler.py
| async def initialize_agent(self, agent_handler: Any) -> None:
self.agent_handler = agent_handler
|
initialize_redis(redis_handler)
async
Source code in guard_core/handlers/suspatterns_handler.py
| async def initialize_redis(self, redis_handler: Any) -> None:
self.redis_handler = redis_handler
if self.redis_handler:
cached_patterns = await self.redis_handler.get_key("patterns", "custom")
if cached_patterns:
patterns = cached_patterns.split(",")
for pattern in patterns:
if pattern not in self.custom_patterns:
await self.add_pattern(pattern, custom=True)
|
remove_pattern(pattern, custom=False)
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def remove_pattern(cls, pattern: str, custom: bool = False) -> bool:
instance = cls()
if custom:
pattern_removed = await instance._remove_custom_pattern(pattern)
else:
pattern_removed = await instance._remove_default_pattern(pattern)
if pattern_removed:
await instance._clear_pattern_caches(pattern)
if pattern_removed:
total_patterns = (
len(instance.custom_patterns) if custom else len(instance.patterns)
)
await instance._send_pattern_removal_event(pattern, custom, total_patterns)
return pattern_removed
|
reset()
async
classmethod
Source code in guard_core/handlers/suspatterns_handler.py
| @classmethod
async def reset(cls) -> None:
if cls._instance is not None:
cls._instance.custom_patterns.clear()
cls._instance.compiled_custom_patterns.clear()
cls._instance.redis_handler = None
cls._instance.agent_handler = None
if hasattr(cls._instance, "_compiler") and cls._instance._compiler:
await cls._instance._compiler.clear_cache()
if (
hasattr(cls._instance, "_performance_monitor")
and cls._instance._performance_monitor
):
cls._instance._performance_monitor.pattern_stats.clear()
cls._instance._performance_monitor.recent_metrics.clear()
cls._config = None
|