全部产品
Search
文档中心

Simple Log Service:Gunakan Flume untuk mengonsumsi data log

更新时间:Feb 28, 2026

Simple Log Service (SLS) menyediakan plugin aliyun-log-flume untuk berintegrasi dengan Apache Flume. Anda dapat menggunakan plugin ini untuk menulis data log ke SLS dari sumber data lain atau mengonsumsi data log dari SLS dan mengirimkannya ke sistem downstream seperti Hadoop Distributed File System (HDFS) dan Kafka.

Cara kerja

Apache Flume menggunakan model aliran data Source-Channel-Sink. Plugin aliyun-log-flume menyediakan Sink kustom dan Source kustom yang menghubungkan SLS ke pipeline Flume:

  • Sink: Menerima data dari Channel Flume dan menuliskannya ke Logstore SLS. Gunakan komponen ini ketika Anda ingin mengingest data ke SLS dari sistem lain melalui Flume.

  • Source: Mengonsumsi data log dari Logstore SLS dan mengirimkannya ke Channel Flume. Gunakan komponen ini ketika Anda ingin mengirim data log SLS ke sistem lain melalui Flume.

Channel bertindak sebagai buffer antara Source dan Sink. Flume menyediakan tipe channel bawaan seperti Memory Channel dan File Channel. Untuk informasi lebih lanjut, lihat Apache Flume User Guide.

Untuk kode sumber plugin dan catatan rilis, lihat aliyun-log-flume di GitHub.

Prasyarat

Sebelum memulai, pastikan persyaratan berikut terpenuhi:

  • Java: JDK 1.8 atau versi yang lebih baru telah diinstal.

  • Apache Flume: Flume 1.8.0 atau versi yang lebih baru telah diinstal. Untuk mengunduh Flume, lihat halaman unduhan Apache Flume.

  • Sumber daya SLS: Proyek SLS dan Logstore telah dibuat. Untuk informasi lebih lanjut, lihat dokumentasi SLS.

  • Pasangan AccessKey: ID AccessKey dan Rahasia AccessKey telah diperoleh. Untuk keamanan, kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey.

Instal plugin

  1. Unduh dan instal Flume. Untuk informasi lebih lanjut, lihat halaman unduhan Apache Flume.

  2. Unduh file JAR plugin aliyun-log-flume dan simpan ke direktori <FLUME_HOME>/lib. Tautan unduhan: aliyun-log-flume-1.9.jar.

  3. Buat file konfigurasi bernama flumejob.conf di direktori <FLUME_HOME>/conf. Untuk konfigurasi Sink, lihat Konfigurasi Sink. Untuk konfigurasi Source, lihat Konfigurasi Source.

  4. Jalankan Flume.

       bin/flume-ng agent -n agent -c conf -f conf/flumejob.conf

Konfigurasi Sink

Gunakan SLS Sink untuk menulis data dari sistem lain ke Logstore SLS melalui Flume. Sink mendukung tiga mode serialisasi untuk mengonversi event Flume menjadi entri log SLS:

ModePerilaku
SIMPLESetiap isi event Flume ditulis ke SLS sebagai satu field.
DELIMITEDIsi setiap event Flume dipisah menjadi beberapa field berdasarkan delimiter dan dipetakan ke nama kolom yang dikonfigurasi.
JSONIsi setiap event Flume diurai sebagai JSON.

Parameter Sink

Parameter koneksi

ParameterWajibDeskripsi
typeYaJenis Sink. Atur nilai ini ke com.aliyun.loghub.flume.sink.LoghubSink.
endpointYaTitik akhir proyek SLS. Contoh: http://cn-qingdao.log.aliyuncs.com. Pilih titik akhir berdasarkan wilayah proyek Anda. Untuk informasi lebih lanjut, lihat Endpoints.
projectYaNama proyek SLS.
logstoreYaNama Logstore.
accessKeyIdYaID AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey.
accessKeyYaRahasia AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey.

Parameter batching

ParameterWajibDeskripsi
batchSizeTidakJumlah entri log yang ditulis ke SLS dalam satu batch. Nilai default: 1000.
maxBufferSizeTidakJumlah maksimum entri log yang diizinkan dalam antrian buffer internal. Nilai default: 1000.

Parameter serialisasi

ParameterWajibDeskripsi
serializerTidakMode serialisasi untuk mengonversi event Flume menjadi entri log SLS. Nilai yang valid: SIMPLE (default), DELIMITED, JSON, atau nama kelas serializer kustom yang lengkap.
columnsTidakDaftar nama kolom yang dipisahkan koma. Wajib saat serializer diatur ke DELIMITED. Kolom dipetakan ke field sesuai urutan kemunculannya dalam setiap record.
separatorCharTidakKarakter delimiter yang digunakan untuk memisahkan field. Harus berupa satu karakter. Wajib saat serializer diatur ke DELIMITED. Nilai default: , (koma).
quoteCharTidakKarakter quote yang digunakan untuk membungkus field. Wajib saat serializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda).
escapeCharTidakKarakter escape. Wajib saat serializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda).

Parameter stempel waktu

ParameterWajibDeskripsi
useRecordTimeTidakMenentukan apakah akan menggunakan field stempel waktu dalam entri data sebagai waktu log saat menulis ke SLS. Nilai default: false. Saat diatur ke false, waktu sistem saat ini digunakan sebagai waktu log.

Contoh konfigurasi Sink

Contoh berikut membaca data dari Avro Source dan menuliskannya ke Logstore SLS menggunakan serializer DELIMITED:

# Beri nama komponen
agent.sources = avroSrc
agent.channels = memCh
agent.sinks = slsSink

# Konfigurasi Avro Source
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memCh

# Konfigurasi Memory Channel
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100

# Konfigurasi SLS Sink
agent.sinks.slsSink.type = com.aliyun.loghub.flume.sink.LoghubSink
agent.sinks.slsSink.channel = memCh
agent.sinks.slsSink.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sinks.slsSink.project = your-project
agent.sinks.slsSink.logstore = your-logstore
agent.sinks.slsSink.accessKeyId = your-access-key-id
agent.sinks.slsSink.accessKey = your-access-key-secret
agent.sinks.slsSink.batchSize = 1000
agent.sinks.slsSink.serializer = DELIMITED
agent.sinks.slsSink.columns = col1,col2,col3
agent.sinks.slsSink.separatorChar = ,

Untuk contoh konfigurasi lainnya, lihat contoh Sink di GitHub.

Konfigurasi Source

Gunakan SLS Source untuk mengonsumsi data log dari Logstore SLS dan mengirimkannya ke sistem downstream melalui Flume. Source mendukung dua mode deserialisasi untuk mengonversi entri log SLS menjadi event Flume:

ModePerilaku
DELIMITEDField log digabung dengan delimiter dan ditulis sebagai isi event Flume.
JSONEntri log diserialisasi sebagai JSON dan ditulis sebagai isi event Flume.

Parameter Source

Parameter koneksi

ParameterWajibDeskripsi
typeYaJenis Source. Atur nilai ini ke com.aliyun.loghub.flume.source.LoghubSource.
endpointYaTitik akhir proyek SLS. Contoh: http://cn-qingdao.log.aliyuncs.com. Pilih titik akhir berdasarkan wilayah proyek Anda. Untuk informasi lebih lanjut, lihat Endpoints.
projectYaNama proyek SLS.
logstoreYaNama Logstore.
accessKeyIdYaID AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey.
accessKeyYaRahasia AccessKey Akun Alibaba Cloud atau RAM user Anda. Kami menyarankan Anda menggunakan pasangan AccessKey milik RAM user. Untuk informasi lebih lanjut, lihat Pasangan AccessKey.

Parameter kelompok konsumen

ParameterWajibDeskripsi
consumerGroupTidakNama kelompok konsumen yang digunakan untuk mengoordinasikan konsumsi di antara beberapa konsumen. Jika parameter ini tidak ditentukan, nama kelompok konsumen akan dihasilkan secara acak.
heartbeatIntervalMsTidakInterval, dalam milidetik, di mana client konsumen mengirim pesan heartbeat ke SLS. Nilai default: 30000.
fetchIntervalMsTidakInterval, dalam milidetik, antara permintaan pengambilan data berturut-turut ke SLS. Nilai default: 100.
fetchInOrderTidakMenentukan apakah akan mengonsumsi data log sesuai urutan penulisannya ke SLS. Nilai default: false.
batchSizeTidakJumlah entri log yang diambil per permintaan. Nilai default: 100.
initialPositionTidakPosisi awal untuk konsumsi data. Nilai yang valid: begin (default), end, dan timestamp. Catatan: Jika checkpoint tersedia di SLS untuk kelompok konsumen yang ditentukan, checkpoint tersebut akan menggantikan pengaturan ini.
timestampTidakStempel waktu UNIX yang menentukan titik waktu mulai konsumsi data. Wajib saat initialPosition diatur ke timestamp.

Parameter deserialisasi

ParameterWajibDeskripsi
deserializerYaMode deserialisasi untuk mengonversi entri log SLS menjadi event Flume. Nilai yang valid: DELIMITED (default), JSON, atau nama kelas deserializer kustom yang lengkap.
columnsTidakDaftar nama kolom yang dipisahkan koma. Wajib saat deserializer diatur ke DELIMITED. Kolom dipetakan ke field sesuai urutan kemunculannya dalam setiap record.
separatorCharTidakKarakter delimiter yang digunakan untuk menggabungkan field. Harus berupa satu karakter. Wajib saat deserializer diatur ke DELIMITED. Nilai default: , (koma).
quoteCharTidakKarakter quote yang digunakan untuk membungkus field. Wajib saat deserializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda).
escapeCharTidakKarakter escape. Wajib saat deserializer diatur ke DELIMITED. Nilai default: " (tanda kutip ganda).
appendTimestampTidakMenentukan apakah akan menambahkan stempel waktu log sebagai field tambahan. Berlaku saat deserializer diatur ke DELIMITED. Nilai default: false.

Opsi field JSON

Parameter berikut hanya berlaku saat deserializer diatur ke JSON.

ParameterWajibDeskripsi
sourceAsFieldTidakMenentukan apakah akan menyertakan sumber log sebagai field bernama __source__. Nilai default: false.
tagAsFieldTidakMenentukan apakah akan menyertakan tag log sebagai field. Setiap tag ditambahkan sebagai field bernama __tag__:{nama tag}. Nilai default: false.
timeAsFieldTidakMenentukan apakah akan menyertakan waktu log sebagai field bernama __time__. Nilai default: false.

Parameter stempel waktu

ParameterWajibDeskripsi
useRecordTimeTidakMenentukan apakah akan menggunakan field stempel waktu dalam entri log sebagai stempel waktu event Flume. Nilai default: false. Saat diatur ke false, waktu sistem saat ini digunakan.

Parameter pemrosesan SPL

Parameter berikut memungkinkan Anda untuk memfilter atau mentransformasi data log selama konsumsi menggunakan SLS Search Processing Language (SPL).

ParameterWajibDeskripsi
processorTidakEkspresi SPL SLS yang digunakan untuk memfilter atau mentransformasi data log selama konsumsi. Untuk informasi lebih lanjut tentang sintaks SPL, lihat dokumentasi SLS.
queryTidakEkspresi kueri SPL SLS. Usang: Gunakan processor sebagai gantinya.

Contoh konfigurasi Source

Contoh berikut mengonsumsi data dari Logstore SLS menggunakan deserializer JSON dan menuliskannya ke file log lokal:

# Beri nama komponen
agent.sources = slsSrc
agent.channels = memCh
agent.sinks = loggerSink

# Konfigurasi SLS Source
agent.sources.slsSrc.type = com.aliyun.loghub.flume.source.LoghubSource
agent.sources.slsSrc.channels = memCh
agent.sources.slsSrc.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sources.slsSrc.project = your-project
agent.sources.slsSrc.logstore = your-logstore
agent.sources.slsSrc.accessKeyId = your-access-key-id
agent.sources.slsSrc.accessKey = your-access-key-secret
agent.sources.slsSrc.deserializer = JSON
agent.sources.slsSrc.sourceAsField = true
agent.sources.slsSrc.tagAsField = true
agent.sources.slsSrc.timeAsField = true
agent.sources.slsSrc.consumerGroup = flume-consumer
agent.sources.slsSrc.initialPosition = begin

# Konfigurasi Memory Channel
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100

# Konfigurasi Logger Sink
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memCh

Untuk contoh konfigurasi lainnya, lihat contoh Source di GitHub.

Konfigurasi Channel

Channel adalah buffer yang menghubungkan Source ke Sink dalam pipeline Flume. Plugin aliyun-log-flume bekerja dengan semua tipe Channel Flume standar. Dua opsi paling umum adalah:

  • Memory Channel: Menyimpan event di memori. Menawarkan throughput tinggi tetapi event akan hilang jika proses agen Flume dimulai ulang. Cocok untuk kasus penggunaan di mana kehilangan data dapat ditoleransi.

      agent.channels.memCh.type = memory
      agent.channels.memCh.capacity = 10000
      agent.channels.memCh.transactionCapacity = 1000
  • File Channel: Menyimpan event ke disk. Memberikan ketahanan dengan biaya throughput yang lebih rendah. Cocok untuk beban kerja produksi di mana kehilangan data tidak dapat diterima.

      agent.channels.fileCh.type = file
      agent.channels.fileCh.checkpointDir = /var/flume/checkpoint
      agent.channels.fileCh.dataDirs = /var/flume/data
      agent.channels.fileCh.capacity = 1000000
      agent.channels.fileCh.transactionCapacity = 10000

Penting: Atur transactionCapacity Channel ke nilai yang sama dengan atau lebih besar dari batchSize yang dikonfigurasi pada Sink atau Source. Jika kapasitas transaksi lebih kecil daripada ukuran batch, Sink atau Source tidak dapat menyelesaikan batch dalam satu transaksi dan akan terjadi error.

Referensi