MongoDB 聚合

MongoDB 的聚合操作

聚合操作处理数据记录并返回计算结果。聚合操作将来自多个 document 的值分组,并可以对分组的数据执行各种操作以返回单个结果。 MongoDB 提供了三种执行聚合的方式:聚合管道,map-reduce 函数和单一目的聚合方法。

Pipeline

Pipeline 简介

MongoDB 的聚合框架以数据处理管道(Pipeline)的概念为模型。

MongoDB 通过 db.collection.aggregate() 方法支持聚合操作。并提供了 aggregate 命令来执行 pipeline。

MongoDB Pipeline 由多个阶段(stages)组成。每个阶段在 document 通过 pipeline 时都会对其进行转换。pipeline 阶段不需要为每个输入 document 都生成一个输出 document。例如,某些阶段可能会生成新 document 或过滤 document。

同一个阶段可以在 pipeline 中出现多次,但 $out$merge,和 $geoNear 阶段除外。所有可用 pipeline 阶段可以参考:Aggregation Pipeline Stages

img

  • 第一阶段:$match 阶段按状态字段过滤 document,然后将状态等于 A 的那些 document 传递到下一阶段。
  • 第二阶段:$group 阶段按 cust_id 字段对 document 进行分组,以计算每个唯一 cust_id 的金额总和。

最基本的管道阶段提供过滤器,其操作类似于查询和 document 转换(修改输出 document 形式)。

其他管道操作提供了用于按特定字段对 document 进行分组和排序的工具,以及用于汇总数组(包括 document 数组)内容的工具。另外,管道阶段可以将运算符用于诸如计算平均值或连接字符串之类的任务。

聚合管道也可以在分片 collection 上操作。

Pipeline 优化

投影优化

Pipeline 可以确定是否仅需要 document 中必填字段即可获得结果。

Pipeline 串行优化

$project$unset$addFields$set) + $match 串行优化

对于包含投影阶段($project$unset$addFields$set),且后续跟随着 $match 阶段的 Pipeline ,MongoDB 会将所有 $match 阶段中不需要在投影阶段中计算出的值的过滤器,移动一个在投影阶段之前的新 $match 阶段。

如果 Pipeline 包含多个投影阶段 和 / 或 $match 阶段,则 MongoDB 将为每个 $match 阶段执行此优化,将每个 $match 过滤器移动到该过滤器不依赖的所有投影阶段之前。

【示例】Pipeline 串行优化示例

优化前:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{ $addFields: {
maxTime: { $max: "$times" },
minTime: { $min: "$times" }
} },
{ $project: {
_id: 1, name: 1, times: 1, maxTime: 1, minTime: 1,
avgTime: { $avg: ["$maxTime", "$minTime"] }
} },
{ $match: {
name: "Joe Schmoe",
maxTime: { $lt: 20 },
minTime: { $gt: 5 },
avgTime: { $gt: 7 }
} }

优化后:

1
2
3
4
5
6
7
8
9
10
11
{ $match: { name: "Joe Schmoe" } },
{ $addFields: {
maxTime: { $max: "$times" },
minTime: { $min: "$times" }
} },
{ $match: { maxTime: { $lt: 20 }, minTime: { $gt: 5 } } },
{ $project: {
_id: 1, name: 1, times: 1, maxTime: 1, minTime: 1,
avgTime: { $avg: ["$maxTime", "$minTime"] }
} },
{ $match: { avgTime: { $gt: 7 } } }

说明:

{ name: "Joe Schmoe" } 不需要计算任何投影阶段的值,所以可以放在最前面。

{ avgTime: { $gt: 7 } } 依赖 $project 阶段的 avgTime 字段,所以不能移动。

maxTimeminTime 字段被 $addFields 阶段所依赖,但自身不依赖其他,所以会新建一个 $match 阶段,并将其置于 $project 阶段之前。

Pipeline 并行优化

如果可能,优化阶段会将 Pipeline 阶段合并到其前身。通常,合并发生在任意序列重新排序优化之后。

$sort + $limit

$sort$limit 之前时,如果没有中间阶段修改文档数量(例如 $unwind$group),则优化程序可以将 $limit 合并到 $sort 中。如果有管道阶段更改了 $sort$limit 阶段之间的文档数,则 MongoDB 不会将 $limit 合并到 $sort 中。

【示例】$sort + $limit

优化前:

1
2
3
{ $sort : { age : -1 } },
{ $project : { age : 1, status : 1, name : 1 } },
{ $limit: 5 }

优化后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"$sort" : {
"sortKey" : {
"age" : -1
},
"limit" : NumberLong(5)
}
},
{ "$project" : {
"age" : 1,
"status" : 1,
"name" : 1
}
}
$limit + $limit

如果一个 $limit 紧随另一个 $limit,那么它们可以合并为一。

优化前:

1
2
{ $limit: 100 },
{ $limit: 10 }

优化后:

1
2
3
{
$limit: 10
}
$skip + $skip

如果一个 $skip 紧随另一个 $skip,那么它们可以合并为一。

优化前:

1
2
{ $skip: 5 },
{ $skip: 2 }

优化后:

1
2
3
{
$skip: 7
}
$match + $match

如果一个 $skip 紧随另一个 $skip ,那么它们可以通过 $and 合并为一。

优化前:

1
2
{ $match: { year: 2014 } },
{ $match: { status: "A" } }

优化后:

1
2
3
4
5
{
$match: {
$and: [{ year: 2014 }, { status: 'A' }]
}
}
$lookup + $unwind

如果一个 $unwind 紧随另一个 $lookup,并且 $unwind$lookup 的 as 字段上运行时,优化程序可以将 $unwind 合并到 $lookup 阶段。这样可以避免创建较大的中间文档。

优化前:

1
2
3
4
5
6
7
8
9
{
$lookup: {
from: "otherCollection",
as: "resultingArray",
localField: "x",
foreignField: "y"
}
},
{ $unwind: "$resultingArray"}

优化后:

1
2
3
4
5
6
7
8
9
{
$lookup: {
from: "otherCollection",
as: "resultingArray",
localField: "x",
foreignField: "y",
unwinding: { preserveNullAndEmptyArrays: false }
}
}

Pipeline 限制

结果集中的每个文档均受 BSON 文档大小限制(当前为 16 MB)

Pipeline 的内存限制为 100 MB。

Map-Reduce

聚合 pipeline 比 map-reduce 提供更好的性能和更一致的接口。

Map-reduce 是一种数据处理范式,用于将大量数据汇总为有用的聚合结果。为了执行 map-reduce 操作,MongoDB 提供了 mapReduce 数据库命令。

img

在上面的操作中,MongoDB 将 map 阶段应用于每个输入 document(即 collection 中与查询条件匹配的 document)。 map 函数分发出多个键-值对。对于具有多个值的那些键,MongoDB 应用 reduce 阶段,该阶段收集并汇总聚合的数据。然后,MongoDB 将结果存储在 collection 中。可选地,reduce 函数的输出可以通过 finalize 函数来进一步汇总聚合结果。

MongoDB 中的所有 map-reduce 函数都是 JavaScript,并在 mongod 进程中运行。 Map-reduce 操作将单个 collection 的 document 作为输入,并且可以在开始 map 阶段之前执行任意排序和限制。 mapReduce 可以将 map-reduce 操作的结果作为 document 返回,也可以将结果写入 collection。

单一目的聚合方法

MongoDB 支持一下单一目的的聚合操作:

  • db.collection.estimatedDocumentCount()
  • db.collection.count()
  • db.collection.distinct()

所有这些操作都汇总了单个 collection 中的 document。尽管这些操作提供了对常见聚合过程的简单访问,但是它们相比聚合 pipeline 和 map-reduce,缺少灵活性和丰富的功能性。

img

SQL 和 MongoDB 聚合对比

MongoDB pipeline 提供了许多等价于 SQL 中常见聚合语句的操作。

下表概述了常见的 SQL 聚合语句或函数和 MongoDB 聚合操作的映射表:

SQL Terms, Functions, and Concepts MongoDB Aggregation Operators
WHERE $match
GROUP BY $group
HAVING $match
SELECT $project
ORDER BY $sort
LIMIT $limit
SUM() $sum
COUNT() $sum$sortByCount
JOIN $lookup
SELECT INTO NEW_TABLE $out
MERGE INTO TABLE $merge (Available starting in MongoDB 4.2)
UNION ALL $unionWith (Available starting in MongoDB 4.4)

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
db.orders.insertMany([
{
_id: 1,
cust_id: 'Ant O. Knee',
ord_date: new Date('2020-03-01'),
price: 25,
items: [
{ sku: 'oranges', qty: 5, price: 2.5 },
{ sku: 'apples', qty: 5, price: 2.5 },
],
status: 'A',
},
{
_id: 2,
cust_id: 'Ant O. Knee',
ord_date: new Date('2020-03-08'),
price: 70,
items: [
{ sku: 'oranges', qty: 8, price: 2.5 },
{ sku: 'chocolates', qty: 5, price: 10 },
],
status: 'A',
},
{
_id: 3,
cust_id: 'Busby Bee',
ord_date: new Date('2020-03-08'),
price: 50,
items: [
{ sku: 'oranges', qty: 10, price: 2.5 },
{ sku: 'pears', qty: 10, price: 2.5 },
],
status: 'A',
},
{
_id: 4,
cust_id: 'Busby Bee',
ord_date: new Date('2020-03-18'),
price: 25,
items: [{ sku: 'oranges', qty: 10, price: 2.5 }],
status: 'A',
},
{
_id: 5,
cust_id: 'Busby Bee',
ord_date: new Date('2020-03-19'),
price: 50,
items: [{ sku: 'chocolates', qty: 5, price: 10 }],
status: 'A',
},
{
_id: 6,
cust_id: 'Cam Elot',
ord_date: new Date('2020-03-19'),
price: 35,
items: [
{ sku: 'carrots', qty: 10, price: 1.0 },
{ sku: 'apples', qty: 10, price: 2.5 },
],
status: 'A',
},
{
_id: 7,
cust_id: 'Cam Elot',
ord_date: new Date('2020-03-20'),
price: 25,
items: [{ sku: 'oranges', qty: 10, price: 2.5 }],
status: 'A',
},
{
_id: 8,
cust_id: 'Don Quis',
ord_date: new Date('2020-03-20'),
price: 75,
items: [
{ sku: 'chocolates', qty: 5, price: 10 },
{ sku: 'apples', qty: 10, price: 2.5 },
],
status: 'A',
},
{
_id: 9,
cust_id: 'Don Quis',
ord_date: new Date('2020-03-20'),
price: 55,
items: [
{ sku: 'carrots', qty: 5, price: 1.0 },
{ sku: 'apples', qty: 10, price: 2.5 },
{ sku: 'oranges', qty: 10, price: 2.5 },
],
status: 'A',
},
{
_id: 10,
cust_id: 'Don Quis',
ord_date: new Date('2020-03-23'),
price: 25,
items: [{ sku: 'oranges', qty: 10, price: 2.5 }],
status: 'A',
},
])

SQL 和 MongoDB 聚合方式对比:

img

MongoDB 聚合操作

MongoDB 中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果。有点类似 sql 语句中的 count(*)。

管道

整个聚合运算过程称为管道,它是由多个步骤组成,每个管道

  • 接受一系列文档(原始数据);
  • 每个步骤对这些文档进行一系列运算;
  • 结果文档输出给下一个步骤;

聚合操作的基本格式

1
2
3
pipeline = [$stage1, $stage1, ..., $stageN];

db.<集合>.aggregate(pipeline, {options});

聚合步骤

步骤 作用 SQL 等价运算符
$match 过滤 WHERE
$project 投影 AS
$sort 排序 ORDER BY
$group 分组 GROUP BY
$skip / $limit 结果限制 SKIP / LIMIT
$lookup 左外连接 LEFT OUTER JOIN
$unwind 展开数组 N/A
$graphLookup 图搜索 N/A
$facet / $bucket 分面搜索 N/A

【示例】

1
2
3
4
5
> db.collection.insertMany([{"title":"MongoDB Overview","description":"MongoDB is no sql database","by_user":"collection","tagsr":["mongodb","database","NoSQL"],"likes":"100"},{"title":"NoSQL Overview","description":"No sql database is very fast","by_user":"collection","tagsr":["mongodb","database","NoSQL"],"likes":"10"},{"title":"Neo4j Overview","description":"Neo4j is no sql database","by_user":"Neo4j","tagsr":["neo4j","database","NoSQL"],"likes":"750"}])
> db.collection.aggregate([{$group : {_id : "$by_user", num_tutorial : {$sum : 1}}}])
{ "_id" : null, "num_tutorial" : 3 }
{ "_id" : "Neo4j", "num_tutorial" : 1 }
{ "_id" : "collection", "num_tutorial" : 2 }

下表展示了一些聚合的表达式:

表达式 描述 实例
$sum 计算总和。 db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$sum : "$likes"}}}])
$avg 计算平均值 db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$avg : "$likes"}}}])
$min 获取集合中所有文档对应值得最小值。 db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$min : "$likes"}}}])
$max 获取集合中所有文档对应值得最大值。 db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$max : "$likes"}}}])
$push 在结果文档中插入值到一个数组中。 db.mycol.aggregate([{$group : {_id : "$by_user", url : {$push: "$url"}}}])
$addToSet 在结果文档中插入值到一个数组中,但不创建副本。 db.mycol.aggregate([{$group : {_id : "$by_user", url : {$addToSet : "$url"}}}])
$first 根据资源文档的排序获取第一个文档数据。 db.mycol.aggregate([{$group : {_id : "$by_user", first_url : {$first : "$url"}}}])
$last 根据资源文档的排序获取最后一个文档数据 db.mycol.aggregate([{$group : {_id : "$by_user", last_url : {$last : "$url"}}}])

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
db.runCommand({  
mapreduce:<collection>, //需要进行处理的集合名
map:<mapfunction>, //映射函数(分组)
reduce:<reducefunction>, //统计函数
[,query:<query filter object>] //,在发往map函数之前,对文档进行过滤
[,sort:<sorts the input objects using this key.Useful for optimization,like sorting by the emit key for fewer reduces>] //在发往map函数之前,对文档进行排序
[,limit:<number of objects to return from collection>] //限制发往map函数的文档数量
[,out:<see output options below>] //新建集合,用于存放统计结果
[,keeptemp:<true|false>] //是否保存统计结果为临时集合
[,finalize:<finalizefunction>] //最终处理函数,对reduce返回结果(存入out之前)进行最终处理
[,scope:<object where fields go into javascript global scope>] //向map、reduce、finalize导入外部变量
[,verbose:true] //详细的统计信息,用于调试
});

map函数(分组) ,调用emit(key, values),遍历collection中所有的记录,其中emit中的key是分组依据,values是分组后需要保留的数据;key对应最后结果集中的_id。

reduce函数(聚合) ,接收map函数的key,values作为参数,将key-values变成key-value,将values数组变成一个个单一的value。当key-values中的values数组过大时,会再被切分成多个小的key-values,再对这些小的key-values分别执行reduce,再将多个块的结构组合成一个新的数组,作为redus函数的第二个参数,继续reduce操作。

group,aggegate,MapReduce比较

group aggregate MapReduce
是否使用JavaScript引擎 是,定制reduce函数 是,不能编写自定义函数
返回结果集保存位置 内联,结果必须符合BSON文档限制 内联,服务器支持最大文档大小(16Mb) 内联,新集合,合并,替换,减少
处理数据大小 小于10000 操作在内存中完成,有内存大小的限制, 大型数据集,超过20000的独立分组
处理性能 低于aggregate 较高,管道可以重复使用 低于aggregate
灵活度 低于mapReduce 低于MapReduce 较高,能使用JavaScript