このトピックでは、DataWorks Data Integration を使用して異種データソース間でデータを同期する方法について説明します。この方法を使用して、データをデータウェアハウスに同期できます。このトピックの例では、Data Integration のバッチ同期タスクを使用して、ods_user_info_d という名前の ApsaraDB RDS for MySQL テーブルに格納されている基本的なユーザー情報を ods_user_info_d という名前の MaxCompute テーブルに同期し、user_log.txt という名前の Object Storage Service (OSS) オブジェクトに格納されている Web サイトアクセスログを ods_raw_log_d という名前の別の MaxCompute テーブルに同期します。
前提条件
ユーザーの基本的なユーザー情報と Web サイトアクセスログは準備されており、ApsaraDB RDS for MySQL インスタンスと OSS バケットに格納されています。準備されたデータは、DataWorks で直接登録して使用できます。 ApsaraDB RDS for MySQL と OSS をアクティブ化したり、テストデータを準備したりする必要はありません。次の要件が満たされていることを確認するだけで済みます。
この例では、標準モードのワークスペースを使用します。ワークスペースの名前は、[WorkShop2024_01] です。ビジネス要件に基づいて名前を指定できます。
この例では、[odps_first] という名前の MaxCompute データソースが追加されています。本番環境で使用される MaxCompute プロジェクトの名前は [workshop2024_01] です。開発環境で使用される MaxCompute プロジェクトの名前は [workshop2024_01_dev] です。
オプション。この例の手順を RAM ユーザーとして実行する場合は、RAM ユーザーに [AliyunBSSOrderAccess] ポリシーと [AliyunDataWorksFullAccess] ポリシーがアタッチされていることを確認してください。権限付与の詳細については、「RAM ユーザーに権限を付与する」をご参照ください。
クイックスタート
この実験では、データ同期とデータ処理のタスクを、抽出、変換、ロード (ETL) ワークフローテンプレートを使用してワンクリックでインポートできます。テンプレートをインポートしたら、目的のワークスペースに移動し、後続のデータ品質監視とデータの可視化操作を実行できます。
ワークスペース管理者ロールが割り当てられているユーザーのみが、目的のワークスペースに ETL ワークフローテンプレートをインポートできます。 ワークスペース管理者ロールの割り当て方法の詳細については、「ワークスペースレベルのサービスの権限を管理する」をご参照ください。
ETL ワークフローテンプレートへのクイックアクセスについては、[Web サイトユーザー行動分析] ページにアクセスしてください。
背景情報
Data Integration は、安定した、効率的でスケーラブルなデータ同期サービスです。複雑なネットワーク環境において、異種データソース間でデータを効率的に転送および同期できます。 Data Integration は、バッチ同期、増分データ同期、リアルタイムの完全および増分データ同期など、さまざまな種類の同期ソリューションを提供します。
この例では、バッチ同期ソリューションを使用します。 DataWorks は、Data Integration のバッチ同期機能をバッチ同期ノードにカプセル化します。各バッチ同期ノードは同期タスクを表します。ノードのソースとデスティネーションを構成して、データソース間のデータ転送を定義し、フィールドマッピングを構成して、ソースフィールドとデスティネーションフィールド間の読み取りと書き込みの関係を定義できます。
この例では、必要なテストデータとデータソースが準備されています。ワークスペースからテストデータにアクセスするには、データソース情報をワークスペースに追加するだけで済みます。
この実験のデータは DataWorks での実験操作にのみ使用でき、すべてのデータは手動のモックデータであり、Data Integration でのみ読み取ることができます。
ソースデータとデスティネーショントテーブルに関する情報
Data Integration サービスは、ApsaraDB RDS for MySQL に格納されている基本的なユーザー情報と OSS に格納されている Web サイトアクセスログを MaxCompute に同期するために使用されます。次の表に、ソースデータとデスティネーショントテーブルに関する情報を示します。
ソース | デスティネーション (MaxCompute) | |
MySQL | テーブル: ods_user_info_d
| テーブル: ods_user_info_d
|
OSS | オブジェクト: user_log.txt
| テーブル: ods_raw_log_d
|
ステップ 1: サーバーレスリソースグループを購入して構成する
この例では、DataWorks のサーバーレスリソースグループを使用して OSS と MySQL から MaxCompute にデータを同期するプロセスを紹介します。まず、サーバーレスリソースグループを購入し、初期設定を完了します。
サーバーレスリソースグループを購入する。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックして、[リソースグループ] ページに移動します。
[リソースグループの作成] をクリックし、[リージョンとゾーン] を [中国 (上海)] として選択し、[リソースグループ名] を入力し、プロンプトに従って追加パラメーターを構成します。ガイダンスに従って支払いプロセスを完了します。詳細については、「サーバーレスリソースグループの課金」をご参照ください。
説明この例では、[中国 (上海)] リージョンにあるサーバーレスリソースグループを使用します。サーバーレスリソースグループは、リージョンをまたがる操作をサポートしていません。
サーバーレスリソースグループを構成する。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックして [リソースグループ] ページに移動します。
購入したサーバーレスリソースグループを見つけ、[アクション] 列の [ワークスペースの関連付け] をクリックし、DataWorks ワークスペースに関連付けます。
リソースグループのインターネットアクセスを構成します。
NAT ゲートウェイコンソール にログオンします。
[インターネット NAT ゲートウェイ] ページで、[インターネット NAT ゲートウェイの作成] をクリックします。
購入ページで、次のパラメーターを設定し、[今すぐ購入] をクリックします。
パラメーター
説明
[リージョン]
インターネット NAT ゲートウェイを作成するリージョンを選択します。
[ネットワークとゾーン]
NAT ゲートウェイが属する VPC と vSwitch を選択します。 NAT ゲートウェイの作成後、VPC または vSwitch を変更することはできません。
[ネットワークタイプ]
この例では、[インターネット NAT ゲートウェイ] が選択されています。
[インターネット NAT ゲートウェイ]: ネットワークアドレス変換機能を提供し、EIP に関連付けて ECS インスタンスがインターネットにアクセスできるようにし、プライベートネットワークとパブリックネットワーク間の通信を可能にします。
[VPC NAT ゲートウェイ]: ネットワークアドレス変換機能も提供しますが、EIP に関連付けることはできません。 ECS インスタンスのプライベートネットワーク内でのアドレス変換のみを提供でき、内部アドレスの非表示やアドレス競合の回避などのシナリオに適しています。
[エラスティック IP アドレス]
この例では、[EIP の購入と関連付け] が選択されています。
[既存のものを選択]
[EIP インスタンス]: [インスタンスに関連付けられていない] EIP を選択します。
[EIP の購入と関連付け]: デフォルトでは、トラフィック課金の [BGP (マルチ ISP)] EIP が作成されます。ビジネス要件に基づいて [帯域幅ピーク] を選択できます。
説明異なる回線タイプまたは課金方法の EIP を関連付ける場合は、最初に EIP を申請 し、次に [既存の EIP を選択] を関連付けます。
NAT ゲートウェイに関連付ける各 EIP は、NAT ゲートウェイが属する vSwitch のプライベート IP アドレスを占有します。 vSwitch に十分な数の使用可能なプライベート IP アドレスがあることを確認してください。そうでない場合、新しい EIP を NAT ゲートウェイに関連付けることはできません。
[後で構成]: 作成された NAT ゲートウェイにはインターネットアクセス機能がありません。 NAT ゲートウェイに EIP を手動で関連付ける必要があります。
詳細については、「サーバーレスリソースグループを作成して使用する」をご参照ください。
ステップ 2: データソースを追加する
この例では、テストデータにアクセスするために、[user_behavior_analysis_httpfile] という名前の HttpFile データソースと [user_behavior_analysis_mysql] という名前の ApsaraDB RDS for MySQL データソースをワークスペースに追加する必要があります。テストに使用されるデータソースの基本情報が提供されています。
Data Integration 同期タスクを構成する前に、DataWorks コンソールの [データソース] ページでソースとデスティネーションのデータベースまたはデータウェアハウスを追加および構成できます。これにより、同期タスクを構成して使用するソースとデスティネーションのデータベースまたはデータウェアハウスを決定するときに、データソースを名前で検索できます。
この実験のデータは DataWorks での実験操作にのみ使用でき、すべてのデータは手動のモックデータであり、Data Integration でのみ読み取ることができます。
user_behavior_analysis_httpfile という名前の HttpFile データソースを追加する
HttpFile データソースをワークスペースに追加します。次に、データソースとデータ同期に使用するリソースグループの間にネットワーク接続が確立されているかどうかをテストします。 HttpFile データソースは、OSS に格納され、DataWorks からアクセスできるユーザーの Web サイトアクセステストデータを読み取るために使用されます。
[データソース] ページに移動します。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。
[SettingCenter] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。
HttpFile データソースを追加します。
[SettingCenter] ページの左側のナビゲーションウィンドウで、
を選択します。 [データソース] ページで、[データソースの追加] をクリックします。[データソースの追加] ダイアログボックスで、[HttpFile] をクリックします。
[HttpFile データソースの追加] ページで、パラメーターを構成します。次の表にパラメーターを示します。
パラメーター
説明
データソース名
データソースの名前。ワークスペース内のデータソースの識別子です。この例では、パラメーターは user_behavior_analysis_httpfile に設定されています。
データソースの説明
データソースの説明。データソースは、このトピックの例専用です。このデータソースからデータを読み取るバッチ同期タスクを構成すると、DataWorks が提供するテストデータにアクセスできます。このデータソースは、[Data Integration] ページでのみ追加できます。他のモジュールではデータソースを追加できません。
環境
開発環境と本番環境を選択します。
説明開発環境のデータソースと本番環境のデータソースを追加する必要があります。そうでない場合、関連するタスクを実行してデータを生成するときにエラーが報告されます。
URL
OSS バケットの URL。
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com
を入力します。リソースグループの接続性
リソースグループ情報を表示する表で、購入したサーバーレスリソースグループを見つけ、[接続ステータス] 列の [ネットワーク接続のテスト] をクリックします。リソースグループと開発環境および本番環境のデータソース間のネットワーク接続を個別にテストする必要があります。テストが成功したことを示すメッセージがシステムから返されると、接続ステータスは [接続済み] に変わります。
user_behavior_analysis_mysql という名前の ApsaraDB RDS for MySQL データソースを追加する
ApsaraDB RDS for MySQL データソースをワークスペースに追加します。次に、データソースとデータ同期に使用するリソースグループの間にネットワーク接続が確立されているかどうかをテストします。 ApsaraDB RDS for MySQL データソースは、ApsaraDB RDS for MySQL に格納され、DataWorks からアクセスできる基本的なユーザー情報を読み取るために使用されます。
[データソース] ページに移動します。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。
[SettingCenter] ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。
ApsaraDB RDS for MySQL データソースを追加します。
[SettingCenter] ページの左側のナビゲーションウィンドウで、
を選択します。 [データソース] ページで、[データソースの追加] をクリックします。[データソースの追加] ダイアログボックスで、[MySQL] をクリックします。
[MySQL データソースの追加] ページで、パラメーターを構成します。次の表にパラメーターを示します。
パラメーター
説明
構成モード
このパラメーターを接続文字列モードに設定します。
データソース名
データソースの名前。 user_behavior_analysis_mysql と入力します。
データソースの説明
データソースの説明。データソースは、このトピックの例専用です。このデータソースからデータを読み取るバッチ同期タスクを構成すると、DataWorks が提供するテストデータにアクセスできます。このデータソースは、[Data Integration] ページでのみ追加できます。他のモジュールではデータソースを追加できません。
環境
開発と本番を選択します。
説明開発環境のデータソースと本番環境のデータソースを追加する必要があります。そうでない場合、関連するタスクを実行してデータを生成するときにエラーが報告されます。
JDBC URL
ホストアドレス IP
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
ポート
3306
データベース名
workshop
ユーザー名
workshop
パスワード
workshop#2017
認証オプション
このパラメーターを認証なしに設定します。
リソースグループの接続性
リソースグループ情報を表示する表で、購入したサーバーレスリソースグループを見つけ、[接続ステータス] 列の [ネットワーク接続のテスト] をクリックします。リソースグループと開発環境および本番環境のデータソース間のネットワーク接続を個別にテストする必要があります。テストが成功したことを示すメッセージがシステムから返されると、接続ステータスは [接続済み] に変わります。
ステップ 3: ワークフローを作成する
要件分析 に基づいてワークフローを設計します。 [ods_raw_log_d] と [ods_user_info_d] という名前の 2 つのバッチ同期ノードを作成および使用して、ApsaraDB RDS for MySQL からの基本的なユーザー情報と OSS からの Web サイトアクセスログを同期します。次に、[workshop_start] という名前のゼロロードノードを作成して、ワークフロー内のノードを一元管理します。このトピックでは、データ同期手順のみを説明します。具体的なタスク構成は含まれていません。
1. ワークフローを作成する
デフォルトでは、DataWorks は workflow という名前のワークフローを提供します。ワークフロー作成手順をスキップして、提供されているワークフローを直接使用できます。
[DataStudio] ページに移動します。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
ワークフローを作成します。
[スケジュールされたワークフロー] ウィンドウで、[ビジネスフロー] を右クリックし、[ワークフローの作成] を選択します。 [ワークフローの作成] ダイアログボックスで、ビジネス要件に基づいて [ワークフロー名] パラメーターを構成します。この例では、WorkShop を使用します。
2. ワークフローを設計する
ワークフローの構成タブに移動します。
前の手順で作成したワークフローの名前をダブルクリックして、ワークフローの構成タブに移動します。
ノードを作成します。
ワークフローの構成タブでコンポーネントをドラッグアンドドロップして、ワークフローを設計できます。 [ノードの作成] をクリックします。ノードの作成元となるノードタイプを見つけ、右側のワークフローキャンバスにドラッグします。
この例では、workshop_start という名前の [ゼロロードノード] と、ods_raw_log_d と ods_user_info_d という名前の 2 つの [バッチ同期ノード] を作成する必要があります。 ods_raw_log_d ノードは OSS から Web サイトアクセスログを同期するために使用され、ods_user_info_d ノードは ApsaraDB RDS for MySQL から基本的なユーザー情報を同期するために使用されます。
ノードのスケジューリング依存関係を構成します。
2 つのバッチ同期ノードの祖先ノードとして [workshop_start] ノードを構成します。この例では、ゼロロードノードとバッチ同期ノードの間にデータ系列は存在しません。したがって、ワークフロー内のゼロロードノード workshop_start とバッチ同期ノード ods_raw_log_d および ods_user_info_d の間に線を描画して、スケジューリングの依存関係を構成できます。スケジューリング依存関係の構成方法の詳細については、「スケジューリング依存関係の構成ガイド」をご参照ください。
ステップ 4: MaxCompute テーブルを作成する
Data Integration を使用して同期されたデータを格納するために使用される MaxCompute テーブルを事前に作成する必要があります。この例では、テーブルは迅速な方法で作成されます。 MaxCompute テーブル関連操作の詳細については、「MaxCompute テーブルを作成および管理する」をご参照ください。
テーブル作成のエントリポイントに移動します。
ods_raw_log_d という名前のテーブルを作成します。
[テーブルの作成] ダイアログボックスで、[名前] フィールドに ods_raw_log_d と入力します。テーブル構成タブの上部で、[DDL] をクリックし、次のテーブル作成ステートメントを入力して、[テーブルスキーマの生成] をクリックします。 [確認] ダイアログボックスで、[確認] をクリックして元の構成を上書きします。
CREATE TABLE IF NOT EXISTS ods_raw_log_d ( col STRING -- 列 ) PARTITIONED BY // パーティション化 ( dt STRING -- 日付 ) LIFECYCLE 7; // ライフサイクル
ods_user_info_d という名前のテーブルを作成します。
[テーブルの作成] ダイアログボックスで、[名前] フィールドに ods_user_info_d と入力します。テーブル構成タブの上部で、[DDL] をクリックし、次のテーブル作成ステートメントを入力して、[テーブルスキーマの生成] をクリックします。 [確認] ダイアログボックスで、[確認] をクリックして元の構成を上書きします。
CREATE TABLE IF NOT EXISTS ods_user_info_d ( uid STRING COMMENT 'User ID', // ユーザーID gender STRING COMMENT 'Gender', // 性別 age_range STRING COMMENT 'Age range', // 年齢層 zodiac STRING COMMENT 'Zodiac sign' // 干支 ) PARTITIONED BY ( // パーティション化 dt STRING // 日付 ) LIFECYCLE 7; // ライフサイクル
テーブルをコミットしてデプロイします。
テーブル情報が有効であることを確認した後、ods_user_info_d テーブルと ods_raw_log_d テーブルの構成タブで、[開発環境にコミット] と [本番環境にコミット] を順番にクリックします。開発環境と本番環境のワークスペースに関連付けられている MaxCompute プロジェクトでは、ノード構成に基づいて、MaxCompute プロジェクトに関連する物理テーブルが作成されます。
説明テーブルのスキーマを定義した後、テーブルを開発環境と本番環境にコミットできます。テーブルがコミットされると、特定の環境の MaxCompute プロジェクトでテーブルを表示できます。
ワークスペースの開発環境にテーブルをコミットすると、開発環境のワークスペースに関連付けられている MaxCompute プロジェクトにテーブルが作成されます。
ワークスペースの本番環境にテーブルをコミットすると、本番環境のワークスペースに関連付けられている MaxCompute プロジェクトにテーブルが作成されます。
ステップ 5: workshop_start という名前のゼロロードノードを構成する
この例では、workshop_start ノードは毎日 00:15 に実行され、ユーザープロファイル分析ワークフローの実行をトリガーします。 workshop_start ノードは管理および制御ノードです。ノードのコードを記述する必要はありません。ノードのスケジューリング構成を以下に示します。
workshop_start ノードの構成タブに移動します。
[DataStudio] ページで、前の手順で作成したワークフローの名前をダブルクリックして、ワークフローの構成タブに移動します。ゼロロードノード [workshop_start] をダブルクリックします。ゼロロードノードの構成タブで、右側のナビゲーションウィンドウの [プロパティ] をクリックします。
workshop_start ノードのスケジューリングプロパティを構成します。
ワークスペースのルートノードが毎日 00:15 に workshop_start ノードの実行をトリガーできるように、workshop_start ノードに次の設定を構成します。
スケジューリング時間を構成する: [スケジューリングサイクル] を [日] に、[スケジュールされた時間] を [00:15] に設定します。
再実行プロパティを構成する: [再実行] を [実行ステータスに関係なく許可] に設定します。
スケジューリングの依存関係を構成する: [ワークスペースのルートノード] を workshop_start ノードの祖先ノードとして構成します。
この例では、[workshop_start] ノードは管理および制御ノードであり、他のノードに依存しません。したがって、workshop_start ノードをワークスペースのルートノードに依存するように構成できます。これにより、ワークスペースのルートノードを使用して、現在のユーザープロファイル分析ワークフローの実行をトリガーできます。
説明デフォルトでは、ワークスペースのルートノードは、ワークスペースの作成後に自動的に生成されます。ほとんどの場合、ワークスペース内のすべてのノードは、ワークスペースのルートノードに依存します。デフォルトでは、ワークスペースのルートノードは 00:00 にすべてのレベル 1 の子孫ノードの実行をトリガーし始めます。ルートノードの構成は変更できません。
ステップ 6: ods_raw_log_d ノードを構成する
このステップでは、ods_raw_log_d ノードを構成して、OSS オブジェクト user_log.txt から MaxCompute テーブル ods_raw_log_d にユーザーの Web サイトアクセスログを同期できます。
[DataStudio] ページで、ワークフロー内の ods_raw_log_d ノードをダブルクリックします。ノードの構成タブで、ノードを構成します。
1. データソースと使用するリソースグループの間にネットワーク接続を確立する
このステップでは、サーバーレスリソースグループが使用されます。リソースグループとソース user_behavior_analysis_httpfile およびデスティネーション MaxCompute データソース間のネットワーク接続をテストする必要があります。
[ソース] を [HttpFile] に設定し、[データソース名] を [user_behavior_analysis_httpfile] に設定します。これは、ソースの追加 で追加したデータソースです。
[リソースグループ] ステップで、ドロップダウンリストから [購入したサーバーレスリソースグループ] を選択します。
[デスティネーション] を [MaxCompute] に設定し、追加するデータソースを選択します。
2. 同期タスクを構成する
同期タスクの基本設定を構成します。
[ソース] については、データの読み取り元となるオブジェクトとして user_log.txt オブジェクトを構成する必要があります。
[デスティネーション] については、データの書き込み先となるテーブルとして ods_raw_log_d テーブルを構成する必要があります。また、[パーティション情報] パラメーターも構成する必要があります。 bizdate 変数は、[パーティション情報] フィールドで ${} 形式で定義されています。変数値は、後続のステップ 3 で割り当てられます。
フィールドマッピングと一般設定を構成します。
DataWorks では、ソースフィールドとデスティネーションフィールド間のマッピングを構成して、指定されたソースフィールドからデータを読み取り、デスティネーションフィールドにデータを書き込むことができます。 [チャネル制御] セクションでは、データの読み取りと書き込みの並列処理、データベースのパフォーマンスへの影響を防ぐための最大転送速度、ダーティデータレコードと分散実行のポリシーなどの機能も使用できます。この例では、デフォルト設定を使用します。同期タスクのその他の構成項目については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。
3. スケジューリングプロパティを構成する
同期タスクのスケジューリングプロパティを次の図のように構成すると、DataWorks Data Integration は OSS オブジェクト user_log.txt から MaxCompute テーブル ods_raw_log_d の時間ベースのパーティションに毎日 00:15 にデータを同期します。
[パラメーター] セクションで、[パラメーター名] に bizdate、[パラメーター値] に $bizdate と入力します。これは、前日の日付を照会するために使用されます。パラメーター値の形式は [yyyymmdd] です。
[スケジュール] セクションで、[スケジューリングサイクル] を [日] に設定します。現在のノードの [スケジュールされた時間] パラメーターを個別に構成する必要はありません。現在のノードが毎日スケジュールされる時間は、ワークフローのルートノード workshop_start のスケジューリング時間によって決まります。ルートノードのスケジューリング時間は毎日 00:15 です。
[依存関係] セクションのパラメーターを構成します。
[現在のノードの祖先ノード] を決定する: 現在のノードの親ノードに [workshop_start] ノードを表示するかどうかを決定します。線を描画することで現在のノードの祖先ノードとして指定したノードが表示されます。 [workshop_start] ノードが表示されない場合は、データの同期 を参照して、ビジネスデータ同期フェーズのワークフロー設計が完了しているかどうかを確認してください。
この例では、workshop_start ノードのスケジューリング時間が到着し、ノードの実行が完了すると、現在のノードの実行がトリガーされます。
[現在のノードの出力] を決定する: 本番環境の MaxCompute プロジェクト名.ods_raw_log_d の形式で名前が付けられた現在のノードの出力があるかどうかを決定します。 [SettingCenter] の [ワークスペース] ページに移動して、本番環境の MaxCompute プロジェクト名を表示できます。
ステップ 7: ods_user_info_d ノードを構成する
このステップでは、ods_user_info_d ノードを構成して、ApsaraDB RDS for MySQL テーブル ods_user_info_d から MaxCompute テーブル ods_user_info_d に基本的なユーザー情報を同期できます。
[DataStudio] ページで、ワークフロー内の ods_user_info_d ノードをダブルクリックします。ノードの構成タブで、ノードを構成します。
1. データソースと使用するリソースグループの間にネットワーク接続を確立する
このステップでは、サーバーレスリソースグループが使用されます。リソースグループとソース user_behavior_analysis_mysql およびデスティネーション MaxCompute データソース間のネットワーク接続をテストする必要があります。
[ソース] を MySQL に設定し、[データソース名] を user_behavior_analysis_mysql に設定します。
[リソースグループ] ステップで、ドロップダウンリストから購入したサーバーレスリソースグループを選択します。
[デスティネーション] を MaxCompute に設定し、追加するデータソースを選択します。
2. 同期タスクを構成する
同期タスクの基本設定を構成します。
[ソース] については、データの読み取り元となるテーブルとして ods_user_info_d テーブルを構成する必要があります。
[デスティネーション] については、データの書き込み先となるテーブルとして ods_user_info_d テーブルを構成する必要があります。また、[パーティション情報] パラメーターも構成する必要があります。 bizdate 変数は、[パーティション情報] フィールドで ${} 形式で定義されています。変数値は、後続のステップ 3 で割り当てられます。
説明この例では、デフォルトで ApsaraDB RDS for MySQL テーブルから完全データが読み取られ、MaxCompute テーブルの指定された時間ベースのパーティションに書き込まれます。
フィールドマッピングと一般設定を構成します。
DataWorks では、ソースフィールドとデスティネーションフィールド間のマッピングを構成して、指定されたソースフィールドからデータを読み取り、デスティネーションフィールドにデータを書き込むことができます。 [チャネル制御] セクションでは、データの読み取りと書き込みの並列処理、データベースのパフォーマンスへの影響を防ぐための最大転送速度、ダーティデータレコードと分散実行のポリシーなどの機能も使用できます。この例では、デフォルト設定を使用します。同期タスクのその他の構成項目については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。
3. スケジューリングプロパティを構成する
同期タスクのスケジューリングプロパティを次の図のように構成すると、DataWorks Data Integration は ApsaraDB RDS for MySQL テーブル ods_user_info_d から MaxCompute テーブル ods_user_info_d の時間ベースのパーティションに毎日 00:15 にデータを同期します。
[パラメーター] セクションで、[パラメーター名] に [bizdate]、[パラメーター値] に $bizdate と入力します。これは、前日の日付を照会するために使用されます。パラメーター値の形式は yyyymmdd です。
[スケジュール] セクションで、[スケジューリングサイクル] を [日] に設定します。現在のノードの [スケジュールされた時間] パラメーターを個別に構成する必要はありません。現在のノードが毎日スケジュールされる時間は、ワークフローのルートノード workshop_start のスケジューリング時間によって決まります。ルートノードのスケジューリング時間は毎日 00:15 です。
[依存関係] セクションのパラメーターを構成します。
[現在のノードの祖先ノード] を決定する: 現在のノードの親ノードに workshop_start ノードを表示するかどうかを決定します。線を描画することで現在のノードの祖先ノードとして指定したノードが表示されます。 [workshop_start] ノードが表示されない場合は、データの同期 を参照して、ビジネスデータ同期フェーズのワークフロー設計が完了しているかどうかを確認してください。
この例では、workshop_start ノードのスケジューリング時間が到着し、ノードの実行が完了すると、現在のノードの実行がトリガーされます。
[現在のノードの出力] を決定する: 本番環境の MaxCompute プロジェクト名.ods_user_info_d の形式で名前が付けられた現在のノードの出力があるかどうかを決定します。 [SettingCenter] の [ワークスペース] ページに移動して、本番環境の MaxCompute プロジェクト名を表示できます。
説明DataWorks では、ノードの出力を使用して、ノードとその子孫ノード間のスケジューリングの依存関係を構成します。 SQL ノードが同期ノードに依存している場合、SQL ノードが同期ノードの出力テーブルの処理を開始すると、DataWorks は 自動解析 機能を使用して、テーブル系列に基づいて同期ノードを SQL ノードの祖先ノードとして迅速に追加します。ノード出力テーブル ods_user_info_d と同じ名前のノード出力が存在するかどうかを確認する必要があります。
ステップ 8: WorkShop ワークフロー内のノードを実行して結果を表示する
現在のワークフロー内のノードを実行して、ApsaraDB RDS for MySQL の基本的なユーザー情報と OSS のユーザーの Web サイトアクセスログを関連する MaxCompute テーブルに書き込みます。
ワークフローを実行する
[DataStudio] ページで、[ビジネスフロー] の下の WorkShop ワークフローをダブルクリックします。ワークフローの構成タブで、上部ツールバーの
アイコンをクリックして、ノード間のスケジューリングの依存関係に基づいてワークフロー内のノードを実行します。
ステータスを確認します。
ノードステータスを表示する:ノードが
状態の場合、同期プロセスは正常です。
ノード実行ログを表示する: たとえば、[ods_user_info_d] ノードまたは [ods_raw_log_d] ノードを右クリックし、[ログの表示] を選択します。次の図に示す情報がログに表示されている場合、ノードは実行され、データは同期されています。
同期結果を照会する
ワークフロー内のノードが期待どおりに実行された場合、ApsaraDB RDS for MySQL テーブル ods_user_info_d のすべての基本的なユーザー情報は、出力テーブル workshop2024_01_dev.ods_user_info_d の前日のパーティションに同期され、OSS オブジェクト user_log.txt 内のユーザーのすべての Web サイトアクセスログは出力テーブル workshop2024_01_dev.ods_raw_log_d の前日のパーティションに同期されます。クエリ SQL ステートメントを本番環境にデプロイして実行する必要はありません。したがって、アドホッククエリを作成することで同期結果を照会できます。
アドホッククエリを作成します。
[DataStudio] ページの左側のナビゲーションウィンドウで、
アイコンをクリックします。 [アドホッククエリ] ウィンドウで、[アドホッククエリ] を右クリックし、 を選択します。
同期結果テーブルを照会します。
次の SQL ステートメントを実行して、データ書き込み結果を確認します。 ods_raw_log_d テーブルと ods_user_info_d テーブルにインポートされたレコード数を確認します。
// 読み取りと書き込み操作を実行するデータのデータタイムスタンプを、パーティションのフィルター条件として指定する必要があります。たとえば、ノードが 2023 年 6 月 21 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20230620 で、ノードの実行日より 1 日前です。 select count(*) from ods_user_info_d where dt=データタイムスタンプ; // ods_user_info_d からデータタイムスタンプが指定された日付のレコード数をカウント select count(*) from ods_raw_log_d where dt=データタイムスタンプ; // ods_raw_log_d からデータタイムスタンプが指定された日付のレコード数をカウント
説明この例では、ノードは開発環境である [DataStudio] で実行されます。したがって、データはデフォルトで、開発環境のワークスペースに関連付けられている MaxCompute プロジェクト workshop2024_01_dev の指定されたテーブルに書き込まれます。
ステップ 9: ワークフローをコミットする
ノードコードをデバッグした後、ユーザープロファイル分析ワークフローをスケジューリングシステムにコミットして定期的にスケジューリングします。これは、未加工のビジネスデータが MaxCompute デスティネーショントテーブルに定期的に同期されることを示します。
ワークフローの構成タブに移動します。
[DataStudio] ページで、ワークフロー名 [WorkShop] をダブルクリックして、ワークフローの構成タブに移動します。
ワークフローをコミットします。
ワークフローキャンバスで、上部ツールバーの
アイコンをクリックして、ワークフローをコミットします。
コミット操作を確認します。
[コミット] ダイアログボックスで、現在のワークフロー内のすべてのノードを選択し、[I/O の不整合アラートを無視] を選択します。構成を確認し、[確認] をクリックして、ワークフロー内のすべてのノードをコミットします。 [デプロイ] ページで、ノードを選択して本番環境にデプロイします。
次のステップ
このチュートリアルに基づいてデータの同期方法を理解したら、次のチュートリアルに進むことができます。次のチュートリアルでは、同期されたデータを計算および分析する方法を学習します。詳細については、「データを処理する」をご参照ください。