Data Integration は、DataHub、Hologres、Kafka、LogHub などのデータソースの単一テーブルから Hologres へのデータのリアルタイム同期をサポートしています。リアルタイム同期タスクは、指定された Kafka トピックの構造に基づいて、宛先 Hologres テーブルのスキーマを初期化します。次に、タスクは指定された Kafka トピックから宛先 Hologres テーブルに一度に完全データを同期し、トピックから宛先 Hologres テーブルに増分データをリアルタイムで同期します。このトピックでは、Kafka トピックから Hologres にデータをリアルタイムで同期する方法について説明します。
前提条件
サーバーレスリソースグループまたはデータ統合専用リソースグループを購入済みであること。
Kafka および Hologres データソースを作成済みであること。詳細については、「Data Integration のデータソースを作成する」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立済みであること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
手順
1. 同期タスクタイプを選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ統合に移動] をクリックします。
左側のナビゲーションウィンドウで [同期タスク] をクリックし、ページ上部の [同期タスクの作成] をクリックして同期タスク作成ページに移動します。次の基本情報を設定します。
データソースと宛先:
Kafka→Hologres新しいタスク名: 同期タスク名をカスタマイズします。
同期タイプ:
単一テーブルのリアルタイム同期。
2. ネットワークとリソースを設定する
[ネットワークとリソースの設定] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソース使用量] の計算ユニット (CU) 数を割り当てることができます。
[ソースデータソース]
kafkaに追加したデータソースを選択し、[宛先データソース] に追加したHologresデータソースを選択して、[接続性テスト] をクリックします。
ソースと宛先の両方のデータソースが正常に接続されたことを確認したら、[次へ] をクリックします。
3. 同期リンクを設定する
1. Kafka ソースを設定する
ページ上部の Kafka データソースをクリックして、[Kafka ソース情報] を編集します。

[Kafka ソース情報] セクションで、Kafka クラスター内でデータを同期するトピックを選択します。
他のパラメーターはデフォルト値を保持するか、ビジネス要件に基づいて設定を変更します。
右上隅の [データサンプリング] をクリックします。
表示されるダイアログボックスで、[開始時刻] と [サンプリングデータレコード] パラメーターを設定し、[収集開始] をクリックします。システムは指定した Kafka トピックからデータをサンプリングします。Kafka トピックのデータをプレビューできます。Kafka トピックのデータは、データ処理ノードのデータプレビューおよび可視化設定の入力データとして使用されます。
[出力フィールド設定] セクションで、必要に応じて同期するフィールドを選択します。
2. データ処理ノードを編集する
アイコンをクリックして、データ処理メソッドを追加できます。次のデータ処理メソッドがサポートされています: データマスキング、文字列の置換、データフィルタリング、JSON 解析、および フィールドの編集と値の割り当て。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

データ処理ノードを設定した後、右上隅の [データ出力のプレビュー] をクリックできます。表示されるダイアログボックスで、[先祖ノードの出力を再取得] をクリックして、データ処理ノードが指定された Kafka トピックからサンプリングされたデータを処理し、処理結果をプレビューできるようにします。

データ処理ノードによって入力データが処理された後に生成される結果をプレビューする前に、Kafka データソースの [データサンプリング] 設定を構成する必要があります。
3. Hologres 宛先を設定する
ページ上部の Hologres データ宛先をクリックして、[Hologres 宛先情報] を編集します。

[Hologres 宛先情報] セクションで、データを書き込む Hologres テーブルを含むスキーマを選択し、宛先テーブルに [テーブルの作成] または [既存のテーブルを使用] を選択します。
[テーブルの作成] を選択した場合、デフォルトでソーステーブルと同じ名前のテーブルが作成されます。宛先テーブル名は手動で変更できます。
[既存のテーブルを使用] を選択した場合、ドロップダウンリストからデータを同期する宛先テーブルを選択します。
(オプション) 宛先テーブルのスキーマを変更します。
[宛先テーブル] パラメーターに [テーブルを自動的に作成] を選択した場合、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先テーブルのスキーマを編集します。また、[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてスキーマを再生成することもできます。生成されたスキーマから列を選択し、その列をプライマリキーとして設定できます。
説明宛先テーブルにはプライマリキーが必要です。そうでない場合、設定は保存できません。
[ジョブタイプ] と [書き込み競合ポリシー] を設定します。
ジョブタイプ:
リプレイ: このオプションはミラー機能を意味します。ソースでレコードが挿入されると、そのレコードは Hologres にも挿入されます。ソースでレコードが更新または削除されると、そのレコードは Hologres でも更新または削除されます。
挿入: このオプションは、Hologres がストリーミングストレージとして扱われることを意味します。ソースからのすべてのデータは、INSERT 文を使用して Hologres に挿入されます。
書き込み競合ポリシー: 書き込み競合を処理するためのポリシーです。有効な値: 上書き および 無視。
ソースのフィールドと宛先のフィールド間のマッピングを設定します。
上記の設定を完了すると、システムは [同名マッピング] の原則に基づいて、ソースのフィールドと宛先のフィールド間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを、宛先の同じフィールドにマッピングすることはできません。ソースのフィールドに宛先のマッピングされたフィールドがない場合、ソースのフィールドのデータは宛先に同期されません。
4. アラートルールを設定する
同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクに異なるアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[アラートルールの設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了した後、「リアルタイム同期タスクの管理」を参照してリアルタイム同期タスクページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールを管理します。
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
5. 詳細パラメーターを設定する
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。
設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。
[詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。
6. リソースグループを設定する
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
7. 同期タスクのテストを実行する
上記の設定が完了したら、設定ページの右上隅にある [シミュレーション実行の実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。同期結果は宛先テーブルで確認できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が早期に得られるかどうかを判断できます。
表示されるダイアログボックスで、[開始時刻] および [サンプリングデータレコード] パラメーターを含む、指定されたテーブルからのデータサンプリングのパラメーターを設定します。
[収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。
[プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。
8. 同期タスクを実行する
同期タスクの設定が完了したら、ページ下部の [完了] をクリックします。
[同期タスク] ページの [タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションの同期タスクの [名前または ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
同期タスクの O&M 操作を実行する
同期タスクの実行ステータスを表示する
同期タスクの設定を完了すると、作成されたすべての同期タスクと各同期タスクの基本情報を [同期タスク] ページで表示できます。

[アクション] 列の [開始] または [停止] をクリックして、同期タスクを開始または停止できます。また、[その他] をクリックして、同期タスクに対して [編集] や [表示] などの操作を実行することもできます。
開始されたタスクについては、[実行詳細] で基本的な実行ステータスを表示するか、対応するエリアをクリックして実行詳細を表示できます。

Kafka から Hologres へのリアルタイム同期タスクは、2 つのステップで構成されます。
スキーマ移行: このタブには、宛先テーブルの生成方法などの情報が表示されます。宛先テーブルの生成方法には、[既存のテーブルを使用] と [テーブルの作成] があります。宛先テーブルの生成方法が [テーブルの作成] の場合、テーブルの作成に使用される DDL 文が表示されます。
リアルタイムデータ同期: このタブには、リアルタイムの読み取りおよび書き込みトラフィック、ダーティデータ情報、フェールオーバー、操作ログなど、リアルタイム同期に関する統計情報が表示されます。
同期タスクを再実行する
特別な場合に、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。既に同期され、変更されていないテーブルのデータは再同期されません。
同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行します。
同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新の適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。