ApsaraMQ for Kafka インスタンスのデータのクレンジング、変換、およびダンプを行うために、抽出、変換、ロード (ETL) タスクを作成して実行できます。このトピックでは、ApsaraMQ for Kafka コンソールで ETL タスクを作成して、ソース トピック内の処理済みデータをデスティネーション トピックに配信する方法について説明します。
前提条件
ETL タスクを実行する前に、以下の操作が実行されていることを確認してください。
ApsaraMQ for Kafka インスタンスでソース トピックとデスティネーション トピックを作成します。詳細については、「手順 1: トピックの作成」をご参照ください。
説明ETL タスクに必要な補助トピックを手動で作成する場合、ETL タスクに関する情報を保存するために使用されるトピックも作成する必要があります。トピックの作成時に設定されるパラメーターの詳細については、このトピックの「ETL タスクの作成」セクションの手順 3 を参照してください。
Function Compute を有効化します。詳細については、「関数をすばやく作成する」をご参照ください。
Resource Access Management (RAM) ユーザーの場合は、必要な権限を取得します。詳細については、「RAM ユーザーへの権限の付与」をご参照ください。
次のコードは、権限を指定するポリシーの例を示しています。
{ "Version": "1", "Statement": [ { "Action": [ "kafka:CreateETLTask", "kafka:ListETLTask", "kafka:DeleteETLTask" ], "Resource": "*", "Effect": "Allow" } ] }
背景情報
ETL とは、データが抽出、変換、およびデスティネーションにロードされるプロセスです。ETL タスクのロジックを実装する関数を記述できます。ApsaraMQ for Kafka は、この関数を呼び出してソース トピック内のデータを処理し、デスティネーション トピックにデータを送信します。
データ処理中、Function Compute は対応するサービスと関数を自動的に作成します。作成されたサービスの名前は、
_FC-kafka-ETL タスク名形式です。ApsaraMQ for Kafka インスタンスのソース トピックとデスティネーション トピックは、同じリージョンに存在する必要があります。
Function Compute では、関数呼び出しのログをクエリして問題をトラブルシューティングできます。詳細については、「ロギング機能の設定」をご参照ください。
ApsaraMQ for Kafka の ETL タスク機能はパブリック プレビュー中です。この機能は、ApsaraMQ for Kafka インスタンスとは独立しています。ApsaraMQ for Kafka は、この機能に対して課金されません。作成した ETL タスクが他のサービスに依存している場合は、対応するサービスの請求ルールを参照してください。
ETL を有効にする
ETL タスクは、ApsaraMQ for Kafka コンソールの Connector Ecosystem Integration モジュールで作成されます。このモジュールは、データのフィルタリングと変換機能を提供します。詳細については、「概要」をご参照ください。
ETL タスク機能を初めて使用する場合は、ApsaraMQ for Kafka が関連サービスにアクセスすることを承認する必要があります。承認を確認すると、システムはサービスにリンクされたロール AliyunServiceRoleForAlikafkaETL を自動的に作成します。ApsaraMQ for Kafka は、このロールを引き受けて、ETL タスクの処理中に使用されるサービスにアクセスできます。詳細については、「サービスにリンクされたロール」をご参照ください。
ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーション ペインで、 を選択します。
データ処理タスクリスト ページで、ジョブの作成 をクリックします。
表示される サービスの権限付与 メッセージで、[OK] をクリックします。
ETL タスクの作成
このセクションでは、ソース トピックからデータを抽出し、データを処理し、処理済みデータをデスティネーション トピックにロードする ETL タスクを作成およびデプロイする方法について説明します。
データ処理タスクリスト ページで、ジョブの作成 をクリックします。
基本情報の設定 手順で、タスク名を入力し、次へ をクリックします。
データソースと対象の設定 ステップで、データソース、ターゲットトピック、および消費情報を指定します。次に、次へ をクリックします。
パラメーター
説明
例
インスタンス
ソース トピックとデスティネーション トピックが属するインスタンス。
alikafka_pre-cn-7pp2bz47****
alikafka_post-cn-2r42bvbm****
Topic
ソース トピックとデスティネーション トピック。
説明ソース トピックとデスティネーション トピックは異なるトピックである必要があります。
topic****
test****
消費開始位置
メッセージのコンシュームを開始するオフセット。詳細オプション をクリックした後にのみ、このパラメーターが表示されます。有効な値:
最も早いオフセット: 最も古いオフセットからメッセージをコンシュームします。
最も近いオフセット: 最新のオフセットからメッセージをコンシュームします。
最も近いオフセット
失敗の処理
メッセージの送信に失敗した場合に、後続のメッセージを送信するかどうかを指定します。詳細オプション をクリックした後にのみ、このパラメーターが表示されます。有効な値:
サブスクリプションの継続: メッセージの送信に失敗した場合でも、後続のメッセージを送信します。
サブスクリプションの停止: メッセージの送信に失敗した場合、後続のメッセージを送信しません。
サブスクリプションの継続
リソースの作成方法
ETL タスクに必要な補助トピックを作成するために使用される方法。詳細オプション をクリックした後にのみ、このパラメーターが表示されます。有効な値:
自動作成
手動で作成する
自動作成
データ処理タスクのコンシューマーグループ
ETL タスクで使用される コンシューマー グループ。リソースの作成方法 パラメーターを 手動で作成する に設定した場合にのみ、このパラメーターが表示されます。コンシューマー グループ 名のプレフィックスとして etl-cluster を使用することをお勧めします。
etl-cluster-kafka
タスクサイトの Topic
コンシューマー オフセットを格納するために使用されるトピック。手動で作成する パラメーターを リソースの作成方法 に設定した場合にのみ、このパラメーターが表示されます。
トピック: トピックの名前。トピック名のプレフィックスとして etl-offset を使用することをお勧めします。
パーティション: トピック内のパーティションの数。このパラメーターは 1 より大きい値に設定する必要があります。
ストレージ エンジン: トピックで使用されるストレージ エンジン。このパラメーターはローカル ストレージに設定する必要があります。
説明プロフェッショナル版インスタンスでトピックを作成する場合にのみ、ストレージ エンジン パラメーターをローカル ストレージに設定できます。
cleanup.policy: トピックで使用されるログ クリーンアップ ポリシー。このパラメーターは Compact に設定する必要があります。
etl-offset-kafka
タスク設定の Topic
タスク構成を格納するために使用されるトピック。手動で作成する パラメーターを リソースの作成方法 に設定した場合にのみ、このパラメーターが表示されます。
トピック: トピックの名前。トピック名のプレフィックスとして etl-config を使用することをお勧めします。
パーティション: トピック内のパーティションの数。このパラメーターは 1 に設定する必要があります。
ストレージ エンジン: トピックで使用されるストレージ エンジン。このパラメーターはローカル ストレージに設定する必要があります。
説明プロフェッショナル版インスタンスでトピックを作成する場合にのみ、ストレージ エンジン パラメーターをローカル ストレージに設定できます。
cleanup.policy: トピックで使用されるログ クリーンアップ ポリシー。このパラメーターは Compact に設定する必要があります。
etl-config-kafka
タスクステータスの Topic
タスク ステータスを格納するために使用されるトピック。手動で作成する パラメーターを リソースの作成方法 に設定した場合にのみ、このパラメーターが表示されます。
トピック: トピックの名前。トピック名のプレフィックスとして etl-status を使用することをお勧めします。
パーティション: トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。
ストレージ エンジン: トピックで使用されるストレージ エンジン。このパラメーターはローカル ストレージに設定する必要があります。
説明プロフェッショナル版インスタンスでトピックを作成する場合にのみ、ストレージ エンジン パラメーターをローカル ストレージに設定できます。
cleanup.policy: トピックで使用されるログ クリーンアップ ポリシー。このパラメーターは Compact に設定する必要があります。
etl-status-kafka
デッドレターキューの Topic
ETL フレームワークのエラー データを格納するために使用されるトピック。手動で作成する パラメーターを リソースの作成方法 に設定した場合にのみ、このパラメーターが表示されます。トピック リソースを節約するために、このトピックはエラー データ トピックと同じにすることができます。
トピック: トピックの名前。トピック名のプレフィックスとして etl-error を使用することをお勧めします。
パーティション: トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。
ストレージ エンジン: トピックで使用されるストレージ エンジン。このパラメーターは、ローカル ストレージまたはクラウド ストレージに設定できます。
説明プロフェッショナル版インスタンスでトピックを作成する場合にのみ、ストレージ エンジン パラメーターをローカル ストレージに設定できます。
etl-error-kafka
例外データ Topic
シンクのエラー データを格納するために使用されるトピック。手動で作成する パラメーターを リソースの作成方法 に設定した場合にのみ、このパラメーターが表示されます。トピック リソースを節約するために、このトピックは デッドレター キュー トピック と同じにすることができます。
トピック: トピックの名前。トピック名のプレフィックスとして etl-error を使用することをお勧めします。
パーティション: トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。
ストレージ エンジン: トピックで使用されるストレージ エンジン。このパラメーターは、ローカル ストレージまたはクラウド ストレージに設定できます。
説明プロフェッショナル版インスタンスでトピックを作成する場合にのみ、ストレージ エンジン パラメーターをローカル ストレージに設定できます。
etl-error-kafka
処理関数の設定 ステップで、パラメーターを設定し、作成 をクリックします。
作成 をクリックする前に、テスト をクリックして、関数が期待どおりに動作するかどうかをテストできます。
パラメーター
説明
例
関数言語
関数が記述されている言語。このパラメーターを Python 3 に設定します。
Python3
関数テンプレート
システムが提供する関数テンプレート。関数テンプレートを選択すると、システムによって [コード] フィールドに関数テンプレートのコードが自動的に入力されます。
接頭辞/接尾辞の追加
関数コード
メッセージの処理に使用するコード。ApsaraMQ for Kafka は、データのクレンジと変換に使用できる関数テンプレートを提供します。ビジネス要件に基づいて、選択した関数テンプレートのコードを変更できます。
説明要件に基づいて Python モジュールをインポートできます。
コード内のメッセージは辞書形式です。キーと値を変更するだけで済みます。
処理されたメッセージを返します。関数がメッセージのフィルタリングに使用される場合は、None を返します。
def deal_message(message): for keyItem in message.keys(): if (keyItem == 'key'): # メッセージキーに接尾辞を追加します。 message[keyItem] = message[keyItem] + "KeySurfix" continue if (keyItem == 'value'): # メッセージ値に接尾辞を追加します。 message[keyItem] = message[keyItem] + "ValueSurfix" continue return messageメッセージキー
ソース トピックで処理されるメッセージのキー。テストコード をクリックすると、パラメーターが表示されます。
demo
メッセージの内容
ソース トピックで処理されるメッセージの値。
{"key": "test"}
ETL タスクが作成されると、データ処理タスクリスト ページでタスクを表示できます。システムはタスクを自動的にデプロイします。
テスト メッセージを送信する
ETL タスクがデプロイされた後、ソースの ApsaraMQ for Kafka トピックにテスト メッセージを送信して、設定された関数によってデータが処理され、デスティネーション トピックに送信されるかどうかをテストできます。
データ処理タスクリスト ページで、作成した ETL タスクを見つけ、テスト操作 列の をクリックします。
メッセージの送信 パネルで、テストメッセージの情報を入力し、[OK] をクリックしてテストメッセージを送信します。
メッセージキー フィールドに、テストメッセージのキーを入力します。例:demo。
メッセージの内容 フィールドに、テストメッセージのコンテンツを入力します。例:{"key": "test"}。
指定されたパーティションに送信 パラメーターを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。
テストメッセージを特定のパーティションに送信する場合は、パーティション ID をクリックし、はい フィールドにパーティション ID を入力します。例:0。パーティション ID を照会する方法については、パーティションの状態の表示 をご参照ください。
テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。
関数ログを表示する
ApsaraMQ for Kafka でデータを抽出して処理した後、関数のログを表示して、デスティネーション トピックが処理済みデータを受信したかどうかを確認できます。詳細については、「ロギング機能の設定」をご参照ください。
図 1. 関数ログの表示
ETL タスクの詳細を表示する
ETL タスクを作成した後、ApsaraMQ for Kafka コンソールで詳細を表示できます。
データ処理タスクリスト ページで、作成した ETL タスクを見つけ、詳細操作 列の をクリックします。
タスクの詳細 ページで、ETL タスクの詳細を表示します。
ETL タスクを削除する
ETL タスクが不要になった場合は、ApsaraMQ for Kafka コンソールでタスクを削除できます。
データ処理タスクリスト ページで、削除する ETL タスクを見つけ、削除操作 列の をクリックします。
表示される [注記] メッセージで、[OK] をクリックします。