このトピックでは、Raw (未加工) フォーマットの使用方法の例を示し、Raw (未加工) のパラメーターとデータ型マッピングについて説明します。
背景情報
Raw (未加工) フォーマットは、バイトベースの未加工値を単一列として読み書きするために使用できます。 Raw (未加工) フォーマットは Flink に組み込まれています。 Raw (未加工) フォーマットをサポートするコネクタには、Kafka コネクタ、Upsert Kafka コネクタ、および Object Storage Service (OSS) コネクタ が含まれます。
サンプルコード
たとえば、Kafka データベースには、Raw (未加工) フォーマットのログデータが含まれています。 Flink SQL を使用して、ログデータを読み取り、分析できます。
47.xx.xx.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.xx.xx.119 Safari/537.36" "2.75"
次のサンプルコードは、Raw (未加工) フォーマットを使用して Kafka トピックからデータを読み取り、そのデータを文字列として使用する方法の例を示しています。 データは UTF-8 でエンコードされています。
CREATE TABLE nginx_log (
log STRING
) WITH (
'connector' = 'kafka',
'topic' = 'nginx_log',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'raw'
);
前のステートメントを実行して Raw (未加工) フォーマットのデータを読み取り、データを文字列として使用した後、ユーザー定義関数 (UDF) を使用して文字列を複数の文字列に分割し、さらに分析を実行できます。 たとえば、次の SQL ステートメントの my_split 関数を使用して、文字列を分割できます。
SELECT t.hostname, t.datetime, t.url, t.browser, ...
FROM(
SELECT my_split(log) as t FROM nginx_log
);
同様に、STRING データ型の列を、UTF-8 でエンコードされた文字列として Kafka トピックに書き込むことができます。
パラメーター
パラメーター | 必須 | デフォルト値 | データ型 | 説明 |
format | はい | デフォルト値なし | STRING | 使用するように宣言するフォーマット。 Raw (未加工) フォーマットを使用する場合は、このパラメーターを raw に設定します。 |
raw.charset | いいえ | UTF-8 | STRING | テキスト文字列のエンコードに使用する文字セット。 デフォルト値: UTF-8。 |
raw.endianness | いいえ | big-endian | STRING | バイト単位の値のエンディアン形式。 有効な値:
|
データ型マッピング
次の表に、Raw (未加工) フォーマットでサポートされている Flink SQL データ型を示します。
Flink SQL データ型 | 説明 |
CHAR、VARCHAR、および STRING | UTF-8 でエンコードされたテキスト文字列。 UTF-8 はデフォルトのエンコード形式です。 説明 テキスト文字列のエンコードに使用する文字セットは、raw.charset パラメーターで指定できます。 |
BINARY、VARBINARY、および BYTES | バイトのシーケンス。 |
BOOLEAN | BOOLEAN データ型の 1 バイト。 |
TINYINT | 符号付き数値の 1 バイト。 |
SMALLINT | big-endian エンコーディングの 2 バイト。 big-endian はデフォルトのエンディアン形式です。 説明 エンディアン形式は、raw.endianness パラメーターで指定できます。 |
INT | big-endian エンコーディングの 4 バイト。 big-endian はデフォルトのエンディアン形式です。 説明 エンディアン形式は、raw.endianness パラメーターで指定できます。 |
BIGINT | big-endian エンコーディングの 8 バイト。 big-endian はデフォルトのエンディアン形式です。 説明 エンディアン形式は、raw.endianness パラメーターで指定できます。 |
FLOAT | big-endian エンコーディングと IEEE 754 形式の 4 バイト。 big-endian はデフォルトのエンディアン形式です。 説明 エンディアン形式は、raw.endianness パラメーターで指定できます。 |
DOUBLE | big-endian エンコーディングと IEEE 754 形式の 8 バイト。 big-endian はデフォルトのエンディアン形式です。 説明 エンディアン形式は、raw.endianness パラメーターで指定できます。 |
RAW | RAW データ型の基になる TypeSerializer によってシリアル化されたバイトのシーケンス。 |
その他
Raw (未加工) フォーマットは、NULL 値を BYTE [] データ型の NULL 値としてエンコードするために使用されます。 Upsert Kafka コネクタは、NULL 値を tombstone メッセージとして識別し、Kafka キーから NULL 値を削除します。 したがって、フィールドに NULL 値があり、Upsert Kafka コネクタが使用されている場合は、Raw (未加工) フォーマットで value.format パラメーターを指定しないことをお勧めします。