This topic describes how the optimizer and the executor process the Group-by and Order-by operators to reduce the amount of transferred data and improve execution efficiency.

Basic concept

An Aggregate (Agg) operation aggregates input data by the columns that are specified by GROUP BY, or aggregates all the data if no grouping is specified. DRDS supports the following aggregate functions:

  • COUNT
  • SUM
  • AVG
  • MAX
  • MIN
  • BIT_OR
  • BIT_XOR
  • GROUP_CONCAT

The semantics of a Sort operation is to sort input data by using the columns that are specified by ORDER BY.

The implementations described in this topic are those of the Agg and Sort operators that are executed at the DRDS layer. If an Agg or Sort operator has been pushed down to LogicalView, the storage layer MySQL selects the execution method.

Agg

Agg is implemented by two main operators: HashAgg and SortAgg.

HashAgg

HashAgg performs the following steps to use a hash table for aggregation:

  1. Computes a hash of the grouping column values of the input row to find the corresponding group.
  2. Aggregates the row by using the specified aggregate function.
  3. Repeats the preceding steps until all the input rows are processed. Then, HashAgg generates the aggregation result.
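
The three steps above can be sketched in a few lines of Python. This is only an illustration of the technique, not the DRDS implementation: the function name `hash_agg_count` and the sample rows are hypothetical, and a Python dictionary stands in for the hash table.

```python
from collections import defaultdict

def hash_agg_count(rows, group_cols):
    """Count rows per group with a hash table, like COUNT(*) ... GROUP BY."""
    groups = defaultdict(int)  # hash table: group key -> running count
    for row in rows:
        # Step 1: hash the grouping column values to locate the group.
        key = tuple(row[col] for col in group_cols)
        # Step 2: fold the row into the aggregate state of that group.
        groups[key] += 1
    # Step 3: all input rows are processed, so emit one result per group.
    return dict(groups)

# Hypothetical join output with the two grouping columns from the example.
rows = [
    {"name": "a", "name0": "x"},
    {"name": "a", "name0": "x"},
    {"name": "b", "name0": "y"},
]
result = hash_agg_count(rows, ["name", "name0"])
print(result)  # {('a', 'x'): 2, ('b', 'y'): 1}
```

Note that the dictionary holds an entry for every group until the input is exhausted, which is why this approach needs memory proportional to the number of groups.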

The HashAgg operator is used in the following example:

> explain select count(*) from t1 join t2 on t1.id = t2.id group by t1.name,t2.name;

Project(count(*)="count(*)")
  HashAgg(group="name,name0", count(*)="COUNT()")
    BKAJoin(condition="id = id", type="inner")
      Gather(concurrent=true)
        LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
      Gather(concurrent=true)
        LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

In the result of the Explain statement, the HashAgg operator also contains the following key information:

  • group: indicates the GROUP BY columns. In this example, name in name,name0 refers to the name column of the t1 table, and name0 refers to the name column of the t2 table. Duplicate column names are suffixed with a number to distinguish them.
  • Aggregate function: The output column name precedes the equal sign (=), and the corresponding calculation method follows it. In count(*)="COUNT()" of this example, count(*) specifies the output column name and COUNT() counts the input rows.

You can disable HashAgg by using the following hint: /*+TDDL:cmd_extra(ENABLE_HASH_AGG=false)*/.

SortAgg

SortAgg aggregates each group in sequence after input data has been sorted by the grouping column.

  1. SortAgg ensures that the input data is sorted by the grouping columns. For example, a MergeSort or MemSort operator may appear below SortAgg in the execution plan.
  2. SortAgg reads the input data row by row. If a row belongs to the same group as the current group, the row is aggregated into that group.
  3. If a row belongs to a different group, SortAgg generates the aggregation result of the current group and starts a new group with that row.

Compared with HashAgg, SortAgg keeps the state of only one group in memory at a time and therefore consumes less memory. By contrast, HashAgg must keep all the groups in memory and consumes more memory.
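
The streaming behavior described above can be sketched as follows. This is a minimal illustration, assuming the input is already sorted by the grouping columns; the generator name `sort_agg_count` and the sample data are hypothetical, and Python's `sorted` merely stands in for an upstream MemSort or MergeSort.

```python
def sort_agg_count(sorted_rows, group_cols):
    """Yield (group_key, count) from rows already sorted by group_cols."""
    current_key = None
    count = 0
    for row in sorted_rows:
        key = tuple(row[col] for col in group_cols)
        if count > 0 and key != current_key:
            # The group changed: emit the finished group and start a new one.
            yield current_key, count
            count = 0
        current_key = key
        count += 1
    if count > 0:
        # Emit the last group once the input is exhausted.
        yield current_key, count

rows = [{"name": "b"}, {"name": "a"}, {"name": "a"}]
# Sorting here plays the role of the sort operator below SortAgg.
ordered = sorted(rows, key=lambda r: r["name"])
result = list(sort_agg_count(ordered, ["name"]))
print(result)  # [(('a',), 2), (('b',), 1)]
```

Only `current_key` and `count` are held between rows, which reflects why this approach needs the state of just one group at a time.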

> explain select count(*) from t1 join t2 on t1.id = t2.id group by t1.name,t2.name order by t1.name, t2.name;

Project(count(*)="count(*)")
  MemSort(sort="name ASC,name0 ASC")
    HashAgg(group="name,name0", count(*)="COUNT()")
      BKAJoin(condition="id = id", type="inner")
        Gather(concurrent=true)
          LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
        Gather(concurrent=true)
          LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

You can disable SortAgg by using the following hint: /*+TDDL:cmd_extra(ENABLE_SORT_AGG=false)*/.

Two-phase aggregation optimization

Two-phase aggregation refers to the process in which Agg is split into Partial Agg and Final Agg phases. Partial result sets are aggregated into a partial aggregation result first and then all the partial aggregation results are aggregated to obtain the overall aggregation result.

In the following SQL statement, the Partial Agg that is split from HashAgg is pushed down to each table shard in MySQL. In the statement, the AVG function is also split into the SUM and COUNT functions so that it can be computed in two phases:

> explain select avg(age) from t2 group by name

Project(avg(age)="sum_pushed_sum / sum_pushed_count")
  HashAgg(group="name", sum_pushed_sum="SUM(pushed_sum)", sum_pushed_count="SUM(pushed_count)")
    Gather(concurrent=true)
      LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `name`, SUM(`age`) AS `pushed_sum`, COUNT(`age`) AS `pushed_count` FROM `t2` AS `t2` GROUP BY `name`")

Two-phase aggregation reduces the amount of transferred data because each shard returns one partially aggregated row per group instead of all of its rows. This makes the execution more efficient.
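
The split of AVG into SUM and COUNT can be sketched as follows. This is an illustration under stated assumptions, not the DRDS code: the function names `partial_agg` and `final_agg` and the shard contents are hypothetical, and all age values are assumed to be non-NULL.

```python
from collections import defaultdict

def partial_agg(shard_rows):
    """Partial phase, run per shard: SUM(age) and COUNT(age) grouped by name."""
    state = defaultdict(lambda: [0, 0])  # name -> [pushed_sum, pushed_count]
    for name, age in shard_rows:  # ages are assumed to be non-NULL
        state[name][0] += age
        state[name][1] += 1
    return {name: (s, c) for name, (s, c) in state.items()}

def final_agg(partials):
    """Final phase: sum the partial sums and counts, then divide."""
    totals = defaultdict(lambda: [0, 0])
    for partial in partials:
        for name, (pushed_sum, pushed_count) in partial.items():
            totals[name][0] += pushed_sum
            totals[name][1] += pushed_count
    return {name: s / c for name, (s, c) in totals.items()}

# Three hypothetical shards holding (name, age) rows.
shards = [
    [("ann", 30), ("bob", 20)],
    [("ann", 40)],
    [("bob", 22), ("bob", 24)],
]
result = final_agg(partial_agg(shard) for shard in shards)
print(result)  # {'ann': 35.0, 'bob': 22.0}
```

Averaging the per-shard averages would give a wrong answer: for bob, the shard averages are 20 and 23, whose mean is 21.5, whereas the true average is 66 / 3 = 22. Carrying the sum and the count separately avoids this.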

Sort

In DRDS, Sort operators include MemSort, TopN, and MergeSort.

  • MemSort

    In DRDS, sorting is generally implemented by the MemSort operator, which runs the Quick Sort algorithm in memory.

    The MemSort operator is used in the following example:

    > explain select t1.name from t1 join t2 on t1.id = t2.id order by t1.name,t2.name;
    
    Project(name="name")
      MemSort(sort="name ASC,name0 ASC")
        Project(name="name", name0="name0")
          BKAJoin(condition="id = id", type="inner")
            Gather(concurrent=true)
              LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
            Gather(concurrent=true)
              LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")
  • TopN

    When both ORDER BY and LIMIT appear in an SQL statement, the Sort operator and the Limit operator are combined into the TopN operator.

    The TopN operator maintains a maximum heap or a minimum heap. Based on the value of the sort key, the heap always retains the N largest or the N smallest rows that have been processed so far. When all the input data is processed, the N or fewer rows left in the heap are the required result.

    The TopN operator is used in the following example:

    > explain select t1.name from t1 join t2 on t1.id = t2.id order by t1.name,t2.name limit 10;
    
    Project(name="name")
      TopN(sort="name ASC,name0 ASC", offset=0, fetch=?0)
        Project(name="name", name0="name0")
          BKAJoin(condition="id = id", type="inner")
            Gather(concurrent=true)
              LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
            Gather(concurrent=true)
              LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")
  • MergeSort

    Generally, if the semantics permits, a sorting operation in SQL is pushed down to MySQL for execution, and the execution layer of DRDS performs only the final merging operation. This is implemented by MergeSort. Strictly speaking, MergeSort is not only a sort operator but also a data redistribution operator, similar to the Gather operator.

    The following SQL statement sorts the t1 table. After the query optimizer of DRDS optimizes the SQL statement, the Sort operator is pushed down to each MySQL shard and only the merging operation is performed at the upper layer.

    > explain select name from t1 order by name;
    
    MergeSort(sort="name ASC")
      LogicalView(tables="t1", shardCount=2, sql="SELECT `name` FROM `t1` AS `t1` ORDER BY `name`")

    Compared with MemSort, the MergeSort algorithm reduces the memory consumption of DRDS and makes full use of the computing capabilities of the MySQL layer.
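
The TopN and MergeSort behavior described above can be sketched with Python's `heapq` module. This is only an illustration of the two techniques, not the DRDS operators: the names `top_n`, `merge_sort`, and `_Reversed` and the sample data are hypothetical. The sketch covers the ascending case, where a maximum heap of size N keeps the N smallest rows.

```python
import heapq

class _Reversed:
    """Inverts ordering so that heapq's minimum heap acts as a maximum heap."""
    __slots__ = ("key", "row")

    def __init__(self, key, row):
        self.key, self.row = key, row

    def __lt__(self, other):
        return self.key > other.key

def top_n(rows, n, key):
    """Return the n smallest rows by key, like ORDER BY key ASC LIMIT n."""
    if n <= 0:
        return []
    heap = []  # holds at most n rows; heap[0] is the largest row kept
    for row in rows:
        item = _Reversed(key(row), row)
        if len(heap) < n:
            heapq.heappush(heap, item)
        elif item.key < heap[0].key:
            # The new row beats the largest kept row, so replace it.
            heapq.heapreplace(heap, item)
    return sorted((item.row for item in heap), key=key)

def merge_sort(shard_results, key):
    """Merge inputs that each shard has already sorted by key."""
    return list(heapq.merge(*shard_results, key=key))

top = top_n(["d", "a", "c", "b", "e"], 3, key=lambda r: r)
print(top)  # ['a', 'b', 'c']

# Each inner list stands for one shard's ORDER BY result.
merged = merge_sort([["alice", "dave"], ["bob", "carol"]], key=lambda r: r)
print(merged)  # ['alice', 'bob', 'carol', 'dave']
```

In `top_n`, memory use is bounded by N rather than by the input size. In `merge_sort`, the upper layer only compares the current head of each sorted input, so it never needs to sort the full data set itself.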

Example of an optimization combination

In the following example of an optimization combination, the following optimization rules are applied:

  • Agg is pushed down through Join.
  • SortMergeJoin is selected as the Join algorithm.
  • SortAgg is selected as the Agg algorithm.
  • The ordered output of SortAgg is used for the sorting that is required for SortMergeJoin.
  • Two-phase Agg is used.
  • Agg is pushed down.
  • Sort is pushed down.

> explain select count(*) from t1 join t2 on t1.name = t2.name group by t1.name;

Project(count(*)="count(*) * count(*)0")
  SortMergeJoin(condition="name = name", type="inner")
    SortAgg(group="name", count(*)="SUM(count(*))")
      MergeSort(sort="name ASC")
        LogicalView(tables="t1", shardCount=2, sql="SELECT `name`, COUNT(*) AS `count(*)` FROM `t1` AS `t1` GROUP BY `name` ORDER BY `name`")
    SortAgg(group="name", count(*)="SUM(count(*))")
      MergeSort(sort="name ASC")
        LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `name`, COUNT(*) AS `count(*)` FROM `t2` AS `t2` GROUP BY `name` ORDER BY `name`")
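
The Project operator in this plan multiplies the two per-table counts because, for an inner join on name, each t1 row with a given name matches every t2 row with the same name. The following sketch checks that arithmetic on small hypothetical data; the function names `count_join_naive` and `count_join_pushed` are illustrative and not part of DRDS.

```python
from collections import Counter

def count_join_naive(t1_names, t2_names):
    """Join first, then count: COUNT(*) per name over the joined rows."""
    counts = Counter()
    for a in t1_names:
        for b in t2_names:
            if a == b:
                counts[a] += 1
    return dict(counts)

def count_join_pushed(t1_names, t2_names):
    """Aggregate each side first, then join the counts and multiply them."""
    c1, c2 = Counter(t1_names), Counter(t2_names)
    return {name: c1[name] * c2[name] for name in c1 if name in c2}

t1 = ["a", "a", "b", "c"]
t2 = ["a", "b", "b", "b", "d"]
naive = count_join_naive(t1, t2)
pushed = count_join_pushed(t1, t2)
print(pushed)  # {'a': 2, 'b': 3}
```

Both functions return the same result, but the second one joins at most one row per name from each side, which is the benefit of pushing Agg down through Join.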