すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:SPL 文を使用した Flink SQL による行フィルタリングと列プルーニングの実装

最終更新日:Apr 02, 2025

このトピックでは、Alibaba Cloud Flink SQL が Simple Log Service 処理言語 (SPL) 文を使用して行フィルタリングと列プルーニングを実装する方法について説明します。

背景

Realtime Compute for Apache Flink で Simple Log Service をソーステーブルとして構成すると、Simple Log Service のログストア内のデータが自動的に消費され、動的テーブルが構築されます。データ消費の開始時刻を指定できます。開始時刻以降に生成されたデータが消費されます。2 つの問題が発生する可能性があります。

  1. Simple Log Service コネクタが不要なデータ行または列を過剰にプルし、ネットワークオーバーヘッドが発生します。

  2. 不要なデータは Realtime Compute for Apache Flink でクレンジングする必要がありますが、データクレンジングはデータ分析の焦点ではなく、追加の計算リソースを消費します。データクレンジングには、データフィルタリングとデータ射影が含まれます。

SPL は、Simple Log Service コネクタにフィルタプッシュダウンおよび射影プッシュダウン機能を提供します。Simple Log Service コネクタのクエリ文を指定してフィルタ条件のプッシュダウンを有効にするか、Simple Log Service コネクタのクエリパラメータを構成して射影フィールドのプッシュダウンを有効にすることができます。これらの設定により、完全なデータ転送または計算が回避され、データ転送と計算の効率が向上します。

ソリューション

  • SPL 文が指定されていない場合:Realtime Compute for Apache Flink は、すべての行と列を含む Simple Log Service の完全なログをプルします。

  • SPL 文が指定されている場合:行フィルタリングまたは列プルーニング設定が SPL 文に含まれている場合、Realtime Compute for Apache Flink は、行フィルタリングまたは列プルーニング後に取得された部分的なデータのみを後続の計算のためにプルします。

準備

  • Simple Log Service がアクティブ化されています。プロジェクトとログストアが作成されています。詳細については、「Simple Log Service のアクティブ化」および「プロジェクトとログストアの作成」をご参照ください。

  • このトピックでは、Server Load Balancer (SLB) のシミュレートされたレイヤー 7 アクセスログが Simple Log Service のログストアに収集されます。ログには 10 個以上のフィールドが含まれています。ランダムなシミュレートログが継続的に生成されます。ログの例:

    {
      "__source__": "127.0.0.1",
      "__tag__:__receive_time__": "1706531737",
      "__time__": "1706531727",
      "__topic__": "slb_layer7",
      "body_bytes_sent": "3577",
      "client_ip": "114.137.XXX.XXX",
      "host": "www.pi.mock.com",
      "http_host": "www.cwj.mock.com",
      "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
      "request_length": "1662",
      "request_method": "GET",
      "request_time": "31",
      "request_uri": "/request/path-0/file-3",
      "scheme": "https",
      "slbid": "slb-02",
      "status": "200",
      "upstream_addr": "42.63.XXX.XXX",
      "upstream_response_time": "32",
      "upstream_status": "200",
      "vip_addr": "223.18.XX.XXX"
    }
  • ログストアの slbid フィールドには 3 種類の値があります。次の文は、15 分以内に生成されたログの slbid フィールドの値のページビュー (PV) をクエリするために使用されます。結果は、slb-01 値の PV が slb-02 値の PV と同等であることを示しています。

    image

手順

行フィルタリング:SPL は Simple Log Service コネクタにフィルタプッシュダウン機能を提供します。Simple Log Service コネクタのクエリ文を指定することで、フィルタ条件のプッシュダウンを有効にできます。これにより、完全なデータ転送、完全なデータフィルタリング、および計算が回避されます。

列プルーニング:SPL は Simple Log Service コネクタに射影プッシュダウン機能を提供します。Simple Log Service コネクタのクエリパラメータを構成することで、射影フィールドのプッシュダウンを有効にできます。これにより、完全なデータ転送、完全なデータフィルタリング、および計算が回避されます。

行フィルタリング

手順 1:SQL ジョブを作成する

  1. Realtime Compute for Apache Flink コンソール にログインします。管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。

  3. [新規] をクリックします。[新規ドラフト] ダイアログボックスで、[SQL スクリプト] > [空のストリームドラフト] を選択し、[次へ] をクリックします。

    image

  4. 一時テーブルを作成するために使用される次の SQL ジョブ設定を SQL エディタにコピーします。

    CREATE TEMPORARY TABLE sls_input(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'''
    );

    次の表は、SQL ジョブのパラメータについて説明しています。

    パラメータ

    説明

    connector

    Simple Log Service コネクタ。詳細については、「サポートされているコネクタ」をご参照ください。

    sls

    endpoint

    内部 Simple Log Service エンドポイント。詳細については、「エンドポイント」をご参照ください。

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。

    yourAccessKeyID

    accessKey

    AccessKey シークレット。詳細については、「AccessKey ペアの作成」をご参照ください。

    yourAccessKeySecret

    starttime

    クエリの開始時刻。

    2025-02-19 00:00:00

    project

    Simple Log Service プロジェクトの名前。

    test-project

    logstore

    Simple Log Service ログストアの名前。

    clb-access-log

    query

    SPL 文。Realtime Compute for Apache Flink の SQL ジョブの開発では、文字列は単一引用符 ('') を使用してエスケープする必要があることに注意してください。

    * | where slbid = ''slb-01''

  5. SQL ジョブ設定を選択して右クリックします。表示されるポップオーバーで、[実行] をクリックして Simple Log Service に接続します。

    image

手順 2:連続クエリと結果

  1. SQL ジョブに次の分析文を入力し、slbid で集計クエリを実行します。

    SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;
  2. ページの右上隅にある [デバッグ] アイコンをクリックします。デバッグダイアログボックスで、[セッションクラスタ] ドロップダウンリストから [新しいセッションクラスタを作成] を選択し、次の図に基づいてパラメータを構成します。

    image

  3. デバッグドロップダウンリストで、作成したセッションクラスタを選択し、[OK] をクリックします。

    image

  4. [結果] セクションには、[slbid] フィールドに値 slb-01 が常に表示されます。結果は、SPL 文を指定した後、sls_input には slbid='slb-01' に一致するデータのみが含まれることを示しています。

    image

列プルーニング

手順 1:SQL ジョブを作成する

  1. Realtime Compute for Apache Flink コンソール にログインします。管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。

  3. [新規] をクリックします。[新規ドラフト] ダイアログボックスで、[SQL スクリプト] > [空のストリームドラフト] を選択し、[次へ] をクリックします。

    image

  4. 一時テーブルを作成するために使用される次の SQL ジョブ設定を SQL エディタにコピーします。行フィルタリングとは異なり、行フィルタリング設定に基づいて射影文がクエリパラメータに追加されます。縦棒 (|) は命令を分割するために使用されます。前の命令の出力は、次の命令の入力として使用されます。最後の命令の出力は、パイプライン全体の出力を示します。このようにして、特定のフィールドの内容のみが Simple Log Service サーバーからプルされます。

    CREATE TEMPORARY TABLE sls_input_project(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
    );

    次の表は、SQL ジョブのパラメータについて説明しています。

    パラメータ

    説明

    connector

    Simple Log Service コネクタ。詳細については、「サポートされているコネクタ」をご参照ください。

    sls

    endpoint

    内部 Simple Log Service エンドポイント。詳細については、「エンドポイント」をご参照ください。

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。

    yourAccessKeyID

    accessKey

    AccessKey シークレット。詳細については、「AccessKey ペアの作成」をご参照ください。

    yourAccessKeySecret

    starttime

    クエリの開始時刻。

    2025-02-19 00:00:00

    project

    Simple Log Service プロジェクトの名前。

    test-project

    logstore

    Simple Log Service ログストアの名前。

    clb-access-log

    query

    SPL 文。Realtime Compute for Apache Flink の SQL ジョブの開発では、文字列は単一引用符 ('') を使用してエスケープする必要があることに注意してください。

    * | where slbid = ''slb-01''

  5. SQL ジョブ設定を選択して右クリックします。表示されるポップオーバーで、[実行] をクリックして Simple Log Service に接続します。

    image

手順 2:連続クエリと結果

  1. SQL ジョブに次の分析文を入力し、slbid で集計クエリを実行します。

    SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;
  2. ページの右上隅にある [デバッグ] アイコンをクリックします。デバッグダイアログボックスで、[セッションクラスタ] ドロップダウンリストから [新しいセッションクラスタを作成] を選択し、次の図に基づいてパラメータを構成します。

    image

  3. デバッグドロップダウンリストで、作成したセッションクラスタを選択し、[OK] をクリックします。

    image

  4. [結果] セクションには、行フィルタリングが実行された SQL ジョブの結果と同様の結果が表示されます。

    説明

    注:行フィルタリングが実行されたジョブはすべてのフィールドを返します。ただし、Simple Log Service コネクタは現在の SPL 文に基づいて特定のフィールドのみを返し、データのネットワーク転送をさらに削減します。

    image