すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:よくある質問

最終更新日:Mar 10, 2026

Realtime Compute for Apache Flink のコネクタに関する代表的な質問とその回答を、コネクタの種別ごとに整理しています。

症状別インデックス(早見表)

原因となるコネクタが不明でも、発生している症状から該当する FAQ を素早く特定できます。

症状

関連する項目

データが出力されない

Kafka:イベントタイムウィンドウで出力が発生しない / Hudi:ストレージにデータが存在しない / Tablestore:ディメンション結合でデータが返らない

重複データ

MaxCompute:重複データの書き込み / Hudi:重複データ

メモリ不足 (OOM)

Paimon:TaskManager のハートビートタイムアウト / Simple Log Service:復元時の OOM

チェックポイントの問題

Hologres:チェックポイントとデータ可視性 / Paimon:チェックポイントとデータ可視性

起動または読み取りが遅い

MaxCompute:ジョブが「起動中」ステータスで停止する/ MaxCompute:増分ソースの読み取り開始が遅い

権限エラー

MaxCompute:権限付与失敗(エラーコード 4019) / Hologres:データベースに対する権限が拒否されました

ディスク領域

Paimon:デバイス上の空き領域がありません / Paimon:OSS 上の大規模ファイル

接続エラー

Kafka:接続は成功したが読み取り/書き込みができない / Hologres:使用可能な接続スロットが残っていません


Kafka

Kafka から JSON データを解析する

標準的な JSON の場合は、DDL で JSON フォーマット を使用します。

ネストされた JSON の場合は、JSON オブジェクトを ROW 型、JSON 配列を ARRAY 型にマッピングします。以下の入力例を参照してください:

{
    "a": "abc",
    "b": 1,
    "c": {
        "e": ["1", "2", "3", "4"],
        "f": {"m": "567"}
    }
}

ソーステーブルの DDL:

CREATE TEMPORARY TABLE kafka_table (
  `a` VARCHAR,
  b INT,
  `c` ROW<e ARRAY<VARCHAR>, f ROW<m VARCHAR>>  -- JSON オブジェクト = ROW、JSON 配列 = ARRAY
) WITH (
  'connector' = 'kafka',
  'topic' = '<your-topic>',
  'properties.bootstrap.servers' = '<broker-list>',
  'properties.group.id' = '<group-id>',
  'format' = 'json',
  'scan.startup.mode' = '<startup-mode>'
);

シンクテーブル DDL:

CREATE TEMPORARY TABLE sink (
  `a` VARCHAR,
  b INT,
  e VARCHAR,
  `m` VARCHAR
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

ネストされたフィールドを抽出する DML:

INSERT INTO sink
  SELECT
    `a`,
    b,
    c.e[1],   -- Flink では配列のインデックスは 1 から始まります
    c.f.m
  FROM kafka_table;

Kafka には接続できるが、データの読み取り/書き込みができない

原因

Realtime Compute for Apache Flink と Kafka の間にプロキシまたはポートマッピングが存在する場合、Kafka クライアントはプロキシのアドレスではなく、Kafka のメタデータからブローカーエンドポイントを取得します。初期接続は成功しても、その後のデータ操作は失敗します。これは、Flink がブローカーに直接アクセスしようとするためです。

トラブルシューティング手順

  1. zkCli.sh または zookeeper-shell.sh を使用して、Kafka クラスターの ZooKeeper サービスに接続します。

  2. get /brokers/ids/0 を実行し、ブローカーメタデータを取得します。endpoints フィールドを確認します。

  3. Flink 環境から、ping または telnet を使用して、取得したエンドポイントへの接続性をテストします。接続性テストが失敗した場合、プロキシまたはポートマッピングが使用されています。

解決方法

  • Flink と Kafka の間でプロキシをバイパスした直接ネットワーク接続を確立します。

  • または、Kafka ブローカーの advertised.listeners をプロキシのアドレスに設定し、ブローカーメタデータが到達可能なアドレスを返すように構成します。

advertised.listeners は Kafka 0.10.2.0 以降で利用可能です。詳細については、「KIP-103:内部トラフィックと外部トラフィックの分離」および「Kafka のネットワーク接続に関する問題」をご参照ください。

Kafka ソーステーブルでイベントタイムウィンドウの出力が発生しない理由

着信データがないアイドルパーティションがあると、Watermark の進行がブロックされ、イベントタイムウィンドウがトリガーされなくなります。

解決方法

  1. Kafka ソーステーブルのすべてのパーティションにデータが含まれていることを確認します。

  2. アイドルソース検出を有効にするには、[その他の設定] フィールドに次のパラメーターを追加し、[設定] タブで設定します。パラメーターの詳細については、「Flink の設定」をご参照ください。

       table.exec.source.idle-timeout: 5

Kafka でのオフセットのコミット

Realtime Compute for Apache Flink は、各チェックポイントでコンシューマの読み取りオフセットを Kafka にコミットします。これにより、処理済みデータの位置が記録され、障害発生時の重複またはデータ損失を防止します。

チェックポイントが無効化されている場合、またはチェックポイント間隔が大きすぎる場合、オフセットがコミットされず、障害発生後に重複またはデータ損失が発生する可能性があります。

UDTF を使用したネストされた JSON の解析

Kafka コネクタが {"data":[{"cola":"test1","colb":"test2"}, ...]} のようなデータを読み取る場合、ネストされた配列は ARRAY<ROW<cola VARCHAR, colb VARCHAR>> として解析されます。この配列要素は、ユーザー定義のテーブル値関数(UDTF) を使用して処理します。

セキュアな Kafka クラスターへの接続

Kafka テーブルの DDL の WITH 句にセキュリティプロパティを追加します。各 Kafka セキュリティプロパティには、properties. をプレフィックスとして付与します。

重要

JAAS ログインモジュールは、Realtime Compute for Apache Flink でシャドウ化されたクラスパス(org.apache.flink.kafka.shaded.org.apache.kafka...)を使用しており、標準の Apache Kafka クラスパスとは異なります。

SASL/PLAIN の例:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";'
);

SASL_SSL(SCRAM-SHA-256)の例:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /* SSL 構成 */
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = '<truststore-password>',
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = '<keystore-password>',
  /* SASL 構成 */
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";'
);
JAAS ログインモジュールのクラスは、使用する SASL メカニズムに合わせて選択してください。

必要なすべてのファイル(証明書、キーストアなど)は、デプロイメントの 追加の依存関係 を通じてアップロードします。アップロードされたファイルは /flink/usrlib/ に保存されます。手順については、「ジョブのデプロイ」をご参照ください。

重要

Kafka ブローカーが SASL_SSL を使用しているにもかかわらず、クライアントが SASL_PLAINTEXT で構成されている場合、SQL 検証時に OutOfMemory エラーが発生します。クライアントのセキュリティプロトコルをブローカーと一致させるよう更新してください。

キーと値のフィールド名の競合を解決する

Kafka メッセージのキーと値の両方に同じ名前のフィールド(例:id)が含まれている場合、key.fields-prefix プロパティを使用して区別します:

CREATE TABLE kafka_table (
  key_id INT,
  value_id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'key.format' = 'json',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields' = 'id, name',
  'key.fields-prefix' = 'key_'
);

クエリ結果:

key_id: 1,
value_id: 100,
name: flink

key_ というプレフィックスがキーのフィールド名に付与されるため、キーのフィールド id は Flink テーブル内で key_id として参照され、値のフィールド idvalue_id としてアクセス)との競合が回避されます。

Kafka からの読み取り時に予期せぬ 50 年の遅延が発生する

currentEmitEventTimeLag メトリックが 50 年以上の遅延を示すのは、Kafka メッセージのタイムスタンプが 0 または null である場合です。このメトリックは current_time - message_timestamp で計算されるため、ゼロのタイムスタンプは Unix エポックからの経過時間として表示されます。

トラブルシューティング手順:

  1. JAR デプロイメントの場合、POM ファイル内の Kafka 依存関係が Realtime Compute for Apache Flink の組み込み依存関係であることを確認します。サードパーティ製の Kafka 依存関係では、遅延メトリックが報告されない場合があります。

  2. 上流の Kafka トピックのすべてのパーティションにデータが送信されていることを確認します。

  3. メッセージのタイムスタンプが 0 または null であるかどうかを確認します:

    • SQL デプロイメントの場合、メッセージのタイムスタンプを抽出するためのメタデータ列を定義します:sql CREATE TEMPORARY TABLE kafka_source ( timestamp BIGINT, ts_meta TIMESTAMP METADATA FROM 'timestamp', ts AS TO_TIMESTAMP( FROM_UNIXTIME(timestamp, 'yyyy-MM-dd HH:mm:ss') ), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = '<your-topic>', 'properties.bootstrap.servers' = '<broker-list>', 'properties.group.id' = '<group-id>', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' );

    • JAR デプロイメントの場合、KafkaConsumer を使用したシンプルな Java プログラムを作成し、メッセージのタイムスタンプを検査します。

"upsert-kafka テーブルには PRIMARY KEY 制約を定義する必要があります" のエラーを修正する

Upsert Kafka コネクタは、変更ログイベント(INSERT、UPDATE_AFTER、DELETE)をパーティション化するために主キーを必要とします。これにより、同じキーを持つメッセージが同じパーティションに配置され、順序通りに処理されます。

結果テーブルの DDL に PRIMARY KEY ... NOT ENFORCED 宣言を追加します:

CREATE TABLE upsert_kafka_sink (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  ...
);

DataHub

DataHub トピックの分割またはスケールイン後のデプロイメント再開

DataHub トピックの分割またはスケールイン後、読み取り元のトピックが変更された場合、デプロイメントは失敗し、再開できません。デプロイメントをキャンセルして、再度開始してください。

消費中の DataHub トピックを削除できますか?

いいえ。現在消費中の DataHub トピックは、削除も再作成もできません。


MaxCompute

endPointtunnelEndpoint の違い

エンドポイントの詳細については、「エンドポイント」をご参照ください。仮想プライベートクラウド(VPC)における誤った構成が、これらの問題を引き起こします:

パラメーター

症状

endPoint の設定が不正

タスクが進行率 91 % で停止する

tunnelEndpoint の設定が不正

タスクの実行に失敗する

MaxCompute ソーステーブルによるデータ読み取りの仕組み

完全データおよび増分データの MaxCompute ソーステーブルは、どちらも MaxCompute Tunnel を介してデータを読み取ります。読み取り速度は Tunnel の帯域幅によって制限されます。

デプロイメント開始後に追加されたデータが読み取れない

デプロイメント開始後、既に読み取り中のパーティションまたはテーブルに追加されたデータは読み取れず、フェールオーバーを引き起こす可能性があります。

完全および増分の MaxCompute ソーステーブルは、ODPS DOWNLOAD SESSION を使用してデータを読み取ります。ダウンロードセッションが作成されると、MaxCompute はその時点のデータに基づいてインデックスファイルを生成します。その後の読み取りはこのインデックスを使用します。セッション作成後に追加されたデータは含まれません。

セッション開始後に新しいデータが書き込まれた場合:

  • Tunnel 読み取りが次のように失敗する可能性があります:ErrorCode=TableModified, ErrorMessage=The specified table has been modified since the download initiated.

  • デプロイメントがフェールオーバーから回復した場合、データの正確性は保証されません。既存のデータが再読み取りされ、新しいデータは不完全になる可能性があります。

MaxCompute デプロイメントの一時停止後に並列度を変更する

ストリーミングデプロイメントで useNewApi=true(デフォルト)が指定されている場合、一時停止後に並列度を変更できます。次のパーティションのデータは新しい並列度で分散されますが、現在のパーティションは元の分散を維持します。大規模なパーティションの読み取り中に並列度を増加させると、一部のオペレーターのみがそのパーティションのデータを処理することになります。

バッチデプロイメント、または useNewApi=false が指定されたデプロイメントでは、並列度の変更はサポートされていません

開始オフセットは MaxCompute には適用されない

開始オフセットは、メッセージキュー(例:DataHub)などのソースにのみ適用され、MaxCompute には適用されません。デプロイメントを開始した後:

  • パーティションテーブルの場合、Flink は既存のすべてのパーティションを読み取ります。

  • 非パーティションテーブルの場合、Flink は既存のすべてのデータを読み取ります。

増分 MaxCompute ソースにおけるパーティションデータの不完全性

パーティションデータの完全性をチェックする仕組みはありません。新しいパーティションが検出されると、ソースは直ちに読み取りを開始します。

パーティションとそのデータが同時に利用可能になるように、INSERT OVERWRITE を使用します:

INSERT OVERWRITE TABLE T PARTITION (ds='20191010') ...
重要

まずパーティションを作成し、その後にデータを書き込むことは避けてください。増分ソースはパーティションを検出した時点で直ちに読み取りを開始するため、不完全なデータを読み取る可能性があります。

MaxCompute で「権限付与失敗 [4019]」が発生する

MaxCompute の DDL で指定されたユーザー ID にアクセス権限がありません。Alibaba Cloud アカウント、RAM ユーザー、または RAM ロールを使用して認証を行ってください。詳細については、「ユーザー認証」をご参照ください。

増分 MaxCompute ソース向けの startPartition の構成

startPartition の値は、以下の手順で構築します:

ステップ

操作

1

各パーティションキー列と値を = で結合します。

dt=20220901

2

パーティションレベル(昇順)で並べ替え、カンマで区切ります。スペースは含めません。

dt=20220901,hh=08,mm=10

すべてのパーティションレベルを指定するか、最初の数レベルのみを指定します。

パーティション比較: ソースは、すべてのパーティションをアルファベット順に startPartition と比較し、等しいかそれより大きい値のパーティションを読み取ります。

例として、以下のパーティションがあるとします:

  • ds=20191201,type=a

  • ds=20191201,type=b

  • ds=20191202,type=a

  • ds=20191202,type=b

  • ds=20191202,type=c

  • ds=20191203,type=a

startPartition の値

読み取られるパーティション

ds=20191202

ds=20191202,type=a, type=b, type=c, ds=20191203,type=a

ds=20191202,type=b

ds=20191202,type=b, type=c, ds=20191203,type=a

startPartition で指定されたパーティションは、実際に存在する必要はありません。

増分 MaxCompute ソースの読み取り開始が遅い

startPartition 条件に一致するパーティションが多すぎたり、それらのパーティションに多数の小規模ファイルが存在したりすると、ソースが読み取りを開始する前にパーティションメタデータをソートする必要があるため、起動が遅くなります。

  • 過剰な履歴データの読み取りを避けます。履歴データの処理には、バッチデプロイメントを使用してください。

  • 履歴パーティション内の小規模ファイルの数を減らします。

読み取り/書き込み向けの partition パラメーターの構成

静的パーティションからの読み取り

ステップ

操作

1

パーティションキーと値を = で結合します。ディメンションテーブルでは固定値が必要です。ソーステーブルではワイルドカード(*)がサポートされます。

dt=20220901, dt=202209*, dt=2022*01, dt=*

2

パーティションレベルで並べ替え、カンマで区切ります。

dt=20220901,hh=08,mm=10

より柔軟なフィルタリングを行うには、パーティションプッシュダウンを活用した WHERE 句を使用します。PARTITIONED BY を使用してパーティション列を宣言することで、SQL オプティマイザーの機能を有効化できます:

CREATE TABLE maxcompute_table (
  content VARCHAR,
  dt VARCHAR,
  hh VARCHAR
) PARTITIONED BY (dt, hh) WITH (
  'connector' = 'odps',
  ...
);

SELECT content, dt, hh FROM maxcompute_table
WHERE dt >= '20220901' AND dt <= '20220903' AND hh >= '09' AND hh <= '17';

最新パーティションの読み取り

関数

動作

max_pt()

アルファベット順で最初にランク付けされるパーティション(通常は最新のもの)を返します。

max_two_pt()

アルファベット順で最初の 2 つのパーティションを返します。

max_pt_with_done()

対応する .done パーティションを持つ最初のパーティションを返します。

ソーステーブルの場合、max_pt() は一致するパーティションから一度だけデータを読み取り、その後停止します。新しいパーティションの監視は行いません。継続的な読み取りには、増分ソーステーブルを使用してください。ディメンションテーブルの場合、各リフレッシュで最新のパーティションがチェックされます。

最新のパーティションがまだ読み込み中の場合に max_pt_with_done() を使用します。データ準備が完了した後に、空の .done パーティション(例:dt=20220901.done)を作成します。ディメンションテーブルは、.done の対応パートがあるパーティションからのみ読み取ります。

静的パーティションへの書き込み

partition パラメーターは、読み取り時と同様に構成しますが、結果テーブルではワイルドカード(*)はサポートされていません

動的パーティションへの書き込み

パーティションレベルの昇順でパーティションキー列名をカンマで区切ってリストします:

'partition' = 'dt,hh,mm'

max_pt()max_pt_with_done() の比較

以下のパーティションがあるとします:

  • ds=20190101

  • ds=20190101.done

  • ds=20190102

  • ds=20190102.done

  • ds=20190103

関数

返り値

max_pt()

ds=20190103(アルファベット順で最初)

max_pt_with_done()

ds=20190102.done の対応パートを持つ最初のもの)

MaxCompute デプロイメントが「起動中」ステータスで停止する、またはデータ生成が遅い

考えられる原因:

  • 小規模ファイルの多さ: MaxCompute ソーステーブルに多数の小規模ファイルが存在します。

  • クロスリージョンアクセス: MaxCompute ストレージクラスターと Flink コンピューティングクラスターが異なるリージョンにあり、ネットワーク遅延が発生しています。両者を同一リージョンにデプロイしてください。

  • 無効な権限: MaxCompute ソーステーブルには Download 権限が必要です。

データトンネルの選択:バッチトンネル vs. ストリーミングトンネル

検討事項

バッチトンネル

ストリーミングトンネル

整合性

at-least-once。重複は、チェックポイントエラーが発生かつデータが複数のパーティションに同時に書き込まれた場合にのみ発生します。

at-least-once。例外が発生した場合、常に重複が発生する可能性があります。

スループット

低め。データはチェックポイント時にコミットされ、サーバー上でファイルが作成されます。

高め。チェックポイント時のコミットはありません。numFlushThreads > 1 の場合、上流データの受信中にフラッシュが実行されます。

バッチトンネルのチェックポイントが遅い、またはタイムアウトする場合、また下流のストレージが重複を許容する場合は、ストリーミングトンネルに切り替えてください。

MaxCompute への重複データの書き込み

以下の点を順に確認してください:

  1. SQL ロジック: MaxCompute の非トランザクションテーブルでは、DDL で主キーが宣言されていても、主キーの一意性は強制されません。SQL が重複を生成する場合、そのまま書き込まれます。

  2. 複数のデプロイメント: 同じ MaxCompute テーブルに複数のデプロイメントが書き込むと、重複した行が生成されます。

  3. バッチトンネル + チェックポイント失敗: デプロイメントがチェックポイント中に失敗した場合、コミット済みのデータが、前回のチェックポイントから再開された際に再び書き込まれる可能性があります。

  4. ストリーミングトンネル + フェールオーバー: チェックポイント間のデータは即座にコミットされます。フェールオーバー後、最後のチェックポイントから障害発生までのデータが再書き込みされる可能性があります。より厳密な重複排除を実現するには、バッチトンネルへの切り替えを検討してください。

  5. バッチトンネル + キャンセル/再起動(VVR < vvr-6.0.7-flink-1.15): キャンセル時に(例:Autopilot 最適化による)コネクタがシャットダウンする前にデータがコミットされます。この問題を修正するには、VVR vvr-6.0.7-flink-1.15 以降にアップグレードしてください。

MaxCompute 結果テーブルで「無効なパーティション仕様」エラーが発生する

データ内のパーティションキー値が無効です。無効な値には、空文字列、null、および =,/ を含む値が含まれます。データを検査して、これらの無効な値を確認してください。

MaxCompute 結果テーブルで「利用可能な blockId がありません」エラーが発生する

書き込まれるブロック数が上限を超えています。これは、各フラッシュで書き込まれるデータ量が少なすぎ、頻繁にフラッシュが実行されていることを意味します。batchSize および flushIntervalMs を増加させて、フラッシュ頻度を低下させてください。

ディメンションテーブル向けの SHUFFLE_HASH ヒントの使用

デフォルトでは、各サブタスクがディメンションテーブル全体をキャッシュします。大規模なディメンションテーブルの場合、SHUFFLE_HASH を使用してデータをサブタスク間で分散させることで、JVM ヒープメモリの消費を削減できます:

-- ソーステーブルおよびディメンションテーブル
CREATE TABLE source_table (k VARCHAR, v VARCHAR) WITH ( ... );
CREATE TABLE dim_1 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_2 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_3 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );

-- dim_1 および dim_3 に SHUFFLE_HASH を適用;dim_2 はサブタスクごとに完全にキャッシュされたまま
SELECT /*+ SHUFFLE_HASH(dim_1), SHUFFLE_HASH(dim_3) */
  k, s.v, d1.v, d2.v, d3.v
FROM source_table AS s
INNER JOIN dim_1 FOR SYSTEM_TIME AS OF PROCTIME() AS d1 ON s.k = d1.k
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF PROCTIME() AS d2 ON s.k = d2.k
LEFT JOIN dim_3 FOR SYSTEM_TIME AS OF PROCTIME() AS d3 ON s.k = d3.k;

CacheReloadTimeBlackList の構成

このパラメーターは、指定された期間内でのディメンションテーブルのキャッシュ再読み込みをブロックします。

  • データ型: 文字列

  • 形式: 開始時刻 -> 終了時刻。複数の期間を指定する場合は、カンマで区切ります。

  • 時刻形式: YYYY-MM-DD HH:mm。毎日繰り返す期間の場合は日付を省略できます。

例:

'cacheReloadTimeBlackList' = '14:00 -> 15:00,23:00 -> 01:00'

シナリオ

日次の間隔

14:00 -> 15:00

複数の毎日の期間

14:00 -> 15:00,23:00 -> 01:00

特定の日付の期間

14:00 -> 15:00,23:00 -> 01:00,2025-10-01 22:00 -> 2025-10-01 23:00


MySQL / ApsaraDB RDS for MySQL

"SSL peer shut down incorrectly" エラー

MySQL サーバーで SSL が有効化されているが、クライアントが SSL を使用していない場合に発生します。JDBC URL に characterEncoding=utf-8&useSSL=false を追加します:

'url' = 'jdbc:mysql://<host>:<port>/<database>?characterEncoding=utf-8&useSSL=false'

Hologres への同期時に BIGINT UNSIGNED が DECIMAL、さらに TEXT に変換される

Flink は BIGINT UNSIGNED をサポートしないため、値の範囲制約に基づいて DECIMAL 型に変換します。CREATE TABLE AS を使用して Hologres に同期する際、主キーはさらに TEXT 型に変換されます。これは、Hologres が主キーとして DECIMAL 型をサポートしていないためです。

開発段階で主キーのデータ型を調整してください。DECIMAL 列を保持するには、Hologres テーブルを手動で作成し、別の主キーを設定するか、主キーを完全に省略してください。適切な主キーがない場合、重複排除はアプリケーションレベルで処理する必要があります。

ApsaraDB RDS 結果テーブルにおける Upsert と Insert の動作の違い

DDL に主キーが定義されていますか?

書き込み動作

はい

INSERT ... ON DUPLICATE KEY UPDATE — キーが新規の場合は挿入、既存の場合は更新

いいえ

INSERT INTO — 常に新しい行を挿入

ApsaraDB RDS 結果テーブルにおける GROUP BY と一意なインデックス

  • GROUP BY 句で一意なインデックスを宣言します。

  • 自動インクリメント主キーは、SQL 内で主キーとして宣言できません。

Flink SQL における INT UNSIGNED 型のマッピング

MySQL JDBC ドライバーは、Flink に到達する前にデータ型を変換します:

MySQL の物理型

JDBC ドライバーの型

Flink SQL の型

INT UNSIGNED

LONG

BIGINT

BIGINT UNSIGNED

BIGINTEGER

DECIMAL(20, 0)

"Incorrect string value" エラー

データに、データベースのエンコーディングで解析できない文字が含まれています。JDBC URL に characterEncoding=UTF-8 を追加します:

'url' = 'jdbc:mysql://<host>:<port>/<database>?characterEncoding=UTF-8'

ApsaraDB RDS または TDDL コネクタ経由の MySQL 書き込み時のデッドロック

InnoDB の行ロックは、個々のレコードではなくインデックスに対して動作します。複数のトランザクションが重なるインデックス範囲にアクセスすると、デッドロックが発生する可能性があります。

例: トランザクション T1 が (-inf, 2] の範囲ロックを保持しており、(-inf, 1] の取得を試みています。一方、トランザクション T2 は (-inf, 2] の取得を待っています。両トランザクションが互いをブロックします。

RDS/TDDL と Tablestore の比較:

ストレージ

ロック粒度

影響

RDS/TDDL(InnoDB)

インデックス範囲ロック

競合は範囲全体に及ぶ

Tablestore

単一行ロック

他の行には影響しない

解決方法: 高 QPS または高同時実行数のシナリオでは、結果テーブルとして Tablestore を使用してください。

リレーショナルデータベースが必須の場合は:

  • デプロイメントを他のシステムの読み取り/書き込み操作から分離します。

  • 小規模なデータ量の場合、単一の同時書き込みを使用します。

  • 可能であれば一意なキーを避けてください。必要に応じて、一意なキーを差別化(最も高いものを最初に)する順序で並べます。

  • 単一テーブルのボトルネックを回避するために、データベースおよびテーブルをシャードします。

スキーマ変更が下流テーブルに伝播されない

スキーマ同期は、DDL 文のみではなく、連続するデータレコード間のスキーマの違いによってトリガーされます。DDL 変更後にデータが書き込まれていない場合、下流のスキーマ更新はトリガーされません。詳細については、「CREATE TABLE AS(CTAS)」の「テーブルスキーマ変更の同期ポリシー」セクションをご参照ください。

MySQL CDC ソースで「finish split response timeout」が発生する

CPU 使用率が高いため、ソースがコーディネーターの RPC 要求に応答できていません。リソース タブで TaskManager の CPU コア数を増加させてください。

MySQL CDC テーブルの完全データ読み取り中のスキーマ変更

完全データ読み取り中にスキーマ変更が発生すると、エラーが発生したり、スキーマ同期が妨げられたりする可能性があります。デプロイメントをキャンセルし、下流テーブルを削除して、状態なしで再起動してください。

CTAS/CDAS 同期中のサポートされていないスキーマ変更

デプロイメントをキャンセルし、下流テーブルを削除して、状態なしで再起動してください。互換性のないスキーマ変更は避けてください。サポートされる変更については、「CREATE TABLE AS ステートメント」をご参照ください。


ClickHouse

ClickHouse 結果テーブルからの更新データの撤回

主キーが宣言され、ignoreDeletefalse に設定されている場合、撤回は可能ですが、パフォーマンスが大幅に低下します。ClickHouse は OLAP システムであり、ALTER TABLE UPDATE および ALTER TABLE DELETE 操作は本質的に遅いです。

ClickHouse 結果テーブルにおけるデータ可視性

Exactly-once が有効ですか?

データが可視になるタイミング

いいえ(デフォルト)

書き込みバッファーが batchSize に達するか、待機時間が flushIntervalMs を超えたとき

はい

チェックポイントが完了します


Print

Print コネクタの出力を確認する

方法 1:Realtime Compute for Apache Flink コンソール

  1. O&M > デプロイメント に移動し、デプロイメント名をクリックします。

  2. ログ タブをクリックします。

  3. ジョブ のドロップダウンリストから実行中のジョブを選択します。

  4. 実行中の TaskManager タブをクリックし、パス、ID 列の値をクリックします。

  5. ログ タブをクリックして、Print の結果を確認します。

方法 2:Flink UI

  1. O&M > デプロイメント に移動し、デプロイメント名をクリックします。

  2. ステータス タブで、アクション欄の Flink UI をクリックします。

  3. Apache Flink Dashboard で Task Managers をクリックします。

  4. パス、ID 列の値をクリックします。

  5. ログ タブをクリックして、Print の結果を確認します。


Tablestore

ディメンション結合でデータが返らない(Tablestore)

DDL 内の列の型および名前が、物理テーブルのスキーマと完全に一致していることを確認してください。


ApsaraMQ for RocketMQ

"IllegalArgumentException: timeout value is negative"

pullIntervalMs のデフォルト値は -1 です。一定期間メッセージが到着しない場合、コンシューマースレッドはこの期間スリープし、エラーが発生します。pullIntervalMs を負でない値(例:0)に設定してください。

パーティション検出の動作

VVR バージョン

検出間隔

動作

遅延

6.0.2 より前

5~10 分

連続して 3 回検出された場合、フェールオーバーがトリガーされます

10~30 分

6.0.2 以降

5 分

ソースオペレーターが新しいパーティションを直接読み取り、フェールオーバーは発生しません

1~5 分


Hologres

"BackPressure Exceed reject Limit" エラー(Hologres)

Hologres インスタンスの書き込み負荷が高くなっています。Hologres のテクニカルサポートに連絡して、インスタンスのスペックアップをご依頼ください。

"remaining connection slots" エラー(Hologres)

接続数が Hologres インスタンスの上限を超えています。

  1. 各フロントエンドノードの app_name を確認し、Flink コネクタの接続数をカウントします。

  2. 他のデプロイメントが同じインスタンスに接続していないか確認します。

  3. アイドル状態の接続を解放します。詳細については、「接続の管理」をご参照ください。

テーブル再作成後に「no table is defined in publication」が発生する(Hologres)

削除されたテーブルに関連付けられたパブリケーションがクリーンアップされていません。

  1. 孤立したパブリケーションを照会します:

       SELECT * FROM pg_publication
       WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);
  2. 孤立したパブリケーションを削除します:

       DROP PUBLICATION <publication_name>;
  3. デプロイメントを再起動します。

チェックポイント間隔と Hologres のデータ可視性

チェックポイント間隔は、データ可視性と直接関係ありません。Hologres コネクタは、チェックポイントとは独立して定期的にデータを Hologres にフラッシュします。バッファーがフラッシュ条件を満たす場合、次のチェックポイントよりも前にデータが可視になります。

ただし、コネクタはトランザクションの一貫性を提供しません。チェックポイントはフォールトトレランスおよび回復のためにフラッシュを強制しますが、チェックポイント間のデータはすでに部分的に可視になっている可能性があります。

"permission denied for database"(Hologres)

VVR 8.0.4 以降では、Flink は Hologres V2.0 以降のバイナリログを消費するために JDBC モードを使用します。これは、非スーパーユーザーアカウントに対して特定の権限を必要とします。

標準 PostgreSQL 権限モデル:

GRANT CREATE ON DATABASE <db_name> TO <user_name>;
ALTER ROLE <user_name> REPLICATION;

簡易権限モデル(SPM):

CALL spm_grant('<db_name>_admin', '<user_name>');
ALTER ROLE <user_name> REPLICATION;

<user_name> は、Alibaba Cloud アカウント ID または RAM ユーザー名に置き換えてください。詳細については、「アカウントの概要」をご参照ください。

"table id parsed from checkpoint is different from the current table id"

VVR 8.0.5 ~ 8.0.8 では、Flink は Hologres バイナリログジョブのチェックポイント回復時にテーブル ID の整合性を検証します。この例外は、Hologres テーブルが再構築された(例:TRUNCATE を使用)ことを示しています。

解決方法:

  • VVR 8.0.9 以降にアップグレードして、テーブル ID 検証を無効化します。

  • ソーステーブルの再構築を避けてください。再構築により、過去のバイナリログが削除され、古いオフセットで新しいテーブルを読み取ると、データの不整合が発生します。

Hologres バイナリログ JDBC モードにおける Decimal 精度の不一致

VVR 8.0.10 以前では、Flink ソーステーブルと Hologres テーブルの Decimal 精度が一致していない場合、エラーが発生します。

解決方法:

  • VVR 8.0.11 以降にアップグレードします。

  • データ損失を防ぐため、Flink の DDL と Hologres テーブルの Decimal フィールド精度が一致していることを確認してください。

再作成されたテーブルで「no table is defined in publication」または「The table xxx has no slot named xxx」が発生する(Hologres)

削除されたテーブルのパブリケーションが削除されていません。

解決方法 1:

  1. 孤立したパブリケーションを照会します:

       SELECT * FROM pg_publication
       WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);
  2. パブリケーションを削除します:

       DROP PUBLICATION <publication_name>;
  3. ジョブを再起動します。

解決方法 2:

VVR 8.0.5 以降にアップグレードしてください。コネクタが孤立したパブリケーションを自動的にクリーンアップします。


Simple Log Service

LogSizeTooLargeException(Simple Log Service)

単一のログ行が 8 MB を超えています(MAX_BATCH_SIZE_IN_BYTES = 8388608)。過大なデータをスキップするために、デプロイメントの開始オフセットを変更してください。手順については、「ジョブデプロイメントの開始」をご参照ください。

Simple Log Service ソースからの復元時に OOM が発生する

Simple Log Service コネクタは、batchGetSize(デフォルト: 100)で制御されるバッチ単位でデータをフェッチします。フェールオーバー後、大量のバックログが蓄積します。single_log_group_size * 100 が利用可能な JVM ヒープメモリを超過すると、OOM エラーが発生します。

batchGetSize の値を小さくしてください。


Paimon

Paimon ソーステーブル向けのコンシューマオフセットの指定

scan.mode を構成して、開始位置を制御します:

スキャンモード

バッチ読み取り

ストリーミング読み取り

default

他のパラメーターによって決定されます:scan.timestamp-millisfrom-timestamp 動作をトリガーし、scan.snapshot-idfrom-snapshot 動作をトリガーします。それ以外の場合は latest-full として動作します。

初期スナップショットについてはバッチと同じロジックで、その後は継続的な増分読み取りを行います。

latest-full

最新のスナップショットを読み取ります。

起動時に最新のスナップショットを読み取り、その後増分データを生成します。

compacted-full

最新のフルコンパクション後のスナップショットを読み取ります。

起動時にコンパクション後のスナップショットを読み取り、その後増分データを生成します。

latest

latest-full と同じです。

増分データのみを生成し、初期スナップショットは生成しません。

from-timestamp

scan.timestamp-millis で指定された時刻またはそれ以前の最新のスナップショットを読み取ります。

scan.timestamp-millis 以降の増分データを生成し、初期スナップショットは生成しません。

from-snapshot

scan.snapshot-id で指定されたスナップショットを読み取ります。

scan.snapshot-id 以降の増分データを生成し、初期スナップショットは生成しません。

from-snapshot-full

from-snapshot と同じです。

起動時に指定されたスナップショットを読み取り、その後増分データを生成します。

自動パーティション有効期限の構成(Paimon)

Paimon は、パーティションタイムスタンプからの経過時間を基に、有効期限が切れたパーティションを自動的に削除できます。

パラメーター:

パラメーター

目的

partition.timestamp-pattern

パーティション値を時刻文字列に変換します。$column_name を各パーティションキーに使用します。

$year-$month-$day $hour:00:00

partition.timestamp-formatter

時刻文字列をタイムスタンプに解析するパターン。デフォルトは yyyy-MM-dd HH:mm:ss または yyyy-MM-dd です。任意の Java DateTimeFormatter パターンを受け入れます。

yyyy-MM-dd HH:mm:ss

partition.expiration-time

パーティションが削除されるまでの最大経過時間。

7d

例: パーティション year=2023,month=04,day=21,hour=17 とパターン $year-$month-$day $hour:00:00 の場合、解決されたタイムスタンプは 2023-04-21 17:00:00 になります。

Paimon への書き込み時に TaskManager のハートビートタイムアウトが発生する

これは、通常、TaskManager のヒープメモリが不足していることを示しています。Paimon はヒープメモリを以下に使用します:

  1. 同時タスクごとの書き込みバッファー: 各ライターオペレーターには、write-buffer-size(デフォルト:256 MB)でサイズ指定されたバッファーがあります。

  2. ORC 形式のバッファー: メモリ内のデータをカラム形式に変換します。orc.write.batch-size(デフォルト:1024 行)でサイズ指定されます。大きなレコードはメモリ消費を増大させます(例:1 行あたり 4 MB の JSON フィールドの場合、4 MB × 1024 = 4 GB)。

  3. バケットごとのライター: 各変更されたバケットには専用のライター・オブジェクトが割り当てられます。

原因別の解決方法:

  • write-buffer-size が大きすぎる: この値を小さくします。値が小さすぎると、頻繁なディスク書き込みおよびコンパクションが発生し、書き込みパフォーマンスが低下します。

  • 個々のレコードが大きすぎる: orc.write.batch-size を小さくするか、統計情報を無効化した Avro 形式に切り替えます。> 注意: file.format および metadata.stats-mode は、テーブル作成時にのみ設定可能であり、ALTER TABLE や SQL ヒントでは後から変更できません。

      'file.format' = 'avro',
      'metadata.stats-mode' = 'none'
  • パーティション/バケットが多すぎる: パーティションキーの構成およびバケット数を確認してください。バケットあたりの目標サイズは 2~5 GB です。バケットの調整については、「主キーテーブルおよび追加専用テーブル」をご参照ください。

"Sink materializer must not be used with Paimon sink"

シンクマテリアライザーオペレーター(カスケード JOIN による順序外データを修正するために設計)は、Paimon で不正確な集約結果や不要なオーバーヘッドを引き起こします。

これを無効化します:

SET 'table.exec.sink.upsert-materialize' = 'false';

順序外データの処理代替手段については、「主キーテーブルおよび追加専用テーブル」をご参照ください。

"File deletion conflicts detected" または "LSM conflicts detected"(Paimon)

考えられる原因:

  1. 同じパーティションに複数のデプロイメントが書き込んでいる: 失敗したデプロイメントを再起動します。偶発的な発生は正常です。

  2. 古くなった状態から再開している: エラーが継続的に発生します。最新の状態から再開するか、状態なしで再起動してください。

  3. 1 つのデプロイメントで複数の INSERT 文が実行されている: Paimon はこれをサポートしていません。UNION ALL を使用して、書き込み前にデータストリームをマージしてください。

  4. Global Committer または Compaction Coordinator の並列度が 1 より大きい: データ整合性を確保するため、両方とも 1 に設定する必要があります。

Paimon ソースからの読み取り時に「File xxx not found」が発生する

スナップショットファイルの有効期限が切れています。消費効率が低すぎるか、スナップショットの有効期限が短すぎます。

利用可能なスナップショットを確認するには、「システムテーブル」の「スナップショットテーブル」セクションをご参照ください。

"No space left on device"(Paimon)

ルックアップ結合または changelog-producer=lookup の場合: 以下のパラメーターを SQL ヒント を通じて構成します:

パラメーター

目的

推奨値

lookup.cache-max-disk-size

ルックアップキャッシュのディスク使用量を制限

256 MB、512 MB、または 1 GB

lookup.cache-file-retention

キャッシュファイルの保持期間

15 分または 30 分

一般的な書き込みの場合: SQL ヒントを通じて write-buffer-spill.max-disk-size を構成し、一時ファイルのサイズを制限します。

OSS 上の Paimon ファイル数が非常に多い

  1. 保持ポリシーの調整: Paimon はタイムトラベルアクセスのために履歴ファイルを保持します。保持期間を短縮してください。「期限切れデータのクリーンアップ」をご参照ください。

  2. パーティションおよびバケット構成の見直し: バケットあたりの目標サイズは 2~5 GB です。「主キーテーブルおよび追加専用テーブル」をご参照ください。

  3. 圧縮の有効化: テーブル作成時に 'file.compression' = 'zstd' を追加して、Zstandard 圧縮を使用します。> 注意: file.compression はテーブル作成時にのみ設定可能であり、ALTER TABLE や SQL ヒントでは後から変更できません。

チェックポイント間隔は Paimon のデータ可視性に影響しますか?

はい。Paimon は、チェックポイント時にのみデータをコミットし、下流のコンシューマーに可視化します。チェックポイント以前は、データがリモートファイルシステムにフラッシュされても、下流のシステムには通知されません。

長時間実行される Paimon ジョブにおけるメモリリーク

考えられる原因は 2 つあります:

  1. 想定される動作: リクエスト率(RPS)の増加に比例して、メモリ使用量が増加します。

  2. 既知の問題: filesystem メタストアタイプを使用する Paimon カタログで長時間実行されるジョブにおいて、メモリリークが発生します。OSS からの読み取り/書き込みを行う場合、以下のパラメーターを構成してください:

    • fs.oss.endpoint

    • fs.oss.accessKeyId

    • fs.oss.accessKeySecret


Hudi

ストレージにデータが存在しない(Hudi)

データは以下の条件でストレージにフラッシュされます:

  • バケットのメモリ使用量が 64 MB に達したとき。

  • 総バッファーが 1 GB に達したとき。

  • チェックポイントがトリガーされます。

ストリーミング書き込みの場合、チェックポイントが有効化されていることを確認してください。

重複データ(Hudi)

パーティション内(COW): 重複排除を有効化するには、write.insert.drop.duplicatestrue に設定します。Merge-on-Read(MOR)では、主キーが定義されている場合、重複排除は自動的に行われます。

Hudi 0.10.0 以降では、write.insert.drop.duplicateswrite.precombined に名称変更され、デフォルト値は true です。

パーティション間: index.global.enabledtrue に設定します。

古いデータ(インデックス TTL を超える): index.state.ttl(単位:日、デフォルト:1.5)を増加させます。0 より小さい値を設定すると、インデックスが永続的に保存されます。

Hudi 0.10.0 以降では、index.state.ttl のデフォルト値は 0(永続)です。

MORモード (Hudi) で生成されたログファイルのみ

Hudi は、コンパクションが実行された後にのみ Parquet ファイルを生成します。MOR モードでは、非同期コンパクションがデフォルトで 5 回のコミットごとに実行されます。compaction.delta_commits を小さくすることで、コンパクションを早期にトリガーできます。


AnalyticDB for MySQL 3.0

"multi-statement be found" エラー

MySQL JDBC 8.x と AnalyticDB for MySQL の ALLOW_MULTI_QUERIES=true との間に互換性の問題があります。

解決方法:

  1. テクニカルサポートに連絡して、MySQL JDBC 5.1.46 でビルドされたカスタム AnalyticDB for MySQL V3.0 コネクタを入手してください。使用方法については、「カスタムコネクタの管理」をご参照ください。

  2. JDBC URL に allowMultiQueries=true を追加します:

       jdbc:mysql://<host>.ads.aliyuncs.com:3306/<database>?allowMultiQueries=true

カスタムコネクタ

「... に適したドライバーが見つかりません」

カスタムコネクタが JDBC ドライバーを見つけられません。

解決方法(いずれかを選択):