全部产品
Search
文档中心

Realtime Compute for Apache Flink:Simple Log Service (SLS)

更新时间:Feb 13, 2026

Topik ini menjelaskan cara menggunakan konektor Simple Log Service (SLS).

Informasi latar belakang

Simple Log Service adalah layanan end-to-end untuk data log yang membantu Anda mengumpulkan, mengonsumsi, mengirimkan, mengkueri, dan menganalisis data log secara efisien. Layanan ini meningkatkan efisiensi operasi dan pemeliharaan serta mendukung pemrosesan log skala besar.

Konektor SLS mendukung jenis informasi berikut.

Kategori

Deskripsi

Jenis yang didukung

Tabel sumber dan tabel sink

Mode eksekusi

Hanya mode streaming

Metrik pemantauan

N/A

Format data

N/A

Jenis API

SQL, DataStream API, dan YAML data ingestion

Memperbarui atau menghapus data di tabel sink

Anda tidak dapat memperbarui atau menghapus data di tabel sink. Anda hanya dapat menyisipkan data ke tabel sink.

Fitur

Konektor sumber SLS membaca bidang atribut pesan secara langsung. Tabel berikut mencantumkan bidang atribut yang didukung.

Nama bidang

Tipe

Deskripsi

__source__

STRING METADATA VIRTUAL

Sumber pesan.

__topic__

STRING METADATA VIRTUAL

Topik pesan.

__timestamp__

BIGINT METADATA VIRTUAL

Waktu saat log dihasilkan.

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

Tag pesan.

Untuk atribut "__tag__:__receive_time__":"1616742274", '__receive_time__' dan '1616742274' disimpan sebagai pasangan kunci-nilai dalam map. Di SQL, akses menggunakan __tag__['__receive_time__'].

Prasyarat

Anda telah membuat proyek SLS dan Logstore. Untuk informasi selengkapnya, lihat Buat proyek dan Logstore.

Batasan

  • Hanya Ververica Runtime (VVR) 11.1 atau yang lebih baru yang mendukung penggunaan SLS sebagai sumber data ingestion.

  • Konektor SLS hanya mendukung semantik at-least-once.

  • Hindari mengatur parallelisme sumber lebih tinggi daripada jumlah shard karena hal tersebut akan membuang sumber daya. Pada VVR 8.0.5 atau versi sebelumnya, jika jumlah shard berubah setelah Anda menetapkan parallelisme tinggi, failover otomatis mungkin gagal, sehingga menyebabkan beberapa shard tidak dikonsumsi.

SQL

Sintaks

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

DENGAN parameter

  • Umum

    Parameter

    Deskripsi

    Tipe data

    Wajib?

    Nilai default

    Keterangan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada

    Atur ke sls.

    endPoint

    Alamat endpoint.

    String

    Ya

    Tidak ada

    Masukkan endpoint VPC SLS. Untuk informasi selengkapnya, lihat Endpoints.

    Catatan
    • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Namun, Alibaba Cloud menyediakan gerbang NAT untuk mengaktifkan komunikasi antara VPC dan Internet. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses Internet?.

    • Hindari mengakses SLS melalui Internet. Jika harus melakukannya, gunakan HTTPS dan aktifkan transfer acceleration untuk SLS.

    project

    Nama proyek SLS.

    String

    Ya

    Tidak ada

    Tidak ada.

    logStore

    Nama Logstore atau Metricstore SLS.

    String

    Ya

    Tidak ada

    Data di Logstore dikonsumsi dengan cara yang sama seperti di Metricstore.

    accessId

    ID AccessKey Akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada nilai default

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.

    Penting

    Untuk melindungi pasangan AccessKey Anda, gunakan variabel untuk mengonfigurasi AccessKey Anda.

    accessKey

    Rahasia AccessKey Akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada nilai default

  • Khusus sumber

    Parameter

    Deskripsi

    Tipe data

    Wajib?

    Nilai default

    Keterangan

    enableNewSource

    Menentukan apakah akan menggunakan antarmuka sumber baru yang mengimplementasikan FLIP-27.

    Boolean

    Tidak

    false

    Sumber baru menyesuaikan secara otomatis terhadap perubahan shard dan mendistribusikan shard secara merata di semua subtugas sumber.

    Penting
    • Opsi ini hanya didukung di VVR 8.0.9 atau yang lebih baru. Mulai dari VVR 11.1, opsi ini secara default bernilai true.

    • Jika Anda mengubah opsi ini, pekerjaan Anda tidak dapat dilanjutkan dari state yang disimpan. Sebagai solusi, pertama-tama jalankan pekerjaan Anda dengan opsi consumerGroup untuk mencatat offset konsumen saat ini. Kemudian, atur consumeFromCheckpoint ke true dan mulai ulang pekerjaan Anda tanpa state.

    • Jika SLS berisi shard read-only, beberapa subtugas Flink mungkin selesai membaca dari shard tersebut lalu meminta shard lain yang belum dibaca. Hal ini dapat menyebabkan distribusi shard yang tidak merata di antara subtugas, sehingga mengurangi efisiensi konsumsi dan kinerja sistem secara keseluruhan. Untuk mengurangi ketidakseimbangan ini, sesuaikan parallelisme sumber, optimalkan penjadwalan tugas, atau gabungkan shard kecil.

    shardDiscoveryIntervalMs

    Interval deteksi dinamis perubahan shard. Satuan: milidetik.

    Long

    Tidak

    60000

    Atur opsi ini ke nilai negatif untuk menonaktifkan deteksi dinamis.

    Catatan
    • Opsi ini harus minimal 1 menit (60.000 milidetik).

    • Opsi ini hanya berlaku jika enableNewSource diatur ke true.

    • Opsi ini hanya didukung di VVR 8.0.9 atau yang lebih baru.

    startupMode

    Mode startup tabel sumber.

    String

    Tidak

    timestamp

    • timestamp (default): Mengonsumsi log mulai dari waktu yang ditentukan.

    • latest: Mengonsumsi log mulai dari offset terbaru.

    • earliest: Mengonsumsi log mulai dari offset paling awal.

    • consumer_group: Mengonsumsi log mulai dari offset yang direkam dalam kelompok konsumen. Jika tidak ada offset yang direkam untuk suatu shard, konsumsi log dimulai dari offset paling awal.

    Penting
    • Pada versi VVR sebelum 11.1, nilai consumer_group tidak didukung. Untuk mengonsumsi log dari offset yang direkam oleh kelompok konsumen tertentu, atur consumeFromCheckpoint ke true. Dalam kasus ini, mode startup ini tidak berlaku.

    startTime

    Waktu mulai mengonsumsi log.

    String

    Tidak

    Waktu saat ini

    Format: yyyy-MM-dd hh:mm:ss.

    Opsi ini hanya berlaku jika startupMode diatur ke timestamp.

    Catatan

    Opsi startTime dan stopTime didasarkan pada bidang __receive_time__ di SLS, bukan bidang __timestamp__.

    stopTime

    Waktu akhir konsumsi log.

    String

    Tidak

    Tidak ada

    Format: yyyy-MM-dd hh:mm:ss.

    Catatan
    • Gunakan opsi ini hanya untuk mengonsumsi log historis. Atur ke titik waktu di masa lalu. Jika Anda mengaturnya ke waktu di masa depan, konsumsi mungkin berhenti secara tak terduga jika tidak ada log baru yang ditulis. Hal ini tampak sebagai aliran data yang terputus tanpa pesan error.

    • Untuk keluar dari program Flink setelah konsumsi log selesai, atur juga exitAfterFinish ke true.

    consumerGroup

    Nama kelompok konsumen.

    String

    Tidak

    Tidak ada

    Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom apa pun.

    Catatan

    Anda tidak dapat berbagi kelompok konsumen di beberapa pekerjaan untuk konsumsi kolaboratif. Gunakan kelompok konsumen berbeda untuk pekerjaan berbeda. Jika Anda menggunakan kelompok konsumen yang sama untuk pekerjaan berbeda, setiap pekerjaan akan mengonsumsi semua data. Saat Flink mengonsumsi data dari SLS, shard tidak ditugaskan melalui kelompok konsumen SLS. Oleh karena itu, setiap pekerjaan secara independen mengonsumsi semua pesan, meskipun menggunakan kelompok konsumen yang sama.

    consumeFromCheckpoint

    Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen tertentu.

    String

    Tidak

    false

    • true: Jika Anda mengatur parameter ini ke true, Anda juga harus menentukan kelompok konsumen. Flink mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen. Jika tidak ada checkpoint di kelompok konsumen, Flink mengonsumsi log dari waktu yang ditentukan oleh parameter startTime.

    • false (default): Flink tidak mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen tertentu.

    Penting

    Opsi ini tidak didukung di VVR 11.1 atau yang lebih baru. Untuk VVR 11.1 atau yang lebih baru, atur startupMode ke consumer_group.

    maxRetries

    Jumlah percobaan ulang setelah pembacaan dari SLS gagal.

    String

    Tidak

    3

    Tidak ada.

    batchGetSize

    Jumlah kelompok log yang dibaca per permintaan.

    String

    Tidak

    100

    Atur batchGetSize ke nilai kurang dari 1000. Jika tidak, terjadi error.

    exitAfterFinish

    Menentukan apakah program Flink keluar setelah konsumsi data selesai.

    String

    Tidak

    false

    • true: Program Flink keluar setelah konsumsi data selesai.

    • false (default): Program Flink tidak keluar setelah konsumsi data selesai.

    query

    Penting

    Opsi ini sudah ditinggalkan di VVR 11.3 tetapi tetap kompatibel di versi yang lebih baru.

    Pernyataan kueri yang digunakan untuk memproses data sebelum mengonsumsi data SLS.

    String

    Tidak

    Tidak ada nilai default

    Gunakan opsi query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari pemuatan semua data ke Flink, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan.

    Contohnya, 'query' = '*| where request_method = ''GET''' memfilter log di mana bidang request_method bernilai GET sebelum Flink membacanya.

    Catatan

    Tulis kueri menggunakan sintaks SPL.

    Penting
    • Opsi ini hanya didukung di VVR 8.0.1 atau yang lebih baru.

    • Fitur ini dikenai biaya SLS. Untuk detailnya, lihat Tagihan.

    processor

    Prosesor konsumen SLS. Jika query dan processor keduanya diatur, query memiliki prioritas lebih tinggi.

    String

    Tidak

    Tidak ada

    Gunakan opsi processor untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari pemuatan semua data ke Flink, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan. Kami merekomendasikan menggunakan processor daripada query.

    Contohnya, 'processor' = 'test-filter-processor' menerapkan prosesor konsumen SLS untuk memfilter data sebelum Flink membacanya.

    Catatan

    Tulis processor menggunakan sintaks SPL. Untuk detail tentang membuat dan memperbarui prosesor konsumen SLS, lihat Kelola prosesor konsumen.

    Penting

    Opsi ini hanya didukung di VVR 11.3 atau yang lebih baru.

    Fitur ini dikenai biaya SLS. Untuk detailnya, lihat Tagihan.

  • Khusus sink

    parameter

    Deskripsi

    Tipe data

    Wajib?

    Nilai default

    Keterangan

    topicField

    Nama bidang yang nilainya menggantikan bidang __topic__. Ini menunjukkan topik log.

    String

    Tidak

    Tidak ada

    Parameter ini menentukan bidang yang sudah ada di tabel.

    timeField

    Nama bidang yang nilainya menggantikan bidang __timestamp__. Ini menunjukkan waktu penulisan log.

    String

    Tidak

    Waktu saat ini

    Bidang ini harus ada di tabel dan tipenya harus INT. Jika tidak ditentukan, digunakan waktu saat ini.

    sourceField

    Nama bidang yang nilainya menggantikan bidang __source__. Ini menunjukkan sumber log, seperti alamat IP mesin yang menghasilkan log.

    String

    Tidak

    Tidak ada

    Bidang ini harus ada di tabel.

    partitionField

    Nama bidang. Nilai hash dihitung dari nilai bidang ini saat menulis data. Data dengan nilai hash yang sama ditulis ke shard yang sama.

    String

    Tidak

    Tidak ada nilai default

    Jika tidak ditentukan, setiap entri data ditulis secara acak ke shard yang tersedia.

    buckets

    Jumlah bucket untuk dikelompokkan ulang berdasarkan nilai hash saat partitionField ditentukan.

    String

    Tidak

    64

    Nilai valid: [1, 256]. Nilainya harus merupakan pangkat dari 2. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak menerima data.

    flushIntervalMs

    Interval pemicuan penulisan data.

    String

    Tidak

    2000

    Satuan: milidetik.

    writeNullProperties

    Menentukan apakah nilai null ditulis sebagai string kosong ke SLS.

    Boolean

    Tidak

    true

    • true (default): Menulis nilai null sebagai string kosong.

    • false: Tidak menulis bidang yang nilainya dihitung sebagai null.

    Catatan

    Opsi ini hanya didukung di VVR 8.0.6 atau yang lebih baru.

Pemetaan tipe data

Tipe data Flink

Tipe data SLS

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

Data ingestion (pratinjau publik)

Batasan

Hanya didukung oleh mesin komputasi waktu nyata Ververica Runtime (VVR) 11.1 dan yang lebih baru.

Sintaks

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

Opsi konfigurasi

Parameter

Deskripsi

Tipe data

Wajib?

Nilai default

Keterangan

type

Jenis sumber data.

String

Ya

Tidak ada

Atur ke sls.

endpoint

Alamat endpoint.

String

Ya

Tidak ada nilai default

Masukkan endpoint VPC SLS. Untuk informasi selengkapnya, lihat Endpoints.

Catatan
  • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Namun, Alibaba Cloud menyediakan gerbang NAT untuk mengaktifkan komunikasi antara VPC dan Internet. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses Internet?.

  • Hindari mengakses SLS melalui Internet. Jika harus melakukannya, gunakan HTTPS dan aktifkan transfer acceleration untuk SLS.

accessId

ID AccessKey Akun Alibaba Cloud Anda.

String

Ya

Tidak ada nilai default

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.

Penting

Untuk melindungi pasangan AccessKey Anda, gunakan variabel untuk mengonfigurasi AccessKey Anda.

accessKey

Rahasia AccessKey Akun Alibaba Cloud Anda.

String

Ya

Tidak ada

project

Nama proyek SLS.

String

Ya

Tidak ada

Tidak ada.

logStore

Nama Logstore atau Metricstore SLS.

String

Ya

Tidak ada

Data di Logstore dikonsumsi dengan cara yang sama seperti di Metricstore.

schema.inference.strategy

Strategi inferensi skema.

String

Tidak

continuous

  • continuous: Melakukan inferensi skema untuk setiap entri data. Jika skema tidak kompatibel, inferensi skema yang lebih luas dan hasilkan event perubahan skema.

  • static: Melakukan inferensi skema sekali saat startup pekerjaan. Uraikan data berikutnya menggunakan skema awal. Tidak ada event perubahan skema yang dihasilkan.

maxPreFetchLogGroups

Jumlah maksimum kelompok log yang dibaca dan diurai per shard selama inferensi skema awal.

Integer

Tidak

50

Sebelum membaca dan memproses data, konektor mencoba melakukan pre-consume sejumlah kelompok log yang ditentukan dari setiap shard untuk menginisialisasi skema.

shardDiscoveryIntervalMs

Interval deteksi dinamis perubahan shard. Satuan: milidetik.

Long

Tidak

60000

Atur opsi ini ke nilai negatif untuk menonaktifkan deteksi dinamis.

Catatan

Opsi ini harus minimal 1 menit (60.000 milidetik).

startupMode

Mode startup.

String

Tidak

Tidak ada nilai default

  • timestamp (default): Mengonsumsi log mulai dari waktu yang ditentukan.

  • latest: Mengonsumsi log mulai dari offset terbaru.

  • earliest: Mengonsumsi log mulai dari offset paling awal.

  • consumer_group: Mengonsumsi log mulai dari offset yang direkam dalam kelompok konsumen. Jika tidak ada offset yang direkam untuk suatu shard, konsumsi log dimulai dari offset paling awal.

startTime

Waktu mulai mengonsumsi log.

String

Tidak

Waktu saat ini

Format: yyyy-MM-dd hh:mm:ss.

Opsi ini hanya berlaku jika startupMode diatur ke timestamp.

Catatan

Opsi startTime dan stopTime didasarkan pada bidang __receive_time__ di SLS, bukan bidang __timestamp__.

stopTime

Waktu akhir mengonsumsi log.

String

Tidak

Tidak ada nilai default

Format: yyyy-MM-dd hh:mm:ss.

Catatan

Untuk keluar dari program Flink setelah konsumsi log selesai, atur juga exitAfterFinish ke true.

consumerGroup

Nama kelompok konsumen.

String

Tidak

Tidak ada

Kelompok konsumen mencatat progres konsumsi. Anda dapat menentukan nama kustom apa pun.

batchGetSize

Jumlah kelompok log yang dibaca per permintaan.

Integer

Tidak

100

batchGetSize harus kurang dari 1000. Jika tidak, terjadi error.

maxRetries

Jumlah percobaan ulang setelah pembacaan dari SLS gagal.

Integer

Tidak

3

Tidak ada

exitAfterFinish

Menentukan apakah program Flink keluar setelah konsumsi data selesai.

Boolean

Tidak

false

  • true: Program Flink keluar setelah konsumsi data selesai.

  • false (default): Program Flink tidak keluar setelah konsumsi data selesai.

query

Pernyataan kueri yang digunakan untuk memproses data sebelum mengonsumsi data SLS.

String

Tidak

Tidak ada nilai default

Gunakan opsi query untuk memfilter data SLS sebelum dikonsumsi. Hal ini menghindari pemuatan semua data ke Flink, sehingga mengurangi biaya dan meningkatkan kecepatan pemrosesan.

Contohnya, 'query' = '*| where request_method = ''GET''' berarti memfilter data di mana bidang request_method bernilai GET sebelum Flink membaca data SLS.

Catatan

Tulis kueri menggunakan sintaks SPL.

Penting

compressType

Jenis kompresi yang digunakan oleh SLS.

String

Tidak

Tidak ada nilai default

Jenis kompresi yang didukung meliputi:

  • lz4

  • deflate

  • zstd

timeZone

Zona waktu untuk startTime dan stopTime.

String

Tidak

Tidak ada nilai default

Tidak ada offset yang ditambahkan secara default.

regionId

Wilayah tempat SLS ditempatkan.

String

Tidak

Tidak ada nilai default

Untuk detail konfigurasi, lihat Wilayah.

signVersion

Versi signature yang digunakan untuk permintaan SLS.

String

Tidak

Tidak ada nilai default

Untuk detail konfigurasi, lihat Signature permintaan.

shardModDivisor

Divisor yang digunakan saat membaca dari shard Logstore SLS.

Int

Tidak

-1

Untuk detail konfigurasi, lihat Shard.

shardModRemainder

Sisa bagi yang digunakan saat membaca dari shard Logstore SLS.

Int

Tidak

-1

Untuk detail konfigurasi, lihat Shard.

metadata.list

Kolom metadata yang diteruskan ke downstream.

String

Tidak

Tidak ada

Bidang metadata yang tersedia meliputi __source__, __topic__, __timestamp__, dan __tag__. Pisahkan dengan koma.

Pemetaan tipe data

Pemetaan tipe data untuk data ingestion adalah sebagai berikut:

Tipe data SLS

Tipe Bidang CDC

STRING

STRING

Inferensi dan evolusi skema

  • Pre-konsumsi dan inisialisasi skema

    Konektor SLS mempertahankan skema Logstore saat ini. Sebelum membaca data dari Logstore, konektor melakukan pre-consume hingga maxPreFetchLogGroups kelompok log dari setiap shard, mengurai skema setiap log, lalu menggabungkannya untuk menginisialisasi skema tabel. Sebelum konsumsi aktual dimulai, konektor menghasilkan event pembuatan tabel berdasarkan skema yang diinisialisasi.

    Catatan

    Untuk setiap shard, konektor mencoba mengonsumsi data satu jam sebelum waktu saat ini guna mengurai skema.

  • Kunci primer

    Log SLS tidak berisi kunci primer. Tambahkan kunci primer secara manual menggunakan aturan transformasi:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Inferensi dan evolusi skema

    Setelah inisialisasi skema, jika schema.inference.strategy diatur ke static, konektor mengurai setiap entri log menggunakan skema awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, konektor mengurai setiap entri log, menginferensi kolom fisik, lalu membandingkannya dengan skema saat ini. Jika skema yang diinferensi berbeda dari skema saat ini, konektor menggabungkannya menggunakan aturan berikut:

    • Jika skema yang diinferensi mencakup bidang yang tidak ada di skema saat ini, tambahkan bidang tersebut ke skema saat ini dan hasilkan event penambahan kolom nullable.

    • Jika skema yang diinferensi tidak mencakup bidang yang ada di skema saat ini, pertahankan bidang tersebut dan atur nilainya ke NULL. Jangan hasilkan event penghapusan kolom.

    Konektor SLS menginferensi semua bidang sebagai string. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan di akhir skema saat ini dan ditandai sebagai nullable.

Kode contoh

  • Tabel sumber dan tabel sink SQL

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • Sumber data ingestion

    Anda dapat menggunakan SLS sebagai sumber data ingestion untuk menulis data SLS secara real time ke sistem downstream yang didukung. Misalnya, Anda dapat mengonfigurasi pekerjaan data ingestion untuk menulis data dari Logstore ke data lake DLF dalam format Paimon. Pekerjaan tersebut secara otomatis menginferensi tipe data dan skema tabel sink serta mendukung evolusi skema dinamis selama runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# Tambahkan informasi kunci primer ke tabel 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# Tulis semua data dari test_log ke tabel test_database.inventory
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan kinerja baca
  table.properties.deletion-vectors.enabled: true

DataStream API

Penting

Untuk membaca atau menulis data menggunakan DataStream API, gunakan konektor DataStream yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi selengkapnya, lihat Penggunaan konektor DataStream.

Jika Anda menggunakan versi VVR sebelum 8.0.10, pekerjaan Anda mungkin gagal dimulai karena paket JAR dependensi hilang. Untuk mengatasi hal ini, tambahkan paket uber JAR yang sesuai sebagai dependensi tambahan.

Baca data dari SLS

Realtime Compute for Apache Flink menyediakan implementasi SlsSourceFunction dari SourceFunction untuk membaca data dari SLS. Kode contoh:

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Menyiapkan lingkungan eksekusi streaming
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Membuat dan menambahkan sumber dan sink SLS.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // Ukuran batch get harus diberikan.
        accessInfo.setBatchGetSize(10);

        // Parameter opsional
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // waktu mulai mengonsumsi, diatur ke waktu saat ini.
        int startInSec = (int) (new Date().getTime() / 1000);

        // waktu berhenti mengonsumsi, -1 berarti tidak pernah berhenti.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Tulis data ke SLS

Realtime Compute for Apache Flink menyediakan implementasi SLSOutputFormat dari OutputFormat untuk menulis data ke SLS. Kode contoh:

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Konektor DataStream SLS tersedia di Repositori Maven Central.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

FAQ

Apa yang harus saya lakukan jika terjadi error OOM di TaskManager dan muncul pesan error "java.lang.OutOfMemoryError: Java heap space" untuk tabel sumber saat saya memulihkan program Flink yang gagal?