すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Simple Log Service (SLS)

最終更新日:Feb 13, 2026

このトピックでは、Simple Log Service (SLS) コネクタの使用方法について説明します。

背景情報

Simple Log Service は、ログデータ向けのエンドツーエンドサービスです。ログデータの収集、消費、転送、照会、分析を迅速に実行できます。これにより、運用およびメンテナンスの効率が向上し、大規模なログ処理が可能になります。

SLS コネクタは、以下の種類の情報をサポートしています。

カテゴリ

説明

サポートされる種類

ソーステーブルおよび結果テーブル

実行モード

ストリーミングモードのみ

監視メトリクス

該当なし

データ形式

該当なし

API の種類

SQL、DataStream API、およびデータインジェスト YAML

結果テーブルへのデータの更新または削除

結果テーブル内のデータを更新または削除することはできません。結果テーブルへのデータ挿入のみ可能です。

機能

SLS ソースコネクタは、メッセージ属性フィールドを直接読み取ります。以下の表に、サポートされる属性フィールドを示します。

フィールド名

説明

__source__

STRING METADATA VIRTUAL

メッセージの送信元です。

__topic__

STRING METADATA VIRTUAL

メッセージのトピックです。

__timestamp__

BIGINT METADATA VIRTUAL

ログが生成された時刻です。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

メッセージのタグです。

属性 "__tag__:__receive_time__":"1616742274" では、'__receive_time__' と '1616742274' がキーと値のペアとしてマップ内に格納されます。SQL では、__tag__['__receive_time__'] を使用してアクセスします。

前提条件

SLS プロジェクトおよび Logstore を作成済みである必要があります。詳細については、「プロジェクトおよび Logstore の作成」をご参照ください。

制限事項

  • SLS をデータインジェストソースとして使用するには、Ververica Runtime (VVR) 11.1 以降が必要です。

  • SLS コネクタは、at-least-once セマンティクスのみをサポートします。

  • ソースの並列度をシャード数より高く設定しないでください。その場合、リソースが無駄になります。VVR 8.0.5 以前では、高い並列度を設定した後にシャード数が変更されると、自動フェールオーバーが失敗する可能性があります。これにより、一部のシャードが未消費のままになることがあります。

SQL

構文

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH パラメーター

  • 一般

    パラメーター

    説明

    データ型

    必須?

    デフォルト値

    備考

    connector

    テーブルの種類です。

    String

    はい

    なし

    値を sls に設定します。

    endPoint

    エンドポイントアドレスです。

    String

    はい

    なし

    SLS の VPC エンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。

    説明
    • デフォルトでは、Realtime Compute for Apache Flink はインターネットにアクセスできません。ただし、Alibaba Cloud では NAT ゲートウェイを提供しており、VPC とインターネット間の通信を有効化できます。詳細については、「インターネットへのアクセス方法」をご参照ください。

    • SLS へのインターネット経由のアクセスは避けてください。やむを得ずインターネット経由でアクセスする場合は、HTTPS を使用し、SLS の転送アクセラレーションを有効化してください。

    project

    SLS プロジェクトの名称です。

    String

    はい

    なし

    該当なし。

    logStore

    SLS Logstore または Metricstore の名称です。

    String

    はい

    なし

    Logstore 内のデータは、Metricstore 内のデータと同じ方法で消費されます。

    accessId

    ご利用の Alibaba Cloud アカウントの AccessKey ID です。

    String

    はい

    デフォルト値はありません

    詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。

    重要

    AccessKey ペアを保護するため、変数を使用して AccessKey を構成することを推奨します。

    accessKey

    ご利用の Alibaba Cloud アカウントの AccessKey Secret です。

    String

    はい

    デフォルト値はありません

  • ソース固有

    パラメーター

    説明

    データ型

    必須?

    デフォルト値

    備考

    enableNewSource

    FLIP-27 を実装する新しいソースインターフェイスを使用するかどうかを指定します。

    Boolean

    いいえ

    false

    新しいソースは、シャードの変更に自動的に適応し、すべてのソースサブタスクにシャードを均等に分散します。

    重要
    • このオプションは VVR 8.0.9 以降でのみサポートされます。VVR 11.1 以降では、このオプションのデフォルト値は true です。

    • このオプションを変更すると、ジョブは保存された状態から再開できなくなります。回避策として、まず consumerGroup オプションを指定してジョブを起動し、現在のコンシューマオフセットを記録します。その後、consumeFromCheckpoint を true に設定して、状態なしでジョブを再起動します。

    • SLS に読み取り専用のシャードが含まれている場合、一部の Flink サブタスクがそれらのシャードからの読み取りを完了した後、他の未読シャードを要求することがあります。これにより、サブタスク間のシャード分散が不均等になり、全体的な消費効率およびシステムパフォーマンスが低下する可能性があります。この不均衡を軽減するには、ソースの並列度を調整するか、タスクスケジューリングを最適化するか、小さなシャードをマージしてください。

    shardDiscoveryIntervalMs

    シャードの変更を動的に検出する間隔です(単位:ミリ秒)。

    Long

    いいえ

    60000

    このオプションを負の値に設定すると、動的検出が無効になります。

    説明
    • このオプションの最小値は 1 分(60,000 ミリ秒)です。

    • このオプションは、enableNewSource が true に設定されている場合にのみ有効です。

    • このオプションは VVR 8.0.9 以降でのみサポートされます。

    startupMode

    ソーステーブルの起動モードです。

    String

    いいえ

    timestamp

    • timestamp(デフォルト):指定された時刻からログを消費します。

    • latest:最新のオフセットからログを消費します。

    • earliest:最も古いオフセットからログを消費します。

    • consumer_group:コンシューマグループに記録されたオフセットからログを消費します。シャードに対してオフセットが記録されていない場合は、最も古いオフセットからログを消費します。

    重要
    • VVR 11.1 より前のバージョンでは、consumer_group 値はサポートされていません。指定されたコンシューマグループによって記録されたオフセットからログを消費するには、consumeFromCheckpointtrue に設定します。この場合、この起動モードは適用されません。

    startTime

    ログの消費を開始する時刻です。

    String

    いいえ

    現在時刻

    書式:yyyy-MM-dd hh:mm:ss

    このオプションは、startupModetimestamp に設定されている場合にのみ有効です。

    説明

    startTime および stopTime オプションは、SLS の __timestamp__ フィールドではなく、__receive_time__ フィールドに基づいています。

    stopTime

    ログの消費を終了する時刻です。

    String

    いいえ

    なし

    書式:yyyy-MM-dd hh:mm:ss

    説明
    • このオプションは、履歴ログを消費する場合にのみ使用します。過去の時刻を指定してください。将来の時刻を指定すると、新しいログが書き込まれない場合に予期せず消費が停止する可能性があります。これは、エラーメッセージのない切断されたデータストリームとして表示されます。

    • ログの消費が完了した後に Flink プログラムを終了するには、exitAfterFinish も true に設定してください。

    consumerGroup

    コンシューマグループの名称です。

    String

    いいえ

    なし

    コンシューマグループは消費の進行状況を記録します。任意のカスタム名を指定できます。

    説明

    複数のジョブによる共同消費のために、コンシューマグループを共有することはできません。異なるジョブには異なるコンシューマグループを使用してください。異なるジョブで同じコンシューマグループを使用すると、各ジョブがすべてのデータを消費します。Flink が SLS からデータを消費する際には、SLS のコンシューマグループを通じてシャードが割り当てられません。そのため、同じコンシューマグループを共有していても、各ジョブが独立してすべてのメッセージを消費します。

    consumeFromCheckpoint

    指定されたコンシューマグループに保存されたチェックポイントからログを消費するかどうかを指定します。

    String

    いいえ

    false

    • true:このパラメーターを true に設定する場合、コンシューマグループも指定する必要があります。Flink は、コンシューマグループに格納されたチェックポイントからログを消費します。コンシューマグループにチェックポイントが存在しない場合は、startTime パラメーターで指定された時刻からログを消費します。

    • false(デフォルト):Flink は、指定されたコンシューマグループに保存されたチェックポイントからログを消費しません。

    重要

    このオプションは VVR 11.1 以降ではサポートされていません。VVR 11.1 以降では、startupModeconsumer_group に設定してください。

    maxRetries

    SLS からの読み取りが失敗した場合の再試行回数です。

    String

    いいえ

    3

    該当なし。

    batchGetSize

    1 回のリクエストで読み取るロググループの数です。

    String

    いいえ

    100

    batchGetSize を 1000 未満の値に設定してください。それ以外の場合、エラーが発生します。

    exitAfterFinish

    データ消費が完了した後に Flink プログラムを終了するかどうかを指定します。

    String

    いいえ

    false

    • true:データ消費が完了した後に Flink プログラムが終了します。

    • false(デフォルト):データ消費が完了した後も Flink プログラムは終了しません。

    query

    重要

    このオプションは VVR 11.3 で非推奨となりましたが、後続のバージョンでも互換性が維持されています。

    SLS データを消費する前にデータを前処理するために使用するクエリ文です。

    String

    いいえ

    デフォルト値はありません

    クエリオプションを使用して、SLS データを消費する前にフィルター処理することで、すべてのデータを Flink に読み込むことを回避し、コスト削減および処理速度の向上を図ることができます。

    例: 'query' = '*| where request_method = ''GET''' は、Flink がログを読み取る前に、request_method フィールドが GET に等しいログをフィルターします。

    説明

    クエリは、SPL 構文を使用して記述してください。

    重要
    • このオプションは VVR 8.0.1 以降でのみサポートされます。

    • この機能は SLS の課金対象となります。詳細については、「課金」をご参照ください。

    processor

    SLS コンシューマプロセッサーです。query および processor の両方が設定されている場合、query が優先されます。

    String

    いいえ

    なし

    processor オプションを使用して、SLS データを消費する前にフィルター処理することで、すべてのデータを Flink に読み込むことを回避し、コスト削減および処理速度の向上を図ることができます。query の代わりに processor を使用することを推奨します。

    たとえば、 'processor' = 'test-filter-processor' は、SLS コンシューマプロセッサーを適用して、Flink が SLS データを読み取る前にデータをフィルター処理します。

    説明

    プロセッサーは、SPL 構文を使用して記述してください。SLS コンシューマプロセッサーの作成および更新の詳細については、「コンシューマプロセッサーの管理」をご参照ください。

    重要

    このオプションは VVR 11.3 以降でのみサポートされます。

    この機能は SLS の課金対象となります。詳細については、「課金」をご参照ください。

  • シンク固有

    パラメーター

    説明

    データ型

    必須?

    デフォルト値

    備考

    topicField

    値が __topic__ フィールドをオーバーライドするフィールドの名称です。これはログのトピックを示します。

    String

    いいえ

    なし

    このパラメーターは、テーブル内の既存のフィールドを指定します。

    timeField

    値が __timestamp__ フィールドをオーバーライドするフィールドの名称です。これはログの書き込み時刻を示します。

    String

    いいえ

    現在時刻

    このフィールドはテーブルに存在する必要があり、その型は INT である必要があります。指定されていない場合は、現在時刻が使用されます。

    sourceField

    値が __source__ フィールドをオーバーライドするフィールドの名称です。これはログの送信元(例:ログを生成したマシンの IP アドレス)を示します。

    String

    いいえ

    なし

    このフィールドはテーブルに存在する必要があります。

    partitionField

    フィールドの名称です。データ書き込み時に、このフィールドの値からハッシュ値が計算されます。ハッシュ値が同じデータは、同じシャードに書き込まれます。

    String

    いいえ

    デフォルト値はありません

    指定されていない場合、各データエントリは利用可能なシャードにランダムに書き込まれます。

    buckets

    partitionField が指定されている場合に、ハッシュ値で再グループ化するバケット数です。

    String

    いいえ

    64

    有効な値:[1, 256]。値は 2 の累乗である必要があります。バケット数はシャード数以上である必要があります。そうでないと、一部のシャードにデータが書き込まれません。

    flushIntervalMs

    データ書き込みをトリガーする間隔です。

    String

    いいえ

    2000

    単位:ミリ秒。

    writeNullProperties

    null 値を SLS に空文字列として書き込むかどうかを指定します。

    Boolean

    いいえ

    true

    • true(デフォルト):null 値を空文字列として書き込みます。

    • false:計算結果が null であるフィールドを書き込みません。

    説明

    このオプションは VVR 8.0.6 以降でのみサポートされます。

データ型マッピング

Flink データ型

SLS データ型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

データインジェスト(パブリックプレビュー)

制限事項

リアルタイムコンピューティングエンジン Ververica Runtime (VVR) 11.1 以降でのみサポートされます。

構文

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

構成オプション

パラメーター

説明

データ型

必須?

デフォルト値

備考

type

データソースの種類です。

String

はい

なし

値を sls に設定します。

endpoint

エンドポイントアドレスです。

String

はい

デフォルト値はありません

SLS の VPC エンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。

説明
  • デフォルトでは、Realtime Compute for Apache Flink はインターネットにアクセスできません。ただし、Alibaba Cloud では NAT ゲートウェイを提供しており、VPC とインターネット間の通信を有効化できます。詳細については、「インターネットへのアクセス方法」をご参照ください。

  • SLS へのインターネット経由のアクセスは避けてください。やむを得ずインターネット経由でアクセスする場合は、HTTPS を使用し、SLS の転送アクセラレーションを有効化してください。

accessId

ご利用の Alibaba Cloud アカウントの AccessKey ID です。

String

はい

デフォルト値はありません

詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。

重要

AccessKey ペアを保護するため、変数を使用して AccessKey を構成することを推奨します。

accessKey

ご利用の Alibaba Cloud アカウントの AccessKey Secret です。

String

はい

なし

project

SLS プロジェクトの名称です。

String

はい

なし

該当なし。

logStore

SLS Logstore または Metricstore の名称です。

String

はい

なし

Logstore 内のデータは、Metricstore 内のデータと同じ方法で消費されます。

schema.inference.strategy

スキーマ推論の戦略です。

String

いいえ

continuous

  • continuous:各データエントリに対してスキーマ推論を実行します。スキーマが互換でない場合、より広いスキーマを推論し、スキーマ変更イベントを生成します。

  • static:ジョブ起動時に一度だけスキーマ推論を実行します。その後のデータは初期スキーマを使用して解析します。スキーマ変更イベントは生成されません。

maxPreFetchLogGroups

初期スキーマ推論中に、各シャードから読み取りおよび解析する最大ロググループ数です。

Integer

いいえ

50

データの読み取りおよび処理を開始する前に、コネクタは各シャードから指定された数のロググループを事前に消費してスキーマを初期化しようと試みます。

shardDiscoveryIntervalMs

シャードの変更を動的に検出する間隔です(単位:ミリ秒)。

Long

いいえ

60000

このオプションを負の値に設定すると、動的検出が無効になります。

説明

このオプションの最小値は 1 分(60,000 ミリ秒)です。

startupMode

起動モードです。

String

いいえ

デフォルト値はありません

  • timestamp(デフォルト):指定された時刻からログを消費します。

  • latest:最新のオフセットからログを消費します。

  • earliest:最も古いオフセットからログを消費します。

  • consumer_group:コンシューマグループに記録されたオフセットからログを消費します。シャードに対してオフセットが記録されていない場合は、最も古いオフセットからログを消費します。

startTime

ログの消費を開始する時刻です。

String

いいえ

現在時刻

書式:yyyy-MM-dd hh:mm:ss。

このオプションは、startupMode が timestamp に設定されている場合にのみ有効です。

説明

startTime および stopTime オプションは、SLS の __timestamp__ フィールドではなく、__receive_time__ フィールドに基づいています。

stopTime

ログの消費を終了する時刻です。

String

いいえ

デフォルト値はありません

書式:yyyy-MM-dd hh:mm:ss。

説明

ログの消費が完了した後に Flink プログラムを終了するには、exitAfterFinish も true に設定してください。

consumerGroup

コンシューマグループの名称です。

String

いいえ

なし

コンシューマグループは消費の進行状況を記録します。任意のカスタム名を指定できます。

batchGetSize

1 回のリクエストで読み取るロググループの数です。

Integer

いいえ

100

batchGetSize は 1000 未満である必要があります。それ以外の場合、エラーが発生します。

maxRetries

SLS からの読み取りが失敗した場合の再試行回数です。

Integer

いいえ

3

該当なし

exitAfterFinish

データ消費が完了した後に Flink プログラムを終了するかどうかを指定します。

Boolean

いいえ

false

  • true:データ消費が完了した後に Flink プログラムが終了します。

  • false(デフォルト):データ消費が完了した後も Flink プログラムは終了しません。

query

SLS データを消費する前にデータを前処理するために使用するクエリ文です。

String

いいえ

デフォルト値はありません

クエリオプションを使用して、SLS データを消費する前にフィルター処理することで、すべてのデータを Flink に読み込むことを回避し、コスト削減および処理速度の向上を図ることができます。

たとえば、'query' = '*| where request_method = ''GET''' は、Flink が SLS データを読み取る前に、request_method フィールドが GET に等しいデータをフィルター処理することを意味します。

説明

クエリは、SPL 構文を使用して記述してください。

重要
  • この機能をサポートするリージョンについては、「ルールに基づくログの消費」をご参照ください。

  • この機能はパブリックプレビュー期間中は無料です。今後課金される可能性があります。詳細については、「課金」をご参照ください。

compressType

SLS で使用される圧縮タイプです。

String

いいえ

デフォルト値はありません

サポートされる圧縮タイプは以下のとおりです:

  • lz4

  • deflate

  • zstd

timeZone

startTime および stopTime のタイムゾーンです。

String

いいえ

デフォルト値はありません

デフォルトではオフセットは追加されません。

regionId

SLS が展開されているリージョンです。

String

いいえ

デフォルト値はありません

構成の詳細については、「リージョン」をご参照ください。

signVersion

SLS リクエストで使用される署名バージョンです。

String

いいえ

デフォルト値はありません

構成の詳細については、「リクエスト署名」をご参照ください。

shardModDivisor

SLS Logstore シャードから読み取る際に使用される除算器です。

Int

いいえ

-1

構成の詳細については、「シャード」をご参照ください。

shardModRemainder

SLS Logstore シャードから読み取る際に使用される剰余です。

Int

いいえ

-1

構成の詳細については、「シャード」をご参照ください。

metadata.list

ダウンストリームに渡されるメタデータ列です。

String

いいえ

なし

利用可能なメタデータフィールドには、__source____topic____timestamp__、および __tag__ があります。カンマで区切ってください。

データ型マッピング

データインジェストのデータ型マッピングは以下のとおりです:

SLS データ型

CDC フィールド型

STRING

STRING

スキーマ推論および進化

  • 事前消費およびスキーマ初期化

    SLS コネクタは、現在の Logstore のスキーマを維持します。Logstore からデータを読み取る前に、コネクタは各シャードから最大 maxPreFetchLogGroups 個のロググループを事前に消費しようとして、各ログのスキーマを解析し、それらをマージしてテーブルスキーマを初期化します。その後、実際の消費を開始する前に、初期化されたスキーマに基づいてテーブル作成イベントを生成します。

    説明

    各シャードについて、コネクタは現在時刻の 1 時間前からデータを消費してスキーマを解析しようと試みます。

  • プライマリキー

    SLS ログにはプライマリキーが含まれていません。変換ルールを使用して、プライマリキーを手動で追加します:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • スキーマ推論および進化

    スキーマ初期化後、schema.inference.strategy が static に設定されている場合、コネクタは各ログエントリを初期スキーマを使用して解析し、スキーマ変更イベントを生成しません。一方、schema.inference.strategy が continuous に設定されている場合、コネクタは各ログエントリを解析し、物理列を推論して現在のスキーマと比較します。推論されたスキーマが現在のスキーマと異なる場合、コネクタは以下のルールを使用してスキーマをマージします:

    • 推論されたスキーマに現在のスキーマにないフィールドが含まれている場合、それらのフィールドを現在のスキーマに追加し、nullable 列追加イベントを生成します。

    • 推論されたスキーマに現在のスキーマに存在するフィールドが含まれていない場合、それらのフィールドを保持し、値を NULL に設定します。列削除イベントは生成しません。

    SLS コネクタは、すべてのフィールドを文字列として推論します。現在のところ、列の追加のみがサポートされています。新しい列は現在のスキーマの末尾に追加され、nullable としてマークされます。

サンプルコード

  • SQL ソーステーブルおよび結果テーブル

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • データインジェストソース

    SLS をデータインジェストソースとして使用することで、SLS データをリアルタイムでサポートされているダウンストリームシステムに書き込むことができます。たとえば、データインジェストジョブを構成して、Logstore から Paimon 形式の DLF データレイクにデータを書き込むことができます。このジョブは、データ型およびシンクテーブルスキーマを自動的に推論し、実行時の動的なスキーマ進化をサポートします。

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# テーブルにプライマリキー情報を追加
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# test_log から test_database.inventory テーブルにすべてのデータを書き込む
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (任意)削除ベクターを有効化して読み取りパフォーマンスを向上
  table.properties.deletion-vectors.enabled: true

DataStream API

重要

DataStream API を使用してデータを読み取るまたは書き込むには、Realtime Compute for Apache Flink に接続するための対応する DataStream コネクタを使用します。詳細については、「DataStream コネクタの使用方法」をご参照ください。

VVR 8.0.10 より前のバージョンを使用している場合、依存関係 JAR パッケージが不足しているためにジョブの起動に失敗することがあります。これを解決するには、対応する uber JAR パッケージを追加の依存関係として追加します。

SLS からのデータ読み取り

Realtime Compute for Apache Flink では、SLS からデータを読み取るための SourceFunction 実装である SlsSourceFunction が提供されています。サンプルコードを以下に示します。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // ストリーミング実行環境を設定
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // SLS ソースおよびシンクを作成して追加
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // バッチ取得サイズを指定する必要があります
        accessInfo.setBatchGetSize(10);

        // 任意のパラメーター
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // 消費を開始する時刻を現在時刻に設定
        int startInSec = (int) (new Date().getTime() / 1000);

        // 消費を終了する時刻を -1 に設定(終了しない)
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

SLS へのデータ書き込み

Realtime Compute for Apache Flink では、SLS にデータを書き込むための OutputFormat 実装である SLSOutputFormat が提供されています。サンプルコードを以下に示します。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

SLS DataStream コネクタ は Maven Central Repository で利用可能です。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

よくある質問

Flink プログラムの復元時に TaskManager で OOM エラーが発生し、「java.lang.OutOfMemoryError: Java heap space」というエラーメッセージがソーステーブルに表示された場合、どうすればよいですか?