This topic describes the connector feature provided by ApsaraMQ for Confluent.
redis-enterprise-kafka
Configure a connector
You can use one of the following methods to configure a connector:
Use Control Center

Upload a connector configuration file
{
  "name": "RedisEnterpriseSinkConnectorConnector_0",
  "config": {
    "name": "RedisEnterpriseSinkConnectorConnector_0",
    "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "redis-connect-topic",
    "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379",
    "redis.type": "STRING",
    "principal.service.name": "****",
    "principal.service.password": "************"
  }
}
After the connector is configured, the status of the connector changes to Running in Control Center.

Test the connector
Use a client or an SDK to send messages to a topic named redis-connect-topic.
The connector automatically consumes the messages from the topic and writes the data to the ApsaraDB for Redis instance that you configured.
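One way to send a test message is the kafka-console-producer tool that ships with Confluent Platform. The following is a minimal sketch rather than the only supported client. The BOOTSTRAP address and the client.properties file (which would hold the security settings for your cluster) are placeholders assumed for illustration, and format_record and send_test_message are hypothetical helper names.

```shell
# Placeholders (assumptions): replace with your broker endpoint and client settings file.
BOOTSTRAP="${BOOTSTRAP:-broker.example.com:9092}"
CLIENT_PROPS="${CLIENT_PROPS:-client.properties}"

# Build one "key:value" input line for the console producer.
format_record() {
  printf '%s:%s\n' "$1" "$2"
}

# Send a single keyed message to the topic that the sink connector reads.
# parse.key and key.separator tell the console producer to split each line into key and value.
send_test_message() {
  format_record "$1" "$2" | kafka-console-producer \
    --bootstrap-server "$BOOTSTRAP" \
    --producer.config "$CLIENT_PROPS" \
    --topic redis-connect-topic \
    --property parse.key=true \
    --property key.separator=:
}

# Example call (not run here): send_test_message user1 hello
```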
kafka-connect-s3
The Kafka connector for Amazon Simple Storage Service (Amazon S3) is compatible with Object Storage Service (OSS) protocols. You can install the connector by configuring the settings of S3SinkConnector. This way, you can export messages from topics in Kafka clusters and upload the messages to OSS buckets.
Configure a connector
The following code shows the content of a connector configuration file. After the connector is configured, messages in the test101 topic can be exported from the specified Kafka cluster to the topic_test directory of the lm-xxxx-test bucket in the cn-beijing region of OSS.
{
  "name": "oss_test",
  "config": {
    "name": "oss_test",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "test101",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1",
    "schema.compatibility": "NONE",
    // The value of the s3.bucket.name parameter must be a directory of an OSS bucket.
    "s3.bucket.name": "topic_test",
    "s3.region": "cn-beijing",
    // The AccessKey pair of the account that is authorized to upload the messages to OSS.
    "aws.access.key.id": "your_access_key_id",
    "aws.secret.access.key": "******************************",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "topics.dir": "",
    // The information about the OSS bucket.
    "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
    "principal.service.name": "****",
    "principal.service.password": "********"
  }
}
debezium-connector-mysql
Configure a connector
The following code shows the content of a connector configuration file. To configure the connector, you must configure SASL_SSL for Kafka brokers. For more information, see Database history parameters.
{
  "name": "oss_test",
  "config": {
    "name": "oss_test",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "test101",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1",
    "schema.compatibility": "NONE",
    // The value of the s3.bucket.name parameter must be a directory of an OSS bucket.
    "s3.bucket.name": "topic_test",
    "s3.region": "cn-beijing",
    // The AccessKey pair of the account that is authorized to upload the messages to OSS.
    "aws.access.key.id": "your_access_key_id",
    "aws.secret.access.key": "******************************",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "topics.dir": "",
    // The information about the OSS bucket.
    "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
    "principal.service.name": "****",
    "principal.service.password": "********"
  }
}
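The configuration shown above uses the S3 sink connector class. A Debezium MySQL source connector uses io.debezium.connector.mysql.MySqlConnector instead, so a configuration file for it would look more like the following sketch. This is an illustration under stated assumptions, not the configuration from this topic: the property names follow Debezium releases earlier than 2.0 (which use the database.history.* parameters referenced above), the PLAIN SASL mechanism is assumed, the principal.service.* entries simply mirror the other examples in this topic, and every host name, user, database, and topic name is a placeholder. write_debezium_config is a hypothetical helper name.

```shell
# Write a hypothetical Debezium MySQL source configuration to the path given as $1.
# Every host name, credential, database name, and topic name below is a placeholder.
write_debezium_config() {
  cat > "$1" <<'EOF'
{
  "name": "mysql_source_test",
  "config": {
    "name": "mysql_source_test",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "your-mysql-host",
    "database.port": "3306",
    "database.user": "your_mysql_user",
    "database.password": "********",
    "database.server.id": "184054",
    "database.server.name": "mysql_test",
    "database.include.list": "your_database",
    "database.history.kafka.bootstrap.servers": "broker.example.com:9092",
    "database.history.kafka.topic": "schema-changes.mysql_test",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"****\";",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"****\";",
    "principal.service.name": "****",
    "principal.service.password": "********"
  }
}
EOF
}

# Example call (not run here): write_debezium_config debezium-mysql.json
```

The database.history.producer.* and database.history.consumer.* entries are passed through to the Kafka clients that the connector uses for its history topic, which is where the SASL_SSL settings need to go.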
Common errors
To configure a connector, the MySQL user must have the RELOAD or FLUSH_TABLES permission. Otherwise, the error in the following figure occurs.
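The missing permission can be granted with a standard GRANT statement. The sketch below assumes the mysql command-line client is installed and that MYSQL_HOST and MYSQL_ADMIN are environment variables that you define for an administrative account. grant_reload_sql and grant_reload are hypothetical helper names, and the account name in the example call is a placeholder.

```shell
# Build the GRANT statement for a MySQL account ($1 = user, $2 = host pattern).
# RELOAD is a global privilege, so it is granted ON *.*.
grant_reload_sql() {
  printf "GRANT RELOAD ON *.* TO '%s'@'%s';\n" "$1" "$2"
}

# Run the statement as an administrative account; -p prompts for that account's password.
grant_reload() {
  mysql -h "$MYSQL_HOST" -u "$MYSQL_ADMIN" -p -e "$(grant_reload_sql "$1" "$2")"
}

# Example call (not run here): grant_reload your_mysql_user '%'
```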
When a connector sends data from a MySQL database to a topic, the topic must exist. Otherwise, the errors in the following figure occur. You can set the auto.create.topics.enable parameter to true in Control Center to automatically create a topic. You can also create a topic in advance.
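To create the topic in advance from the command line, the kafka-topics tool from Confluent Platform can be used. The following is a sketch under assumptions: BOOTSTRAP and the client.properties file are placeholders, the partition count and replication factor are illustrative values only, and the helper names are hypothetical. The topic name pattern reflects the Debezium MySQL convention of server name, database, and table joined by periods.

```shell
# Placeholders (assumptions): replace with your broker endpoint and client settings file.
BOOTSTRAP="${BOOTSTRAP:-broker.example.com:9092}"
CLIENT_PROPS="${CLIENT_PROPS:-client.properties}"

# Debezium MySQL change-event topics are named <server name>.<database>.<table>.
debezium_topic_name() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Create the topic if it does not exist yet.
# The partition count and replication factor are illustrative, not recommendations.
create_topic() {
  kafka-topics --bootstrap-server "$BOOTSTRAP" \
    --command-config "$CLIENT_PROPS" \
    --create --if-not-exists \
    --topic "$1" --partitions 1 --replication-factor 3
}

# Example call (not run here):
#   create_topic "$(debezium_topic_name mysql_test your_database your_table)"
```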

Connector REST API
If you forcibly update a connector, an ongoing job may stop and the information about the connector may be incorrectly displayed in Control Center. To resolve the issue, you can call a RESTful API operation to manually delete the connector. For more information, see Kafka Connect REST Interface for Confluent Platform.
Query the details of all deployed connectors
API operation format: GET /connectors
Query method: You can use Postman or CLI to call the API operation.
Postman

CLI
curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
Scenario: You can call this API operation to query the details of all deployed connectors.
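The xxx value in the Authorization header is the Base64 encoding of user:password, as defined by the HTTP Basic authentication scheme. The following sketch shows one way to build that header and reuse it in the call. Which account the Connect endpoint accepts is not stated in this topic, so CONNECT_USER, CONNECT_PASSWORD, and CONNECT_URL are assumed environment variables, and the function names are hypothetical.

```shell
# Encode "user:password" as an HTTP Basic Authorization header line.
# tr removes any line wrapping that some base64 implementations add.
basic_auth_header() {
  printf 'Authorization: Basic %s\n' "$(printf '%s:%s' "$1" "$2" | base64 | tr -d '\n')"
}

# List the deployed connectors.
# CONNECT_URL is the Connect endpoint, including scheme and port, as in the curl example.
list_connectors() {
  curl --silent --show-error --request GET "$CONNECT_URL/connectors" \
    --header "$(basic_auth_header "$CONNECT_USER" "$CONNECT_PASSWORD")"
}

# Example call (not run here): list_connectors
```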
Query the status of a connector
API operation format: GET /connectors/(string:name)/status
Query method: You can use Postman or CLI to call the API operation.
Postman

CLI
curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
Scenario: You can call this operation to query the deployment status of a connector. If a connector fails to be deployed, you can call this operation to query the details and troubleshoot the issue.
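When troubleshooting, it can help to pull only the state values out of the status response. The sketch below assumes the response has the shape described in the Kafka Connect REST documentation, with a "state" field for the connector and one for each task. status_states and connector_status are hypothetical helper names, and CONNECT_URL and AUTH_HEADER are assumed environment variables.

```shell
# Print every "state" value found in a status response read from stdin,
# one per line: the connector state first, then one line per task.
status_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Fetch the status of a connector by name ($1).
# AUTH_HEADER is the full "Authorization: Basic ..." header line.
connector_status() {
  curl --silent --show-error --request GET \
    "$CONNECT_URL/connectors/$1/status" --header "$AUTH_HEADER"
}

# Example call (not run here): connector_status hdfs-sink-connector | status_states
```

A FAILED value in the output indicates that the connector or one of its tasks needs attention. The full response for a failed task also carries a trace field with the error details.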
Delete a connector
API operation format: DELETE /connectors/(string:name)/
Call method: You can use Postman or CLI to call the API operation.
Postman

CLI
curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector' --header 'Authorization: Basic xxx'
Scenario: If the information about a connector fails to be queried in Control Center after a connector is forcibly updated and redeployed, you can call this operation to forcibly delete the connector. This way, Control Center can run as expected.