Skip to content

Utilities

The utils module provides various helper functions for security operations.


Logging Functions

setup_custom_logging

def setup_custom_logging(
    log_file: str | None = None
) -> logging.Logger:
    """
    Setup custom logging for Guard Core.

    Configures a hierarchical logger that outputs to both console and file.
    Console output is ALWAYS enabled for visibility.
    File output is optional for persistence.

    Args:
        log_file: Optional path to log file. If None, only console output is enabled.
                  If provided, creates the directory if it doesn't exist.

    Returns:
        logging.Logger: Configured logger with namespace "guard_core"

    Note: This function is synchronous (not async).
    """

log_activity

async def log_activity(
    request: GuardRequest,
    logger: logging.Logger,
    log_type: str = "request",
    reason: str = "",
    passive_mode: bool = False,
    trigger_info: str = "",
    level: Literal["INFO", "DEBUG", "WARNING", "ERROR", "CRITICAL"] | None = "WARNING"
):
    """
    Universal logging function for all types of requests and activities.
    """

Parameters:

  • request: The request object implementing GuardRequest
  • logger: The logger instance
  • log_type: Type of log entry (default: "request", can also be "suspicious")
  • reason: Reason for flagging an activity
  • passive_mode: Whether to enable passive mode logging format
  • trigger_info: Details about what triggered detection
  • level: The logging level to use. If None, logging is disabled. Defaults to "WARNING".

This is a unified logging function that handles regular requests, suspicious activities, and passive mode logging.


Security Check Functions

is_user_agent_allowed

async def is_user_agent_allowed(
    user_agent: str,
    config: Any
) -> bool:
    """
    Check if user agent is allowed.
    """

check_ip_country

async def check_ip_country(
    request: str | GuardRequest,
    config: Any,
    geo_ip_handler: GeoIPHandler
) -> bool:
    """
    Check if IP is from a blocked country.
    """

is_ip_allowed

async def is_ip_allowed(
    ip: str,
    config: Any,
    geo_ip_handler: GeoIPHandler | None = None
) -> bool:
    """
    Check if IP address is allowed.
    """

The geo_ip_handler parameter is now properly optional - it's only needed when country filtering is configured. If it's not provided when country filtering is configured, the function will work correctly but won't apply country filtering rules.

This function intelligently handles:

  • Whitelist/blacklist checking
  • Country filtering (only when GeoIPHandler is provided)
  • Cloud provider detection (only when cloud blocking is configured)

This selective processing aligns with Guard Core's smart resource loading to optimize performance.

detect_penetration_attempt

async def detect_penetration_attempt(
    request: GuardRequest,
    config: SecurityConfig | None = None,
    route_config: RouteConfig | None = None,
) -> DetectionResult

Detect potential penetration attempts in the request using the enhanced Detection Engine.

This function analyzes various parts of the request (query params, URL path, headers, body) using the Detection Engine's components including pattern matching, semantic analysis, and performance monitoring.

Parameters:

  • request: The request object implementing GuardRequest
  • config: Optional SecurityConfig. When provided, drives global header/param/body-field exclusion sets and the active category filter.
  • route_config: Optional RouteConfig. Per-route overrides take precedence over config when both are non-None.

Returns a DetectionResult dataclass:

  • is_threat: boolTrue if any pattern (regex or semantic) matched.
  • trigger_info: str — Human-readable description of what triggered the hit, or empty string when is_threat is False.
  • threat_categories: list[str] — Ordered list of categories that matched (e.g. ["sqli", "xss"]). Categories with no category label (legacy semantic threats) are skipped.
  • threat_scores: dict[str, float] — Maximum score recorded per category. Regex matches contribute 1.0; semantic matches contribute their probability or threat score.

The Detection Engine provides:

  • Timeout-protected pattern matching (configured via detection_compiler_timeout in SecurityConfig)
  • Intelligent content preprocessing that preserves attack patterns
  • Semantic analysis for obfuscated attacks (when enabled)
  • Performance monitoring for pattern effectiveness

Example usage:

from guard_core.protocols import GuardRequest
from guard_core.utils import detect_penetration_attempt

@app.post("/api/submit")
async def submit_data(request: GuardRequest):
    result = await detect_penetration_attempt(request)
    if not result.is_threat:
        return {"success": True}
    logger.warning(
        f"Attack detected: {result.trigger_info} "
        f"categories={result.threat_categories} "
        f"scores={result.threat_scores}"
    )
    return {"error": "Suspicious activity detected"}

extract_client_ip

async def extract_client_ip(
    request: GuardRequest,
    config: Any,
    agent_handler: AgentHandlerProtocol | None = None,
) -> str:
    """
    Securely extract the client IP address from the request, considering trusted proxies.

    This function implements a secure approach to IP extraction that protects against
    X-Forwarded-For header injection attacks.
    """

Parameters:

  • request: The request object implementing GuardRequest
  • config: The security configuration (must have trusted_proxies and trusted_proxy_depth attributes)
  • agent_handler: Optional agent handler for sending IP spoofing telemetry events

This function provides a secure way to extract client IPs by:

  1. Only trusting X-Forwarded-For headers from configured trusted proxies
  2. Using the connecting IP when not from a trusted proxy
  3. Properly handling proxy chains based on configured depth
  4. Detecting and reporting IP spoofing attempts via the agent handler

Usage Examples

from guard_core.utils import (
    setup_custom_logging,
    log_activity,
    detect_penetration_attempt
)

# Setup logging (synchronous function)
# Console only
logger = setup_custom_logging()  # or setup_custom_logging(None)

# Console + file
logger = setup_custom_logging("security.log")

# Log regular request
await log_activity(request, logger)

# Log suspicious activity
await log_activity(
    request,
    logger,
    log_type="suspicious",
    reason="Suspicious pattern detected"
)

# Check for penetration attempts
result = await detect_penetration_attempt(request)
if result.is_threat:
    logger.warning(f"{result.trigger_info} - {result.threat_categories}")