このトピックでは、ApsaraMQ for RocketMQ コンソールでシンクコネクタを作成し、ApsaraMQ for RocketMQ インスタンスから Function Compute 関数にデータを同期する方法について説明します。
前提条件
ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。[実行中] 状態であることを確認してください。詳細については、「ステップ 2: インスタンスを購入してデプロイする」をご参照ください。
Function Compute がアクティブ化されていること。詳細については、「関数をすばやく作成する」をご参照ください。
EventBridge がアクティブ化されており、必要な権限が Resource Access Management (RAM) ユーザーに付与されていること。詳細については、「EventBridge をアクティブ化し、RAM ユーザーに権限を付与する」をご参照ください。
Function Compute とは
Function Compute は、フルマネージドのイベント駆動型サーバーレスコンピューティングサービスです。 Function Compute を使用すると、サーバーなどのインフラストラクチャリソースを管理することなく、コードの記述とアップロードに集中できます。 Function Compute は、信頼性の高い方法でコードを実行するための弾力的なコンピューティングリソースを準備します。詳細については、「What is Function Compute?」をご参照ください。
Function Compute の用途
関数を使用してビジネスメッセージを処理し、Function Compute プラットフォームを使用してビジネスメッセージの処理ロジックを開発および実行できます。また、Function Compute を使用して注文を処理し、タスクを実行することもできます。
関数を使用してメッセージを迅速に処理し、ETL (抽出、変換、書き出し) に基づいてデータクレンジングを実行できます。
Function Compute を使用して、特定の VPC (仮想プライベートクラウド) 内の他のダウンストリームシステムにメッセージをダンプできます。
関数を使用して、メッセージングシステムと別の Alibaba Cloud サービスを接続し、サービスにメッセージを配信できます。
シンクコネクタを作成する
ApsaraMQ for Kafka コンソール にログインします。 リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
[リージョン]
ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (北京)
[apsaramq For Kafka インスタンス]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。
MQ_INST_115964845466****_ByBeUp3p
[トピック]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスのトピック。
topic
[グループ ID]
ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。メッセージルーティングソースを作成するには、別のコンシューマーグループを使用する必要があります。使用中のコンシューマーグループは使用しないでください。そうしないと、既存のメッセージの送受信に失敗する可能性があります。
GID_http_1
[コンシューマーオフセット]
メッセージが消費されるオフセット。
最新のオフセット
[ネットワーク構成]
メッセージをルーティングするネットワークのタイプ。
ベーシックネットワーク
[VPC]
ApsaraMQ for Kafka インスタンスがデプロイされている仮想プライベートクラウド (VPC) の ID。このパラメーターは、[ネットワーク構成] パラメーターを [セルフマネージドインターネット] に設定した場合にのみ必須です。
vpc-bp17fapfdj0dwzjkd****
[vswitch]
ApsaraMQ for Kafka インスタンスが関連付けられている vSwitch の ID。このパラメーターは、[ネットワーク構成] パラメーターを [セルフマネージドインターネット] に設定した場合にのみ必須です。
vsw-bp1gbjhj53hdjdkg****
[セキュリティグループ]
ApsaraMQ for Kafka インスタンスが属するセキュリティグループ。このパラメーターは、[ネットワーク構成] パラメーターを [セルフマネージドインターネット] に設定した場合にのみ必須です。
alikafka_pre-cn-7mz2****
一括プッシュの件数
関数呼び出しごとに送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [function Compute] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。
パラメーター
説明
例
[関数]
作成した Function Compute 関数。
test
バージョンとエイリアス
サービスのバージョンまたはエイリアス。有効な値:
[指定バージョン]
[指定エイリアス]
指定バージョン
[バージョン]
関数のバージョン。最新バージョンを選択することをお勧めします。このパラメーターは、[サービスバージョンとエイリアス] パラメーターを [指定バージョン] に設定した場合にのみ必須です。
LATEST
エイリアス
関数のエイリアス。このパラメーターは、[サービスバージョンとエイリアス] パラメーターを [指定エイリアス] に設定した場合にのみ必須です。
test
[呼び出しモード]
呼び出しモード。有効な値: [同期] および [非同期]。
非同期
配信形式
[オブジェクト]: イベントはオブジェクトとしてダウンストリーム関数に配信されます。
[オブジェクトリスト]: イベントは配列としてダウンストリーム関数に配信されます。
オブジェクト
[イベント]
EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。
[完全なデータ]
[データ抽出]
[固定値]
[テンプレート]
完全なデータ
タスクプロパティ
イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。
タスクリスト ページに戻り、作成した Function Compute シンクコネクタを見つけて、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
シンクコネクタを有効にするには、30 ~ 60 秒かかります。 タスクリスト ページの Status 列で進捗状況を確認できます。
その他の操作
タスクリスト ページで、管理するメッセージアウトフロータスクを見つけて、操作する 列で他の操作を実行します。次の項目では、実行できる操作について説明します。
タスクの詳細を表示する: [操作] 列の 詳細 をクリックします。表示されるページで、タスクの基本情報、プロパティ、およびモニタリングメトリックを表示します。
タスクの構成を変更する: [操作] 列の 編集する をクリックします。 [タスクの編集] パネルで、タスクの構成を変更します。
タスクを有効または無効にする: [操作] 列の 有効化する または 無効化 をクリックします。 ヒント メッセージで、OK をクリックします。
タスクを削除する: [操作] 列の 削除する をクリックします。 ヒント メッセージで、OK をクリックします。