ApsaraMQ for Kafka: Use cases of connectors

Last Updated: May 29, 2025

This topic describes the use cases of the connector feature provided by ApsaraMQ for Confluent.

redis-enterprise-kafka

Configure a connector

You can use one of the following methods to configure a connector:

  • Use Control Center

  • Upload a connector configuration file. The following code shows an example:

    {
      "name": "RedisEnterpriseSinkConnectorConnector_0",
      "config": {
        "name": "RedisEnterpriseSinkConnectorConnector_0",
        "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "redis-connect-topic",
        "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379",
        "redis.type": "STRING",
        "principal.service.name": "****",
        "principal.service.password": "************"
      }
    }
After the connector is configured, the status of the connector changes to Running in Control Center.

Test the connector

  1. Use a client or an SDK to send messages to the topic named redis-connect-topic. A sample producer sketch in Java is provided after these steps.

  2. The connector automatically consumes the messages and writes the data to the ApsaraDB for Redis instance that you configured.
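
For reference, the following code is a minimal sketch of step 1 in Java. It assumes that the cluster exposes a SASL_SSL endpoint with PLAIN credentials; the bootstrap address, username, password, and message content are placeholder values that you must replace with your own information.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RedisTopicProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder SASL_SSL endpoint and credentials of the Kafka cluster.
            props.put("bootstrap.servers", "kafka-xxxxxxxxxx-internal.csp.aliyuncs.com:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"****\" password=\"************\";");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // With redis.type set to STRING, the record key and value are expected to
                // map to a Redis key and value.
                producer.send(new ProducerRecord<>("redis-connect-topic", "test-key", "hello redis connector")).get();
            }
        }
    }

After the connector processes the message, you can check for the test-key entry on the ApsaraDB for Redis instance (for example, by running GET test-key) to confirm that the value was written.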

kafka-connect-s3

Object Storage Service (OSS) is compatible with the Amazon Simple Storage Service (Amazon S3) protocol. Therefore, you can install the kafka-connect-s3 connector and configure S3SinkConnector to export messages from topics in Kafka clusters and upload the messages to OSS buckets.

Configure a connector

  1. The following code shows the content of a connector configuration file. After the connector is configured, messages in the test101 topic can be exported from the specified Kafka cluster to the topic_test directory of the lm-xxxx-test bucket in the cn-beijing region of OSS. A verification sketch follows the configuration.

    {
      "name": "oss_test",
      "config": {
        "name": "oss_test",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "test101",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "flush.size": "1",
        "schema.compatibility": "NONE",
        // The value of the s3.bucket.name parameter must be a directory of an OSS bucket.
        "s3.bucket.name": "topic_test",
        "s3.region": "cn-beijing",
        // The AccessKey pair of the account that is authorized to upload the messages to OSS.
        "aws.access.key.id": "your_access_key_id",
        "aws.secret.access.key": "******************************",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "topics.dir": "",
        // The information about the OSS bucket.
        "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
        "principal.service.name": "****",
        "principal.service.password": "********"
      }
    }
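
To verify the export, you can list the objects under the target prefix in OSS. The following code is a minimal sketch that uses the OSS SDK for Java; the endpoint, bucket name, prefix, and AccessKey pair are placeholder values based on the configuration above and must be replaced with your own information.

    import com.aliyun.oss.OSS;
    import com.aliyun.oss.OSSClientBuilder;
    import com.aliyun.oss.model.OSSObjectSummary;
    import com.aliyun.oss.model.ObjectListing;

    public class VerifyOssExport {
        public static void main(String[] args) {
            // Placeholder endpoint and AccessKey pair of the OSS bucket.
            OSS ossClient = new OSSClientBuilder().build(
                    "https://oss-cn-beijing-internal.aliyuncs.com",
                    "your_access_key_id",
                    "your_access_key_secret");
            try {
                // List the JSON objects that the connector exported under the topic_test/ prefix.
                ObjectListing listing = ossClient.listObjects("lm-xxxx-test", "topic_test/");
                for (OSSObjectSummary summary : listing.getObjectSummaries()) {
                    System.out.println(summary.getKey() + " (" + summary.getSize() + " bytes)");
                }
            } finally {
                ossClient.shutdown();
            }
        }
    }

Because flush.size is set to 1 in this example, the connector commits each message as a separate JSON object.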

debezium-connector-mysql

Configure a connector

  1. The following code shows an example of the content of a connector configuration file. Placeholder values are used for the MySQL instance and the Kafka cluster. To configure the connector, you must configure SASL_SSL for the connection to the Kafka brokers that store the database history topic. For more information, see Database history parameters. A consumer sketch for checking the change events follows the configuration.

    {
      "name": "mysql_cdc_test",
      "config": {
        "name": "mysql_cdc_test",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        // The connection information about the source MySQL database.
        "database.hostname": "rm-xxxxxxxxxxxx.mysql.rds.aliyuncs.com",
        "database.port": "3306",
        "database.user": "your_mysql_user",
        "database.password": "************",
        "database.server.id": "123456",
        "database.server.name": "mysql_test",
        "database.include.list": "testdb",
        // The Kafka brokers that store the database history topic. SASL_SSL must be configured
        // for both the database history producer and the database history consumer.
        "database.history.kafka.bootstrap.servers": "kafka-xxxxxxxxxx-internal.csp.aliyuncs.com:9092",
        "database.history.kafka.topic": "mysql-history-topic",
        "database.history.consumer.security.protocol": "SASL_SSL",
        "database.history.consumer.sasl.mechanism": "PLAIN",
        "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"************\";",
        "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.sasl.mechanism": "PLAIN",
        "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"************\";",
        "principal.service.name": "****",
        "principal.service.password": "************"
      }
    }
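
To check that change events arrive, you can consume one of the topics that the connector writes to. Debezium names these topics <database.server.name>.<database>.<table>. The following code is a minimal sketch in Java; the bootstrap address, the credentials, and the orders table are placeholder values based on the configuration above.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CdcEventViewer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder SASL_SSL endpoint and credentials of the Kafka cluster.
            props.put("bootstrap.servers", "kafka-xxxxxxxxxx-internal.csp.aliyuncs.com:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"****\" password=\"************\";");
            props.put("group.id", "cdc-viewer");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // "orders" is a hypothetical table in the testdb database.
                consumer.subscribe(Collections.singletonList("mysql_test.testdb.orders"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }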

Common errors

  1. The MySQL user that the connector uses must have the RELOAD or FLUSH_TABLES permission. Otherwise, an error is reported when you configure the connector.

  2. When a connector sends data from a MySQL database to a topic, the topic must exist. Otherwise, errors are reported. You can set the auto.create.topics.enable parameter to true in Control Center to automatically create the topic, or create the topic in advance, as shown in the sketch after this list.
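
The following code is a minimal sketch of creating the topic in advance by using the Kafka AdminClient in Java. The bootstrap address, credentials, topic name, partition count, and replication factor are placeholder values.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateCdcTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder SASL_SSL endpoint and credentials of the Kafka cluster.
            props.put("bootstrap.servers", "kafka-xxxxxxxxxx-internal.csp.aliyuncs.com:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"****\" password=\"************\";");

            try (AdminClient admin = AdminClient.create(props)) {
                // "mysql_test.testdb.orders" is a hypothetical change-event topic; adjust the
                // partition count and replication factor to match your cluster.
                NewTopic topic = new NewTopic("mysql_test.testdb.orders", 3, (short) 3);
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }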

Connector REST API

Important

If you forcibly update a connector, an ongoing job may stop and the information about the connector may be incorrectly displayed in Control Center. To resolve the issue, you can call a RESTful API operation to manually delete the connector. For more information, see Kafka Connect REST Interface for Confluent Platform.

Query the details of all deployed connectors

  • API operation format: GET /connectors

  • Query method: You can use Postman or the CLI to call the API operation.

    • Postman

    • CLI

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
  • Scenario: You can call this API operation to query the details of all deployed connectors.

Query the status of a connector

  • API operation format: GET /connectors/(string:name)/status

  • Query method: You can use Postman or the CLI to call the API operation.

    • Postman

    • CLI

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
  • Scenario: You can call this operation to query the deployment status of a connector. If a connector fails to be deployed, you can call this operation to query the details and troubleshoot the issue.

Delete a connector

  • API operation format: DELETE /connectors/(string:name)/

  • Query method: You can use Postman or the CLI to call the API operation.

    • Postman

    • CLI

      curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector'  --header 'Authorization: Basic xxx'
  • Scenario: If the information about a connector cannot be queried in Control Center after the connector is forcibly updated and redeployed, you can call this operation to forcibly delete the connector so that Control Center can run as expected.