すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Message Queue for Apache Kafka

最終更新日:Mar 22, 2026

このトピックでは、Message Queue for Apache Kafka コネクタについて説明します。

背景情報

Apache Kafka は、ビッグデータ分野で広く利用されるオープンソースの分散型メッセージキューであり、高性能なデータ処理、ストリーミング分析、およびデータ統合に使用されます。Realtime Compute for Apache Flink 向けの Realtime Compute for Apache Flink の Kafka コネクタは、オープンソースの Apache Kafka クライアントを活用し、高いデータスループットを実現するとともに、複数のデータ形式の読み書きと「1 回限りのセマンティクス」をサポートします。

カテゴリ

説明

対応タイプ

ソーステーブル、シンクテーブル、およびデータ取り込みシンク

実行モード

ストリーミングモード

データ形式

対応するデータ形式

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

説明
  • 組み込みの Protobuf データ形式は、Ververica Runtime (VVR) 8.0.9 以降でのみサポートされます。

  • 各対応データ形式には、WITH 句で指定可能な対応パラメーターがあります。詳細については、「フォーマット」をご参照ください。

メトリック

メトリック

  • ソーステーブル

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 結果テーブル

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「モニタリングメトリック」をご参照ください。

API タイプ

SQL API、DataStream API、およびデータインジェスト YAML API

結果テーブルへのデータの更新または削除

このコネクタは、結果テーブルへのデータ挿入のみをサポートしています。更新および削除はサポートされていません。

説明

結果テーブルへのデータの更新または削除方法の詳細については、「Upsert Kafka」をご参照ください。

前提条件

ご使用の Kafka クラスターのタイプに応じて、以下の前提条件を満たしてください。

  • ApsaraMQ for Kafka クラスターへの接続

    • Kafka クラスターのバージョンが 0.11 以降である必要があります。

    • ApsaraMQ for Kafka クラスターを作成済みである必要があります。詳細については、「手順 3:リソースの作成」をご参照ください。

    • Flink ワークスペースと Kafka クラスターが同一の Virtual Private Cloud (VPC) 内にあり、Flink ワークスペースの CIDR ブロックが ApsaraMQ for Kafka のホワイトリストに追加済みである必要があります。詳細については、「ホワイトリストの設定」をご参照ください。

    重要

    ApsaraMQ for Kafka へのデータ書き込みに関する制限事項:

    • ApsaraMQ for Kafka は、Zstandard (zstd) 圧縮形式でのデータ書き込みをサポートしていません。

    • ApsaraMQ for Kafka は、べき等性またはトランザクションによる書き込みをサポートしておらず、Kafka 結果テーブルが提供する「1 回限りのセマンティクス」を利用できません。Realtime Compute Engine VVR 8.0.0 以降では、Kafka コネクタが Kafka クライアント 3.x を使用しており、properties.enable.idempotence プロパティのデフォルト値は true です。したがって、Realtime Compute Engine VVR 8.0.0 以降を使用して ApsaraMQ for Kafka へ書き込む場合、書き込み失敗を防ぐために、結果テーブル定義に properties.enable.idempotence=false という構成を必ず追加してください。ApsaraMQ for Kafka のストレージエンジンおよび機能制限の比較については、「ストレージエンジンの比較」をご参照ください。

  • 自己管理型 Apache Kafka クラスターへの接続

    • 自己管理型 Apache Kafka クラスターのバージョンが 0.11 以降である必要があります。

    • Flink ワークスペースと自己管理型 Apache Kafka クラスター間のネットワーク接続が確立されている必要があります。パブリックインターネット経由でのクラスター接続方法の詳細については、「ネットワーク接続に関する FAQ」をご参照ください。

    • Apache Kafka バージョン 2.8 のクライアント構成オプションのみがサポートされています。詳細については、Apache Kafka のConsumer Configs およびProducer Configs のドキュメントをご参照ください。

注意事項

トランザクションによる書き込みは推奨されません。これは、Apache Flink および Apache Kafka の既知の設計上の制限によるものです。sink.delivery-guarantee = 'exactly-once' を設定すると、Kafka コネクタはトランザクションによる書き込みを有効化しますが、以下のような既知の問題があります。

  • 各チェックポイントごとに新しいトランザクション ID が生成されます。チェックポイント間隔が短すぎると、過剰な数のトランザクション ID が作成されます。これにより、Apache Kafka クラスター内のコーディネーターのメモリが不足し、クラスターの安定性が損なわれる可能性があります。

  • 各トランザクションごとに Producer インスタンスが作成されます。同時に多数のトランザクションがコミットされると、TaskManager のメモリが不足し、Apache Flink ジョブの安定性が損なわれる可能性があります。

  • 複数の Apache Flink ジョブが同じ sink.transactional-id-prefix を使用している場合、生成されたトランザクション ID が競合する可能性があります。あるジョブでの書き込み操作が失敗すると、Apache Kafka パーティションのログ開始オフセット (LSO) の進行が妨げられ、そのパーティションのすべてのコンシューマーに影響を与えます。

「1 回限りのセマンティクス」が必要な場合は、主キーを持つテーブルに書き込むために Upsert Kafka コネクタ を使用し、べき等性を保証してください。トランザクションによる書き込みを必須とする場合は、「1 回限りのセマンティクスの使用上の注意事項」をご参照ください。

ネットワーク接続のトラブルシューティング

Realtime Compute for Apache Flink ジョブの起動時に Timed out waiting for a node assignment エラーが発生した場合、これは通常、Realtime Compute for Apache Flink と Kafka クラスター間のネットワーク接続に問題があることを示しています。

Kafka クライアントは、以下のようにブローカーに接続します。

  1. クライアントは、bootstrap.servers で指定されたアドレスを使用して、Kafka クラスターへの初期接続を確立します。

  2. Kafka クラスターは、各ブローカーのエンドポイントを含むメタデータを返します。

  3. クライアントは、これらのエンドポイントを使用してブローカーに接続し、データの読み取りまたは書き込みを行います。

bootstrap.servers のアドレスに到達可能であっても、Kafka が不正なブローカーエンドポイントを返すと、クライアントはデータの読み取りまたは書き込みができなくなります。この問題は、プロキシ、ポートフォワーディング、または専用回線を使用するネットワークアーキテクチャでよく発生します。

トラブルシューティング手順

Message Queue for Kafka

  1. エンドポイントタイプの確認

    • デフォルトエンドポイント(内部ネットワーク)

    • SASL エンドポイント(認証付き内部ネットワーク)

    • パブリックエンドポイント(別途申請が必要)

    ネットワークプローブ 機能を Realtime Compute for Apache Flink 開発コンソールで使用し、bootstrap.servers アドレスとの接続性の問題を除外してください。

  2. セキュリティグループおよびホワイトリストの確認

    Realtime Compute for Apache Flink ワークスペースの CIDR ブロックを Kafka インスタンスのホワイトリストに追加してください。詳細については、「VPC CIDR ブロックの表示」および「ホワイトリストの設定」をご参照ください。

  3. SASL 構成の確認(有効な場合)

    SASL_SSL エンドポイントを使用する場合、Realtime Compute for Apache Flink ジョブで JAAS、SSL、および SASL メカニズムが正しく構成されていることを確認してください。認証情報が不足していると、ハンドシェイク段階で接続が失敗し、タイムアウトとして表示されることがあります。詳細については、「セキュリティおよび認証」をご参照ください。

自己管理型 Kafka

  1. Network Probe」機能を使用します。

    この機能により、bootstrap.servers アドレスとの接続性の問題を除外し、正しい内部またはパブリックエンドポイントが使用されていることを確認できます。

  2. セキュリティグループおよびホワイトリストの確認

    • Elastic Compute Service (ECS) インスタンスのセキュリティグループが、Kafka エンドポイントのポート(通常は 9092 または 9093)に対するインバウンドトラフィックを許可している必要があります。

    • ECS インスタンスのファイアウォールが、Realtime Compute for Apache Flink ワークスペースの VPC からのトラフィックを許可していることを確認してください。詳細については、「VPC CIDR ブロックの表示」をご参照ください。

  3. 構成の確認

    1. zkCli.sh または zookeeper-shell.sh ツールを使用して、Kafka が使用する ZooKeeper クラスターにログインします。

    2. ブローカーメタデータを取得するコマンドを実行します。たとえば、get /brokers/ids/0 を実行します。応答の endpoints フィールドに、Kafka がクライアントに通知するアドレスが表示されます。

      example

    3. Realtime Compute for Apache Flink 開発コンソールの ネットワークプローブ 機能を使用して、このアドレスへのアクセスが可能かどうかをテストします。

      説明
      • このアドレスにアクセスできない場合、Kafka 管理者に連絡し、listeners および advertised.listeners 構成を確認・修正して、Realtime Compute for Apache Flink からアクセス可能なアドレスが通知されるようにしてください。

      • Kafka クライアント接続の詳細については、「接続性のトラブルシューティング」をご参照ください。

  4. SASL 構成の確認(有効な場合)

    SASL_SSL エンドポイントを使用する場合、Realtime Compute for Apache Flink ジョブで JAAS、SSL、および SASL メカニズムが正しく構成されていることを確認してください。認証情報が不足していると、ハンドシェイク段階で接続が失敗し、タイムアウトとして表示されることがあります。詳細については、「セキュリティおよび認証」をご参照ください。

SQL

Kafka コネクタは、SQL ジョブにおけるソーステーブルまたは結果テーブルとして使用できます。

構文

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

メタデータ列

ソーステーブルまたは結果テーブルでメタデータ列を定義することで、Kafka メッセージのメタデータにアクセスまたは書き込みできます。たとえば、WITH 句で複数のトピックを定義する場合、ソーステーブルのメタデータ列を使用して、各レコードのソーストピックを識別できます。次のコードはその例です。

CREATE TABLE kafka_source (
  -- メッセージトピックを `record_topic` 列として読み取る
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- ConsumerRecord からタイムスタンプを `ts` 列として読み取る
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- メッセージオフセットを `record_offset` 列として読み取る
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- `ts` 列からタイムスタンプを ProducerRecord のタイムスタンプとして Kafka に書き込む
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

次の表は、Kafka ソースおよび結果テーブルでサポートされるメタデータ列を示しています。

キー

タイプ

説明

範囲

topic

STRING NOT NULL METADATA VIRTUAL

メッセージのトピック。

ソーステーブル

partition

INT NOT NULL METADATA VIRTUAL

メッセージのパーティション ID。

ソーステーブル

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

メッセージヘッダー。

ソーステーブルおよび結果テーブル

leader-epoch

INT NOT NULL METADATA VIRTUAL

メッセージの leader-epoch。

ソーステーブル

offset

BIGINT NOT NULL METADATA VIRTUAL

メッセージオフセット。

ソーステーブル

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

メッセージのタイムスタンプ。

ソーステーブルおよび結果テーブル

timestamp-type

STRING NOT NULL METADATA VIRTUAL

メッセージのタイムスタンプタイプ。有効な値は以下のとおりです。

  • NoTimestampType: メッセージにタイムスタンプが定義されていません。

  • CreateTime: メッセージが作成された時刻。

  • LogAppendTime: メッセージが Kafka ブローカーのログに追加された時刻。

ソーステーブル

__raw_key__

STRING NOT NULL METADATA VIRTUAL

生のメッセージキー。

ソーステーブルおよび結果テーブル

説明

このパラメーターは、Ververica Runtime (VVR) 11.4 以降でのみサポートされます。

__raw_value__

STRING NOT NULL METADATA VIRTUAL

生のメッセージ値。

ソーステーブルおよび結果テーブル

説明

このパラメーターは、Ververica Runtime (VVR) 11.4 以降でのみサポートされます。

WITH パラメーター

  • 一般

    パラメーター

    説明

    タイプ

    必須

    デフォルト

    備考

    connector

    使用するコネクタ。

    String

    はい

    該当なし

    値は kafka でなければなりません。

    properties.bootstrap.servers

    Kafka ブローカーのアドレス一覧。

    String

    はい

    該当なし

    書式: host:port,host:port,...。アドレスはカンマ (,) で区切ります。

    properties.*

    Kafka クライアントの追加プロパティ。

    String

    いいえ

    該当なし

    プロパティキーは、公式 Apache Kafka ドキュメントのProducer Configs およびConsumer Configs で定義された有効なオプションである必要があります。

    Realtime Compute for Apache Flink は、properties. プレフィックスを削除し、残りのキーと値のペアを基盤となる Kafka クライアントに渡します。たとえば、自動トピック作成を無効化するために 'properties.allow.auto.create.topics' = 'false' を設定できます。

    以下のオプションは Kafka コネクタによって上書きされるため、この方法では構成できません。

    • key.deserializer

    • value.deserializer

    format

    Kafka メッセージの値をシリアル化および逆シリアル化するフォーマット。

    String

    いいえ

    該当なし

    対応フォーマット:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    説明

    詳細については、「フォーマットオプション」をご参照ください。

    key.format

    Kafka メッセージのキーをシリアル化および逆シリアル化するフォーマット。

    String

    いいえ

    該当なし

    対応フォーマット:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    説明

    この構成を使用する場合、key.options の指定が必須です。

    key.fields

    テーブルスキーマから Kafka メッセージのキーとして使用するフィールド。

    String

    いいえ

    該当なし

    複数のフィールド名はセミコロン (;) で区切ります。たとえば、'field1;field2' のようにします。

    key.fields-prefix

    キーと値のフィールド間で名前が重複しないよう、すべてのキー用フィールドに付与するカスタムプレフィックス。

    String

    いいえ

    該当なし

    このプレフィックスは、キーと値のフィールドを区別するために使用されます。キーのシリアル化前または逆シリアル化後に削除されます。

    説明

    このオプションを使用する場合、value.fields-includeEXCEPT_KEY に設定する必要があります。

    value.format

    Kafka メッセージの値をシリアル化および逆シリアル化するフォーマット。

    String

    いいえ

    該当なし

    この構成は format と同等です。ただし、formatvalue.format の両方を設定することはできません。両方が構成されている場合、value.formatformat を上書きします。

    value.fields-include

    キー用フィールドが値フォーマットに含まれるかどうかを定義します。

    String

    いいえ

    ALL

    有効な値:

    • ALL: Kafka メッセージの値には、すべてのテーブル列が含まれます。

    • EXCEPT_KEY: Kafka メッセージの値には、key.fields で定義された列を除くすべてのテーブル列が含まれます。

  • ソーステーブル

    パラメーター

    説明

    タイプ

    必須

    デフォルト

    備考

    topic

    読み取るトピックまたはトピック一覧。

    String

    いいえ

    該当なし

    複数のトピックをサブスクライブするには、トピック名をセミコロン (;) で区切ります。たとえば、'topic-1;topic-2' のようにします。

    説明

    このオプションと topic-pattern のどちらか一方のみを指定できます。両方を指定することはできません。

    topic-pattern

    サブスクライブするトピックを定義する正規表現。コンシューマーは、このパターンに一致する名前のすべてのトピックをサブスクライブします。

    String

    いいえ

    該当なし

    説明

    このオプションと topic のどちらか一方のみを指定できます。両方を指定することはできません。

    properties.group.id

    Kafka ソースのコンシューマーグループ ID。

    String

    いいえ

    KafkaSource-{ソーステーブル名}

    初めてコンシューマーグループ ID を使用する場合、初期スタートオフセットを定義するために、properties.auto.offset.resetearliest または latest のいずれかに設定する必要があります。

    scan.startup.mode

    Kafka コンシューマーのスタートオフセット。

    String

    いいえ

    group-offsets

    有効な値:

    • earliest-offset: 最も古い利用可能なオフセットから読み取りを開始します。

    • latest-offset: 最新のオフセットから読み取りを開始します。

    • group-offsets: 指定された properties.group.id のコミット済みオフセットから読み取りを開始します。

    • timestamp: 指定された scan.startup.timestamp-millis から読み取りを開始します。

    • specific-offsets: scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。

    説明

    このオプションは、ジョブがクリーンな状態から開始される場合にのみ有効です。チェックポイントからジョブを再開する場合、常にチェックポイント状態に保存されたオフセットから再開されます。

    scan.startup.specific-offsets

    scan.startup.modespecific-offsets の場合の各パーティションのスタートオフセット。

    String

    いいえ

    該当なし

    たとえば、partition:0,offset:42;partition:1,offset:300 のようにします。

    scan.startup.timestamp-millis

    scan.startup.modetimestamp に設定されている場合の、スタートタイムスタンプ(ミリ秒単位)。

    Long

    いいえ

    該当なし

    単位はミリ秒です。

    scan.topic-partition-discovery.interval

    トピック内の新しいパーティションを検出する間隔。

    Duration

    いいえ

    5 分

    コネクタは定期的に新しいパーティションを検出し、そこから読み取りを行います。topic-pattern を使用する場合、コネクタはパターンに一致する新しいトピックも検出します。この機能を無効にするには、間隔を非正の値に設定します。

    説明

    Ververica Runtime (VVR) 6.0.x では、動的パーティション検出はデフォルトで無効になっています。VVR 8.0 以降では、この機能はデフォルトで有効になっており、検出間隔は 5 分です。

    scan.header-filter

    Kafka メッセージヘッダーに基づいてメッセージをフィルター処理します。

    String

    いいえ

    該当なし

    ヘッダーのキーと値はコロン (:) で区切られます。複数のヘッダー条件は論理演算子 (& および |) を使用して結合されます。NOT 論理演算子 (!) もサポートされています。たとえば、depart:toy|depart:book&!env:test は、ヘッダーに depart=toy または depart=book が含まれ、かつ env=test が含まれていない場合に Kafka データを保持します。

    説明
    • このオプションは、Ververica Runtime (VVR) 8.0.6 以降でのみサポートされます。

    • 式内の括弧はサポートされていません。

    • 論理演算は左から右に評価されます。

    • ヘッダーの値は UTF-8 文字列に変換されて比較されます。

    scan.check.duplicated.group.id

    properties.group.id を使用する別のアクティブなコンシューマーが存在するかどうかを確認します。

    Boolean

    いいえ

    false

    有効な値:

    • true: ジョブの開始前に、重複するコンシューマーグループを確認します。重複が見つかった場合、ジョブは失敗して競合を防止します。

    • false: 競合の確認を行わず、ジョブを開始します。

    説明

    このオプションは、Ververica Runtime (VVR) 6.0.4 以降でのみサポートされます。

  • シンクテーブル

    パラメーター

    説明

    タイプ

    必須

    デフォルト

    備考

    topic

    書き込むトピック。

    String

    はい

    該当なし

    該当なし

    sink.partitioner

    並列 sink インスタンスから Kafka パーティションへのレコードのマッピング方法を定義します。

    String

    いいえ

    default

    有効な値:

    • default: Kafka のデフォルトパーティショナーを使用します。

    • fixed: 各並列 sink インスタンスが固定の Kafka パーティションに書き込みます。

    • round-robin: レコードはラウンドロビン方式でパーティションに配布されます。

    • カスタムパーティショナー:カスタムパーティショナーを使用するには、FlinkKafkaPartitioner サブクラスの完全修飾クラス名(例:org.mycompany.MyPartitioner)を指定します。

    sink.delivery-guarantee

    sink の配信保証レベル。

    String

    いいえ

    at-least-once

    有効な値:

    • none: 保証は提供されません。レコードが失われるか、重複する可能性があります。

    • at-least-once: レコードが失われることはありませんが、重複する可能性があります。

    • exactly-once: Kafka トランザクションを使用して「1 回限りのセマンティクス」を提供し、レコードが失われず、重複しないことを保証します。

    説明

    exactly-once セマンティクスを使用する場合、sink.transactional-id-prefix も指定する必要があります。

    sink.transactional-id-prefix

    sink.delivery-guaranteeexactly-once の場合に必要なトランザクション ID のプレフィックス。

    String

    はい。sink.delivery-guaranteeexactly-once の場合です。

    該当なし

    sink.delivery-guaranteeexactly-once に設定した場合にのみ必須です。

    sink.parallelism

    sink オペレーターの並列度。

    Integer

    いいえ

    該当なし

    デフォルトでは、フレームワークが上流のオペレーターに基づいて並列度を決定します。

セキュリティおよび認証

Kafka クラスターが安全な接続または認証を必要とする場合、関連するセキュリティおよび認証構成に properties. をプレフィックスとして付け、WITH パラメーター内で設定します。PLAINT を SASL メカニズムとして使用し、JAAS 構成を提供する Kafka テーブルの構成例を以下に示します。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

次の例は、セキュリティプロトコルとして SASL_SSL を、SASL メカニズムとして SCRAM-SHA-256 を使用する方法を示しています。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /* SSL 構成 */
  /* サーバーの CA 証明書の truststore のパス。 */
  /* Artifacts を使用してアップロードされたファイルは /flink/usrlib/ ディレクトリに格納されます。 */
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /* クライアント認証が必要な場合、keystore(秘密鍵)のパスも構成する必要があります。 */
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /* サーバーのホスト名を検証するために使用されるアルゴリズム。空文字列はホスト名検証を無効化します。 */
  'properties.ssl.endpoint.identification.algorithm' = '',
  /* SASL 構成 */
  /* SASL メカニズムを SCRAM-SHA-256 に設定します。 */
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /* JAAS を構成します。 */
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

Real-time Compute コンソールの Artifacts 機能を使用して、上記の例で言及されている CA 証明書および秘密鍵をアップロードできます。アップロードされたファイルは /flink/usrlib ディレクトリに格納されます。my-truststore.jks という名前の CA 証明書ファイルを使用する場合、WITH 句の 'properties.ssl.truststore.location' プロパティを、以下のいずれかの方法で設定できます。

  • 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' を設定します。この方法では、Object Storage Service (OSS) から実行時にファイルを動的にダウンロードする必要がなくなりますが、デバッグモードはサポートされません。

  • Realtime Compute エンジンのバージョンが VVR 11.5 以降の場合、properties.ssl.truststore.location および properties.ssl.keystore.location を絶対 OSS パスに構成できます。ファイルパスの書式は oss://flink-fullymanaged-<Workspace ID>/artifacts/namespaces/<Namespace name>/<file name> です。この方法では、Flink 実行時に OSS ファイルを動的にダウンロードし、デバッグモードをサポートします。

説明
  • 構成の確認: このトピックの例は一般的な構成を示しています。Kafka コネクタを構成する前に、Kafka の運用・保守チームに連絡して、正しいセキュリティおよび認証設定を取得してください。

  • エスケープ: ネイティブの Apache Flink とは異なり、Realtime Compute for Apache Flink SQL エディターでは、二重引用符 (") がデフォルトでエスケープされます。したがって、properties.sasl.jaas.config オプションでユーザー名およびパスワードに使用される二重引用符をエスケープするために、バックスラッシュ (\) を追加する必要はありません。

ソーステーブルのスタートオフセット

スタートモード

scan.startup.mode オプションを構成することで、Kafka ソーステーブルがデータの読み取りを開始するオフセットを指定できます。有効な値は以下のとおりです。

  • earliest-offset: 最も古いオフセットから読み取りを開始します。

  • latest-offset: 最新のオフセットから読み取りを開始します。

  • group-offsets: 指定された properties.group.id のコンシューマーグループのコミット済みオフセットから読み取りを開始します。

  • timestamp: scan.startup.timestamp-millis で指定された値以上である最初のメッセージのタイムスタンプから読み取りを開始します。

  • specific-offsets: scan.startup.specific-offsets で指定された特定のパーティションオフセットから読み取りを開始します。

説明
  • スタートモードを指定しない場合、デフォルトは「group-offsets」です。

  • scan.startup.mode オプションは、ステートレスジョブにのみ適用されます。ステートフルジョブを開始する場合、常にその状態に保存されたオフセットから消費されます。

次のコードはその例です。

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- 最も古いオフセットから消費します。
  'scan.startup.mode' = 'earliest-offset',
  -- 最新のオフセットから消費します。
  'scan.startup.mode' = 'latest-offset',
  -- 「my-group」というコンシューマーグループのコミット済みオフセットから消費します。
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- 「my-group」を初めて使用する場合、最も古いオフセットから消費を開始します。
  'properties.auto.offset.reset' = 'latest', -- 「my-group」を初めて使用する場合、最新のオフセットから消費を開始します。
  -- 指定されたタイムスタンプ(ミリ秒単位):1655395200000 から消費します。
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- 特定のオフセットから消費します。
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

スタートオフセットの優先順位

ソーステーブルのスタートオフセットは、以下の優先順位(高から低)に基づいて決定されます。

優先順位(高から低)

チェックポイントまたはセーブポイントに保存されたオフセット。

リアルタイムコンピュートコンソールでジョブ起動時に選択されたスタート時刻。

WITH 句の scan.startup.mode で指定されたスタートオフセット。

scan.startup.mode が指定されていない場合、group-offsets を使用して、対応するコンシューマーグループのオフセットから消費を開始します。

これらの手順のいずれかで決定されたオフセットが無効な場合(たとえば、期限切れになっているか、Kafka クラスターで問題が発生している場合)、システムは properties.auto.offset.reset で指定されたポリシーに従ってオフセットをリセットします。このオプションが構成されていない場合、システムはユーザーによる介入を必要とする例外をスローします。

一般的なシナリオとして、新しいコンシューマーグループ ID を使用してデータを消費する場合があります。ソーステーブルはまず、Kafka クラスターに対してそのグループのコミット済みオフセットを照会します。新しいグループ ID のため、有効なオフセットは見つかりません。その結果、システムは properties.auto.offset.reset で指定されたポリシーに従ってオフセットをリセットします。したがって、新しいグループ ID を使用して消費する場合、properties.auto.offset.reset オプションを構成する必要があります。

ソースオフセットのコミット

Kafka ソーステーブルは、成功したチェックポイント後にのみ、現在のコンシューマーオフセットを Kafka クラスターにコミットします。チェックポイント間隔が長い場合、Kafka クラスター内のコンシューマーオフセットが遅延します。チェックポイント中、Kafka ソーステーブルは現在の読み取り進行状況をその状態に保存します。システムはこの状態を使用して障害復旧を行い、Kafka クラスターにコミットされたオフセットには依存しません。オフセットは、Kafka 内での読み取り進行状況の監視のみを目的としてコミットされます。オフセットのコミット失敗は、データの正確性には影響しません。

カスタム sink パーティショナー

Kafka の組み込みパーティショニング戦略が要件を満たさない場合、FlinkKafkaPartitioner クラスを拡張することで、カスタムパーティショナーを実装できます。開発が完了したら、コードを JAR パッケージにコンパイルし、Realtime Compute コンソールの Artifacts 機能を使用してアップロードします。JAR パッケージがアップロードおよび参照された後、WITH 句の sink.partitioner パラメーターを、パーティショナーの完全修飾クラス名(例:org.mycompany.MyPartitioner)に設定します。

Kafka、Upsert Kafka、および Kafka JSON カタログ

Kafka は、データの更新または削除をサポートしない追加専用のメッセージキューです。ストリーミング SQL では、標準の Kafka 結果テーブルは、上流の Change Data Capture (CDC) データや集計および結合などのオペレーターの再tractionロジックを処理できません。変更または再tractionを含むデータを書き込む必要がある場合は、Upsert Kafka 結果テーブルを使用します。

1 つ以上の上流データベーステーブルから Kafka への Change Data Capture (CDC) データのバッチ同期を簡素化するために、Kafka JSON カタログを使用できます。Kafka に格納されたデータが JSON 形式の場合、Kafka JSON カタログを使用すると、スキーマおよび WITH パラメーターの定義を省略できます。詳細については、「Kafka JSON カタログの管理」をご参照ください。

例 1:Kafka からの読み取りおよび Kafka への書き込み

この例では、ソース Kafka トピックからデータを読み取り、結果トピックに書き込みます。データは CSV 形式です。

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

例 2:テーブルスキーマおよびデータの同期

Kafka コネクタを使用して、Kafka トピックからのメッセージを Hologres にリアルタイムで同期できます。フェールオーバー時に Hologres で重複メッセージが発生しないようにするには、Kafka メッセージのオフセットおよびパーティション ID を複合プライマリキーとして使用できます。

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- オプション:すべての入れ子になった列をフラット化します。
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

例 3:Kafka キーおよび値の同期

Kafka メッセージキーに関連する情報が含まれている場合、キーと値の両方を同期できます。

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
説明

Kafka メッセージキーは、スキーマ進化または自動型解析をサポートしていません。スキーマは手動で宣言する必要があります。

例 4:データの同期および計算の実行

Kafka から Hologres へのデータ同期には、多くの場合、軽量な計算が必要です。

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- COALESCE を使用して null 値を処理します。

例 5:入れ子になった JSON の解析

以下のサンプル JSON メッセージを示します。

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

JSON_VALUE(payload, '$.properties.owner') のような関数を使用してフィールドを解析する代わりに、ソース DDL で直接構造を定義できます。

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

このようにすることで、Flink は読み取りフェーズで JSON を一度に構造化されたフィールドに解析します。その後の SQL クエリでは、properties.owner を直接使用でき、追加の関数呼び出しを必要としないため、全体的なパフォーマンスが向上します。

DataStream API

重要

DataStream API を使用してデータを読み書きするには、DataStream コネクタ を使用して Realtime Compute for Apache Flink に接続します。DataStream コネクタ の設定方法の詳細については、「DataStream コネクタの統合」をご参照ください。

  • Kafka ソースの構築

    Kafka ソース は、Kafka ソース インスタンスを作成するためのビルダークラスを提供します。次のサンプルコードは、input-topic トピック の最も古い オフセット からデータを消費する Kafka ソース を構築します。コンシューマーグループmy-group で、Kafka メッセージ は文字列として逆シリアル化されます。

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    Kafka ソース を構築するには、以下のプロパティを指定する必要があります。

    パラメーター

    説明

    BootstrapServers

    Kafka ブローカーのアドレス一覧。このプロパティは setBootstrapServers(String) メソッドを呼び出して設定します。

    GroupId

    コンシューマーグループ の ID。このプロパティは setGroupId(String) メソッドを呼び出して設定します。

    Topics または Partitions

    サブスクライブ するトピックまたはパーティション。この Kafka ソース は、トピックまたはパーティションを サブスクライブ するための以下の 3 つの方法をサポートしています。

    • リスト内のトピックのすべてのパーティションをサブスクライブします。

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • トピックパターン:サブスクライブ するトピックの名前が指定された正規表現に一致するすべてのパーティションをサブスクライブします。

      KafkaSource.builder().setTopicPattern("topic.*")
    • パーティションの一覧:指定されたパーティションをサブスクライブできます。

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // topic "topic-a" のパーティション 0
              new TopicPartition("topic-b", 5)));  // topic "topic-b" のパーティション 5
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    Kafka メッセージを解析するために使用されるデシリアライザー。

    setDeserializer(KafkaRecordDeserializationSchema) メソッドを使用してデシリアライザーを指定します。KafkaRecordDeserializationSchema は、Kafka ConsumerRecord をどのように解析するかを定義します。Kafka メッセージ のみを解析する必要がある場合、以下のいずれかの方法を使用できます。

    • ビルダークラスの setValueOnlyDeserializer(DeserializationSchema) メソッドを使用します。DeserializationSchema は、Kafka メッセージ のバイナリデータの をどのように解析するかを定義します。

    • Kafka の Deserializer インターフェイス を実装するクラスを使用します。たとえば、StringDeserializer を使用して Kafka メッセージ を文字列として解析できます。

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    説明

    完全な ConsumerRecord を解析するには、KafkaRecordDeserializationSchema インターフェイスを実装する必要があります。

    POM

    Kafka DataStream コネクタ は Maven セントラルリポジトリで利用可能です。

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    Kafka DataStream コネクタ を使用する際には、以下のプロパティを考慮してください。

    • スタートオフセット

      Kafka ソース は、スタートオフセットオフセット初期化子 (OffsetsInitializer) を使用して指定します。組み込みの初期化子には以下があります。

      オフセット初期化子

      コード

      最も古い オフセット から消費を開始します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      最新の オフセット から消費を開始します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      指定された時刻以上であるタイムスタンプを持つデータの消費を開始します。単位はミリ秒です。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      コンシューマーグループ のコミット済み オフセット からデータの消費を開始します。コミット済み オフセット が存在しない場合、指定されたリセット戦略(たとえば、最も古い オフセット)を使用します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      コンシューマーがコミットしたオフセットから消費を開始し、オフセットリセットポリシーを指定しません。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      説明
      • 組み込みの初期化子が要件を満たさない場合、カスタムの オフセット初期化子 を実装できます。

      • オフセット初期化子 を指定しない場合、デフォルトは OffsetsInitializer.earliest() です。

    • ストリーミングモードおよびバッチモード

      Kafka ソース は、ストリーミングモード および バッチモード の両方をサポートしています。デフォルトでは、ジョブ が失敗またはキャンセルされるまで無限に実行される ストリーミングモード で動作します。Kafka ソースバッチモード で実行するには、setBounded(OffsetsInitializer) を使用して停止 オフセット を指定できます。Kafka ソース は、すべてのパーティションが指定された停止オフセットに達すると終了します。

      説明

      ストリーミングモード の Kafka ソース には通常、停止 オフセット がありません。ただし、テスト目的で、ストリーミングモード でも setUnbounded(OffsetsInitializer) を使用して停止 オフセット を指定できます。ストリーミングモードバッチモード で停止 オフセット を指定するメソッド名は異なります:ストリーミングモード では setUnboundedバッチモード では setBounded です。

    • 動的パーティション検出

      Flink ジョブ を再起動することなく、トピックパターンによるサブスクライブ時にトピックのスケーリングまたは新規トピックの作成を処理するには、トピック のスケーリングに対応するために 動的パーティション検出 を有効化できます。この機能はデフォルトで無効化されており、明示的に有効化する必要があります。

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // 10 秒ごとに新しいパーティションを検出します。
      重要

      動的パーティション検出機能は、Kafka クラスターのメタデータ更新メカニズムに依存します。Kafka クラスターがパーティション情報を適切なタイミングで更新しない場合、新しいパーティションが検出されない可能性があります。Kafka クラスターの partition.discovery.interval.ms 構成が実際のシナリオと一致していることを確認してください。

    • イベント時刻および Watermark

      デフォルトでは、Kafka ソース は Kafka メッセージ のタイムスタンプを イベント時刻 として使用します。カスタムの Watermark 戦略を定義することで、メッセージ の本文から イベント時刻 を抽出し、下流に Watermark を送信できます。

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      カスタム Watermark 戦略の詳細については、「Watermark の生成」をご参照ください。

      説明

      ソースのサブタスクがアイドル状態の場合(たとえば、Kafka パーティション に新しいデータがない場合や、ソースの 並列度 が Kafka パーティション数より大きい場合)、そのサブタスクの Watermark は進行しません。これにより、下流のウィンドウ計算がブロックされる可能性があります。

      この問題を解決するには、以下のソリューションを検討してください。

      • ソースアイドルタイムアウトの設定:table.exec.source.idle-timeout プロパティ を有効にして、アイドル状態のソースを一時的にアイドル状態としてマークします。これにより、下流の Watermark が進行できるようになります。

      • 適切な 並列度 の設定:ソースの 並列度 が Kafka パーティション数を超えないようにしてください。

    • オフセットのコミット

      チェックポイントが有効な場合、Kafka ソース は、チェックポイント が完了したときに、現在のコンシューマー オフセット を Kafka にコミットします。これにより、Flink チェックポイント の状態が、Kafka ブローカーにコミットされた オフセット と一貫していることが保証されます。チェックポイントが無効な場合、Kafka ソース は Kafka コンシューマーの内部自動定期 オフセット コミットメカニズムに依存します。この機能は、enable.auto.commit および auto.commit.interval.ms Kafka コンシューマープロパティによって制御されます。

      説明

      Kafka ソース は、フォールトトレランスおよび回復のためにコミットされたオフセットに依存しません。オフセットのコミットは、Kafka コンシューマーおよび コンシューマーグループ の進行状況を監視するためだけです。

    • その他のプロパティ

      上記で言及されたプロパティに加えて、setProperties(Properties) および setProperty(String, String) を使用して、Kafka ソース およびその基盤となる Kafka コンシューマーの任意の プロパティ を設定できます。Kafka ソース は以下の特定のプロパティを提供します。

      パラメーター

      説明

      client.id.prefix

      Kafka コンシューマーのクライアント ID プレフィックスを指定します。

      partition.discovery.interval.ms

      新しいパーティションを検出する間隔をミリ秒単位で定義します。-1 の値は動的パーティション検出を無効にします。

      説明

      バッチモード では、このプロパティは自動的に -1 に設定されます。

      register.consumer.metrics

      Flink に Kafka コンシューマーメトリックを登録するかどうかを指定します。

      その他の Kafka コンシューマー構成

      Kafka コンシューマー構成の完全なリストについては、公式 Apache Kafka ドキュメント をご参照ください。

      重要

      正しい操作を保証するために、Kafka DataStream コネクタ は、以下の手動で構成されたプロパティを上書きします。

      • key.deserializer は常に org.apache.kafka.common.serialization.ByteArrayDeserializer に上書きされます。

      • value.deserializer は常に org.apache.kafka.common.serialization.ByteArrayDeserializer に上書きされます。

      • auto.offset.reset.strategyOffsetsInitializer によって提供される戦略によって上書きされます。

      次の例は、PLAIN SASL メカニズムを使用し、JAAS 構成を提供する Kafka コンシューマーを構成する方法を示しています。

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • モニタリング

      Kafka ソース は、Flink のメトリックシステムを通じてメトリックを公開し、モニタリングおよび診断に利用できます。

      • メトリックスコープ

        Kafka ソースリーダーのすべてのメトリックは、オペレーターのメトリックグループのサブグループである KafkaSourceReader メトリックグループの下に登録されます。特定の トピック パーティション に関連するメトリックは、KafkaSourceReader.topic.<topic_name>.partition.<partition_id> サブグループに登録されます。

        たとえば、「my-topic」トピックパーティション 1 の現在の コンシューマー オフセット メトリック (currentOffset) は、.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset で利用できます。成功したコミット数 (commitsSucceeded) は、.operator.KafkaSourceReader.commitsSucceeded で利用できます。

      • メトリックのリスト

        メトリック

        説明

        スコープ

        currentOffset

        パーティションの現在のコンシューマー オフセット

        TopicPartition

        committedOffset

        パーティションの最後にコミットされた オフセット

        TopicPartition

        commitsSucceeded

        成功したオフセットコミットの合計数。

        KafkaSourceReader

        commitsFailed

        失敗したコミット数

        KafkaSourceReader

      • Kafka コンシューマーメトリック

        基盤となる Kafka コンシューマーのメトリックは、KafkaSourceReader.KafkaConsumer メトリックグループに登録されます。たとえば、records-consumed-total メトリック.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total に登録されます。

        プロパティ register.consumer.metrics を使用して、Kafka コンシューマーメトリックを登録するかどうかを指定できます。このオプションはデフォルトで有効 (true) です。Kafka コンシューマーメトリックの詳細については、「Apache Kafka ドキュメント」をご参照ください。

  • Kafkaシンクの構築

    Flink Kafkaシンクは、データストリームを1つ以上の Kafkaトピックに書き込みます。

    DataStream<String> stream = ...
    
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setKafkaProducerConfig(kafkaProperties)
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                            .setTopic("my-topic")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    
    stream.sinkTo(sink);

    Kafkaシンクを構築するには、次のプロパティを設定する必要があります。

    パラメーター

    説明

    Kafkaクライアントプロパティ

    bootstrap.servers プロパティは必須です。これは、カンマ区切りの Kafkaブローカーのリストを指定します。

    レコードシリアライザー

    入力データを Kafka の KafkaRecordSerializationSchema を使用して Kafka の ProducerRecord に変換する必要があります。Flink では、メッセージキーおよび値のシリアル化、トピック選択、メッセージのパーティション分割など、一般的なコンポーネントを提供するスキーマビルダーが用意されています。さらに細かい制御が必要な場合は、対応するインターフェイスを実装することもできます。各受信レコードに対して ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) メソッドが呼び出され、Kafka に書き込む ProducerRecord を生成します。

    ProducerRecord は、各レコードが Kafkaに書き込まれる方法を詳細に制御し、これにより、以下を行うことができます。

    • 送信先の [トピック] を設定します。

    • [メッセージ] [キー] を設定します。

    • 送信先の [パーティション] を指定します。

    配信保証

    bootstrap.servers パラメーターは必須であり、カンマ区切りの Kafkaブローカーのリストを指定します。

    配達保証

    Flinkチェックポイントが有効になっている場合、Flink Kafkaシンクは1 回限りのセマンティクスを提供できます。チェックポイントを有効にすることに加えて、DeliveryGuaranteeパラメーターを使用して異なる配信保証を指定できます。DeliveryGuaranteeパラメーターは次のオプションを提供します。

    • DeliveryGuarantee.NONE: (デフォルト) Flink は保証を提供しません。データが失われたり、重複したりする可能性があります。

    • DeliveryGuarantee.AT_LEAST_ONCE: データが失われないことを保証しますが、重複が発生する可能性があります。

    • DeliveryGuarantee.EXACTLY_ONCE: Kafka トランザクションを使用して、1 回限りのセマンティクスを提供します。

      説明

      EXACTLY_ONCE セマンティクスを使用する場合は、「EXACTLY_ONCE セマンティクスに関する考慮事項」をご参照ください。

データインジェスト

Kafka コネクタをソースまたはシンクとして使用して、データインジェスト用の YAML ジョブを作成できます。

制限事項

  • Kafka データソースから Flink CDC データをインジェストするには、Realtime Compute for Apache Flink (VVR) 11.1 以降を使用します。

  • JSON、Debezium JSON、および Canal JSON のみがサポートされています。

  • 複数のパーティションに分散された単一テーブルからデータを読み取ることは、Realtime Compute for Apache Flink (VVR) 8.0.11 以降でのみサポートされています。

構文

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

パラメーター

  • 一般

    パラメーター

    説明

    必須

    タイプ

    デフォルト

    備考

    type

    ソースまたはシンクのタイプ。

    はい

    String

    該当なし

    値は kafka でなければなりません。

    name

    ソースまたはシンクの名前。

    いいえ

    String

    該当なし

    該当なし

    properties.bootstrap.servers

    Kafka ブローカーのアドレス。

    はい

    String

    該当なし

    書式は host:port,host:port,host:port で、カンマ (,) で区切られます。

    properties.*

    Kafka クライアントの構成プロパティ。

    いいえ

    String

    該当なし

    プロパティキーは、公式 Apache Kafka ドキュメントのProducer Configs およびConsumer Configs で定義された有効なオプションである必要があります。

    Realtime Compute for Apache Flink (VVR) は、properties. プレフィックスを削除してから、残りのキーと値のペアを基盤となる Kafka クライアントに渡します。たとえば、自動トピック作成を無効にするには、'properties.allow.auto.create.topics' = 'false' を設定します。

    key.format

    Kafka メッセージキーのシリアル化および逆シリアル化のフォーマット。

    いいえ

    String

    該当なし

    • ソースの場合、json フォーマットのみがサポートされます。

    • シンクの場合、有効な値は以下のとおりです。

      • csv

      • json

    説明

    このオプションは、Realtime Compute for Apache Flink (VVR) 11.0.0 以降でのみサポートされます。

    value.format

    Kafka メッセージ値のシリアル化および逆シリアル化のフォーマット。

    いいえ

    String

    debezium-json

    • ソースの場合、有効な値は以下のとおりです。

      • debezium-json

      • canal-json

      • json

    • シンクの場合、有効な値は以下のとおりです。

      • debezium-json

      • canal-json

      • canal-protobuf

    説明
    • debezium-json および canal-json フォーマットには、Realtime Compute for Apache Flink (VVR) バージョン 8.0.10 以降が必要です。

    • json フォーマットには、Realtime Compute for Apache Flink (VVR) バージョン 11.0.0 以降が必要です。

  • ソースパラメーター

    パラメーター

    説明

    必須

    タイプ

    デフォルト

    備考

    topic

    読み取るトピックまたはトピック一覧。

    いいえ

    String

    該当なし

    複数のトピックをサブスクライブするには、トピック名をセミコロン (;) で区切ります。たとえば、topic-1;topic-2 のようにします。

    説明

    このパラメーターまたは topic-pattern のいずれか一方のみを指定できます。両方を指定することはできません。

    topic-pattern

    サブスクライブするトピックの名前と一致する正規表現。

    いいえ

    String

    該当なし

    説明

    このパラメーターまたは topic のいずれか一方のみを指定できます。両方を指定することはできません。

    properties.group.id

    コンシューマーグループの ID。

    いいえ

    String

    該当なし

    新しいコンシューマーグループ ID を指定する場合、初期スタートオフセットを定義するために、properties.auto.offset.reset パラメーターを earliest または latest に設定する必要があります。

    scan.startup.mode

    Kafka コンシューマーのスタートオフセット。

    いいえ

    String

    group-offsets

    有効な値:

    • earliest-offset: 最も古い利用可能なオフセットから読み取りを開始します。

    • latest-offset: 最新のオフセットから読み取りを開始します。

    • group-offsets (デフォルト値): 指定された properties.group.id のコミット済みオフセットから読み取りを開始します。

    • timestamp: scan.startup.timestamp-millis で指定されたタイムスタンプから読み取りを開始します。

    • specific-offsets: scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。

    説明

    このパラメーターは、ジョブがステートレスなスタートアップで開始される場合にのみ適用されます。ステートフルジョブが開始される場合、常にその状態に保存されたオフセットから消費されます。

    scan.startup.specific-offsets

    scan.startup.modespecific-offsets に設定されている場合の、各パーティションのスタートオフセット。

    いいえ

    String

    該当なし

    たとえば、partition:0,offset:42;partition:1,offset:300 のようにします。

    scan.startup.timestamp-millis

    scan.startup.modetimestamp に設定されている場合の、スタートタイムスタンプ(ミリ秒単位)。

    いいえ

    Long

    該当なし

    単位はミリ秒です。

    scan.topic-partition-discovery.interval

    トピック内の新しいパーティションを動的に検出する間隔。

    いいえ

    Duration

    5 分

    コネクタは定期的に新しいパーティションを検出し、そこから読み取りを行います。topic-pattern を使用する場合、コネクタはパターンに一致する新しいトピックも検出します。検出を無効にするには、この値を 0 以下に設定します。

    scan.check.duplicated.group.id

    properties.group.id で指定されたコンシューマーグループが重複しているかどうかを確認します。

    いいえ

    Boolean

    false

    有効な値:

    • true: ジョブの開始前に重複するコンシューマーグループを確認します。重複が見つかった場合、ジョブは失敗します。

    • false: 競合を確認せずにジョブを開始します。

    schema.inference.strategy

    スキーマ解析戦略。

    いいえ

    String

    continuous

    有効な値:

    • continuous: 各データレコードのスキーマを解析します。スキーマに互換性がない場合、システムはより広いスキーマを推論し、スキーマ変更イベントを生成します。

    • static: ジョブ開始時にスキーマ解析を一度だけ実行します。その後、この初期スキーマに基づいてデータが解析され、スキーマ変更イベントは生成されません。

    説明

    scan.max.pre.fetch.records

    初期スキーマ推論のために、各パーティションから消費および解析するメッセージの最大数。

    いいえ

    Int

    50

    データ処理が開始される前に、システムは各パーティションから指定された数の最新メッセージをプリフェッチして消費し、スキーマを初期化します。

    key.fields-prefix

    名前の競合を避けるために、メッセージキーのフィールド名に追加されるカスタムプレフィックス。

    いいえ

    String

    該当なし

    たとえば、このパラメーターが key_ に設定され、メッセージキーに a という名前のフィールドが含まれている場合、解析されたフィールド名は key_a になります。

    説明

    key.fields-prefix の値は、value.fields-prefix の値のプレフィックスであってはなりません。

    value.fields-prefix

    名前の競合を避けるために、メッセージ値のフィールド名に追加されるカスタムプレフィックス。

    いいえ

    String

    該当なし

    たとえば、このパラメーターが value_ に設定され、メッセージ値に b という名前のフィールドが含まれている場合、解析されたフィールド名は value_b になります。

    説明

    value.fields-prefix の値は、key.fields-prefix の値のプレフィックスであってはなりません。

    metadata.list

    下流のシンクに渡すメタデータ列。

    いいえ

    String

    該当なし

    利用可能なメタデータ列には、topicpartitionoffsettimestamptimestamp-typeheaders、および leader-epoch が含まれます。列名はカンマで区切ります。

    scan.value.initial-schemas.ddls

    特定のテーブルの初期スキーマを定義する DDL ステートメント。

    いいえ

    String

    該当なし

    複数の DDL ステートメントをセミコロン (;) で区切ります。たとえば、CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT); を使用して、テーブル db1.t1 および db1.t2 の初期スキーマをそれぞれ指定します。

    DDL で定義されたテーブルスキーマは、ターゲットのシンクテーブルと一致し、Flink SQL 構文に準拠している必要があります。

    説明

    この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。

    ingestion.ignore-errors

    データ解析エラーを無視するかどうかを指定します。

    いいえ

    Boolean

    false

    説明

    この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。

    ingestion.error-tolerance.max-count

    ingestion.ignore-errorstrue の場合、ジョブが失敗するまでに許容する解析エラーの最大数を指定します。

    いいえ

    Integer

    -1

    このパラメーターは、ingestion.ignore-errorstrue に設定されている場合にのみ適用されます。-1 の値は無制限の許容を意味し、解析例外によってジョブが失敗することはありません。

    説明

    この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。

    • Debezium JSON フォーマットパラメーター

      パラメーター

      必須

      タイプ

      デフォルト

      説明

      debezium-json.distributed-tables

      いいえ

      Boolean

      false

      単一の Debezium JSON テーブルのデータが複数のパーティションに分散されている場合に true に設定します。

      説明

      この構成オプションは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。

      重要

      このパラメーターを変更するには、ステートレスなスタートアップが必要です。

      debezium-json.schema-include

      いいえ

      Boolean

      false

      Debezium JSON メッセージにスキーマが含まれるかどうかを指定します。これは、Debezium Kafka Connect 構成の value.converter.schemas.enable プロパティに対応します。

      有効な値:

      • true: Debezium JSON メッセージにスキーマが含まれます。

      • false: Debezium JSON メッセージにスキーマは含まれません。

      debezium-json.ignore-parse-errors

      いいえ

      Boolean

      false

      有効な値:

      • true: 解析例外を引き起こす行をスキップします。

      • false: エラーをスローし、ジョブは失敗します。

      debezium-json.infer-schema.primitive-as-string

      いいえ

      Boolean

      false

      テーブルスキーマを解析する際に、すべてのプリミティブ型を String 型として解析するかどうかを指定します。

      有効な値:

      • true: すべてのプリミティブ型を String として解析します。

      • false: デフォルトのルールに基づいて型を解析します。

    • Canal JSON フォーマットパラメーター

      パラメーター

      必須

      タイプ

      デフォルト

      説明

      canal-json.distributed-tables

      いいえ

      Boolean

      false

      Canal JSON の単一テーブルのデータが複数のパーティションに分散されている場合、このオプションを有効にする必要があります。

      説明

      この構成オプションは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。

      重要

      このパラメーターを変更するには、ステートレスなスタートアップが必要です。

      canal-json.database.include

      いいえ

      String

      該当なし

      Canal レコードの database メタデータフィールドで変更ログをフィルタリングするためのオプションの正規表現。一致するデータベースからのレコードのみが処理されます。正規表現は Java の Pattern クラスと互換性があります。

      canal-json.table.include

      いいえ

      String

      該当なし

      Canal レコードの table メタデータフィールドで変更ログをフィルタリングするためのオプションの正規表現。一致するテーブルからのレコードのみが処理されます。正規表現は Java の Pattern クラスと互換性があります。

      canal-json.ignore-parse-errors

      いいえ

      Boolean

      false

      有効な値:

      • true: 解析例外が発生した場合、現在の行をスキップします。

      • false: エラーをスローし、ジョブの開始に失敗します。

      canal-json.infer-schema.primitive-as-string

      いいえ

      Boolean

      false

      テーブルスキーマを解析する際に、すべてのプリミティブ型を String 型として解析するかどうかを指定します。

      有効な値:

      • true: すべてのプリミティブ型を String として解析します。

      • false: デフォルトのルールに基づいて型を解析します。

      canal-json.infer-schema.strategy

      いいえ

      String

      AUTO

      テーブルスキーマを解析する戦略を指定します。

      有効な値:

      • AUTO: JSON データからスキーマを自動的に解析します。データに sqlType フィールドが含まれていない場合、解析失敗を防ぐために推奨されます。

      • SQL_TYPE: Canal JSON データ内の sqlType 配列からスキーマを解析します。データに sqlType フィールドが含まれている場合、より正確な型を取得するためにこれを SQL_TYPE に設定することを推奨します。

      • MYSQL_TYPE: Canal JSON データ内の mysqlType 配列からスキーマを解析します。

      sqlType 型マッピングルールの詳細については、「Canal JSON のスキーマ解析」をご参照ください。

      説明
      • この構成は、Ververica Runtime (VVR) 11.1 以降でのみサポートされます。

      • MYSQL_TYPE の値は、Ververica Runtime (VVR) 11.3 以降でサポートされます。

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      いいえ

      Boolean

      true

      MySQL の TIMESTAMP 型を CDC の TIMESTAMP 型にマッピングするかどうかを指定します。

      • true: MySQL の TIMESTAMP 型は CDC の TIMESTAMP 型にマッピングされます。

      • false: MySQL の TIMESTAMP 型は CDC の TIMESTAMP_LTZ 型にマッピングされます。

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      いいえ

      Boolean

      true

      MYSQL_TYPE 解析戦略を使用する場合、MySQL の TINYINT(1) 型を CDC の BOOLEAN 型にマッピングするかどうかを指定します。

      • true: MySQL の TINYINT(1) 型は CDC の BOOLEAN 型にマッピングされます。

      • false: MySQL の TINYINT(1) 型は CDC の TINYINT(1) 型にマッピングされます。

      このオプションは、canal-json.infer-schema.strategyMYSQL_TYPE に設定されている場合にのみ適用されます。

    • JSON フォーマットパラメーター

      パラメーター

      必須

      タイプ

      デフォルト

      説明

      json.timestamp-format.standard

      いいえ

      String

      SQL

      入力および出力データのタイムスタンプフォーマット。

      • SQL: yyyy-MM-dd HH:mm:ss.s{precision} 形式の入力タイムスタンプを解析します(例:2020-12-30 12:13:14.123)。

      • ISO-8601: yyyy-MM-ddTHH:mm:ss.s{precision} 形式の入力タイムスタンプを解析します(例:2020-12-30T12:13:14.123)。

      json.ignore-parse-errors

      いいえ

      Boolean

      false

      有効な値:

      • true: 解析例外が発生した場合、現在の行をスキップします。

      • false: エラーをスローし、ジョブの開始に失敗します。

      json.infer-schema.primitive-as-string

      いいえ

      Boolean

      false

      テーブルスキーマを解析する際に、すべてのプリミティブ型を String 型として解析するかどうかを指定します。

      有効な値:

      • true: すべてのプリミティブ型を String として解析します。

      • false: デフォルトのルールに基づいて型を解析します。

      json.infer-schema.flatten-nested-columns.enable

      いいえ

      Boolean

      false

      JSON データ内の入れ子になった列を再帰的に展開するかどうかを指定します。有効な値:

      • true: 入れ子になった列を再帰的に展開します。

      • false: 入れ子になった列を String として扱います。

      json.decode.parser-table-id.fields

      いいえ

      String

      該当なし

      JSON フォーマットでデータを解析する際に、一部の JSON フィールドの値を使用して tableId を生成するかどうかを指定します。複数のフィールドの値は、英語のカンマ , で連結されます。たとえば、JSON データが {"col0":"a", "col1","b", "col2","c"} の場合、生成される結果は以下のとおりです。

      構成

      tableId

      col0

      a

      col0,col1

      a.b

      col0,col1,col2

      a.b.c

      json.infer-schema.fixed-types

      いいえ

      String

      該当なし

      JSON データを解析する際、特定のフィールドのデータ型を指定できます。複数のフィールドはカンマ , で区切ります。たとえば、id BIGINT, name VARCHAR(10) は、id フィールドが BIGINT 型で、name フィールドが VARCHAR(10) 型であることを指定します。

      説明
      • この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。

      • Ververica Runtime (VVR) バージョン 11.5 でこの構成を使用する場合、scan.max.pre.fetch.records: 0 の構成も追加する必要があります。

  • シンクテーブルパラメーター

    パラメーター

    説明

    必須

    タイプ

    デフォルト

    備考

    type

    シンクタイプ。

    はい

    String

    なし

    値は kafka でなければなりません。

    name

    シンク名。

    いいえ

    String

    なし

    該当なし

    topic

    Kafka トピック名。

    いいえ

    String

    なし

    このパラメーターが指定されている場合、すべてのデータはこのトピックに書き込まれます。

    説明

    このパラメーターが指定されていない場合、各レコードは TableID にちなんで名付けられたトピックに書き込まれます。TableID は、データベース名とテーブル名をピリオド (.) で結合して構築されます(例:databaseName.tableName)。

    partition.strategy

    Kafka パーティションにデータを書き込む戦略。

    いいえ

    String

    all-to-zero

    有効な値:

    • all-to-zero (デフォルト): すべてのデータをパーティション 0 に書き込みます。

    • hash-by-key: プライマリキーのハッシュ値に基づいてデータをパーティションに書き込みます。これにより、同じプライマリキーを持つレコードが同じパーティションに書き込まれ、順序が保持されます。

    sink.tableId-to-topic.mapping

    上流のテーブル名から下流の Kafka トピック名へのマッピング。

    いいえ

    String

    なし

    マッピングはセミコロン (;) で区切ります。各マッピング内で、上流のテーブル名と下流の Kafka トピック名はコロン (:) で区切ります。テーブル名には正規表現を使用できます。複数のテーブルを同じトピックにマッピングするには、テーブル名をカンマ (,) で区切ります。例:mydb.mytable1:topic1;mydb.mytable2:topic2

    説明

    このパラメーターを使用すると、元のテーブル名情報を保持したまま、マッピングされたトピックを変更できます。

    • Debezium JSON フォーマットパラメーター

      パラメーター

      必須

      タイプ

      デフォルト

      説明

      debezium-json.include-schema.enabled

      いいえ

      Boolean

      false

      Debezium JSON データにスキーマ情報を含めるかどうかを指定します。

      debezium-json.emit.full-table-id.enabled

      いいえ

      Boolean

      false

      完全な 3 部構成のテーブル ID を Debezium JSON メタデータフィールドに書き込むかどうかを指定します。

      このパラメーターが有効な場合、マッピングは以下のとおりです。

      CDC テーブル ID 部

      Debezium JSON キー

      Namespace

      db

      Schema

      schema

      Table

      table

      このパラメーターが無効な場合、マッピングは以下のとおりです。

      CDC テーブル ID 部

      Debezium JSON キー

      Namespace

      マッピングなし

      Schema

      db

      Table

      table

      説明

      このパラメーターは、Ververica Runtime (VVR) 11.6 以降でのみサポートされます。

  • Kafka をデータインジェストソースとして使用する場合:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Kafka をデータインジェストシンクとして使用する場合:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    route モジュールは、ソーステーブルの送信先 Kafka トピックを指定します。

説明

デフォルトでは、ApsaraMQ for Kafka の自動トピック作成機能は無効になっています。詳細については、「自動トピック作成に関する FAQ」をご参照ください。ApsaraMQ for Kafka にデータを書き込む前に、トピックを作成する必要があります。詳細については、「手順 3:リソースの作成」をご参照ください。

スキーマ解析およびスキーマ進化に関するポリシー

Kafka コネクタは、現在認識されているすべてのテーブルのスキーマを維持します。

テーブルスキーマの初期化

テーブルスキーマ情報には、カラムおよびデータの型に関する情報、データベースおよびテーブルに関する情報、およびプライマリキーに関する情報が含まれます。以下のセクションでは、これらの 3 種類の情報をそれぞれ初期化する方法について説明します。

  • カラムおよびデータの型に関する情報

データ取り込みジョブは、データからテーブルのカラムおよびデータの型を自動的に推論できます。ただし、特定のシナリオでは、一部のテーブルに対してカラムおよびデータの型を明示的に指定したい場合があります。ユーザーが指定する型の粒度に応じて、テーブルスキーマの初期化には以下の 3 つの戦略があります。

  1. 完全自動スキーマ推論

Kafka からデータを読み取る前に、Kafka コネクタは各パーティションから最大 scan.max.pre.fetch.records 件のメッセージをフェッチし、各メッセージのスキーマを解析してマージすることでテーブルスキーマを初期化します。その後、実際のデータ消費の前に、この初期化されたスキーマに基づいてテーブル作成イベントが生成されます。

説明

Debezium JSON 形式および Canal JSON 形式では、テーブル情報が各メッセージ内に含まれています。scan.max.pre.fetch.records パラメーターに基づいて事前にフェッチされたメッセージには、複数のテーブルからのデータが含まれる可能性があります。したがって、単一のテーブルに対する事前フェッチ件数は確定できません。事前フェッチおよびスキーマ初期化は、各パーティションでメッセージの消費および処理を開始する前に 1 回のみ実行されます。後から新しいテーブルのデータが出現した場合、そのテーブルの最初のレコードから解析されたスキーマが初期スキーマとして使用され、再度の事前フェッチまたは初期化は行われません。

重要

単一テーブルのデータが複数のパーティションに分散される機能は、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされており、debezium-json.distributed-tables または canal-json.distributed-tables 構成オプションを true に設定する必要があります。

  1. 初期テーブルスキーマの指定

一部のシナリオでは、たとえば Kafka から事前に作成済みのダウンストリームテーブルへデータを書き込むために、初期テーブルスキーマを手動で指定したい場合があります。このような場合は、scan.value.initial-schemas.ddls パラメーターを追加することで指定できます。以下に構成例を示します。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # 初期テーブルスキーマを設定
  scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);

DDL 文はターゲットテーブルのスキーマと一致している必要があります。この構成では、db1.t1 テーブルの id カラムの初期型を BIGINTname カラムの初期型を VARCHAR(10) と指定し、db1.t2 テーブルの id カラムの初期型を BIGINT と指定しています。

DDL 文には Flink SQL の構文を使用します。

  1. 特定フィールドに対する固定型の設定

一部のシナリオでは、特定のフィールドに対して固定のデータの型を設定したい場合があります。たとえば、TIMESTAMP 型として推論される可能性のあるフィールドを文字列として出力したい場合などです。このような場合は、json.infer-schema.fixed-types パラメーターを追加して初期テーブルスキーマを指定できます。このパラメーターは、メッセージフォーマットが JSON の場合にのみ有効です。以下に構成例を示します。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # 特定フィールドを固定型に設定
  json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
  scan.max.pre.fetch.records: 0

この構成では、すべての id フィールドを BIGINT 型、すべての name フィールドを VARCHAR(10) 型として指定しています。

データの型は Flink SQL の型と一致します。

  • データベースおよびテーブルに関する情報

    • Canal JSON 形式および Debezium JSON 形式では、コネクタは各メッセージからデータベース名およびテーブル名を含むテーブル情報を解析します。

    • JSON 形式の場合、デフォルトではテーブル情報にはテーブル名のみが含まれます。これは、データを含むトピックの名前です。データにデータベースおよびテーブル情報が含まれる場合、json.infer-schema.fixed-types パラメーターを使用して、この情報を含むフィールドを指定できます。これらのフィールドは、データベース名およびテーブル名にマップされます。以下に設定例を示します:

      source:
        type: kafka
        name: Kafka Source
        properties.bootstrap.servers: host:9092
        topic: test-topic
        value.format: json
        scan.startup.mode: earliest-offset
        # col1 フィールドの値をデータベース名、col2 フィールドの値をテーブル名として使用
        json.decode.parser-table-id.fields: col1,col2

      この構成により、コネクタは各レコードを、データベース名が col1 フィールドの値、テーブル名が col2 フィールドの値であるテーブルに送信します。

  • プライマリキーに関する情報

    • Canal JSON 形式では、JSON データ内の pkNames フィールドがテーブルのプライマリキーを定義します。

    • Debezium JSON 形式および JSON 形式では、データにプライマリキー情報は含まれません。この場合、transform ルールを用いてテーブルにプライマリキーを手動で追加できます。

      transform:
        - source-table: \.*.\.*
          projection: \*
          primary-keys: key1, key2

スキーマ解析およびスキーマ進化

テーブルスキーマが初期化された後、schema.inference.strategy が static に設定されている場合、Kafka コネクタは初期テーブルスキーマに基づいて各メッセージの値を解析し、スキーマ変更イベントを生成しません。一方、schema.inference.strategy が continuous に設定されている場合、Kafka コネクタは各 Kafka メッセージの値を解析し、物理カラムを特定したうえで、得られたスキーマを現在維持中のスキーマと比較します。スキーマが不一致の場合、コネクタはそれらをマージしようと試み、対応するテーブルスキーマ変更イベントを生成します。マージ規則は以下のとおりです。

  • 解析された物理カラムに、現在のスキーマに存在しないフィールドが含まれている場合、これらのフィールドはスキーマに追加され、nullable カラムとして追加するイベントが生成されます。

  • 解析された物理カラムに、現在のスキーマに存在するフィールドが含まれていない場合、当該フィールドはスキーマに保持され、その値は NULL として設定されます。カラム削除イベントは生成されません。

  • 同名のカラムについては、以下のとおり処理されます。

    • カラムのデータの型が同一で精度が異なる場合、より大きな精度を持つ型が採用され、カラム型変更イベントが生成されます。

    • カラムのデータの型が異なる場合、システムは型階層ツリー内で最も小さい共通親型を検索し、その共通親型をカラムの型として採用し、カラム型変更イベントを生成します。

      image

  • サポートされるスキーマ進化ポリシー:

    • カラムの追加:コネクタは新規カラムをスキーマの末尾に追加し、そのデータを同期します。新規カラムは nullable として設定されます。

    • カラムの削除:カラム削除イベントは生成されません。代わりに、当該カラムの今後のデータはすべて NULL として設定されます。

    • カラムの名前変更:コネクタはこれを旧カラムの削除および新カラムの追加として扱います。新カラムはスキーマの末尾に追加され、元のカラムの値は NULL として設定されます。

    • カラム型の変更:

      • カラム型変更をサポートするダウンストリーム sink を使用する場合、データ取り込みジョブは、sink の設定に応じて型変更(たとえば INT から BIGINT への変更)を処理できます。この機能は、個別の sink がサポートするカラム型変更規則に依存します。サポートされる規則については、各 sink のドキュメントをご参照ください。

      • Hologres のようなカラム型変更をサポートしないダウンストリーム sink を使用する場合、ワイド型マッピング を利用できます。この機能では、ジョブ起動時にダウンストリーム sink に幅広いデータの型を持つテーブルが作成されます。カラム型が変更された場合、新規型がダウンストリーム sink で定義されたワイド型の範囲内に収まれば、システムはその変更を許容します。

  • サポートされないスキーマ変更:

    • プライマリキーまたはインデックスなどの制約の変更。

    • NOT NULL から NULLABLE へのカラム変更。

  • Canal JSON スキーマ解析

    Canal JSON データには、任意の sqlType フィールドが含まれており、これによりデータカラムの正確な型情報が記録されます。より正確なスキーマを取得するため、canal-json.infer-schema.strategySQL_TYPE に設定して、sqlType フィールドの型を利用できます。型の対応関係は以下のとおりです。

    JDBC 型

    型コード

    CDC 型

    BIT

    -7

    BOOLEAN

    BOOLEAN

    16

    TINYINT

    -6

    TINYINT

    SMALLINT

    5

    SMALLINT

    INTEGER

    4

    INT

    BIGINT

    -5

    BIGINT

    DECIMAL

    3

    DECIMAL(38,18)

    NUMERIC

    2

    REAL

    7

    FLOAT

    FLOAT

    6

    DOUBLE

    8

    DOUBLE

    BINARY

    -2

    BYTES

    VARBINARY

    -3

    LONGVARBINARY

    -4

    BLOB

    2004

    DATE

    91

    DATE

    TIME

    92

    TIME

    TIMESTAMP

    93

    TIMESTAMP

    CHAR

    1

    STRING

    VARCHAR

    12

    LONGVARCHAR

    -1

    その他のデータ型

ダーティデータの許容とコレクション

ご利用の Kafka データソースには、不正な形式のレコード(いわゆる「ダーティデータ」)が含まれている可能性があります。ジョブが頻繁に失敗・再起動するのを防ぐため、無効なレコードを無視するよう構成できます。たとえば、以下のとおりです。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # ダーティデータの許容を有効化
  ingestion.ignore-errors: true
  # 最大 1,000 件のダーティデータレコードを許容
  ingestion.error-tolerance.max-count: 1000

この構成では、最大 1,000 件のダーティレコードを許容し、ジョブを正常に実行可能にします。ただし、ダーティレコード数がこのしきい値を超えるとジョブは失敗し、データの検証が必要になります。

ダーティデータによるジョブの失敗を一切回避するには、以下の構成をご使用ください。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # ダーティデータの許容を有効化
  ingestion.ignore-errors: true
  # 全てのダーティデータレコードを許容
  ingestion.error-tolerance.max-count: -1

ダーティデータ許容ポリシーにより、無効なレコードによってジョブが失敗することを防止できます。また、Kafka データプロデューサーの動作を調整するために、このダーティデータを分析することも推奨されます。「ダーティデータのコレクション」で説明されているとおり、TaskManager のログからジョブのダーティデータを確認できます。たとえば、以下のとおりです。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # ダーティデータの許容を有効化
  ingestion.ignore-errors: true
  # 全てのダーティデータレコードを許容
  ingestion.error-tolerance.max-count: -1

pipeline:
  dirty-data.collector:
    # ダーティデータを TaskManager のログファイルに書き込む
    type: logger

テーブル名とトピックのマッピング

データ取り込みジョブの Sink として Kafka を使用する場合、Debezium JSON や Canal JSON などのメッセージフォーマットには、元のテーブル名情報が含まれます。これらのメッセージを利用するダウンストリームシステムは、多くの場合、トピック名ではなく、埋め込まれたテーブル名を実際のテーブル識別子として使用します。したがって、テーブル名とトピック間のマッピング戦略を慎重に構成する必要があります。

MySQL データベースから 2 つのテーブル (mydb.mytable1mydb.mytable2) を同期する必要があるとします。以下のマッピング戦略が利用可能です:

1. マッピング戦略なし

マッピング戦略がない場合、各テーブルのデータは <データベース名>.<テーブル名> というフォーマットの名前を持つトピックに書き込まれます。したがって、mydb.mytable1 のデータは mydb.mytable1 という名前のトピックに、mydb.mytable2 のデータは mydb.mytable2 という名前のトピックに書き込まれます。以下に構成例を示します:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. ルート ルールによるマッピング (非推奨)

デフォルトの <データベース名>.<テーブル名> フォーマットを使用する代わりに、特定のトピックにデータを書き込みたい場合があります。これを行うには、ルート ルールを構成します。以下に構成例を示します:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
 route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable

この場合、mydb.mytable1mydb.mytable2 のすべてのデータは、mytable という名前の単一のトピックに書き込まれます。

ただし、ルート ルールを使用して送信先トピックを変更すると、Kafka メッセージ (Debezium JSON または Canal JSON フォーマット) 内のテーブル名も変更されます。この場合、すべての Kafka メッセージ内のテーブル名は mytable になります。これにより、このトピックからメッセージを利用するシステムで予期しない動作が発生する可能性があります。

3. sink.tableId-to-topic.mapping を使用したマッピング (推奨)

元のソーステーブル名を保持したままテーブル名をトピックにマッピングするには、`sink.tableId-to-topic.mapping` パラメーターを使用します。以下に構成例を示します:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

または、以下の構成を使用することもできます:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

この場合、mydb.mytable1mydb.mytable2 のすべてのデータは mytable トピックに書き込まれ、Kafka メッセージ (Debezium JSON または Canal JSON フォーマット) 内のテーブル名は mydb.mytable1 または mydb.mytable2 として保持されます。これにより、ダウンストリームシステムは元のソーステーブル名を正しく取得できます。

EXACTLY_ONCE セマンティクス

  • コンシューマーの隔離レベルの設定

    Kafka データを消費するすべてのアプリケーションは、isolation.level プロパティを設定する必要があります。

    • read_committed:コミット済みのデータのみを読み取ります。

    • read_uncommitted(デフォルト):未コミットのデータも読み取ることができます。

    EXACTLY_ONCE は read_committed に依存します。それ以外の場合、コンシューマーが未コミットのデータを参照し、一貫性が損なわれる可能性があります。

  • トランザクションのタイムアウトとデータ喪失

    チェックポイントからの回復時、Realtime Compute for Apache Flink は、そのチェックポイントの開始前にコミットされたトランザクションのみを考慮します。ジョブの障害発生から再起動までの持続時間が Kafka のトランザクションタイムアウトを超える場合、Kafka は自動的にオープン中のトランザクションを中止し、データ喪失を引き起こす可能性があります。

    • Kafka ブローカーにおける transaction.max.timeout.ms のデフォルト値は 15 分です。

    • Flink Kafka Sink では、デフォルトで transaction.timeout.ms パラメーターが 1 時間に設定されます。

    • ブローカー側の transaction.max.timeout.ms を、Flink 側の設定値以上(等しいか大きい値)に増加させる必要があります。

  • プロデューサープールと同時チェックポイント

    EXACTLY_ONCE モードでは、固定サイズの Kafka プロデューサープールが使用されます。各チェックポイントは、このプールから 1 つのプロデューサーを使用します。同時実行中のチェックポイント数がプールサイズを超えると、ジョブは失敗します。

    プロデューサープールのサイズは、同時実行可能なチェックポイントの最大数に基づいて設定してください。

  • 並列処理数のスケールダウン制約

    最初のチェックポイントが完了する前にジョブが失敗した場合、再起動時に元のプロデューサープール情報が失われます。したがって、最初のチェックポイント完了前に、ジョブの並列処理数をスケールダウンしないでください。スケールダウンがやむを得ず必要となる場合は、新しい並列処理数が FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR を下回らないようにしてください。

  • トランザクションによる読み取りのブロック

    read_committed モードでは、コミットまたは中止されていないトランザクションが存在すると、トピック全体の読み取り操作がブロックされます。

    たとえば:

    • トランザクション 1 がデータを書き込みます。

    • トランザクション 2 がさらにデータを書き込み、コミットされます。

    • トランザクション 1 がオープンのままの場合、コミット済みのトランザクション 2 のデータはコンシューマーに対して非表示のままとなります。

    これは以下の影響を及ぼします:

    • 通常運用時において、データの可視性遅延はおおよそチェックポイント間隔と同等になります。

    • ジョブが失敗した場合、当該ジョブが書き込んでいたトピックは、ジョブの再起動またはトランザクションのタイムアウトが発生するまで、コンシューマーによる読み取りがブロックされます。極端なケースでは、トランザクションタイムアウト処理自体が読み取り操作に影響を与える可能性もあります。

よくある質問