This topic describes how the query executor of PolarDB-X executes SQL queries that cannot be pushed down.

Query executor

The SQL query executor is a PolarDB-X component that is used to execute logical operators. In most cases, point queries are pushed down to MySQL at the storage layer and are then executed. PolarDB-X does not need the executor to handle simple point queries. The execution results are decoded, encoded again, and then returned to users. However, some complex SQL queries cannot be pushed down to MySQL. PolarDB-X must use the executor to handle these queries.
SELECT l_orderkey, sum(l_extendedprice *(1 - l_discount)) AS revenue
FROM CUSTOMER, ORDERS, LINEITEM
WHERE c_mktsegment = 'AUTOMOBILE'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < '1995-03-13'
  and l_shipdate > '1995-03-13'
GROUP BY l_orderkey;
You can execute the following EXPLAIN statement to query the information about a PolarDB-X execution plan:
HashAgg(group="l_orderkey", revenue="SUM(*)")
  HashJoin(condition="o_custkey = c_custkey", type="inner")
    Gather(concurrent=true)
      LogicalView(tables="ORDERS_[0-7],LINEITEM_[0-7]", shardCount=8, sql="SELECT `ORDERS`.`o_custkey`, `LINEITEM`.`l_orderkey`, (`LINEITEM`.`l_extendedprice` * (? - `LINEITEM`.`l_discount`)) AS `x` FROM `ORDERS` AS `ORDERS` INNER JOIN `LINEITEM` AS `LINEITEM` ON (((`ORDERS`.`o_orderkey` = `LINEITEM`.`l_orderkey`) AND (`ORDERS`.`o_orderdate` < ?)) AND (`LINEITEM`.`l_shipdate` > ?))")
    Gather(concurrent=true)
      LogicalView(tables="CUSTOMER_[0-7]", shardCount=8, sql="SELECT `c_custkey` FROM `CUSTOMER` AS `CUSTOMER` WHERE (`c_mktsegment` = ?)")

The queries in LogicalView are pushed down to MySQL and are then executed. The queries that cannot be pushed down are executed by the executor of PolarDB-X. Then, the executor returns the result to the user.

Execution model

Traditional databases use the Volcano model. However, PolarDB-X uses a hybrid model that combines the PULL and PUSH methods. PolarDB-X splits an execution plan into multiple pipelines based on whether temporary tables are required to cache data. A pipeline uses the NEXT() method to retrieve data for completing the calculation inside the pipeline. A pipeline uses the PUSH method to push calculation results to downstream pipelines. In the following example, an execution plan is split into two pipelines. Pipeline A scans Table A and then builds a hash table. Pipeline B scans Table B, uses HASH JOIN to join the hashed table pushed by Pipeline A and Table B, and then returns the result to the client.

Execution mode

PolarDB-X supports the following execution modes:
  • TP_LOCAL: A query is processed by using a single thread on a single node. The TP_LOCAL mode is suitable for queries related to transaction processing because these queries involve only a small number of rows. For example, this mode is applicable to point queries based on primary keys.
  • AP_LOCAL: A query is processed by using multiple CPU cores of a single node in parallel. The AP_LOCAL mode is suitable for queries related to analytical processing if you do not have a read-only instance. This mode is also known as the parallel query mode.
  • MPP: A query is processed by using multiple CPU cores across different nodes of a read-only instance in parallel. This mode accelerates queries and is suitable for queries related to analytical processing.
PolarDB-X introduces the EXPLAIN PHYSICAL statement that can be used to query the execution mode of an execution plan. In the following example, the result of the EXPLAIN PHYSICAL statement shows that the execution mode of the current query is MPP. The result also shows the degree of parallelism of each fragment.
mysql> explain physical select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 or
der by cnt limit 5, 10;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorType: MPP                                                                                                                                                 |
| The Query's MaxConcurrentParallelism: 2                                                                                                                           |
| Fragment 1                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1                                                                                                                 |
|     TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                     |
|   Filter(condition="cnt > ?1")                                                                                                                                    |
|     HashAgg(group="k", cnt="COUNT()")                                                                                                                             |
|       BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|         RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k))                                                             |
|         Gather(concurrent=true)                                                                                                                                   |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 0                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1 Splits: 16                                                                                                      |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
You can execute HINT EXECUTOR_MODE to specify an execution mode. For example, if the primary instance has a large amount of idle resources, you can set the execution mode to AP_LOCAL or MPP. This improves the execution efficiency.
mysql> explain physical /*+TDDL:EXECUTOR_MODE=AP_LOCAL*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;                                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorMode: AP_LOCAL                                                                                                                                      |
| Fragment 0 dependency: [] parallelism: 4                                                                                                                    |
| BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")               |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 1 dependency: [] parallelism: 8                                                                                                                    |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                        |
| Fragment 2 dependency: [0, 1] parallelism: 8                                                                                                                |
| Filter(condition="cnt > ?1")                                                                                                                                |
|   HashAgg(group="k", cnt="COUNT()")                                                                                                                         |
|     RemoteSource(sourceFragmentIds=[1], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                      |
| Fragment 3 dependency: [0, 1] parallelism: 1                                                                                                                |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[2], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
| Fragment 4 dependency: [2, 3] parallelism: 1                                                                                                                |
| TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                   |
|   RemoteSource(sourceFragmentIds=[3], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
The degree of parallelism in MPP mode is calculated based on the number of rows scanned in the physical table, the instance specifications, and the number of table shards involved. The degree of parallelism is high in most cases because the system must support high concurrency scenarios. You can execute EXPLAIN PHYSICAL to query the degree of parallelism. You can execute HINT MPP_PARALLELISM to specify the degree of parallelism.
/*+TDDL:EXECUTOR_MODE=MPP MPP_PARALLELISM=8*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;