本ページでは、LogHub で取得したデータを、Data Integration を使って関連サービス (MaxCompute、Object Storage Server (OSS)、Table Store、リレーショナルデータベース管理システム (RDBMS)、DataHub など) へ送る方法について説明します。 本ページでは MaxCompute を例として使用します。

この機能は、中国 (北京)、中国 (上海)、中国 (香港)、米国 (シリコンバレー)、シンガポール、ドイツ (フランクフルト)、オーストラリア (シドニー)、マレーシア (クアラルンプール)、日本 (東京)、インド (ムンバイ) のリージョンで利用可能です。

シナリオ

  • 異なるリージョン間で異なるタイプのデータソース (LogHub、MaxCompute データソースなど) を同期します。
  • 異なる Alibaba Cloud アカウント間で異なるタイプのデータソース (LogHub、MaxCompute データソースなど) を同期します。
  • 1 つの Alibaba Cloud アカウントを使って異なるタイプのデータソース (LogHub、MaxCompute データソースなど) を同期します。
  • パブリッククラウドアカウントと Alibaba 金融クラウドアカウントを使って、異なるタイプのデータソース (LogHub、MaxCompute データソースなど) を同期します。

クロスアカウントのデータ同期に関する注意

アカウント B を使って Data Integration タスクを作成し、アカウント A の LogHub データを、アカウント B の MaxCompute データソースへ同期する場合

  1. アカウント A のアクセス ID とアクセスキーを使って LogHub データソースを作成します。

    アカウント B には、アカウント A で作成された Log Service プロジェクトへアクセスする権利が付与されています。

  2. RAM ユーザー A1 のアクセス ID とアクセスキーを使って、LogHub データソースを作成します。
    • Alibaba Cloud アカウント A を使って、あらかじめ定義した Log Service 権限 (AliyunLogFullAccess および AliyunLogReadOnlyAccess)を RAM ユーザー A1 へ付与します。 詳細については、「RAM サブアカウントへの Log Service アクセス権限の付与」をご参照ください。
    • Alibaba Cloud アカウント A を使って、カスタムの Log Service 権限を RAM ユーザー A1 へ付与します。

      [RAM console] > [Policies] を選択し、[Custom Policy] > [Create Authorization Policy] > [Blank Template] の順に選択します。

      権限の付与についての詳細は、「RAM アクセス制限」および「RAM サブアカウントアクセス」をご参照ください。

      次のポリシーを RAM ユーザー A1 へ適用する場合、アカウント B は RAM ユーザー A1 としてのみ、Log Service にある project_name1 と project_name2 データを読み取ることができます。
      {
      "Version": "1",
      "Statement": [
      {
      "Action": [
      "log:Get*",
      "log:List*",
      "log:CreateConsumerGroup",
      "log:UpdateConsumerGroup",
      "log:DeleteConsumerGroup",
      "log:ListConsumerGroup",
      "log:ConsumerGroupUpdateCheckPoint",
      "log:ConsumerGroupHeartBeat",
      "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
      "acs:log:*:*:project/project_name1",
      "acs:log:*:*:project/project_name1/*",
      "acs:log:*:*:project/project_name2",
      "acs:log:*:*:project/project_name2/*"
      ],
      "Effect": "Allow"
      }
      ]
      }

データソースの追加

  1. アカウント B またはそのアカウントの RAM ユーザーを使って、DataWorks コンソール に開発者としてログインします。プロジェクトを探し、[Data Integration] をクリックします。
  2. [Sync Resources] > [Data Source] の順に選択し、右上隅で、[Add Data Source]をクリックします。
  3. データソースタイプには 「LogHub」 を選択し、[Add Data Source LogHub] ダイアログボックスでデータソースを設定します。
    設定項目 説明
    Data Source Name 文字、数字、アンダースコア (_) を使用できます。 文字から始め、60 文字以内で指定します。
    Description データソースの説明です。80 文字を超えることはできません。
    LogHub Endpoint LogHub データソースのエンドポイントの形式は、http://example.com です。
    Project

    詳細については、「サービスエンドポイント」をご参照ください。

    Access Id、Access Key ログイン認証情報です。アカウント名とパスワードと同様です。 Alibaba Cloud アカウントまたは RAM ユーザーアカウントの Access Id および Access Key を入力します。
  4. [Test Connectivity] をクリックします。
  5. 接続テストが正常に完了したら、[OK]をクリックします。

ウィザードモードでの同期タスクの設定

  1. [Business Flow] > [Data Integration] の順に選択してから、左上隅の [Create Integration Node] をクリックします。
  2. [Create Node] ダイアログボックスで構成を設定し、[Submit] をクリックします。 データ同期タスクの設定ページが表示されます。
  3. ソースを選択します。
    設定項目 説明
    Data source [LogHub] を選択し、LogHub データソース名を入力します。
    Logstore 増分データがエクスポートされるテーブル名 テーブル作成時、または作成後 UpdateTable 操作を行う場合は、テーブルの Stream 機能を有効にする必要があります。
    Start Time ログ時間でログエントリをフィルターするための時間の範囲の開始時間 (含む)。 形式は、 yyyyMMddHHmmss です。 たとえば、20180111013000 です。 これらパラメーターは DataWorks タスクのスケジューリング時間に対応しています。
    End Time ログ時間でログエントリをフィルターするための時間範囲の終了時間 (含まない)。 形式は、 yyyyMMddHHmmss です。 たとえば、20180111013000 です。 これらパラメーターは DataWorks タスクのスケジューリング時間に対応しています。
    Number of Records Read Per Batch 毎回読み取りを行うデータエントリの数 デフォルト値は 2 です。
    [Data preview] ボタンをクリックしてデータをプレビューします。
    Data Preview では、少量の LogHub データエントリをプレビューボックスで表示することができます。表示されるデータは同期するデータとは異なる場合があります。 同期するデータは、[Start Time] と [End Time] で判断されます。
  4. 同期先を選択します。
    MaxCompute 同期先を選択し、テーブルを選択します。 この例では、ok テーブルを選択します。
    設定項目 説明
    Data Source ODPS を選択し、同期先の名前を選択します。
    Table 同期させるテーブルを選択します。
    Partition information 同期するテーブルは、非パーティションテーブルです。 したがって、パーティション情報は表示されません。
    Clearance Rule
    • Clear Existing Data Before Writing (Insert Overwrite): インポートする前に、テーブルまたはパーティション内のすべてのデータがクリーンアップされます。
    • Retain Existing Data (Insert Into): データインポートを行う前にデータのクリーンアップは行われません。 新しいデータが常に添付され実行されます。
    Compression デフォルト値は 「false」 です。
    Consider Empty String as Null デフォルト値は 「90」 です。
  5. フィールドマッピングを設定します。
    ソースと同期先テーブルのフィールドをマッピングします。 ソーステーブル (左側) のフィールドは、同期先のテーブルのフィールドと 1 対 1 で対応しています。 [Enable Same Line Mapping] を選択します。
  6. チャンネル制御ポリシーを設定します。
    最大転送速度とダーティデータのチェックルールを設定します。
    設定項目 説明
    DMU Data Integration の課金単位
    DMU 値は最大同時実行数を制限します。 適切な DMU 値になっていることを確認します。
    Number of Concurrent Jobs 同時同期を設定した場合、データレコードは指定した読み取り分割キーに基づいていくつかのタスクに分割されます。 これらのタスクは同時に実行され、転送速度が向上します。
    Transmission Rate 転送速度を設定することで、過剰な読み取りアクティビティや高負荷からソースデータベースを守ることができます。 転送速度を調整し、ソースデータベースの設定にあった適切な転送速度の設定を推奨します。
    If there are more than ダーティデータエントリ数 たとえば、ソースの Varchar 型のデータが Int 型の同期先カラムへ書き込まれる場合、データ変換例外が発生し、同期先カラムへデータは書き込まれません。 ダーティデータエントリの上限数を設定し、同期データの質を制御します。 ビジネス要件にあわせた適切な上限数を設定します。
    Task's Resource Group 同期タスクを実行する際に使用されるリソースグループ デフォルトでは、タスクはデフォルトリソースグループで実行されます。 プロジェクトのリソースが不十分な場合、カスタムリソースグループを追加し、そのカスタムリソースグループを使って同期タスクを実行することができます。 カスタムリソースグループの追加方法については、「スケジューリングリソースの追加」 をご参照ください。

    データソースネットワークの状態、プロジェクトリソース、ビジネスにおける重要性にあわせて、適切なリソースグループを選択します。

  7. タスクを実行します。

    次の方法のいずれかを使ってタスクを実行します。

    • 直接タスクを実行する (1 回実行)

      ツールバーで [Run] をクリックし、タスクを実行します。 特定のパラメーターを設定すると、[DataStudio] ページから直接タスクを実行することができます。

    • タスクをスケジュールする

      [Submit] をクリックし、同期タスクをスケジューリングシステムへ送信します。 スケジューリングシステムは、タスク設定に基づいて次の日から定期的にタスクを実行します。

スクリプトモードでの同期タスクの設定

スクリプトモードでタスクを設定するには、ツールバーで [Switch to Script Mode] をクリックし、[OK] をクリックします。

スクリプトモードでは、構成を設定することができます。 コード例は次のとおりです。

{
"type": "job",
"version": "1.0",
"Configuration ":{
"reader": {
"plugin": "loghub",
"parameter": {
"datasource": "loghub_lzz",//Data source name. Use the name of the data resource that you have added.
"logstore": "logstore-ut2",//Source Logstore name. A Logstore is a log data collection, storage, and query unit in LogHub.
"beginDateTime": "${startTime}",//Start (included) time for filtering log entries by log time.
"endDateTime": "${endTime}",//End (included) time for filtering log entries by log time.
"batchSize": 256,//The number of data entries that are read each time. The default value is 256.
"splitPk": "",
"column": [
"key1",
"key2",
"key3"
]
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "odps_first",//Data source name. Use the name of the data resource that you have added.
"table": "ok",//Destination table name
"truncate": true,
"partition": "",//Partition information
"column": [//Destination column name
"key1",
"key2",
"key3"
]
}
},
"Setting ":{
"Speed ":{
"mbps": 8,//Maximum transmission rate
"concurrent": 7//Number of concurrent jobs
}
}
}
}