本文向您介紹如何使用雲訊息佇列 Confluent 版的Connector相關功能。
redis-enterprise-kafka
Connector配置
配置方式分為兩種,在Control Center中自行配置,或者上傳connector config file。
Control Center配置

connector config file配置
{ "name": "RedisEnterpriseSinkConnectorConnector_0", "config": { "name": "RedisEnterpriseSinkConnectorConnector_0", "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "redis-connect-topic", "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379", "redis.type": "STRING", "principal.service.name": "****", "principal.service.password": "************" }
如果配置成功無誤,Control Center中會顯示處於running狀態的connector執行個體。


connector功能測試
通過Client或者SDK方式向配置的Confluent Topic中發送資料。

connector會自動啟動job,可以看到redis-connect-topic中的資料已寫入配置的Redis執行個體中。

kafka-connect-s3
kafka-connect-s3相容OSS協議,可以通過S3SinkConnector相關配置選項建立Connector,將Kafka叢集中Topic資料匯出至OSS的bucket中。
connector配置
設定檔內容如下所示,該配置將叢集中Topic名為test101下的訊息匯出到OSS的cn-beijing區中bucket為lm-xxxx-test的topic_test目錄下。
{ "name": "oss_test", "config": { "name": "oss_test", "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "test101", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "flush.size": "1", "schema.compatibility": "NONE", //bucketName必須是OSS上某個bucket下面的一個目錄 "s3.bucket.name": "topic_test", "s3.region": "cn-beijing", //可以寫入OSS檔案的賬戶AK "aws.access.key.id": "your_access_key_id", "aws.secret.access.key": "******************************", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "topics.dir": "", //此處包含bucket相關資訊 "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com", "principal.service.name": "****", "principal.service.password": "********" }
debezium-connector-mysql
connector配置
設定檔內容如下所示,設定項目需要添加broker認證SASL_SSL相關配置,詳情請參見官方配置文檔。
{ "name": "oss_test", "config": { "name": "oss_test", "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "test101", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "flush.size": "1", "schema.compatibility": "NONE", //bucketName必須是OSS上某個bucket下面的一個目錄 "s3.bucket.name": "topic_test", "s3.region": "cn-beijing", //可以寫入OSS檔案的賬戶AK "aws.access.key.id": "your_access_key_id", "aws.secret.access.key": "******************************", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "topics.dir": "", //此處包含bucket相關資訊 "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com", "principal.service.name": "****", "principal.service.password": "********" }
Connector常見問題
設定項目中要求MySQL的使用者擁有RELOAD或FLUSH_TABLES許可權,否則會出現如下錯誤。

connector把MySQL資料庫表資料發送至Topic過程中需要自動建立Topic,此時需要在Control Center中設定auto.create.topics.enable配置項為true,或者手動建立相應名字的Topic,否則報錯如下。


Connector Rest API
強制更新Connector可能導致正在啟動並執行job結束,且會導致Control Center中Connect資訊異常,可通過Restful API手動刪除Connector進行解決,更多API使用詳情,請參見API文檔。
查看Connector執行個體資訊
API格式:GET /connectors
查詢方式:可以通過Postman或者命令列的方式進行查看。
Postman

命令列
curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
使用情境:用於查看已經部署的connector資訊詳情。
查看Connector狀態資訊
API格式:GET /connectors/(string:name)/tasks/(int:taskid)/status
查詢方式:可以通過Postman或者命令列的方式進行查看。
Postman

命令列
curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
使用情境:用於查詢connector的部署狀態,當connector部署failed後,可通過該API進行詳情查詢,診斷問題。
刪除connector
API格式:DELETE /connectors/(string:name)/
查詢方式:可以通過Postman或者命令列的方式進行查看。
Postman

命令列
curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector' --header 'Authorization: Basic xxx'
使用情境:用於強制更新部署Connector後,Control Center中出現擷取Connector資訊失敗時,可通過該API對connector進行強制移除,Control Center可恢複正常運行。