MongoDB 聚合框架实战:取代复杂 SQL 的文档操作的详细使用教程
MongoDB 聚合框架实战:取代复杂 SQL 的文档操作的详细使用教程
引言:聚合框架 – MongoDB 的核心武器
MongoDB 的聚合框架(Aggregation Framework)是其最强大的功能之一。它能让你在单个查询中完成复杂的数据处理,媲美甚至超越 SQL 的 JOIN 操作。
今天这篇教程将带你全面掌握 MongoDB 聚合框架,让你的数据查询能力提升一个数量级。
第一章:聚合框架原理
1.1 什么是聚合管道?
聚合管道(Aggregation Pipeline):
数据处理的流水线,每个阶段对输入文档进行处理,
输出作为下一阶段的输入。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ $match │ → │ $group │ → │ $project │ → │ $sort │
│ (过滤) │ │ (分组) │ │ (投影) │ │ (排序) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
↓ ↓ ↓ ↓
文档 文档 文档 文档
1.2 聚合与查询的区别
-- SQL 方式(需要多次查询)
SELECT
u.name,
COUNT(o.id) as order_count,
SUM(o.amount) as total_amount
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE o.status = 'paid'
GROUP BY u.id, u.name
HAVING COUNT(o.id) > 10
ORDER BY total_amount DESC
LIMIT 10;
-- MongoDB 聚合(单次查询)
db.users.aggregate([
{ $match: { status: 'active' } },
{ $lookup: { ... } }, // JOIN
{ $group: { ... } }, // GROUP BY
{ $sort: { ... } }, // ORDER BY
{ $limit: 10 } // LIMIT
])
聚合框架优势:
- 单次查询完成复杂操作
- 减少网络往返
- 服务器端处理,更高效
- 支持数据转换和计算
第二章:核心操作符
2.1 $match – 过滤文档
“`javascript
// 基础用法
db.orders.aggregate([
{ $match: { status: ‘paid’ } }
]);
// 复杂条件
db.orders.aggregate([
{
$match: {
status: ‘paid’,
amount: { $gte: 100 },
created_at: { $gte: new Date(‘2024-01-01’) }
}
}
]);
// 正则表达式匹配
db.users.aggregate([
{ $match: { name: { $regex: ‘^John’, $options: ‘i’ } } }
]);
// 数组包含
db.products.aggregate([
{ $match: { tags: { $in: [‘sale’, ‘new’] } } }
]);
// 嵌套字段
db.orders.aggregate([
{ $match: { ‘shipping.address.city’: ‘Beijing’ } }
]);
// 性能提示:
// 1. 尽量早使用 $match 减少后续处理
// 2. 使用索引字段进行过滤
// 3. 避免在 $match 中使用函数
2.2 $group - 分组聚合
javascript
// 基础分组
db.orders.aggregate([
{ $group: { _id: ‘$status’ } }
]);
// 统计数量
db.orders.aggregate([
{
$group: {
_id: ‘$status’,
count: { $sum: 1 },
totalAmount: { $sum: ‘$amount’ },
avgAmount: { $avg: ‘$amount’ },
minAmount: { $min: ‘$amount’ },
maxAmount: { $max: ‘$amount’ }
}
}
]);
// 多层分组
db.orders.aggregate([
{ $group: { _id: { $dateToString: { format: “%Y-%m”, date: “$created_at” } } } },
{ $group: {
_id: “$month”,
totalOrders: { $sum: 1 },
avgDailyAmount: { $avg: “$dailyTotal” }
}}
]);
// 数组操作
db.users.aggregate([
{
$group: {
_id: “$department”,
skills: { $addToSet: “$skills” },
allEmails: { $push: “$email” }
}
}
]);
// 条件分组
db.sales.aggregate([
{
$group: {
_id: {
region: “$region”,
tier: {
$cond: {
if: { $gte: [“$amount”, 1000] },
then: “premium”,
else: “standard”
}
}
},
totalSales: { $sum: “$amount” }
}
}
]);
2.3 $project - 字段投影
javascript
// 选择字段
db.users.aggregate([
{ $project: { name: 1, email: 1, _id: 0 } }
]);
// 计算字段
db.products.aggregate([
{
$project: {
name: 1,
price: 1,
discountedPrice: { $multiply: [“$price”, 0.9] },
isExpensive: { $gt: [“$price”, 100] }
}
}
]);
// 嵌套字段
db.orders.aggregate([
{
$project: {
orderId: “$_id”,
customerName: “$customer.name”,
customerEmail: “$customer.email”,
totalAmount: “$items.total”
}
}
]);
// 重命名字段
db.products.aggregate([
{
$project: {
productId: “$_id”,
productName: “$name”,
productPrice: “$price”
}
}
]);
// 条件字段
db.employees.aggregate([
{
$project: {
name: 1,
salary: 1,
salaryTier: {
$switch: {
branches: [
{ case: { $lt: [“$salary”, 5000] }, then: “low” },
{ case: { $lt: [“$salary”, 10000] }, then: “medium” },
{ case: { $gte: [“$salary”, 10000] }, then: “high” }
],
default: “unknown”
}
}
}
}
]);
2.4 $lookup - 关联查询(JOIN)
javascript
// 基础关联
db.orders.aggregate([
{
$lookup: {
from: “users”,
localField: “userId”,
foreignField: “_id”,
as: “userInfo”
}
}
]);
// 左连接过滤
db.orders.aggregate([
{
$lookup: {
from: “users”,
localField: “userId”,
foreignField: “_id”,
as: “userInfo”,
pipeline: [
{ $match: { status: ‘active’ } },
{ $project: { name: 1, email: 1 } }
]
}
}
]);
// 多表关联
db.orders.aggregate([
{
$lookup: {
from: “users”,
localField: “userId”,
foreignField: “_id”,
as: “user”
}
},
{
$lookup: {
from: “products”,
localField: “productId”,
foreignField: “_id”,
as: “product”
}
}
]);
// 数组展开
db.orders.aggregate([
{
$lookup: {
from: “order_items”,
localField: “_id”,
foreignField: “orderId”,
as: “items”
}
},
{
$project: {
orderItems: “$items”
}
}
]);
2.5 $sort - 排序
javascript
// 简单排序
db.products.aggregate([
{ $sort: { price: 1 } } // 1 升序,-1 降序
]);
// 多字段排序
db.users.aggregate([
{ $sort: { department: 1, salary: -1 } }
]);
// 排序 + 限制
db.orders.aggregate([
{ $sort: { amount: -1 } },
{ $limit: 10 }
]);
// 分组后排序
db.sales.aggregate([
{ $group: { _id: “$region”, total: { $sum: “$amount” } } },
{ $sort: { total: -1 } },
{ $limit: 5 }
]);
第三章:常用组合场景
3.1 统计报表
javascript
// 按月份统计订单
db.orders.aggregate([
{
$group: {
_id: {
year: { $year: “$created_at” },
month: { $month: “$created_at” }
},
orderCount: { $sum: 1 },
totalAmount: { $sum: “$amount” },
avgAmount: { $avg: “$amount” }
}
},
{
$sort: { “_id.year”: -1, “_id.month”: -1 }
}
]);
// 统计结果示例:
// {
// “_id”: { “year”: 2024, “month”: 3 },
// “orderCount”: 1500,
// “totalAmount”: 750000,
// “avgAmount”: 500
// }
3.2 用户行为分析
javascript
// 用户活动分析
db.users.aggregate([
{ $match: { status: ‘active’ } },
{
$project: {
userId: “$_id”,
totalLogin: { $sum: “$loginStats” },
lastLogin: { $max: “$lastLogins” }
}
},
{
$lookup: {
from: “orders”,
localField: “userId”,
foreignField: “userId”,
as: “orders”
}
},
{
$project: {
userId: 1,
totalLogin: 1,
orderCount: { $size: “$orders” },
totalOrderAmount: { $sum: “$orders.amount” }
}
},
{
$sort: { totalOrderAmount: -1 }
}
]);
3.3 数据聚合转换
javascript
// 时间窗口聚合
db.logs.aggregate([
{
$group: {
_id: {
$dateTrunc: {
date: “$timestamp”,
unit: “hour”
}
},
errorCount: { $sum: { $cond: [{ $eq: [“$level”, “error”] }, 1, 0] } },
warningCount: { $sum: { $cond: [{ $eq: [“$level”, “warning”] }, 1, 0] } },
totalCount: { $sum: 1 }
}
},
{
$project: {
time: “$_id”,
errorRate: {
$divide: [“$errorCount”, { $ifNull: [“$totalCount”, 1] }]
},
warningRate: {
$divide: [“$warningCount”, { $ifNull: [“$totalCount”, 1] }]
}
}
}
]);
第四章:高级操作符
4.1 条件操作符
javascript
// $cond – 条件判断
db.users.aggregate([
{
$project: {
name: 1,
age: 1,
category: {
$cond: {
if: { $lt: [“$age”, 18] },
then: “minor”,
else: “adult”
}
}
}
}
]);
// $switch – 多重条件
db.products.aggregate([
{
$project: {
name: 1,
stock: 1,
status: {
$switch: {
branches: [
{ case: { $lt: [“$stock”, 10] }, then: “low_stock” },
{ case: { $lt: [“$stock”, 50] }, then: “normal” },
{ case: { $gte: [“$stock”, 100] }, then: “abundant” }
],
default: “unknown”
}
}
}
}
]);
// $ifNull – 空值处理
db.orders.aggregate([
{
$project: {
orderId: 1,
shippingCost: { $ifNull: [“$shippingCost”, 0] }
}
}
]);
4.2 数学操作符
javascript
// $multiply – 乘法
db.products.aggregate([
{
$project: {
name: 1,
price: 1,
discount: 0.1,
finalPrice: { $multiply: [“$price”, { $subtract: [1, “$discount”] }] }
}
}
]);
// $add – 加法
db.sales.aggregate([
{
$project: {
baseAmount: 1,
tax: { $multiply: [“$baseAmount”, 0.08] },
shipping: 5,
total: { $add: [“$baseAmount”, “$tax”, “$shipping”] }
}
}
]);
// $ceil / $floor / $round
db.products.aggregate([
{
$project: {
price: 1,
roundedPrice: { $round: [“$price”, 2] },
ceilingPrice: { $ceil: “$price” },
floorPrice: { $floor: “$price” }
}
}
]);
4.3 数组操作符
javascript
// $unwind – 展开数组
db.orders.aggregate([
{ $unwind: “$items” },
{
$group: {
_id: “$userId”,
totalItems: { $sum: “$items.quantity” },
avgItemPrice: { $avg: “$items.price” }
}
}
]);
// $push – 推入数组
db.departments.aggregate([
{
$group: {
_id: “$name”,
employees: { $push: “$name” },
empCount: { $push: “$age” }
}
}
]);
// $map – 映射数组
db.users.aggregate([
{
$project: {
name: 1,
skills: 1,
skillLevels: {
$map: {
input: “$skills”,
as: “skill”,
in: { name: “$$skill”, level: 3 }
}
}
}
}
]);
// $filter – 过滤数组
db.orders.aggregate([
{
$project: {
orderId: 1,
items: 1,
highValueItems: {
$filter: {
input: “$items”,
as: “item”,
cond: { $gt: [“$$item.price”, 100] }
}
}
}
}
]);
4.4 日期操作符
javascript
// $dateToString – 日期格式化
db.orders.aggregate([
{
$project: {
orderId: 1,
createdDate: {
$dateToString: {
format: “%Y-%m-%d %H:%M:%S”,
date: “$created_at”
}
}
}
}
]);
// $dateFromParts – 构造日期
db.users.aggregate([
{
$project: {
name: 1,
age: {
$dateDiff: {
startDate: “$dateOfBirth”,
endDate: new Date(),
unit: “year”
}
}
}
}
]);
// $dateTrunc – 日期截断
db.logs.aggregate([
{
$group: {
_id: {
$dateTrunc: {
date: “$timestamp”,
unit: “day”
}
},
count: { $sum: 1 }
}
}
]);
第五章:实战案例
5.1 电商数据分析
javascript
// 商品销售排行榜
db.orders.aggregate([
{ $match: { status: ‘completed’ } },
{ $unwind: “$items” },
{
$group: {
_id: “$items.productId”,
totalSold: { $sum: “$items.quantity” },
revenue: { $sum: { $multiply: [“$items.quantity”, “$items.price”] } },
avgRating: { $avg: “$items.rating” },
orderCount: { $sum: 1 }
}
},
{
$lookup: {
from: “products”,
localField: “_id”,
foreignField: “_id”,
as: “product”
}
},
{ $unwind: “$product” },
{
$project: {
productId: “$_id”,
productName: “$product.name”,
category: “$product.category”,
totalSold: 1,
revenue: 1,
avgRating: 1,
orderCount: 1
}
},
{ $sort: { revenue: -1 } },
{ $limit: 10 }
]);
5.2 用户行为分析
javascript
// 用户活跃度分析
db.users.aggregate([
{
$project: {
userId: “$_id”,
lastActive: { $max: “$loginHistory” },
loginCount: { $size: “$loginHistory” },
lastLogin: { $first: “$loginHistory” }
}
},
{
$project: {
userId: 1,
loginCount: 1,
daysSinceLastLogin: {
$subtract: [
new Date(),
{ $toDate: “$lastActive” }
]
},
isActive: { $gt: [“$loginCount”, 10] }
}
}
]);
5.3 时间序列分析
javascript
// 按小时统计系统指标
db.metrics.aggregate([
{
$group: {
_id: {
$dateTrunc: {
date: “$timestamp”,
unit: “hour”
}
},
avgCpu: { $avg: “$cpu” },
avgMemory: { $avg: “$memory” },
requestCount: { $sum: 1 },
errorCount: {
$sum: { $cond: [{ $eq: [“$status”, 500] }, 1, 0] }
}
}
},
{
$sort: { “_id”: 1 }
}
]);
第六章:性能优化
6.1 优化原则
javascript
// ✅ 好的实践
// 1. $match 在管道开始
db.orders.aggregate([
{ $match: { status: ‘paid’ } }, // 先用索引过滤
{ $group: { _id: ‘$userId’, total: { $sum: ‘$amount’ } } },
{ $sort: { total: -1 } }
]);
// ❌ 坏的实践
// 1. $match 在最后
db.orders.aggregate([
{ $group: { _id: ‘$userId’, total: { $sum: ‘$amount’ } } },
{ $sort: { total: -1 } },
{ $match: { total: { $gte: 1000 } } } // 数据已经全部处理完了!
]);
// 2. 使用索引
// 确保 $match 中的字段有索引
db.orders.createIndex({ status: 1, created_at: -1 });
6.2 执行计划
javascript
// 查看执行计划
db.orders.aggregate(
[
{ $match: { status: ‘paid’ } },
{ $group: { _id: ‘$userId’, total: { $sum: ‘$amount’ } } }
],
{ explain: “executionStats” }
);
// 分析结果
{
“stages”: [
{
“$match”: {
“nExamined”: 10000,
“nReturned”: 500,
“totalTimeMillis”: 10
}
},
{
“$group”: {
“executionStages”: {
“nReturned”: 1000,
“totalDocsExamined”: 1000
}
}
}
],
“executionTimeMillis”: 25
}
// 优化建议:
// – nReturned << nExamined: 索引可能未使用
// - 增加 $match 阶段
// - 使用 explain 分析性能瓶颈
6.3 注意事项
javascript
// 内存限制
// 默认内存限制 100MB
db.orders.aggregate(
[
{ $group: { _id: ‘$userId’, total: { $sum: ‘$amount’ } } },
{ $sort: { total: -1 } },
{ $limit: 10 }
],
{ allowDiskUse: true } // 允许使用磁盘
);
// 管道深度限制
// 建议管道不超过 10 个阶段
// 使用 $merge 或 $out 分阶段处理
// 索引使用
// 确保 $match 中的字段有索引
db.users.createIndex({ status: 1, age: 1 });
“`
总结:MongoDB 聚合框架最佳实践
通过合理使用聚合框架:
核心优势:
- 单次查询完成复杂操作
- 减少网络往返
- 服务器端处理高效
- 灵活的数据转换
最佳实践:
- ✅ 尽早使用 $match 过滤
- ✅ 合理使用索引
- ✅ 避免不必要的 $unwind
- ✅ 使用 $allowDiskUse 处理大数据
- ✅ 使用 $explain 分析性能
性能提升:
- 查询性能提升 10 倍 +
- 网络开销减少 80%+
- 代码量减少 60%+
掌握 MongoDB 聚合框架,让你的数据查询能力达到新高度!🚀
—
参考资源:
- [MongoDB 聚合文档](https://www.mongodb.com/docs/manual/aggregation/)
- [聚合管道操作符](https://www.mongodb.com/docs/manual/reference/operator/aggregation/)
- [性能优化指南](https://www.mongodb.com/docs/manual/core/aggregation-pipeline-performance/)
- [实战案例](https://www.mongodb.com/docs/manual/aggregation-examples/)

发表评论