このドキュメントでは、Realtime Compute で Log Service ソーステーブルを作成する方法について説明します。 また、テーブル作成プロセスに含まれる属性フィールド、WITH パラメーター、およびフィールドタイプマッピングについても説明します。

Log Service の概要

Log Service は、Alibaba Group が多くのビッグデータシナリオで開発し、テストしたオールインワンのリアルタイムデータロギングサービスです。 Log Service では、追加の開発作業なしで、データの取り込み、消費、配信、照会、分析などのタスクをすばやく完了できます。 これにより、O&M と運用効率が向上し、データテクノロジー時代に大量のログを処理する機能を構築できます。 Log Service は、ストリーミングデータストレージシステムです。 Realtime Compute は、Log Service テーブルをソーステーブルとして作成することをサポートしています。 Log Service では、各データレコードは JSON に類似した形式です。 例は次のとおりです。

{
    "a": 1000,
    "b": 1234,
    "c": "li"
}
			

Realtime Compute は、次の DDL を定義する必要があります (sls は Log Service を示します )。

create table sls_stream(
  a int,
  b int,
  c VARCHAR
) with (
  type ='sls',
  endPoint ='yourEndpoint',
  accessId ='yourAccessId',
  accessKey ='yourAccessKey',
  startTime = 'yourStartTime',
  project ='yourProjectName',
  logStore ='yourLogStoreName',
  consumerGroup ='yourConsumerGroupName'
);
			

属性フィールド

現在、Flink SQL は、 Log Service の次の 3 つの属性フィールドの取得と、他のカスタムフィールドの書き込みをデフォルトでサポートしています。

フィールド 説明
__source__ メッセージのソースです。
__topic__ メッセージのトピックです。
__timestamp__ ログが生成された時刻です。
属性フィールドに関する注意

属性フィールドを取得するには、通常のロジックに従ってフィールドを最初に宣言する必要があります。 次に、キーワード HEADER を型宣言の最後に追加します。 例:

  • テストデータ
         __topic__:  ens_altar_flow  
            result:  {"MsgID":"ems0a","Version":"0.0.1"}
    					
  • テスト文
    CREATE TABLE sls_log (
      __topic__  varchar HEADER,
      result     varchar  
    )
    WITH
    (
      type ='sls',
    );
    
    CREATE TABLE sls_out (
      name     varchar,
      MsgID    varchar,
      Version  varchar 
    )
    WITH
    (
      type ='RDS'
    );
    
    INSERT INTO sls_out
    SELECT 
    __topic__,
    JSON_VALUE(result,'$. MsgID'),
    JSON_VALUE(result,'$. Version')
    FROM
    sls_log
    					
  • テスト結果
    name(VARCAHR) MsgID(VARCAHR) Version(VARCAHR)
    ens_altar_flow ems0a 0.0.1

WITH パラメーター

名前 説明 備考
endPoint 消費エンドポイント情報。 サービスエンドポイント
accessId Log Service の AccessKey ID。 なし
accessKey Log Service の AccessKey シークレット。 なし
project アクセスする Log Service プロジェクト。 なし
logStore Log Service プロジェクトの LogStore。 なし
consumerGroup コンシューマグループの名前。 コンシューマグループ名をカスタマイズできます (固定形式なし)。
startTime ログが消費される開始時刻。 なし
heartBeatIntervalMills 消費クライアントのハートビート間隔。 オプション。 デフォルト値:10 秒。
maxRetryTimes リードリトライの最大数。 オプション。 デフォルト値 : 5。
batchGetSize ロググループで一度に読み取られるログアイテムの数。 オプション。 デフォルト値 : 10 。
lengthCheck 1 行のフィールド数をチェックするためのポリシー。 オプション。 デフォルト値 : NONE。 有効な値:NONE、SKIP、EXCEPTION、および PAD。
  • SKIP:レコード内のフィールドの数が指定した数と一致しない場合、データレコードをスキップします。
  • EXCEPTION:レコード内のフィールドの数が指定された数と一致しない場合、例外をスローします。
  • PAD:フィールドを順番にパディングします。 フィールドが存在しない場合は null パディングします。
columnErrorDebug デバッグを有効にするかどうかを示します。 このパラメーターが true に設定されている場合、例外の解析に関するログが表示されます。 オプション。 デフォルト値 : false 。
  • Log Service は、MAP タイプのデータをサポートしていません。
  • フィールドは順序付けできません。 ただし、参照先テーブルで定義されているものと同じフィールド順序を使用することを推奨します。
  • 入力データソースが JSON 形式の場合は、区切り文字を定義し、ビルトイン関数を使用して JSON_VALUE を分析します。 それ以外の場合、解析は失敗し、次のエラー情報が生成されます。
    2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
    						
  • batchGetSize 値は 1000 を超えてはなりません。 超えた場合、エラーが返されます。
  • batchGetSize パラメーターは、ロググループで一度に読み取られるログアイテムの数を指定します。 単一のログアイテムのサイズと batchGetSize 値の両方が大きすぎる場合、GC が頻繁にトリガーされる可能性があります。 これを回避するには、パラメーターをより小さい値に設定する必要があります。

フィールドタイプマッピング

次の表に、Log Service フィールドタイプと Realtime Compute フィールドタイプ間のマッピングを示します。 DDL 宣言でマッピングを使用することを推奨します。

Log Service フィールドタイプ Realtime Compute フィールドタイプ
STRING VARCHAR