Azkaban是一個批量工作流程工作調度器,可以構建、執行和管理組件含複雜依賴關係的工作流程。您可以在Azkaban Web介面調度AnalyticDB for MySQL的Spark作業。
前提條件
AnalyticDB for MySQL叢集的產品系列為企業版、基礎版或湖倉版。
AnalyticDB for MySQL叢集中已建立Job型資源群組或Spark引擎的Interactive型資源群組。
已安裝Beeline。
已將運行Azkaban的伺服器IP地址添加至AnalyticDB for MySQL叢集的白名單中。
調度Spark SQL作業
AnalyticDB for MySQL支援使用批處理和互動式兩種方法執行Spark SQL。選擇的執行方式不同,調度的操作步驟也有所不同。詳細步驟如下:
批處理
- 說明
您只需要配置
keyId、secretId、regionId、clusterId和rgName這些必填參數。 編寫工作流程檔案,並將其所在的檔案夾壓縮成ZIP格式。
nodes: - name: SparkPi type: command config: command: /<your path>/adb-spark-toolkit-submit/bin/spark-submit --class com.aliyun.adb.spark.sql.OfflineSqlTemplate local:///opt/spark/jars/offline-sql.jar "show databases" "select 100" dependsOn: - jobA - jobB - name: jobA type: command config: command: echo "This is an echoed text." - name: jobB type: command config: command: pwd重要<your path>需替換為Spark-Submit命令列工具的實際安裝路徑。command節點的命令輸入請不要使用續行符(\)。
建立專案並上傳步驟2壓縮的工作流程檔案。
訪問Azkaban Web介面,在頂部導覽列單擊Project。
單擊頁面右上方的Create Project。
在彈出的Create Project對話方塊中配置Name、Description參數。
單擊頁面右上方的Upload。
在彈出的Upload Project Files對話方塊中上傳工作流程檔案,單擊Upload。
運行工作流程。
在Project頁面,單擊Flows頁簽。
單擊Execute Flow。
單擊Execute。
在彈出的Flow submitted對話方塊中,單擊Continue。
查看工作流程詳細資料。
在頂部導覽列單擊Executing。
單擊Recently Finished頁簽。
單擊Execution Id,並單擊Job List頁簽查看每一個Job的執行詳情。
單擊Log查看Job的日誌資訊。
互動式
擷取Spark Interactive型資源群組的串連地址。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在企業版、基礎版或湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊,單擊資源組管理頁簽。
單擊對應資源群組操作列的詳情,查看內網串連地址和公網串連地址。您可單擊連接埠號碼括弧內的
按鈕,複製串連地址。以下兩種情況,您需要單擊公網地址後的申請網路,手動申請公網串連地址。
提交Spark SQL作業的用戶端工具部署在本地。
提交Spark SQL作業的用戶端工具部署在ECS上,且ECS與AnalyticDB for MySQL不屬於同一VPC。
編寫工作流程檔案,並將其所在的檔案夾壓縮成ZIP格式。
nodes: - name: jobB type: command config: command: <path> -u "jdbc:hive2://amv-t4n83e67n7b****sparkwho.ads.aliyuncs.com:10000/adb_demo" -n spark_interactive_prod/spark_user -p "spark_password" -e "show databases;show tables;" dependsOn: - jobA - name: jobA type: command config: command: <path> -u "jdbc:hive2://amv-t4n83e67n7b****sparkwho.ads.aliyuncs.com:10000/adb_demo" -n spark_interactive_prod/spark_user -p "spark_password" -e "show tables;"參數說明:
參數
說明
path
需替換為Beeline用戶端所在的路徑。例如:
/path/to/spark/bin/beeline。-u
請填寫步驟1中擷取的串連地址。串連地址中的
default需替換為實際的資料庫名,並且需要刪除串連地址中的resource_group=<資源群組名稱>尾碼。例如:
jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo。-n
AnalyticDB for MySQL的資料庫帳號及資源群組名稱。格式為
資源群組名稱/資料庫帳號名稱。例如:本文樣本資源群組名稱為spark_interactive_prod,資料庫帳號名稱為spark_user,此處填寫為
spark_interactive_prod/spark_user。-p
AnalyticDB for MySQL資料庫帳號的密碼。
-e
業務具體的SQL語句。多條SQL使用英文分號(;)分隔。
建立專案並上傳步驟1壓縮的工作流程檔案。
訪問Azkaban Web介面,在頂部導覽列單擊Project。
單擊頁面右上方的Create Project。
在彈出的Create Project對話方塊中配置Name、Description參數。
單擊頁面右上方的Upload。
在彈出的Upload Project Files對話方塊中上傳工作流程檔案,單擊Upload。
運行工作流程。
在Project頁面,單擊Flows頁簽。
單擊Execute Flow。
單擊Execute。
在彈出的Flow submitted對話方塊中,單擊Continue。
查看工作流程詳細資料。
在頂部導覽列單擊Executing。
單擊Recently Finished頁簽。
單擊Execution Id,並單擊Job List頁簽查看每一個Job的執行詳情。
單擊Log查看Job的日誌資訊。
調度Spark Jar作業
- 說明
您只需要配置
keyId、secretId、regionId、clusterId和rgName這些必填參數。如果您的Spark Jar包在本地,還需要配置ossUploadPath等OSS相關參數。 編寫工作流程檔案,並將其所在的檔案夾壓縮成ZIP格式。
nodes: - name: SparkPi type: command config: command: /<your path>/adb-spark-toolkit-submit/bin/spark-submit --class org.apache.spark.examples.SparkPi --name SparkPi --conf spark.driver.resourceSpec=medium --conf spark.executor.instances=2 --conf spark.executor.resourceSpec=medium local:///tmp/spark-examples.jar 1000 dependsOn: - jobA - jobB - name: jobA type: command config: command: echo "This is an echoed text." - name: jobB type: command config: command: pwd重要<your path>需替換為Spark-Submit命令列工具的實際安裝路徑。command節點的命令輸入請不要使用續行符(\)。
建立專案並上傳步驟2壓縮的工作流程檔案。
訪問Azkaban Web介面,在頂部導覽列單擊Project。
單擊頁面右上方的Create Project。
在彈出的Create Project對話方塊中配置Name、Description參數。
單擊頁面右上方的Upload。
在彈出的Upload Project Files對話方塊中上傳工作流程檔案,單擊Upload。
運行工作流程。
在Project頁面,單擊Flows頁簽。
單擊Execute Flow。
單擊Execute。
在彈出的Flow submitted對話方塊中,單擊Continue。
查看工作流程詳細資料。
在頂部導覽列單擊Executing。
單擊Recently Finished頁簽。
單擊Execution Id,並單擊Job List頁簽查看每一個Job的執行詳情。
單擊Log查看Job的日誌資訊。