すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データの同期

最終更新日:Jul 04, 2025

このチュートリアルでは、Data Integration のバッチ同期タスクを使用して、MySQL の ods_user_info_d テーブルに格納されている基本的なユーザー情報と、HttpFile の user_log.txt ファイルに格納されているユーザーの Web サイトアクセスログを非公開の Object Storage Service (OSS) バケットに同期します。次に、Spark SQL を使用して外部テーブルを作成し、非公開 OSS バケット内の同期済みデータにアクセスします。このトピックでは、データの同期方法について説明します。

目的

  1. Data Integration を使用して、MySQL データソースに格納されている基本的なユーザー情報と、HttpFile データソースに格納されているユーザーの Web サイトアクセスログを非公開の OSS バケットに同期します。基本的なユーザー情報とユーザーの Web サイトアクセスログは、DataWorks によって提供されます。

    ソースタイプ

    同期されるデータ

    ソーステーブルのスキーマ

    デスティネーションタイプ

    MySQL

    テーブル: ods_user_info_d

    基本的なユーザー情報

    • uid: ユーザー名

    • gender: 性別

    • age_range: 年齢層

    • zodiac: 星座

    OSS

    HttpFile

    ファイル: user_log.txt

    ユーザーの Web サイトアクセスログ

    ユーザーアクセスレコードは 1 行を占めます。

    $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

    OSS

  2. データが同期された後、EMR Spark SQL を使用して外部テーブルを作成し、非公開の OSS バケット内の同期済みデータにアクセスします。

ステップ 1: ワークフローを設計する

このステップでは、Data Integration ノードと E-MapReduce (EMR) Spark SQL ノードを使用してデータを取得します。データ取得は、ユーザープロファイル分析プロセスの一部です。ods_raw_log_d_2oss_spark ノードは、HttpFile データソース内のユーザーの Web サイトアクセスログを非公開の OSS バケットに同期するために使用されます。次に、ods_raw_log_d_spark ノードを使用して、単純な外部テーブルを作成し、非公開 OSS バケット内の同期済みデータにアクセスします。ods_user_info_d_2oss_spark ノードは、MySQL データソースの基本的なユーザー情報を非公開 OSS バケットに同期するために使用されます。次に、ods_user_info_d_spark ノードを使用して、外部テーブルを作成し、非公開 OSS バケット内の同期済みデータにアクセスします。

DataWorks コンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

ワークフローを設計する

  1. ワークフローを作成します。

    開発コンポーネントは、ワークフローに基づいてデータを開発するために使用されます。ノードを作成する前に、ワークフローを作成する必要があります。詳細については、「ワークフローを作成する」をご参照ください。

    この例では、User profile analysis_Spark というワークフローが使用されます。

  2. ワークフローを設計します。

    ワークフローを作成すると、ワークフローキャンバスが自動的に表示されます。ワークフローキャンバスの上部で、[ノードを作成] をクリックし、ノードをワークフローキャンバスにドラッグし、線を描画して、ワークフロー設計 に基づいてデータ同期のノード間の依存関係を設定します。

    image

  3. この例では、ゼロロードノードと同期ノードの間にリネージは存在しません。この場合、ノード間の依存関係は、ワークフローに線を描画することで設定されます。依存関係の設定方法の詳細については、「スケジューリング依存関係設定ガイド」をご参照ください。次の表は、ノードタイプ、ノード名、および各ノードの機能を示しています。

    ノードカテゴリ

    ノードタイプ

    ノード名

    ノード機能

    一般

    imageゼロロードノード

    workshop_start_spark

    ユーザープロファイル分析のワークフロー全体を管理するために使用されます。たとえば、ゼロロードノードは、ワークフローの実行開始時間を決定します。ワークスペース内のワークフローが複雑な場合、ゼロロードノードを使用すると、ワークフロー内のデータフローのパスがより明確になります。このノードはドライランノードです。ノードのコードを編集する必要はありません。

    Data Integration

    imageバッチ同期

    ods_raw_log_d_2oss_spark

    HttpFile データソースに格納されているユーザーの Web サイトアクセスログを非公開の OSS バケットに同期するために使用されます。同期されたデータは、後で Spark SQL を使用して取得できます。

    Data Integration

    imageバッチ同期

    ods_user_info_d_2oss_spark

    MySQL データソースに格納されている基本的なユーザー情報を非公開の OSS バケットに同期するために使用されます。同期されたデータは、後で Spark SQL を使用して取得できます。

    EMR

    imageEMR Spark SQL

    ods_raw_log_d_spark

    ods_raw_log_d_spark 外部テーブルを作成して、非公開 OSS バケット内の同期されたユーザーの Web サイトアクセスログにアクセスするために使用されます。

    EMR

    imageEMR Spark SQL

    ods_user_info_d_spark

    ods_user_info_d_spark 外部テーブルを作成して、非公開 OSS バケット内の同期された基本的なユーザー情報にアクセスするために使用されます。

スケジューリングロジックを設定する

この例では、workshop_start_spark ゼロロードノードを使用して、毎日 00:30 にワークフローの実行をトリガーします。次の表は、ゼロロードノードのスケジューリングプロパティの設定を示しています。他のノードのスケジューリング設定を変更する必要はありません。実装ロジックについては、「さまざまなシナリオでワークフロー内のノードのスケジューリング時間を設定する」をご参照ください。その他のスケジューリング設定については、「概要」をご参照ください。

設定項目

スクリーンショット

説明

スケジューリング時間

image

ゼロロードノードのスケジューリング時間は 00:30 に設定されています。ゼロロードノードは、毎日 00:30 に現在のワークフローの実行をトリガーします。

スケジューリングの依存関係

image

workshop_start_spark ゼロロードノードには祖先ノードがありません。この場合、[ワークスペースのルートノード] に依存するようにゼロロードノードを設定できます。ルートノードは、workshop_start_spark ゼロロードノードの実行をトリガーします。

説明

DataWorks ワークフロー内のすべてのノードは、別のノードに依存しています。データ同期フェーズのすべてのノードは、workshop_start_spark ゼロロードノードに依存しています。したがって、workshop_start_spark ノードは、データ同期ワークフローの実行をトリガーします。

ステップ 2: データ同期タスクを設定する

ワークフローを設定した後、ods_user_info_d_2oss_spark ノードと ods_raw_log_d_2oss_spark ノードをダブルクリックできます。ノードの設定タブで、基本的なユーザー情報とユーザーの Web サイトアクセスログを非公開の OSS バケットに同期するためのパラメーターを設定します。次に、ods_raw_log_d_spark ノードと ods_user_info_d_spark ノードに Spark SQL コードを記述して、外部テーブルを作成し、非公開 OSS バケット内の同期済みデータにアクセスします。

基本的なユーザー情報とユーザーの Web サイトアクセスログを非公開 OSS バケットに同期する

Data Integration を使用して、DataWorks によって提供される基本的なユーザー情報とユーザーの Web サイトアクセスログを、非公開 OSS [バケット] のディレクトリに同期します。

ログ

バッチ同期タスクを使用して、HttpFile データソースに格納されているユーザーの Web サイトアクセスログを非公開 OSS バケットに同期できます。

HttpFile データソースの Web サイトアクセスログを作成した OSS バケットに同期します。

  1. [DataStudio] ページで、ods_raw_log_d_2oss_spark ノードをダブルクリックして、ノードの設定タブに移動します。

  2. 使用するリソースグループとデータソースの間にネットワーク接続を確立します。

    ネットワーク接続とリソースの設定が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。

    パラメーター

    説明

    ソース

    • ソース: 値を HttpFile に設定します。

    • データソース名: 値を user_behavior_analysis_httpfile に設定します。

    [リソースグループ]

    購入した serverless リソースグループ を選択します。

    [デスティネーション]

    • デスティネーション: 値を OSS に設定します。

    • データソース名: 追加した非公開 OSS データソースの名前を指定する test_g に設定します。

  3. データ同期ノードを設定します。

    パラメーター

    説明

    ソース

    • [ファイルパス]: 値を /user_log.txt に設定します。

    • [テキストタイプ]: 値を text に設定します。

    • [列区切り文字]: | を入力します。

    • [圧縮形式]: OSS オブジェクトの圧縮形式。有効な値: なし、Gzip、Bzip2、および Zip。なし を選択します。

    • [ヘッダーをスキップ]: 値を いいえ に設定します。

    [デスティネーション]

    • [テキストタイプ]: 値を text に設定します。

    • [オブジェクト名 (パスを含む)]: OSS オブジェクトのパス。OSS バケットに作成したフォルダーに基づいてこのパラメーターを設定します。この例では、ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt と入力します。ods_raw_log_d は、OSS バケットに作成したフォルダーの名前です。$bizdate は前日の日付を示します。

    • [列区切り文字]: | を入力します。

  4. スケジューリングプロパティを設定します。

    ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、ノードのスケジューリングプロパティと基本情報を設定します。次の表は、設定を示しています。

    説明

    DataWorks は、スケジューリングシナリオで日次データを OSS バケットのさまざまなディレクトリとオブジェクトに書き込むために使用できるスケジューリングパラメーターを提供します。ディレクトリとオブジェクトは、データタイムスタンプに基づいて名前が付けられます。

    ビジネスシナリオに基づいて、非公開 OSS バケットの [オブジェクト名 (パスを含む)] パラメーターを設定するときに、ディレクトリに ${変数名} 形式で変数を設定できます。次に、ノードの設定タブの [プロパティ] タブで、変数にスケジューリングパラメーターを値として割り当てることができます。このようにして、非公開 OSS バケット内のディレクトリ名とオブジェクト名を、スケジューリングシナリオで動的に生成できます。

    セクション

    説明

    スクリーンショット

    [スケジューリングパラメーター]

    このセクションで、[パラメーターを追加] をクリックしてスケジューリングパラメーターを追加します。

    • パラメーター名を bizdate に設定します。

    • パラメーター値を $[yyyymmdd-1] に設定します。

    詳細については、「スケジューリングパラメーターを設定する」をご参照ください。

    image

    [依存関係]

    このセクションで、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係を設定する」をご参照ください。

    image

  5. 設定が完了したら、ツールバーの 保存 アイコンをクリックします。

ユーザーデータ

バッチ同期タスクを使用して、MySQL データソースに格納されている基本的なユーザー情報を非公開 OSS バケットに同期できます。

  1. [DataStudio] ページで、ods_user_info_d_2oss_spark ノードをダブルクリックして、ノードの設定タブに移動します。

  2. 使用するリソースグループとデータソースの間にネットワーク接続を確立します。

    ネットワーク接続とリソースの設定が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。

    パラメーター

    説明

    [ソース]

    • ソース: 値を MySQL に設定します。

    • データソース名: 値を user_behavior_analysis_mysql に設定します。

    [リソースグループ]

    購入した serverless リソースグループ を選択します。

    [デスティネーション]

    • デスティネーション: 値を OSS に設定します。

    • データソース名: 追加した非公開 OSS データソースの名前である test_g に設定します。

  3. データ同期ノードを設定します。

    パラメーター

    説明

    [ソース]

    • [テーブル]: データソースの ods_user_info_d テーブルを選択します。

    • [分割キー]: 読み取るデータの分割キー。プライマリキーまたはインデックス付き列を分割キーとして使用することをお勧めします。INTEGER タイプのフィールドのみがサポートされています。この例では、uid が使用されます。

    [デスティネーション]

    • [テキストタイプ]: 値を text に設定します。

    • [オブジェクト名 (パスを含む)]: OSS オブジェクトのパス。OSS バケットに作成したフォルダーに基づいてこのパラメーターを設定します。この例では、ods_user_info_d/user_${bizdate}/user_${bizdate}.txt と入力します。ods_user_info_d は、OSS バケットに作成したフォルダーの名前です。$bizdate は前日の日付を示します。

    • [列区切り文字]: | を入力します。

  4. スケジューリングプロパティを設定します。

    ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、ノードのスケジューリングプロパティと基本情報を設定します。次の表は、設定を示しています。

    説明

    DataWorks は、スケジューリングシナリオで日次データを OSS バケットのさまざまなディレクトリとオブジェクトに書き込むために使用できるスケジューリングパラメーターを提供します。ディレクトリとオブジェクトは、データタイムスタンプに基づいて名前が付けられます。

    ビジネスシナリオに基づいて、非公開 OSS バケットの [オブジェクト名 (パスを含む)] パラメーターを設定するときに、ディレクトリに ${変数名} 形式で変数を設定できます。次に、ノードの設定タブの [プロパティ] タブで、変数にスケジューリングパラメーターを値として割り当てることができます。このようにして、非公開 OSS バケット内のディレクトリ名とオブジェクト名を、スケジューリングシナリオで動的に生成できます。

    セクション

    説明

    スクリーンショット

    [スケジューリングパラメーター]

    このセクションで、[パラメーターを追加] をクリックしてスケジューリングパラメーターを追加します。

    • パラメーター名を bizdate に設定します。

    • パラメーター値を $[yyyymmdd-1] に設定します。

    詳細については、「スケジューリングパラメーターを設定する」をご参照ください。

    image

    [依存関係]

    このセクションで、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係を設定する」をご参照ください。

    image

  5. 設定が完了したら、ツールバーの 保存 アイコンをクリックします。

Spark SQL を使用して外部テーブルを作成し、OSS データをロードする

バッチ同期タスクを使用してデータが非公開 OSS バケットに同期された後、生成された OSS オブジェクトに基づいて Spark SQL の CREATE 文を実行して、ods_raw_log_d_spark 外部テーブルと ods_user_info_d_spark 外部テーブルを作成できます。次に、LOCATION を使用して、非公開 OSS バケット内の同期された基本的なユーザー情報とユーザーの Web サイトアクセスログにアクセスし、後続のデータ処理を行います。

ods_raw_log_d_spark ノードを設定する

EMR Spark SQL ノードを使用して作成された ods_raw_log_d_spark 外部テーブルに基づいて、LOCATION を使用して、関連するバッチ同期タスクによって 非公開 OSS [バケット] に同期されたユーザーの Web サイトアクセスログにアクセスできます。

  1. コードを記述します。

    -- シナリオ: 次のサンプルコードでは、Spark SQL の SQL 文が使用されています。コードでは、ods_raw_log_d_spark 外部テーブルが EMR Spark SQL ノードによって作成され、LOCATION を使用して、関連するバッチ同期タスクによって非公開 OSS バケットに同期されたユーザーの Web サイトアクセスログにアクセスします。
    -- 注:
    --      DataWorks は、スケジューリングシナリオで日次の増分データをデスティネーションテーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。
    --      実際の開発シナリオでは、ノードコードに変数を ${変数名} 形式で定義できます。次に、ノードの設定タブの [プロパティ] タブで、変数にスケジューリングパラメーターを値として割り当てることができます。このようにして、スケジューリングシナリオのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
    (
      `col` STRING
    ) 
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
    
    ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'
    ;
    説明

    上記のコードでは、LOCATION にサンプルパスが使用されています。パスは、関連するバッチ同期ノードを設定するときの [オブジェクト名 (パスを含む)] パラメーターの値に基づいています。LOCATION パラメーターは、作成されたフォルダーのパスに設定する必要があります。dw-emr-demo は、環境を準備したときに作成した OSS バケットのドメイン名です。

  2. スケジューリングプロパティを設定します。

    ods_raw_log_d_spark ノードのスケジューリングパラメーターを設定して、特定のデータタイムスタンプを持つ非公開 OSS ログオブジェクトを取得し、Spark テーブル内の同じデータタイムスタンプを持つ対応するパーティションにデータを書き込みます。

    セクション

    説明

    スクリーンショット

    [スケジューリングパラメーター]

    このセクションで、[パラメーターを追加] をクリックしてスケジューリングパラメーターを追加します。

    パラメーター名を bizdate に設定します。

    パラメーター値を $[yyyymmdd-1] に設定します。詳細については、「スケジューリングパラメーターを設定する」をご参照ください。

    image

    [依存関係]

    このセクションで、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係を設定する」をご参照ください。

    image

    説明

    このトピックの SQL 文では、${bizdate} スケジューリングパラメーターが設定され、値 T-1 がスケジューリングパラメーターに割り当てられています。バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、多くの場合、データタイムスタンプと呼ばれます。たとえば、前日の売上高の統計データを当日に収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。

  3. 設定が完了したら、image アイコンをクリックします。

ods_user_info_d_spark ノードを設定する

EMR Spark SQL ノードを使用して作成された ods_user_info_d_spark 外部テーブルに基づいて、LOCATION を使用して、関連するバッチ同期タスクによって 非公開 OSS [バケット] に同期された基本的なユーザー情報にアクセスできます。

  1. コードを設定します。

    -- シナリオ: 次のサンプルコードでは、Spark SQL の SQL 文が使用されています。コードでは、ods_user_info_d_spark 外部テーブルが EMR Spark SQL ノードを使用して作成され、LOCATION を使用して、関連するバッチ同期タスクによって非公開 OSS バケットに同期された基本的なユーザー情報にアクセスします。次に、データは dt パーティションに書き込まれます。
    -- 注:
    --      DataWorks は、スケジューリングシナリオで日次の増分データをデスティネーションテーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。
    --      実際の開発シナリオでは、ノードコードに変数を ${変数名} 形式で定義できます。次に、ノードの設定タブの [プロパティ] タブで、変数にスケジューリングパラメーターを値として割り当てることができます。このようにして、スケジューリングシナリオのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
    (
        `uid`        STRING COMMENT 'ユーザー ID'
        ,`gender`    STRING COMMENT '性別'
        ,`age_range` STRING COMMENT '年齢層'
        ,`zodiac`    STRING COMMENT '星座'
    )
    PARTITIONED BY 
    (
        dt           STRING
    )
    ROW FORMAT DELIMITED 
    FIELDS
    TERMINATED
    BY'|'
    STORED AS TEXTFILE
    LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    
    ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    説明

    上記のコードでは、LOCATION にサンプルパスが使用されています。パスは、関連するバッチ同期ノードを設定するときの [オブジェクト名 (パスを含む)] パラメーターの値に基づいています。LOCATION パラメーターは、作成されたフォルダーのパスに設定する必要があります。dw-emr-demo は、環境を準備したときに作成した OSS バケットのドメイン名です。

  2. スケジューリングプロパティを設定します。

    ods_user_info_d_spark ノードのスケジューリングパラメーターを設定して、特定のデータタイムスタンプを持つ非公開 OSS ユーザー情報オブジェクトを取得し、Spark テーブル内の同じデータタイムスタンプを持つ対応するパーティションにデータを書き込みます。

    セクション

    説明

    スクリーンショット

    [スケジューリングパラメーター]

    このセクションで、[パラメーターを追加] をクリックしてスケジューリングパラメーターを追加します。

    パラメーター名を bizdate に設定します。

    パラメーター値を $[yyyymmdd-1] に設定します。詳細については、「スケジューリングパラメーターを設定する」をご参照ください。

    image

    [依存関係]

    このセクションで、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係を設定する」をご参照ください。

    image

  3. 設定が完了したら、image アイコンをクリックします。

ステップ 3: 同期されたデータを確認する

このトピックのすべてのノードが正常に実行された後、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。[アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL 文を記述して、EMR Spark SQL ノードに基づいて外部テーブルが期待どおりに作成されたかどうかを確認します。

-- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。たとえば、タスクが 2024 年 8 月 8 日に実行された場合、データタイムスタンプは 20240807 であり、タスクの実行日よりも 1 日前になります。
SELECT  * FROM  ods_raw_log_d_spark  WHERE dt ='データタイムスタンプ';-- ods_raw_log_d_spark テーブルのデータをクエリします。
SELECT  * FROM  ods_user_info_d_spark   WHERE dt ='データタイムスタンプ';-- ods_user_info_d_spark テーブルのデータをクエリします。
説明

同期されたデータを確認するために使用される SQL 文では、WHERE 条件を "dt = ${bizdate}" に置き換えます。アドホッククエリタスクで、image ([パラメーター付きで実行]) アイコンをクリックし、SQL プレースホルダー ${bizdate} に値を割り当てて、アドホッククエリタスクを実行します。

次のステップ

データ同期は完了です。次のチュートリアルに進むことができます。次のチュートリアルでは、Spark で基本的なユーザー情報とユーザーの Web サイトアクセスログを処理する方法について学習します。詳細については、「データを処理する」をご参照ください。