全部產品
Search
文件中心

ApsaraMQ for Kafka:操作指引

更新時間:May 17, 2025

本文為您介紹雲訊息佇列 Confluent 版執行個體訊息收發時實現Schema格式校正的整體流程和操作。通過Schema格式校正,可以確保生產者發送的訊息符合預定義的資料結構,從而提高資料一致性和系統可靠性。

操作流程

步驟一:購買和部署執行個體

購買執行個體

  1. 登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表

  2. 在頂部功能表列,選擇地區,然後單擊购买实例

  3. 请选择您要创建的实例的付费方式面板,選擇執行個體系列Confluent 系列,然後單擊確定

  4. 購買面板,根據自身業務需求設定以下參數,然後單擊立即購買,根據頁面提示完成支付。

    參數

    取值樣本

    套餐版本

    專業版

    不同版本之間的差異,請參見版本介紹

    付費時間長度

    一年

    地區和可用性區域

    華東1(杭州)

    計算資源配置

    根據您的叢集規模選擇叢集計算資源和儲存大小,並根據業務需求配置自訂群組件資源,配置建議,請參見叢集資源規格評估

    組件資源配置

    根據您的叢集規模選擇叢集計算資源和儲存大小,並根據業務需求配置自訂群組件資源,配置建議,請參見叢集資源規格評估

    說明

    1 CU代表1 Core 4 GB計算資源。

部署執行個體

  1. 登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表

  2. 在頂部功能表列,選擇地區,然後在執行個體列表頁面,找到未部署的執行個體,單擊右側操作列的部署

  3. 部署实例面板,配置以下參數,然後單擊确定

    執行個體部署說明

    參數

    說明

    樣本

    部署模式

    叢集支援單/多可用性區域部署。

    單可用性區域

    可用性區域

    選擇可用性區域。

    可用性區域a

    選擇專用網路

    選擇專用網路。

    vpc-bp17fapfdj0dwzjkd****

    選擇交換器

    如果之前沒有建立過交換器,需要首先建立對應可用性區域的交換器,同時為了保證叢集的順利拉起,每個交換器的可用IP數建議設定在64個以上。

    vsw-bp1gbjhj53hdjdkg****

    SLB服務

    預設開通。

    掛載公網

    是否開啟公網訪問。

    開啟

    登入名稱

    Control Center登入預設使用root使用者。

    root

    登入密碼

    設定Control Center登入密碼。

    ******

    確認密碼

    再次輸入密碼。

    ******

    執行個體進入部署中狀態。執行個體部署預計需要10分鐘~30分鐘。

步驟二:登入Control Center

  1. 登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表

  2. 在頂部功能表列,選擇地區,然後在執行個體列表頁面,單擊目標執行個體名稱。

  3. 实例详情頁面,單擊右上方的登入控制台進行Control Center登入。

    說明

    Control Center控制台的登入使用者名稱和密碼為部署執行個體時配置名為root的使用者名稱和密碼。

    image

  4. 登入完成後,進入Control Center控制台的Home頁面。

    image

步驟三:(可選)Connector外掛程式安裝

如果需要使用Connector相關功能,需要購買Connect組件資源並安裝Connector外掛程式。

  • Connector外掛程式需要在雲訊息佇列 Confluent 版控制台安裝完成後,才能在Control Center控制台中正常使用。

  • 雲訊息佇列 Confluent 版根據執行個體版本的不同,支援安裝不同類型的Connector外掛程式,具體詳情請參見Connector外掛程式管理

  • Connector使用請參見Connector使用案例

步驟四:建立Topic

  1. 登入Control Center控制台,在Home頁面單擊controlcenter.clusterk卡片,進入到Cluster overview頁面。

    image

  2. 在左側導覽列,單擊Topics,然後在Topic列表頁面單擊+ Add topic

    image

  3. New topic頁面,設定Topic名稱和分區數,然後單擊Create with defaults

    image

  4. 建立完成後,進入到Topic詳情頁面。

    image

步驟五:開啟Schema格式校正

  1. 在Topic詳情頁面,單擊Configuration頁簽,然後單擊Edit settings

    image

  2. 然後單擊Switch to expert mode

    image

  3. confluent_value_schema_validation欄位設定為true,然後單擊Save changes,啟用Schema驗證訊息內容格式。啟用後發送和消費資料時將進行格式校正。

    image

步驟六:使用者管理和授權

如需使用其他LDAP使用者,則需要在雲訊息佇列 Confluent 版控制台添加LDAP使用者後並進行授權。詳情請參見使用者管理和授權

步驟七:網路訪問和安全設定

在收發訊息時,需要提供服務的連結地址,並為使用的LDAP使用者授予服務對應的許可權。

  • 連結地址

    根據實際情況選擇內網、外網連結地址。如需使用公網地址,則需要進行開啟公網操作。

  • 授權

    需要對使用的LDAP使用者授予服務對應的許可權。相關操作請參見使用者管理和授權

    說明

    root使用者具備所有許可權,建議在實際生產環境中使用其他使用者並為其進行授權。

  • 安全設定請參見網路訪問與安全設定

步驟八:發送/消費訊息

1.環境準備

  1. 本文以在Linux伺服器中使用範例程式碼接入雲訊息佇列 Confluent 版進行訊息收發為例進行說明。

  2. 執行以下命令,複製範例程式碼,並切換到7.9.0-post分支。

    git clone https://github.com/confluentinc/examples.git
    
    cd examples/clients/avro
    
    git checkout 7.9.0-post
  3. $HOME/.confluent/路徑下建立用戶端設定檔java.config。其中 $HOME為您的使用者主目錄。在設定檔中,配置如下配置項。

    # Required connection configs for Kafka producer, consumer, and admin
    bootstrap.servers={{ BROKER_ENDPOINT }}
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism=PLAIN
    
    # Required for correctness in Apache Kafka clients prior to 2.6
    client.dns.lookup=use_all_dns_ips
    
    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms=45000
    
    # Best practice for Kafka producer to prevent data loss
    acks=all
    
    # Required connection configs for Confluent Cloud Schema Registry
    schema.registry.url=https://{{ SR_ENDPOINT }}
    basic.auth.credentials.source=USER_INFO
    basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

    參數

    描述

    樣本值

    BROKER_ENDPOINT

    KAFKA服務的連結地址。

    服務地址在雲訊息佇列 Confluent 版控制台訪問連結和介面頁面查看。若需要使用公網訪問,則需要開啟公網,其他安全訪問配置請參見網路訪問與安全設定

    pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092

    CLUSTER_API_KEY

    雲訊息佇列 Confluent 版控制台使用者管理頁面中LDAP使用者名稱和密碼。

    在測試過程中,可以暫時使用root帳號及其密碼。如需使用其他使用者,則需在雲訊息佇列 Confluent 版控制台中建立該使用者,並為其授予Kafka cluster相應的許可權。建立使用者和授權,請參見使用者管理和授權

    root

    CLUSTER_API_SECRET

    ******

    SR_ENDPOINT

    SCHEMA_REGISTRY服務的連結地址。

    服務地址在雲訊息佇列 Confluent 版控制台訪問連結和介面頁面查看。若需要使用公網訪問,則需要開啟公網,其他安全訪問配置請參見網路訪問與安全設定

    pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443

    SR_API_KEY

    雲訊息佇列 Confluent 版控制台使用者管理頁面中LDAP使用者名稱和密碼。

    在測試過程中,可以暫時使用root帳號及其密碼。如需使用其他使用者,則需在雲訊息佇列 Confluent 版控制台中建立該使用者,並為其授予Schema Registry相應的許可權。建立使用者和授權,請參見使用者管理和授權

    root

    SR_API_SECRET

    ******

  4. 範例程式碼中的Topic參數值設定為transactions。在測試時,可以直接建立名為transactions的Topic。如果需要使用其他的Topic,則需相應更改代碼中的參數值。範例程式碼如下,更多詳情請參見Confluent Platform範例程式碼

    生產訊息範例程式碼

    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.kafka.common.errors.SerializationException;
    
    import java.util.Properties;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.io.FileInputStream;
    import java.io.InputStream;
    
    public class ProducerExample {
    
        private static final String TOPIC = "transactions";
        private static final Properties props = new Properties();
        private static String configFile;
    
        @SuppressWarnings("InfiniteLoopStatement")
        public static void main(final String[] args) throws IOException {
    
            if (args.length < 1) {
              // Backwards compatibility, assume localhost
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
            } else {
              // Load properties from a local configuration file
              // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
              // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
              // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
              configFile = args[0];
              if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " not found.");
              } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                  props.load(inputStream);
                }
              }
            }
    
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    
            try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
    
                for (long i = 0; i < 10; i++) {
                    final String orderId = "id" + Long.toString(i);
                    final Payment payment = new Payment(orderId, 1000.00d);
                    final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                    producer.send(record);
                    Thread.sleep(1000L);
                }
    
                producer.flush();
                System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);
    
            } catch (final SerializationException e) {
                e.printStackTrace();
            } catch (final InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
    }

    消費訊息範例程式碼

    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.io.FileInputStream;
    import java.io.InputStream;
    
    public class ConsumerExample {
    
        private static final String TOPIC = "transactions";
        private static final Properties props = new Properties();
        private static String configFile;
    
        @SuppressWarnings("InfiniteLoopStatement")
        public static void main(final String[] args) throws IOException {
    
            if (args.length < 1) {
              // Backwards compatibility, assume localhost
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
            } else {
              // Load properties from a local configuration file
              // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
              // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
              // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
              configFile = args[0];
              if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " not found.");
              } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                  props.load(inputStream);
                }
              }
            }
    
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    
            try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
    
                while (true) {
                    final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                    for (final ConsumerRecord<String, Payment> record : records) {
                        final String key = record.key();
                        final Payment value = record.value();
                        System.out.printf("key = %s, value = %s%n", key, value);
                    }
                }
    
            }
        }
    
    }

2.建立Schema

  1. 進入專案的examples/clients/avro目錄下,執行以下命令,查看Payment.avsc檔案內容。

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

    返回結果

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"}
     ]
    }
    
  2. 在Control Center控制台Topic詳情頁面,單擊Schema,然後單擊Set a schema

  3. Schema頁簽,單擊Avro,將上述Payment.avsc文本填入文字框,單擊Create

    image

3.發送訊息

  1. 進入專案的examples/clients/avro目錄下,執行以下命令編譯專案。

    mvn clean compile package
  2. 編譯完成後,執行以下代碼,發送訊息。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    執行發送命令後,如下結果則表明發送成功。

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
  3. 在Control Center控制台可以查看到已發送的訊息。

    image

4.消費訊息

  1. 進入專案的examples/clients/avro目錄下,執行以下命令編譯專案。

    mvn clean compile package
  2. 執行以下代碼,消費訊息。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"
  3. 運行消費命令後,如下結果則表明訊息已被成功消費。

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...