すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:コネクタのユースケース

最終更新日:Mar 14, 2025

このトピックでは、ApsaraMQ for Confluent が提供するコネクタ機能について説明します。

redis-enterprise-kafka

コネクタの設定

コネクタを設定するには、次のいずれかの方法を使用できます。

  • Control Center を使用するimage

  • コネクタ構成ファイルをアップロードする

    {
     "name": "RedisEnterpriseSinkConnectorConnector_0",
     "config": {
     "name": "RedisEnterpriseSinkConnectorConnector_0",
     "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "redis-connect-topic",
     "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379",
     "redis.type": "STRING",
     "principal.service.name": "****",
     "principal.service.password": "************"
     }
  1. コネクタが設定されている場合、コネクタのステータスは Control Center で「実行中」に変わります。imageimage

コネクタのテスト

  1. クライアントまたは SDK を使用して、redis-connect-topic という名前のトピックにメッセージを送信します。image

  2. コネクタはメッセージ送信ジョブを自動的に実行します。その後、データは設定した ApsaraDB for Redis インスタンスに書き込まれます。image

kafka-connect-s3

Amazon Simple Storage Service(Amazon S3)用 Kafka コネクタは、Object Storage Service(OSS)プロトコルと互換性があります。 S3SinkConnector の設定を構成することで、コネクタをインストールできます。このようにして、Kafka クラスタのトピックからメッセージをエクスポートし、OSS バケットにメッセージをアップロードできます。

コネクタの設定

  1. 次のコードは、コネクタ構成ファイルの内容を示しています。コネクタが設定されると、test101 トピックのメッセージを、指定された Kafka クラスタから OSS の cn-beijing リージョンの lm-xxxx-test バケットの topic_test ディレクトリにエクスポートできます。

    {
     "name": "oss_test",
     "config": {
     "name": "oss_test",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
     "tasks.max": "1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "test101",
     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
     "flush.size": "1",
     "schema.compatibility": "NONE",
     // s3.bucket.name パラメータの値は、OSS バケットのディレクトリである必要があります。
     "s3.bucket.name": "topic_test",
     "s3.region": "cn-beijing",
     // OSS にメッセージをアップロードすることを承認されたアカウントの AccessKey ペア。
     "aws.access.key.id": "your_access_key_id",
     "aws.secret.access.key": "******************************",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
     "topics.dir": "",
     // OSS バケットに関する情報。
     "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
     "principal.service.name": "****",
     "principal.service.password": "********"
     }

debezium-connector-mysql

コネクタの設定

  1. 次のコードは、コネクタ構成ファイルの内容を示しています。コネクタを設定するには、Kafka ブローカーの SASL_SSL を設定する必要があります。詳細については、「データベース履歴パラメータ」をご参照ください。

    {
     "name": "oss_test",
     "config": {
     "name": "oss_test",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
     "tasks.max": "1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "test101",
     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
     "flush.size": "1",
     "schema.compatibility": "NONE",
     // s3.bucket.name パラメータの値は、OSS バケットのディレクトリである必要があります。
     "s3.bucket.name": "topic_test",
     "s3.region": "cn-beijing",
     // OSS にメッセージをアップロードすることを承認されたアカウントの AccessKey ペア。
     "aws.access.key.id": "your_access_key_id",
     "aws.secret.access.key": "******************************",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
     "topics.dir": "",
     // OSS バケットに関する情報。
     "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
     "principal.service.name": "****",
     "principal.service.password": "********"
     }

一般的なエラー

  1. コネクタを設定するには、MySQL ユーザーに RELOAD または FLUSH_TABLES 権限が必要です。そうでない場合、次の図のエラーが発生します。image

  2. コネクタが MySQL データベースからトピックにデータを送信する場合、トピックが存在する必要があります。そうでない場合、次の図のエラーが発生します。 Control Center で auto.create.topics.enable パラメータを true に設定して、トピックを自動的に作成できます。事前にトピックを作成することもできます。imageimage

Connector Rest API

重要

コネクタを強制的に更新すると、実行中のジョブが停止し、コネクタの情報が Control Center に正しく表示されない場合があります。この問題を解決するには、RESTful API 操作を呼び出して、コネクタを手動で削除します。詳細については、「Confluent Platform 用 Kafka Connect REST インターフェース」をご参照ください。

デプロイされているすべてのコネクタの詳細をクエリする

  • API 操作形式:GET /connectors

  • クエリ方法:Postman または CLI を使用して API 操作を呼び出すことができます。

    • Postmanimage

    • CLI

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
  • シナリオ:この API 操作を呼び出して、デプロイされているすべてのコネクタの詳細をクエリできます。

コネクタのステータスをクエリする

  • API 操作形式:GET /connectors/(string:name)/tasks/(int:taskid)/status

  • クエリ方法:Postman または CLI を使用して API 操作を呼び出すことができます。

    • Postmanimage

    • CLI

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
  • シナリオ:この操作を呼び出して、コネクタのデプロイステータスをクエリできます。コネクタのデプロイに失敗した場合、この操作を呼び出して詳細をクエリし、問題のトラブルシューティングを行うことができます。

コネクタを削除する

  • API 操作形式:DELETE /connectors/(string:name)/

  • クエリ方法:Postman または CLI を使用して API 操作を呼び出すことができます。

    • Postmanimage

    • CLI

      curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector'  --header 'Authorization: Basic xxx'
  • シナリオ:コネクタが強制的に更新および再デプロイされた後、Control Center でコネクタに関する情報をクエリできない場合、この操作を呼び出してコネクタを強制的に削除できます。このようにして、Control Center は期待どおりに実行できます。