全部產品
Search
文件中心

ApsaraDB for SelectDB:Colocation Join

更新時間:Jul 06, 2024

本文介紹ApsaraDB for SelectDB提供的Colocation Join的原理、實現、使用方式和注意事項,作為您選擇Join方式進行查詢最佳化的參考。

概述

Colocation Join為某些Join查詢提供本地性最佳化,來減少資料在節點間的傳輸耗時,加速查詢。最初的設計、實現和效果詳情請參見ISSUE 245。Colocation Join功能經過一次改版,設計和使用方式和最初設計稍有不同。

重要

標註為可使用Colocation Join的表的這個屬性不會被CCR同步,如果這個表是被CCR複製而來的,即PROPERTIES中包含is_being_synced = true時,這個屬性將會在這個表中被擦除。

名詞解釋

  • Colocation Group(CG):一個CG中會包含一張及以上的Table。在同一個Group內的Table有著相同的Colocation Group Schema,並且有著相同的資料分區分布。

  • Colocation Group Schema(CGS):用於描述一個CG中的Table,和Colocation相關的通用Schema資訊。包括分桶列類型、分桶數等。

基本原理

Colocation Join功能,是將一組擁有相同CGS的Table組成一個CG,並保證這些Table對應的資料分區會落在同一個BE節點上,使得當CG內的表進行分桶列上的Join操作時,可以通過直接進行本機資料Join,減少資料在節點間的傳輸耗時。

一個表的資料,最終會根據分桶列值做雜湊操作、對桶數模數後落在某一個分桶內。假設一個Table的分桶數為8,則共有[0, 1, 2, 3, 4, 5, 6, 7]8個分桶(Bucket),我們稱這樣一個序列為一個BucketsSequence。每個Bucket內會有一個或多個資料分區(Tablet)。當表為單分區表時,一個Bucket內僅有一個Tablet。如果是多分區表,則會有多個。

為了使得Table能夠有相同的資料分布,同一CG內的Table必須保證分桶列和分桶數相同:分桶列,即在建表語句中DISTRIBUTED BY HASH(col1, col2, ...)中指定的列。分桶列決定了一張表的資料通過哪些列的值進行Hash劃分到不同的Tablet中。同一CG內的Table必須保證分桶列的類型和數量完全一致,並且桶數一致,才能保證多張表的資料分區能夠一一對應的進行分布控制。

同一個CG內的表,分區的個數、範圍以及分區列的類型不要求一致。

在固定了分桶列和分桶數後,同一個CG內的表會擁有相同的BucketsSequence。假設BucketsSequence為[0, 1, 2, 3, 4, 5, 6, 7],BE節點有[A, B, C, D]4個。則一個可能的資料分布如下:

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

CG內所有表的資料都會按照上面的規則進行統一分布,這樣可以保證分桶列值相同的資料都在同一個BE節點上,可以進行本機資料Join。

使用方式

建立表

建表時,可以在PROPERTIES中指定屬性"colocate_with" = "group_name",表示這個表是一個Colocation Join表,並且歸屬於一個指定的Colocation Group。

樣本如下。

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "group1"
);

如果指定的Group不存在,則SelectDB會自動建立一個只包含當前這張表的Group。如果Group已存在,則SelectDB會檢查當前表是否滿足Colocation Group Schema。如果滿足則會建立該表,並將該表加入Group。同時,表會根據已存在的Group中的資料分布規則建立分區和副本。Group歸屬於一個Database,Group的名字在一個Database內唯一。在內部,Group的全名被儲存為dbId_groupName,但使用者只感知groupName。

SelectDB支援跨Database的Group。在建表時,需使用關鍵詞__global__作為Group名稱的首碼,樣本如下。

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "__global__group1"
);

__global__首碼的Group不再歸屬於一個Database,其名稱也是全域唯一的。通過建立Global Group,可以實現跨Database的Colocate Join。

刪除表

當Group中最後一張表徹底刪除後(徹底刪除是指從資源回收筒中刪除。通常,一張表通過DROP TABLE命令刪除後,會在資源回收筒預設停留一天的時間後,再刪除),該Group也會被自動刪除。

查詢表

對Colocation表的查詢方式和普通表一樣,您無需感知Colocation屬性,系統自動規劃採用Colocation Join方式。以下舉例說明。

  1. 建立表。

    建立表tbl1,樣本如下。

    CREATE TABLE `tbl1` (
        `k1` date NOT NULL COMMENT "",
        `k2` int(11) NOT NULL COMMENT "",
        `v1` int(11) SUM NOT NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`k1`, `k2`)
    PARTITION BY RANGE(`k1`)
    (
        PARTITION p1 VALUES LESS THAN ('2019-05-31'),
        PARTITION p2 VALUES LESS THAN ('2019-06-30')
    )
    DISTRIBUTED BY HASH(`k2`) BUCKETS 8
    PROPERTIES (
        "colocate_with" = "group1"
    );

    建立表tbl2,樣本如下。

    CREATE TABLE `tbl2` (
        `k1` datetime NOT NULL COMMENT "",
        `k2` int(11) NOT NULL COMMENT "",
        `v1` double SUM NOT NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`k1`, `k2`)
    DISTRIBUTED BY HASH(`k2`) BUCKETS 8
    PROPERTIES (
        "colocate_with" = "group1"
    );
  2. 查看查詢計劃,樣本如下。

    DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
    +----------------------------------------------------+
    | Explain String                                     |
    +----------------------------------------------------+
    | PLAN FRAGMENT 0                                    |
    |  OUTPUT EXPRS:`tbl1`.`k1` |                        |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   RESULT SINK                                      |
    |                                                    |
    |   2:HASH JOIN                                      |
    |   |  join op: INNER JOIN                           |
    |   |  hash predicates:                              |
    |   |  colocate: true                                |
    |   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
    |   |  tuple ids: 0 1                                |
    |   |                                                |
    |   |----1:OlapScanNode                              |
    |   |       TABLE: tbl2                              |
    |   |       PREAGGREGATION: OFF. Reason: null        |
    |   |       partitions=0/1                           |
    |   |       rollup: null                             |
    |   |       buckets=0/0                              |
    |   |       cardinality=-1                           |
    |   |       avgRowSize=0.0                           |
    |   |       numNodes=0                               |
    |   |       tuple ids: 1                             |
    |   |                                                |
    |   0:OlapScanNode                                   |
    |      TABLE: tbl1                                   |
    |      PREAGGREGATION: OFF. Reason: No AggregateInfo |
    |      partitions=0/2                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 0                                  |
    +----------------------------------------------------+

    Colocation Join生效,則Hash Join節點會顯示colocate: true

    如果沒有生效,則查詢計劃如下。

    +----------------------------------------------------+
    | Explain String                                     |
    +----------------------------------------------------+
    | PLAN FRAGMENT 0                                    |
    |  OUTPUT EXPRS:`tbl1`.`k1` |                        |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   RESULT SINK                                      |
    |                                                    |
    |   2:HASH JOIN                                      |
    |   |  join op: INNER JOIN (BROADCAST)               |
    |   |  hash predicates:                              |
    |   |  colocate: false, reason: group is not stable  |
    |   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
    |   |  tuple ids: 0 1                                |
    |   |                                                |
    |   |----3:EXCHANGE                                  |
    |   |       tuple ids: 1                             |
    |   |                                                |
    |   0:OlapScanNode                                   |
    |      TABLE: tbl1                                   |
    |      PREAGGREGATION: OFF. Reason: No AggregateInfo |
    |      partitions=0/2                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 0                                  |
    |                                                    |
    | PLAN FRAGMENT 1                                    |
    |  OUTPUT EXPRS:                                     |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   STREAM DATA SINK                                 |
    |     EXCHANGE ID: 03                                |
    |     UNPARTITIONED                                  |
    |                                                    |
    |   1:OlapScanNode                                   |
    |      TABLE: tbl2                                   |
    |      PREAGGREGATION: OFF. Reason: null             |
    |      partitions=0/1                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 1                                  |
    +----------------------------------------------------+

    Hash Join節點會顯示對應原因:colocate: false, reason: group is not stable。同時會有一個 EXCHANGE節點產生。

查看Colocate Group

查看叢集內已存在的Group資訊,樣本如下。

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId     | GroupName    | TableIds     | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10         | 3              | int(11)  | true     |
+-------------+--------------+--------------+------------+----------------+----------+----------+

返回參數說明如下。

參數名稱

說明

GroupId

一個Group的全叢集唯一標識,前半部分為DB ID,後半部分為GROUP ID。

GroupName

Group的全名。

TabletIds

該Group包含的Table的ID列表。

BucketsNum

分桶數。

ReplicationNum

副本數。

DistCols

Distribution columns,即分桶列類型。

IsStable

該Group是否穩定(穩定的定義,見Colocation 副本均衡和修複一節)。

查看一個Group的資料分布情況,樣本如下。

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds          |
+-------------+---------------------+
| 0           | 10004               |
| 1           | 10003               |
| 2           | 10002               |
| 3           | 10003               |
| 4           | 10002               |
| 5           | 10003               |
| 6           | 10003               |
| 7           | 10003               |
+-------------+---------------------+

參數名稱

說明

BucketIndex

分桶序列的下標。

BackendIds

分桶中資料分區所在的BE節點ID列表。

說明

以上命令需要ADMIN許可權,暫不支援普通使用者查看。

修改表Colocate Group屬性

對一個已經建立的表,修改其Colocation Group屬性,樣本如下。

ALTER TABLE tbl SET ("colocate_with" = "group2");
  • 該表之前沒有指定過Group,則該命令檢查Schema,並將該表加入到該Group(Group不存在則會建立)。

  • 該表之前有指定其他Group,則該命令會先將該表從原有Group中移除,並加入新Group(Group不存在則會建立)。

也可以通過以下命令,刪除一個表的Colocation屬性,樣本如下。

ALTER TABLE tbl SET ("colocate_with" = "");

其他相關操作

當對一個具有Colocation屬性的表進行增加分區(ADD PARTITION)、修改副本數時,SelectDB會檢查修改是否會違反Colocation Group Schema,如果違反則會拒絕。

使用進階

FE配置項

  • disable_colocate_relocate

    是否關閉SelectDB的自動Colocation副本修複。預設為false,即不關閉。該參數隻影響Colocation表的副本修複,不影響普通表。

  • disable_colocate_balance

    是否關閉SelectDB的自動Colocation副本均衡。預設為false,即不關閉。該參數隻影響Colocation表的副本均衡,不影響普通表。

以上參數可以動態修改,設定方式請參閱HELP ADMIN SHOW CONFIG;HELP ADMIN SET CONFIG;

  • disable_colocate_join

    是否關閉 Colocation Join 功能。預設為false,即開啟。

  • use_new_tablet_scheduler

    是否啟用新的副本調度邏輯,預設為true,即開啟。

HTTP Restful API

SelectDB提供了幾個和Colocation Join有關的HTTP Restful API,用於查看和修改Colocation Group。

該API實現在FE端,使用fe_host:fe_http_port進行訪問。訪問使用者需要ADMIN許可權。

  • 查看叢集的全部Colocation資訊,樣本如下。

    GET /api/colocate
    
    返回以 Json 格式表示內部 Colocation 資訊。
    
    {
        "msg": "success",
        "code": 0,
        "data": {
            "infos": [
                ["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]
            ],
            "unstableGroupIds": [],
            "allGroupIds": [{
                "dbId": 10003,
                "grpId": 12002
            }]
        },
        "count": 0
    }
  • 將Group標記為Stable或Unstable,樣本如下。

    • 標記為Stable。

      DELETE /api/colocate/group_stable?db_id=10005&group_id=10008
      
      返回:200
    • 標記為Unstable。

      POST /api/colocate/group_stable?db_id=10005&group_id=10008
      
      返回:200
  • 設定Group的資料分布,樣本如下。

    該介面可以強制設定某一Group的資料分布。返回結果中Body是以嵌套數組表示的BucketsSequence以及每個Bucket中分區分布所在BE的ID。

    POST /api/colocate/bucketseq?db_id=10005&group_id=10008
    
    Body:
    [[10004],[10003],[10002],[10003],[10002],[10003],[10003],[10003],[10003],[10002]]
    
    返回 200
    說明

    使用該命令,需要將FE的配置disable_colocate_relocatedisable_colocate_balance設為true,即關閉系統自動的Colocation副本修複和均衡。否則在手動修改後,被系統自動重設。

Colocation副本均衡和修複

Colocation表的副本分布需要遵循Group中指定的分布,所以在副本修複和均衡方面和普通分區有所區別。

Group自身有一個Stable屬性,當Stable屬性為true時,表示當前Group內的表的所有分區都沒有進行中變動,Colocation特性可以正常使用。當Stable為 false 時(Unstable),表示當前Group內有部分表的分區進行中修複或遷移,此時,相關表的Colocation Join將退化為普通Join。

副本修複

副本只能儲存在指定的BE節點上,所以當某個BE不可用時(宕機、Decommission 等),需要尋找一個新的BE進行替換。在這種情境下,SelectDB會優先尋找負載最低的BE進行替換。替換後,該Bucket內的所有在舊BE上的資料分區都要做修複。遷移過程中,Group被標記為Unstable。

副本均衡

SelectDB會將Colocation表的分區均勻分布在所有BE節點上。普通表的副本均衡是以單副本為粒度的,即單獨為每一個副本尋找負載較低的BE節點。而Colocation表的均衡是Bucket層級的,即一個Bucket內的所有副本都會一起遷移。

SelectDB採用一個簡單的均衡演算法,即在不考慮副本實際大小,而只根據副本數量,將BucketsSequence均勻的分布在所有BE上。

說明
  • 當前的Colocation副本均衡和修複演算法,對於異構部署的SelectDB執行個體效果可能不佳。所謂異構部署,即BE節點的磁碟容量、數量、磁碟類型(SSD和HDD)不一致。在異構部署情況下,可能出現小容量的BE節點和大容量的BE節點儲存了相同的副本數量。

  • 當一個Group處於Unstable狀態時,其中的表的Join將退化為普通Join。此時可能會極大降低叢集的查詢效能。如果不希望系統自動均衡,可以設定 FE 的配置項disable_colocate_balance來禁止自動均衡。如果需要該功能手動開啟即可。