すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Simple Log Service データソース

最終更新日:Jun 09, 2025

DataWorks は、Simple Log Service データソースからデータを読み取るための LogHub Reader と、Simple Log Service データソースにデータを書き込むための LogHub Writer を提供しています。このトピックでは、Simple Log Service データソースからのデータ同期、または Simple Log Service データソースへのデータ同期の機能について説明します。

制限事項

DataWorks Data Integration を使用してバッチ同期タスクを実行し、Simple Log Service にデータを書き込む場合、Simple Log Service はべき等性を保証しません。失敗したタスクを再実行すると、冗長データが生成される可能性があります。

データ型

次の表に、Simple Log Service の主要なデータ型のサポート状況を示します。

データ型

バッチデータ読み取り用 LogHub Reader

バッチデータ書き込み用 LogHub Writer

リアルタイムデータ読み取り用 LogHub Reader

STRING

サポートされています

サポートされています

サポートされています

  • バッチデータ書き込み用 LogHub Writer

    LogHub Writer は、Data Integration でサポートされているデータ型を、Simple Log Service に書き込む前に STRING に変換します。次の表に、LogHub Writer がデータ型を変換する際のデータ型のマッピングを示します。

    Data Integration データ型

    Simple Log Service データ型

    LONG

    STRING

    DOUBLE

    STRING

    STRING

    STRING

    DATE

    STRING

    BOOLEAN

    STRING

    BYTES

    STRING

  • リアルタイムデータ読み取り用 LogHub Reader

    次の表に、リアルタイムデータ同期用の LogHub Reader が提供するメタデータフィールドを示します。

    リアルタイムデータ同期用の LogHub Reader が提供するフィールド

    データ型

    説明

    __time__

    STRING

    Simple Log Service の予約フィールドです。このフィールドは、ログが Simple Log Service に書き込まれた時刻を指定します。フィールド値は、秒単位の UNIX タイムスタンプです。

    __source__

    STRING

    Simple Log Service の予約フィールドです。このフィールドは、ログが収集されたソースデバイスを指定します。

    __topic__

    STRING

    Simple Log Service の予約フィールドです。このフィールドは、ログのトピックの名前を指定します。

    __tag__:__receive_time__

    STRING

    ログがサーバーに到着した時刻です。パブリック IP アドレス記録機能を有効にすると、サーバーがログを受信したときに、このフィールドが生ログに追加されます。フィールド値は、秒単位の UNIX タイムスタンプです。

    __tag__:__client_ip__

    STRING

    ソースデバイスのパブリック IP アドレスです。パブリック IP アドレス記録機能を有効にすると、サーバーがログを受信したときに、このフィールドが生ログに追加されます。

    __tag__:__path__

    STRING

    Logtail によって収集されたログファイルのパスです。Logtail はこのフィールドをログに自動的に追加します。

    __tag__:__hostname__

    STRING

    Logtail がデータを収集するデバイスのホスト名です。Logtail はこのフィールドをログに自動的に追加します。

データソースを追加する

DataWorks で同期タスクを開発する前に、データソースを追加および管理するの手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメーターのヒントを参照して、パラメーターの意味を確認できます

データ同期タスクを開発する

同期タスクのエントリポイントと構成手順については、以下の構成ガイドを参照してください。

説明

Simple Log Service データソースからデータを同期するデータ同期タスクを構成する場合、データソースでは、Simple Log Service のクエリ構文と SLS 処理言語 (SPL) ステートメントを使用してデータをフィルタリングできます。Simple Log Service は SPL を使用してログを処理します。詳細については、「付録 2: SPL 構文」をご参照ください。

単一テーブルのデータを同期するバッチ同期タスクを構成する

単一テーブルのデータを同期するリアルタイム同期タスクを構成する

構成手順の詳細については、「単一テーブルから増分データを同期するリアルタイム同期タスクを作成する」および「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

データベース内のすべてのデータのバッチ同期、データベース内の完全データまたは増分データのリアルタイム同期、およびシャーディングデータベース内のシャーディングテーブルからのデータのリアルタイム同期を実装するための同期設定を構成する

構成手順の詳細については、「Data Integration で同期タスクを構成する」をご参照ください。

FAQ

詳細については、「Data Integration に関する FAQ」をご参照ください。

付録 1: コードとパラメーター

コードエディタを使用してバッチ同期タスクを構成する

コードエディタを使用してバッチ同期タスクを構成する場合は、統一スクリプト形式の要件に基づいて、スクリプトに関連パラメーターを構成する必要があります。詳細については、「コードエディタを使用してバッチ同期タスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成する場合に、データソースに構成する必要があるパラメーターについて説明しています。

LogHub Reader のコード

{
 "type":"job",
 "version":"2.0",// バージョン番号。
 "steps":[
     {
         "stepType":"LogHub",// プラグイン名。
         "parameter":{
             "datasource":"",// データソースの名前。
             "column":[// 列の名前。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", // ログトピック。
                 "C_HostName", // ホスト名。
                 "C_Path", // パス。
                 "C_LogTime" // イベントが発生した時刻。
             ],
             "beginDateTime":"",// データ消費の開始時刻。
             "batchSize":"",// 一度にクエリされるデータエントリの数。
             "endDateTime":"",// データ消費の終了時刻。
             "fieldDelimiter":",",// 列区切り文字。
             "logstore":""// Logstore の名前。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"// ダーティデータレコードの許容最大数。
     },
     "speed":{
         "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent":1, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/s。
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}

LogHub Reader のコードのパラメーター

パラメーター

説明

必須

デフォルト値

endPoint

Simple Log Service のエンドポイント。エンドポイントは、プロジェクトおよびプロジェクト内のログデータにアクセスするために使用できる URL です。エンドポイントは、プロジェクト名と、プロジェクトが存在する Alibaba Cloud リージョンによって異なります。各リージョンの Simple Log Service のエンドポイントの詳細については、「エンドポイント」をご参照ください。

はい

デフォルト値なし

accessId

Simple Log Service プロジェクトへのアクセスに使用される Alibaba Cloud アカウントの AccessKey ID。

はい

デフォルト値なし

accessKey

Simple Log Service プロジェクトへのアクセスに使用される Alibaba Cloud アカウントの AccessKey シークレット。

はい

デフォルト値なし

project

Simple Log Service プロジェクトの名前。プロジェクトは、Simple Log Service でリソースを管理するための基本単位です。プロジェクトは、リソースを分離し、リソースへのアクセスを制御するために使用されます。

はい

デフォルト値なし

logstore

Logstore の名前。Logstore は、Simple Log Service でログデータを収集、保存、およびクエリするために使用できる基本単位です。

はい

デフォルト値なし

batchSize

Simple Log Service から一度に読み取るデータエントリの数。

いいえ

128

column

列の名前。このパラメーターは、Simple Log Service のメタデータに設定できます。サポートされているメタデータには、ログトピック、ホストの一意の識別子、ホスト名、パス、およびログ時間が含まれます。

説明

列名は大文字と小文字が区別されます。Simple Log Service の列名の詳細については、「はじめに」をご参照ください。

はい

デフォルト値なし

beginDateTime

データ消費の開始時刻。値は、ログデータが Simple Log Service に到着した時刻です。このパラメーターは、yyyyMMddHHmmss の形式(例: 20180111013000)で、左閉右開区間の左境界を定義します。このパラメーターは、DataWorks のスケジューリングパラメーターと連携できます。

たとえば、 フィールドに beginDateTime=${yyyymmdd-1}パラメーター と入力した場合、[プロパティ] タブのタスク構成タブで [開始タイムスタンプ]${beginDateTime}000000 に設定して、データタイムスタンプの 00:00:00 から生成されたログを消費できます。詳細については、「スケジューリングパラメーターのサポートされている形式」をご参照ください。

説明
  • beginDateTime パラメーターと endDateTime パラメーターは、ペアで使用してください。

  • すべてのデータを同期するには、beginDateTime をデータの生成開始時刻に設定し、endDateTime を現在の日付に設定します。 これにより、大量のリソースが消費される可能性があります。ビジネス要件に基づいてリソースグループの仕様を調整してください。

はい

デフォルト値はありません

終了日時

データ消費の終了時間。このパラメーターは、20180111013010 などの yyyyMMddHHmmss 形式の左閉右開区間の右境界を定義します。このパラメーターは、DataWorks のスケジューリングパラメーターと連携できます。

たとえば、[パラメーター] フィールドの [プロパティ] タブに endDateTime=${yyyymmdd} と入力した場合、タスク構成タブで [終了タイムスタンプ]${endDateTime}000000 に設定して、データタイムスタンプの翌日の 00:00:00 までに生成されたログを消費できます。詳細については、「スケジューリングパラメーターでサポートされている形式」をご参照ください。

重要
  • endDateTime パラメーターで指定する時間は、(2038-01-19 11:14:07 +8:00) より前である必要があります。そうでない場合、データが読み取られない可能性があります。

  • 前の間隔の endDateTime パラメーターで指定された時間は、現在の間隔の beginDateTime パラメーターで指定された時間より前であってはなりません。そうでない場合、一部のリージョンのデータが読み取られない可能性があります。

はい

デフォルト値なし

クエリ

Simple Log Service のクエリ構文または SPL 文を使用してデータをフィルタリングする方法です。Simple Log Service は、ログの処理に SPL を使用します。

はい

デフォルト値なし

説明

LogHub Reader を使用して Simple Log Service データソースからデータを読み取るときに特定のデータが欠落している場合、メタデータフィールド receive_time の値が、Simple Log Service コンソールでバッチ同期タスクに指定された時間範囲内にあるかどうかを確認します。

LogHub Writer のコード

{
    "type": "job",
    "version": "2.0",// バージョン番号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType":"LogHub",// プラグイン名。
            "parameter": {
                "datasource": "",// データソース名。
                "column": [// 列の名前。
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5"
                ],
                "topic": "",// トピック名。
                "batchSize": "1024",// 一度に書き込むデータレコードの数。
                "logstore": ""// Logstore の名前。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// 許容されるダーティデータレコードの最大数。
        },
        "speed": {
            "throttle":true,// スロットルを有効にするかどうかを指定します。値が false の場合はスロットルが無効になっていることを示し、値が true の場合はスロットルが有効になっていることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent":3, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位:MB/秒。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

LogHub Writer のコードのパラメーター

説明

LogHub Writer はリーダーからデータを取得し、Data Integration でサポートされているデータ型を STRING に変換します。データレコード数が batchSize パラメーターに指定された値に達すると、LogHub Writer は Simple Log Service SDK for Java を使用して、Simple Log Service にデータレコードを一括送信します。

パラメーター

説明

必須

デフォルト値

エンドポイント

Simple Log Service のエンドポイント。エンドポイントは、プロジェクトおよびプロジェクト内のログデータにアクセスするために使用できる URL です。エンドポイントは、プロジェクト名と、プロジェクトが存在する Alibaba Cloud リージョンによって異なります。各リージョンの Simple Log Service のエンドポイントの詳細については、「エンドポイント」をご参照ください。

はい

デフォルト値なし

AccessKey ID

Simple Log Service プロジェクトへのアクセスに使用する Alibaba Cloud アカウントの [AccessKey ID] です。

はい

デフォルト値なし

AccessKey Secret

Simple Log Service プロジェクトへのアクセスに使用する Alibaba Cloud アカウントの AccessKey Secret です。

はい

デフォルト値なし

プロジェクト

Simple Log Service プロジェクトの名前。

はい

デフォルト値なし

ログストア

ログストアの名前。ログストアは、Simple Log Service でログデータを収集、保存、およびクエリするために使用できる基本単位です。

はい

デフォルト値なし

トピック

トピックの名前。

いいえ

空の文字列

バッチサイズ

Simple Log Service に一度に書き込むデータレコードの数。デフォルト値:1024。最大値:4096

説明

Simple Log Service に一度に書き込むデータのサイズは 5 MB を超えることはできません。単一のデータレコードのサイズに基づいて、このパラメーターの値を変更できます。

いいえ

1,024

各データレコードの列の名前。

はい

デフォルト値なし

付録 2:SPL 構文

Simple Log Service データソースからデータを同期するデータ同期タスクを構成する場合、データソースでは Simple Log Service のクエリ構文と SPL 文を使用してデータをフィルタリングできます。Simple Log Service は、ログの処理に SPL を使用します。次の表は、さまざまなシナリオでの SPL 構文について説明しています。

説明

SPL の詳細については、「SPL 構文」をご参照ください。

シナリオ

SQL 文

SPL 文

データのフィルタリング

SELECT * WHERE Type='write'
| where Type='write'

フィールドの処理とフィルタリング

完全一致モードでフィールドを検索し、フィールドの名前を変更します。

SELECT "__tag__:node" AS node, path
  • 完全一致モードでフィールドを検索し、フィールドの名前を変更します。

    | project node="__tag__:node", path
  • モード別にフィールドを検索します。

    | project -wildcard "__tag__:*"
  • 他のフィールドに影響を与えずにフィールドの名前を変更します。

    | project-rename node="__tag__:node"
  • モード別にフィールドを削除します。

    | project-away -wildcard "__tag__:*"

データの標準化

(SQL 関数の呼び出し)

データ型を変換し、時間を解析します。

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

データ型を変換し、時間を解析します。

| extend Status=cast(Status as BIGINT), extend Time=date_parse(Time, '%Y-%m-%d %H:%i')

フィールドの抽出

正規表現を使用してデータを抽出します。

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

JSON データを抽出します。

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time
  • 1 回の一致に基づいて、正規表現を使用してデータを抽出します。

    | parse-regexp protocol, '(\w+)/(\d+)' as scheme, version
  • 完全展開に基づいて JSON データを抽出します。

    | parse-json -path='$.0' content
  • CSV ファイルからデータを抽出します。

    | parse-csv -delim='^_^' content as ip, time, host