このトピックでは、Alibaba Cloud Flink SQL が Simple Log Service 処理言語 (SPL) 文を使用して行フィルタリングと列プルーニングを実装する方法について説明します。
背景
Realtime Compute for Apache Flink で Simple Log Service をソーステーブルとして構成すると、Simple Log Service のログストア内のデータが自動的に消費され、動的テーブルが構築されます。データ消費の開始時刻を指定できます。開始時刻以降に生成されたデータが消費されます。2 つの問題が発生する可能性があります。
Simple Log Service コネクタが不要なデータ行または列を過剰にプルし、ネットワークオーバーヘッドが発生します。
不要なデータは Realtime Compute for Apache Flink でクレンジングする必要がありますが、データクレンジングはデータ分析の焦点ではなく、追加の計算リソースを消費します。データクレンジングには、データフィルタリングとデータ射影が含まれます。
SPL は、Simple Log Service コネクタにフィルタプッシュダウンおよび射影プッシュダウン機能を提供します。Simple Log Service コネクタのクエリ文を指定してフィルタ条件のプッシュダウンを有効にするか、Simple Log Service コネクタのクエリパラメータを構成して射影フィールドのプッシュダウンを有効にすることができます。これらの設定により、完全なデータ転送または計算が回避され、データ転送と計算の効率が向上します。
ソリューション
SPL 文が指定されていない場合:Realtime Compute for Apache Flink は、すべての行と列を含む Simple Log Service の完全なログをプルします。
SPL 文が指定されている場合:行フィルタリングまたは列プルーニング設定が SPL 文に含まれている場合、Realtime Compute for Apache Flink は、行フィルタリングまたは列プルーニング後に取得された部分的なデータのみを後続の計算のためにプルします。
準備
Simple Log Service がアクティブ化されています。プロジェクトとログストアが作成されています。詳細については、「Simple Log Service のアクティブ化」および「プロジェクトとログストアの作成」をご参照ください。
このトピックでは、Server Load Balancer (SLB) のシミュレートされたレイヤー 7 アクセスログが Simple Log Service のログストアに収集されます。ログには 10 個以上のフィールドが含まれています。ランダムなシミュレートログが継続的に生成されます。ログの例:
{ "__source__": "127.0.0.1", "__tag__:__receive_time__": "1706531737", "__time__": "1706531727", "__topic__": "slb_layer7", "body_bytes_sent": "3577", "client_ip": "114.137.XXX.XXX", "host": "www.pi.mock.com", "http_host": "www.cwj.mock.com", "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0", "request_length": "1662", "request_method": "GET", "request_time": "31", "request_uri": "/request/path-0/file-3", "scheme": "https", "slbid": "slb-02", "status": "200", "upstream_addr": "42.63.XXX.XXX", "upstream_response_time": "32", "upstream_status": "200", "vip_addr": "223.18.XX.XXX" }
ログストアの slbid フィールドには 3 種類の値があります。次の文は、15 分以内に生成されたログの slbid フィールドの値のページビュー (PV) をクエリするために使用されます。結果は、
slb-01
値の PV がslb-02
値の PV と同等であることを示しています。
手順
行フィルタリング:SPL は Simple Log Service コネクタにフィルタプッシュダウン機能を提供します。Simple Log Service コネクタのクエリ文を指定することで、フィルタ条件のプッシュダウンを有効にできます。これにより、完全なデータ転送、完全なデータフィルタリング、および計算が回避されます。
列プルーニング:SPL は Simple Log Service コネクタに射影プッシュダウン機能を提供します。Simple Log Service コネクタのクエリパラメータを構成することで、射影フィールドのプッシュダウンを有効にできます。これにより、完全なデータ転送、完全なデータフィルタリング、および計算が回避されます。
行フィルタリング
手順 1:SQL ジョブを作成する
Realtime Compute for Apache Flink コンソール にログインします。管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、
を選択します。[新規] をクリックします。[新規ドラフト] ダイアログボックスで、
を選択し、[次へ] をクリックします。一時テーブルを作成するために使用される次の SQL ジョブ設定を SQL エディタにコピーします。
CREATE TEMPORARY TABLE sls_input( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'logstore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01''' );
次の表は、SQL ジョブのパラメータについて説明しています。
パラメータ
説明
例
connector
Simple Log Service コネクタ。詳細については、「サポートされているコネクタ」をご参照ください。
sls
endpoint
内部 Simple Log Service エンドポイント。詳細については、「エンドポイント」をご参照ください。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。
yourAccessKeyID
accessKey
AccessKey シークレット。詳細については、「AccessKey ペアの作成」をご参照ください。
yourAccessKeySecret
starttime
クエリの開始時刻。
2025-02-19 00:00:00
project
Simple Log Service プロジェクトの名前。
test-project
logstore
Simple Log Service ログストアの名前。
clb-access-log
query
SPL 文。Realtime Compute for Apache Flink の SQL ジョブの開発では、文字列は単一引用符 ('') を使用してエスケープする必要があることに注意してください。
* | where slbid = ''slb-01''
SQL ジョブ設定を選択して右クリックします。表示されるポップオーバーで、[実行] をクリックして Simple Log Service に接続します。
手順 2:連続クエリと結果
SQL ジョブに次の分析文を入力し、slbid で集計クエリを実行します。
SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;
ページの右上隅にある [デバッグ] アイコンをクリックします。デバッグダイアログボックスで、[セッションクラスタ] ドロップダウンリストから [新しいセッションクラスタを作成] を選択し、次の図に基づいてパラメータを構成します。
デバッグドロップダウンリストで、作成したセッションクラスタを選択し、[OK] をクリックします。
[結果] セクションには、[slbid] フィールドに値
slb-01
が常に表示されます。結果は、SPL 文を指定した後、sls_input には slbid='slb-01' に一致するデータのみが含まれることを示しています。
列プルーニング
手順 1:SQL ジョブを作成する
Realtime Compute for Apache Flink コンソール にログインします。管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、
を選択します。[新規] をクリックします。[新規ドラフト] ダイアログボックスで、
を選択し、[次へ] をクリックします。一時テーブルを作成するために使用される次の SQL ジョブ設定を SQL エディタにコピーします。行フィルタリングとは異なり、行フィルタリング設定に基づいて射影文がクエリパラメータに追加されます。縦棒 (
|
) は命令を分割するために使用されます。前の命令の出力は、次の命令の入力として使用されます。最後の命令の出力は、パイプライン全体の出力を示します。このようにして、特定のフィールドの内容のみが Simple Log Service サーバーからプルされます。CREATE TEMPORARY TABLE sls_input_project( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'logstore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"' );
次の表は、SQL ジョブのパラメータについて説明しています。
パラメータ
説明
例
connector
Simple Log Service コネクタ。詳細については、「サポートされているコネクタ」をご参照ください。
sls
endpoint
内部 Simple Log Service エンドポイント。詳細については、「エンドポイント」をご参照ください。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。
yourAccessKeyID
accessKey
AccessKey シークレット。詳細については、「AccessKey ペアの作成」をご参照ください。
yourAccessKeySecret
starttime
クエリの開始時刻。
2025-02-19 00:00:00
project
Simple Log Service プロジェクトの名前。
test-project
logstore
Simple Log Service ログストアの名前。
clb-access-log
query
SPL 文。Realtime Compute for Apache Flink の SQL ジョブの開発では、文字列は単一引用符 ('') を使用してエスケープする必要があることに注意してください。
* | where slbid = ''slb-01''
SQL ジョブ設定を選択して右クリックします。表示されるポップオーバーで、[実行] をクリックして Simple Log Service に接続します。
手順 2:連続クエリと結果
SQL ジョブに次の分析文を入力し、slbid で集計クエリを実行します。
SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;
ページの右上隅にある [デバッグ] アイコンをクリックします。デバッグダイアログボックスで、[セッションクラスタ] ドロップダウンリストから [新しいセッションクラスタを作成] を選択し、次の図に基づいてパラメータを構成します。
デバッグドロップダウンリストで、作成したセッションクラスタを選択し、[OK] をクリックします。
[結果] セクションには、行フィルタリングが実行された SQL ジョブの結果と同様の結果が表示されます。
説明注:行フィルタリングが実行されたジョブはすべてのフィールドを返します。ただし、Simple Log Service コネクタは現在の SPL 文に基づいて特定のフィールドのみを返し、データのネットワーク転送をさらに削減します。