DataWorks は、ビッグデータ開発と管理のためのエンドツーエンドの機能を提供し、AnalyticDB for MySQL を含む複数のコンピュートエンジンをサポートしています。DataWorks のデータ開発 (DataStudio) モジュールは、視覚的なワークフロー開発、管理されたスケジューリング、および運用保守 (O&M) をサポートしています。これにより、時間や依存関係に基づいてタスクを簡単に管理およびスケジューリングできます。DataWorks で ADB Spark SQL ノードと ADB Spark ノードを使用して、Spark SQL ジョブや Spark アプリケーションジョブなどの Spark ジョブを開発およびスケジューリングできます。
前提条件
お使いの AnalyticDB for MySQL クラスターは、次の要件を満たす必要があります:
AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition クラスターが作成されていること。
AnalyticDB for MySQL クラスターと同じリージョンに Object Storage Service (OSS) バケットが作成されていること。
AnalyticDB for MySQL クラスターにデータベースアカウントが作成されます。
Alibaba Cloud アカウントを使用する場合は、特権アカウントを作成するだけで済みます。
Resource Access Management (RAM) ユーザーを使用する場合は、特権アカウントと標準アカウントを作成し、標準アカウントを RAM ユーザーに関連付ける必要があります。
AnalyticDB for MySQL クラスター用のリソースグループが作成されていること。詳細については、「リソースグループの作成と管理」をご参照ください。
Spark SQL ジョブを開発するには、AnalyticDB for MySQL にインタラクティブリソースグループを作成します。インタラクティブリソースグループのエンジンタイプは Spark である必要があります。
Spark JAR または PySpark ジョブを開発するには、AnalyticDB for MySQL にジョブリソースグループを作成します。
AnalyticDB for MySQL が AliyunADBSparkProcessingDataRole ロールを偽装して他のクラウドリソースにアクセスすることを承認されていること。
Spark アプリケーションのログストレージパスが構成されていること。
説明AnalyticDB for MySQL コンソールにログインします。管理するクラスターを見つけて、クラスター ID をクリックします。左側のナビゲーションウィンドウで、 を選択します。ログ設定 をクリックします。表示されるダイアログボックスで、デフォルトのパスを選択するか、カスタムストレージパスを指定します。カスタムストレージパスを OSS のルートディレクトリに設定することはできません。カスタムストレージパスには、少なくとも 1 つのレイヤーのフォルダーが含まれていることを確認してください。
お使いの DataWorks ワークスペースは、次の要件を満たす必要があります:
AnalyticDB for MySQL クラスターと DataWorks ワークスペースが同じリージョンにあること。
DataWorks ワークスペースが作成され、[データ開発 (DataStudio) (新バージョン) のパブリックプレビューに参加] スイッチがオンになっていること。
説明新しい DataWorks ワークスペースを作成するときに、[データ開発 (DataStudio) (新バージョン) のパブリックプレビューに参加] スイッチをオンにできます。既存のワークスペースの場合は、してチケットを送信し、[データ開発 (DataStudio) (新バージョン) のパブリックプレビューに参加] スイッチを有効にできます。
リソースグループが作成され、アタッチされていること。詳細については、「リソースグループの作成とアタッチ」をご参照ください。
説明リソースグループを作成するときは、その VPC が AnalyticDB for MySQL クラスターの VPC と同じであることを確認してください。
DataWorks ワークスペースがアタッチされているリソースグループの vSwitch の IPv4 CIDR ブロックが、AnalyticDB for MySQL クラスターのホワイトリストに追加されていること。詳細については、「ホワイトリストの設定」をご参照ください。
AnalyticDB for Spark 計算リソースが DataWorks ワークスペースにアタッチされていること。詳細については、「AnalyticDB for Spark 計算リソースのアタッチ」をご参照ください。
DataWorks での Spark SQL ジョブのスケジューリング
AnalyticDB for MySQL では、外部テーブルと内部テーブルの両方のジョブを開発できます。このトピックでは、外部テーブルジョブを例として、DataWorks で Spark SQL ジョブを開発およびスケジューリングする手順について説明します。
ステップ 1: ADB Spark SQL ノードの作成
DataWorks コンソールの [ワークスペース] ページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[アクション] 列で を選択します。
[プロジェクトフォルダー]の横にある
アイコンをクリックし、を選択します。表示されるダイアログボックスで、ノードの名前を入力して Enter キーを押します。
ステップ 2: ADB Spark SQL ノードの開発
このトピックでは、ADB Spark SQL ノードで外部データベースを作成する方法の例を示します。内部テーブルの作成方法については、「Spark SQL を使用して内部テーブルを作成する」をご参照ください。
CREATE DATABASE IF NOT EXISTS `adb_spark_db` LOCATION 'oss://testBucketname/db_dome';ADB Spark SQL ノードに外部テーブル
adb_spark_db.tb_orderを作成します。CREATE TABLE IF NOT EXISTS adb_spark_db.tb_order(id int, name string, age int) USING parquet LOCATION 'oss://testBucketname/db_dome/tb1' TBLPROPERTIES ('parquet.compress'='SNAPPY');データのクエリ
外部テーブルが作成された後、AnalyticDB for MySQL の SELECT 文を使用して Parquet データをクエリできます。
SELECT * FROM adb_spark_db.tb_order limit 100;ADB Spark SQL ノードに Delta Lake テーブル
adb_spark_db.raw_orderを作成します。CREATE TABLE IF NOT EXISTS adb_spark_db.raw_order(id int, name string, age int) USING delta;adb_spark_db.tb_orderからadb_spark_db.raw_orderにデータをインポートします。INSERT INTO adb_spark_db.raw_order SELECT * FROM adb_spark_db.tb_order;AnalyticDB for MySQL にデータベースを作成します。データベースが既に作成されている場合は、このステップをスキップできます。以下に例を示します:
CREATE DATABASE adb_demo;Delta Lake テーブルからインポートされたデータを格納するために、AnalyticDB for MySQL に内部テーブルを作成します。以下に例を示します:
CREATE TABLE adb_demo.order_xuanwu_format ( `id` int, `name` string, `age` int) using adb TBLPROPERTIES ( 'distributeType'='HASH', 'distributeColumns' = 'id', 'storagePolicy' = 'hot' );Delta Lake テーブル
adb_spark_db.raw_orderからorder_xuanwu_formatにデータをインポートします。INSERT OVERWRITE adb_demo.order_xuanwu_format SELECT * FROM adb_spark_db.adb_spark_db.raw_order;
ステップ 3: ADB Spark SQL ノードの構成と実行
右側のペインで、[デバッグ構成] をクリックして、ADB Spark SQL ノードの実行時パラメーターを構成します。
パラメータータイプ
パラメーター
説明
計算リソース
計算リソース
アタッチされている AnalyticDB for Spark 計算リソースを選択します。
ADB 計算リソースグループ
AnalyticDB for MySQL クラスターで作成した Spark エンジンを持つインタラクティブリソースグループを選択します。
DataWorks 構成
リソースグループ
AnalyticDB for Spark 計算リソースをアタッチしたときに接続性テストに合格した DataWorks リソースグループを選択します。
計算 CU
ノードはデフォルトの CU 値を使用します。CU を変更する必要はありません。
スクリプトパラメーター
パラメーター名
ADB Spark SQL ノードに指定したパラメーターの名前。たとえば、スクリプトで
$[yyyymmdd]パラメーターを構成して、毎日の新しいデータのバッチ同期を実行できます。サポートされているパラメーターとそのフォーマットについては、「スケジューリングパラメーターの構成」をご参照ください。説明システムは、ノードで構成されたパラメーター名を自動的に検出します。
パラメーター値
パラメーター値を構成します。実行時に、パラメーターは実際の値に動的に置き換えられます。
(オプション) ノードタスクをスケジュールで実行するには、ノードの右側にある [スケジューリング構成] ペインの [スケジューリングポリシー] セクションで [計算リソース]、[ADB 計算リソースグループ]、および [スケジューリングリソースグループ] パラメーターを構成します。次に、[スケジューリングパラメーター] セクションでパラメーターを構成します。
デバッグ設定を構成した後、
アイコンをクリックして SQL ノードを保存します。次に、
アイコンをクリックして SQL スクリプトをテストし、期待どおりに実行されることを確認します。スケジューリング構成が完了したら、データベースノードを本番環境に送信して公開できます。
タスクを公開すると、構成されたパラメーターに基づいて定期的に実行されます。 ページで、公開された定期タスクを表示および管理できます。詳細については、「オペレーションセンターの概要」をご参照ください。
DataWorks での Spark JAR ジョブのスケジューリング
ステップ 1: ADB Spark ノードの作成
DataWorks コンソールの [ワークスペース] ページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[アクション] 列で を選択します。
[プロジェクトフォルダー] の横にある
アイコンをクリックし、 を選択します。表示されるダイアログボックスで、ノードの名前を入力して Enter キーを押します。
ステップ 2: ADB Spark ノードの開発
ADB Spark ノードは、Java/Scala および Python での開発をサポートしています。
Java/Scala 開発
サンプル JAR パッケージの準備
サンプル JAR パッケージ spark-examples_2.12-3.2.0.jar をダウンロードして、ADB Spark ノードを開発およびスケジューリングできます。
サンプルコード
spark-examples_2.12-3.2.0.jarを、AnalyticDB for MySQL クラスターと同じリージョンにある OSS バケットにアップロードします。詳細については、「コンソールを使用したファイルのアップロード」をご参照ください。ADB Spark ノードの構成
言語
パラメーター
説明
Java/Scala
メイン JAR リソース
OSS 内の JAR パッケージのストレージパス。例:
oss://testBucketname/db_dome/spark-examples_2.12-3.2.0.jar。メインクラス
実行するメインクラスの名前。例:
com.work.SparkWork。パラメーター
コードに渡すパラメーターを入力します。
構成項目
Spark アプリケーションの実行時パラメーターを構成します。詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。
例:
spark.driver.resourceSpec: medium
Python 開発
テストデータの準備
Spark を使用して読み取りたい
data.txtという名前の TXT ファイルを作成します。ファイルに次の内容を追加します。Hello,Dataworks Hello,OSSサンプルコードの作成
spark_oss.pyという名前のファイルを作成します。spark_oss.pyファイルに次の内容を追加します。import sys from pyspark.sql import SparkSession # Spark を初期化します。 spark = SparkSession.builder.appName('OSS Example').getOrCreate() # 指定されたファイルを読み取ります。ファイルパスは args によって渡された値で指定されます。 textFile = spark.sparkContext.textFile(sys.argv[1]) # ファイルの行数を計算して出力します。 print("File total lines: " + str(textFile.count())) # ファイルの最初の行を出力します。 print("First line is: " + textFile.first())テストデータ
data.txtとサンプルコードspark_oss.pyを、AnalyticDB for MySQL クラスターと同じリージョンにある OSS バケットにアップロードします。詳細については、「コンソールを使用したファイルのアップロード」をご参照ください。ADB Spark ノードの構成
言語
パラメーター
説明
Python
メインプログラムパッケージ
この例のステップ 3 の
spark_oss.pyの OSS パス。例:oss://testBucketname/db_dome/spark_oss.py。パラメーター
この例のステップ 3 の
data.txtの OSS パス。例:oss://testBucketname/db_dome/data.txt。構成項目
Spark プログラムの実行時パラメーターを構成します。詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。
例:
spark.driver.resourceSpec: medium
ステップ 3: ADB Spark ノードの構成と実行
右側のペインで、[デバッグ構成] をクリックして、ADB Spark ノードの実行時パラメーターを構成します。
パラメータータイプ
パラメーター
説明
計算リソース
計算リソース
アタッチされている AnalyticDB for Spark 計算リソースを選択します。
ADB 計算リソースグループ
AnalyticDB for MySQL クラスターで作成したジョブリソースグループを選択します。
DataWorks 構成
リソースグループ
AnalyticDB for Spark 計算リソースをアタッチしたときに接続性テストに合格した DataWorks リソースグループを選択します。
計算 CU
ノードはデフォルトの CU 値を使用します。CU を変更する必要はありません。
スクリプトパラメーター
パラメーター名
ADB Spark JAR ジョブで構成したパラメーターの名前。
説明システムは、ノードで構成されたパラメーター名を自動的に検出します。
パラメーター値
パラメーター値を構成します。実行時に、パラメーターは実際の値に動的に置き換えられます。
(オプション) ノードタスクをスケジュールで実行するには、ノードの右側にある [スケジューリング構成] ペインの [スケジューリングポリシー] セクションで [計算リソース]、[ADB 計算リソースグループ]、および [スケジューリングリソースグループ] パラメーターを構成します。次に、[スケジューリングパラメーター] セクションでパラメーターを構成します。
デバッグ設定を構成した後、
アイコンをクリックしてノードを保存します。次に、
アイコンをクリックしてスクリプトをテストし、期待どおりに実行されることを確認します。スケジューリング構成が完了したら、データベースノードを本番環境に公開できます。
公開された定期タスクは、構成されたパラメーターに基づいて定期的に実行されます。タスクは ページで表示および管理できます。詳細については、「オペレーションセンターの概要」をご参照ください。