すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Hologres テーブルから Kafka へのリアルタイム同期

最終更新日:Oct 29, 2025

Data Integration は、DataHub や Hologres などのデータソースの単一テーブルから Kafka へのデータのリアルタイム同期をサポートしています。リアルタイム ETL 同期タスクは、ソース Hologres テーブルのスキーマに基づいて Kafka の Topic を初期化し、Hologres テーブルから Kafka へリアルタイムでデータを同期して消費します。このトピックでは、単一の Hologres テーブルから Kafka へのリアルタイム同期を設定する方法について説明します。

制限事項

  • Kafka データソースのバージョンは 0.10.2 から 3.6.0 の範囲である必要があります。

  • Hologres データソースのバージョンは V2.1 以降である必要があります。

  • Hologres パーティションテーブルからのデータの増分同期はサポートされていません。

  • Hologres テーブルの DDL 変更のメッセージは同期できません。

  • Hologres から同期できるのは、INTEGER、BIGINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、JSON、SERIAL、OID、INT4[]、INT8[]、FLOAT8[]、BOOLEAN[]、TEXT[]、および JSONB の各データ型の増分データです。

  • ソース Hologres データベースの Hologres テーブルでバイナリログを有効にする必要があります。詳細については、「Hologres バイナリログをサブスクライブする」をご参照ください。

前提条件

手順

1. 同期タスクのタイプを選択する

  1. Data Integration ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[Data Integration] > [Data Integration] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration に移動] をクリックします。

  2. 左側のナビゲーションウィンドウで、[同期タスク] をクリックします。次に、ページの上部にある [同期タスクの作成] をクリックして、同期タスク作成ページに移動します。次の基本情報を設定します:

    • [データソースと宛先]: HologresKafka

    • [新しいタスク名]: 同期タスクの名前をカスタマイズします。

    • [同期タイプ]: 単一テーブルリアルタイム

    • [同期ステップ]: 完全同期を選択します。

2. ネットワークとリソースを設定する

  1. [ネットワークとリソースの設定] セクションで、同期タスクの [リソースグループ] を選択します。タスクに CU 単位で [タスクリソース使用量] を割り当てることができます。

  2. [ソースデータソース] には、追加した Hologres データソースを選択します。[宛先データソース] には、追加した Kafka データソースを選択します。次に、[接続をテスト] をクリックします。image

  3. ソースと宛先の両方のデータソースが接続されていることを確認したら、[次へ] をクリックします。

3. 同期リンクを設定する

a. Hologres ソースを設定する

ページの上部で、Hologres データソースをクリックし、[Holo ソース情報] を編集します。

image

  1. [Holo ソース情報] セクションで、データを読み取る Hologres テーブルを含むスキーマとソーステーブルを選択します。

  2. 右上隅にある [データサンプリング] をクリックします。

    [データ出力プレビュー] ダイアログボックスで、[サンプル数] を指定し、[収集を開始] をクリックします。指定した Hologres テーブルからデータをサンプリングして、Hologres テーブル内のデータをプレビューできます。これにより、後続のデータ処理ノードでのデータプレビューと視覚的な設定のための入力が提供されます。

b. Kafka 宛先を設定する

ページの上部で、Kafka 宛先をクリックし、[Kafka 宛先情報] を編集します。

image

  1. [Kafka 宛先情報] セクションで、データを書き込む Kafka Topic を選択します。

  2. 必要に応じて [ソース Binlog 更新メッセージをマージ] を設定します。このオプションを有効にすると、ソースバイナリログの更新操作に対応する 2 つの更新メッセージが、Kafka に書き込まれる前に 1 つのメッセージにマージされます。

  3. [出力形式][キー列]、および [Kafka プロデューサーパラメーター] を設定します。

    • [出力形式]: Kafka に書き込まれるレコードの値コンテンツの形式を確認します。有効な値: Canal CDC および JSON。詳細については、「付録: 出力形式の説明」をご参照ください。

    • [キー列]: ソース列を選択します。選択した列の値は文字列にシリアル化され、カンマで連結されて、Kafka Topic に書き込まれるレコードのキーを形成します。

      説明
      • 列値のシリアル化ルールは、Hologres の列データ型の JSON シリアル化ルールと同じです。

      • Kafka Topic のキー値は、データが書き込まれるパーティションを決定します。同じキー値を持つデータは、同じパーティションに書き込まれます。コンシューマーが Kafka Topic のデータを順番に消費できるようにするには、Hologres テーブルのプライマリキー列をキー列として使用することをお勧めします。

      • ソース列がキー列として使用されない場合、Kafka Topic のキー値は null になります。この場合、データは Kafka Topic のランダムなパーティションに書き込まれます。

    • [Kafka プロデューサーパラメーター]: これらのパラメーターは、書き込み操作の一貫性、安定性、および例外処理の動作に影響します。ほとんどの場合、デフォルト設定を使用できます。カスタム要件がある場合は、特定のパラメーターを指定できます。Kafka のさまざまなバージョンでサポートされているプロデューサーパラメーターについては、「Kafka ドキュメント」をご参照ください。

4. アラートルールを設定する

同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクにさまざまなアラートルールを設定できます。

  1. ページの右上隅にある [アラートルールを設定] をクリックして、[アラートルールを設定] パネルに移動します。

  2. [アラートルールを設定] パネルで、[アラートルールを追加] をクリックします。[アラートルールを追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。

    説明

    このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了したら、「リアルタイム同期タスクの管理」を参照して、[リアルタイム同期タスク] ページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。

  3. アラートルールを管理します。

    作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。

5. 詳細パラメーターを設定する

DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。

説明

予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。

  1. 設定ページの右上隅にある [詳細パラメーターを設定] をクリックします。

  2. [詳細パラメーターを設定] パネルで、目的のパラメーターの値を変更します。

6. リソースグループを設定する

ページの右上隅にある [リソースグループを設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。

7. 同期タスクを実行する

  1. 同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。

  2. [タスク] ページの [同期タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。

  3. [タスク] セクションで同期タスクの [名前または ID] をクリックして、同期タスクの詳細な実行プロセスを表示します。

同期タスクの O&M 操作を実行する

同期タスクのステータスを表示する

データ同期ソリューションが作成された後、[タスク] ページに移動して、ワークスペースで作成されたすべてのデータ同期ソリューションと各データ同期ソリューションの基本情報を表示できます。

image

  • [操作] 列で同期タスクを [開始] または [停止] できます。[その他] ドロップダウンリストから同期タスクを [編集] または [表示] することもできます。

  • 開始されたタスクについては、[実行概要] でタスクの基本ステータスを表示できます。対応する概要エリアをクリックして、実行の詳細を表示することもできます。

image

Hologres テーブルから Kafka へのリアルタイム同期タスクは、次の 3 つのステップで構成されます:

  • [構造移行]: 宛先テーブルの作成方法 (既存のテーブルまたは自動テーブル作成) が含まれます。自動テーブル作成を選択した場合、テーブルを作成するためのデータ定義言語 (DDL) 文が表示されます。

  • [完全初期化]: タスクの [同期ステップ][完全同期] を選択した場合、完全初期化の進行状況がここに表示されます。

  • [リアルタイムデータ同期]: リアルタイムの読み取りおよび書き込みトラフィック、ダーティデータ、フェールオーバー、操作ログなどのリアルタイム同期に関する統計情報が含まれます。

同期タスクを再実行する

特別な場合に、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。すでに同期されていて変更されていないテーブルのデータは、再度同期されません。

  • 同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行します。

  • 同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新を適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。

付録: 出力形式の説明

Canal CDC

Canal CDC は、Alibaba Canal によって定義された CDC データ形式です。

  • フィールドと意味

    フィールド名

    フィールド値の意味

    id

    このフィールドの値は 0 に固定されています。

    database

    Hologres データベースの名前。

    table

    Hologres テーブルの名前。

    pkNames

    Hologres テーブルのプライマリキー列。

    isDdl

    バイナリログが DDL 変更を記録するかどうかを指定します。Hologres テーブルの DDL 変更のメッセージの同期はサポートされていないため、このフィールドの値は false に固定されています。

    type

    DML 変更のタイプ。有効な値: INSERT、UPDATE、および DELETE。

    説明

    Hologres テーブルに対する 1 つの変更操作で、type が UPDATE である 2 つのレコードが生成されます。レコードは Kafka トピックに書き込まれます。

    • レコードの 1 つは、変更前のデータコンテンツに対応します。

    • もう 1 つは、変更後のデータコンテンツに対応します。

    • Hologres テーブルから Kafka トピックにすべてのデータを同期する場合、type フィールドの値は INSERT に固定されます。

    es

    ミリ秒単位の 13 桁のタイムスタンプ。タイムスタンプは、Hologres テーブルのデータが変更された時刻を示します。

    Hologres テーブルから Kafka トピックにすべてのデータを同期する場合、es フィールドの値は 0 に固定されます。

    ts

    ミリ秒単位の 13 桁のタイムスタンプ。タイムスタンプは、Hologres テーブル用に生成されたバイナリログが同期タスクによって読み取られた時刻を示します。

    sql

    Hologres テーブル用に生成されたバイナリログに DDL 変更が含まれている場合に、DDL 変更を記録する SQL コード。Hologres テーブルの DDL 変更のメッセージの同期はサポートされていないため、このフィールドの値は空の文字列に固定されています。

    sqlType

    Hologres テーブルフィールドのデータ型に対応する SQL フィールドデータ型。

    Hologres データ型と sqlType の有効な値のマッピング:

    • bigint: -5

    • スケールが 0 でない decimal: 3

    • スケールが 0 の decimal: -5

    • boolean: 16

    • date: 91

    • float4: 6

    • float8: 8

    • integer: 4

    • smallint: 5

    • json: 12

    • text: 12

    • varchar: 12

    • timestamp: 93

    • timestamptz: 93

    • bigserial: -5

    • bytea: 12

    • char: 12

    • serial: 4

    • time: 92

    • int4[]: 12

    • int8[]: 12

    • float4[]: 12

    • float8[]: 12

    • boolean[]: 12

    • text[]: 12

    mysqlType

    Hologres テーブルフィールドのデータ型に対応する MySQL フィールドデータ型。

    Hologres データ型と mysqlType の有効な値のマッピング:

    • bigint: BIGINT

    • int4: INT

    • スケールが 0 でない decimal: DECIMAL(xx,xx)

    • スケールが 0 の decimal: BIGINT

    • boolean: BOOLEAN

    • date: DATE

    • float4: FLOAT

    • float8: DOUBLE

    • integer: INT

    • smallint: SMALLINT

    • json: TEXT

    • text: TEXT

    • varchar: VARCHAR(xx)

    • timestamp: DATETIME(6)

    • timestamptz: DATETIME(6)

    • bigserial: BIGINT

    • bytea: TEXT

    • char: TEXT

    • serial: INT

    • time: TIME(6)

    • int4[]: TEXT

    • int8[]: TEXT

    • float4[]: TEXT

    • float8[]: TEXT

    • boolean[]: TEXT

    • text[]: TEXT

    data

    Hologres テーブルのデータ変更。Hologres テーブルのフィールド名がキーとして使用され、フィールドのデータ変更が文字列にシリアル化されて値として使用されます。その後、キーと値は JSON 形式の文字列として整理されます。シリアル化の詳細については、「JSON シリアル化の説明」をご参照ください。

    old

    Hologres テーブルに対する 1 つの変更操作で、type が UPDATE である 2 つのレコードが生成され、Kafka トピックに書き込まれます。

    レコードは、変更前と変更後のデータコンテンツに対応します。最初のレコードでは、old フィールドを使用して、変更前の Hologres テーブルのデータコンテンツを記録します。data フィールドは、UPDATE 以外の DML 変更のデータコンテンツを記録するために使用されます。

  • Hologres テーブルのバイナリログで INSERT 操作によって生成されたデータ変更に対応する Canal JSON 形式データの例

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "INSERT",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Hologres テーブルから同期された完全データに対応する Canal JSON 形式データの例

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "INSERT",
        "es": 0,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Hologres テーブルのバイナリログで UPDATE 操作によって生成されたデータ変更に対応する 2 つの Canal JSON 形式データレコードの例

    // 変更前のデータコンテンツ
    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "UPDATE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "old": [
            {
                "bigint": "0",
                "integer": "0",
                "smallint": "0"
            }
        ],
        "data": null
    }
    // 変更後のデータコンテンツ
    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "UPDATE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Hologres テーブルのバイナリログで DELETE 操作によって生成されたデータ変更に対応する Canal JSON 形式データの例

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "DELETE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }

Json

Json は、Hologres バイナリログのフィールド名をキーとして使用し、フィールドのデータコンテンツを文字列にシリアル化して値として使用する形式です。その後、キーと値は JSON 形式の文字列として整理され、Kafka Topic に書き込まれます。

JSON シリアル化の説明

Hologres データ型のシリアル化

Hologres データ型

Kafka に書き込まれるシリアル化結果

bit

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

inet

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

interval

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

money

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

oid

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

timetz

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

uuid

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

varbit

サポートされていません。同期タスクの開始時にシステムはエラーを報告します。

jsonb

サポートされていません。Kafka にデータが書き込まれた後、バイナリログの解析に失敗したことを示すエラーがシステムによって報告されます。

bigint

数値文字列。例: 2。

decimal(38,18)

小数点以下の桁数が精度と同じ数値文字列。例: 1.234560000000000000。

decimal(38,0)

小数点以下の桁数が精度と同じ数値文字列。例: 2。

boolean

"true"/"false"。

date

yyyy-MM-dd 形式の日付文字列。例: 2024-02-02。

float4/float8/double

数値文字列。システムはシリアル化結果に 0 を追加しません。これにより、シリアル化結果が Hologres テーブルからクエリしたデータと一致することが保証されます。例: 1.24。

interger/smallint

数値文字列。例: 2。

json

JSON 文字列。例: {\"a\":2}。

text/varchar

UTF-8 でエンコードされた文字列。例: text。

timestamp

マイクロ秒まで正確な時間文字列

  • ミリ秒部分とマイクロ秒部分の両方が 0 の場合、Kafka にデータが書き込まれるときに 2 つの部分は自動的に省略されます。

    • たとえば、時刻文字列 2020-01-01 09:01:01.000000 は、Kafka に書き込まれた後、2020-01-01 09:01:01 になります。

  • マイクロ秒部分が 0 の場合、書き込み中にミリ秒の後のゼロは省略されます。例:

    • たとえば、時刻文字列 2020-01-01 09:01:01.123000 は、Kafka に書き込まれた後、2020-01-01 09:01:01.123 になります。

  • マイクロ秒部分が 0 でない場合、Kafka にデータが書き込まれるときに、システムはマイクロ秒部分の後に 3 つのゼロを自動的に追加します。

    • たとえば、時刻文字列 2020-01-01 09:01:01.123457 は、Kafka に書き込まれた後、2020-01-01 09:01:01.123457000 になります。

timestamp with time zone

ミリ秒単位で正確な時刻文字列。例: 2020-01-01 09:01:01.123。

  • ミリ秒部分が 0 の場合、Kafka にデータが書き込まれるときにミリ秒部分は自動的に省略されます。

    • たとえば、時刻文字列 2020-01-01 09:01:01.000 は、Kafka に書き込まれた後、2020-01-01 09:01:01 になります。

bigserial

数値文字列。例: 2。

bytea

Base64 でエンコードされた文字列。例: ASDB==。

char

固定長文字列。例: char。

serial

数値文字列。例: 2

time

マイクロ秒単位で正確な時刻文字列。

  • ミリ秒とマイクロ秒の両方の部分が 0 の場合、2 つの部分は Kafka に書き込まれるときに自動的に省略されます:

    • たとえば、時刻文字列 2020-01-01 09:01:01.000000 は、Kafka に書き込まれた後、2020-01-01 09:01:01 になります。

  • ミリ秒またはマイクロ秒部分が 0 でない場合、システムはマイクロ秒部分の後にナノ秒部分の 0 を追加します:

    • たとえば、時刻文字列 2020-01-01 09:01:01.123457 は、Kafka に書き込まれた後、2020-01-01 09:01:01.123457000 になります。

int4[]/int8[]

文字列配列。例: ["1","2","3","4"]。

float4[]/float8[]

文字列配列。例: ["1.23","2.34"]。

boolean[]

文字列配列。例: ["true","false"]。

text[]

文字列配列。例: ["a","b"]。

説明

シリアル化された時間フィールドの値が [0001-01-01,9999-12-31] の範囲外の場合、シリアル化結果は Hologres のクエリ結果とは異なります。

メタデータフィールドの説明

説明
  • Hologres テーブルのバイナリログに記録された 1 つの INSERT、UPDATE、または DELETE 操作では、Canal CDC 形式を使用する場合と同じように、2 つの JSON 形式のレコードが生成されます。JSON 形式のレコードは、関連する Kafka トピックに同期されます。JSON 形式のレコードの 1 つは変更前のデータコンテンツに対応し、もう 1 つは変更後のデータコンテンツに対応します。

  • JSON 形式の場合、[ソース Binlog メタデータを出力] を選択できます。このオプションを選択すると、Hologres バイナリログの変更レコードのプロパティを記述するフィールドが JSON 形式の文字列に追加されます。

image

フィールド名

フィールド値の意味

_sequence_id_

Hologres テーブルのバイナリログ内のレコードの一意の識別子。フル同期を実行する場合、このフィールドの値は null として入力されます。

_operation_type_

DML 変更のタイプ。有効な値: "I"、"U"、および "D"。それぞれ INSERT、UPDATE、および DELETE 操作を示します。フル同期を実行する場合、このフィールドの値は "I" として入力されます。

_execute_time_

ミリ秒単位の 13 桁のタイムスタンプ。

  • Hologres テーブルのデータが変更された時刻を示します。

  • フル同期を実行する場合、このフィールドの値は 0 として入力されます。

_before_image_

  • 増分同期のメッセージデータが変更前のデータコンテンツに対応するかどうかを指定します。有効な値: Y および N。それぞれ yes と no を示します。

  • フル同期を実行する場合、このフィールドの値は N として入力されます。

  • 変更のメッセージタイプが INSERT の場合、このフィールドの値は N として入力されます。

  • 変更のメッセージタイプが UPDATE の場合、2 つのレコードが Kafka に書き込まれます。レコードの 1 つのこのフィールドの値は Y として入力され、もう 1 つのレコードのこのフィールドの値は N として入力されます。

  • 変更のメッセージタイプが DELETE の場合、このフィールドの値は Y として入力されます。

_after_image_

  • 増分同期のメッセージデータが変更後のデータコンテンツに対応するかどうかを指定します。有効な値: Y および N。それぞれ yes と no を示します。

  • フル同期を実行する場合、このフィールドの値は Y として入力されます。

  • 変更のメッセージタイプが INSERT の場合、このフィールドの値は Y として入力されます。

  • 変更のメッセージタイプが UPDATE の場合、2 つのレコードが Kafka に書き込まれます。レコードの 1 つのこのフィールドの値は Y として入力され、もう 1 つのレコードのこのフィールドの値は N として入力されます。

  • 変更のメッセージタイプが DELETE の場合、このフィールドの値は N として入力されます。