Kafka JSON カタログを作成すると、スキーマを定義しなくても、Realtime Compute for Apache Flink の開発コンソールで Kafka クラスタの JSON 形式のトピックにアクセスできます。このトピックでは、Realtime Compute for Apache Flink の開発コンソールで Kafka JSON カタログを作成、表示、使用、および削除する方法について説明します。
背景情報
Kafka JSON カタログは、JSON 形式のメッセージを自動的に解析してトピックのスキーマを推測します。したがって、JSON カタログを使用すると、Flink SQL で Kafka テーブルのスキーマを宣言しなくても、メッセージの特定のフィールドを取得できます。Kafka JSON カタログを使用する場合は、次の点に注意してください。
Kafka JSON カタログのテーブルの名前は、Kafka クラスタのトピックの名前と一致します。このようにして、Kafka クラスタのトピックにアクセスするために DDL 文を実行する必要はありません。これにより、データ開発の効率と精度が向上します。
Kafka JSON カタログのテーブルは、Flink SQL デプロイメントのソーステーブルとして使用できます。
Kafka JSON カタログを CREATE TABLE AS 文と一緒に使用して、スキーマの変更を同期できます。
このトピックでは、Kafka JSON カタログを管理するために行える操作について説明します。
制限事項
Kafka JSON カタログは、JSON 形式のトピックのみをサポートします。
Ververica Runtime(VVR)6.0.2 以降を使用する Realtime Compute for Apache Flink のみ Kafka JSON カタログをサポートします。
説明デプロイメントで VVR 4.X を使用している場合は、Kafka JSON カタログを使用する前に、デプロイメントの VVR バージョンを VVR 6.0.2 以降にアップグレードすることをお勧めします。
DDL 文を実行して既存の Kafka JSON カタログを変更することはできません。
Kafka JSON カタログを使用してデータテーブルをクエリすることのみ可能です。Kafka JSON カタログを使用してデータベースとテーブルを作成、変更、または削除することはできません。
説明Kafka JSON カタログを CREATE DATABASE AS 文または CREATE TABLE AS 文と一緒に使用すると、トピックが自動的に作成されます。
Kafka JSON カタログを使用して、SSL ベースの認証または簡易認証およびセキュリティ層(SASL)認証が有効になっている Kafka クラスタからデータを読み書きすることはできません。
Kafka JSON カタログのテーブルは、Flink SQL デプロイメントのソーステーブルとして使用できますが、結果テーブルまたはディメンションテーブルとして使用されるルックアップテーブルとして使用することはできません。
ApsaraMQ for Kafka では、Apache Kafka が グループ を削除するために使用する API 操作と同じ API 操作を呼び出すことはできません。Kafka JSON カタログを作成する場合は、aliyun.kafka.instanceId、aliyun.kafka.accessKeyId、aliyun.kafka.accessKeySecret、aliyun.kafka.endpoint、および aliyun.kafka.regionId パラメータを構成して、グループ ID を自動的に削除する必要があります。詳細については、「ApsaraMQ for Kafka とオープンソース Apache Kafka の比較」をご参照ください。
注意事項
Kafka JSON カタログはサンプルデータを解析してテーブルスキーマを生成します。データ形式が一貫していないトピックの場合、Kafka JSON カタログはすべての列のすべてのフィールドを含むスキーマを自動的に保持します。トピックのデータ形式が変更された場合、Kafka JSON カタログによって生成されたテーブルスキーマは、異なる時点で一貫しない可能性があります。その結果、デプロイメントの再起動前後に異なるスキーマが推測された場合、デプロイメントの実行エラーが発生する可能性があります。たとえば、Kafka JSON カタログのテーブルを参照する Realtime Compute for Apache Flink の SQL デプロイメントが一定期間実行された後、セーブポイントから再起動された場合、前回の実行のテーブルスキーマとは異なるテーブルスキーマが生成される可能性があります。SQL デプロイメントは、前回の実行で生成されたテーブルスキーマを使用します。その結果、ダウンストリームストレージのフィルタリング条件やフィールド値などのオブジェクトが一致しない可能性があります。したがって、Create Temporary Table を使用して SQL デプロイメントで Kafka テーブルを作成することをお勧めします。この場合、SQL デプロイメントは固定テーブルスキーマを使用できます。
Kafka JSON カタログを作成する
スクリプトページの スクリプト タブのコードエディタで、次の文を入力して Kafka JSON カタログを作成します。
自己管理 Kafka クラスタまたは E-MapReduce(EMR)Kafka クラスタ の Kafka JSON カタログを作成するために使用する文
CREATE CATALOG <YourCatalogName> WITH( 'type'='kafka', 'properties.bootstrap.servers'='<brokers>', 'format'='json', 'default-database'='<dbName>', 'key.fields-prefix'='<keyPrefix>', 'value.fields-prefix'='<valuePrefix>', 'timestamp-format.standard'='<timestampFormat>', 'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>', 'infer-schema.primitive-as-string'='<primitiveAsString>', 'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>', 'infer-schema.compacted-topic-as-upsert-table'='true', 'max.fetch.records'='100' );ApsaraMQ for Kafka インスタンスの Kafka JSON カタログを作成するために使用する文
CREATE CATALOG <YourCatalogName> WITH( 'type'='kafka', 'properties.bootstrap.servers'='<brokers>', 'format'='json', 'default-database'='<dbName>', 'key.fields-prefix'='<keyPrefix>', 'value.fields-prefix'='<valuePrefix>', 'timestamp-format.standard'='<timestampFormat>', 'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>', 'infer-schema.primitive-as-string'='<primitiveAsString>', 'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>', 'infer-schema.compacted-topic-as-upsert-table'='true', 'max.fetch.records'='100', 'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>', 'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>', 'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>', 'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>', 'aliyun.kafka.regionId'='<aliyunKafkaRegionId>' );
パラメータ
データ型
説明
必須
備考
YourCatalogName
STRING
Kafka JSON カタログの名前。
はい
カスタム名を入力します。
重要パラメータの値をカタログの名前に置き換えるときは、山かっこ(<>)を削除する必要があります。そうしないと、構文チェック中にエラーが返されます。
type
STRING
カタログのタイプ。
はい
値を kafka に設定します。
properties.bootstrap.servers
STRING
Kafka ブローカーの IP アドレスまたはエンドポイント。
はい
形式:
host1:port1,host2:port2,host3:port3。複数の host:port ペアをカンマ(,)で区切ります。
format
STRING
Kafka メッセージの形式。
はい
JSON 形式のみがサポートされています。Realtime Compute for Apache Flink は、JSON 形式の Kafka メッセージを解析してスキーマを取得します。
default-database
STRING
Kafka クラスタの名前。
いいえ
デフォルト値: kafka。カタログは catalog_name.db_name.table_name に基づいてテーブルを定義します。db_name のデフォルト値は catalog_name.db_name.table_name で使用されます。Kafka はデータベースを提供していません。文字列を使用して、Kafka クラスタの db_name の値を変更できます。
key.fields-prefix
STRING
Kafka メッセージのキーフィールドから解析されたフィールドに追加されるプレフィックス。このパラメータを構成すると、Kafka メッセージのキーフィールドが解析された後に名前の競合を防ぐことができます。
いいえ
デフォルト値: key_。キーフィールドの名前が a の場合、Kafka メッセージのキーフィールドが解析された後に取得されるキーの名前は key_a です。
説明key.fields-prefix パラメータの値は、value.fields-prefix パラメータの値のプレフィックスと同じにすることはできません。たとえば、value.fields-prefix パラメータが test1_value_ に設定されている場合、key.fields-prefix パラメータを test1_ に設定することはできません。
value.fields-prefix
STRING
Kafka メッセージの値フィールドから解析されたフィールドに追加されるプレフィックス。このパラメータを構成すると、Kafka メッセージの値フィールドが解析された後に名前の競合を防ぐことができます。
いいえ
デフォルト値: value_。値フィールドの名前が b の場合、Kafka メッセージの値フィールドが解析された後に取得される値の名前は value_b です。
説明value.fields-prefix パラメータの値は、key.fields-prefix パラメータの値のプレフィックスと同じにすることはできません。たとえば、key.fields-prefix パラメータが test2_value_ に設定されている場合、value.fields-prefix パラメータを test2_ に設定することはできません。
timestamp-format.standard
STRING
JSON 形式の Kafka メッセージの TIMESTAMP 型のフィールドの形式。Realtime Compute for Apache Flink は、構成した形式でフィールドを解析します。Realtime Compute for Apache Flink が構成した形式でフィールドを解析できない場合、Realtime Compute for Apache Flink は別の形式でフィールドを解析しようとします。
いいえ
有効な値:
SQL(デフォルト値)
ISO-8601
infer-schema.flatten-nested-columns.enable
BOOLEAN
JSON 形式の Kafka メッセージの値フィールドが解析されるときに、JSON テキストのネストされた列を再帰的に展開するかどうかを指定します。
いいえ
有効な値:
true: ネストされた列が再帰的に展開されます。
Realtime Compute for Apache Flink は、展開された列の値をインデックス付けするパスを列の名前として使用します。たとえば、
{"nested": {"col": true}}の列 col は、展開された後に nested.col という名前になります。説明このパラメータを true に設定する場合は、このパラメータを CREATE TABLE AS 文と一緒に使用することをお勧めします。他の DML 文は、ネストされた列を自動的に展開するために使用できません。
false: ネストされた型は STRING 型として解析されます。これはデフォルト値です。
infer-schema.primitive-as-string
BOOLEAN
JSON 形式の Kafka メッセージの値フィールドが解析されるときに、すべての基本型を STRING 型として推測するかどうかを指定します。
いいえ
有効な値:
true: すべての基本型は STRING 型として推測されます。
false: データ型はデータ型マッピングに基づいて推測されます。これはデフォルト値です。
infer-schema.parse-key-error.field-name
STRING
キーフィールドのデータ。JSON 形式の Kafka メッセージのキーフィールドが解析されるときに、キーフィールドが指定されているが解析に失敗した場合、名前が key.fields-prefix プレフィックスとこのパラメータの値である列が、トピックに一致するテーブルのスキーマに追加されます。この列は VARBINARY 型であり、キーフィールドのデータを示します。
いいえ
デフォルト値: col。たとえば、JSON 形式の Kafka メッセージの値フィールドが value_name として解析され、キーフィールドが指定されているが解析に失敗した場合、返されるトピックに一致するテーブルのスキーマには、key_col と value_name の 2 つのフィールドが含まれます。
infer-schema.compacted-topic-as-upsert-table
BOOLEAN
Kafka トピックのログクリーンアップポリシーが compact で、キーフィールドが指定されているときに、テーブルを Upsert Kafka テーブルとして使用するかどうかを指定します。
いいえ
デフォルト値: true。CREATE TABLE AS 文または CREATE DATABASE AS 文を実行して ApsaraMQ for Kafka にデータを同期するときは、このパラメータを true に設定する必要があります。
説明VVR 6.0.2 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートします。
max.fetch.records
INT
メッセージが解析されるときにシステムが消費を試みる JSON 形式のメッセージの最大数。
いいえ
デフォルト値: 100。
aliyun.kafka.accessKeyId
STRING
Alibaba Cloud アカウントの AccessKey ID。AccessKey ID の取得方法については、「AccessKey ペアを作成する」をご参照ください。
いいえ
CREATE TABLE AS 文または CREATE DATABASE AS 文を実行して ApsaraMQ for Kafka にデータを同期するときは、このパラメータを構成する必要があります。
説明VVR 6.0.2 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートします。
aliyun.kafka.accessKeySecret
STRING
Alibaba Cloud アカウントの AccessKey シークレット。AccessKey シークレットの取得方法については、「AccessKey ペアを作成する」をご参照ください。
いいえ
CREATE TABLE AS 文または CREATE DATABASE AS 文を実行して ApsaraMQ for Kafka にデータを同期するときは、このパラメータを構成する必要があります。
説明VVR 6.0.2 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートします。
aliyun.kafka.instanceId
STRING
ApsaraMQ for Kafka インスタンスの ID。ApsaraMQ for Kafka コンソールの [インスタンスの詳細] ページで ID を表示できます。
いいえ
CREATE TABLE AS 文または CREATE DATABASE AS 文を実行して ApsaraMQ for Kafka にデータを同期するときは、このパラメータを構成する必要があります。
説明VVR 6.0.2 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートします。
aliyun.kafka.endpoint
STRING
ApsaraMQ for Kafka のエンドポイント。詳細については、「エンドポイント」をご参照ください。
いいえ
CREATE TABLE AS 文または CREATE DATABASE AS 文を実行して ApsaraMQ for Kafka にデータを同期するときは、このパラメータを構成する必要があります。
説明VVR 6.0.2 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートします。
aliyun.kafka.regionId
STRING
トピックが属する ApsaraMQ for Kafka インスタンスのリージョン ID。詳細については、「エンドポイント」をご参照ください。
いいえ
CREATE TABLE AS 文または CREATE DATABASE AS 文を実行して ApsaraMQ for Kafka にデータを同期するときは、このパラメータを構成する必要があります。
説明VVR 6.0.2 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートします。
カタログを作成するために使用するコードを選択し、コードの左側に表示される [実行] をクリックします。

カタログタブの左側の [カタログ] ペインで、作成したカタログを表示します。
Kafka JSON カタログを表示する
スクリプトページの スクリプト タブのコードエディタで、次の文を入力します。
DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;パラメータ
説明
${catalog_name}
Kafka JSON カタログの名前。
${db_name}
Kafka クラスタの名前。
${topic_name}
Kafka トピックの名前。
カタログを表示するために使用するコードを選択し、コードの左側に表示される [実行] をクリックします。
文が実行されると、結果にトピックに一致するテーブルの情報が表示されます。

Kafka JSON カタログを使用する
Kafka JSON カタログのテーブルがソーステーブルとして使用されている場合、テーブルに一致する Kafka トピックからデータを読み取ることができます。
INSERT INTO ${other_sink_table} SELECT... FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;説明Kafka JSON カタログを使用するときに WITH 句で他のパラメータを指定する必要がある場合は、SQL ヒント を使用して他のパラメータを追加することをお勧めします。前の SQL 文では、SQL ヒントを使用して、消費が最も古いデータから開始されることを指定しています。他のパラメータの詳細については、「ApsaraMQ for Kafka ソーステーブルを作成する」および「ApsaraMQ for Kafka 結果テーブルを作成する」をご参照ください。
Kafka JSON カタログのテーブルが Apache Kafka ソーステーブルのメッセージキューとして使用されている場合、CREATE TABLE AS 文を使用して、テーブルに一致する Kafka トピックからデスティネーションテーブルにデータを同期できます。
単一のトピックからリアルタイムでデータを同期します。
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH(...) AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;デプロイメントの複数のトピックからデータを同期します。
BEGIN STATEMENT SET; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0` AS TABLE `kafka-catalog`.`kafka`.`topic0` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1` AS TABLE `kafka-catalog`.`kafka`.`topic1` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2` AS TABLE `kafka-catalog`.`kafka`.`topic2` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; END;CREATE TABLE AS 文を Kafka JSON カタログと一緒に使用して、デプロイメントの複数の Kafka トピックからデータを同期できます。デプロイメントの複数の Kafka トピックからデータを同期するには、次の条件が満たされていることを確認してください。
topic-pattern は、トピックに一致するすべてのテーブルで構成されていません。
各テーブルの Kafka パラメータの値は同じです。プレフィックスが properties. であるパラメータの値は同じです。パラメータには、properties.bootstrap.servers と properties.group.id が含まれます。
scan.startup.mode パラメータの値は、すべてのテーブルで同じです。scan.startup.mode パラメータは、group-offsets、latest-offset、または earliest-offset のみに設定できます。
次の図は例を示しています。次の図では、上の 2 つのテーブルは上記の条件を満たしており、下の 2 つのテーブルは条件を満たしていません。

Kafka JSON カタログのソーステーブルからデスティネーションカタログのデスティネーションテーブルにデータを同期する方法の詳細については、「ログデータをデータウェアハウスにリアルタイムで取り込む」をご参照ください。
Kafka JSON カタログを削除する
Kafka JSON カタログを削除した後も、実行中のデプロイメントは影響を受けません。ただし、カタログのテーブルを使用するデプロイメントは、デプロイメントが公開または再起動されるときにテーブルを見つけることができなくなります。Kafka JSON カタログを削除するときは注意してください。
スクリプトページの スクリプト タブのコードエディタで、次の文を入力します。
DROP CATALOG ${catalog_name};catalog_name は、削除する Kafka JSON カタログの名前を指定します。
カタログを削除するために使用する文を右クリックし、ショートカットメニューから [実行] を選択します。
カタログリストページの左側の [カタログ] ペインで、カタログが削除されているかどうかを確認します。
スキーマ推論の説明
Kafka JSON カタログを構成した後に取得したテーブルを簡単に使用できるように、システムはテーブルにデフォルトの構成パラメータ、メタデータ列、およびプライマリキーを自動的に追加します。このセクションでは、Kafka JSON カタログを構成した後に取得したテーブルの情報について説明します。
テーブルスキーマ
JSON 形式の Kafka メッセージが解析されてトピックスキーマが取得されると、システムは max.fetch.records パラメータの値以下のメッセージを消費しようとします。システムは各データレコードのスキーマを解析し、スキーマをトピックスキーマとしてマージします。システムは、CREATE TABLE AS 文を使用して Kafka テーブルのデータを同期するときに使用される データ型マッピング に基づいてメッセージを解析します。
重要Kafka JSON カタログを使用してトピックスキーマを推測すると、コンシューマーグループが作成されてトピックのデータが消費されます。コンシューマーグループの名前にプレフィックスが含まれている場合、コンシューマーグループはカタログを使用して作成されます。
ApsaraMQ for Kafka テーブルからデータを取得する場合は、VVR 6.0.7 以降を使用する Realtime Compute for Apache Flink の Kafka JSON カタログを使用することをお勧めします。6.0.7 より前のバージョンの VVR を使用する Realtime Compute for Apache Flink の場合、コンシューマーグループは自動的に削除されません。その結果、コンシューマーグループのメッセージ蓄積に関するアラート通知を受け取る場合があります。
トピックスキーマは次の部分で構成されます。
物理列
デフォルトでは、物理列は Kafka メッセージのキーフィールドと値フィールドに基づいて解析されます。プレフィックスは、取得した列名に追加されます。
キーフィールドが指定されているが解析に失敗した場合、名前が key.fields-prefix プレフィックスと infer-schema.parse-key-error.field-name パラメータの値である列が返されます。列の型は VARBINARY です。
Kafka JSON カタログは Kafka メッセージのグループを取得した後、Kafka メッセージを順番に解析し、解析後に取得された物理列を次のルールに基づいてトピックのスキーマとしてマージします。この関数は次のルールに基づいて JSON ドキュメントをマージします。
解析後に取得された特定の物理列がトピックスキーマにない場合、Kafka JSON カタログは自動的に列をトピックスキーマに追加します。
解析後に取得された特定の物理列の名前がトピックスキーマの特定の列と同じ名前の場合、ビジネスシナリオに基づいて操作を実行します。
列のデータ型は同じだが精度が異なる場合、Kafka JSON カタログは精度の高い列をマージします。
列のデータ型が異なる場合、Kafka JSON カタログは、次の図に示すツリー構造の最小の親ノードを、同じ名前の列の型として使用します。DECIMAL 型と FLOAT 型の列がマージされる場合、列は DOUBLE 型にマージされて精度が保持されます。

たとえば、Kafka トピックに 3 つのデータレコードが含まれている場合、次の図に示すスキーマが返されます。

メタデータ列
デフォルトでは、partition、offset、および timestamp という名前のメタデータ列が追加されます。次の表に、メタデータ列を示します。
メタデータ名
列名
型
説明
partition
partition
INT NOT NULL
パーティションキー列の値。
offset
offset
BIGINT NOT NULL
オフセット。
timestamp
timestamp
TIMESTAMP_LTZ(3) NOT NULL
メッセージのタイムスタンプ。
追加されるデフォルトのプライマリキーのルール
Kafka JSON カタログを構成した後に取得されたテーブルがソーステーブルとして消費される場合、メタデータ列 partition と offset がプライマリキーとして使用されます。これにより、データが重複しないことが保証されます。
説明Kafka JSON カタログから推測されたテーブルスキーマが予期しないものである場合は、CREATE TEMPORARY TABLE ... LIKE 構文を使用して一時テーブルを宣言し、目的のテーブルスキーマを指定できます。たとえば、JSON データには '2023-01-01 12:00:01' 形式の ts フィールドが含まれています。Kafka JSON カタログは、ts フィールドを TIMESTAMP データ型として自動的に推測します。ts フィールドを STRING データ型として使用する場合、CREATE TEMPORARY TABLE... LIKE 構文を使用してテーブルを宣言できます。次のサンプルコードでは、デフォルトの構成で value_ プレフィックスが値フィールドに追加されるため、value_ts フィールドが使用されます。
CREATE TEMPORARY TABLE tempTable ( value_name STRING, value_ts STRING ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;デフォルトで追加されるテーブルパラメータ
パラメータ
説明
備考
connector
コネクタのタイプ。
値を kafka または upsert-kafka に設定します。
topic
トピックの名前。
Kafka JSON カタログで宣言したテーブルの名前に値を設定します。
properties.bootstrap.servers
Kafka ブローカーの IP アドレスまたはエンドポイント。
properties.bootstrap.servers パラメーターの値と同じ値を Kafka JSON カタログに設定します。
value.format
Flink Kafka コネクタが Kafka メッセージの値フィールドをシリアル化または逆シリアル化するために使用する形式。
値を json に設定します。
value.fields-prefix
Kafka メッセージのすべての値フィールドのカスタムプレフィックス。このパラメータを構成すると、キーフィールドまたはメタデータフィールドとの名前の競合を防ぐことができます。
Kafka JSON カタログの value.fields-prefix パラメータの値と同じ値に設定します。
value.json.infer-schema.flatten-nested-columns.enable
JSON 形式の Kafka メッセージの値フィールドが解析されるときに、JSON テキストのネストされた列を再帰的に展開するかどうかを指定します。
Kafka JSON カタログの infer-schema.flatten-nested-columns.enable パラメータの値と同じ値に設定します。
value.json.infer-schema.primitive-as-string
JSON 形式の Kafka メッセージの値フィールドが解析されるときに、すべての基本型を STRING 型として推測するかどうかを指定します。
Kafka JSON カタログの infer-schema.primitive-as-string パラメータの値と同じ値に設定します。
value.fields-include
値フィールドが解析されるときにキーフィールドを処理するために使用されるポリシー。
値を EXCEPT_KEY に設定します。このパラメータが EXCEPT_KEY に設定されている場合、値フィールドが解析されるときにキーフィールドは除外されます。
キーフィールドが指定されている場合は、このパラメータを構成する必要があります。
key.format
Flink Kafka コネクタが Kafka メッセージのキーフィールドをシリアル化または逆シリアル化するために使用する形式。
値を json または raw に設定します。
キーフィールドが指定されている場合は、このパラメータを構成する必要があります。
キーフィールドが指定されているが解析に失敗した場合は、このパラメータの値を raw に設定します。キーフィールドが指定されていて解析される場合は、このパラメータの値を json に設定します。
key.fields-prefix
Kafka メッセージのすべてのキーフィールドのカスタムプレフィックス。このパラメータを構成すると、値フィールドとの名前の競合を防ぐことができます。
Kafka JSON カタログの key.fields-prefix パラメータの値と同じ値に設定します。
キーフィールドが指定されている場合は、このパラメータを構成する必要があります。
key.fields
Kafka メッセージのキーフィールドから解析されたフィールド。
システムはテーブルにキーフィールドを自動的に入力します。
キーフィールドが指定されていて、テーブルが Upsert Kafka テーブルでない場合は、このパラメータを構成する必要があります。