本文為您介紹雲訊息佇列 Confluent 版執行個體訊息收發時實現Schema格式校正的整體流程和操作。通過Schema格式校正,可以確保生產者發送的訊息符合預定義的資料結構,從而提高資料一致性和系統可靠性。
操作流程
步驟一:購買和部署執行個體
購買執行個體
登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表。
在頂部功能表列,選擇地區,然後單擊购买实例。
在请选择您要创建的实例的付费方式面板,選擇執行個體系列為Confluent 系列,然後單擊確定。
在購買面板,根據自身業務需求設定以下參數,然後單擊立即購買,根據頁面提示完成支付。
部署執行個體
登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表。
在頂部功能表列,選擇地區,然後在執行個體列表頁面,找到未部署的執行個體,單擊右側操作列的部署。
在部署实例面板,配置以下參數,然後單擊确定。
執行個體部署說明
參數
說明
樣本
部署模式
叢集支援單/多可用性區域部署。
單可用性區域
可用性區域
選擇可用性區域。
可用性區域a
選擇專用網路
選擇專用網路。
vpc-bp17fapfdj0dwzjkd****
選擇交換器
如果之前沒有建立過交換器,需要首先建立對應可用性區域的交換器,同時為了保證叢集的順利拉起,每個交換器的可用IP數建議設定在64個以上。
vsw-bp1gbjhj53hdjdkg****
SLB服務
預設開通。
無
掛載公網
是否開啟公網訪問。
開啟
登入名稱
Control Center登入預設使用root使用者。
root
登入密碼
設定Control Center登入密碼。
******
確認密碼
再次輸入密碼。
******
執行個體進入部署中狀態。執行個體部署預計需要10分鐘~30分鐘。
步驟二:登入Control Center
登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表。
在頂部功能表列,選擇地區,然後在執行個體列表頁面,單擊目標執行個體名稱。
在实例详情頁面,單擊右上方的登入控制台進行Control Center登入。
說明Control Center控制台的登入使用者名稱和密碼為部署執行個體時配置名為root的使用者名稱和密碼。

登入完成後,進入Control Center控制台的Home頁面。

步驟三:(可選)Connector外掛程式安裝
如果需要使用Connector相關功能,需要購買Connect組件資源並安裝Connector外掛程式。
Connector外掛程式需要在雲訊息佇列 Confluent 版控制台安裝完成後,才能在Control Center控制台中正常使用。
雲訊息佇列 Confluent 版根據執行個體版本的不同,支援安裝不同類型的Connector外掛程式,具體詳情請參見Connector外掛程式管理。
Connector使用請參見Connector使用案例。
步驟四:建立Topic
登入Control Center控制台,在Home頁面單擊controlcenter.clusterk卡片,進入到Cluster overview頁面。

在左側導覽列,單擊Topics,然後在Topic列表頁面單擊+ Add topic。

在New topic頁面,設定Topic名稱和分區數,然後單擊Create with defaults。

建立完成後,進入到Topic詳情頁面。

步驟五:開啟Schema格式校正
在Topic詳情頁面,單擊Configuration頁簽,然後單擊Edit settings。

然後單擊Switch to expert mode。

將confluent_value_schema_validation欄位設定為true,然後單擊Save changes,啟用Schema驗證訊息內容格式。啟用後發送和消費資料時將進行格式校正。

步驟六:使用者管理和授權
如需使用其他LDAP使用者,則需要在雲訊息佇列 Confluent 版控制台添加LDAP使用者後並進行授權。詳情請參見使用者管理和授權。
步驟七:網路訪問和安全設定
在收發訊息時,需要提供服務的連結地址,並為使用的LDAP使用者授予服務對應的許可權。
步驟八:發送/消費訊息
1.環境準備
本文以在Linux伺服器中使用範例程式碼接入雲訊息佇列 Confluent 版進行訊息收發為例進行說明。
安裝Java 8 或 11,關於Java版本支援情況,請參見Confluent Platform中Java版本支援。
安裝Maven 3.8及以上版本,具體操作,請參見安裝Maven。
執行以下命令,複製範例程式碼,並切換到
7.9.0-post分支。git clone https://github.com/confluentinc/examples.git cd examples/clients/avro git checkout 7.9.0-post在
$HOME/.confluent/路徑下建立用戶端設定檔java.config。其中$HOME為您的使用者主目錄。在設定檔中,配置如下配置項。# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for higher availability in Apache Kafka clients prior to 3.0 session.timeout.ms=45000 # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}參數
描述
樣本值
BROKER_ENDPOINT
KAFKA服務的連結地址。
服務地址在雲訊息佇列 Confluent 版控制台訪問連結和介面頁面查看。若需要使用公網訪問,則需要開啟公網,其他安全訪問配置請參見網路訪問與安全設定。
pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092
CLUSTER_API_KEY
雲訊息佇列 Confluent 版控制台使用者管理頁面中LDAP使用者名稱和密碼。
在測試過程中,可以暫時使用root帳號及其密碼。如需使用其他使用者,則需在雲訊息佇列 Confluent 版控制台中建立該使用者,並為其授予Kafka cluster相應的許可權。建立使用者和授權,請參見使用者管理和授權。
root
CLUSTER_API_SECRET
******
SR_ENDPOINT
SCHEMA_REGISTRY服務的連結地址。
服務地址在雲訊息佇列 Confluent 版控制台訪問連結和介面頁面查看。若需要使用公網訪問,則需要開啟公網,其他安全訪問配置請參見網路訪問與安全設定。
pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443
SR_API_KEY
雲訊息佇列 Confluent 版控制台使用者管理頁面中LDAP使用者名稱和密碼。
在測試過程中,可以暫時使用root帳號及其密碼。如需使用其他使用者,則需在雲訊息佇列 Confluent 版控制台中建立該使用者,並為其授予Schema Registry相應的許可權。建立使用者和授權,請參見使用者管理和授權。
root
SR_API_SECRET
******
範例程式碼中的Topic參數值設定為
transactions。在測試時,可以直接建立名為transactions的Topic。如果需要使用其他的Topic,則需相應更改代碼中的參數值。範例程式碼如下,更多詳情請參見Confluent Platform範例程式碼。
2.建立Schema
進入專案的examples/clients/avro目錄下,執行以下命令,查看
Payment.avsc檔案內容。cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc返回結果
{ "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"} ] }在Control Center控制台Topic詳情頁面,單擊Schema,然後單擊Set a schema。
在Schema頁簽,單擊Avro,將上述Payment.avsc文本填入文字框,單擊Create。

3.發送訊息
進入專案的examples/clients/avro目錄下,執行以下命令編譯專案。
mvn clean compile package編譯完成後,執行以下代碼,發送訊息。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \ -Dexec.args="$HOME/.confluent/java.config"執行發送命令後,如下結果則表明發送成功。
... Successfully produced 10 messages to a topic called transactions [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ ...在Control Center控制台可以查看到已發送的訊息。

4.消費訊息
進入專案的examples/clients/avro目錄下,執行以下命令編譯專案。
mvn clean compile package執行以下代碼,消費訊息。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \ -Dexec.args="$HOME/.confluent/java.config"運行消費命令後,如下結果則表明訊息已被成功消費。
... key = id0, value = {"id": "id0", "amount": 1000.0} key = id1, value = {"id": "id1", "amount": 1000.0} key = id2, value = {"id": "id2", "amount": 1000.0} key = id3, value = {"id": "id3", "amount": 1000.0} key = id4, value = {"id": "id4", "amount": 1000.0} key = id5, value = {"id": "id5", "amount": 1000.0} key = id6, value = {"id": "id6", "amount": 1000.0} key = id7, value = {"id": "id7", "amount": 1000.0} key = id8, value = {"id": "id8", "amount": 1000.0} key = id9, value = {"id": "id9", "amount": 1000.0} ...