Flink SQL Batch節點使您能夠使用標準SQL語句定義和執行資料處理任務,適用於巨量資料集的分析與轉換,包括資料清洗和彙總。該節點支援可視化配置,為您提供高效且靈活的大規模資料批處理方案。在Flink SQL Batch節點中,您可以使用類SQL陳述式完成大規模資料加工的批處理。本文將介紹如何在Flink SQL Batch節點中配置和使用SQL語句來完成資料的批處理。
前提條件
已建立工作空間,並在管理中心綁定Realtime ComputeFlink版計算資源,詳情請參見綁定計算資源。
已建立Flink SQL Batch節點,詳情請參見建立調度工作流程的節點。
已為DataWorks調用Realtime ComputeFlink版OpenAPI使用的RAM使用者或RAM角色新增授權以下OpenAPI許可權。該授權用於將節點任務提交並部署到Flink叢集。
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": ["stream:CreateDeployment", "stream:UpdateDeployment", "stream:GetDeployment", "stream:DeleteDeployment"], "Resource": ["*"] } ] }
使用限制
僅支援使用Serverless資源群組,不支援舊版獨享調度資源群組。
步驟一:開發Flink SQL Batch節點
在Flink SQL Batch節點編輯頁面,執行如下開發操作,完成節點任務的開發。
開發SQL代碼
在SQL編輯地區開發工作單位代碼,您可在代碼中使用${變數名}的方式定義變數,並在節點編輯頁面右側調度配置的調度參數中為該變數賦值。實現調度情境下代碼的動態傳參,調度參數使用詳情請參見調度參數來源及其運算式,樣本如下。
--建立源表datagen_source。
CREATE TEMPORARY TABLE datagen_source_${var}(
name VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '1000'
);
--建立結果表blackhole_sink。
CREATE TEMPORARY TABLE blackhole_sink_${var}(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
--將源表資料插入到結果表。
INSERT INTO blackhole_sink_${var}
SELECT
name
FROM datagen_source_${var};該樣本參數bizdate對應的參數值為$[yyyymmdd],通過設定該參數可以實現對每日新增資料的批量同步處理。
步驟二:配置Flink SQL Batch節點
您可根據業務情況,參照下面的參數描述資訊配置Flink SQL Batch節點任務。
配置Flink資源
您可在編輯頁面右側調度配置的Flink 資源信息中配置如下參數資訊,詳情請參見配置作業部署資訊。
參數 | 描述 |
Flink 集群 | 在管理中心綁定的全託管Flink計算資源名稱。 |
Flink 引擎版本 | 您可根據實際情況選擇引擎版本。 |
調度資源組 | 選擇與Flink網路連通的Serverless資源群組。 |
Job Manager CPU | 根據Flink的最佳實務,JobManager至少需要0.5核CPU和2GiB記憶體來確保穩定運行,建議配置為1核CPU和4 GiB記憶體,最大不超過16核CPU。具體配置應根據叢集規模和作業複雜度調整。 |
Job Manager Memory | JobManager的記憶體配置影響其處理調度和管理工作的能力,推薦配置範圍是2 GiB到64 GiB,以確保穩定高效的運行。具體大小應根據叢集規模和作業需求調整。 |
Task Manager CPU | TaskManager的CPU資源配置影響其任務處理能力。根據Flink的最佳實務,建議配置至少0.5核CPU和2 GiB記憶體,推薦1核CPU和4 GiB記憶體,最大不超過16核CPU。具體配置應依據實際需求調整。 |
Task Manager Memory | TaskManager的記憶體配置決定了其處理任務的資料量和效能。為了確保任務穩定執行和高效處理,記憶體大小至少應為2 GiB,最大可設定為64 GiB。 |
並發度 | 決定了Flink作業中任務的並存執行數量,較高的並發度可以提高處理速度和資源使用率,您需要根據叢集資源和作業特性進行合理設定。 |
最大 slot 数 | 代表了Task Manager上可以分配給任務的固定大小的資源。每個Slot可以運行一個task或operator執行個體。您可以根據實際資源情況調整最大Slot數。 |
每個 TaskManager Slot 數 | 每個TaskManager的Slot數決定了它可以並存執行的任務數量,您可通過調整Slot配置最佳化資源利用和作業的平行處理能力。 |
(可選)配置調度參數
您可在編輯頁面右側調度配置的調度參數地區單擊添加參數,並編輯相應的參數名、參數值資訊,方便在代碼中動態使用。
(可選)配置Flink運行參數
您可在編輯頁面右側調度配置的Flink 運行參數地區中配置相關運行參數資訊,詳情請參見配置作業部署資訊。
Flink運行參數配置時,其編寫規範與VVP(Ververica Platform)保持相容,支援直接採用YAML文法格式編寫配置,無需添加分號等特殊符號實現換行。
如需定期執行節點任務,請根據業務需求配置調度資訊(調度策略、調度時間、調度依賴及節點輸出參數),詳情請參見節點調度配置。
完成任務配置後,單擊保存。
步驟三:(可選)調試Flink SQL Batch節點
在節點發布到生產環境前,您可使用調試功能基於上傳的Mock資料對節點代碼進行試運行,提前驗證SQL邏輯與上下遊資料,無需將任務發布至營運中心即可完成校正。
調試功能採用白名單方式開放,如需使用,請提交工單申請開通。
配置Flink資源資訊
在節點編輯頁面右側回合組態面板的Flink 資源信息地區,按下表說明完成參數配置。
參數 | 描述 |
Flink 調試叢集 | 用於運行調試任務的Flink會話叢集(Session Cluster),必填。下拉式清單會展示當前計算資源下已存在的會話叢集及其運行狀態,僅狀態為運行中的叢集可以選用。 如果列表中無可用叢集,可單擊建立叢集跳轉至Realtime ComputeFlink版控制台建立會話叢集。 |
Flink 引擎版本 | 所選會話叢集對應的Flink引擎版本,由系統根據叢集自動展示,無需手動填寫。 |
逾時時間 | 單次調試任務的最長已耗用時間,單位為分鐘,預設30分鐘。超過該時間長度後調試任務會被自動停止。 |
切換當前節點的計算資源後,已選擇的Flink 調試叢集與已上傳的調試資料將被清空,需要重新選擇叢集並重新上傳資料。
準備調試資料
在回合組態面板的調試數據地區,為代碼中引用的源表準備Mock資料。
單擊產生模板,系統會解析當前SQL中引用的源表,並在下方列表中產生對應的表名記錄。已上傳的資料不會被清空。
在表名記錄的操作列單擊下載模板,下載與該源表結構匹配的CSV模板。
在本地按模板的欄位順序填寫調試資料並儲存為CSV檔案。
在表名記錄的操作列單擊上傳,選擇填寫好的CSV檔案完成上傳。上傳成功後,狀態列會顯示為已啟用。
(可選)上傳成功後,可單擊預覽在底部面板查看資料內容;如需修改資料,可重新上傳CSV檔案覆蓋。
如臨時不希望某張源表的Mock資料參與本次調試,可單擊禁用,狀態變為已禁用;需要重新啟用時單擊啟用。僅已啟用狀態的資料會被本次調試使用。
上傳調試資料前需先選擇Flink 調試叢集,否則會提示請先選擇計算資源。
調試資料僅支援CSV格式,單檔案大小不超過1 MB;CSV檔案首行需為欄位名,編碼建議使用UTF-8。
運行調試
調試資料準備完成後,單擊編輯器頂部工具列的運行按鈕(或按下F8),系統會將代碼、Mock資料與Flink資源資訊一併提交至所選會話叢集運行。
如代碼中使用了${變數名}形式的參數,請確保已在調度參數地區為變數賦值,調試時系統會按該取值替換代碼中的同名預留位置後再提交。
查看調試結果
調試任務運行後,節點底部結果區會提供以下資訊,便於您快速定位問題:
代碼:本次提交到Flink引擎的SQL代碼內容(已完成變數替換)。
日誌:調試任務的作業記錄與異常資訊。
查詢結果:調試任務的輸出資料。