全部產品
Search
文件中心

AnalyticDB:Spark SQL互動式分析

更新時間:Sep 10, 2025

如果您需要以互動式方式執行Spark SQL,可以指定Spark Interactive型資源群組作為執行查詢的資源群組。資源群組的資源量會在指定範圍內自動擴縮容,在滿足您互動式分析需求的同時還可以降低使用成本。本文為您詳細介紹如何通過控制台、Hive JDBC、PyHive、Beeline、DBeaver等用戶端工具實現Spark SQL互動式分析。

前提條件

  • 叢集的產品系列為企業版、基礎版或湖倉版

  • 叢集與OSS儲存空間位於相同地區。

  • 已建立資料庫帳號。

  • 已安裝Java 8開發環境和Python 3.9開發環境,以便後續運行Java應用、Python應用、Beeline等用戶端。

  • 已將用戶端IP地址添加至AnalyticDB for MySQL叢集白名單中。

注意事項

  • 如果Spark Interactive型資源群組處於停止狀態,在執行第一個Spark SQL時叢集會重新啟動Spark Interactive型資源群組,第一個Spark SQL可能會處於較長時間的排隊等待狀態。

  • Spark無法讀寫INFORMATION_SCHEMA和MYSQL資料庫,因此請不要將這些資料庫作為初始串連的資料庫。

  • 請確保提交Spark SQL作業的資料庫帳號已具有訪問目標資料庫的許可權,否則會導致查詢失敗。

準備工作

  1. 建立Spark Interactive型資源群組

  2. 擷取Spark Interactive型資源群組的串連地址。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊集群管理 > 資源管理,單擊資源組管理頁簽。

    3. 單擊對應資源群組操作列的詳情,查看內網串連地址和公網串連地址。您可單擊串連地址後image按鈕複製串連地址,或連接埠號碼括弧內的image按鈕,複製JDBC串連串。

      以下兩種情況,您需要單擊公網地址後的申請網路,手動申請公網串連地址。

      • 提交Spark SQL作業的用戶端工具部署在本地或外部伺服器。

      • 提交Spark SQL作業的用戶端工具部署在ECS上,且ECSAnalyticDB for MySQL不屬於同一VPC。

      image

互動式分析

控制台

重要

若您是自建HiveMetastore,使用控制台開發Spark SQL作業時,請在AnalyticDB for MySQL中建立一個名為default的資料庫,並選擇它作為執行Spark SQL的資料庫。

  1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

  2. 在左側導覽列單擊作業開發 > SQL開發

  3. 選擇Spark引擎和建立的Spark Interactive型資源群組,執行如下Spark SQL:

    SHOW DATABASES;

SDK調用

通過調用SDK的方法執行Spark SQL時,查詢結果會以檔案的形式寫入指定的OSS中,後續您可以在OSS控制台上查詢資料,或將結果檔案下載到本地查看。下文以Python語言調用SDK為例。

  1. 執行以下語句,安裝SDK。

    pip install alibabacloud-adb20211201
  2. 依次執行以下語句,安裝環境依賴。

    pip install oss2
    pip install loguru
  3. 串連並執行Spark SQL。

    # coding: utf-8
    import csv
    import json
    import time
    from io import StringIO
    
    import oss2
    from alibabacloud_adb20211201.client import Client
    from alibabacloud_adb20211201.models import ExecuteSparkWarehouseBatchSQLRequest, ExecuteSparkWarehouseBatchSQLResponse, \
        GetSparkWarehouseBatchSQLRequest, GetSparkWarehouseBatchSQLResponse, \
        ListSparkWarehouseBatchSQLRequest, CancelSparkWarehouseBatchSQLRequest, ListSparkWarehouseBatchSQLResponse
    from alibabacloud_tea_openapi.models import Config
    from loguru import logger
    
    def build_sql_config(oss_location,
                         spark_sql_runtime_config: dict = None,
                         file_format = "CSV",
                         output_partitions = 1,
                         sep = "|"):
        """
        構建ADB SQL執行的配置
        :param oss_location: OSS存放SQL執行結果的路徑
        :param spark_sql_runtime_config: Spark SQL社區的原生配置
        :param file_format: SQL執行結果的檔案格式, 預設為CSV
        :param output_partitions: SQL執行結果的分區數, 如果需要輸出大量結果, 必須增加輸出時的分區數避免單個檔案過大
        :param sep: CSV檔案的分隔字元, 非CSV檔案忽略
        :return: SQL執行的配置
        """
    
        if oss_location is None:
            raise ValueError("oss_location is required")
    
        if not oss_location.startswith("oss://"):
            raise ValueError("oss_location must start with oss://")
    
        if file_format != "CSV" and file_format != "PARQUET" and file_format != "ORC" and file_format != "JSON":
            raise ValueError("file_format must be CSV, PARQUET, ORC or JSON")
    
        runtime_config =  {
            # sql output config
            "spark.adb.sqlOutputFormat": file_format,
            "spark.adb.sqlOutputPartitions": output_partitions,
            "spark.adb.sqlOutputLocation": oss_location,
            # csv config
            "sep": sep
        }
    
        if spark_sql_runtime_config:
            runtime_config.update(spark_sql_runtime_config)
        return runtime_config
    
    
    def execute_sql(client: Client, dbcluster_id: str,
                    resource_group_name: str,
                    query: str,
                    limit = 10000,
                    runtime_config: dict = None,
                    schema="default"
                    ):
        """
        在Spark Interactive型資源群組中執行SQL
        :param client: 阿里雲用戶端
        :param dbcluster_id: 叢集的ID
        :param resource_group_name: 叢集的資源群組,要求必須是Spark Interactive資源群組
        :param schema: SQL執行的預設的資料庫名稱, 不填寫為default
        :param limit: SQL執行的結果的行數限制
        :param query: 執行的SQL語句, 使用分號分隔多個SQL語句
        :return:
        """
        # 組裝請求體
        req = ExecuteSparkWarehouseBatchSQLRequest()
        # 叢集ID
        req.dbcluster_id = dbcluster_id
        # 資源群組名稱
        req.resource_group_name = resource_group_name
        # SQL執行逾時時間
        req.execute_time_limit_in_seconds = 3600
        # 執行SQL的資料庫名稱
        req.schema = schema
        # SQL業務代碼
        req.query = query
        # 返回結果行數
        req.execute_result_limit = limit
    
        if runtime_config:
        # SQL執行的配置
            req.runtime_config = json.dumps(runtime_config) 
        # 執行SQL並擷取query_id
        resp: ExecuteSparkWarehouseBatchSQLResponse = client.execute_spark_warehouse_batch_sql(req)
        logger.info("Query execute submitted: {}", resp.body.data.query_id)
        return resp.body.data.query_id
    
    
    def get_query_state(client, query_id):
        """
        查詢SQL執行的狀態
        :param client: 阿里雲用戶端
        :param query_id: SQL執行的ID
        :return: SQL執行的狀態和結果
        """
        req = GetSparkWarehouseBatchSQLRequest(query_id=query_id)
        resp: GetSparkWarehouseBatchSQLResponse = client.get_spark_warehouse_batch_sql(req)
        logger.info("Query state: {}", resp.body.data.query_state)
        return resp.body.data.query_state, resp
    
    def list_history_query(client, db_cluster, resource_group_name, page_num):
        """
        查詢Spark Interactive資源群組中執行的SQL的歷史
        :param client: 阿里雲用戶端
        :param db_cluster: 叢集的ID
        :param resource_group_name: 資源群組名稱
        :param page_num: 分頁查詢的頁數
        :return: 是否有SQL, 有的話代表可以進入下一頁查詢
        """
        req = ListSparkWarehouseBatchSQLRequest(dbcluster_id=db_cluster,
                                                resource_group_name=resource_group_name,
                                                page_number = page_num)
        resp: ListSparkWarehouseBatchSQLResponse = client.list_spark_warehouse_batch_sql(req)
    
        # 如果沒有查詢到SQL, 返回false. 否則返回true. 預設每頁10條
        if resp.body.data.queries is None:
            return True
    
        # 列印查詢到的SQL
        for query in resp.body.data.queries:
            logger.info("Query ID: {}, State: {}", query.query_id, query.query_state)
        logger.info("Total queries: {}", len(resp.body.data.queries))
        return len(resp.body.data.queries) < 10
    
    
    def list_csv_files(oss_client, dir):
        for obj in oss_client.list_objects_v2(dir).object_list:
            if obj.key.endswith(".csv"):
                logger.info(f"reading {obj.key}")
                # read oss file content
                csv_content = oss_client.get_object(obj.key).read().decode('utf-8')
                csv_reader = csv.DictReader(StringIO(csv_content))
                # Print the CSV content
                for row in csv_reader:
                    print(row)
    
    
    if __name__ == '__main__':
        logger.info("ADB Spark Batch SQL Demo")
    
        # AccessKey ID,請填寫實際值
        _ak = "LTAI****************"
        # AccessKey Serect,請填寫實際值
        _sk = "yourAccessKeySecret"
        # 地區ID,請填寫實際值
        _region= "cn-shanghai" 
    
    
        # 叢集ID,請填寫實際值
        _db = "amv-uf6485635f****"
        # 資源群組名稱,請填寫實際值
        _rg_name = "testjob" 
    
        # client config
        client_config = Config(
            # Alibaba Cloud AccessKey ID
            access_key_id=_ak,
            # Alibaba Cloud AccessKey Secret
            access_key_secret=_sk,
            # The endpoint of the ADB service
            # adb.ap-southeast-1.aliyuncs.com is the endpoint of the ADB service in the Singapore region
            # adb-vpc.ap-southeast-1.aliyuncs.com used in the VPC scenario
            endpoint=f"adb.{_region}.aliyuncs.com"
        )
    
        # 建立阿里雲用戶端
        _client = Client(client_config)
    
        # SQL執行的配置
        _spark_sql_runtime_config = {
            "spark.sql.shuffle.partitions": 1000,
            "spark.sql.autoBroadcastJoinThreshold": 104857600,
            "spark.sql.sources.partitionOverwriteMode": "dynamic",
            "spark.sql.sources.partitionOverwriteMode.dynamic": "dynamic"
        }
        _config = build_sql_config(oss_location="oss://testBucketName/sql_result",
                                   spark_sql_runtime_config = _spark_sql_runtime_config)
        # 需要執行的SQL
        _query = """
        SHOW DATABASES;
        SELECT 100;
        """
    
        _query_id = execute_sql(client = _client,
                                dbcluster_id=_db,
                                resource_group_name=_rg_name,
                                query=_query, runtime_config=_config)
        logger.info(f"Run query_id: {_query_id} for SQL {_query}.\n Waiting for result...")
    
        # 等待SQL執行完成
        current_ts = time.time()
        while True:
            query_state, resp = get_query_state(_client, _query_id)
            """
            query_state 有如下幾種狀態:
            - PENDING: 在服務隊列排隊中, 此時Spark Interactive型資源群組正在啟動
            - SUBMITTED: 提交到了Spark Interactive型資源群組
            - RUNNING: SQL正在執行
            - FINISHED: SQL執行完成, 沒有報錯
            - FAILED: SQL執行失敗
            - CANCELED: SQL執行被取消
            """
    
            if query_state == "FINISHED":
                logger.info("query finished success")
                break
            elif query_state == "FAILED":
                # 列印失敗資訊
                logger.error("Error Info: {}", resp.body.data)
                exit(1)
            elif query_state == "CANCELED":
                # 列印取消資訊
                logger.error("query canceled")
                exit(1)
            else:
                time.sleep(2)
                if time.time() - current_ts > 600:
                    logger.error("query timeout")
                    # 執行時間超過10分鐘, 取消SQL執行
                    _client.cancel_spark_warehouse_batch_sql(CancelSparkWarehouseBatchSQLRequest(query_id=_query_id))
                    exit(1)
    
        # 一個Query可以包含多個語句, 列舉所有的語句
        for stmt in resp.body.data.statements:
            logger.info(
                f"statement_id: {stmt.statement_id}, result location: {stmt.result_uri}")
            # 查看結果的範例程式碼
            _bucket = stmt.result_uri.split("oss://")[1].split("/")[0]
            _dir = stmt.result_uri.replace(f"oss://{_bucket}/", "").replace("//", "/")
            oss_client = oss2.Bucket(oss2.Auth(client_config.access_key_id, client_config.access_key_secret),
                                     f"oss-{_region}.aliyuncs.com", _bucket)
            list_csv_files(oss_client, _dir)
    
        # 查詢在Spark Interactive資源群組中執行的所有的SQL,可以分頁查詢
        logger.info("List all history query")
        page_num = 1
        no_more_page = list_history_query(_client, _db, _rg_name, page_num)
        while no_more_page:
            logger.info(f"List page {page_num}")
            page_num += 1
            no_more_page = list_history_query(_client, _db, _rg_name, page_num)
    

    參數說明:

    • _ak:阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey ID。如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    • _sk:阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey Secret。如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    • region:AnalyticDB for MySQL叢集所屬地區ID。

    • _db:AnalyticDB for MySQL叢集ID。

    • _rg_nameSpark Interactive型資源群組的名稱。

    • oss_location(可選):查詢結果檔案儲存的OSS路徑。

      若不填寫該參數,您僅可以在作業開發 > Spark Jar開發頁面,對應SQL查詢語句的日誌中,查看到5行資料。

應用程式

Hive JDBC

  1. 在pom.xml中配置Maven依賴。

        <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>2.3.9</version>
        </dependency>
  2. 建立串連並執行Spark SQL。

    public class java {
        public static void main(String[] args) throws Exception {
    
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            String url = "<串連地址>";
            Connection con = DriverManager.getConnection(url, "<使用者名稱>", "<密碼>");
            Statement stmt = con.createStatement();
            ResultSet tables = stmt.executeQuery("show tables");
            List<String> tbls = new ArrayList<>();
            while (tables.next()) {
                System.out.println(tables.getString("tableName"));
                tbls.add(tables.getString("tableName"));
            }
        }
    }

    參數說明:

    • 串連地址:準備工作擷取的Spark Interactive型資源群組JDBC串連串。其中default需替換成實際串連的資料庫名稱。

    • 使用者名稱:AnalyticDB for MySQL的資料庫帳號。

    • 密碼:AnalyticDB for MySQL資料庫帳號的密碼。

PyHive

  1. 安裝Python Hive用戶端。

    pip install pyhive
  2. 建立串連並執行Spark SQL。

    from pyhive import hive
    from TCLIService.ttypes import TOperationState
    
    cursor = hive.connect(
        host='<串連地址>',
        port=<連接埠號碼>,
        username='<資源群組名稱>/<使用者名稱>',
        password='<密碼>',
        auth='CUSTOM'
    ).cursor()
    
    cursor.execute('show tables')
    
    status = cursor.poll().operationState
    while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
        logs = cursor.fetch_logs()
        for message in logs:
            print(message)
        # If needed, an asynchronous query can be cancelled at any time with:
        # cursor.cancel()
        status = cursor.poll().operationState
    print(cursor.fetchall())
    

    參數說明:

    • 串連地址:準備工作擷取的Spark Interactive型資源群組串連地址。

    • 連接埠號碼:Spark Interactive型資源群組的連接埠號碼,固定為10000

    • 資源群組名稱:Spark Interactive型資源群組的名稱。

    • 使用者名稱:AnalyticDB for MySQL的資料庫帳號。

    • 密碼:AnalyticDB for MySQL資料庫帳號的密碼。

用戶端

除了在本文中詳細介紹的Beeline、DBeaver、DBVisualizer、Datagrip用戶端外,您還可以在AirflowAzkabanDolphinScheduler等工作流程調度工具中執行互動式分析。

Beeline

  1. 串連Spark Interactive型資源群組

    命令格式如下:

    !connect <串連地址> <使用者名稱> <密碼>
    • 串連地址:準備工作擷取的Spark Interactive型資源群組JDBC串連串。其中default需替換成實際串連的資料庫名稱。

    • 使用者名稱:AnalyticDB for MySQL的資料庫帳號。

    • 密碼:AnalyticDB for MySQL資料庫帳號的密碼。

    樣本:

    !connect jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test spark_resourcegroup/AdbSpark14**** Spark23****

    返回結果:

    Connected to: Spark SQL (version 3.2.0)
    Driver: Hive JDBC (version 2.3.9)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
  2. 執行Spark SQL。

    SHOW TABLES;

DBeaver

  1. 開啟DBeaver用戶端,單擊資料庫 > 建立資料庫連接

  2. 串連到資料庫頁面,選擇Apache Spark,單擊下一步

  3. 配置Hadoop/Apache Spark 串連設定,參數說明如下:

    參數

    說明

    串連方式

    串連方式選擇為URL

    JDBC URL

    請填寫準備工作中擷取的JDBC串連串。

    重要

    串連串中的default需替換為實際的資料庫名。

    使用者名稱

    AnalyticDB for MySQL的資料庫帳號。

    密碼

    AnalyticDB for MySQL資料庫帳號的密碼。

  4. 上述參數配置完成後,單擊測試連接

    重要

    首次測試連接時,DBeaver會自動擷取需要下載的驅動資訊,擷取完成後,請單擊下載,下載相關驅動。

  5. 測試連接成功後,單擊完成

  6. 資料庫導航頁簽下,展開對應資料來源的子目錄,單擊對應資料庫。

  7. 在右側代碼框中輸入SQL語句,並單擊image按鈕運行。

    SHOW TABLES;

    返回結果如下:

    +-----------+-----------+-------------+
    | namespace | tableName | isTemporary |
    +-----------+-----------+-------------+
    |    db     |   test    |     []      |
    +-----------+-----------+-------------+

DBVisualizer

  1. 開啟DBVisualizer用戶端,單擊Tools > Driver Manager

  2. Driver Manage頁面,選擇Hive,單擊image按鈕

  3. Driver Settings頁簽下,配置如下參數:

    參數

    說明

    Name

    Hive資料來源名稱,您可以自訂名稱。

    URL Format

    請填寫準備工作中擷取的JDBC串連串。

    重要

    串連串中的default需替換為實際的資料庫名。

    Driver Class

    Hive驅動,固定選擇為org.apache.hive.jdbc.HiveDriver

    重要

    參數配置完成後,請單擊Start Download,下載對應驅動。

  4. 驅動下載完成後,單擊Database > Create Database Connection > Create Database Connection from Database URL

  5. Create Database Connection from Database URL對話方塊中填寫以下參數:

    參數

    說明

    Database URL

    請填寫準備工作中擷取的JDBC串連串。

    重要

    串連串中的default需替換為實際的資料庫名。

    Driver Class

    選擇步驟3建立的Hive資料來源。

  6. Connection頁面配置以下串連參數,並單擊Connect

    參數

    說明

    Name

    預設與步驟3建立的Hive資料來源同名,您可以自訂名稱。

    Notes

    備忘資訊。

    Driver Type

    選擇Hive

    Database URL

    請填寫準備工作中擷取的JDBC串連串。

    重要

    串連串中的default需替換為實際的資料庫名。

    Database Userid

    AnalyticDB for MySQL的資料庫帳號。

    Database Password

    AnalyticDB for MySQL資料庫帳號的密碼。

    說明

    其他參數無需配置,使用預設值即可。

  7. 串連成功後,在Database頁簽下,展開對應資料來源的子目錄,單擊對應資料庫。

  8. 在右側代碼框中輸入SQL語句,並單擊image按鈕運行。

    SHOW TABLES;

    返回結果如下:

    +-----------+-----------+-------------+
    | namespace | tableName | isTemporary |
    +-----------+-----------+-------------+
    |    db     |   test    |    false    |
    +-----------+-----------+-------------+

Datagrip

  1. 開啟Datagrip用戶端,單擊Projects > New Projects,建立專案。

  2. 添加資料來源。

    1. 單擊image按鈕,選擇Data Source > Other > Apache Spark

    2. 在彈出的Data Sources and Drivers對話方塊中配置如下參數後,單擊OK

      image

      參數

      說明

      Name

      資料來源名稱,您可以自訂。本文樣本為adbtest

      Host

      請填寫準備工作中擷取的JDBC串連串。

      重要

      串連串中的default需替換為實際的資料庫名。

      Port

      Spark Interactive型資源群組的連接埠號碼,固定為10000

      User

      AnalyticDB for MySQL的資料庫帳號。

      Password

      AnalyticDB for MySQL資料庫帳號的密碼。

      Schema

      AnalyticDB for MySQL叢集的資料庫名稱。

  3. 執行Spark SQL。

    1. 在資料來源列表中,右擊步驟2建立的資料來源,選擇New > Query Console

    2. 在右側Console面板中執行Spark SQL。

      SHOW TABLES;

BI工具

您可以在RedashPower BIMetabaseBI工具中執行互動式分析。