Schema Registry は、Kafka トピックを介して送信されるメッセージの構造を一元的に管理・検証するサービスです。スキーマを登録することで、プロデューサーとコンシューマー間で共通のデータ契約が確立され、不正な形式のメッセージがパイプラインに流入することを防止するとともに、本番環境への展開前に互換性の問題を検出できます。
本ガイドでは、Linux 環境におけるエンドツーエンドのワークフローについて説明します。具体的には、Confluent のサンプルプロジェクトをクローンし、トピックを作成、スキーマ検証を有効化、Avro スキーマを登録し、スキーマ対応のシリアライザーを用いてメッセージを生成および消費する手順を解説します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
ApsaraMQ for Confluent インスタンスが存在すること。詳細については、「インスタンスの購入とデプロイ」をご参照ください。
Kafka クラスターおよび Schema Registry クラスターへのアクセス権限(RBAC)が付与されていること。詳細については、「RBAC 認可」をご参照ください。
Java 8 または Java 11 がインストールされていること。サポート対象バージョンについては、「Java」をご参照ください。
Maven 3.8 以降がインストールされていること。ダウンロードについては、「Maven ダウンロード」をご参照ください。
ステップ 1:サンプルプロジェクトの準備
Confluent のサンプルリポジトリをクローンし、7.9.0-post ブランチに切り替えます。
git clone https://github.com/confluentinc/examples.git
cd examples/clients/avro
git checkout 7.9.0-postクライアント設定の構成
以下の内容で $HOME/.confluent/java.config を作成します。
# Kafka 接続
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Apache Kafka クライアント (2.6 より前) での正確性確保に必須
client.dns.lookup=use_all_dns_ips
# Apache Kafka クライアント (3.0 より前) での可用性向上を推奨
session.timeout.ms=45000
# データ損失の防止
acks=all
# Schema Registry 接続
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}プレースホルダーを実際の値に置き換えます。
| プレースホルダー | 説明 | 参照先 | 例 |
|---|---|---|---|
{{ BROKER_ENDPOINT }} | Kafka サービスのエンドポイント | [アクセスリンクとポート] ページ。パブリックエンドポイントを使用するには、まず インターネットアクセスを有効にします。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。 | pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092 |
{{ CLUSTER_API_KEY }} | Kafka クラスターの LDAP ユーザー名 | [ユーザー] ページ。 テストでは、ルートアカウントを使用します。 本番環境では、専用のユーザーを作成し、Kafka クラスターの権限を付与します。 | root |
{{ CLUSTER_API_SECRET }} | Kafka クラスターの LDAP パスワード | 上記と同じ | ****** |
{{ SR_ENDPOINT }} | Schema Registry サービスのエンドポイント | [アクセスリンクとポート] ページで、パブリックエンドポイントを使用するには、まず インターネットアクセスを有効にする必要があります。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。 | pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443 |
{{ SR_API_KEY }} | Schema Registry の LDAP ユーザー名 | [ユーザー] ページ。テスト用には、ルートアカウントを使用します。本番環境では、専用ユーザーを作成する とともに、Schema Registry の権限を付与します。 | root |
{{ SR_API_SECRET }} | Schema Registry の LDAP パスワード | 上記と同じページ | ****** |
ステップ 2:トピックの作成
サンプルコードでは transactions という名前のトピックが使用されます。Control Center でこのトピックを作成するか、コード内の transactions をご自身のトピック名に置き換えてください。
Control Center にログインします。ホーム ページで、controlcenter.clusterk カードをクリックして、「クラスターオーバービュー」ページを開きます。

左側ナビゲーションウィンドウで トピック をクリックし、右上隅の + トピックの追加 をクリックします。

「新規トピック」ページで、トピック名とパーティション数を入力し、デフォルト設定で作成 をクリックします。

トピックが作成された後、トピックの詳細ページを開き、設定を確認します。

ステップ 3:スキーマ検証の有効化
スキーマ検証は、プロデューサーおよびコンシューマー双方に対して、ブローカーレベルで登録済みスキーマに準拠しないメッセージを拒否します。
トピックの詳細ページで、構成 タブをクリックし、設定の編集 をクリックします。

エキスパートモードに切り替え をクリックします。

confluent_value_schema_validationをtrueに設定し、変更の保存 をクリックします。
ステップ 4:Avro スキーマの登録
サンプルプロジェクトには、Payment.avsc という Avro スキーマファイルが含まれており、文字列型の id と double 型の amount の 2 つのフィールドを持つ支払いレコードを定義しています。
スキーマファイルを表示します。
cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc出力結果:
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}このスキーマを Control Center に登録します。
トピックの詳細ページで、スキーマ タブをクリックし、スキーマの設定 をクリックします。
フォーマットとして Avro を選択し、スキーマの内容をコードエディタに貼り付け、作成 をクリックします。

ステップ 5:メッセージの生成および消費
スキーマ検証が有効化され、スキーマが登録された状態で、Confluent の Java クライアントを用いて Avro シリアル化されたメッセージを生成および消費します。
プロジェクトのビルド
examples/clients/avro ディレクトリからプロジェクトをコンパイルします。
mvn clean compile packageメッセージの生成
プロデューサーは KafkaAvroSerializer を使用して、登録済みの Avro スキーマに基づいてメッセージの値をシリアル化します。キーはプレーンな文字列としてシリアル化されます。
プロデューサーを実行します。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
-Dexec.args="$HOME/.confluent/java.config"期待される出力:
...
トピック transactions に正常に 10 件のメッセージを生成しました
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...Control Center でトピックを開き、「メッセージ」タブを確認することで、メッセージが正しく生成されたことを検証できます。

メッセージの消費
コンシューマーは KafkaAvroDeserializer を使用して、メッセージの値を Payment オブジェクトへ逆シリアル化します。SPECIFIC_AVRO_READER_CONFIG を true に設定すると、汎用的な Avro レコードではなく、生成された Payment クラスへの逆シリアル化が可能になります。
コンシューマーを実行します。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
-Dexec.args="$HOME/.confluent/java.config"期待される出力:
...
key = id0, value = {"id": "id0", "amount": 1000.0}
key = id1, value = {"id": "id1", "amount": 1000.0}
key = id2, value = {"id": "id2", "amount": 1000.0}
key = id3, value = {"id": "id3", "amount": 1000.0}
key = id4, value = {"id": "id4", "amount": 1000.0}
key = id5, value = {"id": "id5", "amount": 1000.0}
key = id6, value = {"id": "id6", "amount": 1000.0}
key = id7, value = {"id": "id7", "amount": 1000.0}
key = id8, value = {"id": "id8", "amount": 1000.0}
key = id9, value = {"id": "id9", "amount": 1000.0}
...次のステップ
スキーマの進化 — 下位互換性を維持したまま、フィールドの追加や変更を行います。詳細については、「Confluent Platform の Schema Registry」をご参照ください。
ユーザーと権限の管理 -- 本番ワークロード用に専用の LDAP ユーザーを作成し、詳細な RBAC ロールを割り当てます。「ユーザーを管理し、権限を付与する」をご参照ください。