Cloud Providers¶
Guard-core can block requests originating from cloud provider IP ranges (AWS, GCP, Azure). The CloudManager handler fetches the official IP range lists, caches them as ipaddress network objects, and exposes a fast membership check used by the security pipeline.
CloudManager¶
guard_core.handlers.cloud_handler.CloudManager
¶
agent_handler = None
class-attribute
instance-attribute
¶
ip_ranges
instance-attribute
¶
last_updated
instance-attribute
¶
logger
instance-attribute
¶
redis_handler = None
class-attribute
instance-attribute
¶
get_cloud_provider_details(ip, providers=_ALL_PROVIDERS)
¶
Source code in guard_core/handlers/cloud_handler.py
initialize_agent(agent_handler)
async
¶
initialize_redis(redis_handler, providers=_ALL_PROVIDERS, ttl=3600)
async
¶
is_cloud_ip(ip, providers=_ALL_PROVIDERS)
¶
Source code in guard_core/handlers/cloud_handler.py
refresh(providers=_ALL_PROVIDERS)
¶
refresh_async(providers=_ALL_PROVIDERS, ttl=3600)
async
¶
Source code in guard_core/handlers/cloud_handler.py
send_cloud_detection_event(ip, provider, network, action_taken='request_blocked')
async
¶
Source code in guard_core/handlers/cloud_handler.py
Singleton Pattern¶
CloudManager uses a singleton so that IP range data is shared across the entire process:
class CloudManager:
_instance = None
ip_ranges: dict[str, set[IPv4Network | IPv6Network]]
last_updated: dict[str, datetime | None]
def __new__(cls) -> "CloudManager":
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.ip_ranges = {"AWS": set(), "GCP": set(), "Azure": set()}
cls._instance.last_updated = {p: None for p in _ALL_PROVIDERS}
return cls._instance
A module-level instance cloud_handler is the canonical access point used throughout guard-core.
IP Range Fetching¶
Each provider has a dedicated fetch function that returns a set[IPv4Network | IPv6Network].
AWS¶
def fetch_aws_ip_ranges() -> set[IPv4Network | IPv6Network]:
response = requests.get(
"https://ip-ranges.amazonaws.com/ip-ranges.json", timeout=10
)
return {
ipaddress.ip_network(r["ip_prefix"])
for r in response.json()["prefixes"]
if r["service"] == "AMAZON"
}
Filters to service == "AMAZON" prefixes only, which covers all AWS services.
GCP¶
def fetch_gcp_ip_ranges() -> set[IPv4Network | IPv6Network]:
response = requests.get(
"https://www.gstatic.com/ipranges/cloud.json", timeout=10
)
# Handles both ipv4Prefix and ipv6Prefix keys
GCP publishes IPv4 and IPv6 ranges under different keys in the same JSON file. The function merges both into a single set.
Azure¶
Azure does not expose a stable JSON endpoint. The fetch function performs a two-step process:
- Fetches the Microsoft download page for Service Tags (
id=56519). - Extracts the actual JSON download URL from the HTML using a regex.
- Fetches the JSON and parses
values[0].properties.addressPrefixes.
A browser-like User-Agent header is required to avoid being blocked by Microsoft's download portal.
Error Handling¶
Every fetch function catches all exceptions, logs the error, and returns an empty set. This prevents a single provider outage from breaking the entire refresh cycle.
Caching Strategy¶
In-Memory Cache¶
IP ranges are stored as set[IPv4Network | IPv6Network] in CloudManager.ip_ranges, keyed by provider name. This gives O(n) membership testing against the network set using Python's ipaddress module (ip_obj in network).
Redis Cache (Optional)¶
When Redis is available, refresh_async adds a second caching layer:
async def refresh_async(
self, providers: set[str] = _ALL_PROVIDERS, ttl: int = 3600
) -> None:
for provider in providers:
cached_ranges = await self.redis_handler.get_key("cloud_ranges", provider)
if cached_ranges:
self.ip_ranges[provider] = {
ipaddress.ip_network(ip) for ip in cached_ranges.split(",")
}
continue
ranges = fetch_func()
if ranges:
self.ip_ranges[provider] = ranges
await self.redis_handler.set_key(
"cloud_ranges", provider,
",".join(str(ip) for ip in ranges), ttl=ttl,
)
Flow:
- Check Redis for a cached comma-separated string of CIDR blocks.
- If found, deserialize into
ipaddressnetworks and populate the in-memory cache. - If not found, fetch from the provider, populate in-memory, and write back to Redis with a TTL.
Redis keys follow the pattern {prefix}cloud_ranges:{provider} (e.g., guard:cloud_ranges:AWS).
Sync vs Async Refresh¶
| Method | Redis Required | Usage |
|---|---|---|
refresh() |
No | Synchronous in-memory-only refresh |
refresh_async() |
Optional | Async refresh with optional Redis cache |
Calling refresh() when Redis is enabled raises RuntimeError to prevent blocking the event loop.
IP Checking¶
is_cloud_ip()¶
def is_cloud_ip(self, ip: str, providers: set[str] = _ALL_PROVIDERS) -> bool:
ip_obj = ipaddress.ip_address(ip)
for provider in providers:
for network in self.ip_ranges.get(provider, set()):
if ip_obj in network:
return True
return False
Parses the IP once, then iterates over every cached network for the requested providers. Returns True on the first match. Invalid IP strings are caught and logged, returning False.
get_cloud_provider_details()¶
def get_cloud_provider_details(
self, ip: str, providers: set[str] = _ALL_PROVIDERS
) -> tuple[str, str] | None
Same logic as is_cloud_ip() but returns a (provider, network) tuple on match, or None. This is used by the event system to include the matched provider and CIDR block in detection events.
Refresh Intervals and cloud_ip_refresh_interval¶
The SecurityConfig model exposes:
cloud_ip_refresh_interval: int = Field(
default=3600, ge=60,
description="Interval in seconds between cloud IP range refreshes",
)
The CloudIpRefreshCheck pipeline check triggers a refresh when enough time has elapsed:
class CloudIpRefreshCheck(SecurityCheck):
async def check(self, request: GuardRequest) -> GuardResponse | None:
if (
self.config.block_cloud_providers
and time.time() - self.middleware.last_cloud_ip_refresh
> self.config.cloud_ip_refresh_interval
):
await self.middleware.refresh_cloud_ip_ranges()
return None
This check runs on every request but only performs work when:
block_cloud_providersis configured.- The elapsed time since the last refresh exceeds
cloud_ip_refresh_interval.
The default interval is 3600 seconds (1 hour). The minimum allowed value is 60 seconds.
Range Change Logging¶
When a refresh detects changes, _log_range_changes logs the delta:
Per-Provider Timestamps¶
CloudManager.last_updated tracks the last successful fetch time per provider as a datetime | None dict. This allows consumers to verify freshness independently for each provider.
Agent Event Integration¶
send_cloud_detection_event()¶
When a cloud IP is blocked and an agent handler is configured, CloudManager dispatches a SecurityEvent:
async def send_cloud_detection_event(
self, ip: str, provider: str, network: str,
action_taken: str = "request_blocked",
) -> None:
await self._send_cloud_event(
event_type="cloud_blocked",
ip_address=ip,
action_taken=action_taken,
reason=f"IP belongs to blocked cloud provider: {provider}",
cloud_provider=provider,
network=network,
)
The action_taken field reflects the mode:
| Mode | action_taken |
|---|---|
| Active | request_blocked |
| Passive | logged_only |
Event Dispatch from the Pipeline¶
The CloudProviderCheck delegates event dispatch to SecurityEventBus.send_cloud_detection_events():
cloud_details = cloud_handler.get_cloud_provider_details(client_ip, providers)
if cloud_details and cloud_handler.agent_handler:
provider, network = cloud_details
await cloud_handler.send_cloud_detection_event(
client_ip, provider, network,
"request_blocked" if not passive_mode else "logged_only",
)
Events are only sent when both conditions are met:
get_cloud_provider_details()returns a match.- An agent handler has been initialized via
initialize_agent().
Pipeline Integration¶
Two security checks in the pipeline handle cloud provider logic:
| Order | Check | Responsibility |
|---|---|---|
| 11 | CloudIpRefreshCheck |
Periodic refresh of IP ranges |
| 13 | CloudProviderCheck |
Block or log requests from cloud IPs |
CloudProviderCheck respects:
- Whitelisted IPs: Skipped if
request.state.is_whitelistedisTrue. - Route-level bypass: Skipped if the route config disables the
cloudscheck. - Provider scoping: Only checks providers returned by
get_cloud_providers_to_check(), which can be narrowed per-route via decorators. - Passive mode: Logs but does not block when
config.passive_modeis enabled.
Initialization¶
cloud_manager = CloudManager()
cloud_manager.refresh()
await cloud_manager.initialize_redis(redis_handler, providers={"AWS", "GCP"}, ttl=7200)
await cloud_manager.initialize_agent(agent_handler)
initialize_redis() triggers an immediate async refresh for the specified providers and caches results with the given TTL. initialize_agent() enables event dispatch.