Utilities
The utils module provides various helper functions for security operations.
Logging Functions
setup_custom_logging
def setup_custom_logging(
    log_file: str | None = None
) -> logging.Logger:
    """
    Setup custom logging for FlaskAPI Guard.

    Configures a hierarchical logger that outputs to both console and file.
    Console output is ALWAYS enabled for visibility.
    File output is optional for persistence.

    Args:
        log_file: Optional path to log file. If None, only console output is enabled.
            If provided, creates the directory if it doesn't exist.

    Returns:
        logging.Logger: Configured logger with namespace "flaskapi_guard"

    Note: This function is synchronous (not async).
    """
log_activity
def log_activity(
    request: Request,
    logger: logging.Logger,
    log_type: str = "request",
    reason: str = "",
    passive_mode: bool = False,
    trigger_info: str = "",
    level: Literal["INFO", "DEBUG", "WARNING", "ERROR", "CRITICAL"] | None = "WARNING"
):
    """
    Universal logging function for all types of requests and activities.
    """
Parameters:
- request: The Flask request object
- logger: The logger instance
- log_type: Type of log entry (default: "request", can also be "suspicious")
- reason: Reason for flagging an activity
- passive_mode: Whether to enable passive mode logging format
- trigger_info: Details about what triggered detection
- level: The logging level to use. If None, logging is disabled. Defaults to "WARNING".
This is a unified logging function that handles regular requests, suspicious activities, and passive mode logging.
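How the parameters could interact is easiest to see in a small sketch. The code below is an illustration under stated assumptions, not the library's code: `FakeRequest` is a hypothetical stand-in for a Flask request, `log_activity_sketch` is a hypothetical name, and the message formats are invented for the example. Only the parameter names and the "level=None disables logging" rule come from the description above.

```python
import logging
from dataclasses import dataclass


@dataclass
class FakeRequest:
    """Hypothetical stand-in exposing only the request fields used below."""
    method: str
    url: str
    remote_addr: str


def log_activity_sketch(request, logger, log_type="request", reason="",
                        passive_mode=False, trigger_info="", level="WARNING"):
    """Illustrative unified logger; returns the message it logged (or None)."""
    if level is None:
        return None  # logging is disabled for this call

    origin = f"{request.remote_addr} - {request.method} {request.url}"
    if passive_mode:
        # Assumed format for passive mode entries.
        message = f"[PASSIVE MODE] Suspicious activity from {origin}: {trigger_info}"
    elif log_type == "suspicious":
        message = f"Suspicious activity from {origin}: {reason}"
    else:
        message = f"Request from {origin}"

    # Map the level name ("WARNING", "ERROR", ...) to the numeric logging level.
    logger.log(getattr(logging, level), message)
    return message
```

A single entry point keeps the formatting decision in one place, so regular, suspicious, and passive mode entries differ only in the arguments passed.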
Security Check Functions
is_user_agent_allowed
def is_user_agent_allowed(
    user_agent: str,
    config: SecurityConfig
) -> bool:
    """
    Check if user agent is allowed.
    """
check_ip_country
def check_ip_country(
    request: str | Request,
    config: SecurityConfig,
    ipinfo_db: IPInfoManager
) -> bool:
    """
    Check if IP is from a blocked country.
    """
is_ip_allowed
def is_ip_allowed(
    ip: str,
    config: SecurityConfig,
    ipinfo_db: IPInfoManager | None = None
) -> bool:
    """
    Check if IP address is allowed.
    """
The ipinfo_db parameter is optional; it is only needed when country filtering is configured. If country filtering is configured but ipinfo_db is not provided, the function still runs correctly but does not apply country filtering rules.
This function handles:
- Whitelist/blacklist checking
- Country filtering (only when IPInfoManager is provided)
- Cloud provider detection (only when cloud blocking is configured)
This selective processing aligns with FlaskAPI Guard's smart resource loading to optimize performance.
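The selective checks listed above can be sketched with the standard `ipaddress` module. This is an illustration under assumptions, not the library's implementation: `IPConfigSketch` and its fields, the `country_lookup` callable standing in for `IPInfoManager`, and the ordering of the checks are all hypothetical. Cloud provider detection is omitted.

```python
import ipaddress
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class IPConfigSketch:
    """Hypothetical config; entries may be single IPs or CIDR ranges."""
    whitelist: Optional[list] = None
    blacklist: list = field(default_factory=list)
    blocked_countries: list = field(default_factory=list)


def _matches(ip: str, entries: list) -> bool:
    """True if ip falls inside any listed address or network."""
    addr = ipaddress.ip_address(ip)
    return any(addr in ipaddress.ip_network(entry, strict=False) for entry in entries)


def is_ip_allowed_sketch(ip: str, config: IPConfigSketch,
                         country_lookup: Optional[Callable[[str], str]] = None) -> bool:
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False  # malformed addresses are rejected

    if _matches(ip, config.blacklist):
        return False
    if config.whitelist is not None and not _matches(ip, config.whitelist):
        return False

    # Country filtering runs only when it is configured AND a lookup is available.
    if config.blocked_countries and country_lookup is not None:
        if country_lookup(ip) in config.blocked_countries:
            return False
    return True
```

Note how an IP from a blocked country is still allowed when no lookup is supplied, which mirrors the behaviour described for a missing `ipinfo_db`.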
detect_penetration_attempt
def detect_penetration_attempt(
    request: Request,
    config: SecurityConfig | None = None,
    route_config: RouteConfig | None = None,
) -> DetectionResult: ...
Detect potential penetration attempts in the request using the Detection Engine.
This function analyzes various parts of the request (query params, body, path, headers) using the Detection Engine's components including pattern matching, semantic analysis, and performance monitoring.
Parameters:
- request: The Flask request object to analyze
- config: Optional SecurityConfig used to resolve global detection-exclusion lists and enabled threat categories
- route_config: Optional per-route RouteConfig whose exclusions and category overrides take precedence over config
Returns a DetectionResult dataclass with the following fields:
- is_threat (bool): True if a potential attack is detected, False otherwise
- trigger_info (str): Human-readable details about what triggered the detection, or an empty string if no attack was detected
- threat_categories (list[str]): Distinct categories matched on this request (e.g. "sql_injection", "xss", "path_traversal"); empty when is_threat is False
- threat_scores (dict[str, float]): Highest score observed per category in threat_categories
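To make the relationship between the fields concrete, the sketch below builds a result object from a list of raw matches, keeping distinct categories and the highest score per category. `DetectionResultSketch` mirrors the documented field names, but the class itself, the `summarize_matches` helper, and the `(category, score, detail)` match tuples are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class DetectionResultSketch:
    """Hypothetical mirror of the documented DetectionResult fields."""
    is_threat: bool = False
    trigger_info: str = ""
    threat_categories: list = field(default_factory=list)
    threat_scores: dict = field(default_factory=dict)


def summarize_matches(matches):
    """Collapse (category, score, detail) tuples into one result object."""
    if not matches:
        return DetectionResultSketch()  # no threat: all fields stay empty

    scores = {}
    for category, score, _detail in matches:
        # Keep only the highest score seen for each category.
        scores[category] = max(score, scores.get(category, score))

    return DetectionResultSketch(
        is_threat=True,
        trigger_info="; ".join(detail for _c, _s, detail in matches),
        threat_categories=list(scores),  # distinct, in first-seen order
        threat_scores=scores,
    )
```

Because `threat_scores` is keyed by the entries of `threat_categories`, callers can iterate over the categories and look up each score without extra checks.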
The Detection Engine provides:
- Timeout-protected pattern matching (configured via detection_compiler_timeout in SecurityConfig)
- Intelligent content preprocessing that preserves attack patterns
- Semantic analysis for obfuscated attacks (when enabled)
- Performance monitoring for pattern effectiveness
Example usage:
import logging

from flask import Flask, request
from guard_core.sync.utils import detect_penetration_attempt

app = Flask(__name__)
logger = logging.getLogger("flaskapi_guard")

@app.route("/api/submit", methods=["POST"])
def submit_data():
    # Analyze the incoming request before processing it
    result = detect_penetration_attempt(request)
    if result.is_threat:
        logger.warning(f"Attack detected: {result.trigger_info}")
        return {"error": "Suspicious activity detected"}
    return {"success": True}

@app.route("/api/critical", methods=["POST"])
def critical_endpoint():
    # Timeout protection is configured via SecurityConfig.detection_compiler_timeout
    result = detect_penetration_attempt(request)
    if result.is_threat:
        # Report which threat categories matched
        return {
            "error": "Security check failed",
            "categories": result.threat_categories,
        }
    return {"success": True}
extract_client_ip
def extract_client_ip(request: Request, config: SecurityConfig) -> str:
    """
    Securely extract the client IP address from the request, considering trusted proxies.

    This function implements a secure approach to IP extraction that protects against
    X-Forwarded-For header injection attacks.
    """
This function provides a secure way to extract client IPs by:
- Only trusting X-Forwarded-For headers from configured trusted proxies
- Using the connecting IP when not from a trusted proxy
- Properly handling proxy chains based on configured depth
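The three rules above can be sketched as a small standalone function. This is a hedged illustration rather than the library's code: the function name, its plain-string parameters (instead of `Request` and `SecurityConfig`), and the interpretation of depth as "number of trusted proxies in front of the application" are assumptions.

```python
import ipaddress
from typing import Optional, Sequence


def extract_client_ip_sketch(connecting_ip: str,
                             forwarded_for: Optional[str],
                             trusted_proxies: Sequence[str],
                             proxy_depth: int = 1) -> str:
    """Pick the client IP, trusting X-Forwarded-For only from trusted proxies."""
    peer = ipaddress.ip_address(connecting_ip)
    from_trusted = any(
        peer in ipaddress.ip_network(proxy, strict=False) for proxy in trusted_proxies
    )

    # Untrusted peer or no header: the header is ignored, so it cannot be spoofed.
    if not from_trusted or not forwarded_for:
        return connecting_ip

    hops = [hop.strip() for hop in forwarded_for.split(",") if hop.strip()]
    depth = max(1, proxy_depth)
    if len(hops) < depth:
        return connecting_ip  # chain shorter than expected: fall back to the peer

    # Each trusted proxy appends one entry, so the client sits `depth` from the end.
    candidate = hops[-depth]
    try:
        ipaddress.ip_address(candidate)
    except ValueError:
        return connecting_ip  # malformed entry: fall back to the peer
    return candidate
```

Counting from the right matters: entries on the left of the header are supplied by the client and may be forged, while the rightmost entries were appended by proxies the application trusts.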
Usage Examples
from flask import request

from guard_core.sync.utils import (
    setup_custom_logging,
    log_activity,
    detect_penetration_attempt
)

# Setup logging (synchronous function)
# Console only
logger = setup_custom_logging()  # or setup_custom_logging(None)

# Console + file
logger = setup_custom_logging("security.log")

# The calls below read flask.request, so they must run inside a request
# context (for example, within a view function).

# Log regular request
log_activity(request, logger)

# Log suspicious activity
log_activity(
    request,
    logger,
    log_type="suspicious",
    reason="Suspicious pattern detected"
)

# Check for penetration attempts
result = detect_penetration_attempt(request)
if result.is_threat:
    logger.warning(f"Attack detected: {result.trigger_info}")