このチュートリアルでは、DataWorks Data Integration を使用して、MySQL データベースと OSS フラットファイルという 2 つの異種ソースから MaxCompute にデータを同期する方法について説明します。最終的に、MaxCompute の ods_user_info_d テーブルと ods_raw_log_d テーブルにデータが入力され、ダウンストリーム処理の準備が整います。
このチュートリアルでは、次のことを行います。
-
DataWorks ワークスペースに MySQL および HttpFile データソースを追加します。
-
Zero Load Node と 2 つのバッチ同期ノードを含む同期パイプラインを構築します。
-
送信先テーブル DDL、パーティションパラメーター、およびダーティデータポリシーを含む各同期タスクを構成します。
-
パイプラインを実行し、データが MaxCompute に書き込まれていることを確認します。
前提条件
開始する前に、次のことを確認してください。
-
環境の準備 を完了し、DataWorks ワークスペース、Serverless リソースグループ、およびパブリックネットワークアクセスを設定済みであること。
-
ワークスペースにアタッチ済みのサーバーレスリソースグループ(ない場合は、接続構成セクションのプロンプトに従って、[購入] をクリックし、次に [購入済みリソースグループの関連付け] をクリックします)
ステップ 1: データソースの作成
後続の同期タスクが生データにアクセスできるように、DataWorks ワークスペースに次の 2 つのデータソースを追加します。
他のユーザープロファイル分析チュートリアルで MySQL および HttpFile データソースをすでに作成している場合は、このステップをスキップしてください。
MySQL データソースの作成
このチュートリアルの基本的なユーザー情報は MySQL データベースに保存されています。user_behavior_analysis_mysql という名前の MySQL データソースを作成して、ods_user_info_d テーブルを MaxCompute にプルします。
-
DataWorks コンソールにログインします。上部のナビゲーションバーで、リージョンを選択します。左側のナビゲーションウィンドウで、[詳細] > [管理センター] を選択します。ドロップダウンリストからワークスペースを選択し、[管理センターに移動] をクリックします。
-
左側のナビゲーションウィンドウで、[データソース]をクリックします。
-
「[データソースの追加]」をクリックし、次に検索して「[MySQL]」を選択します。
-
[MySQL データソースの追加] ページで、以下のパラメーターを設定します。これらの値は、開発環境と本番環境の両方で使用します。記載されていないパラメーターは、デフォルト値のままにします。
パラメーター 値 データソース user_behavior_analysis_mysqlデータソースの説明 このデータソースは DataWorks チュートリアル用です。オフライン同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。 設定モード 接続文字列モード 接続アドレス ホスト IP: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com, ポート:3306データベース名 workshop[ユーザー名] workshopパスワード workshop#2017[認証方法] 認証なし -
[接続設定] セクションで、両方の環境で [ネットワーク接続のテスト] をクリックし、ステータスが [接続済み] と表示されることを確認します。
重要リソースグループは、パブリックネットワークアクセスが有効になっているワークスペースにアタッチされている必要があります。そうでない場合、データ同期は失敗します。
-
[作成の完了] をクリックします。
HttpFile データソースの作成
このチュートリアルのユーザーウェブサイトアクセスログは OSS (Object Storage Service) に保存されています。user_behavior_analysis_httpfile という名前の HttpFile データソースを作成して、user_log.txt を MaxCompute にプルします。
-
左側のナビゲーションウィンドウで、[データソース] をクリックします。
-
[データソースの追加] をクリックし、[HttpFile] を検索して選択します。
-
「HttpFile データソースの追加」ページで、以下のパラメーターを設定します。これらの値は、開発環境および本番環境の両方で使用します。一覧にないパラメーターについては、デフォルト値をそのまま使用します。
パラメーター 値 データソース名 user_behavior_analysis_httpfileデータソースの説明 このデータソースは DataWorks チュートリアル用です。オフライン同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能であり、他のモジュールでは使用できません。 URL https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com -
「[接続設定]」セクションで、両方の環境に対して「[ネットワーク接続のテスト]」をクリックします。ステータスが「[接続済み]」と表示されることを確認します。
重要リソースグループは、パブリックネットワークアクセスが有効になっているワークスペースにアタッチされている必要があります。そうでない場合、データ同期は失敗します。
-
[作成完了] をクリックします。
ステップ 2: 同期パイプラインの構築
2 つの同期タスクをオーケストレーションし、スケジュールに基づいて毎日実行されるワークフローを構築します。
-
左上隅の
アイコンをクリックし、[すべての製品] > [データ開発およびタスク操作] > DataStudio を選択します。このチュートリアル用に作成したワークスペースに切り替えます。 -
左側のナビゲーションウィンドウで、
をクリックしてデータ開発ページを開きます。[ワークスペースディレクトリ] エリアで、
をクリックし、[ワークフローの作成] を選択して、名前を user_profile_analysisとします。 -
ワークフローキャンバスで、左側のペインから 1 つの[ゼロロードノード]と 2 つの[バッチ同期]ノードをドラッグして配置し、次のように名前を付けます。
ノードタイプ ノード名 機能 Zero Load Node workshop_startワークフロー全体を管理するドライラン先祖ノード。コード編集は不要です。 バッチ同期 ods_user_info_dMySQL から MaxCompute の ods_user_info_dテーブルに基本的なユーザー情報を同期します。バッチ同期 ods_raw_log_dOSS から MaxCompute の ods_raw_log_dテーブルにユーザーウェブサイトアクセスログを同期します。 -
接続をドラッグして、
workshop_startを両方のバッチ同期ノードの先祖ノードとして設定します。以下に示すとおりです。 -
ワークフローのスケジューリングを設定します。キャンバスで、右側のペインの[スケジューリング]をクリックし、次のパラメーターを設定します。記載されていないパラメーターは、デフォルト値のままにします。
スケジューリングパラメーター 値 スケジューリングパラメーター bizdate=$[yyyymmdd-1]— 前日の日付をすべての内部ノードに渡しますスケジューリング周期 Dayスケジュール時間 00:30[スケジューリングの依存関係] ワークスペースのルートノードを使用 をクリックして、ワークフローをルートノード (形式: workspace_name_root) にアタッチします。
ステップ 3: 同期タスクの構成
初期ノードの構成
-
ワークフローキャンバスで、
workshop_startにマウスカーソルを合わせ、[ノードを開く] をクリックします。 -
右側のペインで、[スケジューリング] をクリックし、次のパラメーターを設定します。記載されていないパラメーターについては、デフォルト値のままにします。
スケジューリングパラメーター 値 スケジューリングタイプ dry-run schedulingトンネルリソースグループ 環境の準備 で作成した Serverless リソースグループを選択します。 スケジューリングの依存関係 ワークスペースルートノードの使用 をクリックして、ワークスペースルートノード ( workspace_name_root) から実行をトリガーします。
ユーザーデータ同期タスク (ods_user_info_d) の構成
-
ワークフローキャンバスで、
ods_user_info_dにカーソルを合わせ、[ノードを開く] をクリックします。 -
ソースと送信先を構成します。
パラメーター 値 ソース — データソースタイプ MySQLソース — データソース名 user_behavior_analysis_mysqlリソースグループ 環境の準備 から Serverless リソースグループを選択します。 [宛先] — データ送信先タイプ MaxCompute(ODPS)送信先 — データソース名 環境の準備 でアタッチした MaxCompute リソースを選択します (例: MaxCompute_Source)。 -
同期タスクの詳細を設定するには、[次へ] をクリックします。
-
送信元と送信先の設定 次のパラメーターを設定します。一覧にないパラメーターについては、デフォルト値をそのまま使用します。
モジュール 設定項目 値 データソース テーブル ods_user_info_dデータソース 分割キー uid(MySQL のods_user_info_dテーブルのプライマリキー)デスティネーション トンネルリソースグループ 共通転送リソース (デフォルト) です。専用トンネルクォータがある場合は、ドロップダウンリストから選択できます。詳細については、「Data Transmission Service の専用リソースグループの購入と使用」をご参照ください。 デスティネーション テーブル [宛先テーブルスキーマの生成] をクリックします。[テーブル作成ステートメント] ボックスに、以下の DDL を貼り付け、[テーブルの作成] をクリックします: モジュール 設定項目 値 送信先 パーティション情報 ${bizdate}CREATE TABLE IF NOT EXISTS ods_user_info_d ( uid STRING COMMENT 'User ID', gender STRING COMMENT 'Gender', age_range STRING COMMENT 'Age range', zodiac STRING COMMENT 'Zodiac sign' ) PARTITIONED BY ( dt STRING ) LIFECYCLE 7;${bizdate}を使用すると、テスト中に定数を渡し、スケジュールされた実行中に動的な値を渡すことができます。サポートされている変数フォーマットについては、「スケジューリングパラメーター」をご参照ください。 -
フィールドマッピングとチャンネルコントロールを確認してください。 DataWorks では、フィールドマッピング構成に基づいて、ソースフィールドを送信先フィールドにマップします。 [不正データレコードのポリシー] を [不正データレコードを許可しない] に設定します。その他の設定はすべてデフォルト値のままにしてください。詳細については、「コードレス UI を使用した同期タスクの設定」をご参照ください。
-
-
デバッグパラメーターを設定します。右側のペインで、[デバッグ設定] をクリックし、以下を設定します。
構成項目 値 リソースグループ 環境の準備 で作成した Serverless リソースグループを選択します。 スクリプトパラメーター 空白のままにしてください。手順 4 でワークフローを実行する際、[この実行で使用する値] を 20250223などの定数に設定します。ジョブは、この値で${bizdate}を置き換えます。 -
(オプション) スケジューリングプロパティを設定します。右側ペインで、[スケジューリング] をクリックします。ワークフロー レベルのスケジューリング パラメーターは既にこのノードに適用されています。個別の設定は必要ありません。詳細については、「ノード スケジューリング」をご参照ください。
-
ノードツールバーで、[保存] をクリックします。
ユーザーログ同期タスク (ods_raw_log_d) の構成
-
ワークフローキャンバスで、
ods_raw_log_dにマウスオーバーし、[ノードを開く] をクリックします。 -
ソースと送信先を構成します。
パラメーター 値 ソース — データソースタイプ HttpFileソース — データソース名 user_behavior_analysis_httpfileリソースグループ 環境の準備 から Serverless リソースグループを選択します。 宛先 — データ送信先タイプ MaxCompute(ODPS)宛先 — データソース名 環境の準備 から MaxCompute リソースを選択します (例: MaxCompute_Source)。 -
同期タスクの詳細を設定するには、[次へ] をクリックします。
-
[送信元と送信先を設定する] 以下のパラメーターを設定します。一覧に記載されていないパラメーターについては、デフォルト値を保持します。
重要データソースを設定した後、[データ構造の確認] をクリックして、続行する前に DataWorks がログファイルを正しく読み取れることを確認します。
モジュール 構成項目 値 データソース ファイルパス /user_log.txtデータソース テキストタイプ textデータソース 列区切り文字 |データソース 詳細設定 > ヘッダー行をスキップ いいえ送信先 Tunnel リソースグループ 共通伝送リソース(デフォルト)。専用の Tunnel クォータをお持ちの場合は、ドロップダウンリストから選択できます。詳細については、「Data Transmission Service の専用リソースグループの購入と利用」をご参照ください。 送信先 テーブル 送信先テーブルスキーマの生成 をクリックします。テーブル作成ステートメント ボックスに、以下の DDL を貼り付け、テーブルを作成 をクリックします: モジュール 設定項目 値 送信先 パーティション情報 ${bizdate}CREATE TABLE IF NOT EXISTS ods_raw_log_d ( col STRING ) PARTITIONED BY ( dt STRING ) LIFECYCLE 7; -
フィールドマッピングとチャンネルコントロールを確認します。 [ダーティデータレコードのポリシー] を [ダーティデータレコードを許可しない] に設定します。 他のすべての設定はデフォルト値のままにします。 詳細については、「コードレス UI を使用して同期タスクを設定する」をご参照ください。
-
-
デバッグパラメーターを設定します。右側のペインで、[デバッグ設定] をクリックし、以下を設定します。
設定項目 値 リソースグループ 「事前準備」から、Serverless のリソースグループを選択します。 スクリプトパラメーター 空欄のままにしてください。手順 4 でワークフローを実行する際には、今回の実行で使用する値 を、 20250223のような定数に設定します。 -
(オプション) スケジューリングプロパティを設定します。 右側のペインで、[スケジューリング] をクリックします。 ワークフローレベルのパラメーターがこのノードに自動的に適用されます。 詳細については、「ノードのスケジューリング」をご参照ください。
-
ノードツールバーで、[保存] をクリックします。
ステップ 4: データ同期
-
ワークフローツールバーで、[実行] をクリックします。[この実行で使用する値] を
bizdate変数に設定します(例:20250223) — そして [OK] をクリックします。実行が完了するまで待ちます。 -
結果を確認します。
-
DataStudio の左側のナビゲーションウィンドウで、
をクリックしてデータ開発ページを開きます。個人フォルダエリアで、
をクリックして .sql拡張子のファイルを作成します。 -
ページの下部で、言語モードが
MaxCompute SQLであることを確認します。
-
次のクエリを実行して、各テーブルにロードされたレコードをカウントします。
your_data_timestampを、実行で使用されたbizdateの実際の値 (例:20250223) に置き換えます。-- Replace your_data_timestamp with the actual bizdate value (e.g., 20250223) SELECT count(*) FROM ods_user_info_d WHERE dt='your_data_timestamp'; SELECT count(*) FROM ods_raw_log_d WHERE dt='your_data_timestamp';-
両方のクエリがゼロ以外のカウントを返す場合、データ同期は完了しています。
-
クエリがゼロを返した場合、[この実行で使用された値] がクエリ内の
dt値と一致していることを確認してください。確認するには、ワークフローをクリックし、右側ペインで [実行履歴] を開き、[操作] 列の [表示] をクリックして、実行ログ内のパーティション値 (partition=[pt=xxx]) を確認します。
-
-
学習した内容
このチュートリアルでは、次のことを行いました。
-
DataWorks ワークスペースに MySQL データソースと HttpFile データソースを追加しました。
-
Zero Load Node と 2 つのバッチ同期ノードを含む
user_profile_analysisワークフローを構築しました。 -
各同期タスクのソースと送信先テーブル、パーティションパラメーター、およびダーティデータポリシーを構成しました。
-
パイプラインを実行し、データが MaxCompute の
ods_user_info_dテーブルとods_raw_log_dテーブルに書き込まれたことを確認しました。
次のステップ
データの処理 に進み、MaxCompute で同期されたデータを変換および分析します。