すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:スキーマ検証を実装するための操作

最終更新日:Jun 11, 2025

このトピックでは、ApsaraMQ for Confluent インスタンスでメッセージを送受信する際にスキーマ検証を実装するためのプロセスと操作について説明します。スキーマ検証により、プロデューサーによって送信されるメッセージが事前に定義されたデータ構造に準拠していることが保証され、データの整合性とシステムの信頼性が向上します。

プロセス

ステップ 1:インスタンスを購入してデプロイする

インスタンスを購入する

  1. ApsaraMQ for Confluent コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。

  2. 上部のナビゲーションバーで、リージョンを選択し、インスタンスの購入 をクリックします。

  3. 表示されるパネルで、[インスタンスバージョン] パラメーターを [Confluent] に設定し、[OK] をクリックします。

  4. インスタンス購入ページで、画面の指示に従ってパラメーターを構成し、[今すぐ購入] をクリックして、支払いを完了します。次の表にパラメーターを示します。

    パラメーター

    インスタンスエディション

    Professional

    エディション間の違いについては、「エディション」をご参照ください。

    期間

    12 か月

    リージョンとゾーン

    中国 (杭州)

    計算リソース

    クラスターサイズに基づいて計算リソースの数とストレージスペースのサイズを選択し、ビジネス要件に基づいてカスタムコンポーネントを構成します。詳細については、「クラスターリソースを評価する」をご参照ください。

    コンポーネントリソース

    クラスターサイズに基づいて計算リソースの数とストレージスペースのサイズを選択し、ビジネス要件に基づいてカスタムコンポーネントを構成します。詳細については、「クラスターリソースを評価する」をご参照ください。

    説明

    1 容量単位 (CU) は、1 CPU コアと 4 GB のメモリを持つ計算リソースを表します。

インスタンスをデプロイする

  1. ApsaraMQ for Confluent コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。

  2. 上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。[インスタンス] ページで、インスタンスを見つけ、[操作] 列の デプロイ をクリックします。

  3. インスタンスのデプロイ パネルで、次の表に示すパラメーターを構成し、OK をクリックします。

    インスタンスのデプロイ時に構成されるパラメーター

    パラメーター

    説明

    デプロイメントモード

    クラスターのデプロイメントモードを選択します。シングルゾーンとマルチゾーンのデプロイメントがサポートされています。

    シングルゾーン

    ゾーン

    ゾーンを選択します。

    ゾーン A

    VPC

    仮想プライベートクラウド (VPC) を選択します。

    vpc-bp17fapfdj0dwzjkd****

    vSwitch

    vSwitch を選択します。 vSwitch が作成されていない場合は、最初に該当のゾーンに vSwitch を作成する必要があります。クラスターが想定どおりにデプロイされるように、各 vSwitch に 64 個以上の使用可能な IP アドレスを指定します。

    vsw-bp1gbjhj53hdjdkg****

    SLB

    デフォルトでは、SLB は有効になっています。

    なし

    パブリックネットワーク IP の割り当て

    クラスターのインターネットアクセスを有効にするかどうかを指定します。

    有効

    ログインユーザー名

    デフォルトでは、コントロールセンターは root ユーザーを使用します。

    root

    ログインパスワード

    コントロールセンターにログインするためのパスワードを設定します。

    ******

    パスワードの確認

    パスワードを確認するために、パスワードをもう一度入力します。

    ******

    パラメーターが構成されると、インスタンスは デプロイ中 状態になります。インスタンスのデプロイには約 10 ~ 30 分かかります。

ステップ 2:コントロールセンターにログインする

  1. ApsaraMQ for Confluent コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。

  2. 上部のナビゲーションバーで、管理する ApsaraMQ for Confluent インスタンスが存在するリージョンを選択します。次に、[インスタンス] ページで、インスタンスの名前をクリックします。

  3. インスタンスの詳細 ページの右上隅にある [コンソールにログイン] をクリックして、コントロールセンターにログインします。

    説明

    コントロールセンターにログインするためのユーザー名とパスワードは、ApsaraMQ for Confluent インスタンスをデプロイしたときに構成した root ユーザーのユーザー名とパスワードです。

    image

  4. コントロールセンターの [ホーム] ページに入ります。

    image

(オプション) ステップ 3:コネクタを追加する

コネクタ機能を使用するには、Kafka Connect を購入し、ApsaraMQ for Confluent インスタンスにコネクタを追加する必要があります。

  • ApsaraMQ for Confluent コンソールで ApsaraMQ for Confluent インスタンスにコネクタを追加した後でのみ、コントロールセンターでコネクタを使用できます。

  • ApsaraMQ for Confluent インスタンスのさまざまなエディションでは、さまざまなタイプのコネクタがサポートされています。詳細については、「コネクタを管理する」をご参照ください。

  • コネクタの使用方法については、「コネクタのユースケース」をご参照ください。

ステップ 4:トピックを作成する

  1. コントロールセンターにログインします。 [ホーム] ページで、[controlcenter.clusterk] カードをクリックして、[クラスターの概要] ページに移動します。

    image

  2. 左側のナビゲーションウィンドウで、[トピック] をクリックします。次に、[トピック] ページの右上隅にある [+ トピックを追加] をクリックします。

    image

  3. [新しいトピック] ページで、トピック名とパーティション数を指定し、[デフォルトで作成] をクリックします。

    image

  4. トピックを作成したら、トピックの詳細ページに移動して、トピックの詳細を表示します。

    image

ステップ 5:スキーマ検証を有効にする

  1. トピックの詳細ページで、[構成] タブをクリックします。次に、[設定の編集] をクリックします。

    image

  2. [エキスパートモードに切り替える] をクリックします。

    image

  3. [confluent_value_schema_validation] パラメーターを [true] に設定し、[変更を保存] をクリックして、メッセージ本文のスキーマ検証を有効にします。スキーマ検証が有効になると、メッセージの送受信時にデータ形式が検証されます。

    image

ステップ 6:ユーザーを管理し、権限を付与する

root ユーザー以外の Lightweight Directory Access Protocol (LDAP) ユーザーを使用する場合は、ApsaraMQ for Confluent コンソールに LDAP ユーザーを追加し、必要な権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。

ステップ 7:ネットワークアクセスとセキュリティ設定を構成する

メッセージを送受信する場合は、サービスエンドポイントを取得し、対応するサービス権限を LDAP ユーザーに付与する必要があります。

  • エンドポイントを取得する

    ビジネス要件に基づいて、サービスの内部エンドポイントまたはパブリックエンドポイントを選択します。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。

  • LDAP ユーザーに権限を付与する

    対応するサービス権限を LDAP ユーザーに付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。

    説明

    root ユーザーはすべての権限を持っています。本番環境では、別の LDAP ユーザーを使用し、権限を付与することをお勧めします。

  • セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。

ステップ 8:メッセージを送受信する

環境を準備する

このトピックでは、ApsaraMQ for Confluent のメッセージは Linux サーバーで送受信されます。

  1. Java 8 または 11 をインストールします。 ApsaraMQ for Confluent でサポートされている Java バージョンについては、「Java」をご参照ください。

  2. Maven 3.8 以降をインストールします。詳細については、「ダウンロード」をご参照ください。

  3. 次のコマンドを実行して、サンプルコードをクローンし、7.9.0-post ブランチに切り替えます。

    git clone https://github.com/confluentinc/examples.git
    
    cd examples/clients/avro
    
    git checkout 7.9.0-post
  4. java.config という名前の構成ファイルを作成します。作成場所は $HOME/.confluent/ パスです。ここで、$HOME はホームディレクトリです。構成ファイルで、以下の項目を構成します。

    # Kafka プロデューサー、コンシューマー、および管理者に必要な接続構成
    bootstrap.servers={{ BROKER_ENDPOINT }}
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism=PLAIN
    
    # 2.6 より前の Apache Kafka クライアントの正確性のために必要
    client.dns.lookup=use_all_dns_ips
    
    # 3.0 より前の Apache Kafka クライアントでの高可用性のためのベストプラクティス
    session.timeout.ms=45000
    
    # データ損失を防ぐための Kafka プロデューサーのベストプラクティス
    acks=all
    
    # Confluent Cloud Schema Registry に必要な接続構成
    schema.registry.url=https://{{ SR_ENDPOINT }}
    basic.auth.credentials.source=USER_INFO
    basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

    次の表にパラメーターを示します。

    パラメーター

    説明

    BROKER_ENDPOINT

    Kafka サービスのエンドポイント。

    ApsaraMQ for Confluent コンソールの [アクセスリンクとポート] ページでエンドポイントを取得できます。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。

    pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092

    CLUSTER_API_KEY

    LDAP ユーザーのユーザー名とパスワード。ApsaraMQ for Confluent コンソールの [ユーザー] ページでユーザー名とパスワードを取得できます。

    テスト中は、ルートアカウントとそのパスワードを使用できます。別の LDAP ユーザーを使用する場合は、ApsaraMQ for Confluent コンソールでユーザーを作成し、Kafka クラスターの対応する権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。

    root

    CLUSTER_API_SECRET

    ******

    SR_ENDPOINT

    Schema Registry サービスのエンドポイント。

    ApsaraMQ for Confluent コンソールの [アクセスリンクとポート] ページでエンドポイントを取得できます。パブリックエンドポイントを使用する場合は、インターネットアクセスを有効にする必要があります。詳細については、「インターネットアクセス機能を有効にする」をご参照ください。セキュリティ設定については、「ネットワークアクセスとセキュリティ設定を構成する」をご参照ください。

    pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443

    SR_API_KEY

    LDAP ユーザーのユーザー名とパスワード。ApsaraMQ for Confluent コンソールの [ユーザー] ページでユーザー名とパスワードを取得できます。

    テスト中は、ルートアカウントとそのパスワードを使用できます。別の LDAP ユーザーを使用する場合は、ApsaraMQ for Confluent コンソールでユーザーを作成し、Schema Registry の対応する権限を付与する必要があります。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。

    root

    SR_API_SECRET

    ******

  5. コードを実行してメッセージを送受信します。次のサンプルコードでは、Topic パラメーターは transactions に設定されています。テスト中は、transactions という名前のトピックを作成できます。別のトピックを使用する場合は、Topic パラメーターの値を実際のトピック名に置き換えます。

    メッセージを送信するためのサンプルコード

    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.kafka.common.errors.SerializationException;
    
    import java.util.Properties;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.io.FileInputStream;
    import java.io.InputStream;
    
    public class ProducerExample {
    
        private static final String TOPIC = "transactions";
        private static final Properties props = new Properties();
        private static String configFile;
    
        @SuppressWarnings("InfiniteLoopStatement")
        public static void main(final String[] args) throws IOException {
    
            if (args.length < 1) {
              // 下位互換性、localhost を想定
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
            } else {
              // ローカル構成ファイルからプロパティを読み込む
              // Kafka クラスターに接続するための構成パラメーターを使用して構成ファイル (例:'$HOME/.confluent/java.config') を作成します。これは、ローカルホスト、Confluent Cloud、またはその他のクラスター上に配置できます。
              // ドキュメント:https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
              configFile = args[0];
              if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " not found.");
              } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                  props.load(inputStream);
                }
              }
            }
    
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    
            try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
    
                for (long i = 0; i < 10; i++) {
                    final String orderId = "id" + Long.toString(i);
                    final Payment payment = new Payment(orderId, 1000.00d);
                    final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                    producer.send(record);
                    Thread.sleep(1000L);
                }
    
                producer.flush();
                System.out.printf("transactions という名前のトピックに 10 件のメッセージが正常に生成されました%n", TOPIC);
    
            } catch (final SerializationException e) {
                e.printStackTrace();
            } catch (final InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
    }

    メッセージを受信するためのサンプルコード

    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.io.FileInputStream;
    import java.io.InputStream;
    
    public class ConsumerExample {
    
        private static final String TOPIC = "transactions";
        private static final Properties props = new Properties();
        private static String configFile;
    
        @SuppressWarnings("InfiniteLoopStatement")
        public static void main(final String[] args) throws IOException {
    
            if (args.length < 1) {
              // 下位互換性、localhost を想定
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
            } else {
              // ローカル構成ファイルからプロパティを読み込む
              // Kafka クラスターに接続するための構成パラメーターを使用して構成ファイル (例:'$HOME/.confluent/java.config') を作成します。これは、ローカルホスト、Confluent Cloud、またはその他のクラスター上に配置できます。
              // ドキュメント:https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
              configFile = args[0];
              if (!Files.exists(Paths.get(configFile))) {
                throw new IOException(configFile + " not found.");
              } else {
                try (InputStream inputStream = new FileInputStream(configFile)) {
                  props.load(inputStream);
                }
              }
            }
    
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    
            try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
    
                while (true) {
                    final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                    for (final ConsumerRecord<String, Payment> record : records) {
                        final String key = record.key();
                        final Payment value = record.value();
                        System.out.printf("key = %s, value = %s%n", key, value);
                    }
                }
    
            }
        }
    
    }

    完全なサンプルコードについては、「Avro を使用した Java プロデューサーとコンシューマー」をご参照ください。

スキーマを作成する

  1. プロジェクトの examples/clients/avro ディレクトリに移動し、次のコマンドを実行して Payment.avsc ファイルの内容を表示します。

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

    返された結果:

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"}
     ]
    }
    
  2. コントロールセンターのトピックの詳細ページで、[スキーマ] タブをクリックします。次に、[スキーマを設定] をクリックします。

  3. [スキーマ] タブで、[Avro] をクリックし、コードエディターに Payment.avsc ファイルの内容を入力して、[作成] をクリックします。

    image

メッセージを送信する

  1. プロジェクトの examples/clients/avro ディレクトリに移動し、次のコマンドを実行してプロジェクトをコンパイルします。

    mvn clean compile package
  2. 次のコードを実行してメッセージを送信します。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    次のような情報が返された場合、メッセージは送信されています。

    ...
    transactions という名前のトピックに 10 件のメッセージが正常に生成されました
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
  3. コントロールセンターで送信されたメッセージを表示します。

    image

メッセージを消費する

  1. プロジェクトの examples/clients/avro ディレクトリに移動し、次のコマンドを実行してプロジェクトをコンパイルします。

    mvn clean compile package
  2. 次のコードを実行してメッセージを消費します。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    次のような情報が返された場合、メッセージは消費されています。

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...