Skip to content

Security Headers

The SecurityHeadersManager applies HTTP security headers to responses following OWASP best practices. It supports Content Security Policy (CSP), HTTP Strict Transport Security (HSTS), CORS, and a comprehensive set of default headers.

SecurityHeadersManager

guard_core.handlers.security_headers_handler.SecurityHeadersManager

agent_handler = None class-attribute instance-attribute

cors_config instance-attribute

csp_config instance-attribute

custom_headers instance-attribute

default_headers = {'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'SAMEORIGIN', 'X-XSS-Protection': '1; mode=block', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'Permissions-Policy': 'geolocation=(), microphone=(), camera=()', 'X-Permitted-Cross-Domain-Policies': 'none', 'X-Download-Options': 'noopen', 'Cross-Origin-Embedder-Policy': 'require-corp', 'Cross-Origin-Opener-Policy': 'same-origin', 'Cross-Origin-Resource-Policy': 'same-origin'} class-attribute instance-attribute

enabled instance-attribute

headers_cache instance-attribute

hsts_config instance-attribute

logger instance-attribute

redis_handler = None class-attribute instance-attribute

configure(*, enabled=True, csp=None, hsts_max_age=None, hsts_include_subdomains=True, hsts_preload=False, frame_options=None, content_type_options=None, xss_protection=None, referrer_policy=None, permissions_policy='UNSET', custom_headers=None, cors_origins=None, cors_allow_credentials=False, cors_allow_methods=None, cors_allow_headers=None)

Source code in guard_core/handlers/security_headers_handler.py
def configure(
    self,
    *,
    enabled: bool = True,
    csp: dict[str, list[str]] | None = None,
    hsts_max_age: int | None = None,
    hsts_include_subdomains: bool = True,
    hsts_preload: bool = False,
    frame_options: str | None = None,
    content_type_options: str | None = None,
    xss_protection: str | None = None,
    referrer_policy: str | None = None,
    permissions_policy: str | None = "UNSET",
    custom_headers: dict[str, str] | None = None,
    cors_origins: list[str] | None = None,
    cors_allow_credentials: bool = False,
    cors_allow_methods: list[str] | None = None,
    cors_allow_headers: list[str] | None = None,
) -> None:
    self.enabled = enabled

    self._configure_csp(csp)
    self._configure_hsts(hsts_max_age, hsts_include_subdomains, hsts_preload)
    self._configure_cors(
        cors_origins, cors_allow_credentials, cors_allow_methods, cors_allow_headers
    )
    self._update_default_headers(
        frame_options,
        content_type_options,
        xss_protection,
        referrer_policy,
        permissions_policy,
    )
    self._add_custom_headers(custom_headers)

get_cors_headers(origin) async

Source code in guard_core/handlers/security_headers_handler.py
async def get_cors_headers(self, origin: str) -> dict[str, str]:
    """Build the CORS response headers for a request from ``origin``.

    Returns an empty dict when CORS is not configured, when the configured
    ``"origins"`` value is not a list, when ``_is_wildcard_with_credentials``
    flags the origin list, or when ``_is_origin_allowed`` rejects ``origin``.
    Otherwise the headers are produced by ``_build_cors_headers`` from the
    validated method/header lists.
    """
    config = self.cors_config
    if not config:
        return {}

    origins = config.get("origins", [])
    if not isinstance(origins, list):
        return {}

    # The wildcard/credentials check runs first; the per-origin check is
    # only consulted when that one passes.
    rejected = self._is_wildcard_with_credentials(
        origins
    ) or not self._is_origin_allowed(origin, origins)
    if rejected:
        return {}

    methods, request_headers = self._get_validated_cors_config()
    return self._build_cors_headers(origin, origins, methods, request_headers)

get_headers(request_path=None) async

Source code in guard_core/handlers/security_headers_handler.py
async def get_headers(self, request_path: str | None = None) -> dict[str, str]:
    if not self.enabled:
        return {}

    cache_key = self._generate_cache_key(request_path)
    if cache_key in self.headers_cache:
        cached = self.headers_cache[cache_key]
        if isinstance(cached, dict):
            return cached

    headers = self.default_headers.copy()

    if self.csp_config:
        headers["Content-Security-Policy"] = self._build_csp(self.csp_config)

    if self.hsts_config:
        headers["Strict-Transport-Security"] = self._build_hsts(self.hsts_config)

    headers.update(self.custom_headers)

    self.headers_cache[cache_key] = headers

    if self.agent_handler and request_path:
        await self._send_headers_applied_event(request_path, headers)

    return headers

initialize_agent(agent_handler) async

Source code in guard_core/handlers/security_headers_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    """Attach the agent handler used for event reporting.

    The handler is only stored on the instance here; other methods check
    ``self.agent_handler`` before sending events to it.
    """
    self.agent_handler = agent_handler

initialize_redis(redis_handler) async

Source code in guard_core/handlers/security_headers_handler.py
async def initialize_redis(self, redis_handler: Any) -> None:
    """Attach the Redis handler and synchronise configuration with it.

    After storing the handler, ``_load_cached_config()`` is awaited first
    and ``_cache_configuration()`` second. Presumably this restores any
    configuration previously saved in Redis and then persists the current
    configuration -- confirm against those helpers.
    """
    self.redis_handler = redis_handler
    # Order matters: load before writing back.
    await self._load_cached_config()
    await self._cache_configuration()

reset() async

Source code in guard_core/handlers/security_headers_handler.py
async def reset(self) -> None:
    """Restore the manager to its initial, unconfigured state.

    Empties the header cache and custom headers, drops the CSP/HSTS/CORS
    configuration, re-enables the manager and restores ``default_headers``
    from a copy of the class-level defaults. When a Redis handler is
    attached, every key matching ``<redis_prefix>security_headers:*`` is
    deleted as well; any failure while doing so is logged as a warning
    instead of being raised.
    """
    self.headers_cache.clear()
    self.custom_headers.clear()
    self.csp_config = self.hsts_config = self.cors_config = None
    self.enabled = True
    self.default_headers = self.__class__.default_headers.copy()

    redis = self.redis_handler
    if not redis:
        return

    # Best-effort cleanup: a Redis problem must not break the reset.
    try:
        async with redis.get_connection() as conn:
            pattern = f"{redis.config.redis_prefix}security_headers:*"
            stale_keys = await conn.keys(pattern)
            if stale_keys:
                await conn.delete(*stale_keys)
    except Exception as e:
        self.logger.warning(f"Failed to clear Redis cache: {e}")

validate_csp_report(report) async

Source code in guard_core/handlers/security_headers_handler.py
async def validate_csp_report(self, report: dict[str, Any]) -> bool:
    """Validate a CSP violation report and record it.

    A report is valid when it is a mapping whose ``"csp-report"`` entry is
    itself a mapping containing ``document-uri``, ``violated-directive``
    and ``blocked-uri``. Valid reports are logged at warning level and,
    when an agent handler is attached, forwarded through
    ``_send_csp_violation_event``.

    Args:
        report: The decoded report payload. It is treated as untrusted
            input, so malformed shapes are rejected rather than raising.

    Returns:
        ``True`` if the report was well-formed and recorded, ``False``
        otherwise.
    """
    required_fields = ("document-uri", "violated-directive", "blocked-uri")

    # The payload comes from outside the application; a body that is not a
    # JSON object must be rejected instead of crashing on ``.get``.
    if not isinstance(report, dict):
        return False

    csp_report = report.get("csp-report", {})
    # Without this check a string or list value could satisfy the ``in``
    # tests below (substring / element membership) and then fail on
    # ``.get``, and ``None`` would raise TypeError.
    if not isinstance(csp_report, dict):
        return False

    if not all(field in csp_report for field in required_fields):
        return False

    self.logger.warning(
        f"CSP Violation: {csp_report.get('violated-directive')} "
        f"blocked {csp_report.get('blocked-uri')} "
        f"on {csp_report.get('document-uri')}"
    )

    if self.agent_handler:
        await self._send_csp_violation_event(csp_report)

    return True

Singleton

The manager is a thread-safe singleton: __new__ uses double-checked locking guarded by a threading.Lock. A shared instance is created at module level as security_headers_manager.

Default Headers

Applied to every response when the manager is enabled:

Header Default Value
X-Content-Type-Options nosniff
X-Frame-Options SAMEORIGIN
X-XSS-Protection 1; mode=block
Referrer-Policy strict-origin-when-cross-origin
Permissions-Policy geolocation=(), microphone=(), camera=()
X-Permitted-Cross-Domain-Policies none
X-Download-Options noopen
Cross-Origin-Embedder-Policy require-corp
Cross-Origin-Opener-Policy same-origin
Cross-Origin-Resource-Policy same-origin

Configuration

The configure() method accepts keyword-only arguments:

# Example: configure every supported option in one call (all keyword-only).
security_headers_manager.configure(
    enabled=True,
    csp={"default-src": ["'self'"], "script-src": ["'self'", "cdn.example.com"]},
    hsts_max_age=31536000,
    hsts_include_subdomains=True,
    # Preload needs max_age >= 31536000 and includeSubDomains enabled.
    hsts_preload=True,
    frame_options="DENY",
    content_type_options="nosniff",
    xss_protection="0",
    referrer_policy="no-referrer",
    permissions_policy="camera=()",
    # Custom headers are applied last and override any defaults.
    custom_headers={"X-Custom": "value"},
    cors_origins=["https://example.com"],
    # Do not combine credentials with a "*" origin: get_cors_headers()
    # returns an empty dict for that combination.
    cors_allow_credentials=True,
    cors_allow_methods=["GET", "POST"],
    cors_allow_headers=["Authorization"],
)

CSP Configuration

CSP directives are stored as dict[str, list[str]] and serialized by _build_csp():

{"default-src": ["'self'"], "script-src": ["'self'", "'nonce-abc123'"]}
# becomes: "default-src 'self'; script-src 'self' 'nonce-abc123'"

Unsafe Sources

The manager logs a warning when CSP directives contain 'unsafe-inline' or 'unsafe-eval'.

HSTS Configuration

HSTS is built from a config dictionary:

{"max_age": 31536000, "include_subdomains": True, "preload": True}
# becomes: "max-age=31536000; includeSubDomains; preload"

Preload validation:

  • max_age must be >= 31536000 (1 year) for preload. If not, preload is disabled with a warning.
  • includeSubDomains must be True for preload. If not, it is force-enabled with a warning.

CORS Configuration

cors_config = {
    "origins": ["https://example.com"],
    "allow_credentials": True,
    "allow_methods": ["GET", "POST"],
    "allow_headers": ["Authorization"],
}

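Safety check: If origins contains "*" and allow_credentials is True, credentials are force-disabled and the CORS configuration is blocked (get_cors_headers() returns an empty dict for a wildcard-with-credentials configuration). This prevents a known security vulnerability: exposing credentialed responses to arbitrary origins.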

Getting Headers

headers = await security_headers_manager.get_headers(request_path="/api/data")

Caching: Headers are cached per request path in a TTLCache(maxsize=1000, ttl=300). Cache keys are SHA-256 hashes of normalized paths.

Composition order:

  1. Default headers (copy)
  2. CSP header (if configured)
  3. HSTS header (if configured)
  4. Custom headers (override any defaults)

CORS Headers

cors_headers = await security_headers_manager.get_cors_headers(origin="https://example.com")

Returns an empty dict if:

  • CORS is not configured, or the configured origins value is not a list.
  • The origin is not in the allowed list.
  • Wildcard origin with credentials (security block).

CSP Violation Reporting

is_valid = await security_headers_manager.validate_csp_report(report_json)

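Validates the structure of a CSP violation report by checking for document-uri, violated-directive, and blocked-uri in the csp-report object. Returns False if any of these fields is missing. Otherwise it logs the violation at warning level, forwards it to the agent handler when one is configured, and returns True.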

Header Value Validation

All header values pass through _validate_header_value():

  • Rejects values containing \r or \n (HTTP response splitting prevention).
  • Rejects values longer than 8192 bytes.
  • Strips non-printable characters except tab.

Redis Integration

Configuration (CSP, HSTS, custom headers) is cached to Redis with a 24-hour TTL. When initialize_redis() is called, the manager first attempts to load any cached configuration from Redis and then writes its current configuration back to the cache.