Alibaba Cloud Realtime Compute (Flink) は、Log Service のソーステーブルを作成することにより、Log Service のログデータを直接消費することができます。
Log Service はストリーミングデータを保存します。 ストリーミングデータは、Realtime Compute の入力データとして使用することができます。
Log Service では、各ログにキー/値のペアであるフィールドのセットが含まれています。 ログに次の内容があると想定します。
__source__: 11.85.123.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
a: 1234
b: 0
c: hello
次のデータ定義言語 (DDL) ステートメントを使用して、Realtime Compute でテーブルを作成することができます。
create table sls_stream(
a int,
b int,
c varchar
) with (
type ='sls',
endPoint ='<your endpoint>',
accessId ='<your access key id>',
accessKey ='<your access key>',
startTime = '2017-07-05 00:00:00',
project ='ali-cloud-streamtest',
logStore ='stream-test',
consumerGroup ='consumerGroupTest1'
);
Realtime Compute は、ログフィールドに加えて、
__receive_time__
などの 3 つの属性フィールドとカスタムフィールドをタグで抽出することもできます。 次の表では、予約フィールドについて説明します。
フィールド | 説明 |
---|---|
__source__ |
ログのソースです。 |
__topic__ |
ログのトピックです。 |
__timestamp__ |
ログが生成された時刻です。 |
属性フィールドを抽出するには、HEADER を追加してフィールドを宣言する必要があります。 例:
create table sls_stream(
__timestamp__ bigint HEADER,
__receive_time__ bigint HEADER
b int,
c varchar
) with (
type ='sls',
endPoint ='<your endpoint>',
accessId ='<your access key id>',
accessKey ='<your access key>',
startTime = '2017-07-05 00:00:00',
project ='ali-cloud-streamtest',
logStore ='stream-test',
consumerGroup ='consumerGroupTest1'
);
WITH parameters
パラメーター | 必須項目 | 説明 |
---|---|---|
endPoint | はい | Log Service のエンドポイントです。 |
accessId | はい | Log Service へのアクセスに使用される AccessKey ID です。 |
accessKey | はい | Log Service へのアクセスに使用されるAccessKey シークレットです。 |
project | はい | Log Service のプロジェクトの名前です。 |
logStore | はい | Log Service の Logstore 名です。 |
consumerGroup | いいえ | 消費者グループの名前です。 |
startTime | いいえ | Realtime Compute がログの消費を開始する時刻です。 |
heartBeatIntervalMills | いいえ | 消費クライアントのハートビート間隔です。 単位: 秒です。 デフォルト値: 10 です。 |
maxRetryTimes | いいえ | データの読み取りを試行する最大回数です。 デフォルト値: 5 です。 |
batchGetSize | いいえ | 一度に読み取られるロググループの数です。 デフォルト値: 10 です。 Flink バージョンが 1.4.2 以降の場合は、デフォルト値は 100 で、最大値は 1000 です。 |
columnErrorDebug | いいえ | デバッグを有効にするかどうかを指定します。 デバッグが有効になっている場合は、解析に失敗したログが表示されます。 デフォルト値: false です。 |
タイプマッピング
Log Service のすべてのログフィールドは文字列型です。 次の表は、Log Service フィールドのタイプと Realtime Compute フィールドのタイプの間のマッピングを示しています。
DDL ステートメントでマッピングを宣言することを強く推奨します。
他のタイプが使用されている場合は、Realtime Compute は自動的にタイプの変換も試みます。 たとえば、値が 1000 および 2018-01-12 12:00:00
のフィールドは、それぞれ bigint および timestamp タイプのフィールドとして定義することもできます。
Log Service のフィールドタイプ | Realtime Compute のフィールドタイプ |
---|---|
STRING | VARCHAR |
注
- 2.2.0 より前の Blink バージョンは、シャードスケーリングをサポートしていません。 ジョブが Logstore からデータを読み取っているときにシャードを分割またはマージすると、ジョブが繰り返し失敗して回復できない場合があります。 この場合は、ジョブを再起動して通常に戻す必要があります。
- すべての Blink バージョンでは、ログが消費されている Logstore を削除または再作成することはできません。
- Blink バージョン 1.6.0 以前の場合は、多数のシャードを含む Logstore からログを消費するように消費者グループを指定すると、読み取りパフォーマンスが影響を受ける可能性があります。
- 現在、Log Service はマップタイプのデータをサポートしていません。
- 存在しないフィールドは null に設定されます。
- ソーステーブルのフィールドと同じ順序でフィールドを定義することを推奨します。 順不同フィールドもサポートされています。
- batchGetSize パラメーターは、一度に読み取られるロググループの数を指定します。 各ログのサイズと batchGetSize パラメーターの値がどちらも大きい場合は、ガベージコレクション (GC) が頻繁に生じる可能性があります。
よくある質問
- 新しいデータがシャードに書き込まれない場合は、ジョブの全体的なレイテンシが増加するか、一部のウィンドウに集約されたジョブに出力がありません。 この場合は、同時実行ジョブの数を調整して、データの読み取りと書き込みが可能なシャードの数と同じになるようにする必要があります。
- 同時実行ジョブの数をシャードの数と同じに設定することを推奨します。 一貫性がない場合は、ジョブが 2 つのシャードから非常に異なる速度で履歴データを読み取るときに、データが除外される可能性があります。
__tag__:__hostname__
および__tag__:__path__
のようなタグのフィールドを抽出するには、__ tag__: プレフィックスを削除し、属性フィールドを抽出する方法に従います。注 このタイプのデータは、デバッグ中に抽出することができません。 ログにデータを表示するには、ローカルのデバッグ方法と印刷方法を使用することを推奨します。