MongoDB's aggregation pipeline processes documents through stages. Here's how to use it effectively.
Basic Stages#
1// $match - Filter documents
2db.orders.aggregate([
3 { $match: { status: 'completed', total: { $gte: 100 } } }
4]);
5
6// $project - Shape output
7db.users.aggregate([
8 {
9 $project: {
10 fullName: { $concat: ['$firstName', ' ', '$lastName'] },
11 email: 1,
12 _id: 0
13 }
14 }
15]);
16
17// $sort - Order results
18db.products.aggregate([
19 { $sort: { price: -1, name: 1 } }
20]);
21
22// $limit and $skip - Pagination
23db.posts.aggregate([
24 { $sort: { createdAt: -1 } },
25 { $skip: 20 },
26 { $limit: 10 }
27]);
28
29// $count - Count documents
30db.orders.aggregate([
31 { $match: { status: 'pending' } },
32 { $count: 'pendingOrders' }
33]);Grouping and Accumulation#
1// Basic $group
2db.orders.aggregate([
3 {
4 $group: {
5 _id: '$customerId',
6 totalSpent: { $sum: '$total' },
7 orderCount: { $sum: 1 },
8 avgOrder: { $avg: '$total' }
9 }
10 }
11]);
12
13// Group by multiple fields
14db.sales.aggregate([
15 {
16 $group: {
17 _id: {
18 year: { $year: '$date' },
19 month: { $month: '$date' }
20 },
21 revenue: { $sum: '$amount' },
22 transactions: { $sum: 1 }
23 }
24 },
25 { $sort: { '_id.year': 1, '_id.month': 1 } }
26]);
27
28// Accumulator operators
29db.products.aggregate([
30 {
31 $group: {
32 _id: '$category',
33 products: { $push: '$name' }, // Array of all names
34 uniqueBrands: { $addToSet: '$brand' }, // Unique brands
35 cheapest: { $min: '$price' },
36 expensive: { $max: '$price' },
37 avgPrice: { $avg: '$price' },
38 firstAdded: { $first: '$createdAt' },
39 lastAdded: { $last: '$createdAt' }
40 }
41 }
42]);
43
44// $push with object
45db.orders.aggregate([
46 {
47 $group: {
48 _id: '$customerId',
49 orders: {
50 $push: {
51 orderId: '$_id',
52 total: '$total',
53 date: '$createdAt'
54 }
55 }
56 }
57 }
58]);Lookups (Joins)#
1// Basic $lookup
2db.orders.aggregate([
3 {
4 $lookup: {
5 from: 'customers',
6 localField: 'customerId',
7 foreignField: '_id',
8 as: 'customer'
9 }
10 },
11 { $unwind: '$customer' } // Convert array to object
12]);
13
14// Pipeline lookup (more flexible)
15db.orders.aggregate([
16 {
17 $lookup: {
18 from: 'products',
19 let: { orderItems: '$items' },
20 pipeline: [
21 {
22 $match: {
23 $expr: { $in: ['$_id', '$$orderItems.productId'] }
24 }
25 },
26 { $project: { name: 1, price: 1 } }
27 ],
28 as: 'productDetails'
29 }
30 }
31]);
32
33// Multiple lookups
34db.orders.aggregate([
35 {
36 $lookup: {
37 from: 'customers',
38 localField: 'customerId',
39 foreignField: '_id',
40 as: 'customer'
41 }
42 },
43 { $unwind: '$customer' },
44 {
45 $lookup: {
46 from: 'products',
47 localField: 'items.productId',
48 foreignField: '_id',
49 as: 'products'
50 }
51 }
52]);Array Operations#
1// $unwind - Flatten arrays
2db.orders.aggregate([
3 { $unwind: '$items' },
4 {
5 $group: {
6 _id: '$items.productId',
7 totalQuantity: { $sum: '$items.quantity' }
8 }
9 }
10]);
11
12// $unwind with preserveNullAndEmptyArrays
13db.users.aggregate([
14 {
15 $unwind: {
16 path: '$addresses',
17 preserveNullAndEmptyArrays: true
18 }
19 }
20]);
21
22// Array expressions
23db.users.aggregate([
24 {
25 $project: {
26 name: 1,
27 primaryEmail: { $arrayElemAt: ['$emails', 0] },
28 emailCount: { $size: '$emails' },
29 hasVerified: { $in: [true, '$emails.verified'] }
30 }
31 }
32]);
33
34// $filter array elements
35db.orders.aggregate([
36 {
37 $project: {
38 expensiveItems: {
39 $filter: {
40 input: '$items',
41 as: 'item',
42 cond: { $gte: ['$$item.price', 100] }
43 }
44 }
45 }
46 }
47]);
48
49// $map array transformation
50db.products.aggregate([
51 {
52 $project: {
53 name: 1,
54 discountedPrices: {
55 $map: {
56 input: '$variants',
57 as: 'variant',
58 in: {
59 size: '$$variant.size',
60 salePrice: { $multiply: ['$$variant.price', 0.9] }
61 }
62 }
63 }
64 }
65 }
66]);
67
68// $reduce
69db.orders.aggregate([
70 {
71 $project: {
72 orderTotal: {
73 $reduce: {
74 input: '$items',
75 initialValue: 0,
76 in: {
77 $add: [
78 '$$value',
79 { $multiply: ['$$this.price', '$$this.quantity'] }
80 ]
81 }
82 }
83 }
84 }
85 }
86]);Date Operations#
1// Extract date parts
2db.orders.aggregate([
3 {
4 $project: {
5 year: { $year: '$createdAt' },
6 month: { $month: '$createdAt' },
7 day: { $dayOfMonth: '$createdAt' },
8 hour: { $hour: '$createdAt' },
9 dayOfWeek: { $dayOfWeek: '$createdAt' }
10 }
11 }
12]);
13
14// Date formatting
15db.events.aggregate([
16 {
17 $project: {
18 formattedDate: {
19 $dateToString: {
20 format: '%Y-%m-%d %H:%M',
21 date: '$timestamp',
22 timezone: 'America/New_York'
23 }
24 }
25 }
26 }
27]);
28
29// Date calculations
30db.subscriptions.aggregate([
31 {
32 $project: {
33 daysRemaining: {
34 $dateDiff: {
35 startDate: '$$NOW',
36 endDate: '$expiresAt',
37 unit: 'day'
38 }
39 }
40 }
41 }
42]);
43
44// Group by date period
45db.sales.aggregate([
46 {
47 $group: {
48 _id: {
49 $dateToString: { format: '%Y-%m-%d', date: '$date' }
50 },
51 dailyTotal: { $sum: '$amount' }
52 }
53 },
54 { $sort: { _id: 1 } }
55]);Conditional Logic#
1// $cond (if-then-else)
2db.products.aggregate([
3 {
4 $project: {
5 name: 1,
6 priceCategory: {
7 $cond: {
8 if: { $gte: ['$price', 100] },
9 then: 'premium',
10 else: 'standard'
11 }
12 }
13 }
14 }
15]);
16
17// $switch (multiple conditions)
18db.orders.aggregate([
19 {
20 $project: {
21 status: 1,
22 priority: {
23 $switch: {
24 branches: [
25 { case: { $eq: ['$status', 'urgent'] }, then: 1 },
26 { case: { $eq: ['$status', 'high'] }, then: 2 },
27 { case: { $eq: ['$status', 'normal'] }, then: 3 }
28 ],
29 default: 4
30 }
31 }
32 }
33 }
34]);
35
36// $ifNull
37db.users.aggregate([
38 {
39 $project: {
40 displayName: { $ifNull: ['$nickname', '$firstName'] }
41 }
42 }
43]);Text Search#
1// Text search (requires text index)
2db.articles.createIndex({ title: 'text', content: 'text' });
3
4db.articles.aggregate([
5 { $match: { $text: { $search: 'mongodb aggregation' } } },
6 { $addFields: { score: { $meta: 'textScore' } } },
7 { $sort: { score: -1 } },
8 { $limit: 10 }
9]);Faceted Search#
1// Multiple facets in one query
2db.products.aggregate([
3 {
4 $facet: {
5 categoryCounts: [
6 { $group: { _id: '$category', count: { $sum: 1 } } }
7 ],
8 priceBuckets: [
9 {
10 $bucket: {
11 groupBy: '$price',
12 boundaries: [0, 50, 100, 200, 500],
13 default: '500+',
14 output: { count: { $sum: 1 } }
15 }
16 }
17 ],
18 topRated: [
19 { $sort: { rating: -1 } },
20 { $limit: 5 },
21 { $project: { name: 1, rating: 1 } }
22 ]
23 }
24 }
25]);
26
27// $bucket for ranges
28db.employees.aggregate([
29 {
30 $bucket: {
31 groupBy: '$salary',
32 boundaries: [30000, 50000, 75000, 100000, 150000],
33 default: 'Other',
34 output: {
35 count: { $sum: 1 },
36 employees: { $push: '$name' }
37 }
38 }
39 }
40]);Output Stages#
1// $out - Write to collection (replaces)
2db.orders.aggregate([
3 { $match: { status: 'completed' } },
4 {
5 $group: {
6 _id: '$customerId',
7 totalSpent: { $sum: '$total' }
8 }
9 },
10 { $out: 'customer_spending' }
11]);
12
13// $merge - Upsert to collection
14db.dailySales.aggregate([
15 {
16 $group: {
17 _id: { date: '$date', product: '$productId' },
18 totalSold: { $sum: '$quantity' }
19 }
20 },
21 {
22 $merge: {
23 into: 'salesSummary',
24 on: '_id',
25 whenMatched: 'replace',
26 whenNotMatched: 'insert'
27 }
28 }
29]);Performance#
1// Use indexes with $match first
2db.orders.aggregate([
3 { $match: { status: 'completed', date: { $gte: ISODate('2023-01-01') } } },
4 { $group: { _id: '$customerId', total: { $sum: '$amount' } } }
5]);
6
7// Explain aggregation
8db.orders.explain('executionStats').aggregate([
9 { $match: { status: 'completed' } },
10 { $group: { _id: '$customerId', total: { $sum: 1 } } }
11]);
12
13// Use $project early to reduce document size
14db.largeCollection.aggregate([
15 { $match: { active: true } },
16 { $project: { name: 1, value: 1 } }, // Reduce fields early
17 { $group: { _id: '$category', total: { $sum: '$value' } } }
18]);
19
20// Allow disk use for large aggregations
21db.bigData.aggregate(
22 [{ $group: { _id: '$field', count: { $sum: 1 } } }],
23 { allowDiskUse: true }
24);Best Practices#
Pipeline Order:
✓ $match early to filter
✓ $project early to reduce size
✓ $sort after $match for index use
✓ $limit after $sort
Performance:
✓ Create indexes for $match fields
✓ Use $project to limit fields
✓ Avoid $unwind when possible
✓ Use allowDiskUse for large data
Design:
✓ Break complex pipelines into steps
✓ Test each stage independently
✓ Use explain() to analyze
✓ Consider $merge for materialized views
Conclusion#
MongoDB's aggregation pipeline is powerful for data transformation and analysis. Structure pipelines with filtering first, use indexes effectively, and break complex operations into readable stages. For production, always analyze with explain() and consider creating materialized views for frequently-run aggregations.