Data Integration は、DataHub、Kafka、LogHub などのソースの単一テーブルから MaxCompute へのリアルタイムデータ同期をサポートしています。このトピックでは、単一の LogHub (SLS) テーブルから MaxCompute へリアルタイムデータを同期する方法について説明します。
前提条件
Serverless リソースグループ または データ統合専用リソースグループ を購入済みであること。
LogHub (SLS) データソースと MaxCompute データソースを作成済みであること。詳細については、「データソースの構成」をご参照ください。
リソースグループとデータソース間のネットワーク接続が確立されていること。詳細については、「ネットワーク接続ソリューションの概要」をご参照ください。
制限事項
ソースデータを MaxCompute の外部テーブルに同期することはサポートされていません。
操作手順
ステップ 1:同期タスクタイプの選択
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration へ] をクリックします。
左側のナビゲーションウィンドウで、[同期タスク] をクリックします。ページの上部で [同期タスクの作成] をクリックして、タスク作成ページを開きます。次のように基本情報を設定します。
ソース:
LogHub。宛先:
MaxCompute。タスク名: 同期タスクのカスタム名を入力できます。
タスクタイプ:
単一 Logstore リアルタイム同期。
ステップ 2:ネットワークとリソースの構成
[ネットワークとリソースの構成] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソース使用量] については、要件に基づいて CU を割り当てます。
[ソース] を
LogHubデータソースに、[宛先] をMaxComputeデータソースに設定します。次に、[接続テスト] をクリックします。
ソースと宛先のデータソースが接続されていることを確認したら、[次へ] をクリックします。
ステップ 3:同期リンクの構成
1. SLS ソースの構成
ページの上部で SLS ソースをクリックし、ソース情報を編集します。

[ソース情報] セクションで、同期する LogHub の Logstore を選択します。
右上隅にある [データサンプリング] をクリックします。
表示されるダイアログボックスで、[開始時刻] と [サンプルデータレコード] を指定し、[収集開始] をクリックします。この操作により、Logstore からデータがサンプリングされ、プレビューが生成されます。このプレビューを使用して、後続のノードでデータ処理と可視化を設定できます。
Logstore を選択すると、そのデータは自動的に [出力フィールドの設定] セクションに読み込まれ、対応するフィールド名が生成されます。必要に応じて、[データ型] の調整、フィールドの [削除]、または [出力フィールドの追加] オプションを使用できます。
説明設定されたフィールドが SLS に存在しない場合、そのフィールドの値は下流ノードに NULL として出力されます。
2. データ処理ノードの編集
アイコンをクリックして、データ処理メソッドを追加できます。サポートされているデータ処理メソッドは、データマスキング、文字列置換、データフィルタリング、JSON パース、フィールドの編集と値の割り当て です。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

データ処理ノードを設定した後、右上隅にある [データ出力のプレビュー] ボタンをクリックします。表示されるダイアログボックスで、[先祖ノードの出力を再取得] をクリックして、現在のノードがサンプリングされたログデータから生成する出力をシミュレートします。

データ出力プレビュー機能は、LogHub (SLS) ソースからの [データサンプリング] に依存します。したがって、プレビューを有効にするには、ソースフォームでデータサンプリングを完了する必要があります。
3. MaxCompute 宛先の構成
ページの上部で MaxCompute 宛先をクリックし、[宛先情報] を編集します。

[宛先情報] エリアで、[テーブルを自動作成] または [既存テーブルを使用] を選択します。
[テーブルを自動作成] を選択した場合、デフォルトでソーステーブルと同じ名前のテーブルが作成されます。必要に応じて宛先テーブルの名前を変更できます。
[既存テーブルを使用] を選択した場合、ドロップダウンリストから宛先テーブルを選択します。
(任意) 宛先テーブルのスキーマを変更します。
宛先テーブル パラメーターで [テーブルを自動的に作成] を選択した場合は、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先テーブルのスキーマを編集します。また、[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてスキーマを再生成することもできます。生成されたスキーマから列を選択し、その列をプライマリキーとして設定できます。
説明宛先テーブルにはプライマリキーが必要です。そうでない場合、設定を保存できません。
ソースのフィールドと宛先のフィールド間のマッピングを設定します。
前述の構成が完了すると、[同名のフィールドをマッピング] の原則に基づき、ソースと宛先のフィールド間のマッピングがシステムによって自動的に作成されます。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを、宛先の同じフィールドにマッピングすることはできません。ソースのフィールドが宛先のどのフィールドにもマッピングされていない場合、そのソースフィールドのデータは宛先に同期されません。
パーティション設定。
パーティショニングメソッドを選択します。サポートされているメソッドは、[時間ベースの自動パーティショニング] と [フィールド値による動的パーティショニング] です。
4. アラートルールの設定
同期タスクの失敗によるビジネスデータの同期遅延を防ぐために、同期タスクにさまざまなアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[リアルタイム同期サブノードのアラートルール設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了した後、「リアルタイム同期タスクの管理」を参照して、リアルタイム同期タスクページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールの管理。
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
5. 詳細パラメーターの設定
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することを推奨します。
設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。
[詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。
6. リソースグループの構成
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
7. シミュレーション実行
上記の設定が完了したら、設定ページの右上隅にある [シミュレーション実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。同期結果は宛先テーブルで確認できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が得られるかどうかをできるだけ早く判断できます。
表示されるダイアログボックスで、指定されたテーブルからのデータサンプリングのパラメーターを設定します。これには、[開始位置] および [サンプルデータレコード] パラメーターが含まれます。
[収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。
[プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。
8. 同期タスクの実行
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
ページで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションで同期タスクの [名前または ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
同期タスクの運用保守
タスク実行ステータスの表示
同期タスクを作成した後、[同期タスク] ページで同期タスクのリストとその基本情報を表示できます。

[操作] 列では、同期タスクを [開始] または [停止] できます。また、[その他] をクリックして、[編集] や [表示] などの他の操作を実行することもできます。
[実行概要] で実行中のタスクのステータスを表示し、タスクをクリックしてその実行詳細を表示できます。

LogHub (SLS) から MaxCompute への同期タスクには、[スキーマ移行] と [リアルタイムデータ同期] の 2 つのステップが含まれます。
スキーマ移行: 既存の宛先テーブルを使用するか、新しいテーブルを自動的に作成するかを指定します。テーブルの自動作成を選択した場合、テーブルのデータ定義言語 (DDL) 文が表示されます。
リアルタイムデータ同期: リアルタイムのランタイム情報、DDL レコード、アラート情報などのパフォーマンス統計を提供します。
同期タスクの再実行
特殊なケースでは、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更する場合、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に対して行われた変更を同期します。すでに同期済みで変更されていないテーブルのデータは、再度同期されません。
同期タスクの構成を変更せずに[再実行]を直接クリックすると、システムが同期タスクを再実行します。
同期タスクの構成を変更し、[完了] をクリックします。 同期タスクの [操作] 列に表示される [更新を適用] をクリックすると、同期タスクが再実行され、最新の構成が反映されます。