すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データ同期

最終更新日:Mar 31, 2026

このチュートリアルでは、DataWorks Data Integration を使用して、MySQL データベースと OSS フラットファイルという 2 つの異種ソースから MaxCompute にデータを同期する方法について説明します。最終的に、MaxCompute の ods_user_info_d テーブルと ods_raw_log_d テーブルにデータが入力され、ダウンストリーム処理の準備が整います。

このチュートリアルでは、次のことを行います。

  1. DataWorks ワークスペースに MySQL および HttpFile データソースを追加します。

  2. Zero Load Node と 2 つのバッチ同期ノードを含む同期パイプラインを構築します。

  3. 送信先テーブル DDL、パーティションパラメーター、およびダーティデータポリシーを含む各同期タスクを構成します。

  4. パイプラインを実行し、データが MaxCompute に書き込まれていることを確認します。

前提条件

開始する前に、次のことを確認してください。

  • 環境の準備 を完了し、DataWorks ワークスペース、Serverless リソースグループ、およびパブリックネットワークアクセスを設定済みであること。

  • ワークスペースにアタッチ済みのサーバーレスリソースグループ(ない場合は、接続構成セクションのプロンプトに従って、[購入] をクリックし、次に [購入済みリソースグループの関連付け] をクリックします)

ステップ 1: データソースの作成

後続の同期タスクが生データにアクセスできるように、DataWorks ワークスペースに次の 2 つのデータソースを追加します。

他のユーザープロファイル分析チュートリアルで MySQL および HttpFile データソースをすでに作成している場合は、このステップをスキップしてください。

MySQL データソースの作成

このチュートリアルの基本的なユーザー情報は MySQL データベースに保存されています。user_behavior_analysis_mysql という名前の MySQL データソースを作成して、ods_user_info_d テーブルを MaxCompute にプルします。

  1. DataWorks コンソールにログインします。上部のナビゲーションバーで、リージョンを選択します。左側のナビゲーションウィンドウで、[詳細] > [管理センター] を選択します。ドロップダウンリストからワークスペースを選択し、[管理センターに移動] をクリックします。

  2. 左側のナビゲーションウィンドウで、[データソース]をクリックします。

  3. [データソースの追加]」をクリックし、次に検索して「[MySQL]」を選択します。

  4. [MySQL データソースの追加] ページで、以下のパラメーターを設定します。これらの値は、開発環境と本番環境の両方で使用します。記載されていないパラメーターは、デフォルト値のままにします。

    パラメーター
    データソース user_behavior_analysis_mysql
    データソースの説明 このデータソースは DataWorks チュートリアル用です。オフライン同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。
    設定モード 接続文字列モード
    接続アドレス ホスト IP: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com, ポート: 3306
    データベース名 workshop
    [ユーザー名] workshop
    パスワード workshop#2017
    [認証方法] 認証なし
  5. [接続設定] セクションで、両方の環境で [ネットワーク接続のテスト] をクリックし、ステータスが [接続済み] と表示されることを確認します。

    重要

    リソースグループは、パブリックネットワークアクセスが有効になっているワークスペースにアタッチされている必要があります。そうでない場合、データ同期は失敗します。

  6. [作成の完了] をクリックします。

HttpFile データソースの作成

このチュートリアルのユーザーウェブサイトアクセスログは OSS (Object Storage Service) に保存されています。user_behavior_analysis_httpfile という名前の HttpFile データソースを作成して、user_log.txt を MaxCompute にプルします。

  1. 左側のナビゲーションウィンドウで、[データソース] をクリックします。

  2. [データソースの追加] をクリックし、[HttpFile] を検索して選択します。

  3. HttpFile データソースの追加」ページで、以下のパラメーターを設定します。これらの値は、開発環境および本番環境の両方で使用します。一覧にないパラメーターについては、デフォルト値をそのまま使用します。

    パラメーター
    データソース名 user_behavior_analysis_httpfile
    データソースの説明 このデータソースは DataWorks チュートリアル用です。オフライン同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。
    URL https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com
  4. [接続設定]」セクションで、両方の環境に対して「[ネットワーク接続のテスト]」をクリックします。ステータスが「[接続済み]」と表示されることを確認します。

    重要

    リソースグループは、パブリックネットワークアクセスが有効になっているワークスペースにアタッチされている必要があります。そうでない場合、データ同期は失敗します。

  5. [作成完了] をクリックします。

ステップ 2: 同期パイプラインの構築

2 つの同期タスクをオーケストレーションし、スケジュールに基づいて毎日実行されるワークフローを構築します。

  1. 左上隅の icon アイコンをクリックし、[すべての製品] > [データ開発およびタスク操作] > DataStudio を選択します。このチュートリアル用に作成したワークスペースに切り替えます。

  2. 左側のナビゲーションウィンドウで、image をクリックしてデータ開発ページを開きます。[ワークスペースディレクトリ] エリアで、image をクリックし、[ワークフローの作成] を選択して、名前を user_profile_analysis とします。

  3. ワークフローキャンバスで、左側のペインから 1 つの[ゼロロードノード]と 2 つの[バッチ同期]ノードをドラッグして配置し、次のように名前を付けます。

    ノードタイプ ノード名 機能
    Zero Load Node workshop_start ワークフロー全体を管理するドライラン先祖ノード。コード編集は不要です。
    バッチ同期 ods_user_info_d MySQL から MaxCompute の ods_user_info_d テーブルに基本的なユーザー情報を同期します。
    バッチ同期 ods_raw_log_d OSS から MaxCompute の ods_raw_log_d テーブルにユーザーウェブサイトアクセスログを同期します。
  4. 接続をドラッグして、workshop_start を両方のバッチ同期ノードの先祖ノードとして設定します。以下に示すとおりです。

    image

  5. ワークフローのスケジューリングを設定します。キャンバスで、右側のペインの[スケジューリング]をクリックし、次のパラメーターを設定します。記載されていないパラメーターは、デフォルト値のままにします。

    スケジューリングパラメーター
    スケジューリングパラメーター bizdate=$[yyyymmdd-1] — 前日の日付をすべての内部ノードに渡します
    スケジューリング周期 Day
    スケジュール時間 00:30
    [スケジューリングの依存関係] ワークスペースのルートノードを使用 をクリックして、ワークフローをルートノード (形式: workspace_name_root) にアタッチします。

ステップ 3: 同期タスクの構成

初期ノードの構成

  1. ワークフローキャンバスで、workshop_start にマウスカーソルを合わせ、[ノードを開く] をクリックします。

  2. 右側のペインで、[スケジューリング] をクリックし、次のパラメーターを設定します。記載されていないパラメーターについては、デフォルト値のままにします。

    スケジューリングパラメーター
    スケジューリングタイプ dry-run scheduling
    トンネルリソースグループ 環境の準備 で作成した Serverless リソースグループを選択します。
    スケジューリングの依存関係 ワークスペースルートノードの使用 をクリックして、ワークスペースルートノード (workspace_name_root) から実行をトリガーします。

ユーザーデータ同期タスク (ods_user_info_d) の構成

  1. ワークフローキャンバスで、ods_user_info_d にカーソルを合わせ、[ノードを開く] をクリックします。

  2. ソースと送信先を構成します。

    パラメーター
    ソース — データソースタイプ MySQL
    ソース — データソース名 user_behavior_analysis_mysql
    リソースグループ 環境の準備 から Serverless リソースグループを選択します。
    [宛先] — データ送信先タイプ MaxCompute(ODPS)
    送信先 — データソース名 環境の準備 でアタッチした MaxCompute リソースを選択します (例: MaxCompute_Source)。
  3. 同期タスクの詳細を設定するには、[次へ] をクリックします。

    1. 送信元と送信先の設定 次のパラメーターを設定します。一覧にないパラメーターについては、デフォルト値をそのまま使用します。

      モジュール 設定項目
      データソース テーブル ods_user_info_d
      データソース 分割キー uid (MySQL の ods_user_info_d テーブルのプライマリキー)
      デスティネーション トンネルリソースグループ 共通転送リソース (デフォルト) です。専用トンネルクォータがある場合は、ドロップダウンリストから選択できます。詳細については、「Data Transmission Service の専用リソースグループの購入と使用」をご参照ください。
      デスティネーション テーブル [宛先テーブルスキーマの生成] をクリックします。[テーブル作成ステートメント] ボックスに、以下の DDL を貼り付け、[テーブルの作成] をクリックします:
      モジュール 設定項目
      送信先 パーティション情報 ${bizdate}
         CREATE TABLE IF NOT EXISTS ods_user_info_d (
             uid STRING COMMENT 'User ID',
             gender STRING COMMENT 'Gender',
             age_range STRING COMMENT 'Age range',
             zodiac STRING COMMENT 'Zodiac sign'
         )
         PARTITIONED BY (
             dt STRING
         )
         LIFECYCLE 7;

      ${bizdate} を使用すると、テスト中に定数を渡し、スケジュールされた実行中に動的な値を渡すことができます。サポートされている変数フォーマットについては、「スケジューリングパラメーター」をご参照ください。

    2. フィールドマッピングとチャンネルコントロールを確認してください。 DataWorks では、フィールドマッピング構成に基づいて、ソースフィールドを送信先フィールドにマップします。 [不正データレコードのポリシー][不正データレコードを許可しない] に設定します。その他の設定はすべてデフォルト値のままにしてください。詳細については、「コードレス UI を使用した同期タスクの設定」をご参照ください。

  4. デバッグパラメーターを設定します。右側のペインで、[デバッグ設定] をクリックし、以下を設定します。

    構成項目
    リソースグループ 環境の準備 で作成した Serverless リソースグループを選択します。
    スクリプトパラメーター 空白のままにしてください。手順 4 でワークフローを実行する際、[この実行で使用する値]20250223 などの定数に設定します。ジョブは、この値で ${bizdate} を置き換えます。
  5. (オプション) スケジューリングプロパティを設定します。右側ペインで、[スケジューリング] をクリックします。ワークフロー レベルのスケジューリング パラメーターは既にこのノードに適用されています。個別の設定は必要ありません。詳細については、「ノード スケジューリング」をご参照ください。

  6. ノードツールバーで、[保存] をクリックします。

ユーザーログ同期タスク (ods_raw_log_d) の構成

  1. ワークフローキャンバスで、ods_raw_log_d にマウスオーバーし、[ノードを開く] をクリックします。

  2. ソースと送信先を構成します。

    パラメーター
    ソース — データソースタイプ HttpFile
    ソース — データソース名 user_behavior_analysis_httpfile
    リソースグループ 環境の準備 から Serverless リソースグループを選択します。
    宛先 — データ送信先タイプ MaxCompute(ODPS)
    宛先 — データソース名 環境の準備 から MaxCompute リソースを選択します (例: MaxCompute_Source)。
  3. 同期タスクの詳細を設定するには、[次へ] をクリックします。

    1. [送信元と送信先を設定する] 以下のパラメーターを設定します。一覧に記載されていないパラメーターについては、デフォルト値を保持します。

      重要

      データソースを設定した後、[データ構造の確認] をクリックして、続行する前に DataWorks がログファイルを正しく読み取れることを確認します。

      モジュール 構成項目
      データソース ファイルパス /user_log.txt
      データソース テキストタイプ text
      データソース 列区切り文字 |
      データソース 詳細設定 > ヘッダー行をスキップ いいえ
      送信先 Tunnel リソースグループ 共通伝送リソース(デフォルト)。専用の Tunnel クォータをお持ちの場合は、ドロップダウンリストから選択できます。詳細については、「Data Transmission Service の専用リソースグループの購入と利用」をご参照ください。
      送信先 テーブル 送信先テーブルスキーマの生成 をクリックします。テーブル作成ステートメント ボックスに、以下の DDL を貼り付け、テーブルを作成 をクリックします:
      モジュール 設定項目
      送信先 パーティション情報 ${bizdate}
         CREATE TABLE IF NOT EXISTS ods_raw_log_d
         (
             col STRING
         )
         PARTITIONED BY
         (
             dt STRING
         )
         LIFECYCLE 7;
    2. フィールドマッピングとチャンネルコントロールを確認します。 [ダーティデータレコードのポリシー][ダーティデータレコードを許可しない] に設定します。 他のすべての設定はデフォルト値のままにします。 詳細については、「コードレス UI を使用して同期タスクを設定する」をご参照ください。

  4. デバッグパラメーターを設定します。右側のペインで、[デバッグ設定] をクリックし、以下を設定します。

    設定項目
    リソースグループ 事前準備」から、Serverless のリソースグループを選択します。
    スクリプトパラメーター 空欄のままにしてください。手順 4 でワークフローを実行する際には、今回の実行で使用する値 を、20250223 のような定数に設定します。
  5. (オプション) スケジューリングプロパティを設定します。 右側のペインで、[スケジューリング] をクリックします。 ワークフローレベルのパラメーターがこのノードに自動的に適用されます。 詳細については、「ノードのスケジューリング」をご参照ください。

  6. ノードツールバーで、[保存] をクリックします。

ステップ 4: データ同期

  1. ワークフローツールバーで、[実行] をクリックします。[この実行で使用する値]bizdate 変数に設定します(例: 20250223) — そして [OK] をクリックします。実行が完了するまで待ちます。

  2. 結果を確認します。

    1. DataStudio の左側のナビゲーションウィンドウで、image をクリックしてデータ開発ページを開きます。個人フォルダエリアで、image をクリックして .sql 拡張子のファイルを作成します。

    2. ページの下部で、言語モードが MaxCompute SQL であることを確認します。image

    3. 次のクエリを実行して、各テーブルにロードされたレコードをカウントします。your_data_timestamp を、実行で使用された bizdate の実際の値 (例: 20250223) に置き換えます。

         -- Replace your_data_timestamp with the actual bizdate value (e.g., 20250223)
         SELECT count(*) FROM ods_user_info_d WHERE dt='your_data_timestamp';
         SELECT count(*) FROM ods_raw_log_d WHERE dt='your_data_timestamp';
      • 両方のクエリがゼロ以外のカウントを返す場合、データ同期は完了しています。

      • クエリがゼロを返した場合、[この実行で使用された値] がクエリ内の dt 値と一致していることを確認してください。確認するには、ワークフローをクリックし、右側ペインで [実行履歴] を開き、[操作] 列の [表示] をクリックして、実行ログ内のパーティション値 (partition=[pt=xxx]) を確認します。

学習した内容

このチュートリアルでは、次のことを行いました。

  • DataWorks ワークスペースに MySQL データソースと HttpFile データソースを追加しました。

  • Zero Load Node と 2 つのバッチ同期ノードを含む user_profile_analysis ワークフローを構築しました。

  • 各同期タスクのソースと送信先テーブル、パーティションパラメーター、およびダーティデータポリシーを構成しました。

  • パイプラインを実行し、データが MaxCompute の ods_user_info_d テーブルと ods_raw_log_d テーブルに書き込まれたことを確認しました。

次のステップ

データの処理 に進み、MaxCompute で同期されたデータを変換および分析します。