Kyuubi Gateway は、Java Database Connectivity (JDBC) および ODBC インターフェイスを提供し、Serverless Spark を SQL クエリおよび Tableau や Power BI などのビジネスインテリジェンス (BI) ツールとシームレスに接続します。これにより、効率的なデータアクセスと分析が可能になります。このゲートウェイは、エンタープライズアプリケーションの要求を満たすためのマルチテナンシーとリソースの隔離も特徴としています。
Kyuubi Gateway の作成
Gateway ページに移動します。
EMR コンソールにログインします。
左側のナビゲーションウィンドウで、 を選択します。
[Spark] ページで、管理するワークスペースの名前をクリックします。
[EMR Serverless Spark] ページで、左側のナビゲーションウィンドウで をクリックします。
[Kyuubi Gateway] ページで、[Kyuubi Gateway の作成] をクリックします。
[Kyuubi Gateway の作成] ページで、次のパラメーターを設定し、[作成] をクリックします。
パラメーター
説明
名前
新しいゲートウェイの名前。名前には、小文字、数字、ハイフン (-) のみを含めることができます。文字または数字で開始および終了する必要があります。
Kyuubi Gateway リソース
デフォルトは
2 CPU, 8 GBです。サポートされている仕様と推奨される最大同時接続数は次のとおりです:
1 CPU, 4 GB: 102 CPU, 8 GB: 204 CPU, 16 GB: 308 CPU, 32 GB: 4516 CPU, 64 GB: 8532 CPU, 128 GB: 135
説明Spark の設定項目が多すぎると、Spark ジョブ送信の瞬間的な同時実行数が減少します。
Kyuubi バージョン
ゲートウェイが使用する Kyuubi のバージョン。
説明[データカタログ] で DLF (旧 DLF 2.5) を使用する場合は、[Kyuubi バージョン] を 1.9.2-0.0.1 以降に設定する必要があります。
エンジンバージョン
ゲートウェイが使用するエンジンバージョン。エンジンバージョン番号の詳細については、「エンジンバージョン」をご参照ください。
関連付けられたキュー
ゲートウェイは選択されたキューにデプロイされます。ゲートウェイを介して Spark ジョブを送信すると、ジョブはゲートウェイ作成者の ID を使用して送信されます。
認証方法
トークンベースの認証のみがサポートされています。
ゲートウェイを作成した後、ゲートウェイ用の一意の認証トークンを生成する必要があります。このトークンは、後続のリクエストでの ID 検証とアクセスの制御に使用されます。トークンの作成方法の詳細については、「ゲートウェイの管理」をご参照ください。
サービス高可用性
サービス高可用性を有効にすると、高可用性を実現するために 3 つ以上の Kyuubi サーバーがデプロイされます。
この機能を有効にした後、次のパラメーターを設定します:
Kyuubi サーバーのデプロイメント数: Kyuubi サーバーの数。
Zookeeper クラスターエンドポイント: 高可用性 Kyuubi Gateway は Zookeeper クラスターに依存します。Zookeeper クラスターのエンドポイントを入力します。複数のノードがある場合は、コンマ (,) で区切ります。ネットワークが接続されていることを確認してください。例:
zk1:2181,zk2:2181,zk3:2181。
ネットワーク接続
パブリックエンドポイント
この機能はデフォルトで無効になっています。この機能を有効にすると、システムはパブリックエンドポイントを介して Kyuubi にアクセスします。それ以外の場合、Kyuubi はデフォルトで内部エンドポイントを介してアクセスされます。
Kyuubi 構成
Kyuubi の構成情報を入力します。項目はスペースで区切ります。例:
kyuubi.engine.pool.size 1。次の Kyuubi 構成のみがサポートされています。
kyuubi.engine.pool.size kyuubi.engine.pool.size.threshold kyuubi.engine.share.level kyuubi.engine.single.spark.session kyuubi.session.engine.idle.timeout kyuubi.session.engine.initialize.timeout kyuubi.engine.security.token.max.lifetime kyuubi.session.engine.check.interval kyuubi.session.idle.timeout kyuubi.session.engine.request.timeout kyuubi.session.engine.login.timeout kyuubi.backend.engine.exec.pool.shutdown.timeout kyuubi.backend.server.exec.pool.shutdown.timeout kyuubi.backend.server.exec.pool.keepalive.time kyuubi.frontend.thrift.login.timeout kyuubi.operation.status.polling.timeout kyuubi.engine.pool.selectPolicy kyuubi.authentication kyuubi.kinit.principal kyuubi.kinit.keytab kyuubi.authentication.ldap.* kyuubi.hadoop.proxyuser.hive.hosts kyuubi.hadoop.proxyuser.hive.groups kyuubi.hadoop.proxyuser.kyuubi.hosts kyuubi.hadoop.proxyuser.kyuubi.groups kyuubi.ha.*Spark 構成
Spark の構成情報を入力します。項目はスペースで区切ります。
spark.kubernetes.*タイプのパラメーターを除き、すべてのパラメーターがサポートされています。例:spark.sql.catalog.paimon.metastore dlf。[Kyuubi Gateway] ページで、作成したゲートウェイを見つけ、[操作] 列の [開始] をクリックします。
トークンの管理
[Kyuubi Gateway] ページで、ターゲットゲートウェイを見つけ、[操作] 列の [トークン管理] をクリックします。
[トークンの作成] をクリックします。
[トークンの作成] ダイアログボックスで、パラメーターを設定し、[OK] をクリックします。
パラメーター
説明
名前
新しいトークンの名前。
有効期限
トークンの有効期限を設定します。日数は 1 以上である必要があります。デフォルトでは、これは有効になっており、トークンは 365 日後に有効期限が切れます。
オブジェクトの割り当て
[Resource Access Management] で追加した RAM ユーザーまたは RAM ロールをドロップダウンリストから選択します。
トークンが割り当てられる RAM ユーザーまたは RAM ロールを指定します。これは、Kyuubi Gateway に接続して Spark ジョブを送信する際に DLF にアクセスするために使用されます。
トークン情報をコピーします。
重要トークンは作成後にすぐにコピーする必要があります。後で取得することはできません。トークンの有効期限が切れたり、紛失したりした場合は、新しいトークンを作成するか、トークンをリセットする必要があります。
Kyuubi Gateway への接続
Kyuubi Gateway に接続する際、JDBC URL のプレースホルダーを置き換えます:
<endpoint>: [概要] タブから取得できるエンドポイント情報。<port>: ポート番号。ポート番号は、パブリックエンドポイントの場合は 443、内部の同一リージョンエンドポイントの場合は 80 です。<token>: [トークン管理] ページからコピーしたトークン情報。<tokenname>: トークン名。[トークン管理] ページから取得できます。<UserName/RoleName>: [Resource Access Management] に追加した RAM ユーザーまたは RAM ロール。
Beeline を使用した接続
Kyuubi Gateway に接続する前に、Beeline のバージョンが Kyuubi サーバーのバージョンと互換性があることを確認してください。Beeline がインストールされていない場合は、「Getting Started - Apache Kyuubi」をご参照ください。
[データカタログ] ページで設定されているデフォルトのカタログに基づいて、次のいずれかの方法を選択します。
DLF (旧 DLF 2.5) の使用
beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;user=<UserName/RoleName>;httpPath=cliservice/token/<token>"別のカタログの使用
beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"Beeline を使用して接続する際に、セッションパラメーターを変更できます。例: beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>;#spark.sql.shuffle.partitions=100;spark.executor.instances=2;"。
Java を使用した接続
pom.xml ファイルを更新します。
hadoop-commonとhive-jdbcを適切な依存関係のバージョンに置き換えます。<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.3.9</version> </dependency> </dependencies>Kyuubi Gateway に接続するための Java コードを記述します。
[データカタログ] ページで設定されているデフォルトのカタログに基づいて、次のいずれかの方法を選択します。
DLF (旧 DLF 2.5) の使用
import org.apache.hive.jdbc.HiveStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>;user=<UserName/RoleName>"; Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection conn = DriverManager.getConnection(url); HiveStatement stmt = (HiveStatement) conn.createStatement(); String sql = "select * from students;"; System.out.println("Running " + sql); ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }別のカタログの使用
import org.apache.hive.jdbc.HiveStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"; Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection conn = DriverManager.getConnection(url); HiveStatement stmt = (HiveStatement) conn.createStatement(); String sql = "select * from students;"; System.out.println("Running " + sql); ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
Python を使用した接続
次のコマンドを実行して、PyHive と Thrift パッケージをインストールします。
pip3 install pyhive thriftKyuubi Gateway に接続するための Python スクリプトを記述します。
次の Python スクリプトは、Kyuubi Gateway に接続してデータベースのリストを表示する方法の例です。
[データカタログ] ページで設定されているデフォルトのカタログに基づいて、次のいずれかの方法を選択します。
DLF (旧 DLF 2.5) の使用
from pyhive import hive if __name__ == '__main__': cursor = hive.connect('<endpoint>', port="<port>", scheme='http', username='<UserName/RoleName>', password='<token>').cursor() cursor.execute('show databases') print(cursor.fetchall()) cursor.close()別のカタログの使用
from pyhive import hive if __name__ == '__main__': cursor = hive.connect('<endpoint>', port="<port>", scheme='http', username='<tokenname>', password='<token>').cursor() cursor.execute('show databases') print(cursor.fetchall()) cursor.close()
REST API を使用した接続
Kyuubi Gateway は、HTTP を介した Kyuubi サービスとの対話をサポートする、オープンソース互換の Representational State Transfer (REST) API を提供します。現在、次の API パスのみがサポートされています:
/api/v1/sessions/*/api/v1/operations/*/api/v1/batches/*
次の例は、REST API を使用して Kyuubi Gateway に接続する方法を示しています。
例 1: セッションを開始して SQL クエリを実行する。
セッションを作成し、Spark 構成を指定します。
[データカタログ] ページで設定されているデフォルトのカタログに基づいて、次のいずれかの方法を選択します。
説明spark.emr.serverless.kyuubi.engine.queueは、Spark ジョブが実行時に使用するキューを指定します。<dev_queue>を実際のキュー名に置き換えてください。<UserName/Rolename>: これを実際のユーザー名またはロール名に置き換えてください。<password>: これはプレースホルダーです。任意の値を入力できます。
DLF (旧 DLF 2.5) の使用
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -u '<UserName/Rolename>:<password>' \ -d '{ "configs": { "set:hivevar:spark.emr.serverless.kyuubi.engine.queue": "<dev_queue>" } }'別のカタログの使用
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "configs": { "set:hivevar:spark.emr.serverless.kyuubi.engine.queue": "<dev_queue>" } }'次のようなメッセージが返されます。メッセージ内の
identifierは、セッションを一意に識別する Kyuubi セッションハンドルを示します。このトピックでは、この値を<sessionHandle>と呼びます。{"identifier":"619e6ded-xxxx-xxxx-xxxx-c2a43f6fac46","kyuubiInstance":"0.0.0.0:10099"}文を作成します。
DLF (旧 DLF 2.5) の使用
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/<sessionHandle>/operations/statement/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -u '<UserName/RoleName>:<password>' \ -d '{ "statement": "select * from test;", "runAsync": true, "queryTimeout": 0, "confOverlay": { "additionalProp1": "string", "additionalProp2": "string" } }'別のカタログの使用
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/<sessionHandle>/operations/statement/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "statement": "select * from test;", "runAsync": true, "queryTimeout": 0, "confOverlay": { "additionalProp1": "string", "additionalProp2": "string" } }'次のようなメッセージが返されます。ここで、
identifierは、特定の操作を一意に識別する Kyuubi 操作ハンドルを示します。このトピックでは、この値を<operationHandle>と呼びます。{"identifier":"a743e8ff-xxxx-xxxx-xxxx-a66fec66cfa4"}文のステータスを取得します。
DLF (旧 DLF 2.5) の使用
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/event/token/<token>' \ -H 'accept: application/json' \ -u '<UserName/RoleName>:<password>'別のカタログの使用
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/event/token/<token>' \ -H 'accept: application/json'文の結果を取得します。
DLF (旧 DLF 2.5) の使用
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/rowset/token/<token>/?maxrows=100&fetchorientation=FETCH_NEXT' \ -H 'accept: application/json' \ -u '<UserName/RoleName>:<password>'別のカタログの使用
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/rowset/token/<token>/?maxrows=100&fetchorientation=FETCH_NEXT' \ -H 'accept: application/json'
例 2: batches API を使用してバッチジョブを送信する。
REST API を使用して、Spark バッチ処理ジョブを Kyuubi Gateway に送信できます。Kyuubi Gateway は Spark アプリケーションを開始し、リクエスト内のパラメーターに基づいて指定されたタスクを実行します。
この例では、
<endpoint>、<port>、<token>などの情報を置き換えるだけでなく、spark-examples_2.12-3.3.1.jar をクリックしてテスト JAR パッケージをダウンロードする必要もあります。説明この JAR パッケージは、Spark に付属する簡単な例です。円周率 (π) の値を計算するために使用されます。
DLF (旧 DLF 2.5) の使用
curl --location \ --request POST 'http://<endpoint>:<port>/api/v1/batches/token/<token>' \ --user '<UserName/RoleName>:<password>' \ --form 'batchRequest="{ \"batchType\": \"SPARK\", \"className\": \"org.apache.spark.examples.SparkPi\", \"name\": \"kyuubi-spark-pi\", \"resource\": \"oss://bucket/path/to/spark-examples_2.12-3.3.1.jar\" }";type=application/json'別のカタログの使用
curl --location \ --request POST 'http://<endpoint>:<port>/api/v1/batches/token/<token>' \ --form 'batchRequest="{ \"batchType\": \"SPARK\", \"className\": \"org.apache.spark.examples.SparkPi\", \"name\": \"kyuubi-spark-pi\", \"resource\": \"oss://bucket/path/to/spark-examples_2.12-3.3.1.jar\" }";type=application/json'
高可用性 Kyuubi Gateway の設定と接続
ネットワーク接続を確立します。
詳細については、「EMR Serverless Spark と他の VPC 間のネットワーク接続を確立する」をご参照ください。クライアントがターゲット VPC 内の Zookeeper クラスターにアクセスできることを確認してください。たとえば、Alibaba Cloud MSE または EMR on ECS の Zookeeper コンポーネントを使用できます。
Kyuubi Gateway の高可用性を有効にします。
Kyuubi Gateway を作成または編集する際に、[サービス高可用性] を有効にし、関連パラメーターを設定し、[ネットワーク接続] に確立されたネットワーク接続を選択します。
高可用性 Kyuubi Gateway に接続します。
上記の設定を完了すると、Kyuubi Gateway は Zookeeper を介して高可用性用に設定されます。REST API または JDBC を介して接続することで、その可用性を確認できます。
Kyuubi Gateway に接続する際、JDBC URL のプレースホルダーを置き換えます:
<endpoint>: [概要] タブから取得できるエンドポイント情報。<port>: ポート番号。ポート番号は、パブリックエンドポイントの場合は 443、内部の同一リージョンエンドポイントの場合は 80 です。<token>: [トークン管理] ページからコピーしたトークン情報。<tokenname>: トークン名。[トークン管理] ページから取得できます。<UserName/RoleName>: [Resource Access Management] に追加した RAM ユーザーまたは RAM ロール。
次の例は、高可用性 Kyuubi Gateway に接続する方法を示しています。
Beeline を使用した接続
kyuubi-hive-jdbc-1.9.2.jar をクリックして JDBC Driver JAR ファイルをダウンロードします。
JDBC Driver JAR ファイルを置き換えます。
元の JDBC Driver JAR ファイルをバックアップして移動します。
mv /your_path/apache-kyuubi-1.9.2-bin/beeline-jars /bak_path説明EMR on ECS を使用している場合、Kyuubi のデフォルトパスは
/opt/apps/KYUUBI/kyuubi-1.9.2-1.0.0/beeline-jarsです。Kyuubi のインストールパスがわからない場合は、env | grep KYUUBI_HOMEコマンドを実行して見つけることができます。新しい JDBC Driver JAR ファイルに置き換えます。
cp /download/serverless-spark-kyuubi-hive-jdbc-1.9.2.jar /your_path/apache-kyuubi-1.9.2-bin/beeline-jars
Beeline を使用して接続します。
/your_path/apache-kyuubi-1.9.2-bin/bin/beeline -u 'jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>'
Java を使用した接続
serverless-spark-kyuubi-hive-jdbc-shaded-1.9.2.jar をクリックして shaded パッケージをダウンロードします。
JDBC Driver を Maven リポジトリにインストールします。
次のコマンドを実行して、Serverless Spark が提供する JDBC Driver をローカルの Maven リポジトリにインストールします。
mvn install:install-file \ -Dfile=/download/serverless-spark-kyuubi-hive-jdbc-shaded-1.9.2.jar \ -DgroupId=org.apache.kyuubi \ -DartifactId=kyuubi-hive-jdbc-shaded \ -Dversion=1.9.2-ss \ -Dpackaging=jarpom.xmlファイルを修正します。次の依存関係をプロジェクトの
pom.xmlファイルに追加します。<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.kyuubi</groupId> <artifactId>kyuubi-hive-jdbc-shaded</artifactId> <version>1.9.2-ss</version> </dependency> </dependencies>Java のサンプルコードを記述します。
import org.apache.kyuubi.jdbc.hive.KyuubiStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"; Class.forName("org.apache.kyuubi.jdbc.KyuubiHiveDriver"); Connection conn = DriverManager.getConnection(url); KyuubiStatement stmt = (KyuubiStatement) conn.createStatement(); String sql = "select * from test;"; ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
Kyuubi によって送信された Spark ジョブのリストの表示
Kyuubi を介して送信された Spark ジョブについては、[ジョブ履歴] ページの [Kyuubi アプリケーション] タブで詳細なジョブ情報を表示できます。この情報には、[アプリケーション ID]、[アプリケーション名]、[アプリケーションステータス]、および [開始時間] が含まれます。この情報は、Kyuubi によって送信された Spark ジョブを理解し、管理するのに役立ちます。
[Kyuubi Gateway] ページで、ターゲットの Kyuubi Gateway をクリックします。
右上隅にある [アプリケーションリスト] をクリックします。

このページでは、この Kyuubi Gateway を介して送信されたすべての Spark ジョブの詳細を表示できます。[アプリケーション ID] (spark-xxxx) は Spark エンジンによって生成され、Kyuubi クライアントで接続したときに返されるアプリケーション ID と同じです。この ID はタスクインスタンスを一意に識別します。
