このトピックでは、Kafka コネクタの使用方法について説明します。
背景情報
Apache Kafka は、高性能なデータ処理、ストリーミング分析、データ統合など、ビッグデータアプリケーションで広く使用されているオープンソースの分散メッセージキューサービスです。Kafka コネクタは、Apache Kafka クライアントを活用することで、高スループットのデータインジェストとエグレス、複数形式のデータに対する読み書き操作、および Realtime Compute for Apache Flink のための exactly-once セマンティクスをサポートします。
カテゴリ | 説明 |
サポートタイプ | ソーステーブル、シンクテーブル、データインジェストシンク |
実行モード | ストリーミングモード |
データ形式 | |
固有の監視メトリック | |
API タイプ | SQL API、DataStream API、およびデータインジェスト YAML API |
シンクテーブルのデータを更新または削除できますか? | シンクテーブルのデータを更新または削除することはできません。シンクテーブルにはデータを挿入することしかできません。 説明 データの更新または削除に関連する機能の詳細については、「Upsert Kafka」をご参照ください。 |
前提条件
要件に応じて、次のいずれかの方法でクラスターに接続できます:
ApsaraMQ for Kafka クラスターへの接続
ApsaraMQ for Kafka クラスターのバージョンは 0.11 以降です。
ApsaraMQ for Kafka クラスターが作成されていること。詳細については、「リソースの作成」をご参照ください。
Realtime Compute for Apache Flink ワークスペースが ApsaraMQ for Kafka クラスターと同じ Virtual Private Cloud (VPC) 内にあり、Realtime Compute for Apache Flink VPC の CIDR ブロックが ApsaraMQ for Kafka クラスターのホワイトリストに追加されていること。ApsaraMQ for Kafka のホワイトリストの設定方法については、「ホワイトリストの設定」をご参照ください。
重要ApsaraMQ for Kafka にデータを書き込む際は、次の点にご注意ください:
ApsaraMQ for Kafka は、データ書き込み時の Zstandard 圧縮アルゴリズムをサポートしていません。
ApsaraMQ for Kafka は、べき等またはトランザクション書き込み操作をサポートしていません。そのため、Kafka シンクテーブルがサポートする exactly-once セマンティクスは使用できません。ご利用の Realtime Compute for Apache Flink ジョブが Ververica Runtime (VVR) 8.0.0 以降を使用している場合は、シンクテーブルに対して
properties.enable.idempotence=falseを設定して、べき等書き込み機能を無効にしてください。ApsaraMQ for Kafka のストレージエンジンの比較と制限事項の詳細については、「ストレージエンジンの比較」をご参照ください。
セルフマネージド Apache Kafka クラスターへの接続
セルフマネージド Apache Kafka クラスターのバージョンは 0.11 以降です。
Flink とセルフマネージド Apache Kafka クラスター間にネットワーク接続が存在すること。パブリックネットワーク経由でセルフマネージド Apache Kafka クラスターに接続する方法の詳細については、「ネットワーク接続オプション」をご参照ください。
Apache Kafka 2.8 クライアントオプションのみがサポートされています。詳細については、Apache Kafka の Consumer Configs および Producer Configs ドキュメントをご参照ください。
注意事項
現在、Flink および Kafka コミュニティの設計上の制限により、トランザクション書き込みは推奨されません。sink.delivery-guarantee = exactly-once を設定すると、Kafka コネクタはトランザクション書き込みを有効にしますが、これには 3 つの既知の問題があります:
各チェックポイントは新しいトランザクション ID を生成します。チェックポイントの間隔が短すぎると、トランザクション ID の数が過剰になります。その結果、Kafka コーディネーターのメモリが不足し、Kafka クラスターの安定性が損なわれる可能性があります。
各トランザクションはプロデューサーインスタンスを作成します。同時にコミットされるトランザクションが多すぎると、TaskManager のメモリが不足し、Flink ジョブが不安定になる可能性があります。
複数の Flink ジョブが同じ
sink.transactional-id-prefixを使用すると、生成されるトランザクション ID が競合する可能性があります。あるジョブが書き込みに失敗すると、Kafka パーティションの Log Start Offset (LSO) の進行がブロックされ、そのパーティションから読み取るすべてのコンシューマーに影響を与えます。
exactly-once セマンティクスが必要な場合は、Upsert Kafka を使用してプライマリキーテーブルに書き込み、プライマリキーに依存してべき等性を確保してください。トランザクション書き込みを使用する場合は、「Exactly-Once セマンティクスに関する注意事項」をご参照ください。
ネットワーク接続のトラブルシューティング
Flink ジョブの起動中に Timed out waiting for a node assignment というエラーが報告された場合、これは通常、Flink と Kafka の間のネットワーク接続に問題があることを示します。
Kafka クライアントは、以下のようにサーバーに接続します:
bootstrap.serversのアドレスを使用して Kafka に接続します。Kafka は、クラスター内の各ブローカーのメタデータ (エンドポイントなど) を返します。
クライアントは、返されたエンドポイントを使用して各ブローカーに接続し、データを生成または消費します。
bootstrap.servers のアドレスに到達できても、Kafka が不正なブローカーアドレスを返した場合、クライアントはブローカーへの読み書きができません。この問題は、プロキシ、ポートフォワーディング、または専用線を使用するネットワークアーキテクチャで一般的に発生します。
トラブルシューティング手順
ApsaraMQ for Kafka
アクセスポイントタイプの確認
デフォルトアクセスポイント (内部ネットワーク)
SASL アクセスポイント (認証付き内部ネットワーク)
パブリックネットワークアクセスポイント (別途申請が必要)
Flink 開発コンソールを使用して ネットワークプローブ を実行し、
bootstrap.serversアドレスとの接続問題を排除します。セキュリティグループとホワイトリストの確認
Kafka インスタンスは、Flink VPC をホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」および「ホワイトリストの設定」をご参照ください。
SASL 設定の確認 (有効な場合)
SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗し、タイムアウトとして現れることがあります。詳細については、「セキュリティと認証」をご参照ください。
セルフマネージド Kafka (ECS)
Flink 開発コンソールを使用してネットワークプローブを実行する。
bootstrap.serversアドレスとの接続問題を排除し、パブリックおよび内部エンドポイントが正しいことを確認します。セキュリティグループとホワイトリストの確認
ECS セキュリティグループは、Kafka アクセスポイントポート (通常 9092 または 9093) でのトラフィックを許可する必要があります。
Flink が配置されている VPC は、ECS インスタンスのホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」をご参照ください。
設定のトラブルシューティング
Kafka が使用する ZooKeeper クラスターにログインし、zkCli.sh または zookeeper-shell.sh ツールを使用します。
コマンドを実行してブローカーのメタデータを取得します。例:
get /brokers/ids/0。応答のendpointsフィールドで、Kafka がクライアントにアドバタイズするアドレスを見つけます。
Flink 開発コンソールで ネットワークプローブ を実行し、アドレスが到達可能かどうかをテストします。
説明アドレスが到達不可能な場合は、Kafka の運用保守エンジニアに連絡して
listenersおよびadvertised.listenersの設定を確認・修正し、返されたアドレスが Flink からアクセスできるようにしてください。Kafka クライアントとサーバーの接続性の詳細については、「接続性のトラブルシューティング」をご参照ください。
SASL 設定の確認 (有効な場合)
SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗したり、タイムアウトとして現れたりします。詳細については、「セキュリティと認証」をご参照ください。
SQL
Kafka コネクタは、SQL ジョブでソーステーブルまたはシンクテーブルとして使用できます。
構文
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)メタデータ列
Kafka ソーステーブルまたは Kafka シンクテーブルにメタデータ列を定義して、Kafka メッセージのメタデータを取得できます。たとえば、Kafka ソーステーブルの WITH 句で複数のトピックが定義され、ソーステーブルにメタデータ列が定義されている場合、Flink がデータを読み取るトピックがマークされます。次のサンプルコードは、メタデータ列の使用方法の例です:
CREATE TABLE kafka_source (
-- メッセージが属するトピックを record_topic フィールドの値として読み取ります。
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- ConsumerRecord のタイムスタンプを ts フィールドの値として読み取ります。
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- メッセージのオフセットを record_offset フィールドの値として読み取ります。
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- ts フィールドのタイムスタンプを ProducerRecord のタイムスタンプとして Kafka に書き込みます。
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);次の表は、Kafka ソーステーブルとシンクテーブルでサポートされているメタデータ列について説明します。
キー | データの型 | 説明 | ソーステーブルまたはシンクテーブル |
topic | STRING NOT NULL METADATA VIRTUAL | Kafka メッセージが属するトピックの名前。 | ソーステーブル |
partition | INT NOT NULL METADATA VIRTUAL | Kafka メッセージが属するパーティションの ID。 | ソーステーブル |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka メッセージのヘッダー。 | ソーステーブルとシンクテーブル |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka メッセージのリーダーエポック。 | ソーステーブル |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka メッセージのオフセット。 | ソーステーブル |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka メッセージのタイムスタンプ。 | ソーステーブルとシンクテーブル |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka メッセージのタイムスタンプタイプ:
| ソーステーブル |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | Kafka メッセージの生のキーフィールド。 | ソーステーブルとシンクテーブル 説明 このパラメーターは、VVR 11.4 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。 |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | Kafka メッセージの生の値フィールド。 | ソーステーブルとシンクテーブル 説明 このパラメーターは、VVR 11.4 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。 |
WITH パラメーター
一般
パラメーター
説明
データの型
必須
デフォルト値
備考
connector
テーブルのタイプ。
String
はい
なし
値を kafka に設定します。
properties.bootstrap.servers
Kafka ブローカーの IP アドレスとポート番号。
String
はい
なし
フォーマット:host:port,host:port,host:port。複数の host:port ペアはカンマ (,) で区切ります。
properties.*
Kafka クライアント用に設定されるオプション。
String
いいえ
なし
サフィックスは、Producer Configs および Consumer Configs で定義されているルールに準拠する必要があります。
Flink は "properties." プレフィックスを削除し、残りの設定を Kafka クライアントに渡します。たとえば、
'properties.allow.auto.create.topics'='false'を使用して、自動トピック作成を無効にできます。以下の設定は、Kafka コネクタが上書きするため、この方法では変更できません:
key.deserializer
value.deserializer
format
Kafka メッセージの値フィールドを読み書きするために使用されるフォーマット。
String
いいえ
デフォルト値なし。
サポートされるフォーマット
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
説明フォーマットオプションの詳細については、「フォーマットオプション」をご参照ください。
key.format
Kafka メッセージのキーフィールドを読み書きするために使用されるフォーマット。
String
いいえ
デフォルト値なし。
サポートされるフォーマット
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
説明この設定を使用する場合、key.options 設定が必要です。
key.fields
ソーステーブルまたはシンクテーブルのキーフィールドで、Kafka メッセージのキーフィールドに対応するもの。
String
いいえ
なし
複数のフィールド名はセミコロン (;) で区切ります。例:
field1;field2。key.fields-prefix
Kafka メッセージのすべてのキーフィールドのカスタムプレフィックス。このオプションを設定して、値フィールドとの名前の競合を防ぐことができます。
String
いいえ
なし
このオプションは、ソーステーブルとシンクテーブルの列名を区別するためにのみ使用されます。Kafka メッセージのキーフィールドが解析または生成されるときに、プレフィックスは列名から削除されます。
説明この設定を使用する場合、
value.fields-includeオプションを EXCEPT_KEY に設定する必要があります。value.format
Kafka メッセージの値フィールドを読み書きするために使用されるフォーマット。
String
いいえ
なし
この設定は
formatと同等であり、formatまたはvalue.formatのいずれか一方のみを設定できます。両方を設定した場合、value.formatがformatを上書きします。value.fields-include
Kafka メッセージの値を解析または生成する際に、対応するメッセージキーを含めるかどうかを指定します。
String
いいえ
ALL
有効な値:
ALL(デフォルト):すべてのフィールドが Kafka メッセージの値として処理されます。EXCEPT_KEY:key.fields オプションで指定されたフィールドを除くすべてのフィールドが Kafka メッセージの値として処理されます。
ソーステーブル
パラメーター
説明
データの型
必須
デフォルト値
備考
topic
データを読み取るトピックの名前。
String
いいえ
なし
複数のトピック名はセミコロン (;) で区切ります。例:topic-1 と topic-2。
説明topic オプションは topic-pattern オプションと併用できません。
topic-pattern
トピックを照合するために使用される正規表現。デプロイメントの実行中に、指定された正規表現に名前が一致するすべてのトピックのデータが読み取られます。
String
いいえ
なし
説明topic オプションは topic-pattern オプションと併用できません。
properties.group.id
コンシューマーグループ ID。
String
いいえ
KafkaSource-{ソーステーブル名}
指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。
scan.startup.mode
Kafka がデータを読み取る開始オフセット。
String
いいえ
group-offsets
有効な値:
earliest-offset:Kafka は最も古いパーティションからデータを読み取ります。latest-offset:Kafka は最新のオフセットからデータを読み取ります。group-offsets(デフォルト):properties.group.id オプションで指定された ID を持つコンシューマーグループによってコミットされたオフセットからデータを読み取ります。timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプからデータを読み取ります。specific-offsets:scan.startup.specific-offsets オプションで指定されたオフセットからデータを読み取ります。
説明このオプションは、ステートなしでデプロイメントが開始された場合に有効になります。デプロイメントがチェックポイントから再開されたり、指定されたステートから再開されたりすると、デプロイメントは優先的にステートデータに保存されている進行状況でデータの読み取りを開始します。
scan.startup.specific-offsets
scan.startup.mode オプションが specific-offsets に設定されている場合の各パーティションの開始オフセット。
String
いいえ
なし
例:
partition:0,offset:42;partition:1,offset:300。scan.startup.timestamp-millis
scan.startup.mode オプションが timestamp に設定されている場合の開始オフセットのタイムスタンプ。
Long
いいえ
なし
単位:ミリ秒。
scan.topic-partition-discovery.interval
Kafka のトピックとパーティションを動的に検出する時間間隔。
Duration
いいえ
5 分
デフォルトのパーティション検出間隔は 5 分です。この機能を無効にするには、このオプションを明示的に非正の値に設定する必要があります。動的パーティション検出機能が有効になると、Kafka ソースは新しいパーティションを自動的に検出し、そのパーティションからデータを読み取ることができます。topic-pattern モードでは、Kafka ソースは既存のトピックの新しいパーティションからデータを読み取り、正規表現に一致する新しいトピックのすべてのパーティションからデータを読み取ります。
説明VVR 6.0.X を使用する Realtime Compute for Apache Flink では、動的パーティション検出機能はデフォルトで無効になっています。VVR 8.0 以降を使用する Realtime Compute for Apache Flink では、この機能はデフォルトで有効になっています。デフォルトのパーティション検出間隔は 5 分です。
scan.header-filter
データが特定のメッセージヘッダーを含むかどうかに基づく Kafka データフィルタリング。
String
いいえ
なし
ヘッダーキーと値はコロン (:) で区切ります。複数のヘッダーは AND (&) や OR (|) などの論理演算子で区切ります。論理演算子 NOT (!) がサポートされています。たとえば、
depart:toy|depart:book&!env:testは、ヘッダーに depart=toy または depart=book を含み、かつ env=test を含まない Kafka データを保持することを示します。説明このオプションは、VVR 8.0.6 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
括弧操作はサポートされていません。
論理演算は左から右へ順に実行されます。
UTF-8 形式のヘッダー値は文字列に変換され、scan.header-filter オプションで指定されたヘッダー値と比較されます。
scan.check.duplicated.group.id
properties.group.idパラメーターで指定された重複するコンシューマーグループをチェックするかどうかを指定します。Boolean
いいえ
false
有効な値:
true:ジョブが開始する前に重複するコンシューマーグループをチェックします。重複するコンシューマーグループが存在する場合、エラーを報告し、競合を防ぐためにジョブを一時停止します。
false:ジョブが開始する前に重複するコンシューマーグループをチェックしません。
説明このオプションは、VVR 6.0.4 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
シンク固有
パラメーター
説明
データの型
必須
デフォルト値
備考
topic
データが書き込まれるトピックの名前。
String
はい
なし
N/A
sink.partitioner
Flink の並列度を Kafka のパーティションにマッピングするパターン。
String
いいえ
default
有効な値:
default (デフォルト):デフォルトの Kafka パーティショナーを使用してデータをパーティション分割します。
fixed:各 Flink パーティションは固定の Kafka パーティションに対応します。
round-robin:Flink パーティション内のデータは、ラウンドロビンシーケンスで Kafka パーティションに分散されます。
カスタムパーティションマッピングパターン:FlinkKafkaPartitioner のサブクラスを作成して、カスタムパーティションマッピングパターンを設定できます。例:org.mycompany.MyPartitioner。
sink.delivery-guarantee
Kafka シンクテーブルのセマンティックパターン。
String
いいえ
at-least-once
有効な値:
none:配信セマンティクスは保証されません。データが失われたり、重複したりする可能性があります。
at-least-once (デフォルト):データが失われないことを保証します。ただし、データが重複する可能性があります。
exactly-once:Kafka トランザクションを使用して exactly-once セマンティクスを保証します。これにより、データが失われたり、重複したりしないことが保証されます。
説明このオプションを exactly-once に設定する場合は、sink.transactional-id-prefix オプションを設定する必要があります。
sink.transactional-id-prefix
exactly-once セマンティクスで使用される Kafka トランザクション ID のプレフィックス。
String
いいえ
なし
このオプションは、sink.delivery-guarantee オプションが exactly-once に設定されている場合にのみ有効です。
sink.parallelism
Kafka シンクテーブルのオペレーターの並列度。
Integer
いいえ
なし
フレームワークによって決定される、上流オペレーターの並列度。
セキュリティと認証
ご利用の Kafka クラスターが安全な接続または認証を必要とする場合は、セキュリティおよび認証オプションの名前に properties. プレフィックスを追加し、WITH 句でそれらを設定します。次のサンプルコードは、Kafka テーブルが Simple Authentication and Security Layer (SASL) メカニズムとして PLAIN を使用し、Java Authentication and Authorization Service (JAAS) 設定を提供する方法を示しています。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)次のサンプルコードは、Kafka テーブルがセキュリティプロトコルとして SASL_SSL を、SASL メカニズムとして SCRAM-SHA-256 を使用するように設定する方法を示しています。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*Secure Sockets Layer (SSL) を設定します。*/
/*サーバーから提供された CA 証明書トラストストアのパスを指定します。*/
/*アップロードされたアーティファクトは /flink/usrlib/ に保存されます。*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*クライアント認証が必要な場合は、秘密鍵ファイルキーストアのパスを指定します。*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*クライアントがサーバーアドレスを検証するために使用するアルゴリズム。null 値はサーバーアドレス検証が無効であることを示します。*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*SASL を設定します。*/
/*SASL メカニズムとして SCRAM-SHA-256 を設定します。*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*JAAS を設定します。*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)開発コンソールの アーティファクト 機能を使用して、例の CA 証明書と秘密鍵をアップロードできます。アップロード後、ファイルは /flink/usrlib ディレクトリに保存されます。使用したい CA 証明書ファイルの名前が my-truststore.jks の場合、WITH 句の 'properties.ssl.truststore.location' パラメーターを次の 2 つの方法で設定して、この証明書を使用できます:
'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'を設定した場合、Flink は実行時に OSS ファイルを動的にダウンロードする必要はありませんが、デバッグモードはサポートされません。リアルタイムコンピューティングエンジンバージョン Ververica Runtime (VVR) 11.5 以降では、
properties.ssl.truststore.locationとproperties.ssl.keystore.locationを OSS 絶対パスで設定できます。ファイルパスの形式は oss://flink-fullymanaged-<workspace ID>/artifacts/namespaces/<project name>/<file name> です。この方法は、Flink の実行時に OSS ファイルを動的にダウンロードし、デバッグモードをサポートします。
設定の確認:上記のコードスニペットは、ほとんどの設定シナリオに適用されます。Kafka コネクタを設定する前に、Kafka サーバーの運用保守担当者に連絡して、正しいセキュリティおよび認証設定情報を取得してください。
エスケープに関する注意:Apache Flink とは異なり、Realtime Compute for Apache Flink の SQL エディターはデフォルトで二重引用符 (") をエスケープします。そのため、
properties.sasl.jaas.configオプションを設定する際に、ユーザー名とパスワードを囲む二重引用符 (") にバックスラッシュ (\) をエスケープ文字として追加する必要はありません。
Kafka ソーステーブルの開始オフセット
起動モード
scan.startup.mode パラメーターを設定して、Kafka ソーステーブルの初期読み取りオフセットを指定できます:
earliest-offset:現在のパーティションの最も古いオフセットからデータを読み取ります。
latest-offset:現在のパーティションの最新のオフセットからデータを読み取ります。
group-offsets:properties.group.id オプションで指定された ID を持つコンシューマーグループによってコミットされたオフセットからデータを読み取ります。
timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプ以上の最初のメッセージからデータを読み取ります。
specific-offsets:scan.startup.specific-offsets オプションで指定されたパーティションオフセットからデータを読み取ります。
開始オフセットを指定しない場合、デフォルトで group-offsets が使用されます。
scan.startup.mode はステートレスジョブに対してのみ有効です。ステートフルジョブの場合、消費はステートに保存されているオフセットから開始されます。
サンプルコード:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- 最も古いオフセットからデータを消費します。
'scan.startup.mode' = 'earliest-offset',
-- 最新のオフセットからデータを消費します。
'scan.startup.mode' = 'latest-offset',
-- コンシューマーグループ my-group によってコミットされたオフセットからデータを消費します。
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- my-group を初めて使用する場合、消費は最も古いオフセットから開始されます。
'properties.auto.offset.reset' = 'latest', -- my-group を初めて使用する場合、消費は最新のオフセットから開始されます。
-- タイムスタンプ 1655395200000 (ミリ秒) からデータを消費します。
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- 指定されたオフセットからデータを消費します。
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);開始オフセットの優先順位
ソーステーブルの開始オフセットの優先順位は次のとおりです:
優先度 (高い順) | チェックポイントまたはセーブポイントに保存されているオフセット。 |
コンソールで指定された開始時刻。 | |
WITH パラメーターの scan.startup.mode パラメーターで指定された開始オフセット。 | |
scan.startup.mode が指定されていない場合、システムは group-offsets を使用し、対応するコンシューマーグループのオフセットからデータを消費します。 |
上記のいずれかのステップでオフセットが期限切れまたは Kafka クラスターの問題で無効になった場合、properties.auto.offset.reset で指定されたリセットポリシーが使用されます。この設定項目が設定されていない場合、ユーザーの介入が必要な例外がスローされます。
ほとんどの場合、Kafka ソーステーブルは新しいグループ ID を持つコンシューマーグループによってコミットされたオフセットからデータの読み取りを開始します。Kafka ソーステーブルが Kafka クラスターでコンシューマーグループによってコミットされたオフセットをクエリすると、グループ ID が初めて使用されるため、有効なオフセットは返されません。この場合、properties.auto.offset.reset パラメーターで設定されたリセット戦略がオフセットのリセットに使用されます。したがって、オフセットリセット戦略を指定するために properties.auto.offset.reset パラメーターを設定する必要があります。
ソーステーブルのオフセット送信
Kafka ソーステーブルは、チェックポイント操作が成功した後にのみ、コンシューマーオフセットを Kafka クラスターにコミットします。指定したチェックポイント間隔が長すぎると、コンシューマーオフセットの Kafka クラスターへのコミットが遅延します。チェックポイント操作中、Kafka ソーステーブルは現在のデータ読み取りの進行状況をステートバックエンドに保存します。Kafka クラスターにコミットされたオフセットは、障害回復には使用されません。コミットされたオフセットは、Kafka でのデータ読み取りの進行状況を監視するためにのみ使用されます。オフセットのコミットに失敗しても、データの正確性には影響しません。
シンクテーブルのカスタムパーティショナー
組み込みの Kafka プロデューサーパーティショナーが要件を満たさない場合は、カスタムパーティショナーを実装して特定のパーティションにデータを書き込むことができます。カスタムパーティショナーは FlinkKafkaPartitioner を継承する必要があります。開発後、JAR パッケージをコンパイルし、ファイル管理 機能を使用してリアルタイムコンピューティングコンソールにアップロードします。JAR パッケージがアップロードされ、参照された後、WITH 句で sink.partitioner パラメーターを設定します。パラメーター値は、パーティショナーの完全なクラスパスである必要があります。例:org.mycompany.MyPartitioner。
Kafka、Upsert Kafka、Kafka JSON カタログの比較
Kafka はデータ挿入のみをサポートし、更新や削除をサポートしないメッセージキューシステムです。そのため、Kafka は上流システムからの Change Data Capture (CDC) データや、ストリーミング SQL 計算中の集計や結合などのオペレーターからのリトラクションロジックを処理できません。変更データやリトラクションデータを含むデータを Kafka に書き込みたい場合は、変更データを特別に処理する Upsert Kafka シンクテーブルを使用します。
上流データベースの 1 つまたは複数のデータテーブルから Kafka に変更データをバッチで同期したい場合は、Kafka JSON カタログを使用できます。Kafka に保存されているデータが JSON 形式の場合、Kafka JSON カタログを使用できます。これにより、WITH 句でスキーマとオプションを設定する必要がなくなります。詳細については、「Kafka JSON カタログの管理」をご参照ください。
例
例 1:Kafka トピックからデータを読み取り、別の Kafka トピックにデータを書き込む
次のコードサンプルは、ソース Kafka トピックからデータを読み取り、シンク Kafka トピックに書き込みます。データは CSV 形式です。
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;例 2:テーブルスキーマとデータの同期
Kafka コネクタを使用して、Kafka トピックから Hologres にメッセージをリアルタイムで同期します。Kafka メッセージのオフセットとパーティション ID をプライマリキーとして設定することで、フェールオーバー時に Hologres でのメッセージの重複を回避します。
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- オプション。すべてのネストされた列を展開します。
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;例 3:Kafka メッセージのキー列と値列のテーブルスキーマとデータの同期
Kafka メッセージのキーフィールドには関連情報が保存されています。Kafka メッセージのキー列と値列の両方のデータを同時に同期できます。
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Kafka メッセージキーはスキーマ進化と型解析をサポートしていません。手動での宣言が必要です。
例 4:テーブルスキーマとデータの同期および計算の実行
Kafka から Hologres にデータを同期する際、軽量な計算が必要です。
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- COALESCE を使用して null 値を処理します。例 5:ネストされた JSON データの解析
JSON メッセージのサンプル
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}JSON_VALUE(payload, '$.properties.owner') のような関数を使用してフィールドを解析する必要を避けるために、Source DDL で直接構造を定義できます:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);その結果、Flink は読み取りフェーズで JSON を構造化フィールドに解析し、後続の SQL クエリは追加の関数呼び出しなしで直接 properties.owner を使用するため、全体的なパフォーマンスが向上します。
DataStream API
DataStream API を使用してデータを読み書きする場合は、関連タイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。
Kafka ソースの作成
Kafka ソースは、KafkaSource のインスタンスを作成するためのビルダークラスを提供します。次のサンプルコードは、"input-topic" トピックの最も古いオフセットからメッセージを消費し、コンシューマーグループ名を my-group とし、Kafka メッセージ本文を文字列として逆シリアル化する Kafka ソースを構築する方法を示しています。
Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");KafkaSource を作成する際には、以下のパラメーターを指定する必要があります。
パラメーター
説明
BootstrapServers
Kafka ブローカーのアドレス。setBootstrapServers(String) 操作を呼び出してアドレスを設定できます。
GroupId
コンシューマーグループの ID。setGroupId(String) メソッドを呼び出して ID を設定できます。
Topics または Partition
サブスクライブするトピックまたはパーティションの名前。以下のサブスクリプションパターンのいずれかを使用して、トピックまたはパーティションにサブスクライブするように Kafka ソースを設定できます:
トピックリスト。トピックリストを設定すると、Kafka ソースは指定されたトピックのすべてのパーティションにサブスクライブします。
KafkaSource.builder().setTopics("topic-a","topic-b")トピックパターン。正規表現を指定すると、Kafka ソースは指定された正規表現に一致するトピックのすべてのパーティションにサブスクライブします。
KafkaSource.builder().setTopicPattern("topic.*")パーティションリスト。パーティションリストを設定すると、Kafka ソースは指定されたパーティションにサブスクライブします。
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // "topic-a" トピックのパーティション 0 new TopicPartition("topic-b", 5))); // "topic-b" トピックのパーティション 5 KafkaSource.builder().setPartitions(partitionSet)
Deserializer
Kafka メッセージを逆シリアル化するデシリアライザ。
setDeserializer(KafkaRecordDeserializationSchema) メソッドを呼び出してデシリアライザを指定できます。KafkaRecordDeserializationSchema インターフェイスは、ConsumerRecord オブジェクトがどのように逆シリアル化されるかを定義します。ConsumerRecord オブジェクトの Kafka メッセージの Value フィールドのみを逆シリアル化するには、以下のいずれかの方法を使用できます:
Kafka ソースは setValueOnlyDeserializer(DeserializationSchema) メソッドを提供します。DeserializationSchema クラスは、バイナリ値として保存されている Kafka メッセージがどのように逆シリアル化されるかを定義します。
Kafka の Deserializer インターフェイスを実装するクラスを使用します。たとえば、StringDeserializer クラスを使用してメッセージを文字列に逆シリアル化できます。
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
説明ConsumerRecord オブジェクトを逆シリアル化したい場合は、KafkaRecordDeserializationSchema インターフェイスを実装するクラスを作成する必要があります。
XML
Kafka DataStream コネクタは Maven 中央リポジトリに保存されています。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>Kafka DataStream コネクタを使用する際には、以下の Kafka プロパティに精通している必要があります:
開始オフセット
Kafka ソースがデータの読み取りを開始する際に、オフセットイニシャライザを使用してオフセットを指定できます。オフセットイニシャライザは、OffsetsInitializer インターフェイスを実装するオブジェクトです。KafkaSource クラスは、以下の組み込みオフセットイニシャライザを提供します。
オフセットイニシャライザ
コード設定
最も古いオフセットからデータを読み取ります。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())最新のオフセットからデータを読み取ります。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())指定された時刻 (ミリ秒) 以上のタイムスタンプでデータの消費を開始します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))コンシューマーグループによってコミットされたオフセットから消費します。そのようなオフセットが存在しない場合は、最も古いオフセットを使用します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))各パーティションのコミットされたオフセットからデータを読み取り、リセット戦略は指定されません。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())説明組み込みのオフセットイニシャライザがビジネス要件を満たさない場合は、カスタムオフセットイニシャライザを作成できます。
オフセットイニシャライザを指定しない場合、デフォルトで OffsetsInitializer.earliest() オフセットイニシャライザが使用されます。
ストリーミング実行モードとバッチ実行モード
Kafka ソースは、ストリーミングモードまたはバッチモードで動作できます。デフォルトでは、Kafka ソースはストリーミングモードで動作します。このモードでは、デプロイメントは失敗するかキャンセルされるまで実行し続けます。Kafka ソースをバッチモードで動作させたい場合は、setBounded(OffsetsInitializer) メソッドを呼び出して停止オフセットを指定できます。すべてのパーティションが停止オフセットに達すると、Kafka ソースは終了します。
説明ほとんどの場合、ストリーミングモードで動作する Kafka ソースには停止オフセットがありません。ストリーミングモードで動作する Kafka ソースをデバッグしたい場合は、setUnbounded(OffsetsInitializer) メソッドを呼び出して停止オフセットを指定できます。停止オフセットを指定する方法は、ストリーミングモードとバッチモードのどちらを使用するかによって異なります。
動的パーティション検出
実行中のデプロイメントが、デプロイメントを再起動することなく、サブスクリプションパターンに一致する新しいトピックや新しいパーティションからのデータを処理するようにしたい場合は、Kafka ソースで動的パーティション検出機能を有効にできます。DataStream コネクタでは、この機能はデフォルトで無効になっており、手動で有効にする必要があります:
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // 10 秒ごとに新しいパーティションを検出します。重要動的パーティション検出機能は、Kafka クラスターのメタデータ更新メカニズムに依存します。Kafka クラスターがパーティション情報を迅速に更新しない場合、新しいパーティションが検出されない可能性があります。Kafka クラスターの partition.discovery.interval.ms 設定が実際の状況と一致していることを確認してください。
イベント時間とウォーターマーク
デフォルトでは、Kafka ソースはレコードに添付されたタイムスタンプをそのレコードのイベント時間として使用します。各レコードのイベント時間に基づいてウォーターマーク戦略を定義し、ウォーターマークを下流のサービスに送信できます。
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")カスタムウォーターマーク戦略の定義方法の詳細については、「ウォーターマークの生成」をご参照ください。
説明一部のソースサブタスクが長期間アイドル状態になると (たとえば、Kafka パーティションが新しいメッセージを受信しない、またはソースの並列度が Kafka パーティション数を超えるなど)、ウォーターマークの生成が失敗する可能性があります。この場合、ウィンドウ計算がトリガーされず、データ処理が停止します。
解決策は次のとおりです:
ウォーターマークのタイムアウトメカニズムを設定する:table.exec.source.idle-timeout パラメーターを有効にして、指定されたタイムアウト期間後にシステムがウォーターマークを強制的に生成するようにし、ウィンドウ計算エポックの進行を確保します。
データソースを最適化する:ソースの並列度を Kafka パーティション数以下に設定します。
コンシューマーオフセットのコミット
チェックポイントが生成されると、Kafka ソースは各パーティションの Kafka コンシューマーオフセットを Kafka ブローカーにコミットします。これにより、Kafka ブローカーに記録された Kafka コンシューマーオフセットがチェックポイントの状態と一致することが保証されます。Kafka コンシューマーは、各パーティションのオフセットを定期的に Kafka ブローカーに自動的にコミットできます。自動オフセットコミット機能は、enable.auto.commit および auto.commit.interval.ms オプションを使用して設定できます。チェックポイント機能を無効にすると、Kafka ソースは Kafka コンシューマーに依存してオフセットを Kafka ブローカーにコミットします。
説明Kafka ソースは、フォールトトレランスのために Kafka ブローカーに記録されたコミット済みオフセットを使用しません。オフセットをコミットすると、Kafka ブローカーは各パーティションでのレコード消費の進行状況を監視できます。
追加のプロパティ
setProperties(Properties) および setProperty(String, String) メソッドを呼び出して、Kafka ソースおよび Kafka コンシューマーに追加のプロパティを設定できます。次の表は、Kafka ソースのプロパティについて説明します。
設定項目
説明
client.id.prefix
Kafka コンシューマーのクライアント ID のプレフィックスを指定します。
partition.discovery.interval.ms
Kafka ソースが新しいパーティションをチェックする時間間隔を指定します。
説明partition.discovery.interval.ms プロパティは、バッチモードでは -1 に上書きされます。
register.consumer.metrics
Realtime Compute for Apache Flink で Kafka コンシューマーのメトリックを登録するかどうかを指定します。
その他の Kafka Consumer 設定
Kafka コンシューマーのプロパティの詳細については、「Apache Kafka」をご参照ください。
重要Kafka DataStream コネクタは、以下のプロパティの値を上書きします:
key.deserializer:このプロパティの値は ByteArrayDeserializer に設定されます。
value.deserializer:このプロパティの値は ByteArrayDeserializer に設定されます。
auto.offset.reset.strategy:このプロパティの値は OffsetsInitializer#getAutoOffsetResetStrategy() に設定されます。
次のサンプルコードは、Kafka コンシューマーが JAAS 設定と SASL/PLAIN 認証メカニズムを使用して Kafka クラスターに接続する方法を示しています。
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")監視
Kafka ソースは、監視と診断のために Realtime Compute for Apache Flink にメトリックを登録します。
メトリクススコープ
Kafka ソースのすべてのメトリックは、KafkaSourceReader メトリックグループの下に登録されます。KafkaSourceReader は、オペレーターメトリックグループのサブグループです。特定のパーティションのメトリックは、KafkaSourceReader.topic.<topic_name>.partition.<partition_id> メトリックグループに登録されます。
たとえば、トピック名が my-topic で、トピックのパーティション名が 1 の場合、パーティションのコンシューマーオフセットは <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset メトリックによって報告されます。コンシューマーオフセットの成功したコミット数は、<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded メトリックによって測定されます。
メトリック
メトリック
説明
範囲
currentOffset
現在のコンシューマーオフセット
TopicPartition
committedOffset
現在のコミットオフセット
TopicPartition
commitsSucceeded
成功した送信数
KafkaSourceReader
commitsFailed
失敗した送信数
KafkaSourceReader
Kafka コンシューマーメトリック
Kafka コンシューマーのメトリックは、KafkaSourceReader.KafkaConsumer メトリックグループに登録されます。たとえば、records-consumed-total メトリックは <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total に登録されます。
register.consumer.metrics オプションを設定して、Kafka コンシューマーのメトリックを登録するかどうかを指定できます。デフォルトでは、register.consumer.metrics オプションは true に設定されています。Kafka コンシューマーのメトリックの詳細については、「Apache Kafka」をご参照ください。
Kafka シンクの作成
Kafka シンクは、複数のストリームから 1 つまたは複数の Kafka トピックにデータを書き込むことができます。
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) // // プロデューサー設定 .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") // ターゲットトピック .setKafkaValueSerializer(StringSerializer.class) // シリアル化スキーマ .build()) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // フォールトトレランス .build(); stream.sinkTo(kafkaSink);以下のパラメーターを設定する必要があります。
パラメーター
説明
Topic
データが書き込まれるトピックの名前。
データシリアル化
Kafka シンクを構築する際には、入力データを Kafka の
ProducerRecordオブジェクトに変換するためにKafkaRecordSerializationSchemaを提供する必要があります。Flink は、メッセージキーと値のシリアル化、トピックの選択、メッセージのパーティショニングなどの一般的なコンポーネントを提供するスキーマビルダーを提供します。より高度な制御のために、対応するインターフェイスを実装することもできます。Kafka シンクは、受信する各レコードに対して ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) メソッドを呼び出し、シリアル化されたレコードを表す ProducerRecord オブジェクトを生成します。その後、Kafka シンクはその ProducerRecord オブジェクトを必要なトピックに書き込みます。各レコードが Kafka にどのように書き込まれるかを詳細に制御できます。ProducerRecord を使用して、以下の操作を実行できます:
ターゲットトピックの名前を設定します。
メッセージキーを定義します。
ターゲットパーティションを指定します。
Kafka クライアントプロパティ
bootstrap.servers プロパティは必須です。カンマ区切りの Kafka ブローカーアドレスのリストを指定します。
フォールトトレランスセマンティクス
チェックポイント機能を有効にすると、Kafka シンクは exactly-once 配信を保証できます。また、DeliveryGuarantee パラメーターを設定して、異なるフォールトトレランスセマンティクスを指定することもできます。DeliveryGuarantee パラメーターの詳細は以下のとおりです:
DeliveryGuarantee.NONE:Flink による配信保証は提供されません。データが失われたり、重複したりする可能性があります。
DeliveryGuarantee.AT_LEAST_ONCE:Kafka シンクはデータが失われないことを保証します。ただし、データが重複する可能性があります。
DeliveryGuarantee.EXACTLY_ONCE:Kafka シンクはデータが失われたり、重複したりしないことを保証します。Kafka トランザクションメカニズムを使用して exactly-once 配信を保証します。
説明exactly-once セマンティクスの詳細については、「Semantic.EXACTLY_ONCE の使用上の注意」をご参照ください。
データインジェスト
Kafka コネクタは、YAML ベースのデータインジェストジョブでソースまたはシンクとして使用できます。
制限事項
VVR 11.1 以降を使用する Realtime Compute for Apache Flink の Flink CDC データインジェストでは、Kafka を同期データソースとして使用することを推奨します。
JSON、Debezium JSON、Canal JSON 形式のみがサポートされています。他のデータ形式はサポートされていません。
ソースの場合、同じテーブルのデータが複数のパーティションに分散できるのは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink のみです。
構文
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092設定オプション
一般
パラメーター
説明
必須
データの型
デフォルト値
備考
type
ソースまたはシンクのタイプ。
はい
String
なし
値を kafka に設定します。
name
ソースまたはシンクの名前。
いいえ
String
デフォルト値なし。
N/A
properties.bootstrap.servers
Kafka ブローカーの IP アドレスとポート番号。
はい
String
なし
フォーマット:
host:port,host:port,host:port。複数の host:port ペアはカンマ (,) で区切ります。properties.*
Kafka クライアント用に設定されるオプション。
いいえ
String
なし
サフィックスは、公式 Kafka ドキュメントで定義されているプロデューサーまたはコンシューマー設定である必要があります。
Flink は properties. プレフィックスを削除し、変換されたキーと値を Kafka クライアントに渡します。たとえば、
properties.allow.auto.create.topicsを false に設定して、自動トピック作成を無効にできます。key.format
Kafka メッセージのキーフィールドを読み書きするために使用されるフォーマット。
いいえ
String
なし
ソースの場合、json のみがサポートされます。
シンクの場合、有効な値は次のとおりです:
csv
json
説明このオプションは、VVR 11.0.0 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
value.format
Kafka メッセージの値フィールドを読み書きするために使用されるフォーマット。
いいえ
String
debezium-json
有効な値:
debezium-json
canal-json
json
説明debezium-json および canal-json 形式は、VVR 8.0.10 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
json 形式は、VVR 11.0.0 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
source table
パラメーター
説明
必須
データの型
デフォルト値
備考
topic
データを読み取るトピックの名前。
いいえ
String
デフォルト値なし。
複数のトピック名はセミコロン (;) で区切ります。例:topic-1 と topic-2。
説明topic オプションは topic-pattern オプションと併用できません。
topic-pattern
トピックを照合するために使用される正規表現。ジョブの実行中に、指定された正規表現に名前が一致するすべてのトピックのデータが読み取られます。
いいえ
String
デフォルト値なし。
説明topic オプションは topic-pattern オプションと併用できません。
properties.group.id
コンシューマーグループ ID。
いいえ
String
なし
指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。
scan.startup.mode
Kafka がデータを読み取る開始オフセット。
いいえ
String
group-offsets
有効な値:
earliest-offset:最も古いパーティションからデータを読み取ります。
latest-offset:最新のオフセットからデータを読み取ります。
group-offsets (デフォルト):properties.group.id で指定されたグループのコミット済みオフセットから読み取りを開始します。
timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプから読み取ります。
specific-offsets:scan.startup.specific-offsets で指定されたオフセットから読み取ります。
説明このオプションは、ジョブがステートなしで開始された場合に有効になります。ジョブがチェックポイントから再開されたり、指定されたステートから再開されたりすると、ジョブは優先的にステートデータに保存されている進行状況でデータの読み取りを開始します。
scan.startup.specific-offsets
scan.startup.mode オプションが specific-offsets に設定されている場合の各パーティションの開始オフセット。
いいえ
String
なし
例:
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
scan.startup.mode オプションが timestamp に設定されている場合の開始オフセットのタイムスタンプ。
いいえ
Long
なし
単位:ミリ秒。
scan.topic-partition-discovery.interval
Kafka のトピックとパーティションを動的に検出する時間間隔。
いいえ
Duration
5 分
デフォルトのパーティション検出間隔は 5 分です。この機能を無効にするには、このオプションを明示的に非正の値に設定する必要があります。動的パーティション検出機能が有効になると、Kafka ソースは新しいパーティションを自動的に検出し、そのパーティションからデータを読み取ることができます。topic-pattern モードでは、Kafka ソースは既存のトピックの新しいパーティションからデータを読み取り、正規表現に一致する新しいトピックのすべてのパーティションからデータを読み取ります。
scan.check.duplicated.group.id
properties.group.idで指定された重複するコンシューマーグループをチェックするかどうか。いいえ
Boolean
false
有効な値:
true:ジョブが開始する前に重複するコンシューマーグループをチェックします。重複するコンシューマーグループが存在する場合、エラーを報告し、競合を防ぐためにジョブを一時停止します。
false:ジョブが開始する前に重複するコンシューマーグループをチェックしません。
schema.inference.strategy
スキーマ推論戦略。
いいえ
String
continuous
有効な値:
continuous:各レコードのスキーマを推論します。スキーマに互換性がない場合、より広いスキーマを推論し、スキーマ変更イベントを生成します。
static:ジョブ開始時にスキーマ推論を一度だけ実行します。後続のレコードは初期スキーマに基づいて解析されます。スキーマ変更イベントは生成されません。
説明スキーマ解析の詳細については、「テーブルスキーマの解析と変更同期ポリシー」をご参照ください。
このオプションは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
scan.max.pre.fetch.records
初期スキーマ推論中に、システムがパーティション内で消費および解析を試みるメッセージの最大数。
いいえ
Int
50
ジョブがデータを読み取って処理する前に、システムはスキーマ情報を初期化するために、パーティション内の特定の数の最新メッセージを事前に消費しようとします。
key.fields-prefix
Kafka メッセージのキーフィールドから解析されたフィールドに追加されるプレフィックス。Kafka メッセージのキーフィールドが解析された後の名前の競合を防ぐために、このオプションを設定します。
いいえ
String
なし
たとえば、このオプションが key_ に設定され、キーフィールドに a という名前のフィールドが含まれている場合、解析後のフィールド名は key_a になります。
説明key.fields-prefix オプションの値は、value.fields-prefix オプションのプレフィックスであってはなりません。
value.fields-prefix
Kafka メッセージの値フィールドから解析されたフィールドに追加されるプレフィックス。Kafka メッセージの値フィールドが解析された後の名前の競合を防ぐために、このオプションを設定できます。
いいえ
String
なし
たとえば、このオプションが value_ に設定され、値フィールドに b という名前のフィールドが含まれている場合、解析後のフィールド名は value_b になります。
説明value.fields-prefix オプションの値は、key.fields-prefix オプションのプレフィックスであってはなりません。
metadata.list
下流のストレージに渡すメタデータ列。
いいえ
String
なし
利用可能なメタデータ列には、
topic、partition、offset、timestamp、timestamp-type、headers、leader-epoch、__raw_key__、および__raw_value__があり、カンマで区切られます。scan.value.initial-schemas.ddls
DDL 文を使用して特定のテーブルの初期スキーマを指定します。
いいえ
String
なし
複数の DDL 文は英語のセミコロン (
;) で接続されます。たとえば、CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);を使用して、テーブル db1.t1 と db1.t2 の初期スキーマを指定します。ここでの DDL テーブル構造は、ターゲットテーブルと一致し、Flink SQL 構文ルールに準拠する必要があります。
説明VVR バージョン 11.5 以降はこの設定をサポートしています。
ingestion.ignore-errors
データ解析中にエラーを無視するかどうかを指定します。
いいえ
Boolean
false
説明この設定は VVR 11.5 以降のバージョンでサポートされています。
ingestion.error-tolerance.max-count
データ解析中にエラーが無視される場合、ジョブが失敗するまでの解析エラーの数。
いいえ
Integer
-1
このオプションは、ingestion.ignore-errors が有効な場合にのみ有効です。デフォルト値 -1 は、解析例外がジョブの失敗をトリガーしないことを意味します。
説明Ververica Runtime (VVR) バージョン 11.5 以降はこの設定をサポートしています。
Debezium JSON 形式のソーステーブル
パラメーター
必須
データの型
デフォルト値
説明
debezium-json.distributed-tables
いいえ
Boolean
false
Debezium JSON の単一テーブルのデータが複数のパーティションに現れる場合は、このオプションを有効にする必要があります。
説明このオプションは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
重要このオプションを設定した後、ステートなしでデプロイメントを開始する必要があります。
debezium-json.schema-include
いいえ
Boolean
false
Debezium Kafka Connect を設定する際、Kafka 設定 value.converter.schemas.enable を有効にしてメッセージにスキーマ情報を含めることができます。このオプションは、Debezium JSON メッセージにスキーマ情報を含めるかどうかを指定します。
有効な値:
true:Debezium JSON メッセージにスキーマ情報が含まれます。
false:Debezium JSON メッセージにスキーマ情報が含まれません。
debezium-json.ignore-parse-errors
いいえ
Boolean
false
有効な値:
true:解析例外が発生した場合、現在の行をスキップします。
false (デフォルト):エラーを返し、デプロイメントの開始に失敗します。
debezium-json.infer-schema.primitive-as-string
いいえ
Boolean
false
テーブルスキーマを解析する際に、すべてのデータ型を STRING として解釈するかどうかを指定します。
有効な値:
true:すべての基本型を STRING として解釈します。
false (デフォルト) の場合、解析は基本ルールに従います。
ソーステーブル用の Canal JSON フォーマット
パラメーター
必須
データの型
デフォルト値
説明
canal-json.distributed-tables
いいえ
Boolean
false
Canal JSON の単一テーブルのデータが複数のパーティションに現れる場合は、このオプションを有効にする必要があります。
説明このオプションは、VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
重要このオプションを設定した後、ステートなしでデプロイメントを開始する必要があります。
canal-json.database.include
いいえ
String
なし
Canal レコードのデータベースメタデータフィールドに一致するオプションの正規表現。指定されたデータベースの変更ログのみが読み取られます。正規表現文字列は Java の Pattern と互換性があります。
canal-json.table.include
いいえ
String
なし
Canal レコードのテーブルメタデータフィールドに一致するオプションの正規表現。指定されたテーブルの変更ログレコードのみが読み取られます。正規表現は Java の Pattern と互換性があります。
canal-json.ignore-parse-errors
いいえ
Boolean
false
有効な値:
true:解析例外が発生した場合、現在の行をスキップします。
false (デフォルト):エラーを返し、デプロイメントの開始に失敗します。
canal-json.infer-schema.primitive-as-string
いいえ
Boolean
false
テーブルスキーマを解析する際に、すべてのデータ型を STRING として解釈するかどうかを指定します。
有効な値:
true:すべての基本型を STRING として解釈します。
false (デフォルト):パーサは基本ルールに従います。
canal-json.infer-schema.strategy
いいえ
String
AUTO
スキーマ推論戦略。
有効な値:
AUTO (デフォルト):JSON データを解析してスキーマを自動的に推論します。データに sqlType フィールドが含まれていない場合は、解析の失敗を避けるために AUTO を使用します。
SQL_TYPE:Canal JSON データの sqlType 配列を使用してスキーマを推論します。データに sqlType フィールドが含まれている場合は、より正確な型推論のために canal-json.infer-schema.strategy を SQL_TYPE に設定することを推奨します。
MYSQL_TYPE:Canal JSON データの mysqlType 配列を使用してスキーマを推論します。
Kafka の Canal JSON データに sqlType フィールドが含まれており、より正確な型マッピングが必要な場合は、canal-json.infer-schema.strategy を SQL_TYPE に設定します。
sqlType マッピングルールの詳細については、「Canal JSON スキーマ解析」をご参照ください。
説明Ververica Runtime (VVR) 11.1 以降のバージョンはこの設定をサポートしています。
MYSQL_TYPE は、VVR 11.3 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
いいえ
Boolean
true
MySQL TIMESTAMP を CDC TIMESTAMP にマッピングするかどうかを指定します:
true (デフォルト):MySQL TIMESTAMP を CDC TIMESTAMP にマッピングします。
false:MySQL TIMESTAMP を CDC TIMESTAMP_LTZ にマッピングします。
canal-json.mysql.treat-tinyint1-as-boolean.enabled
いいえ
Boolean
true
MYSQL_TYPE を使用して解析する場合、MySQL TINYINT(1) を CDC BOOLEAN にマッピングするかどうかを指定します:
true (デフォルト):MySQL TINYINT(1) を CDC BOOLEAN にマッピングします。
false:MySQL TINYINT(1) を CDC TINYINT(1) にマッピングします。
このオプションは、canal-json.infer-schema.strategy が MYSQL_TYPE に設定されている場合にのみ有効です。
ソーステーブル JSON 形式
パラメーター
必須
データの型
デフォルト値
説明
json.timestamp-format.standard
いいえ
String
SQL
入出力のタイムスタンプ形式を指定します。有効な値:
SQL:yyyy-MM-dd HH:mm:ss.s{precision} 形式の入力タイムスタンプを解析します。例:2020-12-30 12:13:14.123。
ISO-8601:yyyy-MM-ddTHH:mm:ss.s{precision} 形式の入力タイムスタンプを解析します。例:2020-12-30T12:13:14.123。
json.ignore-parse-errors
いいえ
Boolean
false
有効な値:
true:解析例外が発生した場合、現在の行をスキップします。
false (デフォルト):エラーを返し、デプロイメントの開始に失敗します。
json.infer-schema.primitive-as-string
いいえ
Boolean
false
テーブルスキーマを解析する際に、すべてのデータ型を STRING として解釈するかどうかを指定します。
有効な値:
true:すべての基本型を STRING として解釈します。
false (デフォルト):基本ルールに従って解析します。
json.infer-schema.flatten-nested-columns.enable
いいえ
Boolean
false
JSON 形式のデータを解析する際、ネストされた列を再帰的に展開するかどうかを指定する必要があります。このパラメーターの有効な値は次のとおりです:
true:ネストされた列を再帰的に展開します。
false (デフォルト):ネストされた型を STRING として扱います。
json.decode.parser-table-id.fields
いいえ
String
なし
特定の JSON フィールドの値に基づいて tableId を生成するかどうかを指定します。複数のフィールドはカンマ
,で区切ります。たとえば、JSON データが{"col0":"a", "col1","b", "col2","c"}の場合、生成される結果は次のようになります:設定
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
json.infer-schema.fixed-types
いいえ
String
デフォルト値なし。
JSON 形式のデータを解析する際、特定のフィールドの正確なデータ型を指定します。複数のフィールドは英語のカンマ (
,) で区切ります。たとえば、id BIGINT, name VARCHAR(10)は、JSON データの id フィールドを BIGINT、name フィールドを VARCHAR(10) として指定します。この設定を使用する場合、
scan.max.pre.fetch.records: 0設定も追加する必要があります。説明このオプションは、VVR 11.5 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
シンク固有
パラメーター
説明
必須
データの型
デフォルト値
備考
type
シンクのタイプ。
はい
String
なし
値を Kafka に設定します。
name
シンクの名前。
いいえ
String
なし
N/A
topic
Kafka トピックの名前。
いいえ
String
なし
このオプションが有効な場合、すべてのデータはこのトピックに書き込まれます。
説明このオプションが有効でない場合、各データレコードは、そのテーブル ID 文字列 (データベース名とテーブル名をピリオド (
.) で連結したもの) から派生した名前のトピックに書き込まれます。例:databaseName.tableName。partition.strategy
Kafka のパーティショニング戦略。
いいえ
String
all-to-zero
有効な値:
all-to-zero (デフォルト):すべてのデータをパーティション 0 に書き込みます。
hash-by-key:プライマリキーのハッシュ値に基づいてデータをパーティションに書き込みます。これにより、同じプライマリキーを持つデータが同じパーティションにあり、順序付けられていることが保証されます。
sink.tableId-to-topic.mapping
上流のテーブル名と下流の Kafka トピック名のマッピング。
いいえ
String
なし
各マッピング関係は
;で区切られます。先祖テーブル名とそれに対応する下流の Kafka トピック名は:で区切られます。テーブル名には正規表現を使用でき、同じトピックにマッピングされる複数のテーブルは,を使用して連結できます。例:mydb.mytable1:topic1;mydb.mytable2:topic2。説明このパラメーターを設定すると、元のテーブル名情報を保持したまま、マッピングされたトピックを変更できます。
Debezium JSON 形式のシンクテーブル
パラメーター
必須
データの型
デフォルト値
説明
debezium-json.include-schema.enabled
いいえ
Boolean
false
Debezium JSON データにスキーマ情報を含めるかどうかを指定します。
例
Kafka からデータをインジェストする:
source: type: kafka name: Kafka source properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADENKafka にデータをインジェストする:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}route セクションで、宛先の Kafka トピックの名前を指定します。
デフォルトでは、Alibaba Cloud Kafka の自動トピック作成機能は無効になっています。詳細については、「自動トピック作成に関するよくある質問」をご参照ください。Alibaba Cloud Kafka にデータを書き込む場合は、事前に対応するトピックを作成する必要があります。詳細については、「ステップ 3:リソースの作成」をご参照ください。
スキーマの解析と進化に関するポリシー
Kafka コネクタは、既知のすべてのテーブルのスキーマを維持します。
スキーマの初期化
スキーマ情報には、フィールドとデータ型の情報、データベースとテーブルの情報、およびプライマリキーの情報が含まれます。これら 3 種類の情報がどのように初期化されるかを以下に説明します:
フィールドとデータ型の情報
データインジェストジョブは、データからフィールドとデータ型の情報を自動的に推論できます。ただし、一部のシナリオでは、特定のテーブルのフィールドと型の情報を指定したい場合があります。ユーザーが指定するフィールド型の粒度に基づいて、スキーマの初期化は以下の 3 つの戦略をサポートします:
システムによって完全に推論されるスキーマ
Kafka メッセージが読み取られる前に、Kafka コネクタは各パーティションのメッセージを消費しようとし、各データレコードのスキーマを解析し、その後スキーマをマージしてテーブルスキーマ情報を初期化します。消費できるメッセージの数は、scan.max.pre.fetch.records オプションの値以下です。データが消費される前に、初期化されたスキーマに基づいてテーブル作成イベントが生成されます。
Debezium JSON および Canal JSON 形式の場合、テーブル情報は特定のメッセージに含まれます。事前に消費するメッセージの数は、scan.max.pre.fetch.records パラメーターによって指定されます。これらの事前に消費されたメッセージには、複数のテーブルのデータが含まれている可能性があります。したがって、各テーブルの事前に消費されたデータレコードの数は決定できません。パーティションメッセージの事前消費とテーブルスキーマの初期化は、各パーティションのメッセージの実際の消費と処理の前に一度だけ実行されます。後続のテーブルデータが存在する場合、テーブルの最初のデータレコードから解析されたテーブルスキーマが初期テーブルスキーマとして使用されます。この場合、パーティションメッセージの事前消費とテーブルスキーマの初期化は再度実行されません。
単一テーブルのデータが複数のパーティションに分散できるのは、Ververica Runtime (VVR) 8.0.11 以降のみです。このシナリオでは、debezium-json.distributed-tables または canal-json.distributed-tables オプションを true に設定する必要があります。
初期スキーマの指定
一部のシナリオでは、たとえば Kafka から事前に作成された子孫テーブルにデータを書き込む場合など、初期テーブルスキーマを自分で指定したい場合があります。これを行うには、scan.value.initial-schemas.ddls パラメーターを追加します。以下のコードは設定例です:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 初期スキーマを設定します。
scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);CREATE TABLE 文は、ターゲットテーブルのスキーマと一致する必要があります。ここでは、テーブル db1.t1 の id フィールドの初期型は BIGINT に、name フィールドの初期型は VARCHAR(10) に設定されます。同様に、テーブル db1.t2 の id フィールドの初期型は BIGINT に設定されます。
CREATE TABLE 文は Flink SQL 構文を使用します。
フィールド型の固定
一部のシナリオでは、たとえば TIMESTAMP 型として推論される可能性のある特定のフィールドを文字列として配信したい場合など、特定のフィールドのデータ型を固定したい場合があります。この場合、json.infer-schema.fixed-types パラメーターを追加して初期テーブルスキーマを指定できます (メッセージ形式が JSON の場合のみ有効)。設定例:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 特定のフィールドを静的型に固定します。
json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
scan.max.pre.fetch.records: 0これにより、すべての id フィールドの型が BIGINT に、すべての name フィールドの型が VARCHAR(10) に固定されます。
ここでの型は Flink SQL データ型と一致します。
データベースとテーブルの情報
Canal JSON および Debezium JSON 形式の場合、データベースとテーブルの名前は個々のメッセージから解析されます。
デフォルトでは、JSON 形式のメッセージの場合、テーブル情報にはテーブル名 (データを含むトピックの名前) のみが含まれます。データにデータベースとテーブルの情報が含まれている場合は、json.infer-schema.fixed-types パラメーターを使用して、この情報を含むフィールドを指定できます。これらのフィールドをデータベース名とテーブル名にマッピングします。以下のコードは設定例です:
source: type: kafka name: Kafka Source properties.bootstrap.servers: host:9092 topic: test-topic value.format: json scan.startup.mode: earliest-offset # col1 フィールドの値をデータベース名として、col2 フィールドの値をテーブル名として使用します。 json.decode.parser-table-id.fields: col1,col2これにより、各レコードは、データベース名が col1 フィールドの値、テーブル名が col2 フィールドの値であるテーブルに書き込まれます。
プライマリキー
Canal JSON 形式の場合、テーブルのプライマリキーは JSON の pkNames フィールドに基づいて定義されます。
Debezium JSON および JSON 形式の場合、JSON にはプライマリキー情報が含まれていません。transform ルールを使用して、テーブルに手動でプライマリキーを追加できます:
transform: - source-table: \.*\.\.* projection: \* primary-keys: key1, key2
スキーマの解析とスキーマ進化
初期スキーマ同期が完了した後、schema.inference.strategy が static に設定されている場合、Kafka コネクタは各メッセージの値を初期テーブルスキーマに基づいて解析し、スキーマ変更イベントを生成しません。一方、schema.inference.strategy が continuous に設定されている場合、Kafka コネクタは各 Kafka メッセージの値部分を物理列として解析し、その列を現在維持されているスキーマと比較します。解析されたスキーマが現在のスキーマと不一致の場合、Kafka コネクタはスキーマをマージしようと試み、対応するテーブルスキーマ変更イベントを生成します。マージ規則は以下のとおりです:
解析された物理列に現在のスキーマにないフィールドが含まれている場合、Kafka コネクタはそれらのフィールドをスキーマに追加し、NULL 許容の列追加イベントを生成します。
解析された物理列に現在のスキーマに既に存在するフィールドが含まれていない場合、それらのフィールドは保持され、値は NULL で埋められます。列削除イベントは生成されません。
解析された物理列と現在のスキーマに同じ名前の列が含まれている場合、以下のように処理します:
データ型が同じで精度のみ異なる場合、より高い精度の型を使用し、列型変更イベントを生成します。
データ型が異なる場合、ツリー構造における最小の親ノードを、同名の列の型として使用し、列型変更イベントを生成します。

サポートされるスキーマ進化オプション:
列の追加:新しい列を現在のスキーマの末尾に追加し、新しい列のデータを同期します。新しい列は NULL 許容になります。
列の削除:列削除イベントは生成されません。代わりに、その後のその列のデータは自動的に NULL 値で埋められます。
列の名前変更:列の追加と削除として扱われます。名前変更された列がスキーマの末尾に追加され、元の列のデータは NULL 値で埋められます。
列のデータ型の変更:
下流システムが列型変更をサポートしている場合、データインジェストジョブは、下流シンクが列型変更の処理をサポートするようになると、通常の列の型変更 (たとえば INT から BIGINT) をサポートします。このような変更は、下流シンクがサポートする列型変更規則に依存します。異なるシンクテーブルは異なる列型変更規則をサポートします。関連するシンクテーブルのドキュメントを参照して、サポートされている列型変更規則について確認してください。
Hologres のような列型変更をサポートしない下流システムの場合、ワイド型マッピング を使用できます。この方法では、ジョブ開始時に下流システムに汎用性の高いデータ型を持つテーブルを作成します。列型変更が発生した場合、システムは下流シンクがその変更を受け入れられるかどうかを判断し、これにより列型変更に対する許容的なサポートを実現します。
サポートされていないスキーマ変更:
プライマリキーまたはインデックスなどの制約の変更。
NOT NULL から NULLABLE への変更。
Canal JSON のスキーマ解析
Canal JSON 形式のデータには、オプションの sqlType フィールドが含まれており、データ列の正確な型情報を提供します。より正確なスキーマを得るためには、canal-json.infer-schema.strategy 設定を SQL_TYPE に設定して、sqlType からの型を使用できます。型マッピング関係は以下のとおりです:
JDBC データ型
型コード
CDC データ型
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
-5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
NUMERIC
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
BINARY
-2
BYTES
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
DATE
91
DATE
TIME
92
TIME
TIMESTAMP
93
TIMESTAMP
CHAR
1
STRING
VARCHAR
12
LONGVARCHAR
-1
その他のデータ型
ダーティデータの許容と収集
一部のケースでは、Kafka データソースに不正なデータ (ダーティデータ) が含まれている可能性があります。このようなダーティデータによってジョブが頻繁に再起動されるのを防ぐために、ジョブを設定してこれらの例外を無視できるようにできます。設定例:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# ダーティデータの許容を有効にします。
ingestion.ignore-errors: true
# 最大 1000 件のダーティデータレコードを許容します。
ingestion.error-tolerance.max-count: 1000この設定により、最大 1000 件のダーティデータレコードが許容され、少量のダーティデータが存在する場合でもジョブが正常に実行されます。ダーティデータレコードの数がこのしきい値を超えると、ジョブは失敗し、データの検証が必要になります。
ダーティデータによってジョブが失敗しないことを保証するために、次の設定を使用できます:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# ダーティデータの許容を有効にします。
ingestion.ignore-errors: true
# 全てのダーティデータを許容します。
ingestion.error-tolerance.max-count: -1ダーティデータの許容ポリシーにより、異常なデータによってジョブが頻繁に失敗することを防ぎます。また、Kafka プロデューサーの動作を調整するために、ダーティデータについて詳しく知りたい場合もあります。ダーティデータ収集 で説明されているプロセスでは、ジョブのダーティデータを TaskManager のログファイルで確認できます。設定例:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# ダーティデータの許容を有効にします。
ingestion.ignore-errors: true
# 全てのダーティデータを許容します。
ingestion.error-tolerance.max-count: -1
pipeline:
dirty-data.collector:
# ダーティデータを TaskManager のログファイルに書き込みます。
type: loggerテーブル名とトピックのマッピング戦略
Kafka をデータインジェストシンクとして使用する場合、メッセージ形式 (debezium-json や canal-json) には多くの場合テーブル名情報が含まれています。コンシューマーは通常、トピック名ではなく、このテーブル名を実際のテーブル名として使用します。したがって、テーブル名とトピックのマッピング戦略を注意深く設定する必要があります。
MySQL から mydb.mytable1 および mydb.mytable2 の 2 つのテーブルを同期する必要があると仮定します。可能なマッピング戦略は以下のとおりです:
1. 何のマッピング戦略も設定しない
マッピング戦略を設定しない場合、各テーブルはデータベース名とテーブル名を基にした名前のトピック (例:mydb.mytable1) に書き込まれます。したがって、mydb.mytable1 のデータは mydb.mytable1 トピックに、mydb.mytable2 のデータは mydb.mytable2 トピックに書き込まれます。設定例:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. ルートルールによるマッピングの設定 (推奨されません)
多くのシナリオでは、ユーザーはデータベース名とテーブル名を含むトピック名を望まず、代わりに特定のトピックにデータをマップするルートルールを設定します。設定例:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytable1この場合、mydb.mytable1 および mydb.mytable2 のすべてのデータが mytable1 トピックに書き込まれます。
ただし、ルートルールを使用してトピック名を変更すると、Kafka メッセージ (debezium-json や canal-json 形式) 内のテーブル名も変更されます。その結果、このトピック内のすべてのメッセージのテーブル名は mytable1 になります。このトピックからメッセージを消費する他のシステムは、予期しない動作をする可能性があります。
3. sink.tableId-to-topic.mapping パラメーターによるマッピングの設定 (推奨)
元のテーブル名情報を保持したままカスタムトピックにマッピングするには、sink.tableId-to-topic.mapping パラメーターを使用します。設定例:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}または:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}この設定では、`mydb.mytable1` および `mydb.mytable2` のすべてのデータが `mytable1` トピックに書き込まれますが、元のテーブル名 (`mydb.mytable1` または `mydb.mytable2`) は Kafka メッセージ形式 (`debezium-json` や `canal-json`) 内で保持されます。これにより、このトピックからメッセージを消費する他のシステムが、ソーステーブル名情報を正しく取得できます。
EXACTLY_ONCE セマンティクスに関する考慮事項
コンシューマー分離レベルの設定
Kafka データを消費するすべてのアプリケーションは、isolation.level を設定する必要があります:
read_committed:コミット済みのデータのみを読み取ります。read_uncommitted(デフォルト):未コミットのデータを読み取ることもできます。
EXACTLY_ONCE は
read_committedに依存しています。そうでないと、コンシューマーが未コミットのデータを読み取ってしまい、一貫性が損なわれる可能性があります。トランザクションタイムアウトとデータ損失
チェックポイントから復旧する際、Flink はチェックポイントの開始前にコミットされたトランザクションのみに依存します。ジョブのクラッシュと再起動の間隔が Kafka トランザクションのタイムアウトを超えている場合、Kafka は自動的にトランザクションを中止し、データが失われる可能性があります。
Kafka ブローカーのデフォルトの
transaction.max.timeout.ms= 15 分。Flink Kafka シンクは、デフォルトで
transaction.timeout.msを 1 時間に設定します。ブローカーの
transaction.max.timeout.msを Flink の設定以上に増やす必要があります。
Kafka プロデューサープールと同時チェックポイント
EXACTLY_ONCE モードでは、固定サイズの Kafka プロデューサープールが使用されます。各チェックポイントはプールから 1 つのプロデューサーを消費します。同時チェックポイント数がプールサイズを超えると、ジョブは失敗します。
プールサイズは、最大同時チェックポイント数に応じて調整してください。
スケールダウンの制限
最初のチェックポイントの前にジョブが失敗した場合、プロデューサープール情報は再起動後に保持されません。したがって、最初のチェックポイントが完了するまでは、ジョブの並列度をスケールダウンしないでください。スケールダウンする必要がある場合は、並列度を
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR未満にしてはいけません。トランザクションによる読み取りのブロッキング
read_committedモードでは、未完了 (コミットも中止もされていない) のトランザクションが、トピック全体からの読み取りをブロックします。たとえば:
トランザクション 1 がデータを書き込みます。
トランザクション 2 がデータを書き込み、コミットします。
トランザクション 1 が完了するまで、トランザクション 2 のデータはコンシューマーには表示されません。
したがって:
通常の運用では、データの可視性は平均チェックポイント間隔分遅延します。
ジョブが失敗した場合、書き込み中のトピックはジョブの再起動またはトランザクションのタイムアウトまでコンシューマーの読み取りをブロックします。極端な場合、トランザクションのタイムアウトが読み取りにも影響を与える可能性があります。