すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Realtime Compute for Apache Flink を使用したデータ消費

最終更新日:Mar 26, 2026

Realtime Compute for Apache Flink は、Log Service (SLS) のソーステーブルを作成することで、SLS から直接データを消費できます。このトピックでは、SLS ソーステーブルの作成方法とメタデータフィールドの抽出方法について説明します。

背景情報

次の表は、Flink による Log Service (SLS) データの消費に関するサポート状況を説明しています。

カテゴリ

詳細

サポートタイプ

ソーステーブルと結果テーブル。

実行モード

ストリーミングモードのみがサポートされています。

特定の監視メトリクス

該当なし。

データフォーマット

なし。

API タイプ

SQL。

結果テーブル内のデータの更新または削除

結果テーブル内のデータは更新または削除できません。挿入のみがサポートされています。

Flink を使用したログの消費を開始するには、「Flink SQL ジョブのクイックスタート」をご参照ください。

前提条件

  • RAM ユーザーまたは RAM ロールを使用する場合、Flink コンソールを使用するために必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。

  • Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

  • プロジェクトと Logstore が作成されていること。詳細については、「プロジェクトと Logstore の作成」をご参照ください。

制限事項

  • Ververica Runtime (VVR) 11.1 以降のバージョンのみが、データインジェスト YAML の同期データソースとして SLS をサポートします。

  • SLS コネクタは at-least-once セマンティクスを保証します。

  • ソースの同時実行数をシャード数より高く設定しないでください。リソースの無駄になります。VVR 8.0.5 以前のバージョンでは、シャード数が変更されると、自動フェイルオーバーが機能しなくなり、一部のシャードが消費されないままになる可能性があります。

SLS のソーステーブルと結果テーブルの作成

重要

Flink を使用して Log Service (SLS) からデータを消費するには、完全な SQL ジョブが必要です。完全な SQL ジョブは、ソーステーブル、結果テーブル、および処理後にデータをソースからシンクに移動するための INSERT INTO 文で構成されます。

Flink SQL ジョブの開発方法については、「SQL ジョブ開発ガイド」をご参照ください。

Realtime Compute for Apache Flink は、Log Service (SLS) からのリアルタイムデータをストリーミング入力として使用できます。たとえば、次のログ内容を考えます。

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

次の例は、Log Service (SLS) からデータを消費する SQL ジョブを示しています。

重要

SQL ステートメント内のテーブル名、列名、またはキーワードが予約語と競合する場合は、バックティック (`) で囲んでください。

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

WITH パラメーター

  • 共通パラメーター

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意事項

    connector

    テーブルタイプ。

    String

    はい

    なし

    固定値:sls。

    endPoint

    エンドポイントアドレス。

    String

    はい

    なし

    SLS の内部エンドポイントを使用してください。詳細については、「サービスエンドポイント」をご参照ください。

    説明
    • Realtime Compute for Apache Flink は、デフォルトではパブリックネットワークアクセスをサポートしていません。ただし、Alibaba Cloud NAT Gateway を使用すると、VPC ネットワークとパブリックネットワーク間の通信が可能になります。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。

    • パブリックネットワーク経由での SLS へのアクセスは避けてください。必要な場合は、HTTPS を使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。

    project

    SLS プロジェクト名。

    String

    はい

    なし

    なし。

    logStore

    SLS の Logstore または Metricstore 名。

    String

    はい

    なし

    Logstore と Metricstore は同じ消費方法を使用します。

    accessId

    ご利用の Alibaba Cloud アカウントの AccessKey ID。

    String

    はい

    なし

    詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を避けるため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。

    accessKey

    ご利用の Alibaba Cloud アカウントの AccessKey Secret。

    String

    はい

    なし

  • ソース固有のパラメーター

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意事項

    enableNewSource

    新しい FLIP-27 ベースのソースを使用するかどうか。

    ブール値

    いいえ

    false

    新しいソースは、シャードの変更に自動的に適応し、すべてのソースタスクにシャードを均等に分散します。

    重要
    • このパラメーターは VVR 8.0.9 以降でサポートされています。VVR 11.1 から、デフォルト値は true です。

    • このパラメーターを変更すると、ジョブは以前の状態から再開できません。過去のオフセットから消費を再開するには、まず `consumerGroup` パラメーターを指定してジョブを開始し、SLS コンシューマーグループに消費の進行状況を記録します。次に、`consumeFromCheckpoint` を true に設定し、状態なしでジョブを再起動します。

    • SLS に読み取り専用シャードが存在する場合、一部の Flink タスクが読み取り専用シャードを消費した後に未処理のシャードを要求し続けることがあり、シャードの不均等な分散を引き起こし、消費効率を低下させる可能性があります。これを軽減するには、同時実行数を調整するか、タスクのスケジューリングを最適化するか、小さなシャードをマージします。

    shardDiscoveryIntervalMs

    シャード変更検出の間隔 (ミリ秒単位)。

    Long

    いいえ

    60000

    動的検出を無効にするには、負の値を設定します。

    説明
    • この値は 60000 ms (1 分) 以上である必要があります。

    • このパラメーターは `enableNewSource` が true の場合にのみ有効です。

    • このパラメーターは VVR 8.0.9 以降でサポートされています。

    startupMode

    ソーステーブルの起動モード。

    String

    いいえ

    timestamp

    • timestamp (デフォルト):指定された時刻からログを消費します。

    • latest:最大オフセットからログを消費します。

    • earliest:最小オフセットからログを消費します。

    • consumer_group:コンシューマーグループに記録されたオフセットからログを消費します。シャードにオフセットが記録されていない場合、消費は最小オフセットから開始されます。

    重要
    • VVR 11.1 より前のバージョンでは `consumer_group` はサポートされていません。consumeFromCheckpointtrue に設定してください。消費は指定されたコンシューマーグループに記録されたオフセットから開始され、`startupMode` は効果がありません。

    startTime

    ログ消費の開始時刻。

    String

    いいえ

    現在の時刻

    フォーマット:yyyy-MM-dd hh:mm:ss

    startupMode」を「timestamp」に設定した場合にのみ有効になります。

    説明

    `startTime` と `stopTime` は、SLS の `__timestamp__` 属性ではなく、`__receive_time__` 属性に基づいています。

    stopTime

    ログ消費の終了時刻。

    String

    いいえ

    なし

    フォーマット:yyyy-MM-dd hh:mm:ss

    説明
    • このパラメーターは、過去のログを消費する場合にのみ使用してください。過去の時刻に設定してください。未来の時刻に設定すると、新しいログがないために消費が早期に終了し、データストリームが静かに中断される可能性があります。

    • ログの消費が完了したときに Flink プログラムを終了するには、`exitAfterFinish=true` も設定してください。

    consumerGroup

    コンシューマーグループ名。

    String

    いいえ

    なし

    コンシューマーグループは消費の進行状況を追跡します。任意のカスタム名を使用できます。

    説明

    同じコンシューマーグループを使用して複数のジョブ間で消費を調整することはできません。各 Flink ジョブは一意のコンシューマーグループを使用する必要があります。複数のジョブが同じコンシューマーグループを共有すると、すべてのデータを消費してしまいます。これは、Flink がパーティションの割り当てに SLS コンシューマーグループを使用しないためです。各コンシューマーはすべてのメッセージを独立して処理します。

    consumeFromCheckpoint

    指定されたコンシューマーグループのチェックポイントから消費を開始するかどうか。

    String

    いいえ

    false

    • true:Flink プログラムは、指定されたコンシューマーグループのチェックポイントから消費を開始します。チェックポイントが存在しない場合、消費は `startTime` の値から開始されます。

    • false (デフォルト):指定されたコンシューマーグループに保存されているチェックポイントからログの消費を開始しません。

    重要

    VVR 11.1 以降では、このパラメーターはサポートされていません。VVR 11.1 以降では、startupModeconsumer_group に設定してください。

    maxRetries

    SLS の読み取りに失敗した後の再試行回数。

    String

    いいえ

    3

    なし。

    batchGetSize

    リクエストごとに読み取るロググループの数。

    String

    いいえ

    100

    batchGetSize の値は 1000 を超えてはなりません。超えた場合、エラーが発生します。

    exitAfterFinish

    データ消費が完了した後に Flink プログラムが終了するかどうか。

    String

    いいえ

    false

    • true:データ消費が完了した後に Flink プログラムが終了します。

    • false (デフォルト):データ消費が完了した後も Flink プログラムは実行を継続します。

    query

    重要

    VVR 11.3 以降非推奨。下位互換性のために以降のバージョンでもサポートされています。

    SLS 消費の前処理ステートメント。

    String

    いいえ

    なし

    このパラメーターを使用して、消費前に SLS データをフィルター処理し、コストを削減し、処理速度を向上させます。

    たとえば、 'query' = '*| where request_method = ''GET''' は、Flink が読み取る前に `request_method` フィールドが GET に等しいログに一致します。

    説明

    クエリには SPL 構文を使用してください。詳細については、「SPL 構文」をご参照ください。

    重要
    • このパラメーターは VVR 8.0.1 以降でサポートされています。

    • この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。

    processor

    SLS コンシューマープロセッサ。`query` と両方が存在する場合、こちらが優先されます。

    String

    いいえ

    なし

    このパラメーターを使用して、消費前に SLS データをフィルター処理し、コストを削減し、処理速度を向上させます。`query` の代わりに `processor` を使用することを推奨します。

    たとえば、 'processor' = 'test-filter-processor' は、Flink がデータを読み取る前に SLS コンシューマープロセッサを適用します。

    説明

    プロセッサには SPL 構文を使用してください。詳細については、「SPL 構文」をご参照ください。コンシューマープロセッサの作成または更新方法については、「コンシューマープロセッサの管理」をご参照ください。

    重要

    このパラメーターは VVR 11.3 以降でサポートされています。

    この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。

  • 結果テーブル専用

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意事項

    topicField

    値が `__topic__` メタデータフィールドをオーバーライドするフィールド。ログのトピックを表します。

    String

    いいえ

    なし

    このフィールドはテーブルに存在する必要があります。

    timeField

    値が `__timestamp__` メタデータフィールドをオーバーライドするフィールド。ログの書き込み時刻を表します。

    String

    いいえ

    現在の時刻

    このフィールドはテーブルに存在し、INT データ型である必要があります。指定しない場合、現在の時刻が使用されます。

    sourceField

    値が `__source__` メタデータフィールドをオーバーライドするフィールド。ログのソース (例:ログを生成したマシンの IP アドレス) を表します。

    String

    いいえ

    なし

    このフィールドはテーブルに存在する必要があります。

    partitionField

    シャードルーティングに使用されるフィールド。データはこのフィールドのハッシュ値に基づいてシャードに書き込まれます。同じハッシュ値を持つレコードは同じシャードに送られます。

    String

    いいえ

    なし

    指定しない場合、各レコードは利用可能なシャードにランダムに書き込まれます。

    buckets

    `partitionField` が指定されたときにデータを再割り当てするためのバケット数。

    String

    いいえ

    64

    有効な値:1 から 256 までの 2 のべき乗の整数。バケット数はシャード数以上である必要があります。そうでない場合、一部のシャードはデータを受信しません。

    flushIntervalMs

    データ書き込みをトリガーする間隔。

    String

    いいえ

    2000

    単位:ミリ秒。

    writeNullProperties

    null 値を空の文字列として SLS に書き込むかどうか。

    ブール値

    いいえ

    true

    • true (デフォルト):null 値を空の文字列として書き込みます。

    • false:書き込み中に null 値を持つフィールドをスキップします。

    説明

    このパラメーターは VVR 8.0.6 以降でサポートされています。

メタデータフィールドの抽出

ログフィールドに加えて、次の 4 つのメタデータフィールドやその他のカスタムフィールドを抽出できます。

パラメーター

タイプ

説明

__source__

STRING METADATA VIRTUAL

メッセージソース。

__topic__

STRING METADATA VIRTUAL

メッセージトピック。

__timestamp__

BIGINT METADATA VIRTUAL

ログ時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

メッセージタグ。

"__tag__:__receive_time__":"1616742274" のようなプロパティの場合、キー ('__receive_time__') と値 ('1616742274') はマップに格納されます。SQL で __tag__['__receive_time__'] を使用して値にアクセスできます。

メタデータフィールドを抽出するには、次の例に示すように、`CREATE TABLE` 文で `METADATA` キーワードを使用して宣言します。

create table sls_stream(
  `__timestamp__` bigint METADATA,
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

関連ドキュメント

Flink DataStream API を使用したデータ消費の方法については、「DataStream API」をご参照ください。