Topik ini menjelaskan cara menggunakan konektor Layanan Log Sederhana (SLS).
Informasi latar belakang
Layanan Log Sederhana adalah layanan pencatatan data ujung ke ujung yang dikembangkan oleh Alibaba Cloud. Layanan ini memungkinkan pengumpulan, konsumsi, pengiriman, penanyakan, dan analisis data log secara efisien, meningkatkan efisiensi operasional serta menyediakan kemampuan untuk memproses sejumlah besar data log.
Tabel berikut menjelaskan kemampuan yang didukung oleh konektor SLS.
Kategori | Deskripsi |
Jenis yang Didukung | Tabel sumber dan tabel sink |
Mode operasi | Mode streaming |
Metrik | Tidak tersedia |
Format data | Tidak tersedia |
Jenis API | SQL, DataStream API, dan API YAML pengambilan data |
Pembaruan atau penghapusan data dalam tabel sink | Data dalam tabel sink tidak dapat diperbarui atau dihapus. Data hanya dapat dimasukkan ke dalam tabel sink. |
Fitur
Konektor sumber SLS dapat digunakan untuk membaca bidang atribut pesan. Tabel berikut menjelaskan bidang atribut yang didukung oleh konektor sumber SLS.
Bidang | Tipe | Deskripsi |
__source__ | STRING METADATA VIRTUAL | Sumber pesan. |
__topic__ | STRING METADATA VIRTUAL | Topik pesan. |
__timestamp__ | BIGINT METADATA VIRTUAL | Waktu ketika log dihasilkan. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | Tag pesan. Atribut |
Prasyarat
Proyek dan Logstore telah dibuat. Untuk informasi lebih lanjut, lihat Buat proyek dan logstore.
Batasan
Hanya Ververica Runtime (VVR) 11.1 atau versi lebih baru yang mendukung penggunaan SLS sebagai sumber pengambilan data.
Konektor SLS hanya mendukung semantik setidaknya sekali.
Untuk meningkatkan efisiensi sumber daya, atur paralelisme operator sumber ke nilai yang sama dengan atau kurang dari jumlah shard. Di VVR 8.0.5 atau lebih lama, jika paralelisme sumber melebihi jumlah shard dan jumlah shard berubah, pemulihan pekerjaan otomatis mungkin menjadi tidak valid, yang berpotensi menyebabkan shard yang tidak terkonsumsi.
SQL
Sintaks
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);Opsi konektor dalam klausa WITH
Umum
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
connector
Konektor yang akan digunakan.
String
Ya
Tidak ada nilai default
Atur ke
sls.endPoint
Titik akhir SLS.
String
Ya
Tidak ada nilai default
Masukkan alamat akses VPC SLS. Untuk informasi lebih lanjut, lihat Titik akhir.
CatatanSecara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Namun, Alibaba Cloud menyediakan NAT gateway untuk memungkinkan komunikasi antara VPC dan Internet. Untuk informasi lebih lanjut, lihat Bagaimana Realtime Compute for Apache Flink mengakses Internet?.
Kami sarankan agar Anda tidak mengakses SLS melalui Internet. Jika Anda perlu mengakses SLS melalui Internet, gunakan HTTPS dan aktifkan akselerasi transfer untuk SLS.
project
Nama proyek SLS.
String
Ya
Tidak ada nilai default
logStore
Nama Logstore atau Metricstore SLS.
STRING
Ya
Tidak ada nilai default
Data dalam Logstore dikonsumsi dengan cara yang sama seperti dalam Metricstore.
accessId
ID AccessKey akun Alibaba Cloud Anda.
STRING
Ya
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Bagaimana cara melihat pasangan AccessKey suatu akun?
PentingUntuk melindungi pasangan AccessKey Anda, konfigurasikan AccessKey Anda menggunakan variabel.
accessKey
Rahasia AccessKey akun Alibaba Cloud Anda.
STRING
Ya
Tidak ada nilai default
Spesifik Sumber
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
enableNewSource
Menentukan apakah akan menggunakan antarmuka sumber refactor FLIP-27.
BOOLEAN
Tidak
falseAktifkan opsi ini dan sumber secara otomatis menyesuaikan dengan perubahan shard dan mendistribusikan shard di seluruh subtugas sumber seimbang mungkin.
CatatanHanya VVR 8.0.9 atau lebih baru yang mendukung opsi ini.
PentingMulai dari VVR 11.1, opsi ini diatur ke
truesecara default.Jika nilai opsi berubah, pekerjaan Anda tidak dapat dilanjutkan dari status tertentu. Untuk menyelesaikan ini, konfigurasikan opsi
consumerGroupuntuk mencatat offset konsumen saat ini dan mulai pekerjaan Anda. Kemudian, aturconsumeFromCheckpointketruedan mulai pekerjaan tanpa status.Saat subtugas sumber selesai membaca dari shard read-only, mereka terus meminta mengonsumsi shard lain. Hal ini dapat menyebabkan konsumsi shard yang tidak merata di antara subtugas sumber, memengaruhi kinerja keseluruhan pekerjaan. Untuk mengurangi masalah ini, pertimbangkan menyesuaikan paralelisme sumber, mengoptimalkan strategi penjadwalan Anda, atau menggabungkan shard kecil untuk menyederhanakan penugasan shard.
shardDiscoveryIntervalMs
Interval deteksi dinamis perubahan shard.
LONG
Tidak
60000Unit: milidetik.
Untuk menonaktifkan deteksi dinamis, atur opsi ke nilai negatif.
CatatanNilai opsi ini tidak boleh kurang dari 1 menit (atau 60.000 milidetik).
Opsi ini hanya berlaku jika opsi
enableNewSourcediatur ketrue.Hanya VVR 8.0.9 atau lebih baru yang mendukung opsi ini.
startupMode
Mode startup tabel sumber.
STRING
Tidak
timestamptimestamp: Log dikonsumsi dari waktu mulai yang ditentukan.latest: Log dikonsumsi dari offset terbaru.earliest: Log dikonsumsi dari offset paling awal.consumer_group: Log dikonsumsi dari offset yang direkam dalam kelompok konsumen. Jika kelompok konsumen tidak mencatat offset konsumsi shard, log dikonsumsi dari offset paling awal.
PentingDi versi VVR sebelum 11.1,
consumer_grouptidak lagi didukung. Untuk mengonsumsi data dari offset yang direkam dalam kelompok konsumen tertentu, aturconsumeFromCheckpointketrue.
startTime
Waktu mulai konsumsi log.
STRING
Tidak
Waktu saat ini
Nilai opsi ini dalam format
yyyy-MM-dd hh:mm:ss.Opsi ini berlaku hanya jika
startupModediatur ketimestamp.CatatanParameter startTime dan stopTime dikonfigurasi berdasarkan bidang __receive_time__ dalam tabel sumber SLS, bukan pada bidang __timestamp__.
stopTime
Waktu berhenti konsumsi log.
String
Tidak
Tidak ada nilai default
Nilai opsi ini dalam format
yyyy-MM-dd hh:mm:ss.CatatanUntuk hanya mengonsumsi log historis, atur opsi ini ke titik waktu historis tertentu. Menggunakan titik waktu di masa depan dapat menyebabkan konsumsi berhenti secara tak terduga jika injeksi log baru sementara terganggu. Gejala yang teramati adalah gangguan aliran data tanpa pesan kesalahan atau pengecualian yang menyertainya.
Jika Anda ingin program Realtime Compute for Apache Flink keluar setelah konsumsi log selesai, Anda juga harus mengonfigurasi opsi
exitAfterFinishdan atur opsiexitAfterFinishketrue.
consumerGroup
Nama grup konsumen.
STRING
Tidak
Tidak ada nilai default
Grup konsumen mencatat kemajuan konsumsi. Anda dapat menentukan nama grup konsumen kustom. Format nama tidak tetap.
CatatanGrup konsumen tidak dapat dibagikan oleh beberapa pekerjaan untuk konsumsi kolaboratif. Kami sarankan Anda menentukan grup konsumen berbeda untuk pekerjaan berbeda. Jika Anda menentukan grup konsumen yang sama untuk pekerjaan berbeda, semua data dikonsumsi. Ketika Realtime Compute for Apache Flink mengonsumsi data dari SLS, data tidak di-shard dalam grup konsumen. Oleh karena itu, jika beberapa pekerjaan berbagi grup konsumen yang sama, semua pesan dalam grup konsumen dikonsumsi oleh setiap pekerjaan.
consumeFromCheckpoint
Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan dalam grup konsumen yang ditentukan.
STRING
Tidak
falsetrue: Jika Anda mengatur opsi ini ke true, Anda juga harus menentukan grup konsumen. Flink mengonsumsi log dari titik kontrol yang disimpan dalam grup konsumen. Jika tidak ada titik kontrol dalam grup konsumen, Flink mengonsumsi log dari waktu yang ditentukan oleh opsistartTime.false: Flink tidak mengonsumsi log dari titik kontrol yang disimpan dalam grup konsumen tertentu.
PentingMulai dari VVR 11.1, opsi ini tidak lagi didukung. Anda perlu mengatur
startupModekeconsumer_group.maxRetries
Jumlah percobaan ulang yang diizinkan ketika gagal membaca data dari SLS.
String
Tidak
3batchGetSize
Jumlah grup log yang dibaca dalam satu permintaan.
String
Tidak
100Untuk mencegah kesalahan, atur
batchGetSizeke nilai kurang dari 1000.exitAfterFinish
Menentukan apakah program Realtime Compute for Apache Flink keluar setelah konsumsi data selesai.
String
Tidak
falsetruefalse
query
PentingOpsi ini sudah ditinggalkan di VVR 11.3, tetapi versi berikutnya tetap kompatibel.
Pernyataan query yang digunakan untuk memproses data sebelum konsumsi data dimulai.
STRING
Tidak
Tidak ada nilai default
Mengonfigurasi opsi ini untuk menyaring data dari SLS sebelum konsumsi data dimulai, mengurangi biaya dan meningkatkan efisiensi pemrosesan data.
Sebagai contoh, jika Anda menentukan
'query' = '*| where request_method = ''GET''', Realtime Compute for Apache Flink menyaring data di mana nilai bidangrequest_methodsama denganGETsebelum konsumsi data.CatatanGunakan sintaks SPL saat mengonfigurasi opsi ini.
PentingHanya VVR 8.0.1 atau lebih baru yang mendukung opsi ini.
Fitur ini membebankan biaya dari SLS. Untuk detailnya, lihat Tagihan.
processor
Prosesor SLS. Jika kedua opsi ini dan
querydiatur, makaquerymemiliki prioritas lebih tinggi.STRING
Tidak
Tidak ada nilai default
Opsi ini secara fungsional setara dengan
query, tetapi kami sarankan menggunakan opsi ini. Sebagai contoh, pengaturan'processor' = 'test-filter-processor'menunjukkan bahwa data akan difilter oleh prosesor SLS sebelum dikonsumsi oleh Flink.CatatanGunakan sintaksis SPL saat mengonfigurasi opsi ini.
PentingHanya VVR 8.0.1 atau lebih baru yang mendukung opsi ini.
Fitur ini membebankan biaya dari SLS. Untuk detailnya, lihat Tagihan.
Spesifik Sink
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
topicField
Menentukan nama bidang. Nilai opsi ini menimpa nilai bidang __topic__ untuk menunjukkan topik log.
String
Tidak
Tidak ada nilai default
Nilai opsi ini harus berupa bidang yang ada dalam tabel.
timeField
Menentukan nama bidang. Nilai opsi ini menimpa nilai bidang __timestamp__ untuk menunjukkan waktu penulisan log.
String
Tidak
Waktu saat ini
Opsi ini harus diatur ke bidang INT yang ada. Jika tidak ada bidang yang ditentukan, waktu saat ini digunakan.
sourceField
Menentukan nama bidang. Nilai opsi ini menimpa nilai bidang atribut __source__ untuk menunjukkan asal log. Sebagai contoh, nilainya adalah alamat IP mesin yang menghasilkan log.
STRING
Tidak
Tidak ada nilai default
Nilai opsi ini harus berupa bidang yang ada dalam tabel.
partitionField
Menentukan nama bidang. Nilai hash dihitung berdasarkan nilai parameter ini ketika data ditulis ke SLS. Data yang memiliki nilai hash yang sama ditulis ke shard yang sama.
STRING
Tidak
Tidak ada nilai default
Jika Anda tidak menentukan opsi ini, setiap entri data secara acak ditulis ke shard yang tersedia.
buckets
Jumlah bucket yang dikelompokkan ulang berdasarkan nilai hash ketika opsi partitionField ditentukan.
String
Tidak
64Rentang nilai valid: [1,256]. Nilai opsi ini harus merupakan pangkat dua bilangan bulat. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, tidak ada data yang akan ditulis ke shard tertentu.
flushIntervalMs
Interval di mana penulisan data dipicu.
STRING
Tidak
2000Unit: milidetik.
writeNullProperties
Menentukan apakah akan menulis nilai null sebagai string kosong ke SLS.
BOOLEAN
Tidak
truetruefalse
CatatanHanya VVR 8.0.6 atau lebih baru yang mendukung opsi ini.
Pemetaan tipe data
Tipe data Realtime Compute for Apache Flink | Tipe data SLS |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
Pengambilan data
Batasan
Hanya VVR 11.1 atau lebih baru yang mendukung pengambilan data dari SLS.
Sintaks
source:
type: sls
name: Sumber SLS
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>Opsi konfigurasi
Opsi | Deskripsi | Tipe data | Wajib? | Nilai default | Catatan |
jenis | Jenis sumber data. | String | Ya | Tidak ada nilai default | Atur ke |
Titik akhir | Titik akhir. | String | Ya | Tidak ada nilai default | Masukkan alamat akses VPC untuk SLS. Untuk informasi lebih lanjut, lihat Endpoints. Catatan
|
accessId | ID AccessKey dari akun Alibaba Cloud Anda. | String | Ya | Tidak ada nilai default | Lihat Bagaimana cara melihat pasangan AccessKey sebuah akun? Penting Untuk melindungi pasangan AccessKey Anda, gunakan variabel untuk mengonfigurasi ID AccessKey dan rahasia. |
accessKey | Rahasia AccessKey dari akun Alibaba Cloud Anda. | String | Ya | Tidak ada nilai default | |
Proyek | Nama dari proyek SLS. | String | Ya | Tidak ada nilai default | |
logStore | Nama dari sebuah Logstore atau Metricstore. | String | Ya | Tidak ada nilai default | Data dalam sebuah Logstore dikonsumsi dengan cara yang sama seperti dalam sebuah Metricstore. |
schema.inference.strategy | Strategi untuk inferensi skema. | String | Tidak |
|
|
maxPreFetchLogGroups | Jumlah maksimum grup log yang dibaca dan dianalisis untuk setiap shard selama inferensi skema awal. | Integer | Tidak |
| Sebelum data dimuat dan diproses, konektor mencoba mengonsumsi sejumlah grup log tertentu dari setiap shard terlebih dahulu untuk menginisialisasi skema. |
shardDiscoveryIntervalMs | Interval di mana perubahan pada shard dideteksi secara dinamis. | Long | Tidak |
| Atur opsi ini ke nilai negatif untuk menonaktifkan deteksi dinamis. Satuan: milidetik. Catatan Nilai dari opsi ini tidak boleh kurang dari 1 menit (yaitu, 60.000 milidetik). |
startupMode | Mode startup. | String | Tidak |
|
|
startTime | Waktu mulai konsumsi log. | String | Tidak | Waktu saat ini | Nilai dari opsi ini dalam format yyyy-MM-dd hh:mm:ss. Hanya berlaku jika Catatan Opsi |
stopTime | Waktu di mana konsumsi log dihentikan. | String | Tidak | Tidak ada nilai default | Nilai dari opsi ini berada dalam format yyyy-MM-dd hh:mm:ss. Catatan Untuk membatalkan pekerjaan Flink setelah selesai konsumsi log, atur |
consumerGroup | Nama grup konsumen. | String | Tidak | Tidak ada nilai default | Grup konsumen mencatat kemajuan konsumsi. Anda dapat menentukan nama grup konsumen kustom. Format nama tidak tetap. |
batchGetSize | Jumlah grup log yang dibaca dalam satu permintaan. | Integer | Tidak |
| Untuk mencegah kesalahan, atur |
maxRetries | Jumlah percobaan ulang setelah pembacaan dari SLS gagal. | Integer | Tidak |
| |
exitAfterFinish | Menentukan apakah program Flink keluar setelah konsumsi data selesai. | Boolean | Tidak |
|
|
query | Pernyataan query yang digunakan untuk memproses data sebelum Flink mengonsumsi data dari SLS. | String | Tidak | Tidak ada nilai default | Mengonfigurasi opsi ini dapat membantu Anda menyaring data sebelum dikonsumsi, mengurangi biaya, dan meningkatkan efisiensi pemrosesan data. Sebagai contoh, Catatan Gunakan sintaksis SPL untuk menulis query. Penting
|
compressType | Tipe kompresi. | String | Tidak | Tidak ada nilai default | Nilai yang valid:
|
zonaWaktu | Zona waktu untuk | String | Tidak | Tidak ada nilai default | Secara default, tidak ada offset yang ditambahkan. |
regionId | Wilayah tempat SLS berada. | String | Tidak | Tidak ada nilai default | Lihat Wilayah yang didukung. |
signVersion | Versi tanda tangan permintaan SLS. | String | Tidak | Tidak ada nilai default | Lihat Tanda tangan permintaan. |
shardModDivisor | Pembagi yang digunakan saat membaca dari shard Logstore SLS. | Integer | Tidak |
| Lihat Shard untuk mengonfigurasi opsi ini. |
shardModRemainder | Sisa yang digunakan saat membaca dari shard Logstore SLS. | Integer | Tidak |
| Lihat Shard untuk mengonfigurasi opsi ini. |
metadata.list | Kolom metadata yang dilewatkan ke downstream. | String | Tidak | Tidak ada nilai default | Bidang metadata yang tersedia termasuk |
Pemetaan tipe data
Pemetaan tipe data untuk pengambilan data adalah sebagai berikut:
Tipe data SLS | Tipe data Flink CDC |
STRING | STRING |
Inferensi dan evolusi skema
Konsumsi Data Awal dan Inisialisasi Skema
Konektor SLS mempertahankan skema dari Logstore saat ini. Sebelum membaca data dari Logstore, konektor mencoba melakukan konsumsi awal hingga
maxPreFetchLogGroupsgrup log dari setiap shard dan menginisialisasi skema dengan cara menguraikan dan menggabungkan skema dari setiap log. Selanjutnya, sebelum konsumsi data dimulai, sebuah event pembuatan tabel dihasilkan berdasarkan skema yang telah diinisialisasi.CatatanUntuk setiap shard, konektor mencoba mengonsumsi data satu jam sebelum waktu saat ini untuk menguraikan skema.
Kunci Utama
Log SLS tidak mengandung kunci utama. Tambahkan secara manual kunci utama ke tabel di modul transform:
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2Inferensi dan Evolusi Skema
Setelah inisialisasi skema, jika schema.inference.strategy disetel ke
static, konektor menguraikan setiap entri log berdasarkan skema dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy disetel kecontinuous, konektor menguraikan setiap entri log, melakukan inferensi kolom fisik, dan membandingkannya dengan skema saat ini. Jika skema yang diinferensikan tidak konsisten dengan skema saat ini, skema digabungkan sesuai aturan berikut:Jika skema yang diinferensikan mengandung kolom fisik yang tidak ada dalam skema saat ini, kolom yang hilang ditambahkan ke skema saat ini, dan event penambahan kolom nullable dihasilkan.
Jika skema yang diinferensikan tidak mengandung kolom tertentu dalam skema saat ini, kolom tersebut dipertahankan dan nilainya disetel ke NULL.
Konektor SLS menginferensikan semua bidang sebagai bidang string. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan ke skema saat ini sebagai kolom nullable.
Kode contoh
Tabel Sumber dan Tabel Sink:
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;Sumber Pengambilan Data:
source: type: sls name: Sumber SLS endpoint: ${endpoint} project: ${project} logstore: ${logstore} accessId: ${accessId} accessKey: ${accessKey} sink: type: values name: Sink Nilai print.enabled: true sink.print.logger: true
DataStream API
Jika Anda ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Penggunaan konektor DataStream.
Jika Anda menggunakan versi VVR sebelum 8.0.10, Anda mungkin mengalami paket JAR dependensi yang hilang saat startup pekerjaan. Untuk menyelesaikannya, sertakan paket uber JAR yang sesuai sebagai dependensi tambahan.
Baca data dari SLS
VVR dari Realtime Compute for Apache Flink menyediakan kelas implementasi SlsSourceFunction dari SourceFunction untuk membaca data dari SLS. Contoh kode:
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Mengatur lingkungan eksekusi streaming
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Membuat dan menambahkan sumber dan sink SLS.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// Ukuran batch get harus diberikan.
accessInfo.setBatchGetSize(10);
// Parameter opsional
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// Waktu mulai konsumsi, disetel ke waktu saat ini.
int startInSec = (int) (new Date().getTime() / 1000);
// Waktu berhenti konsumsi, -1 berarti tidak pernah berhenti.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}Tulis data ke SLS
VVR dari Realtime Compute for Apache Flink menyediakan kelas implementasi SLSOutputFormat dari OutputFormat untuk menulis data ke SLS. Contoh kode:
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
DataStream Connector Layanan Log Sederhana dari versi yang berbeda disimpan di repositori pusat Maven.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>