すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データの同期

最終更新日:Mar 25, 2025

このトピックでは、このトピックのチュートリアルで提供されている基本的なユーザー情報と Web サイトアクセスログにアクセスするために、HttpFile データソースと MySQL データソースを追加する方法について説明します。 また、データ同期タスクを構成してデータを非公開の Object Storage Service (OSS) データソースに同期し、E-MapReduce (EMR) Hive ノードを使用してテーブルを作成し、同期されたデータに対してクエリを実行する方法についても説明します。

前提条件

  • 環境が準備されていること。 詳細については、「環境を準備する」をご参照ください。

  • ネットワーク接続を確保するために、Elastic Compute Service (ECS) コンソールでセキュリティグループルールが追加されていること。 ECS インスタンスのポート 10000 を使用して DataWorks に接続し、承認オブジェクトをリソースグループが関連付けられている vSwitch の CIDR ブロックに設定できます。 詳細については、「セキュリティグループルールの追加」をご参照ください。

手順 1:データソースを追加する

後続の操作を実行するには、DataWorks ワークスペースに HttpFile データソース、MySQL データソース、および OSS データソースを追加する必要があります。

  • HttpFile データソースは、ユーザーの Web サイトアクセスログにアクセスするために使用されます。

  • MySQL データソースは、基本的なユーザー情報にアクセスするために使用されます。

  • OSS データソースは、HttpFile データソースと MySQL データソースから取得したテストデータを保存するために使用されます。

HttpFile データソースを追加する

  1. データソースページに移動します。

    1. DataWorks コンソールにログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、[詳細] > [管理センター] を選択します。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。

    2. SettingCenter ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。

  2. データソースページの左上隅にある [データソースの追加] をクリックします。 [データソースの追加] ダイアログボックスで、[httpfile] をクリックします。

  3. [httpfile データソースの追加] ページで、パラメータを構成します。 このチュートリアルでは、開発環境と本番環境でサンプル値が使用されます。

    パラメータ

    説明

    データソース名

    データソースの名前。 この例では、user_behavior_analysis_httpfile が使用されます。

    データソースの説明

    データソースの説明。 データソースは DataWorks のユースケース専用に提供され、提供されたテストデータにアクセスするためのバッチ同期タスクのソースとして使用されます。 データソースは、データ同期のシナリオでのデータ読み取り専用です。

    URL

    開発環境と本番環境の URL フィールドに https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com と入力します。

  4. 目的のリソースグループを見つけ、[接続ステータス(開発環境)] 列と [接続ステータス(本番環境)] 列で [ネットワーク接続のテスト] をそれぞれクリックします。 ネットワーク接続テストが成功すると、対応する列に [接続済み] と表示されます。

    重要

    少なくとも 1 つのリソースグループが [接続可能] であることを確認してください。 そうでない場合、コードレス ユーザーインターフェース (UI) を使用してデータソースのデータ同期タスクを構成することはできません。

  5. [作成完了] をクリックします。

MySQL データソースを追加する

  1. [settingcenter] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。 データソースページの左上隅にある [データソースの追加] をクリックします。

  2. データソースを追加[mysql] を選択します。

  3. [mysql データソースの追加] ページで、パラメータを構成します。 次の表にパラメータを示します。 この例では、開発環境と本番環境でサンプル値が使用されます。

    パラメータ

    説明

    データソース名

    データソースの名前。 この例では、user_behavior_analysis_mysql が使用されます。

    データソースの説明

    データソースの説明。 データソースは DataWorks のユースケース専用に提供され、提供されたテストデータにアクセスするためのバッチ同期タスクのソースとして使用されます。 データソースは、データ同期のシナリオでのデータ読み取り専用です。

    構成モード

    [接続文字列モード] を選択します。

    接続アドレス

    • ホスト IP アドレス:rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com と入力します。

    • ポート番号:3306 と入力します。

    データベース名

    データベースの名前。 この例では、workshop が使用されます。

    ユーザー名

    ユーザー名。 この例では、workshop が使用されます。

    パスワード

    パスワード。 この例では、workshop#2017 が使用されます。

    認証方法

    認証なしを選択します。

  4. 目的のリソースグループを見つけ、[接続ステータス(開発環境)] 列と [接続ステータス(本番環境)] 列で [ネットワーク接続のテスト] をそれぞれクリックします。 ネットワーク接続テストが成功すると、対応する列に [接続済み] と表示されます。

  5. [作成完了] をクリックします。

OSS データソースを追加する

この例では、MySQL データソースの基本的なユーザー情報と HttpFile データソースの Web サイトアクセスログが、非公開の OSS データソースに同期されます。

  1. [settingcenter] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。 データソースページの左上隅にある [データソースの追加] をクリックします。

  2. [データソースの追加] ダイアログボックスで、[OSS] を選択します。

  3. [OSS データソースの追加] ページで、パラメータを構成します。 この例では、開発環境と本番環境でサンプル値が使用されます。

    パラメータ

    説明

    データソース名

    データソースの名前。 この例では、test_g が使用されます。

    データソースの説明

    データソースの説明。

    アクセスモード

    [accesskey モード] を選択します。

    AccessKey ID

    DataWorks にログインするために使用されるアカウントの AccessKey ID。 AccessKey ページに移動して、AccessKey ID をコピーできます。

    Accesskey シークレット

    DataWorks にログインするために使用されるアカウントの AccessKey シークレット。

    重要

    AccessKey シークレットは、作成時にのみ表示されます。 作成後は AccessKey シークレットを表示できません。 機密として保管してください。 AccessKey ペアが漏洩または紛失した場合は、AccessKey ペアを削除し、「新しい AccessKey ペアを作成する」をご参照ください。

    エンドポイント

    OSS のエンドポイント。 この例では、http://oss-cn-shanghai-internal.aliyuncs.com が使用されます。

    バケット

    環境を準備したときに作成した OSS バケットの名前。 この例では、バケット名は dw-emr-demo です。

  4. 目的のリソースグループを見つけ、[接続ステータス(開発環境)] 列と [接続ステータス(本番環境)] 列で [ネットワーク接続のテスト] をそれぞれクリックします。 ネットワーク接続テストが成功すると、対応する列に [接続済み] と表示されます。

    説明

    少なくとも 1 つのリソースグループが [接続可能] であることを確認してください。 そうでない場合、コードレス UI を使用してデータソースのデータ同期タスクを構成することはできません。

  5. [作成完了] をクリックします。

手順 2:データ同期タスクを構成する

  1. データソースページで、左上隅にある 图标 アイコンをクリックし、[すべてのプロダクト] > [データ開発とタスク操作] > [datastudio] を選択します。

  2. [スケジュールされたワークフロー] ウィンドウで、[ビジネスフロー] を右クリックし、[ワークフローの作成] を選択します。

  3. [ワークフローの作成] ダイアログボックスで、[ワークフロー名] パラメータを workshop_emr に設定し、[作成] をクリックします。

  4. 新しいワークフローをダブルクリックしてワークフロー構成タブに移動し、ゼロロードノードと 2 つのバッチ同期タスクを作成します。

    1. [ノードの作成] をクリックし、[全般] セクションの [ゼロロードノード] を右側のキャンバスにドラッグします。 [ノードの作成] ダイアログボックスで、[名前] パラメータを workshop_start_emr に設定し、[確認] をクリックします。

    2. [ノードの作成] をクリックし、[データ統合] セクションの [オフライン同期] を右側のキャンバスにドラッグします。 ods_raw_log_d_2oss_emrods_user_info_d_2oss_emr という名前の 2 つの[バッチ同期ノード] を同じ方法で作成します。 これら 2 つのノードは、MySQL の基本的なユーザー情報と OSS の Web サイトアクセスログを同期するために使用されます。 次に、[確認] をクリックします。

  5. ワークフロー構成タブで、有向線をドラッグして、workshop_start_emr ノードを 2 つのバッチ同期ノードの先祖ノードとして構成します。

    image

手順 3:データ同期ノードを構成する

ワークフローの初期ノードを構成する

  1. [スケジュールされたワークフロー] ウィンドウで、ワークフローのゼロロードノードをダブルクリックします。 ノード構成タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。

  2. スケジューリングプロパティを構成します。

    セクション

    スクリーンショット

    説明

    スケジュール

    image

    • ゼロロードノードのスケジュール時刻は 00:30 に設定されています。 ゼロロードノードは、毎日 00:30 に現在のワークフローの実行をトリガーします。

    • [再実行] パラメータを [実行ステータスに関係なく許可] に設定します。

    スケジューリングの依存関係

    image

    ゼロロードノード workshop_start_emr には先祖ノードがありません。 この場合、ゼロロードノードを [ワークスペースのルートノード] の子孫ノードとして構成できます。 ルートノードを使用して、ゼロロードノード workshop_start_emr の実行をトリガーできます。

    ワークスペースのルートノードは、ワークスペース名_root の形式で名前が付けられます。

  3. 構成が完了したら、左上隅にある 保存 アイコンをクリックします。

バッチ同期ノードを構成する

  1. MySQL データソースの基本的なユーザー情報を、作成した OSS バケットに同期します。

    1. [datastudio] ページで、ods_user_info_d_2oss_emr ノードをダブルクリックして、ノード構成ページに移動します。

    2. 使用するリソースグループとデータソース間にネットワーク接続を確立します。

      ネットワーク接続とリソースの構成が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。

      パラメータ

      説明

      ソース

      • ソース:値を MySQL に設定します。

      • データソース名:値を user_behavior_analysis_mysql に設定します。

      リソースグループ

      購入したサーバーレス リソースグループを選択します。

      接続先

      • 接続先:値を OSS に設定します。

      • データソース名:追加した非公開 OSS データソースの名前を指定する test_g に設定します。

    3. データ同期ノードを構成します。

      パラメータ

      説明

      ソース

      • [テーブル]:データソースの ods_user_info_d テーブルを選択します。

      • [分割キー]:読み取るデータの分割キー。 プライマリキーまたはインデックス付き列を分割キーとして使用することをお勧めします。 INTEGER 型のフィールドのみがサポートされています。 この例では、uid が使用されます。

      接続先

      • [テキストタイプ]:値を text に設定します。

      • [オブジェクト名(パスを含む)]:OSS オブジェクトのパス。 OSS バケットに作成したフォルダに基づいて、このパラメータを構成します。 この例では、ods_user_info_d/user_${bizdate}/user_${bizdate}.txt と入力します。 ods_user_info_d は、OSS バケットに作成したフォルダの名前です。 $bizdate は前日の日付を示します。

      • [列区切り文字]| と入力します。

    4. スケジューリングプロパティを構成します。

      ノード構成タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、スケジューリングのプロパティとノードに関する基本情報を構成できます。次の表に、スケジューリングパラメーターについて説明します。

      セクション

      説明

      スクリーンショット

      スケジューリングパラメータ

      [スケジューリングパラメータ] セクションの [パラメータの追加] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。

      • パラメータ名に bizdate を設定します。

      • パラメータ値に $[yyyymmdd-1] を設定します。

      image

      スケジュール

      [再実行] パラメータを [実行ステータスに関係なく許可] に設定します。

      image

      依存関係

      生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認します。

      出力テーブルは、ワークスペース名.ノード名 の形式で名前が付けられます。

      image

    5. 構成が完了したら、ツールバーの 保存 アイコンをクリックします。

  1. HttpFile データソースの Web サイトアクセスログを、作成した OSS バケットに同期します。

    1. [datastudio] ページで、ods_raw_log_d_2oss_emr ノードをダブルクリックして、ノード構成ページに移動します。

    2. 使用するリソースグループとデータソース間にネットワーク接続を確立します。

      ネットワーク接続とリソースの構成が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。

      パラメータ

      説明

      ソース

      • ソース:値を HttpFile に設定します。

      • データソース名:値を user_behavior_analysis_httpfile に設定します。

      リソースグループ

      購入したサーバーレス リソースグループを選択します。

      接続先

      • 接続先:値を OSS に設定します。

      • データソース名:追加した非公開 OSS データソースの名前を指定する test_g に設定します。

    3. データ同期ノードを構成します。

      パラメータ

      説明

      ソース

      • [ファイルパス]:値を /user_log.txt に設定します。

      • [テキストタイプ]:値を text に設定します。

      • [列区切り文字]| と入力します。

      • [圧縮形式]:OSS オブジェクトの圧縮形式。 有効な値:なし、Gzip、Bzip2、および Zip。 なし を選択します。

      • [ヘッダーをスキップ]:値をいいえに設定します。

      接続先

      • [テキストタイプ]:値を text に設定します。

      • [オブジェクト名(パスを含む)]:OSS オブジェクトのパス。 OSS バケットに作成したフォルダに基づいて、このパラメータを構成します。 この例では、ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt と入力します。 ods_raw_log_d は、OSS バケットに作成したフォルダの名前です。 $bizdate は前日の日付を示します。

      • [列区切り文字]| と入力します。

    4. スケジューリングプロパティを構成します。

      ノード構成タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。 [プロパティ] タブで、スケジューリングプロパティとノードに関する基本情報を構成できます。 次の表に、スケジューリングパラメータを示します。

      セクション

      説明

      スクリーンショット

      スケジューリングパラメータ

      [スケジューリングパラメータ] セクションの [パラメータの追加] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。

      • パラメータ名に bizdate を設定します。

      • パラメータ値に $[yyyymmdd-1] を設定します。

      image

      スケジュール

      [再実行] パラメータを [実行ステータスに関係なく許可] に設定します。

      image

      依存関係

      生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認します。

      出力テーブルは、ワークスペース名.ノード名 の形式で名前が付けられます。

      image

    5. 構成が完了したら、ツールバーの 保存 アイコンをクリックします。

EMR テーブルを作成してデータを同期する

ods_user_info_d_emrods_raw_log_d_emr の 2 つの EMR テーブルを作成する必要があります。 次に、これらのテーブルを使用して、ApsaraDB RDS for MySQL の同期された基本的なユーザー情報と OSS の Web サイトアクセスログをクエリできます。

  1. [スケジュールされたワークフロー] ウィンドウで、新しいワークフローをクリックし、[EMR] を右クリックして、[ノードの作成] > [EMR Hive] を選択します。

  2. [ノードの作成] ダイアログボックスで、[名前] パラメータを構成し、[確認] をクリックします。

    関連するテーブルを作成するために使用される ods_user_info_d_emrods_raw_log_d_emr の 2 つの EMR Hive ノードを作成する必要があります。 次に、ワークフロー構成タブで線を描画することにより、ノード間の依存関係を構成する必要があります。 次の図は例を示しています。

    image

  3. 各 EMR Hive ノードの構成タブで、CREATE TABLE 文を入力し、[プロパティ] タブの [リソースグループ] セクションの [リソースグループ] パラメータにサーバーレス リソースグループを選択します。 次に、[保存] をクリックし、各ノードの CREATE TABLE 文を [実行] します。

    • ods_user_info_d_emr テーブルを作成します。

      ods_user_info_d_emr ノードをダブルクリックします。 表示されるノード構成タブで、ods_user_info_d_emr ノードを構成します。

      1. SQL コードを編集します。

        CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_emr
        (
            `uid` STRING COMMENT 'ユーザーID',
            `gender` STRING COMMENT '性別',
            `age_range` STRING COMMENT '年齢層',
            `zodiac` STRING COMMENT '星座'
        ) PARTITIONED BY (
          dt STRING
        )
        ROW FORMAT  delimited fields terminated by '|'
        LOCATION 'oss://dw-emr-demo/ods_user_info_d/';
        
        ALTER TABLE ods_user_info_d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}')
        LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';
        説明

        上記のコードでは、LOCATION にサンプルパスが使用されています。 パスは、関連するバッチ同期ノードを構成するときの [オブジェクト名(パスを含む)] パラメータの値に基づいています。 LOCATION パラメータを作成したフォルダのパスに設定する必要があります。 dw-emr-demo は、環境を準備したときに作成した OSS バケットのドメイン名です。

      2. ノードのスケジューリングプロパティを構成します

        セクション

        説明

        スクリーンショット

        スケジューリングパラメータ

        [スケジューリングパラメータ] セクションの [パラメータの追加] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。

        • パラメータ名に bizdate を設定します。

        • パラメータ値に $[yyyymmdd-1] を設定します。

        image

        スケジュール

        [再実行] パラメータを [実行ステータスに関係なく許可] に設定します。

        image

        依存関係

        生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認します。

        出力テーブルは、ワークスペース名.ノード名 の形式で名前が付けられます。

        image

      3. 構成が完了したら、image アイコンをクリックします。

    • ods_raw_log_d_emr テーブルを作成します。

      ods_raw_log_d_emr ノードをダブルクリックします。 表示されるノード構成タブで、ods_raw_log_d_emr ノードを構成します。

      1. SQL コードを編集します。

        -- Web サイトアクセスログを保存するために使用されるテーブルを作成します。
        CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_emr
        (
          `col` STRING
        ) PARTITIONED BY (
          dt STRING
        );
        ALTER TABLE ods_raw_log_d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}')
        LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
        説明

        上記のコードでは、LOCATION にサンプルパスが使用されています。 パスは、関連するバッチ同期ノードを構成するときの [オブジェクト名(パスを含む)] パラメータの値に基づいています。 LOCATION パラメータを作成したフォルダのパスに設定する必要があります。 dw-emr-demo は、環境を準備したときに作成した OSS バケットの名前です。

      2. ノードのスケジューリングプロパティを構成します

        セクション

        説明

        スクリーンショット

        スケジューリングパラメータ

        [スケジューリングパラメータ] セクションの [パラメータの追加] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を構成できます。

        • パラメータ名に bizdate を設定します。

        • パラメータ値に $[yyyymmdd-1] を設定します。

        image

        スケジュール

        [再実行] パラメータを [実行ステータスに関係なく許可] に設定します。

        image

        依存関係

        生成されたテーブルが現在のノードの出力テーブルとして使用されていることを確認します。

        出力テーブルは、ワークスペース名.ノード名 の形式で名前が付けられます。

        image

      3. 構成が完了したら、image アイコンをクリックします。

手順 4:ワークフローのノードを実行し、結果を表示する

ワークフローを実行する

  1. DataStudio ページで、[ビジネスフロー] の下の workshop_emr ワークフローをダブルクリックします。 ワークフローの構成タブで、上部ツールバーの image.png アイコンをクリックして、ノード間のスケジューリングの依存関係に基づいてワークフローのノードを実行します。

  2. ステータスを確認します。

    • ノードのステータスを表示します。

      • ノードが image.png 状態の場合、同期プロセスは正常です。

      • ノードが image 状態であり、エラーメッセージ "java.net.ConnectException: Connection timed out (Connection timed out)" が表示される場合は、ECS コンソール でセキュリティグループルールを追加し、ECS インスタンスのポート 10000 を使用して DataWorks に接続し、承認オブジェクトをリソースグループに関連付けられている vSwitch の CIDR ブロックに設定する必要があります。 vSwitch CIDR ブロックを取得するには、次の手順を実行します。[リソースグループ] ページに移動し、目的のリソースグループを見つけ、[アクション] 列の [ネットワーク設定] をクリックします。 表示されるページの [VPC バインディング] タブで、vSwitch CIDR ブロックを取得します。 セキュリティグループルールの追加方法の詳細については、「セキュリティグループルールを追加する」をご参照ください。

    • ノード実行ログの表示: [ods_user_info_d_emr] ノードまたは [ods_raw_log_d_emr] ノードを右クリックし、[ログの表示] を選択します。次の図に示す情報がログに表示されている場合、ノードは実行中で、データは同期されています。

      image.png

同期の結果をクエリする

  1. アドホッククエリを作成します。

    [datastudio] ページの左側のナビゲーションウィンドウで、image.png アイコンをクリックします。 [Ad Hoc Query] ペインで、[Ad Hoc Query] を右クリックし、[ノードの作成] > [EMR Hive] を選択します。

  2. 同期結果テーブルをクエリします。

    次の SQL 文を実行して、データ書き込み結果を確認します。 ods_raw_log_d_emr テーブルと ods_user_info_d_emr テーブルにインポートされたレコードの数を確認します。

    -- クエリ文では、パーティションキーの値をノードのデータタイムスタンプに変更します。 たとえば、ノードが 2019 年 11 月 7 日に実行された場合、データタイムスタンプは 20191106 で、これはノードが実行される 1 日前です。
    SELECT * from ods_user_info_d_emr where dt=データタイムスタンプ;
    SELECT * from ods_raw_log_d_emr where dt=データタイムスタンプ;

次のステップ

このチュートリアルに基づいてデータを同期する方法を理解したら、次のチュートリアルに進むことができます。 次のチュートリアルでは、同期されたデータを計算および分析する方法について学習します。 詳細については、「データの処理」をご参照ください。