Skip to content

Cloud Providers

Guard-core can block requests originating from cloud provider IP ranges (AWS, GCP, Azure). The CloudManager handler fetches the official IP range lists, caches them as ipaddress network objects, and exposes a fast membership check used by the security pipeline.

CloudManager

guard_core.handlers.cloud_handler.CloudManager

agent_handler = None class-attribute instance-attribute

ip_ranges instance-attribute

last_updated instance-attribute

logger instance-attribute

redis_handler = None class-attribute instance-attribute

get_cloud_provider_details(ip, providers=_ALL_PROVIDERS)

Source code in guard_core/handlers/cloud_handler.py
def get_cloud_provider_details(
    self, ip: str, providers: set[str] = _ALL_PROVIDERS
) -> tuple[str, str] | None:
    try:
        ip_obj = ipaddress.ip_address(ip)
        for provider in providers:
            if provider in self.ip_ranges:
                for network in self.ip_ranges[provider]:
                    if ip_obj in network:
                        return (provider, str(network))
        return None
    except ValueError:
        self.logger.error(f"Invalid IP address: {ip}")
        return None

initialize_agent(agent_handler) async

Source code in guard_core/handlers/cloud_handler.py
async def initialize_agent(self, agent_handler: Any) -> None:
    self.agent_handler = agent_handler

initialize_redis(redis_handler, providers=_ALL_PROVIDERS, ttl=3600) async

Source code in guard_core/handlers/cloud_handler.py
async def initialize_redis(
    self,
    redis_handler: Any,
    providers: set[str] = _ALL_PROVIDERS,
    ttl: int = 3600,
) -> None:
    self.redis_handler = redis_handler
    await self.refresh_async(providers, ttl=ttl)

is_cloud_ip(ip, providers=_ALL_PROVIDERS)

Source code in guard_core/handlers/cloud_handler.py
def is_cloud_ip(self, ip: str, providers: set[str] = _ALL_PROVIDERS) -> bool:
    try:
        ip_obj = ipaddress.ip_address(ip)
        for provider in providers:
            if provider in self.ip_ranges:
                for network in self.ip_ranges[provider]:
                    if ip_obj in network:
                        return True
        return False
    except ValueError:
        self.logger.error(f"Invalid IP address: {ip}")
        return False

refresh(providers=_ALL_PROVIDERS)

Source code in guard_core/handlers/cloud_handler.py
def refresh(self, providers: set[str] = _ALL_PROVIDERS) -> None:
    if self.redis_handler is None:
        self._refresh_sync(providers)
    else:
        raise RuntimeError("Use async refresh() when Redis is enabled")

refresh_async(providers=_ALL_PROVIDERS, ttl=3600) async

Source code in guard_core/handlers/cloud_handler.py
async def refresh_async(
    self, providers: set[str] = _ALL_PROVIDERS, ttl: int = 3600
) -> None:
    if self.redis_handler is None:
        self._refresh_sync(providers)
        return

    for provider in providers:
        try:
            cached_ranges = await self.redis_handler.get_key(
                "cloud_ranges", provider
            )
            if cached_ranges:
                self.ip_ranges[provider] = {
                    ipaddress.ip_network(ip) for ip in cached_ranges.split(",")
                }
                continue

            fetch_func = {
                "AWS": fetch_aws_ip_ranges,
                "GCP": fetch_gcp_ip_ranges,
                "Azure": fetch_azure_ip_ranges,
            }[provider]

            ranges = fetch_func()
            if ranges:
                old_ranges = self.ip_ranges.get(provider, set())
                self._log_range_changes(provider, old_ranges, ranges)
                self.ip_ranges[provider] = ranges
                self.last_updated[provider] = datetime.now(timezone.utc)

                await self.redis_handler.set_key(
                    "cloud_ranges",
                    provider,
                    ",".join(str(ip) for ip in ranges),
                    ttl=ttl,
                )

        except Exception as e:
            self.logger.error(f"Failed to refresh {provider} IP ranges: {str(e)}")
            if provider not in self.ip_ranges:
                self.ip_ranges[provider] = set()

send_cloud_detection_event(ip, provider, network, action_taken='request_blocked') async

Source code in guard_core/handlers/cloud_handler.py
async def send_cloud_detection_event(
    self,
    ip: str,
    provider: str,
    network: str,
    action_taken: str = "request_blocked",
) -> None:
    if not self.agent_handler:
        return

    await self._send_cloud_event(
        event_type="cloud_blocked",
        ip_address=ip,
        action_taken=action_taken,
        reason=f"IP belongs to blocked cloud provider: {provider}",
        cloud_provider=provider,
        network=network,
    )

Singleton Pattern

CloudManager uses a singleton so that IP range data is shared across the entire process:

class CloudManager:
    _instance = None
    ip_ranges: dict[str, set[IPv4Network | IPv6Network]]
    last_updated: dict[str, datetime | None]

    def __new__(cls) -> "CloudManager":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.ip_ranges = {"AWS": set(), "GCP": set(), "Azure": set()}
            cls._instance.last_updated = {p: None for p in _ALL_PROVIDERS}
        return cls._instance

A module-level instance cloud_handler is the canonical access point used throughout guard-core.


IP Range Fetching

Each provider has a dedicated fetch function that returns a set[IPv4Network | IPv6Network].

AWS

def fetch_aws_ip_ranges() -> set[IPv4Network | IPv6Network]:
    response = requests.get(
        "https://ip-ranges.amazonaws.com/ip-ranges.json", timeout=10
    )
    return {
        ipaddress.ip_network(r["ip_prefix"])
        for r in response.json()["prefixes"]
        if r["service"] == "AMAZON"
    }

Filters to service == "AMAZON" prefixes only, which covers all AWS services.

GCP

def fetch_gcp_ip_ranges() -> set[IPv4Network | IPv6Network]:
    response = requests.get(
        "https://www.gstatic.com/ipranges/cloud.json", timeout=10
    )
    # Handles both ipv4Prefix and ipv6Prefix keys

GCP publishes IPv4 and IPv6 ranges under different keys in the same JSON file. The function merges both into a single set.

Azure

Azure does not expose a stable JSON endpoint. The fetch function performs a two-step process:

  1. Fetches the Microsoft download page for Service Tags (id=56519).
  2. Extracts the actual JSON download URL from the HTML using a regex.
  3. Fetches the JSON and parses values[0].properties.addressPrefixes.

A browser-like User-Agent header is required to avoid being blocked by Microsoft's download portal.

Error Handling

Every fetch function catches all exceptions, logs the error, and returns an empty set. This prevents a single provider outage from breaking the entire refresh cycle.


Caching Strategy

In-Memory Cache

IP ranges are stored as set[IPv4Network | IPv6Network] in CloudManager.ip_ranges, keyed by provider name. This gives O(n) membership testing against the network set using Python's ipaddress module (ip_obj in network).

Redis Cache (Optional)

When Redis is available, refresh_async adds a second caching layer:

async def refresh_async(
    self, providers: set[str] = _ALL_PROVIDERS, ttl: int = 3600
) -> None:
    for provider in providers:
        cached_ranges = await self.redis_handler.get_key("cloud_ranges", provider)
        if cached_ranges:
            self.ip_ranges[provider] = {
                ipaddress.ip_network(ip) for ip in cached_ranges.split(",")
            }
            continue

        ranges = fetch_func()
        if ranges:
            self.ip_ranges[provider] = ranges
            await self.redis_handler.set_key(
                "cloud_ranges", provider,
                ",".join(str(ip) for ip in ranges), ttl=ttl,
            )

Flow:

  1. Check Redis for a cached comma-separated string of CIDR blocks.
  2. If found, deserialize into ipaddress networks and populate the in-memory cache.
  3. If not found, fetch from the provider, populate in-memory, and write back to Redis with a TTL.

Redis keys follow the pattern {prefix}cloud_ranges:{provider} (e.g., guard:cloud_ranges:AWS).

Sync vs Async Refresh

Method Redis Required Usage
refresh() No Synchronous in-memory-only refresh
refresh_async() Optional Async refresh with optional Redis cache

Calling refresh() when Redis is enabled raises RuntimeError to prevent blocking the event loop.


IP Checking

is_cloud_ip()

def is_cloud_ip(self, ip: str, providers: set[str] = _ALL_PROVIDERS) -> bool:
    ip_obj = ipaddress.ip_address(ip)
    for provider in providers:
        for network in self.ip_ranges.get(provider, set()):
            if ip_obj in network:
                return True
    return False

Parses the IP once, then iterates over every cached network for the requested providers. Returns True on the first match. Invalid IP strings are caught and logged, returning False.

get_cloud_provider_details()

def get_cloud_provider_details(
    self, ip: str, providers: set[str] = _ALL_PROVIDERS
) -> tuple[str, str] | None

Same logic as is_cloud_ip() but returns a (provider, network) tuple on match, or None. This is used by the event system to include the matched provider and CIDR block in detection events.


Refresh Intervals and cloud_ip_refresh_interval

The SecurityConfig model exposes:

cloud_ip_refresh_interval: int = Field(
    default=3600, ge=60,
    description="Interval in seconds between cloud IP range refreshes",
)

The CloudIpRefreshCheck pipeline check triggers a refresh when enough time has elapsed:

class CloudIpRefreshCheck(SecurityCheck):
    async def check(self, request: GuardRequest) -> GuardResponse | None:
        if (
            self.config.block_cloud_providers
            and time.time() - self.middleware.last_cloud_ip_refresh
            > self.config.cloud_ip_refresh_interval
        ):
            await self.middleware.refresh_cloud_ip_ranges()
        return None

This check runs on every request but only performs work when:

  • block_cloud_providers is configured.
  • The elapsed time since the last refresh exceeds cloud_ip_refresh_interval.

The default interval is 3600 seconds (1 hour). The minimum allowed value is 60 seconds.

Range Change Logging

When a refresh detects changes, _log_range_changes logs the delta:

Cloud IP range update for AWS: +12 added, -3 removed

Per-Provider Timestamps

CloudManager.last_updated tracks the last successful fetch time per provider as a datetime | None dict. This allows consumers to verify freshness independently for each provider.


Agent Event Integration

send_cloud_detection_event()

When a cloud IP is blocked and an agent handler is configured, CloudManager dispatches a SecurityEvent:

async def send_cloud_detection_event(
    self, ip: str, provider: str, network: str,
    action_taken: str = "request_blocked",
) -> None:
    await self._send_cloud_event(
        event_type="cloud_blocked",
        ip_address=ip,
        action_taken=action_taken,
        reason=f"IP belongs to blocked cloud provider: {provider}",
        cloud_provider=provider,
        network=network,
    )

The action_taken field reflects the mode:

Mode action_taken
Active request_blocked
Passive logged_only

Event Dispatch from the Pipeline

The CloudProviderCheck delegates event dispatch to SecurityEventBus.send_cloud_detection_events():

cloud_details = cloud_handler.get_cloud_provider_details(client_ip, providers)
if cloud_details and cloud_handler.agent_handler:
    provider, network = cloud_details
    await cloud_handler.send_cloud_detection_event(
        client_ip, provider, network,
        "request_blocked" if not passive_mode else "logged_only",
    )

Events are only sent when both conditions are met:

  1. get_cloud_provider_details() returns a match.
  2. An agent handler has been initialized via initialize_agent().

Pipeline Integration

Two security checks in the pipeline handle cloud provider logic:

Order Check Responsibility
11 CloudIpRefreshCheck Periodic refresh of IP ranges
13 CloudProviderCheck Block or log requests from cloud IPs

CloudProviderCheck respects:

  • Whitelisted IPs: Skipped if request.state.is_whitelisted is True.
  • Route-level bypass: Skipped if the route config disables the clouds check.
  • Provider scoping: Only checks providers returned by get_cloud_providers_to_check(), which can be narrowed per-route via decorators.
  • Passive mode: Logs but does not block when config.passive_mode is enabled.

Initialization

cloud_manager = CloudManager()

cloud_manager.refresh()

await cloud_manager.initialize_redis(redis_handler, providers={"AWS", "GCP"}, ttl=7200)

await cloud_manager.initialize_agent(agent_handler)

initialize_redis() triggers an immediate async refresh for the specified providers and caches results with the given TTL. initialize_agent() enables event dispatch.