全部产品
Search
文档中心

优化聚合与排序

更新时间: 2020-08-15

基本概念

聚合操作(Aggregate,简称Agg) 语义为按照 GROUP BY 指定列对输入数据进行聚合计算,或者不分组、对所有数据进行聚合计算。DRDS支持如下聚合函数:

  • COUNT
  • SUM
  • AVG
  • MAX
  • MIN
  • BIT_OR
  • BIT_XOR
  • GROUP_CONCAT

排序操作(Sort)语义为按照指定的ORDER BY列对输入进行排序。

本章节中提到的均为不下推的 Agg 或 Sort 的实现。如果已被下推到 LogicalView 中,则由 MySQL 来选择执行方式。

聚合(Agg)

聚合(Agg)主要有 HashAgg 和 SortAgg 两种实现。

HashAgg

HashAgg 利用哈希表实现聚合:

  1. 根据输入行的分组列的值,通过Hash找到对应的分组
  2. 按照指定的聚合函数,对该行进行聚合计算
  3. 重复以上步骤直到处理完所有的输入行,最后输出聚合结果
  1. > explain select count(*) from t1 join t2 on t1.id = t2.id group by t1.name,t2.name;
  2. Project(count(*)="count(*)")
  3. HashAgg(group="name,name0", count(*)="COUNT()")
  4. BKAJoin(condition="id = id", type="inner")
  5. Gather(concurrent=true)
  6. LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
  7. Gather(concurrent=true)
  8. LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

Explain 结果中,HashAgg 算子还包含以下关键信息:

  • group:表示 GROUP BY 字段,示例中为name,name0分别引用t1,t2表的name列,当存在相同别名会通过后缀数字区分 。
  • 聚合函数= 前为聚合函数对应的输出列名,其后为对应的计算方法。示例中 count(*)="COUNT()" ,第一个 count(*) 对应输出的列名,随后的 COUNT() 表示对其输入数据进行计数。

HashAgg对应可以通过Hint来关闭:/*+TDDL:cmd_extra(ENABLE_HASH_AGG=false)*/

SortAgg

SortAgg在输入数据已按分组列排序的情况,对各个分组依次完成聚合。

  1. 保证输入按指定的分组列排序(例如,可能会看到 MergeSort 或 MemSort)。
  2. 逐行读入输入数据,如果分组与当前分组相同,则对其进行聚合计算。
  3. 否则,如果分组与当前分组不同,则输出当前分组上的聚合结果。

相比 HashAgg,SortAgg 每次只要处理一个分组,内存消耗很小;相对的,HashAgg 需要把所有分组放在内存中,需要消耗较多的内存。

  1. > explain select count(*) from t1 join t2 on t1.id = t2.id group by t1.name,t2.name order by t1.name, t2.name;
  2. Project(count(*)="count(*)")
  3. MemSort(sort="name ASC,name0 ASC")
  4. HashAgg(group="name,name0", count(*)="COUNT()")
  5. BKAJoin(condition="id = id", type="inner")
  6. Gather(concurrent=true)
  7. LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
  8. Gather(concurrent=true)
  9. LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

SortAgg对应可以通过Hint来关闭:/*+TDDL:cmd_extra(ENABLE_SORT_AGG=false)*/

两阶段聚合优化

两阶段聚合,即通过将Agg拆分为部分聚合(Partial Agg)和最终聚合(Final Agg)的两个阶段,先对部分结果集做聚合,然后将这些部分聚合结果汇总,得到整体聚合的结果。

例如下面的 SQL 中,HashAgg 中拆分出的部分聚合(PartialAgg)会被下推至MySQL上的各个分表,而其中的AVG函数也被拆分成 SUMCOUNT 以实现两阶段的计算:

  1. > explain select avg(age) from t2 group by name
  2. Project(avg(age)="sum_pushed_sum / sum_pushed_count")
  3. HashAgg(group="name", sum_pushed_sum="SUM(pushed_sum)", sum_pushed_count="SUM(pushed_count)")
  4. Gather(concurrent=true)
  5. LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `name`, SUM(`age`) AS `pushed_sum`, COUNT(`age`) AS `pushed_count` FROM `t2` AS `t2` GROUP BY `name`")

两阶段聚合的优化能大大减少数据传输量、提高执行效率。

排序(Sort)

DRDS 中的排序算子主要包括 MemSort、TopN,以及 MergeSort。

MemSort

DRDS中的通用的排序实现为MemSort算子,即内存中运行快速排序(Quick Sort)算法。

下面是一个用到MemSort算子的例子:

  1. > explain select t1.name from t1 join t2 on t1.id = t2.id order by t1.name,t2.name;
  2. Project(name="name")
  3. MemSort(sort="name ASC,name0 ASC")
  4. Project(name="name", name0="name0")
  5. BKAJoin(condition="id = id", type="inner")
  6. Gather(concurrent=true)
  7. LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
  8. Gather(concurrent=true)
  9. LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

TopN

当SQL中 ORDER BY 和 LIMIT 一起出现时,Sort算子和Limit算子会合并成TopN算子。

TopN算子维护一个最大/最小堆,按照排序键的值,堆中始终保留最大/最小的N行数据。当处理完全部的输入数据时,堆中留下的N个行(或小于N个)就是需要的结果。

下面是一个用到 TopN 算子的例子:

  1. > explain select t1.name from t1 join t2 on t1.id = t2.id order by t1.name,t2.name limit 10;
  2. Project(name="name")
  3. TopN(sort="name ASC,name0 ASC", offset=0, fetch=?0)
  4. Project(name="name", name0="name0")
  5. BKAJoin(condition="id = id", type="inner")
  6. Gather(concurrent=true)
  7. LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
  8. Gather(concurrent=true)
  9. LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

MergeSort

通常,只要语义允许,SQL中的排序操作会被下推到MySQL上执行,而DRDS执行层只做最后的归并操作,即MergeSort。严格来说,MergeSort 不仅仅是排序,更是一种数据重分布算子(类似 Gather)。

下面的SQL是对t1表进行排序,经过DRDS查询优化器的优化,Sort算子被下推至各个MySQL分片中执行,最终只在上层做归并操作。

  1. > explain select name from t1 order by name;
  2. MergeSort(sort="name ASC")
  3. LogicalView(tables="t1", shardCount=2, sql="SELECT `name` FROM `t1` AS `t1` ORDER BY `name`")

相比 MemSort,MergeSort 算法可以减少DRDS层的内存消耗,并充分利用 MySQL 层的计算能力。

优化组合的例子

下面是一个组合优化的例子,在这个例子中,用到了以下优化规则:

  • Agg下推穿过Join
  • Join算法选择为SortMergeJoin
  • Agg算法选择为SortAgg
  • SortMergeJoin中需要的排序利用了SortAgg输出的有序
  • 两阶段Agg
  • Agg下推
  • Sort下推
  1. > explain select count(*) from t1 join t2 on t1.name = t2.name group by t1.name;
  2. Project(count(*)="count(*) * count(*)0")
  3. SortMergeJoin(condition="name = name", type="inner")
  4. SortAgg(group="name", count(*)="SUM(count(*))")
  5. MergeSort(sort="name ASC")
  6. LogicalView(tables="t1", shardCount=2, sql="SELECT `name`, COUNT(*) AS `count(*)` FROM `t1` AS `t1` GROUP BY `name` ORDER BY `name`")
  7. SortAgg(group="name", count(*)="SUM(count(*))")
  8. MergeSort(sort="name ASC")
  9. LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `name`, COUNT(*) AS `count(*)` FROM `t2` AS `t2` GROUP BY `name` ORDER BY `name`")