すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Confluent Avro

最終更新日:Jan 08, 2025

このトピックでは、Confluent Avroフォーマットの使用方法の例を示し、Confluent Avroのパラメーターとデータ型マッピングについて説明します。

背景情報

Confluent Avroフォーマットを使用すると、io.confluent.kafka.serializers.KafkaAvroSerializerによってシリアル化されたデータレコードを読み取り、io.confluent.kafka.serializers.KafkaAvroDeserializerによってデシリアル化されたデータレコードを書き込むことができます。

  • Confluent Avroフォーマットでデータレコードを読み取る場合、Avro書き込みスキーマは、データレコードにエンコードされているスキーマバージョン ID に基づいて、構成済みのConfluent Schema Registryから取得され、読み取りスキーマはテーブルスキーマから推測されます。

  • Confluent Avroフォーマットでデータレコードを書き込む場合、Avro構造はテーブルスキーマから推測され、データレコードにエンコードされているスキーマ ID を取得するために使用されます。スキーマ ID は、Confluent Schema Registryで構成されているサブジェクトで取得されます。サブジェクトは、avro-confluent.subjectパラメーターで指定されます。

Confluent Avroフォーマットをサポートするコネクターには、KafkaコネクターUpsert Kafkaコネクターがあります。

サンプルコード

次のサンプルコードは、Apache KafkaコネクターとUpsert Kafkaコネクターを使用して、Confluent Avroフォーマットでテーブルを作成する方法の例を示しています。

  • 例 1:Apache Kafkaコネクターを使用して、Kafkaキーとして元の UTF-8 文字列を使用し、Kafka値としてConfluent Schema Registryに登録されているAvroレコードを使用するテーブルを作成します。

CREATE TABLE user_created (
  -- Kafkaキーとして使用される元のUTF-8文字列にカラムをマッピングします。
  the_kafka_key STRING,
  
  -- Kafka値として使用されるAvroフィールドにカラムをマッピングします。
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_example1',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- UTF-8文字列形式でKafkaキーを構成し、テーブル内のthe_kafka_keyという名前のカラムを使用します。
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
);

Kafkaテーブルにデータを書き込むには、次のDDLステートメントを使用します。

INSERT INTO user_created
SELECT
  -- ユーザーIDをKafkaキーにマッピングされているカラムにコピーします。
  id as the_kafka_key,

  -- すべてのKafka値を取得します。
  id, name, email
FROM some_table
  • 例 2:Apache Kafkaコネクターを使用して、KafkaキーとKafka値としてConfluent Schema Registryに登録されているAvroレコードを使用するテーブルを作成します。

CREATE TABLE user_created (  
  -- Kafkaキーとして使用されるAvroフィールドidにカラムをマッピングします。
  kafka_key_id STRING,
  
  -- Kafka値として使用されるAvroフィールドにカラムをマッピングします。
  id STRING,
  name STRING, 
  email STRING 
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_example2',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- 注:ほとんどの場合、ハッシュパーティション分割のため、Kafkaキーのスキーマアップグレードは後方互換性も前方互換性もありません。
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8082',
  'key.fields' = 'kafka_key_id',

  -- この例では、Avro形式のKafkaキーとKafka値の両方にidフィールドが含まれています。
  -- 競合を避けるため、テーブル内のKafkaキーフィールドに関連付けられているカラムにプレフィックスを追加します。
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY',
   
  -- サブジェクトには、Flink 1.13以降ではデフォルト値があります。デフォルト値は上書きできます。
  'key.avro-confluent.subject' = 'user_events_example2-key2',
  'value.avro-confluent.subject' = 'user_events_example2-value2'
);
  • 例 3:Upsert Kafkaコネクターを使用して、Kafka値としてConfluent Schema Registryに登録されているAvroレコードを使用するテーブルを作成します。

CREATE TABLE user_created (  
  -- Kafkaキーとして使用される元のUTF-8文字列にカラムをマッピングします。
  kafka_key_id STRING,
  
  -- Kafka値として使用されるAvroフィールドにカラムをマッピングします。
  id STRING, 
  name STRING, 
  email STRING, 
  
  -- Upsert Kafkaコネクターを使用する場合、UPSERT動作を定義するためにプライマリキーを指定します。
  PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_events_example3',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- KafkaキーとしてUTF-8文字列を使用します。
  -- この例では、key.fieldsパラメーターの値はテーブルのプライマリキーによって決定されるため、このパラメーターは指定されていません。
  'key.format' = 'raw',
  
  -- この例では、Avro形式のKafkaキーとKafka値の両方にidフィールドが含まれています。
  -- 競合を避けるため、テーブル内のKafkaキーフィールドに関連付けられているカラムにプレフィックスを追加します。
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
);

パラメーター

パラメーター

必須

デフォルト値

データ型

説明

format

はい

デフォルト値なし

STRING

使用するフォーマットを宣言します。 Confluent Avroフォーマットを使用する場合は、このパラメーターをConfluent Avroに設定します。

avro-confluent.basic-auth.credentials-source

いいえ

デフォルト値なし

STRING

Confluent Schema Registryの基本認証資格情報のソース。

avro-confluent.basic-auth.user-info

いいえ

デフォルト値なし

STRING

Confluent Schema Registryの基本認証ユーザー情報。

avro-confluent.bearer-auth.credentials-source

いいえ

デフォルト値なし

STRING

Confluent Schema Registryのベアラー認証資格情報のソース。

avro-confluent.bearer-auth.token

いいえ

デフォルト値なし

STRING

Confluent Schema Registryのベアラー認証トークン。

avro-confluent.properties

いいえ

デフォルト値なし

MAP

Confluent Schema Registryに転送されるプロパティマッピング。 このパラメーターは、Flink構成オプションを使用して公式に公開されていないオプションに適しています。

重要

Flink構成オプションは、このパラメーターよりも優先順位が高くなります。

avro-confluent.ssl.keystore.location

いいえ

デフォルト値なし

STRING

SSLキーストアの場所。

avro-confluent.ssl.keystore.password

いいえ

デフォルト値なし

STRING

SSLキーストアのパスワード。

avro-confluent.ssl.truststore.location

いいえ

デフォルト値なし

STRING

SSLトラストストアの場所。

avro-confluent.ssl.truststore.password

いいえ

デフォルト値なし

STRING

SSLトラストストアのパスワード。

avro-confluent.subject

いいえ

デフォルト値なし

STRING

Confluent Schema Registryのサブジェクト。 シリアル化中にConfluent Avroフォーマットで使用されるスキーマは、Confluent Schema Registryのサブジェクトに登録されます。 デフォルトでは、Apache KafkaコネクターとUpsert Kafkaコネクターは、値またはキーのフォーマットとしてConfluent Avroフォーマットが使用されている場合、デフォルトのサブジェクト名として<Topname>-valueまたは<topname>-keyを使用します。 結果テーブルにファイルシステムコネクターを使用する場合は、avro-confluent.subjectパラメーターが必要です。

avro-confluent.url

はい

デフォルト値なし

STRING

スキーマを取得または登録するためのConfluent Schema Registryの URL です。

データ型マッピング

Flink SQLデータ型とConfluent Avroデータ型の間のデータ型マッピングは、Flink SQLデータ型とAvroデータ型の間のデータ型マッピングと似ています。 次の表に、Flink SQLデータ型とAvroデータ型のマッピングを示します。

Flink SQLデータ型

Avroデータ型

CHAR、VARCHAR、および STRING

STRING

BOOLEAN

BOOLEAN

BINARYおよび VARBINARY

BYTES

DECIMAL

FIXED

説明

このデータ型の値は、精度を持つ10進数です。

TINYINT

INT

SMALLINT

INT

INT

INT

BIGINT

LONG

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

INT

説明

このデータ型の値は日付です。

TIME

INT

説明

このデータ型の値は、ミリ秒単位の時間です。

TIMESTAMP

LONG

説明

このデータ型の値は、ミリ秒単位のタイムスタンプです。

ARRAY

ARRAY

MAP

説明

要素は、STRING、CHAR、または VARCHARデータ型である必要があります。

MAP

MULTISET

説明

要素は、STRING、CHAR、または VARCHARデータ型である必要があります。

MAP

ROW

RECORD

Flinkは、Nullableデータ型のデータを読み書きできます。 Nullableデータ型のデータは、Avro union(something, null)にマッピングされます。 somethingは、Flinkデータ型から変換されたAvroデータ型です。

説明

Avroデータ型の詳細については、「仕様」をご参照ください。