すべてのプロダクト
Search
ドキュメントセンター

DataWorks:LogHub (SLS)

最終更新日:Apr 02, 2026

LogHub (SLS) データソースを使用すると、Simple Log Service (SLS) からデータを読み取ったり、SLS へデータを書き込んだりできます。本トピックでは、DataWorks が LogHub (SLS) とのデータ同期をどのようにサポートしているかについて説明します。

機能

DataWorks は、LogHub (SLS) とのデータ同期に関して以下のシナリオをサポートしています。

  • LogHub (SLS) と MaxCompute などのデータソース間で、異なるリージョンにわたるデータ同期を行います。

  • LogHub (SLS) と MaxCompute などのデータソース間で、異なる Alibaba Cloud アカウントにわたるデータ同期を行います。

  • LogHub (SLS) と MaxCompute などのデータソース間で、同一の Alibaba Cloud アカウント内でのデータ同期を行います。

  • LogHub (SLS) と MaxCompute などのデータソース間で、パブリッククラウドアカウントと金融クラウドアカウントにわたるデータ同期を行います。

制限事項

Data Integration が LogHub (SLS) に対してオフライン書き込みを行う場合、フェールオーバー後にタスクを再実行すると、データ重複が発生する可能性があります。これは、LogHub (SLS) が冪等性を持たないためです。

サポートされるフィールドの型

Data Integration は、以下の LogHub (SLS) のフィールドの型の読み取りおよび書き込みをサポートしています。

フィールドの型

オフライン読み取り(LogHub (SLS) Reader)

オフライン書き込み(LogHub (SLS) Writer)

リアルタイム読み取り

STRING

サポート済み

サポート済み

サポート済み

詳細情報:

  • LogHub (SLS) へのオフライン書き込み時

    すべてのサポート対象データ型は、LogHub (SLS) へ書き込む前に STRING 型に変換されます。以下の表に、LogHub (SLS) Writer におけるデータ型の変換関係を示します。

    サポート対象の Data Integration 内部型

    LogHub (SLS) への書き込み時のデータ型

    LONG

    STRING

    DOUBLE

    STRING

    STRING

    STRING

    DATE

    STRING

    BOOLEAN

    STRING

    BYTES

    STRING

  • LogHub (SLS) からのリアルタイム読み取り時

    以下のメタデータフィールドがデフォルトで含まれます。

    LogHub (SLS) リアルタイム同期フィールド

    データ型

    説明

    __time__

    STRING

    SLS 予約フィールド:__time__。ログデータの書き込み時に指定されたログ時刻です。UNIX タイムスタンプ(秒単位)です。

    __source__

    STRING

    SLS 予約フィールド:__source__。ログの送信元デバイスです。

    __topic__

    STRING

    SLS 予約フィールド:__topic__。トピック名です。

    __tag__:__receive_time__

    STRING

    ログがサーバーに到着した時刻です。パブリック IP アドレスの記録機能を有効化している場合、サーバーは受信時にこのフィールドを生ログに追加します。UNIX タイムスタンプ(秒単位)です。

    __tag__:__client_ip__

    STRING

    ログ送信元デバイスのパブリック IP アドレスです。パブリック IP アドレスの記録機能を有効化している場合、サーバーは受信時にこのフィールドを生ログに追加します。

    __tag__:__path__

    STRING

    Logtail によって収集されたログファイルのパスです。Logtail が自動的にこのフィールドをログに追加します。

    __tag__:__hostname__

    STRING

    Logtail がデータを収集するマシンのホスト名です。Logtail が自動的にこのフィールドをログに追加します。

データソースの作成

データソースの構成

データ同期タスクを開発する前に、DataWorks でデータソースを作成する必要があります。詳細については、「データソースの管理」をご参照ください。各パラメーターの詳細な説明については、構成ページのツールチップをご確認ください。

クロスアカウントデータソースの作成

本例では、Alibaba Cloud アカウント A の LogHub (SLS) インスタンスから、Alibaba Cloud アカウント B の MaxCompute インスタンスへデータを同期する方法を示します。このタスクは、アカウント B の Data Integration サービスで構成されます。

  1. Alibaba Cloud アカウント A の AccessKey ID および AccessKey Secret を使用して、LogHub (SLS) データソースを作成します。

    これにより、アカウント B はアカウント A のすべての Simple Log Service プロジェクトからデータを同期できます。

  2. Alibaba Cloud アカウント A の RAM ユーザー(例:A1)の AccessKey ID および AccessKey Secret を使用して、LogHub (SLS) データソースを作成します。

    • アカウント A は、RAM ユーザー A1 に Simple Log Service の一般的な権限(AliyunLogFullAccess および AliyunLogReadOnlyAccess)を付与します。詳細については、「RAM ユーザーの作成と権限付与」をご参照ください。

      説明

      AliyunLogFullAccess および AliyunLogReadOnlyAccess のシステムポリシーを RAM アカウントに付与すると、その RAM アカウントは、親アカウント下のすべての Simple Log Service リソースを照会できます。

    • アカウント A は、RAM ユーザー A1 に Simple Log Service のカスタム権限を付与します。

      アカウント A で RAM コンソール にログインし、「権限ポリシー」ページに移動して、ポリシーの作成 をクリックします。

      権限付与の詳細については、「はじめに」および「概要」をご参照ください。

      以下のポリシーを使用して権限を付与した後、アカウント B は RAM ユーザー A1 を使用して、Simple Log Serviceプロジェクト project_name1 および project_name2 のみからデータを同期できます。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "log:Get*",
                      "log:List*",
                      "log:CreateConsumerGroup",
                      "log:UpdateConsumerGroup",
                      "log:DeleteConsumerGroup",
                      "log:ListConsumerGroup",
                      "log:ConsumerGroupUpdateCheckPoint",
                      "log:ConsumerGroupHeartBeat",
                      "log:GetConsumerGroupCheckPoint"
                  ],
                  "Resource": [
                      "acs:log:*:*:project/project_name1",
                      "acs:log:*:*:project/project_name1/*",
                      "acs:log:*:*:project/project_name2",
                      "acs:log:*:*:project/project_name2/*"
                  ],
                  "Effect": "Allow"
              }
          ]
      }

データ同期タスクの開発

同期タスクの設定入口および手順については、以下の構成ガイドをご参照ください。

説明

LogHub をソースとして使用する場合、LogHub クエリ構文または構造化プロセス言語(SPL)ステートメントを使用してデータをフィルター処理できます。構文の詳細については、「付録 2:フィルター処理用 SPL 構文」をご参照ください。

単一テーブルオフライン同期タスクの構成

単一テーブルリアルタイム同期タスクの構成

詳細については、「単一テーブルリアルタイム同期タスクの構成」をご参照ください。

全体データベースリアルタイム同期タスクの構成

詳細については、「全体データベースリアルタイム同期タスクの構成」をご参照ください。

よくある質問

Data Integration のその他のよくある質問については、「Data Integration よくある質問」をご参照ください。

付録 1:スクリプト例およびパラメーター説明

コードエディタを使用したバッチ同期タスクの構成

コードエディタを使用してバッチ同期タスクを構成する場合、統一スクリプト形式の要件に基づいて、関連するパラメーターをスクリプトで構成する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプト例

{
 "type":"job",
 "version":"2.0",// バージョン番号。
 "steps":[
     {
         "stepType":"LogHub",// プラグイン名。
         "parameter":{
             "datasource":"",// データソース。
             "column":[// フィールド。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", // トピック。
                 "C_HostName", // ホスト名。
                 "C_Path", // パス。
                 "C_LogTime" // イベント時刻。
             ],
             "beginDateTime":"",// データ消費の開始時刻。
             "batchSize":"",// Simple Log Service から一度にクエリするデータ件数。
             "endDateTime":"",// データ消費の終了時刻。
             "fieldDelimiter":",",// カラム区切り文字。
             "logstore":""// 送信先 Logstore の名前。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"// エラー件数。
     },
     "speed":{
         "throttle":true,// throttle が false の場合、mbps パラメーターは無効となり、データ転送速度は制限されません。throttle が true の場合、データ転送速度が制限されます。
            "concurrent":1, // 同時実行ジョブ数。
            "mbps":"12"// 最大データ転送速度。1 mbps = 1 MB/s。
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}

Reader スクリプトパラメーター

パラメーター

説明

必須

デフォルト値

endPoint

Simple Log Service のエンドポイントです。エンドポイントは、プロジェクトおよびそのログデータにアクセスするための URL です。エンドポイントは、プロジェクトが配置されている Alibaba Cloud リージョンおよびプロジェクト名に関連付けられます。各リージョンのエンドポイントについては、「エンドポイント」をご参照ください。

はい

なし

accessId

Simple Log Service にアクセスするために使用する AccessKey ID です。ユーザーを識別します。

はい

なし

accessKey

Simple Log Service にアクセスするために使用する AccessKey Secret です。ユーザーを認証します。

はい

なし

project

送信先 Simple Log Service プロジェクトの名前です。プロジェクトは、Simple Log Service におけるリソース管理単位であり、リソースの分離および制御に使用されます。

はい

なし

logstore

送信先 Logstore の名前です。Logstore は、Simple Log Service におけるログデータの収集、保存、およびクエリの単位です。

はい

なし

batchSize

Simple Log Service から一度にクエリするデータ件数です。

いいえ

128

column

各データ入力のカラム名です。Simple Log Service のメタデータを同期カラムとして構成できます。Simple Log Service は、トピック、一意のマシングループ識別子、ホスト名、パス、ログ時刻などのメタデータをサポートしています。

説明

カラム名は大文字小文字を区別します。メタデータの記述方法については、「Simple Log Service のマシングループ」をご参照ください。

はい

なし

beginDateTime

データ消費の開始オフセットです。これは、ログデータが LogHub (SLS) に到着した時刻です。このパラメーターは、包含される開始時刻範囲を指定します。時刻は yyyyMMddHHmmss 形式の文字列である必要があります(例:20180111013000)。DataWorks のスケジューリングパラメーターと併用できます。

たとえば、ノード編集ページの スケジューリング構成 タブで、パラメーターbeginDateTime=${yyyymmdd-1} に設定します。次に、ログ開始時刻${beginDateTime}000000 に設定します。これにより、ログ開始時刻がデータタイムスタンプの 00:00:00 になります。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。

説明
  • beginDateTime および endDateTime パラメーターは、必ず同時に使用してください。

  • すべてのデータを同期するには、beginDateTime をデータの開始時刻に、endDateTime を現在日付に設定します。ただし、データ量が多い場合は、大量のリソースを消費する可能性があります。必要に応じて、リソースグループの仕様を調整してください。

はい

なし

endDateTime

データ消費の終了オフセットです。このパラメーターは、排他的な終了時刻範囲を指定します。時刻は yyyyMMddHHmmss 形式の文字列である必要があります(例:20180111013010)。DataWorks のスケジューリングパラメーターと併用できます。

たとえば、ノード編集ページの スケジューリング構成 タブで、パラメーターendDateTime=${yyyymmdd} に設定します。次に、ログ終了時刻${endDateTime}000000 に設定します。これにより、ログ終了時刻がデータタイムスタンプの翌日の 00:00:00 になります。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。

重要
  • endDatetime に設定する時刻は、2038-01-19 11:14:07 +08:00 よりも前でなければなりません。そうでない場合、データの取得に失敗する可能性があります。

  • 前のエポックの endDateTime は、次のエポックの beginDateTime と同じかそれより後である必要があります。そうでない場合、一部のリージョンのデータが取得できない可能性があります。

はい

なし

query

LogHub の LogHub クエリ構文 または SPL ステートメント を使用して、LogHub のデータをフィルター処理します。SPL(Structured Process Language)は、SLS がログ処理に使用する構文です。

はい

なし

説明

LogHub から読み取ったデータが欠落している場合は、LogHub コンソールにアクセスし、receive_time メタデータフィールドの値がタスクで構成された時刻範囲内にあるかどうかを確認してください。

Writer スクリプト例

{
    "type": "job",
    "version": "2.0",// バージョン番号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "LogHub",// プラグイン名。
            "parameter": {
                "datasource": "",// データソース。
                "column": [// フィールド。
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5"
                ],
                "topic": "",// トピックを選択します。
                "batchSize": "1024",// 一括送信するレコード数。
                "logstore": ""// 送信先 Simple Log Service Logstore の名前。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// エラー件数。
        },
        "speed": {
            "throttle":true,// throttle が false の場合、mbps パラメーターは無効となり、データ転送速度は制限されません。throttle が true の場合、データ転送速度が制限されます。
            "concurrent":3, // 同時実行ジョブ数。
            "mbps":"12"// 最大データ転送速度。1 mbps = 1 MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer スクリプトパラメーター

説明

LogHub (SLS) Writer は、Data Integration フレームワークを介して Reader からデータを取得します。その後、サポート対象の Data Integration データ型を STRING 型に変換します。指定された batchSize に達すると、Simple Log Service Java SDK を使用して、データを一括で LogHub (SLS) にプッシュします。

パラメーター

説明

必須

デフォルト値

endpoint

Simple Log Service のエンドポイントです。エンドポイントは、プロジェクトおよびそのログデータにアクセスするための URL です。エンドポイントは、プロジェクトが配置されている Alibaba Cloud リージョンおよびプロジェクト名に関連付けられます。各リージョンのエンドポイントについては、「エンドポイント」をご参照ください。

はい

なし

accessKeyId

Simple Log Service にアクセスするために使用する AccessKeyId です。

はい

なし

accessKeySecret

Simple Log Service にアクセスするために使用する AccessKeySecret です。

はい

なし

project

送信先 Simple Log Service プロジェクトの名前です。

はい

なし

logstore

送信先 Logstore の名前です。Logstore は、Simple Log Service におけるログデータの収集、保存、およびクエリの単位です。

はい

なし

topic

送信先 Simple Log Service の topic 名です。

いいえ

空文字列

batchSize

LogHub (SLS) へ一度に同期するデータ件数です。デフォルト値は 1,024 です。最大値は 4,096 です。

説明

LogHub (SLS) へ一括で同期するデータのサイズは、5 MB を超えてはなりません。単一データ入力のサイズに応じて、一度にプッシュする件数を調整してください。

いいえ

1,024

column

各データ入力のカラム名です。

はい

なし

付録 2:フィルター処理用 SPL 構文

LogHub をソースとして使用する場合、LogHub クエリ構文または構造化プロセス言語(SPL)ステートメントを使用して LogHub のデータをフィルター処理できます。以下の表に構文を示します。

説明

SPL の詳細については、「SPL 構文」をご参照ください。

シナリオ

SQL ステートメント

SPL ステートメント

データフィルター処理

SELECT * WHERE Type='write'

  • 条件付きフィルター処理。

    | where Type='write'
  • あいまい検索。

    | where Type like '%write%'
  • 正規表現。

    | where regexp_like(server_protocol, '\d+')
  • その他(SQL 式)。

    | where <sql-expr> 

フィールド処理およびフィルター処理

特定のフィールドを選択して名前を変更:

SELECT "__tag__:node" AS node, path
  • 特定のフィールドを選択して名前を変更。

    | project node="__tag__:node", path
  • パターンでフィールドを選択。

    | project -wildcard "__tag__:*"
  • 他のフィールドに影響を与えることなく一部のフィールドの名前を変更。

    | project-rename node="__tag__:node"
  • パターンでフィールドを除外。

    | project-away -wildcard "__tag__:*"

データクリーニング

(SQL 関数の呼び出し)

データ型の変換、時刻の解析など:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

データ型の変換、時刻の解析など:

| extend Status=cast(Status as BIGINT), extend Time=date_parse(Time, '%Y-%m-%d %H:%i')

フィールド抽出

正規表現による抽出:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

JSON 抽出:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time
  • 正規表現による抽出:1 回のマッチ。

    | parse-regexp protocol, '(\w+)/(\d+)' as scheme, version
  • JSON 抽出:すべて展開。

    | parse-json -path='$.0' content
  • CSV 抽出。

    | parse-csv -delim='^_^' content as ip, time, host