Why Rate Limiting Matters in Web Scraping
Rate limiting is fundamental to ethical and sustainable web scraping. It protects websites from overload, maintains good relationships with site owners, and helps avoid IP bans and legal issues. Professional scrapers understand that respectful data collection leads to long-term success.
This guide covers comprehensive rate limiting strategies, from basic delays to sophisticated adaptive throttling systems that automatically adjust to website conditions.
Understanding Rate Limiting Principles
What is Rate Limiting?
Rate limiting controls the frequency of requests sent to a target website. It involves:
- Request Frequency: Number of requests per time period
- Concurrent Connections: Simultaneous connections to a domain (see the sketch after this list)
- Bandwidth Usage: Data transfer rate control
- Resource Respect: Consideration for server capacity
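The examples in this guide focus on request frequency; concurrency can be capped just as simply. Below is a minimal sketch using the standard library (the limit of three connections and the placeholder URLs are illustrative, not recommendations):
import threading
from concurrent.futures import ThreadPoolExecutor
import requests

# Allow at most 3 simultaneous connections to the target (illustrative value)
connection_slots = threading.Semaphore(3)

def fetch(url):
    """Fetch a URL while holding one of the limited connection slots."""
    with connection_slots:
        return requests.get(url, timeout=10)

urls = [f"https://example.com/page{i}" for i in range(10)]
with ThreadPoolExecutor(max_workers=10) as pool:
    responses = list(pool.map(fetch, urls))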
Why Rate Limiting is Essential
- Legal Compliance: Avoid violating terms of service
- Server Protection: Prevent overwhelming target systems
- IP Preservation: Avoid getting blocked or banned
- Data Quality: Ensure consistent, reliable data collection
- Ethical Standards: Maintain professional scraping practices
Basic Rate Limiting Implementation
Simple Delay Mechanisms
import time
import random
import requests
class BasicRateLimiter:
def __init__(self, delay_range=(1, 3)):
self.min_delay = delay_range[0]
self.max_delay = delay_range[1]
self.last_request_time = 0
def wait(self):
"""Implement random delay between requests"""
current_time = time.time()
elapsed = current_time - self.last_request_time
# Calculate required delay
delay = random.uniform(self.min_delay, self.max_delay)
if elapsed < delay:
sleep_time = delay - elapsed
print(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
time.sleep(sleep_time)
self.last_request_time = time.time()
def request(self, url, **kwargs):
"""Make rate-limited request"""
self.wait()
return requests.get(url, **kwargs)
# Usage example
limiter = BasicRateLimiter(delay_range=(2, 5))
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
for url in urls:
response = limiter.request(url)
print(f"Scraped {url}: {response.status_code}")
Domain-Specific Rate Limiting
from urllib.parse import urlparse
from collections import defaultdict
class DomainRateLimiter:
def __init__(self):
self.domain_delays = defaultdict(lambda: 1.0) # Default 1 second
self.last_request_times = defaultdict(float)
def set_domain_delay(self, domain, delay):
"""Set specific delay for a domain"""
self.domain_delays[domain] = delay
def wait_for_domain(self, url):
"""Wait appropriate time for specific domain"""
domain = urlparse(url).netloc
current_time = time.time()
last_request = self.last_request_times[domain]
required_delay = self.domain_delays[domain]
elapsed = current_time - last_request
if elapsed < required_delay:
sleep_time = required_delay - elapsed
time.sleep(sleep_time)
self.last_request_times[domain] = time.time()
def request(self, url, **kwargs):
"""Make domain-aware rate-limited request"""
self.wait_for_domain(url)
return requests.get(url, **kwargs)
# Usage with different domain settings
limiter = DomainRateLimiter()
limiter.set_domain_delay("api.example.com", 0.5) # Fast API
limiter.set_domain_delay("slow-site.com", 5.0) # Slow site
limiter.set_domain_delay("ecommerce.com", 2.0) # E-commerce site
# Requests will be automatically rate-limited per domain
response1 = limiter.request("https://api.example.com/data")
response2 = limiter.request("https://slow-site.com/page")
response3 = limiter.request("https://ecommerce.com/products")
Advanced Rate Limiting Strategies
Exponential Backoff
class ExponentialBackoffLimiter:
def __init__(self, base_delay=1.0, max_delay=60.0):
self.base_delay = base_delay
self.max_delay = max_delay
self.consecutive_errors = defaultdict(int)
self.domain_delays = defaultdict(lambda: base_delay)
def calculate_delay(self, domain, error_occurred=False):
"""Calculate delay using exponential backoff"""
if error_occurred:
self.consecutive_errors[domain] += 1
else:
self.consecutive_errors[domain] = 0
# Exponential backoff formula
error_count = self.consecutive_errors[domain]
delay = min(
self.base_delay * (2 ** error_count),
self.max_delay
)
self.domain_delays[domain] = delay
return delay
    def request_with_backoff(self, url, max_retries=3):
        """Make request with exponential backoff on errors"""
        domain = urlparse(url).netloc
        for attempt in range(max_retries + 1):
            # Wait the current delay for this domain; it grows after each failure
            time.sleep(self.domain_delays[domain])
            try:
                response = requests.get(url, timeout=10)
                if response.status_code == 429:  # Too Many Requests
                    raise requests.exceptions.RequestException("Rate limited")
                response.raise_for_status()
                # Success: reset the error count and delay for this domain
                self.calculate_delay(domain, error_occurred=False)
                return response
            except requests.exceptions.RequestException as e:
                print(f"Request failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries:
                    error_delay = self.calculate_delay(domain, error_occurred=True)
                    print(f"Backing off: next delay is {error_delay:.2f} seconds")
                else:
                    raise
# Usage
backoff_limiter = ExponentialBackoffLimiter()
response = backoff_limiter.request_with_backoff("https://api.example.com/data")
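The backoff above treats a 429 response like any other failure. Many servers also send a Retry-After header with 429 (and 503) responses stating how long to wait; when present, honouring it is usually better than a computed delay. A small helper, sketched under the assumption that the header carries a number of seconds (it can also be an HTTP date, which this sketch ignores):
def delay_from_retry_after(response, fallback):
    """Return the server-suggested wait in seconds, or the fallback."""
    retry_after = response.headers.get("Retry-After")
    if retry_after and retry_after.isdigit():
        return float(retry_after)
    return fallback

# Inside a retry loop, prefer the server's hint over the computed backoff:
# if response.status_code == 429:
#     time.sleep(delay_from_retry_after(response, error_delay))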
Adaptive Rate Limiting
class AdaptiveRateLimiter:
def __init__(self, initial_delay=1.0):
self.domain_stats = defaultdict(lambda: {
'delay': initial_delay,
'response_times': [],
'success_rate': 1.0,
'last_adjustment': time.time()
})
def record_response(self, domain, response_time, success):
"""Record response statistics"""
stats = self.domain_stats[domain]
# Keep only recent response times (last 10)
stats['response_times'].append(response_time)
if len(stats['response_times']) > 10:
stats['response_times'].pop(0)
# Update success rate (exponential moving average)
alpha = 0.1
stats['success_rate'] = (
alpha * (1 if success else 0) +
(1 - alpha) * stats['success_rate']
)
def adjust_delay(self, domain):
"""Dynamically adjust delay based on performance"""
stats = self.domain_stats[domain]
current_time = time.time()
# Only adjust every 30 seconds
if current_time - stats['last_adjustment'] < 30:
return stats['delay']
avg_response_time = (
sum(stats['response_times']) / len(stats['response_times'])
if stats['response_times'] else 1.0
)
# Adjustment logic
if stats['success_rate'] < 0.8: # Low success rate
stats['delay'] *= 1.5 # Increase delay
elif avg_response_time > 5.0: # Slow responses
stats['delay'] *= 1.2
elif stats['success_rate'] > 0.95 and avg_response_time < 2.0:
stats['delay'] *= 0.9 # Decrease delay for good performance
# Keep delay within reasonable bounds
stats['delay'] = max(0.5, min(stats['delay'], 30.0))
stats['last_adjustment'] = current_time
return stats['delay']
def request(self, url):
"""Make adaptive rate-limited request"""
domain = urlparse(url).netloc
delay = self.adjust_delay(domain)
time.sleep(delay)
start_time = time.time()
try:
response = requests.get(url, timeout=10)
response_time = time.time() - start_time
success = response.status_code == 200
self.record_response(domain, response_time, success)
return response
except Exception as e:
response_time = time.time() - start_time
self.record_response(domain, response_time, False)
raise
# Usage
adaptive_limiter = AdaptiveRateLimiter()
# The limiter will automatically adjust delays based on performance
for i in range(100):
try:
response = adaptive_limiter.request(f"https://api.example.com/data/{i}")
print(f"Request {i}: {response.status_code}")
except Exception as e:
print(f"Request {i} failed: {e}")
Distributed Rate Limiting
Redis-Based Rate Limiting
import redis
class DistributedRateLimiter:
def __init__(self, redis_url='redis://localhost:6379'):
self.redis_client = redis.from_url(redis_url)
self.default_window = 60 # 1 minute window
self.default_limit = 30 # 30 requests per minute
def is_allowed(self, domain, limit=None, window=None):
"""Check if request is allowed using sliding window"""
limit = limit or self.default_limit
window = window or self.default_window
current_time = time.time()
key = f"rate_limit:{domain}"
# Use Redis pipeline for atomic operations
pipe = self.redis_client.pipeline()
# Remove old entries outside the window
pipe.zremrangebyscore(key, 0, current_time - window)
# Count current requests in window
pipe.zcard(key)
# Add current request
pipe.zadd(key, {str(current_time): current_time})
# Set expiry for cleanup
pipe.expire(key, window)
        results = pipe.execute()
        current_requests = results[1]
        if current_requests >= limit:
            # Over the limit: drop the entry we just added so that repeated
            # checks while waiting do not inflate the window
            self.redis_client.zrem(key, str(current_time))
            return False
        return True
def wait_if_needed(self, domain, limit=None, window=None):
"""Wait until request is allowed"""
while not self.is_allowed(domain, limit, window):
print(f"Rate limit exceeded for {domain}, waiting...")
time.sleep(1)
def request(self, url, **kwargs):
"""Make distributed rate-limited request"""
domain = urlparse(url).netloc
self.wait_if_needed(domain)
return requests.get(url, **kwargs)
# Usage across multiple scraper instances
distributed_limiter = DistributedRateLimiter()
# This will coordinate rate limiting across all instances
response = distributed_limiter.request("https://api.example.com/data")
Token Bucket Algorithm
class TokenBucket:
def __init__(self, capacity, refill_rate):
self.capacity = capacity
self.tokens = capacity
self.refill_rate = refill_rate # tokens per second
self.last_refill = time.time()
def consume(self, tokens=1):
"""Try to consume tokens from bucket"""
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def _refill(self):
"""Refill tokens based on elapsed time"""
current_time = time.time()
elapsed = current_time - self.last_refill
# Add tokens based on elapsed time
tokens_to_add = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = current_time
def wait_for_tokens(self, tokens=1):
"""Wait until enough tokens are available"""
while not self.consume(tokens):
time.sleep(0.1)
class TokenBucketRateLimiter:
def __init__(self):
self.buckets = {}
def get_bucket(self, domain, capacity=10, refill_rate=1.0):
"""Get or create token bucket for domain"""
if domain not in self.buckets:
self.buckets[domain] = TokenBucket(capacity, refill_rate)
return self.buckets[domain]
def request(self, url, **kwargs):
"""Make token bucket rate-limited request"""
domain = urlparse(url).netloc
bucket = self.get_bucket(domain)
# Wait for token availability
bucket.wait_for_tokens()
return requests.get(url, **kwargs)
# Usage
token_limiter = TokenBucketRateLimiter()
# Allows burst requests up to bucket capacity
# then throttles to refill rate
for i in range(20):
response = token_limiter.request(f"https://api.example.com/data/{i}")
print(f"Request {i}: {response.status_code}")
Integration with Popular Libraries
Scrapy Rate Limiting
# Custom Scrapy middleware for adaptive per-domain delays
import time
from collections import defaultdict
from urllib.parse import urlparse
class AdaptiveDelayMiddleware:
def __init__(self, delay=1.0):
self.delay = delay
self.domain_stats = defaultdict(lambda: {
'delay': delay,
'errors': 0,
'successes': 0
})
@classmethod
def from_crawler(cls, crawler):
return cls(
delay=crawler.settings.getfloat('DOWNLOAD_DELAY', 1.0)
)
def process_request(self, request, spider):
domain = urlparse(request.url).netloc
delay = self.calculate_delay(domain)
        if delay > 0:
            # time.sleep blocks Scrapy's async engine; acceptable for small
            # crawls, but prefer the AutoThrottle settings below at scale
            time.sleep(delay)
def process_response(self, request, response, spider):
domain = urlparse(request.url).netloc
stats = self.domain_stats[domain]
if response.status == 200:
stats['successes'] += 1
stats['errors'] = max(0, stats['errors'] - 1)
else:
stats['errors'] += 1
self.adjust_delay(domain)
return response
def calculate_delay(self, domain):
return self.domain_stats[domain]['delay']
def adjust_delay(self, domain):
stats = self.domain_stats[domain]
if stats['errors'] > 3:
stats['delay'] *= 1.5
elif stats['successes'] > 10 and stats['errors'] == 0:
stats['delay'] *= 0.9
stats['delay'] = max(0.5, min(stats['delay'], 10.0))
# settings.py
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.AdaptiveDelayMiddleware': 543,
}
DOWNLOAD_DELAY = 1.0
RANDOMIZE_DOWNLOAD_DELAY = True  # Scrapy varies the delay between 0.5x and 1.5x DOWNLOAD_DELAY
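Scrapy also ships with a built-in AutoThrottle extension that adapts delays to observed server latency, which is often sufficient before writing custom middleware. A typical configuration (the values are illustrative):
# settings.py - built-in adaptive throttling
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1.0          # initial download delay
AUTOTHROTTLE_MAX_DELAY = 30.0           # ceiling when the server is slow
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0   # average concurrent requests per remote server
AUTOTHROTTLE_DEBUG = False              # set True to log each throttling decision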
Requests-HTML Rate Limiting
from requests_html import HTMLSession
class RateLimitedSession(HTMLSession):
def __init__(self, rate_limiter=None):
super().__init__()
        # Default to DomainRateLimiter, which provides wait_for_domain()
        self.rate_limiter = rate_limiter or DomainRateLimiter()
def get(self, url, **kwargs):
"""Override get method with rate limiting"""
self.rate_limiter.wait_for_domain(url)
return super().get(url, **kwargs)
def post(self, url, **kwargs):
"""Override post method with rate limiting"""
self.rate_limiter.wait_for_domain(url)
return super().post(url, **kwargs)
# Usage
session = RateLimitedSession(
rate_limiter=DomainRateLimiter()
)
response = session.get('https://example.com')
response.html.render() # JavaScript rendering with rate limiting
Monitoring and Analytics
Rate Limiting Metrics
import logging
from collections import defaultdict
class RateLimitingMonitor:
def __init__(self):
self.metrics = defaultdict(lambda: {
'requests_made': 0,
'requests_blocked': 0,
'total_delay_time': 0,
'errors': 0
})
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('rate_limiting.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def log_request(self, domain, delay_time, success=True):
"""Log request metrics"""
metrics = self.metrics[domain]
metrics['requests_made'] += 1
metrics['total_delay_time'] += delay_time
if not success:
metrics['errors'] += 1
self.logger.info(f"Domain: {domain}, Delay: {delay_time:.2f}s, Success: {success}")
def log_rate_limit_hit(self, domain):
"""Log when rate limit is encountered"""
self.metrics[domain]['requests_blocked'] += 1
self.logger.warning(f"Rate limit hit for domain: {domain}")
def get_statistics(self):
"""Get comprehensive statistics"""
stats = {}
for domain, metrics in self.metrics.items():
total_requests = metrics['requests_made']
if total_requests > 0:
stats[domain] = {
'total_requests': total_requests,
'requests_blocked': metrics['requests_blocked'],
'error_rate': metrics['errors'] / total_requests,
'avg_delay': metrics['total_delay_time'] / total_requests,
'block_rate': metrics['requests_blocked'] / total_requests
}
return stats
def print_report(self):
"""Print detailed statistics report"""
stats = self.get_statistics()
print("\n" + "="*60)
print("RATE LIMITING STATISTICS REPORT")
print("="*60)
for domain, metrics in stats.items():
print(f"\nDomain: {domain}")
print(f" Total Requests: {metrics['total_requests']}")
print(f" Requests Blocked: {metrics['requests_blocked']}")
print(f" Error Rate: {metrics['error_rate']:.2%}")
print(f" Average Delay: {metrics['avg_delay']:.2f}s")
print(f" Block Rate: {metrics['block_rate']:.2%}")
# Usage
monitor = RateLimitingMonitor()
class MonitoredRateLimiter(BasicRateLimiter):
def __init__(self, monitor, *args, **kwargs):
super().__init__(*args, **kwargs)
self.monitor = monitor
def request(self, url, **kwargs):
domain = urlparse(url).netloc
start_time = time.time()
try:
response = super().request(url, **kwargs)
delay_time = time.time() - start_time
success = response.status_code == 200
self.monitor.log_request(domain, delay_time, success)
return response
except Exception as e:
delay_time = time.time() - start_time
self.monitor.log_request(domain, delay_time, False)
raise
# Use monitored rate limiter
limiter = MonitoredRateLimiter(monitor, delay_range=(1, 3))
# After scraping session
monitor.print_report()
Best Practices and Recommendations
General Guidelines
- Start Conservative: Begin with longer delays and adjust down
- Respect robots.txt: Check crawl-delay directives (see the sketch after this list)
- Monitor Server Response: Watch for 429 status codes
- Use Random Delays: Avoid predictable patterns
- Implement Backoff: Increase delays on errors
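For the robots.txt point above, Python's standard library can read crawl-delay directives directly; a minimal sketch (the URL and user agent are placeholders):
from urllib import robotparser

parser = robotparser.RobotFileParser()
parser.set_url("https://example.com/robots.txt")
parser.read()

# Crawl-delay is optional, so fall back to a sensible default when absent
crawl_delay = parser.crawl_delay("MyScraperBot") or 1.0
print(f"Using a {crawl_delay}-second delay for example.com")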
Domain-Specific Strategies
- E-commerce Sites: 2-5 second delays during peak hours
- News Websites: 1-3 second delays, respect peak traffic
- APIs: Follow documented rate limits strictly
- Government Sites: Very conservative approach (5+ seconds)
- Social Media: Use official APIs when possible
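These guidelines map directly onto the DomainRateLimiter defined earlier; a short sketch with illustrative domains and delays:
# Illustrative wiring of the guidelines above (domains and values are placeholders)
limiter = DomainRateLimiter()
for domain, delay in {
    "shop.example.com": 3.0,   # e-commerce: 2-5 seconds
    "news.example.com": 2.0,   # news: 1-3 seconds
    "api.example.com": 1.0,    # APIs: follow documented limits
    "data.example.gov": 5.0,   # government: very conservative
}.items():
    limiter.set_domain_delay(domain, delay)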
Legal and Ethical Considerations
- Review terms of service before scraping
- Identify yourself with proper User-Agent headers
- Consider reaching out for API access
- Respect copyright and data protection laws
- Implement circuit breakers for server protection
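The last point deserves a sketch of its own: a minimal per-domain circuit breaker (the failure threshold and cool-off period are arbitrary choices) that stops sending requests after repeated failures instead of hammering a struggling server:
import time
from collections import defaultdict

class DomainCircuitBreaker:
    """Pause a domain after repeated failures, then retry after a cool-off."""
    def __init__(self, failure_threshold=5, cooldown=300):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown  # seconds to pause a tripped domain
        self.failures = defaultdict(int)
        self.tripped_at = {}
    def allow(self, domain):
        """Return True if requests to this domain are currently permitted."""
        tripped = self.tripped_at.get(domain)
        if tripped is None:
            return True
        if time.time() - tripped >= self.cooldown:
            # Cool-off elapsed: close the circuit and allow a retry
            del self.tripped_at[domain]
            self.failures[domain] = 0
            return True
        return False
    def record(self, domain, success):
        """Update failure counts and trip the breaker when the threshold is hit."""
        if success:
            self.failures[domain] = 0
        else:
            self.failures[domain] += 1
            if self.failures[domain] >= self.failure_threshold:
                self.tripped_at[domain] = time.time()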
Professional Rate Limiting Solutions
UK Data Services implements sophisticated rate limiting strategies for ethical, compliant web scraping that respects website resources while maximizing data collection efficiency.