Security Checks Framework

The security checks framework is the backbone of guard-core's request processing. It implements a Chain of Responsibility pattern where each check runs in sequence, and the first check to return a non-None response terminates the pipeline.

SecurityCheck Base Class

guard_core.core.checks.base.SecurityCheck(middleware)

Bases: ABC

Source code in guard_core/core/checks/base.py
def __init__(self, middleware: "GuardMiddlewareProtocol") -> None:
    self.middleware = middleware
    self.config = middleware.config
    self.logger = middleware.logger

check_name abstractmethod property

config = middleware.config instance-attribute

logger = middleware.logger instance-attribute

middleware = middleware instance-attribute

check(request) abstractmethod async

Source code in guard_core/core/checks/base.py
@abstractmethod
async def check(self, request: GuardRequest) -> GuardResponse | None:
    pass  # pragma: no cover

create_error_response(status_code, default_message) async

Source code in guard_core/core/checks/base.py
async def create_error_response(
    self, status_code: int, default_message: str
) -> GuardResponse:
    return await self.middleware.create_error_response(status_code, default_message)

is_passive_mode()

Source code in guard_core/core/checks/base.py
def is_passive_mode(self) -> bool:
    return self.config.passive_mode

send_event(event_type, request, action_taken, reason, **kwargs) async

Source code in guard_core/core/checks/base.py
async def send_event(
    self,
    event_type: str,
    request: GuardRequest,
    action_taken: str,
    reason: str,
    **kwargs: Any,
) -> None:
    await self.middleware.event_bus.send_middleware_event(
        event_type=event_type,
        request=request,
        action_taken=action_taken,
        reason=reason,
        **kwargs,
    )

All security checks inherit from the abstract SecurityCheck class, which provides a consistent interface and shared utilities.

from abc import ABC, abstractmethod
from guard_core.protocols.request_protocol import GuardRequest
from guard_core.protocols.response_protocol import GuardResponse

class SecurityCheck(ABC):
    def __init__(self, middleware: "GuardMiddlewareProtocol") -> None:
        self.middleware = middleware
        self.config = middleware.config
        self.logger = middleware.logger

    @abstractmethod
    async def check(self, request: GuardRequest) -> GuardResponse | None: ...

    @property
    @abstractmethod
    def check_name(self) -> str: ...

    async def send_event(
        self, event_type: str, request: GuardRequest,
        action_taken: str, reason: str, **kwargs
    ) -> None: ...

    async def create_error_response(
        self, status_code: int, default_message: str
    ) -> GuardResponse: ...

    def is_passive_mode(self) -> bool: ...

Constructor

The constructor receives a GuardMiddlewareProtocol instance and stores it along with its config and logger. Adapter developers do not call the constructor directly; the middleware builds check instances during pipeline construction.

| Attribute | Type | Source |
| --- | --- | --- |
| middleware | GuardMiddlewareProtocol | Passed to constructor |
| config | SecurityConfig | middleware.config |
| logger | logging.Logger | middleware.logger |

Abstract Members

check(request) -> GuardResponse | None

The core method every check must implement. Returns None to pass the request to the next check, or a GuardResponse to block and respond immediately.

check_name -> str

A read-only property returning a unique string identifier for the check. Used in logging, pipeline management, and the remove_check method.
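
As an illustration of the two abstract members, here is a minimal sketch of a custom check. It is not part of guard-core: RequireApiKeyCheck, FakeRequest, and FakeResponse are hypothetical names, and the SecurityCheck class below is a trimmed stand-in (without the middleware constructor) so the example runs without the library installed.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


# Stand-ins so the sketch runs on its own. A real adapter would import
# SecurityCheck, GuardRequest and GuardResponse from guard_core instead.
@dataclass
class FakeRequest:
    url_path: str
    method: str = "GET"
    headers: dict = field(default_factory=dict)


@dataclass
class FakeResponse:
    status_code: int
    body: str


class SecurityCheck(ABC):
    """Trimmed stand-in for the base class interface (no middleware argument)."""

    @property
    @abstractmethod
    def check_name(self) -> str: ...

    @abstractmethod
    async def check(self, request): ...


class RequireApiKeyCheck(SecurityCheck):
    """Hypothetical check: block requests that lack an X-API-Key header."""

    @property
    def check_name(self) -> str:
        return "require_api_key"

    async def check(self, request):
        if "X-API-Key" in request.headers:
            return None  # no opinion: hand the request to the next check
        return FakeResponse(401, "Missing API key")  # block and respond now


check = RequireApiKeyCheck()
allowed = asyncio.run(check.check(FakeRequest("/items", headers={"X-API-Key": "k"})))
blocked = asyncio.run(check.check(FakeRequest("/items")))
```

In a real check the blocking branch would normally go through create_error_response so that configured message overrides apply.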

Utility Methods

send_event(event_type, request, action_taken, reason, **kwargs)

Delegates to middleware.event_bus.send_middleware_event(). Adapter developers should use this for audit trail events rather than calling the event bus directly.

create_error_response(status_code, default_message)

Delegates to middleware.create_error_response(). The middleware implementation consults SecurityConfig.custom_error_responses for message overrides before creating the response via the adapter's GuardResponseFactory.

is_passive_mode()

Returns self.config.passive_mode. When passive mode is active, checks should log and emit events but not block requests.
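
One way a check might honor passive mode is sketched below. SuspiciousPathCheck, the event type, and the action_taken strings are hypothetical, and the event bus is replaced by a plain list so the example is self-contained.

```python
import asyncio
from types import SimpleNamespace


class SuspiciousPathCheck:
    """Hypothetical check that follows the passive-mode convention."""

    def __init__(self, config):
        self.config = config
        self.events = []  # stand-in for the middleware event bus

    def is_passive_mode(self) -> bool:
        return self.config.passive_mode

    async def send_event(self, event_type, request, action_taken, reason):
        self.events.append((event_type, action_taken, reason))

    async def check(self, request):
        if not request.url_path.startswith("/admin"):
            return None
        passive = self.is_passive_mode()
        # Always emit the event so the audit trail is identical in both modes.
        await self.send_event(
            event_type="suspicious_path",
            request=request,
            action_taken="logged_only" if passive else "request_blocked",
            reason="admin path probed",
        )
        if passive:
            return None  # passive mode: observe, do not block
        return SimpleNamespace(status_code=403, body="Forbidden")


request = SimpleNamespace(url_path="/admin/login", method="GET")
passive_check = SuspiciousPathCheck(SimpleNamespace(passive_mode=True))
active_check = SuspiciousPathCheck(SimpleNamespace(passive_mode=False))
passive_result = asyncio.run(passive_check.check(request))
active_result = asyncio.run(active_check.check(request))
```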

SecurityCheckPipeline

guard_core.core.checks.pipeline.SecurityCheckPipeline(checks)

Source code in guard_core/core/checks/pipeline.py
def __init__(self, checks: list[SecurityCheck]) -> None:
    self.checks = checks
    self.logger = logging.getLogger(__name__)

checks = checks instance-attribute

logger = logging.getLogger(__name__) instance-attribute

add_check(check)

Source code in guard_core/core/checks/pipeline.py
def add_check(self, check: SecurityCheck) -> None:
    self.checks.append(check)

execute(request) async

Source code in guard_core/core/checks/pipeline.py
async def execute(self, request: GuardRequest) -> GuardResponse | None:
    for check in self.checks:
        try:
            response = await check.check(request)
            if response is not None:
                self.logger.info(
                    f"Request blocked by {check.check_name}",
                    extra={
                        "check": check.check_name,
                        "path": request.url_path,
                        "method": request.method,
                    },
                )
                return response

        except Exception as e:
            self.logger.error(
                f"Error in security check {check.check_name}: {e}",
                extra={
                    "check": check.check_name,
                    "path": request.url_path,
                    "method": request.method,
                },
                exc_info=True,
            )

            if hasattr(check.config, "fail_secure") and check.config.fail_secure:
                self.logger.warning(
                    f"Blocking request due to check error "
                    f"in fail-secure mode: {check.check_name}"
                )
                return await check.create_error_response(
                    status_code=500,
                    default_message="Security check failed",
                )

            continue

    return None

get_check_names()

Source code in guard_core/core/checks/pipeline.py
def get_check_names(self) -> list[str]:
    return [check.check_name for check in self.checks]

insert_check(index, check)

Source code in guard_core/core/checks/pipeline.py
def insert_check(self, index: int, check: SecurityCheck) -> None:
    self.checks.insert(index, check)

remove_check(check_name)

Source code in guard_core/core/checks/pipeline.py
def remove_check(self, check_name: str) -> bool:
    for i, check in enumerate(self.checks):
        if check.check_name == check_name:
            self.checks.pop(i)
            return True
    return False

The pipeline holds an ordered list of SecurityCheck instances and executes them sequentially.

class SecurityCheckPipeline:
    def __init__(self, checks: list[SecurityCheck]) -> None: ...
    async def execute(self, request: GuardRequest) -> GuardResponse | None: ...
    def add_check(self, check: SecurityCheck) -> None: ...
    def insert_check(self, index: int, check: SecurityCheck) -> None: ...
    def remove_check(self, check_name: str) -> bool: ...
    def get_check_names(self) -> list[str]: ...
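
The short-circuit behavior of execute can be sketched with a reduced loop. RecordingCheck is a hypothetical stand-in, and the execute function below copies only the happy path of the pipeline (logging and exception handling are left out).

```python
import asyncio
from types import SimpleNamespace


class RecordingCheck:
    """Hypothetical check that records when it runs and returns a fixed result."""

    def __init__(self, name, result, calls):
        self.check_name = name
        self._result = result
        self._calls = calls

    async def check(self, request):
        self._calls.append(self.check_name)
        return self._result


async def execute(checks, request):
    # Same loop shape as SecurityCheckPipeline.execute, minus logging
    # and error handling.
    for check in checks:
        response = await check.check(request)
        if response is not None:
            return response  # first non-None response ends the pipeline
    return None


calls = []
deny = SimpleNamespace(status_code=403)
checks = [
    RecordingCheck("first", None, calls),
    RecordingCheck("second", deny, calls),
    RecordingCheck("third", None, calls),
]
result = asyncio.run(execute(checks, request=None))
```

The third check never runs because the second one returned a response.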

Execution Flow

flowchart TD
    START["execute(request)"]
    LOOP{"Next check?"}
    RUN["await check.check(request)"]
    RESP{"Response returned?"}
    LOG_BLOCK["Log: blocked by check"]
    RETURN_RESP["Return response"]
    EXCEPTION{"Exception raised?"}
    LOG_ERR["Log error"]
    FAIL_SECURE{"fail_secure enabled?"}
    RETURN_500["Return 500 error"]
    CONTINUE["Continue to next check"]
    ALL_PASSED["Return None"]

    START --> LOOP
    LOOP -- Yes --> RUN
    RUN --> EXCEPTION
    EXCEPTION -- No --> RESP
    RESP -- Yes --> LOG_BLOCK --> RETURN_RESP
    RESP -- No --> LOOP
    EXCEPTION -- Yes --> LOG_ERR --> FAIL_SECURE
    FAIL_SECURE -- Yes --> RETURN_500
    FAIL_SECURE -- No --> CONTINUE --> LOOP
    LOOP -- "No more checks" --> ALL_PASSED

Error Handling

When a check raises an exception:

  • The error is logged with exc_info=True for full traceback.
  • If config.fail_secure is present and truthy, the pipeline returns a 500 error response (fail-closed behavior). fail_secure is opt-in and not a standard SecurityConfig field, which is why the pipeline guards the lookup with hasattr.
  • Otherwise, the pipeline continues to the next check (fail-open behavior).
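
The two error-handling outcomes can be compared side by side with a reduced version of the loop. BrokenCheck is hypothetical, SimpleNamespace objects stand in for the config and the response, and getattr with a default is used as an equivalent of the hasattr guard in the real code.

```python
import asyncio
from types import SimpleNamespace


class BrokenCheck:
    """Hypothetical check that always raises."""

    check_name = "broken"

    def __init__(self, config):
        self.config = config

    async def check(self, request):
        raise RuntimeError("backend unavailable")

    async def create_error_response(self, status_code, default_message):
        return SimpleNamespace(status_code=status_code, body=default_message)


async def execute(checks, request):
    # Reduced copy of the pipeline loop with logging omitted.
    for check in checks:
        try:
            response = await check.check(request)
            if response is not None:
                return response
        except Exception:
            # Equivalent to: hasattr(config, "fail_secure") and config.fail_secure
            if getattr(check.config, "fail_secure", False):
                return await check.create_error_response(
                    status_code=500, default_message="Security check failed"
                )
            continue  # fail-open: skip the broken check
    return None


fail_open = asyncio.run(execute([BrokenCheck(SimpleNamespace())], None))
fail_closed = asyncio.run(
    execute([BrokenCheck(SimpleNamespace(fail_secure=True))], None)
)
```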

Pipeline Manipulation

Adapters can modify the pipeline after construction:

| Method | Description |
| --- | --- |
| add_check(check) | Appends a check to the end of the pipeline |
| insert_check(index, check) | Inserts a check at a specific position |
| remove_check(check_name) | Removes the first check matching the name; returns bool |
| get_check_names() | Returns the ordered list of check name strings |
| len(pipeline) | Returns the number of checks |

Order Matters

Inserting checks at the wrong position can break assumptions. For example, RouteConfigCheck must run first because all subsequent checks read request.state.route_config and request.state.client_ip, which it populates.
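
A short sketch of pipeline manipulation follows. The pipeline class reproduces the list-management methods from the source above; NamedCheck and every check name used here ("route_config", "ip_security", and so on) are hypothetical placeholders, not the names guard-core registers.

```python
class NamedCheck:
    """Hypothetical stand-in that exposes only check_name."""

    def __init__(self, name: str) -> None:
        self.check_name = name


class SecurityCheckPipeline:
    """Copy of the list-management methods shown in the source above."""

    def __init__(self, checks):
        self.checks = checks

    def add_check(self, check):
        self.checks.append(check)

    def insert_check(self, index, check):
        self.checks.insert(index, check)

    def remove_check(self, check_name):
        for i, check in enumerate(self.checks):
            if check.check_name == check_name:
                self.checks.pop(i)
                return True
        return False

    def get_check_names(self):
        return [check.check_name for check in self.checks]


pipeline = SecurityCheckPipeline(
    [NamedCheck("route_config"), NamedCheck("ip_security")]
)
# Insert at index 1 so the first check keeps its position.
pipeline.insert_check(1, NamedCheck("require_api_key"))
pipeline.add_check(NamedCheck("audit"))
removed = pipeline.remove_check("ip_security")
missing = pipeline.remove_check("not_there")
names = pipeline.get_check_names()
```

Inserting at index 1 rather than 0 keeps the check that populates request state at the front of the pipeline.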

Helper Functions

guard_core.core.checks.helpers

check_country_access(client_ip, route_config, geo_ip_handler)

Source code in guard_core/core/checks/helpers.py
def check_country_access(
    client_ip: str, route_config: RouteConfig, geo_ip_handler: Any
) -> bool | None:
    if not geo_ip_handler:
        return None

    country = None

    if route_config.blocked_countries:
        country = geo_ip_handler.get_country(client_ip)
        if country and country in route_config.blocked_countries:
            return False

    if route_config.whitelist_countries:
        if country is None:
            country = geo_ip_handler.get_country(client_ip)

        if country:
            return country in route_config.whitelist_countries
        return False

    return None

check_route_ip_access(client_ip, route_config, middleware) async

Source code in guard_core/core/checks/helpers.py
async def check_route_ip_access(
    client_ip: str, route_config: RouteConfig, middleware: Any
) -> bool | None:
    try:
        ip_addr = ip_address(client_ip)

        if _check_ip_blacklist(client_ip, ip_addr, route_config):
            return False

        whitelist_result = _check_ip_whitelist(client_ip, ip_addr, route_config)
        if whitelist_result is not None:
            return whitelist_result

        country_result = check_country_access(
            client_ip, route_config, middleware.geo_ip_handler
        )
        if country_result is not None:
            return country_result

        return None
    except ValueError:
        return False

check_user_agent_allowed(user_agent, route_config, config) async

Source code in guard_core/core/checks/helpers.py
async def check_user_agent_allowed(
    user_agent: str, route_config: RouteConfig | None, config: Any
) -> bool:
    from guard_core.utils import is_user_agent_allowed as global_user_agent_check

    if route_config and route_config.blocked_user_agents:
        for pattern in route_config.blocked_user_agents:
            if re.search(pattern, user_agent, re.IGNORECASE):
                return False

    return await global_user_agent_check(user_agent, config)

detect_penetration_patterns(request, route_config, config, should_bypass_check_fn) async

Source code in guard_core/core/checks/helpers.py
async def detect_penetration_patterns(
    request: GuardRequest,
    route_config: RouteConfig | None,
    config: SecurityConfig,
    should_bypass_check_fn: Any,
) -> tuple[bool, str]:
    penetration_enabled, route_specific_detection = _get_effective_penetration_setting(
        config, route_config
    )

    if penetration_enabled and not should_bypass_check_fn("penetration", route_config):
        return await detect_penetration_attempt(request)

    reason = _get_detection_disabled_reason(config, route_specific_detection)
    return False, reason

is_ip_in_blacklist(client_ip, ip_addr, blacklist)

Source code in guard_core/core/checks/helpers.py
def is_ip_in_blacklist(client_ip: str, ip_addr: object, blacklist: list[str]) -> bool:
    for blocked in blacklist:
        if "/" in blocked:
            if ip_addr in ip_network(blocked, strict=False):
                return True
        elif client_ip == blocked:
            return True
    return False

is_ip_in_whitelist(client_ip, ip_addr, whitelist)

Source code in guard_core/core/checks/helpers.py
def is_ip_in_whitelist(
    client_ip: str, ip_addr: object, whitelist: list[str]
) -> bool | None:
    if not whitelist:
        return None

    for allowed in whitelist:
        if "/" in allowed:
            if ip_addr in ip_network(allowed, strict=False):
                return True
        elif client_ip == allowed:
            return True
    return False

is_referrer_domain_allowed(referrer, allowed_domains)

Source code in guard_core/core/checks/helpers.py
def is_referrer_domain_allowed(referrer: str, allowed_domains: list[str]) -> bool:
    try:
        referrer_domain = urlparse(referrer).netloc.lower()
        for allowed_domain in allowed_domains:
            if referrer_domain == allowed_domain.lower() or referrer_domain.endswith(
                f".{allowed_domain.lower()}"
            ):
                return True
        return False
    except Exception:
        return False

validate_auth_header(auth_header, auth_type)

Source code in guard_core/core/checks/helpers.py
def validate_auth_header(auth_header: str, auth_type: str) -> tuple[bool, str]:
    if auth_type == "bearer":
        if not auth_header.startswith("Bearer "):
            return False, "Missing or invalid Bearer token"
    elif auth_type == "basic":
        if not auth_header.startswith("Basic "):
            return False, "Missing or invalid Basic authentication"
    else:
        if not auth_header:
            return False, f"Missing {auth_type} authentication"

    return True, ""

Helper functions are stateless utilities shared across check implementations.

IP Access Helpers

is_ip_in_blacklist(client_ip, ip_addr, blacklist) -> bool

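Checks whether the client IP appears in a list of blocked IPs or CIDR ranges. Entries containing "/" are treated as networks and matched against ip_addr; all other entries are compared to client_ip as exact strings.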

is_ip_in_whitelist(client_ip, ip_addr, whitelist) -> bool | None

Returns True if allowed, False if explicitly not in the whitelist, or None if the whitelist is empty (no opinion).
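
The return values of the two list helpers can be checked directly. The function bodies below are copied from the source shown earlier so the example runs without guard-core; the addresses are arbitrary sample values.

```python
from ipaddress import ip_address, ip_network


def is_ip_in_blacklist(client_ip, ip_addr, blacklist):
    # Copied from helpers.py as shown above.
    for blocked in blacklist:
        if "/" in blocked:
            if ip_addr in ip_network(blocked, strict=False):
                return True
        elif client_ip == blocked:
            return True
    return False


def is_ip_in_whitelist(client_ip, ip_addr, whitelist):
    # Copied from helpers.py as shown above.
    if not whitelist:
        return None  # empty whitelist: no opinion
    for allowed in whitelist:
        if "/" in allowed:
            if ip_addr in ip_network(allowed, strict=False):
                return True
        elif client_ip == allowed:
            return True
    return False


client_ip = "10.1.2.3"
addr = ip_address(client_ip)

in_cidr_blacklist = is_ip_in_blacklist(client_ip, addr, ["10.0.0.0/8"])
in_other_blacklist = is_ip_in_blacklist(client_ip, addr, ["192.168.1.1"])
empty_whitelist = is_ip_in_whitelist(client_ip, addr, [])
exact_whitelist = is_ip_in_whitelist(client_ip, addr, ["10.1.2.3"])
outside_whitelist = is_ip_in_whitelist(client_ip, addr, ["172.16.0.0/12"])
```

The three-valued whitelist result lets a caller distinguish "no whitelist configured" (None) from "whitelist configured and the IP is not on it" (False).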

check_country_access(client_ip, route_config, geo_ip_handler) -> bool | None

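Evaluates the country-based rules on a RouteConfig. Returns False when the resolved country is in blocked_countries, or when whitelist_countries is set and the country is missing from it or cannot be resolved. Returns True when the country is in whitelist_countries. Returns None when no geo_ip_handler is available or no country rule applies.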
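
The cases can be exercised with a stub handler. The function body is copied from the source shown earlier; StubGeoHandler, the SimpleNamespace route configs, and the IP-to-country table are hypothetical test fixtures.

```python
from types import SimpleNamespace


class StubGeoHandler:
    """Hypothetical handler that resolves countries from a fixed table."""

    def __init__(self, table):
        self.table = table

    def get_country(self, ip):
        return self.table.get(ip)  # None when the IP is unknown


def check_country_access(client_ip, route_config, geo_ip_handler):
    # Copied from helpers.py as shown above.
    if not geo_ip_handler:
        return None
    country = None
    if route_config.blocked_countries:
        country = geo_ip_handler.get_country(client_ip)
        if country and country in route_config.blocked_countries:
            return False
    if route_config.whitelist_countries:
        if country is None:
            country = geo_ip_handler.get_country(client_ip)
        if country:
            return country in route_config.whitelist_countries
        return False
    return None


geo = StubGeoHandler({"203.0.113.5": "FR", "203.0.113.9": "DE"})
block_fr = SimpleNamespace(blocked_countries=["FR"], whitelist_countries=[])
only_de = SimpleNamespace(blocked_countries=[], whitelist_countries=["DE"])
no_rules = SimpleNamespace(blocked_countries=[], whitelist_countries=[])

blocked = check_country_access("203.0.113.5", block_fr, geo)
not_blocked = check_country_access("203.0.113.9", block_fr, geo)
whitelisted = check_country_access("203.0.113.9", only_de, geo)
unresolved = check_country_access("198.51.100.1", only_de, geo)
no_opinion = check_country_access("203.0.113.5", no_rules, geo)
no_handler = check_country_access("203.0.113.5", block_fr, None)
```

Note that an unresolvable IP is denied only when a whitelist is configured; with a blocklist alone it yields None.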

check_route_ip_access(client_ip, route_config, middleware) -> bool | None

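Combines blacklist, whitelist, and country checks, in that order, for a route-level IP evaluation. Returns False on any deny condition, including a client_ip that cannot be parsed as an IP address, and None when no rule expresses an opinion.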

User Agent Helpers

check_user_agent_allowed(user_agent, route_config, config) -> bool

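Checks the user agent against the route-level blocked_user_agents patterns first, using case-insensitive regex search, and returns False on the first match. If no route-level pattern matches, the result comes from guard_core.utils.is_user_agent_allowed, which applies the global config.blocked_user_agents list.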
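
The route-level matching step can be sketched on its own. matches_route_block is a hypothetical helper that mirrors only the re.search loop from the source; the global check is not reproduced here, and the patterns are sample values.

```python
import re


def matches_route_block(user_agent: str, blocked_patterns: list[str]) -> bool:
    """Hypothetical helper mirroring the route-level loop only."""
    return any(
        re.search(pattern, user_agent, re.IGNORECASE)
        for pattern in blocked_patterns
    )


patterns = [r"curl", r"^python-requests/"]

upper_case = matches_route_block("CURL/8.4.0", patterns)
substring = matches_route_block("libcurl-agent", patterns)
anchored_miss = matches_route_block("Mozilla/5.0 python-requests/2.31", patterns)
browser = matches_route_block("Mozilla/5.0", patterns)
```

Because re.search is unanchored, a bare pattern such as curl matches anywhere in the string; anchor the pattern with ^ when only a prefix should match.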

Authentication Helpers

validate_auth_header(auth_header, auth_type) -> tuple[bool, str]

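Validates an Authorization header against the specified type. For "bearer" and "basic" the header must start with "Bearer " or "Basic " respectively (the prefix comparison is case-sensitive); for any other type the header only needs to be non-empty. Returns (True, "") on success or (False, reason) on failure.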
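
A few sample calls make the behavior per type concrete. The function body is copied from the source shown earlier; the header values and the "api_key" type name are made up for illustration.

```python
def validate_auth_header(auth_header: str, auth_type: str) -> tuple[bool, str]:
    # Copied from helpers.py as shown above.
    if auth_type == "bearer":
        if not auth_header.startswith("Bearer "):
            return False, "Missing or invalid Bearer token"
    elif auth_type == "basic":
        if not auth_header.startswith("Basic "):
            return False, "Missing or invalid Basic authentication"
    else:
        if not auth_header:
            return False, f"Missing {auth_type} authentication"
    return True, ""


valid_bearer = validate_auth_header("Bearer abc123", "bearer")
lowercase_prefix = validate_auth_header("bearer abc123", "bearer")
wrong_scheme = validate_auth_header("Bearer abc123", "basic")
custom_present = validate_auth_header("anything", "api_key")
custom_missing = validate_auth_header("", "api_key")
```

The helper validates only the shape of the header; it does not verify the token or credentials themselves.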

Referrer Helpers

is_referrer_domain_allowed(referrer, allowed_domains) -> bool

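Parses the referrer URL and compares its network location, case-insensitively, against a list of allowed domains. Supports subdomain matching (e.g., sub.example.com matches example.com). Because the comparison uses the full netloc, a referrer with an explicit port (example.com:8443) does not match a bare domain entry. Returns False if parsing fails.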
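
The matching rules can be verified with a handful of referrers. The function body is copied from the source shown earlier; the URLs are sample values.

```python
from urllib.parse import urlparse


def is_referrer_domain_allowed(referrer: str, allowed_domains: list[str]) -> bool:
    # Copied from helpers.py as shown above.
    try:
        referrer_domain = urlparse(referrer).netloc.lower()
        for allowed_domain in allowed_domains:
            if referrer_domain == allowed_domain.lower() or referrer_domain.endswith(
                f".{allowed_domain.lower()}"
            ):
                return True
        return False
    except Exception:
        return False


allowed = ["Example.com"]

exact = is_referrer_domain_allowed("https://example.com/page", allowed)
subdomain = is_referrer_domain_allowed("https://sub.example.com/page", allowed)
lookalike = is_referrer_domain_allowed("https://evil-example.com/", allowed)
with_port = is_referrer_domain_allowed("https://example.com:8443/", allowed)
no_scheme = is_referrer_domain_allowed("example.com/page", allowed)
```

The leading dot in the suffix test is what keeps evil-example.com from matching example.com. A referrer without a scheme has an empty netloc and is therefore rejected.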

Penetration Detection Helpers

detect_penetration_patterns(request, route_config, config, should_bypass_check_fn) -> tuple[bool, str]

Orchestrates the penetration detection check. Respects decorator-level overrides and bypass configuration. Returns (True, trigger_info) when a threat is detected, or (False, reason) when skipped or clean.