Run Spark SQL queries interactively against a Spark Interactive resource group in AnalyticDB for MySQL. The resource group scales automatically within a configured range, balancing performance and cost. This topic explains how to connect and run queries using the console, Python SDK, Hive JDBC, PyHive, Beeline, DBeaver, DBVisualizer, and DataGrip.
Prerequisites
Before you begin, ensure that you have:
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster
An Object Storage Service (OSS) bucket in the same region as the cluster
A database account for the cluster:
Alibaba Cloud account: create a privileged account
Resource Access Management (RAM) user: create a privileged account and a standard account, then associate the standard account with the RAM user
Java 8 and Python 3.9 installed (required to run Hive JDBC applications, Python applications, and Beeline)
The client IP address added to the cluster whitelist
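Before you configure a client, you can check that the whitelist and network path are in place by testing whether the endpoint accepts TCP connections. The following is a minimal sketch that uses only the Python standard library. The function name `endpoint_reachable` is hypothetical, and the sketch assumes port 10000, which is the port used by the client examples in this topic. A successful connection only shows that the port is reachable. It does not show that your credentials are valid.

```python
import socket


def endpoint_reachable(host: str, port: int = 10000, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper. Port 10000 is assumed because the client examples
    in this topic use it. A refused or timed-out connection returns False.
    """
    try:
        # create_connection resolves the host name and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

For example, `endpoint_reachable("<endpoint>")` typically returns False when the client IP address is not in the whitelist, or when you use the internal endpoint from outside the cluster's VPC.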
Usage notes
If the Spark Interactive resource group is stopped, the cluster restarts it automatically when you run the first query. That query may remain queued until the resource group finishes starting.
Spark cannot read from or write to the INFORMATION_SCHEMA and MYSQL databases. Do not use either database as the initial connection target.
The database account used to submit Spark SQL jobs must have permission to access the target database. Queries fail if permission is missing.
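A client script can reject these two databases before it opens a connection. The following helper is hypothetical and not part of any SDK. The case-insensitive comparison is an assumption.

```python
# Databases that Spark cannot read from or write to (see the usage notes above).
UNSUPPORTED_DATABASES = {"information_schema", "mysql"}


def check_initial_database(name: str) -> str:
    """Return name unchanged if it can be used as the initial database.

    Hypothetical helper: raises ValueError for INFORMATION_SCHEMA or MYSQL,
    compared case-insensitively (an assumption).
    """
    if name.strip().lower() in UNSUPPORTED_DATABASES:
        raise ValueError(f"{name!r} cannot be used as the initial database for Spark")
    return name
```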
Get the connection endpoint
Before connecting with any client tool, get the endpoint for your Spark Interactive resource group.
Create a Spark Interactive resource group if you have not done so.
Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left navigation pane, click Clusters, then click the cluster ID.
In the left navigation pane, choose Cluster Management > Resource Management, then click the Resource Groups tab.
Find the resource group and click Details in the Actions column. The internal endpoint and public endpoint are displayed. Click the copy icon next to an endpoint to copy it. Click the same icon next to Port to copy the full JDBC connection string.
If your client tool runs on a local machine, an external server, or an ECS instance that is not in the same VPC as the cluster, click Apply for Endpoint next to Public Endpoint to request a public endpoint.
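The copied JDBC connection string ends in the database name `default`, and the client sections below ask you to replace it. The following minimal sketch performs that substitution. It assumes that the string has the form `jdbc:hive2://<host>:<port>/<database>`, as in the Beeline example, and that any trailing `;` session settings or `?` configuration should be kept unchanged. `with_database` is a hypothetical helper name.

```python
import re

# Assumed form: jdbc:hive2://<host>:<port>/<database>[;session-vars][?hive-conf]
_JDBC_RE = re.compile(r"^(jdbc:hive2://[^/]+/)([^;?]*)(.*)$")


def with_database(jdbc_url: str, database: str) -> str:
    """Replace the database segment (for example, 'default') of a Hive JDBC URL."""
    match = _JDBC_RE.match(jdbc_url)
    if match is None:
        raise ValueError(f"not a jdbc:hive2:// URL with a database path: {jdbc_url}")
    prefix, _old_database, suffix = match.groups()
    # Keep the host/port prefix and any trailing parameters unchanged.
    return f"{prefix}{database}{suffix}"
```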
Connect and run queries
Console
If you use a self-managed Hive Metastore, create a database named default in AnalyticDB for MySQL first, then select that database when running Spark SQL jobs in the console.
Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left navigation pane, click Clusters, then click the cluster ID.
In the left navigation pane, choose Job Development > SQL Development.
Select the Spark engine and your Spark Interactive resource group, then run:
SHOW DATABASES;
Python SDK
The SDK submits queries asynchronously. Results are written to an OSS path that you specify, and you can download the result files to view them locally.
The following example submits a query with ExecuteSparkWarehouseBatchSQLRequest, polls for completion with GetSparkWarehouseBatchSQLRequest, and then reads each statement's result from OSS.
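The submit-then-poll flow can be factored into a reusable loop. The following sketch is hypothetical. `wait_for_query` accepts any zero-argument function that returns the current query state, for example a lambda that wraps `get_query_state` from the sample in this section. It treats `FINISHED`, `FAILED`, and `CANCELED` as terminal states and raises `TimeoutError` so that the caller can cancel the query. The 600-second timeout and 2-second interval mirror the sample.

```python
import time
from typing import Callable

# States after which the query will not change any more.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}


def wait_for_query(
    get_state: Callable[[], str],
    timeout_s: float = 600.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until it returns a terminal state or the timeout expires.

    PENDING, SUBMITTED, and RUNNING are treated as in progress. Raises
    TimeoutError so that the caller can cancel the query.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"query still {state} after {timeout_s} s")
        sleep(interval_s)
```

Usage with the sample's client might look like `wait_for_query(lambda: get_query_state(_client, _query_id)[0])`. The `sleep` parameter exists only so that the loop can be tested without waiting.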
Install the SDK and dependencies.
pip install alibabacloud-adb20211201 oss2 loguru
Submit and retrieve Spark SQL results.
# coding: utf-8
import csv
import json
import sys
import time
from io import StringIO

import oss2
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    CancelSparkWarehouseBatchSQLRequest,
    ExecuteSparkWarehouseBatchSQLRequest,
    ExecuteSparkWarehouseBatchSQLResponse,
    GetSparkWarehouseBatchSQLRequest,
    GetSparkWarehouseBatchSQLResponse,
    ListSparkWarehouseBatchSQLRequest,
    ListSparkWarehouseBatchSQLResponse,
)
from alibabacloud_tea_openapi.models import Config
from loguru import logger


def build_sql_config(
    oss_location,
    spark_sql_runtime_config: dict = None,
    file_format="CSV",
    output_partitions=1,
    sep="|",
):
    """
    Build the output configuration for a Spark SQL execution.

    :param oss_location: OSS path where results are stored. Must start with oss://.
    :param spark_sql_runtime_config: Additional Spark SQL runtime properties.
    :param file_format: Output format: CSV, PARQUET, ORC, or JSON. Default: CSV.
    :param output_partitions: Number of output partitions. Increase this for large
        result sets to avoid generating a single oversized file.
    :param sep: Column separator for CSV output. Ignored for non-CSV formats.
    """
    if oss_location is None:
        raise ValueError("oss_location is required")
    if not oss_location.startswith("oss://"):
        raise ValueError("oss_location must start with oss://")
    if file_format not in ("CSV", "PARQUET", "ORC", "JSON"):
        raise ValueError("file_format must be CSV, PARQUET, ORC, or JSON")
    runtime_config = {
        "spark.adb.sqlOutputFormat": file_format,
        "spark.adb.sqlOutputPartitions": output_partitions,
        "spark.adb.sqlOutputLocation": oss_location,
        "sep": sep,
    }
    if spark_sql_runtime_config:
        runtime_config.update(spark_sql_runtime_config)
    return runtime_config


def execute_sql(
    client: Client,
    dbcluster_id: str,
    resource_group_name: str,
    query: str,
    limit=10000,
    runtime_config: dict = None,
    schema="default",
):
    """
    Submit a Spark SQL query to a Spark Interactive resource group.

    :param client: Alibaba Cloud SDK client.
    :param dbcluster_id: Cluster ID.
    :param resource_group_name: Name of the Spark Interactive resource group.
    :param query: SQL statements to run. Separate multiple statements with semicolons.
    :param limit: Maximum number of rows to return.
    :param runtime_config: Spark SQL runtime properties (e.g., output format, partitions).
    :param schema: Default database name. Defaults to "default".
    :return: Query ID for polling.
    """
    req = ExecuteSparkWarehouseBatchSQLRequest()
    req.dbcluster_id = dbcluster_id
    req.resource_group_name = resource_group_name
    req.execute_time_limit_in_seconds = 3600  # Timeout: 1 hour
    req.schema = schema
    req.query = query
    req.execute_result_limit = limit
    if runtime_config:
        req.runtime_config = json.dumps(runtime_config)
    resp: ExecuteSparkWarehouseBatchSQLResponse = client.execute_spark_warehouse_batch_sql(req)
    logger.info("Query submitted: {}", resp.body.data.query_id)
    return resp.body.data.query_id


def get_query_state(client, query_id):
    """
    Poll the execution status of a submitted query.

    :return: Tuple of (query_state, response). query_state is one of:
        PENDING: queued; the Spark Interactive resource group may be starting.
        SUBMITTED: accepted by the resource group.
        RUNNING: executing.
        FINISHED: completed successfully.
        FAILED: execution failed.
        CANCELED: canceled by the user.
    """
    req = GetSparkWarehouseBatchSQLRequest(query_id=query_id)
    resp: GetSparkWarehouseBatchSQLResponse = client.get_spark_warehouse_batch_sql(req)
    logger.info("Query state: {}", resp.body.data.query_state)
    return resp.body.data.query_state, resp


def list_history_queries(client, db_cluster, resource_group_name, page_num):
    """
    List queries previously run in the Spark Interactive resource group
    (paginated, 10 per page).

    :return: True if there are no more pages, False otherwise.
    """
    req = ListSparkWarehouseBatchSQLRequest(
        dbcluster_id=db_cluster,
        resource_group_name=resource_group_name,
        page_number=page_num,
    )
    resp: ListSparkWarehouseBatchSQLResponse = client.list_spark_warehouse_batch_sql(req)
    if resp.body.data.queries is None:
        return True
    for query in resp.body.data.queries:
        logger.info("Query ID: {}, State: {}", query.query_id, query.query_state)
    logger.info("Total on this page: {}", len(resp.body.data.queries))
    return len(resp.body.data.queries) < 10


def list_csv_files(oss_client, prefix, sep="|"):
    """Print every row of every CSV result file under the given OSS prefix."""
    # ObjectIteratorV2 pages through all objects, not only the first 100.
    for obj in oss2.ObjectIteratorV2(oss_client, prefix=prefix):
        if obj.key.endswith(".csv"):
            logger.info("Reading {}", obj.key)
            csv_content = oss_client.get_object(obj.key).read().decode("utf-8")
            # Use the same separator that was passed to build_sql_config.
            for row in csv.DictReader(StringIO(csv_content), delimiter=sep):
                print(row)


if __name__ == "__main__":
    logger.info("ADB Spark SQL demo")
    _ak = "<your-access-key-id>"  # AccessKey ID
    _sk = "<your-access-key-secret>"  # AccessKey secret
    _region = "<region-id>"  # Region ID, e.g., cn-shanghai
    _db = "<cluster-id>"  # AnalyticDB for MySQL cluster ID
    _rg_name = "<resource-group-name>"  # Spark Interactive resource group name

    client_config = Config(
        access_key_id=_ak,
        access_key_secret=_sk,
        endpoint=f"adb.{_region}.aliyuncs.com",  # Use adb-vpc.<region>.aliyuncs.com for VPC
    )
    _client = Client(client_config)

    _spark_sql_runtime_config = {
        "spark.sql.shuffle.partitions": 1000,
        "spark.sql.autoBroadcastJoinThreshold": 104857600,
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
    }
    _config = build_sql_config(
        oss_location="oss://<your-bucket>/sql_result",
        spark_sql_runtime_config=_spark_sql_runtime_config,
    )

    _query = """
    SHOW DATABASES;
    SELECT 100;
    """
    _query_id = execute_sql(
        client=_client,
        dbcluster_id=_db,
        resource_group_name=_rg_name,
        query=_query,
        runtime_config=_config,
    )

    logger.info("Waiting for query {} to finish...", _query_id)
    # Poll until the query finishes, fails, or times out (10 minutes).
    current_ts = time.time()
    while True:
        query_state, resp = get_query_state(_client, _query_id)
        if query_state == "FINISHED":
            logger.info("Query finished successfully")
            break
        elif query_state == "FAILED":
            logger.error("Query failed: {}", resp.body.data)
            sys.exit(1)
        elif query_state == "CANCELED":
            logger.error("Query was canceled")
            sys.exit(1)
        else:
            time.sleep(2)
            if time.time() - current_ts > 600:
                logger.error("Query timed out after 10 minutes")
                _client.cancel_spark_warehouse_batch_sql(
                    CancelSparkWarehouseBatchSQLRequest(query_id=_query_id)
                )
                sys.exit(1)

    # A query can contain multiple statements. Iterate over each statement's result.
    for stmt in resp.body.data.statements:
        logger.info("Statement {}: result at {}", stmt.statement_id, stmt.result_uri)
        _bucket = stmt.result_uri.split("oss://")[1].split("/")[0]
        _prefix = stmt.result_uri.replace(f"oss://{_bucket}/", "").replace("//", "/")
        oss_client = oss2.Bucket(
            oss2.Auth(_ak, _sk),
            f"oss-{_region}.aliyuncs.com",
            _bucket,
        )
        list_csv_files(oss_client, _prefix)

    # List execution history with pagination.
    logger.info("Listing query history")
    page_num = 1
    no_more_pages = list_history_queries(_client, _db, _rg_name, page_num)
    while not no_more_pages:
        page_num += 1
        no_more_pages = list_history_queries(_client, _db, _rg_name, page_num)
Replace the following placeholders:
<your-access-key-id>: AccessKey ID of an Alibaba Cloud account or RAM user with AnalyticDB for MySQL access permissions. See Accounts and permissions.
<your-access-key-secret>: AccessKey secret for the same account or RAM user.
<region-id>: Region ID of the cluster, for example, cn-shanghai.
<cluster-id>: AnalyticDB for MySQL cluster ID, for example, amv-uf6485635f****.
<resource-group-name>: Name of the Spark Interactive resource group.
oss://<your-bucket>/sql_result: (Optional) OSS path for result files. If omitted, only the first five rows appear in the query logs on the Job Development > Spark JAR Development page.
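The sample derives the OSS bucket and object prefix from each statement's `result_uri` by splitting strings. The following sketch shows an equivalent, slightly stricter standalone version. `split_oss_uri` is a hypothetical name. It assumes that `result_uri` has the form `oss://<bucket>/<path>` and collapses doubled slashes in the path, as the sample does.

```python
from typing import Tuple


def split_oss_uri(uri: str) -> Tuple[str, str]:
    """Split oss://<bucket>/<path> into (bucket, object key prefix).

    Hypothetical helper mirroring the string handling in the sample above.
    """
    if not uri.startswith("oss://"):
        raise ValueError(f"not an OSS URI: {uri}")
    bucket, _, key = uri[len("oss://"):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name: {uri}")
    # Collapse any doubled slashes in the object path.
    while "//" in key:
        key = key.replace("//", "/")
    return bucket, key
```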
Hive JDBC
Add the Hive JDBC driver to your pom.xml.
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.3.9</version>
</dependency>
Connect and run a query.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SparkSqlExample {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // JDBC connection string from the resource group Details page.
        // Replace "default" with your database name.
        String url = "<jdbc-connection-string>";
        // <username>: AnalyticDB for MySQL database account.
        // <password>: Password for the database account.
        // try-with-resources closes the result set, statement, and connection.
        try (Connection con = DriverManager.getConnection(url, "<username>", "<password>");
             Statement stmt = con.createStatement();
             ResultSet tables = stmt.executeQuery("SHOW TABLES")) {
            while (tables.next()) {
                System.out.println(tables.getString("tableName"));
            }
        }
    }
}
Replace the following parameters:
<jdbc-connection-string>: JDBC connection string from the resource group Details page. Replace default in the string with your database name.
<username>: Database account for AnalyticDB for MySQL.
<password>: Password for the database account.
PyHive
Install the Python Hive client.
pip install 'pyhive[hive]'
Connect and run a query.
from pyhive import hive
from TCLIService.ttypes import TOperationState

cursor = hive.connect(
    host="<endpoint>",                            # Endpoint from the resource group Details page
    port=10000,                                   # Port is fixed at 10000
    username="<resource-group-name>/<username>",  # Format: resource_group/db_account
    password="<password>",
    auth="CUSTOM",
).cursor()

# async_=True returns immediately so that the loop below can stream logs.
cursor.execute("SHOW TABLES", async_=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
    for message in cursor.fetch_logs():
        print(message)
    # To cancel an in-progress query: cursor.cancel()
    status = cursor.poll().operationState

print(cursor.fetchall())
Replace the following parameters:
<endpoint>: Endpoint from the resource group Details page.
<resource-group-name>: Name of the Spark Interactive resource group.
<username>: Database account for AnalyticDB for MySQL.
<password>: Password for the database account.
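If you run several queries through PyHive, you can wrap the execute-and-poll pattern above in a function. The following sketch is hypothetical. It relies on PyHive's `async_=True` argument to `Cursor.execute` and on the `poll`, `fetch_logs`, and `fetchall` cursor methods used in the example. The running states are passed in (with PyHive, `TOperationState.INITIALIZED_STATE` and `TOperationState.RUNNING_STATE`), so the helper itself does not import PyHive.

```python
from typing import Any, Callable, Iterable, List


def run_and_stream_logs(
    cursor: Any,
    sql: str,
    running_states: Iterable[int],
    on_log: Callable[[str], None] = print,
) -> List[tuple]:
    """Submit sql asynchronously on a PyHive-style cursor, stream logs, return rows.

    Hypothetical helper: running_states are the operation states that mean
    the query is still in progress.
    """
    running = set(running_states)
    cursor.execute(sql, async_=True)
    status = cursor.poll().operationState
    while status in running:
        # Forward server-side log lines while the query is still running.
        for message in cursor.fetch_logs():
            on_log(message)
        status = cursor.poll().operationState
    return cursor.fetchall()
```

With PyHive, a call might look like `run_and_stream_logs(cursor, "SHOW TABLES", [TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE])`.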
Beeline
Connect to the Spark Interactive resource group.
!connect <jdbc-connection-string> <username> <password>
Replace default in the JDBC connection string with your database name. Example:
!connect jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test spark_resourcegroup/AdbSpark14**** Spark23****
In this example, the user name is given as spark_resourcegroup/AdbSpark14****, that is, the resource group name and the database account separated by a slash.
A successful connection returns:
Connected to: Spark SQL (version 3.2.0)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Run a query.
SHOW TABLES;
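Beeline can also run a single statement without an interactive session, which is convenient in scripts. The following sketch only builds the command line. It assumes that `beeline` is on your `PATH` and uses Beeline's standard `-u` (URL), `-n` (user name), `-p` (password), and `-e` (query) options. `beeline_command` is a hypothetical helper name.

```python
from typing import List


def beeline_command(jdbc_url: str, user: str, password: str, sql: str) -> List[str]:
    """Build an argv list that runs one statement through Beeline non-interactively.

    Hypothetical helper. user follows the <resource-group-name>/<database-account>
    form shown in the Beeline example.
    """
    return ["beeline", "-u", jdbc_url, "-n", user, "-p", password, "-e", sql]
```

You could run the result with `subprocess.run(beeline_command(...), check=True)`. Passing the password on the command line can expose it to other users of the machine through the process list, so use this approach only on trusted hosts.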
DBeaver
Install DBeaver and choose Database > New Database Connection.
On the Connect to a database page, select Apache Spark and click Next.
Configure the connection settings.
Connection method: URL
JDBC URL: JDBC connection string from the resource group Details page. Replace default with your database name.
Username: Database account for AnalyticDB for MySQL.
Password: Password for the database account.
Click Test Connection.
The first time you test the connection, DBeaver downloads the required drivers automatically. Click Download when prompted.
After the test succeeds, click Finish.
In the Database Navigator tab, expand the data source and click the database.
Enter a query in the SQL editor and click the run icon to execute it:
SHOW TABLES;
Expected output:
+-----------+-----------+-------------+
| namespace | tableName | isTemporary |
+-----------+-----------+-------------+
| db        | test      | []          |
+-----------+-----------+-------------+
DBVisualizer
Open DBVisualizer and choose Tools > Driver Manager.
Select Hive and click the icon. On the Driver Settings tab, configure the Hive driver.
Important: Click Start Download to download the driver before proceeding.
Name: A name for the Hive data source (customizable).
URL Format: JDBC connection string from the resource group Details page. Replace default with your database name.
Driver Class: org.apache.hive.jdbc.HiveDriver
After the driver downloads, choose Database > Create Database Connection > Create Database Connection from Database URL.
In the Create Database Connection from Database URL dialog box, set the following parameters.
Database URL: JDBC connection string from the resource group Details page. Replace default with your database name.
Driver Class: Select the Hive data source that you configured on the Driver Settings tab.
On the Connection page, set the following parameters and click Connect. Leave all other parameters at their default values.
Name: Defaults to the name of the Hive data source that you configured on the Driver Settings tab. Customizable.
Notes: Optional remarks.
Driver Type: Hive
Database URL: JDBC connection string from the resource group Details page. Replace default with your database name.
Database Userid: Database account for AnalyticDB for MySQL.
Database Password: Password for the database account.
After the connection succeeds, expand the data source in the Database tab and click the database.
Enter a query in the SQL editor and click the run icon to execute it:
SHOW TABLES;
Expected output:
+-----------+-----------+-------------+
| namespace | tableName | isTemporary |
+-----------+-----------+-------------+
| db        | test      | false       |
+-----------+-----------+-------------+
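If you capture the text form of this output in a script, for example from Beeline, which prints results in the same bordered layout by default, you can turn it into one dictionary per row. `parse_ascii_table` is a hypothetical helper. It assumes that cell values do not contain the `|` character.

```python
from typing import Dict, List


def parse_ascii_table(text: str) -> List[Dict[str, str]]:
    """Parse a +---+ bordered result table into a list of row dictionaries.

    Hypothetical helper: border lines are skipped, the first | line is the header.
    """
    rows = [
        [cell.strip() for cell in line.strip().strip("|").split("|")]
        for line in text.splitlines()
        if line.strip().startswith("|")
    ]
    if not rows:
        return []
    header, *body = rows
    return [dict(zip(header, row)) for row in body]
```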
DataGrip
Open DataGrip and choose Projects > New Projects to create a project.
Add a data source.
Click the icon and choose Data Source > Other > Apache Spark.
In the Data Sources and Drivers dialog box, configure the following parameters and click OK.

Name: A name for the data source (customizable).
Host: Endpoint from the resource group Details page, that is, the host name part of the JDBC connection string.
Port: 10000 (fixed).
User: Database account for AnalyticDB for MySQL.
Password: Password for the database account.
Schema: Database name in the AnalyticDB for MySQL cluster.
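If a client asks for the host, port, and database separately, you can read them from the JDBC connection string. The following is a minimal sketch that assumes the `jdbc:hive2://<host>:<port>/<database>` form shown in the Beeline example. `split_jdbc_url` and `HiveEndpoint` are hypothetical names.

```python
import re
from typing import NamedTuple


class HiveEndpoint(NamedTuple):
    host: str
    port: int
    database: str


# Assumed form: jdbc:hive2://<host>:<port>/<database>[;...][?...]
_URL_RE = re.compile(r"^jdbc:hive2://([^:/]+):(\d+)/([^;?]*)")


def split_jdbc_url(jdbc_url: str) -> HiveEndpoint:
    """Split a Hive JDBC URL into host, port, and database."""
    match = _URL_RE.match(jdbc_url)
    if match is None:
        raise ValueError(f"unexpected JDBC URL format: {jdbc_url}")
    host, port, database = match.groups()
    return HiveEndpoint(host, int(port), database)
```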
Run a query.
In the data source list, right-click the data source and choose New > Query Console.
In the Console panel, run:
SHOW TABLES;
What's next
Connect to AnalyticDB for MySQL Spark Interactive resource groups from workflow scheduling tools:
For interactive analysis in BI tools, see Redash, Power BI, and Metabase.