すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:CDC に関する FAQ

最終更新日:Nov 06, 2025

このトピックでは、Change Data Capture (CDC) に関するよくある質問への回答を記載します。

Flink CDC デプロイメントが失敗した場合、デプロイメントを再起動する代わりにキャンセルできますか?

デプロイメントの構成を変更して、再起動ポリシーを指定できます。たとえば、次の構成では、最大 2 回の再起動試行を実行でき、試行の間隔は 10 秒であることを指定します。2 回の試行後にデプロイメントの開始に失敗した場合、デプロイメントはキャンセルされます。

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s

MySQL CDC ソーステーブルと Hologres CDC ソーステーブルはウィンドウ関数をサポートしていません。MySQL CDC ソーステーブルまたは Hologres CDC ソーステーブルで分単位のデータ集約を実装するにはどうすればよいですか?

MySQL CDC ソーステーブルまたは Hologres CDC ソーステーブルでウィンドウ集約と同様の効果を得るには、次のメソッドを使用して時間ベースの集約を実行できます。Spark 構成を変更するには、次の手順を実行します。

  1. DATE_FORMAT 関数を使用して、時間フィールドを分単位でフォーマットされた文字列に変換し、その文字列をウィンドウ値として使用します。

  2. GROUP BY 関数を使用して、ウィンドウ値を集約します。

次のサンプルコードは、店舗ごとに毎分の注文数と売上に関する統計を収集する方法の例を示しています。

SELECT 
    shop_id, 
    DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm') AS window,
    COUNT(*) AS order_count, 
    SUM(price) AS amount 
FROM order_mysql_cdc 
GROUP BY shop_id, window

MySQL CDC テーブルはソーステーブルとしてのみ使用できますか?

はい、MySQL CDC テーブルはソーステーブルとしてのみ使用できます。MySQL CDC テーブルは、MySQL データベーステーブルから完全データと増分データを読み取るために使用できます。MySQL テーブルは、ディメンションテーブルまたは結果テーブルとして使用できます。

MySQL CDC コネクタが完全データを読み取った後に増分データを読み取らないのはなぜですか?

問題の説明

原因

解決策

MySQL CDC コネクタは完全データのみを読み取り、増分データは読み取りません。

MySQL CDC コネクタは、MySQL CDC デプロイメントの構成に基づいて、ApsaraDB RDS for MySQL V5.6 のセカンダリインスタンスまたは読み取り専用インスタンスのデータを読み取ります。ただし、ApsaraDB RDS for MySQL V5.6 のセカンダリインスタンスまたは読み取り専用インスタンスは、ログファイルにデータを書き込みません。その結果、ダウンストリームの同期ツールは増分変更情報を読み取ることができません。

書き込みリクエストを処理できる ApsaraDB RDS for MySQL インスタンスを使用するか、ApsaraDB RDS for MySQL インスタンスを V5.6 より後のバージョンにアップグレードすることをお勧めします。

現在、MySQL CDC コネクタはバイナリログトランザクション圧縮をサポートしていません。自己管理型の MySQL クラスターでこの機能を有効にすると、増分読み取りでエラーが発生する可能性があります。

MySQL CDC コネクタを使用して自己管理型の MySQL クラスターから増分データを消費するには、バイナリログトランザクション圧縮を無効にします。

完全データが読み取られた後、MySQL CDC デプロイメントは一時停止されます。

MySQL CDC デプロイメントで完全データを読み取るのに必要な時間が長すぎます。この場合、最後のシャードのデータ量が過大になります。これにより、メモリ不足 (OOM) エラーが発生します。その結果、フェールオーバー後にデプロイメントは一時停止されます。

MySQL データベースの並列処理を増やして、完全データの読み取りを高速化します。

完全データ同期が完了すると、MySQL CDC コネクタはデータ同期デプロイメントを増分データ同期フェーズに自動的に切り替えます。MySQL CDC コネクタが複数のサブタスクを並行して実行して完全データを読み取る場合、データ同期デプロイメントが増分データ同期フェーズに入る前に、MySQL CDC コネクタはもう 1 つのチェックポイントを待つ必要があります。これにより、増分データが読み取られる前に、完全データがシンクに書き込まれることが保証されます。これにより、データの精度が保証されます。指定したチェックポイント間隔が長すぎると、MySQL CDC コネクタは増分データの同期を開始するまで長時間待機する必要があります。たとえば、チェックポイント間隔を 20 分に設定した場合、MySQL CDC コネクタは完全データ同期が完了してから 20 分間待機します。

この問題を回避するには、ビジネス要件に基づいて適切なチェックポイント間隔を指定することをお勧めします。

MySQL CDC デプロイメントの table-name パラメーターの値の正規表現に含まれるカンマ (,) が解析に失敗した場合はどうすればよいですか?

  • 原因

    たとえば、MySQL CDC デプロイメントで 'table-name' = 't_process_wi_history_\d{1,2}' 構成を使用すると、エラーが発生します。次の図にエラーの詳細を示します。报错参数

  • 原因

    Debezium はカンマ (,) を区切り文字として使用し、カンマ (,) を含む正規表現をサポートしていません。その結果、解析エラーが発生します。

  • 解決策

    'table-name' = '(t_process_wi_history_\d{1}|t_process_wi_history_\d{2})' 構成を使用することをお勧めします。

デプロイメントを再起動すると、MySQL CDC ソーステーブルのコネクタは、デプロイメントがキャンセルされたバイナリログファイルの位置からデータを消費しますか、それともデプロイメントが開始するように構成されたバイナリログファイルの位置からデータを消費しますか?

デプロイメントを再起動するときに、ビジネス要件に基づいて起動ポリシーを構成できます。[Deployment Starting Configuration] ダイアログボックスで [Starting Strategy] パラメーターを [NONE] に設定した場合、MySQL CDC ソーステーブルのコネクタは、デプロイメントが開始するように構成されたバイナリログファイルの位置からデータを再消費します。[Deployment Starting Configuration] ダイアログボックスで [Starting Strategy] パラメーターを [Latest State] に設定した場合、MySQL CDC ソーステーブルのコネクタは、デプロイメントがキャンセルされたバイナリログファイルの位置からデータを消費します。

たとえば、デプロイメントがバイナリログファイルの位置 {file=mysql-bin.01, position=40} から開始するように構成され、一定期間実行された後にデプロイメントがキャンセルされたとします。この場合、データはバイナリログファイルの位置 {file=mysql-bin.01, position=210} で消費されます。[Deployment Starting Configuration] ダイアログボックスで [Starting Strategy] パラメーターを [NONE] に設定した場合、MySQL CDC ソーステーブルのコネクタは、バイナリログファイルの位置 {file=mysql-bin.01, position=40} からデータを再消費します。[Deployment Starting Configuration] ダイアログボックスで [Starting Strategy] パラメーターを [Latest State] に設定した場合、MySQL CDC ソーステーブルのコネクタは、バイナリログファイルの位置 {file=mysql-bin.01, position=210} からデータを消費します。

重要

デプロイメントを再起動するときは、必要なバイナリログファイルが有効期限切れのためにサーバーから削除されていないことを確認してください。そうでない場合、エラーが返されます。

MySQL CDC ソーステーブルのコネクタはどのように機能しますか? MySQL CDC ソーステーブルはデータベースにどのような影響を与えますか?

MySQL CDC ソーステーブルの WITH 句の scan.startup.mode パラメーターが initial に設定されている場合、MySQL CDC ソーステーブルのコネクタは Java Database Connectivity (JDBC) ドライバーを使用して MySQL データベースに接続し、SELECT 文を実行して完全データを読み取り、バイナリログファイルの位置を記録します。scan.startup.mode パラメーターのデフォルト値は initial です。完全データの読み取りが完了すると、コネクタは記録されたバイナリログファイルの位置にあるバイナリログファイルから増分データを読み取ります。

完全データの読み取り中、SELECT 文がデータをクエリするために実行されるため、MySQL データベースのクエリ負荷が増加する可能性があります。増分データの読み取り中、binlog クライアントは MySQL データベースに接続してバイナリログデータを読み取るために使用されます。使用されるデータテーブルの数が増加すると、接続が過剰になる可能性があります。次の MySQL コマンドを実行して、最大接続数をクエリできます。

show variables like '%max_connections%';

MySQL CDC コネクタがスナップショットデータ読み取りフェーズをスキップして変更データのみを読み取るようにするにはどうすればよいですか?

WITH 句の scan.startup.mode パラメーターを構成して、データ消費に使用する起動モードを指定できます。最も古い位置からアクセス可能なバイナリログデータ、最新のバイナリログデータ、指定されたタイムスタンプからのバイナリログデータ、または指定された位置からのバイナリログデータを消費するように指定できます。scan.startup.mode パラメーターの詳細については、「MySQL CDC ソーステーブルの作成」トピックの「WITH 句のパラメーター」セクションをご参照ください。

シャーディングが実行されている MySQL データベースから MySQL CDC コネクタはどのようにデータを読み取りますか?

たとえば、シャーディング後に MySQL データベースに user_00、user_02、user_99 などの複数のテーブルがあり、これらのテーブルのスキーマが同じであるとします。このシナリオでは、table-name パラメーターを使用して、データを読み取ることができる複数のテーブルに一致する正規表現を指定できます。たとえば、table-name パラメーターを user_.* に設定して、プレフィックスが user_ であるすべてのテーブルを監視できます。データベース内のすべてのテーブルのスキーマが同じ場合は、database-name パラメーターを使用して同じ効果を得ることができます。

MySQL CDC デプロイメントで完全データ同期が完了したかどうかを判断するにはどうすればよいですか?

  • [Deployments] ページの [Metrics] タブで currentEmitEventTimeLag メトリックの値に基づいて、デプロイメントで完全データ同期が完了したかどうかを判断できます。

    currentEmitEventTimeLag メトリックは、ソースがデータレコードをシンクに送信した時刻と、データベースでデータレコードが生成された時刻との差を示します。このメトリックは、データがデータベースで生成されてからソースを離れるまでの遅延を測定するために使用されます。指标

    currentEmitEventTimeLag メトリックの値の説明:

    • currentEmitEventTimeLag の値が 0 以下の場合、MySQL CDC デプロイメントの完全データ同期は完了していません。

    • currentEmitEventTimeLag の値が 0 より大きい場合、CDC タスクは完全データ同期を完了し、バイナリログデータの読み取りを開始します。

  • MySQL CDC ソーステーブルの TaskManager のログに「BinlogSplitReader is created」が含まれているかどうかを確認します。このメッセージが表示された場合、完全データが読み取られています。次の図は、TaskManager のログ内の「BinlogSplitReader is created」を示しています。

    日志

複数の MySQL CDC デプロイメントが原因で MySQL データベースの負荷が高くなった場合はどうすればよいですか?

MySQL CDC ソーステーブルのコネクタは、バイナリログデータを読み取るためにデータベースに接続する必要があります。ソーステーブルの数が増えると、データベースの負荷も増加します。データベースの負荷を軽減するには、MySQL CDC ソーステーブルから ApsaraMQ for Kafka シンクテーブルにデータを同期し、シンクテーブル内のデータを消費します。これにより、MySQL CDC デプロイメントはバイナリログデータの読み取りに依存しなくなります。詳細については、「MySQL データベースのすべてのテーブルから Kafka にデータを同期する」をご参照ください。

CREATE TABLE AS 文を使用したデータ同期が原因でデータベースの負荷が高くなった場合は、CREATE TABLE AS 文を使用する複数のデプロイメントを 1 つのデプロイメントにマージして実行できます。CREATE TABLE AS 文を使用するデプロイメントの構成が同じ場合は、各 MySQL CDC ソーステーブルに同じサーバー ID を構成して、データソースを再利用し、データベースの負荷を軽減できます。詳細については、「例 4: 複数の CREATE TABLE AS 文の実行」をご参照ください。

MySQL CDC ソーステーブルでは少量のデータが更新されます。MySQL CDC ソーステーブルのコネクタが大量の帯域幅リソースを消費してデータを読み取るのはなぜですか?

  • 問題の説明

    MySQL CDC ソーステーブルでは少量のデータが更新されますが、MySQL CDC ソーステーブルのコネクタは大量の帯域幅リソースを消費してデータを読み取ります。

  • 原因

    MySQL インスタンスのバイナリログデータには、インスタンス内のすべてのデータベースとテーブルの変更データが含まれています。MySQL データベースに 3 つのテーブルが含まれている場合、デプロイメントが 1 つのテーブルのみの変更を実装していても、バイナリログデータには 3 つすべてのテーブルの変更データが含まれます。

    基礎となるバイナリログデータには、MySQL インスタンスのすべての変更データが含まれています。この場合、MySQL CDC コネクタを構成して、特定のテーブルの変更データを読み取ることができます。この構成は、MySQL インスタンスではなく、Debezium または Flink CDC コネクタで有効になります。

  • 解決策

    バイナリログデータのストレージメカニズムは変更できません。ただし、MySQL CDC ソーステーブルを再利用して、追加の帯域幅リソースの使用を防ぐことができます。詳細については、「MySQL コネクタ」トピックの「MySQL CDC ソーステーブルの再利用の有効化」セクションをご参照ください。

MySQL CDC コネクタを使用してテーブルから増分データを読み取ると、読み取られたタイムスタンプフィールドのデータと MySQL サーバーのタイムゾーンとの間に 8 時間の時差が存在します。なぜですか?

  • CDC デプロイメントで構成されている server-time-zone パラメーターの値が、MySQL サーバーのタイムゾーンと一致していません。バイナリログデータの timestamp フィールドが解析されるときに、このエラーが発生します。

  • DataStream では、MyDeserializer implements DebeziumDeserializationSchema などのカスタムシリアライザーが使用されます。カスタムシリアライザーが TIMESTAMP 型のデータを解析するときに、このエラーが発生します。RowDataDebeziumDeserializeSchemaTIMESTAMP 型のデータの解析情報に基づいて、コードで serverTimeZone を指定できます。

      private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
              if (dbzObj instanceof Long) {
                  switch (schema.name()) {
                      case Timestamp.SCHEMA_NAME:
                         return TimestampData.fromEpochMillis((Long) dbzObj);
                      case MicroTimestamp.SCHEMA_NAME:
                         long micro = (long) dbzObj;
                         return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                      case NanoTimestamp.SCHEMA_NAME:
                         long nano = (long) dbzObj;
                         return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
                  }
             }
             LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
             return TimestampData.fromLocalDateTime(localDateTime);
        }

MySQL CDC コネクタはセカンダリデータベースをリッスンできますか? セカンダリデータベースを構成するにはどうすればよいですか?

はい、MySQL CDC コネクタはセカンダリデータベースをリッスンできます。MySQL CDC コネクタがセカンダリデータベースをリッスンできるようにするには、セカンダリデータベースのコードに次の構成を追加する必要があります。構成が完了すると、プライマリデータベースから同期されたデータがセカンダリデータベースのバイナリログファイルに書き込まれます。

log-slave-updates = 1

プライマリデータベースでグローバルトランザクション ID (GTID) モードが有効になっている場合は、セカンダリデータベースでも GTID モードを有効にする必要があります。GTID モードを有効にするには、プライマリデータベースとセカンダリデータベースのコードに次の構成を追加します。

gtid_mode = on
enforce_gtid_consistency = on

データベースから DDL イベントを取得するにはどうすればよいですか?

Apache Flink 用の CDC コネクタを使用する場合、DataStream API を呼び出して MySqlSource クラスを使用し、includeSchemaChanges(true) パラメーターを構成して DDL イベントを取得できます。DDL イベントを取得した後、後続の処理用のコードを記述できます。サンプルステートメント:

MySqlSource<xxx> mySqlSource =
 MySqlSource.<xxx>builder()
 .hostname(...)
 .port(...)
 .databaseList("<databaseName>")
 .tableList("<databaseName>.<tableName>")
 .username(...)
 .password(...)
 .serverId(...)
 .deserializer(...)
 .includeSchemaChanges(true) // パラメーターを構成して DDL イベントを取得します。
 .build();
 ... // 他の処理ロジックを記述します。

MySQL CDC コネクタは、MySQL データベース内のすべてのテーブルからのデータ同期をサポートしていますか? MySQL データベース内のすべてのテーブルからデータを同期するにはどうすればよいですか?

はい、Realtime Compute for Apache Flink では、CREATE TABLE AS または CREATE DATABASE AS 文を実行して、MySQL データベース内のすべてのテーブルからデータを同期できます。詳細については、「CREATE TABLE AS 文」または「CREATE DATABASE AS 文」をご参照ください。

説明

ApsaraDB RDS for MySQL V5.6 インスタンスはログファイルにデータを書き込みません。その結果、ダウンストリームの同期ツールは増分変更情報を読み取ることができません。

インスタンス内の特定のデータベースのテーブルの増分データが同期に失敗するのはなぜですか?

MySQL サーバーにバイナリログフィルターが構成されており、特定のデータベースのバイナリログが除外されています。show master status コマンドを実行して、Binlog_Ignore_DB と Binlog_Do_DB の値をクエリできます。次の例に結果を示します。

mysql> show master status;
+------------------+----------+--------------+------------------+----------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |  Executed_Gtid_Set   |
+------------------+----------+--------------+------------------+----------------------+
| mysql-bin.000006 |     4594 |              |                  | xxx:1-15             |
+------------------+----------+--------------+------------------+----------------------+

DataStream API を使用して MySQL CDC ソーステーブルを作成するときに tableList パラメーターを構成するにはどうすればよいですか?

tableList パラメーターの値には、DataStream API のテーブル名ではなく、データベース名とテーブル名を含める必要があります。MySQL CDC ソーステーブルを作成するときに、tableList("yourDatabaseName.yourTableName") 形式で tableList パラメーターを構成できます。

完全データ読み取りフェーズ中にデプロイメントが失敗した場合、MongoDB CDC コネクタはデプロイメントのチェックポイントからデータの読み取りを続行できますか?

はい。デプロイメントの WITH 句で 'scan.incremental.snapshot.enabled'= 'true' を構成すると、完全データ読み取りフェーズ中にデプロイメントが失敗した場合に MongoDB CDC コネクタがデプロイメントのチェックポイントからデータの読み取りを続行できるようになります。

MongoDB CDC コネクタは、完全データと増分データの両方の読み取り、または増分データのみの読み取りをサポートしていますか?

はい。デフォルトでは、MongoDB CDC コネクタは完全データと増分データの両方を読み取ります。MongoDB CDC コネクタが増分データのみを読み取るようにする場合は、デプロイメントの WITH 句で 'scan.startup.mode' = 'latest-offset' を構成します。

MongoDB CDC コネクタは、データベースの特定のコレクションのみへのサブスクリプションをサポートしていますか?

いいえ、MongoDB CDC コネクタは、データベースの特定のコレクションのみへのサブスクリプションをサポートしていません。MongoDB CDC コネクタを使用して、データベースのすべてのコレクションにサブスクライブできます。たとえば、WITH 句で 'database' = 'mgdb' と 'collection' = '' を構成すると、コネクタは MongoDB データベースのすべてのコレクションにサブスクライブします。

MongoDB CDC コネクタは同時読み取りをサポートしていますか?

scan.incremental.snapshot.enabled パラメーターを true に設定した場合、初期スナップショットフェーズ中に同時読み取りがサポートされます。

MongoDB CDC コネクタでサポートされている MongoDB のバージョンは何ですか?

MongoDB CDC コネクタは、変更ストリーム機能に基づいて実装されています。この機能は MongoDB 3.6 で導入されました。理論的には、MongoDB CDC コクタは MongoDB 3.6 以降をサポートしています。MongoDB 4.0 以降を使用することをお勧めします。MongoDB のバージョンが 3.6 より前の場合、MongoDB CDC コネクタが変更ストリームを読み取るときに「Unrecognized pipeline stage name: '$changeStream'」というエラーメッセージが表示されることがあります。

MongoDB CDC コネクタでサポートされている MongoDB データベースアーキテクチャは何ですか?

変更ストリームでは、MongoDB データベースがレプリカセットアーキテクチャまたはシャーディングされたクラスターアーキテクチャで実行されている必要があります。オンプレミステスト中の操作を簡素化するために、MongoDB データベースをスタンドアロンのレプリカセットアーキテクチャで実行できます。rs.initiate() コマンドを実行して、変更ストリームを初期化できます。MongoDB CDC コネクタを使用してスタンドアロンのレプリカセットアーキテクチャで実行されている MongoDB データベースからデータを読み取る場合、「The $changestage is only supported on replica sets」というエラーメッセージが表示されることがあります。

MongoDB CDC コネクタは Debezium 関連のパラメーターをサポートしていますか?

いいえ、MongoDB CDC コネクタは Debezium 関連のパラメーターをサポートしていません。MongoDB CDC コネクタは Flink CDC で独自に開発されており、Debezium には依存していません。

MongoDB CDC コネクタは、ユーザー名とそのパスワードに基づいて MongoDB データベースにアクセスできず、ユーザー名またはパスワードが無効であることを示すエラーメッセージが表示されます。しかし、他のコンポーネントはユーザー名とパスワードに基づいて MongoDB データベースにアクセスできます。なぜですか?

このエラーは、ユーザー資格情報が特定のデータベースの下で作成されているために発生します。MongoDB データベースにアクセスする場合は、WITH 句に 'connection.options' = 'authSource=ユーザーが属するデータベース' を追加します。

MongoDB CDC コネクタは、デプロイメントの再起動後にデプロイメントのチェックポイントからデータを読み取ることができますか? その動作原理は何ですか?

はい、MongoDB CDC コネクタは、デプロイメントの再起動後にデプロイメントのチェックポイントからデータを読み取ることができます。チェックポイントは、変更ストリームの再開トークンを記録します。コネクタは、関連する再開トークンに基づいて変更ストリームの読み取りを再開します。再開トークンは、oplog.rs コレクションの場所に対応します。oplog.rs コレクションは、MongoDB 変更ログのコレクションであり、固定容量を持ちます。

再開トークンのデータレコードが oplog.rs コレクションに存在しない場合、再開トークンが無効である可能性があります。この場合、oplog.rs コレクションのサイズを適切な値に設定して、oplog.rs コレクションが短すぎる期間保持されるのを防ぐことができます。詳細については、「自己管理レプリカセットメンバーの Oplog サイズの変更.」をご参照ください。

再開トークンは、新しく保持された変更レコードとハートビートレコードに基づいて更新できます。

MongoDB CDC コネクタは UPDATE_BEFORE メッセージ (更新前イメージ) の出力をサポートしていますか?

  • MongoDB データベース 6.0 以降で更新前イメージまたは更新後イメージ機能が有効になっている場合は、SQL デプロイメントに 'scan.full-changelog' = 'true' を構成できます。これにより、MongoDBSource は UPDATE_BEFORE メッセージを生成でき、ChangelogNormalize オペレーターは使用されません。

  • 6.0 より前のバージョンの MongoDB データベースの元の oplog.rs コレクションには、INSERT、UPDATE、REPLACE、DELETE の変更タイプが含まれますが、UPDATE_BEFORE 変更タイプは含まれません。したがって、MongoDBSource は UPDATE_BEFORE メッセージを直接生成できません。Flink は UPDATE ベースのセマンティクスのみをサポートします。MongoDBTableSource を使用すると、Flink プランナーは ChangelogNormalize オペレーターを使用してデータを自動的に最適化し、UPDATE_BEFORE メッセージを補足し、INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE メッセージを生成します。ただし、ChangelogNormalize オペレーターは、更新前にすべてのキーの状態を保存するため、大きなオーバーヘッドが発生します。DataStream デプロイメントが Flink プランナーを使用して最適化せずに MongoDBSource を使用する場合、ChangelogNormalize オペレーターは自動的に最適化に使用されません。その結果、MongoDBSource は UPDATE_BEFORE メッセージを生成できません。更新前イメージを取得する場合は、状態データを自分で管理する必要があります。状態データを自分で管理したくない場合は、MongoDBTableSource を有効にして oplog.rs コレクションの元のストリームを ChangelogStream または RetractStream に変換し、Flink プランナーを使用して更新前イメージを取得できます。サンプルコード:

     tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector'='mongodb-cdc',... )");
    
     Table table = tEnv.from("orders")
     .select($("*"));
    
     tEnv.toChangelogStream(table)
     .print()
     .setParallelism(1);
    
     env.execute();

PostgreSQL CDC コネクタを使用して無効な日付値をフィルターで除外するパラメーターを構成するにはどうすればよいですか?

無効な日付値をフィルターで除外するには、ビジネス要件に基づいて、Postgres CDC コネクタの WITH 句に次のいずれかの構成を追加できます。

  • 'debezium.event.deserialization.failure.handling.mode'='warn': ダーティデータをスキップし、ダーティデータを WARN ログに出力します。

  • 'debezium.event.deserialization.failure.handling.mode'='ignore': ダーティデータをスキップし、ダーティデータをログに出力しません。

PostgreSQL CDC コネクタを使用すると、TOAST データが送信されないことを示すエラーメッセージが表示されるのはなぜですか?

レプリカ ID が完全であることを確認してください。TOAST データのサイズは大きいです。WAL ログのサイズを小さくするために、TOAST データが変更されておらず、'debezium.schema.refresh.mode'='columns_diff_exclude_unchanged_toast' 構成が使用されている場合、wal2json プラグインは更新されたデータに TOAST データを挿入しません。

PostgreSQL データベースサーバーのディスク使用率が高いときに WAL ログが解放されないのはなぜですか?

PostgreSQL CDC コネクタは、チェックポイントが完了したときにのみ、PostgreSQL データベースのレプリケーションスロットのログシーケンス番号 (LSN) を更新します。ディスク使用率が高い場合は、PostgreSQL データベースのチェックポイントが有効になっているか、レプリケーションスロットが使用されていないか、同期遅延があるかを確認してください。

PostgreSQL CDC コネクタを使用して PostgreSQL から同期された DECIMAL データの精度が最大精度を超えた場合、何が返されますか?

PostgreSQL CDC コネクタが受信する DECIMAL データの精度が、Postgres CDC コネクタを使用するソーステーブルのステートメントで宣言されたデータ型の精度よりも大きい場合、DECIMAL データは null として処理されます。この場合、'debezium.decimal.handling.mode' = 'string' を構成して、PostgreSQL データベースから読み取られたデータを文字列として処理できます。

DataStream API を使用して PostgreSQL CDC ソーステーブルを作成するときに tableList パラメーターを構成するにはどうすればよいですか?

tableList パラメーターの値には、DataStream API のテーブル名ではなく、テーブル名とスキーマ名を含める必要があります。PostgreSQL CDC ソーステーブルを作成するときは、tableList パラメーターを my_schema.my_table 形式で構成します。

flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar のダウンロードに失敗するのはなぜですか? xxx-SNAPSHOT 依存関係が Maven リポジトリに存在しないのはなぜですか?

xxx-SNAPSHOT バージョンは、主流の Maven プロジェクトのバージョン管理メカニズムに基づいて、開発ブランチのコードに対応します。このバージョンを使用する場合は、ソースコードをダウンロードして関連する JAR パッケージをコンパイルする必要があります。flink-sql-connector-mysql-cdc-2.1.0.jar などの安定版のパッケージを使用できます。パッケージは Maven 中央リポジトリから取得できます。

flink-sql-connector-xxx.jar と flink-connector-xxx.jar の違いは何ですか?

Flink CDC コネクタのパッケージ命名規則は、他の Flink コネクタのパッケージ命名規則と一致しています。

  • flink-sql-connector-xx は fat JAR です。コネクタのコードに加えて、コネクタが依存するすべてのサードパーティパッケージが fat JAR にシェーディングされます。flink-sql-connector-xx は SQL デプロイメント用に提供されます。fat JAR を lib ディレクトリに追加するだけで済みます。

  • flink-connector-xx にはコネクタのコードのみが含まれ、コネクタの必要な依存関係は含まれていません。flink-connector-xx は DataStream デプロイメント用に提供されます。必要なサードパーティパッケージの依存関係を管理し、exclude および shade 操作を実行して依存関係の競合を処理する必要があります。

Maven リポジトリで 2.X バージョンのコネクタのパッケージが見つからないのはなぜですか?

Flink CDC コネクタ 2.0.0 では、グループ ID が com.alibaba.ververica から com.ververica に変更されました。したがって、Maven リポジトリの 2.X バージョンのパッケージパスは /com/ververica に変更されました。

DataStream API が JsonDebeziumDeserializationSchema 逆シリアル化機能を使用すると、数値型のデータが文字列として表示されます。どうすればよいですか?

Debezium が数値型のデータを解析するときに、さまざまな変換メソッドが使用されます。詳細については、「Debezium connector for MySQL」をご参照ください。次のサンプルコードは、Flink CDC で構成された変換メソッドを示しています。

Properties properties = new Properties();
....
properties.setProperty("bigint.unsigned.handling.mode","long");
properties.setProperty("decimal.handling.mode","double");

MySqlSource.<String>builder()
 .hostname(config.getHostname())
 ....
 .debeziumProperties(properties);

「Replication slot "xxxx" is active」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    PostgreSQL CDC デプロイメントが終了した後、スロットが正しく解放されない場合があります。

  • 解決策

    次のいずれかの方法を使用してスロットを解放します。

    • PostgreSQL CDC デプロイメントで次のコマンドを実行して、スロットを手動で解放します。

      select pg_drop_replication_slot('rep_slot');

      「ERROR: replication slot "rep_slot" is active for PID 162564」 というエラーメッセージが表示された場合、スロットはメッセージで指定された ID のプロセスによって占有されています。スロットを解放する前に、プロセスを終了する必要があります。プロセスを終了してスロットを解放するには、次のコマンドを実行します。

      select pg_terminate_backend(162564);
      select pg_drop_replication_slot('rep_slot');
    • 自動スロットクリーンアップを有効にします。この機能を有効にするには、デプロイメントの Postgres ソースに 'debezium.slot.drop.on.stop' = 'true' 構成を追加します。これにより、PostgreSQL CDC デプロイメントがキャンセルされたときにスロットが自動的にドロップされます。

      警告

      自動スロットクリーンアップを有効にすると、WAL ログが再利用されます。デプロイメントが再起動されると、データが失われ、At-Least Once セマンティクスを保証できなくなります。

「binlog probably contains events generated with statement or mixed based replication format」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    Caused by: io.debezium.DebeziumException: Received DML 'insert into gd_chat_fetch_log (
    
    id,
    c_cursor,
    d_timestamp,
    msg_cnt,
    state,
    ext1,
    ext2,
    cost_time
    
    ) values (
    null,
    null,
    '2022-03-23 16:51:00.616',
    0,
    1,
    null,
    null,
    0
    )' for processing, binlog probably contains events generated with statement or mixed based replication format
  • 原因

    バイナリログ形式は MIXED です。MySQL CDC ソーステーブルは、ROW 形式のバイナリログのみをサポートします。

  • 解決策

    1. MySQL データベースで show variables like "binlog_format" コマンドを実行して、バイナリログの現在の形式をクエリします。

      説明

      show global variables like "binlog_format" コマンドを実行して、バイナリログのグローバル形式を表示できます。

    2. MySQL データベースでバイナリログの形式を ROW に変更します。詳細については、「バイナリログ形式の設定」をご参照ください。

    3. デプロイメントを再起動します。

「Encountered change event for table xxx.xxx whose schema isn't known to this connector」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明报错详情

    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Encountered change event 'Event{header=EventHeaderV4{timestamp=xxx, eventType=TABLE_MAP, serverId=xxx, headerLength=xxx, dataLength=xxx, nextPosition=xxx, flags=xxx}, data=TableMapEventData{tableId=xxx, database='xxx', table='xxx', columnTypes=xxx, xxx..., columnMetadata=xxx,xxx..., columnNullability={xxx,xxx...}, eventMetadata=null}}' at offset {ts_sec=xxx, file=mysql-bin.xxx, pos=xxx, gtids=xxx, server_id=xxx, event=xxx} for table xxx.xxx whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
    Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=30946 --stop-position=31028 --verbose mysql-bin.004419
    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.xxx/xxx
    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Failed due to error: Error processing binlog event
    org.apache.kafka.connect.errors.ConnectException: Encountered change event for table statistic.apk_info whose schema isn't known to this connector
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:607) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1104) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:955) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
    Caused by: org.apache.kafka.connect.errors.ConnectException: Encountered change event for table xxx.xxx whose schema isn't known to this connector
        at io.debezium.connector.mysql.BinlogReader.informAboutUnknownTableIfRequired(BinlogReader.java:875) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleUpdateTableMetadata(BinlogReader.java:849) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:590) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        ... 5 more
  • 原因

    • デプロイメントで使用されている特定のデータベースに対する必要な権限がありません。

    • デプロイメントで 'debezium.snapshot.mode'='never' 構成が使用されています。debezium.snapshot.mode パラメーターが never に設定されている場合、データはバイナリログの先頭から読み取られます。ただし、バイナリログの先頭の変更イベントに対応するテーブルスキーマが現在のテーブルのスキーマと一致しません。その結果、このエラーが発生します。

    • Debezium が解釈できない変更が存在します。たとえば、`DEFAULT (now())` が存在する場合、このエラーが発生する可能性があります。

  • 解決策

    • デプロイメントで使用されているすべてのデータベースに対する必要な権限があるかどうかを確認します。詳細については、「MySQL データベースの構成」をご参照ください。

    • 'debezium.snapshot.mode'='never' 構成を使用しないことをお勧めします。このエラーを回避するには、'debezium.inconsistent.schema.handling.mode' = 'warn' 構成を使用できます。

    • io.debezium.connector.mysql.MySqlSchema WARN ログをクエリして、Debezium が解釈できない変更を確認します。たとえば、`DEFAULT (now())` は解釈できません。

「The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)
        at io.debezium.connector.common. BaseSourceTask.start (BaseSourceTask.java:106) 
        at io.debezium.embedded.EmbeddedEngine.run (EmbeddedEngine.java:758) 
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor. runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • 原因と解決策

    原因

    解決策

    デプロイメントによって読み取られているバイナリログファイルが MySQL サーバーから削除されました。これは、MySQL サーバー上のバイナリログの保持期間が短すぎるためです。

    バイナリログの保持期間を長くします。たとえば、バイナリログの保持期間を 7 日に変更できます。バイナリログの保持期間を変更するには、次のコマンドを実行します。

    show variables like 'expire_logs_days';
    set global expire_logs_days=7;

    MySQL CDC デプロイメントは、低速でバイナリログを消費します。たとえば、ダウンストリームの集約オペレーターまたはシンクオペレーターに長期間バックプレッシャーがあり、バックプレッシャーがソースに転送されます。その結果、ソースはデータを消費できません。

    デプロイメントのリソース構成を最適化して、ソースが期待どおりにデータを消費できるようにします。

    ApsaraDB for RDS MySQL のログは最大 18 時間保持でき、ストレージ容量の最大 30% を占有します。ログの保持期間が 18 時間を超えるか、ログが占有するストレージ容量が 30% を超えると、ログ削除操作がトリガーされます。書き込まれるデータが多すぎてストレージ容量の 30% 以上が占有されると、バイナリログが削除されて使用できなくなる可能性があります。

    ApsaraDB RDS for MySQL のバイナリログ有効期限ポリシーを変更して、バイナリログが期待どおりに読み取れるようにします。

    読み取り専用の ApsaraDB RDS for MySQL インスタンスを使用して CDC データを消費する場合、バイナリログの可用性は保証されません。読み取り専用の ApsaraDB RDS for MySQL インスタンスによって消費されるバイナリログは、オンプレミスのマシンに最低 10 秒間保持された後、Object Storage Service (OSS) にアップロードされます。MySQL CDC ソーステーブルのコネクタがデプロイメントの構成に基づいて読み取り専用の ApsaraDB RDS for MySQL インスタンスからデータを読み取り、フェールオーバー後 10 秒以内にデプロイメントが再開できない場合、このエラーが発生します。

    MySQL CDC ソーステーブルのコネクタが読み取り専用の ApsaraDB RDS for MySQL インスタンスからデータを読み取らないようにパラメーターを構成することをお勧めします。

    説明

    読み取り専用インスタンスのホスト名は rr で始まり、通常インスタンスのホスト名は rm で始まります。

    ApsaraDB RDS for MySQL インスタンスで内部データ移行が実行されます。

    デプロイメントを再起動してデータを再読み取りします。

「EventDataDeserializationException: Failed to deserialize data of EventHeaderV4」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:304)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:227)
        at io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:252)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:934)
    ... 3 more
    Caused by: java.io.EOFException
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read (ByteArrayInputStream.java:192)
        at java.io.InputSt ream.read (InputStream.java:170)
        at java.io.InputSt ream.skip (InputStream.java:224)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:301)
    ...    6 more
  • 原因

    • ネットワークの問題が発生しました。

    • デプロイメントにバックプレッシャーがあります。

      MySQL CDC デプロイメントにバックプレッシャーがある場合、MySQL CDC ソーステーブルで使用される Binlog クライアントはバックプレッシャーのためにデータの読み取りを続行できません。データベース上の残りの接続数を最小限に抑えるために、Binlog クライアントの接続がデータベースで指定されたタイムアウト期間を超えて非アクティブになると、MySQL データベースは自動的に接続をキャンセルします。その結果、デプロイメントは異常になります。

    • net_write_timeout パラメーターが小さすぎる値に設定されています。net_write_timeout パラメーターのデフォルト値は 60 (秒) です。このパラメーターの値が小さすぎると、MySQL データベースは自動的にクライアントを切断します。

  • 解決策

    • このエラーがネットワークの問題によって引き起こされた場合は、MySQL CDC ソーステーブルに 'debezium.connect.keep.alive.interval.ms' = '40000' 構成を追加します。データベースの構成を変更できる場合は、net_write_timeout パラメーターをより大きな値に設定することもできます。詳細については、「インスタンスパラメーターの最適化」をご参照ください。

    • このエラーがデプロイメントのバックプレッシャーによって引き起こされた場合は、デプロイメントのパラメーター構成を変更します。

    • VVR 8.0.7 以降を使用する Realtime Compute for Apache Flink は、バックプレッシャーによって引き起こされるエラーの再試行をサポートします。VVR 8.0.7 以降を使用する Realtime Compute for Apache Flink でバックプレッシャーのあるデプロイメントを実行できます。

「The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    org.apache.kafka.connect.errors.ConnectException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDS that the slave requires. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) 
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
        at io.debezium.connector.mysql.BinlogReadersReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1142) 
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:962)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839)
        at java.lang.Thread.run(Thread. java:834)
    Caused by: com.github.shyiko.mysql. binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
        at com.github.shyiko.mysql.binlog. BinaryLogClient.listenForEventPackets(BinaryLogClient.java:926) 
    ... 3 more
  • 原因

    完全データの読み取りに必要な時間が長すぎます。その結果、完全データの読み取りが完了してバイナリログの読み取りが開始されると、以前に記録された GTID セットの起動位置が MySQL データベースから削除されます。

  • 解決策

    バイナリログを削除するのに必要な時間を長くするか、許可されるバイナリログファイルの最大サイズを大きくします。バイナリログを削除するのに必要な時間を変更するには、次のコマンドを実行します。

    mysql> show variables like 'expire_logs_days';
    mysql> set global expire_logs_days=7;

「The "before" field of UPDATE/DELETE message is null」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    java.lang.IllegalStateException: UPDATE/DELETE メッセージの "before" フィールドが null です。Postgres テーブルの REPLICA IDENTITY が FULL レベルに設定されていることを確認してください。この設定は、Postgres で 'ALTER TABLE xxx.xxx REPLICA IDENTITY FULL' コマンドを実行することで更新できます。詳細については、Debezium のドキュメントをご参照ください: https:debezium.io/documentation/reference/1.2/connectors/postresql.html#postgresql-replica-identity
        at com.alibaba.ververica.cdc.connectors.postgres.table.PostgresValueValidator.validate(PostgresValueValidator.java:46)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:113)
        at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:158)
        at io.debezium.embedded.ConvertingEngineBuilder.lambdaşnotifying$2(ConvertingEngineBuilder.java:82)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutorSWorker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • 原因

    PostgreSQL テーブルの REPLICA IDENTITY が FULL に設定されていません。

  • 解決策

    プロンプトに従って ALTER TABLE yourTableName REPLICA IDENTITY FULL; 文を実行します。ステートメントを実行してデプロイメントを再起動してもエラーが解決しない場合は、デプロイメントのコードに ALTER TABLE yourTableName REPLICA IDENTITY FULL; 構成を追加します。

「Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: xxx and table-name: xxxx」というエラーメッセージが表示された場合はどうすればよいですか?

  • 原因

    • 構成されたテーブル名がデータベースに見つかりません。

    • デプロイメントには異なるデータベースのテーブルが含まれています。ただし、使用しているアカウントには特定のデータベースに対する権限がありません。

  • 解決策

    1. 構成されたテーブル名がデータベースに存在するかどうかを確認します。

    2. 使用するアカウントに、デプロイメント内のすべてのデータベースに対する必要な権限を付与します。

「The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    VVR 4.0.X の MySQL CDC ソーステーブルで構文チェックが実行されると、次のエラーメッセージが表示されます。

    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • 原因

    MySQL CDC ソーステーブルを作成するために使用される DDL 文の WITH 句のパラメーターにプライマリキーが定義されていません。

  • 解決策

    DDL 文にプライマリキー情報を追加します。

「java.io.EOFException: SSL peer shut down incorrectly」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • 原因

    MySQL 8.0.27 では、MySQL データベースはデフォルトで SSL 経由で接続されます。ただし、JDBC ドライバーを使用して接続された MySQL データベースには SSL 経由でアクセスできません。その結果、このエラーが発生します。

  • 解決策

    • デプロイメントの VVR バージョンを 6.0.2 以降にアップグレードできる場合は、MySQL CDC テーブルの WITH 句に 'jdbc.properties.useSSL'='false' 構成を追加します。

    • テーブルがディメンションテーブルとしてのみ宣言されている場合は、MySQL CDC テーブルの WITH 句で connector パラメーターを rds に設定し、URL パラメーターに characterEncoding=utf-8&useSSL=false 構成を追加します。例:

      'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

「com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
  • 原因

    MySQL CDC ソーステーブルのコネクタがデータを読み取るときは、各並列サブタスクにサーバー ID が構成され、各サーバー ID が一意であることを確認してください。MySQL CDC ソーステーブルのコネクタによって読み取られたデータの server-id パラメーターの値が、同じデプロイメントの CDC ソーステーブル、別のデプロイメントの CDC ソーステーブル、または同期ツールの server-id パラメーターの値と競合する場合、このエラーが発生します。

  • 解決策

    MySQL CDC ソーステーブルの各並列サブタスクにグローバルに一意のサーバー ID を指定します。詳細については、「MySQL CDC ソーステーブルの作成」トピックの「注意事項」セクションをご参照ください。

テーブルの完全データ読み取り中に MySQL テーブルに列を追加した後に「NullPointerException」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=iplus.retail_detail, splitId='iplus.retail_detail:68', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[212974500236****], splitEnd=[213118153601****], highWatermark=null} error due to java.lang.NullPointerException.
      at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:361)
      at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:293)
      at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:124)
      at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:86)
      at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
      at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
      ... 6 more
  • 原因

    デプロイメント内のテーブルの完全データ読み取り中に、デプロイメントの開始時にテーブルのスキーマが決定され、スキーマがチェックポイントに記録されます。完全データ読み取り中にテーブルに列が追加されると、スキーマを一致させることができません。その結果、エラーが返されます。

  • 解決策

    デプロイメントをキャンセルし、データが同期されるダウンストリームテーブルを削除します。次に、状態なしでデプロイメントを再起動します。

「The connector is trying to read binlog starting at GTIDs xxx」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    CDC デプロイメントが読み取ろうとしているバイナリログファイルは、MySQL サーバー上でクリアされています。

    The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed
  • 原因

    • MySQL サーバーでバイナリログファイルに構成されている保持期間が短すぎます。その結果、ファイルは自動的に削除されます。

    • CDC デプロイメントがバイナリログデータを処理する速度が遅すぎます。

  • 解決策

    • バイナリログファイルの保持期間を長くします。たとえば、保持期間を 7 日に設定できます。

      mysql> show variables like 'expire_logs_days';
      mysql> set global expire_logs_days=7;
    • Flink デプロイメントにより多くのリソースを割り当てて、バイナリログデータの処理を高速化します。

「Mysql8.0 Public Key Retrieval is not allowed」というエラーメッセージが表示された場合はどうすればよいですか?

  • 原因

    構成された MySQL ユーザーは SHA256 パスワード認証を使用しており、パスワードは Transport Layer Security (TLS) 経由で送信する必要があります。

  • 解決策

    MySQL ユーザーがネイティブパスワードを使用してデータベースにアクセスできるようにします。認証方式を変更するには、次のコマンドを実行します。

    mysql> ALTER USER 'username'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
    mysql> FLUSH PRIVILEGES; 

「sub account not auth permission」というエラーメッセージが表示された場合はどうすればよいですか?

「DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'」というエラーメッセージが表示された場合はどうすればよいですか?

  • 問題の説明

    Cause by:java.sql.SQLSyntaxErrorException:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        ...
  • 原因

    MySQL CDC データストリームを処理するために使用される SQL 文に WHERE 句が追加されている場合、Realtime Compute for Apache Flink は、UPDATE 操作が実行されたときに生成されたデータの BEFORE UPDATE データレコードと AFTER UPDATE データレコードをダウンストリームに送信します。ダウンストリームは、BEFORE UPDATE データレコードを DELETE 操作として識別します。この場合、MySQL CDC 結果テーブルで操作を実行するユーザーは DELETE 権限を持っている必要があります。

  • 解決策

    SQL ロジックに retract 操作が存在するかどうかを確認します。retract 操作が存在する場合は、MySQL CDC 結果テーブルで操作を実行するユーザーに DELETE 権限を付与します。