このチュートリアルでは、Data Integration Batch Synchronization タスクを使用して、MySQL テーブル (ods_user_info_d) と Object Storage Service (OSS) ログファイル (user_log.txt) から StarRocks テーブル (ods_user_info_d_starrocks および ods_raw_log_d_starrocks) にデータを同期する方法を説明します。
前提条件
開始する前に、必要な環境が準備されていることを確認してください。詳細については、「環境の準備」をご参照ください。
1. データソースの作成
後続のステップでデータを処理できるようにするには、生のデータを取得するために、次のデータソースを DataWorks ワークスペースに追加する必要があります。
MySQL データソース: このチュートリアルでは、
user_behavior_analysis_mysqlという名前のデータソースを使用して、MySQL からユーザーの基本情報 (ods_user_info_d) を取得します。HttpFile データソース: このチュートリアルでは、データソースは
user_behavior_analysis_httpfileという名前で、OSS に保存されているユーザーのウェブサイトアクセスログ (user_log.txt) を取得するために使用されます。
MySQL データソース (user_behavior_analysis_mysql) の作成
このチュートリアルのユーザー情報は MySQL データベースに格納されています。このデータ (ods_user_info_d) を StarRocks に同期するには、MySQL データソースを作成します。
データソースページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ] をクリックします。
左側のナビゲーションウィンドウで、[データソース] をクリックしてデータソースページに移動します。
[データソースの追加] をクリックします。データソースタイプとして MySQL を検索して選択します。
[MySQL データソースの追加] ページで、パラメーターを構成します。このチュートリアルでは、開発環境と本番環境の両方で同じ例の値を使用します。
次の表に、主要パラメーターを示します。その他のパラメーターはデフォルト値を維持できます。
パラメーター
説明
データソース名
データソースの名前を入力します。このチュートリアルでは、
user_behavior_analysis_mysqlと入力します。データソースの説明
このデータソースは DataWorks チュートリアル用です。バッチ同期タスクを構成してプラットフォームが提供するテストデータにアクセスする際に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能です。他のモジュールでは使用できません。
構成モード
[接続文字列モード] を選択します。
接続アドレス
ホスト IP アドレス:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.comポート:
3306
データベース名
データベース名を入力します。このチュートリアルでは、
workshopと入力します。ユーザー名
ユーザー名を入力します。このチュートリアルでは、
workshopと入力します。パスワード
パスワードを入力します。このチュートリアルでは、
workshop#2017と入力します。認証方法
認証なし。
[接続構成] セクションで、本番環境と開発環境の両方について [ネットワーク接続テスト] をクリックします。接続ステータスが [接続済み] であることを確認します。
重要リソースグループがワークスペースにアタッチされており、パブリックネットワークアクセスが有効になっていることを確認してください。そうでない場合、データ同期は失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続構成セクションのプロンプトに従ってください。[購入] と [購入済みリソースグループの関連付け] をクリックします。
[作成完了] をクリックします。
HttpFile データソース (user_behavior_analysis_httpfile) の作成
このチュートリアルのユーザーウェブサイトアクセスログは OSS に保存されています。ログデータ (user_log.txt) を StarRocks に同期するために、HttpFile データソースを作成する必要があります。
左側のナビゲーションウィンドウで、[データソース] をクリックします。
「[データソースの追加]」をクリックします。「[データソースの追加]」ダイアログボックスで、データソースの種類として[HttpFile]を検索して選択します。
[HttpFile データソースの追加] ページで、パラメーターを設定します。このチュートリアルでは、開発環境と本番環境の両方で同じサンプル値を使用します。
次の表に、主要なパラメーターを示します。その他のパラメーターはデフォルト値のままにできます。
パラメーター
説明
データソース名
データソース名を入力します。このチュートリアルでは、
user_behavior_analysis_httpfileと入力します。データソースの説明
このデータソースは DataWorks チュートリアル用です。バッチ同期タスクを構成してプラットフォームが提供するテストデータにアクセスする場合に、このデータソースからデータを読み取ります。このデータソースは Data Integration シナリオでのみ読み取り可能です。他のモジュールでは使用できません。
URL
開発環境と本番環境の両方で、URL を
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.comに設定します。[接続設定] セクションで、本番環境および開発環境の両方で [ネットワーク接続のテスト] をクリックします。接続ステータスが [接続済み] であることを確認してください。
重要リソースグループがワークスペースにアタッチされ、パブリックネットワークアクセスが有効になっていることを確認してください。そうしないと、データ同期が失敗します。詳細については、「環境の準備」をご参照ください。
利用可能なリソースグループがない場合は、接続構成セクションのプロンプトに従い、[購入] と [購入済みリソースグループの関連付け] をクリックします。
[作成の完了] をクリックします。
2. 同期パイプラインの構築
左上隅の
アイコンをクリックし、 を選択します。次に、ページ上部で、このチュートリアル用に作成されたワークスペースに切り替えます。左側のナビゲーションウィンドウで、
アイコンをクリックします。[ワークスペースディレクトリ] セクションで、
アイコンをクリックし、[ワークフローの作成] を選択して、ワークフローに名前を付けます。このチュートリアルでは、user_profile_analysis_starrocksと名付けます。ワークフローキャンバスで、次の表にリストされているノードを作成します。コンポーネントは左側のウィンドウで見つけて、キャンバスにドラッグできます。
ソースタイプ:
MySQL。宛先タイプ:
OSS。特定タイプ: バッチ同期.
ノードタイプ
ノード名
説明
ゼロロードノードworkshop_start_starrocksユーザーペルソナ分析ワークフロー全体を管理し、データ転送パスを明確にします。このノードは [ドライラン] ノードであり、コードの編集は不要です。
StarRocks ノードddl_ods_user_info_d_starrocksMySQL データソースからユーザーデータを受信するために、宛先 StarRocks テーブル
ods_user_info_d_starrocksを作成します。
StarRocks ノードddl_ods_raw_log_d_starrocksOSS ソースからウェブサイトアクセスログを受信するために、宛先 StarRocks テーブル
ods_raw_log_d_starrocksを作成します。
バッチ同期ノードods_user_info_d_starrocksMySQL から StarRocks テーブル
ods_user_info_d_starrocksにユーザー情報データを同期します。
バッチ同期ノードods_raw_log_d_starrocksOSS から StarRocks テーブル
ods_raw_log_d_starrocksにユーザーウェブサイトアクセスログを同期します。workshop_start_starrocksを両方のバッチ同期ノードの親ノードとして、図のようにノードを接続します。最終的なワークフローは次のようになります。ワークフローのスケジューリングプロパティを構成します。
ワークフロー キャンバスで、右側のペインの [スケジューリング] をクリックし、パラメーターを設定します。以下の表では、主要なパラメーターについて説明します。その他のパラメーターは、デフォルト値のままにしておくことができます。
スケジューリングパラメーター
説明
スケジューリングパラメーター
ワークフロー全体のスケジューリングパラメーターを構成できます。ワークフローの内部ノードは、構成されたスケジューリングパラメーターを直接使用できます。このチュートリアルでは、パラメーターは
bizdate=$[yyyymmdd-1]に設定され、前日の日付を取得します。スケジューリングサイクル
このチュートリアルでは、
Dayに設定します。スケジューリング時間
このチュートリアルでは、[スケジュール設定時刻] を
00:30に設定します。これは、ワークフローが毎日00:30に開始されることを意味します。スケジューリング依存関係
このワークフローには上流の依存関係がないため、設定する必要はありません。管理を容易にするため、[ワークスペースルートノードを使用] をクリックして、ワークフローをワークスペースのルートノードにアタッチします。
ワークスペースルートノードの命名形式は
workspace_name_rootです。
3. 同期タスクの構成
初期ノードの構成
ワークフロー キャンバスで、
workshop_start_starrocksノードの上にマウスを移動して、[ノードを開く] をクリックします。workshop_start_starrocksノード構成ページの右側で、[プロパティ] をクリックしてパラメーターを設定します。以下の表では、主要なパラメーターについて説明しています。一覧にないパラメーターについては、デフォルト設定をそのまま使用できます。スケジューリングパラメーター
説明
スケジューリングタイプ
このチュートリアルでは、
Dry-runを選択します。スケジューリング用リソースグループ
このチュートリアルでは、「環境の準備」ステップで作成されたサーバーレスリソースグループを選択します。
スケジューリング依存関係
「
workshop_start_starrocks」は初期ノードであり、上流の依存関係がないため、[ワークスペースのルートノードを使用] をクリックできます。これにより、ワークスペースのルートノードがワークフローをトリガーします。ワークスペースルートノードは
WorkspaceName_rootの形式で命名されます。
ユーザーテーブル ddl_ods_user_info_d_starrocks の作成
まず、MySQL データソースからユーザー情報を受信するために、StarRocks で ddl_ods_user_info_d_starrocks テーブルを作成します。このテーブルは、このノード内で作成するか、データカタログで手動で作成できます。
ワークフローキャンバスで、
ddl_ods_user_info_d_starrocksノードにカーソルを合わせ、[ノードを開く] をクリックします。テーブル作成文を編集します。
CREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks ( uid STRING COMMENT 'ユーザー ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座', dt STRING not null COMMENT '日付' ) DUPLICATE KEY(uid) COMMENT 'ユーザープロファイル分析ユースケース - ユーザー情報テーブル' PARTITION BY(dt) PROPERTIES("replication_num" = "1");デバッグパラメーターを設定します。
StarRocks ノード設定ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト実行で使用する次のパラメーターを設定します。
パラメーター
説明
コンピューティングリソース
「事前準備」ステップでバインドした StarRocks 計算リソースを選択します。
[リソースグループ]
「事前準備」ステップで購入したサーバーレスリソースグループを選択します。
(任意) スケジューリングプロパティを設定します。
このチュートリアルでは、デフォルトのスケジューリングパラメーターのままでかまいません。ノード設定ページの右側で、[プロパティ] をクリックします。パラメーターの詳細については、「ノードのスケジューリングプロパティの設定」をご参照ください。
[スケジューリングパラメーター]:これらはすでにワークフローレベルで設定されているため、ノードレベルでの設定は不要です。タスクまたはコードで直接使用できます。
[スケジューリングポリシー]:[遅延実行時間] パラメーターを使用して、ワークフローの実行後に子ノードの実行を遅延させる時間を指定できます。このチュートリアルでは、この設定は行いません。
上部のツールバーで [保存] をクリックしてノードを保存します。
ログテーブル ddl_ods_raw_log_d_starrocks の作成
まず、HttpFile データソースからユーザーのウェブサイトアクセスログを受信するために、StarRocks で ddl_ods_raw_log_d_starrocks テーブルを作成します。このテーブルは、このノード内で作成するか、データカタログ で手動で作成できます。
ワークフローキャンバスで、
ddl_ods_raw_log_d_starrocksノードにカーソルを合わせ、[ノードを開く] をクリックします。テーブル作成文を編集します。
CREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks ( col STRING COMMENT 'ログ', dt DATE not null COMMENT '日付' ) DUPLICATE KEY(col) COMMENT 'ユーザープロファイル分析ユースケース - 生のウェブサイトアクセスログテーブル' PARTITION BY(dt) PROPERTIES ("replication_num" = "1");デバッグパラメーターを設定します。
バッチ同期タスクの設定ページの右側で、Run Configuration をクリックします。ステップ 4 のテスト実行で使用する次のパラメーターを設定します。
パラメーター
説明
コンピューティングリソース
「事前準備」ステップでバインドした StarRocks 計算リソースを選択します。
[リソースグループ]
「事前準備」ステップで購入したサーバーレスリソースグループを選択します。
(任意) スケジューリングプロパティを設定します。
このチュートリアルでは、デフォルトのスケジューリングパラメーターを維持できます。ノード設定ページの右側で、[プロパティ] をクリックします。パラメーターの詳細については、「ノードのスケジューリングプロパティの設定」をご参照ください。
[スケジューリングパラメーター]:これらはすでにワークフローレベルで設定されているため、ノードレベルでの設定は不要です。タスクまたはコードで直接使用できます。
[スケジューリングポリシー]:[遅延実行時間] パラメーターを使用して、ワークフローの実行後に子ノードの実行を遅延させる時間を指定できます。このチュートリアルでは、この設定は行いません。
上部のツールバーで、[保存] をクリックしてノードを保存します。
ユーザーデータ同期パイプライン (ods_user_info_d_starrocks) の構成
ワークフロー キャンバスで、
ods_user_info_d_starrocksノードにマウスを合わせて、[ノードを開く] をクリックします。データ同期パイプラインのネットワークとリソースを構成します。
パラメーター
説明
ソース
ソース: MySQL
データソース名:
user_behavior_analysis_mysql
リソースグループ
「環境の準備」ステップで作成したサーバーレスリソースグループを選択します。
宛先
宛先: StarRocks
データソース名:
doc_starrocks_storage_compute_tightly_01
タスクを構成します。
ソースと宛先の設定。
モジュール
パラメーター
構成
ソース
テーブル
MySQL テーブル
ods_user_info_dを選択します。分割キー
プライマリキーまたはインデックス付きカラムを分割キーとして使用することを推奨します。整数型フィールドのみがサポートされています。
このチュートリアルでは、
uidフィールドを分割キーとして使用します。宛先
テーブル
StarRocks テーブル
ods_user_info_d_starrocksを選択します。書き込み前に実行するステートメント
このユースケースでは、
dtフィールドに基づく動的パーティションを使用します。ノードが再実行されたときにデータの重複を防ぐため、各同期の前に次の SQL ステートメントを使用してターゲットパーティションを削除します。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE${var}は、スケジューリング構成フェーズ中にスケジューリングパラメーターから値が割り当てられる変数です。これにより、スケジュールされたシナリオで動的パラメーター入力が可能になります。詳細については、「スケジューリングの構成」をご参照ください。StreamLoad リクエストパラメーター
StreamLoad のリクエストパラメーター。JSON 形式である必要があります。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }フィールドマッピング。
フィールドマッピングを構成して、ソースフィールドが宛先フィールドに書き込まれる方法を定義します。また、スケジューリングパラメーター変数を使用して StarRocks パーティションフィールドを動的に入力し、各日のデータが StarRocks の対応するパーティションに書き込まれることを保証します。
名前が一致するソースフィールドを宛先フィールドに自動的にマップするには、[同名のフィールドをマップ] をクリックします。
「[追加]」をクリックし、'${var}' を入力します。このフィールドを StarRocks の dt フィールドに手動でマッピングします。
チャネル コントロール。
このチュートリアルでは、[ダーティデータのポリシー] を [ダーティデータを許可しない] に設定し、その他の設定はデフォルト値のままにします。詳細については、「ウィザードモードでタスクを設定する」をご参照ください。
デバッグパラメーターを構成します。
バッチ同期タスクの構成ページの右側で、Run Configuration をクリックします。手順 4 のテスト実行で使用する以下のパラメーターを設定します。手順 4。
パラメーター
説明
リソースグループ
「環境の準備」ステップで購入したサーバーレスリソースグループを選択します。
スクリプトパラメーター
「[パラメーターの追加]」をクリックし、yyyymmdd 形式の特定の定数(例:
var=20250223)に設定します。デバッグ時に、DataStudio はタスクで定義された変数をこの定数で置き換えます。(オプション) スケジューリングプロパティを構成します。
このチュートリアルでは、デフォルトのスケジューリングパラメーターをそのまま使用できます。ノード構成ページの右側で、[プロパティ] をクリックします。パラメーターの詳細については、「ノードのスケジューリングプロパティの設定」をご参照ください。
[スケジューリングパラメーター]: これらはすでにワークフロー レベルで構成済みであるため、ノード レベルの構成は不要です。タスクまたはコードで直接使用できます。
[スケジューリングポリシー]: [遅延実行時間] パラメーターを使用して、ワークフロー 実行後の子ノード の 実行を遅らせる時間を指定できます。このチュートリアルでは、この設定は行っていません。
トップツールバーで、[保存] をクリックして、ノードを保存します。
ログデータ同期パイプライン (ods_raw_log_d_starrocks) の構成
Workflow キャンバスで、
ods_raw_log_d_starrocksノードにカーソルを合わせ、[ノードを開く] をクリックします。データ同期パイプラインのネットワークとリソースを構成します。
[ソース]、[リソースグループ]、および[宛先]を構成した後、[次へ]をクリックし、プロンプトに従って接続性テストを完了します。次の表に構成の詳細を示します。
パラメーター
構成
ソース
ソース: HttpFile
[データソース名]:
user_behavior_analysis_httpfile
リソースグループ
「環境の準備」ステップで作成したサーバーレスリソースグループを選択します。
宛先
宛先: StarRocks
データソース名:
doc_starrocks_storage_compute_tightly_01
[次へ] をクリックして、データ同期タスクを設定します。
ソースと宛先の設定.
モジュール
パラメーター
構成
ソース
ファイルパス
/user_log.txtテキストタイプ
textカラムデリミタ
|Noソースを設定した後、[データ構造を確認] をクリックします。
宛先
テーブル
StarRocks テーブル
ods_raw_log_d_starrocksを選択します。書き込み前に実行するステートメント
このユースケースでは、
dtフィールドに基づく動的パーティションを使用します。ノードが再実行されたときにデータの重複を防ぐため、各同期の前に次の SQL ステートメントを使用してターゲットパーティションを削除します。ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE${var}は、スケジューリング構成フェーズ中にスケジューリングパラメーターから値が割り当てられる変数です。これにより、スケジュールされたシナリオで動的パラメーター入力が可能になります。詳細については、「スケジューリングの構成」をご参照ください。StreamLoad リクエストパラメーター
StreamLoad のリクエストパラメーター。JSON 形式である必要があります。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }チャネル コントロール。
このチュートリアルでは、[不正データのポリシー] を [不正データを許可しない] に設定し、他の設定はデフォルト値のままにしてください。詳細については、「ウィザードモードでタスクを設定する」をご参照ください。
フィールド マッピング。
ツールバーで、[ウィザードモード] から [スクリプトモード] に切り替えるには、
アイコンをクリックします。これにより、HttpFile データソースのフィールドマッピングを設定し、StarRocks の dt パーティションフィールドに動的に値を割り当てることができます。ソース HttpFile 構成の Column セクションに、次を追加します。
{ "type": "STRING", "value": "${var}" }ods_raw_log_d_starrocksノードの完全なスクリプトは次のとおりです。{ "type": "job", "version": "2.0", "steps": [ { "stepType": "httpfile", "parameter": { "fileName": "/user_log.txt", "nullFormat": "", "compress": "", "requestMethod": "GET", "connectTimeoutSeconds": 60, "column": [ { "index": 0, "type": "STRING" }, { "type": "STRING", "value": "${var}" } ], "skipHeader": "false", "encoding": "UTF-8", "fieldDelimiter": "|", "fieldDelimiterOrigin": "|", "socketTimeoutSeconds": 3600, "envType": 0, "datasource": "user_behavior_analysis_httpfile", "bufferByteSizeInKB": 1024, "fileFormat": "text" }, "name": "Reader", "category": "reader" }, { "stepType": "starrocks", "parameter": { "loadProps": { "row_delimiter": "\\x02", "column_separator": "\\x01" }, "envType": 0, "datasource": "doc_starrocks_storage_compute_tightly_01", "column": [ "col", "dt" ], "tableComment": "", "table": "ods_raw_log_d_starrocks", "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE ; " }, "name": "Writer", "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "category": "processor" } ], "setting": { "errorLimit": { "record": "0" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
デバッグパラメーターを構成します。
バッチ同期タスクの構成ページの右側で、Run Configuration をクリックします。次のパラメーターを設定して、手順 4 のテスト実行で使用します。
パラメーター
説明
リソースグループ
「環境の準備」ステップで購入したサーバーレスリソースグループを選択します。
スクリプトパラメーター
[パラメーターの追加] をクリックし、
var=20250223のように、yyyymmdd フォーマットで特定の定数を設定します。デバッグ中に、DataStudio はタスクで定義された変数をこの定数に置き換えます。(オプション) スケジューリングプロパティを構成します。
このチュートリアルでは、デフォルトのスケジューリングパラメーターをそのまま使用できます。ノード構成ページの右側で、[プロパティ] をクリックします。パラメーターの詳細については、「ノードのスケジューリングプロパティを構成する」をご参照ください。
スケジューリングパラメーター: これらはすでにワークフローレベルで構成されているため、ノードレベルの構成は不要です。タスクまたはコードで直接使用できます。
スケジューリングポリシー: [遅延実行時間] パラメーターを使用して、Workflow の実行後に子ノードの実行を遅らせる時間を指定できます。このチュートリアルでは、この設定は行いません。
上部のツールバーで、[保存] をクリックして、ノードを保存します。
4. タスクの実行
データを同期します。
ワークフローツールバーで、[実行] をクリックします。この実行用に各ノードで定義されたパラメーター変数の値を設定します。このチュートリアルでは
20250223を使用しますが、必要に応じて値を変更できます。その後、[OK] をクリックし、実行が完了するまで待ちます。結果をクエリします。
SQL クエリページに移動します。
DataWorks コンソールにログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、 を選択します。 表示されたページで、[DataAnalysis に移動] をクリックします。 表示されたページの左側のナビゲーションウィンドウで、[SQL クエリ] をクリックします。
SQL クエリファイルを構成します。
[マイ ファイル] の横にある
アイコンをクリックして新しいファイルを作成し、SQL クエリファイルの名前を入力します。新しく作成したファイルをクリックしてエディターを開きます。
エディターの右上隅にある
アイコンをクリックして、クエリのワークスペースとその他の設定を構成します。次の表に構成の詳細を示します。パラメーター
説明
ワークスペース
user_profile_analysis_starrocksワークフローが配置されているワークスペースを選択します。データソースタイプ
ドロップダウンリストから
StarRocksを選択します。データソース名
「環境の準備」ステップでバインドした StarRocks 開発環境を選択します。
[OK] をクリックしてクエリデータソースの構成を完了します。
クエリ SQL を編集します。
ノードが正常に実行された後、次のクエリを実行して、データが StarRocks テーブルに書き込まれたことを確認します。
-- Update `your_business_date` with the correct Data Timestamp. For a task run on 20250223, the Data Timestamp is 20250222 (the previous day). SELECT * FROM ods_raw_log_d_starrocks WHERE dt=your_business_date; SELECT * FROM ods_user_info_d_starrocks WHERE dt=your_business_date;
次のステップ
データの同期が完了しました。次に、データの処理と分析の方法を学ぶために、次のチュートリアルに進みます。「データの処理」