Topik ini menjelaskan cara menggunakan konektor Layanan Log Sederhana (SLS).
Informasi latar belakang
Simple Log Service adalah layanan pencatatan data end-to-end yang membantu Anda mengumpulkan, mengonsumsi, mengirimkan, menanyakan, dan menganalisis data log secara efisien. Layanan ini meningkatkan efisiensi operasional dan O&M serta membantu membangun kapasitas untuk memproses volume besar data log.
Tabel berikut menjelaskan fitur-fitur yang didukung oleh konektor SLS.
Kategori | Detail |
Jenis yang didukung | Tabel Source dan Sink |
Mode operasi | Hanya mode streaming |
Metrik pemantauan spesifik | Tidak berlaku |
Format data | Tidak ada |
Jenis API | SQL, Datastream, dan YAML data ingestion |
Apakah Anda dapat memperbarui atau menghapus data di tabel sink? | Anda tidak dapat memperbarui atau menghapus data di tabel sink. Anda hanya dapat menyisipkan data. |
Fitur
Konektor SLS untuk tabel source mendukung pembacaan langsung bidang atribut pesan. Tabel berikut menjelaskan bidang atribut yang didukung.
Nama bidang | Tipe bidang | Deskripsi bidang |
__source__ | STRING METADATA VIRTUAL | Sumber pesan. |
__topic__ | STRING METADATA VIRTUAL | Topik pesan. |
__timestamp__ | BIGINT METADATA VIRTUAL | Waktu log. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | Tag pesan. Untuk atribut |
Prasyarat
Anda telah membuat proyek Simple Log Service dan Logstore. Untuk informasi selengkapnya, lihat Buat proyek dan Logstore.
Batasan
Hanya Realtime Compute for Apache Flink VVR 11.1 atau yang lebih baru yang mendukung penggunaan Simple Log Service (SLS) sebagai sumber data sinkronisasi untuk data ingestion dalam format YAML.
Konektor SLS hanya menjamin semantik at-least-once.
Kami sangat menyarankan agar Anda tidak mengatur konkurensi source ke nilai yang lebih besar dari jumlah shard. Praktik ini membuang sumber daya dan dapat menyebabkan fitur failover otomatis gagal pada VVR 8.0.5 atau versi sebelumnya jika jumlah shard berubah, sehingga beberapa shard tidak dikonsumsi.
SQL
Sintaks
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);DENGAN PARAMETER
Umum
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
connector
Jenis tabel.
String
Ya
Tidak ada
Atur parameter ini ke sls.
endPoint
Titik akhir.
String
Ya
Tidak ada
Masukkan endpoint jaringan pribadi SLS. Untuk informasi selengkapnya, lihat Endpoints.
CatatanSecara default, Realtime Compute for Apache Flink tidak dapat mengakses jaringan publik. Namun, Alibaba Cloud menyediakan NAT Gateway untuk mengaktifkan komunikasi antara VPC dan jaringan publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses jaringan publik?.
Kami tidak menyarankan Anda mengakses SLS melalui jaringan publik. Jika Anda harus mengakses SLS melalui jaringan publik, gunakan protokol HTTPS dan aktifkan Global Accelerator (GA) untuk SLS. Untuk informasi selengkapnya, lihat Kelola akselerasi transfer.
project
Nama proyek SLS.
String
Ya
Tidak ada
Tidak ada.
logStore
Nama Logstore atau metricstore SLS.
String
Ya
Tidak ada
Data dalam Logstore dikonsumsi dengan cara yang sama seperti dalam metricstore.
accessId
ID AccessKey akun Alibaba Cloud Anda.
String
Ya
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?.
PentingUntuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Kelola variabel.
accessKey
Rahasia AccessKey akun Alibaba Cloud Anda.
String
Ya
Tidak ada
Khusus tabel source
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
enableNewSource
Menentukan apakah akan mengaktifkan sumber data baru yang mengimplementasikan antarmuka FLIP-27.
Boolean
Tidak
false
Sumber data baru dapat secara otomatis beradaptasi terhadap perubahan shard dan memastikan bahwa shard didistribusikan secara merata di seluruh semua konkurensi source.
PentingParameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.9 atau yang lebih baru. Parameter ini diatur ke true secara default mulai dari VVR 11.1.
Pekerjaan tidak dapat dipulihkan dari state setelah item konfigurasi ini diubah. Untuk mengatasi masalah ini, Anda dapat terlebih dahulu mengatur item konfigurasi consumerGroup untuk memulai pekerjaan dan mencatat progres konsumsi dalam kelompok konsumen SLS. Kemudian, atur item konfigurasi consumeFromCheckpoint ke true dan mulai pekerjaan secara tanpa status. Dengan cara ini, pekerjaan dapat melanjutkan konsumsi dari progres historis.
Jika terdapat shard read-only di SLS, beberapa task Flink konkuren tetap meminta data dari shard lain yang belum selesai setelah mereka selesai mengonsumsi data dari shard read-only tersebut. Hal ini dapat menyebabkan beberapa task konkuren ditugaskan ke beberapa shard, sehingga menyebabkan ketidakseimbangan distribusi shard di antara mereka. Ketidakseimbangan ini memengaruhi efisiensi konsumsi keseluruhan dan kinerja sistem. Untuk mengurangi masalah ini, sesuaikan konkurensi, optimalkan kebijakan penjadwalan task, atau gabungkan shard kecil untuk mengurangi jumlah shard dan kompleksitas penugasan task.
shardDiscoveryIntervalMs
Interval deteksi dinamis perubahan shard. Satuan: milidetik.
Long
Tidak
60000
Atur parameter ini ke nilai negatif untuk menonaktifkan deteksi dinamis.
CatatanNilai parameter ini tidak boleh kurang dari 1 menit (60.000 milidetik).
Parameter ini hanya berlaku ketika parameter enableNewSource diatur ke true.
Parameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.9 atau yang lebih baru.
startupMode
Mode startup tabel sumber.
String
Tidak
timestamp
timestamp(default): Log dikonsumsi dari waktu mulai yang ditentukan.latest: Log dikonsumsi dari offset terbaru.earliest: Log dikonsumsi dari offset paling awal.consumer_group: Log dikonsumsi dari offset yang dicatat dalam kelompok konsumen. Jika kelompok konsumen tidak mencatat offset konsumen suatu shard, log dikonsumsi dari offset paling awal.
PentingPada versi VVR sebelum 11.1, nilai consumer_group tidak didukung. Anda harus mengatur
consumeFromCheckpointketrue. Dalam kasus ini, log dikonsumsi dari offset yang dicatat dalam kelompok konsumen yang ditentukan, dan mode startup yang ditentukan di sini tidak berlaku.
startTime
Waktu mulai konsumsi log.
String
Tidak
Waktu saat ini
Formatnya adalah
yyyy-MM-dd hh:mm:ss.Parameter ini hanya berlaku ketika
startupModediatur ketimestamp.CatatanParameter startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan atribut __timestamp__.
stopTime
Waktu berakhirnya konsumsi log.
String
Tidak
Tidak ada
Formatnya adalah
yyyy-MM-dd hh:mm:ss.CatatanParameter ini hanya digunakan untuk mengonsumsi log historis dan harus diatur ke titik waktu di masa lalu. Jika Anda mengaturnya ke waktu di masa depan, konsumsi mungkin berhenti lebih awal jika tidak ada log baru yang ditulis. Hal ini tampak sebagai gangguan aliran data tanpa pesan error.
Jika Anda ingin program Flink keluar setelah konsumsi log selesai, Anda juga harus mengatur exitAfterFinish=true.
consumerGroup
Nama grup konsumen.
String
Tidak
Tidak ada
Kelompok konsumen digunakan untuk mencatat progres konsumsi. Anda dapat menentukan nama kustom untuk kelompok konsumen. Formatnya tidak tetap.
CatatanBeberapa pekerjaan tidak dapat menggunakan kelompok konsumen yang sama untuk konsumsi kolaboratif. Pekerjaan Flink yang berbeda harus memiliki kelompok konsumen yang berbeda. Jika pekerjaan Flink yang berbeda menggunakan kelompok konsumen yang sama, mereka akan mengonsumsi semua data. Hal ini karena ketika Flink mengonsumsi data dari SLS, alokasi partisi tidak dilakukan melalui kelompok konsumen SLS. Akibatnya, setiap konsumen mengonsumsi pesannya sendiri secara independen, meskipun kelompok konsumennya sama.
consumeFromCheckpoint
Menentukan apakah akan mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.
String
Tidak
false
true: Anda juga harus menentukan kelompok konsumen. Program Flink mengonsumsi log dari checkpoint yang disimpan dalam kelompok konsumen. Jika kelompok konsumen tidak memiliki checkpoint yang sesuai, log dikonsumsi dari waktu yang ditentukan oleh parameter startTime.false(default): Log tidak dikonsumsi dari checkpoint yang disimpan dalam kelompok konsumen yang ditentukan.
PentingParameter ini tidak lagi didukung mulai dari VVR 11.1. Untuk VVR 11.1 dan yang lebih baru, Anda harus mengatur
startupModekeconsumer_group.maxRetries
Jumlah percobaan ulang setelah kegagalan membaca data dari SLS.
String
Tidak
3
Tidak ada.
batchGetSize
Jumlah kelompok log yang dibaca dalam satu permintaan.
String
Tidak
100
Nilai
batchGetSizetidak boleh melebihi 1.000. Jika tidak, akan muncul error.exitAfterFinish
Menentukan apakah program Flink keluar setelah konsumsi data selesai.
String
Tidak
false
true: Program Flink keluar setelah konsumsi data selesai.false(default): Program Flink tidak keluar setelah konsumsi data selesai.
query
PentingParameter ini sudah tidak digunakan lagi di VVR 11.3 dan yang lebih baru, tetapi tetap kompatibel dengan versi selanjutnya.
Pernyataan pra-pemrosesan untuk konsumsi data SLS.
String
Tidak
Tidak ada
Menggunakan parameter query memungkinkan Anda memfilter data sebelum dikonsumsi dari SLS. Hal ini mencegah semua data dikonsumsi ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan.
Misalnya,
'query' = '*| where request_method = ''GET'''menunjukkan bahwa sebelum Flink membaca data dari SLS, data tersebut dicocokkan dengan nilai bidang request_method yang bernilai 'get'.CatatanParameter query memerlukan penggunaan Structured Process Language (SPL) untuk Simple Log Service. Untuk informasi selengkapnya, lihat Sintaksis SPL.
PentingParameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.1 atau yang lebih baru.
Fitur ini dikenakan biaya oleh Simple Log Service. Untuk informasi selengkapnya, lihat Penagihan.
processor
Prosesor Consum SLS. Jika parameter ini dan `query` dikonfigurasi bersamaan, `query` akan diutamakan dan `processor` tidak berlaku.
String
Tidak
Tidak ada
Menggunakan parameter processor memungkinkan Anda memfilter data sebelum dikonsumsi oleh Flink. Hal ini membantu menghemat biaya dan meningkatkan kecepatan pemrosesan. Kami menyarankan agar Anda menggunakan parameter processor daripada parameter query.
Misalnya,
'processor' = 'test-filter-processor'menunjukkan bahwa data difilter oleh Prosesor Consum SLS sebelum dibaca oleh Flink.CatatanProsesor harus menggunakan Structured Process Language (SPL) dari Simple Log Service (SLS). Untuk informasi selengkapnya, lihat Sintaksis SPL. Untuk membuat dan memperbarui Prosesor Consum SLS, lihat Kelola Prosesor Consum.
PentingParameter ini hanya didukung di Realtime Compute for Apache Flink VVR 11.3 atau yang lebih baru.
Fitur ini dikenakan biaya oleh Simple Log Service. Untuk informasi selengkapnya, lihat Penagihan.
Hanya untuk tabel sink
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Catatan
topicField
Menentukan nama bidang. Nilai bidang ini akan menggantikan nilai bidang atribut __topic__ dan menunjukkan topik log.
String
Tidak
Tidak ada
Nilai parameter ini harus merupakan salah satu bidang yang ada dalam tabel.
timeField
Menentukan nama bidang. Nilai bidang ini akan menggantikan nilai bidang atribut __timestamp__ dan menunjukkan waktu penulisan log.
String
Tidak
Waktu saat ini
Nilai parameter ini harus merupakan salah satu bidang yang ada dalam tabel, dan tipe bidangnya harus INT. Jika parameter ini tidak ditentukan, waktu saat ini digunakan secara default.
sourceField
Menentukan nama bidang. Nilai bidang ini akan menggantikan nilai bidang atribut __source__ dan menunjukkan sumber log, seperti alamat IP mesin yang menghasilkan log.
String
Tidak
Tidak ada
Nilai parameter ini harus merupakan salah satu bidang yang ada dalam tabel.
partitionField
Menentukan nama bidang. Saat data ditulis, nilai hash dihitung berdasarkan nilai kolom ini. Data dengan nilai hash yang sama ditulis ke shard yang sama.
String
Tidak
Tidak ada
Jika parameter ini tidak ditentukan, setiap data ditulis secara acak ke shard yang tersedia saat itu.
buckets
Jumlah kelompok yang dikelompokkan ulang berdasarkan nilai hash saat partitionField ditentukan.
String
Tidak
64
Nilai parameter ini harus berada dalam rentang [1, 256] dan harus merupakan pangkat bulat dari 2. Jumlah bucket harus lebih besar dari atau sama dengan jumlah shard. Jika tidak, beberapa shard tidak akan menerima data.
flushIntervalMs
Periode yang memicu penulisan data.
String
Tidak
2000
Satuan: milidetik.
writeNullProperties
Menentukan apakah akan menulis nilai null sebagai string kosong ke SLS.
Boolean
Tidak
true
true(default): Nilai null ditulis sebagai string kosong ke log.false: Bidang dengan nilai null tidak ditulis ke log.
CatatanParameter ini hanya didukung di Realtime Compute for Apache Flink VVR 8.0.6 atau yang lebih baru.
Pemetaan tipe
Tipe bidang Flink | Tipe bidang SLS |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
Pengambilan data
Batasan
Fitur ini hanya didukung di Realtime Compute for Apache Flink VVR 11.1 atau yang lebih baru.
Sintaks
source:
type: sls
name: Sumber SLS
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>Item konfigurasi
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
jenis | Jenis sumber data. | String | Ya | Tidak ada | Atur parameter ini ke sls. |
Titik akhir | Titik akhir. | String | Ya | Tidak ada | Masukkan endpoint jaringan pribadi SLS. Untuk informasi selengkapnya, lihat Endpoints. Catatan
|
accessId | ID AccessKey dari akun Alibaba Cloud Anda. | String | Ya | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?. Penting Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Kelola variabel. |
accessKey | Rahasia AccessKey dari akun Alibaba Cloud Anda. | String | Ya | Tidak ada | |
Proyek | Nama dari proyek SLS. | String | Ya | Tidak ada | Tidak ada. |
logStore | Nama Logstore atau metricstore SLS. | String | Ya | Tidak ada | Data dalam Logstore dikonsumsi dengan cara yang sama seperti dalam metricstore. |
schema.inference.strategy | Strategi inferensi skema. | String | Tidak | continuous |
|
maxPreFetchLogGroups | Jumlah maksimum kelompok log yang akan dibaca dan diurai untuk setiap shard selama inferensi skema awal. | Integer | Tidak | 50 | Sebelum pekerjaan benar-benar membaca dan memproses data, pekerjaan tersebut mencoba melakukan pre-consume sejumlah kelompok log yang ditentukan untuk setiap shard guna menginisialisasi informasi skema. |
shardDiscoveryIntervalMs | Interval deteksi dinamis perubahan shard. Satuan: milidetik. | Long | Tidak | 60000 | Atur parameter ini ke nilai negatif untuk menonaktifkan deteksi dinamis. Catatan Nilai parameter ini tidak boleh kurang dari 1 menit (60.000 milidetik). |
startupMode | Mode startup. | String | Tidak | Tidak ada |
|
startTime | Waktu mulai konsumsi log. | String | Tidak | Waktu saat ini | Formatnya adalah yyyy-MM-dd hh:mm:ss. Parameter ini hanya berlaku ketika startupMode diatur ke timestamp. Catatan Parameter startTime dan stopTime didasarkan pada atribut __receive_time__ di SLS, bukan atribut __timestamp__. |
stopTime | Waktu berakhirnya konsumsi log. | String | Tidak | Tidak ada | Formatnya adalah yyyy-MM-dd hh:mm:ss. Catatan Jika Anda ingin program Flink keluar setelah konsumsi log selesai, Anda juga harus mengatur exitAfterFinish=true. |
consumerGroup | Nama grup konsumen. | String | Tidak | Tidak ada | Kelompok konsumen digunakan untuk mencatat progres konsumsi. Anda dapat menentukan nama kustom untuk kelompok konsumen. Formatnya tidak tetap. |
batchGetSize | Jumlah kelompok log yang dibaca dalam satu permintaan. | Integer | Tidak | 100 | Nilai batchGetSize tidak boleh melebihi 1.000. Jika tidak, akan muncul error. |
maxRetries | Jumlah percobaan ulang setelah kegagalan membaca data dari SLS. | Integer | Tidak | 3 | Tidak ada. |
exitAfterFinish | Menentukan apakah program Flink keluar setelah konsumsi data selesai. | Boolean | Tidak | false |
|
query | Pernyataan pra-pemrosesan untuk konsumsi data SLS. | String | Tidak | Tidak ada | Menggunakan parameter query memungkinkan Anda memfilter data sebelum dikonsumsi dari SLS. Hal ini mencegah semua data dikonsumsi ke Flink, sehingga menghemat biaya dan meningkatkan kecepatan pemrosesan. Misalnya, Catatan Parameter query memerlukan penggunaan Structured Process Language (SPL) untuk Simple Log Service. Untuk informasi selengkapnya, lihat Sintaksis SPL. Penting
|
compressType | Jenis kompresi untuk SLS. | String | Tidak | Tidak ada | Jenis kompresi yang didukung meliputi:
|
zonaWaktu | Zona waktu untuk startTime dan stopTime. | String | Tidak | Tidak ada | Secara default, tidak ada offset yang ditambahkan. |
regionId | Wilayah tempat layanan SLS diaktifkan. | String | Tidak | Tidak ada | Lihat Wilayah yang didukung untuk konfigurasi. |
signVersion | Versi signature permintaan SLS. | String | Tidak | Tidak ada | Untuk informasi selengkapnya tentang konfigurasi, lihat Request Signature. |
shardModDivisor | Divisor untuk membaca partisi Logstore SLS. | Int | Tidak | -1 | Lihat Shard untuk mengonfigurasi opsi ini. |
shardModRemainder | Sisa kuota untuk membaca partisi dari penyimpanan log SLS. | Int | Tidak | -1 | Lihat Shard untuk mengonfigurasi opsi ini. |
metadata.list | Kolom metadata yang diteruskan ke downstream. | String | Tidak | Tidak ada | Bidang metadata yang tersedia meliputi |
Pemetaan tipe
Tabel berikut menunjukkan pemetaan tipe untuk data ingestion.
Tipe bidang SLS | Tipe bidang CDC |
STRING | STRING |
Inferensi skema tabel dan sinkronisasi perubahan
Pre-konsumsi data shard dan inisialisasi skema tabel
Konektor SLS mempertahankan skema Logstore yang sedang dibaca. Sebelum membaca data dari Logstore, konektor SLS mencoba melakukan pre-consume hingga maxPreFetchLogGroups kelompok log dari setiap shard. Konektor kemudian mengurai skema setiap log dalam kelompok log tersebut dan menggabungkan skema-skema tersebut untuk menginisialisasi skema tabel. Sebelum konsumsi data aktual dimulai, event pembuatan tabel yang sesuai dihasilkan berdasarkan skema yang diinisialisasi.
CatatanUntuk setiap shard, konektor SLS mencoba mengonsumsi data dan mengurai skema log mulai dari satu jam sebelum waktu saat ini.
Informasi primary key
Log SLS tidak berisi informasi primary key. Anda dapat menambahkan primary key secara manual ke tabel menggunakan aturan transformasi:
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2Inferensi skema dan sinkronisasi perubahan
Setelah skema tabel diinisialisasi, jika schema.inference.strategy diatur ke static, konektor SLS mengurai setiap log berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, konektor SLS mengurai data setiap log, menginferensi kolom fisik, dan membandingkannya dengan skema yang saat ini dicatat. Jika skema yang diinferensi tidak konsisten dengan skema saat ini, skema-skema tersebut digabung berdasarkan aturan berikut:
Jika kolom fisik yang diinferensi berisi bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema, dan event penambahan kolom nullable dihasilkan.
Jika kolom fisik yang diinferensi tidak berisi bidang yang sudah ada dalam skema saat ini, bidang tersebut dipertahankan, datanya diisi dengan NULL, dan tidak ada event penghapusan kolom yang dihasilkan.
Konektor SLS menginferensi tipe semua bidang dalam setiap log sebagai String. Saat ini, hanya penambahan kolom yang didukung. Kolom baru ditambahkan di akhir skema saat ini dan diatur sebagai kolom nullable.
Contoh kode
Tabel source dan sink SQL
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;Sumber data untuk ingestasi data
Anda dapat menggunakan SLS sebagai sumber data untuk pekerjaan data ingestion guna menulis data SLS secara real-time ke sistem downstream yang didukung. Misalnya, Anda dapat mengonfigurasi pekerjaan data ingestion sebagai berikut untuk menulis data dari Logstore ke data lake DLF dalam format paimon. Pekerjaan ini secara otomatis menginferensi tipe data bidang dan skema tabel downstream serta mendukung evolusi skema dinamis saat runtime.
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# Tambahkan informasi primary key ke tabel.
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# Tulis semua data dari test_log ke tabel test_database.inventory.
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueDataStream API
Saat Anda membaca atau menulis data menggunakan DataStream API, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya, lihat Gunakan konektor DataStream.
Jika Anda menggunakan versi VVR sebelum 8.0.10, paket JAR dependensi mungkin hilang saat Anda memulai pekerjaan. Untuk mengatasi masalah ini, Anda dapat menambahkan uber package yang sesuai ke dependensi tambahan.
Baca data dari SLS
Realtime Compute for Apache Flink menyediakan kelas SlsSourceFunction, yang merupakan implementasi dari SourceFunction, untuk membaca data dari SLS. Kode berikut memberikan contohnya.
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Mengatur lingkungan eksekusi streaming
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Membuat dan menambahkan sumber dan sink SLS.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// Ukuran batch get harus diberikan.
accessInfo.setBatchGetSize(10);
// Parameter opsional
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// Waktu mulai konsumsi, disetel ke waktu saat ini.
int startInSec = (int) (new Date().getTime() / 1000);
// Waktu berhenti konsumsi, -1 berarti tidak pernah berhenti.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}Tulis data ke SLS
Kelas SLSOutputFormat, yang merupakan implementasi dari OutputFormat, disediakan untuk menulis data ke SLS. Kode berikut memberikan contohnya.
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
Konektor DataStream SLS tersedia di repositori pusat Maven.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>