Topik ini menjelaskan cara menggunakan konektor Simple Log Service (SLS).
Informasi latar belakang
Simple Log Service adalah layanan komprehensif untuk data log yang memungkinkan Anda mengumpulkan, mengonsumsi, mengirimkan, dan mengkueri data log secara cepat guna meningkatkan efisiensi operasional serta O&M, sekaligus membangun kemampuan pemrosesan log berskala besar.
Tabel berikut mencantumkan informasi yang didukung oleh konektor SLS.
Kategori | Detail |
Jenis yang didukung | Tabel sumber dan tabel sink |
Mode eksekusi | Hanya mode streaming |
Metrik pemantauan kustom | Tidak berlaku |
Format data | Tidak ada |
Jenis API | SQL, DataStream, dan YAML data ingestion |
Dukungan untuk memperbarui atau menghapus data tabel sink | Tidak mendukung pembaruan atau penghapusan data tabel sink. Hanya mendukung operasi insert. |
Fitur utama
Tabel sumber konektor SLS mendukung pembacaan langsung bidang metadata pesan. Bidang metadata yang didukung tercantum dalam tabel berikut.
Nama bidang | Tipe bidang | Deskripsi |
__source__ | STRING METADATA VIRTUAL | Sumber pesan. |
__topic__ | STRING METADATA VIRTUAL | Topik pesan. |
__timestamp__ | BIGINT METADATA VIRTUAL | Waktu log. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | Tag pesan. Untuk properti |
Prasyarat
Anda harus membuat proyek SLS dan Logstore. Untuk informasi selengkapnya, lihat Buat proyek dan Logstore.
Batasan
Hanya Ververica Runtime (VVR) 11.1 dan versi yang lebih baru yang mendukung SLS sebagai sumber data tersinkronisasi untuk YAML data ingestion.
Konektor SLS menjamin semantik at-least-once.
Hindari mengatur konkurensi sumber lebih tinggi daripada jumlah shard karena hal ini membuang sumber daya. Pada VVR 8.0.5 dan versi sebelumnya, jika jumlah shard berubah, failover otomatis mungkin berhenti bekerja sehingga beberapa shard tidak dikonsumsi.
SQL
Sintaksis
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);dengan parameter
Parameter umum
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
connector
Jenis tabel.
String
Ya
Tidak ada
Nilai tetap: sls.
endPoint
Alamat titik akhir.
String
Ya
Tidak ada
Masukkan titik akhir jaringan pribadi untuk SLS. Untuk informasi selengkapnya, lihat Titik akhir layanan.
CatatanRealtime Compute for Apache Flink tidak mendukung akses jaringan publik secara default. Namun, Alibaba Cloud NAT Gateway memungkinkan komunikasi antara jaringan VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.
Hindari mengakses SLS melalui jaringan publik. Jika diperlukan, gunakan HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Kelola akselerasi transfer.
project
Nama proyek SLS.
String
Ya
Tidak ada
Tidak ada.
logStore
Nama Logstore atau metricstore SLS.
String
Ya
Tidak ada
Logstore dan metricstore menggunakan metode konsumsi yang sama.
accessId
ID AccessKey Akun Alibaba Cloud Anda.
String
Ya
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.
PentingUntuk menghindari paparan Informasi AccessKey Anda, gunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel proyek.
accessKey
Rahasia AccessKey Akun Alibaba Cloud Anda.
String
Ya
Tidak ada
Parameter khusus sumber
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
enableNewSource
Apakah akan mengaktifkan sumber baru yang mengimplementasikan antarmuka FLIP-27.
Boolean
Tidak
false
Sumber baru secara otomatis menyesuaikan perubahan shard dan mendistribusikan shard secara merata ke semua task sumber.
PentingParameter ini didukung pada VVR 8.0.9 dan versi yang lebih baru. Mulai dari VVR 11.1, nilai default-nya adalah true.
Jika Anda mengubah parameter ini, pekerjaan tidak dapat dilanjutkan dari status sebelumnya. Untuk melanjutkan konsumsi dari offset historis, pertama-tama mulai pekerjaan dengan parameter consumerGroup untuk mencatat progres konsumsi dalam kelompok konsumen SLS. Kemudian atur consumeFromCheckpoint ke true dan restart pekerjaan tanpa state.
Jika terdapat shard read-only di SLS, beberapa task Flink mungkin terus meminta shard lain yang belum diproses setelah menyelesaikan konsumsi shard read-only. Hal ini dapat menyebabkan distribusi shard yang tidak merata di antara task konkuren, sehingga mengurangi efisiensi konsumsi keseluruhan dan kinerja sistem. Untuk mengatasi masalah ini, sesuaikan konkurensi, optimalkan penjadwalan task, atau gabungkan shard kecil untuk mengurangi jumlah shard dan kompleksitas penugasan task.
shardDiscoveryIntervalMs
Interval untuk mendeteksi perubahan shard secara dinamis, dalam milidetik.
Long
Tidak
60000
Atur ke nilai negatif untuk menonaktifkan deteksi dinamis.
CatatanNilai ini harus minimal 60000 ms (1 menit).
Parameter ini hanya berlaku ketika enableNewSource bernilai true.
Parameter ini didukung pada VVR 8.0.9 dan versi yang lebih baru.
startupMode
Mode startup untuk tabel sumber.
String
Tidak
timestamp
timestamp(default): Mengonsumsi log mulai dari waktu tertentu.latest: Mengonsumsi log mulai dari offset terbaru.earliest: Mengonsumsi log mulai dari offset paling awal.consumer_group: Mengonsumsi log mulai dari offset yang dicatat dalam kelompok konsumen. Jika tidak ada offset yang dicatat untuk suatu shard, konsumsi dimulai dari offset paling awal.
PentingVersi VVR sebelum 11.1 tidak mendukung consumer_group. Atur
consumeFromCheckpointketrue. Konsumsi kemudian dimulai dari offset yang dicatat dalam kelompok konsumen yang ditentukan, dan startupMode tidak berpengaruh.
startTime
Waktu mulai untuk mengonsumsi log.
String
Tidak
Waktu saat ini
Format:
yyyy-MM-dd hh:mm:ss.Hanya berlaku ketika
startupModediatur ketimestamp.CatatanstartTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan __timestamp__.
stopTime
Waktu akhir untuk mengonsumsi log.
String
Tidak
Tidak ada
Format:
yyyy-MM-dd hh:mm:ss.CatatanGunakan parameter ini hanya untuk mengonsumsi log historis. Atur ke waktu lampau. Jika diatur ke waktu mendatang, konsumsi mungkin berakhir lebih awal karena kurangnya log baru, menyebabkan gangguan aliran data tanpa pesan error.
Untuk menghentikan program Flink ketika konsumsi log selesai, atur juga exitAfterFinish=true.
consumerGroup
Nama kelompok konsumen.
String
Tidak
Tidak ada
Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom tanpa batasan.
CatatanAnda tidak dapat mengoordinasikan konsumsi lintas beberapa pekerjaan menggunakan kelompok konsumen yang sama. Setiap pekerjaan Flink harus menggunakan kelompok konsumen yang unik. Jika beberapa pekerjaan berbagi kelompok konsumen yang sama, mereka akan mengonsumsi semua data. Hal ini terjadi karena Flink tidak menetapkan partisi melalui kelompok konsumen SLS, sehingga setiap konsumen memproses pesan secara independen terlepas dari kelompok yang dibagikan.
consumeFromCheckpoint
Apakah akan mengonsumsi log mulai dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.
String
Tidak
false
true: Tentukan kelompok konsumen. Program Flink mengonsumsi log mulai dari checkpoint yang disimpan dalam kelompok tersebut. Jika tidak ada checkpoint, konsumsi dimulai dari nilai startTime.false(default): Jangan mengonsumsi log mulai dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.
PentingVVR 11.1 dan versi yang lebih baru tidak mendukung parameter ini. Untuk VVR 11.1 dan versi yang lebih baru, atur
startupModekeconsumer_group.maxRetries
Jumlah percobaan ulang setelah pembacaan SLS gagal.
String
Tidak
3
Tidak ada.
batchGetSize
Jumlah kelompok log yang dibaca per permintaan.
String
Tidak
100
Nilai
batchGetSizetidak boleh melebihi 1000. Jika tidak, terjadi error.exitAfterFinish
Apakah program Flink keluar setelah konsumsi data selesai.
String
Tidak
false
true: Program Flink keluar setelah konsumsi data selesai.false(default): Program Flink terus berjalan setelah konsumsi data selesai.
query
PentingUsang mulai dari VVR 11.3. Masih kompatibel di versi yang lebih baru.
Pernyataan pra-pemrosesan konsumsi SLS.
String
Tidak
Tidak ada
Gunakan parameter query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari konsumsi semua data ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan.
Misalnya,
'query' = '*| where request_method = ''GET'''mencocokkan log yang bidang request_method-nya sama dengan GET sebelum Flink membacanya.CatatanGunakan sintaksis SPL untuk kueri. Untuk informasi selengkapnya, lihat Sintaksis SPL.
PentingParameter ini didukung pada VVR 8.0.1 dan versi yang lebih baru.
Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.
processor
Prosesor konsumen SLS. Memiliki prioritas lebih tinggi daripada query jika keduanya ada.
String
Tidak
Tidak ada
Gunakan parameter processor untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari konsumsi semua data ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan. Kami merekomendasikan processor daripada query.
Misalnya,
'processor' = 'test-filter-processor'menerapkan prosesor konsumen SLS sebelum Flink membaca data.CatatanGunakan sintaksis SPL untuk prosesor. Untuk informasi selengkapnya, lihat Sintaksis SPL. Untuk petunjuk membuat atau memperbarui prosesor konsumen, lihat Kelola prosesor konsumen.
PentingParameter ini didukung pada VVR 11.3 dan versi yang lebih baru.
Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.
Hanya untuk tabel sink
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
topicField
Nama bidang yang nilainya menggantikan bidang metadata __topic__. Mewakili topik log.
String
Tidak
Tidak ada
Bidang ini harus ada dalam tabel.
timeField
Nama bidang yang nilainya menggantikan bidang metadata __timestamp__. Mewakili waktu penulisan log.
String
Tidak
Waktu saat ini
Bidang ini harus ada dalam tabel dan memiliki tipe data INT. Jika tidak ditentukan, waktu saat ini digunakan.
sourceField
Nama bidang yang nilainya menggantikan bidang metadata __source__. Mewakili sumber log, seperti alamat IP mesin yang menghasilkan log.
String
Tidak
Tidak ada
Bidang ini harus ada dalam tabel.
partitionField
Nama bidang. Data ditulis ke shard berdasarkan nilai hash bidang ini. Data dengan nilai hash yang sama masuk ke shard yang sama.
String
Tidak
Tidak ada
Jika tidak ditentukan, setiap catatan ditulis secara acak ke shard yang tersedia.
buckets
Jumlah bucket untuk menetapkan ulang data ketika partitionField ditentukan.
String
Tidak
64
Nilai valid: bilangan bulat dari 1 hingga 256 yang merupakan pangkat dua. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak menerima data.
flushIntervalMs
Interval waktu yang memicu penulisan data.
String
Tidak
2000
Unit: milidetik.
writeNullProperties
Apakah akan menulis nilai null sebagai string kosong ke SLS.
Boolean
Tidak
true
true(default): Menulis nilai null sebagai string kosong.false: Melewati bidang dengan nilai null saat menulis.
CatatanParameter ini didukung pada VVR 8.0.6 dan versi yang lebih baru.
Pemetaan tipe
Tipe bidang Flink | Tipe bidang SLS |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
Data ingestion (pratinjau publik)
Batasan
Fitur ini hanya didukung pada VVR 11.1 dan versi yang lebih baru.
Sintaksis
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>Item konfigurasi
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan | ||||||||||
type | Jenis sumber data. | String | Ya | Tidak ada | Nilai tetap: sls. | ||||||||||
endpoint | Alamat titik akhir. | String | Ya | Tidak ada | Masukkan titik akhir jaringan pribadi untuk SLS. Untuk informasi selengkapnya, lihat Titik akhir layanan. Catatan
| ||||||||||
accessId | ID AccessKey Akun Alibaba Cloud Anda. | String | Ya | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?. Penting Untuk menghindari paparan Informasi AccessKey Anda, gunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel proyek. | ||||||||||
accessKey | Rahasia AccessKey Akun Alibaba Cloud Anda. | String | Ya | Tidak ada | |||||||||||
project | Nama proyek SLS. | String | Ya | Tidak ada | Tidak ada. | ||||||||||
logStore | Nama Logstore atau metricstore SLS. | String | Ya | Tidak ada | Logstore dan metricstore menggunakan metode konsumsi yang sama. | ||||||||||
schema.inference.strategy | Strategi inferensi skema. | String | Tidak | continuous |
| ||||||||||
maxPreFetchLogGroups | Jumlah maksimum kelompok log yang akan dibaca dan diurai per shard selama inferensi skema awal. | Integer | Tidak | 50 | Sebelum pembacaan dan pemrosesan data aktual, coba konsumsi jumlah kelompok log yang ditentukan per shard untuk menginisialisasi informasi skema. | ||||||||||
shardDiscoveryIntervalMs | Interval untuk mendeteksi perubahan shard secara dinamis, dalam milidetik. | Long | Tidak | 60000 | Atur ke nilai negatif untuk menonaktifkan deteksi dinamis. Catatan Nilai ini harus minimal 60000 ms (1 menit). | ||||||||||
startupMode | Mode startup. | String | Tidak | Tidak ada |
| ||||||||||
startTime | Waktu mulai untuk mengonsumsi log. | String | Tidak | Waktu saat ini | Format: yyyy-MM-dd hh:mm:ss. Hanya berlaku ketika startupMode diatur ke timestamp. Catatan startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan __timestamp__. | ||||||||||
stopTime | Waktu akhir konsumsi log. | String | Tidak | Tidak ada | Format: yyyy-MM-dd hh:mm:ss. Catatan Untuk menghentikan program Flink ketika konsumsi log selesai, atur juga exitAfterFinish=true. | ||||||||||
consumerGroup | Nama kelompok konsumen. | String | Tidak | Tidak ada | Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom tanpa batasan. | ||||||||||
batchGetSize | Jumlah kelompok log yang dibaca per permintaan. | Integer | Tidak | 100 | Nilai batchGetSize tidak boleh melebihi 1000. Jika tidak, terjadi error. | ||||||||||
maxRetries | Jumlah percobaan ulang setelah pembacaan SLS gagal. | Integer | Tidak | 3 | Tidak ada. | ||||||||||
exitAfterFinish | Apakah program Flink keluar setelah konsumsi data selesai. | Boolean | Tidak | false |
| ||||||||||
query | Pernyataan pra-pemrosesan untuk konsumsi SLS. | String | Tidak | Tidak ada | Gunakan parameter query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari konsumsi semua data ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan. Misalnya, Catatan Gunakan sintaksis SPL untuk kueri. Untuk informasi selengkapnya, lihat Sintaksis SPL. Penting
| ||||||||||
compressType | Jenis kompresi SLS. | String | Tidak | Tidak ada | Jenis kompresi yang didukung:
| ||||||||||
timeZone | Zona waktu untuk startTime dan stopTime. | String | Tidak | Tidak ada | Tidak ada offset yang ditambahkan secara default. | ||||||||||
regionId | Wilayah tempat SLS tersedia. | String | Tidak | Tidak ada | Untuk detail konfigurasi, lihat dokumentasi Wilayah yang tersedia. | ||||||||||
signVersion | Versi signature permintaan SLS. | String | Tidak | Tidak ada | Untuk detail konfigurasi, lihat dokumentasi Penandatanganan permintaan. | ||||||||||
shardModDivisor | Divisor yang digunakan saat membaca shard Logstore SLS. | Int | Tidak | -1 | Untuk detail konfigurasi, lihat dokumentasi Shard. | ||||||||||
shardModRemainder | Sisa yang digunakan saat membaca shard Logstore SLS. | Int | Tidak | -1 | Untuk detail konfigurasi, lihat dokumentasi Shard. | ||||||||||
metadata.list | Kolom metadata yang diteruskan ke sistem downstream. | String | Tidak | Tidak ada | Bidang metadata yang didukung meliputi | ||||||||||
decode.table-id.fields | Bidang yang digunakan untuk menghasilkan ID tabel saat mengurai data log SLS. | String | Tidak | Tidak ada | Beberapa bidang dihubungkan dengan koma Inggris (
Catatan Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru. | ||||||||||
fixed-types | Tipe bidang yang ditentukan saat mengurai data log SLS. | String | Tidak | Tidak ada | Saat mengurai data, tentukan tipe data untuk bidang tertentu. Pisahkan beberapa bidang dengan koma Inggris Catatan Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru. | ||||||||||
timestamp-format.standard | Format bidang timestamp dalam data log SLS. | String | Tidak | SQL | Nilai valid:
Catatan Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru. | ||||||||||
ingestion.ignore-errors | Apakah akan mengabaikan error selama penguraian data. | Boolean | Tidak | false | Catatan Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru. | ||||||||||
ingestion.error-tolerance.max-count | Jumlah maksimum error penguraian yang diizinkan sebelum pekerjaan gagal, ketika ingestion.ignore-errors diaktifkan. | Integer | Tidak | -1 | Hanya berlaku ketika ingestion.ignore-errors diaktifkan. Nilai default -1 berarti error penguraian tidak memicu kegagalan pekerjaan. Catatan Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru. |
Pemetaan tipe
Ketika fixed-types tidak dikonfigurasi, pemetaan tipe data ingestion adalah sebagai berikut:
Tipe bidang SLS | Tipe bidang CDC |
STRING | STRING |
Ketika fixed-types dikonfigurasi, konektor mencoba mengurai data menggunakan tipe yang ditentukan.
Inferensi skema dan sinkronisasi perubahan skema
Pra-konsumsi shard dan inisialisasi skema
Konektor SLS mempertahankan skema Logstore saat ini dari mana ia membaca data. Sebelum membaca data, konektor melakukan pra-konsumsi hingga `maxPreFetchLogGroups` kelompok log per shard. Kemudian, konektor mengurai skema setiap log dan menggabungkan skema tersebut untuk menginisialisasi struktur tabel. Selanjutnya, sebelum konsumsi aktual, konektor menghasilkan event pembuatan tabel yang sesuai berdasarkan skema yang diinisialisasi.
CatatanUntuk setiap shard, konektor SLS mencoba mengonsumsi dan mengurai skema log mulai satu jam sebelum waktu saat ini.
Informasi kunci primer
Log SLS tidak berisi informasi kunci primer. Anda dapat menambahkan kunci primer secara manual menggunakan aturan transformasi:
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2Inferensi skema dan perubahan skema
Setelah inisialisasi skema, jika schema.inference.strategy diatur ke static, konektor SLS mengurai setiap log menggunakan skema awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, konektor SLS mengurai setiap log, melakukan inferensi kolom fisik, dan membandingkannya dengan skema saat ini. Konektor kemudian menggabungkan skema ketika terdapat perbedaan. Aturan penggabungan:
Jika kolom fisik yang diinferensi mencakup bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema dan event kolom nullable baru dihasilkan.
Jika kolom fisik yang diinferensi tidak mencakup bidang yang sudah ada dalam skema saat ini, bidang tersebut tetap dipertahankan. Datanya diisi dengan NULL, dan tidak ada event penghapusan kolom yang dihasilkan.
Konektor SLS melakukan inferensi semua tipe bidang dalam log sebagai `STRING`. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan di akhir skema saat ini dan diatur sebagai nullable.
Contoh kode
Tabel sumber dan sink SQL
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;Sumber data ingestion
SLS dapat berfungsi sebagai sumber data untuk pekerjaan data ingestion, menulis data SLS secara real-time ke sistem downstream yang didukung. Misalnya, Anda dapat mengonfigurasi pekerjaan data ingestion untuk menulis data dari Logstore ke data lake DLF dalam format Paimon. Pekerjaan tersebut secara otomatis melakukan inferensi tipe data bidang dan struktur tabel downstream, serta mendukung evolusi skema dinamis selama runtime.
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# Tambahkan informasi kunci primer ke tabel
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# Tulis semua data dari test_log ke tabel test_database.inventory
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Aktifkan deletion vectors untuk meningkatkan kinerja baca
table.properties.deletion-vectors.enabled: trueAPI DataStream
Untuk membaca atau menulis data menggunakan DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk Flink. Untuk petunjuknya, lihat Gunakan konektor DataStream.
Jika Anda menggunakan VVR sebelum 8.0.10, file JAR dependensi yang hilang dapat mencegah startup pekerjaan. Anda harus menambahkan file JAR -uber yang sesuai sebagai dependensi tambahan.
Baca dari SLS
VVR menyediakan kelas `SlsSourceFunction` untuk membaca dari SLS. Contoh berikut menunjukkan cara membaca dari SLS.
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}Tulis ke SLS
VVR menyediakan kelas `SLSOutputFormat` untuk menulis ke SLS. Contoh berikut menunjukkan cara menulis ke SLS.
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
Konektor DataStream SLS tersedia di Maven Central Repository di tautan berikut: Konektor DataStream SLS.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-format-common</artifactId>
</exclusion>
</exclusions>
</dependency>