すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Message Queue for Kafka

最終更新日:Feb 07, 2026

このトピックでは、Kafka コネクタの使用方法について説明します。

背景情報

Apache Kafka は、高性能なデータ処理、ストリーミング分析、データ統合など、ビッグデータアプリケーションで広く使用されているオープンソースの分散メッセージキューサービスです。Kafka コネクタは、Apache Kafka クライアントを活用することで、高スループットのデータインジェストとエグレス、複数形式のデータに対する読み書き操作、および Realtime Compute for Apache Flink のための exactly-once セマンティクスをサポートします。

カテゴリ

説明

サポートタイプ

ソーステーブル、シンクテーブル、データインジェストシンク

実行モード

ストリーミングモード

データ形式

サポートされるデータ形式

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

説明
  • Protobuf は、Ververica Runtime (VVR) 8.0.9 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

  • サポートされている各データ形式には、WITH 句で直接使用できる対応する設定オプションが含まれています。詳細については、「Flink ドキュメント」をご参照ください。

固有の監視メトリック

特定の監視メトリック

  • ソーステーブル

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • シンクテーブル

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「監視メトリック」をご参照ください。

API タイプ

SQL API、DataStream API、およびデータインジェスト YAML API

シンクテーブルのデータを更新または削除できますか?

シンクテーブルのデータを更新または削除することはできません。シンクテーブルにはデータを挿入することしかできません。

説明

データの更新または削除に関連する機能の詳細については、「Upsert Kafka」をご参照ください。

前提条件

要件に応じて、次のいずれかの方法でクラスターに接続できます:

  • ApsaraMQ for Kafka クラスターへの接続

    • ApsaraMQ for Kafka クラスターのバージョンは 0.11 以降です。

    • ApsaraMQ for Kafka クラスターが作成されていること。詳細については、「リソースの作成」をご参照ください。

    • Realtime Compute for Apache Flink ワークスペースが ApsaraMQ for Kafka クラスターと同じ Virtual Private Cloud (VPC) 内にあり、Realtime Compute for Apache Flink VPC の CIDR ブロックが ApsaraMQ for Kafka クラスターのホワイトリストに追加されていること。ApsaraMQ for Kafka のホワイトリストの設定方法については、「ホワイトリストの設定」をご参照ください。

    重要

    ApsaraMQ for Kafka にデータを書き込む際は、次の点にご注意ください:

    • ApsaraMQ for Kafka は、データ書き込み時の Zstandard 圧縮アルゴリズムをサポートしていません。

    • ApsaraMQ for Kafka は、べき等またはトランザクション書き込み操作をサポートしていません。そのため、Kafka シンクテーブルがサポートする exactly-once セマンティクスは使用できません。ご利用の Realtime Compute for Apache Flink ジョブが Ververica Runtime (VVR) 8.0.0 以降を使用している場合は、シンクテーブルに対して properties.enable.idempotence=false を設定して、べき等書き込み機能を無効にしてください。ApsaraMQ for Kafka のストレージエンジンの比較と制限事項の詳細については、「ストレージエンジンの比較」をご参照ください。

  • セルフマネージド Apache Kafka クラスターへの接続

    • セルフマネージド Apache Kafka クラスターのバージョンは 0.11 以降です。

    • Flink とセルフマネージド Apache Kafka クラスター間にネットワーク接続が存在すること。パブリックネットワーク経由でセルフマネージド Apache Kafka クラスターに接続する方法の詳細については、「ネットワーク接続オプション」をご参照ください。

    • Apache Kafka 2.8 クライアントオプションのみがサポートされています。詳細については、Apache Kafka の Consumer Configs および Producer Configs ドキュメントをご参照ください。

注意事項

現在、Flink および Kafka コミュニティの設計上の制限により、トランザクション書き込みは推奨されませんsink.delivery-guarantee = exactly-once を設定すると、Kafka コネクタはトランザクション書き込みを有効にしますが、これには 3 つの既知の問題があります:

  • 各チェックポイントは新しいトランザクション ID を生成します。チェックポイントの間隔が短すぎると、トランザクション ID の数が過剰になります。その結果、Kafka コーディネーターのメモリが不足し、Kafka クラスターの安定性が損なわれる可能性があります。

  • 各トランザクションはプロデューサーインスタンスを作成します。同時にコミットされるトランザクションが多すぎると、TaskManager のメモリが不足し、Flink ジョブが不安定になる可能性があります。

  • 複数の Flink ジョブが同じ sink.transactional-id-prefix を使用すると、生成されるトランザクション ID が競合する可能性があります。あるジョブが書き込みに失敗すると、Kafka パーティションの Log Start Offset (LSO) の進行がブロックされ、そのパーティションから読み取るすべてのコンシューマーに影響を与えます。

exactly-once セマンティクスが必要な場合は、Upsert Kafka を使用してプライマリキーテーブルに書き込み、プライマリキーに依存してべき等性を確保してください。トランザクション書き込みを使用する場合は、「Exactly-Once セマンティクスに関する注意事項」をご参照ください。

ネットワーク接続のトラブルシューティング

Flink ジョブの起動中に Timed out waiting for a node assignment というエラーが報告された場合、これは通常、Flink と Kafka の間のネットワーク接続に問題があることを示します。

Kafka クライアントは、以下のようにサーバーに接続します:

  1. bootstrap.servers のアドレスを使用して Kafka に接続します。

  2. Kafka は、クラスター内の各ブローカーのメタデータ (エンドポイントなど) を返します。

  3. クライアントは、返されたエンドポイントを使用して各ブローカーに接続し、データを生成または消費します。

bootstrap.servers のアドレスに到達できても、Kafka が不正なブローカーアドレスを返した場合、クライアントはブローカーへの読み書きができません。この問題は、プロキシ、ポートフォワーディング、または専用線を使用するネットワークアーキテクチャで一般的に発生します。

トラブルシューティング手順

ApsaraMQ for Kafka

  1. アクセスポイントタイプの確認

    • デフォルトアクセスポイント (内部ネットワーク)

    • SASL アクセスポイント (認証付き内部ネットワーク)

    • パブリックネットワークアクセスポイント (別途申請が必要)

    Flink 開発コンソールを使用して ネットワークプローブ を実行し、bootstrap.servers アドレスとの接続問題を排除します。

  2. セキュリティグループとホワイトリストの確認

    Kafka インスタンスは、Flink VPC をホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」および「ホワイトリストの設定」をご参照ください。

  3. SASL 設定の確認 (有効な場合)

    SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗し、タイムアウトとして現れることがあります。詳細については、「セキュリティと認証」をご参照ください。

セルフマネージド Kafka (ECS)

  1. Flink 開発コンソールを使用してネットワークプローブを実行する。

    bootstrap.servers アドレスとの接続問題を排除し、パブリックおよび内部エンドポイントが正しいことを確認します。

  2. セキュリティグループとホワイトリストの確認

    • ECS セキュリティグループは、Kafka アクセスポイントポート (通常 9092 または 9093) でのトラフィックを許可する必要があります。

    • Flink が配置されている VPC は、ECS インスタンスのホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」をご参照ください。

  3. 設定のトラブルシューティング

    1. Kafka が使用する ZooKeeper クラスターにログインし、zkCli.sh または zookeeper-shell.sh ツールを使用します。

    2. コマンドを実行してブローカーのメタデータを取得します。例:get /brokers/ids/0。応答の endpoints フィールドで、Kafka がクライアントにアドバタイズするアドレスを見つけます。

      example

    3. Flink 開発コンソールで ネットワークプローブ を実行し、アドレスが到達可能かどうかをテストします。

      説明
      • アドレスが到達不可能な場合は、Kafka の運用保守エンジニアに連絡して listeners および advertised.listeners の設定を確認・修正し、返されたアドレスが Flink からアクセスできるようにしてください。

      • Kafka クライアントとサーバーの接続性の詳細については、「接続性のトラブルシューティング」をご参照ください。

  4. SASL 設定の確認 (有効な場合)

    SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗したり、タイムアウトとして現れたりします。詳細については、「セキュリティと認証」をご参照ください。

SQL

Kafka コネクタは、SQL ジョブでソーステーブルまたはシンクテーブルとして使用できます。

構文

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

メタデータ列

Kafka ソーステーブルまたは Kafka シンクテーブルにメタデータ列を定義して、Kafka メッセージのメタデータを取得できます。たとえば、Kafka ソーステーブルの WITH 句で複数のトピックが定義され、ソーステーブルにメタデータ列が定義されている場合、Flink がデータを読み取るトピックがマークされます。次のサンプルコードは、メタデータ列の使用方法の例です:

CREATE TABLE kafka_source (
  -- メッセージが属するトピックを record_topic フィールドの値として読み取ります。
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- ConsumerRecord のタイムスタンプを ts フィールドの値として読み取ります。
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- メッセージのオフセットを record_offset フィールドの値として読み取ります。
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- ts フィールドのタイムスタンプを ProducerRecord のタイムスタンプとして Kafka に書き込みます。
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

次の表は、Kafka ソーステーブルとシンクテーブルでサポートされているメタデータ列について説明します。

キー

データの型

説明

ソーステーブルまたはシンクテーブル

topic

STRING NOT NULL METADATA VIRTUAL

Kafka メッセージが属するトピックの名前。

ソーステーブル

partition

INT NOT NULL METADATA VIRTUAL

Kafka メッセージが属するパーティションの ID。

ソーステーブル

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Kafka メッセージのヘッダー。

ソーステーブルとシンクテーブル

leader-epoch

INT NOT NULL METADATA VIRTUAL

Kafka メッセージのリーダーエポック。

ソーステーブル

offset

BIGINT NOT NULL METADATA VIRTUAL

Kafka メッセージのオフセット。

ソーステーブル

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

Kafka メッセージのタイムスタンプ。

ソーステーブルとシンクテーブル

timestamp-type

STRING NOT NULL METADATA VIRTUAL

Kafka メッセージのタイムスタンプタイプ:

  • NoTimestampType:メッセージにタイムスタンプが定義されていないことを示します。

  • CreateTime:メッセージが生成された時刻を示します。

  • LogAppendTime:メッセージが Kafka ブローカーに追加された時刻を示します。

ソーステーブル

__raw_key__

STRING NOT NULL METADATA VIRTUAL

Kafka メッセージの生のキーフィールド。

ソーステーブルとシンクテーブル

説明

このパラメーターは、VVR 11.4 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

__raw_value__

STRING NOT NULL METADATA VIRTUAL

Kafka メッセージの生の値フィールド。

ソーステーブルとシンクテーブル

説明

このパラメーターは、VVR 11.4 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

WITH パラメーター

  • 一般

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    String

    はい

    なし

    値を kafka に設定します。

    properties.bootstrap.servers

    Kafka ブローカーの IP アドレスとポート番号。

    String

    はい

    なし

    フォーマット:host:port,host:port,host:port。複数の host:port ペアはカンマ (,) で区切ります。

    properties.*

    Kafka クライアント用に設定されるオプション。

    String

    いいえ

    なし

    サフィックスは、Producer Configs および Consumer Configs で定義されているルールに準拠する必要があります。

    Flink は "properties." プレフィックスを削除し、残りの設定を Kafka クライアントに渡します。たとえば、'properties.allow.auto.create.topics'='false' を使用して、自動トピック作成を無効にできます。

    以下の設定は、Kafka コネクタが上書きするため、この方法では変更できません:

    • key.deserializer

    • value.deserializer

    format

    Kafka メッセージの値フィールドを読み書きするために使用されるフォーマット。

    String

    いいえ

    デフォルト値なし。

    サポートされるフォーマット

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    説明

    フォーマットオプションの詳細については、「フォーマットオプション」をご参照ください。

    key.format

    Kafka メッセージのキーフィールドを読み書きするために使用されるフォーマット。

    String

    いいえ

    デフォルト値なし。

    サポートされるフォーマット

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    説明

    この設定を使用する場合、key.options 設定が必要です。

    key.fields

    ソーステーブルまたはシンクテーブルのキーフィールドで、Kafka メッセージのキーフィールドに対応するもの。

    String

    いいえ

    なし

    複数のフィールド名はセミコロン (;) で区切ります。例:field1;field2

    key.fields-prefix

    Kafka メッセージのすべてのキーフィールドのカスタムプレフィックス。このオプションを設定して、値フィールドとの名前の競合を防ぐことができます。

    String

    いいえ

    なし

    このオプションは、ソーステーブルとシンクテーブルの列名を区別するためにのみ使用されます。Kafka メッセージのキーフィールドが解析または生成されるときに、プレフィックスは列名から削除されます。

    説明

    この設定を使用する場合、value.fields-include オプションを EXCEPT_KEY に設定する必要があります。

    value.format

    Kafka メッセージの値フィールドを読み書きするために使用されるフォーマット。

    String

    いいえ

    なし

    この設定は format と同等であり、format または value.format のいずれか一方のみを設定できます。両方を設定した場合、value.formatformat を上書きします。

    value.fields-include

    Kafka メッセージの値を解析または生成する際に、対応するメッセージキーを含めるかどうかを指定します。

    String

    いいえ

    ALL

    有効な値:

    • ALL (デフォルト):すべてのフィールドが Kafka メッセージの値として処理されます。

    • EXCEPT_KEY:key.fields オプションで指定されたフィールドを除くすべてのフィールドが Kafka メッセージの値として処理されます。

  • ソーステーブル

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    備考

    topic

    データを読み取るトピックの名前。

    String

    いいえ

    なし

    複数のトピック名はセミコロン (;) で区切ります。例:topic-1 と topic-2。

    説明

    topic オプションは topic-pattern オプションと併用できません。

    topic-pattern

    トピックを照合するために使用される正規表現。デプロイメントの実行中に、指定された正規表現に名前が一致するすべてのトピックのデータが読み取られます。

    String

    いいえ

    なし

    説明

    topic オプションは topic-pattern オプションと併用できません。

    properties.group.id

    コンシューマーグループ ID。

    String

    いいえ

    KafkaSource-{ソーステーブル名}

    指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。

    scan.startup.mode

    Kafka がデータを読み取る開始オフセット。

    String

    いいえ

    group-offsets

    有効な値:

    • earliest-offset:Kafka は最も古いパーティションからデータを読み取ります。

    • latest-offset:Kafka は最新のオフセットからデータを読み取ります。

    • group-offsets (デフォルト):properties.group.id オプションで指定された ID を持つコンシューマーグループによってコミットされたオフセットからデータを読み取ります。

    • timestampscan.startup.timestamp-millis で指定されたタイムスタンプからデータを読み取ります。

    • specific-offsetsscan.startup.specific-offsets オプションで指定されたオフセットからデータを読み取ります。

    説明

    このオプションは、ステートなしでデプロイメントが開始された場合に有効になります。デプロイメントがチェックポイントから再開されたり、指定されたステートから再開されたりすると、デプロイメントは優先的にステートデータに保存されている進行状況でデータの読み取りを開始します。

    scan.startup.specific-offsets

    scan.startup.mode オプションが specific-offsets に設定されている場合の各パーティションの開始オフセット。

    String

    いいえ

    なし

    例:partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    scan.startup.mode オプションが timestamp に設定されている場合の開始オフセットのタイムスタンプ。

    Long

    いいえ

    なし

    単位:ミリ秒。

    scan.topic-partition-discovery.interval

    Kafka のトピックとパーティションを動的に検出する時間間隔。

    Duration

    いいえ

    5 分

    デフォルトのパーティション検出間隔は 5 分です。この機能を無効にするには、このオプションを明示的に非正の値に設定する必要があります。動的パーティション検出機能が有効になると、Kafka ソースは新しいパーティションを自動的に検出し、そのパーティションからデータを読み取ることができます。topic-pattern モードでは、Kafka ソースは既存のトピックの新しいパーティションからデータを読み取り、正規表現に一致する新しいトピックのすべてのパーティションからデータを読み取ります。

    説明

    VVR 6.0.X を使用する Realtime Compute for Apache Flink では、動的パーティション検出機能はデフォルトで無効になっています。VVR 8.0 以降を使用する Realtime Compute for Apache Flink では、この機能はデフォルトで有効になっています。デフォルトのパーティション検出間隔は 5 分です。

    scan.header-filter

    データが特定のメッセージヘッダーを含むかどうかに基づく Kafka データフィルタリング。

    String

    いいえ

    なし

    ヘッダーキーと値はコロン (:) で区切ります。複数のヘッダーは AND (&) や OR (|) などの論理演算子で区切ります。論理演算子 NOT (!) がサポートされています。たとえば、depart:toy|depart:book&!env:test は、ヘッダーに depart=toy または depart=book を含み、かつ env=test を含まない Kafka データを保持することを示します。

    説明
    • このオプションは、VVR 8.0.6 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    • 括弧操作はサポートされていません。

    • 論理演算は左から右へ順に実行されます。

    • UTF-8 形式のヘッダー値は文字列に変換され、scan.header-filter オプションで指定されたヘッダー値と比較されます。

    scan.check.duplicated.group.id

    properties.group.id パラメーターで指定された重複するコンシューマーグループをチェックするかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true:ジョブが開始する前に重複するコンシューマーグループをチェックします。重複するコンシューマーグループが存在する場合、エラーを報告し、競合を防ぐためにジョブを一時停止します。

    • false:ジョブが開始する前に重複するコンシューマーグループをチェックしません。

    説明

    このオプションは、VVR 6.0.4 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

  • シンク固有

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    備考

    topic

    データが書き込まれるトピックの名前。

    String

    はい

    なし

    N/A

    sink.partitioner

    Flink の並列度を Kafka のパーティションにマッピングするパターン。

    String

    いいえ

    default

    有効な値:

    • default (デフォルト):デフォルトの Kafka パーティショナーを使用してデータをパーティション分割します。

    • fixed:各 Flink パーティションは固定の Kafka パーティションに対応します。

    • round-robin:Flink パーティション内のデータは、ラウンドロビンシーケンスで Kafka パーティションに分散されます。

    • カスタムパーティションマッピングパターン:FlinkKafkaPartitioner のサブクラスを作成して、カスタムパーティションマッピングパターンを設定できます。例:org.mycompany.MyPartitioner。

    sink.delivery-guarantee

    Kafka シンクテーブルのセマンティックパターン。

    String

    いいえ

    at-least-once

    有効な値:

    • none:配信セマンティクスは保証されません。データが失われたり、重複したりする可能性があります。

    • at-least-once (デフォルト):データが失われないことを保証します。ただし、データが重複する可能性があります。

    • exactly-once:Kafka トランザクションを使用して exactly-once セマンティクスを保証します。これにより、データが失われたり、重複したりしないことが保証されます。

    説明

    このオプションを exactly-once に設定する場合は、sink.transactional-id-prefix オプションを設定する必要があります。

    sink.transactional-id-prefix

    exactly-once セマンティクスで使用される Kafka トランザクション ID のプレフィックス。

    String

    いいえ

    なし

    このオプションは、sink.delivery-guarantee オプションが exactly-once に設定されている場合にのみ有効です。

    sink.parallelism

    Kafka シンクテーブルのオペレーターの並列度。

    Integer

    いいえ

    なし

    フレームワークによって決定される、上流オペレーターの並列度。

セキュリティと認証

ご利用の Kafka クラスターが安全な接続または認証を必要とする場合は、セキュリティおよび認証オプションの名前に properties. プレフィックスを追加し、WITH 句でそれらを設定します。次のサンプルコードは、Kafka テーブルが Simple Authentication and Security Layer (SASL) メカニズムとして PLAIN を使用し、Java Authentication and Authorization Service (JAAS) 設定を提供する方法を示しています。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

次のサンプルコードは、Kafka テーブルがセキュリティプロトコルとして SASL_SSL を、SASL メカニズムとして SCRAM-SHA-256 を使用するように設定する方法を示しています。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*Secure Sockets Layer (SSL) を設定します。*/
  /*サーバーから提供された CA 証明書トラストストアのパスを指定します。*/
  /*アップロードされたアーティファクトは /flink/usrlib/ に保存されます。*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*クライアント認証が必要な場合は、秘密鍵ファイルキーストアのパスを指定します。*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*クライアントがサーバーアドレスを検証するために使用するアルゴリズム。null 値はサーバーアドレス検証が無効であることを示します。*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*SASL を設定します。*/
  /*SASL メカニズムとして SCRAM-SHA-256 を設定します。*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*JAAS を設定します。*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

開発コンソールの アーティファクト 機能を使用して、例の CA 証明書と秘密鍵をアップロードできます。アップロード後、ファイルは /flink/usrlib ディレクトリに保存されます。使用したい CA 証明書ファイルの名前が my-truststore.jks の場合、WITH 句の 'properties.ssl.truststore.location' パラメーターを次の 2 つの方法で設定して、この証明書を使用できます:

  • 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' を設定した場合、Flink は実行時に OSS ファイルを動的にダウンロードする必要はありませんが、デバッグモードはサポートされません。

  • リアルタイムコンピューティングエンジンバージョン Ververica Runtime (VVR) 11.5 以降では、properties.ssl.truststore.locationproperties.ssl.keystore.location を OSS 絶対パスで設定できます。ファイルパスの形式は oss://flink-fullymanaged-<workspace ID>/artifacts/namespaces/<project name>/<file name> です。この方法は、Flink の実行時に OSS ファイルを動的にダウンロードし、デバッグモードをサポートします。

説明
  • 設定の確認:上記のコードスニペットは、ほとんどの設定シナリオに適用されます。Kafka コネクタを設定する前に、Kafka サーバーの運用保守担当者に連絡して、正しいセキュリティおよび認証設定情報を取得してください。

  • エスケープに関する注意:Apache Flink とは異なり、Realtime Compute for Apache Flink の SQL エディターはデフォルトで二重引用符 (") をエスケープします。そのため、properties.sasl.jaas.config オプションを設定する際に、ユーザー名とパスワードを囲む二重引用符 (") にバックスラッシュ (\) をエスケープ文字として追加する必要はありません。

Kafka ソーステーブルの開始オフセット

起動モード

scan.startup.mode パラメーターを設定して、Kafka ソーステーブルの初期読み取りオフセットを指定できます:

  • earliest-offset:現在のパーティションの最も古いオフセットからデータを読み取ります。

  • latest-offset:現在のパーティションの最新のオフセットからデータを読み取ります。

  • group-offsets:properties.group.id オプションで指定された ID を持つコンシューマーグループによってコミットされたオフセットからデータを読み取ります。

  • timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプ以上の最初のメッセージからデータを読み取ります。

  • specific-offsets:scan.startup.specific-offsets オプションで指定されたパーティションオフセットからデータを読み取ります。

説明
  • 開始オフセットを指定しない場合、デフォルトで group-offsets が使用されます。

  • scan.startup.mode はステートレスジョブに対してのみ有効です。ステートフルジョブの場合、消費はステートに保存されているオフセットから開始されます。

サンプルコード:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- 最も古いオフセットからデータを消費します。
  'scan.startup.mode' = 'earliest-offset',
  -- 最新のオフセットからデータを消費します。
  'scan.startup.mode' = 'latest-offset',
  -- コンシューマーグループ my-group によってコミットされたオフセットからデータを消費します。
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- my-group を初めて使用する場合、消費は最も古いオフセットから開始されます。
  'properties.auto.offset.reset' = 'latest', -- my-group を初めて使用する場合、消費は最新のオフセットから開始されます。
  -- タイムスタンプ 1655395200000 (ミリ秒) からデータを消費します。
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- 指定されたオフセットからデータを消費します。
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

開始オフセットの優先順位

ソーステーブルの開始オフセットの優先順位は次のとおりです:

優先度 (高い順)

チェックポイントまたはセーブポイントに保存されているオフセット。

コンソールで指定された開始時刻。

WITH パラメーターの scan.startup.mode パラメーターで指定された開始オフセット。

scan.startup.mode が指定されていない場合、システムは group-offsets を使用し、対応するコンシューマーグループのオフセットからデータを消費します。

上記のいずれかのステップでオフセットが期限切れまたは Kafka クラスターの問題で無効になった場合、properties.auto.offset.reset で指定されたリセットポリシーが使用されます。この設定項目が設定されていない場合、ユーザーの介入が必要な例外がスローされます。

ほとんどの場合、Kafka ソーステーブルは新しいグループ ID を持つコンシューマーグループによってコミットされたオフセットからデータの読み取りを開始します。Kafka ソーステーブルが Kafka クラスターでコンシューマーグループによってコミットされたオフセットをクエリすると、グループ ID が初めて使用されるため、有効なオフセットは返されません。この場合、properties.auto.offset.reset パラメーターで設定されたリセット戦略がオフセットのリセットに使用されます。したがって、オフセットリセット戦略を指定するために properties.auto.offset.reset パラメーターを設定する必要があります。

ソーステーブルのオフセット送信

Kafka ソーステーブルは、チェックポイント操作が成功した後にのみ、コンシューマーオフセットを Kafka クラスターにコミットします。指定したチェックポイント間隔が長すぎると、コンシューマーオフセットの Kafka クラスターへのコミットが遅延します。チェックポイント操作中、Kafka ソーステーブルは現在のデータ読み取りの進行状況をステートバックエンドに保存します。Kafka クラスターにコミットされたオフセットは、障害回復には使用されません。コミットされたオフセットは、Kafka でのデータ読み取りの進行状況を監視するためにのみ使用されます。オフセットのコミットに失敗しても、データの正確性には影響しません。

シンクテーブルのカスタムパーティショナー

組み込みの Kafka プロデューサーパーティショナーが要件を満たさない場合は、カスタムパーティショナーを実装して特定のパーティションにデータを書き込むことができます。カスタムパーティショナーは FlinkKafkaPartitioner を継承する必要があります。開発後、JAR パッケージをコンパイルし、ファイル管理 機能を使用してリアルタイムコンピューティングコンソールにアップロードします。JAR パッケージがアップロードされ、参照された後、WITH 句で sink.partitioner パラメーターを設定します。パラメーター値は、パーティショナーの完全なクラスパスである必要があります。例:org.mycompany.MyPartitioner

Kafka、Upsert Kafka、Kafka JSON カタログの比較

Kafka はデータ挿入のみをサポートし、更新や削除をサポートしないメッセージキューシステムです。そのため、Kafka は上流システムからの Change Data Capture (CDC) データや、ストリーミング SQL 計算中の集計や結合などのオペレーターからのリトラクションロジックを処理できません。変更データやリトラクションデータを含むデータを Kafka に書き込みたい場合は、変更データを特別に処理する Upsert Kafka シンクテーブルを使用します。

上流データベースの 1 つまたは複数のデータテーブルから Kafka に変更データをバッチで同期したい場合は、Kafka JSON カタログを使用できます。Kafka に保存されているデータが JSON 形式の場合、Kafka JSON カタログを使用できます。これにより、WITH 句でスキーマとオプションを設定する必要がなくなります。詳細については、「Kafka JSON カタログの管理」をご参照ください。

例 1:Kafka トピックからデータを読み取り、別の Kafka トピックにデータを書き込む

次のコードサンプルは、ソース Kafka トピックからデータを読み取り、シンク Kafka トピックに書き込みます。データは CSV 形式です。

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

例 2:テーブルスキーマとデータの同期

Kafka コネクタを使用して、Kafka トピックから Hologres にメッセージをリアルタイムで同期します。Kafka メッセージのオフセットとパーティション ID をプライマリキーとして設定することで、フェールオーバー時に Hologres でのメッセージの重複を回避します。

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- オプション。すべてのネストされた列を展開します。
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

例 3:Kafka メッセージのキー列と値列のテーブルスキーマとデータの同期

Kafka メッセージのキーフィールドには関連情報が保存されています。Kafka メッセージのキー列と値列の両方のデータを同時に同期できます。

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
説明

Kafka メッセージキーはスキーマ進化と型解析をサポートしていません。手動での宣言が必要です。

例 4:テーブルスキーマとデータの同期および計算の実行

Kafka から Hologres にデータを同期する際、軽量な計算が必要です。

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- COALESCE を使用して null 値を処理します。

例 5:ネストされた JSON データの解析

JSON メッセージのサンプル

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

JSON_VALUE(payload, '$.properties.owner') のような関数を使用してフィールドを解析する必要を避けるために、Source DDL で直接構造を定義できます:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

その結果、Flink は読み取りフェーズで JSON を構造化フィールドに解析し、後続の SQL クエリは追加の関数呼び出しなしで直接 properties.owner を使用するため、全体的なパフォーマンスが向上します。

DataStream API

重要

DataStream API を使用してデータを読み書きする場合は、関連タイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。

  • Kafka ソースの作成

    Kafka ソースは、KafkaSource のインスタンスを作成するためのビルダークラスを提供します。次のサンプルコードは、"input-topic" トピックの最も古いオフセットからメッセージを消費し、コンシューマーグループ名を my-group とし、Kafka メッセージ本文を文字列として逆シリアル化する Kafka ソースを構築する方法を示しています。

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    KafkaSource を作成する際には、以下のパラメーターを指定する必要があります。

    パラメーター

    説明

    BootstrapServers

    Kafka ブローカーのアドレス。setBootstrapServers(String) 操作を呼び出してアドレスを設定できます。

    GroupId

    コンシューマーグループの ID。setGroupId(String) メソッドを呼び出して ID を設定できます。

    Topics または Partition

    サブスクライブするトピックまたはパーティションの名前。以下のサブスクリプションパターンのいずれかを使用して、トピックまたはパーティションにサブスクライブするように Kafka ソースを設定できます:

    • トピックリスト。トピックリストを設定すると、Kafka ソースは指定されたトピックのすべてのパーティションにサブスクライブします。

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • トピックパターン。正規表現を指定すると、Kafka ソースは指定された正規表現に一致するトピックのすべてのパーティションにサブスクライブします。

      KafkaSource.builder().setTopicPattern("topic.*")
    • パーティションリスト。パーティションリストを設定すると、Kafka ソースは指定されたパーティションにサブスクライブします。

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // "topic-a" トピックのパーティション 0
              new TopicPartition("topic-b", 5)));  // "topic-b" トピックのパーティション 5
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    Kafka メッセージを逆シリアル化するデシリアライザ。

    setDeserializer(KafkaRecordDeserializationSchema) メソッドを呼び出してデシリアライザを指定できます。KafkaRecordDeserializationSchema インターフェイスは、ConsumerRecord オブジェクトがどのように逆シリアル化されるかを定義します。ConsumerRecord オブジェクトの Kafka メッセージの Value フィールドのみを逆シリアル化するには、以下のいずれかの方法を使用できます:

    • Kafka ソースは setValueOnlyDeserializer(DeserializationSchema) メソッドを提供します。DeserializationSchema クラスは、バイナリ値として保存されている Kafka メッセージがどのように逆シリアル化されるかを定義します。

    • Kafka の Deserializer インターフェイスを実装するクラスを使用します。たとえば、StringDeserializer クラスを使用してメッセージを文字列に逆シリアル化できます。

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    説明

    ConsumerRecord オブジェクトを逆シリアル化したい場合は、KafkaRecordDeserializationSchema インターフェイスを実装するクラスを作成する必要があります。

    XML

    Kafka DataStream コネクタは Maven 中央リポジトリに保存されています。

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    Kafka DataStream コネクタを使用する際には、以下の Kafka プロパティに精通している必要があります:

    • 開始オフセット

      Kafka ソースがデータの読み取りを開始する際に、オフセットイニシャライザを使用してオフセットを指定できます。オフセットイニシャライザは、OffsetsInitializer インターフェイスを実装するオブジェクトです。KafkaSource クラスは、以下の組み込みオフセットイニシャライザを提供します。

      オフセットイニシャライザ

      コード設定

      最も古いオフセットからデータを読み取ります。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      最新のオフセットからデータを読み取ります。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      指定された時刻 (ミリ秒) 以上のタイムスタンプでデータの消費を開始します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      コンシューマーグループによってコミットされたオフセットから消費します。そのようなオフセットが存在しない場合は、最も古いオフセットを使用します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      各パーティションのコミットされたオフセットからデータを読み取り、リセット戦略は指定されません。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      説明
      • 組み込みのオフセットイニシャライザがビジネス要件を満たさない場合は、カスタムオフセットイニシャライザを作成できます。

      • オフセットイニシャライザを指定しない場合、デフォルトで OffsetsInitializer.earliest() オフセットイニシャライザが使用されます。

    • ストリーミング実行モードとバッチ実行モード

      Kafka ソースは、ストリーミングモードまたはバッチモードで動作できます。デフォルトでは、Kafka ソースはストリーミングモードで動作します。このモードでは、デプロイメントは失敗するかキャンセルされるまで実行し続けます。Kafka ソースをバッチモードで動作させたい場合は、setBounded(OffsetsInitializer) メソッドを呼び出して停止オフセットを指定できます。すべてのパーティションが停止オフセットに達すると、Kafka ソースは終了します。

      説明

      ほとんどの場合、ストリーミングモードで動作する Kafka ソースには停止オフセットがありません。ストリーミングモードで動作する Kafka ソースをデバッグしたい場合は、setUnbounded(OffsetsInitializer) メソッドを呼び出して停止オフセットを指定できます。停止オフセットを指定する方法は、ストリーミングモードとバッチモードのどちらを使用するかによって異なります。

    • 動的パーティション検出

      実行中のデプロイメントが、デプロイメントを再起動することなく、サブスクリプションパターンに一致する新しいトピックや新しいパーティションからのデータを処理するようにしたい場合は、Kafka ソースで動的パーティション検出機能を有効にできます。DataStream コネクタでは、この機能はデフォルトで無効になっており、手動で有効にする必要があります:

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // 10 秒ごとに新しいパーティションを検出します。
      重要

      動的パーティション検出機能は、Kafka クラスターのメタデータ更新メカニズムに依存します。Kafka クラスターがパーティション情報を迅速に更新しない場合、新しいパーティションが検出されない可能性があります。Kafka クラスターの partition.discovery.interval.ms 設定が実際の状況と一致していることを確認してください。

    • イベント時間とウォーターマーク

      デフォルトでは、Kafka ソースはレコードに添付されたタイムスタンプをそのレコードのイベント時間として使用します。各レコードのイベント時間に基づいてウォーターマーク戦略を定義し、ウォーターマークを下流のサービスに送信できます。

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      カスタムウォーターマーク戦略の定義方法の詳細については、「ウォーターマークの生成」をご参照ください。

      説明

      一部のソースサブタスクが長期間アイドル状態になると (たとえば、Kafka パーティションが新しいメッセージを受信しない、またはソースの並列度が Kafka パーティション数を超えるなど)、ウォーターマークの生成が失敗する可能性があります。この場合、ウィンドウ計算がトリガーされず、データ処理が停止します。

      解決策は次のとおりです:

      • ウォーターマークのタイムアウトメカニズムを設定する:table.exec.source.idle-timeout パラメーターを有効にして、指定されたタイムアウト期間後にシステムがウォーターマークを強制的に生成するようにし、ウィンドウ計算エポックの進行を確保します。

      • データソースを最適化する:ソースの並列度を Kafka パーティション数以下に設定します。

    • コンシューマーオフセットのコミット

      チェックポイントが生成されると、Kafka ソースは各パーティションの Kafka コンシューマーオフセットを Kafka ブローカーにコミットします。これにより、Kafka ブローカーに記録された Kafka コンシューマーオフセットがチェックポイントの状態と一致することが保証されます。Kafka コンシューマーは、各パーティションのオフセットを定期的に Kafka ブローカーに自動的にコミットできます。自動オフセットコミット機能は、enable.auto.commit および auto.commit.interval.ms オプションを使用して設定できます。チェックポイント機能を無効にすると、Kafka ソースは Kafka コンシューマーに依存してオフセットを Kafka ブローカーにコミットします。

      説明

      Kafka ソースは、フォールトトレランスのために Kafka ブローカーに記録されたコミット済みオフセットを使用しません。オフセットをコミットすると、Kafka ブローカーは各パーティションでのレコード消費の進行状況を監視できます。

    • 追加のプロパティ

      setProperties(Properties) および setProperty(String, String) メソッドを呼び出して、Kafka ソースおよび Kafka コンシューマーに追加のプロパティを設定できます。次の表は、Kafka ソースのプロパティについて説明します。

      設定項目

      説明

      client.id.prefix

      Kafka コンシューマーのクライアント ID のプレフィックスを指定します。

      partition.discovery.interval.ms

      Kafka ソースが新しいパーティションをチェックする時間間隔を指定します。

      説明

      partition.discovery.interval.ms プロパティは、バッチモードでは -1 に上書きされます。

      register.consumer.metrics

      Realtime Compute for Apache Flink で Kafka コンシューマーのメトリックを登録するかどうかを指定します。

      その他の Kafka Consumer 設定

      Kafka コンシューマーのプロパティの詳細については、「Apache Kafka」をご参照ください。

      重要

      Kafka DataStream コネクタは、以下のプロパティの値を上書きします:

      • key.deserializer:このプロパティの値は ByteArrayDeserializer に設定されます。

      • value.deserializer:このプロパティの値は ByteArrayDeserializer に設定されます。

      • auto.offset.reset.strategy:このプロパティの値は OffsetsInitializer#getAutoOffsetResetStrategy() に設定されます。

      次のサンプルコードは、Kafka コンシューマーが JAAS 設定と SASL/PLAIN 認証メカニズムを使用して Kafka クラスターに接続する方法を示しています。

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • 監視

      Kafka ソースは、監視と診断のために Realtime Compute for Apache Flink にメトリックを登録します。

      • メトリクススコープ

        Kafka ソースのすべてのメトリックは、KafkaSourceReader メトリックグループの下に登録されます。KafkaSourceReader は、オペレーターメトリックグループのサブグループです。特定のパーティションのメトリックは、KafkaSourceReader.topic.<topic_name>.partition.<partition_id> メトリックグループに登録されます。

        たとえば、トピック名が my-topic で、トピックのパーティション名が 1 の場合、パーティションのコンシューマーオフセットは <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset メトリックによって報告されます。コンシューマーオフセットの成功したコミット数は、<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded メトリックによって測定されます。

      • メトリック

        メトリック

        説明

        範囲

        currentOffset

        現在のコンシューマーオフセット

        TopicPartition

        committedOffset

        現在のコミットオフセット

        TopicPartition

        commitsSucceeded

        成功した送信数

        KafkaSourceReader

        commitsFailed

        失敗した送信数

        KafkaSourceReader

      • Kafka コンシューマーメトリック

        Kafka コンシューマーのメトリックは、KafkaSourceReader.KafkaConsumer メトリックグループに登録されます。たとえば、records-consumed-total メトリックは <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total に登録されます。

        register.consumer.metrics オプションを設定して、Kafka コンシューマーのメトリックを登録するかどうかを指定できます。デフォルトでは、register.consumer.metrics オプションは true に設定されています。Kafka コンシューマーのメトリックの詳細については、「Apache Kafka」をご参照ください。

  • Kafka シンクの作成

    Kafka シンクは、複数のストリームから 1 つまたは複数の Kafka トピックにデータを書き込むことができます。

    DataStream<String> stream = ...
    
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", );
    KafkaSink<String> kafkaSink =
                    KafkaSink.<String>builder()
                            .setKafkaProducerConfig(kafkaProperties) // // プロデューサー設定
                            .setRecordSerializer(
                                    KafkaRecordSerializationSchema.builder()
                                            .setTopic("my-topic") // ターゲットトピック
                                            .setKafkaValueSerializer(StringSerializer.class) // シリアル化スキーマ
                                            .build())
                            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // フォールトトレランス
                            .build();
    
    stream.sinkTo(kafkaSink);

    以下のパラメーターを設定する必要があります。

    パラメーター

    説明

    Topic

    データが書き込まれるトピックの名前。

    データシリアル化

    Kafka シンクを構築する際には、入力データを Kafka の ProducerRecord オブジェクトに変換するために KafkaRecordSerializationSchema を提供する必要があります。Flink は、メッセージキーと値のシリアル化、トピックの選択、メッセージのパーティショニングなどの一般的なコンポーネントを提供するスキーマビルダーを提供します。より高度な制御のために、対応するインターフェイスを実装することもできます。Kafka シンクは、受信する各レコードに対して ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) メソッドを呼び出し、シリアル化されたレコードを表す ProducerRecord オブジェクトを生成します。その後、Kafka シンクはその ProducerRecord オブジェクトを必要なトピックに書き込みます。

    各レコードが Kafka にどのように書き込まれるかを詳細に制御できます。ProducerRecord を使用して、以下の操作を実行できます:

    • ターゲットトピックの名前を設定します。

    • メッセージキーを定義します。

    • ターゲットパーティションを指定します。

    Kafka クライアントプロパティ

    bootstrap.servers プロパティは必須です。カンマ区切りの Kafka ブローカーアドレスのリストを指定します。

    フォールトトレランスセマンティクス

    チェックポイント機能を有効にすると、Kafka シンクは exactly-once 配信を保証できます。また、DeliveryGuarantee パラメーターを設定して、異なるフォールトトレランスセマンティクスを指定することもできます。DeliveryGuarantee パラメーターの詳細は以下のとおりです:

    • DeliveryGuarantee.NONE:Flink による配信保証は提供されません。データが失われたり、重複したりする可能性があります。

    • DeliveryGuarantee.AT_LEAST_ONCE:Kafka シンクはデータが失われないことを保証します。ただし、データが重複する可能性があります。

    • DeliveryGuarantee.EXACTLY_ONCE:Kafka シンクはデータが失われたり、重複したりしないことを保証します。Kafka トランザクションメカニズムを使用して exactly-once 配信を保証します。

      説明

      exactly-once セマンティクスの詳細については、「Semantic.EXACTLY_ONCE の使用上の注意」をご参照ください。

データインジェスト

Kafka コネクタは、YAML ベースのデータインジェストジョブでソースまたはシンクとして使用できます。

制限事項

  • VVR 11.1 以降を使用する Realtime Compute for Apache Flink の Flink CDC データインジェストでは、Kafka を同期データソースとして使用することを推奨します。

  • JSON、Debezium JSON、Canal JSON 形式のみがサポートされています。他のデータ形式はサポートされていません。

  • ソースの場合、同じテーブルのデータが複数のパーティションに分散できるのは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink のみです。

構文

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

設定オプション

  • 一般

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    type

    ソースまたはシンクのタイプ。

    はい

    String

    なし

    値を kafka に設定します。

    name

    ソースまたはシンクの名前。

    いいえ

    String

    デフォルト値なし。

    N/A

    properties.bootstrap.servers

    Kafka ブローカーの IP アドレスとポート番号。

    はい

    String

    なし

    フォーマット:host:port,host:port,host:port。複数の host:port ペアはカンマ (,) で区切ります。

    properties.*

    Kafka クライアント用に設定されるオプション。

    いいえ

    String

    なし

    サフィックスは、公式 Kafka ドキュメントで定義されているプロデューサーまたはコンシューマー設定である必要があります。

    Flink は properties. プレフィックスを削除し、変換されたキーと値を Kafka クライアントに渡します。たとえば、properties.allow.auto.create.topics を false に設定して、自動トピック作成を無効にできます。

    key.format

    Kafka メッセージのキーフィールドを読み書きするために使用されるフォーマット。

    いいえ

    String

    なし

    • ソースの場合、json のみがサポートされます。

    • シンクの場合、有効な値は次のとおりです:

      • csv

      • json

    説明

    このオプションは、VVR 11.0.0 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    value.format

    Kafka メッセージの値フィールドを読み書きするために使用されるフォーマット。

    いいえ

    String

    debezium-json

    有効な値:

    • debezium-json

    • canal-json

    • json

    説明
    • debezium-json および canal-json 形式は、VVR 8.0.10 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    • json 形式は、VVR 11.0.0 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

  • source table

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    topic

    データを読み取るトピックの名前。

    いいえ

    String

    デフォルト値なし。

    複数のトピック名はセミコロン (;) で区切ります。例:topic-1 と topic-2。

    説明

    topic オプションは topic-pattern オプションと併用できません。

    topic-pattern

    トピックを照合するために使用される正規表現。ジョブの実行中に、指定された正規表現に名前が一致するすべてのトピックのデータが読み取られます。

    いいえ

    String

    デフォルト値なし。

    説明

    topic オプションは topic-pattern オプションと併用できません。

    properties.group.id

    コンシューマーグループ ID。

    いいえ

    String

    なし

    指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。

    scan.startup.mode

    Kafka がデータを読み取る開始オフセット。

    いいえ

    String

    group-offsets

    有効な値:

    • earliest-offset:最も古いパーティションからデータを読み取ります。

    • latest-offset:最新のオフセットからデータを読み取ります。

    • group-offsets (デフォルト):properties.group.id で指定されたグループのコミット済みオフセットから読み取りを開始します。

    • timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプから読み取ります。

    • specific-offsets:scan.startup.specific-offsets で指定されたオフセットから読み取ります。

    説明

    このオプションは、ジョブがステートなしで開始された場合に有効になります。ジョブがチェックポイントから再開されたり、指定されたステートから再開されたりすると、ジョブは優先的にステートデータに保存されている進行状況でデータの読み取りを開始します。

    scan.startup.specific-offsets

    scan.startup.mode オプションが specific-offsets に設定されている場合の各パーティションの開始オフセット。

    いいえ

    String

    なし

    例:partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    scan.startup.mode オプションが timestamp に設定されている場合の開始オフセットのタイムスタンプ。

    いいえ

    Long

    なし

    単位:ミリ秒。

    scan.topic-partition-discovery.interval

    Kafka のトピックとパーティションを動的に検出する時間間隔。

    いいえ

    Duration

    5 分

    デフォルトのパーティション検出間隔は 5 分です。この機能を無効にするには、このオプションを明示的に非正の値に設定する必要があります。動的パーティション検出機能が有効になると、Kafka ソースは新しいパーティションを自動的に検出し、そのパーティションからデータを読み取ることができます。topic-pattern モードでは、Kafka ソースは既存のトピックの新しいパーティションからデータを読み取り、正規表現に一致する新しいトピックのすべてのパーティションからデータを読み取ります。

    scan.check.duplicated.group.id

    properties.group.id で指定された重複するコンシューマーグループをチェックするかどうか。

    いいえ

    Boolean

    false

    有効な値:

    • true:ジョブが開始する前に重複するコンシューマーグループをチェックします。重複するコンシューマーグループが存在する場合、エラーを報告し、競合を防ぐためにジョブを一時停止します。

    • false:ジョブが開始する前に重複するコンシューマーグループをチェックしません。

    schema.inference.strategy

    スキーマ推論戦略。

    いいえ

    String

    continuous

    有効な値:

    • continuous:各レコードのスキーマを推論します。スキーマに互換性がない場合、より広いスキーマを推論し、スキーマ変更イベントを生成します。

    • static:ジョブ開始時にスキーマ推論を一度だけ実行します。後続のレコードは初期スキーマに基づいて解析されます。スキーマ変更イベントは生成されません。

    説明

    scan.max.pre.fetch.records

    初期スキーマ推論中に、システムがパーティション内で消費および解析を試みるメッセージの最大数。

    いいえ

    Int

    50

    ジョブがデータを読み取って処理する前に、システムはスキーマ情報を初期化するために、パーティション内の特定の数の最新メッセージを事前に消費しようとします。

    key.fields-prefix

    Kafka メッセージのキーフィールドから解析されたフィールドに追加されるプレフィックス。Kafka メッセージのキーフィールドが解析された後の名前の競合を防ぐために、このオプションを設定します。

    いいえ

    String

    なし

    たとえば、このオプションが key_ に設定され、キーフィールドに a という名前のフィールドが含まれている場合、解析後のフィールド名は key_a になります。

    説明

    key.fields-prefix オプションの値は、value.fields-prefix オプションのプレフィックスであってはなりません。

    value.fields-prefix

    Kafka メッセージの値フィールドから解析されたフィールドに追加されるプレフィックス。Kafka メッセージの値フィールドが解析された後の名前の競合を防ぐために、このオプションを設定できます。

    いいえ

    String

    なし

    たとえば、このオプションが value_ に設定され、値フィールドに b という名前のフィールドが含まれている場合、解析後のフィールド名は value_b になります。

    説明

    value.fields-prefix オプションの値は、key.fields-prefix オプションのプレフィックスであってはなりません。

    metadata.list

    下流のストレージに渡すメタデータ列。

    いいえ

    String

    なし

    利用可能なメタデータ列には、topicpartitionoffsettimestamptimestamp-typeheadersleader-epoch__raw_key__、および __raw_value__ があり、カンマで区切られます。

    scan.value.initial-schemas.ddls

    DDL 文を使用して特定のテーブルの初期スキーマを指定します。

    いいえ

    String

    なし

    複数の DDL 文は英語のセミコロン (;) で接続されます。たとえば、CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT); を使用して、テーブル db1.t1 と db1.t2 の初期スキーマを指定します。

    ここでの DDL テーブル構造は、ターゲットテーブルと一致し、Flink SQL 構文ルールに準拠する必要があります。

    説明

    VVR バージョン 11.5 以降はこの設定をサポートしています。

    ingestion.ignore-errors

    データ解析中にエラーを無視するかどうかを指定します。

    いいえ

    Boolean

    false

    説明

    この設定は VVR 11.5 以降のバージョンでサポートされています。

    ingestion.error-tolerance.max-count

    データ解析中にエラーが無視される場合、ジョブが失敗するまでの解析エラーの数。

    いいえ

    Integer

    -1

    このオプションは、ingestion.ignore-errors が有効な場合にのみ有効です。デフォルト値 -1 は、解析例外がジョブの失敗をトリガーしないことを意味します。

    説明

    Ververica Runtime (VVR) バージョン 11.5 以降はこの設定をサポートしています。

    • Debezium JSON 形式のソーステーブル

      パラメーター

      必須

      データの型

      デフォルト値

      説明

      debezium-json.distributed-tables

      いいえ

      Boolean

      false

      Debezium JSON の単一テーブルのデータが複数のパーティションに現れる場合は、このオプションを有効にする必要があります。

      説明

      このオプションは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

      重要

      このオプションを設定した後、ステートなしでデプロイメントを開始する必要があります。

      debezium-json.schema-include

      いいえ

      Boolean

      false

      Debezium Kafka Connect を設定する際、Kafka 設定 value.converter.schemas.enable を有効にしてメッセージにスキーマ情報を含めることができます。このオプションは、Debezium JSON メッセージにスキーマ情報を含めるかどうかを指定します。

      有効な値:

      • true:Debezium JSON メッセージにスキーマ情報が含まれます。

      • false:Debezium JSON メッセージにスキーマ情報が含まれません。

      debezium-json.ignore-parse-errors

      いいえ

      Boolean

      false

      有効な値:

      • true:解析例外が発生した場合、現在の行をスキップします。

      • false (デフォルト):エラーを返し、デプロイメントの開始に失敗します。

      debezium-json.infer-schema.primitive-as-string

      いいえ

      Boolean

      false

      テーブルスキーマを解析する際に、すべてのデータ型を STRING として解釈するかどうかを指定します。

      有効な値:

      • true:すべての基本型を STRING として解釈します。

      • false (デフォルト) の場合、解析は基本ルールに従います。

    • ソーステーブル用の Canal JSON フォーマット

      パラメーター

      必須

      データの型

      デフォルト値

      説明

      canal-json.distributed-tables

      いいえ

      Boolean

      false

      Canal JSON の単一テーブルのデータが複数のパーティションに現れる場合は、このオプションを有効にする必要があります。

      説明

      このオプションは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

      重要

      このオプションを設定した後、ステートなしでデプロイメントを開始する必要があります。

      canal-json.database.include

      いいえ

      String

      なし

      Canal レコードのデータベースメタデータフィールドに一致するオプションの正規表現。指定されたデータベースの変更ログのみが読み取られます。正規表現文字列は Java の Pattern と互換性があります。

      canal-json.table.include

      いいえ

      String

      なし

      Canal レコードのテーブルメタデータフィールドに一致するオプションの正規表現。指定されたテーブルの変更ログレコードのみが読み取られます。正規表現は Java の Pattern と互換性があります。

      canal-json.ignore-parse-errors

      いいえ

      Boolean

      false

      有効な値:

      • true:解析例外が発生した場合、現在の行をスキップします。

      • false (デフォルト):エラーを返し、デプロイメントの開始に失敗します。

      canal-json.infer-schema.primitive-as-string

      いいえ

      Boolean

      false

      テーブルスキーマを解析する際に、すべてのデータ型を STRING として解釈するかどうかを指定します。

      有効な値:

      • true:すべての基本型を STRING として解釈します。

      • false (デフォルト):パーサは基本ルールに従います。

      canal-json.infer-schema.strategy

      いいえ

      String

      AUTO

      スキーマ推論戦略。

      有効な値:

      • AUTO (デフォルト):JSON データを解析してスキーマを自動的に推論します。データに sqlType フィールドが含まれていない場合は、解析の失敗を避けるために AUTO を使用します。

      • SQL_TYPE:Canal JSON データの sqlType 配列を使用してスキーマを推論します。データに sqlType フィールドが含まれている場合は、より正確な型推論のために canal-json.infer-schema.strategy を SQL_TYPE に設定することを推奨します。

      • MYSQL_TYPE:Canal JSON データの mysqlType 配列を使用してスキーマを推論します。

      Kafka の Canal JSON データに sqlType フィールドが含まれており、より正確な型マッピングが必要な場合は、canal-json.infer-schema.strategy を SQL_TYPE に設定します。

      sqlType マッピングルールの詳細については、「Canal JSON スキーマ解析」をご参照ください。

      説明
      • Ververica Runtime (VVR) 11.1 以降のバージョンはこの設定をサポートしています。

      • MYSQL_TYPE は、VVR 11.3 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      いいえ

      Boolean

      true

      MySQL TIMESTAMP を CDC TIMESTAMP にマッピングするかどうかを指定します:

      • true (デフォルト):MySQL TIMESTAMP を CDC TIMESTAMP にマッピングします。

      • false:MySQL TIMESTAMP を CDC TIMESTAMP_LTZ にマッピングします。

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      いいえ

      Boolean

      true

      MYSQL_TYPE を使用して解析する場合、MySQL TINYINT(1) を CDC BOOLEAN にマッピングするかどうかを指定します:

      • true (デフォルト):MySQL TINYINT(1) を CDC BOOLEAN にマッピングします。

      • false:MySQL TINYINT(1) を CDC TINYINT(1) にマッピングします。

      このオプションは、canal-json.infer-schema.strategy が MYSQL_TYPE に設定されている場合にのみ有効です。

    • ソーステーブル JSON 形式

      パラメーター

      必須

      データの型

      デフォルト値

      説明

      json.timestamp-format.standard

      いいえ

      String

      SQL

      入出力のタイムスタンプ形式を指定します。有効な値:

      • SQL:yyyy-MM-dd HH:mm:ss.s{precision} 形式の入力タイムスタンプを解析します。例:2020-12-30 12:13:14.123。

      • ISO-8601:yyyy-MM-ddTHH:mm:ss.s{precision} 形式の入力タイムスタンプを解析します。例:2020-12-30T12:13:14.123。

      json.ignore-parse-errors

      いいえ

      Boolean

      false

      有効な値:

      • true:解析例外が発生した場合、現在の行をスキップします。

      • false (デフォルト):エラーを返し、デプロイメントの開始に失敗します。

      json.infer-schema.primitive-as-string

      いいえ

      Boolean

      false

      テーブルスキーマを解析する際に、すべてのデータ型を STRING として解釈するかどうかを指定します。

      有効な値:

      • true:すべての基本型を STRING として解釈します。

      • false (デフォルト):基本ルールに従って解析します。

      json.infer-schema.flatten-nested-columns.enable

      いいえ

      Boolean

      false

      JSON 形式のデータを解析する際、ネストされた列を再帰的に展開するかどうかを指定する必要があります。このパラメーターの有効な値は次のとおりです:

      • true:ネストされた列を再帰的に展開します。

      • false (デフォルト):ネストされた型を STRING として扱います。

      json.decode.parser-table-id.fields

      いいえ

      String

      なし

      特定の JSON フィールドの値に基づいて tableId を生成するかどうかを指定します。複数のフィールドはカンマ , で区切ります。たとえば、JSON データが {"col0":"a", "col1","b", "col2","c"} の場合、生成される結果は次のようになります:

      設定

      tableId

      col0

      a

      col0,col1

      a.b

      col0,col1,col2

      a.b.c

      json.infer-schema.fixed-types

      いいえ

      String

      デフォルト値なし。

      JSON 形式のデータを解析する際、特定のフィールドの正確なデータ型を指定します。複数のフィールドは英語のカンマ (,) で区切ります。たとえば、id BIGINT, name VARCHAR(10) は、JSON データの id フィールドを BIGINT、name フィールドを VARCHAR(10) として指定します。

      この設定を使用する場合、scan.max.pre.fetch.records: 0 設定も追加する必要があります。

      説明

      このオプションは、VVR 11.5 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

  • シンク固有

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    type

    シンクのタイプ。

    はい

    String

    なし

    値を Kafka に設定します。

    name

    シンクの名前。

    いいえ

    String

    なし

    N/A

    topic

    Kafka トピックの名前。

    いいえ

    String

    なし

    このオプションが有効な場合、すべてのデータはこのトピックに書き込まれます。

    説明

    このオプションが有効でない場合、各データレコードは、そのテーブル ID 文字列 (データベース名とテーブル名をピリオド (.) で連結したもの) から派生した名前のトピックに書き込まれます。例:databaseName.tableName

    partition.strategy

    Kafka のパーティショニング戦略。

    いいえ

    String

    all-to-zero

    有効な値:

    • all-to-zero (デフォルト):すべてのデータをパーティション 0 に書き込みます。

    • hash-by-key:プライマリキーのハッシュ値に基づいてデータをパーティションに書き込みます。これにより、同じプライマリキーを持つデータが同じパーティションにあり、順序付けられていることが保証されます。

    sink.tableId-to-topic.mapping

    上流のテーブル名と下流の Kafka トピック名のマッピング。

    いいえ

    String

    なし

    各マッピング関係は ; で区切られます。先祖テーブル名とそれに対応する下流の Kafka トピック名は : で区切られます。テーブル名には正規表現を使用でき、同じトピックにマッピングされる複数のテーブルは , を使用して連結できます。例:mydb.mytable1:topic1;mydb.mytable2:topic2

    説明

    このパラメーターを設定すると、元のテーブル名情報を保持したまま、マッピングされたトピックを変更できます。

    • Debezium JSON 形式のシンクテーブル

      パラメーター

      必須

      データの型

      デフォルト値

      説明

      debezium-json.include-schema.enabled

      いいえ

      Boolean

      false

      Debezium JSON データにスキーマ情報を含めるかどうかを指定します。

  • Kafka からデータをインジェストする:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Kafka にデータをインジェストする:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    route セクションで、宛先の Kafka トピックの名前を指定します。

説明

デフォルトでは、Alibaba Cloud Kafka の自動トピック作成機能は無効になっています。詳細については、「自動トピック作成に関するよくある質問」をご参照ください。Alibaba Cloud Kafka にデータを書き込む場合は、事前に対応するトピックを作成する必要があります。詳細については、「ステップ 3:リソースの作成」をご参照ください。

スキーマの解析と進化に関するポリシー

Kafka コネクタは、既知のすべてのテーブルのスキーマを維持します。

スキーマの初期化

スキーマ情報には、フィールドとデータ型の情報、データベースとテーブルの情報、およびプライマリキーの情報が含まれます。これら 3 種類の情報がどのように初期化されるかを以下に説明します:

  • フィールドとデータ型の情報

データインジェストジョブは、データからフィールドとデータ型の情報を自動的に推論できます。ただし、一部のシナリオでは、特定のテーブルのフィールドと型の情報を指定したい場合があります。ユーザーが指定するフィールド型の粒度に基づいて、スキーマの初期化は以下の 3 つの戦略をサポートします:

  1. システムによって完全に推論されるスキーマ

Kafka メッセージが読み取られる前に、Kafka コネクタは各パーティションのメッセージを消費しようとし、各データレコードのスキーマを解析し、その後スキーマをマージしてテーブルスキーマ情報を初期化します。消費できるメッセージの数は、scan.max.pre.fetch.records オプションの値以下です。データが消費される前に、初期化されたスキーマに基づいてテーブル作成イベントが生成されます。

説明

Debezium JSON および Canal JSON 形式の場合、テーブル情報は特定のメッセージに含まれます。事前に消費するメッセージの数は、scan.max.pre.fetch.records パラメーターによって指定されます。これらの事前に消費されたメッセージには、複数のテーブルのデータが含まれている可能性があります。したがって、各テーブルの事前に消費されたデータレコードの数は決定できません。パーティションメッセージの事前消費とテーブルスキーマの初期化は、各パーティションのメッセージの実際の消費と処理の前に一度だけ実行されます。後続のテーブルデータが存在する場合、テーブルの最初のデータレコードから解析されたテーブルスキーマが初期テーブルスキーマとして使用されます。この場合、パーティションメッセージの事前消費とテーブルスキーマの初期化は再度実行されません。

重要

単一テーブルのデータが複数のパーティションに分散できるのは、Ververica Runtime (VVR) 8.0.11 以降のみです。このシナリオでは、debezium-json.distributed-tables または canal-json.distributed-tables オプションを true に設定する必要があります。

  1. 初期スキーマの指定

一部のシナリオでは、たとえば Kafka から事前に作成された子孫テーブルにデータを書き込む場合など、初期テーブルスキーマを自分で指定したい場合があります。これを行うには、scan.value.initial-schemas.ddls パラメーターを追加します。以下のコードは設定例です:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # 初期スキーマを設定します。
  scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);

CREATE TABLE 文は、ターゲットテーブルのスキーマと一致する必要があります。ここでは、テーブル db1.t1 の id フィールドの初期型は BIGINT に、name フィールドの初期型は VARCHAR(10) に設定されます。同様に、テーブル db1.t2 の id フィールドの初期型は BIGINT に設定されます。

CREATE TABLE 文は Flink SQL 構文を使用します。

  1. フィールド型の固定

一部のシナリオでは、たとえば TIMESTAMP 型として推論される可能性のある特定のフィールドを文字列として配信したい場合など、特定のフィールドのデータ型を固定したい場合があります。この場合、json.infer-schema.fixed-types パラメーターを追加して初期テーブルスキーマを指定できます (メッセージ形式が JSON の場合のみ有効)。設定例:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # 特定のフィールドを静的型に固定します。
  json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
  scan.max.pre.fetch.records: 0

これにより、すべての id フィールドの型が BIGINT に、すべての name フィールドの型が VARCHAR(10) に固定されます。

ここでの型は Flink SQL データ型と一致します。

  • データベースとテーブルの情報

    • Canal JSON および Debezium JSON 形式の場合、データベースとテーブルの名前は個々のメッセージから解析されます。

    • デフォルトでは、JSON 形式のメッセージの場合、テーブル情報にはテーブル名 (データを含むトピックの名前) のみが含まれます。データにデータベースとテーブルの情報が含まれている場合は、json.infer-schema.fixed-types パラメーターを使用して、この情報を含むフィールドを指定できます。これらのフィールドをデータベース名とテーブル名にマッピングします。以下のコードは設定例です:

      source:
        type: kafka
        name: Kafka Source
        properties.bootstrap.servers: host:9092
        topic: test-topic
        value.format: json
        scan.startup.mode: earliest-offset
        # col1 フィールドの値をデータベース名として、col2 フィールドの値をテーブル名として使用します。
        json.decode.parser-table-id.fields: col1,col2

      これにより、各レコードは、データベース名が col1 フィールドの値、テーブル名が col2 フィールドの値であるテーブルに書き込まれます。

  • プライマリキー

    • Canal JSON 形式の場合、テーブルのプライマリキーは JSON の pkNames フィールドに基づいて定義されます。

    • Debezium JSON および JSON 形式の場合、JSON にはプライマリキー情報が含まれていません。transform ルールを使用して、テーブルに手動でプライマリキーを追加できます:

      transform:
        - source-table: \.*\.\.*
          projection: \*
          primary-keys: key1, key2

スキーマの解析とスキーマ進化

初期スキーマ同期が完了した後、schema.inference.strategystatic に設定されている場合、Kafka コネクタは各メッセージの値を初期テーブルスキーマに基づいて解析し、スキーマ変更イベントを生成しません。一方、schema.inference.strategycontinuous に設定されている場合、Kafka コネクタは各 Kafka メッセージの値部分を物理列として解析し、その列を現在維持されているスキーマと比較します。解析されたスキーマが現在のスキーマと不一致の場合、Kafka コネクタはスキーマをマージしようと試み、対応するテーブルスキーマ変更イベントを生成します。マージ規則は以下のとおりです:

  • 解析された物理列に現在のスキーマにないフィールドが含まれている場合、Kafka コネクタはそれらのフィールドをスキーマに追加し、NULL 許容の列追加イベントを生成します。

  • 解析された物理列に現在のスキーマに既に存在するフィールドが含まれていない場合、それらのフィールドは保持され、値は NULL で埋められます。列削除イベントは生成されません。

  • 解析された物理列と現在のスキーマに同じ名前の列が含まれている場合、以下のように処理します:

    • データ型が同じで精度のみ異なる場合、より高い精度の型を使用し、列型変更イベントを生成します。

    • データ型が異なる場合、ツリー構造における最小の親ノードを、同名の列の型として使用し、列型変更イベントを生成します。

      image

  • サポートされるスキーマ進化オプション:

    • 列の追加:新しい列を現在のスキーマの末尾に追加し、新しい列のデータを同期します。新しい列は NULL 許容になります。

    • 列の削除:列削除イベントは生成されません。代わりに、その後のその列のデータは自動的に NULL 値で埋められます。

    • 列の名前変更:列の追加と削除として扱われます。名前変更された列がスキーマの末尾に追加され、元の列のデータは NULL 値で埋められます。

    • 列のデータ型の変更:

      • 下流システムが列型変更をサポートしている場合、データインジェストジョブは、下流シンクが列型変更の処理をサポートするようになると、通常の列の型変更 (たとえば INT から BIGINT) をサポートします。このような変更は、下流シンクがサポートする列型変更規則に依存します。異なるシンクテーブルは異なる列型変更規則をサポートします。関連するシンクテーブルのドキュメントを参照して、サポートされている列型変更規則について確認してください。

      • Hologres のような列型変更をサポートしない下流システムの場合、ワイド型マッピング を使用できます。この方法では、ジョブ開始時に下流システムに汎用性の高いデータ型を持つテーブルを作成します。列型変更が発生した場合、システムは下流シンクがその変更を受け入れられるかどうかを判断し、これにより列型変更に対する許容的なサポートを実現します。

  • サポートされていないスキーマ変更:

    • プライマリキーまたはインデックスなどの制約の変更。

    • NOT NULL から NULLABLE への変更。

  • Canal JSON のスキーマ解析

    Canal JSON 形式のデータには、オプションの sqlType フィールドが含まれており、データ列の正確な型情報を提供します。より正確なスキーマを得るためには、canal-json.infer-schema.strategy 設定を SQL_TYPE に設定して、sqlType からの型を使用できます。型マッピング関係は以下のとおりです:

    JDBC データ型

    型コード

    CDC データ型

    BIT

    -7

    BOOLEAN

    BOOLEAN

    16

    TINYINT

    -6

    TINYINT

    SMALLINT

    -5

    SMALLINT

    INTEGER

    4

    INT

    BIGINT

    -5

    BIGINT

    DECIMAL

    3

    DECIMAL(38,18)

    NUMERIC

    2

    REAL

    7

    FLOAT

    FLOAT

    6

    DOUBLE

    8

    DOUBLE

    BINARY

    -2

    BYTES

    VARBINARY

    -3

    LONGVARBINARY

    -4

    BLOB

    2004

    DATE

    91

    DATE

    TIME

    92

    TIME

    TIMESTAMP

    93

    TIMESTAMP

    CHAR

    1

    STRING

    VARCHAR

    12

    LONGVARCHAR

    -1

    その他のデータ型

ダーティデータの許容と収集

一部のケースでは、Kafka データソースに不正なデータ (ダーティデータ) が含まれている可能性があります。このようなダーティデータによってジョブが頻繁に再起動されるのを防ぐために、ジョブを設定してこれらの例外を無視できるようにできます。設定例:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # ダーティデータの許容を有効にします。
  ingestion.ignore-errors: true
  # 最大 1000 件のダーティデータレコードを許容します。
  ingestion.error-tolerance.max-count: 1000

この設定により、最大 1000 件のダーティデータレコードが許容され、少量のダーティデータが存在する場合でもジョブが正常に実行されます。ダーティデータレコードの数がこのしきい値を超えると、ジョブは失敗し、データの検証が必要になります。

ダーティデータによってジョブが失敗しないことを保証するために、次の設定を使用できます:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # ダーティデータの許容を有効にします。
  ingestion.ignore-errors: true
  # 全てのダーティデータを許容します。
  ingestion.error-tolerance.max-count: -1

ダーティデータの許容ポリシーにより、異常なデータによってジョブが頻繁に失敗することを防ぎます。また、Kafka プロデューサーの動作を調整するために、ダーティデータについて詳しく知りたい場合もあります。ダーティデータ収集 で説明されているプロセスでは、ジョブのダーティデータを TaskManager のログファイルで確認できます。設定例:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # ダーティデータの許容を有効にします。
  ingestion.ignore-errors: true
  # 全てのダーティデータを許容します。
  ingestion.error-tolerance.max-count: -1

pipeline:
  dirty-data.collector:
    # ダーティデータを TaskManager のログファイルに書き込みます。
    type: logger

テーブル名とトピックのマッピング戦略

Kafka をデータインジェストシンクとして使用する場合、メッセージ形式 (debezium-json や canal-json) には多くの場合テーブル名情報が含まれています。コンシューマーは通常、トピック名ではなく、このテーブル名を実際のテーブル名として使用します。したがって、テーブル名とトピックのマッピング戦略を注意深く設定する必要があります。

MySQL から mydb.mytable1 および mydb.mytable2 の 2 つのテーブルを同期する必要があると仮定します。可能なマッピング戦略は以下のとおりです:

1. 何のマッピング戦略も設定しない

マッピング戦略を設定しない場合、各テーブルはデータベース名とテーブル名を基にした名前のトピック (例:mydb.mytable1) に書き込まれます。したがって、mydb.mytable1 のデータは mydb.mytable1 トピックに、mydb.mytable2 のデータは mydb.mytable2 トピックに書き込まれます。設定例:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. ルートルールによるマッピングの設定 (推奨されません)

多くのシナリオでは、ユーザーはデータベース名とテーブル名を含むトピック名を望まず、代わりに特定のトピックにデータをマップするルートルールを設定します。設定例:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
 route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

この場合、mydb.mytable1 および mydb.mytable2 のすべてのデータが mytable1 トピックに書き込まれます。

ただし、ルートルールを使用してトピック名を変更すると、Kafka メッセージ (debezium-json や canal-json 形式) 内のテーブル名も変更されます。その結果、このトピック内のすべてのメッセージのテーブル名は mytable1 になります。このトピックからメッセージを消費する他のシステムは、予期しない動作をする可能性があります。

3. sink.tableId-to-topic.mapping パラメーターによるマッピングの設定 (推奨)

元のテーブル名情報を保持したままカスタムトピックにマッピングするには、sink.tableId-to-topic.mapping パラメーターを使用します。設定例:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

または:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

この設定では、`mydb.mytable1` および `mydb.mytable2` のすべてのデータが `mytable1` トピックに書き込まれますが、元のテーブル名 (`mydb.mytable1` または `mydb.mytable2`) は Kafka メッセージ形式 (`debezium-json` や `canal-json`) 内で保持されます。これにより、このトピックからメッセージを消費する他のシステムが、ソーステーブル名情報を正しく取得できます。

EXACTLY_ONCE セマンティクスに関する考慮事項

  • コンシューマー分離レベルの設定

    Kafka データを消費するすべてのアプリケーションは、isolation.level を設定する必要があります:

    • read_committed:コミット済みのデータのみを読み取ります。

    • read_uncommitted (デフォルト):未コミットのデータを読み取ることもできます。

    EXACTLY_ONCE は read_committed に依存しています。そうでないと、コンシューマーが未コミットのデータを読み取ってしまい、一貫性が損なわれる可能性があります。

  • トランザクションタイムアウトとデータ損失

    チェックポイントから復旧する際、Flink はチェックポイントの開始前にコミットされたトランザクションのみに依存します。ジョブのクラッシュと再起動の間隔が Kafka トランザクションのタイムアウトを超えている場合、Kafka は自動的にトランザクションを中止し、データが失われる可能性があります。

    • Kafka ブローカーのデフォルトの transaction.max.timeout.ms = 15 分。

    • Flink Kafka シンクは、デフォルトで transaction.timeout.ms を 1 時間に設定します。

    • ブローカーの transaction.max.timeout.ms を Flink の設定以上に増やす必要があります。

  • Kafka プロデューサープールと同時チェックポイント

    EXACTLY_ONCE モードでは、固定サイズの Kafka プロデューサープールが使用されます。各チェックポイントはプールから 1 つのプロデューサーを消費します。同時チェックポイント数がプールサイズを超えると、ジョブは失敗します。

    プールサイズは、最大同時チェックポイント数に応じて調整してください。

  • スケールダウンの制限

    最初のチェックポイントの前にジョブが失敗した場合、プロデューサープール情報は再起動後に保持されません。したがって、最初のチェックポイントが完了するまでは、ジョブの並列度をスケールダウンしないでください。スケールダウンする必要がある場合は、並列度を FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 未満にしてはいけません。

  • トランザクションによる読み取りのブロッキング

    read_committed モードでは、未完了 (コミットも中止もされていない) のトランザクションが、トピック全体からの読み取りをブロックします。

    たとえば:

    • トランザクション 1 がデータを書き込みます。

    • トランザクション 2 がデータを書き込み、コミットします。

    • トランザクション 1 が完了するまで、トランザクション 2 のデータはコンシューマーには表示されません。

    したがって:

    • 通常の運用では、データの可視性は平均チェックポイント間隔分遅延します。

    • ジョブが失敗した場合、書き込み中のトピックはジョブの再起動またはトランザクションのタイムアウトまでコンシューマーの読み取りをブロックします。極端な場合、トランザクションのタイムアウトが読み取りにも影響を与える可能性があります。

よくある質問