本文為您介紹如何使用Flink計算Tablestore的資料,Tablestore中的資料表或時序表均可作為Realtime ComputeFlink的源表或結果表進行使用。
前提條件
已開通Tablestore服務並建立執行個體。具體操作,請參見開通服務並建立執行個體。
已開通Flink工作空間。具體操作,請參見開通Realtime ComputeFlink版。
重要Realtime ComputeFlink必須與Tablestore服務位於同一地區。Realtime ComputeFlink支援的地區,請參見地區列表。
已擷取AccessKey資訊。
重要出於安全考慮,強烈建議您通過RAM使用者使用Table Store功能。具體操作,請參見建立RAM使用者並授權。
Realtime Compute作業開發流程
步驟一:建立作業
進入SQL作業建立頁面。
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊
。
單擊建立後,在新增作業草稿對話方塊,選擇空白的流作業草稿,單擊下一步。
填寫作業資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
flink-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。
作業草稿
引擎版本
vvr-8.0.10-flink-1.17
單擊建立。
步驟二:編寫SQL作業
此處以將資料表中的資料同步至另一個資料表為例,為您介紹如何編寫SQL作業。更多SQL樣本,請參考SQL樣本。
分別建立源表(資料表)和結果表(資料表)的暫存資料表。
詳細配置資訊,請參見附錄1:Tablestore連接器。
-- 建立源表(資料表)的暫存資料表 tablestore_stream CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector' = 'ots', -- 源表的連接器類型。固定取值為ots。 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Table Store執行個體的VPC地址。 'instanceName' = 'xxx', -- Table Store的執行個體名稱。 'tableName' = 'flink_source_table', -- Table Store的源表名稱。 'tunnelName' = 'flink_source_tunnel', -- Table Store源表的資料通道名稱。 'accessId' = 'xxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey ID。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey Secret。 'ignoreDelete' = 'false' -- 是否忽略DELETE操作類型的即時資料:不忽略。 ); -- 建立結果表(資料表)的暫存資料表 tablestore_sink CREATE TEMPORARY TABLE tablestore_sink( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED -- 主鍵。 ) WITH ( 'connector' = 'ots', -- 結果表的連接器類型。固定取值為ots。 'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Table Store執行個體的VPC地址。 'instanceName' = 'xxx', -- Table Store的執行個體名稱。 'tableName' = 'flink_sink_table', -- Table Store的結果表名稱。 'accessId' = 'xxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey ID。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey Secret。 'valueColumns'='customerid,customername' --插入欄位的列名。 );
編寫作業邏輯。
將源表資料插入到結果表的程式碼範例如下:
--將源表資料插入到結果表 INSERT INTO tablestore_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
步驟三:(可選)查看配置資訊
在SQL編輯地區右側頁簽,您可以查看或上傳相關配置。
頁簽名稱 | 配置說明 |
更多配置 |
|
代碼結構 |
|
版本資訊 | 您可以在此處查看作業版本資訊,操作列下的功能詳情請參見管理作業版本。 |
步驟四:(可選)進行深度檢查
深度檢查能夠檢查作業的SQL語義、網路連通性以及作業使用的表的中繼資料資訊。同時,您可以單擊結果地區的SQL最佳化,展開查看SQL風險問題提示以及對應的SQL最佳化建議。
在SQL編輯地區右上方,單擊深度檢查。
在深度檢查對話方塊,單擊確認。
步驟五:(可選)進行作業調試
您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。
在SQL編輯地區右上方,單擊調試。
在調試對話方塊,選擇調試叢集後,單擊下一步。
如果沒有可用叢集則需要建立新的Session叢集,Session叢集與SQL作業引擎版本需要保持一致並處於運行中。詳情請參見建立Session叢集。
配置調試資料。
如果您使用線上資料,無需處理。
如果您需要使用調試資料,需要先單擊下載調試資料範本,填寫調試資料後,上傳調試資料。詳情請參見作業調試。
確定好調試資料後,單擊確定。
步驟六:進行作業部署
在SQL編輯地區右上方,單擊部署,在部署新版本對話方塊,可根據需要填寫或選中相關內容,單擊確定。
Session叢集適用於非生產環境的開發測試環境,通過部署或調試作業提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將生產作業提交至Session叢集中,可能會導致業務穩定性問題。
步驟七:啟動並查看Flink計算結果
在左側導覽列,單擊
。單擊目標作業操作列中的啟動。
選擇無狀態啟動後,單擊啟動。當作業狀態轉變為運行中時,代表作業運行正常。作業啟動參數配置,詳情請參見作業啟動。
說明Flink中的每個TaskManager建議配置2CPU和4GB記憶體,此配置可以充分發揮每個TaskManager的計算能力。單個TaskManager能達到1萬/s的寫入速率。
在source表分區數目足夠多的情況下,建議Flink中並發配置在16以內,寫入速率隨並發線性增長。
在作業營運詳情頁面,查看Flink計算結果。
在
頁面,單擊目標作業名稱。在作業日誌頁簽,單擊運行Task Managers頁簽下Path,ID列的目標任務。
單擊日誌,在頁面查看相關的日誌資訊。
(可選)停止作業。
如果您對作業進行了修改(例如更改代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要重新部署作業,然後停止再啟動。另外,如果作業無法複用State,希望作業全新啟動時,或者更新非動態生效的參數配置時,也需要停止後再啟動作業。作業停止詳情請參見作業停止。