すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:コネクタ

最終更新日:Nov 09, 2025

このトピックでは、Realtime Compute for Apache Flink のコネクタに関するよくある質問 (FAQ) に回答します。

Flink は Kafka から JSON データをどのように取得しますか?

  • 通常の JSON データを取得するには、詳細については、「JSON Format」をご参照ください。

  • ネストされた JSON データを取得するには、ソーステーブル DDL で ROW フォーマットを使用して JSON オブジェクトを定義できます。結果テーブル DDL では、取得したい JSON データに対応するキーを定義できます。DML 文では、キーの取得方法を指定できます。これにより、対応するネストされたキーの値を取得できます。次のコードは例を示しています。

    • サンプルデータ

      {
          "a":"abc",
          "b":1,
          "c":{
              "e":["1","2","3","4"],
              "f":{"m":"567"}
          }
      }
    • ソーステーブル DDL

      CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
         b int,
        `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>>  -- c は Flink の ROW に対応する JSON オジェクトです。e は ARRAY に対応する JSON リストです。
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
      );
    • 結果テーブル DDL

      CREATE TEMPORARY TABLE `sink` (
       `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` varchar
      ) WITH (
        'connector' = 'print',
        'logger' = 'true'
      );
    • DML 文

      INSERT INTO `sink`
        SELECT 
        `a`,
        b,
        c.e[1], -- Flink はインデックス 1 から配列を走査します。この例では、配列のインデックス 1 の要素を取得します。配列全体を取得するには、[1] を削除します。
        c.f.m
      FROM `kafka_table`;
    • テスト結果测试结果

Flink が Kafka とのネットワーク接続は確立しているものの、データの消費または書き込みができない場合はどうすればよいですか?

  • 原因

    Flink と Kafka の間でプロキシ、ポートマッピング、またはその他の転送メカニズムが使用されている場合、Kafka クライアントが Kafka サーバーからプルするネットワークアドレスはプロキシのアドレスではなく、Kafka サーバーのアドレスです。この場合、Flink と Kafka の間にネットワーク接続が存在していても、Flink はデータを消費または書き込みできません。

    Flink と Kafka クライアント (Flink Kafka コネクタ) 間の接続は、2 つのステップで確立されます。

    1. Kafka クライアントは Kafka サーバー (ブローカー) からメタデータをプルします。メタデータには、Kafka サーバー上のすべてのブローカーのネットワークアドレスが含まれます。

    2. Flink は Kafka クライアントを使用して Kafka サーバーからネットワークアドレスをプルし、データを消費または書き込みます。

  • トラブルシューティング

    次の手順を実行して、Flink と Kafka の間でプロキシ、ポートマッピング、またはその他の転送メカニズムが使用されているかどうかを確認できます。

    1. ZooKeeper コマンドラインインターフェイス (zkCli.sh または zookeeper-shell.sh) を使用して、Kafka クラスターが使用する ZooKeeper クラスターにログインします。

    2. クラスターに適したコマンドを実行して、Kafka ブローカーのメタデータを取得します。

      通常、get /brokers/ids/0 コマンドを使用して Kafka ブローカーのメタデータを取得できます。Kafka の接続アドレスは endpoints フィールドにあります。endpoint

    3. pingtelnet などのコマンドを使用して、Flink とメタデータに表示されるエンドポイント間のネットワーク接続をテストします。

      接続に失敗した場合、Flink と Kafka の間でプロキシ、ポートマッピング、またはその他の転送メカニズムが使用されています。

  • ソリューション

    • プロキシ、ポートマッピング、またはその他の転送メカニズムを使用しないでください。Flink と Kafka の間に直接ネットワーク接続を確立して、Flink が Kafka メタデータに表示されるエンドポイントに直接接続できるようにします。

    • Kafka の運用保守 (O&M) エンジニアに連絡して、Kafka ブローカーの advertised.listeners パラメーターに転送アドレスを追加します。これにより、Kafka クライアントは転送アドレスを含む Kafka サーバーのメタデータをプルできるようになります。

      説明

      Kafka 0.10.2.0 以降のみが、Kafka ブローカーのリスナーへのプロキシのアドレスの追加をサポートしています。

    この問題の原則の詳細については、「KIP-103: Separation of Internal and External traffic」および「Kafka Connectivity Issues Explained」をご参照ください。

イベント時間ベースのウィンドウの後、Kafka ソーステーブルがデータを出力しないのはなぜですか?

  • 詳細

    イベント時間ベースのウィンドウの後、Kafka ソーステーブルはデータを出力しません。

  • 原因

    Kafka のパーティションにデータがない場合、ウォーターマークの生成が影響を受けます。その結果、Kafka ソーステーブルはイベント時間ベースのウィンドウの後にデータを出力できません。

  • 解決策

    1. すべてのパーティションにデータが含まれていることを確認してください。

    2. [追加設定] セクションに次のコードを追加して保存することで、アイドルソース検出機能を有効にできます。詳細については、「カスタムジョブ実行パラメーターを設定するにはどうすればよいですか?」をご参照ください。

      table.exec.source.idle-timeout: 5

      table.exec.source.idle-timeout パラメーターの詳細については、「Configuration」をご参照ください。

Kafka のコミットオフセットの目的は何ですか?

Kafka のコミットオフセットは、ストリームデータ処理の一貫性と信頼性を確保するために使用されます。処理済みデータの位置を記録することで、データの重複や損失を防ぎます。コミットオフセットは通常、Kafka などのメッセージキューとともに使用され、ストリームデータ処理におけるデータ消費の進行状況を制御します。Flink は、チェックポイントが成功した場合にのみ、現在の読み取りオフセットを Kafka にコミットします。これにより、処理済みのデータの位置が記録されます。チェックポイントが有効になっていない場合や、チェックポイントの間隔が長すぎる場合、現在の読み取りオフセットが Kafka 側で見つからない可能性があります。これにより、データの重複や損失が発生する可能性があります。

Kafka コネクタを使用してネストされた JSON データを解析するにはどうすればよいですか?

たとえば、次の JSON データを JSON フォーマットを使用して直接解析すると、データは ARRAY<ROW<cola VARCHAR, colb VARCHAR>> フィールドに解析されます。このフィールドは ROW 型の配列であり、ROW 型には 2 つの VARCHAR フィールドが含まれています。次に、ユーザー定義のテーブル値関数 (UDTF) を使用してデータを解析できます。

{"data":[{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"}]}

セキュリティ情報が構成されている Kafka クラスターに接続するにはどうすればよいですか?

  1. Kafka DDL の WITH パラメーターに、暗号化と認証に関連するセキュリティ構成を追加できます。セキュリティ構成の詳細については、「SECURITY」をご参照ください。次のコードは例を示しています。

    重要

    セキュリティ構成には properties. プレフィックスを追加する必要があります。

    • 次の例は、Kafka テーブルを構成して PLAIN を SASL メカニズムとして使用し、JAAS 構成を提供する方法を示しています。

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
      );
    • 次の例は、SASL_SSL をセキュリティプロトコルとして使用し、SCRAM-SHA-256 を SASL メカニズムとして使用する方法を示しています。

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_SSL',
        /* SSL 構成 */
        /* サーバーから提供されたトラストストア (CA 証明書) のパスを構成します。*/
        'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
        'properties.ssl.truststore.password' = 'test1234',
        /* クライアント認証が必要な場合は、キーストア (秘密鍵) のパスを構成します。*/
        'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
        'properties.ssl.keystore.password' = 'test1234',
        /* SASL 構成 */
        /* SASL メカニズムを SCRAM-SHA-256 として構成します。*/
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        /* JAAS を構成します。*/
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
      );
      説明
      • properties.sasl.mechanism が SCRAM-SHA-256 の場合、properties.sasl.jaas.config には org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule を使用します。

      • properties.sasl.mechanism が PLAINTEXT の場合、properties.sasl.jaas.config には org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule を使用します。

  2. ジョブの [追加の依存関係] セクションで、証明書、公開鍵、秘密鍵などの必要なファイルをアップロードできます。

    ファイルがアップロードされると、/flink/usrlib ディレクトリに保存されます。[追加の依存関係] セクションでファイルをアップロードする方法の詳細については、「ジョブをデプロイする」をご参照ください。

    重要

    Kafka ブローカーのユーザー名とパスワードの認証メカニズムが SASL_SSL であるにもかかわらず、クライアントの認証メカニズムが SASL_PLAINTEXT である場合、ジョブの検証中に OutOfMemory 例外が報告されます。この場合、クライアントの認証メカニズムを変更する必要があります。

フィールド名の競合を解決するにはどうすればよいですか?

  • 症状

    Kafka データソースからのメッセージは、2 つの JSON 文字列にシリアル化されます。この場合、キーと値の両方に同じ名前のフィールド (サンプルコードの id フィールドなど) が含まれます。このデータを直接 Flink テーブルに解析して処理すると、フィールド名の競合が発生します。

    • キー

      {
         "id": 1
      }

    • {
         "id": 100,
         "name": "flink"
      }
  • 解決策

    key.fields-prefix プロパティを使用することで、この問題を回避できます。次の SQL 文は Flink テーブルを定義します。

    CREATE TABLE kafka_table (
      -- ここでキーと値のフィールドを定義します。
      key_id INT,
      value_id INT,
      name STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json',
      'json.ignore-parse-errors' = 'true',
      -- キーのフィールドとそれに対応するデータ型を指定します。
      'key.format' = 'json',
      'key.fields' = 'id',
      'value.format' = 'json',
      'value.fields' = 'id, name',
      -- キーのフィールドにプレフィックスを設定します。
      'key.fields-prefix' = 'key_'
    );

    Flink テーブルを作成するとき、key.fields-prefix プロパティは key_ として指定されます。これは、Flink が Kafka からのデータを処理するときに、キーのフィールド (この場合は id フィールド) に key_ というプレフィックスが付加されることを意味します。したがって、Flink テーブルのフィールド名は key_id となり、value_id と区別されます。

    SELECT * FROM kafka_table; クエリを実行すると、次の出力が返されます。

    key_id: 1,
    value_id: 100,
    name: flink

Kafka ソーステーブルからの読み取りで予期しないビジネスレイテンシーが発生した場合はどうすればよいですか?

  • 詳細

    Kafka ソーステーブルからデータを読み取ると、次の図に示すように、currentEmitEventTimeLag が 50 年以上になります。延迟

  • トラブルシューティング

    1. まず、ジョブが JAR ジョブか SQL ジョブかを確認する必要があります。

      ジョブが JAR ジョブの場合、Pom で使用される Kafka 依存関係が Realtime Compute for Apache Flink に組み込まれているかどうかも確認する必要があります。オープンソースの依存関係は曲線を報告しません。

    2. リアルタイムデータがアップストリーム Kafka トピックのすべてのパーティションに入っているかどうかを確認します。

    3. Kafka メッセージのメタデータタイムスタンプが 0 または null かどうかを確認します。

      Kafka ソースのレイテンシーは、Kafka メッセージのタイムスタンプを現在の時刻から引くことによって計算されます。メッセージにタイムスタンプがない場合、レイテンシーは 50 年以上と表示されます。次の方法でタイムスタンプを確認できます。

      • SQL ジョブの場合、メタデータ列を定義することでメッセージのタイムスタンプを取得できます。詳細については、「Kafka ソーステーブル」をご参照ください。

        CREATE TEMPORARY TABLE sk_flink_src_user_praise_rt (
            `timestamp` BIGINT ,
            `timestamp` TIMESTAMP METADATA,  -- メタデータタイムスタンプ。
            ts as to_timestamp (
              from_unixtime (`timestamp`, 'yyyy-MM-dd HH:mm:ss')
            ),
            watermark for ts as ts - interval '5' second
          ) WITH (
            'connector' = 'kafka',
            'topic' = '',
            'properties.bootstrap.servers' = '',
            'properties.group.id' = '',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
          );
      • 簡単な Java プログラムを書いて、KafkaConsumer でメッセージを読み取り、テストすることができます。

エラー: 'upsert-kafka' テーブルには主キーが必要です

  • 詳細

    image.png

  • 原因

    DDL の定義時に主キーが指定されていませんでした。upsert-kafka が結果テーブルとして使用される場合、コネクタはアップストリームの計算ロジックによって生成された変更ログストリームを消費できます。コネクタは INSERT または UPDATE_AFTER データを Kafka に書き込み、DELETE データを空の値を持つメッセージとして書き込みます。これは、対応するキーのメッセージが削除されたことを示します。Flink は主キー列の値に基づいてデータをパーティション分割します。これにより、同じ主キー上のメッセージが順序付けられ、同じ主キーに対する更新または削除メッセージが同じパーティションに分類されることが保証されます。

  • 解決策

    DDL を定義するときに主キーを指定する必要があります。

DataHub トピックが分割またはスケールインされた後に失敗した Flink ジョブを回復するにはどうすればよいですか?

Flink が読み取っているトピックが分割またはスケールインされた場合、ジョブはエラーを報告し続け、自己回復できません。この場合、ジョブを停止してから再度開始して再起動し、正常な状態に復元する必要があります。

消費中の DataHub トピックを削除できますか?

いいえ、できません。消費中の DataHub トピックを削除または再作成することはできません。

endPoint および tunnelEndpoint パラメーターとは何ですか?また、それらが誤って構成された場合、何が起こりますか?

endPoint および tunnelEndpoint パラメーターの説明については、「Endpoint」をご参照ください。VPC 環境では、これら 2 つのパラメーターを誤って構成すると、次のようなジョブの例外が発生する可能性があります。

  • endPoint パラメーターが誤って構成されている場合、ジョブのデプロイメントは 91% の進行状況で停止します。

  • tunnelEndpoint パラメーターが誤って構成されている場合、ジョブの実行に失敗します。

完全および増分 MaxCompute ソーステーブルは、どのようにして MaxCompute からデータを読み取りますか?

完全および増分 MaxCompute ソーステーブルは、トンネルを介して MaxCompute からデータを読み取ります。読み取り速度は、MaxCompute トンネルの帯域幅によって制限されます。

MaxCompute がデータソースとして使用され、ジョブの開始後に既存のパーティションまたはテーブルにデータが追加された場合、新しいデータは完全または増分 MaxCompute ソーステーブルで読み取ることができますか?

いいえ、できません。Flink ジョブの開始後、ソースによって読み取り中または読み取り済みのテーブルまたはパーティションに新しいデータが追加された場合、このデータは読み取られません。これにより、ジョブがフェールオーバーする可能性もあります。

完全および増分 MaxCompute ソーステーブルは、どちらも ODPS DOWNLOAD SESSION を使用してテーブルデータまたはパーティションデータを読み取ります。新しい DOWNLOAD SESSION が作成されると、サーバーはインデックスファイルを作成します。これは、DOWNLOAD SESSION が作成された瞬間のデータのマッピングを作成することに相当します。後続のデータ読み取りは、このマッピングに基づいています。したがって、新しい DOWNLOAD SESSION が作成された後、MaxCompute テーブルまたはパーティションに追加されたデータは、通常のプロセスでは読み取られません。ただし、MaxCompute ソーステーブルに新しいデータが書き込まれると、2 種類のエラーが発生する可能性があります。

  • エラーが報告される: トンネルがデータを読み取っている間に新しいデータが書き込まれると、次のエラーが報告されます: ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated.

  • データの正確性が保証されない: トンネルが閉じた後に新しいデータが書き込まれた場合、このデータは読み取られません。ただし、ジョブがフェールオーバーしたり、一時停止後に再開されたりすると、読み取り済みのデータが再読み取りされ、新しく書き込まれたデータが完全に読み取られない可能性があります。

完全または増分 MaxCompute ソーステーブルを使用するジョブを一時停止し、並列度を変更してからジョブを再開できますか?

はい、できます。useNewApi オプションが有効になっている MaxCompute ソーステーブル (デフォルト設定) の場合、ジョブを一時停止し、並列度を変更してから、ストリーミングモードでジョブを再開できます。MaxCompute ソーステーブルは、一致する複数のパーティションを順番に読み取ります。現在のパーティションを読み取るとき、パーティション内の異なる範囲のデータを各同時実行スレッドに割り当てます。並列度を変更しても、ジョブが一時停止される前に読み取られていたパーティションの並列度割り当て方法は変更されません。次のパーティションが読み取られるとき、各同時実行スレッドの読み取り範囲は新しい並列度に基づいて割り当てられます。したがって、単一の大きなパーティションを読み取るとき、並列度を上げてジョブを再起動した後、一部の MaxCompute オペレーターのみがデータを読み取る可能性があります。

useNewApi が false に設定されているジョブおよびバッチジョブの場合、並列度を変更することはできません。

開始オフセット (2019-10-11 00:00:00) より前のパーティションも、完全 MaxCompute ソーステーブルによって読み取られるのはなぜですか?

開始オフセットの設定は、DataHub などのメッセージキュータイプのデータソースに対してのみ有効です。MaxCompute ソーステーブルには有効ではありません。Flink ジョブの開始後のデータ読み取り範囲は次のとおりです。

  • パーティションテーブル: 現在のすべてのパーティションが読み取られます。

  • 非パーティションテーブル: 現在存在するデータが読み取られます。

増分 MaxCompute ソーステーブルが、すべてのデータが書き込まれる前に新しいパーティションを検出した場合はどうすればよいですか?

現在、パーティション内のデータが完全かどうかをマークするメカニズムはありません。新しいパーティションが検出される限り、パーティションは読み取られます。増分 MaxCompute ソーステーブルを使用して、パーティションキー列 ds を持つ MaxCompute パーティションテーブル T を読み取る場合、次の書き込み方法を使用することをお勧めします。パーティションを作成しないでください。まず、Insert overwrite table T partition (ds='20191010') ... 文を実行します。ジョブが正常に完了すると、パーティションとデータが同時に表示されます。

重要

次の書き込み方法は許可されていません。まず、パーティション (たとえば ds=20191010) を作成し、次にパーティションにデータを書き込みます。増分 MaxCompute ソーステーブルは新しいパーティション ds=20191010 を検出し、すぐにパーティションを読み取ります。この時点でパーティションにデータが書き込まれていない場合、データは失われます。

MaxCompute コネクタの実行時エラー: ErrorMessage=Authorization Failed [4019], You have NO privilege

  • エラー詳細

    ジョブの実行中に、フェールオーバーページまたは TaskManager.log にエラーが報告されます。エラーメッセージは次のとおりです。

    ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
  • 原因

    MaxCompute DDL 定義で指定されたユーザー ID 情報を使用して MaxCompute にアクセスできません。

  • 解決策

    Alibaba Cloud アカウント、RAM ユーザー、または RAM ロールを使用してユーザー ID を認証できます。詳細については、「ユーザー認証」をご参照ください。

増分 MaxCompute ソーステーブルの startPartition パラメーターを指定するにはどうすればよいですか?

次の手順に従って startPartition パラメーターを指定できます。

ステップ

説明

1

各パーティションキー列名とそれに対応するパーティション値を等号 (=) で連結します。パーティション値は静的フィールドである必要があります。

パーティションキー列は dt です。パーティション値 20220901 からデータの読み取りを開始する必要があります。結果は dt=20220901 です。

2

ステップ 1 の結果をパーティションレベルの昇順にソートし、カンマ (,) で連結します。間にスペースを入れることはできません。このステップの結果が startPartition パラメーターの値になります。

説明

最初の数レベルのパーティションのみを指定できます。

  • ハッシュパーティションは dt のみです。dt=20220901 からデータの読み取りを開始するには、'startPartition' = 'dt=20220901' を指定します。

  • パーティションには 3 つのレベルがあります。ハッシュパーティションは dt、最初のサブパーティションは hh、2 番目のサブパーティションは mm です。dt=20220901、hh=08、mm=10 からデータの読み取りを開始するには、'startPartition' = 'dt=20220901,hh=08,mm=10' を指定します。

  • パーティションには 3 つのレベルがあります。ハッシュパーティションは dt、最初のサブパーティションは hh、2 番目のサブパーティションは mm です。dt=20220901 および hh=08 からデータの読み取りを開始するには、'startPartition' = 'dt=20220901,hh=08' を指定します。

システムがパーティションリストをロードするとき、すべてのパーティションを startPartition と辞書式順序で比較します。次に、システムは startPartition 以上のパーティションをロードします。たとえば、増分 MaxCompute パーティションテーブルには、ハッシュパーティション ds とサブパーティション type の 2 つのパーティションキー列があります。たとえば、テーブルに次の 6 つのパーティションがあるとします。

  • ds=20191201,type=a

  • ds=20191201,type=b

  • ds=20191202,type=a

  • ds=20191202,type=b

  • ds=20191202,type=c

  • ds=20191203,type=a

startPartition の値が ds=20191202 の場合、ds=20191202,type=a、ds=20191202,type=b、ds=20191202,type=c、および ds=20191203,type=a の 4 つのパーティションが読み取られます。startPartition の値が ds=20191202,type=b の場合、ds=20191202,type=b、ds=20191202,type=c、および ds=20191203,type=a の 3 つのパーティションが読み取られます。

説明

startPartition で指定されたパーティションは存在する必要はありません。startPartition 以上の辞書式順序のパーティションはすべて読み取られます。

増分 MaxCompute ソーステーブルを使用するジョブが、開始後長時間データ読み取りを開始しないのはなぜですか?

この問題は、startPartition 以上の辞書式順序の既存のパーティションが多すぎるか、これらのパーティションに存在する小さいファイルが多すぎるために発生します。増分 MaxCompute ソーステーブルは、データの読み取りを開始する前に、まず適格な既存のパーティションの情報を整理する必要があります。したがって、次の操作を実行することをお勧めします。

  • 過剰な量の既存データを読み取らないでください。

    説明

    既存データを処理するには、MaxCompute ソーステーブルを持つバッチジョブを実行できます。

  • 既存データ内の小さいファイルの数を減らします。

パーティションからの読み取りまたはパーティションへの書き込み時に partition パラメーターを指定するにはどうすればよいですか?

パーティションの読み取り

  • 固定パーティションの読み取り

    ソーステーブルまたはディメンションテーブルが固定パーティションを読み取る必要がある場合、次の手順に従って partition パラメーターを指定できます。

    ステップ

    説明

    1

    • ディメンションテーブルの場合、各パーティションキー列名とそれに対応するパーティション値を等号 (=) で連結します。パーティション値は静的フィールドです。

    • ソーステーブルの場合、各パーティションキー列名とそれに対応するパーティション値を等号 (=) で連結します。パーティション値は、静的フィールドまたはワイルドカード文字 (*) を含む値にすることができます。ワイルドカード文字は、任意の文字列 (空の文字列を含む) に一致します。

    • パーティションキー列は dt です。パーティション値が 20220901 のデータを読み取るには、結果は dt=20220901 です。

    • パーティションキー列は dt です。202209 で始まるパーティション値を持つデータを読み取るには、結果は dt=202209* です (ソーステーブルのみ)。

    • パーティションキー列は dt です。2022 で始まり 01 で終わるパーティション値を持つデータを読み取るには、結果は dt=2022*01 です (ソーステーブルのみ)。

    • パーティションキー列は dt です。すべてのパーティションからデータを読み取るには、結果は dt=* です (ソーステーブルのみ)。

    2

    ステップ 1 の結果をパーティションレベルの昇順にソートし、カンマ (,) で連結します。間にスペースを入れることはできません。このステップの結果が partition パラメーターの値になります。

    最初の数レベルのパーティションのみを指定できます。

    • ハッシュパーティションは dt のみです。dt=20220901 からデータを読み取るには、'partition' = 'dt=20220901' を指定します。

    • パーティションには 3 つのレベルがあります。ハッシュパーティションは dt、最初のサブパーティションは hh、2 番目のサブパーティションは mm です。dt=20220901、hh=08、mm=10 からデータを読み取るには、'partition' = 'dt=20220901,hh=08,mm=10' を指定します。

    • パーティションには 3 つのレベルがあります。ハッシュパーティションは dt、最初のサブパーティションは hh、2 番目のサブパーティションは mm です。dt=20220901 と任意の hh からデータを読み取るには、'partition' = 'dt=20220901,hh=08' または 'partition' = 'dt=20220901,hh=08,mm=*' を指定します。

    • パーティションには 3 つのレベルがあります。ハッシュパーティションは dt、最初のサブパーティションは hh、2 番目のサブパーティションは mm です。dt=20220901、任意の hh、および mm=10 からデータを読み取るには、'partition' = 'dt=20220901,hh=*,mm=10' を指定します。

    上記の手順でパーティションのフィルタリング要件を満たせない場合は、SQL 文の WHERE 句にフィルター条件を記述し、SQL オプティマイザーのパーティションプッシュダウン機能を使用してパーティションをフィルタリングすることもできます。パーティションには 2 つのレベルがあります。ハッシュパーティションは dt、サブパーティションは hh です。dt が 20220901 以上 20220903 以下、hh が 09 以上 17 以下のパーティションを読み取るには、次の SQL コードを使用できます。

    CREATE TABLE maxcompute_table (
      content VARCHAR,
      dt VARCHAR,
      hh VARCHAR
    ) PARTITIONED BY (dt, hh) WITH ( 
       -- PARTITIONED BY でパーティションキー列を指定する必要があります。そうしないと、SQL オプティマイザーのパーティションプッシュダウン機能が有効にならず、実行効率に影響します。
      'connector' = 'odps',
      ... -- accessId などの必須パラメーターを指定します。SQL オプティマイザーがフィルタリングを実行するため、partition パラメーターは指定しなくてもかまいません。
    );
    
    SELECT content, dt, hh FROM maxcompute_table
    WHERE dt >= '20220901' AND dt <= '20220903' AND hh >= '09' AND hh <= '17'; -- WHERE 句でパーティションフィルター条件を指定します。
  • 辞書式順序で最大のパーティションを読み取る

    • ソーステーブルまたはディメンションテーブルが辞書式順序で最大のパーティションを読み取る必要がある場合、partition パラメーターを 'partition' = 'max_pt()' に設定する必要があります。

    • ソーステーブルまたはディメンションテーブルが辞書式順序で最大の 2 つのパーティションを読み取る必要がある場合、partition パラメーターを 'partition' = 'max_two_pt()' に設定する必要があります。

    • ソーステーブルまたはディメンションテーブルが、.done ファイルを伴う辞書式順序で最大のパーティションを読み取る必要がある場合、partition パラメーターを 'partition' = 'max_pt_with_done()' に設定する必要があります。

    ほとんどのシナリオでは、辞書式順序で最大のパーティションは、最も新しく生成されたパーティションでもあります。場合によっては、最新のパーティションのデータが準備できておらず、ディメンションテーブルに一時的に古いパーティションからデータを読み取らせたいことがあります。このような場合は、max_pt_with_done() パーティションパラメーター値を使用できます。

    パーティション内のデータが準備できたら、同時に空のパーティションを作成する必要があります。このパーティションの名前は、対応するデータパーティションの名前に .done を付けたものです。たとえば、パーティション dt=20220901 のデータが準備できたら、同時に空のパーティション dt=20220901.done を作成する必要があります。max_pt_with_done() パーティションパラメーター値を設定すると、ディメンションテーブルはデータパーティションと .done パーティションの両方が存在するパーティションのみを読み取ります。.done パーティションのないデータパーティションは読み取られません。詳細については、「max_pt() と max_pt_with_done() の違いは何ですか?」をご参照ください。

    説明

    ソーステーブルは、ジョブの開始時にのみ辞書式順序で最大のパーティションを取得します。ジョブはすべてのデータが読み取られた後に終了し、新しいパーティションを監視しません。新しいパーティションを継続的に読み取るには、増分ソーステーブルモードを使用できます。ディメンションテーブルは、更新のたびに最新のパーティションをチェックし、最新のデータを読み取ります。

パーティションへの書き込み

  • 固定パーティションへの書き込み

    結果テーブルが固定パーティションにデータを書き込む必要がある場合、固定パーティションの読み取りと同じ方法で partition パラメーターを指定できます。

    重要

    結果テーブルの partition パラメーターは、ワイルドカード文字 (*) をサポートしていません。

  • 動的パーティションへの書き込み

    結果テーブルが、書き込まれたデータのパーティションキー列の特定の値に基づいて対応するパーティションにデータを書き込む必要がある場合、パーティションキー列名をパーティションレベルの昇順にソートし、カンマ (,) で連結する必要があります。間にスペースを入れることはできません。結果は partition パラメーターの値になります。たとえば、ハッシュパーティションが dt、最初のサブパーティションが hh、2 番目のサブパーティションが mm の 3 つのレベルのパーティションがある場合、'partition' = 'dt,hh,mm' を指定できます。

MaxCompute ソーステーブルを使用するジョブが、開始後長時間開始状態のままになったり、データ生成が遅れたりするのはなぜですか?

理由は次のとおりです。

  • MaxCompute テーブルに存在する小さいファイルが多すぎます。

  • MaxCompute ストレージクラスターと Flink 計算クラスターが同じリージョンにありません。これにより、ネットワーク通信時間が長くなります。ストレージクラスターと計算クラスターに同じリージョンを使用して、再試行することをお勧めします。

  • MaxCompute の権限が正しく設定されていません。ソーステーブルを読み取るには、MaxCompute テーブルのダウンロード権限が必要です。

データトンネルを選択するにはどうすればよいですか?

MaxCompute は、Batch Tunnel と Streaming Tunnel の 2 つのデータトンネルを提供します。一貫性とパフォーマンスの要件に基づいて、異なるデータトンネルを選択できます。2 つのデータトンネルの違いは次のとおりです。

要件

Batch Tunnel

Streaming Tunnel

一貫性

Streaming Tunnel と比較して、この方法はほとんどの場合、MaxCompute テーブルにデータをより安定して書き込むことができ、データが失われないこと (at-least-once セマンティクス) を保証します。

チェックポイントプロセス中に例外が発生し、ジョブが同時に複数のパーティションに書き込む場合にのみ、一部のパーティションで重複データが生成される可能性があります。

データが失われないこと (at-least-once セマンティクス) を保証します。どのような状況でもジョブで例外が発生した場合、重複データが生成される可能性があります。

パフォーマンス

チェックポイントプロセス中のデータ送信やサーバー上のファイル作成などの操作のため、全体的な効率は Streaming Tunnel よりも低くなります。

チェックポイントプロセス中にデータをコミットする必要はありません。Streaming Tunnel を使用し、numFlushThreads の値を 1 より大きく設定すると、データフラッシュプロセス中にアップストリームデータを継続的に受信できます。全体的な効率は Batch Tunnel よりも高くなります。

説明

現在 MaxCompute Batch Tunnel を使用しているジョブで、チェックポイントプロセスが非常に遅い、またはタイムアウトし、ダウンストリームが重複データを受け入れることを確認した場合は、MaxCompute Stream Tunnel の使用を検討できます。

MaxCompute 結果テーブルにデータを書き込む際に重複データを処理するにはどうすればよいですか?

Flink ジョブが MaxCompute コネクタを介して MaxCompute にデータを書き込んでいるときに重複データが発生した場合、次の観点から問題をトラブルシューティングできます。

  • ジョブログをチェックします。MaxCompute 結果テーブルで PRIMARY KEY 制約が宣言されていても、Flink は外部ストレージにデータを書き込むときに主キーの一意性チェックを実行せず、MaxCompute の非トランザクションテーブルは PRIMARY KEY 制約をサポートしていません。Flink ジョブログが重複データを計算する場合、重複データは MaxCompute テーブルに表示されます。

  • 複数の Flink ジョブが同時に同じ MaxCompute テーブルに書き込んでいるかどうかを確認します。前述のように、MaxCompute は PRIMARY KEY 制約をサポートしていません。複数の Flink ジョブが同じ結果を計算する場合、結果は MaxCompute テーブルで重複します。

  • Batch Tunnel を使用する場合、チェックポイント中に Flink ジョブが失敗します。チェックポイントが失敗した場合、MaxCompute 結果テーブルはすでにデータをサーバーに送信している可能性があります。したがって、ジョブが前のチェックポイントから再開されると、2 つのチェックポイント間のデータが重複する可能性があります。

  • Stream Tunnel を使用する場合、Flink ジョブのフェールオーバーが発生します。Stream Tunnel を有効にして MaxCompute にデータを書き込むと、データはチェックポイント間で MaxCompute サーバーに送信されます。したがって、ジョブがフェールオーバーして最新のチェックポイントから再開すると、最新のチェックポイントの完了とジョブのフェールオーバーの間のデータが重複する可能性があります。詳細については、「データトンネルを選択するにはどうすればよいですか?」をご参照ください。この場合、Batch Tunnel モードに切り替えて、この状況で生成される重複データを回避できます。

  • Batch Tunnel を使用する場合、Flink ジョブはフェールオーバーするか、キャンセルされた後に開始されます (たとえば、オートパイロットチューニングによってトリガーされます)。vvr-6.0.7-flink-1.15 より前のバージョンでは、MaxCompute 結果テーブルは閉じるときにデータを送信します。したがって、Flink ジョブが停止して前のチェックポイントから再開すると、チェックポイントとジョブの停止の間のデータが重複する可能性があります。この問題を解決するには、Flink のバージョンを vvr-6.0.7-flink-1.15 以降にアップグレードできます。

MaxCompute 結果テーブルを使用するジョブが、実行時に「Invalid partition spec」エラーを報告します。どうすればよいですか?

  • 原因: MaxCompute に書き込まれるデータのパーティションキー列の値が無効です。無効な値には、空の文字列、null 値、等号 (=)、カンマ (,)、またはスラッシュ (/) を含む値が含まれます。

  • 解決策: 無効なデータを確認できます。

MaxCompute 結果テーブルを使用するジョブが、実行時に「No more available blockId」エラーを報告します。どうすればよいですか?

  • 原因: MaxCompute 結果テーブルに書き込まれるブロックの数が制限を超えています。これは、毎回フラッシュされるデータ量が少なすぎて、フラッシュが頻繁すぎることを示しています。

  • 解決策: batchSize および flushIntervalMs パラメーターの値を調整することをお勧めします。

ディメンションテーブルに SHUFFLE_HASH ヒントを使用するにはどうすればよいですか?

デフォルトでは、各同時実行スレッドはディメンションテーブル全体の情報を格納します。ディメンションテーブルのデータ量が多い場合は、SHUFFLE_HASH ヒントを使用してディメンションテーブルのデータを各同時実行スレッドに均等に分散させることができます。これにより、JVM ヒープメモリの消費が削減されます。次の例では、ディメンションテーブル dim_1 と dim_3 のデータは各同時実行スレッドに分散されますが、ディメンションテーブル dim_2 のデータは引き続き各同時実行スレッドに完全にキャッシュされます。

-- ソーステーブルと 3 つのディメンションテーブルを作成します。
CREATE TABLE source_table (k VARCHAR, v VARCHAR) WITH ( ... );
CREATE TABLE dim_1 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_2 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_3 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );

-- SHUFFLE_HASH ヒントに、データを分散させる必要があるディメンションテーブルの名前を記述します。
SELECT /*+ SHUFFLE_HASH(dim_1), SHUFFLE_HASH(dim_3) */
k, s.v, d1.v, d2.v, d3.v
FROM source_table AS s
INNER JOIN dim_1 FOR SYSTEM_TIME AS OF PROCTIME() AS d1 ON s.k = d1.k
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF PROCTIME() AS d2 ON s.k = d2.k
LEFT JOIN dim_3 FOR SYSTEM_TIME AS OF PROCTIME() AS d3 ON s.k = d3.k;

CacheReloadTimeBlackList パラメーターを指定するにはどうすればよいですか?

毎日固定された時間帯にディメンションテーブルの更新を禁止する期間。

  • データの型: String

  • -> を使用して開始時刻と終了時刻を連結します。

  • , を使用して複数の期間を区切ります。

  • 時間フォーマット: YYYY-MM-DD HH:mm (時と分のみを指定した場合、デフォルトで毎日になります)。

'cacheReloadTimeBlackList' = '14:00 -> 15:00,23:00 -> 01:00'

サンプルシナリオ

サンプル値

単一の期間

14:00 -> 15:00

複数の期間

14:00 -> 15:00,23:00 -> 01:00

特別な期間

14:00 -> 15:00, 23:00 -> 01:00,2025-10-01 22:00 -> 2025-10-01 23:00

エラー: java.io.EOFException: SSL peer shut down incorrectly

  • エラー詳細

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • 原因

    このエラーは通常、MySQL データベースで SSL プロトコルが有効になっているが、クライアントが SSL 接続用に正しく構成されていない場合に発生します。たとえば、MySQL ドライバーのバージョンが 8.0.27 の場合、MySQL データベースでは SSL プロトコルが有効になっていますが、デフォルトのアクセス方法では SSL を介してデータベースに接続しません。これによりエラーが発生します。

  • 解決策

    WITH パラメーターでコネクタを RDS に設定し、MySQL ディメンションテーブルの URL パラメーターに characterEncoding=utf-8&useSSL=false を追加することをお勧めします。例:

    'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

MySQL テーブルの主キーが、Flink カタログにテーブルが登録されると BIGINT UNSIGNED から DECIMAL に変更され、CTAS 文を使用して Hologres に同期された後、TEXT に変更されるのはなぜですか?

この問題は、Flink が BIGINT UNSIGNED をサポートしていないために発生します。Flink は、値の範囲を考慮して、MySQL の BIGINT UNSIGNED 主キーを DECIMAL 型として識別します。データが Hologres に同期されると、Hologres は BIGINT UNSIGNED をサポートしておらず、DECIMAL 型を主キーとして使用することをサポートしていないため、システムは主キーを自動的に TEXT 型に変換します。

開発および設計中にこの仕様に基づいて調整することをお勧めします。この列を DECIMAL 型のままにしたい場合は、事前に Hologres でテーブルを手動で作成し、他のフィールドを主キーとして設定するか、主キーを設定しないことができます。ただし、主キーが異なるか欠落しているとデータの一意性に影響するため、データの重複問題が発生する可能性があります。したがって、たとえば、ある程度のデータ重複を許容したり、重複排除ロジックを追加したりするなど、アプリケーション層でこの問題を解決する必要があります。

Flink が RDS テーブルにデータを書き込むとき、主キーに基づいてレコードを更新しますか、それとも新しいレコードを挿入しますか?

DDL で主キーが定義されている場合、INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...; メソッドを使用してレコードを更新します。存在しない主キーフィールドの場合、レコードは直接挿入されます。既存の主キーフィールドの場合、対応する値が更新されます。DDL で PRIMARY KEY が宣言されていない場合、insert into メソッドを使用してレコードを挿入し、データを追加します。

RDS テーブルの一意なインデックスを GROUP BY 句で使用する場合、何を知っておく必要がありますか?

  • ジョブの GROUP BY 句で一意なインデックスを宣言する必要があります。

  • RDS に自動インクリメントの主キーが 1 つしか存在しない場合、Flink ジョブでそれを PRIMARY KEY として宣言することはできません。

RDS for MySQL および AnalyticDB for MySQL を含む MySQL 物理テーブルの INT UNSIGNED フィールド型が、Flink SQL で異なる型として宣言されるのはなぜですか?

この問題は、MySQL JDBC ドライバーが精度の問題により、データを運ぶために異なるデータ型を使用するために発生します。具体的には、MySQL の INT UNSIGNED 型の場合、Java では LONG 型を使用してデータを運び、これは Flink SQL の BIGINT に対応します。MySQL の BIGINT UNSIGNED 型の場合、Java では BIGINTEGER 型を使用してデータを運び、これは Flink SQL の DECIMAL(20, 0) に対応します。

エラー: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1

  • エラー詳細

    Caused by: java.sql.BatchUpdateException: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1
    at sun.reflect.GeneratedConstructorAccessor59.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:565)
    at com.alibaba.ververica.connectors.rds.sink.RdsOutputFormat.executeSql(RdsOutputFormat.java:488)
    ... 15 more
  • 原因

    データに特殊文字またはエンコード形式が含まれているため、データベースのエンコーディングが正しく解析されません。

  • 解決策

    JDBC を使用して MySQL データベースに接続する場合、URL アドレスの末尾に UTF-8 を追加できます。たとえば、jdbc:mysql://<internal_endpoint>/<databaseName>?characterEncoding=UTF-8 です。

MySQL (TDDL/RDS) にデータを書き込むときにデッドロックが発生した場合はどうすればよいですか?

  • 詳細

    MySQL (TDDL/RDS) にデータを書き込むときにデッドロックが発生します。

    重要

    Realtime Compute for Apache Flink では、ダウンストリームデータベースが MySQL などのリレーショナルデータベースであり、対応するコネクタが TDDL/RDS である場合、Realtime Compute for Apache Flink がテーブルまたはリソースに頻繁に書き込むとデッドロックが発生する可能性があります。

    次の例は、デッドロックがどのように形成されるかを示しています。

    INSERT 操作には、(A,B) の 2 つのロックを順番に取得する必要があるとします。A は範囲ロックです。2 つのトランザクション (T1,T2) があり、テーブルスキーマは (id(自動インクリメント主キー), nid(一意キー)) です。T1 には 2 つの `insert` 文 insert(null,2),(null,1) が含まれ、T2 には 1 つの `insert` 文 insert(null,2) が含まれています。

    1. 時刻 t に、T1 の最初の INSERT が実行されます。このとき、T1 は 2 つのロック (A,B) を保持しています。

    2. 時刻 t+1 に、T2 が挿入を開始します。(-inf,2] をロックするためにロック A を待つ必要があります。このとき、A は T1 によって保持されており、(-inf,2] をロックしています。間隔には包含関係があるため、T2 は T1 が A を解放するのに依存します。

    3. 時刻 t+2 に、T1 の 2 番目の INSERT が実行されます。(-inf,1] をロックするために A が必要です。この間隔は (-inf,2] に属しているため、T2 がロックを解放するのを待つ必要があります。したがって、T1 は T2 が A を解放するのに依存します。

    T1 と T2 が互いに依存し、待機するため、デッドロックが形成されます。

  • RDS/TDDL と Tablestore データベースエンジンのロックの違い

    • RDS/TDDL: InnoDB では、行ロックは個々のレコードではなく、インデックスに配置されます。したがって、同じインデックスキーを使用して異なる行にアクセスすると、ロック競合が発生し、エリア全体のデータが更新できなくなります。

    • OTS: 単一行ロックを採用しており、他のデータ更新には影響しません。

  • デッドロックの解決策

    高いクエリ/秒 (QPS)、トランザクション数/秒 (TPS)、または高同時実行書き込みシナリオでは、デッドロック問題を解決するために Tablestore を結果テーブルとして使用することをお勧めします。一般的に、Flink ジョブの結果テーブルとして TDDL または RDS を使用することはお勧めしません。

    MySQL などのリレーショナルデータベースを結果ノードとして使用する必要がある場合は、次の提案を検討することをお勧めします。

    • 他の読み取りまたは書き込みビジネス関係者からの干渉がないことを確認してください。

    • ジョブのデータ量が大きくない場合は、単一並列度での書き込みを試すことができます。ただし、高 QPS/TPS および高同時実行状況では、書き込みパフォーマンスが低下します。

    • 一意キーを使用しないようにしてください。一意キーを持つテーブルへの書き込みは、デッドロックを引き起こす可能性があります。ビジネスでテーブルに一意キーを含める必要がある場合は、フィールドを識別能力の降順に並べて一意キーを定義できます。これにより、デッドロックの可能性を大幅に減らすことができます。たとえば、day_time(20171010) の前に MD5 関数を配置して、フィールドを識別能力の降順に並べて一意キーを定義できます。これは、デッドロック問題の解決に役立ちます。

    • ビジネス特性に基づいてシャーディングを実行して、可能な限り単一テーブルへの書き込みを回避できます。実装の詳細については、対応するデータベース管理者にお問い合わせください。

対応する MySQL テーブルのスキーマが更新された後、ダウンストリームテーブルのスキーマが変更されないのはなぜですか?

スキーマ変更の同期は、特定の DDL を認識しません。代わりに、2 つの連続するデータレコード間のスキーマ変更をキャプチャします。DDL 変更のみが発生し、アップストリームに新しいデータまたはデータ変更が存在しない場合、ダウンストリームのデータ変更はトリガーされません。詳細については、「スキーマ変更の同期ポリシー」をご参照ください。

ソースで「finish split response timeout」例外が発生する原因は何ですか?

この例外は、タスクの CPU 使用率が高いために発生し、タスクが Coordinator の RPC リクエストに時間内に応答できなくなります。この場合、リソース構成ページで TaskManager の CPU リソースを増やす必要があります。

MySQL CDC の完全データ同期フェーズ中にスキーマが変更されると、どのような影響がありますか?

完全データ同期フェーズ中にスキーマ変更が発生した場合、ジョブがエラーを報告したり、スキーマ変更の同期に失敗したりする可能性があります。この場合、ジョブを停止し、同期されたダウンストリームテーブルを削除し、状態なしでジョブを開始する必要があります。

CTAS/CDAS 同期中にサポートされていないスキーマ変更が発生し、ジョブが失敗した場合はどうすればよいですか?

テーブルのデータを再同期する必要があります。これを行うには、ジョブを停止し、ダウンストリームテーブルを削除し、状態なしで同期ジョブを再起動します。この互換性のない変更は避けてください。そうしないと、ジョブは再起動後もエラーを報告し、同期に失敗します。スキーマ変更のサポートの詳細については、「CREATE TABLE AS (CTAS) 文」をご参照ください。

ClickHouse 結果テーブルはデータの取り消しと更新をサポートしていますか?

はい、サポートしています。Flink 結果テーブルの DDL で主キーが指定され、ignoreDelete パラメーターが false に設定されている場合、データの取り消しと更新はサポートされますが、パフォーマンスは大幅に低下します。

これは、ClickHouse がオンライン分析処理 (OLAP) 用の列指向 DBMS であり、UPDATE と DELETE のサポートが完全ではないためです。Flink DDL で主キーが指定されている場合、Flink は ALTER TABLE UPDATEALTER TABLE DELETE を使用してデータを更新および削除しようとしますが、これは非常に非効率的です。

ClickHouse の結果テーブルに書き込まれたデータはいつ表示されますか?

  • 1 回限りのセマンティクスが有効になっていない ClickHouse 結果テーブル (デフォルト設定) の場合、キャッシュ内のデータレコード数が batchSize パラメーターの値に達するか、待機時間が flushIntervalMs を超えると、システムはキャッシュ内のデータを自動的に ClickHouse テーブルに書き込みます。このとき、ClickHouse の結果テーブルに書き込まれたデータを確認できます。チェックポイントが成功するのを待つ必要はありません。

  • 1 回限りのセマンティクスが有効になっている ClickHouse 結果テーブルの場合、ClickHouse の結果テーブルに書き込まれたデータを確認する前に、チェックポイントが成功するのを待つ必要があります。

コンソールで print データの出力を表示するにはどうすればよいですか?

次の 2 つの方法のいずれかを使用して、print データの結果を表示できます。

  • Realtime Compute の開発コンソールで結果を表示します。

    1. Realtime Compute 開発コンソールの左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。

    2. 対象のジョブの名前をクリックします。

    3. [ジョブログ] をクリックします。

    4. [実行時ログ] タブの [ジョブ] ドロップダウンリストから、実行中のジョブを選択します。

      查看启动和运行日志2.jpg

    5. [実行中のタスクマネージャー] タブの [パス、ID] をクリックします。

      修改运行作业2.jpg

    6. [ログ] をクリックして、print の出力を表示します。

  • Flink UI で結果を表示します。

    1. Realtime Compute の開発コンソールの左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。

    2. 対象のジョブの名前をクリックします。

    3. 対象のジョブの [ステータス概要] タブで [Flink UI] をクリックします。

      上下游存储.jpg

    4. [タスクマネージャー] をクリックします。

    5. [パス、ID] をクリックします。

    6. [ログ] タブで、print の結果を表示できます。

ディメンションテーブルでの JOIN 操作がデータを返さない場合はどうすればよいですか?

DDL 文のスキーマの型と名前が物理テーブルと一致しているかどうかを確認できます。

max_pt() と max_pt_with_done() の違いは何ですか?

max_pt() は、すべてのパーティションの中で辞書式順序で最大のパーティションを選択します。max_pt_with_done() は、.done パーティションを伴うすべてのパーティションの中で辞書式順序で最大のパーティションを選択します。パーティション列が次のように表される場合:

  • ds=20190101

  • ds=20190101.done

  • ds=20190102

  • ds=20190102.done

  • ds=20190103

max_pt()max_pt_with_done() の違いは次のとおりです。

  • `partition`='max_pt_with_done()' はパーティション ds=20190102 に一致します。

  • `partition`='max_pt()' はパーティション ds=20190103 に一致します。

Paimon にデータを書き込むジョブが「Heartbeat of TaskManager timed out」エラーを報告します。どうすればよいですか?

このエラーの最も可能性の高い原因は、TaskManager のヒープメモリが不足していることです。Paimon は、主に次の方法でヒープメモリを使用します。

  • Paimon 主キーテーブルの writer オペレーターの各同時実行スレッドには、ソート用のメモリバッファーがあります。このバッファーのサイズは write-buffer-size テーブルパラメーターによって制御され、デフォルト値は 256 MB です。

  • Paimon はデフォルトで ORC ファイル形式を使用するため、メモリ内のデータをバッチで列のストア形式に変換するために別のメモリバッファーが必要です。このバッファーのサイズは orc.write.batch-size テーブルパラメーターによって制御され、デフォルト値は 1024 です。これは、バッファーがデフォルトで 1,024 行のデータを保存することを意味します。

  • 変更された各バケットには、そのバケットの書き込みデータを処理するための専用の writer オブジェクトがあります。

ヒープメモリの使用方法に基づいて、ヒープメモリ不足の考えられる原因と対応する解決策は次のとおりです。

  • write-buffer-size の値が大きすぎます。

    このパラメーターを適切に減らすことができます。ただし、バッファーが小さすぎると、ディスクへの書き込みが頻繁になり、小さいファイルのマージ頻度も増加します。これは書き込みパフォーマンスに影響します。

  • 単一のデータのサイズが大きすぎます。

    たとえば、データに 4 MB の JSON フィールドが含まれている場合、ORC バッファーサイズは 4 MB × 1,024 = 4 GB に達し、大量のヒープメモリを占有します。次のいずれかの解決策を使用できます。

    • orc.write.batch-size の値を減らします。

    • Paimon 結果テーブルでアドホッククエリ (OLAP) を実行する必要がなく、バッチまたはストリーム消費のみが必要な場合は、テーブルを作成するときに 'file.format' = 'avro' および 'metadata.stats-mode' = 'none' テーブルパラメーターを設定して、AVRO 形式を使用し、統計情報情報の収集を無効にすることができます。

      説明

      パラメーターはテーブルの作成時にのみ設定でき、テーブルの作成後に ALTER TABLE 文または SQL Hint を使用して変更することはできません。

  • 同時に書き込まれるパーティションの数が多すぎるか、各パーティションのバケット数が多すぎます。これにより、作成される writer オブジェクトが多すぎます。

    パーティションキー列の設定が妥当かどうか、誤った SQL 書き込みによって他のデータがパーティションキー列に書き込まれていないか、バケット数が妥当かどうかを確認できます。各バケットの合計データ量は約 2 GB、最大で 5 GB 以下にすることをお勧めします。バケット数の調整方法については、「固定バケットテーブルのバケット数を調整する」をご参照ください。

Paimon にデータを書き込むジョブが「Sink materializer must not be used with Paimon sink」エラーを報告します。どうすればよいですか?

sink materializer オペレーターは、もともとストリームジョブのカスケード JOIN によって引き起こされるデータ順序の乱れの問題を解決するために使用されていました。ただし、Paimon テーブルにデータを書き込む場合、このオペレーターは追加のオーバーヘッドを発生させるだけでなく、Aggregation データマージメカニズムを使用すると誤った計算結果につながる可能性があります。したがって、Paimon テーブルに書き込むジョブでは sink materializer オペレーターを使用できません。

SET 文を使用するか、実行時パラメーターを構成して、table.exec.sink.upsert-materialize パラメーターを false に設定することで、sink materializer オペレーターを無効にできます。カスケード JOIN によって引き起こされるデータ順序の乱れの問題も解決する必要がある場合は、「順序が乱れたデータを処理する」をご参照ください。

Paimon にデータを書き込むジョブが「File deletion conflicts detected」または「LSM conflicts detected」エラーを報告します。どうすればよいですか?

このエラーの考えられる原因は次のとおりです。

  • 複数のジョブが同時に同じ Paimon テーブルの同じパーティションに書き込んでいます。この場合、Paimon テーブルは失敗して再起動することで競合を解決する必要があり、これは正常な現象です。エラーが再発しない場合は、何もする必要はありません。

  • ジョブは古い既存の状態から再開されます。この場合、エラーが再発します。最新の状態からジョブを再開するか、状態なしでジョブを開始する必要があります。

  • 同じジョブで、複数の INSERT 文を使用して同じ Paimon テーブルに書き込みます。Paimon は現在、同じジョブ内の複数の INSERT 文による個別の書き込みをサポートしていません。UNION ALL 文を使用して、複数のデータストリームを Paimon テーブルに書き込むことができます。

  • Append Scalable テーブルにデータを書き込む際の Global Committer ノードまたは Compaction Coordinator ノードの並列度が 1 より大きい。これら 2 つのノードの並列度は 1 でなければなりません。そうしないと、データ整合性が保証されません。

Paimon からデータを読み取るジョブが「File xxx not found, Possible causes」エラーを報告します。どうすればよいですか?

Paimon テーブルの消費はスナップショットファイルに依存します。スナップショットの有効期限が短すぎるか、消費ジョブが非効率的である場合、消費中のスナップショットファイルは有効期限切れのために削除され、消費ジョブはエラーを報告します。

スナップショットファイルの有効期限を調整するコンシューマー ID を指定する、または消費ジョブを最適化することを検討できます。現在利用可能なスナップショットファイルと各スナップショットファイルの作成時刻を照会するには、「Snapshots システムテーブル」をご参照ください。

Paimon ジョブが「No space left on device」エラーを報告します。どうすればよいですか?

  • ジョブに Paimon クエリプロセスが含まれる場合 (たとえば、Paimon テーブルをディメンションテーブルとして使用したり、Paimon テーブルに書き込むときに changelog-producer=lookup を使用したりする場合)、SQL Hints を使用して次のパラメーターを設定できます。これにより、クエリプロセス中に占有される最大ディスク領域とキャッシュの有効期限が制限され、過剰なキャッシュファイルによるディスク領域不足を回避できます。

    • lookup.cache-max-disk-size: クエリキャッシュが使用できる最大ローカルディスク領域。256 MB、512 MB、1 GB などの値を指定することをお勧めします。

    • lookup.cache-file-retention: クエリキャッシュの有効期限。30 分、15 分以下の時間を指定することをお勧めします。

  • Paimon テーブルに書き込むジョブの場合、SQL Hints を使用して次のパラメーターを設定し、書き込みプロセス中のローカル一時ファイルの合計サイズを制限できます。これにより、過剰な一時ファイルによるディスク領域不足を回避できます。

    • write-buffer-spillable: 書き込みデータキャッシュをディスクにスピルするためのグローバルスイッチ。このパラメーターを false に設定すると、このキャッシュによってすべてのディスク領域が占有されるのを防ぐことができます。

    • write-buffer-spill.max-disk-size: 書き込みデータキャッシュがディスクにスピルできる最大領域。256 MB、512 MB、1 GB などの値を指定することをお勧めします。

OSS に多数の Paimon ファイルがある場合はどうすればよいですか?

  • Paimon は、テーブルの既存状態へのアクセスをサポートするために、いくつかの既存データファイルを保存します。既存データファイルのデータ保持ポリシーを調整できます。詳細については、「期限切れデータをクリーンアップする」をご参照ください。

  • 不合理なパーティションキー列の設定や多すぎるバケットも、この現象を引き起こす可能性があります。各バケットの合計データ量は約 2 GB、最大で 5 GB 以下にすることをお勧めします。詳細については、「バケット化メソッド」をご参照ください。

  • デフォルトでは、データファイルは ORC 形式で保存されます。テーブルを作成するときにテーブルパラメーター 'file.compression' = 'zstd' を設定して、ZSTD 圧縮形式を使用してデータファイルの合計サイズを削減できます。

    説明

    パラメーターはテーブルの作成時にのみ設定でき、テーブルの作成後に ALTER TABLE 文または SQL Hint を使用して変更することはできません。

Paimon コネクタのデータ可視性と Checkpoint 間隔の間に関係はありますか?

はい、あります。Paimon の 1 回限りのセマンティクスは、保証を確保するためにシステムチェックポイントに依存しています。Paimon は各チェックポイントでのみデータをコミットし、これによりこのデータがダウンストリームコンシューマーに表示されるようになります。コミットする前に、ローカルバッファー内のデータはリモートファイルシステムにフラッシュされますが、ダウンストリームコンシューマーにはこのデータを読み取ることができるという通知は送信されません。

Paimon にデータを書き込むジョブのメモリ使用量が長期間にわたってゆっくりと増加するのはなぜですか?

  • ジョブの秒間リクエスト数 (RPS) もゆっくりと増加している場合、ジョブのメモリ使用量も増加するのは正常です。

  • Paimon FileSystem Catalog を使用して OSS に読み書きしている場合は、Catalog パラメーターで fs.oss.endpointfs.oss.accessKeyId、および fs.oss.accessKeySecret を構成していることを確認してください。そうしないと、Flink ジョブが長時間実行された場合、コミュニティで既知の遅いメモリリークの問題が発生する可能性があります。

エラー: IllegalArgumentException: timeout value is negative

  • エラー詳細报错

  • 原因

    一定期間新しい MQ メッセージが消費されない場合、MetaQSource スレッドはスリープ状態になります。スリープ期間は pullIntervalMs パラメーターで設定された値です。ただし、pullIntervalMs パラメーターのデフォルト値は -1 です。-1 がスリープ期間として使用されると、ジョブはエラーを報告します。

  • 解決策

    pullIntervalMs パラメーターを負でない数値に設定できます。

RocketMQ はトピックスケーリング中にトピックパーティション数の変更をどのように検出しますか?

  • 6.0.2 より前の Flink リアルタイムコンピューティングエンジンの VVR バージョンの場合、実装は 5〜10 分ごとに現在のパーティション数を取得することです。パーティション数が元のパーティション数と 3 回連続で異なる場合、フェールオーバーがトリガーされます。したがって、ソースは変更が発生してから 10〜30 分後にパーティション数の変更を検出し、フェールオーバーが発生します。ジョブが再起動されると、新しいパーティション数に基づいてデータを読み取ります。

  • Flink リアルタイムコンピューティングエンジン 6.0.2 以降の VVR バージョンの場合、実装はデフォルトで 5 分ごとに現在のパーティション数を取得することです。新しいパーティションが見つかると、ジョブのフェールオーバーを必要とせずに、TM のソースオペレーターに直接渡されて新しいパーティションデータを読み取ります。したがって、ソースは 1〜5 分以内にパーティション数の変更を検出できます。

エラー: BackPressure Exceed reject Limit

  • エラー詳細报错详情

  • 原因

    Hologres への書き込み圧力が比較的高くなっています。

  • 解決策

    インスタンス情報を Hologres の技術サポートに提供して、スペックアップ操作を依頼できます。

エラー: 残りの接続スロットは、非レプリケーションのスーパーユーザ接続用に予約されています

  • エラー詳細

    Caused by: com.alibaba.hologres.client.exception.HoloClientWithDetailsException: failed records 1, first:Record{schema=org.postgresql.model.TableSchema@188365, values=[f06b41455c694d24a18d0552b8b0****, com.chot.tpfymnq.meta, 2022-04-02 19:46:40.0, 28, 1, null], bitSet={0, 1, 2, 3, 4}},first err:[106]FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:406) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
    Caused by: com.alibaba.hologres.org.postgresql.util.PSQLException: FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.readStartupMessages(QueryExecutorImpl.java:2665) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.<init>(QueryExecutorImpl.java:147) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:273) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51) ~[?:?]
        at com.alibaba.hologres.org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:240) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.makeConnection(Driver.java:478) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.connect(Driver.java:277) ~[?:?]
        at java.sql.DriverManager.getConnection(DriverManager.java:674) ~[?:1.8.0_302]
        at java.sql.DriverManager.getConnection(DriverManager.java:217) ~[?:1.8.0_302]
        at com.alibaba.hologres.client.impl.ConnectionHolder.buildConnection(ConnectionHolder.java:122) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:195) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:184) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.doHandlePutAction(Worker.java:460) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:389) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
  • 原因

    接続数が上限を超えました。

  • 解決策

    • 接続されている各 Frontend (FE) ノードの app_name を表示して、flink-connector が使用している Hologres Client 接続の数を確認します。

    • 他のジョブが Hologres に接続しているかどうかを確認します。

    • 接続を解放します。詳細については、「接続管理」をご参照ください。

エラー: パブリケーションにテーブルが定義されていません

  • エラー詳細

    同じ名前のテーブルを削除して再作成すると、ジョブが no table is defined in publication エラーを報告する可能性があります。

  • 原因

    テーブルが削除されたときに、テーブルにバインドされている publication が削除されませんでした。

  • 解決策

    1. Hologres で select * from pg_publication where pubname not in (select pubname from pg_publication_tables); コマンドを実行して、テーブルが削除されたときにクリアされなかった publication 情報を照会します。

    2. drop publication xx; 文を実行して、残りの publication を削除します。

    3. ジョブを再起動します。

Flink Hologres 結果コネクタのチェックポイント間隔と Hologres でのデータの最終的な可視性との関係は何ですか?

Flink Hologres 結果コネクタのチェックポイント (CP) 間隔と Hologres でのデータの最終的な可視性との間に直接的な関係はありません。CP 間隔はデータ復元のサービスレベル契約 (SLA) に影響しますが、Hologres でデータが最終的に表示される時間を決定するものではありません。

Hologres コネクタはトランザクションをサポートしていません。定期的にデータをデータベースにフラッシュするだけです。CP 間隔は、各 CP でデータがデータベースにフラッシュされることを保証するだけですが、この間隔を最大で待機するという意味ではありません。実際、バッファーが特定の条件を満たすと、データはより早くダウンストリームにフラッシュされます。詳細については、「Hologres」、「Hologres」、および「Hologres」をご参照ください。一般的に、データウェアハウスはトランザクションの一貫性を保証する必要はありません。コネクタはバックグラウンドで非同期にデータをフラッシュし、CP 時に強制的にフラッシュして例外回復に備えます。

公開されたジョブが permission denied for database 例外を報告します。どうすればよいですか?

  • 原因

    リアルタイムコンピューティングエンジンの VVR 8.0.4 以降、コネクタはユーザーが 2.0 より後のバージョンの Hologres インスタンスを使用していることを検出すると、JDBC モードを使用してバイナリログを強制的に消費します。Hologres インスタンスがバージョン 2.0 以降で、ユーザーがスーパーユーザーでない場合、JDBC モードでバイナリログを消費するには特別な権限構成が必要です。

  • 解決策

    ユーザーがスーパーユーザーでない場合、JDBC モードでバイナリログを消費するには権限構成が必要です。

    user_name は Alibaba Cloud アカウント ID または RAM ユーザーの名前です。詳細については、「アカウントの概要」をご参照ください。

    -- 標準権限モデルでユーザーに CREATE 権限を付与し、インスタンスの Replication Role 権限をユーザーに付与します。
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- データベースで簡易権限モデル (SLMP) が有効になっている場合、GRANT 文を実行できません。spm_grant を使用して、DB の Admin 権限をユーザーに付与します。Holoweb で直接権限を付与することもできます。
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

ジョブが table id parsed from checkpoint is different from the current table id 例外を報告してチェックポイントから再開するときはどうすればよいですか?

  • 原因

    この問題は、リアルタイムコンピューティングエンジンの VVR バージョン 8.0.5 から 8.0.8 で、Hologres バイナリログソーステーブルがチェックポイントから再開するときに、Hologres テーブルのテーブル ID を強制的にチェックするために発生します。現在のテーブル ID がチェックポイントに保存されているものと一致しない場合、ジョブはチェックポイントから再開できません。この例外は、ユーザーがジョブの実行中にソーステーブルで TRUNCATE またはその他のテーブル再作成操作を実行したことを示します。

  • 解決策

    ジョブを開始するには、VVR-8.0.9 以降のバージョンにアップグレードすることをお勧めします。ビジネスシナリオの複雑さを考慮して、VVR 8.0.9 ではテーブル ID の強制チェックがキャンセルされました。ただし、バイナリログソーステーブルでテーブル再作成操作を実行することはお勧めしません。テーブルが再作成されると、元のテーブルの既存バイナリログは完全にクリアされます。Flink が古いテーブルの消費オフセットを使用して新しいテーブルのデータを消費すると、データ不整合やその他の予期しない状況につながる可能性があります。

JDBC モードでバイナリログを消費するときにデータ精度が期待どおりでない場合はどうすればよいですか?

  • 原因

    リアルタイムコンピューティングエンジンの VVR バージョン 8.0.10 以前では、Flink DDL で宣言されたバイナリログソーステーブルの DECIMAL 型の精度が Hologres の精度と一致しない場合、精度が期待どおりになりません。

  • 解決策

    VVR 8.0.11 では、この問題は修正されています。ただし、精度損失によるデータ損失を避けるために、Flink と Hologres の DECIMAL 型の精度が一致していることを確認する必要があります

同じ名前のテーブルを削除して再作成すると、no table is defined in publication または The table xxx has no slot named xxx 例外が発生する可能性があるのはなぜですか?

  • 原因

    テーブルが削除されたときに、テーブルにバインドされている publication が削除されませんでした。

  • 解決策

    解決策 1: Hologres で select * from pg_publication where pubname not in (select pubname from pg_publication_tables); 文を実行して、テーブルが削除されたときにクリアされなかった publication を照会し、次に drop publication xx; 文を実行して残りの publication を削除できます。その後、ジョブを再起動できます。

    解決策 2: VVR 8.0.5 以降のバージョンを選択できます。コネクタは自動的にクリーンアップ操作を実行します。

エラー: Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608

  • エラー詳細

    Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.ensureValidLogSize(LogAccumulator.java:249)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.doAppend(LogAccumulator.java:103)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.append(LogAccumulator.java:84)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:385)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:308)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:211)
    at com.alibaba.ververica.connectors.sls.sink.SLSOutputFormat.writeRecord(SLSOutputFo
    rmat.java:100)
  • 原因

    SLS に書き込まれる単一のログが 8 MB を超え、それ以上データを書き込めません。

  • 解決策

    開始オフセットを変更して、大きすぎる異常なデータをスキップできます。詳細については、「ジョブを開始する」をご参照ください。

失敗した Flink プログラムが再開されると、TaskManager OOM が発生し、ソーステーブルが「java.lang.OutOfMemoryError: Java heap space」エラーを報告します。どうすればよいですか?

  • 原因

    この問題は通常、SLS メッセージ本文が大きすぎることが原因で発生します。SLS コネクタはバッチでデータを要求します。LogGroup は batchGetSize パラメーターによって制御され、デフォルトは 100 です。したがって、毎回最大 100 個の LogGroup が受信されます。通常の操作中、Flink はデータをタイムリーに消費し、通常は 100 個の LogGroup を受信しません。ただし、フェールオーバー中は、大量の未消費データが蓄積されます。単一の LogGroup が占有するメモリに 100 を掛けた値が、利用可能な JVM メモリよりも大きい場合、TaskManager は OOM エラーを発生します。

  • 解決策

    batchGetSize パラメーターの値を減らすことができます。

Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?

scan.mode パラメーターを使用して、Paimon ソーステーブルのコンシューマオフセットを設定できます。scan.mode パラメーターの指定可能な値と動作は次のとおりです。

パラメーター値

バッチ読み取り動作

ストリーム読み取り動作

default

デフォルト値。実際の動作は他のパラメーターによって決まります。

  • scan.timestamp-millis が設定されている場合、動作は from-timestamp パラメーター値の動作と同じです。

  • scan.snapshot-id が設定されている場合、動作は from-snapshot パラメーター値の動作と同じです。

上記の 2 つのパラメーターのいずれも設定されていない場合、動作は latest-full パラメーター値の動作と同じです。

latest-full

テーブルの最新のスナップショットを生成します。

ジョブの開始時に、まずテーブルの最新のスナップショットを生成し、次に増分データを継続的に生成します。

compacted-full

最後のコンパクション後のテーブルのスナップショットを生成します。

ジョブの開始時に、まず最後のコンパクション後のテーブルのスナップショットを生成し、次に増分データを継続的に生成します。

latest

latest-full と同じです。

ジョブの開始時に、テーブルの最新のスナップショットを生成せず、次に増分データを継続的に生成します。

from-timestamp

scan.timestamp-millis で指定された時刻以前のテーブルの最新のスナップショットを生成します。

ジョブの開始時に、スナップショットを生成せず、次に scan.timestamp-millis で指定された時刻から増分データを継続的に生成します。

from-snapshot

テーブルのスナップショットを生成します。スナップショット番号は scan.snapshot-id で指定されます。

ジョブの開始時に、スナップショットを生成せず、次に scan.snapshot-id で指定されたスナップショットから増分データを継続的に生成します。

from-snapshot-full

from-snapshot と同じです。

ジョブの開始時に、テーブルのスナップショットを生成します。スナップショット番号は scan.snapshot-id で指定されます。次に、scan.snapshot-id で指定されたスナップショットの後から増分データを継続的に生成します。

自動パーティション有効期限を構成するにはどうすればよいですか?

Paimon テーブルは、生存時間がパーティションの有効期限より長いパーティションを自動的に削除して、ストレージコストを節約することをサポートしています。詳細は次のとおりです。

  • 生存時間: 現在のシステム時刻から、パーティション値から変換されたタイムスタンプを引いたもの。パーティション値から変換されたタイムスタンプは、次の順序で取得されます。

    1. partition.timestamp-pattern パラメーター形式の文字列を使用して、パーティション値を時間文字列に変換します。

      この形式の文字列では、パーティションキー列はドル記号 ($) の後に列名が続く形式で表されます。たとえば、パーティションキー列が year、month、day、hour の 4 つの列で構成されているとします。形式文字列 $year-$month-$day $hour:00:00 は、パーティション year=2023,month=04,day=21,hour=17 を文字列 2023-04-21 17:00:00 に変換します。

    2. partition.timestamp-formatter パラメーター形式の文字列を使用して、時間文字列をタイムスタンプに変換します。

      このパラメーターが設定されていない場合、デフォルトで yyyy-MM-dd HH:mm:ss および yyyy-MM-dd 形式の文字列が試行されます。Java の DateTimeFormatter と互換性のある任意の形式の文字列を使用できます。

  • パーティションの有効期限: partition.expiration-time パラメーターに設定した値。

ストレージ内でデータが見つからない場合はどうすればよいですか?

  • データがディスクにフラッシュされていない場合、これは正常な現象です。なぜなら、Flink の writer は次のポリシーに基づいてデータをディスクにフラッシュするためです。

    • 特定のバケットがメモリ内で特定のサイズまで蓄積されます。デフォルトのサイズは 64 MB です。

    • 合計バッファーサイズが特定のサイズまで蓄積されます。デフォルトのサイズは 1 GB です。

    • チェックポイントがトリガーされ、メモリ内のすべてのデータがフラッシュされます。

  • ストリーム書き込みの場合は、チェックポイントが有効になっていることを確認してください。

重複データがある場合はどうすればよいですか?

  • Copy On Write (COW) 書き込みを使用している場合は、write.insert.drop.duplicates パラメーターを有効にする必要があります。

    COW 書き込みの場合、各バケットの最初のファイルはデフォルトでは重複排除されません。増分データのみが重複排除されます。グローバルな重複排除には、このパラメーターを有効にする必要があります。Merge On Read (MOR) 書き込みでは、パラメーターを有効にする必要はありません。主キーが定義されると、グローバルな重複排除がデフォルトで有効になります。

    説明

    Hudi 0.10.0 以降、このプロパティは write.precombine に名前が変更され、デフォルト値は true です。

  • 複数のパーティションを重複排除するには、index.global.enabled パラメーターを true に設定する必要があります。

    説明
    • Hudi 0.10.0 以降、このプロパティはデフォルトで true です。

    • index.type=bucket の場合、Bucket インデックスはパーティション間の変更をサポートしていないため、index.global.enabled パラメーターを true に設定しても無効です。したがって、グローバルインデックスが有効になっていても、複数のパーティションの重複排除は実現できません。

  • 1 か月前のデータの更新など、長期的な更新の場合は、index.state.ttl (日数) の値を増やす必要があります。

    インデックスは、データの重複を判断するためのコアデータ構造です。index.state.ttl はインデックスの保持時間を設定し、デフォルトは 1.5 日です。0 未満の値は永続的な保持を意味します。

    説明

    Hudi 0.10.0 以降、このプロパティはデフォルトで 0 です。

Merge On Read にはログファイルしかないのはなぜですか?

  • 原因: Hudi はコンパクションを実行した後にのみ Parquet ファイルを生成します。そうしないと、ログファイルしか存在しません。Merge On Read はデフォルトで非同期コンパクションを有効にし、ポリシーは 5 コミットごとに 1 回コンパクションすることです。コンパクションタスクは、条件が満たされた場合にのみトリガーされます。

  • 解決策: コンパクション間隔パラメーター compaction.delta_commits を調整することで、コンパクションタスクをより速くトリガーできます。

エラー: 複数文が検出されました

  • 詳細

    Flink ジョブが AnalyticDB for MySQL (ADB) にデータを書き込むと、ジョブが異常に再起動し、エラー Caused by: java.sql.SQLSyntaxErrorException: [13000, 2024101216171419216823505703151806929] multi-statement be found. を報告します。

    image

  • 原因

    ALLOW_MULTI_QUERIES=true 構成が AnalyticDB for MySQL (ADB) データベースで有効になっており、MySQL JDBC Driver 8.x と一緒に使用されると、互換性の問題が発生します。

  • 解決策

    1. 技術サポートに連絡して、MySQL JDBC Driver バージョン 5.1.46 用のカスタム ADB 3.0 コネクタを入手し、Flink ジョブに適用します。カスタムコネクタの使用方法については、「カスタムコネクタを管理する」をご参照ください。

    2. ADB テーブルの URI に allowMultiQueries=true パラメーターを構成します。たとえば、jdbc:mysql://xxxxx.ads.aliyuncs.com:3306/xxx?allowMultiQueries=true' です。

エラー: 適切なドライバーが見つかりませんでした

  • 原因

    カスタムコネクタが対応するドライバーを見つけられません。

  • 解決策