DataWorks提供全鏈路巨量資料開發治理能力,支援包括AnalyticDB在內的多種計算引擎。DataWorks資料開發(DataStudio)模組支援工作流程可視化開發和託管調度營運,能夠按照時間和依賴關係輕鬆實現任務的全面託管調度。您可以在DataWorks中通過ADB Spark SQL和ADB Spark節點開發和調度Spark作業(Spark SQL作業和Spark應用作業)。
前提條件
AnalyticDB for MySQL叢集需滿足以下條件:
AnalyticDB for MySQL叢集的產品系列為企業版、基礎版或湖倉版。
AnalyticDB for MySQL叢集與OSS儲存空間位於相同地區。
已建立AnalyticDB for MySQL叢集的資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號並且將RAM使用者綁定到普通帳號上。
AnalyticDB for MySQL已建立資源群組。具體操作,請參見建立和管理資源群組。
開發Spark SQL時,AnalyticDB for MySQL需建立Interactive型資源群組,且Interactive資源群組的引擎類型需為Spark。
開發Spark Jar或PySpark時,AnalyticDB for MySQL需建立Job型資源群組。
已授權AnalyticDB for MySQL扮演AliyunADBSparkProcessingDataRole角色來訪問其他雲資源。
已配置Spark應用的日誌儲存地址。
說明登入雲原生資料倉儲AnalyticDB MySQL控制台,在頁面,單擊日誌配置,選擇預設路徑或自訂儲存路徑。自訂儲存路徑時不能將日誌儲存在OSS的根目錄下,請確保該路徑中至少包含一層檔案夾。
DataWorks需滿足以下條件:
AnalyticDB for MySQL叢集與DataWorks位於相同地區。
建立DataWorks工作空間,且已開啟參加資料開發(DataStudio)(新版)公測的開關。
說明您可在建立DataWorks工作空間時開啟參加資料開發(DataStudio)(新版)公測的開關,或提交工單聯絡支援人員為已有工作空間開啟參加資料開發(DataStudio)(新版)公測的開關。
- 說明
建立資源群組時,資源群組所屬VPC與AnalyticDB for MySQL叢集的VPC保持一致。
已將DataWorks工作空間綁定的資源群組的交換器IPv4網段添加到AnalyticDB for MySQL叢集白名單中,詳情請參見設定白名單。
已為DataWorks工作空間建立了AnalyticDB for Spark計算資源。詳情請參見綁定AnalyticDB for Spark計算資源。
DataWorks調度Spark SQL作業
AnalyticDB for MySQL具有外表作業開發和內表作業開發的能力,本文以外表作業開發情境為例,介紹通過DataWorks開發和調度Spark SQL作業的步驟。
步驟一:建立ADB Spark SQL節點
進入DataWorks工作空間列表頁,在頂部切換至目標地區,找到目標工作空間,單擊操作列的,進入Data Studio。
單擊專案目錄右側的
,選擇。在彈出的節點對話方塊中,輸入節點名稱,按Enter鍵確認建立。
步驟二:開發ADB Spark SQL節點
以在ADB Spark SQL節點中建立外部資料庫為例。如您需要建立內表請參考 Spark SQL建立內表。
CREATE DATABASE IF NOT EXISTS `adb_spark_db` LOCATION 'oss://testBuckename/db_dome';在ADB Spark SQL節點中建立外表
adb_spark_db.tb_order。CREATE TABLE IF NOT EXISTS adb_spark_db.tb_order(id int, name string, age int) USING parquet LOCATION 'oss://testBuckename/db_dome/tb1' TBLPROPERTIES ('parquet.compress'='SNAPPY');查詢資料。
外表建立成功後,您可以在AnalyticDB for MySQL中通過SELECT語句查詢Parquet資料。
SELECT * FROM adb_spark_db.tb_order limit 100;在ADB Spark SQL節點中建立Delta Lake表
adb_spark_db.raw_order。CREATE TABLE IF NOT EXISTS adb_spark_db.raw_order(id int, name string, age int) USING delta;將
adb_spark_db.tb_order資料匯入至adb_spark_db.raw_order。INSERT INTO adb_spark_db.raw_order SELECT * FROM adb_spark_db.tb_order;在AnalyticDB for MySQL中建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:
CREATE DATABASE adb_demo;在AnalyticDB for MySQL中建立內表用於儲存從Delta Lake表中匯入的資料。樣本如下:
CREATE TABLE adb_demo.order_xuanwu_format ( `id` int, `name` string, `age` int) using adb TBLPROPERTIES ( 'distributeType'='HASH', 'distributeColumns' = 'id', 'storagePolicy' = 'hot' );將Delta Lake表
adb_spark_db.raw_order的資料匯入order_xuanwu_format。INSERT OVERWRITE adb_demo.order_xuanwu_format SELECT * FROM adb_spark_db.adb_spark_db.raw_order;
步驟三:配置並執行ADB Spark SQL節點
在頁面右側單擊調試配置,配置ADB Spark SQL節點運行參數。
參數類型
參數名稱
描述
計算資源
計算資源
選擇您所綁定的AnalyticDB for Spark計算資源。
ADB計算資源群組
選擇您在AnalyticDB for MySQL叢集中建立的Spark引擎Interactive型資源群組。
DataWorks配置
資源群組
選擇您綁定AnalyticDB for Spark計算資源時已通過測試連通性的DataWorks資源群組。
計算CU
當前節點使用預設CU值,無需修改CU。
指令碼參數
參數名
您在ADB Spark SQL節點中配置的參數名稱。例如,您可以在指令碼中配置參數
$[yyyymmdd],實現對每日新增資料的批量同步處理。支援配置的參數及其格式,請參見配置調度參數。說明系統會自動識別節點中配置的參數名稱。
參數值
配置參數值,任務運行時會將它動態替換為真實的取值。
(可選)如需定期執行節點任務,請在節點右側調度配置的調度策略中配置計算資源、ADB計算資源群組、調度資源群組資訊,並在調度參數中配置參數資訊。
完成調試配置後,單擊
儲存已配置好的SQL節點,然後單擊
測試回合SQL指令碼,查看SQL指令碼是否符合預期。完成調度配置後,即可對已完成的資料庫節點提交發布至生產環境。
發布完成的任務,將按照您配置的參數周期性運行,可在中查看並管理發行的周期任務,詳情請參見:營運中心入門。
DataWorks調度Spark JAR作業
步驟一:建立ADB Spark節點
進入DataWorks工作空間列表頁,在頂部切換至目標地區,找到目標工作空間,單擊操作列的,進入Data Studio。
單擊專案目錄右側的
,選擇。在彈出的節點對話方塊中,輸入節點名稱,按Enter鍵確認建立。
步驟二:開發ADB Spark節點
ADB Spark節點支援Java/Scala、Python語言開發。
Java/Scala語言開發說明
準備樣本Jar包。
您可直接下載該樣本Jar包spark-examples_2.12-3.2.0.jar,用於後續開發、調度ADB Spark節點。
將範例程式碼
spark-examples_2.12-3.2.0.jar上傳到與AnalyticDB for MySQL處於同一地區的OSS Bucket。具體操作,請參見控制台上傳檔案。配置ADB Spark節點。
語言類型
參數名稱
參數描述
Java/Scala
主Jar資源
Jar包在OSS上的儲存路徑。樣本值如:
oss://testBucketname/db_dome/spark-examples_2.12-3.2.0.jar。Main Class
您所需執行的主類名稱,例如上述代碼中的主類名稱
com.work.SparkWork。參數
填寫您所需傳入代碼的參數資訊。
配置項
配置Spark程式運行參數,詳情請參見Spark應用配置參數說明。
樣本如下:
spark.driver.resourceSpec: medium
Python語言開發說明
準備測試資料。
建立一個需要通過Spark讀取的TXT檔案
data.txt,在檔案中添加以下內容。Hello,Dataworks Hello,OSS編寫範例程式碼。
您需建立一個
spark_oss.py檔案,在spark_oss.py檔案中添加以下內容。import sys from pyspark.sql import SparkSession # 初始Spark spark = SparkSession.builder.appName('OSS Example').getOrCreate() # 讀取指定的檔案,檔案路徑由args傳入的參數值來指定 textFile = spark.sparkContext.textFile(sys.argv[1]) # 計算檔案行數並列印 print("File total lines: " + str(textFile.count())) # 列印檔案的第一行內容 print("First line is: " + textFile.first())上傳測試資料
data.txt和範例程式碼spark_oss.py到與AnalyticDB for MySQL處於同一地區的OSS Bucket。具體操作,請參見控制台上傳檔案。配置ADB Spark節點內容。
語言類型
參數名稱
參數描述
Python
主程式包
本例步驟3中
spark_oss.py的OSS路徑,樣本值如oss://testBucketname/db_dome/spark_oss.py。參數
本例步驟3中
data.txt的OSS路徑,樣本值如oss://testBucketname/db_dome/data.txt。配置項
配置Spark程式運行參數,詳情請參見Spark應用配置參數說明。
樣本如下:
spark.driver.resourceSpec: medium
步驟三:配置並執行ADB Spark節點
在頁面右側單擊調試配置,配置ADB Spark SQL節點運行參數。
參數類型
參數名稱
描述
計算資源
計算資源
選擇您所綁定的AnalyticDB for Spark計算資源。
ADB計算資源群組
選擇您在AnalyticDB for MySQL叢集中建立的Job型資源群組。
DataWorks配置
資源群組
選擇您綁定AnalyticDB for Spark計算資源時已通過測試連通性的DataWorks資源群組。
計算CU
當前節點使用預設CU值,無需修改CU。
指令碼參數
參數名
您在ADB Spark JAR作業中配置的參數名稱。
說明系統會自動識別節點中配置的參數名稱。
參數值
配置參數值,任務運行時會將它動態替換為真實的取值。
(可選)如需定期執行節點任務,請在節點右側調度配置的調度策略中配置計算資源、ADB計算資源群組、調度資源群組資訊,並在調度參數中配置參數資訊。
完成調試配置後,單擊
儲存已配置好的SQL節點,然後單擊
測試回合SQL指令碼,查看SQL指令碼是否符合預期。完成調度配置後,即可對已完成的資料庫節點提交發布至生產環境。
發布完成的任務,將按照您配置的參數周期性運行,可在中查看並管理發行的周期任務,詳情請參見營運中心入門。