このトピックでは、デプロイメントのパフォーマンスに関するよくある質問への回答を提供します。
RMI TCP 接続スレッドとは何ですか? RMI TCP 接続によって占有される CPU リソースが、他のスレッドによって占有される CPU リソースよりも過度に高いのはなぜですか?
Flink SQL デプロイメントのデータ ホットスポットの問題が原因でバックプレッシャーが発生した場合はどうすればよいですか?
デプロイメントのオペレーターを分割するにはどうすればよいですか?
ページで、目的のデプロイメントの名前をクリックします。 [デプロイメントの詳細] ページの [構成] タブで、[パラメーター] セクションの右上隅にある [編集] をクリックし、次のコードを [その他の構成] フィールドに追加して、[保存] をクリックして構成を有効にします。
pipeline.operator-chaining: 'false'GROUP BY を使用した集計関数を使用するデプロイメントを最適化するにはどうすればよいですか?
miniBatch を有効にしてデータスループットを向上させる
miniBatch が有効になっている場合、Realtime Compute for Apache Flink は、データキャッシュがトリガー条件を満たすとデータを処理します。これにより、Realtime Compute for Apache Flink が状態データにアクセスする回数が減ります。これにより、データスループットが向上し、データ出力が削減されます。
miniBatch 機能は、イベントメッセージに基づいてマイクロバッチ処理をトリガーします。イベントメッセージは、指定された間隔でソースに挿入されます。
シナリオ
マイクロバッチ処理は、待機時間を犠牲にしてスループットを向上させます。したがって、マイクロバッチ処理は、非常に低い待機時間が必要なシナリオには適用されません。ただし、データ集約シナリオでは、マイクロバッチ処理を有効にしてシステムパフォーマンスを向上させることをお勧めします。
miniBatch を有効にする方法
miniBatch 機能はデフォルトで無効になっています。この機能を有効にするには、デプロイメント詳細ページの [構成] タブの [パラメーター] セクションの [その他の構成] フィールドに次のコードを入力する必要があります。
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s次の表にパラメーターを示します。
パラメーター
説明
table.exec.mini-batch.enabled
miniBatch 機能を有効にするかどうかを指定します。
table.exec.mini-batch.allow-latency
データをバッチでエクスポートする間隔。
LocalGlobal を有効にして一般的なデータホットスポットの問題を解決する
LocalGlobal ポリシーは、ローカル集計を使用して、歪んだデータをフィルタリングできます。これにより、グローバル集計におけるデータ ホットスポットの問題が効率的に軽減され、デプロイメント パフォーマンスが向上します。
LocalGlobal ポリシーは、集約プロセスをローカル集約とグローバル集約の 2 つのフェーズに分割します。これらのフェーズは、MapReduce の combine フェーズと reduce フェーズに相当します。ローカル集約フェーズでは、Realtime Compute for Apache Flink は、各アップストリームノードでローカルにキャッシュされたデータのマイクロバッチを集約し、各マイクロバッチのアキュムレータ値を生成します。グローバル集約フレーズでは、アキュムレータが最終結果にマージされます。次に、グローバル集約結果が生成されます。
シナリオ
LocalGlobal ポリシーは、SUM、COUNT、MAX、MIN、AVG などの一般的な集計関数を使用して、デプロイメントのパフォーマンスを向上させ、データホットスポットの問題を解決する場合に適しています。
制限事項
LocalGlobal はデフォルトで有効になっています。このポリシーには次の制限があります。
LocalGlobal は、miniBatch が有効になっている場合にのみ有効になります。
AggregateFunction を使用してデータをマージする必要があります。
検証
LocalGlobal が有効になっているかどうかを確認するには、最終的なトポロジに GlobalGroupAggregate ノードまたは LocalGroupAggregate ノードが存在するかどうかを確認します。
PartialFinal を有効にして、COUNT DISTINCT 関数を使用する場合のデータホットスポットの問題を解決する
通常の場合、COUNT DISTINCT 関数を使用する場合は、個別のキーでデータを分散するレイヤーを追加する必要があります。このようにして、集約プロセスを 2 つのフェーズに分割して、データホットスポットの問題を解決できます。Realtime Compute for Apache Flink は、PartialFinal ポリシーを提供して、データを自動的に分散し、集約プロセスを分割します。
LocalGlobal ポリシーは、SUM、COUNT、MAX、MIN、AVG などの一般的な集計関数の パフォーマンスを向上させます。ただし、LocalGlobal ポリシーは、COUNT DISTINCT 関数のパフォーマンスを向上させるにはあまり効果的ではありません。これは、ローカル集約では重複する個別のキーを削除できないためです。その結果、グローバル集約フェーズで大量のデータが積み重ねられます。
シナリオ
PartialFinal ポリシーは、COUNT DISTINCT 関数を使用する場合に集約パフォーマンスが要件を満たせないシナリオに適しています。
重要ユーザー定義の集計関数(UDAF)を含む Flink SQL コードでは、PartialFinal を有効にできません。
リソースの浪費を防ぐために、データ量が多い場合にのみ PartialFinal を有効にすることをお勧めします。PartialFinal はデータを自動的に分散し、集約プロセスを 2 つのフェーズに分割します。これにより、追加のネットワークシャッフリングが発生します。
PartialFinal を有効にする方法
PartialFinal はデフォルトで無効になっています。この機能を有効にするには、デプロイメント詳細ページの [構成] タブの [パラメーター] セクションの [その他の構成] フィールドに次のコードを入力する必要があります。
table.optimizer.distinct-agg.split.enabled: true検証
最終的なトポロジで 1 層集約が 2 層集約に変更されるかどうかを確認します。
大量のデータが存在する場合に COUNT DISTINCT 関数を使用するシナリオでシステム パフォーマンスを向上させるために、AGG WITH CASE WHEN を AGG WITH FILTER に変更する
統計デプロイメントは、すべてのチャネルの ユニークビジター(UV)、モバイル端末の UV、PC の UV など、さまざまなディメンションで UV を記録します。多次元統計分析を実装するには、AGG WITH CASE WHEN 構文ではなく標準の AGG WITH FILTER 構文を使用することをお勧めします。その理由は、Realtime Compute for Apache Flink の SQL オプティマイザーが filter パラメーターを分析できるためです。このようにして、Realtime Compute for Apache Flink は、状態データを共有することにより、異なるフィルター条件で同じフィールドに対して COUNT DISTINCT 関数を実行できます。これにより、状態データの読み取りおよび書き込み操作が削減されます。パフォーマンステストでは、AGG WITH FILTER 構文は AGG WITH CASE WHEN 構文よりも優れています。これは、AGG WITH FILTER 構文のデプロイメント パフォーマンスが AGG WITH CASE WHEN 構文の 2 倍になるためです。
シナリオ
同じフィールドで異なる条件下での結果を計算するために COUNT DISTINCT 関数を使用する場合に、AGG WITH CASE WHEN の代わりに AGG WITH FILTER を使用すると、デプロイメント パフォーマンスが大幅に向上します。
元の文
COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2最適化された文
COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2
TopN プラクティスを使用したデプロイメントの最適化方法
TopN アルゴリズム
TopN の入力データストリームが Log Service データソースからのデータストリームなどの静的データストリームの場合、TopN は AppendRank アルゴリズムのみをサポートします。TopN の入力データストリームが集計関数または結合関数によって処理されるデータストリームなどの動的データストリームの場合、TopN は UpdateFastRank アルゴリズムと RetractRank アルゴリズムをサポートします。UpdateFastRank のパフォーマンスは RetractRank よりも優れています。使用中のアルゴリズムの名前は、トポロジ内のノードの名前に含まれています。
AppendRank: 静的データストリームに対してサポートされているのはこのアルゴリズムのみです。
UpdateFastRank: このアルゴリズムは、動的データストリームに最適です。
RetractRank: このアルゴリズムは、動的データストリームの基本的なアルゴリズムです。このアルゴリズムのパフォーマンスがビジネス要件を満たしていない場合は、特定のシナリオでこのアルゴリズムを UpdateFastRank に変更できます。
次のセクションでは、RetractRank を UpdateFastRank に変更する方法について説明します。UpdateFastRank アルゴリズムを使用する場合は、次の条件が満たされていることを確認してください。
TopN の入力データストリームは動的データストリームです。
入力データストリームにはプライマリキー情報が含まれています。たとえば、GROUP BY 句を使用して、ソースのプライマリキーに基づいて列を集計します。
ORDER BY COUNT、COUNT_DISTINCT、SUM(正の値)DESC などのフィールドまたは関数の値は、ソートの逆順に単調に更新されます。
ORDER BY SUM DESC を使用して UpdateFastRank の最適化プランを取得する場合は、正の SUM 値を取得するための条件を指定する必要があります。これにより、[total_fee] 値が正になります。
insert into print_test SELECT cate_id, seller_id, stat_date, pay_ord_amt -- rownum フィールドは出力データに含まれません。これにより、結果テーブルに書き込まれる出力データの量が削減されます。 FROM ( SELECT *, ROW_NUMBER () OVER ( PARTITION BY cate_id, stat_date -- stat_date フィールドが含まれていることを確認します。そうしないと、状態データの有効期限が切れると、データの順序が正しくなくなる可能性があります。 ORDER BY pay_ord_amt DESC ) as rownum -- データは入力データの合計に基づいてソートされます。 FROM ( SELECT cate_id, seller_id, stat_date, -- 注: SUM 関数の結果は単調に増加します。これは、SUM 関数によって返される値が正であるためです。したがって、TopN は最適化されたアルゴリズムを使用して上位 100 件のデータレコードを取得できます。 sum (total_fee) filter ( where total_fee >= 0 ) as pay_ord_amt FROM random_test WHERE total_fee >= 0 GROUP BY cate_name, seller_id, stat_date, cate_id ) a ) WHERE rownum <= 100;TopN 最適化手法
ランキングなしの最適化を実行する
TopN の出力に rownum を含めないでください。最終的にフロントエンドに表示される際に結果をソートすることをお勧めします。これにより、結果テーブルに書き込まれるデータ量が大幅に削減されます。ランキングなしの最適化手法の詳細については、「Top-N」をご参照ください。
TopN のキャッシュサイズを増やす
TopN は、状態データへのアクセス効率を向上させるために状態キャッシュを提供します。TopN のキャッシュヒット率は、次の式を使用して計算されます。
cache_hit = cache_size*parallelism/top_n/partition_key_numたとえば、Top100 が使用され、キャッシュに 10,000 レコードが含まれ、並列度が 50 であるとします。PARTITION BY 関数のキーの数が 100,000 の場合、キャッシュヒット率は 5% です。この比率は、10000 × 50 / 100 / 100,000 = 5% という式を使用して計算されます。キャッシュヒット率が低いということは、多数のリクエストがディスクの状態データにアクセスし、状態シークメトリックの値が安定していない可能性があることを示しています。この場合、パフォーマンスが大幅に低下します。
したがって、PARTITION BY 関数のキーの数が多い場合は、TopN のキャッシュサイズとヒープメモリを増やすことができます。詳細については、「デプロイメントを構成する」をご参照ください。
table.exec.rank.topn-cache-size: 200000この例では、TopN のキャッシュサイズをデフォルト値の 10,000 から 200,000 に増やすと、キャッシュヒット率が 100% に達する可能性があります。このキャッシュヒット率は、
200,000 × 50 / 100 / 100,000 = 100%という式を使用して計算されます。PARTITION BY 関数に時間フィールドを含める
たとえば、毎日 Day フィールドをランキングに追加します。そうしないと、状態データの生存時間(TTL)が原因で TopN の結果の順序が正しくなくなります。
効率的な重複排除の実行方法
Realtime Compute for Apache Flink の入力ストリームには重複データが含まれている場合があり、効率的な重複排除が頻繁に求められます。 Realtime Compute for Apache Flink は、重複を削除するための 2 つのポリシー(Deduplicate Keep FirstRow と Deduplicate Keep LastRow)を提供しています。
構文
Flink SQL は、重複を削除するための構文を提供していません。 指定されたプライマリキーで重複行の最初または最後の行のレコードを予約し、他の重複を破棄するには、SQL ROW_NUMBER() 関数を OVER 句と共に使用する必要があります。 重複排除は特別な TopN 関数です。
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY col1[, col2..] ORDER BY timeAttributeCol [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1パラメータ
説明
ROW_NUMBER()
行番号を計算します。 これは、OVER 句と共に使用されるウィンドウ関数です。 値は 1 から始まります。
PARTITION BY col1[, col2..]
オプション。 重複排除のプライマリキーを格納するパーティション列を指定します。
ORDER BY timeAttributeCol [asc|desc])
データをソートする列を指定します。 proctime または rowtime の時間属性を指定する必要があります。 時間属性に基づいて、行を昇順または降順にソートできます。 昇順の場合、重複行の最初の行のレコードが予約されます。 降順の場合、重複行の最後の行のレコードが予約されます。
rownum
行数を指定します。
rownum = 1またはrownum <= 1を設定できます。上記の構文は、重複排除に 2 レベルのクエリが関係していることを示しています。
ROW_NUMBER()ウィンドウ関数を使用して、指定された時間属性に基づいてデータをソートし、ランキングを使用してデータをマークします。時間属性が proctime の場合、Realtime Compute for Apache Flink はレコードが処理された時間に基づいて重複を削除します。 この場合、ソート結果は、システムがデータレコードをソートするたびに異なる場合があります。
時間属性が rowtime の場合、Realtime Compute for Apache Flink はレコードが Realtime Compute for Apache Flink に書き込まれた時間に基づいて重複を削除します。 この場合、ソート結果は、システムがデータレコードをソートするたびに同じになります。
指定されたプライマリキーで最初の行のレコードのみを予約し、他の重複を削除します。
時間属性に基づいて、レコードを昇順または降順にソートできます。
Deduplicate Keep FirstRow: Realtime Compute for Apache Flink は、時間属性に基づいて行のレコードを昇順にソートし、指定されたプライマリキーで重複行の最初の行のレコードを予約します。
Deduplicate Keep LastRow: Realtime Compute for Apache Flink は、時間属性に基づいて行のレコードを降順にソートし、指定されたプライマリキーで重複行の最初の行のレコードを予約します。
Deduplicate Keep FirstRow
Deduplicate Keep FirstRow ポリシーを選択すると、Realtime Compute for Apache Flink は指定されたプライマリキーで重複行の最初の行のレコードを予約し、他の重複を破棄します。 この場合、状態データはプライマリキー情報のみを格納し、状態データへのアクセスの効率が大幅に向上します。 次の例は、ポリシーを説明するために使用されます。
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum FROM T ) WHERE rowNum = 1上記のコードは、b フィールドに基づいてテーブル T から重複を削除し、システム時間に基づいて指定されたプライマリキーで重複行の最初の行のレコードを予約します。 この例では、proctime 属性は、Realtime Compute for Apache Flink でレコードが処理されたシステム時間を示します。 Realtime Compute for Apache Flink は、この属性に基づいてテーブル T のレコードをソートします。 システム時間に基づいて重複を削除するには、proctime 属性を宣言する代わりに PROCTIME() 関数を呼び出すこともできます。
Deduplicate Keep LastRow
Deduplicate Keep LastRow ポリシーを選択すると、Realtime Compute for Apache Flink は指定されたプライマリキーで重複行の最後の行のレコードを予約し、他の重複を破棄します。 このポリシーは、パフォーマンスの点で LAST_VALUE 関数よりもわずかに優れています。 次の例は、ポリシーを説明するために使用されます。
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum FROM T ) WHERE rowNum = 1上記のコードは、b フィールドと d フィールドに基づいてテーブル T から重複を削除し、レコードが Realtime Compute for Apache Flink に書き込まれた時間に基づいて指定されたプライマリキーで最後の行のレコードを予約します。 この例では、rowtime 属性は、レコードが Realtime Compute for Apache Flink に書き込まれたイベント時間を示します。 Realtime Compute for Apache Flink は、この属性に基づいてテーブル T のレコードをソートします。
ビルトイン関数を使用する際に注意すべき点はありますか?
ユーザー定義関数 (UDF) の代わりにビルトイン関数を使用する
Realtime Compute for Apache Flink のビルトイン関数は継続的に最適化されています。可能な場合は、UDF をビルトイン関数に置き換えることをお勧めします。Realtime Compute for Apache Flink は、ビルトイン関数を最適化するために次の操作を実行します。
シリアル化とデシリアル化の効率を向上させます。
バイト単位のデータ操作を可能にします。
KEYVALUE 関数では単一文字のデリミタを使用する
KEYVALUE 関数のシグネチャは、
KEYVALUE(content, keyValueSplit, keySplit, keyName)です。 keyValueSplit と keySplit がコロン (:) やカンマ (,) などの単一文字のデリミタである場合、Realtime Compute for Apache Flink は最適化されたアルゴリズムを使用します。 Realtime Compute for Apache Flink は、バイナリデータ内で必要な KeyName 値を検索し、コンテンツ全体をセグメント化しません。 デプロイのパフォーマンスが約 30% 向上します。LIKE 演算子を使用する場合は、次の点に注意してください。
指定したコンテンツで始まるレコードを照合するには、
LIKE 'xxx%'を使用します。指定したコンテンツで終わるレコードを照合するには、
LIKE '%xxx'を使用します。指定したコンテンツを含むレコードを照合するには、
LIKE '%xxx%'を使用します。指定したコンテンツと同じレコードを照合するには、
LIKE 'xxx'を使用します。これはstr = 'xxx'と同等です。アンダースコア (_) を照合するには、
LIKE '%seller/_id%' ESCAPE '/'を使用します。 アンダースコア (_) は、SQL の単一文字のワイルドカードであり、すべての文字と一致するため、エスケープされます。LIKE '%seller_id%'を使用すると、seller_id、seller#id、sellerxid、seller1idなど、多数の結果が返されます。 これにより、予期しない結果が生じる可能性があります。
正規表現の使用を避ける
正規表現を使用したデータ処理には時間がかかり、加算、減算、乗算、除算よりも 100 倍以上のパフォーマンスオーバーヘッドが発生する可能性があります。 また、正規表現は極端な場合に無限ループに入る可能性があります。 その結果、ジョブがブロックされる可能性があります。 詳細については、「Regex execution is too slow」をご参照ください。 デプロイのブロッキングの問題を防ぐために、LIKE 演算子を使用することをお勧めします。 よく使用される正規表現の詳細については、次のリンクをクリックしてください。
テーブルから完全データを読み取るときに、データ読み取り効率が低く、バックプレッシャーが存在する場合はどうすればよいですか?
ダウンストリーム ノードが低速でデータを処理する場合、バックプレッシャーが発生する可能性があります。 シンクにバックプレッシャーがあるかどうかを確認できます。 シンクにバックプレッシャーがある場合は、次のいずれかの方法を使用して、最初にシンクのバックプレッシャーの問題を解決します。この問題を解決するには、次のいずれかの方法を使用できます。
並列処理の次数を増やします。
miniBatch などの集約最適化機能を有効にします。
状態の期間サブタスクメトリクス タブの 列にある凡例のさまざまな色の意味は何ですか?

[ステータス期間] 列の値は、頂点サブタスクの各フェーズの期間を示します。このセクションでは、各ノードボックスの色の意味について説明します。
: 作成済み
: スケジュール済み
: デプロイ中
: 初期化中
: 実行中
RMI TCP Connection スレッドとは何ですか。なぜ RMI TCP Connection で使用される CPU リソースが他のスレッドで使用される CPU リソースよりも過剰に高いのですか。

RMI TCP Connection スレッドは、Java の Remote Method Invocation(RMI)フレームワークのスレッドです。このスレッドは、リモートメソッドを呼び出すために使用されます。スレッドで使用される CPU リソースは、リアルタイムで動的に変化します。短期間のメトリックの変動は、CPU 負荷全体が過剰に高いことを示しているとは限りません。一定期間における RMI TCP Connection スレッドで使用される CPU リソースを観察し、CPU 使用率評価のためにスレッドのフレームグラフを分析できます。次の図は、RMI TCP Connection スレッドが CPU リソースをほとんど消費していないことを示しています。

ウォーターマーク タブの [低ウォーターマーク] パラメーターと [ウォーターマーク タイムスタンプの日時] パラメーターの値に表示される時間と現在の時間の間、およびメトリック タブのウォーターマーク セクションの [タスク入力ウォーターマーク] メトリックの値に表示される時間と現在の時間の間には、なぜ時間差があるのですか?
原因 1:ソーステーブルでウォーターマークを宣言するために、
TIMESTAMP_LTZ (TIMESTAMP(p) WITH LOCAL TIME ZONE)データ型のフィールドが使用されています。その結果、現在の時間とウォーターマーク関連のパラメーターの値の間に時間差が生じます。次の例は、TIMESTAMP_LTZ データ型のフィールドを使用して宣言されたウォーターマークと、TIMESTAMP データ型のフィールドを使用して宣言されたウォーターマークの違いを示しています。
次のサンプルコードは、ソーステーブルでウォーターマークを宣言するために使用されるフィールドが TIMESTAMP_LTZ データ型であることを示しています。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts as CURRENT_TIMESTAMP,-- ビルトイン関数 CURRENT_TIMESTAMP を使用して TIMESTAMP_LTZ データ型のデータを生成します。 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- 計算結果を取得します。 INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;説明従来のウィンドウの構文を使用して生成される計算結果は、
テーブル値関数 (TVF) ウィンドウを使用して生成される計算結果と同じです。次のサンプルコードは、従来のウィンドウの構文の例を示しています。SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;Realtime Compute for Apache Flink の開発コンソールでドラフトをデプロイして公開した後、[ステータス] タブの [ウォーターマーク] タブの [低ウォーターマーク] パラメーターと [ウォーターマーク タイムスタンプの日時] パラメーターの値で指定された時間と、[メトリック] タブの [ウォーターマーク] セクションの [タスク入力ウォーターマーク] メトリックの値で指定された時間と現在の時間 (UTC + 08:00) の間に 8 時間の時間差があります。
ウォーターマークと低ウォーターマーク

タスク入力ウォーターマーク

次のサンプルコードは、ソーステーブルでウォーターマークを宣言するために使用されるフィールドが TIMESTAMP (TIMESTAMP(p) WITHOUT TIME ZONE) データ型であることを示しています。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, -- シミュレートされたデータソースのタイムスタンプにはタイムゾーン情報が含まれていません。この場合、タイムスタンプは 2024-01-31 01:00:00 から 1 秒ずつ増加します。 ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- 計算結果を取得します。 INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;Realtime Compute for Apache Flink の開発コンソールでドラフトをデプロイして公開した後、[ステータス] タブの [ウォーターマーク] タブの [低ウォーターマーク] パラメーターと [ウォーターマーク タイムスタンプの日時] パラメーターの値で指定された時間と、[メトリック] タブの [ウォーターマーク] セクションの [タスク入力ウォーターマーク] メトリックの値で指定された時間は現在の時間と同じです。この例では、現在の時間とシミュレートされた時間の間に時間差はありません。
ウォーターマークと低ウォーターマーク

タスク入力ウォーターマーク

原因 2:Realtime Compute for Apache Flink の開発コンソールに表示される時間のタイムゾーンと Apache Flink UI に表示される時間のタイムゾーンが異なります。
Realtime Compute for Apache Flink の開発コンソールに表示される時間は UTC + 0 です。ただし、Apache Flink UI に表示される時間は、Apache Flink UI がブラウザを使用して取得するローカルタイムゾーンに基づいて変換されたローカル時間です。次の例は、UTC + 08:00 が使用されている場合の Realtime Compute for Apache Flink の開発コンソールに表示される時間と Apache Flink UI に表示される時間の差を示しています。Realtime Compute for Apache Flink の開発コンソールに表示される時間は、Apache Flink UI に表示される時間よりも 8 時間早いです。
Realtime Compute for Apache Flink の開発コンソール

Apache Flink UI

How do I troubleshoot backpressure issues?バックプレッシャーの問題をトラブルシューティングするにはどうすればよいですか?
[デプロイメント] ページで、デプロイメントの名前をクリックします。デプロイメント詳細ページで、[ステータス] タブをクリックします。
ビジー パラメーターとバックプレッシャー パラメーターを確認して、バックプレッシャーが発生しているオペレーターを特定します。
ビジー インジケーター ライトが赤いほど、負荷が大きくなります。バックプレッシャー インジケーターが暗いほど、バックプレッシャーの問題が深刻になります。

バックプレッシャーが発生しているオペレーターをクリックします。
[バックプレッシャー] タブで、サブタスクのバックプレッシャー ステータスを確認します。

How do I troubleshoot high latency issues?高レイテンシの問題をトラブルシューティングするにはどうすればよいですか?
[デプロイメント] ページの [アラーム] タブまたは [メトリック] タブで、currentEmitEventTimeLag メトリックと currentFetchEventTimeLag メトリックを表示し、関連する処理操作を実行します。メトリックの説明:
currentEmitEventTimeLagメトリックの値が過度に高い場合、デプロイメントのデータのプルまたは処理時にレイテンシが発生しています。オペレーターのパフォーマンスが要件を満たしているかどうかを確認する必要があります。currentFetchEventTimeLagメトリックの値が過度に高い場合、デプロイメントのアップストリーム システムでデータのプルまたは処理時にレイテンシが発生しています。ネットワーク I/O の問題とアップストリーム システムの問題をトラブルシューティングする必要があります。
アップストリーム システムで高レイテンシの問題が発生した場合、両方のメトリックの値が同時に増加します。

What do I do if backpressure occurs due to data hotspot issues of Flink SQL deployments?Flink SQL デプロイメントのデータ ホットスポットの問題が原因でバックプレッシャーが発生した場合はどうすればよいですか?
バックプレッシャーが発生しているデプロイメントのサブタスク ステータスを確認します。この問題がデータ ホットスポットによって引き起こされていると判断できます。この場合、パフォーマンス最適化のために次のいずれかの方法を使用します。
LocalGlobal を有効にして一般的なデータ ホットスポットの問題を解決する
LocalGlobal ポリシーは、ローカル集計を使用して、歪んだデータをフィルタリングできます。これにより、グローバル集計におけるデータ ホットスポットの問題が効率的に軽減され、デプロイメント パフォーマンスが向上します。
LocalGlobal ポリシーは、集計プロセスをローカル集計とグローバル集計の 2 つのフェーズに分割します。これらのフェーズは、MapReduce の結合フェーズと削減フェーズに相当します。ローカル集計フェーズでは、Realtime Compute for Apache Flink は、各アップストリーム ノードでローカルにキャッシュされたデータのマイクロバッチを集計し、各マイクロバッチのアキュムレーター値を生成します。グローバル集計フレーズでは、アキュムレーターが最終結果にマージされます。次に、グローバル集計結果が生成されます。
シナリオ
LocalGlobal ポリシーは、SUM、COUNT、MAX、MIN、AVG などの一般的な集計関数を使用して、デプロイメント パフォーマンスを向上させ、データ ホットスポットの問題を解決する場合に適しています。
制限
LocalGlobal はデフォルトで有効になっています。このポリシーには次の制限があります。
miniBatch が有効になっている場合にのみ有効になります。
データをマージするには、AggregateFunction を使用する必要があります。
検証
LocalGlobal が有効になっているかどうかを確認するには、最終的なトポロジに GlobalGroupAggregate ノードまたは LocalGroupAggregate ノードが存在するかどうかを確認します。
PartialFinal を有効にして、COUNT DISTINCT 関数を使用する際のデータ ホットスポットの問題を解決する
通常の場合、COUNT DISTINCT 関数を使用する場合は、個別のキーでデータを分散するレイヤーを追加する必要があります。このようにして、集計プロセスを 2 つのフェーズに分割して、データ ホットスポットの問題を解決できます。Realtime Compute for Apache Flink は現在、PartialFinal ポリシーを提供して、データを自動的に分散し、集計プロセスを分割します。
LocalGlobal ポリシーは、SUM、COUNT、MAX、MIN、AVG などの一般的な集計関数のパーフォーマンスを向上させます。ただし、LocalGlobal ポリシーは、COUNT DISTINCT 関数のパーフォーマンスを向上させる効果はあまりありません。これは、ローカル集計では重複する個別のキーを削除できないためです。その結果、グローバル集計フェーズで大量のデータがスタックされます。
シナリオ
PartialFinal ポリシーは、COUNT DISTINCT 関数を使用する際に集計パフォーマンスが要件を満たせない場合に適しています。
重要UDAF を含む Flink SQL コードでは、PartialFinal を有効にできません。
リソースの無駄を防ぐために、データ量が多い場合にのみ PartialFinal を有効にすることをお勧めします。PartialFinal はデータを自動的に分散し、集計プロセスを 2 つのフェーズに分割します。これにより、追加のネットワーク シャッフリングが発生します。
PartialFinal を有効にする方法
PartialFinal はデフォルトで無効になっています。この機能を有効にするには、デプロイメント詳細ページの [構成] タブの [パラメーター] セクションの [その他の構成] フィールドに次のコードを入力する必要があります。
table.optimizer.distinct-agg.split.enabled: true検証
最終的なトポロジで、1 層集計が 2 層集計に変更されるかどうかを確認します。
上流データの消費が不安定な場合はどうすればよいですか?
考えられる原因と解決策:
アップストリーム データの生成速度とデータ処理速度の不一致
アップストリーム データの生成ルールを分析して、データ生成速度がデータ処理速度と一致していることを確認します。
デプロイメントのバックプレッシャー
ジョブ頂点にバックプレッシャーが存在するかどうかを確認します。これはアップストリーム データの消費速度に影響します。デプロイメントにノードが 1 つしかない場合は、
pipeline.operator-chaining: 'false'構成を追加して、デプロイメントを再起動します。デプロイメントのオペレーター チェーンを分解し、消費率がバックプレッシャーのあるノードによって悪影響を受けているかどうかを確認します。異常な I/O レート
不安定なデータ消費の問題が発生した時点で Realtime Compute for Apache Flink のデータ入力メトリックと消費レートを確認して、問題が異常な I/O レートによって引き起こされているかどうかを確認します。
異常なデータ消費レート
データ消費レートがガベージ コレクション(GC)が発生する時点に変動するかどうかを確認します。レートが GC が発生する時点に変動する場合は、関連する TaskManager ノードのメモリ使用量を確認します。
