Realtime Compute for Apache Flink は、Log Service (SLS) のソーステーブルを作成することで、SLS から直接データを消費できます。このトピックでは、SLS ソーステーブルの作成方法とメタデータフィールドの抽出方法について説明します。
背景情報
次の表は、Flink による Log Service (SLS) データの消費に関するサポート状況を説明しています。
|
カテゴリ |
詳細 |
|
サポートタイプ |
ソーステーブルと結果テーブル。 |
|
実行モード |
ストリーミングモードのみがサポートされています。 |
|
特定の監視メトリクス |
該当なし。 |
|
データフォーマット |
なし。 |
|
API タイプ |
SQL。 |
|
結果テーブル内のデータの更新または削除 |
結果テーブル内のデータは更新または削除できません。挿入のみがサポートされています。 |
Flink を使用したログの消費を開始するには、「Flink SQL ジョブのクイックスタート」をご参照ください。
前提条件
-
RAM ユーザーまたは RAM ロールを使用する場合、Flink コンソールを使用するために必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。
-
Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
-
プロジェクトと Logstore が作成されていること。詳細については、「プロジェクトと Logstore の作成」をご参照ください。
制限事項
-
Ververica Runtime (VVR) 11.1 以降のバージョンのみが、データインジェスト YAML の同期データソースとして SLS をサポートします。
-
SLS コネクタは at-least-once セマンティクスを保証します。
-
ソースの同時実行数をシャード数より高く設定しないでください。リソースの無駄になります。VVR 8.0.5 以前のバージョンでは、シャード数が変更されると、自動フェイルオーバーが機能しなくなり、一部のシャードが消費されないままになる可能性があります。
SLS のソーステーブルと結果テーブルの作成
Flink を使用して Log Service (SLS) からデータを消費するには、完全な SQL ジョブが必要です。完全な SQL ジョブは、ソーステーブル、結果テーブル、および処理後にデータをソースからシンクに移動するための INSERT INTO 文で構成されます。
Flink SQL ジョブの開発方法については、「SQL ジョブ開発ガイド」をご参照ください。
Realtime Compute for Apache Flink は、Log Service (SLS) からのリアルタイムデータをストリーミング入力として使用できます。たとえば、次のログ内容を考えます。
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200
例
次の例は、Log Service (SLS) からデータを消費する SQL ジョブを示しています。
SQL ステートメント内のテーブル名、列名、またはキーワードが予約語と競合する場合は、バックティック (`) で囲んでください。
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
WITH パラメーター
-
共通パラメーター
パラメーター
説明
データの型
必須
デフォルト値
注意事項
connector
テーブルタイプ。
String
はい
なし
固定値:sls。
endPoint
エンドポイントアドレス。
String
はい
なし
SLS の内部エンドポイントを使用してください。詳細については、「サービスエンドポイント」をご参照ください。
説明-
Realtime Compute for Apache Flink は、デフォルトではパブリックネットワークアクセスをサポートしていません。ただし、Alibaba Cloud NAT Gateway を使用すると、VPC ネットワークとパブリックネットワーク間の通信が可能になります。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。
-
パブリックネットワーク経由での SLS へのアクセスは避けてください。必要な場合は、HTTPS を使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。
project
SLS プロジェクト名。
String
はい
なし
なし。
logStore
SLS の Logstore または Metricstore 名。
String
はい
なし
Logstore と Metricstore は同じ消費方法を使用します。
accessId
ご利用の Alibaba Cloud アカウントの AccessKey ID。
String
はい
なし
詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。
重要AccessKey 情報の漏洩を避けるため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。
accessKey
ご利用の Alibaba Cloud アカウントの AccessKey Secret。
String
はい
なし
-
-
ソース固有のパラメーター
パラメーター
説明
データの型
必須
デフォルト値
注意事項
enableNewSource
新しい FLIP-27 ベースのソースを使用するかどうか。
ブール値
いいえ
false
新しいソースは、シャードの変更に自動的に適応し、すべてのソースタスクにシャードを均等に分散します。
重要-
このパラメーターは VVR 8.0.9 以降でサポートされています。VVR 11.1 から、デフォルト値は true です。
-
このパラメーターを変更すると、ジョブは以前の状態から再開できません。過去のオフセットから消費を再開するには、まず `consumerGroup` パラメーターを指定してジョブを開始し、SLS コンシューマーグループに消費の進行状況を記録します。次に、`consumeFromCheckpoint` を true に設定し、状態なしでジョブを再起動します。
-
SLS に読み取り専用シャードが存在する場合、一部の Flink タスクが読み取り専用シャードを消費した後に未処理のシャードを要求し続けることがあり、シャードの不均等な分散を引き起こし、消費効率を低下させる可能性があります。これを軽減するには、同時実行数を調整するか、タスクのスケジューリングを最適化するか、小さなシャードをマージします。
shardDiscoveryIntervalMs
シャード変更検出の間隔 (ミリ秒単位)。
Long
いいえ
60000
動的検出を無効にするには、負の値を設定します。
説明-
この値は 60000 ms (1 分) 以上である必要があります。
-
このパラメーターは `enableNewSource` が true の場合にのみ有効です。
-
このパラメーターは VVR 8.0.9 以降でサポートされています。
startupMode
ソーステーブルの起動モード。
String
いいえ
timestamp
-
timestamp(デフォルト):指定された時刻からログを消費します。 -
latest:最大オフセットからログを消費します。 -
earliest:最小オフセットからログを消費します。 -
consumer_group:コンシューマーグループに記録されたオフセットからログを消費します。シャードにオフセットが記録されていない場合、消費は最小オフセットから開始されます。
重要-
VVR 11.1 より前のバージョンでは `consumer_group` はサポートされていません。
consumeFromCheckpointをtrueに設定してください。消費は指定されたコンシューマーグループに記録されたオフセットから開始され、`startupMode` は効果がありません。
startTime
ログ消費の開始時刻。
String
いいえ
現在の時刻
フォーマット:
yyyy-MM-dd hh:mm:ss。「
startupMode」を「timestamp」に設定した場合にのみ有効になります。説明`startTime` と `stopTime` は、SLS の `__timestamp__` 属性ではなく、`__receive_time__` 属性に基づいています。
stopTime
ログ消費の終了時刻。
String
いいえ
なし
フォーマット:
yyyy-MM-dd hh:mm:ss。説明-
このパラメーターは、過去のログを消費する場合にのみ使用してください。過去の時刻に設定してください。未来の時刻に設定すると、新しいログがないために消費が早期に終了し、データストリームが静かに中断される可能性があります。
-
ログの消費が完了したときに Flink プログラムを終了するには、`exitAfterFinish=true` も設定してください。
consumerGroup
コンシューマーグループ名。
String
いいえ
なし
コンシューマーグループは消費の進行状況を追跡します。任意のカスタム名を使用できます。
説明同じコンシューマーグループを使用して複数のジョブ間で消費を調整することはできません。各 Flink ジョブは一意のコンシューマーグループを使用する必要があります。複数のジョブが同じコンシューマーグループを共有すると、すべてのデータを消費してしまいます。これは、Flink がパーティションの割り当てに SLS コンシューマーグループを使用しないためです。各コンシューマーはすべてのメッセージを独立して処理します。
consumeFromCheckpoint
指定されたコンシューマーグループのチェックポイントから消費を開始するかどうか。
String
いいえ
false
-
true:Flink プログラムは、指定されたコンシューマーグループのチェックポイントから消費を開始します。チェックポイントが存在しない場合、消費は `startTime` の値から開始されます。 -
false(デフォルト):指定されたコンシューマーグループに保存されているチェックポイントからログの消費を開始しません。
重要VVR 11.1 以降では、このパラメーターはサポートされていません。VVR 11.1 以降では、
startupModeをconsumer_groupに設定してください。maxRetries
SLS の読み取りに失敗した後の再試行回数。
String
いいえ
3
なし。
batchGetSize
リクエストごとに読み取るロググループの数。
String
いいえ
100
batchGetSizeの値は 1000 を超えてはなりません。超えた場合、エラーが発生します。exitAfterFinish
データ消費が完了した後に Flink プログラムが終了するかどうか。
String
いいえ
false
-
true:データ消費が完了した後に Flink プログラムが終了します。 -
false(デフォルト):データ消費が完了した後も Flink プログラムは実行を継続します。
query
重要VVR 11.3 以降非推奨。下位互換性のために以降のバージョンでもサポートされています。
SLS 消費の前処理ステートメント。
String
いいえ
なし
このパラメーターを使用して、消費前に SLS データをフィルター処理し、コストを削減し、処理速度を向上させます。
たとえば、
'query' = '*| where request_method = ''GET'''は、Flink が読み取る前に `request_method` フィールドが GET に等しいログに一致します。説明クエリには SPL 構文を使用してください。詳細については、「SPL 構文」をご参照ください。
重要-
このパラメーターは VVR 8.0.1 以降でサポートされています。
-
この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。
processor
SLS コンシューマープロセッサ。`query` と両方が存在する場合、こちらが優先されます。
String
いいえ
なし
このパラメーターを使用して、消費前に SLS データをフィルター処理し、コストを削減し、処理速度を向上させます。`query` の代わりに `processor` を使用することを推奨します。
たとえば、
'processor' = 'test-filter-processor'は、Flink がデータを読み取る前に SLS コンシューマープロセッサを適用します。説明プロセッサには SPL 構文を使用してください。詳細については、「SPL 構文」をご参照ください。コンシューマープロセッサの作成または更新方法については、「コンシューマープロセッサの管理」をご参照ください。
重要このパラメーターは VVR 11.3 以降でサポートされています。
この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。
-
-
結果テーブル専用
パラメーター
説明
データの型
必須
デフォルト値
注意事項
topicField
値が `__topic__` メタデータフィールドをオーバーライドするフィールド。ログのトピックを表します。
String
いいえ
なし
このフィールドはテーブルに存在する必要があります。
timeField
値が `__timestamp__` メタデータフィールドをオーバーライドするフィールド。ログの書き込み時刻を表します。
String
いいえ
現在の時刻
このフィールドはテーブルに存在し、INT データ型である必要があります。指定しない場合、現在の時刻が使用されます。
sourceField
値が `__source__` メタデータフィールドをオーバーライドするフィールド。ログのソース (例:ログを生成したマシンの IP アドレス) を表します。
String
いいえ
なし
このフィールドはテーブルに存在する必要があります。
partitionField
シャードルーティングに使用されるフィールド。データはこのフィールドのハッシュ値に基づいてシャードに書き込まれます。同じハッシュ値を持つレコードは同じシャードに送られます。
String
いいえ
なし
指定しない場合、各レコードは利用可能なシャードにランダムに書き込まれます。
buckets
`partitionField` が指定されたときにデータを再割り当てするためのバケット数。
String
いいえ
64
有効な値:1 から 256 までの 2 のべき乗の整数。バケット数はシャード数以上である必要があります。そうでない場合、一部のシャードはデータを受信しません。
flushIntervalMs
データ書き込みをトリガーする間隔。
String
いいえ
2000
単位:ミリ秒。
writeNullProperties
null 値を空の文字列として SLS に書き込むかどうか。
ブール値
いいえ
true
-
true(デフォルト):null 値を空の文字列として書き込みます。 -
false:書き込み中に null 値を持つフィールドをスキップします。
説明このパラメーターは VVR 8.0.6 以降でサポートされています。
-
メタデータフィールドの抽出
ログフィールドに加えて、次の 4 つのメタデータフィールドやその他のカスタムフィールドを抽出できます。
|
パラメーター |
タイプ |
説明 |
|
__source__ |
STRING METADATA VIRTUAL |
メッセージソース。 |
|
__topic__ |
STRING METADATA VIRTUAL |
メッセージトピック。 |
|
__timestamp__ |
BIGINT METADATA VIRTUAL |
ログ時間。 |
|
__tag__ |
MAP<VARCHAR, VARCHAR> METADATA VIRTUAL |
メッセージタグ。
|
メタデータフィールドを抽出するには、次の例に示すように、`CREATE TABLE` 文で `METADATA` キーワードを使用して宣言します。
create table sls_stream(
`__timestamp__` bigint METADATA,
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
関連ドキュメント
Flink DataStream API を使用したデータ消費の方法については、「DataStream API」をご参照ください。