すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Simple Log Service (SLS)

最終更新日:Mar 05, 2026

このトピックでは、Simple Log Service (SLS) コネクタの使用方法について説明します。

背景情報

Simple Log Service は、ログデータ向けの包括的なサービスです。ログデータの迅速な収集、消費、配信、クエリを可能にし、運用保守 (O&M) と運用効率を向上させ、大規模なログ処理能力を構築します。

次の表に、SLS コネクタでサポートされている情報を示します。

カテゴリ

詳細

サポートされる型

ソーステーブルと結果テーブル

実行モード

ストリーミングモードのみ

カスタムモニタリングメトリック

該当なし

データフォーマット

なし

API タイプ

SQL、DataStream、およびデータインジェスト YAML

結果テーブルデータの更新または削除のサポート

結果テーブルデータの更新または削除はサポートしていません。挿入操作のみをサポートします。

主な特徴

SLS コネクタのソーステーブルは、メッセージメタデータフィールドの直接読み取りをサポートしています。サポートされているメタデータフィールドを次の表に示します。

フィールド名

フィールド型

説明

__source__

STRING METADATA VIRTUAL

メッセージソース。

__topic__

STRING METADATA VIRTUAL

メッセージトピック。

__timestamp__

BIGINT METADATA VIRTUAL

ログ時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

メッセージタグ。

プロパティ "__tag__:__receive_time__":"1616742274" の場合、'__receive_time__' と '1616742274' はマップ内にキーと値のペアとして格納され、SQL では __tag__['__receive_time__'] を使用してアクセスできます。

前提条件

SLS プロジェクトと Logstore を作成する必要があります。詳細については、「プロジェクトと Logstore の作成」をご参照ください。

制限事項

  • Ververica Runtime (VVR) 11.1 以降のバージョンのみが、データインジェスト YAML の同期データソースとして SLS をサポートします。

  • SLS コネクタは at-least-once セマンティクスを保証します。

  • リソースの無駄遣いを避けるため、ソースの並列度をシャード数より高く設定しないでください。VVR 8.0.5 以前のバージョンでは、シャード数が変更されると、自動フェイルオーバーが機能しなくなり、一部のシャードが消費されなくなる可能性があります。

SQL

構文

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH パラメーター

  • 共通パラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    注意

    connector

    テーブルタイプ。

    String

    はい

    なし

    固定値:sls。

    endPoint

    エンドポイントアドレス。

    String

    はい

    なし

    SLS のプライベートネットワークエンドポイントを入力します。詳細については、「サービスエンドポイント」をご参照ください。

    説明
    • Realtime Compute for Apache Flink は、デフォルトではパブリックネットワークアクセスをサポートしていません。ただし、Alibaba Cloud NAT Gateway を使用すると、VPC ネットワークとパブリックネットワーク間の通信が可能になります。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。

    • パブリックネットワーク経由での SLS へのアクセスは避けてください。必要な場合は、HTTPS を使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。

    project

    SLS プロジェクト名。

    String

    はい

    なし

    なし。

    logStore

    SLS Logstore またはメトリックストア名。

    String

    はい

    なし

    Logstore とメトリックストアは同じ消費メソッドを使用します。

    accessId

    ご利用の Alibaba Cloud アカウントの AccessKey ID。

    String

    はい

    なし

    詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。

    accessKey

    ご利用の Alibaba Cloud アカウントの AccessKey Secret。

    String

    はい

    なし

  • ソース固有のパラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    注意

    enableNewSource

    FLIP-27 インターフェイスを実装する新しいソースを有効にするかどうか。

    Boolean

    いいえ

    false

    新しいソースはシャードの変更に自動的に適応し、すべてのソースタスクにシャードを均等に分散します。

    重要
    • このパラメーターは VVR 8.0.9 以降でサポートされています。VVR 11.1 以降、デフォルト値は true です。

    • このパラメーターを変更すると、ジョブは以前の状態から再開できません。過去のオフセットから消費を再開するには、まず consumerGroup パラメーターを指定してジョブを開始し、SLS コンシューマーグループに消費の進行状況を記録します。次に、consumeFromCheckpoint を true に設定し、状態なしでジョブを再起動します。

    • SLS に読み取り専用シャードが存在する場合、一部の Flink タスクは読み取り専用シャードの消費が完了した後も、他の未処理のシャードをリクエストし続けることがあります。これにより、同時実行タスク間でシャードの分散が不均一になり、全体的な消費効率とシステムパフォーマンスが低下する可能性があります。この問題を軽減するには、並列度を調整するか、タスクスケジューリングを最適化するか、または小さなシャードをマージしてシャード数とタスク割り当ての複雑さを軽減します。

    shardDiscoveryIntervalMs

    シャードの変更を動的に検出する間隔 (ミリ秒)。

    Long

    いいえ

    60000

    負の値を設定すると、動的検出が無効になります。

    説明
    • この値は 60000 ms (1 分) 以上である必要があります。

    • このパラメーターは、enableNewSource が true の場合にのみ有効です。

    • このパラメーターは VVR 8.0.9 以降でサポートされています。

    startupMode

    ソーステーブルの起動モード。

    String

    いいえ

    timestamp

    • timestamp (デフォルト):指定された時刻からログの消費を開始します。

    • latest:最新のオフセットからログの消費を開始します。

    • earliest:最も早いオフセットからログの消費を開始します。

    • consumer_group:コンシューマーグループに記録されたオフセットからログの消費を開始します。シャードにオフセットが記録されていない場合、消費は最も早いオフセットから開始されます。

    重要
    • VVR 11.1 より前のバージョンは consumer_group をサポートしていません。consumeFromCheckpointtrue に設定してください。消費は指定されたコンシューマーグループに記録されたオフセットから開始され、startupMode は効果がありません。

    startTime

    ログ消費の開始時間。

    String

    いいえ

    現在の時刻

    フォーマット:yyyy-MM-dd hh:mm:ss

    startupModetimestamp に設定されている場合にのみ有効です。

    説明

    startTime と stopTime は、__timestamp__ ではなく、SLS の __receive_time__ 属性に基づいています。

    stopTime

    ログ消費の終了時間。

    String

    いいえ

    なし

    フォーマット:yyyy-MM-dd hh:mm:ss

    説明
    • このパラメーターは、過去のログのみを消費する場合に使用します。過去の時刻に設定してください。未来の時刻に設定すると、新しいログがないために消費が早期に終了し、エラーメッセージなしでデータストリームが中断される可能性があります。

    • ログ消費が完了したときに Flink プログラムを終了するには、exitAfterFinish=true も設定する必要があります。

    consumerGroup

    コンシューマーグループ名。

    String

    いいえ

    なし

    コンシューマーグループは消費の進行状況を記録します。制限なくカスタム名を定義できます。

    説明

    同じコンシューマーグループを使用して複数のジョブ間で消費を調整することはできません。各 Flink ジョブは一意のコンシューマーグループを使用する必要があります。複数のジョブが同じコンシューマーグループを共有すると、すべてのデータを消費してしまいます。これは、Flink が SLS コンシューマーグループを介してパーティションを割り当てないため、各コンシューマーが共有グループに関係なくメッセージを独立して処理するためです。

    consumeFromCheckpoint

    指定されたコンシューマーグループに保存されているチェックポイントからログの消費を開始するかどうか。

    String

    いいえ

    false

    • true:コンシューマーグループを指定します。Flink プログラムは、そのグループに保存されているチェックポイントからログの消費を開始します。チェックポイントが存在しない場合、消費は startTime の値から開始されます。

    • false (デフォルト):指定されたコンシューマーグループに保存されているチェックポイントからログの消費を開始しません。

    重要

    VVR 11.1 以降では、このパラメーターはサポートされていません。VVR 11.1 以降では、startupModeconsumer_group に設定してください。

    maxRetries

    SLS の読み取りに失敗した後の再試行回数。

    String

    いいえ

    3

    なし。

    batchGetSize

    リクエストごとに読み取るロググループの数。

    String

    いいえ

    100

    batchGetSize の値は 1000 を超えてはなりません。超えた場合、エラーが発生します。

    exitAfterFinish

    データ消費が完了した後に Flink プログラムを終了するかどうか。

    String

    いいえ

    false

    • true:データ消費が完了した後に Flink プログラムを終了します。

    • false (デフォルト):データ消費が完了した後も Flink プログラムは実行を継続します。

    query

    重要

    VVR 11.3 以降では非推奨です。以降のバージョンでも互換性は維持されます。

    SLS 消費の前処理ステートメント。

    String

    いいえ

    なし

    query パラメーターを使用して、消費前に SLS データをフィルターします。これにより、すべてのデータを Flink に消費することを避け、コストを節約し、処理速度を向上させます。

    たとえば、 'query' = '*| where request_method = ''GET''' は、Flink が読み取る前に request_method フィールドが GET に等しいログに一致します。

    説明

    クエリには SPL 構文を使用します。詳細については、「SPL 構文」をご参照ください。

    重要
    • このパラメーターは VVR 8.0.1 以降でサポートされています。

    • この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。

    processor

    SLS コンシューマープロセッサ。query と両方が存在する場合、こちらが優先されます。

    String

    いいえ

    なし

    processor パラメーターを使用して、消費前に SLS データをフィルターします。これにより、すべてのデータを Flink に消費することを避け、コストを節約し、処理速度を向上させます。query よりも processor の使用を推奨します。

    たとえば、 'processor' = 'test-filter-processor' は、Flink がデータを読み取る前に SLS コンシューマープロセッサを適用します。

    説明

    プロセッサには SPL 構文を使用します。詳細については、「SPL 構文」をご参照ください。コンシューマープロセッサの作成または更新の手順については、「コンシューマープロセッサの管理」をご参照ください。

    重要

    このパラメーターは VVR 11.3 以降でサポートされています。

    この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。

  • 結果テーブルのみ

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    注意

    topicField

    値が __topic__ メタデータフィールドをオーバーライドするフィールドの名前。ログトピックを表します。

    String

    いいえ

    なし

    このフィールドはテーブルに存在する必要があります。

    timeField

    値が __timestamp__ メタデータフィールドをオーバーライドするフィールドの名前。ログの書き込み時間を表します。

    String

    いいえ

    現在の時刻

    このフィールドはテーブルに存在し、データ型が INT である必要があります。指定しない場合、現在の時刻が使用されます。

    sourceField

    値が __source__ メタデータフィールドをオーバーライドするフィールドの名前。ログを生成したマシンの IP アドレスなどのログソースを表します。

    String

    いいえ

    なし

    このフィールドはテーブルに存在する必要があります。

    partitionField

    フィールドの名前。データはこのフィールドのハッシュ値に基づいてシャードに書き込まれます。同じハッシュ値を持つデータは同じシャードに書き込まれます。

    String

    いいえ

    なし

    指定しない場合、各レコードは利用可能なシャードにランダムに書き込まれます。

    buckets

    partitionField が指定された場合にデータを再割り当てするためのバケット数。

    String

    いいえ

    64

    有効な値:1 から 256 までの 2 のべき乗の整数。バケット数はシャード数以上である必要があります。そうでない場合、一部のシャードはデータを受信しません。

    flushIntervalMs

    データ書き込みをトリガーする時間間隔。

    String

    いいえ

    2000

    単位:ミリ秒。

    writeNullProperties

    null 値を空文字列として SLS に書き込むかどうか。

    Boolean

    いいえ

    true

    • true (デフォルト):null 値を空文字列として書き込みます。

    • false:書き込み中に null 値を持つフィールドをスキップします。

    説明

    このパラメーターは VVR 8.0.6 以降でサポートされています。

型マッピング

Flink フィールド型

SLS フィールド型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

データインジェスト (パブリックプレビュー)

制限事項

この機能は VVR 11.1 以降でのみサポートされています。

構文

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

設定項目

パラメーター

説明

データ型

必須

デフォルト値

注意

type

データソースタイプ。

String

はい

なし

固定値:sls。

endpoint

エンドポイントアドレス。

String

はい

なし

SLS のプライベートネットワークエンドポイントを入力します。詳細については、「サービスエンドポイント」をご参照ください。

説明
  • Realtime Compute は、デフォルトではパブリックネットワークアクセスをサポートしていません。ただし、Alibaba Cloud NAT Gateway を使用すると、VPC ネットワークとパブリックネットワーク間の通信が可能になります。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。

  • パブリックネットワーク経由での SLS へのアクセスは避けてください。必要な場合は、HTTPS を使用し、SLS の GA を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。

accessId

ご利用の Alibaba Cloud アカウントの AccessKey ID。

String

はい

なし

詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。

accessKey

ご利用の Alibaba Cloud アカウントの AccessKey Secret。

String

はい

なし

project

SLS プロジェクト名。

String

はい

なし

なし。

logStore

SLS Logstore またはメトリックストア名。

String

はい

なし

Logstore とメトリックストアは同じ消費メソッドを使用します。

schema.inference.strategy

スキーマ推論戦略。

String

いいえ

continuous

  • continuous:すべてのレコードに対してスキーマを推論します。スキーマに互換性がない場合、より広いスキーマを推論し、スキーマ変更イベントを生成します。

  • static:ジョブの起動時に一度だけスキーマを推論します。後続のレコードは初期スキーマを使用して解析します。スキーマ変更イベントは生成されません。

maxPreFetchLogGroups

初期スキーマ推論中に、シャードごとに読み取りと解析を試みるロググループの最大数。

Integer

いいえ

50

実際のデータ読み取りと処理の前に、指定された数のロググループをシャードごとに消費してスキーマ情報を初期化しようとします。

shardDiscoveryIntervalMs

シャードの変更を動的に検出する間隔 (ミリ秒)。

Long

いいえ

60000

負の値を設定すると、動的検出が無効になります。

説明

この値は 60000 ms (1 分) 以上である必要があります。

startupMode

起動モード。

String

いいえ

なし

  • timestamp (デフォルト):指定された時刻からログの消費を開始します。

  • latest:最新のオフセットからログの消費を開始します。

  • earliest:最も早いオフセットからログの消費を開始します。

  • consumer_group:コンシューマーグループに記録されたオフセットからログの消費を開始します。シャードにオフセットが記録されていない場合、消費は最も早いオフセットから開始されます。

startTime

ログ消費の開始時間。

String

いいえ

現在の時刻

フォーマット:yyyy-MM-dd hh:mm:ss。

startupMode が timestamp に設定されている場合にのみ有効です。

説明

startTime と stopTime は、__timestamp__ ではなく、SLS の __receive_time__ 属性に基づいています。

stopTime

ログ消費の終了時間。

String

いいえ

なし

フォーマット:yyyy-MM-dd hh:mm:ss。

説明

ログ消費が完了したときに Flink プログラムを終了するには、exitAfterFinish=true も設定する必要があります。

consumerGroup

コンシューマーグループ名。

String

いいえ

なし

コンシューマーグループは消費の進行状況を記録します。制限なくカスタム名を定義できます。

batchGetSize

リクエストごとに読み取るロググループの数。

Integer

いいえ

100

batchGetSize の値は 1000 を超えてはなりません。超えた場合、エラーが発生します。

maxRetries

SLS の読み取りに失敗した後の再試行回数。

Integer

いいえ

3

なし。

exitAfterFinish

データ消費が完了した後に Flink プログラムを終了するかどうか。

Boolean

いいえ

false

  • true:データ消費が完了した後に Flink プログラムを終了します。

  • false (デフォルト):データ消費が完了した後も Flink プログラムは実行を継続します。

query

SLS 消費の前処理ステートメント。

String

いいえ

なし

query パラメーターを使用して、消費前に SLS データをフィルターします。これにより、すべてのデータを Flink に消費することを避け、コストを節約し、処理速度を向上させます。

たとえば、'query' = '*| where request_method = ''GET''' は、Flink が読み取る前に request_method フィールドが GET に等しいログに一致します。

説明

クエリには SPL 構文を使用します。詳細については、「SPL 構文」をご参照ください。

重要
  • SLS がこの機能をサポートしているリージョンについては、「ルールに基づくログの消費」をご参照ください。

  • この機能はパブリックプレビュー中は無料です。後で料金が発生する場合があります。詳細については、「料金」をご参照ください。

compressType

SLS 圧縮タイプ。

String

いいえ

なし

サポートされている圧縮タイプ:

  • lz4

  • deflate

  • zstd

timeZone

startTime と stopTime のタイムゾーン。

String

いいえ

なし

デフォルトではオフセットは追加されません。

regionId

SLS が利用可能なリージョン。

String

いいえ

なし

設定の詳細については、「利用可能なリージョン」ドキュメントをご参照ください。

signVersion

SLS リクエスト署名バージョン。

String

いいえ

なし

設定の詳細については、「リクエスト署名」ドキュメントをご参照ください。

shardModDivisor

SLS Logstore シャードの読み取り時に使用される除数。

Int

いいえ

-1

設定の詳細については、「シャード」ドキュメントをご参照ください。

shardModRemainder

SLS Logstore シャードの読み取り時に使用される剰余。

Int

いいえ

-1

設定の詳細については、「シャード」ドキュメントをご参照ください。

metadata.list

ダウンストリームシステムに渡すメタデータ列。

String

いいえ

なし

サポートされているメタデータフィールドには、__source____topic____timestamp__、および __tag__ が含まれます。複数のフィールドはカンマで区切ります。

decode.table-id.fields

SLS ログデータを解析する際にテーブル ID を生成するために使用されるフィールド。

String

いいえ

なし

複数のフィールドは英語のカンマ (,) で連結されます。たとえば、アップストリームの SLS ログレコードが {"col0":"a", "col1":"b", "col2":"c"} の場合、異なるパラメーター設定で次の結果が生成されます:

設定

テーブル ID

なし

すべてのメッセージは Project.LogStore に保存されます。

col0

a

col0,col1

a.b

col0,col1,col2

a.b.c

説明

この設定は VVR 11.6 以降でサポートされています。

fixed-types

SLS ログデータを解析する際に指定するフィールド型。

String

いいえ

なし

データを解析する際に、特定のフィールドのデータ型を指定します。複数のフィールドは英語のカンマ , で区切ります。たとえば、id BIGINT, name VARCHAR(10) は、データ内の id フィールドが BIGINT 型であり、name フィールドが VARCHAR(10) 型であることを指定します。

説明

この設定は VVR 11.6 以降でサポートされています。

timestamp-format.standard

SLS ログデータ内のタイムスタンプフィールドのフォーマット。

String

いいえ

SQL

有効な値:

  • SQL:yyyy-MM-dd HH:mm:ss.s{precision} 形式のタイムスタンプ (例:2020-12-30 12:13:14.123) を解析し、同じ形式で出力します。

  • ISO-8601:yyyy-MM-ddTHH:mm:ss.s{precision} 形式のタイムスタンプ (例:2020-12-30T12:13:14.123) を解析し、同じ形式で出力します。

説明

この設定は VVR 11.6 以降でサポートされています。

ingestion.ignore-errors

データ解析中のエラーを無視するかどうか。

Boolean

いいえ

false

説明

この設定は VVR 11.6 以降でサポートされています。

ingestion.error-tolerance.max-count

ingestion.ignore-errors が有効な場合に、ジョブが失敗するまでに許容される解析エラーの最大数。

Integer

いいえ

-1

ingestion.ignore-errors が有効な場合にのみ効果があります。デフォルト値の -1 は、解析エラーがジョブの失敗をトリガーしないことを意味します。

説明

この設定は VVR 11.6 以降でサポートされています。

型マッピング

fixed-types が設定されていない場合、データインジェストの型マッピングは次のようになります:

SLS フィールド型

CDC フィールド型

STRING

STRING

fixed-types が設定されている場合、コネクタは指定された型を使用してデータの解析を試みます。

スキーマ推論とスキーマ変更の同期

  • シャードの事前消費とスキーマの初期化

    SLS コネクタは、データを読み取る Logstore の現在のスキーマを維持します。データを読み取る前に、シャードごとに最大 `maxPreFetchLogGroups` 個のロググループを事前消費します。その後、各ログのスキーマを解析し、これらのスキーマをマージしてテーブル構造を初期化します。続いて、実際の消費の前に、初期化されたスキーマに基づいて対応するテーブル作成イベントを生成します。

    説明

    各シャードについて、SLS コネクタは現在の時刻の 1 時間前からログスキーマの消費と解析を試みます。

  • プライマリキー情報

    SLS ログにはプライマリキー情報が含まれていません。変換ルールを使用して手動でプライマリキーを追加できます:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • スキーマ推論とスキーマ変更

    スキーマの初期化後、schema.inference.strategy が static に設定されている場合、SLS コネクタは初期スキーマを使用して各ログを解析し、スキーマ変更イベントを生成しません。schema.inference.strategy が continuous に設定されている場合、SLS コネクタは各ログを解析し、物理列を推論し、それらを現在のスキーマと比較します。その後、スキーマが異なる場合にスキーマをマージします。マージルール:

    • 推論された物理列に現在のスキーマに存在しないフィールドが含まれている場合、それらのフィールドはスキーマに追加され、新しい null 値許容列イベントが生成されます。

    • 推論された物理列に現在のスキーマに既に存在するフィールドが含まれていない場合、それらのフィールドは残ります。そのデータは NULL で埋められ、列削除イベントは生成されません。

    SLS コネクタは、ログ内のすべてのフィールド型を `STRING` として推論します。現在、列の追加のみがサポートされています。新しい列は現在のスキーマの末尾に追加され、null 値許容として設定されます。

コード例

  • SQL ソーステーブルと結果テーブル

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • データインジェストソース

    SLS はデータインジェストジョブのデータソースとして機能し、SLS データをサポートされているダウンストリームシステムにリアルタイムで書き込みます。たとえば、データインジェストジョブを設定して、Logstore から DLF データレイクに Paimon 形式でデータを書き込むことができます。ジョブはフィールドのデータ型とダウンストリームテーブルの構造を自動的に推論し、実行時の動的なスキーマ進化をサポートします。

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# テーブルにプライマリキー情報を追加
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# test_log のすべてのデータを test_database.inventory テーブルに書き込み
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) 削除ベクターを有効にして読み取りパフォーマンスを向上
  table.properties.deletion-vectors.enabled: true

DataStream API

重要

DataStream を使用してデータを読み書きするには、Flink 用の対応する DataStream コネクタを使用する必要があります。手順については、「DataStream コネクタの使用」をご参照ください。

VVR 8.0.10 より前のバージョンを使用している場合、依存関係の JAR ファイルが不足しているとジョブが起動しないことがあります。対応する -uber JAR ファイルを追加の依存関係として追加する必要があります。

SLS からの読み取り

VVR は、SLS からの読み取り用に `SlsSourceFunction` クラスを提供します。次の例は、SLS から読み取る方法を示しています。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // ストリーミング実行環境をセットアップします
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // SLS ソースとシンクを作成して追加します。
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // バッチ取得サイズを指定する必要があります。
        accessInfo.setBatchGetSize(10);

        // オプションのパラメーター
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // 消費を開始する時間。現在の時刻に設定します。
        int startInSec = (int) (new Date().getTime() / 1000);

        // 消費を停止する時間。-1 は停止しないことを意味します。
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

SLS への書き込み

VVR は、SLS への書き込み用に `SLSOutputFormat` クラスを提供します。次の例は、SLS に書き込む方法を示しています。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

SLS DataStream コネクタは、次のリンクから Maven セントラルリポジトリで入手できます: SLS DataStream コネクタ

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

よくある質問

失敗した Flink ジョブを回復する際に、TaskManager のメモリが不足し、ソーステーブルが「java.lang.OutOfMemoryError: Java heap space」を報告する