Redis is the gold standard for application caching, powering some of the world’s highest-traffic websites. This guide covers proven caching strategies, from basic cache-aside patterns to advanced invalidation techniques.
Why Redis for Caching?
Redis excels as a cache because of:
- In-memory speed: Sub-millisecond latency for most operations
- Rich data structures: Strings, hashes, lists, sets, sorted sets
- Atomic operations: INCR, ZADD, etc. prevent race conditions
- Built-in expiration: Automatic key cleanup with TTL
- Persistence options: Snapshots and append-only logs for durability
- High availability: Replication and Redis Sentinel/Cluster
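For example, the built-in expiration and atomic operations above can be exercised in a few lines. A minimal sketch, assuming a local Redis server and the redis-py client:

```python
import redis

# Assumes a local Redis instance; decode_responses=True returns str instead of bytes
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Built-in expiration: the key is removed automatically after 60 seconds
r.set("session:abc123", "user-42", ex=60)
print(r.ttl("session:abc123"))  # seconds remaining

# Atomic operations: safe under concurrent writers
r.incr("page_views:home")
r.zadd("leaderboard", {"player:1": 100})
r.zincrby("leaderboard", 10, "player:1")
```

The remaining examples in this article assume a client like this is already available as `redis`, alongside a placeholder `db` helper for database access.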
Core Caching Patterns
Cache-Aside (Lazy Loading)
The most common pattern: application manages cache explicitly.
```python
def get_user(user_id):
    # 1. Check cache first
    cache_key = f"user:{user_id}"
    user = redis.get(cache_key)
    if user:
        return json.loads(user)  # Cache hit

    # 2. Cache miss: fetch from database
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # 3. Store in cache for next time
    redis.setex(cache_key, 3600, json.dumps(user))  # 1 hour TTL
    return user
```
Advantages:
- Simple to implement
- Cache only contains requested data (efficient memory usage)
- Resilient to cache failures (falls back to database)
Disadvantages:
- Cache miss penalty: First request is slow
- Stale data possible until TTL expires
- Cache stampede risk (covered later)
Read-Through Cache
A caching library or decorator handles the data access transparently (the `redis_cache` decorator below is illustrative):
```python
import redis_cache  # illustrative third-party caching decorator

@redis_cache.cache(ttl=3600)
def get_user(user_id):
    return db.query("SELECT * FROM users WHERE id = %s", user_id)

# Application code doesn't know about the cache
user = get_user(123)  # Cached automatically
```
When to use: Simpler application code, but less control over cache behavior.
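If no such library is available in your stack, the same behavior can be hand-rolled as a decorator. This is a minimal sketch, assuming the same global `redis` client and `db` helper used in the other examples; the decorator name and key scheme are illustrative:

```python
import functools
import json

def redis_cached(ttl=3600, prefix="cache"):
    """Illustrative read-through decorator: cache JSON-serializable results in Redis."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args):
            key = f"{prefix}:{fn.__name__}:" + ":".join(map(str, args))
            cached = redis.get(key)
            if cached is not None:
                return json.loads(cached)   # cache hit
            result = fn(*args)              # cache miss: call through to the source
            redis.setex(key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

@redis_cached(ttl=3600)
def get_user(user_id):
    return db.query("SELECT * FROM users WHERE id = %s", user_id)
```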
Write-Through Cache
Write to the database and the cache synchronously on every update:
```python
def update_user(user_id, data):
    # 1. Update database
    db.execute("UPDATE users SET name = %s WHERE id = %s", data['name'], user_id)

    # 2. Update cache immediately
    cache_key = f"user:{user_id}"
    redis.setex(cache_key, 3600, json.dumps(data))
    return data
```
Advantages:
- Cache always consistent with database
- No stale reads
Disadvantages:
- Write latency increased (two systems to update)
- Wasted cache writes for rarely-read data
Write-Behind (Write-Back) Cache
Write to cache immediately, persist to database asynchronously:
```python
def update_user(user_id, data):
    # 1. Update cache immediately
    cache_key = f"user:{user_id}"
    redis.setex(cache_key, 3600, json.dumps(data))

    # 2. Queue database write
    queue.enqueue('update_user_db', user_id, data)
    return data
```
Advantages:
- Fastest write performance
- Batching reduces database load
Disadvantages:
- Risk of data loss if cache fails before persistence
- More complex error handling
Advanced Caching Strategies
Cache Stampede Prevention
When many requests miss the same key at the same time, they all hit the database at once. One solution is a lock-based (mutex) pattern:
```python
import time

def get_user_safe(user_id):
    cache_key = f"user:{user_id}"
    lock_key = f"lock:{cache_key}"

    user = redis.get(cache_key)
    if user:
        return json.loads(user)

    # Try to acquire lock
    lock_acquired = redis.set(lock_key, '1', nx=True, ex=10)  # 10s lock

    if lock_acquired:
        try:
            # This request rebuilds the cache
            user = db.query("SELECT * FROM users WHERE id = %s", user_id)
            redis.setex(cache_key, 3600, json.dumps(user))
            return user
        finally:
            redis.delete(lock_key)
    else:
        # Another request is rebuilding the cache; wait for it
        for _ in range(50):  # Try for up to 5 seconds
            time.sleep(0.1)
            user = redis.get(cache_key)
            if user:
                return json.loads(user)

        # Fallback to database if still not cached
        return db.query("SELECT * FROM users WHERE id = %s", user_id)
```
Probabilistic Early Expiration
Prevent cache stampede by refreshing cache probabilistically before expiration:
```python
import random

def get_user_probabilistic(user_id):
    cache_key = f"user:{user_id}"
    result = redis.get(cache_key)

    if result:
        # Check remaining TTL
        ttl = redis.ttl(cache_key)
        if ttl > 0:
            # Refresh probability increases as the TTL decreases
            refresh_probability = 1 - (ttl / 3600)  # Assuming a 1h max TTL
            if random.random() < refresh_probability:
                # Asynchronously refresh the cache
                queue.enqueue('refresh_user_cache', user_id)
        return json.loads(result)

    # Cache miss: fetch and cache
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    redis.setex(cache_key, 3600, json.dumps(user))
    return user
```
Multi-Level Caching
Combine L1 (in-process) and L2 (Redis) caches:
```python
# L1: simple in-process cache (a plain dict here; a real implementation
# would bound it, e.g. with an LRU and its own short TTL)
_local_cache = {}

def get_user_multi_level(user_id):
    # L1 cache check
    if user_id in _local_cache:
        return _local_cache[user_id]

    # L2 cache check (Redis)
    cache_key = f"user:{user_id}"
    user = redis.get(cache_key)
    if user:
        user = json.loads(user)
        _local_cache[user_id] = user  # Populate L1
        return user

    # Database fetch
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # Populate both caches
    redis.setex(cache_key, 3600, json.dumps(user))
    _local_cache[user_id] = user
    return user
```
Benefits:
- L1 cache eliminates network latency for hot keys
- L2 cache shares data across application servers
Challenge: Cache invalidation across levels.
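One common approach is to broadcast invalidations on a Redis pub/sub channel so every application server can evict its own L1 entry. A sketch, assuming each process starts a small background listener thread (the channel name and helpers are illustrative):

```python
import threading

INVALIDATION_CHANNEL = "cache:invalidate:user"

def invalidate_user(user_id):
    # Drop the shared L2 entry, then tell every process to drop its L1 entry
    redis.delete(f"user:{user_id}")
    redis.publish(INVALIDATION_CHANNEL, str(user_id))

def _l1_invalidation_listener():
    pubsub = redis.pubsub()
    pubsub.subscribe(INVALIDATION_CHANNEL)
    for message in pubsub.listen():
        if message["type"] == "message":
            # Assumes decode_responses=True, so message["data"] is a str
            _local_cache.pop(int(message["data"]), None)

# Start once per process, e.g. at application startup
threading.Thread(target=_l1_invalidation_listener, daemon=True).start()
```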
Cache Invalidation Patterns
Phil Karlton famously said: “There are only two hard things in Computer Science: cache invalidation and naming things.”
Time-Based Invalidation (TTL)
Simplest approach: Set expiration time:
```python
# Short TTL for frequently changing data
redis.setex("trending_posts", 60, json.dumps(posts))  # 1 minute

# Long TTL for stable data
redis.setex("user:123", 86400, json.dumps(user))  # 24 hours
```
Pros: Simple, automatic cleanup.
Cons: Stale data until expiration; keys may expire during a traffic spike.
Event-Based Invalidation
Invalidate cache when underlying data changes:
```python
def update_user(user_id, data):
    # Update database
    db.execute("UPDATE users SET name = %s WHERE id = %s", data['name'], user_id)

    # Invalidate cache
    redis.delete(f"user:{user_id}")

    # Also invalidate related caches
    redis.delete(f"user_posts:{user_id}")
    redis.delete(f"user_followers:{user_id}")
```
Challenge: Identifying all affected cache keys.
Tag-Based Invalidation
Associate cache keys with tags for bulk invalidation:
```python
def cache_with_tags(key, value, tags, ttl=3600):
    # Store value
    redis.setex(key, ttl, value)

    # Associate with tags
    for tag in tags:
        redis.sadd(f"tag:{tag}", key)

def invalidate_by_tag(tag):
    # Get all keys with this tag
    keys = redis.smembers(f"tag:{tag}")

    # Delete all keys
    if keys:
        redis.delete(*keys)

    # Clean up tag set
    redis.delete(f"tag:{tag}")

# Usage
cache_with_tags(
    "user:123",
    json.dumps(user),
    tags=["user", "user:123", "organization:456"],
    ttl=3600
)

# Invalidate all user caches in organization 456
invalidate_by_tag("organization:456")
```
Version-Based Invalidation
Include version number in cache key:
```python
def get_user_versioned(user_id):
    # Get current version from a version key
    # (default to "0": INCR starts a missing key at 1, so the first update bumps it)
    version = redis.get(f"user_version:{user_id}") or "0"
    cache_key = f"user:{user_id}:v{version}"

    user = redis.get(cache_key)
    if user:
        return json.loads(user)

    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    redis.setex(cache_key, 3600, json.dumps(user))
    return user

def update_user(user_id, data):
    db.execute("UPDATE users SET name = %s WHERE id = %s", data['name'], user_id)

    # Increment version (implicitly invalidates the old cache key)
    redis.incr(f"user_version:{user_id}")
```
Benefit: Old cached versions expire naturally via TTL.
Caching Complex Queries
Full Result Caching
Cache entire query results:
```python
def get_trending_posts():
    cache_key = "trending_posts"

    posts = redis.get(cache_key)
    if posts:
        return json.loads(posts)

    posts = db.query("""
        SELECT p.id, p.title, COUNT(v.id) as vote_count
        FROM posts p
        LEFT JOIN votes v ON p.id = v.post_id
        WHERE p.created_at > NOW() - INTERVAL '24 hours'
        GROUP BY p.id
        ORDER BY vote_count DESC
        LIMIT 100
    """)

    redis.setex(cache_key, 300, json.dumps(posts))  # 5 min TTL
    return posts
```
Partial Result Caching
Cache aggregates separately from details:
```python
def get_post_with_stats(post_id):
    # Cache post data (get_cached is a small cache-aside helper; see the sketch below)
    post = get_cached(f"post:{post_id}", lambda: db.get_post(post_id), ttl=3600)

    # Cache vote count separately (shorter TTL)
    vote_count = get_cached(
        f"post_votes:{post_id}",
        lambda: db.count_votes(post_id),
        ttl=60
    )

    return {**post, 'vote_count': vote_count}
```
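The `get_cached` helper isn't part of Redis; it's assumed to be a small cache-aside wrapper along these lines (a sketch matching the pattern used throughout this article):

```python
def get_cached(key, fetch_fn, ttl=3600):
    # Cache-aside: return the cached value, or fetch it, cache it, and return it
    cached = redis.get(key)
    if cached is not None:
        return json.loads(cached)
    value = fetch_fn()
    redis.setex(key, ttl, json.dumps(value))
    return value
```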
Cached Materialized Views
Pre-compute expensive aggregations:
```python
def refresh_user_stats():
    users = db.query("SELECT id FROM users")

    for user in users:
        stats = db.query("""
            SELECT
                COUNT(*) as post_count,
                SUM(upvotes) as total_upvotes,
                AVG(upvotes) as avg_upvotes
            FROM posts
            WHERE user_id = %s
        """, user['id'])

        redis.setex(f"user_stats:{user['id']}", 3600, json.dumps(stats))

# Run periodically via a cron job or task queue
```
Redis Data Structures for Caching
Strings: Simple Values
```python
# Single value
redis.set("counter", "42")
redis.incr("counter")  # Atomic increment

# JSON objects
redis.set("user:123", json.dumps(user_data))
```
Hashes: Structured Data
Better than JSON strings for partial updates:
```python
# Store user as a hash
redis.hset("user:123", mapping={
    "name": "John Doe",
    "email": "john@example.com",
    "age": "30"
})

# Update a single field
redis.hset("user:123", "age", "31")

# Get specific fields
name = redis.hget("user:123", "name")

# Get all fields
user = redis.hgetall("user:123")
```
Lists: Ordered Collections
```python
# Recent activity feed (newest first)
redis.lpush("user:123:activity", json.dumps(activity_data))
redis.ltrim("user:123:activity", 0, 99)  # Keep only the latest 100 items

# Get the latest 10 activities
activities = redis.lrange("user:123:activity", 0, 9)
```
Sets: Unique Collections
```python
# Tags for a post
redis.sadd("post:123:tags", "python", "redis", "caching")

# Check membership
is_tagged = redis.sismember("post:123:tags", "python")

# Set operations
common_tags = redis.sinter("post:123:tags", "post:456:tags")
```
Sorted Sets: Ranked Data
Perfect for leaderboards and trending data:
```python
# Add posts with scores (vote count)
redis.zadd("trending", {"post:123": 42, "post:456": 38, "post:789": 55})

# Get the top 10 posts
top_posts = redis.zrevrange("trending", 0, 9, withscores=True)

# Increment a score atomically
redis.zincrby("trending", 1, "post:123")

# Get a post's rank
rank = redis.zrevrank("trending", "post:123")  # 0-indexed
```
Monitoring and Optimization
Track Cache Hit Rate
```python
def get_with_metrics(key, fetch_fn, ttl=3600):
    value = redis.get(key)
    if value:
        redis.incr("cache_hits")
        return json.loads(value)

    redis.incr("cache_misses")
    value = fetch_fn()
    redis.setex(key, ttl, json.dumps(value))
    return value

# Calculate hit rate
hits = int(redis.get("cache_hits") or 0)
misses = int(redis.get("cache_misses") or 0)
hit_rate = hits / (hits + misses) if (hits + misses) > 0 else 0
print(f"Cache hit rate: {hit_rate:.2%}")
```
Target: 80%+ hit rate for effective caching.
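Redis also tracks hits and misses server-side, so you can read a global hit rate without maintaining your own counters. A sketch using the INFO stats section:

```python
# keyspace_hits / keyspace_misses are reported in INFO's "stats" section
stats = redis.info("stats")
hits = stats["keyspace_hits"]
misses = stats["keyspace_misses"]
total = hits + misses
if total:
    print(f"Server-side hit rate: {hits / total:.2%}")
```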
Identify Hot Keys
```bash
# Monitor access patterns from the Redis CLI
redis-cli --hotkeys

# Or use MONITOR (expensive; don't use in production)
redis-cli MONITOR
```
Memory Optimization
```python
# Check memory usage
info = redis.info("memory")
print(f"Used memory: {info['used_memory_human']}")

# Find large keys offline with redis-rdb-tools (run from a shell):
#   rdb --command memory dump.rdb --largest 10

# Set the eviction policy
redis.config_set("maxmemory-policy", "allkeys-lru")
```
Best Practices
- Always set TTLs: Prevent unbounded growth
- Use appropriate data structures: Hashes for objects, sorted sets for rankings
- Handle cache failures gracefully: Fallback to database
- Monitor hit rates: Low hit rate = ineffective caching
- Namespace your keys: `app:entity:id` prevents collisions
- Use connection pooling: Reuse Redis connections
- Implement circuit breakers: Protect database from cache failure cascade
- Compress large values: Use gzip for values larger than ~1KB (see the sketch after this list)
- Pipeline commands: Batch multiple Redis operations into a single round trip
- Use Lua scripts: Atomic multi-step operations
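As an illustration of the compression and pipelining points, here is a sketch that gzips large values before caching them and batches several commands into one round trip. It assumes the client is created with decode_responses=False (the redis-py default), so binary payloads round-trip intact:

```python
import gzip
import json

def cache_large_value(key, value, ttl=3600):
    # Compress payloads over ~1KB before storing
    payload = json.dumps(value).encode("utf-8")
    if len(payload) > 1024:
        payload = gzip.compress(payload)
    redis.setex(key, ttl, payload)

def get_large_value(key):
    payload = redis.get(key)
    if payload is None:
        return None
    if payload[:2] == b"\x1f\x8b":  # gzip magic bytes
        payload = gzip.decompress(payload)
    return json.loads(payload)

# Pipelining: queue several commands and send them in a single round trip
pipe = redis.pipeline()
pipe.incr("page_views:home")
pipe.sadd("post:123:tags", "redis")
pipe.expire("post:123:tags", 3600)
results = pipe.execute()
```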
Conclusion
Effective Redis caching requires:
- Right pattern: Cache-aside for most use cases
- Proper invalidation: Combine TTL with event-based invalidation
- Stampede prevention: Lock-based or probabilistic patterns
- Monitoring: Track hit rates and memory usage
- Graceful degradation: Always have database fallback
Start simple with cache-aside and short TTLs. Add complexity (multi-level caching, tag-based invalidation) only when needed. Measure everything and optimize based on real traffic patterns.
Done right, Redis can reduce database load by 80%+ while improving response times by 10-100x. That’s worth the investment.