Simple Log Service (SLS) menyediakan plugin aliyun-log-flume untuk berintegrasi dengan Apache Flume. Anda dapat menggunakan plugin ini untuk menulis data log ke SLS dari sumber data lain atau mengonsumsi data log dari SLS dan mengirimkannya ke sistem downstream seperti Hadoop Distributed File System (HDFS) dan Kafka.
Cara kerja
Apache Flume menggunakan model aliran data Source-Channel-Sink. Plugin aliyun-log-flume menyediakan Sink kustom dan Source kustom yang menghubungkan SLS ke pipeline Flume:
Sink: Menerima data dari Channel Flume dan menuliskannya ke Logstore SLS. Gunakan komponen ini ketika Anda ingin mengingest data ke SLS dari sistem lain melalui Flume.
Source: Mengonsumsi data log dari Logstore SLS dan mengirimkannya ke Channel Flume. Gunakan komponen ini ketika Anda ingin mengirim data log SLS ke sistem lain melalui Flume.
Channel bertindak sebagai buffer antara Source dan Sink. Flume menyediakan tipe channel bawaan seperti Memory Channel dan File Channel. Untuk informasi lebih lanjut, lihat Apache Flume User Guide.
Untuk kode sumber plugin dan catatan rilis, lihat aliyun-log-flume di GitHub.
Prasyarat
Sebelum memulai, pastikan persyaratan berikut terpenuhi:
Java: JDK 1.8 atau versi yang lebih baru telah diinstal.
Apache Flume: Flume 1.8.0 atau versi yang lebih baru telah diinstal. Untuk mengunduh Flume, lihat halaman unduhan Apache Flume.
Sumber daya SLS: Proyek SLS dan Logstore telah dibuat. Untuk informasi lebih lanjut, lihat dokumentasi SLS.
Pasangan AccessKey: ID AccessKey dan Rahasia AccessKey telah diperoleh. Untuk keamanan, kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey.
Instal plugin
Unduh dan instal Flume. Untuk informasi lebih lanjut, lihat halaman unduhan Apache Flume.
Unduh file JAR plugin aliyun-log-flume dan simpan ke direktori
<FLUME_HOME>/lib. Tautan unduhan: aliyun-log-flume-1.9.jar.Buat file konfigurasi bernama
flumejob.confdi direktori<FLUME_HOME>/conf. Untuk konfigurasi Sink, lihat Konfigurasi Sink. Untuk konfigurasi Source, lihat Konfigurasi Source.Jalankan Flume.
bin/flume-ng agent -n agent -c conf -f conf/flumejob.conf
Konfigurasi Sink
Gunakan SLS Sink untuk menulis data dari sistem lain ke Logstore SLS melalui Flume. Sink mendukung tiga mode serialisasi untuk mengonversi event Flume menjadi entri log SLS:
| Mode | Perilaku |
|---|---|
| SIMPLE | Setiap isi event Flume ditulis ke SLS sebagai satu field. |
| DELIMITED | Isi setiap event Flume dipisah menjadi beberapa field berdasarkan delimiter dan dipetakan ke nama kolom yang dikonfigurasi. |
| JSON | Isi setiap event Flume diurai sebagai JSON. |
Parameter Sink
Parameter koneksi
| Parameter | Wajib | Deskripsi |
|---|---|---|
| type | Ya | Jenis Sink. Atur nilai ini ke com.aliyun.loghub.flume.sink.LoghubSink. |
| endpoint | Ya | Titik akhir proyek SLS. Contoh: http://cn-qingdao.log.aliyuncs.com. Pilih titik akhir berdasarkan wilayah proyek Anda. Untuk informasi lebih lanjut, lihat Endpoints. |
| project | Ya | Nama proyek SLS. |
| logstore | Ya | Nama Logstore. |
| accessKeyId | Ya | ID AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey. |
| accessKey | Ya | Rahasia AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey. |
Parameter batching
| Parameter | Wajib | Deskripsi |
|---|---|---|
| batchSize | Tidak | Jumlah entri log yang ditulis ke SLS dalam satu batch. Nilai default: 1000. |
| maxBufferSize | Tidak | Jumlah maksimum entri log yang diizinkan dalam antrian buffer internal. Nilai default: 1000. |
Parameter serialisasi
| Parameter | Wajib | Deskripsi |
|---|---|---|
| serializer | Tidak | Mode serialisasi untuk mengonversi event Flume menjadi entri log SLS. Nilai yang valid: SIMPLE (default), DELIMITED, JSON, atau nama kelas serializer kustom yang lengkap. |
| columns | Tidak | Daftar nama kolom yang dipisahkan koma. Wajib saat serializer diatur ke DELIMITED. Kolom dipetakan ke field sesuai urutan kemunculannya dalam setiap record. |
| separatorChar | Tidak | Karakter delimiter yang digunakan untuk memisahkan field. Harus berupa satu karakter. Wajib saat serializer diatur ke DELIMITED. Nilai default: , (koma). |
| quoteChar | Tidak | Karakter quote yang digunakan untuk membungkus field. Wajib saat serializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda). |
| escapeChar | Tidak | Karakter escape. Wajib saat serializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda). |
Parameter stempel waktu
| Parameter | Wajib | Deskripsi |
|---|---|---|
| useRecordTime | Tidak | Menentukan apakah akan menggunakan field stempel waktu dalam entri data sebagai waktu log saat menulis ke SLS. Nilai default: false. Saat diatur ke false, waktu sistem saat ini digunakan sebagai waktu log. |
Contoh konfigurasi Sink
Contoh berikut membaca data dari Avro Source dan menuliskannya ke Logstore SLS menggunakan serializer DELIMITED:
# Beri nama komponen
agent.sources = avroSrc
agent.channels = memCh
agent.sinks = slsSink
# Konfigurasi Avro Source
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memCh
# Konfigurasi Memory Channel
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100
# Konfigurasi SLS Sink
agent.sinks.slsSink.type = com.aliyun.loghub.flume.sink.LoghubSink
agent.sinks.slsSink.channel = memCh
agent.sinks.slsSink.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sinks.slsSink.project = your-project
agent.sinks.slsSink.logstore = your-logstore
agent.sinks.slsSink.accessKeyId = your-access-key-id
agent.sinks.slsSink.accessKey = your-access-key-secret
agent.sinks.slsSink.batchSize = 1000
agent.sinks.slsSink.serializer = DELIMITED
agent.sinks.slsSink.columns = col1,col2,col3
agent.sinks.slsSink.separatorChar = ,Untuk contoh konfigurasi lainnya, lihat contoh Sink di GitHub.
Konfigurasi Source
Gunakan SLS Source untuk mengonsumsi data log dari Logstore SLS dan mengirimkannya ke sistem downstream melalui Flume. Source mendukung dua mode deserialisasi untuk mengonversi entri log SLS menjadi event Flume:
| Mode | Perilaku |
|---|---|
| DELIMITED | Field log digabung dengan delimiter dan ditulis sebagai isi event Flume. |
| JSON | Entri log diserialisasi sebagai JSON dan ditulis sebagai isi event Flume. |
Parameter Source
Parameter koneksi
| Parameter | Wajib | Deskripsi |
|---|---|---|
| type | Ya | Jenis Source. Atur nilai ini ke com.aliyun.loghub.flume.source.LoghubSource. |
| endpoint | Ya | Titik akhir proyek SLS. Contoh: http://cn-qingdao.log.aliyuncs.com. Pilih titik akhir berdasarkan wilayah proyek Anda. Untuk informasi lebih lanjut, lihat Endpoints. |
| project | Ya | Nama proyek SLS. |
| logstore | Ya | Nama Logstore. |
| accessKeyId | Ya | ID AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey. |
| accessKey | Ya | Rahasia AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey. |
Parameter kelompok konsumen
| Parameter | Wajib | Deskripsi |
|---|---|---|
| consumerGroup | Tidak | Nama kelompok konsumen yang digunakan untuk mengoordinasikan konsumsi di antara beberapa konsumen. Jika parameter ini tidak ditentukan, nama kelompok konsumen akan dihasilkan secara acak. |
| heartbeatIntervalMs | Tidak | Interval, dalam milidetik, di mana client konsumen mengirim pesan heartbeat ke SLS. Nilai default: 30000. |
| fetchIntervalMs | Tidak | Interval, dalam milidetik, antara permintaan pengambilan data berturut-turut ke SLS. Nilai default: 100. |
| fetchInOrder | Tidak | Menentukan apakah akan mengonsumsi data log sesuai urutan penulisannya ke SLS. Nilai default: false. |
| batchSize | Tidak | Jumlah entri log yang diambil per permintaan. Nilai default: 100. |
| initialPosition | Tidak | Posisi awal untuk konsumsi data. Nilai yang valid: begin (default), end, dan timestamp. Catatan: Jika checkpoint tersedia di SLS untuk kelompok konsumen yang ditentukan, checkpoint tersebut akan menggantikan pengaturan ini. |
| timestamp | Tidak | Stempel waktu UNIX yang menentukan titik waktu mulai konsumsi data. Wajib saat initialPosition diatur ke timestamp. |
Parameter deserialisasi
| Parameter | Wajib | Deskripsi |
|---|---|---|
| deserializer | Ya | Mode deserialisasi untuk mengonversi entri log SLS menjadi event Flume. Nilai yang valid: DELIMITED (default), JSON, atau nama kelas deserializer kustom yang lengkap. |
| columns | Tidak | Daftar nama kolom yang dipisahkan koma. Wajib saat deserializer diatur ke DELIMITED. Kolom dipetakan ke field sesuai urutan kemunculannya dalam setiap record. |
| separatorChar | Tidak | Karakter delimiter yang digunakan untuk menggabungkan field. Harus berupa satu karakter. Wajib saat deserializer diatur ke DELIMITED. Nilai default: , (koma). |
| quoteChar | Tidak | Karakter quote yang digunakan untuk membungkus field. Wajib saat deserializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda). |
| escapeChar | Tidak | Karakter escape. Wajib saat deserializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda). |
| appendTimestamp | Tidak | Menentukan apakah akan menambahkan stempel waktu log sebagai field tambahan. Berlaku saat deserializer diatur ke DELIMITED. Nilai default: false. |
Opsi field JSON
Parameter berikut hanya berlaku saat deserializer diatur ke JSON.
| Parameter | Wajib | Deskripsi |
|---|---|---|
| sourceAsField | Tidak | Menentukan apakah akan menyertakan sumber log sebagai field bernama __source__. Nilai default: false. |
| tagAsField | Tidak | Menentukan apakah akan menyertakan tag log sebagai field. Setiap tag ditambahkan sebagai field bernama __tag__:{nama tag}. Nilai default: false. |
| timeAsField | Tidak | Menentukan apakah akan menyertakan waktu log sebagai field bernama __time__. Nilai default: false. |
Parameter stempel waktu
| Parameter | Wajib | Deskripsi |
|---|---|---|
| useRecordTime | Tidak | Menentukan apakah akan menggunakan field stempel waktu dalam entri log sebagai stempel waktu event Flume. Nilai default: false. Saat diatur ke false, waktu sistem saat ini digunakan. |
Parameter pemrosesan SPL
Parameter berikut memungkinkan Anda untuk memfilter atau mentransformasi data log selama konsumsi menggunakan SLS Search Processing Language (SPL).
| Parameter | Wajib | Deskripsi |
|---|---|---|
| processor | Tidak | Ekspresi SPL SLS yang digunakan untuk memfilter atau mentransformasi data log selama konsumsi. Untuk informasi lebih lanjut tentang sintaks SPL, lihat dokumentasi SLS. |
| query | Tidak | Ekspresi kueri SPL SLS. Usang: Gunakan processor sebagai gantinya. |
Contoh konfigurasi Source
Contoh berikut mengonsumsi data dari Logstore SLS menggunakan deserializer JSON dan menuliskannya ke file log lokal:
# Beri nama komponen
agent.sources = slsSrc
agent.channels = memCh
agent.sinks = loggerSink
# Konfigurasi SLS Source
agent.sources.slsSrc.type = com.aliyun.loghub.flume.source.LoghubSource
agent.sources.slsSrc.channels = memCh
agent.sources.slsSrc.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sources.slsSrc.project = your-project
agent.sources.slsSrc.logstore = your-logstore
agent.sources.slsSrc.accessKeyId = your-access-key-id
agent.sources.slsSrc.accessKey = your-access-key-secret
agent.sources.slsSrc.deserializer = JSON
agent.sources.slsSrc.sourceAsField = true
agent.sources.slsSrc.tagAsField = true
agent.sources.slsSrc.timeAsField = true
agent.sources.slsSrc.consumerGroup = flume-consumer
agent.sources.slsSrc.initialPosition = begin
# Konfigurasi Memory Channel
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100
# Konfigurasi Logger Sink
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memChUntuk contoh konfigurasi lainnya, lihat contoh Source di GitHub.
Konfigurasi Channel
Channel adalah buffer yang menghubungkan Source ke Sink dalam pipeline Flume. Plugin aliyun-log-flume bekerja dengan semua tipe Channel Flume standar. Dua opsi paling umum adalah:
Memory Channel: Menyimpan event di memori. Menawarkan throughput tinggi tetapi event akan hilang jika proses agen Flume dimulai ulang. Cocok untuk kasus penggunaan di mana kehilangan data dapat ditoleransi.
agent.channels.memCh.type = memory agent.channels.memCh.capacity = 10000 agent.channels.memCh.transactionCapacity = 1000File Channel: Menyimpan event ke disk. Memberikan ketahanan dengan biaya throughput yang lebih rendah. Cocok untuk beban kerja produksi di mana kehilangan data tidak dapat diterima.
agent.channels.fileCh.type = file agent.channels.fileCh.checkpointDir = /var/flume/checkpoint agent.channels.fileCh.dataDirs = /var/flume/data agent.channels.fileCh.capacity = 1000000 agent.channels.fileCh.transactionCapacity = 10000
Penting: Atur transactionCapacity Channel ke nilai yang sama dengan atau lebih besar dari batchSize yang dikonfigurasi pada Sink atau Source. Jika kapasitas transaksi lebih kecil daripada ukuran batch, Sink atau Source tidak dapat menyelesaikan batch dalam satu transaksi dan akan terjadi error.