Back to Blog
MongoDBDatabaseAggregationNoSQL

MongoDB Aggregation Pipeline Guide

Master MongoDB aggregation. From basic stages to complex pipelines to performance optimization.

B
Bootspring Team
Engineering
September 17, 2021
7 min read

MongoDB's aggregation pipeline processes documents through stages. Here's how to use it effectively.

Basic Stages#

1// $match - Filter documents 2db.orders.aggregate([ 3 { $match: { status: 'completed', total: { $gte: 100 } } } 4]); 5 6// $project - Shape output 7db.users.aggregate([ 8 { 9 $project: { 10 fullName: { $concat: ['$firstName', ' ', '$lastName'] }, 11 email: 1, 12 _id: 0 13 } 14 } 15]); 16 17// $sort - Order results 18db.products.aggregate([ 19 { $sort: { price: -1, name: 1 } } 20]); 21 22// $limit and $skip - Pagination 23db.posts.aggregate([ 24 { $sort: { createdAt: -1 } }, 25 { $skip: 20 }, 26 { $limit: 10 } 27]); 28 29// $count - Count documents 30db.orders.aggregate([ 31 { $match: { status: 'pending' } }, 32 { $count: 'pendingOrders' } 33]);

Grouping and Accumulation#

1// Basic $group 2db.orders.aggregate([ 3 { 4 $group: { 5 _id: '$customerId', 6 totalSpent: { $sum: '$total' }, 7 orderCount: { $sum: 1 }, 8 avgOrder: { $avg: '$total' } 9 } 10 } 11]); 12 13// Group by multiple fields 14db.sales.aggregate([ 15 { 16 $group: { 17 _id: { 18 year: { $year: '$date' }, 19 month: { $month: '$date' } 20 }, 21 revenue: { $sum: '$amount' }, 22 transactions: { $sum: 1 } 23 } 24 }, 25 { $sort: { '_id.year': 1, '_id.month': 1 } } 26]); 27 28// Accumulator operators 29db.products.aggregate([ 30 { 31 $group: { 32 _id: '$category', 33 products: { $push: '$name' }, // Array of all names 34 uniqueBrands: { $addToSet: '$brand' }, // Unique brands 35 cheapest: { $min: '$price' }, 36 expensive: { $max: '$price' }, 37 avgPrice: { $avg: '$price' }, 38 firstAdded: { $first: '$createdAt' }, 39 lastAdded: { $last: '$createdAt' } 40 } 41 } 42]); 43 44// $push with object 45db.orders.aggregate([ 46 { 47 $group: { 48 _id: '$customerId', 49 orders: { 50 $push: { 51 orderId: '$_id', 52 total: '$total', 53 date: '$createdAt' 54 } 55 } 56 } 57 } 58]);

Lookups (Joins)#

1// Basic $lookup 2db.orders.aggregate([ 3 { 4 $lookup: { 5 from: 'customers', 6 localField: 'customerId', 7 foreignField: '_id', 8 as: 'customer' 9 } 10 }, 11 { $unwind: '$customer' } // Convert array to object 12]); 13 14// Pipeline lookup (more flexible) 15db.orders.aggregate([ 16 { 17 $lookup: { 18 from: 'products', 19 let: { orderItems: '$items' }, 20 pipeline: [ 21 { 22 $match: { 23 $expr: { $in: ['$_id', '$$orderItems.productId'] } 24 } 25 }, 26 { $project: { name: 1, price: 1 } } 27 ], 28 as: 'productDetails' 29 } 30 } 31]); 32 33// Multiple lookups 34db.orders.aggregate([ 35 { 36 $lookup: { 37 from: 'customers', 38 localField: 'customerId', 39 foreignField: '_id', 40 as: 'customer' 41 } 42 }, 43 { $unwind: '$customer' }, 44 { 45 $lookup: { 46 from: 'products', 47 localField: 'items.productId', 48 foreignField: '_id', 49 as: 'products' 50 } 51 } 52]);

Array Operations#

1// $unwind - Flatten arrays 2db.orders.aggregate([ 3 { $unwind: '$items' }, 4 { 5 $group: { 6 _id: '$items.productId', 7 totalQuantity: { $sum: '$items.quantity' } 8 } 9 } 10]); 11 12// $unwind with preserveNullAndEmptyArrays 13db.users.aggregate([ 14 { 15 $unwind: { 16 path: '$addresses', 17 preserveNullAndEmptyArrays: true 18 } 19 } 20]); 21 22// Array expressions 23db.users.aggregate([ 24 { 25 $project: { 26 name: 1, 27 primaryEmail: { $arrayElemAt: ['$emails', 0] }, 28 emailCount: { $size: '$emails' }, 29 hasVerified: { $in: [true, '$emails.verified'] } 30 } 31 } 32]); 33 34// $filter array elements 35db.orders.aggregate([ 36 { 37 $project: { 38 expensiveItems: { 39 $filter: { 40 input: '$items', 41 as: 'item', 42 cond: { $gte: ['$$item.price', 100] } 43 } 44 } 45 } 46 } 47]); 48 49// $map array transformation 50db.products.aggregate([ 51 { 52 $project: { 53 name: 1, 54 discountedPrices: { 55 $map: { 56 input: '$variants', 57 as: 'variant', 58 in: { 59 size: '$$variant.size', 60 salePrice: { $multiply: ['$$variant.price', 0.9] } 61 } 62 } 63 } 64 } 65 } 66]); 67 68// $reduce 69db.orders.aggregate([ 70 { 71 $project: { 72 orderTotal: { 73 $reduce: { 74 input: '$items', 75 initialValue: 0, 76 in: { 77 $add: [ 78 '$$value', 79 { $multiply: ['$$this.price', '$$this.quantity'] } 80 ] 81 } 82 } 83 } 84 } 85 } 86]);

Date Operations#

1// Extract date parts 2db.orders.aggregate([ 3 { 4 $project: { 5 year: { $year: '$createdAt' }, 6 month: { $month: '$createdAt' }, 7 day: { $dayOfMonth: '$createdAt' }, 8 hour: { $hour: '$createdAt' }, 9 dayOfWeek: { $dayOfWeek: '$createdAt' } 10 } 11 } 12]); 13 14// Date formatting 15db.events.aggregate([ 16 { 17 $project: { 18 formattedDate: { 19 $dateToString: { 20 format: '%Y-%m-%d %H:%M', 21 date: '$timestamp', 22 timezone: 'America/New_York' 23 } 24 } 25 } 26 } 27]); 28 29// Date calculations 30db.subscriptions.aggregate([ 31 { 32 $project: { 33 daysRemaining: { 34 $dateDiff: { 35 startDate: '$$NOW', 36 endDate: '$expiresAt', 37 unit: 'day' 38 } 39 } 40 } 41 } 42]); 43 44// Group by date period 45db.sales.aggregate([ 46 { 47 $group: { 48 _id: { 49 $dateToString: { format: '%Y-%m-%d', date: '$date' } 50 }, 51 dailyTotal: { $sum: '$amount' } 52 } 53 }, 54 { $sort: { _id: 1 } } 55]);

Conditional Logic#

1// $cond (if-then-else) 2db.products.aggregate([ 3 { 4 $project: { 5 name: 1, 6 priceCategory: { 7 $cond: { 8 if: { $gte: ['$price', 100] }, 9 then: 'premium', 10 else: 'standard' 11 } 12 } 13 } 14 } 15]); 16 17// $switch (multiple conditions) 18db.orders.aggregate([ 19 { 20 $project: { 21 status: 1, 22 priority: { 23 $switch: { 24 branches: [ 25 { case: { $eq: ['$status', 'urgent'] }, then: 1 }, 26 { case: { $eq: ['$status', 'high'] }, then: 2 }, 27 { case: { $eq: ['$status', 'normal'] }, then: 3 } 28 ], 29 default: 4 30 } 31 } 32 } 33 } 34]); 35 36// $ifNull 37db.users.aggregate([ 38 { 39 $project: { 40 displayName: { $ifNull: ['$nickname', '$firstName'] } 41 } 42 } 43]);
1// Text search (requires text index) 2db.articles.createIndex({ title: 'text', content: 'text' }); 3 4db.articles.aggregate([ 5 { $match: { $text: { $search: 'mongodb aggregation' } } }, 6 { $addFields: { score: { $meta: 'textScore' } } }, 7 { $sort: { score: -1 } }, 8 { $limit: 10 } 9]);
1// Multiple facets in one query 2db.products.aggregate([ 3 { 4 $facet: { 5 categoryCounts: [ 6 { $group: { _id: '$category', count: { $sum: 1 } } } 7 ], 8 priceBuckets: [ 9 { 10 $bucket: { 11 groupBy: '$price', 12 boundaries: [0, 50, 100, 200, 500], 13 default: '500+', 14 output: { count: { $sum: 1 } } 15 } 16 } 17 ], 18 topRated: [ 19 { $sort: { rating: -1 } }, 20 { $limit: 5 }, 21 { $project: { name: 1, rating: 1 } } 22 ] 23 } 24 } 25]); 26 27// $bucket for ranges 28db.employees.aggregate([ 29 { 30 $bucket: { 31 groupBy: '$salary', 32 boundaries: [30000, 50000, 75000, 100000, 150000], 33 default: 'Other', 34 output: { 35 count: { $sum: 1 }, 36 employees: { $push: '$name' } 37 } 38 } 39 } 40]);

Output Stages#

1// $out - Write to collection (replaces) 2db.orders.aggregate([ 3 { $match: { status: 'completed' } }, 4 { 5 $group: { 6 _id: '$customerId', 7 totalSpent: { $sum: '$total' } 8 } 9 }, 10 { $out: 'customer_spending' } 11]); 12 13// $merge - Upsert to collection 14db.dailySales.aggregate([ 15 { 16 $group: { 17 _id: { date: '$date', product: '$productId' }, 18 totalSold: { $sum: '$quantity' } 19 } 20 }, 21 { 22 $merge: { 23 into: 'salesSummary', 24 on: '_id', 25 whenMatched: 'replace', 26 whenNotMatched: 'insert' 27 } 28 } 29]);

Performance#

1// Use indexes with $match first 2db.orders.aggregate([ 3 { $match: { status: 'completed', date: { $gte: ISODate('2023-01-01') } } }, 4 { $group: { _id: '$customerId', total: { $sum: '$amount' } } } 5]); 6 7// Explain aggregation 8db.orders.explain('executionStats').aggregate([ 9 { $match: { status: 'completed' } }, 10 { $group: { _id: '$customerId', total: { $sum: 1 } } } 11]); 12 13// Use $project early to reduce document size 14db.largeCollection.aggregate([ 15 { $match: { active: true } }, 16 { $project: { name: 1, value: 1 } }, // Reduce fields early 17 { $group: { _id: '$category', total: { $sum: '$value' } } } 18]); 19 20// Allow disk use for large aggregations 21db.bigData.aggregate( 22 [{ $group: { _id: '$field', count: { $sum: 1 } } }], 23 { allowDiskUse: true } 24);

Best Practices#

Pipeline Order: ✓ $match early to filter ✓ $project early to reduce size ✓ $sort after $match for index use ✓ $limit after $sort Performance: ✓ Create indexes for $match fields ✓ Use $project to limit fields ✓ Avoid $unwind when possible ✓ Use allowDiskUse for large data Design: ✓ Break complex pipelines into steps ✓ Test each stage independently ✓ Use explain() to analyze ✓ Consider $merge for materialized views

Conclusion#

MongoDB's aggregation pipeline is powerful for data transformation and analysis. Structure pipelines with filtering first, use indexes effectively, and break complex operations into readable stages. For production, always analyze with explain() and consider creating materialized views for frequently-run aggregations.

Share this article

Help spread the word about Bootspring