
AnalyticDB: Interactive analysis with Spark SQL

Last Updated: Mar 28, 2026

Run Spark SQL queries interactively against a Spark Interactive resource group in AnalyticDB for MySQL. The resource group scales automatically within a configured range, balancing performance and cost. This topic explains how to connect and run queries using the console, Python SDK, Hive JDBC, PyHive, Beeline, DBeaver, DBVisualizer, and DataGrip.

Prerequisites

Before you begin, ensure that you have:

Usage notes

  • If the Spark Interactive resource group is stopped, the cluster restarts it automatically when you run the first query. The first query may remain queued until the resource group finishes starting.

  • Spark cannot read from or write to the INFORMATION_SCHEMA and MYSQL databases. Do not use either database as the initial connection target.

  • The database account used to submit Spark SQL jobs must have permission to access the target database. Queries fail if permission is missing.
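The database restriction above can also be checked on the client before a connection is opened. The following is a minimal sketch that assumes a hypothetical helper named is_usable_initial_database, which is not part of any AnalyticDB for MySQL SDK. It only rejects the two system databases. Account permissions are still enforced by the server. Case-insensitive matching is an assumption.

```python
# Hypothetical client-side check; not part of any AnalyticDB for MySQL SDK.
# Assumption: database names are compared case-insensitively.
SPARK_UNSUPPORTED_DATABASES = frozenset({"information_schema", "mysql"})


def is_usable_initial_database(name: str) -> bool:
    """Return False for empty names and for databases Spark cannot read or write."""
    cleaned = name.strip().lower()
    return bool(cleaned) and cleaned not in SPARK_UNSUPPORTED_DATABASES


for candidate in ("adb_test", "INFORMATION_SCHEMA", "mysql", ""):
    print(candidate or "<empty>", is_usable_initial_database(candidate))
```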

Get the connection endpoint

Before connecting with any client tool, get the endpoint for your Spark Interactive resource group.

  1. Create a Spark Interactive resource group if you have not done so.

  2. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left navigation pane, click Clusters, then click the cluster ID.

  3. In the left navigation pane, choose Cluster Management > Resource Management, then click the Resource Groups tab.

  4. Find the resource group and click Details in the Actions column. The internal endpoint and public endpoint are displayed. Click the copy icon next to an endpoint to copy it, or click the copy icon next to Port to copy the full JDBC connection string. If your client tool runs on a local machine, an external server, or an ECS instance that is not in the same VPC as the cluster, click Apply for Endpoint next to Public Endpoint to request a public endpoint.


Connect and run queries

Console

Important

If you use a self-managed Hive Metastore, create a database named default in AnalyticDB for MySQL first, then select that database when running Spark SQL jobs in the console.

  1. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left navigation pane, click Clusters, then click the cluster ID.

  2. In the left navigation pane, choose Job Development > SQL Development.

  3. Select the Spark engine and your Spark Interactive resource group, then run:

    SHOW DATABASES;

Python SDK

The SDK submits queries asynchronously. Results are written to an OSS path you specify, and you can download the result file to view it locally.

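The following example uses ExecuteSparkWarehouseBatchSQLRequest to submit a query, polls for completion with GetSparkWarehouseBatchSQLRequest, and then reads each statement's result from OSS. It also cancels a query that exceeds the polling timeout with CancelSparkWarehouseBatchSQLRequest and lists query history with ListSparkWarehouseBatchSQLRequest.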
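The resource group may need to start before it runs the first query, so a query can stay in PENDING for some time. The sample code polls in an inline loop, and the same logic can be moved into a reusable function. This is a sketch under several assumptions. wait_for_query is a hypothetical name. get_state is any zero-argument callable that returns a state string. The terminal states come from the docstring of get_query_state in the sample code. The sleep and clock parameters exist only so that the loop can be tested without waiting.

```python
import time
from typing import Callable

# Terminal states, taken from the get_query_state docstring in the sample code.
TERMINAL_STATES = frozenset({"FINISHED", "FAILED", "CANCELED"})


def wait_for_query(
    get_state: Callable[[], str],
    interval: float = 2.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_state() until it returns a terminal state, then return that state.

    Any other state (PENDING, SUBMITTED, RUNNING) means "keep waiting".
    Raises TimeoutError if no terminal state is seen within `timeout` seconds.
    """
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"query still {state} after {timeout} seconds")
        sleep(interval)


# Usage with a scripted sequence of states instead of a live client.
states = iter(["PENDING", "SUBMITTED", "RUNNING", "FINISHED"])
print(wait_for_query(lambda: next(states), sleep=lambda _: None))
```

With the sample's client, get_state could be `lambda: get_query_state(_client, _query_id)[0]`. On TimeoutError, the sample's CancelSparkWarehouseBatchSQLRequest call can be reused to cancel the query.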

  1. Install the SDK and dependencies.

    pip install alibabacloud-adb20211201 oss2 loguru
  2. Submit and retrieve Spark SQL results.

    Placeholder | Description
    <your-access-key-id> | AccessKey ID of an Alibaba Cloud account or RAM user with AnalyticDB for MySQL access permissions. See Accounts and permissions.
    <your-access-key-secret> | AccessKey secret for the same account or RAM user.
    <region-id> | Region ID of the cluster, for example, cn-shanghai.
    <cluster-id> | AnalyticDB for MySQL cluster ID, for example, amv-uf6485635f****.
    <resource-group-name> | Name of the Spark Interactive resource group.
    oss://<your-bucket>/sql_result | (Optional) OSS path for result files. If omitted, only the first five rows appear in the query logs on the Job Development > Spark JAR Development page. In the sample code, build_sql_config raises an error if this path is missing; to omit it, skip build_sql_config and call execute_sql without runtime_config.
    # coding: utf-8
    import csv
    import json
    import time
    from io import StringIO
    
    import oss2
    from alibabacloud_adb20211201.client import Client
    from alibabacloud_adb20211201.models import (
        ExecuteSparkWarehouseBatchSQLRequest,
        ExecuteSparkWarehouseBatchSQLResponse,
        GetSparkWarehouseBatchSQLRequest,
        GetSparkWarehouseBatchSQLResponse,
        ListSparkWarehouseBatchSQLRequest,
        CancelSparkWarehouseBatchSQLRequest,
        ListSparkWarehouseBatchSQLResponse,
    )
    from alibabacloud_tea_openapi.models import Config
    from loguru import logger
    
    
    def build_sql_config(
        oss_location,
        spark_sql_runtime_config: dict = None,
        file_format="CSV",
        output_partitions=1,
        sep="|",
    ):
        """
        Build the output configuration for a Spark SQL execution.
    
        :param oss_location: OSS path where results are stored. Must start with oss://.
        :param spark_sql_runtime_config: Additional Spark SQL runtime properties.
        :param file_format: Output format — CSV, PARQUET, ORC, or JSON. Default: CSV.
        :param output_partitions: Number of output partitions. Increase this for large result sets
                                  to avoid generating a single oversized file.
        :param sep: Column separator for CSV output. Ignored for non-CSV formats.
        """
        if oss_location is None:
            raise ValueError("oss_location is required")
        if not oss_location.startswith("oss://"):
            raise ValueError("oss_location must start with oss://")
        if file_format not in ("CSV", "PARQUET", "ORC", "JSON"):
            raise ValueError("file_format must be CSV, PARQUET, ORC, or JSON")
    
        runtime_config = {
            "spark.adb.sqlOutputFormat": file_format,
            "spark.adb.sqlOutputPartitions": output_partitions,
            "spark.adb.sqlOutputLocation": oss_location,
            "sep": sep,
        }
        if spark_sql_runtime_config:
            runtime_config.update(spark_sql_runtime_config)
        return runtime_config
    
    
    def execute_sql(
        client: Client,
        dbcluster_id: str,
        resource_group_name: str,
        query: str,
        limit=10000,
        runtime_config: dict = None,
        schema="default",
    ):
        """
        Submit a Spark SQL query to a Spark Interactive resource group.
    
        :param client: Alibaba Cloud SDK client.
        :param dbcluster_id: Cluster ID.
        :param resource_group_name: Name of the Spark Interactive resource group.
        :param query: SQL statements to run. Separate multiple statements with semicolons.
        :param limit: Maximum number of rows to return.
        :param runtime_config: Spark SQL runtime properties (e.g., output format, partitions).
        :param schema: Default database name. Defaults to "default".
        :return: Query ID for polling.
        """
        req = ExecuteSparkWarehouseBatchSQLRequest()
        req.dbcluster_id = dbcluster_id
        req.resource_group_name = resource_group_name
        req.execute_time_limit_in_seconds = 3600       # Timeout: 1 hour
        req.schema = schema
        req.query = query
        req.execute_result_limit = limit
        if runtime_config:
            req.runtime_config = json.dumps(runtime_config)
    
        resp: ExecuteSparkWarehouseBatchSQLResponse = client.execute_spark_warehouse_batch_sql(req)
        logger.info("Query submitted: {}", resp.body.data.query_id)
        return resp.body.data.query_id
    
    
    def get_query_state(client, query_id):
        """
        Poll the execution status of a submitted query.
    
        :return: Tuple of (query_state, response). query_state is one of:
                 PENDING   — queued; the Spark Interactive resource group may be starting.
                 SUBMITTED — accepted by the resource group.
                 RUNNING   — executing.
                 FINISHED  — completed successfully.
                 FAILED    — execution failed.
                 CANCELED  — canceled by the user.
        """
        req = GetSparkWarehouseBatchSQLRequest(query_id=query_id)
        resp: GetSparkWarehouseBatchSQLResponse = client.get_spark_warehouse_batch_sql(req)
        logger.info("Query state: {}", resp.body.data.query_state)
        return resp.body.data.query_state, resp
    
    
    def list_history_queries(client, db_cluster, resource_group_name, page_num):
        """
        List queries previously run in the Spark Interactive resource group (paginated, 10 per page).
    
        :return: True if there are no more pages, False otherwise.
        """
        req = ListSparkWarehouseBatchSQLRequest(
            dbcluster_id=db_cluster,
            resource_group_name=resource_group_name,
            page_number=page_num,
        )
        resp: ListSparkWarehouseBatchSQLResponse = client.list_spark_warehouse_batch_sql(req)
        if resp.body.data.queries is None:
            return True
        for query in resp.body.data.queries:
            logger.info("Query ID: {}, State: {}", query.query_id, query.query_state)
        logger.info("Total on this page: {}", len(resp.body.data.queries))
        return len(resp.body.data.queries) < 10
    
    
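    def list_csv_files(oss_client, prefix, sep="|"):
        """
        Print each row of the CSV result files stored under an OSS prefix.
    
        :param oss_client: oss2.Bucket for the result bucket.
        :param prefix: Object key prefix of the statement's result directory.
        :param sep: Column separator. Must match the sep passed to build_sql_config.
        """
        # ObjectIteratorV2 follows continuation tokens, so more than 100 objects are handled.
        for obj in oss2.ObjectIteratorV2(oss_client, prefix=prefix):
            if obj.key.endswith(".csv"):
                logger.info("Reading {}", obj.key)
                csv_content = oss_client.get_object(obj.key).read().decode("utf-8")
                # csv.reader (not DictReader): each row is printed as a list of column values.
                for row in csv.reader(StringIO(csv_content), delimiter=sep):
                    print(row)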
    
    
    if __name__ == "__main__":
        logger.info("ADB Spark SQL demo")
    
        _ak = "<your-access-key-id>"          # AccessKey ID
        _sk = "<your-access-key-secret>"      # AccessKey secret
        _region = "<region-id>"               # Region ID, e.g., cn-shanghai
        _db = "<cluster-id>"                  # AnalyticDB for MySQL cluster ID
        _rg_name = "<resource-group-name>"    # Spark Interactive resource group name
    
        client_config = Config(
            access_key_id=_ak,
            access_key_secret=_sk,
            endpoint=f"adb.{_region}.aliyuncs.com",   # Use adb-vpc.<region>.aliyuncs.com for VPC
        )
        _client = Client(client_config)
    
        _spark_sql_runtime_config = {
            "spark.sql.shuffle.partitions": 1000,
            "spark.sql.autoBroadcastJoinThreshold": 104857600,
            "spark.sql.sources.partitionOverwriteMode": "dynamic",
            "spark.sql.sources.partitionOverwriteMode.dynamic": "dynamic",
        }
        _config = build_sql_config(
            oss_location="oss://<your-bucket>/sql_result",
            spark_sql_runtime_config=_spark_sql_runtime_config,
        )
    
        _query = """
        SHOW DATABASES;
        SELECT 100;
        """
        _query_id = execute_sql(
            client=_client,
            dbcluster_id=_db,
            resource_group_name=_rg_name,
            query=_query,
            runtime_config=_config,
        )
        logger.info("Waiting for query {} to finish...", _query_id)
    
        # Poll until the query finishes, fails, or times out (10 minutes).
        start_ts = time.time()
        while True:
            query_state, resp = get_query_state(_client, _query_id)
            if query_state == "FINISHED":
                logger.info("Query finished successfully")
                break
            elif query_state == "FAILED":
                logger.error("Query failed: {}", resp.body.data)
                raise SystemExit(1)
            elif query_state == "CANCELED":
                logger.error("Query was canceled")
                raise SystemExit(1)
            else:
                time.sleep(2)
                if time.time() - start_ts > 600:
                    logger.error("Query timed out after 10 minutes")
                    _client.cancel_spark_warehouse_batch_sql(
                        CancelSparkWarehouseBatchSQLRequest(query_id=_query_id)
                    )
                    raise SystemExit(1)
    
        # A query can contain multiple statements. Iterate over each statement's result.
        for stmt in resp.body.data.statements:
            logger.info("Statement {}: result at {}", stmt.statement_id, stmt.result_uri)
            if not stmt.result_uri:
                continue  # Skip statements that report no result location
            _bucket = stmt.result_uri.split("oss://")[1].split("/")[0]
            _dir = stmt.result_uri.replace(f"oss://{_bucket}/", "").replace("//", "/")
            oss_client = oss2.Bucket(
                oss2.Auth(_ak, _sk),
                f"oss-{_region}.aliyuncs.com",
                _bucket,
            )
            list_csv_files(oss_client, _dir)
    
        # List execution history with pagination.
        logger.info("Listing query history")
        page_num = 1
        no_more_pages = list_history_queries(_client, _db, _rg_name, page_num)
        while not no_more_pages:
            page_num += 1
            no_more_pages = list_history_queries(_client, _db, _rg_name, page_num)

    Replace the placeholders in the sample code with the values described in the preceding table.
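Each statement's result_uri points to an OSS location, and the sample writes CSV files that use the sep value from build_sql_config (| by default). The following sketch isolates the two parsing steps so that they can be checked offline. split_oss_uri and parse_result_csv are hypothetical helpers, and the URI and file body in the usage lines are made-up examples, not the documented result layout. csv.reader is used instead of csv.DictReader because this topic does not state whether result files contain a header row.

```python
import csv
from io import StringIO
from typing import List, Tuple


def split_oss_uri(uri: str) -> Tuple[str, str]:
    """Split "oss://bucket/key/prefix" into ("bucket", "key/prefix")."""
    if not uri.startswith("oss://"):
        raise ValueError("result URI must start with oss://")
    bucket, _, key_prefix = uri[len("oss://"):].partition("/")
    # Collapse repeated slashes, similar to the sample's replace("//", "/") call.
    while "//" in key_prefix:
        key_prefix = key_prefix.replace("//", "/")
    return bucket, key_prefix


def parse_result_csv(text: str, sep: str = "|") -> List[List[str]]:
    """Parse the text of one result file; sep must match build_sql_config's sep."""
    return list(csv.reader(StringIO(text), delimiter=sep))


# Usage with a made-up URI and file body.
bucket, prefix = split_oss_uri("oss://my-bucket/sql_result/query-1//stmt-0/")
print(bucket, prefix)
print(parse_result_csv("db1|t1\ndb2|t2\n"))
```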

Hive JDBC

  1. Add the Hive JDBC driver to your pom.xml.

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.3.9</version>
    </dependency>
  2. Connect and run a query.

    Parameter | Description
    <jdbc-connection-string> | JDBC connection string from the resource group Details page. Replace default in the string with your database name.
    <username> | Database account for AnalyticDB for MySQL.
    <password> | Password for the database account.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SparkSqlExample {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
    
            // JDBC connection string from the resource group Details page.
            // Replace "default" with your database name.
            String url = "<jdbc-connection-string>";
    
            // try-with-resources closes the result set, statement, and connection.
            try (Connection con = DriverManager.getConnection(
                     url,
                     "<username>",   // AnalyticDB for MySQL database account
                     "<password>");  // Password for the database account
                 Statement stmt = con.createStatement();
                 ResultSet tables = stmt.executeQuery("SHOW TABLES")) {
                while (tables.next()) {
                    System.out.println(tables.getString("tableName"));
                }
            }
        }
    }

PyHive

  1. Install PyHive together with its Hive dependencies (Thrift and SASL support).

    pip install 'pyhive[hive]'
  2. Connect and run a query.

    Parameter | Description
    <endpoint> | Endpoint from the resource group Details page.
    <resource-group-name> | Name of the Spark Interactive resource group.
    <username> | Database account for AnalyticDB for MySQL.
    <password> | Password for the database account.
    import time

    from pyhive import hive
    from TCLIService.ttypes import TOperationState
    
    conn = hive.connect(
        host="<endpoint>",                            # Endpoint from the resource group Details page
        port=10000,                                   # Port is fixed at 10000
        username="<resource-group-name>/<username>",  # Format: resource_group/db_account
        password="<password>",
        auth="CUSTOM",
    )
    cursor = conn.cursor()
    try:
        # async_=True returns immediately so that the loop below can stream logs while the query runs.
        cursor.execute("SHOW TABLES", async_=True)
    
        status = cursor.poll().operationState
        while status in (
            TOperationState.INITIALIZED_STATE,
            TOperationState.PENDING_STATE,
            TOperationState.RUNNING_STATE,
        ):
            for message in cursor.fetch_logs():
                print(message)
            # To cancel an in-progress query: cursor.cancel()
            time.sleep(1)  # Avoid busy-polling the server
            status = cursor.poll().operationState
    
        print(cursor.fetchall())
    finally:
        cursor.close()
        conn.close()
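PyHive, and the user name in the Beeline example, combine the resource group name and the database account with a slash. The following is a minimal sketch that assumes a hypothetical helper named spark_interactive_username. Rejecting empty parts or slashes inside either part is an added assumption that keeps the two parts unambiguous. It is not a documented rule.

```python
def spark_interactive_username(resource_group: str, db_account: str) -> str:
    """Join a resource group name and a database account as resource_group/db_account."""
    for label, value in (("resource group", resource_group), ("database account", db_account)):
        # Assumption: neither part may be empty or contain "/", so the format stays unambiguous.
        if not value or "/" in value:
            raise ValueError(f"{label} must be non-empty and must not contain '/'")
    return f"{resource_group}/{db_account}"


print(spark_interactive_username("spark_resourcegroup", "adb_account"))
```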

Beeline

  1. Connect to the Spark Interactive resource group.

    !connect <jdbc-connection-string> <username> <password>

    Replace default in the JDBC connection string with your database name. Example:

    !connect jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test spark_resourcegroup/AdbSpark14**** Spark23****

    A successful connection returns:

    Connected to: Spark SQL (version 3.2.0)
    Driver: Hive JDBC (version 2.3.9)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
  2. Run a query.

    SHOW TABLES;
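The Beeline example differs from the copied connection string only in its database path, where default becomes adb_test. The following is a minimal sketch of that substitution. It assumes the jdbc:hive2://<host>:<port>/<database> shape shown in the example. with_database is a hypothetical helper. Preserving anything after ; or ? is an assumption about connection strings that this topic does not show.

```python
HIVE2_PREFIX = "jdbc:hive2://"


def with_database(jdbc_url: str, database: str) -> str:
    """Return jdbc_url with its database path segment replaced by `database`.

    Any suffix that starts with ';' or '?' after the database name is kept unchanged.
    """
    if not jdbc_url.startswith(HIVE2_PREFIX):
        raise ValueError("expected a jdbc:hive2:// URL")
    authority, _, path = jdbc_url[len(HIVE2_PREFIX):].partition("/")
    # Index where the database name ends, or len(path) if nothing follows it.
    cut_points = [i for i in (path.find(";"), path.find("?")) if i != -1]
    cut = min(cut_points) if cut_points else len(path)
    return f"{HIVE2_PREFIX}{authority}/{database}{path[cut:]}"


print(with_database(
    "jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/default",
    "adb_test",
))
```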

DBeaver

  1. Install DBeaver and choose Database > New Database Connection.

  2. On the Connect to a database page, select Apache Spark and click Next.

  3. Configure the connection settings.

    Parameter | Value
    Connection method | URL
    JDBC URL | JDBC connection string from the resource group Details page. Replace default with your database name.
    Username | Database account for AnalyticDB for MySQL.
    Password | Password for the database account.
  4. Click Test Connection.

    The first time you test the connection, DBeaver prompts you to download the required drivers. Click Download.
  5. After the test succeeds, click Finish.

  6. In the Database Navigator tab, expand the data source and click the database.

  7. Enter a query in the SQL editor and click the run icon to execute it.

    SHOW TABLES;

    Expected output:

    +-----------+-----------+-------------+
    | namespace | tableName | isTemporary |
    +-----------+-----------+-------------+
    |    db     |   test    |     []      |
    +-----------+-----------+-------------+

DBVisualizer

  1. Open DBVisualizer and choose Tools > Driver Manager.

  2. Select Hive and click the image icon.

  3. On the Driver Settings tab, configure the Hive driver.

    Important

    Click Start Download to download the driver before proceeding.

    Parameter | Value
    Name | A name for the Hive data source (customizable).
    URL Format | JDBC connection string from the resource group Details page. Replace default with your database name.
    Driver Class | org.apache.hive.jdbc.HiveDriver
  4. After the driver downloads, choose Database > Create Database Connection > Create Database Connection from Database URL.

  5. In the Create Database Connection from Database URL dialog box, set the following parameters.

    Parameter | Value
    Database URL | JDBC connection string from the resource group Details page. Replace default with your database name.
    Driver Class | Select the Hive data source you created in step 3.
  6. On the Connection page, set the following parameters and click Connect.

    Leave all other parameters at their default values.
    Parameter | Value
    Name | Defaults to the Hive data source name from step 3. Customizable.
    Notes | Optional remarks.
    Driver Type | Hive
    Database URL | JDBC connection string from the resource group Details page. Replace default with your database name.
    Database Userid | Database account for AnalyticDB for MySQL.
    Database Password | Password for the database account.
  7. After the connection succeeds, expand the data source in the Database tab and click the database.

  8. Enter a query in the SQL editor and click the run icon to execute it.

    SHOW TABLES;

    Expected output:

    +-----------+-----------+-------------+
    | namespace | tableName | isTemporary |
    +-----------+-----------+-------------+
    |    db     |   test    |    false    |
    +-----------+-----------+-------------+

DataGrip

  1. Open DataGrip and choose Projects > New Project to create a project.

  2. Add a data source.

    1. Click the + icon and choose Data Source > Other > Apache Spark.

    2. In the Data Sources and Drivers dialog box, configure the following parameters and click OK.


      Parameter | Value
      Name | A name for the data source (customizable).
      Host | Endpoint from the resource group Details page.
      Port | 10000 (fixed)
      User | Database account for AnalyticDB for MySQL.
      Password | Password for the database account.
      Schema | Database name in the AnalyticDB for MySQL cluster.

  3. Run a query.

    1. In the data source list, right-click the data source and choose New > Query Console.

    2. In the Console panel, run:

      SHOW TABLES;
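DataGrip takes the host, port, and schema as separate fields rather than as a single JDBC URL. The following sketch splits a copied connection string into those three values. parse_hive2_url and Hive2Target are hypothetical names. The fallback to port 10000 reflects the fixed port stated above. The fallback to default when the URL has no database path is an assumption.

```python
from typing import NamedTuple

HIVE2_PREFIX = "jdbc:hive2://"
DEFAULT_PORT = 10000  # The port this topic lists as fixed


class Hive2Target(NamedTuple):
    host: str
    port: int
    database: str


def parse_hive2_url(jdbc_url: str) -> Hive2Target:
    """Split jdbc:hive2://<host>:<port>/<database>[;...] into its parts."""
    if not jdbc_url.startswith(HIVE2_PREFIX):
        raise ValueError("expected a jdbc:hive2:// URL")
    authority, _, path = jdbc_url[len(HIVE2_PREFIX):].partition("/")
    host, sep, port_text = authority.rpartition(":")
    if not sep:  # No explicit port in the URL
        host, port_text = authority, str(DEFAULT_PORT)
    # Drop any ;session-variables or ?parameters after the database name.
    database = path.split(";", 1)[0].split("?", 1)[0] or "default"
    return Hive2Target(host=host, port=int(port_text), database=database)


target = parse_hive2_url(
    "jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test"
)
print(f"Host: {target.host}")
print(f"Port: {target.port}")
print(f"Schema: {target.database}")
```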

What's next

Connect to AnalyticDB for MySQL Spark Interactive resource groups from workflow scheduling tools:

For interactive analysis in BI tools, see Redash, Power BI, and Metabase.