このトピックでは、ユーザー情報および Web サイトのログデータを同期する方法について説明します。HttpFile および MySQL のデータソースを作成し、Object Storage Service (OSS) バケットへのデータ同期パイプラインを構成して、データを解析するための Spark 外部テーブルを作成します。最後に、クエリを実行してデータが正しく同期されたことを確認します。
前提条件
開始前に、必要な環境を準備してください。詳細については、「環境の準備」をご参照ください。
1. データソースの作成
後続のステップでデータを処理できるようにするには、生データを取得するために DataWorks ワークスペースに以下のデータソースを追加する必要があります。
MySQL データソース:このチュートリアルでは、
user_behavior_analysis_mysqlという名前のデータソースを使用して、MySQL から基本的なユーザー情報 (ods_user_info_d) を取得します。HttpFile データソース:このチュートリアルでは、
user_behavior_analysis_httpfileという名前のデータソースを使用して、OSS に保存されているユーザーの Web サイトアクセスログ (user_log.txt) を取得します。OSS データソース:MySQL および HttpFile データソースから同期されたユーザー情報および Web サイトアクセスレコードを格納します。後ほど、Spark 外部テーブルがこのデータを読み取ります。
MySQL データソース (user_behavior_analysis_mysql) の作成
このチュートリアルで提供されるユーザー情報は、MySQL データベースに保存されています。環境の準備 ステップで作成した非公開 OSS バケットに、MySQL データベースからユーザー情報データ (ods_user_info_d) を同期するための MySQL データソースを作成します。
Data Source ページに移動します。
DataWorks コンソール にログインします。上部のナビゲーションバーで目的のリージョンを選択し、左側のナビゲーションウィンドウで を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、管理センターへ移動 をクリックします。
左側のナビゲーションウィンドウで、データソース をクリックして Data Source ページに移動します。
データソースの追加 をクリックします。データソースタイプとして MySQL を検索して選択します。
MySQL データソースの追加 ページでパラメーターを構成します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
以下の表は主要なパラメーターを示しています。その他のパラメーターはデフォルト値のままにしてください。
パラメーター
説明
データソース名
データソースの名前を入力します。このチュートリアルでは、
user_behavior_analysis_mysqlと入力します。データソースの説明
このデータソースは DataWorks チュートリアル用です。バッチ同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースはデータ統合シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。
構成モード
接続文字列モード を選択します。
接続アドレス
ホスト IP アドレス:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.comポート:
3306
データベース名
データベース名を入力します。このチュートリアルでは、
workshopと入力します。ユーザー名
ユーザー名を入力します。このチュートリアルでは、
workshopと入力します。パスワード
パスワードを入力します。このチュートリアルでは、
workshop#2017と入力します。認証方法
認証なし。
接続構成 セクションで、本番環境および開発環境の両方について ネットワーク接続性のテスト をクリックします。接続ステータスが 接続済み であることを確認してください。
重要リソースグループがワークスペースにアタッチされており、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期が失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続構成セクションのプロンプトに従って、購入 および 購入済みリソースグループの関連付け をクリックします。
作成完了 をクリックします。
HttpFile データソース (user_behavior_analysis_httpfile) の作成
このチュートリアルで提供される Web サイトアクセスレコードは OSS に保存されています。環境の準備 ステップで作成した非公開 OSS バケットに、OSS から Web サイトアクセスレコード (user_log.txt) を同期するための HttpFile データソースを作成します。
左側のナビゲーションウィンドウで データソース をクリックします。
データソースの追加 をクリックします。データソースの追加 ダイアログボックスで、データソースタイプとして HttpFile を検索して選択します。
HttpFile データソースの追加 ページでパラメーターを構成します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
以下の表は主要なパラメーターを示しています。その他のパラメーターはデフォルト値のままにしてください。
パラメーター
説明
データソース名
データソース名を入力します。このチュートリアルでは、
user_behavior_analysis_httpfileと入力します。データソースの説明
このデータソースは DataWorks チュートリアル用です。バッチ同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースはデータ統合シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。
URL
開発環境および本番環境の両方で URL を
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.comに設定します。接続構成 セクションで、本番環境および開発環境の両方について ネットワーク接続性のテスト をクリックします。接続ステータスが 接続済み であることを確認してください。
重要リソースグループがワークスペースにアタッチされており、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期が失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続構成セクションのプロンプトに従って、購入 および 購入済みリソースグループの関連付け をクリックします。
作成完了 をクリックします。
OSS データソースの作成
このチュートリアルでは、MySQL データソースからユーザー情報を同期し、HttpFile データソースからログ情報を同期して、OSS データソースに格納します。
管理センター ページで データソース ページに移動し、データソースの追加 をクリックします。
データソースの追加 ダイアログボックスで、データソースタイプとして OSS を検索して選択します。
OSS データソースの追加 ページでパラメーターを構成します。このチュートリアルでは、開発環境および本番環境の両方でサンプル値を使用します。
パラメーター
説明
データソース名
データソースの名前を入力します。このチュートリアルでは test_g を使用します。
データソースの説明
データソースの簡単な説明を入力します。
アクセスモード
AccessKey モード を選択します。
AccessKey ID
現在のアカウントの AccessKey ID です。AccessKey ページから AccessKey ID をコピーできます。
AccessKey Secret
現在のアカウントの AccessKey Secret を入力します。
重要AccessKey Secret は作成時のみ表示され、後から確認することはできません。安全に保管してください。AccessKey が漏洩または紛失した場合は、削除して新しいものを作成してください。
Endpoint
http://oss-cn-shanghai-internal.aliyuncs.comを入力します。バケット
非公開 OSS バケット の名前です。このチュートリアルでは
dw-spark-demoを使用します。接続ステータス (開発環境) および 接続ステータス (本番環境) 列の指定されたリソースグループについて、ネットワーク接続性のテスト をクリックします。テストが完了し、ステータスが 接続済み に変化するまで待ちます。
説明少なくとも 1 つのリソースグループが 接続済み 状態である必要があります。そうでない場合、コードレス UI を使用してこのデータソースの同期タスクを作成できません。
作成完了 をクリックして OSS データソースを作成します。
2. 同期パイプラインの構築
左上隅の
アイコンをクリックし、 を選択します。左側のナビゲーションウィンドウで
をクリックします。ワークスペースディレクトリ エリアで
をクリックし、ワークフローの作成 を選択して、ワークフロー名を設定します。このチュートリアルでは User_profile_analysis_sparkを使用します。ワークフローのオーケストレーションページで、左側からキャンバス上に ゼロロード ノード、バッチ同期 ノード、および EMR Spark SQL ノードをドラッグし、各ノードに名前を設定します。
このチュートリアルでのノード名の例とその機能は以下のとおりです。
ノードタイプ
ノード名
機能
ゼロロードworkshop_start_sparkユーザー プロファイル分析ワークフロー全体(内部ノードの開始時刻など)を管理します。複雑なワークフローにおけるデータ転送パスを明確にします。このノードはドライランタスクであり、コード編集は不要です。
バッチ同期ノードods_raw_log_d_2oss_sparkOSS からユーザーの Web サイトアクセスログを、作成した OSS バケットに同期します。
バッチ同期ノードods_user_info_d_2oss_sparkMySQL からユーザー情報を、作成した OSS バケットに同期します。
EMR SPARK SQLods_raw_log_d_sparkOSS に保存されているユーザーの Web サイトアクセスログを読み取るための
ods_raw_log_d_spark外部テーブルを作成します。
EMR SPARK SQLods_user_info_d_sparkOSS に保存されているユーザー情報を読み取るための
ods_user_info_d_spark外部テーブルを作成します。手動で線をドラッグしてノードを接続します。
workShop_start_sparkノードを 2 つのバッチ同期ノードの先祖ノードに設定します。最終的な結果は以下のとおりです。ワークフローのスケジューリングプロパティを構成します。
ワークフローのキャンバスで、右側のペインの スケジューリング をクリックしてパラメーターを構成します。以下の表は主要なパラメーターを示しています。その他のパラメーターはデフォルト値のままにしてください。
スケジューリングパラメーター
説明
スケジューリングパラメーター
ワークフロー全体のスケジューリングパラメーターを構成できます。ワークフローの内部ノードは、構成されたスケジューリングパラメーターを直接使用できます。このチュートリアルでは、前日の日付を取得するためにパラメーターを
bizdate=$[yyyymmdd-1]に設定します。スケジューリング周期
このチュートリアルでは
日に設定します。スケジューリング時刻
このチュートリアルでは、スケジューリング時刻 を
00:30に設定します。つまり、ワークフローは毎日00:30に開始されます。スケジューリング依存関係
ワークフローに上流の依存関係がないため、構成する必要はありません。管理を容易にするために、ワークスペースルートノードを使用 をクリックして、ワークフローをワークスペースのルートノードにアタッチします。
ワークスペースルートノードの命名形式は
workspace_name_rootです。
3. 同期タスクの構成
初期ノードの構成
ワークフローのオーケストレーションページで、
workshop_start_sparkノードにカーソルを合わせ、ノードを開く をクリックします。workshop_start_sparkノードの編集ページの右側で、スケジューリング をクリックしてパラメーターを構成します。以下の表はこのチュートリアルの主要なパラメーターを示しています。言及されていないパラメーターはデフォルト値のままにしてください。スケジューリング構成パラメーター
説明
リソースグループ
このチュートリアルでは、環境の準備 ステップで作成したサーバーレスリソースグループを選択します。
ノード依存関係構成
workshop_start_sparkは初期ノードであるため、上流の依存関係がありません。ワークスペースルートノードを使用 をクリックして、ワークスペースルート頂点によってワークフローをトリガーします。ワークスペースルート頂点の名前は次のとおりです:
WorkspaceName_root。
ユーザー ログ同期パイプライン (ods_raw_log_d_2oss_spark) の構成
バッチ ods_raw_log_d_2oss_spark ノードは、HttpFile データソースからユーザー ログ情報を非公開 OSS データソースに同期します。
ビジネスフロー開発パネルで、
ods_raw_log_d_2oss_sparkノードにカーソルを合わせ、ノードを開くボタンをクリックしてノード構成ページに移動します。ネットワークとリソースを構成します。
パラメーター
説明
ソース
データソース:
HttpFile。データソース名:
user_behavior_analysis_httpfile。
マイ リソースグループ
環境準備時に購入したサーバーレスリソースグループを選択します。
宛先
データ宛先:
OSS。データソース名:作成した非公開 OSS データソースを選択します。このチュートリアルでは test_g を使用します。
重要ネットワーク接続が失敗した場合は、サーバーレスリソースグループでパブリックネットワークが有効になっているか確認してください。
次へ をクリックして同期タスクを構成します。
データソースおよび宛先の構成
以下の表はこのチュートリアルの主要なパラメーターを示しています。言及されていないパラメーターはデフォルト値のままにしてください。
パラメーター
説明
ソース
ファイルパス:/user_log.txt。
テキストタイプ:text を選択します。
カラム区切り文字:| を入力します。
圧縮フォーマット:なし を選択します。
ヘッダーをスキップ:いいえを選択します。
宛先
テキストタイプ:text を選択します。
ファイル名 (パスを含む):OSS ディレクトリに基づいてパスを入力します。例:ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt。このパスでは、ods_raw_log_d は作成したディレクトリ、$bizdate は前日の日付を表します。
カラム区切り文字:| を入力します。
フィールドマッピング および チャネル制御 を確認します。
DataWorks は、構成されたフィールドマッピングに基づいて、指定されたソースフィールドから指定された宛先フィールドにデータを同期します。同時実行数を設定したり、ダーティデータのポリシーを構成したりすることもできます。このチュートリアルでは、ダーティデータレコードのポリシー を ダーティデータレコードを許可しない に設定します。その他の設定はデフォルト値のままにしてください。詳細については、「コードレス UI を使用した同期タスクの構成」をご参照ください。
テストパラメーターを構成します。
バッチ同期タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。
構成項目
構成の説明
リソースグループ
環境の準備 ステップで購入したサーバーレスリソースグループを選択します。
スクリプトパラメーター
パラメーターの追加 をクリックし、bizdate を
yyyymmdd形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。(任意)スケジューリングプロパティを構成します。
このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。
スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。
スケジューリングポリシー:遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。
上部のツールバーで、保存 をクリックして現在のノードを保存します。
ユーザー データ同期パイプライン (ods_user_info_d_2oss_spark) の構成
バッチ ods_user_info_d_2oss_Spark ノードは、MySQL データソースからユーザー データを非公開 OSS データソースに同期します。
ワークフローのオーケストレーションページで、
ods_user_info_d_2oss_Sparkノードにカーソルを合わせ、ノードを開く をクリックします。同期パイプラインのネットワークとリソースを構成します。
パラメーター
説明
ソース
データソース:
MySQL。データソース名:
user_behavior_analysis_mysql。
マイ リソースグループ
環境準備時に購入したサーバーレスリソースグループを選択します。
宛先
データ宛先:
OSS。データソース名:作成した非公開 OSS データソースを選択します。このチュートリアルでは
test_gを使用します。
重要ネットワーク接続が失敗した場合は、サーバーレスリソースグループでパブリックネットワークが有効になっているか確認してください。
次へ をクリックして同期タスクを構成します。
データソースおよび宛先の構成
以下の表はこのチュートリアルの主要なパラメーターを示しています。言及されていないパラメーターはデフォルト値のままにしてください。
パラメーター
説明
ソース
テーブル:データソースから ods_user_info_d を選択します。
分割キー:プライマリキーまたはインデックス付きカラムを分割キーとして使用します。整数型フィールドのみサポートされています。分割キーを uid に設定します。
宛先
テキストタイプ:text を選択します。
ファイル名 (パスを含む):OSS ディレクトリに基づいてパスを入力します。例:ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。このパスでは、ods_user_info_d は作成したディレクトリ、$bizdate は前日の日付を表します。
カラム区切り文字:| を入力します。
フィールドマッピング および チャネル制御 を確認します。
DataWorks は、構成されたフィールドマッピングに基づいて、指定されたソースフィールドから指定された宛先フィールドにデータを同期します。同時実行数を設定したり、ダーティデータのポリシーを構成したりすることもできます。このチュートリアルでは、ダーティデータレコードのポリシー を ダーティデータレコードを許可しない に設定します。その他の設定はデフォルト値のままにしてください。詳細については、「コードレス UI を使用した同期タスクの構成」をご参照ください。
テストパラメーターを構成します。
バッチ同期タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。
構成項目
構成の説明
リソースグループ
環境の準備 ステップで購入したサーバーレスリソースグループを選択します。
スクリプトパラメーター
パラメーターの追加 をクリックし、bizdate を
yyyymmdd形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。(任意)スケジューリングプロパティを構成します。
このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。
スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。
スケジューリングポリシー:遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。
上部のツールバーで、保存 をクリックして現在のノードを保存します。
4. データの同期
データを同期します。
ワークフローのキャンバス上部のツールバーで、実行 をクリックします。この実行で各ノードに定義されたパラメーター変数の値を設定します。このチュートリアルでは
20250223を使用しますが、必要に応じて変更できます。OK をクリックして、実行が完了するまで待ちます。同期結果を表示します。
ワークフローが正常に実行された後、OSS コンソール にログインします。新しく作成された OSS データソース のバケットを確認し、
/ods_user_info_dおよび/ods_raw_log_dディレクトリに対応するディレクトリとデータが存在するかどうかを確認します。
5. データの解析
データが同期された後、Spark SQL を使用して外部テーブルを作成し、OSS に保存されているユーザー情報および Web サイトアクセスレコードを解析します。
ログテーブル (ods_raw_log_d_spark) の作成とデータの解析
バッチ統合タスクによってデータが非公開 OSS データソースに同期された後、生成された OSS ファイルに基づいて EMR SPARK SQL ノードによって作成された外部テーブル ods_raw_log_d_spark に書き込まれます。
ワークフローのオーケストレーションページで、
ods_raw_log_d_sparkノードにカーソルを合わせ、ノードを開く をクリックします。テーブル作成文を編集します。
Paimon テーブル (DLF)
-- 1. 追加専用の Paimon テーブルを作成します。 CREATE TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING, `dt` STRING -- パーティションキーを通常のカラムとしてもテーブルに含めます。これは Paimon の推奨プラクティスです。 ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' ); -- 2. OSS 上のソースファイルを指し示し、解析する一時ビューを作成します。 CREATE TEMPORARY VIEW source_of_logs ( -- ビューにはプレーンテキスト行全体を読み取るための 1 つのカラムのみがあります。 `value` STRING ) USING TEXT OPTIONS ( path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/' ); INSERT INTO ods_raw_log_d_spark SELECT value, -- 生ログ行 '${bizdate}' AS dt -- SELECT 文でパーティション値を直接指定します。 FROM source_of_logs;Hive テーブル (DLF-Legacy)
-- シナリオ:以下の Spark SQL 文は、EMR Spark SQL ノードを使用して ods_raw_log_d_spark 外部テーブルを作成します。LOCATION 句を使用して、バッチデータ統合タスクによって非公開 OSS バケットに書き込まれたログ情報を取得し、対応する dt パーティションを追加します。 -- 注: -- DataWorks はスケジューリングパラメーターを提供しており、スケジューリングシナリオでは、日々の増分データをターゲットテーブルの対応するビジネスパーティションに書き込むことができます。 -- 実際の開発シナリオでは、${variable_name} 形式でコード変数を定義し、スケジューリング構成ページで値を割り当てることができます。これにより、スケジュールされた実行中にコード内で動的パラメーター入力が可能になります。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'; ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'説明コード内の location アドレスを実際のアドレスに置き換えてください。
dw-spark-demoは、OSS 環境の準備 時に作成した OSS バケットの名前です。テストパラメーターを構成します。
EMR SPARK SQL タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。
構成項目
構成の説明
コンピューティングリソース
環境の準備 ステップでアタッチした Spark コンピューティングリソースを選択します。
リソースグループ
環境の準備 ステップで購入したサーバーレスリソースグループを選択します。
スクリプトパラメーター
パラメーターの追加 をクリックし、bizdate を
yyyymmdd形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。(任意)スケジューリングプロパティを構成します。
このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。
スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。
スケジューリングポリシー:遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。
上部のツールバーで、保存 をクリックして現在のノードを保存します。
ユーザーテーブル (ods_user_info_d_spark) の作成とデータの解析
バッチ統合タスクによってデータが非公開 OSS データソースに同期された後、生成された OSS ファイルに基づいて外部テーブル ods_user_info_d_spark に書き込まれます。
ワークフローのオーケストレーションページで、
ods_user_info_d_sparkノードにカーソルを合わせ、ノードを開く をクリックします。同期リンク、ネットワーク、およびリソースを構成します。
Paimon テーブル (DLF)
-- 1. 宛先として Paimon テーブルを作成します。 CREATE TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT 'ユーザー ID', `gender` STRING COMMENT '性別', `age_range` STRING COMMENT '年齢層', `zodiac` STRING COMMENT '星座', `dt` STRING COMMENT 'パーティション日付' ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' ); -- 2. OSS 上のソースファイルを指し示し、解析する一時ビューを作成します。 CREATE TEMPORARY VIEW source_of_user_info ( -- ビューにはプレーンテキスト行全体を読み取るための 1 つのカラムのみがあります。 `value` STRING ) USING TEXT -- Spark に対してプレーンテキストファイルであることを指示します。 OPTIONS ( path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ); -- 3. 一時ビューからデータをクエリおよび解析し、Paimon テーブルに挿入します。 INSERT INTO ods_user_info_d_spark SELECT -- split 関数を使用して、生テキスト行を '|' で分割します。 split(value, '\\|')[0] AS uid, split(value, '\\|')[1] AS gender, split(value, '\\|')[2] AS age_range, split(value, '\\|')[3] AS zodiac, '${bizdate}' AS dt -- パーティションフィールドに値を割り当てます。 FROM source_of_user_info;Hive テーブル (DLF-Legacy)
-- シナリオ:以下の Spark SQL 文は、EMR Spark SQL ノードを使用して ods_user_info_d_spark 外部テーブルを作成します。LOCATION 句を使用して、バッチデータ統合タスクによって非公開 OSS バケットに書き込まれたユーザー情報を取得し、対応する dt パーティションに書き込みます。 -- 注: -- DataWorks はスケジューリングパラメーターを提供しており、スケジューリングシナリオでは、日々の増分データをターゲットテーブルの対応するビジネスパーティションに書き込むことができます。 -- 実際の開発シナリオでは、${variable_name} 形式でコード変数を定義し、スケジューリング構成ページで値を割り当てることができます。これにより、スケジュールされた実行中にコード内で動的パラメーター入力が可能になります。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT 'ユーザー ID' ,`gender` STRING COMMENT '性別' ,`age_range` STRING COMMENT '年齢層' ,`zodiac` STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY'|' STORED AS TEXTFILE LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ; ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ;説明コード内の location アドレスを実際のアドレスに置き換えてください。
dw-spark-demoは、OSS 環境の準備 時に作成した OSS バケットの名前です。テストパラメーターを構成します。
EMR SPARK SQL タスク構成ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト構成を使用してテストを実行するため、以下のパラメーターを構成します。
構成項目
構成の説明
コンピューティングリソース
環境の準備 ステップでアタッチした Spark コンピューティングリソースを選択します。
リソースグループ
環境の準備 ステップで購入したサーバーレスリソースグループを選択します。
スクリプトパラメーター
パラメーターの追加 をクリックし、bizdate を
yyyymmdd形式に設定します。例:bizdate=20250223。テスト中、DataStudio はタスクで定義された変数をこの定数に置き換えます。(任意)スケジューリングプロパティを構成します。
このチュートリアルでは、スケジューリング構成パラメーターをデフォルト値のままにしてください。ノード編集ページの右側で、スケジューリング をクリックします。スケジューリング構成のパラメーターの詳細については、「ノードスケジューリング構成」をご参照ください。
スケジューリングパラメーター:このチュートリアルでは、ワークフローレベルでスケジューリングパラメーターを構成します。内部ノードは個別に構成する必要はなく、タスクまたはコード内で直接パラメーターを使用できます。
スケジューリングポリシー:遅延実行時間 パラメーターで、ワークフロー開始後に子ノードが実行を待機する時間を指定できます。このチュートリアルでは設定しません。
上部のツールバーで、保存 をクリックして現在のノードを保存します。
6. タスクの実行
データを同期します。
ワークフローツールバーで、実行 をクリックします。この実行で各ノードに定義されたパラメーター変数の値を設定します。このチュートリアルでは
20250223を使用しますが、必要に応じて値を変更できます。その後、OK をクリックして、実行が完了するまで待ちます。データ同期結果をクエリします。
このセクションのすべてのノードが正常に実行された後、以下の SQL クエリを記述して、EMR SPARK SQL ノードによって作成された外部テーブルが正しく生成されたかどうかを確認します。
ods_raw_log_d_spark テーブルの結果を検証します。
-- 現在の操作の実際のデータタイムスタンプにパーティションフィルター条件を更新する必要があります。たとえば、タスクが 20250223 に実行された場合、データタイムスタンプはタスク実行日の前日である 20250222 です。 SELECT * FROM ods_raw_log_d_spark WHERE dt='${bizdate}';--ods_raw_log_d_spark テーブルをクエリods_user_info_d_spark テーブルの結果を検証します。
-- 現在の操作の実際のデータタイムスタンプにパーティションフィルター条件を更新する必要があります。たとえば、タスクが 20250223 に実行された場合、データタイムスタンプはタスク実行日の前日である 20250222 です。 SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}';--ods_user_info_d_spark テーブルをクエリ
次のステップ
これで、ログデータを同期する方法を学びました。次のチュートリアルに進み、同期されたデータを処理および分析する方法を学ぶことができます。詳細については、「データの処理」をご参照ください。
よくある質問
Q:テーブルを作成する際に、エラー
Option 'path' is not allowed for Normal Paimon table, please remove it in table options.が報告されます。A:これは構文エラーです。このトピックの Paimon テーブル (DLF) セクションに示されているとおりにテーブルを作成してください。