このチュートリアルでは、ユーザーの基本情報を含む MySQL の ods_user_info_d テーブルと、Web サイトのアクセスログデータを含む OSS の user_log.txt ファイルという 2 つのデータソースを例として使用します。Data Integration のオフライン同期タスクを使用して、これらのソースから MaxCompute の ods_user_info_d テーブルと ods_raw_log_d テーブルにデータを同期します。このチュートリアルでは、DataWorks Data Integration を使用して、データウェアハウス用に異種データソース間でデータを同期する方法について説明します。
前提条件
必要な動作環境を準備していることを確認してください。詳細については、「環境の準備」をご参照ください。
1. データソースの作成
後続のステップでデータを処理できるように、生データを取得するために、次のデータソースを DataWorks ワークスペースに追加する必要があります。
MySQL データソース:このチュートリアルでは、
user_behavior_analysis_mysqlという名前のデータソースを使用して、 MySQL から基本ユーザー情報 (ods_user_info_d) を取得します。HttpFile データソース: このチュートリアルでは、
user_behavior_analysis_httpfileという名前のデータソースを使用して、OSS に保存されているユーザーのウェブサイトアクセスログ (user_log.txt) を取得します。
他のユーザーペルソナ分析チュートリアル用にすでに MySQL および HttpFile データソースを作成している場合は、このステップをスキップできます。
MySQL データソースの作成 (user_behavior_analysis_mysql)
このチュートリアルの基本的なユーザー情報は、MySQL データベースに保存されています。データベースから MaxCompute にユーザー情報 (ods_user_info_d) を同期するには、MySQL データソースを作成する必要があります。
[データソース] ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ移動] をクリックします。
左側のナビゲーションウィンドウで [データソース] をクリックして、[データソース] ページに移動します。
[データソースを追加] をクリックします。データソースタイプとして [MySQL] を検索して選択します。
[MySQL データソースを追加] ページで、パラメーターを設定します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
次の表に、主要なパラメーターを示します。その他のパラメーターはデフォルト値のままにすることができます。
パラメーター
説明
[データソース名]
データソースの名前を入力します。このチュートリアルでは、
user_behavior_analysis_mysqlと入力します。データソースの説明
このデータソースは DataWorks のチュートリアル用です。プラットフォームが提供するテストデータにアクセスするためにオフライン同期タスクを設定する際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能です。他のモジュールでは使用できません。
設定モード
[接続文字列モード] を選択します。
[接続アドレス]
ホスト IP アドレス:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.comポート:
3306
データベース名
データベース名を入力します。このチュートリアルでは、
workshopと入力します。ユーザー名
ユーザー名を入力してください。このチュートリアルでは
workshopと入力します。パスワード
パスワードを入力します。このチュートリアルでは、
workshop#2017を入力します。認証方式
認証なし。
[接続設定] セクションで、本番環境と開発環境の両方で [ネットワーク接続をテスト] をクリックします。接続ステータスが [接続済み] であることを確認してください。
重要リソースグループがワークスペースにアタッチされ、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期は失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続設定セクションのプロンプトに従ってください。[購入] と [購入したリソースグループを関連付け] をクリックします。
[作成を完了] をクリックします。
HttpFile データソースの作成 (user_behavior_analysis_httpfile)
このチュートリアルのユーザー Web サイトのアクセスログは、OSS に保存されています。ユーザー Web サイトのアクセスログ (user_log.txt) を OSS から MaxCompute に同期するには、HttpFile データソースを作成する必要があります。
左側のナビゲーションウィンドウで [データソース] をクリックします。
[データソースを追加] をクリックします。[データソースを追加] ダイアログボックスで、データソースタイプとして [HttpFile] を検索して選択します。
[HttpFile データソースを追加] ページで、パラメーターを設定します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
次の表に、主要なパラメーターを示します。その他のパラメーターはデフォルト値のままにすることができます。
パラメーター
説明
データソース名
データソース名を入力します。このチュートリアルでは、
user_behavior_analysis_httpfileと入力します。[データソースの説明]
このデータソースは DataWorks のチュートリアル用です。プラットフォームが提供するテストデータにアクセスするためにオフライン同期タスクを設定する際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能です。他のモジュールでは使用できません。
[URL]
開発環境と本番環境の両方で、[URL] を
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.comに設定します。[接続設定] セクションで、本番環境と開発環境の両方で [ネットワーク接続をテスト] をクリックします。接続ステータスが [接続済み] であることを確認してください。
重要リソースグループがワークスペースにアタッチされ、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期は失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続設定セクションのプロンプトに従ってください。[購入] と [購入したリソースグループを関連付け] をクリックします。
[作成を完了] をクリックします。
2. 同期パイプラインの構築
このステップでは、同期パイプラインを構築して、ユーザー情報と Web サイトアクセスログをそれぞれの MaxCompute テーブルに同期します。これにより、さらなる処理のためにデータが準備されます。
左上隅の
アイコンをクリックし、 を選択します。その後、ページの上部で、このチュートリアル用に作成されたワークスペースに切り替えます。左側のナビゲーションウィンドウで
をクリックしてデータ開発ページに移動します。次に、[ワークスペースディレクトリ] エリアで
をクリックし、[ワークフローの作成] を選択して、ワークフロー名を入力します。このチュートリアルでは、名前を user_profile_analysisに設定します。ワークフローキャンバスで、左側のペインから [ゼロロードノード] 1 つと [バッチ同期] ノード 2 つをキャンバスにドラッグし、ノードに名前を付けます。
次の表に、このチュートリアルのノード名の例とその機能を示します。
ノードタイプ
ノード名
ノード機能
ゼロロードノードworkshop_startユーザーペルソナ分析ワークフロー全体を管理し、データ転送パスを明確にします。このノードは [ドライラン] ノードであり、コード編集は不要です。
バッチ同期ods_user_info_dMySQL からユーザーの基本情報を MaxCompute の
ods_user_info_dテーブルに同期します。
バッチ同期ods_raw_log_dユーザーのウェブサイトアクセスレコードを OSS から MaxCompute の
ods_raw_log_dテーブルに同期します。次の図に示すように、接続をドラッグして、
workshop_startノードを 2 つのオフライン同期ノードの先祖ノードとして設定します。ワークフローのスケジューリングプロパティを設定します。
ワークフローキャンバスで、右側のペインの [スケジュール] をクリックし、パラメーターを設定します。次の表に、主要なパラメーターを示します。その他のパラメーターはデフォルト値のままにすることができます。
スケジューリングパラメーター
説明
スケジューリングパラメーター
ワークフロー全体のスケジューリングパラメーターを設定できます。ワークフローの内部ノードは、設定されたスケジューリングパラメーターを直接使用できます。このチュートリアルでは、前日の日付を取得するために、パラメーターを
bizdate=$[yyyymmdd-1]に設定します。[スケジューリング周期]
このチュートリアルでは
Dayに設定します。スケジュール時間
このチュートリアルでは、[スケジュール時刻] が
00:30に設定されているため、ワークフローは毎日00:30に開始されます。[スケジューリング依存関係]
ワークフローには上流の依存関係がないため、設定する必要はありません。管理を容易にするために、[ワークスペースのルートノードを使用] をクリックして、ワークフローをワークスペースのルートノードにアタッチします。
ワークスペースルートノードの命名フォーマットは
workspace_name_rootです。
3. 同期タスクの設定
開始ノードの設定
ワークフローオーケストレーションページで、
workshop_startノードにマウスカーソルを合わせ、[ノードを開く] をクリックします。workshop_startノード設定ページの右側のペインで、[スケジューリング] をクリックして、必須パラメーターを設定します。 次の表で、キーパラメーターについて説明します。 その他のパラメーターについては、デフォルト値のままにすることができます。スケジューリングパラメーター
説明
スケジューリングタイプ
このチュートリアルでは、
ドライランスケジューリングを使用します。[リソースグループ]
このチュートリアルでは、「環境の準備」ステップで作成した Serverless リソースグループを選択します。
スケジューリングの依存関係
workshop_startは初期ノードで、上流の依存関係がないため、[ワークスペースルートノードを使用] をクリックしてワークスペースのルートノードからワークフローの実行をトリガーできます。ワークスペースルートノードの名前は
workspace_name_rootです。
ユーザーデータ同期パイプラインの設定 (ods_user_info_d)
ワークフローオーケストレーションページで、
ods_user_info_dノードにマウスカーソルを合わせ、[ノードを開く] をクリックします。同期パイプラインのネットワークとリソースを設定します。
パラメーター
説明
ソース
データソース:
MySQL。データソース名:
user_behavior_analysis_mysql。
[リソースグループ]
「環境の準備」ステップで購入した Serverless リソースグループを選択します。
宛先
データ宛先:
MaxCompute(ODPS)。データソース名: 環境の準備 ステップでアタッチした MaxCompute コンピューティングリソースを選択します。たとえば、
MaxCompute_Sourceです。
[次へ] をクリックして同期タスクを設定します。
[ソースと宛先を設定]
次の表に、主要なパラメーターを示します。その他のパラメーターはデフォルト値のままにすることができます。
モジュール
設定項目
説明
データソース
テーブル
ods_user_info_dを選択します。キーの分割
シャードキーは
uidフィールドに設定されます。 uid フィールドは MySQL のods_user_info_dテーブルのプライマリキーです。[宛先]
トンネルリソースグループ
このチュートリアルでは、デフォルトで [共通転送リソース] を使用します。専用のトンネルクォータがある場合は、ドロップダウンリストから選択できます。専用のトンネルクォータの詳細については、「Data Transmission Service の専用リソースグループの購入と使用」をご参照ください。
[テーブル]
[宛先テーブルスキーマを生成] をクリックして、MaxCompute テーブルを迅速に作成します。次の DDL 文を [テーブル作成文] セクションに貼り付け、[テーブルを作成] をクリックします。このテーブルは、ソースからのユーザーデータを受け取ります。
CREATE TABLE IF NOT EXISTS ods_user_info_d ( uid STRING COMMENT 'User ID', gender STRING COMMENT 'Gender', age_range STRING COMMENT 'Age range', zodiac STRING COMMENT 'Zodiac sign' ) PARTITIONED BY ( dt STRING ) LIFECYCLE 7;パーティション情報
このチュートリアルで
${bizdate}を入力すると、テスト段階ではbizdateパラメーターに定数値を、スケジュール実行時にはbizdateパラメーターに動的に値を割り当てることができます。 DataStudio でサポートされている変数フォーマットと構成メソッドの詳細については、「スケジューリングパラメーター」をご参照ください。[フィールドマッピング] と [チャネル制御] を確認します。
DataWorks は、設定されたフィールドマッピングに基づいて、指定されたソースフィールドから指定された宛先フィールドにデータを同期します。また、同時実行数を設定し、ダーティデータのポリシーを設定することもできます。このチュートリアルでは、[ダーティデータレコードのポリシー] を [ダーティデータレコードを許可しない] に設定します。その他の設定はデフォルト値のままにすることができます。詳細については、「コードレス UI を使用した同期タスクの設定」をご参照ください。
デバッグパラメーターを設定します。
オフライン同期タスク設定ページの右側のペインで、[デバッグ設定] をクリックします。ステップ 4:データの同期 で実行をテストするために、次のパラメーターを設定します。
設定項目
説明
リソースグループ
「環境の準備」ステップで購入した Serverless リソースグループを選択します。
スクリプトパラメーター
このパラメーターを設定する必要はありません。このチュートリアルでは、サンプルコードで
${bizdate}を使用してデータタイムスタンプを表します。ステップ 4 でワークフローをデバッグする際、[今回の実行で使用する値] を20250223のような特定の定数に設定します。ジョブの実行では、この定数によって、ジョブで定義された変数が置き換えられます。(オプション) スケジューリングプロパティを設定します。
このチュートリアルでは、スケジューリングパラメーターのデフォルト値をそのまま使用できます。オフライン同期タスク設定ページの右側のペインで、[スケジュール] をクリックします。パラメーターの詳細については、「ノードのスケジューリング」をご参照ください。
[スケジューリングパラメーター]:これらのパラメーターはワークフローに対してすでに設定されています。内部ノードは個別の設定を必要とせず、タスクやコードで直接パラメーターを使用できます。
[スケジューリングポリシー]:[遅延実行時間] パラメーターを使用して、ワークフローが実行された後に子ノードが実行を待機する期間を指定できます。このパラメーターは、このチュートリアルでは設定しません。
ノードツールバーで [保存] をクリックします。
ユーザーログ同期パイプラインの設定 (ods_raw_log_d)
ワークフローオーケストレーションページで、
ods_raw_log_dノードにカーソルを合わせ、[ノードを開く] をクリックします。同期パイプラインのネットワークとリソースを設定します。
パラメーター
説明
ソース
データソース:
HttpFile。データソース名:
user_behavior_analysis_HttpFile。
[リソースグループ]
「環境の準備」ステップで購入した Serverless リソースグループを選択します。
宛先
データ宛先:
MaxCompute(ODPS)。データソース名: 環境の準備 ステージでアタッチした MaxCompute コンピューティングリソースを選択します。 この例では、名前は
MaxCompute_Sourceです。
[次へ] をクリックして同期タスクを設定します。
ソースと宛先の設定
次の表に、主要なパラメーターを示します。その他のパラメーターはデフォルト値のままにすることができます。
モジュール
設定項目
説明
データソース
ファイルパス
このチュートリアルでは、
/user_log.txtを入力します。テキストタイプ
このチュートリアルでは、
textタイプを使用します。[列区切り文字]
このチュートリアルでは、
|を入力します。高度な構成 > ヘッダーのスキップ
このチュートリアルでは、テーブルヘッダーをスキップしないように、
Noを選択します。重要データソースを設定した後、[データ構造を確認] をクリックして、ログファイルが正しく読み取れるか確認します。
送信先
トンネルリソースグループ
このチュートリアルでは、デフォルトで [共通転送リソース] を使用します。専用のトンネルクォータがある場合は、ドロップダウンリストから選択できます。専用のトンネルクォータの詳細については、「Data Transmission Service の専用リソースグループの購入と使用」をご参照ください。
テーブル
[宛先テーブルスキーマを生成] をクリックして、MaxCompute テーブルを迅速に作成します。次の DDL 文を [テーブル作成文] セクションに貼り付け、[テーブルを作成] をクリックします。
CREATE TABLE IF NOT EXISTS ods_raw_log_d ( col STRING ) PARTITIONED BY ( dt STRING ) LIFECYCLE 7;パーティション情報
このチュートリアルでは、
${bizdate}と入力します。これにより、テスト段階ではbizdateパラメーターに定数値を割り当て、スケジュール実行時にはbizdateパラメーターに動的に値を割り当てることができます。DataStudio でサポートされている変数のフォーマットと構成メソッドの詳細については、「スケジューリングパラメーター」をご参照ください。[フィールドマッピング] と [チャネル制御] を確認します。
DataWorks は、設定されたフィールドマッピングに基づいて、指定されたソースフィールドから指定された宛先フィールドにデータを同期します。また、同時実行数を設定し、ダーティデータのポリシーを設定することもできます。このチュートリアルでは、[ダーティデータレコードのポリシー] を [ダーティデータレコードを許可しない] に設定します。その他の設定はデフォルト値のままにすることができます。詳細については、「コードレス UI を使用した同期タスクの設定」をご参照ください。
デバッグパラメーターを設定します。
オフライン同期タスク設定ページの右側のペインで、[デバッグ設定] をクリックします。ステップ 4:データの同期 で実行をテストするために、次のパラメーターを設定します。
設定項目
説明
[リソースグループ]
「環境の準備」ステップで購入した Serverless リソースグループを選択します。
[スクリプトパラメーター]
このパラメーターを設定する必要はありません。このチュートリアルでは、サンプルコードは
${bizdate}を使用してデータタイムスタンプを表します。ステップ 4 でワークフローをデバッグするときは、[今回の実行で使用する値] を20250223などの特定の定数に設定します。ジョブの実行では、この定数を使用してジョブで定義された変数を置き換えます。(オプション) スケジューリングプロパティを設定します。
このチュートリアルでは、スケジューリングパラメーターのデフォルト値をそのまま使用できます。オフライン同期タスク設定ページの右側のペインで、[スケジュール] をクリックします。パラメーターの詳細については、「ノードのスケジューリング」をご参照ください。
[スケジューリングパラメーター]:これらのパラメーターはワークフローに対してすでに設定されています。内部ノードは個別の設定を必要とせず、タスクやコードで直接パラメーターを使用できます。
[スケジューリングポリシー]:[遅延実行時間] パラメーターを使用して、ワークフローが実行された後に子ノードが実行を待機する期間を指定できます。このパラメーターは、このチュートリアルでは設定しません。
ノードツールバーで [保存] をクリックします。
4. データの同期
データを同期します。
ワークフローツールバーで、[実行] をクリックします。この実行では、各ノードで定義されているパラメーター変数の値を設定します。このチュートリアルでは
20250223を使用しますが、必要に応じて値を変更できます。次に、[OK] をクリックし、実行が完了するまで待ちます。データ同期の結果をクエリします。
DataStudio の左側にあるナビゲーションウィンドウで、
をクリックしてデータ開発ページに移動します。次に、個人フォルダエリアで
をクリックして、.sql拡張子のファイルを作成します。任意のファイル名を指定できます。ページの下部で、言語モードが
MaxCompute SQLであることを確認してください。
ノード編集ウィンドウで、次の SQL 文を入力して、
ods_raw_log_dとods_user_info_dにインポートされたレコード数を表示します。これにより、同期されたデータが宛先テーブルに書き込まれているかどうかを確認できます。-- 現在の操作の実際のデータタイムスタンプに合わせて、パーティションフィルター条件を変更する必要があります。このチュートリアルでは、デバッグパラメーター bizdate (データタイムスタンプ) は 20250223 に設定されていました。 SELECT count(*) FROM ods_user_info_d WHERE dt='your_data_timestamp'; SELECT count(*) FROM ods_raw_log_d WHERE dt='your_data_timestamp';クエリがデータを返した場合、データ同期は完了です。
データが返されない場合は、ワークフローの実行に設定されている [今回の実行で使用される値] が、クエリで
dtによって指定されたデータタイムスタンプと一致することを確認します。 ワークフローをクリックし、右側のペインで [実行履歴] をクリックし、実行レコードの [操作] 列で [表示] をクリックして、ワークフロー実行ログでデータタイムスタンプの値 (partition=[pt=xxx]) を確認できます。
次のステップ
データ同期を設定した後、次のチュートリアルに進み、同期されたデータを処理および分析する方法を学びます。詳細については、「データの処理」をご参照ください。