I’ve been hunting SSRF bugs in Python applications for over five years, and the number of vulnerable codebases I encounter is honestly terrifying. Flask apps with `requests.get(user_input)` scattered everywhere. Django views that blindly fetch URLs from form data. FastAPI endpoints that turn your server into an attacker’s proxy.
The worst part? Most developers don’t realize they’re building SSRF vulnerabilities until it’s too late. By then, attackers have already stolen your AWS credentials or mapped your entire internal network.
This guide will teach you everything you need to know about preventing SSRF in Python applications, with practical examples for every major framework.
## Understanding SSRF in Python Context
Server-Side Request Forgery happens when your Python application makes HTTP requests based on user-controlled input. The attacker manipulates these requests to access resources they shouldn’t be able to reach.
### The Classic Python SSRF Vulnerability
Here’s the vulnerable pattern I see in almost every Python codebase:
```python
import requests
from flask import Flask, request

app = Flask(__name__)

@app.route('/fetch-data', methods=['POST'])
def fetch_data():
    url = request.json.get('url')
    # This single line creates an SSRF vulnerability
    response = requests.get(url)
    return {
        'data': response.text,
        'status': response.status_code
    }

# Attack payload:
# POST /fetch-data
# {"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/"}
# Result: the attacker reads your AWS credentials
```
The problem isn’t just external attacks. Internal attackers can use SSRF to access admin panels, databases, and other services that trust your application server.
## Framework-Agnostic SSRF Protection Library
Let me show you the production-ready SSRF protection library I use across all my Python projects:
```python
import ipaddress
import re
import socket
import urllib.parse
from typing import Optional, Set, Tuple


class SSRFGuard:
    """Production-ready SSRF protection for Python applications"""

    def __init__(self):
        # Dangerous IP ranges that attackers target
        self.blocked_networks = [
            # IPv4 private/dangerous ranges
            ipaddress.ip_network("0.0.0.0/8"),        # "This" network
            ipaddress.ip_network("10.0.0.0/8"),       # Private Class A
            ipaddress.ip_network("127.0.0.0/8"),      # Loopback
            ipaddress.ip_network("169.254.0.0/16"),   # Link-local (AWS metadata!)
            ipaddress.ip_network("172.16.0.0/12"),    # Private Class B
            ipaddress.ip_network("192.0.0.0/24"),     # IETF Protocol Assignments
            ipaddress.ip_network("192.0.2.0/24"),     # TEST-NET-1
            ipaddress.ip_network("192.88.99.0/24"),   # 6to4 Relay Anycast
            ipaddress.ip_network("192.168.0.0/16"),   # Private Class C
            ipaddress.ip_network("198.18.0.0/15"),    # Benchmarking
            ipaddress.ip_network("198.51.100.0/24"),  # TEST-NET-2
            ipaddress.ip_network("203.0.113.0/24"),   # TEST-NET-3
            ipaddress.ip_network("224.0.0.0/4"),      # Multicast
            ipaddress.ip_network("240.0.0.0/4"),      # Reserved for future use
            # IPv6 equivalents
            ipaddress.ip_network("::1/128"),          # IPv6 loopback
            ipaddress.ip_network("fc00::/7"),         # IPv6 unique local
            ipaddress.ip_network("fe80::/10"),        # IPv6 link-local
        ]
        # Only allow safe schemes
        self.allowed_schemes = {'http', 'https'}
        # Common dangerous ports to block
        self.dangerous_ports = {
            22,     # SSH
            23,     # Telnet
            25,     # SMTP
            53,     # DNS
            110,    # POP3
            143,    # IMAP
            993,    # IMAPS
            995,    # POP3S
            1433,   # MSSQL
            3306,   # MySQL
            5432,   # PostgreSQL
            6379,   # Redis
            9200,   # Elasticsearch
            27017,  # MongoDB
        }

    def validate_url(self, url: str, allowed_hosts: Optional[Set[str]] = None,
                     allowed_ports: Optional[Set[int]] = None) -> Tuple[bool, str]:
        """
        Validate a URL for SSRF safety.

        Args:
            url: URL to validate
            allowed_hosts: Set of explicitly allowed hostnames (allowlist approach)
            allowed_ports: Set of allowed ports (if None, blocks known-dangerous ports)

        Returns:
            Tuple of (is_safe: bool, error_message: str)
        """
        try:
            parsed = urllib.parse.urlparse(url)
        except Exception as e:
            return False, f"Invalid URL format: {e}"

        # Validate scheme
        scheme = parsed.scheme.lower()
        if scheme not in self.allowed_schemes:
            return False, f"Scheme '{scheme}' not allowed. Only http/https permitted."

        # Get hostname and port
        hostname = parsed.hostname
        if not hostname:
            return False, "Missing hostname"
        try:
            port = parsed.port or (443 if scheme == 'https' else 80)
        except ValueError:
            return False, "Invalid port"

        # Check port if restrictions are set
        if allowed_ports and port not in allowed_ports:
            return False, f"Port {port} not in allowed ports"

        # Check against dangerous ports (unless explicitly allowed)
        if not allowed_ports and port in self.dangerous_ports:
            return False, f"Port {port} is potentially dangerous"

        # Allowlist check (most secure approach)
        if allowed_hosts:
            if hostname not in allowed_hosts:
                return False, f"Host '{hostname}' not in allowlist"
        else:
            # If no allowlist, at least block obvious bad hostnames
            if self._is_dangerous_hostname(hostname):
                return False, f"Hostname '{hostname}' appears dangerous"

        # Resolve the hostname and validate every IP it maps to
        try:
            addr_info = socket.getaddrinfo(hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except socket.gaierror as e:
            return False, f"Cannot resolve hostname '{hostname}': {e}"

        for family, type_, proto, canonname, sockaddr in addr_info:
            ip_str = sockaddr[0]
            try:
                ip = ipaddress.ip_address(ip_str)
            except ValueError:
                return False, f"Invalid IP address: {ip_str}"

            # Check against blocked networks
            for blocked_network in self.blocked_networks:
                if ip in blocked_network:
                    return False, f"IP {ip} is in blocked network {blocked_network}"

            # Special check for the cloud metadata service
            if str(ip) == "169.254.169.254":
                return False, "Access to cloud metadata service blocked"

        return True, "URL is safe"

    def _is_dangerous_hostname(self, hostname: str) -> bool:
        """Check for obviously dangerous hostnames"""
        dangerous_patterns = [
            r'^localhost$',
            r'^.*\.local$',
            r'^.*\.internal$',
            r'^.*\.corp$',
            r'^.*\.lan$',
            r'^\d+\.\d+\.\d+\.\d+$',  # Raw IPv4 addresses
        ]
        hostname_lower = hostname.lower()
        return any(re.match(pattern, hostname_lower) for pattern in dangerous_patterns)


# Global instance
ssrf_guard = SSRFGuard()
```
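Before wiring the guard into a framework, it helps to see the core idea in isolation: resolve the hostname, then check every resulting IP against the blocked ranges. A minimal self-contained sketch of just that check (not the full class; the sample IPs are illustrative):

```python
import ipaddress

# A few of the ranges from the blocklist above
BLOCKED = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("10.0.0.0/8"),
]

def ip_is_blocked(ip_str: str) -> bool:
    """Return True if the IP falls inside any blocked network."""
    ip = ipaddress.ip_address(ip_str)
    return any(ip in net for net in BLOCKED)

print(ip_is_blocked("169.254.169.254"))  # True: the metadata IP is link-local
print(ip_is_blocked("140.82.112.3"))     # False: a public address
```

Everything else in `SSRFGuard` exists to make sure this check runs against the IPs the request will actually use.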
## Flask: SSRF-Safe Request Handling
Here’s how to properly implement SSRF protection in Flask applications:
```python
from flask import Flask, request, jsonify
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

app = Flask(__name__)


def create_safe_session():
    """Create a requests session with safe defaults"""
    session = requests.Session()

    # Configure retry strategy
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    # Set safe defaults
    session.headers.update({
        'User-Agent': 'MyApp/1.0 (Security Scanner)',
        'Accept': 'application/json, text/plain, */*',
    })
    return session


@app.route('/api/proxy', methods=['POST'])
def secure_proxy():
    """Secure proxy endpoint with SSRF protection"""
    data = request.get_json()
    if not data or 'url' not in data:
        return jsonify({'error': 'URL required'}), 400

    url = data['url']

    # Define allowed destinations for this endpoint
    allowed_hosts = {
        'api.github.com',
        'api.twitter.com',
        'httpbin.org',
        'jsonplaceholder.typicode.com'
    }

    # Validate URL
    is_safe, error = ssrf_guard.validate_url(url, allowed_hosts=allowed_hosts)
    if not is_safe:
        app.logger.warning(f"SSRF attempt blocked: {url} - {error}")
        return jsonify({'error': f'Invalid URL: {error}'}), 400

    try:
        session = create_safe_session()
        response = session.get(
            url,
            timeout=10,
            allow_redirects=False,  # Prevent redirect-based bypasses
            stream=True             # Don't load huge responses into memory up front
        )

        # Validate content type
        content_type = response.headers.get('content-type', '').lower()
        allowed_content_types = ['application/json', 'text/plain', 'text/html']
        if not any(ct in content_type for ct in allowed_content_types):
            return jsonify({'error': 'Unsupported content type'}), 400

        # Reject oversized responses early (10MB max)
        content_length = response.headers.get('content-length')
        if content_length and int(content_length) > 10 * 1024 * 1024:
            return jsonify({'error': 'Response too large'}), 413

        # Truncate what we return to the client (1MB limit)
        content = response.text[:1024 * 1024]

        return jsonify({
            'data': content,
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'url': response.url
        })
    except requests.Timeout:
        return jsonify({'error': 'Request timeout'}), 408
    except requests.RequestException as e:
        app.logger.error(f"Request failed: {e}")
        return jsonify({'error': 'Request failed'}), 502


@app.route('/api/fetch-image', methods=['POST'])
def fetch_image():
    """Fetch and validate an image from a URL"""
    data = request.get_json()
    if not data or 'url' not in data:
        return jsonify({'error': 'URL required'}), 400

    url = data['url']

    # More restrictive validation for image fetching
    allowed_hosts = {'images.unsplash.com', 'picsum.photos'}
    allowed_ports = {80, 443}  # Only HTTP/HTTPS default ports

    is_safe, error = ssrf_guard.validate_url(
        url,
        allowed_hosts=allowed_hosts,
        allowed_ports=allowed_ports
    )
    if not is_safe:
        return jsonify({'error': f'Invalid image URL: {error}'}), 400

    try:
        session = create_safe_session()
        response = session.head(url, timeout=5)  # Just check headers first

        content_type = response.headers.get('content-type', '').lower()
        if not content_type.startswith('image/'):
            return jsonify({'error': 'URL does not point to an image'}), 400

        # Now fetch the actual image
        response = session.get(url, timeout=15, stream=True)

        return jsonify({
            'image_url': url,
            'content_type': content_type,
            'size': response.headers.get('content-length'),
            'status': 'success'
        })
    except requests.RequestException:
        # Return a generic message; don't leak internal error details
        return jsonify({'error': 'Failed to fetch image'}), 502
```
## Django: SSRF Protection in Views
Django applications need similar protection, integrated with Django’s request handling:
```python
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils.decorators import method_decorator
from django.views.generic import View
import json
import requests
import logging

logger = logging.getLogger(__name__)


class SSRFProtectedMixin:
    """Mixin to add SSRF protection to Django views"""

    def validate_outbound_url(self, url, allowed_hosts=None):
        """Validate a URL, raising ValueError if it is unsafe"""
        is_safe, error = ssrf_guard.validate_url(url, allowed_hosts)
        if not is_safe:
            logger.warning(f"SSRF attempt in {self.__class__.__name__}: {url} - {error}")
            raise ValueError(error)

    def validate_and_fetch(self, url, allowed_hosts=None, **kwargs):
        """Validate a URL and GET it safely"""
        self.validate_outbound_url(url, allowed_hosts)
        return requests.get(
            url,
            timeout=kwargs.get('timeout', 10),
            allow_redirects=False,
            headers={'User-Agent': 'Django App/1.0'}
        )


@method_decorator(csrf_exempt, name='dispatch')
class WebhookProxyView(SSRFProtectedMixin, View):
    """Secure webhook proxy that forwards requests to allowed services"""

    def post(self, request):
        try:
            data = json.loads(request.body)
            webhook_url = data.get('webhook_url')
            payload = data.get('payload', {})

            if not webhook_url:
                return JsonResponse({'error': 'webhook_url required'}, status=400)

            # Define allowed webhook destinations
            allowed_webhook_hosts = {
                'hooks.slack.com',
                'discord.com',
                'api.github.com'
            }

            # Validate before forwarding; no extra GET needed
            self.validate_outbound_url(webhook_url, allowed_hosts=allowed_webhook_hosts)

            # Forward the webhook
            webhook_response = requests.post(
                webhook_url,
                json=payload,
                timeout=10,
                allow_redirects=False
            )

            return JsonResponse({
                'status': 'success',
                'webhook_status': webhook_response.status_code
            })
        except ValueError as e:
            return JsonResponse({'error': str(e)}, status=400)
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except requests.RequestException as e:
            logger.error(f"Webhook request failed: {e}")
            return JsonResponse({'error': 'Webhook delivery failed'}, status=502)


@require_http_methods(["POST"])
@csrf_exempt
def url_metadata_extractor(request):
    """Extract metadata from URLs safely"""
    try:
        data = json.loads(request.body)
        url = data.get('url')

        if not url:
            return JsonResponse({'error': 'URL required'}, status=400)

        # Only allow HTTPS for metadata extraction
        if not url.startswith('https://'):
            return JsonResponse({'error': 'Only HTTPS URLs allowed'}, status=400)

        # Validate before fetching
        is_safe, error = ssrf_guard.validate_url(url)
        if not is_safe:
            return JsonResponse({'error': error}, status=400)

        # Keep redirects disabled: following them would sidestep the validation above
        response = requests.head(url, timeout=5, allow_redirects=False)

        metadata = {
            'url': response.url,
            'status_code': response.status_code,
            'content_type': response.headers.get('content-type'),
            'content_length': response.headers.get('content-length'),
            'last_modified': response.headers.get('last-modified'),
            'server': response.headers.get('server', '').split('/')[0]  # Don't leak versions
        }
        return JsonResponse(metadata)
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except requests.RequestException:
        return JsonResponse({'error': 'Failed to fetch URL metadata'}, status=502)
    except Exception as e:
        logger.error(f"Unexpected error in url_metadata_extractor: {e}")
        return JsonResponse({'error': 'Internal server error'}, status=500)
```
## FastAPI: Modern Async SSRF Protection
FastAPI applications can use async HTTP clients, but need the same SSRF protection:
```python
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, HttpUrl, validator
import httpx
from typing import Optional, Set

app = FastAPI(title="SSRF-Protected API")


class URLRequest(BaseModel):
    url: HttpUrl
    timeout: Optional[int] = 10

    @validator('timeout')
    def validate_timeout(cls, v):
        if v < 1 or v > 30:
            raise ValueError('Timeout must be between 1 and 30 seconds')
        return v


class SSRFProtectedClient:
    """Async HTTP client with SSRF protection"""

    def __init__(self):
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0),
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
            headers={'User-Agent': 'FastAPI App/1.0'}
        )

    async def safe_get(self, url: str, allowed_hosts: Optional[Set[str]] = None, **kwargs):
        """Make a safe GET request with SSRF protection"""
        is_safe, error = ssrf_guard.validate_url(url, allowed_hosts)
        if not is_safe:
            raise HTTPException(status_code=400, detail=f"SSRF protection: {error}")

        try:
            response = await self.client.get(
                url,
                follow_redirects=False,  # Prevent redirect bypasses
                **kwargs
            )
            return response
        except httpx.RequestError:
            # Generic message; don't leak internal error details
            raise HTTPException(status_code=502, detail="Request failed")

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()


async def get_protected_client():
    """Dependency that provides an SSRF-protected HTTP client"""
    async with SSRFProtectedClient() as client:
        yield client


@app.post("/api/fetch-json")
async def fetch_json_data(
    request: URLRequest,
    client: SSRFProtectedClient = Depends(get_protected_client)
):
    """Fetch JSON data from allowed external APIs"""
    allowed_apis = {
        'jsonplaceholder.typicode.com',
        'api.github.com',
        'httpbin.org'
    }

    response = await client.safe_get(
        str(request.url),
        allowed_hosts=allowed_apis,
        timeout=request.timeout
    )

    # Validate that the response is JSON
    content_type = response.headers.get('content-type', '')
    if 'application/json' not in content_type:
        raise HTTPException(status_code=400, detail="Response is not JSON")

    try:
        data = response.json()
    except ValueError:
        raise HTTPException(status_code=502, detail="Invalid JSON response")

    return {
        'data': data,
        'status_code': response.status_code,
        'url': str(response.url),
        'content_type': content_type
    }


@app.post("/api/check-website")
async def check_website_status(
    request: URLRequest,
    client: SSRFProtectedClient = Depends(get_protected_client)
):
    """Check whether a website is accessible"""
    # More permissive: any destination that passes the default checks is allowed
    response = await client.safe_get(str(request.url))

    return {
        'url': str(response.url),
        'status_code': response.status_code,
        'is_accessible': response.status_code < 400,
        'response_time_ms': response.elapsed.total_seconds() * 1000,
        'headers': {
            'content-type': response.headers.get('content-type'),
            'server': response.headers.get('server', '').split('/')[0],
            'content-length': response.headers.get('content-length')
        }
    }
```
## Cloud Environment Protection
SSRF in cloud environments is particularly dangerous because of metadata services:
```python
import socket
import urllib.parse

import boto3
from botocore.exceptions import ClientError


class CloudSSRFProtection:
    """Enhanced SSRF protection for cloud environments"""

    def __init__(self):
        self.metadata_endpoints = {
            '169.254.169.254',           # AWS/Azure
            'metadata.google.internal',  # GCP
            '100.100.100.200',           # Alibaba Cloud
        }

    def is_cloud_metadata_request(self, url: str) -> bool:
        """Check whether a URL targets a cloud metadata service"""
        parsed = urllib.parse.urlparse(url)
        hostname = parsed.hostname
        if not hostname:
            return False

        # Check direct access by name or IP
        if hostname in self.metadata_endpoints:
            return True

        # Check whether DNS resolves to a metadata IP
        try:
            addr_info = socket.getaddrinfo(hostname, None)
            for family, type_, proto, canonname, sockaddr in addr_info:
                if sockaddr[0] in self.metadata_endpoints:
                    return True
        except socket.gaierror:
            pass
        return False


def setup_aws_imds_protection():
    """Configure AWS Instance Metadata Service v2 (IMDSv2) for SSRF protection"""
    try:
        ec2_client = boto3.client('ec2')

        # Get the current instance ID (plain IMDSv1 call; this will stop
        # working once HttpTokens is set to 'required')
        import requests
        instance_id = requests.get(
            'http://169.254.169.254/latest/meta-data/instance-id',
            timeout=2
        ).text

        # Enable IMDSv2 (requires session tokens, which blunts most SSRF)
        ec2_client.modify_instance_metadata_options(
            InstanceId=instance_id,
            HttpTokens='required',       # Require tokens
            HttpPutResponseHopLimit=1,   # Prevent forwarding through proxies
            HttpEndpoint='enabled'
        )
        print("IMDSv2 enabled for SSRF protection")
    except Exception as e:
        print(f"Failed to configure IMDSv2: {e}")


# Network-level protection
def generate_iptables_rules():
    """Generate iptables rules for SSRF protection"""
    rules = [
        # Block metadata services
        "iptables -A OUTPUT -d 169.254.169.254 -j REJECT --reject-with icmp-port-unreachable",
        "iptables -A OUTPUT -d 100.100.100.200 -j REJECT --reject-with icmp-port-unreachable",
        # Block private networks (adjust for your environment)
        "iptables -A OUTPUT -d 10.0.0.0/8 -p tcp --dport 22 -j REJECT",
        "iptables -A OUTPUT -d 172.16.0.0/12 -p tcp --dport 22 -j REJECT",
        "iptables -A OUTPUT -d 192.168.0.0/16 -p tcp --dport 22 -j REJECT",
        # Allow specific external services (note: iptables resolves these
        # hostnames once, at rule-insertion time)
        "iptables -I OUTPUT -d api.github.com -j ACCEPT",
        "iptables -I OUTPUT -d httpbin.org -j ACCEPT",
    ]
    return rules
```
## Testing Your SSRF Protection
Comprehensive testing is crucial to verify your SSRF defenses work:
```python
import socket
from unittest.mock import patch

import pytest
import responses


class TestSSRFProtection:
    """Comprehensive SSRF protection test suite"""

    def test_blocks_localhost(self):
        """Test that localhost access is blocked"""
        dangerous_urls = [
            "http://127.0.0.1:22",
            "http://localhost:3306",
            "http://[::1]:80",
            "http://0.0.0.0:8080"
        ]
        for url in dangerous_urls:
            is_safe, error = ssrf_guard.validate_url(url)
            assert not is_safe, f"URL should be blocked: {url}"

    def test_blocks_private_networks(self):
        """Test that private network access is blocked"""
        private_urls = [
            "http://192.168.1.1/admin",
            "http://10.0.0.5:8080",
            "http://172.16.0.1/config"
        ]
        for url in private_urls:
            is_safe, error = ssrf_guard.validate_url(url)
            assert not is_safe, f"Private URL should be blocked: {url}"

    def test_blocks_cloud_metadata(self):
        """Test that cloud metadata access is blocked"""
        metadata_urls = [
            "http://169.254.169.254/latest/meta-data/",
            "http://metadata.google.internal/computeMetadata/",
            "http://100.100.100.200/latest/meta-data/"
        ]
        for url in metadata_urls:
            is_safe, error = ssrf_guard.validate_url(url)
            assert not is_safe, f"Metadata URL should be blocked: {url}"

    def test_allows_allowlisted_hosts(self):
        """Test that allowlisted hosts are permitted"""
        allowed_hosts = {'httpbin.org', 'api.github.com'}
        safe_urls = [
            "https://httpbin.org/json",
            "https://api.github.com/users/octocat"
        ]
        for url in safe_urls:
            is_safe, error = ssrf_guard.validate_url(url, allowed_hosts)
            assert is_safe, f"Allowlisted URL should be allowed: {url}"

    def test_dns_rebinding_protection(self):
        """Test behavior under DNS rebinding attacks"""
        # Mock a DNS answer that changes between lookups
        with patch('socket.getaddrinfo') as mock_getaddrinfo:
            # First resolves to a safe IP, then to a dangerous one
            mock_getaddrinfo.side_effect = [
                [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 80))],
                [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.0.0.1', 80))],
            ]
            # Safe on the first lookup...
            is_safe, error = ssrf_guard.validate_url("http://evil.com/")
            assert is_safe
            # ...dangerous on the second. Defeating rebinding requires pinning
            # the validated IP for the actual request (see the FAQ).
            is_safe, error = ssrf_guard.validate_url("http://evil.com/")
            assert not is_safe

    @responses.activate
    def test_flask_endpoint_blocks_ssrf(self):
        """Test that the Flask endpoint properly blocks SSRF attempts"""
        from your_app import app  # Import your Flask app
        client = app.test_client()

        # A blocked request
        response = client.post('/api/proxy',
                               json={'url': 'http://127.0.0.1:22'})
        assert response.status_code == 400
        assert 'invalid url' in response.get_json()['error'].lower()

        # An allowed request
        responses.add(
            responses.GET,
            'https://httpbin.org/json',
            json={'test': 'data'},
            status=200
        )
        response = client.post('/api/proxy',
                               json={'url': 'https://httpbin.org/json'})
        assert response.status_code == 200

    def test_production_payloads(self):
        """Test with real-world SSRF attack payloads"""
        attack_payloads = [
            # Basic localhost variants
            "http://127.0.0.1/",
            "http://localhost/",
            "http://0.0.0.0/",
            "http://[::1]/",
            # Decimal/octal IP encoding
            "http://2130706433/",        # 127.0.0.1 in decimal
            "http://0177.0.0.1/",        # Octal encoding
            "http://127.1/",             # Short form
            # Domain tricks
            "http://127.0.0.1.nip.io/",  # Resolves to 127.0.0.1
            "http://127.0.0.1.xip.io/",
            # Cloud metadata
            "http://169.254.169.254/latest/meta-data/",
            "http://[fd00:ec2::254]/latest/meta-data/",
            # Protocol variants
            "file:///etc/passwd",
            "gopher://127.0.0.1:80/",
            "dict://127.0.0.1:11211/",
            # Port variants
            "http://127.0.0.1:22",    # SSH
            "http://127.0.0.1:3306",  # MySQL
            "http://127.0.0.1:6379",  # Redis
        ]
        for payload in attack_payloads:
            is_safe, error = ssrf_guard.validate_url(payload)
            assert not is_safe, f"Attack payload should be blocked: {payload}"


if __name__ == "__main__":
    # Quick test runner
    test_suite = TestSSRFProtection()
    print("Testing SSRF protection...")

    test_methods = [method for method in dir(test_suite) if method.startswith('test_')]
    for test_method in test_methods:
        try:
            getattr(test_suite, test_method)()
            print(f"✓ {test_method}")
        except AssertionError as e:
            print(f"✗ {test_method}: {e}")
        except Exception as e:
            print(f"⚠ {test_method}: Error - {e}")
    print("SSRF protection testing complete!")
```
## Production Deployment Checklist
Before deploying your SSRF-protected Python application:
### Application Level
- All outbound HTTP requests use the SSRFGuard validation
- Allowlists are defined for all external API integrations
- Request timeouts are configured (max 30 seconds)
- Response size limits are enforced (max 10MB)
- Redirects are disabled or carefully validated
- Error messages don’t leak internal network information
- SSRF attempts are logged for security monitoring
### Infrastructure Level
- Cloud metadata services are protected (IMDSv2 enabled)
- Firewall rules block internal network access from web tier
- DNS resolution is controlled (no wildcard internal domains)
- Network segmentation isolates sensitive services
- VPC security groups restrict outbound connections
- Load balancer logs capture outbound request attempts
### Monitoring & Response
- SSRF attempts trigger security alerts
- Failed requests are correlated with user accounts
- Network traffic is monitored for unusual patterns
- Incident response plan includes SSRF scenarios
- Regular penetration testing validates defenses
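To make the "SSRF attempts trigger security alerts" item concrete, one option is to log every blocked attempt as a structured JSON event that a SIEM rule can match on. A minimal sketch, with illustrative field names:

```python
import json
import logging
from typing import Optional

logger = logging.getLogger("security.ssrf")

def ssrf_event(url: str, reason: str, user_id: Optional[str] = None) -> dict:
    """Build a machine-parseable event describing a blocked outbound request."""
    return {
        "event": "ssrf_blocked",
        "url": url,
        "reason": reason,
        "user_id": user_id,
    }

def log_ssrf_attempt(url: str, reason: str, user_id: Optional[str] = None) -> None:
    """Emit the event as a single JSON line so alerting rules can key on it."""
    logger.warning(json.dumps(ssrf_event(url, reason, user_id)))

log_ssrf_attempt("http://169.254.169.254/", "blocked network", user_id="u123")
```

Keying alerts on a stable `event` field is more robust than grepping free-text log messages, and the `user_id` makes the "correlated with user accounts" item possible.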
## Frequently Asked Questions
### How do I handle webhooks without creating SSRF vulnerabilities?
Use strict allowlists for webhook destinations. For services like Slack or Discord, only allow their official webhook domains. Validate the webhook URL before storing it, not just before using it.
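A sketch of that store-time check (the allowed hosts are illustrative):

```python
import urllib.parse

# Illustrative allowlist; list your real integrations here
ALLOWED_WEBHOOK_HOSTS = {"hooks.slack.com", "discord.com"}

def webhook_url_is_allowed(url: str) -> bool:
    """Validate a webhook URL before persisting it, not just before calling it."""
    parsed = urllib.parse.urlparse(url)
    return parsed.scheme == "https" and parsed.hostname in ALLOWED_WEBHOOK_HOSTS

print(webhook_url_is_allowed("https://hooks.slack.com/services/T000/B000/XXX"))  # True
print(webhook_url_is_allowed("https://169.254.169.254/latest/meta-data/"))       # False
```

Rejecting bad URLs at save time means a malicious value never even reaches the code path that makes requests.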
### Can attackers bypass DNS-based validation?
Yes, through DNS rebinding attacks. The attacker controls a domain that initially resolves to a safe IP, then changes to target internal services. Use DNS caching and re-validation to mitigate this.
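One mitigation sketch: resolve once, validate the resolved IP, then make the request to that exact IP while sending the original hostname in the Host header, so a second DNS answer never gets used. The helper below is illustrative and HTTP-only (pinning with HTTPS additionally requires SNI and certificate-hostname handling):

```python
import ipaddress
import socket
import urllib.parse

def resolve_and_pin(url: str):
    """Resolve the hostname once, reject non-global IPs, and return a URL
    pinned to the validated IP plus the Host header to send with it."""
    parsed = urllib.parse.urlparse(url)
    ip = socket.getaddrinfo(parsed.hostname, None)[0][4][0]
    if not ipaddress.ip_address(ip).is_global:
        raise ValueError(f"{parsed.hostname} resolves to non-global IP {ip}")
    netloc = ip if parsed.port is None else f"{ip}:{parsed.port}"
    return parsed._replace(netloc=netloc).geturl(), {"Host": parsed.hostname}
```

The pinned URL goes to your HTTP client, and the returned Host header preserves virtual-host routing on the server side.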
### What if I need to fetch arbitrary URLs for legitimate reasons?
Use a separate service in a sandboxed environment (like AWS Lambda) that has no access to your internal network. Pass the results back through a secure API.
### How do I test SSRF protection in CI/CD pipelines?
Include the test payloads in your automated test suite. Use tools like SSRFmap for comprehensive testing, and monitor your application logs for blocked requests during testing.
### Should I use the requests library or httpx for Python applications?
Both can be made secure with proper validation. httpx has better async support for modern applications, but requests is more mature. The validation layer is more important than the HTTP library choice.
### What’s the performance impact of DNS resolution for every request?
Minimal if you cache DNS results appropriately. Consider using a DNS cache with a TTL of 5-10 minutes to balance security and performance.
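A minimal TTL cache for validation verdicts might look like this (illustrative and not thread-safe):

```python
import time

class TTLCache:
    """Tiny expiring cache for (is_safe, reason) verdicts keyed by URL."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # Expired: drop it and force re-validation
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)

verdicts = TTLCache(ttl_seconds=300)
verdicts.set("https://api.github.com/", (True, "URL is safe"))
```

Check the cache before calling `validate_url`; a shorter TTL means fresher DNS, a longer one means fewer lookups.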
## The Bottom Line
SSRF prevention in Python comes down to three principles:
- Never trust user input for URLs - Always validate before making requests
- Use allowlists, not denylists - Explicitly define what’s allowed rather than trying to block everything dangerous
- Defense in depth - Combine application-level validation with network-level controls
The code examples in this guide give you production-ready SSRF protection for any Python web application. Implement the SSRFGuard library, adapt it for your framework, test thoroughly with attack payloads, and monitor for attempts in production.
SSRF is preventable if you validate every outbound request your application makes. No exceptions.
## Related Security Topics
### 📘 SSRF Deep Dive Series
- 7 Critical SSRF Attack Techniques - Learn how SSRF attacks work
- SSRF Prevention Guide - General defense strategies across languages
- Python SSRF Prevention - Python-specific guide (you are here)
- CVE-2026-27696 Analysis - Real-world SSRF bypass example
### 🎯 Python Security
- Complete Python Security Guide - Prevent SSRF, SQL injection, and XSS
- CSRF vs SSRF Comparison - Understand both vulnerability types
### 🛠️ Hands-On Practice
- Security Playground - Interactive SSRF demos and testing
- Python Requests Tutorial - Secure HTTP client usage patterns
### 📚 Web Security Fundamentals
- Content Security Policy Guide - Browser-based attack prevention
- Common Weakness Enumeration - CWE-918: Server-Side Request Forgery classification