このトピックでは、ApsaraMQ for Confluent インスタンスでメッセージを送受信する際にスキーマ検証を実装するためのプロセスと操作について説明します。スキーマ検証により、プロデューサーによって送信されるメッセージが事前に定義されたデータ構造に準拠していることが保証され、データの整合性とシステムの信頼性が向上します。
プロセス
ステップ 1:インスタンスを購入してデプロイする
インスタンスを購入する
ApsaraMQ for Confluent コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
上部のナビゲーションバーで、リージョンを選択し、インスタンスの購入 をクリックします。
表示されるパネルで、[インスタンスバージョン] パラメーターを [Confluent] に設定し、[OK] をクリックします。
インスタンス購入ページで、画面の指示に従ってパラメーターを構成し、[今すぐ購入] をクリックして、支払いを完了します。次の表にパラメーターを示します。
パラメーター
例
インスタンスエディション
Professional
エディション間の違いについては、「エディション」をご参照ください。
期間
12 か月
リージョンとゾーン
中国 (杭州)
計算リソース
クラスターサイズに基づいて計算リソースの数とストレージスペースのサイズを選択し、ビジネス要件に基づいてカスタムコンポーネントを構成します。詳細については、「クラスターリソースを評価する」をご参照ください。
コンポーネントリソース
クラスターサイズに基づいて計算リソースの数とストレージスペースのサイズを選択し、ビジネス要件に基づいてカスタムコンポーネントを構成します。詳細については、「クラスターリソースを評価する」をご参照ください。
説明1 容量単位 (CU) は、1 CPU コアと 4 GB のメモリを持つ計算リソースを表します。
インスタンスをデプロイする
ApsaraMQ for Confluent コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。[インスタンス] ページで、インスタンスを見つけ、[操作] 列の デプロイ をクリックします。
インスタンスのデプロイ パネルで、次の表に示すパラメーターを構成し、OK をクリックします。
インスタンスのデプロイ時に構成されるパラメーター
パラメーター
説明
例
デプロイメントモード
クラスターのデプロイメントモードを選択します。シングルゾーンとマルチゾーンのデプロイメントがサポートされています。
シングルゾーン
ゾーン
ゾーンを選択します。
ゾーン A
VPC
仮想プライベートクラウド (VPC) を選択します。
vpc-bp17fapfdj0dwzjkd****
vSwitch
vSwitch を選択します。 vSwitch が作成されていない場合は、最初に該当のゾーンに vSwitch を作成する必要があります。クラスターが想定どおりにデプロイされるように、各 vSwitch に 64 個以上の使用可能な IP アドレスを指定します。
vsw-bp1gbjhj53hdjdkg****
SLB
デフォルトでは、SLB は有効になっています。
なし
パブリックネットワーク IP の割り当て
クラスターのインターネットアクセスを有効にするかどうかを指定します。
有効
ログインユーザー名
デフォルトでは、コントロールセンターは root ユーザーを使用します。
root
ログインパスワード
コントロールセンターにログインするためのパスワードを設定します。
******
パスワードの確認
パスワードを確認するために、パスワードをもう一度入力します。
******
パラメーターが構成されると、インスタンスは デプロイ中 状態になります。インスタンスのデプロイには約 10 ~ 30 分かかります。
ステップ 2:コントロールセンターにログインする
ApsaraMQ for Confluent コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
上部のナビゲーションバーで、管理する ApsaraMQ for Confluent インスタンスが存在するリージョンを選択します。次に、[インスタンス] ページで、インスタンスの名前をクリックします。
インスタンスの詳細 ページの右上隅にある [コンソールにログイン] をクリックして、コントロールセンターにログインします。
説明コントロールセンターにログインするためのユーザー名とパスワードは、ApsaraMQ for Confluent インスタンスをデプロイしたときに構成した root ユーザーのユーザー名とパスワードです。
コントロールセンターの [ホーム] ページに入ります。
(オプション) ステップ 3:コネクタを追加する
コネクタ機能を使用するには、Kafka Connect を購入し、ApsaraMQ for Confluent インスタンスにコネクタを追加する必要があります。
ApsaraMQ for Confluent コンソールで ApsaraMQ for Confluent インスタンスにコネクタを追加した後でのみ、コントロールセンターでコネクタを使用できます。
ApsaraMQ for Confluent インスタンスのさまざまなエディションでは、さまざまなタイプのコネクタがサポートされています。詳細については、「コネクタを管理する」をご参照ください。
コネクタの使用方法については、「コネクタのユースケース」をご参照ください。
ステップ 4:トピックを作成する
コントロールセンターにログインします。 [ホーム] ページで、[controlcenter.clusterk] カードをクリックして、[クラスターの概要] ページに移動します。
左側のナビゲーションウィンドウで、[トピック] をクリックします。次に、[トピック] ページの右上隅にある [+ トピックを追加] をクリックします。
[新しいトピック] ページで、トピック名とパーティション数を指定し、[デフォルトで作成] をクリックします。
トピックを作成したら、トピックの詳細ページに移動して、トピックの詳細を表示します。
ステップ 5:スキーマ検証を有効にする
トピックの詳細ページで、[構成] タブをクリックします。次に、[設定の編集] をクリックします。
[エキスパートモードに切り替える] をクリックします。
[confluent_value_schema_validation] パラメーターを [true] に設定し、[変更を保存] をクリックして、メッセージ本文のスキーマ検証を有効にします。スキーマ検証が有効になると、メッセージの送受信時にデータ形式が検証されます。
ステップ 6:ユーザーを管理し、権限を付与する
root ユーザー以外の Lightweight Directory Access Protocol (LDAP) ユーザーを使用する場合は、ApsaraMQ for Confluent コンソールに LDAP ユーザーを追加し、必要な権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。
ステップ 7:ネットワークアクセスとセキュリティ設定を構成する
メッセージを送受信する場合は、サービスエンドポイントを取得し、対応するサービス権限を LDAP ユーザーに付与する必要があります。
エンドポイントを取得する
ビジネス要件に基づいて、サービスの内部エンドポイントまたはパブリックエンドポイントを選択します。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。
LDAP ユーザーに権限を付与する
対応するサービス権限を LDAP ユーザーに付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。
説明root ユーザーはすべての権限を持っています。本番環境では、別の LDAP ユーザーを使用し、権限を付与することをお勧めします。
セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。
ステップ 8:メッセージを送受信する
環境を準備する
このトピックでは、ApsaraMQ for Confluent のメッセージは Linux サーバーで送受信されます。
Java 8 または 11 をインストールします。 ApsaraMQ for Confluent でサポートされている Java バージョンについては、「Java」をご参照ください。
Maven 3.8 以降をインストールします。詳細については、「ダウンロード」をご参照ください。
次のコマンドを実行して、サンプルコードをクローンし、
7.9.0-post
ブランチに切り替えます。git clone https://github.com/confluentinc/examples.git cd examples/clients/avro git checkout 7.9.0-post
java.config
という名前の構成ファイルを作成します。作成場所は$HOME/.confluent/
パスです。ここで、$HOME
はホームディレクトリです。構成ファイルで、以下の項目を構成します。# Kafka プロデューサー、コンシューマー、および管理者に必要な接続構成 bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # 2.6 より前の Apache Kafka クライアントの正確性のために必要 client.dns.lookup=use_all_dns_ips # 3.0 より前の Apache Kafka クライアントでの高可用性のためのベストプラクティス session.timeout.ms=45000 # データ損失を防ぐための Kafka プロデューサーのベストプラクティス acks=all # Confluent Cloud Schema Registry に必要な接続構成 schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
次の表にパラメーターを示します。
パラメーター
説明
例
BROKER_ENDPOINT
Kafka サービスのエンドポイント。
ApsaraMQ for Confluent コンソールの [アクセスリンクとポート] ページでエンドポイントを取得できます。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。
pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092
CLUSTER_API_KEY
LDAP ユーザーのユーザー名とパスワード。ApsaraMQ for Confluent コンソールの [ユーザー] ページでユーザー名とパスワードを取得できます。
テスト中は、ルートアカウントとそのパスワードを使用できます。別の LDAP ユーザーを使用する場合は、ApsaraMQ for Confluent コンソールでユーザーを作成し、Kafka クラスターの対応する権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。
root
CLUSTER_API_SECRET
******
SR_ENDPOINT
Schema Registry サービスのエンドポイント。
ApsaraMQ for Confluent コンソールの [アクセスリンクとポート] ページでエンドポイントを取得できます。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。
pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443
SR_API_KEY
LDAP ユーザーのユーザー名とパスワード。ApsaraMQ for Confluent コンソールの [ユーザー] ページでユーザー名とパスワードを取得できます。
テスト中は、ルートアカウントとそのパスワードを使用できます。別の LDAP ユーザーを使用する場合は、ApsaraMQ for Confluent コンソールでユーザーを作成し、Schema Registry の対応する権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。
root
SR_API_SECRET
******
コードを実行してメッセージを送受信します。次のサンプルコードでは、Topic パラメーターは
transactions
に設定されています。テスト中は、transactions
という名前のトピックを作成できます。別のトピックを使用する場合は、Topic パラメーターの値を実際のトピック名に置き換えます。
スキーマを作成する
プロジェクトの
examples/clients/avro
ディレクトリに移動し、次のコマンドを実行してPayment.avsc
ファイルの内容を表示します。cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc
返された結果:
{ "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"} ] }
コントロールセンターのトピックの詳細ページで、[スキーマ] タブをクリックします。次に、[スキーマを設定] をクリックします。
[スキーマ] タブで、[Avro] をクリックし、コードエディターに
Payment.avsc
ファイルの内容を入力して、[作成] をクリックします。
メッセージを送信する
プロジェクトの
examples/clients/avro
ディレクトリに移動し、次のコマンドを実行してプロジェクトをコンパイルします。mvn clean compile package
次のコードを実行してメッセージを送信します。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \ -Dexec.args="$HOME/.confluent/java.config"
次のような情報が返された場合、メッセージは送信されています。
... transactions という名前のトピックに 10 件のメッセージが正常に生成されました [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ ...
コントロールセンターで送信されたメッセージを表示します。
メッセージを消費する
プロジェクトの
examples/clients/avro
ディレクトリに移動し、次のコマンドを実行してプロジェクトをコンパイルします。mvn clean compile package
次のコードを実行してメッセージを消費します。
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \ -Dexec.args="$HOME/.confluent/java.config"
次のような情報が返された場合、メッセージは消費されています。
... key = id0, value = {"id": "id0", "amount": 1000.0} key = id1, value = {"id": "id1", "amount": 1000.0} key = id2, value = {"id": "id2", "amount": 1000.0} key = id3, value = {"id": "id3", "amount": 1000.0} key = id4, value = {"id": "id4", "amount": 1000.0} key = id5, value = {"id": "id5", "amount": 1000.0} key = id6, value = {"id": "id6", "amount": 1000.0} key = id7, value = {"id": "id7", "amount": 1000.0} key = id8, value = {"id": "id8", "amount": 1000.0} key = id9, value = {"id": "id9", "amount": 1000.0} ...