すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:データの有効性に関する FAQ

最終更新日:Jun 27, 2025

このトピックでは、データの有効性に関するよくある質問への回答を提供します。

シンクテーブルに出力がないのはなぜですか?

  • 説明

    ジョブの開始後、シンクテーブルにデータが表示されません。

  • 問題のトラブルシューティング

    作业排错流程图

    1. フェールオーバーが発生するかどうかを確認します。

      • トラブルシューティング

        エラーメッセージに従って、フェールオーバーの原因を分析します。

      • 解決策

        ジョブが期待どおりに実行されるように、原因を解決します。

    2. データが Realtime Compute for Apache Flink に入力されたことを確認します。

      • トラブルシューティング

        フェールオーバーが発生しないが、データ遅延が非常に大きい場合は、監視およびアラートページで numRecordsInOfSource メトリックの値を表示して、各ソースに入力データがあるかどうかを確認します。

      • 解決策

        ソーステーブルをチェックして、ソーステーブルのデータが Realtime Compute for Apache Flink に送信されていることを確認します。

    3. オペレーターによってデータがフィルタリングされていないかどうかを確認します。

      pipeline.operator-chaining: 'false'[その他の構成] フィールドに追加します。詳細な手順については、「実行中のデプロイメントのカスタムパラメータを構成するにはどうすればよいですか?」を参照してください。オペレーターを分割し、各オペレーターの入力(受信バイト数)と出力(送信バイト数)を観察して、問題のあるオペレーターを特定します。オペレーターに入力はあるが出力がない場合、データはこのオペレーターでフィルタリングされています。この問題は、多くの場合、JOIN、WINDOW、WHERE のいずれかのオペレーターが原因で発生します。

    4. デフォルトのキャッシュメカニズムに基づいて、ダウンストリームデータベースのデータがキャッシュされているかどうかを確認します。

      解決策:ダウンストリームストレージのバッチサイズを調整します。

      重要

      非常に小さいバッチサイズを使用すると、ダウンストリームデータベースが過負荷になり、パフォーマンスのボトルネックが発生する可能性があります。たとえば、バッチサイズが 1 の場合、Flink は処理されたレコードごとにリクエストを送信します。これは、特にデータ量が多い場合にデータベースに負担をかけます。

    5. ダウンストリームの ApsaraDB RDS データベースにデッドロックが発生していないかどうかを確認します。

      解決策:ApsaraDB RDS for MySQL コネクタまたは TDDL コネクタを使用して MySQL データベースにデータを書き込むときにデッドロックが発生した場合はどうすればよいですか? を参照してください。

説明

印刷シンクテーブルを使用して計算結果をログに出力することで、問題のトラブルシューティングを行います。詳細については、「Realtime Compute for Apache Flink のコンソールで印刷データ結果を表示するにはどうすればよいですか?」を参照してください。

Flink ソースの読み取りに関する問題のトラブルシューティングを行うにはどうすればよいですか?

Realtime Compute for Apache Flink がソースからデータを読み取れない場合は、次の手順を実行します。

  • ネットワーク接続を確認します。

    デフォルトでは、Realtime Compute for Apache Flink は、独自のリージョンおよび VPC (Virtual Private Cloud)内のサービスにのみアクセスできます。他の方法でリソースにアクセスするには、次のトピックを参照してください。

  • アップストリームサービスのホワイトリスト構成を確認します。

    Kafka や Elasticsearch などのサービスからデータを読み取るには、次のように Flink をホワイトリストに追加します。

    1. Flink ワークスペースの vSwitch の CIDR ブロックを取得します。

      CIDR ブロックの取得方法の詳細については、「ホワイトリストを構成するにはどうすればよいですか?」を参照してください。

    2. アップストリームサービスのホワイトリストを構成します。

      詳細については、「Kafka コネクタ」などの、それぞれのコネクタドキュメントの「前提条件」セクションを参照してください。

  • Flink テーブルと物理テーブルの間で、フィールドタイプ、フィールド順序、およびフィールド名の大文字と小文字の区別の一貫性を確認します。

    Flink テーブルと物理テーブルの一貫性を確保するには、Flink テーブルの DDL ステートメントを記述するときに、次のガイドラインに従ってください。

    • フィールド順序:物理テーブルのフィールド順序を正確に複製します。

    • フィールド名の大文字と小文字の区別:物理テーブルと同じ大文字と小文字の区別を使用してフィールド名を使用します。

    • フィールドタイプ:Flink と外部サービスでは、異なるデータタイプがサポートされている場合があります。 Flink テーブルのフィールドタイプは、物理テーブルのフィールドのマップされた同等のタイプである必要があります。 Flink と外部サービス間のタイプマッピングの詳細については、「Simple Log Service コネクタ」などの、それぞれのコネクタドキュメントの「データタイプマッピング」セクションを参照してください。

  • ソーステーブルの Taskmanager.log ファイルに例外メッセージが含まれているかどうかを確認します。

    例外が報告された場合は、エラーメッセージに基づいてエラーのトラブルシューティングを行います。ソーステーブルの Taskmanager.log ファイルを表示するには、次の手順を実行します。

    1. 開発コンソールの左側の ナビゲーションウィンドウ で、[O&M] > [デプロイメント] に移動します。

    2. ターゲットデプロイメントの名前をクリックします。

    3. [ステータス] タブをクリックし、DAG 内のソースを表す頂点をクリックします。

    4. 右側のペインで、[サブタスク] タブをクリックします。

    5. [詳細] 列で、image アイコンをクリックし、[TaskManager ログページを開く] を選択します。TM日志

    6. [ログ] タブで、ログ情報を確認します。

      「Caused by」を含む最も古いメッセージを探します。通常、これは例外の根本原因を示しています。次に、メッセージに基づいて例外のトラブルシューティングを行うことができます。

    ダウンストリームシステムに出力がない場合のトラブルシューティングを行うにはどうすればよいですか?

    次の手順を実行して、問題のトラブルシューティングを行います。

    • ネットワーク接続を確認します。

      デフォルトでは、Realtime Compute for Apache Flink は、独自のリージョンおよび VPC (Virtual Private Cloud)内のサービスにのみアクセスできます。他の方法でリソースにアクセスするには、次のトピックを参照してください。

    • ダウンストリームシステムのホワイトリスト構成を確認します。

      ApsaraDB RDS for MySQL、Kafka、Elasticsearch、AnalyticDB for MySQL 3.0、Apache HBase、Redis、ClickHouse などのサービスにデータを書き込むには、Flink をホワイトリストに追加する必要があります。

      1. Flink ワークスペースの vSwitch の CIDR ブロックを取得します。

        CIDR ブロックの取得方法の詳細については、「ホワイトリストを構成するにはどうすればよいですか?」を参照してください。

      2. ダウンストリームサービスのホワイトリストを構成します。

        詳細については、「ApsaraDB RDS for MySQL コネクタ」などの、それぞれのコネクタドキュメントの「前提条件」セクションを参照してください。

    • Flink テーブルと物理テーブルの間で、フィールドタイプ、フィールド順序、およびフィールド名の大文字と小文字の区別の一貫性を確認します。

      Flink テーブルと物理テーブルの一貫性を確保するには、Flink テーブルの DDL ステートメントを記述するときに、次のガイドラインに従ってください。

      • フィールド順序:物理テーブルのフィールド順序を正確に複製します。

      • フィールド名の大文字と小文字の区別:物理テーブルと同じ大文字と小文字の区別を使用してフィールド名を使用します。

      • フィールドタイプ:Flink と外部サービスでは、異なるデータタイプがサポートされている場合があります。 Flink テーブルのフィールドタイプは、物理テーブルのフィールドのマップされた同等のタイプである必要があります。 Flink と外部サービス間のタイプマッピングの詳細については、「Simple Log Service コネクタ」などの、それぞれのコネクタドキュメントの「データタイプマッピング」セクションを参照してください。

    • WHERE、JOIN、WINDOW などの仲介オペレーターによってデータがフィルタリングされていないかどうかを確認します。

      ジョブの DAG 内の各頂点の入力と出力を調べます。たとえば、WHERE 頂点の入力が 5 で出力が 0 の場合、これは WHERE オペレーターによってデータがフィルタリングされていることを示しています。

    • ダウンストリームシステムのシンク固有のコネクタオプションのデフォルト値が大きすぎるかどうかを確認します。

      入力量が小さい場合、特定のシンクオプションのデフォルト値が大きすぎると、データがダウンストリームシステムにフラッシュされない可能性があります。これは、シンクにバッファリングされたデータがデフォルトの出力しきい値に到達できないためです。この問題を解決するには、必要に応じて、以下にリストされている関連オプションを小さい値で構成します。

      オプション

      説明

      関連するダウンストリームサービス

      batchSize

      一度に書き込まれるデータのサイズ。

      batchCount

      一度に書き込まれるデータレコードの最大数。

      DataHub

      flushIntervalMs

      MaxCompute Tunnel のライターのバッファでフラッシュ操作が実行される間隔。

      MaxCompute

      sink.buffer-flush.max-size

      データが ApsaraDB for HBase データベースに書き込まれる前にメモリにキャッシュされるデータのサイズ(バイト単位)。このパラメータの値を大きくすると、ApsaraDB for HBase の書き込みパフォーマンスは向上しますが、書き込みレイテンシが長くなり、より多くのメモリが消費されます。

      ApsaraDB for HBase

      sink.buffer-flush.max-rows

      データが ApsaraDB for HBase データベースに書き込まれる前にメモリにキャッシュされるデータレコードの数。このパラメータの値を大きくすると、ApsaraDB for HBase の書き込みパフォーマンスは向上しますが、書き込みレイテンシが長くなり、より多くのメモリが消費されます。

      ApsaraDB for HBase

      sink.buffer-flush.interval

      キャッシュされたデータが ApsaraDB for HBase データベースに書き込まれる間隔。このパラメータは、ApsaraDB for HBase データベースへのデータの書き込みレイテンシを制御します。

      ApsaraDB for HBase

      jdbcWriteBatchSize

      JDBC ドライバーが使用されている場合に、Hologres ストリーミングシンクノードによって一度に処理できるデータ行の最大数。

      Hologres

    • ウィンドウ操作で順序が正しくないデータが原因でデータが出力できないかどうかを確認します。

      次のシナリオを考えてみます。最初のレコードのタイムスタンプは 2100 で、ウォーターマークも 2100 です。システムは 2100 より前のデータが処理済みであると想定し、この時間以降のデータのみを処理します。 2021 などのタイムスタンプを持つ後続のレコードは、ウォーターマーク 2100 よりも小さいため破棄されます。現在のイベント時間ウィンドウは閉じることができず、2100 より古いタイムスタンプを持つレコードが到着するまで結果は計算されません。

      ソースで順序が正しくないデータを確認するには、印刷シンクテーブルを使用するか、Log4j ログを調べます。詳細については、「印刷結果テーブルを作成する」および「デプロイメントのログをエクスポートするためのパラメータを構成する」を参照してください。順序が正しくないレコードがあることが確認された場合は、それらをフィルタリングするか、遅延レコードの処理を許可します。

    • すべての並列ソースオペレーターに入力があることを確認します。

      ソース サブタスクが入力を受け取らない場合、そのウォーターマークはデフォルト値の 1970-01-01T00:00:00Z のままです。これがソース オペレーター全体のウォーターマークになります。その結果、ウィンドウが閉じなくなり、データが出力されません。

      ソースサブタスクが入力を受け取らない場合、そのウォーターマークはデフォルト値の 1970-01-01T00:00:00Z のままです。これは、ソースオペレーターの全体的なウォーターマークになります。その結果、ウィンドウが閉じなくなり、データが出力されなくなります。トラブルシューティングを行うには、ジョブの DAG を調べて、すべてのソースサブタスクが入力を受け取っていることを確認します。入力を受け取らない頂点がある場合は、ジョブの並列処理をアップストリームテーブルのシャード数に合わせて削減し、すべてのサブタスクが入力を受け取るようにします。

    • すべての Kafka パーティションにデータが含まれていることを確認します。

      空の Kafka パーティションは、ウォーターマークの生成を妨げる可能性があります。詳細と解決策については、「イベント時間ベースのウィンドウ関数を使用して Kafka ソーステーブルのデータを計算した後、データ出力が返されないのはなぜですか?」を参照してください。

データ損失のトラブルシューティングを行うにはどうすればよいですか?

データ量の削減は、データ損失が原因である可能性があります。データ損失のトラブルシューティングを行うには、次の手順を実行します。

  • ディメンションテーブルのキャッシュポリシーを確認します。

    ルックアップ結合の失敗とデータ損失を防ぐために、ディメンションテーブルに適切なキャッシュポリシーを設定します。 詳細については、関連コネクタドキュメントのキャッシュ関連、ディメンションテーブル固有のオプション(ディメンション固有のオプション(キャッシュ関連) セクションなど ApsaraDB for HBase コネクタ Topic)をご参照ください。

  • 関数が正しく使用されていることを確認します。

    to_timestamp_tzdate_format などの関数を誤って使用すると、データ変換エラーやデータ損失が発生する可能性があります。

    print シンクテーブルまたは Log4j を使用してログ分析を通じて関数の使用状況を確認します。 詳細については、「Print コネクタ」および「デプロイメントのログをエクスポートするためのパラメーターを設定する」をご参照ください。

  • 順序が正しくないデータが存在するかどうかを確認します。

    現在のウィンドウの期間外の遅延イベントは破棄されます。 次の図では、タイムスタンプ 11 秒のイベントが 15 ~ 20 秒のウィンドウに入ります。 ウォーターマークが 11 であるため、遅延していると見なされ、破棄されます。乱序

    データは通常、単一のウィンドウ中に失われます。 print シンクテーブルまたは Log4j を使用して、順序が正しくないデータを検出できます。 詳細については、「Print コネクタ」および「デプロイメントのログをエクスポートするためのパラメーターを設定する」をご参照ください。

    順序が正しくないデータを正確に処理するには、適切なウォーターマーク生成戦略(ウォーターマーク = イベント時間 - 5 秒など)を設定します。 さらに、ウィンドウを正確な日、時間、または分の間隔に合わせることをお勧めします。 この方法とサービス停止保護期間の延長により、順序が大きく正しくないデータの損失を防ぐことができます。

CDC モードで Hologres から取り込んだデータを ROW_NUMBER を使用して重複除去すると、不正確な結果が返されるのはなぜですか?

  • 不正確な結果

    image.png

  • 原因

    ダウンストリームは、重複排除のための ROW_NUMBER OVER WINDOW などの削除オペレーターを使用していますが、データは upsert モードで Hologres から取り込まれていません。

  • 解決策

    データ重複除去のために、ソーステーブルの DDL 文の WITH 句に 'upsertSource' = 'true' を追加します。

    image.png

不正確な結果のトラブルシューティングを行うにはどうすればよいですか?

  1. ログレベルを調整します。

    ログレベルを INFO に変更する.

  2. オペレータープロファイリングを有効にします。

    プログラムロジックを変更せずに中間結果を表示できます。

  3. ランタイムログを分析します。

    1. ターゲットデプロイメントの名前をクリックします。

    2. デプロイメント詳細ページで、[ステータス] タブをクリックします。

    3. DAG グラフで、オペレーター名をコピーします。

    4. デプロイメントのログリストの [ログ名] 列で、inspect-taskmanager_0.out をクリックし、オペレーター名を検索します。

    image

  4. 最適化と検証を実行します。

    ログで原因を見つけた後、問題のあるオペレーターのロジックを修正し、ジョブを再起動して、データの精度を確認します。

ノード TableSourceScan によって生成された更新および削除の変更の消費をサポートしていません」エラーを修正するにはどうすればよいですか?

  • エラーメッセージ

    Table sink 'vvp.default.***' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, default, ***]], fields=[id,b, content])
        // Table sink 'vvp.default.***' は、ノード TableSourceScan(table=[[vvp, default, ***]], fields=[id,b, content]) によって生成される更新および削除の変更の消費をサポートしていません。
        at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapExecutor(DelegateOperationExecutor.java:286)
        at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.validate(DelegateOperationExecutor.java:211)
        at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validate(FlinkSqlServiceImpl.java:741)
        at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:2522)
        at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • 原因

    シンクテーブルの書き込みモードは追加専用であるため、データ更新を消費できません。

  • 解決策

    Upsert Kafka コネクタ などのコネクタを使用して、更新イベントをサポートするシンクテーブルを作成します。

Lindorm コネクタを使用しているときに、予期しないデータの上書きまたは削除を修正するにはどうすればよいですか?

説明

デフォルトでは、Lindorm コネクタはデータ書き込みに upsert materialize オペレーター(デフォルト値は AUTO)を使用します。このオペレーターは、同じプライマリキーに対して DELETE の後に INSERT を生成します。 Lindorm はミリ秒単位のタイムスタンプでデータバージョンを管理するため、同じプライマリキーを持つ複数のイベントが 1 ミリ秒以内に書き込まれた場合、システムは正確な順序を確立するのが困難になる可能性があります。これにより、予期しないデータの上書きまたは削除が発生する可能性があります。

原因

  • タイムスタンプの精度: Lindorm はミリ秒単位のタイムスタンプでデータバージョンを管理します。 同じプライマリキーを持つ複数のレコードが同じミリ秒以内に書き込まれた場合、システムは正しい順序を判断できず、バージョン競合が発生する可能性があります。

  • 書き込みセマンティクスの違い: Lindorm は UPSERT 構文(既存の行は新しい値で更新される)のみをサポートし、ネイティブの DELETE サポートがないため、削除は元に戻せません。 そのため、upsert materialize オペレーターの順序維持ロジックは Lindorm シナリオでは無意味になり、DELETE + INSERT 操作によるデータ異常が発生する可能性があります。

リスクと影響

1 ミリ秒以内の同時書き込みは、予期しない DELETE と INSERT の組み合わせにつながり、データの損失または状態の不整合を引き起こす可能性があります。

解決策

明示的に upsert materialize オペレーターを無効にします。

適用可能なシナリオ: Flink を介して Lindorm にデータが書き込まれるユースケースに最適です。

構成: ジョブ ランタイム パラメーター構成または SQL コードで、次の文を追加してオペレーターをグローバルに無効にします:

SET 'table.exec.sink.upsert-materialize' = 'NONE'; // オペレーターをグローバルに無効にする

注記: このオペレーターを無効にした後、結果整合性のみが保証されます。ビジネス アプリケーションで許容できることを確認してください。