Topik ini menjelaskan cara menggunakan konektor Simple Log Service (SLS).
Informasi latar belakang
Simple Log Service adalah layanan end-to-end untuk data log yang membantu Anda mengumpulkan, mengonsumsi, mengirimkan, mengkueri, dan menganalisis data log secara efisien. Layanan ini meningkatkan efisiensi operasi dan pemeliharaan serta mendukung pemrosesan log skala besar.
Konektor SLS mendukung jenis informasi berikut.
Kategori | Deskripsi |
Jenis yang didukung | Tabel sumber dan tabel sink |
Mode eksekusi | Hanya mode streaming |
Metrik pemantauan | N/A |
Format data | N/A |
Jenis API | SQL, DataStream API, dan YAML data ingestion |
Memperbarui atau menghapus data di tabel sink | Anda tidak dapat memperbarui atau menghapus data di tabel sink. Anda hanya dapat menyisipkan data ke tabel sink. |
Fitur
Konektor sumber SLS membaca bidang atribut pesan secara langsung. Tabel berikut mencantumkan bidang atribut yang didukung.
Nama bidang | Tipe | Deskripsi |
__source__ | STRING METADATA VIRTUAL | Sumber pesan. |
__topic__ | STRING METADATA VIRTUAL | Topik pesan. |
__timestamp__ | BIGINT METADATA VIRTUAL | Waktu saat log dihasilkan. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | Tag pesan. Untuk atribut |
Prasyarat
Anda telah membuat proyek SLS dan Logstore. Untuk informasi selengkapnya, lihat Buat proyek dan Logstore.
Batasan
Hanya Ververica Runtime (VVR) 11.1 atau yang lebih baru yang mendukung penggunaan SLS sebagai sumber data ingestion.
Konektor SLS hanya mendukung semantik at-least-once.
Hindari mengatur parallelisme sumber lebih tinggi daripada jumlah shard karena hal tersebut akan membuang sumber daya. Pada VVR 8.0.5 atau versi sebelumnya, jika jumlah shard berubah setelah Anda menetapkan parallelisme tinggi, failover otomatis mungkin gagal, sehingga menyebabkan beberapa shard tidak dikonsumsi.
SQL
Sintaks
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);DENGAN parameter
Umum
Parameter
Deskripsi
Tipe data
Wajib?
Nilai default
Keterangan
connector
Jenis tabel.
String
Ya
Tidak ada
Atur ke sls.
endPoint
Alamat endpoint.
String
Ya
Tidak ada
Masukkan endpoint VPC SLS. Untuk informasi selengkapnya, lihat Endpoints.
CatatanSecara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Namun, Alibaba Cloud menyediakan gerbang NAT untuk mengaktifkan komunikasi antara VPC dan Internet. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses Internet?.
Hindari mengakses SLS melalui Internet. Jika harus melakukannya, gunakan HTTPS dan aktifkan transfer acceleration untuk SLS.
project
Nama proyek SLS.
String
Ya
Tidak ada
Tidak ada.
logStore
Nama Logstore atau Metricstore SLS.
String
Ya
Tidak ada
Data di Logstore dikonsumsi dengan cara yang sama seperti di Metricstore.
accessId
ID AccessKey Akun Alibaba Cloud Anda.
String
Ya
Tidak ada nilai default
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.
PentingUntuk melindungi pasangan AccessKey Anda, gunakan variabel untuk mengonfigurasi AccessKey Anda.
accessKey
Rahasia AccessKey Akun Alibaba Cloud Anda.
String
Ya
Tidak ada nilai default
Khusus sumber
Parameter
Deskripsi
Tipe data
Wajib?
Nilai default
Keterangan
enableNewSource
Menentukan apakah akan menggunakan antarmuka sumber baru yang mengimplementasikan FLIP-27.
Boolean
Tidak
false
Sumber baru menyesuaikan secara otomatis terhadap perubahan shard dan mendistribusikan shard secara merata di semua subtugas sumber.
PentingOpsi ini hanya didukung di VVR 8.0.9 atau yang lebih baru. Mulai dari VVR 11.1, opsi ini secara default bernilai true.
Jika Anda mengubah opsi ini, pekerjaan Anda tidak dapat dilanjutkan dari state yang disimpan. Sebagai solusi, pertama-tama jalankan pekerjaan Anda dengan opsi consumerGroup untuk mencatat offset konsumen saat ini. Kemudian, atur consumeFromCheckpoint ke true dan mulai ulang pekerjaan Anda tanpa state.
Jika SLS berisi shard read-only, beberapa subtugas Flink mungkin selesai membaca dari shard tersebut lalu meminta shard lain yang belum dibaca. Hal ini dapat menyebabkan distribusi shard yang tidak merata di antara subtugas, sehingga mengurangi efisiensi konsumsi dan kinerja sistem secara keseluruhan. Untuk mengurangi ketidakseimbangan ini, sesuaikan parallelisme sumber, optimalkan penjadwalan tugas, atau gabungkan shard kecil.
shardDiscoveryIntervalMs
Interval deteksi dinamis perubahan shard. Satuan: milidetik.
Long
Tidak
60000
Atur opsi ini ke nilai negatif untuk menonaktifkan deteksi dinamis.
CatatanOpsi ini harus minimal 1 menit (60.000 milidetik).
Opsi ini hanya berlaku jika enableNewSource diatur ke true.
Opsi ini hanya didukung di VVR 8.0.9 atau yang lebih baru.
startupMode
Mode startup tabel sumber.
String
Tidak
timestamp
timestamp(default): Mengonsumsi log mulai dari waktu yang ditentukan.latest: Mengonsumsi log mulai dari offset terbaru.earliest: Mengonsumsi log mulai dari offset paling awal.consumer_group: Mengonsumsi log mulai dari offset yang direkam dalam kelompok konsumen. Jika tidak ada offset yang direkam untuk suatu shard, konsumsi log dimulai dari offset paling awal.
PentingPada versi VVR sebelum 11.1, nilai consumer_group tidak didukung. Untuk mengonsumsi log dari offset yang direkam oleh kelompok konsumen tertentu, atur
consumeFromCheckpointketrue. Dalam kasus ini, mode startup ini tidak berlaku.
startTime
Waktu mulai mengonsumsi log.
String
Tidak
Waktu saat ini
Format:
yyyy-MM-dd hh:mm:ss.Opsi ini hanya berlaku jika
startupModediatur ketimestamp.CatatanOpsi startTime dan stopTime didasarkan pada bidang __receive_time__ di SLS, bukan bidang __timestamp__.
stopTime
Waktu akhir konsumsi log.
String
Tidak
Tidak ada
Format:
yyyy-MM-dd hh:mm:ss.CatatanGunakan opsi ini hanya untuk mengonsumsi log historis. Atur ke titik waktu di masa lalu. Jika Anda mengaturnya ke waktu di masa depan, konsumsi mungkin berhenti secara tak terduga jika tidak ada log baru yang ditulis. Hal ini tampak sebagai aliran data yang terputus tanpa pesan error.
Untuk keluar dari program Flink setelah konsumsi log selesai, atur juga exitAfterFinish ke true.
consumerGroup
Nama kelompok konsumen.
String
Tidak
Tidak ada
Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom apa pun.
CatatanAnda tidak dapat berbagi kelompok konsumen di beberapa pekerjaan untuk konsumsi kolaboratif. Gunakan kelompok konsumen berbeda untuk pekerjaan berbeda. Jika Anda menggunakan kelompok konsumen yang sama untuk pekerjaan berbeda, setiap pekerjaan akan mengonsumsi semua data. Saat Flink mengonsumsi data dari SLS, shard tidak ditugaskan melalui kelompok konsumen SLS. Oleh karena itu, setiap pekerjaan secara independen mengonsumsi semua pesan, meskipun menggunakan kelompok konsumen yang sama.
consumeFromCheckpoint
Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen tertentu.
String
Tidak
false
true: Jika Anda mengatur parameter ini ke true, Anda juga harus menentukan kelompok konsumen. Flink mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen. Jika tidak ada checkpoint di kelompok konsumen, Flink mengonsumsi log dari waktu yang ditentukan oleh parameter startTime.false(default): Flink tidak mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen tertentu.
PentingOpsi ini tidak didukung di VVR 11.1 atau yang lebih baru. Untuk VVR 11.1 atau yang lebih baru, atur
startupModekeconsumer_group.maxRetries
Jumlah percobaan ulang setelah pembacaan dari SLS gagal.
String
Tidak
3
Tidak ada.
batchGetSize
Jumlah kelompok log yang dibaca per permintaan.
String
Tidak
100
Atur
batchGetSizeke nilai kurang dari 1000. Jika tidak, terjadi error.exitAfterFinish
Menentukan apakah program Flink keluar setelah konsumsi data selesai.
String
Tidak
false
true: Program Flink keluar setelah konsumsi data selesai.false(default): Program Flink tidak keluar setelah konsumsi data selesai.
query
PentingOpsi ini sudah ditinggalkan di VVR 11.3 tetapi tetap kompatibel di versi yang lebih baru.
Pernyataan kueri yang digunakan untuk memproses data sebelum mengonsumsi data SLS.
String
Tidak
Tidak ada nilai default
Gunakan opsi query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari pemuatan semua data ke Flink, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan.
Contohnya,
'query' = '*| where request_method = ''GET'''memfilter log di mana bidang request_method bernilai GET sebelum Flink membacanya.CatatanTulis kueri menggunakan sintaks SPL.
PentingOpsi ini hanya didukung di VVR 8.0.1 atau yang lebih baru.
Fitur ini dikenai biaya SLS. Untuk detailnya, lihat Tagihan.
processor
Prosesor konsumen SLS. Jika query dan processor keduanya diatur, query memiliki prioritas lebih tinggi.
String
Tidak
Tidak ada
Gunakan opsi processor untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari pemuatan semua data ke Flink, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan. Kami merekomendasikan menggunakan processor daripada query.
Contohnya,
'processor' = 'test-filter-processor'menerapkan prosesor konsumen SLS untuk memfilter data sebelum Flink membacanya.CatatanTulis processor menggunakan sintaks SPL. Untuk detail tentang membuat dan memperbarui prosesor konsumen SLS, lihat Kelola prosesor konsumen.
PentingOpsi ini hanya didukung di VVR 11.3 atau yang lebih baru.
Fitur ini dikenai biaya SLS. Untuk detailnya, lihat Tagihan.
Khusus sink
parameter
Deskripsi
Tipe data
Wajib?
Nilai default
Keterangan
topicField
Nama bidang yang nilainya menggantikan bidang __topic__. Ini menunjukkan topik log.
String
Tidak
Tidak ada
Parameter ini menentukan bidang yang sudah ada di tabel.
timeField
Nama bidang yang nilainya menggantikan bidang __timestamp__. Ini menunjukkan waktu penulisan log.
String
Tidak
Waktu saat ini
Bidang ini harus ada di tabel dan tipenya harus INT. Jika tidak ditentukan, digunakan waktu saat ini.
sourceField
Nama bidang yang nilainya menggantikan bidang __source__. Ini menunjukkan sumber log, seperti alamat IP mesin yang menghasilkan log.
String
Tidak
Tidak ada
Bidang ini harus ada di tabel.
partitionField
Nama bidang. Nilai hash dihitung dari nilai bidang ini saat menulis data. Data dengan nilai hash yang sama ditulis ke shard yang sama.
String
Tidak
Tidak ada nilai default
Jika tidak ditentukan, setiap entri data ditulis secara acak ke shard yang tersedia.
buckets
Jumlah bucket untuk dikelompokkan ulang berdasarkan nilai hash saat partitionField ditentukan.
String
Tidak
64
Nilai valid: [1, 256]. Nilainya harus merupakan pangkat dari 2. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak menerima data.
flushIntervalMs
Interval pemicuan penulisan data.
String
Tidak
2000
Satuan: milidetik.
writeNullProperties
Menentukan apakah nilai null ditulis sebagai string kosong ke SLS.
Boolean
Tidak
true
true(default): Menulis nilai null sebagai string kosong.false: Tidak menulis bidang yang nilainya dihitung sebagai null.
CatatanOpsi ini hanya didukung di VVR 8.0.6 atau yang lebih baru.
Pemetaan tipe data
Tipe data Flink | Tipe data SLS |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
Data ingestion (pratinjau publik)
Batasan
Hanya didukung oleh mesin komputasi waktu nyata Ververica Runtime (VVR) 11.1 dan yang lebih baru.
Sintaks
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>Opsi konfigurasi
Parameter | Deskripsi | Tipe data | Wajib? | Nilai default | Keterangan |
type | Jenis sumber data. | String | Ya | Tidak ada | Atur ke sls. |
endpoint | Alamat endpoint. | String | Ya | Tidak ada nilai default | Masukkan endpoint VPC SLS. Untuk informasi selengkapnya, lihat Endpoints. Catatan
|
accessId | ID AccessKey Akun Alibaba Cloud Anda. | String | Ya | Tidak ada nilai default | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?. Penting Untuk melindungi pasangan AccessKey Anda, gunakan variabel untuk mengonfigurasi AccessKey Anda. |
accessKey | Rahasia AccessKey Akun Alibaba Cloud Anda. | String | Ya | Tidak ada | |
project | Nama proyek SLS. | String | Ya | Tidak ada | Tidak ada. |
logStore | Nama Logstore atau Metricstore SLS. | String | Ya | Tidak ada | Data di Logstore dikonsumsi dengan cara yang sama seperti di Metricstore. |
schema.inference.strategy | Strategi inferensi skema. | String | Tidak | continuous |
|
maxPreFetchLogGroups | Jumlah maksimum kelompok log yang dibaca dan diurai per shard selama inferensi skema awal. | Integer | Tidak | 50 | Sebelum membaca dan memproses data, konektor mencoba melakukan pre-consume sejumlah kelompok log yang ditentukan dari setiap shard untuk menginisialisasi skema. |
shardDiscoveryIntervalMs | Interval deteksi dinamis perubahan shard. Satuan: milidetik. | Long | Tidak | 60000 | Atur opsi ini ke nilai negatif untuk menonaktifkan deteksi dinamis. Catatan Opsi ini harus minimal 1 menit (60.000 milidetik). |
startupMode | Mode startup. | String | Tidak | Tidak ada nilai default |
|
startTime | Waktu mulai mengonsumsi log. | String | Tidak | Waktu saat ini | Format: yyyy-MM-dd hh:mm:ss. Opsi ini hanya berlaku jika startupMode diatur ke timestamp. Catatan Opsi startTime dan stopTime didasarkan pada bidang __receive_time__ di SLS, bukan bidang __timestamp__. |
stopTime | Waktu akhir mengonsumsi log. | String | Tidak | Tidak ada nilai default | Format: yyyy-MM-dd hh:mm:ss. Catatan Untuk keluar dari program Flink setelah konsumsi log selesai, atur juga exitAfterFinish ke true. |
consumerGroup | Nama kelompok konsumen. | String | Tidak | Tidak ada | Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom apa pun. |
batchGetSize | Jumlah kelompok log yang dibaca per permintaan. | Integer | Tidak | 100 | batchGetSize harus kurang dari 1000. Jika tidak, terjadi error. |
maxRetries | Jumlah percobaan ulang setelah pembacaan dari SLS gagal. | Integer | Tidak | 3 | Tidak ada |
exitAfterFinish | Menentukan apakah program Flink keluar setelah konsumsi data selesai. | Boolean | Tidak | false |
|
query | Pernyataan kueri yang digunakan untuk memproses data sebelum mengonsumsi data SLS. | String | Tidak | Tidak ada nilai default | Gunakan opsi query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari pemuatan semua data ke Flink, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan. Contohnya, Catatan Tulis kueri menggunakan sintaks SPL. Penting
|
compressType | Jenis kompresi yang digunakan oleh SLS. | String | Tidak | Tidak ada nilai default | Jenis kompresi yang didukung meliputi:
|
timeZone | Zona waktu untuk startTime dan stopTime. | String | Tidak | Tidak ada nilai default | Tidak ada offset yang ditambahkan secara default. |
regionId | Wilayah tempat SLS ditempatkan. | String | Tidak | Tidak ada nilai default | Untuk detail konfigurasi, lihat Wilayah. |
signVersion | Versi signature yang digunakan untuk permintaan SLS. | String | Tidak | Tidak ada nilai default | Untuk detail konfigurasi, lihat Signature permintaan. |
shardModDivisor | Divisor yang digunakan saat membaca dari shard Logstore SLS. | Int | Tidak | -1 | Untuk detail konfigurasi, lihat Shard. |
shardModRemainder | Sisa bagi yang digunakan saat membaca dari shard Logstore SLS. | Int | Tidak | -1 | Untuk detail konfigurasi, lihat Shard. |
metadata.list | Kolom metadata yang diteruskan ke downstream. | String | Tidak | Tidak ada | Bidang metadata yang tersedia meliputi |
Pemetaan tipe data
Pemetaan tipe data untuk data ingestion adalah sebagai berikut:
Tipe data SLS | Tipe Bidang CDC |
STRING | STRING |
Inferensi dan evolusi skema
Pre-konsumsi dan inisialisasi skema
Konektor SLS mempertahankan skema Logstore saat ini. Sebelum membaca data dari Logstore, konektor melakukan pre-consume hingga maxPreFetchLogGroups kelompok log dari setiap shard, mengurai skema setiap log, lalu menggabungkannya untuk menginisialisasi skema tabel. Sebelum konsumsi aktual dimulai, konektor menghasilkan event pembuatan tabel berdasarkan skema yang diinisialisasi.
CatatanUntuk setiap shard, konektor mencoba mengonsumsi data satu jam sebelum waktu saat ini guna mengurai skema.
Kunci primer
Log SLS tidak berisi kunci primer. Tambahkan kunci primer secara manual menggunakan aturan transformasi:
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2Inferensi dan evolusi skema
Setelah inisialisasi skema, jika schema.inference.strategy diatur ke static, konektor mengurai setiap entri log menggunakan skema awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, konektor mengurai setiap entri log, menginferensi kolom fisik, lalu membandingkannya dengan skema saat ini. Jika skema yang diinferensi berbeda dari skema saat ini, konektor menggabungkannya menggunakan aturan berikut:
Jika skema yang diinferensi mencakup bidang yang tidak ada di skema saat ini, tambahkan bidang tersebut ke skema saat ini dan hasilkan event penambahan kolom nullable.
Jika skema yang diinferensi tidak mencakup bidang yang ada di skema saat ini, pertahankan bidang tersebut dan atur nilainya ke NULL. Jangan hasilkan event penghapusan kolom.
Konektor SLS menginferensi semua bidang sebagai string. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan di akhir skema saat ini dan ditandai sebagai nullable.
Kode contoh
Tabel sumber dan tabel sink SQL
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;Sumber data ingestion
Anda dapat menggunakan SLS sebagai sumber data ingestion untuk menulis data SLS secara real time ke sistem downstream yang didukung. Misalnya, Anda dapat mengonfigurasi pekerjaan data ingestion untuk menulis data dari Logstore ke data lake DLF dalam format Paimon. Pekerjaan tersebut secara otomatis menginferensi tipe data dan skema tabel sink serta mendukung evolusi skema dinamis selama runtime.
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# Tambahkan informasi kunci primer ke tabel
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# Tulis semua data dari test_log ke tabel test_database.inventory
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Aktifkan deletion vectors untuk meningkatkan kinerja baca
table.properties.deletion-vectors.enabled: trueDataStream API
Untuk membaca atau menulis data menggunakan DataStream API, gunakan konektor DataStream yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi selengkapnya, lihat Penggunaan konektor DataStream.
Jika Anda menggunakan versi VVR sebelum 8.0.10, pekerjaan Anda mungkin gagal dimulai karena paket JAR dependensi hilang. Untuk mengatasi hal ini, tambahkan paket uber JAR yang sesuai sebagai dependensi tambahan.
Baca data dari SLS
Realtime Compute for Apache Flink menyediakan implementasi SlsSourceFunction dari SourceFunction untuk membaca data dari SLS. Kode contoh:
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Menyiapkan lingkungan eksekusi streaming
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Membuat dan menambahkan sumber dan sink SLS.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// Ukuran batch get harus diberikan.
accessInfo.setBatchGetSize(10);
// Parameter opsional
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// waktu mulai mengonsumsi, diatur ke waktu saat ini.
int startInSec = (int) (new Date().getTime() / 1000);
// waktu berhenti mengonsumsi, -1 berarti tidak pernah berhenti.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}Tulis data ke SLS
Realtime Compute for Apache Flink menyediakan implementasi SLSOutputFormat dari OutputFormat untuk menulis data ke SLS. Kode contoh:
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
Konektor DataStream SLS tersedia di Repositori Maven Central.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-format-common</artifactId>
</exclusion>
</exclusions>
</dependency>