全部产品
Search
文档中心

Realtime Compute for Apache Flink:Simple Log Service (SLS)

更新时间:Feb 03, 2026

Topik ini menjelaskan cara menggunakan konektor Layanan Log Sederhana (SLS).

Informasi latar belakang

Simple Log Service adalah layanan pencatatan data end-to-end yang membantu Anda mengumpulkan, mengonsumsi, mengirimkan, menanyakan, dan menganalisis data log secara efisien. Layanan ini meningkatkan efisiensi operasional dan O&M serta membantu membangun kapasitas untuk memproses volume besar data log.

Tabel berikut menjelaskan fitur-fitur yang didukung oleh konektor SLS.

Kategori

Detail

Jenis yang didukung

Tabel Source dan Sink

Mode operasi

Hanya mode streaming

Metrik pemantauan spesifik

Tidak berlaku

Format data

Tidak ada

Jenis API

SQL, Datastream, dan YAML data ingestion

Apakah Anda dapat memperbarui atau menghapus data di tabel sink?

Anda tidak dapat memperbarui atau menghapus data di tabel sink. Anda hanya dapat menyisipkan data.

Fitur

Konektor SLS untuk tabel source mendukung pembacaan langsung bidang atribut pesan. Tabel berikut menjelaskan bidang atribut yang didukung.

Nama bidang

Tipe bidang

Deskripsi bidang

__source__

STRING METADATA VIRTUAL

Sumber pesan.

__topic__

STRING METADATA VIRTUAL

Topik pesan.

__timestamp__

BIGINT METADATA VIRTUAL

Waktu log.

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

Tag pesan.

Untuk atribut "__tag__:__receive_time__":"1616742274", '__receive_time__' dan '1616742274' direkam sebagai pasangan kunci-nilai dalam map. Anda dapat mengakses nilainya menggunakan __tag__['__receive_time__'] dalam SQL.

Prasyarat

Anda telah membuat proyek Simple Log Service dan Logstore. Untuk informasi selengkapnya, lihat Buat proyek dan Logstore.

Batasan

  • Hanya Realtime Compute for Apache Flink VVR 11.1 atau yang lebih baru yang mendukung penggunaan Simple Log Service (SLS) sebagai sumber data sinkronisasi untuk data ingestion dalam format YAML.

  • Konektor SLS hanya menjamin semantik at-least-once.

  • Kami sangat menyarankan agar Anda tidak mengatur konkurensi source ke nilai yang lebih besar dari jumlah shard. Praktik ini membuang sumber daya dan dapat menyebabkan fitur failover otomatis gagal pada VVR 8.0.5 atau versi sebelumnya jika jumlah shard berubah, sehingga beberapa shard tidak dikonsumsi.

SQL

Sintaks

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

DENGAN PARAMETER

  • Umum

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada

    Atur parameter ini ke sls.

    endPoint

    Titik akhir.

    String

    Ya

    Tidak ada

    Masukkan endpoint jaringan pribadi SLS. Untuk informasi selengkapnya, lihat Endpoints.

    Catatan
    • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses jaringan publik. Namun, Alibaba Cloud menyediakan NAT Gateway untuk mengaktifkan komunikasi antara VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.

    • Kami tidak menyarankan Anda mengakses SLS melalui jaringan publik. Jika Anda harus mengakses SLS melalui jaringan publik, gunakan protokol HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Kelola akselerasi transfer.

    project

    Nama proyek SLS.

    String

    Ya

    Tidak ada

    Tidak ada.

    logStore

    Nama Logstore atau metricstore SLS.

    String

    Ya

    Tidak ada

    Data dalam Logstore dikonsumsi dengan cara yang sama seperti dalam metricstore.

    accessId

    ID AccessKey akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?.

    Penting

    Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Kelola variabel.

    accessKey

    Rahasia AccessKey akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada

  • Khusus tabel source

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    enableNewSource

    Menentukan apakah akan mengaktifkan sumber data baru yang mengimplementasikan antarmuka FLIP-27.

    Boolean

    Tidak

    false

    Sumber data baru dapat secara otomatis beradaptasi terhadap perubahan shard dan memastikan bahwa shard didistribusikan secara merata di seluruh semua konkurensi source.

    Penting
    • Parameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.9 atau yang lebih baru. Parameter ini diatur ke true secara default mulai dari VVR 11.1.

    • Pekerjaan tidak dapat dipulihkan dari state setelah item konfigurasi ini diubah. Untuk mengatasi masalah ini, Anda dapat terlebih dahulu mengatur item konfigurasi consumerGroup untuk memulai pekerjaan dan mencatat progres konsumsi dalam kelompok konsumen SLS. Kemudian, atur item konfigurasi consumeFromCheckpoint ke true dan mulai pekerjaan secara tanpa status. Dengan cara ini, pekerjaan dapat melanjutkan konsumsi dari progres historis.

    • Jika terdapat shard read-only di SLS, beberapa task Flink konkuren tetap meminta data dari shard lain yang belum selesai setelah mereka selesai mengonsumsi data dari shard read-only tersebut. Hal ini dapat menyebabkan beberapa task konkuren ditugaskan ke beberapa shard, sehingga menyebabkan ketidakseimbangan distribusi shard di antara mereka. Ketidakseimbangan ini memengaruhi efisiensi konsumsi keseluruhan dan kinerja sistem. Untuk mengurangi masalah ini, sesuaikan konkurensi, optimalkan kebijakan penjadwalan task, atau gabungkan shard kecil untuk mengurangi jumlah shard dan kompleksitas penugasan task.

    shardDiscoveryIntervalMs

    Interval deteksi dinamis perubahan shard. Satuan: milidetik.

    Long

    Tidak

    60000

    Atur parameter ini ke nilai negatif untuk menonaktifkan deteksi dinamis.

    Catatan
    • Nilai parameter ini tidak boleh kurang dari 1 menit (60.000 milidetik).

    • Parameter ini hanya berlaku ketika parameter enableNewSource diatur ke true.

    • Parameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.9 atau yang lebih baru.

    startupMode

    Mode startup tabel sumber.

    String

    Tidak

    timestamp

    • timestamp (default): Log dikonsumsi dari waktu mulai yang ditentukan.

    • latest: Log dikonsumsi dari offset terbaru.

    • earliest: Log dikonsumsi dari offset paling awal.

    • consumer_group: Log dikonsumsi dari offset yang dicatat dalam kelompok konsumen. Jika kelompok konsumen tidak mencatat offset konsumen suatu shard, log dikonsumsi dari offset paling awal.

    Penting
    • Pada versi VVR sebelum 11.1, nilai consumer_group tidak didukung. Anda harus mengatur consumeFromCheckpoint ke true. Dalam kasus ini, log dikonsumsi dari offset yang dicatat dalam kelompok konsumen yang ditentukan, dan mode startup yang ditentukan di sini tidak berlaku.

    startTime

    Waktu mulai konsumsi log.

    String

    Tidak

    Waktu saat ini

    Formatnya adalah yyyy-MM-dd hh:mm:ss.

    Parameter ini hanya berlaku ketika startupMode diatur ke timestamp.

    Catatan

    Parameter startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan atribut __timestamp__.

    stopTime

    Waktu berakhirnya konsumsi log.

    String

    Tidak

    Tidak ada

    Formatnya adalah yyyy-MM-dd hh:mm:ss.

    Catatan
    • Parameter ini hanya digunakan untuk mengonsumsi log historis dan harus diatur ke titik waktu di masa lalu. Jika Anda mengaturnya ke waktu di masa depan, konsumsi mungkin berhenti lebih awal jika tidak ada log baru yang ditulis. Hal ini tampak sebagai gangguan aliran data tanpa pesan error.

    • Jika Anda ingin program Flink keluar setelah konsumsi log selesai, Anda juga harus mengatur exitAfterFinish=true.

    consumerGroup

    Nama grup konsumen.

    String

    Tidak

    Tidak ada

    Kelompok konsumen digunakan untuk mencatat progres konsumsi. Anda dapat menentukan nama kustom untuk kelompok konsumen. Formatnya tidak tetap.

    Catatan

    Beberapa pekerjaan tidak dapat menggunakan kelompok konsumen yang sama untuk konsumsi kolaboratif. Pekerjaan Flink yang berbeda harus memiliki kelompok konsumen yang berbeda. Jika pekerjaan Flink yang berbeda menggunakan kelompok konsumen yang sama, mereka akan mengonsumsi semua data. Hal ini karena ketika Flink mengonsumsi data dari SLS, alokasi partisi tidak dilakukan melalui kelompok konsumen SLS. Akibatnya, setiap konsumen mengonsumsi pesannya sendiri secara independen, meskipun kelompok konsumennya sama.

    consumeFromCheckpoint

    Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.

    String

    Tidak

    false

    • true: Anda juga harus menentukan kelompok konsumen. Program Flink mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen. Jika kelompok konsumen tidak memiliki checkpoint yang sesuai, log dikonsumsi dari waktu yang ditentukan oleh parameter startTime.

    • false (default): Log tidak dikonsumsi dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.

    Penting

    Parameter ini tidak lagi didukung mulai dari VVR 11.1. Untuk VVR 11.1 dan yang lebih baru, Anda harus mengatur startupMode ke consumer_group.

    maxRetries

    Jumlah percobaan ulang setelah kegagalan membaca data dari SLS.

    String

    Tidak

    3

    Tidak ada.

    batchGetSize

    Jumlah kelompok log yang dibaca dalam satu permintaan.

    String

    Tidak

    100

    Nilai batchGetSize tidak boleh melebihi 1.000. Jika tidak, akan muncul error.

    exitAfterFinish

    Menentukan apakah program Flink keluar setelah konsumsi data selesai.

    String

    Tidak

    false

    • true: Program Flink keluar setelah konsumsi data selesai.

    • false (default): Program Flink tidak keluar setelah konsumsi data selesai.

    query

    Penting

    Parameter ini sudah tidak digunakan lagi di VVR 11.3 dan yang lebih baru, tetapi tetap kompatibel dengan versi selanjutnya.

    Pernyataan pra-pemrosesan untuk konsumsi data SLS.

    String

    Tidak

    Tidak ada

    Menggunakan parameter query memungkinkan Anda memfilter data sebelum dikonsumsi dari SLS. Hal ini mencegah semua data dikonsumsi ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan.

    Misalnya, 'query' = '*| where request_method = ''GET''' menunjukkan bahwa sebelum Flink membaca data dari SLS, data tersebut dicocokkan dengan nilai bidang request_method yang bernilai 'get'.

    Catatan

    Parameter query memerlukan penggunaan Structured Process Language (SPL) untuk Simple Log Service. Untuk informasi selengkapnya, lihat Sintaksis SPL.

    Penting
    • Parameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.1 atau yang lebih baru.

    • Fitur ini dikenakan biaya oleh Simple Log Service. Untuk informasi selengkapnya, lihat Penagihan.

    processor

    Prosesor Consum SLS. Jika parameter ini dan `query` dikonfigurasi bersamaan, `query` akan diutamakan dan `processor` tidak berlaku.

    String

    Tidak

    Tidak ada

    Menggunakan parameter processor memungkinkan Anda memfilter data sebelum dikonsumsi oleh Flink. Hal ini membantu menghemat biaya dan meningkatkan kecepatan pemrosesan. Kami menyarankan agar Anda menggunakan parameter processor daripada parameter query.

    Misalnya, 'processor' = 'test-filter-processor' menunjukkan bahwa data difilter oleh Prosesor Consum SLS sebelum dibaca oleh Flink.

    Catatan

    Prosesor harus menggunakan Structured Process Language (SPL) dari Simple Log Service (SLS). Untuk informasi selengkapnya, lihat Sintaksis SPL. Untuk membuat dan memperbarui Prosesor Consum SLS, lihat Kelola Prosesor Consum.

    Penting

    Parameter ini hanya didukung di Realtime Compute for Apache Flink VVR 11.3 atau yang lebih baru.

    Fitur ini dikenakan biaya oleh Simple Log Service. Untuk informasi selengkapnya, lihat Penagihan.

  • Hanya untuk tabel sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Catatan

    topicField

    Menentukan nama bidang. Nilai bidang ini akan menggantikan nilai bidang atribut __topic__ dan menunjukkan topik log.

    String

    Tidak

    Tidak ada

    Nilai parameter ini harus merupakan salah satu bidang yang ada dalam tabel.

    timeField

    Menentukan nama bidang. Nilai bidang ini akan menggantikan nilai bidang atribut __timestamp__ dan menunjukkan waktu penulisan log.

    String

    Tidak

    Waktu saat ini

    Nilai parameter ini harus merupakan salah satu bidang yang ada dalam tabel, dan tipe bidangnya harus INT. Jika parameter ini tidak ditentukan, waktu saat ini digunakan secara default.

    sourceField

    Menentukan nama bidang. Nilai bidang ini akan menggantikan nilai bidang atribut __source__ dan menunjukkan sumber log, seperti alamat IP mesin yang menghasilkan log.

    String

    Tidak

    Tidak ada

    Nilai parameter ini harus merupakan salah satu bidang yang ada dalam tabel.

    partitionField

    Menentukan nama bidang. Saat data ditulis, nilai hash dihitung berdasarkan nilai kolom ini. Data dengan nilai hash yang sama ditulis ke shard yang sama.

    String

    Tidak

    Tidak ada

    Jika parameter ini tidak ditentukan, setiap data ditulis secara acak ke shard yang tersedia saat itu.

    buckets

    Jumlah kelompok yang dikelompokkan ulang berdasarkan nilai hash saat partitionField ditentukan.

    String

    Tidak

    64

    Nilai parameter ini harus berada dalam rentang [1, 256] dan harus merupakan pangkat bulat dari 2. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak akan menerima data.

    flushIntervalMs

    Periode yang memicu penulisan data.

    String

    Tidak

    2000

    Satuan: milidetik.

    writeNullProperties

    Menentukan apakah akan menulis nilai null sebagai string kosong ke SLS.

    Boolean

    Tidak

    true

    • true (default): Nilai null ditulis sebagai string kosong ke log.

    • false: Bidang dengan nilai null tidak ditulis ke log.

    Catatan

    Parameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.6 atau yang lebih baru.

Pemetaan tipe

Tipe bidang Flink

Tipe bidang SLS

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

Pengambilan data

Batasan

Fitur ini hanya didukung di Realtime Compute for Apache Flink VVR 11.1 atau yang lebih baru.

Sintaks

source:
   type: sls
   name: Sumber SLS
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

Item konfigurasi

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

jenis

Jenis sumber data.

String

Ya

Tidak ada

Atur parameter ini ke sls.

Titik akhir

Titik akhir.

String

Ya

Tidak ada

Masukkan endpoint jaringan pribadi SLS. Untuk informasi selengkapnya, lihat Endpoints.

Catatan
  • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses jaringan publik. Namun, Alibaba Cloud menyediakan NAT Gateway untuk mengaktifkan komunikasi antara VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.

  • Kami tidak menyarankan Anda mengakses SLS melalui jaringan publik. Jika Anda harus mengakses SLS melalui jaringan publik, gunakan protokol HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Kelola akselerasi transfer.

accessId

ID AccessKey dari akun Alibaba Cloud Anda.

String

Ya

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?.

Penting

Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Kelola variabel.

accessKey

Rahasia AccessKey dari akun Alibaba Cloud Anda.

String

Ya

Tidak ada

Proyek

Nama dari proyek SLS.

String

Ya

Tidak ada

Tidak ada.

logStore

Nama Logstore atau metricstore SLS.

String

Ya

Tidak ada

Data dalam Logstore dikonsumsi dengan cara yang sama seperti dalam metricstore.

schema.inference.strategy

Strategi inferensi skema.

String

Tidak

continuous

  • continuous: Inferensi skema dilakukan untuk setiap data. Jika skema sebelum dan sesudah tidak kompatibel, skema yang lebih luas diinferensi dan event perubahan skema dihasilkan.

  • static: Inferensi skema hanya dilakukan sekali saat pekerjaan dimulai. Data selanjutnya diurai berdasarkan skema awal, dan tidak ada event perubahan skema yang dihasilkan.

maxPreFetchLogGroups

Jumlah maksimum kelompok log yang akan dibaca dan diurai untuk setiap shard selama inferensi skema awal.

Integer

Tidak

50

Sebelum pekerjaan benar-benar membaca dan memproses data, pekerjaan tersebut mencoba melakukan pre-consume sejumlah kelompok log yang ditentukan untuk setiap shard guna menginisialisasi informasi skema.

shardDiscoveryIntervalMs

Interval deteksi dinamis perubahan shard. Satuan: milidetik.

Long

Tidak

60000

Atur parameter ini ke nilai negatif untuk menonaktifkan deteksi dinamis.

Catatan

Nilai parameter ini tidak boleh kurang dari 1 menit (60.000 milidetik).

startupMode

Mode startup.

String

Tidak

Tidak ada

  • timestamp (default): Log dikonsumsi dari waktu mulai yang ditentukan.

  • latest: Log dikonsumsi dari offset terbaru.

  • earliest: Log dikonsumsi dari offset paling awal.

  • consumer_group: Log dikonsumsi dari offset yang dicatat dalam kelompok konsumen. Jika kelompok konsumen tidak mencatat offset konsumen suatu shard, log dikonsumsi dari offset paling awal.

startTime

Waktu mulai konsumsi log.

String

Tidak

Waktu saat ini

Formatnya adalah yyyy-MM-dd hh:mm:ss.

Parameter ini hanya berlaku ketika startupMode diatur ke timestamp.

Catatan

Parameter startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan atribut __timestamp__.

stopTime

Waktu berakhirnya konsumsi log.

String

Tidak

Tidak ada

Formatnya adalah yyyy-MM-dd hh:mm:ss.

Catatan

Jika Anda ingin program Flink keluar setelah konsumsi log selesai, Anda juga harus mengatur exitAfterFinish=true.

consumerGroup

Nama grup konsumen.

String

Tidak

Tidak ada

Kelompok konsumen digunakan untuk mencatat progres konsumsi. Anda dapat menentukan nama kustom untuk kelompok konsumen. Formatnya tidak tetap.

batchGetSize

Jumlah kelompok log yang dibaca dalam satu permintaan.

Integer

Tidak

100

Nilai batchGetSize tidak boleh melebihi 1.000. Jika tidak, akan muncul error.

maxRetries

Jumlah percobaan ulang setelah kegagalan membaca data dari SLS.

Integer

Tidak

3

Tidak ada.

exitAfterFinish

Menentukan apakah program Flink keluar setelah konsumsi data selesai.

Boolean

Tidak

false

  • true: Program Flink keluar setelah konsumsi data selesai.

  • false (default): Program Flink tidak keluar setelah konsumsi data selesai.

query

Pernyataan pra-pemrosesan untuk konsumsi data SLS.

String

Tidak

Tidak ada

Menggunakan parameter query memungkinkan Anda memfilter data sebelum dikonsumsi dari SLS. Hal ini mencegah semua data dikonsumsi ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan.

Misalnya, 'query' = '*| where request_method = ''GET''' menunjukkan bahwa sebelum Flink membaca data dari SLS, data tersebut dicocokkan dengan nilai bidang request_method yang bernilai 'get'.

Catatan

Parameter query memerlukan penggunaan Structured Process Language (SPL) untuk Simple Log Service. Untuk informasi selengkapnya, lihat Sintaksis SPL.

Penting
  • Untuk informasi tentang wilayah yang mendukung fitur ini, lihat Konsumsi log berdasarkan aturan.

  • Fitur ini sedang dalam pratinjau publik dan gratis. Anda mungkin akan dikenakan biaya untuk fitur ini di kemudian hari. Untuk informasi selengkapnya, lihat Penagihan.

compressType

Jenis kompresi untuk SLS.

String

Tidak

Tidak ada

Jenis kompresi yang didukung meliputi:

  • lz4

  • deflate

  • zstd

zonaWaktu

Zona waktu untuk startTime dan stopTime.

String

Tidak

Tidak ada

Secara default, tidak ada offset yang ditambahkan.

regionId

Wilayah tempat layanan SLS diaktifkan.

String

Tidak

Tidak ada

Lihat Wilayah yang didukung untuk konfigurasi.

signVersion

Versi signature permintaan SLS.

String

Tidak

Tidak ada

Untuk informasi selengkapnya tentang konfigurasi, lihat Request Signature.

shardModDivisor

Divisor untuk membaca partisi Logstore SLS.

Int

Tidak

-1

Lihat Shard untuk mengonfigurasi opsi ini.

shardModRemainder

Sisa kuota untuk membaca partisi dari penyimpanan log SLS.

Int

Tidak

-1

Lihat Shard untuk mengonfigurasi opsi ini.

metadata.list

Kolom metadata yang diteruskan ke downstream.

String

Tidak

Tidak ada

Bidang metadata yang tersedia meliputi __source__, __topic__, __timestamp__, dan __tag__. Pisahkan dengan koma.

Pemetaan tipe

Tabel berikut menunjukkan pemetaan tipe untuk data ingestion.

Tipe bidang SLS

Tipe bidang CDC

STRING

STRING

Inferensi skema tabel dan sinkronisasi perubahan

  • Pre-konsumsi data shard dan inisialisasi skema tabel

    Konektor SLS mempertahankan skema Logstore yang sedang dibaca. Sebelum membaca data dari Logstore, konektor SLS mencoba melakukan pre-consume hingga maxPreFetchLogGroups kelompok log dari setiap shard. Konektor kemudian mengurai skema setiap log dalam kelompok log tersebut dan menggabungkan skema-skema tersebut untuk menginisialisasi skema tabel. Sebelum konsumsi data aktual dimulai, event pembuatan tabel yang sesuai dihasilkan berdasarkan skema yang diinisialisasi.

    Catatan

    Untuk setiap shard, konektor SLS mencoba mengonsumsi data dan mengurai skema log mulai dari satu jam sebelum waktu saat ini.

  • Informasi primary key

    Log SLS tidak berisi informasi primary key. Anda dapat menambahkan primary key secara manual ke tabel menggunakan aturan transformasi:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Inferensi skema dan sinkronisasi perubahan

    Setelah skema tabel diinisialisasi, jika schema.inference.strategy diatur ke static, konektor SLS mengurai setiap log berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, konektor SLS mengurai data setiap log, menginferensi kolom fisik, dan membandingkannya dengan skema yang saat ini dicatat. Jika skema yang diinferensi tidak konsisten dengan skema saat ini, skema-skema tersebut digabung berdasarkan aturan berikut:

    • Jika kolom fisik yang diinferensi berisi bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema, dan event penambahan kolom nullable dihasilkan.

    • Jika kolom fisik yang diinferensi tidak berisi bidang yang sudah ada dalam skema saat ini, bidang tersebut dipertahankan, datanya diisi dengan NULL, dan tidak ada event penghapusan kolom yang dihasilkan.

    Konektor SLS menginferensi tipe semua bidang dalam setiap log sebagai String. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan di akhir skema saat ini dan diatur sebagai kolom nullable.

Contoh kode

  • Tabel source dan sink SQL

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • Sumber data untuk ingestasi data

    Anda dapat menggunakan SLS sebagai sumber data untuk pekerjaan data ingestion guna menulis data SLS secara real-time ke sistem downstream yang didukung. Misalnya, Anda dapat mengonfigurasi pekerjaan data ingestion sebagai berikut untuk menulis data dari Logstore ke data lake DLF dalam format paimon. Pekerjaan ini secara otomatis menginferensi tipe data bidang dan skema tabel downstream serta mendukung evolusi skema dinamis saat runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# Tambahkan informasi primary key ke tabel. 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# Tulis semua data dari test_log ke tabel test_database.inventory.
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

DataStream API

Penting

Saat Anda membaca atau menulis data menggunakan DataStream API, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya, lihat Gunakan konektor DataStream.

Jika Anda menggunakan versi VVR sebelum 8.0.10, paket JAR dependensi mungkin hilang saat Anda memulai pekerjaan. Untuk mengatasi masalah ini, Anda dapat menambahkan uber package yang sesuai ke dependensi tambahan.

Baca data dari SLS

Realtime Compute for Apache Flink menyediakan kelas SlsSourceFunction, yang merupakan implementasi dari SourceFunction, untuk membaca data dari SLS. Kode berikut memberikan contohnya.

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Mengatur lingkungan eksekusi streaming
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Membuat dan menambahkan sumber dan sink SLS.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // Ukuran batch get harus diberikan.
        accessInfo.setBatchGetSize(10);

        // Parameter opsional
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // Waktu mulai konsumsi, disetel ke waktu saat ini.
        int startInSec = (int) (new Date().getTime() / 1000);

        // Waktu berhenti konsumsi, -1 berarti tidak pernah berhenti.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Tulis data ke SLS

Kelas SLSOutputFormat, yang merupakan implementasi dari OutputFormat, disediakan untuk menulis data ke SLS. Kode berikut memberikan contohnya.

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Konektor DataStream SLS tersedia di repositori pusat Maven.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ

Apa yang harus saya lakukan jika TaskManager kehabisan memori dan melaporkan error "java.lang.OutOfMemoryError: Java heap space" untuk tabel source saat saya memulihkan program Flink yang gagal?