SPL を使用すると、データが Flink に到達する前に Simple Log Service (SLS) サーバー側で行フィルタリングと列のプルーニングを実行することで、ネットワークオーバーヘッドとコンピューティングコストを削減できます。
背景情報
Realtime Compute for Apache Flink で SLS をソーステーブルとして使用する場合、コネクタは指定した開始時刻以降の Logstore レコードをすべて消費します。これにより、次の 2 つの問題が発生します。
-
コネクタがソースから過剰な行または列を取得し、ネットワークオーバーヘッドが増加します。
-
Flink で不要なデータをフィルタリングするためにコンピューティングリソースが無駄になります。
SPL は、SLS コネクタ用のフィルタープッシュダウンとプロジェクションプッシュダウンによってこれらの問題に対処します。query パラメータを設定してフィルター条件と射影フィールドをサーバー側にプッシュし、データセット全体の転送と計算を回避します。
仕組み
-
SPL ステートメントなし:Flink は計算のために SLS からすべての行と列を取得します。
-
SPL ステートメントあり:SPL ステートメントに行フィルタリングまたは列のプルーニングが含まれている場合、Flink は一致するサブセットのみを取得します。
前提条件
-
SLS を有効化し、プロジェクトと Logstore を作成済みであること。
-
この例では、疑似的なレイヤー 7 SLB ログを含む Logstore を使用します。データには 10 個を超えるフィールドが含まれ、継続的に生成されます。ログエントリの例:
{ "__source__": "127.0.0.1", "__tag__:__receive_time__": "1706531737", "__time__": "1706531727", "__topic__": "slb_layer7", "body_bytes_sent": "3577", "client_ip": "114.137.XXX.XXX", "host": "www.pi.mock.com", "http_host": "www.cwj.mock.com", "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0", "request_length": "1662", "request_method": "GET", "request_time": "31", "request_uri": "/request/path-0/file-3", "scheme": "https", "slbid": "slb-02", "status": "200", "upstream_addr": "42.63.XXX.XXX", "upstream_response_time": "32", "upstream_status": "200", "vip_addr": "223.18.XX.XXX" } -
slbidフィールドには 3 つの異なる値があります。15 分間のクエリでは、slb-01とslb-02の件数が同程度であることが分かります。クエリエディターで
* | select slbid, count(1) cnt group by slbidを実行します。結果:slb-02は 2,001 件、slb-01は 1,986 件、lb-uf6qfcvsrouodhvw39oogは 4,144 件です。
操作手順
行フィルタリング:query パラメータを使用してフィルター条件を SLS サーバーにプッシュし、データセット全体の転送を防ぎます。
列のプルーニング:query パラメータを使用して射影フィールドを SLS サーバーにプッシュし、指定した列のみを返します。
行フィルタリング
手順 1:SQL ジョブの作成
-
Realtime Compute for Apache Flink コンソールにログインし、対象のワークスペースをクリックします。
-
左側のナビゲーションペインで、 を選択します。
-
作成 をクリックします。[新規下書き] ダイアログボックスで、 を選択し、次へ をクリックします。
-
次の一時テーブル SQL をエディターにコピーします。
CREATE TEMPORARY TABLE sls_input( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'logstore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01''' );パラメータ:
パラメータ
説明
例
connector
コネクタのタイプです。詳細については、「サポートされているコネクタ」をご参照ください。
sls
endpoint
SLS の内部エンドポイントです。詳細については、「エンドポイント」をご参照ください。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
アクセスキー ID です。詳細については、「アクセスキーを作成」をご参照ください。
LTAI**************
accessKey
アクセスキーシークレットです。詳細については、「アクセスキーを作成」をご参照ください。
yourAccessKeySecret
starttime
ログ消費の開始時刻です。
2025-02-19 00:00:00
project
SLS プロジェクト名です。
test-project
logstore
SLS Logstore 名です。
clb-access-log
query
SPL ステートメントです。Flink SQL では、文字列リテラルを単一引用符 2 つ (
'') でエスケープします。* | where slbid = ''slb-01''
-
SQL ステートメントを選択し、右クリックして 実行 をクリックし、SLS に接続します。
CREATE TEMPORARY TABLE sls_input( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' = 'cn-haxxx', 'accessId' = 'xxx', 'accessKey' = 'xxx', 'starttime' = '202xxx', 'project' = 'test-pxxx', 'logstore' = 'clb-axxx', 'query' = '*|whexxx' );
手順 2:クエリの実行と結果の確認
-
slbidフィールドに対して次の集計クエリを実行します。SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid; -
右上の [Debug] をクリックします。ダイアログボックスで、[Session Cluster] ドロップダウンリストから [Create new session cluster] を選択し、次のように設定します。
[Name] を
demo-test、[Deployment Target] をdefault-queue、[Status] を RUNNING、[Engine Version] をvvr-8.0.11-flink-1.17に設定し、[Create Session Cluster] をクリックします。 -
デバッグダイアログボックスで、作成したセッションクラスターを選択し、OK をクリックします。
注:SLS ソーステーブルを使用してデバッグすると、コンシューマーグループのオフセットが進みます。デプロイ済みのジョブは、この新しいオフセットから再開します。
-
結果では slbid が常に
slb-01となることから、sls_inputにフィルター条件に一致する行のみが含まれていることが確認できます。slb_cnt の値は
185です。
列のプルーニング
手順 1:SQL ジョブの作成
-
Realtime Compute for Apache Flink コンソールにログインし、対象のワークスペースをクリックします。
-
左側のナビゲーションペインで、 を選択します。
-
作成 をクリックします。[新規下書き] ダイアログボックスで、 を選択し、次へ をクリックします。
-
次の一時テーブル SQL をエディターにコピーします。行フィルタリングと比べて、
queryパラメータに射影ステートメントが追加されています。コマンドは|(Unix のパイプのようなもの) で連結され、射影されたフィールドのみが SLS サーバーから取得されます。CREATE TEMPORARY TABLE sls_input_project( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'logstore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, __timestamp__, "__tag__:__receive_time__"' );パラメータ:
パラメータ
説明
例
connector
コネクタのタイプです。詳細については、「サポートされているコネクタ」をご参照ください。
sls
endpoint
SLS の内部エンドポイントです。詳細については、「エンドポイント」をご参照ください。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
アクセスキー ID です。詳細については、「アクセスキーを作成」をご参照ください。
LTAI**************
accessKey
アクセスキーシークレットです。詳細については、「アクセスキーを作成」をご参照ください。
yourAccessKeySecret
starttime
ログ消費の開始時刻です。
2025-02-19 00:00:00
project
SLS プロジェクト名です。
test-project
logstore
SLS Logstore 名です。
clb-access-log
query
SPL ステートメントです。Flink SQL では、文字列リテラルを単一引用符 2 つ (
'') でエスケープします。* | where slbid = ''slb-01''
-
SQL ステートメントを選択し、右クリックして 実行 をクリックし、SLS に接続します。
手順 2:クエリの実行と結果の確認
-
slbidフィールドに対して次の集計クエリを実行します。SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid; -
右上の [Debug] をクリックします。ダイアログボックスで、[Session Cluster] ドロップダウンリストから [Create new session cluster] を選択し、次のように設定します。
[Name] を
demo-test、[Deployment Target] をdefault-queue、[Status] を RUNNING、[Engine Version] をvvr-8.0.11-flink-1.17に設定し、[Create Session Cluster] をクリックします。 -
デバッグダイアログボックスで、作成したセッションクラスターを選択し、OK をクリックします。
注:SLS ソーステーブルを使用してデバッグすると、コンシューマーグループのオフセットが進みます。デプロイ済みのジョブは、この新しいオフセットから再開します。
-
結果は、行フィルタリングのユースケースと同様です。
説明行フィルタリングとは異なり、列のプルーニングでは指定したフィールドのみが返されるため、ネットワークトラフィックをさらに削減できます。
slb_cnt の値は
185です。