Skip to content

API Overview

The FastAPI Guard Agent API provides a comprehensive, protocol-driven architecture for enterprise-grade security telemetry. Designed with extensibility and reliability at its core, the API enables seamless integration with diverse monitoring ecosystems while maintaining strict performance guarantees.

System Architecture

The agent implements a layered architecture optimized for high-throughput telemetry collection with minimal application impact:

graph TB
    subgraph "FastAPI Application"
        FG[FastAPI Guard<br/>Security Middleware]
        SC[SecurityConfig<br/>enable_agent=True]
    end

    subgraph "FastAPI Guard Agent (Auto-initialized)"
        AH[Agent Handler<br/>Central Orchestrator]
        EB[Event Buffer<br/>High-Performance Queue]
        TR[HTTP Transport<br/>Resilient Network Layer]

        subgraph "Protocol Abstractions"
            BP[Buffer Protocol]
            TP[Transport Protocol]
            RP[Redis Protocol]
            AP[Agent Protocol]
        end
    end

    subgraph "External Systems"
        RD[(Redis<br/>Persistent Buffer)]
        BE[Management Platform<br/>Analytics & Policy Engine]
    end

    SC --> FG
    FG --> AH
    AH --> EB
    EB --> TR
    EB -.-> RD
    TR --> BE

    AH -.-> BP
    AH -.-> TP
    EB -.-> RP
    AH -.-> AP

Core Components

1. Agent Handler (GuardAgentHandler)

The central orchestration component responsible for coordinating all telemetry operations.

Core Responsibilities: - Lifecycle Management: Orchestrates initialization, operation, and graceful shutdown sequences - Event Processing: Implements high-throughput event ingestion with backpressure handling - Metric Aggregation: Performs efficient metric collection with configurable sampling rates - Task Coordination: Manages asynchronous operations including buffer flushing and policy synchronization

Basic Usage (Auto-integrated with FastAPI Guard):

from fastapi import FastAPI
from guard import SecurityConfig, SecurityMiddleware

# Configure with agent enabled
config = SecurityConfig(
    enable_agent=True,
    agent_api_key="your-api-key",
    agent_project_id="your-project-id",
    agent_endpoint="https://api.fastapi-guard.com",
)

app = FastAPI()
middleware = SecurityMiddleware(app, config=config)
# Agent starts automatically with middleware

Direct Usage (Advanced):

from guard_agent import guard_agent, AgentConfig

# Initialize directly
config = AgentConfig(
    api_key="your-api-key",
    project_id="your-project-id",
)
agent = guard_agent(config)

# Lifecycle
await agent.start()
await agent.stop()

# Send data manually
await agent.send_event(security_event)
await agent.send_metric(performance_metric)

2. Event Buffer (EventBuffer)

High-performance buffering subsystem engineered for optimal throughput and reliability.

Technical Capabilities: - Lock-Free Architecture: Utilizes deque-based storage for minimal contention - Persistent Buffering: Optional Redis integration for durability across restarts - Intelligent Flushing: Adaptive algorithms balance latency and efficiency - Concurrency Safety: Full async/await compatibility with thread-safe operations

Usage Patterns:

from guard_agent.buffer import EventBuffer
from guard_agent.models import SecurityEvent

# Create buffer
buffer = EventBuffer(
    max_size=1000,
    flush_interval=60,
    redis_handler=redis_client  # optional
)

# Add events
await buffer.add_event(security_event)
await buffer.add_metric(performance_metric)

# Manual flush
events, metrics = await buffer.flush()

3. HTTP Transport (HTTPTransport)

Enterprise-grade network layer implementing industry best practices for reliable data delivery.

Reliability Features: - Intelligent Retry: Exponential backoff with jitter prevents thundering herd - Circuit Breaker: Automatic failure detection with graceful degradation - Adaptive Rate Limiting: Dynamic throttling based on backend capacity - Comprehensive Telemetry: Real-time transport statistics for operational visibility

Configuration:

from guard_agent.transport import HTTPTransport

transport = HTTPTransport(
    backend_url="https://your-backend.com",
    api_key="your-api-key",
    timeout=30,
    max_retries=3,
    backoff_factor=2.0,
    rate_limit_requests=100,
    rate_limit_period=60
)

4. Data Models

Strongly-typed data structures leveraging Pydantic for validation and serialization.

Primary Models: - AgentConfig: Comprehensive configuration with validation and defaults - SecurityEvent: Rich security event representation with contextual metadata - SecurityMetric: Performance metrics with dimensional tagging support - EventBatch: Optimized batch container for network efficiency - AgentStatus: Real-time operational telemetry and health indicators

5. Protocol System

Clean abstraction layer enabling custom implementations while maintaining compatibility.

Protocol Interfaces: - AgentHandlerProtocol: Defines agent lifecycle and event handling contracts - BufferProtocol: Specifies buffering semantics and performance guarantees - TransportProtocol: Establishes network transport requirements and capabilities - RedisHandlerProtocol: Standardizes persistent storage integration patterns

API Reference by Module

Core Handler API

GuardAgentHandler

The main entry point for all agent operations.

class GuardAgentHandler:
    def __init__(self, config: AgentConfig) -> None: ...

    # Lifecycle Management
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def is_running(self) -> bool: ...

    # Event & Metric Handling
    async def send_event(self, event: SecurityEvent) -> None: ...
    async def send_metric(self, metric: SecurityMetric) -> None: ...

    # Status & Health
    async def get_status(self) -> AgentStatus: ...
    async def health_check(self) -> bool: ...

    # Configuration
    async def update_config(self, config: AgentConfig) -> None: ...
    async def get_dynamic_rules(self) -> DynamicRules: ...

Key Methods:

Method Description Usage
start() Initialize agent and start background tasks Called on app startup
stop() Graceful shutdown with data preservation Called on app shutdown
send_event() Send security event to buffer Auto-called by FastAPI Guard
send_metric() Send performance metric Manual or auto collection
get_status() Get real-time agent status Health monitoring

Buffer API

EventBuffer

Intelligent event and metric buffering with persistence.

class EventBuffer:
    def __init__(
        self,
        max_size: int = 1000,
        flush_interval: int = 60,
        redis_handler: Optional[RedisHandlerProtocol] = None
    ) -> None: ...

    # Data Management
    async def add_event(self, event: SecurityEvent) -> None: ...
    async def add_metric(self, metric: SecurityMetric) -> None: ...
    async def flush(self) -> Tuple[List[SecurityEvent], List[SecurityMetric]]: ...

    # Status & Configuration
    def get_stats(self) -> Dict[str, Any]: ...
    async def clear(self) -> None: ...
    def is_full(self) -> bool: ...

Transport API

HTTPTransport

Enterprise-grade HTTP client with resilience features.

class HTTPTransport:
    def __init__(
        self,
        backend_url: str,
        api_key: str,
        timeout: int = 30,
        max_retries: int = 3,
        backoff_factor: float = 2.0
    ) -> None: ...

    # Data Transmission
    async def send_batch(self, batch: EventBatch) -> bool: ...
    async def send_heartbeat(self, status: AgentStatus) -> bool: ...

    # Health & Status
    async def test_connection(self) -> bool: ...
    def get_stats(self) -> TransportStats: ...

    # Configuration
    async def update_rules(self) -> Optional[DynamicRules]: ...

Models API

Core Data Models

AgentConfig

class AgentConfig(BaseModel):
    backend_url: str
    api_key: str
    buffer_size: int = 1000
    flush_interval: int = 60
    enabled: bool = True
    # ... additional fields

SecurityEvent

class SecurityEvent(BaseModel):
    event_type: EventType
    source_ip: str
    timestamp: datetime
    description: str
    metadata: Optional[Dict[str, Any]] = None
    # ... additional fields

SecurityMetric

class SecurityMetric(BaseModel):
    metric_type: MetricType
    value: Union[int, float]
    timestamp: datetime
    metadata: Optional[Dict[str, Any]] = None
    # ... additional fields

Implementation Patterns

Pattern 1: Standard Deployment

from fastapi import FastAPI
from guard import SecurityConfig, SecurityMiddleware

app = FastAPI()

# Configure FastAPI Guard with agent
config = SecurityConfig(
    # Enable agent
    enable_agent=True,
    agent_api_key="your-api-key",
    agent_project_id="your-project-id",

    # Security settings
    enable_rate_limiting=True,
    enable_ip_banning=True,
    enable_penetration_detection=True,
)

# Add middleware - agent starts automatically
middleware = SecurityMiddleware(app, config=config)

Pattern 2: Custom Event Integration

from guard_agent import guard_agent, AgentConfig, SecurityEvent
from guard_agent.utils import get_current_timestamp

# Get agent instance
config = AgentConfig(
    api_key="your-api-key",
    project_id="your-project-id",
)
agent = guard_agent(config)

async def custom_security_check(request):
    """Custom security validation with event reporting."""

    if is_suspicious(request):
        # Create custom event
        event = SecurityEvent(
            timestamp=get_current_timestamp(),
            event_type="custom_rule_triggered",
            ip_address=request.client.host,
            action_taken="blocked",
            reason="Custom security check failed",
            endpoint=str(request.url.path),
            method=request.method,
            metadata={
                "check_type": "business_logic",
                "severity": "high"
            }
        )

        # Send to agent
        await agent.send_event(event)

        return False

    return True

Pattern 3: Performance Monitoring

from guard_agent import guard_agent, SecurityMetric
from guard_agent.utils import get_current_timestamp
import time

# Get agent instance (singleton)
agent = guard_agent(AgentConfig(
    api_key="your-api-key",
    project_id="your-project-id",
))

async def monitor_endpoint_performance():
    """Monitor endpoint performance and send metrics."""

    start_time = time.time()

    try:
        # Your endpoint logic
        result = await process_request()

        # Success metric
        await agent.send_metric(SecurityMetric(
            timestamp=get_current_timestamp(),
            metric_type="response_time",
            value=time.time() - start_time,
            endpoint="/api/process",
            tags={"status": "success"}
        ))

        return result

    except Exception as e:
        # Error metric
        await agent.send_metric(SecurityMetric(
            timestamp=get_current_timestamp(),
            metric_type="error_rate",
            value=1.0,
            endpoint="/api/process",
            tags={"error": str(type(e).__name__)}
        ))
        raise

Pattern 4: Health Monitoring

@app.get("/health")
async def health_check():
    """Application health check including agent status."""

    agent_healthy = await agent.health_check()
    agent_status = await agent.get_status()

    return {
        "status": "healthy" if agent_healthy else "degraded",
        "agent": {
            "running": agent_status.is_running,
            "events_processed": agent_status.events_processed,
            "last_flush": agent_status.last_flush_time,
            "buffer_size": agent_status.buffer_size
        }
    }

Reliability Patterns

Graceful Degradation

The agent implements comprehensive failure isolation to protect application availability:

try:
    await agent.send_event(event)
except Exception as e:
    # Agent failure doesn't break your app
    logger.warning(f"Agent error: {e}")
    # Application continues normally

Circuit Breaker Implementation

Advanced failure detection with automatic recovery mechanisms:

# Circuit breaker state machine:
# - CLOSED: Normal operation with request forwarding
# - OPEN: Fast-fail mode preventing backend overload
# - HALF_OPEN: Controlled recovery testing with limited traffic

Retry Strategies

Sophisticated retry algorithms optimize for both reliability and backend protection:

# Adaptive retry configuration:
max_retries: 3                    # Configurable retry limit
backoff_factor: 2.0              # Exponential growth factor
jitter: True                     # Randomization prevents synchronized retries
max_delay: 60                    # Caps maximum retry delay

Performance Engineering

Memory Optimization

Strategic memory management for various deployment scales:

  • Buffer Sizing: Dynamic sizing based on traffic patterns and memory constraints
  • Flush Strategies: Adaptive algorithms balance latency requirements with efficiency
  • Persistent Buffering: Redis integration for memory-constrained environments

Network Optimization

Advanced techniques minimize bandwidth and latency:

  • Intelligent Batching: Adaptive batch sizes based on network conditions
  • Compression: Automatic gzip compression for payload optimization
  • Connection Management: HTTP/2 multiplexing with connection pooling

Deployment Profiles

# Microservice deployment (low memory, high frequency)
config = AgentConfig(buffer_size=100, flush_interval=10)

# Standard API service (balanced profile)
config = AgentConfig(buffer_size=1000, flush_interval=30)

# High-throughput gateway (optimized for volume)
config = AgentConfig(buffer_size=10000, flush_interval=60)