Confluent 版 ApsaraMQ は、スキーマレジストリを使用してスキーマを管理します。このトピックでは、Linux におけるスキーマレジストリの基本操作について説明します。
始める前に
Confluent 版 ApsaraMQ インスタンスを購入します。詳細については、「インスタンスの購入とデプロイ」をご参照ください。
Kafka クラスタとスキーマレジストリ クラスタにアクセスするための権限を取得します。詳細については、「RBAC 権限付与」をご参照ください。
Java 8 または 11 をインストールします。サポートされている Java バージョンについては、「Java」をご参照ください。
Maven 3.8 以降をインストールします。詳細については、「ダウンロード」をご参照ください。
ステップ 1: サンプルコードを準備する
次のコマンドを実行して、サンプルコードをクローンし、
7.9.0-post
ブランチに切り替えます。git clone https://github.com/confluentinc/examples.git cd examples/clients/avro git checkout 7.9.0-post
$HOME/.confluent/
パスにjava.config
という名前の構成ファイルを作成します。ここで、$HOME
はホームディレクトリです。構成ファイルで、以下の項目を構成します。# Kafka プロデューサー、コンシューマー、および管理者に必要な接続構成 bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # 2.6 より前の Apache Kafka クライアントの正確性のために必要 client.dns.lookup=use_all_dns_ips # 3.0 より前の Apache Kafka クライアントでの高可用性のためのベストプラクティス session.timeout.ms=45000 # データ損失を防ぐための Kafka プロデューサーのベストプラクティス acks=all # Confluent Cloud スキーマレジストリに必要な接続構成 schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
次の表にパラメーターを示します。
パラメーター
説明
例
BROKER_ENDPOINT
Kafka サービスのエンドポイント。
Confluent 版 ApsaraMQ コンソールの アクセスリンクとポート ページでエンドポイントを取得できます。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。
pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092
CLUSTER_API_KEY
LDAP ユーザーのユーザー名とパスワード。Confluent 版 ApsaraMQ コンソールの ユーザー ページでユーザー名とパスワードを取得できます。
テスト中は、ルートアカウントとそのパスワードを使用できます。別の LDAP ユーザーを使用する場合は、Confluent 版 ApsaraMQ コンソールでユーザーを作成し、Kafka クラスタの対応する権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。
root
CLUSTER_API_SECRET
******
SR_ENDPOINT
スキーマレジストリ サービスのエンドポイント。
Confluent 版 ApsaraMQ コンソールの アクセスリンクとポート ページでエンドポイントを取得できます。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。
pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443
SR_API_KEY
LDAP ユーザーのユーザー名とパスワード。Confluent 版 ApsaraMQ コンソールの ユーザー ページでユーザー名とパスワードを取得できます。
テスト中は、ルートアカウントとそのパスワードを使用できます。別の LDAP ユーザーを使用する場合は、Confluent 版 ApsaraMQ コンソールでユーザーを作成し、スキーマレジストリの対応する権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。
root
SR_API_SECRET
******
ステップ 2: トピックを作成する
サンプルコードでは、Topic パラメーターは transactions
に設定されています。テスト中は、transactions
という名前のトピックを作成できます。別のトピックを使用する場合は、Topic パラメーターの値を実際のトピック名に置き換えます。
Control Center にログオンします。[ホーム] ページで、[controlcenter.clusterk] カードをクリックして [クラスタ概要] ページに移動します。
左側のナビゲーションウィンドウで、[トピック] をクリックします。次に、[トピック] ページの右上隅にある [+ トピックを追加] をクリックします。
[新しいトピック] ページで、トピック名とパーティション数を指定し、[デフォルトで作成] をクリックします。
トピックを作成したら、トピック詳細ページに移動してトピックの詳細を表示します。
ステップ 3: スキーマ検証を有効にする
トピック詳細ページで、[構成] タブをクリックします。次に、[設定の編集] をクリックします。
[エキスパートモードに切り替え] をクリックします。
[confluent_value_schema_validation] パラメーターを [true] に設定し、[変更を保存] をクリックして、メッセージ本文のスキーマ検証を有効にします。スキーマ検証が有効になると、メッセージの送受信時にデータ形式が検証されます。
ステップ 4: スキーマを作成する
プロジェクトの
examples/clients/avro
ディレクトリに移動し、次のコマンドを実行してPayment.avsc
ファイルの内容を表示します。cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc
戻り値:
{ "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"} ] }
Control Center のトピック詳細ページで、[スキーマ] をクリックします。次に、[スキーマを設定] をクリックします。
[スキーマ] タブで、[Avro] をクリックし、コードエディターに
Payment.avsc
ファイルの内容を入力し、[作成] をクリックします。
ステップ 5: メッセージを送受信する
メッセージを送信する
スキーマの作成時に検証形式として Avro を指定した場合、メッセージの送信時にメッセージシリアル化方式として KafkaAvroSerializer
クラスを指定し、メッセージ値クラスとして Payment
を構成する必要があります。
サンプルコード:
メッセージを送信するには、次の手順を実行します。
プロジェクトの
examples/clients/avro
ディレクトリに移動し、次のコマンドを実行してプロジェクトをコンパイルします。mvn clean compile package
次のコードを実行してメッセージを送信します。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \ -Dexec.args="$HOME/.confluent/java.config"
次のような情報が返された場合、メッセージは送信されています。
... Successfully produced 10 messages to a topic called transactions [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ ...
Control Center で送信されたメッセージを表示します。
メッセージを受信する
スキーマの作成時に検証形式として Avro を指定した場合、メッセージの送信時にメッセージシリアル化方式として KafkaAvroDeSerializer
クラスを指定し、メッセージ値クラスとして Payment
を構成する必要があります。
サンプルコード:
メッセージを受信するには、次の手順を実行します。
プロジェクトの
examples/clients/avro
ディレクトリに移動し、次のコマンドを実行してプロジェクトをコンパイルします。mvn clean compile package
次のコードを実行してメッセージを受信します。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \ -Dexec.args="$HOME/.confluent/java.config"
次のような情報が返された場合、メッセージは受信されています。
... key = id0, value = {"id": "id0", "amount": 1000.0} key = id1, value = {"id": "id1", "amount": 1000.0} key = id2, value = {"id": "id2", "amount": 1000.0} key = id3, value = {"id": "id3", "amount": 1000.0} key = id4, value = {"id": "id4", "amount": 1000.0} key = id5, value = {"id": "id5", "amount": 1000.0} key = id6, value = {"id": "id6", "amount": 1000.0} key = id7, value = {"id": "id7", "amount": 1000.0} key = id8, value = {"id": "id8", "amount": 1000.0} key = id9, value = {"id": "id9", "amount": 1000.0} ...
参照
スキーマレジストリの詳細については、「Confluent Platform のスキーマレジストリ」をご参照ください。