このトピックでは、Confluent Avroフォーマットの使用方法の例を示し、Confluent Avroのパラメーターとデータ型マッピングについて説明します。
背景情報
Confluent Avroフォーマットを使用すると、io.confluent.kafka.serializers.KafkaAvroSerializerによってシリアル化されたデータレコードを読み取り、io.confluent.kafka.serializers.KafkaAvroDeserializerによってデシリアル化されたデータレコードを書き込むことができます。
Confluent Avroフォーマットでデータレコードを読み取る場合、Avro書き込みスキーマは、データレコードにエンコードされているスキーマバージョン ID に基づいて、構成済みのConfluent Schema Registryから取得され、読み取りスキーマはテーブルスキーマから推測されます。
Confluent Avroフォーマットでデータレコードを書き込む場合、Avro構造はテーブルスキーマから推測され、データレコードにエンコードされているスキーマ ID を取得するために使用されます。スキーマ ID は、Confluent Schema Registryで構成されているサブジェクトで取得されます。サブジェクトは、avro-confluent.subjectパラメーターで指定されます。
Confluent Avroフォーマットをサポートするコネクターには、KafkaコネクターとUpsert Kafkaコネクターがあります。
サンプルコード
次のサンプルコードは、Apache KafkaコネクターとUpsert Kafkaコネクターを使用して、Confluent Avroフォーマットでテーブルを作成する方法の例を示しています。
例 1:Apache Kafkaコネクターを使用して、Kafkaキーとして元の UTF-8 文字列を使用し、Kafka値としてConfluent Schema Registryに登録されているAvroレコードを使用するテーブルを作成します。
CREATE TABLE user_created (
-- Kafkaキーとして使用される元のUTF-8文字列にカラムをマッピングします。
the_kafka_key STRING,
-- Kafka値として使用されるAvroフィールドにカラムをマッピングします。
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8文字列形式でKafkaキーを構成し、テーブル内のthe_kafka_keyという名前のカラムを使用します。
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
);Kafkaテーブルにデータを書き込むには、次のDDLステートメントを使用します。
INSERT INTO user_created
SELECT
-- ユーザーIDをKafkaキーにマッピングされているカラムにコピーします。
id as the_kafka_key,
-- すべてのKafka値を取得します。
id, name, email
FROM some_table例 2:Apache Kafkaコネクターを使用して、KafkaキーとKafka値としてConfluent Schema Registryに登録されているAvroレコードを使用するテーブルを作成します。
CREATE TABLE user_created (
-- Kafkaキーとして使用されるAvroフィールドidにカラムをマッピングします。
kafka_key_id STRING,
-- Kafka値として使用されるAvroフィールドにカラムをマッピングします。
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
-- 注:ほとんどの場合、ハッシュパーティション分割のため、Kafkaキーのスキーマアップグレードは後方互換性も前方互換性もありません。
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://localhost:8082',
'key.fields' = 'kafka_key_id',
-- この例では、Avro形式のKafkaキーとKafka値の両方にidフィールドが含まれています。
-- 競合を避けるため、テーブル内のKafkaキーフィールドに関連付けられているカラムにプレフィックスを追加します。
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',
-- サブジェクトには、Flink 1.13以降ではデフォルト値があります。デフォルト値は上書きできます。
'key.avro-confluent.subject' = 'user_events_example2-key2',
'value.avro-confluent.subject' = 'user_events_example2-value2'
);例 3:Upsert Kafkaコネクターを使用して、Kafka値としてConfluent Schema Registryに登録されているAvroレコードを使用するテーブルを作成します。
CREATE TABLE user_created (
-- Kafkaキーとして使用される元のUTF-8文字列にカラムをマッピングします。
kafka_key_id STRING,
-- Kafka値として使用されるAvroフィールドにカラムをマッピングします。
id STRING,
name STRING,
email STRING,
-- Upsert Kafkaコネクターを使用する場合、UPSERT動作を定義するためにプライマリキーを指定します。
PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',
-- KafkaキーとしてUTF-8文字列を使用します。
-- この例では、key.fieldsパラメーターの値はテーブルのプライマリキーによって決定されるため、このパラメーターは指定されていません。
'key.format' = 'raw',
-- この例では、Avro形式のKafkaキーとKafka値の両方にidフィールドが含まれています。
-- 競合を避けるため、テーブル内のKafkaキーフィールドに関連付けられているカラムにプレフィックスを追加します。
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
);パラメーター
パラメーター | 必須 | デフォルト値 | データ型 | 説明 |
format | はい | デフォルト値なし | STRING | 使用するフォーマットを宣言します。 Confluent Avroフォーマットを使用する場合は、このパラメーターをConfluent Avroに設定します。 |
avro-confluent.basic-auth.credentials-source | いいえ | デフォルト値なし | STRING | Confluent Schema Registryの基本認証資格情報のソース。 |
avro-confluent.basic-auth.user-info | いいえ | デフォルト値なし | STRING | Confluent Schema Registryの基本認証ユーザー情報。 |
avro-confluent.bearer-auth.credentials-source | いいえ | デフォルト値なし | STRING | Confluent Schema Registryのベアラー認証資格情報のソース。 |
avro-confluent.bearer-auth.token | いいえ | デフォルト値なし | STRING | Confluent Schema Registryのベアラー認証トークン。 |
avro-confluent.properties | いいえ | デフォルト値なし | MAP | Confluent Schema Registryに転送されるプロパティマッピング。 このパラメーターは、Flink構成オプションを使用して公式に公開されていないオプションに適しています。 重要 Flink構成オプションは、このパラメーターよりも優先順位が高くなります。 |
avro-confluent.ssl.keystore.location | いいえ | デフォルト値なし | STRING | SSLキーストアの場所。 |
avro-confluent.ssl.keystore.password | いいえ | デフォルト値なし | STRING | SSLキーストアのパスワード。 |
avro-confluent.ssl.truststore.location | いいえ | デフォルト値なし | STRING | SSLトラストストアの場所。 |
avro-confluent.ssl.truststore.password | いいえ | デフォルト値なし | STRING | SSLトラストストアのパスワード。 |
avro-confluent.subject | いいえ | デフォルト値なし | STRING | Confluent Schema Registryのサブジェクト。 シリアル化中にConfluent Avroフォーマットで使用されるスキーマは、Confluent Schema Registryのサブジェクトに登録されます。 デフォルトでは、Apache KafkaコネクターとUpsert Kafkaコネクターは、値またはキーのフォーマットとしてConfluent Avroフォーマットが使用されている場合、デフォルトのサブジェクト名として<Topname>-valueまたは<topname>-keyを使用します。 結果テーブルにファイルシステムコネクターを使用する場合は、avro-confluent.subjectパラメーターが必要です。 |
avro-confluent.url | はい | デフォルト値なし | STRING | スキーマを取得または登録するためのConfluent Schema Registryの URL です。 |
データ型マッピング
Flink SQLデータ型とConfluent Avroデータ型の間のデータ型マッピングは、Flink SQLデータ型とAvroデータ型の間のデータ型マッピングと似ています。 次の表に、Flink SQLデータ型とAvroデータ型のマッピングを示します。
Flink SQLデータ型 | Avroデータ型 |
CHAR、VARCHAR、および STRING | STRING |
BOOLEAN | BOOLEAN |
BINARYおよび VARBINARY | BYTES |
DECIMAL | FIXED 説明 このデータ型の値は、精度を持つ10進数です。 |
TINYINT | INT |
SMALLINT | INT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | INT 説明 このデータ型の値は日付です。 |
TIME | INT 説明 このデータ型の値は、ミリ秒単位の時間です。 |
TIMESTAMP | LONG 説明 このデータ型の値は、ミリ秒単位のタイムスタンプです。 |
ARRAY | ARRAY |
MAP 説明 要素は、STRING、CHAR、または VARCHARデータ型である必要があります。 | MAP |
MULTISET 説明 要素は、STRING、CHAR、または VARCHARデータ型である必要があります。 | MAP |
ROW | RECORD |
Flinkは、Nullableデータ型のデータを読み書きできます。 Nullableデータ型のデータは、Avro union(something, null)にマッピングされます。 somethingは、Flinkデータ型から変換されたAvroデータ型です。
Avroデータ型の詳細については、「仕様」をご参照ください。