MongoDB’s aggregation pipeline is one of its most powerful features, enabling complex data transformations and analysis. This guide covers advanced patterns, optimization techniques, and real-world use cases.
Understanding the Aggregation Pipeline
The aggregation pipeline processes documents through a sequence of stages, where each stage transforms the document stream.
Basic Pipeline Structure
db.collection.aggregate([
{ $match: { status: "active" } }, // Stage 1: Filter
{ $group: { _id: "$category", count: { $sum: 1 } } }, // Stage 2: Group
{ $sort: { count: -1 } } // Stage 3: Sort
])
Each stage receives documents from the previous stage, transforms them, and passes results to the next stage.
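The same flow can be sketched in plain JavaScript — an analogy only, not the mongo shell, and the sample documents are invented — where each stage is a function from one array of documents to the next:

```javascript
// Hypothetical in-memory documents standing in for a collection
const docs = [
  { status: "active", category: "a" },
  { status: "active", category: "b" },
  { status: "inactive", category: "a" },
  { status: "active", category: "a" },
];

// Stage 1: $match — keep only matching documents
const matched = docs.filter((d) => d.status === "active");

// Stage 2: $group — count documents per category
const grouped = {};
for (const d of matched) {
  grouped[d.category] = (grouped[d.category] || 0) + 1;
}

// Stage 3: $sort — order the groups by count, descending
const sorted = Object.entries(grouped)
  .map(([id, count]) => ({ _id: id, count }))
  .sort((x, y) => y.count - x.count);

console.log(sorted); // [ { _id: 'a', count: 2 }, { _id: 'b', count: 1 } ]
```

The server-side pipeline works on a document stream rather than in-memory arrays, but the stage-by-stage transformation is the same idea.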
Essential Pipeline Stages
$match: Filtering Documents
Always place $match as early as possible to reduce documents processed by subsequent stages:
// Good: Filter early
db.orders.aggregate([
{ $match: { status: "completed", total: { $gte: 100 } } },
{ $group: { _id: "$customerId", totalSpent: { $sum: "$total" } } }
])
// Bad: Filter late (every order is grouped before any filtering happens)
db.orders.aggregate([
{ $group: { _id: "$customerId", totalSpent: { $sum: "$total" } } },
{ $match: { totalSpent: { $gte: 1000 } } }
])
Performance tip: $match can use indexes, but only when it appears at the start of the pipeline, before any stage that reshapes documents (such as $project, $group, or $unwind).
$project: Reshaping Documents
Control which fields pass through the pipeline:
db.users.aggregate([
{
$project: {
_id: 0, // Exclude _id
name: 1, // Include name
email: 1, // Include email
fullName: { // Computed field
$concat: ["$firstName", " ", "$lastName"]
},
ageInMonths: { // Math operation
$multiply: ["$age", 12]
}
}
}
])
When to use: Reduce document size before expensive operations like $group or $lookup.
$group: Aggregating Data
Group documents by a key and perform calculations:
db.sales.aggregate([
{
$group: {
_id: { // Compound group key
year: { $year: "$date" },
month: { $month: "$date" }
},
totalRevenue: { $sum: "$amount" },
avgOrderValue: { $avg: "$amount" },
orderCount: { $sum: 1 },
maxOrder: { $max: "$amount" },
minOrder: { $min: "$amount" },
uniqueCustomers: { $addToSet: "$customerId" }
}
}
])
Common accumulators:
- $sum: Total sum
- $avg: Average value
- $max, $min: Maximum/minimum value
- $push: Array of all values
- $addToSet: Array of unique values
- $first, $last: First/last value in group
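The difference between $push and $addToSet mirrors plain arrays versus sets; a quick JavaScript sketch with invented values:

```javascript
// customerId values seen while folding one group
const seen = ["c1", "c2", "c1", "c3", "c2"];

// $push keeps every value, duplicates included
const pushed = [...seen];

// $addToSet keeps only distinct values (element order is not guaranteed in MongoDB)
const unique = [...new Set(seen)];

console.log(pushed.length); // 5
console.log(unique.length); // 3
```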
$lookup: Joining Collections
Perform left outer joins between collections:
db.orders.aggregate([
{
$lookup: {
from: "customers", // Collection to join
localField: "customerId", // Field in orders
foreignField: "_id", // Field in customers
as: "customerInfo" // Output array name
}
},
{
$unwind: "$customerInfo" // Convert array to object
}
])
Performance warning: $lookup is expensive. Use indexes on both localField and foreignField, and filter data before the lookup when possible.
$unwind: Deconstructing Arrays
Expand array fields into separate documents:
// Document before $unwind:
// { _id: 1, tags: ["mongodb", "database", "nosql"] }
db.articles.aggregate([
{ $unwind: "$tags" }
])
// After $unwind (3 documents):
// { _id: 1, tags: "mongodb" }
// { _id: 1, tags: "database" }
// { _id: 1, tags: "nosql" }
Useful for analyzing array elements:
db.articles.aggregate([
{ $unwind: "$tags" },
{ $group: { _id: "$tags", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
])
// Result: Most common tags across all articles
Advanced Pipeline Patterns
Pattern 1: Multi-Stage Filtering
Combine index-friendly and complex filters:
db.products.aggregate([
// Stage 1: Index-optimized filter
{ $match: { category: "electronics", price: { $gte: 100 } } },
// Stage 2: Add computed fields
{ $addFields: { discountedPrice: { $multiply: ["$price", 0.9] } } },
// Stage 3: Complex filter on computed field
{ $match: { discountedPrice: { $lte: 500 } } }
])
Pattern 2: Conditional Aggregation
Use $cond for conditional logic:
db.orders.aggregate([
{
$group: {
_id: "$customerId",
premiumOrders: {
$sum: {
$cond: [
{ $gte: ["$total", 1000] }, // Condition
1, // Value if true
0 // Value if false
]
}
},
standardOrders: {
$sum: {
$cond: [{ $lt: ["$total", 1000] }, 1, 0]
}
},
totalRevenue: { $sum: "$total" }
}
}
])
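The conditional-count trick folds a 0 or 1 per document into the sum; the same accumulation in plain JavaScript (order totals are made up):

```javascript
const orders = [
  { customerId: "c1", total: 1500 },
  { customerId: "c1", total: 200 },
  { customerId: "c1", total: 999 },
];

// Equivalent of $sum over $cond: add 1 when the condition holds, else 0
let premiumOrders = 0;
let standardOrders = 0;
let totalRevenue = 0;

for (const o of orders) {
  premiumOrders += o.total >= 1000 ? 1 : 0; // $cond: [{ $gte: ["$total", 1000] }, 1, 0]
  standardOrders += o.total < 1000 ? 1 : 0;
  totalRevenue += o.total;
}

console.log(premiumOrders, standardOrders, totalRevenue); // 1 2 2699
```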
Pattern 3: Bucketing Data
Group data into ranges using $bucket:
db.users.aggregate([
{
$bucket: {
groupBy: "$age",
boundaries: [0, 18, 30, 50, 65, 100],
default: "Other",
output: {
count: { $sum: 1 },
users: { $push: "$name" }
}
}
}
])
// Result:
// { _id: 18, count: 245, users: [...] } // Ages 18-29
// { _id: 30, count: 412, users: [...] } // Ages 30-49
// { _id: 50, count: 189, users: [...] } // Ages 50-64
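The boundary semantics — inclusive lower bound, exclusive upper bound, _id set to the bucket's lower bound — can be checked with a small JavaScript sketch (ages invented):

```javascript
const boundaries = [0, 18, 30, 50, 65, 100];

// Mirror $bucket's rule: a value falls in [boundaries[i], boundaries[i + 1])
function bucketFor(age) {
  for (let i = 0; i < boundaries.length - 1; i++) {
    if (age >= boundaries[i] && age < boundaries[i + 1]) return boundaries[i];
  }
  return "Other"; // the `default` bucket
}

console.log(bucketFor(17)); // 0
console.log(bucketFor(18)); // 18  (lower bound is inclusive)
console.log(bucketFor(29)); // 18
console.log(bucketFor(30)); // 30  (upper bound is exclusive)
console.log(bucketFor(120)); // "Other"
```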
Pattern 4: Time-Series Analysis
Analyze data over time windows:
db.metrics.aggregate([
{
$match: {
timestamp: {
$gte: ISODate("2024-01-01"),
$lt: ISODate("2025-01-01")
}
}
},
{
$group: {
_id: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" }
},
avgValue: { $avg: "$value" },
maxValue: { $max: "$value" },
minValue: { $min: "$value" },
dataPoints: { $sum: 1 }
}
},
{
$sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 }
}
])
Pattern 5: Top N per Group
Find top items within each group:
db.products.aggregate([
{
$sort: { category: 1, sales: -1 } // Sort by category, then sales descending
},
{
$group: {
_id: "$category",
topProducts: {
$push: {
name: "$name",
sales: "$sales"
}
}
}
},
{
$project: {
topProducts: { $slice: ["$topProducts", 5] } // Keep only top 5
}
}
])
Optimization Techniques
1. Use Indexes Strategically
// Create compound index for aggregation
db.orders.createIndex({ status: 1, createdAt: 1 })
// This pipeline uses the index:
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } },
{ $limit: 100 }
])
// Verify with explain():
db.orders.explain("executionStats").aggregate([...])
2. Limit Early and Often
db.articles.aggregate([
{ $match: { published: true } },
{ $sort: { views: -1 } },
{ $limit: 10 }, // Limit before expensive operations
{
$lookup: {
from: "authors",
localField: "authorId",
foreignField: "_id",
as: "author"
}
}
])
3. Use $project to Reduce Document Size
db.logs.aggregate([
{
$project: {
_id: 0,
timestamp: 1,
level: 1,
message: 1
// Exclude large fields like `stack_trace` or `metadata`
}
},
{
$group: {
_id: { $hour: "$timestamp" },
errorCount: {
$sum: { $cond: [{ $eq: ["$level", "error"] }, 1, 0] }
}
}
}
])
4. allowDiskUse for Large Datasets
db.hugecollection.aggregate(
[...],
{ allowDiskUse: true } // Enable disk usage for sorts exceeding 100MB memory
)
Warning: Disk-backed stages are much slower than in-memory ones. Prefer optimizing the pipeline (filtering earlier, projecting fewer fields) so spilling to disk is unnecessary.
5. Pipeline Order Matters
Optimal stage ordering:
- $match (with indexes)
- $sort (with indexes)
- $limit
- $project (reduce document size)
- $unwind
- $group
- $lookup
- $match (on computed fields)
- $sort (on computed fields)
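Putting the ordering together, here is a pipeline skeleton that follows it. The collection and field names are placeholders, and the $unwind/$lookup slots are omitted for brevity:

```javascript
// Stage order follows the checklist above; field names are hypothetical
const pipeline = [
  { $match: { status: "completed" } },                           // indexed filter first
  { $sort: { createdAt: -1 } },                                  // indexed sort
  { $limit: 1000 },                                              // cap the stream early
  { $project: { customerId: 1, total: 1 } },                     // shrink documents
  { $group: { _id: "$customerId", spent: { $sum: "$total" } } }, // aggregate
  { $match: { spent: { $gte: 100 } } },                          // filter on computed field
  { $sort: { spent: -1 } },                                      // sort on computed field
];

// Would be run as: db.orders.aggregate(pipeline)
console.log(pipeline.map((stage) => Object.keys(stage)[0]));
```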
Real-World Use Cases
E-Commerce: Customer Lifetime Value
db.orders.aggregate([
{
$match: {
status: "completed",
createdAt: { $gte: ISODate("2024-01-01") }
}
},
{
$group: {
_id: "$customerId",
totalSpent: { $sum: "$total" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$total" },
firstOrder: { $min: "$createdAt" },
lastOrder: { $max: "$createdAt" }
}
},
{
$addFields: {
customerLifetimeDays: {
$divide: [
{ $subtract: ["$lastOrder", "$firstOrder"] },
1000 * 60 * 60 * 24
]
}
}
},
{
$match: { totalSpent: { $gte: 1000 } }
},
{
$sort: { totalSpent: -1 }
}
])
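The divisor in customerLifetimeDays converts milliseconds to days, since $subtract on two dates yields milliseconds. The arithmetic can be sanity-checked in plain JavaScript (dates invented):

```javascript
const MS_PER_DAY = 1000 * 60 * 60 * 24; // 86,400,000 ms per day

// Subtracting two Dates yields milliseconds, as $subtract does in the pipeline
const firstOrder = new Date("2024-01-01T00:00:00Z");
const lastOrder = new Date("2024-03-01T00:00:00Z");

const lifetimeDays = (lastOrder - firstOrder) / MS_PER_DAY;
console.log(lifetimeDays); // 60 (31 days in January + 29 in leap-year February)
```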
Analytics: Cohort Analysis
db.users.aggregate([
{
$addFields: {
cohortMonth: {
$dateToString: {
format: "%Y-%m",
date: "$registeredAt"
}
}
}
},
{
$lookup: {
from: "activities",
let: { userId: "$_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$userId", "$$userId"] } } },
{
$group: {
_id: {
$dateToString: {
format: "%Y-%m",
date: "$timestamp"
}
},
activityCount: { $sum: 1 }
}
}
],
as: "monthlyActivity"
}
},
{
$group: {
_id: "$cohortMonth",
totalUsers: { $sum: 1 },
activeUsers: {
$sum: {
$cond: [{ $gt: [{ $size: "$monthlyActivity" }, 0] }, 1, 0]
}
}
}
},
{
$project: {
totalUsers: 1,
activeUsers: 1,
retentionRate: {
$multiply: [
{ $divide: ["$activeUsers", "$totalUsers"] },
100
]
}
}
}
])
Common Pitfalls
1. Unbounded $lookup
// Bad: Joins entire collections
db.orders.aggregate([
{
$lookup: {
from: "products", // Large collection
localField: "productId",
foreignField: "_id",
as: "product"
}
}
])
// Better: Filter in lookup pipeline
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { productId: "$productId" },
pipeline: [
{ $match: { $expr: { $eq: ["$_id", "$$productId"] }, inStock: true } },
{ $project: { name: 1, price: 1 } }
],
as: "product"
}
}
])
2. Missing Indexes on $match
Always create indexes for fields in $match:
// Create index first
db.events.createIndex({ type: 1, timestamp: -1 })
// Then aggregate (recentDate is a hypothetical cutoff; here, the last 24 hours)
const recentDate = new Date(Date.now() - 24 * 60 * 60 * 1000)
db.events.aggregate([
{ $match: { type: "click", timestamp: { $gte: recentDate } } }
])
3. Memory Limits
Aggregation stages have a 100MB memory limit by default:
// This might fail on large datasets
db.huge.aggregate([
{ $group: { _id: "$category", items: { $push: "$$ROOT" } } }
])
// Solution: Use allowDiskUse or redesign pipeline
db.huge.aggregate(
[{ $group: { _id: "$category", count: { $sum: 1 } } }],
{ allowDiskUse: true }
)
Conclusion
MongoDB’s aggregation pipeline is incredibly powerful for data analysis and transformation. Key takeaways:
- Place $match and $limit early to reduce documents processed
- Use indexes on $match and $sort fields
- Minimize $lookup usage and filter within lookup pipelines
- Project only needed fields to reduce memory usage
- Test with explain() to verify index usage and performance
Start with simple pipelines, measure performance, and add complexity incrementally. The aggregation framework scales beautifully when used correctly, handling billions of documents efficiently.
Remember: The best pipeline is the one that returns the right data with the fewest stages.