このトピックでは、Message Queue for Apache Kafka コネクタについて説明します。
背景情報
Apache Kafka は、ビッグデータ分野で広く利用されるオープンソースの分散型メッセージキューであり、高性能なデータ処理、ストリーミング分析、およびデータ統合に使用されます。Realtime Compute for Apache Flink 向けの Realtime Compute for Apache Flink の Kafka コネクタは、オープンソースの Apache Kafka クライアントを活用し、高いデータスループットを実現するとともに、複数のデータ形式の読み書きと「1 回限りのセマンティクス」をサポートします。
カテゴリ | 説明 |
対応タイプ | ソーステーブル、シンクテーブル、およびデータ取り込みシンク |
実行モード | ストリーミングモード |
データ形式 | |
メトリック | |
API タイプ | SQL API、DataStream API、およびデータインジェスト YAML API |
結果テーブルへのデータの更新または削除 | このコネクタは、結果テーブルへのデータ挿入のみをサポートしています。更新および削除はサポートされていません。 説明 結果テーブルへのデータの更新または削除方法の詳細については、「Upsert Kafka」をご参照ください。 |
前提条件
ご使用の Kafka クラスターのタイプに応じて、以下の前提条件を満たしてください。
ApsaraMQ for Kafka クラスターへの接続
Kafka クラスターのバージョンが 0.11 以降である必要があります。
ApsaraMQ for Kafka クラスターを作成済みである必要があります。詳細については、「手順 3:リソースの作成」をご参照ください。
Flink ワークスペースと Kafka クラスターが同一の Virtual Private Cloud (VPC) 内にあり、Flink ワークスペースの CIDR ブロックが ApsaraMQ for Kafka のホワイトリストに追加済みである必要があります。詳細については、「ホワイトリストの設定」をご参照ください。
重要ApsaraMQ for Kafka へのデータ書き込みに関する制限事項:
ApsaraMQ for Kafka は、Zstandard (zstd) 圧縮形式でのデータ書き込みをサポートしていません。
ApsaraMQ for Kafka は、べき等性またはトランザクションによる書き込みをサポートしておらず、Kafka 結果テーブルが提供する「1 回限りのセマンティクス」を利用できません。Realtime Compute Engine VVR 8.0.0 以降では、Kafka コネクタが Kafka クライアント 3.x を使用しており、
properties.enable.idempotenceプロパティのデフォルト値はtrueです。したがって、Realtime Compute Engine VVR 8.0.0 以降を使用して ApsaraMQ for Kafka へ書き込む場合、書き込み失敗を防ぐために、結果テーブル定義にproperties.enable.idempotence=falseという構成を必ず追加してください。ApsaraMQ for Kafka のストレージエンジンおよび機能制限の比較については、「ストレージエンジンの比較」をご参照ください。
自己管理型 Apache Kafka クラスターへの接続
自己管理型 Apache Kafka クラスターのバージョンが 0.11 以降である必要があります。
Flink ワークスペースと自己管理型 Apache Kafka クラスター間のネットワーク接続が確立されている必要があります。パブリックインターネット経由でのクラスター接続方法の詳細については、「ネットワーク接続に関する FAQ」をご参照ください。
Apache Kafka バージョン 2.8 のクライアント構成オプションのみがサポートされています。詳細については、Apache Kafka のConsumer Configs およびProducer Configs のドキュメントをご参照ください。
注意事項
トランザクションによる書き込みは推奨されません。これは、Apache Flink および Apache Kafka の既知の設計上の制限によるものです。sink.delivery-guarantee = 'exactly-once' を設定すると、Kafka コネクタはトランザクションによる書き込みを有効化しますが、以下のような既知の問題があります。
各チェックポイントごとに新しいトランザクション ID が生成されます。チェックポイント間隔が短すぎると、過剰な数のトランザクション ID が作成されます。これにより、Apache Kafka クラスター内のコーディネーターのメモリが不足し、クラスターの安定性が損なわれる可能性があります。
各トランザクションごとに Producer インスタンスが作成されます。同時に多数のトランザクションがコミットされると、TaskManager のメモリが不足し、Apache Flink ジョブの安定性が損なわれる可能性があります。
複数の Apache Flink ジョブが同じ
sink.transactional-id-prefixを使用している場合、生成されたトランザクション ID が競合する可能性があります。あるジョブでの書き込み操作が失敗すると、Apache Kafka パーティションのログ開始オフセット (LSO) の進行が妨げられ、そのパーティションのすべてのコンシューマーに影響を与えます。
「1 回限りのセマンティクス」が必要な場合は、主キーを持つテーブルに書き込むために Upsert Kafka コネクタ を使用し、べき等性を保証してください。トランザクションによる書き込みを必須とする場合は、「1 回限りのセマンティクスの使用上の注意事項」をご参照ください。
ネットワーク接続のトラブルシューティング
Realtime Compute for Apache Flink ジョブの起動時に Timed out waiting for a node assignment エラーが発生した場合、これは通常、Realtime Compute for Apache Flink と Kafka クラスター間のネットワーク接続に問題があることを示しています。
Kafka クライアントは、以下のようにブローカーに接続します。
クライアントは、
bootstrap.serversで指定されたアドレスを使用して、Kafka クラスターへの初期接続を確立します。Kafka クラスターは、各ブローカーのエンドポイントを含むメタデータを返します。
クライアントは、これらのエンドポイントを使用してブローカーに接続し、データの読み取りまたは書き込みを行います。
bootstrap.servers のアドレスに到達可能であっても、Kafka が不正なブローカーエンドポイントを返すと、クライアントはデータの読み取りまたは書き込みができなくなります。この問題は、プロキシ、ポートフォワーディング、または専用回線を使用するネットワークアーキテクチャでよく発生します。
トラブルシューティング手順
Message Queue for Kafka
エンドポイントタイプの確認
デフォルトエンドポイント(内部ネットワーク)
SASL エンドポイント(認証付き内部ネットワーク)
パブリックエンドポイント(別途申請が必要)
ネットワークプローブ 機能を Realtime Compute for Apache Flink 開発コンソールで使用し、
bootstrap.serversアドレスとの接続性の問題を除外してください。セキュリティグループおよびホワイトリストの確認
Realtime Compute for Apache Flink ワークスペースの CIDR ブロックを Kafka インスタンスのホワイトリストに追加してください。詳細については、「VPC CIDR ブロックの表示」および「ホワイトリストの設定」をご参照ください。
SASL 構成の確認(有効な場合)
SASL_SSL エンドポイントを使用する場合、Realtime Compute for Apache Flink ジョブで JAAS、SSL、および SASL メカニズムが正しく構成されていることを確認してください。認証情報が不足していると、ハンドシェイク段階で接続が失敗し、タイムアウトとして表示されることがあります。詳細については、「セキュリティおよび認証」をご参照ください。
自己管理型 Kafka
「Network Probe」機能を使用します。
この機能により、
bootstrap.serversアドレスとの接続性の問題を除外し、正しい内部またはパブリックエンドポイントが使用されていることを確認できます。セキュリティグループおよびホワイトリストの確認
Elastic Compute Service (ECS) インスタンスのセキュリティグループが、Kafka エンドポイントのポート(通常は 9092 または 9093)に対するインバウンドトラフィックを許可している必要があります。
ECS インスタンスのファイアウォールが、Realtime Compute for Apache Flink ワークスペースの VPC からのトラフィックを許可していることを確認してください。詳細については、「VPC CIDR ブロックの表示」をご参照ください。
構成の確認
zkCli.sh または zookeeper-shell.sh ツールを使用して、Kafka が使用する ZooKeeper クラスターにログインします。
ブローカーメタデータを取得するコマンドを実行します。たとえば、
get /brokers/ids/0を実行します。応答の endpoints フィールドに、Kafka がクライアントに通知するアドレスが表示されます。
Realtime Compute for Apache Flink 開発コンソールの ネットワークプローブ 機能を使用して、このアドレスへのアクセスが可能かどうかをテストします。
説明このアドレスにアクセスできない場合、Kafka 管理者に連絡し、
listenersおよびadvertised.listeners構成を確認・修正して、Realtime Compute for Apache Flink からアクセス可能なアドレスが通知されるようにしてください。Kafka クライアント接続の詳細については、「接続性のトラブルシューティング」をご参照ください。
SASL 構成の確認(有効な場合)
SASL_SSL エンドポイントを使用する場合、Realtime Compute for Apache Flink ジョブで JAAS、SSL、および SASL メカニズムが正しく構成されていることを確認してください。認証情報が不足していると、ハンドシェイク段階で接続が失敗し、タイムアウトとして表示されることがあります。詳細については、「セキュリティおよび認証」をご参照ください。
SQL
Kafka コネクタは、SQL ジョブにおけるソーステーブルまたは結果テーブルとして使用できます。
構文
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)メタデータ列
ソーステーブルまたは結果テーブルでメタデータ列を定義することで、Kafka メッセージのメタデータにアクセスまたは書き込みできます。たとえば、WITH 句で複数のトピックを定義する場合、ソーステーブルのメタデータ列を使用して、各レコードのソーストピックを識別できます。次のコードはその例です。
CREATE TABLE kafka_source (
-- メッセージトピックを `record_topic` 列として読み取る
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- ConsumerRecord からタイムスタンプを `ts` 列として読み取る
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- メッセージオフセットを `record_offset` 列として読み取る
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- `ts` 列からタイムスタンプを ProducerRecord のタイムスタンプとして Kafka に書き込む
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);次の表は、Kafka ソースおよび結果テーブルでサポートされるメタデータ列を示しています。
キー | タイプ | 説明 | 範囲 |
topic | STRING NOT NULL METADATA VIRTUAL | メッセージのトピック。 | ソーステーブル |
partition | INT NOT NULL METADATA VIRTUAL | メッセージのパーティション ID。 | ソーステーブル |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | メッセージヘッダー。 | ソーステーブルおよび結果テーブル |
leader-epoch | INT NOT NULL METADATA VIRTUAL | メッセージの leader-epoch。 | ソーステーブル |
offset | BIGINT NOT NULL METADATA VIRTUAL | メッセージオフセット。 | ソーステーブル |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | メッセージのタイムスタンプ。 | ソーステーブルおよび結果テーブル |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | メッセージのタイムスタンプタイプ。有効な値は以下のとおりです。
| ソーステーブル |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | 生のメッセージキー。 | ソーステーブルおよび結果テーブル 説明 このパラメーターは、Ververica Runtime (VVR) 11.4 以降でのみサポートされます。 |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | 生のメッセージ値。 | ソーステーブルおよび結果テーブル 説明 このパラメーターは、Ververica Runtime (VVR) 11.4 以降でのみサポートされます。 |
WITH パラメーター
一般
パラメーター
説明
タイプ
必須
デフォルト
備考
connector
使用するコネクタ。
String
はい
該当なし
値は
kafkaでなければなりません。properties.bootstrap.servers
Kafka ブローカーのアドレス一覧。
String
はい
該当なし
書式:
host:port,host:port,...。アドレスはカンマ (,) で区切ります。properties.*
Kafka クライアントの追加プロパティ。
String
いいえ
該当なし
プロパティキーは、公式 Apache Kafka ドキュメントのProducer Configs およびConsumer Configs で定義された有効なオプションである必要があります。
Realtime Compute for Apache Flink は、properties. プレフィックスを削除し、残りのキーと値のペアを基盤となる Kafka クライアントに渡します。たとえば、自動トピック作成を無効化するために
'properties.allow.auto.create.topics' = 'false'を設定できます。以下のオプションは Kafka コネクタによって上書きされるため、この方法では構成できません。
key.deserializer
value.deserializer
format
Kafka メッセージの値をシリアル化および逆シリアル化するフォーマット。
String
いいえ
該当なし
対応フォーマット:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
説明詳細については、「フォーマットオプション」をご参照ください。
key.format
Kafka メッセージのキーをシリアル化および逆シリアル化するフォーマット。
String
いいえ
該当なし
対応フォーマット:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
説明この構成を使用する場合、key.options の指定が必須です。
key.fields
テーブルスキーマから Kafka メッセージのキーとして使用するフィールド。
String
いいえ
該当なし
複数のフィールド名はセミコロン (;) で区切ります。たとえば、
'field1;field2'のようにします。key.fields-prefix
キーと値のフィールド間で名前が重複しないよう、すべてのキー用フィールドに付与するカスタムプレフィックス。
String
いいえ
該当なし
このプレフィックスは、キーと値のフィールドを区別するために使用されます。キーのシリアル化前または逆シリアル化後に削除されます。
説明このオプションを使用する場合、
value.fields-includeをEXCEPT_KEYに設定する必要があります。value.format
Kafka メッセージの値をシリアル化および逆シリアル化するフォーマット。
String
いいえ
該当なし
この構成は
formatと同等です。ただし、formatとvalue.formatの両方を設定することはできません。両方が構成されている場合、value.formatがformatを上書きします。value.fields-include
キー用フィールドが値フォーマットに含まれるかどうかを定義します。
String
いいえ
ALL
有効な値:
ALL: Kafka メッセージの値には、すべてのテーブル列が含まれます。EXCEPT_KEY: Kafka メッセージの値には、key.fieldsで定義された列を除くすべてのテーブル列が含まれます。
ソーステーブル
パラメーター
説明
タイプ
必須
デフォルト
備考
topic
読み取るトピックまたはトピック一覧。
String
いいえ
該当なし
複数のトピックをサブスクライブするには、トピック名をセミコロン (;) で区切ります。たとえば、
'topic-1;topic-2'のようにします。説明このオプションと
topic-patternのどちらか一方のみを指定できます。両方を指定することはできません。topic-pattern
サブスクライブするトピックを定義する正規表現。コンシューマーは、このパターンに一致する名前のすべてのトピックをサブスクライブします。
String
いいえ
該当なし
説明このオプションと
topicのどちらか一方のみを指定できます。両方を指定することはできません。properties.group.id
Kafka ソースのコンシューマーグループ ID。
String
いいえ
KafkaSource-{ソーステーブル名}
初めてコンシューマーグループ ID を使用する場合、初期スタートオフセットを定義するために、properties.auto.offset.reset を
earliestまたはlatestのいずれかに設定する必要があります。scan.startup.mode
Kafka コンシューマーのスタートオフセット。
String
いいえ
group-offsets
有効な値:
earliest-offset: 最も古い利用可能なオフセットから読み取りを開始します。latest-offset: 最新のオフセットから読み取りを開始します。group-offsets: 指定された properties.group.id のコミット済みオフセットから読み取りを開始します。timestamp: 指定された scan.startup.timestamp-millis から読み取りを開始します。specific-offsets: scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。
説明このオプションは、ジョブがクリーンな状態から開始される場合にのみ有効です。チェックポイントからジョブを再開する場合、常にチェックポイント状態に保存されたオフセットから再開されます。
scan.startup.specific-offsets
scan.startup.modeがspecific-offsetsの場合の各パーティションのスタートオフセット。String
いいえ
該当なし
たとえば、
partition:0,offset:42;partition:1,offset:300のようにします。scan.startup.timestamp-millis
scan.startup.modeがtimestampに設定されている場合の、スタートタイムスタンプ(ミリ秒単位)。Long
いいえ
該当なし
単位はミリ秒です。
scan.topic-partition-discovery.interval
トピック内の新しいパーティションを検出する間隔。
Duration
いいえ
5 分
コネクタは定期的に新しいパーティションを検出し、そこから読み取りを行います。topic-pattern を使用する場合、コネクタはパターンに一致する新しいトピックも検出します。この機能を無効にするには、間隔を非正の値に設定します。
説明Ververica Runtime (VVR) 6.0.x では、動的パーティション検出はデフォルトで無効になっています。VVR 8.0 以降では、この機能はデフォルトで有効になっており、検出間隔は 5 分です。
scan.header-filter
Kafka メッセージヘッダーに基づいてメッセージをフィルター処理します。
String
いいえ
該当なし
ヘッダーのキーと値はコロン (:) で区切られます。複数のヘッダー条件は論理演算子 (& および |) を使用して結合されます。NOT 論理演算子 (!) もサポートされています。たとえば、
depart:toy|depart:book&!env:testは、ヘッダーにdepart=toyまたはdepart=bookが含まれ、かつenv=testが含まれていない場合に Kafka データを保持します。説明このオプションは、Ververica Runtime (VVR) 8.0.6 以降でのみサポートされます。
式内の括弧はサポートされていません。
論理演算は左から右に評価されます。
ヘッダーの値は UTF-8 文字列に変換されて比較されます。
scan.check.duplicated.group.id
properties.group.idを使用する別のアクティブなコンシューマーが存在するかどうかを確認します。Boolean
いいえ
false
有効な値:
true: ジョブの開始前に、重複するコンシューマーグループを確認します。重複が見つかった場合、ジョブは失敗して競合を防止します。
false: 競合の確認を行わず、ジョブを開始します。
説明このオプションは、Ververica Runtime (VVR) 6.0.4 以降でのみサポートされます。
シンクテーブル
パラメーター
説明
タイプ
必須
デフォルト
備考
topic
書き込むトピック。
String
はい
該当なし
該当なし
sink.partitioner
並列 sink インスタンスから Kafka パーティションへのレコードのマッピング方法を定義します。
String
いいえ
default
有効な値:
default: Kafka のデフォルトパーティショナーを使用します。fixed: 各並列 sink インスタンスが固定の Kafka パーティションに書き込みます。round-robin: レコードはラウンドロビン方式でパーティションに配布されます。カスタムパーティショナー:カスタムパーティショナーを使用するには、
FlinkKafkaPartitionerサブクラスの完全修飾クラス名(例:org.mycompany.MyPartitioner)を指定します。
sink.delivery-guarantee
sink の配信保証レベル。
String
いいえ
at-least-once
有効な値:
none: 保証は提供されません。レコードが失われるか、重複する可能性があります。at-least-once: レコードが失われることはありませんが、重複する可能性があります。exactly-once: Kafka トランザクションを使用して「1 回限りのセマンティクス」を提供し、レコードが失われず、重複しないことを保証します。
説明exactly-onceセマンティクスを使用する場合、sink.transactional-id-prefix も指定する必要があります。sink.transactional-id-prefix
sink.delivery-guaranteeがexactly-onceの場合に必要なトランザクション ID のプレフィックス。String
はい。
sink.delivery-guaranteeがexactly-onceの場合です。該当なし
sink.delivery-guarantee を
exactly-onceに設定した場合にのみ必須です。sink.parallelism
sink オペレーターの並列度。
Integer
いいえ
該当なし
デフォルトでは、フレームワークが上流のオペレーターに基づいて並列度を決定します。
セキュリティおよび認証
Kafka クラスターが安全な接続または認証を必要とする場合、関連するセキュリティおよび認証構成に properties. をプレフィックスとして付け、WITH パラメーター内で設定します。PLAINT を SASL メカニズムとして使用し、JAAS 構成を提供する Kafka テーブルの構成例を以下に示します。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)次の例は、セキュリティプロトコルとして SASL_SSL を、SASL メカニズムとして SCRAM-SHA-256 を使用する方法を示しています。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL 構成 */
/* サーバーの CA 証明書の truststore のパス。 */
/* Artifacts を使用してアップロードされたファイルは /flink/usrlib/ ディレクトリに格納されます。 */
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* クライアント認証が必要な場合、keystore(秘密鍵)のパスも構成する必要があります。 */
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* サーバーのホスト名を検証するために使用されるアルゴリズム。空文字列はホスト名検証を無効化します。 */
'properties.ssl.endpoint.identification.algorithm' = '',
/* SASL 構成 */
/* SASL メカニズムを SCRAM-SHA-256 に設定します。 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* JAAS を構成します。 */
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)Real-time Compute コンソールの Artifacts 機能を使用して、上記の例で言及されている CA 証明書および秘密鍵をアップロードできます。アップロードされたファイルは /flink/usrlib ディレクトリに格納されます。my-truststore.jks という名前の CA 証明書ファイルを使用する場合、WITH 句の 'properties.ssl.truststore.location' プロパティを、以下のいずれかの方法で設定できます。
'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'を設定します。この方法では、Object Storage Service (OSS) から実行時にファイルを動的にダウンロードする必要がなくなりますが、デバッグモードはサポートされません。Realtime Compute エンジンのバージョンが VVR 11.5 以降の場合、
properties.ssl.truststore.locationおよびproperties.ssl.keystore.locationを絶対 OSS パスに構成できます。ファイルパスの書式は oss://flink-fullymanaged-<Workspace ID>/artifacts/namespaces/<Namespace name>/<file name> です。この方法では、Flink 実行時に OSS ファイルを動的にダウンロードし、デバッグモードをサポートします。
構成の確認: このトピックの例は一般的な構成を示しています。Kafka コネクタを構成する前に、Kafka の運用・保守チームに連絡して、正しいセキュリティおよび認証設定を取得してください。
エスケープ: ネイティブの Apache Flink とは異なり、Realtime Compute for Apache Flink SQL エディターでは、二重引用符 (") がデフォルトでエスケープされます。したがって、
properties.sasl.jaas.configオプションでユーザー名およびパスワードに使用される二重引用符をエスケープするために、バックスラッシュ (\) を追加する必要はありません。
ソーステーブルのスタートオフセット
スタートモード
scan.startup.mode オプションを構成することで、Kafka ソーステーブルがデータの読み取りを開始するオフセットを指定できます。有効な値は以下のとおりです。
earliest-offset: 最も古いオフセットから読み取りを開始します。
latest-offset: 最新のオフセットから読み取りを開始します。
group-offsets: 指定された properties.group.id のコンシューマーグループのコミット済みオフセットから読み取りを開始します。
timestamp: scan.startup.timestamp-millis で指定された値以上である最初のメッセージのタイムスタンプから読み取りを開始します。
specific-offsets: scan.startup.specific-offsets で指定された特定のパーティションオフセットから読み取りを開始します。
スタートモードを指定しない場合、デフォルトは「group-offsets」です。
scan.startup.mode オプションは、ステートレスジョブにのみ適用されます。ステートフルジョブを開始する場合、常にその状態に保存されたオフセットから消費されます。
次のコードはその例です。
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- 最も古いオフセットから消費します。
'scan.startup.mode' = 'earliest-offset',
-- 最新のオフセットから消費します。
'scan.startup.mode' = 'latest-offset',
-- 「my-group」というコンシューマーグループのコミット済みオフセットから消費します。
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- 「my-group」を初めて使用する場合、最も古いオフセットから消費を開始します。
'properties.auto.offset.reset' = 'latest', -- 「my-group」を初めて使用する場合、最新のオフセットから消費を開始します。
-- 指定されたタイムスタンプ(ミリ秒単位):1655395200000 から消費します。
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- 特定のオフセットから消費します。
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);スタートオフセットの優先順位
ソーステーブルのスタートオフセットは、以下の優先順位(高から低)に基づいて決定されます。
優先順位(高から低) | チェックポイントまたはセーブポイントに保存されたオフセット。 |
リアルタイムコンピュートコンソールでジョブ起動時に選択されたスタート時刻。 | |
WITH 句の scan.startup.mode で指定されたスタートオフセット。 | |
scan.startup.mode が指定されていない場合、group-offsets を使用して、対応するコンシューマーグループのオフセットから消費を開始します。 |
これらの手順のいずれかで決定されたオフセットが無効な場合(たとえば、期限切れになっているか、Kafka クラスターで問題が発生している場合)、システムは properties.auto.offset.reset で指定されたポリシーに従ってオフセットをリセットします。このオプションが構成されていない場合、システムはユーザーによる介入を必要とする例外をスローします。
一般的なシナリオとして、新しいコンシューマーグループ ID を使用してデータを消費する場合があります。ソーステーブルはまず、Kafka クラスターに対してそのグループのコミット済みオフセットを照会します。新しいグループ ID のため、有効なオフセットは見つかりません。その結果、システムは properties.auto.offset.reset で指定されたポリシーに従ってオフセットをリセットします。したがって、新しいグループ ID を使用して消費する場合、properties.auto.offset.reset オプションを構成する必要があります。
ソースオフセットのコミット
Kafka ソーステーブルは、成功したチェックポイント後にのみ、現在のコンシューマーオフセットを Kafka クラスターにコミットします。チェックポイント間隔が長い場合、Kafka クラスター内のコンシューマーオフセットが遅延します。チェックポイント中、Kafka ソーステーブルは現在の読み取り進行状況をその状態に保存します。システムはこの状態を使用して障害復旧を行い、Kafka クラスターにコミットされたオフセットには依存しません。オフセットは、Kafka 内での読み取り進行状況の監視のみを目的としてコミットされます。オフセットのコミット失敗は、データの正確性には影響しません。
カスタム sink パーティショナー
Kafka の組み込みパーティショニング戦略が要件を満たさない場合、FlinkKafkaPartitioner クラスを拡張することで、カスタムパーティショナーを実装できます。開発が完了したら、コードを JAR パッケージにコンパイルし、Realtime Compute コンソールの Artifacts 機能を使用してアップロードします。JAR パッケージがアップロードおよび参照された後、WITH 句の sink.partitioner パラメーターを、パーティショナーの完全修飾クラス名(例:org.mycompany.MyPartitioner)に設定します。
Kafka、Upsert Kafka、および Kafka JSON カタログ
Kafka は、データの更新または削除をサポートしない追加専用のメッセージキューです。ストリーミング SQL では、標準の Kafka 結果テーブルは、上流の Change Data Capture (CDC) データや集計および結合などのオペレーターの再tractionロジックを処理できません。変更または再tractionを含むデータを書き込む必要がある場合は、Upsert Kafka 結果テーブルを使用します。
1 つ以上の上流データベーステーブルから Kafka への Change Data Capture (CDC) データのバッチ同期を簡素化するために、Kafka JSON カタログを使用できます。Kafka に格納されたデータが JSON 形式の場合、Kafka JSON カタログを使用すると、スキーマおよび WITH パラメーターの定義を省略できます。詳細については、「Kafka JSON カタログの管理」をご参照ください。
例
例 1:Kafka からの読み取りおよび Kafka への書き込み
この例では、ソース Kafka トピックからデータを読み取り、結果トピックに書き込みます。データは CSV 形式です。
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;例 2:テーブルスキーマおよびデータの同期
Kafka コネクタを使用して、Kafka トピックからのメッセージを Hologres にリアルタイムで同期できます。フェールオーバー時に Hologres で重複メッセージが発生しないようにするには、Kafka メッセージのオフセットおよびパーティション ID を複合プライマリキーとして使用できます。
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- オプション:すべての入れ子になった列をフラット化します。
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;例 3:Kafka キーおよび値の同期
Kafka メッセージキーに関連する情報が含まれている場合、キーと値の両方を同期できます。
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Kafka メッセージキーは、スキーマ進化または自動型解析をサポートしていません。スキーマは手動で宣言する必要があります。
例 4:データの同期および計算の実行
Kafka から Hologres へのデータ同期には、多くの場合、軽量な計算が必要です。
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- COALESCE を使用して null 値を処理します。例 5:入れ子になった JSON の解析
以下のサンプル JSON メッセージを示します。
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}JSON_VALUE(payload, '$.properties.owner') のような関数を使用してフィールドを解析する代わりに、ソース DDL で直接構造を定義できます。
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);このようにすることで、Flink は読み取りフェーズで JSON を一度に構造化されたフィールドに解析します。その後の SQL クエリでは、properties.owner を直接使用でき、追加の関数呼び出しを必要としないため、全体的なパフォーマンスが向上します。
DataStream API
DataStream API を使用してデータを読み書きするには、DataStream コネクタ を使用して Realtime Compute for Apache Flink に接続します。DataStream コネクタ の設定方法の詳細については、「DataStream コネクタの統合」をご参照ください。
Kafka ソースの構築
Kafka ソース は、Kafka ソース インスタンスを作成するためのビルダークラスを提供します。次のサンプルコードは、
input-topicトピック の最も古い オフセット からデータを消費する Kafka ソース を構築します。コンシューマーグループ はmy-groupで、Kafka メッセージ の 値 は文字列として逆シリアル化されます。Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");Kafka ソース を構築するには、以下のプロパティを指定する必要があります。
パラメーター
説明
BootstrapServers
Kafka ブローカーのアドレス一覧。このプロパティは
setBootstrapServers(String)メソッドを呼び出して設定します。GroupId
コンシューマーグループ の ID。このプロパティは
setGroupId(String)メソッドを呼び出して設定します。Topics または Partitions
サブスクライブ するトピックまたはパーティション。この Kafka ソース は、トピックまたはパーティションを サブスクライブ するための以下の 3 つの方法をサポートしています。
リスト内のトピックのすべてのパーティションをサブスクライブします。
KafkaSource.builder().setTopics("topic-a","topic-b")トピックパターン:サブスクライブ するトピックの名前が指定された正規表現に一致するすべてのパーティションをサブスクライブします。
KafkaSource.builder().setTopicPattern("topic.*")パーティションの一覧:指定されたパーティションをサブスクライブできます。
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // topic "topic-a" のパーティション 0 new TopicPartition("topic-b", 5))); // topic "topic-b" のパーティション 5 KafkaSource.builder().setPartitions(partitionSet)
Deserializer
Kafka メッセージを解析するために使用されるデシリアライザー。
setDeserializer(KafkaRecordDeserializationSchema)メソッドを使用してデシリアライザーを指定します。KafkaRecordDeserializationSchemaは、KafkaConsumerRecordをどのように解析するかを定義します。Kafka メッセージ の 値 のみを解析する必要がある場合、以下のいずれかの方法を使用できます。ビルダークラスの
setValueOnlyDeserializer(DeserializationSchema)メソッドを使用します。DeserializationSchemaは、Kafka メッセージ のバイナリデータの 値 をどのように解析するかを定義します。Kafka の Deserializer インターフェイス を実装するクラスを使用します。たとえば、StringDeserializer を使用して Kafka メッセージ の 値 を文字列として解析できます。
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
説明完全な
ConsumerRecordを解析するには、KafkaRecordDeserializationSchemaインターフェイスを実装する必要があります。POM
Kafka DataStream コネクタ は Maven セントラルリポジトリで利用可能です。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>Kafka DataStream コネクタ を使用する際には、以下のプロパティを考慮してください。
スタートオフセット
Kafka ソース は、スタートオフセット を オフセット初期化子 (
OffsetsInitializer) を使用して指定します。組み込みの初期化子には以下があります。オフセット初期化子
コード
最も古い オフセット から消費を開始します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())最新の オフセット から消費を開始します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())指定された時刻以上であるタイムスタンプを持つデータの消費を開始します。単位はミリ秒です。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))コンシューマーグループ のコミット済み オフセット からデータの消費を開始します。コミット済み オフセット が存在しない場合、指定されたリセット戦略(たとえば、最も古い オフセット)を使用します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))コンシューマーがコミットしたオフセットから消費を開始し、オフセットリセットポリシーを指定しません。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())説明組み込みの初期化子が要件を満たさない場合、カスタムの オフセット初期化子 を実装できます。
オフセット初期化子 を指定しない場合、デフォルトは
OffsetsInitializer.earliest()です。
ストリーミングモードおよびバッチモード
Kafka ソース は、ストリーミングモード および バッチモード の両方をサポートしています。デフォルトでは、ジョブ が失敗またはキャンセルされるまで無限に実行される ストリーミングモード で動作します。Kafka ソース を バッチモード で実行するには、
setBounded(OffsetsInitializer)を使用して停止 オフセット を指定できます。Kafka ソース は、すべてのパーティションが指定された停止オフセットに達すると終了します。説明ストリーミングモード の Kafka ソース には通常、停止 オフセット がありません。ただし、テスト目的で、ストリーミングモード でも
setUnbounded(OffsetsInitializer)を使用して停止 オフセット を指定できます。ストリーミングモード と バッチモード で停止 オフセット を指定するメソッド名は異なります:ストリーミングモード ではsetUnbounded、バッチモード では setBounded です。動的パーティション検出
Flink ジョブ を再起動することなく、トピックパターンによるサブスクライブ時にトピックのスケーリングまたは新規トピックの作成を処理するには、トピック のスケーリングに対応するために 動的パーティション検出 を有効化できます。この機能はデフォルトで無効化されており、明示的に有効化する必要があります。
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // 10 秒ごとに新しいパーティションを検出します。重要動的パーティション検出機能は、Kafka クラスターのメタデータ更新メカニズムに依存します。Kafka クラスターがパーティション情報を適切なタイミングで更新しない場合、新しいパーティションが検出されない可能性があります。Kafka クラスターの partition.discovery.interval.ms 構成が実際のシナリオと一致していることを確認してください。
イベント時刻および Watermark
デフォルトでは、Kafka ソース は Kafka メッセージ のタイムスタンプを イベント時刻 として使用します。カスタムの Watermark 戦略を定義することで、メッセージ の本文から イベント時刻 を抽出し、下流に Watermark を送信できます。
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")カスタム Watermark 戦略の詳細については、「Watermark の生成」をご参照ください。
説明ソースのサブタスクがアイドル状態の場合(たとえば、Kafka パーティション に新しいデータがない場合や、ソースの 並列度 が Kafka パーティション数より大きい場合)、そのサブタスクの Watermark は進行しません。これにより、下流のウィンドウ計算がブロックされる可能性があります。
この問題を解決するには、以下のソリューションを検討してください。
ソースアイドルタイムアウトの設定:table.exec.source.idle-timeout プロパティ を有効にして、アイドル状態のソースを一時的にアイドル状態としてマークします。これにより、下流の Watermark が進行できるようになります。
適切な 並列度 の設定:ソースの 並列度 が Kafka パーティション数を超えないようにしてください。
オフセットのコミット
チェックポイントが有効な場合、Kafka ソース は、チェックポイント が完了したときに、現在のコンシューマー オフセット を Kafka にコミットします。これにより、Flink チェックポイント の状態が、Kafka ブローカーにコミットされた オフセット と一貫していることが保証されます。チェックポイントが無効な場合、Kafka ソース は Kafka コンシューマーの内部自動定期 オフセット コミットメカニズムに依存します。この機能は、
enable.auto.commitおよびauto.commit.interval.msKafka コンシューマープロパティによって制御されます。説明Kafka ソース は、フォールトトレランスおよび回復のためにコミットされたオフセットに依存しません。オフセットのコミットは、Kafka コンシューマーおよび コンシューマーグループ の進行状況を監視するためだけです。
その他のプロパティ
上記で言及されたプロパティに加えて、
setProperties(Properties)およびsetProperty(String, String)を使用して、Kafka ソース およびその基盤となる Kafka コンシューマーの任意の プロパティ を設定できます。Kafka ソース は以下の特定のプロパティを提供します。パラメーター
説明
client.id.prefix
Kafka コンシューマーのクライアント ID プレフィックスを指定します。
partition.discovery.interval.ms
新しいパーティションを検出する間隔をミリ秒単位で定義します。
-1の値は動的パーティション検出を無効にします。説明バッチモード では、このプロパティは自動的に
-1に設定されます。register.consumer.metrics
Flink に Kafka コンシューマーメトリックを登録するかどうかを指定します。
その他の Kafka コンシューマー構成
Kafka コンシューマー構成の完全なリストについては、公式 Apache Kafka ドキュメント をご参照ください。
重要正しい操作を保証するために、Kafka DataStream コネクタ は、以下の手動で構成されたプロパティを上書きします。
key.deserializerは常に org.apache.kafka.common.serialization.ByteArrayDeserializer に上書きされます。value.deserializerは常に org.apache.kafka.common.serialization.ByteArrayDeserializer に上書きされます。auto.offset.reset.strategyはOffsetsInitializerによって提供される戦略によって上書きされます。
次の例は、PLAIN SASL メカニズムを使用し、JAAS 構成を提供する Kafka コンシューマーを構成する方法を示しています。
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")モニタリング
Kafka ソース は、Flink のメトリックシステムを通じてメトリックを公開し、モニタリングおよび診断に利用できます。
メトリックスコープ
Kafka ソースリーダーのすべてのメトリックは、オペレーターのメトリックグループのサブグループである
KafkaSourceReaderメトリックグループの下に登録されます。特定の トピック パーティション に関連するメトリックは、KafkaSourceReader.topic.<topic_name>.partition.<partition_id>サブグループに登録されます。たとえば、「my-topic」トピック の パーティション 1 の現在の コンシューマー オフセット メトリック (
currentOffset) は、.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset で利用できます。成功したコミット数 ( commitsSucceeded) は、.operator.KafkaSourceReader.commitsSucceeded で利用できます。 メトリックのリスト
メトリック
説明
スコープ
currentOffset
パーティションの現在のコンシューマー オフセット。
TopicPartition
committedOffset
パーティションの最後にコミットされた オフセット。
TopicPartition
commitsSucceeded
成功したオフセットコミットの合計数。
KafkaSourceReader
commitsFailed
失敗したコミット数
KafkaSourceReader
Kafka コンシューマーメトリック
基盤となる Kafka コンシューマーのメトリックは、KafkaSourceReader.KafkaConsumer メトリックグループに登録されます。たとえば、
records-consumed-totalメトリック は.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total に登録されます。 プロパティ
register.consumer.metricsを使用して、Kafka コンシューマーメトリックを登録するかどうかを指定できます。このオプションはデフォルトで有効 (true) です。Kafka コンシューマーメトリックの詳細については、「Apache Kafka ドキュメント」をご参照ください。
Kafkaシンクの構築
Flink Kafkaシンクは、データストリームを1つ以上の Kafkaトピックに書き込みます。
DataStream<String> stream = ... Properties kafkaProperties = new Properties(); kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); KafkaSink<String> sink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(sink);Kafkaシンクを構築するには、次のプロパティを設定する必要があります。
パラメーター
説明
Kafkaクライアントプロパティ
bootstrap.serversプロパティは必須です。これは、カンマ区切りの Kafkaブローカーのリストを指定します。レコードシリアライザー
入力データを Kafka の
KafkaRecordSerializationSchemaを使用して Kafka のProducerRecordに変換する必要があります。Flink では、メッセージキーおよび値のシリアル化、トピック選択、メッセージのパーティション分割など、一般的なコンポーネントを提供するスキーマビルダーが用意されています。さらに細かい制御が必要な場合は、対応するインターフェイスを実装することもできます。各受信レコードに対してProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp)メソッドが呼び出され、Kafka に書き込むProducerRecordを生成します。ProducerRecordは、各レコードが Kafkaに書き込まれる方法を詳細に制御し、これにより、以下を行うことができます。送信先の [トピック] を設定します。
[メッセージ] [キー] を設定します。
送信先の [パーティション] を指定します。
配信保証
bootstrap.serversパラメーターは必須であり、カンマ区切りの Kafkaブローカーのリストを指定します。配達保証
Flinkチェックポイントが有効になっている場合、Flink Kafkaシンクは1 回限りのセマンティクスを提供できます。チェックポイントを有効にすることに加えて、DeliveryGuaranteeパラメーターを使用して異なる配信保証を指定できます。DeliveryGuaranteeパラメーターは次のオプションを提供します。
DeliveryGuarantee.NONE: (デフォルト) Flink は保証を提供しません。データが失われたり、重複したりする可能性があります。
DeliveryGuarantee.AT_LEAST_ONCE: データが失われないことを保証しますが、重複が発生する可能性があります。
DeliveryGuarantee.EXACTLY_ONCE: Kafka トランザクションを使用して、1 回限りのセマンティクスを提供します。
説明EXACTLY_ONCE セマンティクスを使用する場合は、「EXACTLY_ONCE セマンティクスに関する考慮事項」をご参照ください。
データインジェスト
Kafka コネクタをソースまたはシンクとして使用して、データインジェスト用の YAML ジョブを作成できます。
制限事項
Kafka データソースから Flink CDC データをインジェストするには、Realtime Compute for Apache Flink (VVR) 11.1 以降を使用します。
JSON、Debezium JSON、および Canal JSON のみがサポートされています。
複数のパーティションに分散された単一テーブルからデータを読み取ることは、Realtime Compute for Apache Flink (VVR) 8.0.11 以降でのみサポートされています。
構文
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092パラメーター
一般
パラメーター
説明
必須
タイプ
デフォルト
備考
type
ソースまたはシンクのタイプ。
はい
String
該当なし
値は
kafkaでなければなりません。name
ソースまたはシンクの名前。
いいえ
String
該当なし
該当なし
properties.bootstrap.servers
Kafka ブローカーのアドレス。
はい
String
該当なし
書式は
host:port,host:port,host:portで、カンマ (,) で区切られます。properties.*
Kafka クライアントの構成プロパティ。
いいえ
String
該当なし
プロパティキーは、公式 Apache Kafka ドキュメントのProducer Configs およびConsumer Configs で定義された有効なオプションである必要があります。
Realtime Compute for Apache Flink (VVR) は、
properties.プレフィックスを削除してから、残りのキーと値のペアを基盤となる Kafka クライアントに渡します。たとえば、自動トピック作成を無効にするには、'properties.allow.auto.create.topics' = 'false'を設定します。key.format
Kafka メッセージキーのシリアル化および逆シリアル化のフォーマット。
いいえ
String
該当なし
ソースの場合、
jsonフォーマットのみがサポートされます。シンクの場合、有効な値は以下のとおりです。
csv
json
説明このオプションは、Realtime Compute for Apache Flink (VVR) 11.0.0 以降でのみサポートされます。
value.format
Kafka メッセージ値のシリアル化および逆シリアル化のフォーマット。
いいえ
String
debezium-json
ソースの場合、有効な値は以下のとおりです。
debezium-json
canal-json
json
シンクの場合、有効な値は以下のとおりです。
debezium-json
canal-json
canal-protobuf
説明debezium-jsonおよびcanal-jsonフォーマットには、Realtime Compute for Apache Flink (VVR) バージョン 8.0.10 以降が必要です。jsonフォーマットには、Realtime Compute for Apache Flink (VVR) バージョン 11.0.0 以降が必要です。
ソースパラメーター
パラメーター
説明
必須
タイプ
デフォルト
備考
topic
読み取るトピックまたはトピック一覧。
いいえ
String
該当なし
複数のトピックをサブスクライブするには、トピック名をセミコロン (;) で区切ります。たとえば、
topic-1;topic-2のようにします。説明このパラメーターまたは
topic-patternのいずれか一方のみを指定できます。両方を指定することはできません。topic-pattern
サブスクライブするトピックの名前と一致する正規表現。
いいえ
String
該当なし
説明このパラメーターまたは
topicのいずれか一方のみを指定できます。両方を指定することはできません。properties.group.id
コンシューマーグループの ID。
いいえ
String
該当なし
新しいコンシューマーグループ ID を指定する場合、初期スタートオフセットを定義するために、properties.auto.offset.reset パラメーターを
earliestまたはlatestに設定する必要があります。scan.startup.mode
Kafka コンシューマーのスタートオフセット。
いいえ
String
group-offsets
有効な値:
earliest-offset: 最も古い利用可能なオフセットから読み取りを開始します。
latest-offset: 最新のオフセットから読み取りを開始します。
group-offsets (デフォルト値): 指定された properties.group.id のコミット済みオフセットから読み取りを開始します。
timestamp: scan.startup.timestamp-millis で指定されたタイムスタンプから読み取りを開始します。
specific-offsets: scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。
説明このパラメーターは、ジョブがステートレスなスタートアップで開始される場合にのみ適用されます。ステートフルジョブが開始される場合、常にその状態に保存されたオフセットから消費されます。
scan.startup.specific-offsets
scan.startup.modeがspecific-offsetsに設定されている場合の、各パーティションのスタートオフセット。いいえ
String
該当なし
たとえば、
partition:0,offset:42;partition:1,offset:300のようにします。scan.startup.timestamp-millis
scan.startup.modeがtimestampに設定されている場合の、スタートタイムスタンプ(ミリ秒単位)。いいえ
Long
該当なし
単位はミリ秒です。
scan.topic-partition-discovery.interval
トピック内の新しいパーティションを動的に検出する間隔。
いいえ
Duration
5 分
コネクタは定期的に新しいパーティションを検出し、そこから読み取りを行います。
topic-patternを使用する場合、コネクタはパターンに一致する新しいトピックも検出します。検出を無効にするには、この値を 0 以下に設定します。scan.check.duplicated.group.id
properties.group.idで指定されたコンシューマーグループが重複しているかどうかを確認します。いいえ
Boolean
false
有効な値:
true: ジョブの開始前に重複するコンシューマーグループを確認します。重複が見つかった場合、ジョブは失敗します。
false: 競合を確認せずにジョブを開始します。
schema.inference.strategy
スキーマ解析戦略。
いいえ
String
continuous
有効な値:
continuous: 各データレコードのスキーマを解析します。スキーマに互換性がない場合、システムはより広いスキーマを推論し、スキーマ変更イベントを生成します。
static: ジョブ開始時にスキーマ解析を一度だけ実行します。その後、この初期スキーマに基づいてデータが解析され、スキーマ変更イベントは生成されません。
説明スキーマ解析の詳細については、「スキーマ解析および進化のポリシー」をご参照ください。
この構成オプションは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。
scan.max.pre.fetch.records
初期スキーマ推論のために、各パーティションから消費および解析するメッセージの最大数。
いいえ
Int
50
データ処理が開始される前に、システムは各パーティションから指定された数の最新メッセージをプリフェッチして消費し、スキーマを初期化します。
key.fields-prefix
名前の競合を避けるために、メッセージキーのフィールド名に追加されるカスタムプレフィックス。
いいえ
String
該当なし
たとえば、このパラメーターが
key_に設定され、メッセージキーにaという名前のフィールドが含まれている場合、解析されたフィールド名はkey_aになります。説明key.fields-prefixの値は、value.fields-prefixの値のプレフィックスであってはなりません。value.fields-prefix
名前の競合を避けるために、メッセージ値のフィールド名に追加されるカスタムプレフィックス。
いいえ
String
該当なし
たとえば、このパラメーターが
value_に設定され、メッセージ値にbという名前のフィールドが含まれている場合、解析されたフィールド名はvalue_bになります。説明value.fields-prefixの値は、key.fields-prefixの値のプレフィックスであってはなりません。metadata.list
下流のシンクに渡すメタデータ列。
いいえ
String
該当なし
利用可能なメタデータ列には、
topic、partition、offset、timestamp、timestamp-type、headers、およびleader-epochが含まれます。列名はカンマで区切ります。scan.value.initial-schemas.ddls
特定のテーブルの初期スキーマを定義する DDL ステートメント。
いいえ
String
該当なし
複数の DDL ステートメントをセミコロン (
;) で区切ります。たとえば、CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);を使用して、テーブル db1.t1 および db1.t2 の初期スキーマをそれぞれ指定します。DDL で定義されたテーブルスキーマは、ターゲットのシンクテーブルと一致し、Flink SQL 構文に準拠している必要があります。
説明この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。
ingestion.ignore-errors
データ解析エラーを無視するかどうかを指定します。
いいえ
Boolean
false
説明この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。
ingestion.error-tolerance.max-count
ingestion.ignore-errorsがtrueの場合、ジョブが失敗するまでに許容する解析エラーの最大数を指定します。いいえ
Integer
-1
このパラメーターは、
ingestion.ignore-errorsがtrueに設定されている場合にのみ適用されます。-1 の値は無制限の許容を意味し、解析例外によってジョブが失敗することはありません。説明この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。
Debezium JSON フォーマットパラメーター
パラメーター
必須
タイプ
デフォルト
説明
debezium-json.distributed-tables
いいえ
Boolean
false
単一の Debezium JSON テーブルのデータが複数のパーティションに分散されている場合に
trueに設定します。説明この構成オプションは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。
重要このパラメーターを変更するには、ステートレスなスタートアップが必要です。
debezium-json.schema-include
いいえ
Boolean
false
Debezium JSON メッセージにスキーマが含まれるかどうかを指定します。これは、Debezium Kafka Connect 構成の
value.converter.schemas.enableプロパティに対応します。有効な値:
true: Debezium JSON メッセージにスキーマが含まれます。
false: Debezium JSON メッセージにスキーマは含まれません。
debezium-json.ignore-parse-errors
いいえ
Boolean
false
有効な値:
true: 解析例外を引き起こす行をスキップします。
false: エラーをスローし、ジョブは失敗します。
debezium-json.infer-schema.primitive-as-string
いいえ
Boolean
false
テーブルスキーマを解析する際に、すべてのプリミティブ型を
String型として解析するかどうかを指定します。有効な値:
true: すべてのプリミティブ型を
Stringとして解析します。false: デフォルトのルールに基づいて型を解析します。
Canal JSON フォーマットパラメーター
パラメーター
必須
タイプ
デフォルト
説明
canal-json.distributed-tables
いいえ
Boolean
false
Canal JSON の単一テーブルのデータが複数のパーティションに分散されている場合、このオプションを有効にする必要があります。
説明この構成オプションは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。
重要このパラメーターを変更するには、ステートレスなスタートアップが必要です。
canal-json.database.include
いいえ
String
該当なし
Canal レコードの
databaseメタデータフィールドで変更ログをフィルタリングするためのオプションの正規表現。一致するデータベースからのレコードのみが処理されます。正規表現は Java の Pattern クラスと互換性があります。canal-json.table.include
いいえ
String
該当なし
Canal レコードの
tableメタデータフィールドで変更ログをフィルタリングするためのオプションの正規表現。一致するテーブルからのレコードのみが処理されます。正規表現は Java の Pattern クラスと互換性があります。canal-json.ignore-parse-errors
いいえ
Boolean
false
有効な値:
true: 解析例外が発生した場合、現在の行をスキップします。
false: エラーをスローし、ジョブの開始に失敗します。
canal-json.infer-schema.primitive-as-string
いいえ
Boolean
false
テーブルスキーマを解析する際に、すべてのプリミティブ型を
String型として解析するかどうかを指定します。有効な値:
true: すべてのプリミティブ型を
Stringとして解析します。false: デフォルトのルールに基づいて型を解析します。
canal-json.infer-schema.strategy
いいえ
String
AUTO
テーブルスキーマを解析する戦略を指定します。
有効な値:
AUTO: JSON データからスキーマを自動的に解析します。データに
sqlTypeフィールドが含まれていない場合、解析失敗を防ぐために推奨されます。SQL_TYPE: Canal JSON データ内の
sqlType配列からスキーマを解析します。データにsqlTypeフィールドが含まれている場合、より正確な型を取得するためにこれを SQL_TYPE に設定することを推奨します。MYSQL_TYPE: Canal JSON データ内の
mysqlType配列からスキーマを解析します。
sqlType型マッピングルールの詳細については、「Canal JSON のスキーマ解析」をご参照ください。説明この構成は、Ververica Runtime (VVR) 11.1 以降でのみサポートされます。
MYSQL_TYPEの値は、Ververica Runtime (VVR) 11.3 以降でサポートされます。
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
いいえ
Boolean
true
MySQL の
TIMESTAMP型を CDC のTIMESTAMP型にマッピングするかどうかを指定します。true: MySQL の
TIMESTAMP型は CDC のTIMESTAMP型にマッピングされます。false: MySQL の
TIMESTAMP型は CDC のTIMESTAMP_LTZ型にマッピングされます。
canal-json.mysql.treat-tinyint1-as-boolean.enabled
いいえ
Boolean
true
MYSQL_TYPE解析戦略を使用する場合、MySQL のTINYINT(1)型を CDC のBOOLEAN型にマッピングするかどうかを指定します。true: MySQL の
TINYINT(1)型は CDC のBOOLEAN型にマッピングされます。false: MySQL の
TINYINT(1)型は CDC のTINYINT(1)型にマッピングされます。
このオプションは、
canal-json.infer-schema.strategyがMYSQL_TYPEに設定されている場合にのみ適用されます。JSON フォーマットパラメーター
パラメーター
必須
タイプ
デフォルト
説明
json.timestamp-format.standard
いいえ
String
SQL
入力および出力データのタイムスタンプフォーマット。
SQL:
yyyy-MM-dd HH:mm:ss.s{precision}形式の入力タイムスタンプを解析します(例:2020-12-30 12:13:14.123)。ISO-8601:
yyyy-MM-ddTHH:mm:ss.s{precision}形式の入力タイムスタンプを解析します(例:2020-12-30T12:13:14.123)。
json.ignore-parse-errors
いいえ
Boolean
false
有効な値:
true: 解析例外が発生した場合、現在の行をスキップします。
false: エラーをスローし、ジョブの開始に失敗します。
json.infer-schema.primitive-as-string
いいえ
Boolean
false
テーブルスキーマを解析する際に、すべてのプリミティブ型を
String型として解析するかどうかを指定します。有効な値:
true: すべてのプリミティブ型を
Stringとして解析します。false: デフォルトのルールに基づいて型を解析します。
json.infer-schema.flatten-nested-columns.enable
いいえ
Boolean
false
JSON データ内の入れ子になった列を再帰的に展開するかどうかを指定します。有効な値:
true: 入れ子になった列を再帰的に展開します。
false: 入れ子になった列を
Stringとして扱います。
json.decode.parser-table-id.fields
いいえ
String
該当なし
JSON フォーマットでデータを解析する際に、一部の JSON フィールドの値を使用して tableId を生成するかどうかを指定します。複数のフィールドの値は、英語のカンマ
,で連結されます。たとえば、JSON データが{"col0":"a", "col1","b", "col2","c"}の場合、生成される結果は以下のとおりです。構成
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
json.infer-schema.fixed-types
いいえ
String
該当なし
JSON データを解析する際、特定のフィールドのデータ型を指定できます。複数のフィールドはカンマ
,で区切ります。たとえば、id BIGINT, name VARCHAR(10)は、idフィールドが BIGINT 型で、nameフィールドが VARCHAR(10) 型であることを指定します。説明この構成オプションは、Ververica Runtime (VVR) 11.5 以降でのみサポートされます。
Ververica Runtime (VVR) バージョン 11.5 でこの構成を使用する場合、
scan.max.pre.fetch.records: 0の構成も追加する必要があります。
シンクテーブルパラメーター
パラメーター
説明
必須
タイプ
デフォルト
備考
type
シンクタイプ。
はい
String
なし
値は
kafkaでなければなりません。name
シンク名。
いいえ
String
なし
該当なし
topic
Kafka トピック名。
いいえ
String
なし
このパラメーターが指定されている場合、すべてのデータはこのトピックに書き込まれます。
説明このパラメーターが指定されていない場合、各レコードは TableID にちなんで名付けられたトピックに書き込まれます。TableID は、データベース名とテーブル名をピリオド (
.) で結合して構築されます(例:databaseName.tableName)。partition.strategy
Kafka パーティションにデータを書き込む戦略。
いいえ
String
all-to-zero
有効な値:
all-to-zero(デフォルト): すべてのデータをパーティション 0 に書き込みます。hash-by-key: プライマリキーのハッシュ値に基づいてデータをパーティションに書き込みます。これにより、同じプライマリキーを持つレコードが同じパーティションに書き込まれ、順序が保持されます。
sink.tableId-to-topic.mapping
上流のテーブル名から下流の Kafka トピック名へのマッピング。
いいえ
String
なし
マッピングはセミコロン (
;) で区切ります。各マッピング内で、上流のテーブル名と下流の Kafka トピック名はコロン (:) で区切ります。テーブル名には正規表現を使用できます。複数のテーブルを同じトピックにマッピングするには、テーブル名をカンマ (,) で区切ります。例:mydb.mytable1:topic1;mydb.mytable2:topic2。説明このパラメーターを使用すると、元のテーブル名情報を保持したまま、マッピングされたトピックを変更できます。
Debezium JSON フォーマットパラメーター
パラメーター
必須
タイプ
デフォルト
説明
debezium-json.include-schema.enabled
いいえ
Boolean
false
Debezium JSON データにスキーマ情報を含めるかどうかを指定します。
debezium-json.emit.full-table-id.enabled
いいえ
Boolean
false
完全な 3 部構成のテーブル ID を Debezium JSON メタデータフィールドに書き込むかどうかを指定します。
このパラメーターが有効な場合、マッピングは以下のとおりです。
CDC テーブル ID 部
Debezium JSON キー
Namespace
dbSchema
schemaTable
tableこのパラメーターが無効な場合、マッピングは以下のとおりです。
CDC テーブル ID 部
Debezium JSON キー
Namespace
マッピングなし
Schema
dbTable
table説明このパラメーターは、Ververica Runtime (VVR) 11.6 以降でのみサポートされます。
例
Kafka をデータインジェストソースとして使用する場合:
source: type: kafka name: Kafka source properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADENKafka をデータインジェストシンクとして使用する場合:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}routeモジュールは、ソーステーブルの送信先 Kafka トピックを指定します。
デフォルトでは、ApsaraMQ for Kafka の自動トピック作成機能は無効になっています。詳細については、「自動トピック作成に関する FAQ」をご参照ください。ApsaraMQ for Kafka にデータを書き込む前に、トピックを作成する必要があります。詳細については、「手順 3:リソースの作成」をご参照ください。
スキーマ解析およびスキーマ進化に関するポリシー
Kafka コネクタは、現在認識されているすべてのテーブルのスキーマを維持します。
テーブルスキーマの初期化
テーブルスキーマ情報には、カラムおよびデータの型に関する情報、データベースおよびテーブルに関する情報、およびプライマリキーに関する情報が含まれます。以下のセクションでは、これらの 3 種類の情報をそれぞれ初期化する方法について説明します。
カラムおよびデータの型に関する情報
データ取り込みジョブは、データからテーブルのカラムおよびデータの型を自動的に推論できます。ただし、特定のシナリオでは、一部のテーブルに対してカラムおよびデータの型を明示的に指定したい場合があります。ユーザーが指定する型の粒度に応じて、テーブルスキーマの初期化には以下の 3 つの戦略があります。
完全自動スキーマ推論
Kafka からデータを読み取る前に、Kafka コネクタは各パーティションから最大 scan.max.pre.fetch.records 件のメッセージをフェッチし、各メッセージのスキーマを解析してマージすることでテーブルスキーマを初期化します。その後、実際のデータ消費の前に、この初期化されたスキーマに基づいてテーブル作成イベントが生成されます。
Debezium JSON 形式および Canal JSON 形式では、テーブル情報が各メッセージ内に含まれています。scan.max.pre.fetch.records パラメーターに基づいて事前にフェッチされたメッセージには、複数のテーブルからのデータが含まれる可能性があります。したがって、単一のテーブルに対する事前フェッチ件数は確定できません。事前フェッチおよびスキーマ初期化は、各パーティションでメッセージの消費および処理を開始する前に 1 回のみ実行されます。後から新しいテーブルのデータが出現した場合、そのテーブルの最初のレコードから解析されたスキーマが初期スキーマとして使用され、再度の事前フェッチまたは初期化は行われません。
単一テーブルのデータが複数のパーティションに分散される機能は、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされており、debezium-json.distributed-tables または canal-json.distributed-tables 構成オプションを true に設定する必要があります。
初期テーブルスキーマの指定
一部のシナリオでは、たとえば Kafka から事前に作成済みのダウンストリームテーブルへデータを書き込むために、初期テーブルスキーマを手動で指定したい場合があります。このような場合は、scan.value.initial-schemas.ddls パラメーターを追加することで指定できます。以下に構成例を示します。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 初期テーブルスキーマを設定
scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);DDL 文はターゲットテーブルのスキーマと一致している必要があります。この構成では、db1.t1 テーブルの id カラムの初期型を BIGINT、name カラムの初期型を VARCHAR(10) と指定し、db1.t2 テーブルの id カラムの初期型を BIGINT と指定しています。
DDL 文には Flink SQL の構文を使用します。
特定フィールドに対する固定型の設定
一部のシナリオでは、特定のフィールドに対して固定のデータの型を設定したい場合があります。たとえば、TIMESTAMP 型として推論される可能性のあるフィールドを文字列として出力したい場合などです。このような場合は、json.infer-schema.fixed-types パラメーターを追加して初期テーブルスキーマを指定できます。このパラメーターは、メッセージフォーマットが JSON の場合にのみ有効です。以下に構成例を示します。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 特定フィールドを固定型に設定
json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
scan.max.pre.fetch.records: 0この構成では、すべての id フィールドを BIGINT 型、すべての name フィールドを VARCHAR(10) 型として指定しています。
データの型は Flink SQL の型と一致します。
データベースおよびテーブルに関する情報
Canal JSON 形式および Debezium JSON 形式では、コネクタは各メッセージからデータベース名およびテーブル名を含むテーブル情報を解析します。
JSON 形式の場合、デフォルトではテーブル情報にはテーブル名のみが含まれます。これは、データを含むトピックの名前です。データにデータベースおよびテーブル情報が含まれる場合、json.infer-schema.fixed-types パラメーターを使用して、この情報を含むフィールドを指定できます。これらのフィールドは、データベース名およびテーブル名にマップされます。以下に設定例を示します:
source: type: kafka name: Kafka Source properties.bootstrap.servers: host:9092 topic: test-topic value.format: json scan.startup.mode: earliest-offset # col1 フィールドの値をデータベース名、col2 フィールドの値をテーブル名として使用 json.decode.parser-table-id.fields: col1,col2この構成により、コネクタは各レコードを、データベース名が
col1フィールドの値、テーブル名がcol2フィールドの値であるテーブルに送信します。
プライマリキーに関する情報
Canal JSON 形式では、JSON データ内の
pkNamesフィールドがテーブルのプライマリキーを定義します。Debezium JSON 形式および JSON 形式では、データにプライマリキー情報は含まれません。この場合、
transformルールを用いてテーブルにプライマリキーを手動で追加できます。transform: - source-table: \.*.\.* projection: \* primary-keys: key1, key2
スキーマ解析およびスキーマ進化
テーブルスキーマが初期化された後、schema.inference.strategy が static に設定されている場合、Kafka コネクタは初期テーブルスキーマに基づいて各メッセージの値を解析し、スキーマ変更イベントを生成しません。一方、schema.inference.strategy が continuous に設定されている場合、Kafka コネクタは各 Kafka メッセージの値を解析し、物理カラムを特定したうえで、得られたスキーマを現在維持中のスキーマと比較します。スキーマが不一致の場合、コネクタはそれらをマージしようと試み、対応するテーブルスキーマ変更イベントを生成します。マージ規則は以下のとおりです。
解析された物理カラムに、現在のスキーマに存在しないフィールドが含まれている場合、これらのフィールドはスキーマに追加され、nullable カラムとして追加するイベントが生成されます。
解析された物理カラムに、現在のスキーマに存在するフィールドが含まれていない場合、当該フィールドはスキーマに保持され、その値は
NULLとして設定されます。カラム削除イベントは生成されません。同名のカラムについては、以下のとおり処理されます。
カラムのデータの型が同一で精度が異なる場合、より大きな精度を持つ型が採用され、カラム型変更イベントが生成されます。
カラムのデータの型が異なる場合、システムは型階層ツリー内で最も小さい共通親型を検索し、その共通親型をカラムの型として採用し、カラム型変更イベントを生成します。

サポートされるスキーマ進化ポリシー:
カラムの追加:コネクタは新規カラムをスキーマの末尾に追加し、そのデータを同期します。新規カラムは nullable として設定されます。
カラムの削除:カラム削除イベントは生成されません。代わりに、当該カラムの今後のデータはすべて
NULLとして設定されます。カラムの名前変更:コネクタはこれを旧カラムの削除および新カラムの追加として扱います。新カラムはスキーマの末尾に追加され、元のカラムの値は
NULLとして設定されます。カラム型の変更:
カラム型変更をサポートするダウンストリーム sink を使用する場合、データ取り込みジョブは、sink の設定に応じて型変更(たとえば
INTからBIGINTへの変更)を処理できます。この機能は、個別の sink がサポートするカラム型変更規則に依存します。サポートされる規則については、各 sink のドキュメントをご参照ください。Hologres のようなカラム型変更をサポートしないダウンストリーム sink を使用する場合、ワイド型マッピング を利用できます。この機能では、ジョブ起動時にダウンストリーム sink に幅広いデータの型を持つテーブルが作成されます。カラム型が変更された場合、新規型がダウンストリーム sink で定義されたワイド型の範囲内に収まれば、システムはその変更を許容します。
サポートされないスキーマ変更:
プライマリキーまたはインデックスなどの制約の変更。
NOT NULLからNULLABLEへのカラム変更。
Canal JSON スキーマ解析
Canal JSON データには、任意の
sqlTypeフィールドが含まれており、これによりデータカラムの正確な型情報が記録されます。より正確なスキーマを取得するため、canal-json.infer-schema.strategy をSQL_TYPEに設定して、sqlTypeフィールドの型を利用できます。型の対応関係は以下のとおりです。JDBC 型
型コード
CDC 型
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
NUMERIC
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
BINARY
-2
BYTES
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
DATE
91
DATE
TIME
92
TIME
TIMESTAMP
93
TIMESTAMP
CHAR
1
STRING
VARCHAR
12
LONGVARCHAR
-1
その他のデータ型
ダーティデータの許容とコレクション
ご利用の Kafka データソースには、不正な形式のレコード(いわゆる「ダーティデータ」)が含まれている可能性があります。ジョブが頻繁に失敗・再起動するのを防ぐため、無効なレコードを無視するよう構成できます。たとえば、以下のとおりです。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# ダーティデータの許容を有効化
ingestion.ignore-errors: true
# 最大 1,000 件のダーティデータレコードを許容
ingestion.error-tolerance.max-count: 1000この構成では、最大 1,000 件のダーティレコードを許容し、ジョブを正常に実行可能にします。ただし、ダーティレコード数がこのしきい値を超えるとジョブは失敗し、データの検証が必要になります。
ダーティデータによるジョブの失敗を一切回避するには、以下の構成をご使用ください。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# ダーティデータの許容を有効化
ingestion.ignore-errors: true
# 全てのダーティデータレコードを許容
ingestion.error-tolerance.max-count: -1ダーティデータ許容ポリシーにより、無効なレコードによってジョブが失敗することを防止できます。また、Kafka データプロデューサーの動作を調整するために、このダーティデータを分析することも推奨されます。「ダーティデータのコレクション」で説明されているとおり、TaskManager のログからジョブのダーティデータを確認できます。たとえば、以下のとおりです。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# ダーティデータの許容を有効化
ingestion.ignore-errors: true
# 全てのダーティデータレコードを許容
ingestion.error-tolerance.max-count: -1
pipeline:
dirty-data.collector:
# ダーティデータを TaskManager のログファイルに書き込む
type: loggerテーブル名とトピックのマッピング
データ取り込みジョブの Sink として Kafka を使用する場合、Debezium JSON や Canal JSON などのメッセージフォーマットには、元のテーブル名情報が含まれます。これらのメッセージを利用するダウンストリームシステムは、多くの場合、トピック名ではなく、埋め込まれたテーブル名を実際のテーブル識別子として使用します。したがって、テーブル名とトピック間のマッピング戦略を慎重に構成する必要があります。
MySQL データベースから 2 つのテーブル (mydb.mytable1 と mydb.mytable2) を同期する必要があるとします。以下のマッピング戦略が利用可能です:
1. マッピング戦略なし
マッピング戦略がない場合、各テーブルのデータは <データベース名>.<テーブル名> というフォーマットの名前を持つトピックに書き込まれます。したがって、mydb.mytable1 のデータは mydb.mytable1 という名前のトピックに、mydb.mytable2 のデータは mydb.mytable2 という名前のトピックに書き込まれます。以下に構成例を示します:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. ルート ルールによるマッピング (非推奨)
デフォルトの <データベース名>.<テーブル名> フォーマットを使用する代わりに、特定のトピックにデータを書き込みたい場合があります。これを行うには、ルート ルールを構成します。以下に構成例を示します:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytableこの場合、mydb.mytable1 と mydb.mytable2 のすべてのデータは、mytable という名前の単一のトピックに書き込まれます。
ただし、ルート ルールを使用して送信先トピックを変更すると、Kafka メッセージ (Debezium JSON または Canal JSON フォーマット) 内のテーブル名も変更されます。この場合、すべての Kafka メッセージ内のテーブル名は mytable になります。これにより、このトピックからメッセージを利用するシステムで予期しない動作が発生する可能性があります。
3. sink.tableId-to-topic.mapping を使用したマッピング (推奨)
元のソーステーブル名を保持したままテーブル名をトピックにマッピングするには、`sink.tableId-to-topic.mapping` パラメーターを使用します。以下に構成例を示します:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}または、以下の構成を使用することもできます:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}この場合、mydb.mytable1 と mydb.mytable2 のすべてのデータは mytable トピックに書き込まれ、Kafka メッセージ (Debezium JSON または Canal JSON フォーマット) 内のテーブル名は mydb.mytable1 または mydb.mytable2 として保持されます。これにより、ダウンストリームシステムは元のソーステーブル名を正しく取得できます。
EXACTLY_ONCE セマンティクス
コンシューマーの隔離レベルの設定
Kafka データを消費するすべてのアプリケーションは、
isolation.levelプロパティを設定する必要があります。read_committed:コミット済みのデータのみを読み取ります。read_uncommitted(デフォルト):未コミットのデータも読み取ることができます。
EXACTLY_ONCE は
read_committedに依存します。それ以外の場合、コンシューマーが未コミットのデータを参照し、一貫性が損なわれる可能性があります。トランザクションのタイムアウトとデータ喪失
チェックポイントからの回復時、Realtime Compute for Apache Flink は、そのチェックポイントの開始前にコミットされたトランザクションのみを考慮します。ジョブの障害発生から再起動までの持続時間が Kafka のトランザクションタイムアウトを超える場合、Kafka は自動的にオープン中のトランザクションを中止し、データ喪失を引き起こす可能性があります。
Kafka ブローカーにおける
transaction.max.timeout.msのデフォルト値は 15 分です。Flink Kafka Sink では、デフォルトで
transaction.timeout.msパラメーターが 1 時間に設定されます。ブローカー側の
transaction.max.timeout.msを、Flink 側の設定値以上(等しいか大きい値)に増加させる必要があります。
プロデューサープールと同時チェックポイント
EXACTLY_ONCEモードでは、固定サイズの Kafka プロデューサープールが使用されます。各チェックポイントは、このプールから 1 つのプロデューサーを使用します。同時実行中のチェックポイント数がプールサイズを超えると、ジョブは失敗します。プロデューサープールのサイズは、同時実行可能なチェックポイントの最大数に基づいて設定してください。
並列処理数のスケールダウン制約
最初のチェックポイントが完了する前にジョブが失敗した場合、再起動時に元のプロデューサープール情報が失われます。したがって、最初のチェックポイント完了前に、ジョブの並列処理数をスケールダウンしないでください。スケールダウンがやむを得ず必要となる場合は、新しい並列処理数が
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTORを下回らないようにしてください。トランザクションによる読み取りのブロック
read_committedモードでは、コミットまたは中止されていないトランザクションが存在すると、トピック全体の読み取り操作がブロックされます。たとえば:
トランザクション 1 がデータを書き込みます。
トランザクション 2 がさらにデータを書き込み、コミットされます。
トランザクション 1 がオープンのままの場合、コミット済みのトランザクション 2 のデータはコンシューマーに対して非表示のままとなります。
これは以下の影響を及ぼします:
通常運用時において、データの可視性遅延はおおよそチェックポイント間隔と同等になります。
ジョブが失敗した場合、当該ジョブが書き込んでいたトピックは、ジョブの再起動またはトランザクションのタイムアウトが発生するまで、コンシューマーによる読み取りがブロックされます。極端なケースでは、トランザクションタイムアウト処理自体が読み取り操作に影響を与える可能性もあります。