すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:データを同期する

最終更新日:Oct 21, 2025

このトピックでは、このトピックのチュートリアルで提供される基本的なユーザー情報と Web サイトのアクセスログにアクセスするために、HttpFile および MySQL データソースを追加する方法について説明します。このトピックでは、データをプライベート Object Storage Service (OSS) データソースに同期するためのデータ同期タスクを設定する方法、および E-MapReduce (EMR) Hive ノードを使用してテーブルを作成し、同期されたデータをクエリする方法についても説明します。

前提条件

  • 環境が準備されています。詳細については、「環境を準備する」をご参照ください。

  • Elastic Compute Service (ECS) コンソールでセキュリティグループルールが追加され、ネットワーク接続が確保されています。ECS インスタンスのポート 10000 を使用して DataWorks に接続し、[認証オブジェクト] をリソースグループが関連付けられている vSwitch の CIDR ブロックに設定できます。詳細については、「セキュリティグループルールを追加する」をご参照ください。

ステップ 1: データソースを追加する

後続の操作を実行するには、HttpFile データソース、MySQL データソース、および OSS データソースを DataWorks ワークスペースに追加する必要があります。

  • HttpFile データソースは、ユーザーの Web サイトのアクセスログにアクセスするために使用されます。

  • MySQL データソースは、基本的なユーザー情報にアクセスするために使用されます。

  • OSS データソースは、HttpFile および MySQL データソースから取得したテストデータを保存するために使用されます。

HttpFile データソースを追加する

  1. [データソース] ページに移動します。

    1. DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[その他] > [管理センター] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ] をクリックします。

    2. [設定センター] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。

  2. [データソース] ページの左上隅にある [データソースの追加] をクリックします。[データソースの追加] ダイアログボックスで、[HttpFile] をクリックします。

  3. [HttpFile データソースの追加] ページで、パラメーターを設定します。このチュートリアルでは、開発環境と本番稼働環境でサンプル値が使用されます。

    パラメーター

    説明

    データソース名

    データソースの名前。この例では、user_behavior_analysis_httpfile が使用されます。

    データソースの説明

    データソースの説明。データソースは DataWorks のユースケース専用に提供され、提供されたテストデータにアクセスするためのバッチ同期タスクのソースとして使用されます。データソースは、データ同期シナリオでのデータ読み取り専用です。

    URL

    開発環境と本番稼働環境の URL フィールドに https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com を入力します

  4. 目的のリソースグループを見つけ、[接続ステータス (開発環境)][接続ステータス (本番稼働環境)] 列で [ネットワーク接続のテスト] をそれぞれクリックします。ネットワーク接続テストが成功すると、対応する列に [接続済み] が表示されます。

    重要

    少なくとも 1 つのリソースグループが [接続可能] であることを確認してください。そうでない場合、コードレス UI を使用してデータソースのデータ同期タスクを設定することはできません。

  5. [作成の完了] をクリックします。

MySQL データソースを追加する

  1. [設定センター] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。[データソース] ページの左上隅にある [データソースの追加] をクリックします。

  2. [データソースの追加] ダイアログボックスで、[MySQL] を選択します。

  3. [MySQL データソースの追加] ページで、パラメーターを設定します。次の表にパラメーターを示します。この例では、開発環境と本番稼働環境でサンプル値が使用されます。

    パラメーター

    説明

    データソース名

    データソースの名前。この例では、user_behavior_analysis_mysql が使用されます。

    データソースの説明

    データソースの説明。データソースは DataWorks のユースケース専用に提供され、提供されたテストデータにアクセスするためのバッチ同期タスクのソースとして使用されます。データソースは、データ同期シナリオでのデータ読み取り専用です。

    設定モード

    [接続文字列モード] を選択します。

    接続アドレス

    • ホスト IP アドレス: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com を入力します。

    • ポート番号: 3306 を入力します。

    データベース名

    データベースの名前。この例では、workshop が使用されます。

    ユーザー名

    ユーザー名。この例では、workshop が使用されます。

    パスワード

    パスワード。この例では、workshop#2017 が使用されます。

    認証方法

    [認証なし] を選択します。

  4. 目的のリソースグループを見つけ、[接続ステータス (開発環境)][接続ステータス (本番稼働環境)] 列で [ネットワーク接続のテスト] をそれぞれクリックします。ネットワーク接続テストが成功すると、対応する列に [接続済み] が表示されます。

  5. [作成の完了] をクリックします。

OSS データソースを追加する

この例では、MySQL データソースの基本ユーザー情報と HttpFile データソースの Web サイトアクセスログがプライベート OSS データソースに同期されます。

  1. [設定センター] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。[データソース] ページの左上隅にある [データソースの追加] をクリックします。

  2. [データソースの追加] ダイアログボックスで、[OSS] を選択します。

  3. [OSS データソースの追加] ページで、パラメーターを設定します。この例では、開発環境と本番稼働環境でサンプル値が使用されます。

    パラメーター

    説明

    データソース名

    データソースの名前。この例では、test_g が使用されます。

    データソースの説明

    データソースの説明。

    アクセスモード

    [AccessKey モード] を選択します。

    AccessKey ID

    DataWorks へのログインに使用するアカウントの AccessKey ID。AccessKey ページに移動して AccessKey ID をコピーできます。

    AccessKey シークレット

    DataWorks へのログインに使用するアカウントの AccessKey シークレット。

    重要

    AccessKey シークレットは作成時にのみ表示されます。作成後に AccessKey シークレットを表示することはできません。機密を保持してください。AccessKey ペアが漏洩または紛失した場合は、AccessKey ペアを削除し、新しい AccessKey ペアを作成してください。

    エンドポイント

    OSS のエンドポイント。この例では、http://oss-cn-shanghai-internal.aliyuncs.com が使用されます。

    バケット

    環境を準備したときに作成した OSS バケットの名前。この例では、バケット名は dw-emr-demo です。

  4. 目的のリソースグループを見つけ、[接続ステータス (開発環境)][接続ステータス (本番稼働環境)] 列で [ネットワーク接続のテスト] をそれぞれクリックします。ネットワーク接続テストが成功すると、対応する列に [接続済み] が表示されます。

    説明

    少なくとも 1 つのリソースグループが [接続可能] であることを確認してください。そうでない場合、コードレス UI を使用してデータソースのデータ同期タスクを設定することはできません。

  5. [作成の完了] をクリックします。

ステップ 2: データ同期タスクを設定する

  1. [データソース] ページで、左上隅の 图标 アイコンをクリックし、[すべての製品] > [データ開発とタスク操作] > [DataStudio] を選択します。

  2. [スケジュールされたワークフロー] ペインで、[ビジネスフロー] を右クリックし、[ワークフローの作成] を選択します。

  3. [ワークフローの作成] ダイアログボックスで、[ワークフロー名] パラメーターを workshop_emr に設定し、[作成] をクリックします。

  4. 新しいワークフローをダブルクリックしてワークフロー設定タブに移動し、ゼロロードノードと 2 つのバッチ同期タスクを作成します。

    1. [ノードの作成] をクリックし、[全般] セクションの [ゼロロードノード] を右側のキャンバスにドラッグします。[ノードの作成] ダイアログボックスで、[名前] パラメーターを workshop_start_emr に設定し、[確認] をクリックします。

    2. [ノードの作成] をクリックし、[Data Integration] セクションの [オフライン同期] を右側のキャンバスにドラッグします。同じ方法で、ods_raw_log_d_2oss_emrods_user_info_d_2oss_emr という名前の 2 つの [バッチ同期ノード] を作成します。2 つのノードは、MySQL の基本ユーザー情報と OSS の Web サイトアクセスログを同期するために使用されます。次に、[確認] をクリックします。

  5. ワークフロー設定タブで、有向線をドラッグして、workshop_start_emr ノードを 2 つのバッチ同期ノードの先祖ノードとして設定します。

    image

ステップ 3: データ同期ノードを設定する

ワークフローの初期ノードを設定する

  1. [スケジュールされたワークフロー] ペインで、ワークフロー内のゼロロードノードをダブルクリックします。ノード設定タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。

  2. スケジューリングプロパティを設定します。

    セクション

    スクリーンショット

    説明

    スケジュール

    image

    • ゼロロードノードのスケジューリング時間は 00:30 に設定されます。ゼロロードノードは、毎日 00:30 に現在のワークフローを実行するようにトリガーします。

    • [再実行] パラメーターを [実行ステータスに関係なく許可] に設定します。

    スケジューリング依存関係

    image

    ゼロロードノード workshop_start_emr には先祖ノードがありません。この場合、ゼロロードノードを [ワークスペースのルートノード] の子孫ノードとして設定できます。ルートノードを使用して、ゼロロードノード workshop_start_emr を実行するようにトリガーできます。

    ワークスペースのルートノードは、ワークスペース名_root 形式で名前が付けられます。

  3. 設定が完了したら、左上隅の 保存 アイコンをクリックします。

バッチ同期ノードを設定する

  1. MySQL データソースの基本ユーザー情報を、作成した OSS バケットに同期します。

    1. [DataStudio] ページで、ods_user_info_d_2oss_emr ノードをダブルクリックしてノード設定ページに移動します。

    2. 使用するリソースグループとデータソース間のネットワーク接続を確立します。

      ネットワーク接続とリソースの設定が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。

      パラメーター

      説明

      ソース

      • ソース: 値を MySQL に設定します。

      • データソース名: 値を user_behavior_analysis_mysql に設定します。

      リソースグループ

      購入したサーバーレスリソースグループを選択します。

      宛先

      • 宛先: 値を OSS に設定します。

      • データソース名: 追加したプライベート OSS データソースの名前を指定する test_g に設定します。

    3. データ同期ノードを設定します。

      パラメーター

      説明

      ソース

      • [テーブル]: データソース内の ods_user_info_d テーブルを選択します。

      • [分割キー]: 読み取るデータの分割キー。プライマリキーまたはインデックス付き列を分割キーとして使用することをお勧めします。INTEGER 型のフィールドのみがサポートされます。この例では、uid が使用されます。

      宛先

      • [テキストタイプ]: 値を text に設定します。

      • [オブジェクト名 (パスを含む)]: OSS オブジェクトのパス。OSS バケットに作成したフォルダーに基づいてこのパラメーターを設定します。この例では、ods_user_info_d/user_${bizdate}/user_${bizdate}.txt が入力されます。ods_user_info_d は、OSS バケットに作成したフォルダーの名前です。$bizdate は前日の日付を示します。

      • [列区切り文字]: | を入力します。

    4. スケジューリングプロパティを設定します。

      ノード設定タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、スケジューリングプロパティとノードに関する基本情報を設定できます。次の表に、スケジューリングパラメーターを示します。

      セクション

      説明

      スクリーンショット

      スケジューリングパラメーター

      [スケジューリングパラメーター] セクションで [パラメーターの追加] をクリックします。テーブルに表示される行で、スケジューリングパラメーターとスケジューリングパラメーターの値を指定できます。

      • [パラメーター名] を bizdate に設定します。

      • [パラメーター値] を $[yyyymmdd-1] に設定します。

      image

      スケジュール

      [再実行] パラメーターを [実行ステータスに関係なく許可] に設定します。

      image

      依存関係

      生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認してください。

      出力テーブルは、ワークスペース名.ノード名 形式で名前が付けられます。

      image

    5. 設定が完了したら、ツールバーの 保存 アイコンをクリックします。

  1. HttpFile データソースの Web サイトアクセスログを、作成した OSS バケットに同期します。

    1. [DataStudio] ページで、ods_raw_log_d_2oss_emr ノードをダブルクリックしてノード設定ページに移動します。

    2. 使用するリソースグループとデータソース間のネットワーク接続を確立します。

      ネットワーク接続とリソースの設定が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。

      パラメーター

      説明

      ソース

      • ソース: 値を HttpFile に設定します。

      • データソース名: 値を user_behavior_analysis_httpfile に設定します。

      リソースグループ

      購入したサーバーレスリソースグループを選択します。

      宛先

      • 宛先: 値を OSS に設定します。

      • データソース名: 追加したプライベート OSS データソースの名前を指定する test_g に設定します。

    3. データ同期ノードを設定します。

      パラメーター

      説明

      ソース

      • [ファイルパス]: 値を /user_log.txt に設定します。

      • [テキストタイプ]: 値を text に設定します。

      • [列区切り文字]: | を入力します。

      • [圧縮形式]: OSS オブジェクトの圧縮形式。有効な値: None、Gzip、Bzip2、および Zip。None を選択します。

      • [ヘッダーのスキップ]: 値を [いいえ] に設定します。

      宛先

      • [テキストタイプ]: 値を text に設定します。

      • [オブジェクト名 (パスを含む)]: OSS オブジェクトのパス。OSS バケットに作成したフォルダーに基づいてこのパラメーターを設定します。この例では、ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt が入力されます。ods_raw_log_d は、OSS バケットに作成したフォルダーの名前です。$bizdate は前日の日付を示します。

      • [列区切り文字]: | を入力します。

    4. スケジューリングプロパティを設定します。

      ノード設定タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、スケジューリングプロパティとノードに関する基本情報を設定できます。次の表に、スケジューリングパラメーターを示します。

      セクション

      説明

      スクリーンショット

      スケジューリングパラメーター

      [スケジューリングパラメーター] セクションで [パラメーターの追加] をクリックします。テーブルに表示される行で、スケジューリングパラメーターとスケジューリングパラメーターの値を指定できます。

      • [パラメーター名] を bizdate に設定します。

      • [パラメーター値] を $[yyyymmdd-1] に設定します。

      image

      スケジュール

      [再実行] パラメーターを [実行ステータスに関係なく許可] に設定します。

      image

      依存関係

      生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認してください。

      出力テーブルは、ワークスペース名.ノード名 形式で名前が付けられます。

      image

    5. 設定が完了したら、ツールバーの 保存 アイコンをクリックします。

EMR テーブルを作成してデータを同期する

2 つの EMR テーブル ods_user_info_d_emrods_raw_log_d_emr を作成する必要があります。その後、テーブルを使用して、同期された ApsaraDB RDS for MySQL の基本ユーザー情報と OSS の Web サイトアクセスログをクエリできます。

  1. [スケジュールされたワークフロー] ペインで、新しいワークフローをクリックし、[EMR] を右クリックしてから、[ノードの作成] > [EMR Hive] を選択します。

  2. [ノードの作成] ダイアログボックスで、[名前] パラメーターを設定し、[確認] をクリックします。

    2 つの EMR Hive ノード ods_user_info_d_emrods_raw_log_d_emr を作成する必要があります。これらは関連テーブルの作成に使用されます。次に、ワークフロー設定タブで線を描画して、ノード間の依存関係を設定する必要があります。次の図に例を示します。

    image

  3. 各 EMR Hive ノードの設定タブで、CREATE TABLE 文を入力し、[プロパティ] タブの [リソースグループ] セクションで [リソースグループ] パラメーターにサーバーレスリソースグループを選択します。次に、[保存] をクリックし、各ノードの CREATE TABLE 文を [実行] します。

    • ods_user_info_d_emr テーブルを作成します。

      ods_user_info_d_emr ノードをダブルクリックします。表示されるノード設定タブで、ods_user_info_d_emr ノードを設定します。

      1. SQL コードを編集します。

        CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_emr
        (
            `uid` STRING COMMENT 'ユーザー ID',
            `gender` STRING COMMENT '性別',
            `age_range` STRING COMMENT '年齢範囲',
            `zodiac` STRING COMMENT '星座'
        ) PARTITIONED BY (
          dt STRING
        )
        ROW FORMAT  delimited fields terminated by '|'
        LOCATION 'oss://dw-emr-demo/ods_user_info_d/';
        
        ALTER TABLE ods_user_info_d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}')
        LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';
        説明

        上記のコードでは、LOCATION にサンプルパスが使用されています。このパスは、関連するバッチ同期ノードを設定するときの [オブジェクト名 (パスを含む)] パラメーターの値に基づいています。LOCATION パラメーターを作成したフォルダーのパスに設定する必要があります。dw-emr-demo は、環境を準備したときに作成した OSS バケットのドメイン名です。

      2. ノードのスケジューリングプロパティを設定します

        セクション

        説明

        スクリーンショット

        スケジューリングパラメーター

        [スケジューリングパラメーター] セクションで [パラメーターの追加] をクリックします。テーブルに表示される行で、スケジューリングパラメーターとスケジューリングパラメーターの値を指定できます。

        • [パラメーター名] を bizdate に設定します。

        • [パラメーター値] を $[yyyymmdd-1] に設定します。

        image

        スケジュール

        [再実行] パラメーターを [実行ステータスに関係なく許可] に設定します。

        image

        依存関係

        生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認してください。

        出力テーブルは、ワークスペース名.ノード名 形式で名前が付けられます。

        image

      3. 設定が完了したら、image アイコンをクリックします。

    • ods_raw_log_d_emr テーブルを作成します。

      ods_raw_log_d_emr ノードをダブルクリックします。表示されるノード設定タブで、ods_raw_log_d_emr ノードを設定します。

      1. SQL コードを編集します。

        -- Web サイトのアクセスログを格納するために使用されるテーブルを作成します。
        CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_emr
        (
          `col` STRING
        ) PARTITIONED BY (
          dt STRING
        );
        ALTER TABLE ods_raw_log_d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}')
        LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
        説明

        上記のコードでは、LOCATION にサンプルパスが使用されています。このパスは、関連するバッチ同期ノードを設定するときの [オブジェクト名 (パスを含む)] パラメーターの値に基づいています。LOCATION パラメーターを作成したフォルダーのパスに設定する必要があります。dw-emr-demo は、環境を準備したときに作成した OSS バケットの名前です。

      2. ノードのスケジューリングプロパティを設定します

        セクション

        説明

        スクリーンショット

        スケジューリングパラメーター

        [スケジューリングパラメーター] セクションで [パラメーターの追加] をクリックします。テーブルに表示される行で、スケジューリングパラメーターを設定し、スケジューリングパラメーターの値を指定できます。

        • [パラメーター名] を bizdate に設定します。

        • [パラメーター値] を $[yyyymmdd-1] に設定します。

        image

        スケジュール

        [再実行] パラメーターを [実行ステータスに関係なく許可] に設定します。

        image

        依存関係

        生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認してください。

        出力テーブルは、ワークスペース名.ノード名 形式で名前が付けられます。

        image

      3. 設定が完了したら、image アイコンをクリックします。

ステップ 4: ワークフロー内のノードを実行し、結果を表示する

ワークフローを実行する

  1. DataStudio ページで、[ビジネスフロー] の下の workshop_emr ワークフローをダブルクリックします。ワークフローの設定タブで、上部のツールバーにある image.png アイコンをクリックして、ノード間のスケジューリング依存関係に基づいてワークフロー内のノードを実行します。

  2. ステータスを確認します。

    • ノードのステータスを表示します。

      • ノードが image.png 状態の場合、同期プロセスは正常です。

      • ノードが image 状態でエラーメッセージ "java.net.ConnectException: Connection timed out (Connection timed out)" が表示された場合は、ECS コンソールでセキュリティグループルールを追加し、ECS インスタンスのポート 10000 を使用して DataWorks に接続し、[認証オブジェクト] をリソースグループが関連付けられている vSwitch の CIDR ブロックに設定する必要があります。次の手順を実行して vSwitch の CIDR ブロックを取得できます: [リソースグループ] ページに移動し、目的のリソースグループを見つけて、[アクション] 列の [ネットワーク設定] をクリックします。表示されるページの [VPC バインディング] タブで、vSwitch の CIDR ブロックを取得します。セキュリティグループルールの追加方法の詳細については、「セキュリティグループルールを追加する」をご参照ください。

    • ノード実行ログの表示: [ods_user_info_d_emr] または [ods_raw_log_d_emr] ノードを右クリックし、[ログの表示] を選択します。ログに次の図に示す情報が表示された場合、ノードが実行され、データが同期されます。

      image.png

同期結果のクエリ

  1. アドホッククエリを作成します。

    [DataStudio] ページの左側のナビゲーションウィンドウで、image.png アイコンをクリックします。[アドホッククエリ] ペインで、[アドホッククエリ] を右クリックし、[ノードの作成] > [EMR Hive] を選択します。

  2. 同期結果テーブルをクエリします。

    次の SQL 文を実行して、データ書き込み結果を確認します。ods_raw_log_d_emr および ods_user_info_d_emr テーブルにインポートされたレコードの数を表示します。

    -- クエリ文で、パーティションキーの値をノードのデータタイムスタンプに変更します。たとえば、ノードが 2019 年 11 月 7 日に実行された場合、データタイムスタンプは 20191106 で、これはノードが実行される 1 日前です。
    SELECT * from ods_user_info_d_emr where dt=データタイムスタンプ; 
    SELECT * from ods_raw_log_d_emr where dt=データタイムスタンプ; 

次のステップ

このチュートリアルに基づいてデータを同期する方法を理解したら、次のチュートリアルに進むことができます。次のチュートリアルでは、同期されたデータを計算および分析する方法を学びます。詳細については、「データを処理する」をご参照ください。