全部产品
Search
文档中心

Simple Log Service:Gunakan Realtime Compute for Apache Flink untuk mengonsumsi data log

更新时间:Jul 06, 2025

Anda dapat menggunakan Realtime Compute for Apache Flink untuk membuat tabel sumber Layanan Log Sederhana guna mengonsumsi data log. Topik ini menjelaskan cara membuat tabel sumber dan mengekstrak field atribut yang terlibat dalam proses pembuatan.

Informasi latar belakang

Tabel berikut menjelaskan pengaturan yang perlu dikonfigurasi agar Realtime Compute for Apache Flink dapat mengonsumsi data log.

Kategori

Deskripsi

Jenis yang didukung

Anda dapat mengonfigurasi tabel sumber dan tabel hasil.

Mode operasi

Hanya mode streaming yang didukung.

Metrik

Metrik tidak didukung.

Format data

Tidak ada.

Tipe API

Pernyataan SQL didukung.

Apakah data log dapat diperbarui atau dihapus di tabel hasil

Anda tidak dapat memperbarui atau menghapus data log di tabel hasil. Anda hanya dapat menyisipkan data log ke tabel hasil.

Untuk informasi lebih lanjut tentang cara menggunakan Realtime Compute for Apache Flink untuk mengonsumsi data log, lihat Memulai dengan Penerapan Flink SQL.

Prasyarat

  • Jika Anda ingin menggunakan Pengguna Resource Access Management (RAM) atau Peran RAM untuk mengonsumsi data log, pastikan bahwa Pengguna RAM atau Peran RAM memiliki izin yang diperlukan pada konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Manajemen Izin.

  • Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.

  • Proyek dan penyimpanan log telah dibuat. Untuk informasi lebih lanjut, lihat Buat Proyek dan Penyimpanan Log.

Batasan

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru yang mendukung konektor Layanan Log Sederhana.

  • Konektor Layanan Log Sederhana hanya mendukung semantik setidaknya sekali.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.13 atau versi lebih baru yang mendukung failover otomatis penerapan akibat perubahan jumlah shard.

  • Disarankan untuk tidak menyetel paralelisme penerapan untuk node sumber ke nilai yang lebih besar dari jumlah shard. Jika paralelisme penerapan untuk node sumber lebih besar dari jumlah shard, sumber daya mungkin terbuang. Di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.5 atau versi lebih lama, jika jumlah shard berubah, failover otomatis penerapan mungkin menjadi tidak valid dan shard tertentu mungkin tidak dikonsumsi.

Buat tabel sumber Layanan Log Sederhana dan tabel hasil

Penting

Anda harus mengembangkan draf SQL lengkap sebelum menggunakan Realtime Compute for Apache Flink untuk mengonsumsi data log di Layanan Log Sederhana. Draf SQL lengkap mencakup tabel sumber dan tabel hasil. Setelah data log di tabel sumber diproses, hasilnya dimasukkan ke tabel hasil menggunakan pernyataan INSERT INTO.

Untuk informasi lebih lanjut tentang cara mengembangkan draf SQL di Realtime Compute for Apache Flink, lihat Kembangkan Draf SQL.

Layanan Log Sederhana menyimpan data log secara real-time. Realtime Compute for Apache Flink dapat membaca data tersebut dalam mode streaming sebagai data masukan. Kode berikut memberikan contoh log:

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

Contoh kode

Kode berikut memberikan contoh draf SQL yang dapat Anda kembangkan di Realtime Compute for Apache Flink untuk mengonsumsi data log di Layanan Log Sederhana.

Penting

Jika nama tabel, kolom, dan field cadangan dalam draf SQL bertentangan satu sama lain, Anda harus mengapit nama-nama tersebut dengan backtick (`).

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

Parameter dalam klausa WITH

  • Parameter Umum

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel.

    STRING

    Ya

    Tidak ada

    Atur nilainya menjadi sls.

    endPoint

    Titik akhir Layanan Log Sederhana.

    STRING

    Ya

    Tidak ada

    Masukkan titik akhir internal Layanan Log Sederhana. Untuk informasi lebih lanjut, lihat Titik Akhir.

    Catatan
    • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Alibaba Cloud menyediakan NAT gateway untuk memungkinkan komunikasi antara virtual private cloud (VPC) dan Internet. Untuk informasi lebih lanjut, lihat Operasi Konsol.

    • Kami merekomendasikan agar Anda tidak mengakses Layanan Log Sederhana melalui Internet. Jika Anda ingin mengakses Layanan Log Sederhana melalui Internet, gunakan HTTPS dan aktifkan fitur percepatan global. Untuk informasi lebih lanjut, Gunakan fitur percepatan transfer.

    project

    Nama proyek Layanan Log Sederhana.

    STRING

    Ya

    Tidak ada

    Tidak tersedia

    logStore

    Nama Logstore atau penyimpanan metrik Layanan Log Sederhana.

    STRING

    Ya

    Tidak ada

    Data di Logstore dikonsumsi dengan cara yang sama seperti di penyimpanan metrik.

    accessId

    ID AccessKey akun Alibaba Cloud Anda.

    STRING

    Ya

    Tidak ada

    Untuk informasi lebih lanjut, lihat bagian "Bagaimana cara melihat informasi tentang ID AccessKey dan rahasia AccessKey akun?" topik Referensi.

    Penting

    Untuk melindungi pasangan AccessKey Anda, kami merekomendasikan agar Anda mengonfigurasi ID AccessKey menggunakan metode manajemen kunci. Untuk informasi lebih lanjut, lihat Kelola variabel dan kunci.

    accessKey

    Rahasia AccessKey akun Alibaba Cloud Anda.

    STRING

    Ya

    Tidak ada

    Untuk informasi lebih lanjut, lihat bagian "Bagaimana cara melihat informasi tentang ID AccessKey dan rahasia AccessKey akun?" topik Referensi.

    Penting

    Untuk melindungi pasangan AccessKey Anda, kami merekomendasikan agar Anda mengonfigurasi rahasia AccessKey menggunakan metode manajemen kunci. Untuk informasi lebih lanjut, lihat Kelola variabel dan kunci.

  • Parameter Hanya untuk Tabel Sumber

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    enableNewSource

    Menentukan apakah akan menggunakan antarmuka sumber FLIP-27 yang direfaktor.

    BOOLEAN

    Tidak

    false

    Antarmuka sumber FLIP-27 yang direfaktor secara otomatis menyesuaikan dengan perubahan shard. Ini memastikan bahwa shard didistribusikan secara merata di semua sumber data.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.9 atau versi lebih baru yang mendukung parameter ini.

    Penting

    Penerapan tidak dapat dipulihkan dari keadaan tertentu setelah nilai parameter ini berubah.

    Anda dapat mengonfigurasi parameter consumerGroup untuk memulai penerapan. Saat penerapan mengonsumsi data Layanan Log Sederhana, Realtime Compute for Apache Flink mencatat offset konsumen saat ini di grup konsumen Layanan Log Sederhana. Dalam hal ini, Anda dapat menyetel parameter consumeFromCheckpoint ke true dan memulai penerapan tanpa keadaan. Dengan cara ini, penerapan melanjutkan konsumsi dari offset konsumen saat ini.

    shardDiscoveryIntervalMs

    Interval deteksi dinamis perubahan shard. Satuan: milidetik.

    LONG

    Tidak

    60000

    Jika Anda menyetel parameter ini ke nilai negatif, fitur deteksi dinamis dapat dinonaktifkan.

    Catatan
    • Parameter ini hanya berlaku jika parameter enableNewSource disetel ke true.

    • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.9 atau versi lebih baru yang mendukung parameter ini.

    startupMode

    Mode startup tabel sumber.

    STRING

    Tidak

    timestamp

    Nilai yang valid:

    • timestamp: Log dikonsumsi dari waktu mulai yang ditentukan. Ini adalah nilai default.

    • latest: Log dikonsumsi dari offset terbaru.

    • earliest: Log dikonsumsi dari offset paling awal.

    Catatan

    Jika Anda menyetel parameter consumeFromCheckpoint ke true, log dikonsumsi dari checkpoint yang disimpan di grup konsumen yang ditentukan. Mode startup yang ditentukan oleh parameter ini tidak berlaku.

    startTime

    Waktu mulai konsumsi log.

    STRING

    Tidak

    Waktu saat ini

    Format: yyyy-MM-dd hh:mm:ss.

    Parameter ini hanya berlaku jika parameter startupMode disetel ke timestamp.

    Catatan

    Parameter startTime dan stopTime dikonfigurasikan berdasarkan field atribut __receive_time__ di tabel sumber Layanan Log Sederhana tetapi tidak berdasarkan field atribut __timestamp__.

    stopTime

    Waktu berhenti konsumsi log.

    STRING

    Tidak

    Tidak ada

    Format: yyyy-MM-dd hh:mm:ss.

    Catatan

    Jika Anda ingin Realtime Compute for Apache Flink keluar setelah konsumsi log selesai, Anda harus mengonfigurasi parameter exitAfterFinish bersama dengan parameter ini dan menyetel parameter exitAfterFinish ke true.

    consumerGroup

    Nama grup konsumen.

    STRING

    Tidak

    Tidak ada

    Grup konsumen mencatat kemajuan konsumsi. Anda dapat menentukan nama grup konsumen kustom. Format nama tidak tetap.

    Catatan

    Grup konsumen tidak dapat digunakan bersama oleh beberapa penerapan untuk konsumsi kolaboratif. Kami merekomendasikan agar Anda menentukan grup konsumen yang berbeda untuk penerapan Realtime Compute for Apache Flink yang berbeda. Jika Anda menentukan grup konsumen yang sama untuk penerapan Realtime Compute for Apache Flink yang berbeda, semua data dikonsumsi. Saat Realtime Compute for Apache Flink mengonsumsi data Layanan Log Sederhana, data tidak di-shard di grup konsumen Layanan Log Sederhana. Oleh karena itu, jika penerapan berbagi grup konsumen yang sama, semua pesan di grup konsumen dikonsumsi oleh setiap penerapan.

    consumeFromCheckpoint

    Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan di grup konsumen yang ditentukan.

    STRING

    Tidak

    false

    Nilai yang valid:

    • true: Jika Anda menyetel parameter ini ke true, Anda harus menentukan grup konsumen. Setelah Anda menentukan grup konsumen, Realtime Compute for Apache Flink mengonsumsi log dari checkpoint yang disimpan di grup konsumen. Jika tidak ada checkpoint di grup konsumen, Realtime Compute for Apache Flink mengonsumsi log dari waktu yang ditentukan oleh parameter startTime.

    • false: Realtime Compute for Apache Flink tidak mengonsumsi log dari checkpoint yang disimpan di grup konsumen yang ditentukan. Ini adalah nilai default.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.5 atau versi lebih baru yang mendukung parameter ini.

    maxRetries

    Jumlah percobaan ulang yang diizinkan ketika data gagal dibaca dari Layanan Log Sederhana.

    STRING

    Tidak

    3

    Tidak tersedia

    batchGetSize

    Jumlah grup log dari mana data dibaca dalam satu permintaan.

    STRING

    Tidak

    100

    Nilai parameter batchGetSize tidak boleh melebihi 1000. Jika tidak, kesalahan akan dikembalikan.

    exitAfterFinish

    Menentukan apakah Realtime Compute for Apache Flink keluar setelah konsumsi data selesai.

    STRING

    Tidak

    false

    Nilai yang valid:

    • true: Realtime Compute for Apache Flink keluar setelah konsumsi data selesai.

    • false: Realtime Compute for Apache Flink tidak keluar setelah konsumsi data selesai. Ini adalah nilai default.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.13 atau versi lebih baru yang mendukung parameter ini.

    query

    Pernyataan kueri yang digunakan untuk memproses data sebelum konsumsi data di Layanan Log Sederhana.

    STRING

    Tidak

    Tidak ada

    Jika Anda mengonfigurasi parameter query, Realtime Compute for Apache Flink dapat menyaring data dari Layanan Log Sederhana sebelum Realtime Compute for Apache Flink mengonsumsi data di Layanan Log Sederhana. Ini membantu mencegah Realtime Compute for Apache Flink mengonsumsi semua data di Layanan Log Sederhana. Ini mengurangi biaya dan meningkatkan efisiensi pemrosesan data.

    Sebagai contoh, jika Anda mengeksekusi pernyataan 'query' = '*| where request_method = ''GET''', Realtime Compute for Apache Flink mencocokkan data yang nilainya pada field request_method sama dengan nilai klausa GET sebelum Realtime Compute for Apache Flink membaca data dari Logstore Layanan Log Sederhana.

    Catatan

    Bahasa Pemrosesan Layanan Log Sederhana (SPL) diperlukan jika Anda mengeksekusi pernyataan kueri untuk memproses data. Untuk informasi lebih lanjut, lihat Ikhtisar SPL.

    Penting
    • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.1 atau versi lebih baru yang mendukung parameter ini.

    • Untuk informasi lebih lanjut tentang wilayah di mana Layanan Log Sederhana mendukung fitur ini, lihat Konsumsi log berdasarkan aturan.

    • Fitur ini gratis selama fase pratinjau publik. Anda mungkin dikenakan biaya untuk Layanan Log Sederhana di masa mendatang. Untuk informasi lebih lanjut, lihat bagian "Penagihan" topik Konsumsi log berdasarkan aturan.

  • Parameter Hanya untuk Tabel Hasil

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    topicField

    Menentukan nama field. Nilai parameter ini menimpa nilai field atribut __topic__ untuk menunjukkan topik log.

    STRING

    Tidak

    Tidak ada

    Nilai parameter ini harus berupa field yang ada di tabel.

    timeField

    Menentukan nama field. Nilai parameter ini menimpa nilai field atribut __timestamp__ untuk menunjukkan waktu penulisan log.

    STRING

    Tidak

    Waktu saat ini

    Nilai parameter ini harus berupa field yang ada bertipe INT di tabel. Jika tidak ada field yang ditentukan, waktu saat ini digunakan secara default.

    sourceField

    Menentukan nama field. Nilai parameter ini menimpa nilai field atribut __source__ untuk menunjukkan asal log. Sebagai contoh, nilainya adalah alamat IP mesin yang menghasilkan log.

    STRING

    Tidak

    Tidak ada

    Nilai parameter ini harus berupa field yang ada di tabel.

    partitionField

    Menentukan nama field. Nilai hash dihitung berdasarkan nilai parameter ini ketika data ditulis ke Layanan Log Sederhana. Data yang memiliki nilai hash yang sama ditulis ke shard yang sama.

    STRING

    Tidak

    Tidak ada

    Jika Anda tidak menentukan parameter ini, setiap entri data ditulis secara acak ke shard yang tersedia.

    buckets

    Jumlah bucket yang dikelompokkan ulang berdasarkan nilai hash ketika parameter partitionField ditentukan.

    STRING

    Tidak

    64

    Nilai yang valid: [1,256]. Nilai parameter ini harus merupakan pangkat dua bilangan bulat. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, tidak ada data yang ditulis ke shard tertentu.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.5 atau versi lebih baru yang mendukung parameter ini.

    flushIntervalMs

    Interval pemicuan penulisan data.

    STRING

    Tidak

    2000

    Satuan: milidetik.

    writeNullProperties

    Menentukan apakah akan menulis nilai null sebagai string kosong ke Layanan Log Sederhana.

    BOOLEAN

    Tidak

    true

    Nilai yang valid:

    • true: Nilai null ditulis sebagai string kosong ke Layanan Log Sederhana. Ini adalah nilai default.

    • false: Nilai null tidak ditulis ke Layanan Log Sederhana.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau versi lebih baru yang mendukung parameter ini.

Ekstraksi field atribut

Realtime Compute for Apache Flink dapat mengekstrak field log, field kustom, dan field atribut berikut.

Field

Tipe

Deskripsi

__source__

STRING METADATA VIRTUAL

Sumber pesan.

__topic__

STRING METADATA VIRTUAL

Topik pesan.

__timestamp__

BIGINT METADATA VIRTUAL

Waktu log.

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

Tag pesan.

Untuk atribut "__tag__:__receive_time__":"1616742274", field __receive_time__ dan 1616742274 dicatat sebagai pasangan kunci-nilai dalam map. Anda dapat menyertakan __tag__['__receive_time__'] dalam pernyataan SQL untuk menanyakan tag tersebut.

Untuk mengekstrak field atribut, Anda harus mendefinisikan header dalam pernyataan SQL. Contoh:

create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

Referensi

Untuk informasi lebih lanjut tentang cara menggunakan DataStream API dari Realtime Compute for Apache Flink untuk mengonsumsi data log, lihat DataStream API.