このドキュメントでは、Realtime Compute で Log Service ソーステーブルを作成する方法について説明します。 また、テーブル作成プロセスに含まれる属性フィールド、WITH パラメーター、およびフィールドタイプマッピングについても説明します。
Log Service の概要
Log Service は、Alibaba Group が多くのビッグデータシナリオで開発し、テストしたオールインワンのリアルタイムデータロギングサービスです。 Log Service では、追加の開発作業なしで、データの取り込み、消費、配信、照会、分析などのタスクをすばやく完了できます。 これにより、O&M と運用効率が向上し、データテクノロジー時代に大量のログを処理する機能を構築できます。 Log Service は、ストリーミングデータストレージシステムです。 Realtime Compute は、Log Service テーブルをソーステーブルとして作成することをサポートしています。 Log Service では、各データレコードは JSON に類似した形式です。 例は次のとおりです。
{
"a": 1000,
"b": 1234,
"c": "li"
}
Realtime Compute は、次の DDL を定義する必要があります (sls は Log Service を示します )。
create table sls_stream(
a int,
b int,
c VARCHAR
) with (
type ='sls',
endPoint ='yourEndpoint',
accessId ='yourAccessId',
accessKey ='yourAccessKey',
startTime = 'yourStartTime',
project ='yourProjectName',
logStore ='yourLogStoreName',
consumerGroup ='yourConsumerGroupName'
);
属性フィールド
現在、Flink SQL は、 Log Service の次の 3 つの属性フィールドの取得と、他のカスタムフィールドの書き込みをデフォルトでサポートしています。
フィールド | 説明 |
---|---|
__source__ |
メッセージのソースです。 |
__topic__ |
メッセージのトピックです。 |
__timestamp__ |
ログが生成された時刻です。 |
属性フィールドを取得するには、通常のロジックに従ってフィールドを最初に宣言する必要があります。 次に、キーワード HEADER
を型宣言の最後に追加します。 例:
- テストデータ
__topic__: ens_altar_flow result: {"MsgID":"ems0a","Version":"0.0.1"}
- テスト文
CREATE TABLE sls_log ( __topic__ varchar HEADER, result varchar ) WITH ( type ='sls', ); CREATE TABLE sls_out ( name varchar, MsgID varchar, Version varchar ) WITH ( type ='RDS' ); INSERT INTO sls_out SELECT __topic__, JSON_VALUE(result,'$. MsgID'), JSON_VALUE(result,'$. Version') FROM sls_log
- テスト結果
name(VARCAHR) MsgID(VARCAHR) Version(VARCAHR) ens_altar_flow ems0a 0.0.1
WITH パラメーター
名前 | 説明 | 備考 |
---|---|---|
endPoint | 消費エンドポイント情報。 | サービスエンドポイント |
accessId | Log Service の AccessKey ID。 | なし |
accessKey | Log Service の AccessKey シークレット。 | なし |
project | アクセスする Log Service プロジェクト。 | なし |
logStore | Log Service プロジェクトの LogStore。 | なし |
consumerGroup | コンシューマグループの名前。 | コンシューマグループ名をカスタマイズできます (固定形式なし)。 |
startTime | ログが消費される開始時刻。 | なし |
heartBeatIntervalMills | 消費クライアントのハートビート間隔。 | オプション。 デフォルト値:10 秒。 |
maxRetryTimes | リードリトライの最大数。 | オプション。 デフォルト値 : 5。 |
batchGetSize | ロググループで一度に読み取られるログアイテムの数。 | オプション。 デフォルト値 : 10 。 |
lengthCheck | 1 行のフィールド数をチェックするためのポリシー。 | オプション。 デフォルト値 : NONE。 有効な値:NONE、SKIP、EXCEPTION、および PAD。
|
columnErrorDebug | デバッグを有効にするかどうかを示します。 このパラメーターが true に設定されている場合、例外の解析に関するログが表示されます。 | オプション。 デフォルト値 : false 。 |
- Log Service は、MAP タイプのデータをサポートしていません。
- フィールドは順序付けできません。 ただし、参照先テーブルで定義されているものと同じフィールド順序を使用することを推奨します。
- 入力データソースが JSON 形式の場合は、区切り文字を定義し、ビルトイン関数を使用して JSON_VALUE を分析します。 それ以外の場合、解析は失敗し、次のエラー情報が生成されます。
2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
- batchGetSize 値は 1000 を超えてはなりません。 超えた場合、エラーが返されます。
- batchGetSize パラメーターは、ロググループで一度に読み取られるログアイテムの数を指定します。 単一のログアイテムのサイズと batchGetSize 値の両方が大きすぎる場合、GC が頻繁にトリガーされる可能性があります。 これを回避するには、パラメーターをより小さい値に設定する必要があります。
フィールドタイプマッピング
次の表に、Log Service フィールドタイプと Realtime Compute フィールドタイプ間のマッピングを示します。 DDL 宣言でマッピングを使用することを推奨します。
Log Service フィールドタイプ | Realtime Compute フィールドタイプ |
---|---|
STRING | VARCHAR |