DataWorks では、ADB Spark ノードを使用して AnalyticDB Spark タスクを開発・スケジュールし、他のタスクタイプと統合できます。本トピックでは、ADB Spark ノードを使用したタスク開発方法について説明します。
背景情報
ADB Spark は、AnalyticDB 内で大規模な Apache Spark データ処理タスクを実行するためのコンピュートエンジンです。リアルタイムデータ分析、複雑なクエリ、機械学習アプリケーションをサポートしています。Java、Scala、Python などの言語での開発を簡素化し、パフォーマンスの最適化とコスト削減のために自動的にスケーリングできます。タスクを構成するために JAR ファイルまたは .py ファイルをアップロードできます。この機能は、大規模データセットの効率的な処理とリアルタイムインサイトが必要な業界に最適です。これにより、企業はデータから有益な情報を抽出し、ビジネス成長を促進できます。
前提条件
以下の AnalyticDB for MySQL の前提条件を満たしている必要があります。
ご利用の DataWorks ワークスペースと同じリージョンに、AnalyticDB for MySQL の Basic Edition クラスターが作成されていること。詳細については、「クラスターの作成」をご参照ください。
AnalyticDB for MySQL クラスター内にジョブリソースグループが構成されていること。詳細については、「ジョブリソースグループの作成」をご参照ください。
説明DataWorks を使用して Spark アプリケーションを開発する場合は、必ずジョブリソースグループを作成してください。
ADB Spark ノードで OSS をストレージとして使用する場合、OSS バケットが AnalyticDB for MySQL クラスターと同じリージョンにあることを確認してください。
以下の DataWorks の前提条件を満たしている必要があります。
ワークスペースが作成されており、[Data Studio (新バージョン) を使用] が選択され、リソースグループがワークスペースにアタッチされていること。詳細については、「ワークスペースの作成」をご参照ください。
リソースグループが AnalyticDB for MySQL クラスターと同じ VPC にアタッチされていること。AnalyticDB for MySQL クラスター内に リソースグループ の IP アドレスホワイトリストが構成されていること。詳細については、「ホワイトリストの設定」をご参照ください。
AnalyticDB for MySQL クラスターインスタンスが、AnalyticDB for Spark タイプのコンピュートエンジンとして DataWorks に追加されていること。リソースグループとコンピュートエンジン間の接続性がテスト済みであること。詳細については、「コンピュートエンジンのアタッチ」をご参照ください。
ワークフローフォルダが作成されていること。詳細については、「ワークフローフォルダの作成」をご参照ください。
ADB Spark ノードが作成されていること。詳細については、「ワークフロー用ノードの作成」をご参照ください。
ステップ 1:ADB Spark ノードの開発
ADB Spark ノードでは、選択した言語に基づいてノードの内容を構成できます。サンプル JAR パッケージ spark-examples_2.12-3.2.0.jar またはサンプル spark_oss.py ファイルを使用できます。ノード内容の開発に関する詳細については、「spark-submit コマンドラインツールを使用した Spark アプリケーションの開発」をご参照ください。
ADB Spark ノードの内容構成(Java/Scala)
実行ファイル(JAR)の準備
ノード構成内で JAR パッケージファイルを実行できるように、サンプル JAR パッケージを OSS にアップロードします。
サンプル JAR パッケージを準備します。
ADB Spark ノードで使用するサンプル JAR パッケージ spark-examples_2.12-3.2.0.jar をダウンロードします。
サンプルコードを OSS にアップロードします。
OSS コンソール にログインします。左側のナビゲーションウィンドウで、バケット をクリックします。
バケットページで、バケットの作成 をクリックします。バケットの作成 パネルで、AnalyticDB for MySQL クラスターと同じリージョンにバケットを作成します。
説明本トピックでは、
dw-1127という名前のバケットを例として使用します。外部ストレージフォルダを作成します。
バケット作成後、バケットに移動 をクリックします。オブジェクト ページで、ディレクトリの作成 をクリックして、データベース用の外部ストレージフォルダを作成します。ディレクトリ名 に
db_homeを指定します。サンプルコードファイル
spark-examples_2.12-3.2.0.jarをdb_homeフォルダにアップロードします。詳細については、「コンソールを使用したファイルのアップロード」をご参照ください。
ADB Spark ノードの構成
以下のパラメーターを使用して ADB Spark ノードの内容を構成します。
言語 | パラメーター | 説明 |
Java/Scala | メイン JAR リソース | OSS 内の JAR パッケージリソースの保存パス。例: |
メインクラス | コンパイル済み JAR パッケージ内のタスクのメインクラス。サンプルコードのメインクラス名は | |
パラメーター | コードに渡すパラメーター情報を入力します。 説明 例の動的パラメーター | |
構成項目 | ここで Spark プログラムの実行時パラメーターを構成できます。詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。例: |
ADB Spark ノードの内容構成(Python)
実行ファイル(Python)の準備
テストデータファイルとサンプルコードを OSS にアップロードします。これにより、ノード構成内のサンプルコードがテストデータファイルを読み取れるようになります。
テストデータを準備します。
data.txtファイルを作成し、以下の内容をファイルに追加します。Hello,Dataworks Hello,OSSサンプルコードを作成します。
spark_oss.pyファイルを作成し、spark_oss.pyファイルに以下の内容を追加します。import sys from pyspark.sql import SparkSession # Initialize Spark. spark = SparkSession.builder.appName('OSS Example').getOrCreate() # Read the specified file. The file path is specified by the value passed in by args. textFile = spark.sparkContext.textFile(sys.argv[1]) # Calculate and print the number of lines in the file. print("File total lines: " + str(textFile.count())) # Print the first line of the file. print("First line is: " + textFile.first())テストデータとサンプルコードを OSS にアップロードします。
OSS コンソール にログインします。左側のナビゲーションウィンドウで、バケット をクリックします。
バケットページで、バケットの作成 をクリックします。バケットの作成 パネルで、AnalyticDB for MySQL クラスターと同じリージョンにバケットを作成します。
説明本トピックでは、
dw-1127という名前のバケットを例として使用します。外部ストレージフォルダを作成します。
バケット作成後、バケットに移動 をクリックします。オブジェクト ページで、ディレクトリの作成 をクリックして、データベース用の外部ストレージフォルダを作成します。ディレクトリ名 に
db_homeを指定します。テストデータファイル
data.txtおよびサンプルコードファイルspark_oss.pyをdb_homeフォルダにアップロードします。詳細については、「コンソールを使用したファイルのアップロード」をご参照ください。
ADB Spark ノードの構成
以下のパラメーターを使用して ADB Spark ノードの内容を構成します。
言語 | パラメーター | 説明 |
Python | メインパッケージ | 実行するサンプルコードファイルの保存場所を入力します。例: |
パラメーター | 渡すパラメーター情報を入力します。例では、読み取るテストデータファイルの保存場所を示します。例: | |
構成項目 | ここで Spark プログラムの実行時パラメーターを構成できます。詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。例: |
ステップ 2:ADB Spark ノードのデバッグ
ADB Spark ノードのデバッグプロパティを構成します。
ノード右側の Run Configuration セクションで、コンピューティングリソース、AnalyticDB コンピューティングリソースグループ、リソースグループ、および コンピューティング CU を以下のように構成します。
パラメータータイプ
パラメーター
説明
コンピューティングリソース
コンピューティングリソース
アタッチ済みの AnalyticDB for Spark コンピュートエンジンを選択します。
AnalyticDB コンピューティングリソースグループ
AnalyticDB for MySQL クラスター内に作成したジョブリソースグループを選択します。詳細については、「リソースグループ概要」をご参照ください。
リソースグループ
リソースグループ
AnalyticDB for Spark コンピュートエンジンをアタッチする際に接続性テストに合格したリソースグループを選択します。
コンピューティング用 CUs
現在のノードはデフォルトの CU 値を使用します。CU 値を変更する必要はありません。
ADB Spark ノードをデバッグおよび実行します。
ノードタスクを実行するには、保存 をクリックした後、実行 をクリックします。
ステップ 3:ADB Spark ノードのスケジュール
ADB Spark ノードのスケジューリングプロパティを構成します。
ノードタスクを定期的に実行するには、ノード右側の スケジューリング タブ内の スケジューリングポリシー セクションで以下のパラメーターを構成します。パラメーター構成の詳細については、「ノードのスケジューリングの設定」をご参照ください。
パラメーター
説明
コンピューティングリソース
アタッチ済みの AnalyticDB for Spark コンピュートエンジンを選択します。
AnalyticDB コンピューティングリソースグループ
AnalyticDB for MySQL クラスター内に作成したジョブリソースグループを選択します。詳細については、「リソースグループ概要」をご参照ください。
リソースグループ
AnalyticDB for Spark コンピュートエンジンをアタッチする際に接続性テストに合格したリソースグループを選択します。
コンピューティング用 CUs
現在のノードはデフォルトの CU 値を使用します。CU 値を変更する必要はありません。
ADB Spark ノードを公開します。
ノードタスクを構成したら、ノードを公開する必要があります。詳細については、「ノードまたはワークフローの公開」をご参照ください。
次のステップ
タスクを公開すると、オペレーションセンターで自動トリガーされたタスクの実行状況を確認できます。詳細については、「オペレーションセンターの概要」をご参照ください。