このトピックでは、DataWorks Data Integration を使用して LogHub データを MaxCompute に同期する方法について説明します。
背景情報
次のシナリオで、LogHub データソースから宛先にデータを同期できます。
リージョンをまたいで、LogHub データソースから MaxCompute データソースなどのデータソースにデータを同期します。
Alibaba Cloud アカウントをまたいで、LogHub データソースから MaxCompute データソースなどのデータソースにデータを同期します。
同じ Alibaba Cloud アカウント内で、LogHub データソースから MaxCompute データソースなどのデータソースにデータを同期します。
Alibaba Cloud パブリッククラウドと Alibaba Finance Cloud の間で、LogHub データソースから MaxCompute データソースなどのデータソースにデータを同期します。
Alibaba Cloud アカウント A と B がある場合、アカウント B を使用して Data Integration で同期タスクを作成できます。次に、同期タスクを使用して、アカウント A 内の LogHub データをアカウント B 内の MaxCompute データソースに同期できます。以下の説明で詳細を説明します。
アカウント A の AccessKey ID と AccessKey シークレットを使用して、LogHub データソースを追加します。
アカウント B を使用して、アカウント A を使用して作成されたすべての Simple Log Service プロジェクトのデータを同期できます。
アカウント A 内の RAM ユーザー A1 の AccessKey ID と AccessKey シークレットを使用して、LogHub データソースを追加します。
アカウント A を使用して、
AliyunLogFullAccessおよびAliyunLogReadOnlyAccessシステムポリシーを RAM ユーザー A1 にアタッチし、Simple Log Service の一般的な権限を RAM ユーザー A1 に付与します。詳細については、「RAM ユーザーを作成し、RAM ユーザーに Simple Log Service へのアクセスを承認する」をご参照ください。説明AliyunLogFullAccessおよびAliyunLogReadOnlyAccessシステムポリシーが RAM ユーザー A1 にアタッチされた後、RAM ユーザー A1 を使用して、アカウント A を使用して作成されたすべての Simple Log Service プロジェクトにアクセスできます。アカウント A を使用して、Simple Log Service のカスタム権限を RAM ユーザー A1 に付与します。
アカウント A を使用して [RAM コンソール] にログインします。左側のナビゲーションウィンドウで、 を選択します。[ポリシー] ページで、[ポリシーの作成] をクリックします。
RAM および RAM ユーザーに Simple Log Service のカスタム権限を付与する方法の詳細については、「RAM の概要」および「概要」をご参照ください。
次のポリシーが RAM ユーザー A1 にアタッチされている場合、アカウント B を使用して、RAM ユーザー A1 が Simple Log Service で権限を持つ project_name1 と project_name2 のデータのみを同期できます。
{ "Version": "1", "Statement": [ { "Action": [ "log:Get*", "log:List*", "log:CreateConsumerGroup", "log:UpdateConsumerGroup", "log:DeleteConsumerGroup", "log:ListConsumerGroup", "log:ConsumerGroupUpdateCheckPoint", "log:ConsumerGroupHeartBeat", "log:GetConsumerGroupCheckPoint" ], "Resource": [ "acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*", "acs:log:*:*:project/project_name2", "acs:log:*:*:project/project_name2/*" ], "Effect": "Allow" } ] }
LogHub データソースの追加
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration に移動] をクリックします。
Data Integration ページの左側のナビゲーションウィンドウで、[データソース] をクリックします。
[データソース] ページで、[データソースの追加] をクリックします。
[データソースの追加] ダイアログボックスで、[LogHub] をクリックします。
[LogHub データソースの追加] ダイアログボックスで、パラメーターを設定します。
パラメーター
説明
データソース名
データソースの名前。名前には、文字、数字、アンダースコア (_) のみを含めることができ、文字で始まる必要があります。
データソースの説明
データソースの説明。説明は 80 文字以内で入力してください。
LogHub エンドポイント
Simple Log Service プロジェクトへのアクセスに使用される URL。URL は
http://example.com形式である必要があります。example.com は、Simple Log Service プロジェクトのエンドポイントを示します。詳細については、「エンドポイント」をご参照ください。プロジェクト
Simple Log Service プロジェクトの名前。
AccessKey ID
Simple Log Service プロジェクトへの接続に使用する Alibaba Cloud アカウントの AccessKey ID。[AccessKey ペア] ページで AccessKey ID をコピーできます。
AccessKey シークレット
Simple Log Service プロジェクトへの接続に使用する Alibaba Cloud アカウントの AccessKey シークレット。
使用するリソースグループを見つけ、[接続ステータス (本番環境)] 列の [ネットワーク接続のテスト] をクリックします。
接続テストが成功したら、[完了] をクリックします。
バッチ同期タスクの作成
[データソース] ページで、左上隅の
アイコンをクリックし、 を選択します。[DataStudio] ページで、ポインターを
アイコンの上に移動し、[ワークフローの作成] を選択します。[ワークフローの作成] ダイアログボックスで、[ワークフロー名] と [説明] パラメーターを設定し、[作成] をクリックします。
[Scheduled Workflow] ペインで作成したワークフローの名前をクリックし、[Data Integration] を右クリックして、 を選択します。
[ノードの作成] ダイアログボックスで、[名前] パラメーターを設定し、[パス] ドロップダウンリストからパスを選択します。
[確認] をクリックして、バッチ同期タスクの設定タブに移動します。
コードレス UI を使用した同期タスクの設定
[ネットワーク接続とリソースグループの設定] ステップで、使用するソースと宛先を選択します。

パラメーター
説明
ソース
[LogHub] を選択します。
データソース名
追加した LogHub データソースを選択します。
リソースグループ
使用する Data Integration 専用リソースグループを選択します。
宛先
MaxCompute を選択します。
データソース名
追加した MaxCompute データソースを選択します。
Data Integration 専用リソースグループとソースおよび宛先との間のネットワーク接続をテストします。ネットワーク接続テストが成功したら、[次へ] をクリックします。
ソース Logstore や宛先テーブルなどの情報を設定します。
次の表に、ソースのパラメーターを示します。
パラメーター
説明
Logstore
データの読み取り元となる Logstore の名前。
ログ開始時刻
データ消費の開始時刻。このパラメーターは、yyyyMMddHHmmss 形式の時間範囲の左境界 (左閉右開) を定義します。例: 20180111013000。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。
ログ終了時刻
データ消費の終了時刻。このパラメーターは、yyyyMMddHHmmss 形式の時間範囲の右境界 (左閉右開) を定義します。例: 20180111013010。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。
バッチ数
一度に読み取るデータレコードの数。デフォルト値: 256。
説明[データプレビュー] をクリックしてデータをプレビューできます。少数の LogHub データレコードのみが表示されます。表示されるデータレコードは、指定した開始時刻と終了時刻により、実際に同期されるデータとは異なる場合があります。
[フィールドマッピング] セクションで、ソースフィールドと宛先フィールド間のマッピングを設定します。
[チャネルコントロール] セクションで、[同期レート] や [ダーティデータレコードのポリシー] などのパラメーターを設定します。
同期タスクの設定タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、再実行プロパティ、スケジューリング用のリソースグループ、同期タスクの祖先タスクなどの設定を行います。
説明同期タスクの祖先タスクを設定するときは、[ルートノードの追加] を選択します。
上記の設定が正しいことを確認し、上部のツールバーの
アイコンをクリックします。バッチ同期タスクを実行します。
次のいずれかの方法でバッチ同期タスクを実行できます。
タスクを 1 回だけ実行します。
上部のツールバーの
アイコンをクリックして、設定タブでタスクを実行します。説明タスクを実行する前に、タスクに設定したカスタムパラメーターの値を指定する必要があります。
スケジューリングシステムを有効にして、スケジューリングプロパティに基づいてタスクを実行します。
タスクの設定タブの右側のナビゲーションウィンドウで [プロパティ] タブをクリックし、タスクのスケジューリングサイクルなどの時間プロパティを設定します。
次に、上部のツールバーの
アイコンと
アイコンを順にクリックして、タスクをスケジューリングシステムにコミットします。スケジューリングシステムは、タスクに設定されたプロパティに基づいて、翌日から定期的にタスクを実行します。
コードエディタを使用したバッチ同期タスクの設定
バッチ同期タスクの設定タブで、上部のツールバーの [変換スクリプト] アイコンをクリックします。

[ヒント] メッセージで、[OK] をクリックしてコードエディタに切り替えます。
上部のツールバーの [テンプレートのインポート] アイコンをクリックします。

[テンプレートのインポート] ダイアログボックスで、[ソースタイプ]、[データソース]、[ターゲットタイプ]、および [データソース] パラメーターを設定し、[確認] をクリックしてテンプレートを適用します。
ビジネス要件に基づいてコードエディタのコードを変更します。サンプルコード:
{ "type": "job", "version": "1.0", "configuration": { "reader": { "plugin": "loghub", "parameter": { "datasource": "loghub_lzz",// データの読み取り元となる LogHub データソースの名前。追加したデータソースの名前と同じである必要があります。 "logstore": "logstore-ut2",// データの読み取り元となる Logstore の名前。Logstore は、ログデータを収集、保存、クエリするための Simple Log Service ユニットです。 "beginDateTime": "${startTime}",// データ消費の開始時刻。このパラメーターは、時間範囲の左境界 (左閉、右開) を定義します。 "endDateTime": "${endTime}",// データ消費の終了時刻。このパラメーターは、時間範囲の右境界 (左閉、右開) を定義します。 "batchSize": 256,// 一度に読み取るデータレコードの数。デフォルト値: 256。 "splitPk": "", "column": [ "key1", "key2", "key3" ] } }, "writer": { "plugin": "odps", "parameter": { "datasource": "odps_source",// データの書き込み先となるデータソースの名前。追加したデータソースの名前と同じである必要があります。 "table": "test",// データの書き込み先となるテーブルの名前。 "truncate": true, "partition": "",// 宛先テーブルのパーティション情報。 "column": [// データの書き込み先となる列の名前。 "key1", "key2", "key3" ] } }, "setting": { "speed": { "mbps": 8,// 最大転送レート。単位: MB/s。 "concurrent": 7// 最大並列スレッド数。 } } } }