Realtime Compute for Apache Flink dapat langsung mengonsumsi data dari Log Service (SLS) dengan membuat tabel sumber SLS. Topik ini menjelaskan cara membuat tabel sumber SLS dan mengekstrak field metadata.
Latar Belakang
Tabel berikut menjelaskan dukungan Flink untuk mengonsumsi data Log Service (SLS).
|
Kategori |
Detail |
|
Jenis yang didukung |
Tabel sumber dan tabel sink. |
|
Mode eksekusi |
Hanya mode streaming yang didukung. |
|
Metrik pemantauan spesifik |
Tidak berlaku. |
|
Format data |
Tidak ada. |
|
Jenis API |
SQL. |
|
Memperbarui atau menghapus data di tabel sink |
Anda tidak dapat memperbarui atau menghapus data di tabel sink; hanya penyisipan yang didukung. |
Untuk mulai mengonsumsi log menggunakan Flink, lihat Panduan cepat untuk pekerjaan Flink SQL.
Prasyarat
-
Jika Anda menggunakan RAM user atau RAM role, pastikan memiliki izin yang diperlukan untuk menggunakan konsol Flink. Untuk informasi selengkapnya, lihat Pengelolaan izin.
-
Anda telah membuat ruang kerja Flink. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
-
Anda telah membuat Project dan LogStore. Untuk informasi selengkapnya, lihat Buat Project dan LogStore.
Batasan
-
Hanya Ververica Runtime (VVR) 11.1 dan versi yang lebih baru yang mendukung SLS sebagai sumber data tersinkronisasi untuk YAML ingesti data.
-
Konektor SLS menjamin semantik at-least-once.
-
Hindari mengatur konkurensi sumber lebih tinggi daripada jumlah shard karena hal ini akan membuang sumber daya. Pada VVR 8.0.5 dan versi sebelumnya, jika jumlah shard berubah, failover otomatis mungkin berhenti bekerja sehingga beberapa shard tidak terkonsumsi.
Buat tabel sumber dan sink SLS
Mengonsumsi data dari Log Service (SLS) dengan Flink memerlukan pekerjaan SQL lengkap. Pekerjaan SQL lengkap terdiri dari tabel sumber, tabel sink, dan pernyataan INSERT INTO untuk memindahkan data dari sumber ke sink setelah diproses.
Untuk informasi tentang cara mengembangkan pekerjaan Flink SQL, lihat Panduan pengembangan pekerjaan SQL.
Realtime Compute for Apache Flink dapat menggunakan data real-time dari Log Service (SLS) sebagai input streaming. Sebagai contoh, pertimbangkan konten log berikut:
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200
Contoh
Contoh berikut menunjukkan pekerjaan SQL yang mengonsumsi data dari Log Service (SLS):
Jika nama tabel, nama kolom, atau kata kunci dalam pernyataan SQL Anda bertentangan dengan kata tercadang, bungkus dengan backtick ().
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
DENGAN parameter
-
Parameter umum
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
connector
Jenis tabel.
String
Ya
Tidak ada
Nilai tetap: sls.
endPoint
Alamat titik akhir.
String
Ya
Tidak ada
Gunakan titik akhir internal SLS. Untuk informasi selengkapnya, lihat Titik akhir layanan.
Catatan-
Realtime Compute for Apache Flink tidak mendukung akses jaringan publik secara default. Namun, Alibaba Cloud NAT Gateway memungkinkan komunikasi antara jaringan VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.
-
Hindari mengakses SLS melalui jaringan publik. Jika diperlukan, gunakan HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Mengelola akselerasi transfer.
project
Nama project SLS.
String
Ya
Tidak ada
Tidak ada.
logStore
Nama Logstore atau Metricstore SLS.
String
Ya
Tidak ada
Logstore dan Metricstore menggunakan metode konsumsi yang sama.
accessId
ID AccessKey akun Alibaba Cloud Anda.
String
Ya
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey saya?.
PentingUntuk menghindari pengeksposan Informasi AccessKey Anda, gunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel project.
accessKey
Rahasia AccessKey akun Alibaba Cloud Anda.
String
Ya
Tidak ada
-
-
Parameter khusus sumber
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
enableNewSource
Apakah akan menggunakan sumber berbasis FLIP-27 yang baru.
Boolean
Tidak
false
Sumber baru secara otomatis menyesuaikan perubahan shard dan mendistribusikan shard secara merata di semua task sumber.
Penting-
Parameter ini didukung di VVR 8.0.9 dan versi yang lebih baru. Mulai dari VVR 11.1, nilai default-nya adalah true.
-
Jika Anda mengubah parameter ini, pekerjaan tidak dapat dilanjutkan dari status sebelumnya. Untuk melanjutkan konsumsi dari offset historis, pertama-tama jalankan pekerjaan dengan parameter consumerGroup untuk mencatat progres konsumsi di kelompok konsumen SLS. Kemudian atur consumeFromCheckpoint ke true dan restart pekerjaan tanpa state.
-
Jika terdapat shard read-only di SLS, beberapa task Flink mungkin terus meminta shard yang belum diproses setelah mengonsumsi shard read-only, menyebabkan distribusi shard tidak merata dan mengurangi efisiensi konsumsi. Untuk mengatasi hal ini, sesuaikan konkurensi, optimalkan penjadwalan task, atau gabungkan shard kecil.
shardDiscoveryIntervalMs
Interval deteksi perubahan shard, dalam milidetik.
Long
Tidak
60000
Atur ke nilai negatif untuk menonaktifkan deteksi dinamis.
Catatan-
Nilai ini harus minimal 60000 ms (1 menit).
-
Parameter ini hanya berlaku ketika enableNewSource bernilai true.
-
Parameter ini didukung di VVR 8.0.9 dan versi yang lebih baru.
startupMode
Mode startup untuk tabel sumber.
String
Tidak
timestamp
-
timestamp(default): Mengonsumsi log mulai dari waktu tertentu. -
latest: Mengonsumsi log mulai dari offset terbaru. -
earliest: Mengonsumsi log mulai dari offset paling awal. -
consumer_group: Mengonsumsi log mulai dari offset yang dicatat di kelompok konsumen. Jika tidak ada offset yang dicatat untuk suatu shard, konsumsi dimulai dari offset paling awal.
Penting-
Versi VVR sebelum 11.1 tidak mendukung consumer_group. Atur
consumeFromCheckpointketrue. Konsumsi kemudian dimulai dari offset yang dicatat di kelompok konsumen yang ditentukan, dan startupMode tidak berpengaruh.
startTime
Waktu mulai untuk mengonsumsi log.
String
Tidak
Waktu saat ini
Format:
yyyy-MM-dd hh:mm:ss.Hanya berlaku ketika
startupModediatur ketimestamp.CatatanstartTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan __timestamp__.
stopTime
Waktu akhir untuk mengonsumsi log.
String
Tidak
Tidak ada
Format:
yyyy-MM-dd hh:mm:ss.Catatan-
Gunakan parameter ini hanya untuk mengonsumsi log historis. Atur ke waktu lampau. Jika diatur ke waktu mendatang, konsumsi mungkin berakhir lebih awal karena kurangnya log baru, yang dapat mengganggu aliran data secara diam-diam.
-
Untuk keluar dari program Flink ketika konsumsi log selesai, atur juga exitAfterFinish=true.
consumerGroup
Nama kelompok konsumen.
String
Tidak
Tidak ada
Kelompok konsumen melacak progres konsumsi. Anda dapat menggunakan nama kustom apa pun.
CatatanAnda tidak dapat mengoordinasikan konsumsi di beberapa pekerjaan menggunakan kelompok konsumen yang sama. Setiap pekerjaan Flink harus menggunakan kelompok konsumen unik. Jika beberapa pekerjaan berbagi kelompok konsumen yang sama, mereka akan mengonsumsi semua data. Hal ini karena Flink tidak menggunakan kelompok konsumen SLS untuk penugasan partisi. Setiap konsumen memproses semua pesan secara independen.
consumeFromCheckpoint
Apakah akan memulai konsumsi dari checkpoint kelompok konsumen yang ditentukan.
String
Tidak
false
-
true: Program Flink memulai konsumsi dari checkpoint kelompok konsumen yang ditentukan. Jika tidak ada checkpoint, konsumsi dimulai dari nilai startTime. -
false(default): Tidak mengonsumsi log mulai dari checkpoint yang disimpan di kelompok konsumen yang ditentukan.
PentingVVR 11.1 dan versi yang lebih baru tidak mendukung parameter ini. Untuk VVR 11.1 dan versi yang lebih baru, atur
startupModekeconsumer_group.maxRetries
Jumlah percobaan ulang setelah pembacaan SLS gagal.
String
Tidak
3
Tidak ada.
batchGetSize
Jumlah kelompok log yang dibaca per permintaan.
String
Tidak
100
Nilai
batchGetSizetidak boleh melebihi 1000. Jika tidak, terjadi error.exitAfterFinish
Apakah program Flink keluar setelah konsumsi data selesai.
String
Tidak
false
-
true: Program Flink keluar setelah konsumsi data selesai. -
false(default): Program Flink terus berjalan setelah konsumsi data selesai.
query
PentingTidak digunakan lagi sejak VVR 11.3. Masih didukung di versi yang lebih baru untuk kompatibilitas mundur.
Pernyataan pra-pemrosesan konsumsi SLS.
String
Tidak
Tidak ada
Gunakan parameter ini untuk memfilter data SLS sebelum dikonsumsi, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan.
Sebagai contoh,
'query' = '*| where request_method = ''GET'''mencocokkan log di mana field request_method bernilai GET sebelum Flink membacanya.CatatanGunakan sintaks SPL untuk kueri. Untuk informasi selengkapnya, lihat Sintaks SPL.
Penting-
Parameter ini didukung di VVR 8.0.1 dan versi yang lebih baru.
-
Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.
processor
Prosesor konsumen SLS. Memiliki prioritas lebih tinggi daripada query jika keduanya ada.
String
Tidak
Tidak ada
Gunakan parameter ini untuk memfilter data SLS sebelum dikonsumsi, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan. Kami merekomendasikan menggunakan processor daripada query.
Sebagai contoh,
'processor' = 'test-filter-processor'menerapkan prosesor konsumen SLS sebelum Flink membaca data.CatatanGunakan sintaks SPL untuk processor. Untuk informasi selengkapnya, lihat Sintaks SPL. Untuk petunjuk membuat atau memperbarui prosesor konsumen, lihat Mengelola prosesor konsumen.
PentingParameter ini didukung di VVR 11.3 dan versi yang lebih baru.
Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.
-
-
Hanya untuk tabel sink
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
topicField
Field yang nilainya menggantikan field metadata __topic__, merepresentasikan topik log.
String
Tidak
Tidak ada
Field ini harus ada di tabel.
timeField
Field yang nilainya menggantikan field metadata __timestamp__, merepresentasikan waktu penulisan log.
String
Tidak
Waktu saat ini
Field ini harus ada di tabel dan memiliki tipe data INT. Jika tidak ditentukan, waktu saat ini digunakan.
sourceField
Field yang nilainya menggantikan field metadata __source__, merepresentasikan sumber log (misalnya, alamat IP mesin yang menghasilkan log).
String
Tidak
Tidak ada
Field ini harus ada di tabel.
partitionField
Field yang digunakan untuk routing shard. Data ditulis ke shard berdasarkan nilai hash field ini. Catatan dengan nilai hash yang sama masuk ke shard yang sama.
String
Tidak
Tidak ada
Jika tidak ditentukan, setiap catatan ditulis secara acak ke shard yang tersedia.
buckets
Jumlah bucket untuk menetapkan ulang data ketika partitionField ditentukan.
String
Tidak
64
Nilai valid: bilangan bulat dari 1 hingga 256 yang merupakan pangkat dua. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak menerima data.
flushIntervalMs
Interval yang memicu penulisan data.
String
Tidak
2000
Unit: milidetik.
writeNullProperties
Apakah akan menulis nilai null sebagai string kosong ke SLS.
Boolean
Tidak
true
-
true(default): Menulis nilai null sebagai string kosong. -
false: Melewatkan field dengan nilai null saat menulis.
CatatanParameter ini didukung di VVR 8.0.6 dan versi yang lebih baru.
-
Ekstrak field metadata
Selain field log, Anda dapat mengekstrak empat field metadata berikut dan field kustom lainnya.
|
Parameter |
Tipe |
Deskripsi |
|
__source__ |
STRING METADATA VIRTUAL |
Sumber pesan. |
|
__topic__ |
STRING METADATA VIRTUAL |
Topik pesan. |
|
__timestamp__ |
BIGINT METADATA VIRTUAL |
Waktu log. |
|
__tag__ |
MAP<VARCHAR, VARCHAR> METADATA VIRTUAL |
Tag pesan. Untuk properti seperti |
Untuk mengekstrak field metadata, deklarasikan dengan kata kunci METADATA dalam pernyataan CREATE TABLE Anda, seperti pada contoh berikut:
create table sls_stream(
`__timestamp__` bigint METADATA,
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
Dokumen terkait
Untuk mempelajari cara mengonsumsi data menggunakan Flink DataStream API, lihat DataStream API.