DataWorks では、リアルタイム抽出・変換・書き出し (ETL) 同期タスクを使用して、Kafka から StarRocks にデータを同期できます。このタスクは、指定された Kafka Topic の構造に基づいて宛先 StarRocks テーブルのスキーマを初期化します。次に、タスクは指定された Kafka Topic から宛先 StarRocks テーブルに一度に完全データを同期し、Topic から宛先 StarRocks テーブルに増分データをリアルタイムで同期します。このトピックでは、Kafka から StarRocks にデータを同期するためのリアルタイム ETL 同期タスクを作成する方法について説明します。
制限事項
使用する Kafka サービスのバージョンは 0.10.2 から 2.2.0 の範囲である必要があります。
前提条件
サーバーレスリソースグループを購入済みであること。
Kafka および StarRocks データソースを作成済みであること。詳細については、「Data Integration でデータソースを作成する」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立済みであること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
手順
1. 同期タスクタイプを選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration に移動] をクリックします。
左側のナビゲーションウィンドウで [同期タスク] をクリックし、ページの上部にある [同期タスクの作成] をクリックしてタスク作成ページに移動します。次の基本情報を設定します。
[ソースと宛先]:
Kafka→StarRocks[新しいタスク名]: 同期タスクのカスタム名。
[同期タイプ]:
単一テーブルのリアルタイム同期。
2. ネットワークとリソースを設定する
[ネットワークとリソースの設定] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソースの使用量] に CU 数を割り当てることができます。
[ソースデータソース] で、追加した
kafkaデータソースを選択します。[宛先データソース] で、追加したStarRocksデータソースを選択し、[接続性のテスト] をクリックします。
ソースと宛先の両方のデータソースが正常に接続されたことを確認したら、[次へ] をクリックします。
3. 同期リンクを設定する
1. Kafka ソースを設定する
ページ上部のデータソース Kafka をクリックして、[Kafka ソース情報] を編集します。

[Kafka ソース情報] セクションで、同期する Kafka クラスターから Topic を選択します。
他のパラメーターはデフォルト値のままにするか、ビジネス要件に基づいて設定を変更します。
右上隅にある [データサンプリング] をクリックします。
表示されるダイアログボックスで、[開始時刻] と [サンプリングデータレコード] パラメーターを設定し、[収集開始] をクリックします。システムは、指定した Kafka Topic からデータをサンプリングします。Kafka Topic のデータをプレビューできます。Kafka Topic のデータは、データ処理ノードのデータプレビューおよび可視化設定の入力データとして使用されます。
[出力フィールド設定] セクションで、必要に応じて同期するフィールドを選択します。
2. データ処理ノードを設定する
アイコンをクリックして、データ処理メソッドを追加できます。次のデータ処理メソッドがサポートされています: データマスキング、文字列の置換、データフィルタリング、JSON 解析、フィールドの編集と値の割り当て。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

データ処理ノードを設定した後、右上隅にある [データ出力のプレビュー] をクリックできます。表示されるダイアログボックスで、[先祖ノードの出力を再取得] をクリックして、データ処理ノードが指定された Kafka Topic からサンプリングされたデータを処理し、処理結果をプレビューできるようにします。

データ処理ノードによって入力データが処理された後に生成される結果をプレビューする前に、Kafka データソースの [データサンプリング] 設定を構成する必要があります。
3. StarRocks 宛先を設定する
ページ上部のデータ宛先 StarRocks をクリックして、[StarRocks 宛先情報] を編集します。

[StarRocks 宛先情報] セクションで、データを書き込む StarRocks テーブルに対して [テーブルを自動的に作成] するか [既存のテーブルを使用] するかを選択します。
テーブルを自動的に作成することを選択した場合、システムはデフォルトでソーステーブルと同じ名前のテーブルを作成します。宛先テーブル名は手動で変更できます。
既存のテーブルを使用することを選択した場合、ドロップダウンリストからターゲットテーブルを選択します。
(オプション) 宛先テーブルのスキーマを変更します。
[テーブルを自動的に作成] を選択した場合は、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、宛先テーブルのスキーマを編集します。また、[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてテーブルスキーマを再生成することもできます。生成されたテーブルスキーマから列を選択し、その列をプライマリキーおよびバケット列として設定できます。
説明宛先テーブルにはプライマリキーとバケット列が必要です。そうでない場合、設定は保存できません。
ソースのフィールドと宛先のフィールド間のマッピングを設定します。
上記の設定を完了すると、システムは [同名マッピング] の原則に基づいて、ソースのフィールドと宛先のフィールド間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを宛先の同じフィールドにマッピングすることはできません。ソースのフィールドに宛先のマッピングされたフィールドがない場合、ソースのフィールドのデータは宛先に同期されません。
4. アラートルールを設定する
同期タスクの失敗によるビジネスデータ同期の遅延を防ぐために、同期タスクにさまざまなアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[アラートルールの設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了したら、「リアルタイム同期タスクの管理」を参照して、[リアルタイム同期タスク] ページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールを管理します。
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて異なるアラート受信者を指定することもできます。
5. 詳細パラメーターを設定する
DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。
予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。
設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。
[詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。
6. リソースグループを設定する
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
7. 同期タスクのテストを実行する
上記の設定が完了したら、設定ページの右上隅にある [シミュレーション実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。同期結果は宛先テーブルで確認できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が早期に得られるかどうかを判断できます。
表示されるダイアログボックスで、[開始] および [サンプリングデータレコード] パラメーターなど、指定されたテーブルからのデータサンプリングのパラメーターを設定します。
[収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。
[プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。
8. 同期タスクを実行する
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
[同期タスク] ページの [タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションで同期タスクの [名前または ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
データ同期タスクの O&M 操作を実行する
同期タスクのステータスを表示する
同期タスクが作成された後、[同期タスク] ページに移動して、ワークスペースで作成されたすべての同期タスクと各同期タスクの基本情報を表示できます。

[アクション] 列で同期タスクを [開始] または [停止] できます。[その他] メニューでは、同期タスクの [編集]、[表示]、およびその他の操作を実行できます。
開始されたタスクについては、[実行の詳細] セクションで基本的な実行ステータスを確認したり、対応するエリアをクリックして実行の詳細を表示したりできます。

Kafka から StarRocks へのリアルタイム同期タスクには 2 つのステージがあります:
[スキーマ移行]: このタブには、宛先テーブルの生成方法 (既存のテーブルの使用またはテーブルの作成) などの情報が表示されます。宛先テーブルの生成方法がテーブルの作成である場合、テーブルの作成に使用される DDL 文が表示されます。
[リアルタイムデータ同期]: このタブには、リアルタイム同期、DDL レコード、およびアラート情報に関する統計が表示されます。
同期タスクを再実行する
特別な場合に、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列の [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。既に同期され、変更されていないテーブルのデータは再同期されません。
同期タスクの設定を変更せずに直接 [再実行] をクリックして、システムが同期タスクを再実行できるようにします。
同期タスクの設定を変更してから [完了] をクリックします。同期タスクの [操作] 列に表示される [更新の適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。