Data Integration は、DataHub や Hologres などのデータソースの単一テーブルから Kafka へのデータのリアルタイム同期をサポートしています。リアルタイム ETL 同期タスクは、ソース Hologres テーブルのスキーマに基づいて Kafka の Topic を初期化し、Hologres テーブルから Kafka へリアルタイムでデータを同期して消費します。このトピックでは、単一の Hologres テーブルから Kafka へのリアルタイム同期を設定する方法について説明します。
制限事項
Kafka データソースのバージョンは 0.10.2 から 3.6.0 の範囲である必要があります。
Hologres データソースのバージョンは V2.1 以降である必要があります。
Hologres パーティションテーブルからのデータの増分同期はサポートされていません。
Hologres テーブルの DDL 変更のメッセージは同期できません。
Hologres から同期できるのは、INTEGER、BIGINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、JSON、SERIAL、OID、INT4[]、INT8[]、FLOAT8[]、BOOLEAN[]、TEXT[]、および JSONB の各データ型の増分データです。
ソース Hologres データベースの Hologres テーブルでバイナリログを有効にする必要があります。詳細については、「Hologres バイナリログをサブスクライブする」をご参照ください。
前提条件
サーバーレスリソースグループが購入済みであること。
Hologres および Kafka データソースが作成済みであること。詳細については、「Data Integration のデータソースを作成する」をご参照ください。
リソースグループとデータソース間のネットワーク接続が確立されていること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
手順
1. 同期タスクのタイプを選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration に移動] をクリックします。
左側のナビゲーションウィンドウで、[同期タスク] をクリックします。次に、ページの上部にある [同期タスクの作成] をクリックして、同期タスク作成ページに移動します。次の基本情報を設定します:
[データソースと宛先]:
Hologres→Kafka[新しいタスク名]: 同期タスクの名前をカスタマイズします。
[同期タイプ]:
単一テーブルリアルタイム。[同期ステップ]:
完全同期を選択します。
2. ネットワークとリソースを設定する
[ネットワークとリソースの設定] セクションで、同期タスクの [リソースグループ] を選択します。タスクに CU 単位で [タスクリソース使用量] を割り当てることができます。
[ソースデータソース] には、追加した
Hologresデータソースを選択します。[宛先データソース] には、追加したKafkaデータソースを選択します。次に、[接続をテスト] をクリックします。
ソースと宛先の両方のデータソースが接続されていることを確認したら、[次へ] をクリックします。
3. 同期リンクを設定する
a. Hologres ソースを設定する
ページの上部で、Hologres データソースをクリックし、[Holo ソース情報] を編集します。

[Holo ソース情報] セクションで、データを読み取る Hologres テーブルを含むスキーマとソーステーブルを選択します。
右上隅にある [データサンプリング] をクリックします。
[データ出力プレビュー] ダイアログボックスで、[サンプル数] を指定し、[収集を開始] をクリックします。指定した Hologres テーブルからデータをサンプリングして、Hologres テーブル内のデータをプレビューできます。これにより、後続のデータ処理ノードでのデータプレビューと視覚的な設定のための入力が提供されます。
b. Kafka 宛先を設定する
ページの上部で、Kafka 宛先をクリックし、[Kafka 宛先情報] を編集します。

[Kafka 宛先情報] セクションで、データを書き込む Kafka Topic を選択します。
必要に応じて [ソース Binlog 更新メッセージをマージ] を設定します。このオプションを有効にすると、ソースバイナリログの更新操作に対応する 2 つの更新メッセージが、Kafka に書き込まれる前に 1 つのメッセージにマージされます。
[出力形式]、[キー列]、および [Kafka プロデューサーパラメーター] を設定します。
[出力形式]: Kafka に書き込まれるレコードの値コンテンツの形式を確認します。有効な値: Canal CDC および JSON。詳細については、「付録: 出力形式の説明」をご参照ください。
[キー列]: ソース列を選択します。選択した列の値は文字列にシリアル化され、カンマで連結されて、Kafka Topic に書き込まれるレコードのキーを形成します。
説明列値のシリアル化ルールは、Hologres の列データ型の JSON シリアル化ルールと同じです。
Kafka Topic のキー値は、データが書き込まれるパーティションを決定します。同じキー値を持つデータは、同じパーティションに書き込まれます。コンシューマーが Kafka Topic のデータを順番に消費できるようにするには、Hologres テーブルのプライマリキー列をキー列として使用することをお勧めします。
ソース列がキー列として使用されない場合、Kafka Topic のキー値は null になります。この場合、データは Kafka Topic のランダムなパーティションに書き込まれます。
[Kafka プロデューサーパラメーター]: これらのパラメーターは、書き込み操作の一貫性、安定性、および例外処理の動作に影響します。ほとんどの場合、デフォルト設定を使用できます。カスタム要件がある場合は、特定のパラメーターを指定できます。Kafka のさまざまなバージョンでサポートされているプロデューサーパラメーターについては、「Kafka ドキュメント」をご参照ください。
4. アラートルールを設定する
同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクにさまざまなアラートルールを設定できます。
ページの右上隅にある [アラートルールを設定] をクリックして、[アラートルールを設定] パネルに移動します。
[アラートルールを設定] パネルで、[アラートルールを追加] をクリックします。[アラートルールを追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了したら、「リアルタイム同期タスクの管理」を参照して、[リアルタイム同期タスク] ページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールを管理します。
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
5. 詳細パラメーターを設定する
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。
設定ページの右上隅にある [詳細パラメーターを設定] をクリックします。
[詳細パラメーターを設定] パネルで、目的のパラメーターの値を変更します。
6. リソースグループを設定する
ページの右上隅にある [リソースグループを設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
7. 同期タスクを実行する
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
[タスク] ページの [同期タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションで同期タスクの [名前または ID] をクリックして、同期タスクの詳細な実行プロセスを表示します。
同期タスクの O&M 操作を実行する
同期タスクのステータスを表示する
データ同期ソリューションが作成された後、[タスク] ページに移動して、ワークスペースで作成されたすべてのデータ同期ソリューションと各データ同期ソリューションの基本情報を表示できます。

[操作] 列で同期タスクを [開始] または [停止] できます。[その他] ドロップダウンリストから同期タスクを [編集] または [表示] することもできます。
開始されたタスクについては、[実行概要] でタスクの基本ステータスを表示できます。対応する概要エリアをクリックして、実行の詳細を表示することもできます。

Hologres テーブルから Kafka へのリアルタイム同期タスクは、次の 3 つのステップで構成されます:
[構造移行]: 宛先テーブルの作成方法 (既存のテーブルまたは自動テーブル作成) が含まれます。自動テーブル作成を選択した場合、テーブルを作成するためのデータ定義言語 (DDL) 文が表示されます。
[完全初期化]: タスクの [同期ステップ] で [完全同期] を選択した場合、完全初期化の進行状況がここに表示されます。
[リアルタイムデータ同期]: リアルタイムの読み取りおよび書き込みトラフィック、ダーティデータ、フェールオーバー、操作ログなどのリアルタイム同期に関する統計情報が含まれます。
同期タスクを再実行する
特別な場合に、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。すでに同期されていて変更されていないテーブルのデータは、再度同期されません。
同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行します。
同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新を適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。
付録: 出力形式の説明
Canal CDC
Canal CDC は、Alibaba Canal によって定義された CDC データ形式です。
Json
Json は、Hologres バイナリログのフィールド名をキーとして使用し、フィールドのデータコンテンツを文字列にシリアル化して値として使用する形式です。その後、キーと値は JSON 形式の文字列として整理され、Kafka Topic に書き込まれます。
