すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Simple Log Service (SLS) コネクタ

最終更新日:Nov 06, 2025

このトピックでは、Simple Log Service(SLS)コネクタの使用方法について説明します。

背景情報

Simple Log Service は、Alibaba Cloud によって開発されたエンドツーエンドのデータロギングサービスです。ログデータの効率的な収集、消費、転送、クエリ、分析が可能です。O&M 効率を向上させ、大量のログデータを処理する機能を提供します。

次の表は、SLS コネクタでサポートされている機能を示しています。

カテゴリ

説明

サポートされているタイプ

ソーステーブルとシンクテーブル

実行モード

ストリーミングモード

メトリック

N/A

データ形式

N/A

APIタイプ

SQL、DataStream API、およびデータインジェスチョン YAML API

シンクテーブルのデータ更新または削除

シンクテーブルのデータは更新または削除できません。データはシンクテーブルに挿入することのみ可能です。

機能

SLS ソースコネクタを使用して、メッセージの属性フィールドを読み取ることができます。次の表は、SLS ソースコネクタでサポートされている属性フィールドを示しています。

フィールド

タイプ

説明

__source__

STRING METADATA VIRTUAL

メッセージソース。

__topic__

STRING METADATA VIRTUAL

メッセージトピック。

__timestamp__

BIGINT METADATA VIRTUAL

ログが生成された時刻。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

メッセージタグ。

"__tag__:__receive_time__":"1616742274"'__receive_time__'、および '1616742274' 属性は、マップ内のキーと値のペアとしてログに記録され、SQL では __tag__['__receive_time__'] モードでアクセスされます。

前提条件

プロジェクトとログストアが作成されていること。詳細については、「プロジェクトとログストアを作成する」をご参照ください。

制限事項

  • Ververica Runtime(VVR)11.1 以降のみが、データインジェスチョンソースとしての SLS の使用をサポートしています。

  • SLS コネクタは、少なくとも1回のセマンティクスのみをサポートしています。

  • リソース効率を高めるには、ソースオペレーターの並列処理をシャード数以下の値に設定します。VVR 8.0.5 以前では、ソースの並列処理がシャード数を超え、シャード数が変更された場合、自動ジョブフェールオーバーが無効になり、シャードが消費されない可能性があります。

SQL

構文

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH 句のコネクタオプション

  • 一般

    オプション

    説明

    データ型

    必須?

    デフォルト値

    備考

    connector

    使用するコネクタ。

    文字列

    はい

    デフォルト値なし

    sls に設定します。

    endPoint

    SLS のエンドポイント。

    文字列

    はい

    デフォルト値なし

    SLS の VPC アクセスアドレスを入力します。詳細については、「エンドポイント」をご参照ください。

    説明

    project

    SLS プロジェクトの名前。

    文字列

    はい

    デフォルト値なし

    logStore

    SLS ログストアまたはメトリックストアの名前。

    STRING

    はい

    デフォルト値なし

    ログストアのデータは、メトリックストアの場合と同じ方法で消費されます。

    accessId

    Alibaba CloudアカウントのAccessKey ID。

    STRING

    はい

    デフォルト値なし

    詳細については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。

    重要

    AccessKey ペアを保護するために、変数 を使用して AccessKey を構成します。

    accessKey

    Alibaba CloudアカウントのAccessKeyシークレット。

    STRING

    はい

    デフォルト値なし

  • ソース固有

    オプション

    説明

    データ型

    必須?

    デフォルト値

    備考

    enableNewSource

    FLIP-27 リファクタリングソースインターフェースを使用するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    このオプションを有効にすると、ソースはシャードの変更に自動的に適応し、ソースサブタスク間でシャードをできるだけ均等に分散します。

    説明

    VVR 8.0.9 以降のみがこのオプションをサポートしています。

    重要
    • VVR 11.1 以降では、このオプションはデフォルトで true に設定されています。

    • オプションの値が変更された場合、ジョブは特定の状態から再開できません。これを解決するには、consumerGroup オプションを構成して現在のコンシューマオフセットを記録し、ジョブを開始します。次に、consumeFromCheckpointtrue に設定し、状態なしでジョブを開始します。

    • ソースサブタスクが読み取り専用シャードからの読み取りを完了すると、他のシャードの消費を要求し続けます。これにより、ソースサブタスク間でシャードの消費が不均一になり、ジョブ全体のパフォーマンスに影響を与える可能性があります。この問題を軽減するには、ソースの並列処理の調整、スケジューリング戦略の最適化、または小さなシャードのマージによるシャード割り当ての簡素化を検討してください。

    shardDiscoveryIntervalMs

    シャードの変更を動的に検出する間隔。

    LONG

    いいえ

    60000

    単位:ミリ秒。

    動的検出を無効にするには、オプションを負の値に設定します。

    説明
    • このオプションの値は 1 分(60,000 ミリ秒)未満にすることはできません。

    • このオプションは、enableNewSource オプションが true に設定されている場合にのみ有効になります。

    • VVR 8.0.9 以降のみがこのオプションをサポートしています。

    startupMode

    ソーステーブルの起動モード。

    STRING

    いいえ

    timestamp

    • timestamp:ログは指定された開始時刻から消費されます。

    • latest:ログは最新のオフセットから消費されます。

    • earliest:ログは最も古いオフセットから消費されます。

    • consumer_group: 使用者グループに記録されたオフセットからログが消費されます。使用者グループがシャードの消費オフセットを記録していない場合、ログは最も古いオフセットから消費されます。

    重要
    • VVR 11.1 より前のバージョンでは、consumer_group はサポートされなくなりました。指定された使用者グループに記録されたオフセットからデータを消費するには、consumeFromCheckpointtrue に設定します。

    startTime

    ログの消費を開始する時刻。

    STRING

    いいえ

    現在時刻

    このオプションの値は、yyyy-MM-dd hh:mm:ss 形式です。

    このオプションは、startupModetimestamp に設定されている場合にのみ有効になります。

    説明

    startTime パラメーターと stopTime パラメーターは、__timestamp__ フィールドではなく、SLS ソーステーブルの __receive_time__ フィールドに基づいて構成されます。

    stopTime

    ログの消費を停止する時刻。

    文字列

    いいえ

    デフォルト値なし

    このオプションの値は、yyyy-MM-dd hh:mm:ss 形式です。

    説明
    • 履歴ログのみを消費するには、このオプションを特定の履歴時点に設定します。将来の時点を使用すると、新しいログのインジェストが一時的に中断された場合に、消費が予期せず停止する可能性があります。観測可能な症状は、付随するエラーメッセージや例外なしにデータストリームが中断することです。

    • ログの消費が完了した後に Realtime Compute for Apache Flink プログラムを終了させたい場合は、exitAfterFinish オプションも設定し、exitAfterFinish オプションを true に設定する必要があります。

    consumerGroup

    コンシューマグループの名前。

    STRING

    いいえ

    デフォルト値なし

    コンシューマグループは消費の進捗状況を記録します。カスタムコンシューマグループ名を指定できます。名前の形式は固定されていません。

    説明

    コンシューマーグループは、共同消費のために複数のジョブで共有することはできません。ジョブごとに異なるコンシューマーグループを指定することをお勧めします。異なるジョブに同じコンシューマーグループを指定すると、すべてのデータが消費されます。Realtime Compute for Apache Flink が SLS からデータを消費する場合、データはコンシューマーグループ内でシャーディングされません。したがって、複数のジョブが同じコンシューマーグループを共有する場合、コンシューマーグループ内のすべてのメッセージが各ジョブによって消費されます。

    consumeFromCheckpoint

    指定されたコンシューマグループに保存されているチェックポイントからログを消費するかどうかを指定します。

    STRING

    いいえ

    false

    • true:このオプションを true に設定する場合は、コンシューマーグループも指定する必要があります。Flink は、コンシューマーグループに保存されているチェックポイントからログを消費します。コンシューマーグループにチェックポイントが存在しない場合、Flink は startTime オプションで指定された時刻からログを消費します。

    • false:Flink は、指定されたコンシューマーグループに保存されているチェックポイントからログを消費しません。

    重要

    VVR 11.1 以降、このオプションはサポートされなくなりました。startupModeconsumer_group に設定する必要があります。

    maxRetries

    SLS からデータを読み取ることができない場合に許可される再試行回数。

    文字列

    いいえ

    3

    batchGetSize

    リクエストで読み取るロググループの数。

    文字列

    いいえ

    100

    エラーを防ぐために、batchGetSize を 1000 未満の値に設定します。

    exitAfterFinish

    データ消費の完了後に Realtime Compute for Apache Flink プログラムを終了するかどうかを指定します。

    文字列

    いいえ

    false

    • true

    • false

    query

    重要

    このオプションは VVR 11.3 で非推奨になりましたが、後続のバージョンでは互換性が維持されます。

    データ消費前にデータを前処理するために使用されるクエリ文。

    STRING

    いいえ

    デフォルト値なし

    このオプションを構成すると、データ消費が始まる前に SLS からデータをフィルタリングして、コストを削減し、データ処理効率を向上させることができます。

    たとえば、'query' = '*| where request_method = ''GET''' を指定すると、Realtime Compute for Apache Flink は、データ消費前に request_method フィールド値が GET と等しいデータをフィルタリングします。

    説明

    このオプションを構成する際は、SPL 構文 を使用してください。

    重要
    • VVR 8.0.1 以降のみがこのオプションをサポートしています。

    • この機能は SLS から料金が発生します。詳細については、「課金」をご参照ください。

    processor

    SLS プロセッサ。このオプションと query の両方が設定されている場合、query が優先されます。

    STRING

    いいえ

    デフォルト値なし

    このオプションは機能的に query と同等ですが、このオプションの使用をお勧めします。たとえば、'processor' = 'test-filter-processor' を設定すると、データは Flink によって消費される前に SLS プロセッサによってフィルター処理されることを示します。

    説明

    このオプションを設定する際は、「SPL 構文」をご参照ください。

    重要
    • VVR 8.0.1 以降のみがこのオプションをサポートしています。

    • この機能は SLS から料金が発生します。詳細については、「課金」をご参照ください。

  • シンク固有

    オプション

    説明

    データ型

    必須?

    デフォルト値

    備考

    topicField

    フィールド名を指定します。このオプションの値は、__topic__ フィールドの値を上書きして、ログのトピックを示します。

    文字列

    いいえ

    デフォルト値なし

    このオプションの値は、テーブルに存在するフィールドである必要があります。

    timeField

    フィールド名を指定します。このオプションの値は、__timestamp__ フィールドの値を上書きして、ログの書き込み時刻を示します。

    文字列

    いいえ

    現在時刻

    このオプションは、既存の INT フィールドに設定する必要があります。フィールドが指定されていない場合は、現在時刻が使用されます。

    sourceField

    フィールド名を指定します。このオプションの値は、__source__ 属性フィールドの値を上書きして、ログの発信元を示します。たとえば、値はログを生成するマシンの IP アドレスです。

    STRING

    いいえ

    デフォルト値なし

    このオプションの値は、テーブルに存在するフィールドである必要があります。

    partitionField

    フィールド名を指定します。SLS にデータが書き込まれるとき、このパラメーターの値に基づいてハッシュ値が計算されます。同じハッシュ値を含むデータは、同じシャードに書き込まれます。

    STRING

    いいえ

    デフォルト値なし

    このオプションを指定しない場合、各データエントリは使用可能なシャードにランダムに書き込まれます。

    buckets

    partitionField オプションが指定されている場合に、ハッシュ値に基づいて再グループ化されるバケットの数。

    文字列

    いいえ

    64

    有効値の範囲:[1,256]。このオプションの値は 2 の整数乗である必要があります。バケットの数は、シャードの数以上である必要があります。そうでない場合、データは特定のシャードに書き込まれません。

    flushIntervalMs

    データ書き込みがトリガーされる間隔。

    STRING

    いいえ

    2000

    単位:ミリ秒。

    writeNullProperties

    null 値を空の文字列として SLS に書き込むかどうかを指定します。

    BOOLEAN

    いいえ

    true

    • true

    • false

    説明

    VVR 8.0.6 以降のみがこのオプションをサポートしています。

データ型のマッピング

Apache Flink用Realtime Computeのデータ型

SLS のデータ型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

整数

BIGINT

フロート

DOUBLE

デシマル

データインジェスチョン

制限事項

VVR 11.1 以降のみが、SLS からのデータインジェスチョンをサポートしています。

構文

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

構成オプション

オプション

説明

データ型

必須?

デフォルト値

備考

type

データソースのタイプ。

文字列

はい

デフォルト値なし

sls に設定します。

endpoint

エンドポイント。

文字列

はい

デフォルト値なし

SLS の VPC アクセスアドレスを入力します。詳細については、「エンドポイント」をご参照ください。

説明

accessId

Alibaba Cloud アカウントの AccessKey ID。

文字列

はい

デフォルト値なし

アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。

重要

AccessKey ペアを保護するために、変数 を使用して AccessKey ID とシークレットを構成します。

accessKey

Alibaba Cloud アカウントの AccessKey シークレット。

文字列

はい

デフォルト値なし

project

SLS プロジェクトの名前。

文字列

はい

デフォルト値なし

logStore

ログストアまたはメトリックストアの名前。

文字列

はい

デフォルト値なし

ログストアのデータは、メトリックストアの場合と同じ方法で消費されます。

schema.inference.strategy

スキーマ推論の戦略。

文字列

いいえ

continuous

  • continuous:スキーマ推論は各データエントリに対して実行されます。スキーマに互換性がない場合は、より広いスキーマが推論され、スキーマ変更イベントが生成されます。

  • static:スキーマ推論は、ジョブの開始時に一度だけ実行されます。後続のデータは初期スキーマに基づいて解析され、スキーマ変更イベントは生成されません。

maxPreFetchLogGroups

初期スキーマ推論中に各シャードに対して読み取りおよび解析されるロググループの最大数。

整数

いいえ

50

データがロードおよび処理される前に、コネクタは各シャードから指定された数のロググループを事前に消費して、スキーマを初期化しようとします。

shardDiscoveryIntervalMs

シャードの変更を動的に検出する間隔。

Long

いいえ

60000

このオプションを負の値に設定すると、動的検出が無効になります。単位:ミリ秒。

説明

このオプションの値は 1 分(60,000 ミリ秒)未満にすることはできません。

startupMode

起動モード。

文字列

いいえ

timestamp

  • timestamp:ログは指定された開始時刻から消費されます。

  • latest:ログは最新のオフセットから消費されます。

  • earliest:ログは最も古いオフセットから消費されます。

  • consumer_group: 使用者グループに記録されたオフセットからログが消費されます。使用者グループがシャードの消費オフセットを記録していない場合、ログは最も古いオフセットから消費されます。

startTime

ログ消費の開始時刻。

文字列

いいえ

現在時刻

このオプションの値は yyyy-MM-dd hh:mm:ss 形式です。

startupModetimestamp に設定されている場合にのみ有効になります。

説明

startTime オプションと stopTime オプションは、__timestamp__ フィールドではなく、SLS の __receive_time__ フィールドに基づいて構成されます。

stopTime

ログ消費の停止時刻。

文字列

いいえ

デフォルト値なし

このオプションの値は yyyy-MM-dd hh:mm:ss 形式です。

説明

ログ消費の完了時に Flink ジョブをキャンセルするには、exitAfterFinishtrue に設定します。

consumerGroup

コンシューマーグループの名前。

文字列

いいえ

デフォルト値なし

コンシューマーグループは消費の進捗状況を記録します。カスタムコンシューマーグループ名を指定できます。名前の形式は固定されていません。

batchGetSize

リクエストで読み取るロググループの数。

整数

いいえ

100

エラーを防ぐために、batchGetSize を 1000 未満の値に設定します。

maxRetries

SLS からの読み取りが失敗した後の再試行回数。

整数

いいえ

3

exitAfterFinish

データ消費の完了後に Flink プログラムを終了するかどうかを指定します。

ブール値

いいえ

false

  • true

  • false

query

Flink が SLS からデータを消費する前に、データを前処理するために使用されるクエリ文。

文字列

いいえ

デフォルト値なし

このオプションを構成すると、消費前にデータをフィルタリングして、コストを削減し、データ処理効率を向上させることができます。

たとえば、'query' = '*| where request_method = ''GET''' は、request_methodGET であるデータをフィルタリングすることを示します。

説明

SPL 構文 を使用してクエリを記述します。

重要
  • この機能をサポートしているリージョンについては、「ルールに基づいてログを消費する」をご参照ください。

  • この機能はパブリックプレビュー段階にあり、現在は無料です。今後、課金される場合があります。詳細については、「請求」をご参照ください。

compressType

圧縮タイプ。

文字列

いいえ

デフォルト値なし

有効な値:

  • lz4

  • deflate

  • zstd

timeZone

startTimestopTime のタイムゾーン。

文字列

いいえ

デフォルト値なし

デフォルトでは、オフセットは追加されません。

regionId

SLS が存在するリージョン。

文字列

いいえ

デフォルト値なし

サポートされているリージョン」をご参照ください。

signVersion

SLS リクエスト署名バージョン。

文字列

いいえ

デフォルト値なし

リクエスト署名」をご参照ください。

shardModDivisor

SLS ログストアシャードから読み取るときに使用される除数。

整数

いいえ

-1

このオプションを構成するには、「シャード」をご参照ください。

shardModRemainder

SLS ログストアシャードから読み取るときに使用される剰余。

整数

いいえ

-1

このオプションを構成するには、「シャード」をご参照ください。

metadata.list

ダウンストリームに渡されるメタデータ列。

文字列

いいえ

デフォルト値なし

使用可能なメタデータフィールドには、__source____topic____timestamp__、および __tag__ が含まれます。カンマで区切ることができます。

データ型のマッピング

データインジェスチョンのデータ型のマッピングは次のとおりです。

SLS データ型

Flink CDC データ型

STRING

STRING

スキーマの推論と進化

  • データの事前消費とスキーマの初期化

    SLS コネクタは、現在のログストアのスキーマを維持します。ログストアからデータを読み取る前に、コネクタは各シャードから最大 maxPreFetchLogGroups 個のロググループを事前に消費しようと試み、各ログのスキーマを解析およびマージすることでスキーマを初期化します。その後、データ消費が始まる前に、初期化されたスキーマに基づいてテーブル作成イベントが生成されます。

    説明

    各シャードについて、コネクタは現在時刻の 1 時間前にデータを消費してスキーマを解析しようとします。

  • プライマリキー

    SLS ログにはプライマリキーが含まれていません。変換モジュールでテーブルにプライマリキーを手動で追加します。

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • スキーマの推論と進化

    スキーマの初期化後、schema.inference.strategystatic に設定されている場合、コネクタはスキーマに基づいて各ログエントリを解析し、スキーマ変更イベントを生成しません。schema.inference.strategycontinuous に設定されている場合、コネクタは各ログエントリを解析し、物理列を推論し、現在のスキーマと比較します。推論されたスキーマが現在のスキーマと一致しない場合、スキーマは次のルールに従ってマージされます。

    • 推論されたスキーマに現在のスキーマにない物理列が含まれている場合、不足している列が現在のスキーマに追加され、null 許容列追加イベントが生成されます。

    • 推論されたスキーマに現在のスキーマの特定の列が含まれていない場合、これらの列は保持され、それらの値は NULL に設定されます。

    SLS コネクタはすべてのフィールドを文字列フィールドとして推論します。現在、列の追加のみがサポートされています。新しい列は、null 許容列として現在のスキーマに追加されます。

サンプルコード

  • ソーステーブルとシンクテーブル:

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • データインジェスチョンソース:

    source:
       type: sls
       name: SLS Source
       endpoint: ${endpoint}
       project: ${project}
       logstore: ${logstore}
       accessId: ${accessId}
       accessKey: ${accessKey}
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

DataStream API

重要

DataStream API を呼び出してデータの読み取りまたは書き込みを行う場合は、関連タイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。詳細については、「DataStream コネクタの使用方法」をご参照ください。

8.0.10 より前の VVR バージョンを使用している場合、ジョブの起動時に依存関係の JAR パッケージが見つからない場合があります。これを解決するには、対応する uber JAR パッケージを追加の依存関係として含めます。

SLS からデータを読み取る

Realtime Compute for Apache Flink の VVR は、SLS からデータを読み取るための SourceFunction の実装クラス SlsSourceFunction を提供します。サンプルコード:

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // ストリーミング実行環境を設定します
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // SLS ソースとシンクを作成して追加します。
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source"); // SLSストリームソース
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint"); // エンドポイント
        accessInfo.setProjectName("yourProject"); // プロジェクト名
        accessInfo.setLogstore("yourLogStore"); // ログストア
        accessInfo.setAccessId("yourAccessId"); // アクセスID
        accessInfo.setAccessKey("yourAccessKey"); // アクセスキー

        // バッチ取得サイズを指定する必要があります。
        accessInfo.setBatchGetSize(10);

        // オプションパラメータ
        accessInfo.setConsumerGroup("yourConsumerGroup"); // コンシューマグループ
        accessInfo.setMaxRetries(3); // 最大再試行回数


        // 消費開始時刻、現在時刻に設定。
        int startInSec = (int) (new Date().getTime() / 1000);

        // 消費停止時刻、-1 は停止しないことを意味します。
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

SLS にデータを書き込む

Realtime Compute for Apache Flink の VVR は、SLS にデータを書き込むための OutputFormat の実装クラス SLSOutputFormat を提供します。サンプルコード:

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink"); // SLSストリームシンク
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint"); // エンドポイント
        conf.setString(SLSOptions.PROJECT, "yourProject"); // プロジェクト名
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore"); // ログストア
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId"); // アクセスID
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey"); // アクセスキー
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString()); // seed.toString() 用のテストメッセージです
        record.setContent(logItem);
        return record;
    }

}

XML

異なるバージョンの Simple Log Service DataStreamコネクタ は、Maven 中央リポジトリに保存されています。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ

失敗したFlinkプログラムを復元するときに、TaskManagerでOOMエラーが発生し、ソーステーブルに「java.lang.OutOfMemoryError: Java heap space」というエラーメッセージが表示される場合はどうすればよいですか?