全部產品
Search
文件中心

PolarDB:查詢執行器介紹

更新時間:Jul 06, 2024

本文介紹PolarDB-X的SQL執行器的概念、執行模型和執行模式。

基本概念

SQL執行器是PolarDB-X中執行計算層運算元的組件。對於簡單的點查SQL,往往可以整體下推儲存層MySQL執行,因而感覺不到執行器的存在,MySQL的結果經過簡單的解包封包又被回傳給使用者。但是對於較複雜的SQL,往往無法將SQL中的運算元全部下推,這時候就需要PolarDB-X執行器執行無法下推的計算。

SELECT l_orderkey, sum(l_extendedprice *(1 - l_discount)) AS revenue
FROM CUSTOMER, ORDERS, LINEITEM
WHERE c_mktsegment = 'AUTOMOBILE'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < '1995-03-13'
  and l_shipdate > '1995-03-13'
GROUP BY l_orderkey;

通過EXPLAIN命令看到PolarDB-X的執行計畫如下:

HashAgg(group="l_orderkey", revenue="SUM(*)")
  HashJoin(condition="o_custkey = c_custkey", type="inner")
    Gather(concurrent=true)
      LogicalView(tables="ORDERS_[0-7],LINEITEM_[0-7]", shardCount=8, sql="SELECT `ORDERS`.`o_custkey`, `LINEITEM`.`l_orderkey`, (`LINEITEM`.`l_extendedprice` * (? - `LINEITEM`.`l_discount`)) AS `x` FROM `ORDERS` AS `ORDERS` INNER JOIN `LINEITEM` AS `LINEITEM` ON (((`ORDERS`.`o_orderkey` = `LINEITEM`.`l_orderkey`) AND (`ORDERS`.`o_orderdate` < ?)) AND (`LINEITEM`.`l_shipdate` > ?))")
    Gather(concurrent=true)
      LogicalView(tables="CUSTOMER_[0-7]", shardCount=8, sql="SELECT `c_custkey` FROM `CUSTOMER` AS `CUSTOMER` WHERE (`c_mktsegment` = ?)")

LogicalView的SQL在執行時被下發給MySQL,而不能下推的部分(除LogicalView以外的運算元)由PolarDB-X執行器進行計算,得到終端使用者SQL需要的結果。

執行模型

與傳統資料庫採用Volcano執行模型不同,PolarDB-X採用的是Pull~Push混合執行模型。所有運算元按照計算過程中是否需要緩衝暫存資料表,將執行過程切分成多個pipeline,pipeline內部採用next()介面,按批擷取資料完成在pipeline內部的計算。pipeline間採用push介面,上遊pipeline在計算完成後,會將資料來源源不斷推送給下遊pipeline做計算。

計算被切分成兩個pipeline,在pipeline-A中掃描Table-A資料,完成構建雜湊表。Pipeline-B掃描Table-B的資料,然後在HashJoin運算元內部做關聯得到JOIN結果,再返回用戶端。

執行模式

目前PolarDB-X支援了三種執行模式:

  • 單機單線程(TP_LOCAL):查詢過程是單線程計算,TP負載的查詢涉及到的掃描行數比較少,往往會採用這種執行模式,比如基於主鍵的點查。

  • 單機並行(AP_LOCAL):查詢過程中會利用節點的多核資源做並行計算,如果您沒有配置唯讀執行個體,針對AP負載的查詢,往往會採用這種執行模式,一般也稱之為Parallel Query模式。

  • 多機並行(MPP):您如果配置了唯讀執行個體,針對AP負載的查詢,可以協調唯讀執行個體上多個節點的多核做分布式多機並行加速。

為了準確查看執行模式,在原有EXPLAIN和執行計畫的基礎上,擴充了 EXPLAIN PHYSICAL,例如以下查詢,在返回資訊中可以查看當前查詢採用的是MPP模式,此外還可以擷取到每個執行片段的並發數。

explain physical select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 or
der by cnt limit 5, 10;

返回執行計畫資訊如下:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorType: MPP                                                                                                                                                 |
| The Query's MaxConcurrentParallelism: 2                                                                                                                           |
| Fragment 1                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1                                                                                                                 |
|     TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                     |
|   Filter(condition="cnt > ?1")                                                                                                                                    |
|     HashAgg(group="k", cnt="COUNT()")                                                                                                                             |
|       BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|         RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k))                                                             |
|         Gather(concurrent=true)                                                                                                                                   |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 0                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1 Splits: 16                                                                                                      |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

同樣的也允許您通過HINT EXECUTOR_MODE語句指定執行模式。例如主執行個體空閑資源很多,可以考慮強制設定為單機或者多機並行模式來加速。

explain physical /*+TDDL:EXECUTOR_MODE=AP_LOCAL*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;     

返回執行計畫資訊如下:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorMode: AP_LOCAL                                                                                                                                      |
| Fragment 0 dependency: [] parallelism: 4                                                                                                                    |
| BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")               |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 1 dependency: [] parallelism: 8                                                                                                                    |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                        |
| Fragment 2 dependency: [0, 1] parallelism: 8                                                                                                                |
| Filter(condition="cnt > ?1")                                                                                                                                |
|   HashAgg(group="k", cnt="COUNT()")                                                                                                                         |
|     RemoteSource(sourceFragmentIds=[1], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                      |
| Fragment 3 dependency: [0, 1] parallelism: 1                                                                                                                |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[2], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
| Fragment 4 dependency: [2, 3] parallelism: 1                                                                                                                |
| TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                   |
|   RemoteSource(sourceFragmentIds=[3], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+

多機並存執行模式的並發度是根據物理掃描行數、執行個體規格和計算所涉及到表的分表數計算得出的,整體的並行度要考慮高並發情境,所以並行度的計算會偏保守,您可以通過上述EXPLAIN PHYSICAL指令查看並行度。也同樣支援HINT MPP_PARALLELISM強制指定並行度。

/*+TDDL:EXECUTOR_MODE=MPP MPP_PARALLELISM=8*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;