All Products
Search
Document Center

Simple Log Service:Mengonsumsi data dengan Realtime Compute for Apache Flink

Last Updated:Mar 26, 2026

Realtime Compute for Apache Flink dapat langsung mengonsumsi data dari Log Service (SLS) dengan membuat tabel sumber SLS. Topik ini menjelaskan cara membuat tabel sumber SLS dan mengekstrak field metadata.

Latar Belakang

Tabel berikut menjelaskan dukungan Flink untuk mengonsumsi data Log Service (SLS).

Kategori

Detail

Jenis yang didukung

Tabel sumber dan tabel sink.

Mode eksekusi

Hanya mode streaming yang didukung.

Metrik pemantauan spesifik

Tidak berlaku.

Format data

Tidak ada.

Jenis API

SQL.

Memperbarui atau menghapus data di tabel sink

Anda tidak dapat memperbarui atau menghapus data di tabel sink; hanya penyisipan yang didukung.

Untuk mulai mengonsumsi log menggunakan Flink, lihat Panduan cepat untuk pekerjaan Flink SQL.

Prasyarat

Batasan

  • Hanya Ververica Runtime (VVR) 11.1 dan versi yang lebih baru yang mendukung SLS sebagai sumber data tersinkronisasi untuk YAML ingesti data.

  • Konektor SLS menjamin semantik at-least-once.

  • Hindari mengatur konkurensi sumber lebih tinggi daripada jumlah shard karena hal ini akan membuang sumber daya. Pada VVR 8.0.5 dan versi sebelumnya, jika jumlah shard berubah, failover otomatis mungkin berhenti bekerja sehingga beberapa shard tidak terkonsumsi.

Buat tabel sumber dan sink SLS

Penting

Mengonsumsi data dari Log Service (SLS) dengan Flink memerlukan pekerjaan SQL lengkap. Pekerjaan SQL lengkap terdiri dari tabel sumber, tabel sink, dan pernyataan INSERT INTO untuk memindahkan data dari sumber ke sink setelah diproses.

Untuk informasi tentang cara mengembangkan pekerjaan Flink SQL, lihat Panduan pengembangan pekerjaan SQL.

Realtime Compute for Apache Flink dapat menggunakan data real-time dari Log Service (SLS) sebagai input streaming. Sebagai contoh, pertimbangkan konten log berikut:

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

Contoh

Contoh berikut menunjukkan pekerjaan SQL yang mengonsumsi data dari Log Service (SLS):

Penting

Jika nama tabel, nama kolom, atau kata kunci dalam pernyataan SQL Anda bertentangan dengan kata tercadang, bungkus dengan backtick ().

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DENGAN parameter

  • Parameter umum

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada

    Nilai tetap: sls.

    endPoint

    Alamat titik akhir.

    String

    Ya

    Tidak ada

    Gunakan titik akhir internal SLS. Untuk informasi selengkapnya, lihat Titik akhir layanan.

    Catatan
    • Realtime Compute for Apache Flink tidak mendukung akses jaringan publik secara default. Namun, Alibaba Cloud NAT Gateway memungkinkan komunikasi antara jaringan VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.

    • Hindari mengakses SLS melalui jaringan publik. Jika diperlukan, gunakan HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Mengelola akselerasi transfer.

    project

    Nama project SLS.

    String

    Ya

    Tidak ada

    Tidak ada.

    logStore

    Nama Logstore atau Metricstore SLS.

    String

    Ya

    Tidak ada

    Logstore dan Metricstore menggunakan metode konsumsi yang sama.

    accessId

    ID AccessKey akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey saya?.

    Penting

    Untuk menghindari pengeksposan Informasi AccessKey Anda, gunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel project.

    accessKey

    Rahasia AccessKey akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada

  • Parameter khusus sumber

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    enableNewSource

    Apakah akan menggunakan sumber berbasis FLIP-27 yang baru.

    Boolean

    Tidak

    false

    Sumber baru secara otomatis menyesuaikan perubahan shard dan mendistribusikan shard secara merata di semua task sumber.

    Penting
    • Parameter ini didukung di VVR 8.0.9 dan versi yang lebih baru. Mulai dari VVR 11.1, nilai default-nya adalah true.

    • Jika Anda mengubah parameter ini, pekerjaan tidak dapat dilanjutkan dari status sebelumnya. Untuk melanjutkan konsumsi dari offset historis, pertama-tama jalankan pekerjaan dengan parameter consumerGroup untuk mencatat progres konsumsi di kelompok konsumen SLS. Kemudian atur consumeFromCheckpoint ke true dan restart pekerjaan tanpa state.

    • Jika terdapat shard read-only di SLS, beberapa task Flink mungkin terus meminta shard yang belum diproses setelah mengonsumsi shard read-only, menyebabkan distribusi shard tidak merata dan mengurangi efisiensi konsumsi. Untuk mengatasi hal ini, sesuaikan konkurensi, optimalkan penjadwalan task, atau gabungkan shard kecil.

    shardDiscoveryIntervalMs

    Interval deteksi perubahan shard, dalam milidetik.

    Long

    Tidak

    60000

    Atur ke nilai negatif untuk menonaktifkan deteksi dinamis.

    Catatan
    • Nilai ini harus minimal 60000 ms (1 menit).

    • Parameter ini hanya berlaku ketika enableNewSource bernilai true.

    • Parameter ini didukung di VVR 8.0.9 dan versi yang lebih baru.

    startupMode

    Mode startup untuk tabel sumber.

    String

    Tidak

    timestamp

    • timestamp (default): Mengonsumsi log mulai dari waktu tertentu.

    • latest: Mengonsumsi log mulai dari offset terbaru.

    • earliest: Mengonsumsi log mulai dari offset paling awal.

    • consumer_group: Mengonsumsi log mulai dari offset yang dicatat di kelompok konsumen. Jika tidak ada offset yang dicatat untuk suatu shard, konsumsi dimulai dari offset paling awal.

    Penting
    • Versi VVR sebelum 11.1 tidak mendukung consumer_group. Atur consumeFromCheckpoint ke true. Konsumsi kemudian dimulai dari offset yang dicatat di kelompok konsumen yang ditentukan, dan startupMode tidak berpengaruh.

    startTime

    Waktu mulai untuk mengonsumsi log.

    String

    Tidak

    Waktu saat ini

    Format: yyyy-MM-dd hh:mm:ss.

    Hanya berlaku ketika startupMode diatur ke timestamp.

    Catatan

    startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan __timestamp__.

    stopTime

    Waktu akhir untuk mengonsumsi log.

    String

    Tidak

    Tidak ada

    Format: yyyy-MM-dd hh:mm:ss.

    Catatan
    • Gunakan parameter ini hanya untuk mengonsumsi log historis. Atur ke waktu lampau. Jika diatur ke waktu mendatang, konsumsi mungkin berakhir lebih awal karena kurangnya log baru, yang dapat mengganggu aliran data secara diam-diam.

    • Untuk keluar dari program Flink ketika konsumsi log selesai, atur juga exitAfterFinish=true.

    consumerGroup

    Nama kelompok konsumen.

    String

    Tidak

    Tidak ada

    Kelompok konsumen melacak progres konsumsi. Anda dapat menggunakan nama kustom apa pun.

    Catatan

    Anda tidak dapat mengoordinasikan konsumsi di beberapa pekerjaan menggunakan kelompok konsumen yang sama. Setiap pekerjaan Flink harus menggunakan kelompok konsumen unik. Jika beberapa pekerjaan berbagi kelompok konsumen yang sama, mereka akan mengonsumsi semua data. Hal ini karena Flink tidak menggunakan kelompok konsumen SLS untuk penugasan partisi. Setiap konsumen memproses semua pesan secara independen.

    consumeFromCheckpoint

    Apakah akan memulai konsumsi dari checkpoint kelompok konsumen yang ditentukan.

    String

    Tidak

    false

    • true: Program Flink memulai konsumsi dari checkpoint kelompok konsumen yang ditentukan. Jika tidak ada checkpoint, konsumsi dimulai dari nilai startTime.

    • false (default): Tidak mengonsumsi log mulai dari checkpoint yang disimpan di kelompok konsumen yang ditentukan.

    Penting

    VVR 11.1 dan versi yang lebih baru tidak mendukung parameter ini. Untuk VVR 11.1 dan versi yang lebih baru, atur startupMode ke consumer_group.

    maxRetries

    Jumlah percobaan ulang setelah pembacaan SLS gagal.

    String

    Tidak

    3

    Tidak ada.

    batchGetSize

    Jumlah kelompok log yang dibaca per permintaan.

    String

    Tidak

    100

    Nilai batchGetSize tidak boleh melebihi 1000. Jika tidak, terjadi error.

    exitAfterFinish

    Apakah program Flink keluar setelah konsumsi data selesai.

    String

    Tidak

    false

    • true: Program Flink keluar setelah konsumsi data selesai.

    • false (default): Program Flink terus berjalan setelah konsumsi data selesai.

    query

    Penting

    Tidak digunakan lagi sejak VVR 11.3. Masih didukung di versi yang lebih baru untuk kompatibilitas mundur.

    Pernyataan pra-pemrosesan konsumsi SLS.

    String

    Tidak

    Tidak ada

    Gunakan parameter ini untuk memfilter data SLS sebelum dikonsumsi, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan.

    Sebagai contoh, 'query' = '*| where request_method = ''GET''' mencocokkan log di mana field request_method bernilai GET sebelum Flink membacanya.

    Catatan

    Gunakan sintaks SPL untuk kueri. Untuk informasi selengkapnya, lihat Sintaks SPL.

    Penting
    • Parameter ini didukung di VVR 8.0.1 dan versi yang lebih baru.

    • Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.

    processor

    Prosesor konsumen SLS. Memiliki prioritas lebih tinggi daripada query jika keduanya ada.

    String

    Tidak

    Tidak ada

    Gunakan parameter ini untuk memfilter data SLS sebelum dikonsumsi, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan. Kami merekomendasikan menggunakan processor daripada query.

    Sebagai contoh, 'processor' = 'test-filter-processor' menerapkan prosesor konsumen SLS sebelum Flink membaca data.

    Catatan

    Gunakan sintaks SPL untuk processor. Untuk informasi selengkapnya, lihat Sintaks SPL. Untuk petunjuk membuat atau memperbarui prosesor konsumen, lihat Mengelola prosesor konsumen.

    Penting

    Parameter ini didukung di VVR 11.3 dan versi yang lebih baru.

    Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.

  • Hanya untuk tabel sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    topicField

    Field yang nilainya menggantikan field metadata __topic__, merepresentasikan topik log.

    String

    Tidak

    Tidak ada

    Field ini harus ada di tabel.

    timeField

    Field yang nilainya menggantikan field metadata __timestamp__, merepresentasikan waktu penulisan log.

    String

    Tidak

    Waktu saat ini

    Field ini harus ada di tabel dan memiliki tipe data INT. Jika tidak ditentukan, waktu saat ini digunakan.

    sourceField

    Field yang nilainya menggantikan field metadata __source__, merepresentasikan sumber log (misalnya, alamat IP mesin yang menghasilkan log).

    String

    Tidak

    Tidak ada

    Field ini harus ada di tabel.

    partitionField

    Field yang digunakan untuk routing shard. Data ditulis ke shard berdasarkan nilai hash field ini. Catatan dengan nilai hash yang sama masuk ke shard yang sama.

    String

    Tidak

    Tidak ada

    Jika tidak ditentukan, setiap catatan ditulis secara acak ke shard yang tersedia.

    buckets

    Jumlah bucket untuk menetapkan ulang data ketika partitionField ditentukan.

    String

    Tidak

    64

    Nilai valid: bilangan bulat dari 1 hingga 256 yang merupakan pangkat dua. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak menerima data.

    flushIntervalMs

    Interval yang memicu penulisan data.

    String

    Tidak

    2000

    Unit: milidetik.

    writeNullProperties

    Apakah akan menulis nilai null sebagai string kosong ke SLS.

    Boolean

    Tidak

    true

    • true (default): Menulis nilai null sebagai string kosong.

    • false: Melewatkan field dengan nilai null saat menulis.

    Catatan

    Parameter ini didukung di VVR 8.0.6 dan versi yang lebih baru.

Ekstrak field metadata

Selain field log, Anda dapat mengekstrak empat field metadata berikut dan field kustom lainnya.

Parameter

Tipe

Deskripsi

__source__

STRING METADATA VIRTUAL

Sumber pesan.

__topic__

STRING METADATA VIRTUAL

Topik pesan.

__timestamp__

BIGINT METADATA VIRTUAL

Waktu log.

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

Tag pesan.

Untuk properti seperti "__tag__:__receive_time__":"1616742274", kunci ('__receive_time__') dan nilai ('1616742274') disimpan dalam map. Anda dapat mengakses nilai tersebut dalam SQL dengan __tag__['__receive_time__'].

Untuk mengekstrak field metadata, deklarasikan dengan kata kunci METADATA dalam pernyataan CREATE TABLE Anda, seperti pada contoh berikut:

create table sls_stream(
  `__timestamp__` bigint METADATA,
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

Dokumen terkait

Untuk mempelajari cara mengonsumsi data menggunakan Flink DataStream API, lihat DataStream API.