すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Function Compute シンクコネクタの作成

最終更新日:Apr 15, 2025

このトピックでは、ApsaraMQ for RocketMQ コンソールでシンクコネクタを作成し、ApsaraMQ for RocketMQ インスタンスから Function Compute 関数にデータを同期する方法について説明します。

前提条件

Function Compute とは

Function Compute は、フルマネージドのイベント駆動型サーバーレスコンピューティングサービスです。 Function Compute を使用すると、サーバーなどのインフラストラクチャリソースを管理することなく、コードの記述とアップロードに集中できます。 Function Compute は、信頼性の高い方法でコードを実行するための弾力的なコンピューティングリソースを準備します。詳細については、「What is Function Compute?」をご参照ください。

Function Compute の用途

  • 関数を使用してビジネスメッセージを処理し、Function Compute プラットフォームを使用してビジネスメッセージの処理ロジックを開発および実行できます。また、Function Compute を使用して注文を処理し、タスクを実行することもできます。

  • 関数を使用してメッセージを迅速に処理し、ETL (抽出、変換、書き出し) に基づいてデータクレンジングを実行できます。

  • Function Compute を使用して、特定の VPC (仮想プライベートクラウド) 内の他のダウンストリームシステムにメッセージをダンプできます。

  • 関数を使用して、メッセージングシステムと別の Alibaba Cloud サービスを接続し、サービスにメッセージを配信できます。

シンクコネクタを作成する

  1. ApsaraMQ for Kafka コンソール にログインします。 リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        [リージョン]

        ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (北京)

        [apsaramq For Kafka インスタンス]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。

        MQ_INST_115964845466****_ByBeUp3p

        [トピック]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスのトピック。

        topic

        [グループ ID]

        ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。メッセージルーティングソースを作成するには、別のコンシューマーグループを使用する必要があります。使用中のコンシューマーグループは使用しないでください。そうしないと、既存のメッセージの送受信に失敗する可能性があります。

        GID_http_1

        [コンシューマーオフセット]

        メッセージが消費されるオフセット。

        最新のオフセット

        [ネットワーク構成]

        メッセージをルーティングするネットワークのタイプ。

        ベーシックネットワーク

        [VPC]

        ApsaraMQ for Kafka インスタンスがデプロイされている仮想プライベートクラウド (VPC) の ID。このパラメーターは、[ネットワーク構成] パラメーターを [セルフマネージドインターネット] に設定した場合にのみ必須です。

        vpc-bp17fapfdj0dwzjkd****

        [vswitch]

        ApsaraMQ for Kafka インスタンスが関連付けられている vSwitch の ID。このパラメーターは、[ネットワーク構成] パラメーターを [セルフマネージドインターネット] に設定した場合にのみ必須です。

        vsw-bp1gbjhj53hdjdkg****

        [セキュリティグループ]

        ApsaraMQ for Kafka インスタンスが属するセキュリティグループ。このパラメーターは、[ネットワーク構成] パラメーターを [セルフマネージドインターネット] に設定した場合にのみ必須です。

        alikafka_pre-cn-7mz2****

        一括プッシュの件数

        関数呼び出しごとに送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [function Compute] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。

        パラメーター

        説明

        [関数]

        作成した Function Compute 関数。

        test

        バージョンとエイリアス

        サービスのバージョンまたはエイリアス。有効な値:

        • [指定バージョン]

        • [指定エイリアス]

        指定バージョン

        [バージョン]

        関数のバージョン。最新バージョンを選択することをお勧めします。このパラメーターは、[サービスバージョンとエイリアス] パラメーターを [指定バージョン] に設定した場合にのみ必須です。

        LATEST

        エイリアス

        関数のエイリアス。このパラメーターは、[サービスバージョンとエイリアス] パラメーターを [指定エイリアス] に設定した場合にのみ必須です。

        test

        [呼び出しモード]

        呼び出しモード。有効な値: [同期] および [非同期]

        非同期

        配信形式

        • [オブジェクト]: イベントはオブジェクトとしてダウンストリーム関数に配信されます。

        • [オブジェクトリスト]: イベントは配列としてダウンストリーム関数に配信されます。

        オブジェクト

        [イベント]

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。

        • [完全なデータ]

        • [データ抽出]

        • [固定値]

        • [テンプレート]

        完全なデータ

    • タスクプロパティ

      イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。

  5. タスクリスト ページに戻り、作成した Function Compute シンクコネクタを見つけて、操作する 列の 有効化する をクリックします。

  6. ヒント メッセージで、OK をクリックします。

    シンクコネクタを有効にするには、30 ~ 60 秒かかります。 タスクリスト ページの Status 列で進捗状況を確認できます。

その他の操作

タスクリスト ページで、管理するメッセージアウトフロータスクを見つけて、操作する 列で他の操作を実行します。次の項目では、実行できる操作について説明します。

  • タスクの詳細を表示する: [操作] 列の 詳細 をクリックします。表示されるページで、タスクの基本情報、プロパティ、およびモニタリングメトリックを表示します。

  • タスクの構成を変更する: [操作] 列の 編集する をクリックします。 [タスクの編集] パネルで、タスクの構成を変更します。

  • タスクを有効または無効にする: [操作] 列の 有効化する または 無効化 をクリックします。 ヒント メッセージで、OK をクリックします。

  • タスクを削除する: [操作] 列の 削除する をクリックします。 ヒント メッセージで、OK をクリックします。