AnalyticDB for MySQL Spark を使用すると、同一の Alibaba Cloud アカウント内、または異なる Alibaba Cloud アカウント間で Object Storage Service (OSS) のデータにアクセスできます。本トピックでは、OSS からテキストファイルを読み込む PySpark の例を用いて、両方のシナリオについて説明します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
OSS バケットと同じリージョンに、AnalyticDB for MySQL Data Lakehouse Edition クラスターが作成済みであること
クラスター用に作成されたジョブ リソースグループです。詳細については、「リソースグループの作成」をご参照ください。
クラスター向けにデータベースアカウントが設定済みであること:
Alibaba Cloud アカウントのユーザー:特権アカウントです。「特権アカウントを作成する」を参照してください。
Resource Access Management (RAM) ユーザー: 特権アカウントと標準アカウントの両方を持ち、標準アカウントは RAM ユーザーに関連付けられています。「データベースアカウントを作成する」および「RAM ユーザーとのデータベースアカウントの関連付けまたは関連付け解除」をご参照ください。
権限付与が完了済みであること。詳細については、「権限付与の実行」をご参照ください。
ご利用のシナリオを選択
| シナリオ | 必要な権限 | 主な構成 |
|---|---|---|
| 同一の Alibaba Cloud アカウント内での OSS へのアクセス | ご自身のアカウントにおける AliyunADBSparkProcessingDataRole | 追加パラメーターは不要 |
| 異なる Alibaba Cloud アカウント間での OSS へのアクセス | 他アカウントに対する権限付与が完了済みであること | spark.adb.roleArn を conf ブロックに追加 |
ステップ 1:データの準備
ジョブを送信する前に、入力ファイルおよび PySpark スクリプトの両方を OSS バケットにアップロードしてください。
以下の内容で
readme.txtという名前のテキストファイルを作成し、OSS バケットにアップロードします。AnalyticDB for MySQL Database serviceアップロード手順については、「オブジェクトのアップロード」をご参照ください。
example.pyという名前の Python ファイルを作成し、readme.txtの 1 行目を読み込む処理を記述した後、同じ OSS バケットにアップロードします。import sys from pyspark.sql import SparkSession # Spark アプリケーションの初期化 spark = SparkSession.builder.appName('OSS Example').getOrCreate() # テキストファイルの読み取り — パスは引数経由で渡されます textFile = spark.sparkContext.textFile(sys.argv[1]) # 総行数と先頭行を出力 print("File total lines: " + str(textFile.count())) print("First line is: " + textFile.first())
Spark アプリケーションのメインファイル(JAR パッケージまたは Python スクリプト)は、ジョブを送信する前に OSS に格納しておく必要があります。
ステップ 2:Spark ジョブの送信
AnalyticDB for MySQL コンソール にログインします。左上隅でリージョンを選択し、左側のナビゲーションウィンドウから クラスター をクリックします。Data Lakehouse Edition タブで対象のクラスターを見つけ、その ID をクリックします。
左側のナビゲーションウィンドウから、ジョブ開発 > Spark JAR 開発 を選択します。
エディター上部で、ジョブリソースグループおよび Spark アプリケーションの種別を選択します。本例では、バッチ を選択します。
ご利用のシナリオに対応するジョブ構成 JSON を貼り付け、今すぐ実行 をクリックします。
同一アカウント内の OSS データへのアクセス
{ "args": ["oss://testBucketName/data/readme.txt"], "name": "spark-oss-test", "file": "oss://testBucketName/data/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.resourceSpec": "small", "spark.executor.instances": 1 } }他アカウントの OSS データへのアクセス
spark.adb.roleArnをconfブロックに追加します。このパラメーターにより、Spark は他アカウントの OSS バケットにアクセスする際にどの RAM ロールを偽装するかを指定します。{ "args": ["oss://testBucketName/data/readme.txt"], "name": "CrossAccount", "file": "oss://testBucketName/data/example.py", "conf": { "spark.adb.roleArn": "acs:ram::testAccountID:role/<testUserName>", "spark.driver.resourceSpec": "small", "spark.executor.resourceSpec": "small", "spark.executor.instances": 1 } }
パラメーター
| パラメーター | 説明 |
|---|---|
args | Spark アプリケーションに渡される引数です。複数の値はカンマ(,)で区切ります。本例では、入力ファイルの OSS パスが textFile に渡されます。 |
name | Spark アプリケーションの名称です。 |
file | メインファイルのパスです。これは、エントリクラスを含む JAR パッケージ、またはエントリポイントとして機能する Python スクリプトです。OSS パスである必要があります。 |
spark.adb.roleArn | クロスアカウントでの OSS アクセスに使用する RAM ロールです。形式は acs:ram::<testAccountID>:role/<testUserName> です。ここで、<testAccountID> は OSS バケットを所有する Alibaba Cloud アカウントの ID、<testUserName> はクロスアカウント認証時に作成された RAM ロール名です。詳細については、「権限付与の実行」をご参照ください。複数のロールを指定する場合は、カンマ(,)で区切ります。同一アカウント内アクセスには不要です。 |
conf | Spark 構成を key:value 形式で、カンマ区切りで指定します。標準の Apache Spark パラメーターおよび AnalyticDB for MySQL 固有のパラメーターをサポートします。詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。 |
結果の確認
ジョブが実行された後、[ログ] をクリックして、[Spark JAR 開発] ページの [アプリケーション] タブの [操作] 列で出力を表示します。
ログビューアーの詳細については、「Spark エディター」をご参照ください。