すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Upsert Kafka コネクタ

最終更新日:Jul 03, 2025

このトピックでは、Upsert Kafkaコネクタの使用方法について説明します。

背景情報

Upsert Kafkaコネクタを使用すると、Kafkaトピックとの間でデータをアップサート形式で読み書きできます。

  • ソーステーブル用のUpsert Kafkaコネクタは、Kafkaトピックに格納されているデータを変更ログストリームに変換できます。変更ログストリームの各データレコードは、更新イベントまたは削除イベントを表します。 Kafkaトピックにデータレコードのキーと同じキーが含まれている場合、データレコードの値がキーの値を上書きします。データレコードはUPDATEとして解釈されます。 Kafkaトピックにデータレコードにそのようなキーが含まれていない場合、データレコードの値がKafkaトピックに挿入されます。データレコードはINSERTとして解釈されます。変更ログストリームの各データレコードは、同じキーを持つ既存の行が常に上書きされるため、UPSERT(INSERTまたはUPDATE)として解釈されます。データレコードのキーの値がnullの場合、データレコードはDELETEとして解釈されます。

  • シンクテーブルまたはデータインジェスチョンシンク用のUpsert Kafkaコネクタは、ソースによって生成された変更ログストリームを使用できます。 Upsert Kafkaコネクタは、INSERTおよびUPDATE_AFTERデータを通常のKafkaメッセージとしてKafkaトピックに書き込むことができます。 Upsert Kafkaコネクタは、DELETEデータをnull値を持つKafkaメッセージとしてKafkaトピックに書き込むことができます。データレコードのキーの値がnullの場合、そのキーを使用するKafkaメッセージは削除されます。 Flinkは、主キー列の値に基づいてデータをパーティション分割します。これにより、同じ主キーを持つメッセージが値でソートされます。したがって、同じ主キーを含むUPDATEまたはDELETEデータは、同じパーティションに書き込まれます。

項目

説明

テーブルタイプ

ソーステーブル、シンクテーブル、データインジェスチョンシンク

実行モード

ストリーミングモード

データ形式

avro、avro-confluent、csv、json、およびraw

メトリック

  • ソーステーブルのメトリック

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • シンクテーブルのメトリック

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

APIタイプ

SQL APIおよびデータインジェスチョンYAML API

シンクテーブルでのデータの更新または削除

はい

前提条件

制限事項

  • Ververica Runtime(VVR) 2.0.0以降を使用するRealtime Compute for Apache Flinkのみが、Apache Kafkaコネクタをサポートしています。

  • Upsert Kafkaコネクタは、Apache Kafka 0.10以降のデータのみの読み取りまたは書き込みに使用できます。

  • Upsert Kafkaコネクタは、Apache Kafka 2.8のクライアントパラメータのみをサポートしています。 Kafkaプロデューサーとコンシューマーの構成パラメーターの詳細については、「Consumer Configs」および「Producer Configs」をご参照ください。

  • Upsert Kafkaシンクテーブルがexactly-onceセマンティクスを使用する場合、Kafkaトランザクションメカニズムを有効にして、Kafkaクラスタにデータを書き込む必要があります。 Kafkaクラスタのバージョンは、Apache Kafka 0.11以降である必要があります。

SQL

Upsert Kafkaコネクタを使用すると、Kafkaトピックとの間でデータをアップサート形式で読み書きできます。

構文

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH句のパラメーター

  • 共通パラメーター

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    String

    はい

    デフォルト値なし

    値をupsert-kafkaに設定します。

    properties.bootstrap.servers

    KafkaブローカーのIPアドレスまたはエンドポイントとポート番号。

    String

    はい

    デフォルト値なし

    形式:host:port,host:port,host:port。 複数の host:port ペアをコンマ(,)で区切ります。

    properties.*

    Kafkaクライアントに構成されているパラメーター。

    String

    いいえ

    デフォルト値なし

    このパラメーターの接尾辞は、「Producer Configs」および「Consumer Configs」で定義されているルールに準拠する必要があります。

    Flink は properties. プレフィックスを削除し、変換されたキーと値を Kafka クライアントに渡します。 たとえば、properties.allow.auto.create.topics を false に設定して、トピックの自動作成を無効にすることができます。

    Upsert Kafkaコネクタを使用した後、パラメーターの値が上書きされるため、properties.プレフィックスを追加して次のパラメーターの構成を変更することはできません。

    • key.deserializer

    • value.deserializer

    key.format

    Kafkaメッセージのキーフィールドの読み取りまたは書き込みに使用される形式。

    String

    はい

    デフォルト値なし

    このパラメーターを構成する場合は、key.fieldsまたはkey.fields-prefixパラメーターを構成する必要があります。

    有効な値:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    Kafkaメッセージのすべてのキーフィールドのカスタムプレフィックス。このパラメーターを構成して、値フィールドとの名前の競合を防ぐことができます。

    String

    いいえ

    デフォルト値なし

    このパラメーターは、ソーステーブルとシンクテーブルの列名を区別するためにのみ使用されます。 Kafkaメッセージのキーフィールドが解析および生成されると、プレフィックスは列名から削除されます。

    説明

    このパラメーターを構成する場合は、value.fields-includeパラメーターをEXCEPT_KEYに設定する必要があります。

    value.format

    Kafkaメッセージの値フィールドの読み取りまたは書き込みに使用される形式。

    String

    はい

    デフォルト値なし

    このパラメーターの構成は、formatパラメーターの構成と同じです。 formatパラメーターは、value.formatパラメーターと組み合わせて使用することはできません。両方のパラメーターを構成すると、競合が発生します。

    value.fields-include

    Kafkaメッセージの値フィールドが解析または生成されるときに、メッセージキーに対応するフィールドを含めるかどうかを指定します。

    String

    はい

    ALL

    有効な値:

    • ALL:すべてのフィールドがKafkaメッセージの値フィールドとして処理されます。これはデフォルト値です。

    • EXCEPT_KEY:key.fieldsパラメーターで指定されたフィールドを除くすべてのフィールドが、Kafkaメッセージの値フィールドとして処理されます。

    topic

    データの読み取り元または書き込み先のトピックの名前。

    String

    はい

    デフォルト値なし

    該当なし

  • シンクテーブル専用のパラメーター

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    sink.parallelism

    Kafkaシンクテーブルの演算子の並列度。

    Integer

    いいえ

    アップストリーム演算子の並列度。これはフレームワークによって決定されます。

    該当なし

    sink.buffer-flush.max-rows

    キャッシュが更新される前にキャッシュできるデータレコードの最大数。

    Integer

    いいえ

    0(無効)

    シンクテーブルが同じキーに対して多数の更新を受信した場合、キーの最後のデータレコードのみがキャッシュに保持されます。この場合、シンクテーブルでのデータキャッシュは、Kafkaトピックに書き込まれるデータ量を削減するのに役立ちます。これにより、潜在的な墓石メッセージがKafkaトピックに送信されるのを防ぎます。

    説明

    シンクテーブルのデータキャッシュを有効にするには、sink.buffer-flush.max-rowsおよびsink.buffer-flush.intervalパラメーターを 0より大きい値に設定する必要があります。

    sink.buffer-flush.interval

    キャッシュが更新される間隔。

    Duration

    いいえ

    0(無効)

    単位はミリ秒、秒、分、または時間です。 たとえば、'sink.buffer-flush.interval'='1 s' と構成できます。

    シンクテーブルが同じキーに対して多数の更新を受信した場合、キーの最後のデータレコードのみがキャッシュに保持されます。この場合、シンクテーブルでのデータキャッシュは、Kafkaトピックに書き込まれるデータ量を削減するのに役立ちます。これにより、潜在的な墓石メッセージがKafkaトピックに送信されるのを防ぎます。

    説明

    シンクテーブルのデータキャッシュを有効にするには、sink.buffer-flush.max-rowsおよびsink.buffer-flush.intervalパラメーターを 0より大きい値に設定する必要があります。

データインジェスチョン

Upsert Kafkaコネクタを使用して、データインジェスチョンのYAMLドラフトを作成できます。さらに、データインジェスチョンのシンクとして使用できます。データはJSON形式でシンクに書き込まれ、主キーフィールドもメッセージ本文に含まれます。

構文

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # ApsaraMQ for Kafka
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

コネクタオプション

オプション

説明

データ型

必須

デフォルト値

備考

type

シンクのコネクタタイプ

STRING

はい

デフォルト値なし

パラメーター値をupsert-kafkaに設定します。

name

シンクのコネクタ名

STRING

いいえ

デフォルト値なし

該当なし

properties.bootstrap.servers

KafkaブローカーのIPアドレスまたはエンドポイントとポート番号。

STRING

はい

デフォルト値なし

形式: host:port,host:port,host:port 。複数の host:port ペアをコンマ (,) で区切ります。

properties.*

Kafkaクライアントに構成されているパラメーター。

STRING

いいえ

デフォルト値なし

このパラメーターの接尾辞は、「Producer Configs」で定義されているルールに準拠する必要があります。

Flink は properties. というプレフィックスを削除し、変換されたキーと値を Kafka クライアントに渡します。たとえば、properties.allow.auto.create.topics を false に設定して、トピックの自動作成を無効にすることができます。

sink.delivery-guarantee

Kafkaシンクの配信セマンティクス

STRING

いいえ

at-least-once

有効な値:

  • none:配信セマンティクスは何も保証しません。データが失われたり、重複したりする可能性があります。

  • at-least-once:at-least-onceセマンティクスは、データが失われないことを保証します。ただし、重複データが存在する可能性があります。これはデフォルト値です。

  • exactly-once:Kafkaトランザクションを使用して、exactly-onceセマンティクスを保証します。これにより、データが失われたり、重複したりすることはありません。

sink.add-tableId-to-header-enabled

テーブル情報をヘッダーに書き込むかどうか。

BOOLEAN

いいえ

false

オプションを true に設定すると、namespace、schemaName、および tableName がヘッダーに書き込まれます。

aliyun.kafka.accessKeyId

Alibaba CloudアカウントのAccessKey ID。

STRING

いいえ

デフォルト値なし

詳細については、「AccessKeyペアを取得する」をご参照ください。

説明

ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。

aliyun.kafka.accessKeySecret

Alibaba CloudアカウントのAccessKeyシークレット。

STRING

いいえ

デフォルト値なし

詳細については、「AccessKeyペアを取得する」をご参照ください。

説明

ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。

aliyun.kafka.instanceId

ApsaraMQ for KafkaインスタンスのID

STRING

いいえ

デフォルト値なし

[インスタンスの詳細] ページでインスタンス ID を表示できます。

説明

ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。

aliyun.kafka.endpoint

ApsaraMQ for Kafkaのエンドポイント。

STRING

いいえ

デフォルト値なし

詳細については、「エンドポイント」をご参照ください。

説明

ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。

aliyun.kafka.regionId

トピックを作成するインスタンスのリージョン ID。

STRING

いいえ

デフォルト値なし

詳細については、「エンドポイント」をご参照ください。

説明

ApsaraMQ for Kafkaにデータを同期する場合は、このパラメーターを構成する必要があります。

サポートされている変更タイプ

Upsert Kafka コネクタは、データ ingestion のすべての変更タイプをサポートしています。 ただし、Upsert Kafka コネクタを使用して Flink SQL ジョブを介して固定スキーマを持つソーステーブルからデータを ingestion する必要があることに注意してください。

サンプルコード

  • ソーステーブルのサンプルコード

    Webサイトユーザーの閲覧データを含むKafkaソーステーブルを作成します。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND  // viewtime に対するウォーターマークは、viewtime から 2 秒を引いた値です。
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • シンクテーブルのサンプルコード

    • Upsert Kafkaシンクテーブルを作成します。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • Webサイトユーザーの閲覧データをシンクテーブルに書き込みます。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCT user_id)
      FROM pageviews
      GROUP BY user_region;
  • データインジェスチョンシンク

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

参照