すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データの同期

最終更新日:Jul 04, 2025

このトピックでは、Data Integration のバッチ同期タスクを使用して、MySQL テーブル ods_user_info_d に格納されている基本的なユーザー情報と、Object Storage Service (OSS) オブジェクト user_log.txt に格納されているユーザーの Web サイトアクセスログを、それぞれ MaxCompute テーブル ods_user_info_d_odpsods_user_info_d_odps に同期します。このトピックでは、DataWorks の Data Integration サービスを使用して異種データソース間でデータを同期し、データウェアハウスの同期を完了する方法について説明します。

前提条件

  • 実験の紹介を読み、このチュートリアルについて予備知識を得ていること。実験の詳細については、「実験の紹介」をご参照ください。

  • データ同期に必要な環境が準備されていること。詳細については、「要件分析」をご参照ください。

目的

この例で提供されているパブリックデータソースのデータを MaxCompute に同期して、ワークフロー設計におけるデータ同期を完了します。

ソースタイプ

同期するデータ

ソーステーブルのスキーマ

デスティネーションタイプ

デスティネーションテーブル

デスティネーションテーブルのスキーマ

MySQL

テーブル: ods_user_info_d

基本的なユーザー情報

  • uid: ユーザー名

  • gender: 性別

  • age_range: 年齢層

  • zodiac: 星座

MaxCompute

ods_user_info_d_odps

  • uid: ユーザー名

  • gender: 性別

  • age_range: 年齢層

  • zodiac: 星座

  • dt: パーティションフィールド

HttpFile

オブジェクト: user_log.txt

ユーザーの Web サイトアクセスログ

ユーザーアクセスレコードは1行を占めます。

$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

MaxCompute

ods_raw_log_d_odps

  • col: 生ログ

  • dt: パーティションフィールド

重要
  • このチュートリアルでは、必要なテストデータとデータソースが準備されています。ワークスペースからテストデータにアクセスするには、データソース情報をワークスペースに追加するだけで済みます。

  • この実験のデータは DataWorks での実験操作にのみ使用でき、すべてのデータは手動のモックデータであり、データは Data Integration でのみ読み取ることができます。

DataStudio ページに移動する

DataWorks コンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

ステップ 1: ワークフローを設計する

ワークフローを設計する

  1. ワークフローを作成します。

    開発コンポーネントは、ワークフローに基づいてデータを開発するために使用されます。ノードを作成する前に、ワークフローを作成する必要があります。詳細については、「ワークフローを作成する」をご参照ください。

    この例では、User profile analysis_MaxCompute という名前のワークフローを使用します。

    image

  2. ワークフローを設計します。

    ワークフローを作成すると、ワークフローキャンバスが自動的に表示されます。ワークフローキャンバスの上部にある [ノードの作成] をクリックし、ノードをワークフローキャンバスにドラッグし、ワークフロー設計 に基づいて線を引いてノード間の依存関係を構成し、データ同期を行います。

    image

  3. このチュートリアルでは、ゼロロードノードと同期ノードの間にリネージは存在しません。この場合、ノード間の依存関係は、ワークフローで線を引くことによって構成されます。依存関係の構成方法の詳細については、「スケジューリング依存関係構成ガイド」をご参照ください。次の表に、ノードタイプ、ノード名、各ノードの機能を示します。

    ノード分類

    ノードタイプ

    命名規則

    (最終出力テーブルにちなんで命名)

    ノード機能

    全般

    ゼロロードノード

    workshop_start_odps

    ユーザープロファイル分析のワークフロー全体を管理するために使用されます。たとえば、ゼロロードノードは、ワークフローの実行開始時間を決定します。ワークスペース内のワークフローが複雑な場合、ゼロロードノードを使用すると、ワークフロー内のデータフローのパスがより明確になります。このノードはドライランノードです。ノードのコードを編集する必要はありません。

    Data Integration

    バッチ同期

    ods_user_info_d_odps

    MySQL に格納されている基本的なユーザー情報を MaxCompute テーブル ods_user_info_d_odps に同期するために使用されます。

    Data Integration

    バッチ同期

    ods_raw_log_d_odps

    OSS に格納されているユーザーの Web サイトアクセスログを MaxCompute テーブル ods_raw_log_d_odps に同期するために使用されます。

スケジューリングロジックを構成する

この例では、workshop_start_odps ゼロロードノードを使用して、毎日 00:30 にワークフローの実行をトリガーします。次の表に、ゼロロードノードのスケジューリングプロパティの構成を示します。他のノードのスケジューリング構成を変更する必要はありません。実装ロジックについては、「さまざまなシナリオでワークフロー内のノードのスケジューリング時間を構成する」をご参照ください。その他のスケジューリング構成については、「概要」をご参照ください。

構成項目

スクリーンショット

説明

スケジューリング時間

image

ゼロロードノードのスケジューリング時間は 00:30 に設定されています。ゼロロードノードは、毎日 00:30 に現在のワークフローの実行をトリガーします。

スケジューリングの依存関係

image

workshop_start_odps ゼロロードノードには祖先ノードがありません。この場合、ゼロロードノードを [ワークスペースのルートノード] に依存するように構成できます。ルートノードは、workshop_start_spark ゼロロードノードの実行をトリガーします。

説明

DataWorks ワークフロー内のすべてのノードは、祖先ノードに依存する必要があります。データ同期フェーズのすべてのノードは、ゼロロードノード workshop_start_odps に依存します。したがって、データ同期ワークフローの実行は、workshop_start_odps ノードによってトリガーされます。

ステップ 2: データ同期タスクを構成する

デスティネーション MaxCompute テーブルを作成する

事前に Data Integration を使用して同期されたデータを格納するために使用する MaxCompute テーブルを作成する必要があります。このチュートリアルでは、テーブルは迅速な方法で作成されます。MaxCompute テーブル関連操作の詳細については、「MaxCompute テーブルを作成および管理する」をご参照ください。

  1. テーブル作成のエントリポイントに移動します。

    image.png

  2. ods_raw_log_d という名前のテーブルを作成します。

    [テーブルの作成] ダイアログボックスで、[名前] フィールドに ods_raw_log_d_odps と入力します。テーブル構成タブの上部にある [DDL] をクリックし、次のテーブル作成ステートメントを入力して、[テーブルスキーマの生成] をクリックします。[確認] ダイアログボックスで、[確認] をクリックして元の構成を上書きします。

    CREATE TABLE IF NOT EXISTS ods_raw_log_d_odps
    (
     col STRING
    ) 
    PARTITIONED BY
    (
     dt STRING
    )
    LIFECYCLE 7;
  3. ods_user_info_d_odps という名前のテーブルを作成します。

    [テーブルの作成] ダイアログボックスで、[名前] フィールドに ods_user_info_d_odps と入力します。テーブル構成タブの上部にある [DDL] をクリックし、次のテーブル作成ステートメントを入力して、[テーブルスキーマの生成] をクリックします。[確認] ダイアログボックスで、[確認] をクリックして元の構成を上書きします。

    CREATE TABLE IF NOT EXISTS ods_user_info_d_odps (
     uid STRING COMMENT 'ユーザー ID',
     gender STRING COMMENT '性別',
     age_range STRING COMMENT '年齢層',
     zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
     dt STRING
    )
    LIFECYCLE 7;
  4. テーブルをコミットしてデプロイします。

    テーブル情報が有効であることを確認した後、ods_user_info_d テーブルと ods_raw_log_d テーブルの構成タブで、[開発環境にコミット][本番環境にコミット] を順番にクリックします。開発環境と本番環境のワークスペースに関連付けられている MaxCompute プロジェクトでは、ノード構成に基づいて、MaxCompute プロジェクトに関連する物理テーブルが作成されます。

    説明

    テーブルのスキーマを定義した後、テーブルを開発環境と本番環境にコミットできます。テーブルがコミットされると、特定の環境の MaxCompute プロジェクトでテーブルを表示できます。

    • テーブルをワークスペースの開発環境にコミットすると、テーブルは開発環境のワークスペースに関連付けられている MaxCompute プロジェクトに作成されます。

    • テーブルをワークスペースの本番環境にコミットすると、テーブルは本番環境のワークスペースに関連付けられている MaxCompute プロジェクトに作成されます。

ソースを追加する

このチュートリアルでは、ApsaraDB RDS for MySQL データベースOSS バケット のデータがテストデータとして使用されます。テストデータにアクセスするには、ワークスペースに user_behavior_analysis_mysql という名前の ApsaraDB RDS for MySQL データソースと user_behavior_analysis_mysql という名前の HttpFile データソースを追加する必要があります。テストに使用されるデータソースの基本情報が提供されます。

説明
  • Data Integration 同期タスクを設定する前に、DataWorks コンソールの[データソース]ページでソースおよびターゲットのデータベースまたはデータウェアハウスを追加および設定できます。これにより、同期タスクを設定する際に、名前でデータソースを検索して、使用するソースおよびターゲットのデータベースまたはデータウェアハウスを決定できます。

  • この実験のデータは DataWorks での実験操作にのみ使用でき、すべてのデータは手動のモックデータであり、データは Data Integration でのみ読み取ることができます。

  • この手順で追加する HttpFile データソースと ApsaraDB RDS for MySQL データソースのテストデータはインターネット上に格納されています。ステップ 2 に従って、DataWorks リソースグループにインターネット NAT ゲートウェイが構成されていることを確認してください。そうでない場合、接続性をテストするときに次のエラーが報告されます。

    • HttpFile: ErrorMessage:[Connect to dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com:443 [dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com/106.14.XX.XX] failed: connect timed out]

    • MySQL: ErrorMessage:[Exception:Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.<br><br>ExtraInfo:Resource Group IP:****,detail version info:mysql_all],Root Cause:[connect timed out]

user_behavior_analysis_mysql という名前の ApsaraDB RDS for MySQL データソースを追加する

ApsaraDB RDS for MySQL データソースをワークスペースに追加します。次に、データソースとデータ同期に使用するリソースグループの間にネットワーク接続が確立されているかどうかをテストします。ApsaraDB RDS for MySQL データソースは、ApsaraDB RDS for MySQL に格納され、DataWorks からアクセスできる基本的なユーザー情報を読み取るために使用されます。

  1. [データソース] ページに移動します。

    1. DataWorks コンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[その他] > [管理センター] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。

    2. SettingCenter ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。

  2. ApsaraDB RDS for MySQL データソースを追加します。

    1. [データソース] ページで、[データソースの追加] をクリックします。

    2. [データソースの追加] ダイアログボックスで、[MySQL] をクリックします。

    3. [MySQL データソースの追加] ページで、パラメータを構成します。次の表にパラメータを示します。

      image

      パラメータ

      説明

      データソース名

      データソースの名前。user_behavior_analysis_mysql と入力します。

      データソースの説明

      データソースの説明。データソースは DataWorks のユースケース専用に提供され、提供されたテストデータにアクセスするためのバッチ同期タスクのソースとして使用されます。データソースは、データ同期シナリオでのデータ読み取り専用です。

      構成モード

      このパラメータを [接続文字列モード] に設定します。

      環境

      [開発と本番] を選択します。

      説明

      開発環境 のデータソースと 本番環境 のデータソースを追加する必要があります。そうでない場合、関連するタスクを実行してデータを生成するときにエラーが報告されます。

      接続アドレス

      ホスト IP アドレス

      rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com

      ポート番号

      3306

      データベース名

      workshop

      ユーザー名

      workshop

      パスワード

      workshop#2017

      認証方法

      このパラメータを [認証なし] に設定します。

      接続構成

      [接続構成] セクションで、購入したサーバーレスリソースグループを見つけ、[接続ステータス] 列の [ネットワーク接続のテスト] をクリックします。開発環境と本番環境のリソースグループとデータソース間のネットワーク接続を個別にテストする必要があります。テストが成功したことを示すメッセージがシステムから返されると、接続ステータスは [接続済み] に変わります。

      重要

      この手順で追加する ApsaraDB RDS for MySQL データソースのテストデータはインターネット上に格納されています。ステップ 2 に従って、DataWorks リソースグループにインターネット NAT ゲートウェイが構成されていることを確認してください。そうでない場合、接続性をテストするときに次のエラーが報告されます: ErrorMessage:[Exception:Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.<br><br>ExtraInfo:Resource Group IP:****,detail version info:mysql_all],Root Cause:[connect timed out]

user_behavior_analysis_httpfile という名前の HttpFile データソースを追加する

HttpFile データソースをワークスペースに追加します。次に、データソースとデータ同期に使用するリソースグループの間にネットワーク接続が確立されているかどうかをテストします。HttpFile データソースは、OSS に格納され、DataWorks からアクセスできるユーザーの Web サイトアクセステストデータを読み取るために使用されます。

  1. [データソース] ページに移動します。

    1. DataWorks コンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[その他] > [管理センター] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。

    2. SettingCenter ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。

  2. HttpFile データソースを追加します。

    1. [データソース] ページで、[データソースの追加] をクリックします。

    2. [データソースの追加] ダイアログボックスで、[HttpFile] をクリックします。

    3. [HttpFile データソースの追加] ページで、パラメータを構成します。次の表にパラメータを示します。

      パラメータ

      説明

      データソース名

      データソースの名前。ワークスペース内のデータソースの識別子です。この例では、パラメータは user_behavior_analysis_httpfile に設定されています。

      データソースの説明

      データソースの説明。データソースは DataWorks のユースケース専用に提供され、提供されたテストデータにアクセスするためのバッチ同期タスクのソースとして使用されます。データソースは、データ同期シナリオでのデータ読み取り専用です。

      環境

      [開発環境] と [本番環境] を選択します。

      説明

      開発環境 のデータソースと 本番環境 のデータソースを追加する必要があります。そうでない場合、関連するタスクを実行してデータを生成するときにエラーが報告されます。

      URL ドメイン

      OSS バケットの URL。https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com と入力します。

      接続構成

      [接続設定] セクションで、購入したサーバーレスリソースグループを見つけ、[接続ステータス] 列の [ネットワーク接続性テスト] をクリックします。開発環境と本番環境それぞれについて、リソースグループとデータソース間のネットワーク接続を個別にテストする必要があります。テストが成功したことを示すメッセージがシステムから返されると、接続性ステータスが [接続済み] に変わります。

      重要

      この手順で追加する HttpFile データソースのテストデータはインターネット上に格納されています。ステップ 2 に従って、DataWorks リソースグループにインターネット NAT ゲートウェイが構成されていることを確認してください。そうでない場合、接続性をテストするときに次のエラーが報告されます: ErrorMessage:[Connect to dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com:443 [dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com/106.14.XX.XX] failed: connect timed out]

バッチ同期タスクを構成して基本的なユーザー情報を同期する

この例では、バッチ同期タスクを使用して、MySQL テーブル ods_user_info_d から MaxCompute テーブル ods_user_info_d_odps に基本的なユーザー情報を同期します。

  1. バッチ同期ノード ods_user_info_d_odps をダブルクリックして、ノードの構成タブに移動します。

  2. ネットワーク接続とリソースグループを構成します。

    [ソース][リソースグループ][デスティネーション] の構成が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。次の表に構成を示します。

    image

    パラメータ

    説明

    ソース

    • パラメータを MySQL に設定します。

    • [データソース名] パラメータを user_behavior_analysis_mysql に設定します。

    [リソースグループ]

    環境準備 フェーズで購入したサーバーレスリソースグループを選択します。

    [デスティネーション]

    • パラメータを MaxCompute に設定します。

    • [データソース名] パラメータを user_behavior_analysis_mysql に設定します。

  3. バッチ同期ノードに基づいてタスクを構成します。

    • ソースとデスティネーションを構成します。

      項目

      パラメータ

      説明

      [ソース]

      [テーブル]

      MySQL テーブル ods_user_info_d を選択します。

      image

      [分割キー]

      読み取るデータの分割キー。プライマリキーまたはインデックス付き列を分割キーとして使用することをお勧めします。INTEGER タイプのフィールドのみがサポートされています。

      この例では、uid フィールドが分割キーとして使用されています。

      [デスティネーション]

      [トンネルリソースグループ]

      このチュートリアルでは、[共通伝送リソース] がデフォルトで選択されています。専用のトンネルクォータが存在する場合は、ドロップダウンリストから専用のトンネルクォータを選択できます。

      説明

      MaxCompute のデータ伝送リソースの詳細については、「データ伝送サービス専用の排他的リソースグループを購入および使用する」をご参照ください。延滞または期限切れにより専用のトンネルクォータが使用できない場合、実行中のタスクは専用のトンネルクォータから無料のトンネルクォータに自動的に切り替わります。

      image

      [スキーマ]

      このチュートリアルでは、[デフォルト] が選択されています。MaxCompute プロジェクトに別のスキーマがある場合は、ドロップダウンリストからスキーマを選択できます。

      [テーブル]

      ドロップダウンリストから、アドホッククエリで作成された ods_user_info_d_odps テーブルを選択します。

      [パーティション情報]

      このチュートリアルでは、値を ${bizdate} に設定します。

      [書き込みモード]

      • ドロップダウンリストから [書き込み前に既存のデータをクリーンアップする (Insert Overwrite)] を選択します。

      • 有効な値:

        • Insert Into: テーブルまたはテーブルの静的パーティションにデータを挿入します。

        • Insert Overwrite: 指定されたテーブルをクリアし、テーブルまたはテーブルの静的パーティションにデータを挿入します。

      [空の文字列を Null に変換して書き込む]

      このチュートリアルでは、[いいえ] を選択します。

    • フィールドマッピングと一般設定を構成します。

      DataWorks では、ソースフィールドとデスティネーションフィールド間のマッピングを構成して、指定されたソースフィールドからデータを読み取り、デスティネーションフィールドにデータを書き込むことができます。[チャネル制御] セクションでは、データの読み取りと書き込みの並列処理、データベースのパフォーマンスへの影響を防ぐための最大伝送速度、ダーティデータレコードと分散実行のポリシーなどの機能も使用できます。このチュートリアルでは、デフォルト設定が使用されています。同期タスクのその他の構成項目については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。

  4. スケジューリングプロパティを構成します。

    ノードの構成タブで、右側のナビゲーションウィンドウの [プロパティ] をクリックします。[プロパティ] タブで、ノードのスケジューリングプロパティと基本情報を構成します。詳細については、「ノードのスケジューリングプロパティ」をご参照ください。次の表に構成を示します。

    セクション

    説明

    [スケジューリングパラメータ]

    [スケジューリングパラメータ] のデフォルト値 $bizdate を保持します。

    説明

    [パラメータ名] には bizdate、[パラメータ値] には $bizdate と入力します。これは、前日の日付を照会するために使用されます。日付の形式は yyyymmdd です

    image

    [スケジュール]

    • スケジューリングサイクル:値を に設定します。

    • [スケジュールされた時間]: 値を 00:30 に設定します。

    • [再実行]: 値を [実行ステータスに関係なく許可] に設定します。

    その他のパラメータにはデフォルト値を使用します。

    説明

    現在のノードが毎日スケジュールされる時間は、ワークフローのゼロロードノード workshop_start のスケジューリング時間によって決まります。現在のノードは、毎日 00:30 以後にスケジュールされます。

    image

    [リソースグループ]

    環境準備 フェーズで購入したサーバーレスリソースグループを選択します。

    image

    [依存関係]

    • 現在のノードの祖先ノードの決定: 現在のノードの [親ノード]workshop_start ノードが表示されるかを確認します。 線を引いて現在のノードの祖先ノードとして指定したノードが表示されます。 workshop_start ノードが表示されない場合は、「2. ワークフローの設計」を参照し、ビジネスデータ同期フェーズにおけるワークフロー設計が完了しているかどうかを確認します。

      この例では、workshop_start ノードのスケジューリング時間が到着し、ノードの実行が完了すると、現在のノードの実行がトリガーされます。

    • [現在のノードの出力を決定する]: 本番環境の MaxCompute プロジェクト名.ods_user_info_d_odps の形式で名前が付けられた現在のノードの出力が存在するかどうかを決定します。ノード出力が存在しない場合は、指定された [出力名] でノード出力を手動で追加する必要があります。

    説明
    • DataWorks では、ノードの出力は、ノードとその子孫ノード間のスケジューリングの依存関係を構成するために使用されます。SQL ノードが同期ノードに依存している場合、SQL ノードが同期ノードの出力テーブルの処理を開始するときに、DataWorks は 自動解析 機能を使用して、テーブルリネージに基づいて同期ノードを SQL ノードの祖先ノードとして迅速に構成します。本番環境の MaxCompute プロジェクト名.ods_user_info_d_odps の形式で名前が付けられたノード出力テーブルと同じ名前を持つ [ノード出力] が存在するかどうかを確認する必要があります。

    image

バッチ同期タスクを構成してユーザーの Web サイトアクセスログを同期する

この例では、バッチ同期タスクを使用して、パブリック HttpFile データソースの user_log.txt ファイルから MaxCompute テーブル ods_raw_log_d_odps にユーザーの Web サイトアクセスログを同期します。

  1. バッチ同期ノード ods_raw_log_d_odps をダブルクリックして、ノードの構成タブに移動します。

  2. ネットワーク接続とリソースグループを構成します。

    [ソース][リソースグループ][デスティネーション] の構成が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。次の表に構成を示します。

    image

    パラメータ

    説明

    [ソース]

    • パラメータを HttpFile に設定します。

    • [データソース名] パラメータを user_behavior_analysis_HttpFile に設定します。

    [リソースグループ]

    環境準備 フェーズで購入したサーバーレスリソースグループを選択します。

    [デスティネーション]

    • パラメータを MaxCompute に設定します。

    • [データソース名] パラメータを user_behavior_analysis_mysql に設定します。

  3. タスクを構成します。

    • ソースとデスティネーションを構成します。

      項目

      パラメータ

      説明

      [ソース]

      [ファイルパス]

      このチュートリアルでは、値を /user_log.txt に設定します。

      image

      [ファイルタイプ]

      ドロップダウンリストから text を選択します。

      [列区切り文字]

      値を | に設定します。

      [詳細設定]

      [コーディング]

      ドロップダウンリストから UTF-8 エンコーディング形式を選択します。

      image

      [圧縮形式]

      ドロップダウンリストから UTF-8 形式を選択します。

      [ヘッダーをスキップ]

      ドロップダウンリストから No を選択します。ヘッダーはスキップされません。

      [デスティネーション]

      [トンネルリソースグループ]

      このチュートリアルでは、[共通伝送リソース] がデフォルトで選択されています。専用のトンネルクォータが存在する場合は、ドロップダウンリストから専用のトンネルクォータを選択できます。

      説明

      MaxCompute のデータ伝送リソースの詳細については、「データ伝送サービス専用の排他的リソースグループを購入および使用する」をご参照ください。延滞または期限切れにより専用のトンネルクォータが使用できない場合、実行中のタスクは専用のトンネルクォータから無料のトンネルクォータに自動的に切り替わります。

      image

      [スキーマ]

      このチュートリアルでは、[デフォルト] が選択されています。DataWorks ワークスペースに別のスキーマがある場合は、ドロップダウンリストからスキーマを選択できます。

      [テーブル]

      ドロップダウンリストから、アドホッククエリで作成された ods_raw_log_d_odps テーブルを選択します。

      [パーティション情報]

      このチュートリアルでは、値を ${bizdate} に設定します。

      [書き込みモード]

      • ドロップダウンリストから [書き込み前に既存のデータをクリーンアップする (Insert Overwrite)] を選択します。

      • 有効な値:

        • Insert Into: テーブルまたはテーブルの静的パーティションにデータを挿入します。

        • Insert Overwrite: 指定されたテーブルをクリアし、テーブルまたはテーブルの静的パーティションにデータを挿入します。

      [空の文字列を Null に変換して書き込む]

      このチュートリアルでは、[いいえ] を選択します。

      データソースの構成が完了したら、[データ構造の確認] をクリックして、ログファイルを読み取ることができるかどうかを確認します。

    • フィールドマッピングと一般設定を構成します。

      DataWorks では、ソースフィールドとデスティネーションフィールド間のマッピングを構成して、指定されたソースフィールドからデータを読み取り、デスティネーションフィールドにデータを書き込むことができます。[チャネル制御] セクションでは、データの読み取りと書き込みの並列処理、データベースのパフォーマンスへの影響を防ぐための最大伝送速度、ダーティデータレコードと分散実行のポリシーなどの機能も使用できます。このチュートリアルでは、デフォルト設定が使用されています。同期タスクのその他の構成項目については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。

  4. スケジューリングプロパティを構成します。

    ノードの構成タブで、右側のナビゲーションウィンドウの [プロパティ] をクリックします。[プロパティ] タブで、ノードのスケジューリングプロパティと基本情報を構成します。詳細については、「ノードのスケジューリングプロパティ」をご参照ください。次の表に構成を示します。

    パラメータ

    説明

    [スケジューリングパラメータ]

    [スケジューリングパラメータ] のデフォルト値 $bizdate を保持します。

    説明

    [パラメータ名] には bizdate、[パラメータ値] には $bizdate と入力します。これは、前日の日付を照会するために使用されます。日付の形式は yyyymmdd です

    image

    [スケジュール]

    • [スケジューリングサイクル]: 値を Day に設定します。

    • [スケジュールされた時間]: 値を 00:30 に設定します。

    • [再実行]: 値を [実行ステータスに関係なく許可] に設定します。

    その他のパラメータにはデフォルト値を使用します。

    説明

    現在のノードが毎日スケジュールされる時間は、ワークフローのゼロロードノード workshop_start のスケジューリング時間によって決まります。現在のノードは、毎日 00:30 以後にスケジュールされます。

    image

    [リソースグループ]

    環境準備 フェーズで購入したサーバーレスリソースグループを選択します。

    image

    [依存関係]

    • [現在のノードの祖先ノード] の決定: 現在のノードについて、[親ノード]workshop_start ノードが表示されるかを確認します。線を引いて現在のノードの祖先ノードとして指定したノードが表示されます。workshop_start ノードが表示されない場合は、2. ワークフローの設計 を参照して、ビジネスデータ同期フェーズのワークフロー設計が完了しているかどうかを確認してください。

      この例では、workshop_start ノードのスケジューリング時間が到着し、ノードの実行が完了すると、現在のノードの実行がトリガーされます。

    • [現在のノードの出力を決定する]: 本番環境の MaxCompute プロジェクト名.ods_raw_log_d_odps の形式で名前が付けられた現在のノードの出力が存在するかどうかを決定します。ノード出力が存在しない場合は、指定された [出力名] でノード出力を手動で追加する必要があります。

    説明

    DataWorks では、ノードの出力は、ノードとその子孫ノード間のスケジューリングの依存関係を設定するために使用されます。 SQL ノードが同期ノードに依存している場合、SQL ノードが同期ノードの出力テーブルの処理を開始すると、DataWorks はテーブルリネージに基づいて 自動解析 機能を使用し、同期ノードを SQL ノードの祖先ノードとして迅速に設定します。 MaxCompute project name in the production environment.ods_raw_log_d_odps というフォーマットの名前を持つノード出力テーブルと同じ名前の [ノード出力] が存在するかどうかを確認する必要があります。

    image

ステップ 8: ワークフローを実行して結果を表示する

ワークフローを実行する

  1. DataStudio ページで、[ビジネスフロー] の下にある User profile analysis_MaxCompute ワークフローをダブルクリックし、ワークフローの構成タブで、上部のツールバーにある image.png アイコンをクリックして、ノード間のスケジューリングの依存関係に基づいてワークフロー内のノードを実行します。

  2. ステータスを確認します。

    • ノードステータスの表示: ノードが image.png 状態の場合、同期プロセスは正常です。

    • ノードの実行ログを表示するには、たとえば ods_user_info_d_odps または ods_raw_log_d_odps ノードを右クリックし、[ログの表示] を選択します。 次の図に示す情報がログに表示された場合、ノードが実行され、データが同期されます。

      image

同期結果を表示する

ワークフロー内のノードが期待どおりに実行された場合、ApsaraDB RDS for MySQL テーブル ods_user_info_d のすべての基本的なユーザー情報は、出力テーブル workshop2024_01_dev.ods_user_info_d_odps の前日のパーティションに同期され、OSS オブジェクト user_log.txt のユーザーのすべての Web サイトアクセスログは、出力テーブル workshop2024_01_dev.ods_raw_log_d_odps の前日のパーティションに同期されます。クエリ SQL ステートメントを実行のために本番環境にデプロイする必要はありません。したがって、アドホッククエリを作成 することで同期結果をクエリできます。

  1. アドホッククエリを作成します。

    [DataStudio] ページの左側のナビゲーションウィンドウで、image.png アイコンをクリックします。 Ad Hoc Query ペインで、[Ad Hoc Query] を右クリックし、[ノードの作成] > [ODPS SQL] を選択します。

  2. 同期結果テーブルをクエリします。

    次の SQL ステートメントを実行して、データ書き込み結果を確認します。ods_raw_log_d_odps テーブルと ods_user_info_d_odps テーブルにインポートされたレコード数を確認します。

    // 読み取りおよび書き込み操作を実行するデータのデータタイムスタンプを、パーティションのフィルター条件として指定する必要があります。たとえば、ノードが 2023 年 6 月 21 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20230620 で、ノードの実行日よりも 1 日前です。
    select count(*) from ods_user_info_d_odps  where dt='データタイムスタンプ'; 
    select count(*) from ods_raw_log_d_odps where dt='データタイムスタンプ';

    image

    説明

    このチュートリアルでは、ノードは開発環境である [DataStudio] で実行されます。したがって、データはデフォルトで開発環境のワークスペースに関連付けられている MaxCompute プロジェクト workshop2024_01_dev の指定されたテーブルに書き込まれます。

次のステップ

データ同期は完了です。次のチュートリアルに進むことができます。次のチュートリアルでは、MaxCompute で基本的なユーザー情報とユーザーの Web サイトアクセスログを処理する方法を学習します。詳細については、「データを処理する」をご参照ください。