全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Simple Log Service (SLS)

更新时间:Nov 06, 2025

Topik ini menjelaskan cara menggunakan konektor Layanan Log Sederhana (SLS).

Informasi latar belakang

Layanan Log Sederhana adalah layanan pencatatan data ujung ke ujung yang dikembangkan oleh Alibaba Cloud. Layanan ini memungkinkan pengumpulan, konsumsi, pengiriman, penanyakan, dan analisis data log secara efisien, meningkatkan efisiensi operasional serta menyediakan kemampuan untuk memproses sejumlah besar data log.

Tabel berikut menjelaskan kemampuan yang didukung oleh konektor SLS.

Kategori

Deskripsi

Jenis yang Didukung

Tabel sumber dan tabel sink

Mode operasi

Mode streaming

Metrik

Tidak tersedia

Format data

Tidak tersedia

Jenis API

SQL, DataStream API, dan API YAML pengambilan data

Pembaruan atau penghapusan data dalam tabel sink

Data dalam tabel sink tidak dapat diperbarui atau dihapus. Data hanya dapat dimasukkan ke dalam tabel sink.

Fitur

Konektor sumber SLS dapat digunakan untuk membaca bidang atribut pesan. Tabel berikut menjelaskan bidang atribut yang didukung oleh konektor sumber SLS.

Bidang

Tipe

Deskripsi

__source__

STRING METADATA VIRTUAL

Sumber pesan.

__topic__

STRING METADATA VIRTUAL

Topik pesan.

__timestamp__

BIGINT METADATA VIRTUAL

Waktu ketika log dihasilkan.

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

Tag pesan.

Atribut "__tag__:__receive_time__":"1616742274", '__receive_time__', dan '1616742274' dicatat sebagai pasangan kunci-nilai dalam peta dan diakses dalam mode __tag__['__receive_time__'] dalam SQL.

Prasyarat

Proyek dan Logstore telah dibuat. Untuk informasi lebih lanjut, lihat Buat proyek dan logstore.

Batasan

  • Hanya Ververica Runtime (VVR) 11.1 atau versi lebih baru yang mendukung penggunaan SLS sebagai sumber pengambilan data.

  • Konektor SLS hanya mendukung semantik setidaknya sekali.

  • Untuk meningkatkan efisiensi sumber daya, atur paralelisme operator sumber ke nilai yang sama dengan atau kurang dari jumlah shard. Di VVR 8.0.5 atau lebih lama, jika paralelisme sumber melebihi jumlah shard dan jumlah shard berubah, pemulihan pekerjaan otomatis mungkin menjadi tidak valid, yang berpotensi menyebabkan shard yang tidak terkonsumsi.

SQL

Sintaks

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

Opsi konektor dalam klausa WITH

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    connector

    Konektor yang akan digunakan.

    String

    Ya

    Tidak ada nilai default

    Atur ke sls.

    endPoint

    Titik akhir SLS.

    String

    Ya

    Tidak ada nilai default

    Masukkan alamat akses VPC SLS. Untuk informasi lebih lanjut, lihat Titik akhir.

    Catatan
    • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Namun, Alibaba Cloud menyediakan NAT gateway untuk memungkinkan komunikasi antara VPC dan Internet. Untuk informasi lebih lanjut, lihat Bagaimana Realtime Compute for Apache Flink mengakses Internet?.

    • Kami sarankan agar Anda tidak mengakses SLS melalui Internet. Jika Anda perlu mengakses SLS melalui Internet, gunakan HTTPS dan aktifkan akselerasi transfer untuk SLS.

    project

    Nama proyek SLS.

    String

    Ya

    Tidak ada nilai default

    logStore

    Nama Logstore atau Metricstore SLS.

    STRING

    Ya

    Tidak ada nilai default

    Data dalam Logstore dikonsumsi dengan cara yang sama seperti dalam Metricstore.

    accessId

    ID AccessKey akun Alibaba Cloud Anda.

    STRING

    Ya

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Bagaimana cara melihat pasangan AccessKey suatu akun?

    Penting

    Untuk melindungi pasangan AccessKey Anda, konfigurasikan AccessKey Anda menggunakan variabel.

    accessKey

    Rahasia AccessKey akun Alibaba Cloud Anda.

    STRING

    Ya

    Tidak ada nilai default

  • Spesifik Sumber

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    enableNewSource

    Menentukan apakah akan menggunakan antarmuka sumber refactor FLIP-27.

    BOOLEAN

    Tidak

    false

    Aktifkan opsi ini dan sumber secara otomatis menyesuaikan dengan perubahan shard dan mendistribusikan shard di seluruh subtugas sumber seimbang mungkin.

    Catatan

    Hanya VVR 8.0.9 atau lebih baru yang mendukung opsi ini.

    Penting
    • Mulai dari VVR 11.1, opsi ini diatur ke true secara default.

    • Jika nilai opsi berubah, pekerjaan Anda tidak dapat dilanjutkan dari status tertentu. Untuk menyelesaikan ini, konfigurasikan opsi consumerGroup untuk mencatat offset konsumen saat ini dan mulai pekerjaan Anda. Kemudian, atur consumeFromCheckpoint ke true dan mulai pekerjaan tanpa status.

    • Saat subtugas sumber selesai membaca dari shard read-only, mereka terus meminta mengonsumsi shard lain. Hal ini dapat menyebabkan konsumsi shard yang tidak merata di antara subtugas sumber, memengaruhi kinerja keseluruhan pekerjaan. Untuk mengurangi masalah ini, pertimbangkan menyesuaikan paralelisme sumber, mengoptimalkan strategi penjadwalan Anda, atau menggabungkan shard kecil untuk menyederhanakan penugasan shard.

    shardDiscoveryIntervalMs

    Interval deteksi dinamis perubahan shard.

    LONG

    Tidak

    60000

    Unit: milidetik.

    Untuk menonaktifkan deteksi dinamis, atur opsi ke nilai negatif.

    Catatan
    • Nilai opsi ini tidak boleh kurang dari 1 menit (atau 60.000 milidetik).

    • Opsi ini hanya berlaku jika opsi enableNewSource diatur ke true.

    • Hanya VVR 8.0.9 atau lebih baru yang mendukung opsi ini.

    startupMode

    Mode startup tabel sumber.

    STRING

    Tidak

    timestamp

    • timestamp: Log dikonsumsi dari waktu mulai yang ditentukan.

    • latest: Log dikonsumsi dari offset terbaru.

    • earliest: Log dikonsumsi dari offset paling awal.

    • consumer_group: Log dikonsumsi dari offset yang direkam dalam kelompok konsumen. Jika kelompok konsumen tidak mencatat offset konsumsi shard, log dikonsumsi dari offset paling awal.

    Penting
    • Di versi VVR sebelum 11.1, consumer_group tidak lagi didukung. Untuk mengonsumsi data dari offset yang direkam dalam kelompok konsumen tertentu, atur consumeFromCheckpoint ke true.

    startTime

    Waktu mulai konsumsi log.

    STRING

    Tidak

    Waktu saat ini

    Nilai opsi ini dalam format yyyy-MM-dd hh:mm:ss.

    Opsi ini berlaku hanya jika startupMode diatur ke timestamp.

    Catatan

    Parameter startTime dan stopTime dikonfigurasi berdasarkan bidang __receive_time__ dalam tabel sumber SLS, bukan pada bidang __timestamp__.

    stopTime

    Waktu berhenti konsumsi log.

    String

    Tidak

    Tidak ada nilai default

    Nilai opsi ini dalam format yyyy-MM-dd hh:mm:ss.

    Catatan
    • Untuk hanya mengonsumsi log historis, atur opsi ini ke titik waktu historis tertentu. Menggunakan titik waktu di masa depan dapat menyebabkan konsumsi berhenti secara tak terduga jika injeksi log baru sementara terganggu. Gejala yang teramati adalah gangguan aliran data tanpa pesan kesalahan atau pengecualian yang menyertainya.

    • Jika Anda ingin program Realtime Compute for Apache Flink keluar setelah konsumsi log selesai, Anda juga harus mengonfigurasi opsi exitAfterFinish dan atur opsi exitAfterFinish ke true.

    consumerGroup

    Nama grup konsumen.

    STRING

    Tidak

    Tidak ada nilai default

    Grup konsumen mencatat kemajuan konsumsi. Anda dapat menentukan nama grup konsumen kustom. Format nama tidak tetap.

    Catatan

    Grup konsumen tidak dapat dibagikan oleh beberapa pekerjaan untuk konsumsi kolaboratif. Kami sarankan Anda menentukan grup konsumen berbeda untuk pekerjaan berbeda. Jika Anda menentukan grup konsumen yang sama untuk pekerjaan berbeda, semua data dikonsumsi. Ketika Realtime Compute for Apache Flink mengonsumsi data dari SLS, data tidak di-shard dalam grup konsumen. Oleh karena itu, jika beberapa pekerjaan berbagi grup konsumen yang sama, semua pesan dalam grup konsumen dikonsumsi oleh setiap pekerjaan.

    consumeFromCheckpoint

    Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan dalam grup konsumen yang ditentukan.

    STRING

    Tidak

    false

    • true: Jika Anda mengatur opsi ini ke true, Anda juga harus menentukan grup konsumen. Flink mengonsumsi log dari titik kontrol yang disimpan dalam grup konsumen. Jika tidak ada titik kontrol dalam grup konsumen, Flink mengonsumsi log dari waktu yang ditentukan oleh opsi startTime.

    • false : Flink tidak mengonsumsi log dari titik kontrol yang disimpan dalam grup konsumen tertentu.

    Penting

    Mulai dari VVR 11.1, opsi ini tidak lagi didukung. Anda perlu mengatur startupMode ke consumer_group.

    maxRetries

    Jumlah percobaan ulang yang diizinkan ketika gagal membaca data dari SLS.

    String

    Tidak

    3

    batchGetSize

    Jumlah grup log yang dibaca dalam satu permintaan.

    String

    Tidak

    100

    Untuk mencegah kesalahan, atur batchGetSize ke nilai kurang dari 1000.

    exitAfterFinish

    Menentukan apakah program Realtime Compute for Apache Flink keluar setelah konsumsi data selesai.

    String

    Tidak

    false

    • true

    • false

    query

    Penting

    Opsi ini sudah ditinggalkan di VVR 11.3, tetapi versi berikutnya tetap kompatibel.

    Pernyataan query yang digunakan untuk memproses data sebelum konsumsi data dimulai.

    STRING

    Tidak

    Tidak ada nilai default

    Mengonfigurasi opsi ini untuk menyaring data dari SLS sebelum konsumsi data dimulai, mengurangi biaya dan meningkatkan efisiensi pemrosesan data.

    Sebagai contoh, jika Anda menentukan 'query' = '*| where request_method = ''GET''', Realtime Compute for Apache Flink menyaring data di mana nilai bidang request_method sama dengan GET sebelum konsumsi data.

    Catatan

    Gunakan sintaks SPL saat mengonfigurasi opsi ini.

    Penting
    • Hanya VVR 8.0.1 atau lebih baru yang mendukung opsi ini.

    • Fitur ini membebankan biaya dari SLS. Untuk detailnya, lihat Tagihan.

    processor

    Prosesor SLS. Jika kedua opsi ini dan query diatur, maka query memiliki prioritas lebih tinggi.

    STRING

    Tidak

    Tidak ada nilai default

    Opsi ini secara fungsional setara dengan query, tetapi kami sarankan menggunakan opsi ini. Sebagai contoh, pengaturan 'processor' = 'test-filter-processor' menunjukkan bahwa data akan difilter oleh prosesor SLS sebelum dikonsumsi oleh Flink.

    Catatan

    Gunakan sintaksis SPL saat mengonfigurasi opsi ini.

    Penting
    • Hanya VVR 8.0.1 atau lebih baru yang mendukung opsi ini.

    • Fitur ini membebankan biaya dari SLS. Untuk detailnya, lihat Tagihan.

  • Spesifik Sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    topicField

    Menentukan nama bidang. Nilai opsi ini menimpa nilai bidang __topic__ untuk menunjukkan topik log.

    String

    Tidak

    Tidak ada nilai default

    Nilai opsi ini harus berupa bidang yang ada dalam tabel.

    timeField

    Menentukan nama bidang. Nilai opsi ini menimpa nilai bidang __timestamp__ untuk menunjukkan waktu penulisan log.

    String

    Tidak

    Waktu saat ini

    Opsi ini harus diatur ke bidang INT yang ada. Jika tidak ada bidang yang ditentukan, waktu saat ini digunakan.

    sourceField

    Menentukan nama bidang. Nilai opsi ini menimpa nilai bidang atribut __source__ untuk menunjukkan asal log. Sebagai contoh, nilainya adalah alamat IP mesin yang menghasilkan log.

    STRING

    Tidak

    Tidak ada nilai default

    Nilai opsi ini harus berupa bidang yang ada dalam tabel.

    partitionField

    Menentukan nama bidang. Nilai hash dihitung berdasarkan nilai parameter ini ketika data ditulis ke SLS. Data yang memiliki nilai hash yang sama ditulis ke shard yang sama.

    STRING

    Tidak

    Tidak ada nilai default

    Jika Anda tidak menentukan opsi ini, setiap entri data secara acak ditulis ke shard yang tersedia.

    buckets

    Jumlah bucket yang dikelompokkan ulang berdasarkan nilai hash ketika opsi partitionField ditentukan.

    String

    Tidak

    64

    Rentang nilai valid: [1,256]. Nilai opsi ini harus merupakan pangkat dua bilangan bulat. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, tidak ada data yang akan ditulis ke shard tertentu.

    flushIntervalMs

    Interval di mana penulisan data dipicu.

    STRING

    Tidak

    2000

    Unit: milidetik.

    writeNullProperties

    Menentukan apakah akan menulis nilai null sebagai string kosong ke SLS.

    BOOLEAN

    Tidak

    true

    • true

    • false

    Catatan

    Hanya VVR 8.0.6 atau lebih baru yang mendukung opsi ini.

Pemetaan tipe data

Tipe data Realtime Compute for Apache Flink

Tipe data SLS

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

Pengambilan data

Batasan

Hanya VVR 11.1 atau lebih baru yang mendukung pengambilan data dari SLS.

Sintaks

source:
   type: sls
   name: Sumber SLS
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

Opsi konfigurasi

Opsi

Deskripsi

Tipe data

Wajib?

Nilai default

Catatan

jenis

Jenis sumber data.

String

Ya

Tidak ada nilai default

Atur ke sls.

Titik akhir

Titik akhir.

String

Ya

Tidak ada nilai default

Masukkan alamat akses VPC untuk SLS. Untuk informasi lebih lanjut, lihat Endpoints.

Catatan
  • Secara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Namun, Alibaba Cloud menyediakan NAT gateway untuk memungkinkan komunikasi antara VPC dan Internet. Untuk informasi lebih lanjut, lihat Bagaimana cara Realtime Compute for Apache Flink mengakses Internet?

  • Hindari mengakses SLS melalui Internet. Jika Anda memang perlu menggunakan koneksi Internet, gunakan HTTPS dan aktifkan akselerasi transfer untuk SLS.

accessId

ID AccessKey dari akun Alibaba Cloud Anda.

String

Ya

Tidak ada nilai default

Lihat Bagaimana cara melihat pasangan AccessKey sebuah akun?

Penting

Untuk melindungi pasangan AccessKey Anda, gunakan variabel untuk mengonfigurasi ID AccessKey dan rahasia.

accessKey

Rahasia AccessKey dari akun Alibaba Cloud Anda.

String

Ya

Tidak ada nilai default

Proyek

Nama dari proyek SLS.

String

Ya

Tidak ada nilai default

logStore

Nama dari sebuah Logstore atau Metricstore.

String

Ya

Tidak ada nilai default

Data dalam sebuah Logstore dikonsumsi dengan cara yang sama seperti dalam sebuah Metricstore.

schema.inference.strategy

Strategi untuk inferensi skema.

String

Tidak

continuous

  • continuous: Inferensi skema dilakukan untuk setiap entri data. Jika skema tidak kompatibel, skema yang lebih luas akan diinferensi dan acara perubahan skema akan dihasilkan.

  • static: Inferensi skema hanya dilakukan sekali saat pekerjaan dimulai. Data selanjutnya dianalisis berdasarkan skema awal, dan tidak ada acara perubahan skema yang dihasilkan.

maxPreFetchLogGroups

Jumlah maksimum grup log yang dibaca dan dianalisis untuk setiap shard selama inferensi skema awal.

Integer

Tidak

50

Sebelum data dimuat dan diproses, konektor mencoba mengonsumsi sejumlah grup log tertentu dari setiap shard terlebih dahulu untuk menginisialisasi skema.

shardDiscoveryIntervalMs

Interval di mana perubahan pada shard dideteksi secara dinamis.

Long

Tidak

60000

Atur opsi ini ke nilai negatif untuk menonaktifkan deteksi dinamis. Satuan: milidetik.

Catatan

Nilai dari opsi ini tidak boleh kurang dari 1 menit (yaitu, 60.000 milidetik).

startupMode

Mode startup.

String

Tidak

timestamp

  • timestamp: Log dikonsumsi dari waktu mulai yang ditentukan.

  • latest: Log dikonsumsi dari offset terbaru.

  • earliest: Log dikonsumsi dari offset paling awal.

  • consumer_group: Log dikonsumsi dari offset yang direkam dalam kelompok konsumen. Jika kelompok konsumen tidak mencatat offset konsumsi shard, log dikonsumsi dari offset paling awal.

startTime

Waktu mulai konsumsi log.

String

Tidak

Waktu saat ini

Nilai dari opsi ini dalam format yyyy-MM-dd hh:mm:ss.

Hanya berlaku jika startupMode diatur ke timestamp.

Catatan

Opsi startTime dan stopTime dikonfigurasi berdasarkan field __receive_time__ di SLS, bukan field __timestamp__.

stopTime

Waktu di mana konsumsi log dihentikan.

String

Tidak

Tidak ada nilai default

Nilai dari opsi ini berada dalam format yyyy-MM-dd hh:mm:ss.

Catatan

Untuk membatalkan pekerjaan Flink setelah selesai konsumsi log, atur exitAfterFinish ke true.

consumerGroup

Nama grup konsumen.

String

Tidak

Tidak ada nilai default

Grup konsumen mencatat kemajuan konsumsi. Anda dapat menentukan nama grup konsumen kustom. Format nama tidak tetap.

batchGetSize

Jumlah grup log yang dibaca dalam satu permintaan.

Integer

Tidak

100

Untuk mencegah kesalahan, atur batchGetSize ke nilai kurang dari 1.000.

maxRetries

Jumlah percobaan ulang setelah pembacaan dari SLS gagal.

Integer

Tidak

3

exitAfterFinish

Menentukan apakah program Flink keluar setelah konsumsi data selesai.

Boolean

Tidak

false

  • true

  • false

query

Pernyataan query yang digunakan untuk memproses data sebelum Flink mengonsumsi data dari SLS.

String

Tidak

Tidak ada nilai default

Mengonfigurasi opsi ini dapat membantu Anda menyaring data sebelum dikonsumsi, mengurangi biaya, dan meningkatkan efisiensi pemrosesan data.

Sebagai contoh, 'query' = '*| where request_method = ''GET''' menunjukkan penyaringan data di mana request_method adalah GET.

Catatan

Gunakan sintaksis SPL untuk menulis query.

Penting
  • Untuk wilayah yang mendukung fitur ini, lihat Mengonsumsi log berdasarkan aturan.

  • Fitur ini sedang dalam pratinjau publik dan saat ini gratis. Anda mungkin akan dikenakan biaya di masa mendatang. Untuk informasi lebih lanjut, lihat Tagihan.

compressType

Tipe kompresi.

String

Tidak

Tidak ada nilai default

Nilai yang valid:

  • lz4

  • deflate

  • zstd

zonaWaktu

Zona waktu untuk startTime dan stopTime.

String

Tidak

Tidak ada nilai default

Secara default, tidak ada offset yang ditambahkan.

regionId

Wilayah tempat SLS berada.

String

Tidak

Tidak ada nilai default

Lihat Wilayah yang didukung.

signVersion

Versi tanda tangan permintaan SLS.

String

Tidak

Tidak ada nilai default

Lihat Tanda tangan permintaan.

shardModDivisor

Pembagi yang digunakan saat membaca dari shard Logstore SLS.

Integer

Tidak

-1

Lihat Shard untuk mengonfigurasi opsi ini.

shardModRemainder

Sisa yang digunakan saat membaca dari shard Logstore SLS.

Integer

Tidak

-1

Lihat Shard untuk mengonfigurasi opsi ini.

metadata.list

Kolom metadata yang dilewatkan ke downstream.

String

Tidak

Tidak ada nilai default

Bidang metadata yang tersedia termasuk __source__, __topic__, __timestamp__, dan __tag__. Anda dapat memisahkannya dengan koma.

Pemetaan tipe data

Pemetaan tipe data untuk pengambilan data adalah sebagai berikut:

Tipe data SLS

Tipe data Flink CDC

STRING

STRING

Inferensi dan evolusi skema

  • Konsumsi Data Awal dan Inisialisasi Skema

    Konektor SLS mempertahankan skema dari Logstore saat ini. Sebelum membaca data dari Logstore, konektor mencoba melakukan konsumsi awal hingga maxPreFetchLogGroups grup log dari setiap shard dan menginisialisasi skema dengan cara menguraikan dan menggabungkan skema dari setiap log. Selanjutnya, sebelum konsumsi data dimulai, sebuah event pembuatan tabel dihasilkan berdasarkan skema yang telah diinisialisasi.

    Catatan

    Untuk setiap shard, konektor mencoba mengonsumsi data satu jam sebelum waktu saat ini untuk menguraikan skema.

  • Kunci Utama

    Log SLS tidak mengandung kunci utama. Tambahkan secara manual kunci utama ke tabel di modul transform:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Inferensi dan Evolusi Skema

    Setelah inisialisasi skema, jika schema.inference.strategy disetel ke static, konektor menguraikan setiap entri log berdasarkan skema dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy disetel ke continuous, konektor menguraikan setiap entri log, melakukan inferensi kolom fisik, dan membandingkannya dengan skema saat ini. Jika skema yang diinferensikan tidak konsisten dengan skema saat ini, skema digabungkan sesuai aturan berikut:

    • Jika skema yang diinferensikan mengandung kolom fisik yang tidak ada dalam skema saat ini, kolom yang hilang ditambahkan ke skema saat ini, dan event penambahan kolom nullable dihasilkan.

    • Jika skema yang diinferensikan tidak mengandung kolom tertentu dalam skema saat ini, kolom tersebut dipertahankan dan nilainya disetel ke NULL.

    Konektor SLS menginferensikan semua bidang sebagai bidang string. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan ke skema saat ini sebagai kolom nullable.

Kode contoh

  • Tabel Sumber dan Tabel Sink:

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • Sumber Pengambilan Data:

    source:
       type: sls
       name: Sumber SLS
       endpoint: ${endpoint}
       project: ${project}
       logstore: ${logstore}
       accessId: ${accessId}
       accessKey: ${accessKey}
    
    sink:
      type: values
      name: Sink Nilai
      print.enabled: true
      sink.print.logger: true

DataStream API

Penting

Jika Anda ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Penggunaan konektor DataStream.

Jika Anda menggunakan versi VVR sebelum 8.0.10, Anda mungkin mengalami paket JAR dependensi yang hilang saat startup pekerjaan. Untuk menyelesaikannya, sertakan paket uber JAR yang sesuai sebagai dependensi tambahan.

Baca data dari SLS

VVR dari Realtime Compute for Apache Flink menyediakan kelas implementasi SlsSourceFunction dari SourceFunction untuk membaca data dari SLS. Contoh kode:

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Mengatur lingkungan eksekusi streaming
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Membuat dan menambahkan sumber dan sink SLS.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // Ukuran batch get harus diberikan.
        accessInfo.setBatchGetSize(10);

        // Parameter opsional
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // Waktu mulai konsumsi, disetel ke waktu saat ini.
        int startInSec = (int) (new Date().getTime() / 1000);

        // Waktu berhenti konsumsi, -1 berarti tidak pernah berhenti.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Tulis data ke SLS

VVR dari Realtime Compute for Apache Flink menyediakan kelas implementasi SLSOutputFormat dari OutputFormat untuk menulis data ke SLS. Contoh kode:

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

DataStream Connector Layanan Log Sederhana dari versi yang berbeda disimpan di repositori pusat Maven.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ

Apa yang harus saya lakukan jika terjadi kesalahan OOM di TaskManagers dan pesan kesalahan "java.lang.OutOfMemoryError: Java heap space" muncul untuk tabel sumber ketika saya memulihkan program Flink yang gagal?