Data Integration は、Kafka や LogHub などのソースの単一テーブルから OSS へのデータのリアルタイム同期をサポートしています。このトピックでは、Simple Log Service Logstore から OSS-HDFS データレイクにデータをリアルタイムで同期する方法について説明します。
前提条件
サーバーレスリソースグループまたはデータ統合専用リソースグループが購入済みであること。
Simple Log Service データソースと OSS-HDFS データソースが作成済みであること。詳細については、「データ統合のデータソースを作成する」をご参照ください。
リソースグループとデータソース間のネットワーク接続が確立されていること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
手順
ステップ 1: 同期タスクの種類を選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ統合へ] をクリックします。
左側のナビゲーションウィンドウで、[同期タスク] をクリックし、ページの上部にある [同期タスクの作成] をクリックします。表示されたページで、次の基本情報を設定します。
[ソースと宛先]:
LogHub→OSS-HDFS[新しいノード名]: 同期タスクの名前を指定します。
[同期方法]:
単一 Logstore リアルタイム同期。
ステップ 2: ネットワークとリソースを設定する
[ネットワークとリソースの設定] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソース使用量] の計算ユニット (CU) 数を割り当てることができます。
[ソース] には追加した
LogHubデータソースを選択し、[宛先] には追加したOSS-HDFSデータソースを選択して、[接続性テスト] をクリックします。
ソースと宛先のデータソースが接続されていることを確認したら、[次へ] をクリックします。
ステップ 3: 同期リンクを設定する
1. Kafka ソースの設定
ページ上部のウィザードで、SLS をクリックして [SLS ソース情報] を設定します。

[SLS ソース情報] セクションで、データを同期する Logstore を選択します。
右上隅にある [データサンプリング] をクリックします。
表示されるダイアログボックスで、[開始時間] と [サンプリングデータレコード] パラメーターを指定し、[収集開始] をクリックします。システムは Logstore からサンプルデータを収集します。Logstore のデータをプレビューでき、これは後続のデータ処理ノードでのデータプレビューと視覚的な設定の入力となります。
Logstore を選択すると、システムは Logstore からデータを自動的にロードし、[出力フィールド設定] セクションにフィールド名を生成します。[データ型] の調整、フィールドの [削除]、および [出力フィールドの手動追加] ができます。
説明出力フィールドが Simple Log Service データソースに存在しない場合、宛先には NULL が書き込まれます。
2. データ処理ノードを編集する
アイコンをクリックしてデータ処理メソッドを追加できます。サポートされているデータ処理メソッドは、データマスキング、文字列置換、データフィルタリング、JSON パース、および フィールドの編集と値の割り当て です。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

データ処理ノードの設定が完了したら、右上隅にある [データ出力のプレビュー] をクリックできます。表示されるダイアログボックスで、[アップストリーム出力を再度取得] をクリックして、Logstore からのサンプルデータが現在のデータ処理ノードで処理された後の結果をシミュレートします。
データ出力のプレビュー機能は、Simple Log Service ソースからの [データサンプリング] に大きく依存します。データ出力をプレビューする前に、Simple Log Service ソースフォームでデータサンプリングを完了する必要があります。
3. OSS-HDFS 宛先情報を設定する
ページ上部のウィザードで、OSS-HDFS をクリックして [OSS-HDFS 宛先情報] を設定します。

[OSS-HDFS 宛先情報] セクションで、データを書き込む OSS-HDFS 宛先に関する基本情報を選択します。
書き込みフォーマット: 3 つの書き込みフォーマット (Hudi、Paimon、および lceberg) がサポートされています。
[メタデータベース自動構築場所の選択]: アカウントで Data Lake Formation (DLF) を有効にしている場合、データレイクにデータを同期する際に DLF でメタデータベースとメタテーブルを自動的に構築できます。
説明リージョンをまたいだメタデータベースとメタテーブルの作成はサポートされていません。
[ストレージパスの選択]: データがデータレイクに同期された後に保存される OSS パスを選択します。
[宛先データベース]: データを書き込む宛先データベースを選択します。[データベースの作成] を選択して DLF メタデータベースを作成し、[データベース名] を指定できます。
[宛先テーブル]: データを書き込む OSS テーブルについて、[テーブルの自動作成] または [既存テーブルの使用] を選択します。
[テーブル名]: データを書き込む OSS テーブルの名前を入力または選択します。
(オプション) 宛先テーブルのスキーマを変更します。
[宛先テーブル] パラメーターで [テーブルを自動的に作成] を選択した場合は、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先テーブルのスキーマを編集します。また、[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてスキーマを再生成することもできます。生成されたスキーマから列を選択し、その列をプライマリキーとして設定できます。
ソースのフィールドと宛先のフィールド間のマッピングを設定します。
前述の設定が完了すると、システムは [同名マッピング] の原則に基づいて、ソースのフィールドと宛先のフィールド間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを宛先の同じフィールドにマッピングすることはできません。ソースのフィールドにマッピングされた宛先のフィールドがない場合、ソースのフィールドのデータは宛先に同期されません。
4. アラートルールを設定する
同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクにさまざまなアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[アラートルールの設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了した後、「リアルタイム同期タスクの管理」を参照して [リアルタイム同期タスク] ページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールの管理。
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
5. 高度なパラメーターを設定する
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。
設定ページの右上隅にある [高度なパラメーターの設定] をクリックします。
[高度なパラメーターの設定] パネルで、目的のパラメーターの値を変更します。
ステップ 6: DDL 機能を設定する
ソースで DDL 操作が実行される場合があります。ページの右上隅にある [DDL 機能の設定] をクリックして、ビジネス要件に基づいてソースからの DDL メッセージを処理するルールを設定できます。
詳細については、「DDL メッセージを処理するルールを設定する」をご参照ください。
ステップ 7: リソースグループを設定する
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
ステップ 8: シミュレーション実行を実行する
前述の設定が完了したら、設定ページの右上隅にある [シミュレーション実行の実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。宛先テーブルで同期結果を表示できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が早期に得られるかどうかを判断できます。
表示されるダイアログボックスで、[開始] および [サンプリングデータレコード] パラメーターを含む、指定されたテーブルからのデータサンプリングのパラメーターを設定します。
[収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。
[プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。
ステップ 9: 同期タスクを実行する
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
[同期タスク] ページの [タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションで同期タスクの [名前または ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
同期タスクの O&M
同期タスクの実行ステータスを表示する
同期タスクが作成された後、[同期タスク] ページに移動して、ワークスペースで作成されたすべての同期タスクと各同期タスクの基本情報を表示できます。

[アクション] 列で同期タスクを [開始] または [停止] できます。また、[その他] を選択して、同期タスクの [編集]、[表示]、またはその他の操作を実行することもできます。
開始されたタスクについては、[実行概要] 列で基本的な実行ステータスを表示できます。また、対応する概要エリアをクリックして実行の詳細を表示することもできます。

Simple Log Service から OSS-HDFS への同期タスクには、[スキーマ移行] と [リアルタイムデータ同期] のステージが含まれます。
[スキーマ移行]: このタブには、宛先テーブルが新しく作成されたテーブルか既存のテーブルかなどの情報が表示されます。新しく作成されたテーブルの場合、テーブルの作成に使用された DDL 文が表示されます。
[リアルタイムデータ同期]: このタブには、リアルタイム同期の詳細、DDL レコード、アラート情報など、リアルタイム同期に関する統計情報が表示されます。
同期タスクを再実行する
特殊なケースで、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。すでに同期されていて変更されていないテーブルのデータは、再度同期されません。
同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行できます。
同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新の適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。