DataWorks は、AnalyticDB Spark タスクを開発し、スケジュールし、他のタスクタイプと統合するための ADB Spark ノードを提供します。このトピックでは、ADB Spark ノードを使用してタスクを開発する方法について説明します。
背景情報
ADB Spark は、大規模な Apache Spark データ処理タスクを実行するための AnalyticDB のコンピュートエンジンです。リアルタイムのデータ分析、複雑なクエリ、機械学習アプリケーションをサポートしています。Java、Scala、Python などの言語での開発を簡素化し、自動的にスケーリングしてパフォーマンスを最適化し、コストを削減できます。タスクを設定するために JAR または .py ファイルをアップロードできます。この機能は、大規模なデータセットの効率的な処理とリアルタイムのインサイトを必要とする業界に最適です。これにより、企業はデータから貴重な情報を抽出し、ビジネスの成長を促進できます。
前提条件
AnalyticDB for MySQL の次の前提条件が満たされています。
DataWorks ワークスペースと同じリージョンに AnalyticDB for MySQL Basic Edition クラスターが作成されていること。 詳細については、「クラスターの作成」をご参照ください。
AnalyticDB for MySQL クラスターにジョブリソースグループが設定されていること。 詳細については、「ジョブリソースグループの作成」をご参照ください。
説明DataWorks を使用して Spark アプリケーションを開発する場合、ジョブリソースグループを作成する必要があります。
ADB Spark ノードでストレージに OSS を使用する場合、OSS バケットが AnalyticDB for MySQL クラスターと同じリージョンにあることを確認してください。
DataWorks の次の前提条件が満たされています。
ワークスペースが作成され、[Data Studio (新バージョン) を使用] オプションが選択され、リソースグループがワークスペースにアタッチされていること。 詳細については、「ワークスペースの作成」をご参照ください。
リソースグループが AnalyticDB for MySQL クラスターと同じ VPC にアタッチされていること。 AnalyticDB for MySQL クラスターの リソースグループ に IP アドレスホワイトリストが設定されていること。 詳細については、「ホワイトリストの設定」をご参照ください。
AnalyticDB for MySQL クラスターインスタンスが AnalyticDB for Spark タイプのコンピュートエンジンとして DataWorks に追加されていること。 リソースグループとコンピュートエンジン間の接続性がテストされていること。 詳細については、「コンピュートエンジンのアタッチ」をご参照ください。
ワークフローフォルダが作成されていること。 詳細については、「ワークフローフォルダの作成」をご参照ください。
ADB Spark ノードが作成されていること。 詳細については、「ワークフローのノードを作成する」をご参照ください。
手順 1: ADB Spark ノードを開発する
ADB Spark ノードでは、選択した [言語] に基づいてノードコンテンツを設定できます。 サンプル JAR パッケージ spark-examples_2.12-3.2.0.jar またはサンプル spark_oss.py ファイルを使用できます。 ノードコンテンツの開発の詳細については、「spark-submit コマンドラインツールを使用して Spark アプリケーションを開発する」をご参照ください。
ADB Spark ノードコンテンツの設定 (Java/Scala)
実行するファイルの準備 (JAR)
サンプル JAR パッケージを OSS にアップロードして、ノード設定で JAR パッケージファイルを実行できるようにします。
サンプル JAR パッケージを準備します。
ADB Spark ノードで使用する spark-examples_2.12-3.2.0.jar サンプル JAR パッケージをダウンロードします。
サンプルコードを OSS にアップロードします。
OSS コンソールにログインします。 左側のナビゲーションウィンドウで、[バケット] をクリックします。
[バケット] ページで、[バケットの作成] をクリックします。 [バケットの作成] パネルで、AnalyticDB for MySQL クラスターと同じリージョンにバケットを作成します。
説明このトピックでは、
dw-1127という名前のバケットを例として使用します。外部ストレージフォルダを作成します。
バケットを作成したら、[バケットに移動] をクリックします。 [ファイル] ページで、[フォルダの作成] をクリックして、データベース用の外部ストレージフォルダを作成します。 [フォルダ名] を
db_homeに設定します。サンプルコードファイル
spark-examples_2.12-3.2.0.jarをdb_homeフォルダにアップロードします。 詳細については、「コンソールを使用してファイルをアップロードする」をご参照ください。
ADB Spark ノードを構成する
次のパラメーターを使用して ADB Spark ノードコンテンツを設定します。
言語 | パラメータ | 説明 |
Java/Scala | メイン JAR リソース | OSS 内の JAR パッケージリソースのストレージパス。 例: |
メインクラス | コンパイルされた JAR パッケージ内のタスクのメインクラス。 サンプルコードのメインクラス名は | |
パラメータ | コードに渡すパラメーター情報を入力します。 このパラメーターは、 説明 例の動的パラメーター | |
構成アイテム | ここで Spark プログラムの実行時パラメーターを設定できます。 詳細については、「Spark アプリケーションの設定パラメーター」をご参照ください。 例: |
ADB Spark ノードコンテンツの設定 (Python)
実行するファイルの準備 (Python)
テストデータファイルとサンプルコードを OSS にアップロードします。 これにより、ノード設定のサンプルコードがテストデータファイルを読み取ることができます。
テストデータを準備します。
data.txtファイルを作成し、ファイルに次のコンテンツを追加します。Hello,Dataworks Hello,OSSサンプルコードを記述します。
spark_oss.pyファイルを作成し、spark_oss.pyファイルに次のコンテンツを追加します。import sys from pyspark.sql import SparkSession # Spark を初期化します。 spark = SparkSession.builder.appName('OSS Example').getOrCreate() # 指定されたファイルを読み取ります。 ファイルパスは args によって渡された値で指定されます。 textFile = spark.sparkContext.textFile(sys.argv[1]) # ファイルの行数を計算して出力します。 print("File total lines: " + str(textFile.count())) # ファイルの最初の行を出力します。 print("First line is: " + textFile.first())テストデータとサンプルコードを OSS にアップロードします。
OSS コンソールにログインします。 左側のナビゲーションウィンドウで、[バケット] をクリックします。
[バケット] ページで、[バケットの作成] をクリックします。 [バケットの作成] パネルで、AnalyticDB for MySQL クラスターと同じリージョンにバケットを作成します。
説明このトピックでは、
dw-1127という名前のバケットを例として使用します。外部ストレージフォルダを作成します。
バケットが作成されたら、[バケットに移動] をクリックします。 [ファイル] ページで、[フォルダの作成] をクリックして、データベース用の外部ストレージフォルダを作成します。 [フォルダ名] を
db_homeに設定します。テストデータファイル
data.txtとサンプルコードファイルspark_oss.pyをdb_homeフォルダにアップロードします。 詳細については、「コンソールを使用してファイルをアップロードする」をご参照ください。
ADB Spark ノードを構成する
次のパラメーターを使用して ADB Spark ノードコンテンツを設定します。
言語 | パラメータ | 説明 |
Python | メインパッケージ | 実行するサンプルコードファイルのストレージ場所を入力します。 例: |
パラメータ | 渡すパラメーター情報を入力します。 この例の情報は、読み取るテストデータファイルのストレージ場所です。 例: | |
構成アイテム | ここで Spark プログラムの実行時パラメーターを設定できます。 詳細については、「Spark アプリケーションの設定パラメーター」をご参照ください。 例: |
手順 2: ADB Spark ノードをデバッグする
ADB Spark ノードのデバッグプロパティを設定します。
ノードの右側にある [デバッグ設定] セクションで、[コンピュートエンジン]、[ADB コンピュートリソースグループ]、[スケジューリングリソースグループ]、および [コンピュート CU] を次のように設定します。
パラメータータイプ
パラメータ
説明
コンピューティングリソース
コンピュートエンジン
アタッチした AnalyticDB for Spark コンピュートエンジンを選択します。
ADB コンピュートリソースグループ
AnalyticDB for MySQL クラスタで作成したジョブリソースグループを選択します。詳細については、「リソースグループの概要」をご参照ください。
Dataworks 構成
リソースグループ
AnalyticDB for Spark コンピュートエンジンをアタッチしたときに接続性テストに合格したリソースグループを選択します。
コンピュート CU
現在のノードはデフォルトの CU 値を使用します。 CU 値を変更する必要はありません。
ADB Spark ノードをデバッグして実行します。
ノードタスクを実行するには、[保存] をクリックしてから [実行] をクリックします。
手順 3: ADB Spark ノードをスケジュールする
ADB Spark ノードのスケジューリングプロパティを設定します。
ノードタスクを定期的に実行するには、ノードの右側にある [スケジューリング設定] タブの [スケジューリングポリシー] セクションで次のパラメーターを設定します。 パラメーター設定の詳細については、「ノードのスケジューリングを設定する」をご参照ください。
パラメータ
説明
コンピューティングリソース
アタッチした AnalyticDB for Spark コンピュートエンジンを選択します。
ADB コンピュートリソースグループ
AnalyticDB for MySQL クラスタで作成したジョブリソースグループを選択します。詳細については、「リソースグループの概要」をご参照ください。
リソースグループ
AnalyticDB for Spark コンピュートエンジンをアタッチしたときに接続性テストに合格したリソースグループを選択します。
コンピュート CU
現在のノードはデフォルトの CU 値を使用します。 CU 値を変更する必要はありません。
ADB Spark ノードを公開します。
ノードタスクを設定した後、ノードを公開する必要があります。 詳細については、「ノードまたはワークフローを公開する」をご参照ください。
次の手順
タスクが公開された後、オペレーションセンターで自動トリガーされたタスクの実行ステータスを表示できます。 詳細については、「オペレーションセンターの概要」をご参照ください。