Alibaba Cloud Realtime Compute (Flink) は、Log Service のソーステーブルを作成することにより、Log Service のログデータを直接消費することができます。

Log Service はストリーミングデータを保存します。 ストリーミングデータは、Realtime Compute の入力データとして使用することができます。 Log Service では、各ログにキー/値のペアであるフィールドのセットが含まれています。 ログに次の内容があると想定します。
__source__: 11.85.123.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
a: 1234
b: 0
c: hello
次のデータ定義言語 (DDL) ステートメントを使用して、Realtime Compute でテーブルを作成することができます。
create table sls_stream(
  a int,
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);
Realtime Compute は、ログフィールドに加えて、__receive_time__ などの 3 つの属性フィールドとカスタムフィールドをタグで抽出することもできます。 次の表では、予約フィールドについて説明します。
表 1. 属性フィールド
フィールド 説明
__source__ ログのソースです。
__topic__ ログのトピックです。
__timestamp__ ログが生成された時刻です。
属性フィールドを抽出するには、HEADER を追加してフィールドを宣言する必要があります。 例:
create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);
WITH parameters
パラメーター 必須項目 説明
endPoint はい Log Service のエンドポイントです。
accessId はい Log Service へのアクセスに使用される AccessKey ID です。
accessKey はい Log Service へのアクセスに使用されるAccessKey シークレットです。
project はい Log Service のプロジェクトの名前です。
logStore はい Log Service の Logstore 名です。
consumerGroup いいえ 消費者グループの名前です。
startTime いいえ Realtime Compute がログの消費を開始する時刻です。
heartBeatIntervalMills いいえ 消費クライアントのハートビート間隔です。 単位: 秒です。 デフォルト値: 10 です。
maxRetryTimes いいえ データの読み取りを試行する最大回数です。 デフォルト値: 5 です。
batchGetSize いいえ 一度に読み取られるロググループの数です。 デフォルト値: 10 です。 Flink バージョンが 1.4.2 以降の場合は、デフォルト値は 100 で、最大値は 1000 です。
columnErrorDebug いいえ デバッグを有効にするかどうかを指定します。 デバッグが有効になっている場合は、解析に失敗したログが表示されます。 デフォルト値: false です。

タイプマッピング

Log Service のすべてのログフィールドは文字列型です。 次の表は、Log Service フィールドのタイプと Realtime Compute フィールドのタイプの間のマッピングを示しています。 DDL ステートメントでマッピングを宣言することを強く推奨します。
Log Service のフィールドタイプ Realtime Compute のフィールドタイプ
STRING VARCHAR
他のタイプが使用されている場合は、Realtime Compute は自動的にタイプの変換も試みます。 たとえば、値が 1000 および 2018-01-12 12:00:00 のフィールドは、それぞれ bigint および timestamp タイプのフィールドとして定義することもできます。
  • 2.2.0 より前の Blink バージョンは、シャードスケーリングをサポートしていません。 ジョブが Logstore からデータを読み取っているときにシャードを分割またはマージすると、ジョブが繰り返し失敗して回復できない場合があります。 この場合は、ジョブを再起動して通常に戻す必要があります。
  • すべての Blink バージョンでは、ログが消費されている Logstore を削除または再作成することはできません。
  • Blink バージョン 1.6.0 以前の場合は、多数のシャードを含む Logstore からログを消費するように消費者グループを指定すると、読み取りパフォーマンスが影響を受ける可能性があります。
  • 現在、Log Service はマップタイプのデータをサポートしていません。
  • 存在しないフィールドは null に設定されます。
  • ソーステーブルのフィールドと同じ順序でフィールドを定義することを推奨します。 順不同フィールドもサポートされています。
  • batchGetSize パラメーターは、一度に読み取られるロググループの数を指定します。 各ログのサイズと batchGetSize パラメーターの値がどちらも大きい場合は、ガベージコレクション (GC) が頻繁に生じる可能性があります。
よくある質問
  • 新しいデータがシャードに書き込まれない場合は、ジョブの全体的なレイテンシが増加するか、一部のウィンドウに集約されたジョブに出力がありません。 この場合は、同時実行ジョブの数を調整して、データの読み取りと書き込みが可能なシャードの数と同じになるようにする必要があります。
  • 同時実行ジョブの数をシャードの数と同じに設定することを推奨します。 一貫性がない場合は、ジョブが 2 つのシャードから非常に異なる速度で履歴データを読み取るときに、データが除外される可能性があります。
  • __tag__:__hostname__ および __tag__:__path__ のようなタグのフィールドを抽出するには、__ tag__: プレフィックスを削除し、属性フィールドを抽出する方法に従います。
    このタイプのデータは、デバッグ中に抽出することができません。 ログにデータを表示するには、ローカルのデバッグ方法と印刷方法を使用することを推奨します。