すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:OSS シンクコネクタの作成

最終更新日:Mar 27, 2025

このトピックでは、ApsaraMQ for Kafka インスタンスのソース Topic からオブジェクトストレージサービス(OSS)にデータを同期するために、OSS シンクコネクタを作成する方法について説明します。

前提条件

以下の要件が満たされていること:

使用上の注意

  • ApsaraMQ for Kafka インスタンスの Topic から OSS バケットにデータを同期する場合は、Message Queue for Apache Kafka インスタンスと OSS バケットが同じリージョンにあり、Function Compute が指定されたリージョンで使用可能であることを確認してください。Message Queue for Apache Kafka は、まずデータを Function Compute に同期し、次に Function Compute がデータを OSS に同期します。コネクタの制限の詳細については、制限をご参照ください。

  • OSS シンクコネクタは、Function Compute を使用してデータをエクスポートします。Function Compute は一定量の無料リソースを提供します。この無料枠を使い切った場合、使用した Function Compute リソースに対して課金規則に基づいて課金されます。詳細については、課金概要をご参照ください。

  • Function Compute では、関数呼び出しのログをクエリできます。詳細については、ロギングの設定をご参照ください。

  • ApsaraMQ for Kafka は、メッセージを UTF-8 エンコードの文字列にシリアル化して転送します。Message Queue for Apache Kafka はバイナリデータをサポートしていません。

OSS シンクコネクタの作成とデプロイ

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、Connector タスクリスト をクリックします。

  4. Connector タスクリスト ページで、インスタンスの選択 ドロップダウンリストからコネクタが属するインスタンスを選択し、Connector の作成 をクリックします。

  5. Connector の作成 ウィザードで、以下の手順を実行します。

    1. 基本情報の設定 手順で、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。

      重要

      デフォルトでは、Authorize to Create Service Linked Role が選択されています。これは、ApsaraMQ for Kafka が次のルールに基づいてサービスリンクロールを作成することを意味します。

      • サービスリンクロールが作成されていない場合、ApsaraMQ for Kafka は、OSS シンクコネクタを使用して ApsaraMQ for Kafka から OSS にデータを同期するために、自動的にサービスリンクロールを作成します。

      • サービスリンクロールが使用可能な場合、ApsaraMQ for Kafka は新しいロールを作成しません。

      サービスリンクロールの詳細については、サービスリンクロールをご参照ください。

      パラメータ

      説明

      名前

      コネクタの名前。次の命名規則に基づいてコネクタ名を指定します。

      • コネクタ名は 1 ~ 48 文字で、数字、小文字、ハイフン(-)を含めることができます。名前はハイフン(-)で始めることはできません。

      • 名前は、ApsaraMQ for Kafka インスタンス内で一意である必要があります。

      コネクタのデータ同期タスクでは、connect-タスク名 形式の名前を持つコンシューマーグループを使用する必要があります。グループ このようなコンシューマーグループを作成していない場合、Message Queue for Apache Kafka によって自動的に作成されます。グループ

      kafka-oss-sink

      インスタンス

      Message Queue for Apache Kafka インスタンスの情報。デフォルトでは、インスタンスの名前と ID が表示されます。

      demo alikafka_post-cn-st21p8vj****

    2. ソースサービスの設定 手順で、ソースサービスとして Message Queue for Apache Kafka を選択し、パラメータを設定して、次へ をクリックします。次の表にパラメータを示します。

      パラメータ

      説明

      データソース Topic

      データを同期する Topic の名前。

      oss-test-input

      コンシューマースレッドの同時発生数

      ソース Topic からデータを同期するために使用される同時実行コンシューマースレッドの数。デフォルト値:6。有効な値:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      消費の開始位置

      メッセージ消費を開始するコンシューマーオフセット。有効な値:

      • 一番古いオフセット:メッセージ消費は最も古いコンシューマーオフセットから開始されます。

      • 一番新しいオフセット:メッセージ消費は最新のコンシューマーオフセットから開始されます。

      一番古いオフセット

      VPC ID

      ソースインスタンスがデプロイされている仮想プライベートクラウド(VPC)の ID。実行環境の設定 をクリックすると、パラメータが表示されます。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した VPC ID が表示されます。このパラメータを設定する必要はありません。

      vpc-bp1xpdnd3l***

      VSwitch ID

      ソースインスタンスが接続されている vSwitch の ID。実行環境の設定 をクリックすると、パラメータが表示されます。vSwitch は、ソースの ApsaraMQ for Kafka インスタンスと同じ VPC にデプロイする必要があります。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した vSwitch ID が表示されます。

      vsw-bp1d2jgg81***

      失敗の処理

      そのパーティションでメッセージ送信エラーが発生した後、パーティションへのサブスクリプションを保持するかどうかを指定します。実行環境の設定 をクリックすると、パラメータが表示されます。有効な値:

      • サブスクリプションの継続:サブスクリプションを保持します。エラーのログエントリが生成されます。

      • サブスクリプションの停止:サブスクリプションを停止します。エラーのログエントリが生成されます。

      説明
      • 詳細については、コネクタの管理をご参照ください。

      • エラーコードに基づいてエラーをトラブルシューティングする方法については、エラーコードをご参照ください。

      サブスクリプションの継続

      リソースの作成方法

      OSS シンクコネクタに必要な Topic とグループを作成する方法。実行環境の設定 をクリックすると、パラメータが表示されます。

      • 自動作成

      • 手動で作成します

      自動作成

      Connector コンシューマーグループ

      OSS シンクコネクタに必要なコンシューマーグループの名前。グループ 実行環境の設定 をクリックすると、パラメータが表示されます。名前は connect-cluster で始めることをお勧めします。グループ

      connect-cluster-kafka-oss-sink

      タスクサイトの Topic

      コンシューマーオフセットを保存するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。

      • Topic: Topic 名は connect-offset で始めることをお勧めします。

      • パーティション: Topic のパーティション数は 1 より大きい必要があります。

      • ストレージエンジン: Topic のストレージエンジンをローカルストレージに設定する必要があります。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      • cleanup.policy: Topic のログクリーンアップポリシーを Compact に設定する必要があります。

      connect-offset-kafka-oss-sink

      タスク設定の Topic

      タスク設定を保存するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。

      • Topic: Topic 名は connect-config で始めることをお勧めします。

      • パーティション: Topic には 1 つのパーティションのみを含めることができます。

      • ストレージエンジン: Topic のストレージエンジンをローカルストレージに設定する必要があります。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      • cleanup.policy: Topic のログクリーンアップポリシーを Compact に設定する必要があります。

      connect-config-kafka-oss-sink

      タスクステータスの Topic

      タスクステータスを保存するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。

      • Topic: Topic 名は connect-status で始めることをお勧めします。

      • パーティション: Topic のパーティション数を 6 に設定することをお勧めします。

      • ストレージエンジン: Topic のストレージエンジンをローカルストレージに設定する必要があります。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      • cleanup.policy: Topic のログクリーンアップポリシーを Compact に設定する必要があります。

      connect-status-kafka-oss-sink

      デッドレターキューの Topic

      Kafka Connect フレームワークのエラーデータを保存するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。Topic リソースを節約するために、 Topic を作成し、その Topic をデッドレターキュー Topic とエラーデータ Topic の両方として使用できます。

      • Topic: Topic 名は connect-error で始めることをお勧めします。

      • パーティション: Topic のパーティション数を 6 に設定することをお勧めします。

      • ストレージエンジン: Topic のストレージエンジンをローカルストレージまたはクラウドストレージに設定できます。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      connect-error-kafka-oss-sink

      例外データ Topic

      シンクコネクタのエラーデータを保存するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。Topic リソースを節約するために、 Topic を作成し、その Topic をデッドレターキュー Topic とエラーデータ Topic の両方として使用できます。

      • Topic: Topic 名は connect-error で始めることをお勧めします。

      • パーティション: Topic のパーティション数を 6 に設定することをお勧めします。

      • ストレージエンジン: Topic のストレージエンジンをローカルストレージまたはクラウドストレージに設定できます。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      connect-error-kafka-oss-sink

    3. ターゲットサービスの設定 手順で、宛先サービスとして オブジェクトストレージサービス を選択し、パラメータを設定して、作成 をクリックします。次の表にパラメータを示します。

      パラメータ

      説明

      バケット名

      データを同期する OSS バケットの名前。

      bucket_test

      Access Key

      Alibaba Cloud アカウントの AccessKey ID。

      yourAccessKeyID

      Secret Key

      Alibaba Cloud アカウントの AccessKey Secret。

      yourAccessKeySecret

      最小権限の原則に従って、Alibaba Cloud アカウントに次の権限が付与されていることを確認してください。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject", // オブジェクトの取得
                      "oss:PutObject"  // オブジェクトの配置
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      説明

      データ同期タスクの作成時に、AccessKey ID と AccessKey Secret は環境変数として OSS に渡されます。タスクの作成後、ApsaraMQ for Kafka は Alibaba Cloud アカウントの AccessKey ID または AccessKey Secret を保存しません。ApsaraMQ for Kafka

      コネクタが作成されると、Connector タスクリスト ページでコネクタを表示できます。

  6. Connector タスクリスト ページに移動し、作成したコネクタを見つけ、デプロイ操作 列の をクリックします。

メッセージの送信

OSS シンクコネクタをデプロイした後、ApsaraMQ for Kafka インスタンスのソース Topic にメッセージを送信して、メッセージが OSS に同期されるかどうかをテストします。

  1. Connector タスクリスト ページで、管理するコネクタを見つけ、テスト操作 列の をクリックします。

  2. メッセージの送信 パネルで、パラメータを設定してテストメッセージを送信します。

    • 送信方法 パラメータを コンソール に設定した場合は、次の手順を実行します。

      1. メッセージキー フィールドにメッセージキーを入力します。例:demo。

      2. メッセージの内容 フィールドにメッセージの内容を入力します。例:{"key": "test"}。

      3. 指定されたパーティションに送信 パラメータを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。

        • テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例:0。パーティション ID をクエリする方法については、パーティションステータスの表示をご参照ください。

        • テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。

    • 送信方法 パラメータを Docker に設定した場合は、Docker コンテナーを実行してサンプルメッセージを生成する セクションの Docker コマンドを実行して、テストメッセージを送信します。

    • 送信方法 パラメータを SDK に設定した場合は、必要なプログラミング言語またはフレームワークの SDK と、テストメッセージを送受信するためのアクセス方法を選択します。

結果の確認

ApsaraMQ for Kafka インスタンスのソース Topic にテストメッセージを送信した後、OSS コンソールの指定された OSS バケットのファイルページで、メッセージが OSS に同期されているかどうかを確認できます。詳細については、概要をご参照ください。

OSS バケットに新しいオブジェクトが生成された場合、データは OSS に同期されます。

files

ApsaraMQ for Kafka から OSS に同期されるデータの形式は次のとおりです。

[
    {
        "key":"123", // メッセージキー
        "offset":4, // オフセット
        "overflowFlag":true, // オーバーフローフラグ
        "partition":0, // パーティション
        "timestamp":1603779578478, // タイムスタンプ
        "topic":"Test", // Topic
        "value":"1", // 値
        "valueSize":272687 // 値のサイズ
    }
]

関連操作

要件に基づいて、OSS シンクコネクタに必要な Function Compute リソースを設定できます。

Connector タスクリスト ページで、作成したコネクタを見つけ、詳細操作 列の 関数の設定 をクリックし、 を選択します。

Function Compute コンソールにリダイレクトされ、必要に応じてリソースを設定できます。