LogHub (SLS) データソースを使用すると、Simple Log Service (SLS) からのデータの読み取りと書き込みができます。このトピックでは、DataWorks が LogHub (SLS) のデータ同期をどのようにサポートするかについて説明します。
制限事項
Data Integration が LogHub (SLS) にオフラインで書き込みを行う際、フェールオーバー後にタスクが再実行されると、データの重複が発生する可能性があります。これは、LogHub (SLS) がべき等ではないためです。
サポートされているフィールドタイプ
Data Integration は、以下の LogHub (SLS) フィールドタイプの読み取りと書き込みをサポートしています。
フィールドタイプ | オフライン読み取り (LogHub (SLS) Reader) | オフライン書き込み (LogHub (SLS) Writer) | リアルタイム読み取り |
STRING | サポート済み | サポート済み | サポート済み |
詳細:
オフラインモードで LogHub (SLS) にデータを書き込む場合
サポートされているすべてのデータ型は、LogHub (SLS) に書き込まれる前に STRING 型に変換されます。次の表に、LogHub (SLS) writer のデータ型変換を示します。
サポートされている Data Integration 内部タイプ
LogHub (SLS) への書き込み時のデータ型
LONG
STRING
DOUBLE
STRING
STRING
STRING
DATE
STRING
BOOLEAN
STRING
BYTES
STRING
リアルタイムモードで LogHub (SLS) からデータを読み取る場合
デフォルトでは、以下のメタデータフィールドが含まれます。
LogHub (SLS) リアルタイム同期フィールド
データ型
説明
__time__
STRING
SLS 予約フィールド: __time__。ログデータが書き込まれるときに指定されるログ時間です。これは秒単位の UNIX タイムスタンプです。
__source__
STRING
SLS 予約フィールド: __source__。ログのソースデバイスです。
__topic__
STRING
SLS 予約フィールド: __topic__。Topic 名です。
__tag__:__receive_time__
STRING
ログがサーバーに到着した時刻です。パブリック IP アドレスを記録する機能を有効にすると、サーバーは受信時にこのフィールドを生ログに追加します。これは秒単位の UNIX タイムスタンプです。
__tag__:__client_ip__
STRING
ログソースデバイスのパブリック IP アドレスです。パブリック IP アドレスを記録する機能を有効にすると、サーバーは受信時にこのフィールドを生ログに追加します。
__tag__:__path__
STRING
Logtail によって収集されたログファイルのパスです。Logtail は自動的にこのフィールドをログに追加します。
__tag__:__hostname__
STRING
Logtail がデータを収集するマシンのホスト名です。Logtail は自動的にこのフィールドをログに追加します。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの infotip を表示して、パラメーターの意味を理解できます。
データ同期タスクの開発
同期タスクを構成するためのエントリポイントと手順については、以下の構成ガイドをご参照ください。
LogHub をソースとして使用する場合、LogHub クエリ構文または構造化プロセス言語 (SPL) 文を使用してデータをフィルターできます。構文の詳細については、「付録 2: フィルタリングのための SPL 構文」をご参照ください。
単一テーブルのオフライン同期タスクの構成
詳細については、「コードレス UI でタスクを構成する」および「コードエディタでタスクを構成する」をご参照ください。
説明コードレス UI で同期タスクを構成する場合、パラメーターのフォーマットが「付録 1: スクリプト例とパラメーターの説明」で説明されているフォーマットと一致していることを確認してください。
コードエディタのすべてのパラメーターとスクリプト例については、「付録 1: スクリプト例とパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの構成
詳細については、「DataStudio でリアルタイム同期タスクを構成する」および「Data Integration でリアルタイム同期タスクを構成する」をご参照ください。
データベース全体のリアルタイム同期タスクの構成
詳細については、「データベース全体のリアルタイム同期タスクを構成する」をご参照ください。
よくある質問
Data Integration のその他のよくある質問については、「Data Integration FAQ」をご参照ください。
付録 1: スクリプト例とパラメーターの説明
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプトフォーマットの要件に基づいてスクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを構成する際にデータソースに対して構成する必要があるパラメーターについて説明します。
Reader スクリプトの例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"LogHub",// プラグイン名。
"parameter":{
"datasource":"",// データソース。
"column":[// フィールド。
"col0",
"col1",
"col2",
"col3",
"col4",
"C_Category",
"C_Source",
"C_Topic",
"C_MachineUUID", // Topic。
"C_HostName", // ホスト名。
"C_Path", // パス。
"C_LogTime" // イベント時間。
],
"beginDateTime":"",// データ消費の開始時刻。
"batchSize":"",// Simple Log Service から一度にクエリするデータエントリの数。
"endDateTime":"",// データ消費の終了時刻。
"fieldDelimiter":",",// 列デリミタ。
"logstore":""// 宛先 Logstore の名前。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコードの数。
},
"speed":{
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
"concurrent":1, // 同時ジョブの数。
"mbps":"12"// 最大データレート。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
endPoint | Simple Log Service のエンドポイント。エンドポイントは、プロジェクトとそのログデータにアクセスするために使用される URL です。エンドポイントは、プロジェクトが配置されている Alibaba Cloud リージョンとプロジェクト名に関連しています。各リージョンのエンドポイントについては、「エンドポイント」をご参照ください。 | はい | なし |
accessId | Simple Log Service へのアクセスに使用される AccessKey ID。ユーザーを識別します。 | はい | なし |
accessKey | Simple Log Service へのアクセスに使用される AccessKey Secret。ユーザーを認証します。 | はい | なし |
project | 宛先の Simple Log Service プロジェクトの名前。プロジェクトは、リソースを分離および制御するために使用される Simple Log Service のリソース管理ユニットです。 | はい | なし |
logstore | 宛先 Logstore の名前。Logstore は、Simple Log Service におけるログデータの収集、ストレージ、およびクエリの単位です。 | はい | なし |
batchSize | Simple Log Service から一度にクエリするデータエントリの数。 | いいえ | 128 |
column | 各データエントリの列名。Simple Log Service のメタデータを同期列として構成できます。Simple Log Service は、Topic、一意のマシングループ識別子、ホスト名、パス、ログ時間などのメタデータをサポートしています。 説明 列名では大文字と小文字が区別されます。メタデータの書き方については、「Simple Log Service マシングループ」をご参照ください。 | はい | なし |
beginDateTime | データ消費の開始オフセット。これは、ログデータが LogHub (SLS) に到着した時刻です。このパラメーターは、時間範囲の開始を指定します (開始時刻を含む)。時刻は yyyyMMddHHmmss 形式の文字列である必要があります (例: 20180111013000)。このパラメーターは DataWorks のスケジューリングパラメーターと併用できます。 たとえば、ノード編集ページの [スケジューリング設定] タブで、[パラメーター] を 説明
| はい | なし |
endDateTime | データ消費の終了オフセット。このパラメーターは、時間範囲の終了を指定します (終了時刻を含まない)。時刻は yyyyMMddHHmmss 形式の文字列である必要があります (例: 20180111013010)。このパラメーターは DataWorks のスケジューリングパラメーターと併用できます。 たとえば、ノード編集ページの [スケジューリング設定] タブで、[パラメーター] を endDateTime=${yyyymmdd} に設定します。次に、[ログ終了時刻] を ${endDateTime}000000 に設定します。これにより、ログの終了時刻がデータタイムスタンプの翌日の 00:00:00 に設定されます。詳細については、「スケジューリングパラメーターでサポートされている形式」をご参照ください。 重要
| はい | なし |
query | LogHub クエリ構文または SPL 文を使用して LogHub のデータをフィルターします。SPL (構造化プロセス言語) は、SLS がログを処理するために使用する構文です。 | はい | なし |
LogHub からデータを読み取った後にデータが欠落している場合は、LogHub コンソールに移動し、receive_time メタデータフィールドがタスクに構成された時間範囲内にあるかどうかを確認してください。
Writer スクリプトの例
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "LogHub",// プラグイン名。
"parameter": {
"datasource": "",// データソース。
"column": [// フィールド。
"col0",
"col1",
"col2",
"col3",
"col4",
"col5"
],
"topic": "",// Topic を選択します。
"batchSize": "1024",// バッチ送信のレコード数。
"logstore": ""// 宛先の Simple Log Service Logstore の名前。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// エラーレコードの数。
},
"speed": {
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
"concurrent":3, // 同時ジョブの数。
"mbps":"12"// 最大データレート。1 mbps = 1 MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Writer スクリプトのパラメーター
LogHub (SLS) writer は、Data Integration フレームワークを介して reader からデータを取得します。次に、writer はサポートされている Data Integration のデータ型を STRING 型に変換します。レコード数が指定された batchSize に達すると、データは Simple Log Service Java SDK を使用して単一のバッチで LogHub (SLS) にプッシュされます。
パラメーター | 説明 | 必須 | デフォルト値 |
endpoint | Simple Log Service のエンドポイント。エンドポイントは、プロジェクトとそのログデータにアクセスするために使用される URL です。エンドポイントは、プロジェクトが配置されている Alibaba Cloud リージョンとプロジェクト名に関連しています。各リージョンのエンドポイントについては、「エンドポイント」をご参照ください。 | はい | なし |
accessKeyId | Simple Log Service へのアクセスに使用される AccessKeyId。 | はい | なし |
accessKeySecret | Simple Log Service へのアクセスに使用される AccessKeySecret。 | はい | なし |
project | 宛先の Simple Log Service プロジェクトの名前。 | はい | なし |
logstore | 宛先 Logstore の名前。Logstore は、Simple Log Service におけるログデータの収集、ストレージ、およびクエリの単位です。 | はい | なし |
topic | 宛先の Simple Log Service の topic 名。 | いいえ | Empty string |
batchSize | 一度に LogHub (SLS) に同期するデータエントリの数。デフォルト値は 1,024 です。最大値は 4,096 です。 説明 1 回のバッチで LogHub (SLS) に同期されるデータのサイズは 5 MB を超えることはできません。1 つのデータエントリのサイズに基づいて、一度にプッシュするエントリの数を調整してください。 | いいえ | 1,024 |
column | 各データエントリの列名。 | はい | なし |
付録 2: フィルタリングのための SPL 構文
LogHub をソースとして使用する場合、LogHub クエリ構文または構造化プロセス言語 (SPL) 文を使用して LogHub からデータをフィルターできます。次の表に構文を示します。
SPL の詳細については、「SPL 構文」をご参照ください。
シナリオ | SQL 文 | SPL 文 |
データフィルタリング | |
|
フィールドの処理とフィルタリング | 特定のフィールドを選択して名前を変更します: |
|
データクレンジング (SQL 関数の呼び出し) | データ型を変換し、時刻を解析するなど: | データ型を変換し、時刻を解析するなど: |
フィールド抽出 | 正規表現抽出: JSON 抽出: |
|