このトピックでは、ApsaraMQ for Confluent が提供するコネクタ機能について説明します。
redis-enterprise-kafka
コネクタの設定
コネクタを設定するには、次のいずれかの方法を使用できます。
Control Center を使用する

コネクタ構成ファイルをアップロードする
{ "name": "RedisEnterpriseSinkConnectorConnector_0", "config": { "name": "RedisEnterpriseSinkConnectorConnector_0", "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "redis-connect-topic", "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379", "redis.type": "STRING", "principal.service.name": "****", "principal.service.password": "************" }
コネクタが設定されている場合、コネクタのステータスは Control Center で「実行中」に変わります。


コネクタのテスト
クライアントまたは SDK を使用して、redis-connect-topic という名前のトピックにメッセージを送信します。

コネクタはメッセージ送信ジョブを自動的に実行します。その後、データは設定した ApsaraDB for Redis インスタンスに書き込まれます。

kafka-connect-s3
Amazon Simple Storage Service(Amazon S3)用 Kafka コネクタは、Object Storage Service(OSS)プロトコルと互換性があります。 S3SinkConnector の設定を構成することで、コネクタをインストールできます。このようにして、Kafka クラスタのトピックからメッセージをエクスポートし、OSS バケットにメッセージをアップロードできます。
コネクタの設定
次のコードは、コネクタ構成ファイルの内容を示しています。コネクタが設定されると、test101 トピックのメッセージを、指定された Kafka クラスタから OSS の cn-beijing リージョンの lm-xxxx-test バケットの topic_test ディレクトリにエクスポートできます。
{ "name": "oss_test", "config": { "name": "oss_test", "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "test101", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "flush.size": "1", "schema.compatibility": "NONE", // s3.bucket.name パラメータの値は、OSS バケットのディレクトリである必要があります。 "s3.bucket.name": "topic_test", "s3.region": "cn-beijing", // OSS にメッセージをアップロードすることを承認されたアカウントの AccessKey ペア。 "aws.access.key.id": "your_access_key_id", "aws.secret.access.key": "******************************", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "topics.dir": "", // OSS バケットに関する情報。 "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com", "principal.service.name": "****", "principal.service.password": "********" }
debezium-connector-mysql
コネクタの設定
次のコードは、コネクタ構成ファイルの内容を示しています。コネクタを設定するには、Kafka ブローカーの SASL_SSL を設定する必要があります。詳細については、「データベース履歴パラメータ」をご参照ください。
{ "name": "oss_test", "config": { "name": "oss_test", "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "test101", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "flush.size": "1", "schema.compatibility": "NONE", // s3.bucket.name パラメータの値は、OSS バケットのディレクトリである必要があります。 "s3.bucket.name": "topic_test", "s3.region": "cn-beijing", // OSS にメッセージをアップロードすることを承認されたアカウントの AccessKey ペア。 "aws.access.key.id": "your_access_key_id", "aws.secret.access.key": "******************************", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "topics.dir": "", // OSS バケットに関する情報。 "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com", "principal.service.name": "****", "principal.service.password": "********" }
一般的なエラー
コネクタを設定するには、MySQL ユーザーに RELOAD または FLUSH_TABLES 権限が必要です。そうでない場合、次の図のエラーが発生します。

コネクタが MySQL データベースからトピックにデータを送信する場合、トピックが存在する必要があります。そうでない場合、次の図のエラーが発生します。 Control Center で auto.create.topics.enable パラメータを true に設定して、トピックを自動的に作成できます。事前にトピックを作成することもできます。


Connector Rest API
コネクタを強制的に更新すると、実行中のジョブが停止し、コネクタの情報が Control Center に正しく表示されない場合があります。この問題を解決するには、RESTful API 操作を呼び出して、コネクタを手動で削除します。詳細については、「Confluent Platform 用 Kafka Connect REST インターフェース」をご参照ください。
デプロイされているすべてのコネクタの詳細をクエリする
API 操作形式:GET /connectors
クエリ方法:Postman または CLI を使用して API 操作を呼び出すことができます。
Postman

CLI
curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
シナリオ:この API 操作を呼び出して、デプロイされているすべてのコネクタの詳細をクエリできます。
コネクタのステータスをクエリする
API 操作形式:GET /connectors/(string:name)/tasks/(int:taskid)/status
クエリ方法:Postman または CLI を使用して API 操作を呼び出すことができます。
Postman

CLI
curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
シナリオ:この操作を呼び出して、コネクタのデプロイステータスをクエリできます。コネクタのデプロイに失敗した場合、この操作を呼び出して詳細をクエリし、問題のトラブルシューティングを行うことができます。
コネクタを削除する
API 操作形式:DELETE /connectors/(string:name)/
クエリ方法:Postman または CLI を使用して API 操作を呼び出すことができます。
Postman

CLI
curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector' --header 'Authorization: Basic xxx'
シナリオ:コネクタが強制的に更新および再デプロイされた後、Control Center でコネクタに関する情報をクエリできない場合、この操作を呼び出してコネクタを強制的に削除できます。このようにして、Control Center は期待どおりに実行できます。