redis-patterns
Redis data structure patterns, caching strategies, distributed locks, rate limiting, pub/sub, and connection management for production applications.
Redis Patterns
Quick reference for Redis best practices across common backend use cases.
How It Works
Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
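To make the wire format concrete, here is a minimal sketch of how a command could be serialized into RESP (version 2) before it is written to the socket. The helper name `encode_command` is hypothetical; client libraries such as redis-py do this internally, so this is only an illustration of the framing.

```python
from typing import Union


def encode_command(*parts: Union[str, bytes, int]) -> bytes:
    """Serialize one command as a RESP array of bulk strings."""
    # Array header: '*' followed by the number of elements
    out = [b"*%d\r\n" % len(parts)]
    for part in parts:
        data = part if isinstance(part, bytes) else str(part).encode("utf-8")
        # Bulk string: '$' + byte length, then the payload, each CRLF-terminated
        out.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(out)
```

For example, `encode_command("SET", "foo", "bar")` produces `b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"`.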
When to Activate
- Adding caching to an application
- Implementing rate limiting or throttling
- Building distributed locks or coordination
- Setting up session or token storage
- Using Pub/Sub or Redis Streams for messaging
- Configuring Redis in production (pooling, eviction, clustering)
Data Structure Cheat Sheet
| Use Case | Structure | Example Key |
|---|---|---|
| Simple cache | String | product:123 |
| User session | Hash | session:abc |
| Leaderboard | Sorted Set | scores:weekly |
| Unique visitors | Set | visitors:2024-01-01 |
| Activity feed | List | feed:user:456 |
| Event stream | Stream | events:orders |
| Counters / rate limits | String (INCR) | ratelimit:user:123 |
| Approximate unique counts | HyperLogLog | hll:pageviews |
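
The Sorted Set row can be illustrated with a small leaderboard sketch. It assumes `client` is a redis-py style client created with `decode_responses=True`; the function names are hypothetical and the key `scores:weekly` is taken from the table.

```python
from typing import Any, List, Optional, Tuple


def add_points(client: Any, board: str, member: str, points: float) -> float:
    """Add points to a member's score and return the new total (ZINCRBY)."""
    return float(client.zincrby(board, points, member))


def top_n(client: Any, board: str, n: int = 10) -> List[Tuple[str, float]]:
    """Return the n highest-scoring members, best first (ZREVRANGE WITHSCORES)."""
    return client.zrevrange(board, 0, n - 1, withscores=True)


def rank_of(client: Any, board: str, member: str) -> Optional[int]:
    """Return the member's 1-based rank, or None if the member is absent (ZREVRANK)."""
    rank = client.zrevrank(board, member)
    return None if rank is None else rank + 1
```

Pagination follows the same shape: pass a different start and end index to `zrevrange`.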
Core Patterns
Cache-Aside (Lazy Loading)
    import json

    import redis

    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    def get_product(product_id: int):
        cache_key = f"product:{product_id}"
        cached = r.get(cache_key)
        if cached is not None:
            return json.loads(cached)

        # Cache miss: load from the database (db is the application's own DB handle)
        product = db.query("SELECT * FROM products WHERE id = %s", product_id)
        r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
        return product

Write-Through Cache
    def update_product(product_id: int, data: dict):
        # Write to DB first
        db.execute("UPDATE products SET ... WHERE id = %s", product_id)

        # Immediately update cache
        cache_key = f"product:{product_id}"
        r.setex(cache_key, 3600, json.dumps(data))

Cache Invalidation
    # Tag-based invalidation: group related keys under a set
    def cache_product(product_id: int, category_id: int, data: dict):
        key = f"product:{product_id}"
        tag = f"tag:category:{category_id}"
        pipe = r.pipeline(transaction=True)
        pipe.setex(key, 3600, json.dumps(data))
        pipe.sadd(tag, key)
        pipe.expire(tag, 3600)
        pipe.execute()

    def invalidate_category(category_id: int):
        tag = f"tag:category:{category_id}"
        keys = r.smembers(tag)
        if keys:
            r.delete(*keys)
        r.delete(tag)

Session Storage
    import time
    import uuid

    def create_session(user_id: int, ttl: int = 86400) -> str:
        session_id = str(uuid.uuid4())
        key = f"session:{session_id}"
        pipe = r.pipeline(transaction=True)
        pipe.hset(key, mapping={
            "user_id": user_id,
            "created_at": int(time.time()),
        })
        pipe.expire(key, ttl)
        pipe.execute()
        return session_id

    def get_session(session_id: str) -> dict | None:
        data = r.hgetall(f"session:{session_id}")
        return data if data else None

    def delete_session(session_id: str):
        r.delete(f"session:{session_id}")

Rate Limiting
Fixed Window (Simple)
    def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
        # One counter per user per window; the window index is part of the key
        key = f"ratelimit:{user_id}:{int(time.time()) // window}"
        pipe = r.pipeline(transaction=True)
        pipe.incr(key)
        pipe.expire(key, window)
        count, _ = pipe.execute()
        return count > limit

Sliding Window (Lua, Atomic)
    -- sliding_window.lua
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local limit = tonumber(ARGV[3])

    redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
    local count = redis.call('ZCARD', key)

    if count < limit then
      -- Use unique member (now + sequence) to avoid collisions within the same millisecond
      local seq_key = key .. ':seq'
      local seq = redis.call('INCR', seq_key)
      redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
      redis.call('ZADD', key, now, now .. '-' .. seq)
      redis.call('EXPIRE', key, math.ceil(window / 1000))
      return 1
    end
    return 0

Register the script once and call it from Python, passing the current time and the window in milliseconds:

    sliding_window = r.register_script(open('sliding_window.lua').read())

    def allow_request(user_id: int) -> bool:
        key = f"ratelimit:sliding:{user_id}"
        now = int(time.time() * 1000)
        return bool(sliding_window(keys=[key], args=[now, 60000, 100]))

Distributed Locks
Distributed Lock (Single Node — SET NX PX)
    import uuid

    def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
        lock_key = f"lock:{resource}"
        token = str(uuid.uuid4())
        acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
        return token if acquired else None

    def release_lock(resource: str, token: str) -> bool:
        # Delete the key only if it still holds our token
        release_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        result = r.eval(release_script, 1, f"lock:{resource}", token)
        return bool(result)

    # Usage
    token = acquire_lock("order:payment:123")
    if token:
        try:
            process_payment()
        finally:
            release_lock("order:payment:123", token)

For multi-node setups, use the redlock-py library, which implements the full Redlock algorithm.
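The acquire, try, finally, release sequence can also be wrapped in a context manager that retries for a short time before giving up. This is a sketch under assumptions: `client` is a redis-py style client, and `redis_lock`, `wait_s`, and `poll_s` are hypothetical names introduced only for illustration. It targets a single Redis node, like the functions above.

```python
import time
import uuid
from contextlib import contextmanager
from typing import Any, Iterator

# Compare-and-delete: only the holder of the token may release the lock.
_RELEASE = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""


@contextmanager
def redis_lock(client: Any, resource: str, ttl_ms: int = 5000,
               wait_s: float = 1.0, poll_s: float = 0.05) -> Iterator[bool]:
    """Try to take lock:<resource> for up to wait_s seconds; yield True if held."""
    key = f"lock:{resource}"
    token = str(uuid.uuid4())
    deadline = time.monotonic() + wait_s
    acquired = bool(client.set(key, token, px=ttl_ms, nx=True))
    while not acquired and time.monotonic() < deadline:
        time.sleep(poll_s)
        acquired = bool(client.set(key, token, px=ttl_ms, nx=True))
    try:
        yield acquired
    finally:
        if acquired:
            client.eval(_RELEASE, 1, key, token)
```

Callers should check the yielded flag (`with redis_lock(r, "order:payment:123") as held:`) and skip the protected work when it is False.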
Pub/Sub & Streams
Pub/Sub (Fire-and-Forget)
    # Publisher
    def publish_event(channel: str, payload: dict):
        r.publish(channel, json.dumps(payload))

    # Subscriber (blocking; run in a separate thread/process)
    def subscribe_events(channel: str):
        pubsub = r.pubsub()
        pubsub.subscribe(channel)
        for message in pubsub.listen():
            if message['type'] == 'message':
                handle(json.loads(message['data']))

Redis Streams (Durable Queue)

    # Producer
    def emit(stream: str, event: dict):
        r.xadd(stream, event, maxlen=10000)  # Cap stream length

    # Consumer group: at-least-once delivery (entries stay pending until XACK)
    try:
        r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
    except redis.exceptions.ResponseError as exc:
        if 'BUSYGROUP' not in str(exc):  # BUSYGROUP means the group already exists
            raise

    def consume(stream: str, group: str, consumer: str):
        while True:
            messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
            for _, entries in (messages or []):
                for msg_id, data in entries:
                    process(data)
                    r.xack(stream, group, msg_id)

Prefer Streams over Pub/Sub when you need delivery guarantees, consumer groups, or replay.
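The `consume` loop reads only new entries (`'>'`), so anything delivered to a consumer that crashed before calling XACK stays in that consumer's pending list. A minimal sketch of re-processing those entries on startup follows. It assumes a redis-py style `client`; `drain_pending` and `handler` are hypothetical names.

```python
from typing import Any, Callable, Dict


def drain_pending(client: Any, stream: str, group: str, consumer: str,
                  handler: Callable[[Dict[str, str]], None], batch: int = 10) -> int:
    """Re-process entries delivered to this consumer but never acknowledged.

    Reading with an explicit ID instead of '>' returns entries from this
    consumer's pending list whose IDs are greater than the one given.
    """
    last_id = "0"
    handled = 0
    while True:
        reply = client.xreadgroup(group, consumer, {stream: last_id}, count=batch)
        entries = reply[0][1] if reply else []
        if not entries:
            return handled
        for msg_id, data in entries:
            handler(data)
            client.xack(stream, group, msg_id)
            handled += 1
            last_id = msg_id  # advance so the next read continues after this entry
```

Calling this once before entering the main loop covers a restart of the same consumer name. Entries left behind by a consumer that never returns need to be claimed by another consumer, which this sketch does not cover.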
Key Design
Naming Conventions
    # Pattern: resource:id:field
    user:123:profile
    order:456:status
    cache:product:789

    # Pattern: namespace:resource:id
    myapp:session:abc123
    myapp:ratelimit:user:123

    # Pattern: resource:date (time-bound keys)
    stats:pageviews:2024-01-01

TTL Strategy
| Data Type | Suggested TTL |
|---|---|
| User session | 24h (86400) |
| API response cache | 5–15 min |
| Rate limit window | Match window size |
| Short-lived tokens | 5–10 min |
| Leaderboard | 1h–24h |
| Static/reference data | 1h–1 week |
Always set a TTL. Keys without TTL accumulate indefinitely and cause memory pressure.
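One way to check this rule against a running instance is to scan for keys that have no expiry. The sketch below assumes a redis-py style `client`; the function name `keys_without_ttl` is hypothetical. It issues one TTL call per key, so it is meant for occasional audits rather than hot paths.

```python
from typing import Any, Iterator


def keys_without_ttl(client: Any, pattern: str = "*", batch: int = 500) -> Iterator[str]:
    """Yield keys matching pattern that have no expiry set.

    TTL returns -1 for a key that exists without an expiry and -2 for a
    key that no longer exists.
    """
    # SCAN iterates incrementally, so the server is not blocked the way KEYS would block it
    for key in client.scan_iter(match=pattern, count=batch):
        if client.ttl(key) == -1:
            yield key
```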
Connection Management
Connection Pooling
    from redis import ConnectionPool, Redis

    pool = ConnectionPool(
        host='localhost',
        port=6379,
        db=0,
        max_connections=20,
        decode_responses=True,
        socket_connect_timeout=2,
        socket_timeout=2,
    )

    r = Redis(connection_pool=pool)

Cluster Mode
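    from redis.cluster import ClusterNode, RedisCluster

    r = RedisCluster(
        # redis-py expects ClusterNode objects here, not plain dicts
        startup_nodes=[ClusterNode("redis-1", 6379)],
        decode_responses=True,
        # Check that your redis-py version supports this flag
        skip_full_coverage_check=True,
    )

Sentinel (High Availability)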
    from redis.sentinel import Sentinel

    sentinel = Sentinel(
        [('sentinel-1', 26379), ('sentinel-2', 26379)],
        socket_timeout=0.5,
    )
    master = sentinel.master_for('mymaster', decode_responses=True)
    replica = sentinel.slave_for('mymaster', decode_responses=True)

Eviction Policies
| Policy | Behavior | Best For |
|---|---|---|
| noeviction | Error on write when full | Queues / critical data |
| allkeys-lru | Evict least recently used | General cache |
| volatile-lru | LRU only among keys with TTL | Mixed data store |
| allkeys-lfu | Evict least frequently used | Skewed access patterns |
| volatile-ttl | Evict soonest-to-expire | Prioritize long-lived data |
Set via redis.conf: maxmemory-policy allkeys-lru. Eviction only takes effect once maxmemory is also set to a nonzero limit.
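
The effective settings can also be read back at runtime. The sketch below assumes a redis-py style `client` created with `decode_responses=True` and a server that permits CONFIG GET (some managed services disable it). The function name `eviction_warnings` and the warning texts are hypothetical.

```python
from typing import Any, List


def eviction_warnings(client: Any) -> List[str]:
    """Return human-readable warnings about the instance's eviction settings."""
    maxmemory = int(client.config_get("maxmemory").get("maxmemory", 0))
    policy = client.config_get("maxmemory-policy").get("maxmemory-policy", "")
    warnings = []
    if maxmemory == 0:
        # 0 means no configured limit, so the policy has nothing to enforce
        warnings.append("maxmemory is 0: no memory limit is configured")
    if policy.startswith("volatile-"):
        warnings.append(f"{policy} only evicts keys that have a TTL")
    return warnings
```

An empty list means neither condition was found; it does not imply that the chosen policy suits the workload.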
Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Keys with no TTL | Memory grows unbounded | Always set TTL |
| KEYS * in production | Blocks the server (O(N)) | Use SCAN cursor |
| Storing large blobs (>100KB) | Slow serialization, memory pressure | Store reference + fetch from object store |
| Single Redis for everything | No isolation between cache & queue | Use separate instances (logical DBs still share memory and eviction policy) |
| Ignoring connection pool limits | Connection exhaustion under load | Size pool to workload |
| Not handling cache miss stampede | Thundering herd on cold start | Use locks or probabilistic early expiry |
| FLUSHALL without thought | Wipes entire instance | Scope deletes by key pattern |
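
The SCAN-based fixes in the table can be sketched as a pattern-scoped delete. It assumes a redis-py style `client` on a single (non-cluster) instance; `delete_by_pattern` is a hypothetical helper name. UNLINK is used so that memory is reclaimed in the background.

```python
from typing import Any, List


def delete_by_pattern(client: Any, pattern: str, batch: int = 500) -> int:
    """Delete keys matching pattern in batches using SCAN; return the count removed."""
    deleted = 0
    chunk: List[str] = []
    for key in client.scan_iter(match=pattern, count=batch):
        chunk.append(key)
        if len(chunk) >= batch:
            deleted += client.unlink(*chunk)
            chunk = []
    if chunk:
        deleted += client.unlink(*chunk)
    return deleted
```

For example, `delete_by_pattern(r, "cache:product:*")` clears product cache entries and leaves sessions and rate-limit counters alone.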
Cache Miss Stampede Prevention
    import threading

    _locks: dict[str, threading.Lock] = {}
    _locks_mutex = threading.Lock()

    def get_with_lock(key: str, fetch_fn, ttl: int = 300):
        cached = r.get(key)
        if cached:
            return json.loads(cached)

        with _locks_mutex:
            if key not in _locks:
                _locks[key] = threading.Lock()
            lock = _locks[key]

        with lock:
            cached = r.get(key)  # Re-check after acquiring lock
            if cached:
                return json.loads(cached)
            value = fetch_fn()
            r.setex(key, ttl, json.dumps(value))
            return value

Note: for multi-process deployments, replace the in-process lock with acquire_lock/release_lock from the Distributed Locks section above.
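The other fix named in the anti-patterns table, probabilistic early expiry, needs no lock at all. One common formulation is sketched below as a pure decision function. It assumes the application stores the expiry time and the last recompute duration alongside the cached value; `should_refresh_early`, `recompute_s`, and `beta` are hypothetical names.

```python
import math
import random
from typing import Callable


def should_refresh_early(now: float, expires_at: float, recompute_s: float,
                         beta: float = 1.0,
                         rand: Callable[[], float] = random.random) -> bool:
    """Decide whether this caller should recompute a cached value before it expires.

    The closer now is to expires_at, and the longer the value takes to
    recompute, the more likely this returns True. beta > 1 refreshes earlier.
    """
    u = max(rand(), 1e-12)  # random.random() can return 0.0; avoid log(0)
    # -log(u) is a non-negative random stretch added to the current time
    return now - recompute_s * beta * math.log(u) >= expires_at
```

A caller that gets True recomputes and rewrites the key while other callers keep serving the still-valid cached value, which spreads refreshes out instead of concentrating them at the moment of expiry.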
Examples
Add caching to a Django/Flask API endpoint:
Use cache-aside with setex and a 5-minute TTL on the response. Key on the request parameters.
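"Key on the request parameters" can be sketched as a helper that turns an endpoint name and its parameters into a deterministic key. The function name `response_cache_key` and the `cache:` prefix are assumptions for illustration; they are not part of Django or Flask.

```python
import hashlib
import json
from typing import Any, Mapping


def response_cache_key(endpoint: str, params: Mapping[str, Any]) -> str:
    """Build a deterministic cache key from an endpoint name and its parameters."""
    # sort_keys makes the key independent of parameter order
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"), default=str)
    # Hashing keeps the key short even when there are many parameters
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"cache:{endpoint}:{digest}"
```

The resulting key can then be used with `r.get` and `r.setex(key, 300, ...)` in the cache-aside flow.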
Rate-limit an API by user:
Use fixed-window with pipeline(transaction=True) for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.
Coordinate a background job across workers:
Use acquire_lock with a TTL that exceeds the expected job duration. Always release in a finally block.
Fan-out notifications to multiple subscribers:
Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.
Quick Reference
| Pattern | When to Use |
|---|---|
| Cache-aside | Read-heavy, tolerate slight staleness |
| Write-through | Reads should see updates immediately after a write |
| Distributed lock | Prevent concurrent access to a resource |
| Sliding window rate limit | Accurate per-user throttling |
| Redis Streams | Durable event queue with consumer groups |
| Pub/Sub | Broadcast with no delivery guarantees needed |
| Sorted Set leaderboard | Ranked scoring, pagination |
| HyperLogLog | Approximate unique count at low memory |
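
The HyperLogLog row has no example elsewhere in this reference, so here is a minimal sketch of counting unique visitors per day. It assumes a redis-py style `client`; the key layout `hll:visitors:<day>` and both function names are hypothetical.

```python
from typing import Any


def record_visit(client: Any, day: str, visitor_id: str) -> None:
    """Add a visitor to the day's HyperLogLog (PFADD)."""
    client.pfadd(f"hll:visitors:{day}", visitor_id)


def unique_visitors(client: Any, *days: str) -> int:
    """Approximate distinct visitors across one or more days.

    PFCOUNT with several keys returns the estimated cardinality of their union.
    """
    return client.pfcount(*(f"hll:visitors:{day}" for day in days))
```

The count is an estimate, so this suits dashboards and trend lines rather than billing or quota enforcement.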
Related
- Skill: postgres-patterns (relational data patterns)
- Skill: backend-patterns (API and service layer patterns)
- Skill: database-migrations (schema versioning)
- Skill: django-patterns (Django cache framework integration)
- Agent: database-reviewer (full database review workflow)