このトピックでは、Simple Log Service (SLS) コネクタの使用方法について説明します。
背景情報
Simple Log Service は、ログデータ向けのエンドツーエンドサービスです。ログデータの収集、消費、転送、照会、分析を迅速に実行できます。これにより、運用およびメンテナンスの効率が向上し、大規模なログ処理が可能になります。
SLS コネクタは、以下の種類の情報をサポートしています。
カテゴリ | 説明 |
サポートされる種類 | ソーステーブルおよび結果テーブル |
実行モード | ストリーミングモードのみ |
監視メトリクス | 該当なし |
データ形式 | 該当なし |
API の種類 | SQL、DataStream API、およびデータインジェスト YAML |
結果テーブルへのデータの更新または削除 | 結果テーブル内のデータを更新または削除することはできません。結果テーブルへのデータ挿入のみ可能です。 |
機能
SLS ソースコネクタは、メッセージ属性フィールドを直接読み取ります。以下の表に、サポートされる属性フィールドを示します。
フィールド名 | 型 | 説明 |
__source__ | STRING METADATA VIRTUAL | メッセージの送信元です。 |
__topic__ | STRING METADATA VIRTUAL | メッセージのトピックです。 |
__timestamp__ | BIGINT METADATA VIRTUAL | ログが生成された時刻です。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | メッセージのタグです。 属性 |
前提条件
SLS プロジェクトおよび Logstore を作成済みである必要があります。詳細については、「プロジェクトおよび Logstore の作成」をご参照ください。
制限事項
SLS をデータインジェストソースとして使用するには、Ververica Runtime (VVR) 11.1 以降が必要です。
SLS コネクタは、at-least-once セマンティクスのみをサポートします。
ソースの並列度をシャード数より高く設定しないでください。その場合、リソースが無駄になります。VVR 8.0.5 以前では、高い並列度を設定した後にシャード数が変更されると、自動フェールオーバーが失敗する可能性があります。これにより、一部のシャードが未消費のままになることがあります。
SQL
構文
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);WITH パラメーター
一般
パラメーター
説明
データ型
必須?
デフォルト値
備考
connector
テーブルの種類です。
String
はい
なし
値を sls に設定します。
endPoint
エンドポイントアドレスです。
String
はい
なし
SLS の VPC エンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。
説明デフォルトでは、Realtime Compute for Apache Flink はインターネットにアクセスできません。ただし、Alibaba Cloud では NAT ゲートウェイを提供しており、VPC とインターネット間の通信を有効化できます。詳細については、「インターネットへのアクセス方法」をご参照ください。
SLS へのインターネット経由のアクセスは避けてください。やむを得ずインターネット経由でアクセスする場合は、HTTPS を使用し、SLS の転送アクセラレーションを有効化してください。
project
SLS プロジェクトの名称です。
String
はい
なし
該当なし。
logStore
SLS Logstore または Metricstore の名称です。
String
はい
なし
Logstore 内のデータは、Metricstore 内のデータと同じ方法で消費されます。
accessId
ご利用の Alibaba Cloud アカウントの AccessKey ID です。
String
はい
デフォルト値はありません
詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。
重要AccessKey ペアを保護するため、変数を使用して AccessKey を構成することを推奨します。
accessKey
ご利用の Alibaba Cloud アカウントの AccessKey Secret です。
String
はい
デフォルト値はありません
ソース固有
パラメーター
説明
データ型
必須?
デフォルト値
備考
enableNewSource
FLIP-27 を実装する新しいソースインターフェイスを使用するかどうかを指定します。
Boolean
いいえ
false
新しいソースは、シャードの変更に自動的に適応し、すべてのソースサブタスクにシャードを均等に分散します。
重要このオプションは VVR 8.0.9 以降でのみサポートされます。VVR 11.1 以降では、このオプションのデフォルト値は true です。
このオプションを変更すると、ジョブは保存された状態から再開できなくなります。回避策として、まず consumerGroup オプションを指定してジョブを起動し、現在のコンシューマオフセットを記録します。その後、consumeFromCheckpoint を true に設定して、状態なしでジョブを再起動します。
SLS に読み取り専用のシャードが含まれている場合、一部の Flink サブタスクがそれらのシャードからの読み取りを完了した後、他の未読シャードを要求することがあります。これにより、サブタスク間のシャード分散が不均等になり、全体的な消費効率およびシステムパフォーマンスが低下する可能性があります。この不均衡を軽減するには、ソースの並列度を調整するか、タスクスケジューリングを最適化するか、小さなシャードをマージしてください。
shardDiscoveryIntervalMs
シャードの変更を動的に検出する間隔です(単位:ミリ秒)。
Long
いいえ
60000
このオプションを負の値に設定すると、動的検出が無効になります。
説明このオプションの最小値は 1 分(60,000 ミリ秒)です。
このオプションは、enableNewSource が true に設定されている場合にのみ有効です。
このオプションは VVR 8.0.9 以降でのみサポートされます。
startupMode
ソーステーブルの起動モードです。
String
いいえ
timestamp
timestamp(デフォルト):指定された時刻からログを消費します。latest:最新のオフセットからログを消費します。earliest:最も古いオフセットからログを消費します。consumer_group:コンシューマグループに記録されたオフセットからログを消費します。シャードに対してオフセットが記録されていない場合は、最も古いオフセットからログを消費します。
重要VVR 11.1 より前のバージョンでは、consumer_group 値はサポートされていません。指定されたコンシューマグループによって記録されたオフセットからログを消費するには、
consumeFromCheckpointをtrueに設定します。この場合、この起動モードは適用されません。
startTime
ログの消費を開始する時刻です。
String
いいえ
現在時刻
書式:
yyyy-MM-dd hh:mm:ss。このオプションは、
startupModeがtimestampに設定されている場合にのみ有効です。説明startTime および stopTime オプションは、SLS の __timestamp__ フィールドではなく、__receive_time__ フィールドに基づいています。
stopTime
ログの消費を終了する時刻です。
String
いいえ
なし
書式:
yyyy-MM-dd hh:mm:ss。説明このオプションは、履歴ログを消費する場合にのみ使用します。過去の時刻を指定してください。将来の時刻を指定すると、新しいログが書き込まれない場合に予期せず消費が停止する可能性があります。これは、エラーメッセージのない切断されたデータストリームとして表示されます。
ログの消費が完了した後に Flink プログラムを終了するには、exitAfterFinish も true に設定してください。
consumerGroup
コンシューマグループの名称です。
String
いいえ
なし
コンシューマグループは消費の進行状況を記録します。任意のカスタム名を指定できます。
説明複数のジョブによる共同消費のために、コンシューマグループを共有することはできません。異なるジョブには異なるコンシューマグループを使用してください。異なるジョブで同じコンシューマグループを使用すると、各ジョブがすべてのデータを消費します。Flink が SLS からデータを消費する際には、SLS のコンシューマグループを通じてシャードが割り当てられません。そのため、同じコンシューマグループを共有していても、各ジョブが独立してすべてのメッセージを消費します。
consumeFromCheckpoint
指定されたコンシューマグループに保存されたチェックポイントからログを消費するかどうかを指定します。
String
いいえ
false
true:このパラメーターを true に設定する場合、コンシューマグループも指定する必要があります。Flink は、コンシューマグループに格納されたチェックポイントからログを消費します。コンシューマグループにチェックポイントが存在しない場合は、startTime パラメーターで指定された時刻からログを消費します。false(デフォルト):Flink は、指定されたコンシューマグループに保存されたチェックポイントからログを消費しません。
重要このオプションは VVR 11.1 以降ではサポートされていません。VVR 11.1 以降では、
startupModeをconsumer_groupに設定してください。maxRetries
SLS からの読み取りが失敗した場合の再試行回数です。
String
いいえ
3
該当なし。
batchGetSize
1 回のリクエストで読み取るロググループの数です。
String
いいえ
100
batchGetSizeを 1000 未満の値に設定してください。それ以外の場合、エラーが発生します。exitAfterFinish
データ消費が完了した後に Flink プログラムを終了するかどうかを指定します。
String
いいえ
false
true:データ消費が完了した後に Flink プログラムが終了します。false(デフォルト):データ消費が完了した後も Flink プログラムは終了しません。
query
重要このオプションは VVR 11.3 で非推奨となりましたが、後続のバージョンでも互換性が維持されています。
SLS データを消費する前にデータを前処理するために使用するクエリ文です。
String
いいえ
デフォルト値はありません
クエリオプションを使用して、SLS データを消費する前にフィルター処理することで、すべてのデータを Flink に読み込むことを回避し、コスト削減および処理速度の向上を図ることができます。
例:
'query' = '*| where request_method = ''GET'''は、Flink がログを読み取る前に、request_method フィールドが GET に等しいログをフィルターします。説明クエリは、SPL 構文を使用して記述してください。
重要このオプションは VVR 8.0.1 以降でのみサポートされます。
この機能は SLS の課金対象となります。詳細については、「課金」をご参照ください。
processor
SLS コンシューマプロセッサーです。query および processor の両方が設定されている場合、query が優先されます。
String
いいえ
なし
processor オプションを使用して、SLS データを消費する前にフィルター処理することで、すべてのデータを Flink に読み込むことを回避し、コスト削減および処理速度の向上を図ることができます。query の代わりに processor を使用することを推奨します。
たとえば、
'processor' = 'test-filter-processor'は、SLS コンシューマプロセッサーを適用して、Flink が SLS データを読み取る前にデータをフィルター処理します。説明プロセッサーは、SPL 構文を使用して記述してください。SLS コンシューマプロセッサーの作成および更新の詳細については、「コンシューマプロセッサーの管理」をご参照ください。
重要このオプションは VVR 11.3 以降でのみサポートされます。
この機能は SLS の課金対象となります。詳細については、「課金」をご参照ください。
シンク固有
パラメーター
説明
データ型
必須?
デフォルト値
備考
topicField
値が __topic__ フィールドをオーバーライドするフィールドの名称です。これはログのトピックを示します。
String
いいえ
なし
このパラメーターは、テーブル内の既存のフィールドを指定します。
timeField
値が __timestamp__ フィールドをオーバーライドするフィールドの名称です。これはログの書き込み時刻を示します。
String
いいえ
現在時刻
このフィールドはテーブルに存在する必要があり、その型は INT である必要があります。指定されていない場合は、現在時刻が使用されます。
sourceField
値が __source__ フィールドをオーバーライドするフィールドの名称です。これはログの送信元(例:ログを生成したマシンの IP アドレス)を示します。
String
いいえ
なし
このフィールドはテーブルに存在する必要があります。
partitionField
フィールドの名称です。データ書き込み時に、このフィールドの値からハッシュ値が計算されます。ハッシュ値が同じデータは、同じシャードに書き込まれます。
String
いいえ
デフォルト値はありません
指定されていない場合、各データエントリは利用可能なシャードにランダムに書き込まれます。
buckets
partitionField が指定されている場合に、ハッシュ値で再グループ化するバケット数です。
String
いいえ
64
有効な値:[1, 256]。値は 2 の累乗である必要があります。バケット数はシャード数以上である必要があります。そうでないと、一部のシャードにデータが書き込まれません。
flushIntervalMs
データ書き込みをトリガーする間隔です。
String
いいえ
2000
単位:ミリ秒。
writeNullProperties
null 値を SLS に空文字列として書き込むかどうかを指定します。
Boolean
いいえ
true
true(デフォルト):null 値を空文字列として書き込みます。false:計算結果が null であるフィールドを書き込みません。
説明このオプションは VVR 8.0.6 以降でのみサポートされます。
データ型マッピング
Flink データ型 | SLS データ型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
データインジェスト(パブリックプレビュー)
制限事項
リアルタイムコンピューティングエンジン Ververica Runtime (VVR) 11.1 以降でのみサポートされます。
構文
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>構成オプション
パラメーター | 説明 | データ型 | 必須? | デフォルト値 | 備考 |
type | データソースの種類です。 | String | はい | なし | 値を sls に設定します。 |
endpoint | エンドポイントアドレスです。 | String | はい | デフォルト値はありません | SLS の VPC エンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。 説明
|
accessId | ご利用の Alibaba Cloud アカウントの AccessKey ID です。 | String | はい | デフォルト値はありません | 詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。 重要 AccessKey ペアを保護するため、変数を使用して AccessKey を構成することを推奨します。 |
accessKey | ご利用の Alibaba Cloud アカウントの AccessKey Secret です。 | String | はい | なし | |
project | SLS プロジェクトの名称です。 | String | はい | なし | 該当なし。 |
logStore | SLS Logstore または Metricstore の名称です。 | String | はい | なし | Logstore 内のデータは、Metricstore 内のデータと同じ方法で消費されます。 |
schema.inference.strategy | スキーマ推論の戦略です。 | String | いいえ | continuous |
|
maxPreFetchLogGroups | 初期スキーマ推論中に、各シャードから読み取りおよび解析する最大ロググループ数です。 | Integer | いいえ | 50 | データの読み取りおよび処理を開始する前に、コネクタは各シャードから指定された数のロググループを事前に消費してスキーマを初期化しようと試みます。 |
shardDiscoveryIntervalMs | シャードの変更を動的に検出する間隔です(単位:ミリ秒)。 | Long | いいえ | 60000 | このオプションを負の値に設定すると、動的検出が無効になります。 説明 このオプションの最小値は 1 分(60,000 ミリ秒)です。 |
startupMode | 起動モードです。 | String | いいえ | デフォルト値はありません |
|
startTime | ログの消費を開始する時刻です。 | String | いいえ | 現在時刻 | 書式:yyyy-MM-dd hh:mm:ss。 このオプションは、startupMode が timestamp に設定されている場合にのみ有効です。 説明 startTime および stopTime オプションは、SLS の __timestamp__ フィールドではなく、__receive_time__ フィールドに基づいています。 |
stopTime | ログの消費を終了する時刻です。 | String | いいえ | デフォルト値はありません | 書式:yyyy-MM-dd hh:mm:ss。 説明 ログの消費が完了した後に Flink プログラムを終了するには、exitAfterFinish も true に設定してください。 |
consumerGroup | コンシューマグループの名称です。 | String | いいえ | なし | コンシューマグループは消費の進行状況を記録します。任意のカスタム名を指定できます。 |
batchGetSize | 1 回のリクエストで読み取るロググループの数です。 | Integer | いいえ | 100 | batchGetSize は 1000 未満である必要があります。それ以外の場合、エラーが発生します。 |
maxRetries | SLS からの読み取りが失敗した場合の再試行回数です。 | Integer | いいえ | 3 | 該当なし |
exitAfterFinish | データ消費が完了した後に Flink プログラムを終了するかどうかを指定します。 | Boolean | いいえ | false |
|
query | SLS データを消費する前にデータを前処理するために使用するクエリ文です。 | String | いいえ | デフォルト値はありません | クエリオプションを使用して、SLS データを消費する前にフィルター処理することで、すべてのデータを Flink に読み込むことを回避し、コスト削減および処理速度の向上を図ることができます。 たとえば、 説明 クエリは、SPL 構文を使用して記述してください。 重要
|
compressType | SLS で使用される圧縮タイプです。 | String | いいえ | デフォルト値はありません | サポートされる圧縮タイプは以下のとおりです:
|
timeZone | startTime および stopTime のタイムゾーンです。 | String | いいえ | デフォルト値はありません | デフォルトではオフセットは追加されません。 |
regionId | SLS が展開されているリージョンです。 | String | いいえ | デフォルト値はありません | 構成の詳細については、「リージョン」をご参照ください。 |
signVersion | SLS リクエストで使用される署名バージョンです。 | String | いいえ | デフォルト値はありません | 構成の詳細については、「リクエスト署名」をご参照ください。 |
shardModDivisor | SLS Logstore シャードから読み取る際に使用される除算器です。 | Int | いいえ | -1 | 構成の詳細については、「シャード」をご参照ください。 |
shardModRemainder | SLS Logstore シャードから読み取る際に使用される剰余です。 | Int | いいえ | -1 | 構成の詳細については、「シャード」をご参照ください。 |
metadata.list | ダウンストリームに渡されるメタデータ列です。 | String | いいえ | なし | 利用可能なメタデータフィールドには、 |
データ型マッピング
データインジェストのデータ型マッピングは以下のとおりです:
SLS データ型 | CDC フィールド型 |
STRING | STRING |
スキーマ推論および進化
事前消費およびスキーマ初期化
SLS コネクタは、現在の Logstore のスキーマを維持します。Logstore からデータを読み取る前に、コネクタは各シャードから最大 maxPreFetchLogGroups 個のロググループを事前に消費しようとして、各ログのスキーマを解析し、それらをマージしてテーブルスキーマを初期化します。その後、実際の消費を開始する前に、初期化されたスキーマに基づいてテーブル作成イベントを生成します。
説明各シャードについて、コネクタは現在時刻の 1 時間前からデータを消費してスキーマを解析しようと試みます。
プライマリキー
SLS ログにはプライマリキーが含まれていません。変換ルールを使用して、プライマリキーを手動で追加します:
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2スキーマ推論および進化
スキーマ初期化後、schema.inference.strategy が static に設定されている場合、コネクタは各ログエントリを初期スキーマを使用して解析し、スキーマ変更イベントを生成しません。一方、schema.inference.strategy が continuous に設定されている場合、コネクタは各ログエントリを解析し、物理列を推論して現在のスキーマと比較します。推論されたスキーマが現在のスキーマと異なる場合、コネクタは以下のルールを使用してスキーマをマージします:
推論されたスキーマに現在のスキーマにないフィールドが含まれている場合、それらのフィールドを現在のスキーマに追加し、nullable 列追加イベントを生成します。
推論されたスキーマに現在のスキーマに存在するフィールドが含まれていない場合、それらのフィールドを保持し、値を NULL に設定します。列削除イベントは生成しません。
SLS コネクタは、すべてのフィールドを文字列として推論します。現在のところ、列の追加のみがサポートされています。新しい列は現在のスキーマの末尾に追加され、nullable としてマークされます。
サンプルコード
SQL ソーステーブルおよび結果テーブル
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;データインジェストソース
SLS をデータインジェストソースとして使用することで、SLS データをリアルタイムでサポートされているダウンストリームシステムに書き込むことができます。たとえば、データインジェストジョブを構成して、Logstore から Paimon 形式の DLF データレイクにデータを書き込むことができます。このジョブは、データ型およびシンクテーブルスキーマを自動的に推論し、実行時の動的なスキーマ進化をサポートします。
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# テーブルにプライマリキー情報を追加
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# test_log から test_database.inventory テーブルにすべてのデータを書き込む
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意)削除ベクターを有効化して読み取りパフォーマンスを向上
table.properties.deletion-vectors.enabled: trueDataStream API
DataStream API を使用してデータを読み取るまたは書き込むには、Realtime Compute for Apache Flink に接続するための対応する DataStream コネクタを使用します。詳細については、「DataStream コネクタの使用方法」をご参照ください。
VVR 8.0.10 より前のバージョンを使用している場合、依存関係 JAR パッケージが不足しているためにジョブの起動に失敗することがあります。これを解決するには、対応する uber JAR パッケージを追加の依存関係として追加します。
SLS からのデータ読み取り
Realtime Compute for Apache Flink では、SLS からデータを読み取るための SourceFunction 実装である SlsSourceFunction が提供されています。サンプルコードを以下に示します。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// ストリーミング実行環境を設定
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// SLS ソースおよびシンクを作成して追加
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// バッチ取得サイズを指定する必要があります
accessInfo.setBatchGetSize(10);
// 任意のパラメーター
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// 消費を開始する時刻を現在時刻に設定
int startInSec = (int) (new Date().getTime() / 1000);
// 消費を終了する時刻を -1 に設定(終了しない)
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}SLS へのデータ書き込み
Realtime Compute for Apache Flink では、SLS にデータを書き込むための OutputFormat 実装である SLSOutputFormat が提供されています。サンプルコードを以下に示します。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
SLS DataStream コネクタ は Maven Central Repository で利用可能です。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-format-common</artifactId>
</exclusion>
</exclusions>
</dependency>