LogHub (SLS) データソースを使用すると、Simple Log Service (SLS) からデータを読み取ったり、SLS へデータを書き込んだりできます。本トピックでは、DataWorks が LogHub (SLS) とのデータ同期をどのようにサポートしているかについて説明します。
機能
DataWorks は、LogHub (SLS) とのデータ同期に関して以下のシナリオをサポートしています。
-
LogHub (SLS) と MaxCompute などのデータソース間で、異なるリージョンにわたるデータ同期を行います。
-
LogHub (SLS) と MaxCompute などのデータソース間で、異なる Alibaba Cloud アカウントにわたるデータ同期を行います。
-
LogHub (SLS) と MaxCompute などのデータソース間で、同一の Alibaba Cloud アカウント内でのデータ同期を行います。
-
LogHub (SLS) と MaxCompute などのデータソース間で、パブリッククラウドアカウントと金融クラウドアカウントにわたるデータ同期を行います。
制限事項
Data Integration が LogHub (SLS) に対してオフライン書き込みを行う場合、フェールオーバー後にタスクを再実行すると、データ重複が発生する可能性があります。これは、LogHub (SLS) が冪等性を持たないためです。
サポートされるフィールドの型
Data Integration は、以下の LogHub (SLS) のフィールドの型の読み取りおよび書き込みをサポートしています。
フィールドの型 | オフライン読み取り(LogHub (SLS) Reader) | オフライン書き込み(LogHub (SLS) Writer) | リアルタイム読み取り |
STRING | サポート済み | サポート済み | サポート済み |
詳細情報:
LogHub (SLS) へのオフライン書き込み時
すべてのサポート対象データ型は、LogHub (SLS) へ書き込む前に STRING 型に変換されます。以下の表に、LogHub (SLS) Writer におけるデータ型の変換関係を示します。
サポート対象の Data Integration 内部型
LogHub (SLS) への書き込み時のデータ型
LONG
STRING
DOUBLE
STRING
STRING
STRING
DATE
STRING
BOOLEAN
STRING
BYTES
STRING
LogHub (SLS) からのリアルタイム読み取り時
以下のメタデータフィールドがデフォルトで含まれます。
LogHub (SLS) リアルタイム同期フィールド
データ型
説明
__time__
STRING
SLS 予約フィールド:__time__。ログデータの書き込み時に指定されたログ時刻です。UNIX タイムスタンプ(秒単位)です。
__source__
STRING
SLS 予約フィールド:__source__。ログの送信元デバイスです。
__topic__
STRING
SLS 予約フィールド:__topic__。トピック名です。
__tag__:__receive_time__
STRING
ログがサーバーに到着した時刻です。パブリック IP アドレスの記録機能を有効化している場合、サーバーは受信時にこのフィールドを生ログに追加します。UNIX タイムスタンプ(秒単位)です。
__tag__:__client_ip__
STRING
ログ送信元デバイスのパブリック IP アドレスです。パブリック IP アドレスの記録機能を有効化している場合、サーバーは受信時にこのフィールドを生ログに追加します。
__tag__:__path__
STRING
Logtail によって収集されたログファイルのパスです。Logtail が自動的にこのフィールドをログに追加します。
__tag__:__hostname__
STRING
Logtail がデータを収集するマシンのホスト名です。Logtail が自動的にこのフィールドをログに追加します。
データソースの作成
データソースの構成
データ同期タスクを開発する前に、DataWorks でデータソースを作成する必要があります。詳細については、「データソースの管理」をご参照ください。各パラメーターの詳細な説明については、構成ページのツールチップをご確認ください。
クロスアカウントデータソースの作成
本例では、Alibaba Cloud アカウント A の LogHub (SLS) インスタンスから、Alibaba Cloud アカウント B の MaxCompute インスタンスへデータを同期する方法を示します。このタスクは、アカウント B の Data Integration サービスで構成されます。
-
Alibaba Cloud アカウント A の AccessKey ID および AccessKey Secret を使用して、LogHub (SLS) データソースを作成します。
これにより、アカウント B はアカウント A のすべての Simple Log Service プロジェクトからデータを同期できます。
-
Alibaba Cloud アカウント A の RAM ユーザー(例:A1)の AccessKey ID および AccessKey Secret を使用して、LogHub (SLS) データソースを作成します。
-
アカウント A は、RAM ユーザー A1 に Simple Log Service の一般的な権限(
AliyunLogFullAccessおよびAliyunLogReadOnlyAccess)を付与します。詳細については、「RAM ユーザーの作成と権限付与」をご参照ください。説明AliyunLogFullAccessおよびAliyunLogReadOnlyAccessのシステムポリシーを RAM アカウントに付与すると、その RAM アカウントは、親アカウント下のすべての Simple Log Service リソースを照会できます。 -
アカウント A は、RAM ユーザー A1 に Simple Log Service のカスタム権限を付与します。
アカウント A で にログインし、「権限 > ポリシー」ページに移動して、ポリシーの作成 をクリックします。
権限付与の詳細については、「はじめに」および「概要」をご参照ください。
以下のポリシーを使用して権限を付与した後、アカウント B は RAM ユーザー A1 を使用して、Simple Log Serviceプロジェクト
project_name1およびproject_name2のみからデータを同期できます。{ "Version": "1", "Statement": [ { "Action": [ "log:Get*", "log:List*", "log:CreateConsumerGroup", "log:UpdateConsumerGroup", "log:DeleteConsumerGroup", "log:ListConsumerGroup", "log:ConsumerGroupUpdateCheckPoint", "log:ConsumerGroupHeartBeat", "log:GetConsumerGroupCheckPoint" ], "Resource": [ "acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*", "acs:log:*:*:project/project_name2", "acs:log:*:*:project/project_name2/*" ], "Effect": "Allow" } ] }
-
データ同期タスクの開発
同期タスクの設定入口および手順については、以下の構成ガイドをご参照ください。
LogHub をソースとして使用する場合、LogHub クエリ構文または構造化プロセス言語(SPL)ステートメントを使用してデータをフィルター処理できます。構文の詳細については、「付録 2:フィルター処理用 SPL 構文」をご参照ください。
単一テーブルオフライン同期タスクの構成
詳細については、「コードレス UI によるバッチ同期の構成」および「コードエディタの使用」をご参照ください。
説明コードレス UI で同期タスクを構成する場合、パラメーターの形式が「付録 1:スクリプト例およびパラメーター説明」で説明されている形式と一致していることを確認してください。
コードエディタ向けの全パラメーターおよびスクリプト例については、「付録 1:スクリプト例およびパラメーター説明」をご参照ください。
単一テーブルリアルタイム同期タスクの構成
詳細については、「単一テーブルリアルタイム同期タスクの構成」をご参照ください。
全体データベースリアルタイム同期タスクの構成
詳細については、「全体データベースリアルタイム同期タスクの構成」をご参照ください。
よくある質問
Data Integration のその他のよくある質問については、「Data Integration よくある質問」をご参照ください。
付録 1:スクリプト例およびパラメーター説明
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合、統一スクリプト形式の要件に基づいて、関連するパラメーターをスクリプトで構成する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプト例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"LogHub",// プラグイン名。
"parameter":{
"datasource":"",// データソース。
"column":[// フィールド。
"col0",
"col1",
"col2",
"col3",
"col4",
"C_Category",
"C_Source",
"C_Topic",
"C_MachineUUID", // トピック。
"C_HostName", // ホスト名。
"C_Path", // パス。
"C_LogTime" // イベント時刻。
],
"beginDateTime":"",// データ消費の開始時刻。
"batchSize":"",// Simple Log Service から一度にクエリするデータ件数。
"endDateTime":"",// データ消費の終了時刻。
"fieldDelimiter":",",// カラム区切り文字。
"logstore":""// 送信先 Logstore の名前。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラー件数。
},
"speed":{
"throttle":true,// throttle が false の場合、mbps パラメーターは無効となり、データ転送速度は制限されません。throttle が true の場合、データ転送速度が制限されます。
"concurrent":1, // 同時実行ジョブ数。
"mbps":"12"// 最大データ転送速度。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
endPoint | Simple Log Service のエンドポイントです。エンドポイントは、プロジェクトおよびそのログデータにアクセスするための URL です。エンドポイントは、プロジェクトが配置されている Alibaba Cloud リージョンおよびプロジェクト名に関連付けられます。各リージョンのエンドポイントについては、「エンドポイント」をご参照ください。 | はい | なし |
accessId | Simple Log Service にアクセスするために使用する AccessKey ID です。ユーザーを識別します。 | はい | なし |
accessKey | Simple Log Service にアクセスするために使用する AccessKey Secret です。ユーザーを認証します。 | はい | なし |
project | 送信先 Simple Log Service プロジェクトの名前です。プロジェクトは、Simple Log Service におけるリソース管理単位であり、リソースの分離および制御に使用されます。 | はい | なし |
logstore | 送信先 Logstore の名前です。Logstore は、Simple Log Service におけるログデータの収集、保存、およびクエリの単位です。 | はい | なし |
batchSize | Simple Log Service から一度にクエリするデータ件数です。 | いいえ | 128 |
column | 各データ入力のカラム名です。Simple Log Service のメタデータを同期カラムとして構成できます。Simple Log Service は、トピック、一意のマシングループ識別子、ホスト名、パス、ログ時刻などのメタデータをサポートしています。 説明 カラム名は大文字小文字を区別します。メタデータの記述方法については、「Simple Log Service のマシングループ」をご参照ください。 | はい | なし |
beginDateTime | データ消費の開始オフセットです。これは、ログデータが LogHub (SLS) に到着した時刻です。このパラメーターは、包含される開始時刻範囲を指定します。時刻は yyyyMMddHHmmss 形式の文字列である必要があります(例:20180111013000)。DataWorks のスケジューリングパラメーターと併用できます。 たとえば、ノード編集ページの スケジューリング構成 タブで、パラメーター を 説明
| はい | なし |
endDateTime | データ消費の終了オフセットです。このパラメーターは、排他的な終了時刻範囲を指定します。時刻は yyyyMMddHHmmss 形式の文字列である必要があります(例:20180111013010)。DataWorks のスケジューリングパラメーターと併用できます。 たとえば、ノード編集ページの スケジューリング構成 タブで、パラメーター を endDateTime=${yyyymmdd} に設定します。次に、ログ終了時刻 を ${endDateTime}000000 に設定します。これにより、ログ終了時刻がデータタイムスタンプの翌日の 00:00:00 になります。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。 重要
| はい | なし |
query | LogHub の LogHub クエリ構文 または SPL ステートメント を使用して、LogHub のデータをフィルター処理します。SPL(Structured Process Language)は、SLS がログ処理に使用する構文です。 | はい | なし |
LogHub から読み取ったデータが欠落している場合は、LogHub コンソールにアクセスし、receive_time メタデータフィールドの値がタスクで構成された時刻範囲内にあるかどうかを確認してください。
Writer スクリプト例
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "LogHub",// プラグイン名。
"parameter": {
"datasource": "",// データソース。
"column": [// フィールド。
"col0",
"col1",
"col2",
"col3",
"col4",
"col5"
],
"topic": "",// トピックを選択します。
"batchSize": "1024",// 一括送信するレコード数。
"logstore": ""// 送信先 Simple Log Service Logstore の名前。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// エラー件数。
},
"speed": {
"throttle":true,// throttle が false の場合、mbps パラメーターは無効となり、データ転送速度は制限されません。throttle が true の場合、データ転送速度が制限されます。
"concurrent":3, // 同時実行ジョブ数。
"mbps":"12"// 最大データ転送速度。1 mbps = 1 MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Writer スクリプトパラメーター
LogHub (SLS) Writer は、Data Integration フレームワークを介して Reader からデータを取得します。その後、サポート対象の Data Integration データ型を STRING 型に変換します。指定された batchSize に達すると、Simple Log Service Java SDK を使用して、データを一括で LogHub (SLS) にプッシュします。
パラメーター | 説明 | 必須 | デフォルト値 |
endpoint | Simple Log Service のエンドポイントです。エンドポイントは、プロジェクトおよびそのログデータにアクセスするための URL です。エンドポイントは、プロジェクトが配置されている Alibaba Cloud リージョンおよびプロジェクト名に関連付けられます。各リージョンのエンドポイントについては、「エンドポイント」をご参照ください。 | はい | なし |
accessKeyId | Simple Log Service にアクセスするために使用する AccessKeyId です。 | はい | なし |
accessKeySecret | Simple Log Service にアクセスするために使用する AccessKeySecret です。 | はい | なし |
project | 送信先 Simple Log Service プロジェクトの名前です。 | はい | なし |
logstore | 送信先 Logstore の名前です。Logstore は、Simple Log Service におけるログデータの収集、保存、およびクエリの単位です。 | はい | なし |
topic | 送信先 Simple Log Service の topic 名です。 | いいえ | 空文字列 |
batchSize | LogHub (SLS) へ一度に同期するデータ件数です。デフォルト値は 1,024 です。最大値は 4,096 です。 説明 LogHub (SLS) へ一括で同期するデータのサイズは、5 MB を超えてはなりません。単一データ入力のサイズに応じて、一度にプッシュする件数を調整してください。 | いいえ | 1,024 |
column | 各データ入力のカラム名です。 | はい | なし |
付録 2:フィルター処理用 SPL 構文
LogHub をソースとして使用する場合、LogHub クエリ構文または構造化プロセス言語(SPL)ステートメントを使用して LogHub のデータをフィルター処理できます。以下の表に構文を示します。
SPL の詳細については、「SPL 構文」をご参照ください。
シナリオ | SQL ステートメント | SPL ステートメント |
データフィルター処理 | |
|
フィールド処理およびフィルター処理 | 特定のフィールドを選択して名前を変更: |
|
データクリーニング (SQL 関数の呼び出し) | データ型の変換、時刻の解析など: | データ型の変換、時刻の解析など: |
フィールド抽出 | 正規表現による抽出: JSON 抽出: |
|