このトピックでは、Upsert Kafkaコネクタの使用方法について説明します。
背景情報
Upsert Kafkaコネクタを使用すると、Kafkaトピックとの間でデータをアップサート形式で読み書きできます。
ソーステーブル用のUpsert Kafkaコネクタは、Kafkaトピックに格納されているデータを変更ログストリームに変換できます。変更ログストリームの各データレコードは、更新イベントまたは削除イベントを表します。 Kafkaトピックにデータレコードのキーと同じキーが含まれている場合、データレコードの値がキーの値を上書きします。データレコードはUPDATEとして解釈されます。 Kafkaトピックにデータレコードにそのようなキーが含まれていない場合、データレコードの値がKafkaトピックに挿入されます。データレコードはINSERTとして解釈されます。変更ログストリームの各データレコードは、同じキーを持つ既存の行が常に上書きされるため、UPSERT(INSERTまたはUPDATE)として解釈されます。データレコードのキーの値がnullの場合、データレコードはDELETEとして解釈されます。
シンクテーブルまたはデータインジェスチョンシンク用のUpsert Kafkaコネクタは、ソースによって生成された変更ログストリームを使用できます。 Upsert Kafkaコネクタは、INSERTおよびUPDATE_AFTERデータを通常のKafkaメッセージとしてKafkaトピックに書き込むことができます。 Upsert Kafkaコネクタは、DELETEデータをnull値を持つKafkaメッセージとしてKafkaトピックに書き込むことができます。データレコードのキーの値がnullの場合、そのキーを使用するKafkaメッセージは削除されます。 Flinkは、主キー列の値に基づいてデータをパーティション分割します。これにより、同じ主キーを持つメッセージが値でソートされます。したがって、同じ主キーを含むUPDATEまたはDELETEデータは、同じパーティションに書き込まれます。
項目 | 説明 |
テーブルタイプ | ソーステーブル、シンクテーブル、データインジェスチョンシンク |
実行モード | ストリーミングモード |
データ形式 | avro、avro-confluent、csv、json、およびraw |
メトリック |
|
APIタイプ | SQL APIおよびデータインジェスチョンYAML API |
シンクテーブルでのデータの更新または削除 | はい |
前提条件
Kafkaクラスタが作成されていること。 詳細については、「Dataflow Kafkaクラスタを作成する」または「Kafkaでリソースを作成する」をご参照ください。
Realtime Compute for Apache FlinkとKafkaクラスタ間にネットワーク接続が確立されていること。 Realtime Compute for Apache FlinkとE-MapReduce(EMR)で作成されたKafkaクラスタ間のネットワーク接続を確立する方法の詳細については、「VPCの作成と管理」および「セキュリティグループの概要」をご参照ください。 Realtime Compute for Apache Flinkと ApsaraMQ for Kafka クラスタ間のネットワーク接続を確立する方法の詳細については、「ホワイトリストを設定する」をご参照ください。
制限事項
Ververica Runtime(VVR) 2.0.0以降を使用するRealtime Compute for Apache Flinkのみが、Apache Kafkaコネクタをサポートしています。
Upsert Kafkaコネクタは、Apache Kafka 0.10以降のデータのみの読み取りまたは書き込みに使用できます。
Upsert Kafkaコネクタは、Apache Kafka 2.8のクライアントパラメータのみをサポートしています。 Kafkaプロデューサーとコンシューマーの構成パラメーターの詳細については、「Consumer Configs」および「Producer Configs」をご参照ください。
Upsert Kafkaシンクテーブルがexactly-onceセマンティクスを使用する場合、Kafkaトランザクションメカニズムを有効にして、Kafkaクラスタにデータを書き込む必要があります。 Kafkaクラスタのバージョンは、Apache Kafka 0.11以降である必要があります。
SQL
Upsert Kafkaコネクタを使用すると、Kafkaトピックとの間でデータをアップサート形式で読み書きできます。
構文
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH句のパラメーター
共通パラメーター
オプション
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
String
はい
デフォルト値なし
値をupsert-kafkaに設定します。
properties.bootstrap.servers
KafkaブローカーのIPアドレスまたはエンドポイントとポート番号。
String
はい
デフォルト値なし
形式:
host:port,host:port,host:port
。 複数の host:port ペアをコンマ(,)で区切ります。properties.*
Kafkaクライアントに構成されているパラメーター。
String
いいえ
デフォルト値なし
このパラメーターの接尾辞は、「Producer Configs」および「Consumer Configs」で定義されているルールに準拠する必要があります。
Flink は properties. プレフィックスを削除し、変換されたキーと値を Kafka クライアントに渡します。 たとえば、
properties.allow.auto.create.topics
を false に設定して、トピックの自動作成を無効にすることができます。Upsert Kafkaコネクタを使用した後、パラメーターの値が上書きされるため、properties.プレフィックスを追加して次のパラメーターの構成を変更することはできません。
key.deserializer
value.deserializer
key.format
Kafkaメッセージのキーフィールドの読み取りまたは書き込みに使用される形式。
String
はい
デフォルト値なし
このパラメーターを構成する場合は、key.fieldsまたはkey.fields-prefixパラメーターを構成する必要があります。
有効な値:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
Kafkaメッセージのすべてのキーフィールドのカスタムプレフィックス。このパラメーターを構成して、値フィールドとの名前の競合を防ぐことができます。
String
いいえ
デフォルト値なし
このパラメーターは、ソーステーブルとシンクテーブルの列名を区別するためにのみ使用されます。 Kafkaメッセージのキーフィールドが解析および生成されると、プレフィックスは列名から削除されます。
説明このパラメーターを構成する場合は、value.fields-includeパラメーターをEXCEPT_KEYに設定する必要があります。
value.format
Kafkaメッセージの値フィールドの読み取りまたは書き込みに使用される形式。
String
はい
デフォルト値なし
このパラメーターの構成は、formatパラメーターの構成と同じです。 formatパラメーターは、value.formatパラメーターと組み合わせて使用することはできません。両方のパラメーターを構成すると、競合が発生します。
value.fields-include
Kafkaメッセージの値フィールドが解析または生成されるときに、メッセージキーに対応するフィールドを含めるかどうかを指定します。
String
はい
ALL
有効な値:
ALL:すべてのフィールドがKafkaメッセージの値フィールドとして処理されます。これはデフォルト値です。
EXCEPT_KEY:key.fieldsパラメーターで指定されたフィールドを除くすべてのフィールドが、Kafkaメッセージの値フィールドとして処理されます。
topic
データの読み取り元または書き込み先のトピックの名前。
String
はい
デフォルト値なし
該当なし
シンクテーブル専用のパラメーター
オプション
説明
データ型
必須
デフォルト値
備考
sink.parallelism
Kafkaシンクテーブルの演算子の並列度。
Integer
いいえ
アップストリーム演算子の並列度。これはフレームワークによって決定されます。
該当なし
sink.buffer-flush.max-rows
キャッシュが更新される前にキャッシュできるデータレコードの最大数。
Integer
いいえ
0(無効)
シンクテーブルが同じキーに対して多数の更新を受信した場合、キーの最後のデータレコードのみがキャッシュに保持されます。この場合、シンクテーブルでのデータキャッシュは、Kafkaトピックに書き込まれるデータ量を削減するのに役立ちます。これにより、潜在的な墓石メッセージがKafkaトピックに送信されるのを防ぎます。
説明シンクテーブルのデータキャッシュを有効にするには、sink.buffer-flush.max-rowsおよびsink.buffer-flush.intervalパラメーターを 0より大きい値に設定する必要があります。
sink.buffer-flush.interval
キャッシュが更新される間隔。
Duration
いいえ
0(無効)
単位はミリ秒、秒、分、または時間です。 たとえば、
'sink.buffer-flush.interval'='1 s'
と構成できます。シンクテーブルが同じキーに対して多数の更新を受信した場合、キーの最後のデータレコードのみがキャッシュに保持されます。この場合、シンクテーブルでのデータキャッシュは、Kafkaトピックに書き込まれるデータ量を削減するのに役立ちます。これにより、潜在的な墓石メッセージがKafkaトピックに送信されるのを防ぎます。
説明シンクテーブルのデータキャッシュを有効にするには、sink.buffer-flush.max-rowsおよびsink.buffer-flush.intervalパラメーターを 0より大きい値に設定する必要があります。
データインジェスチョン
Upsert Kafkaコネクタを使用して、データインジェスチョンのYAMLドラフトを作成できます。さらに、データインジェスチョンのシンクとして使用できます。データはJSON形式でシンクに書き込まれ、主キーフィールドもメッセージ本文に含まれます。
構文
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# ApsaraMQ for Kafka
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}
コネクタオプション
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
type | シンクのコネクタタイプ | STRING | はい | デフォルト値なし | パラメーター値をupsert-kafkaに設定します。 |
name | シンクのコネクタ名 | STRING | いいえ | デフォルト値なし | 該当なし |
properties.bootstrap.servers | KafkaブローカーのIPアドレスまたはエンドポイントとポート番号。 | STRING | はい | デフォルト値なし | 形式: |
properties.* | Kafkaクライアントに構成されているパラメーター。 | STRING | いいえ | デフォルト値なし | このパラメーターの接尾辞は、「Producer Configs」で定義されているルールに準拠する必要があります。 Flink は |
sink.delivery-guarantee | Kafkaシンクの配信セマンティクス | STRING | いいえ | at-least-once | 有効な値:
|
sink.add-tableId-to-header-enabled | テーブル情報をヘッダーに書き込むかどうか。 | BOOLEAN | いいえ | false | オプションを |
aliyun.kafka.accessKeyId | Alibaba CloudアカウントのAccessKey ID。 | STRING | いいえ | デフォルト値なし | 詳細については、「AccessKeyペアを取得する」をご参照ください。 説明 ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。 |
aliyun.kafka.accessKeySecret | Alibaba CloudアカウントのAccessKeyシークレット。 | STRING | いいえ | デフォルト値なし | 詳細については、「AccessKeyペアを取得する」をご参照ください。 説明 ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。 |
aliyun.kafka.instanceId | ApsaraMQ for KafkaインスタンスのID | STRING | いいえ | デフォルト値なし | [インスタンスの詳細] ページでインスタンス ID を表示できます。 説明 ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。 |
aliyun.kafka.endpoint | ApsaraMQ for Kafkaのエンドポイント。 | STRING | いいえ | デフォルト値なし | 詳細については、「エンドポイント」をご参照ください。 説明 ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。 |
aliyun.kafka.regionId | トピックを作成するインスタンスのリージョン ID。 | STRING | いいえ | デフォルト値なし | 詳細については、「エンドポイント」をご参照ください。 説明 ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。 |
サポートされている変更タイプ
Upsert Kafka コネクタは、データ ingestion のすべての変更タイプをサポートしています。 ただし、Upsert Kafka コネクタを使用して Flink SQL ジョブを介して固定スキーマを持つソーステーブルからデータを ingestion する必要があることに注意してください。
サンプルコード
ソーステーブルのサンプルコード
Webサイトユーザーの閲覧データを含むKafkaソーステーブルを作成します。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND // viewtime に対するウォーターマークは、viewtime から 2 秒を引いた値です。 )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
シンクテーブルのサンプルコード
Upsert Kafkaシンクテーブルを作成します。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
Webサイトユーザーの閲覧データをシンクテーブルに書き込みます。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCT user_id) FROM pageviews GROUP BY user_region;
データインジェスチョンシンク
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}