このトピックでは、アップサート Kafka コネクタの使用方法について説明します。
背景情報
アップサート Kafka コネクタは、アップサート操作を使用して Kafka トピックとの間でデータを読み書きします。
-
ソーステーブルとして、コネクタは Kafka に保存されているデータをチェンジログストリームに変換できます。ストリーム内の各レコードは、更新または削除イベントを表します。データレコードの値は、同じキーの最後の値に対する UPDATE として解釈されます (そのキーが存在する場合)。キーが存在しない場合、更新は INSERT として扱われます。テーブル用語では、チェンジログストリーム内のレコードは UPSERT (INSERT または UPDATE とも呼ばれます) であり、同じキーを持つ既存の行は上書きされます。空の値を持つメッセージは DELETE メッセージとして扱われます。
-
シンクテーブルまたはデータインジェストシンクとして、コネクタはアップストリームジョブによって生成されたチェンジログストリームを消費できます。INSERT または UPDATE_AFTER データは通常の Kafka メッセージとして書き込まれます。DELETE データは空の値を持つ Kafka メッセージとして書き込まれ、これは対応するキーのメッセージが削除されたことを示します。Flink は、プライマリキー列の値に基づいてデータをパーティション分割します。これにより、同じプライマリキーを持つメッセージが順序付けされることが保証されます。結果として、同じプライマリキーの更新または削除メッセージは同じパーティションに書き込まれます。
|
カテゴリ |
説明 |
|
サポートされているタイプ |
ソーステーブル、シンクテーブル、およびデータインジェストシンク |
|
実行モード |
ストリーミングモード |
|
データフォーマット |
avro、avro-confluent、csv、json、および raw |
|
特定の監視メトリクス |
|
|
API タイプ |
SQL およびデータインジェスト YAML ジョブ |
|
シンクテーブルでのデータの更新または削除 |
はい |
前提条件
-
Kafka クラスターを作成します。詳細については、「DataFlow Kafka クラスターの作成」または「Kafka でのリソースの作成」をご参照ください。
-
ご利用の Flink クラスターと Kafka クラスター間のネットワーク接続を確立します。EMR 上の Kafka の詳細については、「VPC の作成と管理」および「セキュリティグループの概要」をご参照ください。ApsaraMQ for Kafka の場合は、ホワイトリストを構成する必要があります。
制限事項
-
Kafka コネクタは、Ververica Runtime (VVR) 2.0.0 以降を使用する Flink のみでサポートされています。
-
コネクタは、Apache Kafka 0.10 以降からの読み取りと書き込みのみをサポートしています。
-
コネクタは、Apache Kafka 2.8 のクライアントパラメーターのみをサポートしています。詳細については、Apache Kafka のコンシューマーおよびプロデューサー構成ドキュメントをご参照ください。
-
アップサート Kafka シンクテーブルが 1 回限りのセマンティクスを使用する場合、送信先 Kafka クラスターでトランザクション機能が有効になっている必要があります。クラスターは Apache Kafka 0.11 以降である必要があります。
-
アップサート Kafka ソーステーブルは、`earliest-offset` 起動モードのみをサポートしています。このモードは構成できません。コネクタは、すべての履歴変更データを読み取り、完全なチェンジログを取得します。これにより、SQL で完全なチェンジログを処理し、パイプライン全体で 1 回限りのセマンティクスを提供できます。タイムスタンプまたは `latest-offset` などの別の起動モードを指定すると、コネクタは不完全なチェンジログを読み取ります。これにより、ダウンストリーム計算でデータ整合性の問題が発生する可能性があります。
SQL
アップサート Kafka コネクタは、アップサート操作を使用して Kafka トピックとの間でデータを読み書きします。
構文
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH パラメーター
-
一般
パラメーター
説明
データ型
必須
デフォルト
備考
connector
テーブルのタイプ。
String
はい
なし
値を `upsert-kafka` に設定します。
properties.bootstrap.servers
Kafka ブローカーのアドレス。
String
はい
なし
フォーマットは
host:port,host:port,host:portです。複数のアドレスをカンマ (,) で区切ります。properties.*
Kafka クライアントのパラメーター。
String
いいえ
なし
サフィックスは、公式 Kafka ドキュメントに記載の、プロデューサーおよびコンシューマー用の構成でなければなりません。
Flink は `properties.` プレフィックスを削除し、残りの構成を Kafka クライアントに渡します。たとえば、
'properties.allow.auto.create.topics' = 'false'を使用して、自動トピック作成を無効にできます。Kafka コネクタがそれらを上書きするため、次の構成をこの方法で変更しないでください。
-
key.deserializer
-
value.deserializer
key.format
Kafka メッセージのキー部分のフォーマット。
String
はい
なし
このパラメーターを構成する場合は、key.fields または key.fields-prefix も構成する必要があります。
有効な値:
-
csv
-
json
-
avro
-
debezium-json
-
canal-json
-
maxwell-json
-
avro-confluent
-
raw
key.fields-prefix
Kafka メッセージ内のすべてのキーフィールドのカスタムプレフィックス。これにより、値フォーマット内のフィールドとの名前衝突を回避します。
String
いいえ
なし
このパラメーターは、ソーステーブルとシンクテーブル間の列名を区別するためにのみ使用されます。Kafka メッセージのキー部分が解析および生成されるときに、プレフィックスは削除されます。
説明このパラメーターを構成する場合は、value.fields-include を `EXCEPT_KEY` に設定する必要があります。
value.format
Kafka メッセージの値部分のフォーマット。
String
はい
なし
このパラメーターは format と同等です。どちらか一方のみを構成できます。format と value.format の両方を構成すると競合が発生します。
value.fields-include
Kafka メッセージの値部分が解析または生成されるときに、メッセージキーに対応するフィールドを含めるかどうかを指定します。
String
はい
ALL
有効な値:
-
ALL (デフォルト): すべての列が Kafka メッセージの値部分として処理されます。
-
EXCEPT_KEY: `key.fields` で定義されたフィールドを除くすべての列が Kafka メッセージの値部分として処理されます。
topic
読み取りまたは書き込みを行うトピック名。
String
はい
なし
なし。
-
-
シンク固有のパラメーター
パラメーター
説明
データ型
必須
デフォルト
備考
sink.parallelism
Kafka シンク演算子の同時実行数。
Integer
いいえ
アップストリーム演算子の同時実行数。これはフレームワークによって決定されます。
なし。
sink.buffer-flush.max-rows
キャッシュがフラッシュされる前にキャッシュできる最大レコード数。
Integer
いいえ
0 (無効)
シンクテーブルが同じキーに対して多くの更新を受信する場合、キャッシュはそのキーの最後のレコードのみを保持します。これにより、Kafka トピックに送信されるデータ量を削減し、潜在的なトムストーンメッセージを回避できます。
説明シンクキャッシュを有効にするには、sink.buffer-flush.max-rows と sink.buffer-flush.interval の両方をゼロより大きい値に設定します。
sink.buffer-flush.interval
キャッシュがフラッシュされる間隔。
Duration
いいえ
0 (無効)
単位はミリ秒 (ms)、秒 (s)、分 (min)、または時間 (h) です。例:
'sink.buffer-flush.interval'='1 s'。シンクテーブルが同じキーに対して多くの更新を受信する場合、キャッシュはそのキーの最後のレコードのみを保持します。これにより、Kafka トピックに送信されるデータ量を削減し、潜在的なトムストーンメッセージを回避できます。
説明シンクキャッシュを有効にするには、sink.buffer-flush.max-rows と sink.buffer-flush.interval の両方をゼロより大きい値に設定します。
データインジェスト
アップサート Kafka コネクタは、YAML データインジェストジョブのシンクとして使用できます。データは JSON フォーマットで書き込まれ、プライマリキーフィールドもメッセージ本文に含まれます。
構文
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# ApsaraMQ for Kafka
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}
パラメーター
|
パラメーター |
説明 |
データ型 |
必須 |
デフォルト |
備考 |
|
type |
シンクのタイプ。 |
STRING |
はい |
なし |
値を `upsert-kafka` に設定します。 |
|
name |
シンク名。 |
STRING |
いいえ |
なし |
なし。 |
|
properties.bootstrap.servers |
Kafka ブローカーのアドレス。 |
STRING |
はい |
なし |
フォーマットは |
|
properties.* |
Kafka クライアントのパラメーター。 |
STRING |
いいえ |
なし |
サフィックスは、公式 Kafka ドキュメントのプロデューサーで定義されている構成である必要があります。 Flink は `properties.` プレフィックスを削除し、残りの構成を Kafka クライアントに渡します。たとえば、 |
|
sink.delivery-guarantee |
書き込み操作のセマンティックパターン。 |
STRING |
いいえ |
at-least-once |
有効な値:
|
|
sink.add-tableId-to-header-enabled |
テーブル情報をヘッダーに書き込むかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効にすると、`namespace`、`schemaName`、および `tableName` がヘッダーに書き込まれます。 |
|
aliyun.kafka.accessKeyId |
ご利用の Alibaba Cloud アカウントの AccessKey ID。 |
STRING |
いいえ |
なし |
詳細については、「AccessKey ペアの作成」をご参照ください。 説明
ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。 |
|
aliyun.kafka.accessKeySecret |
ご利用の Alibaba Cloud アカウントの AccessKey Secret。 |
STRING |
いいえ |
なし |
詳細については、「AccessKey ペアの作成」をご参照ください。 説明
ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。 |
|
aliyun.kafka.instanceId |
ApsaraMQ for Kafka インスタンスの ID。 |
STRING |
いいえ |
なし |
Alibaba Cloud Kafka インターフェイスでインスタンスの詳細を表示できます。 説明
ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。 |
|
aliyun.kafka.endpoint |
ApsaraMQ for Kafka の API エンドポイント。 |
STRING |
いいえ |
なし |
詳細については、「エンドポイント」をご参照ください。 説明
ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。 |
|
aliyun.kafka.regionId |
トピックが存在するインスタンスのリージョン ID。 |
STRING |
いいえ |
なし |
詳細については、「エンドポイント」をご参照ください。 説明
ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。 |
サポートされているタイプ変更
データインジェスト用のアップサート Kafka コネクタは、すべてのタイプの変更操作をサポートしています。ただし、データを読み取るには、固定スキーマを持つ Flink アップサート Kafka SQL コネクタを使用する必要があります。
例
-
ソーステーブル
ウェブサイトユーザーの閲覧データを含む Kafka ソーステーブルを作成します。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' ); -
シンクテーブル
-
アップサート Kafka シンクテーブルを作成します。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' ); -
ウェブサイトユーザーの閲覧データをシンクテーブルに書き込みます。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;
-
-
データインジェストシンク
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}