Data Integration は、DataHub、Hologres、Kafka、LogHub などのデータソースの単一テーブルから Hologres へのデータのリアルタイム同期をサポートしています。このトピックでは、LogHub (SLS) の単一テーブルから Hologres へデータをリアルタイムで同期する方法について説明します。
前提条件
Serverless リソースグループまたはデータ統合専用リソースグループを購入済みであること。
LogHub (SLS) と Hologres のデータソースを作成済みであること。詳細については、「Data Integration でデータソースを作成する」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立済みであること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
手順
1. 同期タスクタイプを選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration へ移動] をクリックします。
左側のナビゲーションウィンドウで [同期タスク] をクリックし、ページの上部にある [同期タスクの作成] をクリックしてタスク作成ページに移動します。次の基本情報を設定します。
データソースと宛先:
LogHub→Hologres新しいタスク名: 同期タスク名をカスタマイズします。
同期タイプ:
単一テーブルのリアルタイム同期。
2. ネットワークとリソースを設定する
[ネットワークとリソースの設定] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソースの使用量] に CU 数を割り当てることができます。
追加したデータソースを [ソースデータソース]
LogHub[宛先データソース] として選択し、追加したHologresデータソースを宛先データソースとして選択してから、[接続をテスト] をクリックします。
ソースと宛先の両方のデータソースが正常に接続されたことを確認した後、[次へ] をクリックします。
3. 同期リンクを設定する
1. SLS ソースを設定する
ページ上部の SLS データソースをクリックして、[SLS ソース情報] を編集します。

[SLS ソース情報] セクションで、同期したい LogHub (SLS) の Logstore を選択します。
右上隅の [データサンプリング] をクリックします。
表示されるダイアログボックスで、[開始時刻] と [サンプリングデータレコード] を指定し、[収集開始] をクリックします。Logstore からデータをサンプリングしてプレビューできます。これは、後続のデータ処理ノードのデータプレビューと可視化設定の入力となります。
Logstore を選択すると、システムは [出力フィールド設定] セクションで Logstore からデータを自動的に読み込み、対応するフィールド名を生成します。[データ型] の調整、フィールドの [削除]、および [出力フィールドを手動で追加] ができます。
説明出力フィールドが Simple Log Service データソースに存在しない場合、宛先には NULL が書き込まれます。
2. データ処理ノードを編集する
アイコンをクリックして、データ処理メソッドを追加できます。次のデータ処理メソッドがサポートされています: データマスキング、文字列の置換、データフィルタリング、JSON 解析、および フィールドの編集と値の割り当て。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

データ処理ノードの設定が完了したら、右上隅の [データ出力のプレビュー] をクリックできます。表示されるダイアログボックスで、[先祖ノードの出力を再取得] をクリックして、現在のデータ処理ノードを介して Logstore サンプルデータを処理した結果をシミュレートします。

データ出力プレビューは、LogHub (SLS) ソースからの [データサンプリング] に強く依存します。データ出力プレビューを実行する前に、LogHub (SLS) ソースフォームでデータサンプリングを完了する必要があります。
3. Hologres 宛先情報を設定する
ページ上部の Hologres データ宛先をクリックして、[Hologres 宛先情報] を編集します。

[Hologres 宛先情報] セクションで、書き込みたい Hologres テーブルが配置されているスキーマを選択し、宛先テーブルを [テーブルの作成] にするか [既存のテーブルを使用] にするかを選択します。
[テーブルの作成] を選択した場合、デフォルトでデータソースと同じ名前のテーブルが作成されます。宛先テーブル名は手動で変更できます。
[既存のテーブルを使用] を選択した場合、ドロップダウンリストから同期したい宛先テーブルを選択します。
(任意) 宛先テーブルのスキーマを変更します。
宛先テーブルパラメーターに [テーブルを自動的に作成] を選択した場合、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先テーブルのスキーマを編集します。また、[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてスキーマを再生成することもできます。生成されたスキーマから列を選択し、その列をプライマリキーとして設定できます。
説明宛先テーブルにはプライマリキーが必要です。そうでない場合、設定は保存できません。
[ジョブタイプ] と [書き込み競合ポリシー] を設定します。
ジョブタイプ:
リプレイ: これはミラーリング機能を表します。ソースがレコードを挿入すると、Hologres もレコードを挿入します。ソースがレコードを更新または削除すると、Hologres も対応するレコードを更新または削除します。
挿入: これは Hologres をストリームストレージとして扱い、INSERT を使用してソースからのすべてのデータを保存します。
書き込み競合ポリシー: データの書き込み競合を処理するためのポリシーです。オプションには [上書き] と [無視] があります。
ソースのフィールドと宛先のフィールド間のマッピングを設定します。
上記の設定が完了すると、システムは [同名マッピング] の原則に基づいて、ソースのフィールドと宛先のフィールド間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを、宛先の同じフィールドにマッピングすることはできません。ソースのフィールドにマッピングされた宛先のフィールドがない場合、ソースのフィールドのデータは宛先に同期されません。
4. アラートルールを設定する
同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクに異なるアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[アラートルールの設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了したら、「リアルタイム同期タスクの管理」を参照して、リアルタイム同期タスクページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールを管理します。
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
5. 詳細パラメーターを設定する
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいて、これらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。
設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。
[詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。
6. リソースグループを設定する
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
7. 同期タスクのテストを実行する
上記の設定が完了したら、設定ページの右上隅にある [シミュレーション実行の実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。同期結果は宛先テーブルで表示できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が最も早い機会に得られるかどうかを判断するのに役立ちます。
表示されるダイアログボックスで、指定されたテーブルからのデータサンプリングのパラメーターを設定します。これには [開始] および [サンプリングデータレコード] パラメーターが含まれます。
[収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。
[プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。
8. 同期タスクを実行する
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
[タスク] ページの [同期タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションで同期タスクの [名前または ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
同期タスクの O&M
同期タスクの実行ステータスを表示する
同期タスクが作成された後、[同期タスク] ページに移動して、ワークスペースで作成されたすべての同期タスクと各同期タスクの基本情報を表示できます。

[アクション] 列で同期タスクを [開始] または [停止] できます。[その他] メニューでは、同期タスクの [編集]、[表示]、およびその他の操作を実行できます。
開始されたタスクについては、[実行概要] でタスクの基本ステータスを確認するか、対応する概要エリアをクリックして実行詳細を表示できます。

LogHub (SLS) から Hologres への同期タスクには、[スキーマ移行] と [リアルタイムデータ同期] の 2 つのステージがあります。
スキーマ移行: このタブには、宛先テーブルの生成メソッドなどの情報が表示されます。宛先テーブルの生成メソッドには、[既存のテーブルを使用] と [テーブルの作成] があります。宛先テーブルの生成メソッドが [テーブルの作成] の場合、テーブルの作成に使用される DDL 文が表示されます。
リアルタイムデータ同期: このタブには、リアルタイム実行情報、DDL レコード、アラート情報など、リアルタイム同期に関する統計情報が表示されます。
同期タスクを再実行する
特殊なケースで、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。すでに同期されていて変更されていないテーブルのデータは、再度同期されません。
同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行します。
同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新を適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。