このチュートリアルでは、HttpFile および MySQL データソースを作成して、ユーザーおよびウェブサイトのログデータにアクセスする方法を説明します。データ同期パイプラインを設定して、このデータを Object Storage Service (OSS) バケットに転送します。その後、E-MapReduce (EMR) Hive 外部テーブルを作成して OSS 内のデータを解析し、クエリを実行して結果を検証します。
前提条件
必要な環境が準備されていることを確認してください。詳細については、「環境の準備」をご参照ください。
ネットワーク接続を確保するために、ECS コンソールに移動し、EMR クラスターに関連付けられているセキュリティグループにセキュリティグループルールを追加します。このルールは、リソースグループの VPC の vSwitch CIDR ブロックからのポート
10000でのインバウンドトラフィックを許可する必要があります。詳細については、「セキュリティグループルールの追加」をご参照ください。
1. データソースの作成
後続の処理のためにサンプルデータを取得し、保存するために、DataWorks ワークスペースに以下のデータソースを追加します。
MySQL データソース:DataWorks が提供するサンプルデータソースで、基本的なユーザー情報を保存します。
HttpFile データソース:DataWorks が提供するサンプルデータソースで、ユーザーのウェブサイトアクセスログを保存します。
OSS データソース:「EMR 環境の準備」で作成した EMR OSS-HDFS ストレージです。これは、MySQL および HttpFile データソースから同期されたサンプルユーザー情報とウェブサイトアクセスログを保存するために使用されます。
MySQL データソースの作成 (user_behavior_analysis_mysql)
このチュートリアルのサンプルユーザーデータは MySQL データベースに保存されています。このデータを取得するために MySQL データソースを作成します (ods_user_info_d)。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ移動] をクリックします。
[設定センター] ページで、左側のナビゲーションウィンドウにある [データソース] をクリックします。
[データソースの追加] をクリックし、データソースタイプとして [MySQL] を検索して選択します。
[MySQL データソースの追加] ページで、パラメーターを設定します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
次の表では、主要なパラメーターのみを説明します。他のすべてのパラメーターにはデフォルト値を使用してください。
パラメーター
説明
データ ソース名
データソースの名前を入力します。このチュートリアルでは、
user_behavior_analysis_mysqlと入力します。[データソースの説明]
これは DataWorks チュートリアル用の読み取り専用データソースです。データ統合内のバッチ同期ノードにサンプルデータを提供し、他のモジュールではサポートされていません。
[設定モード]
[接続文字列モード] を選択します。
[接続アドレス]
ホスト IP アドレス:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.comポート番号:
3306
[データベース名]
データベース名を入力します。このチュートリアルでは、
workshopと入力します。ユーザー名
ユーザー名を入力します。このチュートリアルでは、
workshopと入力します。パスワード
パスワードを入力します。このチュートリアルでは、
workshop#2017と入力します。[認証方式]
認証なし。
[接続設定] セクションで、本番環境と開発環境の両方で [ネットワーク接続のテスト] をクリックします。接続ステータスが [接続済み] であることを確認します。
重要リソースグループがワークスペースにアタッチされ、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期は失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続設定セクションのプロンプトに従ってください。[購入] と [購入済みリソースグループの関連付け] をクリックします。
[作成を完了] をクリックします。
HttpFile データソースの作成 (user_behavior_analysis_httpfile)
このチュートリアルのユーザーウェブサイトアクセスログは、DataWorks が提供するサンプルの OSS バケットに保存されています。これらのログを取得するために HttpFile データソースを作成します (user_log.txt)。
データソースページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ移動] をクリックします。
[設定センター] ページで、左側のナビゲーションウィンドウにある [データソース] をクリックします。
[データソースの追加] をクリックし、データソースタイプとして [HttpFile] を検索して選択します。
[HttpFile データソースの追加] ページで、パラメーターを設定します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
次の表では、主要なパラメーターのみを説明します。他のすべてのパラメーターにはデフォルト値を使用してください。
パラメーター
説明
[データソース名]
データソースの名前を入力します。このチュートリアルでは、
user_behavior_analysis_httpfileと入力します。[データソースの説明]
これは DataWorks チュートリアル用の読み取り専用データソースです。データ統合内のバッチ同期ノードにサンプルデータを提供し、他のモジュールではサポートされていません。
[URL]
[URL] を、開発環境と本番環境の両方で
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.comに設定します。[接続設定] セクションで、本番環境と開発環境の両方で [ネットワーク接続のテスト] をクリックします。ステータスが [接続済み] であることを確認します。
重要データ同期の失敗を防ぐために、パブリックアクセスが可能なリソースグループがワークスペースに関連付けられていることを確認してください。設定手順については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、画面の指示に従って [購入] と [購入済みリソースグループの関連付け] をクリックします。
[作成を完了] をクリックします。
OSS データソースの作成 (test_g)
「EMR 環境の準備」で作成した OSS バケットを DataWorks に追加します。このバケットは、MySQL および HttpFile データソースから同期されたデータを保存します。
OSS データソースを追加する際、現在のアカウントに Object Storage Service (OSS) の AliyunOSSFullAccess 権限があることを確認してください。詳細な手順については、「RAM ユーザーの権限の表示」および「RAM ユーザーの権限の管理」をご参照ください。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ移動] をクリックします。
「[SettingCenter]」ページで、左側のナビゲーションウィンドウの「[Data Sources]」をクリックします。
[データソースの追加] をクリックし、データソースタイプとして [OSS] を検索して選択します。
[OSS データソースの追加] ページで、パラメーターを設定します。このチュートリアルでは、[開発] 環境と [本番] 環境の両方で同じサンプル値を使用します。
パラメーター
説明
データソース名
データソースの名前を入力します。本例では、
test_gを使用します。データソースの説明
データソースの簡単な説明を入力します。
アクセスモード
AccessKey モード を選択します。
AccessKey ID
現在のアカウントの AccessKey ID です。
DataWorks コンソール にログインし、右上隅のプロフィール画像にマウスを合わせて、AccessKey ページへ移動することで、必要な権限を持つ RAM ユーザー の
AccessKey IDを取得できます。AccessKey シークレット
現在のアカウントの AccessKey シークレットを入力します。
重要AccessKey シークレットは作成時のみ表示され、その後は再取得できません。機密情報を厳重に管理してください。AccessKey が漏洩または紛失した場合は、既存のものを削除し、新しい AccessKey を作成してください。
リージョン
中国 (上海) リージョンを選択します。
エンドポイント
oss-cn-shanghai-internal.aliyuncs.comを入力します。バケット
EMR クラスターを作成する際に設定した OSS バケットの名前です。本例では、
dw-emr-demoを使用します。[接続設定] セクションで、本番環境と開発環境の両方で [ネットワーク接続のテスト] をクリックします。接続ステータスが [接続済み] であることを確認します。
重要リソースグループがワークスペースにアタッチされ、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期は失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続設定セクションのプロンプトに従ってください。[購入] と [購入済みリソースグループの関連付け] をクリックします。
[作成を完了] をクリックします。
2. 同期パイプラインの構築
このステップでは、後続の処理のためにユーザーデータとウェブサイトアクセスログを OSS バケットに移動するための同期パイプラインを構築します。
DataWorks コンソールの [ワークスペース] ページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[アクション] 列で を選択します。
Data Studio ページで、左側のナビゲーションウィンドウにある
アイコンをクリックして [データ開発] ページに移動します。次に、左側のディレクトリツリーで [ワークスペースディレクトリ] を選択します。workという名前のディレクトリとworkshop_emrという名前のワークフローを作成します。詳細については、「ワークスペースディレクトリ」および「定期的なワークフローのオーケストレーション」をご参照ください。workshop_emrワークフローをクリックしてワークフローオーケストレーションページを開きます。次に、ゼロ負荷ノードを 1 つ、バッチ同期ノードを 2 つ、EMR Hive ノードを 2 つ作成します。を選択し、右側の編集キャンバスにドラッグします。[ノードの作成] ダイアログボックスで、[ノード名] に
workshop_start_emrと入力し、[確認] をクリックします。2 つの [データ統合] ノードを編集キャンバスにドラッグします。[ノードの作成] ダイアログボックスで、ソースタイプを MySQL、宛先タイプを OSS に設定します。特定のノードタイプとして [バッチ同期] を選択します。[ノードに名前を付けます]
ods_user_info_d_2oss_emrとods_raw_log_d_2oss_emrをそれぞれ指定します。これらのノードは、MySQL のユーザー情報と HttpFile のログデータを OSS バケットに同期します。[確認] をクリックします。2 つの ノードを編集キャンバスにドラッグします。[ノードの作成] ダイアログボックスで、[ノードに名前を付けます]
ods_user_info_d_emrとods_raw_log_d_emrをそれぞれ指定します。これらのノードは、OSS バケット内のデータを解析するための Hive テーブルを作成します。[確認] をクリックします。
次の表は、このチュートリアルで使用されるサンプルノードについて説明しています。
タイプ
パラメーター
機能
ゼロ負荷ノードworkshop_start_emrワークフローをオーケストレーションし、データフローを定義します。この [Zero Load Node] はコード編集が不要です。
バッチ同期ノードods_user_info_d_2oss_emrMySQL データソースから OSS データソース (
test_g) に基本的なユーザー情報を同期します。
バッチ同期ノードods_raw_log_d_2oss_emrHttpFile データソース (OSS) から OSS データソース (
test_g) にユーザーのウェブサイトアクセスログを同期します。
EMR Hive ノードods_user_info_d_emrOSS データソース (
test_g) から基本的なユーザー情報を解析するために Hive テーブルods_user_info_d_emrを作成します。
EMR Hive ノードods_raw_log_d_emrOSS データソース (
test_g) からユーザーのウェブサイトアクセスログを解析するために Hive テーブルods_raw_log_d_emrを作成します。ワークフロー開発パネルで、接続をドラッグして
workshop_start_emrノードを 2 つのバッチ同期ノードの上流依存関係として設定します。最終的な結果は次のようになります:
3. 同期ノードの設定
以下の手順に従って、MySQL データソースからユーザー情報を、HttpFile データソースからログデータを OSS バケットに同期します。その後、Hive 外部テーブルを作成して、OSS に保存されたデータを解析し、クエリを実行します。
ゼロ負荷ノードの設定
ゼロ負荷ノードのスケジュールプロパティを次のように設定します。
ゼロ負荷ノードを開きます。
ワークフローオーケストレーションページで、ゼロ負荷ノードにカーソルを合わせ、[ノードを開く] をクリックしてノード編集ページに移動します。
ノードを設定します。
右側のペインで [プロパティ] をクリックし、次のパラメーターを設定します。
パラメーター
説明
[スケジュール用リソースグループ]
環境準備中に作成したサーバーレスリソースグループを選択します。
[スケジュール依存関係]
ワークフローには上流依存関係がないため、設定は不要です。統一管理のために、[ワークスペースルートノードを使用] をクリックして、ワークスペースルートノードを親依存関係として設定できます。
ワークスペースルートノードは
WorkspaceName_rootの形式で命名されます。ノードを保存します。
設定が完了したら、ツールバーの
アイコンをクリックしてノードを保存します。
4. OSS データの解析
バッチ同期ノードの実行が完了したら、以下の手順に従って Hive 外部テーブルを作成し、test_g OSS データソースに保存されているサンプルデータを解析します。
5. 同期ワークフローの実行
ワークフロースケジュールを設定します。
ワークフローオーケストレーションページの右側で [プロパティ] をクリックし、パラメーターを設定します。次の表では、主要なパラメーターのみを説明します。他のすべてのパラメーターにはデフォルト値を使用してください。
スケジューリングパラメーター
説明
スケジューリングパラメーター
ワークフロー内のすべてのノードで利用可能なスケジューリングパラメーターを設定します。本チュートリアルでは、前日の日付を取得するため、
bizdate=$[yyyymmdd-1]を指定します。実行頻度
本チュートリアルでは、実行頻度を
日単位に設定します。開始時刻
本チュートリアルでは、開始時刻 を
00:30に設定します。ワークフローは毎日00:30に開始されます。スケジューリング依存関係
本ワークフローには上流の依存関係がありませんので、特別な設定は不要です。一元管理を目的として、ワークスペースルートノードを使用 をクリックし、ワークスペースルートノードを親依存関係として設定できます。
ワークスペースルートノードの名前は、
WorkspaceName_rootの形式で付与されます。ワークフロースケジュールプロパティを設定した後、ワークフローキャンバスの上部にあるツールバーで [実行] をクリックします。この実行のパラメーター変数を設定します。このチュートリアルでは例として
20250223を使用しますが、別の値を使用することもできます。[確認] をクリックし、実行が完了するのを待ちます。ods_raw_log_d_emrおよびods_user_info_d_emrノードが
ステータスを示している場合、同期は成功です。ods_raw_log_d_emrおよびods_user_info_d_emrノードが
ステータスを示し、java.net.ConnectException: Connection timed out (Connection timed out)エラーが表示された場合は、ECS コンソールでセキュリティグループルールを追加して ECS ポート10000を開く必要があります。承認オブジェクトをリソースグループの VPC の vSwitch CIDR ブロックに設定します。vSwitch CIDR ブロックは、リソースグループリストで対応するリソースグループの [ネットワーク設定] をクリックすることで取得できます。セキュリティグループルールの追加方法の詳細については、「セキュリティグループルールの追加」をご参照ください。
データ同期には約 24 分かかります。
6. データ同期の検証
DataWorks コンソールの [ワークスペース] ページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[アクション] 列で を選択します。
Data Studio ページで、
アイコンをクリックしてデータ開発に入り、ナビゲーションウィンドウで [ワークスペースディレクトリ] セクションを見つけます。作成した
workディレクトリを右クリックし、 を選択します。EMR Hive ノードにカスタム名を入力し、Enter キーを押してノードを作成します。EMR Hive ノード編集ページで、
your_data_timestampをワークフロー実行のデータタイムスタンプに置き換えます。次のクエリを実行して、データがods_raw_log_d_emrおよびods_user_info_d_emrテーブルに正しくマッピングされていることを確認します。説明クエリのパーティション列は、データタイムスタンプに更新する必要があります。たとえば、ワークフローが
20250223に実行された場合、データタイムスタンプは実行日の前日である20250222になります。SELECT * FROM ods_user_info_d_emr WHERE dt=your_data_timestamp; SELECT * FROM ods_raw_log_d_emr WHERE dt=your_data_timestamp;上記のコマンドがデータを返した場合、データ処理は完了しています。
データが返されない場合は、ワークフロー実行に設定された [今回の実行で使用する値] が、クエリで
dtによって指定されたデータタイムスタンプと一致していることを確認してください。ワークフローをクリックし、右側のペインで [実行履歴] をクリックし、実行レコードの [アクション] 列で [表示] をクリックして、ワークフロー実行ログのデータタイムスタンプ値 (partition=[pt=xxx]) を確認できます。
次のステップ
データの同期が完了したので、次のチュートリアルに進んでデータの処理と分析方法を学びます。詳細については、「データの処理」をご参照ください。