すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:JSON

最終更新日:Mar 25, 2025

このトピックでは、JSON 形式とデータ型マッピングの使用方法について説明します。

背景情報

JSON 形式を使用して、JSON 構造に基づいて JSON データを読み書きできます。JSON 構造は、テーブルスキーマから自動的に推測されます。JSON 形式をサポートするコネクタには、Kafka コネクタUpsert Kafka コネクタElasticsearch コネクタオブジェクトストレージサービス (OSS) コネクタMongoDBコネクタStarRocks コネクタなどがあります。

サンプルコード

次のサンプルコードは、Apache Kafka コネクタを使用して JSON 形式でテーブルを作成する方法の例を示しています。

CREATE TABLE Orders (
orderId INT,
product STRING,
orderInfo MAP<STRING, STRING>,
orderTime TIMESTAMP(3),
WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)

パラメーター

パラメーター

必須

デフォルト値

データ型

説明

format

はい

(なし)

STRING

使用する形式を宣言します。JSON 形式を使用する場合は、このパラメーターを json に設定します。

json.fail-on-missing-field

いいえ

false

BOOLEAN

有効な値:

  • true: 解析する必要があるフィールドが存在しない場合、現在のフィールドまたは行はスキップされます。

  • false: 解析する必要があるフィールドが存在しない場合、エラーが返され、デプロイの開始に失敗します。これはデフォルト値です。

json.ignore-parse-errors

いいえ

false

BOOLEAN

有効な値:

  • true: 解析に失敗した場合、現在のフィールドまたは行はスキップされます。

  • false: 解析に失敗した場合、エラーが返され、デプロイの開始に失敗します。これはデフォルト値です。

json.timestamp-format.standard

いいえ

SQL

STRING

入力タイムスタンプと出力タイムスタンプの形式。有効な値:

  • SQL: yyyy-MM-dd HH:mm:ss.s{precision} 形式の入力タイムスタンプが解析されます。たとえば、入力タイムスタンプは 2020-12-30 12:13:14.123 です。出力タイムスタンプは入力タイムスタンプと同じ形式です。

  • ISO-8601: yyyy-MM-ddTHH:mm:ss.s{precision} 形式の入力タイムスタンプが解析されます。たとえば、入力タイムスタンプは 2020-12-30T12:13:14.123 です。出力タイムスタンプは入力タイムスタンプと同じ形式です。

json.map-null-key.mode

いいえ

FAIL

STRING

マップ内の null キー値を処理するために使用されるメソッド。有効な値:

  • FAIL: マップ内のキー値が null の場合、エラーが返されます。

  • DROP: マップ内のキー値が null のデータは破棄されます。

  • LITERAL: 文字列定数を使用して、マップ内の null キー値を置き換えます。文字列定数の値は、json.map-null-key.literal パラメーターで指定されます。

json.map-null-key.literal

いいえ

null

STRING

json.map-null-key.mode パラメーターが LITERAL に設定されている場合、指定された文字列定数を使用してマップ内の null キー値を置き換えます。

json.encode.decimal-as-plain-number

いいえ

false

BOOLEAN

有効な値:

  • true: DECIMAL 型のすべてのデータは変更されず、科学表記法形式では表されません。たとえば、0.000000027 は 0.000000027 として表されます。

  • false: DECIMAL 型のすべてのデータは、科学表記法形式で表されます。たとえば、0.000000027 は 2.7E-8 として表されます。

json.write-null-properties

いいえ

true

BOOLEAN

列の空の値を JSON 文字列に書き込むかどうかを指定します。有効な値:

  • true: 列の空の値は、null 値として JSON 文字列に書き込まれます。

  • false: 列の空の値は JSON 文字列に書き込まれません。

説明

このパラメーターは、Realtime Compute for Apache Flink が Ververica Runtime (VVR) 8.0.6 以降を使用している場合にのみ構成できます。

データ型マッピング

JSON 構造は、テーブルスキーマから自動的に推測されます。Realtime Compute for Apache Flink では、JSON 形式は jackson databind API を呼び出して、JSON データを解析および生成します。次の表に、Flink SQL データ型と JSON データ型の間のマッピングを示します。

Realtime Compute for Apache Flink SQL のデータ型

JSON データ型

CHAR / VARCHAR / STRING

STRING

BOOLEAN

BOOLEAN

BINARY / VARBINARY

エンコード STRING: base64

DECIMAL

NUMBER

TINYINT

NUMBER

SMALLINT

NUMBER

INT

NUMBER

BIGINT

NUMBER

FLOAT

NUMBER

DOUBLE

NUMBER

DATE

形式の STRING: date

TIME

形式の STRING: time

TIMESTAMP

形式の STRING: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

形式の STRING: date-time (UTC タイムゾーン)

INTERVAL

NUMBER

ARRAY

ARRAY

MAP / MULTISET

OBJECT

ROW

OBJECT

その他

OSS に書き込まれるデータは、JSON ファイルとして書き込むことができません。詳細については、「FLINK-30635」をご参照ください。