すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:ksqlDB

最終更新日:May 31, 2025

ksqlDB は、Apache Kafka 用のストリーミング SQL エンジンです。使いやすい GUI を提供することでストリーム処理を簡素化し、SQL 文を実行できます。これにより、Apache Kafka のデータを処理し、ストリーミングデータに対して SQL クエリを中断することなく実行できます。 ksqlDB は、集約、接続、ウィンドウ操作、セッション操作など、ストリーム処理における幅広い操作をサポートしています。

アーキテクチャ

次の図は、従来のストリーム処理のアーキテクチャと ksqlDB ベースのストリーム処理のアーキテクチャを示しています。後者のアーキテクチャでは、ストリーム処理エンジンとコネクタが ksqlDB に統合されています。 ksqlDB はまた、ストリーム処理中に SQL クエリを実行するためのマテリアライズドビューも提供します。詳細については、「Confluent Platform 用の ksqlDB」をご参照ください。

  • 従来のストリーム処理アプリケーションのアーキテクチャimage

  • ksqlDB ベースのストリーム処理アプリケーションのアーキテクチャimage

ksqlDB の使用

トピックの作成と構成

  1. トピックを作成する。このトピックでは、ksql_test という名前のトピックが作成されます。

  2. スキーマを作成する。検証モードとして [Avro] を選択し、次の入力規則を追加します。

    {
        "namespace": "io.confluent.examples.clients.basicavro",
        "type": "record",
        "name": "Payment",
        "fields": [
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "amount",
                "type": "double"
            }
        ]
    }
  3. ksql_test トピックの スキーマ検証を有効にする

権限付与

ApsaraMQ for Confluent では、ロールベースアクセス制御 (RBAC) 権限付与を使用して ksqlDB クラスタを管理できます。このトピックでは、test という名前のユーザーが作成されます。

  1. test ユーザーを作成し、そのユーザーに次の権限を付与します。詳細については、「ユーザーを管理し、権限を付与する」をご参照ください。

    ユーザー名

    クラスタタイプ

    リソースタイプ

    ロール

    test

    Kafka クラスタ

    クラスタ

    SystemAdmin

    test

    KSQL

    クラスタ

    ResourceOwner

    test

    スキーマレジストリ

    クラスタ

    SystemAdmin

  2. ksql_test トピックに対する読み取り専用権限を ksqlDB のデフォルトユーザー ksql に付与します。

    ユーザー名

    クラスタタイプ

    リソースタイプ

    ロール

    ksql

    Kafka クラスタ

    トピック

    DeveloperRead

手順

  1. ApsaraMQ for Confluent コンソール にログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。

  2. 上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。 [インスタンス] ページで、インスタンスの名前をクリックします。

  3. インスタンスの詳細 ページの右上隅にある [Control Center にログイン] をクリックして、Control Center にログインします。

  4. Control Center の [ホーム] ページで、[controlcenter.clusterk] カードをクリックして [クラスタ概要] ページに移動します。

    image

  5. 左側のナビゲーションウィンドウで、[ksqlDB] をクリックします。次に、管理する ksqlDB クラスタの名前をクリックします。

  6. クラスタ詳細ページで、[エディター] タブをクリックします。このタブでストリームを作成し、ksql コマンドを使用してデータをクエリできます。詳細については、「クイックスタート」をご参照ください。

    • ストリームを作成する

      CREATE STREAM ksql_test_stream WITH (KAFKA_TOPIC='ksql_test',VALUE_FORMAT='AVRO');
    • ストリームからデータをクエリする

      SELECT * FROM ksql_test_stream EMIT CHANGES;

メッセージ送信のテストと検証

  1. ストリームデータクエリを有効にします。

    [ksqldb] ページで、[エディター] タブをクリックし、次のクエリ文を入力して、[クエリを実行] をクリックします。

    SELECT * FROM ksql_test_stream EMIT CHANGES;

    image

  2. テストメッセージを送信します。

    1. 新しい Control Center ウィンドウを開きます。

    2. ksql_test トピックの詳細ページで、[メッセージ] タブをクリックし、[新しいメッセージを作成] をクリックします。

    3. [新しいメッセージを作成] パネルで、メッセージコンテンツを入力し、[作成] をクリックします。

      {
          "id": "Tome",
          "amount": 18
      }

      image

  3. メッセージ送信を確認します。

    以前に開いたストリームデータクエリウィンドウで、送信されたテストメッセージがクエリされていることを確認できます。

    image

その他の操作