Anda dapat menggunakan Realtime Compute for Apache Flink untuk membuat tabel sumber Layanan Log Sederhana guna mengonsumsi data log. Topik ini menjelaskan cara membuat tabel sumber dan mengekstrak field atribut yang terlibat dalam proses pembuatan.
Informasi latar belakang
Tabel berikut menjelaskan pengaturan yang perlu dikonfigurasi agar Realtime Compute for Apache Flink dapat mengonsumsi data log.
Kategori | Deskripsi |
Jenis yang didukung | Anda dapat mengonfigurasi tabel sumber dan tabel hasil. |
Mode operasi | Hanya mode streaming yang didukung. |
Metrik | Metrik tidak didukung. |
Format data | Tidak ada. |
Tipe API | Pernyataan SQL didukung. |
Apakah data log dapat diperbarui atau dihapus di tabel hasil | Anda tidak dapat memperbarui atau menghapus data log di tabel hasil. Anda hanya dapat menyisipkan data log ke tabel hasil. |
Untuk informasi lebih lanjut tentang cara menggunakan Realtime Compute for Apache Flink untuk mengonsumsi data log, lihat Memulai dengan Penerapan Flink SQL.
Prasyarat
Jika Anda ingin menggunakan Pengguna Resource Access Management (RAM) atau Peran RAM untuk mengonsumsi data log, pastikan bahwa Pengguna RAM atau Peran RAM memiliki izin yang diperlukan pada konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Manajemen Izin.
Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Proyek dan penyimpanan log telah dibuat. Untuk informasi lebih lanjut, lihat Buat Proyek dan Penyimpanan Log.
Batasan
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru yang mendukung konektor Layanan Log Sederhana.
Konektor Layanan Log Sederhana hanya mendukung semantik setidaknya sekali.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.13 atau versi lebih baru yang mendukung failover otomatis penerapan akibat perubahan jumlah shard.
Disarankan untuk tidak menyetel paralelisme penerapan untuk node sumber ke nilai yang lebih besar dari jumlah shard. Jika paralelisme penerapan untuk node sumber lebih besar dari jumlah shard, sumber daya mungkin terbuang. Di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.5 atau versi lebih lama, jika jumlah shard berubah, failover otomatis penerapan mungkin menjadi tidak valid dan shard tertentu mungkin tidak dikonsumsi.
Buat tabel sumber Layanan Log Sederhana dan tabel hasil
Anda harus mengembangkan draf SQL lengkap sebelum menggunakan Realtime Compute for Apache Flink untuk mengonsumsi data log di Layanan Log Sederhana. Draf SQL lengkap mencakup tabel sumber dan tabel hasil. Setelah data log di tabel sumber diproses, hasilnya dimasukkan ke tabel hasil menggunakan pernyataan INSERT INTO.
Untuk informasi lebih lanjut tentang cara mengembangkan draf SQL di Realtime Compute for Apache Flink, lihat Kembangkan Draf SQL.
Layanan Log Sederhana menyimpan data log secara real-time. Realtime Compute for Apache Flink dapat membaca data tersebut dalam mode streaming sebagai data masukan. Kode berikut memberikan contoh log:
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200Contoh kode
Kode berikut memberikan contoh draf SQL yang dapat Anda kembangkan di Realtime Compute for Apache Flink untuk mengonsumsi data log di Layanan Log Sederhana.
Jika nama tabel, kolom, dan field cadangan dalam draf SQL bertentangan satu sama lain, Anda harus mengapit nama-nama tersebut dengan backtick (`).
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; Parameter dalam klausa WITH
Parameter Umum
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel.
STRING
Ya
Tidak ada
Atur nilainya menjadi sls.
endPoint
Titik akhir Layanan Log Sederhana.
STRING
Ya
Tidak ada
Masukkan titik akhir internal Layanan Log Sederhana. Untuk informasi lebih lanjut, lihat Titik Akhir.
CatatanSecara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Alibaba Cloud menyediakan NAT gateway untuk memungkinkan komunikasi antara virtual private cloud (VPC) dan Internet. Untuk informasi lebih lanjut, lihat Operasi Konsol.
Kami merekomendasikan agar Anda tidak mengakses Layanan Log Sederhana melalui Internet. Jika Anda ingin mengakses Layanan Log Sederhana melalui Internet, gunakan HTTPS dan aktifkan fitur percepatan global. Untuk informasi lebih lanjut, Gunakan fitur percepatan transfer.
project
Nama proyek Layanan Log Sederhana.
STRING
Ya
Tidak ada
Tidak tersedia
logStore
Nama Logstore atau penyimpanan metrik Layanan Log Sederhana.
STRING
Ya
Tidak ada
Data di Logstore dikonsumsi dengan cara yang sama seperti di penyimpanan metrik.
accessId
ID AccessKey akun Alibaba Cloud Anda.
STRING
Ya
Tidak ada
Untuk informasi lebih lanjut, lihat bagian "Bagaimana cara melihat informasi tentang ID AccessKey dan rahasia AccessKey akun?" topik Referensi.
PentingUntuk melindungi pasangan AccessKey Anda, kami merekomendasikan agar Anda mengonfigurasi ID AccessKey menggunakan metode manajemen kunci. Untuk informasi lebih lanjut, lihat Kelola variabel dan kunci.
accessKey
Rahasia AccessKey akun Alibaba Cloud Anda.
STRING
Ya
Tidak ada
Untuk informasi lebih lanjut, lihat bagian "Bagaimana cara melihat informasi tentang ID AccessKey dan rahasia AccessKey akun?" topik Referensi.
PentingUntuk melindungi pasangan AccessKey Anda, kami merekomendasikan agar Anda mengonfigurasi rahasia AccessKey menggunakan metode manajemen kunci. Untuk informasi lebih lanjut, lihat Kelola variabel dan kunci.
Parameter Hanya untuk Tabel Sumber
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
enableNewSource
Menentukan apakah akan menggunakan antarmuka sumber FLIP-27 yang direfaktor.
BOOLEAN
Tidak
false
Antarmuka sumber FLIP-27 yang direfaktor secara otomatis menyesuaikan dengan perubahan shard. Ini memastikan bahwa shard didistribusikan secara merata di semua sumber data.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.9 atau versi lebih baru yang mendukung parameter ini.
PentingPenerapan tidak dapat dipulihkan dari keadaan tertentu setelah nilai parameter ini berubah.
Anda dapat mengonfigurasi parameter consumerGroup untuk memulai penerapan. Saat penerapan mengonsumsi data Layanan Log Sederhana, Realtime Compute for Apache Flink mencatat offset konsumen saat ini di grup konsumen Layanan Log Sederhana. Dalam hal ini, Anda dapat menyetel parameter consumeFromCheckpoint ke true dan memulai penerapan tanpa keadaan. Dengan cara ini, penerapan melanjutkan konsumsi dari offset konsumen saat ini.
shardDiscoveryIntervalMs
Interval deteksi dinamis perubahan shard. Satuan: milidetik.
LONG
Tidak
60000
Jika Anda menyetel parameter ini ke nilai negatif, fitur deteksi dinamis dapat dinonaktifkan.
CatatanParameter ini hanya berlaku jika parameter enableNewSource disetel ke true.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.9 atau versi lebih baru yang mendukung parameter ini.
startupMode
Mode startup tabel sumber.
STRING
Tidak
timestamp
Nilai yang valid:
timestamp: Log dikonsumsi dari waktu mulai yang ditentukan. Ini adalah nilai default.
latest: Log dikonsumsi dari offset terbaru.
earliest: Log dikonsumsi dari offset paling awal.
CatatanJika Anda menyetel parameter consumeFromCheckpoint ke true, log dikonsumsi dari checkpoint yang disimpan di grup konsumen yang ditentukan. Mode startup yang ditentukan oleh parameter ini tidak berlaku.
startTime
Waktu mulai konsumsi log.
STRING
Tidak
Waktu saat ini
Format: yyyy-MM-dd hh:mm:ss.
Parameter ini hanya berlaku jika parameter startupMode disetel ke timestamp.
CatatanParameter startTime dan stopTime dikonfigurasikan berdasarkan field atribut __receive_time__ di tabel sumber Layanan Log Sederhana tetapi tidak berdasarkan field atribut __timestamp__.
stopTime
Waktu berhenti konsumsi log.
STRING
Tidak
Tidak ada
Format: yyyy-MM-dd hh:mm:ss.
CatatanJika Anda ingin Realtime Compute for Apache Flink keluar setelah konsumsi log selesai, Anda harus mengonfigurasi parameter exitAfterFinish bersama dengan parameter ini dan menyetel parameter exitAfterFinish ke true.
consumerGroup
Nama grup konsumen.
STRING
Tidak
Tidak ada
Grup konsumen mencatat kemajuan konsumsi. Anda dapat menentukan nama grup konsumen kustom. Format nama tidak tetap.
CatatanGrup konsumen tidak dapat digunakan bersama oleh beberapa penerapan untuk konsumsi kolaboratif. Kami merekomendasikan agar Anda menentukan grup konsumen yang berbeda untuk penerapan Realtime Compute for Apache Flink yang berbeda. Jika Anda menentukan grup konsumen yang sama untuk penerapan Realtime Compute for Apache Flink yang berbeda, semua data dikonsumsi. Saat Realtime Compute for Apache Flink mengonsumsi data Layanan Log Sederhana, data tidak di-shard di grup konsumen Layanan Log Sederhana. Oleh karena itu, jika penerapan berbagi grup konsumen yang sama, semua pesan di grup konsumen dikonsumsi oleh setiap penerapan.
consumeFromCheckpoint
Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan di grup konsumen yang ditentukan.
STRING
Tidak
false
Nilai yang valid:
true: Jika Anda menyetel parameter ini ke true, Anda harus menentukan grup konsumen. Setelah Anda menentukan grup konsumen, Realtime Compute for Apache Flink mengonsumsi log dari checkpoint yang disimpan di grup konsumen. Jika tidak ada checkpoint di grup konsumen, Realtime Compute for Apache Flink mengonsumsi log dari waktu yang ditentukan oleh parameter startTime.
false: Realtime Compute for Apache Flink tidak mengonsumsi log dari checkpoint yang disimpan di grup konsumen yang ditentukan. Ini adalah nilai default.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.5 atau versi lebih baru yang mendukung parameter ini.
maxRetries
Jumlah percobaan ulang yang diizinkan ketika data gagal dibaca dari Layanan Log Sederhana.
STRING
Tidak
3
Tidak tersedia
batchGetSize
Jumlah grup log dari mana data dibaca dalam satu permintaan.
STRING
Tidak
100
Nilai parameter batchGetSize tidak boleh melebihi 1000. Jika tidak, kesalahan akan dikembalikan.
exitAfterFinish
Menentukan apakah Realtime Compute for Apache Flink keluar setelah konsumsi data selesai.
STRING
Tidak
false
Nilai yang valid:
true: Realtime Compute for Apache Flink keluar setelah konsumsi data selesai.
false: Realtime Compute for Apache Flink tidak keluar setelah konsumsi data selesai. Ini adalah nilai default.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.13 atau versi lebih baru yang mendukung parameter ini.
query
Pernyataan kueri yang digunakan untuk memproses data sebelum konsumsi data di Layanan Log Sederhana.
STRING
Tidak
Tidak ada
Jika Anda mengonfigurasi parameter query, Realtime Compute for Apache Flink dapat menyaring data dari Layanan Log Sederhana sebelum Realtime Compute for Apache Flink mengonsumsi data di Layanan Log Sederhana. Ini membantu mencegah Realtime Compute for Apache Flink mengonsumsi semua data di Layanan Log Sederhana. Ini mengurangi biaya dan meningkatkan efisiensi pemrosesan data.
Sebagai contoh, jika Anda mengeksekusi pernyataan
'query' = '*| where request_method = ''GET''', Realtime Compute for Apache Flink mencocokkan data yang nilainya pada field request_method sama dengan nilai klausa GET sebelum Realtime Compute for Apache Flink membaca data dari Logstore Layanan Log Sederhana.CatatanBahasa Pemrosesan Layanan Log Sederhana (SPL) diperlukan jika Anda mengeksekusi pernyataan kueri untuk memproses data. Untuk informasi lebih lanjut, lihat Ikhtisar SPL.
PentingHanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.1 atau versi lebih baru yang mendukung parameter ini.
Untuk informasi lebih lanjut tentang wilayah di mana Layanan Log Sederhana mendukung fitur ini, lihat Konsumsi log berdasarkan aturan.
Fitur ini gratis selama fase pratinjau publik. Anda mungkin dikenakan biaya untuk Layanan Log Sederhana di masa mendatang. Untuk informasi lebih lanjut, lihat bagian "Penagihan" topik Konsumsi log berdasarkan aturan.
Parameter Hanya untuk Tabel Hasil
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
topicField
Menentukan nama field. Nilai parameter ini menimpa nilai field atribut __topic__ untuk menunjukkan topik log.
STRING
Tidak
Tidak ada
Nilai parameter ini harus berupa field yang ada di tabel.
timeField
Menentukan nama field. Nilai parameter ini menimpa nilai field atribut __timestamp__ untuk menunjukkan waktu penulisan log.
STRING
Tidak
Waktu saat ini
Nilai parameter ini harus berupa field yang ada bertipe INT di tabel. Jika tidak ada field yang ditentukan, waktu saat ini digunakan secara default.
sourceField
Menentukan nama field. Nilai parameter ini menimpa nilai field atribut __source__ untuk menunjukkan asal log. Sebagai contoh, nilainya adalah alamat IP mesin yang menghasilkan log.
STRING
Tidak
Tidak ada
Nilai parameter ini harus berupa field yang ada di tabel.
partitionField
Menentukan nama field. Nilai hash dihitung berdasarkan nilai parameter ini ketika data ditulis ke Layanan Log Sederhana. Data yang memiliki nilai hash yang sama ditulis ke shard yang sama.
STRING
Tidak
Tidak ada
Jika Anda tidak menentukan parameter ini, setiap entri data ditulis secara acak ke shard yang tersedia.
buckets
Jumlah bucket yang dikelompokkan ulang berdasarkan nilai hash ketika parameter partitionField ditentukan.
STRING
Tidak
64
Nilai yang valid: [1,256]. Nilai parameter ini harus merupakan pangkat dua bilangan bulat. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, tidak ada data yang ditulis ke shard tertentu.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.5 atau versi lebih baru yang mendukung parameter ini.
flushIntervalMs
Interval pemicuan penulisan data.
STRING
Tidak
2000
Satuan: milidetik.
writeNullProperties
Menentukan apakah akan menulis nilai null sebagai string kosong ke Layanan Log Sederhana.
BOOLEAN
Tidak
true
Nilai yang valid:
true: Nilai null ditulis sebagai string kosong ke Layanan Log Sederhana. Ini adalah nilai default.
false: Nilai null tidak ditulis ke Layanan Log Sederhana.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau versi lebih baru yang mendukung parameter ini.
Ekstraksi field atribut
Realtime Compute for Apache Flink dapat mengekstrak field log, field kustom, dan field atribut berikut.
Field | Tipe | Deskripsi |
__source__ | STRING METADATA VIRTUAL | Sumber pesan. |
__topic__ | STRING METADATA VIRTUAL | Topik pesan. |
__timestamp__ | BIGINT METADATA VIRTUAL | Waktu log. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | Tag pesan. Untuk atribut |
Untuk mengekstrak field atribut, Anda harus mendefinisikan header dalam pernyataan SQL. Contoh:
create table sls_stream(
__timestamp__ bigint HEADER,
__receive_time__ bigint HEADER
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);Referensi
Untuk informasi lebih lanjut tentang cara menggunakan DataStream API dari Realtime Compute for Apache Flink untuk mengonsumsi data log, lihat DataStream API.