すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ApsaraMQ for Kafka

最終更新日:Dec 04, 2025

このトピックでは、ApsaraMQ for Kafka コネクタの使用方法について説明します。

背景情報

Apache Kafka は、オープンソースの分散メッセージキューシステムです。パフォーマンス専有型のデータ処理、ストリーム分析、データ統合などのビッグデータ分野で広く使用されています。Kafka コネクタは、オープンソースの Apache Kafka クライアントに基づいています。高いデータスループットを提供し、複数のデータ形式の読み書きをサポートし、Realtime Compute for Apache Flink に 1 回限りのセマンティクスを提供します。

カテゴリ

詳細

サポートタイプ

ソーステーブル、結果テーブル、データ統合ターゲット

ランタイムモード

ストリーミングモード

データ形式

サポートされているデータ形式

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

説明
  • 組み込みの Protobuf データ形式は、Ververica Runtime (VVR) 8.0.9 以降でのみサポートされます。

  • サポートされている各データ形式には、WITH 句で直接使用できる対応する設定項目があります。詳細については、Flink コミュニティドキュメントをご参照ください。

特定の監視メトリック

特定の監視メトリック

  • ソーステーブル

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • シンクテーブル

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「メトリックの説明」をご参照ください。

API タイプ

SQL、DataStream、データ統合 YAML

結果テーブルのデータの更新または削除

コネクタは結果テーブルのデータの更新または削除をサポートしていません。データの挿入のみをサポートしています。

説明

データの更新と削除に関連する機能については、「Upsert Kafka」をご参照ください。

前提条件

クラスターへの接続方法は次のとおりです:

  • Alibaba Cloud ApsaraMQ for Kafka クラスターへの接続

    • Kafka クラスターのバージョンが 0.11 以降であること。

    • ApsaraMQ for Kafka クラスターを作成済みであること。詳細については、「リソースの作成」をご参照ください。

    • Flink ワークスペースと Kafka クラスターが同じ VPC 内にあり、ApsaraMQ for Kafka クラスターのホワイトリストに Flink が追加されていること。詳細については、「ホワイトリストの設定」をご参照ください。

    重要

    ApsaraMQ for Kafka へのデータ書き込みに関する制限事項:

    • ApsaraMQ for Kafka は、zstd 圧縮形式でのデータ書き込みをサポートしていません。

    • ApsaraMQ for Kafka は、べき等な書き込みやトランザクション書き込みをサポートしていません。そのため、Kafka 結果テーブルの 1 回限りのセマンティクス機能は使用できません。Ververica Runtime (VVR) 8.0.0 以降を使用する場合は、結果テーブルproperties.enable.idempotence=false 設定項目を追加して、べき等な書き込みを無効にする必要があります。ApsaraMQ for Kafka のストレージエンジンと機能制限の比較については、「ストレージエンジンの比較」をご参照ください。

  • 自己管理型 Apache Kafka クラスターへの接続

    • セルフマネージド Apache Kafka クラスターのバージョンが 0.11 以降であること。

    • Flink とセルフマネージド Apache Kafka クラスター間のネットワーク接続が確立されていること。パブリックネットワーク経由でセルフマネージドクラスターに接続する方法については、「ネットワーク接続タイプの選択」をご参照ください。

    • Apache Kafka 2.8 のクライアント設定項目のみがサポートされています。詳細については、Apache Kafka ドキュメントの コンシューマーおよびプロデューサー設定をご参照ください。

注意事項

現在、Flink と Kafka の設計上のバグのため、トランザクション書き込みの使用は推奨されませんsink.delivery-guarantee = exactly-once を設定すると、Kafka コネクタはトランザクション書き込みを有効にしますが、3 つの既知の問題があります:

  • 各チェックポイントはトランザクション ID を生成します。チェックポイントの間隔が短すぎると、生成されるトランザクション ID が多すぎます。Kafka クラスターのコーディネーターがメモリ不足になり、Kafka クラスターの安定性が損なわれる可能性があります。

  • 各トランザクションはプロデューサーインスタンスを作成します。同時にコミットされるトランザクションが多すぎると、TaskManager がメモリ不足になり、Flink ジョブの安定性が損なわれる可能性があります。

  • 複数の Flink ジョブが同じ sink.transactional-id-prefix を使用すると、生成されるトランザクション ID が競合する可能性があります。1 つのジョブの書き込みに失敗すると、Kafka パーティションの Log Start Offset (LSO) の進行がブロックされます。これにより、そのパーティションからデータを読み取るすべてのコンシューマーに影響が及びます。

1 回限りのセマンティクスが必要な場合は、Upsert Kafka を使用してプライマリキーテーブルに書き込み、プライマリキーでべき等性を確保してください。トランザクション書き込みを使用する必要がある場合は、「EXACTLY_ONCE セマンティクスに関する考慮事項」をご参照ください。

ネットワーク接続のトラブルシューティング

Flink ジョブの起動中に Timed out waiting for a node assignment というエラーが報告された場合、通常は Flink と Kafka 間のネットワーク接続の問題が原因です。

Kafka クライアントは次のようにサーバーに接続します:

  1. クライアントは bootstrap.servers 内のアドレスを使用して Kafka に接続します。

  2. Kafka は、接続アドレスを含む、クラスター内の各ブローカーのメタデータを返します。

  3. クライアントは、返されたこれらのアドレスを使用して各ブローカーに接続し、読み取りおよび書き込み操作を実行します。

bootstrap.servers アドレスにアクセスできても、Kafka が不正なブローカーアドレスを返した場合、クライアントはデータを読み書きできません。この問題は、プロキシ、ポートフォワーディング、または専用線を使用するネットワークアーキテクチャでよく発生します。

トラブルシューティング手順

ApsaraMQ for Kafka

  1. エンドポイントタイプの確認

    • デフォルトのエンドポイント (内部ネットワーク)

    • SASL エンドポイント (内部ネットワーク + 認証)

    • パブリックネットワークエンドポイント (別途リクエストが必要)

    Flink 開発コンソールを使用してネットワーク診断を実行し、bootstrap.servers アドレスとの接続問題を排除できます。

  2. セキュリティグループとホワイトリストの確認

    Kafka インスタンスは、Flink が存在する VPC の CIDR ブロックをホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」および「ホワイトリストの設定」をご参照ください。

  3. SASL 設定の確認 (有効な場合)

    SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗し、タイムアウトとして現れることもあります。詳細については、「セキュリティと認証」をご参照ください。

ECS 上の自己管理型 Kafka

  1. Flink 開発コンソールを使用してネットワーク診断を実行します。

    bootstrap.servers アドレスとの接続問題を排除し、内部およびパブリックネットワークエンドポイントの正しさを確認します。

  2. セキュリティグループとホワイトリストの確認

    • ECS セキュリティグループは、Kafka エンドポイントポート (通常は 9092 または 9093) でのトラフィックを許可する必要があります。

    • ECS インスタンスは、Flink が存在する VPC の CIDR ブロックをホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」をご参照ください。

  3. 設定の確認

    1. Kafka が使用する ZooKeeper クラスターにログインします。zkCli.sh または zookeeper-shell.sh ツールを使用できます。

    2. コマンドを実行してブローカーのメタデータを取得します。例:get /brokers/ids/0。返された結果で、endpoints フィールドにある Kafka がクライアントにアドバタイズするアドレスを見つけます。

      example

    3. Flink 開発コンソールを使用してネットワーク診断を実行し、アドレスが到達可能かどうかをテストします。

      説明
      • アドレスが到達不可能な場合は、Kafka の運用保守エンジニアに連絡して listeners および advertised.listeners の設定を確認し、修正してください。返されたアドレスが Flink からアクセス可能であることを確認してください。

      • Kafka クライアントがサーバーに接続する方法の詳細については、「接続のトラブルシューティング」をご参照ください。

  4. SASL 設定の確認 (有効な場合)

    SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗し、タイムアウトとして現れることもあります。詳細については、「セキュリティと認証」をご参照ください。

SQL

Kafka コネクタは、SQL タスクでソーステーブルまたは結果テーブルとして使用できます。

構文

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

メタデータ列

ソーステーブルと結果テーブルにメタデータ列を定義して、Kafka メッセージのメタデータにアクセスしたり書き込んだりできます。たとえば、WITH パラメーターで複数のトピックを定義し、Kafka ソーステーブルでメタデータ列を定義すると、Flink が読み取るデータにはそのソース Topic がマークされます。次の例は、メタデータ列の使用方法を示しています。

CREATE TABLE kafka_source (
  -- メッセージのトピックを `record_topic` フィールドとして読み取ります。
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- ConsumerRecord からタイムスタンプを `ts` フィールドとして読み取ります。
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- メッセージのオフセットを `record_offset` フィールドとして読み取ります。
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- `ts` フィールドのタイムスタンプを ProducerRecord のタイムスタンプとして Kafka に書き込みます。
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

次の表に、Kafka ソーステーブルと結果テーブルでサポートされているメタデータ列を示します。

キー

データ型

説明

ソーステーブルまたは結果テーブル

topic

STRING NOT NULL METADATA VIRTUAL

Kafka メッセージを含むトピックの名前。

ソーステーブル

partition

INT NOT NULL METADATA VIRTUAL

Kafka メッセージを含むパーティションの ID。

ソーステーブル

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Kafka メッセージのヘッダー。

ソーステーブルと結果テーブル

leader-epoch

INT NOT NULL METADATA VIRTUAL

Kafka メッセージのリーダーエポック。

ソーステーブル

offset

BIGINT NOT NULL METADATA VIRTUAL

Kafka メッセージのオフセット。

ソーステーブル

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

Kafka メッセージのタイムスタンプ。

ソーステーブルと結果テーブル

timestamp-type

STRING NOT NULL METADATA VIRTUAL

Kafka メッセージのタイムスタンプタイプ:

  • NoTimestampType: メッセージにタイムスタンプが定義されていません。

  • CreateTime: メッセージが作成された時刻。

  • LogAppendTime: メッセージが Kafka ブローカーに追加された時刻。

ソーステーブル

__raw_key__

STRING NOT NULL METADATA VIRTUAL

生の Kafka メッセージのキーフィールド。

ソーステーブルと結果テーブル

__raw_value__

STRING NOT NULL METADATA VIRTUAL

生の Kafka メッセージの値フィールド。

ソーステーブルと結果テーブル

WITH パラメーター

  • 全般

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルタイプ。

    String

    はい

    なし

    値は Kafka に固定されます。

    properties.bootstrap.servers

    Kafka ブローカーのアドレス。

    String

    はい

    なし

    形式は host:port,host:port,host:port です。アドレスはコンマ (,) で区切ります。

    properties.*

    Kafka クライアントの直接設定。

    String

    いいえ

    なし

    サフィックスは、Kafka の公式ドキュメントで定義されているプロデューサーおよびコンシューマーの設定である必要があります。

    Flink は properties. プレフィックスを削除し、残りの設定を Kafka クライアントに渡します。たとえば、'properties.allow.auto.create.topics'='false' を使用して、トピックの自動作成を無効にできます。

    Kafka コネクタによって上書きされるため、この方法で次の設定を変更しないでください:

    • key.deserializer

    • value.deserializer

    format

    Kafka メッセージの値部分を読み書きするために使用されるフォーマット。

    String

    いいえ

    なし

    サポートされているフォーマット:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    説明

    形式パラメーター設定の詳細については、「形式パラメーター」をご参照ください。

    key.format

    Kafka メッセージのキー部分を読み書きするために使用されるフォーマット。

    String

    いいえ

    なし

    サポートされているフォーマット:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    説明

    この設定を使用する場合、key.options 設定が必要です。

    key.fields

    ソーステーブルまたは結果テーブルのフィールドで、Kafka メッセージのキー部分に対応するもの。

    String

    いいえ

    なし

    複数のフィールド名はセミコロン (;) で区切ります。例: field1;field2

    key.fields-prefix

    Kafka メッセージのすべてのキーフィールドにカスタムプレフィックスを指定して、メッセージの値部分のフィールドとの名前の競合を回避します。

    String

    いいえ

    なし

    この設定項目は、ソーステーブルと結果テーブルの列名を区別するためにのみ使用されます。Kafka メッセージのキー部分を解析および生成する際にプレフィックスは削除されます。

    説明

    この設定を使用する場合、value.fields-include を EXCEPT_KEY に設定する必要があります。

    value.format

    Kafka メッセージの値部分を読み書きするために使用されるフォーマット。

    String

    いいえ

    なし

    この設定は format と同等です。format または value.format のいずれか一方のみを設定できます。両方が設定されている場合、value.formatformat を上書きします。

    value.fields-include

    Kafka メッセージの値部分を解析または生成する際に、メッセージのキー部分に対応するフィールドを含めるかどうかを指定します。

    String

    いいえ

    ALL

    有効な値:

    • ALL (デフォルト): すべての列が Kafka メッセージの値部分として処理されます。

    • EXCEPT_KEY: key.fields で定義されたものを除く残りのフィールドが、Kafka メッセージの値部分として処理されます。

  • ソーステーブル

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    topic

    読み取るトピックの名前。

    String

    いいえ

    なし

    複数のトピック名はセミコロン (;) で区切ります。例: topic-1;topic-2。

    説明

    topic と topic-pattern オプションのいずれか一方のみを指定できます。

    topic-pattern

    読み取り元のトピック名に一致する正規表現。ジョブの実行中に、この正規表現に一致するすべてのトピックが読み取られます。

    String

    いいえ

    なし

    説明

    topic と topic-pattern オプションのいずれか一方のみを指定できます。

    properties.group.id

    コンシューマーグループ ID。

    String

    いいえ

    KafkaSource-{source_table_name}

    指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。

    scan.startup.mode

    Kafka からデータを読み取るための開始オフセット。

    String

    いいえ

    group-offsets

    有効な値:

    • earliest-offset:Kafka の最も早いパーティションから読み取りを開始します。

    • latest-offset:Kafka の最新のオフセットから読み取りを開始します。

    • group-offsets (デフォルト):指定された properties.group.id のコミット済みオフセットから読み取りを開始します。

    • timestampscan.startup.timestamp-millis で指定されたタイムスタンプから読み取りを開始します。

    • specific-offsetsscan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。

    説明

    このパラメーターは、状態なしでタスクが開始された場合に有効になります。タスクがチェックポイントから再開するか、状態から回復すると、状態に保存された進行状況を優先的に使用して読み取りを再開します。

    scan.startup.specific-offsets

    specific-offsets スタートアップモードで、各パーティションの開始オフセットを指定します。

    String

    いいえ

    なし

    例: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    timestamp スタートアップモードで、開始オフセットのタイムスタンプを指定します。

    Long

    いいえ

    なし

    単位はミリ秒です。

    scan.topic-partition-discovery.interval

    Kafka のトピックとパーティションを動的に検出する間隔。

    Duration

    いいえ

    5 分

    デフォルトのパーティションチェック間隔は 5 分です。この機能を無効にするには、パーティションチェック間隔を明示的に 0 以下の値に設定する必要があります。動的パーティション検出が有効な場合、Kafka ソースは新しいパーティションを自動的に検出し、そこからデータを読み取ることができます。topic-pattern モードでは、既存のトピックの新しいパーティションからデータを読み取るだけでなく、正規表現に一致する新しいトピックのすべてのパーティションからもデータを読み取ります。

    説明

    Ververica Runtime (VVR) 6.0.x では、動的パーティション検出はデフォルトで無効になっています。VVR 8.0 以降、この機能はデフォルトで有効になり、検出間隔は 5 分に設定されています。

    scan.header-filter

    Kafka データに指定されたヘッダーが含まれているかどうかに基づいてデータをフィルターします。

    String

    いいえ

    なし

    ヘッダーのキーと値はコロン (:) で区切ります。複数のヘッダー条件は論理演算子 (&, |) で接続します。NOT 論理演算子 (!) もサポートされています。たとえば、depart:toy|depart:book&!env:test は、ヘッダーに depart=toy または depart=book を含み、env=test を含まない Kafka データを保持します。

    説明
    • このパラメーターは Ververica Runtime (VVR) 8.0.6 以降でのみサポートされます。

    • 論理演算では括弧はサポートされていません。

    • 論理演算は左から右に実行されます。

    • ヘッダー値は、指定されたヘッダー値と比較するために UTF-8 形式の文字列に変換されます。

    scan.check.duplicated.group.id

    properties.group.id で指定された重複するコンシューマーグループをチェックするかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: タスクを開始する前に、システムは重複するコンシューマーグループをチェックします。重複が見つかった場合、タスクはエラーを報告して停止し、既存のコンシューマーグループとの競合を防ぎます。

    • false: タスクはコンシューマーグループの競合をチェックせずに直接開始します。

    説明

    このパラメーターは、VVR 6.0.4 以降でのみサポートされます。

  • シンクテーブル

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    topic

    書き込むトピックの名前。

    String

    はい

    なし

    なし

    sink.partitioner

    Flink の並列処理から Kafka パーティションへのマッピングモード。

    String

    いいえ

    default

    有効な値:

    • default: デフォルトの Kafka partitioner を使用します。

    • fixed: 各 Flink の並列処理は、固定の Kafka パーティションに対応します。

    • round-robin:Flink の並列度からのデータがラウンドロビン方式で Kafka パーティションに割り当てられます。

    • カスタム partitioner:fixed と round-robin がニーズに合わない場合は、FlinkKafkaPartitioner の子クラスを作成してカスタム partitioner を定義できます。例:org.mycompany.MyPartitioner

    sink.delivery-guarantee

    Kafka 結果テーブルの配信セマンティクス。

    String

    いいえ

    at-least-once

    有効な値:

    • none: 保証なし。データが失われたり、重複したりする可能性があります。

    • at-least-once (デフォルト): データの損失がないことを保証しますが、データが重複する可能性があります。

    • exactly-once: Kafka トランザクションを使用して、データが失われたり重複したりしないことを保証します。

    説明

    exactly-once セマンティクスを使用する場合、sink.transactional-id-prefix パラメーターが必要です。

    sink.transactional-id-prefix

    exactly-once セマンティクスで使用される Kafka トランザクション ID のプレフィックス。

    String

    いいえ

    なし

    この設定は、sink.delivery-guarantee が exactly-once に設定されている場合にのみ有効です。

    sink.parallelism

    Kafka 結果テーブルオペレーターの並列処理の次数。

    Integer

    いいえ

    なし

    アップストリームオペレーターの並列度はフレームワークによって決定されます。

セキュリティと認証

Kafka クラスターが安全な接続または認証を必要とする場合、関連するセキュリティおよび認証設定を properties. プレフィックスを付けて WITH パラメーターに追加できます。次の例は、SASL メカニズムとして PLAIN を使用し、JAAS 設定を提供するように Kafka テーブルを設定する方法を示しています。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

次の例は、セキュリティプロトコルとして SASL_SSL を使用し、SASL メカニズムとして SCRAM-SHA-256 を使用する方法を示しています。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*SSL 設定*/
  /*サーバーから提供されたトラストストア (CA 証明書) へのパスを設定します。*/
  /*ファイル管理を通じてアップロードされたファイルは /flink/usrlib/ パスに保存されます。*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*クライアント認証が必要な場合は、キーストア (秘密鍵) へのパスを設定します。*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*クライアントがサーバーアドレスを検証するためのアルゴリズム。空の値はサーバーアドレス検証を無効にします。*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*SASL 設定*/
  /*SASL メカニズムを SCRAM-SHA-256 に設定します。*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*JAAS を設定します*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

例で言及されている CA 証明書と秘密鍵は、Realtime Compute コンソールのファイル管理機能を使用してプラットフォームにアップロードできます。アップロード後、ファイルは /flink/usrlib ディレクトリに保存されます。使用する CA 証明書ファイルの名前が my-truststore.jks の場合、WITH パラメーターで 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' を指定してこの証明書を使用します。

説明
  • 上記の例は、ほとんどの設定シナリオに適用されます。Kafka コネクタを設定する前に、Kafka サーバーの運用保守エンジニアに連絡して、正しいセキュリティおよび認証設定を取得してください。

  • オープンソースの Flink とは異なり、Realtime Compute for Apache Flink の SQL エディターは二重引用符 (") を自動的にエスケープします。そのため、properties.sasl.jaas.config を設定する際に、ユーザー名とパスワードの二重引用符に追加のエスケープ文字 (\) を追加する必要はありません。

ソーステーブルの開始オフセット

スタートアップモード

scan.startup.mode を設定することで、Kafka ソーステーブルの初期読み取りオフセットを指定できます。

  • earliest-offset: 現在のパーティションの最も古いオフセットから読み取りを開始します。

  • latest-offset: 現在のパーティションの最新のオフセットから読み取りを開始します。

  • group-offsets: 指定されたグループ ID のコミット済みオフセットから読み取りを開始します。グループ ID は properties.group.id で指定されます。

  • timestamp: 指定された時刻以上のタイムスタンプを持つ最初のメッセージから読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis で指定されます。

  • specific-offsets: 指定されたパーティションオフセットから消費を開始します。オフセットは scan.startup.specific-offsets で指定されます。

説明
  • 開始オフセットを指定しない場合、デフォルトでコミット済みオフセット (group-offsets) から消費が開始されます。

  • scan.startup.mode パラメーターは、状態なしで開始するジョブに対してのみ有効です。ステートフルなジョブが開始されると、その状態に保存されているオフセットから消費を開始します。

次のコードは例を示しています。

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- 最も早いオフセットから消費を開始します。
  'scan.startup.mode' = 'earliest-offset',
  -- 最新のオフセットから消費を開始します。
  'scan.startup.mode' = 'latest-offset',
  -- コンシューマーグループ "my-group" のコミット済みオフセットから消費を開始します。
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- "my-group" を初めて使用する場合、最も早いオフセットから消費を開始します。
  'properties.auto.offset.reset' = 'latest', -- "my-group" を初めて使用する場合、最新のオフセットから消費を開始します。
  -- 指定されたミリ秒タイムスタンプ 1655395200000 から消費を開始します。
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- 指定されたオフセットから消費を開始します。
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

開始オフセットの優先順位

ソーステーブルの開始オフセットの優先度は次のとおりです:

優先度 (高から低)

チェックポイントまたはセーブポイントに保存されているオフセット

ジョブ開始時に Realtime Compute コンソールで選択された開始時刻

WITH パラメーターの scan.startup.mode で指定された開始オフセット

scan.startup.mode が指定されていない場合、group-offsets が使用され、対応するコンシューマーグループのオフセットが使用されます

いずれかのステップでオフセットが無効になった場合 (たとえば、有効期限切れや Kafka クラスターの問題)、システムは properties.auto.offset.reset で設定されたポリシーを使用してオフセットをリセットします。このパラメーターが設定されていない場合、例外が発生し、手動での介入が必要になります。

一般的なシナリオは、新しいグループ ID で消費を開始することです。まず、ソーステーブルは Kafka クラスターにそのグループのコミット済みオフセットを問い合わせます。このグループ ID は初めて使用されるため、有効なオフセットは見つかりません。したがって、オフセットは properties.auto.offset.reset パラメーターで設定されたポリシーに従ってリセットされます。新しいグループ ID で消費する場合、オフセットリセットポリシーを指定するために properties.auto.offset.reset を設定する必要があります。

ソーステーブルのオフセットコミット

Kafka ソーステーブルは、チェックポイントが成功した後にのみ、現在のコンシューマーオフセットを Kafka クラスターにコミットします。チェックポイントの間隔が長い場合、Kafka クラスターで観測されるコンシューマーオフセットは遅れます。チェックポイント中、Kafka ソーステーブルは現在の読み取り進捗をその状態に保存し、障害回復のためにクラスターにコミットされたオフセットに依存しません。オフセットのコミットは、Kafka 側での読み取り進捗の監視のためだけです。オフセットのコミットに失敗しても、データの正確性には影響しません。

結果テーブルのカスタム partitioner

組み込みの Kafka プロデューサー partitioner がニーズに合わない場合は、カスタム partitioner を実装して対応するパーティションにデータを書き込むことができます。カスタム partitioner は FlinkKafkaPartitioner を継承する必要があります。開発後、JAR パッケージをコンパイルし、Realtime Compute コンソールのファイル管理機能を使用してアップロードします。ファイルをアップロードして参照した後、WITH 句の sink.partitioner パラメーターを partitioner の完全なクラスパス (例:org.mycompany.MyPartitioner) に設定します。

Kafka、Upsert Kafka、Kafka JSON カタログの選択

Kafka は追記専用のメッセージキューシステムであり、データの更新や削除をサポートしていません。そのため、アップストリームの Change Data Capture (CDC) データや、ストリーミング SQL の集約や結合などのオペレーターからのリトラクションロジックを処理できません。変更やリトラクションを含むデータを Kafka に書き込むには、変更データを処理するために特別に設計されたUpsert Kafka 結果テーブルを使用します。

アップストリームデータベースの 1 つ以上のテーブルから Kafka に変更データをバッチで便利に同期するには、Kafka JSON カタログを使用できます。Kafka に保存されているデータが JSON 形式の場合、Kafka JSON カタログを使用すると、スキーマや WITH パラメーターを定義する必要がなくなります。詳細については、「Kafka JSON カタログの管理」をご参照ください。

例 1: Kafka からデータを読み取り、Kafka に書き込む

`source` という名前のトピックから Kafka データを読み取り、`sink` という名前のトピックに書き込みます。データは CSV 形式です。

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',  // Kafka ブローカーのリスト
  'properties.group.id' = '<yourKafkaConsumerGroupId>', // Kafka コンシューマーグループ ID
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>', // Kafka ブローカーのリスト
  'properties.group.id' = '<yourKafkaConsumerGroupId>', // Kafka コンシューマーグループ ID
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

例 2: テーブルスキーマとデータを同期する

Kafka トピックから Hologres にメッセージをリアルタイムで同期します。この場合、Kafka メッセージのオフセットとパーティション ID をプライマリキーとして使用して、フェールオーバー中に Hologres に重複メッセージがないことを保証できます。

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- オプション。すべてのネストされた列をフラット化します。
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

例 3: テーブルスキーマと Kafka メッセージのキーと値のデータの同期

Kafka メッセージのキー部分にすでに関連情報が格納されている場合、Kafka からキーと値の両方を同期できます。

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>', // Kafka ブローカーのリスト
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
説明

Kafka メッセージのキー部分は、テーブルスキーマの変更や型解析に対応していません。手動で宣言する必要があります。

例 4: テーブルスキーマとデータの同期および計算の実行

Kafka データを Hologres に同期する際、軽量な計算が必要になることがよくあります。

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- COALESCE を使用して null 値を処理します。

例 5: ネストされた JSON の解析

JSON メッセージの例

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

後でフィールドを解析するために JSON_VALUE(payload, '$.properties.owner') のような関数を使用するのを避けるために、Source DDL で直接構造を定義できます:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

このようにすると、Flink は読み取り段階で JSON を構造化されたフィールドに解析します。後続の SQL クエリは、追加の関数呼び出しなしで直接 properties.owner を使用でき、全体的なパフォーマンスが向上します。

Datastream API

重要

DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの設定方法については、「DataStream コネクタの使用」をご参照ください。

  • Kafka ソースの構築

    Kafka ソースは、KafkaSource インスタンスを作成するためのビルダー クラスを提供します。次のコード例は、`input-topic` の最も早いオフセットからデータを消費する Kafka ソースを構築する方法を示しています。コンシューマーグループの名前は `my-group` で、Kafka メッセージ本文は文字列に逆シリアル化されます。

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    KafkaSource を構築する際には、次のパラメーターを指定する必要があります。

    パラメーター

    説明

    BootstrapServers

    Kafka ブローカーのアドレス。setBootstrapServers(String) メソッドを使用してこれを設定します。

    GroupId

    コンシューマーグループ ID。setGroupId(String) メソッドを使用してこれを設定します。

    トピックまたはパーティション

    サブスクライブするトピックまたはパーティションの名前。Kafka ソースは、トピックまたはパーティションをサブスクライブするための次の 3 つの方法を提供します:

    • トピックリスト: トピックリスト内のすべてのパーティションをサブスクライブします。

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • 正規表現マッチング: 正規表現に一致するトピックのすべてのパーティションをサブスクライブします。

      KafkaSource.builder().setTopicPattern("topic.*")
    • パーティションリスト: 指定されたパーティションをサブスクライブします。

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
              new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    デシリアライザ

    Kafka メッセージを解析するための逆シリアライザー。

    setDeserializer(KafkaRecordDeserializationSchema) を使用してデシリアライザーを指定します。ここで、KafkaRecordDeserializationSchema は Kafka ConsumerRecord を解析する方法を定義します。Kafka メッセージのメッセージ本文 (値) のデータのみを解析する必要がある場合は、次のいずれかの方法で行うことができます:

    • Flink が提供する KafkaSource ビルダークラスの setValueOnlyDeserializer(DeserializationSchema) メソッドを使用します。DeserializationSchema は、Kafka メッセージ本文のバイナリデータの解析方法を定義します。

    • Kafka が提供するパーサーを使用します。これには複数の実装クラスが含まれます。たとえば、StringDeserializer を使用して Kafka メッセージ本文を文字列に解析できます。

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    説明

    ConsumerRecord を完全に解析するには、KafkaRecordDeserializationSchema インターフェイスを自分で実装する必要があります。

    XML

    Kafka DataStream コネクタは、Maven Central Repository で入手できます。

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    Kafka DataStream コネクタを使用する場合、次の Kafka プロパティを理解する必要があります:

    • 開始コンシューマーオフセット

      Kafka ソースは、オフセット初期化子 (OffsetsInitializer) を通じて消費の開始オフセットを指定できます。組み込みのオフセット初期化子には次のものがあります。

      オフセットイニシャライザ

      コード設定

      最も早いオフセットから消費を開始します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      最新のオフセットから消費を開始します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      指定された時刻以上のタイムスタンプを持つデータから消費を開始します (ミリ秒単位)。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      コンシューマーグループによってコミットされたオフセットから消費を開始します。コミットされたオフセットが存在しない場合は、最も早いオフセットを使用します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      オフセットリセット戦略を指定せずに、コンシューマーグループによってコミットされたオフsetから消費を開始します。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      説明
      • 組み込みのイニシャライザがニーズを満たさない場合は、カスタムオフセットイニシャライザを実装できます。

      • オフセットイニシャライザが指定されていない場合、デフォルトで OffsetsInitializer.earliest() (最も古いオフセット) が使用されます。

    • ストリーミングモードとバッチモード

      Kafka ソースは、ストリーミングとバッチの両方のランタイムモードをサポートしています。デフォルトでは、Kafka ソースはストリーミングモードで実行するように設定されているため、Flink ジョブが失敗するかキャンセルされるまでジョブは停止しません。Kafka ソースをバッチモードで実行するように設定するには、setBounded(OffsetsInitializer) を使用して停止オフセットを指定します。すべてのパーティションが停止オフセットに達すると、Kafka ソースは終了します。

      説明

      通常、ストリーミングモードには停止オフセットはありません。コードのデバッグを容易にするために、setUnbounded(OffsetsInitializer) を使用してストリーミングモードで停止オフセットを指定できます。ストリーミングモードとバッチモードで停止オフセットを指定するメソッド名 (setUnbounded と setBounded) は異なることに注意してください。

    • 動的パーティション検出

      Flink ジョブを再起動せずにトピックのスケーリングや新しいトピックの作成などのシナリオに対応するために、提供されているトピックまたはパーティションのサブスクリプションモードで動的パーティション検出機能を有効にできます。

      説明

      動的パーティション検出機能はデフォルトで有効になっており、パーティションチェック間隔は 5 分です。この機能を無効にするには、パーティションチェック間隔を明示的に 0 以下の値に設定する必要があります。次のコードは例を示しています。

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // 10 秒ごとに新しいパーティションをチェックします。
      重要

      動的パーティション検出機能は、Kafka クラスターのメタデータ更新メカニズムに依存します。Kafka クラスターがパーティション情報を迅速に更新しない場合、新しいパーティションが検出されない可能性があります。Kafka クラスターの partition.discovery.interval.ms 設定が実際の状況と一致していることを確認してください。

    • イベント時間とウォーターマーク

      デフォルトでは、Kafka ソースは Kafka メッセージのタイムスタンプをイベント時間として使用します。カスタムウォーターマーク戦略を定義して、メッセージからイベント時間を抽出し、ウォーターマークをダウンストリームに送信できます。

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      カスタムウォーターマーク戦略の詳細については、「ウォーターマークの生成」をご参照ください。

      説明

      並列ソースの一部のタスクが長時間アイドル状態になると (たとえば、Kafka パーティションに長時間データ入力がない場合や、ソースの並列度が Kafka パーティションの数を超える場合)、ウォーターマーク生成メカニズムが失敗する可能性があります。この場合、システムはウィンドウ計算を正常にトリガーできず、データ処理フローが停止します。

      この問題を解決するには、次の調整を行うことができます。

      • ウォーターマークタイムアウトメカニズムの設定:table.exec.source.idle-timeout パラメーターを有効にして、指定されたタイムアウト期間後にシステムが強制的にウォーターマークを生成するようにします。これにより、ウィンドウ計算サイクルが進行することが保証されます。

      • データソースの最適化:Kafka パーティションとソースの並列度の比率を適切に保つことを推奨します (推奨:パーティション数 ≥ ソース並列度)。

    • コンシューマーオフセットのコミット

      Kafka ソースは、チェックポイントが完了したときに現在のコンシューマーオフセットをコミットします。これにより、Flink のチェックポイント状態が Kafka ブローカー上のコミット済みオフセットと一致することが保証されます。チェックポイントが有効になっていない場合、Kafka ソースは Kafka コンシューマーの内部自動オフセットコミットロジックに依存します。自動コミット機能は、enable.auto.commit および auto.commit.interval.ms Kafka コンシューマー設定項目によって設定されます。

      説明

      Kafka ソースは、失敗したジョブを回復するためにブローカーにコミットされたオフセットに依存しません。オフセットのコミットは、ブローカー側での Kafka コンシューマーとコンシューマーグループの消費進捗を監視するためだけです。

    • その他のプロパティ

      上記のプロパティに加えて、setProperties(Properties) および setProperty(String, String) を使用して、Kafka ソースおよび Kafka コンシューマーの任意のプロパティを設定できます。KafkaSource には通常、次の設定項目があります。

      設定項目

      説明

      client.id.prefix

      Kafka コンシューマーのクライアント ID プレフィックスを指定します。

      partition.discovery.interval.ms

      Kafka ソースが新しいパーティションをチェックする間隔を定義します。

      説明

      partition.discovery.interval.ms はバッチモードでは -1 に上書きされます。

      register.consumer.metrics

      Flink に Kafka コンシューマーのメトリックを登録するかどうかを指定します。

      その他の Kafka コンシューマー設定

      Kafka コンシューマー設定の詳細については、「Apache Kafka」をご参照ください。

      重要

      Kafka コネクタは、手動で設定された一部のパラメーターを次のように強制的に上書きします:

      • key.deserializer は常に ByteArrayDeserializer に上書きされます。

      • value.deserializer は常に ByteArrayDeserializer に上書きされます。

      • auto.offset.reset.strategy は OffsetsInitializer#getAutoOffsetResetStrategy() に上書きされます。

      次の例は、SASL メカニズムとして PLAIN を使用し、JAAS 設定を提供するように Kafka コンシューマーを設定する方法を示しています。

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • モニタリング

      Kafka ソースは、監視と診断のために Flink にメトリックを登録します。

      • メトリックスコープ

        Kafka ソースリーダーのすべてのメトリックは、オペレーターメトリックグループのサブグループである KafkaSourceReader メトリックグループの下に登録されます。特定のトピックパーティションに関連するメトリックは、KafkaSourceReader.topic.<topic_name>.partition.<partition_id> メトリックグループに登録されます。

        たとえば、トピック "my-topic" のパーティション 1 の現在のコンシューマーオフセット (currentOffset) は、<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset の下に登録されます。成功したオフセットコミットの数 (commitsSucceeded) は、<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded の下に登録されます。

      • メトリックリスト

        メトリック名

        説明

        スコープ

        currentOffset

        現在のコンシューマーオフセット。

        TopicPartition

        committedOffset

        現在のコミット済みオフセット。

        TopicPartition

        commitsSucceeded

        成功したコミット数。

        KafkaSourceReader

        commitsFailed

        失敗したコミット数。

        KafkaSourceReader

      • Kafka コンシューマーメトリック

        Kafka コンシューマーのメトリックは、KafkaSourceReader.KafkaConsumer メトリックグループに登録されます。たとえば、Kafka コンシューマーメトリック records-consumed-total は、<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total の下に登録されます。

        register.consumer.metrics 設定項目を使用して、Kafka コンシューマーのメトリックを登録するかどうかを指定できます。デフォルトでは、このオプションは true に設定されています。Kafka コンシューマーメトリックの詳細については、「Apache Kafka」をご参照ください。

  • Kafka シンクの構築

    Flink Kafka シンクは、ストリームデータを 1 つ以上の Kafka トピックに書き込むことができます。

    DataStream<String> stream = ...
    
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", );
    KafkaSink<String> kafkaSink =
                    KafkaSink.<String>builder()
                            .setKafkaProducerConfig(kafkaProperties) // プロデューサー設定
                            .setRecordSerializer(
                                    KafkaRecordSerializationSchema.builder()
                                            .setTopic("my-topic") // ターゲットトピック
                                            .setKafkaValueSerializer(StringSerializer.class) // シリアル化スキーマ
                                            .build())
                            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // フォールトトレランス
                            .build();
    
    stream.sinkTo(kafkaSink);

    次のパラメーターを設定する必要があります。

    パラメーター

    説明

    トピック

    データが書き込まれるデフォルトのトピック名。

    データシリアル化

    構築時に、入力データを Kafka ProducerRecord に変換するために KafkaRecordSerializationSchema を提供する必要があります。Flink は、メッセージキー/値のシリアル化、トピック選択、メッセージパーティショニングなどの一般的なコンポーネントを提供するためのスキーマビルダーを提供します。より詳細な制御のために、対応するインターフェイスを実装することもできます。ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) メソッドは、流入する各データレコードに対して呼び出され、Kafka に書き込む ProducerRecord を生成します。

    各データレコードが Kafka にどのように書き込まれるかを詳細に制御できます。ProducerRecord を使用すると、次の操作を実行できます:

    • 書き込み先のトピックの名前を設定します。

    • メッセージキーを定義します。

    • データが書き込まれるパーティションを指定します。

    Kafka クライアントプロパティ

    bootstrap.servers は必須です。これは、カンマで区切られた Kafka ブローカーのリストです。

    フォールトトレランスセマンティクス

    Flink のチェックポイントが有効な場合、Flink Kafka シンクは 1 回限りのセマンティクスを保証できます。Flink のチェックポイントを有効にすることに加えて、DeliveryGuarantee パラメーターを通じて異なるフォールトトレランスセマンティクスを指定することもできます。DeliveryGuarantee パラメーターの詳細は次のとおりです:

    • DeliveryGuarantee.NONE:(デフォルト設定) Flink は保証しません。データが失われたり、重複したりする可能性があります。

    • DeliveryGuarantee.AT_LEAST_ONCE: データの損失がないことを保証しますが、データが重複する可能性があります。

    • DeliveryGuarantee.EXACTLY_ONCE: Kafka トランザクションを使用して 1 回限りのセマンティクスを提供します。

      説明

      EXACTLY_ONCE セマンティクスを使用する場合の考慮事項については、「EXACTLY_ONCE セマンティクスに関する考慮事項」をご参照ください。

データ統合

Kafka コネクタは、データ統合 YAML ジョブ開発でソースからの読み取りまたはターゲットへの書き込みに使用できます。

制限事項

  • Ververica Runtime (VVR) 11.1 以降では、Flink Change Data Capture (CDC) データ統合の同期データソースとして Kafka を使用することを推奨します。

  • JSON、Debezium JSON、Canal JSON 形式のみがサポートされています。現時点では、他のデータ形式はサポートされていません。

  • データソースの場合、単一テーブルのデータを複数のパーティションに分散させることは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。

構文

source:
  type: kafka
  name: Kafka source  // Kafka ソース
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

設定項目

  • 全般

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    type

    ソースまたはターゲットのタイプ。

    はい

    文字列

    なし

    このパラメーターを kafka に設定します。

    name

    ソースまたはターゲットの名前。

    いいえ

    文字列

    なし

    なし

    properties.bootstrap.servers

    Kafka ブローカーのアドレス。

    はい

    文字列

    なし

    形式は host:port,host:port,host:port です。アドレスはコンマ (,) で区切ります。

    properties.*

    Kafka クライアントの直接設定。

    いいえ

    文字列

    なし

    サフィックスは、Kafka の公式ドキュメントで定義されているプロデューサーおよびコンシューマーの設定である必要があります。

    Flink は properties. プレフィックスを削除し、残りの設定を Kafka クライアントに渡します。たとえば、'properties.allow.auto.create.topics' = 'false' を使用して、トピックの自動作成を無効にできます。

    key.format

    Kafka メッセージのキー部分を読み書きするために使用されるフォーマット。

    いいえ

    文字列

    なし

    • ソースの場合、json のみがサポートされます。

    • シンクの場合、有効な値は次のとおりです:

      • csv

      • json

    説明

    このパラメーターは Ververica Runtime (VVR) 11.0.0 以降でのみサポートされます。

    value.format

    Kafka メッセージの値部分を読み書きするために使用されるフォーマット。

    いいえ

    文字列

    debezium-json

    有効な値:

    • debezium-json 

    • canal-json

    • json

    説明
    • debezium-json および canal-json 形式は、Ververica Runtime (VVR) 8.0.10 以降でのみサポートされます。

    • json 形式は、Ververica Runtime (VVR) 11.0.0 以降でのみサポートされます。

  • ソーステーブル

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    topic

    読み取るトピックの名前。

    いいえ

    文字列

    なし

    複数のトピック名はセミコロン (;) で区切ります。例: topic-1;topic-2。

    説明

    topic と topic-pattern オプションのいずれか一方のみを指定できます。

    topic-pattern

    読み取り元のトピック名に一致する正規表現。ジョブの実行中に、この正規表現に一致するすべてのトピックが読み取られます。

    いいえ

    文字列

    なし

    説明

    topic と topic-pattern オプションのいずれか一方のみを指定できます。

    properties.group.id

    コンシューマーグループ ID。

    いいえ

    文字列

    なし

    指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。

    scan.startup.mode

    Kafka からデータを読み取るための開始オフセット。

    いいえ

    文字列

    group-offsets

    有効な値:

    • earliest-offset:最も早い Kafka パーティションから読み取りを開始します。

    • latest-offset:最新の Kafka オフセットから読み取りを開始します。

    • group-offsets (デフォルト):指定された properties.group.id のコミット済みオフセットから読み取りを開始します。

    • timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプから読み取りを開始します。

    • specific-offsets:scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。

    説明

    このパラメーターは、状態なしでタスクが開始された場合に有効になります。タスクがチェックポイントから再開するか、状態から回復すると、状態に保存された進行状況を優先的に使用して読み取りを再開します。

    scan.startup.specific-offsets

    specific-offsets スタートアップモードで、各パーティションの開始オフセットを指定します。

    いいえ

    文字列

    なし

    例: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    timestamp スタートアップモードで、開始オフセットのタイムスタンプを指定します。

    いいえ

    Long

    なし

    単位はミリ秒です。

    scan.topic-partition-discovery.interval

    Kafka のトピックとパーティションを動的に検出する間隔。

    いいえ

    Duration

    5 分

    デフォルトのパーティションチェック間隔は 5 分です。この機能を無効にするには、パーティションチェック間隔を明示的に 0 以下の値に設定する必要があります。動的パーティション検出が有効な場合、Kafka ソースは新しいパーティションを自動的に検出し、そこからデータを読み取ることができます。topic-pattern モードでは、既存のトピックの新しいパーティションからデータを読み取るだけでなく、正規表現に一致する新しいトピックのすべてのパーティションからもデータを読み取ります。

    scan.check.duplicated.group.id

    properties.group.id で指定された重複するコンシューマーグループをチェックするかどうかを指定します。

    いいえ

    ブール値

    false

    有効な値:

    • true:ジョブの開始前に、システムは重複するコンシューマーグループをチェックします。重複が見つかった場合、ジョブはエラーを報告し、既存のコンシューマーグループとの競合を防ぎます。

    • false: タスクはコンシューマーグループの競合をチェックせずに直接開始します。

    schema.inference.strategy

    スキーマ解析戦略。

    いいえ

    文字列

    continuous

    有効な値:

    • continuous:各データレコードのスキーマを解析します。先行するスキーマと後続するスキーマに互換性がない場合、より広いスキーマが解析され、スキーマ変更イベントが生成されます。

    • static:ジョブ開始時にスキーマを一度だけ解析します。後続のデータは初期スキーマに基づいて解析され、スキーマ変更イベントは生成されません。

    説明

    scan.max.pre.fetch.records

    初期スキーマ解析中に各パーティションで消費および解析を試みるメッセージの最大数。

    いいえ

    Int

    50

    ジョブが実際にデータを読み取って処理する前に、各パーティションの最新メッセージを指定された数だけ事前消費してスキーマ情報を初期化しようとします。

    key.fields-prefix

    Kafka メッセージキーを解析した後の命名の競合を避けるために、メッセージキーから解析されたフィールド名に追加されるカスタムプレフィックス。

    いいえ

    文字列

    なし

    この設定項目が key_ に設定されていると仮定します。キーに `a` という名前のフィールドが含まれている場合、キーを解析した後のフィールド名は `key_a` になります。

    説明

    key.fields-prefix の値は、value.fields-prefix のプレフィックスにすることはできません。

    value.fields-prefix

    Kafka メッセージ本文を解析した後の命名競合を避けるために、メッセージ値から解析されたフィールド名に追加されるカスタムプレフィックス。

    いいえ

    文字列

    なし

    この設定項目が value_ に設定されていると仮定します。値に `b` という名前のフィールドが含まれている場合、値を解析した後のフィールド名は `value_b` になります。

    説明

    value.fields-prefix の値は、key.fields-prefix のプレフィックスにすることはできません。

    metadata.list

    ダウンストリームに渡されるメタデータ列。

    いいえ

    文字列

    なし

    利用可能なメタデータ列には、topicpartitionoffsettimestamptimestamp-typeheadersleader-epoch__raw_key____raw_value__ があります。カンマで区切ります。

    • ソーステーブル Debezium JSON 形式

      パラメーター

      必須

      データの型

      デフォルト値

      説明

      debezium-json.distributed-tables

      いいえ

      ブール値

      false

      Debezium JSON の単一テーブルのデータが複数のパーティションに表示される場合は、このオプションを有効にする必要があります。

      説明

      この設定項目は、VVR 8.0.11 以降でのみサポートされます。

      重要

      この設定項目を変更した後、状態なしでタスクを開始する必要があります。

      debezium-json.schema-include

      いいえ

      ブール値

      false

      Debezium Kafka Connect を設定する際、Kafka 設定 value.converter.schemas.enable を有効にして、メッセージにスキーマを含めることができます。このオプションは、Debezium JSON メッセージにスキーマが含まれているかどうかを示します。

      有効な値:

      • true: Debezium JSON メッセージにスキーマが含まれています。

      • false: Debezium JSON メッセージにスキーマが含まれていません。

      debezium-json.ignore-parse-errors

      いいえ

      ブール値

      false

      有効な値:

      • true: 解析エラーが発生した場合、現在の行をスキップします。

      • false (デフォルト):エラーを報告し、ジョブの開始に失敗します。

      debezium-json.infer-schema.primitive-as-string

      いいえ

      ブール値

      false

      テーブルスキーマを解析する際に、すべての型を String として解析するかどうかを指定します。

      有効な値:

      • true: すべての基本データ型を String として解析します。

      • false (デフォルト): 基本的なルールに従って解析します。

    • ソーステーブル Canal JSON 形式

      パラメーター

      必須

      データの型

      デフォルト値

      説明

      canal-json.distributed-tables

      いいえ

      ブール値

      false

      Canal JSON の単一テーブルのデータが複数のパーティションに表示される場合は、このオプションを有効にする必要があります。

      説明

      この設定項目は、VVR 8.0.11 以降でのみサポートされます。

      重要

      この設定項目を変更した後、状態なしでタスクを開始する必要があります。

      canal-json.database.include

      いいえ

      文字列

      なし

      Canal レコードの `database` メタデータフィールドに一致するオプションの正規表現。指定されたデータベースの変更ログレコードのみを読み取ります。正規表現文字列は Java の Pattern と互換性があります。

      canal-json.table.include

      いいえ

      文字列

      なし

      Canal レコードの `table` メタデータフィールドに一致するオプションの正規表現。指定されたテーブルの変更ログレコードのみを読み取ります。正規表現文字列は Java の Pattern と互換性があります。

      canal-json.ignore-parse-errors

      いいえ

      ブール値

      false

      有効な値:

      • true: 解析エラーが発生した場合、現在の行をスキップします。

      • false (デフォルト):エラーを報告し、ジョブの開始に失敗します。

      canal-json.infer-schema.primitive-as-string

      いいえ

      ブール値

      false

      テーブルスキーマを解析する際に、すべての型を String として解析するかどうかを指定します。

      有効な値:

      • true: すべての基本データ型を String として解析します。

      • false (デフォルト): 基本的なルールに従って解析します。

      canal-json.infer-schema.strategy

      いいえ

      文字列

      AUTO

      テーブルスキーマの解析戦略。

      有効な値:

      • AUTO (デフォルト):JSON データを分析して自動的に解析します。データに `sqlType` フィールドが含まれていない場合、解析の失敗を避けるために AUTO を使用することを推奨します。

      • SQL_TYPE:Canal JSON データの `sqlType` 配列を使用して解析します。データに `sqlType` フィールドが含まれている場合、より正確な型を取得するために canal-json.infer-schema.strategy を SQL_TYPE に設定することを推奨します。

      • MYSQL_TYPE:Canal JSON データの `mysqlType` 配列を使用して解析します。

      Kafka の Canal JSON データに `sqlType` フィールドが含まれており、より正確な型マッピングが必要な場合は、canal-json.infer-schema.strategy を SQL_TYPE に設定することを推奨します。

      `sqlType` のマッピングルールについては、「Canal JSON のスキーマ解析」をご参照ください。

      説明
      • この設定は VVR 11.1 以降でサポートされています。

      • MYSQL_TYPE は VVR 11.3 以降でサポートされています。

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      いいえ

      ブール値

      true

      MySQL `timestamp` 型を CDC `timestamp` 型にマッピングするかどうかを指定します:

      • true (デフォルト):MySQL `timestamp` 型は CDC `timestamp` 型にマッピングされます。

      • false:MySQL `timestamp` 型は CDC `timestamp_ltz` 型にマッピングされます。

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      いいえ

      ブール値

      true

      MYSQL_TYPE で解析する際、MySQL `tinyint(1)` 型を CDC `boolean` 型にマッピングするかどうかを指定します:

      • true (デフォルト):MySQL `tinyint(1)` 型は CDC `boolean` 型にマッピングされます。

      • false:MySQL `tinyint(1)` 型は CDC `tinyint(1)` 型にマッピングされます。

      この設定は、canal-json.infer-schema.strategy が MYSQL_TYPE に設定されている場合にのみ有効です。

    • ソーステーブル JSON 形式

      パラメーター

      必須

      データ型

      デフォルト値

      説明

      json.timestamp-format.standard

      いいえ

      文字列

      SQL

      入出力のタイムスタンプ形式を指定します。有効な値:

      • SQL:入力タイムスタンプを yyyy-MM-dd HH:mm:ss.s{precision} 形式で解析します。例:2020-12-30 12:13:14.123。

      • ISO-8601:入力タイムスタンプを yyyy-MM-ddTHH:mm:ss.s{precision} 形式で解析します。例:2020-12-30T12:13:14.123。

      json.ignore-parse-errors

      いいえ

      ブール値

      false

      有効な値:

      • true: 解析エラーが発生した場合、現在の行をスキップします。

      • false (デフォルト):エラーを報告し、ジョブの開始に失敗します。

      json.infer-schema.primitive-as-string

      いいえ

      ブール値

      false

      テーブルスキーマを解析する際に、すべての型を String として解析するかどうかを指定します。

      有効な値:

      • true: すべての基本データ型を String として解析します。

      • false (デフォルト): 基本的なルールに従って解析します。

      json.infer-schema.flatten-nested-columns.enable

      いいえ

      ブール値

      false

      JSON データを解析する際、JSON 内のネストされた列を再帰的に展開するかどうかを指定します。有効な値:

      • true:再帰的に展開します。

      • false (デフォルト): ネストされた列を String として扱います。

      json.decode.parser-table-id.fields

      いいえ

      文字列

      なし

      JSON データを解析する際、一部の JSON フィールド値を使用して tableId を生成するかどうかを指定します。複数のフィールドはカンマ , で接続します。たとえば、JSON データが {"col0":"a", "col1","b", "col2","c"} の場合、結果は次のようになります:

      設定

      tableId

      col0

      a

      col0,col1

      a.b

      col0,col1,col2

      a.b.c

  • 結果テーブル

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    type

    ターゲットのタイプ。

    はい

    文字列

    なし

    このパラメーターを kafka に設定します。

    name

    ターゲットの名前。

    いいえ

    文字列

    なし

    なし

    topic

    Kafka トピックの名前。

    いいえ

    文字列

    なし

    このパラメーターが指定されている場合、すべてのデータはこのトピックに書き込まれます。

    説明

    このパラメーターが指定されていない場合、各データレコードは TableID 文字列に対応するトピックに書き込まれます。TableID は、データベース名とテーブル名をピリオド (.) で結合して生成されます。例:databaseName.tableName

    パーティション戦略

    Kafka パーティションにデータを書き込むための戦略。

    いいえ

    文字列

    オールゼロ

    有効な値:

    • all-to-zero (デフォルト): すべてのデータをパーティション 0 に書き込みます。

    • hash-by-key:プライマリキーのハッシュ値に基づいてパーティションにデータを書き込みます。これにより、同じプライマリキーを持つデータレコードが同じパーティションに書き込まれ、その順序が保持されることが保証されます。

    sink.tableId-to-topic.mapping

    アップストリームのテーブル名からダウンストリームの Kafka トピック名へのマッピング。 

    いいえ

    文字列

    なし

    複数のマッピングはセミコロン (;) で区切ります。各マッピングでは、コロン (:) を使用してアップストリームのテーブル名とダウンストリームのトピック名を区切ります。テーブル名には正規表現を使用できます。複数のテーブルを同じトピックにマッピングするには、テーブル名をカンマ (,) で区切ります。例:mydb.mytable1:topic1;mydb.mytable2:topic2

    説明

    このパラメーターを使用すると、元のテーブル名情報を保持したまま、マッピングされたトピックを変更できます。

    • シンクテーブル Debezium JSON 形式

      パラメーター

      必須

      データ型

      デフォルト値

      説明

      debezium-json.include-schema.enabled

      いいえ

      ブール値

      false

      Debezium JSON データにスキーマ情報が含まれているかどうかを指定します。

  • Kafka をデータ統合ソースとして使用する:

    source:
      type: kafka
      name: Kafka source  // Kafka ソース
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink // Hologres シンク
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Kafka をデータ統合ターゲットとして使用する:

    source:
      type: mysql
      name: MySQL Source // MySQL ソース
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink // Kafka シンク
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    ここでは、`route` モジュールを使用して、ソーステーブルから Kafka に書き込むトピック名を設定します。

説明

ApsaraMQ for Kafka は、デフォルトでトピックの自動作成を有効にしていません。詳細については、「トピックの自動作成に関する問題」をご参照ください。ApsaraMQ for Kafka に書き込む場合は、事前に関連するトピックを作成する必要があります。詳細については、「ステップ 3:リソースの作成」をご参照ください。

テーブルスキーマの解析と変更同期戦略

  • パーティションメッセージの事前消費とテーブルスキーマの初期化

    Kafka コネクタは、現在既知のすべてのテーブルのスキーマを維持します。Kafka データを読み取る前に、Kafka コネクタは各パーティションで最大 scan.max.pre.fetch.records 件のメッセージを事前消費しようとします。各データレコードのスキーマを解析し、これらのスキーマをマージしてテーブルスキーマ情報を初期化します。その後、データを消費する前に、コネクタは初期化されたスキーマに基づいて対応するテーブル作成イベントを生成します。

    説明

    Debezium JSON および Canal JSON 形式の場合、テーブル情報は特定のメッセージから取得されます。事前消費される scan.max.pre.fetch.records 件のメッセージには、いくつかのテーブルのデータが含まれている可能性があります。したがって、各テーブルの事前消費されるデータレコードの数は特定できません。事前消費とテーブルスキーマの初期化は、各パーティションからメッセージを消費して処理する前に一度だけ実行されます。後で新しいテーブルデータが現れた場合、そのテーブルの最初のデータレコードから解析されたテーブルスキーマが初期スキーマとして使用されます。そのテーブルのスキーマは、事前消費によって再初期化されることはありません。

    重要

    単一テーブルのデータを複数のパーティションに分散させることは、VVR 8.0.11 以降でのみサポートされます。このシナリオでは、設定項目 debezium-json.distributed-tables または canal-json.distributed-tables を true に設定する必要があります。

  • テーブル情報

    • Canal JSON および Debezium JSON 形式の場合、データベース名とテーブル名を含むテーブル情報は、特定のメッセージから解析されます。

    • JSON 形式の場合、テーブル情報にはテーブル名のみが含まれ、これはデータが存在するトピックの名前です。

  • プライマリキー情報

    • Canal JSON 形式の場合、テーブルのプライマリキーは JSON の `pkNames` フィールドに基づいて定義されます。

    • Debezium JSON および JSON 形式の場合、JSON にはプライマリキー情報が含まれていません。変換ルールを使用して、テーブルに手動でプライマリキーを追加できます。

      transform:
        - source-table: \.*.\.*
          projection: \*
          primary-keys: key1, key2
  • スキーマ解析とスキーマ変更

    schema.inference.strategy が `static` に設定されている場合、テーブルスキーマが初期化された後、Kafka コネクタは初期テーブルスキーマに基づいて各メッセージの値を解析し、スキーマ変更イベントを生成しません。schema.inference.strategy が `continuous` に設定されている場合、Kafka コネクタは各 Kafka メッセージの本文を解析して物理列を抽出し、現在維持されているスキーマと比較します。解析されたスキーマが現在のスキーマと一致しない場合、スキーマのマージを試み、対応するテーブルスキーマ変更イベントを生成します。マージルールは次のとおりです:

    • 解析された物理列に現在のスキーマにないフィールドが含まれている場合、これらのフィールドはスキーマに追加され、null 許容列を追加するイベントが生成されます。

    • 解析された物理列に現在のスキーマに既にあるフィールドが含まれていない場合、そのフィールドは保持され、そのデータは NULL で埋められます。列を削除するイベントは生成されません。

    • 両方に同じ名前の列がある場合、次のシナリオに従って処理されます:

      • 型は同じだが精度が異なる場合、精度の大きい方の型が使用され、列型変更イベントが生成されます。

      • 型が異なる場合、システムは次の図に示すツリー構造で最も低い共通の親ノードを見つけ、それを同じ名前の列の型として使用し、列型変更イベントを生成します。

        image

  • 現在サポートされているスキーマ変更戦略は次のとおりです:

    • 列の追加: 現在のスキーマの末尾に対応する列を追加し、新しい列のデータを同期します。新しい列は null 許容に設定されます。

    • 列の削除:列削除イベントは生成されません。代わりに、その列のデータは自動的に NULL 値で埋められます。

    • 列名の変更:これは列の追加と削除として扱われます。名前が変更された列は現在のスキーマの末尾に追加され、名前変更前の列のデータは NULL 値で埋められます。

    • 列の型変更:

      • ダウンストリームシステムが列の型変更をサポートしている場合、ダウンストリームシンクが列の型変更の処理をサポートした後、データ統合ジョブは通常の列の型変更 (例:INT から BIGINT へ) をサポートします。このような変更は、ダウンストリームシンクがサポートする列の型変更ルールに依存します。シンクテーブルごとにサポートされる列の型変更ルールは異なります。詳細については、特定のシンクテーブルのドキュメントをご参照ください。

      • Hologres のようにダウンストリームシステムが列の型変更をサポートしていない場合、拡張型マッピングを使用できます。これは、ジョブ開始時にダウンストリームシステムにより広い型を持つテーブルを作成することを意味します。列の型変更が発生した場合、システムはダウンストリームシンクが変更を受け入れられるかどうかを判断し、列の型変更に対して寛容なサポートを提供します。

  • 現在サポートされていないスキーマ変更:

    • プライマリキーやインデックスなどの制約の変更。

    • NOT NULL から NULLABLE への変更。

  • Canal JSON のスキーマ解析

    Canal JSON データには、データ列の正確な型情報を記録するオプションの `sqlType` フィールドが含まれている場合があります。より正確なスキーマを取得するには、canal-json.infer-schema.strategy を SQL_TYPE に設定して `sqlType` の型を使用できます。型マッピングの関係は次のとおりです:

    JDBC 型

    タイプコード

    CDC 型

    BIT

    -7

    BOOLEAN

    BOOLEAN

    16

    TINYINT

    -6

    TINYINT

    SMALLINT

    -5

    SMALLINT

    INTEGER

    4

    INT

    BIGINT

    -5

    BIGINT

    DECIMAL

    3

    DECIMAL(38,18)

    数値

    2

    REAL

    7

    FLOAT

    FLOAT

    6

    DOUBLE

    8

    DOUBLE

    バイナリ

    -2

    バイト

    VARBINARY

    -3

    LONGVARBINARY

    -4

    BLOB

    2004

    日付

    91

    日付

    時間

    92

    時間

    タイムスタンプ

    93

    タイムスタンプ

    CHAR

    1

    文字列

    VARCHAR

    12

    LONGVARCHAR

    -1

    その他の型

テーブル名からトピックへのマッピング戦略

Kafka をデータ統合ジョブのターゲットとして使用する場合、テーブル名からトピックへのマッピング戦略を慎重に設定する必要があります。これは、書き込まれる Kafka メッセージ形式 (Debezium JSON または Canal JSON) にもテーブル名情報が含まれており、後続の Kafka メッセージの消費では、トピック名ではなくデータ内のテーブル名が実際のテーブル名として使用されることが多いためです。

MySQL から `mydb.mytable1` と `mydb.mytable2` の 2 つのテーブルを同期する必要があるとします。考えられる設定戦略は次のとおりです:

1. マッピング戦略を設定しない

マッピング戦略がない場合、各テーブルは "database_name.table_name" 形式の名前のトピックに書き込まれます。したがって、`mydb.mytable1` からのデータは `mydb.mytable1` という名前のトピックに書き込まれ、`mydb.mytable2` からのデータは `mydb.mytable2` という名前のトピックに書き込まれます。次のコードは設定例を示しています:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. マッピングのためのルート ルールを設定する (非推奨)

多くのシナリオでは、ユーザーは書き込まれるトピックが "database_name.table_name" 形式であることを望まず、指定されたトピックにデータを書き込みたいと考えています。そのため、マッピングのためのルート ルールを設定します。次のコードは設定例を示しています:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
 route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

この場合、`mydb.mytable1` と `mydb.mytable2` からのすべてのデータは `mytable1` トピックに書き込まれます。

ただし、ルート ルールを使用して書き込まれるトピック名を変更すると、Kafka メッセージ (Debezium JSON または Canal JSON 形式) のテーブル名情報も変更されます。この場合、Kafka メッセージ内のすべてのテーブル名は `mytable1` になります。これにより、他のシステムがこのトピックから Kafka メッセージを消費する際に予期しない動作が発生する可能性があります。

3. sink.tableId-to-topic.mapping パラメーターをマッピング用に設定する (推奨)

ソーステーブル名情報を保持しながらテーブル名からトピックへのマッピングルールを設定するには、`sink.tableId-to-topic.mapping` パラメーターを使用します。次のコードは設定例を示しています:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

または

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

この場合、`mydb.mytable1` と `mydb.mytable2` からのすべてのデータは `mytable1` トピックに書き込まれます。Kafka メッセージ (Debezium JSON または Canal JSON 形式) のテーブル名情報は `mydb.mytable1` または `mydb.mytable2` のままです。他のシステムがこのトピックから Kafka メッセージを消費する際、ソーステーブル名情報を正しく取得できます。

EXACTLY_ONCE セマンティクスに関する考慮事項

  • コンシューマーの分離レベルの設定

    Kafka データを消費するすべてのアプリケーションは `isolation.level` を設定する必要があります:

    • read_committed: コミットされたデータのみを読み取ります。

    • read_uncommitted (デフォルト): コミットされていないデータを読み取ることができます。

    EXACTLY_ONCE は read_committed に依存します。そうしないと、コンシューマーがコミットされていないデータを表示する可能性があり、一貫性が損なわれます。

  • トランザクションのタイムアウトとデータ損失

    Flink がチェックポイントから回復する際、そのチェックポイントが開始される前にコミットされたトランザクションにのみ依存します。ジョブのクラッシュから再起動までの時間が Kafka のトランザクションタイムアウトを超えると、Kafka は自動的にトランザクションを中止し、データ損失につながります。

    • Kafka ブローカーのデフォルトの transaction.max.timeout.ms は 15 分です。

    • Flink Kafka シンクのデフォルトの transaction.timeout.ms は 1 時間です。

    • ブローカー側の transaction.max.timeout.ms を Flink の設定以上になるように増やす必要があります。

  • プロデューサープールと同時チェックポイント

    EXACTLY_ONCE モードでは、固定サイズの Kafka プロデューサープールを使用します。各チェックポイントはプールから 1 つのプロデューサーを占有します。同時チェックポイントの数がプールサイズを超えると、ジョブは失敗します。

    同時チェックポイントの最大数に基づいてプロデューサープールサイズを調整してください。

  • 並列度のスケールインに関する制限

    最初のチェックポイントの前にジョブが失敗した場合、再起動時に元のプロデューサープール情報は保持されません。したがって、最初のチェックポイントが完了する前にジョブの並列度を下げないでください。スケールインする必要がある場合、並列度は FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR より低くしてはなりません。

  • トランザクションによる読み取りのブロック

    read_committed モードでは、開いているトランザクション (コミットも中止もされていない) があると、トピック全体からの読み取りがブロックされます。

    例:

    • トランザクション 1 がデータを書き込みます。

    • トランザクション 2 がデータを書き込み、コミットします。

    • トランザクション 1 が完了していない限り、トランザクション 2 のデータはコンシューマーには表示されません。

    したがって:

    • 通常の操作中、データの可視性のレイテンシーはチェックポイントの間隔とほぼ同じです。

    • ジョブが失敗すると、書き込み中のトピックは、ジョブが再起動するかトランザクションがタイムアウトするまでコンシューマーをブロックします。極端な場合、トランザクションのタイムアウトが読み取りに影響を与えることさえあります。

よくある質問