Redis Caching Strategies for High-Traffic Applications

Redis is one of the most widely used application caches and sits in front of the database at many of the world’s highest-traffic websites. This guide covers proven caching strategies, from the basic cache-aside pattern to advanced invalidation techniques.

Why Redis for Caching?

Redis excels as a cache because of:

  1. In-memory speed: Sub-millisecond latency for most operations
  2. Rich data structures: Strings, hashes, lists, sets, sorted sets
  3. Atomic operations: Single commands such as INCR and ZADD execute atomically, avoiding read-modify-write races
  4. Built-in expiration: Automatic key cleanup with TTL
  5. Persistence options: Snapshots and append-only logs for durability
  6. High availability: Replication and Redis Sentinel/Cluster

Core Caching Patterns

Cache-Aside (Lazy Loading)

The most common pattern: the application manages the cache explicitly, checking it before every database read.

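import json

# Assumes `redis` is a connected Redis client and `db` is the application's
# database helper; both are set up elsewhere.
def get_user(user_id):
    # 1. Check cache first
    cache_key = f"user:{user_id}"
    user = redis.get(cache_key)

    if user is not None:
        return json.loads(user)  # Cache hit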

    # 2. Cache miss: fetch from database
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # 3. Store in cache for next time
    redis.setex(cache_key, 3600, json.dumps(user))  # 1 hour TTL

    return user

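Advantages: Only data that is actually requested gets cached, and the application keeps full control over what is cached and for how long.

Disadvantages: The first request for each key pays the full miss penalty, and cached data can be stale until its TTL expires.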

Read-Through Cache

The caching layer handles database access transparently:

import redis_cache  # placeholder for whichever caching helper you use

@redis_cache.cache(ttl=3600)
def get_user(user_id):
    return db.query("SELECT * FROM users WHERE id = %s", user_id)

# Application code doesn't know about cache
user = get_user(123)  # Cached automatically

When to use: Simpler application code, but less control over cache behavior.
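
A decorator like the one above could be built roughly as follows. This is a minimal sketch, not the API of any particular library: `read_through`, `DictCache`, and `load_user` are hypothetical names, and `DictCache` is an in-memory stand-in that exposes only the `get` and `setex` calls used here so the example runs without a Redis server.

```python
import functools
import json


class DictCache:
    """In-memory stand-in for a Redis client (demo only; ignores TTLs)."""

    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def setex(self, key, ttl, value):
        self.store[key] = value  # a real client would expire this after ttl seconds


def read_through(client, ttl=3600, prefix="cache"):
    """Cache a function's JSON-serializable result under a key built from its arguments."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args):
            key = ":".join([prefix, fn.__name__, *map(str, args)])
            cached = client.get(key)
            if cached is not None:
                return json.loads(cached)  # hit: the loader is never called
            value = fn(*args)              # miss: call the loader
            client.setex(key, ttl, json.dumps(value))
            return value
        return wrapper
    return decorator


calls = []  # records loader invocations for the demo
cache = DictCache()


@read_through(cache, ttl=3600)
def load_user(user_id):
    calls.append(user_id)
    return {"id": user_id, "name": "Ada"}


first = load_user(123)   # miss: runs the loader and stores the result
second = load_user(123)  # hit: served from the cache
```

Passing a real Redis client in place of `DictCache` should work the same way, since only `get` and `setex` are called.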

Write-Through Cache

Write to the database and the cache in the same operation, so the cache never lags behind a successful write:

def update_user(user_id, data):
    # 1. Update database
    db.execute("UPDATE users SET name = %s WHERE id = %s", data['name'], user_id)

    # 2. Update cache immediately. `data` must be the full user record,
    #    otherwise later reads get a partial object from the cache.
    cache_key = f"user:{user_id}"
    redis.setex(cache_key, 3600, json.dumps(data))

    return data

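Advantages: Reads that follow a write see fresh data without waiting for a TTL to expire.

Disadvantages: Every write pays for two operations, and data that is written but never read still occupies cache memory.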

Write-Behind (Write-Back) Cache

Write to cache immediately, persist to database asynchronously:

def update_user(user_id, data):
    # 1. Update cache immediately
    cache_key = f"user:{user_id}"
    redis.setex(cache_key, 3600, json.dumps(data))

    # 2. Queue database write
    queue.enqueue('update_user_db', user_id, data)

    return data

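Advantages: Writes return as soon as the cache is updated, and the database receives a smoother write load.

Disadvantages: Updates still waiting in the queue are lost if it fails before they are persisted, and the database lags behind the cache in the meantime.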
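
The worker side of write-behind might look something like the sketch below. `WriteBehindBuffer` is a hypothetical in-process stand-in for the queue and its consumer, and the `database` dict stands in for the real database; a production setup would normally use a durable queue instead. The sketch also shows one optional refinement, coalescing repeated writes to the same key so that only the latest state is persisted.

```python
from collections import OrderedDict


class WriteBehindBuffer:
    """Hypothetical in-process stand-in for the queue plus its worker."""

    def __init__(self, write_fn):
        self.write_fn = write_fn      # persists one (key, value) pair to the database
        self.pending = OrderedDict()  # key -> latest value not yet persisted

    def enqueue(self, key, value):
        # A newer write to the same key replaces the older pending one,
        # so the database only ever receives the latest state.
        self.pending[key] = value
        self.pending.move_to_end(key)

    def flush(self):
        """Persist all pending writes in order of last update; return the count."""
        written = 0
        while self.pending:
            key = next(iter(self.pending))
            self.write_fn(key, self.pending[key])  # if this raises, the entry stays pending
            del self.pending[key]
            written += 1
        return written


database = {}  # stands in for the real database in this demo
buffer = WriteBehindBuffer(lambda key, value: database.__setitem__(key, value))

buffer.enqueue("user:1", {"name": "Ada"})
buffer.enqueue("user:2", {"name": "Grace"})
buffer.enqueue("user:1", {"name": "Ada L."})  # coalesced with the first write
writes = buffer.flush()
```

Because an entry is removed only after `write_fn` succeeds, a failed database write leaves it pending for the next flush.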

Advanced Caching Strategies

Cache Stampede Prevention

When many requests miss the same key at the same moment, they all hit the database. One solution is a lock, so that only one request rebuilds the entry while the others wait:

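import time
import uuid

# Deletes the lock only if it still holds our token, atomically
RELEASE_LOCK = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""

def get_user_safe(user_id):
    cache_key = f"user:{user_id}"
    lock_key = f"lock:{cache_key}"

    user = redis.get(cache_key)
    if user:
        return json.loads(user)

    # Try to acquire lock; the random token identifies this holder
    token = str(uuid.uuid4())
    lock_acquired = redis.set(lock_key, token, nx=True, ex=10)  # 10s lock

    if lock_acquired:
        try:
            # This request rebuilds cache
            user = db.query("SELECT * FROM users WHERE id = %s", user_id)
            redis.setex(cache_key, 3600, json.dumps(user))
            return user
        finally:
            # Release only our own lock: if the rebuild outlived the 10s
            # expiry, another request may hold the lock by now
            redis.eval(RELEASE_LOCK, 1, lock_key, token)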
    else:
        # Another request is rebuilding cache, wait
        for _ in range(50):  # Try for 5 seconds
            time.sleep(0.1)
            user = redis.get(cache_key)
            if user:
                return json.loads(user)

        # Fallback to database if still not cached
        return db.query("SELECT * FROM users WHERE id = %s", user_id)

Probabilistic Early Expiration

Prevent stampedes by refreshing an entry probabilistically before it expires, so that a popular key is rebuilt in the background instead of expiring under load:

import random

def get_user_probabilistic(user_id):
    cache_key = f"user:{user_id}"
    result = redis.get(cache_key)

    if result:
        # Check TTL
        ttl = redis.ttl(cache_key)
        if ttl > 0:
            # Probability increases as TTL decreases
            refresh_probability = 1 - (ttl / 3600)  # Assuming 1h max TTL
            if random.random() < refresh_probability:
                # Asynchronously refresh cache
                queue.enqueue('refresh_user_cache', user_id)

        return json.loads(result)

    # Cache miss: fetch and cache
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    redis.setex(cache_key, 3600, json.dumps(user))
    return user
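
The linear rule above depends only on the remaining TTL. A commonly cited alternative, often called XFetch, also takes into account how long the value takes to rebuild: slow rebuilds start earlier. The sketch below is one way to write that check. `should_refresh_early` and its parameter names are chosen here for illustration, and the caller is assumed to store the entry's expiry timestamp and last rebuild duration alongside the cached value.

```python
import math
import random
import time


def should_refresh_early(expiry, delta, beta=1.0, now=None, rand=random.random):
    """Return True if this request should rebuild the entry before it expires.

    expiry: absolute expiry time of the entry (seconds since the epoch)
    delta:  how long the last rebuild took (seconds)
    beta:   values above 1 favor earlier refreshes, below 1 later ones
    """
    now = time.time() if now is None else now
    u = 1.0 - rand()  # rand() is in [0, 1), so u is in (0, 1] and log(u) is defined
    # -log(u) is a non-negative, exponentially distributed random number,
    # so the request pretends the clock is slightly ahead by a random gap.
    gap = -delta * beta * math.log(u)
    return now + gap >= expiry
```

A request for which the check returns True would enqueue the refresh job, exactly as the linear version does; all other requests serve the cached value unchanged.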

Multi-Level Caching

Combine L1 (in-process) and L2 (Redis) caches:

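# L1: in-process cache. A plain dict is used here for brevity; it is
# unbounded and never expires, so production code needs a bounded
# LRU or TTL structure instead.
_local_cache = {}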

def get_user_multi_level(user_id):
    # L1 cache check
    if user_id in _local_cache:
        return _local_cache[user_id]

    # L2 cache check (Redis)
    cache_key = f"user:{user_id}"
    user = redis.get(cache_key)

    if user:
        user = json.loads(user)
        _local_cache[user_id] = user  # Populate L1
        return user

    # Database fetch
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # Populate both caches
    redis.setex(cache_key, 3600, json.dumps(user))
    _local_cache[user_id] = user

    return user

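Benefits: L1 hits avoid the network round trip to Redis entirely and take load off Redis for the hottest keys.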

Challenge: Cache invalidation across levels.
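
One simple way to limit staleness in L1 is to give local entries a very short TTL and a size cap, so each process can serve outdated data for at most a few seconds. The class below is a minimal sketch of that idea. `TTLLocalCache` and its parameters are hypothetical names, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time


class TTLLocalCache:
    """Small in-process cache with per-entry expiry and a size cap (sketch)."""

    def __init__(self, ttl_seconds=5.0, max_items=1024, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._max_items = max_items
        self._clock = clock
        self._items = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key, value):
        if key not in self._items and len(self._items) >= self._max_items:
            # Evict the oldest inserted entry (dicts preserve insertion order)
            self._items.pop(next(iter(self._items)))
        self._items[key] = (self._clock() + self._ttl, value)

    def invalidate(self, key):
        self._items.pop(key, None)
```

Replacing the `_local_cache` dict with an instance of this class bounds both memory use and staleness. Explicit cross-process invalidation, for example over Redis Pub/Sub, is a separate and more involved option.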

Cache Invalidation Patterns

Phil Karlton famously said: “There are only two hard things in Computer Science: cache invalidation and naming things.”

Time-Based Invalidation (TTL)

The simplest approach is to set an expiration time:

# Short TTL for frequently changing data
redis.setex("trending_posts", 60, json.dumps(posts))  # 1 minute

# Long TTL for stable data
redis.setex("user:123", 86400, json.dumps(user))  # 24 hours

Pros: Simple, automatic cleanup

Cons: Stale data until expiration, cache may expire during traffic spike
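
When many keys are written at the same moment with the same TTL, they also expire at the same moment. Adding a little random jitter to each TTL spreads those expirations out. A minimal sketch, where `jittered_ttl` and `max_jitter` are names chosen here for illustration:

```python
import random


def jittered_ttl(base_ttl, max_jitter, rng=random):
    """Return base_ttl shifted by a random number of seconds in [-max_jitter, +max_jitter]."""
    return max(1, base_ttl + rng.randint(-max_jitter, max_jitter))


# Example: one hour, give or take six minutes
ttl = jittered_ttl(3600, 360)
```

The result can be passed directly as the TTL argument of `setex`.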

Event-Based Invalidation

Invalidate cache when underlying data changes:

def update_user(user_id, data):
    # Update database
    db.execute("UPDATE users SET name = %s WHERE id = %s", data['name'], user_id)

    # Invalidate cache
    redis.delete(f"user:{user_id}")

    # Also invalidate related caches
    redis.delete(f"user_posts:{user_id}")
    redis.delete(f"user_followers:{user_id}")

Challenge: Identifying all affected cache keys.
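
One way to keep that list manageable is to declare the dependent keys for each entity in a single place, rather than repeating `delete` calls in every update function. The sketch below assumes the key names from the example above; `DEPENDENT_KEYS` and `keys_to_invalidate` are hypothetical names.

```python
# Key templates that must be invalidated when an entity changes
DEPENDENT_KEYS = {
    "user": ["user:{id}", "user_posts:{id}", "user_followers:{id}"],
}


def keys_to_invalidate(entity, entity_id):
    """Expand the key templates registered for an entity type."""
    templates = DEPENDENT_KEYS.get(entity, [])
    return [template.format(id=entity_id) for template in templates]


user_keys = keys_to_invalidate("user", 123)
# With a real client: if user_keys: redis.delete(*user_keys)
```

Adding a new derived cache then means adding one template to the registry instead of hunting down every write path.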

Tag-Based Invalidation

Associate cache keys with tags for bulk invalidation:

def cache_with_tags(key, value, tags, ttl=3600):
    # Store value
    redis.setex(key, ttl, value)

    # Associate with tags
    for tag in tags:
        redis.sadd(f"tag:{tag}", key)

def invalidate_by_tag(tag):
    # Get all keys with this tag
    keys = redis.smembers(f"tag:{tag}")

    # Delete all keys
    if keys:
        redis.delete(*keys)

    # Clean up tag set
    redis.delete(f"tag:{tag}")

# Usage
cache_with_tags(
    "user:123",
    json.dumps(user),
    tags=["user", "user:123", "organization:456"],
    ttl=3600
)

# Invalidate all user caches in organization 456
invalidate_by_tag("organization:456")

Version-Based Invalidation

Include version number in cache key:

def get_user_versioned(user_id):
    # Get current version from a versioned key
    version = redis.get(f"user_version:{user_id}") or "1"

    cache_key = f"user:{user_id}:v{version}"
    user = redis.get(cache_key)

    if user:
        return json.loads(user)

    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    redis.setex(cache_key, 3600, json.dumps(user))

    return user

def update_user(user_id, data):
    db.execute("UPDATE users SET name = %s WHERE id = %s", data['name'], user_id)

    # Increment version (invalidates old cache implicitly)
    redis.incr(f"user_version:{user_id}")

Benefit: Old cached versions expire naturally via TTL.

Caching Complex Queries

Full Result Caching

Cache entire query results:

def get_trending_posts():
    cache_key = "trending_posts"
    posts = redis.get(cache_key)

    if posts:
        return json.loads(posts)

    posts = db.query("""
        SELECT p.id, p.title, COUNT(v.id) as vote_count
        FROM posts p
        LEFT JOIN votes v ON p.id = v.post_id
        WHERE p.created_at > NOW() - INTERVAL '24 hours'
        GROUP BY p.id
        ORDER BY vote_count DESC
        LIMIT 100
    """)

    redis.setex(cache_key, 300, json.dumps(posts))  # 5 min TTL
    return posts

Partial Result Caching

Cache aggregates separately from details:

def get_post_with_stats(post_id):
    # Cache post data
    post = get_cached(f"post:{post_id}", lambda: db.get_post(post_id), ttl=3600)

    # Cache vote count separately (shorter TTL)
    vote_count = get_cached(
        f"post_votes:{post_id}",
        lambda: db.count_votes(post_id),
        ttl=60
    )

    return {**post, 'vote_count': vote_count}
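
The `get_cached` helper used above is not defined in this guide. A minimal sketch of what it might look like follows, written as a factory so the Redis client is passed in explicitly; `make_get_cached` is a name chosen here, and values are assumed to be JSON-serializable.

```python
import json


def make_get_cached(client):
    """Build a get_cached(key, fetch_fn, ttl) helper bound to a Redis-like client."""
    def get_cached(key, fetch_fn, ttl=3600):
        raw = client.get(key)
        if raw is not None:  # "is not None" so cached values like 0 still count as hits
            return json.loads(raw)
        value = fetch_fn()   # miss: compute the value and store it
        client.setex(key, ttl, json.dumps(value))
        return value
    return get_cached
```

With a connected client, `get_cached = make_get_cached(redis)` yields a helper with the signature used in `get_post_with_stats`.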

Cached Materialized Views

Pre-compute expensive aggregations:

def refresh_user_stats():
    users = db.query("SELECT id FROM users")

    for user in users:
        stats = db.query("""
            SELECT
                COUNT(*) as post_count,
                SUM(upvotes) as total_upvotes,
                AVG(upvotes) as avg_upvotes
            FROM posts
            WHERE user_id = %s
        """, user['id'])

        redis.setex(f"user_stats:{user['id']}", 3600, json.dumps(stats))

# Run periodically via cron job or queue

Redis Data Structures for Caching

Strings: Simple Values

# Single value
redis.set("counter", "42")
redis.incr("counter")  # Atomic increment

# JSON objects
redis.set("user:123", json.dumps(user_data))

Hashes: Structured Data

Better than JSON strings for partial updates:

# Store user as hash
redis.hset("user:123", mapping={
    "name": "John Doe",
    "email": "john@example.com",
    "age": "30"
})

# Update single field
redis.hset("user:123", "age", "31")

# Get specific fields
name = redis.hget("user:123", "name")

# Get all fields
user = redis.hgetall("user:123")

Lists: Ordered Collections

# Recent activity feed (newest first)
redis.lpush("user:123:activity", json.dumps(activity_data))
redis.ltrim("user:123:activity", 0, 99)  # Keep only 100 items

# Get latest 10 activities
activities = redis.lrange("user:123:activity", 0, 9)

Sets: Unique Collections

# Tags for a post
redis.sadd("post:123:tags", "python", "redis", "caching")

# Check membership
is_tagged = redis.sismember("post:123:tags", "python")

# Set operations
common_tags = redis.sinter("post:123:tags", "post:456:tags")

Sorted Sets: Ranked Data

Perfect for leaderboards and trending data:

# Add posts with scores (vote count)
redis.zadd("trending", {"post:123": 42, "post:456": 38, "post:789": 55})

# Get top 10 posts
top_posts = redis.zrevrange("trending", 0, 9, withscores=True)

# Increment score atomically
redis.zincrby("trending", 1, "post:123")

# Get rank
rank = redis.zrevrank("trending", "post:123")  # 0-indexed

Monitoring and Optimization

Track Cache Hit Rate

def get_with_metrics(key, fetch_fn, ttl=3600):
    value = redis.get(key)

    if value:
        redis.incr("cache_hits")
        return json.loads(value)

    redis.incr("cache_misses")
    value = fetch_fn()
    redis.setex(key, ttl, json.dumps(value))
    return value

# Calculate hit rate
hits = int(redis.get("cache_hits") or 0)
misses = int(redis.get("cache_misses") or 0)
hit_rate = hits / (hits + misses) if (hits + misses) > 0 else 0

print(f"Cache hit rate: {hit_rate:.2%}")

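Target: As a rule of thumb, aim for a hit rate of 80% or higher. A much lower rate suggests that TTLs are too short or that the wrong data is being cached.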
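
Redis also keeps its own server-wide counters, `keyspace_hits` and `keyspace_misses`, in the stats section of INFO, which avoids the extra INCR on every request. The helper below is a small sketch that computes the rate from that dictionary; `hit_rate_from_info` is a name chosen here. These counters cover every key lookup on the server, not only cache reads.

```python
def hit_rate_from_info(stats):
    """Compute the hit rate from the dict returned by the INFO stats section."""
    hits = int(stats.get("keyspace_hits", 0))
    misses = int(stats.get("keyspace_misses", 0))
    total = hits + misses
    return hits / total if total else 0.0


# With a real client: hit_rate_from_info(redis.info("stats"))
example_rate = hit_rate_from_info({"keyspace_hits": 80, "keyspace_misses": 20})
```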

Identify Hot Keys

# Redis CLI
# Sample the most frequently accessed keys
# (requires an LFU maxmemory-policy, e.g. allkeys-lfu)
redis-cli --hotkeys

# Or use MONITOR (expensive, don't use in production)
redis-cli MONITOR

Memory Optimization

# Check memory usage
info = redis.info("memory")
print(f"Used memory: {info['used_memory_human']}")

# Find large keys (shell command, using redis-rdb-tools on a snapshot file):
#   rdb --command memory dump.rdb --largest 10

# Set eviction policy (only takes effect once maxmemory is set)
redis.config_set("maxmemory-policy", "allkeys-lru")

Best Practices

  1. Always set TTLs: Prevent unbounded growth
  2. Use appropriate data structures: Hashes for objects, sorted sets for rankings
  3. Handle cache failures gracefully: Fallback to database
  4. Monitor hit rates: Low hit rate = ineffective caching
  5. Namespace your keys: app:entity:id prevents collisions
  6. Use connection pooling: Reuse Redis connections
  7. Implement circuit breakers: Protect database from cache failure cascade
  8. Compress large values: Use gzip for >1KB values
  9. Pipeline commands: Batch multiple Redis operations
  10. Use Lua scripts: Atomic multi-step operations
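
As an illustration of item 8, the sketch below compresses only values above a size threshold and prefixes each stored blob with a one-byte marker so the reader knows whether to decompress. `encode_value`, `decode_value`, and the marker scheme are assumptions made for this example. It also assumes the Redis client returns raw bytes, that is, it was not created with `decode_responses=True`.

```python
import gzip
import json

COMPRESS_THRESHOLD = 1024      # bytes; matches the ">1KB" guideline above
RAW, GZIPPED = b"\x00", b"\x01"  # one-byte markers prepended to every stored value


def encode_value(obj):
    """Serialize to JSON bytes, gzip-compressing payloads above the threshold."""
    data = json.dumps(obj).encode("utf-8")
    if len(data) > COMPRESS_THRESHOLD:
        return GZIPPED + gzip.compress(data)
    return RAW + data


def decode_value(blob):
    """Reverse encode_value, decompressing only when the marker says so."""
    marker, payload = blob[:1], blob[1:]
    if marker == GZIPPED:
        payload = gzip.decompress(payload)
    return json.loads(payload.decode("utf-8"))


small = encode_value({"id": 1})
large = encode_value({"bio": "x" * 5000})
```

The encoded bytes can be stored with `setex` as usual. Compression trades CPU time for memory and bandwidth, so it is worth measuring on real payloads before enabling it everywhere.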

Conclusion

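Effective Redis caching requires a caching pattern that matches the workload, a deliberate invalidation strategy, and monitoring to confirm the cache is doing its job.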

Start simple with cache-aside and short TTLs. Add complexity (multi-level caching, tag-based invalidation) only when needed. Measure everything and optimize based on real traffic patterns.

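Done right, a Redis cache with a high hit rate can take most read traffic off the database (an 80% hit rate means roughly 80% fewer reads reach it) and serve cached responses far faster than the queries they replace. That’s worth the investment.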

