Anda dapat menjalankan kueri Spark SQL secara interaktif dengan menentukan kelompok sumber daya Spark Interactive. Sumber daya untuk kelompok ini secara otomatis diskalakan dalam rentang yang ditentukan, sehingga memenuhi kebutuhan analisis interaktif Anda sekaligus mengurangi biaya. Topik ini menjelaskan cara melakukan analisis interaktif dengan Spark SQL menggunakan alat klien seperti Konsol, Hive Java Database Connectivity (JDBC), PyHive, Beeline, dan DBeaver.
Prasyarat
Kluster AnalyticDB for MySQL Edisi Perusahaan, Edisi Dasar, atau Edisi Data Lakehouse telah dibuat.
Bucket Object Storage Service (OSS) telah dibuat di wilayah yang sama dengan kluster AnalyticDB for MySQL.
Akun database telah dibuat untuk kluster AnalyticDB for MySQL.
Jika Anda menggunakan akun Alibaba Cloud, Anda hanya perlu membuat akun istimewa.
Jika Anda menggunakan pengguna Resource Access Management (RAM), Anda harus membuat akun istimewa dan akun standar serta mengaitkan akun standar tersebut dengan pengguna RAM.
Lingkungan pengembangan Java 8 dan Python 3.9 telah diinstal. Lingkungan ini memungkinkan Anda menjalankan klien seperti aplikasi Java, aplikasi Python, dan Beeline.
Anda telah menambahkan alamat IP klien ke AnalyticDB for MySQL cluster daftar putih.
Catatan
Jika kelompok sumber daya Spark Interactive dihentikan, kluster akan secara otomatis memulai ulang saat Anda menjalankan kueri Spark SQL pertama. Akibatnya, kueri pertama mungkin mengantre dalam waktu lama.
Spark tidak dapat membaca dari atau menulis ke database INFORMATION_SCHEMA dan MYSQL. Jangan gunakan database ini untuk koneksi awal.
Pastikan akun database yang digunakan untuk mengirim pekerjaan Spark SQL memiliki izin yang diperlukan untuk mengakses database tujuan. Jika tidak, kueri akan gagal.
Persiapan
Anda telah membuat kelompok sumber daya Spark Interactive.
Dapatkan titik akhir kelompok sumber daya Spark Interactive.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.
Di panel navigasi sebelah kiri, pilih , lalu klik tab Resource Groups .
Temukan kelompok sumber daya target dan klik Details di kolom Actions untuk melihat titik akhir internal dan titik akhir publik. Anda dapat mengklik ikon
di samping titik akhir untuk menyalinnya. Anda juga dapat mengklik ikon
di dalam tanda kurung di samping Port untuk menyalin string koneksi JDBC.Dalam kasus berikut, Anda harus mengklik Request Network di samping Public Endpoint untuk meminta titik akhir publik secara manual.
Alat klien yang digunakan untuk mengirim pekerjaan Spark SQL diterapkan di mesin lokal atau server eksternal.
Alat klien yang digunakan untuk mengirim pekerjaan Spark SQL diterapkan di instans ECS, dan instans ECS serta kluster AnalyticDB for MySQL tidak berada dalam VPC yang sama.

Analisis interaktif
Konsol
Jika Anda menggunakan HiveMetastore yang dikelola sendiri, Anda harus membuat database bernama default di AnalyticDB for MySQL. Kemudian, pilih database ini saat Anda mengembangkan dan menjalankan pekerjaan Spark SQL di konsol.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.
Di panel navigasi sebelah kiri, pilih .
Pilih engine Spark dan kelompok sumber daya Spark Interactive yang telah Anda buat. Kemudian, jalankan pernyataan Spark SQL berikut:
SHOW DATABASES;
Panggilan SDK
Saat Anda memanggil metode SDK untuk menjalankan kueri Spark SQL, hasil kueri akan ditulis ke file di lokasi OSS yang ditentukan. Anda kemudian dapat mengkueri data tersebut di konsol OSS atau mengunduh file hasil ke komputer lokal Anda untuk melihatnya. Bagian berikut memberikan contoh cara memanggil SDK menggunakan Python.
Jalankan perintah berikut untuk menginstal SDK.
pip install alibabacloud-adb20211201Jalankan perintah berikut untuk menginstal dependensi.
pip install oss2 pip install loguruHubungkan ke kluster dan jalankan kueri Spark SQL.
# coding: utf-8 import csv import json import time from io import StringIO import oss2 from alibabacloud_adb20211201.client import Client from alibabacloud_adb20211201.models import ExecuteSparkWarehouseBatchSQLRequest, ExecuteSparkWarehouseBatchSQLResponse, \ GetSparkWarehouseBatchSQLRequest, GetSparkWarehouseBatchSQLResponse, \ ListSparkWarehouseBatchSQLRequest, CancelSparkWarehouseBatchSQLRequest, ListSparkWarehouseBatchSQLResponse from alibabacloud_tea_openapi.models import Config from loguru import logger def build_sql_config(oss_location, spark_sql_runtime_config: dict = None, file_format = "CSV", output_partitions = 1, sep = "|"): """ Membangun konfigurasi untuk eksekusi SQL ADB. :param oss_location: Jalur OSS tempat hasil eksekusi SQL disimpan. :param spark_sql_runtime_config: Konfigurasi asli komunitas Spark SQL. :param file_format: Format file hasil eksekusi SQL. Nilai default: CSV. :param output_partitions: Jumlah partisi untuk hasil eksekusi SQL. Jika jumlah hasil yang besar perlu dikeluarkan, Anda harus menambah jumlah partisi output agar mencegah satu file terlalu besar. :param sep: Pemisah untuk file CSV. Parameter ini diabaikan untuk file non-CSV. :return: Konfigurasi untuk eksekusi SQL. """ if oss_location is None: raise ValueError("oss_location wajib diisi") if not oss_location.startswith("oss://"): raise ValueError("oss_location harus dimulai dengan oss://") if file_format != "CSV" and file_format != "PARQUET" and file_format != "ORC" and file_format != "JSON": raise ValueError("file_format harus berupa CSV, PARQUET, ORC, atau JSON") runtime_config = { # konfigurasi output sql "spark.adb.sqlOutputFormat": file_format, "spark.adb.sqlOutputPartitions": output_partitions, "spark.adb.sqlOutputLocation": oss_location, # konfigurasi csv "sep": sep } if spark_sql_runtime_config: runtime_config.update(spark_sql_runtime_config) return runtime_config def execute_sql(client: Client, dbcluster_id: str, resource_group_name: str, query: str, limit = 10000, runtime_config: dict = None, schema="default" ): """ Menjalankan pernyataan SQL di kelompok sumber daya Spark Interactive. :param client: Klien Alibaba Cloud. :param dbcluster_id: ID kluster. :param resource_group_name: Kelompok sumber daya kluster. Harus merupakan kelompok sumber daya Spark Interactive. :param schema: Nama database default untuk eksekusi SQL. Jika tidak ditentukan, nilai default adalah `default`. :param limit: Jumlah maksimum baris dalam hasil eksekusi SQL. :param query: Pernyataan SQL yang akan dijalankan. Gunakan titik koma (;) untuk memisahkan beberapa pernyataan SQL. :return: """ # Merakit badan permintaan. req = ExecuteSparkWarehouseBatchSQLRequest() # ID kluster. req.dbcluster_id = dbcluster_id # Nama kelompok sumber daya. req.resource_group_name = resource_group_name # Batas waktu eksekusi SQL. req.execute_time_limit_in_seconds = 3600 # Nama database tempat pernyataan SQL dijalankan. req.schema = schema # Kode bisnis SQL. req.query = query # Jumlah baris hasil yang dikembalikan. req.execute_result_limit = limit if runtime_config: # Konfigurasi untuk eksekusi SQL. req.runtime_config = json.dumps(runtime_config) # Jalankan pernyataan SQL dan dapatkan query_id. resp: ExecuteSparkWarehouseBatchSQLResponse = client.execute_spark_warehouse_batch_sql(req) logger.info("Permintaan eksekusi kueri dikirim: {}", resp.body.data.query_id) return resp.body.data.query_id def get_query_state(client, query_id): """ Memeriksa status eksekusi pernyataan SQL. :param client: Klien Alibaba Cloud. :param query_id: ID eksekusi SQL. :return: Status dan hasil eksekusi SQL. """ req = GetSparkWarehouseBatchSQLRequest(query_id=query_id) resp: GetSparkWarehouseBatchSQLResponse = client.get_spark_warehouse_batch_sql(req) logger.info("Status kueri: {}", resp.body.data.query_state) return resp.body.data.query_state, resp def list_history_query(client, db_cluster, resource_group_name, page_num): """ Memeriksa riwayat pernyataan SQL yang dieksekusi di kelompok sumber daya Spark Interactive. :param client: Klien Alibaba Cloud. :param db_cluster: ID kluster. :param resource_group_name: Nama kelompok sumber daya. :param page_num: Nomor halaman untuk kueri berhalaman. :return: Apakah ada pernyataan SQL. Jika ya, Anda dapat melanjutkan ke halaman berikutnya. """ req = ListSparkWarehouseBatchSQLRequest(dbcluster_id=db_cluster, resource_group_name=resource_group_name, page_number = page_num) resp: ListSparkWarehouseBatchSQLResponse = client.list_spark_warehouse_batch_sql(req) # Jika tidak ditemukan pernyataan SQL, kembalikan true. Jika tidak, kembalikan false. Ukuran halaman default adalah 10. if resp.body.data.queries is None: return True # Cetak pernyataan SQL yang diperoleh. for query in resp.body.data.queries: logger.info("ID Kueri: {}, Status: {}", query.query_id, query.query_state) logger.info("Total kueri: {}", len(resp.body.data.queries)) return len(resp.body.data.queries) < 10 def list_csv_files(oss_client, dir): for obj in oss_client.list_objects_v2(dir).object_list: if obj.key.endswith(".csv"): logger.info(f"membaca {obj.key}") # baca konten file oss csv_content = oss_client.get_object(obj.key).read().decode('utf-8') csv_reader = csv.DictReader(StringIO(csv_content)) # Cetak konten CSV for row in csv_reader: print(row) if __name__ == '__main__': logger.info("Demo ADB Spark Batch SQL") # ID AccessKey. Ganti nilainya dengan ID AccessKey Anda. _ak = "LTAI****************" # Rahasia AccessKey. Ganti nilainya dengan rahasia AccessKey Anda. _sk = "yourAccessKeySecret" # ID wilayah. Ganti nilainya dengan ID wilayah Anda. _region= "cn-shanghai" # ID kluster. Ganti nilainya dengan ID kluster Anda. _db = "amv-uf6485635f****" # Nama kelompok sumber daya. Ganti nilainya dengan nama kelompok sumber daya Anda. _rg_name = "testjob" # konfigurasi klien client_config = Config( # ID AccessKey Alibaba Cloud access_key_id=_ak, # Rahasia AccessKey Alibaba Cloud access_key_secret=_sk, # Titik akhir layanan ADB # adb.ap-southeast-1.aliyuncs.com adalah titik akhir layanan ADB di wilayah Singapura # adb-vpc.ap-southeast-1.aliyuncs.com digunakan dalam skenario VPC endpoint=f"adb.{_region}.aliyuncs.com" ) # Buat Klien Alibaba Cloud. _client = Client(client_config) # Konfigurasi untuk eksekusi SQL. _spark_sql_runtime_config = { "spark.sql.shuffle.partitions": 1000, "spark.sql.autoBroadcastJoinThreshold": 104857600, "spark.sql.sources.partitionOverwriteMode": "dynamic", "spark.sql.sources.partitionOverwriteMode.dynamic": "dynamic" } _config = build_sql_config(oss_location="oss://testBucketName/sql_result", spark_sql_runtime_config = _spark_sql_runtime_config) # Pernyataan SQL yang akan dijalankan. _query = """ SHOW DATABASES; SELECT 100; """ _query_id = execute_sql(client = _client, dbcluster_id=_db, resource_group_name=_rg_name, query=_query, runtime_config=_config) logger.info(f"Menjalankan query_id: {_query_id} untuk SQL {_query}.\n Menunggu hasil...") # Tunggu hingga eksekusi SQL selesai. current_ts = time.time() while True: query_state, resp = get_query_state(_client, _query_id) """ query_state dapat berada dalam salah satu status berikut: - PENDING: Pekerjaan sedang menunggu di antrean layanan. Kelompok sumber daya Spark Interactive sedang dimulai. - SUBMITTED: Pekerjaan telah dikirim ke kelompok sumber daya Spark Interactive. - RUNNING: Pernyataan SQL sedang dijalankan. - FINISHED: Pernyataan SQL berhasil dijalankan tanpa kesalahan. - FAILED: Eksekusi SQL gagal. - CANCELED: Eksekusi SQL dibatalkan. """ if query_state == "FINISHED": logger.info("kueri berhasil selesai") break elif query_state == "FAILED": # Cetak informasi kegagalan. logger.error("Info Kesalahan: {}", resp.body.data) exit(1) elif query_state == "CANCELED": # Cetak informasi pembatalan. logger.error("kueri dibatalkan") exit(1) else: time.sleep(2) if time.time() - current_ts > 600: logger.error("kueri melebihi batas waktu") # Jika waktu eksekusi melebihi 10 menit, batalkan eksekusi SQL. _client.cancel_spark_warehouse_batch_sql(CancelSparkWarehouseBatchSQLRequest(query_id=_query_id)) exit(1) # Satu kueri dapat berisi beberapa pernyataan. Cantumkan semua pernyataan. for stmt in resp.body.data.statements: logger.info( f"statement_id: {stmt.statement_id}, lokasi hasil: {stmt.result_uri}") # Contoh kode untuk melihat hasil. _bucket = stmt.result_uri.split("oss://")[1].split("/")[0] _dir = stmt.result_uri.replace(f"oss://{_bucket}/", "").replace("//", "/") oss_client = oss2.Bucket(oss2.Auth(client_config.access_key_id, client_config.access_key_secret), f"oss-{_region}.aliyuncs.com", _bucket) list_csv_files(oss_client, _dir) # Periksa semua pernyataan SQL yang dieksekusi di kelompok sumber daya Spark Interactive. Kueri berhalaman didukung. logger.info("Menampilkan semua riwayat kueri") page_num = 1 no_more_page = list_history_query(_client, _db, _rg_name, page_num) while no_more_page: logger.info(f"Menampilkan halaman {page_num}") page_num += 1 no_more_page = list_history_query(_client, _db, _rg_name, page_num)Parameter:
_ak: ID AccessKey akun Alibaba Cloud atau pengguna RAM yang memiliki AnalyticDB for MySQL izin akses. Untuk informasi lebih lanjut tentang cara mendapatkan ID AccessKey dan Rahasia AccessKey, lihat Akun dan Izin.
_sk: Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM yang memiliki AnalyticDB for MySQL izin akses. Untuk informasi lebih lanjut tentang cara mendapatkan ID AccessKey dan Rahasia AccessKey, lihat Akun dan Izin.
region: ID wilayah kluster AnalyticDB for MySQL.
_db: ID kluster AnalyticDB for MySQL.
_rg_name: Nama kelompok sumber daya Spark Interactive.
oss_location: (Opsional) Jalur OSS tempat file hasil kueri disimpan.
Jika Anda tidak menentukan parameter ini, Anda hanya dapat melihat lima baris data pertama di Log pernyataan kueri SQL pada halaman .
Aplikasi
Hive JDBC
Konfigurasikan dependensi Maven di file pom.xml.
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.3.9</version> </dependency>Buat koneksi dan jalankan kueri Spark SQL.
public class java { public static void main(String[] args) throws Exception { Class.forName("org.apache.hive.jdbc.HiveDriver"); String url = "<endpoint>"; Connection con = DriverManager.getConnection(url, "<username>", "<password>"); Statement stmt = con.createStatement(); ResultSet tables = stmt.executeQuery("show tables"); List<String> tbls = new ArrayList<>(); while (tables.next()) { System.out.println(tables.getString("tableName")); tbls.add(tables.getString("tableName")); } } }Parameter:
Alamat koneksi: String koneksi JDBC untuk kelompok sumber daya Spark Interactive. Untuk informasi selengkapnya, lihat Persiapan. Di string koneksi, Anda harus mengganti
defaultdengan nama database Anda.username: Akun database untuk AnalyticDB for MySQL.
password: Kata sandi untuk akun database AnalyticDB for MySQL.
PyHive
Instal klien Hive Python.
pip install pyhiveBuat koneksi dan jalankan kueri Spark SQL.
from pyhive import hive from TCLIService.ttypes import TOperationState cursor = hive.connect( host='<endpoint>', port=<port>, username='<resource_group_name>/<username>', password='<password>', auth='CUSTOM' ).cursor() cursor.execute('show tables') status = cursor.poll().operationState while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE): logs = cursor.fetch_logs() for message in logs: print(message) # Jika diperlukan, kueri asinkron dapat dibatalkan kapan saja dengan: # cursor.cancel() status = cursor.poll().operationState print(cursor.fetchall())Parameter:
Alamat koneksi: Persiapan menyediakan alamat koneksi kelompok sumber daya Spark Interactive.
port: Nomor port kelompok sumber daya Spark Interactive. Nilainya tetap 10000.
resource_group_name: Nama kelompok sumber daya Spark Interactive.
username: Akun database untuk AnalyticDB for MySQL.
password: Kata sandi untuk akun database AnalyticDB for MySQL.
Klien
Selain klien Beeline, DBeaver, DBVisualizer, dan Datagrip yang dijelaskan dalam topik ini, Anda juga dapat melakukan analisis interaktif di alat penjadwalan alur kerja seperti Airflow, Azkaban, dan DolphinScheduler.
Beeline
Hubungkan ke kelompok sumber daya Spark Interactive.
Format perintahnya sebagai berikut:
!connect <endpoint> <username> <password>Alamat koneksi: String koneksi JDBC untuk kelompok sumber daya Spark Interactive. Untuk informasi selengkapnya, lihat Persiapan. Di string koneksi, Anda harus mengganti
defaultdengan nama database Anda.username: Akun database untuk AnalyticDB for MySQL.
password: Kata sandi untuk akun database AnalyticDB for MySQL.
Contoh:
!connect jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test spark_resourcegroup/AdbSpark14**** Spark23****Hasil berikut dikembalikan:
Connected to: Spark SQL (version 3.2.0) Driver: Hive JDBC (version 2.3.9) Transaction isolation: TRANSACTION_REPEATABLE_READJalankan kueri Spark SQL.
SHOW TABLES;
DBeaver
Buka klien DBeaver dan pilih .
Di halaman Connect To A Database , pilih Apache Spark dan klik Next.
Konfigurasikan Hadoop/Apache Spark Connection Settings. Parameter dijelaskan sebagai berikut:
Parameter
Deskripsi
Metode koneksi
Tetapkan Metode koneksi ke URL.
URL JDBC
Masukkan string koneksi JDBC yang Anda peroleh di Persiapan.
PentingGanti
defaultdi string koneksi dengan nama database Anda.Username
Akun database AnalyticDB for MySQL.
Password
Kata sandi akun database AnalyticDB for MySQL.
Setelah Anda mengonfigurasi parameter, klik Test Connection.
PentingSaat Anda menguji koneksi untuk pertama kalinya, DBeaver secara otomatis mengambil informasi tentang driver yang diperlukan. Setelah informasi diambil, klik Download untuk mengunduh driver.
Setelah koneksi berhasil, klik Finish.
Di tab Database Navigator, perluas subdirektori sumber data dan klik database.
Di editor kode di sebelah kanan, masukkan pernyataan SQL dan klik ikon
untuk menjalankan pernyataan tersebut.SHOW TABLES;Hasil berikut dikembalikan:
+-----------+-----------+-------------+ | namespace | tableName | isTemporary | +-----------+-----------+-------------+ | db | test | [] | +-----------+-----------+-------------+
DBVisualizer
Buka klien DBVisualizer dan pilih .
Di halaman Driver Manager , pilih Hive dan klik ikon
.Di tab Driver Settings , konfigurasikan parameter berikut:
Parameter
Deskripsi
Name
Nama sumber data Hive. Anda dapat menyesuaikan nama tersebut.
URL Format
Masukkan string koneksi JDBC yang Anda peroleh di Persiapan.
PentingGanti
defaultdi string koneksi dengan nama database Anda.Driver Class
Driver Hive. Tetapkan ke org.apache.hive.jdbc.HiveDriver.
PentingSetelah Anda mengonfigurasi parameter, klik Start Download untuk mengunduh driver.
Setelah driver diunduh, pilih .
Di kotak dialog Create Database Connection from Database URL, konfigurasikan parameter yang dijelaskan dalam tabel berikut.
Parameter
Deskripsi
Database URL
Masukkan string koneksi JDBC yang Anda peroleh di Persiapan.
PentingGanti
defaultdi string koneksi dengan nama database Anda.Driver Class
Pilih sumber data Hive yang Anda buat di Langkah 3.
Di halaman Connection, konfigurasikan parameter koneksi berikut dan klik Connect.
Parameter
Deskripsi
Name
Secara default, parameter ini memiliki nilai yang sama dengan sumber data Hive yang dibuat di Langkah 3. Anda dapat menyesuaikan nama tersebut.
Notes
Keterangan.
Driver Type
Pilih Hive.
Database URL
Masukkan string koneksi JDBC yang Anda peroleh di Persiapan.
PentingGanti
defaultdi string koneksi dengan nama database Anda.Database Userid
Akun database AnalyticDB for MySQL.
Database Password
Kata sandi akun database AnalyticDB for MySQL.
CatatanAnda tidak perlu mengonfigurasi parameter lainnya. Anda dapat menggunakan nilai default.
Setelah koneksi berhasil, di tab Database, perluas subdirektori sumber data dan klik database.
Di editor kode di sebelah kanan, masukkan pernyataan SQL dan klik ikon
untuk menjalankan pernyataan tersebut.SHOW TABLES;Hasil berikut dikembalikan:
+-----------+-----------+-------------+ | namespace | tableName | isTemporary | +-----------+-----------+-------------+ | db | test | false | +-----------+-----------+-------------+
Datagrip
Buka klien Datagrip dan pilih untuk membuat proyek.
Tambahkan sumber data.
Klik ikon
dan pilih .Di kotak dialog Data Sources and Drivers yang muncul, konfigurasikan parameter berikut dan klik OK.

Parameter
Deskripsi
Name
Nama sumber data. Anda dapat menyesuaikan nama tersebut. Dalam contoh ini,
adbtestdigunakan.Host
Masukkan string koneksi JDBC yang Anda peroleh di Persiapan.
PentingGanti
defaultdi string koneksi dengan nama database Anda.Port
Nomor port kelompok sumber daya Spark Interactive. Nilainya tetap 10000.
User
Akun database AnalyticDB for MySQL.
Password
Kata sandi akun database AnalyticDB for MySQL.
Schema
Nama database di kluster AnalyticDB for MySQL.
Jalankan kueri Spark SQL.
Di daftar sumber data, klik kanan sumber data yang Anda buat di Langkah 2 dan pilih .
Di panel Console di sebelah kanan, jalankan kueri Spark SQL.
SHOW TABLES;
Alat BI
Anda dapat melakukan analisis interaktif di Redash, Power BI, dan Metabase.