すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:アップサート Kafka

最終更新日:Mar 10, 2026

このトピックでは、アップサート Kafka コネクタの使用方法について説明します。

背景情報

アップサート Kafka コネクタは、アップサート操作を使用して Kafka トピックとの間でデータを読み書きします。

  • ソーステーブルとして、コネクタは Kafka に保存されているデータをチェンジログストリームに変換できます。ストリーム内の各レコードは、更新または削除イベントを表します。データレコードの値は、同じキーの最後の値に対する UPDATE として解釈されます (そのキーが存在する場合)。キーが存在しない場合、更新は INSERT として扱われます。テーブル用語では、チェンジログストリーム内のレコードは UPSERT (INSERT または UPDATE とも呼ばれます) であり、同じキーを持つ既存の行は上書きされます。空の値を持つメッセージは DELETE メッセージとして扱われます。

  • シンクテーブルまたはデータインジェストシンクとして、コネクタはアップストリームジョブによって生成されたチェンジログストリームを消費できます。INSERT または UPDATE_AFTER データは通常の Kafka メッセージとして書き込まれます。DELETE データは空の値を持つ Kafka メッセージとして書き込まれ、これは対応するキーのメッセージが削除されたことを示します。Flink は、プライマリキー列の値に基づいてデータをパーティション分割します。これにより、同じプライマリキーを持つメッセージが順序付けされることが保証されます。結果として、同じプライマリキーの更新または削除メッセージは同じパーティションに書き込まれます。

カテゴリ

説明

サポートされているタイプ

ソーステーブル、シンクテーブル、およびデータインジェストシンク

実行モード

ストリーミングモード

データフォーマット

avro、avro-confluent、csv、json、および raw

特定の監視メトリクス

  • ソーステーブル

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • シンクテーブル

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

API タイプ

SQL およびデータインジェスト YAML ジョブ

シンクテーブルでのデータの更新または削除

はい

前提条件

制限事項

  • Kafka コネクタは、Ververica Runtime (VVR) 2.0.0 以降を使用する Flink のみでサポートされています。

  • コネクタは、Apache Kafka 0.10 以降からの読み取りと書き込みのみをサポートしています。

  • コネクタは、Apache Kafka 2.8 のクライアントパラメーターのみをサポートしています。詳細については、Apache Kafka のコンシューマーおよびプロデューサー構成ドキュメントをご参照ください。

  • アップサート Kafka シンクテーブルが 1 回限りのセマンティクスを使用する場合、送信先 Kafka クラスターでトランザクション機能が有効になっている必要があります。クラスターは Apache Kafka 0.11 以降である必要があります。

  • アップサート Kafka ソーステーブルは、`earliest-offset` 起動モードのみをサポートしています。このモードは構成できません。コネクタは、すべての履歴変更データを読み取り、完全なチェンジログを取得します。これにより、SQL で完全なチェンジログを処理し、パイプライン全体で 1 回限りのセマンティクスを提供できます。タイムスタンプまたは `latest-offset` などの別の起動モードを指定すると、コネクタは不完全なチェンジログを読み取ります。これにより、ダウンストリーム計算でデータ整合性の問題が発生する可能性があります。

SQL

アップサート Kafka コネクタは、アップサート操作を使用して Kafka トピックとの間でデータを読み書きします。

構文

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH パラメーター

  • 一般

    パラメーター

    説明

    データ型

    必須

    デフォルト

    備考

    connector

    テーブルのタイプ。

    String

    はい

    なし

    値を `upsert-kafka` に設定します。

    properties.bootstrap.servers

    Kafka ブローカーのアドレス。

    String

    はい

    なし

    フォーマットは host:port,host:port,host:port です。複数のアドレスをカンマ (,) で区切ります。

    properties.*

    Kafka クライアントのパラメーター。

    String

    いいえ

    なし

    サフィックスは、公式 Kafka ドキュメントに記載の、プロデューサーおよびコンシューマー用の構成でなければなりません。

    Flink は `properties.` プレフィックスを削除し、残りの構成を Kafka クライアントに渡します。たとえば、'properties.allow.auto.create.topics' = 'false' を使用して、自動トピック作成を無効にできます。

    Kafka コネクタがそれらを上書きするため、次の構成をこの方法で変更しないでください。

    • key.deserializer

    • value.deserializer

    key.format

    Kafka メッセージのキー部分のフォーマット。

    String

    はい

    なし

    このパラメーターを構成する場合は、key.fields または key.fields-prefix も構成する必要があります。

    有効な値:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    Kafka メッセージ内のすべてのキーフィールドのカスタムプレフィックス。これにより、値フォーマット内のフィールドとの名前衝突を回避します。

    String

    いいえ

    なし

    このパラメーターは、ソーステーブルとシンクテーブル間の列名を区別するためにのみ使用されます。Kafka メッセージのキー部分が解析および生成されるときに、プレフィックスは削除されます。

    説明

    このパラメーターを構成する場合は、value.fields-include を `EXCEPT_KEY` に設定する必要があります。

    value.format

    Kafka メッセージの値部分のフォーマット。

    String

    はい

    なし

    このパラメーターは format と同等です。どちらか一方のみを構成できます。formatvalue.format の両方を構成すると競合が発生します。

    value.fields-include

    Kafka メッセージの値部分が解析または生成されるときに、メッセージキーに対応するフィールドを含めるかどうかを指定します。

    String

    はい

    ALL

    有効な値:

    • ALL (デフォルト): すべての列が Kafka メッセージの値部分として処理されます。

    • EXCEPT_KEY: `key.fields` で定義されたフィールドを除くすべての列が Kafka メッセージの値部分として処理されます。

    topic

    読み取りまたは書き込みを行うトピック名。

    String

    はい

    なし

    なし。

  • シンク固有のパラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト

    備考

    sink.parallelism

    Kafka シンク演算子の同時実行数。

    Integer

    いいえ

    アップストリーム演算子の同時実行数。これはフレームワークによって決定されます。

    なし。

    sink.buffer-flush.max-rows

    キャッシュがフラッシュされる前にキャッシュできる最大レコード数。

    Integer

    いいえ

    0 (無効)

    シンクテーブルが同じキーに対して多くの更新を受信する場合、キャッシュはそのキーの最後のレコードのみを保持します。これにより、Kafka トピックに送信されるデータ量を削減し、潜在的なトムストーンメッセージを回避できます。

    説明

    シンクキャッシュを有効にするには、sink.buffer-flush.max-rowssink.buffer-flush.interval の両方をゼロより大きい値に設定します。

    sink.buffer-flush.interval

    キャッシュがフラッシュされる間隔。

    Duration

    いいえ

    0 (無効)

    単位はミリ秒 (ms)、秒 (s)、分 (min)、または時間 (h) です。例: 'sink.buffer-flush.interval'='1 s'

    シンクテーブルが同じキーに対して多くの更新を受信する場合、キャッシュはそのキーの最後のレコードのみを保持します。これにより、Kafka トピックに送信されるデータ量を削減し、潜在的なトムストーンメッセージを回避できます。

    説明

    シンクキャッシュを有効にするには、sink.buffer-flush.max-rowssink.buffer-flush.interval の両方をゼロより大きい値に設定します。

データインジェスト

アップサート Kafka コネクタは、YAML データインジェストジョブのシンクとして使用できます。データは JSON フォーマットで書き込まれ、プライマリキーフィールドもメッセージ本文に含まれます。

構文

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # ApsaraMQ for Kafka
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

パラメーター

パラメーター

説明

データ型

必須

デフォルト

備考

type

シンクのタイプ。

STRING

はい

なし

値を `upsert-kafka` に設定します。

name

シンク名。

STRING

いいえ

なし

なし。

properties.bootstrap.servers

Kafka ブローカーのアドレス。

STRING

はい

なし

フォーマットは host:port,host:port,host:port です。複数のアドレスをカンマ (,) で区切ります。

properties.*

Kafka クライアントのパラメーター。

STRING

いいえ

なし

サフィックスは、公式 Kafka ドキュメントのプロデューサーで定義されている構成である必要があります。

Flink は `properties.` プレフィックスを削除し、残りの構成を Kafka クライアントに渡します。たとえば、'properties.allow.auto.create.topics' = 'false' を使用して、自動トピック作成を無効にできます。

sink.delivery-guarantee

書き込み操作のセマンティックパターン。

STRING

いいえ

at-least-once

有効な値:

  • none: 保証なし。データが失われたり、重複したりする可能性があります。

  • at-least-once (デフォルト): データが失われないことを保証しますが、データが重複する可能性があります。

  • exactly-once: Kafka トランザクションを使用して、データが失われたり重複したりしないことを保証します。

sink.add-tableId-to-header-enabled

テーブル情報をヘッダーに書き込むかどうかを指定します。

BOOLEAN

いいえ

false

有効にすると、`namespace`、`schemaName`、および `tableName` がヘッダーに書き込まれます。

aliyun.kafka.accessKeyId

ご利用の Alibaba Cloud アカウントの AccessKey ID。

STRING

いいえ

なし

詳細については、「AccessKey ペアの作成」をご参照ください。

説明

ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。

aliyun.kafka.accessKeySecret

ご利用の Alibaba Cloud アカウントの AccessKey Secret。

STRING

いいえ

なし

詳細については、「AccessKey ペアの作成」をご参照ください。

説明

ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。

aliyun.kafka.instanceId

ApsaraMQ for Kafka インスタンスの ID。

STRING

いいえ

なし

Alibaba Cloud Kafka インターフェイスでインスタンスの詳細を表示できます。

説明

ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。

aliyun.kafka.endpoint

ApsaraMQ for Kafka の API エンドポイント。

STRING

いいえ

なし

詳細については、「エンドポイント」をご参照ください。

説明

ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。

aliyun.kafka.regionId

トピックが存在するインスタンスのリージョン ID。

STRING

いいえ

なし

詳細については、「エンドポイント」をご参照ください。

説明

ApsaraMQ for Kafka にデータを同期するときに、このパラメーターを構成します。

サポートされているタイプ変更

データインジェスト用のアップサート Kafka コネクタは、すべてのタイプの変更操作をサポートしています。ただし、データを読み取るには、固定スキーマを持つ Flink アップサート Kafka SQL コネクタを使用する必要があります。

  • ソーステーブル

    ウェブサイトユーザーの閲覧データを含む Kafka ソーステーブルを作成します。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • シンクテーブル

    • アップサート Kafka シンクテーブルを作成します。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • ウェブサイトユーザーの閲覧データをシンクテーブルに書き込みます。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;
  • データインジェストシンク

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

ベストプラクティス