このトピックでは、ApsaraMQ for Kafka コネクタの使用方法について説明します。
背景情報
Apache Kafka は、オープンソースの分散メッセージキューシステムです。パフォーマンス専有型のデータ処理、ストリーム分析、データ統合などのビッグデータ分野で広く使用されています。Kafka コネクタは、オープンソースの Apache Kafka クライアントに基づいています。高いデータスループットを提供し、複数のデータ形式の読み書きをサポートし、Realtime Compute for Apache Flink に 1 回限りのセマンティクスを提供します。
カテゴリ | 詳細 |
サポートタイプ | ソーステーブル、結果テーブル、データ統合ターゲット |
ランタイムモード | ストリーミングモード |
データ形式 | |
特定の監視メトリック | |
API タイプ | SQL、DataStream、データ統合 YAML |
結果テーブルのデータの更新または削除 | コネクタは結果テーブルのデータの更新または削除をサポートしていません。データの挿入のみをサポートしています。 説明 データの更新と削除に関連する機能については、「Upsert Kafka」をご参照ください。 |
前提条件
クラスターへの接続方法は次のとおりです:
Alibaba Cloud ApsaraMQ for Kafka クラスターへの接続
Kafka クラスターのバージョンが 0.11 以降であること。
ApsaraMQ for Kafka クラスターを作成済みであること。詳細については、「リソースの作成」をご参照ください。
Flink ワークスペースと Kafka クラスターが同じ VPC 内にあり、ApsaraMQ for Kafka クラスターのホワイトリストに Flink が追加されていること。詳細については、「ホワイトリストの設定」をご参照ください。
重要ApsaraMQ for Kafka へのデータ書き込みに関する制限事項:
ApsaraMQ for Kafka は、zstd 圧縮形式でのデータ書き込みをサポートしていません。
ApsaraMQ for Kafka は、べき等な書き込みやトランザクション書き込みをサポートしていません。そのため、Kafka 結果テーブルの 1 回限りのセマンティクス機能は使用できません。Ververica Runtime (VVR) 8.0.0 以降を使用する場合は、結果テーブルに
properties.enable.idempotence=false設定項目を追加して、べき等な書き込みを無効にする必要があります。ApsaraMQ for Kafka のストレージエンジンと機能制限の比較については、「ストレージエンジンの比較」をご参照ください。
自己管理型 Apache Kafka クラスターへの接続
セルフマネージド Apache Kafka クラスターのバージョンが 0.11 以降であること。
Flink とセルフマネージド Apache Kafka クラスター間のネットワーク接続が確立されていること。パブリックネットワーク経由でセルフマネージドクラスターに接続する方法については、「ネットワーク接続タイプの選択」をご参照ください。
Apache Kafka 2.8 のクライアント設定項目のみがサポートされています。詳細については、Apache Kafka ドキュメントの コンシューマーおよびプロデューサー設定をご参照ください。
注意事項
現在、Flink と Kafka の設計上のバグのため、トランザクション書き込みの使用は推奨されません。sink.delivery-guarantee = exactly-once を設定すると、Kafka コネクタはトランザクション書き込みを有効にしますが、3 つの既知の問題があります:
各チェックポイントはトランザクション ID を生成します。チェックポイントの間隔が短すぎると、生成されるトランザクション ID が多すぎます。Kafka クラスターのコーディネーターがメモリ不足になり、Kafka クラスターの安定性が損なわれる可能性があります。
各トランザクションはプロデューサーインスタンスを作成します。同時にコミットされるトランザクションが多すぎると、TaskManager がメモリ不足になり、Flink ジョブの安定性が損なわれる可能性があります。
複数の Flink ジョブが同じ
sink.transactional-id-prefixを使用すると、生成されるトランザクション ID が競合する可能性があります。1 つのジョブの書き込みに失敗すると、Kafka パーティションの Log Start Offset (LSO) の進行がブロックされます。これにより、そのパーティションからデータを読み取るすべてのコンシューマーに影響が及びます。
1 回限りのセマンティクスが必要な場合は、Upsert Kafka を使用してプライマリキーテーブルに書き込み、プライマリキーでべき等性を確保してください。トランザクション書き込みを使用する必要がある場合は、「EXACTLY_ONCE セマンティクスに関する考慮事項」をご参照ください。
ネットワーク接続のトラブルシューティング
Flink ジョブの起動中に Timed out waiting for a node assignment というエラーが報告された場合、通常は Flink と Kafka 間のネットワーク接続の問題が原因です。
Kafka クライアントは次のようにサーバーに接続します:
クライアントは
bootstrap.servers内のアドレスを使用して Kafka に接続します。Kafka は、接続アドレスを含む、クラスター内の各ブローカーのメタデータを返します。
クライアントは、返されたこれらのアドレスを使用して各ブローカーに接続し、読み取りおよび書き込み操作を実行します。
bootstrap.servers アドレスにアクセスできても、Kafka が不正なブローカーアドレスを返した場合、クライアントはデータを読み書きできません。この問題は、プロキシ、ポートフォワーディング、または専用線を使用するネットワークアーキテクチャでよく発生します。
トラブルシューティング手順
ApsaraMQ for Kafka
エンドポイントタイプの確認
デフォルトのエンドポイント (内部ネットワーク)
SASL エンドポイント (内部ネットワーク + 認証)
パブリックネットワークエンドポイント (別途リクエストが必要)
Flink 開発コンソールを使用してネットワーク診断を実行し、
bootstrap.serversアドレスとの接続問題を排除できます。セキュリティグループとホワイトリストの確認
Kafka インスタンスは、Flink が存在する VPC の CIDR ブロックをホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」および「ホワイトリストの設定」をご参照ください。
SASL 設定の確認 (有効な場合)
SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗し、タイムアウトとして現れることもあります。詳細については、「セキュリティと認証」をご参照ください。
ECS 上の自己管理型 Kafka
Flink 開発コンソールを使用してネットワーク診断を実行します。
bootstrap.serversアドレスとの接続問題を排除し、内部およびパブリックネットワークエンドポイントの正しさを確認します。セキュリティグループとホワイトリストの確認
ECS セキュリティグループは、Kafka エンドポイントポート (通常は 9092 または 9093) でのトラフィックを許可する必要があります。
ECS インスタンスは、Flink が存在する VPC の CIDR ブロックをホワイトリストに追加する必要があります。詳細については、「VPC CIDR ブロックの表示」をご参照ください。
設定の確認
Kafka が使用する ZooKeeper クラスターにログインします。zkCli.sh または zookeeper-shell.sh ツールを使用できます。
コマンドを実行してブローカーのメタデータを取得します。例:
get /brokers/ids/0。返された結果で、endpoints フィールドにある Kafka がクライアントにアドバタイズするアドレスを見つけます。
Flink 開発コンソールを使用してネットワーク診断を実行し、アドレスが到達可能かどうかをテストします。
説明アドレスが到達不可能な場合は、Kafka の運用保守エンジニアに連絡して
listenersおよびadvertised.listenersの設定を確認し、修正してください。返されたアドレスが Flink からアクセス可能であることを確認してください。Kafka クライアントがサーバーに接続する方法の詳細については、「接続のトラブルシューティング」をご参照ください。
SASL 設定の確認 (有効な場合)
SASL_SSL エンドポイントを使用する場合、Flink ジョブで JAAS、SSL、および SASL メカニズムを正しく設定する必要があります。認証が欠落していると、ハンドシェイクフェーズで接続が失敗し、タイムアウトとして現れることもあります。詳細については、「セキュリティと認証」をご参照ください。
SQL
Kafka コネクタは、SQL タスクでソーステーブルまたは結果テーブルとして使用できます。
構文
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)メタデータ列
ソーステーブルと結果テーブルにメタデータ列を定義して、Kafka メッセージのメタデータにアクセスしたり書き込んだりできます。たとえば、WITH パラメーターで複数のトピックを定義し、Kafka ソーステーブルでメタデータ列を定義すると、Flink が読み取るデータにはそのソース Topic がマークされます。次の例は、メタデータ列の使用方法を示しています。
CREATE TABLE kafka_source (
-- メッセージのトピックを `record_topic` フィールドとして読み取ります。
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- ConsumerRecord からタイムスタンプを `ts` フィールドとして読み取ります。
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- メッセージのオフセットを `record_offset` フィールドとして読み取ります。
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- `ts` フィールドのタイムスタンプを ProducerRecord のタイムスタンプとして Kafka に書き込みます。
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);次の表に、Kafka ソーステーブルと結果テーブルでサポートされているメタデータ列を示します。
キー | データ型 | 説明 | ソーステーブルまたは結果テーブル |
topic | STRING NOT NULL METADATA VIRTUAL | Kafka メッセージを含むトピックの名前。 | ソーステーブル |
partition | INT NOT NULL METADATA VIRTUAL | Kafka メッセージを含むパーティションの ID。 | ソーステーブル |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka メッセージのヘッダー。 | ソーステーブルと結果テーブル |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka メッセージのリーダーエポック。 | ソーステーブル |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka メッセージのオフセット。 | ソーステーブル |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka メッセージのタイムスタンプ。 | ソーステーブルと結果テーブル |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka メッセージのタイムスタンプタイプ:
| ソーステーブル |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | 生の Kafka メッセージのキーフィールド。 | ソーステーブルと結果テーブル |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | 生の Kafka メッセージの値フィールド。 | ソーステーブルと結果テーブル |
WITH パラメーター
全般
パラメーター
説明
データ型
必須
デフォルト値
備考
connector
テーブルタイプ。
String
はい
なし
値は Kafka に固定されます。
properties.bootstrap.servers
Kafka ブローカーのアドレス。
String
はい
なし
形式は host:port,host:port,host:port です。アドレスはコンマ (,) で区切ります。
properties.*
Kafka クライアントの直接設定。
String
いいえ
なし
サフィックスは、Kafka の公式ドキュメントで定義されているプロデューサーおよびコンシューマーの設定である必要があります。
Flink は properties. プレフィックスを削除し、残りの設定を Kafka クライアントに渡します。たとえば、
'properties.allow.auto.create.topics'='false'を使用して、トピックの自動作成を無効にできます。Kafka コネクタによって上書きされるため、この方法で次の設定を変更しないでください:
key.deserializer
value.deserializer
format
Kafka メッセージの値部分を読み書きするために使用されるフォーマット。
String
いいえ
なし
サポートされているフォーマット:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
説明形式パラメーター設定の詳細については、「形式パラメーター」をご参照ください。
key.format
Kafka メッセージのキー部分を読み書きするために使用されるフォーマット。
String
いいえ
なし
サポートされているフォーマット:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
説明この設定を使用する場合、key.options 設定が必要です。
key.fields
ソーステーブルまたは結果テーブルのフィールドで、Kafka メッセージのキー部分に対応するもの。
String
いいえ
なし
複数のフィールド名はセミコロン (;) で区切ります。例:
field1;field2key.fields-prefix
Kafka メッセージのすべてのキーフィールドにカスタムプレフィックスを指定して、メッセージの値部分のフィールドとの名前の競合を回避します。
String
いいえ
なし
この設定項目は、ソーステーブルと結果テーブルの列名を区別するためにのみ使用されます。Kafka メッセージのキー部分を解析および生成する際にプレフィックスは削除されます。
説明この設定を使用する場合、
value.fields-includeを EXCEPT_KEY に設定する必要があります。value.format
Kafka メッセージの値部分を読み書きするために使用されるフォーマット。
String
いいえ
なし
この設定は
formatと同等です。formatまたはvalue.formatのいずれか一方のみを設定できます。両方が設定されている場合、value.formatがformatを上書きします。value.fields-include
Kafka メッセージの値部分を解析または生成する際に、メッセージのキー部分に対応するフィールドを含めるかどうかを指定します。
String
いいえ
ALL
有効な値:
ALL(デフォルト): すべての列が Kafka メッセージの値部分として処理されます。EXCEPT_KEY: key.fields で定義されたものを除く残りのフィールドが、Kafka メッセージの値部分として処理されます。
ソーステーブル
パラメーター
説明
データ型
必須
デフォルト値
備考
topic
読み取るトピックの名前。
String
いいえ
なし
複数のトピック名はセミコロン (;) で区切ります。例: topic-1;topic-2。
説明topic と topic-pattern オプションのいずれか一方のみを指定できます。
topic-pattern
読み取り元のトピック名に一致する正規表現。ジョブの実行中に、この正規表現に一致するすべてのトピックが読み取られます。
String
いいえ
なし
説明topic と topic-pattern オプションのいずれか一方のみを指定できます。
properties.group.id
コンシューマーグループ ID。
String
いいえ
KafkaSource-{source_table_name}
指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。
scan.startup.mode
Kafka からデータを読み取るための開始オフセット。
String
いいえ
group-offsets
有効な値:
earliest-offset:Kafka の最も早いパーティションから読み取りを開始します。latest-offset:Kafka の最新のオフセットから読み取りを開始します。group-offsets(デフォルト):指定された properties.group.id のコミット済みオフセットから読み取りを開始します。timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプから読み取りを開始します。specific-offsets:scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。
説明このパラメーターは、状態なしでタスクが開始された場合に有効になります。タスクがチェックポイントから再開するか、状態から回復すると、状態に保存された進行状況を優先的に使用して読み取りを再開します。
scan.startup.specific-offsets
specific-offsets スタートアップモードで、各パーティションの開始オフセットを指定します。
String
いいえ
なし
例:
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
timestamp スタートアップモードで、開始オフセットのタイムスタンプを指定します。
Long
いいえ
なし
単位はミリ秒です。
scan.topic-partition-discovery.interval
Kafka のトピックとパーティションを動的に検出する間隔。
Duration
いいえ
5 分
デフォルトのパーティションチェック間隔は 5 分です。この機能を無効にするには、パーティションチェック間隔を明示的に 0 以下の値に設定する必要があります。動的パーティション検出が有効な場合、Kafka ソースは新しいパーティションを自動的に検出し、そこからデータを読み取ることができます。topic-pattern モードでは、既存のトピックの新しいパーティションからデータを読み取るだけでなく、正規表現に一致する新しいトピックのすべてのパーティションからもデータを読み取ります。
説明Ververica Runtime (VVR) 6.0.x では、動的パーティション検出はデフォルトで無効になっています。VVR 8.0 以降、この機能はデフォルトで有効になり、検出間隔は 5 分に設定されています。
scan.header-filter
Kafka データに指定されたヘッダーが含まれているかどうかに基づいてデータをフィルターします。
String
いいえ
なし
ヘッダーのキーと値はコロン (:) で区切ります。複数のヘッダー条件は論理演算子 (&, |) で接続します。NOT 論理演算子 (!) もサポートされています。たとえば、
depart:toy|depart:book&!env:testは、ヘッダーに depart=toy または depart=book を含み、env=test を含まない Kafka データを保持します。説明このパラメーターは Ververica Runtime (VVR) 8.0.6 以降でのみサポートされます。
論理演算では括弧はサポートされていません。
論理演算は左から右に実行されます。
ヘッダー値は、指定されたヘッダー値と比較するために UTF-8 形式の文字列に変換されます。
scan.check.duplicated.group.id
properties.group.idで指定された重複するコンシューマーグループをチェックするかどうかを指定します。Boolean
いいえ
false
有効な値:
true: タスクを開始する前に、システムは重複するコンシューマーグループをチェックします。重複が見つかった場合、タスクはエラーを報告して停止し、既存のコンシューマーグループとの競合を防ぎます。
false: タスクはコンシューマーグループの競合をチェックせずに直接開始します。
説明このパラメーターは、VVR 6.0.4 以降でのみサポートされます。
シンクテーブル
パラメーター
説明
データ型
必須
デフォルト値
備考
topic
書き込むトピックの名前。
String
はい
なし
なし
sink.partitioner
Flink の並列処理から Kafka パーティションへのマッピングモード。
String
いいえ
default
有効な値:
default: デフォルトの Kafka partitioner を使用します。
fixed: 各 Flink の並列処理は、固定の Kafka パーティションに対応します。
round-robin:Flink の並列度からのデータがラウンドロビン方式で Kafka パーティションに割り当てられます。
カスタム partitioner:fixed と round-robin がニーズに合わない場合は、FlinkKafkaPartitioner の子クラスを作成してカスタム partitioner を定義できます。例:org.mycompany.MyPartitioner
sink.delivery-guarantee
Kafka 結果テーブルの配信セマンティクス。
String
いいえ
at-least-once
有効な値:
none: 保証なし。データが失われたり、重複したりする可能性があります。
at-least-once (デフォルト): データの損失がないことを保証しますが、データが重複する可能性があります。
exactly-once: Kafka トランザクションを使用して、データが失われたり重複したりしないことを保証します。
説明exactly-once セマンティクスを使用する場合、sink.transactional-id-prefix パラメーターが必要です。
sink.transactional-id-prefix
exactly-once セマンティクスで使用される Kafka トランザクション ID のプレフィックス。
String
いいえ
なし
この設定は、sink.delivery-guarantee が exactly-once に設定されている場合にのみ有効です。
sink.parallelism
Kafka 結果テーブルオペレーターの並列処理の次数。
Integer
いいえ
なし
アップストリームオペレーターの並列度はフレームワークによって決定されます。
セキュリティと認証
Kafka クラスターが安全な接続または認証を必要とする場合、関連するセキュリティおよび認証設定を properties. プレフィックスを付けて WITH パラメーターに追加できます。次の例は、SASL メカニズムとして PLAIN を使用し、JAAS 設定を提供するように Kafka テーブルを設定する方法を示しています。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)次の例は、セキュリティプロトコルとして SASL_SSL を使用し、SASL メカニズムとして SCRAM-SHA-256 を使用する方法を示しています。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*SSL 設定*/
/*サーバーから提供されたトラストストア (CA 証明書) へのパスを設定します。*/
/*ファイル管理を通じてアップロードされたファイルは /flink/usrlib/ パスに保存されます。*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*クライアント認証が必要な場合は、キーストア (秘密鍵) へのパスを設定します。*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*クライアントがサーバーアドレスを検証するためのアルゴリズム。空の値はサーバーアドレス検証を無効にします。*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*SASL 設定*/
/*SASL メカニズムを SCRAM-SHA-256 に設定します。*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*JAAS を設定します*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)例で言及されている CA 証明書と秘密鍵は、Realtime Compute コンソールのファイル管理機能を使用してプラットフォームにアップロードできます。アップロード後、ファイルは /flink/usrlib ディレクトリに保存されます。使用する CA 証明書ファイルの名前が my-truststore.jks の場合、WITH パラメーターで 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' を指定してこの証明書を使用します。
上記の例は、ほとんどの設定シナリオに適用されます。Kafka コネクタを設定する前に、Kafka サーバーの運用保守エンジニアに連絡して、正しいセキュリティおよび認証設定を取得してください。
オープンソースの Flink とは異なり、Realtime Compute for Apache Flink の SQL エディターは二重引用符 (") を自動的にエスケープします。そのため、
properties.sasl.jaas.configを設定する際に、ユーザー名とパスワードの二重引用符に追加のエスケープ文字 (\) を追加する必要はありません。
ソーステーブルの開始オフセット
スタートアップモード
scan.startup.mode を設定することで、Kafka ソーステーブルの初期読み取りオフセットを指定できます。
earliest-offset: 現在のパーティションの最も古いオフセットから読み取りを開始します。
latest-offset: 現在のパーティションの最新のオフセットから読み取りを開始します。
group-offsets: 指定されたグループ ID のコミット済みオフセットから読み取りを開始します。グループ ID は properties.group.id で指定されます。
timestamp: 指定された時刻以上のタイムスタンプを持つ最初のメッセージから読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis で指定されます。
specific-offsets: 指定されたパーティションオフセットから消費を開始します。オフセットは scan.startup.specific-offsets で指定されます。
開始オフセットを指定しない場合、デフォルトでコミット済みオフセット (group-offsets) から消費が開始されます。
scan.startup.mode パラメーターは、状態なしで開始するジョブに対してのみ有効です。ステートフルなジョブが開始されると、その状態に保存されているオフセットから消費を開始します。
次のコードは例を示しています。
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- 最も早いオフセットから消費を開始します。
'scan.startup.mode' = 'earliest-offset',
-- 最新のオフセットから消費を開始します。
'scan.startup.mode' = 'latest-offset',
-- コンシューマーグループ "my-group" のコミット済みオフセットから消費を開始します。
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- "my-group" を初めて使用する場合、最も早いオフセットから消費を開始します。
'properties.auto.offset.reset' = 'latest', -- "my-group" を初めて使用する場合、最新のオフセットから消費を開始します。
-- 指定されたミリ秒タイムスタンプ 1655395200000 から消費を開始します。
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- 指定されたオフセットから消費を開始します。
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);開始オフセットの優先順位
ソーステーブルの開始オフセットの優先度は次のとおりです:
優先度 (高から低) | チェックポイントまたはセーブポイントに保存されているオフセット |
ジョブ開始時に Realtime Compute コンソールで選択された開始時刻 | |
WITH パラメーターの scan.startup.mode で指定された開始オフセット | |
scan.startup.mode が指定されていない場合、group-offsets が使用され、対応するコンシューマーグループのオフセットが使用されます |
いずれかのステップでオフセットが無効になった場合 (たとえば、有効期限切れや Kafka クラスターの問題)、システムは properties.auto.offset.reset で設定されたポリシーを使用してオフセットをリセットします。このパラメーターが設定されていない場合、例外が発生し、手動での介入が必要になります。
一般的なシナリオは、新しいグループ ID で消費を開始することです。まず、ソーステーブルは Kafka クラスターにそのグループのコミット済みオフセットを問い合わせます。このグループ ID は初めて使用されるため、有効なオフセットは見つかりません。したがって、オフセットは properties.auto.offset.reset パラメーターで設定されたポリシーに従ってリセットされます。新しいグループ ID で消費する場合、オフセットリセットポリシーを指定するために properties.auto.offset.reset を設定する必要があります。
ソーステーブルのオフセットコミット
Kafka ソーステーブルは、チェックポイントが成功した後にのみ、現在のコンシューマーオフセットを Kafka クラスターにコミットします。チェックポイントの間隔が長い場合、Kafka クラスターで観測されるコンシューマーオフセットは遅れます。チェックポイント中、Kafka ソーステーブルは現在の読み取り進捗をその状態に保存し、障害回復のためにクラスターにコミットされたオフセットに依存しません。オフセットのコミットは、Kafka 側での読み取り進捗の監視のためだけです。オフセットのコミットに失敗しても、データの正確性には影響しません。
結果テーブルのカスタム partitioner
組み込みの Kafka プロデューサー partitioner がニーズに合わない場合は、カスタム partitioner を実装して対応するパーティションにデータを書き込むことができます。カスタム partitioner は FlinkKafkaPartitioner を継承する必要があります。開発後、JAR パッケージをコンパイルし、Realtime Compute コンソールのファイル管理機能を使用してアップロードします。ファイルをアップロードして参照した後、WITH 句の sink.partitioner パラメーターを partitioner の完全なクラスパス (例:org.mycompany.MyPartitioner) に設定します。
Kafka、Upsert Kafka、Kafka JSON カタログの選択
Kafka は追記専用のメッセージキューシステムであり、データの更新や削除をサポートしていません。そのため、アップストリームの Change Data Capture (CDC) データや、ストリーミング SQL の集約や結合などのオペレーターからのリトラクションロジックを処理できません。変更やリトラクションを含むデータを Kafka に書き込むには、変更データを処理するために特別に設計されたUpsert Kafka 結果テーブルを使用します。
アップストリームデータベースの 1 つ以上のテーブルから Kafka に変更データをバッチで便利に同期するには、Kafka JSON カタログを使用できます。Kafka に保存されているデータが JSON 形式の場合、Kafka JSON カタログを使用すると、スキーマや WITH パラメーターを定義する必要がなくなります。詳細については、「Kafka JSON カタログの管理」をご参照ください。
例
例 1: Kafka からデータを読み取り、Kafka に書き込む
`source` という名前のトピックから Kafka データを読み取り、`sink` という名前のトピックに書き込みます。データは CSV 形式です。
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>', // Kafka ブローカーのリスト
'properties.group.id' = '<yourKafkaConsumerGroupId>', // Kafka コンシューマーグループ ID
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>', // Kafka ブローカーのリスト
'properties.group.id' = '<yourKafkaConsumerGroupId>', // Kafka コンシューマーグループ ID
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;例 2: テーブルスキーマとデータを同期する
Kafka トピックから Hologres にメッセージをリアルタイムで同期します。この場合、Kafka メッセージのオフセットとパーティション ID をプライマリキーとして使用して、フェールオーバー中に Hologres に重複メッセージがないことを保証できます。
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- オプション。すべてのネストされた列をフラット化します。
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;例 3: テーブルスキーマと Kafka メッセージのキーと値のデータの同期
Kafka メッセージのキー部分にすでに関連情報が格納されている場合、Kafka からキーと値の両方を同期できます。
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>', // Kafka ブローカーのリスト
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Kafka メッセージのキー部分は、テーブルスキーマの変更や型解析に対応していません。手動で宣言する必要があります。
例 4: テーブルスキーマとデータの同期および計算の実行
Kafka データを Hologres に同期する際、軽量な計算が必要になることがよくあります。
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- COALESCE を使用して null 値を処理します。例 5: ネストされた JSON の解析
JSON メッセージの例
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}後でフィールドを解析するために JSON_VALUE(payload, '$.properties.owner') のような関数を使用するのを避けるために、Source DDL で直接構造を定義できます:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);このようにすると、Flink は読み取り段階で JSON を構造化されたフィールドに解析します。後続の SQL クエリは、追加の関数呼び出しなしで直接 properties.owner を使用でき、全体的なパフォーマンスが向上します。
Datastream API
DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの設定方法については、「DataStream コネクタの使用」をご参照ください。
Kafka ソースの構築
Kafka ソースは、KafkaSource インスタンスを作成するためのビルダー クラスを提供します。次のコード例は、`input-topic` の最も早いオフセットからデータを消費する Kafka ソースを構築する方法を示しています。コンシューマーグループの名前は `my-group` で、Kafka メッセージ本文は文字列に逆シリアル化されます。
Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");KafkaSource を構築する際には、次のパラメーターを指定する必要があります。
パラメーター
説明
BootstrapServers
Kafka ブローカーのアドレス。setBootstrapServers(String) メソッドを使用してこれを設定します。
GroupId
コンシューマーグループ ID。setGroupId(String) メソッドを使用してこれを設定します。
トピックまたはパーティション
サブスクライブするトピックまたはパーティションの名前。Kafka ソースは、トピックまたはパーティションをサブスクライブするための次の 3 つの方法を提供します:
トピックリスト: トピックリスト内のすべてのパーティションをサブスクライブします。
KafkaSource.builder().setTopics("topic-a","topic-b")正規表現マッチング: 正規表現に一致するトピックのすべてのパーティションをサブスクライブします。
KafkaSource.builder().setTopicPattern("topic.*")パーティションリスト: 指定されたパーティションをサブスクライブします。
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a" new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b" KafkaSource.builder().setPartitions(partitionSet)
デシリアライザ
Kafka メッセージを解析するための逆シリアライザー。
setDeserializer(KafkaRecordDeserializationSchema) を使用してデシリアライザーを指定します。ここで、KafkaRecordDeserializationSchema は Kafka ConsumerRecord を解析する方法を定義します。Kafka メッセージのメッセージ本文 (値) のデータのみを解析する必要がある場合は、次のいずれかの方法で行うことができます:
Flink が提供する KafkaSource ビルダークラスの setValueOnlyDeserializer(DeserializationSchema) メソッドを使用します。DeserializationSchema は、Kafka メッセージ本文のバイナリデータの解析方法を定義します。
Kafka が提供するパーサーを使用します。これには複数の実装クラスが含まれます。たとえば、StringDeserializer を使用して Kafka メッセージ本文を文字列に解析できます。
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
説明ConsumerRecord を完全に解析するには、KafkaRecordDeserializationSchema インターフェイスを自分で実装する必要があります。
XML
Kafka DataStream コネクタは、Maven Central Repository で入手できます。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>Kafka DataStream コネクタを使用する場合、次の Kafka プロパティを理解する必要があります:
開始コンシューマーオフセット
Kafka ソースは、オフセット初期化子 (OffsetsInitializer) を通じて消費の開始オフセットを指定できます。組み込みのオフセット初期化子には次のものがあります。
オフセットイニシャライザ
コード設定
最も早いオフセットから消費を開始します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())最新のオフセットから消費を開始します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())指定された時刻以上のタイムスタンプを持つデータから消費を開始します (ミリ秒単位)。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))コンシューマーグループによってコミットされたオフセットから消費を開始します。コミットされたオフセットが存在しない場合は、最も早いオフセットを使用します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))オフセットリセット戦略を指定せずに、コンシューマーグループによってコミットされたオフsetから消費を開始します。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())説明組み込みのイニシャライザがニーズを満たさない場合は、カスタムオフセットイニシャライザを実装できます。
オフセットイニシャライザが指定されていない場合、デフォルトで OffsetsInitializer.earliest() (最も古いオフセット) が使用されます。
ストリーミングモードとバッチモード
Kafka ソースは、ストリーミングとバッチの両方のランタイムモードをサポートしています。デフォルトでは、Kafka ソースはストリーミングモードで実行するように設定されているため、Flink ジョブが失敗するかキャンセルされるまでジョブは停止しません。Kafka ソースをバッチモードで実行するように設定するには、setBounded(OffsetsInitializer) を使用して停止オフセットを指定します。すべてのパーティションが停止オフセットに達すると、Kafka ソースは終了します。
説明通常、ストリーミングモードには停止オフセットはありません。コードのデバッグを容易にするために、setUnbounded(OffsetsInitializer) を使用してストリーミングモードで停止オフセットを指定できます。ストリーミングモードとバッチモードで停止オフセットを指定するメソッド名 (setUnbounded と setBounded) は異なることに注意してください。
動的パーティション検出
Flink ジョブを再起動せずにトピックのスケーリングや新しいトピックの作成などのシナリオに対応するために、提供されているトピックまたはパーティションのサブスクリプションモードで動的パーティション検出機能を有効にできます。
説明動的パーティション検出機能はデフォルトで有効になっており、パーティションチェック間隔は 5 分です。この機能を無効にするには、パーティションチェック間隔を明示的に 0 以下の値に設定する必要があります。次のコードは例を示しています。
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // 10 秒ごとに新しいパーティションをチェックします。重要動的パーティション検出機能は、Kafka クラスターのメタデータ更新メカニズムに依存します。Kafka クラスターがパーティション情報を迅速に更新しない場合、新しいパーティションが検出されない可能性があります。Kafka クラスターの partition.discovery.interval.ms 設定が実際の状況と一致していることを確認してください。
イベント時間とウォーターマーク
デフォルトでは、Kafka ソースは Kafka メッセージのタイムスタンプをイベント時間として使用します。カスタムウォーターマーク戦略を定義して、メッセージからイベント時間を抽出し、ウォーターマークをダウンストリームに送信できます。
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")カスタムウォーターマーク戦略の詳細については、「ウォーターマークの生成」をご参照ください。
説明並列ソースの一部のタスクが長時間アイドル状態になると (たとえば、Kafka パーティションに長時間データ入力がない場合や、ソースの並列度が Kafka パーティションの数を超える場合)、ウォーターマーク生成メカニズムが失敗する可能性があります。この場合、システムはウィンドウ計算を正常にトリガーできず、データ処理フローが停止します。
この問題を解決するには、次の調整を行うことができます。
ウォーターマークタイムアウトメカニズムの設定:table.exec.source.idle-timeout パラメーターを有効にして、指定されたタイムアウト期間後にシステムが強制的にウォーターマークを生成するようにします。これにより、ウィンドウ計算サイクルが進行することが保証されます。
データソースの最適化:Kafka パーティションとソースの並列度の比率を適切に保つことを推奨します (推奨:パーティション数 ≥ ソース並列度)。
コンシューマーオフセットのコミット
Kafka ソースは、チェックポイントが完了したときに現在のコンシューマーオフセットをコミットします。これにより、Flink のチェックポイント状態が Kafka ブローカー上のコミット済みオフセットと一致することが保証されます。チェックポイントが有効になっていない場合、Kafka ソースは Kafka コンシューマーの内部自動オフセットコミットロジックに依存します。自動コミット機能は、enable.auto.commit および auto.commit.interval.ms Kafka コンシューマー設定項目によって設定されます。
説明Kafka ソースは、失敗したジョブを回復するためにブローカーにコミットされたオフセットに依存しません。オフセットのコミットは、ブローカー側での Kafka コンシューマーとコンシューマーグループの消費進捗を監視するためだけです。
その他のプロパティ
上記のプロパティに加えて、setProperties(Properties) および setProperty(String, String) を使用して、Kafka ソースおよび Kafka コンシューマーの任意のプロパティを設定できます。KafkaSource には通常、次の設定項目があります。
設定項目
説明
client.id.prefix
Kafka コンシューマーのクライアント ID プレフィックスを指定します。
partition.discovery.interval.ms
Kafka ソースが新しいパーティションをチェックする間隔を定義します。
説明partition.discovery.interval.ms はバッチモードでは -1 に上書きされます。
register.consumer.metrics
Flink に Kafka コンシューマーのメトリックを登録するかどうかを指定します。
その他の Kafka コンシューマー設定
Kafka コンシューマー設定の詳細については、「Apache Kafka」をご参照ください。
重要Kafka コネクタは、手動で設定された一部のパラメーターを次のように強制的に上書きします:
key.deserializer は常に ByteArrayDeserializer に上書きされます。
value.deserializer は常に ByteArrayDeserializer に上書きされます。
auto.offset.reset.strategy は OffsetsInitializer#getAutoOffsetResetStrategy() に上書きされます。
次の例は、SASL メカニズムとして PLAIN を使用し、JAAS 設定を提供するように Kafka コンシューマーを設定する方法を示しています。
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")モニタリング
Kafka ソースは、監視と診断のために Flink にメトリックを登録します。
メトリックスコープ
Kafka ソースリーダーのすべてのメトリックは、オペレーターメトリックグループのサブグループである KafkaSourceReader メトリックグループの下に登録されます。特定のトピックパーティションに関連するメトリックは、KafkaSourceReader.topic.<topic_name>.partition.<partition_id> メトリックグループに登録されます。
たとえば、トピック "my-topic" のパーティション 1 の現在のコンシューマーオフセット (currentOffset) は、<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset の下に登録されます。成功したオフセットコミットの数 (commitsSucceeded) は、<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded の下に登録されます。
メトリックリスト
メトリック名
説明
スコープ
currentOffset
現在のコンシューマーオフセット。
TopicPartition
committedOffset
現在のコミット済みオフセット。
TopicPartition
commitsSucceeded
成功したコミット数。
KafkaSourceReader
commitsFailed
失敗したコミット数。
KafkaSourceReader
Kafka コンシューマーメトリック
Kafka コンシューマーのメトリックは、KafkaSourceReader.KafkaConsumer メトリックグループに登録されます。たとえば、Kafka コンシューマーメトリック records-consumed-total は、<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total の下に登録されます。
register.consumer.metrics 設定項目を使用して、Kafka コンシューマーのメトリックを登録するかどうかを指定できます。デフォルトでは、このオプションは true に設定されています。Kafka コンシューマーメトリックの詳細については、「Apache Kafka」をご参照ください。
Kafka シンクの構築
Flink Kafka シンクは、ストリームデータを 1 つ以上の Kafka トピックに書き込むことができます。
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) // プロデューサー設定 .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") // ターゲットトピック .setKafkaValueSerializer(StringSerializer.class) // シリアル化スキーマ .build()) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // フォールトトレランス .build(); stream.sinkTo(kafkaSink);次のパラメーターを設定する必要があります。
パラメーター
説明
トピック
データが書き込まれるデフォルトのトピック名。
データシリアル化
構築時に、入力データを Kafka
ProducerRecordに変換するためにKafkaRecordSerializationSchemaを提供する必要があります。Flink は、メッセージキー/値のシリアル化、トピック選択、メッセージパーティショニングなどの一般的なコンポーネントを提供するためのスキーマビルダーを提供します。より詳細な制御のために、対応するインターフェイスを実装することもできます。ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) メソッドは、流入する各データレコードに対して呼び出され、Kafka に書き込む ProducerRecord を生成します。各データレコードが Kafka にどのように書き込まれるかを詳細に制御できます。ProducerRecord を使用すると、次の操作を実行できます:
書き込み先のトピックの名前を設定します。
メッセージキーを定義します。
データが書き込まれるパーティションを指定します。
Kafka クライアントプロパティ
bootstrap.servers は必須です。これは、カンマで区切られた Kafka ブローカーのリストです。
フォールトトレランスセマンティクス
Flink のチェックポイントが有効な場合、Flink Kafka シンクは 1 回限りのセマンティクスを保証できます。Flink のチェックポイントを有効にすることに加えて、DeliveryGuarantee パラメーターを通じて異なるフォールトトレランスセマンティクスを指定することもできます。DeliveryGuarantee パラメーターの詳細は次のとおりです:
DeliveryGuarantee.NONE:(デフォルト設定) Flink は保証しません。データが失われたり、重複したりする可能性があります。
DeliveryGuarantee.AT_LEAST_ONCE: データの損失がないことを保証しますが、データが重複する可能性があります。
DeliveryGuarantee.EXACTLY_ONCE: Kafka トランザクションを使用して 1 回限りのセマンティクスを提供します。
説明EXACTLY_ONCE セマンティクスを使用する場合の考慮事項については、「EXACTLY_ONCE セマンティクスに関する考慮事項」をご参照ください。
データ統合
Kafka コネクタは、データ統合 YAML ジョブ開発でソースからの読み取りまたはターゲットへの書き込みに使用できます。
制限事項
Ververica Runtime (VVR) 11.1 以降では、Flink Change Data Capture (CDC) データ統合の同期データソースとして Kafka を使用することを推奨します。
JSON、Debezium JSON、Canal JSON 形式のみがサポートされています。現時点では、他のデータ形式はサポートされていません。
データソースの場合、単一テーブルのデータを複数のパーティションに分散させることは、Ververica Runtime (VVR) 8.0.11 以降でのみサポートされます。
構文
source:
type: kafka
name: Kafka source // Kafka ソース
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092設定項目
全般
パラメーター
説明
必須
データの型
デフォルト値
備考
type
ソースまたはターゲットのタイプ。
はい
文字列
なし
このパラメーターを kafka に設定します。
name
ソースまたはターゲットの名前。
いいえ
文字列
なし
なし
properties.bootstrap.servers
Kafka ブローカーのアドレス。
はい
文字列
なし
形式は
host:port,host:port,host:portです。アドレスはコンマ (,) で区切ります。properties.*
Kafka クライアントの直接設定。
いいえ
文字列
なし
サフィックスは、Kafka の公式ドキュメントで定義されているプロデューサーおよびコンシューマーの設定である必要があります。
Flink は properties. プレフィックスを削除し、残りの設定を Kafka クライアントに渡します。たとえば、
'properties.allow.auto.create.topics' = 'false'を使用して、トピックの自動作成を無効にできます。key.format
Kafka メッセージのキー部分を読み書きするために使用されるフォーマット。
いいえ
文字列
なし
ソースの場合、json のみがサポートされます。
シンクの場合、有効な値は次のとおりです:
csv
json
説明このパラメーターは Ververica Runtime (VVR) 11.0.0 以降でのみサポートされます。
value.format
Kafka メッセージの値部分を読み書きするために使用されるフォーマット。
いいえ
文字列
debezium-json
有効な値:
debezium-json
canal-json
json
説明debezium-json および canal-json 形式は、Ververica Runtime (VVR) 8.0.10 以降でのみサポートされます。
json 形式は、Ververica Runtime (VVR) 11.0.0 以降でのみサポートされます。
ソーステーブル
パラメーター
説明
必須
データ型
デフォルト値
備考
topic
読み取るトピックの名前。
いいえ
文字列
なし
複数のトピック名はセミコロン (;) で区切ります。例: topic-1;topic-2。
説明topic と topic-pattern オプションのいずれか一方のみを指定できます。
topic-pattern
読み取り元のトピック名に一致する正規表現。ジョブの実行中に、この正規表現に一致するすべてのトピックが読み取られます。
いいえ
文字列
なし
説明topic と topic-pattern オプションのいずれか一方のみを指定できます。
properties.group.id
コンシューマーグループ ID。
いいえ
文字列
なし
指定されたグループ ID を初めて使用する場合は、properties.auto.offset.reset を earliest または latest に設定して、初期開始オフセットを指定する必要があります。
scan.startup.mode
Kafka からデータを読み取るための開始オフセット。
いいえ
文字列
group-offsets
有効な値:
earliest-offset:最も早い Kafka パーティションから読み取りを開始します。
latest-offset:最新の Kafka オフセットから読み取りを開始します。
group-offsets (デフォルト):指定された properties.group.id のコミット済みオフセットから読み取りを開始します。
timestamp:scan.startup.timestamp-millis で指定されたタイムスタンプから読み取りを開始します。
specific-offsets:scan.startup.specific-offsets で指定されたオフセットから読み取りを開始します。
説明このパラメーターは、状態なしでタスクが開始された場合に有効になります。タスクがチェックポイントから再開するか、状態から回復すると、状態に保存された進行状況を優先的に使用して読み取りを再開します。
scan.startup.specific-offsets
specific-offsets スタートアップモードで、各パーティションの開始オフセットを指定します。
いいえ
文字列
なし
例:
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
timestamp スタートアップモードで、開始オフセットのタイムスタンプを指定します。
いいえ
Long
なし
単位はミリ秒です。
scan.topic-partition-discovery.interval
Kafka のトピックとパーティションを動的に検出する間隔。
いいえ
Duration
5 分
デフォルトのパーティションチェック間隔は 5 分です。この機能を無効にするには、パーティションチェック間隔を明示的に 0 以下の値に設定する必要があります。動的パーティション検出が有効な場合、Kafka ソースは新しいパーティションを自動的に検出し、そこからデータを読み取ることができます。topic-pattern モードでは、既存のトピックの新しいパーティションからデータを読み取るだけでなく、正規表現に一致する新しいトピックのすべてのパーティションからもデータを読み取ります。
scan.check.duplicated.group.id
properties.group.idで指定された重複するコンシューマーグループをチェックするかどうかを指定します。いいえ
ブール値
false
有効な値:
true:ジョブの開始前に、システムは重複するコンシューマーグループをチェックします。重複が見つかった場合、ジョブはエラーを報告し、既存のコンシューマーグループとの競合を防ぎます。
false: タスクはコンシューマーグループの競合をチェックせずに直接開始します。
schema.inference.strategy
スキーマ解析戦略。
いいえ
文字列
continuous
有効な値:
continuous:各データレコードのスキーマを解析します。先行するスキーマと後続するスキーマに互換性がない場合、より広いスキーマが解析され、スキーマ変更イベントが生成されます。
static:ジョブ開始時にスキーマを一度だけ解析します。後続のデータは初期スキーマに基づいて解析され、スキーマ変更イベントは生成されません。
説明スキーマ解析の詳細については、「テーブルスキーマの解析と変更同期戦略」をご参照ください。
この設定項目は、VVR 8.0.11 以降でのみサポートされます。
scan.max.pre.fetch.records
初期スキーマ解析中に各パーティションで消費および解析を試みるメッセージの最大数。
いいえ
Int
50
ジョブが実際にデータを読み取って処理する前に、各パーティションの最新メッセージを指定された数だけ事前消費してスキーマ情報を初期化しようとします。
key.fields-prefix
Kafka メッセージキーを解析した後の命名の競合を避けるために、メッセージキーから解析されたフィールド名に追加されるカスタムプレフィックス。
いいえ
文字列
なし
この設定項目が key_ に設定されていると仮定します。キーに `a` という名前のフィールドが含まれている場合、キーを解析した後のフィールド名は `key_a` になります。
説明key.fields-prefix の値は、value.fields-prefix のプレフィックスにすることはできません。
value.fields-prefix
Kafka メッセージ本文を解析した後の命名競合を避けるために、メッセージ値から解析されたフィールド名に追加されるカスタムプレフィックス。
いいえ
文字列
なし
この設定項目が value_ に設定されていると仮定します。値に `b` という名前のフィールドが含まれている場合、値を解析した後のフィールド名は `value_b` になります。
説明value.fields-prefix の値は、key.fields-prefix のプレフィックスにすることはできません。
metadata.list
ダウンストリームに渡されるメタデータ列。
いいえ
文字列
なし
利用可能なメタデータ列には、
topic、partition、offset、timestamp、timestamp-type、headers、leader-epoch、__raw_key__、__raw_value__があります。カンマで区切ります。ソーステーブル Debezium JSON 形式
パラメーター
必須
データの型
デフォルト値
説明
debezium-json.distributed-tables
いいえ
ブール値
false
Debezium JSON の単一テーブルのデータが複数のパーティションに表示される場合は、このオプションを有効にする必要があります。
説明この設定項目は、VVR 8.0.11 以降でのみサポートされます。
重要この設定項目を変更した後、状態なしでタスクを開始する必要があります。
debezium-json.schema-include
いいえ
ブール値
false
Debezium Kafka Connect を設定する際、Kafka 設定 value.converter.schemas.enable を有効にして、メッセージにスキーマを含めることができます。このオプションは、Debezium JSON メッセージにスキーマが含まれているかどうかを示します。
有効な値:
true: Debezium JSON メッセージにスキーマが含まれています。
false: Debezium JSON メッセージにスキーマが含まれていません。
debezium-json.ignore-parse-errors
いいえ
ブール値
false
有効な値:
true: 解析エラーが発生した場合、現在の行をスキップします。
false (デフォルト):エラーを報告し、ジョブの開始に失敗します。
debezium-json.infer-schema.primitive-as-string
いいえ
ブール値
false
テーブルスキーマを解析する際に、すべての型を String として解析するかどうかを指定します。
有効な値:
true: すべての基本データ型を String として解析します。
false (デフォルト): 基本的なルールに従って解析します。
ソーステーブル Canal JSON 形式
パラメーター
必須
データの型
デフォルト値
説明
canal-json.distributed-tables
いいえ
ブール値
false
Canal JSON の単一テーブルのデータが複数のパーティションに表示される場合は、このオプションを有効にする必要があります。
説明この設定項目は、VVR 8.0.11 以降でのみサポートされます。
重要この設定項目を変更した後、状態なしでタスクを開始する必要があります。
canal-json.database.include
いいえ
文字列
なし
Canal レコードの `database` メタデータフィールドに一致するオプションの正規表現。指定されたデータベースの変更ログレコードのみを読み取ります。正規表現文字列は Java の Pattern と互換性があります。
canal-json.table.include
いいえ
文字列
なし
Canal レコードの `table` メタデータフィールドに一致するオプションの正規表現。指定されたテーブルの変更ログレコードのみを読み取ります。正規表現文字列は Java の Pattern と互換性があります。
canal-json.ignore-parse-errors
いいえ
ブール値
false
有効な値:
true: 解析エラーが発生した場合、現在の行をスキップします。
false (デフォルト):エラーを報告し、ジョブの開始に失敗します。
canal-json.infer-schema.primitive-as-string
いいえ
ブール値
false
テーブルスキーマを解析する際に、すべての型を String として解析するかどうかを指定します。
有効な値:
true: すべての基本データ型を String として解析します。
false (デフォルト): 基本的なルールに従って解析します。
canal-json.infer-schema.strategy
いいえ
文字列
AUTO
テーブルスキーマの解析戦略。
有効な値:
AUTO (デフォルト):JSON データを分析して自動的に解析します。データに `sqlType` フィールドが含まれていない場合、解析の失敗を避けるために AUTO を使用することを推奨します。
SQL_TYPE:Canal JSON データの `sqlType` 配列を使用して解析します。データに `sqlType` フィールドが含まれている場合、より正確な型を取得するために canal-json.infer-schema.strategy を SQL_TYPE に設定することを推奨します。
MYSQL_TYPE:Canal JSON データの `mysqlType` 配列を使用して解析します。
Kafka の Canal JSON データに `sqlType` フィールドが含まれており、より正確な型マッピングが必要な場合は、canal-json.infer-schema.strategy を SQL_TYPE に設定することを推奨します。
`sqlType` のマッピングルールについては、「Canal JSON のスキーマ解析」をご参照ください。
説明この設定は VVR 11.1 以降でサポートされています。
MYSQL_TYPE は VVR 11.3 以降でサポートされています。
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
いいえ
ブール値
true
MySQL `timestamp` 型を CDC `timestamp` 型にマッピングするかどうかを指定します:
true (デフォルト):MySQL `timestamp` 型は CDC `timestamp` 型にマッピングされます。
false:MySQL `timestamp` 型は CDC `timestamp_ltz` 型にマッピングされます。
canal-json.mysql.treat-tinyint1-as-boolean.enabled
いいえ
ブール値
true
MYSQL_TYPE で解析する際、MySQL `tinyint(1)` 型を CDC `boolean` 型にマッピングするかどうかを指定します:
true (デフォルト):MySQL `tinyint(1)` 型は CDC `boolean` 型にマッピングされます。
false:MySQL `tinyint(1)` 型は CDC `tinyint(1)` 型にマッピングされます。
この設定は、canal-json.infer-schema.strategy が MYSQL_TYPE に設定されている場合にのみ有効です。
ソーステーブル JSON 形式
パラメーター
必須
データ型
デフォルト値
説明
json.timestamp-format.standard
いいえ
文字列
SQL
入出力のタイムスタンプ形式を指定します。有効な値:
SQL:入力タイムスタンプを yyyy-MM-dd HH:mm:ss.s{precision} 形式で解析します。例:2020-12-30 12:13:14.123。
ISO-8601:入力タイムスタンプを yyyy-MM-ddTHH:mm:ss.s{precision} 形式で解析します。例:2020-12-30T12:13:14.123。
json.ignore-parse-errors
いいえ
ブール値
false
有効な値:
true: 解析エラーが発生した場合、現在の行をスキップします。
false (デフォルト):エラーを報告し、ジョブの開始に失敗します。
json.infer-schema.primitive-as-string
いいえ
ブール値
false
テーブルスキーマを解析する際に、すべての型を String として解析するかどうかを指定します。
有効な値:
true: すべての基本データ型を String として解析します。
false (デフォルト): 基本的なルールに従って解析します。
json.infer-schema.flatten-nested-columns.enable
いいえ
ブール値
false
JSON データを解析する際、JSON 内のネストされた列を再帰的に展開するかどうかを指定します。有効な値:
true:再帰的に展開します。
false (デフォルト): ネストされた列を String として扱います。
json.decode.parser-table-id.fields
いいえ
文字列
なし
JSON データを解析する際、一部の JSON フィールド値を使用して tableId を生成するかどうかを指定します。複数のフィールドはカンマ
,で接続します。たとえば、JSON データが{"col0":"a", "col1","b", "col2","c"}の場合、結果は次のようになります:設定
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
結果テーブル
パラメーター
説明
必須
データの型
デフォルト値
備考
type
ターゲットのタイプ。
はい
文字列
なし
このパラメーターを kafka に設定します。
name
ターゲットの名前。
いいえ
文字列
なし
なし
topic
Kafka トピックの名前。
いいえ
文字列
なし
このパラメーターが指定されている場合、すべてのデータはこのトピックに書き込まれます。
説明このパラメーターが指定されていない場合、各データレコードは TableID 文字列に対応するトピックに書き込まれます。TableID は、データベース名とテーブル名をピリオド (
.) で結合して生成されます。例:databaseName.tableName。パーティション戦略
Kafka パーティションにデータを書き込むための戦略。
いいえ
文字列
オールゼロ
有効な値:
all-to-zero (デフォルト): すべてのデータをパーティション 0 に書き込みます。
hash-by-key:プライマリキーのハッシュ値に基づいてパーティションにデータを書き込みます。これにより、同じプライマリキーを持つデータレコードが同じパーティションに書き込まれ、その順序が保持されることが保証されます。
sink.tableId-to-topic.mapping
アップストリームのテーブル名からダウンストリームの Kafka トピック名へのマッピング。
いいえ
文字列
なし
複数のマッピングはセミコロン (
;) で区切ります。各マッピングでは、コロン (:) を使用してアップストリームのテーブル名とダウンストリームのトピック名を区切ります。テーブル名には正規表現を使用できます。複数のテーブルを同じトピックにマッピングするには、テーブル名をカンマ (,) で区切ります。例:mydb.mytable1:topic1;mydb.mytable2:topic2。説明このパラメーターを使用すると、元のテーブル名情報を保持したまま、マッピングされたトピックを変更できます。
シンクテーブル Debezium JSON 形式
パラメーター
必須
データ型
デフォルト値
説明
debezium-json.include-schema.enabled
いいえ
ブール値
false
Debezium JSON データにスキーマ情報が含まれているかどうかを指定します。
例
Kafka をデータ統合ソースとして使用する:
source: type: kafka name: Kafka source // Kafka ソース properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink // Hologres シンク endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADENKafka をデータ統合ターゲットとして使用する:
source: type: mysql name: MySQL Source // MySQL ソース hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink // Kafka シンク properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}ここでは、`route` モジュールを使用して、ソーステーブルから Kafka に書き込むトピック名を設定します。
ApsaraMQ for Kafka は、デフォルトでトピックの自動作成を有効にしていません。詳細については、「トピックの自動作成に関する問題」をご参照ください。ApsaraMQ for Kafka に書き込む場合は、事前に関連するトピックを作成する必要があります。詳細については、「ステップ 3:リソースの作成」をご参照ください。
テーブルスキーマの解析と変更同期戦略
パーティションメッセージの事前消費とテーブルスキーマの初期化
Kafka コネクタは、現在既知のすべてのテーブルのスキーマを維持します。Kafka データを読み取る前に、Kafka コネクタは各パーティションで最大 scan.max.pre.fetch.records 件のメッセージを事前消費しようとします。各データレコードのスキーマを解析し、これらのスキーマをマージしてテーブルスキーマ情報を初期化します。その後、データを消費する前に、コネクタは初期化されたスキーマに基づいて対応するテーブル作成イベントを生成します。
説明Debezium JSON および Canal JSON 形式の場合、テーブル情報は特定のメッセージから取得されます。事前消費される scan.max.pre.fetch.records 件のメッセージには、いくつかのテーブルのデータが含まれている可能性があります。したがって、各テーブルの事前消費されるデータレコードの数は特定できません。事前消費とテーブルスキーマの初期化は、各パーティションからメッセージを消費して処理する前に一度だけ実行されます。後で新しいテーブルデータが現れた場合、そのテーブルの最初のデータレコードから解析されたテーブルスキーマが初期スキーマとして使用されます。そのテーブルのスキーマは、事前消費によって再初期化されることはありません。
重要単一テーブルのデータを複数のパーティションに分散させることは、VVR 8.0.11 以降でのみサポートされます。このシナリオでは、設定項目 debezium-json.distributed-tables または canal-json.distributed-tables を true に設定する必要があります。
テーブル情報
Canal JSON および Debezium JSON 形式の場合、データベース名とテーブル名を含むテーブル情報は、特定のメッセージから解析されます。
JSON 形式の場合、テーブル情報にはテーブル名のみが含まれ、これはデータが存在するトピックの名前です。
プライマリキー情報
Canal JSON 形式の場合、テーブルのプライマリキーは JSON の `pkNames` フィールドに基づいて定義されます。
Debezium JSON および JSON 形式の場合、JSON にはプライマリキー情報が含まれていません。変換ルールを使用して、テーブルに手動でプライマリキーを追加できます。
transform: - source-table: \.*.\.* projection: \* primary-keys: key1, key2
スキーマ解析とスキーマ変更
schema.inference.strategy が `static` に設定されている場合、テーブルスキーマが初期化された後、Kafka コネクタは初期テーブルスキーマに基づいて各メッセージの値を解析し、スキーマ変更イベントを生成しません。schema.inference.strategy が `continuous` に設定されている場合、Kafka コネクタは各 Kafka メッセージの本文を解析して物理列を抽出し、現在維持されているスキーマと比較します。解析されたスキーマが現在のスキーマと一致しない場合、スキーマのマージを試み、対応するテーブルスキーマ変更イベントを生成します。マージルールは次のとおりです:
解析された物理列に現在のスキーマにないフィールドが含まれている場合、これらのフィールドはスキーマに追加され、null 許容列を追加するイベントが生成されます。
解析された物理列に現在のスキーマに既にあるフィールドが含まれていない場合、そのフィールドは保持され、そのデータは NULL で埋められます。列を削除するイベントは生成されません。
両方に同じ名前の列がある場合、次のシナリオに従って処理されます:
型は同じだが精度が異なる場合、精度の大きい方の型が使用され、列型変更イベントが生成されます。
型が異なる場合、システムは次の図に示すツリー構造で最も低い共通の親ノードを見つけ、それを同じ名前の列の型として使用し、列型変更イベントを生成します。

現在サポートされているスキーマ変更戦略は次のとおりです:
列の追加: 現在のスキーマの末尾に対応する列を追加し、新しい列のデータを同期します。新しい列は null 許容に設定されます。
列の削除:列削除イベントは生成されません。代わりに、その列のデータは自動的に NULL 値で埋められます。
列名の変更:これは列の追加と削除として扱われます。名前が変更された列は現在のスキーマの末尾に追加され、名前変更前の列のデータは NULL 値で埋められます。
列の型変更:
ダウンストリームシステムが列の型変更をサポートしている場合、ダウンストリームシンクが列の型変更の処理をサポートした後、データ統合ジョブは通常の列の型変更 (例:INT から BIGINT へ) をサポートします。このような変更は、ダウンストリームシンクがサポートする列の型変更ルールに依存します。シンクテーブルごとにサポートされる列の型変更ルールは異なります。詳細については、特定のシンクテーブルのドキュメントをご参照ください。
Hologres のようにダウンストリームシステムが列の型変更をサポートしていない場合、拡張型マッピングを使用できます。これは、ジョブ開始時にダウンストリームシステムにより広い型を持つテーブルを作成することを意味します。列の型変更が発生した場合、システムはダウンストリームシンクが変更を受け入れられるかどうかを判断し、列の型変更に対して寛容なサポートを提供します。
現在サポートされていないスキーマ変更:
プライマリキーやインデックスなどの制約の変更。
NOT NULL から NULLABLE への変更。
Canal JSON のスキーマ解析
Canal JSON データには、データ列の正確な型情報を記録するオプションの `sqlType` フィールドが含まれている場合があります。より正確なスキーマを取得するには、canal-json.infer-schema.strategy を SQL_TYPE に設定して `sqlType` の型を使用できます。型マッピングの関係は次のとおりです:
JDBC 型
タイプコード
CDC 型
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
-5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
数値
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
バイナリ
-2
バイト
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
日付
91
日付
時間
92
時間
タイムスタンプ
93
タイムスタンプ
CHAR
1
文字列
VARCHAR
12
LONGVARCHAR
-1
その他の型
テーブル名からトピックへのマッピング戦略
Kafka をデータ統合ジョブのターゲットとして使用する場合、テーブル名からトピックへのマッピング戦略を慎重に設定する必要があります。これは、書き込まれる Kafka メッセージ形式 (Debezium JSON または Canal JSON) にもテーブル名情報が含まれており、後続の Kafka メッセージの消費では、トピック名ではなくデータ内のテーブル名が実際のテーブル名として使用されることが多いためです。
MySQL から `mydb.mytable1` と `mydb.mytable2` の 2 つのテーブルを同期する必要があるとします。考えられる設定戦略は次のとおりです:
1. マッピング戦略を設定しない
マッピング戦略がない場合、各テーブルは "database_name.table_name" 形式の名前のトピックに書き込まれます。したがって、`mydb.mytable1` からのデータは `mydb.mytable1` という名前のトピックに書き込まれ、`mydb.mytable2` からのデータは `mydb.mytable2` という名前のトピックに書き込まれます。次のコードは設定例を示しています:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. マッピングのためのルート ルールを設定する (非推奨)
多くのシナリオでは、ユーザーは書き込まれるトピックが "database_name.table_name" 形式であることを望まず、指定されたトピックにデータを書き込みたいと考えています。そのため、マッピングのためのルート ルールを設定します。次のコードは設定例を示しています:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytable1この場合、`mydb.mytable1` と `mydb.mytable2` からのすべてのデータは `mytable1` トピックに書き込まれます。
ただし、ルート ルールを使用して書き込まれるトピック名を変更すると、Kafka メッセージ (Debezium JSON または Canal JSON 形式) のテーブル名情報も変更されます。この場合、Kafka メッセージ内のすべてのテーブル名は `mytable1` になります。これにより、他のシステムがこのトピックから Kafka メッセージを消費する際に予期しない動作が発生する可能性があります。
3. sink.tableId-to-topic.mapping パラメーターをマッピング用に設定する (推奨)
ソーステーブル名情報を保持しながらテーブル名からトピックへのマッピングルールを設定するには、`sink.tableId-to-topic.mapping` パラメーターを使用します。次のコードは設定例を示しています:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}または
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}この場合、`mydb.mytable1` と `mydb.mytable2` からのすべてのデータは `mytable1` トピックに書き込まれます。Kafka メッセージ (Debezium JSON または Canal JSON 形式) のテーブル名情報は `mydb.mytable1` または `mydb.mytable2` のままです。他のシステムがこのトピックから Kafka メッセージを消費する際、ソーステーブル名情報を正しく取得できます。
EXACTLY_ONCE セマンティクスに関する考慮事項
コンシューマーの分離レベルの設定
Kafka データを消費するすべてのアプリケーションは `isolation.level` を設定する必要があります:
read_committed: コミットされたデータのみを読み取ります。read_uncommitted(デフォルト): コミットされていないデータを読み取ることができます。
EXACTLY_ONCE は
read_committedに依存します。そうしないと、コンシューマーがコミットされていないデータを表示する可能性があり、一貫性が損なわれます。トランザクションのタイムアウトとデータ損失
Flink がチェックポイントから回復する際、そのチェックポイントが開始される前にコミットされたトランザクションにのみ依存します。ジョブのクラッシュから再起動までの時間が Kafka のトランザクションタイムアウトを超えると、Kafka は自動的にトランザクションを中止し、データ損失につながります。
Kafka ブローカーのデフォルトの
transaction.max.timeout.msは 15 分です。Flink Kafka シンクのデフォルトの
transaction.timeout.msは 1 時間です。ブローカー側の
transaction.max.timeout.msを Flink の設定以上になるように増やす必要があります。
プロデューサープールと同時チェックポイント
EXACTLY_ONCE モードでは、固定サイズの Kafka プロデューサープールを使用します。各チェックポイントはプールから 1 つのプロデューサーを占有します。同時チェックポイントの数がプールサイズを超えると、ジョブは失敗します。
同時チェックポイントの最大数に基づいてプロデューサープールサイズを調整してください。
並列度のスケールインに関する制限
最初のチェックポイントの前にジョブが失敗した場合、再起動時に元のプロデューサープール情報は保持されません。したがって、最初のチェックポイントが完了する前にジョブの並列度を下げないでください。スケールインする必要がある場合、並列度は
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTORより低くしてはなりません。トランザクションによる読み取りのブロック
read_committedモードでは、開いているトランザクション (コミットも中止もされていない) があると、トピック全体からの読み取りがブロックされます。例:
トランザクション 1 がデータを書き込みます。
トランザクション 2 がデータを書き込み、コミットします。
トランザクション 1 が完了していない限り、トランザクション 2 のデータはコンシューマーには表示されません。
したがって:
通常の操作中、データの可視性のレイテンシーはチェックポイントの間隔とほぼ同じです。
ジョブが失敗すると、書き込み中のトピックは、ジョブが再起動するかトランザクションがタイムアウトするまでコンシューマーをブロックします。極端な場合、トランザクションのタイムアウトが読み取りに影響を与えることさえあります。