このトピックでは、Simple Log Service(SLS)コネクタの使用方法について説明します。
背景情報
Simple Log Service は、Alibaba Cloud によって開発されたエンドツーエンドのデータロギングサービスです。ログデータの効率的な収集、消費、転送、クエリ、分析が可能です。O&M 効率を向上させ、大量のログデータを処理する機能を提供します。
次の表は、SLS コネクタでサポートされている機能を示しています。
カテゴリ | 説明 |
サポートされているタイプ | ソーステーブルとシンクテーブル |
実行モード | ストリーミングモード |
メトリック | N/A |
データ形式 | N/A |
APIタイプ | SQL、DataStream API、およびデータインジェスチョン YAML API |
シンクテーブルのデータ更新または削除 | シンクテーブルのデータは更新または削除できません。データはシンクテーブルに挿入することのみ可能です。 |
機能
SLS ソースコネクタを使用して、メッセージの属性フィールドを読み取ることができます。次の表は、SLS ソースコネクタでサポートされている属性フィールドを示しています。
フィールド | タイプ | 説明 |
__source__ | STRING METADATA VIRTUAL | メッセージソース。 |
__topic__ | STRING METADATA VIRTUAL | メッセージトピック。 |
__timestamp__ | BIGINT METADATA VIRTUAL | ログが生成された時刻。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | メッセージタグ。
|
前提条件
プロジェクトとログストアが作成されていること。詳細については、「プロジェクトとログストアを作成する」をご参照ください。
制限事項
Ververica Runtime(VVR)11.1 以降のみが、データインジェスチョンソースとしての SLS の使用をサポートしています。
SLS コネクタは、少なくとも1回のセマンティクスのみをサポートしています。
リソース効率を高めるには、ソースオペレーターの並列処理をシャード数以下の値に設定します。VVR 8.0.5 以前では、ソースの並列処理がシャード数を超え、シャード数が変更された場合、自動ジョブフェールオーバーが無効になり、シャードが消費されない可能性があります。
SQL
構文
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);WITH 句のコネクタオプション
一般
オプション
説明
データ型
必須?
デフォルト値
備考
connector
使用するコネクタ。
文字列
はい
デフォルト値なし
slsに設定します。endPoint
SLS のエンドポイント。
文字列
はい
デフォルト値なし
SLS の VPC アクセスアドレスを入力します。詳細については、「エンドポイント」をご参照ください。
説明デフォルトでは、Realtime Compute for Apache Flink はインターネットにアクセスできません。ただし、Alibaba Cloud は、VPC とインターネット間の通信を可能にする NAT ゲートウェイを提供しています。詳細については、「Realtime Compute for Apache Flink はどのようにインターネットにアクセスしますか?」をご参照ください。
インターネット経由で SLS にアクセスしないことをお勧めします。インターネット経由で SLS にアクセスする必要がある場合は、HTTPS を使用し、SLS の 転送アクセラレーション を有効にします。
project
SLS プロジェクトの名前。
文字列
はい
デフォルト値なし
logStore
SLS ログストアまたはメトリックストアの名前。
STRING
はい
デフォルト値なし
ログストアのデータは、メトリックストアの場合と同じ方法で消費されます。
accessId
Alibaba CloudアカウントのAccessKey ID。
STRING
はい
デフォルト値なし
詳細については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。
重要AccessKey ペアを保護するために、変数 を使用して AccessKey を構成します。
accessKey
Alibaba CloudアカウントのAccessKeyシークレット。
STRING
はい
デフォルト値なし
ソース固有
オプション
説明
データ型
必須?
デフォルト値
備考
enableNewSource
FLIP-27 リファクタリングソースインターフェースを使用するかどうかを指定します。
BOOLEAN
いいえ
falseこのオプションを有効にすると、ソースはシャードの変更に自動的に適応し、ソースサブタスク間でシャードをできるだけ均等に分散します。
説明VVR 8.0.9 以降のみがこのオプションをサポートしています。
重要VVR 11.1 以降では、このオプションはデフォルトで
trueに設定されています。オプションの値が変更された場合、ジョブは特定の状態から再開できません。これを解決するには、
consumerGroupオプションを構成して現在のコンシューマオフセットを記録し、ジョブを開始します。次に、consumeFromCheckpointをtrueに設定し、状態なしでジョブを開始します。ソースサブタスクが読み取り専用シャードからの読み取りを完了すると、他のシャードの消費を要求し続けます。これにより、ソースサブタスク間でシャードの消費が不均一になり、ジョブ全体のパフォーマンスに影響を与える可能性があります。この問題を軽減するには、ソースの並列処理の調整、スケジューリング戦略の最適化、または小さなシャードのマージによるシャード割り当ての簡素化を検討してください。
shardDiscoveryIntervalMs
シャードの変更を動的に検出する間隔。
LONG
いいえ
60000単位:ミリ秒。
動的検出を無効にするには、オプションを負の値に設定します。
説明このオプションの値は 1 分(60,000 ミリ秒)未満にすることはできません。
このオプションは、
enableNewSourceオプションがtrueに設定されている場合にのみ有効になります。VVR 8.0.9 以降のみがこのオプションをサポートしています。
startupMode
ソーステーブルの起動モード。
STRING
いいえ
timestamptimestamp:ログは指定された開始時刻から消費されます。latest:ログは最新のオフセットから消費されます。earliest:ログは最も古いオフセットから消費されます。consumer_group: 使用者グループに記録されたオフセットからログが消費されます。使用者グループがシャードの消費オフセットを記録していない場合、ログは最も古いオフセットから消費されます。
重要VVR 11.1 より前のバージョンでは、
consumer_groupはサポートされなくなりました。指定された使用者グループに記録されたオフセットからデータを消費するには、consumeFromCheckpointをtrueに設定します。
startTime
ログの消費を開始する時刻。
STRING
いいえ
現在時刻
このオプションの値は、
yyyy-MM-dd hh:mm:ss形式です。このオプションは、
startupModeがtimestampに設定されている場合にのみ有効になります。説明startTime パラメーターと stopTime パラメーターは、__timestamp__ フィールドではなく、SLS ソーステーブルの __receive_time__ フィールドに基づいて構成されます。
stopTime
ログの消費を停止する時刻。
文字列
いいえ
デフォルト値なし
このオプションの値は、
yyyy-MM-dd hh:mm:ss形式です。説明履歴ログのみを消費するには、このオプションを特定の履歴時点に設定します。将来の時点を使用すると、新しいログのインジェストが一時的に中断された場合に、消費が予期せず停止する可能性があります。観測可能な症状は、付随するエラーメッセージや例外なしにデータストリームが中断することです。
ログの消費が完了した後に Realtime Compute for Apache Flink プログラムを終了させたい場合は、
exitAfterFinishオプションも設定し、exitAfterFinishオプションをtrueに設定する必要があります。
consumerGroup
コンシューマグループの名前。
STRING
いいえ
デフォルト値なし
コンシューマグループは消費の進捗状況を記録します。カスタムコンシューマグループ名を指定できます。名前の形式は固定されていません。
説明コンシューマーグループは、共同消費のために複数のジョブで共有することはできません。ジョブごとに異なるコンシューマーグループを指定することをお勧めします。異なるジョブに同じコンシューマーグループを指定すると、すべてのデータが消費されます。Realtime Compute for Apache Flink が SLS からデータを消費する場合、データはコンシューマーグループ内でシャーディングされません。したがって、複数のジョブが同じコンシューマーグループを共有する場合、コンシューマーグループ内のすべてのメッセージが各ジョブによって消費されます。
consumeFromCheckpoint
指定されたコンシューマグループに保存されているチェックポイントからログを消費するかどうかを指定します。
STRING
いいえ
falsetrue:このオプションを true に設定する場合は、コンシューマーグループも指定する必要があります。Flink は、コンシューマーグループに保存されているチェックポイントからログを消費します。コンシューマーグループにチェックポイントが存在しない場合、Flink はstartTimeオプションで指定された時刻からログを消費します。false:Flink は、指定されたコンシューマーグループに保存されているチェックポイントからログを消費しません。
重要VVR 11.1 以降、このオプションはサポートされなくなりました。
startupModeをconsumer_groupに設定する必要があります。maxRetries
SLS からデータを読み取ることができない場合に許可される再試行回数。
文字列
いいえ
3batchGetSize
リクエストで読み取るロググループの数。
文字列
いいえ
100エラーを防ぐために、
batchGetSizeを 1000 未満の値に設定します。exitAfterFinish
データ消費の完了後に Realtime Compute for Apache Flink プログラムを終了するかどうかを指定します。
文字列
いいえ
falsetruefalse
query
重要このオプションは VVR 11.3 で非推奨になりましたが、後続のバージョンでは互換性が維持されます。
データ消費前にデータを前処理するために使用されるクエリ文。
STRING
いいえ
デフォルト値なし
このオプションを構成すると、データ消費が始まる前に SLS からデータをフィルタリングして、コストを削減し、データ処理効率を向上させることができます。
たとえば、
'query' = '*| where request_method = ''GET'''を指定すると、Realtime Compute for Apache Flink は、データ消費前にrequest_methodフィールド値がGETと等しいデータをフィルタリングします。説明このオプションを構成する際は、SPL 構文 を使用してください。
重要VVR 8.0.1 以降のみがこのオプションをサポートしています。
この機能は SLS から料金が発生します。詳細については、「課金」をご参照ください。
processor
SLS プロセッサ。このオプションと
queryの両方が設定されている場合、queryが優先されます。STRING
いいえ
デフォルト値なし
このオプションは機能的に
queryと同等ですが、このオプションの使用をお勧めします。たとえば、'processor' = 'test-filter-processor'を設定すると、データは Flink によって消費される前に SLS プロセッサによってフィルター処理されることを示します。説明このオプションを設定する際は、「SPL 構文」をご参照ください。
重要VVR 8.0.1 以降のみがこのオプションをサポートしています。
この機能は SLS から料金が発生します。詳細については、「課金」をご参照ください。
シンク固有
オプション
説明
データ型
必須?
デフォルト値
備考
topicField
フィールド名を指定します。このオプションの値は、__topic__ フィールドの値を上書きして、ログのトピックを示します。
文字列
いいえ
デフォルト値なし
このオプションの値は、テーブルに存在するフィールドである必要があります。
timeField
フィールド名を指定します。このオプションの値は、__timestamp__ フィールドの値を上書きして、ログの書き込み時刻を示します。
文字列
いいえ
現在時刻
このオプションは、既存の INT フィールドに設定する必要があります。フィールドが指定されていない場合は、現在時刻が使用されます。
sourceField
フィールド名を指定します。このオプションの値は、__source__ 属性フィールドの値を上書きして、ログの発信元を示します。たとえば、値はログを生成するマシンの IP アドレスです。
STRING
いいえ
デフォルト値なし
このオプションの値は、テーブルに存在するフィールドである必要があります。
partitionField
フィールド名を指定します。SLS にデータが書き込まれるとき、このパラメーターの値に基づいてハッシュ値が計算されます。同じハッシュ値を含むデータは、同じシャードに書き込まれます。
STRING
いいえ
デフォルト値なし
このオプションを指定しない場合、各データエントリは使用可能なシャードにランダムに書き込まれます。
buckets
partitionField オプションが指定されている場合に、ハッシュ値に基づいて再グループ化されるバケットの数。
文字列
いいえ
64有効値の範囲:[1,256]。このオプションの値は 2 の整数乗である必要があります。バケットの数は、シャードの数以上である必要があります。そうでない場合、データは特定のシャードに書き込まれません。
flushIntervalMs
データ書き込みがトリガーされる間隔。
STRING
いいえ
2000単位:ミリ秒。
writeNullProperties
null 値を空の文字列として SLS に書き込むかどうかを指定します。
BOOLEAN
いいえ
truetruefalse
説明VVR 8.0.6 以降のみがこのオプションをサポートしています。
データ型のマッピング
Apache Flink用Realtime Computeのデータ型 | SLS のデータ型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
整数 | |
BIGINT | |
フロート | |
DOUBLE | |
デシマル |
データインジェスチョン
制限事項
VVR 11.1 以降のみが、SLS からのデータインジェスチョンをサポートしています。
構文
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>構成オプション
オプション | 説明 | データ型 | 必須? | デフォルト値 | 備考 |
type | データソースのタイプ。 | 文字列 | はい | デフォルト値なし |
|
endpoint | エンドポイント。 | 文字列 | はい | デフォルト値なし | SLS の VPC アクセスアドレスを入力します。詳細については、「エンドポイント」をご参照ください。 説明
|
accessId | Alibaba Cloud アカウントの AccessKey ID。 | 文字列 | はい | デフォルト値なし | 「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。 重要 AccessKey ペアを保護するために、変数 を使用して AccessKey ID とシークレットを構成します。 |
accessKey | Alibaba Cloud アカウントの AccessKey シークレット。 | 文字列 | はい | デフォルト値なし | |
project | SLS プロジェクトの名前。 | 文字列 | はい | デフォルト値なし | |
logStore | ログストアまたはメトリックストアの名前。 | 文字列 | はい | デフォルト値なし | ログストアのデータは、メトリックストアの場合と同じ方法で消費されます。 |
schema.inference.strategy | スキーマ推論の戦略。 | 文字列 | いいえ |
|
|
maxPreFetchLogGroups | 初期スキーマ推論中に各シャードに対して読み取りおよび解析されるロググループの最大数。 | 整数 | いいえ |
| データがロードおよび処理される前に、コネクタは各シャードから指定された数のロググループを事前に消費して、スキーマを初期化しようとします。 |
shardDiscoveryIntervalMs | シャードの変更を動的に検出する間隔。 | Long | いいえ |
| このオプションを負の値に設定すると、動的検出が無効になります。単位:ミリ秒。 説明 このオプションの値は 1 分(60,000 ミリ秒)未満にすることはできません。 |
startupMode | 起動モード。 | 文字列 | いいえ |
|
|
startTime | ログ消費の開始時刻。 | 文字列 | いいえ | 現在時刻 | このオプションの値は yyyy-MM-dd hh:mm:ss 形式です。
説明
|
stopTime | ログ消費の停止時刻。 | 文字列 | いいえ | デフォルト値なし | このオプションの値は yyyy-MM-dd hh:mm:ss 形式です。 説明 ログ消費の完了時に Flink ジョブをキャンセルするには、 |
consumerGroup | コンシューマーグループの名前。 | 文字列 | いいえ | デフォルト値なし | コンシューマーグループは消費の進捗状況を記録します。カスタムコンシューマーグループ名を指定できます。名前の形式は固定されていません。 |
batchGetSize | リクエストで読み取るロググループの数。 | 整数 | いいえ |
| エラーを防ぐために、 |
maxRetries | SLS からの読み取りが失敗した後の再試行回数。 | 整数 | いいえ |
| |
exitAfterFinish | データ消費の完了後に Flink プログラムを終了するかどうかを指定します。 | ブール値 | いいえ |
|
|
query | Flink が SLS からデータを消費する前に、データを前処理するために使用されるクエリ文。 | 文字列 | いいえ | デフォルト値なし | このオプションを構成すると、消費前にデータをフィルタリングして、コストを削減し、データ処理効率を向上させることができます。 たとえば、 説明 SPL 構文 を使用してクエリを記述します。 重要
|
compressType | 圧縮タイプ。 | 文字列 | いいえ | デフォルト値なし | 有効な値:
|
timeZone |
| 文字列 | いいえ | デフォルト値なし | デフォルトでは、オフセットは追加されません。 |
regionId | SLS が存在するリージョン。 | 文字列 | いいえ | デフォルト値なし | 「サポートされているリージョン」をご参照ください。 |
signVersion | SLS リクエスト署名バージョン。 | 文字列 | いいえ | デフォルト値なし | 「リクエスト署名」をご参照ください。 |
shardModDivisor | SLS ログストアシャードから読み取るときに使用される除数。 | 整数 | いいえ |
| このオプションを構成するには、「シャード」をご参照ください。 |
shardModRemainder | SLS ログストアシャードから読み取るときに使用される剰余。 | 整数 | いいえ |
| このオプションを構成するには、「シャード」をご参照ください。 |
metadata.list | ダウンストリームに渡されるメタデータ列。 | 文字列 | いいえ | デフォルト値なし | 使用可能なメタデータフィールドには、 |
データ型のマッピング
データインジェスチョンのデータ型のマッピングは次のとおりです。
SLS データ型 | Flink CDC データ型 |
STRING | STRING |
スキーマの推論と進化
データの事前消費とスキーマの初期化
SLS コネクタは、現在のログストアのスキーマを維持します。ログストアからデータを読み取る前に、コネクタは各シャードから最大
maxPreFetchLogGroups個のロググループを事前に消費しようと試み、各ログのスキーマを解析およびマージすることでスキーマを初期化します。その後、データ消費が始まる前に、初期化されたスキーマに基づいてテーブル作成イベントが生成されます。説明各シャードについて、コネクタは現在時刻の 1 時間前にデータを消費してスキーマを解析しようとします。
プライマリキー
SLS ログにはプライマリキーが含まれていません。変換モジュールでテーブルにプライマリキーを手動で追加します。
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2スキーマの推論と進化
スキーマの初期化後、schema.inference.strategy が
staticに設定されている場合、コネクタはスキーマに基づいて各ログエントリを解析し、スキーマ変更イベントを生成しません。schema.inference.strategy がcontinuousに設定されている場合、コネクタは各ログエントリを解析し、物理列を推論し、現在のスキーマと比較します。推論されたスキーマが現在のスキーマと一致しない場合、スキーマは次のルールに従ってマージされます。推論されたスキーマに現在のスキーマにない物理列が含まれている場合、不足している列が現在のスキーマに追加され、null 許容列追加イベントが生成されます。
推論されたスキーマに現在のスキーマの特定の列が含まれていない場合、これらの列は保持され、それらの値は NULL に設定されます。
SLS コネクタはすべてのフィールドを文字列フィールドとして推論します。現在、列の追加のみがサポートされています。新しい列は、null 許容列として現在のスキーマに追加されます。
サンプルコード
ソーステーブルとシンクテーブル:
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;データインジェスチョンソース:
source: type: sls name: SLS Source endpoint: ${endpoint} project: ${project} logstore: ${logstore} accessId: ${accessId} accessKey: ${accessKey} sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
DataStream API
DataStream API を呼び出してデータの読み取りまたは書き込みを行う場合は、関連タイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。詳細については、「DataStream コネクタの使用方法」をご参照ください。
8.0.10 より前の VVR バージョンを使用している場合、ジョブの起動時に依存関係の JAR パッケージが見つからない場合があります。これを解決するには、対応する uber JAR パッケージを追加の依存関係として含めます。
SLS からデータを読み取る
Realtime Compute for Apache Flink の VVR は、SLS からデータを読み取るための SourceFunction の実装クラス SlsSourceFunction を提供します。サンプルコード:
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// ストリーミング実行環境を設定します
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// SLS ソースとシンクを作成して追加します。
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source"); // SLSストリームソース
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint"); // エンドポイント
accessInfo.setProjectName("yourProject"); // プロジェクト名
accessInfo.setLogstore("yourLogStore"); // ログストア
accessInfo.setAccessId("yourAccessId"); // アクセスID
accessInfo.setAccessKey("yourAccessKey"); // アクセスキー
// バッチ取得サイズを指定する必要があります。
accessInfo.setBatchGetSize(10);
// オプションパラメータ
accessInfo.setConsumerGroup("yourConsumerGroup"); // コンシューマグループ
accessInfo.setMaxRetries(3); // 最大再試行回数
// 消費開始時刻、現在時刻に設定。
int startInSec = (int) (new Date().getTime() / 1000);
// 消費停止時刻、-1 は停止しないことを意味します。
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}SLS にデータを書き込む
Realtime Compute for Apache Flink の VVR は、SLS にデータを書き込むための OutputFormat の実装クラス SLSOutputFormat を提供します。サンプルコード:
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink"); // SLSストリームシンク
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint"); // エンドポイント
conf.setString(SLSOptions.PROJECT, "yourProject"); // プロジェクト名
conf.setString(SLSOptions.LOGSTORE, "yourLogStore"); // ログストア
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId"); // アクセスID
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey"); // アクセスキー
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString()); // seed.toString() 用のテストメッセージです
record.setContent(logItem);
return record;
}
}XML
異なるバージョンの Simple Log Service DataStreamコネクタ は、Maven 中央リポジトリに保存されています。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>