すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データの同期

最終更新日:Feb 13, 2026

このトピックでは、ユーザー情報および Web サイトのログデータを同期する方法について説明します。HttpFile および MySQL のデータソースを作成し、Object Storage Service (OSS) バケットへのデータ同期パイプラインを構成して、データを解析するための Spark 外部テーブルを作成します。最後に、クエリを実行してデータが正しく同期されたことを確認します。

前提条件

開始前に、必要な環境を準備してください。詳細については、「環境の準備」をご参照ください。

1. データソースの作成

後続のステップでデータを処理できるようにするには、生データを取得するために DataWorks ワークスペースに以下のデータソースを追加する必要があります。

  • MySQL データソース:このチュートリアルでは、user_behavior_analysis_mysql という名前のデータソースを使用して、MySQL から基本的なユーザー情報 (ods_user_info_d) を取得します。

  • HttpFile データソース:このチュートリアルでは、user_behavior_analysis_httpfile という名前のデータソースを使用して、OSS に保存されているユーザーの Web サイトアクセスログ (user_log.txt) を取得します。

  • OSS データソース:MySQL および HttpFile データソースから同期されたユーザー情報および Web サイトアクセスレコードを格納します。後ほど、Spark 外部テーブルがこのデータを読み取ります。

MySQL データソース (user_behavior_analysis_mysql) の作成

このチュートリアルで提供されるユーザー情報は、MySQL データベースに保存されています。環境の準備 ステップで作成した非公開 OSS バケットに、MySQL データベースからユーザー情報データ (ods_user_info_d) を同期するための MySQL データソースを作成します。

  1. Data Source ページに移動します。

    1. DataWorks コンソール にログインします。上部のナビゲーションバーで目的のリージョンを選択し、左側のナビゲーションウィンドウで その他 > 管理センター を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、管理センターへ移動 をクリックします。

    2. 左側のナビゲーションウィンドウで、データソース をクリックして Data Source ページに移動します。

  2. データソースの追加 をクリックします。データソースタイプとして MySQL を検索して選択します。

  3. MySQL データソースの追加 ページでパラメーターを構成します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。

    以下の表は主要なパラメーターを示しています。その他のパラメーターはデフォルト値のままにしてください。

    パラメーター

    説明

    データソース名

    データソースの名前を入力します。このチュートリアルでは、user_behavior_analysis_mysql と入力します。

    データソースの説明

    このデータソースは DataWorks チュートリアル用です。バッチ同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースはデータ統合シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。

    構成モード

    接続文字列モード を選択します。

    接続アドレス

    • ホスト IP アドレス:rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com

    • ポート:3306

    データベース名

    データベース名を入力します。このチュートリアルでは、workshop と入力します。

    ユーザー名

    ユーザー名を入力します。このチュートリアルでは、workshop と入力します。

    パスワード

    パスワードを入力します。このチュートリアルでは、workshop#2017 と入力します。

    認証方法

    認証なし。

  4. 接続構成 セクションで、本番環境および開発環境の両方について ネットワーク接続性のテスト をクリックします。接続ステータスが 接続済み であることを確認してください。

    重要
    • リソースグループがワークスペースにアタッチされており、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期が失敗します。詳細については、「環境の準備」をご参照ください。

    • 利用可能なリソースグループがない場合は、接続構成セクションのプロンプトに従って、購入 および 購入済みリソースグループの関連付け をクリックします。

  5. 作成完了 をクリックします。

HttpFile データソース (user_behavior_analysis_httpfile) の作成

このチュートリアルで提供される Web サイトアクセスレコードは OSS に保存されています。環境の準備 ステップで作成した非公開 OSS バケットに、OSS から Web サイトアクセスレコード (user_log.txt) を同期するための HttpFile データソースを作成します。

  1. 左側のナビゲーションウィンドウで データソース をクリックします。

  2. データソースの追加 をクリックします。データソースの追加 ダイアログボックスで、データソースタイプとして HttpFile を検索して選択します。

  3. HttpFile データソースの追加 ページでパラメーターを構成します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。

    以下の表は主要なパラメーターを示しています。その他のパラメーターはデフォルト値のままにしてください。

    パラメーター

    説明

    データソース名

    データソース名を入力します。このチュートリアルでは、user_behavior_analysis_httpfile と入力します。

    データソースの説明

    このデータソースは DataWorks チュートリアル用です。バッチ同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースはデータ統合シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。

    URL

    開発環境および本番環境の両方で URLhttps://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com に設定します。

  4. 接続構成 セクションで、本番環境および開発環境の両方について ネットワーク接続性のテスト をクリックします。接続ステータスが 接続済み であることを確認してください。

    重要
    • リソースグループがワークスペースにアタッチされており、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期が失敗します。詳細については、「環境の準備」をご参照ください。

    • 利用可能なリソースグループがない場合は、接続構成セクションのプロンプトに従って、購入 および 購入済みリソースグループの関連付け をクリックします。

  5. 作成完了 をクリックします。

OSS データソースの作成

このチュートリアルでは、MySQL データソースからユーザー情報を同期し、HttpFile データソースからログ情報を同期して、OSS データソースに格納します。

  1. 管理センター ページで データソース ページに移動し、データソースの追加 をクリックします。

  2. データソースの追加 ダイアログボックスで、データソースタイプとして OSS を検索して選択します。

  3. OSS データソースの追加 ページでパラメーターを構成します。このチュートリアルでは、開発環境および本番環境の両方でサンプル値を使用します。

    パラメーター

    説明

    データソース名

    データソースの名前を入力します。このチュートリアルでは test_g を使用します。

    データソースの説明

    データソースの簡単な説明を入力します。

    アクセスモード

    AccessKey モード を選択します。

    AccessKey ID

    現在のアカウントの AccessKey ID です。AccessKey ページから AccessKey ID をコピーできます。

    AccessKey Secret

    現在のアカウントの AccessKey Secret を入力します。

    重要

    AccessKey Secret は作成時のみ表示され、後から確認することはできません。安全に保管してください。AccessKey が漏洩または紛失した場合は、削除して新しいものを作成してください。

    Endpoint

    http://oss-cn-shanghai-internal.aliyuncs.com を入力します。

    バケット

    非公開 OSS バケット の名前です。このチュートリアルでは dw-spark-demo を使用します。

  4. 接続ステータス (開発環境) および 接続ステータス (本番環境) 列の指定されたリソースグループについて、ネットワーク接続性のテスト をクリックします。テストが完了し、ステータスが 接続済み に変化するまで待ちます。

    説明

    少なくとも 1 つのリソースグループが 接続済み 状態である必要があります。そうでない場合、コードレス UI を使用してこのデータソースの同期タスクを作成できません。

  5. 作成完了 をクリックして OSS データソースを作成します。

2. 同期パイプラインの構築

  1. 左上隅の image アイコンをクリックし、すべてのプロダクト > データ開発およびタスク運用 > DataStudio を選択します。

  2. 左側のナビゲーションウィンドウで image をクリックします。ワークスペースディレクトリ エリアで image をクリックし、ワークフローの作成 を選択して、ワークフロー名を設定します。このチュートリアルでは User_profile_analysis_spark を使用します。

  3. ワークフローのオーケストレーションページで、左側からキャンバス上に ゼロロード ノード、バッチ同期 ノード、および EMR Spark SQL ノードをドラッグし、各ノードに名前を設定します。

    このチュートリアルでのノード名の例とその機能は以下のとおりです。

    ノードタイプ

    ノード名

    機能

    image ゼロロード

    workshop_start_spark

    ユーザー プロファイル分析ワークフロー全体(内部ノードの開始時刻など)を管理します。複雑なワークフローにおけるデータ転送パスを明確にします。このノードはドライランタスクであり、コード編集は不要です。

    imageバッチ同期ノード

    ods_raw_log_d_2oss_spark

    OSS からユーザーの Web サイトアクセスログを、作成した OSS バケットに同期します。

    imageバッチ同期ノード

    ods_user_info_d_2oss_spark

    MySQL からユーザー情報を、作成した OSS バケットに同期します。

    imageEMR SPARK SQL

    ods_raw_log_d_spark

    OSS に保存されているユーザーの Web サイトアクセスログを読み取るための ods_raw_log_d_spark 外部テーブルを作成します。

    imageEMR SPARK SQL

    ods_user_info_d_spark

    OSS に保存されているユーザー情報を読み取るための ods_user_info_d_spark 外部テーブルを作成します。

  4. 手動で線をドラッグしてノードを接続します。workShop_start_spark ノードを 2 つのバッチ同期ノードの先祖ノードに設定します。最終的な結果は以下のとおりです。

    image
  5. ワークフローのスケジューリングプロパティを構成します。

    ワークフローのキャンバスで、右側のペインの スケジューリング をクリックしてパラメーターを構成します。以下の表は主要なパラメーターを示しています。その他のパラメーターはデフォルト値のままにしてください。

    スケジューリングパラメーター

    説明

    スケジューリングパラメーター

    ワークフロー全体のスケジューリングパラメーターを構成できます。ワークフローの内部ノードは、構成されたスケジューリングパラメーターを直接使用できます。このチュートリアルでは、前日の日付を取得するためにパラメーターを bizdate=$[yyyymmdd-1] に設定します。

    スケジューリング周期

    このチュートリアルでは に設定します。

    スケジューリング時刻

    このチュートリアルでは、スケジューリング時刻00:30 に設定します。つまり、ワークフローは毎日 00:30 に開始されます。

    スケジューリング依存関係

    ワークフローに上流の依存関係がないため、構成する必要はありません。管理を容易にするために、ワークスペースルートノードを使用 をクリックして、ワークフローをワークスペースのルートノードにアタッチします。

    ワークスペースルートノードの命名形式は workspace_name_root です。

3. 同期タスクの構成

初期ノードの構成

  1. ワークフローのオーケストレーションページで、workshop_start_spark ノードにカーソルを合わせ、ノードを開く をクリックします。

  2. workshop_start_spark ノードの編集ページの右側で、スケジューリング をクリックしてパラメーターを構成します。以下の表はこのチュートリアルの主要なパラメーターを示しています。言及されていないパラメーターはデフォルト値のままにしてください。

    スケジューリング構成パラメーター

    説明

    リソースグループ

    このチュートリアルでは、環境の準備 ステップで作成したサーバーレスリソースグループを選択します。

    ノード依存関係構成

    workshop_start_spark は初期ノードであるため、上流の依存関係がありません。ワークスペースルートノードを使用 をクリックして、ワークスペースルート頂点によってワークフローをトリガーします。

    ワークスペースルート頂点の名前は次のとおりです:WorkspaceName_root

ユーザー ログ同期パイプライン (ods_raw_log_d_2oss_spark) の構成

バッチ ods_raw_log_d_2oss_spark ノードは、HttpFile データソースからユーザー ログ情報を非公開 OSS データソースに同期します。

  1. ビジネスフロー開発パネルで、ods_raw_log_d_2oss_spark ノードにカーソルを合わせ、ノードを開くボタンをクリックしてノード構成ページに移動します。

  2. ネットワークとリソースを構成します。

    パラメーター

    説明

    ソース

    • データソース:HttpFile

    • データソース名:user_behavior_analysis_httpfile

    マイ リソースグループ

    環境準備時に購入したサーバーレスリソースグループを選択します。

    宛先

    • データ宛先:OSS

    • データソース名:作成した非公開 OSS データソースを選択します。このチュートリアルでは test_g を使用します。

    重要

    ネットワーク接続が失敗した場合は、サーバーレスリソースグループでパブリックネットワークが有効になっているか確認してください。

  3. 次へ をクリックして同期タスクを構成します。

    • データソースおよび宛先の構成

      以下の表はこのチュートリアルの主要なパラメーターを示しています。言及されていないパラメーターはデフォルト値のままにしてください。

      パラメーター

      説明

      ソース

      • ファイルパス/user_log.txt

      • テキストタイプtext を選択します。

      • カラム区切り文字| を入力します。

      • 圧縮フォーマットなし を選択します。

      • ヘッダーをスキップ:いいえを選択します。

      宛先

      • テキストタイプtext を選択します。

      • ファイル名 (パスを含む):OSS ディレクトリに基づいてパスを入力します。例:ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt。このパスでは、ods_raw_log_d は作成したディレクトリ、$bizdate は前日の日付を表します。

      • カラム区切り文字| を入力します。

    • フィールドマッピング および チャネル制御 を確認します。

      DataWorks は、構成されたフィールドマッピングに基づいて、指定されたソースフィールドから指定された宛先フィールドにデータを同期します。同時実行数を設定したり、ダーティデータのポリシーを構成したりすることもできます。このチュートリアルでは、ダーティデータレコードのポリシーダーティデータレコードを許可しない に設定します。その他の設定はデフォルト値のままにしてください。詳細については、「コードレス UI を使用した同期タスクの構成」をご参照ください。

  4. テストパラメーターを構成します。

    バッチ同期タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。

    構成項目

    構成の説明

    リソースグループ

    環境の準備 ステップで購入したサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    パラメーターの追加 をクリックし、bizdate を yyyymmdd 形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。

  5. (任意)スケジューリングプロパティを構成します。

    このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。

    • スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。

    • スケジューリングポリシー遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。

  6. 上部のツールバーで、保存 をクリックして現在のノードを保存します。

ユーザー データ同期パイプライン (ods_user_info_d_2oss_spark) の構成

バッチ ods_user_info_d_2oss_Spark ノードは、MySQL データソースからユーザー データを非公開 OSS データソースに同期します。

  1. ワークフローのオーケストレーションページで、ods_user_info_d_2oss_Spark ノードにカーソルを合わせ、ノードを開く をクリックします。

  2. 同期パイプラインのネットワークとリソースを構成します。

    パラメーター

    説明

    ソース

    • データソース:MySQL

    • データソース名:user_behavior_analysis_mysql

    マイ リソースグループ

    環境準備時に購入したサーバーレスリソースグループを選択します。

    宛先

    • データ宛先:OSS

    • データソース名:作成した非公開 OSS データソースを選択します。このチュートリアルでは test_g を使用します。

    重要

    ネットワーク接続が失敗した場合は、サーバーレスリソースグループでパブリックネットワークが有効になっているか確認してください。

  3. 次へ をクリックして同期タスクを構成します。

    • データソースおよび宛先の構成

      以下の表はこのチュートリアルの主要なパラメーターを示しています。言及されていないパラメーターはデフォルト値のままにしてください。

      パラメーター

      説明

      ソース

      • テーブル:データソースから ods_user_info_d を選択します。

      • 分割キー:プライマリキーまたはインデックス付きカラムを分割キーとして使用します。整数型フィールドのみサポートされています。分割キーを uid に設定します。

      宛先

      • テキストタイプtext を選択します。

      • ファイル名 (パスを含む):OSS ディレクトリに基づいてパスを入力します。例:ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。このパスでは、ods_user_info_d は作成したディレクトリ、$bizdate は前日の日付を表します。

      • カラム区切り文字| を入力します。

    • フィールドマッピング および チャネル制御 を確認します。

      DataWorks は、構成されたフィールドマッピングに基づいて、指定されたソースフィールドから指定された宛先フィールドにデータを同期します。同時実行数を設定したり、ダーティデータのポリシーを構成したりすることもできます。このチュートリアルでは、ダーティデータレコードのポリシーダーティデータレコードを許可しない に設定します。その他の設定はデフォルト値のままにしてください。詳細については、「コードレス UI を使用した同期タスクの構成」をご参照ください。

  4. テストパラメーターを構成します。

    バッチ同期タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。

    構成項目

    構成の説明

    リソースグループ

    環境の準備 ステップで購入したサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    パラメーターの追加 をクリックし、bizdate を yyyymmdd 形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。

  5. (任意)スケジューリングプロパティを構成します。

    このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。

    • スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。

    • スケジューリングポリシー遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。

  6. 上部のツールバーで、保存 をクリックして現在のノードを保存します。

4. データの同期

  1. データを同期します。

    ワークフローのキャンバス上部のツールバーで、実行 をクリックします。この実行で各ノードに定義されたパラメーター変数の値を設定します。このチュートリアルでは 20250223 を使用しますが、必要に応じて変更できます。OK をクリックして、実行が完了するまで待ちます。

  2. 同期結果を表示します。

    ワークフローが正常に実行された後、OSS コンソール にログインします。新しく作成された OSS データソース のバケットを確認し、/ods_user_info_d および /ods_raw_log_d ディレクトリに対応するディレクトリとデータが存在するかどうかを確認します。

5. データの解析

データが同期された後、Spark SQL を使用して外部テーブルを作成し、OSS に保存されているユーザー情報および Web サイトアクセスレコードを解析します。

ログテーブル (ods_raw_log_d_spark) の作成とデータの解析

バッチ統合タスクによってデータが非公開 OSS データソースに同期された後、生成された OSS ファイルに基づいて EMR SPARK SQL ノードによって作成された外部テーブル ods_raw_log_d_spark に書き込まれます。

  1. ワークフローのオーケストレーションページで、ods_raw_log_d_spark ノードにカーソルを合わせ、ノードを開く をクリックします。

  2. テーブル作成文を編集します。

    Paimon テーブル (DLF)

    -- 1. 追加専用の Paimon テーブルを作成します。
    CREATE TABLE IF NOT EXISTS ods_raw_log_d_spark (
      `col` STRING,
      `dt` STRING   -- パーティションキーを通常のカラムとしてもテーブルに含めます。これは Paimon の推奨プラクティスです。
    )
    PARTITIONED BY (dt)
    TBLPROPERTIES (
      'format' = 'paimon'
    );
    
    -- 2. OSS 上のソースファイルを指し示し、解析する一時ビューを作成します。
    CREATE TEMPORARY VIEW source_of_logs
    (
        -- ビューにはプレーンテキスト行全体を読み取るための 1 つのカラムのみがあります。
        `value` STRING 
    )
    USING TEXT
    OPTIONS (
        path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'
    );
    
    INSERT INTO ods_raw_log_d_spark
    SELECT 
      value,      -- 生ログ行
      '${bizdate}' AS dt -- SELECT 文でパーティション値を直接指定します。
    FROM source_of_logs;

    Hive テーブル (DLF-Legacy)

    -- シナリオ:以下の Spark SQL 文は、EMR Spark SQL ノードを使用して ods_raw_log_d_spark 外部テーブルを作成します。LOCATION 句を使用して、バッチデータ統合タスクによって非公開 OSS バケットに書き込まれたログ情報を取得し、対応する dt パーティションを追加します。
    -- 注:
    --      DataWorks はスケジューリングパラメーターを提供しており、スケジューリングシナリオでは、日々の増分データをターゲットテーブルの対応するビジネスパーティションに書き込むことができます。
    --      実際の開発シナリオでは、${variable_name} 形式でコード変数を定義し、スケジューリング構成ページで値を割り当てることができます。これにより、スケジュールされた実行中にコード内で動的パラメーター入力が可能になります。
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
    (
      `col` STRING
    ) 
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/';
    
    ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'
    
    説明

    コード内の location アドレスを実際のアドレスに置き換えてください。dw-spark-demo は、OSS 環境の準備 時に作成した OSS バケットの名前です。

  3. テストパラメーターを構成します。

    EMR SPARK SQL タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。

    構成項目

    構成の説明

    コンピューティングリソース

    環境の準備 ステップでアタッチした Spark コンピューティングリソースを選択します。

    リソースグループ

    環境の準備 ステップで購入したサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    パラメーターの追加 をクリックし、bizdate を yyyymmdd 形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。

  4. (任意)スケジューリングプロパティを構成します。

    このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。

    • スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。

    • スケジューリングポリシー遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。

  5. 上部のツールバーで、保存 をクリックして現在のノードを保存します。

ユーザーテーブル (ods_user_info_d_spark) の作成とデータの解析

バッチ統合タスクによってデータが非公開 OSS データソースに同期された後、生成された OSS ファイルに基づいて外部テーブル ods_user_info_d_spark に書き込まれます。

  1. ワークフローのオーケストレーションページで、ods_user_info_d_spark ノードにカーソルを合わせ、ノードを開く をクリックします。

  2. 同期リンク、ネットワーク、およびリソースを構成します。

    Paimon テーブル (DLF)

    -- 1. 宛先として Paimon テーブルを作成します。
    CREATE TABLE IF NOT EXISTS ods_user_info_d_spark (
        `uid`        STRING COMMENT 'ユーザー ID',
        `gender`     STRING COMMENT '性別',
        `age_range`  STRING COMMENT '年齢層',
        `zodiac`     STRING COMMENT '星座',
        `dt`         STRING COMMENT 'パーティション日付'
    ) 
    PARTITIONED BY (dt)
    TBLPROPERTIES (
        'format' = 'paimon'
    );
    
    -- 2. OSS 上のソースファイルを指し示し、解析する一時ビューを作成します。
    CREATE TEMPORARY VIEW source_of_user_info
    (
        -- ビューにはプレーンテキスト行全体を読み取るための 1 つのカラムのみがあります。
        `value` STRING 
    )
    USING TEXT -- Spark に対してプレーンテキストファイルであることを指示します。
    OPTIONS (
        path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/'
    );
    
    -- 3. 一時ビューからデータをクエリおよび解析し、Paimon テーブルに挿入します。
    INSERT INTO ods_user_info_d_spark
    SELECT 
        -- split 関数を使用して、生テキスト行を '|' で分割します。
        split(value, '\\|')[0] AS uid,
        split(value, '\\|')[1] AS gender,
        split(value, '\\|')[2] AS age_range,
        split(value, '\\|')[3] AS zodiac,
        '${bizdate}' AS dt -- パーティションフィールドに値を割り当てます。
    FROM 
        source_of_user_info;

    Hive テーブル (DLF-Legacy)

    -- シナリオ:以下の Spark SQL 文は、EMR Spark SQL ノードを使用して ods_user_info_d_spark 外部テーブルを作成します。LOCATION 句を使用して、バッチデータ統合タスクによって非公開 OSS バケットに書き込まれたユーザー情報を取得し、対応する dt パーティションに書き込みます。
    -- 注:
    --      DataWorks はスケジューリングパラメーターを提供しており、スケジューリングシナリオでは、日々の増分データをターゲットテーブルの対応するビジネスパーティションに書き込むことができます。
    --      実際の開発シナリオでは、${variable_name} 形式でコード変数を定義し、スケジューリング構成ページで値を割り当てることができます。これにより、スケジュールされた実行中にコード内で動的パラメーター入力が可能になります。
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
    (
        `uid`        STRING COMMENT 'ユーザー ID'
        ,`gender`    STRING COMMENT '性別'
        ,`age_range` STRING COMMENT '年齢層'
        ,`zodiac`    STRING COMMENT '星座'
    )
    PARTITIONED BY 
    (
        dt           STRING
    )
    ROW FORMAT DELIMITED 
    FIELDS
    TERMINATED
    BY'|'
    STORED AS TEXTFILE
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/'
    ;
    
    ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/'
    ;
    説明

    コード内の location アドレスを実際のアドレスに置き換えてください。dw-spark-demo は、OSS 環境の準備 時に作成した OSS バケットの名前です。

  3. テストパラメーターを構成します。

    EMR SPARK SQL タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。

    構成項目

    構成の説明

    コンピューティングリソース

    環境の準備 ステップでアタッチした Spark コンピューティングリソースを選択します。

    リソースグループ

    環境の準備 ステップで購入したサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    パラメーターの追加 をクリックし、bizdate を yyyymmdd 形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。

  4. (任意)スケジューリングプロパティを構成します。

    このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。

    • スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。

    • スケジューリングポリシー遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。

  5. 上部のツールバーで、保存 をクリックして現在のノードを保存します。

6. タスクの実行

  1. データを同期します。

    ワークフローツールバーで、実行 をクリックします。この実行で各ノードに定義されたパラメーター変数の値を設定します。このチュートリアルでは 20250223 を使用しますが、必要に応じて値を変更できます。その後、OK をクリックして、実行が完了するまで待ちます。

  2. データ同期結果をクエリします。

    このセクションのすべてのノードが正常に実行された後、以下の SQL クエリを記述して、EMR SPARK SQL ノードによって作成された外部テーブルが正しく生成されたかどうかを確認します。

    1. ods_raw_log_d_spark テーブルの結果を検証します。

      -- 現在の操作の実際のデータタイムスタンプにパーティションフィルター条件を更新する必要があります。たとえば、タスクが 20250223 に実行された場合、データタイムスタンプはタスク実行日の前日である 20250222 です。
      SELECT * FROM ods_raw_log_d_spark WHERE dt='${bizdate}';--ods_raw_log_d_spark テーブルをクエリ

    2. ods_user_info_d_spark テーブルの結果を検証します。

      -- 現在の操作の実際のデータタイムスタンプにパーティションフィルター条件を更新する必要があります。たとえば、タスクが 20250223 に実行された場合、データタイムスタンプはタスク実行日の前日である 20250222 です。
      SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}';--ods_user_info_d_spark テーブルをクエリ

次のステップ

これで、ログデータを同期する方法を学びました。次のチュートリアルに進み、同期されたデータを処理および分析する方法を学ぶことができます。詳細については、「データの処理」をご参照ください。

よくある質問

  • Q:テーブルを作成する際に、エラー Option 'path' is not allowed for Normal Paimon table, please remove it in table options. が報告されます。

    A:これは構文エラーです。このトピックの Paimon テーブル (DLF) セクションに示されているとおりにテーブルを作成してください。