すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Schema Registry を使用したスキーマ管理

最終更新日:Mar 11, 2026

Schema Registry は、Kafka トピックを介して送信されるメッセージの構造を一元的に管理・検証するサービスです。スキーマを登録することで、プロデューサーとコンシューマー間で共通のデータ契約が確立され、不正な形式のメッセージがパイプラインに流入することを防止するとともに、本番環境への展開前に互換性の問題を検出できます。

本ガイドでは、Linux 環境におけるエンドツーエンドのワークフローについて説明します。具体的には、Confluent のサンプルプロジェクトをクローンし、トピックを作成、スキーマ検証を有効化、Avro スキーマを登録し、スキーマ対応のシリアライザーを用いてメッセージを生成および消費する手順を解説します。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • ApsaraMQ for Confluent インスタンスが存在すること。詳細については、「インスタンスの購入とデプロイ」をご参照ください。

  • Kafka クラスターおよび Schema Registry クラスターへのアクセス権限(RBAC)が付与されていること。詳細については、「RBAC 認可」をご参照ください。

  • Java 8 または Java 11 がインストールされていること。サポート対象バージョンについては、「Java」をご参照ください。

  • Maven 3.8 以降がインストールされていること。ダウンロードについては、「Maven ダウンロード」をご参照ください。

ステップ 1:サンプルプロジェクトの準備

Confluent のサンプルリポジトリをクローンし、7.9.0-post ブランチに切り替えます。

git clone https://github.com/confluentinc/examples.git
cd examples/clients/avro
git checkout 7.9.0-post

クライアント設定の構成

以下の内容で $HOME/.confluent/java.config を作成します。

# Kafka 接続
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN

# Apache Kafka クライアント (2.6 より前) での正確性確保に必須
client.dns.lookup=use_all_dns_ips

# Apache Kafka クライアント (3.0 より前) での可用性向上を推奨
session.timeout.ms=45000

# データ損失の防止
acks=all

# Schema Registry 接続
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

プレースホルダーを実際の値に置き換えます。

プレースホルダー説明参照先
{{ BROKER_ENDPOINT }}Kafka サービスのエンドポイント[アクセスリンクとポート] ページ。パブリックエンドポイントを使用するには、まず インターネットアクセスを有効にします。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092
{{ CLUSTER_API_KEY }}Kafka クラスターの LDAP ユーザー名[ユーザー] ページ。 テストでは、ルートアカウントを使用します。 本番環境では、専用のユーザーを作成し、Kafka クラスターの権限を付与します。root
{{ CLUSTER_API_SECRET }}Kafka クラスターの LDAP パスワード上記と同じ******
{{ SR_ENDPOINT }}Schema Registry サービスのエンドポイント[アクセスリンクとポート] ページで、パブリックエンドポイントを使用するには、まず インターネットアクセスを有効にする必要があります。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443
{{ SR_API_KEY }}Schema Registry の LDAP ユーザー名[ユーザー] ページ。テスト用には、ルートアカウントを使用します。本番環境では、専用ユーザーを作成する とともに、Schema Registry の権限を付与します。root
{{ SR_API_SECRET }}Schema Registry の LDAP パスワード上記と同じページ******

ステップ 2:トピックの作成

サンプルコードでは transactions という名前のトピックが使用されます。Control Center でこのトピックを作成するか、コード内の transactions をご自身のトピック名に置き換えてください。

  1. Control Center にログインします。ホーム ページで、controlcenter.clusterk カードをクリックして、「クラスターオーバービュー」ページを開きます。

    image

  2. 左側ナビゲーションウィンドウで トピック をクリックし、右上隅の + トピックの追加 をクリックします。

    image

  3. 新規トピック」ページで、トピック名とパーティション数を入力し、デフォルト設定で作成 をクリックします。

    image

  4. トピックが作成された後、トピックの詳細ページを開き、設定を確認します。

    image

ステップ 3:スキーマ検証の有効化

スキーマ検証は、プロデューサーおよびコンシューマー双方に対して、ブローカーレベルで登録済みスキーマに準拠しないメッセージを拒否します。

  1. トピックの詳細ページで、構成 タブをクリックし、設定の編集 をクリックします。

    image

  2. エキスパートモードに切り替え をクリックします。

    image

  3. confluent_value_schema_validationtrue に設定し、変更の保存 をクリックします。

    image

ステップ 4:Avro スキーマの登録

サンプルプロジェクトには、Payment.avsc という Avro スキーマファイルが含まれており、文字列型の id と double 型の amount の 2 つのフィールドを持つ支払いレコードを定義しています。

スキーマファイルを表示します。

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

出力結果:

{
  "namespace": "io.confluent.examples.clients.basicavro",
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

このスキーマを Control Center に登録します。

  1. トピックの詳細ページで、スキーマ タブをクリックし、スキーマの設定 をクリックします。

  2. フォーマットとして Avro を選択し、スキーマの内容をコードエディタに貼り付け、作成 をクリックします。

    Schema creation with Avro format

ステップ 5:メッセージの生成および消費

スキーマ検証が有効化され、スキーマが登録された状態で、Confluent の Java クライアントを用いて Avro シリアル化されたメッセージを生成および消費します。

プロジェクトのビルド

examples/clients/avro ディレクトリからプロジェクトをコンパイルします。

mvn clean compile package

メッセージの生成

プロデューサーは KafkaAvroSerializer を使用して、登録済みの Avro スキーマに基づいてメッセージの値をシリアル化します。キーはプレーンな文字列としてシリアル化されます。

プロデューサーサンプルコード (ProducerExample.java)

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ProducerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
            // 後方互換性のため、localhost を想定
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
            // 設定ファイルからプロパティを読み込む
            configFile = args[0];
            if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " が見つかりません。");
            } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                    props.load(inputStream);
                }
            }
        }

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {

            for (long i = 0; i < 10; i++) {
                final String orderId = "id" + Long.toString(i);
                final Payment payment = new Payment(orderId, 1000.00d);
                final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                producer.send(record);
                Thread.sleep(1000L);
            }

            producer.flush();
            System.out.printf("トピック %s に正常に 10 件のメッセージを生成しました%n", TOPIC);

        } catch (final SerializationException e) {
            e.printStackTrace();
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }

    }

}

プロデューサーを実行します。

mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
  -Dexec.args="$HOME/.confluent/java.config"

期待される出力:

...
トピック transactions に正常に 10 件のメッセージを生成しました
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...

Control Center でトピックを開き、「メッセージ」タブを確認することで、メッセージが正しく生成されたことを検証できます。

image

メッセージの消費

コンシューマーは KafkaAvroDeserializer を使用して、メッセージの値を Payment オブジェクトへ逆シリアル化します。SPECIFIC_AVRO_READER_CONFIGtrue に設定すると、汎用的な Avro レコードではなく、生成された Payment クラスへの逆シリアル化が可能になります。

コンシューマーサンプルコード (ConsumerExample.java)

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ConsumerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
            // 後方互換性のため、localhost を想定
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
            // 設定ファイルからプロパティを読み込む
            configFile = args[0];
            if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " が見つかりません。");
            } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                    props.load(inputStream);
                }
            }
        }

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Payment> record : records) {
                    final String key = record.key();
                    final Payment value = record.value();
                    System.out.printf("key = %s, value = %s%n", key, value);
                }
            }

        }
    }

}

コンシューマーを実行します。

mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
  -Dexec.args="$HOME/.confluent/java.config"

期待される出力:

...
key = id0, value = {"id": "id0", "amount": 1000.0}
key = id1, value = {"id": "id1", "amount": 1000.0}
key = id2, value = {"id": "id2", "amount": 1000.0}
key = id3, value = {"id": "id3", "amount": 1000.0}
key = id4, value = {"id": "id4", "amount": 1000.0}
key = id5, value = {"id": "id5", "amount": 1000.0}
key = id6, value = {"id": "id6", "amount": 1000.0}
key = id7, value = {"id": "id7", "amount": 1000.0}
key = id8, value = {"id": "id8", "amount": 1000.0}
key = id9, value = {"id": "id9", "amount": 1000.0}
...

次のステップ

  • スキーマの進化 — 下位互換性を維持したまま、フィールドの追加や変更を行います。詳細については、「Confluent Platform の Schema Registry」をご参照ください。

  • ユーザーと権限の管理 -- 本番ワークロード用に専用の LDAP ユーザーを作成し、詳細な RBAC ロールを割り当てます。「ユーザーを管理し、権限を付与する」をご参照ください。