
ApsaraMQ for Kafka: Schema Registry Management

Updated: May 29, 2025

ApsaraMQ for Confluent uses Schema Registry to manage schemas. This topic describes the basic operations of Schema Registry in a Linux environment.

Prerequisites

Step 1: Prepare the sample code

  1. Run the following commands to clone the sample code and switch to the 7.9.0-post branch.

    git clone https://github.com/confluentinc/examples.git
    
    cd examples/clients/avro
    
    git checkout 7.9.0-post
  2. Create the client configuration file java.config in the $HOME/.confluent/ directory, where $HOME is your home directory. Configure the following settings in the file. The parameters are described below, and a short connectivity check sketch follows the parameter list.

    # Required connection configs for Kafka producer, consumer, and admin
    bootstrap.servers={{ BROKER_ENDPOINT }}
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism=PLAIN
    
    # Required for correctness in Apache Kafka clients prior to 2.6
    client.dns.lookup=use_all_dns_ips
    
    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms=45000
    
    # Best practice for Kafka producer to prevent data loss
    acks=all
    
    # Required connection configs for Confluent Cloud Schema Registry
    schema.registry.url=https://{{ SR_ENDPOINT }}
    basic.auth.credentials.source=USER_INFO
    basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

    Parameter: BROKER_ENDPOINT
    Description: The endpoint of the Kafka service. You can view the endpoint on the Access Links and Interfaces page of the ApsaraMQ for Confluent console. To access the service over the Internet, you must first enable Internet access. For other security settings, see Network Access and Security Settings.
    Example: pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092

    Parameter: CLUSTER_API_KEY
    Description: The name of an LDAP user on the User Management page of the ApsaraMQ for Confluent console. During testing, you can temporarily use the root account and its password. To use a different user, create the user in the ApsaraMQ for Confluent console and grant it the required permissions on the Kafka cluster. For more information, see User Management and Authorization.
    Example: root

    Parameter: CLUSTER_API_SECRET
    Description: The password of the preceding LDAP user.
    Example: ******

    Parameter: SR_ENDPOINT
    Description: The endpoint of the Schema Registry service. You can view the endpoint on the Access Links and Interfaces page of the ApsaraMQ for Confluent console. To access the service over the Internet, you must first enable Internet access. For other security settings, see Network Access and Security Settings.
    Example: pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443

    Parameter: SR_API_KEY
    Description: The name of an LDAP user on the User Management page of the ApsaraMQ for Confluent console. During testing, you can temporarily use the root account and its password. To use a different user, create the user in the ApsaraMQ for Confluent console and grant it the required permissions on Schema Registry. For more information, see User Management and Authorization.
    Example: root

    Parameter: SR_API_SECRET
    Description: The password of the preceding LDAP user.
    Example: ******
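To confirm that java.config is correct before continuing, you can run a short connectivity check. The following is a minimal sketch (not part of the examples project) that loads the file into a Properties object and lists the topics visible to the configured user through the Kafka AdminClient.

import org.apache.kafka.clients.admin.AdminClient;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Load the client settings created in the previous step.
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(
                System.getProperty("user.home") + "/.confluent/java.config")) {
            props.load(in);
        }
        // AdminClient uses bootstrap.servers and the SASL_SSL settings; it logs
        // warnings for keys it does not use (acks, schema.registry.url, ...),
        // which is harmless.
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("Topics visible to this user: " + admin.listTopics().names().get());
        }
    }
}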

Step 2: Create a topic

Note

The topic parameter in the sample code is set to transactions. For testing, you can create a topic named transactions directly. To use a different topic, change the parameter value in the code accordingly.

  1. Log on to the Control Center console. On the Home page, click the controlcenter.cluster card to go to the Cluster overview page.


  2. In the left-side navigation pane, click Topics. Then, on the Topics page, click + Add topic.


  3. On the New topic page, set the topic name and the number of partitions, and then click Create with defaults.


  4. After the topic is created, the topic details page appears. Alternatively, you can create the topic from code, as shown in the sketch after this list.

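If you prefer to create the topic programmatically instead of through the Control Center UI, the following is a minimal sketch using the Kafka AdminClient. The partition count and replication factor are illustrative values; adjust them to your cluster.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        // Reuse the connection settings from Step 1.
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(
                System.getProperty("user.home") + "/.confluent/java.config")) {
            props.load(in);
        }
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions and a replication factor of 3 are illustrative values.
            NewTopic topic = new NewTopic("transactions", 6, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Created topic: transactions");
        }
    }
}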

Step 3: Enable schema validation

  1. On the topic details page, click the Configuration tab, and then click Edit settings.


  2. Click Switch to expert mode.


  3. Set the confluent_value_schema_validation field to true, and then click Save changes to enable schema validation for message values. After validation is enabled, messages are checked against the schema when they are produced and consumed. This setting can also be applied from code, as shown in the sketch after this list.

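The same setting can be applied with the Kafka AdminClient. The sketch below assumes the broker is Confluent Server and that the topic configuration key behind the expert-mode field is confluent.value.schema.validation (the dotted form of the field name shown above); verify the key name against your broker version before relying on it.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class EnableSchemaValidation {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(
                System.getProperty("user.home") + "/.confluent/java.config")) {
            props.load(in);
        }
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "transactions");
            // Assumption: confluent.value.schema.validation is the broker-side key
            // corresponding to the confluent_value_schema_validation field in expert mode.
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("confluent.value.schema.validation", "true"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Collections.singletonMap(topic, Collections.singletonList(op))).all().get();
            System.out.println("Schema validation enabled for topic transactions");
        }
    }
}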

Step 4: Create a schema

  1. In the examples/clients/avro directory of the project, run the following command to view the contents of the Payment.avsc file.

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

    Output:

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"}
     ]
    }
    
  2. On the topic details page in the Control Center console, click Schema, and then click Set a schema.

  3. On the Schema tab, click Avro, paste the Payment.avsc content above into the text box, and then click Create. You can also register the schema from code, as shown in the sketch after this list.

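Schemas can also be registered programmatically with the Confluent Schema Registry client. The following is a minimal sketch, not taken from the examples project: it reads Payment.avsc and registers it under the subject transactions-value, which is the subject name that the default TopicNameStrategy derives for the value schema of the transactions topic. The <SR_ENDPOINT>, <SR_API_KEY>, and <SR_API_SECRET> placeholders stand for your own values from Step 1.

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class RegisterSchemaExample {
    public static void main(String[] args) throws Exception {
        // Read the Avro schema shown in the previous step.
        String schemaString = new String(Files.readAllBytes(
            Paths.get("src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc")));

        // <SR_ENDPOINT>, <SR_API_KEY>, and <SR_API_SECRET> are placeholders.
        Map<String, Object> configs = new HashMap<>();
        configs.put("basic.auth.credentials.source", "USER_INFO");
        configs.put("basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");
        SchemaRegistryClient client =
            new CachedSchemaRegistryClient("https://<SR_ENDPOINT>", 10, configs);

        // The default TopicNameStrategy maps the value schema of topic "transactions"
        // to the subject "transactions-value".
        int schemaId = client.register("transactions-value", new AvroSchema(schemaString));
        System.out.println("Registered schema with ID " + schemaId);
    }
}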

Step 5: Produce and consume messages

Produce messages

Because the schema created for the topic uses the Avro format, you must set the message value serializer to the KafkaAvroSerializer class when producing messages, and use the generated Payment class as the message value type.

Sample code:

Producer sample code

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ProducerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
          // Backwards compatibility, assume localhost
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
          // Load properties from a local configuration file
          // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
          // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
          // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
          configFile = args[0];
          if (!Files.exists(Paths.get(configFile))) {
            throw new IOException(configFile + " not found.");
          } else {
            try (InputStream inputStream = new FileInputStream(configFile)) {
              props.load(inputStream);
            }
          }
        }

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {

            for (long i = 0; i < 10; i++) {
                final String orderId = "id" + Long.toString(i);
                final Payment payment = new Payment(orderId, 1000.00d);
                final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                producer.send(record);
                Thread.sleep(1000L);
            }

            producer.flush();
            System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);

        } catch (final SerializationException e) {
            e.printStackTrace();
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }

    }

}

To produce messages, perform the following steps:

  1. In the examples/clients/avro directory of the project, run the following command to build the project.

    mvn clean compile package
  2. After the build completes, run the following command to produce messages.

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    If you see output similar to the following, the messages were produced successfully.

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
  3. You can view the produced messages in the Control Center console.


Consume messages

Because the schema created for the topic uses the Avro format, you must set the message value deserializer to the KafkaAvroDeserializer class when consuming messages, and use the generated Payment class as the message value type.

Sample code:

Consumer sample code

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ConsumerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
          // Backwards compatibility, assume localhost
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
          // Load properties from a local configuration file
          // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
          // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
          // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
          configFile = args[0];
          if (!Files.exists(Paths.get(configFile))) {
            throw new IOException(configFile + " not found.");
          } else {
            try (InputStream inputStream = new FileInputStream(configFile)) {
              props.load(inputStream);
            }
          }
        }

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
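        // Setting specific.avro.reader to true makes KafkaAvroDeserializer return the
        // generated Payment class instead of a GenericRecord.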
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

        try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Payment> record : records) {
                    final String key = record.key();
                    final Payment value = record.value();
                    System.out.printf("key = %s, value = %s%n", key, value);
                }
            }

        }
    }

}

To consume messages, perform the following steps:

  1. In the examples/clients/avro directory of the project, run the following command to build the project.

    mvn clean compile package
  2. Run the following command to consume messages.

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"
  3. If you see output similar to the following, the messages were consumed successfully.

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...

References

For more information about Schema Registry, see Schema Registry Overview.