Middleware Integration¶
The Dispatch Pattern¶
Every adapter follows the same dispatch lifecycle:
- Passthrough check -- skip requests with no client IP or excluded paths.
- Route resolution -- resolve the matched route's
RouteConfig(decorator settings). - Security bypass check -- if the route bypasses all checks, forward immediately.
- Security pipeline execution -- run the chain of 17 security checks.
- Behavioral usage rules -- track endpoint usage for behavioral rules.
- Call next -- forward to the application handler and measure response time.
- Response processing -- apply behavioral return rules, collect metrics, add security headers.
Guard-core provides all the building blocks. Your middleware class orchestrates them.
Reference: fastapi-guard Middleware¶
The production fastapi-guard adapter demonstrates the complete pattern. Study this as a blueprint for your own adapter.
Constructor¶
class SecurityMiddleware(BaseHTTPMiddleware):
def __init__(self, app: ASGIApp, *, config: SecurityConfig) -> None:
super().__init__(app)
self.app = app
self.config = config
self.logger = setup_custom_logging(
config.custom_log_file, log_format=config.log_format
)
self.last_cloud_ip_refresh = 0
self.suspicious_request_counts: dict[str, int] = {}
self.rate_limit_handler = RateLimitManager(config)
self.guard_decorator: BaseSecurityDecorator | None = None
self._configure_security_headers(config)
self.geo_ip_handler = None
if config.whitelist_countries or config.blocked_countries:
self.geo_ip_handler = config.geo_ip_handler
self.redis_handler = None
if config.enable_redis:
from guard_core.handlers.redis_handler import RedisManager
self.redis_handler = RedisManager(config)
self.agent_handler = None
if config.enable_agent:
agent_config = config.to_agent_config()
if agent_config:
try:
from guard_agent import guard_agent
self.agent_handler = guard_agent(agent_config)
except ImportError:
self.logger.warning(
"Agent enabled but guard_agent package not installed."
)
self.security_pipeline: SecurityCheckPipeline | None = None
self.event_bus = SecurityEventBus(
self.agent_handler, self.config, self.geo_ip_handler
)
self.metrics_collector = MetricsCollector(self.agent_handler, self.config)
self.handler_initializer = HandlerInitializer(
config=self.config,
redis_handler=self.redis_handler,
agent_handler=self.agent_handler,
geo_ip_handler=self.geo_ip_handler,
rate_limit_handler=self.rate_limit_handler,
guard_decorator=self.guard_decorator,
)
response_context = ResponseContext(
config=self.config,
logger=self.logger,
metrics_collector=self.metrics_collector,
agent_handler=self.agent_handler,
guard_decorator=self.guard_decorator,
)
self.response_factory = ErrorResponseFactory(response_context)
routing_context = RoutingContext(
config=self.config,
logger=self.logger,
guard_decorator=self.guard_decorator,
)
self.route_resolver = RouteConfigResolver(routing_context)
validation_context = ValidationContext(
config=self.config,
logger=self.logger,
event_bus=self.event_bus,
)
self.validator = RequestValidator(validation_context)
bypass_context = BypassContext(
config=self.config,
logger=self.logger,
event_bus=self.event_bus,
route_resolver=self.route_resolver,
response_factory=self.response_factory,
validator=self.validator,
)
self.bypass_handler = BypassHandler(bypass_context)
behavioral_context = BehavioralContext(
config=self.config,
logger=self.logger,
event_bus=self.event_bus,
guard_decorator=self.guard_decorator,
)
self.behavioral_processor = BehavioralProcessor(behavioral_context)
Dispatch Method¶
async def dispatch(
self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
passthrough = await self.bypass_handler.handle_passthrough(request, call_next)
if passthrough:
return passthrough
client_ip = await extract_client_ip(request, self.config, self.agent_handler)
route_config = self.route_resolver.get_route_config(request)
if bypass := await self.bypass_handler.handle_security_bypass(
request, call_next, route_config
):
return bypass
if not self.security_pipeline:
self._build_security_pipeline()
assert self.security_pipeline is not None
if blocking := await self.security_pipeline.execute(request):
return blocking
if route_config and route_config.behavior_rules and client_ip:
await self.behavioral_processor.process_usage_rules(
request, client_ip, route_config
)
start_time = time.time()
response = await call_next(request)
response_time = time.time() - start_time
return await self.response_factory.process_response(
request,
response,
response_time,
route_config,
process_behavioral_rules=self.behavioral_processor.process_return_rules,
)
Initialize Method¶
async def initialize(self) -> None:
self._build_security_pipeline()
self.handler_initializer.guard_decorator = self.guard_decorator
await self.handler_initializer.initialize_redis_handlers()
await self.handler_initializer.initialize_agent_integrations()
Building Your Own Middleware¶
Step 1: Create the Middleware Class¶
Extend your framework's middleware base class. The middleware must satisfy GuardMiddlewareProtocol:
The protocol requires:
| Attribute / Method | Type | Purpose |
|---|---|---|
config |
SecurityConfig |
Configuration object |
logger |
logging.Logger |
Logger instance |
last_cloud_ip_refresh |
int |
Unix timestamp of last cloud IP refresh |
suspicious_request_counts |
dict[str, int] |
Per-IP suspicious request counters |
event_bus |
SecurityEventBus |
Event dispatching |
route_resolver |
RouteConfigResolver |
Route config resolution |
response_factory |
ErrorResponseFactory |
Error response creation |
rate_limit_handler |
RateLimitManager |
Rate limiting |
agent_handler |
Agent instance or None |
Telemetry agent |
geo_ip_handler |
GeoIP instance or None |
IP geolocation |
guard_response_factory |
GuardResponseFactory |
Framework response factory |
create_error_response() |
async (int, str) -> GuardResponse |
Create error response |
refresh_cloud_ip_ranges() |
async () -> None |
Refresh cloud IPs |
Step 2: Initialize Core Modules¶
In your constructor, initialize the guard-core modules with their context objects. The context objects use dependency injection to keep modules decoupled:
from guard_core.core.behavioral import BehavioralContext, BehavioralProcessor
from guard_core.core.bypass import BypassContext, BypassHandler
from guard_core.core.events import MetricsCollector, SecurityEventBus
from guard_core.core.initialization import HandlerInitializer
from guard_core.core.responses import ErrorResponseFactory, ResponseContext
from guard_core.core.routing import RouteConfigResolver, RoutingContext
from guard_core.core.validation import RequestValidator, ValidationContext
Each context is a @dataclass with explicit dependencies:
ResponseContext--config,logger,metrics_collector,agent_handler,guard_decorator,response_factory(yourGuardResponseFactory)RoutingContext--config,logger,guard_decoratorValidationContext--config,logger,event_busBypassContext--config,logger,event_bus,route_resolver,response_factory,validatorBehavioralContext--config,logger,event_bus,guard_decorator
Step 3: Build the Security Pipeline¶
Import check implementations from guard_core.core.checks.implementations and assemble them in order:
from guard_core.core.checks.implementations import (
AuthenticationCheck,
CloudIpRefreshCheck,
CloudProviderCheck,
CustomRequestCheck,
CustomValidatorsCheck,
EmergencyModeCheck,
HttpsEnforcementCheck,
IpSecurityCheck,
RateLimitCheck,
ReferrerCheck,
RequestLoggingCheck,
RequestSizeContentCheck,
RequiredHeadersCheck,
RouteConfigCheck,
SuspiciousActivityCheck,
TimeWindowCheck,
UserAgentCheck,
)
from guard_core.core.checks.pipeline import SecurityCheckPipeline
def _build_security_pipeline(self) -> None:
checks = [
RouteConfigCheck(self),
EmergencyModeCheck(self),
HttpsEnforcementCheck(self),
RequestLoggingCheck(self),
RequestSizeContentCheck(self),
RequiredHeadersCheck(self),
AuthenticationCheck(self),
ReferrerCheck(self),
CustomValidatorsCheck(self),
TimeWindowCheck(self),
CloudIpRefreshCheck(self),
IpSecurityCheck(self),
CloudProviderCheck(self),
UserAgentCheck(self),
RateLimitCheck(self),
SuspiciousActivityCheck(self),
CustomRequestCheck(self),
]
self.security_pipeline = SecurityCheckPipeline(checks)
Each check receives self (your middleware instance) and accesses everything it needs through the GuardMiddlewareProtocol interface.
Step 4: Use HandlerInitializer¶
HandlerInitializer centralizes the async initialization of Redis, Agent, and handler subsystems:
from guard_core.core.initialization import HandlerInitializer
self.handler_initializer = HandlerInitializer(
config=self.config,
redis_handler=self.redis_handler,
agent_handler=self.agent_handler,
geo_ip_handler=self.geo_ip_handler,
rate_limit_handler=self.rate_limit_handler,
guard_decorator=self.guard_decorator,
)
Call these in your initialize() method:
async def initialize(self) -> None:
self._build_security_pipeline()
self.handler_initializer.guard_decorator = self.guard_decorator
await self.handler_initializer.initialize_redis_handlers()
await self.handler_initializer.initialize_agent_integrations()
initialize_redis_handlers() performs:
redis_handler.initialize()cloud_handler.initialize_redis()(if cloud providers configured)ip_ban_manager.initialize_redis()geo_ip_handler.initialize_redis()(if present)rate_limit_handler.initialize_redis()(if present)sus_patterns_handler.initialize_redis()
initialize_agent_integrations() performs:
agent_handler.start()- Cross-initialization between agent and Redis handlers
- Agent initialization for IP ban, rate limit, suspicious patterns, cloud, and geo handlers
- Decorator agent initialization
- Dynamic rule manager initialization (if enabled)
Step 5: Wire the Dispatch¶
Your framework's middleware entry point must:
- Wrap the incoming request into a
GuardRequest(if not already protocol-compatible). - Call
bypass_handler.handle_passthrough(). - Extract client IP and resolve route config.
- Call
bypass_handler.handle_security_bypass(). - Call
security_pipeline.execute(). - Process behavioral usage rules.
- Forward to the next handler.
- Call
response_factory.process_response()on the result.
Step 6: Implement create_error_response¶
Your middleware must expose this method, as individual security checks call it through the GuardMiddlewareProtocol:
async def create_error_response(
self, status_code: int, default_message: str
) -> GuardResponse:
return await self.response_factory.create_error_response(
status_code, default_message
)
Step 7: Implement refresh_cloud_ip_ranges¶
Required by CloudIpRefreshCheck:
async def refresh_cloud_ip_ranges(self) -> None:
if not self.config.block_cloud_providers:
return
from guard_core.handlers.cloud_handler import cloud_handler
if self.config.enable_redis and self.redis_handler:
await cloud_handler.refresh_async(
self.config.block_cloud_providers,
ttl=self.config.cloud_ip_refresh_interval,
)
else:
cloud_handler.refresh(self.config.block_cloud_providers)
self.last_cloud_ip_refresh = int(time.time())
CORS Handling¶
Guard-core does not manage CORS middleware registration. CORS is framework-specific and must be handled at the adapter level. The fastapi-guard adapter provides a static helper:
from fastapi.middleware.cors import CORSMiddleware
@staticmethod
def configure_cors(app: FastAPI, config: SecurityConfig) -> bool:
if config.enable_cors:
cors_params: dict[str, Any] = {
"allow_origins": config.cors_allow_origins,
"allow_methods": config.cors_allow_methods,
"allow_headers": config.cors_allow_headers,
"allow_credentials": config.cors_allow_credentials,
"max_age": config.cors_max_age,
}
if config.cors_expose_headers:
cors_params["expose_headers"] = config.cors_expose_headers
app.add_middleware(CORSMiddleware, **cors_params)
return True
return False
Guard-core does handle CORS headers on individual responses through ErrorResponseFactory.apply_cors_headers(). When a request includes an Origin header, the response processing stage adds the appropriate CORS headers via security_headers_manager.get_cors_headers(). But the framework's CORS middleware (which handles preflight OPTIONS requests) must be registered by your adapter.
For Flask, use flask-cors. For Django, use django-cors-headers. Read the CORS-related fields from SecurityConfig:
enable_corscors_allow_originscors_allow_methodscors_allow_headerscors_allow_credentialscors_expose_headerscors_max_age
Framework-Specific Middleware Patterns¶
FastAPI / Starlette¶
Extend BaseHTTPMiddleware. Override dispatch(self, request, call_next). The call_next callable forwards to the next middleware or route handler.
Flask¶
Use app.before_request and app.after_request hooks. Since Flask is synchronous, you need asyncio.run() or an async-compatible Flask extension (like Quart) to call guard-core's async methods:
import asyncio
@app.before_request
def guard_before_request():
loop = asyncio.new_event_loop()
try:
wrapped = FlaskGuardRequest(request, app)
result = loop.run_until_complete(run_security_pipeline(wrapped))
if result is not None:
return unwrap_response(result)
finally:
loop.close()
For production Flask adapters, consider using Quart (async Flask) to avoid the overhead of creating event loops per request.
Django¶
Use Django middleware classes with __call__ or process_request/process_response hooks. For async Django (3.1+), use async def __call__:
class GuardMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.security_middleware = None
async def __call__(self, request):
if self.security_middleware is None:
await self._initialize()
wrapped = DjangoGuardRequest(request)
passthrough = await self.bypass_handler.handle_passthrough(
wrapped, self._call_next
)
if passthrough:
return unwrap_response(passthrough)
blocking = await self.security_pipeline.execute(wrapped)
if blocking:
return unwrap_response(blocking)
response = await self.get_response(request)
return response
Initialization Timing¶
The initialize() method is async and must be called after the middleware is instantiated. In FastAPI, use a startup event:
In Django, call it in AppConfig.ready() or the first request. In Flask, use app.before_first_request or call it during app factory setup.