全部產品
Search
文件中心

ApsaraMQ for Kafka:Connector使用案例

更新時間:Jul 06, 2024

本文向您介紹如何使用雲訊息佇列 Confluent 版的Connector相關功能。

redis-enterprise-kafka

Connector配置

配置方式分為兩種,在Control Center中自行配置,或者上傳connector config file。

  • Control Center配置image

  • connector config file配置

    {
     "name": "RedisEnterpriseSinkConnectorConnector_0",
     "config": {
     "name": "RedisEnterpriseSinkConnectorConnector_0",
     "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "redis-connect-topic",
     "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379",
     "redis.type": "STRING",
     "principal.service.name": "****",
     "principal.service.password": "************"
     }
  1. 如果配置成功無誤,Control Center中會顯示處於running狀態的connector執行個體。imageimage

connector功能測試

  1. 通過Client或者SDK方式向配置的Confluent Topic中發送資料。image

  2. connector會自動啟動job,可以看到redis-connect-topic中的資料已寫入配置的Redis執行個體中。image

kafka-connect-s3

kafka-connect-s3相容OSS協議,可以通過S3SinkConnector相關配置選項建立Connector,將Kafka叢集中Topic資料匯出至OSS的bucket中。

connector配置

  1. 設定檔內容如下所示,該配置將叢集中Topic名為test101下的訊息匯出到OSS的cn-beijing區中bucket為lm-xxxx-test的topic_test目錄下。

    {
     "name": "oss_test",
     "config": {
     "name": "oss_test",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
     "tasks.max": "1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "test101",
     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
     "flush.size": "1",
     "schema.compatibility": "NONE",
     //bucketName必須是OSS上某個bucket下面的一個目錄
     "s3.bucket.name": "topic_test",
     "s3.region": "cn-beijing",
     //可以寫入OSS檔案的賬戶AK
     "aws.access.key.id": "your_access_key_id",
     "aws.secret.access.key": "******************************",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
     "topics.dir": "",
     //此處包含bucket相關資訊
     "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
     "principal.service.name": "****",
     "principal.service.password": "********"
     }

debezium-connector-mysql

connector配置

  1. 設定檔內容如下所示,設定項目需要添加broker認證SASL_SSL相關配置,詳情請參見官方配置文檔

    {
     "name": "oss_test",
     "config": {
     "name": "oss_test",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
     "tasks.max": "1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "test101",
     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
     "flush.size": "1",
     "schema.compatibility": "NONE",
     //bucketName必須是OSS上某個bucket下面的一個目錄
     "s3.bucket.name": "topic_test",
     "s3.region": "cn-beijing",
     //可以寫入OSS檔案的賬戶AK
     "aws.access.key.id": "your_access_key_id",
     "aws.secret.access.key": "******************************",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
     "topics.dir": "",
     //此處包含bucket相關資訊
     "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
     "principal.service.name": "****",
     "principal.service.password": "********"
     }

Connector常見問題

  1. 設定項目中要求MySQL的使用者擁有RELOAD或FLUSH_TABLES許可權,否則會出現如下錯誤。image

  2. connector把MySQL資料庫表資料發送至Topic過程中需要自動建立Topic,此時需要在Control Center中設定auto.create.topics.enable配置項為true,或者手動建立相應名字的Topic,否則報錯如下。imageimage

Connector Rest API

重要

強制更新Connector可能導致正在啟動並執行job結束,且會導致Control Center中Connect資訊異常,可通過Restful API手動刪除Connector進行解決,更多API使用詳情,請參見API文檔

查看Connector執行個體資訊

  • API格式:GET /connectors

  • 查詢方式:可以通過Postman或者命令列的方式進行查看。

    • Postmanimage

    • 命令列

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
  • 使用情境:用於查看已經部署的connector資訊詳情。

查看Connector狀態資訊

  • API格式:GET /connectors/(string:name)/tasks/(int:taskid)/status

  • 查詢方式:可以通過Postman或者命令列的方式進行查看。

    • Postmanimage

    • 命令列

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
  • 使用情境:用於查詢connector的部署狀態,當connector部署failed後,可通過該API進行詳情查詢,診斷問題。

刪除connector

  • API格式:DELETE /connectors/(string:name)/

  • 查詢方式:可以通過Postman或者命令列的方式進行查看。

    • Postmanimage

    • 命令列

      curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector'  --header 'Authorization: Basic xxx'
  • 使用情境:用於強制更新部署Connector後,Control Center中出現擷取Connector資訊失敗時,可通過該API對connector進行強制移除,Control Center可恢複正常運行。