すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データ同期

最終更新日:Mar 26, 2026

このチュートリアルでは、2 種類の異種ソース(ApsaraDB RDS for MySQL のテーブルと Object Storage Service (OSS) のファイル)から DataWorks のデータ統合バッチ同期タスクを用いて MaxCompute へデータを読み込む手順について説明します。完了後、両方のデータセットはパーティション化された MaxCompute テーブルに格納され、毎日スケジュール実行されます。

実施内容:

  1. ワークフローの設計およびスケジューリングロジックの構成

  2. 送信先 MaxCompute テーブルの作成

  3. MySQL および HttpFile データソースの追加

  4. 2 つのバッチ同期タスクの構成

  5. ワークフローの実行および結果の検証

前提条件

開始する前に、以下の準備が完了していることを確認してください。

  • 実験の概要を確認済みであること。詳細については、「実験の概要」をご参照ください。

  • 環境の準備で説明されている環境設定を完了済みであること。これには、サーバーレスリソースグループの購入およびインターネット NAT ゲートウェイの構成が含まれます。

同期対象データ

ソースオブジェクト内容送信先テーブル
MySQL (user_behavior_analysis_mysql)テーブル:ods_user_info_d基本的なユーザー情報ods_user_info_d_odps
HttpFile (user_behavior_analysis_httpfile)ファイル:user_log.txtWeb サイトアクセスログods_raw_log_d_odps

テストデータおよびデータソースは、本チュートリアル向けに事前にプロビジョニングされています。ご利用のワークスペースでデータソース情報を登録するだけで、これらのリソースにアクセスできます。すべてのデータは合成データであり、Data Integration 内では読み取り専用です。

ステップ 1:ワークフローの設計

ワークフローの作成および設計

DataWorks における開発は、ワークフロー単位で管理されます。ノードを追加する前に、まずワークフローを作成します。

  1. DataStudio に移動します。DataWorks コンソールにログインします。上部ナビゲーションバーで対象リージョンを選択します。左側ナビゲーションウィンドウで、データ開発および運用 > データ開発 を選択します。ドロップダウンリストからワークスペースを選択し、データ開発へ移動 をクリックします。

  2. User profile analysis_MaxCompute という名前のワークフローを作成します。手順については、「ワークフローの作成」をご参照ください。

    image

  3. ワークフローの設計を行います。ワークフローキャンバスが開いたら、ツールバーの ノードの作成 をクリックします。ノードをキャンバス上にドラッグし、線を引いてデータ同期のスケジューリング依存関係を設定します。レイアウトは「ワークフロー設計」に基づきます。本チュートリアルでは、以下の 3 つのノードを使用します。ゼロロードノードと同期ノード間にはデータリネージがないため、キャンバス上で線を引くことで依存関係を構成します。詳細については、「スケジューリング依存関係の構成ガイド」をご参照ください。

    ノードタイプノード名目的
    ゼロロードノードworkshop_start_odps毎日 00:30 にワークフロー全体をトリガーします。このノードにはコードは不要です。
    バッチ同期ods_user_info_d_odpsMySQL から基本的なユーザー情報を MaxCompute へ同期します。
    バッチ同期ods_raw_log_d_odpsOSS から Web サイトアクセスログを MaxCompute へ同期します。

    image

スケジューリングロジックの構成

workshop_start_odps ゼロロードノードがワークフロー全体をトリガーします。このノードを毎日 00:30 に実行するよう構成し、完了後に同期ノードが自動的に実行されるようにします。他のノードのスケジューリング設定はデフォルトのままとします。

説明 DataWorks ワークフロー内のすべてのノードは、先祖ノードに依存する必要があります。2 つの同期ノードは workshop_start_odps に依存しているため、ワークフローはこのノードによってトリガーされます。
構成項目設定値説明
スケジューリング時刻image00:30 に設定します。ゼロロードノードは、この時刻に毎日ワークフローをトリガーします。
スケジューリング依存関係imageworkshop_start_odps ノードには先祖ノードがありません。ワークスペースのルートノードに依存するよう設定することで、このゼロロードノードをトリガーします。

スケジューリング構成の詳細については、「異なるシナリオにおけるワークフロー内ノードのスケジューリング時刻の構成」および「概要」をご参照ください。

ステップ 2:送信先 MaxCompute テーブルの作成

同期タスクの構成前に、同期データを格納する 2 つの MaxCompute テーブルを作成します。MaxCompute テーブル操作に関する一般的な情報については、「MaxCompute テーブルの作成および管理」をご参照ください。

  1. テーブル作成のエントリポイントに移動します。

    image.png

  2. ods_raw_log_d_odps テーブルを作成します。テーブルの作成 ダイアログボックスの 名前 フィールドに ods_raw_log_d_odps を入力します。DDL をクリックし、以下の文を貼り付け、テーブルスキーマの生成 をクリックします。確認 ダイアログボックスで 確認 をクリックして適用します。

    CREATE TABLE IF NOT EXISTS ods_raw_log_d_odps
    (
     col STRING
    )
    PARTITIONED BY
    (
     dt STRING
    )
    LIFECYCLE 7;
  3. ods_user_info_d_odps テーブルを作成します。テーブルの作成 ダイアログボックスの 名前 フィールドに ods_user_info_d_odps を入力します。DDL をクリックし、以下の文を貼り付け、テーブルスキーマの生成 をクリックします。確認 ダイアログボックスで 確認 をクリックして適用します。

    CREATE TABLE IF NOT EXISTS ods_user_info_d_odps (
     uid STRING COMMENT 'ユーザー ID',
     gender STRING COMMENT '性別',
     age_range STRING COMMENT '年齢層',
     zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
     dt STRING
    )
    LIFECYCLE 7;
  4. 両方のテーブルをコミットおよびデプロイします。各テーブルの構成タブで、開発環境へのコミット をクリックし、その後 本番環境へのコミット をクリックします。DataWorks は、それぞれの環境に関連付けられた MaxCompute プロジェクト内に物理テーブルを作成します。

    説明 開発環境へのコミットは、開発環境に関連付けられた MaxCompute プロジェクト内にテーブルを作成します。本番環境へのコミットは、本番環境のプロジェクト内にテーブルを作成します。

ステップ 3:データソースの追加

同期タスクの構成前に、ワークスペース内に両方のデータソースを登録します。これにより、各タスクの構成時に名前でデータソースを選択できるようになります。

重要

両方のソースのテストデータはインターネット上にホストされています。リソースグループに対してインターネット NAT ゲートウェイが構成されていることを確認してください(「ステップ 2」を参照)。構成されていない場合、接続性テストは以下のエラーで失敗します。

  • HttpFile:ErrorMessage:[Connect to dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com:443 [...] failed: connect timed out]

  • MySQL:ErrorMessage:[Exception:Communications link failure ... Root Cause:[connect timed out]]

データソース ページに移動するには、以下の手順を実行します。

  1. DataWorks コンソールにログインします。上部ナビゲーションバーで対象リージョンを選択します。左側ナビゲーションウィンドウで、その他 > 管理センター を選択します。ワークスペースを選択し、管理センターへ移動 をクリックします。

  2. 左側ナビゲーションウィンドウで、データソース をクリックします。

MySQL データソースの追加

MySQL に格納された基本的なユーザー情報を読み取るため、ApsaraDB RDS for MySQL データソースを追加します。

  1. データソース ページで、データソースの追加 をクリックします。

  2. データソースの追加 ダイアログボックスで、MySQL をクリックします。

  3. MySQL データソースの追加 ページで、以下のパラメーターを入力します。

    説明 データソースは 開発環境 および 本番環境 の両方に追加してください。いずれかを省略すると、関連するタスクが実行時に失敗します。
    パラメーター
    データソース名user_behavior_analysis_mysql
    データソースの説明DataWorks チュートリアルのユースケース用。バッチ同期の読み取り専用ソース。
    構成モード接続文字列モード
    環境開発・本番
    ホスト IP アドレスrm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
    ポート番号3306
    データベース名workshop
    ユーザー名workshop
    パスワードworkshop#2017
    認証方式認証なし

    image

  4. 接続構成 セクションで、ご利用のサーバーレスリソースグループを見つけ、ネットワーク接続性のテスト をクリックします。開発環境および本番環境をそれぞれ個別にテストします。テストが成功すると、ステータスが 接続済み に変わります。

HttpFile データソースの追加

OSS に格納された Web サイトアクセスログを読み取るため、HttpFile データソースを追加します。

  1. データソース ページで、データソースの追加 をクリックします。

  2. データソースの追加 ダイアログボックスで、HttpFile をクリックします。

  3. HttpFile データソースの追加 ページで、以下のパラメーターを入力します。

    説明 データソースは 開発環境 および 本番環境 の両方に追加してください。いずれかを省略すると、関連するタスクが実行時に失敗します。
    パラメーター
    データソース名user_behavior_analysis_httpfile
    データソースの説明DataWorks チュートリアルのユースケース用。バッチ同期の読み取り専用ソース。
    環境開発環境 および 本番環境
    URL ドメインhttps://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com
  4. 接続構成 セクションで、ご利用のサーバーレスリソースグループを見つけ、ネットワーク接続性のテスト をクリックします。両方の環境をそれぞれ個別にテストし、それぞれが 接続済み であることを確認します。

ステップ 4:バッチ同期タスクの構成

MySQL から MaxCompute への同期

このタスクは、MySQL テーブル ods_user_info_d から読み取り、MaxCompute テーブル ods_user_info_d_odps へ書き込みます。

  1. ods_user_info_d_odps ノードをダブルクリックして、構成タブを開きます。

  2. ソース、リソースグループ、送信先を構成し、次へ をクリックして接続性テストを完了します。

    パラメーター
    ソースMySQL — データソース名:user_behavior_analysis_mysql
    リソースグループご利用のサーバーレスリソースグループ
    送信先MaxCompute — データソース名:user_behavior_analysis_mysql

    image

  3. ソースおよび送信先の設定を構成します。デフォルトのフィールドマッピングおよびチャネル制御設定を使用します。利用可能なすべてのオプションについては、「コードレス UI を使用したバッチ同期タスクの構成」をご参照ください。

    項目パラメータースクリーンショット
    ソーステーブルods_user_info_dimage
    分割キーuid
    送信先Tunnel リソースグループ共通伝送リソース(デフォルト)image
    スキーマdefault
    テーブルods_user_info_d_odps
    パーティション情報${bizdate}
    書き込みモード上書き挿入(書き込み前に既存のデータをクリア)
    空文字列を NULL に変換して書き込みいいえ
  4. スケジューリングプロパティを構成します。右側ナビゲーションペインで プロパティ をクリックし、以下の値を設定します。詳細については、「ノードのスケジューリングプロパティ」をご参照ください。

    説明 DataWorks では、ノード出力を使用して、ノードとその子ノード間のスケジューリング依存関係を設定します。SQL ノードが同期ノードの出力テーブルを処理する場合、自動解析機能により、テーブルリネージに基づいて同期ノードが SQL ノードの親ノードとして設定されます。「ノード出力」で指定された <MaxCompute プロジェクト名(本番環境)>.ods_user_info_d_odps が存在することを確認してください。
    プロパティスクリーンショット
    スケジューリングパラメーター$bizdate(パラメーター名:bizdate;前日の日付を yyyymmdd 形式で照会)image
    スケジューリング周期image
    スケジュール時刻00:30
    再実行実行状態に関係なく許可
    リソースグループご利用のサーバーレスリソースグループimage
    依存関係workshop_start[親ノード] に表示されていることを確認します。表示されていない場合は、ステップ 1 のワークフロー設計を再確認してください。また、<本番環境の MaxCompute プロジェクト名>.ods_user_info_d_odps という名前のノード出力が存在することを確認します。存在しない場合は、手動で追加してください。image

OSS ログの MaxCompute への同期

このタスクは、HttpFile ソースから user_log.txt ファイルを読み取り、MaxCompute テーブル ods_raw_log_d_odps へ書き込みます。

ログファイルは、以下の形式で各行に 1 件のアクセスレコードを格納します。

$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];
  1. ods_raw_log_d_odps ノードをダブルクリックして、構成タブを開きます。

  2. ソース、リソースグループ、送信先を構成し、次へ をクリックして接続性テストを完了します。

    パラメーター
    ソースHttpFile — データソース名:user_behavior_analysis_HttpFile
    リソースグループご利用のサーバーレスリソースグループ
    送信先MaxCompute — データソース名:user_behavior_analysis_mysql

    image

  3. ソースおよび送信先の設定を構成します。ソースを構成した後、データ構造の確認 をクリックして、ログファイルが読み取れることを確認します。デフォルトのフィールドマッピングおよびチャネル制御設定を使用します。利用可能なすべてのオプションについては、「コードレス UI を使用したバッチ同期タスクの構成」をご参照ください。

    項目パラメータースクリーンショット
    ソースファイルパス/user_log.txtimage
    ファイルタイプtext
    列区切り文字|
    高度な構成コーディングUTF-8image
    圧縮形式UTF-8
    ヘッダーをスキップいいえ
    送信先Tunnel リソースグループ共通伝送リソース(デフォルト)image
    スキーマdefault
    テーブルods_raw_log_d_odps
    パーティション情報${bizdate}
    書き込みモード上書き挿入(書き込み前に既存のデータをクリア)
    空文字列を NULL に変換して書き込みいいえ
  4. スケジューリングプロパティを構成します。右側ナビゲーションペインで プロパティ をクリックし、以下の値を設定します。詳細については、「ノードのスケジューリングプロパティ」をご参照ください。

    説明 ノード出力出力名 <本番環境の MaxCompute プロジェクト名>.ods_raw_log_d_odps が存在することを確認してください。DataWorks は、この出力を下流の SQL ノードが同期ノードに依存する際に 自動解析 で使用します。
    属性スクリーンショット
    スケジューリングパラメーター$bizdate (パラメーター名: bizdate; 前日の日付を yyyymmdd 形式で照会します)image
    スケジューリングサイクルDayimage
    スケジュール時刻00:30
    再実行実行ステータスに関わらず許可
    リソースグループご利用のサーバーレスリソースグループimage
    依存関係workshop_start[親ノード] に表示されていることを確認します。また、<本番環境の MaxCompute プロジェクト名>.ods_raw_log_d_odps というノード出力が存在することを確認します。存在しない場合は手動で追加します。image

ステップ 5:ワークフローの実行および結果の検証

ワークフローの実行

  1. DataStudio で、ビジネスフロー 下の User profile analysis_MaxCompute ワークフローをダブルクリックします。

  2. ワークフローキャンバスで、ツールバーの image.png アイコンをクリックして、依存関係順にすべてのノードを実行します。

  3. ステータスを確認します:image

    • ノードのステータス: image.png 状態のノードは、正常に実行されたことを示します。

    • ノードのログ: ods_user_info_d_odps または ods_raw_log_d_odps を右クリックし、ログの表示 を選択します。ログに以下のような出力が表示された場合、データの同期が完了しています。

結果の検証

正常に実行された後:

  • MySQL テーブル ods_user_info_d のすべての行が、前日の日付でパーティション化された workshop2024_01_dev.ods_user_info_d_odps に格納されます。

  • OSS ファイル user_log.txt のすべての行が、前日の日付でパーティション化された workshop2024_01_dev.ods_raw_log_d_odps に格納されます。

データが正しく読み込まれたことを確認するために、アドホッククエリで行数を照会します。

  1. DataStudio の左側ナビゲーションウィンドウで、image.png アイコンをクリックします。アドホッククエリ ペインで、アドホッククエリ を右クリックし、ノードの作成 > ODPS SQL を選択します。

  2. 以下の SQL を実行して、行数を確認します。データタイムスタンプ を実行時のデータタイムスタンプ(例:ワークフローが 2023 年 6 月 21 日に実行された場合、データタイムスタンプは 20230620)に置き換えます。

    select count(*) from ods_user_info_d_odps where dt='データタイムスタンプ';
    select count(*) from ods_raw_log_d_odps where dt='データタイムスタンプ';

    image

説明 DataStudio でノードを実行すると、データは開発環境に書き込まれます。結果は MaxCompute プロジェクト workshop2024_01_dev に表示され、本番プロジェクトには表示されません。

次のステップ

データ同期は完了しました。次に、MaxCompute で読み込まれたデータを処理するための次のチュートリアルに進んでください。「データの処理