すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Alibaba Cloud DataWorks を介したジョブの送信

最終更新日:Oct 16, 2025

Alibaba Cloud DataWorks は、E-MapReduce 上に Hive、Spark SQL、Spark、およびその他のノードを作成して、タスクワークフローを設定およびスケジュールすることをサポートしています。また、メタデータ管理とデータ品質モニタリングアラート機能も提供し、ユーザーが効率的にデータを開発および管理するのに役立ちます。このトピックでは、Alibaba Cloud DataWorks を介してジョブを送信する方法について説明します。

サポートされているクラスタータイプ

DataWorks は現在、次のクラスタータイプの登録をサポートしています。

  • DataLake クラスター (新しいデータレイク)

  • カスタムクラスター

  • Hadoop クラスター (古いデータレイク)

重要
  • DataWorks では、次のバージョンの EMR Hadoop クラスターを使用できます。

    EMR-3.38.2、EMR-3.38.3、EMR-4.9.0、EMR-5.6.0、EMR-3.26.3、EMR-3.27.2、EMR-3.29.0、EMR-3.32.0、EMR-3.35.0、EMR-4.3.0、EMR-4.4.1、EMR-4.5.0、EMR-4.5.1、EMR-4.6.0、EMR-4.8.0、EMR-5.2.1、EMR-5.4.3

  • Hadoop クラスター (古いデータレイク) は推奨されなくなりました。できるだけ早く DataLake クラスターに移行する必要があります。詳細については、「Hadoop クラスターを DataLake クラスターに移行する」をご参照ください。

制限事項

  • タスクタイプ: DataWorks コンソールで EMR Flink タスクを実行することはできません。

  • タスクの実行: サーバーレスリソースグループ (推奨) または古いバージョンの専用スケジューリングリソースグループを使用して、EMR タスクを実行できます。

  • タスクガバナンス:

    • EMR Hive、EMR Spark、および EMR Spark SQL ノードの SQL タスクのみがデータリネージの生成に使用できます。EMR クラスターが V3.43.1、V5.9.1、または V3.43.1 または V5.9.1 より後のマイナーバージョンの場合、クラスターに基づいて作成された先行ノードのテーブルレベルのリネージとフィールドレベルのリネージを表示できます。

      説明

      Spark ベースの EMR ノードの場合、EMR クラスターが V5.8.0、V3.42.0、または V5.8.0 または V3.42.0 より後のマイナーバージョンの場合、Spark ベースの EMR ノードを使用してテーブルレベルおよびフィールドレベルのリネージを表示できます。EMR クラスターが V5.8.0 または V3.42.0 より前のマイナーバージョンの場合、Spark 2.x を使用する Spark ベースの EMR ノードのみがテーブルレベルのリネージの表示に使用できます。

    • DataWorks で DataLake またはカスタムクラスターのメタデータを管理する場合は、まずクラスターに EMR-HOOK を設定する必要があります。目的のクラスターに EMR-HOOK を設定しないと、メタデータをリアルタイムで表示できず、監査ログを生成できず、DataWorks でデータリネージを表示できません。さらに、EMR ガバナンスタスクを実行できません。EMR-HOOK は、EMR Hive および EMR Spark SQL サービス用に設定できます。詳細については、「Hive 拡張機能を使用してデータリネージと履歴アクセス情報を記録する」および「Spark SQL 拡張機能を使用してデータリネージと履歴アクセス情報を記録する」をご参照ください。

  • サポートされているリージョン: EMR Serverless Spark は、中国 (杭州)、中国 (上海)、中国 (北京)、中国 (張家口)、中国 (深圳)、シンガポール、ドイツ (フランクフルト)、および米国 (シリコンバレー) リージョンで利用できます。

  • Kerberos 認証が有効になっている EMR クラスターの場合、リソースグループが関連付けられている vSwitch の CIDR ブロックに対して、EMR クラスターのセキュリティグループに UDP ポートのインバウンドルールを追加する必要があります。

    説明

    インバウンドルールを追加するには、次の操作を実行します。EMR コンソールにログオンします。EMR クラスターの [基本情報] タブに移動します。[基本情報] タブのセキュリティセクションで、[クラスターセキュリティグループ] パラメーターの右側にある image アイコンをクリックします。セキュリティグループページの [セキュリティグループの詳細] タブで、[アクセスルール] セクションの [インバウンド] タブをクリックします。[インバウンド] タブで、[ルールを追加] をクリックします。[プロトコルタイプ] パラメーターを [カスタム UDP] に、[ポート範囲] パラメーターを EMR クラスターの /etc/krb5.conf ファイルで指定されている構成に、[認証オブジェクト] パラメーターをリソースグループが関連付けられている vSwitch の CIDR ブロックに設定します。

前提条件

  • 次の権限が付与されています。

    次の ID を持つ RAM ユーザーまたは RAM ロールのみが EMR クラスターを登録できます。操作の詳細については、「RAM ユーザーに権限を付与する」をご参照ください。

    • Alibaba Cloud アカウント。

    • DataWorks ワークスペース管理者ロール AliyunEMRFullAccess ポリシーの両方を持つ RAM ユーザーまたは RAM ロール。

    • AliyunDataWorksFullAccess AliyunEMRFullAccess ポリシーの両方を持つ RAM ユーザーまたは RAM ロール。

  • 対応するタイプの EMR クラスターが購入されています。この例では、EMR クラスターのリージョンは 中国 (上海) です。

    DataWorks が登録をサポートするクラスタータイプの詳細については、「サポートされているクラスタータイプ」をご参照ください。

注意事項

  • 標準モードのワークスペースを使用して、開発環境の EMR データを本番環境の EMR データから分離する場合は、ワークスペースの開発環境と本番環境に異なる EMR クラスターを登録する必要があります。さらに、EMR クラスターのメタデータは、次のいずれかの方法で保存する必要があります。

  • EMR クラスターを同じ Alibaba Cloud アカウント内の複数のワークスペースに登録できますが、Alibaba Cloud アカウントをまたいで複数のワークスペースに EMR クラスターを登録することはできません。たとえば、現在の Alibaba Cloud 内のワークスペースに EMR クラスターを登録した場合、そのクラスターを別の Alibaba Cloud アカウントのワークスペースに登録することはできません。

  • DataWorks リソースグループと EMR クラスターが同じ VPC (Virtual Private Cloud) にデプロイされ、同じ vSwitch を使用しているにもかかわらず、リソースグループが期待どおりに EMR クラスターに接続できない場合は、EMR クラスターのセキュリティグループルールを確認し、vSwitch の CIDR ブロックと 一般的なオープンソースコンポーネントのポート のインバウンドルールを EMR クラスターのセキュリティグループルールに追加して、DataWorks リソースグループを使用して EMR クラスターに期待どおりにアクセスできるようにします。詳細については、「セキュリティグループの管理」をご参照ください。

DataWorks 環境の準備

DataWorks でタスクを開発する前に、DataWorks をアクティブ化する必要があります。詳細については、「環境の準備」をご参照ください。

ステップ 1: ワークスペースの作成

中国 (上海) リージョンにワークスペースが存在する場合は、このステップをスキップして既存のワークスペースを使用してください。

  1. DataWorks コンソールにログインします。上部のナビゲーションバーで、[中国 (上海)] リージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。[ワークスペース] ページで、[ワークスペースの作成] をクリックして、標準モードでワークスペースを作成します。詳細については、「ワークスペースの作成」をご参照ください。標準モードのワークスペースでは、開発環境は本番環境から分離されています。

ステップ 2: サーバーレスリソースグループの作成

このチュートリアルでは、データ同期とスケジューリングのためにサーバーレスリソースグループが必要です。そのため、サーバーレスリソースグループを購入して設定する必要があります。

  1. サーバーレスリソースグループを購入します。

    1. DataWorks コンソールにログインします。上部のナビゲーションバーで、[中国 (上海)] リージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックして、[リソースグループ] ページに移動します。

    2. [リソースグループ] ページで、[リソースグループの作成] をクリックします。購入ページで、[リージョンとゾーン][中国 (上海)] に設定し、[リソースグループ名] を指定し、プロンプトに従って他のパラメーターを設定してから、画面の指示に従ってリソースグループの支払いを行います。サーバーレスリソースグループの課金の詳細については、「サーバーレスリソースグループの課金」をご参照ください。

      説明

      現在のリージョンに VPC (Virtual Private Cloud) または vSwitch が存在しない場合は、パラメーターの説明にあるリンクをクリックして VPC コンソールに移動し、作成します。VPC と vSwitch の詳細については、「VPC とは」をご参照ください。

  2. サーバーレスリソースグループを DataWorks ワークスペースに関連付けます。

    購入したサーバーレスリソースグループを後続の操作で使用するには、サーバーレスリソースグループをワークスペースに関連付ける必要があります。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、[中国 (上海)] リージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックします。[リソースグループ] ページで、購入したサーバーレスリソースグループを見つけ、[アクション] 列の [ワークスペースの関連付け] をクリックします。[ワークスペースの関連付け] パネルで、関連付けるワークスペースを見つけ、[アクション] 列の [関連付け] をクリックします。

  3. サーバーレスリソースグループがインターネットにアクセスできるようにします。

    このチュートリアルで使用するテストデータは、インターネット経由で取得する必要があります。デフォルトでは、サーバーレスリソースグループはインターネットへのアクセスに使用できません。サーバーレスリソースグループが関連付けられている VPC にインターネット NAT ゲートウェイを設定し、VPC に EIP を設定して、VPC とテストデータのネットワーク環境との間にネットワーク接続を確立する必要があります。これにより、サーバーレスリソースグループを使用してテストデータにアクセスできます。

    1. VPC コンソールのインターネット NAT ゲートウェイページに移動します。上部のナビゲーションバーで、[中国 (上海)] リージョンを選択します。

    2. [インターネット NAT ゲートウェイの作成] をクリックし、パラメーターを設定します。次の表に、このチュートリアルで必要な主要なパラメーターを示します。次の表に記載されていないパラメーターについては、デフォルト値を保持できます。

      パラメーター

      説明

      リージョン

      中国 (上海) を選択します。

      VPC

      リソースグループが関連付けられている VPC と vSwitch を選択します。

      リソースグループが関連付けられている VPC と vSwitch を表示するには、次の操作を実行します。DataWorks コンソールにログインします。上部のナビゲーションバーで、DataWorks をアクティブ化したリージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックします。[リソースグループ] ページで、作成したリソースグループを見つけ、[アクション] 列の [ネットワーク設定] をクリックします。表示されるページの [VPC バインディング] タブの [データスケジューリング & データ統合] セクションで、リソースグループが関連付けられている [VPC][vSwitch] を表示します。VPC と vSwitch の詳細については、「VPC とは」をご参照ください。

      VSwitch の関連付け

      アクセスモード

      SNAT 対応モードを選択します。

      EIP

      EIP の購入を選択します。

      サービスリンクロール

      初めて NAT ゲートウェイを作成する場合は、[サービスリンクロールの作成] をクリックしてサービスリンクロールを作成します。

    3. [今すぐ購入] をクリックします。[確認] ページで、サービス利用規約を読み、[サービス利用規約] のチェックボックスをオンにしてから、[今すぐアクティブ化] をクリックします。

サーバーレスリソースグループの作成と使用方法の詳細については、「サーバーレスリソースグループの使用」をご参照ください。

ステップ 3: EMR クラスターを DataWorks に登録し、リソースグループを初期化する

EMR クラスターを DataWorks で使用するには、クラスターを DataWorks に登録する必要があります。

  1. EMR クラスターの登録ページに移動します。

    1. SettingCenter ページに移動します。

      DataWorks コンソールにログインします。上部のナビゲーションバーで、[中国 (上海)] リージョンを選択します。左側のナビゲーションウィンドウで、[詳細] > [管理センター] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。

    2. SettingCenter ページの左側のナビゲーションウィンドウで、[クラスター管理] をクリックします。[クラスター管理] ページで、[クラスターの登録] をクリックします。[クラスタータイプの選択] ダイアログボックスで、[E-MapReduce] をクリックします。[EMR クラスターの登録] ページが表示されます。

  2. EMR クラスターを DataWorks に登録します。

    [EMR クラスターの登録] ページで、クラスター情報を設定します。次の表に、主要なパラメーターを示します。

    パラメーター

    説明

    クラスターが属する Alibaba Cloud アカウント

    [現在の Alibaba Cloud アカウント] に設定します。

    クラスタータイプ

    [データレイク] を選択します。

    デフォルトのアクセス ID

    [クラスターアカウント: Hadoop] に設定します。

    プロキシユーザー情報の受け渡し

    [受け渡し] に設定します。

  3. リソースグループを初期化します。

    1. SettingCenter の [クラスター管理] ページに移動します。DataWorks に登録されている EMR クラスターを見つけ、EMR クラスターの情報を表示するセクションで [リソースグループの初期化] をクリックします。

    2. [リソースグループの初期化] ダイアログボックスで、目的のリソースグループを見つけ、[初期化] をクリックします。

    3. 初期化が完了したら、[OK] をクリックします。

    重要

    リソースグループの初期化が成功したことを確認する必要があります。そうしないと、リソースグループを使用するタスクが失敗する可能性があります。リソースグループの初期化が失敗した場合は、失敗の原因を表示し、プロンプトに従ってネットワーク接続診断を実行できます。

EMR クラスターの登録方法の詳細については、「DataStudio (旧バージョン): EMR 計算資源の関連付け」をご参照ください。

EMR ジョブの送信

EMR Hive ジョブの送信

ステップ 1: EMR Hive ノードの作成

  1. DataStudio ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. EMR Hive ノードを作成します。

    1. 目的のワークフローを見つけ、ワークフローの名前を右クリックし、[ノードの作成] > [EMR] > [EMR Hive] を選択します。

      説明

      または、[作成] アイコンにポインターを合わせ、[ノードの作成] > [EMR] > [EMR Hive] を選択することもできます。

    2. [ノードの作成] ダイアログボックスで、[名前][エンジンインスタンス][ノードタイプ]、および [パス] パラメーターを設定します。[確認] をクリックします。EMR Hive ノードの設定タブが表示されます。

      説明

      ノード名には、文字、数字、アンダースコア (_)、およびピリオド (.) のみを含めることができます。

ステップ 2: EMR Hive タスクの開発

EMR Hive ノードの設定タブで Hive タスクを開発できます。

SQL コードの開発

SQL エディターで、ノードコードを開発します。ノードコードで ${Variable} 形式で変数を定義し、[プロパティ] タブの [スケジューリングパラメーター] セクションで、変数に値として割り当てられるスケジューリングパラメーターを設定できます。これにより、ノードが実行されるようにスケジュールされると、スケジューリングパラメーターの値がノードコードで動的に置き換えられます。スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポートされているフォーマット」をご参照ください。サンプルコード:

show tables;
select '${var}'; -- var 変数に特定のスケジューリングパラメーターを割り当てることができます。
select * from userinfo ;
説明

Hive タスクの実行

  1. ツールバーで、高级运行 アイコンをクリックします。[パラメーター] ダイアログボックスで、[リソースグループ名] ドロップダウンリストから目的のリソースグループを選択し、[実行] をクリックします。

    説明
    • インターネットまたは VPC (Virtual Private Cloud) 経由で計算資源にアクセスする場合は、計算資源に接続されているスケジューリング用のリソースグループを使用します。詳細については、「ネットワーク接続ソリューション」をご参照ください。

    • 後続の操作でリソースグループを変更する場合は、高级运行 (パラメーターを指定して実行) アイコンをクリックして、[パラメーター] ダイアログボックスでリソースグループを変更できます。

    • EMR Hive ノードを使用してデータをクエリする場合、最大 10,000 件のデータレコードを返すことができ、返されるデータレコードの合計サイズは 10 MB を超えることはできません。

  2. 上部のツールバーの 保存 アイコンをクリックして、SQL 文を保存します。

  3. オプション。スモークテストを実行します。

    ノードをコミットするとき、またはノードをコミットした後に、開発環境でノードのスモークテストを実行できます。詳細については、「スモークテストの実行」をご参照ください。

説明

ジョブがコミットされるキューを変更する場合は、「高度なパラメーターの設定」をご参照ください。

ステップ 3: スケジュールプロパティの設定

システムがノード上のタスクを定期的に実行するようにしたい場合は、ノードの設定タブの右側のナビゲーションウィンドウで [プロパティ] をクリックして、ビジネス要件に基づいてタスクのスケジュールプロパティを設定できます。詳細については、「概要」をご参照ください。

説明

タスクをコミットする前に、[プロパティ] タブで [再実行] および [親ノード] パラメーターを設定する必要があります。

ステップ 4: タスクのデプロイ

ノード上のタスクが設定された後、タスクをコミットしてデプロイする必要があります。タスクをコミットしてデプロイすると、システムはスケジューリング設定に基づいて定期的にタスクを実行します。

  1. 上部のツールバーの 保存 アイコンをクリックして、タスクを保存します。

  2. 上部のツールバーの 提交 アイコンをクリックして、タスクをコミットします。

    [送信] ダイアログボックスで、[変更の説明] パラメーターを設定します。次に、ビジネス要件に基づいて、タスクをコミットした後にタスクコードをレビューするかどうかを決定します。

    説明
    • タスクをコミットする前に、[プロパティ] タブで [再実行] および [親ノード] パラメーターを設定する必要があります。

    • コードレビュー機能を使用して、タスクのコード品質を確保し、無効なタスクコードによるタスク実行エラーを防ぐことができます。コードレビュー機能を有効にすると、コミットされたタスクコードは、タスクコードがコードレビューに合格した後にのみデプロイできます。詳細については、「コードレビュー」をご参照ください。

標準モードのワークスペースを使用する場合、タスクをコミットした後に本番環境にタスクをデプロイする必要があります。ノードにタスクをデプロイするには、ノードの設定タブの右上隅にある [デプロイ] をクリックします。詳細については、「ノードのデプロイ」をご参照ください。

EMR Spark SQL ジョブの送信

1. EMR Spark SQL ノードの作成

  1. DataStudio ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. EMR Spark SQL ノードを作成します。

    1. ターゲットのビジネスフローを右クリックし、[新しいノード] > [EMR] > [EMR Spark SQL] を選択します。

      説明

      または、[新規] にカーソルを合わせ、[新しいノード] > [EMR] > [EMR Spark SQL] を選択することもできます。

    2. [新しいノード] ダイアログボックスで、[名前][エンジンインスタンス][ノードタイプ]、および [パス] を設定します。[確認] をクリックします。EMR Spark SQL ノードの設定タブが表示されます。

      説明

      ノード名には、大文字、小文字、漢字、数字、アンダースコア (_)、およびピリオド (.) を含めることができます。

2. EMR Spark SQL タスクの開発

EMR Spark SQL ノード設定タブで、作成したノードをダブルクリックします。タスク開発タブが表示されます。

SQL コードの開発

SQL 編集エリアで、タスクコードを開発します。コード内で ${variable_name} 形式を使用して変数を定義できます。ノード設定タブの右側のナビゲーションウィンドウにある [スケジューリング設定] > [スケジューリングパラメーター] セクションで、変数に値を割り当てることができます。これにより、スケジューリングシナリオでパラメーターを動的に渡すことができます。スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポートされているフォーマット」をご参照ください。以下に例を示します。

SHOW TABLES; 
-- ${var} を使用して var という名前の変数を定義します。この変数に ${yyyymmdd} の値を割り当てると、データタイムスタンプを接尾辞として持つテーブルを作成できます。
CREATE TABLE IF NOT EXISTS userinfo_new_${var} (
ip STRING COMMENT'IP アドレス',
uid STRING COMMENT'ユーザー ID'
)PARTITIONED BY(
dt STRING
); -- これはスケジューリングパラメーターと併用できます。
説明
  • SQL 文は 130 KB を超えることはできません。

  • [データ開発] でワークスペースに複数の EMR 計算資源がアタッチされている場合は、計算資源を選択する必要があります。

SQL タスクの実行

  1. ツールバーで、高级运行 アイコンをクリックします。[パラメーター] ダイアログボックスで、作成したスケジューリングリソースグループを選択し、[実行] をクリックします。

    説明
    • パブリックネットワークまたは VPC ネットワーク環境で計算資源にアクセスするには、計算資源との接続性テストに合格したスケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

    • 後続のタスク実行のためにリソースグループを変更するには、[パラメーターを指定して実行] 高级运行 アイコンをクリックし、切り替えたいスケジューリングリソースグループを選択します。

    • EMR Spark SQL ノードを使用してデータをクエリする場合、クエリは最大 10,000 レコードを返すことができ、合計データサイズは 10 MB を超えることはできません。

  2. 保存 アイコンをクリックして SQL 文を保存します。

  3. (オプション) スモークテストを実行します。

    開発環境でスモークテストを実行したい場合は、ノードを送信するとき、またはノードが送信された後に実行できます。詳細については、「スモークテストの実行」をご参照ください。

説明

ジョブがコミットされるキューを変更する場合は、「高度なパラメーターの設定」をご参照ください。

ステップ 3: スケジュールプロパティの設定

システムがノード上のタスクを定期的に実行するようにしたい場合は、ノードの設定タブの右側のナビゲーションウィンドウで [プロパティ] をクリックして、ビジネス要件に基づいてタスクのスケジュールプロパティを設定できます。詳細については、「概要」をご参照ください。

説明

タスクをコミットする前に、[プロパティ] タブで [再実行] および [親ノード] パラメーターを設定する必要があります。

ステップ 4: タスクのデプロイ

ノード上のタスクが設定された後、タスクをコミットしてデプロイする必要があります。タスクをコミットしてデプロイすると、システムはスケジューリング設定に基づいて定期的にタスクを実行します。

  1. 上部のツールバーの 保存 アイコンをクリックして、タスクを保存します。

  2. 上部のツールバーの 提交 アイコンをクリックして、タスクをコミットします。

    [送信] ダイアログボックスで、[変更の説明] パラメーターを設定します。次に、ビジネス要件に基づいて、タスクをコミットした後にタスクコードをレビューするかどうかを決定します。

    説明
    • タスクをコミットする前に、[プロパティ] タブで [再実行] および [親ノード] パラメーターを設定する必要があります。

    • コードレビュー機能を使用して、タスクのコード品質を確保し、無効なタスクコードによるタスク実行エラーを防ぐことができます。コードレビュー機能を有効にすると、コミットされたタスクコードは、タスクコードがコードレビューに合格した後にのみデプロイできます。詳細については、「コードレビュー」をご参照ください。

標準モードのワークスペースを使用する場合、タスクをコミットした後に本番環境にタスクをデプロイする必要があります。ノードにタスクをデプロイするには、ノードの設定タブの右上隅にある [デプロイ] をクリックします。詳細については、「ノードのデプロイ」をご参照ください。

EMR Spark ジョブの送信

ステップ 1: EMR Spark ノードの作成

  1. DataStudio ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. EMR Spark ノードを作成します。

    1. 目的のワークフローを見つけ、ワークフロー名を右クリックし、[ノードの作成] > [EMR] > [EMR Spark] を選択します。

      説明

      または、[作成] アイコンにポインターを合わせ、[ノードの作成] > [EMR] > [EMR Spark] を選択することもできます。

    2. [ノードの作成] ダイアログボックスで、[名前][エンジンインスタンス][ノードタイプ]、および [パス] パラメーターを設定します。[確認] をクリックします。EMR Spark ノードの設定タブが表示されます。

      説明

      ノード名には、文字、数字、アンダースコア (_)、およびピリオド (.) のみを含めることができます。

ステップ 2: Spark タスクの開発

ビジネス要件に基づいて、次のいずれかの方法を使用して、EMR Spark ノードの設定タブで Spark タスクを開発できます。

  • (推奨) オンプレミスのマシンから DataStudio にリソースをアップロードし、そのリソースを参照します。詳細については、このトピックの「方法 1: EMR JAR リソースのアップロードと参照」セクションをご参照ください。

  • OSS REF メソッドを使用して OSS リソースを参照します。詳細については、このトピックの「方法 2: OSS リソースの参照」セクションをご参照ください。

方法 1: EMR JAR リソースのアップロードと参照

DataWorks では、リソースを参照する前に、オンプレミスのマシンから DataStudio にリソースをアップロードできます。EMR で Spark タスクのコードがコンパイルされた後に生成される JAR パッケージを取得して保存する必要があります。JAR パッケージの保存方法は、JAR パッケージのサイズによって異なります。

JAR パッケージを EMR JAR リソースとして DataWorks コンソールにアップロードし、リソースをコミットできます。JAR パッケージを EMR の HDFS に保存することもできます。EMR on ACK ページで作成された Spark クラスターまたは EMR Serverless Spark クラスターの場合、HDFS にリソースをアップロードすることはできません。

JAR パッケージのサイズが 200 MB 未満の場合
  1. EMR JAR リソースを作成します。

    オンプレミスのマシンから JAR パッケージを EMR JAR リソースとして DataWorks コンソールにアップロードできます。これにより、DataWorks コンソールで JAR パッケージを視覚的に管理できます。EMR JAR リソースを作成した後、リソースをコミットする必要があります。詳細については、「EMR リソースの作成と使用」をご参照ください。

    image.png

    説明

    初めて EMR JAR リソースを作成する場合、JAR パッケージがアップロードされた後に OSS に保存されるようにするには、まずプロンプトに従って権限付与を行う必要があります。

  2. EMR JAR リソースを参照します。

    1. 作成した [EMR Spark] ノードの名前をダブルクリックして、ノードの設定タブに移動します。

    2. [EMR] フォルダの [リソース] で目的の EMR JAR リソースを見つけ、リソース名を右クリックして [リソースパスの挿入] を選択します。

    3. リソース参照コードが EMR Spark ノードの設定タブに自動的に追加されます。サンプルコード:

      ##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"}
      spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar

      上記のコードが正常に追加されると、リソースが参照されます。spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar は、アップロードした JAR パッケージの名前です。

    4. EMR Spark ノードのコードを書き換え、spark-submit コマンドを追加します。次のサンプルコードは参照用です。

      説明

      EMR Spark ノードのコードを記述する際にコメントを追加することはできません。コメントを追加すると、EMR Spark ノードの実行時にエラーが報告されます。次のサンプルコードを参照して、EMR Spark ノードのコードを書き換えることができます。

      ##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
      spark-submit --class org.apache.spark.examples.SparkPi --master yarn  spark-examples_2.11-2.4.0.jar 100

      コンポーネント:

      • org.apache.spark.examples.SparkPi: コンパイルされた JAR パッケージ内のタスクのメインクラス。

      • spark-examples_2.11-2.4.0.jar: アップロードした JAR パッケージの名前。

      • 他のパラメーターの設定は変更しないでおくことができます。また、次のコマンドを実行して spark-submit コマンドの使用に関するヘルプドキュメントを表示し、ビジネス要件に基づいて spark-submit コマンドを変更することもできます。

        説明
        • spark-submit コマンドを実行して簡略化されたパラメーター (例: --executor-memory 2G) を EMR Spark ノードで使用する場合は、そのパラメーターを EMR Spark ノードのコードに追加する必要があります。

        • YARN 上の Spark ノードを使用してジョブを送信できるのは、ノードがクラスターモードの場合のみです。

        • spark-submit を使用してノードをコミットする場合は、deploy-mode を client ではなく cluster に設定することをお勧めします。

        spark-submit --help

        image.png

JAR パッケージのサイズが 200 MB 以上の場合
  1. JAR パッケージを EMR の HDFS に保存します。

    オンプレミスのマシンから JAR パッケージを DataWorks リソースとして DataWorks コンソールにアップロードすることはできません。JAR パッケージを EMR の HDFS に保存し、JAR パッケージの保存パスを記録することをお勧めします。これにより、DataWorks を使用して Spark タスクをスケジュールする際に、このパスで JAR パッケージを参照できます。

  2. JAR パッケージを参照します。

    EMR Spark ノードのコードで JAR パッケージの保存パスを指定することで、JAR パッケージを参照できます。

    1. 作成した [EMR Spark] ノードの名前をダブルクリックして、ノードの設定タブに移動します。

    2. spark-submit コマンドを記述します。例:

      spark-submit --master yarn
      --deploy-mode cluster
      --name SparkPi
      --driver-memory 4G
      --driver-cores 1
      --num-executors 5
      --executor-memory 4G
      --executor-cores 1
      --class org.apache.spark.examples.JavaSparkPi
      hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100

      パラメーターの説明:

      • hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: HDFS 内の JAR パッケージの保存パス。

      • org.apache.spark.examples.JavaSparkPi: コンパイルされた JAR パッケージ内のタスクのメインクラス。

      • 他のパラメーターは、使用される EMR クラスターで設定されます。ビジネス要件に基づいてパラメーターを変更できます。また、次のコマンドを実行して spark-submit コマンドの使用に関するヘルプドキュメントを表示し、ビジネス要件に基づいて spark-submit コマンドを変更することもできます。

        重要
        • spark-submit コマンドを実行して簡略化されたパラメーター (例: --executor-memory 2G) を EMR Spark ノードで使用する場合は、そのパラメーターを EMR Spark ノードのコードに追加する必要があります。

        • YARN 上の Spark ノードを使用してジョブを送信できるのは、ノードがクラスターモードの場合のみです。

        • spark-submit を使用してノードをコミットする場合は、deploy-mode を client ではなく cluster に設定することをお勧めします。

        spark-submit --help

        image.png

方法 2: OSS リソースの参照

現在のノードは、OSS REF メソッドを使用して OSS リソースを参照できます。ノードでタスクを実行すると、DataWorks はノードコードで指定された OSS リソースを自動的にロードします。この方法は、EMR タスクで JAR 依存関係が必要な場合や、EMR タスクがスクリプトに依存する必要があるシナリオで一般的に使用されます。

  1. JAR パッケージを開発します。

    1. コードの依存関係を準備します。

      EMR クラスターにアクセスし、マスターノードの /usr/lib/emr/spark-current/jars/ パスで必要なコードの依存関係を表示できます。以下の情報は Spark 3.4.2 を例として使用しています。既存の IntelliJ IDEA プロジェクトを開き、Project Object Model (POM) の依存関係と参照プラグインを追加する必要があります。

      POM 依存関係の追加
      <dependencies>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_2.12</artifactId>
                  <version>3.4.2</version>
              </dependency>
              <!-- Apache Spark SQL -->
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.12</artifactId>
                  <version>3.4.2</version>
              </dependency>
      </dependencies>
      参照プラグイン
      <build>
              <sourceDirectory>src/main/scala</sourceDirectory>
              <testSourceDirectory>src/test/scala</testSourceDirectory>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.7.0</version>
                      <configuration>
                          <source>1.8</source>
                          <target>1.8</target>
                      </configuration>
                  </plugin>
                  <plugin>
                      <artifactId>maven-assembly-plugin</artifactId>
                      <configuration>
                          <descriptorRefs>
                              <descriptorRef>jar-with-dependencies</descriptorRef>
                          </descriptorRefs>
                      </configuration>
                      <executions>
                          <execution>
                              <id>make-assembly</id>
                              <phase>package</phase>
                              <goals>
                                  <goal>single</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
                  <plugin>
                      <groupId>net.alchim31.maven</groupId>
                      <artifactId>scala-maven-plugin</artifactId>
                      <version>3.2.2</version>
                      <configuration>
                          <recompileMode>incremental</recompileMode>
                      </configuration>
                      <executions>
                          <execution>
                              <goals>
                                  <goal>compile</goal>
                                  <goal>testCompile</goal>
                              </goals>
                              <configuration>
                                  <args>
                                      <arg>-dependencyfile</arg>
                                      <arg>${project.build.directory}/.scala_dependencies</arg>
                                  </args>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
    2. コードを記述します。サンプルコード:

      package com.aliyun.emr.example.spark
      
      import org.apache.spark.sql.SparkSession
      
      object SparkMaxComputeDemo {
        def main(args: Array[String]): Unit = {
          // Spark セッションを作成します。
          val spark = SparkSession.builder()
            .appName("HelloDataWorks")
            .getOrCreate()
      
          // Spark のバージョンを表示します。
          println(s"Spark version: ${spark.version}")
        }
      }
    3. コードを JAR ファイルにパッケージ化します。

      上記のコードを記述して保存した後、コードを JAR ファイルにパッケージ化します。この例では、SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar という名前のファイルが生成されます。

  2. JAR ファイルをアップロードします。

    1. OSS コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。次に、左側のナビゲーションウィンドウで [バケット] をクリックします。

    2. [バケット] ページで、目的のバケットを見つけ、バケット名をクリックして [オブジェクト] ページに移動します。

      この例では、onaliyun-bucket-2 バケットが使用されます。

    3. [オブジェクト] ページで、[ディレクトリの作成] をクリックして、JAR ファイルを保存するために使用するディレクトリを作成します。

      [ディレクトリの作成] パネルで、[ディレクトリ名]emr/jars に設定し、[OK] をクリックします。

    4. 作成したディレクトリに JAR ファイルをアップロードします。

      作成したディレクトリに移動します。[オブジェクトのアップロード] をクリックします。[アップロードするファイル] セクションで、[ファイルの選択] をクリックし、SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar ファイルを追加します。次に、[オブジェクトのアップロード] をクリックします。

  3. JAR ファイルを参照します。

    1. JAR ファイルを参照するために使用するコードを記述します。

      EMR Spark ノードの設定タブで、JAR ファイルを参照するために使用するコードを記述します。

      spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar

      パラメーターの説明:

      パラメーター

      説明

      class

      実行するメインクラスの完全名。

      master

      Spark アプリケーションの実行モード。

      ossref ファイルパス

      フォーマット: ossref://{endpoint}/{bucket}/{object}

      • endpoint: OSS のエンドポイント。endpoint パラメーターが空の場合、現在の EMR クラスターと同じリージョンにある OSS バケット内のリソースのみを参照できます。

      • bucket: OSS でオブジェクトを保存するために使用されるコンテナー。各 bucket には一意の名前があります。OSS コンソールにログインして、現在のログインアカウント内のすべての Bucket を表示できます。

      • object: bucket に保存されているファイル名またはパス。

    2. EMR Spark ノードでタスクを実行します。

      コードを記述した後、image アイコンをクリックし、作成したサーバーレスリソースグループを選択して EMR Spark ノードでタスクを実行します。タスクの実行が完了したら、コンソールに表示される application ID (例: application_1730367929285_xxxx) を記録します。

    3. 結果を表示します。

      EMR Shell ノードを作成し、EMR Shell ノードで yarn logs -applicationId application_1730367929285_xxxx コマンドを実行して実行結果を表示します。

      image

(オプション) 高度なパラメーターの設定

現在のノードの設定タブの [高度な設定] タブで、Spark 固有のパラメーターを設定できます。パラメーターの設定方法の詳細については、「Spark 設定」をご参照ください。次の表に、さまざまなタイプの EMR クラスターで設定できる高度なパラメーターを示します。

DataLake クラスターまたはカスタムクラスター: EMR on ECS ページで作成

高度なパラメーター

説明

queue

ジョブがコミットされるスケジューリングキュー。デフォルト値: default。

DataWorks ワークスペースに EMR クラスターを登録する際に、ワークスペースレベルの [YARN キュー] を設定した場合、次の設定が適用されます。

  • [グローバル設定タスク優先][はい]選択した場合、EMR クラスターの登録時に設定された YARN キューが Spark タスクの実行に使用されます。

  • [グローバル設定タスク優先] で [はい] を選択しない場合、EMR Spark ノードに設定された YARN キューが Spark タスクの実行に使用されます。

EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。EMR クラスターの登録時のキュー設定の詳細については、「グローバル YARN キューの設定」をご参照ください。

priority

優先度。デフォルト値: 1。

FLOW_SKIP_SQL_ANALYZE

SQL 文の実行方法。有効値:

  • true: 複数の SQL 文を一度に実行します。

  • false (デフォルト): 一度に 1 つの SQL 文のみを実行します。

説明

このパラメーターは、DataWorks ワークスペースの開発環境でのテストでのみ使用できます。

その他

  • [高度な設定] タブで、EMR Spark ノードにカスタム Spark パラメーターを追加することもできます (例: "spark.eventLog.enabled": false )。EMR Spark ノードのコードをコミットすると、DataWorks は --conf key=value 形式でカスタムパラメーターをコードに追加します。

  • グローバル Spark パラメーターを設定することもできます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。

    説明

    Ranger 権限コントロールを有効にする場合は、グローバル Spark パラメーターを設定する際に spark.hadoop.fs.oss.authorization.method=ranger を追加して、Ranger 権限コントロールを有効にする必要があります。

Hadoop クラスター: EMR on ECS ページで作成

高度なパラメーター

説明

queue

ジョブがコミットされるスケジューリングキュー。デフォルト値: default。

DataWorks ワークスペースに EMR クラスターを登録する際に、ワークスペースレベルの [YARN キュー] を設定した場合、次の設定が適用されます。

  • [グローバル設定タスク優先][はい]選択した場合、EMR クラスターの登録時に設定された YARN キューが Spark タスクの実行に使用されます。

  • [グローバル設定タスク優先] で [はい] を選択しない場合、EMR Spark ノードに設定された YARN キューが Spark タスクの実行に使用されます。

EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。EMR クラスターの登録時のキュー設定の詳細については、「グローバル YARN キューの設定」をご参照ください。

priority

優先度。デフォルト値: 1。

FLOW_SKIP_SQL_ANALYZE

SQL 文の実行方法。有効値:

  • true: 複数の SQL 文を一度に実行します。

  • false: 一度に 1 つの SQL 文のみを実行します。

説明

このパラメーターは、DataWorks ワークスペースの開発環境でのテストでのみ使用できます。

USE_GATEWAY

現在のノードでジョブをコミットするためにゲートウェイクラスターを使用するかどうかを指定します。有効値:

  • true: ゲートウェイクラスターを使用してジョブをコミットします。

  • false: ゲートウェイクラスターを使用せずにジョブをコミットします。ジョブは自動的にマスターノードにコミットされます。

説明

ノードが属する EMR クラスターがゲートウェイクラスターに関連付けられていないが、USE_GATEWAY パラメーターが true に設定されている場合、ジョブのコミットに失敗する可能性があります。

その他

  • [高度な設定] タブで、EMR Spark ノードにカスタム Spark パラメーターを追加することもできます (例: "spark.eventLog.enabled": false )。EMR Spark ノードのコードをコミットすると、DataWorks は --conf key=value 形式でカスタムパラメーターをコードに追加します。

  • グローバル Spark パラメーターを設定することもできます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。

    説明

    Ranger 権限コントロールを有効にする場合は、グローバル Spark パラメーターを設定する際に spark.hadoop.fs.oss.authorization.method=ranger を追加して、Ranger 権限コントロールを有効にする必要があります。

Spark クラスター: EMR on ACK ページで作成

高度なパラメーター

説明

queue

このパラメーターはサポートされていません。

priority

このパラメーターはサポートされていません。

FLOW_SKIP_SQL_ANALYZE

SQL 文の実行方法。有効値:

  • true: 複数の SQL 文を一度に実行します。

  • false: 一度に 1 つの SQL 文のみを実行します。

説明

このパラメーターは、DataWorks ワークスペースの開発環境でのテストでのみ使用できます。

その他

  • [高度な設定] タブで、EMR Spark ノードにカスタム Spark パラメーターを追加することもできます (例: "spark.eventLog.enabled": false )。EMR Spark ノードのコードをコミットすると、DataWorks は --conf key=value 形式でカスタムパラメーターをコードに追加します。

  • グローバル Spark パラメーターを設定することもできます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。

EMR Serverless Spark クラスター

パラメーター設定の詳細については、「Use the spark-submit CLI to submit a Spark job」トピックの「ステップ 3: Spark ジョブの送信」セクションをご参照ください。

高度なパラメーター

説明

queue

ジョブがコミットされるスケジューリングキュー。デフォルト値: dev_queue。

priority

優先度。デフォルト値: 1。

FLOW_SKIP_SQL_ANALYZE

SQL 文の実行方法。有効値:

  • true: 複数の SQL 文を一度に実行します。

  • false: 一度に 1 つの SQL 文のみを実行します。

説明

このパラメーターは、DataWorks ワークスペースの開発環境でのテストでのみ使用できます。

SERVERLESS_RELEASE_VERSION

Spark エンジンのバージョン。デフォルトでは、[EMR クラスターの登録] ページの [デフォルトのエンジンバージョン] パラメーターで指定された値が使用されます。[EMR クラスターの登録] ページに移動するには、次の操作を実行します。[SettingCenter] ページに移動します。左側のナビゲーションウィンドウで、[クラスター管理] をクリックします。[クラスター管理] ページで、[クラスターの登録] をクリックし、[クラスタータイプの選択] ダイアログボックスで [E-MapReduce] を選択します。このパラメーターを設定して、さまざまなタイプのタスクに異なるエンジンバージョンを指定できます。

SERVERLESS_QUEUE_NAME

リソースキュー。デフォルトでは、[EMR クラスターの登録] ページの [デフォルトのリソースキュー] パラメーターで指定された値が使用されます。 リソースの分離と管理の要件を満たすためにキューを追加できます。詳細については、「リソースキューの管理」をご参照ください。

その他

  • [高度な設定] タブで、EMR Spark ノードにカスタム Spark パラメーターを追加することもできます (例: "spark.eventLog.enabled": false )。EMR Spark ノードのコードをコミットすると、DataWorks は --conf key=value 形式でカスタムパラメーターをコードに追加します。

  • グローバル Spark パラメーターを設定することもできます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。

SQL 文の実行

  1. 上部のツールバーの 高级运行 アイコンをクリックします。[パラメーター] ダイアログボックスで、作成したスケジューリング用のリソースグループを選択し、[実行] をクリックします。

    説明
    • インターネットまたは VPC (Virtual Private Cloud) 経由で計算ソースにアクセスする場合は、計算ソースに接続されているスケジューリング用のリソースグループを使用します。詳細については、「ネットワーク接続ソリューション」をご参照ください。

    • 後続の操作でリソースグループを変更する場合は、高级运行 (パラメーターを指定して実行) アイコンをクリックして、[パラメーター] ダイアログボックスでリソースグループを変更できます。

    • EMR Spark ノードを使用してデータをクエリする場合、最大 10,000 件のデータレコードを返すことができ、返されるデータレコードの合計サイズは 10 MB を超えることはできません。

  2. 上部のツールバーの 保存 アイコンをクリックして、SQL 文を保存します。

  3. オプション。スモークテストを実行します。

    ノードをコミットするとき、またはノードをコミットした後に、開発環境でノードのスモークテストを実行できます。詳細については、「スモークテストの実行」をご参照ください。

ステップ 3: スケジュールプロパティの設定

システムがノード上のタスクを定期的に実行するようにしたい場合は、ノードの設定タブの右側のナビゲーションウィンドウで [プロパティ] をクリックして、ビジネス要件に基づいてタスクのスケジュールプロパティを設定できます。詳細については、「概要」をご参照ください。

説明

タスクをコミットする前に、[プロパティ] タブで [再実行] および [親ノード] パラメーターを設定する必要があります。

ステップ 4: タスクのデプロイ

ノード上のタスクが設定された後、タスクをコミットしてデプロイする必要があります。タスクをコミットしてデプロイすると、システムはスケジューリング設定に基づいて定期的にタスクを実行します。

  1. 上部のツールバーの 保存 アイコンをクリックして、タスクを保存します。

  2. 上部のツールバーの 提交 アイコンをクリックして、タスクをコミットします。

    [送信] ダイアログボックスで、[変更の説明] パラメーターを設定します。次に、ビジネス要件に基づいて、タスクをコミットした後にタスクコードをレビューするかどうかを決定します。

    説明
    • タスクをコミットする前に、[プロパティ] タブで [再実行] および [親ノード] パラメーターを設定する必要があります。

    • コードレビュー機能を使用して、タスクのコード品質を確保し、無効なタスクコードによるタスク実行エラーを防ぐことができます。コードレビュー機能を有効にすると、コミットされたタスクコードは、タスクコードがコードレビューに合格した後にのみデプロイできます。詳細については、「コードレビュー」をご参照ください。

標準モードのワークスペースを使用する場合、タスクをコミットした後に本番環境にタスクをデプロイする必要があります。ノードにタスクをデプロイするには、ノードの設定タブの右上隅にある [デプロイ] をクリックします。詳細については、「ノードのデプロイ」をご参照ください。

次のステップ

タスクがデプロイされると、自動的にオペレーションセンターに追加されます。オペレーションセンターでタスクの実行ステータスを表示したり、手動でタスクの実行をトリガーしたりできます。詳細については、「オペレーションセンター」をご参照ください。

よくある質問

  • DataWorks 環境を準備して EMR Hive ジョブを送信した後、java.net.ConnectException: Connection timed out (Connection timed out) エラーが発生します。

    • EMR クラスターと DataWorks 環境がドキュメントの要件どおりに正しく設定されているかどうかを確認し、DataWorks リソースグループと EMR が同じ VPC と vSwitch に関連付けられているかどうかを確認します。

    • EMR クラスターのセキュリティグループルールを確認して、ECS インスタンスのポート 10000 が開いていることを確認します。詳細については、「セキュリティグループの管理」をご参照ください。DataWorks で他のコンポーネントのジョブを送信する場合は、対応する ECS ポートを開く必要があります。詳細については、「オープンソースコンポーネントの一般的に使用されるポート」をご参照ください。

リファレンス

  • タスクを定期的に実行するようにスケジュールする必要がある場合は、スケジューリングサイクル、スケジューリングの依存関係、スケジューリングパラメーターなど、タスクのスケジューリング関連のプロパティを定義する必要があります。詳細については、「ノードのスケジューリング設定」をご参照ください。

  • タスクで複雑な文字列処理や算術演算が必要な場合は、DataWorks でユーザー定義関数を作成できます。詳細については、「EMR 関数の作成」をご参照ください。