すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Flink SQL:SPL を使用した行フィルタリングと列のプルーニング

最終更新日:Jun 09, 2026

SPL を使用すると、データが Flink に到達する前に Simple Log Service (SLS) サーバー側で行フィルタリングと列のプルーニングを実行することで、ネットワークオーバーヘッドとコンピューティングコストを削減できます。

背景情報

Realtime Compute for Apache Flink で SLS をソーステーブルとして使用する場合、コネクタは指定した開始時刻以降の Logstore レコードをすべて消費します。これにより、次の 2 つの問題が発生します。

  1. コネクタがソースから過剰な行または列を取得し、ネットワークオーバーヘッドが増加します。

  2. Flink で不要なデータをフィルタリングするためにコンピューティングリソースが無駄になります。

SPL は、SLS コネクタ用のフィルタープッシュダウンとプロジェクションプッシュダウンによってこれらの問題に対処します。query パラメータを設定してフィルター条件と射影フィールドをサーバー側にプッシュし、データセット全体の転送と計算を回避します。

仕組み

  • SPL ステートメントなし:Flink は計算のために SLS からすべての行と列を取得します。

    image
  • SPL ステートメントあり:SPL ステートメントに行フィルタリングまたは列のプルーニングが含まれている場合、Flink は一致するサブセットのみを取得します。

    image

前提条件

  • SLS を有効化し、プロジェクトと Logstore を作成済みであること。

  • この例では、疑似的なレイヤー 7 SLB ログを含む Logstore を使用します。データには 10 個を超えるフィールドが含まれ、継続的に生成されます。ログエントリの例:

    {
      "__source__": "127.0.0.1",
      "__tag__:__receive_time__": "1706531737",
      "__time__": "1706531727",
      "__topic__": "slb_layer7",
      "body_bytes_sent": "3577",
      "client_ip": "114.137.XXX.XXX",
      "host": "www.pi.mock.com",
      "http_host": "www.cwj.mock.com",
      "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
      "request_length": "1662",
      "request_method": "GET",
      "request_time": "31",
      "request_uri": "/request/path-0/file-3",
      "scheme": "https",
      "slbid": "slb-02",
      "status": "200",
      "upstream_addr": "42.63.XXX.XXX",
      "upstream_response_time": "32",
      "upstream_status": "200",
      "vip_addr": "223.18.XX.XXX"
    }
  • slbid フィールドには 3 つの異なる値があります。15 分間のクエリでは、slb-01slb-02 の件数が同程度であることが分かります。

    クエリエディターで * | select slbid, count(1) cnt group by slbid を実行します。結果:slb-02 は 2,001 件、slb-01 は 1,986 件、lb-uf6qfcvsrouodhvw39oog は 4,144 件です。

操作手順

行フィルタリングquery パラメータを使用してフィルター条件を SLS サーバーにプッシュし、データセット全体の転送を防ぎます。

列のプルーニングquery パラメータを使用して射影フィールドを SLS サーバーにプッシュし、指定した列のみを返します。

行フィルタリング

手順 1:SQL ジョブの作成

  1. Realtime Compute for Apache Flink コンソールにログインし、対象のワークスペースをクリックします。

  2. 左側のナビゲーションペインで、Data Development > [ETL] を選択します。

  3. 作成 をクリックします。[新規下書き] ダイアログボックスで、[SQL スクリプト] > [空白のストリームジョブの下書き] を選択し、次へ をクリックします。

  4. 次の一時テーブル SQL をエディターにコピーします。

    CREATE TEMPORARY TABLE sls_input(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'''
    );

    パラメータ:

    パラメータ

    説明

    connector

    コネクタのタイプです。詳細については、「サポートされているコネクタ」をご参照ください。

    sls

    endpoint

    SLS の内部エンドポイントです。詳細については、「エンドポイント」をご参照ください。

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    アクセスキー ID です。詳細については、「アクセスキーを作成」をご参照ください。

    LTAI**************

    accessKey

    アクセスキーシークレットです。詳細については、「アクセスキーを作成」をご参照ください。

    yourAccessKeySecret

    starttime

    ログ消費の開始時刻です。

    2025-02-19 00:00:00

    project

    SLS プロジェクト名です。

    test-project

    logstore

    SLS Logstore 名です。

    clb-access-log

    query

    SPL ステートメントです。Flink SQL では、文字列リテラルを単一引用符 2 つ ('') でエスケープします。

    * | where slbid = ''slb-01''

  5. SQL ステートメントを選択し、右クリックして 実行 をクリックし、SLS に接続します。

    CREATE TEMPORARY TABLE sls_input(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
      __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'cn-haxxx',
      'accessId' = 'xxx',
      'accessKey' = 'xxx',
      'starttime' = '202xxx',
      'project' = 'test-pxxx',
      'logstore' = 'clb-axxx',
      'query' = '*|whexxx'
    );

手順 2:クエリの実行と結果の確認

  1. slbid フィールドに対して次の集計クエリを実行します。

    SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;
  2. 右上の [Debug] をクリックします。ダイアログボックスで、[Session Cluster] ドロップダウンリストから [Create new session cluster] を選択し、次のように設定します。

    [Name]demo-test[Deployment Target]default-queue[Status]RUNNING[Engine Version]vvr-8.0.11-flink-1.17 に設定し、[Create Session Cluster] をクリックします。

  3. デバッグダイアログボックスで、作成したセッションクラスターを選択し、OK をクリックします。

    注:SLS ソーステーブルを使用してデバッグすると、コンシューマーグループのオフセットが進みます。デプロイ済みのジョブは、この新しいオフセットから再開します。

  4. 結果では slbid が常に slb-01 となることから、sls_input にフィルター条件に一致する行のみが含まれていることが確認できます。

    slb_cnt の値は 185 です。

列のプルーニング

手順 1:SQL ジョブの作成

  1. Realtime Compute for Apache Flink コンソールにログインし、対象のワークスペースをクリックします。

  2. 左側のナビゲーションペインで、Data Development > [ETL] を選択します。

  3. 作成 をクリックします。[新規下書き] ダイアログボックスで、[SQL スクリプト] > [空白のストリームジョブの下書き] を選択し、次へ をクリックします。

  4. 次の一時テーブル SQL をエディターにコピーします。行フィルタリングと比べて、query パラメータに射影ステートメントが追加されています。コマンドは | (Unix のパイプのようなもの) で連結され、射影されたフィールドのみが SLS サーバーから取得されます。

    CREATE TEMPORARY TABLE sls_input_project(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, __timestamp__, "__tag__:__receive_time__"'
    );

    パラメータ:

    パラメータ

    説明

    connector

    コネクタのタイプです。詳細については、「サポートされているコネクタ」をご参照ください。

    sls

    endpoint

    SLS の内部エンドポイントです。詳細については、「エンドポイント」をご参照ください。

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    アクセスキー ID です。詳細については、「アクセスキーを作成」をご参照ください。

    LTAI**************

    accessKey

    アクセスキーシークレットです。詳細については、「アクセスキーを作成」をご参照ください。

    yourAccessKeySecret

    starttime

    ログ消費の開始時刻です。

    2025-02-19 00:00:00

    project

    SLS プロジェクト名です。

    test-project

    logstore

    SLS Logstore 名です。

    clb-access-log

    query

    SPL ステートメントです。Flink SQL では、文字列リテラルを単一引用符 2 つ ('') でエスケープします。

    * | where slbid = ''slb-01''

  5. SQL ステートメントを選択し、右クリックして 実行 をクリックし、SLS に接続します。

手順 2:クエリの実行と結果の確認

  1. slbid フィールドに対して次の集計クエリを実行します。

    SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;
  2. 右上の [Debug] をクリックします。ダイアログボックスで、[Session Cluster] ドロップダウンリストから [Create new session cluster] を選択し、次のように設定します。

    [Name]demo-test[Deployment Target]default-queue[Status]RUNNING[Engine Version]vvr-8.0.11-flink-1.17 に設定し、[Create Session Cluster] をクリックします。

  3. デバッグダイアログボックスで、作成したセッションクラスターを選択し、OK をクリックします。

    注:SLS ソーステーブルを使用してデバッグすると、コンシューマーグループのオフセットが進みます。デプロイ済みのジョブは、この新しいオフセットから再開します。

  4. 結果は、行フィルタリングのユースケースと同様です。

    説明

    行フィルタリングとは異なり、列のプルーニングでは指定したフィールドのみが返されるため、ネットワークトラフィックをさらに削減できます。

    slb_cnt の値は 185 です。