Data Integration は、Kafka や LogHub などのデータソースの単一テーブルから OSS へのデータのリアルタイム同期をサポートしています。このトピックでは、DataWorks Data Integration を使用して Kafka から OSS データレイクにデータをリアルタイムで同期する方法について説明します。
制限事項
使用する Kafka サービスのバージョンは 0.10.2 から 2.2.0 の範囲である必要があります。
前提条件
サーバーレスリソースグループまたはデータ統合専用リソースグループを購入済みであること。
Kafka データソースと OSS データソースを作成済みであること。 詳細については、「Data Integration のデータソースを作成する」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立済みであること。 詳細については、「ネットワーク接続ソリューション」をご参照ください。
手順
ステップ 1: 同期タスクタイプを選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration へ移動] をクリックします。
左側のナビゲーションウィンドウで、[同期タスク] をクリックします。次に、ページの上部にある [同期タスクの作成] をクリックして、同期タスクの作成ページに移動します。次の基本情報を設定します。
ソースと宛先:
Kafka→OSS新しいノード名: 同期タスクの名前を指定します。
同期方法:
単一テーブルリアルタイム。
ステップ 2: ネットワークとリソースを設定する
[ネットワークとリソースの構成] セクションで、同期タスクに使用する [リソースグループ] を選択します。タスクに CU 単位で [タスクリソース使用量] を割り当てることができます。
[ソースデータソース] には、追加した
kafkaデータソースを選択します。[宛先データソース] には、追加したOSSデータソースを選択し、[接続テスト] をクリックします。
ソースと宛先の両方のデータソースが接続されていることを確認したら、[次へ] をクリックします。
ステップ 3: 同期リンクを設定する
1. Kafka データソースを設定する
設定ページの上部にあるウィザードで、Kafka をクリックし、[Kafka ソース情報] を編集します。

[Kafka ソース情報] セクションで、データを同期する Kafka Topic を選択します。
他のパラメーターはデフォルト値のままにするか、ビジネス要件に基づいて設定を変更します。
右上隅にある [データサンプリング] をクリックします。
表示されるダイアログボックスで、[開始時間] と [サンプリングデータレコード] パラメーターを設定し、[収集開始] をクリックします。システムは、指定した Kafka Topic からデータをサンプリングします。Kafka Topic のデータをプレビューできます。Kafka Topic のデータは、データ処理ノードのデータプレビューおよび可視化設定の入力データとして使用されます。
[出力フィールド設定] セクションで、同期するフィールドを選択します。
2. データ処理ノードを設定する
アイコンをクリックして、データ処理メソッドを追加できます。サポートされているデータ処理メソッドは、データマスキング、文字列置換、データフィルタリング、JSON パース、フィールドの編集と値の割り当て です。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

データ処理ノードを設定した後、設定ページの右上隅にある [データ出力のプレビュー] をクリックできます。表示されるダイアログボックスで、[先祖ノードの出力を再取得] をクリックすると、データ処理ノードが指定された Kafka Topic からサンプリングされたデータを処理し、処理結果をプレビューできます。

データ処理ノードによって入力データが処理された後に生成される結果をプレビューする前に、Kafka データソースの [データサンプリング] 設定を行う必要があります。
3. OSS データソースを設定する
設定ページの上部にあるウィザードで、OSS をクリックして [OSS 宛先情報] を編集します。

[OSS 宛先情報] セクションで、データを書き込む OSS オブジェクトに関する基本情報を選択します。
書き込みフォーマット: Hudi、Paimon、lceberg のフォーマットがサポートされています。
メタデータベースの自動ビルド場所の選択: Alibaba Cloud アカウント内で Data Lake Formation (DLF) を有効にしている場合、データがデータレイクに同期されると、システムは自動的に DLF にメタデータベースとメタテーブルを作成します。
説明リージョンをまたいだメタデータベースの作成はサポートされていません。
ストレージパス: 同期されたデータを保存する OSS パスを選択します。
宛先データベース: データを書き込むデータベースの名前を選択します。[データベースの作成] を選択して DLF メタデータベースを作成し、[データベース名] を指定することもできます。
宛先テーブル: データを書き込む OSS オブジェクトの生成方法を選択します。有効な値: [テーブルの作成] および [既存のテーブルを使用]。
テーブル名: データを書き込む OSS オブジェクトの名前を入力または選択します。
(オプション) 宛先テーブルのスキーマを変更します。
宛先テーブルパラメーターに [テーブルを自動的に作成] を選択した場合は、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先テーブルのスキーマを編集します。[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてスキーマを再生成することもできます。生成されたスキーマから列を選択し、その列をプライマリキーとして設定できます。
ソースのフィールドと宛先のフィールド間のマッピングを設定します。
上記の設定が完了すると、システムは [同名マッピング] の原則に基づいて、ソースのフィールドと宛先のフィールド間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを宛先の同じフィールドにマッピングすることはできません。ソースのフィールドにマッピングされた宛先のフィールドがない場合、ソースのフィールドのデータは宛先に同期されません。
4. アラートルールを設定する
同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクに異なるアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[アラートルールの設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了した後、「リアルタイム同期タスクの管理」を参照して、リアルタイム同期タスクページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールを管理します。
作成されたアラートルールを有効または無効にできます。アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
5. 詳細パラメーターを設定する
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいて、これらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。
設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。
[詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。
ステップ 6: DDL 機能を設定する
ソースに対して DDL 操作が実行される場合があります。ページの右上隅にある [DDL 機能の設定] をクリックして、ビジネス要件に基づいてソースからの DDL メッセージを処理するルールを設定できます。
詳細については、「DDL メッセージを処理するルールを設定する」をご参照ください。
ステップ 7: リソースグループを設定する
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
ステップ 8: 同期タスクのテストを実行する
上記の設定が完了したら、設定ページの右上隅にある [シミュレーション実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。宛先テーブルで同期結果を表示できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が最も早い機会に得られるかどうかを判断できます。
表示されるダイアログボックスで、指定したテーブルからのデータサンプリングのパラメーター ( [開始] および [サンプリングデータレコード] パラメーターを含む) を設定します。
[収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。
[プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。
ステップ 9: 同期タスクを実行する
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
[タスク] ページの [同期タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションで同期タスクの [名前または ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
同期タスクの O&M 操作を実行する
同期タスクのステータスを表示する
同期タスクが作成された後、[同期タスク] ページに移動して、ワークスペースで作成されたすべての同期タスクと各同期タスクの基本情報を表示できます。

[操作] 列の [開始] または [停止] をクリックして、同期タスクを開始または停止できます。[その他] を選択して、同期タスクに対して [編集] や [表示] などの操作を実行することもできます。
開始されたタスクについては、[実行概要] 列でタスクの基本的な実行ステータスを表示できます。対応する概要エリアをクリックして、実行の詳細を表示することもできます。

Kafka から OSS へのリアルタイム同期タスクには、次のステージがあります。
スキーマ移行: このタブには、宛先オブジェクトが新しく作成されたオブジェクトか既存のオブジェクトかなどの情報が表示されます。新しく作成されたオブジェクトの場合、オブジェクトの作成に使用された DDL 文が表示されます。
リアルタイム同期: このタブには、リアルタイム同期の詳細、DDL レコード、アラート情報など、リアルタイム同期に関する統計が表示されます。
同期タスクを再実行する
特殊なケースで、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。すでに同期され、変更されていないテーブルのデータは再同期されません。
同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行します。
同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新を適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。