全部產品
Search
文件中心

Realtime Compute for Apache Flink:SQL作業大狀態導致反壓的調優原理與方法

更新時間:Nov 13, 2024

狀態管理不僅影響應用的效能,還關係到系統的穩定性和資源的有效利用。如果狀態管理不當,可能會導致效能下降、資源耗盡,甚至系統崩潰。本文為您介紹SQL作業大狀態導致反壓的調優原理與方法。

運行原理:狀態運算元的產生

作為一種特定領域語言,SQL的設計初衷是隱藏底層資料處理的複雜性,可以通過聲明式語言來進行資料操作。而Flink SQL由於其架構的特殊性,在實現層面通常需要引入狀態後端配合系統檢查點(Checkpoint)來保證計算結果的最終一致性。目前Flink SQL由最佳化器根據配置項以及SQL語句來推導產生狀態運算元,想要高效處理有狀態的大規模資料和效能調優,需要對SQL狀態運算元產生機制和管理原則有一定瞭解。

基於最佳化器推導產生的狀態運算元

主要有如下三種狀態運算元:

狀態運算元

狀態清理機制

ChangelogNormalize

生命週期TTL

SinkUpsertMaterlizer

LookupJoin(*)

ChangelogNormalize

ChangelogNormalize旨在對涉及主鍵語義的資料變更日誌進行標準化處理。通過該運算元,可以有效地整合和最佳化資料變更記錄,確保資料的一致性和準確性。該狀態運算元會在以下兩種情境出現:

  • 使用了帶有主鍵的upsert源表

    upsert源表特指在保持主鍵順序一致性的前提下,僅產生基於主鍵的UPDATE(包括INSERT和 UPDATE_AFTER)及DELETE操作的變更資料表。例如,Upsert Kafka便是支援這類操作的典型連接器之一。此外,您也可以通過重寫自訂來源表連接器中的getChangelogMode方法,實現upsert功能。

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.upsert();
    }
  • 顯式設定'table.exec.source.cdc-events-duplicate' = 'true'

    在使用at-least-once語義進行CDC事件處理時,可能會產生重複的變更日誌。在需要exactly-once語義時,您需要開啟此配置項來對變更日誌進行去重。例如

    當出現該運算元時,上遊資料將按照Flink SQL源表DDL中定義的主鍵做一次hash shuffle操作後使用ValueState來儲存當前主鍵下最新的整行記錄。更新狀態並向下遊發送變更的過程如下圖所示。處理第二條-U(2, 'Jerry', 77)時State已經empty,說明截止目前+I/+UA和-D/-UB已經兩兩抵銷,當前這條retract訊息是重複的,可以丟棄。

SinkUpsertMaterializer

專門用於處理具有主鍵定義的結果表,並確保資料的物化操作符合upsert語義。在資料流更新過程中,如果無法保證upsert的特定要求,即按照主鍵進行更新時保持資料的唯一性和有序性,最佳化器會自動引入此運算元。它通過維護基於結果表主鍵的狀態資訊,來確保這些約束得到滿足。更多資訊及常見情境請參見Flink SQL中Changelog事件亂序處理原理

LookupJoin

在處理LookupJoin操作時,若主動配置了系統最佳化選項'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE',且最佳化器識別到潛在的非確定性更新問題(如何消除流查詢的不確定性影響),則系統會嘗試採取特殊措施以解決這一問題。具體而言,若通過引入一個狀態運算元能夠消除非確定性,最佳化器便會自動建立一個帶狀態的LookupJoin運算元。

帶狀態的LookupJoin運算元主要適用於以下情況:結果表被定義了主鍵,而這些主鍵完全或部分來自於維表,同時維表中的資料可能會發生變化(例如通過變更資料擷取,即CDC Lookup Source機制)。此外,用於Join操作的欄位在維表中並非主鍵。在這種情況下,帶狀態的LookupJoin運算元能夠有效地處理資料的動態變化,確保查詢結果的準確性和一致性。

基於SQL操作產生的狀態運算元

基於SQL操作產生的狀態運算元,按狀態清理機制可以分為TTL到期和依賴watermark推進兩類。具體說來,Flink SQL裡有部分狀態運算元的生命週期不是由TTL來控制,例如Window相關的狀態計算(WindowAggregate、WindowDeduplicate、WindowJoin、WindowTopN等)。它們的狀態清理主要依賴於watermark的推進,當watermark超過視窗結束時間時,內建的定時器就會觸發狀態清理。

狀態運算元

如何產生

狀態清理機制

Deduplicate

使用row_number語句,order by的欄位必須為時間屬性(time attribute)欄位(事件時間event time或處理時間processing time),且只取第一條。

TTL

RegularJoin

使用join語句,等值條件裡不包含時間屬性欄位。

GroupAggregate

使用group by語句進行分組彙總,如sum、count、min、max、first_value、last_value,或使用distinct關鍵字。

GlobalGroupAggregate

分組彙總開啟local-global最佳化。

IncrementalGroupAggregate

當存在兩層分組彙總操作並開啟兩階段最佳化時,內層彙總對應的狀態運算元GlobalGroupAggregate和外層彙總對應的狀態運算元LocalGroupAggregate被合并成一個IncrementalGroupAggregate。

Rank

使用row_number語句,order by的欄位必須為非時間屬性欄位。

GlobalRank

使用row_number語句,order by的欄位必須為非時間屬性欄位,並開啟local-global最佳化。

IntervalJoin

使用join語句,範圍條件裡包含時間屬性欄位(事件時間或處理時間)。例如:

L.time between R.time + X and R.time + Y 
  -- 或 
R.time between L.time - Y and L.time - X

watermark

TemporalJoin

使用基於事件時間的inner或left join語句。

WindowDeduplicate

基於Window TVF的去重操作。

WindowAggregate

基於Window TVF彙總。

GlobalWindowAggregate

基於Window TVF彙總,並開啟兩階段最佳化。

WindowJoin

基於Window TVF的Join。

WindowRank

基於Window TVF的排序。

GroupWindowAggregate

基於legacy文法的Window彙總。

問題診斷方法

在Flink作業遭遇效能瓶頸時,系統往往表現出明顯的反壓現象。這種反壓可能由多種因素引起,但主要的原因之一是作業狀態規模的持續膨脹,直至超出記憶體限制。此時,狀態儲存引擎會將部分不頻繁使用的狀態資料移至磁碟,而磁碟與記憶體在資料存取速度上的巨大差異,使得磁碟IO操作成為資料處理效率的瓶頸。尤其在Flink的計算過程中,如果運算元頻繁地從磁碟讀取狀態資料,將顯著增加作業的延遲,降低整體處理速度,成為效能問題的根源。

為了準確識別是否由狀態訪問引發反壓,需要對作業的運行狀態和運算元行為進行深入分析。利用監控工具追蹤和診斷效能瓶頸,可以有效地發現並解決由狀態訪問引起的效能問題,從而提升Flink作業的效能,具體方法請參見問題診斷方法

調優方法

主動避免產生不必要的狀態運算元

基於SQL操作產生的狀態運算元一般很難避免,因此主要針對最佳化器自動推導的運算元進行討論。

  • ChangelogNormalize

    在使用upsert source進行資料處理時,需注意其ChangelogNormalize狀態節點的產生。通常情況下,除了事件時間的時態關聯(event time temporal join)外,其他upsert source應用情境都會產生該狀態節點。因此,在選擇Upsert Kafka或類似的Upsert連接器時,應首先評估具體的使用情境,對於非事件時間關聯情境,應特別關注狀態運算元的狀態指標(state metrics)。由於狀態節點是基於KeyedState的,當源表的主鍵數量龐大時,狀態節點的規模也會相應增加。如果物理表的主鍵更新頻繁,狀態節點也將頻繁地被訪問和修改。從實踐角度而言,像資料同步類的情境,建議避免使用Upsert Kafka作為源表連接器,同時也最好選擇能夠保證exactly-once語義的資料同步工具。

  • SinkUpsertMaterializer

    auto作為table.exec.sink.upsert-materialize配置項的預設值,表明系統會自動判斷資料的一致性,尤其是在變更日誌(changelog)出現無序的情況下。該機制確保了通過引入SinkUpsertMaterializer來鑑效組資料處理的準確性。但並不意味著每當該運算元被啟用,資料就一定存在無序問題。例如,將多個分組鍵(group by key)合并的操作,這種情況下最佳化器無法準確推匯出upsert鍵,因此出於安全考慮,會預設添加SinkUpsertMaterializer。如果對資料的分布有充分的瞭解,不使用該運算元也能夠確保輸出結果的正確性,可以將參數設定為none,從而在資料正確性和效能上都得到保證。

    您可以通過檢查作業的最後一個節點來確認SinkUpsertMaterializer是否被啟用使用。在作業的運行拓撲圖中(如下所示),該運算元通常會與sink運算元一起顯示,形成一個運算元鏈。通過這種方式,可以直觀地監控和評估SinkUpsertMaterializer在資料處理過程中的實際應用情況,從而做出更加合理的最佳化決策。

    image.png

    image.png

    在檢測到產生了特定運算元且資料計算無誤的情況下,可以調整配置項為 'table.exec.sink.upsert-materialize'='none'(配置步驟請參見空間管理與操作),以避免自動添加SinkUpsertMaterializer。Realtime Compute引擎VVR 8.0及以上版本中引入了SQL執行計畫智能分析功能,協助您更好地識別此類問題,如下圖所示。

    image.png

減少狀態訪問頻次:開啟mini-batch

在對延時要求不高(比如分鐘層級更新)的情境下,開啟mini-batch攢批最佳化將會減少State的訪問和更新頻率(具體操作請參見開啟MiniBatch),提升吞吐。

Realtime ComputeFlink版可以應用mini-batch的狀態運算元如下:

狀態運算元

說明

ChangelogNormalize

無。

Deduplicate

可配置table.exec.deduplicate.mini-batch.compact-changes-enable,在基於事件時間去重時是否壓縮Changelog。

GroupAggregate

GlobalGroupAggregate

IncrementalGroupAggregate

無。

RegularJoin

需額外配置table.exec.stream.join.mini-batch-enabled開啟mini-batch join最佳化。適用於更新流和outer join情境。

減少狀態大小設定合理生命週期

說明

開啟或關閉TTL不能保證完全相容。當嘗試在已開啟TTL的作業上關閉TTL配置時,或者反過來操作時,將會導致相容性失敗並引發StateMigrationException異常。

在最佳化計算系統時,關鍵在於精簡狀態資料以提高效能。您可以在作業營運頁面配置State資料到期時間(參數詳情請參見運行參數配置)來控製作業狀態的生命週期,以滿足不同的營運需求和策略。

image.png

過短的TTL可能導致資料未能及時處理,從而產生不符合預期的計算結果,例如,在彙總或串連操作時,部分資料晚到,而相關狀態已到期,導致結果異常。相反,過長的TTL會消耗資源,降低作業的穩定性。因此,在對Flink SQL作業進行TTL配置時,建議根據資料特性和業務需求進行恰當的TTL設定。例如,如果計算周期以自然天為單位,並且資料跨天漂移不會超過1小時,那麼將TTL設定為25小時即可滿足需求。資料開發人員應深入瞭解業務情境和計算邏輯,以實現最佳的平衡。

此外,針對雙流串連情境,Flink SQL自Realtime Compute引擎VVR 8.0.1版本起,支援通過JOIN_STATE_TTL Hint為左流和右流分別設定不同的生命週期。這一改進允許為各自資料流定製生命週期,有效減少不必要的狀態儲存開銷,從而最佳化作業效能。您可以根據左右流資料的實際生命週期需求,靈活配置,以達到節省資源和提高作業效率的目的,具體操作請參見查詢提示

SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...

下面是一個作業使用JOIN_STATE_TTL Hint前後的State大小對比樣本。

對比

作業情況

狀態大小

最佳化前

  • 雙流join操作,左流資料量大,約為右流的20至50倍。右流需長期儲存資料,原定為18天。為提升效能,實際將右流的儲存周期縮短至10天,導致資料正確性受損。

  • join操作的狀態大小約為5.8 TB。

  • 單作業所需資源高達700 CU。

22

最佳化後

  • 通過合理設定JOIN_STATE_TTL Hint,左流可縮短至12小時,右流保持18天的儲存周期,無需犧牲資料完整性。

  • join操作的狀態大幅減少至約590 GB,僅約為原來的十分之一。

  • 資源消耗顯著降低,從700 CU降至200-300 CU,節省了50%-70%的資源。

23e

減少狀態大小:命中更優的執行計畫

在產生執行計畫時,最佳化器會結合輸入SQL和配置選擇相應的State實現。

  • 利用主鍵最佳化雙流串連

    • 當串連鍵(Join Key)包含主鍵時,系統採用ValueState<RowData>進行資料存放區,這樣可以為每個串連鍵僅保留一條最新記錄,實現儲存空間的最大化節省。

    • 如果串連操作使用了非主鍵欄位,即使已定義主鍵,系統會使用MapState<RowData, RowData>進行儲存,以便為每個串連鍵儲存來自源表的、基於主鍵的最新記錄。

    • 在未定義主鍵的情況下,系統將使用MapState<RowData, Integer>儲存資料,記錄每個串連鍵對應的整行資料及其出現次數。

    因此,建議在建表DDL中聲明主鍵,並在雙流串連時優先使用主鍵,以最佳化儲存效率。

  • 最佳化append_only流去重操作

    使用ROW_NUMBER函數替代FIRST_VALUE或LAST_VALUE函數進行去重,可以更有效地保留首次(ROW_NUMBER函數產生的Deduplicate運算元僅保留出現過的Key)或最新出現的記錄(保留Key及其最後一次出現的記錄)。

  • 提升彙總查詢效能

    在進行多維度統計,例如計算全網UV、手機用戶端UV、PC端UV等,推薦使用AGG WITH FILTER文法替代傳統的CASE WHEN文法。SQL最佳化器能夠識別Filter參數,使得在同一個欄位上根據不同條件計算COUNT DISTINCT時能夠共用狀態資訊,減少狀態的讀寫次數。根據效能測試結果,採用AGG WITH FILTER文法相比CASE WHEN可以提升高達一倍的效能。

減少狀態大小:調整多流Join順序,緩解State放大

Flink在處理資料流時,採用了二進位雜湊串連(Binary Hash Join)的方式。在下圖樣本中,A與B的串連結果會導致資料存放區的冗餘,這種冗餘程度與串連操作的頻率成正比。隨著加入串連的流數量增加,State的冗餘問題會變得更加嚴重。

image.png

您可以策略性地調整串連的順序來最佳化該問題。具體來說,可以先將資料量較小的流進行串連,而將資料量大的流放在最後進行。這樣的順序調整有助於減輕狀態冗餘帶來的放大效應,從而提高資料處理的效率和效能。

儘可能減少讀盤

為了提升系統效能,可以通過減少磁碟讀取次數並最佳化記憶體使用量來實現,具體請參見儘可能減少讀盤

相關文檔