I’ve been hunting SSRF bugs in Python applications for over five years, and the number of vulnerable codebases I encounter is honestly terrifying. Flask apps with requests.get(user_input) scattered everywhere. Django views that blindly fetch URLs from form data. FastAPI endpoints that turn your server into an attacker’s proxy.

The worst part? Most developers don’t realize they’re building SSRF vulnerabilities until it’s too late. By then, attackers have already stolen your AWS credentials or mapped your entire internal network.

This guide will teach you everything you need to know about preventing SSRF in Python applications, with practical examples for every major framework.

Understanding SSRF in Python Context

Server-Side Request Forgery happens when your Python application makes HTTP requests based on user-controlled input. The attacker manipulates these requests to access resources they shouldn’t be able to reach.

The Classic Python SSRF Vulnerability

Here’s the vulnerable pattern I see in almost every Python codebase:

import requests
from flask import Flask, request

app = Flask(__name__)

@app.route('/fetch-data', methods=['POST'])
def fetch_data():
    url = request.json.get('url')
    
    # This single line creates an SSRF vulnerability
    response = requests.get(url)
    
    return {
        'data': response.text,
        'status': response.status_code
    }

# Attack payload:
# POST /fetch-data
# {"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/"}
# Result: Attacker gets your AWS credentials

The problem isn’t just external attacks. Internal attackers can use SSRF to access admin panels, databases, and other services that trust your application server.
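
To make the payload above concrete, here is a minimal sketch that uses only the standard library's ipaddress module to show how the metadata address differs from an ordinary public one. The helper name describe_ip is purely illustrative and not part of any library.

```python
import ipaddress

def describe_ip(literal: str) -> str:
    """Classify an IP literal by the kind of network it belongs to."""
    ip = ipaddress.ip_address(literal)
    if ip.is_loopback:
        return "loopback"
    if ip.is_link_local:
        return "link-local"
    if ip.is_private:
        return "private"
    if ip.is_global:
        return "global"
    return "other"

# The metadata address is link-local: reachable from the server, not from the internet
for target in ("169.254.169.254", "127.0.0.1", "10.0.0.5", "8.8.8.8"):
    print(f"{target:>16} -> {describe_ip(target)}")
```

Anything other than "global" is an address that only your server can reach, which is exactly what makes it interesting to an attacker.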

Framework-Agnostic SSRF Protection Library

Let me show you the production-ready SSRF protection library I use across all my Python projects:

import ipaddress
import socket
import urllib.parse
from typing import Set, Optional, Tuple
import re

class SSRFGuard:
    """Production-ready SSRF protection for Python applications"""
    
    def __init__(self):
        # Dangerous IP ranges that attackers target
        self.blocked_networks = [
            # IPv4 private/dangerous ranges
            ipaddress.ip_network("0.0.0.0/8"),        # "This" network
            ipaddress.ip_network("10.0.0.0/8"),       # Private Class A
            ipaddress.ip_network("100.64.0.0/10"),    # Shared address space (CGNAT; Alibaba Cloud metadata)
            ipaddress.ip_network("127.0.0.0/8"),      # Loopback
            ipaddress.ip_network("169.254.0.0/16"),   # Link-local (AWS metadata!)
            ipaddress.ip_network("172.16.0.0/12"),    # Private Class B
            ipaddress.ip_network("192.0.0.0/24"),     # IETF Protocol Assignments
            ipaddress.ip_network("192.0.2.0/24"),     # Test-Net-1
            ipaddress.ip_network("192.88.99.0/24"),   # 6to4 Relay Anycast
            ipaddress.ip_network("192.168.0.0/16"),   # Private Class C
            ipaddress.ip_network("198.18.0.0/15"),    # Network Interconnect Device Benchmark
            ipaddress.ip_network("198.51.100.0/24"),  # Test-Net-2
            ipaddress.ip_network("203.0.113.0/24"),   # Test-Net-3
            ipaddress.ip_network("224.0.0.0/4"),      # Multicast
            ipaddress.ip_network("240.0.0.0/4"),      # Reserved for future use
            # IPv6 equivalents
            ipaddress.ip_network("::1/128"),          # IPv6 loopback
            ipaddress.ip_network("fc00::/7"),         # IPv6 unique local
            ipaddress.ip_network("fe80::/10"),        # IPv6 link-local
        ]
        
        # Only allow safe schemes
        self.allowed_schemes = {'http', 'https'}
        
        # Common dangerous ports to block
        self.dangerous_ports = {
            22,    # SSH
            23,    # Telnet
            25,    # SMTP
            53,    # DNS
            110,   # POP3
            143,   # IMAP
            993,   # IMAPS
            995,   # POP3S
            1433,  # MSSQL
            3306,  # MySQL
            5432,  # PostgreSQL
            6379,  # Redis
            9200,  # Elasticsearch
            27017, # MongoDB
        }
    
    def validate_url(self, url: str, allowed_hosts: Optional[Set[str]] = None, 
                     allowed_ports: Optional[Set[int]] = None) -> Tuple[bool, str]:
        """
        Validate URL for SSRF safety
        
        Args:
            url: URL to validate
            allowed_hosts: Set of explicitly allowed hostnames (allowlist approach)
            allowed_ports: Set of allowed ports (if None, uses default safe ports)
            
        Returns:
            Tuple of (is_safe: bool, error_message: str)
        """
        try:
            parsed = urllib.parse.urlparse(url)
            # .port raises ValueError for non-numeric or out-of-range ports
            explicit_port = parsed.port
        except Exception as e:
            return False, f"Invalid URL format: {e}"
        
        # Validate scheme
        scheme = parsed.scheme.lower()
        if scheme not in self.allowed_schemes:
            return False, f"Scheme '{scheme}' not allowed. Only http/https permitted."
        
        # Get hostname and port
        hostname = parsed.hostname
        if not hostname:
            return False, "Missing hostname"
        
        port = explicit_port or (443 if scheme == 'https' else 80)
        
        # Check port if restrictions are set
        if allowed_ports and port not in allowed_ports:
            return False, f"Port {port} not in allowed ports"
        
        # Check against dangerous ports (unless explicitly allowed)
        if not allowed_ports and port in self.dangerous_ports:
            return False, f"Port {port} is potentially dangerous"
        
        # Allowlist check (most secure approach)
        if allowed_hosts:
            if hostname not in allowed_hosts:
                return False, f"Host '{hostname}' not in allowlist"
        else:
            # If no allowlist, at least block obvious bad hostnames
            if self._is_dangerous_hostname(hostname):
                return False, f"Hostname '{hostname}' appears dangerous"
        
        # Resolve hostname and validate IPs
        try:
            addr_info = socket.getaddrinfo(hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except socket.gaierror as e:
            return False, f"Cannot resolve hostname '{hostname}': {e}"
        
        # Check each resolved IP
        for family, type_, proto, canonname, sockaddr in addr_info:
            ip_str = sockaddr[0]
            
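            try:
                ip = ipaddress.ip_address(ip_str)
            except ValueError:
                return False, f"Invalid IP address: {ip_str}"
            
            # Unwrap IPv4-mapped IPv6 (e.g. ::ffff:127.0.0.1) so the IPv4 ranges apply
            if ip.version == 6 and ip.ipv4_mapped is not None:
                ip = ip.ipv4_mapped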
            
            # Check against blocked networks
            for blocked_network in self.blocked_networks:
                if ip in blocked_network:
                    return False, f"IP {ip} is in blocked network {blocked_network}"
            
            # Special check for cloud metadata service
            if str(ip) == "169.254.169.254":
                return False, "Access to cloud metadata service blocked"
        
        return True, "URL is safe"
    
    def _is_dangerous_hostname(self, hostname: str) -> bool:
        """Check for obviously dangerous hostnames"""
        dangerous_patterns = [
            r'^localhost$',
            r'^.*\.local$',
            r'^.*\.internal$', 
            r'^.*\.corp$',
            r'^.*\.lan$',
            r'^\d+\.\d+\.\d+\.\d+$',  # Raw IP addresses
        ]
        
        hostname_lower = hostname.lower()
        for pattern in dangerous_patterns:
            if re.match(pattern, hostname_lower):
                return True
        return False

# Global instance
ssrf_guard = SSRFGuard()
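
A hand-maintained list of blocked networks can miss ranges. As a complement, here is a minimal default-deny sketch built on the standard library's own classification. The function name is_public_ip is an assumption for illustration; it is not part of SSRFGuard.

```python
import ipaddress

def is_public_ip(literal: str) -> bool:
    """Return True only for globally routable, non-multicast addresses."""
    try:
        ip = ipaddress.ip_address(literal)
    except ValueError:
        return False  # Not an IP literal at all: fail closed
    # Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so the IPv4 rules decide
    if ip.version == 6 and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    # is_global excludes private, loopback, link-local, reserved and shared
    # address space; multicast needs its own check
    return ip.is_global and not ip.is_multicast

for candidate in ("8.8.8.8", "100.100.100.200", "::ffff:127.0.0.1", "224.0.0.1"):
    print(candidate, is_public_ip(candidate))
```

One way to use it is as a second opinion next to the blocked_networks loop: reject an address if either check fails.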

Flask: SSRF-Safe Request Handling

Here’s how to properly implement SSRF protection in Flask applications:

from flask import Flask, request, jsonify
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

app = Flask(__name__)

def create_safe_session():
    """Create a requests session with SSRF protection"""
    session = requests.Session()
    
    # Configure retry strategy
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    # Set safe defaults
    session.headers.update({
        'User-Agent': 'MyApp/1.0 (Security Scanner)',
        'Accept': 'application/json, text/plain, */*',
    })
    
    return session

@app.route('/api/proxy', methods=['POST'])
def secure_proxy():
    """Secure proxy endpoint with SSRF protection"""
    data = request.get_json()
    if not data or 'url' not in data:
        return jsonify({'error': 'URL required'}), 400
    
    url = data['url']
    
    # Define allowed destinations for this endpoint
    allowed_hosts = {
        'api.github.com',
        'api.twitter.com', 
        'httpbin.org',
        'jsonplaceholder.typicode.com'
    }
    
    # Validate URL
    is_safe, error = ssrf_guard.validate_url(url, allowed_hosts=allowed_hosts)
    if not is_safe:
        app.logger.warning(f"SSRF attempt blocked: {url} - {error}")
        return jsonify({'error': f'Invalid URL: {error}'}), 400
    
    try:
        session = create_safe_session()
        response = session.get(
            url,
            timeout=10,
            allow_redirects=False,  # Prevent redirect-based bypasses
            stream=True  # Don't load huge responses into memory
        )
        
        # Validate content type
        content_type = response.headers.get('content-type', '').lower()
        allowed_content_types = ['application/json', 'text/plain', 'text/html']
        
        if not any(ct in content_type for ct in allowed_content_types):
            return jsonify({'error': 'Unsupported content type'}), 400
        
        # Limit response size (10MB max)
        content_length = response.headers.get('content-length')
        if content_length and int(content_length) > 10 * 1024 * 1024:
            return jsonify({'error': 'Response too large'}), 413
        
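        # Read at most 1MB from the stream; response.text would buffer the whole body
        max_bytes = 1024 * 1024
        body = b''
        for chunk in response.iter_content(chunk_size=8192):
            body += chunk
            if len(body) >= max_bytes:
                break
        response.close()
        content = body[:max_bytes].decode(response.encoding or 'utf-8', errors='replace')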
        
        return jsonify({
            'data': content,
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'url': response.url
        })
        
    except requests.Timeout:
        return jsonify({'error': 'Request timeout'}), 408
    except requests.RequestException as e:
        app.logger.error(f"Request failed: {e}")
        return jsonify({'error': 'Request failed'}), 502

@app.route('/api/fetch-image', methods=['POST'])  
def fetch_image():
    """Fetch and validate image from URL"""
    data = request.get_json()
    if not data or 'url' not in data:
        return jsonify({'error': 'URL required'}), 400
    
    url = data['url']
    
    # More restrictive validation for image fetching
    allowed_hosts = {'images.unsplash.com', 'picsum.photos'}
    allowed_ports = {80, 443}  # Only HTTP/HTTPS default ports
    
    is_safe, error = ssrf_guard.validate_url(
        url, 
        allowed_hosts=allowed_hosts,
        allowed_ports=allowed_ports
    )
    
    if not is_safe:
        return jsonify({'error': f'Invalid image URL: {error}'}), 400
    
    try:
        session = create_safe_session()
        response = session.head(url, timeout=5)  # Just check headers first
        
        content_type = response.headers.get('content-type', '').lower()
        if not content_type.startswith('image/'):
            return jsonify({'error': 'URL does not point to an image'}), 400
        
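        # Now fetch the actual image; session.get follows redirects by default,
        # so turn that off to keep the request on the validated host
        response = session.get(url, timeout=15, stream=True, allow_redirects=False)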
        
        return jsonify({
            'image_url': url,
            'content_type': content_type,
            'size': response.headers.get('content-length'),
            'status': 'success'
        })
        
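    except requests.RequestException as e:
        # Log the details server-side; don't echo them back to the caller
        app.logger.error(f"Image fetch failed: {e}")
        return jsonify({'error': 'Failed to fetch image'}), 502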
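
Disabling redirects is the simplest option, but some integrations need them. The sketch below shows one way to follow redirects while validating every hop. It is deliberately library-agnostic: fetch_once and is_allowed are hypothetical callables you would supply, not functions from requests or Flask.

```python
from typing import Callable, Optional, Tuple
from urllib.parse import urljoin

REDIRECT_CODES = {301, 302, 303, 307, 308}

def follow_validated_redirects(
    url: str,
    fetch_once: Callable[[str], Tuple[int, Optional[str]]],
    is_allowed: Callable[[str], bool],
    max_hops: int = 5,
) -> Tuple[str, int]:
    """Follow redirects one hop at a time, validating each URL before fetching it.

    fetch_once(url) must issue a single request with redirects disabled and
    return (status_code, location_header_or_None).
    """
    for _ in range(max_hops + 1):
        if not is_allowed(url):
            raise ValueError(f"Blocked URL in redirect chain: {url}")
        status, location = fetch_once(url)
        if status in REDIRECT_CODES and location:
            url = urljoin(url, location)  # Location may be relative
            continue
        return url, status
    raise ValueError("Too many redirects")
```

With requests, fetch_once could wrap session.get(u, allow_redirects=False) and return (r.status_code, r.headers.get('location')), while is_allowed could return the first element of ssrf_guard.validate_url(u).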

Django: SSRF Protection in Views

Django applications need similar protection, integrated with Django’s request handling:

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils.decorators import method_decorator
from django.views.generic import View
import json
import requests
import logging

logger = logging.getLogger(__name__)

class SSRFProtectedMixin:
    """Mixin to add SSRF protection to Django views"""
    
    def validate_and_fetch(self, url, allowed_hosts=None, **kwargs):
        """Validate URL and fetch safely"""
        is_safe, error = ssrf_guard.validate_url(url, allowed_hosts)
        if not is_safe:
            logger.warning(f"SSRF attempt in {self.__class__.__name__}: {url} - {error}")
            raise ValueError(error)
        
        return requests.get(
            url,
            timeout=kwargs.get('timeout', 10),
            allow_redirects=False,
            headers={'User-Agent': 'Django App/1.0'}
        )

@method_decorator(csrf_exempt, name='dispatch')
class WebhookProxyView(SSRFProtectedMixin, View):
    """Secure webhook proxy that forwards requests to allowed services"""
    
    def post(self, request):
        try:
            data = json.loads(request.body)
            webhook_url = data.get('webhook_url')
            payload = data.get('payload', {})
            
            if not webhook_url:
                return JsonResponse({'error': 'webhook_url required'}, status=400)
            
            # Define allowed webhook destinations
            allowed_webhook_hosts = {
                'hooks.slack.com',
                'discord.com',
                'api.github.com'
            }
            
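            # Validate only. validate_and_fetch() would send an extra GET to the webhook URL.
            is_safe, error = ssrf_guard.validate_url(webhook_url, allowed_webhook_hosts)
            if not is_safe:
                logger.warning(f"SSRF attempt in WebhookProxyView: {webhook_url} - {error}")
                raise ValueError(error)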
            
            # Forward the webhook
            webhook_response = requests.post(
                webhook_url,
                json=payload,
                timeout=10,
                allow_redirects=False
            )
            
            return JsonResponse({
                'status': 'success',
                'webhook_status': webhook_response.status_code
            })
            
        except json.JSONDecodeError:
            # Must come first: JSONDecodeError is a subclass of ValueError
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except ValueError as e:
            return JsonResponse({'error': str(e)}, status=400)
        except requests.RequestException as e:
            logger.error(f"Webhook request failed: {e}")
            return JsonResponse({'error': 'Webhook delivery failed'}, status=502)

@require_http_methods(["POST"])
@csrf_exempt
def url_metadata_extractor(request):
    """Extract metadata from URLs safely"""
    try:
        data = json.loads(request.body)
        url = data.get('url')
        
        if not url:
            return JsonResponse({'error': 'URL required'}, status=400)
        
        # Only allow HTTPS for metadata extraction
        if not url.lower().startswith('https://'):
            return JsonResponse({'error': 'Only HTTPS URLs allowed'}, status=400)
        
        # Any HTTPS host is accepted here, so validate the target before requesting it
        is_safe, error = ssrf_guard.validate_url(url)
        if not is_safe:
            return JsonResponse({'error': error}, status=400)
        
        # Do not follow redirects: a 3xx could point at an address that was never validated
        response = requests.head(url, timeout=5, allow_redirects=False)
        
        metadata = {
            'url': response.url,
            'redirect_location': response.headers.get('location'),  # Reported, not followed
            'status_code': response.status_code,
            'content_type': response.headers.get('content-type'),
            'content_length': response.headers.get('content-length'),
            'last_modified': response.headers.get('last-modified'),
            'server': response.headers.get('server', '').split('/')[0]  # Don't leak versions
        }
        
        return JsonResponse(metadata)
        
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except requests.RequestException:
        return JsonResponse({'error': 'Failed to fetch URL metadata'}, status=502)
    except Exception as e:
        logger.error(f"Unexpected error in url_metadata_extractor: {e}")
        return JsonResponse({'error': 'Internal server error'}, status=500)

FastAPI: Modern Async SSRF Protection

FastAPI applications can use async HTTP clients, but need the same SSRF protection:

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, HttpUrl, validator
import httpx
import asyncio
from typing import Optional, Set

app = FastAPI(title="SSRF-Protected API")

class URLRequest(BaseModel):
    url: HttpUrl
    timeout: Optional[int] = 10
    
    # Pydantic v1-style validator; Pydantic v2 prefers field_validator
    @validator('timeout')
    def validate_timeout(cls, v):
        if v < 1 or v > 30:
            raise ValueError('Timeout must be between 1 and 30 seconds')
        return v

class SSRFProtectedClient:
    """Async HTTP client with SSRF protection"""
    
    def __init__(self):
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0),
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
            headers={'User-Agent': 'FastAPI App/1.0'}
        )
    
    async def safe_get(self, url: str, allowed_hosts: Optional[Set[str]] = None, **kwargs):
        """Make safe GET request with SSRF protection"""
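        # validate_url does blocking DNS lookups, so run it off the event loop (Python 3.9+)
        is_safe, error = await asyncio.to_thread(ssrf_guard.validate_url, url, allowed_hosts)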
        if not is_safe:
            raise HTTPException(status_code=400, detail=f"SSRF protection: {error}")
        
        try:
            response = await self.client.get(
                url,
                follow_redirects=False,  # Prevent redirect bypasses
                **kwargs
            )
            return response
        except httpx.RequestError as e:
            raise HTTPException(status_code=502, detail=f"Request failed: {e}")
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()

async def get_protected_client():
    """Dependency to provide SSRF-protected HTTP client"""
    async with SSRFProtectedClient() as client:
        yield client

@app.post("/api/fetch-json")
async def fetch_json_data(
    request: URLRequest,
    client: SSRFProtectedClient = Depends(get_protected_client)
):
    """Fetch JSON data from allowed external APIs"""
    allowed_apis = {
        'jsonplaceholder.typicode.com',
        'api.github.com',
        'httpbin.org'
    }
    
    response = await client.safe_get(
        str(request.url),
        allowed_hosts=allowed_apis,
        timeout=request.timeout
    )
    
    # Validate response is JSON
    content_type = response.headers.get('content-type', '')
    if 'application/json' not in content_type:
        raise HTTPException(status_code=400, detail="Response is not JSON")
    
    try:
        data = response.json()
    except ValueError:
        raise HTTPException(status_code=502, detail="Invalid JSON response")
    
    return {
        'data': data,
        'status_code': response.status_code,
        'url': str(response.url),
        'content_type': content_type
    }

@app.post("/api/check-website")
async def check_website_status(
    request: URLRequest,
    client: SSRFProtectedClient = Depends(get_protected_client)
):
    """Check if a website is accessible (issues a GET and reports status and headers)"""
    
    # More permissive - allow any valid external domain
    response = await client.safe_get(str(request.url))
    
    return {
        'url': str(response.url),
        'status_code': response.status_code,
        'is_accessible': response.status_code < 400,
        'response_time_ms': response.elapsed.total_seconds() * 1000,
        'headers': {
            'content-type': response.headers.get('content-type'),
            'server': response.headers.get('server', '').split('/')[0],
            'content-length': response.headers.get('content-length')
        }
    }

Cloud Environment Protection

SSRF in cloud environments is particularly dangerous because of metadata services:

import socket
import urllib.parse

import boto3
from botocore.exceptions import ClientError

class CloudSSRFProtection:
    """Enhanced SSRF protection for cloud environments"""
    
    def __init__(self):
        self.metadata_endpoints = {
            '169.254.169.254',  # AWS, Azure, GCP (IPv4)
            'fd00:ec2::254',  # AWS (IPv6)
            'metadata.google.internal',  # GCP 
            '100.100.100.200',  # Alibaba Cloud
        }
        
    def is_cloud_metadata_request(self, url: str) -> bool:
        """Check if URL targets cloud metadata service"""
        parsed = urllib.parse.urlparse(url)
        hostname = parsed.hostname
        
        if not hostname:
            return False
            
        # Check direct IP access
        if hostname in self.metadata_endpoints:
            return True
            
        # Check DNS resolution to metadata IPs
        try:
            addr_info = socket.getaddrinfo(hostname, None)
            for family, type_, proto, canonname, sockaddr in addr_info:
                if sockaddr[0] in self.metadata_endpoints:
                    return True
        except socket.gaierror:
            pass
            
        return False

def setup_aws_imds_protection():
    """Configure AWS Instance Metadata Service v2 (IMDSv2) for SSRF protection"""
    try:
        ec2_client = boto3.client('ec2')
        
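        # Get current instance ID via an IMDSv2 session token
        # (a tokenless GET fails once HttpTokens is 'required')
        import requests
        token = requests.put(
            'http://169.254.169.254/latest/api/token',
            headers={'X-aws-ec2-metadata-token-ttl-seconds': '60'},
            timeout=2
        ).text
        instance_id = requests.get(
            'http://169.254.169.254/latest/meta-data/instance-id',
            headers={'X-aws-ec2-metadata-token': token},
            timeout=2
        ).text
        
        # Enable IMDSv2 (session tokens required; defeats SSRF that can only send plain GETs)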
        ec2_client.modify_instance_metadata_options(
            InstanceId=instance_id,
            HttpTokens='required',  # Require tokens
            HttpPutResponseHopLimit=1,  # Prevent forwarding
            HttpEndpoint='enabled'
        )
        
        print("IMDSv2 enabled for SSRF protection")
        
    except Exception as e:
        print(f"Failed to configure IMDSv2: {e}")

# Network-level protection
def generate_iptables_rules():
    """Generate iptables rules for SSRF protection"""
    rules = [
        # Block metadata services
        "iptables -A OUTPUT -d 169.254.169.254 -j REJECT --reject-with icmp-port-unreachable",
        "iptables -A OUTPUT -d 100.100.100.200 -j REJECT --reject-with icmp-port-unreachable",
        
        # Block SSH to private networks (drop "-p tcp --dport 22" to block all traffic; adjust for your environment)
        "iptables -A OUTPUT -d 10.0.0.0/8 -p tcp --dport 22 -j REJECT",
        "iptables -A OUTPUT -d 172.16.0.0/12 -p tcp --dport 22 -j REJECT",
        "iptables -A OUTPUT -d 192.168.0.0/16 -p tcp --dport 22 -j REJECT",
        
        # Allow specific external services
        # (iptables resolves hostnames once, when the rule is added, so refresh these if the IPs change)
        "iptables -I OUTPUT -d api.github.com -j ACCEPT",
        "iptables -I OUTPUT -d httpbin.org -j ACCEPT",
    ]
    
    return rules

Testing Your SSRF Protection

Comprehensive testing is crucial to verify your SSRF defenses work:

import socket

import pytest
import responses
from unittest.mock import patch, MagicMock

class TestSSRFProtection:
    """Comprehensive SSRF protection test suite"""
    
    def test_blocks_localhost(self):
        """Test that localhost access is blocked"""
        dangerous_urls = [
            "http://127.0.0.1:22",
            "http://localhost:3306",
            "http://[::1]:80",
            "http://0.0.0.0:8080"
        ]
        
        for url in dangerous_urls:
            is_safe, error = ssrf_guard.validate_url(url)
            assert not is_safe, f"URL should be blocked: {url}"
            # Rejection messages vary (port, hostname, or IP range), so only check one exists
            assert error
    
    def test_blocks_private_networks(self):
        """Test that private network access is blocked"""
        private_urls = [
            "http://192.168.1.1/admin",
            "http://10.0.0.5:8080", 
            "http://172.16.0.1/config"
        ]
        
        for url in private_urls:
            is_safe, error = ssrf_guard.validate_url(url)
            assert not is_safe, f"Private URL should be blocked: {url}"
    
    def test_blocks_cloud_metadata(self):
        """Test that cloud metadata access is blocked"""
        metadata_urls = [
            "http://169.254.169.254/latest/meta-data/",
            "http://metadata.google.internal/computeMetadata/",
            "http://100.100.100.200/latest/meta-data/"
        ]
        
        for url in metadata_urls:
            is_safe, error = ssrf_guard.validate_url(url)
            assert not is_safe, f"Metadata URL should be blocked: {url}"
    
    def test_allows_allowlisted_hosts(self):
        """Test that allowlisted hosts are permitted"""
        allowed_hosts = {'httpbin.org', 'api.github.com'}
        safe_urls = [
            "https://httpbin.org/json",
            "https://api.github.com/users/octocat"
        ]
        
        for url in safe_urls:
            is_safe, error = ssrf_guard.validate_url(url, allowed_hosts)
            assert is_safe, f"Allowlisted URL should be allowed: {url}"
    
    def test_dns_rebinding_protection(self):
        """Show that validation tracks DNS answers, which is what rebinding exploits"""
        with patch('socket.getaddrinfo') as mock_getaddrinfo:
            # getaddrinfo returns a list of 5-tuples; first a public IP, then loopback
            mock_getaddrinfo.side_effect = [
                [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 80))],
                [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.0.0.1', 80))],
            ]
            
            # Passes while the name resolves to a public address
            is_safe, error = ssrf_guard.validate_url("http://evil.com/")
            assert is_safe
            
            # Fails once the same name resolves to loopback
            is_safe, error = ssrf_guard.validate_url("http://evil.com/")
            assert not is_safe
            # The HTTP client does its own lookup after validation, so closing
            # the rebinding window requires connecting to the validated IP
            
    @responses.activate
    def test_flask_endpoint_blocks_ssrf(self):
        """Test Flask endpoint properly blocks SSRF attempts"""
        from your_app import app  # Import your Flask app
        
        client = app.test_client()
        
        # Test blocked request
        response = client.post('/api/proxy', 
            json={'url': 'http://127.0.0.1:22'}
        )
        assert response.status_code == 400
        assert 'error' in response.get_json()
        
        # Test allowed request  
        responses.add(
            responses.GET,
            'https://httpbin.org/json',
            json={'test': 'data'},
            status=200
        )
        
        response = client.post('/api/proxy',
            json={'url': 'https://httpbin.org/json'}
        )
        assert response.status_code == 200
    
    def test_production_payloads(self):
        """Test with real-world SSRF attack payloads"""
        attack_payloads = [
            # Basic localhost variants
            "http://127.0.0.1/",
            "http://localhost/",
            "http://0.0.0.0/",
            "http://[::1]/",
            
            # Decimal/octal IP encoding
            "http://2130706433/",  # 127.0.0.1 in decimal
            "http://0177.0.0.1/",  # Octal encoding
            "http://127.1/",       # Short form
            
            # Domain tricks
            "http://127.0.0.1.nip.io/",  # Resolves to 127.0.0.1
            "http://127.0.0.1.xip.io/",
            
            # Cloud metadata
            "http://169.254.169.254/latest/meta-data/",
            "http://[fd00:ec2::254]/latest/meta-data/",
            
            # Protocol variants
            "file:///etc/passwd",
            "gopher://127.0.0.1:80/",
            "dict://127.0.0.1:11211/",
            
            # Port variants
            "http://127.0.0.1:22",     # SSH
            "http://127.0.0.1:3306",   # MySQL
            "http://127.0.0.1:6379",   # Redis
        ]
        
        for payload in attack_payloads:
            is_safe, error = ssrf_guard.validate_url(payload)
            assert not is_safe, f"Attack payload should be blocked: {payload}"

if __name__ == "__main__":
    # Quick test runner
    test_suite = TestSSRFProtection()
    print("Testing SSRF protection...")
    
    test_methods = [method for method in dir(test_suite) if method.startswith('test_')]
    
    for test_method in test_methods:
        try:
            getattr(test_suite, test_method)()
            print(f"✓ {test_method}")
        except AssertionError as e:
            print(f"✗ {test_method}: {e}")
        except Exception as e:
            print(f"⚠ {test_method}: Error - {e}")
    
    print("SSRF protection testing complete!")
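
The decimal, octal and short-form payloads in the list above are all alternative spellings of 127.0.0.1. Here is a minimal sketch of decoding them without a DNS lookup, using socket.inet_aton from the standard library. The helper name decode_ipv4_spelling is hypothetical, and the accepted spellings depend on the platform's C library (glibc on Linux accepts all of the ones shown).

```python
import ipaddress
import socket
from typing import Optional

def decode_ipv4_spelling(host: str) -> Optional[ipaddress.IPv4Address]:
    """Decode decimal, octal, hex, or short-form IPv4 spellings to a canonical address.

    Returns None when the host is not an IPv4 spelling (for example a DNS name).
    """
    try:
        packed = socket.inet_aton(host)
    except (OSError, ValueError):
        return None
    return ipaddress.IPv4Address(packed)

for spelling in ("2130706433", "0177.0.0.1", "127.1", "example.com"):
    print(f"{spelling:>12} -> {decode_ipv4_spelling(spelling)}")
```

Running a hostname through a decoder like this before the regex check means obfuscated literals are judged as the address they really are, rather than relying on the resolver to normalise them later.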

Production Deployment Checklist

Before deploying your SSRF-protected Python application:

Application Level

  • All outbound HTTP requests use the SSRFGuard validation
  • Allowlists are defined for all external API integrations
  • Request timeouts are configured (max 30 seconds)
  • Response size limits are enforced (max 10MB)
  • Redirects are disabled or carefully validated
  • Error messages don’t leak internal network information
  • SSRF attempts are logged for security monitoring

Infrastructure Level

  • Cloud metadata services are protected (IMDSv2 enabled)
  • Firewall rules block internal network access from web tier
  • DNS resolution is controlled (no wildcard internal domains)
  • Network segmentation isolates sensitive services
  • VPC security groups restrict outbound connections
  • Load balancer logs capture outbound request attempts

Monitoring & Response

  • SSRF attempts trigger security alerts
  • Failed requests are correlated with user accounts
  • Network traffic is monitored for unusual patterns
  • Incident response plan includes SSRF scenarios
  • Regular penetration testing validates defenses

Frequently Asked Questions

How do I handle webhooks without creating SSRF vulnerabilities?

Use strict allowlists for webhook destinations. For services like Slack or Discord, only allow their official webhook domains. Validate the webhook URL before storing it, not just before using it.

Can attackers bypass DNS-based validation?

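Yes, through DNS rebinding attacks. The attacker controls a domain that initially resolves to a safe IP, then changes to target internal services. Validation and the actual request use separate DNS lookups, so the answer can change in between. To close that gap, resolve the hostname once, validate the resulting IPs, and connect to a validated IP instead of letting the HTTP client resolve the name again.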
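
As a rough illustration of pinning the validated address, the sketch below resolves a hostname once and returns only addresses that pass a public-IP check. The function name resolve_and_validate and the injectable resolver parameter are assumptions made for this example, so it can be exercised without real DNS.

```python
import ipaddress
import socket
from typing import Callable, List

def resolve_and_validate(hostname: str, port: int,
                         resolver: Callable = socket.getaddrinfo) -> List[str]:
    """Resolve once and return the IPs, raising if any of them is not public."""
    infos = resolver(hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    ips: List[str] = []
    for _family, _type, _proto, _canonname, sockaddr in infos:
        ip = ipaddress.ip_address(sockaddr[0])
        # Treat ::ffff:a.b.c.d as the IPv4 address it wraps
        if ip.version == 6 and ip.ipv4_mapped is not None:
            ip = ip.ipv4_mapped
        if not ip.is_global or ip.is_multicast:
            raise ValueError(f"{hostname} resolves to non-public address {ip}")
        ips.append(str(ip))
    if not ips:
        raise ValueError(f"{hostname} did not resolve to any address")
    return ips
```

The caller would then open the connection to one of the returned IPs, for example with socket.create_connection((ips[0], port)), while still sending the original hostname in the Host header and as the TLS server_hostname. Because no second lookup happens, a rebinding answer never gets used.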

What if I need to fetch arbitrary URLs for legitimate reasons?

Use a separate service in a sandboxed environment (like AWS Lambda) that has no access to your internal network. Pass the results back through a secure API.

How do I test SSRF protection in CI/CD pipelines?

Include the test payloads in your automated test suite. Use tools like SSRFmap for comprehensive testing, and monitor your application logs for blocked requests during testing.

Should I use the requests library or httpx for Python applications?

Both can be made secure with proper validation. httpx has better async support for modern applications, but requests is more mature. The validation layer is more important than the HTTP library choice.

What’s the performance impact of DNS resolution for every request?

Minimal if you cache DNS results appropriately. Consider using a DNS cache with a TTL of 5-10 minutes to balance security and performance.
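
A cache like that can be very small. The following is a sketch only: TTLDNSCache is a hypothetical class, the resolve callable stands in for whatever lookup-and-validate function you use, and the clock is injectable so the expiry logic can be tested without waiting.

```python
import time
from typing import Callable, Dict, List, Tuple

class TTLDNSCache:
    """Cache resolver answers for a fixed number of seconds."""

    def __init__(self, resolve: Callable[[str], List[str]],
                 ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._resolve = resolve
        self._ttl = ttl_seconds
        self._clock = clock
        # hostname -> (expiry time, cached IPs)
        self._entries: Dict[str, Tuple[float, List[str]]] = {}

    def lookup(self, hostname: str) -> List[str]:
        now = self._clock()
        entry = self._entries.get(hostname)
        if entry is not None and entry[0] > now:
            return entry[1]  # Still fresh: no new lookup
        ips = self._resolve(hostname)
        self._entries[hostname] = (now + self._ttl, ips)
        return ips
```

Caching only helps security if the connection is made to the cached, validated IP. If the HTTP client resolves the name again on its own, the cache saves time but does not stop rebinding.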

The Bottom Line

SSRF prevention in Python comes down to three principles:

  1. Never trust user input for URLs - Always validate before making requests
  2. Use allowlists, not denylists - Explicitly define what’s allowed rather than trying to block everything dangerous
  3. Defense in depth - Combine application-level validation with network-level controls
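
The second principle fits in a few lines. This is a minimal sketch assuming exact hostname matching; is_allowlisted is an illustrative helper, separate from SSRFGuard, and it covers only the allowlist step (IP validation still applies afterwards).

```python
from typing import Set
from urllib.parse import urlsplit

def is_allowlisted(url: str, allowed_hosts: Set[str]) -> bool:
    """Accept only http(s) URLs whose parsed hostname exactly matches an allowed host."""
    try:
        parts = urlsplit(url)
    except ValueError:
        return False
    if parts.scheme not in {"http", "https"}:
        return False
    # .hostname strips userinfo and port and lowercases the host
    return parts.hostname is not None and parts.hostname in allowed_hosts

allowed = {"api.github.com"}
for candidate in (
    "https://api.github.com/users/octocat",
    "https://api.github.com@evil.example/",   # userinfo trick
    "https://api.github.com.evil.example/",   # suffix trick
):
    print(candidate, is_allowlisted(candidate, allowed))
```

Comparing the parsed hostname, rather than searching the raw URL string for an allowed name, is what defeats the userinfo and suffix tricks.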

The code examples in this guide give you production-ready SSRF protection for any Python web application. Implement the SSRFGuard library, adapt it for your framework, test thoroughly with attack payloads, and monitor for attempts in production.

SSRF is 100% preventable if you validate every outbound request your application makes. No exceptions.

📘 SSRF Deep Dive Series:

  1. 7 Critical SSRF Attack Techniques - Learn how SSRF attacks work
  2. SSRF Prevention Guide - General defense strategies across languages
  3. Python SSRF Prevention - Python-specific guide (you are here)
  4. CVE-2026-27696 Analysis - Real-world SSRF bypass example
