このトピックでは、Data Integration のバッチ同期タスクを使用して、MySQL テーブル ods_user_info_d に格納されている基本的なユーザー情報と、Object Storage Service (OSS) オブジェクト user_log.txt に格納されているユーザーの Web サイトアクセスログを、それぞれ StarRocks テーブル ods_user_info_d_starrocks と ods_raw_log_d_starrocks に同期します。 このトピックでは、DataWorks の Data Integration サービスを使用して異種データソース間でデータを同期し、データウェアハウスの同期を完了する方法について説明します。
前提条件
データ同期に必要な環境が準備されています。 詳細については、「環境を準備する」をご参照ください。
目的
この例で提供されているパブリックデータソースのデータを StarRocks に同期して、ワークフロー設計でのデータ同期を完了します。
ソースタイプ | 同期するデータ | ソーステーブルのスキーマ | デスティネーションタイプ | デスティネーションテーブル | デスティネーションテーブルのスキーマ |
MySQL | テーブル: ods_user_info_d 基本的なユーザー情報 |
| StarRocks |
|
|
HttpFile | オブジェクト: user_log.txt ユーザーの Web サイトアクセスログ | ユーザーアクセスレコードは 1 行を占めます。 | StarRocks |
|
|
DataStudio ページに移動する
DataWorks コンソール にログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、 を選択します。 表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
ステップ 1: ワークフローを設計する
ワークフローを設計する
ワークフローを作成します。
開発コンポーネントは、ワークフローに基づいてデータを開発するために使用されます。 ノードを作成する前に、ワークフローを作成する必要があります。 詳細については、「ワークフローを作成する」をご参照ください。
この例では、User profile analysis_StarRocks という名前のワークフローが使用されます。

ワークフローを設計します。
ワークフローが作成されると、ワークフローキャンバスが自動的に表示されます。 ワークフローキャンバスの上部で、[ノードを作成] をクリックし、ノードをワークフローキャンバスにドラッグし、ワークフロー設計 に基づいて、ノード間の依存関係を構成するための線を描画します。

この例では、ゼロロードノードと同期ノードの間にリネージは存在しません。 この場合、ノード間の依存関係は、ワークフローで線を描画することによって構成されます。 依存関係の構成方法の詳細については、「スケジューリング依存関係構成ガイド」をご参照ください。 次の表に、ノードタイプ、命名規則、および各ノードの機能を示します。
ノード分類
ノードタイプ
命名規則
(最終出力テーブルにちなんで命名)
ノード機能
一般
ゼロロードノード

workshop_start_starrocksユーザー プロファイル 分析に使用されるワークフロー全体を管理するために使用されます。 たとえば、ゼロロードノードは、ワークフローの実行開始時刻を決定します。 ワークスペース内のワークフローが複雑な場合、ゼロロードノードを使用すると、ワークフロー内のデータフローのパスがより明確になります。 このノードはドライランノードです。 ノードのコードを編集する必要はありません。
データベース
StarRocks

ddl_ods_user_info_d_starrocksMySQL テーブルの基本的なユーザー情報を受信するための StarRocks テーブル
ods_user_info_d_starrocksを作成するために使用されます。 このデータベースノードは、同期前に作成されます。データベース
StarRocks

ddl_ods_raw_log_d_starrocksOSS オブジェクト内のユーザーの Web サイトアクセスログを受信するための StarRocks テーブル
ods_raw_log_d_starrocksを作成するために使用されます。 このデータベースノードは、同期前に作成されます。Data Integration
オフライン同期

ods_user_info_d_starrocksMySQL に格納されている基本的なユーザー情報を StarRocks テーブル
ods_user_info_d_starrocksに同期するために使用されます。Data Integration
オフライン同期

ods_raw_log_d_starrocksOSS に格納されているユーザーの Web サイトアクセスログを StarRocks テーブル
ods_raw_log_d_starrocksに同期するために使用されます。
スケジューリングロジックを構成する
この例では、ゼロロードノード workshop_start_starrocks を使用して、毎日 00:30 にワークフローの実行をトリガーします。 次の表に、ゼロロードノードのスケジューリングプロパティの構成を示します。 他のノードのスケジューリング構成を変更する必要はありません。 実装ロジックの詳細については、「さまざまなシナリオでワークフロー内のノードのスケジューリング時間を構成する」をご参照ください。 その他のスケジューリング構成の詳細については、「概要」をご参照ください。
構成項目 | スクリーンショット | 説明 |
スケジューリング時間 |
| ゼロロードノードのスケジューリング時間は 00:30 に設定されています。 ゼロロードノードは、毎日 00:30 に現在のワークフローの実行をトリガーします。 |
スケジューリングの依存関係 |
| ゼロロードノード |
DataWorks ワークフロー内のすべてのノードは、別のノードに依存する必要があります。 データ同期フェーズのすべてのノードは、ゼロロードノード workshop_start_starrocks に依存します。 したがって、データ同期ワークフローの実行は、workshop_start_starrocks ノードによってトリガーされます。
ステップ 2: データ同期タスクを構成する
デスティネーション StarRocks テーブルを作成する
データを同期する前に、同期する生データを格納するためのデスティネーション StarRocks テーブルを作成する必要があります。
この例では、デスティネーション StarRocks テーブルは、ソーステーブルスキーマに基づいて生成されます。 詳細については、このトピックの目的 セクションを参照してください。 ワークフローの構成タブで、データベースノード ddl_ods_user_info_d_starrocks と ddl_ods_raw_log_d_starrocks をダブルクリックします。 各ノードの構成タブで、StarRocks テーブルを作成するために使用するステートメントを入力し、
アイコンをクリックします。
ddl_ods_user_info_d_starrocksCREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks ( uid STRING COMMENT 'ユーザーID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座', dt STRING not null COMMENT '時間' ) DUPLICATE KEY(uid) COMMENT 'ユーザー行動分析ケース - 基本的なユーザー情報を格納するテーブル' PARTITION BY(dt) PROPERTIES("replication_num" = "1");ddl_ods_raw_info_d_starrocksCREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks ( col STRING COMMENT 'ログ', dt DATE not null COMMENT '時間' ) DUPLICATE KEY(col) COMMENT 'ユーザー行動分析ケース - ユーザーの Web サイトアクセスログを格納するテーブル' PARTITION BY(dt) PROPERTIES ("replication_num" = "1");
基本的なユーザー情報を同期するためのバッチ同期タスクを構成する
ワークフローの構成タブで、バッチ同期ノード ods_user_info_d_starrocks をダブルクリックして、ノードの構成タブに移動します。 この例で提供されている MySQL テーブル ods_user_info_d から StarRocks テーブル ods_user_info_d_starrocks に基本的なユーザー情報を同期するためのパラメーターを構成します。
ネットワーク接続とリソースグループを構成します。
[ソース]、[リソースグループ]、および [デスティネーション] の構成が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。 次の表に、構成を示します。
パラメーター
説明
ソース
パラメーターを [MySQL] に設定します。
[データソース名] パラメーターを
user_behavior_analysis_mysqlに設定します。
リソースグループ
環境準備 フェーズで購入したサーバーレスリソースグループを選択します。
デスティネーション
パラメーターを [StarRocks] に設定します。
[データソース名] パラメーターを
Doc_StarRocks_Storage_Compute_Tightly_01に設定します。

タスクを構成します。
ソースとデスティネーションを構成します。
項目
パラメーター
説明
ソース
テーブル
MySQL テーブル
ods_user_info_dを選択します。分割キー
読み取るデータの分割キー。 プライマリキーまたはインデックス付き列を分割キーとして使用することをお勧めします。 INTEGER タイプのフィールドのみがサポートされています。
この例では、
uidフィールドが分割キーとして使用されます。デスティネーション
テーブル
StarRocks テーブル
ods_user_info_d_starrocksを選択します。書き込み前に実行されるステートメント
この例では、
dtフィールドに基づいてデータが動的にパーティション化されます。 ノードが再実行されてデータが繰り返し書き込まれるのを防ぐために、次の SQL ステートメントを使用して、各同期前に既存のデスティネーションパーティションを削除します。SQL ステートメント
ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCEでは、${var}はパラメーターです。 スケジューリング構成フェーズで構成したスケジューリングパラメーターが、値として ${var} パラメーターに割り当てられます。 この場合、スケジューリングパラメーターの値は、スケジューリングパラメーターの構成に基づいて、ノードのコードで動的に置き換えられます。 詳細については、スケジューリング構成 を参照してください。StreamLoad リクエストパラメーター
StreamLoad リクエストパラメーター。JSON 形式である必要があります。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
フィールドマッピングを構成します。
ソースフィールドとデスティネーションフィールド間のマッピングを構成します。 スケジューリングパラメーターをノードコード内の変数に値として割り当てることができます。 これにより、StarRocks テーブルのパーティションフィールドの値を動的に置き換えることができます。 この場合、毎日のデータは、StarRocks テーブルのデータタイムスタンプベースのパーティションに書き込むことができます。
[同名のフィールドをマップ] をクリックします。ソース MySQL テーブルのフィールドは、宛先テーブル内の同じ名前のフィールドに自動的にマップされます。この場合、ソースフィールドのデータは、同じ名前の宛先フィールドに自動的に書き込まれます。
クリック: [追加]。 次に
'${var}'を入力し、このフィールドを StarRocks テーブルの dt フィールドに手動でマッピングします。
スケジューリングプロパティを構成します。
ノードの構成タブで、右側のナビゲーションウィンドウの [プロパティ] をクリックします。 詳細については、「ノードのスケジューリングプロパティ」をご参照ください。 次の表に、構成を示します。
セクション
説明
スクリーンショット
スケジューリングパラメーター
このセクションで、[パラメーターを追加] をクリックして、スケジューリングパラメーターを追加します。
[パラメーター名] パラメーターを var に設定します。
[パラメーター値] パラメーターを $[yyyymmdd-1] に設定します。

依存関係
このセクションで、出力テーブルが現在のノードの出力名として使用されていることを確認します。
出力テーブルには、
worksspacename.tablename形式で名前が付けられます。
ユーザーの Web サイトアクセスログを同期するためのバッチ同期タスクを構成する
ワークフローの構成タブで、バッチ同期ノード ods_raw_log_d_starrocks をダブルクリックして、ノードの構成タブに移動します。 パブリック HttpFile データソースの user_log.txt ファイルから StarRocks テーブル ods_raw_log_d_starrocks にユーザーの Web サイトアクセスログを同期するためのパラメーターを構成します。
ネットワーク接続とリソースグループを構成します。
[ソース]、[リソースグループ]、および [デスティネーション] の構成が完了したら、[次へ] をクリックし、プロンプトに従って接続テストを完了します。 次の表に、構成を示します。
パラメーター
説明
ソース
パラメーターを [HttpFile] に設定します。
[データソース名] パラメーターを
user_behavior_analysis_HttpFileに設定します。
リソースグループ
環境準備 フェーズで購入したサーバーレスリソースグループを選択します。
デスティネーション
パラメーターを [StarRocks] に設定します。
[データソース名] パラメーターを
Doc_StarRocks_Storage_Compute_Tightly_01に設定します。

タスクを構成します。
ソースとデスティネーションを構成します。
項目
パラメーター
説明
ソース
ファイルパス
/user_log.txtファイルタイプ
text列デリミタ
|いいえ
データソースの構成が完了したら、[データ構造の確認] をクリックします。
デスティネーション
テーブル
ods_raw_log_d_starrocks書き込み前に実行されるステートメント
この例では、
dtフィールドに基づいてデータが動的にパーティション化されます。 ノードが再実行されてデータが繰り返し書き込まれるのを防ぐために、次の SQL ステートメントを使用して、各同期前に既存のデスティネーションパーティションを削除します。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCESQL ステートメントでは、
${var}はパラメーターです。 スケジューリング構成フェーズで構成したスケジューリングパラメーターが、値として ${var} パラメーターに割り当てられます。 この場合、スケジューリングパラメーターの値は、スケジューリングパラメーターの構成に基づいて、ノードのコードで動的に置き換えられます。StreamLoad リクエストパラメーター
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
フィールドマッピングを構成します。
ノードの構成タブの上部にあるツールバーで、
アイコンをクリックして、ノード構成モードを [コードレス UI] から [スクリプトモード] に変更します。 HttpFile ファイルのフィールドと StarRocks テーブルのフィールド間のマッピングを構成し、StarRocks テーブルの dt パーティションフィールドの値を動的に置き換えることができるようにします。HttpFile ファイルの列の追加構成:
{ "type": "STRING", "value": "${var}" }ods_raw_log_d_starrocksノードのサンプルスクリプト全体:{ /* "type": "job", // タスクタイプ "version": "2.0", // バージョン "steps": [ //ステップ { "stepType": "httpfile", //ステップタイプ、httpfileリーダー "parameter": { // パラメーター "fileName": "/user_log.txt", // ファイル名 "nullFormat": "", //null値の形式 "compress": "", // 圧縮タイプ "requestMethod": "GET", // リクエストメソッド "connectTimeoutSeconds": 60, // 接続タイムアウト(秒) "column": [ // 列 { "index": 0, // インデックス "type": "STRING" // タイプ }, { "type": "STRING", // タイプ "value": "${var}" // 値 } ], "skipHeader": "false", // ヘッダーをスキップするかどうか "encoding": "UTF-8", // エンコーディング "fieldDelimiter": "|", // フィールドデリミタ "fieldDelimiterOrigin": "|", // 元のフィールドデリミタ "socketTimeoutSeconds": 3600, // ソケットタイムアウト(秒) "envType": 0, // 環境タイプ "datasource": "user_behavior_analysis", // データソース名 "bufferByteSizeInKB": 1024, // バッファサイズ(KB) "fileFormat": "text" // ファイル形式 }, "name": "Reader", // リーダー名 "category": "reader" // カテゴリ }, { "stepType": "starrocks", // ステップタイプ、starrocksライター "parameter": { // パラメーター "loadProps": { // ロードプロパティ "row_delimiter": "\\x02", // 行デリミタ "column_separator": "\\x01" // 列デリミタ }, "envType": 0, // 環境タイプ "datasource": "Doc_StarRocks_Storage_Compute_Tightly_01", // データソース名 "column": [ // 列 "col", "dt" ], "tableComment": "", // テーブルコメント "table": "ods_raw_log_d_starrocks", // テーブル名 "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE ; " // 書き込み前に実行される SQL ステートメント }, "name": "Writer", // ライター名 "category": "writer" // カテゴリ }, { "copies": 1, // コピー数 "parameter": { // パラメーター "nodes": [], // ノード "edges": [], // エッジ "groups": [], // グループ "version": "2.0" // バージョン }, "name": "Processor", // プロセッサ名 "category": "processor" // カテゴリ } ], "setting": { // 設定 "errorLimit": { // エラー制限 "record": "0" // レコード数 }, "locale": "zh", // ロケール "speed": { // 速度 "throttle": false, // スロットル "concurrent": 2 // 並列数 } }, "order": { // 順序 "hops": [ // ホップ { "from": "Reader", // ソース "to": "Writer" // デスティネーション } ] } } */スケジューリングプロパティを構成します。
ノードの構成タブで、右側のナビゲーションウィンドウの [プロパティ] をクリックします。 [プロパティ] タブで、ノードのスケジューリングプロパティと基本情報を構成します。 次の表に、構成を示します。
セクション
説明
スクリーンショット
スケジューリングパラメーター
このセクションで、[パラメーターを追加] をクリックして、スケジューリングパラメーターを追加します。
[パラメーター名] パラメーターを var に設定します。
[パラメーター値] パラメーターを $[yyyymmdd-1] に設定します。

依存関係
このセクションで、出力テーブルが現在のノードの出力名として使用されていることを確認します。
出力テーブルには、
worksspacename.tablename形式で名前が付けられます。
ステップ 3: 同期されたデータを確認する
ワークフローを実行する
ワークフローの構成タブに移動します。
[ビジネスフロー] の下の User profile analysis_StarRocks ワークフローをダブルクリックします。 ワークフローキャンバスが表示されます。

ワークフローを実行します。
ワークフローキャンバスの上部にあるツールバーで、
アイコンをクリックして、依存関係に基づいてデータ統合フェーズでワークフローを実行します。ノードのステータスを表示します。
ノードが
状態の場合、同期プロセスは正常です。ノード実行ログを表示します。
キャンバスの
ods_user_info_d_starrocksノードまたはods_raw_log_d_starrocksノードを右クリックし、[ログの表示] を選択して、同期の詳細を表示します。
同期結果を表示する
アドホッククエリを作成します。
[DataStudio] ページの左側のナビゲーションウィンドウで、 [アドホッククエリ] アイコンをクリックします。 [アドホッククエリ] ウィンドウが表示されます。 [アドホッククエリ] を右クリックし、 を選択します。

同期結果テーブルをクエリします。
-- クエリステートメントで、パーティションキーの値をノードのデータタイムスタンプに変更します。 たとえば、ノードが 2024 年 1 月 2 日に実行された場合、データタイムスタンプは 20240101 で、これはノードの実行時刻の 1 日前です。 SELECT * from ods_raw_log_d_starrocks where dt=データタイムスタンプ; SELECT * from ods_user_info_d_starrocks where dt=データタイムスタンプ;
次のステップ
データ同期は完了です。 次のチュートリアルに進むことができます。 次のチュートリアルでは、StarRocks で基本的なユーザー情報とユーザーの Web サイトアクセスログを処理する方法について学習します。 詳細については、「データを処理する」をご参照ください。

