すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:データ同期に関する FAQ

最終更新日:May 15, 2025

このトピックでは、Realtime Compute for Apache Flink のデータ同期に関するよくある質問への回答を提供します。

JSON 形式のメッセージが Kafka から Flink 経由で Hologres に送信される場合、JSON スキーマの変更はどのように処理すればよいですか?

一般的な解決策は、Flink デプロイメントをキャンセルし、コードと Hologres テーブルのスキーマを変更し、スクリプトを再デプロイし、デプロイメントを再起動することです。ただし、これによりデータ配信が遅延し、エラーが発生する可能性があります。

Realtime Compute for Apache Flink は、スキーマ進化を処理するために次の最適化を実装しています。

  • Kafka JSON メッセージの自己適応型スキーマ進化。JSON スキーマの進化が発生した場合、Flink はデプロイメントをキャンセルしてコードを変更することなく、スキーマの変更を Hologres に自動的に同期します。

  • Kafka JSON メッセージの型推論。これにより、Kafka テーブルの DDL でデータ型を宣言する必要がなくなります。

  • 再帰的な JSON 展開。たとえば、{"nested": {"col": true}} では、col は nested.col に再帰的に展開されます。

「Encountered change event for table xxx.xxx whose schema isn't known to this connector」エラーを修正するにはどうすればよいですか?

  • 説明

    image.png

  • 原因

    CREATE DATABASE AS(CDAS)または CREATE TABLE AS(CTAS)を使用して新しいテーブルを同期する場合、次の理由によりこのエラーが発生する可能性があります。

    • データベース権限が不十分です。Flink ジョブのコネクタに構成されているデータベースアカウントに必要なデータベースにアクセスするための十分な権限がありません。

    • 'debezium.snapshot.mode'='never' が構成されています。データはバイナリログの先頭から読み取られます。ただし、バイナリログの先頭にある変更イベントに対応するテーブルスキーマが、現在のテーブルのスキーマと一致しません。

    • Debezium が処理できない変更(`DEFAULT (now())` など)があります。

  • 解決策

    • Flink ジョブで使用されるすべてのデータベースに必要な権限をデータベースアカウントに付与します。必要な権限には、通常、データの読み取り、書き込み、更新、テーブルスキーマの作成と更新、テーブルの作成が含まれます。詳細な権限要件については、特定の コネクタ のトピックをご参照ください。

    • 'debezium.snapshot.mode'='never' の代わりに 'debezium.inconsistent.schema.handling.mode' = 'warn' オプションを使用します。

    • Debezium が処理できない特定の変更(`DEFAULT (now())` など)を示す io.debezium.connector.mysql.MySqlSchema WARN ログエントリを探します。

「Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax」エラーを修正するにはどうすればよいですか?

  • 説明

    SQL ドラフトをデプロイするか、SQL デプロイメントを開始すると、次のエラーが報告されます。

    image.png

  • 原因

    MiniBatch をサポートしていない CTAS または CDAS ステートメントを含む SQL ストリームドラフトで、'table.exec.mini-batch.enabled' = 'true' を設定して MiniBatch を有効にしています。

  • 解決策

    CTAS または CDAS ステートメントを含む SQL ストリームドラフトから MiniBatch 構成を削除します。実際の状況に応じて、次のいずれかの解決策を選択してください。

    • SQL ドラフトが正常にデプロイされていない場合:

      1. 開発コンソールの左側のナビゲーションウィンドウで、[運用&管理] > [構成] を選択します。

      2. [構成] ペインで、[デプロイメントのデフォルト] タブを選択します。

      3. [その他の構成] セクションで、MiniBatch 構成を削除するか、table.exec.mini-batch.enabledfalse に設定します。

      4. [変更の保存] をクリックします。

      5. ドラフトを再度デプロイします。

        image

    • SQL ドラフトがデプロイされている場合:

      検証をスキップしてドラフトを正常にデプロイした場合は、次の手順を実行します。

      1. 開発コンソールの左側のナビゲーションウィンドウで、[運用&管理] > [デプロイメント] を選択します。

      2. SQL デプロイメントの名前をクリックします。

      3. [構成] タブを選択します。

      4. [パラメータ] セクションで、[編集] をクリックします。

      5. [その他の構成] フィールドで、MiniBatch 構成を削除するか、table.exec.mini-batch.enabledfalse に設定します。

      6. デプロイメントを再起動します。

      image