このトピックでは、Simple Log Service (SLS) コネクタの使用方法について説明します。
背景情報
Simple Log Service は、ログデータ向けの包括的なサービスです。ログデータの迅速な収集、消費、配信、クエリを可能にし、運用保守 (O&M) と運用効率を向上させ、大規模なログ処理能力を構築します。
次の表に、SLS コネクタでサポートされている情報を示します。
カテゴリ | 詳細 |
サポートされる型 | ソーステーブルと結果テーブル |
実行モード | ストリーミングモードのみ |
カスタムモニタリングメトリック | 該当なし |
データフォーマット | なし |
API タイプ | SQL、DataStream、およびデータインジェスト YAML |
結果テーブルデータの更新または削除のサポート | 結果テーブルデータの更新または削除はサポートしていません。挿入操作のみをサポートします。 |
主な特徴
SLS コネクタのソーステーブルは、メッセージメタデータフィールドの直接読み取りをサポートしています。サポートされているメタデータフィールドを次の表に示します。
フィールド名 | フィールド型 | 説明 |
__source__ | STRING METADATA VIRTUAL | メッセージソース。 |
__topic__ | STRING METADATA VIRTUAL | メッセージトピック。 |
__timestamp__ | BIGINT METADATA VIRTUAL | ログ時間。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | メッセージタグ。 プロパティ |
前提条件
SLS プロジェクトと Logstore を作成する必要があります。詳細については、「プロジェクトと Logstore の作成」をご参照ください。
制限事項
Ververica Runtime (VVR) 11.1 以降のバージョンのみが、データインジェスト YAML の同期データソースとして SLS をサポートします。
SLS コネクタは at-least-once セマンティクスを保証します。
リソースの無駄遣いを避けるため、ソースの並列度をシャード数より高く設定しないでください。VVR 8.0.5 以前のバージョンでは、シャード数が変更されると、自動フェイルオーバーが機能しなくなり、一部のシャードが消費されなくなる可能性があります。
SQL
構文
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);WITH パラメーター
共通パラメーター
パラメーター
説明
データ型
必須
デフォルト値
注意
connector
テーブルタイプ。
String
はい
なし
固定値:sls。
endPoint
エンドポイントアドレス。
String
はい
なし
SLS のプライベートネットワークエンドポイントを入力します。詳細については、「サービスエンドポイント」をご参照ください。
説明Realtime Compute for Apache Flink は、デフォルトではパブリックネットワークアクセスをサポートしていません。ただし、Alibaba Cloud NAT Gateway を使用すると、VPC ネットワークとパブリックネットワーク間の通信が可能になります。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。
パブリックネットワーク経由での SLS へのアクセスは避けてください。必要な場合は、HTTPS を使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。
project
SLS プロジェクト名。
String
はい
なし
なし。
logStore
SLS Logstore またはメトリックストア名。
String
はい
なし
Logstore とメトリックストアは同じ消費メソッドを使用します。
accessId
ご利用の Alibaba Cloud アカウントの AccessKey ID。
String
はい
なし
詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。
accessKey
ご利用の Alibaba Cloud アカウントの AccessKey Secret。
String
はい
なし
ソース固有のパラメーター
パラメーター
説明
データ型
必須
デフォルト値
注意
enableNewSource
FLIP-27 インターフェイスを実装する新しいソースを有効にするかどうか。
Boolean
いいえ
false
新しいソースはシャードの変更に自動的に適応し、すべてのソースタスクにシャードを均等に分散します。
重要このパラメーターは VVR 8.0.9 以降でサポートされています。VVR 11.1 以降、デフォルト値は true です。
このパラメーターを変更すると、ジョブは以前の状態から再開できません。過去のオフセットから消費を再開するには、まず consumerGroup パラメーターを指定してジョブを開始し、SLS コンシューマーグループに消費の進行状況を記録します。次に、consumeFromCheckpoint を true に設定し、状態なしでジョブを再起動します。
SLS に読み取り専用シャードが存在する場合、一部の Flink タスクは読み取り専用シャードの消費が完了した後も、他の未処理のシャードをリクエストし続けることがあります。これにより、同時実行タスク間でシャードの分散が不均一になり、全体的な消費効率とシステムパフォーマンスが低下する可能性があります。この問題を軽減するには、並列度を調整するか、タスクスケジューリングを最適化するか、または小さなシャードをマージしてシャード数とタスク割り当ての複雑さを軽減します。
shardDiscoveryIntervalMs
シャードの変更を動的に検出する間隔 (ミリ秒)。
Long
いいえ
60000
負の値を設定すると、動的検出が無効になります。
説明この値は 60000 ms (1 分) 以上である必要があります。
このパラメーターは、enableNewSource が true の場合にのみ有効です。
このパラメーターは VVR 8.0.9 以降でサポートされています。
startupMode
ソーステーブルの起動モード。
String
いいえ
timestamp
timestamp(デフォルト):指定された時刻からログの消費を開始します。latest:最新のオフセットからログの消費を開始します。earliest:最も早いオフセットからログの消費を開始します。consumer_group:コンシューマーグループに記録されたオフセットからログの消費を開始します。シャードにオフセットが記録されていない場合、消費は最も早いオフセットから開始されます。
重要VVR 11.1 より前のバージョンは consumer_group をサポートしていません。
consumeFromCheckpointをtrueに設定してください。消費は指定されたコンシューマーグループに記録されたオフセットから開始され、startupMode は効果がありません。
startTime
ログ消費の開始時間。
String
いいえ
現在の時刻
フォーマット:
yyyy-MM-dd hh:mm:ss。startupModeがtimestampに設定されている場合にのみ有効です。説明startTime と stopTime は、__timestamp__ ではなく、SLS の __receive_time__ 属性に基づいています。
stopTime
ログ消費の終了時間。
String
いいえ
なし
フォーマット:
yyyy-MM-dd hh:mm:ss。説明このパラメーターは、過去のログのみを消費する場合に使用します。過去の時刻に設定してください。未来の時刻に設定すると、新しいログがないために消費が早期に終了し、エラーメッセージなしでデータストリームが中断される可能性があります。
ログ消費が完了したときに Flink プログラムを終了するには、exitAfterFinish=true も設定する必要があります。
consumerGroup
コンシューマーグループ名。
String
いいえ
なし
コンシューマーグループは消費の進行状況を記録します。制限なくカスタム名を定義できます。
説明同じコンシューマーグループを使用して複数のジョブ間で消費を調整することはできません。各 Flink ジョブは一意のコンシューマーグループを使用する必要があります。複数のジョブが同じコンシューマーグループを共有すると、すべてのデータを消費してしまいます。これは、Flink が SLS コンシューマーグループを介してパーティションを割り当てないため、各コンシューマーが共有グループに関係なくメッセージを独立して処理するためです。
consumeFromCheckpoint
指定されたコンシューマーグループに保存されているチェックポイントからログの消費を開始するかどうか。
String
いいえ
false
true:コンシューマーグループを指定します。Flink プログラムは、そのグループに保存されているチェックポイントからログの消費を開始します。チェックポイントが存在しない場合、消費は startTime の値から開始されます。false(デフォルト):指定されたコンシューマーグループに保存されているチェックポイントからログの消費を開始しません。
重要VVR 11.1 以降では、このパラメーターはサポートされていません。VVR 11.1 以降では、
startupModeをconsumer_groupに設定してください。maxRetries
SLS の読み取りに失敗した後の再試行回数。
String
いいえ
3
なし。
batchGetSize
リクエストごとに読み取るロググループの数。
String
いいえ
100
batchGetSizeの値は 1000 を超えてはなりません。超えた場合、エラーが発生します。exitAfterFinish
データ消費が完了した後に Flink プログラムを終了するかどうか。
String
いいえ
false
true:データ消費が完了した後に Flink プログラムを終了します。false(デフォルト):データ消費が完了した後も Flink プログラムは実行を継続します。
query
重要VVR 11.3 以降では非推奨です。以降のバージョンでも互換性は維持されます。
SLS 消費の前処理ステートメント。
String
いいえ
なし
query パラメーターを使用して、消費前に SLS データをフィルターします。これにより、すべてのデータを Flink に消費することを避け、コストを節約し、処理速度を向上させます。
たとえば、
'query' = '*| where request_method = ''GET'''は、Flink が読み取る前に request_method フィールドが GET に等しいログに一致します。説明クエリには SPL 構文を使用します。詳細については、「SPL 構文」をご参照ください。
重要このパラメーターは VVR 8.0.1 以降でサポートされています。
この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。
processor
SLS コンシューマープロセッサ。query と両方が存在する場合、こちらが優先されます。
String
いいえ
なし
processor パラメーターを使用して、消費前に SLS データをフィルターします。これにより、すべてのデータを Flink に消費することを避け、コストを節約し、処理速度を向上させます。query よりも processor の使用を推奨します。
たとえば、
'processor' = 'test-filter-processor'は、Flink がデータを読み取る前に SLS コンシューマープロセッサを適用します。説明プロセッサには SPL 構文を使用します。詳細については、「SPL 構文」をご参照ください。コンシューマープロセッサの作成または更新の手順については、「コンシューマープロセッサの管理」をご参照ください。
重要このパラメーターは VVR 11.3 以降でサポートされています。
この機能には SLS の料金が発生します。詳細については、「料金」をご参照ください。
結果テーブルのみ
パラメーター
説明
データ型
必須
デフォルト値
注意
topicField
値が __topic__ メタデータフィールドをオーバーライドするフィールドの名前。ログトピックを表します。
String
いいえ
なし
このフィールドはテーブルに存在する必要があります。
timeField
値が __timestamp__ メタデータフィールドをオーバーライドするフィールドの名前。ログの書き込み時間を表します。
String
いいえ
現在の時刻
このフィールドはテーブルに存在し、データ型が INT である必要があります。指定しない場合、現在の時刻が使用されます。
sourceField
値が __source__ メタデータフィールドをオーバーライドするフィールドの名前。ログを生成したマシンの IP アドレスなどのログソースを表します。
String
いいえ
なし
このフィールドはテーブルに存在する必要があります。
partitionField
フィールドの名前。データはこのフィールドのハッシュ値に基づいてシャードに書き込まれます。同じハッシュ値を持つデータは同じシャードに書き込まれます。
String
いいえ
なし
指定しない場合、各レコードは利用可能なシャードにランダムに書き込まれます。
buckets
partitionField が指定された場合にデータを再割り当てするためのバケット数。
String
いいえ
64
有効な値:1 から 256 までの 2 のべき乗の整数。バケット数はシャード数以上である必要があります。そうでない場合、一部のシャードはデータを受信しません。
flushIntervalMs
データ書き込みをトリガーする時間間隔。
String
いいえ
2000
単位:ミリ秒。
writeNullProperties
null 値を空文字列として SLS に書き込むかどうか。
Boolean
いいえ
true
true(デフォルト):null 値を空文字列として書き込みます。false:書き込み中に null 値を持つフィールドをスキップします。
説明このパラメーターは VVR 8.0.6 以降でサポートされています。
型マッピング
Flink フィールド型 | SLS フィールド型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
データインジェスト (パブリックプレビュー)
制限事項
この機能は VVR 11.1 以降でのみサポートされています。
構文
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>設定項目
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 | ||||||||||
type | データソースタイプ。 | String | はい | なし | 固定値:sls。 | ||||||||||
endpoint | エンドポイントアドレス。 | String | はい | なし | SLS のプライベートネットワークエンドポイントを入力します。詳細については、「サービスエンドポイント」をご参照ください。 説明
| ||||||||||
accessId | ご利用の Alibaba Cloud アカウントの AccessKey ID。 | String | はい | なし | 詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。 | ||||||||||
accessKey | ご利用の Alibaba Cloud アカウントの AccessKey Secret。 | String | はい | なし | |||||||||||
project | SLS プロジェクト名。 | String | はい | なし | なし。 | ||||||||||
logStore | SLS Logstore またはメトリックストア名。 | String | はい | なし | Logstore とメトリックストアは同じ消費メソッドを使用します。 | ||||||||||
schema.inference.strategy | スキーマ推論戦略。 | String | いいえ | continuous |
| ||||||||||
maxPreFetchLogGroups | 初期スキーマ推論中に、シャードごとに読み取りと解析を試みるロググループの最大数。 | Integer | いいえ | 50 | 実際のデータ読み取りと処理の前に、指定された数のロググループをシャードごとに消費してスキーマ情報を初期化しようとします。 | ||||||||||
shardDiscoveryIntervalMs | シャードの変更を動的に検出する間隔 (ミリ秒)。 | Long | いいえ | 60000 | 負の値を設定すると、動的検出が無効になります。 説明 この値は 60000 ms (1 分) 以上である必要があります。 | ||||||||||
startupMode | 起動モード。 | String | いいえ | なし |
| ||||||||||
startTime | ログ消費の開始時間。 | String | いいえ | 現在の時刻 | フォーマット:yyyy-MM-dd hh:mm:ss。 startupMode が timestamp に設定されている場合にのみ有効です。 説明 startTime と stopTime は、__timestamp__ ではなく、SLS の __receive_time__ 属性に基づいています。 | ||||||||||
stopTime | ログ消費の終了時間。 | String | いいえ | なし | フォーマット:yyyy-MM-dd hh:mm:ss。 説明 ログ消費が完了したときに Flink プログラムを終了するには、exitAfterFinish=true も設定する必要があります。 | ||||||||||
consumerGroup | コンシューマーグループ名。 | String | いいえ | なし | コンシューマーグループは消費の進行状況を記録します。制限なくカスタム名を定義できます。 | ||||||||||
batchGetSize | リクエストごとに読み取るロググループの数。 | Integer | いいえ | 100 | batchGetSize の値は 1000 を超えてはなりません。超えた場合、エラーが発生します。 | ||||||||||
maxRetries | SLS の読み取りに失敗した後の再試行回数。 | Integer | いいえ | 3 | なし。 | ||||||||||
exitAfterFinish | データ消費が完了した後に Flink プログラムを終了するかどうか。 | Boolean | いいえ | false |
| ||||||||||
query | SLS 消費の前処理ステートメント。 | String | いいえ | なし | query パラメーターを使用して、消費前に SLS データをフィルターします。これにより、すべてのデータを Flink に消費することを避け、コストを節約し、処理速度を向上させます。 たとえば、 説明 クエリには SPL 構文を使用します。詳細については、「SPL 構文」をご参照ください。 重要
| ||||||||||
compressType | SLS 圧縮タイプ。 | String | いいえ | なし | サポートされている圧縮タイプ:
| ||||||||||
timeZone | startTime と stopTime のタイムゾーン。 | String | いいえ | なし | デフォルトではオフセットは追加されません。 | ||||||||||
regionId | SLS が利用可能なリージョン。 | String | いいえ | なし | 設定の詳細については、「利用可能なリージョン」ドキュメントをご参照ください。 | ||||||||||
signVersion | SLS リクエスト署名バージョン。 | String | いいえ | なし | 設定の詳細については、「リクエスト署名」ドキュメントをご参照ください。 | ||||||||||
shardModDivisor | SLS Logstore シャードの読み取り時に使用される除数。 | Int | いいえ | -1 | 設定の詳細については、「シャード」ドキュメントをご参照ください。 | ||||||||||
shardModRemainder | SLS Logstore シャードの読み取り時に使用される剰余。 | Int | いいえ | -1 | 設定の詳細については、「シャード」ドキュメントをご参照ください。 | ||||||||||
metadata.list | ダウンストリームシステムに渡すメタデータ列。 | String | いいえ | なし | サポートされているメタデータフィールドには、 | ||||||||||
decode.table-id.fields | SLS ログデータを解析する際にテーブル ID を生成するために使用されるフィールド。 | String | いいえ | なし | 複数のフィールドは英語のカンマ (
説明 この設定は VVR 11.6 以降でサポートされています。 | ||||||||||
fixed-types | SLS ログデータを解析する際に指定するフィールド型。 | String | いいえ | なし | データを解析する際に、特定のフィールドのデータ型を指定します。複数のフィールドは英語のカンマ 説明 この設定は VVR 11.6 以降でサポートされています。 | ||||||||||
timestamp-format.standard | SLS ログデータ内のタイムスタンプフィールドのフォーマット。 | String | いいえ | SQL | 有効な値:
説明 この設定は VVR 11.6 以降でサポートされています。 | ||||||||||
ingestion.ignore-errors | データ解析中のエラーを無視するかどうか。 | Boolean | いいえ | false | 説明 この設定は VVR 11.6 以降でサポートされています。 | ||||||||||
ingestion.error-tolerance.max-count | ingestion.ignore-errors が有効な場合に、ジョブが失敗するまでに許容される解析エラーの最大数。 | Integer | いいえ | -1 | ingestion.ignore-errors が有効な場合にのみ効果があります。デフォルト値の -1 は、解析エラーがジョブの失敗をトリガーしないことを意味します。 説明 この設定は VVR 11.6 以降でサポートされています。 |
型マッピング
fixed-types が設定されていない場合、データインジェストの型マッピングは次のようになります:
SLS フィールド型 | CDC フィールド型 |
STRING | STRING |
fixed-types が設定されている場合、コネクタは指定された型を使用してデータの解析を試みます。
スキーマ推論とスキーマ変更の同期
シャードの事前消費とスキーマの初期化
SLS コネクタは、データを読み取る Logstore の現在のスキーマを維持します。データを読み取る前に、シャードごとに最大 `maxPreFetchLogGroups` 個のロググループを事前消費します。その後、各ログのスキーマを解析し、これらのスキーマをマージしてテーブル構造を初期化します。続いて、実際の消費の前に、初期化されたスキーマに基づいて対応するテーブル作成イベントを生成します。
説明各シャードについて、SLS コネクタは現在の時刻の 1 時間前からログスキーマの消費と解析を試みます。
プライマリキー情報
SLS ログにはプライマリキー情報が含まれていません。変換ルールを使用して手動でプライマリキーを追加できます:
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2スキーマ推論とスキーマ変更
スキーマの初期化後、schema.inference.strategy が static に設定されている場合、SLS コネクタは初期スキーマを使用して各ログを解析し、スキーマ変更イベントを生成しません。schema.inference.strategy が continuous に設定されている場合、SLS コネクタは各ログを解析し、物理列を推論し、それらを現在のスキーマと比較します。その後、スキーマが異なる場合にスキーマをマージします。マージルール:
推論された物理列に現在のスキーマに存在しないフィールドが含まれている場合、それらのフィールドはスキーマに追加され、新しい null 値許容列イベントが生成されます。
推論された物理列に現在のスキーマに既に存在するフィールドが含まれていない場合、それらのフィールドは残ります。そのデータは NULL で埋められ、列削除イベントは生成されません。
SLS コネクタは、ログ内のすべてのフィールド型を `STRING` として推論します。現在、列の追加のみがサポートされています。新しい列は現在のスキーマの末尾に追加され、null 値許容として設定されます。
コード例
SQL ソーステーブルと結果テーブル
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;データインジェストソース
SLS はデータインジェストジョブのデータソースとして機能し、SLS データをサポートされているダウンストリームシステムにリアルタイムで書き込みます。たとえば、データインジェストジョブを設定して、Logstore から DLF データレイクに Paimon 形式でデータを書き込むことができます。ジョブはフィールドのデータ型とダウンストリームテーブルの構造を自動的に推論し、実行時の動的なスキーマ進化をサポートします。
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# テーブルにプライマリキー情報を追加
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# test_log のすべてのデータを test_database.inventory テーブルに書き込み
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) 削除ベクターを有効にして読み取りパフォーマンスを向上
table.properties.deletion-vectors.enabled: trueDataStream API
DataStream を使用してデータを読み書きするには、Flink 用の対応する DataStream コネクタを使用する必要があります。手順については、「DataStream コネクタの使用」をご参照ください。
VVR 8.0.10 より前のバージョンを使用している場合、依存関係の JAR ファイルが不足しているとジョブが起動しないことがあります。対応する -uber JAR ファイルを追加の依存関係として追加する必要があります。
SLS からの読み取り
VVR は、SLS からの読み取り用に `SlsSourceFunction` クラスを提供します。次の例は、SLS から読み取る方法を示しています。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// ストリーミング実行環境をセットアップします
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// SLS ソースとシンクを作成して追加します。
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// バッチ取得サイズを指定する必要があります。
accessInfo.setBatchGetSize(10);
// オプションのパラメーター
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// 消費を開始する時間。現在の時刻に設定します。
int startInSec = (int) (new Date().getTime() / 1000);
// 消費を停止する時間。-1 は停止しないことを意味します。
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}SLS への書き込み
VVR は、SLS への書き込み用に `SLSOutputFormat` クラスを提供します。次の例は、SLS に書き込む方法を示しています。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
SLS DataStream コネクタは、次のリンクから Maven セントラルリポジトリで入手できます: SLS DataStream コネクタ。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-format-common</artifactId>
</exclusion>
</exclusions>
</dependency>