すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Kafka JSON カタログの管理

最終更新日:Jan 14, 2026

Kafka JSON カタログを設定すると、Realtime Compute for Apache Flink でジョブを開発する際にスキーマを定義することなく、Kafka クラスター内の JSON 形式の Topic に直接アクセスできます。このトピックでは、Kafka JSON カタログの作成、表示、削除方法について説明します。

背景情報

Kafka JSON カタログは、JSON 形式のメッセージを自動的に解析することで、Topic のスキーマを推測します。これにより、Flink SQL で Kafka テーブルのスキーマを宣言することなく、メッセージから特定のフィールド情報を取得できます。Kafka JSON カタログには、次の特徴があります:

  • Kafka JSON カタログのテーブル名は Kafka の Topic 名に対応します。データ定義言語 (DDL) 文を使用して Kafka テーブルを手動で登録する必要はありません。これにより、開発効率と精度が向上します。

  • Kafka JSON カタログによって提供されるテーブルは、Flink SQL ジョブのソーステーブルとして直接使用できます。

  • Kafka JSON カタログを CREATE TABLE AS (CTAS) 文と組み合わせて使用することで、スキーマの変更に合わせてデータを同期できます。

制限事項

  • Kafka JSON カタログは、JSON 形式のメッセージを含む Topic のみをサポートします。

  • VVR 6.0.2 以降を使用するコンピュートエンジンのみが Kafka JSON カタログをサポートします。

    説明

    VVR 4.x を使用している場合、Kafka JSON カタログを使用するには、ジョブを VVR 6.0.2 以降にアップグレードする必要があります。

  • DDL 文を使用して既存の Kafka JSON カタログを変更することはできません。

  • データテーブルのクエリのみが可能です。データベースやテーブルの作成、変更、削除はできません。

    説明

    Kafka JSON カタログを使用する CREATE DATABASE AS (CDAS) または CREATE TABLE AS (CTAS) のシナリオでは、Topic を自動的に作成できます。

  • Kafka JSON カタログは、SSL または SASL 認証が有効になっている Kafka クラスターからの読み取りや書き込みはできません。

  • Kafka JSON カタログによって提供されるテーブルは、Flink SQL ジョブのソーステーブルとして直接使用できます。結果テーブルやルックアップディメンションテーブルとしては使用できません。

  • ApsaraMQ for Kafka では現在、オープンソースの Apache Kafka と同じ API 操作を使用してコンシューマーグループを削除することはできません。Kafka JSON カタログを作成する際には、コンシューマーグループを自動的に削除するために、aliyun.kafka.instanceIdaliyun.kafka.accessKeyIdaliyun.kafka.accessKeySecretaliyun.kafka.endpoint、および aliyun.kafka.regionId パラメーターを設定する必要があります。詳細については、「ApsaraMQ for Kafka とオープンソース Apache Kafka の比較」をご参照ください。

注意事項

Kafka JSON カタログは、サンプルデータを解析してテーブルスキーマを生成します。データ形式に一貫性のない Topic の場合、カタログは可能な限り広いスキーマを返し、デフォルトですべての列を保持します。Topic のデータ形式が変更されると、カタログによって推測されるテーブルスキーマは、異なる時点で一貫性がなくなる可能性があります。ジョブの再起動前後で異なるスキーマが推測された場合、ジョブの実行に問題が発生する可能性があります。

たとえば、Flink SQL ジョブが Kafka JSON カタログ内のテーブルを参照しているとします。ジョブが一定期間実行された後、セーブポイントから再起動すると、前回の実行で使用されたスキーマとは異なる新しいスキーマが取得される可能性があります。しかし、ジョブの実行計画は古いスキーマで生成されたバージョンを依然として使用します。これにより、フィルター条件やフィールド値など、下流コンポーネントで不一致が発生する可能性があります。これを防ぐには、Flink SQL ジョブで CREATE TEMPORARY TABLE 文を使用して Kafka テーブルを作成し、固定スキーマを強制します。

Kafka JSON カタログを作成する

  1. [データ探索] タブのテキストエディターで、Kafka JSON カタログを設定する文を入力します。

    • 自己管理型 Kafka クラスターまたは EMR Kafka クラスター

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100'
      );
    • ApsaraMQ for Kafka

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100',
       'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
       'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
       'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
       'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
       'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
      );

    パラメータ

    タイプ

    説明

    必須

    備考

    YourCatalogName

    String

    Kafka JSON カタログの名前。

    はい

    任意の英語の名前を入力します。

    重要

    パラメーターをカタログ名に置き換えた後、山括弧 (<>) を削除してください。削除しない場合、構文チェックエラーが発生します。

    type

    String

    カタログのタイプ。

    はい

    値は kafka である必要があります。

    properties.bootstrap.servers

    String

    Kafka ブローカーのアドレス。

    はい

    フォーマット:host1:port1,host2:port2,host3:port3

    複数のアドレスはカンマ (,) で区切ります。

    format

    String

    Kafka メッセージのフォーマット。

    はい

    現在、JSON のみがサポートされています。Flink は JSON 形式の Kafka メッセージを解析してスキーマを取得します。

    default-database

    String

    Kafka クラスタの名前。

    いいえ

    デフォルト値は kafka です。カタログがテーブルを特定するには、catalog_name.db_name.table_name という 3 部構成の名前が必要です。このパラメーターはデフォルトの db_name を指定します。Kafka にはデータベースの概念がないため、任意の文字列を使用して Kafka クラスターをデータベースとして表現できます。

    key.fields-prefix

    String

    メッセージキーから解析されたフィールド名に追加されるカスタムプレフィックス。これにより、Kafka メッセージキーの解析後の名前の競合を回避します。

    いいえ

    デフォルト値は key_ です。たとえば、キーフィールド名が a の場合、システムはフィールド名を key_a として解析します。

    説明

    key.fields-prefix パラメーターの値は、value.fields-prefix パラメーターの値のプレフィックスにすることはできません。たとえば、value.fields-prefix を test1_value_ に設定した場合、key.fields-prefix を test1_ に設定することはできません。

    value.fields-prefix

    String

    メッセージ本文 (値) から解析されたフィールド名に追加されるカスタムプレフィックス。これにより、Kafka メッセージ本文の解析後の名前の競合を回避します。

    いいえ

    デフォルト値は value_ です。たとえば、値フィールド名が b の場合、システムはフィールド名を value_b として解析します。

    説明

    value.fields-prefix パラメーターの値は、key.fields-prefix パラメーターの値のプレフィックスにすることはできません。たとえば、key.fields-prefix を test2_value_ に設定した場合、value.fields-prefix を test2_ に設定することはできません。

    timestamp-format.standard

    String

    JSON 形式のメッセージ内の Timestamp 型フィールドを解析するためのフォーマット。システムはまず設定されたフォーマットで解析を試み、失敗した場合は自動的に他のフォーマットを試します。

    いいえ

    有効な値:

    • SQL (デフォルト)

    • ISO-8601

    infer-schema.flatten-nested-columns.enable

    Boolean

    解析中に JSON メッセージ本文 (値) のネストされた列を再帰的に展開するかどうかを指定します。

    いいえ

    有効な値:

    • true:再帰的に展開します。

      展開された列について、Flink は値をインデックスするパスを名前として使用します。たとえば、{"nested": {"col": true}} の列 col の場合、展開された名前は nested.col になります。

      説明

      このパラメーターを true に設定した場合、CREATE TABLE AS (CTAS) 文と組み合わせて使用してください。他の DML 文はネストされた列の自動展開をサポートしていません。

    • false (デフォルト):ネストされた型を String として扱います。

    infer-schema.primitive-as-string

    Boolean

    JSON メッセージ本文 (値) を解析する際に、すべての基本データ型を String 型として推測するかどうかを指定します。

    いいえ

    有効な値:

    • true:すべての基本データ型を String として推測します。

    • false (デフォルト):基本ルールに基づいて型を推測します。

    infer-schema.parse-key-error.field-name

    String

    JSON メッセージキーを解析する際、キーが空でなく解析に失敗した場合、VARBINARY 型のフィールドがテーブルスキーマに追加され、メッセージキーデータを表します。列名は key.fields-prefix プレフィックスとこのパラメーターの値の連結になります。

    いいえ

    デフォルト値は col です。たとえば、メッセージ本文が value_name という名前のフィールドに解析され、メッセージキーが空でなく解析に失敗した場合、返されるスキーマには key_col と value_name の 2 つのフィールドが含まれます。

    infer-schema.compacted-topic-as-upsert-table

    Boolean

    Kafka Topic のクリーンアップポリシーが compact で、メッセージキーが空でない場合に、テーブルを Upsert Kafka テーブルとして使用するかどうかを指定します。

    いいえ

    デフォルト値は true です。CTAS または CDAS 構文を使用して ApsaraMQ for Kafka にデータを同期する場合は、これを true に設定します。

    説明

    VVR 6.0.2 以降のコンピュートエンジンのみがこのパラメーターをサポートします。

    max.fetch.records

    Int

    JSON 形式のメッセージを解析する際に消費するメッセージの最大数。

    いいえ

    デフォルト値は 100 です。

    aliyun.kafka.accessKeyId

    String

    ご利用の Alibaba Cloud アカウントの AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。

    いいえ

    CTAS または CDAS 構文を使用して ApsaraMQ for Kafka にデータを同期する場合に、このパラメーターを設定します。

    説明

    VVR 6.0.2 以降のコンピュートエンジンのみがこのパラメーターをサポートします。

    aliyun.kafka.accessKeySecret

    String

    ご利用の Alibaba Cloud アカウントの AccessKey Secret。詳細については、「AccessKey ペアの作成」をご参照ください。

    いいえ

    CTAS または CDAS 構文を使用して ApsaraMQ for Kafka にデータを同期する場合に、このパラメーターを設定します。

    説明

    VVR 6.0.2 以降のコンピュートエンジンのみがこのパラメーターをサポートします。

    aliyun.kafka.instanceId

    String

    ApsaraMQ for Kafka インスタンスの ID。ApsaraMQ for Kafka コンソールのインスタンス詳細ページで ID を表示できます。

    いいえ

    CTAS または CDAS 構文を使用して ApsaraMQ for Kafka にデータを同期する場合に、このパラメーターを設定します。

    説明

    VVR 6.0.2 以降のコンピュートエンジンのみがこのパラメーターをサポートします。

    aliyun.kafka.endpoint

    String

    ApsaraMQ for Kafka の API エンドポイント。詳細については、「エンドポイント」をご参照ください。

    いいえ

    CTAS または CDAS 構文を使用して ApsaraMQ for Kafka にデータを同期する場合に、このパラメーターを設定します。

    説明

    VVR 6.0.2 以降のコンピュートエンジンのみがこのパラメーターをサポートします。

    aliyun.kafka.regionId

    String

    Topic が配置されているインスタンスのリージョン ID。詳細については、「エンドポイント」をご参照ください。

    いいえ

    CTAS または CDAS 構文を使用して ApsaraMQ for Kafka にデータを同期する場合に、このパラメーターを設定します。

    説明

    VVR 6.0.2 以降のコンピュートエンジンのみがこのパラメーターをサポートします。

  2. カタログを作成する文を選択し、左側の行番号の横にある [実行] をクリックします。

    image.png

  3. 左側の [メタデータ] エリアで、作成されたカタログを表示できます。

Kafka JSON カタログを表示する

  1. [データ探索] タブのテキストエディターで、次の文を入力します。

    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;

    パラメータ

    説明

    ${catalog_name}

    Kafka JSON カタログの名前。

    ${db_name}

    Kafka クラスタの名前。

    ${topic_name}

    Kafka トピックの名前。

  2. カタログを表示する文を選択し、左側の行番号の横にある [実行] をクリックします。

    文が正常に実行されると、実行結果でテーブルの詳細を表示できます。Table information

Kafka JSON カタログを使用する

  • カタログをソーステーブルとして使用して、Kafka Topic からデータを読み取ります。

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    説明

    Kafka JSON カタログのテーブルを使用する際に他の WITH パラメーターを指定するには、SQL ヒントを使用して追加できます。たとえば、上記の SQL 文では、SQL ヒントを使用して最も早いデータから消費を開始するように指定しています。他のパラメーターの詳細については、「ApsaraMQ for Kafka ソーステーブル」および「ApsaraMQ for Kafka 結果テーブル」をご参照ください。

  • CREATE TABLE AS (CTAS) 文を使用して、Kafka Topic をソーステーブルとして、Kafka Topic からターゲットテーブルにデータを同期できます。

    • 単一テーブルをリアルタイムで同期します。

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    • 1 つのジョブで複数のテーブルを同期します。

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;

      Kafka JSON カタログを使用すると、同じタスクで複数の Kafka テーブルを同期できます。ただし、次の条件を満たす必要があります:

      • いずれの Kafka テーブルにも topic-pattern パラメーターが設定されていないこと。

      • 各テーブルの Kafka 構成が同一であること。つまり、properties.bootstrap.servers や properties.group.id を含むすべての properties.* 設定が同じである必要があります。

      • 各テーブルの scan.startup.mode 設定が同一であること。group-offsets、latest-offset、または earliest-offset のいずれかにのみ設定できます。

      たとえば、次の図では、上の 2 つのテーブルは条件を満たしていますが、下の 2 つのテーブルは条件に違反しています。Example

説明

Kafka JSON カタログを使用した完全なエンドツーエンドの例については、「リアルタイムログウェアハウジングのクイックスタート」をご参照ください。

Kafka JSON カタログの削除

警告

Kafka JSON カタログを削除しても、実行中のジョブには影響しません。ただし、そのカタログのテーブルを使用するジョブは、公開または再起動時に「テーブルが見つかりません」というエラーで失敗します。注意して進めてください。

  1. [データ探索] タブのテキストエディターで、次の文を入力します。

    DROP CATALOG ${catalog_name};

    ${catalog_name} を、削除したい Kafka JSON カタログの名前に置き換えます。

  2. カタログを削除する文を選択し、右クリックして [実行] を選択します。

  3. 左側の [メタデータ] エリアで、カタログが削除されたかどうかを確認します。

Kafka JSON カタログからのテーブル情報の詳細

Kafka JSON カタログのテーブルを使いやすくするために、カタログは推測されたテーブルにデフォルトの構成パラメーター、メタデータ、およびプライマリキー情報を追加します。次のセクションでは、Kafka JSON カタログからのテーブル情報の詳細について説明します:

  • Kafka テーブルのスキーマ推測

    Kafka JSON カタログが JSON 形式のメッセージを解析して Topic のスキーマを取得する際、カタログは最大 max.fetch.records 件のメッセージを消費しようとします。Kafka が CTAS データソースとして使用される場合と同じ基本ルールに従って、各データレコードのスキーマを解析します。その後、これらのスキーマをマージして最終的なスキーマを形成します。

    重要
    • Kafka JSON カタログがスキーマを推測する際、Topic のデータを消費するためにコンシューマーグループを作成します。コンシューマーグループ名には、カタログによって作成されたことを示すプレフィックスが使用されます。

    • ApsaraMQ for Kafka の場合、VVR 6.0.7 以降で Kafka JSON カタログを使用してください。6.0.7 より前のバージョンでは、コンシューマーグループは自動的に削除されません。これにより、コンシューマーグループでのメッセージの滞留に関するアラートが発生する可能性があります。

    スキーマは主に次の部分で構成されます:

    • 推測される物理列

      Kafka JSON カタログは、Kafka メッセージキーと本文 (値) からメッセージの物理列を推測します。対応するプレフィックスが列名に追加されます。

      メッセージキーが空でなく、解析できない場合、VARBINARY 型の列が作成されます。列名は、key.fields-prefix プレフィックスと infer-schema.parse-key-error.field-name パラメーターの値の連結になります。

      Kafka メッセージのグループをプルした後、カタログは各メッセージを解析し、解析された物理列をマージして Topic 全体のスキーマを作成します。マージルールは次のとおりです:

      • 解析された物理列に結果スキーマにないフィールドが含まれている場合、Kafka JSON カタログはこれらのフィールドを結果スキーマに自動的に追加します。

      • 同じ名前の列が表示された場合、次のシナリオに基づいて処理されます:

        • 型は同じだが精度が異なる場合、より高い精度の型が使用されます。

        • 型が異なる場合、システムは次の図に示すツリー構造で最小の親ノードを見つけ、それを同じ名前の列の型として使用します。ただし、Decimal 型と Float 型がマージされる場合、精度を維持するために Double 型に変換されます。Schema merge

      たとえば、以下の 3 つのデータレコードを含む Kafka Topic の場合、Kafka JSON カタログは図に示すスキーマを生成します。Schema

    • デフォルトのメタデータ列

      Kafka JSON カタログは、デフォルトで partition、offset、timestamp の 3 つの有用なメタデータ列を追加します。詳細は次の表のとおりです。

      メタデータ名

      列名

      説明

      partition

      partition

      INT NOT NULL

      パーティションの値。

      offset

      offset

      BIGINT NOT NULL

      オフセット。

      timestamp

      timestamp

      TIMESTAMP_LTZ(3) NOT NULL

      メッセージのタイムスタンプ。

    • デフォルトの主キー制約

      Kafka JSON カタログのテーブルがソーステーブルとして消費される場合、データの独自性を保証するために、メタデータ列の partition と offset がデフォルトでプライマリキーとして使用されます。

    説明

    Kafka JSON カタログによって推測されたテーブルスキーマが要件を満たさない場合は、CREATE TEMPORARY TABLE ... LIKE 構文を使用して一時テーブルを宣言し、目的のテーブルスキーマを指定できます。たとえば、JSON データに '2023-01-01 12:00:01' 形式の ts という名前のフィールドが含まれている場合、Kafka JSON カタログは ts フィールドを自動的に TIMESTAMP 型として推測します。ts フィールドを STRING 型として使用したい場合は、CREATE TEMPORARY TABLE ... LIKE 構文を使用してテーブルを宣言できます。次の例に示すように、デフォルト設定ではメッセージ値フィールドに value_ プレフィックスが追加されるため、フィールド名は value_ts になります。

    CREATE TEMPORARY TABLE tempTable (
        value_name STRING,
        value_ts STRING
    ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
  • デフォルトのテーブルパラメーター

    パラメータ

    説明

    備考

    connector

    コネクタのタイプ。

    値は kafka または upsert-kafka である必要があります。

    topic

    対応する Topic 名。

    テーブルの名前。

    properties.bootstrap.servers

    Kafka ブローカーのアドレス。

    カタログ構成の properties.bootstrap.servers パラメーターの値に対応します。

    value.format

    Flink Kafka コネクタが Kafka メッセージ本文 (値) をシリアル化または逆シリアル化するために使用するフォーマット。

    値は JSON に固定されます。

    value.fields-prefix

    すべての Kafka メッセージ本文 (値) フィールドにカスタムプレフィックスを指定して、メッセージキーまたはメタデータフィールドとの名前の競合を回避します。

    カタログ構成の value.fields-prefix パラメーターの値に対応します。

    value.json.infer-schema.flatten-nested-columns.enable

    Kafka メッセージ本文 (値) の JSON 内のネストされた列を再帰的に展開するかどうかを指定します。

    カタログ構成の infer-schema.flatten-nested-columns.enable パラメーターの値に対応します。

    value.json.infer-schema.primitive-as-string

    Kafka メッセージ本文 (値) のすべての基本データ型を String 型として推測するかどうかを指定します。

    カタログ構成の infer-schema.primitive-as-string パラメーターの値に対応します。

    value.fields-include

    メッセージ本文内のメッセージキーフィールドの処理ポリシーを定義します。

    値は EXCEPT_KEY である必要があります。これは、メッセージ本文にメッセージキーフィールドが含まれていないことを示します。

    メッセージキーが空でない場合は、このパラメーターを設定する必要があります。メッセージキーが空の場合は、このパラメーターを設定しないでください。

    key.format

    Flink Kafka コネクタが Kafka メッセージキーをシリアル化または逆シリアル化するために使用するフォーマット。

    値は json または raw である必要があります。

    メッセージキーが空でない場合は、このパラメーターを設定する必要があります。メッセージキーが空の場合は、このパラメーターを設定しないでください。

    メッセージキーが空でなく、解析できない場合は、このパラメーターを raw に設定します。解析が成功した場合は、このパラメーターを json に設定します。

    key.fields-prefix

    すべての Kafka メッセージキーフィールドにカスタムプレフィックスを指定して、メッセージ本文フォーマットフィールドとの名前の競合を回避します。

    カタログ構成の key.fields-prefix パラメーターの値に対応します。

    メッセージキーが空でない場合は、このパラメーターを設定する必要があります。メッセージキーが空の場合は、このパラメーターを設定しないでください。

    key.fields

    Kafka メッセージキーから解析されたデータが格納されるフィールド。

    解析されたキーフィールドのリストが自動的に入力されます。

    メッセージキーが空でなく、テーブルが Upsert Kafka テーブルでない場合は、このパラメーターを設定する必要があります。それ以外の場合は、このパラメーターを設定しないでください。