全部產品
Search
文件中心

DataWorks:Hologres單表即時同步至Kafka

更新時間:Jun 21, 2025

Data Integration目前支援將DataHub、Hologres等源頭的資料單表即時同步至Kafka。即時ETL同步任務根據來源Hologres表結構對目標Kafka的Topic進行初始化,將Hologres資料即時同步至Kafka以供消費。本文為您介紹如何配置Hologres單表即時同步到Kafka。

使用限制

  • Kafka的版本需在0.10.2至3.6.0之間。

  • Hologres的版本要求必須為2.1及以上。

  • 不支援Hologres分區表的增量同步處理。

  • 不支援Hologres表DDL變更訊息同步。

  • Hologres增量同步處理支援的資料類型包括:INTEGER、BIGINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、JSON、SERIAL、OID、INT4[]、INT8[]、FLOAT8[]、BOOLEAN[]、TEXT[]、JSONB。

  • 需開啟源端的Hologres資料庫的表Hologres Binlog,詳情可參見訂閱Hologres Binlog

前提條件

操作步驟

一、選擇同步任務類型

  1. 進入Data Integration頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration > Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 在左側導覽列單擊同步任務,然後在頁面頂部單擊建立同步任務,進入同步任務的建立頁面,配置如下基本資料。

    • 資料來源和去向HologresKafka

    • 新任務名稱:自訂同步任務名稱。

    • 同步類型單表即時

    • 同步步驟:選擇全量同步

二、網路與資源配置

  1. 網路與資源配置地區,選擇同步任務所使用的資源群組。您可以為該任務分配任務資源佔用CU數。

  2. 來來源資料源選擇已添加的Hologres資料來源,去向資料來源選擇已添加的Kafka資料來源後,單擊測試連通性image

  3. 確保來來源資料源與去向資料來源均連通成功後,單擊下一步

三、配置同步鏈路

1、配置Hologres來源

在頁面上方單擊資料來源Hologres,編輯Holo來源資訊

image

  1. Holo來源資訊地區,選擇要讀取的Hologres表所在的Schema,以及來源表。

  2. 單擊右上方的資料採樣

    資料輸出預覽對話方塊中指定好採樣條數,單擊開始採集按鈕,可以對指定的Hologres進行資料採樣,預覽Hologres中的資料,為後續資料處理節點的資料預覽和可視化配置提供輸入。

2、配置Kafka去向資訊

在頁面上方單擊資料去向Kafka,編輯Kafka去向資訊

image

  1. Kafka去向資訊地區,選擇要寫入的Kafka主題(Topic)。

  2. 按需設定是否合并來源Binglog Update訊息,開啟後會將源端Binlog的一個Update動作對應的兩條Update訊息合并為一條訊息寫入。

  3. 設定輸出格式鍵取值列以及Kafka Producer參數

    • 輸出格式:確認寫入Kafka記錄的Value內容格式,支援Canal CDC和JSON。的詳細資料,請參見附錄:輸出格式說明

    • 鍵取值列:選取的是源端列,對應列值會序列化為字串後,用逗號拼接作為寫入Kafka Topic中記錄的key。

      說明
      • 列值序列化規則與JSON中的Hologres中列類型序列化規則說明一致。

      • Kafka Topic中記錄的Key值決定寫入的分區,相同Key值寫入同一分區,為了保證下遊消費Kafka Topic時資料能夠保持順序,建議選擇Hologres表主鍵作為鍵取值列。

      • 如果不選擇任何源側列作為鍵取值列,Kafka Topic 中記錄的Key值為null,會導致Kafka寫入分區呈隨機寫入。

    • Kafka Producer參數:是影響寫入一致性、穩定性和異常處理行為的參數,一般情況預設配置即可,如有定製化需求可以指定特定參數,各個Kafka叢集版本支援的Producer參數可參考Kafka官方文檔

四、警示配置

為避免任務出錯導致業務資料同步延遲,您可以對同步任務設定警示策略。

  1. 單擊頁面右上方的警示配置,進入即時子任務警示設定頁面。

  2. 單擊新增警示,配置警示規則。

    說明

    此處定義的警示規則,將對該任務產生的即時同步子任務生效,您可在任務配置完成後,進入即時同步任務運行與管理介面查看並修改該即時同步子任務的監控警示規則。

  3. 管理警示規則。

    對於已建立的警示規則,您可以通過警示開關控制警示規則是否開啟,同時,您可以根據警示層級將警示發送給不同的人員。

五、進階參數配置

同步任務提供部分參數可供修改,您可以按需對該參數值進行修改。

說明

請在完全瞭解對應參數含義的情況下再進行修改,以免產生不可預料的錯誤或者資料品質問題。

  1. 單擊介面右上方的進階參數配置,進入進階參數配置頁面。

  2. 進階參數配置頁面修改相關參數值。

六、資源群組配置

您可以單擊介面右上方的資源群組配置,查看並切換當前的任務所使用的資源群組。

七、執行同步任務

  1. 完成所有配置後,單擊頁面底部的完成配置

  2. Data Integration > 同步任務介面,找到已建立的同步任務,單擊操作列的啟動

  3. 單擊工作清單中對應任務的名稱/ID,查看任務的詳細執行過程。

同步任務營運

查看任務運行狀態

建立完成同步任務後,您可以在同步任務頁面查看當前已建立的同步工作清單及各個同步任務的基本資料。

image

  • 您可以在操作列啟動停止同步任務,在更多中可以對同步任務進行編輯查看等操作。

  • 已啟動的任務您可以在執行概況中看到任務啟動並執行基本情況,也可以單擊對應的概況地區查看執行詳情。

image

Hologres到Kafka的單表即時同步任務分為三個步驟:

  • 結構遷移:包含目標表的建立方式(已有表或自動建表),如果是自動建表,將會為您展示建表的DDL。

  • 全量初始化:如果您的任務同步步驟選擇了全量同步,此處將展示全量初始化進度。

  • 即時資料同步:包含即時同步的統計資訊,包含即時的讀寫流量、髒資料、Failover和作業記錄。

任務重跑

在某些特殊情況下,如果需要修改同步欄位、調整目標表欄位或表名資訊時,您還可以單擊同步任務操作列的重跑,系統會將調整的欄位、變更的目標包等資訊進行同步,之前同步過未修改的表將不會再進行同步。

  • 不修改任務配置,直接單擊重跑操作,重新運行一次同步任務。

  • 編輯任務,進行修改操作後,單擊完成配置。此時任務的操作會變成應用程式更新,單擊應用程式更新會直接觸發修改後的任務重跑。即時同步任務會按照新的配置運行。

附錄:輸出格式說明

Canal CDC

Canal CDC是Alibaba Canal定義的一種CDC資料格式。

  • 欄位及含義

    欄位名

    欄位值含義

    id

    固定取值為0。

    database

    Hologres資料庫名稱。

    table

    Hologres表名稱。

    pkNames

    Hologres表主鍵列名。

    isDdl

    Binlog記錄是否對應DDL變更,由於不支援同步Hologres表DDL變更,該欄位始終取值false。

    type

    DML變更類型,可選值為INSERT/UPDATE/DELETE,對應插入、更新和刪除。

    說明

    注意Hologres表中的一次更新會產生兩條type為UPDATE的記錄寫入Kafka Topic中

    • 一條對應更新前的資料內容。

    • 一條對應更新後的資料內容。

    • 對全量同步資料,type固定取值為INSERT。

    es

    13位毫秒時間戳記,對應Hologres表對應的資料變更發生時間。

    在全量同步資料時,es固定取值為0。

    ts

    13位毫秒時間戳記,對應Hologres BInlog記錄被同步任務讀到的時間。

    sql

    Binlog記錄在上遊資料來源DDL變更時,記錄DDL變更的SQL內容,但由於不支援同步Hologres表DDL變更,該欄位始終取Null 字元串。

    sqlType

    Hologres表中各欄位的資料類型對應的SQL欄位類型。

    各Hologres資料類型對應sqlType取值如下:

    • bigint:-5

    • scale非0的decimal:3

    • scale為0的decimal:-5

    • boolean:16

    • date:91

    • float4:6

    • float8:8

    • integer:4

    • smallint:5

    • json:12

    • text:12

    • varchar:12

    • timestamp:93

    • timestamptz:93

    • bigserial:-5

    • bytea:12

    • char:12

    • serial:4

    • time:92

    • int4[]:12

    • int8[]:12

    • float4[]:12

    • float8[]:12

    • boolean[]:12

    • text[]:12

    mysqlType

    Hologres表各欄位的資料類型對應的MySQL欄位類型。

    各Hologres資料類型對應MySQL欄位類型取值如下:

    • bigint:BIGINT

    • int4:INT

    • scale非0的decimal:DECIMAL(xx,xx)

    • scale為0的decimal:BIGINT

    • boolean:BOOLEAN

    • date:DATE

    • float4:FLOAT

    • float8:DOUBLE

    • integer:INT

    • smallint:SMALLINT

    • json:TEXT

    • text:TEXT

    • varchar:VARCHAR(xx)

    • timestamp:DATETIME(6)

    • timestamptz:DATETIME(6)

    • bigserial:BIGINT

    • bytea:TEXT

    • char:TEXT

    • serial:INT

    • time:TIME(6)

    • int4[]:TEXT

    • int8[]:TEXT

    • float4[]:TEXT

    • float8[]:TEXT

    • boolean[]:TEXT

    • text[]:TEXT

    data

    資料變更內容,以表列名作為key,將表列變更資料內容序列化為字串後作為value組裝為JSON格式字串。具體序列化方式見下文JSON序列化說明

    old

    Hologres表中的一次更新會產生兩條type為UPDATE的記錄寫入Kafka Topic中。

    變更前資料的記錄,使用old欄位記錄變更前資料內容,其他類型DML變更都使用data欄位記錄資料內容。

  • Hologres Binlog INSERT DML資料對應的Canal JSON格式資料樣本

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "INSERT",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Hologres全量同步資料對應的Canal JSON格式資料樣本

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "INSERT",
        "es": 0,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Hologres Binlog UPDATE DML資料對應的兩條Canal JSON格式資料樣本

    //變更前資料
    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "UPDATE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "old": [
            {
                "bigint": "0",
                "integer": "0",
                "smallint": "0"
            }
        ],
        "data": null
    }
    //變更後資料
    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "UPDATE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Hologres Binlog DELETE DML資料對應Canal JSON格式資料樣本

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "DELETE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }

Json

Json是將Hologres Binlog中的變更記錄,以列表名作為Key,將列表的資料內容序列化為字串後作為value,組裝為JSON格式字串寫入Kafka Topic中。

JSON 序列化說明

Hologres中欄位類型序列化說明

Hologres欄位類型

寫入Kafka序列化結果

bit

不支援,任務啟動報錯。

inet

不支援,任務啟動報錯。

interval

不支援,任務啟動報錯。

money

不支援,任務啟動報錯。

oid

不支援,任務啟動報錯。

timetz

不支援,任務啟動報錯。

uuid

不支援,任務啟動報錯。

varbit

不支援,任務啟動報錯。

jsonb

不支援,資料寫入後報錯binlog解析錯誤。

bigint

數字字串:"2"。

decimal(38,18)

小數位元跟精度定義一致的數字字串,"1.234560000000000000"。

decimal(38,0)

小數位元跟精度定義一致的數字字串,"2"。

boolean

"true"/"false"。

date

yyyy-MM-dd格式日期文字:"2024-02-02"。

float4/float8/double

數值字串,轉換後與Holo查詢結果一致,不會補零:"1.24"。

interger/smallint

數字字串:"2"。

json

json字串:"{\"a\":2}"。

text/varchar

utf8編碼字串:"text"。

timestamp

精確到微秒的時間字串

  • 如果毫秒部分和微秒部分為0,寫入時會省略小數點後的0,例如:

    • "2020-01-01 09:01:01.000000"寫入後為"2020-01-01 09:01:01"。

  • 如果微秒部分為0,寫入時會省略毫秒後面的0,例如:

    • "2020-01-01 09:01:01.123000"寫入後為"2020-01-01 09:01:01.123"。

  • 如果微秒部分不為0,會在最後補3個0,例如:

    • "2020-01-01 09:01:01.123457"寫入後為"2020-01-01 09:01:01.123457000"。

timestamp with time zone

精確到毫秒的時間字串:"2020-01-01 09:01:01.123"。

  • 如果毫秒部分為0,寫入時會省略小數點後的0,例如:

    • "2020-01-01 09:01:01.000"寫入時為"2020-01-01 09:01:01"。

bigserial

數字字串:"2"。

bytea

base64編碼字串:"ASDB=="。

char

定長字串:"char"。

serial

數字字串:"2"

time

精確到微秒時間字串。

  • 如果毫秒部分和微秒部分為0,寫入時會省略小數點後的0:

    • 例如"2020-01-01 09:01:01.000000"寫入後為"2020-01-01 09:01:01"。

  • 如果毫秒部分或微秒部分不為0,會在最後補充0到納秒位:

    • 例如"2020-01-01 09:01:01.123457"寫入後為"2020-01-01 09:01:01.123457000"。

int4[]/int8[]

字串數組:["1","2","3","4"]。

float4[]/float8[]

字串數組:["1.23","2.34"]。

boolean[]

字串數組:["true","false"]。

text[]

字串數組:["a","b"]。

說明

注意時間類型的欄位在序列化時,若不在[0001-01-01,9999-12-31]範圍內的資料,序列化結果與Hologres中的查詢結果會有差異。

中繼資料欄位內容說明

說明
  • 與Canal CDC格式一樣,Hologres Binlog三種類型變更記錄插入、更新和刪除中,一次更新會產生兩條JSON格式記錄寫入Kafka Topic中,一條對應更新前的資料內容,一條對應更新後的資料內容。

  • JSON格式可以選擇是否輸出來源Binlog中繼資料,如果勾選,在JSON格式字串中會加入一些描述Hologres Binlog變更記錄屬性的欄位。

image

欄位名

欄位值含義

_sequence_id_

Hologres Binlog中記錄的唯一標識,對於全量同步資料填充null。

_operation_type_

DML變更類型,可選值為"I"/"U"/"D",對應插入、更新和刪除,對於全量同步資料填充"I"。

_execute_time_

13位毫秒時間戳記。

  • 對應Hologres表對應的資料變更發生時間。

  • 對於全量同步資料填充0。

_before_image_

  • 增量同步處理的訊息資料是否對應變更前內容,Y-是,N-否。

  • 全量同步填充N。

  • 訊息變更類型為插入時填充N。

  • 訊息變更類型為更新時,會往Kafka寫入兩條記錄,一條記錄填充Y,一條記錄填充N。

  • 訊息變更類型為刪除時填充Y。

_after_image_

  • 增量同步處理的訊息資料是否對應變更後內容,Y-是,N-否。

  • 全量同步填充Y。

  • 訊息變更類型為插入時填充Y。

  • 訊息變更類型為更新時,會往Kafka寫入兩條記錄,一條記錄填充Y,一條記錄填充N。

  • 訊息變更類型為刪除時填充N。