狀態管理不僅影響應用的效能,還關係到系統的穩定性和資源的有效利用。如果狀態管理不當,可能會導致效能下降、資源耗盡,甚至系統崩潰。本文為您介紹SQL作業大狀態導致反壓的調優原理與方法。
運行原理:狀態運算元的產生
作為一種特定領域語言,SQL的設計初衷是隱藏底層資料處理的複雜性,可以通過聲明式語言來進行資料操作。而Flink SQL由於其架構的特殊性,在實現層面通常需要引入狀態後端配合系統檢查點(Checkpoint)來保證計算結果的最終一致性。目前Flink SQL由最佳化器根據配置項以及SQL語句來推導產生狀態運算元,想要高效處理有狀態的大規模資料和效能調優,需要對SQL狀態運算元產生機制和管理原則有一定瞭解。
基於最佳化器推導產生的狀態運算元
主要有如下三種狀態運算元:
狀態運算元 | 狀態清理機制 |
ChangelogNormalize | |
SinkUpsertMaterlizer | |
LookupJoin(*) |
ChangelogNormalize
ChangelogNormalize旨在對涉及主鍵語義的資料變更日誌進行標準化處理。通過該運算元,可以有效地整合和最佳化資料變更記錄,確保資料的一致性和準確性。該狀態運算元會在以下兩種情境出現:
使用了帶有主鍵的upsert源表
upsert源表特指在保持主鍵順序一致性的前提下,僅產生基於主鍵的UPDATE(包括INSERT和 UPDATE_AFTER)及DELETE操作的變更資料表。例如,Upsert Kafka便是支援這類操作的典型連接器之一。此外,您也可以通過重寫自訂來源表連接器中的getChangelogMode方法,實現upsert功能。
@Override public ChangelogMode getChangelogMode() { return ChangelogMode.upsert(); }顯式設定
'table.exec.source.cdc-events-duplicate' = 'true'在使用at-least-once語義進行CDC事件處理時,可能會產生重複的變更日誌。在需要exactly-once語義時,您需要開啟此配置項來對變更日誌進行去重。例如
當出現該運算元時,上遊資料將按照Flink SQL源表DDL中定義的主鍵做一次hash shuffle操作後使用ValueState來儲存當前主鍵下最新的整行記錄。更新狀態並向下遊發送變更的過程如下圖所示。處理第二條
-U(2, 'Jerry', 77)時State已經empty,說明截止目前+I/+UA和-D/-UB已經兩兩抵銷,當前這條retract訊息是重複的,可以丟棄。
SinkUpsertMaterializer
專門用於處理具有主鍵定義的結果表,並確保資料的物化操作符合upsert語義。在資料流更新過程中,如果無法保證upsert的特定要求,即按照主鍵進行更新時保持資料的唯一性和有序性,最佳化器會自動引入此運算元。它通過維護基於結果表主鍵的狀態資訊,來確保這些約束得到滿足。更多資訊及常見情境請參見Flink SQL中Changelog事件亂序處理原理。
LookupJoin
在處理LookupJoin操作時,若主動配置了系統最佳化選項'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE',且最佳化器識別到潛在的非確定性更新問題(如何消除流查詢的不確定性影響),則系統會嘗試採取特殊措施以解決這一問題。具體而言,若通過引入一個狀態運算元能夠消除非確定性,最佳化器便會自動建立一個帶狀態的LookupJoin運算元。
帶狀態的LookupJoin運算元主要適用於以下情況:結果表被定義了主鍵,而這些主鍵完全或部分來自於維表,同時維表中的資料可能會發生變化(例如通過變更資料擷取,即CDC Lookup Source機制)。此外,用於Join操作的欄位在維表中並非主鍵。在這種情況下,帶狀態的LookupJoin運算元能夠有效地處理資料的動態變化,確保查詢結果的準確性和一致性。
基於SQL操作產生的狀態運算元
基於SQL操作產生的狀態運算元,按狀態清理機制可以分為TTL到期和依賴watermark推進兩類。具體說來,Flink SQL裡有部分狀態運算元的生命週期不是由TTL來控制,例如Window相關的狀態計算(WindowAggregate、WindowDeduplicate、WindowJoin、WindowTopN等)。它們的狀態清理主要依賴於watermark的推進,當watermark超過視窗結束時間時,內建的定時器就會觸發狀態清理。
狀態運算元 | 如何產生 | 狀態清理機制 |
Deduplicate | 使用row_number語句,order by的欄位必須為時間屬性(time attribute)欄位(事件時間event time或處理時間processing time),且只取第一條。 | TTL |
RegularJoin | 使用join語句,等值條件裡不包含時間屬性欄位。 | |
GroupAggregate | 使用group by語句進行分組彙總,如sum、count、min、max、first_value、last_value,或使用distinct關鍵字。 | |
GlobalGroupAggregate | 分組彙總開啟local-global最佳化。 | |
IncrementalGroupAggregate | 當存在兩層分組彙總操作並開啟兩階段最佳化時,內層彙總對應的狀態運算元GlobalGroupAggregate和外層彙總對應的狀態運算元LocalGroupAggregate被合并成一個IncrementalGroupAggregate。 | |
Rank | 使用row_number語句,order by的欄位必須為非時間屬性欄位。 | |
GlobalRank | 使用row_number語句,order by的欄位必須為非時間屬性欄位,並開啟local-global最佳化。 | |
IntervalJoin | 使用join語句,範圍條件裡包含時間屬性欄位(事件時間或處理時間)。例如: | watermark |
TemporalJoin | 使用基於事件時間的inner或left join語句。 | |
WindowDeduplicate | 基於Window TVF的去重操作。 | |
WindowAggregate | 基於Window TVF彙總。 | |
GlobalWindowAggregate | 基於Window TVF彙總,並開啟兩階段最佳化。 | |
WindowJoin | 基於Window TVF的Join。 | |
WindowRank | 基於Window TVF的排序。 | |
GroupWindowAggregate | 基於legacy文法的Window彙總。 |
問題診斷方法
在Flink作業遭遇效能瓶頸時,系統往往表現出明顯的反壓現象。這種反壓可能由多種因素引起,但主要的原因之一是作業狀態規模的持續膨脹,直至超出記憶體限制。此時,狀態儲存引擎會將部分不頻繁使用的狀態資料移至磁碟,而磁碟與記憶體在資料存取速度上的巨大差異,使得磁碟IO操作成為資料處理效率的瓶頸。尤其在Flink的計算過程中,如果運算元頻繁地從磁碟讀取狀態資料,將顯著增加作業的延遲,降低整體處理速度,成為效能問題的根源。
為了準確識別是否由狀態訪問引發反壓,需要對作業的運行狀態和運算元行為進行深入分析。利用監控工具追蹤和診斷效能瓶頸,可以有效地發現並解決由狀態訪問引起的效能問題,從而提升Flink作業的效能,具體方法請參見問題診斷方法。
調優方法
主動避免產生不必要的狀態運算元
基於SQL操作產生的狀態運算元一般很難避免,因此主要針對最佳化器自動推導的運算元進行討論。
ChangelogNormalize
在使用upsert source進行資料處理時,需注意其ChangelogNormalize狀態節點的產生。通常情況下,除了事件時間的時態關聯(event time temporal join)外,其他upsert source應用情境都會產生該狀態節點。因此,在選擇Upsert Kafka或類似的Upsert連接器時,應首先評估具體的使用情境,對於非事件時間關聯情境,應特別關注狀態運算元的狀態指標(state metrics)。由於狀態節點是基於KeyedState的,當源表的主鍵數量龐大時,狀態節點的規模也會相應增加。如果物理表的主鍵更新頻繁,狀態節點也將頻繁地被訪問和修改。從實踐角度而言,像資料同步類的情境,建議避免使用Upsert Kafka作為源表連接器,同時也最好選擇能夠保證exactly-once語義的資料同步工具。
SinkUpsertMaterializer
auto作為table.exec.sink.upsert-materialize配置項的預設值,表明系統會自動判斷資料的一致性,尤其是在變更日誌(changelog)出現無序的情況下。該機制確保了通過引入SinkUpsertMaterializer來鑑效組資料處理的準確性。但並不意味著每當該運算元被啟用,資料就一定存在無序問題。例如,將多個分組鍵(group by key)合并的操作,這種情況下最佳化器無法準確推匯出upsert鍵,因此出於安全考慮,會預設添加SinkUpsertMaterializer。如果對資料的分布有充分的瞭解,不使用該運算元也能夠確保輸出結果的正確性,可以將參數設定為none,從而在資料正確性和效能上都得到保證。您可以通過檢查作業的最後一個節點來確認SinkUpsertMaterializer是否被啟用使用。在作業的運行拓撲圖中(如下所示),該運算元通常會與sink運算元一起顯示,形成一個運算元鏈。通過這種方式,可以直觀地監控和評估SinkUpsertMaterializer在資料處理過程中的實際應用情況,從而做出更加合理的最佳化決策。


在檢測到產生了特定運算元且資料計算無誤的情況下,可以調整配置項為
'table.exec.sink.upsert-materialize'='none'(配置步驟請參見空間管理與操作),以避免自動添加SinkUpsertMaterializer。Realtime Compute引擎VVR 8.0及以上版本中引入了SQL執行計畫智能分析功能,協助您更好地識別此類問題,如下圖所示。
減少狀態訪問頻次:開啟mini-batch
在對延時要求不高(比如分鐘層級更新)的情境下,開啟mini-batch攢批最佳化將會減少State的訪問和更新頻率(具體操作請參見開啟MiniBatch),提升吞吐。
Realtime ComputeFlink版可以應用mini-batch的狀態運算元如下:
狀態運算元 | 說明 |
ChangelogNormalize | 無。 |
Deduplicate | 可配置table.exec.deduplicate.mini-batch.compact-changes-enable,在基於事件時間去重時是否壓縮Changelog。 |
GroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | 無。 |
RegularJoin | 需額外配置table.exec.stream.join.mini-batch-enabled開啟mini-batch join最佳化。適用於更新流和outer join情境。 |
減少狀態大小:設定合理生命週期
開啟或關閉TTL不能保證完全相容。當嘗試在已開啟TTL的作業上關閉TTL配置時,或者反過來操作時,將會導致相容性失敗並引發StateMigrationException異常。
在最佳化計算系統時,關鍵在於精簡狀態資料以提高效能。您可以在作業營運頁面配置State資料到期時間(參數詳情請參見運行參數配置)來控製作業狀態的生命週期,以滿足不同的營運需求和策略。

過短的TTL可能導致資料未能及時處理,從而產生不符合預期的計算結果,例如,在彙總或串連操作時,部分資料晚到,而相關狀態已到期,導致結果異常。相反,過長的TTL會消耗資源,降低作業的穩定性。因此,在對Flink SQL作業進行TTL配置時,建議根據資料特性和業務需求進行恰當的TTL設定。例如,如果計算周期以自然天為單位,並且資料跨天漂移不會超過1小時,那麼將TTL設定為25小時即可滿足需求。資料開發人員應深入瞭解業務情境和計算邏輯,以實現最佳的平衡。
此外,針對雙流串連情境,Flink SQL自Realtime Compute引擎VVR 8.0.1版本起,支援通過JOIN_STATE_TTL Hint為左流和右流分別設定不同的生命週期。這一改進允許為各自資料流定製生命週期,有效減少不必要的狀態儲存開銷,從而最佳化作業效能。您可以根據左右流資料的實際生命週期需求,靈活配置,以達到節省資源和提高作業效率的目的,具體操作請參見查詢提示。
SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...下面是一個作業使用JOIN_STATE_TTL Hint前後的State大小對比樣本。
對比 | 作業情況 | 狀態大小 |
最佳化前 |
|
|
最佳化後 |
|
|
減少狀態大小:命中更優的執行計畫
在產生執行計畫時,最佳化器會結合輸入SQL和配置選擇相應的State實現。
利用主鍵最佳化雙流串連
當串連鍵(Join Key)包含主鍵時,系統採用ValueState<RowData>進行資料存放區,這樣可以為每個串連鍵僅保留一條最新記錄,實現儲存空間的最大化節省。
如果串連操作使用了非主鍵欄位,即使已定義主鍵,系統會使用MapState<RowData, RowData>進行儲存,以便為每個串連鍵儲存來自源表的、基於主鍵的最新記錄。
在未定義主鍵的情況下,系統將使用MapState<RowData, Integer>儲存資料,記錄每個串連鍵對應的整行資料及其出現次數。
因此,建議在建表DDL中聲明主鍵,並在雙流串連時優先使用主鍵,以最佳化儲存效率。
最佳化append_only流去重操作
使用ROW_NUMBER函數替代FIRST_VALUE或LAST_VALUE函數進行去重,可以更有效地保留首次(ROW_NUMBER函數產生的Deduplicate運算元僅保留出現過的Key)或最新出現的記錄(保留Key及其最後一次出現的記錄)。
提升彙總查詢效能
在進行多維度統計,例如計算全網UV、手機用戶端UV、PC端UV等,推薦使用AGG WITH FILTER文法替代傳統的CASE WHEN文法。SQL最佳化器能夠識別Filter參數,使得在同一個欄位上根據不同條件計算COUNT DISTINCT時能夠共用狀態資訊,減少狀態的讀寫次數。根據效能測試結果,採用AGG WITH FILTER文法相比CASE WHEN可以提升高達一倍的效能。
減少狀態大小:調整多流Join順序,緩解State放大
Flink在處理資料流時,採用了二進位雜湊串連(Binary Hash Join)的方式。在下圖樣本中,A與B的串連結果會導致資料存放區的冗餘,這種冗餘程度與串連操作的頻率成正比。隨著加入串連的流數量增加,State的冗餘問題會變得更加嚴重。

您可以策略性地調整串連的順序來最佳化該問題。具體來說,可以先將資料量較小的流進行串連,而將資料量大的流放在最後進行。這樣的順序調整有助於減輕狀態冗餘帶來的放大效應,從而提高資料處理的效率和效能。
儘可能減少讀盤
為了提升系統效能,可以通過減少磁碟讀取次數並最佳化記憶體使用量來實現,具體請參見儘可能減少讀盤。
相關文檔
大狀態作業導致的問題和診斷調優整體思路,詳情請參見大狀態作業調優實踐指南。
Flink Datastream API在狀態管理方面提供了靈活的介面,您可以採取相關措施來確保狀態大小可控,避免狀態的無限制增長,詳情請參見DataStream作業大狀態導致反壓的調優原理與方法。
快速啟動和擴縮容過程中初始化瓶頸問題的診斷方法和調優策略,詳情請參見作業啟動和擴縮容速度最佳化。

