All Products
Search
Document Center

Realtime Compute for Apache Flink:Simple Log Service (SLS)

Last Updated:Mar 05, 2026

Topik ini menjelaskan cara menggunakan konektor Simple Log Service (SLS).

Informasi latar belakang

Simple Log Service adalah layanan komprehensif untuk data log yang memungkinkan Anda mengumpulkan, mengonsumsi, mengirimkan, dan mengkueri data log secara cepat guna meningkatkan efisiensi operasional serta O&M, sekaligus membangun kemampuan pemrosesan log berskala besar.

Tabel berikut mencantumkan informasi yang didukung oleh konektor SLS.

Kategori

Detail

Jenis yang didukung

Tabel sumber dan tabel sink

Mode eksekusi

Hanya mode streaming

Metrik pemantauan kustom

Tidak berlaku

Format data

Tidak ada

Jenis API

SQL, DataStream, dan YAML data ingestion

Dukungan untuk memperbarui atau menghapus data tabel sink

Tidak mendukung pembaruan atau penghapusan data tabel sink. Hanya mendukung operasi insert.

Fitur utama

Tabel sumber konektor SLS mendukung pembacaan langsung bidang metadata pesan. Bidang metadata yang didukung tercantum dalam tabel berikut.

Nama bidang

Tipe bidang

Deskripsi

__source__

STRING METADATA VIRTUAL

Sumber pesan.

__topic__

STRING METADATA VIRTUAL

Topik pesan.

__timestamp__

BIGINT METADATA VIRTUAL

Waktu log.

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

Tag pesan.

Untuk properti "__tag__:__receive_time__":"1616742274", '__receive_time__' dan '1616742274' disimpan sebagai pasangan kunci-nilai dalam Map dan dapat diakses dalam SQL menggunakan __tag__['__receive_time__'].

Prasyarat

Anda harus membuat proyek SLS dan Logstore. Untuk informasi selengkapnya, lihat Buat proyek dan Logstore.

Batasan

  • Hanya Ververica Runtime (VVR) 11.1 dan versi yang lebih baru yang mendukung SLS sebagai sumber data tersinkronisasi untuk YAML data ingestion.

  • Konektor SLS menjamin semantik at-least-once.

  • Hindari mengatur konkurensi sumber lebih tinggi daripada jumlah shard karena hal ini membuang sumber daya. Pada VVR 8.0.5 dan versi sebelumnya, jika jumlah shard berubah, failover otomatis mungkin berhenti bekerja sehingga beberapa shard tidak dikonsumsi.

SQL

Sintaksis

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

dengan parameter

  • Parameter umum

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada

    Nilai tetap: sls.

    endPoint

    Alamat titik akhir.

    String

    Ya

    Tidak ada

    Masukkan titik akhir jaringan pribadi untuk SLS. Untuk informasi selengkapnya, lihat Titik akhir layanan.

    Catatan
    • Realtime Compute for Apache Flink tidak mendukung akses jaringan publik secara default. Namun, Alibaba Cloud NAT Gateway memungkinkan komunikasi antara jaringan VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.

    • Hindari mengakses SLS melalui jaringan publik. Jika diperlukan, gunakan HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Kelola akselerasi transfer.

    project

    Nama proyek SLS.

    String

    Ya

    Tidak ada

    Tidak ada.

    logStore

    Nama Logstore atau metricstore SLS.

    String

    Ya

    Tidak ada

    Logstore dan metricstore menggunakan metode konsumsi yang sama.

    accessId

    ID AccessKey Akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.

    Penting

    Untuk menghindari paparan Informasi AccessKey Anda, gunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel proyek.

    accessKey

    Rahasia AccessKey Akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada

  • Parameter khusus sumber

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    enableNewSource

    Apakah akan mengaktifkan sumber baru yang mengimplementasikan antarmuka FLIP-27.

    Boolean

    Tidak

    false

    Sumber baru secara otomatis menyesuaikan perubahan shard dan mendistribusikan shard secara merata ke semua task sumber.

    Penting
    • Parameter ini didukung pada VVR 8.0.9 dan versi yang lebih baru. Mulai dari VVR 11.1, nilai default-nya adalah true.

    • Jika Anda mengubah parameter ini, pekerjaan tidak dapat dilanjutkan dari status sebelumnya. Untuk melanjutkan konsumsi dari offset historis, pertama-tama mulai pekerjaan dengan parameter consumerGroup untuk mencatat progres konsumsi dalam kelompok konsumen SLS. Kemudian atur consumeFromCheckpoint ke true dan restart pekerjaan tanpa state.

    • Jika terdapat shard read-only di SLS, beberapa task Flink mungkin terus meminta shard lain yang belum diproses setelah menyelesaikan konsumsi shard read-only. Hal ini dapat menyebabkan distribusi shard yang tidak merata di antara task konkuren, sehingga mengurangi efisiensi konsumsi keseluruhan dan kinerja sistem. Untuk mengatasi masalah ini, sesuaikan konkurensi, optimalkan penjadwalan task, atau gabungkan shard kecil untuk mengurangi jumlah shard dan kompleksitas penugasan task.

    shardDiscoveryIntervalMs

    Interval untuk mendeteksi perubahan shard secara dinamis, dalam milidetik.

    Long

    Tidak

    60000

    Atur ke nilai negatif untuk menonaktifkan deteksi dinamis.

    Catatan
    • Nilai ini harus minimal 60000 ms (1 menit).

    • Parameter ini hanya berlaku ketika enableNewSource bernilai true.

    • Parameter ini didukung pada VVR 8.0.9 dan versi yang lebih baru.

    startupMode

    Mode startup untuk tabel sumber.

    String

    Tidak

    timestamp

    • timestamp (default): Mengonsumsi log mulai dari waktu tertentu.

    • latest: Mengonsumsi log mulai dari offset terbaru.

    • earliest: Mengonsumsi log mulai dari offset paling awal.

    • consumer_group: Mengonsumsi log mulai dari offset yang dicatat dalam kelompok konsumen. Jika tidak ada offset yang dicatat untuk suatu shard, konsumsi dimulai dari offset paling awal.

    Penting
    • Versi VVR sebelum 11.1 tidak mendukung consumer_group. Atur consumeFromCheckpoint ke true. Konsumsi kemudian dimulai dari offset yang dicatat dalam kelompok konsumen yang ditentukan, dan startupMode tidak berpengaruh.

    startTime

    Waktu mulai untuk mengonsumsi log.

    String

    Tidak

    Waktu saat ini

    Format: yyyy-MM-dd hh:mm:ss.

    Hanya berlaku ketika startupMode diatur ke timestamp.

    Catatan

    startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan __timestamp__.

    stopTime

    Waktu akhir untuk mengonsumsi log.

    String

    Tidak

    Tidak ada

    Format: yyyy-MM-dd hh:mm:ss.

    Catatan
    • Gunakan parameter ini hanya untuk mengonsumsi log historis. Atur ke waktu lampau. Jika diatur ke waktu mendatang, konsumsi mungkin berakhir lebih awal karena kurangnya log baru, menyebabkan gangguan aliran data tanpa pesan error.

    • Untuk menghentikan program Flink ketika konsumsi log selesai, atur juga exitAfterFinish=true.

    consumerGroup

    Nama kelompok konsumen.

    String

    Tidak

    Tidak ada

    Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom tanpa batasan.

    Catatan

    Anda tidak dapat mengoordinasikan konsumsi lintas beberapa pekerjaan menggunakan kelompok konsumen yang sama. Setiap pekerjaan Flink harus menggunakan kelompok konsumen yang unik. Jika beberapa pekerjaan berbagi kelompok konsumen yang sama, mereka akan mengonsumsi semua data. Hal ini terjadi karena Flink tidak menetapkan partisi melalui kelompok konsumen SLS, sehingga setiap konsumen memproses pesan secara independen terlepas dari kelompok yang dibagikan.

    consumeFromCheckpoint

    Apakah akan mengonsumsi log mulai dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.

    String

    Tidak

    false

    • true: Tentukan kelompok konsumen. Program Flink mengonsumsi log mulai dari checkpoint yang disimpan dalam kelompok tersebut. Jika tidak ada checkpoint, konsumsi dimulai dari nilai startTime.

    • false (default): Jangan mengonsumsi log mulai dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.

    Penting

    VVR 11.1 dan versi yang lebih baru tidak mendukung parameter ini. Untuk VVR 11.1 dan versi yang lebih baru, atur startupMode ke consumer_group.

    maxRetries

    Jumlah percobaan ulang setelah pembacaan SLS gagal.

    String

    Tidak

    3

    Tidak ada.

    batchGetSize

    Jumlah kelompok log yang dibaca per permintaan.

    String

    Tidak

    100

    Nilai batchGetSize tidak boleh melebihi 1000. Jika tidak, terjadi error.

    exitAfterFinish

    Apakah program Flink keluar setelah konsumsi data selesai.

    String

    Tidak

    false

    • true: Program Flink keluar setelah konsumsi data selesai.

    • false (default): Program Flink terus berjalan setelah konsumsi data selesai.

    query

    Penting

    Usang mulai dari VVR 11.3. Masih kompatibel di versi yang lebih baru.

    Pernyataan pra-pemrosesan konsumsi SLS.

    String

    Tidak

    Tidak ada

    Gunakan parameter query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari konsumsi semua data ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan.

    Misalnya, 'query' = '*| where request_method = ''GET''' mencocokkan log yang bidang request_method-nya sama dengan GET sebelum Flink membacanya.

    Catatan

    Gunakan sintaksis SPL untuk kueri. Untuk informasi selengkapnya, lihat Sintaksis SPL.

    Penting
    • Parameter ini didukung pada VVR 8.0.1 dan versi yang lebih baru.

    • Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.

    processor

    Prosesor konsumen SLS. Memiliki prioritas lebih tinggi daripada query jika keduanya ada.

    String

    Tidak

    Tidak ada

    Gunakan parameter processor untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari konsumsi semua data ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan. Kami merekomendasikan processor daripada query.

    Misalnya, 'processor' = 'test-filter-processor' menerapkan prosesor konsumen SLS sebelum Flink membaca data.

    Catatan

    Gunakan sintaksis SPL untuk prosesor. Untuk informasi selengkapnya, lihat Sintaksis SPL. Untuk petunjuk membuat atau memperbarui prosesor konsumen, lihat Kelola prosesor konsumen.

    Penting

    Parameter ini didukung pada VVR 11.3 dan versi yang lebih baru.

    Fitur ini dikenai biaya SLS. Untuk informasi selengkapnya, lihat Harga.

  • Hanya untuk tabel sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    topicField

    Nama bidang yang nilainya menggantikan bidang metadata __topic__. Mewakili topik log.

    String

    Tidak

    Tidak ada

    Bidang ini harus ada dalam tabel.

    timeField

    Nama bidang yang nilainya menggantikan bidang metadata __timestamp__. Mewakili waktu penulisan log.

    String

    Tidak

    Waktu saat ini

    Bidang ini harus ada dalam tabel dan memiliki tipe data INT. Jika tidak ditentukan, waktu saat ini digunakan.

    sourceField

    Nama bidang yang nilainya menggantikan bidang metadata __source__. Mewakili sumber log, seperti alamat IP mesin yang menghasilkan log.

    String

    Tidak

    Tidak ada

    Bidang ini harus ada dalam tabel.

    partitionField

    Nama bidang. Data ditulis ke shard berdasarkan nilai hash bidang ini. Data dengan nilai hash yang sama masuk ke shard yang sama.

    String

    Tidak

    Tidak ada

    Jika tidak ditentukan, setiap catatan ditulis secara acak ke shard yang tersedia.

    buckets

    Jumlah bucket untuk menetapkan ulang data ketika partitionField ditentukan.

    String

    Tidak

    64

    Nilai valid: bilangan bulat dari 1 hingga 256 yang merupakan pangkat dua. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak menerima data.

    flushIntervalMs

    Interval waktu yang memicu penulisan data.

    String

    Tidak

    2000

    Unit: milidetik.

    writeNullProperties

    Apakah akan menulis nilai null sebagai string kosong ke SLS.

    Boolean

    Tidak

    true

    • true (default): Menulis nilai null sebagai string kosong.

    • false: Melewati bidang dengan nilai null saat menulis.

    Catatan

    Parameter ini didukung pada VVR 8.0.6 dan versi yang lebih baru.

Pemetaan tipe

Tipe bidang Flink

Tipe bidang SLS

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

Data ingestion (pratinjau publik)

Batasan

Fitur ini hanya didukung pada VVR 11.1 dan versi yang lebih baru.

Sintaksis

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

Item konfigurasi

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

type

Jenis sumber data.

String

Ya

Tidak ada

Nilai tetap: sls.

endpoint

Alamat titik akhir.

String

Ya

Tidak ada

Masukkan titik akhir jaringan pribadi untuk SLS. Untuk informasi selengkapnya, lihat Titik akhir layanan.

Catatan
  • Realtime Compute for Apache Flink tidak mendukung akses jaringan publik secara default. Namun, Alibaba Cloud NAT Gateway memungkinkan komunikasi antara jaringan VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.

  • Hindari mengakses SLS melalui jaringan publik. Jika diperlukan, gunakan HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Kelola akselerasi transfer.

accessId

ID AccessKey Akun Alibaba Cloud Anda.

String

Ya

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.

Penting

Untuk menghindari paparan Informasi AccessKey Anda, gunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel proyek.

accessKey

Rahasia AccessKey Akun Alibaba Cloud Anda.

String

Ya

Tidak ada

project

Nama proyek SLS.

String

Ya

Tidak ada

Tidak ada.

logStore

Nama Logstore atau metricstore SLS.

String

Ya

Tidak ada

Logstore dan metricstore menggunakan metode konsumsi yang sama.

schema.inference.strategy

Strategi inferensi skema.

String

Tidak

continuous

  • continuous: Melakukan inferensi skema untuk setiap catatan. Ketika skema tidak kompatibel, lakukan inferensi skema yang lebih luas dan hasilkan event perubahan skema.

  • static: Lakukan inferensi skema sekali saat startup pekerjaan. Uraikan catatan berikutnya menggunakan skema awal. Tidak ada event perubahan skema yang dihasilkan.

maxPreFetchLogGroups

Jumlah maksimum kelompok log yang akan dibaca dan diurai per shard selama inferensi skema awal.

Integer

Tidak

50

Sebelum pembacaan dan pemrosesan data aktual, coba konsumsi jumlah kelompok log yang ditentukan per shard untuk menginisialisasi informasi skema.

shardDiscoveryIntervalMs

Interval untuk mendeteksi perubahan shard secara dinamis, dalam milidetik.

Long

Tidak

60000

Atur ke nilai negatif untuk menonaktifkan deteksi dinamis.

Catatan

Nilai ini harus minimal 60000 ms (1 menit).

startupMode

Mode startup.

String

Tidak

Tidak ada

  • timestamp (default): Mengonsumsi log mulai dari waktu tertentu.

  • latest: Mengonsumsi log mulai dari offset terbaru.

  • earliest: Mengonsumsi log mulai dari offset paling awal.

  • consumer_group: Mengonsumsi log mulai dari offset yang dicatat dalam kelompok konsumen. Jika tidak ada offset yang dicatat untuk suatu shard, konsumsi dimulai dari offset paling awal.

startTime

Waktu mulai untuk mengonsumsi log.

String

Tidak

Waktu saat ini

Format: yyyy-MM-dd hh:mm:ss.

Hanya berlaku ketika startupMode diatur ke timestamp.

Catatan

startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan __timestamp__.

stopTime

Waktu akhir konsumsi log.

String

Tidak

Tidak ada

Format: yyyy-MM-dd hh:mm:ss.

Catatan

Untuk menghentikan program Flink ketika konsumsi log selesai, atur juga exitAfterFinish=true.

consumerGroup

Nama kelompok konsumen.

String

Tidak

Tidak ada

Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom tanpa batasan.

batchGetSize

Jumlah kelompok log yang dibaca per permintaan.

Integer

Tidak

100

Nilai batchGetSize tidak boleh melebihi 1000. Jika tidak, terjadi error.

maxRetries

Jumlah percobaan ulang setelah pembacaan SLS gagal.

Integer

Tidak

3

Tidak ada.

exitAfterFinish

Apakah program Flink keluar setelah konsumsi data selesai.

Boolean

Tidak

false

  • true: Program Flink keluar setelah konsumsi data selesai.

  • false (default): Program Flink terus berjalan setelah konsumsi data selesai.

query

Pernyataan pra-pemrosesan untuk konsumsi SLS.

String

Tidak

Tidak ada

Gunakan parameter query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari konsumsi semua data ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan.

Misalnya, 'query' = '*| where request_method = ''GET''' mencocokkan log yang bidang request_method-nya sama dengan GET sebelum Flink membacanya.

Catatan

Gunakan sintaksis SPL untuk kueri. Untuk informasi selengkapnya, lihat Sintaksis SPL.

Penting
  • Untuk wilayah tempat SLS mendukung fitur ini, lihat Konsumsi log berdasarkan aturan.

  • Fitur ini gratis selama pratinjau publik. Biaya mungkin berlaku nanti. Untuk informasi selengkapnya, lihat Harga.

compressType

Jenis kompresi SLS.

String

Tidak

Tidak ada

Jenis kompresi yang didukung:

  • lz4

  • deflate

  • zstd

timeZone

Zona waktu untuk startTime dan stopTime.

String

Tidak

Tidak ada

Tidak ada offset yang ditambahkan secara default.

regionId

Wilayah tempat SLS tersedia.

String

Tidak

Tidak ada

Untuk detail konfigurasi, lihat dokumentasi Wilayah yang tersedia.

signVersion

Versi signature permintaan SLS.

String

Tidak

Tidak ada

Untuk detail konfigurasi, lihat dokumentasi Penandatanganan permintaan.

shardModDivisor

Divisor yang digunakan saat membaca shard Logstore SLS.

Int

Tidak

-1

Untuk detail konfigurasi, lihat dokumentasi Shard.

shardModRemainder

Sisa yang digunakan saat membaca shard Logstore SLS.

Int

Tidak

-1

Untuk detail konfigurasi, lihat dokumentasi Shard.

metadata.list

Kolom metadata yang diteruskan ke sistem downstream.

String

Tidak

Tidak ada

Bidang metadata yang didukung meliputi __source__, __topic__, __timestamp__, dan __tag__. Pisahkan beberapa bidang dengan koma.

decode.table-id.fields

Bidang yang digunakan untuk menghasilkan ID tabel saat mengurai data log SLS.

String

Tidak

Tidak ada

Beberapa bidang dihubungkan dengan koma Inggris (,). Misalnya, ketika catatan log SLS upstream adalah {"col0":"a", "col1":"b", "col2":"c"}, pengaturan parameter yang berbeda menghasilkan hasil berikut:

Konfigurasi

ID Tabel

Tidak ada

Semua pesan disimpan di Project.LogStore.

col0

a

col0,col1

a.b

col0,col1,col2

a.b.c

Catatan

Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru.

fixed-types

Tipe bidang yang ditentukan saat mengurai data log SLS.

String

Tidak

Tidak ada

Saat mengurai data, tentukan tipe data untuk bidang tertentu. Pisahkan beberapa bidang dengan koma Inggris ,. Misalnya, id BIGINT, name VARCHAR(10) menentukan bahwa bidang id dalam data bertipe BIGINT, dan bidang name bertipe VARCHAR(10).

Catatan

Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru.

timestamp-format.standard

Format bidang timestamp dalam data log SLS.

String

Tidak

SQL

Nilai valid:

  • SQL: Mengurai timestamp dalam format yyyy-MM-dd HH:mm:ss.s{precision}, seperti 2020-12-30 12:13:14.123, dan mengeluarkannya dalam format yang sama.

  • ISO-8601: Mengurai timestamp dalam format yyyy-MM-ddTHH:mm:ss.s{precision}, seperti 2020-12-30T12:13:14.123, dan mengeluarkannya dalam format yang sama.

Catatan

Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru.

ingestion.ignore-errors

Apakah akan mengabaikan error selama penguraian data.

Boolean

Tidak

false

Catatan

Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru.

ingestion.error-tolerance.max-count

Jumlah maksimum error penguraian yang diizinkan sebelum pekerjaan gagal, ketika ingestion.ignore-errors diaktifkan.

Integer

Tidak

-1

Hanya berlaku ketika ingestion.ignore-errors diaktifkan. Nilai default -1 berarti error penguraian tidak memicu kegagalan pekerjaan.

Catatan

Konfigurasi ini didukung pada VVR 11.6 dan versi yang lebih baru.

Pemetaan tipe

Ketika fixed-types tidak dikonfigurasi, pemetaan tipe data ingestion adalah sebagai berikut:

Tipe bidang SLS

Tipe bidang CDC

STRING

STRING

Ketika fixed-types dikonfigurasi, konektor mencoba mengurai data menggunakan tipe yang ditentukan.

Inferensi skema dan sinkronisasi perubahan skema

  • Pra-konsumsi shard dan inisialisasi skema

    Konektor SLS mempertahankan skema Logstore saat ini dari mana ia membaca data. Sebelum membaca data, konektor melakukan pra-konsumsi hingga `maxPreFetchLogGroups` kelompok log per shard. Kemudian, konektor mengurai skema setiap log dan menggabungkan skema tersebut untuk menginisialisasi struktur tabel. Selanjutnya, sebelum konsumsi aktual, konektor menghasilkan event pembuatan tabel yang sesuai berdasarkan skema yang diinisialisasi.

    Catatan

    Untuk setiap shard, konektor SLS mencoba mengonsumsi dan mengurai skema log mulai satu jam sebelum waktu saat ini.

  • Informasi kunci primer

    Log SLS tidak berisi informasi kunci primer. Anda dapat menambahkan kunci primer secara manual menggunakan aturan transformasi:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Inferensi skema dan perubahan skema

    Setelah inisialisasi skema, jika schema.inference.strategy diatur ke static, konektor SLS mengurai setiap log menggunakan skema awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, konektor SLS mengurai setiap log, melakukan inferensi kolom fisik, dan membandingkannya dengan skema saat ini. Konektor kemudian menggabungkan skema ketika terdapat perbedaan. Aturan penggabungan:

    • Jika kolom fisik yang diinferensi mencakup bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema dan event kolom nullable baru dihasilkan.

    • Jika kolom fisik yang diinferensi tidak mencakup bidang yang sudah ada dalam skema saat ini, bidang tersebut tetap dipertahankan. Datanya diisi dengan NULL, dan tidak ada event penghapusan kolom yang dihasilkan.

    Konektor SLS melakukan inferensi semua tipe bidang dalam log sebagai `STRING`. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan di akhir skema saat ini dan diatur sebagai nullable.

Contoh kode

  • Tabel sumber dan sink SQL

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • Sumber data ingestion

    SLS dapat berfungsi sebagai sumber data untuk pekerjaan data ingestion, menulis data SLS secara real-time ke sistem downstream yang didukung. Misalnya, Anda dapat mengonfigurasi pekerjaan data ingestion untuk menulis data dari Logstore ke data lake DLF dalam format Paimon. Pekerjaan tersebut secara otomatis melakukan inferensi tipe data bidang dan struktur tabel downstream, serta mendukung evolusi skema dinamis selama runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# Tambahkan informasi kunci primer ke tabel 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# Tulis semua data dari test_log ke tabel test_database.inventory
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan kinerja baca
  table.properties.deletion-vectors.enabled: true

API DataStream

Penting

Untuk membaca atau menulis data menggunakan DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk Flink. Untuk petunjuknya, lihat Gunakan konektor DataStream.

Jika Anda menggunakan VVR sebelum 8.0.10, file JAR dependensi yang hilang dapat mencegah startup pekerjaan. Anda harus menambahkan file JAR -uber yang sesuai sebagai dependensi tambahan.

Baca dari SLS

VVR menyediakan kelas `SlsSourceFunction` untuk membaca dari SLS. Contoh berikut menunjukkan cara membaca dari SLS.

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Tulis ke SLS

VVR menyediakan kelas `SLSOutputFormat` untuk menulis ke SLS. Contoh berikut menunjukkan cara menulis ke SLS.

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Konektor DataStream SLS tersedia di Maven Central Repository di tautan berikut: Konektor DataStream SLS.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

FAQ

Saat memulihkan pekerjaan Flink yang gagal, TaskManager kehabisan memori, dan tabel sumber melaporkan `java.lang.OutOfMemoryError: Java heap space`