
ApsaraMQ for Kafka:Manage schemas with Schema Registry

Last Updated: Mar 11, 2026

Schema Registry provides a centralized service for managing and validating the structure of messages sent through Kafka topics. By registering schemas, producers and consumers enforce a shared data contract: malformed messages are kept out of the pipeline, and compatibility issues are caught before they reach production.

This guide covers the end-to-end workflow on Linux: clone the Confluent sample project, create a topic, enable schema validation, register an Avro schema, and produce and consume messages with schema-aware serializers.

Before you begin

Make sure that you have completed the following prerequisites:

Step 1: Prepare the sample project

Clone the Confluent examples repository and switch to the 7.9.0-post branch:

git clone https://github.com/confluentinc/examples.git
cd examples/clients/avro
git checkout 7.9.0-post

Configure client properties

Create $HOME/.confluent/java.config with the following content:

# Kafka connection
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN

# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Recommended for higher availability in clients prior to 3.0
session.timeout.ms=45000

# Prevent data loss
acks=all

# Schema Registry connection
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Replace the placeholders with your actual values:

  {{ BROKER_ENDPOINT }}
    Description: Kafka service endpoint.
    Where to find it: Access Links and Ports page in the ApsaraMQ for Confluent console. To use the public endpoint, enable Internet access first. For security settings, see Configure network access and security settings.
    Example: pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092

  {{ CLUSTER_API_KEY }}
    Description: LDAP username for the Kafka cluster.
    Where to find it: Users page in the ApsaraMQ for Confluent console. For testing, use the root account. For production, create a dedicated user and grant Kafka cluster permissions.
    Example: root

  {{ CLUSTER_API_SECRET }}
    Description: LDAP password for the Kafka cluster.
    Where to find it: Same as {{ CLUSTER_API_KEY }}.
    Example: ******

  {{ SR_ENDPOINT }}
    Description: Schema Registry service endpoint.
    Where to find it: Access Links and Ports page in the ApsaraMQ for Confluent console. To use the public endpoint, enable Internet access first. For security settings, see Configure network access and security settings.
    Example: pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443

  {{ SR_API_KEY }}
    Description: LDAP username for Schema Registry.
    Where to find it: Users page in the ApsaraMQ for Confluent console. For testing, use the root account. For production, create a dedicated user and grant Schema Registry permissions.
    Example: root

  {{ SR_API_SECRET }}
    Description: LDAP password for Schema Registry.
    Where to find it: Same as {{ SR_API_KEY }}.
    Example: ******
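The producer and consumer samples later in this guide read this file with java.util.Properties. A minimal, self-contained sketch of that loading step (the configuration content is inlined here instead of being read from $HOME/.confluent/java.config):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class LoadConfigSketch {
    public static void main(String[] args) throws IOException {
        // In the samples this content comes from $HOME/.confluent/java.config;
        // it is inlined here so the sketch is self-contained.
        String config = "# Kafka connection\n"
                + "bootstrap.servers=pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092\n"
                + "security.protocol=SASL_SSL\n";

        Properties props = new Properties();
        props.load(new StringReader(config)); // lines starting with '#' are treated as comments

        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

Every key=value pair in the file, including the schema.registry.url and basic.auth.* entries, becomes a client property this way.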

Step 2: Create a topic

The sample code uses a topic named transactions. Create this topic in Control Center, or replace transactions in the code with your own topic name.

  1. Log on to Control Center. On the Home page, click the controlcenter.cluster card to open the Cluster overview page.


  2. In the left-side navigation pane, click Topics. In the upper-right corner, click + Add topic.


  3. On the New topic page, enter the topic name and partition count, then click Create with defaults.


  4. After the topic is created, open the topic details page to verify the configuration.


Step 3: Enable schema validation

Schema validation is enforced at the broker: every message produced to the topic must reference a schema that is registered for the topic, and the broker rejects messages that do not. This keeps non-conforming producers from writing to the topic regardless of how their clients are configured.
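This works because schema-aware serializers prepend a schema reference to every message value. In Confluent's wire format, byte 0 is a magic byte (always 0) and bytes 1 through 4 are the registered schema ID in big-endian order, followed by the Avro-encoded payload. A minimal sketch of reading that header (the message bytes below are fabricated for illustration):

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    public static void main(String[] args) {
        // Fabricated example message: magic byte 0, schema ID 42, no payload.
        byte[] message = ByteBuffer.allocate(5).put((byte) 0).putInt(42).array();

        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();      // must be 0 in the Confluent wire format
        int schemaId = buf.getInt(); // the ID Schema Registry assigned at registration
        System.out.println("magic=" + magic + " schemaId=" + schemaId);
    }
}
```

The broker's validation checks that this embedded schema ID resolves to a schema registered for the topic's subject.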

  1. On the topic details page, click the Configuration tab, then click Edit settings.


  2. Click Switch to expert mode.


  3. Set confluent_value_schema_validation to true, then click Save changes.


Step 4: Register an Avro schema

The sample project includes an Avro schema file (Payment.avsc) that defines a payment record with two fields: id (string) and amount (double).

View the schema file:

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

Output:

{
  "namespace": "io.confluent.examples.clients.basicavro",
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
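The Maven build generates a Payment Java class from this schema, which is what the producer and consumer below use. As a rough, hand-written sketch, the generated class is equivalent to a simple value type with one member per schema field (the real generated class is produced by the build and additionally implements Avro's SpecificRecord interface):

```java
// Simplified, hand-written sketch of the class generated from Payment.avsc.
public class Payment {
    private String id;     // maps to the "id" field (Avro string)
    private double amount; // maps to the "amount" field (Avro double)

    public Payment(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }

    public String getId() { return id; }
    public double getAmount() { return amount; }

    @Override
    public String toString() {
        // Mirrors the JSON-style rendering seen in the consumer output below
        return "{\"id\": \"" + id + "\", \"amount\": " + amount + "}";
    }

    public static void main(String[] args) {
        System.out.println(new Payment("id0", 1000.00));
    }
}
```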

Register this schema in Control Center:

  1. On the topic details page, click the Schema tab, then click Set a schema.

  2. Select Avro as the format, paste the schema content into the code editor, and click Create.

    Schema creation with Avro format
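Schemas can also be registered programmatically through the Schema Registry REST API by POSTing the schema, as a JSON-escaped string, to /subjects/<topic>-value/versions. A hedged sketch using Java's built-in HTTP client; the endpoint and credentials are placeholders, and the request is only constructed here, not sent:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RegisterSchemaSketch {
    public static void main(String[] args) {
        // Placeholder endpoint and credentials; substitute your own values.
        String endpoint = "https://SR_ENDPOINT";
        String auth = Base64.getEncoder()
                .encodeToString("SR_API_KEY:SR_API_SECRET".getBytes(StandardCharsets.UTF_8));

        // The schema itself travels as a JSON-escaped string in the "schema" field.
        String body = "{\"schemaType\": \"AVRO\", \"schema\": "
                + "\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Payment\\\","
                + "\\\"namespace\\\":\\\"io.confluent.examples.clients.basicavro\\\","
                + "\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"string\\\"},"
                + "{\\\"name\\\":\\\"amount\\\",\\\"type\\\":\\\"double\\\"}]}\"}";

        // Subject name follows the default TopicNameStrategy: <topic>-value.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(endpoint + "/subjects/transactions-value/versions"))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // To actually send it: HttpClient.newHttpClient().send(request, BodyHandlers.ofString());
        System.out.println(request.method() + " " + request.uri());
    }
}
```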

Step 5: Produce and consume messages

With schema validation enabled and a schema registered, produce and consume Avro-serialized messages using the Confluent Java client.

Build the project

From the examples/clients/avro directory, compile the project:

mvn clean compile package

Produce messages

The producer uses KafkaAvroSerializer to serialize message values against the registered Avro schema. Keys are serialized as plain strings.

Producer sample code (ProducerExample.java)

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ProducerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
            // Backwards compatibility, assume localhost
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
            // Load properties from the configuration file
            configFile = args[0];
            if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " not found.");
            } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                    props.load(inputStream);
                }
            }
        }

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {

            for (long i = 0; i < 10; i++) {
                final String orderId = "id" + Long.toString(i);
                final Payment payment = new Payment(orderId, 1000.00d);
                final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                producer.send(record);
                Thread.sleep(1000L);
            }

            producer.flush();
            System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);

        } catch (final SerializationException e) {
            e.printStackTrace();
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }

    }

}

Run the producer:

mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
  -Dexec.args="$HOME/.confluent/java.config"

Expected output:

...
Successfully produced 10 messages to a topic called transactions
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...

Verify the messages in Control Center by opening the topic and checking the Messages tab.


Consume messages

The consumer uses KafkaAvroDeserializer to deserialize message values back into Payment objects. Setting SPECIFIC_AVRO_READER_CONFIG to true enables deserialization into the generated Payment class rather than a generic Avro record.

Consumer sample code (ConsumerExample.java)

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ConsumerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
            // Backwards compatibility, assume localhost
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
            // Load properties from the configuration file
            configFile = args[0];
            if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " not found.");
            } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                    props.load(inputStream);
                }
            }
        }

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Payment> record : records) {
                    final String key = record.key();
                    final Payment value = record.value();
                    System.out.printf("key = %s, value = %s%n", key, value);
                }
            }

        }
    }

}

Run the consumer. It polls in an infinite loop, so press Ctrl+C to stop it when you are done:

mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
  -Dexec.args="$HOME/.confluent/java.config"

Expected output:

...
key = id0, value = {"id": "id0", "amount": 1000.0}
key = id1, value = {"id": "id1", "amount": 1000.0}
key = id2, value = {"id": "id2", "amount": 1000.0}
key = id3, value = {"id": "id3", "amount": 1000.0}
key = id4, value = {"id": "id4", "amount": 1000.0}
key = id5, value = {"id": "id5", "amount": 1000.0}
key = id6, value = {"id": "id6", "amount": 1000.0}
key = id7, value = {"id": "id7", "amount": 1000.0}
key = id8, value = {"id": "id8", "amount": 1000.0}
key = id9, value = {"id": "id9", "amount": 1000.0}
...

What to do next